Monorepo for Tangled tangled.org

knotserver/jetstream: preserve last time_us even on err'd exit

Also reduces last time_us discard window down to 2 days.

anirudh.fi 71252a20 cd93d12e

verified
Changed files
+16 -11
jetstream
knotserver
+2 -2
jetstream/jetstream.go
··· 141 } 142 143 // If last time is older than a week, start from now 144 - if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 { 145 lastTimeUs = time.Now().UnixMicro() 146 - l.Warn("last time us is older than a week. discarding that and starting from now") 147 err = j.db.UpdateLastTimeUs(lastTimeUs) 148 if err != nil { 149 l.Error("failed to save last time us", "error", err)
··· 141 } 142 143 // If last time is older than a week, start from now 144 + if time.Now().UnixMicro()-lastTimeUs > 2*24*60*60*1000*1000 { 145 lastTimeUs = time.Now().UnixMicro() 146 + l.Warn("last time us is older than 2 days; discarding that and starting from now") 147 err = j.db.UpdateLastTimeUs(lastTimeUs) 148 if err != nil { 149 l.Error("failed to save last time us", "error", err)
+14 -9
knotserver/jetstream.go
··· 29 return nil 30 } 31 32 - func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember, eventTime int64) error { 33 l := log.FromContext(ctx) 34 35 if record.Domain != h.c.Server.Hostname { ··· 58 return fmt.Errorf("failed to fetch and add keys: %w", err) 59 } 60 61 - lastTimeUs := eventTime + 1 62 - fmt.Println("lastTimeUs", lastTimeUs) 63 - if err := h.db.UpdateLastTimeUs(lastTimeUs); err != nil { 64 - return fmt.Errorf("failed to save last time us: %w", err) 65 - } 66 - h.jc.UpdateDids([]string{did}) 67 return nil 68 } 69 ··· 116 return nil 117 } 118 119 raw := json.RawMessage(event.Commit.Record) 120 121 switch event.Commit.Collection { ··· 133 if err := json.Unmarshal(raw, &record); err != nil { 134 return fmt.Errorf("failed to unmarshal record: %w", err) 135 } 136 - if err := h.processKnotMember(ctx, did, record, event.TimeUS); err != nil { 137 return fmt.Errorf("failed to process knot member: %w", err) 138 } 139 } 140 141 - return nil 142 }
··· 29 return nil 30 } 31 32 + func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error { 33 l := log.FromContext(ctx) 34 35 if record.Domain != h.c.Server.Hostname { ··· 58 return fmt.Errorf("failed to fetch and add keys: %w", err) 59 } 60 61 return nil 62 } 63 ··· 110 return nil 111 } 112 113 + var err error 114 + defer func() { 115 + eventTime := event.TimeUS 116 + lastTimeUs := eventTime + 1 117 + fmt.Println("lastTimeUs", lastTimeUs) 118 + if err := h.db.UpdateLastTimeUs(lastTimeUs); err != nil { 119 + err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 120 + } 121 + h.jc.UpdateDids([]string{did}) 122 + }() 123 + 124 raw := json.RawMessage(event.Commit.Record) 125 126 switch event.Commit.Collection { ··· 138 if err := json.Unmarshal(raw, &record); err != nil { 139 return fmt.Errorf("failed to unmarshal record: %w", err) 140 } 141 + if err := h.processKnotMember(ctx, did, record); err != nil { 142 return fmt.Errorf("failed to process knot member: %w", err) 143 } 144 } 145 146 + return err 147 }