# A community based topic aggregation platform built on atproto
1"""
2Main Orchestration Script for Reddit Highlights Aggregator.
3
4Coordinates all components to:
51. Apply anti-detection jitter delay
62. Fetch Reddit RSS feeds
73. Extract streamable video links
84. Deduplicate via state tracking
95. Post to Coves communities
10
11Designed to run via CRON (single execution, then exit).
12"""
import hashlib
import logging
import os
import random
import re
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Optional

from src.config import ConfigLoader
from src.rss_fetcher import RSSFetcher
from src.link_extractor import LinkExtractor
from src.state_manager import StateManager
from src.coves_client import CovesClient
from src.models import RedditPost
29
30# Setup logging
31logging.basicConfig(
32 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
33)
34logger = logging.getLogger(__name__)
35
36# Reddit RSS URL template
37REDDIT_RSS_URL = "https://www.reddit.com/r/{subreddit}/.rss"
38
39# Anti-detection jitter range (0-10 minutes in seconds)
40JITTER_MIN_SECONDS = 0
41JITTER_MAX_SECONDS = 600
42
43
class Aggregator:
    """
    Main aggregator orchestration.

    Coordinates all components to fetch, filter, and post video highlights.
    Construct once, then call run() for a single CRON-style execution.
    """

    # Subreddit names may only contain alphanumerics, underscores, and
    # hyphens (prevents URL injection); compiled once at class creation
    # instead of re-compiling per subreddit.
    _SUBREDDIT_NAME_RE = re.compile(r'^[a-zA-Z0-9_-]+$')

    def __init__(
        self,
        config_path: Path,
        state_file: Path,
        coves_client: Optional[CovesClient] = None,
        skip_jitter: bool = False,
    ):
        """
        Initialize aggregator.

        Args:
            config_path: Path to config.yaml
            state_file: Path to state.json
            coves_client: Optional CovesClient (for testing)
            skip_jitter: Skip anti-detection delay (for testing)

        Raises:
            ValueError: If no client is supplied and COVES_API_KEY is unset.
        """
        self.skip_jitter = skip_jitter

        # Load configuration
        logger.info("Loading configuration...")
        config_loader = ConfigLoader(config_path)
        self.config = config_loader.load()

        # Initialize components
        logger.info("Initializing components...")
        self.rss_fetcher = RSSFetcher()
        self.link_extractor = LinkExtractor(
            allowed_domains=self.config.allowed_domains
        )
        self.state_manager = StateManager(state_file)
        # Kept as a public attribute for introspection/testing.
        self.state_file = state_file

        # Use the injected client when provided (testing); otherwise build
        # one from the environment-supplied API key.
        if coves_client:
            self.coves_client = coves_client
        else:
            api_key = os.getenv("COVES_API_KEY")
            if not api_key:
                raise ValueError("COVES_API_KEY environment variable required")

            self.coves_client = CovesClient(
                api_url=self.config.coves_api_url, api_key=api_key
            )

    def run(self) -> None:
        """
        Run aggregator: apply jitter, fetch, filter, post, and update state.

        This is the main entry point for CRON execution.

        Raises:
            RuntimeError: If authentication with the Coves API fails.
        """
        logger.info("=" * 60)
        logger.info("Starting Reddit Highlights Aggregator")
        logger.info("=" * 60)

        # Anti-detection jitter: random delay before starting so runs do
        # not hit Reddit at the exact same second every CRON interval.
        if not self.skip_jitter:
            jitter_seconds = random.uniform(JITTER_MIN_SECONDS, JITTER_MAX_SECONDS)
            logger.info(
                f"Applying anti-detection jitter: sleeping for {jitter_seconds:.1f} seconds "
                f"({jitter_seconds/60:.1f} minutes)"
            )
            time.sleep(jitter_seconds)

        # Get enabled subreddits only
        enabled_subreddits = [s for s in self.config.subreddits if s.enabled]
        logger.info(f"Processing {len(enabled_subreddits)} enabled subreddits")

        # Authenticate once at the start; nothing can be posted without it,
        # so an auth failure aborts the entire run.
        try:
            self.coves_client.authenticate()
        except Exception as e:
            logger.error(f"Failed to authenticate: {e}")
            logger.error("Cannot continue without authentication")
            raise RuntimeError("Authentication failed") from e

        # Process each subreddit independently: one subreddit's failure
        # must not stop the rest of the run.
        for subreddit_config in enabled_subreddits:
            try:
                self._process_subreddit(subreddit_config)
            except KeyboardInterrupt:
                # Re-raise interrupt signals - don't suppress user abort
                logger.info("Received interrupt signal, stopping...")
                raise
            except Exception as e:
                # Log error but continue with other subreddits
                logger.error(
                    f"Error processing subreddit '{subreddit_config.name}': {e}",
                    exc_info=True,
                )
                continue

        logger.info("=" * 60)
        logger.info("Aggregator run completed")
        logger.info("=" * 60)

    def _process_subreddit(self, subreddit_config) -> None:
        """
        Process a single subreddit: fetch its RSS feed, extract streamable
        links, deduplicate against saved state, and post new items to Coves.

        Args:
            subreddit_config: SubredditConfig object

        Raises:
            ValueError: If the subreddit name contains disallowed characters.
            Exception: Propagates feed-fetch failures to the caller.
        """
        subreddit_name = subreddit_config.name
        community_handle = subreddit_config.community_handle

        # Sanitize subreddit name to prevent URL injection
        # Only allow alphanumeric, underscores, and hyphens
        if not self._SUBREDDIT_NAME_RE.match(subreddit_name):
            raise ValueError(f"Invalid subreddit name: {subreddit_name}")

        logger.info(f"Processing subreddit: r/{subreddit_name} -> {community_handle}")

        # Build RSS URL
        rss_url = REDDIT_RSS_URL.format(subreddit=subreddit_name)

        # Fetch RSS feed; a failure aborts this subreddit only (the caller
        # logs it and moves on to the next one).
        try:
            feed = self.rss_fetcher.fetch_feed(rss_url)
        except Exception as e:
            logger.error(f"Failed to fetch feed for r/{subreddit_name}: {e}")
            raise

        # feedparser sets 'bozo' when the feed was malformed but parseable;
        # warn and continue with whatever entries were recovered.
        if feed.bozo:
            bozo_exception = getattr(feed, 'bozo_exception', None)
            logger.warning(
                f"Feed for r/{subreddit_name} has parsing issues (bozo flag set): {bozo_exception}"
            )

        # Find top N entries with streamable links (filter first, then limit)
        max_posts = self.config.max_posts_per_run
        new_posts = 0
        skipped_posts = 0
        no_video_count = 0
        entries_checked = 0

        logger.info(f"Scanning feed for top {max_posts} streamable entries")

        for entry in feed.entries:
            # Stop once we've found enough posts to process; new posts and
            # already-posted duplicates both count toward the per-run budget.
            if new_posts + skipped_posts >= max_posts:
                break

            entries_checked += 1

            try:
                # Extract video URL - skip if not a streamable link
                video_url = self.link_extractor.extract_video_url(entry)
                if not video_url:
                    no_video_count += 1
                    continue  # Skip posts without video links

                # Get entry ID for deduplication
                entry_id = self._get_entry_id(entry)
                if self.state_manager.is_posted(subreddit_name, entry_id):
                    skipped_posts += 1
                    logger.debug(f"Skipping already-posted entry: {entry_id}")
                    continue

                # Parse entry into RedditPost
                reddit_post = self._parse_entry(entry, subreddit_name, video_url)

                # Create embed with sources and video metadata
                # Note: Thumbnail is fetched by backend via unfurl service
                embed = self.coves_client.create_external_embed(
                    uri=reddit_post.streamable_url,
                    title=reddit_post.title,
                    description=f"From r/{subreddit_name}",
                    sources=[
                        {
                            "uri": reddit_post.reddit_url,
                            "title": f"r/{subreddit_name}",
                            "domain": "reddit.com",
                        }
                    ],
                    embed_type="video",
                    provider="streamable",
                    domain="streamable.com",
                )

                # Post to community
                try:
                    post_uri = self.coves_client.create_post(
                        community_handle=community_handle,
                        title=reddit_post.title,
                        content="",  # No additional content needed
                        facets=[],
                        embed=embed,
                    )

                    # Mark as posted only on success so failed posts are
                    # retried on the next run.
                    self.state_manager.mark_posted(subreddit_name, entry_id, post_uri)
                    new_posts += 1
                    logger.info(f"Posted: {reddit_post.title[:50]}... -> {post_uri}")

                except Exception as e:
                    # Don't update state if posting failed
                    logger.error(f"Failed to post '{reddit_post.title}': {e}")
                    continue

            except Exception as e:
                # Log error but continue with other entries
                logger.error(f"Error processing entry: {e}", exc_info=True)
                continue

        # Record when this subreddit was last processed.
        # NOTE(review): datetime.now() is naive local time -- confirm state
        # consumers do not expect UTC.
        self.state_manager.update_last_run(subreddit_name, datetime.now())

        logger.info(
            f"r/{subreddit_name}: {new_posts} new posts, {skipped_posts} duplicates "
            f"(checked {entries_checked} entries, skipped {no_video_count} without streamable link)"
        )

    def _get_entry_id(self, entry) -> str:
        """
        Get unique identifier for RSS entry.

        Prefers Reddit's stable 'id' (e.g. 't3_abc123'), then the permalink,
        then a SHA-256 hash of the title as a last resort.

        Args:
            entry: feedparser entry

        Returns:
            Unique ID string

        Raises:
            ValueError: If the entry has no id, link, or title.
        """
        # Reddit RSS uses 'id' field with format like 't3_abc123'
        entry_id = getattr(entry, "id", None)
        if entry_id:
            return entry_id

        # Fallback to link
        link = getattr(entry, "link", None)
        if link:
            return link

        # Last resort: title hash (SHA-256 keeps IDs stable across runs)
        title = getattr(entry, "title", None)
        if title:
            logger.warning("Using fallback hash for entry ID (no id or link found)")
            return hashlib.sha256(title.encode()).hexdigest()

        raise ValueError("Cannot determine entry ID")

    def _parse_entry(self, entry, subreddit: str, video_url: str) -> RedditPost:
        """
        Parse RSS entry into RedditPost object.

        Args:
            entry: feedparser entry
            subreddit: Subreddit name
            video_url: Extracted video URL

        Returns:
            RedditPost object
        """
        # Get entry ID
        entry_id = self._get_entry_id(entry)

        # feedparser entries expose fields as attributes that may be absent,
        # so every read falls back to a safe default.
        title = getattr(entry, "title", "Untitled")

        # Get Reddit permalink
        reddit_url = getattr(entry, "link", "")

        # Get author (Reddit RSS uses 'author' field; fall back to the
        # nested author_detail record when absent)
        author = ""
        if hasattr(entry, "author"):
            author = entry.author
        elif hasattr(entry, "author_detail") and hasattr(entry.author_detail, "name"):
            author = entry.author_detail.name

        # Published date: feedparser yields a time.struct_time; the first
        # six fields (year..second) build a naive datetime.
        published = None
        published_parsed = getattr(entry, "published_parsed", None)
        if published_parsed:
            try:
                published = datetime(*published_parsed[:6])
            except (TypeError, ValueError) as e:
                logger.warning(f"Failed to parse published date for entry: {e}")

        return RedditPost(
            id=entry_id,
            title=title,
            link=reddit_url,
            reddit_url=reddit_url,
            subreddit=subreddit,
            author=author,
            published=published,
            streamable_url=video_url,
        )
337
338
def main():
    """
    Main entry point for command-line execution.

    Reads configuration paths and flags from the environment, validates the
    config file, then performs a single aggregator run and exits with an
    appropriate status code.

    Usage:
        python -m src.main
    """
    # Paths come from the environment, with repo-relative defaults.
    cfg_path = Path(os.getenv("CONFIG_PATH", "config.yaml"))
    state_path = Path(os.getenv("STATE_FILE", "data/state.json"))

    # Testing hook: any truthy spelling disables the startup jitter delay.
    jitter_disabled = os.getenv("SKIP_JITTER", "").lower() in ("true", "1", "yes")

    # A missing config file is a hard error -- bail out before doing work.
    if not cfg_path.exists():
        logger.error(f"Configuration file not found: {cfg_path}")
        logger.error("Please create config.yaml (see config.example.yaml)")
        sys.exit(1)

    # Construct and run in one shot; any failure maps to exit code 1.
    # (SystemExit is not an Exception subclass, so the success exit below
    # would not be swallowed even if placed inside the try.)
    try:
        Aggregator(
            config_path=cfg_path,
            state_file=state_path,
            skip_jitter=jitter_disabled,
        ).run()
    except Exception as e:
        logger.error(f"Aggregator failed: {e}", exc_info=True)
        sys.exit(1)
    sys.exit(0)


if __name__ == "__main__":
    main()