personal memory agent
at main 601 lines 19 kB view raw
1# SPDX-License-Identifier: AGPL-3.0-only 2# Copyright (c) 2026 sol pbc 3 4"""Browser scenario verification using Pinchtab snapshots and screenshots.""" 5 6from __future__ import annotations 7 8import argparse 9import base64 10import json 11import logging 12import os 13import signal 14import subprocess 15import time 16from pathlib import Path 17from typing import Any 18 19import requests 20 21logger = logging.getLogger(__name__) 22 23 24SCENARIOS: list[dict[str, Any]] = [ 25 # smoke scenarios 26 { 27 "app": "agents", 28 "name": "smoke", 29 "steps": [ 30 {"do": "navigate", "path": "/app/agents/20260304"}, 31 {"do": "wait", "ms": 1000}, 32 {"do": "screenshot"}, 33 ], 34 }, 35 { 36 "app": "calendar", 37 "name": "smoke", 38 "steps": [ 39 {"do": "navigate", "path": "/app/calendar/20260304"}, 40 {"do": "wait", "ms": 1000}, 41 {"do": "screenshot"}, 42 ], 43 }, 44 { 45 "app": "graph", 46 "name": "smoke", 47 "steps": [ 48 {"do": "navigate", "path": "/app/graph"}, 49 {"do": "wait", "ms": 1000}, 50 {"do": "screenshot"}, 51 ], 52 }, 53 { 54 "app": "speakers", 55 "name": "smoke", 56 "steps": [ 57 {"do": "navigate", "path": "/app/speakers/20260304"}, 58 {"do": "wait", "ms": 1000}, 59 {"do": "screenshot"}, 60 ], 61 }, 62 { 63 "app": "todos", 64 "name": "smoke", 65 "steps": [ 66 {"do": "navigate", "path": "/app/todos/20260304"}, 67 {"do": "wait", "ms": 1000}, 68 {"do": "screenshot"}, 69 ], 70 }, 71 { 72 "app": "tokens", 73 "name": "smoke", 74 "steps": [ 75 {"do": "navigate", "path": "/app/tokens/20260304"}, 76 {"do": "wait", "ms": 1000}, 77 {"do": "screenshot"}, 78 ], 79 }, 80 { 81 "app": "transcripts", 82 "name": "smoke", 83 "steps": [ 84 {"do": "navigate", "path": "/app/transcripts/20260304"}, 85 {"do": "wait", "ms": 1000}, 86 {"do": "screenshot"}, 87 ], 88 }, 89 { 90 "app": "dev", 91 "name": "smoke", 92 "steps": [ 93 {"do": "navigate", "path": "/app/dev"}, 94 {"do": "wait", "ms": 1000}, 95 {"do": "screenshot"}, 96 ], 97 }, 98 { 99 "app": "entities", 100 "name": "smoke", 101 "steps": [ 102 {"do": "navigate", "path": "/app/entities"}, 103 {"do": "wait", "ms": 1000}, 104 {"do": "screenshot"}, 105 ], 106 }, 107 { 108 "app": "health", 109 "name": "smoke", 110 "steps": [ 111 {"do": "navigate", "path": "/app/health"}, 112 {"do": "wait", "ms": 1000}, 113 {"do": "screenshot"}, 114 ], 115 }, 116 { 117 "app": "import", 118 "name": "smoke", 119 "steps": [ 120 {"do": "navigate", "path": "/app/import"}, 121 {"do": "wait", "ms": 1000}, 122 {"do": "screenshot"}, 123 ], 124 }, 125 { 126 "app": "remote", 127 "name": "smoke", 128 "steps": [ 129 {"do": "navigate", "path": "/app/remote"}, 130 {"do": "wait", "ms": 1000}, 131 {"do": "screenshot"}, 132 ], 133 }, 134 { 135 "app": "search", 136 "name": "smoke", 137 "steps": [ 138 {"do": "navigate", "path": "/app/search"}, 139 {"do": "wait", "ms": 1000}, 140 {"do": "screenshot"}, 141 ], 142 }, 143 { 144 "app": "settings", 145 "name": "smoke", 146 "steps": [ 147 {"do": "navigate", "path": "/app/settings"}, 148 {"do": "wait", "ms": 1000}, 149 {"do": "screenshot"}, 150 ], 151 }, 152 { 153 "app": "stats", 154 "name": "smoke", 155 "steps": [ 156 {"do": "navigate", "path": "/app/stats"}, 157 {"do": "wait", "ms": 1000}, 158 {"do": "screenshot"}, 159 ], 160 }, 161 # interactive scenarios 162 { 163 "app": "search", 164 "name": "search-flow", 165 "steps": [ 166 {"do": "navigate", "path": "/app/search"}, 167 {"do": "wait", "ms": 1000}, 168 {"do": "snapshot"}, 169 {"do": "find_input", "as": "search_input"}, 170 {"do": "type", "var": "search_input", "text": "romeo"}, 171 {"do": "wait", "ms": 1500}, 172 {"do": "screenshot"}, 173 ], 174 }, 175 { 176 "app": "graph", 177 "name": "load", 178 "steps": [ 179 {"do": "navigate", "path": "/app/graph"}, 180 {"do": "wait", "ms": 1000}, 181 {"do": "screenshot"}, 182 ], 183 }, 184 { 185 "app": "entities", 186 "name": "entity-detail", 187 "steps": [ 188 {"do": "navigate", "path": "/app/entities/work/romeo_montague"}, 189 {"do": "wait", "ms": 1000}, 190 {"do": "screenshot"}, 191 ], 192 }, 193 { 194 "app": "todos", 195 "name": "todo-states", 196 "steps": [ 197 {"do": "evaluate", "expression": "document.cookie='facet=work;path=/'"}, 198 {"do": "navigate", "path": "/app/todos/20260304"}, 199 {"do": "wait", "ms": 1200}, 200 {"do": "screenshot"}, 201 ], 202 }, 203 { 204 "app": "graph", 205 "name": "facet-filter", 206 "steps": [ 207 {"do": "evaluate", "expression": "document.cookie='facet=montague;path=/'"}, 208 {"do": "navigate", "path": "/app/graph"}, 209 {"do": "wait", "ms": 1200}, 210 {"do": "screenshot"}, 211 ], 212 }, 213] 214 215 216_ERROR_LISTENER_JS = ( 217 "window.__pt_errors=[];" 218 "window.addEventListener('error',e=>window.__pt_errors.push(e.message));" 219 "window.onerror=(_,__,___,____,e)=>window.__pt_errors.push(e?.message||'unknown')" 220) 221 222 223def baseline_path(scenario: dict[str, Any]) -> Path: 224 return Path("tests/baselines/visual") / scenario["app"] / f"{scenario['name']}.jpg" 225 226 227class PinchTab: 228 """Minimal pinchtab HTTP client with process lifecycle. 229 230 Pinchtab v0.7.x uses a flat API — endpoints are at the root level 231 (e.g., /navigate, /screenshot, /snapshot) rather than nested under 232 /tabs/<id>/ or /instances/. Chrome is auto-managed by the server. 233 """ 234 235 def __init__(self, port: int = 19867) -> None: 236 self.port = port 237 self.base_url = f"http://localhost:{port}" 238 self._process: subprocess.Popen | None = None 239 self._session = requests.Session() 240 241 def start(self, timeout: int = 30) -> None: 242 """Launch pinchtab and wait for health check.""" 243 env = { 244 **os.environ, 245 "BRIDGE_PORT": str(self.port), 246 "BRIDGE_HEADLESS": "true", 247 } 248 self._stderr_path = f"/tmp/pinchtab-{self.port}.log" 249 self._stderr_file = open(self._stderr_path, "w") 250 try: 251 self._process = subprocess.Popen( 252 ["pinchtab"], 253 env=env, 254 stdout=subprocess.DEVNULL, 255 stderr=self._stderr_file, 256 start_new_session=True, 257 ) 258 except Exception as exc: 259 self._stderr_file.close() 260 raise RuntimeError("failed to start pinchtab") from exc 261 262 deadline = time.monotonic() + timeout 263 while time.monotonic() < deadline: 264 if self._process.poll() is not None: 265 self._stderr_file.close() 266 try: 267 stderr = Path(self._stderr_path).read_text() 268 except Exception: 269 stderr = "" 270 raise RuntimeError( 271 f"pinchtab exited with code {self._process.returncode}\n{stderr}" 272 ) 273 try: 274 response = self._session.get(f"{self.base_url}/health", timeout=2) 275 if response.status_code == 200: 276 health = response.json() 277 if health.get("status") == "ok": 278 return 279 except requests.ConnectionError: 280 pass 281 time.sleep(0.5) 282 self.stop() 283 raise RuntimeError("pinchtab failed to start") 284 285 def stop(self) -> None: 286 """Terminate pinchtab process and all children.""" 287 if hasattr(self, "_stderr_file") and self._stderr_file: 288 try: 289 self._stderr_file.close() 290 except Exception: 291 pass 292 if self._process: 293 pid = self._process.pid 294 if self._process.poll() is None: 295 self._session.close() 296 # Kill the entire process group to catch the Go binary child 297 try: 298 os.killpg(os.getpgid(pid), signal.SIGTERM) 299 except (ProcessLookupError, PermissionError): 300 self._process.terminate() 301 try: 302 self._process.wait(timeout=5) 303 except subprocess.TimeoutExpired: 304 try: 305 os.killpg(os.getpgid(pid), signal.SIGKILL) 306 except (ProcessLookupError, PermissionError): 307 self._process.send_signal(signal.SIGKILL) 308 self._process.wait() 309 self._process = None 310 311 def navigate(self, url: str) -> None: 312 response = self._session.post( 313 f"{self.base_url}/navigate", 314 json={"url": url}, 315 timeout=30, 316 ) 317 response.raise_for_status() 318 319 def screenshot(self) -> bytes: 320 response = self._session.get( 321 f"{self.base_url}/screenshot", 322 timeout=30, 323 ) 324 response.raise_for_status() 325 payload = response.json() 326 return base64.b64decode(payload["base64"]) 327 328 def snapshot(self) -> dict: 329 response = self._session.get( 330 f"{self.base_url}/snapshot", 331 timeout=30, 332 ) 333 response.raise_for_status() 334 return response.json() 335 336 def text(self) -> str: 337 response = self._session.get( 338 f"{self.base_url}/text", 339 timeout=30, 340 ) 341 response.raise_for_status() 342 payload = response.json() 343 if isinstance(payload, dict): 344 return payload.get("text", "") 345 if isinstance(payload, str): 346 return payload 347 return "" 348 349 def action(self, kind: str, **kwargs: Any) -> None: 350 response = self._session.post( 351 f"{self.base_url}/action", 352 json={"kind": kind, **kwargs}, 353 timeout=30, 354 ) 355 response.raise_for_status() 356 357 def evaluate(self, expression: str) -> Any: 358 response = self._session.post( 359 f"{self.base_url}/evaluate", 360 json={"expression": expression}, 361 timeout=30, 362 ) 363 response.raise_for_status() 364 try: 365 return response.json() 366 except ValueError: 367 return response.text 368 369 370def inject_error_listener(pt: PinchTab) -> None: 371 pt.evaluate(_ERROR_LISTENER_JS) 372 373 374def collect_console_errors(pt: PinchTab) -> list[str]: 375 result = pt.evaluate("JSON.stringify(window.__pt_errors||[])") 376 value = result if isinstance(result, str) else result.get("result", "[]") 377 try: 378 return json.loads(value) 379 except (json.JSONDecodeError, TypeError): 380 return [] 381 382 383def find_input_ref(snapshot: dict) -> str | None: 384 """Find first text input node ref from snapshot.""" 385 for node in snapshot.get("nodes", []): 386 role = str(node.get("role", "")).lower() 387 tag = str(node.get("tag", "")).lower() 388 if role in ("textbox", "searchbox", "combobox") or tag == "input": 389 return node.get("ref") 390 return None 391 392 393def find_ref(snapshot: dict, text: str) -> str | None: 394 needle = str(text).lower() 395 for node in snapshot.get("nodes", []): 396 ref = node.get("ref") 397 if not ref: 398 continue 399 if needle == "": 400 return ref 401 if ( 402 needle in str(node.get("name", "")).lower() 403 or needle in str(node.get("text", "")).lower() 404 or needle in str(node.get("label", "")).lower() 405 or needle in str(node.get("value", "")).lower() 406 ): 407 return ref 408 return None 409 410 411def _resolve_url(base_url: str, path: str) -> str: 412 return f"{base_url.rstrip('/')}{path}" 413 414 415def run_scenario( 416 pt: PinchTab, scenario: dict[str, Any], base_url: str, mode: str 417) -> dict[str, Any]: 418 """Execute one scenario. Returns {ok, errors, console_errors}.""" 419 identifier = f"{scenario['app']}/{scenario['name']}" 420 errors: list[str] = [] 421 variables: dict[str, str] = {} 422 last_snapshot: dict[str, Any] | None = None 423 console_errors: list[str] = [] 424 425 logger.info(" %s", identifier) 426 427 try: 428 inject_error_listener(pt) 429 except Exception: 430 pass 431 432 for step in scenario["steps"]: 433 action = step["do"] 434 try: 435 if action == "navigate": 436 url = _resolve_url(base_url, step["path"]) 437 pt.navigate(url) 438 time.sleep(0.3) 439 try: 440 inject_error_listener(pt) 441 except Exception: 442 pass 443 444 elif action == "wait": 445 time.sleep(float(step["ms"]) / 1000) 446 447 elif action == "snapshot": 448 last_snapshot = pt.snapshot() 449 450 elif action == "screenshot": 451 png = pt.screenshot() 452 path = baseline_path(scenario) 453 if mode == "update": 454 path.parent.mkdir(parents=True, exist_ok=True) 455 path.write_bytes(png) 456 else: 457 if not path.exists(): 458 errors.append(f"baseline not found: {path}") 459 # No pixel comparison — baselines are for human review 460 461 elif action == "find": 462 if last_snapshot is None: 463 errors.append("find without prior snapshot") 464 continue 465 ref = find_ref(last_snapshot, step["text"]) 466 if ref is None: 467 errors.append(f"find: text not found: {step['text']!r}") 468 continue 469 variables[step["as"]] = ref 470 471 elif action == "find_input": 472 if last_snapshot is None: 473 errors.append("find_input without prior snapshot") 474 continue 475 ref = find_input_ref(last_snapshot) 476 if ref is None: 477 errors.append("no text input found in snapshot") 478 continue 479 variables[step["as"]] = ref 480 481 elif action == "click": 482 ref = step.get("ref") or variables.get(step.get("var", "")) 483 if not ref: 484 errors.append(f"click: no ref resolved for {step}") 485 continue 486 pt.action("click", ref=ref) 487 488 elif action == "type": 489 ref = step.get("ref") or variables.get(step.get("var", "")) 490 if not ref: 491 errors.append(f"type: no ref resolved for {step}") 492 continue 493 pt.action("type", ref=ref, text=step["text"]) 494 495 elif action == "assert_text": 496 text = step["text"] 497 page_text = pt.text().lower() 498 if str(text).lower() not in page_text: 499 errors.append(f"assert_text: '{text}' not found") 500 501 elif action == "evaluate": 502 pt.evaluate(step["expression"]) 503 504 else: 505 errors.append(f"unknown step type: {action}") 506 507 except Exception as exc: 508 errors.append(f"step {action} failed: {exc}") 509 510 try: 511 console_errors = collect_console_errors(pt) 512 except Exception: 513 logger.debug("Unable to collect console errors for %s", identifier) 514 515 return { 516 "ok": len(errors) == 0, 517 "errors": errors, 518 "console_errors": console_errors, 519 } 520 521 522def run_all( 523 pt: PinchTab, base_url: str, mode: str 524) -> tuple[list[dict[str, Any]], list[tuple[str, list[str]]]]: 525 """Run all scenarios. Returns (results, console_error_pairs).""" 526 results: list[dict[str, Any]] = [] 527 all_console_errors: list[tuple[str, list[str]]] = [] 528 for scenario in SCENARIOS: 529 identifier = f"{scenario['app']}/{scenario['name']}" 530 result = run_scenario(pt, scenario, base_url, mode) 531 results.append({"scenario": identifier, **result}) 532 if result["console_errors"]: 533 all_console_errors.append((identifier, result["console_errors"])) 534 return results, all_console_errors 535 536 537def parse_args(argv: list[str] | None = None) -> argparse.Namespace: 538 parser = argparse.ArgumentParser(description="Browser scenario verification") 539 parser.add_argument( 540 "command", 541 choices=["verify", "update"], 542 help="Verify or update baselines", 543 ) 544 parser.add_argument("--base-url", required=True, help="Convey base URL") 545 parser.add_argument( 546 "--pinchtab-port", 547 type=int, 548 default=19867, 549 help="Pinchtab bridge port", 550 ) 551 return parser.parse_args(argv) 552 553 554def main(argv: list[str] | None = None) -> int: 555 args = parse_args(argv) 556 logging.basicConfig(level=logging.INFO, format="%(message)s") 557 558 pt = PinchTab(port=args.pinchtab_port) 559 logger.info("Starting pinchtab on port %d...", args.pinchtab_port) 560 pt.start() 561 562 try: 563 logger.info("Running %d scenarios (%s)...", len(SCENARIOS), args.command) 564 results, console_errors = run_all(pt, args.base_url, args.command) 565 566 passed = sum(1 for r in results if r["ok"]) 567 failed = sum(1 for r in results if not r["ok"]) 568 569 if failed: 570 logger.info("") 571 logger.info("Failures:") 572 for result in results: 573 if result["ok"]: 574 continue 575 for err in result["errors"]: 576 logger.info(" %s: %s", result["scenario"], err) 577 578 if console_errors: 579 logger.info("") 580 logger.info("JS console errors:") 581 for scenario, errors in console_errors: 582 for err in errors: 583 logger.info(" %s: %s", scenario, err) 584 585 logger.info("") 586 if args.command == "update": 587 logger.info("Updated %d scenario baselines.", passed + failed) 588 else: 589 logger.info("Browser verification: %d passed, %d failed.", passed, failed) 590 591 if failed: 592 logger.info("Run 'make update-browser-baselines' to update baselines") 593 return 1 594 595 return 0 596 finally: 597 pt.stop() 598 599 600if __name__ == "__main__": 601 raise SystemExit(main())