a love letter to tangled (android, iOS, and a search API)
at main 726 lines 22 kB view raw
1package store 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "strings" 9 "time" 10) 11 12type PostgresStore struct { 13 db *sql.DB 14} 15 16type pgArgs struct { 17 args []any 18} 19 20func newPGArgs(initial ...any) *pgArgs { 21 return &pgArgs{args: append([]any{}, initial...)} 22} 23 24func (p *pgArgs) Add(value any) string { 25 p.args = append(p.args, value) 26 return fmt.Sprintf("$%d", len(p.args)) 27} 28 29func (p *pgArgs) Values() []any { 30 return p.args 31} 32 33func (s *PostgresStore) UpsertDocument(ctx context.Context, doc *Document) error { 34 doc.IndexedAt = time.Now().UTC().Format(time.RFC3339) 35 36 _, err := s.db.ExecContext(ctx, ` 37 INSERT INTO documents ( 38 id, did, collection, rkey, at_uri, cid, record_type, 39 title, body, summary, repo_did, repo_name, author_handle, 40 tags_json, language, created_at, updated_at, indexed_at, web_url, deleted_at 41 ) VALUES ( 42 $1, $2, $3, $4, $5, $6, $7, 43 $8, $9, $10, $11, $12, $13, 44 $14, $15, $16, $17, $18, $19, $20 45 ) 46 ON CONFLICT(id) DO UPDATE SET 47 did = excluded.did, 48 collection = excluded.collection, 49 rkey = excluded.rkey, 50 at_uri = excluded.at_uri, 51 cid = excluded.cid, 52 record_type = excluded.record_type, 53 title = excluded.title, 54 body = excluded.body, 55 summary = excluded.summary, 56 repo_did = excluded.repo_did, 57 repo_name = excluded.repo_name, 58 author_handle = excluded.author_handle, 59 tags_json = excluded.tags_json, 60 language = excluded.language, 61 created_at = excluded.created_at, 62 updated_at = excluded.updated_at, 63 indexed_at = excluded.indexed_at, 64 web_url = excluded.web_url, 65 deleted_at = excluded.deleted_at`, 66 doc.ID, doc.DID, doc.Collection, doc.RKey, doc.ATURI, doc.CID, doc.RecordType, 67 doc.Title, doc.Body, doc.Summary, doc.RepoDID, doc.RepoName, doc.AuthorHandle, 68 doc.TagsJSON, doc.Language, doc.CreatedAt, doc.UpdatedAt, doc.IndexedAt, 69 doc.WebURL, nullableStr(doc.DeletedAt), 70 ) 71 if err != nil { 72 return fmt.Errorf("upsert document: %w", err) 73 } 74 return nil 75} 76 77func (s *PostgresStore) ListDocuments(ctx context.Context, filter DocumentFilter) ([]*Document, error) { 78 query := `SELECT id, did, collection, rkey, at_uri, cid, record_type, 79 title, body, summary, repo_did, repo_name, author_handle, 80 tags_json, language, created_at, updated_at, indexed_at, web_url, deleted_at 81 FROM documents WHERE deleted_at IS NULL` 82 args := newPGArgs() 83 84 if filter.DocumentID != "" { 85 query += " AND id = " + args.Add(filter.DocumentID) 86 } 87 if filter.Collection != "" { 88 query += " AND collection = " + args.Add(filter.Collection) 89 } 90 if filter.DID != "" { 91 query += " AND did = " + args.Add(filter.DID) 92 } 93 94 rows, err := s.db.QueryContext(ctx, query, args.Values()...) 95 if err != nil { 96 return nil, fmt.Errorf("list documents: %w", err) 97 } 98 defer rows.Close() 99 100 var docs []*Document 101 for rows.Next() { 102 doc := &Document{} 103 var ( 104 title, body, summary, repoDID, repoName, authorHandle sql.NullString 105 tagsJSON, language, createdAt, updatedAt, webURL, deletedAt sql.NullString 106 ) 107 if err := rows.Scan( 108 &doc.ID, &doc.DID, &doc.Collection, &doc.RKey, &doc.ATURI, &doc.CID, &doc.RecordType, 109 &title, &body, &summary, &repoDID, &repoName, &authorHandle, 110 &tagsJSON, &language, &createdAt, &updatedAt, &doc.IndexedAt, &webURL, &deletedAt, 111 ); err != nil { 112 return nil, fmt.Errorf("scan document: %w", err) 113 } 114 doc.Title = title.String 115 doc.Body = body.String 116 doc.Summary = summary.String 117 doc.RepoDID = repoDID.String 118 doc.RepoName = repoName.String 119 doc.AuthorHandle = authorHandle.String 120 doc.TagsJSON = tagsJSON.String 121 doc.Language = language.String 122 doc.CreatedAt = createdAt.String 123 doc.UpdatedAt = updatedAt.String 124 doc.WebURL = webURL.String 125 doc.DeletedAt = deletedAt.String 126 docs = append(docs, doc) 127 } 128 if err := rows.Err(); err != nil { 129 return nil, fmt.Errorf("iterate documents: %w", err) 130 } 131 return docs, nil 132} 133 134func (s *PostgresStore) OptimizeSearchIndex(_ context.Context) error { 135 return nil 136} 137 138func (s *PostgresStore) GetDocument(ctx context.Context, id string) (*Document, error) { 139 row := s.db.QueryRowContext(ctx, ` 140 SELECT id, did, collection, rkey, at_uri, cid, record_type, 141 title, body, summary, repo_did, repo_name, author_handle, 142 tags_json, language, created_at, updated_at, indexed_at, web_url, deleted_at 143 FROM documents WHERE id = $1`, id) 144 145 doc, err := scanDocument(row) 146 if errors.Is(err, sql.ErrNoRows) { 147 return nil, nil 148 } 149 if err != nil { 150 return nil, fmt.Errorf("get document: %w", err) 151 } 152 return doc, nil 153} 154 155func (s *PostgresStore) MarkDeleted(ctx context.Context, id string) error { 156 now := time.Now().UTC().Format(time.RFC3339) 157 _, err := s.db.ExecContext(ctx, `UPDATE documents SET deleted_at = $1 WHERE id = $2`, now, id) 158 if err != nil { 159 return fmt.Errorf("mark deleted: %w", err) 160 } 161 return nil 162} 163 164func (s *PostgresStore) GetSyncState(ctx context.Context, consumer string) (*SyncState, error) { 165 row := s.db.QueryRowContext(ctx, ` 166 SELECT consumer_name, cursor, COALESCE(high_water_mark, ''), updated_at 167 FROM sync_state WHERE consumer_name = $1`, consumer) 168 169 ss := &SyncState{} 170 err := row.Scan(&ss.ConsumerName, &ss.Cursor, &ss.HighWaterMark, &ss.UpdatedAt) 171 if errors.Is(err, sql.ErrNoRows) { 172 return nil, nil 173 } 174 if err != nil { 175 return nil, fmt.Errorf("get sync state: %w", err) 176 } 177 return ss, nil 178} 179 180func (s *PostgresStore) SetSyncState(ctx context.Context, consumer string, cursor string) error { 181 now := time.Now().UTC().Format(time.RFC3339) 182 _, err := s.db.ExecContext(ctx, ` 183 INSERT INTO sync_state (consumer_name, cursor, updated_at) VALUES ($1, $2, $3) 184 ON CONFLICT(consumer_name) DO UPDATE SET 185 cursor = excluded.cursor, 186 updated_at = excluded.updated_at`, 187 consumer, cursor, now, 188 ) 189 if err != nil { 190 return fmt.Errorf("set sync state: %w", err) 191 } 192 return nil 193} 194 195func (s *PostgresStore) UpdateRecordState(ctx context.Context, subjectURI string, state string) error { 196 now := time.Now().UTC().Format(time.RFC3339) 197 _, err := s.db.ExecContext(ctx, ` 198 INSERT INTO record_state (subject_uri, state, updated_at) VALUES ($1, $2, $3) 199 ON CONFLICT(subject_uri) DO UPDATE SET 200 state = excluded.state, 201 updated_at = excluded.updated_at`, 202 subjectURI, state, now, 203 ) 204 if err != nil { 205 return fmt.Errorf("update record state: %w", err) 206 } 207 return nil 208} 209 210func (s *PostgresStore) UpsertIdentityHandle( 211 ctx context.Context, did, handle string, isActive bool, status string, 212) error { 213 now := time.Now().UTC().Format(time.RFC3339) 214 _, err := s.db.ExecContext(ctx, ` 215 INSERT INTO identity_handles (did, handle, is_active, status, updated_at) 216 VALUES ($1, $2, $3, $4, $5) 217 ON CONFLICT(did) DO UPDATE SET 218 handle = excluded.handle, 219 is_active = excluded.is_active, 220 status = excluded.status, 221 updated_at = excluded.updated_at`, 222 did, handle, isActive, nullableStr(status), now, 223 ) 224 if err != nil { 225 return fmt.Errorf("upsert identity handle: %w", err) 226 } 227 return nil 228} 229 230func (s *PostgresStore) GetIdentityHandle(ctx context.Context, did string) (string, error) { 231 var handle sql.NullString 232 err := s.db.QueryRowContext(ctx, `SELECT handle FROM identity_handles WHERE did = $1`, did).Scan(&handle) 233 if errors.Is(err, sql.ErrNoRows) { 234 return "", nil 235 } 236 if err != nil { 237 return "", fmt.Errorf("get identity handle: %w", err) 238 } 239 return handle.String, nil 240} 241 242func (s *PostgresStore) GetIndexingJob(ctx context.Context, documentID string) (*IndexingJob, error) { 243 row := s.db.QueryRowContext(ctx, ` 244 SELECT document_id, did, collection, rkey, cid, record_json, source, 245 attempts, status, COALESCE(last_error, ''), scheduled_at, updated_at, 246 COALESCE(lease_owner, ''), COALESCE(lease_expires_at, ''), 247 COALESCE(completed_at, '') 248 FROM indexing_jobs 249 WHERE document_id = $1`, documentID) 250 251 job, err := scanIndexingJob(row) 252 if errors.Is(err, sql.ErrNoRows) { 253 return nil, nil 254 } 255 if err != nil { 256 return nil, fmt.Errorf("get indexing job: %w", err) 257 } 258 return job, nil 259} 260 261func (s *PostgresStore) EnqueueIndexingJob(ctx context.Context, input IndexingJobInput) error { 262 now := time.Now().UTC().Format(time.RFC3339) 263 source := strings.TrimSpace(input.Source) 264 if source == "" { 265 source = IndexSourceReadThrough 266 } 267 268 _, err := s.db.ExecContext(ctx, ` 269 INSERT INTO indexing_jobs ( 270 document_id, did, collection, rkey, cid, record_json, source, 271 status, attempts, last_error, scheduled_at, updated_at, 272 lease_owner, lease_expires_at, completed_at 273 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 0, NULL, $9, $10, '', '', '') 274 ON CONFLICT(document_id) DO UPDATE SET 275 did = excluded.did, 276 collection = excluded.collection, 277 rkey = excluded.rkey, 278 cid = excluded.cid, 279 record_json = excluded.record_json, 280 source = excluded.source, 281 status = excluded.status, 282 last_error = NULL, 283 scheduled_at = excluded.scheduled_at, 284 updated_at = excluded.updated_at, 285 lease_owner = '', 286 lease_expires_at = '', 287 completed_at = ''`, 288 input.DocumentID, input.DID, input.Collection, input.RKey, input.CID, 289 input.RecordJSON, source, IndexingJobPending, now, now, 290 ) 291 if err != nil { 292 return fmt.Errorf("enqueue indexing job: %w", err) 293 } 294 return nil 295} 296 297func (s *PostgresStore) ClaimIndexingJob( 298 ctx context.Context, workerID string, leaseUntil string, 299) (*IndexingJob, error) { 300 now := time.Now().UTC().Format(time.RFC3339) 301 tx, err := s.db.BeginTx(ctx, nil) 302 if err != nil { 303 return nil, fmt.Errorf("begin claim tx: %w", err) 304 } 305 defer tx.Rollback() 306 307 row := tx.QueryRowContext(ctx, ` 308 WITH candidate AS ( 309 SELECT document_id 310 FROM indexing_jobs 311 WHERE ( 312 status = $1 313 AND scheduled_at::timestamptz <= $2::timestamptz 314 ) OR ( 315 status = $3 316 AND lease_expires_at IS NOT NULL 317 AND lease_expires_at != '' 318 AND lease_expires_at::timestamptz <= $2::timestamptz 319 ) 320 ORDER BY scheduled_at::timestamptz ASC, updated_at::timestamptz ASC 321 FOR UPDATE SKIP LOCKED 322 LIMIT 1 323 ) 324 UPDATE indexing_jobs jobs 325 SET status = $4, updated_at = $5, lease_owner = $6, lease_expires_at = $7 326 FROM candidate 327 WHERE jobs.document_id = candidate.document_id 328 RETURNING jobs.document_id, jobs.did, jobs.collection, jobs.rkey, jobs.cid, 329 jobs.record_json, jobs.source, jobs.attempts, jobs.status, 330 COALESCE(jobs.last_error, ''), jobs.scheduled_at, jobs.updated_at, 331 COALESCE(jobs.lease_owner, ''), COALESCE(jobs.lease_expires_at, ''), 332 COALESCE(jobs.completed_at, '')`, 333 IndexingJobPending, now, IndexingJobProcessing, IndexingJobProcessing, 334 now, workerID, leaseUntil, 335 ) 336 337 job, err := scanIndexingJob(row) 338 if errors.Is(err, sql.ErrNoRows) { 339 if err := tx.Commit(); err != nil { 340 return nil, fmt.Errorf("commit empty claim tx: %w", err) 341 } 342 return nil, nil 343 } 344 if err != nil { 345 return nil, fmt.Errorf("claim indexing job: %w", err) 346 } 347 if err := tx.Commit(); err != nil { 348 return nil, fmt.Errorf("commit claim tx: %w", err) 349 } 350 return job, nil 351} 352 353func (s *PostgresStore) CompleteIndexingJob(ctx context.Context, documentID string) error { 354 now := time.Now().UTC().Format(time.RFC3339) 355 _, err := s.db.ExecContext(ctx, ` 356 UPDATE indexing_jobs 357 SET status = $1, updated_at = $2, completed_at = $3, 358 lease_owner = '', lease_expires_at = '', last_error = NULL 359 WHERE document_id = $4`, 360 IndexingJobCompleted, now, now, documentID, 361 ) 362 if err != nil { 363 return fmt.Errorf("complete indexing job: %w", err) 364 } 365 return nil 366} 367 368func (s *PostgresStore) RetryIndexingJob( 369 ctx context.Context, documentID string, nextScheduledAt string, lastError string, 370) error { 371 now := time.Now().UTC().Format(time.RFC3339) 372 _, err := s.db.ExecContext(ctx, ` 373 UPDATE indexing_jobs 374 SET status = $1, attempts = attempts + 1, last_error = $2, scheduled_at = $3, 375 updated_at = $4, lease_owner = '', lease_expires_at = '' 376 WHERE document_id = $5`, 377 IndexingJobPending, lastError, nextScheduledAt, now, documentID, 378 ) 379 if err != nil { 380 return fmt.Errorf("retry indexing job: %w", err) 381 } 382 return nil 383} 384 385func (s *PostgresStore) FailIndexingJob( 386 ctx context.Context, documentID string, status string, lastError string, 387) error { 388 now := time.Now().UTC().Format(time.RFC3339) 389 _, err := s.db.ExecContext(ctx, ` 390 UPDATE indexing_jobs 391 SET status = $1, attempts = attempts + 1, last_error = $2, updated_at = $3, 392 lease_owner = '', lease_expires_at = '' 393 WHERE document_id = $4`, 394 status, lastError, now, documentID, 395 ) 396 if err != nil { 397 return fmt.Errorf("fail indexing job: %w", err) 398 } 399 return nil 400} 401 402func (s *PostgresStore) ListIndexingJobs( 403 ctx context.Context, filter IndexingJobFilter, 404) ([]*IndexingJob, error) { 405 query := ` 406 SELECT document_id, did, collection, rkey, cid, record_json, source, 407 attempts, status, COALESCE(last_error, ''), scheduled_at, updated_at, 408 COALESCE(lease_owner, ''), COALESCE(lease_expires_at, ''), 409 COALESCE(completed_at, '') 410 FROM indexing_jobs 411 WHERE 1 = 1` 412 args := newPGArgs() 413 414 if filter.DocumentID != "" { 415 query += " AND document_id = " + args.Add(filter.DocumentID) 416 } 417 if filter.Status != "" { 418 query += " AND status = " + args.Add(filter.Status) 419 } 420 if filter.Source != "" { 421 query += " AND source = " + args.Add(filter.Source) 422 } 423 424 query += " ORDER BY updated_at::timestamptz DESC" 425 limit := filter.Limit 426 if limit <= 0 { 427 limit = 50 428 } 429 query += " LIMIT " + args.Add(limit) 430 if filter.Offset > 0 { 431 query += " OFFSET " + args.Add(filter.Offset) 432 } 433 434 rows, err := s.db.QueryContext(ctx, query, args.Values()...) 435 if err != nil { 436 return nil, fmt.Errorf("list indexing jobs: %w", err) 437 } 438 defer rows.Close() 439 440 var jobs []*IndexingJob 441 for rows.Next() { 442 job, err := scanIndexingJob(rows) 443 if err != nil { 444 return nil, fmt.Errorf("scan indexing job: %w", err) 445 } 446 jobs = append(jobs, job) 447 } 448 if err := rows.Err(); err != nil { 449 return nil, fmt.Errorf("iterate indexing jobs: %w", err) 450 } 451 return jobs, nil 452} 453 454func (s *PostgresStore) GetIndexingJobStats(ctx context.Context) (*IndexingJobStats, error) { 455 row := s.db.QueryRowContext(ctx, ` 456 SELECT 457 COUNT(*) FILTER (WHERE status = $1), 458 COUNT(*) FILTER (WHERE status = $2), 459 COUNT(*) FILTER (WHERE status = $3), 460 COUNT(*) FILTER (WHERE status = $4), 461 COUNT(*) FILTER (WHERE status = $5), 462 COALESCE(MIN(CASE WHEN status = $1 THEN scheduled_at END), ''), 463 COALESCE(MIN(CASE WHEN status = $2 THEN updated_at END), ''), 464 COALESCE(MAX(completed_at), ''), 465 COALESCE(MAX(updated_at), '') 466 FROM indexing_jobs`, 467 IndexingJobPending, IndexingJobProcessing, IndexingJobCompleted, 468 IndexingJobFailed, IndexingJobDeadLetter, 469 ) 470 471 stats := &IndexingJobStats{} 472 err := row.Scan( 473 &stats.Pending, &stats.Processing, &stats.Completed, &stats.Failed, 474 &stats.DeadLetter, &stats.OldestPendingAt, &stats.OldestRunningAt, 475 &stats.LastCompletedAt, &stats.LastProcessedAt, 476 ) 477 if err != nil { 478 return nil, fmt.Errorf("get indexing job stats: %w", err) 479 } 480 return stats, nil 481} 482 483func (s *PostgresStore) AppendIndexingAudit(ctx context.Context, input IndexingAuditInput) error { 484 now := time.Now().UTC().Format(time.RFC3339) 485 _, err := s.db.ExecContext(ctx, ` 486 INSERT INTO indexing_audit ( 487 source, document_id, collection, cid, decision, attempt, error, created_at 488 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, 489 input.Source, input.DocumentID, input.Collection, input.CID, 490 input.Decision, input.Attempt, nullableStr(input.Error), now, 491 ) 492 if err != nil { 493 return fmt.Errorf("append indexing audit: %w", err) 494 } 495 return nil 496} 497 498func (s *PostgresStore) ListIndexingAudit( 499 ctx context.Context, filter IndexingAuditFilter, 500) ([]*IndexingAuditEntry, error) { 501 query := ` 502 SELECT id, source, document_id, collection, cid, decision, 503 attempt, COALESCE(error, ''), created_at 504 FROM indexing_audit 505 WHERE 1 = 1` 506 args := newPGArgs() 507 508 if filter.DocumentID != "" { 509 query += " AND document_id = " + args.Add(filter.DocumentID) 510 } 511 if filter.Source != "" { 512 query += " AND source = " + args.Add(filter.Source) 513 } 514 if filter.Decision != "" { 515 query += " AND decision = " + args.Add(filter.Decision) 516 } 517 518 query += " ORDER BY created_at::timestamptz DESC" 519 limit := filter.Limit 520 if limit <= 0 { 521 limit = 50 522 } 523 query += " LIMIT " + args.Add(limit) 524 if filter.Offset > 0 { 525 query += " OFFSET " + args.Add(filter.Offset) 526 } 527 528 rows, err := s.db.QueryContext(ctx, query, args.Values()...) 529 if err != nil { 530 return nil, fmt.Errorf("list indexing audit: %w", err) 531 } 532 defer rows.Close() 533 534 var entries []*IndexingAuditEntry 535 for rows.Next() { 536 entry := &IndexingAuditEntry{} 537 if err := rows.Scan( 538 &entry.ID, &entry.Source, &entry.DocumentID, &entry.Collection, 539 &entry.CID, &entry.Decision, &entry.Attempt, &entry.Error, 540 &entry.CreatedAt, 541 ); err != nil { 542 return nil, fmt.Errorf("scan indexing audit: %w", err) 543 } 544 entries = append(entries, entry) 545 } 546 if err := rows.Err(); err != nil { 547 return nil, fmt.Errorf("iterate indexing audit: %w", err) 548 } 549 return entries, nil 550} 551 552func (s *PostgresStore) GetFollowSubjects(ctx context.Context, did string) ([]string, error) { 553 rows, err := s.db.QueryContext(ctx, ` 554 SELECT DISTINCT repo_did 555 FROM documents 556 WHERE did = $1 557 AND collection = 'sh.tangled.graph.follow' 558 AND deleted_at IS NULL 559 AND repo_did IS NOT NULL 560 AND repo_did != ''`, 561 did, 562 ) 563 if err != nil { 564 return nil, fmt.Errorf("get follow subjects: %w", err) 565 } 566 defer rows.Close() 567 568 var subjects []string 569 for rows.Next() { 570 var subject string 571 if err := rows.Scan(&subject); err != nil { 572 return nil, fmt.Errorf("scan follow subject: %w", err) 573 } 574 subjects = append(subjects, subject) 575 } 576 if err := rows.Err(); err != nil { 577 return nil, fmt.Errorf("iterate follow subjects: %w", err) 578 } 579 return subjects, nil 580} 581 582func (s *PostgresStore) GetRepoCollaborators(ctx context.Context, repoOwnerDID string) ([]string, error) { 583 rows, err := s.db.QueryContext(ctx, ` 584 SELECT DISTINCT did 585 FROM documents 586 WHERE repo_did = $1 587 AND did != $1 588 AND deleted_at IS NULL 589 AND collection IN ( 590 'sh.tangled.repo.issue', 591 'sh.tangled.repo.pull', 592 'sh.tangled.repo.issue.comment', 593 'sh.tangled.repo.pull.comment' 594 )`, 595 repoOwnerDID, 596 ) 597 if err != nil { 598 return nil, fmt.Errorf("get repo collaborators: %w", err) 599 } 600 defer rows.Close() 601 602 var collaborators []string 603 for rows.Next() { 604 var collaborator string 605 if err := rows.Scan(&collaborator); err != nil { 606 return nil, fmt.Errorf("scan collaborator: %w", err) 607 } 608 collaborators = append(collaborators, collaborator) 609 } 610 if err := rows.Err(); err != nil { 611 return nil, fmt.Errorf("iterate collaborators: %w", err) 612 } 613 return collaborators, nil 614} 615 616func (s *PostgresStore) CountDocuments(ctx context.Context) (int64, error) { 617 var n int64 618 err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM documents WHERE deleted_at IS NULL`).Scan(&n) 619 if err != nil { 620 return 0, fmt.Errorf("count documents: %w", err) 621 } 622 return n, nil 623} 624 625func (s *PostgresStore) CountPendingIndexingJobs(ctx context.Context) (int64, error) { 626 var n int64 627 err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM indexing_jobs WHERE status = 'pending'`).Scan(&n) 628 if err != nil { 629 return 0, fmt.Errorf("count pending indexing jobs: %w", err) 630 } 631 return n, nil 632} 633 634func (s *PostgresStore) InsertJetstreamEvent(ctx context.Context, event *JetstreamEvent, maxEvents int) error { 635 tx, err := s.db.BeginTx(ctx, nil) 636 if err != nil { 637 return fmt.Errorf("begin insert jetstream event tx: %w", err) 638 } 639 defer tx.Rollback() 640 641 _, err = tx.ExecContext(ctx, ` 642 INSERT INTO jetstream_events (time_us, did, kind, collection, rkey, operation, payload, received_at) 643 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, 644 event.TimeUS, event.DID, event.Kind, nullableStr(event.Collection), 645 nullableStr(event.RKey), nullableStr(event.Operation), event.Payload, event.ReceivedAt, 646 ) 647 if err != nil { 648 return fmt.Errorf("insert jetstream event: %w", err) 649 } 650 651 if maxEvents > 0 { 652 _, err = tx.ExecContext(ctx, ` 653 DELETE FROM jetstream_events 654 WHERE id IN ( 655 SELECT id FROM jetstream_events 656 ORDER BY time_us DESC 657 OFFSET $1 658 )`, maxEvents) 659 if err != nil { 660 return fmt.Errorf("trim jetstream events: %w", err) 661 } 662 } 663 664 if err := tx.Commit(); err != nil { 665 return fmt.Errorf("commit insert jetstream event tx: %w", err) 666 } 667 return nil 668} 669 670func (s *PostgresStore) ListJetstreamEvents( 671 ctx context.Context, filter JetstreamEventFilter, 672) ([]*JetstreamEvent, error) { 673 query := ` 674 SELECT id, time_us, did, kind, 675 COALESCE(collection, ''), COALESCE(rkey, ''), COALESCE(operation, ''), 676 payload, received_at 677 FROM jetstream_events WHERE 1=1` 678 args := newPGArgs() 679 680 if filter.Collection != "" { 681 query += " AND collection = " + args.Add(filter.Collection) 682 } 683 if filter.DID != "" { 684 query += " AND did = " + args.Add(filter.DID) 685 } 686 if filter.Operation != "" { 687 query += " AND operation = " + args.Add(filter.Operation) 688 } 689 690 query += " ORDER BY time_us DESC" 691 limit := filter.Limit 692 if limit <= 0 { 693 limit = 50 694 } 695 query += " LIMIT " + args.Add(limit) 696 if filter.Offset > 0 { 697 query += " OFFSET " + args.Add(filter.Offset) 698 } 699 700 rows, err := s.db.QueryContext(ctx, query, args.Values()...) 701 if err != nil { 702 return nil, fmt.Errorf("list jetstream events: %w", err) 703 } 704 defer rows.Close() 705 706 var events []*JetstreamEvent 707 for rows.Next() { 708 e := &JetstreamEvent{} 709 if err := rows.Scan( 710 &e.ID, &e.TimeUS, &e.DID, &e.Kind, 711 &e.Collection, &e.RKey, &e.Operation, 712 &e.Payload, &e.ReceivedAt, 713 ); err != nil { 714 return nil, fmt.Errorf("scan jetstream event: %w", err) 715 } 716 events = append(events, e) 717 } 718 if err := rows.Err(); err != nil { 719 return nil, fmt.Errorf("iterate jetstream events: %w", err) 720 } 721 return events, nil 722} 723 724func (s *PostgresStore) Ping(ctx context.Context) error { 725 return s.db.PingContext(ctx) 726}