# SPDX-License-Identifier: AGPL-3.0-only # Copyright (c) 2026 sol pbc """Shared utilities for output extraction hooks. This module provides common functions used by extraction hooks like occurrence.py and anticipation.py in the talent/ directory. """ import json import logging import os from pathlib import Path # Minimum content length for meaningful event extraction MIN_EXTRACTION_CHARS = 50 def should_skip_extraction(result: str, context: dict) -> str | None: """Check if extraction should be skipped and return reason, or None to proceed. Args: result: The generated output markdown content. context: Hook context dict with meta and span. Returns: Skip reason string if extraction should be skipped, None otherwise. """ meta = context.get("meta", {}) # Skip if extraction disabled via journal config if meta.get("extract") is False: return "extraction disabled via journal config" # Skip for JSON output (output IS the structured data) if meta.get("output") == "json": return "JSON output (already structured)" # Skip in span mode (multiple sequential segments) if context.get("span"): return "span mode" # Skip for minimal content if len(result.strip()) < MIN_EXTRACTION_CHARS: return f"minimal content ({len(result.strip())} chars < {MIN_EXTRACTION_CHARS})" return None def log_extraction_failure(e: Exception, name: str) -> None: """Log enhanced diagnostics for extraction generation failures. Handles IncompleteJSONError specially by logging a single-line summary with a head+tail sample and degenerate repetition detection. Args: e: The exception from generate(). name: Generator name for log context. """ from think.models import IncompleteJSONError if not isinstance(e, IncompleteJSONError): logging.error("Extraction generation failed for %s: %s", name, e) return partial = e.partial_text length = len(partial) # Build single-line head+tail sample (newlines collapsed for log grep) def _collapse(s: str) -> str: return s.replace("\n", "\\n").replace("\r", "") if length <= 300: sample = _collapse(partial) else: sample = f"{_collapse(partial[:150])} ... {_collapse(partial[-150:])}" # Repetition detection: count unique chars in last 1000 tail = partial[-1000:] if length >= 1000 else partial unique_count = len(set(tail)) repetition_flag = "" if unique_count < 20: repetition_flag = ( f" [POSSIBLE DEGENERATE REPETITION: " f"{unique_count} unique chars in last {len(tail)}]" ) logging.error( "Extraction generation failed for %s: %s " "(partial_text: %d chars, %d unique in tail%s) sample: %s", name, e, length, unique_count, repetition_flag, sample, ) def write_events_jsonl( events: list[dict], agent: str, occurred: bool, source_output: str, capture_day: str, ) -> list[Path]: """Write events to facet-based JSONL files. Groups events by facet and writes each to the appropriate file: facets/{facet}/events/{event_day}.jsonl Args: events: List of event dictionaries from extraction. agent: Source generator agent (e.g., "meetings", "schedule"). occurred: True for occurrences, False for anticipations. source_output: Relative path to source output file. capture_day: Day the output was captured (YYYYMMDD). Returns: List of paths to written JSONL files. """ from think.utils import get_journal journal = get_journal() # Group events by (facet, event_day) grouped: dict[tuple[str, str], list[dict]] = {} for event in events: facet = event.get("facet", "") if not facet: continue # Skip events without facet # Determine the event day if occurred: # Occurrences use capture day event_day = capture_day else: # Anticipations use their scheduled date event_date = event.get("date", "") # Convert YYYY-MM-DD to YYYYMMDD event_day = event_date.replace("-", "") if event_date else capture_day if not event_day: continue key = (facet, event_day) if key not in grouped: grouped[key] = [] # Enrich event with metadata enriched = dict(event) enriched["agent"] = agent enriched["occurred"] = occurred enriched["source"] = source_output grouped[key].append(enriched) # Write each group to its JSONL file written_paths: list[Path] = [] for (facet, event_day), facet_events in grouped.items(): events_dir = Path(journal) / "facets" / facet / "events" events_dir.mkdir(parents=True, exist_ok=True) jsonl_path = events_dir / f"{event_day}.jsonl" with open(jsonl_path, "a", encoding="utf-8") as f: for event in facet_events: f.write(json.dumps(event, ensure_ascii=False) + "\n") written_paths.append(jsonl_path) return written_paths def compute_output_source(context: dict) -> str: """Compute relative source output path from hook context. Args: context: Hook context dict with day, segment, name, output_path, meta. Returns: Relative path like "20240101/agents/meetings.md". """ from think.talent import get_output_name from think.utils import get_journal day = context.get("day", "") output_path = context.get("output_path", "") name = context.get("name", "unknown") journal = get_journal() try: return os.path.relpath(output_path, journal) except ValueError: segment = context.get("segment") output_name = get_output_name(name) # Check for facet in meta (for multi-facet agents) meta = context.get("meta", {}) facet = meta.get("facet") if meta else None filename = f"{output_name}.md" if segment and facet: return os.path.join(day, segment, "agents", facet, filename) if segment: return os.path.join(day, segment, "agents", filename) if facet: return os.path.join(day, "agents", facet, filename) return os.path.join(day, "agents", filename)