bluesky appview implementation using microcosm and other services
server.reddwarf.app
appview
bluesky
reddwarf
microcosm
1package backstream
2
3import (
4 "crypto/rand"
5 "encoding/hex"
6 "log"
7 "sync"
8 "time"
9)
10
11type Session struct {
12 Ticket string
13 Params BackfillParams
14 LastAccessed time.Time
15
16 ListReposCursor string // Cursor for listReposByCollection if wantedDids=*
17 CompletedDIDs map[string]bool // Set of DIDs that have been fully processed.
18 listRecordsCursors map[string]string // Key: "did/collection", Value: cursor
19
20 mu sync.Mutex
21}
22
23func (s *Session) GetListRecordsCursor(did, collection string) string {
24 s.mu.Lock()
25 defer s.mu.Unlock()
26 key := did + "/" + collection
27 return s.listRecordsCursors[key]
28}
29
30func (s *Session) SetListRecordsCursor(did, collection, cursor string) {
31 s.mu.Lock()
32 defer s.mu.Unlock()
33 key := did + "/" + collection
34 s.listRecordsCursors[key] = cursor
35 s.LastAccessed = time.Now()
36}
37
38func (s *Session) MarkDIDComplete(did string) {
39 s.mu.Lock()
40 defer s.mu.Unlock()
41 s.CompletedDIDs[did] = true
42 s.LastAccessed = time.Now()
43}
44
45func (s *Session) IsDIDComplete(did string) bool {
46 s.mu.Lock()
47 defer s.mu.Unlock()
48 return s.CompletedDIDs[did]
49}
50
51type SessionManager struct {
52 sessions map[string]*Session
53 ttl time.Duration
54 mu sync.Mutex
55}
56
57func NewSessionManager(ttl time.Duration) *SessionManager {
58 sm := &SessionManager{
59 sessions: make(map[string]*Session),
60 ttl: ttl,
61 }
62 go sm.cleanupLoop()
63 return sm
64}
65
66func (sm *SessionManager) GetOrCreate(ticket string, params BackfillParams) *Session {
67 sm.mu.Lock()
68 defer sm.mu.Unlock()
69
70 if session, exists := sm.sessions[ticket]; exists {
71 log.Printf("Resuming existing session for ticket: %s", ticket)
72 session.LastAccessed = time.Now()
73 if session.CompletedDIDs == nil {
74 session.CompletedDIDs = make(map[string]bool)
75 }
76 return session
77 }
78
79 log.Printf("Creating new session for ticket: %s", ticket)
80 newSession := &Session{
81 Ticket: ticket,
82 Params: params,
83 LastAccessed: time.Now(),
84 listRecordsCursors: make(map[string]string),
85 CompletedDIDs: make(map[string]bool),
86 }
87 sm.sessions[ticket] = newSession
88 return newSession
89}
90
91func (sm *SessionManager) cleanupLoop() {
92 ticker := time.NewTicker(sm.ttl / 2)
93 defer ticker.Stop()
94 for range ticker.C {
95 sm.mu.Lock()
96 now := time.Now()
97 for ticket, session := range sm.sessions {
98 if now.Sub(session.LastAccessed) > sm.ttl {
99 log.Printf("Session %s expired. Cleaning up.", ticket)
100 delete(sm.sessions, ticket)
101 }
102 }
103 sm.mu.Unlock()
104 }
105}
106
// generateTicket returns a new ticket: 16 bytes read from crypto/rand,
// rendered as a 32-character lowercase hex string. If the random source
// reports an error, it instead returns "fallback-ticket-" followed by the
// current time, which is not random.
func generateTicket() string {
	var raw [16]byte
	_, err := rand.Read(raw[:])
	if err != nil {
		return "fallback-ticket-" + time.Now().String()
	}
	return hex.EncodeToString(raw[:])
}