search and/or read your saved and liked bluesky posts
wails go svelte sqlite desktop bluesky
at main 538 lines 13 kB view raw
1package main 2 3import ( 4 "database/sql" 5 "embed" 6 "fmt" 7 "os" 8 "path/filepath" 9 "strings" 10 "time" 11 12 _ "modernc.org/sqlite" 13) 14 15var db *sql.DB 16 17//go:embed migrations/*.sql 18var migrationsFS embed.FS 19 20// Open opens the database connection and runs migrations 21func Open(dbPath string) error { 22 LogInfof("opening database: %s", dbPath) 23 24 dir := filepath.Dir(dbPath) 25 if err := os.MkdirAll(dir, 0755); err != nil { 26 return wrapDBError("failed to create database directory", err) 27 } 28 29 var err error 30 db, err = sql.Open("sqlite", dbPath+"?_pragma=foreign_keys(1)") 31 if err != nil { 32 return wrapDBError("failed to open database", err) 33 } 34 35 if err := db.Ping(); err != nil { 36 return wrapDBError("failed to ping database", err) 37 } 38 39 _, err = db.Exec("PRAGMA journal_mode=WAL") 40 if err != nil { 41 return wrapDBError("failed to enable WAL mode", err) 42 } 43 44 LogInfo("database connection established with WAL mode") 45 46 if err := runMigrations(); err != nil { 47 return wrapDBError("failed to run migrations", err) 48 } 49 50 LogInfo("database migrations completed successfully") 51 return nil 52} 53 54func runMigrations() error { 55 content, err := migrationsFS.ReadFile("migrations/000_initial_schema.sql") 56 if err != nil { 57 return wrapDBError("failed to read migration", err) 58 } 59 60 if _, err := db.Exec(string(content)); err != nil { 61 return wrapDBError("failed to execute migration", err) 62 } 63 64 if err := ensureSchemaCompatibility(); err != nil { 65 return wrapDBError("failed to upgrade schema", err) 66 } 67 68 return nil 69} 70 71func ensureSchemaCompatibility() error { 72 columnsByTable := map[string][]struct { 73 name string 74 definition string 75 }{ 76 "posts": { 77 {name: "facets", definition: "TEXT"}, 78 }, 79 "auth": { 80 {name: "session_id", definition: "TEXT"}, 81 {name: "auth_server_url", definition: "TEXT"}, 82 {name: "auth_server_token_endpoint", definition: "TEXT"}, 83 {name: "auth_server_revocation_endpoint", definition: "TEXT"}, 84 {name: "dpop_auth_nonce", definition: "TEXT"}, 85 {name: "dpop_host_nonce", definition: "TEXT"}, 86 {name: "dpop_private_key", definition: "TEXT"}, 87 }, 88 } 89 90 for table, columns := range columnsByTable { 91 exists, err := tableExists(table) 92 if err != nil { 93 return err 94 } 95 if !exists { 96 continue 97 } 98 99 for _, column := range columns { 100 hasColumn, err := columnExists(table, column.name) 101 if err != nil { 102 return err 103 } 104 if hasColumn { 105 continue 106 } 107 108 query := fmt.Sprintf("ALTER TABLE %s ADD COLUMN %s %s", table, column.name, column.definition) 109 if _, err := db.Exec(query); err != nil { 110 return wrapDBError("failed to add "+table+"."+column.name, err) 111 } 112 } 113 } 114 115 return nil 116} 117 118func tableExists(table string) (bool, error) { 119 var count int 120 if err := db.QueryRow("SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = ?", table).Scan(&count); err != nil { 121 return false, err 122 } 123 return count > 0, nil 124} 125 126func columnExists(table, column string) (bool, error) { 127 rows, err := db.Query("SELECT name FROM pragma_table_info(?)", table) 128 if err != nil { 129 return false, err 130 } 131 defer rows.Close() 132 133 for rows.Next() { 134 var name string 135 136 if err := rows.Scan(&name); err != nil { 137 return false, err 138 } 139 140 if name == column { 141 return true, nil 142 } 143 } 144 145 return false, rows.Err() 146} 147 148// Close closes the database connection 149func Close() error { 150 LogInfo("closing database connection") 151 if db != nil { 152 err := db.Close() 153 if err != nil { 154 LogErrorf("failed to close database: %v", err) 155 return err 156 } 157 LogInfo("database connection closed") 158 } 159 return nil 160} 161 162// PostExists checks if a post with the given URI already exists in the database 163func PostExists(uri string) (bool, error) { 164 var exists bool 165 err := db.QueryRow("SELECT EXISTS(SELECT 1 FROM posts WHERE uri = ?)", uri).Scan(&exists) 166 if err != nil { 167 return false, err 168 } 169 return exists, nil 170} 171 172// InsertPost inserts a post into the database 173func InsertPost(post *Post) error { 174 LogInfof("inserting post: %s by %s", post.URI, post.AuthorHandle) 175 176 exists, err := PostExists(post.URI) 177 if err != nil { 178 LogErrorf("failed to check if post exists: %s, error: %v", post.URI, err) 179 return err 180 } 181 182 if exists { 183 LogDebugf("skipping already indexed post: %s", post.URI) 184 return nil 185 } 186 187 query := ` 188 INSERT INTO posts (uri, cid, author_did, author_handle, text, created_at, like_count, repost_count, reply_count, source, facets) 189 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 190 ON CONFLICT(uri) DO UPDATE SET 191 cid = excluded.cid, 192 author_did = excluded.author_did, 193 author_handle = excluded.author_handle, 194 text = excluded.text, 195 created_at = excluded.created_at, 196 like_count = excluded.like_count, 197 repost_count = excluded.repost_count, 198 reply_count = excluded.reply_count, 199 source = excluded.source, 200 facets = excluded.facets, 201 indexed_at = CURRENT_TIMESTAMP 202 ` 203 204 _, err = db.Exec(query, 205 post.URI, 206 post.CID, 207 post.AuthorDID, 208 post.AuthorHandle, 209 post.Text, 210 post.CreatedAt, 211 post.LikeCount, 212 post.RepostCount, 213 post.ReplyCount, 214 post.Source, 215 post.Facets, 216 ) 217 218 if err != nil { 219 LogErrorf("failed to insert post: %s, error: %v", post.URI, err) 220 } 221 222 return err 223} 224 225// UpsertAuth inserts or updates auth information 226func UpsertAuth(auth *Auth) error { 227 LogInfof("upserting auth: %s (%s)", auth.DID, auth.Handle) 228 229 query := ` 230 INSERT INTO auth (did, handle, access_jwt, refresh_jwt, pds_url, session_id, 231 auth_server_url, auth_server_token_endpoint, auth_server_revocation_endpoint, 232 dpop_auth_nonce, dpop_host_nonce, dpop_private_key, updated_at) 233 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) 234 ON CONFLICT(did) DO UPDATE SET 235 handle = excluded.handle, 236 access_jwt = excluded.access_jwt, 237 refresh_jwt = excluded.refresh_jwt, 238 pds_url = excluded.pds_url, 239 session_id = excluded.session_id, 240 auth_server_url = excluded.auth_server_url, 241 auth_server_token_endpoint = excluded.auth_server_token_endpoint, 242 auth_server_revocation_endpoint = excluded.auth_server_revocation_endpoint, 243 dpop_auth_nonce = excluded.dpop_auth_nonce, 244 dpop_host_nonce = excluded.dpop_host_nonce, 245 dpop_private_key = excluded.dpop_private_key, 246 updated_at = CURRENT_TIMESTAMP 247 ` 248 249 _, err := db.Exec(query, 250 auth.DID, 251 auth.Handle, 252 auth.AccessJWT, 253 auth.RefreshJWT, 254 auth.PDSURL, 255 auth.SessionID, 256 auth.AuthServerURL, 257 auth.AuthServerTokenEndpoint, 258 auth.AuthServerRevocationEndpoint, 259 auth.DPoPAuthNonce, 260 auth.DPoPHostNonce, 261 auth.DPoPPrivateKey, 262 ) 263 264 if err != nil { 265 LogErrorf("failed to upsert auth: %s, error: %v", auth.DID, err) 266 } 267 268 return err 269} 270 271// ClearAuth removes all persisted auth rows for this desktop client. 272func ClearAuth() error { 273 _, err := db.Exec("DELETE FROM auth") 274 return err 275} 276 277// GetAuth loads the auth record from the database 278func GetAuth() (*Auth, error) { 279 LogInfo("loading auth from database") 280 281 query := `SELECT did, handle, access_jwt, refresh_jwt, pds_url, session_id, 282 auth_server_url, auth_server_token_endpoint, auth_server_revocation_endpoint, 283 dpop_auth_nonce, dpop_host_nonce, dpop_private_key, updated_at 284 FROM auth 285 ORDER BY updated_at DESC 286 LIMIT 1` 287 288 auth, err := getAuthByQuery(query) 289 290 if err == sql.ErrNoRows { 291 LogInfo("no auth record found in database") 292 return nil, nil 293 } 294 if err != nil { 295 LogErrorf("failed to load auth: %v", err) 296 return nil, err 297 } 298 299 LogInfof("auth loaded successfully: %s (%s)", auth.DID, auth.Handle) 300 return auth, nil 301} 302 303// GetAuthByDID loads auth for a specific DID. 304func GetAuthByDID(did string) (*Auth, error) { 305 query := `SELECT did, handle, access_jwt, refresh_jwt, pds_url, session_id, 306 auth_server_url, auth_server_token_endpoint, auth_server_revocation_endpoint, 307 dpop_auth_nonce, dpop_host_nonce, dpop_private_key, updated_at 308 FROM auth 309 WHERE did = ? 310 LIMIT 1` 311 312 auth, err := getAuthByQuery(query, did) 313 if err == sql.ErrNoRows { 314 return nil, nil 315 } 316 if err != nil { 317 return nil, err 318 } 319 return auth, nil 320} 321 322func getAuthByQuery(query string, args ...any) (*Auth, error) { 323 var auth Auth 324 var updatedAt string 325 326 var sessionID, authServerURL, authServerTokenEndpoint, authServerRevocationEndpoint, dpopAuthNonce, dpopHostNonce, dpopPrivateKey sql.NullString 327 328 err := db.QueryRow(query, args...).Scan( 329 &auth.DID, 330 &auth.Handle, 331 &auth.AccessJWT, 332 &auth.RefreshJWT, 333 &auth.PDSURL, 334 &sessionID, 335 &authServerURL, 336 &authServerTokenEndpoint, 337 &authServerRevocationEndpoint, 338 &dpopAuthNonce, 339 &dpopHostNonce, 340 &dpopPrivateKey, 341 &updatedAt, 342 ) 343 if err != nil { 344 return nil, err 345 } 346 347 if sessionID.Valid { 348 auth.SessionID = sessionID.String 349 } 350 if authServerURL.Valid { 351 auth.AuthServerURL = authServerURL.String 352 } 353 if authServerTokenEndpoint.Valid { 354 auth.AuthServerTokenEndpoint = authServerTokenEndpoint.String 355 } 356 if authServerRevocationEndpoint.Valid { 357 auth.AuthServerRevocationEndpoint = authServerRevocationEndpoint.String 358 } 359 if dpopAuthNonce.Valid { 360 auth.DPoPAuthNonce = dpopAuthNonce.String 361 } 362 if dpopHostNonce.Valid { 363 auth.DPoPHostNonce = dpopHostNonce.String 364 } 365 if dpopPrivateKey.Valid { 366 auth.DPoPPrivateKey = dpopPrivateKey.String 367 } 368 369 auth.UpdatedAt = parseStoredTime(updatedAt) 370 return &auth, nil 371} 372 373// SearchPosts searches posts using FTS5 374func SearchPosts(query string, source string) ([]SearchResult, error) { 375 query = strings.TrimSpace(query) 376 if query == "*" { 377 query = "" 378 } 379 380 LogInfof("searching posts: query=%s, source=%s", query, source) 381 382 if query == "" { 383 return listRecentPosts(source) 384 } 385 386 sqlQuery := ` 387 SELECT p.uri, p.cid, p.author_did, p.author_handle, p.text, p.created_at, 388 p.like_count, p.repost_count, p.reply_count, p.source, p.indexed_at, 389 bm25(posts_fts, 5.0, 1.0) AS rank 390 FROM posts_fts 391 JOIN posts p ON posts_fts.rowid = p.rowid 392 WHERE posts_fts MATCH ? 393 AND (? = '' OR p.source = ?) 394 ORDER BY rank 395 LIMIT 25 396 ` 397 398 rows, err := db.Query(sqlQuery, query, source, source) 399 if err != nil { 400 LogErrorf("failed to execute search query: %v", err) 401 return nil, err 402 } 403 defer rows.Close() 404 405 var results []SearchResult 406 for rows.Next() { 407 var r SearchResult 408 var createdAt, indexedAt string 409 410 err := rows.Scan( 411 &r.URI, 412 &r.CID, 413 &r.AuthorDID, 414 &r.AuthorHandle, 415 &r.Text, 416 &createdAt, 417 &r.LikeCount, 418 &r.RepostCount, 419 &r.ReplyCount, 420 &r.Source, 421 &indexedAt, 422 &r.Rank, 423 ) 424 if err != nil { 425 return nil, err 426 } 427 428 r.CreatedAt = parseStoredTime(createdAt) 429 r.IndexedAt = parseStoredTime(indexedAt) 430 results = append(results, r) 431 } 432 433 LogInfof("search completed: %d results", len(results)) 434 return results, rows.Err() 435} 436 437func listRecentPosts(source string) ([]SearchResult, error) { 438 rows, err := db.Query(` 439 SELECT uri, cid, author_did, author_handle, text, created_at, 440 like_count, repost_count, reply_count, source, indexed_at 441 FROM posts 442 WHERE (? = '' OR source = ?) 443 ORDER BY created_at DESC 444 LIMIT 25 445 `, source, source) 446 if err != nil { 447 LogErrorf("failed to list recent posts: %v", err) 448 return nil, err 449 } 450 defer rows.Close() 451 452 var results []SearchResult 453 for rows.Next() { 454 var r SearchResult 455 var createdAt, indexedAt string 456 457 err := rows.Scan( 458 &r.URI, 459 &r.CID, 460 &r.AuthorDID, 461 &r.AuthorHandle, 462 &r.Text, 463 &createdAt, 464 &r.LikeCount, 465 &r.RepostCount, 466 &r.ReplyCount, 467 &r.Source, 468 &indexedAt, 469 ) 470 if err != nil { 471 return nil, err 472 } 473 474 r.CreatedAt = parseStoredTime(createdAt) 475 r.IndexedAt = parseStoredTime(indexedAt) 476 results = append(results, r) 477 } 478 479 LogInfof("browse completed: %d results", len(results)) 480 return results, rows.Err() 481} 482 483func parseStoredTime(value string) time.Time { 484 if value == "" { 485 return time.Time{} 486 } 487 488 layouts := []string{ 489 time.RFC3339Nano, 490 time.RFC3339, 491 "2006-01-02 15:04:05.999999999-07:00", 492 "2006-01-02 15:04:05.999999999Z07:00", 493 "2006-01-02 15:04:05.999999999", 494 "2006-01-02 15:04:05 -0700 MST", 495 "2006-01-02 15:04:05", 496 } 497 498 for _, layout := range layouts { 499 parsed, err := time.Parse(layout, value) 500 if err == nil { 501 return parsed 502 } 503 } 504 505 return time.Time{} 506} 507 508// CountPosts returns the total number of posts in the database 509func CountPosts() (int, error) { 510 LogInfo("counting posts in database") 511 512 var count int 513 err := db.QueryRow("SELECT COUNT(*) FROM posts").Scan(&count) 514 if err != nil { 515 LogErrorf("failed to count posts: %v", err) 516 return 0, err 517 } 518 519 LogInfof("post count: %d", count) 520 return count, nil 521} 522 523func wrapDBError(message string, err error) error { 524 return &dbError{message: message, err: err} 525} 526 527type dbError struct { 528 message string 529 err error 530} 531 532func (e *dbError) Error() string { 533 return e.message + ": " + e.err.Error() 534} 535 536func (e *dbError) Unwrap() error { 537 return e.err 538}