bluesky appview implementation using microcosm and other services server.reddwarf.app
appview bluesky reddwarf microcosm
at main 2.7 kB view raw
1package backstream 2 3import ( 4 "crypto/rand" 5 "encoding/hex" 6 "log" 7 "sync" 8 "time" 9) 10 11type Session struct { 12 Ticket string 13 Params BackfillParams 14 LastAccessed time.Time 15 16 ListReposCursor string // Cursor for listReposByCollection if wantedDids=* 17 CompletedDIDs map[string]bool // Set of DIDs that have been fully processed. 18 listRecordsCursors map[string]string // Key: "did/collection", Value: cursor 19 20 mu sync.Mutex 21} 22 23func (s *Session) GetListRecordsCursor(did, collection string) string { 24 s.mu.Lock() 25 defer s.mu.Unlock() 26 key := did + "/" + collection 27 return s.listRecordsCursors[key] 28} 29 30func (s *Session) SetListRecordsCursor(did, collection, cursor string) { 31 s.mu.Lock() 32 defer s.mu.Unlock() 33 key := did + "/" + collection 34 s.listRecordsCursors[key] = cursor 35 s.LastAccessed = time.Now() 36} 37 38func (s *Session) MarkDIDComplete(did string) { 39 s.mu.Lock() 40 defer s.mu.Unlock() 41 s.CompletedDIDs[did] = true 42 s.LastAccessed = time.Now() 43} 44 45func (s *Session) IsDIDComplete(did string) bool { 46 s.mu.Lock() 47 defer s.mu.Unlock() 48 return s.CompletedDIDs[did] 49} 50 51type SessionManager struct { 52 sessions map[string]*Session 53 ttl time.Duration 54 mu sync.Mutex 55} 56 57func NewSessionManager(ttl time.Duration) *SessionManager { 58 sm := &SessionManager{ 59 sessions: make(map[string]*Session), 60 ttl: ttl, 61 } 62 go sm.cleanupLoop() 63 return sm 64} 65 66func (sm *SessionManager) GetOrCreate(ticket string, params BackfillParams) *Session { 67 sm.mu.Lock() 68 defer sm.mu.Unlock() 69 70 if session, exists := sm.sessions[ticket]; exists { 71 log.Printf("Resuming existing session for ticket: %s", ticket) 72 session.LastAccessed = time.Now() 73 if session.CompletedDIDs == nil { 74 session.CompletedDIDs = make(map[string]bool) 75 } 76 return session 77 } 78 79 log.Printf("Creating new session for ticket: %s", ticket) 80 newSession := &Session{ 81 Ticket: ticket, 82 Params: params, 83 LastAccessed: time.Now(), 84 listRecordsCursors: make(map[string]string), 85 CompletedDIDs: make(map[string]bool), 86 } 87 sm.sessions[ticket] = newSession 88 return newSession 89} 90 91func (sm *SessionManager) cleanupLoop() { 92 ticker := time.NewTicker(sm.ttl / 2) 93 defer ticker.Stop() 94 for range ticker.C { 95 sm.mu.Lock() 96 now := time.Now() 97 for ticket, session := range sm.sessions { 98 if now.Sub(session.LastAccessed) > sm.ttl { 99 log.Printf("Session %s expired. Cleaning up.", ticket) 100 delete(sm.sessions, ticket) 101 } 102 } 103 sm.mu.Unlock() 104 } 105} 106 107func generateTicket() string { 108 bytes := make([]byte, 16) 109 if _, err := rand.Read(bytes); err != nil { 110 return "fallback-ticket-" + time.Now().String() 111 } 112 return hex.EncodeToString(bytes) 113}