//! PostgreSQL composite type reader for deserializing wire format use crate::infrastructure::types::{CaptionMimeType, LanguageCode}; use chrono::{DateTime, Utc}; use diesel::deserialize::{self, FromSql}; use diesel::pg::Pg; use diesel::sql_types::SqlType; use super::types::VideoCaption; /// Helper for reading PostgreSQL composite types from wire format pub struct CompositeReader<'a> { bytes: &'a [u8], offset: usize, } impl<'a> CompositeReader<'a> { pub fn new(bytes: &'a [u8]) -> Self { Self { bytes, offset: 0 } } pub fn read_i32(&mut self) -> deserialize::Result { if self.offset + 4 > self.bytes.len() { return Err("unexpected end of input".into()); } let val = i32::from_be_bytes([ self.bytes[self.offset], self.bytes[self.offset + 1], self.bytes[self.offset + 2], self.bytes[self.offset + 3], ]); self.offset += 4; Ok(val) } pub fn read_field_count(&mut self) -> deserialize::Result { self.read_i32() } pub fn read_field_header(&mut self) -> deserialize::Result<(i32, i32)> { let oid = self.read_i32()?; let len = self.read_i32()?; Ok((oid, len)) } pub fn read_text_field(&mut self) -> deserialize::Result> { let (_oid, len) = self.read_field_header()?; if len == -1 { return Ok(None); } let len = len as usize; if self.offset + len > self.bytes.len() { return Err("unexpected end of input".into()); } let s = std::str::from_utf8(&self.bytes[self.offset..self.offset + len]) .map_err(|e| Box::new(e) as Box)?; self.offset += len; Ok(Some(s.to_string())) } pub fn read_bytea_field(&mut self) -> deserialize::Result>> { let (_oid, len) = self.read_field_header()?; if len == -1 { return Ok(None); } let len = len as usize; if self.offset + len > self.bytes.len() { return Err("unexpected end of input".into()); } let bytes = self.bytes[self.offset..self.offset + len].to_vec(); self.offset += len; Ok(Some(bytes)) } pub fn read_int_field(&mut self) -> deserialize::Result> { let (_oid, len) = self.read_field_header()?; if len == -1 { return Ok(None); } if len != 4 { return Err(format!("expected 4 bytes for i32, got {}", len).into()); } let val = self.read_i32()?; Ok(Some(val)) } pub fn read_enum_field(&mut self) -> deserialize::Result> where T: std::str::FromStr, ::Err: std::fmt::Display, { match self.read_text_field()? { Some(s) => { let val = s .parse::() .map_err(|e| format!("Failed to parse enum: {}", e))?; Ok(Some(val)) } None => Ok(None), } } pub fn read_composite_field(&mut self) -> deserialize::Result> where T: FromSql, ST: SqlType, { let (_oid, len) = self.read_field_header()?; if len == -1 { return Ok(None); } let len = len as usize; if self.offset + len > self.bytes.len() { return Err("unexpected end of input".into()); } let composite_bytes = &self.bytes[self.offset..self.offset + len]; self.offset += len; // For VideoCaption specifically let mut nested_reader = CompositeReader::new(composite_bytes); let field_count = nested_reader.read_field_count()?; if field_count != 3 { return Err(format!("nested composite: expected 3 fields, got {}", field_count).into()); } let lang = nested_reader .read_enum_field::()? .ok_or("caption.lang cannot be NULL")?; let mime_type = nested_reader .read_enum_field::()? .ok_or("caption.mime_type cannot be NULL")?; let cid = nested_reader .read_bytea_field()? .ok_or("caption.cid cannot be NULL")?; let caption = Box::new(VideoCaption { lang, mime_type, cid, }); let result = unsafe { Box::from_raw(Box::into_raw(caption) as *mut T) }; Ok(Some(*result)) } pub fn read_bigint_field(&mut self) -> deserialize::Result> { let (_oid, len) = self.read_field_header()?; if len == -1 { return Ok(None); } if len != 8 { return Err(format!("expected 8 bytes for i64, got {}", len).into()); } if self.offset + 8 > self.bytes.len() { return Err("unexpected end of input".into()); } let val = i64::from_be_bytes([ self.bytes[self.offset], self.bytes[self.offset + 1], self.bytes[self.offset + 2], self.bytes[self.offset + 3], self.bytes[self.offset + 4], self.bytes[self.offset + 5], self.bytes[self.offset + 6], self.bytes[self.offset + 7], ]); self.offset += 8; Ok(Some(val)) } pub fn read_bool_field(&mut self) -> deserialize::Result> { let (_oid, len) = self.read_field_header()?; if len == -1 { return Ok(None); } if len != 1 { return Err(format!("expected 1 byte for bool, got {}", len).into()); } if self.offset + 1 > self.bytes.len() { return Err("unexpected end of input".into()); } let val = self.bytes[self.offset] != 0; self.offset += 1; Ok(Some(val)) } pub fn read_timestamptz_field(&mut self) -> deserialize::Result>> { let (_oid, len) = self.read_field_header()?; if len == -1 { return Ok(None); } if len != 8 { return Err(format!("expected 8 bytes for timestamptz, got {}", len).into()); } if self.offset + 8 > self.bytes.len() { return Err("unexpected end of input".into()); } let micros = i64::from_be_bytes([ self.bytes[self.offset], self.bytes[self.offset + 1], self.bytes[self.offset + 2], self.bytes[self.offset + 3], self.bytes[self.offset + 4], self.bytes[self.offset + 5], self.bytes[self.offset + 6], self.bytes[self.offset + 7], ]); self.offset += 8; const PG_EPOCH_SECS: i64 = 946684800; let total_micros = PG_EPOCH_SECS * 1_000_000 + micros; let secs = total_micros / 1_000_000; let nsecs = ((total_micros % 1_000_000) * 1000) as u32; let timestamp = chrono::DateTime::from_timestamp(secs, nsecs).ok_or("Invalid timestamp")?; Ok(Some(timestamp)) } pub fn read_jsonb_field(&mut self) -> deserialize::Result> { let (_oid, len) = self.read_field_header()?; if len == -1 { return Ok(None); } let len = len as usize; if self.offset + len > self.bytes.len() { return Err("unexpected end of input".into()); } if len < 1 { return Err("JSONB data too short".into()); } let jsonb_bytes = &self.bytes[self.offset + 1..self.offset + len]; self.offset += len; let value: serde_json::Value = serde_json::from_slice(jsonb_bytes) .map_err(|e| format!("Failed to parse JSONB: {}", e))?; Ok(Some(value)) } }