use crate::cypher::planner::{QueryPlan, ScanPlan, CreatePlan, ProjectPlan}; use crate::cypher::ast::{CypherQuery, MatchClause, CreateClause, ReturnClause, Expression, PatternElement}; use crate::core::{Graph, PropertyValue}; use crate::index::{IndexQuery, IndexType}; use crate::{Result, GigabrainError, NodeId, RelationshipId}; use std::sync::Arc; use std::collections::HashMap; use tracing::{debug, info}; pub struct QueryExecutor { graph: Arc, } impl QueryExecutor { pub fn new(graph: Arc) -> Self { Self { graph } } pub async fn execute_query(&self, query: &CypherQuery) -> Result { match query { CypherQuery::Match(match_clause) => self.execute_match(match_clause).await, CypherQuery::Create(create_clause) => self.execute_create_query(create_clause).await, CypherQuery::Return(return_clause) => self.execute_return(return_clause).await, CypherQuery::Compound(queries) => { let mut context = ExecutionContext::new(); for query in queries { let _result = self.execute_query_with_context(query, &mut context).await?; // Store intermediate results in context for compound queries } Ok(context.into_result()) }, _ => Err(GigabrainError::Query("Query type not supported yet".to_string())), } } fn execute_query_with_context<'a>(&'a self, query: &'a CypherQuery, context: &'a mut ExecutionContext) -> std::pin::Pin> + 'a>> { Box::pin(async move { match query { CypherQuery::Match(match_clause) => { let result = self.execute_match_with_context(match_clause, context).await?; context.merge_result(result); Ok(QueryResult::empty()) }, CypherQuery::Return(return_clause) => { self.execute_return_with_context(return_clause, context).await }, _ => self.execute_query(query).await, } }) } async fn execute_match(&self, match_clause: &MatchClause) -> Result { let mut context = ExecutionContext::new(); self.execute_match_with_context(match_clause, &mut context).await?; // Return all matched nodes directly let all_nodes = self.graph.get_all_nodes(); let columns = vec!["n".to_string()]; let mut rows = Vec::new(); for node_id in all_nodes { rows.push(Row { values: vec![Value::Node(node_id)] }); } Ok(QueryResult { rows, columns }) } async fn execute_match_with_context(&self, match_clause: &MatchClause, context: &mut ExecutionContext) -> Result { // Simple pattern matching for demonstration // In reality, this would use a sophisticated query planner let pattern = &match_clause.pattern; // Process pattern elements sequentially let mut i = 0; while i < pattern.elements.len() { match &pattern.elements[i] { PatternElement::Node(node_pattern) => { if let Some(variable) = &node_pattern.variable { // Get nodes matching the pattern let matching_nodes = self.find_matching_nodes(node_pattern).await?; context.bind_variable(variable.clone(), VariableBinding::Nodes(matching_nodes)); } i += 1; }, PatternElement::Relationship(rel_pattern) => { // Handle relationship patterns if let Some(variable) = &rel_pattern.variable { // For now, get all relationships // TODO: Implement proper relationship matching based on type and direction let all_relationships = self.get_all_relationships().await?; context.bind_variable(variable.clone(), VariableBinding::Relationships(all_relationships)); } i += 1; }, _ => { i += 1; } } } // Apply WHERE clause filtering if present if let Some(where_expr) = &match_clause.where_clause { self.apply_where_filter(context, where_expr).await?; } Ok(QueryResult::empty()) } async fn apply_where_filter(&self, context: &mut ExecutionContext, where_expr: &Expression) -> Result<()> { // Create a new context to store filtered results let mut filtered_context = ExecutionContext::new(); // For each variable binding, evaluate the WHERE clause let variable_names: Vec = context.variables.keys().cloned().collect(); if variable_names.is_empty() { return Ok(()); } // Get all possible combinations of variable bindings let combinations = self.generate_binding_combinations(context).await?; // Filter combinations based on WHERE clause let mut filtered_combinations = Vec::new(); for combination in combinations { // Create a temporary context with this specific binding combination let mut temp_context = ExecutionContext::new(); for (var_name, binding) in &combination { temp_context.bind_variable(var_name.clone(), binding.clone()); } // Evaluate WHERE clause with this binding let result = self.evaluate_expression(where_expr, &temp_context, None).await?; if self.is_truthy(&result) { filtered_combinations.push(combination); } } // Rebuild the context with filtered results self.rebuild_context_from_combinations(context, filtered_combinations); Ok(()) } async fn generate_binding_combinations(&self, context: &ExecutionContext) -> Result>> { let mut combinations = Vec::new(); let variable_names: Vec = context.variables.keys().cloned().collect(); if variable_names.is_empty() { return Ok(combinations); } // Generate all combinations of variable bindings // This is a simplified approach - for performance, we'd use join algorithms if variable_names.len() == 1 { let var_name = &variable_names[0]; if let Some(binding) = context.variables.get(var_name) { match binding { VariableBinding::Nodes(node_ids) => { for &node_id in node_ids { let mut combination = HashMap::new(); combination.insert(var_name.clone(), VariableBinding::Nodes(vec![node_id])); combinations.push(combination); } }, VariableBinding::Relationships(rel_ids) => { for &rel_id in rel_ids { let mut combination = HashMap::new(); combination.insert(var_name.clone(), VariableBinding::Relationships(vec![rel_id])); combinations.push(combination); } } } } } else { // For multiple variables, generate cartesian product // This is simplified - real implementation would be more efficient self.generate_cartesian_product(context, &variable_names, 0, &mut HashMap::new(), &mut combinations); } Ok(combinations) } fn generate_cartesian_product( &self, context: &ExecutionContext, variable_names: &[String], index: usize, current_combination: &mut HashMap, combinations: &mut Vec> ) { if index >= variable_names.len() { combinations.push(current_combination.clone()); return; } let var_name = &variable_names[index]; if let Some(binding) = context.variables.get(var_name) { match binding { VariableBinding::Nodes(node_ids) => { for &node_id in node_ids { current_combination.insert(var_name.clone(), VariableBinding::Nodes(vec![node_id])); self.generate_cartesian_product(context, variable_names, index + 1, current_combination, combinations); } }, VariableBinding::Relationships(rel_ids) => { for &rel_id in rel_ids { current_combination.insert(var_name.clone(), VariableBinding::Relationships(vec![rel_id])); self.generate_cartesian_product(context, variable_names, index + 1, current_combination, combinations); } } } } } fn rebuild_context_from_combinations(&self, context: &mut ExecutionContext, combinations: Vec>) { // Clear current context context.variables.clear(); // Group combinations back into variable bindings let mut var_to_nodes: HashMap> = HashMap::new(); let mut var_to_rels: HashMap> = HashMap::new(); for combination in combinations { for (var_name, binding) in combination { match binding { VariableBinding::Nodes(node_ids) => { var_to_nodes.entry(var_name).or_insert_with(Vec::new).extend(node_ids); }, VariableBinding::Relationships(rel_ids) => { var_to_rels.entry(var_name).or_insert_with(Vec::new).extend(rel_ids); } } } } // Rebuild context with filtered bindings for (var_name, node_ids) in var_to_nodes { context.bind_variable(var_name, VariableBinding::Nodes(node_ids)); } for (var_name, rel_ids) in var_to_rels { context.bind_variable(var_name, VariableBinding::Relationships(rel_ids)); } } async fn find_matching_nodes(&self, node_pattern: &crate::cypher::ast::NodePattern) -> Result> { debug!("Finding nodes matching pattern: labels={:?}, properties={:?}", node_pattern.labels, node_pattern.properties); let mut candidate_nodes: Option> = None; // Use label indexes if labels are specified if !node_pattern.labels.is_empty() { debug!("Using label indexes for labels: {:?}", node_pattern.labels); let mut label_matches = Vec::new(); for label_name in &node_pattern.labels { match self.graph.find_nodes_by_label(label_name) { Ok(nodes) => { if label_matches.is_empty() { label_matches = nodes; } else { // Intersect with previous results (AND logic for multiple labels) label_matches.retain(|node_id| nodes.contains(node_id)); } }, Err(e) => { debug!("Failed to use label index for '{}': {}", label_name, e); // Fall back to scanning all nodes label_matches = self.graph.get_all_nodes(); break; } } } candidate_nodes = Some(label_matches); info!("Label index returned {} candidate nodes", candidate_nodes.as_ref().unwrap().len()); } // Use property indexes if properties are specified if let Some(ref properties) = node_pattern.properties { debug!("Using property indexes for {} properties", properties.len()); let mut property_matches: Option> = None; for (prop_name, value_expr) in properties { // For now, only handle literal values in property patterns if let Expression::Literal(prop_value) = value_expr { match self.graph.find_nodes_by_property(prop_name, prop_value) { Ok(nodes) => { if let Some(ref mut existing_matches) = property_matches { // Intersect with previous property matches existing_matches.retain(|node_id| nodes.contains(node_id)); } else { property_matches = Some(nodes); } }, Err(e) => { debug!("Failed to use property index for '{}': {}", prop_name, e); // Continue with other properties or fall back to scanning } } } } if let Some(prop_nodes) = property_matches { if let Some(ref mut candidates) = candidate_nodes { // Intersect label matches with property matches candidates.retain(|node_id| prop_nodes.contains(node_id)); } else { candidate_nodes = Some(prop_nodes); } info!("Property index intersection returned {} candidate nodes", candidate_nodes.as_ref().unwrap().len()); } } // If no indexes were used, scan all nodes let result_nodes = candidate_nodes.unwrap_or_else(|| { debug!("No indexes available, scanning all nodes"); self.graph.get_all_nodes() }); // TODO: Apply additional filtering for complex property expressions // that couldn't be handled by indexes debug!("Final result: {} nodes matched the pattern", result_nodes.len()); Ok(result_nodes) } async fn get_all_relationships(&self) -> Result> { // Get all relationships in the graph // This is a simplified implementation - normally we'd iterate through the graph structure let mut all_relationships = Vec::new(); // For each node, get its relationships for node_id in self.graph.get_all_nodes() { let relationships = self.graph.get_node_relationships( node_id, crate::core::relationship::Direction::Both, None ); for rel in relationships { if !all_relationships.contains(&rel.id) { all_relationships.push(rel.id); } } } Ok(all_relationships) } async fn execute_create_query(&self, create_clause: &CreateClause) -> Result { // Create new nodes/relationships based on pattern let mut created_nodes = Vec::new(); let mut created_relationships = Vec::new(); let mut node_variables = HashMap::new(); // First pass: create all nodes for element in &create_clause.pattern.elements { if let PatternElement::Node(node_pattern) = element { // Create a new node let node_id = self.graph.create_node(); // Add labels and properties using update_node self.graph.update_node(node_id, |node| { // Add labels if specified if !node_pattern.labels.is_empty() { let schema = self.graph.schema(); for label_name in &node_pattern.labels { let label_id = { let mut schema_guard = schema.write(); schema_guard.get_or_create_label(label_name) }; node.add_label(label_id); } } // Add properties if specified if let Some(ref properties) = node_pattern.properties { let schema = self.graph.schema(); for (key, value_expr) in properties { let property_key_id = { let mut schema_guard = schema.write(); schema_guard.get_or_create_property_key(key) }; // Convert expression to property value (simplified) let property_value = match value_expr { Expression::Literal(literal) => literal.clone(), _ => PropertyValue::String("unknown".to_string()), }; node.properties.insert(property_key_id, property_value); } } })?; created_nodes.push(node_id); // Add to indexes after creating the node if let Some(node) = self.graph.get_node(node_id) { let labels: Vec<_> = node.labels.iter().cloned().collect(); let properties = node.properties.clone(); // Ensure indexes exist for any new labels for &label_id in &labels { let index_type = IndexType::Label(label_id); if let Err(e) = self.graph.index_manager().create_index(index_type, None, false) { debug!("Label index may already exist or failed to create: {}", e); } } // Ensure indexes exist for any new properties for &property_key_id in properties.keys() { let index_type = IndexType::Property(property_key_id); if let Err(e) = self.graph.index_manager().create_index(index_type, None, false) { debug!("Property index may already exist or failed to create: {}", e); } } // Add node to indexes if let Err(e) = self.graph.index_manager().add_node(node_id, &labels, &properties) { debug!("Failed to add created node to indexes: {}", e); } else { debug!("Added newly created node {} to indexes", node_id.0); } } // Store node variable mapping for relationships if let Some(var_name) = &node_pattern.variable { node_variables.insert(var_name.clone(), node_id); } } } // Second pass: create relationships between nodes let mut i = 0; while i < create_clause.pattern.elements.len() { if i + 2 < create_clause.pattern.elements.len() { if let (PatternElement::Node(start_node), PatternElement::Relationship(rel_pattern), PatternElement::Node(end_node)) = (&create_clause.pattern.elements[i], &create_clause.pattern.elements[i + 1], &create_clause.pattern.elements[i + 2]) { // Get start and end node IDs let start_id = if let Some(var_name) = &start_node.variable { node_variables.get(var_name).copied() } else { created_nodes.first().copied() }; let end_id = if let Some(var_name) = &end_node.variable { node_variables.get(var_name).copied() } else { created_nodes.last().copied() }; if let (Some(start_id), Some(end_id)) = (start_id, end_id) { // Create the relationship let rel_type_id = if let Some(rel_type_name) = rel_pattern.types.first() { let schema = self.graph.schema(); let mut schema_guard = schema.write(); schema_guard.get_or_create_relationship_type(rel_type_name) } else { 0 // Default relationship type }; let rel_id = self.graph.create_relationship(start_id, end_id, rel_type_id)?; created_relationships.push(rel_id); // Add relationship properties if specified if let Some(ref properties) = rel_pattern.properties { // TODO: Implement relationship property setting // This would require extending the graph API to update relationship properties } } i += 3; // Skip the relationship pattern and end node } else { i += 1; } } else { i += 1; } } // Return result indicating what was created let mut columns = Vec::new(); let mut values = Vec::new(); if !created_nodes.is_empty() { columns.push("nodes_created".to_string()); values.push(Value::Integer(created_nodes.len() as i64)); } if !created_relationships.is_empty() { columns.push("relationships_created".to_string()); values.push(Value::Integer(created_relationships.len() as i64)); } let rows = if !values.is_empty() { vec![Row { values }] } else { vec![Row { values: vec![Value::Integer(0)] }] }; Ok(QueryResult { rows, columns }) } async fn execute_return(&self, return_clause: &ReturnClause) -> Result { let mut context = ExecutionContext::new(); self.execute_return_with_context(return_clause, &mut context).await } async fn execute_return_with_context(&self, return_clause: &ReturnClause, context: &ExecutionContext) -> Result { let mut columns = Vec::new(); let mut rows = Vec::new(); // Extract column names for item in &return_clause.items { let column_name = if let Some(alias) = &item.alias { alias.clone() } else { self.expression_to_string(&item.expression) }; columns.push(column_name); } // Generate result rows based on variable bindings if let Some(first_binding) = context.variables.values().next() { match first_binding { VariableBinding::Nodes(node_ids) => { for node_id in node_ids { let mut values = Vec::new(); for item in &return_clause.items { values.push(self.evaluate_expression(&item.expression, context, Some(*node_id)).await?); } rows.push(Row { values }); } }, _ => {} } } Ok(QueryResult { rows, columns }) } fn evaluate_expression<'a>(&'a self, expr: &'a Expression, context: &'a ExecutionContext, current_node: Option) -> std::pin::Pin> + 'a>> { Box::pin(async move { match expr { Expression::Variable(var_name) => { if let Some(binding) = context.variables.get(var_name) { match binding { VariableBinding::Nodes(nodes) => { if let Some(node_id) = current_node { Ok(Value::Node(node_id)) } else if let Some(&first_node) = nodes.first() { Ok(Value::Node(first_node)) } else { Ok(Value::Null) } }, VariableBinding::Relationships(rels) => { if let Some(&first_rel) = rels.first() { Ok(Value::Relationship(first_rel)) } else { Ok(Value::Null) } } } } else { Ok(Value::Null) } }, Expression::Property(prop_expr) => { let base_value = self.evaluate_expression(&prop_expr.expression, context, current_node).await?; self.get_property_value(base_value, &prop_expr.property).await }, Expression::Literal(literal) => { Ok(self.property_value_to_executor_value(literal)) }, Expression::Equal(left, right) => { let left_val = self.evaluate_expression(left, context, current_node).await?; let right_val = self.evaluate_expression(right, context, current_node).await?; Ok(Value::Boolean(self.values_equal(&left_val, &right_val))) }, Expression::NotEqual(left, right) => { let left_val = self.evaluate_expression(left, context, current_node).await?; let right_val = self.evaluate_expression(right, context, current_node).await?; Ok(Value::Boolean(!self.values_equal(&left_val, &right_val))) }, Expression::And(left, right) => { let left_val = self.evaluate_expression(left, context, current_node).await?; let right_val = self.evaluate_expression(right, context, current_node).await?; Ok(Value::Boolean(self.is_truthy(&left_val) && self.is_truthy(&right_val))) }, Expression::Or(left, right) => { let left_val = self.evaluate_expression(left, context, current_node).await?; let right_val = self.evaluate_expression(right, context, current_node).await?; Ok(Value::Boolean(self.is_truthy(&left_val) || self.is_truthy(&right_val))) }, Expression::Not(expr) => { let val = self.evaluate_expression(expr, context, current_node).await?; Ok(Value::Boolean(!self.is_truthy(&val))) }, Expression::LessThan(left, right) => { let left_val = self.evaluate_expression(left, context, current_node).await?; let right_val = self.evaluate_expression(right, context, current_node).await?; Ok(Value::Boolean(self.compare_values(&left_val, &right_val) < 0)) }, Expression::LessThanOrEqual(left, right) => { let left_val = self.evaluate_expression(left, context, current_node).await?; let right_val = self.evaluate_expression(right, context, current_node).await?; Ok(Value::Boolean(self.compare_values(&left_val, &right_val) <= 0)) }, Expression::GreaterThan(left, right) => { let left_val = self.evaluate_expression(left, context, current_node).await?; let right_val = self.evaluate_expression(right, context, current_node).await?; Ok(Value::Boolean(self.compare_values(&left_val, &right_val) > 0)) }, Expression::GreaterThanOrEqual(left, right) => { let left_val = self.evaluate_expression(left, context, current_node).await?; let right_val = self.evaluate_expression(right, context, current_node).await?; Ok(Value::Boolean(self.compare_values(&left_val, &right_val) >= 0)) }, _ => Ok(Value::Null), // TODO: Implement remaining expression types } }) } async fn get_property_value(&self, base_value: Value, property_name: &str) -> Result { match base_value { Value::Node(node_id) => { if let Some(node) = self.graph.get_node(node_id) { // Get property value from node let schema = self.graph.schema(); let schema_guard = schema.read(); for (prop_key, _) in &schema_guard.property_keys { if prop_key == property_name { let prop_key_id = schema_guard.property_keys[prop_key]; if let Some(prop_value) = node.properties.get(&prop_key_id) { return Ok(self.property_value_to_executor_value(prop_value)); } } } } Ok(Value::Null) }, Value::Relationship(_rel_id) => { // TODO: Implement relationship property access Ok(Value::Null) }, _ => Ok(Value::Null), } } fn property_value_to_executor_value(&self, prop_val: &PropertyValue) -> Value { match prop_val { PropertyValue::String(s) => Value::String(s.clone()), PropertyValue::Integer(i) => Value::Integer(*i), PropertyValue::Float(f) => Value::Float(*f), PropertyValue::Boolean(b) => Value::Boolean(*b), PropertyValue::List(list) => { let converted_list: Vec = list.iter() .map(|item| self.property_value_to_executor_value(item)) .collect(); Value::List(converted_list) }, PropertyValue::Null => Value::Null, PropertyValue::Map(_) => Value::Null, // TODO: Implement map support } } fn values_equal(&self, left: &Value, right: &Value) -> bool { match (left, right) { (Value::String(a), Value::String(b)) => a == b, (Value::Integer(a), Value::Integer(b)) => a == b, (Value::Float(a), Value::Float(b)) => (a - b).abs() < f64::EPSILON, (Value::Boolean(a), Value::Boolean(b)) => a == b, (Value::Node(a), Value::Node(b)) => a == b, (Value::Relationship(a), Value::Relationship(b)) => a == b, (Value::Null, Value::Null) => true, // Type coercion for numbers (Value::Integer(a), Value::Float(b)) => (*a as f64 - b).abs() < f64::EPSILON, (Value::Float(a), Value::Integer(b)) => (a - *b as f64).abs() < f64::EPSILON, _ => false, } } fn is_truthy(&self, value: &Value) -> bool { match value { Value::Boolean(b) => *b, Value::Null => false, Value::Integer(i) => *i != 0, Value::Float(f) => *f != 0.0, Value::String(s) => !s.is_empty(), Value::List(list) => !list.is_empty(), Value::Node(_) => true, Value::Relationship(_) => true, } } fn compare_values(&self, left: &Value, right: &Value) -> i32 { match (left, right) { (Value::Integer(a), Value::Integer(b)) => a.cmp(b) as i32, (Value::Float(a), Value::Float(b)) => a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal) as i32, (Value::Integer(a), Value::Float(b)) => (*a as f64).partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal) as i32, (Value::Float(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)).unwrap_or(std::cmp::Ordering::Equal) as i32, (Value::String(a), Value::String(b)) => a.cmp(b) as i32, _ => 0, // Incomparable types are considered equal } } fn expression_to_string(&self, expr: &Expression) -> String { match expr { Expression::Variable(name) => name.clone(), _ => "expr".to_string(), } } pub async fn execute(&self, plan: QueryPlan) -> Result { match plan { QueryPlan::Scan(scan_plan) => self.execute_scan(scan_plan).await, QueryPlan::Create(create_plan) => self.execute_create(create_plan).await, QueryPlan::Project(project_plan) => self.execute_project(project_plan).await, _ => Err(GigabrainError::Query("Execution not implemented for this plan type".to_string())), } } async fn execute_scan(&self, _scan_plan: ScanPlan) -> Result { Ok(QueryResult::empty()) } async fn execute_create(&self, _create_plan: CreatePlan) -> Result { Ok(QueryResult::empty()) } async fn execute_project(&self, _project_plan: ProjectPlan) -> Result { Ok(QueryResult::empty()) } } #[derive(Debug)] pub struct ExecutionContext { variables: HashMap, } impl ExecutionContext { fn new() -> Self { Self { variables: HashMap::new(), } } fn bind_variable(&mut self, name: String, binding: VariableBinding) { self.variables.insert(name, binding); } fn merge_result(&mut self, _result: QueryResult) { // Merge results from previous query steps } fn into_result(self) -> QueryResult { // If we have variable bindings, create a result from them if let Some((var_name, binding)) = self.variables.iter().next() { match binding { VariableBinding::Nodes(node_ids) => { let columns = vec![var_name.clone()]; let mut rows = Vec::new(); for node_id in node_ids { rows.push(Row { values: vec![Value::Node(*node_id)] }); } QueryResult { rows, columns } }, VariableBinding::Relationships(rel_ids) => { let columns = vec![var_name.clone()]; let mut rows = Vec::new(); for rel_id in rel_ids { rows.push(Row { values: vec![Value::Relationship(*rel_id)] }); } QueryResult { rows, columns } }, } } else { QueryResult::empty() } } } #[derive(Debug, Clone)] enum VariableBinding { Nodes(Vec), Relationships(Vec), } #[derive(Debug)] pub struct QueryResult { pub rows: Vec, pub columns: Vec, } impl QueryResult { fn empty() -> Self { Self { rows: Vec::new(), columns: Vec::new(), } } } #[derive(Debug)] pub struct Row { pub values: Vec, } #[derive(Debug)] pub enum Value { Null, Node(crate::NodeId), Relationship(crate::RelationshipId), Integer(i64), Float(f64), String(String), Boolean(bool), List(Vec), }