1import logging
2
3from database import QDRANT_SERVICE
4from embedder import EMBEDDING_SERVICE
5from retina import RETINA_CLIENT, binary_to_float_vector, hex_to_binary
6from models import AtKafkaEvent
7from metrics import prom_metrics
8
9
# Module-level logger used by the indexing handlers defined in this file.
logger = logging.getLogger(__name__)
11
12
class Indexer:
    """Embed and upsert profile and post records carried by Kafka events.

    Profile descriptions and post text are encoded with ``EMBEDDING_SERVICE``
    and written through ``QDRANT_SERVICE``. Profile avatars are hashed by
    ``RETINA_CLIENT`` and the hash is converted to a float vector before being
    upserted. Every attempted write increments
    ``prom_metrics.events_handled`` labelled with ``kind`` (``profile``,
    ``avatar`` or ``post``) and ``status``. Records with nothing to index are
    skipped without recording a metric.
    """

    def __init__(self) -> None:
        pass

    async def handle_event(self, event: AtKafkaEvent):
        """Dispatch *event* to the handler for its record collection.

        Events without an operation, delete operations, and collections other
        than ``app.bsky.actor.profile`` / ``app.bsky.feed.post`` are ignored.
        Handler failures are logged and counted, not raised.
        """
        operation = event.operation
        if not operation or operation.action == "delete":
            return

        # NOTE(review): the handlers call the embedder, Qdrant and Retina
        # synchronously from this coroutine; if those clients block, they
        # stall the event loop for each call — confirm, and consider
        # offloading with asyncio.to_thread if so.
        if operation.collection == "app.bsky.actor.profile":
            self._handle_profile(event)
        elif operation.collection == "app.bsky.feed.post":
            self._handle_post(event)

    def _handle_profile(self, event: AtKafkaEvent):
        """Index a profile record's description and avatar.

        The two parts are handled independently: a failure while indexing the
        description does not prevent the avatar from being indexed.
        """
        record = event.operation.record
        # Guard against an operation with no record body, which would
        # otherwise raise AttributeError out of handle_event.
        if record is None:
            return

        self._handle_description(event.did, record)
        self._handle_avatar(event.did, record)

    def _handle_description(self, did, record) -> None:
        """Embed and upsert the profile description if it has visible text."""
        description = record.get("description")
        if not isinstance(description, str) or not description.strip():
            return

        status = "error"
        try:
            vector = EMBEDDING_SERVICE.encode(description)
            QDRANT_SERVICE.upsert_profile(
                did=did, description=description, vector=vector
            )
            status = "ok"
        except Exception as e:
            logger.exception("Error handling profile: %s", e)
        finally:
            prom_metrics.events_handled.labels(kind="profile", status=status).inc()

    def _handle_avatar(self, did, record) -> None:
        """Hash the profile avatar via Retina and upsert the hash as a vector.

        Avatars whose blob reference (``avatar["ref"]["$link"]``) is missing or
        malformed are skipped without recording a metric, matching how empty
        descriptions and posts without text are treated.
        """
        avatar = record.get("avatar")
        if not isinstance(avatar, dict):
            return

        ref = avatar.get("ref")
        if not isinstance(ref, dict):
            return

        link = ref.get("$link")
        if not isinstance(link, str):
            return

        status = "error"
        try:
            resp = RETINA_CLIENT.get_image_hash(did, link)

            if resp.quality_too_low:
                # Not a failure: Retina flagged the image as too low quality,
                # so report it separately instead of inflating the error count.
                logger.info("avatar quality was too low")
                status = "low_quality"
                return

            if not resp.hash:
                logger.error("no hash in response")
                return

            hash_bits = hex_to_binary(resp.hash)
            vector = binary_to_float_vector(hash_bits)

            QDRANT_SERVICE.upsert_avatar(
                did=did,
                cid=link,
                vector=vector,
            )
            status = "ok"
        except Exception as e:
            logger.exception("Failed to get avatar hash: %s", e)
        finally:
            prom_metrics.events_handled.labels(kind="avatar", status=status).inc()

    def _handle_post(self, event: AtKafkaEvent):
        """Embed a post's text and upsert it keyed by its ``at://`` URI.

        Posts without a record, or whose text is missing, not a string, or
        whitespace-only, are skipped without recording a metric.
        """
        record = event.operation.record
        # Same guard as for profiles: no record body means nothing to index.
        if record is None:
            return

        text = record.get("text")
        if not isinstance(text, str) or not text.strip():
            return

        status = "error"
        try:
            vector = EMBEDDING_SERVICE.encode(text)

            QDRANT_SERVICE.upsert_post(
                did=event.did,
                uri=f"at://{event.did}/app.bsky.feed.post/{event.operation.rkey}",
                text=text,
                vector=vector,
            )
            status = "ok"
        except Exception as e:
            logger.exception("Error handling post: %s", e)
        finally:
            prom_metrics.events_handled.labels(kind="post", status=status).inc()