Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "time"
8
9 "github.com/bluesky-social/indigo/api/bsky"
10 lexutil "github.com/bluesky-social/indigo/lex/util"
11 "gorm.io/gorm"
12 "stream.place/streamplace/pkg/streamplace"
13)
14
15type Livestream struct {
16 CID string `json:"cid" gorm:"primaryKey;column:cid"`
17 URI string `json:"uri"`
18 CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;index:idx_repo_created,priority:2"`
19 Livestream *[]byte `json:"livestream"`
20 RepoDID string `json:"repoDID" gorm:"column:repo_did;index:idx_repo_created,priority:1"`
21 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"`
22 Post *FeedPost `json:"post,omitempty" gorm:"foreignKey:CID;references:PostCID"`
23 PostCID string `json:"postCID" gorm:"column:post_cid;index:idx_post_cid"`
24 PostURI string `json:"postURI" gorm:"column:post_uri"`
25}
26
27func (ls *Livestream) ToLivestreamView() (*streamplace.Livestream_LivestreamView, error) {
28 rec, err := lexutil.CborDecodeValue(*ls.Livestream)
29 if err != nil {
30 return nil, fmt.Errorf("error decoding feed post: %w", err)
31 }
32 postView := streamplace.Livestream_LivestreamView{
33 LexiconTypeID: "place.stream.livestream#livestreamView",
34 Cid: ls.CID,
35 Uri: ls.URI,
36 Author: &bsky.ActorDefs_ProfileViewBasic{
37 Did: ls.RepoDID,
38 Handle: ls.Repo.Handle,
39 },
40 Record: &lexutil.LexiconTypeDecoder{Val: rec},
41 IndexedAt: time.Now().Format(time.RFC3339),
42 }
43 return &postView, nil
44}
45
46func (m *DBModel) CreateLivestream(ctx context.Context, ls *Livestream) error {
47 return m.DB.Create(ls).Error
48}
49
50// GetLatestLivestreamForRepo returns the most recent livestream for a given repo DID
51func (m *DBModel) GetLatestLivestreamForRepo(repoDID string) (*Livestream, error) {
52 var livestream Livestream
53 err := m.DB.
54 Preload("Repo").
55 Preload("Post").
56 Where("repo_did = ?", repoDID).
57 Order("created_at DESC").
58 First(&livestream).Error
59 if err != nil {
60 return nil, fmt.Errorf("error retrieving latest livestream: %w", err)
61 }
62 return &livestream, nil
63}
64
65func (m *DBModel) GetLivestreamByPostCID(postCID string) (*Livestream, error) {
66 var livestream Livestream
67 err := m.DB.
68 Preload("Repo").
69 Preload("Post").
70 Where("post_cid = ?", postCID).
71 First(&livestream).Error
72 if errors.Is(err, gorm.ErrRecordNotFound) {
73 return nil, nil
74 }
75 if err != nil {
76 return nil, fmt.Errorf("error retrieving livestream by postCID: %w", err)
77 }
78 return &livestream, nil
79}
80
81// GetLatestLivestreams returns the most recent livestreams, given a limit and a cursor
82// Only gets livestreams with a valid segment no less than 30 seconds old
83func (m *DBModel) GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error) {
84 var recentLivestreams []Livestream
85 thirtySecondsAgo := time.Now().Add(-30 * time.Second)
86
87 // get latest segment for the repo DID
88 latestRecentSegmentsSubQuery := m.DB.Table("segments").
89 Select("repo_did, MAX(start_time) as latest_segment_start_time").
90 Where("(repo_did, start_time) IN (?)",
91 m.DB.Table("segments").
92 Select("repo_did, MAX(start_time)").
93 Group("repo_did")).
94 Where("start_time > ?", thirtySecondsAgo.UTC()).
95 Group("repo_did")
96
97 rankedLivestreamsSubQuery := m.DB.Table("livestreams").
98 Select("livestreams.*, ROW_NUMBER() OVER(PARTITION BY livestreams.repo_did ORDER BY livestreams.created_at DESC) as rn").
99 Joins("JOIN repos ON livestreams.repo_did = repos.did")
100
101 mainQuery := m.DB.Table("(?) as ranked_livestreams", rankedLivestreamsSubQuery).
102 Joins("JOIN (?) as latest_segments ON ranked_livestreams.repo_did = latest_segments.repo_did", latestRecentSegmentsSubQuery).
103 Select("ranked_livestreams.*, latest_segments.latest_segment_start_time").
104 Where("ranked_livestreams.rn = 1")
105
106 if before != nil {
107 mainQuery = mainQuery.Where("livestreams.created_at < ?", *before)
108 }
109
110 mainQuery = mainQuery.Order("ranked_livestreams.created_at DESC").
111 Limit(limit).
112 Preload("Repo")
113
114 err := mainQuery.Find(&recentLivestreams).Error
115
116 if errors.Is(err, gorm.ErrRecordNotFound) {
117 return nil, nil
118 }
119
120 if err != nil {
121 return nil, fmt.Errorf("error fetching recent livestreams: %w", err)
122 }
123
124 if len(recentLivestreams) == 0 {
125 return nil, nil
126 }
127
128 return recentLivestreams, nil
129}