# personal memory agent
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Semantic markdown chunking formatter.

This module provides semantic chunking for markdown files, breaking documents
into context-aware chunks that preserve header hierarchy and structural context.
Each chunk is self-contained with its relevant headers included in the AST.
"""
10
11import copy
12import logging
13from typing import Any
14
15LOG = logging.getLogger(__name__)
16
17_MAX_LINE_CHARS = 2048
18_MAX_CHUNK_CHARS = 4096
19
20import mistune
21from mistune.core import BlockState
22from mistune.plugins.table import table
23from mistune.renderers.markdown import MarkdownRenderer
24
25
class ExtendedMarkdownRenderer(MarkdownRenderer):
    """MarkdownRenderer subclass that can render mistune table tokens."""

    # Maps a cell's "align" attribute to its delimiter-row cell; anything
    # unrecognized (including None) falls back to the plain separator.
    _ALIGN_SEP = {"left": ":---", "right": "---:", "center": ":---:"}

    def table(self, token, state):
        return self.render_children(token, state) + "\n"

    def table_head(self, token, state):
        cells = token.get("children", [])
        if not cells:
            return ""
        texts = [self.render_children(cell, state).strip() for cell in cells]
        header_line = "| " + " | ".join(texts) + " |"
        separators = [
            self._ALIGN_SEP.get(cell.get("attrs", {}).get("align"), "---")
            for cell in cells
        ]
        sep_line = "| " + " | ".join(separators) + " |"
        return f"{header_line}\n{sep_line}\n"

    def table_body(self, token, state):
        return self.render_children(token, state)

    def table_row(self, token, state):
        cells = token.get("children", [])
        texts = [self.render_children(cell, state).strip() for cell in cells]
        return "| " + " | ".join(texts) + " |\n"

    def table_cell(self, token, state):
        return self.render_children(token, state)
72
def extract_text(node) -> str:
    """Recursively collect raw text beneath *node* for preview snippets.

    Strings are returned as-is, lists are joined with single spaces, dicts
    yield their ``raw`` value (preferred) or the text of their ``children``;
    anything else contributes nothing.
    """
    if isinstance(node, str):
        return node
    if isinstance(node, list):
        return " ".join(map(extract_text, node))
    if isinstance(node, dict):
        if "raw" in node:
            return node["raw"]
        # Missing "children" degrades to extract_text([]) == "".
        return extract_text(node.get("children", []))
    return ""
85
86
def get_header_path(header_stack: list) -> list[dict]:
    """Flatten a stack of heading tokens into ``{"level", "text"}`` entries.

    Missing level attributes default to 1, matching a top-level heading.
    """
    return [
        {
            "level": header.get("attrs", {}).get("level", 1),
            "text": extract_text(header.get("children", [])),
        }
        for header in header_stack
    ]
95
96
97def find_next_content_node(ast_data: list, start_idx: int) -> dict | None:
98 """Find the next non-blank node after start_idx."""
99 for i in range(start_idx + 1, len(ast_data)):
100 if ast_data[i].get("type") != "blank_line":
101 return ast_data[i]
102 return None
103
104
105def is_intro_paragraph(node: dict, next_content_node: dict | None) -> bool:
106 """Check if a paragraph is an intro (precedes a list or table)."""
107 if node.get("type") != "paragraph":
108 return False
109 if not next_content_node or next_content_node.get("type") not in ("list", "table"):
110 return False
111 return True
112
113
def is_simple_text_item(item: dict) -> bool:
    """True when a list item holds exactly one paragraph/block_text child."""
    children = item.get("children", [])
    if len(children) != 1:
        return False
    return children[0].get("type") in ("paragraph", "block_text")
123
124
def is_definition_item(item: dict) -> bool:
    """True for items shaped like ``**field:** value`` (no trailing period).

    The colon may sit either inside the bold label or immediately after it.
    A trailing period suggests a full sentence rather than a definition.
    """
    if not is_simple_text_item(item):
        return False
    text_block = item["children"][0]
    inline = text_block.get("children", [])
    if not inline or inline[0].get("type") != "strong":
        return False
    label = extract_text(inline[0])
    remainder = extract_text(inline[1:]) if len(inline) > 1 else ""
    colon_present = label.rstrip().endswith(":") or remainder.lstrip().startswith(":")
    if not colon_present:
        return False
    return not extract_text(text_block).strip().endswith(".")
142
143
def is_definition_list(list_node: dict) -> bool:
    """True when a list is mostly definition-style (2+ matches and >=50%)."""
    items = [
        child
        for child in list_node.get("children", [])
        if child.get("type") == "list_item"
    ]
    if len(items) < 2:
        return False
    hits = len([item for item in items if is_definition_item(item)])
    # Integer form of "hits >= len(items) * 0.5".
    return hits >= 2 and hits * 2 >= len(items)
151
152
def chunk_ast(ast_data: list) -> list[dict]:
    """Process a Mistune AST into context-aware semantic chunks.

    Walks the top-level token list while tracking the current heading
    hierarchy and any "intro" paragraph that immediately precedes a list or
    table.  Container nodes (lists, tables) are exploded into one chunk per
    item/row so each chunk stays small yet self-contained; thematic breaks
    and blank lines produce no chunks.

    Args:
        ast_data: Top-level Mistune token list.

    Returns a list of dicts with:
        - index: chunk index
        - type: chunk type (paragraph, list_item, table_row, etc.)
        - header_path: list of {level, text} for header context
        - intro: optional intro paragraph text (list/table chunks only)
        - preview: text preview of the chunk content
        - ast: the chunk's AST (headers + intro + content)
    """
    chunks: list[dict] = []
    header_stack: list[dict] = []
    intro_paragraph: dict | None = None

    def add_chunk(chunk_type: str, preview: str, chunk_nodes: list, **extra) -> None:
        # Append a chunk with the bookkeeping fields shared by every type;
        # list/table callers pass intro=... via **extra, others omit the key.
        chunks.append(
            {
                "index": len(chunks),
                "type": chunk_type,
                "header_path": get_header_path(header_stack),
                **extra,
                "preview": preview,
                "ast": chunk_nodes,
            }
        )

    def intro_text() -> str | None:
        # Truncated preview of the pending intro paragraph, if any.
        return extract_text(intro_paragraph)[:100] if intro_paragraph else None

    def context_nodes(with_intro: bool = False) -> list:
        # Deep-copied header context (snapshotted, because header_stack keeps
        # mutating as the walk continues), optionally plus the intro paragraph.
        nodes = copy.deepcopy(header_stack)
        if with_intro and intro_paragraph:
            nodes.append(copy.deepcopy(intro_paragraph))
        return nodes

    for i, node in enumerate(ast_data):
        node_type = node.get("type")

        # Headings only build context; they never become chunks themselves.
        if node_type == "heading":
            level = node.get("attrs", {}).get("level", 1)
            # Drop headings at the same or deeper level before pushing.
            header_stack = [
                h for h in header_stack if h.get("attrs", {}).get("level", 0) < level
            ]
            header_stack.append(node)
            intro_paragraph = None

        elif node_type == "paragraph":
            # Lookahead is only needed for paragraphs, so compute it here
            # instead of once per node (the per-node form was quadratic on
            # blank-line-heavy documents).
            next_content = find_next_content_node(ast_data, i)
            if is_intro_paragraph(node, next_content):
                # Held back; attached to the upcoming list/table chunks.
                intro_paragraph = node
            else:
                intro_paragraph = None
                nodes = context_nodes()
                nodes.append(node)
                add_chunk("paragraph", extract_text(node)[:100], nodes)

        elif node_type == "list":
            if is_definition_list(node):
                # Definition-style lists stay whole: items only make sense
                # together.
                nodes = context_nodes(with_intro=True)
                nodes.append(node)
                add_chunk(
                    "definition_list",
                    extract_text(node)[:100],
                    nodes,
                    intro=intro_text(),
                )
            else:
                # Ordinary lists: one chunk per item, each wrapped in a
                # single-item copy of the list so markers render correctly.
                for item in node.get("children", []):
                    if item.get("type") != "list_item":
                        continue
                    synthetic_list = copy.deepcopy(node)
                    synthetic_list["children"] = [item]
                    nodes = context_nodes(with_intro=True)
                    nodes.append(synthetic_list)
                    add_chunk(
                        "list_item",
                        extract_text(item)[:100],
                        nodes,
                        intro=intro_text(),
                    )
            intro_paragraph = None

        elif node_type == "table":
            children = node.get("children", [])
            thead = next((c for c in children if c["type"] == "table_head"), None)
            tbody = next((c for c in children if c["type"] == "table_body"), None)

            if tbody:
                # One chunk per body row; the header row is repeated in each
                # chunk so every row renders as a complete table.
                for row in tbody.get("children", []):
                    if row.get("type") != "table_row":
                        continue
                    synthetic_body = copy.deepcopy(tbody)
                    synthetic_body["children"] = [row]
                    synthetic_table = copy.deepcopy(node)
                    synthetic_table["children"] = (
                        [thead, synthetic_body] if thead else [synthetic_body]
                    )
                    nodes = context_nodes(with_intro=True)
                    nodes.append(synthetic_table)
                    add_chunk(
                        "table_row",
                        extract_text(row)[:100],
                        nodes,
                        intro=intro_text(),
                    )
            intro_paragraph = None

        elif node_type == "block_code":
            nodes = context_nodes()
            nodes.append(node)
            info = node.get("attrs", {}).get("info", "")
            raw = node.get("raw", "")[:80]
            add_chunk("block_code", f"[{info}] {raw}" if info else raw, nodes)

        elif node_type == "block_quote":
            nodes = context_nodes()
            nodes.append(node)
            add_chunk("block_quote", extract_text(node)[:100], nodes)

        # Thematic breaks / blank lines: no indexable content, skip.

    return chunks
318
319
def parse_markdown(text: str) -> list:
    """Run mistune (table plugin enabled, no renderer) over *text*.

    Returns the raw AST token list.
    """
    return mistune.create_markdown(renderer=None, plugins=[table])(text)
324
325
def render_chunk(chunk: dict) -> str:
    """Render one chunk's AST back into markdown text."""
    return ExtendedMarkdownRenderer()(chunk["ast"], state=BlockState())
330
331
def chunk_markdown(text: str) -> list[dict]:
    """Parse *text* and split the resulting AST into semantic chunks."""
    return chunk_ast(parse_markdown(text))
336
337
def sanitize_markdown(text: str) -> str:
    """Drop degenerate lines that exceed the max line length.

    AI models (notably older Gemini Flash) sometimes produce lines with
    thousands of repeated characters or whitespace-padded table cells.
    These are not useful content and bloat the index.
    """
    original = text.split("\n")
    kept = [line for line in original if len(line) <= _MAX_LINE_CHARS]
    dropped = len(original) - len(kept)
    if dropped:
        LOG.warning(
            "Dropped %d line(s) exceeding %d chars during markdown sanitization",
            dropped,
            _MAX_LINE_CHARS,
        )
    return "\n".join(kept)
360
361
362def _render_header_stub(raw_chunk: dict, original_size: int) -> str:
363 """Render a header-only stub for an oversized chunk."""
364 parts = []
365 for h in raw_chunk.get("header_path", []):
366 prefix = "#" * h["level"]
367 parts.append(f"{prefix} {h['text']}")
368 parts.append(f"\n[Content too large to index: {original_size:,} chars]")
369 return "\n\n".join(parts)
370
371
def format_markdown(
    text: str,
    context: dict[str, Any] | None = None,
) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Format markdown text into semantic chunks.

    This is the formatter interface for markdown files. Each chunk contains
    its full context (headers, intro paragraphs) rendered back to markdown.

    Note: Unlike JSONL formatters, this does not return indexer metadata.
    Agent for markdown files is derived from path by extract_path_metadata().

    Args:
        text: Markdown text to chunk
        context: Optional context dict (unused, for formatter interface compatibility)

    Returns:
        Tuple of (chunks, meta) where:
        - chunks: List of {"markdown": str} dicts (timestamp omitted)
        - meta: Empty dict (no header or indexer - context is in each chunk,
          agent is path-derived)
    """

    def clamp(raw_chunk: dict) -> str:
        # Oversized renders are replaced by a header-only stub.
        rendered = render_chunk(raw_chunk)
        if len(rendered) > _MAX_CHUNK_CHARS:
            return _render_header_stub(raw_chunk, len(rendered))
        return rendered

    sanitized = sanitize_markdown(text)
    chunks = [{"markdown": clamp(rc)} for rc in chunk_markdown(sanitized)]
    return chunks, {}