Monorepo for Tangled tangled.org

appview/db: fix orphaned pipelines and triggers on repo deletion #1071

pipelines and triggers were not cleaned up when a repository was deleted. add repo_at foreign key with on delete cascade to both tables so the database handles cleanup automatically. pipeline_statuses also cascade through pipelines, ensuring the full chain is removed. replaces repo_owner/repo_name lookups with repo_at

Signed-off-by: moshyfawn <email@moshyfawn.dev>

Labels

None yet.

Assignee

None yet.

Participants 2
AT URI
at://did:plc:sshwio5obbx4zjfrn3fhsen6/sh.tangled.repo.pull/3meo7ptudqx22
+331 -62
Diff #0
+116 -2
appview/db/db.go
··· 377 377 378 378 repo_owner text not null, 379 379 repo_name text not null, 380 + repo_at text not null, 380 381 381 382 -- every pipeline must be associated with exactly one commit 382 383 sha text not null check (length(sha) = 40), ··· 386 387 trigger_id integer not null, 387 388 388 389 unique(knot, rkey), 389 - foreign key (trigger_id) references triggers(id) on delete cascade 390 + foreign key (trigger_id) references triggers(id) on delete cascade, 391 + foreign key (repo_at) references repos(at_uri) on delete cascade 390 392 ); 391 393 392 394 create table if not exists triggers ( ··· 395 397 396 398 -- top-level fields 397 399 kind text not null, 400 + repo_at text not null, 398 401 399 402 -- pushTriggerData fields 400 403 push_ref text, ··· 405 408 pr_source_branch text, 406 409 pr_target_branch text, 407 410 pr_source_sha text check (length(pr_source_sha) = 40), 408 - pr_action text 411 + pr_action text, 412 + 413 + foreign key (repo_at) references repos(at_uri) on delete cascade 409 414 ); 410 415 411 416 create table if not exists pipeline_statuses ( ··· 1204 1209 `) 1205 1210 return err 1206 1211 }) 1212 + 1213 + conn.ExecContext(ctx, "pragma foreign_keys = off;") 1214 + orm.RunMigration(conn, logger, "add-repo-at-fk-to-pipelines", func(tx *sql.Tx) error { 1215 + _, err := tx.Exec(` 1216 + create table if not exists pipelines_new ( 1217 + id integer primary key autoincrement, 1218 + knot text not null, 1219 + rkey text not null, 1220 + 1221 + repo_owner text not null, 1222 + repo_name text not null, 1223 + repo_at text not null, 1224 + 1225 + sha text not null check (length(sha) = 40), 1226 + created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 1227 + 1228 + trigger_id integer not null, 1229 + 1230 + unique(knot, rkey), 1231 + foreign key (trigger_id) references triggers(id) on delete cascade, 1232 + foreign key (repo_at) references repos(at_uri) on delete cascade 1233 + ); 1234 + `) 1235 + if err != nil { 1236 + return err 1237 + 
} 1238 + 1239 + _, err = tx.Exec(` 1240 + -- backfill repo_at from repos; orphaned rows are dropped 1241 + insert into pipelines_new (id, knot, rkey, repo_owner, repo_name, repo_at, sha, created, trigger_id) 1242 + select 1243 + p.id, 1244 + p.knot, 1245 + p.rkey, 1246 + p.repo_owner, 1247 + p.repo_name, 1248 + r.at_uri, 1249 + p.sha, 1250 + p.created, 1251 + p.trigger_id 1252 + from pipelines p 1253 + join repos r on p.repo_owner = r.did and p.repo_name = r.name; 1254 + `) 1255 + if err != nil { 1256 + return err 1257 + } 1258 + 1259 + _, err = tx.Exec(`drop table pipelines`) 1260 + if err != nil { 1261 + return err 1262 + } 1263 + 1264 + _, err = tx.Exec(`alter table pipelines_new rename to pipelines`) 1265 + return err 1266 + }) 1267 + 1268 + orm.RunMigration(conn, logger, "add-repo-at-fk-to-triggers", func(tx *sql.Tx) error { 1269 + _, err := tx.Exec(` 1270 + create table if not exists triggers_new ( 1271 + id integer primary key autoincrement, 1272 + kind text not null, 1273 + repo_at text not null, 1274 + 1275 + push_ref text, 1276 + push_new_sha text check (length(push_new_sha) = 40), 1277 + push_old_sha text check (length(push_old_sha) = 40), 1278 + 1279 + pr_source_branch text, 1280 + pr_target_branch text, 1281 + pr_source_sha text check (length(pr_source_sha) = 40), 1282 + pr_action text, 1283 + 1284 + foreign key (repo_at) references repos(at_uri) on delete cascade 1285 + ); 1286 + `) 1287 + if err != nil { 1288 + return err 1289 + } 1290 + 1291 + _, err = tx.Exec(` 1292 + -- backfill repo_at from pipelines; orphaned rows are dropped 1293 + insert into triggers_new (id, kind, repo_at, push_ref, push_new_sha, push_old_sha, pr_source_branch, pr_target_branch, pr_source_sha, pr_action) 1294 + select 1295 + t.id, 1296 + t.kind, 1297 + p.repo_at, 1298 + t.push_ref, 1299 + t.push_new_sha, 1300 + t.push_old_sha, 1301 + t.pr_source_branch, 1302 + t.pr_target_branch, 1303 + t.pr_source_sha, 1304 + t.pr_action 1305 + from triggers t 1306 + join pipelines p on 
t.id = p.trigger_id; 1307 + `) 1308 + if err != nil { 1309 + return err 1310 + } 1311 + 1312 + _, err = tx.Exec(`drop table triggers`) 1313 + if err != nil { 1314 + return err 1315 + } 1316 + 1317 + _, err = tx.Exec(`alter table triggers_new rename to triggers`) 1318 + return err 1319 + }) 1320 + conn.ExecContext(ctx, "pragma foreign_keys = on;") 1207 1321 1208 1322 return &DB{ 1209 1323 db,
+8 -1
appview/db/pipeline.go
··· 26 26 whereClause = " where " + strings.Join(conditions, " and ") 27 27 } 28 28 29 - query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause) 29 + query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, repo_at, sha, created from pipelines %s`, whereClause) 30 30 31 31 rows, err := e.Query(query, args...) 32 32 ··· 44 44 &pipeline.Knot, 45 45 &pipeline.RepoOwner, 46 46 &pipeline.RepoName, 47 + &pipeline.RepoAt, 47 48 &pipeline.Sha, 48 49 &createdAt, 49 50 ) ··· 71 72 pipeline.Knot, 72 73 pipeline.RepoOwner, 73 74 pipeline.RepoName, 75 + pipeline.RepoAt, 74 76 pipeline.TriggerId, 75 77 pipeline.Sha, 76 78 } ··· 86 88 knot, 87 89 repo_owner, 88 90 repo_name, 91 + repo_at, 89 92 trigger_id, 90 93 sha 91 94 ) values (%s) ··· 99 102 func AddTrigger(e Execer, trigger models.Trigger) (int64, error) { 100 103 args := []any{ 101 104 trigger.Kind, 105 + trigger.RepoAt, 102 106 trigger.PushRef, 103 107 trigger.PushNewSha, 104 108 trigger.PushOldSha, ··· 115 119 116 120 query := fmt.Sprintf(`insert or ignore into triggers ( 117 121 kind, 122 + repo_at, 118 123 push_ref, 119 124 push_new_sha, 120 125 push_old_sha, ··· 193 198 p.rkey, 194 199 p.repo_owner, 195 200 p.repo_name, 201 + p.repo_at, 196 202 p.sha, 197 203 p.created, 198 204 t.id, ··· 231 237 &p.Rkey, 232 238 &p.RepoOwner, 233 239 &p.RepoName, 240 + &p.RepoAt, 234 241 &p.Sha, 235 242 &created, 236 243 &p.TriggerId,
+180
appview/db/pipeline_test.go
··· 1 + package db 2 + 3 + import ( 4 + "database/sql" 5 + "testing" 6 + 7 + "github.com/bluesky-social/indigo/atproto/syntax" 8 + _ "github.com/mattn/go-sqlite3" 9 + "github.com/stretchr/testify/assert" 10 + "github.com/stretchr/testify/require" 11 + "tangled.org/core/appview/models" 12 + "tangled.org/core/orm" 13 + ) 14 + 15 + func setupPipelineTestDB(t *testing.T) *sql.DB { 16 + t.Helper() 17 + db, err := sql.Open("sqlite3", ":memory:?_foreign_keys=1") 18 + require.NoError(t, err) 19 + 20 + _, err = db.Exec(` 21 + create table if not exists repos ( 22 + id integer primary key autoincrement, 23 + did text not null, 24 + name text not null, 25 + rkey text not null default '', 26 + at_uri text not null unique, 27 + knot text not null default '', 28 + created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 29 + unique(did, name) 30 + ); 31 + 32 + create table if not exists triggers ( 33 + -- primary key 34 + id integer primary key autoincrement, 35 + 36 + -- top-level fields 37 + kind text not null, 38 + repo_at text not null, 39 + 40 + -- pushTriggerData fields 41 + push_ref text, 42 + push_new_sha text check (length(push_new_sha) = 40), 43 + push_old_sha text check (length(push_old_sha) = 40), 44 + 45 + -- pullRequestTriggerData fields 46 + pr_source_branch text, 47 + pr_target_branch text, 48 + pr_source_sha text check (length(pr_source_sha) = 40), 49 + pr_action text, 50 + 51 + foreign key (repo_at) references repos(at_uri) on delete cascade 52 + ); 53 + 54 + create table if not exists pipelines ( 55 + -- identifiers 56 + id integer primary key autoincrement, 57 + knot text not null, 58 + rkey text not null, 59 + 60 + repo_owner text not null, 61 + repo_name text not null, 62 + repo_at text not null, 63 + 64 + -- every pipeline must be associated with exactly one commit 65 + sha text not null check (length(sha) = 40), 66 + created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 67 + 68 + -- trigger data 69 + trigger_id integer not 
null, 70 + 71 + unique(knot, rkey), 72 + foreign key (trigger_id) references triggers(id) on delete cascade, 73 + foreign key (repo_at) references repos(at_uri) on delete cascade 74 + ); 75 + 76 + create table if not exists pipeline_statuses ( 77 + -- identifiers 78 + id integer primary key autoincrement, 79 + spindle text not null, 80 + rkey text not null, 81 + 82 + -- referenced pipeline. these form the (did, rkey) pair 83 + pipeline_knot text not null, 84 + pipeline_rkey text not null, 85 + 86 + -- content 87 + created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 88 + workflow text not null, 89 + status text not null, 90 + error text, 91 + exit_code integer not null default 0, 92 + 93 + unique (spindle, rkey), 94 + foreign key (pipeline_knot, pipeline_rkey) 95 + references pipelines (knot, rkey) 96 + on delete cascade 97 + ); 98 + `) 99 + require.NoError(t, err) 100 + 101 + t.Cleanup(func() { db.Close() }) 102 + return db 103 + } 104 + 105 + func seedRepo(t *testing.T, db *sql.DB, did, name, rkey, knot string) syntax.ATURI { 106 + t.Helper() 107 + atURI := syntax.ATURI("at://" + did + "/sh.tangled.repo/" + rkey) 108 + _, err := db.Exec( 109 + `insert into repos (did, name, rkey, at_uri, knot) values (?, ?, ?, ?, ?)`, 110 + did, name, rkey, string(atURI), knot, 111 + ) 112 + require.NoError(t, err) 113 + return atURI 114 + } 115 + 116 + func TestPipelineCascadeDeleteOnRepoRemoval(t *testing.T) { 117 + d := setupPipelineTestDB(t) 118 + 119 + knot := "example.com" 120 + did := "did:plc:foo" 121 + repoName := "my-repo" 122 + testSha := "0000000000000000000000000000000000000000" 123 + 124 + repoAt := seedRepo(t, d, did, repoName, "abc123", knot) 125 + 126 + triggerID, err := AddTrigger(d, models.Trigger{ 127 + Kind: "push", 128 + RepoAt: repoAt, 129 + }) 130 + require.NoError(t, err) 131 + 132 + err = AddPipeline(d, models.Pipeline{ 133 + Rkey: "pip1", 134 + Knot: knot, 135 + RepoOwner: syntax.DID(did), 136 + RepoName: repoName, 137 + RepoAt: 
repoAt, 138 + TriggerId: int(triggerID), 139 + Sha: testSha, 140 + }) 141 + require.NoError(t, err) 142 + 143 + // add a pipeline_status referencing the pipeline 144 + _, err = d.Exec( 145 + `insert into pipeline_statuses (spindle, rkey, pipeline_knot, pipeline_rkey, workflow, status, exit_code) 146 + values (?, ?, ?, ?, ?, ?, ?)`, 147 + "spindle.example.com", "status1", knot, "pip1", "ci.yaml", "success", 0, 148 + ) 149 + require.NoError(t, err) 150 + 151 + ps, err := GetPipelines(d, orm.FilterEq("repo_at", repoAt)) 152 + require.NoError(t, err) 153 + assert.Len(t, ps, 1) 154 + assert.Equal(t, repoAt, ps[0].RepoAt) 155 + 156 + // verify the production query path (join + repo_at filter) 157 + joined, err := GetPipelineStatuses(d, 1, orm.FilterEq("p.repo_at", repoAt)) 158 + require.NoError(t, err) 159 + assert.Len(t, joined, 1) 160 + assert.Equal(t, repoAt, joined[0].RepoAt) 161 + 162 + // cascade: removing the repo should delete its pipelines, triggers, 163 + // and pipeline_statuses 164 + _, err = d.Exec(`delete from repos where at_uri = ?`, string(repoAt)) 165 + require.NoError(t, err) 166 + 167 + ps, err = GetPipelines(d) 168 + require.NoError(t, err) 169 + assert.Empty(t, ps) 170 + 171 + var triggerCount int 172 + err = d.QueryRow(`select count(*) from triggers`).Scan(&triggerCount) 173 + require.NoError(t, err) 174 + assert.Equal(t, 0, triggerCount) 175 + 176 + var statusCount int 177 + err = d.QueryRow(`select count(*) from pipeline_statuses`).Scan(&statusCount) 178 + require.NoError(t, err) 179 + assert.Equal(t, 0, statusCount) 180 + }
+4 -2
appview/models/pipeline.go
··· 19 19 Knot string 20 20 RepoOwner syntax.DID 21 21 RepoName string 22 + RepoAt syntax.ATURI 22 23 TriggerId int 23 24 Sha string 24 25 Created time.Time ··· 127 128 } 128 129 129 130 type Trigger struct { 130 - Id int 131 - Kind workflow.TriggerKind 131 + Id int 132 + Kind workflow.TriggerKind 133 + RepoAt syntax.ATURI 132 134 133 135 // push trigger fields 134 136 PushRef *string
+4 -8
appview/pipelines/pipelines.go
··· 88 88 89 89 filterKind := r.URL.Query().Get("trigger") 90 90 filters := []orm.Filter{ 91 - orm.FilterEq("p.repo_owner", f.Did), 92 - orm.FilterEq("p.repo_name", f.Name), 91 + orm.FilterEq("p.repo_at", f.RepoAt()), 93 92 orm.FilterEq("p.knot", f.Knot), 94 93 } 95 94 switch filterKind { ··· 152 151 ps, err := db.GetPipelineStatuses( 153 152 p.db, 154 153 1, 155 - orm.FilterEq("p.repo_owner", f.Did), 156 - orm.FilterEq("p.repo_name", f.Name), 154 + orm.FilterEq("p.repo_at", f.RepoAt()), 157 155 orm.FilterEq("p.knot", f.Knot), 158 156 orm.FilterEq("p.id", pipelineId), 159 157 ) ··· 219 217 ps, err := db.GetPipelineStatuses( 220 218 p.db, 221 219 1, 222 - orm.FilterEq("p.repo_owner", f.Did), 223 - orm.FilterEq("p.repo_name", f.Name), 220 + orm.FilterEq("p.repo_at", f.RepoAt()), 224 221 orm.FilterEq("p.knot", f.Knot), 225 222 orm.FilterEq("p.id", pipelineId), 226 223 ) ··· 368 365 ps, err := db.GetPipelineStatuses( 369 366 p.db, 370 367 1, 371 - orm.FilterEq("p.repo_owner", f.Did), 372 - orm.FilterEq("p.repo_name", f.Name), 368 + orm.FilterEq("p.repo_at", f.RepoAt()), 373 369 orm.FilterEq("p.knot", f.Knot), 374 370 orm.FilterEq("p.id", pipelineId), 375 371 )
+3 -37
appview/pulls/pulls.go
··· 214 214 ps, err := db.GetPipelineStatuses( 215 215 s.db, 216 216 len(shas), 217 - orm.FilterEq("p.repo_owner", f.Did), 218 - orm.FilterEq("p.repo_name", f.Name), 217 + orm.FilterEq("p.repo_at", f.RepoAt()), 219 218 orm.FilterEq("p.knot", f.Knot), 220 219 orm.FilterIn("p.sha", shas), 221 220 ) ··· 553 552 554 553 keyword := params.Get("q") 555 554 556 - repoInfo := s.repoResolver.GetRepoInfo(r, user) 557 - 558 555 var pulls []*models.Pull 559 556 searchOpts := models.PullSearchOptions{ 560 557 Keyword: keyword, ··· 572 569 totalPulls = int(res.Total) 573 570 l.Debug("searched pulls with indexer", "count", len(res.Hits)) 574 571 575 - // count matching pulls in the other states to display correct counts 576 - for _, other := range []models.PullState{models.PullOpen, models.PullMerged, models.PullClosed} { 577 - if other == state { 578 - continue 579 - } 580 - countRes, err := s.indexer.Search(r.Context(), models.PullSearchOptions{ 581 - Keyword: keyword, RepoAt: f.RepoAt().String(), State: other, 582 - Page: pagination.Page{Limit: 1}, 583 - }) 584 - if err != nil { 585 - continue 586 - } 587 - switch other { 588 - case models.PullOpen: 589 - repoInfo.Stats.PullCount.Open = int(countRes.Total) 590 - case models.PullMerged: 591 - repoInfo.Stats.PullCount.Merged = int(countRes.Total) 592 - case models.PullClosed: 593 - repoInfo.Stats.PullCount.Closed = int(countRes.Total) 594 - } 595 - } 596 - switch state { 597 - case models.PullOpen: 598 - repoInfo.Stats.PullCount.Open = int(res.Total) 599 - case models.PullMerged: 600 - repoInfo.Stats.PullCount.Merged = int(res.Total) 601 - case models.PullClosed: 602 - repoInfo.Stats.PullCount.Closed = int(res.Total) 603 - } 604 - 605 572 pulls, err = db.GetPulls( 606 573 s.db, 607 574 orm.FilterIn("id", res.Hits), ··· 668 635 ps, err := db.GetPipelineStatuses( 669 636 s.db, 670 637 len(shas), 671 - orm.FilterEq("p.repo_owner", f.Did), 672 - orm.FilterEq("p.repo_name", f.Name), 638 + orm.FilterEq("p.repo_at", f.RepoAt()), 673 
639 orm.FilterEq("p.knot", f.Knot), 674 640 orm.FilterIn("p.sha", shas), 675 641 ) ··· 700 666 701 667 s.pages.RepoPulls(w, pages.RepoPullsParams{ 702 668 LoggedInUser: s.oauth.GetMultiAccountUser(r), 703 - RepoInfo: repoInfo, 669 + RepoInfo: s.repoResolver.GetRepoInfo(r, user), 704 670 Pulls: pulls, 705 671 LabelDefs: defs, 706 672 FilteringBy: state,
+1 -2
appview/repo/repo_util.go
··· 103 103 ps, err := db.GetPipelineStatuses( 104 104 d, 105 105 len(shas), 106 - orm.FilterEq("p.repo_owner", repo.Did), 107 - orm.FilterEq("p.repo_name", repo.Name), 106 + orm.FilterEq("p.repo_at", repo.RepoAt()), 108 107 orm.FilterEq("p.knot", repo.Knot), 109 108 orm.FilterIn("p.sha", shas), 110 109 )
+2
appview/state/knotstream.go
··· 246 246 return fmt.Errorf("failed to start txn: %w", err) 247 247 } 248 248 249 + trigger.RepoAt = repos[0].RepoAt() 249 250 triggerId, err := db.AddTrigger(tx, trigger) 250 251 if err != nil { 251 252 return fmt.Errorf("failed to add trigger entry: %w", err) ··· 256 257 Knot: source.Key(), 257 258 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 258 259 RepoName: record.TriggerMetadata.Repo.Repo, 260 + RepoAt: repos[0].RepoAt(), 259 261 TriggerId: int(triggerId), 260 262 Sha: sha, 261 263 }
+13 -10
cmd/populatepipelines/populate_pipelines.go
··· 68 68 log.Fatalf("Invalid repo format: %s (expected: did:plc:xyz/reponame)", *repo) 69 69 } 70 70 71 + // Construct the AT URI for repo_at lookups 72 + repoAt := fmt.Sprintf("at://%s/sh.tangled.repo/%s", did, repoName) 73 + 71 74 db, err := sql.Open("sqlite3", *dbPath) 72 75 if err != nil { 73 76 log.Fatalf("Failed to open database: %v", err) ··· 87 90 88 91 var triggerId int64 89 92 if isPush { 90 - triggerId, err = createPushTrigger(db, branches) 93 + triggerId, err = createPushTrigger(db, repoAt, branches) 91 94 } else { 92 - triggerId, err = createPRTrigger(db, branches) 95 + triggerId, err = createPRTrigger(db, repoAt, branches) 93 96 } 94 97 if err != nil { 95 98 log.Fatalf("Failed to create trigger: %v", err) ··· 147 150 return "", "", false 148 151 } 149 152 150 - func createPushTrigger(db *sql.DB, branches []string) (int64, error) { 153 + func createPushTrigger(db *sql.DB, repoAt string, branches []string) (int64, error) { 151 154 branch := branches[rand.Intn(len(branches))] 152 155 oldSha := generateRandomSha() 153 156 newSha := generateRandomSha() 154 157 155 158 result, err := db.Exec(` 156 - INSERT INTO triggers (kind, push_ref, push_new_sha, push_old_sha) 157 - VALUES (?, ?, ?, ?) 158 - `, "push", "refs/heads/"+branch, newSha, oldSha) 159 + INSERT INTO triggers (kind, repo_at, push_ref, push_new_sha, push_old_sha) 160 + VALUES (?, ?, ?, ?, ?) 
161 + `, "push", repoAt, "refs/heads/"+branch, newSha, oldSha) 159 162 160 163 if err != nil { 161 164 return 0, err ··· 164 167 return result.LastInsertId() 165 168 } 166 169 167 - func createPRTrigger(db *sql.DB, branches []string) (int64, error) { 170 + func createPRTrigger(db *sql.DB, repoAt string, branches []string) (int64, error) { 168 171 targetBranch := branches[0] // Usually main 169 172 sourceBranch := branches[rand.Intn(len(branches)-1)+1] 170 173 sourceSha := generateRandomSha() ··· 172 175 action := actions[rand.Intn(len(actions))] 173 176 174 177 result, err := db.Exec(` 175 - INSERT INTO triggers (kind, pr_source_branch, pr_target_branch, pr_source_sha, pr_action) 176 - VALUES (?, ?, ?, ?, ?) 177 - `, "pull_request", sourceBranch, targetBranch, sourceSha, action) 178 + INSERT INTO triggers (kind, repo_at, pr_source_branch, pr_target_branch, pr_source_sha, pr_action) 179 + VALUES (?, ?, ?, ?, ?, ?) 180 + `, "pull_request", repoAt, sourceBranch, targetBranch, sourceSha, action) 178 181 179 182 if err != nil { 180 183 return 0, err

History

1 round 1 comment
sign up or log in to add to the discussion
moshyfawn.dev submitted #0
2 commits
expand
appview/db: fix orphaned pipelines and triggers on repo deletion
appview/db: add cascade delete test for pipelines and triggers
no conflicts, ready to merge
expand 1 comment
  • here, it seems the migration here handles the addition of repo_at and the fk constraint, so why write it down here again? is this to serve as a reference? i think that is valuable, but as a rule of thumb, we just avoid mutating old queries.
  • here minor style nit, can we indent the sql?
  • pipelines.trigger_id references triggers.id, and a cascading relationship is already present, so we don't need to add repo_at to that table! when a pipeline is deleted, its corresponding trigger metadata is also deleted

the rest of the code lgtm! thanks for working on this!