Live video on the AT Protocol
1package atproto
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "reflect"
8 "time"
9
10 "github.com/bluesky-social/indigo/api/bsky"
11 "github.com/bluesky-social/indigo/atproto/data"
12 "github.com/bluesky-social/indigo/atproto/syntax"
13 "stream.place/streamplace/pkg/aqtime"
14 "stream.place/streamplace/pkg/integrations/discord"
15 "stream.place/streamplace/pkg/log"
16 "stream.place/streamplace/pkg/model"
17 notificationpkg "stream.place/streamplace/pkg/notifications"
18 "stream.place/streamplace/pkg/streamplace"
19
20 lexutil "github.com/bluesky-social/indigo/lex/util"
21)
22
23func (atsync *ATProtoSynchronizer) handleCreateUpdate(ctx context.Context, userDID string, rkey syntax.RecordKey, recCBOR *[]byte, cid string, collection syntax.NSID, isUpdate bool, isFirstSync bool) error {
24 ctx = log.WithLogValues(ctx, "func", "handleCreateUpdate", "userDID", userDID, "rkey", rkey.String(), "cid", cid, "collection", collection.String())
25 now := time.Now()
26 r, err := atsync.Model.GetRepo(userDID)
27 if err != nil {
28 return fmt.Errorf("failed to get repo: %w", err)
29 }
30 maybeATURI := fmt.Sprintf("at://%s/%s/%s", userDID, collection.String(), rkey.String())
31 aturi, err := syntax.ParseATURI(maybeATURI)
32 if err != nil {
33 return fmt.Errorf("failed to parse ATURI: %w", err)
34 }
35 d, err := data.UnmarshalCBOR(*recCBOR)
36 if err != nil {
37 return fmt.Errorf("failed to unmarhsal record CBOR: %w", err)
38 }
39 cb, err := lexutil.CborDecodeValue(*recCBOR)
40 if errors.Is(err, lexutil.ErrUnrecognizedType) {
41 log.Debug(ctx, "unrecognized record type", "key", rkey.String(), "type", err)
42 return nil
43 } else if err != nil {
44 return fmt.Errorf("failed to decode record CBOR: %w", err)
45 }
46 switch rec := cb.(type) {
47 case *bsky.GraphFollow:
48 if r == nil {
49 // someone we don't know about
50 return nil
51 }
52 log.Debug(ctx, "creating follow", "userDID", userDID, "subjectDID", rec.Subject)
53 err := atsync.Model.CreateFollow(ctx, userDID, rkey.String(), rec)
54 if err != nil {
55 log.Debug(ctx, "failed to create follow", "err", err)
56 }
57
58 case *bsky.GraphBlock:
59 if r == nil {
60 // someone we don't know about
61 return nil
62 }
63 log.Debug(ctx, "creating block", "userDID", userDID, "subjectDID", rec.Subject)
64 block := &model.Block{
65 RKey: rkey.String(),
66 RepoDID: userDID,
67 SubjectDID: rec.Subject,
68 Record: *recCBOR,
69 CID: cid,
70 }
71 err := atsync.Model.CreateBlock(ctx, block)
72 if err != nil {
73 return fmt.Errorf("failed to create block: %w", err)
74 }
75 block, err = atsync.Model.GetBlock(ctx, rkey.String())
76 if err != nil {
77 return fmt.Errorf("failed to get block after we just saved it?!: %w", err)
78 }
79 streamplaceBlock, err := block.ToStreamplaceBlock()
80 if err != nil {
81 return fmt.Errorf("failed to convert block to streamplace block: %w", err)
82 }
83 go atsync.Bus.Publish(userDID, streamplaceBlock)
84
85 case *streamplace.ChatMessage:
86 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
87 if err != nil {
88 return fmt.Errorf("failed to sync bluesky repo: %w", err)
89 }
90
91 _, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model)
92 if err != nil {
93 log.Error(ctx, "failed to sync bluesky repo", "err", err)
94 }
95
96 log.Debug(ctx, "streamplace.ChatMessage detected", "message", rec.Text, "repo", repo.Handle)
97 block, err := atsync.Model.GetUserBlock(ctx, rec.Streamer, userDID)
98 if err != nil {
99 return fmt.Errorf("failed to get user block: %w", err)
100 }
101 if block != nil {
102 log.Debug(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", rec.Streamer)
103 return nil
104 }
105 mcm := &model.ChatMessage{
106 CID: cid,
107 URI: aturi.String(),
108 CreatedAt: now,
109 ChatMessage: recCBOR,
110 RepoDID: userDID,
111 Repo: repo,
112 StreamerRepoDID: rec.Streamer,
113 IndexedAt: &now,
114 }
115 if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil {
116 mcm.ReplyToCID = &rec.Reply.Parent.Cid
117 }
118 err = atsync.Model.CreateChatMessage(ctx, mcm)
119 if err != nil {
120 log.Error(ctx, "failed to create chat message", "err", err)
121 }
122 mcm, err = atsync.Model.GetChatMessage(cid)
123 if err != nil {
124 log.Error(ctx, "failed to get just-saved chat message", "err", err)
125 }
126 if mcm == nil {
127 log.Error(ctx, "failed to retrieve just-saved chat message", "err", err)
128 return nil
129 }
130 scm, err := mcm.ToStreamplaceMessageView()
131 if err != nil {
132 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err)
133 }
134 go atsync.Bus.Publish(rec.Streamer, scm)
135
136 if !isUpdate && !isFirstSync {
137 for _, webhook := range atsync.CLI.DiscordWebhooks {
138 if webhook.DID == rec.Streamer && webhook.Type == "chat" {
139 go func() {
140 err := discord.SendChat(ctx, webhook, repo, scm)
141 if err != nil {
142 log.Error(ctx, "failed to send livestream to discord", "err", err)
143 } else {
144 log.Log(ctx, "sent livestream to discord", "user", userDID, "webhook", webhook.URL)
145 }
146 }()
147 }
148 }
149 }
150
151 case *streamplace.ChatGate:
152 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
153 if err != nil {
154 return fmt.Errorf("failed to sync bluesky repo: %w", err)
155 }
156 if r == nil {
157 // someone we don't know about
158 return nil
159 }
160 log.Debug(ctx, "creating gate", "userDID", userDID, "hiddenMessage", rec.HiddenMessage)
161 gate := &model.Gate{
162 RKey: rkey.String(),
163 RepoDID: userDID,
164 HiddenMessage: rec.HiddenMessage,
165 CID: cid,
166 CreatedAt: now,
167 Repo: repo,
168 }
169 err = atsync.Model.CreateGate(ctx, gate)
170 if err != nil {
171 return fmt.Errorf("failed to create gate: %w", err)
172 }
173 gate, err = atsync.Model.GetGate(ctx, rkey.String())
174 if err != nil {
175 return fmt.Errorf("failed to get gate after we just saved it?!: %w", err)
176 }
177 streamplaceGate, err := gate.ToStreamplaceGate()
178 if err != nil {
179 return fmt.Errorf("failed to convert gate to streamplace gate: %w", err)
180 }
181 go atsync.Bus.Publish(userDID, streamplaceGate)
182
183 case *streamplace.ChatProfile:
184 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
185 if err != nil {
186 return fmt.Errorf("failed to sync bluesky repo: %w", err)
187 }
188 mcm := &model.ChatProfile{
189 RepoDID: userDID,
190 Repo: repo,
191 Record: recCBOR,
192 }
193 err = atsync.Model.CreateChatProfile(ctx, mcm)
194 if err != nil {
195 log.Error(ctx, "failed to create chat profile", "err", err)
196 }
197
198 case *streamplace.ServerSettings:
199 _, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
200 if err != nil {
201 return fmt.Errorf("failed to sync bluesky repo: %w", err)
202 }
203 settings := &model.ServerSettings{
204 Server: rkey.String(),
205 RepoDID: userDID,
206 Record: recCBOR,
207 }
208 err = atsync.Model.UpdateServerSettings(ctx, settings)
209 if err != nil {
210 log.Error(ctx, "failed to create server settings", "err", err)
211 }
212
213 case *bsky.FeedPost:
214 // jsonData, err := json.Marshal(d)
215 // if err != nil {
216 // log.Error(ctx, "failed to marshal record data", "err", err)
217 // } else {
218 // log.Log(ctx, "record data", "json", string(jsonData))
219 // }
220
221 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt)
222 if err != nil {
223 return fmt.Errorf("failed to parse createdAt: %w", err)
224 }
225
226 if livestream, ok := d["place.stream.livestream"]; ok {
227 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
228 if err != nil {
229 return fmt.Errorf("failed to sync bluesky repo: %w", err)
230 }
231 livestream, ok := livestream.(map[string]interface{})
232 if !ok {
233 return fmt.Errorf("livestream is not a map")
234 }
235 url, ok := livestream["url"].(string)
236 if !ok {
237 return fmt.Errorf("livestream url is not a string")
238 }
239 log.Debug(ctx, "livestream url", "url", url)
240 if err := atsync.Model.CreateFeedPost(ctx, &model.FeedPost{
241 CID: cid,
242 CreatedAt: createdAt,
243 FeedPost: recCBOR,
244 RepoDID: userDID,
245 Repo: repo,
246 Type: "livestream",
247 URI: aturi.String(),
248 IndexedAt: &now,
249 }); err != nil {
250 return fmt.Errorf("failed to create bluesky post: %w", err)
251 }
252 } else {
253 if rec.Reply == nil || rec.Reply.Root == nil {
254 return nil
255 }
256 livestream, err := atsync.Model.GetLivestreamByPostCID(rec.Reply.Root.Cid)
257 if err != nil {
258 return fmt.Errorf("failed to get livestream: %w", err)
259 }
260 if livestream == nil {
261 return nil
262 }
263 // log.Warn(ctx, "chat message detected", "uri", livestream.URI)
264 // if this post is a reply to someone's livestream post
265 // log.Warn(ctx, "chat message detected", "message", rec.Text)
266 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
267 if err != nil {
268 return fmt.Errorf("failed to sync bluesky repo: %w", err)
269 }
270
271 // log.Warn(ctx, "chat message detected", "message", rec.Text, "repo", repo.Handle)
272 block, err := atsync.Model.GetUserBlock(ctx, livestream.RepoDID, userDID)
273 if err != nil {
274 return fmt.Errorf("failed to get user block: %w", err)
275 }
276 if block != nil {
277 log.Warn(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", livestream.RepoDID)
278 return nil
279 }
280 // if fc.cli.PrintChat {
281 // fmt.Printf("@%s%s %s\n", blue.Sprintf(repo.Handle), green.Sprintf(":"), rec.Text)
282 // }
283 fp := &model.FeedPost{
284 CID: cid,
285 CreatedAt: createdAt,
286 FeedPost: recCBOR,
287 RepoDID: userDID,
288 Type: "reply",
289 Repo: repo,
290 ReplyRootCID: &livestream.PostCID,
291 ReplyRootRepoDID: &livestream.RepoDID,
292 URI: aturi.String(),
293 IndexedAt: &now,
294 }
295 err = atsync.Model.CreateFeedPost(ctx, fp)
296 if err != nil {
297 log.Error(ctx, "failed to create feed post", "err", err)
298 }
299 postView, err := fp.ToBskyPostView()
300 if err != nil {
301 log.Error(ctx, "failed to convert feed post to bsky post view", "err", err)
302 }
303 go atsync.Bus.Publish(livestream.RepoDID, postView)
304 }
305
306 case *streamplace.Livestream:
307 var u string
308 if rec.Url != nil {
309 u = *rec.Url
310 }
311 if r == nil {
312 // we don't know about this repo
313 return nil
314 }
315 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt)
316 if err != nil {
317 log.Error(ctx, "failed to parse createdAt", "err", err)
318 return nil
319 }
320 ls := &model.Livestream{
321 CID: cid,
322 URI: aturi.String(),
323 CreatedAt: createdAt,
324 Livestream: recCBOR,
325 RepoDID: userDID,
326 }
327 if rec.Post != nil {
328 ls.PostCID = rec.Post.Cid
329 ls.PostURI = rec.Post.Uri
330 }
331 err = atsync.Model.CreateLivestream(ctx, ls)
332 if err != nil {
333 return fmt.Errorf("failed to create livestream: %w", err)
334 }
335 lsHydrated, err := atsync.Model.GetLatestLivestreamForRepo(userDID)
336 if err != nil {
337 return fmt.Errorf("failed to get latest livestream for repo: %w", err)
338 }
339 lsv, err := lsHydrated.ToLivestreamView()
340 if err != nil {
341 return fmt.Errorf("failed to convert livestream to bsky post view: %w", err)
342 }
343 go atsync.Bus.Publish(userDID, lsv)
344
345 if !isUpdate && !isFirstSync {
346 log.Warn(ctx, "Livestream detected! Blasting followers!", "title", rec.Title, "url", u, "createdAt", rec.CreatedAt, "repo", userDID)
347 notifications, err := atsync.Model.GetFollowersNotificationTokens(userDID)
348 if err != nil {
349 return err
350 }
351
352 nb := ¬ificationpkg.NotificationBlast{
353 Title: fmt.Sprintf("🔴 @%s is LIVE!", r.Handle),
354 Body: rec.Title,
355 Data: map[string]string{
356 "path": fmt.Sprintf("/%s", r.Handle),
357 },
358 }
359 if atsync.Noter != nil {
360 err := atsync.Noter.Blast(ctx, notifications, nb)
361 if err != nil {
362 log.Error(ctx, "failed to blast notifications", "err", err)
363 } else {
364 log.Log(ctx, "sent notifications", "user", userDID, "count", len(notifications), "content", nb)
365 }
366 } else {
367 log.Log(ctx, "no notifier configured, skipping notifications", "user", userDID, "count", len(notifications), "content", nb)
368 }
369
370 var postView *bsky.FeedDefs_PostView
371 if lsHydrated.Post != nil {
372 postView, err = lsHydrated.Post.ToBskyPostView()
373 if err != nil {
374 log.Error(ctx, "failed to convert livestream post to bsky post view", "err", err)
375 }
376 } else {
377 log.Warn(ctx, "no post found for livestream", "livestream", lsHydrated)
378 }
379
380 var spcp *streamplace.ChatProfile
381 cp, err := atsync.Model.GetChatProfile(ctx, userDID)
382 if err != nil {
383 log.Error(ctx, "failed to get chat profile", "err", err)
384 }
385 if cp != nil {
386 spcp, err = cp.ToStreamplaceChatProfile()
387 if err != nil {
388 log.Error(ctx, "failed to convert chat profile to streamplace chat profile", "err", err)
389 }
390 }
391
392 for _, webhook := range atsync.CLI.DiscordWebhooks {
393 if webhook.DID == userDID && webhook.Type == "livestream" {
394 go func() {
395 err := discord.SendLivestream(ctx, webhook, r, lsv, postView, spcp)
396 if err != nil {
397 log.Error(ctx, "failed to send livestream to discord", "err", err)
398 } else {
399 log.Log(ctx, "sent livestream to discord", "user", userDID, "webhook", webhook.URL)
400 }
401 }()
402 }
403 }
404 }
405
406 case *streamplace.Key:
407 log.Debug(ctx, "creating key", "key", rec)
408 time, err := aqtime.FromString(rec.CreatedAt)
409 if err != nil {
410 return fmt.Errorf("failed to parse createdAt: %w", err)
411 }
412 key := model.SigningKey{
413 DID: rec.SigningKey,
414 RKey: rkey.String(),
415 CreatedAt: time.Time(),
416 RepoDID: userDID,
417 }
418 err = atsync.Model.UpdateSigningKey(&key)
419 if err != nil {
420 log.Error(ctx, "failed to create signing key", "err", err)
421 }
422
423 default:
424 log.Debug(ctx, "unhandled record type", "type", reflect.TypeOf(rec))
425 }
426 return nil
427}