A trust and safety agent that interacts with Osprey for investigation, real-time analysis, and implementing preventions
at main 372 lines 14 kB view raw
import asyncio
import json
import logging
import os
import tempfile
from pathlib import Path
from typing import Any

from src.tools.registry import ToolContext, ToolRegistry

logger = logging.getLogger(__name__)

# directory holding the deno runtime shim (runtime.ts) and the generated tool stubs
DENO_DIR = Path(__file__).parent / "deno"

# security limits for deno execution
MAX_CODE_SIZE = 50_000  # max input code size in characters
MAX_TOOL_CALLS = 25  # max number of tool calls per execution
MAX_OUTPUT_SIZE = 1_000_000  # max total output size in bytes
MAX_EXECUTION_TIME = 60.0  # total wall-clock timeout in seconds
DENO_MEMORY_LIMIT_MB = 256  # v8 heap limit


class ToolExecutor:
    """Executor that runs TypeScript code in a sandboxed deno subprocess.

    Tool calls made from the TypeScript side are bridged back to Python over
    the subprocess's stdin/stdout as line-delimited JSON messages.
    """

    def __init__(self, registry: ToolRegistry, ctx: ToolContext) -> None:
        self._registry = registry
        self._ctx = ctx
        # prefetched context strings, populated best-effort by initialize();
        # None means the corresponding fetch failed or hasn't run yet
        self._database_schema: str | None = None
        self._osprey_features: str | None = None
        self._osprey_labels: str | None = None
        self._osprey_udfs: str | None = None
        self._osprey_rule_files: str | None = None
        # memoized result of get_execute_code_tool_definition()
        self._tool_definition: dict[str, Any] | None = None

    async def initialize(self) -> None:
        # prefetch data for inclusion in the tool description, so that the agent
        # doesn't waste tool calls on discovery. each prefetch is best-effort:
        # failures are logged and the matching section is simply omitted.
        try:
            schema = await self._registry.execute(self._ctx, "clickhouse.getSchema", {})
            lines = [f" {col['name']} ({col['type']})" for col in schema]
            self._database_schema = "\n".join(lines)
            logger.info("Prefetched database schema (%d columns)", len(schema))
        except Exception:
            logger.warning("Failed to prefetch database schema", exc_info=True)

        try:
            config = await self._registry.execute(self._ctx, "osprey.getConfig", {})
            feature_lines = [
                f" {name}: {ftype}" for name, ftype in config["features"].items()
            ]
            self._osprey_features = "\n".join(feature_lines)

            label_lines = [
                f" {label['name']}: {label['description']} (valid for: {label['valid_for']})"
                for label in config["labels"]
            ]
            self._osprey_labels = "\n".join(label_lines)

            logger.info(
                "Prefetched osprey config (%d features, %d labels)",
                len(config["features"]),
                len(config["labels"]),
            )
        except Exception:
            logger.warning("Failed to prefetch osprey config", exc_info=True)

        try:
            udfs = await self._registry.execute(self._ctx, "osprey.getUdfs", {})
            udf_lines: list[str] = []
            for cat in udfs["categories"]:
                udf_lines.append(f"## {cat['name']}")
                for udf in cat["udfs"]:
                    udf_lines.append(f" {udf['signature']}")
                    if udf.get("doc"):
                        # only the first doc line, to keep the description compact
                        first_line = udf["doc"].strip().split("\n")[0]
                        udf_lines.append(f" {first_line}")
            self._osprey_udfs = "\n".join(udf_lines)
            logger.info("Prefetched osprey UDFs")
        except Exception:
            logger.warning("Failed to prefetch osprey UDFs", exc_info=True)

        try:
            result = await self._registry.execute(self._ctx, "osprey.listRuleFiles", {})
            self._osprey_rule_files = "\n".join(f" {f}" for f in result["files"])
            logger.info("Prefetched osprey rule files (%d files)", len(result["files"]))
        except Exception:
            logger.warning("Failed to prefetch osprey rule files", exc_info=True)

    async def execute_code(self, code: str) -> dict[str, Any]:
        """
        execute Typescript code in a deno subprocess.

        code has access to tools defined in the registry via the generated typescript
        stubs. calls are bridged to python via stdin/out

        Returns a dict with keys: "success" (bool), "debug" (list of messages),
        and optionally "output" and "error".
        """

        if len(code) > MAX_CODE_SIZE:
            return {
                "success": False,
                "error": f"code too large ({len(code)} chars, max {MAX_CODE_SIZE})",
                "debug": [],
            }

        self._write_generated_tools()

        # the temp file must live inside DENO_DIR so that the sandbox's read
        # allowlist (--allow-read=DENO_DIR) covers it and the relative imports
        # of runtime.ts / tools.ts resolve
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".ts", delete=False, dir=DENO_DIR
        ) as f:
            # start by adding all the imports that we need...
            full_code = f"""
import {{ output, debug }} from "./runtime.ts";
import * as tools from "./tools.ts";
export {{ tools }};

{code}
"""
            f.write(full_code)
            temp_path = f.name

        try:
            return await self._run_deno(temp_path)
        finally:
            os.unlink(temp_path)

    def _write_generated_tools(self) -> None:
        """generate tool stubs and write them to the deno directory"""

        tools_ts = self._registry.generate_typescript_types()
        tools_path = DENO_DIR / "tools.ts"
        tools_path.write_text(tools_ts)

    @staticmethod
    def _kill_process(process: asyncio.subprocess.Process) -> None:
        """kill a subprocess, ignoring errors if it's already dead"""
        try:
            process.kill()
        except ProcessLookupError:
            pass

    async def _run_deno(self, script_path: str) -> dict[str, Any]:
        """run the input script in a deno subprocess"""

        # spawn a subprocess that executes deno with minimal permissions. explicit deny flags
        # ensure these can't be escalated via dynamic imports or permission prompts.
        deno_read_path = str(DENO_DIR)
        process = await asyncio.create_subprocess_exec(
            "deno",
            "run",
            f"--allow-read={deno_read_path}",
            "--deny-write",
            "--deny-net",
            "--deny-run",
            "--deny-env",
            "--deny-ffi",
            "--deny-sys",
            "--no-prompt",
            "--no-remote",
            "--no-npm",
            f"--v8-flags=--max-old-space-size={DENO_MEMORY_LIMIT_MB}",
            script_path,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        assert process.stdin is not None
        assert process.stdout is not None
        assert process.stderr is not None

        outputs: list[Any] = []
        debug_messages: list[str] = []
        error: str | None = None
        tool_call_count = 0
        total_output_bytes = 0
        loop = asyncio.get_running_loop()
        deadline = loop.time() + MAX_EXECUTION_TIME

        try:
            while True:
                # calculate remaining time against the total execution deadline
                remaining = deadline - loop.time()
                if remaining <= 0:
                    self._kill_process(process)
                    error = f"execution timed out after {MAX_EXECUTION_TIME:.0f} seconds (total)"
                    break

                # read next line with the lesser of 30s or remaining wall-clock time
                read_timeout = min(30.0, remaining)
                line = await asyncio.wait_for(
                    process.stdout.readline(), timeout=read_timeout
                )

                # if there are no more lines we're finished...
                if not line:
                    break

                line_str = line.decode().strip()
                if not line_str:
                    continue

                # track total output size to prevent stdout flooding
                total_output_bytes += len(line)
                if total_output_bytes > MAX_OUTPUT_SIZE:
                    self._kill_process(process)
                    error = f"output exceeded {MAX_OUTPUT_SIZE} bytes, killed"
                    break

                try:
                    message = json.loads(line_str)
                except json.JSONDecodeError:
                    # non-JSON output (e.g. stray console.log) is kept as debug
                    debug_messages.append(line_str)
                    continue

                # whenever we encounter a tool call, we then need to execute that tool and give
                # it the response
                if "__tool_call__" in message:
                    tool_call_count += 1
                    if tool_call_count > MAX_TOOL_CALLS:
                        self._kill_process(process)
                        error = f"exceeded maximum of {MAX_TOOL_CALLS} tool calls"
                        break

                    tool_name = message["tool"]
                    params = message["params"]
                    logger.info("Tool call: %s with params: %s", tool_name, params)

                    try:
                        tool_result = await self._registry.execute(
                            self._ctx, tool_name, params
                        )
                        # default=str: tool results may contain non-JSON types
                        # (datetimes etc.); stringify rather than fail
                        response = json.dumps(
                            {"__tool_result__": tool_result}, default=str
                        )
                    except Exception as e:
                        logger.exception("Tool error: %s", tool_name)
                        response = json.dumps({"__tool_error__": str(e)})

                    try:
                        process.stdin.write((response + "\n").encode())
                        await process.stdin.drain()
                    except (ConnectionResetError, BrokenPipeError):
                        error = f"deno process exited while sending tool result for {tool_name}"
                        break

                elif "__output__" in message:
                    outputs.append(message["__output__"])

                elif "__debug__" in message:
                    debug_messages.append(message["__debug__"])

                else:
                    debug_messages.append(line_str)

        # make sure that we kill deno subprocess if the execution times out
        except asyncio.TimeoutError:
            self._kill_process(process)
            error = "execution timed out"
        # also kill it for any other exceptions we encounter
        except Exception as e:
            self._kill_process(process)
            error = str(e)

        # drain stderr BEFORE waiting: if the child has filled its stderr pipe
        # buffer it blocks on write and wait() would deadlock. read() runs to
        # EOF, which also implies the child has exited, so wait() is then safe.
        stderr_content = await process.stderr.read()
        await process.wait()

        if stderr_content:
            stderr_str = stderr_content.decode().strip()
            if stderr_str:
                if error:
                    error += f"\n\nStderr:\n{stderr_str}"
                else:
                    error = stderr_str

        success = process.returncode == 0 and error is None

        execution_result: dict[str, Any] = {
            "success": success,
            "debug": debug_messages,
        }

        if outputs:
            # a single output() call returns its value directly; multiple calls
            # return the list of values
            execution_result["output"] = outputs[-1] if len(outputs) == 1 else outputs

        if error:
            execution_result["error"] = error

        return execution_result

    def get_execute_code_tool_definition(self) -> dict[str, Any]:
        """get tool definition for execute_code, including all the docs for available backend tools"""

        # cached: the prefetched sections don't change after initialize()
        if self._tool_definition is not None:
            return self._tool_definition

        tool_docs = self._registry.generate_tool_documentation()

        schema_section = ""
        if self._database_schema:
            schema_section = f"""

# Database Schema

The `default.osprey_execution_results` table has these columns:
{self._database_schema}

Use these exact column names when writing SQL queries. Do NOT guess column names.
"""

        osprey_section = ""
        if self._osprey_features:
            osprey_section += f"""

# Available Osprey Features

These features are available in rule conditions (pre-loaded — no need to call getConfig):
{self._osprey_features}
"""
        if self._osprey_labels:
            osprey_section += f"""
# Available Labels

Use these label names with AtprotoLabel effects:
{self._osprey_labels}
"""
        if self._osprey_udfs:
            osprey_section += f"""
# Available UDFs and Effects

These are the available functions for use in rules (pre-loaded — no need to call getUdfs):
{self._osprey_udfs}
"""
        if self._osprey_rule_files:
            osprey_section += f"""
# Existing Rule Files

These .sml files already exist in the ruleset (pre-loaded — no need to call listRuleFiles unless you need a fresh list after mutations):
{self._osprey_rule_files}
"""

        description = f"""Execute Typescript code in a sandboxed Deno runtime.

The code has access to backend tools via the `tools` namespace. Use `output()` to return results.

Example:
```typescript
// format a relative timestamp for ClickHouse DateTime64 columns
const thirtyMinAgo = new Date(Date.now() - 30 * 60 * 1000).toISOString().slice(0, 19).replace('T', ' ');

// run multiple independent queries safely with Promise.allSettled
const results = await Promise.allSettled([
 tools.clickhouse.query(`SELECT Count() as cnt FROM default.osprey_execution_results WHERE __timestamp >= parseDateTimeBestEffort('${{thirtyMinAgo}}') LIMIT 1`),
 tools.clickhouse.query(`SELECT UserId, Count() as n FROM default.osprey_execution_results WHERE __timestamp >= parseDateTimeBestEffort('${{thirtyMinAgo}}') GROUP BY UserId ORDER BY n DESC LIMIT 10`),
]);

output(results.map(r => r.status === 'fulfilled' ? r.value : r.reason?.message));
```

{tool_docs}{schema_section}{osprey_section}"""

        self._tool_definition = {
            "name": "execute_code",
            "description": description,
            "input_schema": {
                "type": "object",
                "properties": {
                    "code": {
                        "type": "string",
                        "description": "Typescript code to execute. Has access to `tools` namespace and `output()` function.",
                    }
                },
                "required": ["code"],
            },
        }
        return self._tool_definition