forked from tangled.org/core
Monorepo for Tangled

appview: ingester: process public-key records from firehose

renames jetstream consumer in appview to ingester.

authored by oppi.li and committed by Tangled 90388746 8d3243ee

Changed files
+138 -74
appview
+2 -2
appview/db/pubkeys.go
··· 23 24 func DeletePublicKeyByRkey(e Execer, did, rkey string) error { 25 _, err := e.Exec(` 26 - delete or ignore from public_keys 27 - where did = ? and name = ? and rkey = ?`, 28 did, rkey) 29 return err 30 }
··· 23 24 func DeletePublicKeyByRkey(e Execer, did, rkey string) error { 25 _, err := e.Exec(` 26 + delete from public_keys 27 + where did = ? and rkey = ?`, 28 did, rkey) 29 return err 30 }
+133
appview/ingester.go
···
··· 1 + package appview 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "log" 8 + 9 + "github.com/bluesky-social/indigo/atproto/syntax" 10 + "github.com/bluesky-social/jetstream/pkg/models" 11 + tangled "tangled.sh/tangled.sh/core/api/tangled" 12 + "tangled.sh/tangled.sh/core/appview/db" 13 + ) 14 + 15 + type Ingester func(ctx context.Context, e *models.Event) error 16 + 17 + func Ingest(d db.DbWrapper) Ingester { 18 + return func(ctx context.Context, e *models.Event) error { 19 + var err error 20 + defer func() { 21 + eventTime := e.TimeUS 22 + lastTimeUs := eventTime + 1 23 + if err := d.SaveLastTimeUs(lastTimeUs); err != nil { 24 + err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 25 + } 26 + }() 27 + 28 + if e.Kind != models.EventKindCommit { 29 + return nil 30 + } 31 + 32 + switch e.Commit.Collection { 33 + case tangled.GraphFollowNSID: 34 + ingestFollow(&d, e) 35 + case tangled.FeedStarNSID: 36 + ingestStar(&d, e) 37 + case tangled.PublicKeyNSID: 38 + ingestPublicKey(&d, e) 39 + } 40 + 41 + return err 42 + } 43 + } 44 + 45 + func ingestStar(d *db.DbWrapper, e *models.Event) error { 46 + var err error 47 + did := e.Did 48 + 49 + switch e.Commit.Operation { 50 + case models.CommitOperationCreate, models.CommitOperationUpdate: 51 + var subjectUri syntax.ATURI 52 + 53 + raw := json.RawMessage(e.Commit.Record) 54 + record := tangled.FeedStar{} 55 + err := json.Unmarshal(raw, &record) 56 + if err != nil { 57 + log.Println("invalid record") 58 + return err 59 + } 60 + 61 + subjectUri, err = syntax.ParseATURI(record.Subject) 62 + if err != nil { 63 + log.Println("invalid record") 64 + return err 65 + } 66 + err = db.AddStar(d, did, subjectUri, e.Commit.RKey) 67 + case models.CommitOperationDelete: 68 + err = db.DeleteStarByRkey(d, did, e.Commit.RKey) 69 + } 70 + 71 + if err != nil { 72 + return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 73 + } 74 + 75 + return nil 76 + } 77 + 78 + func ingestFollow(d *db.DbWrapper, e *models.Event) error { 79 + var err error 80 + did := e.Did 81 + 82 + switch e.Commit.Operation { 83 + case models.CommitOperationCreate, models.CommitOperationUpdate: 84 + raw := json.RawMessage(e.Commit.Record) 85 + record := tangled.GraphFollow{} 86 + err = json.Unmarshal(raw, &record) 87 + if err != nil { 88 + log.Println("invalid record") 89 + return err 90 + } 91 + 92 + subjectDid := record.Subject 93 + err = db.AddFollow(d, did, subjectDid, e.Commit.RKey) 94 + case models.CommitOperationDelete: 95 + err = db.DeleteFollowByRkey(d, did, e.Commit.RKey) 96 + } 97 + 98 + if err != nil { 99 + return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 100 + } 101 + 102 + return nil 103 + } 104 + 105 + func ingestPublicKey(d *db.DbWrapper, e *models.Event) error { 106 + did := e.Did 107 + var err error 108 + 109 + switch e.Commit.Operation { 110 + case models.CommitOperationCreate, models.CommitOperationUpdate: 111 + log.Println("processing add of pubkey") 112 + raw := json.RawMessage(e.Commit.Record) 113 + record := tangled.PublicKey{} 114 + err = json.Unmarshal(raw, &record) 115 + if err != nil { 116 + log.Printf("invalid record: %s", err) 117 + return err 118 + } 119 + 120 + name := record.Name 121 + key := record.Key 122 + err = db.AddPublicKey(d, did, name, key, e.Commit.RKey) 123 + case models.CommitOperationDelete: 124 + log.Println("processing delete of pubkey") 125 + err = db.DeletePublicKeyByRkey(d, did, e.Commit.RKey) 126 + } 127 + 128 + if err != nil { 129 + return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 130 + } 131 + 132 + return nil 133 + }
-70
appview/state/jetstream.go
··· 1 - package state 2 - 3 - import ( 4 - "context" 5 - "encoding/json" 6 - "fmt" 7 - "log" 8 - 9 - "github.com/bluesky-social/indigo/atproto/syntax" 10 - "github.com/bluesky-social/jetstream/pkg/models" 11 - tangled "tangled.sh/tangled.sh/core/api/tangled" 12 - "tangled.sh/tangled.sh/core/appview/db" 13 - ) 14 - 15 - type Ingester func(ctx context.Context, e *models.Event) error 16 - 17 - func jetstreamIngester(d db.DbWrapper) Ingester { 18 - return func(ctx context.Context, e *models.Event) error { 19 - var err error 20 - defer func() { 21 - eventTime := e.TimeUS 22 - lastTimeUs := eventTime + 1 23 - if err := d.SaveLastTimeUs(lastTimeUs); err != nil { 24 - err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 25 - } 26 - }() 27 - 28 - if e.Kind != models.EventKindCommit { 29 - return nil 30 - } 31 - 32 - did := e.Did 33 - raw := json.RawMessage(e.Commit.Record) 34 - 35 - switch e.Commit.Collection { 36 - case tangled.GraphFollowNSID: 37 - record := tangled.GraphFollow{} 38 - err := json.Unmarshal(raw, &record) 39 - if err != nil { 40 - log.Println("invalid record") 41 - return err 42 - } 43 - err = db.AddFollow(d, did, record.Subject, e.Commit.RKey) 44 - if err != nil { 45 - return fmt.Errorf("failed to add follow to db: %w", err) 46 - } 47 - case tangled.FeedStarNSID: 48 - record := tangled.FeedStar{} 49 - err := json.Unmarshal(raw, &record) 50 - if err != nil { 51 - log.Println("invalid record") 52 - return err 53 - } 54 - 55 - subjectUri, err := syntax.ParseATURI(record.Subject) 56 - 57 - if err != nil { 58 - log.Println("invalid record") 59 - return err 60 - } 61 - 62 - err = db.AddStar(d, did, subjectUri, e.Commit.RKey) 63 - if err != nil { 64 - return fmt.Errorf("failed to add follow to db: %w", err) 65 - } 66 - } 67 - 68 - return err 69 - } 70 - }
···
+2 -2
appview/state/state.go
··· 63 jc, err := jetstream.NewJetstreamClient( 64 config.JetstreamEndpoint, 65 "appview", 66 - []string{tangled.GraphFollowNSID, tangled.FeedStarNSID}, 67 nil, 68 slog.Default(), 69 wrapper, ··· 72 if err != nil { 73 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 74 } 75 - err = jc.StartJetstream(context.Background(), jetstreamIngester(wrapper)) 76 if err != nil { 77 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 78 }
··· 63 jc, err := jetstream.NewJetstreamClient( 64 config.JetstreamEndpoint, 65 "appview", 66 + []string{tangled.GraphFollowNSID, tangled.FeedStarNSID, tangled.PublicKeyNSID}, 67 nil, 68 slog.Default(), 69 wrapper, ··· 72 if err != nil { 73 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 74 } 75 + err = jc.StartJetstream(context.Background(), appview.Ingest(wrapper)) 76 if err != nil { 77 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 78 }
+1
flake.nix
··· 173 TANGLED_DEV=true ${pkgs.air}/bin/air -c /dev/null \ 174 -build.cmd "${pkgs.tailwindcss}/bin/tailwindcss -i input.css -o ./appview/pages/static/tw.css && ${pkgs.go}/bin/go build -o ./out/${name}.out ./cmd/${name}/main.go" \ 175 -build.bin "./out/${name}.out" \ 176 -build.include_ext "go" 177 ''; 178 tailwind-watcher =
··· 173 TANGLED_DEV=true ${pkgs.air}/bin/air -c /dev/null \ 174 -build.cmd "${pkgs.tailwindcss}/bin/tailwindcss -i input.css -o ./appview/pages/static/tw.css && ${pkgs.go}/bin/go build -o ./out/${name}.out ./cmd/${name}/main.go" \ 175 -build.bin "./out/${name}.out" \ 176 + -build.stop_on_error "true" \ 177 -build.include_ext "go" 178 ''; 179 tailwind-watcher =