# personal memory agent
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Semantic markdown chunking formatter.

This module provides semantic chunking for markdown files, breaking documents
into context-aware chunks that preserve header hierarchy and structural context.
Each chunk is self-contained with its relevant headers included in the AST.
"""

import copy
import logging
from typing import Any

import mistune
from mistune.core import BlockState
from mistune.plugins.table import table
from mistune.renderers.markdown import MarkdownRenderer

LOG = logging.getLogger(__name__)

# Lines longer than this are treated as degenerate model output and dropped.
_MAX_LINE_CHARS = 2048
# Rendered chunks longer than this are replaced by a header-only stub.
_MAX_CHUNK_CHARS = 4096


class ExtendedMarkdownRenderer(MarkdownRenderer):
    """MarkdownRenderer extended with table support.

    Mistune's stock MarkdownRenderer has no handlers for the ``table``
    plugin's tokens, so rendering a parsed table back to markdown would
    fail; these methods emit pipe-delimited GFM table syntax.
    """

    def table(self, token, state):
        return self.render_children(token, state) + "\n"

    def table_head(self, token, state):
        cells = token.get("children", [])
        if not cells:
            return ""

        header_line = (
            "| "
            + " | ".join(self.render_children(cell, state).strip() for cell in cells)
            + " |"
        )

        # Build the separator row, honoring per-column alignment attrs.
        sep_parts = []
        for cell in cells:
            align = cell.get("attrs", {}).get("align")
            if align == "left":
                sep_parts.append(":---")
            elif align == "right":
                sep_parts.append("---:")
            elif align == "center":
                sep_parts.append(":---:")
            else:
                sep_parts.append("---")
        sep_line = "| " + " | ".join(sep_parts) + " |"

        return header_line + "\n" + sep_line + "\n"

    def table_body(self, token, state):
        return self.render_children(token, state)

    def table_row(self, token, state):
        cells = token.get("children", [])
        return (
            "| "
            + " | ".join(self.render_children(cell, state).strip() for cell in cells)
            + " |\n"
        )

    def table_cell(self, token, state):
        return self.render_children(token, state)


def extract_text(node) -> str:
    """Recursively extract raw text from a node for preview purposes.

    Accepts a string, a list of nodes, or a single token dict; any other
    value yields the empty string.
    """
    if isinstance(node, str):
        return node
    if isinstance(node, list):
        return " ".join(extract_text(n) for n in node)
    if isinstance(node, dict):
        if "raw" in node:
            return node["raw"]
        if "children" in node:
            return extract_text(node["children"])
    return ""


def get_header_path(header_stack: list) -> list[dict]:
    """Extract header text path from header stack.

    Returns a list of ``{"level": int, "text": str}`` dicts, outermost first.
    """
    path = []
    for h in header_stack:
        level = h.get("attrs", {}).get("level", 1)
        text = extract_text(h.get("children", []))
        path.append({"level": level, "text": text})
    return path


def find_next_content_node(ast_data: list, start_idx: int) -> dict | None:
    """Find the next non-blank node after start_idx, or None if none remains."""
    for i in range(start_idx + 1, len(ast_data)):
        if ast_data[i].get("type") != "blank_line":
            return ast_data[i]
    return None


def is_intro_paragraph(node: dict, next_content_node: dict | None) -> bool:
    """Check if a paragraph is an intro (precedes a list or table)."""
    if node.get("type") != "paragraph":
        return False
    if not next_content_node or next_content_node.get("type") not in ("list", "table"):
        return False
    return True


def is_simple_text_item(item: dict) -> bool:
    """Check if list item is simple text (no complex sub-structures)."""
    children = item.get("children", [])
    if len(children) != 1:
        return False
    child = children[0]
    if child.get("type") not in ("paragraph", "block_text"):
        return False
    return True


def is_definition_item(item: dict) -> bool:
    """Check if item matches **field:** value pattern (no trailing period)."""
    if not is_simple_text_item(item):
        return False
    text_block = item["children"][0]
    kids = text_block.get("children", [])
    if not kids or kids[0].get("type") != "strong":
        return False
    strong_text = extract_text(kids[0])
    following_text = extract_text(kids[1:]) if len(kids) > 1 else ""
    # The colon may sit inside the bold span (**field:**) or immediately
    # after it (**field**: value); accept either form.
    has_colon = strong_text.rstrip().endswith(
        ":"
    ) or following_text.lstrip().startswith(":")
    if not has_colon:
        return False
    full_text = extract_text(text_block).strip()
    # A trailing period suggests a prose sentence, not a field definition.
    return not full_text.endswith(".")


def is_definition_list(list_node: dict) -> bool:
    """Check if list is primarily definition-style (2+ matching items, >=50%)."""
    items = [c for c in list_node.get("children", []) if c.get("type") == "list_item"]
    if len(items) < 2:
        return False
    matches = sum(1 for item in items if is_definition_item(item))
    return matches >= 2 and matches >= len(items) * 0.5


def chunk_ast(ast_data: list) -> list[dict]:
    """Process Mistune AST into context-aware semantic chunks.

    Returns a list of dicts with:
    - index: chunk index
    - type: chunk type (paragraph, list_item, table_row, etc.)
    - header_path: list of {level, text} for header context
    - intro: optional intro paragraph text
    - preview: text preview of the chunk content
    - ast: the chunk's AST (headers + intro + content)
    """
    chunks = []
    header_stack = []
    intro_paragraph = None

    for i, node in enumerate(ast_data):
        node_type = node.get("type")

        # Handle Headings (Context Builders)
        if node_type == "heading":
            level = node.get("attrs", {}).get("level", 1)
            # Pop headers at the same or deeper level; a new H2 replaces
            # the previous H2 and everything nested under it.
            header_stack = [
                h for h in header_stack if h.get("attrs", {}).get("level", 0) < level
            ]
            header_stack.append(node)
            intro_paragraph = None

        # Handle Paragraphs
        elif node_type == "paragraph":
            # Only the paragraph branch needs lookahead; computing it here
            # (not once per node) avoids O(n^2) scans over blank-heavy docs.
            next_content = find_next_content_node(ast_data, i)
            if is_intro_paragraph(node, next_content):
                # Defer: this paragraph introduces the upcoming list/table
                # and will be bundled into that chunk instead.
                intro_paragraph = node
            else:
                intro_paragraph = None
                chunk_ast_nodes = copy.deepcopy(header_stack)
                chunk_ast_nodes.append(node)
                chunks.append(
                    {
                        "index": len(chunks),
                        "type": "paragraph",
                        "header_path": get_header_path(header_stack),
                        "preview": extract_text(node)[:100],
                        "ast": chunk_ast_nodes,
                    }
                )

        # Handle Lists (Container Nodes)
        elif node_type == "list":
            if is_definition_list(node):
                # Definition-style lists stay whole: their items are fields
                # of one record, not independent facts.
                chunk_ast_list = copy.deepcopy(header_stack)
                if intro_paragraph:
                    chunk_ast_list.append(copy.deepcopy(intro_paragraph))
                chunk_ast_list.append(node)
                chunks.append(
                    {
                        "index": len(chunks),
                        "type": "definition_list",
                        "header_path": get_header_path(header_stack),
                        "intro": (
                            extract_text(intro_paragraph)[:100]
                            if intro_paragraph
                            else None
                        ),
                        "preview": extract_text(node)[:100],
                        "ast": chunk_ast_list,
                    }
                )
            else:
                # Ordinary lists are exploded: one chunk per item, each
                # wrapped in a synthetic single-item list so it renders
                # with the original list's attributes (bullet/ordering).
                for item in node.get("children", []):
                    if item.get("type") == "list_item":
                        synthetic_list = copy.deepcopy(node)
                        synthetic_list["children"] = [item]

                        chunk_ast_list = copy.deepcopy(header_stack)
                        if intro_paragraph:
                            chunk_ast_list.append(copy.deepcopy(intro_paragraph))
                        chunk_ast_list.append(synthetic_list)
                        chunks.append(
                            {
                                "index": len(chunks),
                                "type": "list_item",
                                "header_path": get_header_path(header_stack),
                                "intro": (
                                    extract_text(intro_paragraph)[:100]
                                    if intro_paragraph
                                    else None
                                ),
                                "preview": extract_text(item)[:100],
                                "ast": chunk_ast_list,
                            }
                        )
            intro_paragraph = None

        # Handle Tables (Complex Container Nodes)
        elif node_type == "table":
            children = node.get("children", [])
            # .get() keeps malformed AST nodes from raising KeyError here.
            thead = next((c for c in children if c.get("type") == "table_head"), None)
            tbody = next((c for c in children if c.get("type") == "table_body"), None)

            if tbody:
                # One chunk per data row, each paired with the header row so
                # the rendered fragment is a complete, readable table.
                for row in tbody.get("children", []):
                    if row.get("type") == "table_row":
                        synthetic_table = copy.deepcopy(node)
                        synthetic_body = copy.deepcopy(tbody)
                        synthetic_body["children"] = [row]

                        new_children = []
                        if thead:
                            new_children.append(thead)
                        new_children.append(synthetic_body)
                        synthetic_table["children"] = new_children

                        chunk_ast_nodes = copy.deepcopy(header_stack)
                        if intro_paragraph:
                            chunk_ast_nodes.append(copy.deepcopy(intro_paragraph))
                        chunk_ast_nodes.append(synthetic_table)
                        chunks.append(
                            {
                                "index": len(chunks),
                                "type": "table_row",
                                "header_path": get_header_path(header_stack),
                                "intro": (
                                    extract_text(intro_paragraph)[:100]
                                    if intro_paragraph
                                    else None
                                ),
                                "preview": extract_text(row)[:100],
                                "ast": chunk_ast_nodes,
                            }
                        )
            intro_paragraph = None

        # Handle Block Code
        elif node_type == "block_code":
            chunk_ast_nodes = copy.deepcopy(header_stack)
            chunk_ast_nodes.append(node)
            info = node.get("attrs", {}).get("info", "")
            raw = node.get("raw", "")[:80]
            chunks.append(
                {
                    "index": len(chunks),
                    "type": "block_code",
                    "header_path": get_header_path(header_stack),
                    "preview": f"[{info}] {raw}" if info else raw,
                    "ast": chunk_ast_nodes,
                }
            )

        # Handle Blockquotes
        elif node_type == "block_quote":
            chunk_ast_nodes = copy.deepcopy(header_stack)
            chunk_ast_nodes.append(node)
            chunks.append(
                {
                    "index": len(chunks),
                    "type": "block_quote",
                    "header_path": get_header_path(header_stack),
                    "preview": extract_text(node)[:100],
                    "ast": chunk_ast_nodes,
                }
            )

        # Skip Thematic Breaks (no indexable content)

    return chunks


def parse_markdown(text: str) -> list:
    """Parse markdown text into AST tokens (renderer=None returns tokens)."""
    md = mistune.create_markdown(renderer=None, plugins=[table])
    return md(text)


def render_chunk(chunk: dict) -> str:
    """Render a chunk's AST back to markdown."""
    renderer = ExtendedMarkdownRenderer()
    return renderer(chunk["ast"], state=BlockState())


def chunk_markdown(text: str) -> list[dict]:
    """Parse markdown and return semantic chunks."""
    ast = parse_markdown(text)
    return chunk_ast(ast)


def sanitize_markdown(text: str) -> str:
    """Drop degenerate lines that exceed the max line length.

    AI models (notably older Gemini Flash) sometimes produce lines with
    thousands of repeated characters or whitespace-padded table cells.
    These are not useful content and bloat the index.
    """
    lines = text.split("\n")
    clean: list[str] = []
    dropped = 0
    for line in lines:
        if len(line) > _MAX_LINE_CHARS:
            dropped += 1
            continue
        clean.append(line)
    if dropped:
        LOG.warning(
            "Dropped %d line(s) exceeding %d chars during markdown sanitization",
            dropped,
            _MAX_LINE_CHARS,
        )
    return "\n".join(clean)


def _render_header_stub(raw_chunk: dict, original_size: int) -> str:
    """Render a header-only stub for an oversized chunk.

    Keeps the header context searchable while noting the omitted body size.
    """
    parts = []
    for h in raw_chunk.get("header_path", []):
        prefix = "#" * h["level"]
        parts.append(f"{prefix} {h['text']}")
    parts.append(f"\n[Content too large to index: {original_size:,} chars]")
    return "\n\n".join(parts)


def format_markdown(
    text: str,
    context: dict[str, Any] | None = None,
) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Format markdown text into semantic chunks.

    This is the formatter interface for markdown files. Each chunk contains
    its full context (headers, intro paragraphs) rendered back to markdown.

    Note: Unlike JSONL formatters, this does not return indexer metadata.
    Agent for markdown files is derived from path by extract_path_metadata().

    Args:
        text: Markdown text to chunk
        context: Optional context dict (unused, for formatter interface compatibility)

    Returns:
        Tuple of (chunks, meta) where:
        - chunks: List of {"markdown": str} dicts (timestamp omitted)
        - meta: Empty dict (no header or indexer - context is in each chunk,
          agent is path-derived)
    """
    text = sanitize_markdown(text)
    raw_chunks = chunk_markdown(text)
    chunks = []
    for rc in raw_chunks:
        rendered = render_chunk(rc)
        if len(rendered) > _MAX_CHUNK_CHARS:
            # Oversized chunks are replaced with a header stub rather than
            # truncated mid-sentence, so the index never holds partial rows.
            rendered = _render_header_stub(rc, len(rendered))
        chunks.append({"markdown": rendered})
    return chunks, {}