Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/dev-env 108 lines 3.0 kB view raw
1package statedb 2 3import ( 4 "bytes" 5 "errors" 6 "time" 7 8 comatproto "github.com/bluesky-social/indigo/api/atproto" 9 lexutil "github.com/bluesky-social/indigo/lex/util" 10 "github.com/bluesky-social/indigo/util" 11 "github.com/ipfs/go-cid" 12 "gorm.io/gorm" 13) 14 15type XrpcStreamEvent struct { 16 CID string `json:"cid" gorm:"column:cid;primaryKey"` 17 RepoDID string `json:"repoDID" gorm:"index:idx_repo_timestamp,priority:1;index:idx_repo_seq,priority:1;column:repo_did"` 18 Timestamp time.Time `json:"timestamp" gorm:"index:idx_repo_timestamp,priority:2;column:timestamp"` 19 Data []byte `json:"data" gorm:"column:data"` 20 SignedData string `json:"signedData" gorm:"column:signed_data"` 21 Seq int64 `json:"seq" gorm:"index:idx_repo_seq,priority:2;column:seq"` 22} 23 24func (ev *XrpcStreamEvent) ToCommitEvent() (*comatproto.SyncSubscribeRepos_Commit, error) { 25 commit := &comatproto.SyncSubscribeRepos_Commit{} 26 err := commit.UnmarshalCBOR(bytes.NewReader(ev.Data)) 27 if err != nil { 28 return nil, err 29 } 30 return commit, nil 31} 32 33func (state *StatefulDB) CreateCommitEvent(commit *comatproto.SyncSubscribeRepos_Commit, signedData string) error { 34 prev, err := state.GetMostRecentCommitEvent(commit.Repo) 35 if err != nil { 36 return err 37 } 38 if prev != nil { 39 prevCommit, err := prev.ToCommitEvent() 40 if err != nil { 41 return err 42 } 43 commit.Seq = prevCommit.Seq + 1 44 c, err := cid.Parse(prev.SignedData) 45 if err != nil { 46 return err 47 } 48 ll := lexutil.LexLink(c) 49 commit.PrevData = &ll 50 commit.Since = &prevCommit.Rev 51 } else { 52 commit.Seq = 1 53 } 54 buf := bytes.Buffer{} 55 err = commit.MarshalCBOR(&buf) 56 if err != nil { 57 return err 58 } 59 timestamp, err := time.Parse(util.ISO8601, commit.Time) 60 if err != nil { 61 return err 62 } 63 event := &XrpcStreamEvent{ 64 CID: commit.Commit.String(), 65 RepoDID: commit.Repo, 66 Timestamp: timestamp.UTC(), 67 Data: buf.Bytes(), 68 Seq: commit.Seq, 69 SignedData: signedData, 70 } 71 return state.DB.Create(event).Error 72} 73 74func (state *StatefulDB) GetCommitEventsSince(repoDID string, t time.Time) ([]*XrpcStreamEvent, error) { 75 var events []*XrpcStreamEvent 76 query := state.DB.Where("repo_did = ?", repoDID) 77 query = query.Where("timestamp > ?", t.UTC()) 78 err := query.Order("timestamp ASC").Find(&events).Error 79 if err != nil { 80 return nil, err 81 } 82 return events, nil 83} 84 85func (state *StatefulDB) GetCommitEventsSinceSeq(repoDID string, seq int64) ([]*XrpcStreamEvent, error) { 86 var events []*XrpcStreamEvent 87 query := state.DB.Where("repo_did = ?", repoDID) 88 query = query.Where("seq > ?", seq) 89 err := query.Order("timestamp ASC").Find(&events).Error 90 if err != nil { 91 return nil, err 92 } 93 return events, nil 94} 95 96func (state *StatefulDB) GetMostRecentCommitEvent(repoDID string) (*XrpcStreamEvent, error) { 97 var event XrpcStreamEvent 98 err := state.DB.Where("repo_did = ?", repoDID). 99 Order("timestamp DESC"). 100 Limit(1). 101 First(&event).Error 102 if errors.Is(err, gorm.ErrRecordNotFound) { 103 return nil, nil 104 } else if err != nil { 105 return nil, err 106 } 107 return &event, nil 108}