Live video on the AT Protocol
at next 189 lines 5.7 kB view raw
1package statedb 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "time" 9 10 "github.com/bluesky-social/indigo/api/bsky" 11 "gorm.io/gorm" 12 "stream.place/streamplace/pkg/integrations/webhook" 13 "stream.place/streamplace/pkg/log" 14 notificationpkg "stream.place/streamplace/pkg/notifications" 15 "stream.place/streamplace/pkg/streamplace" 16) 17 18var TaskNotification = "notification" 19var TaskChat = "chat" 20 21type NotificationTask struct { 22 Livestream *streamplace.Livestream_LivestreamView 23 FeedPost *bsky.FeedDefs_PostView 24 ChatProfile *streamplace.ChatProfile 25 PDSURL string 26} 27 28type ChatTask struct { 29 MessageView *streamplace.ChatDefs_MessageView 30} 31 32func (state *StatefulDB) ProcessQueue(ctx context.Context) error { 33 for { 34 task, err := state.DequeueTask(ctx, "queue_processor") 35 if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { 36 return err 37 } 38 if task != nil { 39 err := state.processTask(ctx, task) 40 if err != nil { 41 log.Error(ctx, "failed to process task", "err", err) 42 } 43 } else { 44 select { 45 case <-ctx.Done(): 46 return ctx.Err() 47 case <-time.After(1 * time.Second): 48 continue 49 case <-state.pokeQueue: 50 continue 51 } 52 } 53 54 } 55} 56 57func (state *StatefulDB) processTask(ctx context.Context, task *AppTask) error { 58 switch task.Type { 59 case TaskNotification: 60 return state.processNotificationTask(ctx, task) 61 case TaskChat: 62 return state.processChatMessageTask(ctx, task) 63 default: 64 return fmt.Errorf("unknown task type: %s", task.Type) 65 } 66} 67 68func (state *StatefulDB) processNotificationTask(ctx context.Context, task *AppTask) error { 69 var notificationTask NotificationTask 70 if err := json.Unmarshal(task.Payload, &notificationTask); err != nil { 71 return err 72 } 73 lsv := notificationTask.Livestream 74 rec, ok := lsv.Record.Val.(*streamplace.Livestream) 75 if !ok { 76 return fmt.Errorf("invalid livestream record") 77 } 78 userDID := lsv.Author.Did 79 80 log.Warn(ctx, "Livestream detected! Blasting followers!", "title", rec.Title, "url", rec.Url, "createdAt", rec.CreatedAt, "repo", userDID) 81 followers, err := state.model.GetUserFollowers(ctx, userDID) 82 if err != nil { 83 return err 84 } 85 86 followersDIDs := make([]string, 0, len(followers)) 87 for _, follower := range followers { 88 followersDIDs = append(followersDIDs, follower.UserDID) 89 } 90 91 log.Log(ctx, "found followers", "count", len(followersDIDs)) 92 93 notifications, err := state.GetManyNotificationTokens(followersDIDs) 94 if err != nil { 95 return err 96 } 97 98 if state.noter != nil { 99 nb := &notificationpkg.NotificationBlast{ 100 Title: fmt.Sprintf("🔴 @%s is LIVE!", lsv.Author.Handle), 101 Body: rec.Title, 102 Data: map[string]string{ 103 "path": fmt.Sprintf("/%s", lsv.Author.Handle), 104 }, 105 } 106 err = state.noter.Blast(ctx, notifications, nb) 107 if err != nil { 108 log.Error(ctx, "failed to blast notifications", "err", err) 109 } else { 110 log.Log(ctx, "sent notifications", "user", userDID, "count", len(notifications), "content", nb) 111 } 112 } else { 113 log.Log(ctx, "no notifier configured, skipping notifications", "user", userDID, "count", len(notifications)) 114 } 115 116 // Send to webhooks using webhook manager 117 webhooks, err := state.GetActiveWebhooksForUser(userDID, "livestream") 118 if err != nil { 119 log.Error(ctx, "failed to get livestream webhooks", "err", err) 120 } else { 121 for _, w := range webhooks { 122 lexiconWebhook, err := w.ToLexicon() 123 if err != nil { 124 log.Error(ctx, "failed to convert webhook to lexicon", "err", err, "webhook_id", w.ID) 125 continue 126 } 127 go func(lexiconWebhook *streamplace.ServerDefs_Webhook, wid string) { 128 err := webhook.SendLivestreamWebhook(ctx, lexiconWebhook, notificationTask.PDSURL, lsv, notificationTask.FeedPost, notificationTask.ChatProfile) 129 if err != nil { 130 log.Error(ctx, "failed to send livestream to webhook", "err", err, "webhook_id", wid) 131 err := state.IncrementWebhookError(wid) 132 if err != nil { 133 log.Error(ctx, "failed to increment webhook error count", "err", err, "webhook_id", wid) 134 } 135 } else { 136 log.Log(ctx, "sent livestream to webhook", "webhook_id", wid) 137 err := state.ResetWebhookError(wid) 138 if err != nil { 139 log.Error(ctx, "failed to reset webhook error count", "err", err, "webhook_id", wid) 140 } 141 } 142 }(lexiconWebhook, w.ID) 143 } 144 } 145 return nil 146} 147 148func (state *StatefulDB) processChatMessageTask(ctx context.Context, task *AppTask) error { 149 var chatTask ChatTask 150 if err := json.Unmarshal(task.Payload, &chatTask); err != nil { 151 return err 152 } 153 scm := chatTask.MessageView 154 rec, ok := scm.Record.Val.(*streamplace.ChatMessage) 155 if !ok { 156 return fmt.Errorf("invalid chat message record") 157 } 158 159 // Send to webhooks using webhook manager 160 webhooks, err := state.GetActiveWebhooksForUser(rec.Streamer, "chat") 161 if err != nil { 162 log.Error(ctx, "failed to get chat webhooks", "err", err) 163 } else { 164 for _, w := range webhooks { 165 lexiconWebhook, err := w.ToLexicon() 166 if err != nil { 167 log.Error(ctx, "failed to convert webhook to lexicon", "err", err, "webhook_id", w.ID) 168 continue 169 } 170 go func(lexiconWebhook *streamplace.ServerDefs_Webhook, wid string) { 171 err := webhook.SendChatWebhook(ctx, lexiconWebhook, scm.Author.Did, scm) 172 if err != nil { 173 log.Error(ctx, "failed to send chat to webhook", "err", err, "webhook_id", wid) 174 err = state.IncrementWebhookError(wid) 175 if err != nil { 176 log.Error(ctx, "failed to increment webhook error count", "err", err, "webhook_id", wid) 177 } 178 } else { 179 log.Log(ctx, "sent chat to webhook", "webhook_id", wid) 180 err = state.ResetWebhookError(wid) 181 if err != nil { 182 log.Error(ctx, "failed to reset webhook error count", "err", err, "webhook_id", wid) 183 } 184 } 185 }(lexiconWebhook, w.ID) 186 } 187 } 188 return nil 189}