nixpkgs mirror (for testing) github.com/NixOS/nixpkgs
nix
at python-updates 623 lines 18 kB view raw
from graphlib import TopologicalSorter
from pathlib import Path
from typing import Any, Final, Generator, Literal
import argparse
import asyncio
import contextlib
import json
import os
import re
import shlex
import subprocess
import sys
import tempfile


Order = Literal["arbitrary", "reverse-topological", "topological"]


# Placeholder graph node for packages without any dependency relation inside
# the updated set; "::" cannot occur in a real attribute path, so no collision.
FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES: Final[str] = (
    "::fake_dependency_for_independent_packages"
)


class CalledProcessError(Exception):
    """Raised when an asyncio subprocess exits with a non-zero status.

    Mirrors subprocess.CalledProcessError; `process` and `stderr` are
    assigned by check_subprocess_output() before raising.
    """

    # The finished process object (carries returncode).
    process: asyncio.subprocess.Process
    # Captured stderr, or None when stderr was not piped.
    stderr: bytes | None


class UpdateFailedException(Exception):
    """Raised when an update script fails and --keep-going was not passed."""


def eprint(*args: Any, **kwargs: Any) -> None:
    """print() to stderr, keeping stdout clean."""
    print(*args, file=sys.stderr, **kwargs)


async def check_subprocess_output(*args: str, **kwargs: Any) -> bytes:
    """
    Emulate check and capture_output arguments of subprocess.run function.

    Returns the captured stdout; raises CalledProcessError on a non-zero
    exit status.
    """
    process = await asyncio.create_subprocess_exec(*args, **kwargs)
    # We need to use communicate() instead of wait(), as the OS pipe buffers
    # can fill up and cause a deadlock.
    stdout, stderr = await process.communicate()

    if process.returncode != 0:
        error = CalledProcessError()
        error.process = process
        error.stderr = stderr

        raise error

    return stdout


async def nix_instantiate(attr_path: str) -> Path:
    """Instantiate an attribute path and return its `.drv` store path."""
    out = await check_subprocess_output(
        "nix-instantiate",
        "-A",
        attr_path,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # Output may look like `/nix/store/….drv!out`; keep only the path part.
    drv = out.decode("utf-8").strip().split("!", 1)[0]

    return Path(drv)


async def nix_query_requisites(drv: Path) -> list[Path]:
    """Return the `.drv` paths the given derivation transitively requires."""
    requisites = await check_subprocess_output(
        "nix-store",
        "--query",
        "--requisites",
        str(drv),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    drv_str = str(drv)

    return [
        Path(requisite)
        for requisite in requisites.decode("utf-8").splitlines()
        # Avoid self-loops.
        if requisite != drv_str
    ]


async def attr_instantiation_worker(
    semaphore: asyncio.Semaphore,
    attr_path: str,
) -> tuple[Path, str]:
    """Instantiate a single attribute path, bounded by `semaphore`."""
    async with semaphore:
        eprint(f"Instantiating {attr_path}")
        try:
            return (await nix_instantiate(attr_path), attr_path)
        except Exception as e:
            # Failure should normally terminate the script but
            # looks like Python is buggy so we need to do it ourselves.
            eprint(f"Failed to instantiate {attr_path}")
            # Only CalledProcessError carries a stderr attribute; getattr
            # keeps other exception types from raising AttributeError here
            # and masking the original failure.
            stderr = getattr(e, "stderr", None)
            if stderr:
                eprint(stderr.decode("utf-8"))
            sys.exit(1)


async def requisites_worker(
    semaphore: asyncio.Semaphore,
    drv: Path,
) -> tuple[Path, list[Path]]:
    """Query requisites for a single derivation, bounded by `semaphore`."""
    async with semaphore:
        eprint(f"Obtaining requisites for {drv}")
        return (drv, await nix_query_requisites(drv))


def requisites_to_attrs(
    drv_attr_paths: dict[Path, str],
    requisites: list[Path],
) -> set[str]:
    """
    Converts a set of requisite `.drv`s to a set of attribute paths.
    Derivations that do not correspond to any of the packages we want to update will be discarded.
    """
    return {
        drv_attr_paths[requisite]
        for requisite in requisites
        if requisite in drv_attr_paths
    }


def reverse_edges(graph: dict[str, set[str]]) -> dict[str, set[str]]:
    """
    Flips the edges of a directed graph.

    Packages without any dependency relation in the updated set
    will be added to `FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES` node.
    """

    reversed_graph: dict[str, set[str]] = {}
    for dependent, dependencies in graph.items():
        dependencies = dependencies or {FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES}
        for dependency in dependencies:
            reversed_graph.setdefault(dependency, set()).add(dependent)

    return reversed_graph


def get_independent_sorter(
    packages: list[dict],
) -> TopologicalSorter[str]:
    """
    Returns a sorter which treats all packages as independent,
    which will allow them to be updated in parallel.
    """

    attr_deps: dict[str, set[str]] = {
        package["attrPath"]: set() for package in packages
    }
    sorter = TopologicalSorter(attr_deps)
    sorter.prepare()

    return sorter


async def get_topological_sorter(
    max_workers: int,
    packages: list[dict],
    reverse_order: bool,
) -> tuple[TopologicalSorter[str], list[dict]]:
    """
    Returns a sorter which returns packages in topological or reverse topological order,
    which will ensure a package is updated before or after its dependencies, respectively.

    Also returns `packages` re-sorted to match the chosen order.
    """

    semaphore = asyncio.Semaphore(max_workers)

    # attrPath → .drv path, instantiated concurrently (bounded by semaphore).
    drv_attr_paths = dict(
        await asyncio.gather(
            *(
                attr_instantiation_worker(semaphore, package["attrPath"])
                for package in packages
            )
        )
    )

    drv_requisites = await asyncio.gather(
        *(requisites_worker(semaphore, drv) for drv in drv_attr_paths.keys())
    )

    # Dependency graph restricted to the packages being updated.
    attr_deps = {
        drv_attr_paths[drv]: requisites_to_attrs(drv_attr_paths, requisites)
        for drv, requisites in drv_requisites
    }

    if reverse_order:
        attr_deps = reverse_edges(attr_deps)

    # Adjust packages order based on the topological one
    ordered = list(TopologicalSorter(attr_deps).static_order())
    packages = sorted(packages, key=lambda package: ordered.index(package["attrPath"]))

    sorter = TopologicalSorter(attr_deps)
    sorter.prepare()

    return sorter, packages


async def run_update_script(
    nixpkgs_root: str,
    merge_lock: asyncio.Lock,
    temp_dir: tuple[str, str] | None,
    package: dict,
    keep_going: bool,
) -> None:
    """Run one package's updateScript and merge any resulting changes.

    When `temp_dir` is a (worktree, branch) pair, the script runs inside that
    git worktree; otherwise it runs in the main repository.

    Raises UpdateFailedException when the script fails and `keep_going` is
    false.
    """
    worktree: str | None = None

    update_script_command = package["updateScript"]

    if temp_dir is not None:
        worktree, _branch = temp_dir

        # Ensure the worktree is clean before update.
        await check_subprocess_output(
            "git",
            "reset",
            "--hard",
            "--quiet",
            "HEAD",
            cwd=worktree,
        )

        # Update scripts can use $(dirname $0) to get their location but we want to run
        # their clones in the git worktree, not in the main nixpkgs repo.
        update_script_command = map(
            lambda arg: re.sub(r"^{0}".format(re.escape(nixpkgs_root)), worktree, arg),
            update_script_command,
        )

    eprint(f" - {package['name']}: UPDATING ...")

    try:
        update_info = await check_subprocess_output(
            "env",
            f"UPDATE_NIX_NAME={package['name']}",
            f"UPDATE_NIX_PNAME={package['pname']}",
            f"UPDATE_NIX_OLD_VERSION={package['oldVersion']}",
            f"UPDATE_NIX_ATTR_PATH={package['attrPath']}",
            # Run all update scripts in the Nixpkgs development shell to get access to formatters and co.
            "nix-shell",
            nixpkgs_root + "/shell.nix",
            "--run",
            " ".join([shlex.quote(s) for s in update_script_command]),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=worktree,
        )
        await merge_changes(merge_lock, package, update_info, temp_dir)
    except KeyboardInterrupt:
        eprint("Cancelling…")
        raise asyncio.exceptions.CancelledError()
    except CalledProcessError as e:
        eprint(f" - {package['name']}: ERROR")
        if e.stderr is not None:
            eprint()
            eprint(
                f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------"
            )
            eprint()
            eprint(e.stderr.decode("utf-8"))
            # Keep a copy of the failure log next to the script invocation.
            with open(f"{package['pname']}.log", "wb") as logfile:
                logfile.write(e.stderr)
            eprint()
            eprint(
                f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------"
            )

        if not keep_going:
            raise UpdateFailedException(
                f"The update script for {package['name']} failed with exit code {e.process.returncode}"
            )


@contextlib.contextmanager
def make_worktree() -> Generator[tuple[str, str], None, None]:
    """Yield a (worktree path, branch name) pair backed by a temp directory.

    The worktree and its branch are removed on exit; cleanup is best-effort
    (unchecked) so a failed removal does not shadow the original error.
    """
    with tempfile.TemporaryDirectory() as wt:
        branch_name = f"update-{os.path.basename(wt)}"
        target_directory = f"{wt}/nixpkgs"

        # check=True: a failed worktree setup must not be silently ignored,
        # every later git call in this worktree would fail confusingly.
        subprocess.run(
            ["git", "worktree", "add", "-b", branch_name, target_directory],
            check=True,
        )
        try:
            yield (target_directory, branch_name)
        finally:
            subprocess.run(["git", "worktree", "remove", "--force", target_directory])
            subprocess.run(["git", "branch", "-D", branch_name])


async def commit_changes(
    name: str,
    merge_lock: asyncio.Lock,
    worktree: str,
    branch: str,
    changes: list[dict],
) -> None:
    """Commit each change in the worktree and cherry-pick it into the main repo."""
    for change in changes:
        # Git can only handle a single index operation at a time
        async with merge_lock:
            await check_subprocess_output("git", "add", *change["files"], cwd=worktree)
            commit_message = "{attrPath}: {oldVersion} -> {newVersion}".format(**change)
            if "commitMessage" in change:
                commit_message = change["commitMessage"]
            elif "commitBody" in change:
                commit_message = commit_message + "\n\n" + change["commitBody"]
            await check_subprocess_output(
                "git",
                "commit",
                "--quiet",
                "-m",
                commit_message,
                cwd=worktree,
            )
            # Runs in the main checkout (no cwd): pull the worktree branch's
            # commit into the primary repository.
            await check_subprocess_output("git", "cherry-pick", branch)


async def check_changes(
    package: dict,
    worktree: str,
    update_info: bytes,
) -> list[dict]:
    """Normalize an update script's output into a list of change dicts.

    Fills in attrPath/oldVersion/newVersion/files for a single change when
    the updater did not provide them. Returns [] when nothing changed.
    """
    if "commit" in package["supportedFeatures"]:
        changes = json.loads(update_info)
    else:
        changes = [{}]

    # Try to fill in missing attributes when there is just a single change.
    if len(changes) == 1:
        # Dynamic data from updater take precedence over static data from passthru.updateScript.
        if "attrPath" not in changes[0]:
            # update.nix is always passing attrPath
            changes[0]["attrPath"] = package["attrPath"]

        if "oldVersion" not in changes[0]:
            # update.nix is always passing oldVersion
            changes[0]["oldVersion"] = package["oldVersion"]

        if "newVersion" not in changes[0]:
            attr_path = changes[0]["attrPath"]
            obtain_new_version_output = await check_subprocess_output(
                "nix-instantiate",
                "--expr",
                f"with import ./. {{}}; lib.getVersion {attr_path}",
                "--eval",
                "--strict",
                "--json",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=worktree,
            )
            changes[0]["newVersion"] = json.loads(
                obtain_new_version_output.decode("utf-8")
            )

        if "files" not in changes[0]:
            changed_files_output = await check_subprocess_output(
                "git",
                "diff",
                "--name-only",
                "HEAD",
                stdout=asyncio.subprocess.PIPE,
                cwd=worktree,
            )
            # bytes paths; git accepts them verbatim in `git add` later.
            changed_files = changed_files_output.splitlines()
            changes[0]["files"] = changed_files

            if len(changed_files) == 0:
                return []

    return changes


async def merge_changes(
    merge_lock: asyncio.Lock,
    package: dict,
    update_info: bytes,
    temp_dir: tuple[str, str] | None,
) -> None:
    """Commit the package's changes when auto-commit is active, else just log."""
    if temp_dir is not None:
        worktree, branch = temp_dir
        changes = await check_changes(package, worktree, update_info)

        if len(changes) > 0:
            await commit_changes(package["name"], merge_lock, worktree, branch, changes)
        else:
            eprint(f" - {package['name']}: DONE, no changes.")
    else:
        eprint(f" - {package['name']}: DONE.")


async def updater(
    nixpkgs_root: str,
    temp_dir: tuple[str, str] | None,
    merge_lock: asyncio.Lock,
    packages_to_update: asyncio.Queue[dict | None],
    keep_going: bool,
    commit: bool,
) -> None:
    """Worker loop: consume packages from the queue until a None sentinel."""
    while True:
        package = await packages_to_update.get()
        if package is None:
            # A sentinel received, we are done.
            return

        # Use a per-package local so that a package without commit support
        # does not permanently disable the worktree for every later package
        # handled by this worker.
        package_temp_dir = temp_dir
        if not ("commit" in package["supportedFeatures"] or "attrPath" in package):
            package_temp_dir = None

        await run_update_script(
            nixpkgs_root, merge_lock, package_temp_dir, package, keep_going
        )

        packages_to_update.task_done()


async def populate_queue(
    attr_packages: dict[str, dict],
    sorter: TopologicalSorter[str],
    packages_to_update: asyncio.Queue[dict | None],
    num_workers: int,
) -> None:
    """
    Keeps populating the queue with packages that can be updated
    according to ordering requirements. If topological order
    is used, the packages will appear in waves, as packages with
    no dependencies are processed and removed from the sorter.
    With `order="arbitrary"`, all packages will be enqueued simultaneously.
    """

    # Fill up an update queue,
    while sorter.is_active():
        ready_packages = list(sorter.get_ready())
        eprint(f"Enqueuing group of {len(ready_packages)} packages")
        for package in ready_packages:
            if package == FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES:
                continue
            await packages_to_update.put(attr_packages[package])
        # Wait for the whole wave to finish before unlocking the next one.
        await packages_to_update.join()
        sorter.done(*ready_packages)

    # Add sentinels, one for each worker.
    # A worker will terminate when it gets a sentinel from the queue.
    for i in range(num_workers):
        await packages_to_update.put(None)


async def start_updates(
    max_workers: int,
    keep_going: bool,
    commit: bool,
    attr_packages: dict[str, dict],
    sorter: TopologicalSorter[str],
) -> None:
    """Spawn updater workers plus the queue feeder and wait for completion."""
    merge_lock = asyncio.Lock()
    packages_to_update: asyncio.Queue[dict | None] = asyncio.Queue()

    with contextlib.ExitStack() as stack:
        temp_dirs: list[tuple[str, str] | None] = []

        # Do not create more workers than there are packages.
        num_workers = min(max_workers, len(attr_packages))

        nixpkgs_root_output = await check_subprocess_output(
            "git",
            "rev-parse",
            "--show-toplevel",
            stdout=asyncio.subprocess.PIPE,
        )
        nixpkgs_root = nixpkgs_root_output.decode("utf-8").strip()

        # Set up temporary directories when using auto-commit.
        for i in range(num_workers):
            temp_dir = stack.enter_context(make_worktree()) if commit else None
            temp_dirs.append(temp_dir)

        queue_task = populate_queue(
            attr_packages,
            sorter,
            packages_to_update,
            num_workers,
        )

        # Prepare updater workers for each temp_dir directory.
        # At most `num_workers` instances of `run_update_script` will be running at one time.
        updater_tasks = [
            updater(
                nixpkgs_root,
                temp_dir,
                merge_lock,
                packages_to_update,
                keep_going,
                commit,
            )
            for temp_dir in temp_dirs
        ]

        tasks = asyncio.gather(
            *updater_tasks,
            queue_task,
        )

        try:
            # Start updater workers.
            await tasks
        except asyncio.exceptions.CancelledError:
            # When one worker is cancelled, cancel the others too.
            tasks.cancel()
        except UpdateFailedException as e:
            # When one worker fails, cancel the others, as this exception is only thrown when keep_going is false.
            tasks.cancel()
            eprint(e)
            sys.exit(1)


async def main(
    max_workers: int,
    keep_going: bool,
    commit: bool,
    packages_path: str,
    skip_prompt: bool,
    order: Order,
) -> None:
    """Load the package list, sort it per `order`, confirm, and run updates."""
    with open(packages_path) as f:
        packages = json.load(f)

    if order != "arbitrary":
        eprint("Sorting packages…")
        reverse_order = order == "reverse-topological"
        sorter, packages = await get_topological_sorter(
            max_workers,
            packages,
            reverse_order,
        )
    else:
        sorter = get_independent_sorter(packages)

    attr_packages = {package["attrPath"]: package for package in packages}

    eprint()
    eprint("Going to be running update for following packages:")
    for package in packages:
        eprint(f" - {package['name']}")
    eprint()

    confirm = "" if skip_prompt else input("Press Enter key to continue...")

    if confirm == "":
        eprint()
        eprint("Running update for:")

        await start_updates(max_workers, keep_going, commit, attr_packages, sorter)

        eprint()
        eprint("Packages updated!")
        sys.exit()
    else:
        eprint("Aborting!")
        sys.exit(130)


parser = argparse.ArgumentParser(description="Update packages")
parser.add_argument(
    "--max-workers",
    "-j",
    dest="max_workers",
    type=int,
    help="Number of updates to run concurrently",
    nargs="?",
    default=4,
)
parser.add_argument(
    "--keep-going",
    "-k",
    dest="keep_going",
    action="store_true",
    help="Do not stop after first failure",
)
parser.add_argument(
    "--commit",
    "-c",
    dest="commit",
    action="store_true",
    help="Commit the changes",
)
parser.add_argument(
    "--order",
    dest="order",
    default="arbitrary",
    choices=["arbitrary", "reverse-topological", "topological"],
    help="Sort the packages based on dependency relation",
)
parser.add_argument(
    "packages",
    help="JSON file containing the list of package names and their update scripts",
)
parser.add_argument(
    "--skip-prompt",
    "-s",
    dest="skip_prompt",
    action="store_true",
    help="Do not stop for prompts",
)

if __name__ == "__main__":
    args = parser.parse_args()

    try:
        asyncio.run(
            main(
                args.max_workers,
                args.keep_going,
                args.commit,
                args.packages,
                args.skip_prompt,
                args.order,
            )
        )
    except KeyboardInterrupt as e:
        # Let’s cancel outside of the main loop too.
        sys.exit(130)