Monorepo for Tangled tangled.org

add in-memory jetstream did filter

Changed files
+120 -207
appview
db
pages
templates
state
cmd
jstest
knotserver
jetstream
knotserver
+6 -10
appview/db/jetstream.go
··· 5 5 } 6 6 7 7 func (db DbWrapper) SaveLastTimeUs(lastTimeUs int64) error { 8 - _, err := db.Exec(`insert into _jetstream (last_time_us) values (?)`, lastTimeUs) 8 + _, err := db.Exec(` 9 + insert into _jetstream (id, last_time_us) 10 + values (1, ?) 11 + on conflict(id) do update set last_time_us = excluded.last_time_us 12 + `, lastTimeUs) 9 13 return err 10 14 } 11 15 12 - func (db DbWrapper) UpdateLastTimeUs(lastTimeUs int64) error { 13 - _, err := db.Exec(`update _jetstream set last_time_us = ? where rowid = 1`, lastTimeUs) 14 - if err != nil { 15 - return err 16 - } 17 - return nil 18 - } 19 - 20 16 func (db DbWrapper) GetLastTimeUs() (int64, error) { 21 17 var lastTimeUs int64 22 - row := db.QueryRow(`select last_time_us from _jetstream`) 18 + row := db.QueryRow(`select last_time_us from _jetstream where id = 1;`) 23 19 err := row.Scan(&lastTimeUs) 24 20 return lastTimeUs, err 25 21 }
+1 -2
appview/pages/templates/knots.html
··· 8 8 <section class="rounded bg-white dark:bg-gray-800 drop-shadow-sm px-6 py-4 mb-6 w-full lg:w-fit"> 9 9 <p class="mb-8 dark:text-gray-300">Generate a key to initialize your knot server.</p> 10 10 <form 11 - hx-put="/knots/key" 12 - hx-swap="none" 11 + hx-post="/knots/key" 13 12 class="max-w-2xl mb-8 space-y-4" 14 13 > 15 14 <input
+1 -1
appview/state/jetstream.go
··· 20 20 defer func() { 21 21 eventTime := e.TimeUS 22 22 lastTimeUs := eventTime + 1 23 - if err := d.UpdateLastTimeUs(lastTimeUs); err != nil { 23 + if err := d.SaveLastTimeUs(lastTimeUs); err != nil { 24 24 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 25 25 } 26 26 }()
+9 -1
appview/state/state.go
··· 60 60 resolver := appview.NewResolver() 61 61 62 62 wrapper := db.DbWrapper{d} 63 - jc, err := jetstream.NewJetstreamClient(config.JetstreamEndpoint, "appview", []string{tangled.GraphFollowNSID}, nil, slog.Default(), wrapper, false) 63 + jc, err := jetstream.NewJetstreamClient( 64 + config.JetstreamEndpoint, 65 + "appview", 66 + []string{tangled.GraphFollowNSID}, 67 + nil, 68 + slog.Default(), 69 + wrapper, 70 + false, 71 + ) 64 72 if err != nil { 65 73 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 66 74 }
-150
cmd/jstest/main.go
··· 1 - package main 2 - 3 - import ( 4 - "context" 5 - "flag" 6 - "log/slog" 7 - "os" 8 - "os/signal" 9 - "strings" 10 - "syscall" 11 - "time" 12 - 13 - "github.com/bluesky-social/jetstream/pkg/client" 14 - "github.com/bluesky-social/jetstream/pkg/models" 15 - "tangled.sh/tangled.sh/core/jetstream" 16 - ) 17 - 18 - // Simple in-memory implementation of DB interface 19 - type MemoryDB struct { 20 - lastTimeUs int64 21 - } 22 - 23 - func (m *MemoryDB) GetLastTimeUs() (int64, error) { 24 - if m.lastTimeUs == 0 { 25 - return time.Now().UnixMicro(), nil 26 - } 27 - return m.lastTimeUs, nil 28 - } 29 - 30 - func (m *MemoryDB) SaveLastTimeUs(ts int64) error { 31 - m.lastTimeUs = ts 32 - return nil 33 - } 34 - 35 - func (m *MemoryDB) UpdateLastTimeUs(ts int64) error { 36 - m.lastTimeUs = ts 37 - return nil 38 - } 39 - 40 - func main() { 41 - // Setup logger 42 - logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ 43 - Level: slog.LevelInfo, 44 - })) 45 - 46 - // Create in-memory DB 47 - db := &MemoryDB{} 48 - 49 - // Get query URL from flag 50 - var queryURL string 51 - flag.StringVar(&queryURL, "query-url", "", "Jetstream query URL containing DIDs") 52 - flag.Parse() 53 - 54 - if queryURL == "" { 55 - logger.Error("No query URL provided, use --query-url flag") 56 - os.Exit(1) 57 - } 58 - 59 - // Extract wantedDids parameters 60 - didParams := strings.Split(queryURL, "&wantedDids=") 61 - dids := make([]string, 0, len(didParams)-1) 62 - for i, param := range didParams { 63 - if i == 0 { 64 - // Skip the first part (the base URL with cursor) 65 - continue 66 - } 67 - dids = append(dids, param) 68 - } 69 - 70 - // Extract collections 71 - collections := []string{"sh.tangled.publicKey", "sh.tangled.knot.member"} 72 - 73 - // Create client configuration 74 - cfg := client.DefaultClientConfig() 75 - cfg.WebsocketURL = "wss://jetstream2.us-west.bsky.network/subscribe" 76 - cfg.WantedCollections = collections 77 - 78 - // Create jetstream client 79 - jsClient, err := jetstream.NewJetstreamClient( 80 - cfg.WebsocketURL, 81 - "tangled-jetstream", 82 - collections, 83 - cfg, 84 - logger, 85 - db, 86 - false, 87 - ) 88 - if err != nil { 89 - logger.Error("Failed to create jetstream client", "error", err) 90 - os.Exit(1) 91 - } 92 - 93 - // Update DIDs 94 - jsClient.UpdateDids(dids) 95 - 96 - // Create a context that will be canceled on SIGINT or SIGTERM 97 - ctx, cancel := context.WithCancel(context.Background()) 98 - defer cancel() 99 - 100 - // Setup signal handling with a buffered channel 101 - sigCh := make(chan os.Signal, 1) 102 - signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) 103 - 104 - // Process function for events 105 - processFunc := func(ctx context.Context, event *models.Event) error { 106 - // Log the event details 107 - logger.Info("Received event", 108 - "collection", event.Commit.Collection, 109 - "did", event.Did, 110 - "rkey", event.Commit.RKey, 111 - "action", event.Kind, 112 - "time_us", event.TimeUS, 113 - ) 114 - 115 - // Save the last time_us 116 - if err := db.UpdateLastTimeUs(event.TimeUS); err != nil { 117 - logger.Error("Failed to update last time_us", "error", err) 118 - } 119 - 120 - return nil 121 - } 122 - 123 - // Start jetstream 124 - if err := jsClient.StartJetstream(ctx, processFunc); err != nil { 125 - logger.Error("Failed to start jetstream", "error", err) 126 - os.Exit(1) 127 - } 128 - 129 - // Wait for signal instead of context.Done() 130 - sig := <-sigCh 131 - logger.Info("Received signal, shutting down", "signal", sig) 132 - cancel() // Cancel context after receiving signal 133 - 134 - // Shutdown gracefully with a timeout 135 - shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) 136 - defer shutdownCancel() 137 - 138 - done := make(chan struct{}) 139 - go func() { 140 - jsClient.Shutdown() 141 - close(done) 142 - }() 143 - 144 - select { 145 - case <-done: 146 - logger.Info("Jetstream client shut down gracefully") 147 - case <-shutdownCtx.Done(): 148 - logger.Warn("Shutdown timed out, forcing exit") 149 - } 150 - }
+1 -1
cmd/knotserver/main.go
··· 49 49 jc, err := jetstream.NewJetstreamClient(c.Server.JetstreamEndpoint, "knotserver", []string{ 50 50 tangled.PublicKeyNSID, 51 51 tangled.KnotMemberNSID, 52 - }, nil, l, db, false) 52 + }, nil, l, db, true) 53 53 if err != nil { 54 54 l.Error("failed to setup jetstream", "error", err) 55 55 }
+86 -24
jetstream/jetstream.go
··· 4 4 "context" 5 5 "fmt" 6 6 "log/slog" 7 + "os" 8 + "os/signal" 7 9 "sync" 10 + "syscall" 8 11 "time" 9 12 10 13 "github.com/bluesky-social/jetstream/pkg/client" ··· 16 19 type DB interface { 17 20 GetLastTimeUs() (int64, error) 18 21 SaveLastTimeUs(int64) error 19 - UpdateLastTimeUs(int64) error 20 22 } 21 23 24 + type Set[T comparable] map[T]struct{} 25 + 22 26 type JetstreamClient struct { 23 27 cfg *client.ClientConfig 24 28 client *client.Client 25 29 ident string 26 30 l *slog.Logger 27 31 32 + wantedDids Set[string] 28 33 db DB 29 34 waitForDid bool 30 35 mu sync.RWMutex ··· 37 42 if did == "" { 38 43 return 39 44 } 45 + 40 46 j.mu.Lock() 41 - j.cfg.WantedDids = append(j.cfg.WantedDids, did) 47 + j.wantedDids[did] = struct{}{} 42 48 j.mu.Unlock() 43 49 } 44 50 45 - func (j *JetstreamClient) UpdateDids(dids []string) { 46 - j.mu.Lock() 47 - for _, did := range dids { 48 - if did != "" { 49 - j.cfg.WantedDids = append(j.cfg.WantedDids, did) 51 + type processor func(context.Context, *models.Event) error 52 + 53 + func (j *JetstreamClient) withDidFilter(processFunc processor) processor { 54 + // since this closure references j.WantedDids; it should auto-update 55 + // existing instances of the closure when j.WantedDids is mutated 56 + return func(ctx context.Context, evt *models.Event) error { 57 + if _, ok := j.wantedDids[evt.Did]; ok { 58 + return processFunc(ctx, evt) 59 + } else { 60 + return nil 50 61 } 51 62 } 52 - j.mu.Unlock() 53 - 54 - j.cancelMu.Lock() 55 - if j.cancel != nil { 56 - j.cancel() 57 - } 58 - j.cancelMu.Unlock() 59 63 } 60 64 61 65 func NewJetstreamClient(endpoint, ident string, collections []string, cfg *client.ClientConfig, logger *slog.Logger, db DB, waitForDid bool) (*JetstreamClient, error) { ··· 66 70 } 67 71 68 72 return &JetstreamClient{ 69 - cfg: cfg, 70 - ident: ident, 71 - db: db, 72 - l: logger, 73 + cfg: cfg, 74 + ident: ident, 75 + db: db, 76 + l: logger, 77 + wantedDids: make(map[string]struct{}), 73 78 74 79 // This will make the goroutine in StartJetstream wait until 75 - // cfg.WantedDids has been populated, typically using UpdateDids. 80 + // j.wantedDids has been populated, typically using addDids. 76 81 waitForDid: waitForDid, 77 82 }, nil 78 83 } 79 84 80 85 // StartJetstream starts the jetstream client and processes events using the provided processFunc. 81 - // The caller is responsible for saving the last time_us to the database (just use your db.SaveLastTimeUs). 86 + // The caller is responsible for saving the last time_us to the database (just use your db.UpdateLastTimeUs). 82 87 func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error { 83 88 logger := j.l 84 89 85 - sched := sequential.NewScheduler(j.ident, logger, processFunc) 90 + sched := sequential.NewScheduler(j.ident, logger, j.withDidFilter(processFunc)) 86 91 87 92 client, err := client.NewClient(j.cfg, log.New("jetstream"), sched) 88 93 if err != nil { ··· 92 97 93 98 go func() { 94 99 if j.waitForDid { 95 - for len(j.cfg.WantedDids) == 0 { 100 + for len(j.wantedDids) == 0 { 96 101 time.Sleep(time.Second) 97 102 } 98 103 } 99 104 logger.Info("done waiting for did") 105 + 106 + go j.periodicLastTimeSave(ctx) 107 + j.saveIfKilled(ctx) 108 + 100 109 j.connectAndRead(ctx) 101 110 }() 102 111 ··· 130 139 } 131 140 } 132 141 142 + // save cursor periodically 143 + func (j *JetstreamClient) periodicLastTimeSave(ctx context.Context) { 144 + ticker := time.NewTicker(time.Minute) 145 + defer ticker.Stop() 146 + 147 + for { 148 + select { 149 + case <-ctx.Done(): 150 + return 151 + case <-ticker.C: 152 + j.db.SaveLastTimeUs(time.Now().UnixMicro()) 153 + } 154 + } 155 + } 156 + 133 157 func (j *JetstreamClient) getLastTimeUs(ctx context.Context) *int64 { 134 158 l := log.FromContext(ctx) 135 159 lastTimeUs, err := j.db.GetLastTimeUs() ··· 142 166 } 143 167 } 144 168 145 - // If last time is older than a week, start from now 169 + // If last time is older than 2 days, start from now 146 170 if time.Now().UnixMicro()-lastTimeUs > 2*24*60*60*1000*1000 { 147 171 lastTimeUs = time.Now().UnixMicro() 148 172 l.Warn("last time us is older than 2 days; discarding that and starting from now") 149 - err = j.db.UpdateLastTimeUs(lastTimeUs) 173 + err = j.db.SaveLastTimeUs(lastTimeUs) 150 174 if err != nil { 151 175 l.Error("failed to save last time us", "error", err) 152 176 } ··· 155 179 l.Info("found last time_us", "time_us", lastTimeUs) 156 180 return &lastTimeUs 157 181 } 182 + 183 + func (j *JetstreamClient) saveIfKilled(ctx context.Context) context.Context { 184 + ctxWithCancel, cancel := context.WithCancel(ctx) 185 + 186 + sigChan := make(chan os.Signal, 1) 187 + 188 + signal.Notify(sigChan, 189 + syscall.SIGINT, 190 + syscall.SIGTERM, 191 + syscall.SIGQUIT, 192 + syscall.SIGHUP, 193 + syscall.SIGKILL, 194 + syscall.SIGSTOP, 195 + ) 196 + 197 + go func() { 198 + sig := <-sigChan 199 + j.l.Info("Received signal, initiating graceful shutdown", "signal", sig) 200 + 201 + lastTimeUs := time.Now().UnixMicro() 202 + if err := j.db.SaveLastTimeUs(lastTimeUs); err != nil { 203 + j.l.Error("Failed to save last time during shutdown", "error", err) 204 + } 205 + j.l.Info("Saved lastTimeUs before shutdown", "lastTimeUs", lastTimeUs) 206 + 207 + j.cancelMu.Lock() 208 + if j.cancel != nil { 209 + j.cancel() 210 + } 211 + j.cancelMu.Unlock() 212 + 213 + cancel() 214 + 215 + os.Exit(0) 216 + }() 217 + 218 + return ctxWithCancel 219 + }
+6 -10
knotserver/db/jetstream.go
··· 1 1 package db 2 2 3 3 func (d *DB) SaveLastTimeUs(lastTimeUs int64) error { 4 - _, err := d.db.Exec(`insert into _jetstream (last_time_us) values (?)`, lastTimeUs) 4 + _, err := d.db.Exec(` 5 + insert into _jetstream (id, last_time_us) 6 + values (1, ?) 7 + on conflict(id) do update set last_time_us = excluded.last_time_us 8 + `, lastTimeUs) 5 9 return err 6 10 } 7 11 8 - func (d *DB) UpdateLastTimeUs(lastTimeUs int64) error { 9 - _, err := d.db.Exec(`update _jetstream set last_time_us = ? where rowid = 1`, lastTimeUs) 10 - if err != nil { 11 - return err 12 - } 13 - return nil 14 - } 15 - 16 12 func (d *DB) GetLastTimeUs() (int64, error) { 17 13 var lastTimeUs int64 18 - row := d.db.QueryRow(`select last_time_us from _jetstream`) 14 + row := d.db.QueryRow(`select last_time_us from _jetstream where id = 1;`) 19 15 err := row.Scan(&lastTimeUs) 20 16 return lastTimeUs, err 21 17 }
+2 -2
knotserver/db/pubkeys.go
··· 44 44 return err 45 45 } 46 46 47 - func (pk *PublicKey) JSON() map[string]interface{} { 48 - return map[string]interface{}{ 47 + func (pk *PublicKey) JSON() map[string]any { 48 + return map[string]any{ 49 49 "did": pk.Did, 50 50 "key": pk.Key, 51 51 "created": pk.Created,
+3 -1
knotserver/handler.go
··· 63 63 if len(dids) > 0 { 64 64 h.knotInitialized = true 65 65 close(h.init) 66 - // h.jc.UpdateDids(dids) 66 + for _, d := range dids { 67 + h.jc.AddDid(d) 68 + } 67 69 } 68 70 69 71 r.Get("/", h.Index)
+2 -2
knotserver/jetstream.go
··· 53 53 l.Error("failed to add did", "error", err) 54 54 return fmt.Errorf("failed to add did: %w", err) 55 55 } 56 + h.jc.AddDid(did) 56 57 57 58 if err := h.fetchAndAddKeys(ctx, did); err != nil { 58 59 return fmt.Errorf("failed to fetch and add keys: %w", err) ··· 115 116 eventTime := event.TimeUS 116 117 lastTimeUs := eventTime + 1 117 118 fmt.Println("lastTimeUs", lastTimeUs) 118 - if err := h.db.UpdateLastTimeUs(lastTimeUs); err != nil { 119 + if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 119 120 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 120 121 } 121 - // h.jc.UpdateDids([]string{did}) 122 122 }() 123 123 124 124 raw := json.RawMessage(event.Commit.Record)
+3 -3
knotserver/routes.go
··· 448 448 return 449 449 } 450 450 451 - data := make([]map[string]interface{}, 0) 451 + data := make([]map[string]any, 0) 452 452 for _, key := range keys { 453 453 j := key.JSON() 454 454 data = append(data, j) ··· 684 684 writeError(w, err.Error(), http.StatusInternalServerError) 685 685 return 686 686 } 687 - 688 687 h.jc.AddDid(did) 688 + 689 689 if err := h.e.AddMember(ThisServer, did); err != nil { 690 690 l.Error("adding member", "error", err.Error()) 691 691 writeError(w, err.Error(), http.StatusInternalServerError) ··· 768 768 writeError(w, err.Error(), http.StatusInternalServerError) 769 769 return 770 770 } 771 + h.jc.AddDid(data.Did) 771 772 772 - // h.jc.UpdateDids([]string{data.Did}) 773 773 if err := h.e.AddOwner(ThisServer, data.Did); err != nil { 774 774 l.Error("adding owner", "error", err.Error()) 775 775 writeError(w, err.Error(), http.StatusInternalServerError)