Clone of https://github.com/NixOS/nixpkgs.git (to stress-test knotserver)
1from __future__ import annotations 2from typing import Dict, Generator, List, Optional, Tuple 3import argparse 4import asyncio 5import contextlib 6import json 7import os 8import re 9import subprocess 10import sys 11import tempfile 12 13class CalledProcessError(Exception): 14 process: asyncio.subprocess.Process 15 16class UpdateFailedException(Exception): 17 pass 18 19def eprint(*args, **kwargs): 20 print(*args, file=sys.stderr, **kwargs) 21 22async def check_subprocess(*args, **kwargs): 23 """ 24 Emulate check argument of subprocess.run function. 25 """ 26 process = await asyncio.create_subprocess_exec(*args, **kwargs) 27 returncode = await process.wait() 28 29 if returncode != 0: 30 error = CalledProcessError() 31 error.process = process 32 33 raise error 34 35 return process 36 37async def run_update_script(nixpkgs_root: str, merge_lock: asyncio.Lock, temp_dir: Optional[Tuple[str, str]], package: Dict, keep_going: bool): 38 worktree: Optional[str] = None 39 40 update_script_command = package['updateScript'] 41 42 if temp_dir is not None: 43 worktree, _branch = temp_dir 44 45 # Ensure the worktree is clean before update. 46 await check_subprocess('git', 'reset', '--hard', '--quiet', 'HEAD', cwd=worktree) 47 48 # Update scripts can use $(dirname $0) to get their location but we want to run 49 # their clones in the git worktree, not in the main nixpkgs repo. 50 update_script_command = map(lambda arg: re.sub(r'^{0}'.format(re.escape(nixpkgs_root)), worktree, arg), update_script_command) 51 52 eprint(f" - {package['name']}: UPDATING ...") 53 54 try: 55 update_process = await check_subprocess( 56 'env', 57 f"UPDATE_NIX_NAME={package['name']}", 58 f"UPDATE_NIX_PNAME={package['pname']}", 59 f"UPDATE_NIX_OLD_VERSION={package['oldVersion']}", 60 f"UPDATE_NIX_ATTR_PATH={package['attrPath']}", 61 *update_script_command, 62 stdout=asyncio.subprocess.PIPE, 63 stderr=asyncio.subprocess.PIPE, 64 cwd=worktree, 65 ) 66 update_info = await update_process.stdout.read() 67 68 await merge_changes(merge_lock, package, update_info, temp_dir) 69 except KeyboardInterrupt as e: 70 eprint('Cancelling…') 71 raise asyncio.exceptions.CancelledError() 72 except CalledProcessError as e: 73 eprint(f" - {package['name']}: ERROR") 74 eprint() 75 eprint(f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------") 76 eprint() 77 stderr = await e.process.stderr.read() 78 eprint(stderr.decode('utf-8')) 79 with open(f"{package['pname']}.log", 'wb') as logfile: 80 logfile.write(stderr) 81 eprint() 82 eprint(f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------") 83 84 if not keep_going: 85 raise UpdateFailedException(f"The update script for {package['name']} failed with exit code {e.process.returncode}") 86 87@contextlib.contextmanager 88def make_worktree() -> Generator[Tuple[str, str], None, None]: 89 with tempfile.TemporaryDirectory() as wt: 90 branch_name = f'update-{os.path.basename(wt)}' 91 target_directory = f'{wt}/nixpkgs' 92 93 subprocess.run(['git', 'worktree', 'add', '-b', branch_name, target_directory]) 94 yield (target_directory, branch_name) 95 subprocess.run(['git', 'worktree', 'remove', '--force', target_directory]) 96 subprocess.run(['git', 'branch', '-D', branch_name]) 97 98async def commit_changes(name: str, merge_lock: asyncio.Lock, worktree: str, branch: str, changes: List[Dict]) -> None: 99 for change in changes: 100 # Git can only handle a single index operation at a time 101 async with merge_lock: 102 await check_subprocess('git', 'add', *change['files'], cwd=worktree) 103 commit_message = '{attrPath}: {oldVersion} -> {newVersion}'.format(**change) 104 if 'commitMessage' in change: 105 commit_message = change['commitMessage'] 106 elif 'commitBody' in change: 107 commit_message = commit_message + '\n\n' + change['commitBody'] 108 await check_subprocess('git', 'commit', '--quiet', '-m', commit_message, cwd=worktree) 109 await check_subprocess('git', 'cherry-pick', branch) 110 111async def check_changes(package: Dict, worktree: str, update_info: str): 112 if 'commit' in package['supportedFeatures']: 113 changes = json.loads(update_info) 114 else: 115 changes = [{}] 116 117 # Try to fill in missing attributes when there is just a single change. 118 if len(changes) == 1: 119 # Dynamic data from updater take precedence over static data from passthru.updateScript. 120 if 'attrPath' not in changes[0]: 121 # update.nix is always passing attrPath 122 changes[0]['attrPath'] = package['attrPath'] 123 124 if 'oldVersion' not in changes[0]: 125 # update.nix is always passing oldVersion 126 changes[0]['oldVersion'] = package['oldVersion'] 127 128 if 'newVersion' not in changes[0]: 129 attr_path = changes[0]['attrPath'] 130 obtain_new_version_process = await check_subprocess('nix-instantiate', '--expr', f'with import ./. {{}}; lib.getVersion {attr_path}', '--eval', '--strict', '--json', stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=worktree) 131 changes[0]['newVersion'] = json.loads((await obtain_new_version_process.stdout.read()).decode('utf-8')) 132 133 if 'files' not in changes[0]: 134 changed_files_process = await check_subprocess('git', 'diff', '--name-only', 'HEAD', stdout=asyncio.subprocess.PIPE, cwd=worktree) 135 changed_files = (await changed_files_process.stdout.read()).splitlines() 136 changes[0]['files'] = changed_files 137 138 if len(changed_files) == 0: 139 return [] 140 141 return changes 142 143async def merge_changes(merge_lock: asyncio.Lock, package: Dict, update_info: str, temp_dir: Optional[Tuple[str, str]]) -> None: 144 if temp_dir is not None: 145 worktree, branch = temp_dir 146 changes = await check_changes(package, worktree, update_info) 147 148 if len(changes) > 0: 149 await commit_changes(package['name'], merge_lock, worktree, branch, changes) 150 else: 151 eprint(f" - {package['name']}: DONE, no changes.") 152 else: 153 eprint(f" - {package['name']}: DONE.") 154 155async def updater(nixpkgs_root: str, temp_dir: Optional[Tuple[str, str]], merge_lock: asyncio.Lock, packages_to_update: asyncio.Queue[Optional[Dict]], keep_going: bool, commit: bool): 156 while True: 157 package = await packages_to_update.get() 158 if package is None: 159 # A sentinel received, we are done. 160 return 161 162 if not ('commit' in package['supportedFeatures'] or 'attrPath' in package): 163 temp_dir = None 164 165 await run_update_script(nixpkgs_root, merge_lock, temp_dir, package, keep_going) 166 167async def start_updates(max_workers: int, keep_going: bool, commit: bool, packages: List[Dict]): 168 merge_lock = asyncio.Lock() 169 packages_to_update: asyncio.Queue[Optional[Dict]] = asyncio.Queue() 170 171 with contextlib.ExitStack() as stack: 172 temp_dirs: List[Optional[Tuple[str, str]]] = [] 173 174 # Do not create more workers than there are packages. 175 num_workers = min(max_workers, len(packages)) 176 177 nixpkgs_root_process = await check_subprocess('git', 'rev-parse', '--show-toplevel', stdout=asyncio.subprocess.PIPE) 178 nixpkgs_root = (await nixpkgs_root_process.stdout.read()).decode('utf-8').strip() 179 180 # Set up temporary directories when using auto-commit. 181 for i in range(num_workers): 182 temp_dir = stack.enter_context(make_worktree()) if commit else None 183 temp_dirs.append(temp_dir) 184 185 # Fill up an update queue, 186 for package in packages: 187 await packages_to_update.put(package) 188 189 # Add sentinels, one for each worker. 190 # A workers will terminate when it gets sentinel from the queue. 191 for i in range(num_workers): 192 await packages_to_update.put(None) 193 194 # Prepare updater workers for each temp_dir directory. 195 # At most `num_workers` instances of `run_update_script` will be running at one time. 196 updaters = asyncio.gather(*[updater(nixpkgs_root, temp_dir, merge_lock, packages_to_update, keep_going, commit) for temp_dir in temp_dirs]) 197 198 try: 199 # Start updater workers. 200 await updaters 201 except asyncio.exceptions.CancelledError: 202 # When one worker is cancelled, cancel the others too. 203 updaters.cancel() 204 except UpdateFailedException as e: 205 # When one worker fails, cancel the others, as this exception is only thrown when keep_going is false. 206 updaters.cancel() 207 eprint(e) 208 sys.exit(1) 209 210def main(max_workers: int, keep_going: bool, commit: bool, packages_path: str) -> None: 211 with open(packages_path) as f: 212 packages = json.load(f) 213 214 eprint() 215 eprint('Going to be running update for following packages:') 216 for package in packages: 217 eprint(f" - {package['name']}") 218 eprint() 219 220 confirm = input('Press Enter key to continue...') 221 if confirm == '': 222 eprint() 223 eprint('Running update for:') 224 225 asyncio.run(start_updates(max_workers, keep_going, commit, packages)) 226 227 eprint() 228 eprint('Packages updated!') 229 sys.exit() 230 else: 231 eprint('Aborting!') 232 sys.exit(130) 233 234parser = argparse.ArgumentParser(description='Update packages') 235parser.add_argument('--max-workers', '-j', dest='max_workers', type=int, help='Number of updates to run concurrently', nargs='?', default=4) 236parser.add_argument('--keep-going', '-k', dest='keep_going', action='store_true', help='Do not stop after first failure') 237parser.add_argument('--commit', '-c', dest='commit', action='store_true', help='Commit the changes') 238parser.add_argument('packages', help='JSON file containing the list of package names and their update scripts') 239 240if __name__ == '__main__': 241 args = parser.parse_args() 242 243 try: 244 main(args.max_workers, args.keep_going, args.commit, args.packages) 245 except KeyboardInterrupt as e: 246 # Let’s cancel outside of the main loop too. 247 sys.exit(130)