this repo has no description
1import asyncio
2import logging
3from datetime import datetime, timedelta
4from typing import Any, Dict, Optional
5
6import httpx
7
8from config import Config
9
10logger = logging.getLogger(__name__)
11
12
class OzoneClient:
    """
    Asyncio-based Ozone client, adapted from the gevent-based client in Osprey.

    The client logs in against the configured PDS, keeps the resulting session
    refreshed, and emits label moderation events. Features that are not needed
    here (such as repo fetching) were removed.

    Use :meth:`get_instance` to obtain the shared, already logged-in instance.
    """

    _instance: Optional["OzoneClient"] = None
    _init_lock = asyncio.Lock()

    def __init__(self, config: "Config"):
        """
        Store credentials and settings from ``config``; performs no network I/O.

        Raises:
            ValueError: if the identifier or the password is missing/empty.
        """
        # Guards every read-modify-write of the session tokens.
        # asyncio.Lock is NOT reentrant: code that already holds it must call
        # the ``*_unlocked`` helpers instead of the lock-taking wrappers.
        self._session_lock = asyncio.Lock()

        self._access_jwt: Optional[str] = None
        self._refresh_jwt: Optional[str] = None

        self._last_refresh: Optional[datetime] = None
        self._refresh_interval = timedelta(minutes=30)

        self._did = config.ozone_did
        self._identifier = config.ozone_identifier
        self._password = config.ozone_password
        self._pds_url = config.ozone_pds_url
        self._request_timeout = 30.0

        # Created lazily by _ensure_http_client() and dropped again by close().
        self._http_client: Optional["httpx.AsyncClient"] = None

        if not self._identifier or not self._password:
            raise ValueError("OZONE_IDENTIFIER and OZONE_PASSWORD must be set")

    async def _ensure_http_client(self) -> "httpx.AsyncClient":
        """Return the shared HTTP client, creating it on first use."""
        if self._http_client is None:
            self._http_client = httpx.AsyncClient(timeout=self._request_timeout)
        return self._http_client

    @classmethod
    async def get_instance(cls, config: "Config") -> "OzoneClient":
        """
        Return the process-wide client, creating and logging it in on first use.

        The instance is only cached once its session was created successfully,
        so a failed login is retried on the next call instead of leaving a
        session-less client behind.

        Raises:
            ValueError: if ``config`` lacks the identifier or the password.
            Exception: whatever session creation raised (already logged).
        """
        if cls._instance is None:
            async with cls._init_lock:
                # Re-check: another task may have finished initialising while
                # this one was waiting for the lock.
                if cls._instance is None:
                    instance = cls(config)
                    try:
                        await instance._create_session()
                    except Exception:
                        # Do not leak the HTTP client of a discarded instance.
                        await instance.close()
                        raise
                    cls._instance = instance
        return cls._instance

    def _session_is_stale(self) -> bool:
        """Return True when no session exists yet or the refresh interval has elapsed."""
        return (
            self._last_refresh is None
            or datetime.now() - self._last_refresh >= self._refresh_interval
        )

    async def _create_session_unlocked(self) -> None:
        """
        Log in with identifier/password and store the returned tokens and DID.

        The caller must already hold ``self._session_lock``.

        Raises:
            Exception: any failure of the request or of parsing its response;
                it is logged and re-raised, and the stored session is left
                untouched.
        """
        try:
            client = await self._ensure_http_client()
            resp = await client.post(
                f"{self._pds_url}/xrpc/com.atproto.server.createSession",
                json={
                    "identifier": self._identifier,
                    "password": self._password,
                },
            )
            resp.raise_for_status()
            data = resp.json()

            # Pull every field out before assigning anything, so a malformed
            # response cannot leave a half-updated session behind.
            access_jwt = data["accessJwt"]
            refresh_jwt = data["refreshJwt"]
            did = data["did"]
        except Exception as e:
            logger.error("Failed to create Bluesky session: %s", e)
            raise

        self._access_jwt = access_jwt
        self._refresh_jwt = refresh_jwt
        self._did = did
        self._last_refresh = datetime.now()

        logger.info("Created Bluesky session for DID: %s", self._did)

    async def _refresh_session_unlocked(self) -> None:
        """
        Refresh the session, falling back to a fresh login if that fails.

        The caller must already hold ``self._session_lock``.

        Raises:
            Exception: only if the fallback login fails as well.
        """
        if self._refresh_jwt is None:
            # Nothing to refresh with; a refresh request could only fail.
            await self._create_session_unlocked()
            return

        try:
            client = await self._ensure_http_client()
            resp = await client.post(
                f"{self._pds_url}/xrpc/com.atproto.server.refreshSession",
                headers={"authorization": f"Bearer {self._refresh_jwt}"},
            )
            resp.raise_for_status()
            data = resp.json()

            access_jwt = data["accessJwt"]
            refresh_jwt = data["refreshJwt"]
        except Exception as e:
            logger.error("Failed to refresh session: %s", e)
            logger.info("Attempting to create new session after refresh failure")
            # The lock is already held here, so the lock-taking
            # _create_session() would wait on itself forever.
            await self._create_session_unlocked()
            return

        self._access_jwt = access_jwt
        self._refresh_jwt = refresh_jwt
        self._last_refresh = datetime.now()

        logger.debug("Refreshed Bluesky session for DID: %s", self._did)

    async def _create_session(self):
        """Create a new session while holding the session lock."""
        async with self._session_lock:
            await self._create_session_unlocked()

    async def _refresh_session(self):
        """Refresh the session (or log in again) while holding the session lock."""
        async with self._session_lock:
            await self._refresh_session_unlocked()

    async def _ensure_valid_session(self):
        """Refresh the session if it is missing or older than the refresh interval."""
        if not self._session_is_stale():
            return
        async with self._session_lock:
            # Re-check under the lock so that tasks which queued up behind a
            # refresh in progress do not each refresh again.
            if self._session_is_stale():
                await self._refresh_session_unlocked()

    async def get_auth_headers(self) -> Dict[str, str]:
        """Return JSON request headers carrying the current access token."""
        await self._ensure_valid_session()

        return {
            "accept": "*/*",
            "content-type": "application/json",
            "authorization": f"Bearer {self._access_jwt}",
        }

    async def _get_moderation_headers(self) -> Dict[str, str]:
        """
        Return the auth headers plus the labeler headers for moderation calls.

        The session DID is used as the labeler DID for both extra headers.
        """
        headers = await self.get_auth_headers()

        labeler_did = self._did
        headers["atproto-accept-labelers"] = f"{labeler_did};redact"
        headers["atproto-proxy"] = f"{labeler_did}#atproto_labeler"
        return headers

    def get_did(self) -> Optional[str]:
        """Return the DID from the config, or from the session once one exists."""
        return self._did

    def get_pds_url(self) -> str:
        """Return the PDS base URL taken from the config."""
        return self._pds_url

    async def add_or_remove_label(
        self,
        action_id: int,
        entity_id: str,
        label: str,
        neg: bool,
        comment: Optional[str] = None,
        expiration_in_hours: Optional[int] = None,
        cid: Optional[str] = None,
    ):
        """
        Emit a label moderation event that adds or negates ``label``.

        Args:
            action_id: sent as a string in the event's ``modTool`` metadata.
            entity_id: subject of the event; a ``did:...`` value is sent as a
                repo reference, an ``at://...`` URI as a strong reference.
            label: the label value to create or negate.
            neg: True to negate (remove) the label, False to create it.
            comment: optional event comment; sent as "" when omitted.
            expiration_in_hours: optional duration; sent as 0 when omitted.
            cid: CID for ``at://`` subjects; sent as "" when omitted.

        Raises:
            ValueError: if ``entity_id`` is neither a DID nor an ``at://`` URI.
            Exception: any failure while sending the event. All failures,
                including the ValueError above, are logged and re-raised.
        """
        try:
            subject: Dict[str, str] = {}
            if entity_id.startswith("did:"):
                subject["$type"] = "com.atproto.admin.defs#repoRef"
                subject["did"] = entity_id
            elif entity_id.startswith("at://"):
                subject["$type"] = "com.atproto.repo.strongRef"
                subject["cid"] = cid or ""
                subject["uri"] = entity_id
            else:
                raise ValueError(f"Invalid entity_id format: {entity_id}")

            payload: Dict[str, Any] = {
                "subject": subject,
                "createdBy": self._did,
                "subjectBlobCids": [],
                "event": {
                    "$type": "tools.ozone.moderation.defs#modEventLabel",
                    "comment": comment or "",
                    # Exactly one of the two lists carries the label.
                    "createLabelVals": [] if neg else [label],
                    "negateLabelVals": [label] if neg else [],
                    "durationInHours": expiration_in_hours or 0,
                },
                "modTool": {"name": "osprey", "meta": {"actionId": str(action_id)}},
            }

            headers = await self._get_moderation_headers()
            client = await self._ensure_http_client()

            resp = await client.post(
                f"{self._pds_url}/xrpc/tools.ozone.moderation.emitEvent",
                headers=headers,
                json=payload,
            )
            resp.raise_for_status()

            action = "removed" if neg else "added"
            logger.info("Successfully %s label '%s' for %s", action, label, entity_id)

        except Exception as e:
            logger.error("Failed to emit label event for %s: %s", entity_id, e)
            raise

    async def close(self):
        """Close the HTTP client. Call this when shutting down."""
        if self._http_client is not None:
            await self._http_client.aclose()
            self._http_client = None
            logger.info("Closed OzoneClient HTTP connection")