personal memory agent
at main 211 lines 8.1 kB view raw
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Deterministic activity state machine replacing LLM-based activity tracking."""

from datetime import datetime, timezone

from think.activities import LEVEL_VALUES, make_activity_id
from think.utils import segment_parse

# 10 min; R&D default. Existing LLM hook uses 3600.
GAP_THRESHOLD_SECONDS = 600

# Placeholder facet key used when the sense output carries no usable facets.
_DEFAULT_FACET = "__"

# Accepted facet levels; anything else is normalised to "medium".
# Kept as a tuple so the membership test never requires a hashable value.
_VALID_LEVELS = ("high", "medium", "low")


class ActivityStateMachine:
    """Track at most one active activity per facet across a stream of segments.

    Each call to :meth:`update` consumes one segment's sense output and
    returns the list of change entries it produced.  Every change entry is a
    dict carrying the activity fields plus three bookkeeping keys:
    ``_change`` (one of ``"new"``, ``"continuing"``, ``"ended_gap"``,
    ``"ended_idle"``, ``"ended_facet_gone"``, ``"ended_type_change"``),
    ``_facet`` and ``_segment``.

    Attributes:
        state: Mapping of facet name to its currently active entry.
        last_segment_key: Segment key passed to the most recent ``update``.
        last_segment_day: Day passed to the most recent ``update``.
        history: Copies of every change entry ever returned by ``update``.
    """

    def __init__(self) -> None:
        self.state: dict[str, dict] = {}
        self.last_segment_key: str | None = None
        self.last_segment_day: str | None = None
        self.history: list[dict] = []
        # Records for activities that have ended, in the order they ended.
        self._completed: list[dict] = []

    # ------------------------------------------------------------------
    # Segment-time helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _seconds_since_midnight(moment) -> int | None:
        """Convert an object with hour/minute/second into seconds, or None."""
        if moment is None:
            return None
        return moment.hour * 3600 + moment.minute * 60 + moment.second

    def _parse_segment_seconds(self, segment_key: str) -> int | None:
        """Return the segment's start as seconds since midnight.

        Returns None when ``segment_parse`` yields no start time.
        """
        start_time, _end_time = segment_parse(segment_key)
        return self._seconds_since_midnight(start_time)

    def _parse_segment_end_seconds(self, segment_key: str) -> int | None:
        """Return the segment's end as seconds since midnight.

        Returns None when ``segment_parse`` yields no end time.
        """
        _start_time, end_time = segment_parse(segment_key)
        return self._seconds_since_midnight(end_time)

    def _should_reset(
        self, segment_key: str, day: str, previous_segment_key: str | None
    ) -> bool:
        """Decide whether all active activities must be ended before this segment.

        A reset happens when the day differs from the previously seen day, or
        when the gap between the previous segment's end and this segment's
        start exceeds ``GAP_THRESHOLD_SECONDS``.  With no prior segment, or
        when either time cannot be parsed, no reset is requested.
        """
        if self.last_segment_day is None:
            return False
        if day != self.last_segment_day:
            return True

        # An explicitly supplied previous key wins over the remembered one.
        prev_key = previous_segment_key or self.last_segment_key
        if prev_key is None:
            return False

        prev_end = self._parse_segment_end_seconds(prev_key)
        curr_start = self._parse_segment_seconds(segment_key)
        if prev_end is None or curr_start is None:
            return False

        return (curr_start - prev_end) > GAP_THRESHOLD_SECONDS

    # ------------------------------------------------------------------
    # Entry construction helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _snapshot(entry: dict) -> dict:
        """Copy an entry so the caller cannot mutate internal state through it.

        The dict itself and its ``active_entities`` list (when present) are
        copied; other values are strings and are shared as-is.
        """
        copied = dict(entry)
        if "active_entities" in copied:
            copied["active_entities"] = list(copied["active_entities"])
        return copied

    def _end_activity(
        self, prior: dict, facet: str, segment_key: str, change: str
    ) -> dict:
        """Build the "ended" change entry for ``prior`` and record its completion.

        The caller is responsible for removing or replacing ``prior`` in
        ``self.state``.
        """
        ended = {
            "id": prior["id"],
            "activity": prior["activity"],
            "state": "ended",
            "since": prior["since"],
            "description": prior["description"],
            "_change": change,
            "_facet": facet,
            "_segment": segment_key,
        }
        self._completed.append(self._make_completed_record(prior))
        return ended

    def _start_activity(
        self,
        facet: str,
        content_type: str,
        segment_key: str,
        description: str,
        level: str,
        entity_names: list,
    ) -> dict:
        """Install a fresh active entry for ``facet`` and return its change entry."""
        entry = {
            "id": make_activity_id(content_type, segment_key),
            "activity": content_type,
            "state": "active",
            "since": segment_key,
            "description": description,
            "level": level,
            # Own copy per facet: facets must not share one mutable list.
            "active_entities": list(entity_names),
            "_change": "new",
            "_facet": facet,
            "_segment": segment_key,
        }
        self.state[facet] = entry
        return self._snapshot(entry)

    def _end_all(self, segment_key: str, change: str) -> list[dict]:
        """End every active activity (in sorted facet order) and clear the state.

        Returns the "ended" change entries, each tagged with ``change``.
        """
        changes = [
            self._end_activity(self.state[facet], facet, segment_key, change)
            for facet in sorted(self.state)
        ]
        self.state = {}
        return changes

    def _make_completed_record(self, entry: dict) -> dict:
        """Build the completed-activity record for an entry that is ending.

        NOTE(review): only the starting segment and the entry's most recent
        level are recorded, although the keys are named ``segments`` and
        ``level_avg`` -- confirm whether the full span / a true average is
        expected by consumers.
        """
        return {
            "id": entry["id"],
            "activity": entry["activity"],
            "segments": [entry["since"]],
            "level_avg": LEVEL_VALUES.get(entry.get("level", "medium"), 0.5),
            "description": entry["description"],
            "active_entities": list(entry.get("active_entities", [])),
            "created_at": datetime.now(tz=timezone.utc).isoformat(),
        }

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def update(
        self,
        sense_output: dict,
        segment_key: str,
        day: str,
        previous_segment_key: str | None = None,
    ) -> list[dict]:
        """Advance the state machine by one segment.

        Args:
            sense_output: Dict read for the keys ``density``, ``content_type``,
                ``activity_summary``, ``entities`` and ``facets``; missing or
                falsy values fall back to defaults.
            segment_key: Key of the segment being processed.
            day: Day the segment belongs to; a change of day ends everything.
            previous_segment_key: Optional override for the segment used when
                measuring the gap to this one.

        Returns:
            The change entries produced by this segment, in emission order.
            The same entries are also appended (as copies) to ``history``.
        """
        changes: list[dict] = []

        if self._should_reset(segment_key, day, previous_segment_key):
            changes.extend(self._end_all(segment_key, "ended_gap"))

        density = sense_output.get("density") or "active"
        if density == "idle":
            # After a gap reset the state is already empty, so this adds nothing.
            changes.extend(self._end_all(segment_key, "ended_idle"))
        else:
            changes.extend(self._apply_active(sense_output, segment_key))

        self.last_segment_key = segment_key
        self.last_segment_day = day
        self.history.extend(self._snapshot(change) for change in changes)
        return changes

    def _apply_active(self, sense_output: dict, segment_key: str) -> list[dict]:
        """Reconcile the per-facet state with a non-idle segment's sense output."""
        content_type = sense_output.get("content_type") or "idle"
        activity_summary = sense_output.get("activity_summary") or ""
        entity_names = [
            item["name"]
            for item in sense_output.get("entities") or []
            if isinstance(item, dict) and item.get("name")
        ]
        facet_map = {
            item["facet"]: item
            for item in sense_output.get("facets") or []
            if isinstance(item, dict) and item.get("facet")
        }
        current_facets = set(facet_map) or {_DEFAULT_FACET}

        changes: list[dict] = []

        # Facets that were active but are absent from this segment end first.
        for facet in sorted(set(self.state) - current_facets):
            prior = self.state.pop(facet)
            changes.append(
                self._end_activity(prior, facet, segment_key, "ended_facet_gone")
            )

        for facet in sorted(current_facets):
            level = facet_map.get(facet, {}).get("level", "medium")
            if level not in _VALID_LEVELS:
                level = "medium"

            prior = self.state.get(facet)
            if prior is not None and prior["activity"] == content_type:
                # Same activity keeps its id and "since"; refresh the details.
                prior["description"] = activity_summary
                prior["level"] = level
                prior["active_entities"] = list(entity_names)
                prior["_change"] = "continuing"
                prior["_segment"] = segment_key
                changes.append(self._snapshot(prior))
                continue

            if prior is not None:
                # Activity type changed on this facet: close the old one first.
                changes.append(
                    self._end_activity(
                        prior, facet, segment_key, "ended_type_change"
                    )
                )
            changes.append(
                self._start_activity(
                    facet,
                    content_type,
                    segment_key,
                    activity_summary,
                    level,
                    entity_names,
                )
            )

        return changes

    def get_current_state(self) -> list[dict]:
        """Return the active entries in sorted facet order.

        Keys starting with an underscore are stripped and ``active_entities``
        is copied, so the result is safe for callers to modify.
        """
        result = []
        for facet in sorted(self.state):
            clean = {
                key: value
                for key, value in self.state[facet].items()
                if not key.startswith("_")
            }
            if "active_entities" in clean:
                clean["active_entities"] = list(clean["active_entities"])
            result.append(clean)
        return result

    def get_completed_activities(self) -> list[dict]:
        """Return a new list holding the completed-activity records so far."""
        return list(self._completed)