A lil service that creates embeddings of posts, profiles, and avatars and stores them in Qdrant

add prom metrics

Counters and timing histograms for embeddings, Qdrant upserts, and handled events, exposed over HTTP on port 8500.

+38 -6
database.py
```diff
···
-from dataclasses import dataclass
-from datetime import datetime, timezone
 import logging
 import sys
+import uuid
+from dataclasses import dataclass
+from datetime import datetime, timezone
+from time import time
 from typing import List, Optional
-import uuid
 
+from qdrant_client import QdrantClient
 from qdrant_client.grpc import OptimizersConfigDiff
 from qdrant_client.http.models import BinaryQuantizationConfig
-
-from config import CONFIG
-from qdrant_client import QdrantClient
 from qdrant_client.models import (
     BinaryQuantization,
     Distance,
···
     ScalarType,
     VectorParams,
 )
+
+from config import CONFIG
+from metrics import prom_metrics
 
 logger = logging.getLogger(__name__)
 
···
         logger.info("Collection created successfully")
 
     def upsert_profile(self, did: str, description: str, vector: List[float]):
+        status = "error"
+        start_time = time()
+
         try:
             payload = {
                 "did": did,
···
                 points=[point],
             )
 
+            status = "ok"
+
             return True
         except Exception as e:
             logger.error(f"Error upserting profile: {e}")
             return False
+        finally:
+            prom_metrics.upserts.labels(kind="profile", status=status).inc()
+            prom_metrics.upsert_duration.labels(kind="profile", status=status).observe(
+                time() - start_time
+            )
 
     def upsert_avatar(self, did: str, cid: str, vector: List[float]):
+        status = "error"
+        start_time = time()
+
         try:
             payload = {
                 "did": did,
···
                 collection_name=self.avatar_collection_name,
                 points=[point],
             )
+
+            status = "ok"
 
             return True
         except Exception as e:
             logger.error(f"Error upserting avatar: {e}")
             return False
+        finally:
+            prom_metrics.upserts.labels(kind="avatar", status=status).inc()
+            prom_metrics.upsert_duration.labels(kind="avatar", status=status).observe(
+                time() - start_time
+            )
 
     def upsert_post(self, did: str, uri: str, text: str, vector: List[float]):
+        status = "error"
+        start_time = time()
+
         word_ct = len(text.split())
 
         try:
···
                 points=[point],
             )
 
+            status = "ok"
+
             return True
         except Exception as e:
             logger.error(f"Error upserting post: {e}")
             return False
+        finally:
+            prom_metrics.upserts.labels(kind="post", status=status).inc()
+            prom_metrics.upsert_duration.labels(kind="post", status=status).observe(
+                time() - start_time
+            )
 
     def search_similar(
         self,
```
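The same try/finally instrumentation is now repeated verbatim in all three upsert methods, with only the `kind` label changing — exactly the kind of copy-paste that lets labels drift out of sync. A possible follow-up, sketched here under an assumed name (`instrument_upsert` is hypothetical, not part of this diff), would factor it into a decorator:

```python
from functools import wraps
from time import time

from metrics import prom_metrics


def instrument_upsert(kind: str):
    """Hypothetical decorator factoring out the repeated upsert instrumentation."""

    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            status = "error"
            start_time = time()
            try:
                # The upsert methods return True on success and False on failure.
                ok = fn(*args, **kwargs)
                status = "ok" if ok else "error"
                return ok
            finally:
                prom_metrics.upserts.labels(kind=kind, status=status).inc()
                prom_metrics.upsert_duration.labels(kind=kind, status=status).observe(
                    time() - start_time
                )

        return wrapper

    return decorator
```

Each method would then shrink to its payload logic (e.g. `@instrument_upsert("post")` above `upsert_post`), and the `kind` label is stated exactly once per method.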
+16 -2
embedder.py
```diff
···
 import logging
 from typing import List
+from time import time
 
 from sentence_transformers import SentenceTransformer
 import torch
 
 from config import CONFIG
+from metrics import prom_metrics
 
 
 logger = logging.getLogger(__name__)
···
         if not text or not text.strip():
             return [0.0] * CONFIG.embedding_size
 
-        vector = self.model.encode(text, convert_to_numpy=True)
-        return vector.tolist()
+        status = "error"
+        start_time = time()
+        try:
+            vector = self.model.encode(text, convert_to_numpy=True)
+            status = "ok"
+            return vector.tolist()
+        except Exception as e:
+            logger.error(f"Error getting embedding: {e}")
+            raise
+        finally:
+            prom_metrics.embedding_performed.labels(status=status).inc()
+            prom_metrics.embedding_duration.labels(status=status).observe(
+                time() - start_time
+            )
 
 
 EMBEDDING_SERVICE = EmbeddingService()
```
+58 -43
indexer.py
```diff
···
 from embedder import EMBEDDING_SERVICE
 from retina import RETINA_CLIENT, binary_to_float_vector, hex_to_binary
 from models import AtKafkaEvent
+from metrics import prom_metrics
 
 
 logger = logging.getLogger(__name__)
···
         record = event.operation.record
 
         description = record.get("description")
-        if not isinstance(description, str):
-            return
+        if isinstance(description, str) and description and description.strip():
+            status = "error"
 
-        if not description or not description.strip():
-            return
+            try:
+                vector = EMBEDDING_SERVICE.encode(description)
 
-        vector = EMBEDDING_SERVICE.encode(description)
+                QDRANT_SERVICE.upsert_profile(
+                    did=event.did, description=description, vector=vector
+                )
 
-        QDRANT_SERVICE.upsert_profile(
-            did=event.did, description=description, vector=vector
-        )
+                status = "ok"
+            except Exception as e:
+                logger.error(f"Error handling profile: {e}")
+            finally:
+                prom_metrics.events_handled.labels(kind="profile", status=status).inc()
 
-        try:
-            avatar = record.get("avatar")
+        avatar = record.get("avatar")
+        if isinstance(avatar, dict):
+            status = "error"
+            try:
+                ref = avatar.get("ref")
 
-            if not isinstance(avatar, dict):
-                return
+                if not isinstance(ref, dict):
+                    return
 
-            ref = avatar.get("ref")
+                link = ref.get("$link")
+                if not isinstance(link, str):
+                    return
 
-            if not isinstance(ref, dict):
-                return
+                resp = RETINA_CLIENT.get_image_hash(event.did, link)
 
-            link = ref.get("$link")
-            if not isinstance(link, str):
-                return
+                if resp.quality_too_low:
+                    logger.info("avatar quality was too low")
+                    return
 
-            resp = RETINA_CLIENT.get_image_hash(event.did, link)
-        except Exception as e:
-            logger.error(f"Failed to get avatar hash: {e}")
-            return
+                if not resp.hash:
+                    logger.error("no hash in response")
+                    return
 
-        if resp.quality_too_low:
-            logger.info("avatar quality was too low")
-            return
+                bin = hex_to_binary(resp.hash)
+                vector = binary_to_float_vector(bin)
 
-        if not resp.hash:
-            logger.error("no hash in response")
-            return
+                QDRANT_SERVICE.upsert_avatar(
+                    did=event.did,
+                    cid=link,
+                    vector=vector,
+                )
 
-        bin = hex_to_binary(resp.hash)
-        vector = binary_to_float_vector(bin)
-
-        QDRANT_SERVICE.upsert_avatar(
-            did=event.did,
-            cid=link,
-            vector=vector,
-        )
+                status = "ok"
+            except Exception as e:
+                logger.error(f"Failed to get avatar hash: {e}")
+            finally:
+                prom_metrics.events_handled.labels(kind="avatar", status=status).inc()
 
     def _handle_post(self, event: AtKafkaEvent):
         text = event.operation.record.get("text")
···
         if not text or not text.strip():
             return
 
-        vector = EMBEDDING_SERVICE.encode(text)
+        status = "error"
+
+        try:
+            vector = EMBEDDING_SERVICE.encode(text)
+
+            QDRANT_SERVICE.upsert_post(
+                did=event.did,
+                uri=f"at://{event.did}/app.bsky.feed.post/{event.operation.rkey}",
+                text=text,
+                vector=vector,
+            )
 
-        QDRANT_SERVICE.upsert_post(
-            did=event.did,
-            uri=f"at://{event.did}/app.bsky.feed.post/{event.operation.rkey}",
-            text=text,
-            vector=vector,
-        )
+            status = "ok"
+        except Exception as e:
+            logger.error(f"Error handling post: {e}")
+        finally:
+            prom_metrics.events_handled.labels(kind="post", status=status).inc()
```
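Since every metric lives in `prometheus_client`'s default registry, the new `events_handled` series can be asserted on directly in tests via `CollectorRegistry.get_sample_value`. A minimal sketch (the fake event and handler invocation are assumed, not shown here):

```python
from prometheus_client import REGISTRY

# ... run the indexer over a fake post event first ...

ok_posts = REGISTRY.get_sample_value(
    "skyembed_events_handled_total",  # Counter "events_handled" in the "skyembed" namespace
    {"kind": "post", "status": "ok"},
)
assert ok_posts == 1.0
```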
+6 -3
main.py
```diff
···
-import logging
 import asyncio
-import sys
-import signal
 import json
+import logging
+import signal
+import sys
 from typing import Optional
 
 import click
···
 from database import QDRANT_SERVICE
 from embedder import EMBEDDING_SERVICE
 from indexer import Indexer
+from metrics import prom_metrics
 from models import AtKafkaEvent
 
 shutdown_requested = False
···
 ):
     signal.signal(signal.SIGINT, signal_handler)
     signal.signal(signal.SIGTERM, signal_handler)
+
+    prom_metrics.start_http(8500, "0.0.0.0")
 
     EMBEDDING_SERVICE.initialize()
```
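With `start_http` wired in before the consumer starts, the exporter listens on port 8500. A quick local sanity check (assuming the service is running on the same host):

```python
from urllib.request import urlopen

# Fetch the Prometheus exposition payload and print only this service's series.
body = urlopen("http://localhost:8500/metrics", timeout=5).read().decode()
for line in body.splitlines():
    if line.startswith("skyembed_"):
        print(line)
```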
+94
metrics.py
```diff
···
+import logging
+
+from prometheus_client import Counter, Histogram, start_http_server
+
+NAMESPACE = "skyembed"
+
+logger = logging.getLogger(__name__)
+
+
+class PromMetrics:
+    _instance = None
+
+    def __new__(cls):
+        if cls._instance is None:
+            cls._instance = super().__new__(cls)
+            cls._instance._initialized = False
+        return cls._instance
+
+    def __init__(self):
+        if self._initialized:
+            return
+
+        self.embedding_performed = Counter(
+            name="embeddings_performed",
+            namespace=NAMESPACE,
+            documentation="Number of embeddings performed",
+            labelnames=["status"],
+        )
+
+        self.embedding_duration = Histogram(
+            name="embedding_duration_seconds",
+            namespace=NAMESPACE,
+            buckets=(
+                0.001,
+                0.005,
+                0.01,
+                0.025,
+                0.05,
+                0.1,
+                0.25,
+                0.5,
+                1.0,
+                2.5,
+                5.0,
+                10.0,
+            ),
+            labelnames=["status"],
+            documentation="Time taken to create an embedding",
+        )
+
+        self.events_handled = Counter(
+            name="events_handled",
+            namespace=NAMESPACE,
+            documentation="Number of events handled",
+            labelnames=["kind", "status"],
+        )
+
+        self.upserts = Counter(
+            name="upserts",
+            namespace=NAMESPACE,
+            documentation="Number of database upserts",
+            labelnames=["kind", "status"],
+        )
+
+        self.upsert_duration = Histogram(
+            name="upsert_duration_seconds",
+            namespace=NAMESPACE,
+            buckets=(
+                0.001,
+                0.005,
+                0.01,
+                0.025,
+                0.05,
+                0.1,
+                0.25,
+                0.5,
+                1.0,
+                2.5,
+                5.0,
+                10.0,
+            ),
+            labelnames=["kind", "status"],
+            documentation="Time taken to perform an upsert",
+        )
+
+        self._initialized = True
+
+    def start_http(self, port: int, addr: str = "0.0.0.0"):
+        logger.info(f"Starting Prometheus client on {addr}:{port}")
+        start_http_server(port=port, addr=addr)
+        logger.info(f"Prometheus client running on {addr}:{port}")
+
+
+prom_metrics = PromMetrics()
```
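The `__new__`/`_initialized` guard exists because `prometheus_client` registers every metric in the process-global default registry; constructing `PromMetrics` a second time without it would try to re-register the same names. A minimal reproduction of the failure the singleton prevents:

```python
from prometheus_client import Counter

Counter(name="upserts", namespace="skyembed", documentation="demo", labelnames=["kind", "status"])
try:
    # Second registration of the same metric name in the default registry fails.
    Counter(name="upserts", namespace="skyembed", documentation="demo", labelnames=["kind", "status"])
except ValueError as err:
    print(err)  # Duplicated timeseries in CollectorRegistry: ...
```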
+1
pyproject.toml
```diff
···
     "aiokafka>=0.12.0",
     "click>=8.3.1",
     "confluent-kafka>=2.12.2",
+    "prometheus-client>=0.23.1",
     "pydantic>=2.12.5",
     "pydantic-settings>=2.12.0",
     "python-snappy>=0.7.3",
```
+11
uv.lock
```diff
···
 ]
 
 [[package]]
+name = "prometheus-client"
+version = "0.23.1"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/23/53/3edb5d68ecf6b38fcbcc1ad28391117d2a322d9a1a3eff04bfdb184d8c3b/prometheus_client-0.23.1.tar.gz", hash = "sha256:6ae8f9081eaaaf153a2e959d2e6c4f4fb57b12ef76c8c7980202f1e57b48b2ce", size = 80481, upload-time = "2025-09-18T20:47:25.043Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/b8/db/14bafcb4af2139e046d03fd00dea7873e48eafe18b7d2797e73d6681f210/prometheus_client-0.23.1-py3-none-any.whl", hash = "sha256:dd1913e6e76b59cfe44e7a4b83e01afc9873c1bdfd2ed8739f1e76aeca115f99", size = 61145, upload-time = "2025-09-18T20:47:23.875Z" },
+]
+
+[[package]]
 name = "protobuf"
 version = "6.33.2"
 source = { registry = "https://pypi.org/simple" }
···
     { name = "aiokafka" },
     { name = "click" },
     { name = "confluent-kafka" },
+    { name = "prometheus-client" },
     { name = "pydantic" },
     { name = "pydantic-settings" },
     { name = "python-snappy" },
···
     { name = "aiokafka", specifier = ">=0.12.0" },
     { name = "click", specifier = ">=8.3.1" },
     { name = "confluent-kafka", specifier = ">=2.12.2" },
+    { name = "prometheus-client", specifier = ">=0.23.1" },
     { name = "pydantic", specifier = ">=2.12.5" },
     { name = "pydantic-settings", specifier = ">=2.12.0" },
     { name = "python-snappy", specifier = ">=0.7.3" },
```