A trust and safety agent that interacts with Osprey for investigation, real-time analysis, and prevention implementations
at main 79 lines 2.7 kB view raw
1import re 2from typing import Any 3 4import httpx 5 6from src.tools.registry import TOOL_REGISTRY, ToolContext, ToolParameter 7 8_IP_REGEX = re.compile( 9 r"^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)$" 10 r"|^([0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}$" 11 r"|^::$" 12 r"|^([0-9a-fA-F]{1,4}:){1,7}:$" 13 r"|^::[0-9a-fA-F]{1,4}(:[0-9a-fA-F]{1,4}){0,5}$" 14) 15 16 17@TOOL_REGISTRY.tool( 18 name="ip.lookup", 19 description="GeoIP and ASN lookup for an IP address. Returns geographic location (country, region, city, coordinates, timezone), network information (ISP, org, ASN), and flags for mobile, proxy, and hosting IPs.", 20 parameters=[ 21 ToolParameter( 22 name="ip", 23 type="string", 24 description="The IP address to look up (IPv4 or IPv6)", 25 ), 26 ], 27) 28async def ip_lookup(ctx: ToolContext, ip: str) -> dict[str, Any]: 29 ip = ip.strip() 30 if not _IP_REGEX.match(ip): 31 return {"success": False, "ip": ip, "error": "Invalid IP address format"} 32 33 try: 34 # ip-api.com free tier requires HTTP, not HTTPS 35 async with httpx.AsyncClient(timeout=10.0) as client: 36 response = await client.get( 37 f"http://ip-api.com/json/{ip}", 38 params={ 39 "fields": "status,message,country,countryCode,region,regionName,city,zip,lat,lon,timezone,isp,org,as,asname,mobile,proxy,hosting,query" 40 }, 41 ) 42 data = response.json() 43 44 if data.get("status") == "fail": 45 return { 46 "success": False, 47 "ip": ip, 48 "error": data.get("message", "Lookup failed"), 49 } 50 51 return { 52 "success": True, 53 "ip": data.get("query", ip), 54 "geo": { 55 "country": data.get("country"), 56 "country_code": data.get("countryCode"), 57 "region": data.get("regionName"), 58 "region_code": data.get("region"), 59 "city": data.get("city"), 60 "zip": data.get("zip"), 61 "lat": data.get("lat"), 62 "lon": data.get("lon"), 63 "timezone": data.get("timezone"), 64 }, 65 "network": { 66 "isp": data.get("isp"), 67 "org": data.get("org"), 68 "asn": data.get("as"), 69 "asn_name": data.get("asname"), 70 }, 71 "flags": { 72 "is_mobile": data.get("mobile", False), 73 "is_proxy": data.get("proxy", False), 74 "is_hosting": data.get("hosting", False), 75 }, 76 } 77 78 except Exception as e: 79 return {"success": False, "ip": ip, "error": str(e)}