personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Root blueprint: authentication and core routes."""
5
6from __future__ import annotations
7
8import json
9import os
10from datetime import date
11from pathlib import Path
12from typing import Any
13
14from flask import (
15 Blueprint,
16 jsonify,
17 redirect,
18 render_template,
19 request,
20 send_from_directory,
21 session,
22 url_for,
23)
24from werkzeug.security import check_password_hash, generate_password_hash
25
26from think.cluster import cluster_segments
27from think.utils import day_dirs, get_config, get_journal
28
29
30def _get_password_hash() -> str:
31 """Get current password hash from config, reloading on each call."""
32 try:
33 config = get_config()
34 convey_config = config.get("convey", {})
35 return convey_config.get("password_hash", "")
36 except Exception:
37 return ""
38
39
def _save_config_section(section: str, data: dict) -> dict:
    """Merge data into a config section and write back to journal.json.

    The file holds secrets (callers in this module store the password hash
    and an API key in it), so it is written to a temporary file that is
    owner-only from the moment it is created and then atomically renamed
    over the target. This avoids both a window in which the file exists
    with umask-default permissions and a truncated journal.json if the
    write is interrupted.

    Args:
        section: Top-level config key to update (created if missing).
        data: Key/value pairs merged into that section.

    Returns:
        The full config dict after the merge.

    Raises:
        OSError: If the config directory or file cannot be written.
    """
    import tempfile

    config = get_config()
    config.setdefault(section, {}).update(data)
    config_path = Path(get_journal()) / "config" / "journal.json"
    config_path.parent.mkdir(parents=True, exist_ok=True)

    # mkstemp creates the file readable/writable by the owner only, in the
    # same directory as the target so os.replace stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(
        dir=config_path.parent, prefix=".journal.", suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(config, f, indent=2, ensure_ascii=False)
            f.write("\n")
        os.chmod(tmp_name, 0o600)
        os.replace(tmp_name, config_path)
    except BaseException:
        # Do not leave a stray temp file (with secrets) behind on failure.
        try:
            os.unlink(tmp_name)
        except OSError:
            pass
        raise
    return config
51
52
# The "root" blueprint: its routes are addressed as "root.<endpoint>", and it
# is configured with the "templates" and "static" folders.
bp = Blueprint("root", __name__, template_folder="templates", static_folder="static")
59
60
@bp.before_app_request
def require_login() -> Any:
    """Gate each request behind session login unless it is exempt.

    Returns None (request proceeds) when the endpoint is in the exempt set,
    when the request comes straight from loopback with no proxy headers, or
    when the session is marked as logged in. Otherwise returns a redirect:
    to the init page if no password hash is configured, else to login.
    """
    exempt_endpoints = {
        "root.init",
        "root.init_password",
        "root.init_identity",
        "root.init_provider",
        "root.init_observers",
        "root.init_finalize",
        "root.login",
        "root.static",
        "root.favicon",
        # Observer ingest endpoints use key-based auth, not session
        "app:observer.ingest_upload",
        "app:observer.ingest_event",
        "app:observer.ingest_segments",
    }
    if request.endpoint in exempt_endpoints:
        return None

    # A loopback peer address alone is not enough: a reverse proxy on the
    # same host also connects from loopback, so any forwarding header
    # disqualifies the request from the automatic bypass.
    from_loopback = request.remote_addr in ("127.0.0.1", "::1", "localhost")
    forwarded = any(
        request.headers.get(header)
        for header in ("X-Forwarded-For", "X-Real-IP", "X-Forwarded-Host")
    )
    if from_loopback and not forwarded:
        return None

    if session.get("logged_in"):
        return None

    # Not authenticated: first-run setup if there is no password yet,
    # otherwise the regular login form.
    destination = "root.login" if _get_password_hash() else "root.init"
    return redirect(url_for(destination))
100
101
@bp.route("/login", methods=["GET", "POST"])
def login() -> Any:
    """Show the login form and, on POST, verify the submitted password.

    The password hash is re-read from config on every request. A correct
    password marks the session as logged in (and permanent) and redirects
    to the root index; anything else re-renders the form.
    """
    stored_hash = _get_password_hash()

    # Without a configured password nobody can log in; say so on the page.
    if not stored_hash:
        return render_template(
            "login.html",
            error="No password configured. Run 'sol password set' to set one.",
            no_password=True,
        )

    if request.method != "POST":
        return render_template("login.html", error=None, no_password=False)

    submitted = request.form.get("password", "")
    if not check_password_hash(stored_hash, submitted):
        return render_template(
            "login.html", error="Invalid password", no_password=False
        )

    session["logged_in"] = True
    session.permanent = True
    return redirect(url_for("root.index"))
120
121
@bp.route("/init")
def init() -> Any:
    """Render the first-run setup page.

    If a password hash is already configured, redirects to the root index
    instead. The template receives the journal config file path and the
    directory two levels above this file as ``repo_path``.
    """
    if _get_password_hash():
        return redirect(url_for("root.index"))

    journal_config = Path(get_journal()) / "config" / "journal.json"
    this_file = Path(__file__).resolve()
    repo_root = this_file.parent.parent
    return render_template(
        "init.html",
        config_path=str(journal_config),
        repo_path=str(repo_root),
    )
130
131
# Session flag set when the caller creates the password during onboarding.
# It authorizes the remaining /init/* steps for that same browser session.
_INIT_SESSION_KEY = "init_in_progress"


def _is_direct_local_request() -> bool:
    """Return True for a loopback request that carries no proxy headers."""
    if request.remote_addr not in ("127.0.0.1", "::1", "localhost"):
        return False
    return not (
        request.headers.get("X-Forwarded-For")
        or request.headers.get("X-Real-IP")
        or request.headers.get("X-Forwarded-Host")
    )


def _init_step_denied() -> Any:
    """Return an error response if a post-password init step is not allowed.

    The /init/* endpoints are exempt from the app-wide ``require_login``
    hook, so they must check access themselves. Merely having a password
    configured is not proof of identity: the caller must be the session that
    created the password, an already logged-in session, or a direct loopback
    request. Returns None when the step may proceed.
    """
    if not _get_password_hash():
        return jsonify({"error": "Password required first"}), 403
    if (
        session.get("logged_in")
        or session.get(_INIT_SESSION_KEY)
        or _is_direct_local_request()
    ):
        return None
    return jsonify({"error": "Authentication required"}), 403


def _json_object() -> dict:
    """Return the request's JSON body if it is an object, else an empty dict."""
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


@bp.route("/init/password", methods=["POST"])
def init_password() -> Any:
    """Set the initial password; only allowed while none is configured.

    Expects a JSON object with a ``password`` string of at least 8
    characters. On success the hash is stored under the ``convey`` config
    section and this session is marked as the one performing onboarding.
    """
    if _get_password_hash():
        return jsonify({"error": "Already configured"}), 400

    password = _json_object().get("password", "")
    # Reject non-string values too: len() of a list/number must not pass or crash.
    if not isinstance(password, str) or len(password) < 8:
        return jsonify({"error": "Password must be at least 8 characters"}), 400

    hashed = generate_password_hash(password)
    _save_config_section("convey", {"password_hash": hashed})
    session[_INIT_SESSION_KEY] = True
    return jsonify({"success": True})


@bp.route("/init/identity", methods=["POST"])
def init_identity() -> Any:
    """Store the ``name``, ``preferred`` and ``timezone`` fields sent as JSON.

    Only those three keys are accepted; anything else in the body is ignored.
    """
    denied = _init_step_denied()
    if denied is not None:
        return denied

    data = _json_object()
    allowed = {k: data[k] for k in ("name", "preferred", "timezone") if k in data}
    _save_config_section("identity", allowed)
    return jsonify({"success": True})


@bp.route("/init/provider", methods=["POST"])
def init_provider() -> Any:
    """Save the submitted Google API key and report whether it validates.

    The key is stored before validation, and the response is a success even
    when validation fails; the validation outcome is returned alongside.
    """
    denied = _init_step_denied()
    if denied is not None:
        return denied

    key = _json_object().get("key", "")
    if not isinstance(key, str):
        return jsonify({"error": "API key must be a string"}), 400
    _save_config_section("env", {"GOOGLE_API_KEY": key})

    from think.providers import validate_key

    try:
        result = validate_key("google", key)
    except Exception as e:
        # Validation is advisory; surface the failure instead of a 500.
        result = {"valid": False, "error": str(e)}
    return jsonify({"success": True, "validation": result})


@bp.route("/init/observers")
def init_observers() -> Any:
    """List non-revoked observers, exposing only the first 8 key characters."""
    denied = _init_step_denied()
    if denied is not None:
        return denied

    from apps.observer.utils import list_observers

    observers_list = [
        {
            "key_prefix": observer.get("key", "")[:8],
            "name": observer.get("name", ""),
            "created_at": observer.get("created_at", 0),
            "last_seen": observer.get("last_seen"),
            "last_segment": observer.get("last_segment"),
            "enabled": observer.get("enabled", True),
            "revoked": observer.get("revoked", False),
            "revoked_at": observer.get("revoked_at"),
            "stats": observer.get("stats", {}),
        }
        for observer in list_observers()
        if not observer.get("revoked", False)
    ]
    return jsonify(observers_list)


@bp.route("/init/finalize", methods=["POST"])
def init_finalize() -> Any:
    """Record setup completion and log the onboarding session in.

    This endpoint grants a logged-in session, so it is only honoured for a
    caller that passed ``_init_step_denied``; otherwise any client could
    obtain a session once a password exists.
    """
    denied = _init_step_denied()
    if denied is not None:
        return denied

    from think.utils import now_ms

    coding_agent = _json_object().get("coding_agent", "")
    _save_config_section(
        "setup",
        {"coding_agent": coding_agent, "completed_at": now_ms()},
    )
    # Onboarding is over: swap the temporary marker for a real login.
    session.pop(_INIT_SESSION_KEY, None)
    session["logged_in"] = True
    session.permanent = True
    return jsonify({"success": True, "redirect": url_for("root.index")})
219
220
@bp.route("/logout")
def logout() -> Any:
    """Remove the session's logged-in flag and redirect to the login page."""
    session.pop("logged_in", None)
    login_url = url_for("root.login")
    return redirect(login_url)
225
226
@bp.route("/favicon.ico")
def favicon() -> Any:
    """Serve favicon.ico from the directory three ``dirname`` steps above this file."""
    # NOTE(review): this walks file -> containing dir -> parent -> grandparent,
    # which is one level higher than the ``repo_path`` that init() derives with
    # ``.parent.parent``. Confirm which of the two is the real project root.
    directory = os.path.abspath(__file__)
    for _ in range(3):
        directory = os.path.dirname(directory)
    return send_from_directory(directory, "favicon.ico", mimetype="image/x-icon")
234
235
@bp.route("/app/today")
def app_today() -> Any:
    """Redirect /app/today to the newest day that has clustered segments.

    Days from ``day_dirs()`` are checked newest first; the first one for
    which ``cluster_segments`` returns a truthy result wins. If none does,
    the redirect targets today's date formatted as YYYYMMDD.
    """
    fallback_day = date.today().strftime("%Y%m%d")
    newest_first = sorted(day_dirs().keys(), reverse=True)
    # The generator stops at the first match, so later days are not clustered.
    target_day = next(
        (candidate for candidate in newest_first if cluster_segments(candidate)),
        fallback_day,
    )
    return redirect(url_for("app:transcripts.transcripts_day", day=target_day))
244
245
@bp.route("/")
def index() -> Any:
    """Redirect the site root to the home app's index endpoint."""
    home_url = url_for("app:home.index")
    return redirect(home_url)
249 return redirect(url_for("app:home.index"))