collection of golang services under the Red Dwarf umbrella server.reddwarf.app
bluesky reddwarf microcosm appview
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

at ccd50988a21247be441dc8d4a052510acdfcb76c 113 lines 2.7 kB view raw
1package backstream 2 3import ( 4 "crypto/rand" 5 "encoding/hex" 6 "log" 7 "sync" 8 "time" 9) 10 11type Session struct { 12 Ticket string 13 Params BackfillParams 14 LastAccessed time.Time 15 16 ListReposCursor string // Cursor for listReposByCollection if wantedDids=* 17 CompletedDIDs map[string]bool // Set of DIDs that have been fully processed. 18 listRecordsCursors map[string]string // Key: "did/collection", Value: cursor 19 20 mu sync.Mutex 21} 22 23func (s *Session) GetListRecordsCursor(did, collection string) string { 24 s.mu.Lock() 25 defer s.mu.Unlock() 26 key := did + "/" + collection 27 return s.listRecordsCursors[key] 28} 29 30func (s *Session) SetListRecordsCursor(did, collection, cursor string) { 31 s.mu.Lock() 32 defer s.mu.Unlock() 33 key := did + "/" + collection 34 s.listRecordsCursors[key] = cursor 35 s.LastAccessed = time.Now() 36} 37 38func (s *Session) MarkDIDComplete(did string) { 39 s.mu.Lock() 40 defer s.mu.Unlock() 41 s.CompletedDIDs[did] = true 42 s.LastAccessed = time.Now() 43} 44 45func (s *Session) IsDIDComplete(did string) bool { 46 s.mu.Lock() 47 defer s.mu.Unlock() 48 return s.CompletedDIDs[did] 49} 50 51type SessionManager struct { 52 sessions map[string]*Session 53 ttl time.Duration 54 mu sync.Mutex 55} 56 57func NewSessionManager(ttl time.Duration) *SessionManager { 58 sm := &SessionManager{ 59 sessions: make(map[string]*Session), 60 ttl: ttl, 61 } 62 go sm.cleanupLoop() 63 return sm 64} 65 66func (sm *SessionManager) GetOrCreate(ticket string, params BackfillParams) *Session { 67 sm.mu.Lock() 68 defer sm.mu.Unlock() 69 70 if session, exists := sm.sessions[ticket]; exists { 71 log.Printf("Resuming existing session for ticket: %s", ticket) 72 session.LastAccessed = time.Now() 73 if session.CompletedDIDs == nil { 74 session.CompletedDIDs = make(map[string]bool) 75 } 76 return session 77 } 78 79 log.Printf("Creating new session for ticket: %s", ticket) 80 newSession := &Session{ 81 Ticket: ticket, 82 Params: params, 83 LastAccessed: time.Now(), 84 listRecordsCursors: make(map[string]string), 85 CompletedDIDs: make(map[string]bool), 86 } 87 sm.sessions[ticket] = newSession 88 return newSession 89} 90 91func (sm *SessionManager) cleanupLoop() { 92 ticker := time.NewTicker(sm.ttl / 2) 93 defer ticker.Stop() 94 for range ticker.C { 95 sm.mu.Lock() 96 now := time.Now() 97 for ticket, session := range sm.sessions { 98 if now.Sub(session.LastAccessed) > sm.ttl { 99 log.Printf("Session %s expired. Cleaning up.", ticket) 100 delete(sm.sessions, ticket) 101 } 102 } 103 sm.mu.Unlock() 104 } 105} 106 107func generateTicket() string { 108 bytes := make([]byte, 16) 109 if _, err := rand.Read(bytes); err != nil { 110 return "fallback-ticket-" + time.Now().String() 111 } 112 return hex.EncodeToString(bytes) 113}