A love letter to Tangled (Android, iOS, and a search API)
1package store
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "time"
9)
10
// SQLiteStore is the Store implementation backed by a SQLite database. Every
// method issues its statements through the wrapped *sql.DB.
type SQLiteStore struct {
	db *sql.DB // handle all queries and transactions are run against
}
15
16// New wraps an open *sql.DB in a Store implementation.
17func New(url string, db *sql.DB) Store {
18 if DetectBackend(url) == BackendPostgres {
19 return &PostgresStore{db: db}
20 }
21 return &SQLiteStore{db: db}
22}
23
24func (s *SQLiteStore) UpsertDocument(ctx context.Context, doc *Document) error {
25 doc.IndexedAt = time.Now().UTC().Format(time.RFC3339)
26 tx, err := s.db.BeginTx(ctx, nil)
27 if err != nil {
28 return fmt.Errorf("begin upsert document tx: %w", err)
29 }
30 defer tx.Rollback()
31
32 _, err = tx.ExecContext(ctx, `
33 INSERT INTO documents (
34 id, did, collection, rkey, at_uri, cid, record_type,
35 title, body, summary, repo_did, repo_name, author_handle,
36 tags_json, language, created_at, updated_at, indexed_at, web_url, deleted_at
37 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
38 ON CONFLICT(id) DO UPDATE SET
39 did = excluded.did,
40 collection = excluded.collection,
41 rkey = excluded.rkey,
42 at_uri = excluded.at_uri,
43 cid = excluded.cid,
44 record_type = excluded.record_type,
45 title = excluded.title,
46 body = excluded.body,
47 summary = excluded.summary,
48 repo_did = excluded.repo_did,
49 repo_name = excluded.repo_name,
50 author_handle = excluded.author_handle,
51 tags_json = excluded.tags_json,
52 language = excluded.language,
53 created_at = excluded.created_at,
54 updated_at = excluded.updated_at,
55 indexed_at = excluded.indexed_at,
56 web_url = excluded.web_url,
57 deleted_at = excluded.deleted_at`,
58 doc.ID, doc.DID, doc.Collection, doc.RKey, doc.ATURI, doc.CID, doc.RecordType,
59 doc.Title, doc.Body, doc.Summary, doc.RepoDID, doc.RepoName, doc.AuthorHandle,
60 doc.TagsJSON, doc.Language, doc.CreatedAt, doc.UpdatedAt, doc.IndexedAt, doc.WebURL, nullableStr(doc.DeletedAt),
61 )
62 if err != nil {
63 return fmt.Errorf("upsert document: %w", err)
64 }
65 if err := syncDocumentFTS(ctx, tx, doc); err != nil {
66 return err
67 }
68 if err := tx.Commit(); err != nil {
69 return fmt.Errorf("commit upsert document tx: %w", err)
70 }
71 return nil
72}
73
74func (s *SQLiteStore) ListDocuments(ctx context.Context, filter DocumentFilter) ([]*Document, error) {
75 query := `SELECT id, did, collection, rkey, at_uri, cid, record_type,
76 title, body, summary, repo_did, repo_name, author_handle,
77 tags_json, language, created_at, updated_at, indexed_at, web_url, deleted_at
78 FROM documents WHERE deleted_at IS NULL`
79 args := []any{}
80
81 if filter.DocumentID != "" {
82 query += " AND id = ?"
83 args = append(args, filter.DocumentID)
84 }
85 if filter.Collection != "" {
86 query += " AND collection = ?"
87 args = append(args, filter.Collection)
88 }
89 if filter.DID != "" {
90 query += " AND did = ?"
91 args = append(args, filter.DID)
92 }
93
94 rows, err := s.db.QueryContext(ctx, query, args...)
95 if err != nil {
96 return nil, fmt.Errorf("list documents: %w", err)
97 }
98 defer rows.Close()
99
100 var docs []*Document
101 for rows.Next() {
102 doc := &Document{}
103 var (
104 title, body, summary, repoDID, repoName, authorHandle sql.NullString
105 tagsJSON, language, createdAt, updatedAt, webURL, deletedAt sql.NullString
106 )
107 if err := rows.Scan(
108 &doc.ID, &doc.DID, &doc.Collection, &doc.RKey, &doc.ATURI, &doc.CID, &doc.RecordType,
109 &title, &body, &summary, &repoDID, &repoName, &authorHandle,
110 &tagsJSON, &language, &createdAt, &updatedAt, &doc.IndexedAt, &webURL, &deletedAt,
111 ); err != nil {
112 return nil, fmt.Errorf("scan document: %w", err)
113 }
114 doc.Title = title.String
115 doc.Body = body.String
116 doc.Summary = summary.String
117 doc.RepoDID = repoDID.String
118 doc.RepoName = repoName.String
119 doc.AuthorHandle = authorHandle.String
120 doc.TagsJSON = tagsJSON.String
121 doc.Language = language.String
122 doc.CreatedAt = createdAt.String
123 doc.UpdatedAt = updatedAt.String
124 doc.WebURL = webURL.String
125 doc.DeletedAt = deletedAt.String
126 docs = append(docs, doc)
127 }
128 if err := rows.Err(); err != nil {
129 return nil, fmt.Errorf("iterate documents: %w", err)
130 }
131 return docs, nil
132}
133
134func (s *SQLiteStore) OptimizeSearchIndex(ctx context.Context) error {
135 _, err := s.db.ExecContext(ctx, `INSERT INTO documents_fts(documents_fts) VALUES('optimize')`)
136 if err != nil {
137 return fmt.Errorf("optimize search index: %w", err)
138 }
139 return nil
140}
141
142func (s *SQLiteStore) GetDocument(ctx context.Context, id string) (*Document, error) {
143 row := s.db.QueryRowContext(ctx, `
144 SELECT id, did, collection, rkey, at_uri, cid, record_type,
145 title, body, summary, repo_did, repo_name, author_handle,
146 tags_json, language, created_at, updated_at, indexed_at, web_url, deleted_at
147 FROM documents WHERE id = ?`, id)
148
149 doc, err := scanDocument(row)
150 if errors.Is(err, sql.ErrNoRows) {
151 return nil, nil
152 }
153 if err != nil {
154 return nil, fmt.Errorf("get document: %w", err)
155 }
156 return doc, nil
157}
158
159func (s *SQLiteStore) MarkDeleted(ctx context.Context, id string) error {
160 now := time.Now().UTC().Format(time.RFC3339)
161 tx, err := s.db.BeginTx(ctx, nil)
162 if err != nil {
163 return fmt.Errorf("begin mark deleted tx: %w", err)
164 }
165 defer tx.Rollback()
166
167 _, err = tx.ExecContext(ctx,
168 `UPDATE documents SET deleted_at = ? WHERE id = ?`, now, id)
169 if err != nil {
170 return fmt.Errorf("mark deleted: %w", err)
171 }
172 if _, err := tx.ExecContext(ctx, `DELETE FROM documents_fts WHERE id = ?`, id); err != nil {
173 return fmt.Errorf("delete document from fts: %w", err)
174 }
175 if err := tx.Commit(); err != nil {
176 return fmt.Errorf("commit mark deleted tx: %w", err)
177 }
178 return nil
179}
180
181func (s *SQLiteStore) GetSyncState(ctx context.Context, consumer string) (*SyncState, error) {
182 row := s.db.QueryRowContext(ctx, `
183 SELECT consumer_name, cursor, high_water_mark, updated_at
184 FROM sync_state WHERE consumer_name = ?`, consumer)
185
186 ss := &SyncState{}
187 var hwm sql.NullString
188 err := row.Scan(&ss.ConsumerName, &ss.Cursor, &hwm, &ss.UpdatedAt)
189 if errors.Is(err, sql.ErrNoRows) {
190 return nil, nil
191 }
192 if err != nil {
193 return nil, fmt.Errorf("get sync state: %w", err)
194 }
195 ss.HighWaterMark = hwm.String
196 return ss, nil
197}
198
199func (s *SQLiteStore) SetSyncState(ctx context.Context, consumer string, cursor string) error {
200 now := time.Now().UTC().Format(time.RFC3339)
201 _, err := s.db.ExecContext(ctx, `
202 INSERT INTO sync_state (consumer_name, cursor, updated_at) VALUES (?, ?, ?)
203 ON CONFLICT(consumer_name) DO UPDATE SET
204 cursor = excluded.cursor,
205 updated_at = excluded.updated_at`,
206 consumer, cursor, now,
207 )
208 if err != nil {
209 return fmt.Errorf("set sync state: %w", err)
210 }
211 return nil
212}
213
214func (s *SQLiteStore) UpdateRecordState(ctx context.Context, subjectURI string, state string) error {
215 now := time.Now().UTC().Format(time.RFC3339)
216 _, err := s.db.ExecContext(ctx, `
217 INSERT INTO record_state (subject_uri, state, updated_at) VALUES (?, ?, ?)
218 ON CONFLICT(subject_uri) DO UPDATE SET
219 state = excluded.state,
220 updated_at = excluded.updated_at`,
221 subjectURI, state, now,
222 )
223 if err != nil {
224 return fmt.Errorf("update record state: %w", err)
225 }
226 return nil
227}
228
229func (s *SQLiteStore) UpsertIdentityHandle(ctx context.Context, did, handle string, isActive bool, status string) error {
230 now := time.Now().UTC().Format(time.RFC3339)
231 _, err := s.db.ExecContext(ctx, `
232 INSERT INTO identity_handles (did, handle, is_active, status, updated_at)
233 VALUES (?, ?, ?, ?, ?)
234 ON CONFLICT(did) DO UPDATE SET
235 handle = excluded.handle,
236 is_active = excluded.is_active,
237 status = excluded.status,
238 updated_at = excluded.updated_at`,
239 did, handle, isActive, status, now,
240 )
241 if err != nil {
242 return fmt.Errorf("upsert identity handle: %w", err)
243 }
244 return nil
245}
246
247func (s *SQLiteStore) GetIdentityHandle(ctx context.Context, did string) (string, error) {
248 var handle sql.NullString
249 err := s.db.QueryRowContext(ctx, `SELECT handle FROM identity_handles WHERE did = ?`, did).Scan(&handle)
250 if errors.Is(err, sql.ErrNoRows) {
251 return "", nil
252 }
253 if err != nil {
254 return "", fmt.Errorf("get identity handle: %w", err)
255 }
256 return handle.String, nil
257}
258
259func (s *SQLiteStore) GetFollowSubjects(ctx context.Context, did string) ([]string, error) {
260 rows, err := s.db.QueryContext(ctx, `
261 SELECT DISTINCT repo_did
262 FROM documents
263 WHERE did = ?
264 AND collection = 'sh.tangled.graph.follow'
265 AND deleted_at IS NULL
266 AND repo_did IS NOT NULL
267 AND repo_did != ''`,
268 did,
269 )
270 if err != nil {
271 return nil, fmt.Errorf("get follow subjects: %w", err)
272 }
273 defer rows.Close()
274
275 var subjects []string
276 for rows.Next() {
277 var subject string
278 if err := rows.Scan(&subject); err != nil {
279 return nil, fmt.Errorf("scan follow subject: %w", err)
280 }
281 subjects = append(subjects, subject)
282 }
283 if err := rows.Err(); err != nil {
284 return nil, fmt.Errorf("iterate follow subjects: %w", err)
285 }
286 return subjects, nil
287}
288
289func (s *SQLiteStore) GetRepoCollaborators(ctx context.Context, repoOwnerDID string) ([]string, error) {
290 rows, err := s.db.QueryContext(ctx, `
291 SELECT DISTINCT did
292 FROM documents
293 WHERE repo_did = ?
294 AND did != ?
295 AND deleted_at IS NULL
296 AND collection IN (
297 'sh.tangled.repo.issue',
298 'sh.tangled.repo.pull',
299 'sh.tangled.repo.issue.comment',
300 'sh.tangled.repo.pull.comment'
301 )`,
302 repoOwnerDID, repoOwnerDID,
303 )
304 if err != nil {
305 return nil, fmt.Errorf("get repo collaborators: %w", err)
306 }
307 defer rows.Close()
308
309 var collaborators []string
310 for rows.Next() {
311 var collaborator string
312 if err := rows.Scan(&collaborator); err != nil {
313 return nil, fmt.Errorf("scan collaborator: %w", err)
314 }
315 collaborators = append(collaborators, collaborator)
316 }
317 if err := rows.Err(); err != nil {
318 return nil, fmt.Errorf("iterate collaborators: %w", err)
319 }
320 return collaborators, nil
321}
322
323func (s *SQLiteStore) CountDocuments(ctx context.Context) (int64, error) {
324 var n int64
325 err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM documents WHERE deleted_at IS NULL`).Scan(&n)
326 if err != nil {
327 return 0, fmt.Errorf("count documents: %w", err)
328 }
329 return n, nil
330}
331
332func (s *SQLiteStore) CountPendingIndexingJobs(ctx context.Context) (int64, error) {
333 var n int64
334 err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM indexing_jobs WHERE status = 'pending'`).Scan(&n)
335 if err != nil {
336 return 0, fmt.Errorf("count pending indexing jobs: %w", err)
337 }
338 return n, nil
339}
340
341func (s *SQLiteStore) InsertJetstreamEvent(ctx context.Context, event *JetstreamEvent, maxEvents int) error {
342 tx, err := s.db.BeginTx(ctx, nil)
343 if err != nil {
344 return fmt.Errorf("begin insert jetstream event tx: %w", err)
345 }
346 defer tx.Rollback()
347
348 _, err = tx.ExecContext(ctx, `
349 INSERT INTO jetstream_events (time_us, did, kind, collection, rkey, operation, payload, received_at)
350 VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
351 event.TimeUS, event.DID, event.Kind,
352 nullableStr(event.Collection), nullableStr(event.RKey), nullableStr(event.Operation),
353 event.Payload, event.ReceivedAt,
354 )
355 if err != nil {
356 return fmt.Errorf("insert jetstream event: %w", err)
357 }
358
359 if maxEvents > 0 {
360 _, err = tx.ExecContext(ctx, `
361 DELETE FROM jetstream_events
362 WHERE id NOT IN (
363 SELECT id FROM jetstream_events ORDER BY time_us DESC LIMIT ?
364 )`, maxEvents)
365 if err != nil {
366 return fmt.Errorf("trim jetstream events: %w", err)
367 }
368 }
369
370 if err := tx.Commit(); err != nil {
371 return fmt.Errorf("commit insert jetstream event tx: %w", err)
372 }
373 return nil
374}
375
376func (s *SQLiteStore) ListJetstreamEvents(ctx context.Context, filter JetstreamEventFilter) ([]*JetstreamEvent, error) {
377 query := `
378 SELECT id, time_us, did, kind,
379 COALESCE(collection, ''), COALESCE(rkey, ''), COALESCE(operation, ''),
380 payload, received_at
381 FROM jetstream_events WHERE 1=1`
382 args := []any{}
383
384 if filter.Collection != "" {
385 query += " AND collection = ?"
386 args = append(args, filter.Collection)
387 }
388 if filter.DID != "" {
389 query += " AND did = ?"
390 args = append(args, filter.DID)
391 }
392 if filter.Operation != "" {
393 query += " AND operation = ?"
394 args = append(args, filter.Operation)
395 }
396
397 query += " ORDER BY time_us DESC"
398
399 limit := filter.Limit
400 if limit <= 0 {
401 limit = 50
402 }
403 query += " LIMIT ?"
404 args = append(args, limit)
405
406 if filter.Offset > 0 {
407 query += " OFFSET ?"
408 args = append(args, filter.Offset)
409 }
410
411 rows, err := s.db.QueryContext(ctx, query, args...)
412 if err != nil {
413 return nil, fmt.Errorf("list jetstream events: %w", err)
414 }
415 defer rows.Close()
416
417 var events []*JetstreamEvent
418 for rows.Next() {
419 e := &JetstreamEvent{}
420 if err := rows.Scan(
421 &e.ID, &e.TimeUS, &e.DID, &e.Kind,
422 &e.Collection, &e.RKey, &e.Operation,
423 &e.Payload, &e.ReceivedAt,
424 ); err != nil {
425 return nil, fmt.Errorf("scan jetstream event: %w", err)
426 }
427 events = append(events, e)
428 }
429 if err := rows.Err(); err != nil {
430 return nil, fmt.Errorf("iterate jetstream events: %w", err)
431 }
432 return events, nil
433}
434
435func (s *SQLiteStore) Ping(ctx context.Context) error {
436 return s.db.PingContext(ctx)
437}
438
439func scanDocument(row *sql.Row) (*Document, error) {
440 doc := &Document{}
441 var (
442 title, body, summary, repoDID, repoName, authorHandle sql.NullString
443 tagsJSON, language, createdAt, updatedAt, webURL, deletedAt sql.NullString
444 )
445 err := row.Scan(
446 &doc.ID, &doc.DID, &doc.Collection, &doc.RKey, &doc.ATURI, &doc.CID, &doc.RecordType,
447 &title, &body, &summary, &repoDID, &repoName, &authorHandle,
448 &tagsJSON, &language, &createdAt, &updatedAt, &doc.IndexedAt, &webURL, &deletedAt,
449 )
450 if err != nil {
451 return nil, err
452 }
453 doc.Title = title.String
454 doc.Body = body.String
455 doc.Summary = summary.String
456 doc.RepoDID = repoDID.String
457 doc.RepoName = repoName.String
458 doc.AuthorHandle = authorHandle.String
459 doc.TagsJSON = tagsJSON.String
460 doc.Language = language.String
461 doc.CreatedAt = createdAt.String
462 doc.UpdatedAt = updatedAt.String
463 doc.WebURL = webURL.String
464 doc.DeletedAt = deletedAt.String
465 return doc, nil
466}
467
// nullableStr converts s into a query argument. The empty string becomes a
// nil interface value, which database/sql binds as SQL NULL; any other string
// is passed through unchanged.
func nullableStr(s string) any {
	var arg any
	if s != "" {
		arg = s
	}
	return arg
}
474
// execer is the single-method view of a database handle that syncDocumentFTS
// writes through. *sql.DB, *sql.Tx and *sql.Conn all satisfy it, which lets
// UpsertDocument hand over its open transaction.
type execer interface {
	ExecContext(context.Context, string, ...any) (sql.Result, error)
}
478
479func syncDocumentFTS(ctx context.Context, db execer, doc *Document) error {
480 if _, err := db.ExecContext(ctx, `DELETE FROM documents_fts WHERE id = ?`, doc.ID); err != nil {
481 return fmt.Errorf("delete document from fts: %w", err)
482 }
483 if doc.DeletedAt != "" {
484 return nil
485 }
486 _, err := db.ExecContext(ctx, `
487 INSERT INTO documents_fts (id, title, body, summary, repo_name, author_handle, tags_json)
488 VALUES (?, ?, ?, ?, ?, ?, ?)`,
489 doc.ID, doc.Title, doc.Body, doc.Summary, doc.RepoName, doc.AuthorHandle, doc.TagsJSON,
490 )
491 if err != nil {
492 return fmt.Errorf("insert document into fts: %w", err)
493 }
494 return nil
495}