Live video on the AT Protocol
at eli/rtmp-push 488 lines 15 kB view raw
1package atproto 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "reflect" 9 "time" 10 11 "github.com/bluesky-social/indigo/api/bsky" 12 "github.com/bluesky-social/indigo/atproto/atdata" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "stream.place/streamplace/pkg/aqtime" 15 "stream.place/streamplace/pkg/log" 16 "stream.place/streamplace/pkg/model" 17 "stream.place/streamplace/pkg/statedb" 18 "stream.place/streamplace/pkg/streamplace" 19 20 lexutil "github.com/bluesky-social/indigo/lex/util" 21) 22 23func (atsync *ATProtoSynchronizer) handleCreateUpdate(ctx context.Context, userDID string, rkey syntax.RecordKey, recCBOR *[]byte, cid string, collection syntax.NSID, isUpdate bool, isFirstSync bool) error { 24 ctx = log.WithLogValues(ctx, "func", "handleCreateUpdate", "userDID", userDID, "rkey", rkey.String(), "cid", cid, "collection", collection.String()) 25 now := time.Now() 26 r, err := atsync.Model.GetRepo(userDID) 27 if err != nil { 28 return fmt.Errorf("failed to get repo: %w", err) 29 } 30 maybeATURI := fmt.Sprintf("at://%s/%s/%s", userDID, collection.String(), rkey.String()) 31 aturi, err := syntax.ParseATURI(maybeATURI) 32 if err != nil { 33 return fmt.Errorf("failed to parse ATURI: %w", err) 34 } 35 d, err := atdata.UnmarshalCBOR(*recCBOR) 36 if err != nil { 37 return fmt.Errorf("failed to unmarhsal record CBOR: %w", err) 38 } 39 cb, err := lexutil.CborDecodeValue(*recCBOR) 40 if errors.Is(err, lexutil.ErrUnrecognizedType) { 41 log.Debug(ctx, "unrecognized record type", "key", rkey.String(), "type", err) 42 return nil 43 } else if err != nil { 44 return fmt.Errorf("failed to decode record CBOR: %w", err) 45 } 46 switch rec := cb.(type) { 47 case *bsky.GraphFollow: 48 if r == nil { 49 // someone we don't know about 50 return nil 51 } 52 log.Debug(ctx, "creating follow", "userDID", userDID, "subjectDID", rec.Subject) 53 err := atsync.Model.CreateFollow(ctx, userDID, rkey.String(), rec) 54 if err != nil { 55 log.Debug(ctx, "failed to create follow", "err", err) 56 } 57 58 case *bsky.GraphBlock: 59 if r == nil { 60 // someone we don't know about 61 return nil 62 } 63 log.Debug(ctx, "creating block", "userDID", userDID, "subjectDID", rec.Subject) 64 block := &model.Block{ 65 RKey: rkey.String(), 66 RepoDID: userDID, 67 SubjectDID: rec.Subject, 68 Record: *recCBOR, 69 CID: cid, 70 } 71 err := atsync.Model.CreateBlock(ctx, block) 72 if err != nil { 73 return fmt.Errorf("failed to create block: %w", err) 74 } 75 block, err = atsync.Model.GetBlock(ctx, rkey.String()) 76 if err != nil { 77 return fmt.Errorf("failed to get block after we just saved it?!: %w", err) 78 } 79 streamplaceBlock, err := block.ToStreamplaceBlock() 80 if err != nil { 81 return fmt.Errorf("failed to convert block to streamplace block: %w", err) 82 } 83 go atsync.Bus.Publish(userDID, streamplaceBlock) 84 85 case *streamplace.ChatMessage: 86 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 87 if err != nil { 88 return fmt.Errorf("failed to sync bluesky repo: %w", err) 89 } 90 91 go func() { 92 _, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model) 93 if err != nil { 94 log.Error(ctx, "failed to sync bluesky repo", "err", err) 95 } 96 }() 97 98 log.Debug(ctx, "streamplace.ChatMessage detected", "message", rec.Text, "repo", repo.Handle) 99 block, err := atsync.Model.GetUserBlock(ctx, rec.Streamer, userDID) 100 if err != nil { 101 return fmt.Errorf("failed to get user block: %w", err) 102 } 103 if block != nil { 104 log.Debug(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", rec.Streamer) 105 return nil 106 } 107 mcm := &model.ChatMessage{ 108 CID: cid, 109 URI: aturi.String(), 110 CreatedAt: now, 111 ChatMessage: recCBOR, 112 RepoDID: userDID, 113 Repo: repo, 114 StreamerRepoDID: rec.Streamer, 115 IndexedAt: &now, 116 } 117 if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil { 118 mcm.ReplyToCID = &rec.Reply.Parent.Cid 119 } 120 err = atsync.Model.CreateChatMessage(ctx, mcm) 121 if err != nil { 122 log.Error(ctx, "failed to create chat message", "err", err) 123 return nil 124 } 125 mcm, err = atsync.Model.GetChatMessage(aturi.String()) 126 if err != nil { 127 log.Error(ctx, "failed to get just-saved chat message", "err", err) 128 return nil 129 } 130 if mcm == nil { 131 log.Error(ctx, "failed to retrieve just-saved chat message", "err", err) 132 return nil 133 } 134 scm, err := mcm.ToStreamplaceMessageView() 135 if err != nil { 136 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err) 137 return nil 138 } 139 go atsync.Bus.Publish(rec.Streamer, scm) 140 141 if !isUpdate && !isFirstSync { 142 143 task := &statedb.ChatTask{ 144 MessageView: scm, 145 } 146 147 _, err = atsync.StatefulDB.EnqueueTask(ctx, statedb.TaskChat, task, statedb.WithTaskKey(fmt.Sprintf("chat-message::%s", aturi.String()))) 148 if err != nil { 149 log.Error(ctx, "failed to enqueue notification task", "err", err) 150 } 151 } 152 153 case *streamplace.ChatGate: 154 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 155 if err != nil { 156 return fmt.Errorf("failed to sync bluesky repo: %w", err) 157 } 158 if r == nil { 159 // someone we don't know about 160 return nil 161 } 162 log.Debug(ctx, "creating gate", "userDID", userDID, "hiddenMessage", rec.HiddenMessage) 163 gate := &model.Gate{ 164 RKey: rkey.String(), 165 RepoDID: userDID, 166 HiddenMessage: rec.HiddenMessage, 167 CID: cid, 168 CreatedAt: now, 169 Repo: repo, 170 } 171 err = atsync.Model.CreateGate(ctx, gate) 172 if err != nil { 173 return fmt.Errorf("failed to create gate: %w", err) 174 } 175 gate, err = atsync.Model.GetGate(ctx, rkey.String()) 176 if err != nil { 177 return fmt.Errorf("failed to get gate after we just saved it?!: %w", err) 178 } 179 streamplaceGate, err := gate.ToStreamplaceGate() 180 if err != nil { 181 return fmt.Errorf("failed to convert gate to streamplace gate: %w", err) 182 } 183 go atsync.Bus.Publish(userDID, streamplaceGate) 184 185 case *streamplace.ChatProfile: 186 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 187 if err != nil { 188 return fmt.Errorf("failed to sync bluesky repo: %w", err) 189 } 190 mcm := &model.ChatProfile{ 191 RepoDID: userDID, 192 Repo: repo, 193 Record: recCBOR, 194 } 195 err = atsync.Model.CreateChatProfile(ctx, mcm) 196 if err != nil { 197 log.Error(ctx, "failed to create chat profile", "err", err) 198 } 199 200 case *streamplace.ServerSettings: 201 _, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 202 if err != nil { 203 return fmt.Errorf("failed to sync bluesky repo: %w", err) 204 } 205 settings := &model.ServerSettings{ 206 Server: rkey.String(), 207 RepoDID: userDID, 208 Record: recCBOR, 209 } 210 err = atsync.Model.UpdateServerSettings(ctx, settings) 211 if err != nil { 212 log.Error(ctx, "failed to create server settings", "err", err) 213 } 214 215 case *bsky.FeedPost: 216 // jsonData, err := json.Marshal(d) 217 // if err != nil { 218 // log.Error(ctx, "failed to marshal record data", "err", err) 219 // } else { 220 // log.Log(ctx, "record data", "json", string(jsonData)) 221 // } 222 223 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt) 224 if err != nil { 225 return fmt.Errorf("failed to parse createdAt: %w", err) 226 } 227 228 if livestream, ok := d["place.stream.livestream"]; ok { 229 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 230 if err != nil { 231 return fmt.Errorf("failed to sync bluesky repo: %w", err) 232 } 233 livestream, ok := livestream.(map[string]interface{}) 234 if !ok { 235 return fmt.Errorf("livestream is not a map") 236 } 237 url, ok := livestream["url"].(string) 238 if !ok { 239 return fmt.Errorf("livestream url is not a string") 240 } 241 log.Debug(ctx, "livestream url", "url", url) 242 if err := atsync.Model.CreateFeedPost(ctx, &model.FeedPost{ 243 CID: cid, 244 CreatedAt: createdAt, 245 FeedPost: recCBOR, 246 RepoDID: userDID, 247 Repo: repo, 248 Type: "livestream", 249 URI: aturi.String(), 250 IndexedAt: &now, 251 }); err != nil { 252 return fmt.Errorf("failed to create bluesky post: %w", err) 253 } 254 } else { 255 if rec.Reply == nil || rec.Reply.Root == nil { 256 return nil 257 } 258 livestream, err := atsync.Model.GetLivestreamByPostURI(rec.Reply.Root.Uri) 259 if err != nil { 260 return fmt.Errorf("failed to get livestream: %w", err) 261 } 262 if livestream == nil { 263 return nil 264 } 265 // log.Warn(ctx, "chat message detected", "uri", livestream.URI) 266 // if this post is a reply to someone's livestream post 267 // log.Warn(ctx, "chat message detected", "message", rec.Text) 268 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 269 if err != nil { 270 return fmt.Errorf("failed to sync bluesky repo: %w", err) 271 } 272 273 // log.Warn(ctx, "chat message detected", "message", rec.Text, "repo", repo.Handle) 274 block, err := atsync.Model.GetUserBlock(ctx, livestream.RepoDID, userDID) 275 if err != nil { 276 return fmt.Errorf("failed to get user block: %w", err) 277 } 278 if block != nil { 279 log.Warn(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", livestream.RepoDID) 280 return nil 281 } 282 // if fc.cli.PrintChat { 283 // fmt.Printf("@%s%s %s\n", blue.Sprintf(repo.Handle), green.Sprintf(":"), rec.Text) 284 // } 285 fp := &model.FeedPost{ 286 CID: cid, 287 CreatedAt: createdAt, 288 FeedPost: recCBOR, 289 RepoDID: userDID, 290 Type: "reply", 291 Repo: repo, 292 ReplyRootURI: &livestream.PostURI, 293 ReplyRootRepoDID: &livestream.RepoDID, 294 URI: aturi.String(), 295 IndexedAt: &now, 296 } 297 err = atsync.Model.CreateFeedPost(ctx, fp) 298 if err != nil { 299 log.Error(ctx, "failed to create feed post", "err", err) 300 } 301 postView, err := fp.ToBskyPostView() 302 if err != nil { 303 log.Error(ctx, "failed to convert feed post to bsky post view", "err", err) 304 } 305 go atsync.Bus.Publish(livestream.RepoDID, postView) 306 } 307 308 case *streamplace.Livestream: 309 if r == nil { 310 // we don't know about this repo 311 return nil 312 } 313 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt) 314 if err != nil { 315 log.Error(ctx, "failed to parse createdAt", "err", err) 316 return nil 317 } 318 ls := &model.Livestream{ 319 CID: cid, 320 URI: aturi.String(), 321 CreatedAt: createdAt, 322 Livestream: recCBOR, 323 RepoDID: userDID, 324 } 325 if rec.Post != nil { 326 ls.PostCID = rec.Post.Cid 327 ls.PostURI = rec.Post.Uri 328 } 329 err = atsync.Model.CreateLivestream(ctx, ls) 330 if err != nil { 331 return fmt.Errorf("failed to create livestream: %w", err) 332 } 333 lsHydrated, err := atsync.Model.GetLatestLivestreamForRepo(userDID) 334 if err != nil { 335 return fmt.Errorf("failed to get latest livestream for repo: %w", err) 336 } 337 lsv, err := lsHydrated.ToLivestreamView() 338 if err != nil { 339 return fmt.Errorf("failed to convert livestream to bsky post view: %w", err) 340 } 341 go atsync.Bus.Publish(userDID, lsv) 342 343 var postView *bsky.FeedDefs_PostView 344 if lsHydrated.Post != nil { 345 postView, err = lsHydrated.Post.ToBskyPostView() 346 if err != nil { 347 return fmt.Errorf("failed to convert livestream post to bsky post view: %w", err) 348 } 349 } 350 351 task := &statedb.NotificationTask{ 352 Livestream: lsv, 353 FeedPost: postView, 354 PDSURL: r.PDS, 355 } 356 357 cp, err := atsync.Model.GetChatProfile(ctx, userDID) 358 if err != nil { 359 return fmt.Errorf("failed to get chat profile: %w", err) 360 } 361 if cp != nil { 362 spcp, err := cp.ToStreamplaceChatProfile() 363 if err != nil { 364 return fmt.Errorf("failed to convert chat profile to streamplace chat profile: %w", err) 365 } 366 task.ChatProfile = spcp 367 } 368 369 case *streamplace.Key: 370 log.Debug(ctx, "creating key", "key", rec) 371 time, err := aqtime.FromString(rec.CreatedAt) 372 if err != nil { 373 return fmt.Errorf("failed to parse createdAt: %w", err) 374 } 375 key := model.SigningKey{ 376 DID: rec.SigningKey, 377 RKey: rkey.String(), 378 CreatedAt: time.Time(), 379 RepoDID: userDID, 380 } 381 err = atsync.Model.UpdateSigningKey(&key) 382 if err != nil { 383 log.Error(ctx, "failed to create signing key", "err", err) 384 } 385 386 case *streamplace.BroadcastOrigin: 387 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 388 if err != nil { 389 return fmt.Errorf("failed to sync broadcast origin creator bluesky repo: %w", err) 390 } 391 _, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model) 392 if err != nil { 393 return fmt.Errorf("failed to sync broadcast origin streamer bluesky repo: %w", err) 394 } 395 err = atsync.Model.UpdateBroadcastOrigin(ctx, rec, aturi) 396 if err != nil { 397 log.Error(ctx, "failed to update broadcast origin", "err", err) 398 } 399 view := &streamplace.BroadcastDefs_BroadcastOriginView{ 400 Uri: aturi.String(), 401 Cid: cid, 402 Author: &bsky.ActorDefs_ProfileViewBasic{ 403 Did: userDID, 404 Handle: repo.Handle, 405 }, 406 Record: &lexutil.LexiconTypeDecoder{Val: rec}, 407 } 408 // publishes with an empty string because we're discovering the stream 409 go atsync.Bus.Publish("", view) 410 411 case *streamplace.MetadataConfiguration: 412 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 413 if err != nil { 414 return fmt.Errorf("failed to sync bluesky repo: %w", err) 415 } 416 log.Debug(ctx, "creating metadata configuration", "metadata", rec) 417 metadata := &model.MetadataConfiguration{ 418 RepoDID: userDID, 419 Record: recCBOR, 420 Repo: repo, 421 } 422 err = atsync.Model.CreateMetadataConfiguration(ctx, metadata) 423 if err != nil { 424 log.Error(ctx, "failed to create metadata configuration", "err", err) 425 } 426 427 case *streamplace.ModerationPermission: 428 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 429 if err != nil { 430 return fmt.Errorf("failed to sync bluesky repo: %w", err) 431 } 432 log.Debug(ctx, "creating moderation delegation", "streamerDID", userDID, "moderatorDID", rec.Moderator) 433 434 err = atsync.Model.CreateModerationDelegation(ctx, rec, aturi) 435 if err != nil { 436 return fmt.Errorf("failed to create moderation delegation: %w", err) 437 } 438 439 view := &streamplace.ModerationDefs_PermissionView{ 440 Uri: aturi.String(), 441 Cid: cid, 442 Author: &bsky.ActorDefs_ProfileViewBasic{ 443 Did: userDID, 444 Handle: repo.Handle, 445 }, 446 Record: &lexutil.LexiconTypeDecoder{Val: rec}, 447 } 448 // Publish moderation permission view to WebSocket bus for real-time updates 449 // This allows moderators to see their permissions instantly without page refresh 450 go atsync.Bus.Publish(userDID, view) 451 452 case *streamplace.LiveRecommendations: 453 log.Debug(ctx, "creating recommendations", "userDID", userDID, "count", len(rec.Streamers)) 454 455 // Validate max 8 streamers 456 if len(rec.Streamers) > 8 { 457 log.Warn(ctx, "recommendations exceed maximum of 8", "count", len(rec.Streamers)) 458 return fmt.Errorf("maximum 8 recommendations allowed, got %d", len(rec.Streamers)) 459 } 460 461 // Marshal streamers to JSON 462 streamersJSON, err := json.Marshal(rec.Streamers) 463 if err != nil { 464 return fmt.Errorf("failed to marshal streamers: %w", err) 465 } 466 467 // Parse createdAt timestamp 468 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt) 469 if err != nil { 470 return fmt.Errorf("failed to parse createdAt: %w", err) 471 } 472 473 recommendation := &model.Recommendation{ 474 UserDID: userDID, 475 Streamers: json.RawMessage(streamersJSON), 476 CreatedAt: createdAt, 477 } 478 479 err = atsync.Model.UpsertRecommendation(recommendation) 480 if err != nil { 481 return fmt.Errorf("failed to upsert recommendation: %w", err) 482 } 483 484 default: 485 log.Debug(ctx, "unhandled record type", "type", reflect.TypeOf(rec)) 486 } 487 return nil 488}