forked from tangled.org/core
Monorepo for Tangled

spindle: setup jetstream ingester

Signed-off-by: Anirudh Oppiliappan <anirudh@tangled.sh>

authored by anirudh.fi and committed by Tangled 504e391a b0e291db

Changed files
+156 -25
spindle
+3 -1
spindle/config/config.go
··· 12 12 Hostname string `env:"HOSTNAME, required"` 13 13 JetstreamEndpoint string `env:"JETSTREAM_ENDPOINT, default=wss://jetstream1.us-west.bsky.network/subscribe"` 14 14 Dev bool `env:"DEV, default=false"` 15 + Owner string `env:"OWNER, required"` 15 16 } 16 17 17 18 type Config struct { 18 - Server Server `env:",prefix=SPINDLE_SERVER_"` 19 + Server Server `env:",prefix=SPINDLE_SERVER_"` 20 + Knots []string `env:"SPINDLE_SUBSCRIBED_KNOTS,required"` 19 21 } 20 22 21 23 func Load(ctx context.Context) (*Config, error) {
+87
spindle/ingester.go
··· 1 + package spindle 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + 8 + "github.com/bluesky-social/jetstream/pkg/models" 9 + "tangled.sh/tangled.sh/core/api/tangled" 10 + ) 11 + 12 + type Ingester func(ctx context.Context, e *models.Event) error 13 + 14 + func (s *Spindle) ingest() Ingester { 15 + return func(ctx context.Context, e *models.Event) error { 16 + var err error 17 + defer func() { 18 + eventTime := e.TimeUS 19 + lastTimeUs := eventTime + 1 20 + if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil { 21 + err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 22 + } 23 + }() 24 + 25 + if e.Kind != models.EventKindCommit { 26 + return nil 27 + } 28 + 29 + switch e.Commit.Collection { 30 + case tangled.SpindleMemberNSID: 31 + s.ingestMember(ctx, e) 32 + } 33 + 34 + return err 35 + } 36 + } 37 + 38 + func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error { 39 + did := e.Did 40 + var err error 41 + 42 + l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID) 43 + 44 + switch e.Commit.Operation { 45 + case models.CommitOperationCreate, models.CommitOperationUpdate: 46 + raw := e.Commit.Record 47 + record := tangled.SpindleMember{} 48 + err = json.Unmarshal(raw, &record) 49 + if err != nil { 50 + l.Error("invalid record", "error", err) 51 + return err 52 + } 53 + 54 + domain := s.cfg.Server.Hostname 55 + if s.cfg.Server.Dev { 56 + domain = s.cfg.Server.ListenAddr 57 + } 58 + recordInstance := *record.Instance 59 + 60 + if recordInstance != domain { 61 + l.Error("domain mismatch", "domain", recordInstance, "expected", domain) 62 + return fmt.Errorf("domain mismatch: %s != %s", *record.Instance, domain) 63 + } 64 + 65 + ok, err := s.e.E.Enforce(did, rbacDomain, rbacDomain, "server:invite") 66 + if err != nil || !ok { 67 + l.Error("failed to add member", "did", did) 68 + return fmt.Errorf("failed to enforce permissions: %w", err) 69 + } 70 + 71 + if err := s.e.AddMember(rbacDomain, record.Subject); err != nil { 72 + l.Error("failed to add member", "error", err) 73 + return fmt.Errorf("failed to add member: %w", err) 74 + } 75 + l.Info("added member from firehose", "member", record.Subject) 76 + 77 + if err := s.db.AddDid(did); err != nil { 78 + l.Error("failed to add did", "error", err) 79 + return fmt.Errorf("failed to add did: %w", err) 80 + } 81 + s.jc.AddDid(did) 82 + 83 + return nil 84 + 85 + } 86 + return nil 87 + }
+66 -24
spindle/server.go
··· 22 22 "tangled.sh/tangled.sh/core/spindle/queue" 23 23 ) 24 24 25 + const ( 26 + rbacDomain = "thisserver" 27 + ) 28 + 25 29 type Spindle struct { 26 30 jc *jetstream.JetstreamClient 27 31 db *db.DB ··· 30 34 n *notifier.Notifier 31 35 eng *engine.Engine 32 36 jq *queue.Queue 37 + cfg *config.Config 33 38 } 34 39 35 40 func Run(ctx context.Context) error { 41 + logger := log.FromContext(ctx) 42 + 36 43 cfg, err := config.Load(ctx) 37 44 if err != nil { 38 45 return fmt.Errorf("failed to load config: %w", err) ··· 47 54 if err != nil { 48 55 return fmt.Errorf("failed to setup rbac enforcer: %w", err) 49 56 } 57 + e.E.EnableAutoSave(true) 50 58 51 - logger := log.FromContext(ctx) 59 + n := notifier.New() 52 60 53 - collections := []string{tangled.SpindleMemberNSID} 54 - jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, false) 55 - if err != nil { 56 - return fmt.Errorf("failed to setup jetstream client: %w", err) 57 - } 58 - 59 - n := notifier.New() 60 61 eng, err := engine.New(ctx, d, &n) 61 62 if err != nil { 62 63 return err ··· 64 65 65 66 jq := queue.NewQueue(100, 2) 66 67 67 - // starts a job queue runner in the background 68 - jq.Start() 69 - defer jq.Stop() 68 + collections := []string{tangled.SpindleMemberNSID} 69 + jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, false, false) 70 + if err != nil { 71 + return fmt.Errorf("failed to setup jetstream client: %w", err) 72 + } 70 73 71 74 spindle := Spindle{ 72 75 jc: jc, ··· 76 79 n: &n, 77 80 eng: eng, 78 81 jq: jq, 82 + cfg: cfg, 79 83 } 80 84 81 - // for each incoming sh.tangled.pipeline, we execute 82 - // spindle.processPipeline, which in turn enqueues the pipeline 83 - // job in the above registered queue. 85 + err = e.AddDomain(rbacDomain) 86 + if err != nil { 87 + return fmt.Errorf("failed to set rbac domain: %w", err) 88 + } 89 + err = spindle.configureOwner() 90 + if err != nil { 91 + return err 92 + } 93 + logger.Info("owner set", "did", cfg.Server.Owner) 94 + 95 + // starts a job queue runner in the background 96 + jq.Start() 97 + defer jq.Stop() 98 + 84 99 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 85 100 if err != nil { 86 101 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 87 102 } 88 - go func() { 89 - logger.Info("starting event consumer") 90 - knotEventSource := knotclient.NewEventSource("localhost:6000") 91 103 92 - ccfg := knotclient.NewConsumerConfig() 93 - ccfg.Logger = logger 94 - ccfg.Dev = cfg.Server.Dev 95 - ccfg.ProcessFunc = spindle.processPipeline 96 - ccfg.CursorStore = cursorStore 97 - ccfg.AddEventSource(knotEventSource) 104 + err = jc.StartJetstream(ctx, spindle.ingest()) 105 + if err != nil { 106 + return fmt.Errorf("failed to start jetstream consumer: %w", err) 107 + } 98 108 99 - ec := knotclient.NewEventConsumer(*ccfg) 109 + // for each incoming sh.tangled.pipeline, we execute 110 + // spindle.processPipeline, which in turn enqueues the pipeline 111 + // job in the above registered queue. 100 112 113 + ccfg := knotclient.NewConsumerConfig() 114 + ccfg.Logger = logger 115 + ccfg.Dev = cfg.Server.Dev 116 + ccfg.ProcessFunc = spindle.processPipeline 117 + ccfg.CursorStore = cursorStore 118 + for _, knot := range spindle.cfg.Knots { 119 + kes := knotclient.NewEventSource(knot) 120 + ccfg.AddEventSource(kes) 121 + } 122 + ec := knotclient.NewEventConsumer(*ccfg) 123 + 124 + go func() { 125 + logger.Info("starting knot event consumer", "knots", spindle.cfg.Knots) 101 126 ec.Start(ctx) 102 127 }() 103 128 ··· 159 184 160 185 return nil 161 186 } 187 + 188 + func (s *Spindle) configureOwner() error { 189 + cfgOwner := s.cfg.Server.Owner 190 + serverOwner, err := s.e.GetUserByRole("server:owner", rbacDomain) 191 + if err != nil { 192 + return fmt.Errorf("failed to fetch server:owner: %w", err) 193 + } 194 + 195 + if len(serverOwner) == 0 { 196 + s.e.AddOwner(rbacDomain, cfgOwner) 197 + } else { 198 + if serverOwner[0] != cfgOwner { 199 + return fmt.Errorf("server owner mismatch: %s != %s", cfgOwner, serverOwner[0]) 200 + } 201 + } 202 + return nil 203 + }