Fast and robust atproto CAR file processing in rust

explicitly commit transaction

it was *okay* in drop since we were dropping in spawn_blocking tasks, but kind of setting things up for problems later.

also all the option stuff to let it be take()n was gross and now it's gone

Changed files
+11 -15
src
+9 -12
src/disk.rs
··· 53 53 pub fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, rusqlite::Error> { 54 54 let tx = self.conn.transaction()?; 55 55 // let insert_stmt = tx.prepare("INSERT INTO blocks (key, val) VALUES (?1, ?2)")?; 56 - Ok(SqliteWriter { tx: Some(tx) }) 56 + Ok(SqliteWriter { tx }) 57 57 } 58 58 pub fn get_reader(&'_ self) -> Result<SqliteReader<'_>, rusqlite::Error> { 59 59 let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?; ··· 62 62 } 63 63 64 64 pub struct SqliteWriter<'conn> { 65 - tx: Option<rusqlite::Transaction<'conn>>, 66 - } 67 - 68 - /// oops careful in async 69 - impl Drop for SqliteWriter<'_> { 70 - fn drop(&mut self) { 71 - let tx = self.tx.take(); 72 - tx.unwrap().commit().unwrap(); 73 - } 65 + tx: rusqlite::Transaction<'conn>, 74 66 } 75 67 76 68 impl SqliteWriter<'_> { ··· 78 70 &mut self, 79 71 kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>, 80 72 ) -> Result<(), DriveError> { 81 - let tx = self.tx.as_ref().unwrap(); 82 - let mut insert_stmt = tx.prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)")?; 73 + let mut insert_stmt = self 74 + .tx 75 + .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)")?; 83 76 for pair in kv { 84 77 let (k, v) = pair?; 85 78 insert_stmt.execute((k, v))?; 86 79 } 80 + Ok(()) 81 + } 82 + pub fn commit(self) -> Result<(), rusqlite::Error> { 83 + self.tx.commit()?; 87 84 Ok(()) 88 85 } 89 86 }
+2 -3
src/drive.rs
··· 197 197 .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 198 198 199 199 writer.put_many(kvs)?; 200 - 201 - drop(writer); // cannot outlive access 200 + writer.commit()?; 202 201 Ok::<_, DriveError>(access) 203 202 }) 204 203 .await??; ··· 215 214 writer.put_many(kvs)?; 216 215 } 217 216 218 - drop(writer); // cannot outlive access 217 + writer.commit()?; 219 218 Ok::<_, DriveError>(access) 220 219 }); // await later 221 220