1from urllib.parse import quote
2import json
3import subprocess as sub
4import os
5import sys
6from typing import Iterator, Any, Literal, TypedDict, Optional
7from tempfile import NamedTemporaryFile
8
# Emit extra diagnostics (and run the tools verbosely) when the DEBUG
# environment variable is set to any non-empty value.
debug: bool = bool(os.environ.get("DEBUG", False))
# A path to an executable binary.
Bin = str
# Static configuration handed to this script as a JSON document in $ARGS.
args: dict[str, Any] = json.loads(os.environ["ARGS"])
# Tool name -> binary path (e.g. "curl", "nix-prefetch-git").
bins: dict[str, Bin] = args["binaries"]

# Subcommand to run; dispatched in the `match mode` block at the bottom.
mode: str = sys.argv[1]
# Subcommand-specific arguments, passed as a JSON document.
jsonArg: dict = json.loads(sys.argv[2])

# A lazily-built argv for a subprocess invocation.
Args = Iterator[str]
18
19
def log(msg: str) -> None:
    """Write a diagnostic line to stderr (stdout is reserved for output)."""
    sys.stderr.write(msg + "\n")
22
23
def atomically_write(file_path: str, content: bytes) -> None:
    """Atomically replace the file at `file_path` with `content`.

    The bytes are first written to a temporary file in the *same* directory
    (so it is guaranteed to live on the same filesystem) and then moved into
    place with an atomic `os.rename`.

    Raises:
        OSError: if writing or renaming fails. The temporary file is removed
            and the original exception is re-raised (the previous version of
            this function silently swallowed it, hiding failed writes).
    """
    with NamedTemporaryFile(
        # write to the parent dir, so that it’s guaranteed to be on the same filesystem
        dir=os.path.dirname(file_path),
        delete=False
    ) as tmp:
        try:
            tmp.write(content)
            # make sure the buffered data is in the file before it is renamed
            tmp.flush()
            os.rename(
                src=tmp.name,
                dst=file_path
            )
        except Exception:
            os.unlink(tmp.name)
            # don't swallow the error: callers must learn that the write failed
            raise
39
40
def curl_github_args(token: str | None, url: str) -> Args:
    """Build the curl argv for querying the github API at `url`.

    An `Authorization` header is added only when a token is given.
    """
    argv: list[str] = [bins["curl"]]
    if not debug:
        # keep curl quiet unless we are debugging
        argv.append("--silent")
    # follow redirects
    argv.append("--location")
    if token:
        argv.extend(["-H", f"Authorization: token {token}"])
    argv.append(url)
    yield from argv
52
53
54def curl_result(output: bytes) -> Any | Literal["not found"]:
55 """Parse the curl result of the github API"""
56 res: Any = json.loads(output)
57 match res:
58 case dict(res):
59 message: str = res.get("message", "")
60 if "rate limit" in message:
61 sys.exit("Rate limited by the Github API")
62 if "Not Found" in message:
63 return "not found"
64 # if the result is another type, we can pass it on
65 return res
66
67
def nix_prefetch_git_args(url: str, version_rev: str) -> Args:
    """Build the nix-prefetch-git argv for fetching `url` at `version_rev`."""
    cmd: list[str] = [bins["nix-prefetch-git"]]
    if not debug:
        # suppress progress chatter unless we are debugging
        cmd.append("--quiet")
    cmd += [
        "--no-deepClone",
        "--url", url,
        "--rev", version_rev,
    ]
    yield from cmd
78
79
def run_cmd(args: Args) -> bytes:
    """Run the given argv and return its stdout as bytes.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero.
    """
    # materialize first: `args` may be a one-shot generator, and we may need
    # it twice (logging + execution). Also: the old local was named `all`,
    # which shadowed the builtin.
    argv = list(args)
    if debug:
        log(str(argv))
    return sub.check_output(argv)
85
86
87Dir = str
88
89
def fetchRepo() -> None:
    """Fetch the given repo and write its nix-prefetch output to the corresponding grammar json file.

    Reads the repo description from the module-level `jsonArg` (keys `orga`,
    `repo`, `outputDir`, `nixRepoAttrName`, optionally `branch`), resolves
    the revision to prefetch via the github releases API, prefetches it with
    nix-prefetch-git, and atomically writes the result to
    `<outputDir>/<nixRepoAttrName>.json`. Exits the program on malformed
    input or unexpected API answers.
    """
    match jsonArg:
        case {
            "orga": orga,
            "repo": repo,
            "outputDir": outputDir,
            "nixRepoAttrName": nixRepoAttrName,
        }:
            # the token is optional; without it we just hit the lower
            # unauthenticated rate limit
            token: str | None = os.environ.get("GITHUB_TOKEN", None)
            out = run_cmd(
                curl_github_args(
                    token,
                    url=f"https://api.github.com/repos/{quote(orga)}/{quote(repo)}/releases/latest"
                )
            )
            # the git revision (tag, branch ref, or HEAD) to prefetch
            release: str
            match curl_result(out):
                case "not found":
                    if "branch" in jsonArg:
                        # an explicitly pinned branch wins over releases
                        branch = jsonArg.get("branch")
                        release = f"refs/heads/{branch}"
                    else:
                        # github sometimes returns an empty list even though there are releases
                        log(f"uh-oh, latest for {orga}/{repo} is not there, using HEAD")
                        release = "HEAD"
                case {"tag_name": tag_name}:
                    release = tag_name
                case _:
                    sys.exit(f"git result for {orga}/{repo} did not have a `tag_name` field")

            log(f"Fetching latest release ({release}) of {orga}/{repo} …")
            res = run_cmd(
                nix_prefetch_git_args(
                    url=f"https://github.com/{quote(orga)}/{quote(repo)}",
                    version_rev=release
                )
            )
            # write atomically so a failed prefetch never truncates the
            # existing grammar file
            atomically_write(
                file_path=os.path.join(
                    outputDir,
                    f"{nixRepoAttrName}.json"
                ),
                content=res
            )
        case _:
            sys.exit("input json must have `orga` and `repo` keys")
137
138
139def fetchOrgaLatestRepos(orga: str) -> set[str]:
140 """fetch the latest (100) repos from the given github organization"""
141 token: str | None = os.environ.get("GITHUB_TOKEN", None)
142 out = run_cmd(
143 curl_github_args(
144 token,
145 url=f"https://api.github.com/orgs/{quote(orga)}/repos?per_page=100"
146 )
147 )
148 match curl_result(out):
149 case "not found":
150 sys.exit(f"github organization {orga} not found")
151 case list(repos):
152 res: list[str] = []
153 for repo in repos:
154 name = repo.get("name")
155 if name:
156 res.append(name)
157 return set(res)
158 case _:
159 sys.exit("github result was not a list of repos, but {other}")
160
161
def checkTreeSitterRepos(latest_github_repos: set[str]) -> None:
    """Make sure we know about all tree sitter repos on the tree sitter orga.

    Exits the program when github lists a repo that is neither in our known
    grammar list nor explicitly ignored.
    """
    accounted_for: set[str] = (
        set(args["knownTreeSitterOrgGrammarRepos"])
        | set(args["ignoredTreeSitterOrgRepos"])
    )
    unknown = latest_github_repos - accounted_for
    if unknown:
        sys.exit(f"These repositories are neither known nor ignored:\n{unknown}")
171
172
class Grammar(TypedDict):
    """Description of one grammar repository, as found in the input json."""
    nixRepoAttrName: str
    orga: str
    repo: str
    branch: Optional[str]
182
183
def printAllGrammarsNixFile() -> None:
    """Print a .nix file that imports all grammars.

    Reads the grammar list and output directory from the module-level
    `jsonArg` and atomically writes `<outputDir>/default.nix`.
    """
    # annotation fix: each element is a Grammar itself (it is indexed with
    # "nixRepoAttrName" below), not a dict of Grammars
    allGrammars: list[Grammar] = jsonArg["allGrammars"]
    outputDir: Dir = jsonArg["outputDir"]

    def file() -> Iterator[str]:
        """Yield the lines of the generated nix expression."""
        yield "{ lib }:"
        yield "{"
        for grammar in allGrammars:
            n = grammar["nixRepoAttrName"]
            yield f"  {n} = lib.importJSON ./{n}.json;"
        yield "}"
        # trailing newline at end of file
        yield ""

    atomically_write(
        file_path=os.path.join(
            outputDir,
            "default.nix"
        ),
        content="\n".join(file()).encode()
    )
205
206
def fetchAndCheckTreeSitterRepos() -> None:
    """Cross-check the tree-sitter github orga against our grammar list."""
    log("fetching list of grammars")
    repos = fetchOrgaLatestRepos(orga="tree-sitter")
    log("checking the tree-sitter repo list against the grammars we know")
    checkTreeSitterRepos(repos)
212
213
# Entry point: dispatch on the subcommand given as the first CLI argument.
match mode:
    case "fetch-repo":
        # prefetch one grammar repo and write its json file
        fetchRepo()
    case "fetch-and-check-tree-sitter-repos":
        # sanity-check our grammar list against the tree-sitter orga
        fetchAndCheckTreeSitterRepos()
    case "print-all-grammars-nix-file":
        # regenerate the default.nix that imports every grammar json
        printAllGrammarsNixFile()
    case _:
        sys.exit(f"mode {mode} unknown")