Live video on the AT Protocol
1package statedb
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "time"
9
10 "github.com/bluesky-social/indigo/api/bsky"
11 "gorm.io/gorm"
12 "stream.place/streamplace/pkg/integrations/webhook"
13 "stream.place/streamplace/pkg/log"
14 notificationpkg "stream.place/streamplace/pkg/notifications"
15 "stream.place/streamplace/pkg/streamplace"
16)
17
// Task type identifiers matched against AppTask.Type when a dequeued task is
// dispatched by processTask.
var (
	// TaskNotification marks a task whose payload is a NotificationTask.
	TaskNotification = "notification"
	// TaskChat marks a task whose payload is a ChatTask.
	TaskChat = "chat"
)
20
21type NotificationTask struct {
22 Livestream *streamplace.Livestream_LivestreamView
23 FeedPost *bsky.FeedDefs_PostView
24 ChatProfile *streamplace.ChatProfile
25 PDSURL string
26}
27
28type ChatTask struct {
29 MessageView *streamplace.ChatDefs_MessageView
30}
31
32func (state *StatefulDB) ProcessQueue(ctx context.Context) error {
33 for {
34 task, err := state.DequeueTask(ctx, "queue_processor")
35 if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
36 return err
37 }
38 if task != nil {
39 err := state.processTask(ctx, task)
40 if err != nil {
41 log.Error(ctx, "failed to process task", "err", err)
42 }
43 } else {
44 select {
45 case <-ctx.Done():
46 return ctx.Err()
47 case <-time.After(1 * time.Second):
48 continue
49 case <-state.pokeQueue:
50 continue
51 }
52 }
53
54 }
55}
56
57func (state *StatefulDB) processTask(ctx context.Context, task *AppTask) error {
58 switch task.Type {
59 case TaskNotification:
60 return state.processNotificationTask(ctx, task)
61 case TaskChat:
62 return state.processChatMessageTask(ctx, task)
63 default:
64 return fmt.Errorf("unknown task type: %s", task.Type)
65 }
66}
67
68func (state *StatefulDB) processNotificationTask(ctx context.Context, task *AppTask) error {
69 var notificationTask NotificationTask
70 if err := json.Unmarshal(task.Payload, ¬ificationTask); err != nil {
71 return err
72 }
73 lsv := notificationTask.Livestream
74 rec, ok := lsv.Record.Val.(*streamplace.Livestream)
75 if !ok {
76 return fmt.Errorf("invalid livestream record")
77 }
78 userDID := lsv.Author.Did
79
80 log.Warn(ctx, "Livestream detected! Blasting followers!", "title", rec.Title, "url", rec.Url, "createdAt", rec.CreatedAt, "repo", userDID)
81 followers, err := state.model.GetUserFollowers(ctx, userDID)
82 if err != nil {
83 return err
84 }
85
86 followersDIDs := make([]string, 0, len(followers))
87 for _, follower := range followers {
88 followersDIDs = append(followersDIDs, follower.UserDID)
89 }
90
91 log.Log(ctx, "found followers", "count", len(followersDIDs))
92
93 notifications, err := state.GetManyNotificationTokens(followersDIDs)
94 if err != nil {
95 return err
96 }
97
98 if state.noter != nil {
99 nb := ¬ificationpkg.NotificationBlast{
100 Title: fmt.Sprintf("🔴 @%s is LIVE!", lsv.Author.Handle),
101 Body: rec.Title,
102 Data: map[string]string{
103 "path": fmt.Sprintf("/%s", lsv.Author.Handle),
104 },
105 }
106 err = state.noter.Blast(ctx, notifications, nb)
107 if err != nil {
108 log.Error(ctx, "failed to blast notifications", "err", err)
109 } else {
110 log.Log(ctx, "sent notifications", "user", userDID, "count", len(notifications), "content", nb)
111 }
112 } else {
113 log.Log(ctx, "no notifier configured, skipping notifications", "user", userDID, "count", len(notifications))
114 }
115
116 // Send to webhooks using webhook manager
117 webhooks, err := state.GetActiveWebhooksForUser(userDID, "livestream")
118 if err != nil {
119 log.Error(ctx, "failed to get livestream webhooks", "err", err)
120 } else {
121 for _, w := range webhooks {
122 lexiconWebhook, err := w.ToLexicon()
123 if err != nil {
124 log.Error(ctx, "failed to convert webhook to lexicon", "err", err, "webhook_id", w.ID)
125 continue
126 }
127 go func(lexiconWebhook *streamplace.ServerDefs_Webhook, wid string) {
128 err := webhook.SendLivestreamWebhook(ctx, lexiconWebhook, notificationTask.PDSURL, lsv, notificationTask.FeedPost, notificationTask.ChatProfile)
129 if err != nil {
130 log.Error(ctx, "failed to send livestream to webhook", "err", err, "webhook_id", wid)
131 err := state.IncrementWebhookError(wid)
132 if err != nil {
133 log.Error(ctx, "failed to increment webhook error count", "err", err, "webhook_id", wid)
134 }
135 } else {
136 log.Log(ctx, "sent livestream to webhook", "webhook_id", wid)
137 err := state.ResetWebhookError(wid)
138 if err != nil {
139 log.Error(ctx, "failed to reset webhook error count", "err", err, "webhook_id", wid)
140 }
141 }
142 }(lexiconWebhook, w.ID)
143 }
144 }
145 return nil
146}
147
148func (state *StatefulDB) processChatMessageTask(ctx context.Context, task *AppTask) error {
149 var chatTask ChatTask
150 if err := json.Unmarshal(task.Payload, &chatTask); err != nil {
151 return err
152 }
153 scm := chatTask.MessageView
154 rec, ok := scm.Record.Val.(*streamplace.ChatMessage)
155 if !ok {
156 return fmt.Errorf("invalid chat message record")
157 }
158
159 // Send to webhooks using webhook manager
160 webhooks, err := state.GetActiveWebhooksForUser(rec.Streamer, "chat")
161 if err != nil {
162 log.Error(ctx, "failed to get chat webhooks", "err", err)
163 } else {
164 for _, w := range webhooks {
165 lexiconWebhook, err := w.ToLexicon()
166 if err != nil {
167 log.Error(ctx, "failed to convert webhook to lexicon", "err", err, "webhook_id", w.ID)
168 continue
169 }
170 go func(lexiconWebhook *streamplace.ServerDefs_Webhook, wid string) {
171 err := webhook.SendChatWebhook(ctx, lexiconWebhook, scm.Author.Did, scm)
172 if err != nil {
173 log.Error(ctx, "failed to send chat to webhook", "err", err, "webhook_id", wid)
174 err = state.IncrementWebhookError(wid)
175 if err != nil {
176 log.Error(ctx, "failed to increment webhook error count", "err", err, "webhook_id", wid)
177 }
178 } else {
179 log.Log(ctx, "sent chat to webhook", "webhook_id", wid)
180 err = state.ResetWebhookError(wid)
181 if err != nil {
182 log.Error(ctx, "failed to reset webhook error count", "err", err, "webhook_id", wid)
183 }
184 }
185 }(lexiconWebhook, w.ID)
186 }
187 }
188 return nil
189}