Live video on the AT Protocol
at next 579 lines 18 kB view raw
1package atproto 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "reflect" 9 "strings" 10 "time" 11 12 "github.com/bluesky-social/indigo/api/bsky" 13 "github.com/bluesky-social/indigo/atproto/atdata" 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "stream.place/streamplace/pkg/aqtime" 16 "stream.place/streamplace/pkg/log" 17 "stream.place/streamplace/pkg/model" 18 "stream.place/streamplace/pkg/statedb" 19 "stream.place/streamplace/pkg/streamplace" 20 21 lexutil "github.com/bluesky-social/indigo/lex/util" 22) 23 24func (atsync *ATProtoSynchronizer) handleCreateUpdate(ctx context.Context, userDID string, rkey syntax.RecordKey, recCBOR *[]byte, cid string, collection syntax.NSID, isUpdate bool, isFirstSync bool) error { 25 ctx = log.WithLogValues(ctx, "func", "handleCreateUpdate", "userDID", userDID, "rkey", rkey.String(), "cid", cid, "collection", collection.String()) 26 now := time.Now() 27 r, err := atsync.Model.GetRepo(userDID) 28 if err != nil { 29 return fmt.Errorf("failed to get repo: %w", err) 30 } 31 maybeATURI := fmt.Sprintf("at://%s/%s/%s", userDID, collection.String(), rkey.String()) 32 aturi, err := syntax.ParseATURI(maybeATURI) 33 if err != nil { 34 return fmt.Errorf("failed to parse ATURI: %w", err) 35 } 36 d, err := atdata.UnmarshalCBOR(*recCBOR) 37 if err != nil { 38 return fmt.Errorf("failed to unmarhsal record CBOR: %w", err) 39 } 40 cb, err := lexutil.CborDecodeValue(*recCBOR) 41 if errors.Is(err, lexutil.ErrUnrecognizedType) { 42 log.Debug(ctx, "unrecognized record type", "key", rkey.String(), "type", err) 43 return nil 44 } else if err != nil { 45 return fmt.Errorf("failed to decode record CBOR: %w", err) 46 } 47 switch rec := cb.(type) { 48 case *bsky.GraphFollow: 49 if r == nil { 50 // someone we don't know about 51 return nil 52 } 53 log.Debug(ctx, "creating follow", "userDID", userDID, "subjectDID", rec.Subject) 54 err := atsync.Model.CreateFollow(ctx, userDID, rkey.String(), rec) 55 if err != nil { 56 log.Debug(ctx, "failed to create follow", "err", err) 57 } 58 59 case *bsky.GraphBlock: 60 if r == nil { 61 // someone we don't know about 62 return nil 63 } 64 log.Debug(ctx, "creating block", "userDID", userDID, "subjectDID", rec.Subject) 65 block := &model.Block{ 66 RKey: rkey.String(), 67 RepoDID: userDID, 68 SubjectDID: rec.Subject, 69 Record: *recCBOR, 70 CID: cid, 71 } 72 err := atsync.Model.CreateBlock(ctx, block) 73 if err != nil { 74 return fmt.Errorf("failed to create block: %w", err) 75 } 76 block, err = atsync.Model.GetBlock(ctx, rkey.String()) 77 if err != nil || block == nil { 78 return fmt.Errorf("failed to get block after we just saved it?!: %w", err) 79 } 80 streamplaceBlock, err := block.ToStreamplaceBlock() 81 if err != nil { 82 return fmt.Errorf("failed to convert block to streamplace block: %w", err) 83 } 84 go atsync.Bus.Publish(userDID, streamplaceBlock) 85 86 case *streamplace.ChatMessage: 87 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 88 if err != nil { 89 return fmt.Errorf("failed to sync bluesky repo: %w", err) 90 } 91 92 go func() { 93 _, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model) 94 if err != nil { 95 log.Error(ctx, "failed to sync bluesky repo", "err", err) 96 } 97 }() 98 99 log.Debug(ctx, "streamplace.ChatMessage detected", "message", rec.Text, "repo", repo.Handle) 100 block, err := atsync.Model.GetUserBlock(ctx, rec.Streamer, userDID) 101 if err != nil { 102 return fmt.Errorf("failed to get user block: %w", err) 103 } 104 if block != nil { 105 log.Debug(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", rec.Streamer) 106 return nil 107 } 108 mcm := &model.ChatMessage{ 109 CID: cid, 110 URI: aturi.String(), 111 CreatedAt: now, 112 ChatMessage: recCBOR, 113 RepoDID: userDID, 114 Repo: repo, 115 StreamerRepoDID: rec.Streamer, 116 IndexedAt: &now, 117 } 118 if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil { 119 mcm.ReplyToCID = &rec.Reply.Parent.Cid 120 } 121 122 // check if we have any link facets with 'javascript:' links 123 for _, facet := range rec.Facets { 124 for _, feature := range facet.Features { 125 if link := feature.RichtextFacet_Link; link != nil { 126 if link.Uri != "" && strings.HasPrefix(strings.ToLower(link.Uri), "javascript:") { 127 log.Warn(ctx, "excluding message with javascript: link", "uri", aturi.String(), "link", link.Uri) 128 return nil 129 } 130 } 131 } 132 } 133 134 err = atsync.Model.CreateChatMessage(ctx, mcm) 135 if err != nil { 136 log.Error(ctx, "failed to create chat message", "err", err) 137 return nil 138 } 139 mcm, err = atsync.Model.GetChatMessage(aturi.String()) 140 if err != nil { 141 log.Error(ctx, "failed to get just-saved chat message", "err", err) 142 return nil 143 } 144 if mcm == nil { 145 log.Error(ctx, "failed to retrieve just-saved chat message", "err", err) 146 return nil 147 } 148 scm, err := mcm.ToStreamplaceMessageView() 149 if err != nil { 150 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err) 151 return nil 152 } 153 go atsync.Bus.Publish(rec.Streamer, scm) 154 155 if !isUpdate && !isFirstSync { 156 157 task := &statedb.ChatTask{ 158 MessageView: scm, 159 } 160 161 _, err = atsync.StatefulDB.EnqueueTask(ctx, statedb.TaskChat, task, statedb.WithTaskKey(fmt.Sprintf("chat-message::%s", aturi.String()))) 162 if err != nil { 163 log.Error(ctx, "failed to enqueue notification task", "err", err) 164 } 165 } 166 167 case *streamplace.ChatGate: 168 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 169 if err != nil { 170 return fmt.Errorf("failed to sync bluesky repo: %w", err) 171 } 172 if r == nil { 173 // someone we don't know about 174 return nil 175 } 176 log.Debug(ctx, "creating gate", "userDID", userDID, "hiddenMessage", rec.HiddenMessage) 177 gate := &model.Gate{ 178 RKey: rkey.String(), 179 RepoDID: userDID, 180 HiddenMessage: rec.HiddenMessage, 181 CID: cid, 182 CreatedAt: now, 183 Repo: repo, 184 } 185 err = atsync.Model.CreateGate(ctx, gate) 186 if err != nil { 187 return fmt.Errorf("failed to create gate: %w", err) 188 } 189 gate, err = atsync.Model.GetGate(ctx, rkey.String()) 190 if err != nil { 191 return fmt.Errorf("failed to get gate after we just saved it?!: %w", err) 192 } 193 streamplaceGate, err := gate.ToStreamplaceGate() 194 if err != nil { 195 return fmt.Errorf("failed to convert gate to streamplace gate: %w", err) 196 } 197 go atsync.Bus.Publish(userDID, streamplaceGate) 198 199 case *streamplace.ChatProfile: 200 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 201 if err != nil { 202 return fmt.Errorf("failed to sync bluesky repo: %w", err) 203 } 204 mcm := &model.ChatProfile{ 205 RepoDID: userDID, 206 Repo: repo, 207 Record: recCBOR, 208 } 209 err = atsync.Model.CreateChatProfile(ctx, mcm) 210 if err != nil { 211 log.Error(ctx, "failed to create chat profile", "err", err) 212 } 213 214 case *streamplace.ServerSettings: 215 _, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 216 if err != nil { 217 return fmt.Errorf("failed to sync bluesky repo: %w", err) 218 } 219 settings := &model.ServerSettings{ 220 Server: rkey.String(), 221 RepoDID: userDID, 222 Record: recCBOR, 223 } 224 err = atsync.Model.UpdateServerSettings(ctx, settings) 225 if err != nil { 226 log.Error(ctx, "failed to create server settings", "err", err) 227 } 228 229 case *bsky.FeedPost: 230 // jsonData, err := json.Marshal(d) 231 // if err != nil { 232 // log.Error(ctx, "failed to marshal record data", "err", err) 233 // } else { 234 // log.Log(ctx, "record data", "json", string(jsonData)) 235 // } 236 237 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt) 238 if err != nil { 239 return fmt.Errorf("failed to parse createdAt: %w", err) 240 } 241 242 if livestream, ok := d["place.stream.livestream"]; ok { 243 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 244 if err != nil { 245 return fmt.Errorf("failed to sync bluesky repo: %w", err) 246 } 247 livestream, ok := livestream.(map[string]interface{}) 248 if !ok { 249 return fmt.Errorf("livestream is not a map") 250 } 251 url, ok := livestream["url"].(string) 252 if !ok { 253 return fmt.Errorf("livestream url is not a string") 254 } 255 log.Debug(ctx, "livestream url", "url", url) 256 if err := atsync.Model.CreateFeedPost(ctx, &model.FeedPost{ 257 CID: cid, 258 CreatedAt: createdAt, 259 FeedPost: recCBOR, 260 RepoDID: userDID, 261 Repo: repo, 262 Type: "livestream", 263 URI: aturi.String(), 264 IndexedAt: &now, 265 }); err != nil { 266 return fmt.Errorf("failed to create bluesky post: %w", err) 267 } 268 } else { 269 if rec.Reply == nil || rec.Reply.Root == nil { 270 return nil 271 } 272 livestream, err := atsync.Model.GetLivestreamByPostURI(rec.Reply.Root.Uri) 273 if err != nil { 274 return fmt.Errorf("failed to get livestream: %w", err) 275 } 276 if livestream == nil { 277 return nil 278 } 279 // log.Warn(ctx, "chat message detected", "uri", livestream.URI) 280 // if this post is a reply to someone's livestream post 281 // log.Warn(ctx, "chat message detected", "message", rec.Text) 282 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 283 if err != nil { 284 return fmt.Errorf("failed to sync bluesky repo: %w", err) 285 } 286 287 // log.Warn(ctx, "chat message detected", "message", rec.Text, "repo", repo.Handle) 288 block, err := atsync.Model.GetUserBlock(ctx, livestream.RepoDID, userDID) 289 if err != nil { 290 return fmt.Errorf("failed to get user block: %w", err) 291 } 292 if block != nil { 293 log.Warn(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", livestream.RepoDID) 294 return nil 295 } 296 // if fc.cli.PrintChat { 297 // fmt.Printf("@%s%s %s\n", blue.Sprintf(repo.Handle), green.Sprintf(":"), rec.Text) 298 // } 299 fp := &model.FeedPost{ 300 CID: cid, 301 CreatedAt: createdAt, 302 FeedPost: recCBOR, 303 RepoDID: userDID, 304 Type: "reply", 305 Repo: repo, 306 ReplyRootURI: &livestream.PostURI, 307 ReplyRootRepoDID: &livestream.RepoDID, 308 URI: aturi.String(), 309 IndexedAt: &now, 310 } 311 err = atsync.Model.CreateFeedPost(ctx, fp) 312 if err != nil { 313 log.Error(ctx, "failed to create feed post", "err", err) 314 } 315 postView, err := fp.ToBskyPostView() 316 if err != nil { 317 log.Error(ctx, "failed to convert feed post to bsky post view", "err", err) 318 } 319 go atsync.Bus.Publish(livestream.RepoDID, postView) 320 } 321 322 case *streamplace.Livestream: 323 if r == nil { 324 // we don't know about this repo 325 return nil 326 } 327 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt) 328 if err != nil { 329 log.Error(ctx, "failed to parse createdAt", "err", err) 330 return nil 331 } 332 ls := &model.Livestream{ 333 CID: cid, 334 URI: aturi.String(), 335 CreatedAt: createdAt, 336 Livestream: recCBOR, 337 RepoDID: userDID, 338 } 339 if rec.Post != nil { 340 ls.PostCID = rec.Post.Cid 341 ls.PostURI = rec.Post.Uri 342 } 343 err = atsync.Model.CreateLivestream(ctx, ls) 344 if err != nil { 345 return fmt.Errorf("failed to create livestream: %w", err) 346 } 347 lsHydrated, err := atsync.Model.GetLatestLivestreamForRepo(userDID) 348 if err != nil { 349 return fmt.Errorf("failed to get latest livestream for repo: %w", err) 350 } 351 lsv, err := lsHydrated.ToLivestreamView() 352 if err != nil { 353 return fmt.Errorf("failed to convert livestream to bsky post view: %w", err) 354 } 355 go atsync.Bus.Publish(userDID, lsv) 356 357 var postView *bsky.FeedDefs_PostView 358 if lsHydrated.Post != nil { 359 postView, err = lsHydrated.Post.ToBskyPostView() 360 if err != nil { 361 return fmt.Errorf("failed to convert livestream post to bsky post view: %w", err) 362 } 363 } 364 365 task := &statedb.NotificationTask{ 366 Livestream: lsv, 367 FeedPost: postView, 368 PDSURL: r.PDS, 369 } 370 371 cp, err := atsync.Model.GetChatProfile(ctx, userDID) 372 if err != nil { 373 return fmt.Errorf("failed to get chat profile: %w", err) 374 } 375 if cp != nil { 376 spcp, err := cp.ToStreamplaceChatProfile() 377 if err != nil { 378 return fmt.Errorf("failed to convert chat profile to streamplace chat profile: %w", err) 379 } 380 task.ChatProfile = spcp 381 } 382 383 case *streamplace.LiveTeleport: 384 if r == nil { 385 return nil 386 } 387 startsAt, err := time.Parse(time.RFC3339, rec.StartsAt) 388 if err != nil { 389 log.Error(ctx, "failed to parse startsAt", "err", err) 390 return nil 391 } 392 viewerCount := atsync.Bus.GetViewerCount(userDID) 393 tp := &model.Teleport{ 394 CID: cid, 395 URI: aturi.String(), 396 StartsAt: startsAt, 397 DurationSeconds: rec.DurationSeconds, 398 ViewerCount: int64(viewerCount), 399 Teleport: recCBOR, 400 RepoDID: userDID, 401 TargetDID: rec.Streamer, 402 } 403 err = atsync.Model.CreateTeleport(ctx, tp) 404 if err != nil { 405 return fmt.Errorf("failed to create teleport: %w", err) 406 } 407 go atsync.Bus.Publish(userDID, rec) 408 409 // schedule arrival notification 10 seconds after startsAt 410 arrivalTime := startsAt.Add(10 * time.Second) 411 waitDuration := time.Until(arrivalTime) 412 if waitDuration < 0 { 413 waitDuration = 0 414 } 415 416 time.AfterFunc(waitDuration, func() { 417 // verify teleport still exists 418 existingTp, err := atsync.Model.GetTeleportByURI(aturi.String()) 419 if err != nil { 420 log.Error(ctx, "failed to get teleport by uri", "err", err) 421 return 422 } 423 if existingTp == nil || existingTp.Denied { 424 log.Debug(ctx, "teleport no longer active, skipping arrival notification", "uri", aturi.String()) 425 return 426 } 427 428 // get the source profile 429 sourceRepo, err := atsync.Model.GetRepo(userDID) 430 if err != nil { 431 log.Error(ctx, "failed to get source repo", "err", err) 432 return 433 } 434 435 viewerCount := existingTp.ViewerCount 436 437 arrivalMsg := &streamplace.Livestream_TeleportArrival{ 438 LexiconTypeID: "place.stream.livestream#teleportArrival", 439 TeleportUri: aturi.String(), 440 Source: &bsky.ActorDefs_ProfileViewBasic{ 441 Did: userDID, 442 Handle: sourceRepo.Handle, 443 }, 444 ViewerCount: int64(viewerCount), 445 StartsAt: rec.StartsAt, 446 } 447 448 // get the source chat profile 449 chatProfile, err := atsync.Model.GetChatProfile(ctx, userDID) 450 if err == nil && chatProfile != nil { 451 spcp, err := chatProfile.ToStreamplaceChatProfile() 452 if err == nil { 453 arrivalMsg.ChatProfile = spcp 454 } 455 } 456 457 atsync.Bus.Publish(rec.Streamer, arrivalMsg) 458 }) 459 460 case *streamplace.Key: 461 log.Debug(ctx, "creating key", "key", rec) 462 time, err := aqtime.FromString(rec.CreatedAt) 463 if err != nil { 464 return fmt.Errorf("failed to parse createdAt: %w", err) 465 } 466 key := model.SigningKey{ 467 DID: rec.SigningKey, 468 RKey: rkey.String(), 469 CreatedAt: time.Time(), 470 RepoDID: userDID, 471 } 472 err = atsync.Model.UpdateSigningKey(&key) 473 if err != nil { 474 log.Error(ctx, "failed to create signing key", "err", err) 475 } 476 477 case *streamplace.BroadcastOrigin: 478 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 479 if err != nil { 480 return fmt.Errorf("failed to sync broadcast origin creator bluesky repo: %w", err) 481 } 482 _, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model) 483 if err != nil { 484 return fmt.Errorf("failed to sync broadcast origin streamer bluesky repo: %w", err) 485 } 486 err = atsync.Model.UpdateBroadcastOrigin(ctx, rec, aturi) 487 if err != nil { 488 log.Error(ctx, "failed to update broadcast origin", "err", err) 489 } 490 view := &streamplace.BroadcastDefs_BroadcastOriginView{ 491 Uri: aturi.String(), 492 Cid: cid, 493 Author: &bsky.ActorDefs_ProfileViewBasic{ 494 Did: userDID, 495 Handle: repo.Handle, 496 }, 497 Record: &lexutil.LexiconTypeDecoder{Val: rec}, 498 } 499 // publishes with an empty string because we're discovering the stream 500 go atsync.Bus.Publish("", view) 501 502 case *streamplace.MetadataConfiguration: 503 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 504 if err != nil { 505 return fmt.Errorf("failed to sync bluesky repo: %w", err) 506 } 507 log.Debug(ctx, "creating metadata configuration", "metadata", rec) 508 metadata := &model.MetadataConfiguration{ 509 RepoDID: userDID, 510 Record: recCBOR, 511 Repo: repo, 512 } 513 err = atsync.Model.CreateMetadataConfiguration(ctx, metadata) 514 if err != nil { 515 log.Error(ctx, "failed to create metadata configuration", "err", err) 516 } 517 518 case *streamplace.ModerationPermission: 519 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 520 if err != nil { 521 return fmt.Errorf("failed to sync bluesky repo: %w", err) 522 } 523 log.Debug(ctx, "creating moderation delegation", "streamerDID", userDID, "moderatorDID", rec.Moderator) 524 525 err = atsync.Model.CreateModerationDelegation(ctx, rec, aturi) 526 if err != nil { 527 return fmt.Errorf("failed to create moderation delegation: %w", err) 528 } 529 530 view := &streamplace.ModerationDefs_PermissionView{ 531 Uri: aturi.String(), 532 Cid: cid, 533 Author: &bsky.ActorDefs_ProfileViewBasic{ 534 Did: userDID, 535 Handle: repo.Handle, 536 }, 537 Record: &lexutil.LexiconTypeDecoder{Val: rec}, 538 } 539 // Publish moderation permission view to WebSocket bus for real-time updates 540 // This allows moderators to see their permissions instantly without page refresh 541 go atsync.Bus.Publish(userDID, view) 542 543 case *streamplace.LiveRecommendations: 544 log.Debug(ctx, "creating recommendations", "userDID", userDID, "count", len(rec.Streamers)) 545 546 // Validate max 8 streamers 547 if len(rec.Streamers) > 8 { 548 log.Warn(ctx, "recommendations exceed maximum of 8", "count", len(rec.Streamers)) 549 return fmt.Errorf("maximum 8 recommendations allowed, got %d", len(rec.Streamers)) 550 } 551 552 // Marshal streamers to JSON 553 streamersJSON, err := json.Marshal(rec.Streamers) 554 if err != nil { 555 return fmt.Errorf("failed to marshal streamers: %w", err) 556 } 557 558 // Parse createdAt timestamp 559 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt) 560 if err != nil { 561 return fmt.Errorf("failed to parse createdAt: %w", err) 562 } 563 564 recommendation := &model.Recommendation{ 565 UserDID: userDID, 566 Streamers: json.RawMessage(streamersJSON), 567 CreatedAt: createdAt, 568 } 569 570 err = atsync.Model.UpsertRecommendation(recommendation) 571 if err != nil { 572 return fmt.Errorf("failed to upsert recommendation: %w", err) 573 } 574 575 default: 576 log.Debug(ctx, "unhandled record type", "type", reflect.TypeOf(rec)) 577 } 578 return nil 579}