Live video on the AT Protocol
1package model
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "time"
8
9 "github.com/bluesky-social/indigo/api/bsky"
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 lexutil "github.com/bluesky-social/indigo/lex/util"
12 "stream.place/streamplace/pkg/aqtime"
13 "stream.place/streamplace/pkg/spid"
14 "stream.place/streamplace/pkg/streamplace"
15)
16
// BroadcastOrigin is the GORM model for one stored broadcast-origin record.
// Rows live in the table named by TableName ("broadcast_origins").
type BroadcastOrigin struct {
	// URI is the primary key (column uri). UpdateBroadcastOrigin stores the
	// record's validated AT URI here.
	URI string `gorm:"primaryKey;column:uri"`

	// CID is the string form of the record's content identifier (column cid).
	CID string `gorm:"column:cid"`

	// RepoDID is stored in column repo_did.
	// NOTE(review): presumably the DID of the repo that holds the record
	// (the AT URI authority) — confirm against callers.
	RepoDID string `gorm:"column:repo_did"`

	// Repo is the GORM association keyed on RepoDID.
	Repo *Repo `gorm:"foreignKey:DID;references:RepoDID"`

	// StreamerRepoDID is the streamer named in the record (column
	// streamer_repo_did). It is the leading column of the composite index
	// idx_streamer_repo_did_indexed_at.
	StreamerRepoDID string `gorm:"column:streamer_repo_did;index:idx_streamer_repo_did_indexed_at,priority:1"`

	// StreamerRepo is the GORM association keyed on StreamerRepoDID.
	StreamerRepo *Repo `gorm:"foreignKey:DID;references:StreamerRepoDID"`

	// ServerRepoDID is the server named in the record (column
	// server_repo_did). It is the leading column of the composite index
	// idx_server_repo_did_indexed_at.
	ServerRepoDID string `gorm:"column:server_repo_did;index:idx_server_repo_did_indexed_at,priority:1"`

	// IndexedAt is the time the row was written (column indexed_at). It is the
	// second column of both composite indexes above.
	IndexedAt time.Time `gorm:"column:indexed_at;index:idx_streamer_repo_did_indexed_at,priority:2;index:idx_server_repo_did_indexed_at,priority:2"`

	// Record holds the CBOR-encoded record bytes (column record); it is
	// written with MarshalCBOR and read back with lexutil.CborDecodeValue.
	Record []byte `gorm:"column:record"`
}
28
29func (bo *BroadcastOrigin) TableName() string {
30 return "broadcast_origins"
31}
32
33func (bo *BroadcastOrigin) ToBroadcastOriginView() (*streamplace.BroadcastDefs_BroadcastOriginView, error) {
34 rec, err := lexutil.CborDecodeValue(bo.Record)
35 if err != nil {
36 return nil, fmt.Errorf("error decoding broadcast origin: %w", err)
37 }
38 return &streamplace.BroadcastDefs_BroadcastOriginView{
39 Author: &bsky.ActorDefs_ProfileViewBasic{
40 Did: bo.StreamerRepoDID,
41 },
42 Cid: bo.CID,
43 Record: &lexutil.LexiconTypeDecoder{Val: rec},
44 Uri: bo.URI,
45 }, nil
46}
47
48func (m *DBModel) UpdateBroadcastOrigin(ctx context.Context, origin *streamplace.BroadcastOrigin, aturi syntax.ATURI) error {
49 repoDID := aturi.Authority().String()
50 cid, err := spid.GetCID(origin)
51 if err != nil {
52 return fmt.Errorf("failed to get CID: %w", err)
53 }
54 validATURI := fmt.Sprintf("at://%s/place.stream.broadcast.origin/%s::%s", repoDID, origin.Streamer, origin.Server)
55 if validATURI != aturi.String() {
56 return fmt.Errorf("invalid ATURI: %s != %s", validATURI, aturi.String())
57 }
58 buf := bytes.Buffer{}
59 err = origin.MarshalCBOR(&buf)
60 if err != nil {
61 return fmt.Errorf("failed to marshal origin: %w", err)
62 }
63 aqt := aqtime.FromTime(time.Now().UTC())
64
65 bo := &BroadcastOrigin{
66 URI: validATURI,
67 CID: cid.String(),
68 StreamerRepoDID: origin.Streamer,
69 ServerRepoDID: origin.Server,
70 IndexedAt: aqt.Time().UTC(),
71 Record: buf.Bytes(),
72 }
73 return m.DB.Save(bo).Error
74}
75
76func (m *DBModel) GetRecentBroadcastOrigins(ctx context.Context) ([]*streamplace.BroadcastDefs_BroadcastOriginView, error) {
77 now := time.Now()
78 oneMinuteAgo := now.Add(-1 * time.Minute)
79
80 var origins []*BroadcastOrigin
81 err := m.DB.
82 Where("indexed_at >= ?", oneMinuteAgo.UTC()).
83 Find(&origins).Error
84 if err != nil {
85 return nil, err
86 }
87 views := make([]*streamplace.BroadcastDefs_BroadcastOriginView, len(origins))
88 for i, o := range origins {
89 view, err := o.ToBroadcastOriginView()
90 if err != nil {
91 return nil, err
92 }
93 views[i] = view
94 }
95 return views, nil
96}