Live video on the AT Protocol
1package statedb
2
3import (
4 "bytes"
5 "errors"
6 "time"
7
8 comatproto "github.com/bluesky-social/indigo/api/atproto"
9 lexutil "github.com/bluesky-social/indigo/lex/util"
10 "github.com/bluesky-social/indigo/util"
11 "github.com/ipfs/go-cid"
12 "gorm.io/gorm"
13)
14
// XrpcStreamEvent is the database row for one stored repo commit event.
//
// The gorm tags fix the column names, make cid the primary key, and declare
// two composite indexes: idx_repo_timestamp on (repo_did, timestamp) and
// idx_repo_seq on (repo_did, seq).
type XrpcStreamEvent struct {
	// CID is the primary key. CreateCommitEvent fills it with the string form of the commit's CID.
	CID string `json:"cid" gorm:"column:cid;primaryKey"`
	// RepoDID is the repo the commit belongs to (commit.Repo); it is the leading column of both indexes.
	RepoDID string `json:"repoDID" gorm:"index:idx_repo_timestamp,priority:1;index:idx_repo_seq,priority:1;column:repo_did"`
	// Timestamp is commit.Time parsed and converted to UTC by CreateCommitEvent.
	Timestamp time.Time `json:"timestamp" gorm:"index:idx_repo_timestamp,priority:2;column:timestamp"`
	// Data is the CBOR encoding of the commit; ToCommitEvent decodes it back.
	Data []byte `json:"data" gorm:"column:data"`
	// SignedData is stored exactly as passed to CreateCommitEvent. When the next
	// event for the repo is created, this value is parsed as a CID and becomes
	// that commit's PrevData.
	SignedData string `json:"signedData" gorm:"column:signed_data"`
	// Seq is the sequence number CreateCommitEvent assigned to the commit.
	Seq int64 `json:"seq" gorm:"index:idx_repo_seq,priority:2;column:seq"`
}
23
24func (ev *XrpcStreamEvent) ToCommitEvent() (*comatproto.SyncSubscribeRepos_Commit, error) {
25 commit := &comatproto.SyncSubscribeRepos_Commit{}
26 err := commit.UnmarshalCBOR(bytes.NewReader(ev.Data))
27 if err != nil {
28 return nil, err
29 }
30 return commit, nil
31}
32
// CommitLockKey is the lock name CreateCommitEvent passes to GetNamedLock
// before it looks up the previous event and inserts the new one.
const CommitLockKey = "commit_lock"
34
35func (state *StatefulDB) CreateCommitEvent(commit *comatproto.SyncSubscribeRepos_Commit, signedData string) error {
36 unlock, err := state.GetNamedLock(CommitLockKey)
37 if err != nil {
38 return err
39 }
40 defer unlock()
41 prev, err := state.GetMostRecentCommitEvent(commit.Repo)
42 if err != nil {
43 return err
44 }
45 if prev != nil {
46 prevCommit, err := prev.ToCommitEvent()
47 if err != nil {
48 return err
49 }
50 commit.Seq = prevCommit.Seq + 1
51 c, err := cid.Parse(prev.SignedData)
52 if err != nil {
53 return err
54 }
55 ll := lexutil.LexLink(c)
56 commit.PrevData = &ll
57 commit.Since = &prevCommit.Rev
58 } else {
59 commit.Seq = 1
60 }
61 buf := bytes.Buffer{}
62 err = commit.MarshalCBOR(&buf)
63 if err != nil {
64 return err
65 }
66 timestamp, err := time.Parse(util.ISO8601, commit.Time)
67 if err != nil {
68 return err
69 }
70 event := &XrpcStreamEvent{
71 CID: commit.Commit.String(),
72 RepoDID: commit.Repo,
73 Timestamp: timestamp.UTC(),
74 Data: buf.Bytes(),
75 Seq: commit.Seq,
76 SignedData: signedData,
77 }
78 return state.DB.Create(event).Error
79}
80
81func (state *StatefulDB) GetCommitEventsSince(repoDID string, t time.Time) ([]*XrpcStreamEvent, error) {
82 var events []*XrpcStreamEvent
83 query := state.DB.Where("repo_did = ?", repoDID)
84 query = query.Where("timestamp > ?", t.UTC())
85 err := query.Order("timestamp ASC").Find(&events).Error
86 if err != nil {
87 return nil, err
88 }
89 return events, nil
90}
91
92func (state *StatefulDB) GetCommitEventsSinceSeq(repoDID string, seq int64) ([]*XrpcStreamEvent, error) {
93 var events []*XrpcStreamEvent
94 query := state.DB.Where("repo_did = ?", repoDID)
95 query = query.Where("seq > ?", seq)
96 err := query.Order("timestamp ASC").Find(&events).Error
97 if err != nil {
98 return nil, err
99 }
100 return events, nil
101}
102
103func (state *StatefulDB) GetMostRecentCommitEvent(repoDID string) (*XrpcStreamEvent, error) {
104 var event XrpcStreamEvent
105 err := state.DB.Where("repo_did = ?", repoDID).
106 Order("timestamp DESC").
107 Limit(1).
108 First(&event).Error
109 if errors.Is(err, gorm.ErrRecordNotFound) {
110 return nil, nil
111 } else if err != nil {
112 return nil, err
113 }
114 return &event, nil
115}