Server tools to backfill, tail, mirror, and verify PLC logs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

*: serialize on disk with bincode2 instead of serde

+43 -9
+33
Cargo.lock
··· 32 32 dependencies = [ 33 33 "anyhow", 34 34 "async-compression", 35 + "bincode", 35 36 "bytevec", 36 37 "chrono", 37 38 "clap", ··· 264 265 version = "0.22.1" 265 266 source = "registry+https://github.com/rust-lang/crates.io-index" 266 267 checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" 268 + 269 + [[package]] 270 + name = "bincode" 271 + version = "2.0.1" 272 + source = "registry+https://github.com/rust-lang/crates.io-index" 273 + checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740" 274 + dependencies = [ 275 + "bincode_derive", 276 + "serde", 277 + "unty", 278 + ] 279 + 280 + [[package]] 281 + name = "bincode_derive" 282 + version = "2.0.1" 283 + source = "registry+https://github.com/rust-lang/crates.io-index" 284 + checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09" 285 + dependencies = [ 286 + "virtue", 287 + ] 267 288 268 289 [[package]] 269 290 name = "bindgen" ··· 3053 3074 checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" 3054 3075 3055 3076 [[package]] 3077 + name = "unty" 3078 + version = "0.0.4" 3079 + source = "registry+https://github.com/rust-lang/crates.io-index" 3080 + checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae" 3081 + 3082 + [[package]] 3056 3083 name = "url" 3057 3084 version = "2.5.7" 3058 3085 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3093 3120 version = "0.9.5" 3094 3121 source = "registry+https://github.com/rust-lang/crates.io-index" 3095 3122 checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" 3123 + 3124 + [[package]] 3125 + name = "virtue" 3126 + version = "0.0.18" 3127 + source = "registry+https://github.com/rust-lang/crates.io-index" 3128 + checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1" 3096 3129 3097 3130 [[package]] 3098 3131 name = "want"
+1
Cargo.toml
··· 9 9 [dependencies] 10 10 anyhow = "1.0.99" 11 11 async-compression = { version = "0.4.30", features = ["futures-io", "tokio", "gzip"] } 12 + bincode = { version = "2.0.1", features = ["derive"] } 12 13 bytevec = "0.2.0" 13 14 chrono = { version = "0.4.42", features = ["serde"] } 14 15 clap = { version = "4.5.47", features = ["derive", "env"] }
+9 -9
src/plc_rocksdb.rs
··· 1 1 use async_compression::tokio::write::{GzipDecoder, GzipEncoder}; 2 - use futures::{TryFutureExt, stream}; 2 + use futures::stream; 3 3 use rocksdb::Options; 4 4 use serde::{Deserialize, Serialize}; 5 5 use std::{path::PathBuf, sync::Arc, time::Instant}; ··· 56 56 db.put_cf( 57 57 &cf, 58 58 op_key(&op, id), 59 - serde_json::to_vec(&RocksOp::from_op(&op).await).unwrap(), 59 + bincode::encode_to_vec(&RocksOp::from_op(&op).await, bincode::config::standard()) 60 + .unwrap(), 60 61 )?; 61 62 self.set_latest_timestamp(&db, op.created_at).await?; 62 63 ··· 114 115 } 115 116 } 116 117 117 - #[derive(Serialize, Deserialize)] 118 + #[derive(bincode::Encode, bincode::Decode)] 118 119 struct RocksOp { 119 120 pub did: String, 120 121 pub cid: String, 121 - pub created_at: Dt, 122 + pub created_at: i64, 122 123 pub nullified: bool, 123 124 pub operation: Vec<u8>, 124 125 } ··· 135 136 operation: op_gz, 136 137 did: op.did.clone(), 137 138 cid: op.cid.clone(), 138 - created_at: op.created_at, 139 + created_at: op.created_at.timestamp_millis(), 139 140 nullified: op.nullified, 140 141 } 141 142 } ··· 150 151 Op { 151 152 did: self.did.clone(), 152 153 cid: self.cid.clone(), 153 - created_at: self.created_at, 154 + created_at: Dt::from_timestamp_millis(self.created_at).unwrap(), 154 155 nullified: self.nullified, 155 156 operation: serde_json::value::RawValue::from_string(dec_op).unwrap(), 156 157 } ··· 237 238 } 238 239 239 240 async fn query_ops(&self, did: String) -> Result<Vec<Op>, Error> { 240 - // TODO(geesawra): check if did exists first! 241 241 let db = self.db.lock().await; 242 242 243 243 let id = match self ··· 263 263 return None; 264 264 } 265 265 266 - let rop: RocksOp = serde_json::from_slice(&v).unwrap(); 266 + let asd = bincode::decode_from_slice(&v, bincode::config::standard()).unwrap(); 267 267 268 - Some(rop) 268 + Some(asd.0) 269 269 }) 270 270 .collect(); 271 271