1#!/usr/bin/env python3
2
3"""
4Update a Python package expression by passing in the `.nix` file, or the directory containing it.
5You can pass in multiple files or paths.
6
7You'll likely want to use
8``
9 $ ./update-python-libraries ../../pkgs/development/python-modules/**/default.nix
10``
11to update all non-pinned libraries in that folder.
12"""
13
14import argparse
15import logging
16import os
17import re
18import requests
19import toolz
20from concurrent.futures import ThreadPoolExecutor as Pool
21from packaging.version import Version as _Version
22from packaging.version import InvalidVersion
23from packaging.specifiers import SpecifierSet
24import collections
25import subprocess
26
27INDEX = "https://pypi.io/pypi"
28"""url of PyPI"""
29
30EXTENSIONS = ['tar.gz', 'tar.bz2', 'tar', 'zip', '.whl']
31"""Permitted file extensions. These are evaluated from left to right and the first occurance is returned."""
32
33PRERELEASES = False
34
35GIT = "git"
36
37import logging
38logging.basicConfig(level=logging.INFO)
39
40
41class Version(_Version, collections.abc.Sequence):
42
43 def __init__(self, version):
44 super().__init__(version)
45 # We cannot use `str(Version(0.04.21))` because that becomes `0.4.21`
46 # https://github.com/avian2/unidecode/issues/13#issuecomment-354538882
47 self.raw_version = version
48
49 def __getitem__(self, i):
50 return self._version.release[i]
51
52 def __len__(self):
53 return len(self._version.release)
54
55 def __iter__(self):
56 yield from self._version.release
57
58
59def _get_values(attribute, text):
60 """Match attribute in text and return all matches.
61
62 :returns: List of matches.
63 """
64 regex = '{}\s+=\s+"(.*)";'.format(attribute)
65 regex = re.compile(regex)
66 values = regex.findall(text)
67 return values
68
69def _get_unique_value(attribute, text):
70 """Match attribute in text and return unique match.
71
72 :returns: Single match.
73 """
74 values = _get_values(attribute, text)
75 n = len(values)
76 if n > 1:
77 raise ValueError("found too many values for {}".format(attribute))
78 elif n == 1:
79 return values[0]
80 else:
81 raise ValueError("no value found for {}".format(attribute))
82
83def _get_line_and_value(attribute, text):
84 """Match attribute in text. Return the line and the value of the attribute."""
85 regex = '({}\s+=\s+"(.*)";)'.format(attribute)
86 regex = re.compile(regex)
87 value = regex.findall(text)
88 n = len(value)
89 if n > 1:
90 raise ValueError("found too many values for {}".format(attribute))
91 elif n == 1:
92 return value[0]
93 else:
94 raise ValueError("no value found for {}".format(attribute))
95
96
97def _replace_value(attribute, value, text):
98 """Search and replace value of attribute in text."""
99 old_line, old_value = _get_line_and_value(attribute, text)
100 new_line = old_line.replace(old_value, value)
101 new_text = text.replace(old_line, new_line)
102 return new_text
103
104def _fetch_page(url):
105 r = requests.get(url)
106 if r.status_code == requests.codes.ok:
107 return r.json()
108 else:
109 raise ValueError("request for {} failed".format(url))
110
111
112SEMVER = {
113 'major' : 0,
114 'minor' : 1,
115 'patch' : 2,
116}
117
118
119def _determine_latest_version(current_version, target, versions):
120 """Determine latest version, given `target`.
121 """
122 current_version = Version(current_version)
123
124 def _parse_versions(versions):
125 for v in versions:
126 try:
127 yield Version(v)
128 except InvalidVersion:
129 pass
130
131 versions = _parse_versions(versions)
132
133 index = SEMVER[target]
134
135 ceiling = list(current_version[0:index])
136 if len(ceiling) == 0:
137 ceiling = None
138 else:
139 ceiling[-1]+=1
140 ceiling = Version(".".join(map(str, ceiling)))
141
142 # We do not want prereleases
143 versions = SpecifierSet(prereleases=PRERELEASES).filter(versions)
144
145 if ceiling is not None:
146 versions = SpecifierSet(f"<{ceiling}").filter(versions)
147
148 return (max(sorted(versions))).raw_version
149
150
151def _get_latest_version_pypi(package, extension, current_version, target):
152 """Get latest version and hash from PyPI."""
153 url = "{}/{}/json".format(INDEX, package)
154 json = _fetch_page(url)
155
156 versions = json['releases'].keys()
157 version = _determine_latest_version(current_version, target, versions)
158
159 try:
160 releases = json['releases'][version]
161 except KeyError as e:
162 raise KeyError('Could not find version {} for {}'.format(version, package)) from e
163 for release in releases:
164 if release['filename'].endswith(extension):
165 # TODO: In case of wheel we need to do further checks!
166 sha256 = release['digests']['sha256']
167 break
168 else:
169 sha256 = None
170 return version, sha256
171
172
173def _get_latest_version_github(package, extension, current_version, target):
174 raise ValueError("updating from GitHub is not yet supported.")
175
176
177FETCHERS = {
178 'fetchFromGitHub' : _get_latest_version_github,
179 'fetchPypi' : _get_latest_version_pypi,
180 'fetchurl' : _get_latest_version_pypi,
181}
182
183
184DEFAULT_SETUPTOOLS_EXTENSION = 'tar.gz'
185
186
187FORMATS = {
188 'setuptools' : DEFAULT_SETUPTOOLS_EXTENSION,
189 'wheel' : 'whl'
190}
191
192def _determine_fetcher(text):
193 # Count occurences of fetchers.
194 nfetchers = sum(text.count('src = {}'.format(fetcher)) for fetcher in FETCHERS.keys())
195 if nfetchers == 0:
196 raise ValueError("no fetcher.")
197 elif nfetchers > 1:
198 raise ValueError("multiple fetchers.")
199 else:
200 # Then we check which fetcher to use.
201 for fetcher in FETCHERS.keys():
202 if 'src = {}'.format(fetcher) in text:
203 return fetcher
204
205
206def _determine_extension(text, fetcher):
207 """Determine what extension is used in the expression.
208
209 If we use:
210 - fetchPypi, we check if format is specified.
211 - fetchurl, we determine the extension from the url.
212 - fetchFromGitHub we simply use `.tar.gz`.
213 """
214 if fetcher == 'fetchPypi':
215 try:
216 src_format = _get_unique_value('format', text)
217 except ValueError as e:
218 src_format = None # format was not given
219
220 try:
221 extension = _get_unique_value('extension', text)
222 except ValueError as e:
223 extension = None # extension was not given
224
225 if extension is None:
226 if src_format is None:
227 src_format = 'setuptools'
228 elif src_format == 'flit':
229 raise ValueError("Don't know how to update a Flit package.")
230 elif src_format == 'other':
231 raise ValueError("Don't know how to update a format='other' package.")
232 elif src_format == 'pyproject':
233 raise ValueError("Don't know how to update a pyproject package.")
234 extension = FORMATS[src_format]
235
236 elif fetcher == 'fetchurl':
237 url = _get_unique_value('url', text)
238 extension = os.path.splitext(url)[1]
239 if 'pypi' not in url:
240 raise ValueError('url does not point to PyPI.')
241
242 elif fetcher == 'fetchFromGitHub':
243 raise ValueError('updating from GitHub is not yet implemented.')
244
245 return extension
246
247
248def _update_package(path, target):
249
250 # Read the expression
251 with open(path, 'r') as f:
252 text = f.read()
253
254 # Determine pname.
255 pname = _get_unique_value('pname', text)
256
257 # Determine version.
258 version = _get_unique_value('version', text)
259
260 # First we check how many fetchers are mentioned.
261 fetcher = _determine_fetcher(text)
262
263 extension = _determine_extension(text, fetcher)
264
265 new_version, new_sha256 = FETCHERS[fetcher](pname, extension, version, target)
266
267 if new_version == version:
268 logging.info("Path {}: no update available for {}.".format(path, pname))
269 return False
270 elif Version(new_version) <= Version(version):
271 raise ValueError("downgrade for {}.".format(pname))
272 if not new_sha256:
273 raise ValueError("no file available for {}.".format(pname))
274
275 text = _replace_value('version', new_version, text)
276 text = _replace_value('sha256', new_sha256, text)
277
278 with open(path, 'w') as f:
279 f.write(text)
280
281 logging.info("Path {}: updated {} from {} to {}".format(path, pname, version, new_version))
282
283 result = {
284 'path' : path,
285 'target': target,
286 'pname': pname,
287 'old_version' : version,
288 'new_version' : new_version,
289 #'fetcher' : fetcher,
290 }
291
292 return result
293
294
295def _update(path, target):
296
297 # We need to read and modify a Nix expression.
298 if os.path.isdir(path):
299 path = os.path.join(path, 'default.nix')
300
301 # If a default.nix does not exist, we quit.
302 if not os.path.isfile(path):
303 logging.info("Path {}: does not exist.".format(path))
304 return False
305
306 # If file is not a Nix expression, we quit.
307 if not path.endswith(".nix"):
308 logging.info("Path {}: does not end with `.nix`.".format(path))
309 return False
310
311 try:
312 return _update_package(path, target)
313 except ValueError as e:
314 logging.warning("Path {}: {}".format(path, e))
315 return False
316
317
318def _commit(path, pname, old_version, new_version, pkgs_prefix="python: ", **kwargs):
319 """Commit result.
320 """
321
322 msg = f'{pkgs_prefix}{pname}: {old_version} -> {new_version}'
323
324 try:
325 subprocess.check_call([GIT, 'add', path])
326 subprocess.check_call([GIT, 'commit', '-m', msg])
327 except subprocess.CalledProcessError as e:
328 subprocess.check_call([GIT, 'checkout', path])
329 raise subprocess.CalledProcessError(f'Could not commit {path}') from e
330
331 return True
332
333
334def main():
335
336 parser = argparse.ArgumentParser()
337 parser.add_argument('package', type=str, nargs='+')
338 parser.add_argument('--target', type=str, choices=SEMVER.keys(), default='major')
339 parser.add_argument('--commit', action='store_true', help='Create a commit for each package update')
340 parser.add_argument('--use-pkgs-prefix', action='store_true', help='Use python3Packages.${pname}: instead of python: ${pname}: when making commits')
341
342 args = parser.parse_args()
343 target = args.target
344
345 packages = list(map(os.path.abspath, args.package))
346
347 logging.info("Updating packages...")
348
349 # Use threads to update packages concurrently
350 with Pool() as p:
351 results = list(filter(bool, p.map(lambda pkg: _update(pkg, target), packages)))
352
353 logging.info("Finished updating packages.")
354
355 commit_options = {}
356 if args.use_pkgs_prefix:
357 logging.info("Using python3Packages. prefix for commits")
358 commit_options["pkgs_prefix"] = "python3Packages."
359
360 # Commits are created sequentially.
361 if args.commit:
362 logging.info("Committing updates...")
363 # list forces evaluation
364 list(map(lambda x: _commit(**x, **commit_options), results))
365 logging.info("Finished committing updates")
366
367 count = len(results)
368 logging.info("{} package(s) updated".format(count))
369
370
371
372if __name__ == '__main__':
373 main()