A fork of attic a self-hostable Nix Binary Cache server
at main 197 lines 5.7 kB view raw
1use std::io::IsTerminal; 2use std::path::PathBuf; 3use std::sync::Arc; 4 5use anyhow::{anyhow, Result}; 6use clap::Parser; 7use indicatif::MultiProgress; 8use tokio::io::{self, AsyncBufReadExt, BufReader}; 9 10use crate::api::ApiClient; 11use crate::cache::{CacheName, CacheRef, ServerName}; 12use crate::cli::Opts; 13use crate::config::Config; 14use crate::push::{PushConfig, PushSessionConfig, Pusher}; 15use attic::nix_store::NixStore; 16 17/// Push closures to a binary cache. 18#[derive(Debug, Parser)] 19pub struct Push { 20 /// The cache to push to. 21 /// 22 /// This can be either `servername:cachename` or `cachename` 23 /// when using the default server. 24 cache: CacheRef, 25 26 /// The store paths to push. 27 paths: Vec<PathBuf>, 28 29 /// Read paths from the standard input. 30 #[clap(long)] 31 stdin: bool, 32 33 /// Push the specified paths only and do not compute closures. 34 #[clap(long)] 35 no_closure: bool, 36 37 /// Ignore the upstream cache filter. 38 #[clap(long)] 39 ignore_upstream_cache_filter: bool, 40 41 /// The maximum number of parallel upload processes. 42 #[clap(short = 'j', long, default_value = "5")] 43 jobs: usize, 44 45 /// Always send the upload info as part of the payload. 46 #[clap(long, hide = true)] 47 force_preamble: bool, 48} 49 50struct PushContext { 51 store: Arc<NixStore>, 52 cache_name: CacheName, 53 server_name: ServerName, 54 pusher: Pusher, 55 no_closure: bool, 56 ignore_upstream_cache_filter: bool, 57} 58 59impl PushContext { 60 async fn push_static(self, paths: Vec<PathBuf>) -> Result<()> { 61 if paths.is_empty() { 62 eprintln!("🤷 Nothing specified."); 63 if !std::io::stdin().is_terminal() { 64 eprintln!( 65 "Hint: Pass --stdin to read the list of store paths from standard input." 66 ); 67 } 68 return Ok(()); 69 } 70 71 let roots = paths 72 .into_iter() 73 .map(|p| self.store.follow_store_path(p)) 74 .collect::<std::result::Result<Vec<_>, _>>()?; 75 76 let plan = self 77 .pusher 78 .plan(roots, self.no_closure, self.ignore_upstream_cache_filter) 79 .await?; 80 81 if plan.store_path_map.is_empty() { 82 if plan.num_all_paths == 0 { 83 eprintln!("🤷 Nothing selected."); 84 } else { 85 eprintln!( 86 "✅ All done! ({num_already_cached} already cached, {num_upstream} in upstream)", 87 num_already_cached = plan.num_already_cached, 88 num_upstream = plan.num_upstream, 89 ); 90 } 91 92 return Ok(()); 93 } else { 94 eprintln!("⚙️ Pushing {num_missing_paths} paths to \"{cache}\" on \"{server}\" ({num_already_cached} already cached, {num_upstream} in upstream)...", 95 cache = self.cache_name.as_str(), 96 server = self.server_name.as_str(), 97 num_missing_paths = plan.store_path_map.len(), 98 num_already_cached = plan.num_already_cached, 99 num_upstream = plan.num_upstream, 100 ); 101 } 102 103 for (_, path_info) in plan.store_path_map { 104 self.pusher.queue(path_info).await?; 105 } 106 107 let results = self.pusher.wait().await; 108 results.into_values().collect::<Result<Vec<()>>>()?; 109 110 Ok(()) 111 } 112 113 async fn push_stdin(self) -> Result<()> { 114 let session = self.pusher.into_push_session(PushSessionConfig { 115 no_closure: self.no_closure, 116 ignore_upstream_cache_filter: self.ignore_upstream_cache_filter, 117 }); 118 119 let stdin = BufReader::new(io::stdin()); 120 let mut lines = stdin.lines(); 121 while let Some(line) = lines.next_line().await? { 122 if line.is_empty() { 123 continue; 124 } 125 126 let path = self.store.follow_store_path(line)?; 127 session.queue_many(vec![path])?; 128 } 129 130 let results = session.wait().await?; 131 results.into_values().collect::<Result<Vec<()>>>()?; 132 133 Ok(()) 134 } 135} 136 137pub async fn run(opts: Opts) -> Result<()> { 138 let sub = opts.command.as_push().unwrap(); 139 if sub.jobs == 0 { 140 return Err(anyhow!("The number of jobs cannot be 0")); 141 } 142 143 let config = Config::load()?; 144 145 let store = Arc::new(NixStore::connect()?); 146 147 let (server_name, server, cache_name) = config.resolve_cache(&sub.cache)?; 148 149 let mut api = ApiClient::from_server_config(server.clone())?; 150 151 // Confirm remote cache validity, query cache config 152 let cache_config = api.get_cache_config(cache_name).await?; 153 154 if let Some(api_endpoint) = &cache_config.api_endpoint { 155 // Use delegated API endpoint 156 api.set_endpoint(api_endpoint)?; 157 } 158 159 let push_config = PushConfig { 160 num_workers: sub.jobs, 161 force_preamble: sub.force_preamble, 162 }; 163 164 let mp = MultiProgress::new(); 165 166 let pusher = Pusher::new( 167 store.clone(), 168 api, 169 cache_name.to_owned(), 170 cache_config, 171 mp, 172 push_config, 173 ); 174 175 let push_ctx = PushContext { 176 store, 177 cache_name: cache_name.clone(), 178 server_name: server_name.clone(), 179 pusher, 180 no_closure: sub.no_closure, 181 ignore_upstream_cache_filter: sub.ignore_upstream_cache_filter, 182 }; 183 184 if sub.stdin { 185 if !sub.paths.is_empty() { 186 return Err(anyhow!( 187 "No paths can be specified on the command line with --stdin" 188 )); 189 } 190 191 push_ctx.push_stdin().await?; 192 } else { 193 push_ctx.push_static(sub.paths.clone()).await?; 194 } 195 196 Ok(()) 197}