fork
Configure Feed
Select the types of activity you want to include in your feed.
Live video on the AT Protocol
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package statedb
2
3import (
4 "bytes"
5 "errors"
6 "time"
7
8 comatproto "github.com/bluesky-social/indigo/api/atproto"
9 lexutil "github.com/bluesky-social/indigo/lex/util"
10 "github.com/bluesky-social/indigo/util"
11 "github.com/ipfs/go-cid"
12 "gorm.io/gorm"
13)
14
// XrpcStreamEvent is the database row for a single repo commit event.
//
// Rows are keyed by the commit CID and carry two composite indexes,
// (repo_did, timestamp) and (repo_did, seq), matching the per-repo lookups
// performed by the query helpers in this file.
type XrpcStreamEvent struct {
	// CID is the string form of the commit CID; it is the primary key.
	CID string `json:"cid" gorm:"column:cid;primaryKey"`
	// RepoDID identifies the repo the commit belongs to. It is the leading
	// column of both composite indexes.
	RepoDID string `json:"repoDID" gorm:"index:idx_repo_timestamp,priority:1;index:idx_repo_seq,priority:1;column:repo_did"`
	// Timestamp is the commit time, stored in UTC by CreateCommitEvent.
	Timestamp time.Time `json:"timestamp" gorm:"index:idx_repo_timestamp,priority:2;column:timestamp"`
	// Data is the CBOR encoding of the commit; see ToCommitEvent.
	Data []byte `json:"data" gorm:"column:data"`
	// SignedData is stored as given to CreateCommitEvent. It is parsed as a
	// CID when the next commit for the same repo is created, so it is
	// expected to hold a CID string.
	SignedData string `json:"signedData" gorm:"column:signed_data"`
	// Seq is the sequence number assigned to the commit within its repo.
	Seq int64 `json:"seq" gorm:"index:idx_repo_seq,priority:2;column:seq"`
}
23
24func (ev *XrpcStreamEvent) ToCommitEvent() (*comatproto.SyncSubscribeRepos_Commit, error) {
25 commit := &comatproto.SyncSubscribeRepos_Commit{}
26 err := commit.UnmarshalCBOR(bytes.NewReader(ev.Data))
27 if err != nil {
28 return nil, err
29 }
30 return commit, nil
31}
32
// CommitLockKey is the lock name CreateCommitEvent passes to GetNamedLock
// before reading the latest commit event and inserting the next one.
const CommitLockKey = "commit_lock"
34
35func (state *StatefulDB) CreateCommitEvent(commit *comatproto.SyncSubscribeRepos_Commit, signedData string) error {
36 unlock, err := state.GetNamedLock(CommitLockKey)
37 if err != nil {
38 return err
39 }
40 defer unlock()
41 prev, err := state.GetMostRecentCommitEvent(commit.Repo)
42 if err != nil {
43 return err
44 }
45 if prev != nil {
46 prevCommit, err := prev.ToCommitEvent()
47 if err != nil {
48 return err
49 }
50 commit.Seq = prevCommit.Seq + 1
51 c, err := cid.Parse(prev.SignedData)
52 if err != nil {
53 return err
54 }
55 ll := lexutil.LexLink(c)
56 commit.PrevData = &ll
57 commit.Since = &prevCommit.Rev
58 } else {
59 commit.Seq = 1
60 }
61 buf := bytes.Buffer{}
62 err = commit.MarshalCBOR(&buf)
63 if err != nil {
64 return err
65 }
66 timestamp, err := time.Parse(util.ISO8601, commit.Time)
67 if err != nil {
68 return err
69 }
70 event := &XrpcStreamEvent{
71 CID: commit.Commit.String(),
72 RepoDID: commit.Repo,
73 Timestamp: timestamp.UTC(),
74 Data: buf.Bytes(),
75 Seq: commit.Seq,
76 SignedData: signedData,
77 }
78 return state.DB.Create(event).Error
79}
80
81func (state *StatefulDB) GetCommitEventsSince(repoDID string, t time.Time) ([]*XrpcStreamEvent, error) {
82 var events []*XrpcStreamEvent
83 query := state.DB.Where("repo_did = ?", repoDID)
84 query = query.Where("timestamp > ?", t.UTC())
85 err := query.Order("timestamp ASC").Find(&events).Error
86 if err != nil {
87 return nil, err
88 }
89 return events, nil
90}
91
92func (state *StatefulDB) GetCommitEventsSinceSeq(repoDID string, seq int64) ([]*XrpcStreamEvent, error) {
93 var events []*XrpcStreamEvent
94 query := state.DB.Where("repo_did = ?", repoDID)
95 query = query.Where("seq > ?", seq)
96 err := query.Order("timestamp ASC").Find(&events).Error
97 if err != nil {
98 return nil, err
99 }
100 return events, nil
101}
102
103func (state *StatefulDB) GetMostRecentCommitEvent(repoDID string) (*XrpcStreamEvent, error) {
104 var event XrpcStreamEvent
105 err := state.DB.Where("repo_did = ?", repoDID).
106 Order("timestamp DESC").
107 Limit(1).
108 First(&event).Error
109 if errors.Is(err, gorm.ErrRecordNotFound) {
110 return nil, nil
111 } else if err != nil {
112 return nil, err
113 }
114 return &event, nil
115}