#!/usr/bin/env -S uv run --script --quiet # /// script # requires-python = ">=3.12" # dependencies = ["atproto", "pydantic-settings", "geopy", "httpx", "jinja2"] # /// """ Monitor flights passing overhead and send BlueSky DMs. Usage: # Single user mode (backward compatible) ./dm-me-when-a-flight-passes-over # Multi-subscriber mode with JSON file ./dm-me-when-a-flight-passes-over --subscribers subscribers.json # Multi-subscriber mode with stdin echo '[{"handle": "user1.bsky.social", "latitude": 41.8781, "longitude": -87.6298, "radius_miles": 5}]' | ./dm-me-when-a-flight-passes-over --subscribers - This script monitors flights within a configurable radius and sends DMs on BlueSky when flights pass overhead. Supports multiple subscribers with different locations. ## Future Architecture Ideas ### Web App Deployment Options 1. **FastAPI + Fly.io/Railway/Render** - REST API with endpoints: - POST /subscribe - Register user with BlueSky handle - DELETE /unsubscribe - Remove subscription - POST /update-location - Update user's location - GET /status - Check subscription status - Background worker using Celery/RQ/APScheduler - PostgreSQL/SQLite for subscriber persistence - Redis for caching flight data & deduplication 2. **Vercel/Netlify Edge Functions** - Serverless approach with scheduled cron jobs - Use Vercel KV or Upstash Redis for state - Challenge: Long-running monitoring needs workarounds - Solution: Trigger checks via cron every minute 3. **Self-Hosted with ngrok/Cloudflare Tunnel** - Quick prototype option - Run this script as daemon - Expose simple Flask/FastAPI wrapper - Security concerns: rate limiting, auth required ### Mobile/Browser Integration 1. **Progressive Web App (PWA)** - Service worker for background location updates - Geolocation API for current position - Push notifications instead of/alongside DMs - IndexedDB for offline capability 2. **iOS Shortcuts Integration** - Create shortcut that gets location - Calls webhook with location + BlueSky handle - Could run automatically based on focus modes 3. **Browser Extension** - Background script polls location - Lighter weight than full app - Cross-platform solution ### Architecture Components 1. **Location Services Layer** - Browser Geolocation API - IP-based geolocation fallback - Manual location picker UI - Privacy: Only send location when checking flights 2. **Notification Options** - BlueSky DMs (current) - Web Push Notifications - Webhooks to other services - Email/SMS via Twilio/SendGrid 3. **Subscription Management** - OAuth with BlueSky for auth - User preferences: radius, notification types - Quiet hours/Do Not Disturb - Rate limiting per user 4. **Data Optimization** - Cache FlightRadar API responses - Batch location updates - Aggregate nearby users for efficiency - WebSocket for real-time updates ### Implementation Approach Phase 1: Web API Wrapper - FastAPI with /subscribe endpoint - SQLite for subscribers - Run monitoring in background thread - Deploy to Fly.io free tier Phase 2: Web UI - Simple React/Vue form - Geolocation permission request - Show nearby flights on map - Subscription management Phase 3: Mobile Experience - PWA with service workers - Background location updates - Local notifications - Offline support ### Security Considerations - Rate limit FlightRadar API calls - Authenticate BlueSky handles - Validate location bounds - Prevent subscription spam - GDPR compliance for location data """ import argparse import time import math import json import sys from datetime import datetime from concurrent.futures import ThreadPoolExecutor, as_completed import httpx from atproto import Client from geopy import distance from jinja2 import Template from pydantic import BaseModel, Field from pydantic_settings import BaseSettings, SettingsConfigDict class Settings(BaseSettings): """App settings loaded from environment variables""" model_config = SettingsConfigDict(env_file=".env", extra="ignore") bsky_handle: str = Field(...) bsky_password: str = Field(...) flightradar_api_token: str = Field(...) class Subscriber(BaseModel): """Subscriber with location and notification preferences""" handle: str latitude: float longitude: float radius_miles: float = 5.0 filters: dict[str, list[str]] = Field(default_factory=dict) message_template: str | None = None class Flight(BaseModel): """Flight data model""" hex: str latitude: float longitude: float altitude: float | None = None ground_speed: float | None = None heading: float | None = None aircraft_type: str | None = None registration: str | None = None origin: str | None = None destination: str | None = None callsign: str | None = None distance_miles: float def get_flights_in_area( settings: Settings, latitude: float, longitude: float, radius_miles: float ) -> list[Flight]: """Get flights within the specified radius using FlightRadar24 API.""" lat_offset = radius_miles / 69 # 1 degree latitude ≈ 69 miles lon_offset = radius_miles / (69 * abs(math.cos(math.radians(latitude)))) bounds = { "north": latitude + lat_offset, "south": latitude - lat_offset, "west": longitude - lon_offset, "east": longitude + lon_offset, } headers = { "Authorization": f"Bearer {settings.flightradar_api_token}", "Accept": "application/json", "Accept-Version": "v1", } url = "https://fr24api.flightradar24.com/api/live/flight-positions/full" params = { "bounds": f"{bounds['north']},{bounds['south']},{bounds['west']},{bounds['east']}" } try: with httpx.Client() as client: response = client.get(url, headers=headers, params=params, timeout=10) response.raise_for_status() data = response.json() flights_in_radius = [] center = (latitude, longitude) if isinstance(data, dict) and "data" in data: for flight_data in data["data"]: lat = flight_data.get("lat") lon = flight_data.get("lon") if lat and lon: flight_pos = (lat, lon) dist = distance.distance(center, flight_pos).miles if dist <= radius_miles: flight = Flight( hex=flight_data.get("fr24_id", ""), latitude=lat, longitude=lon, altitude=flight_data.get("alt"), ground_speed=flight_data.get("gspeed"), heading=flight_data.get("track"), aircraft_type=flight_data.get("type"), registration=flight_data.get("reg"), origin=flight_data.get("orig_iata"), destination=flight_data.get("dest_iata"), callsign=flight_data.get("flight"), distance_miles=round(dist, 2), ) flights_in_radius.append(flight) return flights_in_radius except httpx.HTTPStatusError as e: print(f"HTTP error fetching flights: {e}") print(f"Response status: {e.response.status_code}") print(f"Response content: {e.response.text[:500]}") return [] except Exception as e: print(f"Error fetching flights: {e}") return [] DEFAULT_MESSAGE_TEMPLATE = """✈️ Flight passing overhead! Flight: {{ flight.callsign or 'Unknown' }} Distance: {{ flight.distance_miles }} miles {%- if flight.altitude %} Altitude: {{ "{:,.0f}".format(flight.altitude) }} ft {%- endif %} {%- if flight.ground_speed %} Speed: {{ "{:.0f}".format(flight.ground_speed) }} kts {%- endif %} {%- if flight.heading %} Heading: {{ "{:.0f}".format(flight.heading) }}° {%- endif %} {%- if flight.aircraft_type %} Aircraft: {{ flight.aircraft_type }} {%- endif %} {%- if flight.origin or flight.destination %} Route: {{ flight.origin or '???' }} → {{ flight.destination or '???' }} {%- endif %} Time: {{ timestamp }}""" def format_flight_info(flight: Flight, template_str: str | None = None) -> str: """Format flight information for a DM using Jinja2 template.""" template_str = template_str or DEFAULT_MESSAGE_TEMPLATE template = Template(template_str) return template.render( flight=flight, timestamp=datetime.now().strftime('%H:%M:%S') ) def send_dm(client: Client, message: str, target_handle: str) -> bool: """Send a direct message to the specified handle on BlueSky.""" try: resolved = client.com.atproto.identity.resolve_handle( params={"handle": target_handle} ) target_did = resolved.did chat_client = client.with_bsky_chat_proxy() convo_response = chat_client.chat.bsky.convo.get_convo_for_members( {"members": [target_did]} ) if not convo_response or not convo_response.convo: print(f"Could not create/get conversation with {target_handle}") return False recipient = None for member in convo_response.convo.members: if member.did != client.me.did: recipient = member break if not recipient or recipient.handle != target_handle: print( f"ERROR: About to message wrong person! Expected {target_handle}, but found {recipient.handle if recipient else 'no recipient'}" ) return False chat_client.chat.bsky.convo.send_message( data={ "convoId": convo_response.convo.id, "message": {"text": message, "facets": []}, } ) print(f"DM sent to {target_handle}") return True except Exception as e: print(f"Error sending DM to {target_handle}: {e}") return False def flight_matches_filters(flight: Flight, filters: dict[str, list[str]]) -> bool: """Check if a flight matches the subscriber's filters.""" if not filters: return True for field, allowed_values in filters.items(): if not allowed_values: continue flight_value = getattr(flight, field, None) if flight_value is None: return False if field == "aircraft_type": # Case-insensitive partial matching for aircraft types flight_value_lower = str(flight_value).lower() if not any(allowed.lower() in flight_value_lower for allowed in allowed_values): return False else: # Exact matching for other fields if str(flight_value) not in [str(v) for v in allowed_values]: return False return True def process_subscriber( client: Client, settings: Settings, subscriber: Subscriber, notified_flights: dict[str, set[str]], ) -> None: """Process flights for a single subscriber.""" try: flights = get_flights_in_area( settings, subscriber.latitude, subscriber.longitude, subscriber.radius_miles ) if subscriber.handle not in notified_flights: notified_flights[subscriber.handle] = set() subscriber_notified = notified_flights[subscriber.handle] filtered_count = 0 for flight in flights: flight_id = flight.hex if not flight_matches_filters(flight, subscriber.filters): filtered_count += 1 continue if flight_id not in subscriber_notified: message = format_flight_info(flight, subscriber.message_template) print(f"\n[{subscriber.handle}] {message}\n") if send_dm(client, message, subscriber.handle): print(f"DM sent to {subscriber.handle} for flight {flight_id}") subscriber_notified.add(flight_id) else: print( f"Failed to send DM to {subscriber.handle} for flight {flight_id}" ) current_flight_ids = {f.hex for f in flights} notified_flights[subscriber.handle] &= current_flight_ids if not flights: print( f"[{subscriber.handle}] No flights in range at {datetime.now().strftime('%H:%M:%S')}" ) elif filtered_count > 0 and filtered_count == len(flights): print( f"[{subscriber.handle}] {filtered_count} flights filtered out at {datetime.now().strftime('%H:%M:%S')}" ) except Exception as e: print(f"Error processing subscriber {subscriber.handle}: {e}") def load_subscribers(subscribers_input: str | None) -> list[Subscriber]: """Load subscribers from JSON file or stdin.""" if subscribers_input: with open(subscribers_input, "r") as f: data = json.load(f) else: print("Reading subscriber data from stdin (provide JSON array)...") data = json.load(sys.stdin) return [Subscriber(**item) for item in data] def main(): """Main monitoring loop.""" parser = argparse.ArgumentParser( description="Monitor flights overhead and send BlueSky DMs" ) parser.add_argument( "--subscribers", type=str, help="JSON file with subscriber list, or '-' for stdin", ) parser.add_argument( "--latitude", type=float, default=41.8781, help="Latitude (default: Chicago)" ) parser.add_argument( "--longitude", type=float, default=-87.6298, help="Longitude (default: Chicago)" ) parser.add_argument( "--radius", type=float, default=5.0, help="Radius in miles (default: 5)" ) parser.add_argument( "--handle", type=str, default="alternatebuild.dev", help="BlueSky handle to DM (default: alternatebuild.dev)", ) parser.add_argument( "--filter-aircraft-type", type=str, nargs="+", help="Filter by aircraft types (e.g., B737 A320 C172)", ) parser.add_argument( "--filter-callsign", type=str, nargs="+", help="Filter by callsigns (e.g., UAL DL AAL)", ) parser.add_argument( "--filter-origin", type=str, nargs="+", help="Filter by origin airports (e.g., ORD LAX JFK)", ) parser.add_argument( "--filter-destination", type=str, nargs="+", help="Filter by destination airports (e.g., ORD LAX JFK)", ) parser.add_argument( "--message-template", type=str, help="Custom Jinja2 template for messages", ) parser.add_argument( "--message-template-file", type=str, help="Path to file containing custom Jinja2 template", ) parser.add_argument( "--interval", type=int, default=60, help="Check interval in seconds (default: 60)", ) parser.add_argument( "--once", action="store_true", help="Run once and exit (for testing)" ) parser.add_argument( "--max-workers", type=int, default=5, help="Max concurrent workers for processing subscribers (default: 5)", ) args = parser.parse_args() try: settings = Settings() except Exception as e: print(f"Error loading settings: {e}") print( "Ensure .env file exists with BSKY_HANDLE, BSKY_PASSWORD, and FLIGHTRADAR_API_TOKEN" ) return client = Client() try: client.login(settings.bsky_handle, settings.bsky_password) print(f"Logged in to BlueSky as {settings.bsky_handle}") except Exception as e: print(f"Error logging into BlueSky: {e}") return if args.subscribers: if args.subscribers == "-": subscribers_input = None else: subscribers_input = args.subscribers try: subscribers = load_subscribers(subscribers_input) print(f"Loaded {len(subscribers)} subscriber(s)") except Exception as e: print(f"Error loading subscribers: {e}") return else: # Build filters from CLI args filters = {} if args.filter_aircraft_type: filters["aircraft_type"] = args.filter_aircraft_type if args.filter_callsign: filters["callsign"] = args.filter_callsign if args.filter_origin: filters["origin"] = args.filter_origin if args.filter_destination: filters["destination"] = args.filter_destination # Load custom template if provided message_template = None if args.message_template_file: with open(args.message_template_file, "r") as f: message_template = f.read() elif args.message_template: message_template = args.message_template subscribers = [ Subscriber( handle=args.handle, latitude=args.latitude, longitude=args.longitude, radius_miles=args.radius, filters=filters, message_template=message_template, ) ] print( f"Monitoring flights within {args.radius} miles of ({args.latitude}, {args.longitude}) for {args.handle}" ) if filters: print(f"Active filters: {filters}") print(f"Checking every {args.interval} seconds...") notified_flights: dict[str, set[str]] = {} while True: try: with ThreadPoolExecutor(max_workers=args.max_workers) as executor: futures = [] for subscriber in subscribers: future = executor.submit( process_subscriber, client, settings, subscriber, notified_flights, ) futures.append(future) for future in as_completed(futures): future.result() if args.once: break time.sleep(args.interval) except KeyboardInterrupt: print("\nStopping flight monitor...") break except Exception as e: print(f"Error in monitoring loop: {e}") time.sleep(args.interval) if __name__ == "__main__": main()