Live video on the AT Protocol
1package statedb
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "time"
9
10 "github.com/bluesky-social/indigo/api/bsky"
11 "github.com/bluesky-social/indigo/atproto/syntax"
12 lexutil "github.com/bluesky-social/indigo/lex/util"
13 "github.com/bluesky-social/indigo/xrpc"
14 "gorm.io/gorm"
15 "stream.place/streamplace/pkg/integrations/webhook"
16 "stream.place/streamplace/pkg/log"
17 notificationpkg "stream.place/streamplace/pkg/notifications"
18 "stream.place/streamplace/pkg/streamplace"
19
20 comatproto "github.com/bluesky-social/indigo/api/atproto"
21)
22
// Task type identifiers. processTask compares AppTask.Type against these
// values to pick the handler for a dequeued task.
var (
	// TaskNotification selects processNotificationTask.
	TaskNotification = "notification"
	// TaskChat selects processChatMessageTask.
	TaskChat = "chat"
	// TaskFinalizeLivestream selects processFinalizeLivestreamTask.
	TaskFinalizeLivestream = "finalize_livestream"
)
26
27type NotificationTask struct {
28 Livestream *streamplace.Livestream_LivestreamView
29 FeedPost *bsky.FeedDefs_PostView
30 ChatProfile *streamplace.ChatProfile
31 PDSURL string
32}
33
34type ChatTask struct {
35 MessageView *streamplace.ChatDefs_MessageView
36}
37
// FinalizeLivestreamTask is the JSON payload of a TaskFinalizeLivestream
// task. It is decoded from AppTask.Payload by processFinalizeLivestreamTask.
type FinalizeLivestreamTask struct {
	// LivestreamURI identifies the livestream to finalize; it is used to
	// look the livestream up and is later parsed as an AT URI.
	LivestreamURI string `json:"livestreamURI"`
}
41
42func (state *StatefulDB) ProcessQueue(ctx context.Context) error {
43 for {
44 task, err := state.DequeueTask(ctx, "queue_processor")
45 if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
46 return err
47 }
48 if task != nil {
49 err := state.processTask(ctx, task)
50 if err != nil {
51 log.Error(ctx, "failed to process task", "err", err)
52 }
53 } else {
54 select {
55 case <-ctx.Done():
56 return ctx.Err()
57 case <-time.After(1 * time.Second):
58 continue
59 case <-state.pokeQueue:
60 continue
61 }
62 }
63
64 }
65}
66
67func (state *StatefulDB) processTask(ctx context.Context, task *AppTask) error {
68 switch task.Type {
69 case TaskNotification:
70 return state.processNotificationTask(ctx, task)
71 case TaskChat:
72 return state.processChatMessageTask(ctx, task)
73 case TaskFinalizeLivestream:
74 return state.processFinalizeLivestreamTask(ctx, task)
75 default:
76 return fmt.Errorf("unknown task type: %s", task.Type)
77 }
78}
79
80func (state *StatefulDB) processFinalizeLivestreamTask(ctx context.Context, task *AppTask) error {
81 ctx = log.WithLogValues(ctx, "func", "processFinalizeLivestreamTask")
82 log.Debug(ctx, "processing finalize livestream task")
83 var finalizeLivestreamTask FinalizeLivestreamTask
84 if err := json.Unmarshal(task.Payload, &finalizeLivestreamTask); err != nil {
85 return err
86 }
87 livestream, err := state.model.GetLivestream(finalizeLivestreamTask.LivestreamURI)
88 if err != nil {
89 return fmt.Errorf("failed to get latest livestream for userDID: %w", err)
90 }
91 if livestream == nil {
92 return fmt.Errorf("no livestream found for URI: %s", finalizeLivestreamTask.LivestreamURI)
93 }
94 lastLivestreamView, err := livestream.ToLivestreamView()
95 if err != nil {
96 return fmt.Errorf("failed to convert livestream to streamplace livestream: %w", err)
97 }
98 rec, ok := lastLivestreamView.Record.Val.(*streamplace.Livestream)
99 if !ok {
100 return fmt.Errorf("livestream is not a streamplace livestream")
101 }
102 if rec.LastSeenAt == nil {
103 return fmt.Errorf("livestream has no last seen at")
104 }
105 lastSeenTime, err := time.Parse(time.RFC3339, *rec.LastSeenAt)
106 if err != nil {
107 return fmt.Errorf("could not parse last seen at: %w", err)
108 }
109 if rec.IdleTimeoutSeconds == nil || *rec.IdleTimeoutSeconds == 0 {
110 log.Debug(ctx, "livestream has no idle timeout, skipping finalization", "uri", livestream.URI)
111 return nil
112 }
113 if time.Since(lastSeenTime) < (time.Duration(*rec.IdleTimeoutSeconds) * time.Second) {
114 log.Debug(ctx, "livestream is active, skipping finalization", "lastSeenAt", lastSeenTime)
115 return nil
116 }
117 session, err := state.GetSessionByDID(livestream.RepoDID)
118 if err != nil {
119 return fmt.Errorf("failed to get session: %w", err)
120 }
121 session, err = state.OATProxy.RefreshIfNeeded(session)
122 if err != nil {
123 return fmt.Errorf("failed to refresh session: %w", err)
124 }
125 client, err := state.OATProxy.GetXrpcClient(session)
126 if err != nil {
127 return fmt.Errorf("failed to get xrpc client: %w", err)
128 }
129 if rec.EndedAt != nil {
130 log.Debug(ctx, "livestream has already ended, skipping", "uri", livestream.URI, "endedAt", *rec.EndedAt)
131 return nil
132 }
133
134 uri, err := syntax.ParseATURI(livestream.URI)
135 if err != nil {
136 return fmt.Errorf("failed to parse ATURI: %w", err)
137 }
138
139 rec.EndedAt = rec.LastSeenAt
140
141 inp := comatproto.RepoPutRecord_Input{
142 Collection: "place.stream.livestream",
143 Record: &lexutil.LexiconTypeDecoder{Val: rec},
144 Rkey: uri.RecordKey().String(),
145 Repo: livestream.RepoDID,
146 SwapRecord: &livestream.CID,
147 }
148 out := comatproto.RepoPutRecord_Output{}
149
150 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
151 if err != nil {
152 return fmt.Errorf("failed to update livestream record: %w", err)
153 }
154
155 log.Log(ctx, "livestream finalized", "uri", livestream.URI, "endedAt", *rec.EndedAt)
156
157 return nil
158}
159
160func (state *StatefulDB) processNotificationTask(ctx context.Context, task *AppTask) error {
161 var notificationTask NotificationTask
162 if err := json.Unmarshal(task.Payload, ¬ificationTask); err != nil {
163 return err
164 }
165 lsv := notificationTask.Livestream
166 rec, ok := lsv.Record.Val.(*streamplace.Livestream)
167 if !ok {
168 return fmt.Errorf("invalid livestream record")
169 }
170 userDID := lsv.Author.Did
171
172 log.Warn(ctx, "Livestream detected! Blasting followers!", "title", rec.Title, "url", rec.Url, "createdAt", rec.CreatedAt, "repo", userDID)
173 followers, err := state.model.GetUserFollowers(ctx, userDID)
174 if err != nil {
175 return err
176 }
177
178 followersDIDs := make([]string, 0, len(followers))
179 for _, follower := range followers {
180 followersDIDs = append(followersDIDs, follower.UserDID)
181 }
182
183 log.Log(ctx, "found followers", "count", len(followersDIDs))
184
185 notifications, err := state.GetManyNotificationTokens(followersDIDs)
186 if err != nil {
187 return err
188 }
189
190 if state.noter != nil {
191 nb := ¬ificationpkg.NotificationBlast{
192 Title: fmt.Sprintf("🔴 @%s is LIVE!", lsv.Author.Handle),
193 Body: rec.Title,
194 Data: map[string]string{
195 "path": fmt.Sprintf("/%s", lsv.Author.Handle),
196 },
197 }
198 err = state.noter.Blast(ctx, notifications, nb)
199 if err != nil {
200 log.Error(ctx, "failed to blast notifications", "err", err)
201 } else {
202 log.Log(ctx, "sent notifications", "user", userDID, "count", len(notifications), "content", nb)
203 }
204 } else {
205 log.Log(ctx, "no notifier configured, skipping notifications", "user", userDID, "count", len(notifications))
206 }
207
208 // Send to webhooks using webhook manager
209 webhooks, err := state.GetActiveWebhooksForUser(userDID, "livestream")
210 if err != nil {
211 log.Error(ctx, "failed to get livestream webhooks", "err", err)
212 } else {
213 for _, w := range webhooks {
214 lexiconWebhook, err := w.ToLexicon()
215 if err != nil {
216 log.Error(ctx, "failed to convert webhook to lexicon", "err", err, "webhook_id", w.ID)
217 continue
218 }
219 go func(lexiconWebhook *streamplace.ServerDefs_Webhook, wid string) {
220 err := webhook.SendLivestreamWebhook(ctx, lexiconWebhook, notificationTask.PDSURL, lsv, notificationTask.FeedPost, notificationTask.ChatProfile)
221 if err != nil {
222 log.Error(ctx, "failed to send livestream to webhook", "err", err, "webhook_id", wid)
223 err := state.IncrementWebhookError(wid)
224 if err != nil {
225 log.Error(ctx, "failed to increment webhook error count", "err", err, "webhook_id", wid)
226 }
227 } else {
228 log.Log(ctx, "sent livestream to webhook", "webhook_id", wid)
229 err := state.ResetWebhookError(wid)
230 if err != nil {
231 log.Error(ctx, "failed to reset webhook error count", "err", err, "webhook_id", wid)
232 }
233 }
234 }(lexiconWebhook, w.ID)
235 }
236 }
237 return nil
238}
239
240func (state *StatefulDB) processChatMessageTask(ctx context.Context, task *AppTask) error {
241 var chatTask ChatTask
242 if err := json.Unmarshal(task.Payload, &chatTask); err != nil {
243 return err
244 }
245 scm := chatTask.MessageView
246 rec, ok := scm.Record.Val.(*streamplace.ChatMessage)
247 if !ok {
248 return fmt.Errorf("invalid chat message record")
249 }
250
251 // Send to webhooks using webhook manager
252 webhooks, err := state.GetActiveWebhooksForUser(rec.Streamer, "chat")
253 if err != nil {
254 log.Error(ctx, "failed to get chat webhooks", "err", err)
255 } else {
256 for _, w := range webhooks {
257 lexiconWebhook, err := w.ToLexicon()
258 if err != nil {
259 log.Error(ctx, "failed to convert webhook to lexicon", "err", err, "webhook_id", w.ID)
260 continue
261 }
262 go func(lexiconWebhook *streamplace.ServerDefs_Webhook, wid string) {
263 err := webhook.SendChatWebhook(ctx, lexiconWebhook, scm.Author.Did, scm)
264 if err != nil {
265 log.Error(ctx, "failed to send chat to webhook", "err", err, "webhook_id", wid)
266 err = state.IncrementWebhookError(wid)
267 if err != nil {
268 log.Error(ctx, "failed to increment webhook error count", "err", err, "webhook_id", wid)
269 }
270 } else {
271 log.Log(ctx, "sent chat to webhook", "webhook_id", wid)
272 err = state.ResetWebhookError(wid)
273 if err != nil {
274 log.Error(ctx, "failed to reset webhook error count", "err", err, "webhook_id", wid)
275 }
276 }
277 }(lexiconWebhook, w.ID)
278 }
279 }
280 return nil
281}