Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "time"
8
9 "github.com/bluesky-social/indigo/api/bsky"
10 lexutil "github.com/bluesky-social/indigo/lex/util"
11 "gorm.io/gorm"
12 "gorm.io/gorm/clause"
13 "stream.place/streamplace/pkg/streamplace"
14)
15
// Livestream is the database row for a single livestream record.
// Rows are keyed by URI; CreateLivestream upserts on that column.
type Livestream struct {
	// URI is the primary key of the row.
	URI string `json:"uri" gorm:"primaryKey;column:uri"`
	// CID is stored alongside the URI and copied into the view's Cid field.
	CID string `json:"cid" gorm:"column:cid"`
	// CreatedAt is the second column of the composite index idx_repo_created
	// and is the sort key used when looking up the latest livestream.
	CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;index:idx_repo_created,priority:2"`
	// Livestream holds the raw record bytes; ToLivestreamView decodes them as CBOR.
	// It is a pointer, so it may be nil.
	Livestream *[]byte `json:"livestream"`
	// RepoDID is the first column of the composite index idx_repo_created.
	RepoDID string `json:"repoDID" gorm:"column:repo_did;index:idx_repo_created,priority:1"`
	// Repo is the associated repo, populated only when loaded (e.g. Preload("Repo")).
	// NOTE(review): the tag names DID as foreignKey and RepoDID as references —
	// confirm this is the intended association direction.
	Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"`
	// Post is the associated feed post, populated only when loaded (e.g. Preload("Post")).
	Post *FeedPost `json:"post,omitempty" gorm:"foreignKey:CID;references:PostCID"`
	// PostCID is the column referenced by the Post association.
	PostCID string `json:"postCID" gorm:"column:post_cid"`
	// PostURI is indexed (idx_post_uri) and is the lookup key of GetLivestreamByPostURI.
	PostURI string `json:"postURI" gorm:"column:post_uri;index:idx_post_uri"`
}
27
28func (ls *Livestream) ToLivestreamView() (*streamplace.Livestream_LivestreamView, error) {
29 rec, err := lexutil.CborDecodeValue(*ls.Livestream)
30 if err != nil {
31 return nil, fmt.Errorf("error decoding feed post: %w", err)
32 }
33 postView := streamplace.Livestream_LivestreamView{
34 LexiconTypeID: "place.stream.livestream#livestreamView",
35 Cid: ls.CID,
36 Uri: ls.URI,
37 Author: &bsky.ActorDefs_ProfileViewBasic{
38 Did: ls.RepoDID,
39 Handle: ls.Repo.Handle,
40 },
41 Record: &lexutil.LexiconTypeDecoder{Val: rec},
42 IndexedAt: time.Now().Format(time.RFC3339),
43 }
44 return &postView, nil
45}
46
47func (m *DBModel) CreateLivestream(ctx context.Context, ls *Livestream) error {
48 // upsert livestream record, actually
49 return m.DB.Clauses(clause.OnConflict{
50 Columns: []clause.Column{{Name: "uri"}},
51 DoUpdates: clause.AssignmentColumns([]string{"cid", "created_at", "livestream", "repo_did", "post_cid", "post_uri"}),
52 }).Create(ls).Error
53}
54
55// GetLatestLivestreamForRepo returns the most recent livestream for a given repo DID
56func (m *DBModel) GetLatestLivestreamForRepo(repoDID string) (*Livestream, error) {
57 var livestream Livestream
58 err := m.DB.
59 Preload("Repo").
60 Preload("Post").
61 Where("repo_did = ?", repoDID).
62 Order("created_at DESC").
63 First(&livestream).Error
64 if err != nil {
65 return nil, fmt.Errorf("error retrieving latest livestream: %w", err)
66 }
67 return &livestream, nil
68}
69
70func (m *DBModel) GetLivestreamByPostURI(postURI string) (*Livestream, error) {
71 var livestream Livestream
72 err := m.DB.
73 Preload("Repo").
74 Preload("Post").
75 Where("post_uri = ?", postURI).
76 First(&livestream).Error
77 if errors.Is(err, gorm.ErrRecordNotFound) {
78 return nil, nil
79 }
80 if err != nil {
81 return nil, fmt.Errorf("error retrieving livestream by postURI: %w", err)
82 }
83 return &livestream, nil
84}
85
86// GetLatestLivestreams returns the most recent livestreams, given a limit and a cursor
87// Only gets livestreams with a valid segment no less than 30 seconds old
88func (m *DBModel) GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error) {
89 var recentLivestreams []Livestream
90 thirtySecondsAgo := time.Now().Add(-30 * time.Second)
91
92 // get latest segment for the repo DID
93 latestRecentSegmentsSubQuery := m.DB.Table("segments").
94 Select("repo_did, MAX(start_time) as latest_segment_start_time").
95 Where("(repo_did, start_time) IN (?)",
96 m.DB.Table("segments").
97 Select("repo_did, MAX(start_time)").
98 Group("repo_did")).
99 Where("start_time > ?", thirtySecondsAgo.UTC()).
100 Group("repo_did")
101
102 rankedLivestreamsSubQuery := m.DB.Table("livestreams").
103 Select("livestreams.*, ROW_NUMBER() OVER(PARTITION BY livestreams.repo_did ORDER BY livestreams.created_at DESC) as rn").
104 Joins("JOIN repos ON livestreams.repo_did = repos.did")
105
106 mainQuery := m.DB.Table("(?) as ranked_livestreams", rankedLivestreamsSubQuery).
107 Joins("JOIN (?) as latest_segments ON ranked_livestreams.repo_did = latest_segments.repo_did", latestRecentSegmentsSubQuery).
108 Select("ranked_livestreams.*, latest_segments.latest_segment_start_time").
109 Where("ranked_livestreams.rn = 1")
110
111 if before != nil {
112 mainQuery = mainQuery.Where("livestreams.created_at < ?", *before)
113 }
114
115 mainQuery = mainQuery.Order("ranked_livestreams.created_at DESC").
116 Limit(limit).
117 Preload("Repo")
118
119 err := mainQuery.Find(&recentLivestreams).Error
120
121 if errors.Is(err, gorm.ErrRecordNotFound) {
122 return nil, nil
123 }
124
125 if err != nil {
126 return nil, fmt.Errorf("error fetching recent livestreams: %w", err)
127 }
128
129 if len(recentLivestreams) == 0 {
130 return nil, nil
131 }
132
133 return recentLivestreams, nil
134}