"""Kafka consumer entry point.

Reads JSON events from a Kafka topic, validates each one as an
``AtKafkaEvent`` and hands it to the ``Indexer``.
"""

import asyncio
import json
import logging
import signal
import sys
from typing import Optional

import click
from aiokafka import AIOKafkaConsumer

from config import CONFIG
from database import QDRANT_SERVICE
from embedder import EMBEDDING_SERVICE
from indexer import Indexer
from metrics import prom_metrics
from models import AtKafkaEvent

# Set to True by ``signal_handler``; checked by the consume loop.
shutdown_requested = False

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


def signal_handler(signum, frame):
    """Record that a shutdown was requested.

    Installed for SIGINT and SIGTERM by ``main``. It only flips the
    module-level ``shutdown_requested`` flag; the consume loop is
    responsible for noticing it.
    """
    global shutdown_requested
    logger.info("Shutdown signal received")
    shutdown_requested = True


async def consume(
    bootstrap_servers: Optional[str],
    topic: Optional[str],
    group_id: Optional[str],
):
    """Consume events from Kafka and pass each one to the indexer.

    Args:
        bootstrap_servers: Kafka bootstrap servers; when falsy,
            ``CONFIG.kafka_bootstrap_servers`` is used.
        topic: Topic to subscribe to; when falsy, ``CONFIG.kafka_topic``
            is used.
        group_id: Consumer group id; when falsy, ``CONFIG.kafka_group_id``
            is used.

    A failure while handling a single message is logged and the loop moves
    on to the next message. A failure of the consumer itself is logged and
    ends the loop. The consumer is always stopped before returning.
    """
    kafka_servers = bootstrap_servers or CONFIG.kafka_bootstrap_servers
    kafka_topic = topic or CONFIG.kafka_topic
    kafka_group = group_id or CONFIG.kafka_group_id
    kafka_offset = CONFIG.kafka_auto_offset_reset

    consumer = AIOKafkaConsumer(
        kafka_topic,
        bootstrap_servers=kafka_servers,
        group_id=kafka_group,
        auto_offset_reset=kafka_offset,
        enable_auto_commit=True,
        auto_commit_interval_ms=5000,
        session_timeout_ms=30000,
        max_poll_interval_ms=300000,
        value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    )

    indexer = Indexer()

    try:
        await consumer.start()
        logger.info("Connected to Kafka")

        # NOTE(review): the shutdown flag is only checked once a message has
        # arrived, so on an idle topic a signal takes effect only when the
        # next message shows up. Consider polling with a timeout if prompt
        # shutdown matters.
        async for message in consumer:
            if shutdown_requested:
                break

            try:
                event = AtKafkaEvent.model_validate(message.value)
                await indexer.handle_event(event)
            except Exception as e:
                # One bad message must not stop the consumer; keep the
                # traceback so the failure can be diagnosed.
                logger.error(f"Error processing message: {e}", exc_info=True)

    except Exception as e:
        logger.error(f"Error in consumer: {e}", exc_info=True)
    finally:
        logger.info("Shutting Kafka consumer down...")
        await consumer.stop()


@click.command()
@click.option(
    "--bootstrap-servers",
    default=None,
)
@click.option(
    "--topic",
    default=None,
)
@click.option(
    "--group-id",
    default=None,
)
def main(
    bootstrap_servers: Optional[str],
    topic: Optional[str],
    group_id: Optional[str],
):
    """Command-line entry point: set up services and run the consumer.

    Installs the SIGINT/SIGTERM handlers, starts the metrics HTTP endpoint,
    initializes the embedding and Qdrant services, then runs ``consume``
    until it returns. Exits with status 1 if ``consume`` raises.
    """
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    prom_metrics.start_http(CONFIG.metrics_port, CONFIG.metrics_host)

    EMBEDDING_SERVICE.initialize()
    QDRANT_SERVICE.initialize()

    try:
        asyncio.run(
            consume(bootstrap_servers=bootstrap_servers, topic=topic, group_id=group_id)
        )
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received")
    except Exception as e:
        logger.error(f"Error in Kafka consumer: {e}", exc_info=True)
        sys.exit(1)
    finally:
        # The consumer object is local to ``consume`` and is already stopped
        # in its ``finally``. Referencing it here raised NameError on every
        # exit path and masked the ``sys.exit(1)`` status, so only log.
        logger.info("Closing Kafka consumer...")


if __name__ == "__main__":
    main()