Live video on the AT Protocol
1package statedb
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "time"
9
10 "github.com/bluesky-social/indigo/api/bsky"
11 "gorm.io/gorm"
12 "stream.place/streamplace/pkg/integrations/discord"
13 "stream.place/streamplace/pkg/log"
14 notificationpkg "stream.place/streamplace/pkg/notifications"
15 "stream.place/streamplace/pkg/streamplace"
16)
17
18var TaskNotification = "notification"
19var TaskChat = "chat"
20
21type NotificationTask struct {
22 Livestream *streamplace.Livestream_LivestreamView
23 FeedPost *bsky.FeedDefs_PostView
24 ChatProfile *streamplace.ChatProfile
25 PDSURL string
26}
27
28type ChatTask struct {
29 MessageView *streamplace.ChatDefs_MessageView
30}
31
32func (state *StatefulDB) ProcessQueue(ctx context.Context) error {
33 for {
34 task, err := state.DequeueTask(ctx, "queue_processor")
35 if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
36 return err
37 }
38 if task != nil {
39 err := state.processTask(ctx, task)
40 if err != nil {
41 log.Error(ctx, "failed to process task", "err", err)
42 }
43 } else {
44 select {
45 case <-ctx.Done():
46 return ctx.Err()
47 case <-time.After(1 * time.Second):
48 continue
49 case <-state.pokeQueue:
50 continue
51 }
52 }
53
54 }
55}
56
57func (state *StatefulDB) processTask(ctx context.Context, task *AppTask) error {
58 switch task.Type {
59 case TaskNotification:
60 return state.processNotificationTask(ctx, task)
61 case TaskChat:
62 return state.processChatMessageTask(ctx, task)
63 default:
64 return fmt.Errorf("unknown task type: %s", task.Type)
65 }
66}
67
68func (state *StatefulDB) processNotificationTask(ctx context.Context, task *AppTask) error {
69 var notificationTask NotificationTask
70 if err := json.Unmarshal(task.Payload, ¬ificationTask); err != nil {
71 return err
72 }
73 lsv := notificationTask.Livestream
74 rec, ok := lsv.Record.Val.(*streamplace.Livestream)
75 if !ok {
76 return fmt.Errorf("invalid livestream record")
77 }
78 userDID := lsv.Author.Did
79
80 log.Warn(ctx, "Livestream detected! Blasting followers!", "title", rec.Title, "url", rec.Url, "createdAt", rec.CreatedAt, "repo", userDID)
81 followers, err := state.model.GetUserFollowers(ctx, userDID)
82 if err != nil {
83 return err
84 }
85
86 followersDIDs := make([]string, 0, len(followers))
87 for _, follower := range followers {
88 followersDIDs = append(followersDIDs, follower.UserDID)
89 }
90
91 log.Log(ctx, "found followers", "count", len(followersDIDs))
92
93 notifications, err := state.GetManyNotificationTokens(followersDIDs)
94 if err != nil {
95 return err
96 }
97
98 if state.noter != nil {
99 nb := ¬ificationpkg.NotificationBlast{
100 Title: fmt.Sprintf("🔴 @%s is LIVE!", lsv.Author.Handle),
101 Body: rec.Title,
102 Data: map[string]string{
103 "path": fmt.Sprintf("/%s", lsv.Author.Handle),
104 },
105 }
106 err = state.noter.Blast(ctx, notifications, nb)
107 if err != nil {
108 log.Error(ctx, "failed to blast notifications", "err", err)
109 } else {
110 log.Log(ctx, "sent notifications", "user", userDID, "count", len(notifications), "content", nb)
111 }
112 } else {
113 log.Log(ctx, "no notifier configured, skipping notifications", "user", userDID, "count", len(notifications))
114 }
115
116 for _, webhook := range state.CLI.DiscordWebhooks {
117 if webhook.DID == userDID && webhook.Type == "livestream" {
118 go func() {
119 err := discord.SendLivestream(ctx, webhook, notificationTask.PDSURL, lsv, notificationTask.FeedPost, notificationTask.ChatProfile)
120 if err != nil {
121 log.Error(ctx, "failed to send livestream to discord", "err", err)
122 } else {
123 log.Log(ctx, "sent livestream to discord", "user", userDID, "webhook", webhook.URL)
124 }
125 }()
126 }
127 }
128 return nil
129}
130
131func (state *StatefulDB) processChatMessageTask(ctx context.Context, task *AppTask) error {
132 var chatTask ChatTask
133 if err := json.Unmarshal(task.Payload, &chatTask); err != nil {
134 return err
135 }
136 scm := chatTask.MessageView
137 rec, ok := scm.Record.Val.(*streamplace.ChatMessage)
138 if !ok {
139 return fmt.Errorf("invalid chat message record")
140 }
141 userDID := scm.Author.Did
142
143 for _, webhook := range state.CLI.DiscordWebhooks {
144 if webhook.DID == rec.Streamer && webhook.Type == "chat" {
145 go func() {
146 err := discord.SendChat(ctx, webhook, scm.Author.Did, scm)
147 if err != nil {
148 log.Error(ctx, "failed to send livestream to discord", "err", err)
149 } else {
150 log.Log(ctx, "sent livestream to discord", "user", userDID, "webhook", webhook.URL)
151 }
152 }()
153 }
154 }
155 return nil
156}