Highly ambitious ATProtocol AppView service and sdks
at main 266 lines 6.8 kB view raw
import type { SessionAdapter, SessionData } from "../types.ts";
import { Client } from "pg";

/**
 * Shape of a row read from the `sessions` table, as delivered by node-postgres.
 *
 * The three timestamp columns are created as BIGINT (epoch milliseconds).
 * node-postgres returns BIGINT as a string unless a custom type parser is
 * installed, so those fields are not `Date` objects. `Date` is still accepted
 * in the union in case an existing deployment uses a timestamp column type
 * (NOTE(review): confirm whether such deployments exist).
 *
 * `data` is a JSONB column; node-postgres parses JSON/JSONB values before
 * handing them over, so it normally arrives as an already-decoded value.
 */
interface SessionTable {
  session_id: string;
  user_id: string;
  handle: string | null;
  is_authenticated: boolean;
  data: unknown;
  created_at: string | number | bigint | Date;
  expires_at: string | number | bigint | Date;
  last_accessed_at: string | number | bigint | Date;
}

/**
 * Session storage backed by a PostgreSQL `sessions` table.
 *
 * Every operation opens a dedicated connection, runs its statement(s), and
 * closes the connection again; no connection is kept between calls.
 */
export class PostgresAdapter implements SessionAdapter {
  private readonly connectionString: string;

  /**
   * @param connectionString PostgreSQL connection string handed to `pg.Client`.
   */
  constructor(connectionString: string) {
    this.connectionString = connectionString;
  }

  /**
   * Creates the `sessions` table and its indexes if they do not exist yet.
   */
  async initialize(): Promise<void> {
    await this.withClient(async (client) => {
      await client.query(`
        CREATE TABLE IF NOT EXISTS sessions (
          session_id TEXT PRIMARY KEY,
          user_id TEXT NOT NULL,
          handle TEXT,
          is_authenticated BOOLEAN NOT NULL DEFAULT true,
          data JSONB,
          created_at BIGINT NOT NULL,
          expires_at BIGINT NOT NULL,
          last_accessed_at BIGINT NOT NULL
        )
      `);

      // Index for cleanup operations
      await client.query(`
        CREATE INDEX IF NOT EXISTS idx_sessions_expires_at
        ON sessions(expires_at)
      `);

      // Index for user lookups
      await client.query(`
        CREATE INDEX IF NOT EXISTS idx_sessions_user_id
        ON sessions(user_id)
      `);
    });
  }

  /**
   * Loads a single session.
   *
   * @param sessionId Primary key of the session.
   * @returns The stored session, or `null` when no row matches.
   */
  async get(sessionId: string): Promise<SessionData | null> {
    return this.withClient(async (client) => {
      const result = await client.query(
        "SELECT * FROM sessions WHERE session_id = $1",
        [sessionId]
      );

      if (result.rows.length === 0) return null;

      return this.rowToSessionData(result.rows[0]);
    });
  }

  /**
   * Inserts the session, or replaces every column of the existing row
   * (including `created_at`) when `sessionId` is already present.
   *
   * @param sessionId Primary key of the session.
   * @param data Session contents to persist.
   */
  async set(sessionId: string, data: SessionData): Promise<void> {
    await this.withClient(async (client) => {
      await client.query(
        `
        INSERT INTO sessions
        (session_id, user_id, handle, is_authenticated, data, created_at, expires_at, last_accessed_at)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
        ON CONFLICT (session_id)
        DO UPDATE SET
          user_id = $2,
          handle = $3,
          is_authenticated = $4,
          data = $5,
          created_at = $6,
          expires_at = $7,
          last_accessed_at = $8
        `,
        [
          sessionId,
          data.userId,
          data.handle || null,
          data.isAuthenticated,
          // Serialized explicitly so the JSONB parameter is always sent as JSON text.
          data.data ? JSON.stringify(data.data) : null,
          data.createdAt,
          data.expiresAt,
          data.lastAccessedAt,
        ]
      );
    });
  }

  /**
   * Applies a partial update to an existing session.
   *
   * Only `userId`, `handle`, `isAuthenticated`, `data`, `expiresAt` and
   * `lastAccessedAt` are written; other fields in `updates` are ignored.
   *
   * @param sessionId Primary key of the session.
   * @param updates Fields to overwrite; `undefined` fields are left untouched.
   * @returns `true` when a row was updated; `false` when there was nothing to
   *   update or no row matched. No connection is opened in the former case.
   */
  async update(
    sessionId: string,
    updates: Partial<SessionData>
  ): Promise<boolean> {
    const setParts: string[] = [];
    const values: (string | number | boolean | null)[] = [];

    // Column names below are fixed literals; only the values are parameters.
    const assign = (
      column: string,
      value: string | number | boolean | null
    ): void => {
      values.push(value);
      setParts.push(`${column} = $${values.length}`);
    };

    if (updates.userId !== undefined) assign("user_id", updates.userId);
    if (updates.handle !== undefined) assign("handle", updates.handle);
    if (updates.isAuthenticated !== undefined) {
      assign("is_authenticated", updates.isAuthenticated);
    }
    if (updates.data !== undefined) {
      assign("data", updates.data ? JSON.stringify(updates.data) : null);
    }
    if (updates.expiresAt !== undefined) assign("expires_at", updates.expiresAt);
    if (updates.lastAccessedAt !== undefined) {
      assign("last_accessed_at", updates.lastAccessedAt);
    }

    if (setParts.length === 0) return false;

    // The session id is always the last positional parameter.
    values.push(sessionId);
    const idPlaceholder = `$${values.length}`;

    return this.withClient(async (client) => {
      const result = await client.query(
        `
        UPDATE sessions
        SET ${setParts.join(", ")}
        WHERE session_id = ${idPlaceholder}
        `,
        values
      );

      return (result.rowCount ?? 0) > 0;
    });
  }

  /**
   * Deletes a session; deleting a missing session is not an error.
   *
   * @param sessionId Primary key of the session.
   */
  async delete(sessionId: string): Promise<void> {
    await this.withClient(async (client) => {
      await client.query("DELETE FROM sessions WHERE session_id = $1", [
        sessionId,
      ]);
    });
  }

  /**
   * Deletes every session whose `expires_at` is strictly below the cutoff.
   *
   * @param expiresBeforeMs Cutoff, compared against the stored `expires_at`.
   * @returns Number of rows deleted.
   */
  async cleanup(expiresBeforeMs: number): Promise<number> {
    return this.withClient(async (client) => {
      const result = await client.query(
        "DELETE FROM sessions WHERE expires_at < $1",
        [expiresBeforeMs]
      );

      return result.rowCount ?? 0;
    });
  }

  /**
   * Checks whether a session row exists without loading it.
   *
   * @param sessionId Primary key of the session.
   */
  async exists(sessionId: string): Promise<boolean> {
    return this.withClient(async (client) => {
      const result = await client.query(
        "SELECT 1 FROM sessions WHERE session_id = $1 LIMIT 1",
        [sessionId]
      );

      return result.rows.length > 0;
    });
  }

  /**
   * Converts a database row into the adapter-independent `SessionData` shape.
   */
  private rowToSessionData(row: SessionTable): SessionData {
    return {
      sessionId: row.session_id,
      userId: row.user_id,
      handle: row.handle || undefined,
      isAuthenticated: row.is_authenticated,
      data: PostgresAdapter.decodeData(row.data),
      createdAt: PostgresAdapter.toEpochMs(row.created_at),
      expiresAt: PostgresAdapter.toEpochMs(row.expires_at),
      lastAccessedAt: PostgresAdapter.toEpochMs(row.last_accessed_at),
    };
  }

  // PostgreSQL-specific methods

  /**
   * Lists every session stored for a user.
   *
   * @param userId Value matched against the `user_id` column.
   */
  async getSessionsByUser(userId: string): Promise<SessionData[]> {
    return this.withClient(async (client) => {
      const result = await client.query(
        "SELECT * FROM sessions WHERE user_id = $1",
        [userId]
      );

      return result.rows.map((row: SessionTable) => this.rowToSessionData(row));
    });
  }

  /**
   * Runs `VACUUM ANALYZE` on the sessions table.
   */
  async vacuum(): Promise<void> {
    await this.withClient(async (client) => {
      await client.query("VACUUM ANALYZE sessions");
    });
  }

  /**
   * Opens a dedicated connection, runs `run` with it, and always closes the
   * connection afterwards, whether `run` resolves or rejects.
   */
  private async withClient<T>(run: (client: Client) => Promise<T>): Promise<T> {
    const client = new Client({ connectionString: this.connectionString });

    try {
      await client.connect();
      return await run(client);
    } finally {
      await client.end();
    }
  }

  /**
   * Normalizes a timestamp column value to epoch milliseconds.
   *
   * BIGINT values arrive as strings by default; calling `Date` methods on them
   * would throw, so they are converted numerically instead.
   */
  private static toEpochMs(value: string | number | bigint | Date): number {
    return value instanceof Date ? value.getTime() : Number(value);
  }

  /**
   * Decodes the `data` column.
   *
   * JSONB values are already parsed by the driver and are returned as-is.
   * A string is treated as serialized JSON (e.g. when the driver's JSON
   * parsing is disabled); if it is not valid JSON it is returned unchanged.
   */
  private static decodeData(raw: unknown): SessionData["data"] {
    if (raw === null || raw === undefined) return undefined;
    if (typeof raw !== "string") return raw as SessionData["data"];

    try {
      return JSON.parse(raw);
    } catch {
      return raw as SessionData["data"];
    }
  }
}