Live video on the AT Protocol
at eli/postgres 393 lines 12 kB view raw
1package atproto 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "reflect" 8 "time" 9 10 "github.com/bluesky-social/indigo/api/bsky" 11 "github.com/bluesky-social/indigo/atproto/data" 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "stream.place/streamplace/pkg/aqtime" 14 "stream.place/streamplace/pkg/log" 15 "stream.place/streamplace/pkg/model" 16 "stream.place/streamplace/pkg/statedb" 17 "stream.place/streamplace/pkg/streamplace" 18 19 lexutil "github.com/bluesky-social/indigo/lex/util" 20) 21 22func (atsync *ATProtoSynchronizer) handleCreateUpdate(ctx context.Context, userDID string, rkey syntax.RecordKey, recCBOR *[]byte, cid string, collection syntax.NSID, isUpdate bool, isFirstSync bool) error { 23 ctx = log.WithLogValues(ctx, "func", "handleCreateUpdate", "userDID", userDID, "rkey", rkey.String(), "cid", cid, "collection", collection.String()) 24 now := time.Now() 25 r, err := atsync.Model.GetRepo(userDID) 26 if err != nil { 27 return fmt.Errorf("failed to get repo: %w", err) 28 } 29 maybeATURI := fmt.Sprintf("at://%s/%s/%s", userDID, collection.String(), rkey.String()) 30 aturi, err := syntax.ParseATURI(maybeATURI) 31 if err != nil { 32 return fmt.Errorf("failed to parse ATURI: %w", err) 33 } 34 d, err := data.UnmarshalCBOR(*recCBOR) 35 if err != nil { 36 return fmt.Errorf("failed to unmarhsal record CBOR: %w", err) 37 } 38 cb, err := lexutil.CborDecodeValue(*recCBOR) 39 if errors.Is(err, lexutil.ErrUnrecognizedType) { 40 log.Debug(ctx, "unrecognized record type", "key", rkey.String(), "type", err) 41 return nil 42 } else if err != nil { 43 return fmt.Errorf("failed to decode record CBOR: %w", err) 44 } 45 switch rec := cb.(type) { 46 case *bsky.GraphFollow: 47 if r == nil { 48 // someone we don't know about 49 return nil 50 } 51 log.Debug(ctx, "creating follow", "userDID", userDID, "subjectDID", rec.Subject) 52 err := atsync.Model.CreateFollow(ctx, userDID, rkey.String(), rec) 53 if err != nil { 54 log.Debug(ctx, "failed to create follow", "err", err) 55 } 56 57 case *bsky.GraphBlock: 58 if r == nil { 59 // someone we don't know about 60 return nil 61 } 62 log.Debug(ctx, "creating block", "userDID", userDID, "subjectDID", rec.Subject) 63 block := &model.Block{ 64 RKey: rkey.String(), 65 RepoDID: userDID, 66 SubjectDID: rec.Subject, 67 Record: *recCBOR, 68 CID: cid, 69 } 70 err := atsync.Model.CreateBlock(ctx, block) 71 if err != nil { 72 return fmt.Errorf("failed to create block: %w", err) 73 } 74 block, err = atsync.Model.GetBlock(ctx, rkey.String()) 75 if err != nil { 76 return fmt.Errorf("failed to get block after we just saved it?!: %w", err) 77 } 78 streamplaceBlock, err := block.ToStreamplaceBlock() 79 if err != nil { 80 return fmt.Errorf("failed to convert block to streamplace block: %w", err) 81 } 82 go atsync.Bus.Publish(userDID, streamplaceBlock) 83 84 case *streamplace.ChatMessage: 85 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 86 if err != nil { 87 return fmt.Errorf("failed to sync bluesky repo: %w", err) 88 } 89 90 go func() { 91 _, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model) 92 if err != nil { 93 log.Error(ctx, "failed to sync bluesky repo", "err", err) 94 } 95 }() 96 97 log.Debug(ctx, "streamplace.ChatMessage detected", "message", rec.Text, "repo", repo.Handle) 98 block, err := atsync.Model.GetUserBlock(ctx, rec.Streamer, userDID) 99 if err != nil { 100 return fmt.Errorf("failed to get user block: %w", err) 101 } 102 if block != nil { 103 log.Debug(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", rec.Streamer) 104 return nil 105 } 106 mcm := &model.ChatMessage{ 107 CID: cid, 108 URI: aturi.String(), 109 CreatedAt: now, 110 ChatMessage: recCBOR, 111 RepoDID: userDID, 112 Repo: repo, 113 StreamerRepoDID: rec.Streamer, 114 IndexedAt: &now, 115 } 116 if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil { 117 mcm.ReplyToCID = &rec.Reply.Parent.Cid 118 } 119 err = atsync.Model.CreateChatMessage(ctx, mcm) 120 if err != nil { 121 log.Error(ctx, "failed to create chat message", "err", err) 122 } 123 mcm, err = atsync.Model.GetChatMessage(cid) 124 if err != nil { 125 log.Error(ctx, "failed to get just-saved chat message", "err", err) 126 } 127 if mcm == nil { 128 log.Error(ctx, "failed to retrieve just-saved chat message", "err", err) 129 return nil 130 } 131 scm, err := mcm.ToStreamplaceMessageView() 132 if err != nil { 133 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err) 134 } 135 go atsync.Bus.Publish(rec.Streamer, scm) 136 137 if !isUpdate && !isFirstSync { 138 139 task := &statedb.ChatTask{ 140 MessageView: scm, 141 } 142 143 _, err = atsync.StatefulDB.EnqueueTask(ctx, statedb.TaskChat, task, statedb.WithTaskKey(fmt.Sprintf("chat-message::%s", aturi.String()))) 144 if err != nil { 145 log.Error(ctx, "failed to enqueue notification task", "err", err) 146 } 147 } 148 149 case *streamplace.ChatGate: 150 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 151 if err != nil { 152 return fmt.Errorf("failed to sync bluesky repo: %w", err) 153 } 154 if r == nil { 155 // someone we don't know about 156 return nil 157 } 158 log.Debug(ctx, "creating gate", "userDID", userDID, "hiddenMessage", rec.HiddenMessage) 159 gate := &model.Gate{ 160 RKey: rkey.String(), 161 RepoDID: userDID, 162 HiddenMessage: rec.HiddenMessage, 163 CID: cid, 164 CreatedAt: now, 165 Repo: repo, 166 } 167 err = atsync.Model.CreateGate(ctx, gate) 168 if err != nil { 169 return fmt.Errorf("failed to create gate: %w", err) 170 } 171 gate, err = atsync.Model.GetGate(ctx, rkey.String()) 172 if err != nil { 173 return fmt.Errorf("failed to get gate after we just saved it?!: %w", err) 174 } 175 streamplaceGate, err := gate.ToStreamplaceGate() 176 if err != nil { 177 return fmt.Errorf("failed to convert gate to streamplace gate: %w", err) 178 } 179 go atsync.Bus.Publish(userDID, streamplaceGate) 180 181 case *streamplace.ChatProfile: 182 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 183 if err != nil { 184 return fmt.Errorf("failed to sync bluesky repo: %w", err) 185 } 186 mcm := &model.ChatProfile{ 187 RepoDID: userDID, 188 Repo: repo, 189 Record: recCBOR, 190 } 191 err = atsync.Model.CreateChatProfile(ctx, mcm) 192 if err != nil { 193 log.Error(ctx, "failed to create chat profile", "err", err) 194 } 195 196 case *streamplace.ServerSettings: 197 _, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 198 if err != nil { 199 return fmt.Errorf("failed to sync bluesky repo: %w", err) 200 } 201 settings := &model.ServerSettings{ 202 Server: rkey.String(), 203 RepoDID: userDID, 204 Record: recCBOR, 205 } 206 err = atsync.Model.UpdateServerSettings(ctx, settings) 207 if err != nil { 208 log.Error(ctx, "failed to create server settings", "err", err) 209 } 210 211 case *bsky.FeedPost: 212 // jsonData, err := json.Marshal(d) 213 // if err != nil { 214 // log.Error(ctx, "failed to marshal record data", "err", err) 215 // } else { 216 // log.Log(ctx, "record data", "json", string(jsonData)) 217 // } 218 219 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt) 220 if err != nil { 221 return fmt.Errorf("failed to parse createdAt: %w", err) 222 } 223 224 if livestream, ok := d["place.stream.livestream"]; ok { 225 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 226 if err != nil { 227 return fmt.Errorf("failed to sync bluesky repo: %w", err) 228 } 229 livestream, ok := livestream.(map[string]interface{}) 230 if !ok { 231 return fmt.Errorf("livestream is not a map") 232 } 233 url, ok := livestream["url"].(string) 234 if !ok { 235 return fmt.Errorf("livestream url is not a string") 236 } 237 log.Debug(ctx, "livestream url", "url", url) 238 if err := atsync.Model.CreateFeedPost(ctx, &model.FeedPost{ 239 CID: cid, 240 CreatedAt: createdAt, 241 FeedPost: recCBOR, 242 RepoDID: userDID, 243 Repo: repo, 244 Type: "livestream", 245 URI: aturi.String(), 246 IndexedAt: &now, 247 }); err != nil { 248 return fmt.Errorf("failed to create bluesky post: %w", err) 249 } 250 } else { 251 if rec.Reply == nil || rec.Reply.Root == nil { 252 return nil 253 } 254 livestream, err := atsync.Model.GetLivestreamByPostCID(rec.Reply.Root.Cid) 255 if err != nil { 256 return fmt.Errorf("failed to get livestream: %w", err) 257 } 258 if livestream == nil { 259 return nil 260 } 261 // log.Warn(ctx, "chat message detected", "uri", livestream.URI) 262 // if this post is a reply to someone's livestream post 263 // log.Warn(ctx, "chat message detected", "message", rec.Text) 264 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 265 if err != nil { 266 return fmt.Errorf("failed to sync bluesky repo: %w", err) 267 } 268 269 // log.Warn(ctx, "chat message detected", "message", rec.Text, "repo", repo.Handle) 270 block, err := atsync.Model.GetUserBlock(ctx, livestream.RepoDID, userDID) 271 if err != nil { 272 return fmt.Errorf("failed to get user block: %w", err) 273 } 274 if block != nil { 275 log.Warn(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", livestream.RepoDID) 276 return nil 277 } 278 // if fc.cli.PrintChat { 279 // fmt.Printf("@%s%s %s\n", blue.Sprintf(repo.Handle), green.Sprintf(":"), rec.Text) 280 // } 281 fp := &model.FeedPost{ 282 CID: cid, 283 CreatedAt: createdAt, 284 FeedPost: recCBOR, 285 RepoDID: userDID, 286 Type: "reply", 287 Repo: repo, 288 ReplyRootCID: &livestream.PostCID, 289 ReplyRootRepoDID: &livestream.RepoDID, 290 URI: aturi.String(), 291 IndexedAt: &now, 292 } 293 err = atsync.Model.CreateFeedPost(ctx, fp) 294 if err != nil { 295 log.Error(ctx, "failed to create feed post", "err", err) 296 } 297 postView, err := fp.ToBskyPostView() 298 if err != nil { 299 log.Error(ctx, "failed to convert feed post to bsky post view", "err", err) 300 } 301 go atsync.Bus.Publish(livestream.RepoDID, postView) 302 } 303 304 case *streamplace.Livestream: 305 if r == nil { 306 // we don't know about this repo 307 return nil 308 } 309 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt) 310 if err != nil { 311 log.Error(ctx, "failed to parse createdAt", "err", err) 312 return nil 313 } 314 ls := &model.Livestream{ 315 CID: cid, 316 URI: aturi.String(), 317 CreatedAt: createdAt, 318 Livestream: recCBOR, 319 RepoDID: userDID, 320 } 321 if rec.Post != nil { 322 ls.PostCID = rec.Post.Cid 323 ls.PostURI = rec.Post.Uri 324 } 325 err = atsync.Model.CreateLivestream(ctx, ls) 326 if err != nil { 327 return fmt.Errorf("failed to create livestream: %w", err) 328 } 329 lsHydrated, err := atsync.Model.GetLatestLivestreamForRepo(userDID) 330 if err != nil { 331 return fmt.Errorf("failed to get latest livestream for repo: %w", err) 332 } 333 lsv, err := lsHydrated.ToLivestreamView() 334 if err != nil { 335 return fmt.Errorf("failed to convert livestream to bsky post view: %w", err) 336 } 337 go atsync.Bus.Publish(userDID, lsv) 338 339 var postView *bsky.FeedDefs_PostView 340 if lsHydrated.Post != nil { 341 postView, err = lsHydrated.Post.ToBskyPostView() 342 if err != nil { 343 return fmt.Errorf("failed to convert livestream post to bsky post view: %w", err) 344 } 345 } 346 347 task := &statedb.NotificationTask{ 348 Livestream: lsv, 349 FeedPost: postView, 350 PDSURL: r.PDS, 351 } 352 353 cp, err := atsync.Model.GetChatProfile(ctx, userDID) 354 if err != nil { 355 return fmt.Errorf("failed to get chat profile: %w", err) 356 } 357 if cp != nil { 358 spcp, err := cp.ToStreamplaceChatProfile() 359 if err != nil { 360 return fmt.Errorf("failed to convert chat profile to streamplace chat profile: %w", err) 361 } 362 task.ChatProfile = spcp 363 } 364 365 if !isUpdate && !isFirstSync { 366 _, err = atsync.StatefulDB.EnqueueTask(ctx, statedb.TaskNotification, task, statedb.WithTaskKey(fmt.Sprintf("notification-blast::%s", aturi.String()))) 367 if err != nil { 368 log.Error(ctx, "failed to enqueue notification task", "err", err) 369 } 370 } 371 372 case *streamplace.Key: 373 log.Debug(ctx, "creating key", "key", rec) 374 time, err := aqtime.FromString(rec.CreatedAt) 375 if err != nil { 376 return fmt.Errorf("failed to parse createdAt: %w", err) 377 } 378 key := model.SigningKey{ 379 DID: rec.SigningKey, 380 RKey: rkey.String(), 381 CreatedAt: time.Time(), 382 RepoDID: userDID, 383 } 384 err = atsync.Model.UpdateSigningKey(&key) 385 if err != nil { 386 log.Error(ctx, "failed to create signing key", "err", err) 387 } 388 389 default: 390 log.Debug(ctx, "unhandled record type", "type", reflect.TypeOf(rec)) 391 } 392 return nil 393}