import logging

from database import QDRANT_SERVICE
from embedder import EMBEDDING_SERVICE
from retina import RETINA_CLIENT, binary_to_float_vector, hex_to_binary
from models import AtKafkaEvent
from metrics import prom_metrics

logger = logging.getLogger(__name__)


class Indexer:
    """Index AT Protocol Kafka events into the vector store.

    Profile descriptions and post texts are embedded with ``EMBEDDING_SERVICE``;
    profile avatars are hashed via ``RETINA_CLIENT``. The resulting vectors are
    upserted through ``QDRANT_SERVICE``. Each attempted item increments
    ``prom_metrics.events_handled`` labelled with its ``kind`` (``profile``,
    ``avatar`` or ``post``) and a ``status`` of ``ok`` or ``error``.
    """

    def __init__(self) -> None:
        pass

    async def handle_event(self, event: AtKafkaEvent) -> None:
        """Dispatch ``event`` to the handler for its record collection.

        Events without an operation, delete operations, and collections other
        than ``app.bsky.actor.profile`` / ``app.bsky.feed.post`` are ignored.
        """
        operation = event.operation
        if not operation:
            return
        if operation.action == "delete":
            return
        # NOTE(review): the handlers below call the embedding, Qdrant and
        # Retina clients synchronously, so this coroutine blocks the event
        # loop while they run. Consider offloading (e.g. asyncio.to_thread)
        # once those clients are confirmed to be thread-safe.
        if operation.collection == "app.bsky.actor.profile":
            self._handle_profile(event)
        elif operation.collection == "app.bsky.feed.post":
            self._handle_post(event)

    def _handle_profile(self, event: AtKafkaEvent) -> None:
        """Index a profile record's description text and avatar image.

        The two parts are handled independently: a failure indexing the
        description does not prevent the avatar from being indexed.
        """
        record = event.operation.record
        if record is None:
            # Nothing to index; previously this raised AttributeError.
            return

        description = record.get("description")
        # str.strip() is empty (falsy) for "" and whitespace-only strings.
        if isinstance(description, str) and description.strip():
            self._index_description(event.did, description)

        avatar = record.get("avatar")
        if isinstance(avatar, dict):
            self._index_avatar(event.did, avatar)

    def _index_description(self, did: str, description: str) -> None:
        """Embed a profile description and upsert it; record a ``profile`` metric."""
        status = "error"
        try:
            vector = EMBEDDING_SERVICE.encode(description)
            QDRANT_SERVICE.upsert_profile(
                did=did, description=description, vector=vector
            )
            status = "ok"
        except Exception as e:
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception("Error handling profile: %s", e)
        finally:
            prom_metrics.events_handled.labels(kind="profile", status=status).inc()

    def _index_avatar(self, did: str, avatar: dict) -> None:
        """Hash an avatar blob via Retina and upsert its vector.

        Records an ``avatar`` metric. Every early exit below (malformed blob
        ref, low-quality image, missing hash) is counted as ``error``.
        """
        status = "error"
        try:
            ref = avatar.get("ref")
            if not isinstance(ref, dict):
                return
            link = ref.get("$link")
            if not isinstance(link, str):
                return
            resp = RETINA_CLIENT.get_image_hash(did, link)
            if resp.quality_too_low:
                logger.info("avatar quality was too low")
                return
            if not resp.hash:
                logger.error("no hash in response")
                return
            # Renamed from `bin` to avoid shadowing the builtin.
            hash_bits = hex_to_binary(resp.hash)
            vector = binary_to_float_vector(hash_bits)
            QDRANT_SERVICE.upsert_avatar(
                did=did,
                cid=link,
                vector=vector,
            )
            status = "ok"
        except Exception as e:
            logger.exception("Failed to get avatar hash: %s", e)
        finally:
            prom_metrics.events_handled.labels(kind="avatar", status=status).inc()

    def _handle_post(self, event: AtKafkaEvent) -> None:
        """Embed a post's text and upsert it under its ``at://`` URI.

        Posts whose ``text`` is missing, not a string, or blank are skipped
        without recording a metric; otherwise a ``post`` metric is recorded.
        """
        record = event.operation.record
        if record is None:
            return
        text = record.get("text")
        if not isinstance(text, str) or not text.strip():
            return

        status = "error"
        try:
            vector = EMBEDDING_SERVICE.encode(text)
            QDRANT_SERVICE.upsert_post(
                did=event.did,
                uri=f"at://{event.did}/app.bsky.feed.post/{event.operation.rkey}",
                text=text,
                vector=vector,
            )
            status = "ok"
        except Exception as e:
            logger.exception("Error handling post: %s", e)
        finally:
            prom_metrics.events_handled.labels(kind="post", status=status).inc()