lol

enpass: update update script

+58 -79
+58 -79
pkgs/tools/security/enpass/update_script.py
··· 1 - from __future__ import print_function 2 - 3 - 4 - import argparse 5 - import bz2 6 - import email 1 + #! /usr/bin/env nix-shell 2 + #! nix-shell -i python3 -p python3 python3.pkgs.packaging python3.pkgs.requests 3 + import gzip 7 4 import json 8 5 import logging 6 + import pathlib 7 + import re 8 + import subprocess 9 + import sys 9 10 10 - from itertools import product 11 - from operator import itemgetter 11 + from packaging import version 12 + import requests 12 13 13 - import attr 14 - import pkg_resources 14 + logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) 15 15 16 - from pathlib2 import Path 17 - from requests import Session 18 - from six.moves.urllib_parse import urljoin 16 + current_path = pathlib.Path(__file__).parent 17 + DATA_JSON = current_path.joinpath("data.json").resolve() 18 + logging.debug(f"Path to version file: {DATA_JSON}") 19 + last_new_version = None 19 20 21 + with open(DATA_JSON, "r") as versions_file: 22 + versions = json.load(versions_file) 20 23 21 - @attr.s 22 - class ReleaseElement(object): 23 - sha256 = attr.ib(repr=False) 24 - size = attr.ib(convert=int) 25 - path = attr.ib() 24 + def find_latest_version(arch): 25 + CHECK_URL = f'https://apt.enpass.io/dists/stable/main/binary-{arch}/Packages.gz' 26 + packages = gzip.decompress(requests.get(CHECK_URL).content).decode() 26 27 27 - log = logging.getLogger('enpass.updater') 28 + # Loop every package to find the newest one! 29 + version_selector = re.compile("Version: (?P<version>.+)") 30 + path_selector = re.compile("Filename: (?P<path>.+)") 31 + hash_selector = re.compile("SHA256: (?P<sha256>.+)") 32 + last_version = version.parse("0") 33 + for package in packages.split("\n\n"): 34 + matches = version_selector.search(package) 35 + matched_version = matches.group('version') if matches and matches.group('version') else "0" 36 + parsed_version = version.parse(matched_version) 37 + if parsed_version > last_version: 38 + path = path_selector.search(package).group('path') 39 + sha256 = hash_selector.search(package).group('sha256') 40 + last_version = parsed_version 41 + return {"path": path, "sha256": sha256, "version": matched_version} 28 42 43 + for arch in versions.keys(): 44 + current_version = versions[arch]['version'] 45 + logging.info(f"Current Version for {arch} is {current_version}") 46 + new_version = find_latest_version(arch) 29 47 30 - parser = argparse.ArgumentParser() 31 - parser.add_argument('--repo') 32 - parser.add_argument('--target', type=Path) 48 + if not new_version or new_version['version'] == current_version: 49 + continue 33 50 51 + last_current_version = current_version 52 + last_new_version = new_version 53 + logging.info(f"Update found ({arch}): enpass: {current_version} -> {new_version['version']}") 54 + versions[arch]['path'] = new_version['path'] 55 + versions[arch]['sha256'] = new_version['sha256'] 56 + versions[arch]['version'] = new_version['version'] 34 57 35 - session = Session() 36 58 37 - 38 - def parse_bz2_msg(msg): 39 - msg = bz2.decompress(msg) 40 - if '\n\n' in msg: 41 - parts = msg.split('\n\n') 42 - return list(map(email.message_from_string, parts)) 43 - return email.message_from_string(msg) 44 - 45 - 46 - def fetch_meta(repo, name, parse=email.message_from_string, split=False): 47 - url = urljoin(repo, 'dists/stable', name) 48 - response = session.get("{repo}/dists/stable/{name}".format(**locals())) 49 - return parse(response.content) 50 - 51 - 52 - def fetch_filehashes(repo, path): 53 - meta = fetch_meta(repo, path, parse=parse_bz2_msg) 54 - for item in meta: 55 - yield { 56 - 'version': pkg_resources.parse_version(str(item['Version'])), 57 - 'path': item['Filename'], 58 - 'sha256': item['sha256'], 59 - } 60 - 61 - 62 - def fetch_archs(repo): 63 - m = fetch_meta(repo, 'Release') 64 - 65 - architectures = m['Architectures'].split() 66 - elements = [ReleaseElement(*x.split()) for x in m['SHA256'].splitlines()] 67 - elements = [x for x in elements if x.path.endswith('bz2')] 68 - 69 - for arch, elem in product(architectures, elements): 70 - if arch in elem.path: 71 - yield arch, max(fetch_filehashes(repo, elem.path), 72 - key=itemgetter('version')) 73 - 59 + if not last_new_version: 60 + logging.info('#### No update found ####') 61 + sys.exit(0) 74 62 75 - class OurVersionEncoder(json.JSONEncoder): 76 - def default(self, obj): 77 - # the other way around to avoid issues with 78 - # newer setuptools having strict/legacy versions 79 - if not isinstance(obj, (dict, str)): 80 - return str(obj) 81 - return json.JSONEncoder.default(self, obj) 82 - 83 - 84 - def main(repo, target): 85 - logging.basicConfig(level=logging.DEBUG) 86 - with target.open(mode='wb') as fp: 87 - json.dump( 88 - dict(fetch_archs(repo)), fp, 89 - cls=OurVersionEncoder, 90 - indent=2, 91 - sort_keys=True) 63 + # write new versions back 64 + with open(DATA_JSON, "w") as versions_file: 65 + json.dump(versions, versions_file, indent=2) 66 + versions_file.write("\n") 92 67 68 + # Commit the result: 69 + logging.info("Committing changes...") 70 + commit_message = f"enpass: {last_current_version} -> {last_new_version['version']}" 71 + subprocess.run(['git', 'add', DATA_JSON], check=True) 72 + subprocess.run(['git', 'commit', '--file=-'], input=commit_message.encode(), check=True) 93 73 94 - opts = parser.parse_args() 95 - main(opts.repo, opts.target) 74 + logging.info("Done.")