Live video on the AT Protocol
at eli/node-22 322 lines 10 kB view raw
1package atproto 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "reflect" 8 "time" 9 10 "github.com/bluesky-social/indigo/api/bsky" 11 "github.com/bluesky-social/indigo/atproto/data" 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "stream.place/streamplace/pkg/aqtime" 14 "stream.place/streamplace/pkg/log" 15 "stream.place/streamplace/pkg/model" 16 notificationpkg "stream.place/streamplace/pkg/notifications" 17 "stream.place/streamplace/pkg/streamplace" 18 19 lexutil "github.com/bluesky-social/indigo/lex/util" 20) 21 22func (atsync *ATProtoSynchronizer) handleCreateUpdate(ctx context.Context, userDID string, rkey syntax.RecordKey, recCBOR *[]byte, cid string, collection syntax.NSID) error { 23 ctx = log.WithLogValues(ctx, "func", "handleCreateUpdate", "userDID", userDID, "rkey", rkey.String(), "cid", cid, "collection", collection.String()) 24 now := time.Now() 25 r, err := atsync.Model.GetRepo(userDID) 26 if err != nil { 27 return fmt.Errorf("failed to get repo: %w", err) 28 } 29 maybeATURI := fmt.Sprintf("at://%s/%s/%s", userDID, collection.String(), rkey.String()) 30 aturi, err := syntax.ParseATURI(maybeATURI) 31 if err != nil { 32 return fmt.Errorf("failed to parse ATURI: %w", err) 33 } 34 d, err := data.UnmarshalCBOR(*recCBOR) 35 if err != nil { 36 return fmt.Errorf("failed to unmarhsal record CBOR: %w", err) 37 } 38 cb, err := lexutil.CborDecodeValue(*recCBOR) 39 if errors.Is(err, lexutil.ErrUnrecognizedType) { 40 log.Debug(ctx, "unrecognized record type", "key", rkey.String(), "type", err) 41 return nil 42 } else if err != nil { 43 return fmt.Errorf("failed to decode record CBOR: %w", err) 44 } 45 switch rec := cb.(type) { 46 case *bsky.GraphFollow: 47 if r == nil { 48 // someone we don't know about 49 return nil 50 } 51 log.Debug(ctx, "creating follow", "userDID", userDID, "subjectDID", rec.Subject) 52 err := atsync.Model.CreateFollow(ctx, userDID, rkey.String(), rec) 53 if err != nil { 54 log.Debug(ctx, "failed to create follow", "err", err) 55 } 56 57 case *bsky.GraphBlock: 58 if r == nil { 59 // someone we don't know about 60 return nil 61 } 62 log.Debug(ctx, "creating block", "userDID", userDID, "subjectDID", rec.Subject) 63 block := &model.Block{ 64 RKey: rkey.String(), 65 RepoDID: userDID, 66 SubjectDID: rec.Subject, 67 Record: *recCBOR, 68 CID: cid, 69 } 70 err := atsync.Model.CreateBlock(ctx, block) 71 if err != nil { 72 return fmt.Errorf("failed to create block: %w", err) 73 } 74 block, err = atsync.Model.GetBlock(ctx, rkey.String()) 75 if err != nil { 76 return fmt.Errorf("failed to get block after we just saved it?!: %w", err) 77 } 78 streamplaceBlock, err := block.ToStreamplaceBlock() 79 if err != nil { 80 return fmt.Errorf("failed to convert block to streamplace block: %w", err) 81 } 82 go atsync.Bus.Publish(userDID, streamplaceBlock) 83 84 case *streamplace.ChatMessage: 85 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 86 if err != nil { 87 return fmt.Errorf("failed to sync bluesky repo: %w", err) 88 } 89 streamerRepo, err := atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model) 90 if err != nil { 91 return fmt.Errorf("failed to sync bluesky repo: %w", err) 92 } 93 log.Debug(ctx, "streamplace.ChatMessage detected", "message", rec.Text, "repo", repo.Handle) 94 block, err := atsync.Model.GetUserBlock(ctx, streamerRepo.DID, userDID) 95 if err != nil { 96 return fmt.Errorf("failed to get user block: %w", err) 97 } 98 if block != nil { 99 log.Debug(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", streamerRepo.DID) 100 return nil 101 } 102 mcm := &model.ChatMessage{ 103 CID: cid, 104 URI: aturi.String(), 105 CreatedAt: now, 106 ChatMessage: recCBOR, 107 RepoDID: userDID, 108 Repo: repo, 109 StreamerRepoDID: streamerRepo.DID, 110 IndexedAt: &now, 111 } 112 if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil { 113 mcm.ReplyToCID = &rec.Reply.Parent.Cid 114 } 115 err = atsync.Model.CreateChatMessage(ctx, mcm) 116 if err != nil { 117 log.Error(ctx, "failed to create chat message", "err", err) 118 } 119 mcm, err = atsync.Model.GetChatMessage(cid) 120 if err != nil { 121 log.Error(ctx, "failed to get just-saved chat message", "err", err) 122 } 123 if mcm == nil { 124 log.Error(ctx, "failed to retrieve just-saved chat message", "err", err) 125 return nil 126 } 127 scm, err := mcm.ToStreamplaceMessageView() 128 if err != nil { 129 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err) 130 } 131 go atsync.Bus.Publish(streamerRepo.DID, scm) 132 133 case *streamplace.ChatProfile: 134 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 135 if err != nil { 136 return fmt.Errorf("failed to sync bluesky repo: %w", err) 137 } 138 mcm := &model.ChatProfile{ 139 RepoDID: userDID, 140 Repo: repo, 141 Record: recCBOR, 142 } 143 err = atsync.Model.CreateChatProfile(ctx, mcm) 144 if err != nil { 145 log.Error(ctx, "failed to create chat profile", "err", err) 146 } 147 148 case *bsky.FeedPost: 149 // jsonData, err := json.Marshal(d) 150 // if err != nil { 151 // log.Error(ctx, "failed to marshal record data", "err", err) 152 // } else { 153 // log.Log(ctx, "record data", "json", string(jsonData)) 154 // } 155 156 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt) 157 if err != nil { 158 return fmt.Errorf("failed to parse createdAt: %w", err) 159 } 160 161 if livestream, ok := d["place.stream.livestream"]; ok { 162 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 163 if err != nil { 164 return fmt.Errorf("failed to sync bluesky repo: %w", err) 165 } 166 livestream, ok := livestream.(map[string]interface{}) 167 if !ok { 168 return fmt.Errorf("livestream is not a map") 169 } 170 url, ok := livestream["url"].(string) 171 if !ok { 172 return fmt.Errorf("livestream url is not a string") 173 } 174 log.Debug(ctx, "livestream url", "url", url) 175 atsync.Model.CreateFeedPost(ctx, &model.FeedPost{ 176 CID: cid, 177 CreatedAt: createdAt, 178 FeedPost: recCBOR, 179 RepoDID: userDID, 180 Repo: repo, 181 Type: "livestream", 182 URI: aturi.String(), 183 IndexedAt: &now, 184 }) 185 } else { 186 if rec.Reply == nil || rec.Reply.Root == nil { 187 return nil 188 } 189 livestream, err := atsync.Model.GetLivestreamByPostCID(rec.Reply.Root.Cid) 190 if err != nil { 191 return fmt.Errorf("failed to get livestream: %w", err) 192 } 193 if livestream == nil { 194 return nil 195 } 196 // log.Warn(ctx, "chat message detected", "uri", livestream.URI) 197 // if this post is a reply to someone's livestream post 198 // log.Warn(ctx, "chat message detected", "message", rec.Text) 199 repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 200 if err != nil { 201 return fmt.Errorf("failed to sync bluesky repo: %w", err) 202 } 203 204 // log.Warn(ctx, "chat message detected", "message", rec.Text, "repo", repo.Handle) 205 block, err := atsync.Model.GetUserBlock(ctx, livestream.RepoDID, userDID) 206 if err != nil { 207 return fmt.Errorf("failed to get user block: %w", err) 208 } 209 if block != nil { 210 log.Warn(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", livestream.RepoDID) 211 return nil 212 } 213 // if fc.cli.PrintChat { 214 // fmt.Printf("@%s%s %s\n", blue.Sprintf(repo.Handle), green.Sprintf(":"), rec.Text) 215 // } 216 fp := &model.FeedPost{ 217 CID: cid, 218 CreatedAt: createdAt, 219 FeedPost: recCBOR, 220 RepoDID: userDID, 221 Type: "reply", 222 Repo: repo, 223 ReplyRootCID: &livestream.PostCID, 224 ReplyRootRepoDID: &livestream.RepoDID, 225 URI: aturi.String(), 226 IndexedAt: &now, 227 } 228 err = atsync.Model.CreateFeedPost(ctx, fp) 229 if err != nil { 230 log.Error(ctx, "failed to create feed post", "err", err) 231 } 232 postView, err := fp.ToBskyPostView() 233 if err != nil { 234 log.Error(ctx, "failed to convert feed post to bsky post view", "err", err) 235 } 236 go atsync.Bus.Publish(livestream.RepoDID, postView) 237 } 238 239 case *streamplace.Livestream: 240 var u string 241 if rec.Url != nil { 242 u = *rec.Url 243 } 244 if r == nil { 245 // we don't know about this repo 246 return nil 247 } 248 createdAt, err := time.Parse(time.RFC3339, rec.CreatedAt) 249 if err != nil { 250 log.Error(ctx, "failed to parse createdAt", "err", err) 251 return nil 252 } 253 ls := &model.Livestream{ 254 CID: cid, 255 URI: aturi.String(), 256 CreatedAt: createdAt, 257 Livestream: recCBOR, 258 RepoDID: userDID, 259 } 260 if rec.Post != nil { 261 ls.PostCID = rec.Post.Cid 262 ls.PostURI = rec.Post.Uri 263 } 264 err = atsync.Model.CreateLivestream(ctx, ls) 265 if err != nil { 266 return fmt.Errorf("failed to create livestream: %w", err) 267 } 268 lsHydrated, err := atsync.Model.GetLatestLivestreamForRepo(userDID) 269 if err != nil { 270 return fmt.Errorf("failed to get latest livestream for repo: %w", err) 271 } 272 lsv, err := lsHydrated.ToLivestreamView() 273 if err != nil { 274 return fmt.Errorf("failed to convert livestream to bsky post view: %w", err) 275 } 276 go atsync.Bus.Publish(userDID, lsv) 277 278 log.Warn(ctx, "Livestream detected! Blasting followers!", "title", rec.Title, "url", u, "createdAt", rec.CreatedAt, "repo", userDID) 279 notifications, err := atsync.Model.GetFollowersNotificationTokens(userDID) 280 if err != nil { 281 return err 282 } 283 284 nb := &notificationpkg.NotificationBlast{ 285 Title: fmt.Sprintf("🔴 @%s is LIVE!", r.Handle), 286 Body: rec.Title, 287 Data: map[string]string{ 288 "path": fmt.Sprintf("/%s", r.Handle), 289 }, 290 } 291 if atsync.Noter != nil { 292 err := atsync.Noter.Blast(ctx, notifications, nb) 293 if err != nil { 294 log.Error(ctx, "failed to blast notifications", "err", err) 295 } else { 296 log.Log(ctx, "sent notifications", "user", userDID, "count", len(notifications), "content", nb) 297 } 298 } else { 299 log.Log(ctx, "no notifier configured, skipping notifications", "user", userDID, "count", len(notifications), "content", nb) 300 } 301 302 case *streamplace.Key: 303 log.Debug(ctx, "creating key", "key", rec) 304 time, err := aqtime.FromString(rec.CreatedAt) 305 if err != nil { 306 return fmt.Errorf("failed to parse createdAt: %w", err) 307 } 308 key := model.SigningKey{ 309 DID: rec.SigningKey, 310 CreatedAt: time.Time(), 311 RepoDID: userDID, 312 } 313 err = atsync.Model.UpdateSigningKey(&key) 314 if err != nil { 315 log.Error(ctx, "failed to create signing key", "err", err) 316 } 317 318 default: 319 log.Debug(ctx, "unhandled record type", "type", reflect.TypeOf(rec)) 320 } 321 return nil 322}