A small service that creates embeddings of posts, profiles, and avatars and stores them in Qdrant.
at main 3.0 kB view raw
"""Kafka consumer that embeds posts, profiles, and avatars and indexes them in Qdrant.

Reads JSON messages from a Kafka topic, validates each one as an
``AtKafkaEvent`` and hands it to ``Indexer.handle_event``. Kafka connection
settings come from the command line or, when omitted, from ``CONFIG``.
"""

import asyncio
import json
import logging
import signal
import sys
from typing import Optional

import click
from aiokafka import AIOKafkaConsumer

from config import CONFIG
from database import QDRANT_SERVICE
from embedder import EMBEDDING_SERVICE
from indexer import Indexer
from metrics import prom_metrics
from models import AtKafkaEvent

# Maximum time a single poll waits for new records. The shutdown flag is
# re-checked after every poll, so this bounds how long an idle consumer takes
# to notice SIGINT/SIGTERM.
POLL_TIMEOUT_MS = 1000
# Upper bound on records returned by one poll. A fetched batch is always
# processed to completion: the consumer position has already advanced past it,
# so abandoning it midway could let auto-commit mark unprocessed records as
# done. Keeping batches small therefore also bounds shutdown latency.
POLL_MAX_RECORDS = 50

# Set by signal_handler; checked by consume() between polls.
shutdown_requested = False

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


def signal_handler(signum, frame):
    """Request a graceful shutdown when SIGINT or SIGTERM is received.

    Only sets a module-level flag; ``consume`` checks it between polls and
    stops the Kafka consumer cleanly.
    """
    global shutdown_requested
    logger.info("Shutdown signal received")
    shutdown_requested = True


async def _process_message(indexer: Indexer, message) -> None:
    """Decode, validate, and index a single Kafka record.

    Any failure is logged with its traceback and the record is skipped, so one
    bad message cannot stop the consumer.
    """
    if message.value is None:
        # Tombstone / empty record: nothing to decode.
        logger.warning(
            "Skipping message with no value (topic=%s partition=%s offset=%s)",
            message.topic,
            message.partition,
            message.offset,
        )
        return
    try:
        # Decoded here rather than through a value_deserializer: a
        # deserializer error surfaces from the fetch call itself, outside any
        # per-message handling, and would take down the whole consumer.
        payload = json.loads(message.value.decode("utf-8"))
        event = AtKafkaEvent.model_validate(payload)
        await indexer.handle_event(event)
    except Exception:
        logger.exception(
            "Error processing message (topic=%s partition=%s offset=%s)",
            message.topic,
            message.partition,
            message.offset,
        )


async def consume(
    bootstrap_servers: Optional[str],
    topic: Optional[str],
    group_id: Optional[str],
):
    """Consume events from Kafka and index them until shutdown is requested.

    Args:
        bootstrap_servers: Kafka bootstrap servers; falls back to
            ``CONFIG.kafka_bootstrap_servers`` when falsy.
        topic: Topic to subscribe to; falls back to ``CONFIG.kafka_topic``
            when falsy.
        group_id: Consumer group id; falls back to ``CONFIG.kafka_group_id``
            when falsy.

    Raises:
        Exception: errors from starting or polling the consumer are
            propagated (after the consumer is stopped) so the caller can exit
            with a failure status instead of silently exiting 0.
    """
    kafka_servers = bootstrap_servers or CONFIG.kafka_bootstrap_servers
    kafka_topic = topic or CONFIG.kafka_topic
    kafka_group = group_id or CONFIG.kafka_group_id
    kafka_offset = CONFIG.kafka_auto_offset_reset

    consumer = AIOKafkaConsumer(
        kafka_topic,
        bootstrap_servers=kafka_servers,
        group_id=kafka_group,
        auto_offset_reset=kafka_offset,
        enable_auto_commit=True,
        auto_commit_interval_ms=5000,
        session_timeout_ms=30000,
        max_poll_interval_ms=300000,
    )

    indexer = Indexer()

    try:
        await consumer.start()
        logger.info("Connected to Kafka")

        # Poll with a timeout instead of `async for`: the iterator blocks until
        # the next message arrives, so a shutdown request on an idle topic
        # would never be noticed.
        while not shutdown_requested:
            batches = await consumer.getmany(
                timeout_ms=POLL_TIMEOUT_MS, max_records=POLL_MAX_RECORDS
            )
            for records in batches.values():
                for message in records:
                    await _process_message(indexer, message)
    finally:
        logger.info("Shutting Kafka consumer down...")
        await consumer.stop()


@click.command()
@click.option(
    "--bootstrap-servers",
    default=None,
    help="Kafka bootstrap servers (defaults to CONFIG.kafka_bootstrap_servers).",
)
@click.option(
    "--topic",
    default=None,
    help="Kafka topic to consume (defaults to CONFIG.kafka_topic).",
)
@click.option(
    "--group-id",
    default=None,
    help="Kafka consumer group id (defaults to CONFIG.kafka_group_id).",
)
def main(
    bootstrap_servers: Optional[str],
    topic: Optional[str],
    group_id: Optional[str],
):
    """Start the metrics server, initialize services, and run the Kafka consumer."""
    # SIGINT/SIGTERM only set a flag, so the consumer finishes its current
    # batch and stops cleanly instead of being interrupted mid-message.
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    prom_metrics.start_http(CONFIG.metrics_port, CONFIG.metrics_host)

    EMBEDDING_SERVICE.initialize()

    QDRANT_SERVICE.initialize()

    try:
        asyncio.run(
            consume(bootstrap_servers=bootstrap_servers, topic=topic, group_id=group_id)
        )
    except Exception as e:
        logger.error(f"Error in Kafka consumer: {e}", exc_info=True)
        sys.exit(1)
    finally:
        # The consumer is created and stopped entirely inside consume();
        # there is no consumer object in scope here to close.
        logger.info("Kafka consumer exited")


if __name__ == "__main__":
    main()