Clone of https://github.com/NixOS/nixpkgs.git (to stress-test knotserver)
at 19.03 362 lines 10 kB view raw
#!/usr/bin/env python3

"""
Update a Python package expression by passing in the `.nix` file, or the directory containing it.
You can pass in multiple files or paths.

You'll likely want to use
``
  $ ./update-python-libraries ../../pkgs/development/python-modules/*
``
to update all libraries in that folder.
"""

import argparse
import collections
import collections.abc
import logging
import os
import re
import subprocess
from concurrent.futures import ThreadPoolExecutor as Pool

import requests
import toolz
from packaging.specifiers import SpecifierSet
from packaging.version import InvalidVersion
from packaging.version import Version as _Version

# URL of PyPI.
INDEX = "https://pypi.io/pypi"

# Permitted file extensions. These are evaluated from left to right and the
# first occurrence is returned.
EXTENSIONS = ['tar.gz', 'tar.bz2', 'tar', 'zip', '.whl']

# Whether pre-releases are acceptable update candidates.
PRERELEASES = False

# Name of the git executable used by `_commit`.
GIT = "git"

# Seconds to wait for PyPI before giving up on a single package, so that one
# stalled connection cannot block a worker thread forever.
REQUEST_TIMEOUT = 30

logging.basicConfig(level=logging.INFO)


class Version(_Version, collections.abc.Sequence):
    """PEP 440 version that remembers its original spelling.

    Behaves like `packaging.version.Version` and additionally acts as a
    sequence over the release segment (e.g. `(1, 2, 3)` for `1.2.3`).
    """

    def __init__(self, version):
        super().__init__(version)
        # We cannot use `str(Version(0.04.21))` because that becomes `0.4.21`
        # https://github.com/avian2/unidecode/issues/13#issuecomment-354538882
        self.raw_version = version

    def __getitem__(self, i):
        return self.release[i]

    def __len__(self):
        return len(self.release)

    def __iter__(self):
        return iter(self.release)


def _get_values(attribute, text):
    """Match attribute in text and return all matches.

    :param attribute: Name of the Nix attribute, e.g. `version`.
    :param text: Contents of the Nix expression.
    :returns: List of matches.
    """
    regex = re.compile(r'{}\s+=\s+"(.*)";'.format(attribute))
    return regex.findall(text)


def _get_unique_value(attribute, text):
    """Match attribute in text and return unique match.

    :returns: Single match.
    :raises ValueError: If the attribute is missing or occurs more than once.
    """
    values = _get_values(attribute, text)
    if len(values) > 1:
        raise ValueError("found too many values for {}".format(attribute))
    if not values:
        raise ValueError("no value found for {}".format(attribute))
    return values[0]


def _get_line_and_value(attribute, text):
    """Match attribute in text. Return the line and the value of the attribute.

    :returns: Tuple `(line, value)` where `line` is the full `attr = "value";`
        assignment and `value` the text between the quotes.
    :raises ValueError: If the attribute is missing or occurs more than once.
    """
    regex = re.compile(r'({}\s+=\s+"(.*)";)'.format(attribute))
    matches = regex.findall(text)
    if len(matches) > 1:
        raise ValueError("found too many values for {}".format(attribute))
    if not matches:
        raise ValueError("no value found for {}".format(attribute))
    return matches[0]


def _replace_value(attribute, value, text):
    """Search and replace value of attribute in text.

    :returns: `text` with the attribute's value replaced by `value`.
    :raises ValueError: If the attribute is missing or occurs more than once.
    """
    old_line, old_value = _get_line_and_value(attribute, text)
    # The matched assignment always ends in `"<old_value>";`. Splice the new
    # value in by position rather than with `str.replace`, which would corrupt
    # the line when the old value is empty or also occurs in the attribute name.
    old_suffix = '"{}";'.format(old_value)
    new_line = old_line[:len(old_line) - len(old_suffix)] + '"{}";'.format(value)
    return text.replace(old_line, new_line)


def _fetch_page(url):
    """Fetch `url` and return the decoded JSON body.

    :raises ValueError: If the request fails, times out or does not return
        a successful status code.
    """
    try:
        response = requests.get(url, timeout=REQUEST_TIMEOUT)
    except requests.exceptions.RequestException as e:
        # Surface network problems as ValueError so that `_update` skips this
        # package instead of aborting the whole run.
        raise ValueError("request for {} failed".format(url)) from e
    if response.status_code != requests.codes.ok:
        raise ValueError("request for {} failed".format(url))
    return response.json()


# Maps the update target to the index of the release segment that may change.
SEMVER = {
    'major': 0,
    'minor': 1,
    'patch': 2,
}


def _determine_latest_version(current_version, target, versions):
    """Determine latest version, given `target`.

    :param current_version: Version string currently used in the expression.
    :param target: One of the keys of `SEMVER`; the most significant release
        segment that is allowed to change.
    :param versions: Iterable of candidate version strings.
    :returns: The newest acceptable candidate, spelled exactly as given.
    :raises ValueError: If `current_version` is invalid or no candidate
        satisfies the constraints.
    """
    current_version = Version(current_version)

    def _parse_versions(raw_versions):
        # Silently skip version strings that are not valid PEP 440.
        for raw in raw_versions:
            try:
                yield Version(raw)
            except InvalidVersion:
                pass

    candidates = _parse_versions(versions)

    # Segments in front of the target segment are fixed. Bumping the last of
    # them gives an exclusive upper bound for acceptable versions.
    fixed = list(current_version[0:SEMVER[target]])
    if fixed:
        fixed[-1] += 1
        ceiling = Version(".".join(map(str, fixed)))
    else:
        ceiling = None

    # We do not want prereleases
    candidates = SpecifierSet(prereleases=PRERELEASES).filter(candidates)

    if ceiling is not None:
        candidates = SpecifierSet(f"<{ceiling}").filter(candidates)

    latest = max(candidates, default=None)
    if latest is None:
        raise ValueError("no suitable version found")
    return latest.raw_version


def _get_latest_version_pypi(package, extension, current_version, target):
    """Get latest version and hash from PyPI.

    :returns: Tuple `(version, sha256)`. `sha256` is `None` when the release
        has no file whose name ends with `extension`.
    :raises ValueError: If PyPI cannot be queried or no version is suitable.
    :raises KeyError: If the chosen version is missing from the release list.
    """
    url = "{}/{}/json".format(INDEX, package)
    data = _fetch_page(url)

    versions = data['releases'].keys()
    version = _determine_latest_version(current_version, target, versions)

    try:
        releases = data['releases'][version]
    except KeyError as e:
        raise KeyError('Could not find version {} for {}'.format(version, package)) from e

    # TODO: In case of wheel we need to do further checks!
    sha256 = next(
        (release['digests']['sha256']
         for release in releases
         if release['filename'].endswith(extension)),
        None,
    )
    return version, sha256


def _get_latest_version_github(package, extension, current_version, target):
    """Placeholder fetcher for GitHub sources; always raises ValueError."""
    raise ValueError("updating from GitHub is not yet supported.")


# Maps the fetcher used in `src = <fetcher>` to the function that looks up
# the latest version.
FETCHERS = {
    'fetchFromGitHub': _get_latest_version_github,
    'fetchPypi': _get_latest_version_pypi,
    'fetchurl': _get_latest_version_pypi,
}


DEFAULT_SETUPTOOLS_EXTENSION = 'tar.gz'


# Maps the `format` attribute of a package to the expected file extension.
FORMATS = {
    'setuptools': DEFAULT_SETUPTOOLS_EXTENSION,
    'wheel': 'whl',
}


def _determine_fetcher(text):
    """Return the name of the single fetcher used for `src` in `text`.

    :raises ValueError: If no fetcher, or more than one, is used.
    """
    # Count occurrences of fetchers.
    counts = {
        fetcher: text.count('src = {}'.format(fetcher))
        for fetcher in FETCHERS
    }
    nfetchers = sum(counts.values())
    if nfetchers == 0:
        raise ValueError("no fetcher.")
    if nfetchers > 1:
        raise ValueError("multiple fetchers.")
    # Exactly one occurrence in total, hence exactly one non-zero count.
    return next(fetcher for fetcher, count in counts.items() if count)


def _determine_extension(text, fetcher):
    """Determine what extension is used in the expression.

    If we use:
    - fetchPypi, we use `extension` if given, otherwise derive it from `format`.
    - fetchurl, we determine the extension from the url.
    - fetchFromGitHub, updating is not implemented and ValueError is raised.

    :raises ValueError: If the extension cannot be determined.
    """
    if fetcher == 'fetchPypi':
        try:
            src_format = _get_unique_value('format', text)
        except ValueError:
            src_format = None   # format was not given

        try:
            extension = _get_unique_value('extension', text)
        except ValueError:
            extension = None    # extension was not given

        if extension is None:
            if src_format is None:
                src_format = 'setuptools'
            elif src_format == 'flit':
                raise ValueError("Don't know how to update a Flit package.")
            try:
                extension = FORMATS[src_format]
            except KeyError:
                # Raise ValueError rather than KeyError so `_update` skips the
                # package instead of aborting the whole run.
                raise ValueError(
                    "Don't know how to update a package with format {}.".format(src_format)
                ) from None

    elif fetcher == 'fetchurl':
        url = _get_unique_value('url', text)
        extension = os.path.splitext(url)[1]
        if 'pypi' not in url:
            raise ValueError('url does not point to PyPI.')

    elif fetcher == 'fetchFromGitHub':
        raise ValueError('updating from GitHub is not yet implemented.')

    else:
        raise ValueError('unknown fetcher {}.'.format(fetcher))

    return extension


def _update_package(path, target):
    """Update the Nix expression at `path` to the latest suitable version.

    :returns: `False` when the package is already up to date, otherwise a
        dict with the keys `path`, `target`, `pname`, `old_version` and
        `new_version`.
    :raises ValueError: If the expression cannot be parsed or updated.
    """
    # Read the expression
    with open(path, 'r') as f:
        text = f.read()

    # Determine pname.
    pname = _get_unique_value('pname', text)

    # Determine version.
    version = _get_unique_value('version', text)

    # First we check how many fetchers are mentioned.
    fetcher = _determine_fetcher(text)

    extension = _determine_extension(text, fetcher)

    new_version, new_sha256 = FETCHERS[fetcher](pname, extension, version, target)

    if new_version == version:
        logging.info("Path %s: no update available for %s.", path, pname)
        return False
    if Version(new_version) <= Version(version):
        raise ValueError("downgrade for {}.".format(pname))
    if not new_sha256:
        raise ValueError("no file available for {}.".format(pname))

    text = _replace_value('version', new_version, text)
    text = _replace_value('sha256', new_sha256, text)

    with open(path, 'w') as f:
        f.write(text)

    logging.info("Path %s: updated %s from %s to %s", path, pname, version, new_version)

    return {
        'path': path,
        'target': target,
        'pname': pname,
        'old_version': version,
        'new_version': new_version,
    }


def _update(path, target):
    """Update the package at `path`, logging instead of raising on failure.

    :param path: A `.nix` file or a directory containing a `default.nix`.
    :returns: The result of `_update_package`, or `False` when the path is
        unsuitable or the update failed with a ValueError.
    """
    # We need to read and modify a Nix expression.
    if os.path.isdir(path):
        path = os.path.join(path, 'default.nix')

    # If a default.nix does not exist, we quit.
    if not os.path.isfile(path):
        logging.info("Path %s: does not exist.", path)
        return False

    # If file is not a Nix expression, we quit.
    if not path.endswith(".nix"):
        logging.info("Path %s: does not end with `.nix`.", path)
        return False

    try:
        return _update_package(path, target)
    except ValueError as e:
        logging.warning("Path %s: %s", path, e)
        return False


def _commit(path, pname, old_version, new_version, **kwargs):
    """Commit result.

    Accepts the dict produced by `_update_package` as keyword arguments;
    keys that are not needed here are ignored.

    :returns: `True` when the commit was created.
    :raises subprocess.CalledProcessError: If a git command fails.
    """
    msg = f'python: {pname}: {old_version} -> {new_version}'

    try:
        subprocess.check_call([GIT, 'add', path])
        subprocess.check_call([GIT, 'commit', '-m', msg])
    except subprocess.CalledProcessError:
        subprocess.check_call([GIT, 'checkout', path])
        # Re-raise the original error: CalledProcessError cannot be built
        # from a message alone (it requires `returncode` and `cmd`).
        logging.error("Could not commit %s", path)
        raise

    return True


def main():
    """Parse the command line, update all given packages and optionally commit."""
    parser = argparse.ArgumentParser()
    parser.add_argument('package', type=str, nargs='+')
    parser.add_argument('--target', type=str, choices=SEMVER.keys(), default='major')
    parser.add_argument('--commit', action='store_true', help='Create a commit for each package update')

    args = parser.parse_args()
    target = args.target

    packages = list(map(os.path.abspath, args.package))

    logging.info("Updating packages...")

    # Use threads to update packages concurrently
    with Pool() as p:
        results = list(p.map(lambda pkg: _update(pkg, target), packages))

    logging.info("Finished updating packages.")

    # Commits are created sequentially.
    if args.commit:
        logging.info("Committing updates...")
        for result in results:
            if result:
                _commit(**result)
        logging.info("Finished committing updates")

    count = sum(map(bool, results))
    logging.info("%s package(s) updated", count)


if __name__ == '__main__':
    main()