Monorepo for Tangled tangled.org

knotserver/jetstream: actually save last time_us to db

anirudh.fi 40c080cd ea1b8530

verified
Changed files
+13 -13
knotserver
+2 -2
go.mod
··· 13 github.com/gliderlabs/ssh v0.3.5 14 github.com/go-chi/chi/v5 v5.2.0 15 github.com/go-git/go-git/v5 v5.12.0 16 - github.com/google/uuid v1.6.0 17 github.com/gorilla/sessions v1.4.0 18 - github.com/gorilla/websocket v1.5.1 19 github.com/ipfs/go-cid v0.4.1 20 github.com/mattn/go-sqlite3 v1.14.24 21 github.com/microcosm-cc/bluemonday v1.0.27 ··· 47 github.com/go-logr/stdr v1.2.2 // indirect 48 github.com/goccy/go-json v0.10.2 // indirect 49 github.com/gogo/protobuf v1.3.2 // indirect 50 github.com/gorilla/css v1.0.1 // indirect 51 github.com/gorilla/securecookie v1.1.2 // indirect 52 github.com/hashicorp/go-cleanhttp v0.5.2 // indirect 53 github.com/hashicorp/go-retryablehttp v0.7.5 // indirect 54 github.com/hashicorp/golang-lru v1.0.2 // indirect
··· 13 github.com/gliderlabs/ssh v0.3.5 14 github.com/go-chi/chi/v5 v5.2.0 15 github.com/go-git/go-git/v5 v5.12.0 16 github.com/gorilla/sessions v1.4.0 17 github.com/ipfs/go-cid v0.4.1 18 github.com/mattn/go-sqlite3 v1.14.24 19 github.com/microcosm-cc/bluemonday v1.0.27 ··· 45 github.com/go-logr/stdr v1.2.2 // indirect 46 github.com/goccy/go-json v0.10.2 // indirect 47 github.com/gogo/protobuf v1.3.2 // indirect 48 + github.com/google/uuid v1.6.0 // indirect 49 github.com/gorilla/css v1.0.1 // indirect 50 github.com/gorilla/securecookie v1.1.2 // indirect 51 + github.com/gorilla/websocket v1.5.1 // indirect 52 github.com/hashicorp/go-cleanhttp v0.5.2 // indirect 53 github.com/hashicorp/go-retryablehttp v0.7.5 // indirect 54 github.com/hashicorp/golang-lru v1.0.2 // indirect
+11 -11
knotserver/jetstream.go
··· 27 } 28 29 func (h *Handle) StartJetstream(ctx context.Context) error { 30 - l := h.l.With("component", "jetstream") 31 ctx = log.IntoContext(ctx, l) 32 collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID} 33 dids := []string{} 34 35 - lastTimeUs, err := h.getLastTimeUs(ctx) 36 - if err != nil { 37 - return err 38 - } 39 - 40 cfg := client.DefaultClientConfig() 41 cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe" 42 cfg.WantedCollections = collections ··· 58 h.jc = jc 59 60 go func() { 61 for len(h.jc.cfg.WantedDids) == 0 { 62 time.Sleep(time.Second) 63 } ··· 71 for { 72 select { 73 case <-h.jc.reconnectCh: 74 - l.Info("reconnecting jetstream client") 75 h.jc.client.Scheduler.Shutdown() 76 if err := h.jc.client.ConnectAndRead(ctx, cursor); err != nil { 77 l.Error("error reading jetstream", "error", err) ··· 91 j.reconnectCh <- struct{}{} 92 } 93 94 - func (h *Handle) getLastTimeUs(ctx context.Context) (int64, error) { 95 l := log.FromContext(ctx) 96 lastTimeUs, err := h.db.GetLastTimeUs() 97 if err != nil { 98 - l.Info("couldn't get last time us, starting from now") 99 lastTimeUs = time.Now().UnixMicro() 100 } 101 102 // If last time is older than a week, start from now 103 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 { 104 lastTimeUs = time.Now().UnixMicro() 105 - l.Info("last time us is older than a week. discarding that and starting from now") 106 err = h.db.SaveLastTimeUs(lastTimeUs) 107 if err != nil { 108 l.Error("failed to save last time us") ··· 110 } 111 112 l.Info("found last time_us", "time_us", lastTimeUs) 113 - return lastTimeUs, nil 114 } 115 116 func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {
··· 27 } 28 29 func (h *Handle) StartJetstream(ctx context.Context) error { 30 + l := h.l 31 ctx = log.IntoContext(ctx, l) 32 collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID} 33 dids := []string{} 34 35 cfg := client.DefaultClientConfig() 36 cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe" 37 cfg.WantedCollections = collections ··· 53 h.jc = jc 54 55 go func() { 56 + lastTimeUs := h.getLastTimeUs(ctx) 57 for len(h.jc.cfg.WantedDids) == 0 { 58 time.Sleep(time.Second) 59 } ··· 67 for { 68 select { 69 case <-h.jc.reconnectCh: 70 + l.Info("(re)connecting jetstream client") 71 h.jc.client.Scheduler.Shutdown() 72 if err := h.jc.client.ConnectAndRead(ctx, cursor); err != nil { 73 l.Error("error reading jetstream", "error", err) ··· 87 j.reconnectCh <- struct{}{} 88 } 89 90 + func (h *Handle) getLastTimeUs(ctx context.Context) int64 { 91 l := log.FromContext(ctx) 92 lastTimeUs, err := h.db.GetLastTimeUs() 93 if err != nil { 94 + l.Warn("couldn't get last time us, starting from now", "error", err) 95 lastTimeUs = time.Now().UnixMicro() 96 + err = h.db.SaveLastTimeUs(lastTimeUs) 97 + if err != nil { 98 + l.Error("failed to save last time us") 99 + } 100 } 101 102 // If last time is older than a week, start from now 103 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 { 104 lastTimeUs = time.Now().UnixMicro() 105 + l.Warn("last time us is older than a week. discarding that and starting from now") 106 err = h.db.SaveLastTimeUs(lastTimeUs) 107 if err != nil { 108 l.Error("failed to save last time us") ··· 110 } 111 112 l.Info("found last time_us", "time_us", lastTimeUs) 113 + return lastTimeUs 114 } 115 116 func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {