Live video on the AT Protocol
1package atproto
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "reflect"
8 "time"
9
10 "github.com/bluesky-social/indigo/api/bsky"
11 "github.com/bluesky-social/indigo/atproto/data"
12 "github.com/bluesky-social/indigo/atproto/syntax"
13 "stream.place/streamplace/pkg/aqtime"
14 "stream.place/streamplace/pkg/log"
15 "stream.place/streamplace/pkg/model"
16 "stream.place/streamplace/pkg/statedb"
17 "stream.place/streamplace/pkg/streamplace"
18
19 lexutil "github.com/bluesky-social/indigo/lex/util"
20)
21
22func (atsync *ATProtoSynchronizer) handleCreateUpdate(ctx context.Context, userDID string, rkey syntax.RecordKey, recCBOR *[]byte, cid string, collection syntax.NSID, isUpdate bool, isFirstSync bool) error {
23 ctx = log.WithLogValues(ctx, "func", "handleCreateUpdate", "userDID", userDID, "rkey", rkey.String(), "cid", cid, "collection", collection.String())
24 now := time.Now()
25 r, err := atsync.Model.GetRepo(userDID)
26 if err != nil {
27 return fmt.Errorf("failed to get repo: %w", err)
28 }
29 maybeATURI := fmt.Sprintf("at://%s/%s/%s", userDID, collection.String(), rkey.String())
30 aturi, err := syntax.ParseATURI(maybeATURI)
31 if err != nil {
32 return fmt.Errorf("failed to parse ATURI: %w", err)
33 }
34 d, err := data.UnmarshalCBOR(*recCBOR)
35 if err != nil {
36 return fmt.Errorf("failed to unmarhsal record CBOR: %w", err)
37 }
38 cb, err := lexutil.CborDecodeValue(*recCBOR)
39 if errors.Is(err, lexutil.ErrUnrecognizedType) {
40 log.Debug(ctx, "unrecognized record type", "key", rkey.String(), "type", err)
41 return nil
42 } else if err != nil {
43 return fmt.Errorf("failed to decode record CBOR: %w", err)
44 }
45 switch rec := cb.(type) {
46 case *bsky.GraphFollow:
47 if r == nil {
48 // someone we don't know about
49 return nil
50 }
51 log.Debug(ctx, "creating follow", "userDID", userDID, "subjectDID", rec.Subject)
52 err := atsync.Model.CreateFollow(ctx, userDID, rkey.String(), rec)
53 if err != nil {
54 log.Debug(ctx, "failed to create follow", "err", err)
55 }
56
57 case *bsky.GraphBlock:
58 if r == nil {
59 // someone we don't know about
60 return nil
61 }
62 log.Debug(ctx, "creating block", "userDID", userDID, "subjectDID", rec.Subject)
63 block := &model.Block{
64 RKey: rkey.String(),
65 RepoDID: userDID,
66 SubjectDID: rec.Subject,
67 Record: *recCBOR,
68 CID: cid,
69 }
70 err := atsync.Model.CreateBlock(ctx, block)
71 if err != nil {
72 return fmt.Errorf("failed to create block: %w", err)
73 }
74 block, err = atsync.Model.GetBlock(ctx, rkey.String())
75 if err != nil {
76 return fmt.Errorf("failed to get block after we just saved it?!: %w", err)
77 }
78 streamplaceBlock, err := block.ToStreamplaceBlock()
79 if err != nil {
80 return fmt.Errorf("failed to convert block to streamplace block: %w", err)
81 }
82 go atsync.Bus.Publish(userDID, streamplaceBlock)
83
84 case *streamplace.ChatMessage:
85 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
86 if err != nil {
87 return fmt.Errorf("failed to sync bluesky repo: %w", err)
88 }
89
90 go func() {
91 _, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model)
92 if err != nil {
93 log.Error(ctx, "failed to sync bluesky repo", "err", err)
94 }
95 }()
96
97 log.Debug(ctx, "streamplace.ChatMessage detected", "message", rec.Text, "repo", repo.Handle)
98 block, err := atsync.Model.GetUserBlock(ctx, rec.Streamer, userDID)
99 if err != nil {
100 return fmt.Errorf("failed to get user block: %w", err)
101 }
102 if block != nil {
103 log.Debug(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", rec.Streamer)
104 return nil
105 }
106 mcm := &model.ChatMessage{
107 CID: cid,
108 URI: aturi.String(),
109 CreatedAt: now,
110 ChatMessage: recCBOR,
111 RepoDID: userDID,
112 Repo: repo,
113 StreamerRepoDID: rec.Streamer,
114 IndexedAt: &now,
115 }
116 if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil {
117 mcm.ReplyToCID = &rec.Reply.Parent.Cid
118 }
119 err = atsync.Model.CreateChatMessage(ctx, mcm)
120 if err != nil {
121 log.Error(ctx, "failed to create chat message", "err", err)
122 }
123 mcm, err = atsync.Model.GetChatMessage(cid)
124 if err != nil {
125 log.Error(ctx, "failed to get just-saved chat message", "err", err)
126 }
127 if mcm == nil {
128 log.Error(ctx, "failed to retrieve just-saved chat message", "err", err)
129 return nil
130 }
131 scm, err := mcm.ToStreamplaceMessageView()
132 if err != nil {
133 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err)
134 }
135 go atsync.Bus.Publish(rec.Streamer, scm)
136
137 if !isUpdate && !isFirstSync {
138
139 task := &statedb.ChatTask{
140 MessageView: scm,
141 }
142
143 _, err = atsync.StatefulDB.EnqueueTask(ctx, statedb.TaskChat, task, statedb.WithTaskKey(fmt.Sprintf("chat-message::%s", aturi.String())))
144 if err != nil {
145 log.Error(ctx, "failed to enqueue notification task", "err", err)
146 }
147 }
148
149 case *streamplace.ChatGate:
150 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
151 if err != nil {
152 return fmt.Errorf("failed to sync bluesky repo: %w", err)
153 }
154 if r == nil {
155 // someone we don't know about
156 return nil
157 }
158 log.Debug(ctx, "creating gate", "userDID", userDID, "hiddenMessage", rec.HiddenMessage)
159 gate := &model.Gate{
160 RKey: rkey.String(),
161 RepoDID: userDID,
162 HiddenMessage: rec.HiddenMessage,
163 CID: cid,
164 CreatedAt: now,
165 Repo: repo,
166 }
167 err = atsync.Model.CreateGate(ctx, gate)
168 if err != nil {
169 return fmt.Errorf("failed to create gate: %w", err)
170 }
171 gate, err = atsync.Model.GetGate(ctx, rkey.String())
172 if err != nil {
173 return fmt.Errorf("failed to get gate after we just saved it?!: %w", err)
174 }
175 streamplaceGate, err := gate.ToStreamplaceGate()
176 if err != nil {
177 return fmt.Errorf("failed to convert gate to streamplace gate: %w", err)
178 }
179 go atsync.Bus.Publish(userDID, streamplaceGate)
180
181 case *streamplace.ChatProfile:
182 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
183 if err != nil {
184 return fmt.Errorf("failed to sync bluesky repo: %w", err)
185 }
186 mcm := &model.ChatProfile{
187 RepoDID: userDID,
188 Repo: repo,
189 Record: recCBOR,
190 }
191 err = atsync.Model.CreateChatProfile(ctx, mcm)
192 if err != nil {
193 log.Error(ctx, "failed to create chat profile", "err", err)
194 }
195
196 case *streamplace.ServerSettings:
197 _, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
198 if err != nil {
199 return fmt.Errorf("failed to sync bluesky repo: %w", err)
200 }
201 settings := &model.ServerSettings{
202 Server: rkey.String(),
203 RepoDID: userDID,
204 Record: recCBOR,
205 }
206 err = atsync.Model.UpdateServerSettings(ctx, settings)
207 if err != nil {
208 log.Error(ctx, "failed to create server settings", "err", err)
209 }
210
211 case *bsky.FeedPost:
212 // jsonData, err := json.Marshal(d)
213 // if err != nil {
214 // log.Error(ctx, "failed to marshal record data", "err", err)
215 // } else {
216 // log.Log(ctx, "record data", "json", string(jsonData))
217 // }
218
219 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt)
220 if err != nil {
221 return fmt.Errorf("failed to parse createdAt: %w", err)
222 }
223
224 if livestream, ok := d["place.stream.livestream"]; ok {
225 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
226 if err != nil {
227 return fmt.Errorf("failed to sync bluesky repo: %w", err)
228 }
229 livestream, ok := livestream.(map[string]interface{})
230 if !ok {
231 return fmt.Errorf("livestream is not a map")
232 }
233 url, ok := livestream["url"].(string)
234 if !ok {
235 return fmt.Errorf("livestream url is not a string")
236 }
237 log.Debug(ctx, "livestream url", "url", url)
238 if err := atsync.Model.CreateFeedPost(ctx, &model.FeedPost{
239 CID: cid,
240 CreatedAt: createdAt,
241 FeedPost: recCBOR,
242 RepoDID: userDID,
243 Repo: repo,
244 Type: "livestream",
245 URI: aturi.String(),
246 IndexedAt: &now,
247 }); err != nil {
248 return fmt.Errorf("failed to create bluesky post: %w", err)
249 }
250 } else {
251 if rec.Reply == nil || rec.Reply.Root == nil {
252 return nil
253 }
254 livestream, err := atsync.Model.GetLivestreamByPostCID(rec.Reply.Root.Cid)
255 if err != nil {
256 return fmt.Errorf("failed to get livestream: %w", err)
257 }
258 if livestream == nil {
259 return nil
260 }
261 // log.Warn(ctx, "chat message detected", "uri", livestream.URI)
262 // if this post is a reply to someone's livestream post
263 // log.Warn(ctx, "chat message detected", "message", rec.Text)
264 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
265 if err != nil {
266 return fmt.Errorf("failed to sync bluesky repo: %w", err)
267 }
268
269 // log.Warn(ctx, "chat message detected", "message", rec.Text, "repo", repo.Handle)
270 block, err := atsync.Model.GetUserBlock(ctx, livestream.RepoDID, userDID)
271 if err != nil {
272 return fmt.Errorf("failed to get user block: %w", err)
273 }
274 if block != nil {
275 log.Warn(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", livestream.RepoDID)
276 return nil
277 }
278 // if fc.cli.PrintChat {
279 // fmt.Printf("@%s%s %s\n", blue.Sprintf(repo.Handle), green.Sprintf(":"), rec.Text)
280 // }
281 fp := &model.FeedPost{
282 CID: cid,
283 CreatedAt: createdAt,
284 FeedPost: recCBOR,
285 RepoDID: userDID,
286 Type: "reply",
287 Repo: repo,
288 ReplyRootCID: &livestream.PostCID,
289 ReplyRootRepoDID: &livestream.RepoDID,
290 URI: aturi.String(),
291 IndexedAt: &now,
292 }
293 err = atsync.Model.CreateFeedPost(ctx, fp)
294 if err != nil {
295 log.Error(ctx, "failed to create feed post", "err", err)
296 }
297 postView, err := fp.ToBskyPostView()
298 if err != nil {
299 log.Error(ctx, "failed to convert feed post to bsky post view", "err", err)
300 }
301 go atsync.Bus.Publish(livestream.RepoDID, postView)
302 }
303
304 case *streamplace.Livestream:
305 if r == nil {
306 // we don't know about this repo
307 return nil
308 }
309 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt)
310 if err != nil {
311 log.Error(ctx, "failed to parse createdAt", "err", err)
312 return nil
313 }
314 ls := &model.Livestream{
315 CID: cid,
316 URI: aturi.String(),
317 CreatedAt: createdAt,
318 Livestream: recCBOR,
319 RepoDID: userDID,
320 }
321 if rec.Post != nil {
322 ls.PostCID = rec.Post.Cid
323 ls.PostURI = rec.Post.Uri
324 }
325 err = atsync.Model.CreateLivestream(ctx, ls)
326 if err != nil {
327 return fmt.Errorf("failed to create livestream: %w", err)
328 }
329 lsHydrated, err := atsync.Model.GetLatestLivestreamForRepo(userDID)
330 if err != nil {
331 return fmt.Errorf("failed to get latest livestream for repo: %w", err)
332 }
333 lsv, err := lsHydrated.ToLivestreamView()
334 if err != nil {
335 return fmt.Errorf("failed to convert livestream to bsky post view: %w", err)
336 }
337 go atsync.Bus.Publish(userDID, lsv)
338
339 var postView *bsky.FeedDefs_PostView
340 if lsHydrated.Post != nil {
341 postView, err = lsHydrated.Post.ToBskyPostView()
342 if err != nil {
343 return fmt.Errorf("failed to convert livestream post to bsky post view: %w", err)
344 }
345 }
346
347 task := &statedb.NotificationTask{
348 Livestream: lsv,
349 FeedPost: postView,
350 PDSURL: r.PDS,
351 }
352
353 cp, err := atsync.Model.GetChatProfile(ctx, userDID)
354 if err != nil {
355 return fmt.Errorf("failed to get chat profile: %w", err)
356 }
357 if cp != nil {
358 spcp, err := cp.ToStreamplaceChatProfile()
359 if err != nil {
360 return fmt.Errorf("failed to convert chat profile to streamplace chat profile: %w", err)
361 }
362 task.ChatProfile = spcp
363 }
364
365 if !isUpdate && !isFirstSync {
366 _, err = atsync.StatefulDB.EnqueueTask(ctx, statedb.TaskNotification, task, statedb.WithTaskKey(fmt.Sprintf("notification-blast::%s", aturi.String())))
367 if err != nil {
368 log.Error(ctx, "failed to enqueue notification task", "err", err)
369 }
370 }
371
372 case *streamplace.Key:
373 log.Debug(ctx, "creating key", "key", rec)
374 time, err := aqtime.FromString(rec.CreatedAt)
375 if err != nil {
376 return fmt.Errorf("failed to parse createdAt: %w", err)
377 }
378 key := model.SigningKey{
379 DID: rec.SigningKey,
380 RKey: rkey.String(),
381 CreatedAt: time.Time(),
382 RepoDID: userDID,
383 }
384 err = atsync.Model.UpdateSigningKey(&key)
385 if err != nil {
386 log.Error(ctx, "failed to create signing key", "err", err)
387 }
388
389 default:
390 log.Debug(ctx, "unhandled record type", "type", reflect.TypeOf(rec))
391 }
392 return nil
393}