#!/usr/bin/env nix-shell
#!nix-shell -p nix-prefetch-git -p python3 nix -i python3

# format:
# $ nix run nixpkgs.python3Packages.black -c black update.py
# type-check:
# $ nix run nixpkgs.python3Packages.mypy -c mypy update.py
# linted:
# $ nix run nixpkgs.python3Packages.flake8 -c flake8 --ignore E501,E265 update.py

import functools
import json
import os
import subprocess
import sys
import traceback
import urllib.error
import urllib.request
import xml.etree.ElementTree as ET
from datetime import datetime
from multiprocessing.dummy import Pool
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union, Any
from urllib.parse import urljoin, urlparse
from tempfile import NamedTemporaryFile

ATOM_ENTRY = "{http://www.w3.org/2005/Atom}entry"
ATOM_LINK = "{http://www.w3.org/2005/Atom}link"
ATOM_UPDATED = "{http://www.w3.org/2005/Atom}updated"

ROOT = Path(__file__).parent


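# A GitHub repository (owner/name) plus the operations the updater needs on it:
# submodule detection, latest-commit lookup and source prefetching.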
class Repo:
    def __init__(self, owner: str, name: str) -> None:
        self.owner = owner
        self.name = name

    def url(self, path: str) -> str:
        return urljoin(f"https://github.com/{self.owner}/{self.name}/", path)

    def __repr__(self) -> str:
        return f"Repo({self.owner}, {self.name})"

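    # Probe for a top-level .gitmodules on master; a 404 from GitHub means
    # the repository has no submodules.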
    def has_submodules(self) -> bool:
        try:
            urllib.request.urlopen(self.url("blob/master/.gitmodules")).close()
        except urllib.error.HTTPError as e:
            if e.code == 404:
                return False
            else:
                raise
        return True

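    # Parse the commits Atom feed of master and return the latest commit hash
    # together with its "updated" timestamp.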
    def latest_commit(self) -> Tuple[str, datetime]:
        with urllib.request.urlopen(self.url("commits/master.atom")) as req:
            xml = req.read()
            root = ET.fromstring(xml)
            latest_entry = root.find(ATOM_ENTRY)
            assert latest_entry is not None, f"No commits found in repository {self}"
            commit_link = latest_entry.find(ATOM_LINK)
            assert commit_link is not None, f"No link tag found in feed entry {xml}"
            url = urlparse(commit_link.get("href"))
            updated_tag = latest_entry.find(ATOM_UPDATED)
            assert (
                updated_tag is not None and updated_tag.text is not None
            ), f"No updated tag found in feed entry {xml}"
            updated = datetime.strptime(updated_tag.text, "%Y-%m-%dT%H:%M:%SZ")
            return Path(url.path).name, updated

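    # Prefetch via nix-prefetch-git so submodules are included in the hash.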
    def prefetch_git(self, ref: str) -> str:
        data = subprocess.check_output(
            ["nix-prefetch-git", "--fetch-submodules", self.url(""), ref]
        )
        return json.loads(data)["sha256"]

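    # Prefetch the GitHub tarball of the given ref with nix-prefetch-url --unpack.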
    def prefetch_github(self, ref: str) -> str:
        data = subprocess.check_output(
            ["nix-prefetch-url", "--unpack", self.url(f"archive/{ref}.tar.gz")]
        )
        return data.strip().decode("utf-8")


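# A pinned plugin: repository name, commit, whether submodules are needed,
# the source sha256 and (optionally) the commit date used as the version.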
class Plugin:
    def __init__(
        self,
        name: str,
        commit: str,
        has_submodules: bool,
        sha256: str,
        date: Optional[datetime] = None,
    ) -> None:
        self.name = name
        self.commit = commit
        self.has_submodules = has_submodules
        self.sha256 = sha256
        self.date = date

    @property
    def normalized_name(self) -> str:
        return self.name.replace(".", "-")

    @property
    def version(self) -> str:
        assert self.date is not None
        return self.date.strftime("%Y-%m-%d")

    def as_json(self) -> Dict[str, str]:
        copy = self.__dict__.copy()
        del copy["date"]
        return copy


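# Nix expression that collects rev, sha256 and fetchSubmodules from every
# plugin currently packaged in vimPlugins of the local checkout.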
GET_PLUGINS = """(with import <localpkgs> {};
let
  hasChecksum = value: lib.isAttrs value && lib.hasAttrByPath ["src" "outputHash"] value;
  getChecksum = name: value:
    if hasChecksum value then {
      submodules = value.src.fetchSubmodules or false;
      sha256 = value.src.outputHash;
      rev = value.src.rev;
    } else null;
  checksums = lib.mapAttrs getChecksum vimPlugins;
in lib.filterAttrs (n: v: v != null) checksums)"""


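# Context manager that points NIX_PATH at this checkout (as <localpkgs>) and
# sets an empty NIXPKGS_CONFIG so evaluation is not affected by user configuration.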
class CleanEnvironment(object):
    def __enter__(self) -> None:
        self.old_environ = os.environ.copy()
        local_pkgs = str(ROOT.joinpath("../../.."))
        os.environ["NIX_PATH"] = f"localpkgs={local_pkgs}"
        self.empty_config = NamedTemporaryFile()
        self.empty_config.write(b"{}")
        self.empty_config.flush()
        os.environ["NIXPKGS_CONFIG"] = self.empty_config.name

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        os.environ.update(self.old_environ)
        self.empty_config.close()


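# Evaluate GET_PLUGINS to learn which plugins (and their hashes) are currently packaged.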
def get_current_plugins() -> List[Plugin]:
    with CleanEnvironment():
        out = subprocess.check_output(["nix", "eval", "--json", GET_PLUGINS])
    data = json.loads(out)
    plugins = []
    for name, attr in data.items():
        p = Plugin(name, attr["rev"], attr["submodules"], attr["sha256"])
        plugins.append(p)
    return plugins


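# Look up the latest commit of owner/repo and compute its sha256.
# A cache hit on the commit hash skips the (slow) prefetch.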
def prefetch_plugin(user: str, repo_name: str, cache: "Cache") -> Plugin:
    repo = Repo(user, repo_name)
    commit, date = repo.latest_commit()
    has_submodules = repo.has_submodules()
    cached_plugin = cache[commit]
    if cached_plugin is not None:
        cached_plugin.name = repo_name
        cached_plugin.date = date
        return cached_plugin

    print(f"prefetch {user}/{repo_name}")
    if has_submodules:
        sha256 = repo.prefetch_git(commit)
    else:
        sha256 = repo.prefetch_github(commit)

    return Plugin(repo_name, commit, has_submodules, sha256, date=date)


def print_download_error(plugin: str, ex: Exception) -> None:
    print(f"{plugin}: {ex}", file=sys.stderr)
    ex_traceback = ex.__traceback__
    tb_lines = [
        line.rstrip("\n")
        for line in traceback.format_exception(ex.__class__, ex, ex_traceback)
    ]
    print("\n".join(tb_lines))


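# Split prefetch results into successes and failures, print a summary and
# exit with an error if any plugin could not be downloaded.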
def check_results(
    results: List[Tuple[str, str, Union[Exception, Plugin]]]
) -> List[Tuple[str, str, Plugin]]:
    failures: List[Tuple[str, Exception]] = []
    plugins = []
    for (owner, name, result) in results:
        if isinstance(result, Exception):
            failures.append((name, result))
        else:
            plugins.append((owner, name, result))

    print(f"{len(results) - len(failures)} plugins were checked", end="")
    if len(failures) == 0:
        print()
        return plugins
    else:
        print(f", {len(failures)} plugin(s) could not be downloaded:\n")

        for (plugin, exception) in failures:
            print_download_error(plugin, exception)

        sys.exit(1)


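# Read the "owner/repo" lines from vim-plugin-names next to this script.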
def load_plugin_spec() -> List[Tuple[str, str]]:
    plugin_file = ROOT.joinpath("vim-plugin-names")
    plugins = []
    with open(plugin_file) as f:
        for line in f:
            spec = line.strip()
            parts = spec.split("/")
            if len(parts) != 2:
                msg = f"Invalid repository {spec}, must be in the format owner/repo"
                print(msg, file=sys.stderr)
                sys.exit(1)
            plugins.append((parts[0], parts[1]))
    return plugins


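# Location of the download cache: $XDG_CACHE_HOME/vim-plugin-cache.json,
# falling back to ~/.cache, or None if neither can be determined.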
def get_cache_path() -> Optional[Path]:
    xdg_cache = os.environ.get("XDG_CACHE_HOME", None)
    if xdg_cache is None:
        home = os.environ.get("HOME", None)
        if home is None:
            return None
        xdg_cache = str(Path(home, ".cache"))

    return Path(xdg_cache, "vim-plugin-cache.json")


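# Cache of prefetched plugins keyed by commit hash, persisted as JSON so that
# repeated runs do not have to prefetch unchanged plugins again.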
class Cache:
    def __init__(self, initial_plugins: List[Plugin]) -> None:
        self.cache_file = get_cache_path()

        downloads = {}
        for plugin in initial_plugins:
            downloads[plugin.commit] = plugin
        downloads.update(self.load())
        self.downloads = downloads

    def load(self) -> Dict[str, Plugin]:
        if self.cache_file is None or not self.cache_file.exists():
            return {}

        downloads: Dict[str, Plugin] = {}
        with open(self.cache_file) as f:
            data = json.load(f)
            for attr in data.values():
                p = Plugin(
                    attr["name"], attr["commit"], attr["has_submodules"], attr["sha256"]
                )
                downloads[attr["commit"]] = p
        return downloads

    def store(self) -> None:
        if self.cache_file is None:
            return

        os.makedirs(self.cache_file.parent, exist_ok=True)
        with open(self.cache_file, "w+") as f:
            data = {}
            for name, attr in self.downloads.items():
                data[name] = attr.as_json()
            json.dump(data, f, indent=4, sort_keys=True)

    def __getitem__(self, key: str) -> Optional[Plugin]:
        return self.downloads.get(key, None)

    def __setitem__(self, key: str, value: Plugin) -> None:
        self.downloads[key] = value


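# Thread-pool worker: prefetch a single plugin and record it in the cache.
# Exceptions are returned instead of raised so one failure does not abort the pool.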
def prefetch(
    args: Tuple[str, str], cache: Cache
) -> Tuple[str, str, Union[Exception, Plugin]]:
    assert len(args) == 2
    owner, repo = args
    try:
        plugin = prefetch_plugin(owner, repo, cache)
        cache[plugin.commit] = plugin
        return (owner, repo, plugin)
    except Exception as e:
        return (owner, repo, e)


header = (
    "# This file has been generated by ./pkgs/misc/vim-plugins/update.py. Do not edit!"
)


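# Write generated.nix: an overridable fixed-point attribute set with one
# buildVimPluginFrom2Nix call per plugin, sorted by (lower-cased) name.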
def generate_nix(plugins: List[Tuple[str, str, Plugin]]) -> None:
    sorted_plugins = sorted(plugins, key=lambda v: v[2].name.lower())

    with open(ROOT.joinpath("generated.nix"), "w+") as f:
        f.write(header)
        f.write(
            """
{ lib, buildVimPluginFrom2Nix, fetchFromGitHub, overrides ? (self: super: {}) }:

let
  packages = ( self:
{"""
        )
        for owner, repo, plugin in sorted_plugins:
            if plugin.has_submodules:
                submodule_attr = "\n      fetchSubmodules = true;"
            else:
                submodule_attr = ""

            f.write(
                f"""
  {plugin.normalized_name} = buildVimPluginFrom2Nix {{
    pname = "{plugin.normalized_name}";
    version = "{plugin.version}";
    src = fetchFromGitHub {{
      owner = "{owner}";
      repo = "{repo}";
      rev = "{plugin.commit}";
      sha256 = "{plugin.sha256}";{submodule_attr}
    }};
  }};
"""
            )
        f.write("""
});
in lib.fix' (lib.extends overrides packages)
""")
    print("updated generated.nix")


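# Prefetch all plugins in parallel, always persist the cache (even on error),
# then check for failures and regenerate generated.nix.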
def main() -> None:
    plugin_names = load_plugin_spec()
    current_plugins = get_current_plugins()
    cache = Cache(current_plugins)

    prefetch_with_cache = functools.partial(prefetch, cache=cache)

    try:
        # synchronous variant for debugging
        # results = map(prefetch_with_cache, plugin_names)
        pool = Pool(processes=30)
        results = pool.map(prefetch_with_cache, plugin_names)
    finally:
        cache.store()

    plugins = check_results(results)

    generate_nix(plugins)


if __name__ == "__main__":
    main()