Monorepo for Tangled tangled.org

appview: setup jetstream client

anirudh.fi 41e895e2 1a81770e

verified
Changed files
+64 -11
appview
+4
appview/db/db.go
··· 48 48 primary key (user_did, subject_did), 49 49 check (user_did <> subject_did) 50 50 ); 51 + create table if not exists _jetstream ( 52 + id integer primary key autoincrement, 53 + last_time_us integer not null 54 + ); 51 55 `) 52 56 if err != nil { 53 57 return nil, err
+5 -5
appview/db/follow.go
··· 9 9 UserDid string 10 10 SubjectDid string 11 11 FollowedAt *time.Time 12 - AtUri string 12 + RKey string 13 13 } 14 14 15 - func (d *DB) AddFollow(userDid, subjectDid, atUri string) error { 16 - query := `insert into follows (user_did, subject_did, at_uri) values (?, ?, ?)` 17 - _, err := d.db.Exec(query, userDid, subjectDid, atUri) 15 + func (d *DB) AddFollow(userDid, subjectDid, rkey string) error { 16 + query := `insert into follows (user_did, subject_did, rkey) values (?, ?, ?)` 17 + _, err := d.db.Exec(query, userDid, subjectDid, rkey) 18 18 return err 19 19 } 20 20 ··· 25 25 26 26 var follow Follow 27 27 var followedAt string 28 - err := row.Scan(&follow.UserDid, &follow.SubjectDid, &followedAt, &follow.AtUri) 28 + err := row.Scan(&follow.UserDid, &follow.SubjectDid, &followedAt, &follow.RKey) 29 29 if err != nil { 30 30 return nil, err 31 31 }
+13
appview/db/jetstream.go
··· 1 + package db 2 + 3 + func (d *DB) SaveLastTimeUs(lastTimeUs int64) error { 4 + _, err := d.db.Exec(`insert into _jetstream (last_time_us) values (?)`, lastTimeUs) 5 + return err 6 + } 7 + 8 + func (d *DB) GetLastTimeUs() (int64, error) { 9 + var lastTimeUs int64 10 + row := d.db.QueryRow(`select last_time_us from _jetstream`) 11 + err := row.Scan(&lastTimeUs) 12 + return lastTimeUs, err 13 + }
+42 -6
appview/state/state.go
··· 1 1 package state 2 2 3 3 import ( 4 + "context" 4 5 "crypto/hmac" 5 6 "crypto/sha256" 6 7 "encoding/hex" 8 + "encoding/json" 7 9 "fmt" 8 10 "log" 9 11 "net/http" ··· 14 16 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 17 "github.com/bluesky-social/indigo/atproto/syntax" 16 18 lexutil "github.com/bluesky-social/indigo/lex/util" 19 + "github.com/bluesky-social/jetstream/pkg/models" 17 20 "github.com/go-chi/chi/v5" 18 21 tangled "github.com/sotangled/tangled/api/tangled" 19 22 "github.com/sotangled/tangled/appview" 20 23 "github.com/sotangled/tangled/appview/auth" 21 24 "github.com/sotangled/tangled/appview/db" 22 25 "github.com/sotangled/tangled/appview/pages" 26 + "github.com/sotangled/tangled/jetstream" 23 27 "github.com/sotangled/tangled/rbac" 24 28 ) 25 29 ··· 30 34 tidClock *syntax.TIDClock 31 35 pages *pages.Pages 32 36 resolver *appview.Resolver 37 + jc *jetstream.JetstreamClient 33 38 } 34 39 35 40 func Make() (*State, error) { ··· 54 59 55 60 resolver := appview.NewResolver() 56 61 62 + jc, err := jetstream.NewJetstreamClient("appview", []string{tangled.GraphFollowNSID}, nil, db) 63 + if err != nil { 64 + return nil, fmt.Errorf("failed to create jetstream client: %w", err) 65 + } 66 + err = jc.StartJetstream(context.Background(), func(ctx context.Context, e *models.Event) error { 67 + did := e.Did 68 + raw := e.Commit.Record 69 + 70 + switch e.Commit.Collection { 71 + case tangled.GraphFollowNSID: 72 + record := tangled.GraphFollow{} 73 + err := json.Unmarshal(raw, &record) 74 + if err != nil { 75 + return err 76 + } 77 + err = db.AddFollow(did, record.Subject, e.Commit.RKey) 78 + if err != nil { 79 + return fmt.Errorf("failed to add follow to db: %w", err) 80 + } 81 + } 82 + 83 + return nil 84 + }) 85 + if err != nil { 86 + return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 87 + } 88 + 57 89 state := &State{ 58 90 db, 59 - auth, enforcer, clock, pgs, resolver, 91 + auth, 92 + enforcer, 93 + clock, 94 + pgs, 95 + resolver, 96 + jc, 60 97 } 61 98 62 99 return state, nil ··· 554 591 switch r.Method { 555 592 case http.MethodPost: 556 593 createdAt := time.Now().Format(time.RFC3339) 594 + rkey := s.TID() 557 595 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 558 596 Collection: tangled.GraphFollowNSID, 559 597 Repo: currentUser.Did, 560 - Rkey: s.TID(), 598 + Rkey: rkey, 561 599 Record: &lexutil.LexiconTypeDecoder{ 562 600 Val: &tangled.GraphFollow{ 563 601 Subject: subjectIdent.DID.String(), ··· 569 607 return 570 608 } 571 609 572 - err = s.db.AddFollow(currentUser.Did, subjectIdent.DID.String(), resp.Uri) 610 + err = s.db.AddFollow(currentUser.Did, subjectIdent.DID.String(), rkey) 573 611 if err != nil { 574 612 log.Println("failed to follow", err) 575 613 return ··· 587 625 return 588 626 } 589 627 590 - existingRecordUri, _ := syntax.ParseATURI(follow.AtUri) 591 - 592 628 resp, err := comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 593 629 Collection: tangled.GraphFollowNSID, 594 630 Repo: currentUser.Did, 595 - Rkey: existingRecordUri.RecordKey().String(), 631 + Rkey: follow.RKey, 596 632 }) 597 633 598 634 log.Println(resp.Commit.Cid)