forked from tangled.org/core
Monorepo for Tangled

defer last event time in appview ingester

Changed files
+51 -31
appview
knotserver
-2
appview/db/timeline.go
··· 1 1 package db 2 2 3 3 import ( 4 - "log" 5 4 "sort" 6 5 "time" 7 6 ) ··· 26 25 } 27 26 28 27 for _, repo := range repos { 29 - log.Println(repo.Created) 30 28 events = append(events, TimelineEvent{ 31 29 Repo: &repo, 32 30 Follow: nil,
+50
appview/state/jetstream.go
··· 1 + package state 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "log" 8 + 9 + "github.com/bluesky-social/jetstream/pkg/models" 10 + tangled "github.com/sotangled/tangled/api/tangled" 11 + "github.com/sotangled/tangled/appview/db" 12 + ) 13 + 14 + type Ingester func(ctx context.Context, e *models.Event) error 15 + 16 + func jetstreamIngester(db *db.DB) Ingester { 17 + return func(ctx context.Context, e *models.Event) error { 18 + var err error 19 + defer func() { 20 + eventTime := e.TimeUS 21 + lastTimeUs := eventTime + 1 22 + if err := db.UpdateLastTimeUs(lastTimeUs); err != nil { 23 + err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 24 + } 25 + }() 26 + 27 + if e.Kind != models.EventKindCommit { 28 + return nil 29 + } 30 + 31 + did := e.Did 32 + raw := json.RawMessage(e.Commit.Record) 33 + 34 + switch e.Commit.Collection { 35 + case tangled.GraphFollowNSID: 36 + record := tangled.GraphFollow{} 37 + err := json.Unmarshal(raw, &record) 38 + if err != nil { 39 + log.Println("invalid record") 40 + return err 41 + } 42 + err = db.AddFollow(did, record.Subject, e.Commit.RKey) 43 + if err != nil { 44 + return fmt.Errorf("failed to add follow to db: %w", err) 45 + } 46 + } 47 + 48 + return err 49 + } 50 + }
+1 -27
appview/state/state.go
··· 5 5 "crypto/hmac" 6 6 "crypto/sha256" 7 7 "encoding/hex" 8 - "encoding/json" 9 8 "fmt" 10 9 "log" 11 10 "log/slog" ··· 16 15 comatproto "github.com/bluesky-social/indigo/api/atproto" 17 16 "github.com/bluesky-social/indigo/atproto/syntax" 18 17 lexutil "github.com/bluesky-social/indigo/lex/util" 19 - "github.com/bluesky-social/jetstream/pkg/models" 20 18 securejoin "github.com/cyphar/filepath-securejoin" 21 19 "github.com/go-chi/chi/v5" 22 20 tangled "github.com/sotangled/tangled/api/tangled" ··· 64 62 if err != nil { 65 63 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 66 64 } 67 - err = jc.StartJetstream(context.Background(), func(ctx context.Context, e *models.Event) error { 68 - if e.Kind != models.EventKindCommit { 69 - return nil 70 - } 71 - 72 - did := e.Did 73 - raw := json.RawMessage(e.Commit.Record) 74 - 75 - switch e.Commit.Collection { 76 - case tangled.GraphFollowNSID: 77 - record := tangled.GraphFollow{} 78 - err := json.Unmarshal(raw, &record) 79 - if err != nil { 80 - log.Println("invalid record") 81 - return err 82 - } 83 - err = db.AddFollow(did, record.Subject, e.Commit.RKey) 84 - if err != nil { 85 - return fmt.Errorf("failed to add follow to db: %w", err) 86 - } 87 - return db.UpdateLastTimeUs(e.TimeUS) 88 - } 89 - 90 - return nil 91 - }) 65 + err = jc.StartJetstream(context.Background(), jetstreamIngester(db)) 92 66 if err != nil { 93 67 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 94 68 }
-2
knotserver/middleware.go
··· 4 4 "crypto/hmac" 5 5 "crypto/sha256" 6 6 "encoding/hex" 7 - "log" 8 7 "net/http" 9 8 "time" 10 9 ) ··· 15 14 } 16 15 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 17 16 signature := r.Header.Get("X-Signature") 18 - log.Println(signature) 19 17 if signature == "" || !h.verifyHMAC(signature, r) { 20 18 writeError(w, "signature verification failed", http.StatusForbidden) 21 19 return