# personal memory agent
# (source-viewer metadata: at main, 249 lines, 7.8 kB)
1# SPDX-License-Identifier: AGPL-3.0-only 2# Copyright (c) 2026 sol pbc 3 4"""Root blueprint: authentication and core routes.""" 5 6from __future__ import annotations 7 8import json 9import os 10from datetime import date 11from pathlib import Path 12from typing import Any 13 14from flask import ( 15 Blueprint, 16 jsonify, 17 redirect, 18 render_template, 19 request, 20 send_from_directory, 21 session, 22 url_for, 23) 24from werkzeug.security import check_password_hash, generate_password_hash 25 26from think.cluster import cluster_segments 27from think.utils import day_dirs, get_config, get_journal 28 29 30def _get_password_hash() -> str: 31 """Get current password hash from config, reloading on each call.""" 32 try: 33 config = get_config() 34 convey_config = config.get("convey", {}) 35 return convey_config.get("password_hash", "") 36 except Exception: 37 return "" 38 39 40def _save_config_section(section: str, data: dict) -> dict: 41 """Merge data into a config section and write back to journal.json.""" 42 config = get_config() 43 config.setdefault(section, {}).update(data) 44 config_path = Path(get_journal()) / "config" / "journal.json" 45 config_path.parent.mkdir(parents=True, exist_ok=True) 46 with open(config_path, "w", encoding="utf-8") as f: 47 json.dump(config, f, indent=2, ensure_ascii=False) 48 f.write("\n") 49 os.chmod(config_path, 0o600) 50 return config 51 52 53bp = Blueprint( 54 "root", 55 __name__, 56 template_folder="templates", 57 static_folder="static", 58) 59 60 61@bp.before_app_request 62def require_login() -> Any: 63 if request.endpoint in { 64 "root.init", 65 "root.init_password", 66 "root.init_identity", 67 "root.init_provider", 68 "root.init_observers", 69 "root.init_finalize", 70 "root.login", 71 "root.static", 72 "root.favicon", 73 # Observer ingest endpoints use key-based auth, not session 74 "app:observer.ingest_upload", 75 "app:observer.ingest_event", 76 "app:observer.ingest_segments", 77 }: 78 return None 79 80 # Auto-bypass for 
localhost requests WITHOUT proxy headers 81 remote_addr = request.remote_addr 82 is_localhost = remote_addr in ("127.0.0.1", "::1", "localhost") 83 84 # Detect proxy headers that might indicate forwarded external request 85 proxy_headers = ( 86 request.headers.get("X-Forwarded-For") 87 or request.headers.get("X-Real-IP") 88 or request.headers.get("X-Forwarded-Host") 89 ) 90 91 if is_localhost and not proxy_headers: 92 # Genuine localhost request - auto-bypass 93 return None 94 95 # Otherwise require session authentication 96 if not session.get("logged_in"): 97 if not _get_password_hash(): 98 return redirect(url_for("root.init")) 99 return redirect(url_for("root.login")) 100 101 102@bp.route("/login", methods=["GET", "POST"]) 103def login() -> Any: 104 # Re-check password from config on each request 105 password_hash = _get_password_hash() 106 107 # If no password is configured, show error page 108 if not password_hash: 109 error = "No password configured. Run 'sol password set' to set one." 
110 return render_template("login.html", error=error, no_password=True) 111 112 error = None 113 if request.method == "POST": 114 if check_password_hash(password_hash, request.form.get("password", "")): 115 session["logged_in"] = True 116 session.permanent = True 117 return redirect(url_for("root.index")) 118 error = "Invalid password" 119 return render_template("login.html", error=error, no_password=False) 120 121 122@bp.route("/init") 123def init() -> Any: 124 if _get_password_hash(): 125 return redirect(url_for("root.index")) 126 127 config_path = str(Path(get_journal()) / "config" / "journal.json") 128 repo_path = str(Path(__file__).resolve().parent.parent) 129 return render_template("init.html", config_path=config_path, repo_path=repo_path) 130 131 132@bp.route("/init/password", methods=["POST"]) 133def init_password() -> Any: 134 if _get_password_hash(): 135 return jsonify({"error": "Already configured"}), 400 136 137 data = request.get_json(silent=True) or {} 138 password = data.get("password", "") 139 if len(password) < 8: 140 return jsonify({"error": "Password must be at least 8 characters"}), 400 141 142 hashed = generate_password_hash(password) 143 _save_config_section("convey", {"password_hash": hashed}) 144 return jsonify({"success": True}) 145 146 147@bp.route("/init/identity", methods=["POST"]) 148def init_identity() -> Any: 149 if not _get_password_hash(): 150 return jsonify({"error": "Password required first"}), 403 151 152 data = request.get_json(silent=True) or {} 153 allowed = {k: data[k] for k in ("name", "preferred", "timezone") if k in data} 154 _save_config_section("identity", allowed) 155 return jsonify({"success": True}) 156 157 158@bp.route("/init/provider", methods=["POST"]) 159def init_provider() -> Any: 160 if not _get_password_hash(): 161 return jsonify({"error": "Password required first"}), 403 162 163 data = request.get_json(silent=True) or {} 164 key = data.get("key", "") 165 _save_config_section("env", {"GOOGLE_API_KEY": key}) 166 
167 from think.providers import validate_key 168 169 try: 170 result = validate_key("google", key) 171 except Exception as e: 172 result = {"valid": False, "error": str(e)} 173 return jsonify({"success": True, "validation": result}) 174 175 176@bp.route("/init/observers") 177def init_observers() -> Any: 178 if not _get_password_hash(): 179 return jsonify({"error": "Password required first"}), 403 180 181 from apps.observer.utils import list_observers 182 183 observers_list = [] 184 for observer in list_observers(): 185 if observer.get("revoked", False): 186 continue 187 observers_list.append( 188 { 189 "key_prefix": observer.get("key", "")[:8], 190 "name": observer.get("name", ""), 191 "created_at": observer.get("created_at", 0), 192 "last_seen": observer.get("last_seen"), 193 "last_segment": observer.get("last_segment"), 194 "enabled": observer.get("enabled", True), 195 "revoked": observer.get("revoked", False), 196 "revoked_at": observer.get("revoked_at"), 197 "stats": observer.get("stats", {}), 198 } 199 ) 200 return jsonify(observers_list) 201 202 203@bp.route("/init/finalize", methods=["POST"]) 204def init_finalize() -> Any: 205 if not _get_password_hash(): 206 return jsonify({"error": "Password required first"}), 403 207 208 from think.utils import now_ms 209 210 data = request.get_json(silent=True) or {} 211 coding_agent = data.get("coding_agent", "") 212 _save_config_section( 213 "setup", 214 {"coding_agent": coding_agent, "completed_at": now_ms()}, 215 ) 216 session["logged_in"] = True 217 session.permanent = True 218 return jsonify({"success": True, "redirect": url_for("root.index")}) 219 220 221@bp.route("/logout") 222def logout() -> Any: 223 session.pop("logged_in", None) 224 return redirect(url_for("root.login")) 225 226 227@bp.route("/favicon.ico") 228def favicon() -> Any: 229 """Serve the favicon from the project root.""" 230 project_root = os.path.dirname( 231 os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 232 ) 233 return 
send_from_directory(project_root, "favicon.ico", mimetype="image/x-icon") 234 235 236@bp.route("/app/today") 237def app_today() -> Any: 238 """Redirect /app/today to the most recent day with journal data.""" 239 today = date.today().strftime("%Y%m%d") 240 for day in sorted(day_dirs().keys(), reverse=True): 241 if cluster_segments(day): 242 return redirect(url_for("app:transcripts.transcripts_day", day=day)) 243 return redirect(url_for("app:transcripts.transcripts_day", day=today)) 244 245 246@bp.route("/") 247def index() -> Any: 248 """Root redirect — always to home; the app handles new journals there.""" 249 return redirect(url_for("app:home.index"))