Live video on the AT Protocol
at natb/vodius-codex 281 lines 9.0 kB view raw
1package statedb 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "time" 9 10 "github.com/bluesky-social/indigo/api/bsky" 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 lexutil "github.com/bluesky-social/indigo/lex/util" 13 "github.com/bluesky-social/indigo/xrpc" 14 "gorm.io/gorm" 15 "stream.place/streamplace/pkg/integrations/webhook" 16 "stream.place/streamplace/pkg/log" 17 notificationpkg "stream.place/streamplace/pkg/notifications" 18 "stream.place/streamplace/pkg/streamplace" 19 20 comatproto "github.com/bluesky-social/indigo/api/atproto" 21) 22 23var TaskNotification = "notification" 24var TaskChat = "chat" 25var TaskFinalizeLivestream = "finalize_livestream" 26 27type NotificationTask struct { 28 Livestream *streamplace.Livestream_LivestreamView 29 FeedPost *bsky.FeedDefs_PostView 30 ChatProfile *streamplace.ChatProfile 31 PDSURL string 32} 33 34type ChatTask struct { 35 MessageView *streamplace.ChatDefs_MessageView 36} 37 38type FinalizeLivestreamTask struct { 39 LivestreamURI string `json:"livestreamURI"` 40} 41 42func (state *StatefulDB) ProcessQueue(ctx context.Context) error { 43 for { 44 task, err := state.DequeueTask(ctx, "queue_processor") 45 if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { 46 return err 47 } 48 if task != nil { 49 err := state.processTask(ctx, task) 50 if err != nil { 51 log.Error(ctx, "failed to process task", "err", err) 52 } 53 } else { 54 select { 55 case <-ctx.Done(): 56 return ctx.Err() 57 case <-time.After(1 * time.Second): 58 continue 59 case <-state.pokeQueue: 60 continue 61 } 62 } 63 64 } 65} 66 67func (state *StatefulDB) processTask(ctx context.Context, task *AppTask) error { 68 switch task.Type { 69 case TaskNotification: 70 return state.processNotificationTask(ctx, task) 71 case TaskChat: 72 return state.processChatMessageTask(ctx, task) 73 case TaskFinalizeLivestream: 74 return state.processFinalizeLivestreamTask(ctx, task) 75 default: 76 return fmt.Errorf("unknown task type: %s", task.Type) 77 } 78} 79 80func (state *StatefulDB) processFinalizeLivestreamTask(ctx context.Context, task *AppTask) error { 81 ctx = log.WithLogValues(ctx, "func", "processFinalizeLivestreamTask") 82 log.Debug(ctx, "processing finalize livestream task") 83 var finalizeLivestreamTask FinalizeLivestreamTask 84 if err := json.Unmarshal(task.Payload, &finalizeLivestreamTask); err != nil { 85 return err 86 } 87 livestream, err := state.model.GetLivestream(finalizeLivestreamTask.LivestreamURI) 88 if err != nil { 89 return fmt.Errorf("failed to get latest livestream for userDID: %w", err) 90 } 91 if livestream == nil { 92 return fmt.Errorf("no livestream found for URI: %s", finalizeLivestreamTask.LivestreamURI) 93 } 94 lastLivestreamView, err := livestream.ToLivestreamView() 95 if err != nil { 96 return fmt.Errorf("failed to convert livestream to streamplace livestream: %w", err) 97 } 98 rec, ok := lastLivestreamView.Record.Val.(*streamplace.Livestream) 99 if !ok { 100 return fmt.Errorf("livestream is not a streamplace livestream") 101 } 102 if rec.LastSeenAt == nil { 103 return fmt.Errorf("livestream has no last seen at") 104 } 105 lastSeenTime, err := time.Parse(time.RFC3339, *rec.LastSeenAt) 106 if err != nil { 107 return fmt.Errorf("could not parse last seen at: %w", err) 108 } 109 if rec.IdleTimeoutSeconds == nil || *rec.IdleTimeoutSeconds == 0 { 110 log.Debug(ctx, "livestream has no idle timeout, skipping finalization", "uri", livestream.URI) 111 return nil 112 } 113 if time.Since(lastSeenTime) < (time.Duration(*rec.IdleTimeoutSeconds) * time.Second) { 114 log.Debug(ctx, "livestream is active, skipping finalization", "lastSeenAt", lastSeenTime) 115 return nil 116 } 117 session, err := state.GetSessionByDID(livestream.RepoDID) 118 if err != nil { 119 return fmt.Errorf("failed to get session: %w", err) 120 } 121 session, err = state.OATProxy.RefreshIfNeeded(session) 122 if err != nil { 123 return fmt.Errorf("failed to refresh session: %w", err) 124 } 125 client, err := state.OATProxy.GetXrpcClient(session) 126 if err != nil { 127 return fmt.Errorf("failed to get xrpc client: %w", err) 128 } 129 if rec.EndedAt != nil { 130 log.Debug(ctx, "livestream has already ended, skipping", "uri", livestream.URI, "endedAt", *rec.EndedAt) 131 return nil 132 } 133 134 uri, err := syntax.ParseATURI(livestream.URI) 135 if err != nil { 136 return fmt.Errorf("failed to parse ATURI: %w", err) 137 } 138 139 rec.EndedAt = rec.LastSeenAt 140 141 inp := comatproto.RepoPutRecord_Input{ 142 Collection: "place.stream.livestream", 143 Record: &lexutil.LexiconTypeDecoder{Val: rec}, 144 Rkey: uri.RecordKey().String(), 145 Repo: livestream.RepoDID, 146 SwapRecord: &livestream.CID, 147 } 148 out := comatproto.RepoPutRecord_Output{} 149 150 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 151 if err != nil { 152 return fmt.Errorf("failed to update livestream record: %w", err) 153 } 154 155 log.Log(ctx, "livestream finalized", "uri", livestream.URI, "endedAt", *rec.EndedAt) 156 157 return nil 158} 159 160func (state *StatefulDB) processNotificationTask(ctx context.Context, task *AppTask) error { 161 var notificationTask NotificationTask 162 if err := json.Unmarshal(task.Payload, &notificationTask); err != nil { 163 return err 164 } 165 lsv := notificationTask.Livestream 166 rec, ok := lsv.Record.Val.(*streamplace.Livestream) 167 if !ok { 168 return fmt.Errorf("invalid livestream record") 169 } 170 userDID := lsv.Author.Did 171 172 log.Warn(ctx, "Livestream detected! Blasting followers!", "title", rec.Title, "url", rec.Url, "createdAt", rec.CreatedAt, "repo", userDID) 173 followers, err := state.model.GetUserFollowers(ctx, userDID) 174 if err != nil { 175 return err 176 } 177 178 followersDIDs := make([]string, 0, len(followers)) 179 for _, follower := range followers { 180 followersDIDs = append(followersDIDs, follower.UserDID) 181 } 182 183 log.Log(ctx, "found followers", "count", len(followersDIDs)) 184 185 notifications, err := state.GetManyNotificationTokens(followersDIDs) 186 if err != nil { 187 return err 188 } 189 190 if state.noter != nil { 191 nb := &notificationpkg.NotificationBlast{ 192 Title: fmt.Sprintf("🔴 @%s is LIVE!", lsv.Author.Handle), 193 Body: rec.Title, 194 Data: map[string]string{ 195 "path": fmt.Sprintf("/%s", lsv.Author.Handle), 196 }, 197 } 198 err = state.noter.Blast(ctx, notifications, nb) 199 if err != nil { 200 log.Error(ctx, "failed to blast notifications", "err", err) 201 } else { 202 log.Log(ctx, "sent notifications", "user", userDID, "count", len(notifications), "content", nb) 203 } 204 } else { 205 log.Log(ctx, "no notifier configured, skipping notifications", "user", userDID, "count", len(notifications)) 206 } 207 208 // Send to webhooks using webhook manager 209 webhooks, err := state.GetActiveWebhooksForUser(userDID, "livestream") 210 if err != nil { 211 log.Error(ctx, "failed to get livestream webhooks", "err", err) 212 } else { 213 for _, w := range webhooks { 214 lexiconWebhook, err := w.ToLexicon() 215 if err != nil { 216 log.Error(ctx, "failed to convert webhook to lexicon", "err", err, "webhook_id", w.ID) 217 continue 218 } 219 go func(lexiconWebhook *streamplace.ServerDefs_Webhook, wid string) { 220 err := webhook.SendLivestreamWebhook(ctx, lexiconWebhook, notificationTask.PDSURL, lsv, notificationTask.FeedPost, notificationTask.ChatProfile) 221 if err != nil { 222 log.Error(ctx, "failed to send livestream to webhook", "err", err, "webhook_id", wid) 223 err := state.IncrementWebhookError(wid) 224 if err != nil { 225 log.Error(ctx, "failed to increment webhook error count", "err", err, "webhook_id", wid) 226 } 227 } else { 228 log.Log(ctx, "sent livestream to webhook", "webhook_id", wid) 229 err := state.ResetWebhookError(wid) 230 if err != nil { 231 log.Error(ctx, "failed to reset webhook error count", "err", err, "webhook_id", wid) 232 } 233 } 234 }(lexiconWebhook, w.ID) 235 } 236 } 237 return nil 238} 239 240func (state *StatefulDB) processChatMessageTask(ctx context.Context, task *AppTask) error { 241 var chatTask ChatTask 242 if err := json.Unmarshal(task.Payload, &chatTask); err != nil { 243 return err 244 } 245 scm := chatTask.MessageView 246 rec, ok := scm.Record.Val.(*streamplace.ChatMessage) 247 if !ok { 248 return fmt.Errorf("invalid chat message record") 249 } 250 251 // Send to webhooks using webhook manager 252 webhooks, err := state.GetActiveWebhooksForUser(rec.Streamer, "chat") 253 if err != nil { 254 log.Error(ctx, "failed to get chat webhooks", "err", err) 255 } else { 256 for _, w := range webhooks { 257 lexiconWebhook, err := w.ToLexicon() 258 if err != nil { 259 log.Error(ctx, "failed to convert webhook to lexicon", "err", err, "webhook_id", w.ID) 260 continue 261 } 262 go func(lexiconWebhook *streamplace.ServerDefs_Webhook, wid string) { 263 err := webhook.SendChatWebhook(ctx, lexiconWebhook, scm.Author.Did, scm) 264 if err != nil { 265 log.Error(ctx, "failed to send chat to webhook", "err", err, "webhook_id", wid) 266 err = state.IncrementWebhookError(wid) 267 if err != nil { 268 log.Error(ctx, "failed to increment webhook error count", "err", err, "webhook_id", wid) 269 } 270 } else { 271 log.Log(ctx, "sent chat to webhook", "webhook_id", wid) 272 err = state.ResetWebhookError(wid) 273 if err != nil { 274 log.Error(ctx, "failed to reset webhook error count", "err", err, "webhook_id", wid) 275 } 276 } 277 }(lexiconWebhook, w.ID) 278 } 279 } 280 return nil 281}