Live video on the AT Protocol
at issue-784 96 lines 3.2 kB view raw
1package model 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "time" 8 9 "github.com/bluesky-social/indigo/api/bsky" 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 lexutil "github.com/bluesky-social/indigo/lex/util" 12 "stream.place/streamplace/pkg/aqtime" 13 "stream.place/streamplace/pkg/spid" 14 "stream.place/streamplace/pkg/streamplace" 15) 16 17type BroadcastOrigin struct { 18 URI string `gorm:"primaryKey;column:uri"` 19 CID string `gorm:"column:cid"` 20 RepoDID string `gorm:"column:repo_did"` 21 Repo *Repo `gorm:"foreignKey:DID;references:RepoDID"` 22 StreamerRepoDID string `gorm:"column:streamer_repo_did;index:idx_streamer_repo_did_indexed_at,priority:1"` 23 StreamerRepo *Repo `gorm:"foreignKey:DID;references:StreamerRepoDID"` 24 ServerRepoDID string `gorm:"column:server_repo_did;index:idx_server_repo_did_indexed_at,priority:1"` 25 IndexedAt time.Time `gorm:"column:indexed_at;index:idx_streamer_repo_did_indexed_at,priority:2;index:idx_server_repo_did_indexed_at,priority:2"` 26 Record []byte `gorm:"column:record"` 27} 28 29func (bo *BroadcastOrigin) TableName() string { 30 return "broadcast_origins" 31} 32 33func (bo *BroadcastOrigin) ToBroadcastOriginView() (*streamplace.BroadcastDefs_BroadcastOriginView, error) { 34 rec, err := lexutil.CborDecodeValue(bo.Record) 35 if err != nil { 36 return nil, fmt.Errorf("error decoding broadcast origin: %w", err) 37 } 38 return &streamplace.BroadcastDefs_BroadcastOriginView{ 39 Author: &bsky.ActorDefs_ProfileViewBasic{ 40 Did: bo.StreamerRepoDID, 41 }, 42 Cid: bo.CID, 43 Record: &lexutil.LexiconTypeDecoder{Val: rec}, 44 Uri: bo.URI, 45 }, nil 46} 47 48func (m *DBModel) UpdateBroadcastOrigin(ctx context.Context, origin *streamplace.BroadcastOrigin, aturi syntax.ATURI) error { 49 repoDID := aturi.Authority().String() 50 cid, err := spid.GetCID(origin) 51 if err != nil { 52 return fmt.Errorf("failed to get CID: %w", err) 53 } 54 validATURI := fmt.Sprintf("at://%s/place.stream.broadcast.origin/%s::%s", repoDID, origin.Streamer, origin.Server) 55 if validATURI != aturi.String() { 56 return fmt.Errorf("invalid ATURI: %s != %s", validATURI, aturi.String()) 57 } 58 buf := bytes.Buffer{} 59 err = origin.MarshalCBOR(&buf) 60 if err != nil { 61 return fmt.Errorf("failed to marshal origin: %w", err) 62 } 63 aqt := aqtime.FromTime(time.Now().UTC()) 64 65 bo := &BroadcastOrigin{ 66 URI: validATURI, 67 CID: cid.String(), 68 StreamerRepoDID: origin.Streamer, 69 ServerRepoDID: origin.Server, 70 IndexedAt: aqt.Time().UTC(), 71 Record: buf.Bytes(), 72 } 73 return m.DB.Save(bo).Error 74} 75 76func (m *DBModel) GetRecentBroadcastOrigins(ctx context.Context) ([]*streamplace.BroadcastDefs_BroadcastOriginView, error) { 77 now := time.Now() 78 oneMinuteAgo := now.Add(-1 * time.Minute) 79 80 var origins []*BroadcastOrigin 81 err := m.DB. 82 Where("indexed_at >= ?", oneMinuteAgo.UTC()). 83 Find(&origins).Error 84 if err != nil { 85 return nil, err 86 } 87 views := make([]*streamplace.BroadcastDefs_BroadcastOriginView, len(origins)) 88 for i, o := range origins { 89 view, err := o.ToBroadcastOriginView() 90 if err != nil { 91 return nil, err 92 } 93 views[i] = view 94 } 95 return views, nil 96}