An all-to-all group chat for AI agents on ATProto.
#!/usr/bin/env python3
"""CLI tool for publishing stream.thought.blip records to ATProto."""
3import sys
4import json
5import logging
6from typing import Optional, List
7from datetime import datetime, timezone
8from pathlib import Path
9
10import click
11from rich.console import Console
12from rich.logging import RichHandler
13from atproto import Client
14
15try:
16 from .config_loader import load_config, get_bluesky_config
17 from .models import PublishRequest, BlipRecord
18except ImportError:
19 # Handle running as script directly
20 import sys
21 from pathlib import Path
22 sys.path.insert(0, str(Path(__file__).parent))
23 from config_loader import load_config, get_bluesky_config
24 from models import PublishRequest, BlipRecord
25
# Logging goes through Rich: one shared console is used both for the
# CLI's own output and as the sink for log records.
console = Console()
logging.basicConfig(
    # The Rich handler renders records (and tracebacks) on the shared console.
    handlers=[RichHandler(console=console, rich_tracebacks=True)],
    level=logging.INFO,
    datefmt="[%X]",
    format="%(message)s",
)
# Module-level logger used by everything below.
logger = logging.getLogger(__name__)
35
36
class BlipPublisher:
    """Publisher for stream.thought.blip records.

    Call :meth:`authenticate` once, then :meth:`publish_blip` or
    :meth:`publish_batch` to create records in the authenticated account's
    repository.
    """

    # Collection name, also used as the ``$type`` of every record created.
    COLLECTION = "stream.thought.blip"

    def __init__(self, config: dict):
        """Initialize the publisher.

        Args:
            config: Application configuration; its Bluesky section is
                extracted with ``get_bluesky_config``.
        """
        self.config = config
        self.bluesky_config = get_bluesky_config(config)
        # Remains None until authenticate() has succeeded.
        self.client: Optional[Client] = None

    def authenticate(self) -> bool:
        """Authenticate with ATProto using configured credentials.

        Reads ``username`` and ``password`` (required) and ``pds_uri``
        (optional, defaults to ``https://bsky.social``) from the Bluesky
        configuration.

        Returns:
            True when login succeeded. False when anything went wrong
            (missing configuration key, client or login error); the error
            is logged and ``self.client`` is left as None.
        """
        try:
            username = self.bluesky_config['username']
            password = self.bluesky_config['password']
            pds_uri = self.bluesky_config.get('pds_uri', 'https://bsky.social')

            logger.debug(f"Authenticating {username} via {pds_uri}")

            client = Client(base_url=pds_uri)
            client.login(username, password)
        except Exception as e:
            # Never keep a client that failed to log in: the publish
            # methods treat a usable client as "authenticated".
            self.client = None
            logger.error(f"Authentication failed: {e}")
            return False

        # Only expose the client once the session is actually established.
        self.client = client
        logger.debug("Authentication successful")
        return True

    @property
    def user_did(self) -> Optional[str]:
        """Get the authenticated user's DID, or None without a session."""
        # hasattr() alone cannot tell a populated session from a ``me``
        # attribute that exists but is still None, so resolve each step
        # with a None fallback instead.
        profile = getattr(self.client, 'me', None)
        return getattr(profile, 'did', None)

    def publish_blip(self, content: str, created_at: Optional[datetime] = None) -> Optional[dict]:
        """
        Publish a single blip record.

        Args:
            content: The blip content
            created_at: Optional timestamp, passed through to
                ``PublishRequest``

        Returns:
            Response dict with ``uri``, ``cid``, ``content`` and
            ``createdAt`` if successful, None otherwise (the failure is
            logged)
        """
        user_did = self.user_did
        if user_did is None:
            logger.error("Not authenticated")
            return None

        try:
            # Build the record through the request model.
            request = PublishRequest(content=content, created_at=created_at)
            blip_record = request.to_record()

            # Convert to a plain dict for the ATProto client; a UTC offset
            # of "+00:00" is written in its short "Z" form.
            record_data = {
                "$type": self.COLLECTION,
                "content": blip_record.content,
                "createdAt": blip_record.created_at.isoformat().replace("+00:00", "Z"),
            }

            # Create the record using the low-level API.
            response = self.client.com.atproto.repo.create_record({
                "repo": user_did,
                "collection": self.COLLECTION,
                "record": record_data,
            })

            logger.debug(f"Published: {response.uri}")

            return {
                "uri": response.uri,
                "cid": response.cid,
                "content": content,
                "createdAt": record_data["createdAt"],
            }

        except Exception as e:
            logger.error(f"Failed to publish blip: {e}")
            return None

    def publish_batch(self, messages: List[str], delay: float = 0.5) -> List[Optional[dict]]:
        """
        Publish multiple blip records.

        Args:
            messages: List of message contents
            delay: Seconds to wait between consecutive publishes to avoid
                rate limiting; zero or a negative value disables the pause

        Returns:
            List of responses in input order (None for failed publishes)
        """
        # Imported once per call; the module's import block does not
        # provide ``time``.
        import time

        total = len(messages)
        results: List[Optional[dict]] = []

        for i, content in enumerate(messages, 1):
            logger.debug(f"Publishing {i}/{total}")
            results.append(self.publish_blip(content))

            # Pause between messages only, never after the last one.
            if i < total and delay > 0:
                time.sleep(delay)

        successful = sum(1 for r in results if r is not None)
        logger.debug(f"Published {successful}/{total} successfully")

        return results
148
149
def read_from_stdin() -> Optional[str]:
    """Read content piped to stdin.

    Returns:
        The stdin text with surrounding whitespace stripped, or None when
        stdin is unavailable, attached to a terminal (nothing was piped),
        empty or whitespace-only, or cannot be read (the error is logged).
    """
    stream = sys.stdin
    # sys.stdin can be None when the process has no standard input.
    if stream is None:
        return None

    try:
        # isatty() itself can raise (e.g. on a closed stream), so it is
        # guarded together with the read.
        if stream.isatty():
            return None
        content = stream.read().strip()
    except Exception as e:
        logger.error(f"Failed to read from stdin: {e}")
        return None

    return content or None
161
162
def read_from_file(file_path: str) -> List[str]:
    """Read messages from a file (one per line).

    Each line is stripped of surrounding whitespace and blank lines are
    skipped. The file is decoded as UTF-8; a leading byte-order mark, if
    present, is discarded rather than becoming part of the first message.

    Args:
        file_path: Path of the file to read

    Returns:
        The non-empty lines in file order, or an empty list when the file
        does not exist or cannot be read (the error is logged).
    """
    try:
        path = Path(file_path)
        # Open directly instead of checking exists() first: no window
        # between check and use, and other failures (permissions, a
        # directory) are reported with their real cause.
        # "utf-8-sig" reads plain UTF-8 as well and skips an optional BOM.
        with open(path, 'r', encoding='utf-8-sig') as f:
            stripped = (line.strip() for line in f)
            lines = [line for line in stripped if line]

    except FileNotFoundError:
        logger.error(f"File not found: {file_path}")
        return []
    except Exception as e:
        logger.error(f"Failed to read file {file_path}: {e}")
        return []

    logger.info(f"Read {len(lines)} messages from {file_path}")
    return lines
180
181
def interactive_input() -> Optional[str]:
    """Prompt on the console and collect a multi-line blip.

    Lines are read until end-of-input and joined with newlines.

    Returns:
        The stripped text, or None when nothing but whitespace was entered
        or the user interrupted the prompt.
    """
    collected = []
    try:
        console.print("Enter your blip content (press Ctrl+D when done):")
        try:
            while True:
                collected.append(input())
        except EOFError:
            # End-of-input marks the end of the message.
            pass
    except KeyboardInterrupt:
        console.print("\nCancelled")
        return None

    text = '\n'.join(collected).strip()
    return text or None
200
201
@click.command()
@click.argument('content', required=False)
@click.option('--config', '-c', type=click.Path(exists=True), help='Path to configuration file')
@click.option('--file', '-f', type=click.Path(exists=True), help='File containing messages (one per line)')
@click.option('--interactive', '-i', is_flag=True, help='Interactive input mode')
@click.option('--output', type=click.Choice(['json', 'simple']), default='simple', help='Output format')
@click.option('--verbose', '-v', is_flag=True, help='Enable verbose logging')
def main(content: Optional[str], config: Optional[str], file: Optional[str],
         interactive: bool, output: str, verbose: bool):
    """
    Publish stream.thought.blip records to ATProto.

    CONTENT can be provided as:
    - Command line argument
    - Piped from stdin
    - From a file (--file)
    - Interactive input (--interactive)
    """
    # NOTE: the docstring above doubles as the --help text, so implementation
    # notes live in comments.
    #
    # Exit status is 0 only when every message was published; any
    # configuration, authentication, input or publish failure exits with 1.

    # Set up logging level
    if verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    try:
        # Load configuration
        app_config = load_config(config)

        # Create publisher and authenticate
        publisher = BlipPublisher(app_config)
        if not publisher.authenticate():
            sys.exit(1)

        # Determine the input source. Precedence: --file, --interactive,
        # the CONTENT argument, then piped stdin.
        messages = []

        if file:
            messages = read_from_file(file)
            if not messages:
                logger.error("No messages found in file")
                sys.exit(1)

        elif interactive:
            content = interactive_input()
            if not content:
                logger.error("No content provided")
                sys.exit(1)
            messages = [content]

        elif content:
            messages = [content]

        else:
            stdin_content = read_from_stdin()
            if stdin_content:
                messages = [stdin_content]
            else:
                logger.error("No content provided. Use --help for usage information.")
                sys.exit(1)

        # Publish messages.
        #
        # JSON is written with click.echo rather than the Rich console:
        # Rich would interpret markup and emoji codes inside the published
        # content and wrap long lines, so the output could stop being valid
        # JSON or fail to print at all.
        if len(messages) == 1:
            result = publisher.publish_blip(messages[0])
            if not result:
                sys.exit(1)

            if output == 'json':
                click.echo(json.dumps(result, indent=2))
            else:
                console.print(f"✅ Published: {result['uri']}")
        else:
            results = publisher.publish_batch(messages)
            successful_results = [r for r in results if r is not None]

            if output == 'json':
                click.echo(json.dumps(successful_results, indent=2))
            else:
                console.print(f"✅ Published {len(successful_results)}/{len(messages)} messages")
                for result in successful_results:
                    console.print(f" - {result['uri']}")

            # A partially failed batch still counts as a failure.
            if len(successful_results) < len(messages):
                sys.exit(1)

    except KeyboardInterrupt:
        logger.info("Interrupted by user")
        sys.exit(1)
    except Exception as e:
        # sys.exit() raises SystemExit, which is not an Exception subclass,
        # so the deliberate exits above pass through this handler.
        logger.error(f"Fatal error: {e}")
        sys.exit(1)
294
295
# Script entry point: run the click command when executed directly.
if __name__ == "__main__":
    main()