An AI agent built to do Ralph loops - plan mode for planning and ralph mode for implementing.
1use crate::db::Database;
2use crate::graph::store::SqliteGraphStore;
3use anyhow::{Result, anyhow};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use tokio_rusqlite::OptionalExtension;
7use uuid::Uuid;
8
9/// A session represents a work period for a goal
10#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
11pub struct Session {
12 pub id: String,
13 pub project_id: String,
14 pub goal_id: String,
15 pub started_at: DateTime<Utc>,
16 pub ended_at: Option<DateTime<Utc>>,
17 pub handoff_notes: Option<String>,
18 pub agent_ids: Vec<String>,
19 pub summary: Option<String>,
20}
21
22/// Store for managing sessions
23pub struct SessionStore {
24 db: Database,
25}
26
27impl SessionStore {
28 /// Create a new SessionStore
29 pub fn new(db: Database) -> Self {
30 Self { db }
31 }
32
33 /// Create a new session for a goal
34 pub async fn create_session(&self, project_id: &str, goal_id: &str) -> Result<Session> {
35 let session_id = format!("sess-{}", &Uuid::new_v4().simple().to_string()[..8]);
36 let now = Utc::now();
37 let now_rfc3339 = now.to_rfc3339();
38 let project_id_owned = project_id.to_string();
39 let goal_id_owned = goal_id.to_string();
40 let session_id_clone = session_id.clone();
41
42 self.db
43 .connection()
44 .call(move |conn| {
45 conn.execute_batch("BEGIN IMMEDIATE")
46 .map_err(tokio_rusqlite::Error::Rusqlite)?;
47
48 conn.execute(
49 "INSERT INTO sessions (id, project_id, goal_id, started_at, agent_ids)
50 VALUES (?, ?, ?, ?, ?)",
51 rusqlite::params![
52 &session_id_clone,
53 &project_id_owned,
54 &goal_id_owned,
55 &now_rfc3339,
56 "[]"
57 ],
58 )
59 .map_err(tokio_rusqlite::Error::Rusqlite)?;
60
61 conn.execute_batch("COMMIT")
62 .map_err(tokio_rusqlite::Error::Rusqlite)
63 })
64 .await
65 .map_err(|e| anyhow!("failed to insert session: {}", e))?;
66
67 Ok(Session {
68 id: session_id,
69 project_id: project_id.to_string(),
70 goal_id: goal_id.to_string(),
71 started_at: now,
72 ended_at: None,
73 handoff_notes: None,
74 agent_ids: vec![],
75 summary: None,
76 })
77 }
78
79 /// End a session and generate handoff notes
80 pub async fn end_session(
81 &self,
82 session_id: &str,
83 _graph_store: &SqliteGraphStore,
84 ) -> Result<()> {
85 // First, get the session to find the goal_id
86 let session = self
87 .get_session(session_id)
88 .await?
89 .ok_or_else(|| anyhow!("session not found: {}", session_id))?;
90
91 let goal_id = session.goal_id.clone();
92 let now = Utc::now();
93 let now_rfc3339 = now.to_rfc3339();
94 let session_id_owned = session_id.to_string();
95
96 // Generate handoff notes within a transaction
97 self.db
98 .connection()
99 .call(move |conn| {
100 conn.execute_batch("BEGIN IMMEDIATE")
101 .map_err(tokio_rusqlite::Error::Rusqlite)?;
102 let notes = generate_handoff_notes(conn, &goal_id)
103 .map_err(tokio_rusqlite::Error::Rusqlite)?;
104 conn.execute(
105 "UPDATE sessions SET ended_at = ?, handoff_notes = ? WHERE id = ?",
106 rusqlite::params![&now_rfc3339, ¬es, &session_id_owned],
107 )
108 .map_err(tokio_rusqlite::Error::Rusqlite)?;
109 conn.execute_batch("COMMIT")
110 .map_err(tokio_rusqlite::Error::Rusqlite)?;
111 Ok::<(), tokio_rusqlite::Error>(())
112 })
113 .await
114 .map_err(|e| anyhow!("database error: {}", e))?;
115
116 Ok(())
117 }
118
119 /// Get a session by ID
120 pub async fn get_session(&self, session_id: &str) -> Result<Option<Session>> {
121 let session_id_owned = session_id.to_string();
122
123 let result = self
124 .db
125 .connection()
126 .call(move |conn| {
127 let mut stmt = conn.prepare(
128 "SELECT id, project_id, goal_id, started_at, ended_at, handoff_notes, agent_ids, summary
129 FROM sessions WHERE id = ?",
130 )?;
131
132 let session: Option<Session> = stmt
133 .query_row([&session_id_owned], map_session_row)
134 .optional()?;
135
136 Ok(session)
137 })
138 .await
139 .map_err(|e| anyhow!("database error: {}", e))?;
140
141 Ok(result)
142 }
143
144 /// Get the most recent session for a goal
145 pub async fn get_latest_session(&self, goal_id: &str) -> Result<Option<Session>> {
146 let goal_id_owned = goal_id.to_string();
147
148 let result = self
149 .db
150 .connection()
151 .call(move |conn| {
152 let mut stmt = conn.prepare(
153 "SELECT id, project_id, goal_id, started_at, ended_at, handoff_notes, agent_ids, summary
154 FROM sessions WHERE goal_id = ?
155 ORDER BY started_at DESC LIMIT 1",
156 )?;
157
158 let session: Option<Session> = stmt
159 .query_row([&goal_id_owned], map_session_row)
160 .optional()?;
161
162 Ok(session)
163 })
164 .await
165 .map_err(|e| anyhow!("database error: {}", e))?;
166
167 Ok(result)
168 }
169
170 /// List all sessions for a goal
171 pub async fn list_sessions(&self, goal_id: &str) -> Result<Vec<Session>> {
172 let goal_id_owned = goal_id.to_string();
173
174 self.db
175 .connection()
176 .call(move |conn| {
177 let mut stmt = conn.prepare(
178 "SELECT id, project_id, goal_id, started_at, ended_at, handoff_notes, agent_ids, summary
179 FROM sessions WHERE goal_id = ?
180 ORDER BY started_at DESC",
181 )?;
182
183 let mut sessions = vec![];
184 let rows = stmt.query_map([&goal_id_owned], map_session_row)?;
185
186 for session_result in rows {
187 sessions.push(session_result?);
188 }
189
190 Ok(sessions)
191 })
192 .await
193 .map_err(|e| anyhow!("database error: {}", e))
194 }
195}
196
197/// Map a database row to a Session struct
198fn map_session_row(row: &rusqlite::Row) -> rusqlite::Result<Session> {
199 let started_at_str: String = row.get(3)?;
200 let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
201 .ok()
202 .map(|dt| dt.with_timezone(&Utc))
203 .ok_or(rusqlite::Error::InvalidQuery)?;
204
205 let ended_at_str: Option<String> = row.get(4)?;
206 let ended_at = ended_at_str.and_then(|s| {
207 chrono::DateTime::parse_from_rfc3339(&s)
208 .ok()
209 .map(|dt| dt.with_timezone(&Utc))
210 });
211
212 let agent_ids_json: String = row.get(6)?;
213 let agent_ids: Vec<String> = serde_json::from_str(&agent_ids_json).unwrap_or_default();
214
215 Ok(Session {
216 id: row.get(0)?,
217 project_id: row.get(1)?,
218 goal_id: row.get(2)?,
219 started_at,
220 ended_at,
221 handoff_notes: row.get(5)?,
222 agent_ids,
223 summary: row.get(7)?,
224 })
225}
226
227/// Generate handoff notes from the current graph state
228/// This runs synchronously within a transaction on the raw rusqlite connection
229fn generate_handoff_notes(conn: &rusqlite::Connection, goal_id: &str) -> rusqlite::Result<String> {
230 let mut notes = String::new();
231
232 // Query all descendants of the goal
233 let descendants = get_descendants(conn, goal_id)?;
234 let descendant_ids: Vec<String> = descendants.iter().map(|d| d.0.clone()).collect();
235
236 if descendant_ids.is_empty() {
237 // No descendants, return empty template
238 notes.push_str("## Done\n\n");
239 notes.push_str("## Remaining\n\n");
240 notes.push_str("## Blocked\n\n");
241 notes.push_str("## Decisions Made\n\n");
242 return Ok(notes);
243 }
244
245 // Build placeholders for SQL IN clause
246 let placeholders = descendant_ids
247 .iter()
248 .map(|_| "?")
249 .collect::<Vec<_>>()
250 .join(",");
251
252 // Query for Done nodes (Completed or Decided)
253 let done_query = format!(
254 "SELECT id, title, status FROM nodes WHERE id IN ({})
255 AND (status = 'completed' OR status = 'decided')
256 ORDER BY completed_at ASC, created_at ASC",
257 placeholders
258 );
259 notes.push_str("## Done\n");
260 let mut stmt = conn.prepare(&done_query)?;
261 let done_nodes: Vec<_> = stmt
262 .query_map(
263 rusqlite::params_from_iter(descendant_ids.iter().map(|s| s.as_str())),
264 |row| {
265 Ok((
266 row.get::<_, String>(0)?,
267 row.get::<_, String>(1)?,
268 row.get::<_, String>(2)?,
269 ))
270 },
271 )?
272 .collect::<Result<Vec<_>, _>>()?;
273 if done_nodes.is_empty() {
274 notes.push_str("(none)\n");
275 } else {
276 for (id, title, status) in done_nodes {
277 notes.push_str(&format!("- {}: {} ({})\n", id, title, status));
278 }
279 }
280 notes.push('\n');
281
282 // Query for Remaining nodes (Ready, Pending, InProgress)
283 let remaining_query = format!(
284 "SELECT id, title, status FROM nodes WHERE id IN ({})
285 AND (status = 'ready' OR status = 'pending' OR status = 'in_progress')
286 ORDER BY created_at ASC",
287 placeholders
288 );
289 notes.push_str("## Remaining\n");
290 let mut stmt = conn.prepare(&remaining_query)?;
291 let remaining_nodes: Vec<_> = stmt
292 .query_map(
293 rusqlite::params_from_iter(descendant_ids.iter().map(|s| s.as_str())),
294 |row| {
295 Ok((
296 row.get::<_, String>(0)?,
297 row.get::<_, String>(1)?,
298 row.get::<_, String>(2)?,
299 ))
300 },
301 )?
302 .collect::<Result<Vec<_>, _>>()?;
303 if remaining_nodes.is_empty() {
304 notes.push_str("(none)\n");
305 } else {
306 for (id, title, status) in remaining_nodes {
307 notes.push_str(&format!("- {}: {} [{}]\n", id, title, status));
308 }
309 }
310 notes.push('\n');
311
312 // Query for Blocked nodes
313 let blocked_query = format!(
314 "SELECT id, title, blocked_reason FROM nodes WHERE id IN ({})
315 AND status = 'blocked'
316 ORDER BY created_at ASC",
317 placeholders
318 );
319 notes.push_str("## Blocked\n");
320 let mut stmt = conn.prepare(&blocked_query)?;
321 let blocked_nodes: Vec<_> = stmt
322 .query_map(
323 rusqlite::params_from_iter(descendant_ids.iter().map(|s| s.as_str())),
324 |row| {
325 Ok((
326 row.get::<_, String>(0)?,
327 row.get::<_, String>(1)?,
328 row.get::<_, Option<String>>(2)?,
329 ))
330 },
331 )?
332 .collect::<Result<Vec<_>, _>>()?;
333 if blocked_nodes.is_empty() {
334 notes.push_str("(none)\n");
335 } else {
336 for (id, title, blocked_reason) in blocked_nodes {
337 if let Some(reason) = blocked_reason {
338 notes.push_str(&format!("- {}: {} — {}\n", id, title, reason));
339 } else {
340 notes.push_str(&format!("- {}: {}\n", id, title));
341 }
342 }
343 }
344 notes.push('\n');
345
346 // Query for Decisions Made (Decision nodes with Decided status)
347 let decisions_query = format!(
348 "SELECT id, title FROM nodes WHERE id IN ({})
349 AND node_type = 'decision' AND status = 'decided'
350 ORDER BY created_at ASC",
351 placeholders
352 );
353 notes.push_str("## Decisions Made\n");
354 let mut stmt = conn.prepare(&decisions_query)?;
355 let decision_nodes: Vec<_> = stmt
356 .query_map(
357 rusqlite::params_from_iter(descendant_ids.iter().map(|s| s.as_str())),
358 |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)),
359 )?
360 .collect::<Result<Vec<_>, _>>()?;
361 if decision_nodes.is_empty() {
362 notes.push_str("(none)\n");
363 } else {
364 for (decision_id, decision_title) in decision_nodes {
365 // Find the chosen option
366 let mut chosen_stmt = conn.prepare(
367 "SELECT n.title, e.label FROM edges e
368 JOIN nodes n ON e.to_node = n.id
369 WHERE e.from_node = ? AND e.edge_type = 'chosen'",
370 )?;
371
372 let chosen_option: Option<(String, Option<String>)> = chosen_stmt
373 .query_row([&decision_id], |row| {
374 Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
375 })
376 .optional()?;
377
378 if let Some((option_title, rationale)) = chosen_option {
379 if let Some(r) = rationale {
380 notes.push_str(&format!(
381 "- {}: {} → {} ({})\n",
382 decision_id, decision_title, option_title, r
383 ));
384 } else {
385 notes.push_str(&format!(
386 "- {}: {} → {}\n",
387 decision_id, decision_title, option_title
388 ));
389 }
390 } else {
391 notes.push_str(&format!("- {}: {}\n", decision_id, decision_title));
392 }
393 }
394 }
395 notes.push('\n');
396
397 Ok(notes)
398}
399
400/// Get all descendants of a node via Contains edges (helper for handoff notes)
401fn get_descendants(
402 conn: &rusqlite::Connection,
403 parent_id: &str,
404) -> rusqlite::Result<Vec<(String, String)>> {
405 // Recursive CTE to get all descendants
406 let query = "
407 WITH RECURSIVE descendants AS (
408 SELECT id, node_type FROM nodes WHERE id = ?
409 UNION ALL
410 SELECT n.id, n.node_type FROM nodes n
411 JOIN edges e ON n.id = e.to_node
412 JOIN descendants d ON e.from_node = d.id
413 WHERE e.edge_type = 'contains'
414 )
415 SELECT id, node_type FROM descendants WHERE id != ?
416 ";
417
418 let mut stmt = conn.prepare(query)?;
419 let descendants = stmt
420 .query_map(rusqlite::params![parent_id, parent_id], |row| {
421 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
422 })?
423 .collect::<Result<Vec<_>, _>>()?;
424
425 Ok(descendants)
426}
427
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: a `Session` value can be built by hand and keeps its id.
    #[tokio::test]
    async fn test_session_creation() {
        // Database-backed behaviour is exercised in session_test.rs; this
        // only checks that the struct can be constructed directly.
        let sample = Session {
            id: String::from("sess-test"),
            project_id: String::from("proj-1"),
            goal_id: String::from("ra-1234"),
            started_at: Utc::now(),
            ended_at: None,
            handoff_notes: None,
            agent_ids: Vec::new(),
            summary: None,
        };

        assert_eq!(sample.id, "sess-test");
    }
}
448}