Live video on the AT Protocol
1package atproto
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "reflect"
8 "time"
9
10 "github.com/bluesky-social/indigo/api/bsky"
11 "github.com/bluesky-social/indigo/atproto/data"
12 "github.com/bluesky-social/indigo/atproto/syntax"
13 "stream.place/streamplace/pkg/aqtime"
14 "stream.place/streamplace/pkg/log"
15 "stream.place/streamplace/pkg/model"
16 notificationpkg "stream.place/streamplace/pkg/notifications"
17 "stream.place/streamplace/pkg/streamplace"
18
19 lexutil "github.com/bluesky-social/indigo/lex/util"
20)
21
22func (atsync *ATProtoSynchronizer) handleCreateUpdate(ctx context.Context, userDID string, rkey syntax.RecordKey, recCBOR *[]byte, cid string, collection syntax.NSID, isUpdate bool) error {
23 ctx = log.WithLogValues(ctx, "func", "handleCreateUpdate", "userDID", userDID, "rkey", rkey.String(), "cid", cid, "collection", collection.String())
24 now := time.Now()
25 r, err := atsync.Model.GetRepo(userDID)
26 if err != nil {
27 return fmt.Errorf("failed to get repo: %w", err)
28 }
29 maybeATURI := fmt.Sprintf("at://%s/%s/%s", userDID, collection.String(), rkey.String())
30 aturi, err := syntax.ParseATURI(maybeATURI)
31 if err != nil {
32 return fmt.Errorf("failed to parse ATURI: %w", err)
33 }
34 d, err := data.UnmarshalCBOR(*recCBOR)
35 if err != nil {
36 return fmt.Errorf("failed to unmarhsal record CBOR: %w", err)
37 }
38 cb, err := lexutil.CborDecodeValue(*recCBOR)
39 if errors.Is(err, lexutil.ErrUnrecognizedType) {
40 log.Debug(ctx, "unrecognized record type", "key", rkey.String(), "type", err)
41 return nil
42 } else if err != nil {
43 return fmt.Errorf("failed to decode record CBOR: %w", err)
44 }
45 switch rec := cb.(type) {
46 case *bsky.GraphFollow:
47 if r == nil {
48 // someone we don't know about
49 return nil
50 }
51 log.Debug(ctx, "creating follow", "userDID", userDID, "subjectDID", rec.Subject)
52 err := atsync.Model.CreateFollow(ctx, userDID, rkey.String(), rec)
53 if err != nil {
54 log.Debug(ctx, "failed to create follow", "err", err)
55 }
56
57 case *bsky.GraphBlock:
58 if r == nil {
59 // someone we don't know about
60 return nil
61 }
62 log.Debug(ctx, "creating block", "userDID", userDID, "subjectDID", rec.Subject)
63 block := &model.Block{
64 RKey: rkey.String(),
65 RepoDID: userDID,
66 SubjectDID: rec.Subject,
67 Record: *recCBOR,
68 CID: cid,
69 }
70 err := atsync.Model.CreateBlock(ctx, block)
71 if err != nil {
72 return fmt.Errorf("failed to create block: %w", err)
73 }
74 block, err = atsync.Model.GetBlock(ctx, rkey.String())
75 if err != nil {
76 return fmt.Errorf("failed to get block after we just saved it?!: %w", err)
77 }
78 streamplaceBlock, err := block.ToStreamplaceBlock()
79 if err != nil {
80 return fmt.Errorf("failed to convert block to streamplace block: %w", err)
81 }
82 go atsync.Bus.Publish(userDID, streamplaceBlock)
83
84 case *streamplace.ChatMessage:
85 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
86 if err != nil {
87 return fmt.Errorf("failed to sync bluesky repo: %w", err)
88 }
89 streamerRepo, err := atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model)
90 if err != nil {
91 return fmt.Errorf("failed to sync bluesky repo: %w", err)
92 }
93 log.Debug(ctx, "streamplace.ChatMessage detected", "message", rec.Text, "repo", repo.Handle)
94 block, err := atsync.Model.GetUserBlock(ctx, streamerRepo.DID, userDID)
95 if err != nil {
96 return fmt.Errorf("failed to get user block: %w", err)
97 }
98 if block != nil {
99 log.Debug(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", streamerRepo.DID)
100 return nil
101 }
102 mcm := &model.ChatMessage{
103 CID: cid,
104 URI: aturi.String(),
105 CreatedAt: now,
106 ChatMessage: recCBOR,
107 RepoDID: userDID,
108 Repo: repo,
109 StreamerRepoDID: streamerRepo.DID,
110 IndexedAt: &now,
111 }
112 if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil {
113 mcm.ReplyToCID = &rec.Reply.Parent.Cid
114 }
115 err = atsync.Model.CreateChatMessage(ctx, mcm)
116 if err != nil {
117 log.Error(ctx, "failed to create chat message", "err", err)
118 }
119 mcm, err = atsync.Model.GetChatMessage(cid)
120 if err != nil {
121 log.Error(ctx, "failed to get just-saved chat message", "err", err)
122 }
123 if mcm == nil {
124 log.Error(ctx, "failed to retrieve just-saved chat message", "err", err)
125 return nil
126 }
127 scm, err := mcm.ToStreamplaceMessageView()
128 if err != nil {
129 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err)
130 }
131 go atsync.Bus.Publish(streamerRepo.DID, scm)
132
133 case *streamplace.ChatProfile:
134 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
135 if err != nil {
136 return fmt.Errorf("failed to sync bluesky repo: %w", err)
137 }
138 mcm := &model.ChatProfile{
139 RepoDID: userDID,
140 Repo: repo,
141 Record: recCBOR,
142 }
143 err = atsync.Model.CreateChatProfile(ctx, mcm)
144 if err != nil {
145 log.Error(ctx, "failed to create chat profile", "err", err)
146 }
147
148 case *bsky.FeedPost:
149 // jsonData, err := json.Marshal(d)
150 // if err != nil {
151 // log.Error(ctx, "failed to marshal record data", "err", err)
152 // } else {
153 // log.Log(ctx, "record data", "json", string(jsonData))
154 // }
155
156 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt)
157 if err != nil {
158 return fmt.Errorf("failed to parse createdAt: %w", err)
159 }
160
161 if livestream, ok := d["place.stream.livestream"]; ok {
162 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
163 if err != nil {
164 return fmt.Errorf("failed to sync bluesky repo: %w", err)
165 }
166 livestream, ok := livestream.(map[string]interface{})
167 if !ok {
168 return fmt.Errorf("livestream is not a map")
169 }
170 url, ok := livestream["url"].(string)
171 if !ok {
172 return fmt.Errorf("livestream url is not a string")
173 }
174 log.Debug(ctx, "livestream url", "url", url)
175 if err := atsync.Model.CreateFeedPost(ctx, &model.FeedPost{
176 CID: cid,
177 CreatedAt: createdAt,
178 FeedPost: recCBOR,
179 RepoDID: userDID,
180 Repo: repo,
181 Type: "livestream",
182 URI: aturi.String(),
183 IndexedAt: &now,
184 }); err != nil {
185 return fmt.Errorf("failed to create bluesky post: %w", err)
186 }
187 } else {
188 if rec.Reply == nil || rec.Reply.Root == nil {
189 return nil
190 }
191 livestream, err := atsync.Model.GetLivestreamByPostCID(rec.Reply.Root.Cid)
192 if err != nil {
193 return fmt.Errorf("failed to get livestream: %w", err)
194 }
195 if livestream == nil {
196 return nil
197 }
198 // log.Warn(ctx, "chat message detected", "uri", livestream.URI)
199 // if this post is a reply to someone's livestream post
200 // log.Warn(ctx, "chat message detected", "message", rec.Text)
201 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
202 if err != nil {
203 return fmt.Errorf("failed to sync bluesky repo: %w", err)
204 }
205
206 // log.Warn(ctx, "chat message detected", "message", rec.Text, "repo", repo.Handle)
207 block, err := atsync.Model.GetUserBlock(ctx, livestream.RepoDID, userDID)
208 if err != nil {
209 return fmt.Errorf("failed to get user block: %w", err)
210 }
211 if block != nil {
212 log.Warn(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", livestream.RepoDID)
213 return nil
214 }
215 // if fc.cli.PrintChat {
216 // fmt.Printf("@%s%s %s\n", blue.Sprintf(repo.Handle), green.Sprintf(":"), rec.Text)
217 // }
218 fp := &model.FeedPost{
219 CID: cid,
220 CreatedAt: createdAt,
221 FeedPost: recCBOR,
222 RepoDID: userDID,
223 Type: "reply",
224 Repo: repo,
225 ReplyRootCID: &livestream.PostCID,
226 ReplyRootRepoDID: &livestream.RepoDID,
227 URI: aturi.String(),
228 IndexedAt: &now,
229 }
230 err = atsync.Model.CreateFeedPost(ctx, fp)
231 if err != nil {
232 log.Error(ctx, "failed to create feed post", "err", err)
233 }
234 postView, err := fp.ToBskyPostView()
235 if err != nil {
236 log.Error(ctx, "failed to convert feed post to bsky post view", "err", err)
237 }
238 go atsync.Bus.Publish(livestream.RepoDID, postView)
239 }
240
241 case *streamplace.Livestream:
242 var u string
243 if rec.Url != nil {
244 u = *rec.Url
245 }
246 if r == nil {
247 // we don't know about this repo
248 return nil
249 }
250 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt)
251 if err != nil {
252 log.Error(ctx, "failed to parse createdAt", "err", err)
253 return nil
254 }
255 ls := &model.Livestream{
256 CID: cid,
257 URI: aturi.String(),
258 CreatedAt: createdAt,
259 Livestream: recCBOR,
260 RepoDID: userDID,
261 }
262 if rec.Post != nil {
263 ls.PostCID = rec.Post.Cid
264 ls.PostURI = rec.Post.Uri
265 }
266 err = atsync.Model.CreateLivestream(ctx, ls)
267 if err != nil {
268 return fmt.Errorf("failed to create livestream: %w", err)
269 }
270 lsHydrated, err := atsync.Model.GetLatestLivestreamForRepo(userDID)
271 if err != nil {
272 return fmt.Errorf("failed to get latest livestream for repo: %w", err)
273 }
274 lsv, err := lsHydrated.ToLivestreamView()
275 if err != nil {
276 return fmt.Errorf("failed to convert livestream to bsky post view: %w", err)
277 }
278 go atsync.Bus.Publish(userDID, lsv)
279
280 if !isUpdate {
281 log.Warn(ctx, "Livestream detected! Blasting followers!", "title", rec.Title, "url", u, "createdAt", rec.CreatedAt, "repo", userDID)
282 notifications, err := atsync.Model.GetFollowersNotificationTokens(userDID)
283 if err != nil {
284 return err
285 }
286
287 nb := ¬ificationpkg.NotificationBlast{
288 Title: fmt.Sprintf("🔴 @%s is LIVE!", r.Handle),
289 Body: rec.Title,
290 Data: map[string]string{
291 "path": fmt.Sprintf("/%s", r.Handle),
292 },
293 }
294 if atsync.Noter != nil {
295 err := atsync.Noter.Blast(ctx, notifications, nb)
296 if err != nil {
297 log.Error(ctx, "failed to blast notifications", "err", err)
298 } else {
299 log.Log(ctx, "sent notifications", "user", userDID, "count", len(notifications), "content", nb)
300 }
301 } else {
302 log.Log(ctx, "no notifier configured, skipping notifications", "user", userDID, "count", len(notifications), "content", nb)
303 }
304 }
305
306 case *streamplace.Key:
307 log.Debug(ctx, "creating key", "key", rec)
308 time, err := aqtime.FromString(rec.CreatedAt)
309 if err != nil {
310 return fmt.Errorf("failed to parse createdAt: %w", err)
311 }
312 key := model.SigningKey{
313 DID: rec.SigningKey,
314 CreatedAt: time.Time(),
315 RepoDID: userDID,
316 }
317 err = atsync.Model.UpdateSigningKey(&key)
318 if err != nil {
319 log.Error(ctx, "failed to create signing key", "err", err)
320 }
321
322 default:
323 log.Debug(ctx, "unhandled record type", "type", reflect.TypeOf(rec))
324 }
325 return nil
326}