# a digital person for bluesky
1#!/usr/bin/env python3
2"""SQLite database for robust notification tracking."""
3
4import sqlite3
5import json
6from pathlib import Path
7from datetime import datetime, timedelta
8from typing import Set, Dict, List, Optional, Tuple
9import logging
10
11logger = logging.getLogger(__name__)
12
class NotificationDB:
    """SQLite-backed tracker of Bluesky notification processing state.

    Persists per-notification status ('pending' -> 'processed'/'ignored'/
    'no_reply'/'error'), lightweight metadata, and per-run session counters,
    replacing an older flat-JSON set of processed URIs (see
    migrate_from_json).
    """

    # Terminal statuses: a notification in one of these needs no further work.
    # Single source of truth for the queries below so they cannot drift.
    DONE_STATUSES = ('processed', 'ignored', 'no_reply')

    def __init__(self, db_path: str = "queue/notifications.db"):
        """Open (creating if necessary) the database at *db_path*."""
        self.db_path = Path(db_path)
        # Make sure the parent directory exists before sqlite opens the file.
        self.db_path.parent.mkdir(exist_ok=True, parents=True)
        self.conn: Optional[sqlite3.Connection] = None
        self._init_db()

    def __enter__(self) -> "NotificationDB":
        """Allow `with NotificationDB(...) as db:` usage."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the connection on context exit."""
        self.close()

    def _init_db(self) -> None:
        """Create tables and indexes if they do not already exist."""
        # check_same_thread=False: the connection may be touched from more
        # than one thread; NOTE(review): sqlite3 does not serialize access
        # for you — callers are assumed to coordinate. TODO confirm usage.
        self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
        self.conn.row_factory = sqlite3.Row

        # Main notifications table; the AT-URI is the natural primary key.
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS notifications (
                uri TEXT PRIMARY KEY,
                indexed_at TEXT NOT NULL,
                processed_at TEXT,
                status TEXT NOT NULL DEFAULT 'pending',
                reason TEXT,
                author_handle TEXT,
                author_did TEXT,
                text TEXT,
                parent_uri TEXT,
                root_uri TEXT,
                error TEXT,
                metadata TEXT
            )
        """)

        # Indexes matching the lookup patterns used below.
        self.conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_indexed_at
            ON notifications(indexed_at DESC)
        """)
        self.conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_status
            ON notifications(status)
        """)
        self.conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_author_handle
            ON notifications(author_handle)
        """)

        # Per-run processing-session statistics.
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS sessions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                started_at TEXT NOT NULL,
                ended_at TEXT,
                last_seen_at TEXT,
                notifications_processed INTEGER DEFAULT 0,
                notifications_skipped INTEGER DEFAULT 0,
                notifications_error INTEGER DEFAULT 0
            )
        """)

        self.conn.commit()

    def add_notification(self, notif_dict: Dict) -> bool:
        """Insert a notification as 'pending' if it is not already stored.

        Returns True when the input carried a uri and the insert ran
        (duplicates are silently ignored via INSERT OR IGNORE); False for
        empty/None input, a missing uri, or a database error.
        """
        try:
            # Handle None / empty input.
            if not notif_dict:
                return False

            uri = notif_dict.get('uri', '')
            if not uri:
                return False

            indexed_at = notif_dict.get('indexed_at', '')
            reason = notif_dict.get('reason', '')
            author = notif_dict.get('author') or {}
            author_handle = author.get('handle', '')
            author_did = author.get('did', '')

            # The record — and its text — may be absent or explicitly None.
            # `or ''` guards the slice against text=None (previously crashed).
            record = notif_dict.get('record') or {}
            text = (record.get('text') or '')[:500]

            # Thread linkage when this notification is a reply.
            parent_uri = None
            root_uri = None
            reply_info = record.get('reply')
            if isinstance(reply_info, dict):
                parent_uri = (reply_info.get('parent') or {}).get('uri')
                root_uri = (reply_info.get('root') or {}).get('uri')

            # Anything else worth keeping goes into a JSON metadata column.
            metadata = {
                'cid': notif_dict.get('cid'),
                'labels': notif_dict.get('labels', []),
                'is_read': notif_dict.get('is_read', False)
            }

            self.conn.execute("""
                INSERT OR IGNORE INTO notifications
                (uri, indexed_at, reason, author_handle, author_did, text,
                 parent_uri, root_uri, status, metadata)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'pending', ?)
            """, (uri, indexed_at, reason, author_handle, author_did, text,
                  parent_uri, root_uri, json.dumps(metadata)))

            self.conn.commit()
            return True

        except Exception as e:
            logging.getLogger(__name__).error(f"Error adding notification to DB: {e}")
            return False

    def is_processed(self, uri: str) -> bool:
        """Return True if *uri* already has a terminal status."""
        cursor = self.conn.execute(
            "SELECT status FROM notifications WHERE uri = ?", (uri,))
        row = cursor.fetchone()
        return bool(row) and row['status'] in self.DONE_STATUSES

    def mark_processed(self, uri: str, status: str = 'processed',
                       error: Optional[str] = None) -> None:
        """Set *status* (and optional *error*) on *uri*, stamping processed_at.

        Errors are logged, not raised, so a bad update cannot kill the
        processing loop.
        """
        try:
            self.conn.execute("""
                UPDATE notifications
                SET status = ?, processed_at = ?, error = ?
                WHERE uri = ?
            """, (status, datetime.now().isoformat(), error, uri))
            self.conn.commit()
        except Exception as e:
            logging.getLogger(__name__).error(f"Error marking notification processed: {e}")

    def get_unprocessed(self, limit: int = 100) -> List[Dict]:
        """Return up to *limit* pending notifications, oldest first."""
        cursor = self.conn.execute("""
            SELECT * FROM notifications
            WHERE status = 'pending'
            ORDER BY indexed_at ASC
            LIMIT ?
        """, (limit,))
        return [dict(row) for row in cursor]

    def get_latest_processed_time(self) -> Optional[str]:
        """Return the newest indexed_at among handled notifications, or None."""
        placeholders = ','.join('?' * len(self.DONE_STATUSES))
        cursor = self.conn.execute(f"""
            SELECT MAX(indexed_at) as latest
            FROM notifications
            WHERE status IN ({placeholders})
        """, self.DONE_STATUSES)
        row = cursor.fetchone()
        return row['latest'] if row and row['latest'] else None

    def cleanup_old_records(self, days: int = 7) -> None:
        """Delete handled/errored rows older than *days* days.

        Timestamps are ISO-8601 strings, so lexicographic '<' matches
        chronological order. VACUUMs only when something was deleted.
        """
        cutoff_date = (datetime.now() - timedelta(days=days)).isoformat()
        statuses = self.DONE_STATUSES + ('error',)
        placeholders = ','.join('?' * len(statuses))

        deleted = self.conn.execute(f"""
            DELETE FROM notifications
            WHERE indexed_at < ?
            AND status IN ({placeholders})
        """, (cutoff_date, *statuses)).rowcount

        self.conn.commit()

        if deleted > 0:
            logging.getLogger(__name__).info(f"Cleaned up {deleted} old notification records")
            # Vacuum to reclaim space
            self.conn.execute("VACUUM")

    def get_stats(self) -> Dict:
        """Return counts by status, total rows, and last-24h activity."""
        stats = {}

        # Count by status.
        cursor = self.conn.execute("""
            SELECT status, COUNT(*) as count
            FROM notifications
            GROUP BY status
        """)
        for row in cursor:
            stats[f"status_{row['status']}"] = row['count']

        # Total count.
        cursor = self.conn.execute("SELECT COUNT(*) as total FROM notifications")
        stats['total'] = cursor.fetchone()['total']

        # Recent activity (last 24h).
        yesterday = (datetime.now() - timedelta(days=1)).isoformat()
        cursor = self.conn.execute("""
            SELECT COUNT(*) as recent
            FROM notifications
            WHERE indexed_at > ?
        """, (yesterday,))
        stats['recent_24h'] = cursor.fetchone()['recent']

        return stats

    def start_session(self) -> int:
        """Insert a new session row and return its id."""
        # One timestamp for both columns so they are exactly equal.
        now = datetime.now().isoformat()
        cursor = self.conn.execute("""
            INSERT INTO sessions (started_at, last_seen_at)
            VALUES (?, ?)
        """, (now, now))
        self.conn.commit()
        return cursor.lastrowid

    def update_session(self, session_id: int, processed: int = 0,
                       skipped: int = 0, error: int = 0) -> None:
        """Increment session counters and refresh last_seen_at."""
        self.conn.execute("""
            UPDATE sessions
            SET last_seen_at = ?,
                notifications_processed = notifications_processed + ?,
                notifications_skipped = notifications_skipped + ?,
                notifications_error = notifications_error + ?
            WHERE id = ?
        """, (datetime.now().isoformat(), processed, skipped, error, session_id))
        self.conn.commit()

    def end_session(self, session_id: int) -> None:
        """Stamp ended_at on the given session."""
        self.conn.execute("""
            UPDATE sessions
            SET ended_at = ?
            WHERE id = ?
        """, (datetime.now().isoformat(), session_id))
        self.conn.commit()

    def get_processed_uris(self, limit: int = 10000) -> Set[str]:
        """Return up to *limit* handled URIs (compat shim for older callers)."""
        placeholders = ','.join('?' * len(self.DONE_STATUSES))
        cursor = self.conn.execute(f"""
            SELECT uri FROM notifications
            WHERE status IN ({placeholders})
            ORDER BY processed_at DESC
            LIMIT ?
        """, (*self.DONE_STATUSES, limit))
        return {row['uri'] for row in cursor}

    def migrate_from_json(self, json_path: str = "queue/processed_notifications.json"):
        """One-time import of the legacy JSON list of processed URIs.

        Each URI is inserted as already-'processed' (original timestamps are
        unknown, so 'now' is used); the JSON file is then renamed to
        *.json.backup so migration does not run twice.
        """
        json_file = Path(json_path)
        if not json_file.exists():
            return

        try:
            with open(json_file, 'r') as f:
                uris = json.load(f)

            # Single timestamp for the whole batch.
            now = datetime.now().isoformat()
            migrated = 0
            for uri in uris:
                # Add as processed with unknown timestamp
                self.conn.execute("""
                    INSERT OR IGNORE INTO notifications
                    (uri, indexed_at, status, processed_at)
                    VALUES (?, ?, 'processed', ?)
                """, (uri, now, now))
                migrated += 1

            self.conn.commit()
            logging.getLogger(__name__).info(f"Migrated {migrated} URIs from JSON to database")

            # Rename old file to backup
            backup_path = json_file.with_suffix('.json.backup')
            json_file.rename(backup_path)
            logging.getLogger(__name__).info(f"Renamed old JSON file to {backup_path}")

        except Exception as e:
            logging.getLogger(__name__).error(f"Error migrating from JSON: {e}")

    def close(self) -> None:
        """Close the database connection (safe to call more than once)."""
        if self.conn:
            self.conn.close()
            # Drop the reference so later calls are no-ops, not use-after-close.
            self.conn = None