A trust-and-safety agent component that interacts with Osprey to run investigations, perform real-time analysis, and implement preventions.
import asyncio
import json
import logging
import os
import tempfile
from pathlib import Path
from typing import Any

from src.tools.registry import ToolContext, ToolRegistry
9
logger = logging.getLogger(__name__)

# directory containing the deno runtime/stub files; generated scripts are written here
DENO_DIR = Path(__file__).parent / "deno"

# security limits for deno execution
MAX_CODE_SIZE = 50_000 # max input code size in characters
MAX_TOOL_CALLS = 25 # max number of tool calls per execution
MAX_OUTPUT_SIZE = 1_000_000 # max total output size in bytes (stdout flood protection)
MAX_EXECUTION_TIME = 60.0 # total wall-clock timeout in seconds
DENO_MEMORY_LIMIT_MB = 256 # v8 heap limit
20
21
class ToolExecutor:
    """Executor that runs TypeScript code in a sandboxed Deno subprocess.

    Generated TypeScript stubs expose the registry's tools inside the
    sandbox; tool calls are bridged back to Python over the subprocess's
    stdin/stdout as newline-delimited JSON messages.
    """

    def __init__(self, registry: ToolRegistry, ctx: ToolContext) -> None:
        self._registry = registry
        self._ctx = ctx
        # prefetched documentation fragments, populated by initialize()
        self._database_schema: str | None = None
        self._osprey_features: str | None = None
        self._osprey_labels: str | None = None
        self._osprey_udfs: str | None = None
        self._osprey_rule_files: str | None = None
        # lazily built and cached by get_execute_code_tool_definition()
        self._tool_definition: dict[str, Any] | None = None

    async def initialize(self) -> None:
        """Prefetch data for inclusion in the tool description, so that the
        agent doesn't waste tool calls on discovery.

        Each prefetch is best-effort: failures are logged with tracebacks and
        the corresponding section is simply omitted from the description.
        """
        try:
            schema = await self._registry.execute(self._ctx, "clickhouse.getSchema", {})
            lines = [f" {col['name']} ({col['type']})" for col in schema]
            self._database_schema = "\n".join(lines)
            logger.info("Prefetched database schema (%d columns)", len(schema))
        except Exception:
            logger.warning("Failed to prefetch database schema", exc_info=True)

        try:
            config = await self._registry.execute(self._ctx, "osprey.getConfig", {})
            feature_lines = [
                f" {name}: {ftype}" for name, ftype in config["features"].items()
            ]
            self._osprey_features = "\n".join(feature_lines)

            label_lines = [
                f" {label['name']}: {label['description']} (valid for: {label['valid_for']})"
                for label in config["labels"]
            ]
            self._osprey_labels = "\n".join(label_lines)

            logger.info(
                "Prefetched osprey config (%d features, %d labels)",
                len(config["features"]),
                len(config["labels"]),
            )
        except Exception:
            logger.warning("Failed to prefetch osprey config", exc_info=True)

        try:
            udfs = await self._registry.execute(self._ctx, "osprey.getUdfs", {})
            udf_lines: list[str] = []
            for cat in udfs["categories"]:
                udf_lines.append(f"## {cat['name']}")
                for udf in cat["udfs"]:
                    udf_lines.append(f" {udf['signature']}")
                    # include only the first doc line to keep the description compact
                    if udf.get("doc"):
                        first_line = udf["doc"].strip().split("\n")[0]
                        udf_lines.append(f" {first_line}")
            self._osprey_udfs = "\n".join(udf_lines)
            logger.info("Prefetched osprey UDFs")
        except Exception:
            logger.warning("Failed to prefetch osprey UDFs", exc_info=True)

        try:
            result = await self._registry.execute(self._ctx, "osprey.listRuleFiles", {})
            self._osprey_rule_files = "\n".join(f" {f}" for f in result["files"])
            logger.info("Prefetched osprey rule files (%d files)", len(result["files"]))
        except Exception:
            logger.warning("Failed to prefetch osprey rule files", exc_info=True)

    async def execute_code(self, code: str) -> dict[str, Any]:
        """
        Execute TypeScript code in a deno subprocess.

        The code has access to tools defined in the registry via the generated
        TypeScript stubs. Calls are bridged to python via stdin/out.

        Returns a dict with "success" and "debug" keys, plus "output" and/or
        "error" when present.
        """

        if len(code) > MAX_CODE_SIZE:
            return {
                "success": False,
                "error": f"code too large ({len(code)} chars, max {MAX_CODE_SIZE})",
                "debug": [],
            }

        self._write_generated_tools()

        # the script must live inside DENO_DIR so that the relative runtime/tools
        # imports resolve within the --allow-read sandbox
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".ts", delete=False, dir=DENO_DIR
        ) as f:
            # start by adding all the imports that we need...
            full_code = f"""
import {{ output, debug }} from "./runtime.ts";
import * as tools from "./tools.ts";
export {{ tools }};

{code}
"""
            f.write(full_code)
            temp_path = f.name

        try:
            return await self._run_deno(temp_path)
        finally:
            # always remove the temp script, even if execution raised
            os.unlink(temp_path)

    def _write_generated_tools(self) -> None:
        """generate tool stubs and write them to the deno directory"""

        tools_ts = self._registry.generate_typescript_types()
        tools_path = DENO_DIR / "tools.ts"
        tools_path.write_text(tools_ts)

    @staticmethod
    def _kill_process(process: asyncio.subprocess.Process) -> None:
        """kill a subprocess, ignoring errors if it's already dead"""
        try:
            process.kill()
        except ProcessLookupError:
            pass

    async def _run_deno(self, script_path: str) -> dict[str, Any]:
        """Run the input script in a deno subprocess and bridge its tool calls.

        Reads newline-delimited JSON messages from the subprocess's stdout:
        "__tool_call__" messages are executed against the registry with the
        result written back on stdin; "__output__" and "__debug__" messages are
        collected into the returned dict. Enforces the module-level limits on
        wall-clock time, tool-call count, and total output size.
        """

        # spawn a subprocess that executes deno with minimal permissions. explicit deny flags
        # ensure these can't be escalated via dynamic imports or permission prompts.
        deno_read_path = str(DENO_DIR)
        process = await asyncio.create_subprocess_exec(
            "deno",
            "run",
            f"--allow-read={deno_read_path}",
            "--deny-write",
            "--deny-net",
            "--deny-run",
            "--deny-env",
            "--deny-ffi",
            "--deny-sys",
            "--no-prompt",
            "--no-remote",
            "--no-npm",
            f"--v8-flags=--max-old-space-size={DENO_MEMORY_LIMIT_MB}",
            script_path,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        assert process.stdin is not None
        assert process.stdout is not None
        assert process.stderr is not None

        loop = asyncio.get_running_loop()
        outputs: list[Any] = []
        debug_messages: list[str] = []
        error: str | None = None
        tool_call_count = 0
        total_output_bytes = 0
        deadline = loop.time() + MAX_EXECUTION_TIME

        try:
            while True:
                # calculate remaining time against the total execution deadline
                remaining = deadline - loop.time()
                if remaining <= 0:
                    self._kill_process(process)
                    error = f"execution timed out after {MAX_EXECUTION_TIME:.0f} seconds (total)"
                    break

                # read next line with the lesser of 30s or remaining wall-clock time
                read_timeout = min(30.0, remaining)
                line = await asyncio.wait_for(
                    process.stdout.readline(), timeout=read_timeout
                )

                # if there are no more lines we're finished...
                if not line:
                    break

                line_str = line.decode().strip()
                if not line_str:
                    continue

                # track total output size to prevent stdout flooding
                total_output_bytes += len(line)
                if total_output_bytes > MAX_OUTPUT_SIZE:
                    self._kill_process(process)
                    error = f"output exceeded {MAX_OUTPUT_SIZE} bytes, killed"
                    break

                try:
                    message = json.loads(line_str)
                except json.JSONDecodeError:
                    # non-JSON stdout lines are treated as debug output
                    debug_messages.append(line_str)
                    continue

                # whenever we encounter a tool call, we then need to execute that tool and give
                # it the response
                if "__tool_call__" in message:
                    tool_call_count += 1
                    if tool_call_count > MAX_TOOL_CALLS:
                        self._kill_process(process)
                        error = f"exceeded maximum of {MAX_TOOL_CALLS} tool calls"
                        break

                    tool_name = message["tool"]
                    params = message["params"]
                    logger.info("Tool call: %s with params: %s", tool_name, params)

                    try:
                        result = await self._registry.execute(
                            self._ctx, tool_name, params
                        )
                        response = json.dumps({"__tool_result__": result}, default=str)
                    except Exception as e:
                        # tool failures are reported back into the sandbox rather
                        # than aborting the execution
                        logger.exception("Tool error: %s", tool_name)
                        response = json.dumps({"__tool_error__": str(e)})

                    try:
                        process.stdin.write((response + "\n").encode())
                        await process.stdin.drain()
                    except (ConnectionResetError, BrokenPipeError):
                        error = f"deno process exited while sending tool result for {tool_name}"
                        break

                elif "__output__" in message:
                    outputs.append(message["__output__"])

                elif "__debug__" in message:
                    debug_messages.append(message["__debug__"])

                else:
                    debug_messages.append(line_str)

        # make sure that we kill deno subprocess if the execution times out
        except asyncio.TimeoutError:
            self._kill_process(process)
            error = "execution timed out"
        # also kill it for any other exceptions we encounter
        except Exception as e:
            self._kill_process(process)
            error = str(e)

        await process.wait()

        # NOTE(review): stderr is only drained after exit; a very chatty stderr
        # could fill the pipe and stall deno before then — confirm the runtime
        # writes little to stderr, or drain it concurrently.
        stderr_content = await process.stderr.read()
        if stderr_content:
            stderr_str = stderr_content.decode().strip()
            if stderr_str:
                if error:
                    error += f"\n\nStderr:\n{stderr_str}"
                else:
                    error = stderr_str

        success = process.returncode == 0 and error is None

        result: dict[str, Any] = {
            "success": success,
            "debug": debug_messages,
        }

        if outputs:
            # a single output() call is unwrapped; multiple calls are returned as a list
            result["output"] = outputs[0] if len(outputs) == 1 else outputs

        if error:
            result["error"] = error

        return result

    def get_execute_code_tool_definition(self) -> dict[str, Any]:
        """get tool definition for execute_code, including all the docs for available backend tools"""

        # built once and cached; prefetched sections reflect initialize() results
        if self._tool_definition is not None:
            return self._tool_definition

        tool_docs = self._registry.generate_tool_documentation()

        schema_section = ""
        if self._database_schema:
            schema_section = f"""

# Database Schema

The `default.osprey_execution_results` table has these columns:
{self._database_schema}

Use these exact column names when writing SQL queries. Do NOT guess column names.
"""

        osprey_section = ""
        if self._osprey_features:
            osprey_section += f"""

# Available Osprey Features

These features are available in rule conditions (pre-loaded — no need to call getConfig):
{self._osprey_features}
"""
        if self._osprey_labels:
            osprey_section += f"""
# Available Labels

Use these label names with AtprotoLabel effects:
{self._osprey_labels}
"""
        if self._osprey_udfs:
            osprey_section += f"""
# Available UDFs and Effects

These are the available functions for use in rules (pre-loaded — no need to call getUdfs):
{self._osprey_udfs}
"""
        if self._osprey_rule_files:
            osprey_section += f"""
# Existing Rule Files

These .sml files already exist in the ruleset (pre-loaded — no need to call listRuleFiles unless you need a fresh list after mutations):
{self._osprey_rule_files}
"""

        description = f"""Execute Typescript code in a sandboxed Deno runtime.

The code has access to backend tools via the `tools` namespace. Use `output()` to return results.

Example:
```typescript
// format a relative timestamp for ClickHouse DateTime64 columns
const thirtyMinAgo = new Date(Date.now() - 30 * 60 * 1000).toISOString().slice(0, 19).replace('T', ' ');

// run multiple independent queries safely with Promise.allSettled
const results = await Promise.allSettled([
  tools.clickhouse.query(`SELECT Count() as cnt FROM default.osprey_execution_results WHERE __timestamp >= parseDateTimeBestEffort('${{thirtyMinAgo}}') LIMIT 1`),
  tools.clickhouse.query(`SELECT UserId, Count() as n FROM default.osprey_execution_results WHERE __timestamp >= parseDateTimeBestEffort('${{thirtyMinAgo}}') GROUP BY UserId ORDER BY n DESC LIMIT 10`),
]);

output(results.map(r => r.status === 'fulfilled' ? r.value : r.reason?.message));
```

{tool_docs}{schema_section}{osprey_section}"""

        self._tool_definition = {
            "name": "execute_code",
            "description": description,
            "input_schema": {
                "type": "object",
                "properties": {
                    "code": {
                        "type": "string",
                        "description": "Typescript code to execute. Has access to `tools` namespace and `output()` function.",
                    }
                },
                "required": ["code"],
            },
        }
        return self._tool_definition