Clone of https://github.com/NixOS/nixpkgs.git (to stress-test knotserver)
at devShellTools-shell 269 lines 8.8 kB view raw
1#!/usr/bin/env nix-shell 2#!nix-shell -I nixpkgs=channel:nixpkgs-unstable -i python3 -p "python3.withPackages (ps: with ps; [ aiohttp packaging ])" -p git nurl pyright ruff isort 3 4import argparse 5import asyncio 6import json 7import os 8import re 9import sys 10from subprocess import check_output, run 11from typing import Dict, Final, List, Optional, Union 12 13import aiohttp 14from aiohttp import ClientSession 15from packaging.version import Version 16 17ROOT: Final = check_output([ 18 "git", 19 "rev-parse", 20 "--show-toplevel", 21]).decode().strip() 22 23 24def run_sync(cmd: List[str]) -> None: 25 print(f"$ {' '.join(cmd)}") 26 process = run(cmd) 27 28 if process.returncode != 0: 29 sys.exit(1) 30 31 32async def check_async(cmd: List[str]) -> str: 33 print(f"$ {' '.join(cmd)}") 34 process = await asyncio.create_subprocess_exec( 35 *cmd, 36 stdout=asyncio.subprocess.PIPE, 37 stderr=asyncio.subprocess.PIPE 38 ) 39 stdout, stderr = await process.communicate() 40 41 if process.returncode != 0: 42 error = stderr.decode() 43 raise RuntimeError(f"{cmd[0]} failed: {error}") 44 45 return stdout.decode().strip() 46 47 48async def run_async(cmd: List[str]): 49 print(f"$ {' '.join(cmd)}") 50 51 process = await asyncio.create_subprocess_exec( 52 *cmd, 53 stdout=asyncio.subprocess.PIPE, 54 stderr=asyncio.subprocess.PIPE, 55 ) 56 stdout, stderr = await process.communicate() 57 58 print(stdout.decode()) 59 60 if process.returncode != 0: 61 error = stderr.decode() 62 raise RuntimeError(f"{cmd[0]} failed: {error}") 63 64 65class File: 66 def __init__(self, path: str): 67 self.path = os.path.join(ROOT, path) 68 69 def __enter__(self): 70 with open(self.path, "r") as handle: 71 self.text = handle.read() 72 return self 73 74 def get_exact_match(self, attr: str, value: str): 75 matches = re.findall( 76 rf'{re.escape(attr)}\s+=\s+\"?{re.escape(value)}\"?', 77 self.text 78 ) 79 80 n = len(matches) 81 if n > 1: 82 raise ValueError(f"multiple occurrences found for {attr}={value}") 83 elif n == 1: 84 return matches.pop() 85 else: 86 raise ValueError(f"no occurrence found for {attr}={value}") 87 88 def substitute(self, attr: str, old_value: str, new_value: str) -> None: 89 old_line = self.get_exact_match(attr, old_value) 90 new_line = old_line.replace(old_value, new_value) 91 self.text = self.text.replace(old_line, new_line) 92 print(f"Substitute `{attr}` value `{old_value}` with `{new_value}`") 93 94 def __exit__(self, exc_type, exc_val, exc_tb): 95 with open(self.path, "w") as handle: 96 handle.write(self.text) 97 98class Nurl: 99 @classmethod 100 async def prefetch(cls, url: str, version: str, *extra_args: str) -> str: 101 cmd = [ 102 "nurl", 103 "--hash", 104 url, 105 version, 106 ] 107 cmd.extend(extra_args) 108 return await check_async(cmd) 109 110 111class Nix: 112 base_cmd: Final = [ 113 "nix", 114 "--show-trace", 115 "--extra-experimental-features", "nix-command" 116 ] 117 118 @classmethod 119 async def _run(cls, args: List[str]) -> Optional[str]: 120 return await check_async(cls.base_cmd + args) 121 122 @classmethod 123 async def eval(cls, expr: str) -> Union[List, Dict, int, float, str, bool]: 124 response = await cls._run([ 125 "eval", 126 "-f", f"{ROOT}/default.nix", 127 "--json", 128 expr 129 ]) 130 if response is None: 131 raise RuntimeError("Nix eval expression returned no response") 132 try: 133 return json.loads(response) 134 except (TypeError, ValueError): 135 raise RuntimeError("Nix eval response could not be parsed from JSON") 136 137 @classmethod 138 async def hash_to_sri(cls, algorithm: str, value: str) -> Optional[str]: 139 return await cls._run([ 140 "hash", 141 "to-sri", 142 "--type", algorithm, 143 value 144 ]) 145 146 147class HomeAssistant: 148 def __init__(self, session: ClientSession): 149 self._session = session 150 151 async def get_latest_core_version( 152 self, 153 owner: str = "home-assistant", 154 repo: str = "core" 155 ) -> str: 156 async with self._session.get( 157 f"https://api.github.com/repos/{owner}/{repo}/releases/latest" 158 ) as response: 159 document = await response.json() 160 try: 161 return str(document.get("name")) 162 except KeyError: 163 raise RuntimeError("No tag name in response document") 164 165 166 async def get_latest_frontend_version( 167 self, 168 core_version: str 169 ) -> str: 170 async with self._session.get( 171 f"https://raw.githubusercontent.com/home-assistant/core/{core_version}/homeassistant/components/frontend/manifest.json" 172 ) as response: 173 document = await response.json(content_type="text/plain") 174 175 requirements = [ 176 requirement 177 for requirement in document.get("requirements", []) 178 if requirement.startswith("home-assistant-frontend==") 179 ] 180 181 if len(requirements) > 1: 182 raise RuntimeError( 183 "Found more than one version specifier for the frontend package" 184 ) 185 elif len(requirements) == 1: 186 requirement = requirements.pop() 187 _, version = requirement.split("==", maxsplit=1) 188 return str(version) 189 else: 190 raise RuntimeError( 191 "Found no version specifier for frontend package" 192 ) 193 194 195 async def update_core(self, old_version: str, new_version: str) -> None: 196 old_sdist_hash = str(await Nix.eval("home-assistant.sdist.outputHash")) 197 new_sdist_hash = await Nurl.prefetch("https://pypi.org/project/homeassistant/", new_version) 198 print(f"sdist: {old_sdist_hash} -> {new_sdist_hash}") 199 200 old_git_hash = str(await Nix.eval("home-assistant.src.outputHash")) 201 new_git_hash = await Nurl.prefetch("https://github.com/home-assistant/core/", new_version) 202 print(f"git: {old_git_hash} -> {new_git_hash}") 203 204 with File("pkgs/servers/home-assistant/default.nix") as file: 205 file.substitute("hassVersion", old_version, new_version) 206 file.substitute("hash", old_sdist_hash, new_sdist_hash) 207 file.substitute("hash", old_git_hash, new_git_hash) 208 209 async def update_frontend(self, old_version: str, new_version: str) -> None: 210 old_hash = str(await Nix.eval("home-assistant.frontend.src.outputHash")) 211 new_hash = await Nurl.prefetch( 212 "https://pypi.org/project/home_assistant_frontend/", 213 new_version, 214 "-A", "format", "wheel", 215 "-A", "dist", "py3", 216 "-A", "python", "py3" 217 ) 218 print(f"frontend: {old_hash} -> {new_hash}") 219 220 with File("pkgs/servers/home-assistant/frontend.nix") as file: 221 file.substitute("version", old_version, new_version) 222 file.substitute("hash", old_hash, new_hash) 223 224 async def update_components(self): 225 await run_async([ 226 f"{ROOT}/pkgs/servers/home-assistant/update-component-packages.py" 227 ]) 228 229 230async def main(target_version: Optional[str] = None): 231 headers = {} 232 if token := os.environ.get("GITHUB_TOKEN", None): 233 headers.update({"GITHUB_TOKEN": token}) 234 235 async with aiohttp.ClientSession(headers=headers) as client: 236 hass = HomeAssistant(client) 237 238 core_current = str(await Nix.eval("home-assistant.version")) 239 core_latest = target_version or await hass.get_latest_core_version() 240 241 if Version(core_latest) > Version(core_current): 242 print(f"New Home Assistant version {core_latest} is available") 243 await hass.update_core(str(core_current), str(core_latest)) 244 245 frontend_current = str(await Nix.eval("home-assistant.frontend.version")) 246 frontend_latest = await hass.get_latest_frontend_version(str(core_latest)) 247 248 if Version(frontend_latest) > Version(frontend_current): 249 await hass.update_frontend(str(frontend_current), str(frontend_latest)) 250 251 await hass.update_components() 252 253 else: 254 print(f"Home Assistant {core_current} is still the latest version.") 255 256 # wait for async client sessions to close 257 # https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown 258 await asyncio.sleep(0) 259 260if __name__ == "__main__": 261 parser = argparse.ArgumentParser() 262 parser.add_argument("version", nargs="?") 263 args = parser.parse_args() 264 265 run_sync(["pyright", __file__]) 266 run_sync(["ruff", "check", "--ignore=E501", __file__]) 267 run_sync(["isort", __file__]) 268 269 asyncio.run(main(args.version))