Live video on the AT Protocol
1package atproto
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "reflect"
9 "time"
10
11 "github.com/bluesky-social/indigo/api/bsky"
12 "github.com/bluesky-social/indigo/atproto/atdata"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "stream.place/streamplace/pkg/aqtime"
15 "stream.place/streamplace/pkg/log"
16 "stream.place/streamplace/pkg/model"
17 "stream.place/streamplace/pkg/statedb"
18 "stream.place/streamplace/pkg/streamplace"
19
20 lexutil "github.com/bluesky-social/indigo/lex/util"
21)
22
23func (atsync *ATProtoSynchronizer) handleCreateUpdate(ctx context.Context, userDID string, rkey syntax.RecordKey, recCBOR *[]byte, cid string, collection syntax.NSID, isUpdate bool, isFirstSync bool) error {
24 ctx = log.WithLogValues(ctx, "func", "handleCreateUpdate", "userDID", userDID, "rkey", rkey.String(), "cid", cid, "collection", collection.String())
25 now := time.Now()
26 r, err := atsync.Model.GetRepo(userDID)
27 if err != nil {
28 return fmt.Errorf("failed to get repo: %w", err)
29 }
30 maybeATURI := fmt.Sprintf("at://%s/%s/%s", userDID, collection.String(), rkey.String())
31 aturi, err := syntax.ParseATURI(maybeATURI)
32 if err != nil {
33 return fmt.Errorf("failed to parse ATURI: %w", err)
34 }
35 d, err := atdata.UnmarshalCBOR(*recCBOR)
36 if err != nil {
37 return fmt.Errorf("failed to unmarhsal record CBOR: %w", err)
38 }
39 cb, err := lexutil.CborDecodeValue(*recCBOR)
40 if errors.Is(err, lexutil.ErrUnrecognizedType) {
41 log.Debug(ctx, "unrecognized record type", "key", rkey.String(), "type", err)
42 return nil
43 } else if err != nil {
44 return fmt.Errorf("failed to decode record CBOR: %w", err)
45 }
46 switch rec := cb.(type) {
47 case *bsky.GraphFollow:
48 if r == nil {
49 // someone we don't know about
50 return nil
51 }
52 log.Debug(ctx, "creating follow", "userDID", userDID, "subjectDID", rec.Subject)
53 err := atsync.Model.CreateFollow(ctx, userDID, rkey.String(), rec)
54 if err != nil {
55 log.Debug(ctx, "failed to create follow", "err", err)
56 }
57
58 case *bsky.GraphBlock:
59 if r == nil {
60 // someone we don't know about
61 return nil
62 }
63 log.Debug(ctx, "creating block", "userDID", userDID, "subjectDID", rec.Subject)
64 block := &model.Block{
65 RKey: rkey.String(),
66 RepoDID: userDID,
67 SubjectDID: rec.Subject,
68 Record: *recCBOR,
69 CID: cid,
70 }
71 err := atsync.Model.CreateBlock(ctx, block)
72 if err != nil {
73 return fmt.Errorf("failed to create block: %w", err)
74 }
75 block, err = atsync.Model.GetBlock(ctx, rkey.String())
76 if err != nil {
77 return fmt.Errorf("failed to get block after we just saved it?!: %w", err)
78 }
79 streamplaceBlock, err := block.ToStreamplaceBlock()
80 if err != nil {
81 return fmt.Errorf("failed to convert block to streamplace block: %w", err)
82 }
83 go atsync.Bus.Publish(userDID, streamplaceBlock)
84
85 case *streamplace.ChatMessage:
86 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
87 if err != nil {
88 return fmt.Errorf("failed to sync bluesky repo: %w", err)
89 }
90
91 go func() {
92 _, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model)
93 if err != nil {
94 log.Error(ctx, "failed to sync bluesky repo", "err", err)
95 }
96 }()
97
98 log.Debug(ctx, "streamplace.ChatMessage detected", "message", rec.Text, "repo", repo.Handle)
99 block, err := atsync.Model.GetUserBlock(ctx, rec.Streamer, userDID)
100 if err != nil {
101 return fmt.Errorf("failed to get user block: %w", err)
102 }
103 if block != nil {
104 log.Debug(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", rec.Streamer)
105 return nil
106 }
107 mcm := &model.ChatMessage{
108 CID: cid,
109 URI: aturi.String(),
110 CreatedAt: now,
111 ChatMessage: recCBOR,
112 RepoDID: userDID,
113 Repo: repo,
114 StreamerRepoDID: rec.Streamer,
115 IndexedAt: &now,
116 }
117 if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil {
118 mcm.ReplyToCID = &rec.Reply.Parent.Cid
119 }
120 err = atsync.Model.CreateChatMessage(ctx, mcm)
121 if err != nil {
122 log.Error(ctx, "failed to create chat message", "err", err)
123 return nil
124 }
125 mcm, err = atsync.Model.GetChatMessage(aturi.String())
126 if err != nil {
127 log.Error(ctx, "failed to get just-saved chat message", "err", err)
128 return nil
129 }
130 if mcm == nil {
131 log.Error(ctx, "failed to retrieve just-saved chat message", "err", err)
132 return nil
133 }
134 scm, err := mcm.ToStreamplaceMessageView()
135 if err != nil {
136 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err)
137 return nil
138 }
139 go atsync.Bus.Publish(rec.Streamer, scm)
140
141 if !isUpdate && !isFirstSync {
142
143 task := &statedb.ChatTask{
144 MessageView: scm,
145 }
146
147 _, err = atsync.StatefulDB.EnqueueTask(ctx, statedb.TaskChat, task, statedb.WithTaskKey(fmt.Sprintf("chat-message::%s", aturi.String())))
148 if err != nil {
149 log.Error(ctx, "failed to enqueue notification task", "err", err)
150 }
151 }
152
153 case *streamplace.ChatGate:
154 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
155 if err != nil {
156 return fmt.Errorf("failed to sync bluesky repo: %w", err)
157 }
158 if r == nil {
159 // someone we don't know about
160 return nil
161 }
162 log.Debug(ctx, "creating gate", "userDID", userDID, "hiddenMessage", rec.HiddenMessage)
163 gate := &model.Gate{
164 RKey: rkey.String(),
165 RepoDID: userDID,
166 HiddenMessage: rec.HiddenMessage,
167 CID: cid,
168 CreatedAt: now,
169 Repo: repo,
170 }
171 err = atsync.Model.CreateGate(ctx, gate)
172 if err != nil {
173 return fmt.Errorf("failed to create gate: %w", err)
174 }
175 gate, err = atsync.Model.GetGate(ctx, rkey.String())
176 if err != nil {
177 return fmt.Errorf("failed to get gate after we just saved it?!: %w", err)
178 }
179 streamplaceGate, err := gate.ToStreamplaceGate()
180 if err != nil {
181 return fmt.Errorf("failed to convert gate to streamplace gate: %w", err)
182 }
183 go atsync.Bus.Publish(userDID, streamplaceGate)
184
185 case *streamplace.ChatProfile:
186 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
187 if err != nil {
188 return fmt.Errorf("failed to sync bluesky repo: %w", err)
189 }
190 mcm := &model.ChatProfile{
191 RepoDID: userDID,
192 Repo: repo,
193 Record: recCBOR,
194 }
195 err = atsync.Model.CreateChatProfile(ctx, mcm)
196 if err != nil {
197 log.Error(ctx, "failed to create chat profile", "err", err)
198 }
199
200 case *streamplace.ServerSettings:
201 _, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
202 if err != nil {
203 return fmt.Errorf("failed to sync bluesky repo: %w", err)
204 }
205 settings := &model.ServerSettings{
206 Server: rkey.String(),
207 RepoDID: userDID,
208 Record: recCBOR,
209 }
210 err = atsync.Model.UpdateServerSettings(ctx, settings)
211 if err != nil {
212 log.Error(ctx, "failed to create server settings", "err", err)
213 }
214
215 case *bsky.FeedPost:
216 // jsonData, err := json.Marshal(d)
217 // if err != nil {
218 // log.Error(ctx, "failed to marshal record data", "err", err)
219 // } else {
220 // log.Log(ctx, "record data", "json", string(jsonData))
221 // }
222
223 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt)
224 if err != nil {
225 return fmt.Errorf("failed to parse createdAt: %w", err)
226 }
227
228 if livestream, ok := d["place.stream.livestream"]; ok {
229 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
230 if err != nil {
231 return fmt.Errorf("failed to sync bluesky repo: %w", err)
232 }
233 livestream, ok := livestream.(map[string]interface{})
234 if !ok {
235 return fmt.Errorf("livestream is not a map")
236 }
237 url, ok := livestream["url"].(string)
238 if !ok {
239 return fmt.Errorf("livestream url is not a string")
240 }
241 log.Debug(ctx, "livestream url", "url", url)
242 if err := atsync.Model.CreateFeedPost(ctx, &model.FeedPost{
243 CID: cid,
244 CreatedAt: createdAt,
245 FeedPost: recCBOR,
246 RepoDID: userDID,
247 Repo: repo,
248 Type: "livestream",
249 URI: aturi.String(),
250 IndexedAt: &now,
251 }); err != nil {
252 return fmt.Errorf("failed to create bluesky post: %w", err)
253 }
254 } else {
255 if rec.Reply == nil || rec.Reply.Root == nil {
256 return nil
257 }
258 livestream, err := atsync.Model.GetLivestreamByPostURI(rec.Reply.Root.Uri)
259 if err != nil {
260 return fmt.Errorf("failed to get livestream: %w", err)
261 }
262 if livestream == nil {
263 return nil
264 }
265 // log.Warn(ctx, "chat message detected", "uri", livestream.URI)
266 // if this post is a reply to someone's livestream post
267 // log.Warn(ctx, "chat message detected", "message", rec.Text)
268 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
269 if err != nil {
270 return fmt.Errorf("failed to sync bluesky repo: %w", err)
271 }
272
273 // log.Warn(ctx, "chat message detected", "message", rec.Text, "repo", repo.Handle)
274 block, err := atsync.Model.GetUserBlock(ctx, livestream.RepoDID, userDID)
275 if err != nil {
276 return fmt.Errorf("failed to get user block: %w", err)
277 }
278 if block != nil {
279 log.Warn(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", livestream.RepoDID)
280 return nil
281 }
282 // if fc.cli.PrintChat {
283 // fmt.Printf("@%s%s %s\n", blue.Sprintf(repo.Handle), green.Sprintf(":"), rec.Text)
284 // }
285 fp := &model.FeedPost{
286 CID: cid,
287 CreatedAt: createdAt,
288 FeedPost: recCBOR,
289 RepoDID: userDID,
290 Type: "reply",
291 Repo: repo,
292 ReplyRootURI: &livestream.PostURI,
293 ReplyRootRepoDID: &livestream.RepoDID,
294 URI: aturi.String(),
295 IndexedAt: &now,
296 }
297 err = atsync.Model.CreateFeedPost(ctx, fp)
298 if err != nil {
299 log.Error(ctx, "failed to create feed post", "err", err)
300 }
301 postView, err := fp.ToBskyPostView()
302 if err != nil {
303 log.Error(ctx, "failed to convert feed post to bsky post view", "err", err)
304 }
305 go atsync.Bus.Publish(livestream.RepoDID, postView)
306 }
307
308 case *streamplace.Livestream:
309 if r == nil {
310 // we don't know about this repo
311 return nil
312 }
313 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt)
314 if err != nil {
315 log.Error(ctx, "failed to parse createdAt", "err", err)
316 return nil
317 }
318 ls := &model.Livestream{
319 CID: cid,
320 URI: aturi.String(),
321 CreatedAt: createdAt,
322 Livestream: recCBOR,
323 RepoDID: userDID,
324 }
325 if rec.Post != nil {
326 ls.PostCID = rec.Post.Cid
327 ls.PostURI = rec.Post.Uri
328 }
329 err = atsync.Model.CreateLivestream(ctx, ls)
330 if err != nil {
331 return fmt.Errorf("failed to create livestream: %w", err)
332 }
333 lsHydrated, err := atsync.Model.GetLatestLivestreamForRepo(userDID)
334 if err != nil {
335 return fmt.Errorf("failed to get latest livestream for repo: %w", err)
336 }
337 lsv, err := lsHydrated.ToLivestreamView()
338 if err != nil {
339 return fmt.Errorf("failed to convert livestream to bsky post view: %w", err)
340 }
341 go atsync.Bus.Publish(userDID, lsv)
342
343 var postView *bsky.FeedDefs_PostView
344 if lsHydrated.Post != nil {
345 postView, err = lsHydrated.Post.ToBskyPostView()
346 if err != nil {
347 return fmt.Errorf("failed to convert livestream post to bsky post view: %w", err)
348 }
349 }
350
351 task := &statedb.NotificationTask{
352 Livestream: lsv,
353 FeedPost: postView,
354 PDSURL: r.PDS,
355 }
356
357 cp, err := atsync.Model.GetChatProfile(ctx, userDID)
358 if err != nil {
359 return fmt.Errorf("failed to get chat profile: %w", err)
360 }
361 if cp != nil {
362 spcp, err := cp.ToStreamplaceChatProfile()
363 if err != nil {
364 return fmt.Errorf("failed to convert chat profile to streamplace chat profile: %w", err)
365 }
366 task.ChatProfile = spcp
367 }
368
369 case *streamplace.Key:
370 log.Debug(ctx, "creating key", "key", rec)
371 time, err := aqtime.FromString(rec.CreatedAt)
372 if err != nil {
373 return fmt.Errorf("failed to parse createdAt: %w", err)
374 }
375 key := model.SigningKey{
376 DID: rec.SigningKey,
377 RKey: rkey.String(),
378 CreatedAt: time.Time(),
379 RepoDID: userDID,
380 }
381 err = atsync.Model.UpdateSigningKey(&key)
382 if err != nil {
383 log.Error(ctx, "failed to create signing key", "err", err)
384 }
385
386 case *streamplace.BroadcastOrigin:
387 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
388 if err != nil {
389 return fmt.Errorf("failed to sync broadcast origin creator bluesky repo: %w", err)
390 }
391 _, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model)
392 if err != nil {
393 return fmt.Errorf("failed to sync broadcast origin streamer bluesky repo: %w", err)
394 }
395 err = atsync.Model.UpdateBroadcastOrigin(ctx, rec, aturi)
396 if err != nil {
397 log.Error(ctx, "failed to update broadcast origin", "err", err)
398 }
399 view := &streamplace.BroadcastDefs_BroadcastOriginView{
400 Uri: aturi.String(),
401 Cid: cid,
402 Author: &bsky.ActorDefs_ProfileViewBasic{
403 Did: userDID,
404 Handle: repo.Handle,
405 },
406 Record: &lexutil.LexiconTypeDecoder{Val: rec},
407 }
408 // publishes with an empty string because we're discovering the stream
409 go atsync.Bus.Publish("", view)
410
411 case *streamplace.MetadataConfiguration:
412 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
413 if err != nil {
414 return fmt.Errorf("failed to sync bluesky repo: %w", err)
415 }
416 log.Debug(ctx, "creating metadata configuration", "metadata", rec)
417 metadata := &model.MetadataConfiguration{
418 RepoDID: userDID,
419 Record: recCBOR,
420 Repo: repo,
421 }
422 err = atsync.Model.CreateMetadataConfiguration(ctx, metadata)
423 if err != nil {
424 log.Error(ctx, "failed to create metadata configuration", "err", err)
425 }
426
427 case *streamplace.ModerationPermission:
428 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
429 if err != nil {
430 return fmt.Errorf("failed to sync bluesky repo: %w", err)
431 }
432 log.Debug(ctx, "creating moderation delegation", "streamerDID", userDID, "moderatorDID", rec.Moderator)
433
434 err = atsync.Model.CreateModerationDelegation(ctx, rec, aturi)
435 if err != nil {
436 return fmt.Errorf("failed to create moderation delegation: %w", err)
437 }
438
439 view := &streamplace.ModerationDefs_PermissionView{
440 Uri: aturi.String(),
441 Cid: cid,
442 Author: &bsky.ActorDefs_ProfileViewBasic{
443 Did: userDID,
444 Handle: repo.Handle,
445 },
446 Record: &lexutil.LexiconTypeDecoder{Val: rec},
447 }
448 // Publish moderation permission view to WebSocket bus for real-time updates
449 // This allows moderators to see their permissions instantly without page refresh
450 go atsync.Bus.Publish(userDID, view)
451
452 case *streamplace.LiveRecommendations:
453 log.Debug(ctx, "creating recommendations", "userDID", userDID, "count", len(rec.Streamers))
454
455 // Validate max 8 streamers
456 if len(rec.Streamers) > 8 {
457 log.Warn(ctx, "recommendations exceed maximum of 8", "count", len(rec.Streamers))
458 return fmt.Errorf("maximum 8 recommendations allowed, got %d", len(rec.Streamers))
459 }
460
461 // Marshal streamers to JSON
462 streamersJSON, err := json.Marshal(rec.Streamers)
463 if err != nil {
464 return fmt.Errorf("failed to marshal streamers: %w", err)
465 }
466
467 // Parse createdAt timestamp
468 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt)
469 if err != nil {
470 return fmt.Errorf("failed to parse createdAt: %w", err)
471 }
472
473 recommendation := &model.Recommendation{
474 UserDID: userDID,
475 Streamers: json.RawMessage(streamersJSON),
476 CreatedAt: createdAt,
477 }
478
479 err = atsync.Model.UpsertRecommendation(recommendation)
480 if err != nil {
481 return fmt.Errorf("failed to upsert recommendation: %w", err)
482 }
483
484 default:
485 log.Debug(ctx, "unhandled record type", "type", reflect.TypeOf(rec))
486 }
487 return nil
488}