Live video on the AT Protocol
at eli/fix-gitlab 326 lines 10 kB view raw
1package atproto 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "reflect" 8 "time" 9 10 "github.com/bluesky-social/indigo/api/bsky" 11 "github.com/bluesky-social/indigo/atproto/data" 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "stream.place/streamplace/pkg/aqtime" 14 "stream.place/streamplace/pkg/log" 15 "stream.place/streamplace/pkg/model" 16 notificationpkg "stream.place/streamplace/pkg/notifications" 17 "stream.place/streamplace/pkg/streamplace" 18 19 lexutil "github.com/bluesky-social/indigo/lex/util" 20) 21 22func (atsync *ATProtoSynchronizer) handleCreateUpdate(ctx context.Context, userDID string, rkey syntax.RecordKey, recCBOR *[]byte, cid string, collection syntax.NSID, isUpdate bool) error { 23 ctx = log.WithLogValues(ctx, "func", "handleCreateUpdate", "userDID", userDID, "rkey", rkey.String(), "cid", cid, "collection", collection.String()) 24 now := time.Now() 25 r, err := atsync.Model.GetRepo(userDID) 26 if err != nil { 27 return fmt.Errorf("failed to get repo: %w", err) 28 } 29 maybeATURI := fmt.Sprintf("at://%s/%s/%s", userDID, collection.String(), rkey.String()) 30 aturi, err := syntax.ParseATURI(maybeATURI) 31 if err != nil { 32 return fmt.Errorf("failed to parse ATURI: %w", err) 33 } 34 d, err := data.UnmarshalCBOR(*recCBOR) 35 if err != nil { 36 return fmt.Errorf("failed to unmarhsal record CBOR: %w", err) 37 } 38 cb, err := lexutil.CborDecodeValue(*recCBOR) 39 if errors.Is(err, lexutil.ErrUnrecognizedType) { 40 log.Debug(ctx, "unrecognized record type", "key", rkey.String(), "type", err) 41 return nil 42 } else if err != nil { 43 return fmt.Errorf("failed to decode record CBOR: %w", err) 44 } 45 switch rec := cb.(type) { 46 case *bsky.GraphFollow: 47 if r == nil { 48 // someone we don't know about 49 return nil 50 } 51 log.Debug(ctx, "creating follow", "userDID", userDID, "subjectDID", rec.Subject) 52 err := atsync.Model.CreateFollow(ctx, userDID, rkey.String(), rec) 53 if err != nil { 54 log.Debug(ctx, "failed to create follow", "err", err) 55 } 56 57 case *bsky.GraphBlock: 58 if r == nil { 59 // someone we don't know about 60 return nil 61 } 62 log.Debug(ctx, "creating block", "userDID", userDID, "subjectDID", rec.Subject) 63 block := &model.Block{ 64 RKey: rkey.String(), 65 RepoDID: userDID, 66 SubjectDID: rec.Subject, 67 Record: *recCBOR, 68 CID: cid, 69 } 70 err := atsync.Model.CreateBlock(ctx, block) 71 if err != nil { 72 return fmt.Errorf("failed to create block: %w", err) 73 } 74 block, err = atsync.Model.GetBlock(ctx, rkey.String()) 75 if err != nil { 76 return fmt.Errorf("failed to get block after we just saved it?!: %w", err) 77 } 78 streamplaceBlock, err := block.ToStreamplaceBlock() 79 if err != nil { 80 return fmt.Errorf("failed to convert block to streamplace block: %w", err) 81 } 82 go atsync.Bus.Publish(userDID, streamplaceBlock) 83 84 case *streamplace.ChatMessage: 85 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 86 if err != nil { 87 return fmt.Errorf("failed to sync bluesky repo: %w", err) 88 } 89 streamerRepo, err := atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model) 90 if err != nil { 91 return fmt.Errorf("failed to sync bluesky repo: %w", err) 92 } 93 log.Debug(ctx, "streamplace.ChatMessage detected", "message", rec.Text, "repo", repo.Handle) 94 block, err := atsync.Model.GetUserBlock(ctx, streamerRepo.DID, userDID) 95 if err != nil { 96 return fmt.Errorf("failed to get user block: %w", err) 97 } 98 if block != nil { 99 log.Debug(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", streamerRepo.DID) 100 return nil 101 } 102 mcm := &model.ChatMessage{ 103 CID: cid, 104 URI: aturi.String(), 105 CreatedAt: now, 106 ChatMessage: recCBOR, 107 RepoDID: userDID, 108 Repo: repo, 109 StreamerRepoDID: streamerRepo.DID, 110 IndexedAt: &now, 111 } 112 if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil { 113 mcm.ReplyToCID = &rec.Reply.Parent.Cid 114 } 115 err = atsync.Model.CreateChatMessage(ctx, mcm) 116 if err != nil { 117 log.Error(ctx, "failed to create chat message", "err", err) 118 } 119 mcm, err = atsync.Model.GetChatMessage(cid) 120 if err != nil { 121 log.Error(ctx, "failed to get just-saved chat message", "err", err) 122 } 123 if mcm == nil { 124 log.Error(ctx, "failed to retrieve just-saved chat message", "err", err) 125 return nil 126 } 127 scm, err := mcm.ToStreamplaceMessageView() 128 if err != nil { 129 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err) 130 } 131 go atsync.Bus.Publish(streamerRepo.DID, scm) 132 133 case *streamplace.ChatProfile: 134 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 135 if err != nil { 136 return fmt.Errorf("failed to sync bluesky repo: %w", err) 137 } 138 mcm := &model.ChatProfile{ 139 RepoDID: userDID, 140 Repo: repo, 141 Record: recCBOR, 142 } 143 err = atsync.Model.CreateChatProfile(ctx, mcm) 144 if err != nil { 145 log.Error(ctx, "failed to create chat profile", "err", err) 146 } 147 148 case *bsky.FeedPost: 149 // jsonData, err := json.Marshal(d) 150 // if err != nil { 151 // log.Error(ctx, "failed to marshal record data", "err", err) 152 // } else { 153 // log.Log(ctx, "record data", "json", string(jsonData)) 154 // } 155 156 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt) 157 if err != nil { 158 return fmt.Errorf("failed to parse createdAt: %w", err) 159 } 160 161 if livestream, ok := d["place.stream.livestream"]; ok { 162 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 163 if err != nil { 164 return fmt.Errorf("failed to sync bluesky repo: %w", err) 165 } 166 livestream, ok := livestream.(map[string]interface{}) 167 if !ok { 168 return fmt.Errorf("livestream is not a map") 169 } 170 url, ok := livestream["url"].(string) 171 if !ok { 172 return fmt.Errorf("livestream url is not a string") 173 } 174 log.Debug(ctx, "livestream url", "url", url) 175 if err := atsync.Model.CreateFeedPost(ctx, &model.FeedPost{ 176 CID: cid, 177 CreatedAt: createdAt, 178 FeedPost: recCBOR, 179 RepoDID: userDID, 180 Repo: repo, 181 Type: "livestream", 182 URI: aturi.String(), 183 IndexedAt: &now, 184 }); err != nil { 185 return fmt.Errorf("failed to create bluesky post: %w", err) 186 } 187 } else { 188 if rec.Reply == nil || rec.Reply.Root == nil { 189 return nil 190 } 191 livestream, err := atsync.Model.GetLivestreamByPostCID(rec.Reply.Root.Cid) 192 if err != nil { 193 return fmt.Errorf("failed to get livestream: %w", err) 194 } 195 if livestream == nil { 196 return nil 197 } 198 // log.Warn(ctx, "chat message detected", "uri", livestream.URI) 199 // if this post is a reply to someone's livestream post 200 // log.Warn(ctx, "chat message detected", "message", rec.Text) 201 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 202 if err != nil { 203 return fmt.Errorf("failed to sync bluesky repo: %w", err) 204 } 205 206 // log.Warn(ctx, "chat message detected", "message", rec.Text, "repo", repo.Handle) 207 block, err := atsync.Model.GetUserBlock(ctx, livestream.RepoDID, userDID) 208 if err != nil { 209 return fmt.Errorf("failed to get user block: %w", err) 210 } 211 if block != nil { 212 log.Warn(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", livestream.RepoDID) 213 return nil 214 } 215 // if fc.cli.PrintChat { 216 // fmt.Printf("@%s%s %s\n", blue.Sprintf(repo.Handle), green.Sprintf(":"), rec.Text) 217 // } 218 fp := &model.FeedPost{ 219 CID: cid, 220 CreatedAt: createdAt, 221 FeedPost: recCBOR, 222 RepoDID: userDID, 223 Type: "reply", 224 Repo: repo, 225 ReplyRootCID: &livestream.PostCID, 226 ReplyRootRepoDID: &livestream.RepoDID, 227 URI: aturi.String(), 228 IndexedAt: &now, 229 } 230 err = atsync.Model.CreateFeedPost(ctx, fp) 231 if err != nil { 232 log.Error(ctx, "failed to create feed post", "err", err) 233 } 234 postView, err := fp.ToBskyPostView() 235 if err != nil { 236 log.Error(ctx, "failed to convert feed post to bsky post view", "err", err) 237 } 238 go atsync.Bus.Publish(livestream.RepoDID, postView) 239 } 240 241 case *streamplace.Livestream: 242 var u string 243 if rec.Url != nil { 244 u = *rec.Url 245 } 246 if r == nil { 247 // we don't know about this repo 248 return nil 249 } 250 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt) 251 if err != nil { 252 log.Error(ctx, "failed to parse createdAt", "err", err) 253 return nil 254 } 255 ls := &model.Livestream{ 256 CID: cid, 257 URI: aturi.String(), 258 CreatedAt: createdAt, 259 Livestream: recCBOR, 260 RepoDID: userDID, 261 } 262 if rec.Post != nil { 263 ls.PostCID = rec.Post.Cid 264 ls.PostURI = rec.Post.Uri 265 } 266 err = atsync.Model.CreateLivestream(ctx, ls) 267 if err != nil { 268 return fmt.Errorf("failed to create livestream: %w", err) 269 } 270 lsHydrated, err := atsync.Model.GetLatestLivestreamForRepo(userDID) 271 if err != nil { 272 return fmt.Errorf("failed to get latest livestream for repo: %w", err) 273 } 274 lsv, err := lsHydrated.ToLivestreamView() 275 if err != nil { 276 return fmt.Errorf("failed to convert livestream to bsky post view: %w", err) 277 } 278 go atsync.Bus.Publish(userDID, lsv) 279 280 if !isUpdate { 281 log.Warn(ctx, "Livestream detected! Blasting followers!", "title", rec.Title, "url", u, "createdAt", rec.CreatedAt, "repo", userDID) 282 notifications, err := atsync.Model.GetFollowersNotificationTokens(userDID) 283 if err != nil { 284 return err 285 } 286 287 nb := &notificationpkg.NotificationBlast{ 288 Title: fmt.Sprintf("🔴 @%s is LIVE!", r.Handle), 289 Body: rec.Title, 290 Data: map[string]string{ 291 "path": fmt.Sprintf("/%s", r.Handle), 292 }, 293 } 294 if atsync.Noter != nil { 295 err := atsync.Noter.Blast(ctx, notifications, nb) 296 if err != nil { 297 log.Error(ctx, "failed to blast notifications", "err", err) 298 } else { 299 log.Log(ctx, "sent notifications", "user", userDID, "count", len(notifications), "content", nb) 300 } 301 } else { 302 log.Log(ctx, "no notifier configured, skipping notifications", "user", userDID, "count", len(notifications), "content", nb) 303 } 304 } 305 306 case *streamplace.Key: 307 log.Debug(ctx, "creating key", "key", rec) 308 time, err := aqtime.FromString(rec.CreatedAt) 309 if err != nil { 310 return fmt.Errorf("failed to parse createdAt: %w", err) 311 } 312 key := model.SigningKey{ 313 DID: rec.SigningKey, 314 CreatedAt: time.Time(), 315 RepoDID: userDID, 316 } 317 err = atsync.Model.UpdateSigningKey(&key) 318 if err != nil { 319 log.Error(ctx, "failed to create signing key", "err", err) 320 } 321 322 default: 323 log.Debug(ctx, "unhandled record type", "type", reflect.TypeOf(rec)) 324 } 325 return nil 326}