// SPDX-License-Identifier: AGPL-3.0-or-later
// Copyright 2024-2025 wire Contributors

use std::{
    env,
    path::{Path, PathBuf},
};

use sqlx::{
    Pool, Sqlite,
    migrate::Migrator,
    sqlite::{SqliteConnectOptions, SqlitePoolOptions},
};
use tokio::fs::create_dir_all;
use tracing::{debug, error, trace};

use crate::hive::{FlakePrefetch, Hive};
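
/// SQLite-backed cache of hive inspection results, keyed by flake store
/// path and hash, with the JSON payload compressed using zstd.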
#[derive(Clone)]
pub struct InspectionCache {
    pool: Pool<Sqlite>,
}

static MIGRATOR: Migrator = sqlx::migrate!("src/cache/migrations");
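
// For reference, a sketch of the schema the queries below assume. The actual
// migrations live in src/cache/migrations; this is inferred from the queries
// in this file, not copied from them:
//
//   create table inspection_blobs (
//       id integer primary key,
//       json_value blob not null unique,
//       schema_version integer not null
//   );
//
//   create table inspection_cache (
//       store_path text not null,
//       hash text not null,
//       blob_id integer not null references inspection_blobs (id)
//   );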

/// Resolve the wire cache directory following the XDG base directory
/// convention: `$XDG_CACHE_HOME/wire`, falling back to `$HOME/.cache/wire`.
/// The directory is created if it does not exist.
async fn get_cache_directory() -> Option<PathBuf> {
    let home = PathBuf::from(
        env::var("HOME")
            .inspect_err(|_| error!("HOME env var not found"))
            .ok()?,
    );

    trace!(home = ?home);

    let cache_home = env::var("XDG_CACHE_HOME")
        .inspect_err(|_| debug!("XDG_CACHE_HOME not found"))
        .ok()
        .map(PathBuf::from)
        .unwrap_or_else(|| home.join(".cache"));

    let cache_directory = cache_home.join("wire");

    trace!(cache_directory = ?cache_directory);

    // best effort; a failure here surfaces when the db is opened
    let _ = create_dir_all(&cache_directory).await;

    Some(cache_directory)
}

impl InspectionCache {
    pub async fn new() -> Option<Self> {
        let cache_path = get_cache_directory().await?.join("inspect.db");
        debug!(cache_path = ?cache_path);

        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect_with(
                SqliteConnectOptions::new()
                    .filename(cache_path)
                    .create_if_missing(true),
            )
            .await
            .inspect_err(|x| error!("failed to open cache db: {x}"))
            .ok()?;

        MIGRATOR
            .run(&pool)
            .await
            .inspect_err(|err| error!("failed to run cache migrations: {err:?}"))
            .ok()?;

        Some(Self { pool })
    }

    fn cache_invalid(store_path: &str) -> bool {
        let path = Path::new(store_path);

        // possible TOCTOU: the path can be garbage collected between this
        // check and the cached value being used
        !path.exists()
    }

    /// Look up a cached [`Hive`] for the given prefetch, returning `None`
    /// on a miss or on any error.
    pub async fn get_hive(&self, prefetch: &FlakePrefetch) -> Option<Hive> {
        struct Query {
            json_value: Vec<u8>,
            store_path: String,
        }

        let cached_blob = sqlx::query_as!(
            Query,
            "
            select
                inspection_blobs.json_value,
                inspection_cache.store_path
            from
                inspection_blobs
                join inspection_cache on inspection_cache.blob_id = inspection_blobs.id
            where
                inspection_cache.store_path = $1
                and inspection_cache.hash = $2
                and inspection_blobs.schema_version = $3
            limit
                1
            ",
            prefetch.store_path,
            prefetch.hash,
            Hive::SCHEMA_VERSION
        )
        .fetch_optional(&self.pool)
        .await
        .inspect_err(|x| error!("failed to fetch cached hive: {x}"))
        .ok()??;

        // the cached path may have been garbage collected, discard it.
        // this is quite hard to replicate, but it has happened to me
        // at least once
        if Self::cache_invalid(&cached_blob.store_path) {
            trace!("discarding cache that does not exist in the nix store");
            return None;
        }

        trace!(
            "read {} bytes of zstd data from cache",
            cached_blob.json_value.len()
        );

        let json_bytes = zstd::decode_all(cached_blob.json_value.as_slice())
            .inspect_err(|err| error!("failed to decode cached zstd data: {err}"))
            .ok()?;

        trace!(
            "inflated {} bytes to {} bytes in decoding",
            cached_blob.json_value.len(),
            json_bytes.len()
        );

        serde_json::from_slice(&json_bytes)
            .inspect_err(|err| {
                error!("could not use cached evaluation: {err}");
            })
            .ok()
    }

    /// Compress and store an inspection result for the given prefetch.
    /// Failures are logged and swallowed; the cache is best effort.
    pub async fn store_hive(&self, prefetch: &FlakePrefetch, json_value: &str) {
        // level 0 selects zstd's default compression level
        let Ok(json_value) = zstd::encode_all(json_value.as_bytes(), 0)
            .inspect_err(|err| error!("failed to encode data w/ zstd: {err}"))
        else {
            return;
        };

        // upsert the blob so identical evaluations share a single row; the
        // no-op update lets `returning` yield the id on conflict
        let hive_inspection = sqlx::query_scalar!(
            "
            insert into inspection_blobs (json_value, schema_version)
            values ($1, $2)
            on conflict(json_value)
            do update set json_value = excluded.json_value
            returning inspection_blobs.id
            ",
            json_value,
            Hive::SCHEMA_VERSION
        )
        .fetch_one(&self.pool)
        .await
        .inspect_err(|x| error!("could not insert hive_inspection: {x}"));

        let Ok(blob_id) = hive_inspection else {
            return;
        };

        let cached_inspection = sqlx::query!(
            "
            insert into
                inspection_cache (store_path, hash, blob_id)
            values
                ($1, $2, $3)
            ",
            prefetch.store_path,
            prefetch.hash,
            blob_id
        )
        .execute(&self.pool)
        .await;

        if let Err(err) = cached_inspection {
            error!("could not insert cached_inspection: {err}");
        }
    }

    /// Garbage collect the cache: drop entries whose blob was written with a
    /// different schema version, keep only the newest 30 entries, then delete
    /// blobs no cache entry references anymore.
    pub async fn gc(&self) -> Result<(), sqlx::Error> {
        // `limit -1` is sqlite's "no limit", so `offset 30` skips the 30
        // newest rows and deletes the rest
        sqlx::query!(
            "delete from inspection_cache
            where
                blob_id in (
                    select
                        id
                    from
                        inspection_blobs
                    where
                        schema_version != $1
                )
                or ROWID in (
                    select
                        ROWID
                    from
                        inspection_cache
                    order by
                        ROWID desc
                    limit
                        -1
                    offset
                        30
                )",
            Hive::SCHEMA_VERSION
        )
        .execute(&self.pool)
        .await?;

        // delete orphaned blobs
        sqlx::query!(
            "delete from inspection_blobs
            where
                not exists (
                    select
                        1
                    from
                        inspection_cache
                    where
                        inspection_cache.blob_id = inspection_blobs.id
                )"
        )
        .execute(&self.pool)
        .await?;

        Ok(())
    }
}
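
// A sketch of the intended call pattern (not part of this module; `inspect`
// is a hypothetical stand-in for whatever evaluates the flake to Hive JSON
// on a cache miss):
//
//   let cache = InspectionCache::new().await;
//   if let Some(cache) = &cache {
//       if let Some(hive) = cache.get_hive(&prefetch).await {
//           return Ok(hive);
//       }
//   }
//   let json = inspect(&prefetch).await?;
//   if let Some(cache) = &cache {
//       cache.store_hive(&prefetch, &json).await;
//       let _ = cache.gc().await;
//   }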