Live video on the AT Protocol
at natb/block-javascript-protocol 502 lines 16 kB view raw
1package atproto 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "reflect" 9 "strings" 10 "time" 11 12 "github.com/bluesky-social/indigo/api/bsky" 13 "github.com/bluesky-social/indigo/atproto/atdata" 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "stream.place/streamplace/pkg/aqtime" 16 "stream.place/streamplace/pkg/log" 17 "stream.place/streamplace/pkg/model" 18 "stream.place/streamplace/pkg/statedb" 19 "stream.place/streamplace/pkg/streamplace" 20 21 lexutil "github.com/bluesky-social/indigo/lex/util" 22) 23 24func (atsync *ATProtoSynchronizer) handleCreateUpdate(ctx context.Context, userDID string, rkey syntax.RecordKey, recCBOR *[]byte, cid string, collection syntax.NSID, isUpdate bool, isFirstSync bool) error { 25 ctx = log.WithLogValues(ctx, "func", "handleCreateUpdate", "userDID", userDID, "rkey", rkey.String(), "cid", cid, "collection", collection.String()) 26 now := time.Now() 27 r, err := atsync.Model.GetRepo(userDID) 28 if err != nil { 29 return fmt.Errorf("failed to get repo: %w", err) 30 } 31 maybeATURI := fmt.Sprintf("at://%s/%s/%s", userDID, collection.String(), rkey.String()) 32 aturi, err := syntax.ParseATURI(maybeATURI) 33 if err != nil { 34 return fmt.Errorf("failed to parse ATURI: %w", err) 35 } 36 d, err := atdata.UnmarshalCBOR(*recCBOR) 37 if err != nil { 38 return fmt.Errorf("failed to unmarhsal record CBOR: %w", err) 39 } 40 cb, err := lexutil.CborDecodeValue(*recCBOR) 41 if errors.Is(err, lexutil.ErrUnrecognizedType) { 42 log.Debug(ctx, "unrecognized record type", "key", rkey.String(), "type", err) 43 return nil 44 } else if err != nil { 45 return fmt.Errorf("failed to decode record CBOR: %w", err) 46 } 47 switch rec := cb.(type) { 48 case *bsky.GraphFollow: 49 if r == nil { 50 // someone we don't know about 51 return nil 52 } 53 log.Debug(ctx, "creating follow", "userDID", userDID, "subjectDID", rec.Subject) 54 err := atsync.Model.CreateFollow(ctx, userDID, rkey.String(), rec) 55 if err != nil { 56 log.Debug(ctx, "failed to create follow", "err", err) 57 } 58 59 case *bsky.GraphBlock: 60 if r == nil { 61 // someone we don't know about 62 return nil 63 } 64 log.Debug(ctx, "creating block", "userDID", userDID, "subjectDID", rec.Subject) 65 block := &model.Block{ 66 RKey: rkey.String(), 67 RepoDID: userDID, 68 SubjectDID: rec.Subject, 69 Record: *recCBOR, 70 CID: cid, 71 } 72 err := atsync.Model.CreateBlock(ctx, block) 73 if err != nil { 74 return fmt.Errorf("failed to create block: %w", err) 75 } 76 block, err = atsync.Model.GetBlock(ctx, rkey.String()) 77 if err != nil { 78 return fmt.Errorf("failed to get block after we just saved it?!: %w", err) 79 } 80 streamplaceBlock, err := block.ToStreamplaceBlock() 81 if err != nil { 82 return fmt.Errorf("failed to convert block to streamplace block: %w", err) 83 } 84 go atsync.Bus.Publish(userDID, streamplaceBlock) 85 86 case *streamplace.ChatMessage: 87 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 88 if err != nil { 89 return fmt.Errorf("failed to sync bluesky repo: %w", err) 90 } 91 92 go func() { 93 _, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model) 94 if err != nil { 95 log.Error(ctx, "failed to sync bluesky repo", "err", err) 96 } 97 }() 98 99 log.Debug(ctx, "streamplace.ChatMessage detected", "message", rec.Text, "repo", repo.Handle) 100 block, err := atsync.Model.GetUserBlock(ctx, rec.Streamer, userDID) 101 if err != nil { 102 return fmt.Errorf("failed to get user block: %w", err) 103 } 104 if block != nil { 105 log.Debug(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", rec.Streamer) 106 return nil 107 } 108 mcm := &model.ChatMessage{ 109 CID: cid, 110 URI: aturi.String(), 111 CreatedAt: now, 112 ChatMessage: recCBOR, 113 RepoDID: userDID, 114 Repo: repo, 115 StreamerRepoDID: rec.Streamer, 116 IndexedAt: &now, 117 } 118 if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil { 119 mcm.ReplyToCID = &rec.Reply.Parent.Cid 120 } 121 122 // check if we have any link facets with 'javascript:' links 123 for _, facet := range rec.Facets { 124 for _, feature := range facet.Features { 125 if link := feature.RichtextFacet_Link; link != nil { 126 if link.Uri != "" && strings.HasPrefix(strings.ToLower(link.Uri), "javascript:") { 127 log.Warn(ctx, "excluding message with javascript: link", "uri", aturi.String(), "link", link.Uri) 128 return nil 129 } 130 } 131 } 132 } 133 134 err = atsync.Model.CreateChatMessage(ctx, mcm) 135 if err != nil { 136 log.Error(ctx, "failed to create chat message", "err", err) 137 return nil 138 } 139 mcm, err = atsync.Model.GetChatMessage(aturi.String()) 140 if err != nil { 141 log.Error(ctx, "failed to get just-saved chat message", "err", err) 142 return nil 143 } 144 if mcm == nil { 145 log.Error(ctx, "failed to retrieve just-saved chat message", "err", err) 146 return nil 147 } 148 scm, err := mcm.ToStreamplaceMessageView() 149 if err != nil { 150 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err) 151 return nil 152 } 153 go atsync.Bus.Publish(rec.Streamer, scm) 154 155 if !isUpdate && !isFirstSync { 156 157 task := &statedb.ChatTask{ 158 MessageView: scm, 159 } 160 161 _, err = atsync.StatefulDB.EnqueueTask(ctx, statedb.TaskChat, task, statedb.WithTaskKey(fmt.Sprintf("chat-message::%s", aturi.String()))) 162 if err != nil { 163 log.Error(ctx, "failed to enqueue notification task", "err", err) 164 } 165 } 166 167 case *streamplace.ChatGate: 168 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 169 if err != nil { 170 return fmt.Errorf("failed to sync bluesky repo: %w", err) 171 } 172 if r == nil { 173 // someone we don't know about 174 return nil 175 } 176 log.Debug(ctx, "creating gate", "userDID", userDID, "hiddenMessage", rec.HiddenMessage) 177 gate := &model.Gate{ 178 RKey: rkey.String(), 179 RepoDID: userDID, 180 HiddenMessage: rec.HiddenMessage, 181 CID: cid, 182 CreatedAt: now, 183 Repo: repo, 184 } 185 err = atsync.Model.CreateGate(ctx, gate) 186 if err != nil { 187 return fmt.Errorf("failed to create gate: %w", err) 188 } 189 gate, err = atsync.Model.GetGate(ctx, rkey.String()) 190 if err != nil { 191 return fmt.Errorf("failed to get gate after we just saved it?!: %w", err) 192 } 193 streamplaceGate, err := gate.ToStreamplaceGate() 194 if err != nil { 195 return fmt.Errorf("failed to convert gate to streamplace gate: %w", err) 196 } 197 go atsync.Bus.Publish(userDID, streamplaceGate) 198 199 case *streamplace.ChatProfile: 200 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 201 if err != nil { 202 return fmt.Errorf("failed to sync bluesky repo: %w", err) 203 } 204 mcm := &model.ChatProfile{ 205 RepoDID: userDID, 206 Repo: repo, 207 Record: recCBOR, 208 } 209 err = atsync.Model.CreateChatProfile(ctx, mcm) 210 if err != nil { 211 log.Error(ctx, "failed to create chat profile", "err", err) 212 } 213 214 case *streamplace.ServerSettings: 215 _, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 216 if err != nil { 217 return fmt.Errorf("failed to sync bluesky repo: %w", err) 218 } 219 settings := &model.ServerSettings{ 220 Server: rkey.String(), 221 RepoDID: userDID, 222 Record: recCBOR, 223 } 224 err = atsync.Model.UpdateServerSettings(ctx, settings) 225 if err != nil { 226 log.Error(ctx, "failed to create server settings", "err", err) 227 } 228 229 case *bsky.FeedPost: 230 // jsonData, err := json.Marshal(d) 231 // if err != nil { 232 // log.Error(ctx, "failed to marshal record data", "err", err) 233 // } else { 234 // log.Log(ctx, "record data", "json", string(jsonData)) 235 // } 236 237 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt) 238 if err != nil { 239 return fmt.Errorf("failed to parse createdAt: %w", err) 240 } 241 242 if livestream, ok := d["place.stream.livestream"]; ok { 243 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 244 if err != nil { 245 return fmt.Errorf("failed to sync bluesky repo: %w", err) 246 } 247 livestream, ok := livestream.(map[string]interface{}) 248 if !ok { 249 return fmt.Errorf("livestream is not a map") 250 } 251 url, ok := livestream["url"].(string) 252 if !ok { 253 return fmt.Errorf("livestream url is not a string") 254 } 255 log.Debug(ctx, "livestream url", "url", url) 256 if err := atsync.Model.CreateFeedPost(ctx, &model.FeedPost{ 257 CID: cid, 258 CreatedAt: createdAt, 259 FeedPost: recCBOR, 260 RepoDID: userDID, 261 Repo: repo, 262 Type: "livestream", 263 URI: aturi.String(), 264 IndexedAt: &now, 265 }); err != nil { 266 return fmt.Errorf("failed to create bluesky post: %w", err) 267 } 268 } else { 269 if rec.Reply == nil || rec.Reply.Root == nil { 270 return nil 271 } 272 livestream, err := atsync.Model.GetLivestreamByPostURI(rec.Reply.Root.Uri) 273 if err != nil { 274 return fmt.Errorf("failed to get livestream: %w", err) 275 } 276 if livestream == nil { 277 return nil 278 } 279 // log.Warn(ctx, "chat message detected", "uri", livestream.URI) 280 // if this post is a reply to someone's livestream post 281 // log.Warn(ctx, "chat message detected", "message", rec.Text) 282 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 283 if err != nil { 284 return fmt.Errorf("failed to sync bluesky repo: %w", err) 285 } 286 287 // log.Warn(ctx, "chat message detected", "message", rec.Text, "repo", repo.Handle) 288 block, err := atsync.Model.GetUserBlock(ctx, livestream.RepoDID, userDID) 289 if err != nil { 290 return fmt.Errorf("failed to get user block: %w", err) 291 } 292 if block != nil { 293 log.Warn(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", livestream.RepoDID) 294 return nil 295 } 296 // if fc.cli.PrintChat { 297 // fmt.Printf("@%s%s %s\n", blue.Sprintf(repo.Handle), green.Sprintf(":"), rec.Text) 298 // } 299 fp := &model.FeedPost{ 300 CID: cid, 301 CreatedAt: createdAt, 302 FeedPost: recCBOR, 303 RepoDID: userDID, 304 Type: "reply", 305 Repo: repo, 306 ReplyRootURI: &livestream.PostURI, 307 ReplyRootRepoDID: &livestream.RepoDID, 308 URI: aturi.String(), 309 IndexedAt: &now, 310 } 311 err = atsync.Model.CreateFeedPost(ctx, fp) 312 if err != nil { 313 log.Error(ctx, "failed to create feed post", "err", err) 314 } 315 postView, err := fp.ToBskyPostView() 316 if err != nil { 317 log.Error(ctx, "failed to convert feed post to bsky post view", "err", err) 318 } 319 go atsync.Bus.Publish(livestream.RepoDID, postView) 320 } 321 322 case *streamplace.Livestream: 323 if r == nil { 324 // we don't know about this repo 325 return nil 326 } 327 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt) 328 if err != nil { 329 log.Error(ctx, "failed to parse createdAt", "err", err) 330 return nil 331 } 332 ls := &model.Livestream{ 333 CID: cid, 334 URI: aturi.String(), 335 CreatedAt: createdAt, 336 Livestream: recCBOR, 337 RepoDID: userDID, 338 } 339 if rec.Post != nil { 340 ls.PostCID = rec.Post.Cid 341 ls.PostURI = rec.Post.Uri 342 } 343 err = atsync.Model.CreateLivestream(ctx, ls) 344 if err != nil { 345 return fmt.Errorf("failed to create livestream: %w", err) 346 } 347 lsHydrated, err := atsync.Model.GetLatestLivestreamForRepo(userDID) 348 if err != nil { 349 return fmt.Errorf("failed to get latest livestream for repo: %w", err) 350 } 351 lsv, err := lsHydrated.ToLivestreamView() 352 if err != nil { 353 return fmt.Errorf("failed to convert livestream to bsky post view: %w", err) 354 } 355 go atsync.Bus.Publish(userDID, lsv) 356 357 var postView *bsky.FeedDefs_PostView 358 if lsHydrated.Post != nil { 359 postView, err = lsHydrated.Post.ToBskyPostView() 360 if err != nil { 361 return fmt.Errorf("failed to convert livestream post to bsky post view: %w", err) 362 } 363 } 364 365 task := &statedb.NotificationTask{ 366 Livestream: lsv, 367 FeedPost: postView, 368 PDSURL: r.PDS, 369 } 370 371 cp, err := atsync.Model.GetChatProfile(ctx, userDID) 372 if err != nil { 373 return fmt.Errorf("failed to get chat profile: %w", err) 374 } 375 if cp != nil { 376 spcp, err := cp.ToStreamplaceChatProfile() 377 if err != nil { 378 return fmt.Errorf("failed to convert chat profile to streamplace chat profile: %w", err) 379 } 380 task.ChatProfile = spcp 381 } 382 383 case *streamplace.Key: 384 log.Debug(ctx, "creating key", "key", rec) 385 time, err := aqtime.FromString(rec.CreatedAt) 386 if err != nil { 387 return fmt.Errorf("failed to parse createdAt: %w", err) 388 } 389 key := model.SigningKey{ 390 DID: rec.SigningKey, 391 RKey: rkey.String(), 392 CreatedAt: time.Time(), 393 RepoDID: userDID, 394 } 395 err = atsync.Model.UpdateSigningKey(&key) 396 if err != nil { 397 log.Error(ctx, "failed to create signing key", "err", err) 398 } 399 400 case *streamplace.BroadcastOrigin: 401 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 402 if err != nil { 403 return fmt.Errorf("failed to sync broadcast origin creator bluesky repo: %w", err) 404 } 405 _, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model) 406 if err != nil { 407 return fmt.Errorf("failed to sync broadcast origin streamer bluesky repo: %w", err) 408 } 409 err = atsync.Model.UpdateBroadcastOrigin(ctx, rec, aturi) 410 if err != nil { 411 log.Error(ctx, "failed to update broadcast origin", "err", err) 412 } 413 view := &streamplace.BroadcastDefs_BroadcastOriginView{ 414 Uri: aturi.String(), 415 Cid: cid, 416 Author: &bsky.ActorDefs_ProfileViewBasic{ 417 Did: userDID, 418 Handle: repo.Handle, 419 }, 420 Record: &lexutil.LexiconTypeDecoder{Val: rec}, 421 } 422 // publishes with an empty string because we're discovering the stream 423 go atsync.Bus.Publish("", view) 424 425 case *streamplace.MetadataConfiguration: 426 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 427 if err != nil { 428 return fmt.Errorf("failed to sync bluesky repo: %w", err) 429 } 430 log.Debug(ctx, "creating metadata configuration", "metadata", rec) 431 metadata := &model.MetadataConfiguration{ 432 RepoDID: userDID, 433 Record: recCBOR, 434 Repo: repo, 435 } 436 err = atsync.Model.CreateMetadataConfiguration(ctx, metadata) 437 if err != nil { 438 log.Error(ctx, "failed to create metadata configuration", "err", err) 439 } 440 441 case *streamplace.ModerationPermission: 442 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 443 if err != nil { 444 return fmt.Errorf("failed to sync bluesky repo: %w", err) 445 } 446 log.Debug(ctx, "creating moderation delegation", "streamerDID", userDID, "moderatorDID", rec.Moderator) 447 448 err = atsync.Model.CreateModerationDelegation(ctx, rec, aturi) 449 if err != nil { 450 return fmt.Errorf("failed to create moderation delegation: %w", err) 451 } 452 453 view := &streamplace.ModerationDefs_PermissionView{ 454 Uri: aturi.String(), 455 Cid: cid, 456 Author: &bsky.ActorDefs_ProfileViewBasic{ 457 Did: userDID, 458 Handle: repo.Handle, 459 }, 460 Record: &lexutil.LexiconTypeDecoder{Val: rec}, 461 } 462 // Publish moderation permission view to WebSocket bus for real-time updates 463 // This allows moderators to see their permissions instantly without page refresh 464 go atsync.Bus.Publish(userDID, view) 465 466 case *streamplace.LiveRecommendations: 467 log.Debug(ctx, "creating recommendations", "userDID", userDID, "count", len(rec.Streamers)) 468 469 // Validate max 8 streamers 470 if len(rec.Streamers) > 8 { 471 log.Warn(ctx, "recommendations exceed maximum of 8", "count", len(rec.Streamers)) 472 return fmt.Errorf("maximum 8 recommendations allowed, got %d", len(rec.Streamers)) 473 } 474 475 // Marshal streamers to JSON 476 streamersJSON, err := json.Marshal(rec.Streamers) 477 if err != nil { 478 return fmt.Errorf("failed to marshal streamers: %w", err) 479 } 480 481 // Parse createdAt timestamp 482 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt) 483 if err != nil { 484 return fmt.Errorf("failed to parse createdAt: %w", err) 485 } 486 487 recommendation := &model.Recommendation{ 488 UserDID: userDID, 489 Streamers: json.RawMessage(streamersJSON), 490 CreatedAt: createdAt, 491 } 492 493 err = atsync.Model.UpsertRecommendation(recommendation) 494 if err != nil { 495 return fmt.Errorf("failed to upsert recommendation: %w", err) 496 } 497 498 default: 499 log.Debug(ctx, "unhandled record type", "type", reflect.TypeOf(rec)) 500 } 501 return nil 502}