Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.8.13 96 lines 3.2 kB view raw
1package model 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "time" 8 9 "github.com/bluesky-social/indigo/api/bsky" 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 lexutil "github.com/bluesky-social/indigo/lex/util" 12 "stream.place/streamplace/pkg/aqtime" 13 "stream.place/streamplace/pkg/spid" 14 "stream.place/streamplace/pkg/streamplace" 15) 16 17type BroadcastOrigin struct { 18 URI string `gorm:"primaryKey;column:uri"` 19 CID string `gorm:"column:cid"` 20 RepoDID string `gorm:"column:repo_did"` 21 Repo *Repo `gorm:"foreignKey:DID;references:RepoDID"` 22 StreamerRepoDID string `gorm:"column:streamer_repo_did;index:idx_streamer_repo_did_indexed_at,priority:1"` 23 StreamerRepo *Repo `gorm:"foreignKey:DID;references:StreamerRepoDID"` 24 ServerRepoDID string `gorm:"column:server_repo_did;index:idx_server_repo_did_indexed_at,priority:1"` 25 IndexedAt time.Time `gorm:"column:indexed_at;index:idx_streamer_repo_did_indexed_at,priority:2;index:idx_server_repo_did_indexed_at,priority:2"` 26 Record []byte `gorm:"column:record"` 27} 28 29func (bo *BroadcastOrigin) TableName() string { 30 return "broadcast_origins" 31} 32 33func (bo *BroadcastOrigin) ToBroadcastOriginView() (*streamplace.BroadcastDefs_BroadcastOriginView, error) { 34 rec, err := lexutil.CborDecodeValue(bo.Record) 35 if err != nil { 36 return nil, fmt.Errorf("error decoding broadcast origin: %w", err) 37 } 38 return &streamplace.BroadcastDefs_BroadcastOriginView{ 39 Author: &bsky.ActorDefs_ProfileViewBasic{ 40 Did: bo.StreamerRepoDID, 41 }, 42 Cid: bo.CID, 43 Record: &lexutil.LexiconTypeDecoder{Val: rec}, 44 Uri: bo.URI, 45 }, nil 46} 47 48func (m *DBModel) UpdateBroadcastOrigin(ctx context.Context, origin *streamplace.BroadcastOrigin, aturi syntax.ATURI) error { 49 repoDID := aturi.Authority().String() 50 cid, err := spid.GetCID(origin) 51 if err != nil { 52 return fmt.Errorf("failed to get CID: %w", err) 53 } 54 validATURI := fmt.Sprintf("at://%s/place.stream.broadcast.origin/%s::%s", repoDID, origin.Streamer, origin.Server) 55 if validATURI != aturi.String() { 56 return fmt.Errorf("invalid ATURI: %s != %s", validATURI, aturi.String()) 57 } 58 buf := bytes.Buffer{} 59 err = origin.MarshalCBOR(&buf) 60 if err != nil { 61 return fmt.Errorf("failed to marshal origin: %w", err) 62 } 63 aqt := aqtime.FromTime(time.Now().UTC()) 64 65 bo := &BroadcastOrigin{ 66 URI: validATURI, 67 CID: cid.String(), 68 StreamerRepoDID: origin.Streamer, 69 ServerRepoDID: origin.Server, 70 IndexedAt: aqt.Time().UTC(), 71 Record: buf.Bytes(), 72 } 73 return m.DB.Save(bo).Error 74} 75 76func (m *DBModel) GetRecentBroadcastOrigins(ctx context.Context) ([]*streamplace.BroadcastDefs_BroadcastOriginView, error) { 77 now := time.Now() 78 oneMinuteAgo := now.Add(-1 * time.Minute) 79 80 var origins []*BroadcastOrigin 81 err := m.DB. 82 Where("indexed_at >= ?", oneMinuteAgo.UTC()). 83 Find(&origins).Error 84 if err != nil { 85 return nil, err 86 } 87 views := make([]*streamplace.BroadcastDefs_BroadcastOriginView, len(origins)) 88 for i, o := range origins { 89 view, err := o.ToBroadcastOriginView() 90 if err != nil { 91 return nil, err 92 } 93 views[i] = view 94 } 95 return views, nil 96}