Clone of https://github.com/NixOS/nixpkgs.git (to stress-test knotserver)
at 20.09 373 lines 11 kB view raw
#!/usr/bin/env python3

"""
Update a Python package expression by passing in the `.nix` file, or the directory containing it.
You can pass in multiple files or paths.

You'll likely want to use
``
  $ ./update-python-libraries ../../pkgs/development/python-modules/**/default.nix
``
to update all non-pinned libraries in that folder.
"""

import argparse
import logging
import os
import re
import requests
import toolz
from concurrent.futures import ThreadPoolExecutor as Pool
from packaging.version import Version as _Version
from packaging.version import InvalidVersion
from packaging.specifiers import SpecifierSet
import collections
import collections.abc
import subprocess

INDEX = "https://pypi.io/pypi"
"""url of PyPI"""

EXTENSIONS = ['tar.gz', 'tar.bz2', 'tar', 'zip', '.whl']
"""Permitted file extensions. These are evaluated from left to right and the first occurrence is returned."""

PRERELEASES = False

GIT = "git"

logging.basicConfig(level=logging.INFO)


class Version(_Version, collections.abc.Sequence):
    """A `packaging` version that remembers its original spelling.

    The instance also behaves as a read-only sequence over its release
    segment, so it can be indexed, sliced, measured and iterated.
    """

    def __init__(self, version):
        super().__init__(version)
        # We cannot use `str(Version(0.04.21))` because that becomes `0.4.21`
        # https://github.com/avian2/unidecode/issues/13#issuecomment-354538882
        self.raw_version = version

    def __getitem__(self, i):
        return self.release[i]

    def __len__(self):
        return len(self.release)

    def __iter__(self):
        yield from self.release


def _exactly_one(matches, attribute):
    """Return the only element of `matches`.

    :raises ValueError: if `matches` is empty or has more than one element.
    """
    if len(matches) > 1:
        raise ValueError("found too many values for {}".format(attribute))
    if not matches:
        raise ValueError("no value found for {}".format(attribute))
    return matches[0]


def _get_values(attribute, text):
    """Match attribute in text and return all matches.

    :returns: List of matches.
    """
    # Raw string: `\s` is a regex escape, not a Python string escape.
    pattern = re.compile(r'{}\s+=\s+"(.*)";'.format(attribute))
    return pattern.findall(text)


def _get_unique_value(attribute, text):
    """Match attribute in text and return unique match.

    :returns: Single match.
    :raises ValueError: if the attribute is missing or matched more than once.
    """
    return _exactly_one(_get_values(attribute, text), attribute)


def _get_line_and_value(attribute, text):
    """Match attribute in text. Return the line and the value of the attribute.

    :returns: Tuple `(matched_text, value)`.
    :raises ValueError: if the attribute is missing or matched more than once.
    """
    pattern = re.compile(r'({}\s+=\s+"(.*)";)'.format(attribute))
    return _exactly_one(pattern.findall(text), attribute)


def _replace_value(attribute, value, text):
    """Search and replace value of attribute in text.

    :raises ValueError: if the attribute is missing or matched more than once.
    """
    old_line, old_value = _get_line_and_value(attribute, text)
    # Replace the quoted value, and only once: a bare `str.replace` on the
    # value would also hit the attribute name when they share characters, and
    # would insert `value` between every character when the old value is "".
    new_line = old_line.replace('"{}"'.format(old_value), '"{}"'.format(value), 1)
    return text.replace(old_line, new_line)


def _fetch_page(url, timeout=30):
    """Fetch `url` and return its decoded JSON body.

    :param timeout: Seconds to wait for the server before giving up.
    :raises ValueError: if the request fails or the response status is not OK.
    """
    try:
        r = requests.get(url, timeout=timeout)
    except requests.RequestException as e:
        # Report network trouble with the same exception type as a bad status,
        # so the ValueError handler in `_update` covers it too.
        raise ValueError("request for {} failed".format(url)) from e
    if r.status_code == requests.codes.ok:
        return r.json()
    raise ValueError("request for {} failed".format(url))


SEMVER = {
    'major' : 0,
    'minor' : 1,
    'patch' : 2,
}


def _determine_latest_version(current_version, target, versions):
    """Determine latest version, given `target`.

    :param current_version: Version string currently in use.
    :param target: Key of `SEMVER`; the number of leading release components
        of `current_version` that bound the result from above.
    :param versions: Iterable of candidate version strings; entries that do
        not parse as versions are skipped.
    :returns: The original spelling of the highest acceptable candidate.
    :raises ValueError: if no candidate is acceptable.
    """
    current_version = Version(current_version)

    def _parse_versions(raw_versions):
        for raw in raw_versions:
            try:
                yield Version(raw)
            except InvalidVersion:
                pass

    candidates = _parse_versions(versions)

    # Exclusive upper bound: the leading components with the last one bumped.
    prefix = list(current_version[0:SEMVER[target]])
    if prefix:
        prefix[-1] += 1
        ceiling = Version(".".join(map(str, prefix)))
    else:
        ceiling = None

    # We do not want prereleases
    candidates = SpecifierSet(prereleases=PRERELEASES).filter(candidates)

    if ceiling is not None:
        candidates = SpecifierSet(f"<{ceiling}").filter(candidates)

    candidates = list(candidates)
    if not candidates:
        raise ValueError("no suitable version found for target {}".format(target))
    return max(candidates).raw_version


def _get_latest_version_pypi(package, extension, current_version, target):
    """Get latest version and hash from PyPI.

    :returns: Tuple `(version, sha256)`; `sha256` is None when no file of the
        chosen release ends with `extension`.
    :raises KeyError: if the chosen version is not among the releases.
    """
    url = "{}/{}/json".format(INDEX, package)
    page = _fetch_page(url)

    version = _determine_latest_version(current_version, target, page['releases'].keys())

    try:
        releases = page['releases'][version]
    except KeyError as e:
        raise KeyError('Could not find version {} for {}'.format(version, package)) from e

    sha256 = None
    for release in releases:
        if release['filename'].endswith(extension):
            # TODO: In case of wheel we need to do further checks!
            sha256 = release['digests']['sha256']
            break
    return version, sha256


def _get_latest_version_github(package, extension, current_version, target):
    """Placeholder fetcher for GitHub sources; always raises ValueError."""
    raise ValueError("updating from GitHub is not yet supported.")


FETCHERS = {
    'fetchFromGitHub' : _get_latest_version_github,
    'fetchPypi' : _get_latest_version_pypi,
    'fetchurl' : _get_latest_version_pypi,
}


DEFAULT_SETUPTOOLS_EXTENSION = 'tar.gz'


FORMATS = {
    'setuptools' : DEFAULT_SETUPTOOLS_EXTENSION,
    'wheel' : 'whl'
}


def _determine_fetcher(text):
    """Return the name of the single fetcher assigned to `src` in `text`.

    :raises ValueError: if there is no `src = <fetcher>` occurrence, or more
        than one in total.
    """
    # Count occurrences of fetchers.
    nfetchers = sum(text.count('src = {}'.format(fetcher)) for fetcher in FETCHERS)
    if nfetchers == 0:
        raise ValueError("no fetcher.")
    if nfetchers > 1:
        raise ValueError("multiple fetchers.")
    # Then we check which fetcher to use.
    for fetcher in FETCHERS:
        if 'src = {}'.format(fetcher) in text:
            return fetcher


def _determine_extension(text, fetcher):
    """Determine what extension is used in the expression.

    If we use:
    - fetchPypi, we check if format is specified.
    - fetchurl, we determine the extension from the url.
    - fetchFromGitHub we raise, because it is not implemented.

    :raises ValueError: if the extension cannot be determined.
    """
    if fetcher == 'fetchPypi':
        try:
            src_format = _get_unique_value('format', text)
        except ValueError:
            src_format = None   # format was not given

        try:
            extension = _get_unique_value('extension', text)
        except ValueError:
            extension = None    # extension was not given

        if extension is None:
            if src_format is None:
                src_format = 'setuptools'
            elif src_format == 'flit':
                raise ValueError("Don't know how to update a Flit package.")
            elif src_format == 'other':
                raise ValueError("Don't know how to update a format='other' package.")
            elif src_format == 'pyproject':
                raise ValueError("Don't know how to update a pyproject package.")
            elif src_format not in FORMATS:
                # Raise ValueError rather than let the lookup below raise a
                # KeyError, which the caller does not handle.
                raise ValueError("Don't know how to update a format='{}' package.".format(src_format))
            extension = FORMATS[src_format]

    elif fetcher == 'fetchurl':
        url = _get_unique_value('url', text)
        extension = os.path.splitext(url)[1]
        if 'pypi' not in url:
            raise ValueError('url does not point to PyPI.')

    elif fetcher == 'fetchFromGitHub':
        raise ValueError('updating from GitHub is not yet implemented.')

    return extension


def _update_package(path, target):
    """Update the expression at `path` to the newest version allowed by `target`.

    :returns: False if there is no newer version, otherwise a dict describing
        the update (`path`, `target`, `pname`, `old_version`, `new_version`).
    :raises ValueError: if the expression cannot be parsed, the update would
        be a downgrade, or no matching file is available.
    """
    # Read the expression
    with open(path, 'r') as f:
        text = f.read()

    # Determine pname.
    pname = _get_unique_value('pname', text)

    # Determine version.
    version = _get_unique_value('version', text)

    # First we check how many fetchers are mentioned.
    fetcher = _determine_fetcher(text)

    extension = _determine_extension(text, fetcher)

    new_version, new_sha256 = FETCHERS[fetcher](pname, extension, version, target)

    if new_version == version:
        logging.info("Path {}: no update available for {}.".format(path, pname))
        return False
    if Version(new_version) <= Version(version):
        raise ValueError("downgrade for {}.".format(pname))
    if not new_sha256:
        raise ValueError("no file available for {}.".format(pname))

    text = _replace_value('version', new_version, text)
    text = _replace_value('sha256', new_sha256, text)

    with open(path, 'w') as f:
        f.write(text)

    logging.info("Path {}: updated {} from {} to {}".format(path, pname, version, new_version))

    return {
        'path' : path,
        'target': target,
        'pname': pname,
        'old_version' : version,
        'new_version' : new_version,
    }


def _update(path, target):
    """Update the package at `path`, logging instead of raising on ValueError.

    :param path: A `.nix` file, or a directory containing `default.nix`.
    :returns: The result of `_update_package`, or False if the path was
        skipped or the update failed with a ValueError.
    """
    # We need to read and modify a Nix expression.
    if os.path.isdir(path):
        path = os.path.join(path, 'default.nix')

    # If a default.nix does not exist, we quit.
    if not os.path.isfile(path):
        logging.info("Path {}: does not exist.".format(path))
        return False

    # If file is not a Nix expression, we quit.
    if not path.endswith(".nix"):
        logging.info("Path {}: does not end with `.nix`.".format(path))
        return False

    try:
        return _update_package(path, target)
    except ValueError as e:
        logging.warning("Path {}: {}".format(path, e))
        return False


def _commit(path, pname, old_version, new_version, pkgs_prefix="python: ", **kwargs):
    """Commit result.

    Extra keyword arguments are accepted and ignored, so a result dict from
    `_update_package` can be passed with `**`.

    :returns: True when the commit was created.
    :raises subprocess.CalledProcessError: if a git command fails.
    """
    msg = f'{pkgs_prefix}{pname}: {old_version} -> {new_version}'

    try:
        subprocess.check_call([GIT, 'add', path])
        subprocess.check_call([GIT, 'commit', '-m', msg])
    except subprocess.CalledProcessError:
        # NOTE(review): if `git add` already succeeded, a plain checkout may
        # leave the staged change in place; confirm this rollback is enough.
        subprocess.check_call([GIT, 'checkout', path])
        # CalledProcessError needs a return code and a command, so it cannot
        # be built from a message alone: log the message and re-raise.
        logging.error("Could not commit %s", path)
        raise

    return True


def main():
    """Parse the command line, update the given packages and optionally commit."""
    parser = argparse.ArgumentParser()
    parser.add_argument('package', type=str, nargs='+')
    parser.add_argument('--target', type=str, choices=SEMVER.keys(), default='major')
    parser.add_argument('--commit', action='store_true', help='Create a commit for each package update')
    parser.add_argument('--use-pkgs-prefix', action='store_true', help='Use python3Packages.${pname}: instead of python: ${pname}: when making commits')

    args = parser.parse_args()
    target = args.target

    packages = [os.path.abspath(package) for package in args.package]

    logging.info("Updating packages...")

    # Use threads to update packages concurrently
    with Pool() as p:
        results = [result for result in p.map(lambda pkg: _update(pkg, target), packages) if result]

    logging.info("Finished updating packages.")

    commit_options = {}
    if args.use_pkgs_prefix:
        logging.info("Using python3Packages. prefix for commits")
        commit_options["pkgs_prefix"] = "python3Packages."

    # Commits are created sequentially.
    if args.commit:
        logging.info("Committing updates...")
        for result in results:
            _commit(**result, **commit_options)
        logging.info("Finished committing updates")

    count = len(results)
    logging.info("{} package(s) updated".format(count))


if __name__ == '__main__':
    main()