Live video on the AT Protocol
1package atproto
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "reflect"
8 "time"
9
10 "github.com/bluesky-social/indigo/api/bsky"
11 "github.com/bluesky-social/indigo/atproto/data"
12 "github.com/bluesky-social/indigo/atproto/syntax"
13 "stream.place/streamplace/pkg/aqtime"
14 "stream.place/streamplace/pkg/log"
15 "stream.place/streamplace/pkg/model"
16 "stream.place/streamplace/pkg/statedb"
17 "stream.place/streamplace/pkg/streamplace"
18
19 lexutil "github.com/bluesky-social/indigo/lex/util"
20)
21
22func (atsync *ATProtoSynchronizer) handleCreateUpdate(ctx context.Context, userDID string, rkey syntax.RecordKey, recCBOR *[]byte, cid string, collection syntax.NSID, isUpdate bool, isFirstSync bool) error {
23 ctx = log.WithLogValues(ctx, "func", "handleCreateUpdate", "userDID", userDID, "rkey", rkey.String(), "cid", cid, "collection", collection.String())
24 now := time.Now()
25 r, err := atsync.Model.GetRepo(userDID)
26 if err != nil {
27 return fmt.Errorf("failed to get repo: %w", err)
28 }
29 maybeATURI := fmt.Sprintf("at://%s/%s/%s", userDID, collection.String(), rkey.String())
30 aturi, err := syntax.ParseATURI(maybeATURI)
31 if err != nil {
32 return fmt.Errorf("failed to parse ATURI: %w", err)
33 }
34 d, err := data.UnmarshalCBOR(*recCBOR)
35 if err != nil {
36 return fmt.Errorf("failed to unmarhsal record CBOR: %w", err)
37 }
38 cb, err := lexutil.CborDecodeValue(*recCBOR)
39 if errors.Is(err, lexutil.ErrUnrecognizedType) {
40 log.Debug(ctx, "unrecognized record type", "key", rkey.String(), "type", err)
41 return nil
42 } else if err != nil {
43 return fmt.Errorf("failed to decode record CBOR: %w", err)
44 }
45 switch rec := cb.(type) {
46 case *bsky.GraphFollow:
47 if r == nil {
48 // someone we don't know about
49 return nil
50 }
51 log.Debug(ctx, "creating follow", "userDID", userDID, "subjectDID", rec.Subject)
52 err := atsync.Model.CreateFollow(ctx, userDID, rkey.String(), rec)
53 if err != nil {
54 log.Debug(ctx, "failed to create follow", "err", err)
55 }
56
57 case *bsky.GraphBlock:
58 if r == nil {
59 // someone we don't know about
60 return nil
61 }
62 log.Debug(ctx, "creating block", "userDID", userDID, "subjectDID", rec.Subject)
63 block := &model.Block{
64 RKey: rkey.String(),
65 RepoDID: userDID,
66 SubjectDID: rec.Subject,
67 Record: *recCBOR,
68 CID: cid,
69 }
70 err := atsync.Model.CreateBlock(ctx, block)
71 if err != nil {
72 return fmt.Errorf("failed to create block: %w", err)
73 }
74 block, err = atsync.Model.GetBlock(ctx, rkey.String())
75 if err != nil {
76 return fmt.Errorf("failed to get block after we just saved it?!: %w", err)
77 }
78 streamplaceBlock, err := block.ToStreamplaceBlock()
79 if err != nil {
80 return fmt.Errorf("failed to convert block to streamplace block: %w", err)
81 }
82 go atsync.Bus.Publish(userDID, streamplaceBlock)
83
84 case *streamplace.ChatMessage:
85 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
86 if err != nil {
87 return fmt.Errorf("failed to sync bluesky repo: %w", err)
88 }
89
90 go func() {
91 _, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model)
92 if err != nil {
93 log.Error(ctx, "failed to sync bluesky repo", "err", err)
94 }
95 }()
96
97 log.Debug(ctx, "streamplace.ChatMessage detected", "message", rec.Text, "repo", repo.Handle)
98 block, err := atsync.Model.GetUserBlock(ctx, rec.Streamer, userDID)
99 if err != nil {
100 return fmt.Errorf("failed to get user block: %w", err)
101 }
102 if block != nil {
103 log.Debug(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", rec.Streamer)
104 return nil
105 }
106 mcm := &model.ChatMessage{
107 CID: cid,
108 URI: aturi.String(),
109 CreatedAt: now,
110 ChatMessage: recCBOR,
111 RepoDID: userDID,
112 Repo: repo,
113 StreamerRepoDID: rec.Streamer,
114 IndexedAt: &now,
115 }
116 if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil {
117 mcm.ReplyToCID = &rec.Reply.Parent.Cid
118 }
119 err = atsync.Model.CreateChatMessage(ctx, mcm)
120 if err != nil {
121 log.Error(ctx, "failed to create chat message", "err", err)
122 return nil
123 }
124 mcm, err = atsync.Model.GetChatMessage(aturi.String())
125 if err != nil {
126 log.Error(ctx, "failed to get just-saved chat message", "err", err)
127 return nil
128 }
129 if mcm == nil {
130 log.Error(ctx, "failed to retrieve just-saved chat message", "err", err)
131 return nil
132 }
133 scm, err := mcm.ToStreamplaceMessageView()
134 if err != nil {
135 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err)
136 return nil
137 }
138 go atsync.Bus.Publish(rec.Streamer, scm)
139
140 if !isUpdate && !isFirstSync {
141
142 task := &statedb.ChatTask{
143 MessageView: scm,
144 }
145
146 _, err = atsync.StatefulDB.EnqueueTask(ctx, statedb.TaskChat, task, statedb.WithTaskKey(fmt.Sprintf("chat-message::%s", aturi.String())))
147 if err != nil {
148 log.Error(ctx, "failed to enqueue notification task", "err", err)
149 }
150 }
151
152 case *streamplace.ChatGate:
153 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
154 if err != nil {
155 return fmt.Errorf("failed to sync bluesky repo: %w", err)
156 }
157 if r == nil {
158 // someone we don't know about
159 return nil
160 }
161 log.Debug(ctx, "creating gate", "userDID", userDID, "hiddenMessage", rec.HiddenMessage)
162 gate := &model.Gate{
163 RKey: rkey.String(),
164 RepoDID: userDID,
165 HiddenMessage: rec.HiddenMessage,
166 CID: cid,
167 CreatedAt: now,
168 Repo: repo,
169 }
170 err = atsync.Model.CreateGate(ctx, gate)
171 if err != nil {
172 return fmt.Errorf("failed to create gate: %w", err)
173 }
174 gate, err = atsync.Model.GetGate(ctx, rkey.String())
175 if err != nil {
176 return fmt.Errorf("failed to get gate after we just saved it?!: %w", err)
177 }
178 streamplaceGate, err := gate.ToStreamplaceGate()
179 if err != nil {
180 return fmt.Errorf("failed to convert gate to streamplace gate: %w", err)
181 }
182 go atsync.Bus.Publish(userDID, streamplaceGate)
183
184 case *streamplace.ChatProfile:
185 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
186 if err != nil {
187 return fmt.Errorf("failed to sync bluesky repo: %w", err)
188 }
189 mcm := &model.ChatProfile{
190 RepoDID: userDID,
191 Repo: repo,
192 Record: recCBOR,
193 }
194 err = atsync.Model.CreateChatProfile(ctx, mcm)
195 if err != nil {
196 log.Error(ctx, "failed to create chat profile", "err", err)
197 }
198
199 case *streamplace.ServerSettings:
200 _, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
201 if err != nil {
202 return fmt.Errorf("failed to sync bluesky repo: %w", err)
203 }
204 settings := &model.ServerSettings{
205 Server: rkey.String(),
206 RepoDID: userDID,
207 Record: recCBOR,
208 }
209 err = atsync.Model.UpdateServerSettings(ctx, settings)
210 if err != nil {
211 log.Error(ctx, "failed to create server settings", "err", err)
212 }
213
214 case *bsky.FeedPost:
215 // jsonData, err := json.Marshal(d)
216 // if err != nil {
217 // log.Error(ctx, "failed to marshal record data", "err", err)
218 // } else {
219 // log.Log(ctx, "record data", "json", string(jsonData))
220 // }
221
222 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt)
223 if err != nil {
224 return fmt.Errorf("failed to parse createdAt: %w", err)
225 }
226
227 if livestream, ok := d["place.stream.livestream"]; ok {
228 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
229 if err != nil {
230 return fmt.Errorf("failed to sync bluesky repo: %w", err)
231 }
232 livestream, ok := livestream.(map[string]interface{})
233 if !ok {
234 return fmt.Errorf("livestream is not a map")
235 }
236 url, ok := livestream["url"].(string)
237 if !ok {
238 return fmt.Errorf("livestream url is not a string")
239 }
240 log.Debug(ctx, "livestream url", "url", url)
241 if err := atsync.Model.CreateFeedPost(ctx, &model.FeedPost{
242 CID: cid,
243 CreatedAt: createdAt,
244 FeedPost: recCBOR,
245 RepoDID: userDID,
246 Repo: repo,
247 Type: "livestream",
248 URI: aturi.String(),
249 IndexedAt: &now,
250 }); err != nil {
251 return fmt.Errorf("failed to create bluesky post: %w", err)
252 }
253 } else {
254 if rec.Reply == nil || rec.Reply.Root == nil {
255 return nil
256 }
257 livestream, err := atsync.Model.GetLivestreamByPostURI(rec.Reply.Root.Uri)
258 if err != nil {
259 return fmt.Errorf("failed to get livestream: %w", err)
260 }
261 if livestream == nil {
262 return nil
263 }
264 // log.Warn(ctx, "chat message detected", "uri", livestream.URI)
265 // if this post is a reply to someone's livestream post
266 // log.Warn(ctx, "chat message detected", "message", rec.Text)
267 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
268 if err != nil {
269 return fmt.Errorf("failed to sync bluesky repo: %w", err)
270 }
271
272 // log.Warn(ctx, "chat message detected", "message", rec.Text, "repo", repo.Handle)
273 block, err := atsync.Model.GetUserBlock(ctx, livestream.RepoDID, userDID)
274 if err != nil {
275 return fmt.Errorf("failed to get user block: %w", err)
276 }
277 if block != nil {
278 log.Warn(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", livestream.RepoDID)
279 return nil
280 }
281 // if fc.cli.PrintChat {
282 // fmt.Printf("@%s%s %s\n", blue.Sprintf(repo.Handle), green.Sprintf(":"), rec.Text)
283 // }
284 fp := &model.FeedPost{
285 CID: cid,
286 CreatedAt: createdAt,
287 FeedPost: recCBOR,
288 RepoDID: userDID,
289 Type: "reply",
290 Repo: repo,
291 ReplyRootURI: &livestream.PostURI,
292 ReplyRootRepoDID: &livestream.RepoDID,
293 URI: aturi.String(),
294 IndexedAt: &now,
295 }
296 err = atsync.Model.CreateFeedPost(ctx, fp)
297 if err != nil {
298 log.Error(ctx, "failed to create feed post", "err", err)
299 }
300 postView, err := fp.ToBskyPostView()
301 if err != nil {
302 log.Error(ctx, "failed to convert feed post to bsky post view", "err", err)
303 }
304 go atsync.Bus.Publish(livestream.RepoDID, postView)
305 }
306
307 case *streamplace.Livestream:
308 if r == nil {
309 // we don't know about this repo
310 return nil
311 }
312 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt)
313 if err != nil {
314 log.Error(ctx, "failed to parse createdAt", "err", err)
315 return nil
316 }
317 ls := &model.Livestream{
318 CID: cid,
319 URI: aturi.String(),
320 CreatedAt: createdAt,
321 Livestream: recCBOR,
322 RepoDID: userDID,
323 }
324 if rec.Post != nil {
325 ls.PostCID = rec.Post.Cid
326 ls.PostURI = rec.Post.Uri
327 }
328 err = atsync.Model.CreateLivestream(ctx, ls)
329 if err != nil {
330 return fmt.Errorf("failed to create livestream: %w", err)
331 }
332 lsHydrated, err := atsync.Model.GetLatestLivestreamForRepo(userDID)
333 if err != nil {
334 return fmt.Errorf("failed to get latest livestream for repo: %w", err)
335 }
336 lsv, err := lsHydrated.ToLivestreamView()
337 if err != nil {
338 return fmt.Errorf("failed to convert livestream to bsky post view: %w", err)
339 }
340 go atsync.Bus.Publish(userDID, lsv)
341
342 var postView *bsky.FeedDefs_PostView
343 if lsHydrated.Post != nil {
344 postView, err = lsHydrated.Post.ToBskyPostView()
345 if err != nil {
346 return fmt.Errorf("failed to convert livestream post to bsky post view: %w", err)
347 }
348 }
349
350 task := &statedb.NotificationTask{
351 Livestream: lsv,
352 FeedPost: postView,
353 PDSURL: r.PDS,
354 }
355
356 cp, err := atsync.Model.GetChatProfile(ctx, userDID)
357 if err != nil {
358 return fmt.Errorf("failed to get chat profile: %w", err)
359 }
360 if cp != nil {
361 spcp, err := cp.ToStreamplaceChatProfile()
362 if err != nil {
363 return fmt.Errorf("failed to convert chat profile to streamplace chat profile: %w", err)
364 }
365 task.ChatProfile = spcp
366 }
367
368 if !isUpdate && !isFirstSync {
369 _, err = atsync.StatefulDB.EnqueueTask(ctx, statedb.TaskNotification, task, statedb.WithTaskKey(fmt.Sprintf("notification-blast::%s", aturi.String())))
370 if err != nil {
371 log.Error(ctx, "failed to enqueue notification task", "err", err)
372 }
373 }
374
375 case *streamplace.Key:
376 log.Debug(ctx, "creating key", "key", rec)
377 time, err := aqtime.FromString(rec.CreatedAt)
378 if err != nil {
379 return fmt.Errorf("failed to parse createdAt: %w", err)
380 }
381 key := model.SigningKey{
382 DID: rec.SigningKey,
383 RKey: rkey.String(),
384 CreatedAt: time.Time(),
385 RepoDID: userDID,
386 }
387 err = atsync.Model.UpdateSigningKey(&key)
388 if err != nil {
389 log.Error(ctx, "failed to create signing key", "err", err)
390 }
391
392 case *streamplace.BroadcastOrigin:
393 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
394 if err != nil {
395 return fmt.Errorf("failed to sync broadcast origin creator bluesky repo: %w", err)
396 }
397 _, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model)
398 if err != nil {
399 return fmt.Errorf("failed to sync broadcast origin streamer bluesky repo: %w", err)
400 }
401 err = atsync.Model.UpdateBroadcastOrigin(ctx, rec, aturi)
402 if err != nil {
403 log.Error(ctx, "failed to update broadcast origin", "err", err)
404 }
405 view := &streamplace.BroadcastDefs_BroadcastOriginView{
406 Uri: aturi.String(),
407 Cid: cid,
408 Author: &bsky.ActorDefs_ProfileViewBasic{
409 Did: userDID,
410 Handle: repo.Handle,
411 },
412 Record: &lexutil.LexiconTypeDecoder{Val: rec},
413 }
414 // publishes with an empty string because we're discovering the stream
415 go atsync.Bus.Publish("", view)
416
417 case *streamplace.MetadataConfiguration:
418 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
419 if err != nil {
420 return fmt.Errorf("failed to sync bluesky repo: %w", err)
421 }
422 log.Debug(ctx, "creating metadata configuration", "metadata", rec)
423 metadata := &model.MetadataConfiguration{
424 RepoDID: userDID,
425 Record: recCBOR,
426 Repo: repo,
427 }
428 err = atsync.Model.CreateMetadataConfiguration(ctx, metadata)
429 if err != nil {
430 log.Error(ctx, "failed to create metadata configuration", "err", err)
431 }
432
433 default:
434 log.Debug(ctx, "unhandled record type", "type", reflect.TypeOf(rec))
435 }
436 return nil
437}