A trust and safety agent that interacts with Osprey for investigation, real-time analysis, and prevention implementations
1import re
2from typing import Any
3
4import httpx
5
6from src.tools.registry import TOOL_REGISTRY, ToolContext, ToolParameter
7
# Matches a complete IPv4 dotted-quad or IPv6 literal and nothing else.
#
# The IPv6 branches cover the full eight-group form and every placement of the
# "::" zero-compression marker (leading, trailing, or between groups).
# Not supported: IPv6 with an embedded IPv4 tail (e.g. "::ffff:1.2.3.4") and
# zone identifiers (e.g. "fe80::1%eth0").
#
# The pattern ends with \Z rather than $ so that a value with a trailing
# newline is rejected instead of being accepted.
_IP_REGEX = re.compile(
    r"^(?:"
    # IPv4: four octets, each in the range 0-255.
    r"(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)"
    # IPv6, uncompressed: exactly eight groups.
    r"|(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}"
    # IPv6 with a trailing "::" after one to seven groups.
    r"|(?:[0-9a-fA-F]{1,4}:){1,7}:"
    # IPv6 with "::" between groups: n leading groups, then up to 7-n trailing.
    r"|(?:[0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}"
    r"|(?:[0-9a-fA-F]{1,4}:){1,5}(?::[0-9a-fA-F]{1,4}){1,2}"
    r"|(?:[0-9a-fA-F]{1,4}:){1,4}(?::[0-9a-fA-F]{1,4}){1,3}"
    r"|(?:[0-9a-fA-F]{1,4}:){1,3}(?::[0-9a-fA-F]{1,4}){1,4}"
    r"|(?:[0-9a-fA-F]{1,4}:){1,2}(?::[0-9a-fA-F]{1,4}){1,5}"
    r"|[0-9a-fA-F]{1,4}:(?::[0-9a-fA-F]{1,4}){1,6}"
    # IPv6 with a leading "::" followed by one to seven groups.
    r"|:(?::[0-9a-fA-F]{1,4}){1,7}"
    # The all-zero address.
    r"|::"
    r")\Z"
)
15
16
@TOOL_REGISTRY.tool(
    name="ip.lookup",
    description="GeoIP and ASN lookup for an IP address. Returns geographic location (country, region, city, coordinates, timezone), network information (ISP, org, ASN), and flags for mobile, proxy, and hosting IPs.",
    parameters=[
        ToolParameter(
            name="ip",
            type="string",
            description="The IP address to look up (IPv4 or IPv6)",
        ),
    ],
)
async def ip_lookup(ctx: ToolContext, ip: str) -> dict[str, Any]:
    """Look up GeoIP and network details for an IP address via ip-api.com.

    The address is stripped of surrounding whitespace and validated against
    ``_IP_REGEX`` before any request is made. This function does not raise for
    request or parsing failures; they are reported in the returned dict.

    Args:
        ctx: Tool context supplied by the registry; not used by this tool.
        ip: The IPv4 or IPv6 address to look up.

    Returns:
        On success, ``{"success": True, "ip": ..., "geo": {...},
        "network": {...}, "flags": {...}}``. On any failure (invalid format,
        request error, non-2xx response, unparseable body, or a ``"fail"``
        status from the service), ``{"success": False, "ip": ..., "error": ...}``.
    """
    ip = ip.strip()
    if not _IP_REGEX.match(ip):
        return {"success": False, "ip": ip, "error": "Invalid IP address format"}

    try:
        # ip-api.com free tier requires HTTP, not HTTPS
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(
                f"http://ip-api.com/json/{ip}",
                params={
                    "fields": "status,message,country,countryCode,region,regionName,city,zip,lat,lon,timezone,isp,org,as,asname,mobile,proxy,hosting,query"
                },
            )
            # Report non-2xx replies as an HTTP error rather than as whatever
            # JSON parsing error their body would otherwise produce.
            response.raise_for_status()
            data = response.json()
    except Exception as e:
        # Tool boundary: always hand back a result dict instead of raising.
        # Fall back to the exception class name so the error text is never
        # empty when the exception carries no message.
        return {"success": False, "ip": ip, "error": str(e) or type(e).__name__}

    if not isinstance(data, dict):
        # The field accesses below need a JSON object.
        return {"success": False, "ip": ip, "error": "Unexpected response format"}

    if data.get("status") == "fail":
        return {
            "success": False,
            "ip": ip,
            "error": data.get("message", "Lookup failed"),
        }

    return {
        "success": True,
        "ip": data.get("query", ip),
        "geo": {
            "country": data.get("country"),
            "country_code": data.get("countryCode"),
            "region": data.get("regionName"),
            "region_code": data.get("region"),
            "city": data.get("city"),
            "zip": data.get("zip"),
            "lat": data.get("lat"),
            "lon": data.get("lon"),
            "timezone": data.get("timezone"),
        },
        "network": {
            "isp": data.get("isp"),
            "org": data.get("org"),
            "asn": data.get("as"),
            "asn_name": data.get("asname"),
        },
        "flags": {
            "is_mobile": data.get("mobile", False),
            "is_proxy": data.get("proxy", False),
            "is_hosting": data.get("hosting", False),
        },
    }