fork
Configure Feed
Select the types of activity you want to include in your feed.
Live video on the AT Protocol
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package model
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "time"
8
9 "github.com/bluesky-social/indigo/api/bsky"
10 lexutil "github.com/bluesky-social/indigo/lex/util"
11 "gorm.io/gorm"
12 "gorm.io/gorm/clause"
13 "stream.place/streamplace/pkg/streamplace"
14)
15
16type Livestream struct {
17 URI string `json:"uri" gorm:"primaryKey;column:uri"`
18 CID string `json:"cid" gorm:"column:cid"`
19 CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;index:idx_repo_created,priority:2"`
20 Livestream *[]byte `json:"livestream"`
21 RepoDID string `json:"repoDID" gorm:"column:repo_did;index:idx_repo_created,priority:1"`
22 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"`
23 Post *FeedPost `json:"post,omitempty" gorm:"foreignKey:CID;references:PostCID"`
24 PostCID string `json:"postCID" gorm:"column:post_cid"`
25 PostURI string `json:"postURI" gorm:"column:post_uri;index:idx_post_uri"`
26}
27
28func (ls *Livestream) ToLivestreamView() (*streamplace.Livestream_LivestreamView, error) {
29 rec, err := lexutil.CborDecodeValue(*ls.Livestream)
30 if err != nil {
31 return nil, fmt.Errorf("error decoding feed post: %w", err)
32 }
33 postView := streamplace.Livestream_LivestreamView{
34 LexiconTypeID: "place.stream.livestream#livestreamView",
35 Cid: ls.CID,
36 Uri: ls.URI,
37 Author: &bsky.ActorDefs_ProfileViewBasic{
38 Did: ls.RepoDID,
39 Handle: ls.Repo.Handle,
40 },
41 Record: &lexutil.LexiconTypeDecoder{Val: rec},
42 IndexedAt: time.Now().Format(time.RFC3339),
43 }
44 return &postView, nil
45}
46
47func (m *DBModel) CreateLivestream(ctx context.Context, ls *Livestream) error {
48 // upsert livestream record, actually
49 return m.DB.Clauses(clause.OnConflict{
50 Columns: []clause.Column{{Name: "uri"}},
51 DoUpdates: clause.AssignmentColumns([]string{"cid", "created_at", "livestream", "repo_did", "post_cid", "post_uri"}),
52 }).Create(ls).Error
53}
54
55// GetLatestLivestreamForRepo returns the most recent livestream for a given repo DID
56func (m *DBModel) GetLatestLivestreamForRepo(repoDID string) (*Livestream, error) {
57 var livestream Livestream
58 err := m.DB.
59 Preload("Repo").
60 Preload("Post").
61 Where("repo_did = ?", repoDID).
62 Order("created_at DESC").
63 First(&livestream).Error
64 if err != nil {
65 return nil, fmt.Errorf("error retrieving latest livestream: %w", err)
66 }
67 return &livestream, nil
68}
69
70func (m *DBModel) GetLivestreamByPostURI(postURI string) (*Livestream, error) {
71 var livestream Livestream
72 err := m.DB.
73 Preload("Repo").
74 Preload("Post").
75 Where("post_uri = ?", postURI).
76 First(&livestream).Error
77 if errors.Is(err, gorm.ErrRecordNotFound) {
78 return nil, nil
79 }
80 if err != nil {
81 return nil, fmt.Errorf("error retrieving livestream by postURI: %w", err)
82 }
83 return &livestream, nil
84}
85
86// GetLatestLivestreams returns the most recent livestreams, given a limit and a cursor
87// Only gets livestreams with a valid segment no less than 30 seconds old
88func (m *DBModel) GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error) {
89 var recentLivestreams []Livestream
90 thirtySecondsAgo := time.Now().Add(-30 * time.Second)
91
92 // get latest segment for the repo DID
93 latestRecentSegmentsSubQuery := m.DB.Table("segments").
94 Select("repo_did, MAX(start_time) as latest_segment_start_time").
95 Where("(repo_did, start_time) IN (?)",
96 m.DB.Table("segments").
97 Select("repo_did, MAX(start_time)").
98 Group("repo_did")).
99 Where("start_time > ?", thirtySecondsAgo.UTC()).
100 Group("repo_did")
101
102 rankedLivestreamsSubQuery := m.DB.Table("livestreams").
103 Select("livestreams.*, ROW_NUMBER() OVER(PARTITION BY livestreams.repo_did ORDER BY livestreams.created_at DESC) as rn").
104 Joins("JOIN repos ON livestreams.repo_did = repos.did")
105
106 mainQuery := m.DB.Table("(?) as ranked_livestreams", rankedLivestreamsSubQuery).
107 Joins("JOIN (?) as latest_segments ON ranked_livestreams.repo_did = latest_segments.repo_did", latestRecentSegmentsSubQuery).
108 Select("ranked_livestreams.*, latest_segments.latest_segment_start_time").
109 Where("ranked_livestreams.rn = 1")
110
111 if before != nil {
112 mainQuery = mainQuery.Where("livestreams.created_at < ?", *before)
113 }
114
115 mainQuery = mainQuery.Order("ranked_livestreams.created_at DESC").
116 Limit(limit).
117 Preload("Repo")
118
119 err := mainQuery.Find(&recentLivestreams).Error
120
121 if errors.Is(err, gorm.ErrRecordNotFound) {
122 return nil, nil
123 }
124
125 if err != nil {
126 return nil, fmt.Errorf("error fetching recent livestreams: %w", err)
127 }
128
129 if len(recentLivestreams) == 0 {
130 return nil, nil
131 }
132
133 return recentLivestreams, nil
134}