use crate::error::{Result, StarError, VerificationKind}; use crate::types::{StarCommit, StarItem, StarMstNode}; use crate::validation::validate_node_structure; use cid::Cid; use sha2::{Digest, Sha256}; #[derive(Debug, Default)] enum State { #[default] Header, Body { stack: Vec, current_len: Option, }, Done, } #[derive(Debug)] enum StackItem { Node { expected: Option, expected_height: Option, }, Record { key: Vec, expected: Option, implicit_index: Option, }, VerifyLayer0 { node: StarMstNode, parent_expected: Option, pending_records: Vec<(usize, Cid)>, }, } #[derive(Default)] pub struct StarParser { state: State, } impl StarParser { pub fn new() -> Self { Self { state: State::Header, } } pub fn parse(&mut self, buf: &[u8]) -> Result<(usize, Option)> { let mut consumed = 0; loop { let is_body_done = if let State::Body { stack, .. } = &self.state { stack.is_empty() } else { false }; if is_body_done { self.state = State::Done; return Ok((consumed, None)); } let current_buf = &buf[consumed..]; match &mut self.state { State::Done => return Ok((consumed, None)), State::Header => { let (n, item) = match self.parse_header(current_buf)? { Some((n, item)) => (n, item), None => return Ok((consumed, None)), }; consumed += n; return Ok((consumed, Some(item))); } State::Body { stack, current_len } => { if Self::process_verification(stack)? { continue; } let (len_consumed, len) = match Self::read_length(current_buf, current_len)? { Some((n, len)) => (n, len), None => return Ok((consumed, None)), }; let body_buf = ¤t_buf[len_consumed..]; if body_buf.len() < len { consumed += len_consumed; return Ok((consumed, None)); } let block_bytes = &body_buf[..len]; *current_len = None; let item = stack.pop().unwrap(); let result_item = match item { StackItem::Node { expected, expected_height, } => Self::process_node(block_bytes, expected, expected_height, stack)?, StackItem::Record { key, expected, implicit_index, } => { Self::process_record(block_bytes, key, expected, implicit_index, stack)? } _ => return Err(StarError::InvalidState("Unexpected stack item".into())), }; consumed += len_consumed + len; return Ok((consumed, result_item)); } } } } fn parse_header(&mut self, buf: &[u8]) -> Result> { if buf.is_empty() { return Ok(None); } if buf[0] != 0x2A { return Err(StarError::InvalidHeader); } let slice = &buf[1..]; let (ver, remaining1) = match unsigned_varint::decode::usize(slice) { Ok(res) => res, Err(unsigned_varint::decode::Error::Insufficient) => return Ok(None), Err(e) => return Err(StarError::InvalidState(format!("Varint error: {}", e))), }; let (len, remaining2) = match unsigned_varint::decode::usize(remaining1) { Ok(res) => res, Err(unsigned_varint::decode::Error::Insufficient) => return Ok(None), Err(e) => return Err(StarError::InvalidState(format!("Varint error: {}", e))), }; let header_varints_len = buf.len() - 1 - remaining2.len(); let total_header_len = 1 + header_varints_len; let total_len = total_header_len + len; if buf.len() < total_len { return Ok(None); } let commit_bytes = &buf[total_header_len..total_len]; let commit: StarCommit = serde_ipld_dagcbor::from_slice(commit_bytes) .map_err(|e| StarError::Cbor(e.to_string()))?; let _ = ver; let mut stack = Vec::new(); if let Some(root_cid) = commit.data { stack.push(StackItem::Node { expected: Some(root_cid), expected_height: None, // Root }); } self.state = State::Body { stack, current_len: None, }; Ok(Some((total_len, StarItem::Commit(commit)))) } fn process_verification(stack: &mut Vec) -> Result { if let Some(StackItem::VerifyLayer0 { .. }) = stack.last() && let Some(StackItem::VerifyLayer0 { mut node, parent_expected, pending_records, .. }) = stack.pop() { for (idx, cid) in pending_records { if idx < node.e.len() { node.e[idx].v = Some(cid); } } let repo_node = node.to_repo()?; let bytes = serde_ipld_dagcbor::to_vec(&repo_node) .map_err(|e| StarError::Cbor(e.to_string()))?; let hash = Sha256::digest(&bytes); let cid = Cid::new_v1(0x71, cid::multihash::Multihash::wrap(0x12, &hash)?); if let Some(expected) = parent_expected && cid != expected { return Err(StarError::VerificationFailed { kind: VerificationKind::Node, expected: Box::new(expected), computed: Box::new(cid), }); } return Ok(true); } Ok(false) } fn read_length(buf: &[u8], current_len: &mut Option) -> Result> { if let Some(len) = current_len { return Ok(Some((0, *len))); } match unsigned_varint::decode::usize(buf) { Ok((l, remaining)) => { let consumed = buf.len() - remaining.len(); *current_len = Some(l); Ok(Some((consumed, l))) } Err(unsigned_varint::decode::Error::Insufficient) => Ok(None), Err(e) => Err(StarError::InvalidState(format!("Varint error: {}", e))), } } fn process_node( block_bytes: &[u8], expected: Option, expected_height: Option, stack: &mut Vec, ) -> Result> { let node: StarMstNode = serde_ipld_dagcbor::from_slice(block_bytes) .map_err(|e| StarError::Cbor(e.to_string()))?; // Use shared validation logic let (height, entry_keys) = validate_node_structure(&node, expected_height)?; // Check for implicit records (needed for VerifyLayer0 logic) let mut has_implicit = false; if height == 0 { for e in &node.e { if e.v_archived == Some(true) { has_implicit = true; break; } } } if !has_implicit { let repo_node = node.to_repo()?; let bytes = serde_ipld_dagcbor::to_vec(&repo_node) .map_err(|e| StarError::Cbor(e.to_string()))?; let hash = Sha256::digest(&bytes); let cid = Cid::new_v1(0x71, cid::multihash::Multihash::wrap(0x12, &hash)?); if let Some(exp) = expected && cid != exp { return Err(StarError::VerificationFailed { kind: VerificationKind::Node, expected: Box::new(exp), computed: Box::new(cid), }); } } else { stack.push(StackItem::VerifyLayer0 { node: node.clone(), parent_expected: expected, pending_records: Vec::new(), }); } // Push children in reverse let child_expected_height = height.checked_sub(1); if height > 0 { let next_h = child_expected_height.unwrap(); for i in (0..node.e.len()).rev() { let e = &node.e[i]; let key = entry_keys[i].clone(); if e.t_archived == Some(true) { stack.push(StackItem::Node { expected: e.t, expected_height: Some(next_h), }); } if e.v_archived == Some(true) { let implicit_index = if e.v.is_none() { Some(i) } else { None }; stack.push(StackItem::Record { key, expected: e.v, implicit_index, }); } } if node.l_archived == Some(true) { stack.push(StackItem::Node { expected: node.l, expected_height: Some(next_h), }); } } else { // Height 0: Push records only for i in (0..node.e.len()).rev() { let e = &node.e[i]; let key = entry_keys[i].clone(); if e.v_archived == Some(true) { let implicit_index = if e.v.is_none() { Some(i) } else { None }; stack.push(StackItem::Record { key, expected: e.v, implicit_index, }); } } } Ok(Some(StarItem::Node(node))) } fn process_record( block_bytes: &[u8], key: Vec, expected: Option, implicit_index: Option, stack: &mut [StackItem], ) -> Result> { let hash = Sha256::digest(block_bytes); let cid = Cid::new_v1(0x71, cid::multihash::Multihash::wrap(0x12, &hash)?); if let Some(exp) = expected && cid != exp { return Err(StarError::VerificationFailed { kind: VerificationKind::Record { key: key.clone() }, expected: Box::new(exp), computed: Box::new(cid), }); } if let Some(idx) = implicit_index { let mut found = false; for item in stack.iter_mut().rev() { if let StackItem::VerifyLayer0 { pending_records, .. } = item { pending_records.push((idx, cid)); found = true; break; } } if !found { return Err(StarError::InvalidState( "Implicit record verification context missing".into(), )); } } Ok(Some(StarItem::Record { key, cid, content: Some(block_bytes.to_vec()), })) } }