Live video on the AT Protocol

atproto: save lexicon commit events

+154 -12
+42 -2
pkg/atproto/lexicon_repo.go
··· 9 9 "io" 10 10 "io/fs" 11 11 "strings" 12 + "time" 12 13 14 + comatproto "github.com/bluesky-social/indigo/api/atproto" 13 15 atcrypto "github.com/bluesky-social/indigo/atproto/crypto" 14 16 "github.com/bluesky-social/indigo/atproto/data" 15 17 "github.com/bluesky-social/indigo/atproto/lexicon" ··· 18 20 "github.com/bluesky-social/indigo/models" 19 21 "github.com/bluesky-social/indigo/mst" 20 22 atrepo "github.com/bluesky-social/indigo/repo" 23 + "github.com/bluesky-social/indigo/util" 21 24 "github.com/ipfs/go-cid" 22 25 cbg "github.com/whyrusleeping/cbor-gen" 23 26 "gorm.io/driver/sqlite" ··· 27 30 "stream.place/streamplace/lexicons" 28 31 "stream.place/streamplace/pkg/config" 29 32 "stream.place/streamplace/pkg/log" 33 + "stream.place/streamplace/pkg/model" 30 34 ) 31 35 32 36 var LexiconRepo *atrepo.Repo 33 37 var LexiconPubMultibase string 34 38 var RepoUser models.Uid = models.Uid(1) 35 39 var CarStore carstore.CarStore 40 + var ActionCreate = "create" 41 + var ActionUpdate = "update" 42 + var ActionDelete = "delete" 36 43 37 44 func walkLexicons(ctx context.Context, bundle fs.FS, path string) ([][]byte, error) { 38 45 ret := [][]byte{} ··· 117 124 Close() error 118 125 } 119 126 120 - func MakeLexiconRepo(ctx context.Context, cli *config.CLI) (Closer, error) { 127 + func MakeLexiconRepo(ctx context.Context, cli *config.CLI, mod model.Model) (Closer, error) { 121 128 ctx = log.WithLogValues(ctx, "func", "MakeLexiconRepo") 122 129 fd, err := cli.DataFileCreate([]string{"carstore", "empty"}, true) 123 130 if err != nil { ··· 211 218 if err != nil { 212 219 return nil, fmt.Errorf("failed to walk lexicon files: %w", err) 213 220 } 221 + 222 + ops := []*comatproto.SyncSubscribeRepos_RepoOp{} 223 + 214 224 for _, lex := range lexs { 215 225 lexFile := lexicon.SchemaFile{} 216 226 err := json.Unmarshal(lex, &lexFile) ··· 226 236 if err != nil { 227 237 return nil, err 228 238 } 239 + cidLink := lexutil.LexLink(*newCid) 229 240 230 241 oldCid, _, err := LexiconRepo.GetRecord(ctx, rpath) 231 242 if errors.Is(err, mst.ErrNotFound) { ··· 234 245 return nil, err 235 246 } 236 247 log.Log(ctx, "created new lexicon record", "rpath", rpath, "cid", newCid.String()) 248 + ops = append(ops, &comatproto.SyncSubscribeRepos_RepoOp{ 249 + Action: ActionCreate, 250 + Path: rpath, 251 + Cid: &cidLink, 252 + }) 237 253 } else if err != nil { 238 254 return nil, err 239 255 } else { ··· 246 262 if err != nil { 247 263 return nil, err 248 264 } 265 + oldLink := lexutil.LexLink(oldCid) 266 + ops = append(ops, &comatproto.SyncSubscribeRepos_RepoOp{ 267 + Action: ActionUpdate, 268 + Path: rpath, 269 + Prev: &oldLink, 270 + Cid: &cidLink, 271 + }) 249 272 } 250 273 } 251 274 currentRoot, currentRev, err = LexiconRepo.Commit(ctx, signer) 252 275 if err != nil { 253 276 return nil, fmt.Errorf("failed to commit: %w", err) 254 277 } 278 + 255 279 log.Log(ctx, "LexiconRepo committed", "cid", currentRoot.String(), "rev", currentRev) 256 280 } 257 - _, err = ses.CloseWithRoot(ctx, currentRoot, currentRev) 281 + blocks, err := ses.CloseWithRoot(ctx, currentRoot, currentRev) 258 282 if err != nil { 259 283 return nil, fmt.Errorf("failed to close delta session: %w", err) 284 + } 285 + if len(ops) > 0 { 286 + commit := &comatproto.SyncSubscribeRepos_Commit{ 287 + Repo: cli.MyDID(), 288 + Blocks: blocks, 289 + Rev: currentRev, 290 + // Since: currentRev, 291 + Commit: lexutil.LexLink(currentRoot), 292 + Time: time.Now().Format(util.ISO8601), 293 + Ops: ops, 294 + TooBig: false, 295 + } 296 + err := mod.CreateCommitEvent(commit) 297 + if err != nil { 298 + return nil, fmt.Errorf("failed to create commit event: %w", err) 299 + } 260 300 } 261 301 262 302 return sqlDB, nil
+23 -5
pkg/atproto/lexicon_repo_test.go
··· 6 6 "io/fs" 7 7 "testing" 8 8 "testing/fstest" 9 + "time" 9 10 10 11 "github.com/stretchr/testify/require" 11 12 "stream.place/streamplace/lexicons" 12 13 "stream.place/streamplace/pkg/config" 14 + "stream.place/streamplace/pkg/model" 13 15 ) 14 16 15 17 func TestLexiconRepo(t *testing.T) { 16 - cli := config.CLI{} 18 + cli := config.CLI{ 19 + PublicHost: "example.com", 20 + } 17 21 cli.DataDir = t.TempDir() 22 + mod, err := model.MakeDB(":memory:") 23 + require.NoError(t, err) 18 24 19 25 // creating a new repo 20 - handle, err := MakeLexiconRepo(context.Background(), &cli) 26 + handle, err := MakeLexiconRepo(context.Background(), &cli, mod) 21 27 require.NoError(t, err) 22 28 r, sess, err := OpenLexiconRepo(context.Background()) 23 29 require.NoError(t, err) ··· 30 36 require.NotNil(t, rec) 31 37 handle.Close() 32 38 39 + evts, err := mod.GetCommitEventsSince(cli.MyDID(), time.Time{}) 40 + require.NoError(t, err) 41 + require.Len(t, evts, 1) 42 + require.Equal(t, evts[0].RepoDID, cli.MyDID()) 43 + 33 44 // opening an existing repo 34 - handle, err = MakeLexiconRepo(context.Background(), &cli) 45 + handle, err = MakeLexiconRepo(context.Background(), &cli, mod) 35 46 require.NoError(t, err) 36 47 handle.Close() 37 48 ··· 81 92 AllFiles = modifiedFS 82 93 83 94 // opening an existing repo with modified lexicon 84 - handle, err = MakeLexiconRepo(context.Background(), &cli) 95 + handle, err = MakeLexiconRepo(context.Background(), &cli, mod) 85 96 require.NoError(t, err) 86 97 handle.Close() 87 98 88 - // Now modifiedFS is a fs.FS with the first file modified 99 + evts, err = mod.GetCommitEventsSince(cli.MyDID(), time.Time{}) 100 + require.NoError(t, err) 101 + require.Len(t, evts, 2) 102 + require.Equal(t, evts[0].RepoDID, cli.MyDID()) 103 + require.Equal(t, evts[1].RepoDID, cli.MyDID()) 104 + commit, err := evts[1].ToCommitEvent() 105 + require.NoError(t, err) 106 + require.Equal(t, commit.Since, &evts[0].CID) 89 107 }
+5 -5
pkg/cmd/streamplace.go
··· 158 158 if err != nil { 159 159 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err) 160 160 } 161 - handle, err := atproto.MakeLexiconRepo(ctx, &cli) 162 - if err != nil { 163 - return err 164 - } 165 - defer handle.Close() 166 161 schema, err := v0.MakeV0Schema() 167 162 if err != nil { 168 163 return err ··· 252 247 if err != nil { 253 248 return err 254 249 } 250 + handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod) 251 + if err != nil { 252 + return err 253 + } 254 + defer handle.Close() 255 255 var noter notifications.FirebaseNotifier 256 256 if cli.FirebaseServiceAccount != "" { 257 257 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount)
+6
pkg/model/model.go
··· 8 8 "strings" 9 9 "time" 10 10 11 + comatproto "github.com/bluesky-social/indigo/api/atproto" 11 12 "github.com/bluesky-social/indigo/api/bsky" 12 13 "github.com/lmittmann/tint" 13 14 slogGorm "github.com/orandin/slog-gorm" ··· 96 97 UpdateServerSettings(ctx context.Context, settings *ServerSettings) error 97 98 GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error) 98 99 DeleteServerSettings(ctx context.Context, server string, repoDID string) error 100 + 101 + CreateCommitEvent(commit *comatproto.SyncSubscribeRepos_Commit) error 102 + GetCommitEventsSince(repoDID string, t time.Time) ([]*XrpcStreamEvent, error) 103 + GetMostRecentCommitEvent(repoDID string) (*XrpcStreamEvent, error) 99 104 } 100 105 101 106 func MakeDB(dbURL string) (Model, error) { ··· 156 161 ChatProfile{}, 157 162 oatproxy.OAuthSession{}, 158 163 ServerSettings{}, 164 + XrpcStreamEvent{}, 159 165 } { 160 166 err = db.AutoMigrate(model) 161 167 if err != nil {
+78
pkg/model/xrpc_stream_event.go
··· 1 + package model 2 + 3 + import ( 4 + "bytes" 5 + "errors" 6 + "time" 7 + 8 + comatproto "github.com/bluesky-social/indigo/api/atproto" 9 + "github.com/bluesky-social/indigo/util" 10 + "gorm.io/gorm" 11 + ) 12 + 13 + type XrpcStreamEvent struct { 14 + CID string `json:"cid" gorm:"primaryKey"` 15 + RepoDID string `json:"repoDID" gorm:"index:idx_repo_timestamp,priority:1;column:repo_did"` 16 + Timestamp time.Time `json:"timestamp" gorm:"index:idx_repo_timestamp,priority:2;column:timestamp"` 17 + Data []byte `json:"data"` 18 + } 19 + 20 + func (ev *XrpcStreamEvent) ToCommitEvent() (*comatproto.SyncSubscribeRepos_Commit, error) { 21 + commit := &comatproto.SyncSubscribeRepos_Commit{} 22 + err := commit.UnmarshalCBOR(bytes.NewReader(ev.Data)) 23 + if err != nil { 24 + return nil, err 25 + } 26 + return commit, nil 27 + } 28 + 29 + func (m *DBModel) CreateCommitEvent(commit *comatproto.SyncSubscribeRepos_Commit) error { 30 + prev, err := m.GetMostRecentCommitEvent(commit.Repo) 31 + if err != nil { 32 + return err 33 + } 34 + if prev != nil { 35 + commit.Since = &prev.CID 36 + } 37 + buf := bytes.Buffer{} 38 + err = commit.MarshalCBOR(&buf) 39 + if err != nil { 40 + return err 41 + } 42 + timestamp, err := time.Parse(util.ISO8601, commit.Time) 43 + if err != nil { 44 + return err 45 + } 46 + event := &XrpcStreamEvent{ 47 + CID: commit.Commit.String(), 48 + RepoDID: commit.Repo, 49 + Timestamp: timestamp.UTC(), 50 + Data: buf.Bytes(), 51 + } 52 + return m.DB.Create(event).Error 53 + } 54 + 55 + func (m *DBModel) GetCommitEventsSince(repoDID string, t time.Time) ([]*XrpcStreamEvent, error) { 56 + var events []*XrpcStreamEvent 57 + query := m.DB.Where("repo_did = ?", repoDID) 58 + query = query.Where("timestamp > ?", t.UTC()) 59 + err := query.Order("timestamp ASC").Find(&events).Error 60 + if err != nil { 61 + return nil, err 62 + } 63 + return events, nil 64 + } 65 + 66 + func (m *DBModel) GetMostRecentCommitEvent(repoDID string) (*XrpcStreamEvent, error) { 67 + var event XrpcStreamEvent 68 + err := m.DB.Where("repo_did = ?", repoDID). 69 + Order("timestamp DESC"). 70 + Limit(1). 71 + First(&event).Error 72 + if errors.Is(err, gorm.ErrRecordNotFound) { 73 + return nil, nil 74 + } else if err != nil { 75 + return nil, err 76 + } 77 + return &event, nil 78 + }