Live video on the AT Protocol
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/multitesting 115 lines 3.2 kB view raw
1package statedb 2 3import ( 4 "bytes" 5 "errors" 6 "time" 7 8 comatproto "github.com/bluesky-social/indigo/api/atproto" 9 lexutil "github.com/bluesky-social/indigo/lex/util" 10 "github.com/bluesky-social/indigo/util" 11 "github.com/ipfs/go-cid" 12 "gorm.io/gorm" 13) 14 15type XrpcStreamEvent struct { 16 CID string `json:"cid" gorm:"column:cid;primaryKey"` 17 RepoDID string `json:"repoDID" gorm:"index:idx_repo_timestamp,priority:1;index:idx_repo_seq,priority:1;column:repo_did"` 18 Timestamp time.Time `json:"timestamp" gorm:"index:idx_repo_timestamp,priority:2;column:timestamp"` 19 Data []byte `json:"data" gorm:"column:data"` 20 SignedData string `json:"signedData" gorm:"column:signed_data"` 21 Seq int64 `json:"seq" gorm:"index:idx_repo_seq,priority:2;column:seq"` 22} 23 24func (ev *XrpcStreamEvent) ToCommitEvent() (*comatproto.SyncSubscribeRepos_Commit, error) { 25 commit := &comatproto.SyncSubscribeRepos_Commit{} 26 err := commit.UnmarshalCBOR(bytes.NewReader(ev.Data)) 27 if err != nil { 28 return nil, err 29 } 30 return commit, nil 31} 32 33const CommitLockKey = "commit_lock" 34 35func (state *StatefulDB) CreateCommitEvent(commit *comatproto.SyncSubscribeRepos_Commit, signedData string) error { 36 unlock, err := state.GetNamedLock(CommitLockKey) 37 if err != nil { 38 return err 39 } 40 defer unlock() 41 prev, err := state.GetMostRecentCommitEvent(commit.Repo) 42 if err != nil { 43 return err 44 } 45 if prev != nil { 46 prevCommit, err := prev.ToCommitEvent() 47 if err != nil { 48 return err 49 } 50 commit.Seq = prevCommit.Seq + 1 51 c, err := cid.Parse(prev.SignedData) 52 if err != nil { 53 return err 54 } 55 ll := lexutil.LexLink(c) 56 commit.PrevData = &ll 57 commit.Since = &prevCommit.Rev 58 } else { 59 commit.Seq = 1 60 } 61 buf := bytes.Buffer{} 62 err = commit.MarshalCBOR(&buf) 63 if err != nil { 64 return err 65 } 66 timestamp, err := time.Parse(util.ISO8601, commit.Time) 67 if err != nil { 68 return err 69 } 70 event := &XrpcStreamEvent{ 71 CID: commit.Commit.String(), 72 RepoDID: commit.Repo, 73 Timestamp: timestamp.UTC(), 74 Data: buf.Bytes(), 75 Seq: commit.Seq, 76 SignedData: signedData, 77 } 78 return state.DB.Create(event).Error 79} 80 81func (state *StatefulDB) GetCommitEventsSince(repoDID string, t time.Time) ([]*XrpcStreamEvent, error) { 82 var events []*XrpcStreamEvent 83 query := state.DB.Where("repo_did = ?", repoDID) 84 query = query.Where("timestamp > ?", t.UTC()) 85 err := query.Order("timestamp ASC").Find(&events).Error 86 if err != nil { 87 return nil, err 88 } 89 return events, nil 90} 91 92func (state *StatefulDB) GetCommitEventsSinceSeq(repoDID string, seq int64) ([]*XrpcStreamEvent, error) { 93 var events []*XrpcStreamEvent 94 query := state.DB.Where("repo_did = ?", repoDID) 95 query = query.Where("seq > ?", seq) 96 err := query.Order("timestamp ASC").Find(&events).Error 97 if err != nil { 98 return nil, err 99 } 100 return events, nil 101} 102 103func (state *StatefulDB) GetMostRecentCommitEvent(repoDID string) (*XrpcStreamEvent, error) { 104 var event XrpcStreamEvent 105 err := state.DB.Where("repo_did = ?", repoDID). 106 Order("timestamp DESC"). 107 Limit(1). 108 First(&event).Error 109 if errors.Is(err, gorm.ErrRecordNotFound) { 110 return nil, nil 111 } else if err != nil { 112 return nil, err 113 } 114 return &event, nil 115}