forked from tangled.org/core
Monorepo for Tangled

jestream: allow caller to optionally waitForDid

This is handy because the knotserver only subscribes to specific dids,
and we don't want jestream to run without (which defaults to all dids).
The oppsite is true for the appview which wants to see all dids.

Also adds some nil checks to ensure it doesn't break when event is not a
Commit.

anirudh.fi 0aa1229c d527fd0c

verified
Changed files
+25 -9
appview
state
cmd
knotserver
jetstream
knotserver
+7 -2
appview/state/state.go
··· 59 59 60 60 resolver := appview.NewResolver() 61 61 62 - jc, err := jetstream.NewJetstreamClient("appview", []string{tangled.GraphFollowNSID}, nil, db) 62 + jc, err := jetstream.NewJetstreamClient("appview", []string{tangled.GraphFollowNSID}, nil, db, false) 63 63 if err != nil { 64 64 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 65 65 } 66 66 err = jc.StartJetstream(context.Background(), func(ctx context.Context, e *models.Event) error { 67 + if e.Kind != models.EventKindCommit { 68 + return nil 69 + } 70 + 67 71 did := e.Did 68 - raw := e.Commit.Record 72 + fmt.Println("got event", e.Commit.Collection, e.Commit.RKey, e.Commit.Record) 73 + raw := json.RawMessage(e.Commit.Record) 69 74 70 75 switch e.Commit.Collection { 71 76 case tangled.GraphFollowNSID:
+1 -1
cmd/knotserver/main.go
··· 45 45 jc, err := jetstream.NewJetstreamClient("knotserver", []string{ 46 46 tangled.PublicKeyNSID, 47 47 tangled.KnotMemberNSID, 48 - }, nil, db) 48 + }, nil, db, true) 49 49 if err != nil { 50 50 l.Error("failed to setup jetstream", "error", err) 51 51 }
+14 -6
jetstream/jetstream.go
··· 24 24 25 25 db DB 26 26 reconnectCh chan struct{} 27 + waitForDid bool 27 28 mu sync.RWMutex 28 29 } 29 30 ··· 41 42 j.reconnectCh <- struct{}{} 42 43 } 43 44 44 - func NewJetstreamClient(ident string, collections []string, cfg *client.ClientConfig, db DB) (*JetstreamClient, error) { 45 + func NewJetstreamClient(ident string, collections []string, cfg *client.ClientConfig, db DB, waitForDid bool) (*JetstreamClient, error) { 45 46 if cfg == nil { 46 47 cfg = client.DefaultClientConfig() 47 48 cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe" ··· 49 50 } 50 51 51 52 return &JetstreamClient{ 52 - cfg: cfg, 53 - ident: ident, 54 - db: db, 53 + cfg: cfg, 54 + ident: ident, 55 + db: db, 56 + 57 + // This will make the goroutine in StartJetstream wait until 58 + // cfg.WantedDids has been populated, typically using UpdateDids. 59 + waitForDid: waitForDid, 55 60 reconnectCh: make(chan struct{}, 1), 56 61 }, nil 57 62 } ··· 82 87 83 88 go func() { 84 89 lastTimeUs := j.getLastTimeUs(ctx) 85 - for len(j.cfg.WantedDids) == 0 { 86 - time.Sleep(time.Second) 90 + if j.waitForDid { 91 + for len(j.cfg.WantedDids) == 0 { 92 + time.Sleep(time.Second) 93 + } 87 94 } 95 + logger.Info("done waiting for did") 88 96 j.connectAndRead(ctx, &lastTimeUs) 89 97 }() 90 98
+3
knotserver/jetstream.go
··· 108 108 109 109 func (h *Handle) processMessages(ctx context.Context, event *models.Event) error { 110 110 did := event.Did 111 + if event.Kind != models.EventKindCommit { 112 + return nil 113 + } 111 114 112 115 raw := json.RawMessage(event.Commit.Record) 113 116