Live video on the AT Protocol
at natb/reporting-interface 427 lines 14 kB view raw
1package atproto 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "reflect" 8 "time" 9 10 "github.com/bluesky-social/indigo/api/bsky" 11 "github.com/bluesky-social/indigo/atproto/data" 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "stream.place/streamplace/pkg/aqtime" 14 "stream.place/streamplace/pkg/integrations/discord" 15 "stream.place/streamplace/pkg/log" 16 "stream.place/streamplace/pkg/model" 17 notificationpkg "stream.place/streamplace/pkg/notifications" 18 "stream.place/streamplace/pkg/streamplace" 19 20 lexutil "github.com/bluesky-social/indigo/lex/util" 21) 22 23func (atsync *ATProtoSynchronizer) handleCreateUpdate(ctx context.Context, userDID string, rkey syntax.RecordKey, recCBOR *[]byte, cid string, collection syntax.NSID, isUpdate bool, isFirstSync bool) error { 24 ctx = log.WithLogValues(ctx, "func", "handleCreateUpdate", "userDID", userDID, "rkey", rkey.String(), "cid", cid, "collection", collection.String()) 25 now := time.Now() 26 r, err := atsync.Model.GetRepo(userDID) 27 if err != nil { 28 return fmt.Errorf("failed to get repo: %w", err) 29 } 30 maybeATURI := fmt.Sprintf("at://%s/%s/%s", userDID, collection.String(), rkey.String()) 31 aturi, err := syntax.ParseATURI(maybeATURI) 32 if err != nil { 33 return fmt.Errorf("failed to parse ATURI: %w", err) 34 } 35 d, err := data.UnmarshalCBOR(*recCBOR) 36 if err != nil { 37 return fmt.Errorf("failed to unmarhsal record CBOR: %w", err) 38 } 39 cb, err := lexutil.CborDecodeValue(*recCBOR) 40 if errors.Is(err, lexutil.ErrUnrecognizedType) { 41 log.Debug(ctx, "unrecognized record type", "key", rkey.String(), "type", err) 42 return nil 43 } else if err != nil { 44 return fmt.Errorf("failed to decode record CBOR: %w", err) 45 } 46 switch rec := cb.(type) { 47 case *bsky.GraphFollow: 48 if r == nil { 49 // someone we don't know about 50 return nil 51 } 52 log.Debug(ctx, "creating follow", "userDID", userDID, "subjectDID", rec.Subject) 53 err := atsync.Model.CreateFollow(ctx, userDID, rkey.String(), rec) 54 if err != nil { 55 log.Debug(ctx, "failed to create follow", "err", err) 56 } 57 58 case *bsky.GraphBlock: 59 if r == nil { 60 // someone we don't know about 61 return nil 62 } 63 log.Debug(ctx, "creating block", "userDID", userDID, "subjectDID", rec.Subject) 64 block := &model.Block{ 65 RKey: rkey.String(), 66 RepoDID: userDID, 67 SubjectDID: rec.Subject, 68 Record: *recCBOR, 69 CID: cid, 70 } 71 err := atsync.Model.CreateBlock(ctx, block) 72 if err != nil { 73 return fmt.Errorf("failed to create block: %w", err) 74 } 75 block, err = atsync.Model.GetBlock(ctx, rkey.String()) 76 if err != nil { 77 return fmt.Errorf("failed to get block after we just saved it?!: %w", err) 78 } 79 streamplaceBlock, err := block.ToStreamplaceBlock() 80 if err != nil { 81 return fmt.Errorf("failed to convert block to streamplace block: %w", err) 82 } 83 go atsync.Bus.Publish(userDID, streamplaceBlock) 84 85 case *streamplace.ChatMessage: 86 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 87 if err != nil { 88 return fmt.Errorf("failed to sync bluesky repo: %w", err) 89 } 90 91 _, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model) 92 if err != nil { 93 log.Error(ctx, "failed to sync bluesky repo", "err", err) 94 } 95 96 log.Debug(ctx, "streamplace.ChatMessage detected", "message", rec.Text, "repo", repo.Handle) 97 block, err := atsync.Model.GetUserBlock(ctx, rec.Streamer, userDID) 98 if err != nil { 99 return fmt.Errorf("failed to get user block: %w", err) 100 } 101 if block != nil { 102 log.Debug(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", rec.Streamer) 103 return nil 104 } 105 mcm := &model.ChatMessage{ 106 CID: cid, 107 URI: aturi.String(), 108 CreatedAt: now, 109 ChatMessage: recCBOR, 110 RepoDID: userDID, 111 Repo: repo, 112 StreamerRepoDID: rec.Streamer, 113 IndexedAt: &now, 114 } 115 if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil { 116 mcm.ReplyToCID = &rec.Reply.Parent.Cid 117 } 118 err = atsync.Model.CreateChatMessage(ctx, mcm) 119 if err != nil { 120 log.Error(ctx, "failed to create chat message", "err", err) 121 } 122 mcm, err = atsync.Model.GetChatMessage(cid) 123 if err != nil { 124 log.Error(ctx, "failed to get just-saved chat message", "err", err) 125 } 126 if mcm == nil { 127 log.Error(ctx, "failed to retrieve just-saved chat message", "err", err) 128 return nil 129 } 130 scm, err := mcm.ToStreamplaceMessageView() 131 if err != nil { 132 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err) 133 } 134 go atsync.Bus.Publish(rec.Streamer, scm) 135 136 if !isUpdate && !isFirstSync { 137 for _, webhook := range atsync.CLI.DiscordWebhooks { 138 if webhook.DID == rec.Streamer && webhook.Type == "chat" { 139 go func() { 140 err := discord.SendChat(ctx, webhook, repo, scm) 141 if err != nil { 142 log.Error(ctx, "failed to send livestream to discord", "err", err) 143 } else { 144 log.Log(ctx, "sent livestream to discord", "user", userDID, "webhook", webhook.URL) 145 } 146 }() 147 } 148 } 149 } 150 151 case *streamplace.ChatGate: 152 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 153 if err != nil { 154 return fmt.Errorf("failed to sync bluesky repo: %w", err) 155 } 156 if r == nil { 157 // someone we don't know about 158 return nil 159 } 160 log.Debug(ctx, "creating gate", "userDID", userDID, "hiddenMessage", rec.HiddenMessage) 161 gate := &model.Gate{ 162 RKey: rkey.String(), 163 RepoDID: userDID, 164 HiddenMessage: rec.HiddenMessage, 165 CID: cid, 166 CreatedAt: now, 167 Repo: repo, 168 } 169 err = atsync.Model.CreateGate(ctx, gate) 170 if err != nil { 171 return fmt.Errorf("failed to create gate: %w", err) 172 } 173 gate, err = atsync.Model.GetGate(ctx, rkey.String()) 174 if err != nil { 175 return fmt.Errorf("failed to get gate after we just saved it?!: %w", err) 176 } 177 streamplaceGate, err := gate.ToStreamplaceGate() 178 if err != nil { 179 return fmt.Errorf("failed to convert gate to streamplace gate: %w", err) 180 } 181 go atsync.Bus.Publish(userDID, streamplaceGate) 182 183 case *streamplace.ChatProfile: 184 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 185 if err != nil { 186 return fmt.Errorf("failed to sync bluesky repo: %w", err) 187 } 188 mcm := &model.ChatProfile{ 189 RepoDID: userDID, 190 Repo: repo, 191 Record: recCBOR, 192 } 193 err = atsync.Model.CreateChatProfile(ctx, mcm) 194 if err != nil { 195 log.Error(ctx, "failed to create chat profile", "err", err) 196 } 197 198 case *streamplace.ServerSettings: 199 _, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 200 if err != nil { 201 return fmt.Errorf("failed to sync bluesky repo: %w", err) 202 } 203 settings := &model.ServerSettings{ 204 Server: rkey.String(), 205 RepoDID: userDID, 206 Record: recCBOR, 207 } 208 err = atsync.Model.UpdateServerSettings(ctx, settings) 209 if err != nil { 210 log.Error(ctx, "failed to create server settings", "err", err) 211 } 212 213 case *bsky.FeedPost: 214 // jsonData, err := json.Marshal(d) 215 // if err != nil { 216 // log.Error(ctx, "failed to marshal record data", "err", err) 217 // } else { 218 // log.Log(ctx, "record data", "json", string(jsonData)) 219 // } 220 221 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt) 222 if err != nil { 223 return fmt.Errorf("failed to parse createdAt: %w", err) 224 } 225 226 if livestream, ok := d["place.stream.livestream"]; ok { 227 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 228 if err != nil { 229 return fmt.Errorf("failed to sync bluesky repo: %w", err) 230 } 231 livestream, ok := livestream.(map[string]interface{}) 232 if !ok { 233 return fmt.Errorf("livestream is not a map") 234 } 235 url, ok := livestream["url"].(string) 236 if !ok { 237 return fmt.Errorf("livestream url is not a string") 238 } 239 log.Debug(ctx, "livestream url", "url", url) 240 if err := atsync.Model.CreateFeedPost(ctx, &model.FeedPost{ 241 CID: cid, 242 CreatedAt: createdAt, 243 FeedPost: recCBOR, 244 RepoDID: userDID, 245 Repo: repo, 246 Type: "livestream", 247 URI: aturi.String(), 248 IndexedAt: &now, 249 }); err != nil { 250 return fmt.Errorf("failed to create bluesky post: %w", err) 251 } 252 } else { 253 if rec.Reply == nil || rec.Reply.Root == nil { 254 return nil 255 } 256 livestream, err := atsync.Model.GetLivestreamByPostCID(rec.Reply.Root.Cid) 257 if err != nil { 258 return fmt.Errorf("failed to get livestream: %w", err) 259 } 260 if livestream == nil { 261 return nil 262 } 263 // log.Warn(ctx, "chat message detected", "uri", livestream.URI) 264 // if this post is a reply to someone's livestream post 265 // log.Warn(ctx, "chat message detected", "message", rec.Text) 266 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 267 if err != nil { 268 return fmt.Errorf("failed to sync bluesky repo: %w", err) 269 } 270 271 // log.Warn(ctx, "chat message detected", "message", rec.Text, "repo", repo.Handle) 272 block, err := atsync.Model.GetUserBlock(ctx, livestream.RepoDID, userDID) 273 if err != nil { 274 return fmt.Errorf("failed to get user block: %w", err) 275 } 276 if block != nil { 277 log.Warn(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", livestream.RepoDID) 278 return nil 279 } 280 // if fc.cli.PrintChat { 281 // fmt.Printf("@%s%s %s\n", blue.Sprintf(repo.Handle), green.Sprintf(":"), rec.Text) 282 // } 283 fp := &model.FeedPost{ 284 CID: cid, 285 CreatedAt: createdAt, 286 FeedPost: recCBOR, 287 RepoDID: userDID, 288 Type: "reply", 289 Repo: repo, 290 ReplyRootCID: &livestream.PostCID, 291 ReplyRootRepoDID: &livestream.RepoDID, 292 URI: aturi.String(), 293 IndexedAt: &now, 294 } 295 err = atsync.Model.CreateFeedPost(ctx, fp) 296 if err != nil { 297 log.Error(ctx, "failed to create feed post", "err", err) 298 } 299 postView, err := fp.ToBskyPostView() 300 if err != nil { 301 log.Error(ctx, "failed to convert feed post to bsky post view", "err", err) 302 } 303 go atsync.Bus.Publish(livestream.RepoDID, postView) 304 } 305 306 case *streamplace.Livestream: 307 var u string 308 if rec.Url != nil { 309 u = *rec.Url 310 } 311 if r == nil { 312 // we don't know about this repo 313 return nil 314 } 315 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt) 316 if err != nil { 317 log.Error(ctx, "failed to parse createdAt", "err", err) 318 return nil 319 } 320 ls := &model.Livestream{ 321 CID: cid, 322 URI: aturi.String(), 323 CreatedAt: createdAt, 324 Livestream: recCBOR, 325 RepoDID: userDID, 326 } 327 if rec.Post != nil { 328 ls.PostCID = rec.Post.Cid 329 ls.PostURI = rec.Post.Uri 330 } 331 err = atsync.Model.CreateLivestream(ctx, ls) 332 if err != nil { 333 return fmt.Errorf("failed to create livestream: %w", err) 334 } 335 lsHydrated, err := atsync.Model.GetLatestLivestreamForRepo(userDID) 336 if err != nil { 337 return fmt.Errorf("failed to get latest livestream for repo: %w", err) 338 } 339 lsv, err := lsHydrated.ToLivestreamView() 340 if err != nil { 341 return fmt.Errorf("failed to convert livestream to bsky post view: %w", err) 342 } 343 go atsync.Bus.Publish(userDID, lsv) 344 345 if !isUpdate && !isFirstSync { 346 log.Warn(ctx, "Livestream detected! Blasting followers!", "title", rec.Title, "url", u, "createdAt", rec.CreatedAt, "repo", userDID) 347 notifications, err := atsync.Model.GetFollowersNotificationTokens(userDID) 348 if err != nil { 349 return err 350 } 351 352 nb := &notificationpkg.NotificationBlast{ 353 Title: fmt.Sprintf("🔴 @%s is LIVE!", r.Handle), 354 Body: rec.Title, 355 Data: map[string]string{ 356 "path": fmt.Sprintf("/%s", r.Handle), 357 }, 358 } 359 if atsync.Noter != nil { 360 err := atsync.Noter.Blast(ctx, notifications, nb) 361 if err != nil { 362 log.Error(ctx, "failed to blast notifications", "err", err) 363 } else { 364 log.Log(ctx, "sent notifications", "user", userDID, "count", len(notifications), "content", nb) 365 } 366 } else { 367 log.Log(ctx, "no notifier configured, skipping notifications", "user", userDID, "count", len(notifications), "content", nb) 368 } 369 370 var postView *bsky.FeedDefs_PostView 371 if lsHydrated.Post != nil { 372 postView, err = lsHydrated.Post.ToBskyPostView() 373 if err != nil { 374 log.Error(ctx, "failed to convert livestream post to bsky post view", "err", err) 375 } 376 } else { 377 log.Warn(ctx, "no post found for livestream", "livestream", lsHydrated) 378 } 379 380 var spcp *streamplace.ChatProfile 381 cp, err := atsync.Model.GetChatProfile(ctx, userDID) 382 if err != nil { 383 log.Error(ctx, "failed to get chat profile", "err", err) 384 } 385 if cp != nil { 386 spcp, err = cp.ToStreamplaceChatProfile() 387 if err != nil { 388 log.Error(ctx, "failed to convert chat profile to streamplace chat profile", "err", err) 389 } 390 } 391 392 for _, webhook := range atsync.CLI.DiscordWebhooks { 393 if webhook.DID == userDID && webhook.Type == "livestream" { 394 go func() { 395 err := discord.SendLivestream(ctx, webhook, r, lsv, postView, spcp) 396 if err != nil { 397 log.Error(ctx, "failed to send livestream to discord", "err", err) 398 } else { 399 log.Log(ctx, "sent livestream to discord", "user", userDID, "webhook", webhook.URL) 400 } 401 }() 402 } 403 } 404 } 405 406 case *streamplace.Key: 407 log.Debug(ctx, "creating key", "key", rec) 408 time, err := aqtime.FromString(rec.CreatedAt) 409 if err != nil { 410 return fmt.Errorf("failed to parse createdAt: %w", err) 411 } 412 key := model.SigningKey{ 413 DID: rec.SigningKey, 414 RKey: rkey.String(), 415 CreatedAt: time.Time(), 416 RepoDID: userDID, 417 } 418 err = atsync.Model.UpdateSigningKey(&key) 419 if err != nil { 420 log.Error(ctx, "failed to create signing key", "err", err) 421 } 422 423 default: 424 log.Debug(ctx, "unhandled record type", "type", reflect.TypeOf(rec)) 425 } 426 return nil 427}