ALPHA: wire is a tool to deploy nixos systems wire.althaea.zone/
at trunk 237 lines 6.1 kB view raw
1// SPDX-License-Identifier: AGPL-3.0-or-later 2// Copyright 2024-2025 wire Contributors 3 4use std::{ 5 env, 6 path::{Path, PathBuf}, 7}; 8 9use sqlx::{ 10 Pool, Sqlite, 11 migrate::Migrator, 12 sqlite::{SqliteConnectOptions, SqlitePoolOptions}, 13}; 14use tokio::fs::create_dir_all; 15use tracing::{debug, error, trace}; 16 17use crate::hive::{FlakePrefetch, Hive}; 18 19#[derive(Clone)] 20pub struct InspectionCache { 21 pool: Pool<Sqlite>, 22} 23 24static MIGRATOR: Migrator = sqlx::migrate!("src/cache/migrations"); 25 26async fn get_cache_directory() -> Option<PathBuf> { 27 let home = PathBuf::from( 28 env::var("HOME") 29 .inspect_err(|_| error!("HOME env var not found")) 30 .ok()?, 31 ); 32 33 trace!(home = ?home); 34 35 let cache_home = env::var("XDG_CACHE_HOME") 36 .inspect_err(|_| debug!("XDG_CACHE_HOME not found")) 37 .ok() 38 .map(PathBuf::from) 39 .unwrap_or(home.join(".cache")); 40 41 let cache_directory = cache_home.join("wire"); 42 43 trace!(cache_directory = ?cache_directory); 44 45 let _ = create_dir_all(&cache_directory).await; 46 47 Some(cache_directory) 48} 49 50impl InspectionCache { 51 pub async fn new() -> Option<Self> { 52 let cache_path = get_cache_directory().await?.join("inspect.db"); 53 debug!(cache_path = ?cache_path); 54 55 let pool = SqlitePoolOptions::new() 56 .max_connections(1) 57 .connect_with( 58 SqliteConnectOptions::new() 59 .filename(cache_path) 60 .create_if_missing(true), 61 ) 62 .await 63 .inspect_err(|x| error!("failed to open cache db: {x}")) 64 .ok()?; 65 66 MIGRATOR 67 .run(&pool) 68 .await 69 .inspect_err(|err| error!("failed to run cache migrations: {err:?}")) 70 .ok()?; 71 72 Some(Self { pool }) 73 } 74 75 fn cache_invalid(store_path: &String) -> bool { 76 let path = Path::new(store_path); 77 78 // possible TOCTOU 79 !path.exists() 80 } 81 82 pub async fn get_hive(&self, prefetch: &FlakePrefetch) -> Option<Hive> { 83 struct Query { 84 json_value: Vec<u8>, 85 store_path: String, 86 } 87 88 let cached_blob = sqlx::query_as!( 89 Query, 90 " 91 select 92 inspection_blobs.json_value, 93 inspection_cache.store_path 94 from 95 inspection_blobs 96 join inspection_cache on inspection_cache.blob_id = inspection_blobs.id 97 where 98 inspection_cache.store_path = $1 99 and inspection_cache.hash = $2 100 and inspection_blobs.schema_version = $3 101 limit 102 1 103 ", 104 prefetch.store_path, 105 prefetch.hash, 106 Hive::SCHEMA_VERSION 107 ) 108 .fetch_optional(&self.pool) 109 .await 110 .inspect_err(|x| error!("failed to fetch cached hive: {x}")) 111 .ok()??; 112 113 // the cached path may of been garbage collected, discard it 114 // it is quite hard to replicate this bug but its occurred to me 115 // atleast once 116 if Self::cache_invalid(&cached_blob.store_path) { 117 trace!("discarding cache that does not exist in the nix store"); 118 return None; 119 } 120 121 trace!( 122 "read {} bytes of zstd data from cache", 123 cached_blob.json_value.len() 124 ); 125 126 let json_string = zstd::decode_all(cached_blob.json_value.as_slice()) 127 .inspect_err(|err| error!("failed to decode cached zstd data: {err}")) 128 .ok()?; 129 130 trace!( 131 "inflated {} > {} in decoding", 132 cached_blob.json_value.len(), 133 json_string.len() 134 ); 135 136 serde_json::from_slice(&json_string) 137 .inspect_err(|err| { 138 error!("could not use cached evaluation: {err}"); 139 }) 140 .ok() 141 } 142 143 pub async fn store_hive(&self, prefetch: &FlakePrefetch, json_value: &String) { 144 let Ok(json_value) = zstd::encode_all(json_value.as_bytes(), 0) 145 .inspect_err(|err| error!("failed to encode data w/ zstd: {err}")) 146 else { 147 return; 148 }; 149 150 let hive_inspection = sqlx::query_scalar!( 151 " 152 insert into inspection_blobs (json_value, schema_version) 153 values ($1, $2) 154 on conflict(json_value) 155 do update set json_value = excluded.json_value 156 returning inspection_blobs.id 157 ", 158 json_value, 159 Hive::SCHEMA_VERSION 160 ) 161 .fetch_one(&self.pool) 162 .await 163 .inspect_err(|x| error!("could not insert hive_inspection: {x}")); 164 165 let Ok(blob_id) = hive_inspection else { 166 return; 167 }; 168 169 let cached_inspection = sqlx::query!( 170 " 171 insert into 172 inspection_cache (store_path, hash, blob_id) 173 values 174 ($1, $2, $3) 175 ", 176 prefetch.store_path, 177 prefetch.hash, 178 blob_id 179 ) 180 .execute(&self.pool) 181 .await; 182 183 if let Err(err) = cached_inspection { 184 error!("could not insert cached_inspection: {err}"); 185 } 186 } 187 188 pub async fn gc(&self) -> Result<(), sqlx::Error> { 189 // keep newest 30 AND 190 // delete caches that refer to a blob w/ wrong schema 191 sqlx::query!( 192 "delete from inspection_cache 193where 194 blob_id in ( 195 select 196 id 197 from 198 inspection_blobs 199 where 200 schema_version != $1 201 ) 202 or ROWID in ( 203 select 204 ROWID 205 from 206 inspection_cache 207 order by 208 ROWID desc 209 limit 210 -1 211 offset 212 30 213 )", 214 Hive::SCHEMA_VERSION 215 ) 216 .execute(&self.pool) 217 .await?; 218 219 // delete orphaned blobs 220 sqlx::query!( 221 "delete from inspection_blobs 222where 223 not exists ( 224 select 225 1 226 from 227 inspection_cache 228 where 229 inspection_cache.blob_id = inspection_blobs.id 230 )" 231 ) 232 .execute(&self.pool) 233 .await?; 234 235 Ok(()) 236 } 237}