A digital person for Bluesky
42
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add agent listing utility and X profile analysis tool

- Add list_agents.py for querying Letta agents with filtering capabilities
- Add tools/x_profile.py for analyzing X user profiles using Bluesky agent
- Update bsky.py with minor improvements to queue processing

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

+235 -57
+69 -57
bsky.py
··· 73 74 75 # Create a client with extended timeout for LLM operations 76 CLIENT= Letta( 77 - token=os.environ["LETTA_API_KEY"], 78 - timeout=600 # 10 minutes timeout for API calls - higher than Cloudflare's 524 timeout 79 ) 80 81 # Use the "Bluesky" project 82 - PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8" 83 84 # Notification check delay 85 FETCH_NOTIFICATIONS_DELAY_SEC = 10 # Check every 10 seconds for faster response ··· 157 logger.error(f"Failed to export agent: {e}") 158 159 def initialize_void(): 160 - logger.info("Starting void agent initialization...") 161 162 - # Get the configured void agent by ID 163 - logger.info("Loading void agent from config...") 164 - from config_loader import get_letta_config 165 - letta_config = get_letta_config() 166 agent_id = letta_config['agent_id'] 167 168 try: 169 - void_agent = CLIENT.agents.retrieve(agent_id=agent_id) 170 - logger.info(f"Successfully loaded void agent: {void_agent.name} ({agent_id})") 171 except Exception as e: 172 - logger.error(f"Failed to load void agent {agent_id}: {e}") 173 logger.error("Please ensure the agent_id in config.yaml is correct") 174 raise e 175 176 # Export agent state 177 logger.info("Exporting agent state...") 178 - export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 179 180 # Log agent details 181 - logger.info(f"Void agent details - ID: {void_agent.id}") 182 - logger.info(f"Agent name: {void_agent.name}") 183 - if hasattr(void_agent, 'llm_config'): 184 - logger.info(f"Agent model: {void_agent.llm_config.model}") 185 - logger.info(f"Agent project_id: {void_agent.project_id}") 186 - if hasattr(void_agent, 'tools'): 187 - logger.info(f"Agent has {len(void_agent.tools)} tools") 188 - for tool in void_agent.tools[:3]: # Show first 3 tools 189 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})") 190 191 - return void_agent 192 193 194 - def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False): 195 """Process a 
mention and generate a reply using the Letta agent. 196 197 Args: 198 - void_agent: The Letta agent instance 199 atproto_client: The AT Protocol client 200 notification_data: The notification data dictionary 201 queue_filepath: Optional Path object to the queue file (for cleanup on halt) ··· 325 326 try: 327 # Check for known bots in thread 328 - bot_check_result = check_known_bots(unique_handles, void_agent) 329 bot_check_data = json.loads(bot_check_result) 330 331 if bot_check_data.get("bot_detected", False): ··· 351 if unique_handles: 352 try: 353 logger.debug(f"Attaching user blocks for handles: {unique_handles}") 354 - attach_result = attach_user_blocks(unique_handles, void_agent) 355 attached_handles = unique_handles # Track successfully attached handles 356 logger.debug(f"Attach result: {attach_result}") 357 except Exception as attach_error: ··· 378 try: 379 # Use streaming to avoid 524 timeout errors 380 message_stream = CLIENT.agents.messages.create_stream( 381 - agent_id=void_agent.id, 382 messages=[{"role": "user", "content": prompt}], 383 stream_tokens=False, # Step streaming only (faster than token streaming) 384 max_steps=100 ··· 666 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 667 logger.error("Update the agent's tools using register_tools.py") 668 # Export agent state before terminating 669 - export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 670 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 671 exit(1) 672 ··· 711 save_processed_notifications(processed_uris) 712 713 # Export agent state before terminating 714 - export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 715 716 # Exit the program 717 logger.info("=== BOT TERMINATED BY AGENT ===") ··· 724 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 725 logger.error("Update the agent's tools using register_tools.py") 726 # Export agent state before terminating 727 - export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 728 
logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 729 exit(1) 730 ··· 871 if 'attached_handles' in locals() and attached_handles: 872 try: 873 logger.info(f"Detaching user blocks for handles: {attached_handles}") 874 - detach_result = detach_user_blocks(attached_handles, void_agent) 875 logger.debug(f"Detach result: {detach_result}") 876 except Exception as detach_error: 877 logger.warning(f"Failed to detach user blocks: {detach_error}") ··· 999 return False 1000 1001 1002 - def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False): 1003 """Load and process all notifications from the queue in priority order.""" 1004 try: 1005 # Get all JSON files in queue directory (excluding processed_notifications.json) ··· 1078 # Process based on type using dict data directly 1079 success = False 1080 if notif_data['reason'] == "mention": 1081 - success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1082 if success: 1083 message_counters['mentions'] += 1 1084 elif notif_data['reason'] == "reply": 1085 - success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1086 if success: 1087 message_counters['replies'] += 1 1088 elif notif_data['reason'] == "follow": 1089 author_handle = notif_data['author']['handle'] 1090 author_display_name = notif_data['author'].get('display_name', 'no display name') 1091 follow_update = f"@{author_handle} ({author_display_name}) started following you." 
··· 1095 try: 1096 # Use streaming to match other notification processing 1097 message_stream = CLIENT.agents.messages.create_stream( 1098 - agent_id=void_agent.id, 1099 messages=[{"role": "user", "content": follow_message}], 1100 stream_tokens=False, 1101 max_steps=50 # Fewer steps needed for simple follow updates ··· 1288 return 0 1289 1290 1291 - def process_notifications(void_agent, atproto_client, testing_mode=False): 1292 """Fetch new notifications, queue them, and process the queue.""" 1293 try: 1294 # Fetch and queue new notifications ··· 1298 logger.info(f"Found {new_count} new notifications to process") 1299 1300 # Now process the entire queue (old + new notifications) 1301 - load_and_process_queued_notifications(void_agent, atproto_client, testing_mode) 1302 1303 except Exception as e: 1304 logger.error(f"Error processing notifications: {e}") ··· 1330 synthesis_prompt = f"""Time for synthesis and reflection. 1331 1332 You have access to temporal journal blocks for recording your thoughts and experiences: 1333 - - void_day_{today.strftime('%Y_%m_%d')}: Today's journal ({today.strftime('%B %d, %Y')}) 1334 - - void_month_{today.strftime('%Y_%m')}: This month's journal ({today.strftime('%B %Y')}) 1335 - - void_year_{today.year}: This year's journal ({today.year}) 1336 1337 These journal blocks are attached temporarily for this synthesis session. Use them to: 1338 1. Record significant interactions and insights from recent experiences ··· 1343 1344 The journal entries should be cumulative - add to existing content rather than replacing it. 1345 Consider both immediate experiences (daily) and longer-term patterns (monthly/yearly). 1346 - 1347 - After recording in your journals, synthesize your recent experiences into your core memory blocks 1348 - (zeitgeist, void-persona, void-humans) as you normally would. 
1349 1350 Begin your synthesis and journaling now.""" 1351 ··· 1652 1653 def main(): 1654 # Parse command line arguments 1655 - parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent') 1656 parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)') 1657 parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state') 1658 parser.add_argument('--simple-logs', action='store_true', help='Use simplified log format (void - LEVEL - message)') ··· 1665 1666 # Configure logging based on command line arguments 1667 if args.simple_logs: 1668 - log_format = "void - %(levelname)s - %(message)s" 1669 else: 1670 # Create custom formatter with symbols 1671 class SymbolFormatter(logging.Formatter): ··· 1710 logging.root.addHandler(handler) 1711 1712 global logger, prompt_logger, console 1713 - logger = logging.getLogger("void_bot") 1714 logger.setLevel(logging.INFO) 1715 1716 # Create a separate logger for prompts (set to WARNING to hide by default) 1717 - prompt_logger = logging.getLogger("void_bot.prompts") 1718 if args.reasoning: 1719 prompt_logger.setLevel(logging.INFO) # Show reasoning when --reasoning is used 1720 else: ··· 1756 """Main bot loop that continuously monitors for notifications.""" 1757 global start_time 1758 start_time = time.time() 1759 - logger.info("=== STARTING VOID BOT ===") 1760 - void_agent = initialize_void() 1761 - logger.info(f"Void agent initialized: {void_agent.id}") 1762 1763 # Ensure correct tools are attached for Bluesky 1764 logger.info("Configuring tools for Bluesky platform...") 1765 try: 1766 from tool_manager import ensure_platform_tools 1767 - ensure_platform_tools('bluesky', void_agent.id) 1768 except Exception as e: 1769 logger.error(f"Failed to configure platform tools: {e}") 1770 logger.warning("Continuing with existing tool configuration") 1771 1772 # Check if agent has required tools 1773 - if 
hasattr(void_agent, 'tools') and void_agent.tools: 1774 - tool_names = [tool.name for tool in void_agent.tools] 1775 # Check for bluesky-related tools 1776 bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()] 1777 if not bluesky_tools: ··· 1781 1782 # Clean up all user blocks at startup 1783 logger.info("🧹 Cleaning up user blocks at startup...") 1784 - periodic_user_block_cleanup(CLIENT, void_agent.id) 1785 1786 # Initialize Bluesky client (needed for both notification processing and synthesis acks/posts) 1787 if not SYNTHESIS_ONLY: ··· 1812 try: 1813 # Send synthesis message immediately on first run 1814 logger.info("🧠 Sending synthesis message") 1815 - send_synthesis_message(CLIENT, void_agent.id, atproto_client) 1816 1817 # Wait for next interval 1818 logger.info(f"Waiting {SYNTHESIS_INTERVAL} seconds until next synthesis...") ··· 1844 while True: 1845 try: 1846 cycle_count += 1 1847 - process_notifications(void_agent, atproto_client, TESTING_MODE) 1848 1849 # Check if synthesis interval has passed 1850 if SYNTHESIS_INTERVAL > 0: ··· 1852 global last_synthesis_time 1853 if current_time - last_synthesis_time >= SYNTHESIS_INTERVAL: 1854 logger.info(f"⏰ {SYNTHESIS_INTERVAL/60:.1f} minutes have passed, triggering synthesis") 1855 - send_synthesis_message(CLIENT, void_agent.id, atproto_client) 1856 last_synthesis_time = current_time 1857 1858 # Run periodic cleanup every N cycles 1859 if CLEANUP_INTERVAL > 0 and cycle_count % CLEANUP_INTERVAL == 0: 1860 logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})") 1861 - periodic_user_block_cleanup(CLIENT, void_agent.id) 1862 1863 # Log cycle completion with stats 1864 elapsed_time = time.time() - start_time
··· 73 74 75 # Create a client with extended timeout for LLM operations 76 + from config_loader import get_letta_config 77 + letta_config = get_letta_config() 78 CLIENT= Letta( 79 + token=letta_config['api_key'], 80 + timeout=letta_config['timeout'] # 10 minutes timeout for API calls - higher than Cloudflare's 524 timeout 81 ) 82 83 # Use the "Bluesky" project 84 + PROJECT_ID = letta_config['project_id'] 85 86 # Notification check delay 87 FETCH_NOTIFICATIONS_DELAY_SEC = 10 # Check every 10 seconds for faster response ··· 159 logger.error(f"Failed to export agent: {e}") 160 161 def initialize_void(): 162 + logger.info("Starting agent initialization...") 163 164 + # Get the configured agent by ID 165 + logger.info("Loading agent from config...") 166 agent_id = letta_config['agent_id'] 167 168 try: 169 + agent = CLIENT.agents.retrieve(agent_id=agent_id) 170 + logger.info(f"Successfully loaded agent: {agent.name} ({agent_id})") 171 except Exception as e: 172 + logger.error(f"Failed to load agent {agent_id}: {e}") 173 logger.error("Please ensure the agent_id in config.yaml is correct") 174 raise e 175 176 # Export agent state 177 logger.info("Exporting agent state...") 178 + export_agent_state(CLIENT, agent, skip_git=SKIP_GIT) 179 180 # Log agent details 181 + logger.info(f"Agent details - ID: {agent.id}") 182 + logger.info(f"Agent name: {agent.name}") 183 + if hasattr(agent, 'llm_config'): 184 + logger.info(f"Agent model: {agent.llm_config.model}") 185 + logger.info(f"Agent project_id: {agent.project_id}") 186 + if hasattr(agent, 'tools'): 187 + logger.info(f"Agent has {len(agent.tools)} tools") 188 + for tool in agent.tools[:3]: # Show first 3 tools 189 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})") 190 191 + return agent 192 193 194 + def process_mention(agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False): 195 """Process a mention and generate a reply using the Letta agent. 
196 197 Args: 198 + agent: The Letta agent instance 199 atproto_client: The AT Protocol client 200 notification_data: The notification data dictionary 201 queue_filepath: Optional Path object to the queue file (for cleanup on halt) ··· 325 326 try: 327 # Check for known bots in thread 328 + bot_check_result = check_known_bots(unique_handles, agent) 329 bot_check_data = json.loads(bot_check_result) 330 331 if bot_check_data.get("bot_detected", False): ··· 351 if unique_handles: 352 try: 353 logger.debug(f"Attaching user blocks for handles: {unique_handles}") 354 + attach_result = attach_user_blocks(unique_handles, agent) 355 attached_handles = unique_handles # Track successfully attached handles 356 logger.debug(f"Attach result: {attach_result}") 357 except Exception as attach_error: ··· 378 try: 379 # Use streaming to avoid 524 timeout errors 380 message_stream = CLIENT.agents.messages.create_stream( 381 + agent_id=agent.id, 382 messages=[{"role": "user", "content": prompt}], 383 stream_tokens=False, # Step streaming only (faster than token streaming) 384 max_steps=100 ··· 666 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 667 logger.error("Update the agent's tools using register_tools.py") 668 # Export agent state before terminating 669 + export_agent_state(CLIENT, agent, skip_git=SKIP_GIT) 670 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 671 exit(1) 672 ··· 711 save_processed_notifications(processed_uris) 712 713 # Export agent state before terminating 714 + export_agent_state(CLIENT, agent, skip_git=SKIP_GIT) 715 716 # Exit the program 717 logger.info("=== BOT TERMINATED BY AGENT ===") ··· 724 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 725 logger.error("Update the agent's tools using register_tools.py") 726 # Export agent state before terminating 727 + export_agent_state(CLIENT, agent, skip_git=SKIP_GIT) 728 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 729 exit(1) 730 ··· 871 
if 'attached_handles' in locals() and attached_handles: 872 try: 873 logger.info(f"Detaching user blocks for handles: {attached_handles}") 874 + detach_result = detach_user_blocks(attached_handles, agent) 875 logger.debug(f"Detach result: {detach_result}") 876 except Exception as detach_error: 877 logger.warning(f"Failed to detach user blocks: {detach_error}") ··· 999 return False 1000 1001 1002 + def load_and_process_queued_notifications(agent, atproto_client, testing_mode=False): 1003 """Load and process all notifications from the queue in priority order.""" 1004 try: 1005 # Get all JSON files in queue directory (excluding processed_notifications.json) ··· 1078 # Process based on type using dict data directly 1079 success = False 1080 if notif_data['reason'] == "mention": 1081 + success = process_mention(agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1082 if success: 1083 message_counters['mentions'] += 1 1084 elif notif_data['reason'] == "reply": 1085 + success = process_mention(agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1086 if success: 1087 message_counters['replies'] += 1 1088 elif notif_data['reason'] == "follow": 1089 + # Skip 1090 + logging.info("Skipping new follower notification, currently disabled") 1091 + 1092 + 1093 author_handle = notif_data['author']['handle'] 1094 author_display_name = notif_data['author'].get('display_name', 'no display name') 1095 follow_update = f"@{author_handle} ({author_display_name}) started following you." 
··· 1099 try: 1100 # Use streaming to match other notification processing 1101 message_stream = CLIENT.agents.messages.create_stream( 1102 + agent_id=agent.id, 1103 messages=[{"role": "user", "content": follow_message}], 1104 stream_tokens=False, 1105 max_steps=50 # Fewer steps needed for simple follow updates ··· 1292 return 0 1293 1294 1295 + def process_notifications(agent, atproto_client, testing_mode=False): 1296 """Fetch new notifications, queue them, and process the queue.""" 1297 try: 1298 # Fetch and queue new notifications ··· 1302 logger.info(f"Found {new_count} new notifications to process") 1303 1304 # Now process the entire queue (old + new notifications) 1305 + load_and_process_queued_notifications(agent, atproto_client, testing_mode) 1306 1307 except Exception as e: 1308 logger.error(f"Error processing notifications: {e}") ··· 1334 synthesis_prompt = f"""Time for synthesis and reflection. 1335 1336 You have access to temporal journal blocks for recording your thoughts and experiences: 1337 + - comind_day_{today.strftime('%Y_%m_%d')}: Today's journal ({today.strftime('%B %d, %Y')}) 1338 + - comind_month_{today.strftime('%Y_%m')}: This month's journal ({today.strftime('%B %Y')}) 1339 + - comind_year_{today.year}: This year's journal ({today.year}) 1340 1341 These journal blocks are attached temporarily for this synthesis session. Use them to: 1342 1. Record significant interactions and insights from recent experiences ··· 1347 1348 The journal entries should be cumulative - add to existing content rather than replacing it. 1349 Consider both immediate experiences (daily) and longer-term patterns (monthly/yearly). 
1350 1351 Begin your synthesis and journaling now.""" 1352 ··· 1653 1654 def main(): 1655 # Parse command line arguments 1656 + parser = argparse.ArgumentParser(description='Comind - Bluesky autonomous agent') 1657 parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)') 1658 parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state') 1659 parser.add_argument('--simple-logs', action='store_true', help='Use simplified log format (void - LEVEL - message)') ··· 1666 1667 # Configure logging based on command line arguments 1668 if args.simple_logs: 1669 + log_format = "comind - %(levelname)s - %(message)s" 1670 else: 1671 # Create custom formatter with symbols 1672 class SymbolFormatter(logging.Formatter): ··· 1711 logging.root.addHandler(handler) 1712 1713 global logger, prompt_logger, console 1714 + logger = logging.getLogger("comind_bot") 1715 logger.setLevel(logging.INFO) 1716 1717 # Create a separate logger for prompts (set to WARNING to hide by default) 1718 + prompt_logger = logging.getLogger("comind_bot.prompts") 1719 if args.reasoning: 1720 prompt_logger.setLevel(logging.INFO) # Show reasoning when --reasoning is used 1721 else: ··· 1757 """Main bot loop that continuously monitors for notifications.""" 1758 global start_time 1759 start_time = time.time() 1760 + logger.info(""" 1761 + ███ █████ 1762 + ░░░ ░░███ 1763 + ██████ ██████ █████████████ ████ ████████ ███████ 1764 + ███░░███ ███░░███░░███░░███░░███ ░░███ ░░███░░███ ███░░███ 1765 + ░███ ░░░ ░███ ░███ ░███ ░███ ░███ ░███ ░███ ░███ ░███ ░███ 1766 + ░███ ███░███ ░███ ░███ ░███ ░███ ░███ ░███ ░███ ░███ ░███ 1767 + ░░██████ ░░██████ █████░███ █████ █████ ████ █████░░████████ 1768 + ░░░░░░ ░░░░░░ ░░░░░ ░░░ ░░░░░ ░░░░░ ░░░░ ░░░░░ ░░░░░░░░ 1769 + 1770 + 1771 + """) 1772 + agent = initialize_void() 1773 + logger.info(f"Agent initialized: {agent.id}") 1774 1775 # Ensure correct tools are attached for 
Bluesky 1776 logger.info("Configuring tools for Bluesky platform...") 1777 try: 1778 from tool_manager import ensure_platform_tools 1779 + ensure_platform_tools('bluesky', agent.id) 1780 except Exception as e: 1781 logger.error(f"Failed to configure platform tools: {e}") 1782 logger.warning("Continuing with existing tool configuration") 1783 1784 # Check if agent has required tools 1785 + if hasattr(agent, 'tools') and agent.tools: 1786 + tool_names = [tool.name for tool in agent.tools] 1787 # Check for bluesky-related tools 1788 bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()] 1789 if not bluesky_tools: ··· 1793 1794 # Clean up all user blocks at startup 1795 logger.info("🧹 Cleaning up user blocks at startup...") 1796 + periodic_user_block_cleanup(CLIENT, agent.id) 1797 1798 # Initialize Bluesky client (needed for both notification processing and synthesis acks/posts) 1799 if not SYNTHESIS_ONLY: ··· 1824 try: 1825 # Send synthesis message immediately on first run 1826 logger.info("🧠 Sending synthesis message") 1827 + send_synthesis_message(CLIENT, agent.id, atproto_client) 1828 1829 # Wait for next interval 1830 logger.info(f"Waiting {SYNTHESIS_INTERVAL} seconds until next synthesis...") ··· 1856 while True: 1857 try: 1858 cycle_count += 1 1859 + process_notifications(agent, atproto_client, TESTING_MODE) 1860 1861 # Check if synthesis interval has passed 1862 if SYNTHESIS_INTERVAL > 0: ··· 1864 global last_synthesis_time 1865 if current_time - last_synthesis_time >= SYNTHESIS_INTERVAL: 1866 logger.info(f"⏰ {SYNTHESIS_INTERVAL/60:.1f} minutes have passed, triggering synthesis") 1867 + send_synthesis_message(CLIENT, agent.id, atproto_client) 1868 last_synthesis_time = current_time 1869 1870 # Run periodic cleanup every N cycles 1871 if CLEANUP_INTERVAL > 0 and cycle_count % CLEANUP_INTERVAL == 0: 1872 logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})") 1873 + 
periodic_user_block_cleanup(CLIENT, agent.id) 1874 1875 # Log cycle completion with stats 1876 elapsed_time = time.time() - start_time
+32
list_agents.py
···
··· 1 + #!/usr/bin/env python 2 + """List all available agents in the Letta project.""" 3 + 4 + from letta_client import Letta 5 + from config_loader import get_letta_config 6 + 7 + # Get configuration 8 + letta_config = get_letta_config() 9 + 10 + # Create client 11 + client = Letta( 12 + token=letta_config['api_key'], 13 + timeout=letta_config['timeout'] 14 + ) 15 + 16 + # List all agents 17 + print("Available agents:") 18 + print("-" * 50) 19 + 20 + try: 21 + agents = client.agents.list() 22 + if not agents: 23 + print("No agents found in this project.") 24 + else: 25 + for agent in agents: 26 + print(f"Name: {agent.name}") 27 + print(f"ID: {agent.id}") 28 + if hasattr(agent, 'description'): 29 + print(f"Description: {agent.description}") 30 + print("-" * 50) 31 + except Exception as e: 32 + print(f"Error listing agents: {e}")
+134
tools/x_profile.py
···
··· 1 + """Profile search tool for X (Twitter) users.""" 2 + from pydantic import BaseModel, Field 3 + from typing import Optional 4 + 5 + 6 + class SearchXProfileArgs(BaseModel): 7 + username: str = Field(..., description="X username to look up (without @)") 8 + 9 + 10 + def search_x_profile(username: str) -> str: 11 + """ 12 + Look up detailed profile information for an X (Twitter) user. 13 + 14 + Args: 15 + username: X username to look up (without @) 16 + 17 + Returns: 18 + YAML-formatted profile information including bio, metrics, verification status, etc. 19 + """ 20 + import os 21 + import yaml 22 + import requests 23 + from datetime import datetime 24 + 25 + try: 26 + # Validate username (remove @ if present) 27 + username = username.lstrip('@') 28 + if not username: 29 + raise Exception("Username cannot be empty") 30 + 31 + # Get credentials from environment 32 + consumer_key = os.getenv("X_CONSUMER_KEY") 33 + consumer_secret = os.getenv("X_CONSUMER_SECRET") 34 + access_token = os.getenv("X_ACCESS_TOKEN") 35 + access_token_secret = os.getenv("X_ACCESS_TOKEN_SECRET") 36 + 37 + # Also check for bearer token as fallback 38 + bearer_token = os.getenv("X_BEARER_TOKEN") 39 + 40 + if not any([bearer_token, (consumer_key and consumer_secret and access_token and access_token_secret)]): 41 + raise Exception("X API credentials not found in environment variables") 42 + 43 + # Set up authentication headers 44 + base_url = "https://api.x.com/2" 45 + if bearer_token: 46 + headers = { 47 + "Authorization": f"Bearer {bearer_token}", 48 + "Content-Type": "application/json" 49 + } 50 + else: 51 + # For OAuth 1.0a, we'd need requests_oauthlib 52 + # Since this is a cloud function, we'll require bearer token for simplicity 53 + raise Exception("Bearer token required for X API authentication in cloud environment") 54 + 55 + # Get user profile information 56 + user_lookup_url = f"{base_url}/users/by/username/{username}" 57 + user_params = { 58 + "user.fields": ",".join([ 59 + 
"id", 60 + "name", 61 + "username", 62 + "description", 63 + "location", 64 + "url", 65 + "created_at", 66 + "verified", 67 + "verified_type", 68 + "public_metrics", 69 + "profile_image_url", 70 + "profile_banner_url", 71 + "protected" 72 + ]) 73 + } 74 + 75 + try: 76 + response = requests.get(user_lookup_url, headers=headers, params=user_params, timeout=10) 77 + response.raise_for_status() 78 + data = response.json() 79 + 80 + if "data" not in data: 81 + raise Exception(f"User @{username} not found") 82 + 83 + user = data["data"] 84 + 85 + except requests.exceptions.HTTPError as e: 86 + if response.status_code == 404: 87 + raise Exception(f"User @{username} not found") 88 + elif response.status_code == 429: 89 + raise Exception("X API rate limit exceeded. Please try again later.") 90 + else: 91 + raise Exception(f"Failed to look up user @{username}: {str(e)}") 92 + 93 + # Format the profile data 94 + profile_data = { 95 + "x_user_profile": { 96 + "basic_info": { 97 + "id": user.get("id"), 98 + "username": user.get("username"), 99 + "display_name": user.get("name"), 100 + "description": user.get("description", ""), 101 + "location": user.get("location", ""), 102 + "website": user.get("url", ""), 103 + "created_at": user.get("created_at", ""), 104 + "profile_url": f"https://x.com/{username}" 105 + }, 106 + "verification": { 107 + "verified": user.get("verified", False), 108 + "verified_type": user.get("verified_type", ""), 109 + }, 110 + "privacy": { 111 + "protected": user.get("protected", False) 112 + }, 113 + "media": { 114 + "profile_image_url": user.get("profile_image_url", ""), 115 + "profile_banner_url": user.get("profile_banner_url", "") 116 + } 117 + } 118 + } 119 + 120 + # Add public metrics if available 121 + if "public_metrics" in user: 122 + metrics = user["public_metrics"] 123 + profile_data["x_user_profile"]["metrics"] = { 124 + "followers_count": metrics.get("followers_count", 0), 125 + "following_count": metrics.get("following_count", 0), 126 + 
"tweet_count": metrics.get("tweet_count", 0), 127 + "listed_count": metrics.get("listed_count", 0), 128 + "like_count": metrics.get("like_count", 0) 129 + } 130 + 131 + return yaml.dump(profile_data, default_flow_style=False, sort_keys=False) 132 + 133 + except Exception as e: 134 + raise Exception(f"Error searching X profile: {str(e)}")