personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Deterministic activity state machine replacing LLM-based activity tracking."""
5
6from datetime import datetime, timezone
7
8from think.activities import LEVEL_VALUES, make_activity_id
9from think.utils import segment_parse
10
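# Maximum gap, in seconds, between the end of one segment and the start of the
# next before all tracked activities are ended (10 min; R&D default). The
# existing LLM hook uses 3600.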
12GAP_THRESHOLD_SECONDS = 600
13
14
15class ActivityStateMachine:
16 def __init__(self) -> None:
17 self.state: dict[str, dict] = {}
18 self.last_segment_key: str | None = None
19 self.last_segment_day: str | None = None
20 self.history: list[dict] = []
21 self._completed: list[dict] = []
22
23 def _parse_segment_seconds(self, segment_key: str) -> int | None:
24 start_time, _end_time = segment_parse(segment_key)
25 if start_time is None:
26 return None
27 return start_time.hour * 3600 + start_time.minute * 60 + start_time.second
28
29 def _parse_segment_end_seconds(self, segment_key: str) -> int | None:
30 _start_time, end_time = segment_parse(segment_key)
31 if end_time is None:
32 return None
33 return end_time.hour * 3600 + end_time.minute * 60 + end_time.second
34
35 def _should_reset(
36 self, segment_key: str, day: str, previous_segment_key: str | None
37 ) -> bool:
38 if self.last_segment_day is None:
39 return False
40 if day != self.last_segment_day:
41 return True
42
43 prev_key = previous_segment_key or self.last_segment_key
44 if prev_key is None:
45 return False
46
47 prev_end = self._parse_segment_end_seconds(prev_key)
48 curr_start = self._parse_segment_seconds(segment_key)
49 if prev_end is None or curr_start is None:
50 return False
51
52 return (curr_start - prev_end) > GAP_THRESHOLD_SECONDS
53
54 def _end_all(self, segment_key: str, change: str) -> list[dict]:
55 changes = []
56 for facet in sorted(self.state):
57 prior = self.state[facet]
58 entry = {
59 "id": prior["id"],
60 "activity": prior["activity"],
61 "state": "ended",
62 "since": prior["since"],
63 "description": prior["description"],
64 "_change": change,
65 "_facet": facet,
66 "_segment": segment_key,
67 }
68 changes.append(entry)
69 self._completed.append(self._make_completed_record(prior))
70
71 self.state = {}
72 return changes
73
74 def _make_completed_record(self, entry: dict) -> dict:
75 return {
76 "id": entry["id"],
77 "activity": entry["activity"],
78 "segments": [entry["since"]],
79 "level_avg": LEVEL_VALUES.get(entry.get("level", "medium"), 0.5),
80 "description": entry["description"],
81 "active_entities": entry.get("active_entities", []),
82 "created_at": datetime.now(tz=timezone.utc).isoformat(),
83 }
84
85 def update(
86 self,
87 sense_output: dict,
88 segment_key: str,
89 day: str,
90 previous_segment_key: str | None = None,
91 ) -> list[dict]:
92 changes = []
93
94 if self._should_reset(segment_key, day, previous_segment_key):
95 changes.extend(self._end_all(segment_key, "ended_gap"))
96
97 density = sense_output.get("density") or "active"
98 content_type = sense_output.get("content_type") or "idle"
99 activity_summary = sense_output.get("activity_summary") or ""
100 raw_entities = sense_output.get("entities") or []
101 entity_names = [
102 entry["name"]
103 for entry in raw_entities
104 if isinstance(entry, dict) and entry.get("name")
105 ]
106 raw_facets = sense_output.get("facets") or []
107
108 if density == "idle":
109 changes.extend(self._end_all(segment_key, "ended_idle"))
110 self.last_segment_key = segment_key
111 self.last_segment_day = day
112 self.history.extend(dict(change) for change in changes)
113 return changes
114
115 facet_map = {}
116 for facet in raw_facets:
117 if isinstance(facet, dict) and facet.get("facet"):
118 facet_map[facet["facet"]] = facet
119 current_facets = set(facet_map.keys()) if facet_map else {"__"}
120
121 for facet in sorted(set(self.state.keys()) - current_facets):
122 prior = self.state.pop(facet)
123 entry = {
124 "id": prior["id"],
125 "activity": prior["activity"],
126 "state": "ended",
127 "since": prior["since"],
128 "description": prior["description"],
129 "_change": "ended_facet_gone",
130 "_facet": facet,
131 "_segment": segment_key,
132 }
133 changes.append(entry)
134 self._completed.append(self._make_completed_record(prior))
135
136 for facet in sorted(current_facets):
137 facet_data = facet_map.get(facet, {})
138 level = facet_data.get("level", "medium")
139 if level not in ("high", "medium", "low"):
140 level = "medium"
141
142 if facet in self.state:
143 prior = self.state[facet]
144 if prior["activity"] != content_type:
145 ended = {
146 "id": prior["id"],
147 "activity": prior["activity"],
148 "state": "ended",
149 "since": prior["since"],
150 "description": prior["description"],
151 "_change": "ended_type_change",
152 "_facet": facet,
153 "_segment": segment_key,
154 }
155 changes.append(ended)
156 self._completed.append(self._make_completed_record(prior))
157
158 new_entry = {
159 "id": make_activity_id(content_type, segment_key),
160 "activity": content_type,
161 "state": "active",
162 "since": segment_key,
163 "description": activity_summary,
164 "level": level,
165 "active_entities": entity_names,
166 "_change": "new",
167 "_facet": facet,
168 "_segment": segment_key,
169 }
170 self.state[facet] = new_entry
171 changes.append(dict(new_entry))
172 else:
173 prior["description"] = activity_summary
174 prior["level"] = level
175 prior["active_entities"] = entity_names
176 prior["_change"] = "continuing"
177 prior["_segment"] = segment_key
178 changes.append(dict(prior))
179 else:
180 new_entry = {
181 "id": make_activity_id(content_type, segment_key),
182 "activity": content_type,
183 "state": "active",
184 "since": segment_key,
185 "description": activity_summary,
186 "level": level,
187 "active_entities": entity_names,
188 "_change": "new",
189 "_facet": facet,
190 "_segment": segment_key,
191 }
192 self.state[facet] = new_entry
193 changes.append(dict(new_entry))
194
195 self.last_segment_key = segment_key
196 self.last_segment_day = day
197 self.history.extend(dict(change) for change in changes)
198 return changes
199
200 def get_current_state(self) -> list[dict]:
201 result = []
202 for facet in sorted(self.state):
203 entry = self.state[facet]
204 clean = {k: v for k, v in entry.items() if not k.startswith("_")}
205 if "active_entities" in clean:
206 clean["active_entities"] = list(clean["active_entities"])
207 result.append(clean)
208 return result
209
210 def get_completed_activities(self) -> list[dict]:
211 return list(self._completed)