An AI agent built to do Ralph loops - plan mode for planning and ralph mode for implementing.
at new-directions 448 lines 15 kB view raw
1use crate::db::Database; 2use crate::graph::store::SqliteGraphStore; 3use anyhow::{Result, anyhow}; 4use chrono::{DateTime, Utc}; 5use serde::{Deserialize, Serialize}; 6use tokio_rusqlite::OptionalExtension; 7use uuid::Uuid; 8 9/// A session represents a work period for a goal 10#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] 11pub struct Session { 12 pub id: String, 13 pub project_id: String, 14 pub goal_id: String, 15 pub started_at: DateTime<Utc>, 16 pub ended_at: Option<DateTime<Utc>>, 17 pub handoff_notes: Option<String>, 18 pub agent_ids: Vec<String>, 19 pub summary: Option<String>, 20} 21 22/// Store for managing sessions 23pub struct SessionStore { 24 db: Database, 25} 26 27impl SessionStore { 28 /// Create a new SessionStore 29 pub fn new(db: Database) -> Self { 30 Self { db } 31 } 32 33 /// Create a new session for a goal 34 pub async fn create_session(&self, project_id: &str, goal_id: &str) -> Result<Session> { 35 let session_id = format!("sess-{}", &Uuid::new_v4().simple().to_string()[..8]); 36 let now = Utc::now(); 37 let now_rfc3339 = now.to_rfc3339(); 38 let project_id_owned = project_id.to_string(); 39 let goal_id_owned = goal_id.to_string(); 40 let session_id_clone = session_id.clone(); 41 42 self.db 43 .connection() 44 .call(move |conn| { 45 conn.execute_batch("BEGIN IMMEDIATE") 46 .map_err(tokio_rusqlite::Error::Rusqlite)?; 47 48 conn.execute( 49 "INSERT INTO sessions (id, project_id, goal_id, started_at, agent_ids) 50 VALUES (?, ?, ?, ?, ?)", 51 rusqlite::params![ 52 &session_id_clone, 53 &project_id_owned, 54 &goal_id_owned, 55 &now_rfc3339, 56 "[]" 57 ], 58 ) 59 .map_err(tokio_rusqlite::Error::Rusqlite)?; 60 61 conn.execute_batch("COMMIT") 62 .map_err(tokio_rusqlite::Error::Rusqlite) 63 }) 64 .await 65 .map_err(|e| anyhow!("failed to insert session: {}", e))?; 66 67 Ok(Session { 68 id: session_id, 69 project_id: project_id.to_string(), 70 goal_id: goal_id.to_string(), 71 started_at: now, 72 ended_at: None, 73 handoff_notes: None, 74 agent_ids: vec![], 75 summary: None, 76 }) 77 } 78 79 /// End a session and generate handoff notes 80 pub async fn end_session( 81 &self, 82 session_id: &str, 83 _graph_store: &SqliteGraphStore, 84 ) -> Result<()> { 85 // First, get the session to find the goal_id 86 let session = self 87 .get_session(session_id) 88 .await? 89 .ok_or_else(|| anyhow!("session not found: {}", session_id))?; 90 91 let goal_id = session.goal_id.clone(); 92 let now = Utc::now(); 93 let now_rfc3339 = now.to_rfc3339(); 94 let session_id_owned = session_id.to_string(); 95 96 // Generate handoff notes within a transaction 97 self.db 98 .connection() 99 .call(move |conn| { 100 conn.execute_batch("BEGIN IMMEDIATE") 101 .map_err(tokio_rusqlite::Error::Rusqlite)?; 102 let notes = generate_handoff_notes(conn, &goal_id) 103 .map_err(tokio_rusqlite::Error::Rusqlite)?; 104 conn.execute( 105 "UPDATE sessions SET ended_at = ?, handoff_notes = ? WHERE id = ?", 106 rusqlite::params![&now_rfc3339, &notes, &session_id_owned], 107 ) 108 .map_err(tokio_rusqlite::Error::Rusqlite)?; 109 conn.execute_batch("COMMIT") 110 .map_err(tokio_rusqlite::Error::Rusqlite)?; 111 Ok::<(), tokio_rusqlite::Error>(()) 112 }) 113 .await 114 .map_err(|e| anyhow!("database error: {}", e))?; 115 116 Ok(()) 117 } 118 119 /// Get a session by ID 120 pub async fn get_session(&self, session_id: &str) -> Result<Option<Session>> { 121 let session_id_owned = session_id.to_string(); 122 123 let result = self 124 .db 125 .connection() 126 .call(move |conn| { 127 let mut stmt = conn.prepare( 128 "SELECT id, project_id, goal_id, started_at, ended_at, handoff_notes, agent_ids, summary 129 FROM sessions WHERE id = ?", 130 )?; 131 132 let session: Option<Session> = stmt 133 .query_row([&session_id_owned], map_session_row) 134 .optional()?; 135 136 Ok(session) 137 }) 138 .await 139 .map_err(|e| anyhow!("database error: {}", e))?; 140 141 Ok(result) 142 } 143 144 /// Get the most recent session for a goal 145 pub async fn get_latest_session(&self, goal_id: &str) -> Result<Option<Session>> { 146 let goal_id_owned = goal_id.to_string(); 147 148 let result = self 149 .db 150 .connection() 151 .call(move |conn| { 152 let mut stmt = conn.prepare( 153 "SELECT id, project_id, goal_id, started_at, ended_at, handoff_notes, agent_ids, summary 154 FROM sessions WHERE goal_id = ? 155 ORDER BY started_at DESC LIMIT 1", 156 )?; 157 158 let session: Option<Session> = stmt 159 .query_row([&goal_id_owned], map_session_row) 160 .optional()?; 161 162 Ok(session) 163 }) 164 .await 165 .map_err(|e| anyhow!("database error: {}", e))?; 166 167 Ok(result) 168 } 169 170 /// List all sessions for a goal 171 pub async fn list_sessions(&self, goal_id: &str) -> Result<Vec<Session>> { 172 let goal_id_owned = goal_id.to_string(); 173 174 self.db 175 .connection() 176 .call(move |conn| { 177 let mut stmt = conn.prepare( 178 "SELECT id, project_id, goal_id, started_at, ended_at, handoff_notes, agent_ids, summary 179 FROM sessions WHERE goal_id = ? 180 ORDER BY started_at DESC", 181 )?; 182 183 let mut sessions = vec![]; 184 let rows = stmt.query_map([&goal_id_owned], map_session_row)?; 185 186 for session_result in rows { 187 sessions.push(session_result?); 188 } 189 190 Ok(sessions) 191 }) 192 .await 193 .map_err(|e| anyhow!("database error: {}", e)) 194 } 195} 196 197/// Map a database row to a Session struct 198fn map_session_row(row: &rusqlite::Row) -> rusqlite::Result<Session> { 199 let started_at_str: String = row.get(3)?; 200 let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str) 201 .ok() 202 .map(|dt| dt.with_timezone(&Utc)) 203 .ok_or(rusqlite::Error::InvalidQuery)?; 204 205 let ended_at_str: Option<String> = row.get(4)?; 206 let ended_at = ended_at_str.and_then(|s| { 207 chrono::DateTime::parse_from_rfc3339(&s) 208 .ok() 209 .map(|dt| dt.with_timezone(&Utc)) 210 }); 211 212 let agent_ids_json: String = row.get(6)?; 213 let agent_ids: Vec<String> = serde_json::from_str(&agent_ids_json).unwrap_or_default(); 214 215 Ok(Session { 216 id: row.get(0)?, 217 project_id: row.get(1)?, 218 goal_id: row.get(2)?, 219 started_at, 220 ended_at, 221 handoff_notes: row.get(5)?, 222 agent_ids, 223 summary: row.get(7)?, 224 }) 225} 226 227/// Generate handoff notes from the current graph state 228/// This runs synchronously within a transaction on the raw rusqlite connection 229fn generate_handoff_notes(conn: &rusqlite::Connection, goal_id: &str) -> rusqlite::Result<String> { 230 let mut notes = String::new(); 231 232 // Query all descendants of the goal 233 let descendants = get_descendants(conn, goal_id)?; 234 let descendant_ids: Vec<String> = descendants.iter().map(|d| d.0.clone()).collect(); 235 236 if descendant_ids.is_empty() { 237 // No descendants, return empty template 238 notes.push_str("## Done\n\n"); 239 notes.push_str("## Remaining\n\n"); 240 notes.push_str("## Blocked\n\n"); 241 notes.push_str("## Decisions Made\n\n"); 242 return Ok(notes); 243 } 244 245 // Build placeholders for SQL IN clause 246 let placeholders = descendant_ids 247 .iter() 248 .map(|_| "?") 249 .collect::<Vec<_>>() 250 .join(","); 251 252 // Query for Done nodes (Completed or Decided) 253 let done_query = format!( 254 "SELECT id, title, status FROM nodes WHERE id IN ({}) 255 AND (status = 'completed' OR status = 'decided') 256 ORDER BY completed_at ASC, created_at ASC", 257 placeholders 258 ); 259 notes.push_str("## Done\n"); 260 let mut stmt = conn.prepare(&done_query)?; 261 let done_nodes: Vec<_> = stmt 262 .query_map( 263 rusqlite::params_from_iter(descendant_ids.iter().map(|s| s.as_str())), 264 |row| { 265 Ok(( 266 row.get::<_, String>(0)?, 267 row.get::<_, String>(1)?, 268 row.get::<_, String>(2)?, 269 )) 270 }, 271 )? 272 .collect::<Result<Vec<_>, _>>()?; 273 if done_nodes.is_empty() { 274 notes.push_str("(none)\n"); 275 } else { 276 for (id, title, status) in done_nodes { 277 notes.push_str(&format!("- {}: {} ({})\n", id, title, status)); 278 } 279 } 280 notes.push('\n'); 281 282 // Query for Remaining nodes (Ready, Pending, InProgress) 283 let remaining_query = format!( 284 "SELECT id, title, status FROM nodes WHERE id IN ({}) 285 AND (status = 'ready' OR status = 'pending' OR status = 'in_progress') 286 ORDER BY created_at ASC", 287 placeholders 288 ); 289 notes.push_str("## Remaining\n"); 290 let mut stmt = conn.prepare(&remaining_query)?; 291 let remaining_nodes: Vec<_> = stmt 292 .query_map( 293 rusqlite::params_from_iter(descendant_ids.iter().map(|s| s.as_str())), 294 |row| { 295 Ok(( 296 row.get::<_, String>(0)?, 297 row.get::<_, String>(1)?, 298 row.get::<_, String>(2)?, 299 )) 300 }, 301 )? 302 .collect::<Result<Vec<_>, _>>()?; 303 if remaining_nodes.is_empty() { 304 notes.push_str("(none)\n"); 305 } else { 306 for (id, title, status) in remaining_nodes { 307 notes.push_str(&format!("- {}: {} [{}]\n", id, title, status)); 308 } 309 } 310 notes.push('\n'); 311 312 // Query for Blocked nodes 313 let blocked_query = format!( 314 "SELECT id, title, blocked_reason FROM nodes WHERE id IN ({}) 315 AND status = 'blocked' 316 ORDER BY created_at ASC", 317 placeholders 318 ); 319 notes.push_str("## Blocked\n"); 320 let mut stmt = conn.prepare(&blocked_query)?; 321 let blocked_nodes: Vec<_> = stmt 322 .query_map( 323 rusqlite::params_from_iter(descendant_ids.iter().map(|s| s.as_str())), 324 |row| { 325 Ok(( 326 row.get::<_, String>(0)?, 327 row.get::<_, String>(1)?, 328 row.get::<_, Option<String>>(2)?, 329 )) 330 }, 331 )? 332 .collect::<Result<Vec<_>, _>>()?; 333 if blocked_nodes.is_empty() { 334 notes.push_str("(none)\n"); 335 } else { 336 for (id, title, blocked_reason) in blocked_nodes { 337 if let Some(reason) = blocked_reason { 338 notes.push_str(&format!("- {}: {}{}\n", id, title, reason)); 339 } else { 340 notes.push_str(&format!("- {}: {}\n", id, title)); 341 } 342 } 343 } 344 notes.push('\n'); 345 346 // Query for Decisions Made (Decision nodes with Decided status) 347 let decisions_query = format!( 348 "SELECT id, title FROM nodes WHERE id IN ({}) 349 AND node_type = 'decision' AND status = 'decided' 350 ORDER BY created_at ASC", 351 placeholders 352 ); 353 notes.push_str("## Decisions Made\n"); 354 let mut stmt = conn.prepare(&decisions_query)?; 355 let decision_nodes: Vec<_> = stmt 356 .query_map( 357 rusqlite::params_from_iter(descendant_ids.iter().map(|s| s.as_str())), 358 |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)), 359 )? 360 .collect::<Result<Vec<_>, _>>()?; 361 if decision_nodes.is_empty() { 362 notes.push_str("(none)\n"); 363 } else { 364 for (decision_id, decision_title) in decision_nodes { 365 // Find the chosen option 366 let mut chosen_stmt = conn.prepare( 367 "SELECT n.title, e.label FROM edges e 368 JOIN nodes n ON e.to_node = n.id 369 WHERE e.from_node = ? AND e.edge_type = 'chosen'", 370 )?; 371 372 let chosen_option: Option<(String, Option<String>)> = chosen_stmt 373 .query_row([&decision_id], |row| { 374 Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?)) 375 }) 376 .optional()?; 377 378 if let Some((option_title, rationale)) = chosen_option { 379 if let Some(r) = rationale { 380 notes.push_str(&format!( 381 "- {}: {}{} ({})\n", 382 decision_id, decision_title, option_title, r 383 )); 384 } else { 385 notes.push_str(&format!( 386 "- {}: {}{}\n", 387 decision_id, decision_title, option_title 388 )); 389 } 390 } else { 391 notes.push_str(&format!("- {}: {}\n", decision_id, decision_title)); 392 } 393 } 394 } 395 notes.push('\n'); 396 397 Ok(notes) 398} 399 400/// Get all descendants of a node via Contains edges (helper for handoff notes) 401fn get_descendants( 402 conn: &rusqlite::Connection, 403 parent_id: &str, 404) -> rusqlite::Result<Vec<(String, String)>> { 405 // Recursive CTE to get all descendants 406 let query = " 407 WITH RECURSIVE descendants AS ( 408 SELECT id, node_type FROM nodes WHERE id = ? 409 UNION ALL 410 SELECT n.id, n.node_type FROM nodes n 411 JOIN edges e ON n.id = e.to_node 412 JOIN descendants d ON e.from_node = d.id 413 WHERE e.edge_type = 'contains' 414 ) 415 SELECT id, node_type FROM descendants WHERE id != ? 416 "; 417 418 let mut stmt = conn.prepare(query)?; 419 let descendants = stmt 420 .query_map(rusqlite::params![parent_id, parent_id], |row| { 421 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)) 422 })? 423 .collect::<Result<Vec<_>, _>>()?; 424 425 Ok(descendants) 426} 427 428#[cfg(test)] 429mod tests { 430 use super::*; 431 432 #[tokio::test] 433 async fn test_session_creation() { 434 // This test structure will be used in session_test.rs 435 // Just verify Session struct can be created 436 let session = Session { 437 id: "sess-test".to_string(), 438 project_id: "proj-1".to_string(), 439 goal_id: "ra-1234".to_string(), 440 started_at: Utc::now(), 441 ended_at: None, 442 handoff_notes: None, 443 agent_ids: vec![], 444 summary: None, 445 }; 446 assert_eq!(session.id, "sess-test"); 447 } 448}