Live video on the AT Protocol
1package atproto
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "reflect"
9 "strings"
10 "time"
11
12 "github.com/bluesky-social/indigo/api/bsky"
13 "github.com/bluesky-social/indigo/atproto/atdata"
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "stream.place/streamplace/pkg/aqtime"
16 "stream.place/streamplace/pkg/log"
17 "stream.place/streamplace/pkg/model"
18 "stream.place/streamplace/pkg/statedb"
19 "stream.place/streamplace/pkg/streamplace"
20
21 lexutil "github.com/bluesky-social/indigo/lex/util"
22)
23
24func (atsync *ATProtoSynchronizer) handleCreateUpdate(ctx context.Context, userDID string, rkey syntax.RecordKey, recCBOR *[]byte, cid string, collection syntax.NSID, isUpdate bool, isFirstSync bool) error {
25 ctx = log.WithLogValues(ctx, "func", "handleCreateUpdate", "userDID", userDID, "rkey", rkey.String(), "cid", cid, "collection", collection.String())
26 now := time.Now()
27 r, err := atsync.Model.GetRepo(userDID)
28 if err != nil {
29 return fmt.Errorf("failed to get repo: %w", err)
30 }
31 maybeATURI := fmt.Sprintf("at://%s/%s/%s", userDID, collection.String(), rkey.String())
32 aturi, err := syntax.ParseATURI(maybeATURI)
33 if err != nil {
34 return fmt.Errorf("failed to parse ATURI: %w", err)
35 }
36 d, err := atdata.UnmarshalCBOR(*recCBOR)
37 if err != nil {
38 return fmt.Errorf("failed to unmarhsal record CBOR: %w", err)
39 }
40 cb, err := lexutil.CborDecodeValue(*recCBOR)
41 if errors.Is(err, lexutil.ErrUnrecognizedType) {
42 log.Debug(ctx, "unrecognized record type", "key", rkey.String(), "type", err)
43 return nil
44 } else if err != nil {
45 return fmt.Errorf("failed to decode record CBOR: %w", err)
46 }
47 switch rec := cb.(type) {
48 case *bsky.GraphFollow:
49 if r == nil {
50 // someone we don't know about
51 return nil
52 }
53 log.Debug(ctx, "creating follow", "userDID", userDID, "subjectDID", rec.Subject)
54 err := atsync.Model.CreateFollow(ctx, userDID, rkey.String(), rec)
55 if err != nil {
56 log.Debug(ctx, "failed to create follow", "err", err)
57 }
58
59 case *bsky.GraphBlock:
60 if r == nil {
61 // someone we don't know about
62 return nil
63 }
64 log.Debug(ctx, "creating block", "userDID", userDID, "subjectDID", rec.Subject)
65 block := &model.Block{
66 RKey: rkey.String(),
67 RepoDID: userDID,
68 SubjectDID: rec.Subject,
69 Record: *recCBOR,
70 CID: cid,
71 }
72 err := atsync.Model.CreateBlock(ctx, block)
73 if err != nil {
74 return fmt.Errorf("failed to create block: %w", err)
75 }
76 block, err = atsync.Model.GetBlock(ctx, rkey.String())
77 if err != nil || block == nil {
78 return fmt.Errorf("failed to get block after we just saved it?!: %w", err)
79 }
80 streamplaceBlock, err := block.ToStreamplaceBlock()
81 if err != nil {
82 return fmt.Errorf("failed to convert block to streamplace block: %w", err)
83 }
84 go atsync.Bus.Publish(userDID, streamplaceBlock)
85
86 case *streamplace.ChatMessage:
87 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
88 if err != nil {
89 return fmt.Errorf("failed to sync bluesky repo: %w", err)
90 }
91
92 go func() {
93 _, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model)
94 if err != nil {
95 log.Error(ctx, "failed to sync bluesky repo", "err", err)
96 }
97 }()
98
99 log.Debug(ctx, "streamplace.ChatMessage detected", "message", rec.Text, "repo", repo.Handle)
100 block, err := atsync.Model.GetUserBlock(ctx, rec.Streamer, userDID)
101 if err != nil {
102 return fmt.Errorf("failed to get user block: %w", err)
103 }
104 if block != nil {
105 log.Debug(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", rec.Streamer)
106 return nil
107 }
108 mcm := &model.ChatMessage{
109 CID: cid,
110 URI: aturi.String(),
111 CreatedAt: now,
112 ChatMessage: recCBOR,
113 RepoDID: userDID,
114 Repo: repo,
115 StreamerRepoDID: rec.Streamer,
116 IndexedAt: &now,
117 }
118 if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil {
119 mcm.ReplyToCID = &rec.Reply.Parent.Cid
120 }
121
122 // check if we have any link facets with 'javascript:' links
123 for _, facet := range rec.Facets {
124 for _, feature := range facet.Features {
125 if link := feature.RichtextFacet_Link; link != nil {
126 if link.Uri != "" && strings.HasPrefix(strings.ToLower(link.Uri), "javascript:") {
127 log.Warn(ctx, "excluding message with javascript: link", "uri", aturi.String(), "link", link.Uri)
128 return nil
129 }
130 }
131 }
132 }
133
134 err = atsync.Model.CreateChatMessage(ctx, mcm)
135 if err != nil {
136 log.Error(ctx, "failed to create chat message", "err", err)
137 return nil
138 }
139 mcm, err = atsync.Model.GetChatMessage(aturi.String())
140 if err != nil {
141 log.Error(ctx, "failed to get just-saved chat message", "err", err)
142 return nil
143 }
144 if mcm == nil {
145 log.Error(ctx, "failed to retrieve just-saved chat message", "err", err)
146 return nil
147 }
148 scm, err := mcm.ToStreamplaceMessageView()
149 if err != nil {
150 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err)
151 return nil
152 }
153 go atsync.Bus.Publish(rec.Streamer, scm)
154
155 if !isUpdate && !isFirstSync {
156
157 task := &statedb.ChatTask{
158 MessageView: scm,
159 }
160
161 _, err = atsync.StatefulDB.EnqueueTask(ctx, statedb.TaskChat, task, statedb.WithTaskKey(fmt.Sprintf("chat-message::%s", aturi.String())))
162 if err != nil {
163 log.Error(ctx, "failed to enqueue notification task", "err", err)
164 }
165 }
166
167 case *streamplace.ChatGate:
168 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
169 if err != nil {
170 return fmt.Errorf("failed to sync bluesky repo: %w", err)
171 }
172 if r == nil {
173 // someone we don't know about
174 return nil
175 }
176 log.Debug(ctx, "creating gate", "userDID", userDID, "hiddenMessage", rec.HiddenMessage)
177 gate := &model.Gate{
178 RKey: rkey.String(),
179 RepoDID: userDID,
180 HiddenMessage: rec.HiddenMessage,
181 CID: cid,
182 CreatedAt: now,
183 Repo: repo,
184 }
185 err = atsync.Model.CreateGate(ctx, gate)
186 if err != nil {
187 return fmt.Errorf("failed to create gate: %w", err)
188 }
189 gate, err = atsync.Model.GetGate(ctx, rkey.String())
190 if err != nil {
191 return fmt.Errorf("failed to get gate after we just saved it?!: %w", err)
192 }
193 streamplaceGate, err := gate.ToStreamplaceGate()
194 if err != nil {
195 return fmt.Errorf("failed to convert gate to streamplace gate: %w", err)
196 }
197 go atsync.Bus.Publish(userDID, streamplaceGate)
198
199 case *streamplace.ChatProfile:
200 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
201 if err != nil {
202 return fmt.Errorf("failed to sync bluesky repo: %w", err)
203 }
204 mcm := &model.ChatProfile{
205 RepoDID: userDID,
206 Repo: repo,
207 Record: recCBOR,
208 }
209 err = atsync.Model.CreateChatProfile(ctx, mcm)
210 if err != nil {
211 log.Error(ctx, "failed to create chat profile", "err", err)
212 }
213
214 case *streamplace.ServerSettings:
215 _, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
216 if err != nil {
217 return fmt.Errorf("failed to sync bluesky repo: %w", err)
218 }
219 settings := &model.ServerSettings{
220 Server: rkey.String(),
221 RepoDID: userDID,
222 Record: recCBOR,
223 }
224 err = atsync.Model.UpdateServerSettings(ctx, settings)
225 if err != nil {
226 log.Error(ctx, "failed to create server settings", "err", err)
227 }
228
229 case *bsky.FeedPost:
230 // jsonData, err := json.Marshal(d)
231 // if err != nil {
232 // log.Error(ctx, "failed to marshal record data", "err", err)
233 // } else {
234 // log.Log(ctx, "record data", "json", string(jsonData))
235 // }
236
237 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt)
238 if err != nil {
239 return fmt.Errorf("failed to parse createdAt: %w", err)
240 }
241
242 if livestream, ok := d["place.stream.livestream"]; ok {
243 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
244 if err != nil {
245 return fmt.Errorf("failed to sync bluesky repo: %w", err)
246 }
247 livestream, ok := livestream.(map[string]interface{})
248 if !ok {
249 return fmt.Errorf("livestream is not a map")
250 }
251 url, ok := livestream["url"].(string)
252 if !ok {
253 return fmt.Errorf("livestream url is not a string")
254 }
255 log.Debug(ctx, "livestream url", "url", url)
256 if err := atsync.Model.CreateFeedPost(ctx, &model.FeedPost{
257 CID: cid,
258 CreatedAt: createdAt,
259 FeedPost: recCBOR,
260 RepoDID: userDID,
261 Repo: repo,
262 Type: "livestream",
263 URI: aturi.String(),
264 IndexedAt: &now,
265 }); err != nil {
266 return fmt.Errorf("failed to create bluesky post: %w", err)
267 }
268 } else {
269 if rec.Reply == nil || rec.Reply.Root == nil {
270 return nil
271 }
272 livestream, err := atsync.Model.GetLivestreamByPostURI(rec.Reply.Root.Uri)
273 if err != nil {
274 return fmt.Errorf("failed to get livestream: %w", err)
275 }
276 if livestream == nil {
277 return nil
278 }
279 // log.Warn(ctx, "chat message detected", "uri", livestream.URI)
280 // if this post is a reply to someone's livestream post
281 // log.Warn(ctx, "chat message detected", "message", rec.Text)
282 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
283 if err != nil {
284 return fmt.Errorf("failed to sync bluesky repo: %w", err)
285 }
286
287 // log.Warn(ctx, "chat message detected", "message", rec.Text, "repo", repo.Handle)
288 block, err := atsync.Model.GetUserBlock(ctx, livestream.RepoDID, userDID)
289 if err != nil {
290 return fmt.Errorf("failed to get user block: %w", err)
291 }
292 if block != nil {
293 log.Warn(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", livestream.RepoDID)
294 return nil
295 }
296 // if fc.cli.PrintChat {
297 // fmt.Printf("@%s%s %s\n", blue.Sprintf(repo.Handle), green.Sprintf(":"), rec.Text)
298 // }
299 fp := &model.FeedPost{
300 CID: cid,
301 CreatedAt: createdAt,
302 FeedPost: recCBOR,
303 RepoDID: userDID,
304 Type: "reply",
305 Repo: repo,
306 ReplyRootURI: &livestream.PostURI,
307 ReplyRootRepoDID: &livestream.RepoDID,
308 URI: aturi.String(),
309 IndexedAt: &now,
310 }
311 err = atsync.Model.CreateFeedPost(ctx, fp)
312 if err != nil {
313 log.Error(ctx, "failed to create feed post", "err", err)
314 }
315 postView, err := fp.ToBskyPostView()
316 if err != nil {
317 log.Error(ctx, "failed to convert feed post to bsky post view", "err", err)
318 }
319 go atsync.Bus.Publish(livestream.RepoDID, postView)
320 }
321
322 case *streamplace.Livestream:
323 if r == nil {
324 // we don't know about this repo
325 return nil
326 }
327 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt)
328 if err != nil {
329 log.Error(ctx, "failed to parse createdAt", "err", err)
330 return nil
331 }
332 ls := &model.Livestream{
333 CID: cid,
334 URI: aturi.String(),
335 CreatedAt: createdAt,
336 Livestream: recCBOR,
337 RepoDID: userDID,
338 }
339 if rec.Post != nil {
340 ls.PostCID = rec.Post.Cid
341 ls.PostURI = rec.Post.Uri
342 }
343 err = atsync.Model.CreateLivestream(ctx, ls)
344 if err != nil {
345 return fmt.Errorf("failed to create livestream: %w", err)
346 }
347 lsHydrated, err := atsync.Model.GetLatestLivestreamForRepo(userDID)
348 if err != nil {
349 return fmt.Errorf("failed to get latest livestream for repo: %w", err)
350 }
351 lsv, err := lsHydrated.ToLivestreamView()
352 if err != nil {
353 return fmt.Errorf("failed to convert livestream to bsky post view: %w", err)
354 }
355 go atsync.Bus.Publish(userDID, lsv)
356
357 var postView *bsky.FeedDefs_PostView
358 if lsHydrated.Post != nil {
359 postView, err = lsHydrated.Post.ToBskyPostView()
360 if err != nil {
361 return fmt.Errorf("failed to convert livestream post to bsky post view: %w", err)
362 }
363 }
364
365 task := &statedb.NotificationTask{
366 Livestream: lsv,
367 FeedPost: postView,
368 PDSURL: r.PDS,
369 }
370
371 cp, err := atsync.Model.GetChatProfile(ctx, userDID)
372 if err != nil {
373 return fmt.Errorf("failed to get chat profile: %w", err)
374 }
375 if cp != nil {
376 spcp, err := cp.ToStreamplaceChatProfile()
377 if err != nil {
378 return fmt.Errorf("failed to convert chat profile to streamplace chat profile: %w", err)
379 }
380 task.ChatProfile = spcp
381 }
382
383 case *streamplace.LiveTeleport:
384 if r == nil {
385 return nil
386 }
387 startsAt, err := time.Parse(time.RFC3339, rec.StartsAt)
388 if err != nil {
389 log.Error(ctx, "failed to parse startsAt", "err", err)
390 return nil
391 }
392 viewerCount := atsync.Bus.GetViewerCount(userDID)
393 tp := &model.Teleport{
394 CID: cid,
395 URI: aturi.String(),
396 StartsAt: startsAt,
397 DurationSeconds: rec.DurationSeconds,
398 ViewerCount: int64(viewerCount),
399 Teleport: recCBOR,
400 RepoDID: userDID,
401 TargetDID: rec.Streamer,
402 }
403 err = atsync.Model.CreateTeleport(ctx, tp)
404 if err != nil {
405 return fmt.Errorf("failed to create teleport: %w", err)
406 }
407 go atsync.Bus.Publish(userDID, rec)
408
409 // schedule arrival notification 10 seconds after startsAt
410 arrivalTime := startsAt.Add(10 * time.Second)
411 waitDuration := time.Until(arrivalTime)
412 if waitDuration < 0 {
413 waitDuration = 0
414 }
415
416 time.AfterFunc(waitDuration, func() {
417 // verify teleport still exists
418 existingTp, err := atsync.Model.GetTeleportByURI(aturi.String())
419 if err != nil {
420 log.Error(ctx, "failed to get teleport by uri", "err", err)
421 return
422 }
423 if existingTp == nil || existingTp.Denied {
424 log.Debug(ctx, "teleport no longer active, skipping arrival notification", "uri", aturi.String())
425 return
426 }
427
428 // get the source profile
429 sourceRepo, err := atsync.Model.GetRepo(userDID)
430 if err != nil {
431 log.Error(ctx, "failed to get source repo", "err", err)
432 return
433 }
434
435 viewerCount := existingTp.ViewerCount
436
437 arrivalMsg := &streamplace.Livestream_TeleportArrival{
438 LexiconTypeID: "place.stream.livestream#teleportArrival",
439 TeleportUri: aturi.String(),
440 Source: &bsky.ActorDefs_ProfileViewBasic{
441 Did: userDID,
442 Handle: sourceRepo.Handle,
443 },
444 ViewerCount: int64(viewerCount),
445 StartsAt: rec.StartsAt,
446 }
447
448 // get the source chat profile
449 chatProfile, err := atsync.Model.GetChatProfile(ctx, userDID)
450 if err == nil && chatProfile != nil {
451 spcp, err := chatProfile.ToStreamplaceChatProfile()
452 if err == nil {
453 arrivalMsg.ChatProfile = spcp
454 }
455 }
456
457 atsync.Bus.Publish(rec.Streamer, arrivalMsg)
458 })
459
460 case *streamplace.Key:
461 log.Debug(ctx, "creating key", "key", rec)
462 time, err := aqtime.FromString(rec.CreatedAt)
463 if err != nil {
464 return fmt.Errorf("failed to parse createdAt: %w", err)
465 }
466 key := model.SigningKey{
467 DID: rec.SigningKey,
468 RKey: rkey.String(),
469 CreatedAt: time.Time(),
470 RepoDID: userDID,
471 }
472 err = atsync.Model.UpdateSigningKey(&key)
473 if err != nil {
474 log.Error(ctx, "failed to create signing key", "err", err)
475 }
476
477 case *streamplace.BroadcastOrigin:
478 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
479 if err != nil {
480 return fmt.Errorf("failed to sync broadcast origin creator bluesky repo: %w", err)
481 }
482 _, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model)
483 if err != nil {
484 return fmt.Errorf("failed to sync broadcast origin streamer bluesky repo: %w", err)
485 }
486 err = atsync.Model.UpdateBroadcastOrigin(ctx, rec, aturi)
487 if err != nil {
488 log.Error(ctx, "failed to update broadcast origin", "err", err)
489 }
490 view := &streamplace.BroadcastDefs_BroadcastOriginView{
491 Uri: aturi.String(),
492 Cid: cid,
493 Author: &bsky.ActorDefs_ProfileViewBasic{
494 Did: userDID,
495 Handle: repo.Handle,
496 },
497 Record: &lexutil.LexiconTypeDecoder{Val: rec},
498 }
499 // publishes with an empty string because we're discovering the stream
500 go atsync.Bus.Publish("", view)
501
502 case *streamplace.MetadataConfiguration:
503 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
504 if err != nil {
505 return fmt.Errorf("failed to sync bluesky repo: %w", err)
506 }
507 log.Debug(ctx, "creating metadata configuration", "metadata", rec)
508 metadata := &model.MetadataConfiguration{
509 RepoDID: userDID,
510 Record: recCBOR,
511 Repo: repo,
512 }
513 err = atsync.Model.CreateMetadataConfiguration(ctx, metadata)
514 if err != nil {
515 log.Error(ctx, "failed to create metadata configuration", "err", err)
516 }
517
518 case *streamplace.ModerationPermission:
519 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
520 if err != nil {
521 return fmt.Errorf("failed to sync bluesky repo: %w", err)
522 }
523 log.Debug(ctx, "creating moderation delegation", "streamerDID", userDID, "moderatorDID", rec.Moderator)
524
525 err = atsync.Model.CreateModerationDelegation(ctx, rec, aturi)
526 if err != nil {
527 return fmt.Errorf("failed to create moderation delegation: %w", err)
528 }
529
530 view := &streamplace.ModerationDefs_PermissionView{
531 Uri: aturi.String(),
532 Cid: cid,
533 Author: &bsky.ActorDefs_ProfileViewBasic{
534 Did: userDID,
535 Handle: repo.Handle,
536 },
537 Record: &lexutil.LexiconTypeDecoder{Val: rec},
538 }
539 // Publish moderation permission view to WebSocket bus for real-time updates
540 // This allows moderators to see their permissions instantly without page refresh
541 go atsync.Bus.Publish(userDID, view)
542
543 case *streamplace.LiveRecommendations:
544 log.Debug(ctx, "creating recommendations", "userDID", userDID, "count", len(rec.Streamers))
545
546 // Validate max 8 streamers
547 if len(rec.Streamers) > 8 {
548 log.Warn(ctx, "recommendations exceed maximum of 8", "count", len(rec.Streamers))
549 return fmt.Errorf("maximum 8 recommendations allowed, got %d", len(rec.Streamers))
550 }
551
552 // Marshal streamers to JSON
553 streamersJSON, err := json.Marshal(rec.Streamers)
554 if err != nil {
555 return fmt.Errorf("failed to marshal streamers: %w", err)
556 }
557
558 // Parse createdAt timestamp
559 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt)
560 if err != nil {
561 return fmt.Errorf("failed to parse createdAt: %w", err)
562 }
563
564 recommendation := &model.Recommendation{
565 UserDID: userDID,
566 Streamers: json.RawMessage(streamersJSON),
567 CreatedAt: createdAt,
568 }
569
570 err = atsync.Model.UpsertRecommendation(recommendation)
571 if err != nil {
572 return fmt.Errorf("failed to upsert recommendation: %w", err)
573 }
574
575 default:
576 log.Debug(ctx, "unhandled record type", "type", reflect.TypeOf(rec))
577 }
578 return nil
579}