Live video on the AT Protocol
1package atproto 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "reflect" 8 "time" 9 10 "github.com/bluesky-social/indigo/api/bsky" 11 "github.com/bluesky-social/indigo/atproto/data" 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "stream.place/streamplace/pkg/aqtime" 14 "stream.place/streamplace/pkg/log" 15 "stream.place/streamplace/pkg/model" 16 "stream.place/streamplace/pkg/statedb" 17 "stream.place/streamplace/pkg/streamplace" 18 19 lexutil "github.com/bluesky-social/indigo/lex/util" 20) 21 22func (atsync *ATProtoSynchronizer) handleCreateUpdate(ctx context.Context, userDID string, rkey syntax.RecordKey, recCBOR *[]byte, cid string, collection syntax.NSID, isUpdate bool, isFirstSync bool) error { 23 ctx = log.WithLogValues(ctx, "func", "handleCreateUpdate", "userDID", userDID, "rkey", rkey.String(), "cid", cid, "collection", collection.String()) 24 now := time.Now() 25 r, err := atsync.Model.GetRepo(userDID) 26 if err != nil { 27 return fmt.Errorf("failed to get repo: %w", err) 28 } 29 maybeATURI := fmt.Sprintf("at://%s/%s/%s", userDID, collection.String(), rkey.String()) 30 aturi, err := syntax.ParseATURI(maybeATURI) 31 if err != nil { 32 return fmt.Errorf("failed to parse ATURI: %w", err) 33 } 34 d, err := data.UnmarshalCBOR(*recCBOR) 35 if err != nil { 36 return fmt.Errorf("failed to unmarhsal record CBOR: %w", err) 37 } 38 cb, err := lexutil.CborDecodeValue(*recCBOR) 39 if errors.Is(err, lexutil.ErrUnrecognizedType) { 40 log.Debug(ctx, "unrecognized record type", "key", rkey.String(), "type", err) 41 return nil 42 } else if err != nil { 43 return fmt.Errorf("failed to decode record CBOR: %w", err) 44 } 45 switch rec := cb.(type) { 46 case *bsky.GraphFollow: 47 if r == nil { 48 // someone we don't know about 49 return nil 50 } 51 log.Debug(ctx, "creating follow", "userDID", userDID, "subjectDID", rec.Subject) 52 err := atsync.Model.CreateFollow(ctx, userDID, rkey.String(), rec) 53 if err != nil { 54 log.Debug(ctx, "failed to create follow", "err", err) 55 } 56 57 case *bsky.GraphBlock: 58 if r == nil { 59 // someone we don't know about 60 return nil 61 } 62 log.Debug(ctx, "creating block", "userDID", userDID, "subjectDID", rec.Subject) 63 block := &model.Block{ 64 RKey: rkey.String(), 65 RepoDID: userDID, 66 SubjectDID: rec.Subject, 67 Record: *recCBOR, 68 CID: cid, 69 } 70 err := atsync.Model.CreateBlock(ctx, block) 71 if err != nil { 72 return fmt.Errorf("failed to create block: %w", err) 73 } 74 block, err = atsync.Model.GetBlock(ctx, rkey.String()) 75 if err != nil { 76 return fmt.Errorf("failed to get block after we just saved it?!: %w", err) 77 } 78 streamplaceBlock, err := block.ToStreamplaceBlock() 79 if err != nil { 80 return fmt.Errorf("failed to convert block to streamplace block: %w", err) 81 } 82 go atsync.Bus.Publish(userDID, streamplaceBlock) 83 84 case *streamplace.ChatMessage: 85 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 86 if err != nil { 87 return fmt.Errorf("failed to sync bluesky repo: %w", err) 88 } 89 90 go func() { 91 _, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model) 92 if err != nil { 93 log.Error(ctx, "failed to sync bluesky repo", "err", err) 94 } 95 }() 96 97 log.Debug(ctx, "streamplace.ChatMessage detected", "message", rec.Text, "repo", repo.Handle) 98 block, err := atsync.Model.GetUserBlock(ctx, rec.Streamer, userDID) 99 if err != nil { 100 return fmt.Errorf("failed to get user block: %w", err) 101 } 102 if block != nil { 103 log.Debug(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", rec.Streamer) 104 return nil 105 } 106 mcm := &model.ChatMessage{ 107 CID: cid, 108 URI: aturi.String(), 109 CreatedAt: now, 110 ChatMessage: recCBOR, 111 RepoDID: userDID, 112 Repo: repo, 113 StreamerRepoDID: rec.Streamer, 114 IndexedAt: &now, 115 } 116 if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil { 117 mcm.ReplyToCID = &rec.Reply.Parent.Cid 118 } 119 err = atsync.Model.CreateChatMessage(ctx, mcm) 120 if err != nil { 121 log.Error(ctx, "failed to create chat message", "err", err) 122 return nil 123 } 124 mcm, err = atsync.Model.GetChatMessage(aturi.String()) 125 if err != nil { 126 log.Error(ctx, "failed to get just-saved chat message", "err", err) 127 return nil 128 } 129 if mcm == nil { 130 log.Error(ctx, "failed to retrieve just-saved chat message", "err", err) 131 return nil 132 } 133 scm, err := mcm.ToStreamplaceMessageView() 134 if err != nil { 135 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err) 136 return nil 137 } 138 go atsync.Bus.Publish(rec.Streamer, scm) 139 140 if !isUpdate && !isFirstSync { 141 142 task := &statedb.ChatTask{ 143 MessageView: scm, 144 } 145 146 _, err = atsync.StatefulDB.EnqueueTask(ctx, statedb.TaskChat, task, statedb.WithTaskKey(fmt.Sprintf("chat-message::%s", aturi.String()))) 147 if err != nil { 148 log.Error(ctx, "failed to enqueue notification task", "err", err) 149 } 150 } 151 152 case *streamplace.ChatGate: 153 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 154 if err != nil { 155 return fmt.Errorf("failed to sync bluesky repo: %w", err) 156 } 157 if r == nil { 158 // someone we don't know about 159 return nil 160 } 161 log.Debug(ctx, "creating gate", "userDID", userDID, "hiddenMessage", rec.HiddenMessage) 162 gate := &model.Gate{ 163 RKey: rkey.String(), 164 RepoDID: userDID, 165 HiddenMessage: rec.HiddenMessage, 166 CID: cid, 167 CreatedAt: now, 168 Repo: repo, 169 } 170 err = atsync.Model.CreateGate(ctx, gate) 171 if err != nil { 172 return fmt.Errorf("failed to create gate: %w", err) 173 } 174 gate, err = atsync.Model.GetGate(ctx, rkey.String()) 175 if err != nil { 176 return fmt.Errorf("failed to get gate after we just saved it?!: %w", err) 177 } 178 streamplaceGate, err := gate.ToStreamplaceGate() 179 if err != nil { 180 return fmt.Errorf("failed to convert gate to streamplace gate: %w", err) 181 } 182 go atsync.Bus.Publish(userDID, streamplaceGate) 183 184 case *streamplace.ChatProfile: 185 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 186 if err != nil { 187 return fmt.Errorf("failed to sync bluesky repo: %w", err) 188 } 189 mcm := &model.ChatProfile{ 190 RepoDID: userDID, 191 Repo: repo, 192 Record: recCBOR, 193 } 194 err = atsync.Model.CreateChatProfile(ctx, mcm) 195 if err != nil { 196 log.Error(ctx, "failed to create chat profile", "err", err) 197 } 198 199 case *streamplace.ServerSettings: 200 _, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 201 if err != nil { 202 return fmt.Errorf("failed to sync bluesky repo: %w", err) 203 } 204 settings := &model.ServerSettings{ 205 Server: rkey.String(), 206 RepoDID: userDID, 207 Record: recCBOR, 208 } 209 err = atsync.Model.UpdateServerSettings(ctx, settings) 210 if err != nil { 211 log.Error(ctx, "failed to create server settings", "err", err) 212 } 213 214 case *bsky.FeedPost: 215 // jsonData, err := json.Marshal(d) 216 // if err != nil { 217 // log.Error(ctx, "failed to marshal record data", "err", err) 218 // } else { 219 // log.Log(ctx, "record data", "json", string(jsonData)) 220 // } 221 222 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt) 223 if err != nil { 224 return fmt.Errorf("failed to parse createdAt: %w", err) 225 } 226 227 if livestream, ok := d["place.stream.livestream"]; ok { 228 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 229 if err != nil { 230 return fmt.Errorf("failed to sync bluesky repo: %w", err) 231 } 232 livestream, ok := livestream.(map[string]interface{}) 233 if !ok { 234 return fmt.Errorf("livestream is not a map") 235 } 236 url, ok := livestream["url"].(string) 237 if !ok { 238 return fmt.Errorf("livestream url is not a string") 239 } 240 log.Debug(ctx, "livestream url", "url", url) 241 if err := atsync.Model.CreateFeedPost(ctx, &model.FeedPost{ 242 CID: cid, 243 CreatedAt: createdAt, 244 FeedPost: recCBOR, 245 RepoDID: userDID, 246 Repo: repo, 247 Type: "livestream", 248 URI: aturi.String(), 249 IndexedAt: &now, 250 }); err != nil { 251 return fmt.Errorf("failed to create bluesky post: %w", err) 252 } 253 } else { 254 if rec.Reply == nil || rec.Reply.Root == nil { 255 return nil 256 } 257 livestream, err := atsync.Model.GetLivestreamByPostURI(rec.Reply.Root.Uri) 258 if err != nil { 259 return fmt.Errorf("failed to get livestream: %w", err) 260 } 261 if livestream == nil { 262 return nil 263 } 264 // log.Warn(ctx, "chat message detected", "uri", livestream.URI) 265 // if this post is a reply to someone's livestream post 266 // log.Warn(ctx, "chat message detected", "message", rec.Text) 267 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 268 if err != nil { 269 return fmt.Errorf("failed to sync bluesky repo: %w", err) 270 } 271 272 // log.Warn(ctx, "chat message detected", "message", rec.Text, "repo", repo.Handle) 273 block, err := atsync.Model.GetUserBlock(ctx, livestream.RepoDID, userDID) 274 if err != nil { 275 return fmt.Errorf("failed to get user block: %w", err) 276 } 277 if block != nil { 278 log.Warn(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", livestream.RepoDID) 279 return nil 280 } 281 // if fc.cli.PrintChat { 282 // fmt.Printf("@%s%s %s\n", blue.Sprintf(repo.Handle), green.Sprintf(":"), rec.Text) 283 // } 284 fp := &model.FeedPost{ 285 CID: cid, 286 CreatedAt: createdAt, 287 FeedPost: recCBOR, 288 RepoDID: userDID, 289 Type: "reply", 290 Repo: repo, 291 ReplyRootURI: &livestream.PostURI, 292 ReplyRootRepoDID: &livestream.RepoDID, 293 URI: aturi.String(), 294 IndexedAt: &now, 295 } 296 err = atsync.Model.CreateFeedPost(ctx, fp) 297 if err != nil { 298 log.Error(ctx, "failed to create feed post", "err", err) 299 } 300 postView, err := fp.ToBskyPostView() 301 if err != nil { 302 log.Error(ctx, "failed to convert feed post to bsky post view", "err", err) 303 } 304 go atsync.Bus.Publish(livestream.RepoDID, postView) 305 } 306 307 case *streamplace.Livestream: 308 if r == nil { 309 // we don't know about this repo 310 return nil 311 } 312 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt) 313 if err != nil { 314 log.Error(ctx, "failed to parse createdAt", "err", err) 315 return nil 316 } 317 ls := &model.Livestream{ 318 CID: cid, 319 URI: aturi.String(), 320 CreatedAt: createdAt, 321 Livestream: recCBOR, 322 RepoDID: userDID, 323 } 324 if rec.Post != nil { 325 ls.PostCID = rec.Post.Cid 326 ls.PostURI = rec.Post.Uri 327 } 328 err = atsync.Model.CreateLivestream(ctx, ls) 329 if err != nil { 330 return fmt.Errorf("failed to create livestream: %w", err) 331 } 332 lsHydrated, err := atsync.Model.GetLatestLivestreamForRepo(userDID) 333 if err != nil { 334 return fmt.Errorf("failed to get latest livestream for repo: %w", err) 335 } 336 lsv, err := lsHydrated.ToLivestreamView() 337 if err != nil { 338 return fmt.Errorf("failed to convert livestream to bsky post view: %w", err) 339 } 340 go atsync.Bus.Publish(userDID, lsv) 341 342 var postView *bsky.FeedDefs_PostView 343 if lsHydrated.Post != nil { 344 postView, err = lsHydrated.Post.ToBskyPostView() 345 if err != nil { 346 return fmt.Errorf("failed to convert livestream post to bsky post view: %w", err) 347 } 348 } 349 350 task := &statedb.NotificationTask{ 351 Livestream: lsv, 352 FeedPost: postView, 353 PDSURL: r.PDS, 354 } 355 356 cp, err := atsync.Model.GetChatProfile(ctx, userDID) 357 if err != nil { 358 return fmt.Errorf("failed to get chat profile: %w", err) 359 } 360 if cp != nil { 361 spcp, err := cp.ToStreamplaceChatProfile() 362 if err != nil { 363 return fmt.Errorf("failed to convert chat profile to streamplace chat profile: %w", err) 364 } 365 task.ChatProfile = spcp 366 } 367 368 if !isUpdate && !isFirstSync { 369 _, err = atsync.StatefulDB.EnqueueTask(ctx, statedb.TaskNotification, task, statedb.WithTaskKey(fmt.Sprintf("notification-blast::%s", aturi.String()))) 370 if err != nil { 371 log.Error(ctx, "failed to enqueue notification task", "err", err) 372 } 373 } 374 375 case *streamplace.Key: 376 log.Debug(ctx, "creating key", "key", rec) 377 time, err := aqtime.FromString(rec.CreatedAt) 378 if err != nil { 379 return fmt.Errorf("failed to parse createdAt: %w", err) 380 } 381 key := model.SigningKey{ 382 DID: rec.SigningKey, 383 RKey: rkey.String(), 384 CreatedAt: time.Time(), 385 RepoDID: userDID, 386 } 387 err = atsync.Model.UpdateSigningKey(&key) 388 if err != nil { 389 log.Error(ctx, "failed to create signing key", "err", err) 390 } 391 392 case *streamplace.BroadcastOrigin: 393 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 394 if err != nil { 395 return fmt.Errorf("failed to sync broadcast origin creator bluesky repo: %w", err) 396 } 397 _, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model) 398 if err != nil { 399 return fmt.Errorf("failed to sync broadcast origin streamer bluesky repo: %w", err) 400 } 401 err = atsync.Model.UpdateBroadcastOrigin(ctx, rec, aturi) 402 if err != nil { 403 log.Error(ctx, "failed to update broadcast origin", "err", err) 404 } 405 view := &streamplace.BroadcastDefs_BroadcastOriginView{ 406 Uri: aturi.String(), 407 Cid: cid, 408 Author: &bsky.ActorDefs_ProfileViewBasic{ 409 Did: userDID, 410 Handle: repo.Handle, 411 }, 412 Record: &lexutil.LexiconTypeDecoder{Val: rec}, 413 } 414 // publishes with an empty string because we're discovering the stream 415 go atsync.Bus.Publish("", view) 416 417 case *streamplace.MetadataConfiguration: 418 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 419 if err != nil { 420 return fmt.Errorf("failed to sync bluesky repo: %w", err) 421 } 422 log.Debug(ctx, "creating metadata configuration", "metadata", rec) 423 metadata := &model.MetadataConfiguration{ 424 RepoDID: userDID, 425 Record: recCBOR, 426 Repo: repo, 427 } 428 err = atsync.Model.CreateMetadataConfiguration(ctx, metadata) 429 if err != nil { 430 log.Error(ctx, "failed to create metadata configuration", "err", err) 431 } 432 433 default: 434 log.Debug(ctx, "unhandled record type", "type", reflect.TypeOf(rec)) 435 } 436 return nil 437}