1import asyncio
2import json
3import logging
4import signal
5import sys
6from typing import Optional
7
8import click
9from aiokafka import AIOKafkaConsumer
10
11from config import CONFIG
12from database import QDRANT_SERVICE
13from embedder import EMBEDDING_SERVICE
14from indexer import Indexer
15from metrics import prom_metrics
16from models import AtKafkaEvent
17
# Cooperative shutdown flag: set to True by signal_handler() and read by the
# consume() loop to decide when to stop.
shutdown_requested = False

# Configure root logging once at import time so every module logger shares
# the same timestamped format.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
24
25
def signal_handler(signum, frame):
    """Record that a shutdown signal was received.

    Logs the event and flips the module-level ``shutdown_requested`` flag.
    ``signum`` and ``frame`` are accepted because ``signal.signal`` passes
    them to every handler; neither is used here.
    """
    global shutdown_requested

    logger.info("Shutdown signal received")
    # Only a flag is set here; the handler itself does no cleanup.
    shutdown_requested = True
30
31
async def consume(
    bootstrap_servers: Optional[str],
    topic: Optional[str],
    group_id: Optional[str],
    poll_timeout_ms: int = 1000,
):
    """Consume events from Kafka and pass each one to the indexer.

    The consumer is polled with a timeout instead of being iterated
    directly, so the ``shutdown_requested`` flag is re-checked at least
    every ``poll_timeout_ms`` milliseconds even when the topic is idle.
    Every record returned by a poll is processed before the flag is checked
    again, so no fetched record is discarded on shutdown.

    Args:
        bootstrap_servers: Kafka bootstrap servers; falls back to
            ``CONFIG.kafka_bootstrap_servers`` when falsy.
        topic: Topic to subscribe to; falls back to ``CONFIG.kafka_topic``
            when falsy.
        group_id: Consumer group id; falls back to
            ``CONFIG.kafka_group_id`` when falsy.
        poll_timeout_ms: Maximum time a single poll waits for records
            before the shutdown flag is checked again.

    Errors raised while handling a single message are logged and the loop
    continues. Any other error is logged and ends the loop. The consumer is
    always stopped before returning.
    """
    kafka_servers = bootstrap_servers or CONFIG.kafka_bootstrap_servers
    kafka_topic = topic or CONFIG.kafka_topic
    kafka_group = group_id or CONFIG.kafka_group_id
    kafka_offset = CONFIG.kafka_auto_offset_reset

    # No value_deserializer: payloads are decoded inside the per-message
    # error handler below so a malformed payload is logged and skipped like
    # any other bad event.
    consumer = AIOKafkaConsumer(
        kafka_topic,
        bootstrap_servers=kafka_servers,
        group_id=kafka_group,
        auto_offset_reset=kafka_offset,
        enable_auto_commit=True,
        auto_commit_interval_ms=5000,
        session_timeout_ms=30000,
        max_poll_interval_ms=300000,
    )

    indexer = Indexer()

    try:
        await consumer.start()
        logger.info("Connected to Kafka")

        while not shutdown_requested:
            # Returns a mapping of partition -> records; empty when the
            # timeout elapses without new data.
            batches = await consumer.getmany(timeout_ms=poll_timeout_ms)
            for records in batches.values():
                for message in records:
                    try:
                        payload = json.loads(message.value.decode("utf-8"))
                        event = AtKafkaEvent.model_validate(payload)
                        await indexer.handle_event(event)
                    except Exception as e:
                        logger.error(
                            f"Error processing message: {e}", exc_info=True
                        )
    except Exception as e:
        logger.error(f"Error in consumer: {e}", exc_info=True)
    finally:
        logger.info("Shutting Kafka consumer down...")
        await consumer.stop()
74
75
@click.command()
@click.option(
    "--bootstrap-servers",
    default=None,
    help="Kafka bootstrap servers (defaults to the configured value).",
)
@click.option(
    "--topic",
    default=None,
    help="Kafka topic to consume (defaults to the configured value).",
)
@click.option(
    "--group-id",
    default=None,
    help="Kafka consumer group id (defaults to the configured value).",
)
def main(
    bootstrap_servers: Optional[str],
    topic: Optional[str],
    group_id: Optional[str],
):
    """Run the Kafka indexing consumer until it is interrupted."""
    # Route SIGINT/SIGTERM through signal_handler so the consume loop can
    # stop via the shutdown flag.
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    prom_metrics.start_http(CONFIG.metrics_port, CONFIG.metrics_host)

    EMBEDDING_SERVICE.initialize()

    QDRANT_SERVICE.initialize()

    try:
        asyncio.run(
            consume(bootstrap_servers=bootstrap_servers, topic=topic, group_id=group_id)
        )
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received")
    except Exception as e:
        logger.error(f"Error in Kafka consumer: {e}", exc_info=True)
        sys.exit(1)
    finally:
        # consume() stops its own consumer in its finally block. No consumer
        # object exists in this scope, so closing one here would raise
        # NameError and mask the exit status set above.
        logger.info("Kafka consumer stopped")
115
116
# Script entry point: invoke the click command only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    main()