Search and/or read your saved and liked Bluesky posts.
wails
go
svelte
sqlite
desktop
bluesky
1package main
2
3import (
4 "database/sql"
5 "embed"
6 "fmt"
7 "os"
8 "path/filepath"
9 "strings"
10 "time"
11
12 _ "modernc.org/sqlite"
13)
14
// db is the package-level database handle. Open assigns it and Close closes
// it; every query helper in this file goes through it.
var db *sql.DB

// migrationsFS holds the SQL files under migrations/ that are embedded into
// the binary at build time.
//
//go:embed migrations/*.sql
var migrationsFS embed.FS
19
20// Open opens the database connection and runs migrations
21func Open(dbPath string) error {
22 LogInfof("opening database: %s", dbPath)
23
24 dir := filepath.Dir(dbPath)
25 if err := os.MkdirAll(dir, 0755); err != nil {
26 return wrapDBError("failed to create database directory", err)
27 }
28
29 var err error
30 db, err = sql.Open("sqlite", dbPath+"?_pragma=foreign_keys(1)")
31 if err != nil {
32 return wrapDBError("failed to open database", err)
33 }
34
35 if err := db.Ping(); err != nil {
36 return wrapDBError("failed to ping database", err)
37 }
38
39 _, err = db.Exec("PRAGMA journal_mode=WAL")
40 if err != nil {
41 return wrapDBError("failed to enable WAL mode", err)
42 }
43
44 LogInfo("database connection established with WAL mode")
45
46 if err := runMigrations(); err != nil {
47 return wrapDBError("failed to run migrations", err)
48 }
49
50 LogInfo("database migrations completed successfully")
51 return nil
52}
53
54func runMigrations() error {
55 content, err := migrationsFS.ReadFile("migrations/000_initial_schema.sql")
56 if err != nil {
57 return wrapDBError("failed to read migration", err)
58 }
59
60 if _, err := db.Exec(string(content)); err != nil {
61 return wrapDBError("failed to execute migration", err)
62 }
63
64 if err := ensureSchemaCompatibility(); err != nil {
65 return wrapDBError("failed to upgrade schema", err)
66 }
67
68 return nil
69}
70
71func ensureSchemaCompatibility() error {
72 columnsByTable := map[string][]struct {
73 name string
74 definition string
75 }{
76 "posts": {
77 {name: "facets", definition: "TEXT"},
78 },
79 "auth": {
80 {name: "session_id", definition: "TEXT"},
81 {name: "auth_server_url", definition: "TEXT"},
82 {name: "auth_server_token_endpoint", definition: "TEXT"},
83 {name: "auth_server_revocation_endpoint", definition: "TEXT"},
84 {name: "dpop_auth_nonce", definition: "TEXT"},
85 {name: "dpop_host_nonce", definition: "TEXT"},
86 {name: "dpop_private_key", definition: "TEXT"},
87 },
88 }
89
90 for table, columns := range columnsByTable {
91 exists, err := tableExists(table)
92 if err != nil {
93 return err
94 }
95 if !exists {
96 continue
97 }
98
99 for _, column := range columns {
100 hasColumn, err := columnExists(table, column.name)
101 if err != nil {
102 return err
103 }
104 if hasColumn {
105 continue
106 }
107
108 query := fmt.Sprintf("ALTER TABLE %s ADD COLUMN %s %s", table, column.name, column.definition)
109 if _, err := db.Exec(query); err != nil {
110 return wrapDBError("failed to add "+table+"."+column.name, err)
111 }
112 }
113 }
114
115 return nil
116}
117
118func tableExists(table string) (bool, error) {
119 var count int
120 if err := db.QueryRow("SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = ?", table).Scan(&count); err != nil {
121 return false, err
122 }
123 return count > 0, nil
124}
125
126func columnExists(table, column string) (bool, error) {
127 rows, err := db.Query("SELECT name FROM pragma_table_info(?)", table)
128 if err != nil {
129 return false, err
130 }
131 defer rows.Close()
132
133 for rows.Next() {
134 var name string
135
136 if err := rows.Scan(&name); err != nil {
137 return false, err
138 }
139
140 if name == column {
141 return true, nil
142 }
143 }
144
145 return false, rows.Err()
146}
147
148// Close closes the database connection
149func Close() error {
150 LogInfo("closing database connection")
151 if db != nil {
152 err := db.Close()
153 if err != nil {
154 LogErrorf("failed to close database: %v", err)
155 return err
156 }
157 LogInfo("database connection closed")
158 }
159 return nil
160}
161
162// PostExists checks if a post with the given URI already exists in the database
163func PostExists(uri string) (bool, error) {
164 var exists bool
165 err := db.QueryRow("SELECT EXISTS(SELECT 1 FROM posts WHERE uri = ?)", uri).Scan(&exists)
166 if err != nil {
167 return false, err
168 }
169 return exists, nil
170}
171
172// InsertPost inserts a post into the database
173func InsertPost(post *Post) error {
174 LogInfof("inserting post: %s by %s", post.URI, post.AuthorHandle)
175
176 exists, err := PostExists(post.URI)
177 if err != nil {
178 LogErrorf("failed to check if post exists: %s, error: %v", post.URI, err)
179 return err
180 }
181
182 if exists {
183 LogDebugf("skipping already indexed post: %s", post.URI)
184 return nil
185 }
186
187 query := `
188 INSERT INTO posts (uri, cid, author_did, author_handle, text, created_at, like_count, repost_count, reply_count, source, facets)
189 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
190 ON CONFLICT(uri) DO UPDATE SET
191 cid = excluded.cid,
192 author_did = excluded.author_did,
193 author_handle = excluded.author_handle,
194 text = excluded.text,
195 created_at = excluded.created_at,
196 like_count = excluded.like_count,
197 repost_count = excluded.repost_count,
198 reply_count = excluded.reply_count,
199 source = excluded.source,
200 facets = excluded.facets,
201 indexed_at = CURRENT_TIMESTAMP
202 `
203
204 _, err = db.Exec(query,
205 post.URI,
206 post.CID,
207 post.AuthorDID,
208 post.AuthorHandle,
209 post.Text,
210 post.CreatedAt,
211 post.LikeCount,
212 post.RepostCount,
213 post.ReplyCount,
214 post.Source,
215 post.Facets,
216 )
217
218 if err != nil {
219 LogErrorf("failed to insert post: %s, error: %v", post.URI, err)
220 }
221
222 return err
223}
224
225// UpsertAuth inserts or updates auth information
226func UpsertAuth(auth *Auth) error {
227 LogInfof("upserting auth: %s (%s)", auth.DID, auth.Handle)
228
229 query := `
230 INSERT INTO auth (did, handle, access_jwt, refresh_jwt, pds_url, session_id,
231 auth_server_url, auth_server_token_endpoint, auth_server_revocation_endpoint,
232 dpop_auth_nonce, dpop_host_nonce, dpop_private_key, updated_at)
233 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
234 ON CONFLICT(did) DO UPDATE SET
235 handle = excluded.handle,
236 access_jwt = excluded.access_jwt,
237 refresh_jwt = excluded.refresh_jwt,
238 pds_url = excluded.pds_url,
239 session_id = excluded.session_id,
240 auth_server_url = excluded.auth_server_url,
241 auth_server_token_endpoint = excluded.auth_server_token_endpoint,
242 auth_server_revocation_endpoint = excluded.auth_server_revocation_endpoint,
243 dpop_auth_nonce = excluded.dpop_auth_nonce,
244 dpop_host_nonce = excluded.dpop_host_nonce,
245 dpop_private_key = excluded.dpop_private_key,
246 updated_at = CURRENT_TIMESTAMP
247 `
248
249 _, err := db.Exec(query,
250 auth.DID,
251 auth.Handle,
252 auth.AccessJWT,
253 auth.RefreshJWT,
254 auth.PDSURL,
255 auth.SessionID,
256 auth.AuthServerURL,
257 auth.AuthServerTokenEndpoint,
258 auth.AuthServerRevocationEndpoint,
259 auth.DPoPAuthNonce,
260 auth.DPoPHostNonce,
261 auth.DPoPPrivateKey,
262 )
263
264 if err != nil {
265 LogErrorf("failed to upsert auth: %s, error: %v", auth.DID, err)
266 }
267
268 return err
269}
270
271// ClearAuth removes all persisted auth rows for this desktop client.
272func ClearAuth() error {
273 _, err := db.Exec("DELETE FROM auth")
274 return err
275}
276
277// GetAuth loads the auth record from the database
278func GetAuth() (*Auth, error) {
279 LogInfo("loading auth from database")
280
281 query := `SELECT did, handle, access_jwt, refresh_jwt, pds_url, session_id,
282 auth_server_url, auth_server_token_endpoint, auth_server_revocation_endpoint,
283 dpop_auth_nonce, dpop_host_nonce, dpop_private_key, updated_at
284 FROM auth
285 ORDER BY updated_at DESC
286 LIMIT 1`
287
288 auth, err := getAuthByQuery(query)
289
290 if err == sql.ErrNoRows {
291 LogInfo("no auth record found in database")
292 return nil, nil
293 }
294 if err != nil {
295 LogErrorf("failed to load auth: %v", err)
296 return nil, err
297 }
298
299 LogInfof("auth loaded successfully: %s (%s)", auth.DID, auth.Handle)
300 return auth, nil
301}
302
303// GetAuthByDID loads auth for a specific DID.
304func GetAuthByDID(did string) (*Auth, error) {
305 query := `SELECT did, handle, access_jwt, refresh_jwt, pds_url, session_id,
306 auth_server_url, auth_server_token_endpoint, auth_server_revocation_endpoint,
307 dpop_auth_nonce, dpop_host_nonce, dpop_private_key, updated_at
308 FROM auth
309 WHERE did = ?
310 LIMIT 1`
311
312 auth, err := getAuthByQuery(query, did)
313 if err == sql.ErrNoRows {
314 return nil, nil
315 }
316 if err != nil {
317 return nil, err
318 }
319 return auth, nil
320}
321
322func getAuthByQuery(query string, args ...any) (*Auth, error) {
323 var auth Auth
324 var updatedAt string
325
326 var sessionID, authServerURL, authServerTokenEndpoint, authServerRevocationEndpoint, dpopAuthNonce, dpopHostNonce, dpopPrivateKey sql.NullString
327
328 err := db.QueryRow(query, args...).Scan(
329 &auth.DID,
330 &auth.Handle,
331 &auth.AccessJWT,
332 &auth.RefreshJWT,
333 &auth.PDSURL,
334 &sessionID,
335 &authServerURL,
336 &authServerTokenEndpoint,
337 &authServerRevocationEndpoint,
338 &dpopAuthNonce,
339 &dpopHostNonce,
340 &dpopPrivateKey,
341 &updatedAt,
342 )
343 if err != nil {
344 return nil, err
345 }
346
347 if sessionID.Valid {
348 auth.SessionID = sessionID.String
349 }
350 if authServerURL.Valid {
351 auth.AuthServerURL = authServerURL.String
352 }
353 if authServerTokenEndpoint.Valid {
354 auth.AuthServerTokenEndpoint = authServerTokenEndpoint.String
355 }
356 if authServerRevocationEndpoint.Valid {
357 auth.AuthServerRevocationEndpoint = authServerRevocationEndpoint.String
358 }
359 if dpopAuthNonce.Valid {
360 auth.DPoPAuthNonce = dpopAuthNonce.String
361 }
362 if dpopHostNonce.Valid {
363 auth.DPoPHostNonce = dpopHostNonce.String
364 }
365 if dpopPrivateKey.Valid {
366 auth.DPoPPrivateKey = dpopPrivateKey.String
367 }
368
369 auth.UpdatedAt = parseStoredTime(updatedAt)
370 return &auth, nil
371}
372
373// SearchPosts searches posts using FTS5
374func SearchPosts(query string, source string) ([]SearchResult, error) {
375 query = strings.TrimSpace(query)
376 if query == "*" {
377 query = ""
378 }
379
380 LogInfof("searching posts: query=%s, source=%s", query, source)
381
382 if query == "" {
383 return listRecentPosts(source)
384 }
385
386 sqlQuery := `
387 SELECT p.uri, p.cid, p.author_did, p.author_handle, p.text, p.created_at,
388 p.like_count, p.repost_count, p.reply_count, p.source, p.indexed_at,
389 bm25(posts_fts, 5.0, 1.0) AS rank
390 FROM posts_fts
391 JOIN posts p ON posts_fts.rowid = p.rowid
392 WHERE posts_fts MATCH ?
393 AND (? = '' OR p.source = ?)
394 ORDER BY rank
395 LIMIT 25
396 `
397
398 rows, err := db.Query(sqlQuery, query, source, source)
399 if err != nil {
400 LogErrorf("failed to execute search query: %v", err)
401 return nil, err
402 }
403 defer rows.Close()
404
405 var results []SearchResult
406 for rows.Next() {
407 var r SearchResult
408 var createdAt, indexedAt string
409
410 err := rows.Scan(
411 &r.URI,
412 &r.CID,
413 &r.AuthorDID,
414 &r.AuthorHandle,
415 &r.Text,
416 &createdAt,
417 &r.LikeCount,
418 &r.RepostCount,
419 &r.ReplyCount,
420 &r.Source,
421 &indexedAt,
422 &r.Rank,
423 )
424 if err != nil {
425 return nil, err
426 }
427
428 r.CreatedAt = parseStoredTime(createdAt)
429 r.IndexedAt = parseStoredTime(indexedAt)
430 results = append(results, r)
431 }
432
433 LogInfof("search completed: %d results", len(results))
434 return results, rows.Err()
435}
436
437func listRecentPosts(source string) ([]SearchResult, error) {
438 rows, err := db.Query(`
439 SELECT uri, cid, author_did, author_handle, text, created_at,
440 like_count, repost_count, reply_count, source, indexed_at
441 FROM posts
442 WHERE (? = '' OR source = ?)
443 ORDER BY created_at DESC
444 LIMIT 25
445 `, source, source)
446 if err != nil {
447 LogErrorf("failed to list recent posts: %v", err)
448 return nil, err
449 }
450 defer rows.Close()
451
452 var results []SearchResult
453 for rows.Next() {
454 var r SearchResult
455 var createdAt, indexedAt string
456
457 err := rows.Scan(
458 &r.URI,
459 &r.CID,
460 &r.AuthorDID,
461 &r.AuthorHandle,
462 &r.Text,
463 &createdAt,
464 &r.LikeCount,
465 &r.RepostCount,
466 &r.ReplyCount,
467 &r.Source,
468 &indexedAt,
469 )
470 if err != nil {
471 return nil, err
472 }
473
474 r.CreatedAt = parseStoredTime(createdAt)
475 r.IndexedAt = parseStoredTime(indexedAt)
476 results = append(results, r)
477 }
478
479 LogInfof("browse completed: %d results", len(results))
480 return results, rows.Err()
481}
482
// parseStoredTime converts a timestamp read from the database as text into a
// time.Time.
//
// Surrounding whitespace is ignored. The value is tried against each known
// layout in turn and the first successful parse wins. Values without a zone
// are interpreted as UTC (time.Parse's default). An empty or unrecognised
// value yields the zero time.Time; no error is reported.
func parseStoredTime(value string) time.Time {
	value = strings.TrimSpace(value)
	if value == "" {
		return time.Time{}
	}

	// time.Parse accepts an optional fractional-seconds field after the
	// seconds even when the layout omits it, so the layouts without
	// ".999999999" still cover fractional input.
	layouts := []string{
		time.RFC3339Nano,
		time.RFC3339,
		"2006-01-02T15:04:05.999999999", // ISO 8601 with "T" but no zone
		"2006-01-02 15:04:05.999999999-07:00",
		"2006-01-02 15:04:05.999999999Z07:00",
		"2006-01-02 15:04:05.999999999",
		"2006-01-02 15:04:05 -0700 MST", // time.Time.String() style
		"2006-01-02 15:04:05",
		"2006-01-02", // date only
	}

	for _, layout := range layouts {
		if parsed, err := time.Parse(layout, value); err == nil {
			return parsed
		}
	}

	return time.Time{}
}
507
508// CountPosts returns the total number of posts in the database
509func CountPosts() (int, error) {
510 LogInfo("counting posts in database")
511
512 var count int
513 err := db.QueryRow("SELECT COUNT(*) FROM posts").Scan(&count)
514 if err != nil {
515 LogErrorf("failed to count posts: %v", err)
516 return 0, err
517 }
518
519 LogInfof("post count: %d", count)
520 return count, nil
521}
522
// wrapDBError returns an error that prefixes err with message while keeping
// err reachable through errors.Is / errors.As.
//
// The result is always non-nil, even when err is nil, so call it only on
// failure paths.
func wrapDBError(message string, err error) error {
	return &dbError{message: message, err: err}
}

// dbError is a database-layer error: a description of the step that failed
// plus the underlying cause.
type dbError struct {
	message string
	err     error
}

// Error returns "message: cause", or just the message when there is no
// underlying cause (previously a nil cause made this method panic).
func (e *dbError) Error() string {
	if e.err == nil {
		return e.message
	}
	return e.message + ": " + e.err.Error()
}

// Unwrap returns the underlying cause so the standard errors helpers can walk
// the chain.
func (e *dbError) Unwrap() error {
	return e.err
}