// Package store — a love letter to tangled (Android, iOS, and a search API):
// a PostgreSQL-backed document store and indexing-job queue.
1package store
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "strings"
9 "time"
10)
11
// PostgresStore persists documents, sync state, identity handles,
// indexing jobs, audit entries, and jetstream events in PostgreSQL.
// Every method is a thin wrapper around one or two SQL statements.
type PostgresStore struct {
	db *sql.DB // shared connection pool; callers manage its lifecycle
}
15
// pgArgs accumulates positional query arguments and hands out the
// matching PostgreSQL placeholder ("$1", "$2", ...) for each value added,
// so dynamically built WHERE clauses stay in sync with their args.
type pgArgs struct {
	args []any
}

// newPGArgs returns an accumulator seeded with the given initial values.
// Placeholders handed out by Add continue after the seeded ones.
func newPGArgs(initial ...any) *pgArgs {
	seeded := make([]any, 0, len(initial))
	seeded = append(seeded, initial...)
	return &pgArgs{args: seeded}
}

// Add records value and returns the placeholder string that refers to it.
func (p *pgArgs) Add(value any) string {
	p.args = append(p.args, value)
	return fmt.Sprintf("$%d", len(p.args))
}

// Values returns the accumulated arguments in placeholder order.
func (p *pgArgs) Values() []any {
	return p.args
}
32
33func (s *PostgresStore) UpsertDocument(ctx context.Context, doc *Document) error {
34 doc.IndexedAt = time.Now().UTC().Format(time.RFC3339)
35
36 _, err := s.db.ExecContext(ctx, `
37 INSERT INTO documents (
38 id, did, collection, rkey, at_uri, cid, record_type,
39 title, body, summary, repo_did, repo_name, author_handle,
40 tags_json, language, created_at, updated_at, indexed_at, web_url, deleted_at
41 ) VALUES (
42 $1, $2, $3, $4, $5, $6, $7,
43 $8, $9, $10, $11, $12, $13,
44 $14, $15, $16, $17, $18, $19, $20
45 )
46 ON CONFLICT(id) DO UPDATE SET
47 did = excluded.did,
48 collection = excluded.collection,
49 rkey = excluded.rkey,
50 at_uri = excluded.at_uri,
51 cid = excluded.cid,
52 record_type = excluded.record_type,
53 title = excluded.title,
54 body = excluded.body,
55 summary = excluded.summary,
56 repo_did = excluded.repo_did,
57 repo_name = excluded.repo_name,
58 author_handle = excluded.author_handle,
59 tags_json = excluded.tags_json,
60 language = excluded.language,
61 created_at = excluded.created_at,
62 updated_at = excluded.updated_at,
63 indexed_at = excluded.indexed_at,
64 web_url = excluded.web_url,
65 deleted_at = excluded.deleted_at`,
66 doc.ID, doc.DID, doc.Collection, doc.RKey, doc.ATURI, doc.CID, doc.RecordType,
67 doc.Title, doc.Body, doc.Summary, doc.RepoDID, doc.RepoName, doc.AuthorHandle,
68 doc.TagsJSON, doc.Language, doc.CreatedAt, doc.UpdatedAt, doc.IndexedAt,
69 doc.WebURL, nullableStr(doc.DeletedAt),
70 )
71 if err != nil {
72 return fmt.Errorf("upsert document: %w", err)
73 }
74 return nil
75}
76
77func (s *PostgresStore) ListDocuments(ctx context.Context, filter DocumentFilter) ([]*Document, error) {
78 query := `SELECT id, did, collection, rkey, at_uri, cid, record_type,
79 title, body, summary, repo_did, repo_name, author_handle,
80 tags_json, language, created_at, updated_at, indexed_at, web_url, deleted_at
81 FROM documents WHERE deleted_at IS NULL`
82 args := newPGArgs()
83
84 if filter.DocumentID != "" {
85 query += " AND id = " + args.Add(filter.DocumentID)
86 }
87 if filter.Collection != "" {
88 query += " AND collection = " + args.Add(filter.Collection)
89 }
90 if filter.DID != "" {
91 query += " AND did = " + args.Add(filter.DID)
92 }
93
94 rows, err := s.db.QueryContext(ctx, query, args.Values()...)
95 if err != nil {
96 return nil, fmt.Errorf("list documents: %w", err)
97 }
98 defer rows.Close()
99
100 var docs []*Document
101 for rows.Next() {
102 doc := &Document{}
103 var (
104 title, body, summary, repoDID, repoName, authorHandle sql.NullString
105 tagsJSON, language, createdAt, updatedAt, webURL, deletedAt sql.NullString
106 )
107 if err := rows.Scan(
108 &doc.ID, &doc.DID, &doc.Collection, &doc.RKey, &doc.ATURI, &doc.CID, &doc.RecordType,
109 &title, &body, &summary, &repoDID, &repoName, &authorHandle,
110 &tagsJSON, &language, &createdAt, &updatedAt, &doc.IndexedAt, &webURL, &deletedAt,
111 ); err != nil {
112 return nil, fmt.Errorf("scan document: %w", err)
113 }
114 doc.Title = title.String
115 doc.Body = body.String
116 doc.Summary = summary.String
117 doc.RepoDID = repoDID.String
118 doc.RepoName = repoName.String
119 doc.AuthorHandle = authorHandle.String
120 doc.TagsJSON = tagsJSON.String
121 doc.Language = language.String
122 doc.CreatedAt = createdAt.String
123 doc.UpdatedAt = updatedAt.String
124 doc.WebURL = webURL.String
125 doc.DeletedAt = deletedAt.String
126 docs = append(docs, doc)
127 }
128 if err := rows.Err(); err != nil {
129 return nil, fmt.Errorf("iterate documents: %w", err)
130 }
131 return docs, nil
132}
133
// OptimizeSearchIndex is a no-op for the Postgres backend: there is no
// separate search index for this store to compact. NOTE(review): the
// method presumably exists to satisfy a store interface shared with
// another backend — confirm against the interface definition.
func (s *PostgresStore) OptimizeSearchIndex(_ context.Context) error {
	return nil
}
137
138func (s *PostgresStore) GetDocument(ctx context.Context, id string) (*Document, error) {
139 row := s.db.QueryRowContext(ctx, `
140 SELECT id, did, collection, rkey, at_uri, cid, record_type,
141 title, body, summary, repo_did, repo_name, author_handle,
142 tags_json, language, created_at, updated_at, indexed_at, web_url, deleted_at
143 FROM documents WHERE id = $1`, id)
144
145 doc, err := scanDocument(row)
146 if errors.Is(err, sql.ErrNoRows) {
147 return nil, nil
148 }
149 if err != nil {
150 return nil, fmt.Errorf("get document: %w", err)
151 }
152 return doc, nil
153}
154
155func (s *PostgresStore) MarkDeleted(ctx context.Context, id string) error {
156 now := time.Now().UTC().Format(time.RFC3339)
157 _, err := s.db.ExecContext(ctx, `UPDATE documents SET deleted_at = $1 WHERE id = $2`, now, id)
158 if err != nil {
159 return fmt.Errorf("mark deleted: %w", err)
160 }
161 return nil
162}
163
164func (s *PostgresStore) GetSyncState(ctx context.Context, consumer string) (*SyncState, error) {
165 row := s.db.QueryRowContext(ctx, `
166 SELECT consumer_name, cursor, COALESCE(high_water_mark, ''), updated_at
167 FROM sync_state WHERE consumer_name = $1`, consumer)
168
169 ss := &SyncState{}
170 err := row.Scan(&ss.ConsumerName, &ss.Cursor, &ss.HighWaterMark, &ss.UpdatedAt)
171 if errors.Is(err, sql.ErrNoRows) {
172 return nil, nil
173 }
174 if err != nil {
175 return nil, fmt.Errorf("get sync state: %w", err)
176 }
177 return ss, nil
178}
179
180func (s *PostgresStore) SetSyncState(ctx context.Context, consumer string, cursor string) error {
181 now := time.Now().UTC().Format(time.RFC3339)
182 _, err := s.db.ExecContext(ctx, `
183 INSERT INTO sync_state (consumer_name, cursor, updated_at) VALUES ($1, $2, $3)
184 ON CONFLICT(consumer_name) DO UPDATE SET
185 cursor = excluded.cursor,
186 updated_at = excluded.updated_at`,
187 consumer, cursor, now,
188 )
189 if err != nil {
190 return fmt.Errorf("set sync state: %w", err)
191 }
192 return nil
193}
194
195func (s *PostgresStore) UpdateRecordState(ctx context.Context, subjectURI string, state string) error {
196 now := time.Now().UTC().Format(time.RFC3339)
197 _, err := s.db.ExecContext(ctx, `
198 INSERT INTO record_state (subject_uri, state, updated_at) VALUES ($1, $2, $3)
199 ON CONFLICT(subject_uri) DO UPDATE SET
200 state = excluded.state,
201 updated_at = excluded.updated_at`,
202 subjectURI, state, now,
203 )
204 if err != nil {
205 return fmt.Errorf("update record state: %w", err)
206 }
207 return nil
208}
209
210func (s *PostgresStore) UpsertIdentityHandle(
211 ctx context.Context, did, handle string, isActive bool, status string,
212) error {
213 now := time.Now().UTC().Format(time.RFC3339)
214 _, err := s.db.ExecContext(ctx, `
215 INSERT INTO identity_handles (did, handle, is_active, status, updated_at)
216 VALUES ($1, $2, $3, $4, $5)
217 ON CONFLICT(did) DO UPDATE SET
218 handle = excluded.handle,
219 is_active = excluded.is_active,
220 status = excluded.status,
221 updated_at = excluded.updated_at`,
222 did, handle, isActive, nullableStr(status), now,
223 )
224 if err != nil {
225 return fmt.Errorf("upsert identity handle: %w", err)
226 }
227 return nil
228}
229
230func (s *PostgresStore) GetIdentityHandle(ctx context.Context, did string) (string, error) {
231 var handle sql.NullString
232 err := s.db.QueryRowContext(ctx, `SELECT handle FROM identity_handles WHERE did = $1`, did).Scan(&handle)
233 if errors.Is(err, sql.ErrNoRows) {
234 return "", nil
235 }
236 if err != nil {
237 return "", fmt.Errorf("get identity handle: %w", err)
238 }
239 return handle.String, nil
240}
241
242func (s *PostgresStore) GetIndexingJob(ctx context.Context, documentID string) (*IndexingJob, error) {
243 row := s.db.QueryRowContext(ctx, `
244 SELECT document_id, did, collection, rkey, cid, record_json, source,
245 attempts, status, COALESCE(last_error, ''), scheduled_at, updated_at,
246 COALESCE(lease_owner, ''), COALESCE(lease_expires_at, ''),
247 COALESCE(completed_at, '')
248 FROM indexing_jobs
249 WHERE document_id = $1`, documentID)
250
251 job, err := scanIndexingJob(row)
252 if errors.Is(err, sql.ErrNoRows) {
253 return nil, nil
254 }
255 if err != nil {
256 return nil, fmt.Errorf("get indexing job: %w", err)
257 }
258 return job, nil
259}
260
261func (s *PostgresStore) EnqueueIndexingJob(ctx context.Context, input IndexingJobInput) error {
262 now := time.Now().UTC().Format(time.RFC3339)
263 source := strings.TrimSpace(input.Source)
264 if source == "" {
265 source = IndexSourceReadThrough
266 }
267
268 _, err := s.db.ExecContext(ctx, `
269 INSERT INTO indexing_jobs (
270 document_id, did, collection, rkey, cid, record_json, source,
271 status, attempts, last_error, scheduled_at, updated_at,
272 lease_owner, lease_expires_at, completed_at
273 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 0, NULL, $9, $10, '', '', '')
274 ON CONFLICT(document_id) DO UPDATE SET
275 did = excluded.did,
276 collection = excluded.collection,
277 rkey = excluded.rkey,
278 cid = excluded.cid,
279 record_json = excluded.record_json,
280 source = excluded.source,
281 status = excluded.status,
282 last_error = NULL,
283 scheduled_at = excluded.scheduled_at,
284 updated_at = excluded.updated_at,
285 lease_owner = '',
286 lease_expires_at = '',
287 completed_at = ''`,
288 input.DocumentID, input.DID, input.Collection, input.RKey, input.CID,
289 input.RecordJSON, source, IndexingJobPending, now, now,
290 )
291 if err != nil {
292 return fmt.Errorf("enqueue indexing job: %w", err)
293 }
294 return nil
295}
296
// ClaimIndexingJob atomically leases the next runnable job to workerID,
// marking it processing with a lease that lasts until leaseUntil. A job
// is runnable when it is pending and due (scheduled_at has passed), or
// when it is processing but its previous lease has expired (recovery of
// jobs abandoned by a dead worker). Returns (nil, nil) when nothing is
// claimable right now.
//
// Concurrency: the CTE picks one candidate with FOR UPDATE SKIP LOCKED
// inside a transaction, so concurrent claimers skip rows another worker
// has already locked instead of blocking or double-claiming.
func (s *PostgresStore) ClaimIndexingJob(
	ctx context.Context, workerID string, leaseUntil string,
) (*IndexingJob, error) {
	now := time.Now().UTC().Format(time.RFC3339)
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return nil, fmt.Errorf("begin claim tx: %w", err)
	}
	// No-op after a successful Commit; fires only on error paths below.
	defer tx.Rollback()

	// Timestamps are stored as RFC 3339 text, hence the ::timestamptz
	// casts for comparison and ordering. $2 (now) is reused by both
	// branches of the runnable predicate.
	row := tx.QueryRowContext(ctx, `
	WITH candidate AS (
		SELECT document_id
		FROM indexing_jobs
		WHERE (
			status = $1
			AND scheduled_at::timestamptz <= $2::timestamptz
		) OR (
			status = $3
			AND lease_expires_at IS NOT NULL
			AND lease_expires_at != ''
			AND lease_expires_at::timestamptz <= $2::timestamptz
		)
		ORDER BY scheduled_at::timestamptz ASC, updated_at::timestamptz ASC
		FOR UPDATE SKIP LOCKED
		LIMIT 1
	)
	UPDATE indexing_jobs jobs
	SET status = $4, updated_at = $5, lease_owner = $6, lease_expires_at = $7
	FROM candidate
	WHERE jobs.document_id = candidate.document_id
	RETURNING jobs.document_id, jobs.did, jobs.collection, jobs.rkey, jobs.cid,
		jobs.record_json, jobs.source, jobs.attempts, jobs.status,
		COALESCE(jobs.last_error, ''), jobs.scheduled_at, jobs.updated_at,
		COALESCE(jobs.lease_owner, ''), COALESCE(jobs.lease_expires_at, ''),
		COALESCE(jobs.completed_at, '')`,
		IndexingJobPending, now, IndexingJobProcessing, IndexingJobProcessing,
		now, workerID, leaseUntil,
	)

	job, err := scanIndexingJob(row)
	if errors.Is(err, sql.ErrNoRows) {
		// Nothing matched: commit the empty transaction so any locks taken
		// while scanning candidates are released promptly.
		if err := tx.Commit(); err != nil {
			return nil, fmt.Errorf("commit empty claim tx: %w", err)
		}
		return nil, nil
	}
	if err != nil {
		return nil, fmt.Errorf("claim indexing job: %w", err)
	}
	if err := tx.Commit(); err != nil {
		return nil, fmt.Errorf("commit claim tx: %w", err)
	}
	return job, nil
}
352
353func (s *PostgresStore) CompleteIndexingJob(ctx context.Context, documentID string) error {
354 now := time.Now().UTC().Format(time.RFC3339)
355 _, err := s.db.ExecContext(ctx, `
356 UPDATE indexing_jobs
357 SET status = $1, updated_at = $2, completed_at = $3,
358 lease_owner = '', lease_expires_at = '', last_error = NULL
359 WHERE document_id = $4`,
360 IndexingJobCompleted, now, now, documentID,
361 )
362 if err != nil {
363 return fmt.Errorf("complete indexing job: %w", err)
364 }
365 return nil
366}
367
368func (s *PostgresStore) RetryIndexingJob(
369 ctx context.Context, documentID string, nextScheduledAt string, lastError string,
370) error {
371 now := time.Now().UTC().Format(time.RFC3339)
372 _, err := s.db.ExecContext(ctx, `
373 UPDATE indexing_jobs
374 SET status = $1, attempts = attempts + 1, last_error = $2, scheduled_at = $3,
375 updated_at = $4, lease_owner = '', lease_expires_at = ''
376 WHERE document_id = $5`,
377 IndexingJobPending, lastError, nextScheduledAt, now, documentID,
378 )
379 if err != nil {
380 return fmt.Errorf("retry indexing job: %w", err)
381 }
382 return nil
383}
384
385func (s *PostgresStore) FailIndexingJob(
386 ctx context.Context, documentID string, status string, lastError string,
387) error {
388 now := time.Now().UTC().Format(time.RFC3339)
389 _, err := s.db.ExecContext(ctx, `
390 UPDATE indexing_jobs
391 SET status = $1, attempts = attempts + 1, last_error = $2, updated_at = $3,
392 lease_owner = '', lease_expires_at = ''
393 WHERE document_id = $4`,
394 status, lastError, now, documentID,
395 )
396 if err != nil {
397 return fmt.Errorf("fail indexing job: %w", err)
398 }
399 return nil
400}
401
402func (s *PostgresStore) ListIndexingJobs(
403 ctx context.Context, filter IndexingJobFilter,
404) ([]*IndexingJob, error) {
405 query := `
406 SELECT document_id, did, collection, rkey, cid, record_json, source,
407 attempts, status, COALESCE(last_error, ''), scheduled_at, updated_at,
408 COALESCE(lease_owner, ''), COALESCE(lease_expires_at, ''),
409 COALESCE(completed_at, '')
410 FROM indexing_jobs
411 WHERE 1 = 1`
412 args := newPGArgs()
413
414 if filter.DocumentID != "" {
415 query += " AND document_id = " + args.Add(filter.DocumentID)
416 }
417 if filter.Status != "" {
418 query += " AND status = " + args.Add(filter.Status)
419 }
420 if filter.Source != "" {
421 query += " AND source = " + args.Add(filter.Source)
422 }
423
424 query += " ORDER BY updated_at::timestamptz DESC"
425 limit := filter.Limit
426 if limit <= 0 {
427 limit = 50
428 }
429 query += " LIMIT " + args.Add(limit)
430 if filter.Offset > 0 {
431 query += " OFFSET " + args.Add(filter.Offset)
432 }
433
434 rows, err := s.db.QueryContext(ctx, query, args.Values()...)
435 if err != nil {
436 return nil, fmt.Errorf("list indexing jobs: %w", err)
437 }
438 defer rows.Close()
439
440 var jobs []*IndexingJob
441 for rows.Next() {
442 job, err := scanIndexingJob(rows)
443 if err != nil {
444 return nil, fmt.Errorf("scan indexing job: %w", err)
445 }
446 jobs = append(jobs, job)
447 }
448 if err := rows.Err(); err != nil {
449 return nil, fmt.Errorf("iterate indexing jobs: %w", err)
450 }
451 return jobs, nil
452}
453
454func (s *PostgresStore) GetIndexingJobStats(ctx context.Context) (*IndexingJobStats, error) {
455 row := s.db.QueryRowContext(ctx, `
456 SELECT
457 COUNT(*) FILTER (WHERE status = $1),
458 COUNT(*) FILTER (WHERE status = $2),
459 COUNT(*) FILTER (WHERE status = $3),
460 COUNT(*) FILTER (WHERE status = $4),
461 COUNT(*) FILTER (WHERE status = $5),
462 COALESCE(MIN(CASE WHEN status = $1 THEN scheduled_at END), ''),
463 COALESCE(MIN(CASE WHEN status = $2 THEN updated_at END), ''),
464 COALESCE(MAX(completed_at), ''),
465 COALESCE(MAX(updated_at), '')
466 FROM indexing_jobs`,
467 IndexingJobPending, IndexingJobProcessing, IndexingJobCompleted,
468 IndexingJobFailed, IndexingJobDeadLetter,
469 )
470
471 stats := &IndexingJobStats{}
472 err := row.Scan(
473 &stats.Pending, &stats.Processing, &stats.Completed, &stats.Failed,
474 &stats.DeadLetter, &stats.OldestPendingAt, &stats.OldestRunningAt,
475 &stats.LastCompletedAt, &stats.LastProcessedAt,
476 )
477 if err != nil {
478 return nil, fmt.Errorf("get indexing job stats: %w", err)
479 }
480 return stats, nil
481}
482
483func (s *PostgresStore) AppendIndexingAudit(ctx context.Context, input IndexingAuditInput) error {
484 now := time.Now().UTC().Format(time.RFC3339)
485 _, err := s.db.ExecContext(ctx, `
486 INSERT INTO indexing_audit (
487 source, document_id, collection, cid, decision, attempt, error, created_at
488 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
489 input.Source, input.DocumentID, input.Collection, input.CID,
490 input.Decision, input.Attempt, nullableStr(input.Error), now,
491 )
492 if err != nil {
493 return fmt.Errorf("append indexing audit: %w", err)
494 }
495 return nil
496}
497
498func (s *PostgresStore) ListIndexingAudit(
499 ctx context.Context, filter IndexingAuditFilter,
500) ([]*IndexingAuditEntry, error) {
501 query := `
502 SELECT id, source, document_id, collection, cid, decision,
503 attempt, COALESCE(error, ''), created_at
504 FROM indexing_audit
505 WHERE 1 = 1`
506 args := newPGArgs()
507
508 if filter.DocumentID != "" {
509 query += " AND document_id = " + args.Add(filter.DocumentID)
510 }
511 if filter.Source != "" {
512 query += " AND source = " + args.Add(filter.Source)
513 }
514 if filter.Decision != "" {
515 query += " AND decision = " + args.Add(filter.Decision)
516 }
517
518 query += " ORDER BY created_at::timestamptz DESC"
519 limit := filter.Limit
520 if limit <= 0 {
521 limit = 50
522 }
523 query += " LIMIT " + args.Add(limit)
524 if filter.Offset > 0 {
525 query += " OFFSET " + args.Add(filter.Offset)
526 }
527
528 rows, err := s.db.QueryContext(ctx, query, args.Values()...)
529 if err != nil {
530 return nil, fmt.Errorf("list indexing audit: %w", err)
531 }
532 defer rows.Close()
533
534 var entries []*IndexingAuditEntry
535 for rows.Next() {
536 entry := &IndexingAuditEntry{}
537 if err := rows.Scan(
538 &entry.ID, &entry.Source, &entry.DocumentID, &entry.Collection,
539 &entry.CID, &entry.Decision, &entry.Attempt, &entry.Error,
540 &entry.CreatedAt,
541 ); err != nil {
542 return nil, fmt.Errorf("scan indexing audit: %w", err)
543 }
544 entries = append(entries, entry)
545 }
546 if err := rows.Err(); err != nil {
547 return nil, fmt.Errorf("iterate indexing audit: %w", err)
548 }
549 return entries, nil
550}
551
552func (s *PostgresStore) GetFollowSubjects(ctx context.Context, did string) ([]string, error) {
553 rows, err := s.db.QueryContext(ctx, `
554 SELECT DISTINCT repo_did
555 FROM documents
556 WHERE did = $1
557 AND collection = 'sh.tangled.graph.follow'
558 AND deleted_at IS NULL
559 AND repo_did IS NOT NULL
560 AND repo_did != ''`,
561 did,
562 )
563 if err != nil {
564 return nil, fmt.Errorf("get follow subjects: %w", err)
565 }
566 defer rows.Close()
567
568 var subjects []string
569 for rows.Next() {
570 var subject string
571 if err := rows.Scan(&subject); err != nil {
572 return nil, fmt.Errorf("scan follow subject: %w", err)
573 }
574 subjects = append(subjects, subject)
575 }
576 if err := rows.Err(); err != nil {
577 return nil, fmt.Errorf("iterate follow subjects: %w", err)
578 }
579 return subjects, nil
580}
581
582func (s *PostgresStore) GetRepoCollaborators(ctx context.Context, repoOwnerDID string) ([]string, error) {
583 rows, err := s.db.QueryContext(ctx, `
584 SELECT DISTINCT did
585 FROM documents
586 WHERE repo_did = $1
587 AND did != $1
588 AND deleted_at IS NULL
589 AND collection IN (
590 'sh.tangled.repo.issue',
591 'sh.tangled.repo.pull',
592 'sh.tangled.repo.issue.comment',
593 'sh.tangled.repo.pull.comment'
594 )`,
595 repoOwnerDID,
596 )
597 if err != nil {
598 return nil, fmt.Errorf("get repo collaborators: %w", err)
599 }
600 defer rows.Close()
601
602 var collaborators []string
603 for rows.Next() {
604 var collaborator string
605 if err := rows.Scan(&collaborator); err != nil {
606 return nil, fmt.Errorf("scan collaborator: %w", err)
607 }
608 collaborators = append(collaborators, collaborator)
609 }
610 if err := rows.Err(); err != nil {
611 return nil, fmt.Errorf("iterate collaborators: %w", err)
612 }
613 return collaborators, nil
614}
615
616func (s *PostgresStore) CountDocuments(ctx context.Context) (int64, error) {
617 var n int64
618 err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM documents WHERE deleted_at IS NULL`).Scan(&n)
619 if err != nil {
620 return 0, fmt.Errorf("count documents: %w", err)
621 }
622 return n, nil
623}
624
625func (s *PostgresStore) CountPendingIndexingJobs(ctx context.Context) (int64, error) {
626 var n int64
627 err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM indexing_jobs WHERE status = 'pending'`).Scan(&n)
628 if err != nil {
629 return 0, fmt.Errorf("count pending indexing jobs: %w", err)
630 }
631 return n, nil
632}
633
634func (s *PostgresStore) InsertJetstreamEvent(ctx context.Context, event *JetstreamEvent, maxEvents int) error {
635 tx, err := s.db.BeginTx(ctx, nil)
636 if err != nil {
637 return fmt.Errorf("begin insert jetstream event tx: %w", err)
638 }
639 defer tx.Rollback()
640
641 _, err = tx.ExecContext(ctx, `
642 INSERT INTO jetstream_events (time_us, did, kind, collection, rkey, operation, payload, received_at)
643 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
644 event.TimeUS, event.DID, event.Kind, nullableStr(event.Collection),
645 nullableStr(event.RKey), nullableStr(event.Operation), event.Payload, event.ReceivedAt,
646 )
647 if err != nil {
648 return fmt.Errorf("insert jetstream event: %w", err)
649 }
650
651 if maxEvents > 0 {
652 _, err = tx.ExecContext(ctx, `
653 DELETE FROM jetstream_events
654 WHERE id IN (
655 SELECT id FROM jetstream_events
656 ORDER BY time_us DESC
657 OFFSET $1
658 )`, maxEvents)
659 if err != nil {
660 return fmt.Errorf("trim jetstream events: %w", err)
661 }
662 }
663
664 if err := tx.Commit(); err != nil {
665 return fmt.Errorf("commit insert jetstream event tx: %w", err)
666 }
667 return nil
668}
669
670func (s *PostgresStore) ListJetstreamEvents(
671 ctx context.Context, filter JetstreamEventFilter,
672) ([]*JetstreamEvent, error) {
673 query := `
674 SELECT id, time_us, did, kind,
675 COALESCE(collection, ''), COALESCE(rkey, ''), COALESCE(operation, ''),
676 payload, received_at
677 FROM jetstream_events WHERE 1=1`
678 args := newPGArgs()
679
680 if filter.Collection != "" {
681 query += " AND collection = " + args.Add(filter.Collection)
682 }
683 if filter.DID != "" {
684 query += " AND did = " + args.Add(filter.DID)
685 }
686 if filter.Operation != "" {
687 query += " AND operation = " + args.Add(filter.Operation)
688 }
689
690 query += " ORDER BY time_us DESC"
691 limit := filter.Limit
692 if limit <= 0 {
693 limit = 50
694 }
695 query += " LIMIT " + args.Add(limit)
696 if filter.Offset > 0 {
697 query += " OFFSET " + args.Add(filter.Offset)
698 }
699
700 rows, err := s.db.QueryContext(ctx, query, args.Values()...)
701 if err != nil {
702 return nil, fmt.Errorf("list jetstream events: %w", err)
703 }
704 defer rows.Close()
705
706 var events []*JetstreamEvent
707 for rows.Next() {
708 e := &JetstreamEvent{}
709 if err := rows.Scan(
710 &e.ID, &e.TimeUS, &e.DID, &e.Kind,
711 &e.Collection, &e.RKey, &e.Operation,
712 &e.Payload, &e.ReceivedAt,
713 ); err != nil {
714 return nil, fmt.Errorf("scan jetstream event: %w", err)
715 }
716 events = append(events, e)
717 }
718 if err := rows.Err(); err != nil {
719 return nil, fmt.Errorf("iterate jetstream events: %w", err)
720 }
721 return events, nil
722}
723
// Ping verifies the database connection is still alive, honoring ctx for
// cancellation and deadline.
func (s *PostgresStore) Ping(ctx context.Context) error {
	return s.db.PingContext(ctx)
}