Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "time"
8
9 "github.com/bluesky-social/indigo/api/bsky"
10 lexutil "github.com/bluesky-social/indigo/lex/util"
11 "gorm.io/gorm"
12 "stream.place/streamplace/pkg/streamplace"
13)
14
15type Livestream struct {
16 CID string `json:"cid" gorm:"primaryKey;column:cid"`
17 URI string `json:"uri"`
18 CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;index:idx_repo_created,priority:2"`
19 Livestream *[]byte `json:"livestream"`
20 RepoDID string `json:"repoDID" gorm:"column:repo_did;index:idx_repo_created,priority:1"`
21 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"`
22 Post *FeedPost `json:"post,omitempty" gorm:"foreignKey:CID;references:PostCID"`
23 PostCID string `json:"postCID" gorm:"column:post_cid;index:idx_post_cid"`
24 PostURI string `json:"postURI" gorm:"column:post_uri"`
25}
26
27func (ls *Livestream) ToLivestreamView() (*streamplace.Livestream_LivestreamView, error) {
28 rec, err := lexutil.CborDecodeValue(*ls.Livestream)
29 if err != nil {
30 return nil, fmt.Errorf("error decoding feed post: %w", err)
31 }
32 postView := streamplace.Livestream_LivestreamView{
33 LexiconTypeID: "place.stream.livestream#livestreamView",
34 Cid: ls.CID,
35 Uri: ls.URI,
36 Author: &bsky.ActorDefs_ProfileViewBasic{
37 Did: ls.RepoDID,
38 Handle: ls.Repo.Handle,
39 },
40 Record: &lexutil.LexiconTypeDecoder{Val: rec},
41 IndexedAt: time.Now().Format(time.RFC3339),
42 }
43 return &postView, nil
44}
45
46func (m *DBModel) CreateLivestream(ctx context.Context, ls *Livestream) error {
47 return m.DB.Create(ls).Error
48}
49
50// GetLatestLivestreamForRepo returns the most recent livestream for a given repo DID
51func (m *DBModel) GetLatestLivestreamForRepo(repoDID string) (*Livestream, error) {
52 var livestream Livestream
53 err := m.DB.
54 Preload("Repo").
55 Where("repo_did = ?", repoDID).
56 Order("created_at DESC").
57 First(&livestream).Error
58 if err != nil {
59 return nil, fmt.Errorf("error retrieving latest livestream: %w", err)
60 }
61 return &livestream, nil
62}
63
64func (m *DBModel) GetLivestreamByPostCID(postCID string) (*Livestream, error) {
65 var livestream Livestream
66 err := m.DB.
67 Preload("Repo").
68 Where("post_cid = ?", postCID).
69 First(&livestream).Error
70 if errors.Is(err, gorm.ErrRecordNotFound) {
71 return nil, nil
72 }
73 if err != nil {
74 return nil, fmt.Errorf("error retrieving livestream by postCID: %w", err)
75 }
76 return &livestream, nil
77}
78
79// GetLatestLivestreams returns the most recent livestreams, given a limit and a cursor
80// Only gets livestreams with a valid segment no less than 30 seconds old
81func (m *DBModel) GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error) {
82 var recentLivestreams []Livestream
83 thirtySecondsAgo := time.Now().Add(-30 * time.Second)
84
85 // get latest segment for the repo DID
86 latestRecentSegmentsSubQuery := m.DB.Table("segments").
87 Select("repo_did, MAX(start_time) as latest_segment_start_time").
88 Where("(repo_did, start_time) IN (?)",
89 m.DB.Table("segments").
90 Select("repo_did, MAX(start_time)").
91 Group("repo_did")).
92 Where("start_time > ?", thirtySecondsAgo.UTC()).
93 Group("repo_did")
94
95 rankedLivestreamsSubQuery := m.DB.Table("livestreams").
96 Select("livestreams.*, ROW_NUMBER() OVER(PARTITION BY livestreams.repo_did ORDER BY livestreams.created_at DESC) as rn").
97 Joins("JOIN repos ON livestreams.repo_did = repos.did")
98
99 mainQuery := m.DB.Table("(?) as ranked_livestreams", rankedLivestreamsSubQuery).
100 Joins("JOIN (?) as latest_segments ON ranked_livestreams.repo_did = latest_segments.repo_did", latestRecentSegmentsSubQuery).
101 Select("ranked_livestreams.*, latest_segments.latest_segment_start_time").
102 Where("ranked_livestreams.rn = 1")
103
104 if before != nil {
105 mainQuery = mainQuery.Where("livestreams.created_at < ?", *before)
106 }
107
108 mainQuery = mainQuery.Order("ranked_livestreams.created_at DESC").
109 Limit(limit).
110 Preload("Repo")
111
112 err := mainQuery.Find(&recentLivestreams).Error
113
114 if errors.Is(err, gorm.ErrRecordNotFound) {
115 return nil, nil
116 }
117
118 if err != nil {
119 return nil, fmt.Errorf("error fetching recent livestreams: %w", err)
120 }
121
122 if len(recentLivestreams) == 0 {
123 return nil, nil
124 }
125
126 return recentLivestreams, nil
127}