Live video on the AT Protocol
at eli/github-skip-darwin 377 lines 12 kB view raw
1package atproto 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "reflect" 8 "time" 9 10 "github.com/bluesky-social/indigo/api/bsky" 11 "github.com/bluesky-social/indigo/atproto/data" 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "stream.place/streamplace/pkg/aqtime" 14 "stream.place/streamplace/pkg/integrations/discord" 15 "stream.place/streamplace/pkg/log" 16 "stream.place/streamplace/pkg/model" 17 notificationpkg "stream.place/streamplace/pkg/notifications" 18 "stream.place/streamplace/pkg/streamplace" 19 20 lexutil "github.com/bluesky-social/indigo/lex/util" 21) 22 23func (atsync *ATProtoSynchronizer) handleCreateUpdate(ctx context.Context, userDID string, rkey syntax.RecordKey, recCBOR *[]byte, cid string, collection syntax.NSID, isUpdate bool, isFirstSync bool) error { 24 ctx = log.WithLogValues(ctx, "func", "handleCreateUpdate", "userDID", userDID, "rkey", rkey.String(), "cid", cid, "collection", collection.String()) 25 now := time.Now() 26 r, err := atsync.Model.GetRepo(userDID) 27 if err != nil { 28 return fmt.Errorf("failed to get repo: %w", err) 29 } 30 maybeATURI := fmt.Sprintf("at://%s/%s/%s", userDID, collection.String(), rkey.String()) 31 aturi, err := syntax.ParseATURI(maybeATURI) 32 if err != nil { 33 return fmt.Errorf("failed to parse ATURI: %w", err) 34 } 35 d, err := data.UnmarshalCBOR(*recCBOR) 36 if err != nil { 37 return fmt.Errorf("failed to unmarhsal record CBOR: %w", err) 38 } 39 cb, err := lexutil.CborDecodeValue(*recCBOR) 40 if errors.Is(err, lexutil.ErrUnrecognizedType) { 41 log.Debug(ctx, "unrecognized record type", "key", rkey.String(), "type", err) 42 return nil 43 } else if err != nil { 44 return fmt.Errorf("failed to decode record CBOR: %w", err) 45 } 46 switch rec := cb.(type) { 47 case *bsky.GraphFollow: 48 if r == nil { 49 // someone we don't know about 50 return nil 51 } 52 log.Debug(ctx, "creating follow", "userDID", userDID, "subjectDID", rec.Subject) 53 err := atsync.Model.CreateFollow(ctx, userDID, rkey.String(), rec) 54 if err != nil { 55 log.Debug(ctx, "failed to create follow", "err", err) 56 } 57 58 case *bsky.GraphBlock: 59 if r == nil { 60 // someone we don't know about 61 return nil 62 } 63 log.Debug(ctx, "creating block", "userDID", userDID, "subjectDID", rec.Subject) 64 block := &model.Block{ 65 RKey: rkey.String(), 66 RepoDID: userDID, 67 SubjectDID: rec.Subject, 68 Record: *recCBOR, 69 CID: cid, 70 } 71 err := atsync.Model.CreateBlock(ctx, block) 72 if err != nil { 73 return fmt.Errorf("failed to create block: %w", err) 74 } 75 block, err = atsync.Model.GetBlock(ctx, rkey.String()) 76 if err != nil { 77 return fmt.Errorf("failed to get block after we just saved it?!: %w", err) 78 } 79 streamplaceBlock, err := block.ToStreamplaceBlock() 80 if err != nil { 81 return fmt.Errorf("failed to convert block to streamplace block: %w", err) 82 } 83 go atsync.Bus.Publish(userDID, streamplaceBlock) 84 85 case *streamplace.ChatMessage: 86 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 87 if err != nil { 88 return fmt.Errorf("failed to sync bluesky repo: %w", err) 89 } 90 streamerRepo, err := atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model) 91 if err != nil { 92 return fmt.Errorf("failed to sync bluesky repo: %w", err) 93 } 94 log.Debug(ctx, "streamplace.ChatMessage detected", "message", rec.Text, "repo", repo.Handle) 95 block, err := atsync.Model.GetUserBlock(ctx, streamerRepo.DID, userDID) 96 if err != nil { 97 return fmt.Errorf("failed to get user block: %w", err) 98 } 99 if block != nil { 100 log.Debug(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", streamerRepo.DID) 101 return nil 102 } 103 mcm := &model.ChatMessage{ 104 CID: cid, 105 URI: aturi.String(), 106 CreatedAt: now, 107 ChatMessage: recCBOR, 108 RepoDID: userDID, 109 Repo: repo, 110 StreamerRepoDID: streamerRepo.DID, 111 IndexedAt: &now, 112 } 113 if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil { 114 mcm.ReplyToCID = &rec.Reply.Parent.Cid 115 } 116 err = atsync.Model.CreateChatMessage(ctx, mcm) 117 if err != nil { 118 log.Error(ctx, "failed to create chat message", "err", err) 119 } 120 mcm, err = atsync.Model.GetChatMessage(cid) 121 if err != nil { 122 log.Error(ctx, "failed to get just-saved chat message", "err", err) 123 } 124 if mcm == nil { 125 log.Error(ctx, "failed to retrieve just-saved chat message", "err", err) 126 return nil 127 } 128 scm, err := mcm.ToStreamplaceMessageView() 129 if err != nil { 130 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err) 131 } 132 go atsync.Bus.Publish(streamerRepo.DID, scm) 133 134 if !isUpdate && !isFirstSync { 135 for _, webhook := range atsync.CLI.DiscordWebhooks { 136 if webhook.DID == streamerRepo.DID && webhook.Type == "chat" { 137 go func() { 138 err := discord.SendChat(ctx, webhook, r, scm) 139 if err != nil { 140 log.Error(ctx, "failed to send livestream to discord", "err", err) 141 } else { 142 log.Log(ctx, "sent livestream to discord", "user", userDID, "webhook", webhook.URL) 143 } 144 }() 145 } 146 } 147 } 148 149 case *streamplace.ChatProfile: 150 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 151 if err != nil { 152 return fmt.Errorf("failed to sync bluesky repo: %w", err) 153 } 154 mcm := &model.ChatProfile{ 155 RepoDID: userDID, 156 Repo: repo, 157 Record: recCBOR, 158 } 159 err = atsync.Model.CreateChatProfile(ctx, mcm) 160 if err != nil { 161 log.Error(ctx, "failed to create chat profile", "err", err) 162 } 163 164 case *bsky.FeedPost: 165 // jsonData, err := json.Marshal(d) 166 // if err != nil { 167 // log.Error(ctx, "failed to marshal record data", "err", err) 168 // } else { 169 // log.Log(ctx, "record data", "json", string(jsonData)) 170 // } 171 172 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt) 173 if err != nil { 174 return fmt.Errorf("failed to parse createdAt: %w", err) 175 } 176 177 if livestream, ok := d["place.stream.livestream"]; ok { 178 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 179 if err != nil { 180 return fmt.Errorf("failed to sync bluesky repo: %w", err) 181 } 182 livestream, ok := livestream.(map[string]interface{}) 183 if !ok { 184 return fmt.Errorf("livestream is not a map") 185 } 186 url, ok := livestream["url"].(string) 187 if !ok { 188 return fmt.Errorf("livestream url is not a string") 189 } 190 log.Debug(ctx, "livestream url", "url", url) 191 if err := atsync.Model.CreateFeedPost(ctx, &model.FeedPost{ 192 CID: cid, 193 CreatedAt: createdAt, 194 FeedPost: recCBOR, 195 RepoDID: userDID, 196 Repo: repo, 197 Type: "livestream", 198 URI: aturi.String(), 199 IndexedAt: &now, 200 }); err != nil { 201 return fmt.Errorf("failed to create bluesky post: %w", err) 202 } 203 } else { 204 if rec.Reply == nil || rec.Reply.Root == nil { 205 return nil 206 } 207 livestream, err := atsync.Model.GetLivestreamByPostCID(rec.Reply.Root.Cid) 208 if err != nil { 209 return fmt.Errorf("failed to get livestream: %w", err) 210 } 211 if livestream == nil { 212 return nil 213 } 214 // log.Warn(ctx, "chat message detected", "uri", livestream.URI) 215 // if this post is a reply to someone's livestream post 216 // log.Warn(ctx, "chat message detected", "message", rec.Text) 217 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 218 if err != nil { 219 return fmt.Errorf("failed to sync bluesky repo: %w", err) 220 } 221 222 // log.Warn(ctx, "chat message detected", "message", rec.Text, "repo", repo.Handle) 223 block, err := atsync.Model.GetUserBlock(ctx, livestream.RepoDID, userDID) 224 if err != nil { 225 return fmt.Errorf("failed to get user block: %w", err) 226 } 227 if block != nil { 228 log.Warn(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", livestream.RepoDID) 229 return nil 230 } 231 // if fc.cli.PrintChat { 232 // fmt.Printf("@%s%s %s\n", blue.Sprintf(repo.Handle), green.Sprintf(":"), rec.Text) 233 // } 234 fp := &model.FeedPost{ 235 CID: cid, 236 CreatedAt: createdAt, 237 FeedPost: recCBOR, 238 RepoDID: userDID, 239 Type: "reply", 240 Repo: repo, 241 ReplyRootCID: &livestream.PostCID, 242 ReplyRootRepoDID: &livestream.RepoDID, 243 URI: aturi.String(), 244 IndexedAt: &now, 245 } 246 err = atsync.Model.CreateFeedPost(ctx, fp) 247 if err != nil { 248 log.Error(ctx, "failed to create feed post", "err", err) 249 } 250 postView, err := fp.ToBskyPostView() 251 if err != nil { 252 log.Error(ctx, "failed to convert feed post to bsky post view", "err", err) 253 } 254 go atsync.Bus.Publish(livestream.RepoDID, postView) 255 } 256 257 case *streamplace.Livestream: 258 var u string 259 if rec.Url != nil { 260 u = *rec.Url 261 } 262 if r == nil { 263 // we don't know about this repo 264 return nil 265 } 266 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt) 267 if err != nil { 268 log.Error(ctx, "failed to parse createdAt", "err", err) 269 return nil 270 } 271 ls := &model.Livestream{ 272 CID: cid, 273 URI: aturi.String(), 274 CreatedAt: createdAt, 275 Livestream: recCBOR, 276 RepoDID: userDID, 277 } 278 if rec.Post != nil { 279 ls.PostCID = rec.Post.Cid 280 ls.PostURI = rec.Post.Uri 281 } 282 err = atsync.Model.CreateLivestream(ctx, ls) 283 if err != nil { 284 return fmt.Errorf("failed to create livestream: %w", err) 285 } 286 lsHydrated, err := atsync.Model.GetLatestLivestreamForRepo(userDID) 287 if err != nil { 288 return fmt.Errorf("failed to get latest livestream for repo: %w", err) 289 } 290 lsv, err := lsHydrated.ToLivestreamView() 291 if err != nil { 292 return fmt.Errorf("failed to convert livestream to bsky post view: %w", err) 293 } 294 go atsync.Bus.Publish(userDID, lsv) 295 296 if !isUpdate && !isFirstSync { 297 log.Warn(ctx, "Livestream detected! Blasting followers!", "title", rec.Title, "url", u, "createdAt", rec.CreatedAt, "repo", userDID) 298 notifications, err := atsync.Model.GetFollowersNotificationTokens(userDID) 299 if err != nil { 300 return err 301 } 302 303 nb := &notificationpkg.NotificationBlast{ 304 Title: fmt.Sprintf("🔴 @%s is LIVE!", r.Handle), 305 Body: rec.Title, 306 Data: map[string]string{ 307 "path": fmt.Sprintf("/%s", r.Handle), 308 }, 309 } 310 if atsync.Noter != nil { 311 err := atsync.Noter.Blast(ctx, notifications, nb) 312 if err != nil { 313 log.Error(ctx, "failed to blast notifications", "err", err) 314 } else { 315 log.Log(ctx, "sent notifications", "user", userDID, "count", len(notifications), "content", nb) 316 } 317 } else { 318 log.Log(ctx, "no notifier configured, skipping notifications", "user", userDID, "count", len(notifications), "content", nb) 319 } 320 321 var postView *bsky.FeedDefs_PostView 322 if lsHydrated.Post != nil { 323 postView, err = lsHydrated.Post.ToBskyPostView() 324 if err != nil { 325 log.Error(ctx, "failed to convert livestream post to bsky post view", "err", err) 326 } 327 } else { 328 log.Warn(ctx, "no post found for livestream", "livestream", lsHydrated) 329 } 330 331 var spcp *streamplace.ChatProfile 332 cp, err := atsync.Model.GetChatProfile(ctx, userDID) 333 if err != nil { 334 log.Error(ctx, "failed to get chat profile", "err", err) 335 } 336 if cp != nil { 337 spcp, err = cp.ToStreamplaceChatProfile() 338 if err != nil { 339 log.Error(ctx, "failed to convert chat profile to streamplace chat profile", "err", err) 340 } 341 } 342 343 for _, webhook := range atsync.CLI.DiscordWebhooks { 344 if webhook.DID == userDID && webhook.Type == "livestream" { 345 go func() { 346 err := discord.SendLivestream(ctx, webhook, r, lsv, postView, spcp) 347 if err != nil { 348 log.Error(ctx, "failed to send livestream to discord", "err", err) 349 } else { 350 log.Log(ctx, "sent livestream to discord", "user", userDID, "webhook", webhook.URL) 351 } 352 }() 353 } 354 } 355 } 356 357 case *streamplace.Key: 358 log.Debug(ctx, "creating key", "key", rec) 359 time, err := aqtime.FromString(rec.CreatedAt) 360 if err != nil { 361 return fmt.Errorf("failed to parse createdAt: %w", err) 362 } 363 key := model.SigningKey{ 364 DID: rec.SigningKey, 365 CreatedAt: time.Time(), 366 RepoDID: userDID, 367 } 368 err = atsync.Model.UpdateSigningKey(&key) 369 if err != nil { 370 log.Error(ctx, "failed to create signing key", "err", err) 371 } 372 373 default: 374 log.Debug(ctx, "unhandled record type", "type", reflect.TypeOf(rec)) 375 } 376 return nil 377}