# personal memory agent — observer app shared utilities
1# SPDX-License-Identifier: AGPL-3.0-only 2# Copyright (c) 2026 sol pbc 3 4"""Shared utilities for the observer app. 5 6Provides common helpers for observer metadata management and sync history 7that are used by both routes.py and events.py. 8""" 9 10from __future__ import annotations 11 12import json 13import logging 14import os 15from pathlib import Path 16 17from apps.utils import get_app_storage_path 18 19logger = logging.getLogger(__name__) 20 21 22def get_observers_dir() -> Path: 23 """Get the observers storage directory.""" 24 return get_app_storage_path("observer", "observers", ensure_exists=True) 25 26 27def get_hist_dir(key_prefix: str, ensure_exists: bool = True) -> Path: 28 """Get the history directory for an observer. 29 30 Args: 31 key_prefix: First 8 chars of observer key 32 ensure_exists: Create directory if it doesn't exist (default: True) 33 34 Returns: 35 Path to apps/observer/observers/<key_prefix>/hist/ 36 """ 37 return get_app_storage_path( 38 "observer", "observers", key_prefix, "hist", ensure_exists=ensure_exists 39 ) 40 41 42def load_observer(key: str) -> dict | None: 43 """Load observer metadata by key. 44 45 Args: 46 key: Full observer authentication key 47 48 Returns: 49 Observer metadata dict if found and key matches, None otherwise 50 """ 51 observers_dir = get_observers_dir() 52 observer_path = observers_dir / f"{key[:8]}.json" 53 if not observer_path.exists(): 54 return None 55 try: 56 with open(observer_path) as f: 57 data = json.load(f) 58 # Verify full key matches 59 if data.get("key") != key: 60 return None 61 return data 62 except (json.JSONDecodeError, OSError): 63 return None 64 65 66def save_observer(data: dict) -> bool: 67 """Save observer metadata. 
68 69 Args: 70 data: Observer metadata dict (must contain 'key' field) 71 72 Returns: 73 True if saved successfully, False otherwise 74 """ 75 key = data.get("key") 76 if not key: 77 return False 78 observers_dir = get_observers_dir() 79 observer_path = observers_dir / f"{key[:8]}.json" 80 try: 81 with open(observer_path, "w") as f: 82 json.dump(data, f, indent=2) 83 os.chmod(observer_path, 0o600) 84 return True 85 except OSError: 86 return False 87 88 89def list_observers() -> list[dict]: 90 """List all registered observers. 91 92 Returns: 93 List of observer metadata dicts, sorted by created_at descending 94 """ 95 observers_dir = get_observers_dir() 96 observers = [] 97 for observer_path in observers_dir.glob("*.json"): 98 try: 99 with open(observer_path) as f: 100 data = json.load(f) 101 observers.append(data) 102 except (json.JSONDecodeError, OSError): 103 continue 104 observers.sort(key=lambda x: x.get("created_at", 0), reverse=True) 105 return observers 106 107 108def find_observer_by_name(name: str) -> dict | None: 109 """Find observer metadata by name. 110 111 Args: 112 name: Observer name to search for 113 114 Returns: 115 Observer metadata dict if found, None otherwise 116 """ 117 for observer in list_observers(): 118 if observer.get("name") == name: 119 return observer 120 return None 121 122 123def append_history_record(key_prefix: str, day: str, record: dict) -> None: 124 """Append a record to the sync history file. 125 126 Args: 127 key_prefix: First 8 chars of observer key 128 day: Day string (YYYYMMDD) 129 record: Record to append (will be JSON-serialized) 130 """ 131 hist_dir = get_hist_dir(key_prefix) 132 hist_path = hist_dir / f"{day}.jsonl" 133 with open(hist_path, "a", encoding="utf-8") as f: 134 f.write(json.dumps(record, ensure_ascii=False) + "\n") 135 136 137def load_history(key_prefix: str, day: str) -> list[dict]: 138 """Load sync history for an observer on a given day. 
139 140 Args: 141 key_prefix: First 8 chars of observer key 142 day: Day string (YYYYMMDD) 143 144 Returns: 145 List of history records, empty if file doesn't exist 146 """ 147 hist_dir = get_hist_dir(key_prefix, ensure_exists=False) 148 hist_path = hist_dir / f"{day}.jsonl" 149 if not hist_path.exists(): 150 return [] 151 152 records = [] 153 try: 154 with open(hist_path, encoding="utf-8") as f: 155 for line in f: 156 line = line.strip() 157 if line: 158 records.append(json.loads(line)) 159 except (json.JSONDecodeError, OSError) as e: 160 logger.warning(f"Failed to load sync history {hist_path}: {e}") 161 return records 162 163 164def increment_stat(key_prefix: str, stat_name: str) -> None: 165 """Increment a stat counter for an observer. 166 167 Args: 168 key_prefix: First 8 chars of observer key 169 stat_name: Name of the stat to increment (e.g., 'segments_observed') 170 """ 171 observers_dir = get_observers_dir() 172 observer_path = observers_dir / f"{key_prefix}.json" 173 if not observer_path.exists(): 174 return 175 176 try: 177 with open(observer_path) as f: 178 data = json.load(f) 179 180 data["stats"][stat_name] = data["stats"].get(stat_name, 0) + 1 181 182 with open(observer_path, "w") as f: 183 json.dump(data, f, indent=2) 184 os.chmod(observer_path, 0o600) 185 except (json.JSONDecodeError, OSError, KeyError) as e: 186 logger.warning(f"Failed to update {stat_name} for {key_prefix}: {e}") 187 188 189def find_segment_by_sha256( 190 key_prefix: str, day: str, file_sha256s: set[str] 191) -> tuple[str | None, set[str]]: 192 """Find existing segment with matching file SHA256 signatures. 193 194 Searches history records for the given day to find a segment where 195 all provided SHA256 hashes match existing files. 
196 197 Args: 198 key_prefix: First 8 chars of observer key 199 day: Day string (YYYYMMDD) 200 file_sha256s: Set of SHA256 hashes to match 201 202 Returns: 203 Tuple of (segment_key, matched_sha256s): 204 - If full match: (segment_key, all sha256s) 205 - If partial match: (None, set of matching sha256s) 206 - If no match: (None, empty set) 207 """ 208 records = load_history(key_prefix, day) 209 if not records: 210 return None, set() 211 212 # Build map of sha256 -> segment for all upload records 213 sha256_to_segment: dict[str, str] = {} 214 segment_sha256s: dict[str, set[str]] = {} 215 216 for record in records: 217 # Skip non-upload records (e.g., "observed" type) 218 if record.get("type"): 219 continue 220 221 segment = record.get("segment", "") 222 if not segment: 223 continue 224 225 if segment not in segment_sha256s: 226 segment_sha256s[segment] = set() 227 228 for file_rec in record.get("files", []): 229 sha256 = file_rec.get("sha256", "") 230 if sha256: 231 sha256_to_segment[sha256] = segment 232 segment_sha256s[segment].add(sha256) 233 234 # Check for full match - all incoming sha256s exist in a single segment 235 for segment, existing_sha256s in segment_sha256s.items(): 236 if file_sha256s and file_sha256s.issubset(existing_sha256s): 237 return segment, file_sha256s 238 239 # Check for partial match - some sha256s already exist 240 matched = file_sha256s & set(sha256_to_segment.keys()) 241 return None, matched