1//! PostgreSQL composite type reader for deserializing wire format
2
3use crate::infrastructure::types::{CaptionMimeType, LanguageCode};
4use chrono::{DateTime, Utc};
5use diesel::deserialize::{self, FromSql};
6use diesel::pg::Pg;
7use diesel::sql_types::SqlType;
8
9use super::types::VideoCaption;
10
/// Cursor over the binary wire representation of a PostgreSQL composite value.
///
/// Holds the borrowed input buffer together with the position of the next
/// unread byte. Cloning is cheap (a slice reference and an index), which makes
/// it easy to snapshot the position before a speculative read.
#[derive(Debug, Clone)]
pub struct CompositeReader<'a> {
    /// Raw bytes being decoded.
    bytes: &'a [u8],
    /// Index into `bytes` of the next byte to read.
    offset: usize,
}
16
17impl<'a> CompositeReader<'a> {
18 pub fn new(bytes: &'a [u8]) -> Self {
19 Self { bytes, offset: 0 }
20 }
21
22 pub fn read_i32(&mut self) -> deserialize::Result<i32> {
23 if self.offset + 4 > self.bytes.len() {
24 return Err("unexpected end of input".into());
25 }
26 let val = i32::from_be_bytes([
27 self.bytes[self.offset],
28 self.bytes[self.offset + 1],
29 self.bytes[self.offset + 2],
30 self.bytes[self.offset + 3],
31 ]);
32 self.offset += 4;
33 Ok(val)
34 }
35
36 pub fn read_field_count(&mut self) -> deserialize::Result<i32> {
37 self.read_i32()
38 }
39
40 pub fn read_field_header(&mut self) -> deserialize::Result<(i32, i32)> {
41 let oid = self.read_i32()?;
42 let len = self.read_i32()?;
43 Ok((oid, len))
44 }
45
46 pub fn read_text_field(&mut self) -> deserialize::Result<Option<String>> {
47 let (_oid, len) = self.read_field_header()?;
48 if len == -1 {
49 return Ok(None);
50 }
51 let len = len as usize;
52 if self.offset + len > self.bytes.len() {
53 return Err("unexpected end of input".into());
54 }
55 let s = std::str::from_utf8(&self.bytes[self.offset..self.offset + len])
56 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
57 self.offset += len;
58 Ok(Some(s.to_string()))
59 }
60
61 pub fn read_bytea_field(&mut self) -> deserialize::Result<Option<Vec<u8>>> {
62 let (_oid, len) = self.read_field_header()?;
63 if len == -1 {
64 return Ok(None);
65 }
66 let len = len as usize;
67 if self.offset + len > self.bytes.len() {
68 return Err("unexpected end of input".into());
69 }
70 let bytes = self.bytes[self.offset..self.offset + len].to_vec();
71 self.offset += len;
72 Ok(Some(bytes))
73 }
74
75 pub fn read_int_field(&mut self) -> deserialize::Result<Option<i32>> {
76 let (_oid, len) = self.read_field_header()?;
77 if len == -1 {
78 return Ok(None);
79 }
80 if len != 4 {
81 return Err(format!("expected 4 bytes for i32, got {}", len).into());
82 }
83 let val = self.read_i32()?;
84 Ok(Some(val))
85 }
86
87 pub fn read_enum_field<T>(&mut self) -> deserialize::Result<Option<T>>
88 where
89 T: std::str::FromStr,
90 <T as std::str::FromStr>::Err: std::fmt::Display,
91 {
92 match self.read_text_field()? {
93 Some(s) => {
94 let val = s
95 .parse::<T>()
96 .map_err(|e| format!("Failed to parse enum: {}", e))?;
97 Ok(Some(val))
98 }
99 None => Ok(None),
100 }
101 }
102
103 pub fn read_composite_field<T, ST>(&mut self) -> deserialize::Result<Option<T>>
104 where
105 T: FromSql<ST, Pg>,
106 ST: SqlType,
107 {
108 let (_oid, len) = self.read_field_header()?;
109 if len == -1 {
110 return Ok(None);
111 }
112 let len = len as usize;
113 if self.offset + len > self.bytes.len() {
114 return Err("unexpected end of input".into());
115 }
116 let composite_bytes = &self.bytes[self.offset..self.offset + len];
117 self.offset += len;
118
119 // For VideoCaption specifically
120 let mut nested_reader = CompositeReader::new(composite_bytes);
121 let field_count = nested_reader.read_field_count()?;
122 if field_count != 3 {
123 return Err(format!("nested composite: expected 3 fields, got {}", field_count).into());
124 }
125
126 let lang = nested_reader
127 .read_enum_field::<LanguageCode>()?
128 .ok_or("caption.lang cannot be NULL")?;
129 let mime_type = nested_reader
130 .read_enum_field::<CaptionMimeType>()?
131 .ok_or("caption.mime_type cannot be NULL")?;
132 let cid = nested_reader
133 .read_bytea_field()?
134 .ok_or("caption.cid cannot be NULL")?;
135
136 let caption = Box::new(VideoCaption {
137 lang,
138 mime_type,
139 cid,
140 });
141 let result = unsafe { Box::from_raw(Box::into_raw(caption) as *mut T) };
142 Ok(Some(*result))
143 }
144
145 pub fn read_bigint_field(&mut self) -> deserialize::Result<Option<i64>> {
146 let (_oid, len) = self.read_field_header()?;
147 if len == -1 {
148 return Ok(None);
149 }
150 if len != 8 {
151 return Err(format!("expected 8 bytes for i64, got {}", len).into());
152 }
153 if self.offset + 8 > self.bytes.len() {
154 return Err("unexpected end of input".into());
155 }
156 let val = i64::from_be_bytes([
157 self.bytes[self.offset],
158 self.bytes[self.offset + 1],
159 self.bytes[self.offset + 2],
160 self.bytes[self.offset + 3],
161 self.bytes[self.offset + 4],
162 self.bytes[self.offset + 5],
163 self.bytes[self.offset + 6],
164 self.bytes[self.offset + 7],
165 ]);
166 self.offset += 8;
167 Ok(Some(val))
168 }
169
170 pub fn read_bool_field(&mut self) -> deserialize::Result<Option<bool>> {
171 let (_oid, len) = self.read_field_header()?;
172 if len == -1 {
173 return Ok(None);
174 }
175 if len != 1 {
176 return Err(format!("expected 1 byte for bool, got {}", len).into());
177 }
178 if self.offset + 1 > self.bytes.len() {
179 return Err("unexpected end of input".into());
180 }
181 let val = self.bytes[self.offset] != 0;
182 self.offset += 1;
183 Ok(Some(val))
184 }
185
186 pub fn read_timestamptz_field(&mut self) -> deserialize::Result<Option<DateTime<Utc>>> {
187 let (_oid, len) = self.read_field_header()?;
188 if len == -1 {
189 return Ok(None);
190 }
191 if len != 8 {
192 return Err(format!("expected 8 bytes for timestamptz, got {}", len).into());
193 }
194 if self.offset + 8 > self.bytes.len() {
195 return Err("unexpected end of input".into());
196 }
197
198 let micros = i64::from_be_bytes([
199 self.bytes[self.offset],
200 self.bytes[self.offset + 1],
201 self.bytes[self.offset + 2],
202 self.bytes[self.offset + 3],
203 self.bytes[self.offset + 4],
204 self.bytes[self.offset + 5],
205 self.bytes[self.offset + 6],
206 self.bytes[self.offset + 7],
207 ]);
208 self.offset += 8;
209
210 const PG_EPOCH_SECS: i64 = 946684800;
211 let total_micros = PG_EPOCH_SECS * 1_000_000 + micros;
212 let secs = total_micros / 1_000_000;
213 let nsecs = ((total_micros % 1_000_000) * 1000) as u32;
214
215 let timestamp = chrono::DateTime::from_timestamp(secs, nsecs).ok_or("Invalid timestamp")?;
216 Ok(Some(timestamp))
217 }
218
219 pub fn read_jsonb_field(&mut self) -> deserialize::Result<Option<serde_json::Value>> {
220 let (_oid, len) = self.read_field_header()?;
221 if len == -1 {
222 return Ok(None);
223 }
224 let len = len as usize;
225 if self.offset + len > self.bytes.len() {
226 return Err("unexpected end of input".into());
227 }
228
229 if len < 1 {
230 return Err("JSONB data too short".into());
231 }
232
233 let jsonb_bytes = &self.bytes[self.offset + 1..self.offset + len];
234 self.offset += len;
235
236 let value: serde_json::Value = serde_json::from_slice(jsonb_bytes)
237 .map_err(|e| format!("Failed to parse JSONB: {}", e))?;
238 Ok(Some(value))
239 }
240}