Live video on the AT Protocol

Merge pull request #456 from streamplace/eli/postgres

statedb: migrate to a stateful database for stateful things

authored by Eli Mallon and committed by GitHub a28b5458 781146f4

+1302 -416
+19
.vscode/launch.json
···
··· 1 + { 2 + "version": "0.2.0", 3 + "configurations": [ 4 + { 5 + "name": "Run Streamplace (linux)", 6 + "type": "go", 7 + "request": "launch", 8 + "mode": "exec", 9 + "program": "${workspaceFolder}/build-linux-amd64/libstreamplace" 10 + }, 11 + { 12 + "name": "Run Streamplace (darwin)", 13 + "type": "go", 14 + "request": "launch", 15 + "mode": "exec", 16 + "program": "${workspaceFolder}/build-darwin-amd64/libstreamplace" 17 + } 18 + ] 19 + }
+4 -2
go.mod
··· 8 9 replace github.com/AxisCommunications/go-dpop => github.com/streamplace/go-dpop v0.0.0-20250510031900-c897158a8ad4 10 11 require ( 12 firebase.google.com/go/v4 v4.14.1 13 git.stream.place/streamplace/c2pa-go v0.7.0 ··· 53 github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e 54 github.com/slok/go-http-metrics v0.13.0 55 github.com/streamplace/atproto-oauth-golang v0.0.0-20250619231223-a9c04fb888ac 56 - github.com/streamplace/oatproxy v0.0.0-20250722215839-1c878921d185 57 github.com/stretchr/testify v1.10.0 58 github.com/tdewolff/canvas v0.0.0-20250728095813-50d4cb1eee71 59 github.com/whyrusleeping/cbor-gen v0.3.1 ··· 72 golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da 73 google.golang.org/api v0.228.0 74 gorm.io/datatypes v1.2.4 75 gorm.io/driver/sqlite v1.5.7 76 ) 77 ··· 518 gopkg.in/warnings.v0 v0.1.2 // indirect 519 gopkg.in/yaml.v2 v2.4.0 // indirect 520 gorm.io/driver/mysql v1.5.6 // indirect 521 - gorm.io/driver/postgres v1.5.7 // indirect 522 gotest.tools/v3 v3.5.2 // indirect 523 honnef.co/go/tools v0.6.1 // indirect 524 howett.net/plist v1.0.0 // indirect
··· 8 9 replace github.com/AxisCommunications/go-dpop => github.com/streamplace/go-dpop v0.0.0-20250510031900-c897158a8ad4 10 11 + replace github.com/bluesky-social/indigo => github.com/streamplace/indigo v0.0.0-20250813192504-b19ccd82854b 12 + 13 require ( 14 firebase.google.com/go/v4 v4.14.1 15 git.stream.place/streamplace/c2pa-go v0.7.0 ··· 55 github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e 56 github.com/slok/go-http-metrics v0.13.0 57 github.com/streamplace/atproto-oauth-golang v0.0.0-20250619231223-a9c04fb888ac 58 + github.com/streamplace/oatproxy v0.0.0-20250813225644-39354b83af26 59 github.com/stretchr/testify v1.10.0 60 github.com/tdewolff/canvas v0.0.0-20250728095813-50d4cb1eee71 61 github.com/whyrusleeping/cbor-gen v0.3.1 ··· 74 golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da 75 google.golang.org/api v0.228.0 76 gorm.io/datatypes v1.2.4 77 + gorm.io/driver/postgres v1.5.7 78 gorm.io/driver/sqlite v1.5.7 79 ) 80 ··· 521 gopkg.in/warnings.v0 v0.1.2 // indirect 522 gopkg.in/yaml.v2 v2.4.0 // indirect 523 gorm.io/driver/mysql v1.5.6 // indirect 524 gotest.tools/v3 v3.5.2 // indirect 525 honnef.co/go/tools v0.6.1 // indirect 526 howett.net/plist v1.0.0 // indirect
+4 -2
go.sum
··· 210 github.com/bluenviron/gortsplib/v4 v4.12.3/go.mod h1:SkZPdaMNr+IvHt2PKRjUXxZN6FDutmSZn4eT0GmF0sk= 211 github.com/bluenviron/mediacommon/v2 v2.4.0 h1:Ss1T7AMxTrICJ+a/N5urS/1lp1ZpsF+3iJq3B/RLDMw= 212 github.com/bluenviron/mediacommon/v2 v2.4.0/go.mod h1:a6MbPmXtYda9mKibKVMZlW20GYLLrX2R7ZkUE+1pwV0= 213 - github.com/bluesky-social/indigo v0.0.0-20250729223159-573ae927246a h1:S12KN45uIkRglMHC8PqD/Vsz0+u3KbIaBF/6rit8/Pg= 214 - github.com/bluesky-social/indigo v0.0.0-20250729223159-573ae927246a/go.mod h1:0XUyOCRtL4/OiyeqMTmr6RlVHQMDgw3LS7CfibuZR5Q= 215 github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= 216 github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= 217 github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= ··· 1279 github.com/streamplace/atproto-oauth-golang v0.0.0-20250619231223-a9c04fb888ac/go.mod h1:9LlKkqciiO5lRfbX0n4Wn5KNY9nvFb4R3by8FdW2TWc= 1280 github.com/streamplace/go-dpop v0.0.0-20250510031900-c897158a8ad4 h1:L1fS4HJSaAyNnkwfuZubgfeZy8rkWmA0cMtH5Z0HqNc= 1281 github.com/streamplace/go-dpop v0.0.0-20250510031900-c897158a8ad4/go.mod h1:bGUXY9Wd4mnd+XUrOYZr358J2f6z9QO/dLhL1SsiD+0= 1282 github.com/streamplace/oatproxy v0.0.0-20250722215839-1c878921d185 h1:knTi1I/zbppb+CmbDRmPKNgqO4QPOKdZBds63mMWTms= 1283 github.com/streamplace/oatproxy v0.0.0-20250722215839-1c878921d185/go.mod h1:pXi24hA7xBHj8eEywX6wGqJOR9FaEYlGwQ/72rN6okw= 1284 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= 1285 github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= 1286 github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
··· 210 github.com/bluenviron/gortsplib/v4 v4.12.3/go.mod h1:SkZPdaMNr+IvHt2PKRjUXxZN6FDutmSZn4eT0GmF0sk= 211 github.com/bluenviron/mediacommon/v2 v2.4.0 h1:Ss1T7AMxTrICJ+a/N5urS/1lp1ZpsF+3iJq3B/RLDMw= 212 github.com/bluenviron/mediacommon/v2 v2.4.0/go.mod h1:a6MbPmXtYda9mKibKVMZlW20GYLLrX2R7ZkUE+1pwV0= 213 github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= 214 github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= 215 github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= ··· 1277 github.com/streamplace/atproto-oauth-golang v0.0.0-20250619231223-a9c04fb888ac/go.mod h1:9LlKkqciiO5lRfbX0n4Wn5KNY9nvFb4R3by8FdW2TWc= 1278 github.com/streamplace/go-dpop v0.0.0-20250510031900-c897158a8ad4 h1:L1fS4HJSaAyNnkwfuZubgfeZy8rkWmA0cMtH5Z0HqNc= 1279 github.com/streamplace/go-dpop v0.0.0-20250510031900-c897158a8ad4/go.mod h1:bGUXY9Wd4mnd+XUrOYZr358J2f6z9QO/dLhL1SsiD+0= 1280 + github.com/streamplace/indigo v0.0.0-20250813192504-b19ccd82854b h1:vh0Z1bDgcVXQjgmcXOBkC3B4uWi+HaxeyTvETzcA/Vg= 1281 + github.com/streamplace/indigo v0.0.0-20250813192504-b19ccd82854b/go.mod h1:n6QE1NDPFoi7PRbMUZmc2y7FibCqiVU4ePpsvhHUBR8= 1282 github.com/streamplace/oatproxy v0.0.0-20250722215839-1c878921d185 h1:knTi1I/zbppb+CmbDRmPKNgqO4QPOKdZBds63mMWTms= 1283 github.com/streamplace/oatproxy v0.0.0-20250722215839-1c878921d185/go.mod h1:pXi24hA7xBHj8eEywX6wGqJOR9FaEYlGwQ/72rN6okw= 1284 + github.com/streamplace/oatproxy v0.0.0-20250813225644-39354b83af26 h1:tbGie3B0tbjp47BnG0sAhjlYLZXHBbEEW8chdbcbrn4= 1285 + github.com/streamplace/oatproxy v0.0.0-20250813225644-39354b83af26/go.mod h1:pXi24hA7xBHj8eEywX6wGqJOR9FaEYlGwQ/72rN6okw= 1286 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= 1287 github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= 1288 github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+6 -3
pkg/api/api.go
··· 41 "stream.place/streamplace/pkg/notifications" 42 "stream.place/streamplace/pkg/spmetrics" 43 "stream.place/streamplace/pkg/spxrpc" 44 "stream.place/streamplace/pkg/streamplace" 45 46 metrics "github.com/slok/go-http-metrics/metrics/prometheus" ··· 53 type StreamplaceAPI struct { 54 CLI *config.CLI 55 Model model.Model 56 Updater *Updater 57 Signer *eip712.EIP712Signer 58 Mimes map[string]string ··· 80 mu sync.RWMutex 81 } 82 83 - func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) { 84 updater, err := PrepareUpdater(cli) 85 if err != nil { 86 return nil, err 87 } 88 a := &StreamplaceAPI{CLI: cli, 89 Model: mod, 90 Updater: updater, 91 Signer: signer, 92 FirebaseNotifier: noter, ··· 135 Recorder: metrics.NewRecorder(metrics.Config{}), 136 }) 137 var xrpc http.Handler 138 - xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.op, mdlw, a.ATSync) 139 if err != nil { 140 return nil, err 141 } ··· 473 w.WriteHeader(400) 474 return 475 } 476 - err = a.Model.CreateNotification(n.Token, n.RepoDID) 477 if err != nil { 478 log.Log(ctx, "error creating notification", "error", err) 479 w.WriteHeader(400)
··· 41 "stream.place/streamplace/pkg/notifications" 42 "stream.place/streamplace/pkg/spmetrics" 43 "stream.place/streamplace/pkg/spxrpc" 44 + "stream.place/streamplace/pkg/statedb" 45 "stream.place/streamplace/pkg/streamplace" 46 47 metrics "github.com/slok/go-http-metrics/metrics/prometheus" ··· 54 type StreamplaceAPI struct { 55 CLI *config.CLI 56 Model model.Model 57 + StatefulDB *statedb.StatefulDB 58 Updater *Updater 59 Signer *eip712.EIP712Signer 60 Mimes map[string]string ··· 82 mu sync.RWMutex 83 } 84 85 + func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, statefulDB *statedb.StatefulDB, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) { 86 updater, err := PrepareUpdater(cli) 87 if err != nil { 88 return nil, err 89 } 90 a := &StreamplaceAPI{CLI: cli, 91 Model: mod, 92 + StatefulDB: statefulDB, 93 Updater: updater, 94 Signer: signer, 95 FirebaseNotifier: noter, ··· 138 Recorder: metrics.NewRecorder(metrics.Config{}), 139 }) 140 var xrpc http.Handler 141 + xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.StatefulDB, a.op, mdlw, a.ATSync) 142 if err != nil { 143 return nil, err 144 } ··· 476 w.WriteHeader(400) 477 return 478 } 479 + err = a.StatefulDB.CreateNotification(n.Token, n.RepoDID) 480 if err != nil { 481 log.Log(ctx, "error creating notification", "error", err) 482 w.WriteHeader(400)
+3 -3
pkg/api/api_internal.go
··· 379 }) 380 381 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 382 - notifications, err := a.Model.ListNotifications() 383 if err != nil { 384 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 385 return ··· 437 }) 438 439 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 440 - sessions, err := a.Model.ListOAuthSessions() 441 if err != nil { 442 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err) 443 return ··· 458 errors.WriteHTTPBadRequest(w, "invalid request body", err) 459 return 460 } 461 - notifications, err := a.Model.ListNotifications() 462 if err != nil { 463 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 464 return
··· 379 }) 380 381 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 382 + notifications, err := a.StatefulDB.ListNotifications() 383 if err != nil { 384 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 385 return ··· 437 }) 438 439 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 440 + sessions, err := a.StatefulDB.ListOAuthSessions() 441 if err != nil { 442 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err) 443 return ··· 458 errors.WriteHTTPBadRequest(w, "invalid request body", err) 459 return 460 } 461 + notifications, err := a.StatefulDB.ListNotifications() 462 if err != nil { 463 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 464 return
+8 -6
pkg/atproto/firehose.go
··· 25 "stream.place/streamplace/pkg/log" 26 "stream.place/streamplace/pkg/model" 27 notificationpkg "stream.place/streamplace/pkg/notifications" 28 29 "slices" 30 ··· 32 ) 33 34 type ATProtoSynchronizer struct { 35 - CLI *config.CLI 36 - Model model.Model 37 - LastSeen time.Time 38 - LastEvent time.Time 39 - Noter notificationpkg.FirebaseNotifier 40 - Bus *bus.Bus 41 } 42 43 func (atsync *ATProtoSynchronizer) StartFirehose(ctx context.Context) error {
··· 25 "stream.place/streamplace/pkg/log" 26 "stream.place/streamplace/pkg/model" 27 notificationpkg "stream.place/streamplace/pkg/notifications" 28 + "stream.place/streamplace/pkg/statedb" 29 30 "slices" 31 ··· 33 ) 34 35 type ATProtoSynchronizer struct { 36 + CLI *config.CLI 37 + Model model.Model 38 + StatefulDB *statedb.StatefulDB 39 + LastSeen time.Time 40 + LastEvent time.Time 41 + Noter notificationpkg.FirebaseNotifier 42 + Bus *bus.Bus 43 } 44 45 func (atsync *ATProtoSynchronizer) StartFirehose(ctx context.Context) error {
-45
pkg/atproto/jwks.go
··· 1 - package atproto 2 - 3 - import ( 4 - "context" 5 - "encoding/json" 6 - "os" 7 - 8 - "github.com/lestrrat-go/jwx/v2/jwk" 9 - oauth_helpers "github.com/streamplace/atproto-oauth-golang/helpers" 10 - "stream.place/streamplace/pkg/log" 11 - ) 12 - 13 - func EnsureJWK(ctx context.Context, fPath string) (jwk.Key, error) { 14 - var key jwk.Key 15 - _, err := os.Stat(fPath) 16 - if err == nil { 17 - b, err := os.ReadFile(fPath) 18 - if err != nil { 19 - return nil, err 20 - } 21 - key, err = jwk.ParseKey(b) 22 - if err != nil { 23 - return nil, err 24 - } 25 - } else if os.IsNotExist(err) { 26 - key, err = oauth_helpers.GenerateKey(nil) 27 - if err != nil { 28 - return nil, err 29 - } 30 - 31 - b, err := json.Marshal(key) 32 - if err != nil { 33 - return nil, err 34 - } 35 - 36 - if err := os.WriteFile(fPath, b, 0600); err != nil { 37 - return nil, err 38 - } 39 - log.Log(ctx, "generated JWK", "path", fPath) 40 - } else { 41 - return nil, err 42 - } 43 - 44 - return key, nil 45 - }
···
+62 -47
pkg/atproto/lexicon_repo.go
··· 23 "github.com/bluesky-social/indigo/util" 24 "github.com/ipfs/go-cid" 25 cbg "github.com/whyrusleeping/cbor-gen" 26 - "gorm.io/driver/sqlite" 27 - "gorm.io/gorm" 28 29 "github.com/whyrusleeping/go-did" 30 "stream.place/streamplace/lexicons" 31 "stream.place/streamplace/pkg/config" 32 "stream.place/streamplace/pkg/log" 33 "stream.place/streamplace/pkg/model" 34 ) 35 36 var LexiconRepo *atrepo.Repo ··· 124 Close() error 125 } 126 127 - func MakeLexiconRepo(ctx context.Context, cli *config.CLI, mod model.Model) (Closer, error) { 128 ctx = log.WithLogValues(ctx, "func", "MakeLexiconRepo") 129 - fd, err := cli.DataFileCreate([]string{"carstore", "empty"}, true) 130 - if err != nil { 131 - return nil, err 132 - } 133 - sqlitePath := cli.DataFilePath([]string{"carstore", "meta.sqlite"}) 134 135 - db, err := gorm.Open(sqlite.Open(sqlitePath)) 136 - if err != nil { 137 - return nil, err 138 - } 139 - err = fd.Close() 140 - if err != nil { 141 - return nil, err 142 - } 143 - CarStore, err = carstore.NewCarStore(db, []string{ 144 - cli.DataFilePath([]string{"carstore"}), 145 - }) 146 - if err != nil { 147 - return nil, err 148 - } 149 150 - sqlDB, err := db.DB() 151 if err != nil { 152 return nil, err 153 } 154 155 var priv *atcrypto.PrivateKeyK256 156 - exists, err := cli.DataFileExists([]string{"carstore", "repo.key"}) 157 if err != nil { 158 return nil, err 159 } 160 - if exists { 161 - buf := bytes.Buffer{} 162 - err := cli.DataFileRead([]string{"carstore", "repo.key"}, &buf) 163 if err != nil { 164 - return nil, err 165 } 166 - priv, err = atcrypto.ParsePrivateBytesK256(buf.Bytes()) 167 if err != nil { 168 return nil, err 169 } 170 - } else { 171 - priv, err = atcrypto.GeneratePrivateKeyK256() 172 - if err != nil { 173 - return nil, err 174 } 175 bs := priv.Bytes() 176 - err = cli.DataFileWrite([]string{"carstore", "repo.key"}, bytes.NewReader(bs), true) 177 if err != nil { 178 - return nil, err 179 } 180 } 181 ··· 189 return priv.HashAndSign(sb) 190 } 191 192 - ses, err := CarStore.NewDeltaSession(ctx, RepoUser, nil) 193 if err != nil { 194 - return nil, fmt.Errorf("failed to create delta session: %w", err) 195 } 196 197 - currentRoot, err := CarStore.GetUserRepoHead(ctx, RepoUser) 198 - if err != nil { 199 - return nil, fmt.Errorf("failed to get user repo head: %w", err) 200 } 201 - currentRev := "" 202 203 if currentRoot == cid.Undef { 204 LexiconRepo = atrepo.NewRepo(ctx, cli.MyDID(), ses) 205 } else { 206 LexiconRepo, err = atrepo.OpenRepo(ctx, ses, currentRoot) 207 if err != nil { 208 return nil, fmt.Errorf("failed to open repo: %w", err) 209 - } 210 - currentRev, err = CarStore.GetUserRepoRev(ctx, RepoUser) 211 - if err != nil { 212 - return nil, fmt.Errorf("failed to get user repo rev: %w", err) 213 } 214 } 215 ··· 294 Ops: ops, 295 TooBig: false, 296 } 297 - err := mod.CreateCommitEvent(commit, signed.Data.String()) 298 if err != nil { 299 return nil, fmt.Errorf("failed to create commit event: %w", err) 300 } 301 } 302 303 - return sqlDB, nil 304 } 305 306 func OpenLexiconRepo(ctx context.Context) (*atrepo.Repo, *carstore.DeltaSession, error) {
··· 23 "github.com/bluesky-social/indigo/util" 24 "github.com/ipfs/go-cid" 25 cbg "github.com/whyrusleeping/cbor-gen" 26 27 "github.com/whyrusleeping/go-did" 28 "stream.place/streamplace/lexicons" 29 "stream.place/streamplace/pkg/config" 30 "stream.place/streamplace/pkg/log" 31 "stream.place/streamplace/pkg/model" 32 + "stream.place/streamplace/pkg/statedb" 33 ) 34 35 var LexiconRepo *atrepo.Repo ··· 123 Close() error 124 } 125 126 + type NoopCloser struct{} 127 + 128 + func (c *NoopCloser) Close() error { 129 + return nil 130 + } 131 + 132 + func MakeLexiconRepo(ctx context.Context, cli *config.CLI, mod model.Model, state *statedb.StatefulDB) (Closer, error) { 133 ctx = log.WithLogValues(ctx, "func", "MakeLexiconRepo") 134 + var err error 135 136 + sqliteStore := &carstore.SQLiteStore{} 137 138 + err = sqliteStore.Open(":memory:") 139 if err != nil { 140 return nil, err 141 } 142 + CarStore = sqliteStore 143 144 var priv *atcrypto.PrivateKeyK256 145 + 146 + keyBs, err := state.GetConfig("repo-key") 147 if err != nil { 148 return nil, err 149 } 150 + if keyBs != nil { 151 + // good path: we have a key in the stateful database 152 + priv, err = atcrypto.ParsePrivateBytesK256(keyBs.Value) 153 if err != nil { 154 + return nil, fmt.Errorf("failed to parse repo key from stateful database: %w", err) 155 } 156 + } else { 157 + // migration path: maybe we have an old one on disk. 158 + exists, err := cli.DataFileExists([]string{"carstore", "repo.key"}) 159 if err != nil { 160 return nil, err 161 } 162 + if exists { 163 + log.Warn(ctx, "found old repo key on disk, migrating to stateful database", "path", cli.DataFilePath([]string{"carstore", "repo.key"})) 164 + buf := bytes.Buffer{} 165 + err := cli.DataFileRead([]string{"carstore", "repo.key"}, &buf) 166 + if err != nil { 167 + return nil, err 168 + } 169 + priv, err = atcrypto.ParsePrivateBytesK256(buf.Bytes()) 170 + if err != nil { 171 + return nil, fmt.Errorf("failed to read repo key from disk: %w", err) 172 + } 173 + } else { 174 + priv, err = atcrypto.GeneratePrivateKeyK256() 175 + if err != nil { 176 + return nil, err 177 + } 178 } 179 bs := priv.Bytes() 180 + err = state.PutConfig("repo-key", bs) 181 if err != nil { 182 + return nil, fmt.Errorf("failed to save repo key to stateful database: %w", err) 183 } 184 } 185 ··· 193 return priv.HashAndSign(sb) 194 } 195 196 + events, err := state.GetCommitEventsSince(cli.MyDID(), time.Time{}) 197 if err != nil { 198 + return nil, fmt.Errorf("failed to get commit events: %w", err) 199 } 200 201 + var ses *carstore.DeltaSession 202 + var currentRoot cid.Cid 203 + var currentRev string 204 + 205 + for _, event := range events { 206 + evt, err := event.ToCommitEvent() 207 + if err != nil { 208 + return nil, fmt.Errorf("failed to convert event to commit event: %w", err) 209 + } 210 + currentRoot, ses, err = CarStore.ImportSlice(ctx, RepoUser, nil, evt.Blocks) 211 + if err != nil { 212 + return nil, fmt.Errorf("failed to import slice: %w", err) 213 + } 214 + currentRev = evt.Rev 215 } 216 217 if currentRoot == cid.Undef { 218 + log.Warn(ctx, "no existing lexicon repo, creating new one") 219 + ses, err = CarStore.NewDeltaSession(ctx, RepoUser, nil) 220 + if err != nil { 221 + return nil, fmt.Errorf("failed to create delta session: %w", err) 222 + } 223 LexiconRepo = atrepo.NewRepo(ctx, cli.MyDID(), ses) 224 } else { 225 LexiconRepo, err = atrepo.OpenRepo(ctx, ses, currentRoot) 226 if err != nil { 227 return nil, fmt.Errorf("failed to open repo: %w", err) 228 } 229 } 230 ··· 309 Ops: ops, 310 TooBig: false, 311 } 312 + err := state.CreateCommitEvent(commit, signed.Data.String()) 313 if err != nil { 314 return nil, fmt.Errorf("failed to create commit event: %w", err) 315 } 316 } 317 318 + return &NoopCloser{}, nil 319 } 320 321 func OpenLexiconRepo(ctx context.Context) (*atrepo.Repo, *carstore.DeltaSession, error) {
+9 -5
pkg/atproto/lexicon_repo_test.go
··· 11 "stream.place/streamplace/lexicons" 12 "stream.place/streamplace/pkg/config" 13 "stream.place/streamplace/pkg/model" 14 ) 15 16 func TestLexiconRepo(t *testing.T) { 17 cli := config.CLI{ 18 PublicHost: "example.com", 19 } 20 cli.DataDir = t.TempDir() 21 mod, err := model.MakeDB(":memory:") 22 require.NoError(t, err) 23 24 // creating a new repo 25 - handle, err := MakeLexiconRepo(context.Background(), &cli, mod) 26 require.NoError(t, err) 27 r, sess, err := OpenLexiconRepo(context.Background()) 28 require.NoError(t, err) ··· 35 require.NotNil(t, rec) 36 handle.Close() 37 38 - evts, err := mod.GetCommitEventsSinceSeq(cli.MyDID(), 0) 39 require.NoError(t, err) 40 require.Len(t, evts, 1) 41 require.Equal(t, evts[0].RepoDID, cli.MyDID()) 42 43 // opening an existing repo 44 - handle, err = MakeLexiconRepo(context.Background(), &cli, mod) 45 require.NoError(t, err) 46 handle.Close() 47 ··· 91 AllFiles = modifiedFS 92 93 // opening an existing repo with modified lexicon 94 - handle, err = MakeLexiconRepo(context.Background(), &cli, mod) 95 require.NoError(t, err) 96 handle.Close() 97 98 - evts, err = mod.GetCommitEventsSinceSeq(cli.MyDID(), 0) 99 require.NoError(t, err) 100 require.Len(t, evts, 2) 101 require.Equal(t, evts[0].RepoDID, cli.MyDID())
··· 11 "stream.place/streamplace/lexicons" 12 "stream.place/streamplace/pkg/config" 13 "stream.place/streamplace/pkg/model" 14 + "stream.place/streamplace/pkg/statedb" 15 ) 16 17 func TestLexiconRepo(t *testing.T) { 18 cli := config.CLI{ 19 PublicHost: "example.com", 20 + DBURL: ":memory:", 21 } 22 cli.DataDir = t.TempDir() 23 mod, err := model.MakeDB(":memory:") 24 require.NoError(t, err) 25 + state, err := statedb.MakeDB(&cli, nil, mod) 26 + require.NoError(t, err) 27 28 // creating a new repo 29 + handle, err := MakeLexiconRepo(context.Background(), &cli, mod, state) 30 require.NoError(t, err) 31 r, sess, err := OpenLexiconRepo(context.Background()) 32 require.NoError(t, err) ··· 39 require.NotNil(t, rec) 40 handle.Close() 41 42 + evts, err := state.GetCommitEventsSinceSeq(cli.MyDID(), 0) 43 require.NoError(t, err) 44 require.Len(t, evts, 1) 45 require.Equal(t, evts[0].RepoDID, cli.MyDID()) 46 47 // opening an existing repo 48 + handle, err = MakeLexiconRepo(context.Background(), &cli, mod, state) 49 require.NoError(t, err) 50 handle.Close() 51 ··· 95 AllFiles = modifiedFS 96 97 // opening an existing repo with modified lexicon 98 + handle, err = MakeLexiconRepo(context.Background(), &cli, mod, state) 99 require.NoError(t, err) 100 handle.Close() 101 102 + evts, err = state.GetCommitEventsSinceSeq(cli.MyDID(), 0) 103 require.NoError(t, err) 104 require.Len(t, evts, 2) 105 require.Equal(t, evts[0].RepoDID, cli.MyDID())
+38 -72
pkg/atproto/sync.go
··· 11 "github.com/bluesky-social/indigo/atproto/data" 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "stream.place/streamplace/pkg/aqtime" 14 - "stream.place/streamplace/pkg/integrations/discord" 15 "stream.place/streamplace/pkg/log" 16 "stream.place/streamplace/pkg/model" 17 - notificationpkg "stream.place/streamplace/pkg/notifications" 18 "stream.place/streamplace/pkg/streamplace" 19 20 lexutil "github.com/bluesky-social/indigo/lex/util" ··· 88 return fmt.Errorf("failed to sync bluesky repo: %w", err) 89 } 90 91 - _, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model) 92 - if err != nil { 93 - log.Error(ctx, "failed to sync bluesky repo", "err", err) 94 - } 95 96 log.Debug(ctx, "streamplace.ChatMessage detected", "message", rec.Text, "repo", repo.Handle) 97 block, err := atsync.Model.GetUserBlock(ctx, rec.Streamer, userDID) ··· 134 go atsync.Bus.Publish(rec.Streamer, scm) 135 136 if !isUpdate && !isFirstSync { 137 - for _, webhook := range atsync.CLI.DiscordWebhooks { 138 - if webhook.DID == rec.Streamer && webhook.Type == "chat" { 139 - go func() { 140 - err := discord.SendChat(ctx, webhook, repo, scm) 141 - if err != nil { 142 - log.Error(ctx, "failed to send livestream to discord", "err", err) 143 - } else { 144 - log.Log(ctx, "sent livestream to discord", "user", userDID, "webhook", webhook.URL) 145 - } 146 - }() 147 - } 148 } 149 } 150 ··· 304 } 305 306 case *streamplace.Livestream: 307 - var u string 308 - if rec.Url != nil { 309 - u = *rec.Url 310 - } 311 if r == nil { 312 // we don't know about this repo 313 return nil ··· 342 } 343 go atsync.Bus.Publish(userDID, lsv) 344 345 - if !isUpdate && !isFirstSync { 346 - log.Warn(ctx, "Livestream detected! Blasting followers!", "title", rec.Title, "url", u, "createdAt", rec.CreatedAt, "repo", userDID) 347 - notifications, err := atsync.Model.GetFollowersNotificationTokens(userDID) 348 if err != nil { 349 - return err 350 - } 351 - 352 - nb := &notificationpkg.NotificationBlast{ 353 - Title: fmt.Sprintf("🔴 @%s is LIVE!", r.Handle), 354 - Body: rec.Title, 355 - Data: map[string]string{ 356 - "path": fmt.Sprintf("/%s", r.Handle), 357 - }, 358 - } 359 - if atsync.Noter != nil { 360 - err := atsync.Noter.Blast(ctx, notifications, nb) 361 - if err != nil { 362 - log.Error(ctx, "failed to blast notifications", "err", err) 363 - } else { 364 - log.Log(ctx, "sent notifications", "user", userDID, "count", len(notifications), "content", nb) 365 - } 366 - } else { 367 - log.Log(ctx, "no notifier configured, skipping notifications", "user", userDID, "count", len(notifications), "content", nb) 368 } 369 370 - var postView *bsky.FeedDefs_PostView 371 - if lsHydrated.Post != nil { 372 - postView, err = lsHydrated.Post.ToBskyPostView() 373 - if err != nil { 374 - log.Error(ctx, "failed to convert livestream post to bsky post view", "err", err) 375 - } 376 - } else { 377 - log.Warn(ctx, "no post found for livestream", "livestream", lsHydrated) 378 - } 379 380 - var spcp *streamplace.ChatProfile 381 - cp, err := atsync.Model.GetChatProfile(ctx, userDID) 382 if err != nil { 383 - log.Error(ctx, "failed to get chat profile", "err", err) 384 - } 385 - if cp != nil { 386 - spcp, err = cp.ToStreamplaceChatProfile() 387 - if err != nil { 388 - log.Error(ctx, "failed to convert chat profile to streamplace chat profile", "err", err) 389 - } 390 } 391 392 - for _, webhook := range atsync.CLI.DiscordWebhooks { 393 - if webhook.DID == userDID && webhook.Type == "livestream" { 394 - go func() { 395 - err := discord.SendLivestream(ctx, webhook, r, lsv, postView, spcp) 396 - if err != nil { 397 - log.Error(ctx, "failed to send livestream to discord", "err", err) 398 - } else { 399 - log.Log(ctx, "sent livestream to discord", "user", userDID, "webhook", webhook.URL) 400 - } 401 - }() 402 - } 403 } 404 } 405
··· 11 "github.com/bluesky-social/indigo/atproto/data" 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "stream.place/streamplace/pkg/aqtime" 14 "stream.place/streamplace/pkg/log" 15 "stream.place/streamplace/pkg/model" 16 + "stream.place/streamplace/pkg/statedb" 17 "stream.place/streamplace/pkg/streamplace" 18 19 lexutil "github.com/bluesky-social/indigo/lex/util" ··· 87 return fmt.Errorf("failed to sync bluesky repo: %w", err) 88 } 89 90 + go func() { 91 + _, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model) 92 + if err != nil { 93 + log.Error(ctx, "failed to sync bluesky repo", "err", err) 94 + } 95 + }() 96 97 log.Debug(ctx, "streamplace.ChatMessage detected", "message", rec.Text, "repo", repo.Handle) 98 block, err := atsync.Model.GetUserBlock(ctx, rec.Streamer, userDID) ··· 135 go atsync.Bus.Publish(rec.Streamer, scm) 136 137 if !isUpdate && !isFirstSync { 138 + 139 + task := &statedb.ChatTask{ 140 + MessageView: scm, 141 + } 142 + 143 + _, err = atsync.StatefulDB.EnqueueTask(ctx, statedb.TaskChat, task, statedb.WithTaskKey(fmt.Sprintf("chat-message::%s", aturi.String()))) 144 + if err != nil { 145 + log.Error(ctx, "failed to enqueue notification task", "err", err) 146 } 147 } 148 ··· 302 } 303 304 case *streamplace.Livestream: 305 if r == nil { 306 // we don't know about this repo 307 return nil ··· 336 } 337 go atsync.Bus.Publish(userDID, lsv) 338 339 + var postView *bsky.FeedDefs_PostView 340 + if lsHydrated.Post != nil { 341 + postView, err = lsHydrated.Post.ToBskyPostView() 342 if err != nil { 343 + return fmt.Errorf("failed to convert livestream post to bsky post view: %w", err) 344 } 345 + } 346 347 + task := &statedb.NotificationTask{ 348 + Livestream: lsv, 349 + FeedPost: postView, 350 + PDSURL: r.PDS, 351 + } 352 353 + cp, err := atsync.Model.GetChatProfile(ctx, userDID) 354 + if err != nil { 355 + return fmt.Errorf("failed to get chat profile: %w", err) 356 + } 357 + if cp != nil { 358 + spcp, err := cp.ToStreamplaceChatProfile() 359 if err != nil { 360 + return fmt.Errorf("failed to convert chat profile to streamplace chat profile: %w", err) 361 } 362 + task.ChatProfile = spcp 363 + } 364 365 + if !isUpdate && !isFirstSync { 366 + _, err = atsync.StatefulDB.EnqueueTask(ctx, statedb.TaskNotification, task, statedb.WithTaskKey(fmt.Sprintf("notification-blast::%s", aturi.String()))) 367 + if err != nil { 368 + log.Error(ctx, "failed to enqueue notification task", "err", err) 369 } 370 } 371
+40 -19
pkg/cmd/streamplace.go
··· 16 "syscall" 17 "time" 18 19 "github.com/livepeer/go-livepeer/cmd/livepeer/starter" 20 "github.com/peterbourgon/ff/v3" 21 "github.com/streamplace/oatproxy/pkg/oatproxy" ··· 35 "stream.place/streamplace/pkg/rtmps" 36 v0 "stream.place/streamplace/pkg/schema/v0" 37 "stream.place/streamplace/pkg/spmetrics" 38 39 "github.com/ThalesGroup/crypto11" 40 _ "github.com/go-gst/go-glib/glib" ··· 192 if *version { 193 return nil 194 } 195 spmetrics.Version.WithLabelValues(build.Version).Inc() 196 if cli.LivepeerHelp { 197 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) ··· 302 signer = hwsigner 303 } 304 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers} 305 - mod, err := model.MakeDB(cli.DBPath) 306 - if err != nil { 307 - return err 308 - } 309 - handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod) 310 if err != nil { 311 return err 312 } 313 - defer handle.Close() 314 var noter notifications.FirebaseNotifier 315 if cli.FirebaseServiceAccount != "" { 316 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount) ··· 318 return err 319 } 320 } 321 322 - jwkPath := cli.DataFilePath([]string{"jwk.json"}) 323 - jwk, err := atproto.EnsureJWK(ctx, jwkPath) 324 if err != nil { 325 return err 326 } 327 cli.JWK = jwk 328 329 - accessJWKPath := cli.DataFilePath([]string{"access-jwk.json"}) 330 - accessJWK, err := atproto.EnsureJWK(ctx, accessJWKPath) 331 if err != nil { 332 return err 333 } ··· 335 336 b := bus.NewBus() 337 atsync := &atproto.ATProtoSynchronizer{ 338 - CLI: &cli, 339 - Model: mod, 340 - Noter: noter, 341 - Bus: b, 342 } 343 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync) 344 if err != nil { ··· 361 362 op := oatproxy.New(&oatproxy.Config{ 363 Host: cli.PublicHost, 364 - CreateOAuthSession: mod.CreateOAuthSession, 365 - UpdateOAuthSession: mod.UpdateOAuthSession, 366 - GetOAuthSession: mod.LoadOAuthSession, 367 Scope: "atproto transition:generic", 368 UpstreamJWK: cli.JWK, 369 DownstreamJWK: cli.AccessJWK, 370 ClientMetadata: clientMetadata, 371 }) 372 - d := director.NewDirector(mm, mod, &cli, b, op) 373 - a, err := api.MakeStreamplaceAPI(&cli, mod, eip712signer, noter, mm, ms, b, atsync, d, op) 374 if err != nil { 375 return err 376 } ··· 380 381 group.Go(func() error { 382 return handleSignals(ctx) 383 }) 384 385 if cli.TracingEndpoint != "" {
··· 16 "syscall" 17 "time" 18 19 + "github.com/bluesky-social/indigo/carstore" 20 "github.com/livepeer/go-livepeer/cmd/livepeer/starter" 21 "github.com/peterbourgon/ff/v3" 22 "github.com/streamplace/oatproxy/pkg/oatproxy" ··· 36 "stream.place/streamplace/pkg/rtmps" 37 v0 "stream.place/streamplace/pkg/schema/v0" 38 "stream.place/streamplace/pkg/spmetrics" 39 + "stream.place/streamplace/pkg/statedb" 40 41 "github.com/ThalesGroup/crypto11" 42 _ "github.com/go-gst/go-glib/glib" ··· 194 if *version { 195 return nil 196 } 197 + 198 + if len(os.Args) > 1 && os.Args[1] == "migrate" { 199 + return statedb.Migrate(&cli) 200 + } 201 + 202 spmetrics.Version.WithLabelValues(build.Version).Inc() 203 if cli.LivepeerHelp { 204 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) ··· 309 signer = hwsigner 310 } 311 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers} 312 + 313 + mod, err := model.MakeDB(cli.IndexDBPath) 314 if err != nil { 315 return err 316 } 317 var noter notifications.FirebaseNotifier 318 if cli.FirebaseServiceAccount != "" { 319 noter, err = notifications.MakeFirebaseNotifier(ctx, cli.FirebaseServiceAccount) ··· 321 return err 322 } 323 } 324 + out := carstore.SQLiteStore{} 325 + err = out.Open(":memory:") 326 + if err != nil { 327 + return err 328 + } 329 + state, err := statedb.MakeDB(&cli, noter, mod) 330 + if err != nil { 331 + return err 332 + } 333 + handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state) 334 + if err != nil { 335 + return err 336 + } 337 + defer handle.Close() 338 339 + jwk, err := state.EnsureJWK(ctx, "jwk") 340 if err != nil { 341 return err 342 } 343 cli.JWK = jwk 344 345 + accessJWK, err := state.EnsureJWK(ctx, "access-jwk") 346 if err != nil { 347 return err 348 } ··· 350 351 b := bus.NewBus() 352 atsync := &atproto.ATProtoSynchronizer{ 353 + CLI: &cli, 354 + Model: mod, 355 + StatefulDB: state, 356 + Noter: noter, 357 + Bus: b, 358 } 359 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync) 360 if err != nil { ··· 377 378 op := oatproxy.New(&oatproxy.Config{ 379 Host: cli.PublicHost, 380 + CreateOAuthSession: state.CreateOAuthSession, 381 + UpdateOAuthSession: state.UpdateOAuthSession, 382 + GetOAuthSession: state.LoadOAuthSession, 383 + Lock: state.GetNamedLock, 384 Scope: "atproto transition:generic", 385 UpstreamJWK: cli.JWK, 386 DownstreamJWK: cli.AccessJWK, 387 ClientMetadata: clientMetadata, 388 }) 389 + d := director.NewDirector(mm, mod, &cli, b, op, state) 390 + a, err := api.MakeStreamplaceAPI(&cli, mod, state, eip712signer, noter, mm, ms, b, atsync, d, op) 391 if err != nil { 392 return err 393 } ··· 397 398 group.Go(func() error { 399 return handleSignals(ctx) 400 + }) 401 + 402 + group.Go(func() error { 403 + return state.ProcessQueue(ctx) 404 }) 405 406 if cli.TracingEndpoint != "" {
+5 -2
pkg/config/config.go
··· 51 AdminAccount string 52 Build *BuildFlags 53 DataDir string 54 - DBPath string 55 EthAccountAddr string 56 EthKeystorePath string 57 EthPassword string ··· 124 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate") 125 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key") 126 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app") 127 - cli.DataDirFlag(fs, &cli.DBPath, "db-path", "db.sqlite", "path to sqlite database file") 128 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node") 129 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key") 130 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links")
··· 51 AdminAccount string 52 Build *BuildFlags 53 DataDir string 54 + DBURL string 55 + IndexDBPath string 56 EthAccountAddr string 57 EthKeystorePath string 58 EthPassword string ··· 125 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate") 126 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key") 127 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app") 128 + fs.StringVar(&cli.DBURL, "db-url", "sqlite://$SP_DATA_DIR/state.sqlite", "URL of the database to use for storing private streamplace state") 129 + cli.dataDirFlags = append(cli.dataDirFlags, &cli.DBURL) 130 + cli.DataDirFlag(fs, &cli.IndexDBPath, "index-db-path", "db.sqlite", "path to sqlite database file for maintaining atproto index") 131 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node") 132 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key") 133 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links")
+5 -1
pkg/director/director.go
··· 12 "stream.place/streamplace/pkg/log" 13 "stream.place/streamplace/pkg/media" 14 "stream.place/streamplace/pkg/model" 15 ) 16 17 // director is responsible for managing the lifecycle of a stream, making business ··· 28 streamSessions map[string]*StreamSession 29 streamSessionsMu sync.Mutex 30 op *oatproxy.OATProxy 31 } 32 33 - func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy) *Director { 34 return &Director{ 35 mm: mm, 36 mod: mod, ··· 39 streamSessions: make(map[string]*StreamSession), 40 streamSessionsMu: sync.Mutex{}, 41 op: op, 42 } 43 } 44 ··· 68 op: d.op, 69 packets: make([]bus.PacketizedSegment, 0), 70 started: make(chan struct{}), 71 } 72 d.streamSessions[not.Segment.RepoDID] = ss 73 g.Go(func() error {
··· 12 "stream.place/streamplace/pkg/log" 13 "stream.place/streamplace/pkg/media" 14 "stream.place/streamplace/pkg/model" 15 + "stream.place/streamplace/pkg/statedb" 16 ) 17 18 // director is responsible for managing the lifecycle of a stream, making business ··· 29 streamSessions map[string]*StreamSession 30 streamSessionsMu sync.Mutex 31 op *oatproxy.OATProxy 32 + statefulDB *statedb.StatefulDB 33 } 34 35 + func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy, statefulDB *statedb.StatefulDB) *Director { 36 return &Director{ 37 mm: mm, 38 mod: mod, ··· 41 streamSessions: make(map[string]*StreamSession), 42 streamSessionsMu: sync.Mutex{}, 43 op: op, 44 + statefulDB: statefulDB, 45 } 46 } 47 ··· 71 op: d.op, 72 packets: make([]bus.PacketizedSegment, 0), 73 started: make(chan struct{}), 74 + statefulDB: d.statefulDB, 75 } 76 d.streamSessions[not.Segment.RepoDID] = ss 77 g.Go(func() error {
+3 -1
pkg/director/stream_session.go
··· 22 "stream.place/streamplace/pkg/model" 23 "stream.place/streamplace/pkg/renditions" 24 "stream.place/streamplace/pkg/spmetrics" 25 "stream.place/streamplace/pkg/streamplace" 26 "stream.place/streamplace/pkg/thumbnail" 27 ) ··· 42 started chan struct{} 43 ctx context.Context 44 packets []bus.PacketizedSegment 45 } 46 47 func (ss *StreamSession) Start(ctx context.Context, not *media.NewSegmentNotification) error { ··· 252 return nil 253 } 254 255 - session, err := ss.mod.GetSessionByDID(repoDID) 256 if err != nil { 257 return fmt.Errorf("could not get OAuth session for repoDID: %w", err) 258 }
··· 22 "stream.place/streamplace/pkg/model" 23 "stream.place/streamplace/pkg/renditions" 24 "stream.place/streamplace/pkg/spmetrics" 25 + "stream.place/streamplace/pkg/statedb" 26 "stream.place/streamplace/pkg/streamplace" 27 "stream.place/streamplace/pkg/thumbnail" 28 ) ··· 43 started chan struct{} 44 ctx context.Context 45 packets []bus.PacketizedSegment 46 + statefulDB *statedb.StatefulDB 47 } 48 49 func (ss *StreamSession) Start(ctx context.Context, not *media.NewSegmentNotification) error { ··· 254 return nil 255 } 256 257 + session, err := ss.statefulDB.GetSessionByDID(repoDID) 258 if err != nil { 259 return fmt.Errorf("could not get OAuth session for repoDID: %w", err) 260 }
+4 -10
pkg/integrations/discord/avatars.go
··· 2 3 import ( 4 "context" 5 - "fmt" 6 "sync" 7 8 "github.com/bluesky-social/indigo/api/bsky" 9 "github.com/bluesky-social/indigo/xrpc" 10 "stream.place/streamplace/pkg/aqhttp" 11 - "stream.place/streamplace/pkg/model" 12 ) 13 14 var avatarCache = make(map[string]string) ··· 17 // getAvatarURL gets the avatar URL for a Bluesky from the public appview 18 // pretty ugly. we're going to replace this with indexing bluesky profiles 19 // at some point. 20 - func getAvatarURL(ctx context.Context, r *model.Repo) (string, error) { 21 avatarCacheMutex.Lock() 22 defer avatarCacheMutex.Unlock() 23 24 - if r == nil || r.DID == "" { 25 - return "", fmt.Errorf("repo or DID is nil or empty") 26 - } 27 - 28 - if avatar, ok := avatarCache[r.DID]; ok { 29 return avatar, nil 30 } 31 ··· 34 Client: &aqhttp.Client, 35 } 36 37 - profile, err := bsky.ActorGetProfile(ctx, xrpc, r.DID) 38 if err != nil { 39 return "", err 40 } 41 42 if profile.Avatar != nil { 43 - avatarCache[r.DID] = *profile.Avatar 44 return *profile.Avatar, nil 45 } 46
··· 2 3 import ( 4 "context" 5 "sync" 6 7 "github.com/bluesky-social/indigo/api/bsky" 8 "github.com/bluesky-social/indigo/xrpc" 9 "stream.place/streamplace/pkg/aqhttp" 10 ) 11 12 var avatarCache = make(map[string]string) ··· 15 // getAvatarURL gets the avatar URL for a Bluesky from the public appview 16 // pretty ugly. we're going to replace this with indexing bluesky profiles 17 // at some point. 18 + func getAvatarURL(ctx context.Context, did string) (string, error) { 19 avatarCacheMutex.Lock() 20 defer avatarCacheMutex.Unlock() 21 22 + if avatar, ok := avatarCache[did]; ok { 23 return avatar, nil 24 } 25 ··· 28 Client: &aqhttp.Client, 29 } 30 31 + profile, err := bsky.ActorGetProfile(ctx, xrpc, did) 32 if err != nil { 33 return "", err 34 } 35 36 if profile.Avatar != nil { 37 + avatarCache[did] = *profile.Avatar 38 return *profile.Avatar, nil 39 } 40
+2 -3
pkg/integrations/discord/send-chat.go
··· 13 "stream.place/streamplace/pkg/aqhttp" 14 "stream.place/streamplace/pkg/integrations/discord/discordtypes" 15 "stream.place/streamplace/pkg/log" 16 - "stream.place/streamplace/pkg/model" 17 "stream.place/streamplace/pkg/streamplace" 18 ) 19 20 - func SendChat(ctx context.Context, w *discordtypes.Webhook, r *model.Repo, scm *streamplace.ChatDefs_MessageView) error { 21 22 msg, ok := scm.Record.Val.(*streamplace.ChatMessage) 23 if !ok { 24 return fmt.Errorf("failed to cast chat message to streamplace chat message") 25 } 26 27 - avatarURL, err := getAvatarURL(ctx, r) 28 if err != nil { 29 log.Warn(ctx, "failed to get avatar URL", "err", err) 30 }
··· 13 "stream.place/streamplace/pkg/aqhttp" 14 "stream.place/streamplace/pkg/integrations/discord/discordtypes" 15 "stream.place/streamplace/pkg/log" 16 "stream.place/streamplace/pkg/streamplace" 17 ) 18 19 + func SendChat(ctx context.Context, w *discordtypes.Webhook, did string, scm *streamplace.ChatDefs_MessageView) error { 20 21 msg, ok := scm.Record.Val.(*streamplace.ChatMessage) 22 if !ok { 23 return fmt.Errorf("failed to cast chat message to streamplace chat message") 24 } 25 26 + avatarURL, err := getAvatarURL(ctx, did) 27 if err != nil { 28 log.Warn(ctx, "failed to get avatar URL", "err", err) 29 }
+7 -7
pkg/integrations/discord/send-livestream.go
··· 20 "stream.place/streamplace/pkg/streamplace" 21 ) 22 23 - func SendLivestream(ctx context.Context, w *discordtypes.Webhook, r *model.Repo, lsv *streamplace.Livestream_LivestreamView, postView *bsky.FeedDefs_PostView, spcp *streamplace.ChatProfile) error { 24 ctx = log.WithLogValues(ctx, "func", "SendLivestream") 25 ls, ok := lsv.Record.Val.(*streamplace.Livestream) 26 if !ok { ··· 33 } 34 35 payload := discordtypes.Payload{ 36 - Username: fmt.Sprintf("@%s", r.Handle), 37 Content: fmt.Sprintf("%s%s%s", w.Prefix, content, w.Suffix), 38 } 39 40 - avatarURL, err := getAvatarURL(ctx, r) 41 if err != nil { 42 log.Warn(ctx, "failed to get avatar URL", "err", err) 43 } ··· 67 log.Warn(ctx, "failed to parse URL", "err", err) 68 } else { 69 suffix = fmt.Sprintf(" on %s!", u.Host) 70 - payload.Embeds[0].URL = fmt.Sprintf("%s/%s", *ls.Url, r.Handle) 71 } 72 } 73 74 - payload.Embeds[0].Title = fmt.Sprintf("@%s is LIVE%s", r.Handle, suffix) 75 76 if ls.Thumb != nil { 77 - u, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", r.PDS)) 78 if err != nil { 79 return fmt.Errorf("failed to parse base URL: %w", err) 80 } 81 q := u.Query() 82 - q.Set("did", r.DID) 83 q.Set("cid", ls.Thumb.Ref.String()) 84 u.RawQuery = q.Encode() 85 imageURL := u.String()
··· 20 "stream.place/streamplace/pkg/streamplace" 21 ) 22 23 + func SendLivestream(ctx context.Context, w *discordtypes.Webhook, pdsURL string, lsv *streamplace.Livestream_LivestreamView, postView *bsky.FeedDefs_PostView, spcp *streamplace.ChatProfile) error { 24 ctx = log.WithLogValues(ctx, "func", "SendLivestream") 25 ls, ok := lsv.Record.Val.(*streamplace.Livestream) 26 if !ok { ··· 33 } 34 35 payload := discordtypes.Payload{ 36 + Username: fmt.Sprintf("@%s", lsv.Author.Handle), 37 Content: fmt.Sprintf("%s%s%s", w.Prefix, content, w.Suffix), 38 } 39 40 + avatarURL, err := getAvatarURL(ctx, lsv.Author.Did) 41 if err != nil { 42 log.Warn(ctx, "failed to get avatar URL", "err", err) 43 } ··· 67 log.Warn(ctx, "failed to parse URL", "err", err) 68 } else { 69 suffix = fmt.Sprintf(" on %s!", u.Host) 70 + payload.Embeds[0].URL = fmt.Sprintf("%s/%s", *ls.Url, lsv.Author.Handle) 71 } 72 } 73 74 + payload.Embeds[0].Title = fmt.Sprintf("@%s is LIVE%s", lsv.Author.Handle, suffix) 75 76 if ls.Thumb != nil { 77 + u, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", pdsURL)) 78 if err != nil { 79 return fmt.Errorf("failed to parse base URL: %w", err) 80 } 81 q := u.Query() 82 + q.Set("did", lsv.Author.Did) 83 q.Set("cid", ls.Thumb.Ref.String()) 84 u.RawQuery = q.Encode() 85 imageURL := u.String()
+4 -3
pkg/media/media_test.go
··· 42 AllowedStreams: []string{"did:key:zQ3shhoPCrDZWE8CryCEHYCrb1x8mCkr2byTkF5EGJT7dgazC"}, 43 }) 44 atsync := &atproto.ATProtoSynchronizer{ 45 - CLI: cli, 46 - Model: mod, 47 - Bus: bus.NewBus(), 48 } 49 mm, err := MakeMediaManager(context.Background(), cli, signer, &boring.BoringReplicator{}, mod, bus.NewBus(), atsync) 50 require.NoError(t, err)
··· 42 AllowedStreams: []string{"did:key:zQ3shhoPCrDZWE8CryCEHYCrb1x8mCkr2byTkF5EGJT7dgazC"}, 43 }) 44 atsync := &atproto.ATProtoSynchronizer{ 45 + CLI: cli, 46 + Model: mod, 47 + StatefulDB: nil, // Test doesn't need StatefulDB for now 48 + Bus: bus.NewBus(), 49 } 50 mm, err := MakeMediaManager(context.Background(), cli, signer, &boring.BoringReplicator{}, mod, bus.NewBus(), atsync) 51 require.NoError(t, err)
+1 -19
pkg/model/model.go
··· 12 "github.com/bluesky-social/indigo/api/bsky" 13 "github.com/lmittmann/tint" 14 slogGorm "github.com/orandin/slog-gorm" 15 - "github.com/streamplace/oatproxy/pkg/oatproxy" 16 "gorm.io/driver/sqlite" 17 "gorm.io/gorm" 18 "stream.place/streamplace/pkg/config" ··· 26 } 27 28 type Model interface { 29 - CreateNotification(token, repoDID string) error 30 - ListNotifications() ([]Notification, error) 31 - 32 CreatePlayerEvent(event PlayerEventAPI) error 33 ListPlayerEvents(playerID string) ([]PlayerEvent, error) 34 PlayerReport(playerID string) (map[string]any, error) ··· 63 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error) 64 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error) 65 DeleteFollow(ctx context.Context, userDID, rev string) error 66 - GetFollowersNotificationTokens(userDID string) ([]string, error) 67 68 CreateFeedPost(ctx context.Context, post *FeedPost) error 69 ListFeedPosts() ([]FeedPost, error) ··· 93 CreateChatProfile(ctx context.Context, profile *ChatProfile) error 94 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error) 95 96 - CreateOAuthSession(id string, session *oatproxy.OAuthSession) error 97 - LoadOAuthSession(id string) (*oatproxy.OAuthSession, error) 98 - UpdateOAuthSession(id string, session *oatproxy.OAuthSession) error 99 - ListOAuthSessions() ([]oatproxy.OAuthSession, error) 100 - GetSessionByDID(did string) (*oatproxy.OAuthSession, error) 101 - 102 UpdateServerSettings(ctx context.Context, settings *ServerSettings) error 103 GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error) 104 DeleteServerSettings(ctx context.Context, server string, repoDID string) error 105 - 106 - CreateCommitEvent(commit *comatproto.SyncSubscribeRepos_Commit, signedData string) error 107 - GetCommitEventsSince(repoDID string, t time.Time) ([]*XrpcStreamEvent, error) 108 - GetCommitEventsSinceSeq(repoDID string, seq int64) ([]*XrpcStreamEvent, error) 109 - GetMostRecentCommitEvent(repoDID string) (*XrpcStreamEvent, error) 110 111 CreateLabeler(did string) (*Labeler, error) 112 GetLabeler(did string) (*Labeler, error) ··· 159 } 160 sqlDB.SetMaxOpenConns(1) 161 for _, model := range []any{ 162 - Notification{}, 163 PlayerEvent{}, 164 Segment{}, 165 Thumbnail{}, ··· 173 ChatMessage{}, 174 ChatProfile{}, 175 Gate{}, 176 - oatproxy.OAuthSession{}, 177 ServerSettings{}, 178 - XrpcStreamEvent{}, 179 Labeler{}, 180 Label{}, 181 } {
··· 12 "github.com/bluesky-social/indigo/api/bsky" 13 "github.com/lmittmann/tint" 14 slogGorm "github.com/orandin/slog-gorm" 15 "gorm.io/driver/sqlite" 16 "gorm.io/gorm" 17 "stream.place/streamplace/pkg/config" ··· 25 } 26 27 type Model interface { 28 CreatePlayerEvent(event PlayerEventAPI) error 29 ListPlayerEvents(playerID string) ([]PlayerEvent, error) 30 PlayerReport(playerID string) (map[string]any, error) ··· 59 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error) 60 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error) 61 DeleteFollow(ctx context.Context, userDID, rev string) error 62 63 CreateFeedPost(ctx context.Context, post *FeedPost) error 64 ListFeedPosts() ([]FeedPost, error) ··· 88 CreateChatProfile(ctx context.Context, profile *ChatProfile) error 89 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error) 90 91 UpdateServerSettings(ctx context.Context, settings *ServerSettings) error 92 GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error) 93 DeleteServerSettings(ctx context.Context, server string, repoDID string) error 94 95 CreateLabeler(did string) (*Labeler, error) 96 GetLabeler(did string) (*Labeler, error) ··· 143 } 144 sqlDB.SetMaxOpenConns(1) 145 for _, model := range []any{ 146 PlayerEvent{}, 147 Segment{}, 148 Thumbnail{}, ··· 156 ChatMessage{}, 157 ChatProfile{}, 158 Gate{}, 159 ServerSettings{}, 160 + 161 Labeler{}, 162 Label{}, 163 } {
-74
pkg/model/notification.go
··· 1 - package model 2 - 3 - import ( 4 - "fmt" 5 - "time" 6 - 7 - "gorm.io/gorm" 8 - ) 9 - 10 - type Notification struct { 11 - Token string `gorm:"primarykey"` 12 - RepoDID string `json:"repoDID,omitempty" gorm:"column:repo_did;index"` 13 - CreatedAt time.Time 14 - UpdatedAt time.Time 15 - DeletedAt gorm.DeletedAt `gorm:"index"` 16 - } 17 - 18 - func (m *DBModel) CreateNotification(token string, repoDID string) error { 19 - not := Notification{ 20 - Token: token, 21 - } 22 - if repoDID != "" { 23 - not.RepoDID = repoDID 24 - } 25 - err := m.DB.Save(&not).Error 26 - if err != nil { 27 - return err 28 - } 29 - return nil 30 - } 31 - 32 - func (m *DBModel) ListNotifications() ([]Notification, error) { 33 - nots := []Notification{} 34 - err := m.DB.Find(&nots).Error 35 - if err != nil { 36 - return nil, fmt.Errorf("error retrieving notifications: %w", err) 37 - } 38 - return nots, nil 39 - } 40 - 41 - func (m *DBModel) ListUserNotifications(userDID string) ([]Notification, error) { 42 - nots := []Notification{} 43 - err := m.DB.Where("repo_did = ?", userDID).Find(&nots).Error 44 - if err != nil { 45 - return nil, fmt.Errorf("error retrieving notifications: %w", err) 46 - } 47 - return nots, nil 48 - } 49 - 50 - func (m *DBModel) GetFollowersNotificationTokens(userDID string) ([]string, error) { 51 - var tokens []string 52 - 53 - err := m.DB.Model(&Notification{}). 54 - Distinct("notifications.token"). 55 - Joins("JOIN follows ON follows.user_did = notifications.repo_did"). 56 - Where("follows.subject_did = ?", userDID). 57 - Pluck("notifications.token", &tokens). 58 - Error 59 - 60 - if err != nil { 61 - return nil, fmt.Errorf("error retrieving follower notification tokens: %w", err) 62 - } 63 - 64 - // also you prolly wanna get one for yourself 65 - nots, err := m.ListUserNotifications(userDID) 66 - if err != nil { 67 - return nil, fmt.Errorf("error retrieving user notifications: %w", err) 68 - } 69 - for _, not := range nots { 70 - tokens = append(tokens, not.Token) 71 - } 72 - 73 - return tokens, nil 74 - }
···
-50
pkg/model/oauth_session.go
··· 1 - package model 2 - 3 - import ( 4 - "errors" 5 - 6 - "github.com/streamplace/oatproxy/pkg/oatproxy" 7 - "gorm.io/gorm" 8 - ) 9 - 10 - func (m *DBModel) CreateOAuthSession(id string, session *oatproxy.OAuthSession) error { 11 - return m.DB.Create(session).Error 12 - } 13 - 14 - func (m *DBModel) LoadOAuthSession(id string) (*oatproxy.OAuthSession, error) { 15 - var session oatproxy.OAuthSession 16 - if err := m.DB.Where("downstream_dpop_jkt = ?", id).First(&session).Error; err != nil { 17 - if errors.Is(err, gorm.ErrRecordNotFound) { 18 - return nil, nil 19 - } 20 - return nil, err 21 - } 22 - return &session, nil 23 - } 24 - 25 - func (m *DBModel) UpdateOAuthSession(id string, session *oatproxy.OAuthSession) error { 26 - res := m.DB.Model(&oatproxy.OAuthSession{}).Where("downstream_dpop_jkt = ?", id).Updates(session) 27 - if res.Error != nil { 28 - return res.Error 29 - } 30 - if res.RowsAffected == 0 { 31 - return errors.New("no rows affected") 32 - } 33 - return nil 34 - } 35 - 36 - func (m *DBModel) ListOAuthSessions() ([]oatproxy.OAuthSession, error) { 37 - var sessions []oatproxy.OAuthSession 38 - if err := m.DB.Find(&sessions).Error; err != nil { 39 - return nil, err 40 - } 41 - return sessions, nil 42 - } 43 - 44 - func (m *DBModel) GetSessionByDID(did string) (*oatproxy.OAuthSession, error) { 45 - var session oatproxy.OAuthSession 46 - if err := m.DB.Where("repo_did = ? AND revoked_at IS NULL", did).Order("updated_at DESC").First(&session).Error; err != nil { 47 - return nil, err 48 - } 49 - return &session, nil 50 - }
···
+12 -12
pkg/model/xrpc_stream_event.go pkg/statedb/xrpc_stream_event.go
··· 1 - package model 2 3 import ( 4 "bytes" ··· 13 ) 14 15 type XrpcStreamEvent struct { 16 - CID string `json:"cid" gorm:"primaryKey"` 17 RepoDID string `json:"repoDID" gorm:"index:idx_repo_timestamp,priority:1;index:idx_repo_seq,priority:1;column:repo_did"` 18 Timestamp time.Time `json:"timestamp" gorm:"index:idx_repo_timestamp,priority:2;column:timestamp"` 19 - Data []byte `json:"data"` 20 SignedData string `json:"signedData" gorm:"column:signed_data"` 21 Seq int64 `json:"seq" gorm:"index:idx_repo_seq,priority:2;column:seq"` 22 } ··· 30 return commit, nil 31 } 32 33 - func (m *DBModel) CreateCommitEvent(commit *comatproto.SyncSubscribeRepos_Commit, signedData string) error { 34 - prev, err := m.GetMostRecentCommitEvent(commit.Repo) 35 if err != nil { 36 return err 37 } ··· 68 Seq: commit.Seq, 69 SignedData: signedData, 70 } 71 - return m.DB.Create(event).Error 72 } 73 74 - func (m *DBModel) GetCommitEventsSince(repoDID string, t time.Time) ([]*XrpcStreamEvent, error) { 75 var events []*XrpcStreamEvent 76 - query := m.DB.Where("repo_did = ?", repoDID) 77 query = query.Where("timestamp > ?", t.UTC()) 78 err := query.Order("timestamp ASC").Find(&events).Error 79 if err != nil { ··· 82 return events, nil 83 } 84 85 - func (m *DBModel) GetCommitEventsSinceSeq(repoDID string, seq int64) ([]*XrpcStreamEvent, error) { 86 var events []*XrpcStreamEvent 87 - query := m.DB.Where("repo_did = ?", repoDID) 88 query = query.Where("seq > ?", seq) 89 err := query.Order("timestamp ASC").Find(&events).Error 90 if err != nil { ··· 93 return events, nil 94 } 95 96 - func (m *DBModel) GetMostRecentCommitEvent(repoDID string) (*XrpcStreamEvent, error) { 97 var event XrpcStreamEvent 98 - err := m.DB.Where("repo_did = ?", repoDID). 99 Order("timestamp DESC"). 100 Limit(1). 101 First(&event).Error
··· 1 + package statedb 2 3 import ( 4 "bytes" ··· 13 ) 14 15 type XrpcStreamEvent struct { 16 + CID string `json:"cid" gorm:"column:cid;primaryKey"` 17 RepoDID string `json:"repoDID" gorm:"index:idx_repo_timestamp,priority:1;index:idx_repo_seq,priority:1;column:repo_did"` 18 Timestamp time.Time `json:"timestamp" gorm:"index:idx_repo_timestamp,priority:2;column:timestamp"` 19 + Data []byte `json:"data" gorm:"column:data"` 20 SignedData string `json:"signedData" gorm:"column:signed_data"` 21 Seq int64 `json:"seq" gorm:"index:idx_repo_seq,priority:2;column:seq"` 22 } ··· 30 return commit, nil 31 } 32 33 + func (state *StatefulDB) CreateCommitEvent(commit *comatproto.SyncSubscribeRepos_Commit, signedData string) error { 34 + prev, err := state.GetMostRecentCommitEvent(commit.Repo) 35 if err != nil { 36 return err 37 } ··· 68 Seq: commit.Seq, 69 SignedData: signedData, 70 } 71 + return state.DB.Create(event).Error 72 } 73 74 + func (state *StatefulDB) GetCommitEventsSince(repoDID string, t time.Time) ([]*XrpcStreamEvent, error) { 75 var events []*XrpcStreamEvent 76 + query := state.DB.Where("repo_did = ?", repoDID) 77 query = query.Where("timestamp > ?", t.UTC()) 78 err := query.Order("timestamp ASC").Find(&events).Error 79 if err != nil { ··· 82 return events, nil 83 } 84 85 + func (state *StatefulDB) GetCommitEventsSinceSeq(repoDID string, seq int64) ([]*XrpcStreamEvent, error) { 86 var events []*XrpcStreamEvent 87 + query := state.DB.Where("repo_did = ?", repoDID) 88 query = query.Where("seq > ?", seq) 89 err := query.Order("timestamp ASC").Find(&events).Error 90 if err != nil { ··· 93 return events, nil 94 } 95 96 + func (state *StatefulDB) GetMostRecentCommitEvent(repoDID string) (*XrpcStreamEvent, error) { 97 var event XrpcStreamEvent 98 + err := state.DB.Where("repo_did = ?", repoDID). 99 Order("timestamp DESC"). 100 Limit(1). 101 First(&event).Error
+31 -28
pkg/resync/resync.go
··· 16 // resync a fresh database from the PDSses, copying over the few pieces of local state 17 // that we have 18 func Resync(ctx context.Context, cli *config.CLI) error { 19 - oldMod, err := model.MakeDB(cli.DBPath) 20 if err != nil { 21 return err 22 } 23 - tempDBPath := cli.DBPath + ".temp." + fmt.Sprintf("%d", time.Now().UnixNano()) 24 newMod, err := model.MakeDB(tempDBPath) 25 if err != nil { 26 return err ··· 31 } 32 33 atsync := &atproto.ATProtoSynchronizer{ 34 - CLI: cli, 35 - Model: newMod, 36 - Noter: nil, 37 - Bus: bus.NewBus(), 38 } 39 40 doneMap := make(map[string]bool) ··· 80 return err 81 } 82 83 - oauthSessions, err := oldMod.ListOAuthSessions() 84 - if err != nil { 85 - return err 86 - } 87 - for _, session := range oauthSessions { 88 - err := newMod.CreateOAuthSession(session.DownstreamDPoPJKT, &session) 89 - if err != nil { 90 - return fmt.Errorf("failed to create oauth session: %w", err) 91 - } 92 - } 93 - log.Log(ctx, "migrated oauth sessions", "count", len(oauthSessions)) 94 95 - notificationTokens, err := oldMod.ListNotifications() 96 - if err != nil { 97 - return err 98 - } 99 - for _, token := range notificationTokens { 100 - err := newMod.CreateNotification(token.Token, token.RepoDID) 101 - if err != nil { 102 - return fmt.Errorf("failed to create notification: %w", err) 103 - } 104 - } 105 - log.Log(ctx, "migrated notification tokens", "count", len(notificationTokens)) 106 107 log.Log(ctx, "resync complete!", "newDBPath", tempDBPath) 108
··· 16 // resync a fresh database from the PDSses, copying over the few pieces of local state 17 // that we have 18 func Resync(ctx context.Context, cli *config.CLI) error { 19 + oldMod, err := model.MakeDB(cli.IndexDBPath) 20 if err != nil { 21 return err 22 } 23 + tempDBPath := cli.IndexDBPath + ".temp." + fmt.Sprintf("%d", time.Now().UnixNano()) 24 newMod, err := model.MakeDB(tempDBPath) 25 if err != nil { 26 return err ··· 31 } 32 33 atsync := &atproto.ATProtoSynchronizer{ 34 + CLI: cli, 35 + Model: newMod, 36 + StatefulDB: nil, // TODO: Add StatefulDB for resync when migration is ready 37 + Noter: nil, 38 + Bus: bus.NewBus(), 39 } 40 41 doneMap := make(map[string]bool) ··· 81 return err 82 } 83 84 + // TODO: Update OAuth session migration to use new statefulDB 85 + // oauthSessions, err := oldMod.ListOAuthSessions() 86 + // if err != nil { 87 + // return err 88 + // } 89 + // for _, session := range oauthSessions { 90 + // err := newMod.CreateOAuthSession(session.DownstreamDPoPJKT, &session) 91 + // if err != nil { 92 + // return fmt.Errorf("failed to create oauth session: %w", err) 93 + // } 94 + // } 95 + // log.Log(ctx, "migrated oauth sessions", "count", len(oauthSessions)) 96 97 + // TODO: Update notification migration to use new statefulDB 98 + // notificationTokens, err := oldMod.ListNotifications() 99 + // if err != nil { 100 + // return err 101 + // } 102 + // for _, token := range notificationTokens { 103 + // err := newMod.CreateNotification(token.Token, token.RepoDID) 104 + // if err != nil { 105 + // return fmt.Errorf("failed to create notification: %w", err) 106 + // } 107 + // } 108 + // log.Log(ctx, "migrated notification tokens", "count", len(notificationTokens)) 109 110 log.Log(ctx, "resync complete!", "newDBPath", tempDBPath) 111
+4 -1
pkg/spxrpc/com_atproto_sync.go
··· 49 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to get user repo head: %w", err) 50 } 51 52 r, err := repo.OpenRepo(ctx, bs, root) 53 if err != nil { 54 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to open repo: %w", err) ··· 74 } 75 76 for _, blk := range blocks { 77 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil { 78 return nil, err 79 } ··· 108 return err 109 } 110 111 - evts, err := s.model.GetCommitEventsSinceSeq(atproto.LexiconRepo.RepoDid(), int64(seq)) 112 if err != nil { 113 return err 114 }
··· 49 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to get user repo head: %w", err) 50 } 51 52 + log.Warn(ctx, "got root", "root", root.String()) 53 + 54 r, err := repo.OpenRepo(ctx, bs, root) 55 if err != nil { 56 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to open repo: %w", err) ··· 76 } 77 78 for _, blk := range blocks { 79 + log.Warn(ctx, "writing block", "cid", blk.Cid().String(), "version", blk.Cid().Version()) 80 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil { 81 return nil, err 82 } ··· 111 return err 112 } 113 114 + evts, err := s.statefulDB.GetCommitEventsSinceSeq(atproto.LexiconRepo.RepoDid(), int64(seq)) 115 if err != nil { 116 return err 117 }
+4 -1
pkg/spxrpc/spxrpc.go
··· 14 "stream.place/streamplace/pkg/config" 15 "stream.place/streamplace/pkg/log" 16 "stream.place/streamplace/pkg/model" 17 ) 18 19 type Server struct { ··· 22 model model.Model 23 OGImageCache *cache.Cache 24 ATSync *atproto.ATProtoSynchronizer 25 } 26 27 - func NewServer(ctx context.Context, cli *config.CLI, model model.Model, op *oatproxy.OATProxy, mdlw middleware.Middleware, atsync *atproto.ATProtoSynchronizer) (*Server, error) { 28 e := echo.New() 29 s := &Server{ 30 e: e, ··· 32 model: model, 33 OGImageCache: cache.New(5*time.Minute, 10*time.Minute), // 5min TTL, 10min cleanup 34 ATSync: atsync, 35 } 36 e.Use(s.ErrorHandlingMiddleware()) 37 e.Use(s.ContextPreservingMiddleware())
··· 14 "stream.place/streamplace/pkg/config" 15 "stream.place/streamplace/pkg/log" 16 "stream.place/streamplace/pkg/model" 17 + "stream.place/streamplace/pkg/statedb" 18 ) 19 20 type Server struct { ··· 23 model model.Model 24 OGImageCache *cache.Cache 25 ATSync *atproto.ATProtoSynchronizer 26 + statefulDB *statedb.StatefulDB 27 } 28 29 + func NewServer(ctx context.Context, cli *config.CLI, model model.Model, statefulDB *statedb.StatefulDB, op *oatproxy.OATProxy, mdlw middleware.Middleware, atsync *atproto.ATProtoSynchronizer) (*Server, error) { 30 e := echo.New() 31 s := &Server{ 32 e: e, ··· 34 model: model, 35 OGImageCache: cache.New(5*time.Minute, 10*time.Minute), // 5min TTL, 10min cleanup 36 ATSync: atsync, 37 + statefulDB: statefulDB, 38 } 39 e.Use(s.ErrorHandlingMiddleware()) 40 e.Use(s.ContextPreservingMiddleware())
+34
pkg/statedb/config.go
···
··· 1 + package statedb 2 + 3 + import ( 4 + "errors" 5 + "time" 6 + 7 + "gorm.io/gorm" 8 + ) 9 + 10 + type Config struct { 11 + Key string `gorm:"column:key;primarykey"` 12 + Value []byte `gorm:"column:value"` 13 + CreatedAt time.Time `gorm:"column:created_at"` 14 + UpdatedAt time.Time `gorm:"column:updated_at"` 15 + } 16 + 17 + func (state *StatefulDB) GetConfig(key string) (*Config, error) { 18 + var config Config 19 + if err := state.DB.Where("key = ?", key).First(&config).Error; err != nil { 20 + if errors.Is(err, gorm.ErrRecordNotFound) { 21 + return nil, nil 22 + } 23 + return nil, err 24 + } 25 + return &config, nil 26 + } 27 + 28 + func (state *StatefulDB) PutConfig(key string, value []byte) error { 29 + config := Config{ 30 + Key: key, 31 + Value: value, 32 + } 33 + return state.DB.Save(&config).Error 34 + }
+73
pkg/statedb/jwks.go
···
··· 1 + package statedb 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "os" 8 + 9 + "github.com/lestrrat-go/jwx/v2/jwk" 10 + oauth_helpers "github.com/streamplace/atproto-oauth-golang/helpers" 11 + "stream.place/streamplace/pkg/log" 12 + ) 13 + 14 + func (state *StatefulDB) EnsureJWK(ctx context.Context, name string) (jwk.Key, error) { 15 + var key jwk.Key 16 + 17 + conf, err := state.GetConfig(name) 18 + if err != nil { 19 + return nil, err 20 + } 21 + 22 + // happy path: we found the jwk in the database, use that 23 + if conf != nil { 24 + key, err = jwk.ParseKey(conf.Value) 25 + if err != nil { 26 + return nil, err 27 + } 28 + return key, nil 29 + } 30 + 31 + // migration path: maybe we have an old one on disk. 32 + key, _ = state.getOldJWK(ctx, name) 33 + 34 + // new path: found neither, generate a new one 35 + if key == nil { 36 + log.Warn(ctx, "no JWK found, generating new one", "name", name) 37 + key, err = oauth_helpers.GenerateKey(nil) 38 + if err != nil { 39 + return nil, fmt.Errorf("failed to generate JWK: %w", err) 40 + } 41 + } 42 + 43 + b, err := json.Marshal(key) 44 + if err != nil { 45 + return nil, fmt.Errorf("failed to marshal JWK: %w", err) 46 + } 47 + err = state.PutConfig(name, b) 48 + if err != nil { 49 + return nil, fmt.Errorf("failed to save JWK: %w", err) 50 + } 51 + 52 + return key, nil 53 + } 54 + 55 + // migration for the old one we stored on disk 56 + func (state *StatefulDB) getOldJWK(ctx context.Context, name string) (jwk.Key, error) { 57 + var key jwk.Key 58 + jwkPath := state.CLI.DataFilePath([]string{name + ".json"}) 59 + _, err := os.Stat(jwkPath) 60 + if err == nil { 61 + b, err := os.ReadFile(jwkPath) 62 + if err != nil { 63 + return nil, err 64 + } 65 + key, err = jwk.ParseKey(b) 66 + if err != nil { 67 + return nil, err 68 + } 69 + log.Warn(ctx, "found old JWK on disk, migrating to stateful database", "path", jwkPath) 70 + return key, nil 71 + } 72 + return nil, nil 73 + }
+71
pkg/statedb/locks.go
···
··· 1 + package statedb 2 + 3 + import ( 4 + "crypto/sha256" 5 + "encoding/binary" 6 + "fmt" 7 + "sync" 8 + ) 9 + 10 + func (state *StatefulDB) GetNamedLock(name string) (func(), error) { 11 + switch state.Type { 12 + case DBTypeSQLite: 13 + return state.getNamedLockSQLite(name) 14 + case DBTypePostgres: 15 + return state.getNamedLockPostgres(name) 16 + } 17 + panic("unsupported database type") 18 + } 19 + 20 + func (state *StatefulDB) getNamedLockPostgres(name string) (func(), error) { 21 + // Convert string to sha256 hash and use decimal value for advisory lock 22 + h := sha256.Sum256([]byte(name)) 23 + nameInt := binary.BigEndian.Uint64(h[:32]) 24 + 25 + err := state.DB.Exec("SELECT pg_advisory_lock($1)", nameInt).Error 26 + if err != nil { 27 + return nil, err 28 + } 29 + return func() { 30 + err := state.DB.Exec("SELECT pg_advisory_unlock($1)", nameInt).Error 31 + if err != nil { 32 + // unfortunate, but the risk is that we're holding on to the lock forever, 33 + // so it's responsible to crash in this case 34 + panic(fmt.Errorf("error unlocking named lock: %w", err)) 35 + } 36 + }, nil 37 + } 38 + 39 + func (state *StatefulDB) getNamedLockSQLite(name string) (func(), error) { 40 + lock := state.locks.GetLock(name) 41 + lock.Lock() 42 + return func() { 43 + lock.Unlock() 44 + }, nil 45 + } 46 + 47 + // Local mutex implementation for sqlite 48 + type NamedLocks struct { 49 + mu sync.Mutex 50 + locks map[string]*sync.Mutex 51 + } 52 + 53 + // NewNamedLocks creates a new NamedLocks instance 54 + func NewNamedLocks() *NamedLocks { 55 + return &NamedLocks{ 56 + locks: make(map[string]*sync.Mutex), 57 + } 58 + } 59 + 60 + // GetLock returns the mutex for the given name, creating it if it doesn't exist 61 + func (n *NamedLocks) GetLock(name string) *sync.Mutex { 62 + n.mu.Lock() 63 + defer n.mu.Unlock() 64 + 65 + lock, exists := n.locks[name] 66 + if !exists { 67 + lock = &sync.Mutex{} 68 + n.locks[name] = lock 69 + } 70 + return lock 71 + }
+65
pkg/statedb/migrate.go
···
··· 1 + package statedb 2 + 3 + import ( 4 + "context" 5 + "os" 6 + "time" 7 + 8 + "github.com/lmittmann/tint" 9 + slogGorm "github.com/orandin/slog-gorm" 10 + "github.com/streamplace/oatproxy/pkg/oatproxy" 11 + "gorm.io/driver/sqlite" 12 + "gorm.io/gorm" 13 + "stream.place/streamplace/pkg/config" 14 + "stream.place/streamplace/pkg/log" 15 + ) 16 + 17 + func Migrate(cli *config.CLI) error { 18 + gormLogger := slogGorm.New( 19 + slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 20 + TimeFormat: time.RFC3339, 21 + })), 22 + // slogGorm.WithTraceAll(), 23 + ) 24 + 25 + newDB, err := MakeDB(cli, nil, nil) 26 + if err != nil { 27 + return err 28 + } 29 + 30 + oldDB, err := gorm.Open(sqlite.Open(cli.IndexDBPath), &gorm.Config{ 31 + Logger: gormLogger, 32 + }) 33 + if err != nil { 34 + return err 35 + } 36 + 37 + var sessions []oatproxy.OAuthSession 38 + if err := oldDB.Find(&sessions).Error; err != nil { 39 + return err 40 + } 41 + 42 + for _, session := range sessions { 43 + log.Log(context.Background(), "migrating session", "session", session.DownstreamDPoPJKT) 44 + err := newDB.DB.Save(&session).Error 45 + if err != nil { 46 + return err 47 + } 48 + } 49 + 50 + var notifications []Notification 51 + if err := oldDB.Find(&notifications).Error; err != nil { 52 + time.Sleep(1 * time.Second) 53 + return err 54 + } 55 + 56 + for _, notification := range notifications { 57 + log.Log(context.Background(), "migrating notification", "notification", notification) 58 + err := newDB.DB.Save(&notification).Error 59 + if err != nil { 60 + return err 61 + } 62 + } 63 + 64 + return nil 65 + }
+57
pkg/statedb/notification.go
···
··· 1 + package statedb 2 + 3 + import ( 4 + "fmt" 5 + "time" 6 + ) 7 + 8 + type Notification struct { 9 + Token string `gorm:"column:token;primarykey"` 10 + RepoDID string `json:"repoDID,omitempty" gorm:"column:repo_did;index"` 11 + CreatedAt time.Time `gorm:"column:created_at"` 12 + UpdatedAt time.Time `gorm:"column:updated_at"` 13 + } 14 + 15 + func (state *StatefulDB) CreateNotification(token string, repoDID string) error { 16 + not := Notification{ 17 + Token: token, 18 + } 19 + if repoDID != "" { 20 + not.RepoDID = repoDID 21 + } 22 + err := state.DB.Save(&not).Error 23 + if err != nil { 24 + return err 25 + } 26 + return nil 27 + } 28 + 29 + func (state *StatefulDB) ListNotifications() ([]Notification, error) { 30 + nots := []Notification{} 31 + err := state.DB.Find(&nots).Error 32 + if err != nil { 33 + return nil, fmt.Errorf("error retrieving notifications: %w", err) 34 + } 35 + return nots, nil 36 + } 37 + 38 + func (state *StatefulDB) ListUserNotifications(userDID string) ([]Notification, error) { 39 + nots := []Notification{} 40 + err := state.DB.Where("repo_did = ?", userDID).Find(&nots).Error 41 + if err != nil { 42 + return nil, fmt.Errorf("error retrieving notifications: %w", err) 43 + } 44 + return nots, nil 45 + } 46 + 47 + func (state *StatefulDB) GetManyNotificationTokens(userDIDs []string) ([]string, error) { 48 + tokens := []string{} 49 + err := state.DB.Model(&Notification{}). 50 + Where("repo_did IN (?)", userDIDs). 51 + Pluck("token", &tokens). 52 + Error 53 + if err != nil { 54 + return nil, fmt.Errorf("error retrieving notification tokens: %w", err) 55 + } 56 + return tokens, nil 57 + }
+50
pkg/statedb/oauth_session.go
···
··· 1 + package statedb 2 + 3 + import ( 4 + "errors" 5 + 6 + "github.com/streamplace/oatproxy/pkg/oatproxy" 7 + "gorm.io/gorm" 8 + ) 9 + 10 + func (state *StatefulDB) CreateOAuthSession(id string, session *oatproxy.OAuthSession) error { 11 + return state.DB.Create(session).Error 12 + } 13 + 14 + func (state *StatefulDB) LoadOAuthSession(id string) (*oatproxy.OAuthSession, error) { 15 + var session oatproxy.OAuthSession 16 + if err := state.DB.Where("downstream_dpop_jkt = ?", id).First(&session).Error; err != nil { 17 + if errors.Is(err, gorm.ErrRecordNotFound) { 18 + return nil, nil 19 + } 20 + return nil, err 21 + } 22 + return &session, nil 23 + } 24 + 25 + func (state *StatefulDB) UpdateOAuthSession(id string, session *oatproxy.OAuthSession) error { 26 + res := state.DB.Model(&oatproxy.OAuthSession{}).Where("downstream_dpop_jkt = ?", id).Updates(session) 27 + if res.Error != nil { 28 + return res.Error 29 + } 30 + if res.RowsAffected == 0 { 31 + return errors.New("no rows affected") 32 + } 33 + return nil 34 + } 35 + 36 + func (state *StatefulDB) ListOAuthSessions() ([]oatproxy.OAuthSession, error) { 37 + var sessions []oatproxy.OAuthSession 38 + if err := state.DB.Find(&sessions).Error; err != nil { 39 + return nil, err 40 + } 41 + return sessions, nil 42 + } 43 + 44 + func (state *StatefulDB) GetSessionByDID(did string) (*oatproxy.OAuthSession, error) { 45 + var session oatproxy.OAuthSession 46 + if err := state.DB.Where("repo_did = ? AND revoked_at IS NULL", did).Order("updated_at DESC").First(&session).Error; err != nil { 47 + return nil, err 48 + } 49 + return &session, nil 50 + }
+156
pkg/statedb/queue_processor.go
···
··· 1 + package statedb 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "errors" 7 + "fmt" 8 + "time" 9 + 10 + "github.com/bluesky-social/indigo/api/bsky" 11 + "gorm.io/gorm" 12 + "stream.place/streamplace/pkg/integrations/discord" 13 + "stream.place/streamplace/pkg/log" 14 + notificationpkg "stream.place/streamplace/pkg/notifications" 15 + "stream.place/streamplace/pkg/streamplace" 16 + ) 17 + 18 + var TaskNotification = "notification" 19 + var TaskChat = "chat" 20 + 21 + type NotificationTask struct { 22 + Livestream *streamplace.Livestream_LivestreamView 23 + FeedPost *bsky.FeedDefs_PostView 24 + ChatProfile *streamplace.ChatProfile 25 + PDSURL string 26 + } 27 + 28 + type ChatTask struct { 29 + MessageView *streamplace.ChatDefs_MessageView 30 + } 31 + 32 + func (state *StatefulDB) ProcessQueue(ctx context.Context) error { 33 + for { 34 + task, err := state.DequeueTask(ctx, "queue_processor") 35 + if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { 36 + return err 37 + } 38 + if task != nil { 39 + err := state.processTask(ctx, task) 40 + if err != nil { 41 + log.Error(ctx, "failed to process task", "err", err) 42 + } 43 + } else { 44 + select { 45 + case <-ctx.Done(): 46 + return ctx.Err() 47 + case <-time.After(1 * time.Second): 48 + continue 49 + case <-state.pokeQueue: 50 + continue 51 + } 52 + } 53 + 54 + } 55 + } 56 + 57 + func (state *StatefulDB) processTask(ctx context.Context, task *AppTask) error { 58 + switch task.Type { 59 + case TaskNotification: 60 + return state.processNotificationTask(ctx, task) 61 + case TaskChat: 62 + return state.processChatMessageTask(ctx, task) 63 + default: 64 + return fmt.Errorf("unknown task type: %s", task.Type) 65 + } 66 + } 67 + 68 + func (state *StatefulDB) processNotificationTask(ctx context.Context, task *AppTask) error { 69 + var notificationTask NotificationTask 70 + if err := json.Unmarshal(task.Payload, &notificationTask); err != nil { 71 + return err 72 + } 73 + lsv := notificationTask.Livestream 74 + rec, ok := lsv.Record.Val.(*streamplace.Livestream) 75 + if !ok { 76 + return fmt.Errorf("invalid livestream record") 77 + } 78 + userDID := lsv.Author.Did 79 + 80 + log.Warn(ctx, "Livestream detected! Blasting followers!", "title", rec.Title, "url", rec.Url, "createdAt", rec.CreatedAt, "repo", userDID) 81 + followers, err := state.model.GetUserFollowers(ctx, userDID) 82 + if err != nil { 83 + return err 84 + } 85 + 86 + followersDIDs := make([]string, 0, len(followers)) 87 + for _, follower := range followers { 88 + followersDIDs = append(followersDIDs, follower.UserDID) 89 + } 90 + 91 + log.Log(ctx, "found followers", "count", len(followersDIDs)) 92 + 93 + notifications, err := state.GetManyNotificationTokens(followersDIDs) 94 + if err != nil { 95 + return err 96 + } 97 + 98 + if state.noter != nil { 99 + nb := &notificationpkg.NotificationBlast{ 100 + Title: fmt.Sprintf("🔴 @%s is LIVE!", lsv.Author.Handle), 101 + Body: rec.Title, 102 + Data: map[string]string{ 103 + "path": fmt.Sprintf("/%s", lsv.Author.Handle), 104 + }, 105 + } 106 + err = state.noter.Blast(ctx, notifications, nb) 107 + if err != nil { 108 + log.Error(ctx, "failed to blast notifications", "err", err) 109 + } else { 110 + log.Log(ctx, "sent notifications", "user", userDID, "count", len(notifications), "content", nb) 111 + } 112 + } else { 113 + log.Log(ctx, "no notifier configured, skipping notifications", "user", userDID, "count", len(notifications)) 114 + } 115 + 116 + for _, webhook := range state.CLI.DiscordWebhooks { 117 + if webhook.DID == userDID && webhook.Type == "livestream" { 118 + go func() { 119 + err := discord.SendLivestream(ctx, webhook, notificationTask.PDSURL, lsv, notificationTask.FeedPost, notificationTask.ChatProfile) 120 + if err != nil { 121 + log.Error(ctx, "failed to send livestream to discord", "err", err) 122 + } else { 123 + log.Log(ctx, "sent livestream to discord", "user", userDID, "webhook", webhook.URL) 124 + } 125 + }() 126 + } 127 + } 128 + return nil 129 + } 130 + 131 + func (state *StatefulDB) processChatMessageTask(ctx context.Context, task *AppTask) error { 132 + var chatTask ChatTask 133 + if err := json.Unmarshal(task.Payload, &chatTask); err != nil { 134 + return err 135 + } 136 + scm := chatTask.MessageView 137 + rec, ok := scm.Record.Val.(*streamplace.ChatMessage) 138 + if !ok { 139 + return fmt.Errorf("invalid chat message record") 140 + } 141 + userDID := scm.Author.Did 142 + 143 + for _, webhook := range state.CLI.DiscordWebhooks { 144 + if webhook.DID == rec.Streamer && webhook.Type == "chat" { 145 + go func() { 146 + err := discord.SendChat(ctx, webhook, scm.Author.Did, scm) 147 + if err != nil { 148 + log.Error(ctx, "failed to send livestream to discord", "err", err) 149 + } else { 150 + log.Log(ctx, "sent livestream to discord", "user", userDID, "webhook", webhook.URL) 151 + } 152 + }() 153 + } 154 + } 155 + return nil 156 + }
+163
pkg/statedb/statedb.go
···
··· 1 + package statedb 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "net/url" 7 + "os" 8 + "strings" 9 + "time" 10 + 11 + "github.com/lmittmann/tint" 12 + slogGorm "github.com/orandin/slog-gorm" 13 + "github.com/streamplace/oatproxy/pkg/oatproxy" 14 + "gorm.io/driver/postgres" 15 + "gorm.io/driver/sqlite" 16 + "gorm.io/gorm" 17 + "stream.place/streamplace/pkg/config" 18 + "stream.place/streamplace/pkg/log" 19 + "stream.place/streamplace/pkg/model" 20 + notificationpkg "stream.place/streamplace/pkg/notifications" 21 + ) 22 + 23 + type DBType string 24 + 25 + const ( 26 + DBTypeSQLite DBType = "sqlite" 27 + DBTypePostgres DBType = "postgres" 28 + ) 29 + 30 + type StatefulDB struct { 31 + DB *gorm.DB 32 + CLI *config.CLI 33 + Type DBType 34 + locks *NamedLocks 35 + noter notificationpkg.FirebaseNotifier 36 + model model.Model 37 + // pokeQueue is used to wake up the queue processor when a new task is enqueued 38 + pokeQueue chan struct{} 39 + } 40 + 41 + // list tables here so we can migrate them 42 + var StatefulDBModels = []any{ 43 + oatproxy.OAuthSession{}, 44 + Notification{}, 45 + Config{}, 46 + XrpcStreamEvent{}, 47 + AppTask{}, 48 + } 49 + 50 + var NoPostgresDatabaseCode = "3D000" 51 + 52 + // Stateful database for storing private streamplace state 53 + func MakeDB(cli *config.CLI, noter notificationpkg.FirebaseNotifier, model model.Model) (*StatefulDB, error) { 54 + dbURL := cli.DBURL 55 + log.Log(context.Background(), "starting stateful database", "dbURL", redactDBURL(dbURL)) 56 + var dial gorm.Dialector 57 + var dbType DBType 58 + if dbURL == ":memory:" { 59 + dial = sqlite.Open(":memory:") 60 + dbType = DBTypeSQLite 61 + } else if strings.HasPrefix(dbURL, "sqlite://") { 62 + dial = sqlite.Open(dbURL[len("sqlite://"):]) 63 + dbType = DBTypeSQLite 64 + } else if strings.HasPrefix(dbURL, "postgres://") { 65 + dial = postgres.Open(dbURL) 66 + dbType = DBTypePostgres 67 + } else { 68 + return nil, fmt.Errorf("unsupported database URL (most start with sqlite:// or postgres://): %s", redactDBURL(dbURL)) 69 + } 70 + 71 + db, err := openDB(dial) 72 + 73 + if err != nil { 74 + if dbType == DBTypePostgres && strings.Contains(err.Error(), NoPostgresDatabaseCode) { 75 + db, err = makePostgresDB(dbURL) 76 + if err != nil { 77 + return nil, fmt.Errorf("error creating streamplace database: %w", err) 78 + } 79 + } else { 80 + return nil, fmt.Errorf("error starting database: %w", err) 81 + } 82 + } 83 + if dbType == DBTypeSQLite { 84 + err = db.Exec("PRAGMA journal_mode=WAL;").Error 85 + if err != nil { 86 + return nil, fmt.Errorf("error setting journal mode: %w", err) 87 + } 88 + sqlDB, err := db.DB() 89 + if err != nil { 90 + return nil, fmt.Errorf("error getting database: %w", err) 91 + } 92 + sqlDB.SetMaxOpenConns(1) 93 + } 94 + for _, model := range StatefulDBModels { 95 + err = db.AutoMigrate(model) 96 + if err != nil { 97 + return nil, err 98 + } 99 + } 100 + return &StatefulDB{ 101 + DB: db, 102 + CLI: cli, 103 + Type: dbType, 104 + locks: NewNamedLocks(), 105 + model: model, 106 + pokeQueue: make(chan struct{}, 1), 107 + }, nil 108 + } 109 + 110 + func openDB(dial gorm.Dialector) (*gorm.DB, error) { 111 + gormLogger := slogGorm.New( 112 + slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 113 + TimeFormat: time.RFC3339, 114 + })), 115 + // slogGorm.WithTraceAll(), 116 + ) 117 + 118 + return gorm.Open(dial, &gorm.Config{ 119 + SkipDefaultTransaction: true, 120 + TranslateError: true, 121 + Logger: gormLogger, 122 + }) 123 + } 124 + 125 + // helper function for creating the requested postgres database 126 + func makePostgresDB(dbURL string) (*gorm.DB, error) { 127 + u, err := url.Parse(dbURL) 128 + if err != nil { 129 + return nil, err 130 + } 131 + dbName := strings.TrimPrefix(u.Path, "/") 132 + u.Path = "/postgres" 133 + 134 + rootDial := postgres.Open(u.String()) 135 + 136 + db, err := openDB(rootDial) 137 + if err != nil { 138 + return nil, err 139 + } 140 + 141 + // postgres doesn't support prepared statements for CREATE DATABASE. don't SQL inject yourself. 142 + err = db.Exec(fmt.Sprintf("CREATE DATABASE %s;", dbName)).Error 143 + if err != nil { 144 + return nil, err 145 + } 146 + 147 + log.Warn(context.Background(), "created postgres database", "dbName", dbName) 148 + 149 + realDial := postgres.Open(dbURL) 150 + 151 + return openDB(realDial) 152 + } 153 + 154 + func redactDBURL(dbURL string) string { 155 + u, err := url.Parse(dbURL) 156 + if err != nil { 157 + return "db url is malformed" 158 + } 159 + if u.User != nil { 160 + u.User = url.UserPassword(u.User.Username(), "redacted") 161 + } 162 + return u.String() 163 + }
+358
pkg/statedb/task.go
···
··· 1 + package statedb 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "errors" 7 + "fmt" 8 + "strings" 9 + "time" 10 + 11 + "gorm.io/gorm" 12 + ) 13 + 14 + // TaskStatus represents the status of a task in the queue 15 + type TaskStatus string 16 + 17 + const ( 18 + TaskStatusPending TaskStatus = "PENDING" 19 + TaskStatusProcessing TaskStatus = "PROCESSING" 20 + TaskStatusCompleted TaskStatus = "COMPLETED" 21 + TaskStatusFailed TaskStatus = "FAILED" 22 + TaskStatusRetrying TaskStatus = "RETRYING" 23 + ) 24 + 25 + // AppTask represents a task in the queue 26 + type AppTask struct { 27 + ID uint `gorm:"column:id;primarykey"` 28 + Type string `gorm:"column:type;not null;index"` 29 + TaskKey *string `gorm:"column:task_key;index:idx_task_dedup,unique"` 30 + Status TaskStatus `gorm:"column:status;not null;index;default:'PENDING'"` 31 + Payload json.RawMessage `gorm:"column:payload;type:jsonb"` 32 + Priority int `gorm:"column:priority;default:0;index"` 33 + TryCount int `gorm:"column:try_count;default:0"` 34 + MaxTries int `gorm:"column:max_tries;default:3"` 35 + LockExpires *time.Time `gorm:"column:lock_expires"` 36 + WorkerID *string `gorm:"column:worker_id"` 37 + Error *string `gorm:"column:error"` 38 + CreatedAt time.Time `gorm:"column:created_at"` 39 + UpdatedAt time.Time `gorm:"column:updated_at"` 40 + ScheduledAt *time.Time `gorm:"column:scheduled_at"` // for delayed tasks 41 + } 42 + 43 + // EnqueueTask adds a new task to the queue 44 + func (state *StatefulDB) EnqueueTask(ctx context.Context, taskType string, payload any, options ...TaskOption) (*AppTask, error) { 45 + payloadBytes, err := json.Marshal(payload) 46 + if err != nil { 47 + return nil, fmt.Errorf("failed to marshal payload: %w", err) 48 + } 49 + 50 + task := &AppTask{ 51 + Type: taskType, 52 + Status: TaskStatusPending, 53 + Payload: payloadBytes, 54 + Priority: 0, 55 + MaxTries: 3, 56 + } 57 + 58 + // Apply options 59 + for _, opt := range options { 60 + opt(task) 61 + } 62 + 63 + // If task has a key, check for deduplication 64 + if task.TaskKey != nil { 65 + existingTask, err := state.GetTaskByKey(ctx, *task.TaskKey) 66 + if err != nil { 67 + return nil, fmt.Errorf("failed to check for existing task: %w", err) 68 + } 69 + if existingTask != nil { 70 + // Task already exists, return the existing one 71 + return existingTask, nil 72 + } 73 + } 74 + 75 + if err := state.DB.WithContext(ctx).Create(task).Error; err != nil { 76 + // Handle unique constraint violation gracefully 77 + if strings.Contains(err.Error(), "duplicate") || strings.Contains(err.Error(), "UNIQUE constraint") { 78 + // Another node beat us to it, try to fetch the existing task 79 + if task.TaskKey != nil { 80 + existingTask, fetchErr := state.GetTaskByKey(ctx, *task.TaskKey) 81 + if fetchErr == nil && existingTask != nil { 82 + return existingTask, nil 83 + } 84 + } 85 + } 86 + return nil, fmt.Errorf("failed to enqueue task: %w", err) 87 + } 88 + 89 + go func() { 90 + select { 91 + case state.pokeQueue <- struct{}{}: 92 + // wake up the queue processor 93 + default: 94 + // queue is already awake, do nothing 95 + } 96 + }() 97 + 98 + return task, nil 99 + } 100 + 101 + // DequeueTask retrieves the next available task from the queue and locks it 102 + func (state *StatefulDB) DequeueTask(ctx context.Context, workerID string, taskTypes ...string) (*AppTask, error) { 103 + var task AppTask 104 + 105 + err := state.DB.WithContext(ctx).Transaction(func(tx *gorm.DB) error { 106 + query := tx.Where("status = ?", TaskStatusPending). 107 + Where("try_count < max_tries"). 108 + Where("(lock_expires IS NULL OR lock_expires < ?)", time.Now()). 109 + Where("(scheduled_at IS NULL OR scheduled_at <= ?)", time.Now()) 110 + 111 + if len(taskTypes) > 0 { 112 + query = query.Where("type IN ?", taskTypes) 113 + } 114 + 115 + // Use raw SQL for PostgreSQL-specific locking 116 + if state.Type == DBTypePostgres { 117 + baseQuery := "SELECT * FROM app_tasks WHERE status = ? AND try_count < max_tries AND (lock_expires IS NULL OR lock_expires < ?) AND (scheduled_at IS NULL OR scheduled_at <= ?)" 118 + if len(taskTypes) > 0 { 119 + baseQuery += " AND type IN ?" 120 + params := []interface{}{TaskStatusPending, time.Now(), time.Now(), taskTypes} 121 + err := tx.Raw(baseQuery+" ORDER BY priority DESC, created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED", params...). 122 + Scan(&task).Error 123 + if err != nil { 124 + return err 125 + } 126 + } else { 127 + err := tx.Raw(baseQuery+" ORDER BY priority DESC, created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED", 128 + TaskStatusPending, time.Now(), time.Now()). 129 + Scan(&task).Error 130 + if err != nil { 131 + return err 132 + } 133 + } 134 + } else { 135 + // Fallback for SQLite (no SKIP LOCKED support) 136 + err := query.Order("priority DESC, created_at ASC").First(&task).Error 137 + if err != nil { 138 + return err 139 + } 140 + } 141 + 142 + if task.ID == 0 { 143 + return gorm.ErrRecordNotFound 144 + } 145 + 146 + // Lock the task 147 + lockExpires := time.Now().Add(30 * time.Minute) // 30-minute lock 148 + updates := map[string]interface{}{ 149 + "status": TaskStatusProcessing, 150 + "worker_id": workerID, 151 + "lock_expires": lockExpires, 152 + "try_count": task.TryCount + 1, 153 + } 154 + 155 + return tx.Model(&task).Updates(updates).Error 156 + }) 157 + 158 + if err != nil { 159 + if errors.Is(err, gorm.ErrRecordNotFound) { 160 + return nil, nil // No tasks available 161 + } 162 + return nil, fmt.Errorf("failed to dequeue task: %w", err) 163 + } 164 + 165 + // Reload the task to get updated fields 166 + if err := state.DB.WithContext(ctx).First(&task, task.ID).Error; err != nil { 167 + return nil, fmt.Errorf("failed to reload task: %w", err) 168 + } 169 + 170 + return &task, nil 171 + } 172 + 173 + // CompleteTask marks a task as completed 174 + func (state *StatefulDB) CompleteTask(ctx context.Context, taskID uint) error { 175 + result := state.DB.WithContext(ctx).Model(&AppTask{}). 176 + Where("id = ?", taskID). 177 + Updates(map[string]interface{}{ 178 + "status": TaskStatusCompleted, 179 + "lock_expires": nil, 180 + "worker_id": nil, 181 + }) 182 + 183 + if result.Error != nil { 184 + return fmt.Errorf("failed to complete task: %w", result.Error) 185 + } 186 + 187 + if result.RowsAffected == 0 { 188 + return errors.New("task not found") 189 + } 190 + 191 + return nil 192 + } 193 + 194 + // FailTask marks a task as failed and optionally retries it 195 + func (state *StatefulDB) FailTask(ctx context.Context, taskID uint, errorMsg string) error { 196 + var task AppTask 197 + err := state.DB.WithContext(ctx).Transaction(func(tx *gorm.DB) error { 198 + if err := tx.First(&task, taskID).Error; err != nil { 199 + return err 200 + } 201 + 202 + updates := map[string]interface{}{ 203 + "error": errorMsg, 204 + "lock_expires": nil, 205 + "worker_id": nil, 206 + } 207 + 208 + if task.TryCount >= task.MaxTries { 209 + updates["status"] = TaskStatusFailed 210 + } else { 211 + updates["status"] = TaskStatusPending 212 + } 213 + 214 + return tx.Model(&task).Updates(updates).Error 215 + }) 216 + 217 + if err != nil { 218 + return fmt.Errorf("failed to mark task as failed: %w", err) 219 + } 220 + 221 + return nil 222 + } 223 + 224 + // ReleaseTask releases a locked task back to the queue (e.g., worker shutdown) 225 + func (state *StatefulDB) ReleaseTask(ctx context.Context, taskID uint) error { 226 + result := state.DB.WithContext(ctx).Model(&AppTask{}). 227 + Where("id = ?", taskID). 228 + Updates(map[string]interface{}{ 229 + "status": TaskStatusPending, 230 + "lock_expires": nil, 231 + "worker_id": nil, 232 + }) 233 + 234 + if result.Error != nil { 235 + return fmt.Errorf("failed to release task: %w", result.Error) 236 + } 237 + 238 + if result.RowsAffected == 0 { 239 + return errors.New("task not found") 240 + } 241 + 242 + return nil 243 + } 244 + 245 + // GetTask retrieves a task by ID 246 + func (state *StatefulDB) GetTask(ctx context.Context, taskID uint) (*AppTask, error) { 247 + var task AppTask 248 + if err := state.DB.WithContext(ctx).First(&task, taskID).Error; err != nil { 249 + if errors.Is(err, gorm.ErrRecordNotFound) { 250 + return nil, nil 251 + } 252 + return nil, fmt.Errorf("failed to get task: %w", err) 253 + } 254 + return &task, nil 255 + } 256 + 257 + // GetTaskByKey retrieves a task by its unique task key 258 + func (state *StatefulDB) GetTaskByKey(ctx context.Context, taskKey string) (*AppTask, error) { 259 + var task AppTask 260 + if err := state.DB.WithContext(ctx).Where("task_key = ?", taskKey).First(&task).Error; err != nil { 261 + if errors.Is(err, gorm.ErrRecordNotFound) { 262 + return nil, nil 263 + } 264 + return nil, fmt.Errorf("failed to get task by key: %w", err) 265 + } 266 + return &task, nil 267 + } 268 + 269 + // ListTasks retrieves tasks with optional filters 270 + func (state *StatefulDB) ListTasks(ctx context.Context, filters TaskFilters) ([]AppTask, error) { 271 + var tasks []AppTask 272 + query := state.DB.WithContext(ctx).Model(&AppTask{}) 273 + 274 + if filters.Status != "" { 275 + query = query.Where("status = ?", filters.Status) 276 + } 277 + if filters.Type != "" { 278 + query = query.Where("type = ?", filters.Type) 279 + } 280 + if filters.TaskKey != "" { 281 + query = query.Where("task_key = ?", filters.TaskKey) 282 + } 283 + if filters.WorkerID != "" { 284 + query = query.Where("worker_id = ?", filters.WorkerID) 285 + } 286 + if filters.Limit > 0 { 287 + query = query.Limit(filters.Limit) 288 + } 289 + if filters.Offset > 0 { 290 + query = query.Offset(filters.Offset) 291 + } 292 + 293 + query = query.Order("created_at DESC") 294 + 295 + if err := query.Find(&tasks).Error; err != nil { 296 + return nil, fmt.Errorf("failed to list tasks: %w", err) 297 + } 298 + 299 + return tasks, nil 300 + } 301 + 302 + // CleanupExpiredLocks releases tasks with expired locks 303 + func (state *StatefulDB) CleanupExpiredLocks(ctx context.Context) (int64, error) { 304 + result := state.DB.WithContext(ctx).Model(&AppTask{}). 305 + Where("status = ? AND lock_expires < ?", TaskStatusProcessing, time.Now()). 306 + Updates(map[string]interface{}{ 307 + "status": TaskStatusPending, 308 + "lock_expires": nil, 309 + "worker_id": nil, 310 + }) 311 + 312 + if result.Error != nil { 313 + return 0, fmt.Errorf("failed to cleanup expired locks: %w", result.Error) 314 + } 315 + 316 + return result.RowsAffected, nil 317 + } 318 + 319 + // TaskOption is a function that configures a task 320 + type TaskOption func(*AppTask) 321 + 322 + // WithPriority sets the task priority (higher numbers = higher priority) 323 + func WithPriority(priority int) TaskOption { 324 + return func(t *AppTask) { 325 + t.Priority = priority 326 + } 327 + } 328 + 329 + // WithMaxTries sets the maximum number of retry attempts 330 + func WithMaxTries(maxTries int) TaskOption { 331 + return func(t *AppTask) { 332 + t.MaxTries = maxTries 333 + } 334 + } 335 + 336 + // WithScheduledAt sets when the task should be processed (for delayed tasks) 337 + func WithScheduledAt(scheduledAt time.Time) TaskOption { 338 + return func(t *AppTask) { 339 + t.ScheduledAt = &scheduledAt 340 + } 341 + } 342 + 343 + // WithTaskKey sets a unique key for task deduplication 344 + func WithTaskKey(taskKey string) TaskOption { 345 + return func(t *AppTask) { 346 + t.TaskKey = &taskKey 347 + } 348 + } 349 + 350 + // TaskFilters holds filters for listing tasks 351 + type TaskFilters struct { 352 + Status TaskStatus 353 + Type string 354 + TaskKey string 355 + WorkerID string 356 + Limit int 357 + Offset int 358 + }