Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "time"
8
9 "github.com/bluesky-social/indigo/api/bsky"
10 lexutil "github.com/bluesky-social/indigo/lex/util"
11 "gorm.io/gorm"
12 "gorm.io/gorm/clause"
13 "stream.place/streamplace/pkg/streamplace"
14)
15
16type Livestream struct {
17 URI string `json:"uri" gorm:"primaryKey;column:uri"`
18 CID string `json:"cid" gorm:"column:cid"`
19 CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;index:idx_repo_created,priority:2"`
20 Livestream *[]byte `json:"livestream"`
21 RepoDID string `json:"repoDID" gorm:"column:repo_did;index:idx_repo_created,priority:1"`
22 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"`
23 Post *FeedPost `json:"post,omitempty" gorm:"foreignKey:CID;references:PostCID"`
24 PostCID string `json:"postCID" gorm:"column:post_cid"`
25 PostURI string `json:"postURI" gorm:"column:post_uri;index:idx_post_uri"`
26}
27
28func (ls *Livestream) ToLivestreamView() (*streamplace.Livestream_LivestreamView, error) {
29 rec, err := lexutil.CborDecodeValue(*ls.Livestream)
30 if err != nil {
31 return nil, fmt.Errorf("error decoding feed post: %w", err)
32 }
33 postView := streamplace.Livestream_LivestreamView{
34 LexiconTypeID: "place.stream.livestream#livestreamView",
35 Cid: ls.CID,
36 Uri: ls.URI,
37 Author: &bsky.ActorDefs_ProfileViewBasic{
38 Did: ls.RepoDID,
39 Handle: ls.Repo.Handle,
40 },
41 Record: &lexutil.LexiconTypeDecoder{Val: rec},
42 IndexedAt: time.Now().Format(time.RFC3339),
43 }
44 return &postView, nil
45}
46
47func (m *DBModel) CreateLivestream(ctx context.Context, ls *Livestream) error {
48 // upsert livestream record, actually
49 return m.DB.Clauses(clause.OnConflict{
50 Columns: []clause.Column{{Name: "uri"}},
51 DoUpdates: clause.AssignmentColumns([]string{"cid", "created_at", "livestream", "repo_did", "post_cid", "post_uri"}),
52 }).Create(ls).Error
53}
54
55func (m *DBModel) GetLivestream(uri string) (*Livestream, error) {
56 var livestream Livestream
57 err := m.DB.
58 Preload("Repo").
59 Preload("Post").
60 Where("uri = ?", uri).
61 First(&livestream).Error
62 if errors.Is(err, gorm.ErrRecordNotFound) {
63 return nil, nil
64 }
65 if err != nil {
66 return nil, fmt.Errorf("error retrieving livestream by uri: %w", err)
67 }
68 return &livestream, nil
69}
70
71// GetLatestLivestreamForRepo returns the most recent livestream for a given repo DID
72func (m *DBModel) GetLatestLivestreamForRepo(repoDID string) (*Livestream, error) {
73 var livestream Livestream
74 err := m.DB.
75 Preload("Repo").
76 Preload("Post").
77 Where("repo_did = ?", repoDID).
78 Order("created_at DESC").
79 First(&livestream).Error
80 if errors.Is(err, gorm.ErrRecordNotFound) {
81 return nil, nil
82 }
83 if err != nil {
84 return nil, fmt.Errorf("error retrieving latest livestream: %w", err)
85 }
86 return &livestream, nil
87}
88
89func (m *DBModel) GetLivestreamByPostURI(postURI string) (*Livestream, error) {
90 var livestream Livestream
91 err := m.DB.
92 Preload("Repo").
93 Preload("Post").
94 Where("post_uri = ?", postURI).
95 First(&livestream).Error
96 if errors.Is(err, gorm.ErrRecordNotFound) {
97 return nil, nil
98 }
99 if err != nil {
100 return nil, fmt.Errorf("error retrieving livestream by postURI: %w", err)
101 }
102 return &livestream, nil
103}
104
105// Get the latest livestreams for a given list of repo DIDs
106func (m *DBModel) GetLatestLivestreams(limit int, before *time.Time, dids []string) ([]Livestream, error) {
107 var recentLivestreams []Livestream
108 now := time.Now().UTC()
109
110 if len(dids) == 0 {
111 return []Livestream{}, nil
112 }
113
114 // Subquery to get the most recent livestream for each repo_did
115 subQuery := m.DB.
116 Table("livestreams").
117 Select("MAX(created_at) as max_created_at, repo_did").
118 Where("repo_did IN ?", dids).
119 Group("repo_did")
120
121 mainQuery := m.DB.
122 Table("livestreams").
123 Select("livestreams.*").
124 Joins("JOIN (?) as sq ON livestreams.repo_did = sq.repo_did AND livestreams.created_at = sq.max_created_at", subQuery).
125 Where("livestreams.repo_did IN ?", dids).
126 // exclude livestreams with !hide label on the record
127 Where("NOT EXISTS (?)",
128 m.DB.Table("labels").
129 Select("1").
130 Where("labels.uri = livestreams.uri").
131 Where("labels.val = ?", "!hide").
132 Where("labels.neg = ?", false).
133 Where("(labels.exp IS NULL OR labels.exp > ?)", now),
134 ).
135 // exclude livestreams with !hide label on the user
136 Where("NOT EXISTS (?)",
137 m.DB.Table("labels").
138 Select("1").
139 Where("labels.uri = livestreams.repo_did").
140 Where("labels.val = ?", "!hide").
141 Where("labels.neg = ?", false).
142 Where("(labels.exp IS NULL OR labels.exp > ?)", now),
143 )
144
145 if before != nil {
146 mainQuery = mainQuery.Where("livestreams.created_at < ?", *before)
147 }
148
149 mainQuery = mainQuery.
150 Order("livestreams.created_at DESC").
151 Limit(limit).
152 Preload("Repo")
153
154 err := mainQuery.Find(&recentLivestreams).Error
155
156 if errors.Is(err, gorm.ErrRecordNotFound) {
157 return nil, nil
158 }
159
160 if err != nil {
161 return nil, fmt.Errorf("error fetching recent livestreams: %w", err)
162 }
163
164 if len(recentLivestreams) == 0 {
165 return nil, nil
166 }
167
168 return recentLivestreams, nil
169}