Monorepo for Tangled tangled.org

appview: upsert star/reaction/follow records #998

open opened by boltless.me targeting master from sl/sqkrqopzkvoo

Most service flow will be:

  1. start db transaction
  2. run db operation
  3. run PDS operation
  4. rollback db if anything above failed
  5. commit transaction

If PDS operation succeed, don't try rollback anymore. The ingester will backfill the missed db operations.

Signed-off-by: Seongmin Lee git@boltless.me

Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:xasnlahkri4ewmbuzly2rlc5/sh.tangled.repo.pull/3mcsfwwhtvl22
+143 -63
Diff #0
+12 -3
appview/db/follow.go
··· 11 11 "tangled.org/core/orm" 12 12 ) 13 13 14 - func AddFollow(e Execer, follow *models.Follow) error { 15 - query := `insert or ignore into follows (did, subject_did, rkey) values (?, ?, ?)` 16 - _, err := e.Exec(query, follow.UserDid, follow.SubjectDid, follow.Rkey) 14 + func UpsertFollow(e Execer, follow models.Follow) error { 15 + _, err := e.Exec( 16 + `insert into follows (did, rkey, subject_did, created) 17 + values (?, ?, ?, ?) 18 + on conflict(did, rkey) do update set 19 + subject_did = excluded.subject_did, 20 + created = excluded.created`, 21 + follow.UserDid, 22 + follow.Rkey, 23 + follow.SubjectDid, 24 + follow.FollowedAt.Format(time.RFC3339), 25 + ) 17 26 return err 18 27 } 19 28
+14 -3
appview/db/reaction.go
··· 9 9 "tangled.org/core/appview/models" 10 10 ) 11 11 12 - func AddReaction(e Execer, did string, subjectAt syntax.ATURI, kind models.ReactionKind, rkey string) error { 13 - query := `insert or ignore into reactions (did, subject_at, kind, rkey) values (?, ?, ?, ?)` 14 - _, err := e.Exec(query, did, subjectAt, kind, rkey) 12 + func UpsertReaction(e Execer, reaction models.Reaction) error { 13 + _, err := e.Exec( 14 + `insert into reactions (did, rkey, subject_at, kind, created) 15 + values (?, ?, ?, ?, ?) 16 + on conflict(did, rkey) do update set 17 + subject_at = excluded.subject_at, 18 + kind = excluded.kind, 19 + created = excluded.created`, 20 + reaction.ReactedByDid, 21 + reaction.Rkey, 22 + reaction.ThreadAt, 23 + reaction.Kind, 24 + reaction.Created.Format(time.RFC3339), 25 + ) 15 26 return err 16 27 } 17 28
+8 -4
appview/db/star.go
··· 13 13 "tangled.org/core/orm" 14 14 ) 15 15 16 - func AddStar(e Execer, star *models.Star) error { 17 - query := `insert or ignore into stars (did, subject_at, rkey) values (?, ?, ?)` 16 + func UpsertStar(e Execer, star models.Star) error { 18 17 _, err := e.Exec( 19 - query, 18 + `insert into stars (did, rkey, subject_at, created) 19 + values (?, ?, ?, ?) 20 + on conflict(did, rkey) do update set 21 + subject_at = excluded.subject_at, 22 + created = excluded.created`, 20 23 star.Did, 21 - star.RepoAt.String(), 22 24 star.Rkey, 25 + star.RepoAt, 26 + star.Created.Format(time.RFC3339), 23 27 ) 24 28 return err 25 29 }
+2 -2
appview/ingester.go
··· 119 119 l.Error("invalid record", "err", err) 120 120 return err 121 121 } 122 - err = db.AddStar(i.Db, &models.Star{ 122 + err = db.UpsertStar(i.Db, models.Star{ 123 123 Did: did, 124 124 RepoAt: subjectUri, 125 125 Rkey: e.Commit.RKey, ··· 152 152 return err 153 153 } 154 154 155 - err = db.AddFollow(i.Db, &models.Follow{ 155 + err = db.UpsertFollow(i.Db, models.Follow{ 156 156 UserDid: did, 157 157 SubjectDid: record.Subject, 158 158 Rkey: e.Commit.RKey,
+9
appview/models/follow.go
··· 2 2 3 3 import ( 4 4 "time" 5 + 6 + "tangled.org/core/api/tangled" 5 7 ) 6 8 7 9 type Follow struct { ··· 11 13 Rkey string 12 14 } 13 15 16 + func (f *Follow) AsRecord() tangled.GraphFollow { 17 + return tangled.GraphFollow{ 18 + Subject: f.SubjectDid, 19 + CreatedAt: f.FollowedAt.Format(time.RFC3339), 20 + } 21 + } 22 + 14 23 type FollowStats struct { 15 24 Followers int64 16 25 Following int64
+9
appview/models/reaction.go
··· 4 4 "time" 5 5 6 6 "github.com/bluesky-social/indigo/atproto/syntax" 7 + "tangled.org/core/api/tangled" 7 8 ) 8 9 9 10 type ReactionKind string ··· 56 57 Kind ReactionKind 57 58 } 58 59 60 + func (r *Reaction) AsRecord() tangled.FeedReaction { 61 + return tangled.FeedReaction{ 62 + Subject: r.ThreadAt.String(), 63 + Reaction: r.Kind.String(), 64 + CreatedAt: r.Created.Format(time.RFC3339), 65 + } 66 + } 67 + 59 68 type ReactionDisplayData struct { 60 69 Count int 61 70 Users []string
+8
appview/models/star.go
··· 4 4 "time" 5 5 6 6 "github.com/bluesky-social/indigo/atproto/syntax" 7 + "tangled.org/core/api/tangled" 7 8 ) 8 9 9 10 type Star struct { ··· 13 14 Rkey string 14 15 } 15 16 17 + func (s *Star) AsRecord() tangled.FeedStar { 18 + return tangled.FeedStar{ 19 + Subject: s.RepoAt.String(), 20 + CreatedAt: s.Created.Format(time.RFC3339), 21 + } 22 + } 23 + 16 24 // RepoStar is used for reverse mapping to repos 17 25 type RepoStar struct { 18 26 Star
+27 -19
appview/state/follow.go
··· 43 43 44 44 switch r.Method { 45 45 case http.MethodPost: 46 - createdAt := time.Now().Format(time.RFC3339) 47 - rkey := tid.TID() 46 + follow := models.Follow{ 47 + UserDid: currentUser.Active.Did, 48 + SubjectDid: subjectIdent.DID.String(), 49 + Rkey: tid.TID(), 50 + FollowedAt: time.Now(), 51 + } 52 + 53 + tx, err := s.db.BeginTx(r.Context(), nil) 54 + if err != nil { 55 + s.logger.Error("failed to start transaction", "err", err) 56 + return 57 + } 58 + defer tx.Rollback() 59 + 60 + if err := db.UpsertFollow(tx, follow); err != nil { 61 + s.logger.Error("failed to follow", "err", err) 62 + return 63 + } 64 + 65 + record := follow.AsRecord() 48 66 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 49 67 Collection: tangled.GraphFollowNSID, 50 68 Repo: currentUser.Active.Did, 51 - Rkey: rkey, 69 + Rkey: follow.Rkey, 52 70 Record: &lexutil.LexiconTypeDecoder{ 53 - Val: &tangled.GraphFollow{ 54 - Subject: subjectIdent.DID.String(), 55 - CreatedAt: createdAt, 56 - }}, 71 + Val: &record, 72 + }, 57 73 }) 58 74 if err != nil { 59 75 log.Println("failed to create atproto record", err) 60 76 return 61 77 } 62 - 63 78 log.Println("created atproto record: ", resp.Uri) 64 79 65 - follow := &models.Follow{ 66 - UserDid: currentUser.Active.Did, 67 - SubjectDid: subjectIdent.DID.String(), 68 - Rkey: rkey, 69 - } 70 - 71 - err = db.AddFollow(s.db, follow) 72 - if err != nil { 73 - log.Println("failed to follow", err) 74 - return 80 + if err := tx.Commit(); err != nil { 81 + s.logger.Error("failed to commit transaction", "err", err) 82 + // DB op failed but record is created in PDS. Ingester will backfill the missed operation 75 83 } 76 84 77 - s.notifier.NewFollow(r.Context(), follow) 85 + s.notifier.NewFollow(r.Context(), &follow) 78 86 79 87 followStats, err := db.GetFollowerFollowingCount(s.db, subjectIdent.DID.String()) 80 88 if err != nil {
+27 -14
appview/state/reaction.go
··· 45 45 46 46 switch r.Method { 47 47 case http.MethodPost: 48 - createdAt := time.Now().Format(time.RFC3339) 49 - rkey := tid.TID() 48 + reaction := models.Reaction{ 49 + ReactedByDid: currentUser.Active.Did, 50 + Rkey: tid.TID(), 51 + Kind: reactionKind, 52 + ThreadAt: subjectUri, 53 + Created: time.Now(), 54 + } 55 + 56 + tx, err := s.db.BeginTx(r.Context(), nil) 57 + if err != nil { 58 + s.logger.Error("failed to start transaction", "err", err) 59 + return 60 + } 61 + defer tx.Rollback() 62 + 63 + if err := db.UpsertReaction(tx, reaction); err != nil { 64 + log.Println("failed to react", err) 65 + return 66 + } 67 + 68 + record := reaction.AsRecord() 50 69 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 51 70 Collection: tangled.FeedReactionNSID, 52 71 Repo: currentUser.Active.Did, 53 - Rkey: rkey, 72 + Rkey: reaction.Rkey, 54 73 Record: &lexutil.LexiconTypeDecoder{ 55 - Val: &tangled.FeedReaction{ 56 - Subject: subjectUri.String(), 57 - Reaction: reactionKind.String(), 58 - CreatedAt: createdAt, 59 - }, 74 + Val: &record, 60 75 }, 61 76 }) 62 77 if err != nil { 63 78 log.Println("failed to create atproto record", err) 64 79 return 65 80 } 81 + log.Println("created atproto record: ", resp.Uri) 66 82 67 - err = db.AddReaction(s.db, currentUser.Active.Did, subjectUri, reactionKind, rkey) 68 - if err != nil { 69 - log.Println("failed to react", err) 70 - return 83 + if err := tx.Commit(); err != nil { 84 + s.logger.Error("failed to commit transaction", "err", err) 85 + // DB op failed but record is created in PDS. Ingester will backfill the missed operation 71 86 } 72 87 73 88 reactionMap, err := db.GetReactionMap(s.db, 20, subjectUri) ··· 75 90 log.Println("failed to get reactions for ", subjectUri) 76 91 } 77 92 78 - log.Println("created atproto record: ", resp.Uri) 79 - 80 93 s.pages.ThreadReactionFragment(w, pages.ThreadReactionFragmentParams{ 81 94 ThreadAt: subjectUri, 82 95 Kind: reactionKind,
+27 -18
appview/state/star.go
··· 38 38 39 39 switch r.Method { 40 40 case http.MethodPost: 41 - createdAt := time.Now().Format(time.RFC3339) 42 - rkey := tid.TID() 41 + star := models.Star{ 42 + Did: currentUser.Active.Did, 43 + Rkey: tid.TID(), 44 + RepoAt: subjectUri, 45 + Created: time.Now(), 46 + } 47 + 48 + tx, err := s.db.BeginTx(r.Context(), nil) 49 + if err != nil { 50 + s.logger.Error("failed to start transaction", "err", err) 51 + return 52 + } 53 + defer tx.Rollback() 54 + 55 + if err := db.UpsertStar(tx, star); err != nil { 56 + s.logger.Error("failed to star", "err", err) 57 + return 58 + } 59 + 60 + record := star.AsRecord() 43 61 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 44 62 Collection: tangled.FeedStarNSID, 45 63 Repo: currentUser.Active.Did, 46 - Rkey: rkey, 64 + Rkey: star.Rkey, 47 65 Record: &lexutil.LexiconTypeDecoder{ 48 - Val: &tangled.FeedStar{ 49 - Subject: subjectUri.String(), 50 - CreatedAt: createdAt, 51 - }}, 66 + Val: &record, 67 + }, 52 68 }) 53 69 if err != nil { 54 70 log.Println("failed to create atproto record", err) ··· 56 72 } 57 73 log.Println("created atproto record: ", resp.Uri) 58 74 59 - star := &models.Star{ 60 - Did: currentUser.Active.Did, 61 - RepoAt: subjectUri, 62 - Rkey: rkey, 75 + if err := tx.Commit(); err != nil { 76 + s.logger.Error("failed to commit transaction", "err", err) 77 + // DB op failed but record is created in PDS. Ingester will backfill the missed operation 63 78 } 64 79 65 - err = db.AddStar(s.db, star) 66 - if err != nil { 67 - log.Println("failed to star", err) 68 - return 69 - } 80 + s.notifier.NewStar(r.Context(), &star) 70 81 71 82 starCount, err := db.GetStarCount(s.db, subjectUri) 72 83 if err != nil { 73 84 log.Println("failed to get star count for ", subjectUri) 74 85 } 75 86 76 - s.notifier.NewStar(r.Context(), star) 77 - 78 87 s.pages.StarBtnFragment(w, pages.StarBtnFragmentParams{ 79 88 IsStarred: true, 80 89 SubjectAt: subjectUri,

Submissions

sign up or login to add to the discussion
boltless.me submitted #0
1 commit
expand
appview: upsert star/reaction/follow records
3/3 success
expand
no conflicts, ready to merge