1# python library used to update plugins:
2# - pkgs/applications/editors/vim/plugins/update.py
3# - pkgs/applications/editors/kakoune/plugins/update.py
4# - pkgs/development/lua-modules/updater/updater.py
5
6# format:
7# $ nix run nixpkgs#ruff maintainers/scripts/pluginupdate.py
8# type-check:
9# $ nix run nixpkgs#python3.pkgs.mypy maintainers/scripts/pluginupdate.py
10# linted:
11# $ nix run nixpkgs#python3.pkgs.flake8 -- --ignore E501,E265 maintainers/scripts/pluginupdate.py
12
13import argparse
14import csv
15import functools
16import http
17import json
18import logging
19import os
20import re
21import subprocess
22import sys
23import time
24import traceback
25import urllib.error
26import urllib.parse
27import urllib.request
28import xml.etree.ElementTree as ET
29from dataclasses import asdict, dataclass
30from datetime import UTC, datetime
31from functools import wraps
32from multiprocessing.dummy import Pool
33from pathlib import Path
34from tempfile import NamedTemporaryFile
35from typing import Any, Callable
36from urllib.parse import urljoin, urlparse
37
38import git
39
# Fully-qualified Atom tag names used when parsing GitHub commit feeds.
ATOM_ENTRY = "{http://www.w3.org/2005/Atom}entry"  # " vim gets confused here
ATOM_LINK = "{http://www.w3.org/2005/Atom}link"  # "
ATOM_UPDATED = "{http://www.w3.org/2005/Atom}updated"  # "

# Maps level names ("DEBUG", "INFO", ...) to logging constants; used as the
# choices/default of the `--debug` command line flag.
LOG_LEVELS = {
    logging.getLevelName(level): level
    for level in [logging.DEBUG, logging.INFO, logging.WARN, logging.ERROR]
}

# Module-wide logger (the root logger; configured in Editor.run()).
log = logging.getLogger()
50
51
def retry(ExceptionToCheck: Any, tries: int = 4, delay: float = 3, backoff: float = 2):
    """Retry calling the decorated function using an exponential backoff.

    http://www.saltycrane.com/blog/2009/11/trying-out-retry-decorator-python/
    original from: http://wiki.python.org/moin/PythonDecoratorLibrary#Retry
    (BSD licensed)

    :param ExceptionToCheck: the exception on which to retry
    :param tries: number of times to try (not retry) before giving up
    :param delay: initial delay between retries in seconds
    :param backoff: backoff multiplier e.g. value of 2 will double the delay
        each retry
    """

    def deco_retry(f: Callable) -> Callable:
        @wraps(f)
        def f_retry(*args: Any, **kwargs: Any) -> Any:
            wait = delay
            # tries - 1 guarded attempts, then one final unguarded attempt
            # so the last failure propagates to the caller.
            for _attempt in range(tries - 1):
                try:
                    return f(*args, **kwargs)
                except ExceptionToCheck as e:
                    print(f"{str(e)}, Retrying in {wait} seconds...")
                    time.sleep(wait)
                    wait *= backoff
            return f(*args, **kwargs)

        return f_retry  # true decorator

    return deco_retry
81
82
@dataclass
class FetchConfig:
    """Options controlling how plugins are fetched."""

    # Number of worker processes used when prefetching plugins in parallel.
    proc: int
    # GitHub API token; sent as an Authorization header to raise rate limits.
    github_token: str
87
88
def make_request(url: str, token=None) -> urllib.request.Request:
    """Build a GET request for `url`, attaching a GitHub token header when given."""
    if token is None:
        return urllib.request.Request(url)
    return urllib.request.Request(url, headers={"Authorization": f"token {token}"})
94
95
# a dictionary of plugins and their new repositories
# (maps the old PluginDesc to the Repo its repository redirected to)
Redirects = dict["PluginDesc", "Repo"]
98
99
class Repo:
    """A generic git repository, prefetched with `nix-prefetch-git`."""

    def __init__(self, uri: str, branch: str) -> None:
        # Url to the repo
        self.uri = uri
        self._branch = branch
        # Redirect is the new Repo to use (filled in when a redirect is seen)
        self.redirect: "Repo | None" = None
        self.token = "dummy_token"

    @property
    def name(self):
        """Last path component of the URI."""
        return self.uri.strip("/").split("/")[-1]

    @property
    def branch(self):
        """The configured branch, defaulting to HEAD when unset."""
        return self._branch or "HEAD"

    def __str__(self) -> str:
        return f"{self.uri}"

    def __repr__(self) -> str:
        return f"Repo({self.name}, {self.uri})"

    @retry(urllib.error.URLError, tries=4, delay=3, backoff=2)
    def has_submodules(self) -> bool:
        # Generic repos are always fetched with submodules enabled.
        return True

    @retry(urllib.error.URLError, tries=4, delay=3, backoff=2)
    def latest_commit(self) -> tuple[str, datetime]:
        """Return (revision, commit date) of the tip of the default ref."""
        log.debug("Latest commit")
        prefetched = self._prefetch(None)
        when = datetime.strptime(prefetched["date"], "%Y-%m-%dT%H:%M:%S%z")
        return prefetched["rev"], when

    def _prefetch(self, ref: str | None):
        """Run nix-prefetch-git for `ref` and return its decoded JSON output."""
        command = ["nix-prefetch-git", "--quiet", "--fetch-submodules", self.uri]
        if ref is not None:
            command.append(ref)
        log.debug(command)
        return json.loads(subprocess.check_output(command))

    def prefetch(self, ref: str | None) -> str:
        """Return the sha256 of the repository contents at `ref`."""
        log.info("Prefetching %s", self.uri)
        return self._prefetch(ref)["sha256"]

    def as_nix(self, plugin: "Plugin") -> str:
        """Render a `fetchgit` nix expression pinned to the plugin's commit."""
        return f"""fetchgit {{
      url = "{self.uri}";
      rev = "{plugin.commit}";
      sha256 = "{plugin.sha256}";
    }}"""
155
156
class RepoGitHub(Repo):
    """A repository hosted on github.com.

    Avoids full clones by probing the web UI for submodules, reading the
    branch's Atom feed for the latest commit, and hashing release tarballs
    with nix-prefetch-url where possible.
    """

    def __init__(self, owner: str, repo: str, branch: str) -> None:
        self.owner = owner
        self.repo = repo
        # Optional GitHub API token; overwritten by the PluginDesc loaders.
        self.token = None
        """Url to the repo"""
        super().__init__(self.url(""), branch)
        log.debug(
            "Instantiating github repo owner=%s and repo=%s", self.owner, self.repo
        )

    @property
    def name(self):
        # Repository name alone, without the owner prefix.
        return self.repo

    def url(self, path: str) -> str:
        """Join `path` onto this repository's https://github.com/... URL."""
        res = urljoin(f"https://github.com/{self.owner}/{self.repo}/", path)
        return res

    @retry(urllib.error.URLError, tries=4, delay=3, backoff=2)
    def has_submodules(self) -> bool:
        """Check whether a .gitmodules file exists on the branch.

        Probes the blob URL: HTTP 404 means no submodules; any other HTTP
        error is re-raised (URLErrors are retried by the decorator).
        """
        try:
            req = make_request(self.url(f"blob/{self.branch}/.gitmodules"), self.token)
            urllib.request.urlopen(req, timeout=10).close()
        except urllib.error.HTTPError as e:
            if e.code == 404:
                return False
            else:
                raise
        return True

    @retry(urllib.error.URLError, tries=4, delay=3, backoff=2)
    def latest_commit(self) -> tuple[str, datetime]:
        """Return (commit sha, update time) of the newest commit on the branch.

        Parses the branch's Atom feed; as a side effect records any
        repository redirect encountered while performing the request.
        """
        commit_url = self.url(f"commits/{self.branch}.atom")
        log.debug("Sending request to %s", commit_url)
        commit_req = make_request(commit_url, self.token)
        with urllib.request.urlopen(commit_req, timeout=10) as req:
            self._check_for_redirect(commit_url, req)
            xml = req.read()

            # Filter out illegal XML characters
            illegal_xml_regex = re.compile(b"[\x00-\x08\x0b-\x0c\x0e-\x1f\x7f]")
            xml = illegal_xml_regex.sub(b"", xml)

            root = ET.fromstring(xml)
            latest_entry = root.find(ATOM_ENTRY)
            assert latest_entry is not None, f"No commits found in repository {self}"
            commit_link = latest_entry.find(ATOM_LINK)
            assert commit_link is not None, f"No link tag found feed entry {xml}"
            url = urlparse(commit_link.get("href"))
            updated_tag = latest_entry.find(ATOM_UPDATED)
            assert (
                updated_tag is not None and updated_tag.text is not None
            ), f"No updated tag found feed entry {xml}"
            updated = datetime.strptime(updated_tag.text, "%Y-%m-%dT%H:%M:%SZ")
            # The commit sha is the last path component of the entry's link.
            return Path(str(url.path)).name, updated

    def _check_for_redirect(self, url: str, req: http.client.HTTPResponse):
        """Record a redirected RepoGitHub in self.redirect when the
        response URL differs from the requested one."""
        response_url = req.geturl()
        if url != response_url:
            new_owner, new_name = (
                urllib.parse.urlsplit(response_url).path.strip("/").split("/")[:2]
            )

            new_repo = RepoGitHub(owner=new_owner, repo=new_name, branch=self.branch)
            self.redirect = new_repo

    def prefetch(self, commit: str) -> str:
        """Prefer the cheap tarball hash; fall back to a full git prefetch
        when submodules must be included."""
        if self.has_submodules():
            sha256 = super().prefetch(commit)
        else:
            sha256 = self.prefetch_github(commit)
        return sha256

    def prefetch_github(self, ref: str) -> str:
        """Hash the GitHub archive tarball for `ref` via nix-prefetch-url."""
        cmd = ["nix-prefetch-url", "--unpack", self.url(f"archive/{ref}.tar.gz")]
        log.debug("Running %s", cmd)
        data = subprocess.check_output(cmd)
        return data.strip().decode("utf-8")

    def as_nix(self, plugin: "Plugin") -> str:
        """Render a `fetchFromGitHub` nix expression for this plugin."""
        if plugin.has_submodules:
            submodule_attr = "\n      fetchSubmodules = true;"
        else:
            submodule_attr = ""

        return f"""fetchFromGitHub {{
      owner = "{self.owner}";
      repo = "{self.repo}";
      rev = "{plugin.commit}";
      sha256 = "{plugin.sha256}";{submodule_attr}
    }}"""
249
250
@dataclass(frozen=True)
class PluginDesc:
    """Description of one entry in the plugin index (CSV) file."""

    repo: Repo
    branch: str
    alias: str | None

    @property
    def name(self):
        """The alias when present, otherwise the repository name."""
        return self.alias or self.repo.name

    @staticmethod
    def load_from_csv(config: FetchConfig, row: dict[str, str]) -> "PluginDesc":
        """Build a PluginDesc from one CSV row (columns: repo, branch, alias)."""
        log.debug("Loading row %s", row)
        branch = row["branch"].strip()
        repo = make_repo(row["repo"], branch)
        repo.token = config.github_token
        # alias is usually an empty string in the CSV; normalize to None
        return PluginDesc(repo, branch, row["alias"] or None)

    @staticmethod
    def load_from_string(config: FetchConfig, line: str) -> "PluginDesc":
        """Parse a spec such as 'owner/repo', 'uri@branch' or 'uri as alias'."""
        alias = None
        uri, branch = line, "HEAD"
        if " as " in uri:
            uri, alias = uri.split(" as ")
            alias = alias.strip()
        if "@" in uri:
            uri, branch = uri.split("@")
        branch = branch.strip()
        repo = make_repo(uri.strip(), branch)
        repo.token = config.github_token
        return PluginDesc(repo, branch, alias)
287
288
289@dataclass
290class Plugin:
291 name: str
292 commit: str
293 has_submodules: bool
294 sha256: str
295 date: datetime | None = None
296
297 @property
298 def normalized_name(self) -> str:
299 return self.name.replace(".", "-")
300
301 @property
302 def version(self) -> str:
303 assert self.date is not None
304 return self.date.strftime("%Y-%m-%d")
305
306 def as_json(self) -> dict[str, str]:
307 copy = self.__dict__.copy()
308 del copy["date"]
309 return copy
310
311
def load_plugins_from_csv(
    config: FetchConfig,
    input_file: Path,
) -> list[PluginDesc]:
    """Read the plugin CSV index at `input_file` into PluginDesc objects.

    :param config: fetch configuration, forwarded to the Repo objects
    :param input_file: CSV file with `repo,branch,alias` columns
    """
    log.debug("Load plugins from csv %s", input_file)
    with open(input_file, newline="") as csvfile:
        # fixed: this used to log "Writing into ..." here, a copy-paste from
        # rewrite_input -- this function only reads the file
        reader = csv.DictReader(csvfile)
        return [PluginDesc.load_from_csv(config, row) for row in reader]
328
329
def run_nix_expr(expr, nixpkgs: str, **args):
    """Evaluate `expr` with `nix eval` and return the parsed JSON result.

    :param expr nix expression to fetch current plugins
    :param nixpkgs Path towards a nixpkgs checkout
    """
    with CleanEnvironment(nixpkgs) as nix_path:
        command = [
            "nix", "eval",
            "--extra-experimental-features", "nix-command",
            "--impure",
            "--json",
            "--expr", expr,
            "--nix-path", nix_path,
        ]
        log.debug("Running command: %s", " ".join(command))
        return json.loads(subprocess.check_output(command, **args))
352
353
class Editor:
    """The configuration of the update script.

    Subclasses specialize the update process for one editor (vim, kakoune,
    lua, ...), mainly by overriding `generate_nix` and `update`.
    """

    def __init__(
        self,
        name: str,
        root: Path,
        get_plugins: str,
        default_in: Path | None = None,
        default_out: Path | None = None,
        deprecated: Path | None = None,
        cache_file: str | None = None,
    ):
        """
        :param name: editor name; used to derive default file names and the
            nix attribute path (`<name>Plugins`)
        :param root: directory containing the plugin files of this editor
        :param get_plugins: nix expression evaluating to the plugin set
        """
        # fixed: the message had no %s placeholder, so the logging module
        # raised a formatting error instead of printing the expression
        log.debug("get_plugins: %s", get_plugins)
        self.name = name
        self.root = root
        self.get_plugins = get_plugins
        self.default_in = default_in or root.joinpath(f"{name}-plugin-names")
        self.default_out = default_out or root.joinpath("generated.nix")
        self.deprecated = deprecated or root.joinpath("deprecated.json")
        self.cache_file = cache_file or f"{name}-plugin-cache.json"
        self.nixpkgs_repo = None

    def add(self, args):
        """Handle the `add` subcommand: append the given plugins to the CSV
        index, prefetch them, regenerate the nix file and optionally commit."""
        log.debug("called the 'add' command")
        fetch_config = FetchConfig(args.proc, args.github_token)
        for plugin_line in args.add_plugins:
            log.debug("using plugin_line %s", plugin_line)
            pdesc = PluginDesc.load_from_string(fetch_config, plugin_line)
            log.debug("loaded as pdesc %s", pdesc)
            append = [pdesc]
            self.rewrite_input(
                fetch_config, args.input_file, self.deprecated, append=append
            )
            plugin, _ = prefetch_plugin(pdesc)

            if (  # lua updater doesn't support updating individual plugin
                self.name != "lua"
            ):
                # update generated.nix
                update = self.get_update(
                    args.input_file,
                    args.outfile,
                    fetch_config,
                    [plugin.normalized_name],
                )
                update()

            autocommit = not args.no_commit
            if autocommit:
                commit(
                    self.nixpkgs_repo,
                    "{drv_name}: init at {version}".format(
                        drv_name=self.get_drv_name(plugin.normalized_name),
                        version=plugin.version,
                    ),
                    [args.outfile, args.input_file],
                )

    # Expects arguments generated by 'update' subparser
    def update(self, args):
        """Handle the `update` subcommand; meant to be overridden."""
        print("the update member function should be overridden in subclasses")

    def get_current_plugins(
        self, config: FetchConfig, nixpkgs: str
    ) -> list[tuple[PluginDesc, Plugin]]:
        """Query nixpkgs for the currently generated plugins (to fill the cache)."""
        data = run_nix_expr(self.get_plugins, nixpkgs)
        plugins = []
        for name, attr in data.items():
            checksum = attr["checksum"]

            # versions are date-based; extract the date part
            # https://github.com/NixOS/nixpkgs/blob/8a335419/pkgs/applications/editors/neovim/build-neovim-plugin.nix#L36
            # https://github.com/NixOS/nixpkgs/pull/344478#discussion_r1786646055
            version = re.search(r"\d\d\d\d-\d\d?-\d\d?", attr["version"])
            if version is None:
                raise ValueError(f"Cannot parse version: {attr['version']}")
            date = datetime.strptime(version.group(), "%Y-%m-%d")

            pdesc = PluginDesc.load_from_string(config, f'{attr["homePage"]} as {name}')
            p = Plugin(
                attr["pname"],
                checksum["rev"],
                checksum["submodules"],
                checksum["sha256"],
                date,
            )

            plugins.append((pdesc, p))
        return plugins

    def load_plugin_spec(self, config: FetchConfig, plugin_file) -> list[PluginDesc]:
        """Parse the CSV spec file into plugin descriptions."""
        return load_plugins_from_csv(config, plugin_file)

    def generate_nix(self, _plugins, _outfile: str):
        """Returns nothing for now, writes directly to outfile"""
        raise NotImplementedError()

    def filter_plugins_to_update(
        self, plugin: PluginDesc, to_update: list[str]
    ) -> bool:
        """Function for filtering out plugins, that user doesn't want to update.

        It is mainly used for updating only specific plugins, not all of them.
        By default it filters out plugins not present in `to_update`,
        assuming `to_update` is a list of plugin names (the same as in the
        result expression).

        This function is never called if `to_update` is empty.
        Feel free to override this function in derived classes.

        Note:
            Known bug: you have to use a deprecated name, instead of new one.
            This is because we resolve deprecations later and can't get new
            plugin URL before we request info about it.

            Although, we could parse deprecated.json, but it's a whole bunch
            of spaghetti code, which I don't want to write.

        Arguments:
            plugin: Plugin on which you decide whether to ignore or not.
            to_update:
                List of strings passed to via the `--update` command line parameter.
                By default, we assume it is a list of URIs identical to what
                is in the input file.

        Returns:
            True if we should update plugin and False if not.
        """
        return plugin.name.replace(".", "-") in to_update

    def get_update(
        self,
        input_file: str,
        output_file: str,
        config: FetchConfig,
        to_update: list[str] | None,
    ):
        """Return a closure performing the actual update; the closure returns
        the repository redirects it discovered."""
        if to_update is None:
            to_update = []

        # NOTE: self.nixpkgs is assigned in run() before any update executes
        current_plugins = self.get_current_plugins(config, self.nixpkgs)
        current_plugin_specs = self.load_plugin_spec(config, input_file)

        cache: Cache = Cache(
            [plugin for _description, plugin in current_plugins], self.cache_file
        )
        _prefetch = functools.partial(prefetch, cache=cache)

        plugins_to_update = (
            current_plugin_specs
            if len(to_update) == 0
            else [
                description
                for description in current_plugin_specs
                if self.filter_plugins_to_update(description, to_update)
            ]
        )

        def update() -> Redirects:
            if len(plugins_to_update) == 0:
                log.error(
                    "\n\n\n\nIt seems like you provided some arguments to `--update`:\n"
                    + ", ".join(to_update)
                    + "\nBut after filtering, the result list of plugins is empty\n"
                    "\n"
                    "Are you sure you provided the same URIs as in your input file?\n"
                    "(" + str(input_file) + ")\n\n"
                )
                return {}

            try:
                # fixed: the pool is now a context manager, so its worker
                # threads are cleaned up even when a prefetch raises
                with Pool(processes=config.proc) as pool:
                    results = pool.map(_prefetch, plugins_to_update)
            finally:
                # persist whatever was fetched, even on failure
                cache.store()

            print(f"{len(results)} of {len(current_plugins)} were checked")
            # Do only partial update of out file
            if len(results) != len(current_plugins):
                results = self.merge_results(current_plugins, results)
            plugins, redirects = check_results(results)

            plugins = sorted(plugins, key=lambda v: v[1].normalized_name)
            self.generate_nix(plugins, output_file)

            return redirects

        return update

    def merge_results(
        self,
        current: list[tuple[PluginDesc, Plugin]],
        fetched: list[tuple[PluginDesc, Exception | Plugin, Repo | None]],
    ) -> list[tuple[PluginDesc, Exception | Plugin, Repo | None]]:
        """Overlay freshly fetched plugins on top of the current ones."""
        # transforming this to dict, so lookup is O(1) instead of O(n) (n is len(current))
        result: dict[str, tuple[PluginDesc, Exception | Plugin, Repo | None]] = {
            # also adding redirect (third item in the result tuple)
            pl.normalized_name: (pdesc, pl, None)
            for pdesc, pl in current
        }

        for plugin_desc, plugin, redirect in fetched:
            if isinstance(plugin, Plugin):
                # fixed: a Plugin instance always has normalized_name, so the
                # extra hasattr() check here was redundant
                result[plugin.normalized_name] = (plugin_desc, plugin, redirect)
            elif isinstance(plugin, Exception):
                # For exceptions, we can't determine the normalized_name
                # Just log the error and continue
                log.error(f"Error fetching plugin {plugin_desc.name}: {plugin!r}")
            else:
                # For unexpected types, log the issue
                log.error(
                    f"Unexpected plugin type for {plugin_desc.name}: {type(plugin)}"
                )

        return list(result.values())

    @property
    def attr_path(self):
        """Nix attribute set holding this editor's plugins, e.g. `vimPlugins`."""
        return self.name + "Plugins"

    def get_drv_name(self, name: str):
        """Full attribute path of a single plugin derivation."""
        return self.attr_path + "." + name

    def rewrite_input(self, *args, **kwargs):
        """Thin overridable wrapper around the module-level rewrite_input."""
        return rewrite_input(*args, **kwargs)

    def create_parser(self):
        """Build the argparse CLI with `add` and `update` subcommands."""
        common = argparse.ArgumentParser(
            add_help=False,
            description=(
                f"""
                Updates nix derivations for {self.name} plugins.\n
                By default from {self.default_in} to {self.default_out}"""
            ),
        )
        common.add_argument(
            "--nixpkgs",
            type=str,
            default=os.getcwd(),
            # fixed: this help string was copy-pasted from --debug
            help="Path to the nixpkgs checkout to read current plugins from",
        )
        common.add_argument(
            "--input-names",
            "-i",
            dest="input_file",
            type=Path,
            default=self.default_in,
            help="A list of plugins in the form owner/repo",
        )
        common.add_argument(
            "--out",
            "-o",
            dest="outfile",
            default=self.default_out,
            type=Path,
            help="Filename to save generated nix code",
        )
        common.add_argument(
            "--proc",
            "-p",
            dest="proc",
            type=int,
            default=30,
            help="Number of concurrent processes to spawn. Setting --github-token allows higher values.",
        )
        common.add_argument(
            "--github-token",
            "-t",
            type=str,
            default=os.getenv("GITHUB_TOKEN"),
            help="""Allows to set --proc to higher values.
            Uses GITHUB_TOKEN environment variables as the default value.""",
        )
        common.add_argument(
            "--no-commit",
            "-n",
            action="store_true",
            default=False,
            help="Whether to autocommit changes",
        )
        common.add_argument(
            "--debug",
            "-d",
            choices=LOG_LEVELS.keys(),
            default=logging.getLevelName(logging.WARN),
            help="Adjust log level",
        )

        main = argparse.ArgumentParser(
            parents=[common],
            description=(
                f"""
                Updates nix derivations for {self.name} plugins.\n
                By default from {self.default_in} to {self.default_out}"""
            ),
        )

        subparsers = main.add_subparsers(dest="command", required=False)
        padd = subparsers.add_parser(
            "add",
            parents=[],
            description="Add new plugin",
            add_help=False,
        )
        padd.set_defaults(func=self.add)
        padd.add_argument(
            "add_plugins",
            default=None,
            nargs="+",
            help=f"Plugin to add to {self.attr_path} from Github in the form owner/repo",
        )

        pupdate = subparsers.add_parser(
            "update",
            description="Update all or a subset of existing plugins",
            add_help=False,
        )
        pupdate.add_argument(
            "update_only",
            default=None,
            nargs="*",
            help="Plugin URLs to update (must be the same as in the input file)",
        )
        pupdate.set_defaults(func=self.update)
        return main

    def run(
        self,
    ):
        """
        Convenience function: parse CLI arguments, configure logging and
        dispatch to the selected subcommand (default: update).
        """
        parser = self.create_parser()
        args = parser.parse_args()
        command = args.command or "update"
        logging.basicConfig()
        log.setLevel(LOG_LEVELS[args.debug])
        log.info("Chose to run command: %s", command)
        self.nixpkgs = args.nixpkgs

        self.nixpkgs_repo = git.Repo(args.nixpkgs, search_parent_directories=True)

        getattr(self, command)(args)
701
702
class CleanEnvironment(object):
    """Context manager yielding a nix-path entry for a nixpkgs checkout.

    Snapshots the process environment on entry and restores it on exit;
    also keeps an empty temporary config file alive for the duration.
    """

    def __init__(self, nixpkgs):
        self.local_pkgs = nixpkgs

    def __enter__(self) -> str:
        """
        local_pkgs = str(Path(__file__).parent.parent.parent)
        """
        self._saved_env = self.old_environ = os.environ.copy()
        stub = NamedTemporaryFile()
        stub.write(b"{}")
        stub.flush()
        self.empty_config = stub
        return f"localpkgs={self.local_pkgs}"

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        os.environ.update(self.old_environ)
        self.empty_config.close()
720
721
def prefetch_plugin(
    p: PluginDesc,
    cache: "Cache | None" = None,
) -> tuple[Plugin, Repo | None]:
    """Resolve a PluginDesc to a concrete Plugin.

    Fetches the latest commit of the plugin's repository and prefetches its
    hash, reusing a cached prefetch when `cache` already holds that commit.

    :param p: the plugin description to resolve
    :param cache: optional prefetch cache keyed by commit sha
    :returns: the plugin plus the redirected repository, if one was
        discovered while querying the forge
    """
    log.info(f"Fetching last commit for plugin {p.name} from {p.repo.uri}@{p.branch}")
    # fixed: removed a dead `commit = None` assignment that was immediately
    # overwritten by the line below.
    # NOTE: latest_commit() may set p.repo.redirect as a side effect.
    commit, date = p.repo.latest_commit()

    cached_plugin = cache[commit] if cache else None
    if cached_plugin is not None:
        log.debug(f"Cache hit for {p.name}!")
        # The cache stores neither the (alias-dependent) name nor the date.
        cached_plugin.name = p.name
        cached_plugin.date = date
        return cached_plugin, p.repo.redirect

    has_submodules = p.repo.has_submodules()
    log.debug(f"prefetch {p.name}")
    sha256 = p.repo.prefetch(commit)

    return (
        Plugin(p.name, commit, has_submodules, sha256, date=date),
        p.repo.redirect,
    )
745
746
def print_download_error(plugin: PluginDesc, ex: Exception):
    """Report a failed plugin download on stderr, then dump its traceback."""
    print(f"{plugin}: {ex}", file=sys.stderr)
    formatted = traceback.format_exception(ex.__class__, ex, ex.__traceback__)
    print("\n".join(chunk.rstrip("\n") for chunk in formatted))
755
756
def check_results(
    results: list[tuple[PluginDesc, Exception | Plugin, Repo | None]],
) -> tuple[list[tuple[PluginDesc, Plugin]], Redirects]:
    """Split prefetch results into successes and redirects.

    Prints every failure with its traceback and exits the process with
    status 1 when at least one plugin could not be downloaded.
    """
    failures: list[tuple[PluginDesc, Exception]] = []
    plugins = []
    redirects: Redirects = {}
    for pdesc, result, redirect in results:
        if isinstance(result, Exception):
            failures.append((pdesc, result))
            continue
        if redirect is None:
            plugins.append((pdesc, result))
        else:
            redirects[pdesc] = redirect
            plugins.append((PluginDesc(redirect, pdesc.branch, pdesc.alias), result))

    if failures:
        log.error(f"{len(failures)} plugin(s) could not be downloaded:\n")
        for plugin, exception in failures:
            print_download_error(plugin, exception)
        sys.exit(1)

    return plugins, redirects
783
784
def make_repo(uri: str, branch) -> Repo:
    """Instantiate a Repo with the correct specialization depending on server (gitub spec)"""
    # dumb check to see if it's of the form owner/repo (=> github) or https://...
    parsed = urlparse(uri)
    if parsed.netloc not in ("github.com", ""):
        return Repo(uri.strip(), branch)
    parts = parsed.path.strip("/").split("/")
    return RepoGitHub(parts[0], parts[1], branch)
795
796
797def get_cache_path(cache_file_name: str) -> Path | None:
798 xdg_cache = os.environ.get("XDG_CACHE_HOME", None)
799 if xdg_cache is None:
800 home = os.environ.get("HOME", None)
801 if home is None:
802 return None
803 xdg_cache = str(Path(home, ".cache"))
804
805 return Path(xdg_cache, cache_file_name)
806
807
class Cache:
    """On-disk cache of prefetched plugins, keyed by commit sha."""

    def __init__(self, initial_plugins: list[Plugin], cache_file_name: str) -> None:
        self.cache_file = get_cache_path(cache_file_name)

        # Seed with the currently-known plugins, then let entries from the
        # cache file override them (the file holds the freshest prefetches).
        known = {plugin.commit: plugin for plugin in initial_plugins}
        known.update(self.load())
        self.downloads = known

    def load(self) -> dict[str, Plugin]:
        """Read the cache file; a missing or disabled cache yields {}."""
        if self.cache_file is None or not self.cache_file.exists():
            return {}

        with open(self.cache_file) as f:
            data = json.load(f)
        return {
            attr["commit"]: Plugin(
                attr["name"], attr["commit"], attr["has_submodules"], attr["sha256"]
            )
            for attr in data.values()
        }

    def store(self) -> None:
        """Persist the cache to disk, creating parent directories as needed."""
        if self.cache_file is None:
            return

        os.makedirs(self.cache_file.parent, exist_ok=True)
        serialized = {
            commit: plugin.as_json() for commit, plugin in self.downloads.items()
        }
        with open(self.cache_file, "w+") as f:
            json.dump(serialized, f, indent=4, sort_keys=True)

    def __getitem__(self, key: str) -> Plugin | None:
        return self.downloads.get(key, None)

    def __setitem__(self, key: str, value: Plugin) -> None:
        self.downloads[key] = value
848
849
def prefetch(
    pluginDesc: PluginDesc, cache: Cache
) -> tuple[PluginDesc, Exception | Plugin, Repo | None]:
    """Prefetch one plugin, returning (desc, plugin-or-error, redirect).

    Exceptions are captured and returned instead of raised, so one failing
    plugin does not abort the whole worker pool.
    """
    try:
        plugin, redirect = prefetch_plugin(pluginDesc, cache)
        cache[plugin.commit] = plugin
        return (pluginDesc, plugin, redirect)
    except Exception as err:
        return (pluginDesc, err, None)
859
860
def rewrite_input(
    config: FetchConfig,
    input_file: Path,
    deprecated: Path,
    # old pluginDesc and the new
    redirects: Redirects | None = None,
    append: list[PluginDesc] | None = None,
):
    """Rewrite the CSV index: append new plugins and resolve redirects.

    When `redirects` is given, redirected plugins are re-prefetched under
    their new location and a rename entry is recorded in the `deprecated`
    json file.

    :param config: fetch configuration
    :param input_file: the CSV plugin index to rewrite in place
    :param deprecated: json file tracking plugin renames
    :param redirects: mapping from the old PluginDesc to its new Repo
    :param append: extra plugins to add to the index
    """
    # fixed: the defaults used to be mutable literals ({} and []), which are
    # shared across calls; use None sentinels instead
    redirects = redirects or {}
    append = append or []

    log.info("Rewriting input file %s", input_file)
    plugins = load_plugins_from_csv(config, input_file)

    plugins.extend(append)

    if redirects:
        log.debug("Dealing with deprecated plugins listed in %s", deprecated)

        cur_date_iso = datetime.now().strftime("%Y-%m-%d")
        with open(deprecated, "r") as f:
            deprecations = json.load(f)
        # TODO parallelize this step
        for pdesc, new_repo in redirects.items():
            log.info("Resolving deprecated plugin %s -> %s", pdesc.name, new_repo.name)
            new_pdesc = PluginDesc(new_repo, pdesc.branch, pdesc.alias)

            old_plugin, _ = prefetch_plugin(pdesc)
            new_plugin, _ = prefetch_plugin(new_pdesc)

            if old_plugin.normalized_name != new_plugin.normalized_name:
                deprecations[old_plugin.normalized_name] = {
                    "new": new_plugin.normalized_name,
                    "date": cur_date_iso,
                }

            # remove plugin from index file, so we won't add it to deprecations again
            for i, plugin in enumerate(plugins):
                if plugin.name == pdesc.name:
                    plugins.pop(i)
                    break
            plugins.append(new_pdesc)

        with open(deprecated, "w") as f:
            json.dump(deprecations, f, indent=4, sort_keys=True)
            f.write("\n")

    with open(input_file, "w") as f:
        log.debug("Writing into %s", input_file)
        # fields = dataclasses.fields(PluginDesc)
        fieldnames = ["repo", "branch", "alias"]
        writer = csv.DictWriter(f, fieldnames, dialect="unix", quoting=csv.QUOTE_NONE)
        writer.writeheader()
        for plugin in sorted(plugins, key=lambda x: x.name):
            writer.writerow(asdict(plugin))
913
914
def commit(repo: git.Repo, message: str, files: list[Path]) -> None:
    """Stage `files` and commit them with `message`, skipping empty commits."""
    repo.index.add([str(path.resolve()) for path in files])

    if not repo.index.diff("HEAD"):
        print("no changes in working tree to commit")
        return
    print(f'committing to nixpkgs "{message}"')
    repo.index.commit(message)
923
924
def update_plugins(editor: Editor, args):
    """The main entry function of this module.
    All input arguments are grouped in the `Editor`."""

    log.info("Start updating plugins")
    # fixed: use `is None` (identity) rather than `== None`
    if args.proc > 1 and args.github_token is None:
        log.warning(
            "You have enabled parallel updates but haven't set a github token.\n"
            # fixed: a "\n" was missing here, gluing the two sentences together
            "You may be hit with `HTTP Error 429: too many requests` as a consequence.\n"
            "Either set --proc=1 or --github-token=YOUR_TOKEN. "
        )

    fetch_config = FetchConfig(args.proc, args.github_token)
    update = editor.get_update(
        input_file=args.input_file,
        output_file=args.outfile,
        config=fetch_config,
        to_update=getattr(  # if script was called without arguments
            args, "update_only", None
        ),
    )

    start_time = time.time()
    redirects = update()
    duration = time.time() - start_time
    print(f"The plugin update took {duration:.2f}s.")
    editor.rewrite_input(fetch_config, args.input_file, editor.deprecated, redirects)

    autocommit = not args.no_commit

    if autocommit:
        try:
            repo = git.Repo(os.getcwd())
            updated = datetime.now(tz=UTC).strftime("%Y-%m-%d")
            print(args.outfile)
            commit(repo, f"{editor.attr_path}: update on {updated}", [args.outfile])
        except git.InvalidGitRepositoryError as e:
            print(f"Not in a git repository: {e}", file=sys.stderr)
            sys.exit(1)

    # Redirects rewrote the input file, so regenerate once more to make the
    # output reference the new repository locations.
    if redirects:
        update()
        if autocommit:
            commit(
                editor.nixpkgs_repo,
                f"{editor.attr_path}: resolve github repository redirects",
                [args.outfile, args.input_file, editor.deprecated],
            )