Live video on the AT Protocol
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/optional-convergence 134 lines 4.6 kB view raw
1package model 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "time" 8 9 "github.com/bluesky-social/indigo/api/bsky" 10 lexutil "github.com/bluesky-social/indigo/lex/util" 11 "gorm.io/gorm" 12 "gorm.io/gorm/clause" 13 "stream.place/streamplace/pkg/streamplace" 14) 15 16type Livestream struct { 17 URI string `json:"uri" gorm:"primaryKey;column:uri"` 18 CID string `json:"cid" gorm:"column:cid"` 19 CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;index:idx_repo_created,priority:2"` 20 Livestream *[]byte `json:"livestream"` 21 RepoDID string `json:"repoDID" gorm:"column:repo_did;index:idx_repo_created,priority:1"` 22 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"` 23 Post *FeedPost `json:"post,omitempty" gorm:"foreignKey:CID;references:PostCID"` 24 PostCID string `json:"postCID" gorm:"column:post_cid"` 25 PostURI string `json:"postURI" gorm:"column:post_uri;index:idx_post_uri"` 26} 27 28func (ls *Livestream) ToLivestreamView() (*streamplace.Livestream_LivestreamView, error) { 29 rec, err := lexutil.CborDecodeValue(*ls.Livestream) 30 if err != nil { 31 return nil, fmt.Errorf("error decoding feed post: %w", err) 32 } 33 postView := streamplace.Livestream_LivestreamView{ 34 LexiconTypeID: "place.stream.livestream#livestreamView", 35 Cid: ls.CID, 36 Uri: ls.URI, 37 Author: &bsky.ActorDefs_ProfileViewBasic{ 38 Did: ls.RepoDID, 39 Handle: ls.Repo.Handle, 40 }, 41 Record: &lexutil.LexiconTypeDecoder{Val: rec}, 42 IndexedAt: time.Now().Format(time.RFC3339), 43 } 44 return &postView, nil 45} 46 47func (m *DBModel) CreateLivestream(ctx context.Context, ls *Livestream) error { 48 // upsert livestream record, actually 49 return m.DB.Clauses(clause.OnConflict{ 50 Columns: []clause.Column{{Name: "uri"}}, 51 DoUpdates: clause.AssignmentColumns([]string{"cid", "created_at", "livestream", "repo_did", "post_cid", "post_uri"}), 52 }).Create(ls).Error 53} 54 55// GetLatestLivestreamForRepo returns the most recent livestream for a given repo DID 56func (m *DBModel) GetLatestLivestreamForRepo(repoDID string) (*Livestream, error) { 57 var livestream Livestream 58 err := m.DB. 59 Preload("Repo"). 60 Preload("Post"). 61 Where("repo_did = ?", repoDID). 62 Order("created_at DESC"). 63 First(&livestream).Error 64 if err != nil { 65 return nil, fmt.Errorf("error retrieving latest livestream: %w", err) 66 } 67 return &livestream, nil 68} 69 70func (m *DBModel) GetLivestreamByPostURI(postURI string) (*Livestream, error) { 71 var livestream Livestream 72 err := m.DB. 73 Preload("Repo"). 74 Preload("Post"). 75 Where("post_uri = ?", postURI). 76 First(&livestream).Error 77 if errors.Is(err, gorm.ErrRecordNotFound) { 78 return nil, nil 79 } 80 if err != nil { 81 return nil, fmt.Errorf("error retrieving livestream by postURI: %w", err) 82 } 83 return &livestream, nil 84} 85 86// GetLatestLivestreams returns the most recent livestreams, given a limit and a cursor 87// Only gets livestreams with a valid segment no less than 30 seconds old 88func (m *DBModel) GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error) { 89 var recentLivestreams []Livestream 90 thirtySecondsAgo := time.Now().Add(-30 * time.Second) 91 92 // get latest segment for the repo DID 93 latestRecentSegmentsSubQuery := m.DB.Table("segments"). 94 Select("repo_did, MAX(start_time) as latest_segment_start_time"). 95 Where("(repo_did, start_time) IN (?)", 96 m.DB.Table("segments"). 97 Select("repo_did, MAX(start_time)"). 98 Group("repo_did")). 99 Where("start_time > ?", thirtySecondsAgo.UTC()). 100 Group("repo_did") 101 102 rankedLivestreamsSubQuery := m.DB.Table("livestreams"). 103 Select("livestreams.*, ROW_NUMBER() OVER(PARTITION BY livestreams.repo_did ORDER BY livestreams.created_at DESC) as rn"). 104 Joins("JOIN repos ON livestreams.repo_did = repos.did") 105 106 mainQuery := m.DB.Table("(?) as ranked_livestreams", rankedLivestreamsSubQuery). 107 Joins("JOIN (?) as latest_segments ON ranked_livestreams.repo_did = latest_segments.repo_did", latestRecentSegmentsSubQuery). 108 Select("ranked_livestreams.*, latest_segments.latest_segment_start_time"). 109 Where("ranked_livestreams.rn = 1") 110 111 if before != nil { 112 mainQuery = mainQuery.Where("livestreams.created_at < ?", *before) 113 } 114 115 mainQuery = mainQuery.Order("ranked_livestreams.created_at DESC"). 116 Limit(limit). 117 Preload("Repo") 118 119 err := mainQuery.Find(&recentLivestreams).Error 120 121 if errors.Is(err, gorm.ErrRecordNotFound) { 122 return nil, nil 123 } 124 125 if err != nil { 126 return nil, fmt.Errorf("error fetching recent livestreams: %w", err) 127 } 128 129 if len(recentLivestreams) == 0 { 130 return nil, nil 131 } 132 133 return recentLivestreams, nil 134}