this repo has no description
at main 7.0 kB view raw
1import asyncio 2import logging 3from datetime import datetime, timedelta 4from typing import Any, Dict, Optional 5 6import httpx 7 8from config import Config 9 10logger = logging.getLogger(__name__) 11 12 13class OzoneClient: 14 """ 15 An Ozone client that I took out of Osprey, but uses asyncio instead of gevent for 16 concurrency. In particular, it will manage a session and keep it refreshed. Removed 17 some of the features like repo fetching since we don't need them here. 18 """ 19 20 _instance: Optional["OzoneClient"] = None 21 _init_lock = asyncio.Lock() 22 23 def __init__(self, config: Config): 24 self._session_lock = asyncio.Lock() 25 26 self._access_jwt: Optional[str] = None 27 self._refresh_jwt: Optional[str] = None 28 29 self._last_refresh: Optional[datetime] = None 30 self._refresh_interval = timedelta(minutes=30) 31 32 self._did = config.ozone_did 33 self._identifier = config.ozone_identifier 34 self._password = config.ozone_password 35 self._pds_url = config.ozone_pds_url 36 self._request_timeout = 30.0 37 38 self._http_client: Optional[httpx.AsyncClient] = None 39 40 if not self._identifier or not self._password: 41 raise ValueError("OZONE_IDENTIFIER and OZONE_PASSWORD must be set") 42 43 async def _ensure_http_client(self) -> httpx.AsyncClient: 44 if self._http_client is None: 45 self._http_client = httpx.AsyncClient(timeout=self._request_timeout) 46 return self._http_client 47 48 @classmethod 49 async def get_instance(cls, config: Config) -> "OzoneClient": 50 if cls._instance is None: 51 async with cls._init_lock: 52 if cls._instance is None: 53 cls._instance = cls(config) 54 await cls._instance._create_session() 55 return cls._instance 56 57 async def _create_session(self): 58 async with self._session_lock: 59 try: 60 client = await self._ensure_http_client() 61 resp = await client.post( 62 f"{self._pds_url}/xrpc/com.atproto.server.createSession", 63 json={ 64 "identifier": self._identifier, 65 "password": self._password, 66 }, 67 ) 68 resp.raise_for_status() 69 data = resp.json() 70 71 self._access_jwt = data["accessJwt"] 72 self._refresh_jwt = data["refreshJwt"] 73 self._did = data["did"] 74 self._last_refresh = datetime.now() 75 76 logger.info(f"Created Bluesky session for DID: {self._did}") 77 except Exception as e: 78 logger.error(f"Failed to create Bluesky session: {e}") 79 raise 80 81 async def _refresh_session(self): 82 async with self._session_lock: 83 try: 84 client = await self._ensure_http_client() 85 resp = await client.post( 86 f"{self._pds_url}/xrpc/com.atproto.server.refreshSession", 87 headers={"authorization": f"Bearer {self._refresh_jwt}"}, 88 ) 89 resp.raise_for_status() 90 data = resp.json() 91 92 self._access_jwt = data["accessJwt"] 93 self._refresh_jwt = data["refreshJwt"] 94 self._last_refresh = datetime.now() 95 96 logger.debug(f"Refreshed Bluesky session for DID: {self._did}") 97 except Exception as e: 98 logger.error(f"Failed to refresh session: {e}") 99 logger.info("Attempting to create new session after refresh failure") 100 await self._create_session() 101 102 async def _ensure_valid_session(self): 103 if ( 104 self._last_refresh is None 105 or datetime.now() - self._last_refresh >= self._refresh_interval 106 ): 107 await self._refresh_session() 108 109 async def get_auth_headers(self) -> Dict[str, str]: 110 await self._ensure_valid_session() 111 112 return { 113 "accept": "*/*", 114 "content-type": "application/json", 115 "authorization": f"Bearer {self._access_jwt}", 116 } 117 118 async def _get_moderation_headers(self) -> Dict[str, str]: 119 await self._ensure_valid_session() 120 121 labeler_did = self._did 122 123 return { 124 "accept": "*/*", 125 "content-type": "application/json", 126 "authorization": f"Bearer {self._access_jwt}", 127 "atproto-accept-labelers": f"{labeler_did};redact", 128 "atproto-proxy": f"{labeler_did}#atproto_labeler", 129 } 130 131 def get_did(self) -> Optional[str]: 132 return self._did 133 134 def get_pds_url(self) -> str: 135 return self._pds_url 136 137 async def add_or_remove_label( 138 self, 139 action_id: int, 140 entity_id: str, 141 label: str, 142 neg: bool, 143 comment: Optional[str] = None, 144 expiration_in_hours: Optional[int] = None, 145 cid: Optional[str] = None, 146 ): 147 try: 148 subject: Dict[str, str] = {} 149 if entity_id.startswith("did:"): 150 subject["$type"] = "com.atproto.admin.defs#repoRef" 151 subject["did"] = entity_id 152 elif entity_id.startswith("at://"): 153 subject["$type"] = "com.atproto.repo.strongRef" 154 subject["cid"] = cid or "" 155 subject["uri"] = entity_id 156 else: 157 raise ValueError(f"Invalid entity_id format: {entity_id}") 158 159 payload: Dict[str, Any] = { 160 "subject": subject, 161 "createdBy": self._did, 162 "subjectBlobCids": [], 163 "event": { 164 "$type": "tools.ozone.moderation.defs#modEventLabel", 165 "comment": comment or "", 166 "createLabelVals": [label] if not neg else [], 167 "negateLabelVals": [label] if neg else [], 168 "durationInHours": expiration_in_hours or 0, 169 }, 170 "modTool": {"name": "osprey", "meta": {"actionId": str(action_id)}}, 171 } 172 173 headers = await self._get_moderation_headers() 174 client = await self._ensure_http_client() 175 176 resp = await client.post( 177 f"{self._pds_url}/xrpc/tools.ozone.moderation.emitEvent", 178 headers=headers, 179 json=payload, 180 ) 181 resp.raise_for_status() 182 183 action = "removed" if neg else "added" 184 logger.info(f"Successfully {action} label '{label}' for {entity_id}") 185 186 except Exception as e: 187 logger.error(f"Failed to emit label event for {entity_id}: {e}") 188 raise 189 190 async def close(self): 191 """Close the HTTP client. Call this when shutting down.""" 192 if self._http_client is not None: 193 await self._http_client.aclose() 194 self._http_client = None 195 logger.info("Closed OzoneClient HTTP connection")