Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/handle-old-auth 127 lines 4.2 kB view raw
1package model 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "time" 8 9 "github.com/bluesky-social/indigo/api/bsky" 10 lexutil "github.com/bluesky-social/indigo/lex/util" 11 "gorm.io/gorm" 12 "stream.place/streamplace/pkg/streamplace" 13) 14 15type Livestream struct { 16 CID string `json:"cid" gorm:"primaryKey;column:cid"` 17 URI string `json:"uri"` 18 CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;index:idx_repo_created,priority:2"` 19 Livestream *[]byte `json:"livestream"` 20 RepoDID string `json:"repoDID" gorm:"column:repo_did;index:idx_repo_created,priority:1"` 21 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"` 22 Post *FeedPost `json:"post,omitempty" gorm:"foreignKey:CID;references:PostCID"` 23 PostCID string `json:"postCID" gorm:"column:post_cid;index:idx_post_cid"` 24 PostURI string `json:"postURI" gorm:"column:post_uri"` 25} 26 27func (ls *Livestream) ToLivestreamView() (*streamplace.Livestream_LivestreamView, error) { 28 rec, err := lexutil.CborDecodeValue(*ls.Livestream) 29 if err != nil { 30 return nil, fmt.Errorf("error decoding feed post: %w", err) 31 } 32 postView := streamplace.Livestream_LivestreamView{ 33 LexiconTypeID: "place.stream.livestream#livestreamView", 34 Cid: ls.CID, 35 Uri: ls.URI, 36 Author: &bsky.ActorDefs_ProfileViewBasic{ 37 Did: ls.RepoDID, 38 Handle: ls.Repo.Handle, 39 }, 40 Record: &lexutil.LexiconTypeDecoder{Val: rec}, 41 IndexedAt: time.Now().Format(time.RFC3339), 42 } 43 return &postView, nil 44} 45 46func (m *DBModel) CreateLivestream(ctx context.Context, ls *Livestream) error { 47 return m.DB.Create(ls).Error 48} 49 50// GetLatestLivestreamForRepo returns the most recent livestream for a given repo DID 51func (m *DBModel) GetLatestLivestreamForRepo(repoDID string) (*Livestream, error) { 52 var livestream Livestream 53 err := m.DB. 54 Preload("Repo"). 55 Where("repo_did = ?", repoDID). 56 Order("created_at DESC"). 57 First(&livestream).Error 58 if err != nil { 59 return nil, fmt.Errorf("error retrieving latest livestream: %w", err) 60 } 61 return &livestream, nil 62} 63 64func (m *DBModel) GetLivestreamByPostCID(postCID string) (*Livestream, error) { 65 var livestream Livestream 66 err := m.DB. 67 Preload("Repo"). 68 Where("post_cid = ?", postCID). 69 First(&livestream).Error 70 if errors.Is(err, gorm.ErrRecordNotFound) { 71 return nil, nil 72 } 73 if err != nil { 74 return nil, fmt.Errorf("error retrieving livestream by postCID: %w", err) 75 } 76 return &livestream, nil 77} 78 79// GetLatestLivestreams returns the most recent livestreams, given a limit and a cursor 80// Only gets livestreams with a valid segment no less than 30 seconds old 81func (m *DBModel) GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error) { 82 var recentLivestreams []Livestream 83 thirtySecondsAgo := time.Now().Add(-30 * time.Second) 84 85 // get latest segment for the repo DID 86 latestRecentSegmentsSubQuery := m.DB.Table("segments"). 87 Select("repo_did, MAX(start_time) as latest_segment_start_time"). 88 Where("(repo_did, start_time) IN (?)", 89 m.DB.Table("segments"). 90 Select("repo_did, MAX(start_time)"). 91 Group("repo_did")). 92 Where("start_time > ?", thirtySecondsAgo.UTC()). 93 Group("repo_did") 94 95 rankedLivestreamsSubQuery := m.DB.Table("livestreams"). 96 Select("livestreams.*, ROW_NUMBER() OVER(PARTITION BY livestreams.repo_did ORDER BY livestreams.created_at DESC) as rn"). 97 Joins("JOIN repos ON livestreams.repo_did = repos.did") 98 99 mainQuery := m.DB.Table("(?) as ranked_livestreams", rankedLivestreamsSubQuery). 100 Joins("JOIN (?) as latest_segments ON ranked_livestreams.repo_did = latest_segments.repo_did", latestRecentSegmentsSubQuery). 101 Select("ranked_livestreams.*, latest_segments.latest_segment_start_time"). 102 Where("ranked_livestreams.rn = 1") 103 104 if before != nil { 105 mainQuery = mainQuery.Where("livestreams.created_at < ?", *before) 106 } 107 108 mainQuery = mainQuery.Order("ranked_livestreams.created_at DESC"). 109 Limit(limit). 110 Preload("Repo") 111 112 err := mainQuery.Find(&recentLivestreams).Error 113 114 if errors.Is(err, gorm.ErrRecordNotFound) { 115 return nil, nil 116 } 117 118 if err != nil { 119 return nil, fmt.Errorf("error fetching recent livestreams: %w", err) 120 } 121 122 if len(recentLivestreams) == 0 { 123 return nil, nil 124 } 125 126 return recentLivestreams, nil 127}