nixpkgs mirror (for testing)
github.com/NixOS/nixpkgs
nix
1from graphlib import TopologicalSorter
2from pathlib import Path
3from typing import Any, Final, Generator, Literal
4import argparse
5import asyncio
6import contextlib
7import json
8import os
9import re
10import shlex
11import subprocess
12import sys
13import tempfile
14
15
# Package-ordering strategies selectable via --order on the CLI.
Order = Literal["arbitrary", "reverse-topological", "topological"]


# Placeholder graph node used by reverse_edges() so that packages with no
# dependency relation to the rest of the updated set still end up in the
# reversed graph. The "::" prefix cannot clash with a real attribute path.
FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES: Final[str] = (
    "::fake_dependency_for_independent_packages"
)
22
23
class CalledProcessError(Exception):
    """Raised by check_subprocess_output() when a command exits nonzero.

    Mirrors subprocess.CalledProcessError, but carries the asyncio process
    object (for its returncode) and the captured stderr bytes.
    """

    # The failed process; exit status is available as .returncode.
    process: asyncio.subprocess.Process
    # Captured stderr output, or None when stderr was not piped.
    stderr: bytes | None
27
28
class UpdateFailedException(Exception):
    """Raised when a package's update script fails and --keep-going is not set."""

    pass
31
32
def eprint(*args: Any, **kwargs: Any) -> None:
    """Forward all arguments to print(), writing to stderr instead of stdout."""
    stream = sys.stderr
    print(*args, file=stream, **kwargs)
35
36
async def check_subprocess_output(*args: str, **kwargs: Any) -> bytes:
    """
    Run a command and return its stdout, emulating the `check` and
    `capture_output` behaviour of subprocess.run for asyncio subprocesses.

    Raises CalledProcessError (the local class) on a nonzero exit status.
    """
    proc = await asyncio.create_subprocess_exec(*args, **kwargs)
    # communicate() drains the pipes while waiting; plain wait() could
    # deadlock once the OS pipe buffers fill up.
    stdout_data, stderr_data = await proc.communicate()

    if proc.returncode == 0:
        return stdout_data

    failure = CalledProcessError()
    failure.process = proc
    failure.stderr = stderr_data
    raise failure
54
55
async def nix_instantiate(attr_path: str) -> Path:
    """Instantiate an attribute path and return the path of its `.drv` file."""
    raw = await check_subprocess_output(
        "nix-instantiate",
        "-A",
        attr_path,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # Output may look like "/nix/store/….drv!out"; keep only the drv part.
    drv_path, _sep, _outputs = raw.decode("utf-8").strip().partition("!")

    return Path(drv_path)
67
68
async def nix_query_requisites(drv: Path) -> list[Path]:
    """Return the requisite derivations of `drv`, excluding `drv` itself."""
    output = await check_subprocess_output(
        "nix-store",
        "--query",
        "--requisites",
        str(drv),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    self_path = str(drv)

    result: list[Path] = []
    for line in output.decode("utf-8").splitlines():
        # Avoid self-loops.
        if line != self_path:
            result.append(Path(line))
    return result
87
88
async def attr_instantiation_worker(
    semaphore: asyncio.Semaphore,
    attr_path: str,
) -> tuple[Path, str]:
    """
    Instantiate a single attribute path, concurrency-limited by `semaphore`.

    Returns a (drv path, attr path) pair. On failure the whole script is
    terminated with exit code 1 after printing the error, since exceptions
    escaping the gather() would otherwise surface confusingly.
    """
    async with semaphore:
        eprint(f"Instantiating {attr_path}…")
        try:
            return (await nix_instantiate(attr_path), attr_path)
        except Exception as e:
            # Failure should normally terminate the script but
            # looks like Python is buggy so we need to do it ourselves.
            eprint(f"Failed to instantiate {attr_path}")
            # Only CalledProcessError carries a stderr attribute; the original
            # accessed e.stderr unconditionally, raising AttributeError for
            # any other exception type. getattr keeps the broad catch safe.
            stderr = getattr(e, "stderr", None)
            if stderr:
                eprint(stderr.decode("utf-8"))
            sys.exit(1)
104
105
async def requisites_worker(
    semaphore: asyncio.Semaphore,
    drv: Path,
) -> tuple[Path, list[Path]]:
    """Fetch the requisites of `drv` while holding a semaphore slot."""
    async with semaphore:
        eprint(f"Obtaining requisites for {drv}…")
        requisites = await nix_query_requisites(drv)
        return (drv, requisites)
113
114
def requisites_to_attrs(
    drv_attr_paths: dict[Path, str],
    requisites: list[Path],
) -> set[str]:
    """
    Convert requisite `.drv` paths to the attribute paths they belong to.

    Requisites that are not part of the set of packages being updated
    (i.e. missing from `drv_attr_paths`) are silently dropped.
    """
    attrs: set[str] = set()
    for requisite in requisites:
        attr_path = drv_attr_paths.get(requisite)
        if attr_path is not None:
            attrs.add(attr_path)
    return attrs
128
129
def reverse_edges(graph: dict[str, set[str]]) -> dict[str, set[str]]:
    """
    Flip the edges of a directed graph.

    Nodes with no outgoing edges are attached to the placeholder node
    `FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES` so they still appear in
    the reversed graph.
    """
    flipped: dict[str, set[str]] = {}
    for node, targets in graph.items():
        if not targets:
            targets = {FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES}
        for target in targets:
            flipped.setdefault(target, set()).add(node)

    return flipped
145
146
147def get_independent_sorter(
148 packages: list[dict],
149) -> TopologicalSorter[str]:
150 """
151 Returns a sorter which treats all packages as independent,
152 which will allow them to be updated in parallel.
153 """
154
155 attr_deps: dict[str, set[str]] = {
156 package["attrPath"]: set() for package in packages
157 }
158 sorter = TopologicalSorter(attr_deps)
159 sorter.prepare()
160
161 return sorter
162
163
async def get_topological_sorter(
    max_workers: int,
    packages: list[dict],
    reverse_order: bool,
) -> tuple[TopologicalSorter[str], list[dict]]:
    """
    Build a sorter that yields packages in topological (or, with
    `reverse_order`, reverse topological) order, ensuring a package is
    updated after (respectively before) its dependencies.

    Also returns `packages` re-sorted to match that order. Instantiation
    and requisite queries run concurrently, limited to `max_workers` jobs.
    """

    semaphore = asyncio.Semaphore(max_workers)

    # Map each package's .drv store path to its attribute path.
    drv_attr_paths = dict(
        await asyncio.gather(
            *(
                attr_instantiation_worker(semaphore, package["attrPath"])
                for package in packages
            )
        )
    )

    drv_requisites = await asyncio.gather(
        *(requisites_worker(semaphore, drv) for drv in drv_attr_paths.keys())
    )

    # attr path -> attr paths it depends on, restricted to the updated set.
    attr_deps = {
        drv_attr_paths[drv]: requisites_to_attrs(drv_attr_paths, requisites)
        for drv, requisites in drv_requisites
    }

    if reverse_order:
        attr_deps = reverse_edges(attr_deps)

    # Adjust packages order based on the topological one. Precompute each
    # attribute path's position so the sort key is O(1) instead of the
    # original list.index, which made the sort O(n^2).
    position = {
        attr_path: index
        for index, attr_path in enumerate(TopologicalSorter(attr_deps).static_order())
    }
    packages = sorted(packages, key=lambda package: position[package["attrPath"]])

    sorter = TopologicalSorter(attr_deps)
    sorter.prepare()

    return sorter, packages
205
206
async def run_update_script(
    nixpkgs_root: str,
    merge_lock: asyncio.Lock,
    temp_dir: tuple[str, str] | None,
    package: dict,
    keep_going: bool,
) -> None:
    """
    Run one package's updateScript and hand the result to merge_changes().

    When `temp_dir` is a (worktree, branch) pair, the script runs inside
    that git worktree (reset to HEAD first). On script failure the stderr
    log is echoed and written to `<pname>.log`; unless `keep_going` is set,
    UpdateFailedException is raised to stop the whole run.
    """
    worktree: str | None = None

    update_script_command = package["updateScript"]

    if temp_dir is not None:
        worktree, _branch = temp_dir

        # Ensure the worktree is clean before update.
        await check_subprocess_output(
            "git",
            "reset",
            "--hard",
            "--quiet",
            "HEAD",
            cwd=worktree,
        )

        # Update scripts can use $(dirname $0) to get their location but we want to run
        # their clones in the git worktree, not in the main nixpkgs repo.
        update_script_command = map(
            lambda arg: re.sub(r"^{0}".format(re.escape(nixpkgs_root)), worktree, arg),
            update_script_command,
        )

    eprint(f" - {package['name']}: UPDATING ...")

    try:
        # Export the package's metadata in the environment so update scripts
        # can pick it up without re-evaluating Nixpkgs.
        update_info = await check_subprocess_output(
            "env",
            f"UPDATE_NIX_NAME={package['name']}",
            f"UPDATE_NIX_PNAME={package['pname']}",
            f"UPDATE_NIX_OLD_VERSION={package['oldVersion']}",
            f"UPDATE_NIX_ATTR_PATH={package['attrPath']}",
            # Run all update scripts in the Nixpkgs development shell to get access to formatters and co.
            "nix-shell",
            nixpkgs_root + "/shell.nix",
            "--run",
            " ".join([ shlex.quote(s) for s in update_script_command ]),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=worktree,
        )
        await merge_changes(merge_lock, package, update_info, temp_dir)
    except KeyboardInterrupt as e:
        eprint("Cancelling…")
        # Convert to CancelledError so asyncio tears the task group down cleanly.
        raise asyncio.exceptions.CancelledError()
    except CalledProcessError as e:
        eprint(f" - {package['name']}: ERROR")
        if e.stderr is not None:
            eprint()
            eprint(
                f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------"
            )
            eprint()
            eprint(e.stderr.decode("utf-8"))
            # Persist the log next to the CWD for later inspection.
            with open(f"{package['pname']}.log", "wb") as logfile:
                logfile.write(e.stderr)
            eprint()
            eprint(
                f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------"
            )

        if not keep_going:
            raise UpdateFailedException(
                f"The update script for {package['name']} failed with exit code {e.process.returncode}"
            )
280
281
@contextlib.contextmanager
def make_worktree() -> Generator[tuple[str, str], None, None]:
    """
    Context manager yielding a fresh (worktree directory, branch name) pair.

    A throwaway branch and git worktree are created inside a temporary
    directory and both are removed again on exit.
    """
    with tempfile.TemporaryDirectory() as tmp:
        branch = f"update-{os.path.basename(tmp)}"
        checkout_dir = f"{tmp}/nixpkgs"

        subprocess.run(["git", "worktree", "add", "-b", branch, checkout_dir])
        try:
            yield (checkout_dir, branch)
        finally:
            subprocess.run(["git", "worktree", "remove", "--force", checkout_dir])
            subprocess.run(["git", "branch", "-D", branch])
294
295
async def commit_changes(
    name: str,
    merge_lock: asyncio.Lock,
    worktree: str,
    branch: str,
    changes: list[dict],
) -> None:
    """
    Commit each change in the worktree's branch, then cherry-pick the
    branch into the main checkout.

    The commit message defaults to "attrPath: oldVersion -> newVersion",
    overridden by `commitMessage` or extended by `commitBody`.
    """
    for change in changes:
        # Git can only handle a single index operation at a time, and the
        # cherry-pick below touches the *main* checkout's index, so the whole
        # add/commit/cherry-pick sequence must run under the lock. (The
        # original ran cherry-pick outside the lock, racing other workers.)
        async with merge_lock:
            await check_subprocess_output("git", "add", *change["files"], cwd=worktree)
            commit_message = "{attrPath}: {oldVersion} -> {newVersion}".format(**change)
            if "commitMessage" in change:
                commit_message = change["commitMessage"]
            elif "commitBody" in change:
                commit_message = commit_message + "\n\n" + change["commitBody"]
            await check_subprocess_output(
                "git",
                "commit",
                "--quiet",
                "-m",
                commit_message,
                cwd=worktree,
            )
            # Runs in the main repository (no cwd) on purpose: it copies the
            # worktree branch's commit onto the primary checkout.
            await check_subprocess_output("git", "cherry-pick", branch)
321
322
async def check_changes(
    package: dict,
    worktree: str,
    update_info: bytes,
) -> list[dict]:
    """
    Normalize the update script's output into a list of change dicts.

    Packages supporting the "commit" feature emit JSON change objects on
    stdout (`update_info`); otherwise we start from one empty change. For a
    single change, missing fields (attrPath, oldVersion, newVersion, files)
    are filled in by querying the package metadata, evaluating the worktree,
    or diffing it. Returns [] when the worktree shows no modified files.
    """
    if "commit" in package["supportedFeatures"]:
        changes = json.loads(update_info)
    else:
        changes = [{}]

    # Try to fill in missing attributes when there is just a single change.
    if len(changes) == 1:
        # Dynamic data from updater take precedence over static data from passthru.updateScript.
        if "attrPath" not in changes[0]:
            # update.nix is always passing attrPath
            changes[0]["attrPath"] = package["attrPath"]

        if "oldVersion" not in changes[0]:
            # update.nix is always passing oldVersion
            changes[0]["oldVersion"] = package["oldVersion"]

        if "newVersion" not in changes[0]:
            # Evaluate the (already updated) worktree to discover the new version.
            attr_path = changes[0]["attrPath"]
            obtain_new_version_output = await check_subprocess_output(
                "nix-instantiate",
                "--expr",
                f"with import ./. {{}}; lib.getVersion {attr_path}",
                "--eval",
                "--strict",
                "--json",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=worktree,
            )
            changes[0]["newVersion"] = json.loads(
                obtain_new_version_output.decode("utf-8")
            )

        if "files" not in changes[0]:
            changed_files_output = await check_subprocess_output(
                "git",
                "diff",
                "--name-only",
                "HEAD",
                stdout=asyncio.subprocess.PIPE,
                cwd=worktree,
            )
            # NOTE(review): splitlines() on bytes yields bytes paths (no
            # decode); these are later passed to `git add` as-is — exec
            # accepts bytes argv entries on POSIX, but confirm intentional.
            changed_files = changed_files_output.splitlines()
            changes[0]["files"] = changed_files

            if len(changed_files) == 0:
                return []

    return changes
377
378
async def merge_changes(
    merge_lock: asyncio.Lock,
    package: dict,
    update_info: bytes,
    temp_dir: tuple[str, str] | None,
) -> None:
    """
    Inspect the update script's output and commit any resulting changes.

    Without a worktree (`temp_dir` is None) nothing is committed and the
    package is simply reported as done.
    """
    if temp_dir is None:
        eprint(f" - {package['name']}: DONE.")
        return

    worktree, branch = temp_dir
    changes = await check_changes(package, worktree, update_info)

    if changes:
        await commit_changes(package["name"], merge_lock, worktree, branch, changes)
    else:
        eprint(f" - {package['name']}: DONE, no changes.")
395
396
async def updater(
    nixpkgs_root: str,
    temp_dir: tuple[str, str] | None,
    merge_lock: asyncio.Lock,
    packages_to_update: asyncio.Queue[dict | None],
    keep_going: bool,
    commit: bool,
) -> None:
    """
    Worker loop: take packages off the queue and run their update scripts.

    Terminates when it receives a None sentinel from the queue.
    """
    while True:
        package = await packages_to_update.get()
        if package is None:
            # A sentinel received, we are done.
            return

        # Decide per package whether the worktree may be used. The original
        # rebound `temp_dir` itself, so one package lacking commit support
        # permanently disabled committing for every later package handled
        # by this worker.
        if "commit" in package["supportedFeatures"] or "attrPath" in package:
            package_temp_dir = temp_dir
        else:
            package_temp_dir = None

        await run_update_script(
            nixpkgs_root, merge_lock, package_temp_dir, package, keep_going
        )

        packages_to_update.task_done()
417
418
419async def populate_queue(
420 attr_packages: dict[str, dict],
421 sorter: TopologicalSorter[str],
422 packages_to_update: asyncio.Queue[dict | None],
423 num_workers: int,
424) -> None:
425 """
426 Keeps populating the queue with packages that can be updated
427 according to ordering requirements. If topological order
428 is used, the packages will appear in waves, as packages with
429 no dependencies are processed and removed from the sorter.
430 With `order="none"`, all packages will be enqueued simultaneously.
431 """
432
433 # Fill up an update queue,
434 while sorter.is_active():
435 ready_packages = list(sorter.get_ready())
436 eprint(f"Enqueuing group of {len(ready_packages)} packages")
437 for package in ready_packages:
438 if package == FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES:
439 continue
440 await packages_to_update.put(attr_packages[package])
441 await packages_to_update.join()
442 sorter.done(*ready_packages)
443
444 # Add sentinels, one for each worker.
445 # A worker will terminate when it gets a sentinel from the queue.
446 for i in range(num_workers):
447 await packages_to_update.put(None)
448
449
async def start_updates(
    max_workers: int,
    keep_going: bool,
    commit: bool,
    attr_packages: dict[str, dict],
    sorter: TopologicalSorter[str],
) -> None:
    """
    Spawn the updater workers plus the queue-populating task and await them.

    When `commit` is set, one git worktree (and scratch branch) is created
    per worker so changes can be committed without touching the main
    checkout. Exits with status 1 when a worker raises
    UpdateFailedException (i.e. keep_going was false).
    """
    merge_lock = asyncio.Lock()
    packages_to_update: asyncio.Queue[dict | None] = asyncio.Queue()

    with contextlib.ExitStack() as stack:
        temp_dirs: list[tuple[str, str] | None] = []

        # Do not create more workers than there are packages.
        num_workers = min(max_workers, len(attr_packages))

        # Resolve the repository root; worktree paths are rewritten against it.
        nixpkgs_root_output = await check_subprocess_output(
            "git",
            "rev-parse",
            "--show-toplevel",
            stdout=asyncio.subprocess.PIPE,
        )
        nixpkgs_root = nixpkgs_root_output.decode("utf-8").strip()

        # Set up temporary directories when using auto-commit.
        for i in range(num_workers):
            temp_dir = stack.enter_context(make_worktree()) if commit else None
            temp_dirs.append(temp_dir)

        queue_task = populate_queue(
            attr_packages,
            sorter,
            packages_to_update,
            num_workers,
        )

        # Prepare updater workers for each temp_dir directory.
        # At most `num_workers` instances of `run_update_script` will be running at one time.
        updater_tasks = [
            updater(
                nixpkgs_root,
                temp_dir,
                merge_lock,
                packages_to_update,
                keep_going,
                commit,
            )
            for temp_dir in temp_dirs
        ]

        tasks = asyncio.gather(
            *updater_tasks,
            queue_task,
        )

        try:
            # Start updater workers.
            await tasks
        except asyncio.exceptions.CancelledError:
            # When one worker is cancelled, cancel the others too.
            tasks.cancel()
        except UpdateFailedException as e:
            # When one worker fails, cancel the others, as this exception is only thrown when keep_going is false.
            tasks.cancel()
            eprint(e)
            sys.exit(1)
516
517
async def main(
    max_workers: int,
    keep_going: bool,
    commit: bool,
    packages_path: str,
    skip_prompt: bool,
    order: Order,
) -> None:
    """
    Entry point: load the package list from `packages_path` (JSON), order
    it as requested, confirm with the user (unless `skip_prompt`), then run
    the updates. Exits 0 on success and 130 when the prompt is declined.
    """
    with open(packages_path) as f:
        packages = json.load(f)

    if order != "arbitrary":
        eprint("Sorting packages…")
        reverse_order = order == "reverse-topological"
        sorter, packages = await get_topological_sorter(
            max_workers,
            packages,
            reverse_order,
        )
    else:
        sorter = get_independent_sorter(packages)

    attr_packages = {package["attrPath"]: package for package in packages}

    eprint()
    eprint("Going to be running update for following packages:")
    for package in packages:
        eprint(f" - {package['name']}")
    eprint()

    # Any input other than a bare Enter aborts the run.
    confirm = "" if skip_prompt else input("Press Enter key to continue...")

    if confirm == "":
        eprint()
        eprint("Running update for:")

        await start_updates(max_workers, keep_going, commit, attr_packages, sorter)

        eprint()
        eprint("Packages updated!")
        sys.exit()
    else:
        # 130 matches the conventional exit code for an interrupted run.
        eprint("Aborting!")
        sys.exit(130)
562
563
# Command-line interface; parsed only when run as a script (see below).
parser = argparse.ArgumentParser(description="Update packages")
parser.add_argument(
    "--max-workers",
    "-j",
    dest="max_workers",
    type=int,
    help="Number of updates to run concurrently",
    nargs="?",
    default=4,
)
parser.add_argument(
    "--keep-going",
    "-k",
    dest="keep_going",
    action="store_true",
    help="Do not stop after first failure",
)
parser.add_argument(
    "--commit",
    "-c",
    dest="commit",
    action="store_true",
    help="Commit the changes",
)
parser.add_argument(
    "--order",
    dest="order",
    default="arbitrary",
    choices=["arbitrary", "reverse-topological", "topological"],
    help="Sort the packages based on dependency relation",
)
parser.add_argument(
    "packages",
    help="JSON file containing the list of package names and their update scripts",
)
parser.add_argument(
    "--skip-prompt",
    "-s",
    dest="skip_prompt",
    action="store_true",
    help="Do not stop for prompts",
)
606
if __name__ == "__main__":
    args = parser.parse_args()

    try:
        asyncio.run(
            main(
                args.max_workers,
                args.keep_going,
                args.commit,
                args.packages,
                args.skip_prompt,
                args.order,
            )
        )
    except KeyboardInterrupt as e:
        # Let’s cancel outside of the main loop too.
        # 130 = 128 + SIGINT, the conventional Ctrl+C exit status.
        sys.exit(130)