"""
Temporal block management with ATProto repository synchronization.
Manages journal blocks in both Letta memory and Bluesky's ATProto repository.
"""

import os
import json
import logging
import requests
from datetime import date
from typing import Optional, Tuple, List, Dict, Any
from letta_client import Letta
from pathlib import Path

logger = logging.getLogger(__name__)

# Label fragments that identify the granularity of a temporal block. They are
# matched WITHOUT a leading underscore so that labels generated without an
# agent prefix (agent_id=None) are recognised as well as prefixed ones.
_DAY_MARKER = "stream_thought_journal_day_"
_MONTH_MARKER = "stream_thought_journal_month_"
_YEAR_MARKER = "stream_thought_journal_year_"

# (marker, ATProto collection, number of "_"-separated date parts in the rkey).
# A part count of None means "use the whole remainder of the label as rkey".
_LABEL_FORMATS = (
    (_DAY_MARKER, "stream.thought.journal.day", 3),
    (_MONTH_MARKER, "stream.thought.journal.month", 2),
    (_YEAR_MARKER, "stream.thought.journal.year", None),
)


def get_temporal_labels(today: Optional[date] = None, agent_id: Optional[str] = None) -> Tuple[str, str, str]:
    """
    Generate temporal block labels using valid Letta block names with agent scoping.

    Args:
        today: Date to generate labels for (defaults to today)
        agent_id: Agent ID to scope the blocks (uses first 16 chars as prefix).
            When omitted or empty, the labels carry no prefix at all.

    Returns:
        Tuple of (day_label, month_label, year_label)
    """
    if today is None:
        today = date.today()

    # Use first 16 characters of agent ID as prefix for uniqueness
    agent_prefix = f"{agent_id[:16]}_" if agent_id else ""

    day_label = f"{agent_prefix}{_DAY_MARKER}{today.strftime('%Y_%m_%d')}"
    month_label = f"{agent_prefix}{_MONTH_MARKER}{today.strftime('%Y_%m')}"
    year_label = f"{agent_prefix}{_YEAR_MARKER}{today.year}"

    return day_label, month_label, year_label


def get_temporal_rkeys(today: Optional[date] = None) -> Tuple[str, str, str]:
    """
    Generate rkeys for temporal records.

    NOTE(review): these rkeys carry a "day-"/"month-"/"year-" prefix, whereas
    sync_temporal_block_to_atproto writes records under un-prefixed rkeys
    (e.g. "2025-08-23"). Confirm which form callers of this function expect.

    Args:
        today: Date to generate rkeys for (defaults to today)

    Returns:
        Tuple of (day_rkey, month_rkey, year_rkey)
    """
    if today is None:
        today = date.today()

    day_rkey = f"day-{today.strftime('%Y-%m-%d')}"
    month_rkey = f"month-{today.strftime('%Y-%m')}"
    year_rkey = f"year-{today.year}"

    return day_rkey, month_rkey, year_rkey


def get_atproto_auth() -> Tuple[str, str, str]:
    """
    Get ATProto authentication details from configuration and session.

    The username and PDS host come from the Bluesky configuration; the DID and
    access token are read from the ``session_<username>.txt`` file in the
    current working directory. The PDS host stored in the session string is
    not used.

    Returns:
        Tuple of (access_token, did, pds_host)

    Raises:
        ValueError: If the session file is missing, has fewer than five
            ":::"-separated fields, or has an empty DID or access token.
    """
    from config_loader import get_bluesky_config

    # Get configuration
    bsky_config = get_bluesky_config()
    username = bsky_config['username']
    pds_host = bsky_config['pds_uri']

    # Try to get existing session; read directly rather than checking for
    # existence first so there is no window between the check and the read.
    session_file = Path(f"session_{username}.txt")
    try:
        session_string = session_file.read_text(encoding="utf-8").strip()
    except FileNotFoundError:
        raise ValueError(f"No session file found for {username}. Bot must be running.") from None

    # Parse the session string format: username:::did:::accessJwt:::refreshJwt:::pds_host
    parts = session_string.split(':::')
    if len(parts) < 5:
        raise ValueError("Invalid session format")

    did = parts[1]
    access_token = parts[2]

    if not access_token or not did:
        raise ValueError("Invalid session data")

    return access_token, did, pds_host


def _parse_temporal_label(label: str) -> Optional[Tuple[str, str]]:
    """Map a temporal block label to (collection, rkey), or None if malformed."""
    for marker, collection, part_count in _LABEL_FORMATS:
        if marker not in label:
            continue

        # Everything after the marker is the date, e.g. "2025_08_23"
        remainder = label.split(marker)[1]
        if part_count is None:
            # Year labels use the remainder verbatim, e.g. "2025"
            return (collection, remainder) if remainder else None

        date_parts = remainder.split("_")[:part_count]
        if len(date_parts) < part_count or not all(date_parts):
            return None
        return collection, "-".join(date_parts)  # e.g. "2025-08-23"

    return None


def sync_temporal_block_to_atproto(label: str, content: str) -> bool:
    """
    Sync a temporal block to ATProto repository.

    The record is written with ``com.atproto.repo.putRecord`` under a
    collection and rkey derived from the label, so repeated syncs of the same
    label create or overwrite a single record.

    Args:
        label: The block label (e.g., "a1b2c3d4_stream_thought_journal_day_2025_08_23").
            Labels without an agent prefix are accepted too.
        content: The block content

    Returns:
        True if successful, False otherwise. Failures are logged, never raised.
    """
    try:
        # Parse label to get collection and rkey (handles optional agent prefix)
        parsed = _parse_temporal_label(label)
        if parsed is None:
            logger.error(f"Invalid temporal label format: {label}")
            return False
        collection, rkey = parsed

        access_token, did, pds_host = get_atproto_auth()

        # Create the journal record - simple structure.
        # createdAt is pinned to midnight of the current local date.
        journal_record = {
            "$type": collection,
            "content": content,
            "createdAt": date.today().isoformat() + "T00:00:00.000Z"
        }

        # Use putRecord to create or update with consistent rkey
        headers = {"Authorization": f"Bearer {access_token}"}
        put_record_url = f"{pds_host}/xrpc/com.atproto.repo.putRecord"

        put_data = {
            "repo": did,
            "collection": collection,
            "rkey": rkey,
            "record": journal_record,
            "validate": False  # Don't validate against lexicon
        }

        response = requests.post(put_record_url, headers=headers, json=put_data, timeout=10)

        if response.status_code == 200:
            result = response.json()
            logger.info(f"Synced temporal block to ATProto: {collection}/{rkey} (CID: {result.get('cid')})")
            return True

        logger.error(f"Failed to sync temporal block {collection}/{rkey}: {response.status_code} - {response.text}")
        return False

    except Exception as e:
        # Best-effort sync: any failure (auth, network, JSON) is reported as False
        logger.error(f"Error syncing temporal block to ATProto: {e}")
        return False


def _initial_journal_content(label: str, today: date) -> str:
    """Build the placeholder content for a newly created temporal block."""
    if _DAY_MARKER in label:
        header = f"# Daily Journal - {today.strftime('%B %d, %Y')}"
        return f"{header}\n\nNo entries yet for today."
    if _MONTH_MARKER in label:
        header = f"# Monthly Journal - {today.strftime('%B %Y')}"
        return f"{header}\n\nNo entries yet for this month."
    # Anything else is treated as a year block
    header = f"# Yearly Journal - {today.year}"
    return f"{header}\n\nNo entries yet for this year."


def attach_temporal_blocks(client: Letta, agent_id: str) -> Tuple[bool, List[str]]:
    """
    Attach temporal journal blocks to the agent for synthesis.
    Creates blocks if they don't exist and syncs with ATProto.

    Args:
        client: Letta client
        agent_id: Agent ID

    Returns:
        Tuple of (success: bool, attached_labels: list). ``success`` is False
        only when the initial setup (e.g. listing the agent's blocks) fails;
        per-label failures are logged and simply leave that label out of
        ``attached_labels``.
    """
    try:
        today = date.today()

        # Generate temporal block labels with agent scoping
        temporal_labels = list(get_temporal_labels(today, agent_id))
        attached_labels = []

        # Get current blocks attached to agent. Materialize once because the
        # result is walked twice below.
        current_blocks = list(client.agents.blocks.list(agent_id=agent_id))
        current_block_labels = {block.label for block in current_blocks}
        current_block_ids = {str(block.id) for block in current_blocks}

        for label in temporal_labels:
            try:
                # Skip if already attached
                if label in current_block_labels:
                    logger.debug(f"Temporal block already attached: {label}")
                    attached_labels.append(label)
                    continue

                # Check if block exists globally
                blocks = client.blocks.list(label=label)

                if blocks and len(blocks) > 0:
                    block = blocks[0]
                    # Check if already attached by ID
                    if str(block.id) in current_block_ids:
                        logger.debug(f"Temporal block already attached by ID: {label}")
                        attached_labels.append(label)
                        continue
                else:
                    # Create new temporal block with appropriate header
                    initial_content = _initial_journal_content(label, today)

                    block = client.blocks.create(
                        label=label,
                        value=initial_content,
                        limit=10000  # Larger limit for journal blocks
                    )
                    logger.info(f"Created new temporal block: {label}")

                    # Sync new block to ATProto (best effort; result not checked)
                    sync_temporal_block_to_atproto(label, initial_content)

                # Attach the block
                client.agents.blocks.attach(
                    agent_id=agent_id,
                    block_id=str(block.id)
                )
                attached_labels.append(label)
                logger.info(f"Attached temporal block: {label}")

            except Exception as e:
                # Check for duplicate constraint errors: these are counted as
                # "already attached" rather than as failures.
                error_str = str(e)
                if "duplicate key value violates unique constraint" in error_str:
                    logger.debug(f"Temporal block already attached (constraint): {label}")
                    attached_labels.append(label)
                else:
                    logger.warning(f"Failed to attach temporal block {label}: {e}")

        logger.info(f"Temporal blocks attached: {len(attached_labels)}/{len(temporal_labels)}")
        return True, attached_labels

    except Exception as e:
        logger.error(f"Error attaching temporal blocks: {e}")
        return False, []


def detach_temporal_blocks(client: Letta, agent_id: str, labels_to_detach: Optional[List[str]] = None) -> bool:
    """
    Detach temporal journal blocks from the agent after synthesis.
    Syncs final content to ATProto before detaching.

    Args:
        client: Letta client
        agent_id: Agent ID
        labels_to_detach: Optional list of specific labels to detach
            (defaults to today's day/month/year labels for this agent)

    Returns:
        bool: True if at least one block was detached, False otherwise
    """
    try:
        # If no specific labels provided, generate today's labels
        if labels_to_detach is None:
            labels_to_detach = list(get_temporal_labels(agent_id=agent_id))

        # Get current blocks and build label -> (id, content) mapping
        attached = {
            block.label: (str(block.id), block.value)
            for block in client.agents.blocks.list(agent_id=agent_id)
        }

        detached_count = 0
        for label in labels_to_detach:
            if label not in attached:
                logger.debug(f"Temporal block not attached, skipping: {label}")
                continue

            block_id, content = attached[label]
            try:
                # Sync current content to ATProto before detaching;
                # empty blocks are not synced.
                if content:
                    sync_temporal_block_to_atproto(label, content)

                # Detach from agent
                client.agents.blocks.detach(
                    agent_id=agent_id,
                    block_id=block_id
                )
                detached_count += 1
                logger.info(f"Detached temporal block: {label}")
            except Exception as e:
                logger.warning(f"Failed to detach temporal block {label}: {e}")

        logger.info(f"Temporal blocks detached: {detached_count}/{len(labels_to_detach)}")
        return detached_count > 0

    except Exception as e:
        logger.error(f"Error detaching temporal blocks: {e}")
        return False


def update_temporal_blocks_after_synthesis(client: Letta, agent_id: str, attached_labels: List[str]) -> bool:
    """
    Update temporal blocks in ATProto after synthesis completes.

    Args:
        client: Letta client
        agent_id: Agent ID
        attached_labels: List of labels that were attached during synthesis

    Returns:
        bool: True if at least one block was synced, False otherwise
    """
    try:
        # Get current blocks to retrieve updated content
        block_content = {
            block.label: block.value
            for block in client.agents.blocks.list(agent_id=agent_id)
        }

        synced_count = 0
        for label in attached_labels:
            if label not in block_content:
                logger.warning(f"Could not find content for label {label}")
                continue

            if sync_temporal_block_to_atproto(label, block_content[label]):
                synced_count += 1
                logger.debug(f"Synced updated content for {label}")

        logger.info(f"Synced {synced_count}/{len(attached_labels)} temporal blocks to ATProto")
        return synced_count > 0

    except Exception as e:
        logger.error(f"Error updating temporal blocks after synthesis: {e}")
        return False