a love letter to tangled (android, iOS, and a search API)
at main 495 lines 15 kB view raw
1package store 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "time" 9) 10 11// SQLiteStore implements Store against a SQLite database. 12type SQLiteStore struct { 13 db *sql.DB 14} 15 16// New wraps an open *sql.DB in a Store implementation. 17func New(url string, db *sql.DB) Store { 18 if DetectBackend(url) == BackendPostgres { 19 return &PostgresStore{db: db} 20 } 21 return &SQLiteStore{db: db} 22} 23 24func (s *SQLiteStore) UpsertDocument(ctx context.Context, doc *Document) error { 25 doc.IndexedAt = time.Now().UTC().Format(time.RFC3339) 26 tx, err := s.db.BeginTx(ctx, nil) 27 if err != nil { 28 return fmt.Errorf("begin upsert document tx: %w", err) 29 } 30 defer tx.Rollback() 31 32 _, err = tx.ExecContext(ctx, ` 33 INSERT INTO documents ( 34 id, did, collection, rkey, at_uri, cid, record_type, 35 title, body, summary, repo_did, repo_name, author_handle, 36 tags_json, language, created_at, updated_at, indexed_at, web_url, deleted_at 37 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 38 ON CONFLICT(id) DO UPDATE SET 39 did = excluded.did, 40 collection = excluded.collection, 41 rkey = excluded.rkey, 42 at_uri = excluded.at_uri, 43 cid = excluded.cid, 44 record_type = excluded.record_type, 45 title = excluded.title, 46 body = excluded.body, 47 summary = excluded.summary, 48 repo_did = excluded.repo_did, 49 repo_name = excluded.repo_name, 50 author_handle = excluded.author_handle, 51 tags_json = excluded.tags_json, 52 language = excluded.language, 53 created_at = excluded.created_at, 54 updated_at = excluded.updated_at, 55 indexed_at = excluded.indexed_at, 56 web_url = excluded.web_url, 57 deleted_at = excluded.deleted_at`, 58 doc.ID, doc.DID, doc.Collection, doc.RKey, doc.ATURI, doc.CID, doc.RecordType, 59 doc.Title, doc.Body, doc.Summary, doc.RepoDID, doc.RepoName, doc.AuthorHandle, 60 doc.TagsJSON, doc.Language, doc.CreatedAt, doc.UpdatedAt, doc.IndexedAt, doc.WebURL, nullableStr(doc.DeletedAt), 61 ) 62 if err != nil { 63 return fmt.Errorf("upsert document: %w", err) 64 } 65 if err := syncDocumentFTS(ctx, tx, doc); err != nil { 66 return err 67 } 68 if err := tx.Commit(); err != nil { 69 return fmt.Errorf("commit upsert document tx: %w", err) 70 } 71 return nil 72} 73 74func (s *SQLiteStore) ListDocuments(ctx context.Context, filter DocumentFilter) ([]*Document, error) { 75 query := `SELECT id, did, collection, rkey, at_uri, cid, record_type, 76 title, body, summary, repo_did, repo_name, author_handle, 77 tags_json, language, created_at, updated_at, indexed_at, web_url, deleted_at 78 FROM documents WHERE deleted_at IS NULL` 79 args := []any{} 80 81 if filter.DocumentID != "" { 82 query += " AND id = ?" 83 args = append(args, filter.DocumentID) 84 } 85 if filter.Collection != "" { 86 query += " AND collection = ?" 87 args = append(args, filter.Collection) 88 } 89 if filter.DID != "" { 90 query += " AND did = ?" 91 args = append(args, filter.DID) 92 } 93 94 rows, err := s.db.QueryContext(ctx, query, args...) 95 if err != nil { 96 return nil, fmt.Errorf("list documents: %w", err) 97 } 98 defer rows.Close() 99 100 var docs []*Document 101 for rows.Next() { 102 doc := &Document{} 103 var ( 104 title, body, summary, repoDID, repoName, authorHandle sql.NullString 105 tagsJSON, language, createdAt, updatedAt, webURL, deletedAt sql.NullString 106 ) 107 if err := rows.Scan( 108 &doc.ID, &doc.DID, &doc.Collection, &doc.RKey, &doc.ATURI, &doc.CID, &doc.RecordType, 109 &title, &body, &summary, &repoDID, &repoName, &authorHandle, 110 &tagsJSON, &language, &createdAt, &updatedAt, &doc.IndexedAt, &webURL, &deletedAt, 111 ); err != nil { 112 return nil, fmt.Errorf("scan document: %w", err) 113 } 114 doc.Title = title.String 115 doc.Body = body.String 116 doc.Summary = summary.String 117 doc.RepoDID = repoDID.String 118 doc.RepoName = repoName.String 119 doc.AuthorHandle = authorHandle.String 120 doc.TagsJSON = tagsJSON.String 121 doc.Language = language.String 122 doc.CreatedAt = createdAt.String 123 doc.UpdatedAt = updatedAt.String 124 doc.WebURL = webURL.String 125 doc.DeletedAt = deletedAt.String 126 docs = append(docs, doc) 127 } 128 if err := rows.Err(); err != nil { 129 return nil, fmt.Errorf("iterate documents: %w", err) 130 } 131 return docs, nil 132} 133 134func (s *SQLiteStore) OptimizeSearchIndex(ctx context.Context) error { 135 _, err := s.db.ExecContext(ctx, `INSERT INTO documents_fts(documents_fts) VALUES('optimize')`) 136 if err != nil { 137 return fmt.Errorf("optimize search index: %w", err) 138 } 139 return nil 140} 141 142func (s *SQLiteStore) GetDocument(ctx context.Context, id string) (*Document, error) { 143 row := s.db.QueryRowContext(ctx, ` 144 SELECT id, did, collection, rkey, at_uri, cid, record_type, 145 title, body, summary, repo_did, repo_name, author_handle, 146 tags_json, language, created_at, updated_at, indexed_at, web_url, deleted_at 147 FROM documents WHERE id = ?`, id) 148 149 doc, err := scanDocument(row) 150 if errors.Is(err, sql.ErrNoRows) { 151 return nil, nil 152 } 153 if err != nil { 154 return nil, fmt.Errorf("get document: %w", err) 155 } 156 return doc, nil 157} 158 159func (s *SQLiteStore) MarkDeleted(ctx context.Context, id string) error { 160 now := time.Now().UTC().Format(time.RFC3339) 161 tx, err := s.db.BeginTx(ctx, nil) 162 if err != nil { 163 return fmt.Errorf("begin mark deleted tx: %w", err) 164 } 165 defer tx.Rollback() 166 167 _, err = tx.ExecContext(ctx, 168 `UPDATE documents SET deleted_at = ? WHERE id = ?`, now, id) 169 if err != nil { 170 return fmt.Errorf("mark deleted: %w", err) 171 } 172 if _, err := tx.ExecContext(ctx, `DELETE FROM documents_fts WHERE id = ?`, id); err != nil { 173 return fmt.Errorf("delete document from fts: %w", err) 174 } 175 if err := tx.Commit(); err != nil { 176 return fmt.Errorf("commit mark deleted tx: %w", err) 177 } 178 return nil 179} 180 181func (s *SQLiteStore) GetSyncState(ctx context.Context, consumer string) (*SyncState, error) { 182 row := s.db.QueryRowContext(ctx, ` 183 SELECT consumer_name, cursor, high_water_mark, updated_at 184 FROM sync_state WHERE consumer_name = ?`, consumer) 185 186 ss := &SyncState{} 187 var hwm sql.NullString 188 err := row.Scan(&ss.ConsumerName, &ss.Cursor, &hwm, &ss.UpdatedAt) 189 if errors.Is(err, sql.ErrNoRows) { 190 return nil, nil 191 } 192 if err != nil { 193 return nil, fmt.Errorf("get sync state: %w", err) 194 } 195 ss.HighWaterMark = hwm.String 196 return ss, nil 197} 198 199func (s *SQLiteStore) SetSyncState(ctx context.Context, consumer string, cursor string) error { 200 now := time.Now().UTC().Format(time.RFC3339) 201 _, err := s.db.ExecContext(ctx, ` 202 INSERT INTO sync_state (consumer_name, cursor, updated_at) VALUES (?, ?, ?) 203 ON CONFLICT(consumer_name) DO UPDATE SET 204 cursor = excluded.cursor, 205 updated_at = excluded.updated_at`, 206 consumer, cursor, now, 207 ) 208 if err != nil { 209 return fmt.Errorf("set sync state: %w", err) 210 } 211 return nil 212} 213 214func (s *SQLiteStore) UpdateRecordState(ctx context.Context, subjectURI string, state string) error { 215 now := time.Now().UTC().Format(time.RFC3339) 216 _, err := s.db.ExecContext(ctx, ` 217 INSERT INTO record_state (subject_uri, state, updated_at) VALUES (?, ?, ?) 218 ON CONFLICT(subject_uri) DO UPDATE SET 219 state = excluded.state, 220 updated_at = excluded.updated_at`, 221 subjectURI, state, now, 222 ) 223 if err != nil { 224 return fmt.Errorf("update record state: %w", err) 225 } 226 return nil 227} 228 229func (s *SQLiteStore) UpsertIdentityHandle(ctx context.Context, did, handle string, isActive bool, status string) error { 230 now := time.Now().UTC().Format(time.RFC3339) 231 _, err := s.db.ExecContext(ctx, ` 232 INSERT INTO identity_handles (did, handle, is_active, status, updated_at) 233 VALUES (?, ?, ?, ?, ?) 234 ON CONFLICT(did) DO UPDATE SET 235 handle = excluded.handle, 236 is_active = excluded.is_active, 237 status = excluded.status, 238 updated_at = excluded.updated_at`, 239 did, handle, isActive, status, now, 240 ) 241 if err != nil { 242 return fmt.Errorf("upsert identity handle: %w", err) 243 } 244 return nil 245} 246 247func (s *SQLiteStore) GetIdentityHandle(ctx context.Context, did string) (string, error) { 248 var handle sql.NullString 249 err := s.db.QueryRowContext(ctx, `SELECT handle FROM identity_handles WHERE did = ?`, did).Scan(&handle) 250 if errors.Is(err, sql.ErrNoRows) { 251 return "", nil 252 } 253 if err != nil { 254 return "", fmt.Errorf("get identity handle: %w", err) 255 } 256 return handle.String, nil 257} 258 259func (s *SQLiteStore) GetFollowSubjects(ctx context.Context, did string) ([]string, error) { 260 rows, err := s.db.QueryContext(ctx, ` 261 SELECT DISTINCT repo_did 262 FROM documents 263 WHERE did = ? 264 AND collection = 'sh.tangled.graph.follow' 265 AND deleted_at IS NULL 266 AND repo_did IS NOT NULL 267 AND repo_did != ''`, 268 did, 269 ) 270 if err != nil { 271 return nil, fmt.Errorf("get follow subjects: %w", err) 272 } 273 defer rows.Close() 274 275 var subjects []string 276 for rows.Next() { 277 var subject string 278 if err := rows.Scan(&subject); err != nil { 279 return nil, fmt.Errorf("scan follow subject: %w", err) 280 } 281 subjects = append(subjects, subject) 282 } 283 if err := rows.Err(); err != nil { 284 return nil, fmt.Errorf("iterate follow subjects: %w", err) 285 } 286 return subjects, nil 287} 288 289func (s *SQLiteStore) GetRepoCollaborators(ctx context.Context, repoOwnerDID string) ([]string, error) { 290 rows, err := s.db.QueryContext(ctx, ` 291 SELECT DISTINCT did 292 FROM documents 293 WHERE repo_did = ? 294 AND did != ? 295 AND deleted_at IS NULL 296 AND collection IN ( 297 'sh.tangled.repo.issue', 298 'sh.tangled.repo.pull', 299 'sh.tangled.repo.issue.comment', 300 'sh.tangled.repo.pull.comment' 301 )`, 302 repoOwnerDID, repoOwnerDID, 303 ) 304 if err != nil { 305 return nil, fmt.Errorf("get repo collaborators: %w", err) 306 } 307 defer rows.Close() 308 309 var collaborators []string 310 for rows.Next() { 311 var collaborator string 312 if err := rows.Scan(&collaborator); err != nil { 313 return nil, fmt.Errorf("scan collaborator: %w", err) 314 } 315 collaborators = append(collaborators, collaborator) 316 } 317 if err := rows.Err(); err != nil { 318 return nil, fmt.Errorf("iterate collaborators: %w", err) 319 } 320 return collaborators, nil 321} 322 323func (s *SQLiteStore) CountDocuments(ctx context.Context) (int64, error) { 324 var n int64 325 err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM documents WHERE deleted_at IS NULL`).Scan(&n) 326 if err != nil { 327 return 0, fmt.Errorf("count documents: %w", err) 328 } 329 return n, nil 330} 331 332func (s *SQLiteStore) CountPendingIndexingJobs(ctx context.Context) (int64, error) { 333 var n int64 334 err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM indexing_jobs WHERE status = 'pending'`).Scan(&n) 335 if err != nil { 336 return 0, fmt.Errorf("count pending indexing jobs: %w", err) 337 } 338 return n, nil 339} 340 341func (s *SQLiteStore) InsertJetstreamEvent(ctx context.Context, event *JetstreamEvent, maxEvents int) error { 342 tx, err := s.db.BeginTx(ctx, nil) 343 if err != nil { 344 return fmt.Errorf("begin insert jetstream event tx: %w", err) 345 } 346 defer tx.Rollback() 347 348 _, err = tx.ExecContext(ctx, ` 349 INSERT INTO jetstream_events (time_us, did, kind, collection, rkey, operation, payload, received_at) 350 VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, 351 event.TimeUS, event.DID, event.Kind, 352 nullableStr(event.Collection), nullableStr(event.RKey), nullableStr(event.Operation), 353 event.Payload, event.ReceivedAt, 354 ) 355 if err != nil { 356 return fmt.Errorf("insert jetstream event: %w", err) 357 } 358 359 if maxEvents > 0 { 360 _, err = tx.ExecContext(ctx, ` 361 DELETE FROM jetstream_events 362 WHERE id NOT IN ( 363 SELECT id FROM jetstream_events ORDER BY time_us DESC LIMIT ? 364 )`, maxEvents) 365 if err != nil { 366 return fmt.Errorf("trim jetstream events: %w", err) 367 } 368 } 369 370 if err := tx.Commit(); err != nil { 371 return fmt.Errorf("commit insert jetstream event tx: %w", err) 372 } 373 return nil 374} 375 376func (s *SQLiteStore) ListJetstreamEvents(ctx context.Context, filter JetstreamEventFilter) ([]*JetstreamEvent, error) { 377 query := ` 378 SELECT id, time_us, did, kind, 379 COALESCE(collection, ''), COALESCE(rkey, ''), COALESCE(operation, ''), 380 payload, received_at 381 FROM jetstream_events WHERE 1=1` 382 args := []any{} 383 384 if filter.Collection != "" { 385 query += " AND collection = ?" 386 args = append(args, filter.Collection) 387 } 388 if filter.DID != "" { 389 query += " AND did = ?" 390 args = append(args, filter.DID) 391 } 392 if filter.Operation != "" { 393 query += " AND operation = ?" 394 args = append(args, filter.Operation) 395 } 396 397 query += " ORDER BY time_us DESC" 398 399 limit := filter.Limit 400 if limit <= 0 { 401 limit = 50 402 } 403 query += " LIMIT ?" 404 args = append(args, limit) 405 406 if filter.Offset > 0 { 407 query += " OFFSET ?" 408 args = append(args, filter.Offset) 409 } 410 411 rows, err := s.db.QueryContext(ctx, query, args...) 412 if err != nil { 413 return nil, fmt.Errorf("list jetstream events: %w", err) 414 } 415 defer rows.Close() 416 417 var events []*JetstreamEvent 418 for rows.Next() { 419 e := &JetstreamEvent{} 420 if err := rows.Scan( 421 &e.ID, &e.TimeUS, &e.DID, &e.Kind, 422 &e.Collection, &e.RKey, &e.Operation, 423 &e.Payload, &e.ReceivedAt, 424 ); err != nil { 425 return nil, fmt.Errorf("scan jetstream event: %w", err) 426 } 427 events = append(events, e) 428 } 429 if err := rows.Err(); err != nil { 430 return nil, fmt.Errorf("iterate jetstream events: %w", err) 431 } 432 return events, nil 433} 434 435func (s *SQLiteStore) Ping(ctx context.Context) error { 436 return s.db.PingContext(ctx) 437} 438 439func scanDocument(row *sql.Row) (*Document, error) { 440 doc := &Document{} 441 var ( 442 title, body, summary, repoDID, repoName, authorHandle sql.NullString 443 tagsJSON, language, createdAt, updatedAt, webURL, deletedAt sql.NullString 444 ) 445 err := row.Scan( 446 &doc.ID, &doc.DID, &doc.Collection, &doc.RKey, &doc.ATURI, &doc.CID, &doc.RecordType, 447 &title, &body, &summary, &repoDID, &repoName, &authorHandle, 448 &tagsJSON, &language, &createdAt, &updatedAt, &doc.IndexedAt, &webURL, &deletedAt, 449 ) 450 if err != nil { 451 return nil, err 452 } 453 doc.Title = title.String 454 doc.Body = body.String 455 doc.Summary = summary.String 456 doc.RepoDID = repoDID.String 457 doc.RepoName = repoName.String 458 doc.AuthorHandle = authorHandle.String 459 doc.TagsJSON = tagsJSON.String 460 doc.Language = language.String 461 doc.CreatedAt = createdAt.String 462 doc.UpdatedAt = updatedAt.String 463 doc.WebURL = webURL.String 464 doc.DeletedAt = deletedAt.String 465 return doc, nil 466} 467 468func nullableStr(s string) any { 469 if s == "" { 470 return nil 471 } 472 return s 473} 474 475type execer interface { 476 ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) 477} 478 479func syncDocumentFTS(ctx context.Context, db execer, doc *Document) error { 480 if _, err := db.ExecContext(ctx, `DELETE FROM documents_fts WHERE id = ?`, doc.ID); err != nil { 481 return fmt.Errorf("delete document from fts: %w", err) 482 } 483 if doc.DeletedAt != "" { 484 return nil 485 } 486 _, err := db.ExecContext(ctx, ` 487 INSERT INTO documents_fts (id, title, body, summary, repo_name, author_handle, tags_json) 488 VALUES (?, ?, ?, ?, ?, ?, ?)`, 489 doc.ID, doc.Title, doc.Body, doc.Summary, doc.RepoName, doc.AuthorHandle, doc.TagsJSON, 490 ) 491 if err != nil { 492 return fmt.Errorf("insert document into fts: %w", err) 493 } 494 return nil 495}