Monorepo for Tangled — https://tangled.org
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 212 lines 5.8 kB view raw
1package db 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "log/slog" 8 "os" 9 "strings" 10 11 securejoin "github.com/cyphar/filepath-securejoin" 12 _ "github.com/mattn/go-sqlite3" 13 "tangled.org/core/log" 14 "tangled.org/core/orm" 15) 16 17type DB struct { 18 db *sql.DB 19 logger *slog.Logger 20} 21 22func Setup(ctx context.Context, dbPath string) (*DB, error) { 23 // https://github.com/mattn/go-sqlite3#connection-string 24 opts := []string{ 25 "_foreign_keys=1", 26 "_journal_mode=WAL", 27 "_synchronous=NORMAL", 28 "_auto_vacuum=incremental", 29 "_busy_timeout=5000", 30 } 31 32 logger := log.FromContext(ctx) 33 logger = log.SubLogger(logger, "db") 34 35 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&")) 36 if err != nil { 37 return nil, err 38 } 39 40 conn, err := db.Conn(ctx) 41 if err != nil { 42 return nil, err 43 } 44 defer conn.Close() 45 46 _, err = conn.ExecContext(ctx, ` 47 create table if not exists known_dids ( 48 did text primary key 49 ); 50 51 create table if not exists public_keys ( 52 id integer primary key autoincrement, 53 did text not null, 54 key text not null, 55 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 56 unique(did, key), 57 foreign key (did) references known_dids(did) on delete cascade 58 ); 59 60 create table if not exists _jetstream ( 61 id integer primary key autoincrement, 62 last_time_us integer not null 63 ); 64 65 create table if not exists events ( 66 rkey text not null, 67 nsid text not null, 68 event text not null, -- json 69 created integer not null default (strftime('%s', 'now')), 70 primary key (rkey, nsid) 71 ); 72 73 create table if not exists repo_keys ( 74 repo_did text primary key, 75 signing_key blob not null, 76 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) 77 ); 78 79 create table if not exists migrations ( 80 id integer primary key autoincrement, 81 name text unique 82 ); 83 `) 84 if err != nil { 85 return nil, err 86 } 87 88 if err := orm.RunMigration(conn, logger, "add-owner-did-to-repo-keys", func(tx *sql.Tx) error { 89 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN owner_did TEXT`) 90 return mErr 91 }); err != nil { 92 return nil, err 93 } 94 95 if err := orm.RunMigration(conn, logger, "add-repo-name-to-repo-keys", func(tx *sql.Tx) error { 96 _, mErr := tx.ExecContext(ctx, `ALTER TABLE repo_keys ADD COLUMN repo_name TEXT`) 97 return mErr 98 }); err != nil { 99 return nil, err 100 } 101 102 if err := orm.RunMigration(conn, logger, "add-unique-owner-repo-on-repo-keys", func(tx *sql.Tx) error { 103 _, mErr := tx.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_repo_keys_owner_repo ON repo_keys(owner_did, repo_name)`) 104 return mErr 105 }); err != nil { 106 return nil, err 107 } 108 109 if err := orm.RunMigration(conn, logger, "add-key-type-and-nullable-signing-key", func(tx *sql.Tx) error { 110 _, mErr := tx.ExecContext(ctx, ` 111 create table repo_keys_new ( 112 repo_did text primary key, 113 signing_key blob, 114 created_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 115 owner_did text, 116 repo_name text, 117 at_uri text, 118 key_type text not null default 'k256' 119 ); 120 insert into repo_keys_new (repo_did, signing_key, created_at, owner_did, repo_name, key_type) 121 select repo_did, signing_key, created_at, owner_did, repo_name, 'k256' 122 from repo_keys; 123 drop table repo_keys; 124 alter table repo_keys_new rename to repo_keys; 125 create unique index if not exists idx_repo_keys_owner_repo 126 on repo_keys(owner_did, repo_name); 127 `) 128 return mErr 129 }); err != nil { 130 return nil, err 131 } 132 133 return &DB{ 134 db: db, 135 logger: logger, 136 }, nil 137} 138 139func (d *DB) StoreRepoKey(repoDid string, signingKey []byte, ownerDid, repoName, atUri string) error { 140 _, err := d.db.Exec( 141 `INSERT INTO repo_keys (repo_did, signing_key, owner_did, repo_name, at_uri, key_type) VALUES (?, ?, ?, ?, ?, 'k256')`, 142 repoDid, signingKey, ownerDid, repoName, atUri, 143 ) 144 return err 145} 146 147func (d *DB) StoreRepoDidWeb(repoDid, ownerDid, repoName, atUri string) error { 148 _, err := d.db.Exec( 149 `INSERT INTO repo_keys (repo_did, signing_key, owner_did, repo_name, at_uri, key_type) VALUES (?, NULL, ?, ?, ?, 'web')`, 150 repoDid, ownerDid, repoName, atUri, 151 ) 152 return err 153} 154 155func (d *DB) DeleteRepoKey(repoDid string) error { 156 _, err := d.db.Exec(`DELETE FROM repo_keys WHERE repo_did = ?`, repoDid) 157 return err 158} 159 160func (d *DB) RepoDidExists(repoDid string) (bool, error) { 161 var count int 162 err := d.db.QueryRow(`SELECT count(1) FROM repo_keys WHERE repo_did = ?`, repoDid).Scan(&count) 163 return count > 0, err 164} 165 166func (d *DB) GetRepoDid(ownerDid, repoName string) (string, error) { 167 var repoDid string 168 err := d.db.QueryRow( 169 `SELECT repo_did FROM repo_keys WHERE owner_did = ? AND repo_name = ?`, 170 ownerDid, repoName, 171 ).Scan(&repoDid) 172 return repoDid, err 173} 174 175func (d *DB) GetRepoKeyOwner(repoDid string) (ownerDid string, repoName string, err error) { 176 var nullOwner, nullName sql.NullString 177 err = d.db.QueryRow( 178 `SELECT owner_did, repo_name FROM repo_keys WHERE repo_did = ?`, 179 repoDid, 180 ).Scan(&nullOwner, &nullName) 181 if err != nil { 182 return 183 } 184 if !nullOwner.Valid || !nullName.Valid || nullOwner.String == "" || nullName.String == "" { 185 err = fmt.Errorf("repo_keys row for %s has empty or null owner_did or repo_name", repoDid) 186 return 187 } 188 ownerDid = nullOwner.String 189 repoName = nullName.String 190 return 191} 192 193func (d *DB) ResolveRepoDIDOnDisk(scanPath, repoDid string) (repoPath, ownerDid, repoName string, err error) { 194 ownerDid, repoName, err = d.GetRepoKeyOwner(repoDid) 195 if err != nil { 196 return 197 } 198 199 didPath, joinErr := securejoin.SecureJoin(scanPath, repoDid) 200 if joinErr != nil { 201 err = fmt.Errorf("securejoin failed for repo DID path %s: %w", repoDid, joinErr) 202 return 203 } 204 205 if _, statErr := os.Stat(didPath); statErr != nil { 206 err = fmt.Errorf("repo DID directory not found on disk: %s", didPath) 207 return 208 } 209 210 repoPath = didPath 211 return 212}