1#!/usr/bin/env python3
2
3"""
4Update a Python package expression by passing in the `.nix` file, or the directory containing it.
5You can pass in multiple files or paths.
6
7You'll likely want to use
8``
9 $ ./update-python-libraries ../../pkgs/development/python-modules/**/default.nix
10``
11to update all non-pinned libraries in that folder.
12"""
13
14import argparse
15import collections
16import json
17import logging
18import os
19import re
20import subprocess
21from concurrent.futures import ThreadPoolExecutor as Pool
22from typing import Any, Optional
23
24import requests
25from packaging.specifiers import SpecifierSet
26from packaging.version import InvalidVersion
27from packaging.version import Version as _Version
28
INDEX = "https://pypi.io/pypi"
"""url of PyPI"""

EXTENSIONS = ["tar.gz", "tar.bz2", "tar", "zip", ".whl"]
"""Permitted file extensions. These are evaluated from left to right and the first occurrence is returned."""

# Whether prerelease versions may be selected (fed to SpecifierSet.filter).
PRERELEASES = False

# Set to True by main() when more than one package is being updated;
# individual packages may opt out of bulk updates via `skipBulkUpdate`.
BULK_UPDATE = False

# Executable used for all git operations (staging, committing, checkout).
GIT = "git"

# Absolute path to the top level of the nixpkgs checkout we are running in.
NIXPKGS_ROOT = (
    subprocess.check_output(["git", "rev-parse", "--show-toplevel"])
    .decode("utf-8")
    .strip()
)

logging.basicConfig(level=logging.INFO)
48
49
class Version(_Version, collections.abc.Sequence):
    """A `packaging.version.Version` that is also a sequence of its release numbers.

    Indexing, `len()` and iteration expose the numeric release tuple
    (e.g. `Version("1.2.3")[0] == 1`); `_determine_latest_version` relies on
    slicing (`version[0:index]`) to build the version ceiling for a target.
    """

    def __init__(self, version):
        super().__init__(version)
        # We cannot use `str(Version(0.04.21))` because that becomes `0.4.21`
        # https://github.com/avian2/unidecode/issues/13#issuecomment-354538882
        # Keep the original string so we can write it back verbatim.
        self.raw_version = version

    def __getitem__(self, i):
        # `i` may be an int or a slice; tuple indexing handles both.
        return self._version.release[i]

    def __len__(self):
        return len(self._version.release)

    def __iter__(self):
        yield from self._version.release
65
66
67def _get_values(attribute, text):
68 """Match attribute in text and return all matches.
69
70 :returns: List of matches.
71 """
72 regex = rf'{re.escape(attribute)}\s+=\s+"(.*)";'
73 regex = re.compile(regex)
74 values = regex.findall(text)
75 return values
76
77
def _get_attr_value(attr_path: str) -> Optional[Any]:
    """Evaluate `attr_path` against the nixpkgs checkout and return it as JSON.

    :returns: The decoded value, or None when evaluation or decoding fails.
    """
    cmd = [
        "nix",
        "--extra-experimental-features",
        "nix-command",
        "eval",
        "-f",
        f"{NIXPKGS_ROOT}/default.nix",
        "--json",
        f"{attr_path}",
    ]
    try:
        # Evaluation errors are expected for missing attributes; silence them.
        output = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        return None
    try:
        return json.loads(output.decode())
    except ValueError:
        return None
96
97
def _get_unique_value(attribute, text):
    """Match attribute in text and return its single value.

    :returns: Single match.
    :raises ValueError: when zero or multiple matches are found.
    """
    values = _get_values(attribute, text)
    if len(values) > 1:
        raise ValueError(f"found too many values for {attribute}")
    if not values:
        raise ValueError(f"no value found for {attribute}")
    return values[0]
111
112
113def _get_line_and_value(attribute, text, value=None):
114 """Match attribute in text. Return the line and the value of the attribute."""
115 if value is None:
116 regex = rf"({re.escape(attribute)}\s+=\s+\"(.*)\";)"
117 else:
118 regex = rf"({re.escape(attribute)}\s+=\s+\"({re.escape(value)})\";)"
119 regex = re.compile(regex)
120 results = regex.findall(text)
121 n = len(results)
122 if n > 1:
123 raise ValueError("found too many values for {}".format(attribute))
124 elif n == 1:
125 return results[0]
126 else:
127 raise ValueError("no value found for {}".format(attribute))
128
129
def _replace_value(attribute, value, text, oldvalue=None):
    """Search and replace value of attribute in text."""
    if oldvalue is None:
        line, current = _get_line_and_value(attribute, text)
    else:
        line, current = _get_line_and_value(attribute, text, oldvalue)
    # Rebuild the assignment line with the new value, then splice it in.
    updated_line = line.replace(current, value)
    return text.replace(line, updated_line)
139
140
def _fetch_page(url):
    """GET `url` and return its decoded JSON body.

    :raises ValueError: on any non-OK HTTP status.
    """
    response = requests.get(url)
    if response.status_code != requests.codes.ok:
        raise ValueError(f"request for {url} failed")
    return response.json()
147
148
def _fetch_github(url):
    """GET a GitHub API `url` and return its decoded JSON body.

    Authenticates with GITHUB_API_TOKEN from the environment when set,
    which raises the API rate limit considerably.

    :raises ValueError: on any non-OK HTTP status.
    """
    headers = {}
    token = os.environ.get("GITHUB_API_TOKEN")
    if token:
        headers["Authorization"] = f"token {token}"

    response = requests.get(url, headers=headers)
    if response.status_code != requests.codes.ok:
        raise ValueError(f"request for {url} failed")
    return response.json()
160
161
def _hash_to_sri(algorithm, value):
    """Convert a hash to its SRI representation"""
    output = subprocess.check_output(
        ["nix", "hash", "to-sri", "--type", algorithm, value]
    )
    return output.decode().strip()
169
170
def _skip_bulk_update(attr_name: str) -> bool:
    """Whether `attr_name` opted out of bulk updates via `skipBulkUpdate`."""
    flag = _get_attr_value(f"{attr_name}.skipBulkUpdate")
    return bool(flag)
173
174
# Index into a release tuple for each semver component. Used by
# `_determine_latest_version` to truncate the current version when computing
# the exclusive upper bound ("ceiling") for an update target.
SEMVER = {
    "major": 0,
    "minor": 1,
    "patch": 2,
}
180
181
def _determine_latest_version(current_version, target, versions):
    """Determine the latest version acceptable for the given `target`.

    :param current_version: version string the package is currently at.
    :param target: one of the SEMVER keys ("major", "minor", "patch").
        Updates are bounded below the next release of the preceding
        component (e.g. target "minor" on 1.2.x stays below 2.0);
        "major" imposes no bound.
    :param versions: iterable of candidate version strings.
    :returns: the raw string of the newest acceptable candidate version.
    :raises ValueError: when no candidate satisfies the constraints.
    """
    current_version = Version(current_version)

    def _parse_versions(versions):
        # Silently drop anything the index reports that PEP 440 cannot parse.
        for v in versions:
            try:
                yield Version(v)
            except InvalidVersion:
                pass

    versions = _parse_versions(versions)

    index = SEMVER[target]

    # Truncate the current version to the components that must not change,
    # then bump the last kept component to obtain an exclusive upper bound.
    ceiling = list(current_version[0:index])
    if len(ceiling) == 0:
        ceiling = None
    else:
        ceiling[-1] += 1
        ceiling = Version(".".join(map(str, ceiling)))

    # We do not want prereleases
    versions = SpecifierSet(prereleases=PRERELEASES).filter(versions)

    if ceiling is not None:
        versions = SpecifierSet(f"<{ceiling}").filter(versions)

    # `max` alone is sufficient; sorting first was redundant O(n log n) work.
    # Materialize so we can give a clear error instead of max()'s generic
    # "arg is an empty sequence" (still a ValueError, which callers catch).
    candidates = list(versions)
    if not candidates:
        raise ValueError(f"no version found satisfying target {target!r}")
    return max(candidates).raw_version
211
212
def _get_latest_version_pypi(attr_path, package, extension, current_version, target):
    """Get latest version and hash from PyPI.

    :returns: tuple of (version, sha256, prefix); the prefix is always None
        for PyPI sources (tag prefixes only apply to GitHub releases).
    """
    url = "{}/{}/json".format(INDEX, package)
    # Renamed from `json` -- that shadowed the imported `json` module.
    data = _fetch_page(url)

    # Skip versions for which every uploaded file has been yanked.
    versions = {
        version
        for version, releases in data["releases"].items()
        if not all(release["yanked"] for release in releases)
    }
    version = _determine_latest_version(current_version, target, versions)

    try:
        releases = data["releases"][version]
    except KeyError as e:
        raise KeyError(
            "Could not find version {} for {}".format(version, package)
        ) from e
    # Pick the first file whose name matches the expected extension.
    for release in releases:
        if release["filename"].endswith(extension):
            # TODO: In case of wheel we need to do further checks!
            sha256 = release["digests"]["sha256"]
            break
    else:
        sha256 = None
    return version, sha256, None
239
240
def _get_latest_version_github(attr_path, package, extension, current_version, target):
    """Get the latest release version and source hash from GitHub.

    Resolves the repository from the package's `src.meta.homepage`, picks
    the newest non-prerelease release allowed by `target`, and prefetches
    the source to compute its hash.

    :returns: tuple of (version, hash, tag prefix), where the prefix is the
        non-numeric leading part of the release tag (e.g. "v").
    :raises ValueError: when the homepage or a stable release can't be found.
    """

    def strip_prefix(tag):
        # "v1.2.3" -> "1.2.3": drop everything before the first digit.
        return re.sub("^[^0-9]*", "", tag)

    def get_prefix(string):
        # "v1.2.3" -> "v": the non-numeric prefix of the tag.
        matches = re.findall(r"^([^0-9]*)", string)
        return next(iter(matches), "")

    try:
        homepage = subprocess.check_output(
            [
                "nix",
                "eval",
                "-f",
                f"{NIXPKGS_ROOT}/default.nix",
                "--raw",
                f"{attr_path}.src.meta.homepage",
            ]
        ).decode("utf-8")
    except Exception as e:
        raise ValueError(f"Unable to determine homepage: {e}")
    # NOTE(review): assumes the homepage is exactly "https://github.com/<owner>/<repo>"
    # with no trailing slash -- the split below would fail otherwise.
    owner_repo = homepage[len("https://github.com/") :]  # remove prefix
    owner, repo = owner_repo.split("/")

    url = f"https://api.github.com/repos/{owner}/{repo}/releases"
    all_releases = _fetch_github(url)
    releases = list(filter(lambda x: not x["prerelease"], all_releases))

    if len(releases) == 0:
        raise ValueError(f"{homepage} does not contain any stable releases")

    versions = map(lambda x: strip_prefix(x["tag_name"]), releases)
    version = _determine_latest_version(current_version, target, versions)

    # Recover the release object (and thus the original tag) for the winner.
    release = next(filter(lambda x: strip_prefix(x["tag_name"]) == version, releases))
    prefix = get_prefix(release["tag_name"])

    # some attributes require using the fetchgit
    git_fetcher_args = []
    if _get_attr_value(f"{attr_path}.src.fetchSubmodules"):
        git_fetcher_args.append("--fetch-submodules")
    if _get_attr_value(f"{attr_path}.src.fetchLFS"):
        git_fetcher_args.append("--fetch-lfs")
    if _get_attr_value(f"{attr_path}.src.leaveDotGit"):
        git_fetcher_args.append("--leave-dotGit")

    if git_fetcher_args:
        # Git-specific features needed: prefetch the tag with nix-prefetch-git
        # and convert its reported sha256 into an SRI hash.
        algorithm = "sha256"
        cmd = [
            "nix-prefetch-git",
            f"https://github.com/{owner}/{repo}.git",
            "--hash",
            algorithm,
            "--rev",
            f"refs/tags/{release['tag_name']}",
        ]
        cmd.extend(git_fetcher_args)
        response = subprocess.check_output(cmd)
        document = json.loads(response.decode())
        hash = _hash_to_sri(algorithm, document[algorithm])
    else:
        # Plain tarball source: prefetch the release tarball URL.
        try:
            hash = (
                subprocess.check_output(
                    [
                        "nix-prefetch-url",
                        "--type",
                        "sha256",
                        "--unpack",
                        f"{release['tarball_url']}",
                    ],
                    stderr=subprocess.DEVNULL,
                )
                .decode("utf-8")
                .strip()
            )
        except (subprocess.CalledProcessError, UnicodeError):
            # this may fail if they have both a branch and a tag of the same name, attempt tag name
            tag_url = str(release["tarball_url"]).replace(
                "tarball", "tarball/refs/tags"
            )
            hash = (
                subprocess.check_output(
                    ["nix-prefetch-url", "--type", "sha256", "--unpack", tag_url],
                    stderr=subprocess.DEVNULL,
                )
                .decode("utf-8")
                .strip()
            )

    return version, hash, prefix
332
333
# Dispatch table: Nix fetcher name -> function resolving (version, hash, prefix).
# fetchurl is only accepted for PyPI-hosted URLs (enforced in _determine_extension).
FETCHERS = {
    "fetchFromGitHub": _get_latest_version_github,
    "fetchPypi": _get_latest_version_pypi,
    "fetchurl": _get_latest_version_pypi,
}


# Extension assumed when a fetchPypi expression specifies no format/extension.
DEFAULT_SETUPTOOLS_EXTENSION = "tar.gz"


# Maps the expression's `format` attribute to the sdist/wheel file extension.
FORMATS = {
    "setuptools": DEFAULT_SETUPTOOLS_EXTENSION,
    "wheel": "whl",
    "pyproject": "tar.gz",
    "flit": "tar.gz",
}
350
351
def _determine_fetcher(text):
    """Return the single fetcher used for `src` in the expression text.

    :raises ValueError: when zero fetchers, or more than one occurrence
        of a fetcher, are mentioned.
    """
    # Count occurrences per fetcher; duplicates of the same fetcher also
    # count as "multiple", matching the summed-count semantics.
    occurrences = {
        fetcher: text.count("src = {}".format(fetcher)) for fetcher in FETCHERS
    }
    total = sum(occurrences.values())
    if total == 0:
        raise ValueError("no fetcher.")
    if total > 1:
        raise ValueError("multiple fetchers.")
    return next(fetcher for fetcher, count in occurrences.items() if count)
366
367
def _determine_extension(text, fetcher):
    """Determine what extension is used in the expression.

    If we use:
    - fetchPypi, we check if format is specified.
    - fetchurl, we determine the extension from the url.
    - fetchFromGitHub we simply use `.tar.gz`.
    """
    if fetcher == "fetchPypi":

        def _maybe_attr(name):
            # Attribute is optional; absence is reported as None.
            try:
                return _get_unique_value(name, text)
            except ValueError:
                return None

        src_format = _maybe_attr("format")
        extension = _maybe_attr("extension")

        if extension is None:
            if src_format is None:
                src_format = "setuptools"
            elif src_format == "other":
                raise ValueError("Don't know how to update a format='other' package.")
            extension = FORMATS[src_format]

    elif fetcher == "fetchurl":
        url = _get_unique_value("url", text)
        extension = os.path.splitext(url)[1]
        if "pypi" not in url:
            raise ValueError("url does not point to PyPI.")

    elif fetcher == "fetchFromGitHub":
        extension = "tar.gz"

    return extension
404
405
def _update_package(path, target):
    """Update the Nix expression at `path` to the latest `target` version.

    :returns: False when the package is already up to date, otherwise a
        dict describing the update (consumed by `_commit`).
    :raises ValueError: when the package cannot be updated (no/ambiguous
        fetcher, all fetch attempts failed, downgrade, missing hash, ...).
    """
    # Read the expression
    with open(path, "r") as f:
        text = f.read()

    # Determine pname. Many files have more than one pname
    pnames = _get_values("pname", text)

    # Determine version.
    version = _get_unique_value("version", text)

    # First we check how many fetchers are mentioned.
    fetcher = _determine_fetcher(text)

    extension = _determine_extension(text, fetcher)

    # Attempt a fetch using each pname, e.g. backports-zoneinfo vs backports.zoneinfo
    successful_fetch = False
    for pname in pnames:
        # when invoked as an updateScript, UPDATE_NIX_ATTR_PATH will be set
        # this allows us to work with packages which live outside of python-modules
        attr_path = os.environ.get("UPDATE_NIX_ATTR_PATH", f"python3Packages.{pname}")

        if BULK_UPDATE and _skip_bulk_update(attr_path):
            raise ValueError(f"Bulk update skipped for {pname}")
        elif _get_attr_value(f"{attr_path}.cargoDeps") is not None:
            raise ValueError(f"Cargo dependencies are unsupported, skipping {pname}")
        try:
            new_version, new_sha256, prefix = FETCHERS[fetcher](
                attr_path, pname, extension, version, target
            )
            successful_fetch = True
            break
        except ValueError:
            # wrong pname guess; try the next candidate
            continue

    if not successful_fetch:
        raise ValueError(f"Unable to find correct package using these pnames: {pnames}")

    if new_version == version:
        logging.info("Path {}: no update available for {}.".format(path, pname))
        return False
    elif Version(new_version) <= Version(version):
        raise ValueError("downgrade for {}.".format(pname))
    if not new_sha256:
        raise ValueError("no file available for {}.".format(pname))

    text = _replace_value("version", new_version, text)

    # hashes from pypi are 16-bit encoded sha256's, normalize it to sri to avoid merge conflicts
    # sri hashes have been the default format since nix 2.4+
    sri_hash = _hash_to_sri("sha256", new_sha256)

    # retrieve the old output hash for a more precise match
    if old_hash := _get_attr_value(f"{attr_path}.src.outputHash"):
        # fetchers can specify a sha256, or a sri hash
        try:
            text = _replace_value("hash", sri_hash, text, old_hash)
        except ValueError:
            text = _replace_value("sha256", sri_hash, text, old_hash)
    else:
        raise ValueError(f"Unable to retrieve old hash for {pname}")

    if fetcher == "fetchFromGitHub":
        # in the case of fetchFromGitHub, it's common to see `rev = version;` or `rev = "v${version}";`
        # in which no string value is meant to be substituted. However, we can just overwrite the previous value.
        regex = r"(rev\s+=\s+[^;]*;)"
        regex = re.compile(regex)
        matches = regex.findall(text)
        n = len(matches)

        if n == 0:
            raise ValueError("Unable to find rev value for {}.".format(pname))
        else:
            # forcefully rewrite rev, incase tagging conventions changed for a release
            match = matches[0]
            text = text.replace(match, f'rev = "refs/tags/{prefix}${{version}}";')
            # incase there's no prefix, just rewrite without interpolation
            text = text.replace('"${version}";', "version;")

    # Write the updated expression back in place.
    with open(path, "w") as f:
        f.write(text)

    logging.info(
        "Path {}: updated {} from {} to {}".format(
            path, pname, version, new_version
        )
    )

    result = {
        "path": path,
        "target": target,
        "pname": pname,
        "old_version": version,
        "new_version": new_version,
        #'fetcher' : fetcher,
    }

    return result
505
506
507def _update(path, target):
508 # We need to read and modify a Nix expression.
509 if os.path.isdir(path):
510 path = os.path.join(path, "default.nix")
511
512 # If a default.nix does not exist, we quit.
513 if not os.path.isfile(path):
514 logging.info("Path {}: does not exist.".format(path))
515 return False
516
517 # If file is not a Nix expression, we quit.
518 if not path.endswith(".nix"):
519 logging.info("Path {}: does not end with `.nix`.".format(path))
520 return False
521
522 try:
523 return _update_package(path, target)
524 except ValueError as e:
525 logging.warning("Path {}: {}".format(path, e))
526 return False
527
528
def _commit(path, pname, old_version, new_version, pkgs_prefix="python: ", **kwargs):
    """Commit the updated expression at `path` with a nixpkgs-style message.

    :returns: True on success.
    :raises subprocess.SubprocessError: when the commit fails; the file is
        restored with `git checkout` first.
    """

    msg = f"{pkgs_prefix}{pname}: {old_version} -> {new_version}"

    # Include the changelog URL in the commit body when the package has one.
    if changelog := _get_attr_value(f"{pkgs_prefix}{pname}.meta.changelog"):
        msg += f"\n\n{changelog}"

    try:
        subprocess.check_call([GIT, "add", path])
        subprocess.check_call([GIT, "commit", "-m", msg])
    except subprocess.CalledProcessError as e:
        # Roll back the staged change so a failed commit leaves a clean tree.
        subprocess.check_call([GIT, "checkout", path])
        # BUG FIX: `CalledProcessError(msg)` is invalid -- its constructor
        # requires (returncode, cmd) -- so the old code raised a TypeError
        # here instead. SubprocessError accepts a plain message.
        raise subprocess.SubprocessError(f"Could not commit {path}") from e

    return True
545
546
def main():
    """Parse arguments, update every requested package, optionally commit."""
    epilog = """
environment variables:
  GITHUB_API_TOKEN\tGitHub API token used when updating github packages
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter, epilog=epilog
    )
    parser.add_argument("package", type=str, nargs="+")
    parser.add_argument("--target", type=str, choices=SEMVER.keys(), default="major")
    parser.add_argument(
        "--commit", action="store_true", help="Create a commit for each package update"
    )
    parser.add_argument(
        "--use-pkgs-prefix",
        action="store_true",
        help="Use python3Packages.${pname}: instead of python: ${pname}: when making commits",
    )

    args = parser.parse_args()
    target = args.target

    packages = [os.path.abspath(package) for package in args.package]

    # More than one package means a bulk update; packages may opt out of those.
    if len(packages) > 1:
        global BULK_UPDATE
        BULK_UPDATE = True

    logging.info("Updating packages...")

    # Use threads to update packages concurrently
    with Pool() as pool:
        updates = pool.map(lambda package: _update(package, target), packages)
    results = [result for result in updates if result]

    logging.info("Finished updating packages.")

    commit_options = {}
    if args.use_pkgs_prefix:
        logging.info("Using python3Packages. prefix for commits")
        commit_options["pkgs_prefix"] = "python3Packages."

    # Commits are created sequentially.
    if args.commit:
        logging.info("Committing updates...")
        for result in results:
            _commit(**result, **commit_options)
        logging.info("Finished committing updates")

    logging.info("{} package(s) updated".format(len(results)))
597
598
599if __name__ == "__main__":
600 main()