a digital person for bluesky
at x 13 kB view raw
1""" 2Temporal block management with ATProto repository synchronization. 3Manages journal blocks in both Letta memory and Bluesky's ATProto repository. 4""" 5 6import os 7import json 8import logging 9import requests 10from datetime import date 11from typing import Optional, Tuple, List, Dict, Any 12from letta_client import Letta 13from pathlib import Path 14 15logger = logging.getLogger(__name__) 16 17 18def get_temporal_labels(today: Optional[date] = None, agent_id: Optional[str] = None) -> Tuple[str, str, str]: 19 """ 20 Generate temporal block labels using valid Letta block names with agent scoping. 21 22 Args: 23 today: Date to generate labels for (defaults to today) 24 agent_id: Agent ID to scope the blocks (uses first 16 chars as prefix) 25 26 Returns: 27 Tuple of (day_label, month_label, year_label) 28 """ 29 if today is None: 30 today = date.today() 31 32 # Use first 16 characters of agent ID as prefix for uniqueness 33 agent_prefix = "" 34 if agent_id: 35 agent_prefix = f"{agent_id[:16]}_" 36 37 day_label = f"{agent_prefix}stream_thought_journal_day_{today.strftime('%Y_%m_%d')}" 38 month_label = f"{agent_prefix}stream_thought_journal_month_{today.strftime('%Y_%m')}" 39 year_label = f"{agent_prefix}stream_thought_journal_year_{today.year}" 40 41 return day_label, month_label, year_label 42 43 44def get_temporal_rkeys(today: Optional[date] = None) -> Tuple[str, str, str]: 45 """ 46 Generate rkeys for temporal records. 47 48 Returns: 49 Tuple of (day_rkey, month_rkey, year_rkey) 50 """ 51 if today is None: 52 today = date.today() 53 54 day_rkey = f"day-{today.strftime('%Y-%m-%d')}" 55 month_rkey = f"month-{today.strftime('%Y-%m')}" 56 year_rkey = f"year-{today.year}" 57 58 return day_rkey, month_rkey, year_rkey 59 60 61def get_atproto_auth() -> Tuple[str, str, str]: 62 """ 63 Get ATProto authentication details from configuration and session. 64 65 Returns: 66 Tuple of (access_token, did, pds_host) 67 """ 68 from config_loader import get_bluesky_config 69 70 # Get configuration 71 bsky_config = get_bluesky_config() 72 username = bsky_config['username'] 73 pds_host = bsky_config['pds_uri'] 74 75 # Try to get existing session 76 session_file = Path(f"session_{username}.txt") 77 if not session_file.exists(): 78 raise ValueError(f"No session file found for {username}. Bot must be running.") 79 80 with open(session_file, 'r') as f: 81 session_string = f.read().strip() 82 83 # Parse the session string format: username:::did:::accessJwt:::refreshJwt:::pds_host 84 parts = session_string.split(':::') 85 if len(parts) < 5: 86 raise ValueError("Invalid session format") 87 88 did = parts[1] 89 access_token = parts[2] 90 91 if not access_token or not did: 92 raise ValueError("Invalid session data") 93 94 return access_token, did, pds_host 95 96 97 98 99def sync_temporal_block_to_atproto(label: str, content: str) -> bool: 100 """ 101 Sync a temporal block to ATProto repository. 102 103 Args: 104 label: The block label (e.g., "a1b2c3d4_stream_thought_journal_day_2025_08_23") 105 content: The block content 106 107 Returns: 108 True if successful, False otherwise 109 """ 110 try: 111 # Parse label to get collection and rkey (handle agent prefix) 112 if "_stream_thought_journal_day_" in label: 113 # Extract date from agent_prefix_stream_thought_journal_day_2025_08_23 114 parts = label.split("_stream_thought_journal_day_")[1] # Gets "2025_08_23" 115 date_parts = parts.split("_") # Gets ["2025", "08", "23"] 116 collection = "stream.thought.journal.day" 117 rkey = f"{date_parts[0]}-{date_parts[1]}-{date_parts[2]}" # "2025-08-23" 118 elif "_stream_thought_journal_month_" in label: 119 # Extract month from agent_prefix_stream_thought_journal_month_2025_08 120 parts = label.split("_stream_thought_journal_month_")[1] # Gets "2025_08" 121 date_parts = parts.split("_") # Gets ["2025", "08"] 122 collection = "stream.thought.journal.month" 123 rkey = f"{date_parts[0]}-{date_parts[1]}" # "2025-08" 124 elif "_stream_thought_journal_year_" in label: 125 # Extract year from agent_prefix_stream_thought_journal_year_2025 126 year = label.split("_stream_thought_journal_year_")[1] # Gets "2025" 127 collection = "stream.thought.journal.year" 128 rkey = year # Just use "2025" as rkey 129 else: 130 logger.error(f"Invalid temporal label format: {label}") 131 return False 132 133 access_token, did, pds_host = get_atproto_auth() 134 135 # Create the journal record - simple structure 136 journal_record = { 137 "$type": collection, 138 "content": content, 139 "createdAt": date.today().isoformat() + "T00:00:00.000Z" 140 } 141 142 # Use putRecord to create or update with consistent rkey 143 headers = {"Authorization": f"Bearer {access_token}"} 144 put_record_url = f"{pds_host}/xrpc/com.atproto.repo.putRecord" 145 146 put_data = { 147 "repo": did, 148 "collection": collection, 149 "rkey": rkey, 150 "record": journal_record, 151 "validate": False # Don't validate against lexicon 152 } 153 154 response = requests.post(put_record_url, headers=headers, json=put_data, timeout=10) 155 156 if response.status_code == 200: 157 result = response.json() 158 logger.info(f"Synced temporal block to ATProto: {collection}/{rkey} (CID: {result.get('cid')})") 159 return True 160 else: 161 logger.error(f"Failed to sync temporal block {collection}/{rkey}: {response.status_code} - {response.text}") 162 return False 163 164 except Exception as e: 165 logger.error(f"Error syncing temporal block to ATProto: {e}") 166 return False 167 168 169def attach_temporal_blocks(client: Letta, agent_id: str) -> Tuple[bool, List[str]]: 170 """ 171 Attach temporal journal blocks to the agent for synthesis. 172 Creates blocks if they don't exist and syncs with ATProto. 173 174 Args: 175 client: Letta client 176 agent_id: Agent ID 177 178 Returns: 179 Tuple of (success: bool, attached_labels: list) 180 """ 181 try: 182 today = date.today() 183 184 # Generate temporal block labels with agent scoping 185 day_label, month_label, year_label = get_temporal_labels(today, agent_id) 186 187 temporal_labels = [day_label, month_label, year_label] 188 attached_labels = [] 189 190 # Get current blocks attached to agent 191 current_blocks = client.agents.blocks.list(agent_id=agent_id) 192 current_block_labels = {block.label for block in current_blocks} 193 current_block_ids = {str(block.id) for block in current_blocks} 194 195 for label in temporal_labels: 196 try: 197 # Skip if already attached 198 if label in current_block_labels: 199 logger.debug(f"Temporal block already attached: {label}") 200 attached_labels.append(label) 201 continue 202 203 # Check if block exists globally 204 blocks = client.blocks.list(label=label) 205 206 if blocks and len(blocks) > 0: 207 block = blocks[0] 208 # Check if already attached by ID 209 if str(block.id) in current_block_ids: 210 logger.debug(f"Temporal block already attached by ID: {label}") 211 attached_labels.append(label) 212 continue 213 else: 214 # Create new temporal block with appropriate header 215 if "day/" in label: 216 header = f"# Daily Journal - {today.strftime('%B %d, %Y')}" 217 initial_content = f"{header}\n\nNo entries yet for today." 218 elif "month/" in label: 219 header = f"# Monthly Journal - {today.strftime('%B %Y')}" 220 initial_content = f"{header}\n\nNo entries yet for this month." 221 else: # year 222 header = f"# Yearly Journal - {today.year}" 223 initial_content = f"{header}\n\nNo entries yet for this year." 224 225 block = client.blocks.create( 226 label=label, 227 value=initial_content, 228 limit=10000 # Larger limit for journal blocks 229 ) 230 logger.info(f"Created new temporal block: {label}") 231 232 # Sync new block to ATProto 233 sync_temporal_block_to_atproto(label, initial_content) 234 235 # Attach the block 236 client.agents.blocks.attach( 237 agent_id=agent_id, 238 block_id=str(block.id) 239 ) 240 attached_labels.append(label) 241 logger.info(f"Attached temporal block: {label}") 242 243 except Exception as e: 244 # Check for duplicate constraint errors 245 error_str = str(e) 246 if "duplicate key value violates unique constraint" in error_str: 247 logger.debug(f"Temporal block already attached (constraint): {label}") 248 attached_labels.append(label) 249 else: 250 logger.warning(f"Failed to attach temporal block {label}: {e}") 251 252 logger.info(f"Temporal blocks attached: {len(attached_labels)}/{len(temporal_labels)}") 253 return True, attached_labels 254 255 except Exception as e: 256 logger.error(f"Error attaching temporal blocks: {e}") 257 return False, [] 258 259 260def detach_temporal_blocks(client: Letta, agent_id: str, labels_to_detach: Optional[List[str]] = None) -> bool: 261 """ 262 Detach temporal journal blocks from the agent after synthesis. 263 Syncs final content to ATProto before detaching. 264 265 Args: 266 client: Letta client 267 agent_id: Agent ID 268 labels_to_detach: Optional list of specific labels to detach 269 270 Returns: 271 bool: Success status 272 """ 273 try: 274 # If no specific labels provided, generate today's labels 275 if labels_to_detach is None: 276 labels_to_detach = list(get_temporal_labels(agent_id=agent_id)) 277 278 # Get current blocks and build label to ID mapping 279 current_blocks = client.agents.blocks.list(agent_id=agent_id) 280 block_label_to_id = {block.label: str(block.id) for block in current_blocks} 281 block_label_to_content = {block.label: block.value for block in current_blocks} 282 283 detached_count = 0 284 for label in labels_to_detach: 285 if label in block_label_to_id: 286 try: 287 # Sync current content to ATProto before detaching 288 content = block_label_to_content.get(label, "") 289 if content: 290 sync_temporal_block_to_atproto(label, content) 291 292 # Detach from agent 293 client.agents.blocks.detach( 294 agent_id=agent_id, 295 block_id=block_label_to_id[label] 296 ) 297 detached_count += 1 298 logger.info(f"Detached temporal block: {label}") 299 except Exception as e: 300 logger.warning(f"Failed to detach temporal block {label}: {e}") 301 else: 302 logger.debug(f"Temporal block not attached, skipping: {label}") 303 304 logger.info(f"Temporal blocks detached: {detached_count}/{len(labels_to_detach)}") 305 return detached_count > 0 306 307 except Exception as e: 308 logger.error(f"Error detaching temporal blocks: {e}") 309 return False 310 311 312def update_temporal_blocks_after_synthesis(client: Letta, agent_id: str, attached_labels: List[str]) -> bool: 313 """ 314 Update temporal blocks in ATProto after synthesis completes. 315 316 Args: 317 client: Letta client 318 agent_id: Agent ID 319 attached_labels: List of labels that were attached during synthesis 320 321 Returns: 322 bool: Success status 323 """ 324 try: 325 # Get current blocks to retrieve updated content 326 current_blocks = client.agents.blocks.list(agent_id=agent_id) 327 block_content = {block.label: block.value for block in current_blocks} 328 329 synced_count = 0 330 for label in attached_labels: 331 if label in block_content: 332 content = block_content[label] 333 if sync_temporal_block_to_atproto(label, content): 334 synced_count += 1 335 logger.debug(f"Synced updated content for {label}") 336 else: 337 logger.warning(f"Could not find content for label {label}") 338 339 logger.info(f"Synced {synced_count}/{len(attached_labels)} temporal blocks to ATProto") 340 return synced_count > 0 341 342 except Exception as e: 343 logger.error(f"Error updating temporal blocks after synthesis: {e}") 344 return False