Live video on the AT Protocol
at eli/postgres 129 lines 4.3 kB view raw
1package model 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "time" 8 9 "github.com/bluesky-social/indigo/api/bsky" 10 lexutil "github.com/bluesky-social/indigo/lex/util" 11 "gorm.io/gorm" 12 "stream.place/streamplace/pkg/streamplace" 13) 14 15type Livestream struct { 16 CID string `json:"cid" gorm:"primaryKey;column:cid"` 17 URI string `json:"uri"` 18 CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;index:idx_repo_created,priority:2"` 19 Livestream *[]byte `json:"livestream"` 20 RepoDID string `json:"repoDID" gorm:"column:repo_did;index:idx_repo_created,priority:1"` 21 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"` 22 Post *FeedPost `json:"post,omitempty" gorm:"foreignKey:CID;references:PostCID"` 23 PostCID string `json:"postCID" gorm:"column:post_cid;index:idx_post_cid"` 24 PostURI string `json:"postURI" gorm:"column:post_uri"` 25} 26 27func (ls *Livestream) ToLivestreamView() (*streamplace.Livestream_LivestreamView, error) { 28 rec, err := lexutil.CborDecodeValue(*ls.Livestream) 29 if err != nil { 30 return nil, fmt.Errorf("error decoding feed post: %w", err) 31 } 32 postView := streamplace.Livestream_LivestreamView{ 33 LexiconTypeID: "place.stream.livestream#livestreamView", 34 Cid: ls.CID, 35 Uri: ls.URI, 36 Author: &bsky.ActorDefs_ProfileViewBasic{ 37 Did: ls.RepoDID, 38 Handle: ls.Repo.Handle, 39 }, 40 Record: &lexutil.LexiconTypeDecoder{Val: rec}, 41 IndexedAt: time.Now().Format(time.RFC3339), 42 } 43 return &postView, nil 44} 45 46func (m *DBModel) CreateLivestream(ctx context.Context, ls *Livestream) error { 47 return m.DB.Create(ls).Error 48} 49 50// GetLatestLivestreamForRepo returns the most recent livestream for a given repo DID 51func (m *DBModel) GetLatestLivestreamForRepo(repoDID string) (*Livestream, error) { 52 var livestream Livestream 53 err := m.DB. 54 Preload("Repo"). 55 Preload("Post"). 56 Where("repo_did = ?", repoDID). 57 Order("created_at DESC"). 58 First(&livestream).Error 59 if err != nil { 60 return nil, fmt.Errorf("error retrieving latest livestream: %w", err) 61 } 62 return &livestream, nil 63} 64 65func (m *DBModel) GetLivestreamByPostCID(postCID string) (*Livestream, error) { 66 var livestream Livestream 67 err := m.DB. 68 Preload("Repo"). 69 Preload("Post"). 70 Where("post_cid = ?", postCID). 71 First(&livestream).Error 72 if errors.Is(err, gorm.ErrRecordNotFound) { 73 return nil, nil 74 } 75 if err != nil { 76 return nil, fmt.Errorf("error retrieving livestream by postCID: %w", err) 77 } 78 return &livestream, nil 79} 80 81// GetLatestLivestreams returns the most recent livestreams, given a limit and a cursor 82// Only gets livestreams with a valid segment no less than 30 seconds old 83func (m *DBModel) GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error) { 84 var recentLivestreams []Livestream 85 thirtySecondsAgo := time.Now().Add(-30 * time.Second) 86 87 // get latest segment for the repo DID 88 latestRecentSegmentsSubQuery := m.DB.Table("segments"). 89 Select("repo_did, MAX(start_time) as latest_segment_start_time"). 90 Where("(repo_did, start_time) IN (?)", 91 m.DB.Table("segments"). 92 Select("repo_did, MAX(start_time)"). 93 Group("repo_did")). 94 Where("start_time > ?", thirtySecondsAgo.UTC()). 95 Group("repo_did") 96 97 rankedLivestreamsSubQuery := m.DB.Table("livestreams"). 98 Select("livestreams.*, ROW_NUMBER() OVER(PARTITION BY livestreams.repo_did ORDER BY livestreams.created_at DESC) as rn"). 99 Joins("JOIN repos ON livestreams.repo_did = repos.did") 100 101 mainQuery := m.DB.Table("(?) as ranked_livestreams", rankedLivestreamsSubQuery). 102 Joins("JOIN (?) as latest_segments ON ranked_livestreams.repo_did = latest_segments.repo_did", latestRecentSegmentsSubQuery). 103 Select("ranked_livestreams.*, latest_segments.latest_segment_start_time"). 104 Where("ranked_livestreams.rn = 1") 105 106 if before != nil { 107 mainQuery = mainQuery.Where("livestreams.created_at < ?", *before) 108 } 109 110 mainQuery = mainQuery.Order("ranked_livestreams.created_at DESC"). 111 Limit(limit). 112 Preload("Repo") 113 114 err := mainQuery.Find(&recentLivestreams).Error 115 116 if errors.Is(err, gorm.ErrRecordNotFound) { 117 return nil, nil 118 } 119 120 if err != nil { 121 return nil, fmt.Errorf("error fetching recent livestreams: %w", err) 122 } 123 124 if len(recentLivestreams) == 0 { 125 return nil, nil 126 } 127 128 return recentLivestreams, nil 129}