Rust AppView - highly experimental!
at experiments 240 lines 7.9 kB view raw
1//! PostgreSQL composite type reader for deserializing wire format 2 3use crate::infrastructure::types::{CaptionMimeType, LanguageCode}; 4use chrono::{DateTime, Utc}; 5use diesel::deserialize::{self, FromSql}; 6use diesel::pg::Pg; 7use diesel::sql_types::SqlType; 8 9use super::types::VideoCaption; 10 11/// Helper for reading PostgreSQL composite types from wire format 12pub struct CompositeReader<'a> { 13 bytes: &'a [u8], 14 offset: usize, 15} 16 17impl<'a> CompositeReader<'a> { 18 pub fn new(bytes: &'a [u8]) -> Self { 19 Self { bytes, offset: 0 } 20 } 21 22 pub fn read_i32(&mut self) -> deserialize::Result<i32> { 23 if self.offset + 4 > self.bytes.len() { 24 return Err("unexpected end of input".into()); 25 } 26 let val = i32::from_be_bytes([ 27 self.bytes[self.offset], 28 self.bytes[self.offset + 1], 29 self.bytes[self.offset + 2], 30 self.bytes[self.offset + 3], 31 ]); 32 self.offset += 4; 33 Ok(val) 34 } 35 36 pub fn read_field_count(&mut self) -> deserialize::Result<i32> { 37 self.read_i32() 38 } 39 40 pub fn read_field_header(&mut self) -> deserialize::Result<(i32, i32)> { 41 let oid = self.read_i32()?; 42 let len = self.read_i32()?; 43 Ok((oid, len)) 44 } 45 46 pub fn read_text_field(&mut self) -> deserialize::Result<Option<String>> { 47 let (_oid, len) = self.read_field_header()?; 48 if len == -1 { 49 return Ok(None); 50 } 51 let len = len as usize; 52 if self.offset + len > self.bytes.len() { 53 return Err("unexpected end of input".into()); 54 } 55 let s = std::str::from_utf8(&self.bytes[self.offset..self.offset + len]) 56 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?; 57 self.offset += len; 58 Ok(Some(s.to_string())) 59 } 60 61 pub fn read_bytea_field(&mut self) -> deserialize::Result<Option<Vec<u8>>> { 62 let (_oid, len) = self.read_field_header()?; 63 if len == -1 { 64 return Ok(None); 65 } 66 let len = len as usize; 67 if self.offset + len > self.bytes.len() { 68 return Err("unexpected end of input".into()); 69 } 70 let bytes = self.bytes[self.offset..self.offset + len].to_vec(); 71 self.offset += len; 72 Ok(Some(bytes)) 73 } 74 75 pub fn read_int_field(&mut self) -> deserialize::Result<Option<i32>> { 76 let (_oid, len) = self.read_field_header()?; 77 if len == -1 { 78 return Ok(None); 79 } 80 if len != 4 { 81 return Err(format!("expected 4 bytes for i32, got {}", len).into()); 82 } 83 let val = self.read_i32()?; 84 Ok(Some(val)) 85 } 86 87 pub fn read_enum_field<T>(&mut self) -> deserialize::Result<Option<T>> 88 where 89 T: std::str::FromStr, 90 <T as std::str::FromStr>::Err: std::fmt::Display, 91 { 92 match self.read_text_field()? { 93 Some(s) => { 94 let val = s 95 .parse::<T>() 96 .map_err(|e| format!("Failed to parse enum: {}", e))?; 97 Ok(Some(val)) 98 } 99 None => Ok(None), 100 } 101 } 102 103 pub fn read_composite_field<T, ST>(&mut self) -> deserialize::Result<Option<T>> 104 where 105 T: FromSql<ST, Pg>, 106 ST: SqlType, 107 { 108 let (_oid, len) = self.read_field_header()?; 109 if len == -1 { 110 return Ok(None); 111 } 112 let len = len as usize; 113 if self.offset + len > self.bytes.len() { 114 return Err("unexpected end of input".into()); 115 } 116 let composite_bytes = &self.bytes[self.offset..self.offset + len]; 117 self.offset += len; 118 119 // For VideoCaption specifically 120 let mut nested_reader = CompositeReader::new(composite_bytes); 121 let field_count = nested_reader.read_field_count()?; 122 if field_count != 3 { 123 return Err(format!("nested composite: expected 3 fields, got {}", field_count).into()); 124 } 125 126 let lang = nested_reader 127 .read_enum_field::<LanguageCode>()? 128 .ok_or("caption.lang cannot be NULL")?; 129 let mime_type = nested_reader 130 .read_enum_field::<CaptionMimeType>()? 131 .ok_or("caption.mime_type cannot be NULL")?; 132 let cid = nested_reader 133 .read_bytea_field()? 134 .ok_or("caption.cid cannot be NULL")?; 135 136 let caption = Box::new(VideoCaption { 137 lang, 138 mime_type, 139 cid, 140 }); 141 let result = unsafe { Box::from_raw(Box::into_raw(caption) as *mut T) }; 142 Ok(Some(*result)) 143 } 144 145 pub fn read_bigint_field(&mut self) -> deserialize::Result<Option<i64>> { 146 let (_oid, len) = self.read_field_header()?; 147 if len == -1 { 148 return Ok(None); 149 } 150 if len != 8 { 151 return Err(format!("expected 8 bytes for i64, got {}", len).into()); 152 } 153 if self.offset + 8 > self.bytes.len() { 154 return Err("unexpected end of input".into()); 155 } 156 let val = i64::from_be_bytes([ 157 self.bytes[self.offset], 158 self.bytes[self.offset + 1], 159 self.bytes[self.offset + 2], 160 self.bytes[self.offset + 3], 161 self.bytes[self.offset + 4], 162 self.bytes[self.offset + 5], 163 self.bytes[self.offset + 6], 164 self.bytes[self.offset + 7], 165 ]); 166 self.offset += 8; 167 Ok(Some(val)) 168 } 169 170 pub fn read_bool_field(&mut self) -> deserialize::Result<Option<bool>> { 171 let (_oid, len) = self.read_field_header()?; 172 if len == -1 { 173 return Ok(None); 174 } 175 if len != 1 { 176 return Err(format!("expected 1 byte for bool, got {}", len).into()); 177 } 178 if self.offset + 1 > self.bytes.len() { 179 return Err("unexpected end of input".into()); 180 } 181 let val = self.bytes[self.offset] != 0; 182 self.offset += 1; 183 Ok(Some(val)) 184 } 185 186 pub fn read_timestamptz_field(&mut self) -> deserialize::Result<Option<DateTime<Utc>>> { 187 let (_oid, len) = self.read_field_header()?; 188 if len == -1 { 189 return Ok(None); 190 } 191 if len != 8 { 192 return Err(format!("expected 8 bytes for timestamptz, got {}", len).into()); 193 } 194 if self.offset + 8 > self.bytes.len() { 195 return Err("unexpected end of input".into()); 196 } 197 198 let micros = i64::from_be_bytes([ 199 self.bytes[self.offset], 200 self.bytes[self.offset + 1], 201 self.bytes[self.offset + 2], 202 self.bytes[self.offset + 3], 203 self.bytes[self.offset + 4], 204 self.bytes[self.offset + 5], 205 self.bytes[self.offset + 6], 206 self.bytes[self.offset + 7], 207 ]); 208 self.offset += 8; 209 210 const PG_EPOCH_SECS: i64 = 946684800; 211 let total_micros = PG_EPOCH_SECS * 1_000_000 + micros; 212 let secs = total_micros / 1_000_000; 213 let nsecs = ((total_micros % 1_000_000) * 1000) as u32; 214 215 let timestamp = chrono::DateTime::from_timestamp(secs, nsecs).ok_or("Invalid timestamp")?; 216 Ok(Some(timestamp)) 217 } 218 219 pub fn read_jsonb_field(&mut self) -> deserialize::Result<Option<serde_json::Value>> { 220 let (_oid, len) = self.read_field_header()?; 221 if len == -1 { 222 return Ok(None); 223 } 224 let len = len as usize; 225 if self.offset + len > self.bytes.len() { 226 return Err("unexpected end of input".into()); 227 } 228 229 if len < 1 { 230 return Err("JSONB data too short".into()); 231 } 232 233 let jsonb_bytes = &self.bytes[self.offset + 1..self.offset + len]; 234 self.offset += len; 235 236 let value: serde_json::Value = serde_json::from_slice(jsonb_bytes) 237 .map_err(|e| format!("Failed to parse JSONB: {}", e))?; 238 Ok(Some(value)) 239 } 240}