"""DID resolution cache with LRU eviction and TTL support.""" import json import time import asyncio import aiohttp import logging from typing import Optional, Dict, Any from pathlib import Path from threading import Lock from cachetools import TTLCache try: from .models import CacheEntry, DIDDocument, ProfileData except ImportError: # Handle running as script directly import sys from pathlib import Path sys.path.insert(0, str(Path(__file__).parent)) from models import CacheEntry, DIDDocument, ProfileData logger = logging.getLogger(__name__) class DIDCache: """Thread-safe LRU cache with TTL for DID resolution.""" def __init__(self, max_size: int = 1000, ttl: int = 3600, cache_file: Optional[str] = None): """ Initialize DID cache. Args: max_size: Maximum number of entries to cache ttl: Time-to-live for cache entries in seconds cache_file: Path to persistent cache file """ self.cache = TTLCache(maxsize=max_size, ttl=ttl) self.lock = Lock() self.cache_file = cache_file or str(Path(__file__).parent.parent / "cache" / "did_cache.json") self.session: Optional[aiohttp.ClientSession] = None # Load persistent cache on startup self._load_cache() def _load_cache(self) -> None: """Load cache from disk if it exists.""" try: cache_path = Path(self.cache_file) if cache_path.exists(): with open(cache_path, 'r') as f: data = json.load(f) # Restore non-expired entries current_time = time.time() loaded_count = 0 for did, entry_data in data.items(): try: entry = CacheEntry(**entry_data) if not entry.is_expired: with self.lock: self.cache[did] = entry.value loaded_count += 1 except Exception as e: logger.warning(f"Failed to load cache entry for {did}: {e}") logger.info(f"Loaded {loaded_count} DID cache entries from disk") except Exception as e: logger.warning(f"Failed to load cache from disk: {e}") def _save_cache(self) -> None: """Save cache to disk.""" try: cache_path = Path(self.cache_file) cache_path.parent.mkdir(parents=True, exist_ok=True) # Prepare data for serialization data = {} current_time = time.time() with self.lock: # TTLCache doesn't expose internal data directly in newer versions # Just save current entries with their remaining TTL for did, profile_data in self.cache.items(): # For TTLCache, we can't easily get the exact remaining TTL # So we'll save with a reasonable default data[did] = { "value": profile_data.dict(), "timestamp": current_time, "ttl": self.cache.ttl # Use full TTL for now } with open(cache_path, 'w') as f: json.dump(data, f, indent=2) logger.debug(f"Saved {len(data)} DID cache entries to disk") except Exception as e: logger.error(f"Failed to save cache to disk: {e}") def get(self, did: str) -> Optional[str]: """ Get handle for a DID from cache. Args: did: The DID to look up Returns: Handle if found and not expired, None otherwise """ with self.lock: return self.cache.get(did) def set(self, did: str, handle: str) -> None: """ Set handle for a DID in cache. Args: did: The DID handle: The resolved handle """ with self.lock: self.cache[did] = handle # Save to disk periodically (every 10th entry) if len(self.cache) % 10 == 0: self._save_cache() async def resolve_did(self, did: str, force_refresh: bool = False) -> Optional[str]: """ Resolve a DID to a handle, using cache when possible. Args: did: The DID to resolve force_refresh: If True, bypass cache and fetch fresh data Returns: Handle if resolution succeeds, None otherwise """ # Check cache first unless force refresh if not force_refresh: cached = self.get(did) if cached: logger.debug(f"Cache hit for DID {did} -> {cached}") return cached # Resolve via API try: handle = await self._resolve_did_api(did) if handle: self.set(did, handle) logger.debug(f"Resolved DID {did} -> {handle}") return handle except Exception as e: logger.warning(f"Failed to resolve DID {did}: {e}") return None async def _resolve_did_api(self, did: str) -> Optional[str]: """ Resolve DID via ATProto identity API. Args: did: The DID to resolve Returns: Handle if found, None otherwise """ if not self.session: self.session = aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=10), headers={ 'User-Agent': 'Mozilla/5.0 (compatible; thought.stream/1.0)', 'Accept': 'application/json' } ) try: url = f"https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile" params = {"actor": did} async with self.session.get(url, params=params) as response: if response.status == 200: data = await response.json() # Extract handle and display name from profile handle = data.get('handle') display_name = data.get('displayName') if handle: return ProfileData( handle=handle, display_name=display_name ) else: logger.warning(f"No handle found in profile for {did}") return None else: logger.warning(f"Profile fetch failed with status {response.status} for {did}") return None except asyncio.TimeoutError: logger.warning(f"Timeout resolving DID {did}") return None except Exception as e: logger.warning(f"Error resolving DID {did}: {e}") return None async def resolve_batch(self, dids: list[str]) -> Dict[str, Optional[ProfileData]]: """ Resolve multiple DIDs concurrently. Args: dids: List of DIDs to resolve Returns: Dict mapping DID to ProfileData (or None if resolution failed) """ tasks = [self.resolve_did(did) for did in dids] results = await asyncio.gather(*tasks, return_exceptions=True) resolved = {} for did, result in zip(dids, results): if isinstance(result, Exception): logger.warning(f"Exception resolving {did}: {result}") resolved[did] = None else: resolved[did] = result return resolved async def close(self) -> None: """Close the cache and cleanup resources.""" if self.session: await self.session.close() self.session = None # Save cache to disk self._save_cache() def stats(self) -> Dict[str, Any]: """Get cache statistics.""" with self.lock: return { "size": len(self.cache), "max_size": self.cache.maxsize, "ttl": self.cache.ttl, "hits": getattr(self.cache, 'hits', 0), "misses": getattr(self.cache, 'misses', 0) } def clear(self) -> None: """Clear all cache entries.""" with self.lock: self.cache.clear() logger.info("DID cache cleared")