Live video on the AT Protocol
1package statedb
2
import (
	"bytes"
	"errors"
	"fmt"
	"time"

	comatproto "github.com/bluesky-social/indigo/api/atproto"
	lexutil "github.com/bluesky-social/indigo/lex/util"
	"github.com/bluesky-social/indigo/util"
	"github.com/ipfs/go-cid"
	"gorm.io/gorm"
)
14
// XrpcStreamEvent is the database row that stores one repo commit event.
//
// Rows are written by CreateCommitEvent and can be decoded back into a
// commit with ToCommitEvent.
type XrpcStreamEvent struct {
	// CID is the primary key; CreateCommitEvent fills it with the string
	// form of the commit's Commit link.
	CID string `json:"cid" gorm:"column:cid;primaryKey"`

	// RepoDID identifies the repo the event belongs to. It is the leading
	// column of both composite indexes (with timestamp and with seq).
	RepoDID string `json:"repoDID" gorm:"index:idx_repo_timestamp,priority:1;index:idx_repo_seq,priority:1;column:repo_did"`

	// Timestamp is the commit time; CreateCommitEvent stores it in UTC.
	Timestamp time.Time `json:"timestamp" gorm:"index:idx_repo_timestamp,priority:2;column:timestamp"`

	// Data holds the CBOR encoding of the commit.
	Data []byte `json:"data" gorm:"column:data"`

	// SignedData is the caller-supplied string stored alongside the commit.
	// CreateCommitEvent parses the previous event's SignedData as a CID when
	// linking a new commit to its predecessor.
	SignedData string `json:"signedData" gorm:"column:signed_data"`

	// Seq is the per-repo sequence number assigned by CreateCommitEvent.
	Seq int64 `json:"seq" gorm:"index:idx_repo_seq,priority:2;column:seq"`
}
23
24func (ev *XrpcStreamEvent) ToCommitEvent() (*comatproto.SyncSubscribeRepos_Commit, error) {
25 commit := &comatproto.SyncSubscribeRepos_Commit{}
26 err := commit.UnmarshalCBOR(bytes.NewReader(ev.Data))
27 if err != nil {
28 return nil, err
29 }
30 return commit, nil
31}
32
33func (state *StatefulDB) CreateCommitEvent(commit *comatproto.SyncSubscribeRepos_Commit, signedData string) error {
34 prev, err := state.GetMostRecentCommitEvent(commit.Repo)
35 if err != nil {
36 return err
37 }
38 if prev != nil {
39 prevCommit, err := prev.ToCommitEvent()
40 if err != nil {
41 return err
42 }
43 commit.Seq = prevCommit.Seq + 1
44 c, err := cid.Parse(prev.SignedData)
45 if err != nil {
46 return err
47 }
48 ll := lexutil.LexLink(c)
49 commit.PrevData = &ll
50 commit.Since = &prevCommit.Rev
51 } else {
52 commit.Seq = 1
53 }
54 buf := bytes.Buffer{}
55 err = commit.MarshalCBOR(&buf)
56 if err != nil {
57 return err
58 }
59 timestamp, err := time.Parse(util.ISO8601, commit.Time)
60 if err != nil {
61 return err
62 }
63 event := &XrpcStreamEvent{
64 CID: commit.Commit.String(),
65 RepoDID: commit.Repo,
66 Timestamp: timestamp.UTC(),
67 Data: buf.Bytes(),
68 Seq: commit.Seq,
69 SignedData: signedData,
70 }
71 return state.DB.Create(event).Error
72}
73
74func (state *StatefulDB) GetCommitEventsSince(repoDID string, t time.Time) ([]*XrpcStreamEvent, error) {
75 var events []*XrpcStreamEvent
76 query := state.DB.Where("repo_did = ?", repoDID)
77 query = query.Where("timestamp > ?", t.UTC())
78 err := query.Order("timestamp ASC").Find(&events).Error
79 if err != nil {
80 return nil, err
81 }
82 return events, nil
83}
84
85func (state *StatefulDB) GetCommitEventsSinceSeq(repoDID string, seq int64) ([]*XrpcStreamEvent, error) {
86 var events []*XrpcStreamEvent
87 query := state.DB.Where("repo_did = ?", repoDID)
88 query = query.Where("seq > ?", seq)
89 err := query.Order("timestamp ASC").Find(&events).Error
90 if err != nil {
91 return nil, err
92 }
93 return events, nil
94}
95
96func (state *StatefulDB) GetMostRecentCommitEvent(repoDID string) (*XrpcStreamEvent, error) {
97 var event XrpcStreamEvent
98 err := state.DB.Where("repo_did = ?", repoDID).
99 Order("timestamp DESC").
100 Limit(1).
101 First(&event).Error
102 if errors.Is(err, gorm.ErrRecordNotFound) {
103 return nil, nil
104 } else if err != nil {
105 return nil, err
106 }
107 return &event, nil
108}