Monorepo for Tangled tangled.org

appview/labels: add handlers to perform label operations

Signed-off-by: oppiliappan <me@oppi.li>

oppi.li debd92e8 1f95e691

verified
Changed files
+247
appview
labels
state
+240
appview/labels/labels.go
··· 1 + package labels 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "errors" 7 + "fmt" 8 + "log/slog" 9 + "net/http" 10 + "time" 11 + 12 + comatproto "github.com/bluesky-social/indigo/api/atproto" 13 + "github.com/bluesky-social/indigo/atproto/syntax" 14 + lexutil "github.com/bluesky-social/indigo/lex/util" 15 + "github.com/go-chi/chi/v5" 16 + 17 + "tangled.sh/tangled.sh/core/api/tangled" 18 + "tangled.sh/tangled.sh/core/appview/config" 19 + "tangled.sh/tangled.sh/core/appview/db" 20 + "tangled.sh/tangled.sh/core/appview/middleware" 21 + "tangled.sh/tangled.sh/core/appview/oauth" 22 + "tangled.sh/tangled.sh/core/appview/pages" 23 + "tangled.sh/tangled.sh/core/appview/reporesolver" 24 + "tangled.sh/tangled.sh/core/appview/xrpcclient" 25 + "tangled.sh/tangled.sh/core/eventconsumer" 26 + "tangled.sh/tangled.sh/core/idresolver" 27 + "tangled.sh/tangled.sh/core/log" 28 + "tangled.sh/tangled.sh/core/rbac" 29 + "tangled.sh/tangled.sh/core/tid" 30 + ) 31 + 32 + type Labels struct { 33 + repoResolver *reporesolver.RepoResolver 34 + idResolver *idresolver.Resolver 35 + oauth *oauth.OAuth 36 + pages *pages.Pages 37 + db *db.DB 38 + logger *slog.Logger 39 + } 40 + 41 + func New( 42 + oauth *oauth.OAuth, 43 + repoResolver *reporesolver.RepoResolver, 44 + pages *pages.Pages, 45 + spindlestream *eventconsumer.Consumer, 46 + idResolver *idresolver.Resolver, 47 + db *db.DB, 48 + config *config.Config, 49 + enforcer *rbac.Enforcer, 50 + ) *Labels { 51 + logger := log.New("labels") 52 + 53 + return &Labels{ 54 + oauth: oauth, 55 + repoResolver: repoResolver, 56 + pages: pages, 57 + idResolver: idResolver, 58 + db: db, 59 + logger: logger, 60 + } 61 + } 62 + 63 + func (l *Labels) Router(mw *middleware.Middleware) http.Handler { 64 + r := chi.NewRouter() 65 + 66 + r.With(middleware.AuthMiddleware(l.oauth)).Put("/perform", l.PerformLabelOp) 67 + 68 + return r 69 + } 70 + 71 + func (l *Labels) PerformLabelOp(w http.ResponseWriter, r *http.Request) { 72 + user := l.oauth.GetUser(r) 73 + 74 + if err := r.ParseForm(); err != nil { 75 + l.logger.Error("failed to parse form data", "error", err) 76 + http.Error(w, "Invalid form data", http.StatusBadRequest) 77 + return 78 + } 79 + 80 + did := user.Did 81 + rkey := tid.TID() 82 + performedAt := time.Now() 83 + indexedAt := time.Now() 84 + repoAt := r.Form.Get("repo") 85 + subjectUri := r.Form.Get("subject") 86 + keys := r.Form["operand-key"] 87 + vals := r.Form["operand-val"] 88 + 89 + var labelOps []db.LabelOp 90 + for i := range len(keys) { 91 + op := r.FormValue(fmt.Sprintf("op-%d", i)) 92 + if op == "" { 93 + op = string(db.LabelOperationDel) 94 + } 95 + key := keys[i] 96 + val := vals[i] 97 + 98 + labelOps = append(labelOps, db.LabelOp{ 99 + Did: did, 100 + Rkey: rkey, 101 + Subject: syntax.ATURI(subjectUri), 102 + Operation: db.LabelOperation(op), 103 + OperandKey: key, 104 + OperandValue: val, 105 + PerformedAt: performedAt, 106 + IndexedAt: indexedAt, 107 + }) 108 + } 109 + 110 + // TODO: validate the operations 111 + 112 + // find all the labels that this repo subscribes to 113 + repoLabels, err := db.GetRepoLabels(l.db, db.FilterEq("repo_at", repoAt)) 114 + if err != nil { 115 + http.Error(w, "Invalid form data", http.StatusBadRequest) 116 + return 117 + } 118 + 119 + var labelAts []string 120 + for _, rl := range repoLabels { 121 + labelAts = append(labelAts, rl.LabelAt.String()) 122 + } 123 + 124 + actx, err := db.NewLabelApplicationCtx(l.db, db.FilterIn("at_uri", labelAts)) 125 + if err != nil { 126 + http.Error(w, "Invalid form data", http.StatusBadRequest) 127 + return 128 + } 129 + 130 + // calculate the start state by applying already known labels 131 + existingOps, err := db.GetLabelOps(l.db, db.FilterEq("subject", subjectUri)) 132 + if err != nil { 133 + http.Error(w, "Invalid form data", http.StatusBadRequest) 134 + return 135 + } 136 + 137 + labelState := db.NewLabelState() 138 + actx.ApplyLabelOps(labelState, existingOps) 139 + 140 + // next, apply all ops introduced in this request and filter out ones that are no-ops 141 + validLabelOps := labelOps[:0] 142 + for _, op := range labelOps { 143 + if err = actx.ApplyLabelOp(labelState, op); err != db.LabelNoOpError { 144 + validLabelOps = append(validLabelOps, op) 145 + } 146 + } 147 + 148 + // nothing to do 149 + if len(validLabelOps) == 0 { 150 + l.pages.HxRefresh(w) 151 + return 152 + } 153 + 154 + // create an atproto record of valid ops 155 + record := db.LabelOpsAsRecord(validLabelOps) 156 + 157 + client, err := l.oauth.AuthorizedClient(r) 158 + if err != nil { 159 + l.logger.Error("failed to create client", "error", err) 160 + http.Error(w, "Invalid form data", http.StatusBadRequest) 161 + return 162 + } 163 + 164 + resp, err := client.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{ 165 + Collection: tangled.LabelOpNSID, 166 + Repo: did, 167 + Rkey: rkey, 168 + Record: &lexutil.LexiconTypeDecoder{ 169 + Val: &record, 170 + }, 171 + }) 172 + if err != nil { 173 + l.logger.Error("failed to write to PDS", "error", err) 174 + http.Error(w, "failed to write to PDS", http.StatusInternalServerError) 175 + return 176 + } 177 + atUri := resp.Uri 178 + 179 + tx, err := l.db.BeginTx(r.Context(), nil) 180 + if err != nil { 181 + l.logger.Error("failed to start tx", "error", err) 182 + return 183 + } 184 + 185 + rollback := func() { 186 + err1 := tx.Rollback() 187 + err2 := rollbackRecord(context.Background(), atUri, client) 188 + 189 + // ignore txn complete errors, this is okay 190 + if errors.Is(err1, sql.ErrTxDone) { 191 + err1 = nil 192 + } 193 + 194 + if errs := errors.Join(err1, err2); errs != nil { 195 + return 196 + } 197 + } 198 + defer rollback() 199 + 200 + for _, o := range validLabelOps { 201 + if _, err := db.AddLabelOp(l.db, &o); err != nil { 202 + l.logger.Error("failed to add op", "err", err) 203 + return 204 + } 205 + 206 + l.logger.Info("performed label op", "did", o.Did, "rkey", o.Rkey, "kind", o.Operation, "subjcet", o.Subject, "key", o.OperandKey) 207 + } 208 + 209 + err = tx.Commit() 210 + if err != nil { 211 + return 212 + } 213 + 214 + // clear aturi when everything is successful 215 + atUri = "" 216 + 217 + l.pages.HxRefresh(w) 218 + } 219 + 220 + // this is used to rollback changes made to the PDS 221 + // 222 + // it is a no-op if the provided ATURI is empty 223 + func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error { 224 + if aturi == "" { 225 + return nil 226 + } 227 + 228 + parsed := syntax.ATURI(aturi) 229 + 230 + collection := parsed.Collection().String() 231 + repo := parsed.Authority().String() 232 + rkey := parsed.RecordKey().String() 233 + 234 + _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{ 235 + Collection: collection, 236 + Repo: repo, 237 + Rkey: rkey, 238 + }) 239 + return err 240 + }
+7
appview/state/router.go
··· 8 8 "github.com/gorilla/sessions" 9 9 "tangled.sh/tangled.sh/core/appview/issues" 10 10 "tangled.sh/tangled.sh/core/appview/knots" 11 + "tangled.sh/tangled.sh/core/appview/labels" 11 12 "tangled.sh/tangled.sh/core/appview/middleware" 12 13 oauthhandler "tangled.sh/tangled.sh/core/appview/oauth/handler" 13 14 "tangled.sh/tangled.sh/core/appview/pipelines" ··· 90 91 r.Mount("/issues", s.IssuesRouter(mw)) 91 92 r.Mount("/pulls", s.PullsRouter(mw)) 92 93 r.Mount("/pipelines", s.PipelinesRouter(mw)) 94 + r.Mount("/labels", s.LabelsRouter(mw)) 93 95 94 96 // These routes get proxied to the knot 95 97 r.Get("/info/refs", s.InfoRefs) ··· 248 250 func (s *State) PipelinesRouter(mw *middleware.Middleware) http.Handler { 249 251 pipes := pipelines.New(s.oauth, s.repoResolver, s.pages, s.spindlestream, s.idResolver, s.db, s.config, s.enforcer) 250 252 return pipes.Router(mw) 253 + } 254 + 255 + func (s *State) LabelsRouter(mw *middleware.Middleware) http.Handler { 256 + ls := labels.New(s.oauth, s.repoResolver, s.pages, s.spindlestream, s.idResolver, s.db, s.config, s.enforcer) 257 + return ls.Router(mw) 251 258 } 252 259 253 260 func (s *State) SignupRouter() http.Handler {