personal memory agent
at main 436 lines 15 kB view raw
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Entity loading functions.

This module handles loading entities from storage:
- load_entities: Load attached or detected entities for a facet
- load_all_attached_entities: Load from all facets with deduplication
- load_entity_names / load_recent_entity_names: For transcription context
"""

import json
import os
import re
from pathlib import Path

from think.entities.core import (
    EntityDict,
    entity_last_active_ts,
    entity_slug,
    is_valid_entity_type,
)
from think.entities.journal import load_all_journal_entities
from think.entities.relationships import (
    enrich_relationship_with_journal,
    load_facet_relationship,
    scan_facet_relationships,
)
from think.utils import get_journal


def detected_entities_path(facet: str, day: str) -> Path:
    """Return path to detected entities file for a facet and day.

    Args:
        facet: Facet name (e.g., "personal", "work")
        day: Day in YYYYMMDD format

    Returns:
        Path to facets/{facet}/entities/{day}.jsonl
    """
    return Path(get_journal()) / "facets" / facet / "entities" / f"{day}.jsonl"


def parse_entity_file(
    file_path: str, *, validate_types: bool = True
) -> list[EntityDict]:
    """Parse entities from a JSONL file.

    This is the low-level file parsing function used for detected entity files.
    Each line in the file should be a JSON object with type, name, and description fields.

    Generates `id` field (slug) for entities that don't have one.

    Args:
        file_path: Absolute path to entities JSONL file
        validate_types: If True, filters out invalid entity types (default: True)

    Returns:
        List of entity dictionaries with id, type, name, and description keys

    Example:
        >>> parse_entity_file("/path/to/20250101.jsonl")
        [{"id": "john_smith", "type": "Person", "name": "John Smith", "description": "Friend"}]
    """
    if not os.path.isfile(file_path):
        return []

    entities = []
    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                data = json.loads(line)
                etype = data.get("type", "")
                name = data.get("name", "")
                desc = data.get("description", "")

                # Validate if requested
                if validate_types and not is_valid_entity_type(etype):
                    continue

                # Generate id from name if not present
                entity_id = data.get("id") or entity_slug(name)

                # Preserve all fields from JSON, ensuring core fields exist
                # Put id first for readability in JSONL output
                entity: EntityDict = {
                    "id": entity_id,
                    "type": etype,
                    "name": name,
                    "description": desc,
                }
                # Add any additional fields from the JSON
                for key, value in data.items():
                    if key not in entity:
                        entity[key] = value

                entities.append(entity)
            except (json.JSONDecodeError, AttributeError):
                # AttributeError covers valid JSON that is not an object
                # (e.g. a bare list/string, where .get() does not exist).
                continue  # Skip malformed lines

    return entities


def _load_entities_from_relationships(
    facet: str, *, include_detached: bool = False, include_blocked: bool = False
) -> list[EntityDict]:
    """Load attached entities from facet relationships + journal entities.

    Args:
        facet: Facet name
        include_detached: If True, includes detached entities
        include_blocked: If True, includes blocked entities (journal-level block)

    Returns:
        List of enriched entity dicts
    """
    entity_ids = scan_facet_relationships(facet)
    if not entity_ids:
        return []

    # Load all journal entities for enrichment
    journal_entities = load_all_journal_entities()

    entities = []
    for entity_id in entity_ids:
        relationship = load_facet_relationship(facet, entity_id)
        if relationship is None:
            continue

        # Skip detached if not requested
        if not include_detached and relationship.get("detached"):
            continue

        # Enrich with journal entity data
        journal_entity = journal_entities.get(entity_id)
        enriched = enrich_relationship_with_journal(relationship, journal_entity)

        # Skip blocked if not requested (blocked is set from journal entity)
        if not include_blocked and enriched.get("blocked"):
            continue

        entities.append(enriched)

    return entities


def load_entities(
    facet: str,
    day: str | None = None,
    *,
    include_detached: bool = False,
    include_blocked: bool = False,
) -> list[EntityDict]:
    """Load entities from facet.

    For attached entities (day=None), loads from facet relationships
    enriched with journal entity data.

    For detected entities (day provided), loads from day-specific JSONL files.

    Args:
        facet: Facet name
        day: Optional day in YYYYMMDD format for detected entities
        include_detached: If True, includes entities with detached=True.
            Default False excludes detached entities.
            Only applies to attached entities (day=None).
        include_blocked: If True, includes entities with blocked=True (journal-level).
            Default False excludes blocked entities.
            Only applies to attached entities (day=None).

    Returns:
        List of entity dictionaries with id, type, name, description, and other fields.

    Example:
        >>> load_entities("personal")
        [{"id": "john_smith", "type": "Person", "name": "John Smith", "description": "Friend"}]
    """
    # For detected entities, use day-specific files
    if day is not None:
        path = detected_entities_path(facet, day)
        return parse_entity_file(str(path))

    # For attached entities, load from relationships
    return _load_entities_from_relationships(
        facet, include_detached=include_detached, include_blocked=include_blocked
    )


def load_all_attached_entities(
    *,
    sort_by: str | None = None,
    limit: int | None = None,
) -> list[EntityDict]:
    """Load all attached entities from all facets with deduplication.

    Iterates facets in sorted (alphabetical) order. When the same entity
    ID appears in multiple facets, keeps the first occurrence.

    Args:
        sort_by: Optional field to sort by. Currently supports "last_seen"
            which sorts by recency (entities without the field go to end).
        limit: Optional maximum number of entities to return (applied after
            deduplication and sorting).

    Returns:
        List of entity dictionaries, deduplicated by id

    Example:
        >>> load_all_attached_entities()
        [{"id": "john_smith", "type": "Person", "name": "John Smith", ...}, ...]

        >>> load_all_attached_entities(sort_by="last_seen", limit=20)
        # Returns 20 most recently seen entities

    Note:
        Used for agent context loading. Provides deterministic behavior
        despite allowing independent entity descriptions across facets.
    """
    facets_dir = Path(get_journal()) / "facets"
    if not facets_dir.exists():
        return []

    # Track seen IDs for deduplication (use ID instead of name for uniqueness)
    seen_ids: set[str] = set()
    all_entities: list[EntityDict] = []

    # Process facets in sorted order for deterministic results
    for facet_path in sorted(facets_dir.iterdir()):
        if not facet_path.is_dir():
            continue

        facet_name = facet_path.name

        for entity in load_entities(facet_name, include_detached=False):
            entity_id = entity.get("id", "")
            # Keep first occurrence only (deduplicate by ID)
            if entity_id and entity_id not in seen_ids:
                seen_ids.add(entity_id)
                all_entities.append(entity)

    # Sort if requested
    if sort_by == "last_seen":
        # Sort by activity timestamp descending (uses full fallback chain)
        all_entities.sort(
            key=entity_last_active_ts,
            reverse=True,
        )

    # Apply limit if requested
    if limit is not None and limit > 0:
        all_entities = all_entities[:limit]

    return all_entities


def _is_speakable(name: str) -> bool:
    """Check if a name is suitable for speech recognition vocabularies.

    Allows letters, digits, spaces, periods, hyphens, and apostrophes.
    Must contain at least one letter (Rev.ai requirement).
    Rejects underscores and other programming symbols.

    Args:
        name: The name to check

    Returns:
        True if the name is speakable (has a letter, no underscores/symbols)
    """
    # Must have at least one letter, only allowed chars, no underscores
    return bool(re.fullmatch(r"[a-zA-Z0-9\s.\-']+", name)) and any(
        c.isalpha() for c in name
    )


def _extract_spoken_names(entities: list[EntityDict]) -> list[str]:
    """Extract spoken-form names from entity list.

    Extracts shortened forms optimized for audio transcription:
    - First word from base name (without parentheses)
    - All items from within parentheses (comma-separated)
    - Filters out names with underscores or no letters (not speakable)

    Examples:
        - "Ryan Reed (R2)" → ["Ryan", "R2"]
        - "Federal Aviation Administration (FAA)" → ["Federal", "FAA"]
        - "Acme Corp" → ["Acme"]
        - "send2trash" → ["send2trash"] (allowed: has letters)
        - "entity_registry" → [] (filtered: contains underscore)

    Args:
        entities: List of entity dictionaries with "name" and optional "aka" fields

    Returns:
        List of unique spoken names, preserving insertion order
    """
    spoken_names: list[str] = []

    def add_if_speakable(name: str) -> None:
        """Add name to spoken_names if it's speakable and not already present."""
        if name and name not in spoken_names and _is_speakable(name):
            spoken_names.append(name)

    def add_name_variants(name: str) -> None:
        """Extract and add first word + parenthetical items from a name."""
        if not name:
            return

        # Get base name (without parens) and extract first word
        base_name = re.sub(r"\s*\([^)]+\)", "", name).strip()
        first_word = base_name.split()[0] if base_name else None

        # Add first word if speakable
        add_if_speakable(first_word)

        # Extract and add all items from parens (comma-separated)
        paren_match = re.search(r"\(([^)]+)\)", name)
        if paren_match:
            paren_items = [item.strip() for item in paren_match.group(1).split(",")]
            for item in paren_items:
                add_if_speakable(item)

    for entity in entities:
        name = entity.get("name", "")
        if name:
            add_name_variants(name)

        # Process aka list with same logic
        aka_list = entity.get("aka", [])
        if isinstance(aka_list, list):
            for aka_name in aka_list:
                add_name_variants(aka_name)

    return spoken_names


def load_entity_names(
    *,
    facet: str | None = None,
    spoken: bool = False,
) -> str | list[str] | None:
    """Load entity names from entities for AI transcription context.

    This function extracts just the entity names (no types or descriptions) from
    entity files. When spoken=False (default), returns them as a
    semicolon-delimited string. When spoken=True, returns a list of shortened forms
    optimized for audio transcription.

    When facet is None, loads and merges entities from ALL facets with
    deduplication (first occurrence wins when same name appears in multiple facets).

    When spoken=True, uses uniform processing for all entity types:
    - Extracts first word from base name (without parentheses)
    - Extracts all items from within parentheses (comma-separated)
    - Examples:
        - "Ryan Reed (R2)" → ["Ryan", "R2"]
        - "Federal Aviation Administration (FAA)" → ["Federal", "FAA"]
        - "Acme Corp" → ["Acme"]
        - "pytest" → ["pytest"]

    Args:
        facet: Optional facet name. If provided, loads from that facet only.
            If None, loads from ALL facets using load_all_attached_entities().
        spoken: If True, returns list of shortened forms for speech recognition.
            If False, returns semicolon-delimited string of full names.

    Returns:
        When spoken=False: Semicolon-delimited string of entity names with aka values in parentheses
            (e.g., "John Smith (Johnny); Acme Corp (ACME, AcmeCo)"),
            or None if no entities found.
        When spoken=True: List of shortened entity names for speech, or None if no entities found.
    """
    # Load entities using existing utilities
    if facet is None:
        # Load from ALL facets with deduplication
        entities = load_all_attached_entities()
    else:
        # Load from specific facet
        entities = load_entities(facet)

    if not entities:
        return None

    # Transform entity dicts into desired format
    if not spoken:
        # Non-spoken mode: semicolon-delimited string of full names with aka in parentheses
        entity_names = []
        # Track raw names separately: entity_names holds *formatted* entries
        # ("Name (aka1, aka2)"), so checking the raw name against that list
        # would fail to deduplicate any entity that has aka values.
        seen_names: set[str] = set()
        for entity in entities:
            name = entity.get("name", "")
            if name and name not in seen_names:
                seen_names.add(name)
                # Check for aka values and append in parentheses
                aka_list = entity.get("aka", [])
                if isinstance(aka_list, list) and aka_list:
                    # Format: "Name (aka1, aka2, aka3)"
                    aka_str = ", ".join(aka_list)
                    formatted_name = f"{name} ({aka_str})"
                else:
                    formatted_name = name
                entity_names.append(formatted_name)
        return "; ".join(entity_names) if entity_names else None
    else:
        # Spoken mode: list of shortened forms
        spoken_names = _extract_spoken_names(entities)
        return spoken_names if spoken_names else None


def load_recent_entity_names(*, limit: int = 20) -> list[str] | None:
    """Load recently active entity names for transcription context.

    Returns spoken-form names from the most recently seen entities across all
    facets. Caller is responsible for formatting the list as needed.

    Args:
        limit: Maximum number of entities to include (default 20)

    Returns:
        List of spoken-form entity names, or None if no entities found.

    Example:
        >>> load_recent_entity_names(limit=5)
        ["Alice", "Bob", "R2", "Acme", "FAA"]
    """
    # Get most recently seen entities
    entities = load_all_attached_entities(sort_by="last_seen", limit=limit)
    if not entities:
        return None

    # Extract spoken names
    spoken_names = _extract_spoken_names(entities)
    if not spoken_names:
        return None

    return spoken_names