Monorepo for Tangled tangled.org

appview: upsert star/reaction/follow records #998

open opened by boltless.me targeting master from sl/sqkrqopzkvoo

Most service flow will be:

  1. start db transaction
  2. run db operation
  3. run PDS operation
  4. rollback db if anything above failed
  5. commit transaction

If PDS operation succeed, don't try rollback anymore. The ingester will backfill the missed db operations.

Signed-off-by: Seongmin Lee git@boltless.me

Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:xasnlahkri4ewmbuzly2rlc5/sh.tangled.repo.pull/3mcsfwwhtvl22
+143 -63
Diff #1
+12 -3
appview/db/follow.go
··· 11 "tangled.org/core/orm" 12 ) 13 14 - func AddFollow(e Execer, follow *models.Follow) error { 15 - query := `insert or ignore into follows (did, subject_did, rkey) values (?, ?, ?)` 16 - _, err := e.Exec(query, follow.UserDid, follow.SubjectDid, follow.Rkey) 17 return err 18 } 19
··· 11 "tangled.org/core/orm" 12 ) 13 14 + func UpsertFollow(e Execer, follow models.Follow) error { 15 + _, err := e.Exec( 16 + `insert into follows (did, rkey, subject_did, created) 17 + values (?, ?, ?, ?) 18 + on conflict(did, rkey) do update set 19 + subject_did = excluded.subject_did, 20 + created = excluded.created`, 21 + follow.UserDid, 22 + follow.Rkey, 23 + follow.SubjectDid, 24 + follow.FollowedAt.Format(time.RFC3339), 25 + ) 26 return err 27 } 28
+14 -3
appview/db/reaction.go
··· 9 "tangled.org/core/appview/models" 10 ) 11 12 - func AddReaction(e Execer, did string, subjectAt syntax.ATURI, kind models.ReactionKind, rkey string) error { 13 - query := `insert or ignore into reactions (did, subject_at, kind, rkey) values (?, ?, ?, ?)` 14 - _, err := e.Exec(query, did, subjectAt, kind, rkey) 15 return err 16 } 17
··· 9 "tangled.org/core/appview/models" 10 ) 11 12 + func UpsertReaction(e Execer, reaction models.Reaction) error { 13 + _, err := e.Exec( 14 + `insert into reactions (did, rkey, subject_at, kind, created) 15 + values (?, ?, ?, ?, ?) 16 + on conflict(did, rkey) do update set 17 + subject_at = excluded.subject_at, 18 + kind = excluded.kind, 19 + created = excluded.created`, 20 + reaction.ReactedByDid, 21 + reaction.Rkey, 22 + reaction.ThreadAt, 23 + reaction.Kind, 24 + reaction.Created.Format(time.RFC3339), 25 + ) 26 return err 27 } 28
+8 -4
appview/db/star.go
··· 13 "tangled.org/core/orm" 14 ) 15 16 - func AddStar(e Execer, star *models.Star) error { 17 - query := `insert or ignore into stars (did, subject_at, rkey) values (?, ?, ?)` 18 _, err := e.Exec( 19 - query, 20 star.Did, 21 - star.RepoAt.String(), 22 star.Rkey, 23 ) 24 return err 25 }
··· 13 "tangled.org/core/orm" 14 ) 15 16 + func UpsertStar(e Execer, star models.Star) error { 17 _, err := e.Exec( 18 + `insert into stars (did, rkey, subject_at, created) 19 + values (?, ?, ?, ?) 20 + on conflict(did, rkey) do update set 21 + subject_at = excluded.subject_at, 22 + created = excluded.created`, 23 star.Did, 24 star.Rkey, 25 + star.RepoAt, 26 + star.Created.Format(time.RFC3339), 27 ) 28 return err 29 }
+2 -2
appview/ingester.go
··· 119 l.Error("invalid record", "err", err) 120 return err 121 } 122 - err = db.AddStar(i.Db, &models.Star{ 123 Did: did, 124 RepoAt: subjectUri, 125 Rkey: e.Commit.RKey, ··· 152 return err 153 } 154 155 - err = db.AddFollow(i.Db, &models.Follow{ 156 UserDid: did, 157 SubjectDid: record.Subject, 158 Rkey: e.Commit.RKey,
··· 119 l.Error("invalid record", "err", err) 120 return err 121 } 122 + err = db.UpsertStar(i.Db, models.Star{ 123 Did: did, 124 RepoAt: subjectUri, 125 Rkey: e.Commit.RKey, ··· 152 return err 153 } 154 155 + err = db.UpsertFollow(i.Db, models.Follow{ 156 UserDid: did, 157 SubjectDid: record.Subject, 158 Rkey: e.Commit.RKey,
+9
appview/models/follow.go
··· 2 3 import ( 4 "time" 5 ) 6 7 type Follow struct { ··· 11 Rkey string 12 } 13 14 type FollowStats struct { 15 Followers int64 16 Following int64
··· 2 3 import ( 4 "time" 5 + 6 + "tangled.org/core/api/tangled" 7 ) 8 9 type Follow struct { ··· 13 Rkey string 14 } 15 16 + func (f *Follow) AsRecord() tangled.GraphFollow { 17 + return tangled.GraphFollow{ 18 + Subject: f.SubjectDid, 19 + CreatedAt: f.FollowedAt.Format(time.RFC3339), 20 + } 21 + } 22 + 23 type FollowStats struct { 24 Followers int64 25 Following int64
+9
appview/models/reaction.go
··· 4 "time" 5 6 "github.com/bluesky-social/indigo/atproto/syntax" 7 ) 8 9 type ReactionKind string ··· 56 Kind ReactionKind 57 } 58 59 type ReactionDisplayData struct { 60 Count int 61 Users []string
··· 4 "time" 5 6 "github.com/bluesky-social/indigo/atproto/syntax" 7 + "tangled.org/core/api/tangled" 8 ) 9 10 type ReactionKind string ··· 57 Kind ReactionKind 58 } 59 60 + func (r *Reaction) AsRecord() tangled.FeedReaction { 61 + return tangled.FeedReaction{ 62 + Subject: r.ThreadAt.String(), 63 + Reaction: r.Kind.String(), 64 + CreatedAt: r.Created.Format(time.RFC3339), 65 + } 66 + } 67 + 68 type ReactionDisplayData struct { 69 Count int 70 Users []string
+8
appview/models/star.go
··· 4 "time" 5 6 "github.com/bluesky-social/indigo/atproto/syntax" 7 ) 8 9 type Star struct { ··· 13 Rkey string 14 } 15 16 // RepoStar is used for reverse mapping to repos 17 type RepoStar struct { 18 Star
··· 4 "time" 5 6 "github.com/bluesky-social/indigo/atproto/syntax" 7 + "tangled.org/core/api/tangled" 8 ) 9 10 type Star struct { ··· 14 Rkey string 15 } 16 17 + func (s *Star) AsRecord() tangled.FeedStar { 18 + return tangled.FeedStar{ 19 + Subject: s.RepoAt.String(), 20 + CreatedAt: s.Created.Format(time.RFC3339), 21 + } 22 + } 23 + 24 // RepoStar is used for reverse mapping to repos 25 type RepoStar struct { 26 Star
+27 -19
appview/state/follow.go
··· 43 44 switch r.Method { 45 case http.MethodPost: 46 - createdAt := time.Now().Format(time.RFC3339) 47 - rkey := tid.TID() 48 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 49 Collection: tangled.GraphFollowNSID, 50 Repo: currentUser.Active.Did, 51 - Rkey: rkey, 52 Record: &lexutil.LexiconTypeDecoder{ 53 - Val: &tangled.GraphFollow{ 54 - Subject: subjectIdent.DID.String(), 55 - CreatedAt: createdAt, 56 - }}, 57 }) 58 if err != nil { 59 log.Println("failed to create atproto record", err) 60 return 61 } 62 - 63 log.Println("created atproto record: ", resp.Uri) 64 65 - follow := &models.Follow{ 66 - UserDid: currentUser.Active.Did, 67 - SubjectDid: subjectIdent.DID.String(), 68 - Rkey: rkey, 69 - } 70 - 71 - err = db.AddFollow(s.db, follow) 72 - if err != nil { 73 - log.Println("failed to follow", err) 74 - return 75 } 76 77 - s.notifier.NewFollow(r.Context(), follow) 78 79 followStats, err := db.GetFollowerFollowingCount(s.db, subjectIdent.DID.String()) 80 if err != nil {
··· 43 44 switch r.Method { 45 case http.MethodPost: 46 + follow := models.Follow{ 47 + UserDid: currentUser.Active.Did, 48 + SubjectDid: subjectIdent.DID.String(), 49 + Rkey: tid.TID(), 50 + FollowedAt: time.Now(), 51 + } 52 + 53 + tx, err := s.db.BeginTx(r.Context(), nil) 54 + if err != nil { 55 + s.logger.Error("failed to start transaction", "err", err) 56 + return 57 + } 58 + defer tx.Rollback() 59 + 60 + if err := db.UpsertFollow(tx, follow); err != nil { 61 + s.logger.Error("failed to follow", "err", err) 62 + return 63 + } 64 + 65 + record := follow.AsRecord() 66 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 67 Collection: tangled.GraphFollowNSID, 68 Repo: currentUser.Active.Did, 69 + Rkey: follow.Rkey, 70 Record: &lexutil.LexiconTypeDecoder{ 71 + Val: &record, 72 + }, 73 }) 74 if err != nil { 75 log.Println("failed to create atproto record", err) 76 return 77 } 78 log.Println("created atproto record: ", resp.Uri) 79 80 + if err := tx.Commit(); err != nil { 81 + s.logger.Error("failed to commit transaction", "err", err) 82 + // DB op failed but record is created in PDS. Ingester will backfill the missed operation 83 } 84 85 + s.notifier.NewFollow(r.Context(), &follow) 86 87 followStats, err := db.GetFollowerFollowingCount(s.db, subjectIdent.DID.String()) 88 if err != nil {
+27 -14
appview/state/reaction.go
··· 45 46 switch r.Method { 47 case http.MethodPost: 48 - createdAt := time.Now().Format(time.RFC3339) 49 - rkey := tid.TID() 50 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 51 Collection: tangled.FeedReactionNSID, 52 Repo: currentUser.Active.Did, 53 - Rkey: rkey, 54 Record: &lexutil.LexiconTypeDecoder{ 55 - Val: &tangled.FeedReaction{ 56 - Subject: subjectUri.String(), 57 - Reaction: reactionKind.String(), 58 - CreatedAt: createdAt, 59 - }, 60 }, 61 }) 62 if err != nil { 63 log.Println("failed to create atproto record", err) 64 return 65 } 66 67 - err = db.AddReaction(s.db, currentUser.Active.Did, subjectUri, reactionKind, rkey) 68 - if err != nil { 69 - log.Println("failed to react", err) 70 - return 71 } 72 73 reactionMap, err := db.GetReactionMap(s.db, 20, subjectUri) ··· 75 log.Println("failed to get reactions for ", subjectUri) 76 } 77 78 - log.Println("created atproto record: ", resp.Uri) 79 - 80 s.pages.ThreadReactionFragment(w, pages.ThreadReactionFragmentParams{ 81 ThreadAt: subjectUri, 82 Kind: reactionKind,
··· 45 46 switch r.Method { 47 case http.MethodPost: 48 + reaction := models.Reaction{ 49 + ReactedByDid: currentUser.Active.Did, 50 + Rkey: tid.TID(), 51 + Kind: reactionKind, 52 + ThreadAt: subjectUri, 53 + Created: time.Now(), 54 + } 55 + 56 + tx, err := s.db.BeginTx(r.Context(), nil) 57 + if err != nil { 58 + s.logger.Error("failed to start transaction", "err", err) 59 + return 60 + } 61 + defer tx.Rollback() 62 + 63 + if err := db.UpsertReaction(tx, reaction); err != nil { 64 + log.Println("failed to react", err) 65 + return 66 + } 67 + 68 + record := reaction.AsRecord() 69 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 70 Collection: tangled.FeedReactionNSID, 71 Repo: currentUser.Active.Did, 72 + Rkey: reaction.Rkey, 73 Record: &lexutil.LexiconTypeDecoder{ 74 + Val: &record, 75 }, 76 }) 77 if err != nil { 78 log.Println("failed to create atproto record", err) 79 return 80 } 81 + log.Println("created atproto record: ", resp.Uri) 82 83 + if err := tx.Commit(); err != nil { 84 + s.logger.Error("failed to commit transaction", "err", err) 85 + // DB op failed but record is created in PDS. Ingester will backfill the missed operation 86 } 87 88 reactionMap, err := db.GetReactionMap(s.db, 20, subjectUri) ··· 90 log.Println("failed to get reactions for ", subjectUri) 91 } 92 93 s.pages.ThreadReactionFragment(w, pages.ThreadReactionFragmentParams{ 94 ThreadAt: subjectUri, 95 Kind: reactionKind,
+27 -18
appview/state/star.go
··· 38 39 switch r.Method { 40 case http.MethodPost: 41 - createdAt := time.Now().Format(time.RFC3339) 42 - rkey := tid.TID() 43 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 44 Collection: tangled.FeedStarNSID, 45 Repo: currentUser.Active.Did, 46 - Rkey: rkey, 47 Record: &lexutil.LexiconTypeDecoder{ 48 - Val: &tangled.FeedStar{ 49 - Subject: subjectUri.String(), 50 - CreatedAt: createdAt, 51 - }}, 52 }) 53 if err != nil { 54 log.Println("failed to create atproto record", err) ··· 56 } 57 log.Println("created atproto record: ", resp.Uri) 58 59 - star := &models.Star{ 60 - Did: currentUser.Active.Did, 61 - RepoAt: subjectUri, 62 - Rkey: rkey, 63 } 64 65 - err = db.AddStar(s.db, star) 66 - if err != nil { 67 - log.Println("failed to star", err) 68 - return 69 - } 70 71 starCount, err := db.GetStarCount(s.db, subjectUri) 72 if err != nil { 73 log.Println("failed to get star count for ", subjectUri) 74 } 75 76 - s.notifier.NewStar(r.Context(), star) 77 - 78 s.pages.StarBtnFragment(w, pages.StarBtnFragmentParams{ 79 IsStarred: true, 80 SubjectAt: subjectUri,
··· 38 39 switch r.Method { 40 case http.MethodPost: 41 + star := models.Star{ 42 + Did: currentUser.Active.Did, 43 + Rkey: tid.TID(), 44 + RepoAt: subjectUri, 45 + Created: time.Now(), 46 + } 47 + 48 + tx, err := s.db.BeginTx(r.Context(), nil) 49 + if err != nil { 50 + s.logger.Error("failed to start transaction", "err", err) 51 + return 52 + } 53 + defer tx.Rollback() 54 + 55 + if err := db.UpsertStar(tx, star); err != nil { 56 + s.logger.Error("failed to star", "err", err) 57 + return 58 + } 59 + 60 + record := star.AsRecord() 61 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 62 Collection: tangled.FeedStarNSID, 63 Repo: currentUser.Active.Did, 64 + Rkey: star.Rkey, 65 Record: &lexutil.LexiconTypeDecoder{ 66 + Val: &record, 67 + }, 68 }) 69 if err != nil { 70 log.Println("failed to create atproto record", err) ··· 72 } 73 log.Println("created atproto record: ", resp.Uri) 74 75 + if err := tx.Commit(); err != nil { 76 + s.logger.Error("failed to commit transaction", "err", err) 77 + // DB op failed but record is created in PDS. Ingester will backfill the missed operation 78 } 79 80 + s.notifier.NewStar(r.Context(), &star) 81 82 starCount, err := db.GetStarCount(s.db, subjectUri) 83 if err != nil { 84 log.Println("failed to get star count for ", subjectUri) 85 } 86 87 s.pages.StarBtnFragment(w, pages.StarBtnFragmentParams{ 88 IsStarred: true, 89 SubjectAt: subjectUri,

History

2 rounds 0 comments
sign up or login to add to the discussion
1 commit
expand
appview: upsert star/reaction/follow records
2/3 failed, 1/3 success
expand
merge conflicts detected
expand
  • appview/pages/markup/markdown.go:47
  • appview/pages/pages.go:57
  • appview/validator/label.go:1
expand 0 comments
1 commit
expand
appview: upsert star/reaction/follow records
3/3 success
expand
expand 0 comments