A small service that creates embeddings of posts, profiles, and avatars and stores them in Qdrant.
at main 3.3 kB view raw
"""Index AT Protocol profile and post records from Kafka events into Qdrant."""

import logging

from database import QDRANT_SERVICE
from embedder import EMBEDDING_SERVICE
from retina import RETINA_CLIENT, binary_to_float_vector, hex_to_binary
from models import AtKafkaEvent
from metrics import prom_metrics


logger = logging.getLogger(__name__)


class Indexer:
    """Turn incoming repo events into Qdrant upserts.

    Profiles contribute a description embedding and an avatar hash vector;
    posts contribute a text embedding. Delete operations are ignored.
    Every indexing attempt increments ``prom_metrics.events_handled`` with a
    ``kind`` label ("profile", "avatar", "post") and a ``status`` label.
    """

    async def handle_event(self, event: AtKafkaEvent) -> None:
        """Dispatch a single event to the handler for its collection.

        Events without an operation, delete operations, and collections other
        than profiles and posts are silently ignored.
        """
        # NOTE(review): this coroutine calls the synchronous embedding, Retina
        # and Qdrant clients directly, so each call blocks the event loop while
        # it runs. If those clients do network/CPU work, consider offloading
        # (e.g. asyncio.to_thread) — confirm against the caller's concurrency model.
        operation = event.operation
        if not operation or operation.action == "delete":
            return

        if operation.collection == "app.bsky.actor.profile":
            self._handle_profile(event)
        elif operation.collection == "app.bsky.feed.post":
            self._handle_post(event)

    def _handle_profile(self, event: AtKafkaEvent) -> None:
        """Index a profile's description text and its avatar, independently.

        A failure in one part does not prevent the other from being indexed.
        """
        record = event.operation.record
        # Guard against events carrying no record; previously this raised
        # AttributeError out of handle_event.
        if record is None:
            return

        description = record.get("description")
        if isinstance(description, str) and description.strip():
            self._index_profile_description(event.did, description)

        avatar = record.get("avatar")
        if isinstance(avatar, dict):
            self._index_avatar(event.did, avatar)

    def _index_profile_description(self, did: str, description: str) -> None:
        """Embed a non-blank profile description and upsert it for ``did``."""
        status = "error"
        try:
            vector = EMBEDDING_SERVICE.encode(description)
            QDRANT_SERVICE.upsert_profile(
                did=did, description=description, vector=vector
            )
            status = "ok"
        except Exception as e:
            # Best-effort: one bad record must not stop the consumer, but keep
            # the traceback so failures are debuggable.
            logger.exception("Error handling profile: %s", e)
        finally:
            prom_metrics.events_handled.labels(kind="profile", status=status).inc()

    def _index_avatar(self, did: str, avatar: dict) -> None:
        """Fetch the avatar's hash from Retina and upsert it as a vector.

        The blob CID is read from ``avatar["ref"]["$link"]``. A missing or
        malformed reference, or a response without a hash, is counted as
        ``status="error"``; an avatar Retina rejects as too low quality is
        counted as ``status="skipped"`` because it is an expected outcome.
        """
        status = "error"
        try:
            ref = avatar.get("ref")
            link = ref.get("$link") if isinstance(ref, dict) else None
            if not isinstance(link, str):
                return

            resp = RETINA_CLIENT.get_image_hash(did, link)

            if resp.quality_too_low:
                logger.info("avatar quality was too low")
                # Expected outcome, not a failure: keep it out of the error rate.
                status = "skipped"
                return

            if not resp.hash:
                logger.error("no hash in response")
                return

            hash_bits = hex_to_binary(resp.hash)
            vector = binary_to_float_vector(hash_bits)

            QDRANT_SERVICE.upsert_avatar(did=did, cid=link, vector=vector)
            status = "ok"
        except Exception as e:
            # Covers both the Retina lookup and the Qdrant upsert.
            logger.exception("Error handling avatar: %s", e)
        finally:
            prom_metrics.events_handled.labels(kind="avatar", status=status).inc()

    def _handle_post(self, event: AtKafkaEvent) -> None:
        """Embed a post's text and upsert it under its at:// URI.

        Posts whose ``text`` is missing, non-string, or blank are skipped
        without touching metrics.
        """
        record = event.operation.record
        text = record.get("text") if record is not None else None
        if not isinstance(text, str) or not text.strip():
            return

        status = "error"
        try:
            vector = EMBEDDING_SERVICE.encode(text)

            QDRANT_SERVICE.upsert_post(
                did=event.did,
                uri=f"at://{event.did}/app.bsky.feed.post/{event.operation.rkey}",
                text=text,
                vector=vector,
            )

            status = "ok"
        except Exception as e:
            logger.exception("Error handling post: %s", e)
        finally:
            prom_metrics.events_handled.labels(kind="post", status=status).inc()