A community based topic aggregation platform built on atproto
at main 374 lines 13 kB view raw
"""
Main Orchestration Script for Reddit Highlights Aggregator.

Coordinates all components to:
1. Apply anti-detection jitter delay
2. Fetch Reddit RSS feeds
3. Extract streamable video links
4. Deduplicate via state tracking
5. Post to Coves communities

Designed to run via CRON (single execution, then exit).
"""
import hashlib
import logging
import os
import random
import re
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Optional

from src.config import ConfigLoader
from src.rss_fetcher import RSSFetcher
from src.link_extractor import LinkExtractor
from src.state_manager import StateManager
from src.coves_client import CovesClient
from src.models import RedditPost

# Setup logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Reddit RSS URL template
REDDIT_RSS_URL = "https://www.reddit.com/r/{subreddit}/.rss"

# Anti-detection jitter range (0-10 minutes in seconds)
JITTER_MIN_SECONDS = 0
JITTER_MAX_SECONDS = 600

# Valid subreddit names: alphanumeric, underscores, and hyphens only.
# Compiled once at import time (hoisted out of the per-subreddit loop);
# used to prevent URL injection when building the RSS URL.
_SUBREDDIT_NAME_RE = re.compile(r"^[a-zA-Z0-9_-]+$")


class Aggregator:
    """
    Main aggregator orchestration.

    Coordinates all components to fetch, filter, and post video highlights.
    """

    def __init__(
        self,
        config_path: Path,
        state_file: Path,
        coves_client: Optional[CovesClient] = None,
        skip_jitter: bool = False,
    ):
        """
        Initialize aggregator.

        Args:
            config_path: Path to config.yaml
            state_file: Path to state.json
            coves_client: Optional CovesClient (for testing)
            skip_jitter: Skip anti-detection delay (for testing)

        Raises:
            ValueError: If no client is injected and COVES_API_KEY is unset.
        """
        self.skip_jitter = skip_jitter

        # Load configuration
        logger.info("Loading configuration...")
        config_loader = ConfigLoader(config_path)
        self.config = config_loader.load()

        # Initialize components
        logger.info("Initializing components...")
        self.rss_fetcher = RSSFetcher()
        self.link_extractor = LinkExtractor(
            allowed_domains=self.config.allowed_domains
        )
        self.state_manager = StateManager(state_file)
        self.state_file = state_file

        # Initialize Coves client (or use provided one for testing)
        if coves_client:
            self.coves_client = coves_client
        else:
            api_key = os.getenv("COVES_API_KEY")
            if not api_key:
                raise ValueError("COVES_API_KEY environment variable required")

            self.coves_client = CovesClient(
                api_url=self.config.coves_api_url, api_key=api_key
            )

    def run(self) -> None:
        """
        Run aggregator: apply jitter, fetch, filter, post, and update state.

        This is the main entry point for CRON execution.

        Raises:
            RuntimeError: If authentication against the Coves API fails.
        """
        logger.info("=" * 60)
        logger.info("Starting Reddit Highlights Aggregator")
        logger.info("=" * 60)

        # Anti-detection jitter: random delay before starting
        if not self.skip_jitter:
            jitter_seconds = random.uniform(JITTER_MIN_SECONDS, JITTER_MAX_SECONDS)
            logger.info(
                f"Applying anti-detection jitter: sleeping for {jitter_seconds:.1f} seconds "
                f"({jitter_seconds/60:.1f} minutes)"
            )
            time.sleep(jitter_seconds)

        # Get enabled subreddits only
        enabled_subreddits = [s for s in self.config.subreddits if s.enabled]
        logger.info(f"Processing {len(enabled_subreddits)} enabled subreddits")

        # Authenticate once at the start; nothing can be posted without it.
        try:
            self.coves_client.authenticate()
        except Exception as e:
            logger.error(f"Failed to authenticate: {e}")
            logger.error("Cannot continue without authentication")
            raise RuntimeError("Authentication failed") from e

        # Process each subreddit
        for subreddit_config in enabled_subreddits:
            try:
                self._process_subreddit(subreddit_config)
            except KeyboardInterrupt:
                # Re-raise interrupt signals - don't suppress user abort
                logger.info("Received interrupt signal, stopping...")
                raise
            except Exception as e:
                # Log error but continue with other subreddits
                logger.error(
                    f"Error processing subreddit '{subreddit_config.name}': {e}",
                    exc_info=True,
                )
                continue

        logger.info("=" * 60)
        logger.info("Aggregator run completed")
        logger.info("=" * 60)

    def _process_subreddit(self, subreddit_config) -> None:
        """
        Process a single subreddit: fetch its RSS feed, post new videos.

        Args:
            subreddit_config: SubredditConfig object

        Raises:
            ValueError: If the subreddit name fails the URL-safety check.
        """
        subreddit_name = subreddit_config.name
        community_handle = subreddit_config.community_handle

        # Sanitize subreddit name to prevent URL injection
        # Only allow alphanumeric, underscores, and hyphens
        if not _SUBREDDIT_NAME_RE.match(subreddit_name):
            raise ValueError(f"Invalid subreddit name: {subreddit_name}")

        logger.info(f"Processing subreddit: r/{subreddit_name} -> {community_handle}")

        # Build RSS URL
        rss_url = REDDIT_RSS_URL.format(subreddit=subreddit_name)

        # Fetch RSS feed
        try:
            feed = self.rss_fetcher.fetch_feed(rss_url)
        except Exception as e:
            logger.error(f"Failed to fetch feed for r/{subreddit_name}: {e}")
            raise

        # Check for feed errors (feedparser sets `bozo` on malformed feeds)
        if feed.bozo:
            bozo_exception = getattr(feed, 'bozo_exception', None)
            logger.warning(
                f"Feed for r/{subreddit_name} has parsing issues (bozo flag set): {bozo_exception}"
            )

        # Find top N entries with streamable links (filter first, then limit)
        max_posts = self.config.max_posts_per_run
        new_posts = 0
        skipped_posts = 0
        no_video_count = 0
        entries_checked = 0

        logger.info(f"Scanning feed for top {max_posts} streamable entries")

        for entry in feed.entries:
            # Stop once we've found enough posts to process
            if new_posts + skipped_posts >= max_posts:
                break

            entries_checked += 1

            try:
                # Extract video URL - skip if not a streamable link
                video_url = self.link_extractor.extract_video_url(entry)
                if not video_url:
                    no_video_count += 1
                    continue  # Skip posts without video links

                # Get entry ID for deduplication
                entry_id = self._get_entry_id(entry)
                if self.state_manager.is_posted(subreddit_name, entry_id):
                    skipped_posts += 1
                    logger.debug(f"Skipping already-posted entry: {entry_id}")
                    continue

                # Parse entry into RedditPost
                reddit_post = self._parse_entry(entry, subreddit_name, video_url)

                # Create embed with sources and video metadata
                # Note: Thumbnail is fetched by backend via unfurl service
                embed = self.coves_client.create_external_embed(
                    uri=reddit_post.streamable_url,
                    title=reddit_post.title,
                    description=f"From r/{subreddit_name}",
                    sources=[
                        {
                            "uri": reddit_post.reddit_url,
                            "title": f"r/{subreddit_name}",
                            "domain": "reddit.com",
                        }
                    ],
                    embed_type="video",
                    provider="streamable",
                    domain="streamable.com",
                )

                # Post to community
                try:
                    post_uri = self.coves_client.create_post(
                        community_handle=community_handle,
                        title=reddit_post.title,
                        content="",  # No additional content needed
                        facets=[],
                        embed=embed,
                    )

                    # Mark as posted (only if successful)
                    self.state_manager.mark_posted(subreddit_name, entry_id, post_uri)
                    new_posts += 1
                    logger.info(f"Posted: {reddit_post.title[:50]}... -> {post_uri}")

                except Exception as e:
                    # Don't update state if posting failed
                    logger.error(f"Failed to post '{reddit_post.title}': {e}")
                    continue

            except Exception as e:
                # Log error but continue with other entries
                logger.error(f"Error processing entry: {e}", exc_info=True)
                continue

        # Update last run timestamp
        self.state_manager.update_last_run(subreddit_name, datetime.now())

        logger.info(
            f"r/{subreddit_name}: {new_posts} new posts, {skipped_posts} duplicates "
            f"(checked {entries_checked} entries, skipped {no_video_count} without streamable link)"
        )

    def _get_entry_id(self, entry) -> str:
        """
        Get unique identifier for RSS entry.

        Args:
            entry: feedparser entry

        Returns:
            Unique ID string

        Raises:
            ValueError: If the entry has no id, link, or title to key on.
        """
        # Reddit RSS uses 'id' field with format like 't3_abc123'
        if hasattr(entry, "id") and entry.id:
            return entry.id

        # Fallback to link
        if hasattr(entry, "link") and entry.link:
            return entry.link

        # Last resort: title hash (using SHA-256 for security)
        if hasattr(entry, "title") and entry.title:
            logger.warning("Using fallback hash for entry ID (no id or link found)")
            return hashlib.sha256(entry.title.encode()).hexdigest()

        raise ValueError("Cannot determine entry ID")

    def _parse_entry(self, entry, subreddit: str, video_url: str) -> RedditPost:
        """
        Parse RSS entry into RedditPost object.

        Args:
            entry: feedparser entry
            subreddit: Subreddit name
            video_url: Extracted video URL

        Returns:
            RedditPost object
        """
        # Get entry ID
        entry_id = self._get_entry_id(entry)

        # Get title
        title = entry.title if hasattr(entry, "title") else "Untitled"

        # Get Reddit permalink
        reddit_url = entry.link if hasattr(entry, "link") else ""

        # Get author (Reddit RSS uses 'author' field)
        author = ""
        if hasattr(entry, "author"):
            author = entry.author
        elif hasattr(entry, "author_detail") and hasattr(entry.author_detail, "name"):
            author = entry.author_detail.name

        # Get published date (struct_time -> naive datetime; tz handling is
        # left to the state manager — confirm if UTC awareness is needed)
        published = None
        if hasattr(entry, "published_parsed") and entry.published_parsed:
            try:
                published = datetime(*entry.published_parsed[:6])
            except (TypeError, ValueError) as e:
                logger.warning(f"Failed to parse published date for entry: {e}")

        return RedditPost(
            id=entry_id,
            title=title,
            link=entry.link if hasattr(entry, "link") else "",
            reddit_url=reddit_url,
            subreddit=subreddit,
            author=author,
            published=published,
            streamable_url=video_url,
        )


def main() -> None:
    """
    Main entry point for command-line execution.

    Usage:
        python -m src.main
    """
    # Get paths from environment or use defaults
    config_path = Path(os.getenv("CONFIG_PATH", "config.yaml"))
    state_file = Path(os.getenv("STATE_FILE", "data/state.json"))

    # Check for skip jitter flag (for testing)
    skip_jitter = os.getenv("SKIP_JITTER", "").lower() in ("true", "1", "yes")

    # Validate config file exists
    if not config_path.exists():
        logger.error(f"Configuration file not found: {config_path}")
        logger.error("Please create config.yaml (see config.example.yaml)")
        sys.exit(1)

    # Create aggregator and run
    try:
        aggregator = Aggregator(
            config_path=config_path,
            state_file=state_file,
            skip_jitter=skip_jitter,
        )
        aggregator.run()
        sys.exit(0)
    except Exception as e:
        logger.error(f"Aggregator failed: {e}", exc_info=True)
        sys.exit(1)


if __name__ == "__main__":
    main()