Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at next 169 lines 5.1 kB view raw
1package model 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "time" 8 9 "github.com/bluesky-social/indigo/api/bsky" 10 lexutil "github.com/bluesky-social/indigo/lex/util" 11 "gorm.io/gorm" 12 "gorm.io/gorm/clause" 13 "stream.place/streamplace/pkg/streamplace" 14) 15 16type Livestream struct { 17 URI string `json:"uri" gorm:"primaryKey;column:uri"` 18 CID string `json:"cid" gorm:"column:cid"` 19 CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;index:idx_repo_created,priority:2"` 20 Livestream *[]byte `json:"livestream"` 21 RepoDID string `json:"repoDID" gorm:"column:repo_did;index:idx_repo_created,priority:1"` 22 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"` 23 Post *FeedPost `json:"post,omitempty" gorm:"foreignKey:CID;references:PostCID"` 24 PostCID string `json:"postCID" gorm:"column:post_cid"` 25 PostURI string `json:"postURI" gorm:"column:post_uri;index:idx_post_uri"` 26} 27 28func (ls *Livestream) ToLivestreamView() (*streamplace.Livestream_LivestreamView, error) { 29 rec, err := lexutil.CborDecodeValue(*ls.Livestream) 30 if err != nil { 31 return nil, fmt.Errorf("error decoding feed post: %w", err) 32 } 33 postView := streamplace.Livestream_LivestreamView{ 34 LexiconTypeID: "place.stream.livestream#livestreamView", 35 Cid: ls.CID, 36 Uri: ls.URI, 37 Author: &bsky.ActorDefs_ProfileViewBasic{ 38 Did: ls.RepoDID, 39 Handle: ls.Repo.Handle, 40 }, 41 Record: &lexutil.LexiconTypeDecoder{Val: rec}, 42 IndexedAt: time.Now().Format(time.RFC3339), 43 } 44 return &postView, nil 45} 46 47func (m *DBModel) CreateLivestream(ctx context.Context, ls *Livestream) error { 48 // upsert livestream record, actually 49 return m.DB.Clauses(clause.OnConflict{ 50 Columns: []clause.Column{{Name: "uri"}}, 51 DoUpdates: clause.AssignmentColumns([]string{"cid", "created_at", "livestream", "repo_did", "post_cid", "post_uri"}), 52 }).Create(ls).Error 53} 54 55func (m *DBModel) GetLivestream(uri string) (*Livestream, error) { 56 var livestream Livestream 57 err := m.DB. 58 Preload("Repo"). 59 Preload("Post"). 60 Where("uri = ?", uri). 61 First(&livestream).Error 62 if errors.Is(err, gorm.ErrRecordNotFound) { 63 return nil, nil 64 } 65 if err != nil { 66 return nil, fmt.Errorf("error retrieving livestream by uri: %w", err) 67 } 68 return &livestream, nil 69} 70 71// GetLatestLivestreamForRepo returns the most recent livestream for a given repo DID 72func (m *DBModel) GetLatestLivestreamForRepo(repoDID string) (*Livestream, error) { 73 var livestream Livestream 74 err := m.DB. 75 Preload("Repo"). 76 Preload("Post"). 77 Where("repo_did = ?", repoDID). 78 Order("created_at DESC"). 79 First(&livestream).Error 80 if errors.Is(err, gorm.ErrRecordNotFound) { 81 return nil, nil 82 } 83 if err != nil { 84 return nil, fmt.Errorf("error retrieving latest livestream: %w", err) 85 } 86 return &livestream, nil 87} 88 89func (m *DBModel) GetLivestreamByPostURI(postURI string) (*Livestream, error) { 90 var livestream Livestream 91 err := m.DB. 92 Preload("Repo"). 93 Preload("Post"). 94 Where("post_uri = ?", postURI). 95 First(&livestream).Error 96 if errors.Is(err, gorm.ErrRecordNotFound) { 97 return nil, nil 98 } 99 if err != nil { 100 return nil, fmt.Errorf("error retrieving livestream by postURI: %w", err) 101 } 102 return &livestream, nil 103} 104 105// Get the latest livestreams for a given list of repo DIDs 106func (m *DBModel) GetLatestLivestreams(limit int, before *time.Time, dids []string) ([]Livestream, error) { 107 var recentLivestreams []Livestream 108 now := time.Now().UTC() 109 110 if len(dids) == 0 { 111 return []Livestream{}, nil 112 } 113 114 // Subquery to get the most recent livestream for each repo_did 115 subQuery := m.DB. 116 Table("livestreams"). 117 Select("MAX(created_at) as max_created_at, repo_did"). 118 Where("repo_did IN ?", dids). 119 Group("repo_did") 120 121 mainQuery := m.DB. 122 Table("livestreams"). 123 Select("livestreams.*"). 124 Joins("JOIN (?) as sq ON livestreams.repo_did = sq.repo_did AND livestreams.created_at = sq.max_created_at", subQuery). 125 Where("livestreams.repo_did IN ?", dids). 126 // exclude livestreams with !hide label on the record 127 Where("NOT EXISTS (?)", 128 m.DB.Table("labels"). 129 Select("1"). 130 Where("labels.uri = livestreams.uri"). 131 Where("labels.val = ?", "!hide"). 132 Where("labels.neg = ?", false). 133 Where("(labels.exp IS NULL OR labels.exp > ?)", now), 134 ). 135 // exclude livestreams with !hide label on the user 136 Where("NOT EXISTS (?)", 137 m.DB.Table("labels"). 138 Select("1"). 139 Where("labels.uri = livestreams.repo_did"). 140 Where("labels.val = ?", "!hide"). 141 Where("labels.neg = ?", false). 142 Where("(labels.exp IS NULL OR labels.exp > ?)", now), 143 ) 144 145 if before != nil { 146 mainQuery = mainQuery.Where("livestreams.created_at < ?", *before) 147 } 148 149 mainQuery = mainQuery. 150 Order("livestreams.created_at DESC"). 151 Limit(limit). 152 Preload("Repo") 153 154 err := mainQuery.Find(&recentLivestreams).Error 155 156 if errors.Is(err, gorm.ErrRecordNotFound) { 157 return nil, nil 158 } 159 160 if err != nil { 161 return nil, fmt.Errorf("error fetching recent livestreams: %w", err) 162 } 163 164 if len(recentLivestreams) == 0 { 165 return nil, nil 166 } 167 168 return recentLivestreams, nil 169}