// A fork of attic, a self-hostable Nix binary cache server.
1use std::io::IsTerminal;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{anyhow, Result};
6use clap::Parser;
7use indicatif::MultiProgress;
8use tokio::io::{self, AsyncBufReadExt, BufReader};
9
10use crate::api::ApiClient;
11use crate::cache::{CacheName, CacheRef, ServerName};
12use crate::cli::Opts;
13use crate::config::Config;
14use crate::push::{PushConfig, PushSessionConfig, Pusher};
15use attic::nix_store::NixStore;
16
/// Push closures to a binary cache.
// NOTE: the `///` doc comments below double as the generated `--help` text
// (clap derive), so they are user-facing strings — do not reword casually.
#[derive(Debug, Parser)]
pub struct Push {
    /// The cache to push to.
    ///
    /// This can be either `servername:cachename` or `cachename`
    /// when using the default server.
    cache: CacheRef,

    /// The store paths to push.
    paths: Vec<PathBuf>,

    /// Read paths from the standard input.
    #[clap(long)]
    stdin: bool,

    /// Push the specified paths only and do not compute closures.
    #[clap(long)]
    no_closure: bool,

    /// Ignore the upstream cache filter.
    #[clap(long)]
    ignore_upstream_cache_filter: bool,

    /// The maximum number of parallel upload processes.
    #[clap(short = 'j', long, default_value = "5")]
    jobs: usize,

    /// Always send the upload info as part of the payload.
    // Hidden from `--help`; internal/debugging knob.
    #[clap(long, hide = true)]
    force_preamble: bool,
}
49
/// Everything needed to push a set of store paths to one cache.
struct PushContext {
    /// Handle to the local Nix store, used to resolve store paths.
    store: Arc<NixStore>,
    /// Target cache name (shown in progress messages).
    cache_name: CacheName,
    /// Server hosting the cache (shown in progress messages).
    server_name: ServerName,
    /// Upload worker pool that performs the actual pushes.
    pusher: Pusher,
    /// If true, push only the specified paths without computing closures.
    no_closure: bool,
    /// If true, bypass the upstream cache filter when planning.
    ignore_upstream_cache_filter: bool,
}
58
59impl PushContext {
60 async fn push_static(self, paths: Vec<PathBuf>) -> Result<()> {
61 if paths.is_empty() {
62 eprintln!("🤷 Nothing specified.");
63 if !std::io::stdin().is_terminal() {
64 eprintln!(
65 "Hint: Pass --stdin to read the list of store paths from standard input."
66 );
67 }
68 return Ok(());
69 }
70
71 let roots = paths
72 .into_iter()
73 .map(|p| self.store.follow_store_path(p))
74 .collect::<std::result::Result<Vec<_>, _>>()?;
75
76 let plan = self
77 .pusher
78 .plan(roots, self.no_closure, self.ignore_upstream_cache_filter)
79 .await?;
80
81 if plan.store_path_map.is_empty() {
82 if plan.num_all_paths == 0 {
83 eprintln!("🤷 Nothing selected.");
84 } else {
85 eprintln!(
86 "✅ All done! ({num_already_cached} already cached, {num_upstream} in upstream)",
87 num_already_cached = plan.num_already_cached,
88 num_upstream = plan.num_upstream,
89 );
90 }
91
92 return Ok(());
93 } else {
94 eprintln!("⚙️ Pushing {num_missing_paths} paths to \"{cache}\" on \"{server}\" ({num_already_cached} already cached, {num_upstream} in upstream)...",
95 cache = self.cache_name.as_str(),
96 server = self.server_name.as_str(),
97 num_missing_paths = plan.store_path_map.len(),
98 num_already_cached = plan.num_already_cached,
99 num_upstream = plan.num_upstream,
100 );
101 }
102
103 for (_, path_info) in plan.store_path_map {
104 self.pusher.queue(path_info).await?;
105 }
106
107 let results = self.pusher.wait().await;
108 results.into_values().collect::<Result<Vec<()>>>()?;
109
110 Ok(())
111 }
112
113 async fn push_stdin(self) -> Result<()> {
114 let session = self.pusher.into_push_session(PushSessionConfig {
115 no_closure: self.no_closure,
116 ignore_upstream_cache_filter: self.ignore_upstream_cache_filter,
117 });
118
119 let stdin = BufReader::new(io::stdin());
120 let mut lines = stdin.lines();
121 while let Some(line) = lines.next_line().await? {
122 if line.is_empty() {
123 continue;
124 }
125
126 let path = self.store.follow_store_path(line)?;
127 session.queue_many(vec![path])?;
128 }
129
130 let results = session.wait().await?;
131 results.into_values().collect::<Result<Vec<()>>>()?;
132
133 Ok(())
134 }
135}
136
137pub async fn run(opts: Opts) -> Result<()> {
138 let sub = opts.command.as_push().unwrap();
139 if sub.jobs == 0 {
140 return Err(anyhow!("The number of jobs cannot be 0"));
141 }
142
143 let config = Config::load()?;
144
145 let store = Arc::new(NixStore::connect()?);
146
147 let (server_name, server, cache_name) = config.resolve_cache(&sub.cache)?;
148
149 let mut api = ApiClient::from_server_config(server.clone())?;
150
151 // Confirm remote cache validity, query cache config
152 let cache_config = api.get_cache_config(cache_name).await?;
153
154 if let Some(api_endpoint) = &cache_config.api_endpoint {
155 // Use delegated API endpoint
156 api.set_endpoint(api_endpoint)?;
157 }
158
159 let push_config = PushConfig {
160 num_workers: sub.jobs,
161 force_preamble: sub.force_preamble,
162 };
163
164 let mp = MultiProgress::new();
165
166 let pusher = Pusher::new(
167 store.clone(),
168 api,
169 cache_name.to_owned(),
170 cache_config,
171 mp,
172 push_config,
173 );
174
175 let push_ctx = PushContext {
176 store,
177 cache_name: cache_name.clone(),
178 server_name: server_name.clone(),
179 pusher,
180 no_closure: sub.no_closure,
181 ignore_upstream_cache_filter: sub.ignore_upstream_cache_filter,
182 };
183
184 if sub.stdin {
185 if !sub.paths.is_empty() {
186 return Err(anyhow!(
187 "No paths can be specified on the command line with --stdin"
188 ));
189 }
190
191 push_ctx.push_stdin().await?;
192 } else {
193 push_ctx.push_static(sub.paths.clone()).await?;
194 }
195
196 Ok(())
197}