# A digital person for Bluesky
1"""
2Temporal block management with ATProto repository synchronization.
3Manages journal blocks in both Letta memory and Bluesky's ATProto repository.
4"""
5
6import os
7import json
8import logging
9import requests
10from datetime import date
11from typing import Optional, Tuple, List, Dict, Any
12from letta_client import Letta
13from pathlib import Path
14
15logger = logging.getLogger(__name__)
16
17
def get_temporal_labels(today: Optional[date] = None, agent_id: Optional[str] = None) -> Tuple[str, str, str]:
    """
    Build the Letta block labels for the day, month and year journals.

    Args:
        today: Date the labels are built for; the current local date when omitted.
        agent_id: Agent ID used to scope the labels. When truthy, its first
            16 characters followed by an underscore are prepended to every label.

    Returns:
        Tuple of (day_label, month_label, year_label)
    """
    when = today if today is not None else date.today()

    # A missing or empty agent ID yields unscoped labels (no prefix at all).
    scope = f"{agent_id[:16]}_" if agent_id else ""
    base = f"{scope}stream_thought_journal"

    return (
        f"{base}_day_{when.strftime('%Y_%m_%d')}",
        f"{base}_month_{when.strftime('%Y_%m')}",
        f"{base}_year_{when.year}",
    )
42
43
def get_temporal_rkeys(today: Optional[date] = None) -> Tuple[str, str, str]:
    """
    Build the record keys for the day, month and year temporal records.

    Args:
        today: Date the rkeys are built for; the current local date when omitted.

    Returns:
        Tuple of (day_rkey, month_rkey, year_rkey), formatted as
        "day-YYYY-MM-DD", "month-YYYY-MM" and "year-YYYY".
    """
    when = date.today() if today is None else today

    return (
        "day-" + when.strftime('%Y-%m-%d'),
        "month-" + when.strftime('%Y-%m'),
        "year-" + str(when.year),
    )
59
60
def get_atproto_auth() -> Tuple[str, str, str]:
    """
    Load the credentials needed to talk to the ATProto repository.

    The username and PDS host are taken from the Bluesky configuration; the
    DID and access token are read from the ``session_<username>.txt`` file,
    resolved relative to the current working directory.

    Returns:
        Tuple of (access_token, did, pds_host)

    Raises:
        ValueError: If the session file does not exist, holds fewer than five
            ``:::``-separated fields, or has an empty DID or access token.
    """
    from config_loader import get_bluesky_config

    bsky_config = get_bluesky_config()
    username = bsky_config['username']
    pds_host = bsky_config['pds_uri']

    # The session file is looked up by username; its absence is reported as an error.
    session_path = Path(f"session_{username}.txt")
    if not session_path.exists():
        raise ValueError(f"No session file found for {username}. Bot must be running.")

    # Session layout: username:::did:::accessJwt:::refreshJwt:::pds_host
    fields = session_path.read_text().strip().split(':::')
    if len(fields) < 5:
        raise ValueError("Invalid session format")

    did, access_token = fields[1], fields[2]
    if not (access_token and did):
        raise ValueError("Invalid session data")

    # The PDS host returned is the configured one, not the session's fifth field.
    return access_token, did, pds_host
95
96
97
98
def _parse_temporal_label(label: str) -> Optional[Tuple[str, str]]:
    """Map a temporal block label to its ATProto (collection, rkey) pair, or None if malformed."""
    # Each entry: (granularity, number of "_"-separated date fields after the marker).
    for granularity, field_count in (("day", 3), ("month", 2), ("year", 1)):
        marker = f"stream_thought_journal_{granularity}_"
        if label.startswith(marker):
            # Unscoped label, which get_temporal_labels produces when no agent ID is given.
            suffix = label[len(marker):]
        elif f"_{marker}" in label:
            # Agent-scoped label: "<agent_prefix>_stream_thought_journal_<granularity>_<date>".
            suffix = label.split(f"_{marker}")[1]
        else:
            continue

        fields = suffix.split("_")[:field_count]
        if len(fields) < field_count or not all(fields):
            # Marker matched but the date part is incomplete (e.g. "..._day_2025_08").
            return None

        # "2025_08_23" -> "2025-08-23", "2025_08" -> "2025-08", "2025" -> "2025"
        return f"stream.thought.journal.{granularity}", "-".join(fields)

    return None


def sync_temporal_block_to_atproto(label: str, content: str) -> bool:
    """
    Sync a temporal block to ATProto repository.

    The label decides the target collection ("stream.thought.journal.day",
    ".month" or ".year") and the record key (the date with "-" separators).
    Labels are accepted both with an agent prefix and without one, matching
    both forms that get_temporal_labels can generate.

    Args:
        label: The block label (e.g., "a1b2c3d4_stream_thought_journal_day_2025_08_23"
            or "stream_thought_journal_day_2025_08_23")
        content: The block content

    Returns:
        True if successful, False otherwise. Failures (bad label, missing
        session, network or HTTP errors) are logged, never raised.
    """
    try:
        parsed = _parse_temporal_label(label)
        if parsed is None:
            logger.error(f"Invalid temporal label format: {label}")
            return False
        collection, rkey = parsed

        access_token, did, pds_host = get_atproto_auth()

        # Create the journal record - simple structure.
        # NOTE: createdAt is the current local date stamped as midnight with a "Z"
        # suffix, and is rewritten on every sync of the same record.
        journal_record = {
            "$type": collection,
            "content": content,
            "createdAt": date.today().isoformat() + "T00:00:00.000Z"
        }

        # Use putRecord to create or update with consistent rkey
        headers = {"Authorization": f"Bearer {access_token}"}
        # Strip a trailing slash so a host configured as "https://host/" does not yield "//xrpc".
        put_record_url = f"{pds_host.rstrip('/')}/xrpc/com.atproto.repo.putRecord"

        put_data = {
            "repo": did,
            "collection": collection,
            "rkey": rkey,
            "record": journal_record,
            "validate": False  # Don't validate against lexicon
        }

        response = requests.post(put_record_url, headers=headers, json=put_data, timeout=10)

        if response.status_code == 200:
            result = response.json()
            logger.info(f"Synced temporal block to ATProto: {collection}/{rkey} (CID: {result.get('cid')})")
            return True

        logger.error(f"Failed to sync temporal block {collection}/{rkey}: {response.status_code} - {response.text}")
        return False

    except Exception as e:
        # Best-effort sync: callers only look at the boolean result.
        logger.error(f"Error syncing temporal block to ATProto: {e}")
        return False
167
168
def _initial_journal_content(granularity: str, today: date) -> str:
    """Return the placeholder text for a newly created day, month or year journal block."""
    if granularity == "day":
        header = f"# Daily Journal - {today.strftime('%B %d, %Y')}"
        return f"{header}\n\nNo entries yet for today."
    if granularity == "month":
        header = f"# Monthly Journal - {today.strftime('%B %Y')}"
        return f"{header}\n\nNo entries yet for this month."
    header = f"# Yearly Journal - {today.year}"
    return f"{header}\n\nNo entries yet for this year."


def attach_temporal_blocks(client: Letta, agent_id: str) -> Tuple[bool, List[str]]:
    """
    Attach temporal journal blocks to the agent for synthesis.
    Creates blocks if they don't exist and syncs with ATProto.

    For each of today's day, month and year labels the function reuses a
    block already attached to the agent, attaches an existing block found by
    label, or creates a new block with a placeholder header and attaches it.

    Args:
        client: Letta client
        agent_id: Agent ID

    Returns:
        Tuple of (success: bool, attached_labels: list). ``success`` is False
        only when the overall operation raises (e.g. listing the agent's
        blocks fails); a failure for a single label is logged and that label
        is simply left out of ``attached_labels``.
    """
    try:
        today = date.today()

        # Generate temporal block labels with agent scoping
        day_label, month_label, year_label = get_temporal_labels(today, agent_id)

        temporal_labels = [day_label, month_label, year_label]
        attached_labels = []

        # Get current blocks attached to agent
        current_blocks = client.agents.blocks.list(agent_id=agent_id)
        current_block_labels = {block.label for block in current_blocks}
        current_block_ids = {str(block.id) for block in current_blocks}

        # Pair each label with its granularity explicitly. Labels are underscore
        # separated, so checking for "day/" or "month/" in the label never matches
        # and every new block would get the yearly header.
        for granularity, label in zip(("day", "month", "year"), temporal_labels):
            try:
                # Skip if already attached
                if label in current_block_labels:
                    logger.debug(f"Temporal block already attached: {label}")
                    attached_labels.append(label)
                    continue

                # Check if block exists globally
                blocks = client.blocks.list(label=label)

                if blocks and len(blocks) > 0:
                    block = blocks[0]
                    # Check if already attached by ID
                    if str(block.id) in current_block_ids:
                        logger.debug(f"Temporal block already attached by ID: {label}")
                        attached_labels.append(label)
                        continue
                else:
                    # Create new temporal block with appropriate header
                    initial_content = _initial_journal_content(granularity, today)

                    block = client.blocks.create(
                        label=label,
                        value=initial_content,
                        limit=10000  # Larger limit for journal blocks
                    )
                    logger.info(f"Created new temporal block: {label}")

                    # Sync new block to ATProto (best effort; the result is not checked)
                    sync_temporal_block_to_atproto(label, initial_content)

                # Attach the block
                client.agents.blocks.attach(
                    agent_id=agent_id,
                    block_id=str(block.id)
                )
                attached_labels.append(label)
                logger.info(f"Attached temporal block: {label}")

            except Exception as e:
                # A duplicate-constraint error is treated as "already attached"
                error_str = str(e)
                if "duplicate key value violates unique constraint" in error_str:
                    logger.debug(f"Temporal block already attached (constraint): {label}")
                    attached_labels.append(label)
                else:
                    logger.warning(f"Failed to attach temporal block {label}: {e}")

        logger.info(f"Temporal blocks attached: {len(attached_labels)}/{len(temporal_labels)}")
        return True, attached_labels

    except Exception as e:
        logger.error(f"Error attaching temporal blocks: {e}")
        return False, []
258
259
def detach_temporal_blocks(client: Letta, agent_id: str, labels_to_detach: Optional[List[str]] = None) -> bool:
    """
    Remove temporal journal blocks from the agent once synthesis is done.
    Each block's current content is pushed to ATProto before it is detached.

    Args:
        client: Letta client
        agent_id: Agent ID
        labels_to_detach: Labels to detach; today's day/month/year labels
            for this agent are used when omitted

    Returns:
        bool: True if at least one block was detached, False otherwise
        (including when an unexpected error aborts the operation)
    """
    try:
        # Fall back to today's labels when the caller did not name any
        if labels_to_detach is None:
            labels_to_detach = list(get_temporal_labels(agent_id=agent_id))

        # label -> (block id, block content) for everything attached right now
        attached = {
            block.label: (str(block.id), block.value)
            for block in client.agents.blocks.list(agent_id=agent_id)
        }

        detached_count = 0
        for label in labels_to_detach:
            if label not in attached:
                logger.debug(f"Temporal block not attached, skipping: {label}")
                continue

            block_id, content = attached[label]
            try:
                # Empty content is not synced; the block is still detached
                if content:
                    sync_temporal_block_to_atproto(label, content)

                client.agents.blocks.detach(agent_id=agent_id, block_id=block_id)
                detached_count += 1
                logger.info(f"Detached temporal block: {label}")
            except Exception as e:
                # One failing label does not stop the remaining ones
                logger.warning(f"Failed to detach temporal block {label}: {e}")

        logger.info(f"Temporal blocks detached: {detached_count}/{len(labels_to_detach)}")
        return detached_count > 0

    except Exception as e:
        logger.error(f"Error detaching temporal blocks: {e}")
        return False
310
311
def update_temporal_blocks_after_synthesis(client: Letta, agent_id: str, attached_labels: List[str]) -> bool:
    """
    Push the post-synthesis content of the temporal blocks to ATProto.

    Args:
        client: Letta client
        agent_id: Agent ID
        attached_labels: Labels of the blocks that were attached for synthesis

    Returns:
        bool: True if at least one block was synced, False otherwise
        (including when an unexpected error aborts the operation)
    """
    try:
        # Snapshot the agent's blocks so each label maps to its latest content
        block_content = {
            block.label: block.value
            for block in client.agents.blocks.list(agent_id=agent_id)
        }

        synced_count = 0
        for label in attached_labels:
            if label not in block_content:
                logger.warning(f"Could not find content for label {label}")
                continue

            if sync_temporal_block_to_atproto(label, block_content[label]):
                synced_count += 1
                logger.debug(f"Synced updated content for {label}")

        logger.info(f"Synced {synced_count}/{len(attached_labels)} temporal blocks to ATProto")
        return synced_count > 0

    except Exception as e:
        logger.error(f"Error updating temporal blocks after synthesis: {e}")
        return False