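"""Generate Python modules from lexicon documents.

Converts parsed LLexicon definitions into modules under LEXICON_STORE and
keeps the package __init__.py files along an NSID's authority path in sync.
"""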
import ast
import os
import pathlib

from src.id.nsid import NSID
from src.lexicon.lexicon import (
    LLexicon,
    LObject,
    LParams,
    LQuery,
    SchemaObject,
)


# Maps lexicon primitive type names to the Python type names used in generated
# annotations; the commented-out kinds are not handled yet.
TYPE_MAP: dict[str, str] = {
    "null": "None",
    "boolean": "bool",
    "integer": "int",
    "string": "str",
    "bytes": "bytes",
    # "cid-link": ...,
    # "blob": ...,
    "array": "list",
    "object": "object",
    # "params": ...,
    "token": "str",
    # "ref": ...,
    # "union": ...,
    # "unknown": ...,
    # "record": ...,
    # "query": ...,
    # "procedure": ...,
    # "subscription": ...,
}

# Root directory that generated modules are written under.
LEXICON_STORE = pathlib.Path.cwd() / "lex"


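# Builds the class-body statements for one schema fragment: annotated fields
# for objects and params (with "| None" appended for nullable object fields
# and for non-required params), an empty list for queries, `pass` otherwise.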
def content_for_type(fragment: SchemaObject):
    if isinstance(fragment, LObject):
        return [
            ast.AnnAssign(
                target=ast.Name(field),
                annotation=ast.Constant(
                    TYPE_MAP[schema.__lexicon_type__]
                    # Object fields are optional only when marked nullable.
                    + (" | None" if field in (fragment.nullables or []) else "")
                ),
                value=None,
                simple=True,
            )
            for field, schema in fragment.properties.items()
        ] or [ast.Pass()]
    if isinstance(fragment, LParams):
        return [
            ast.AnnAssign(
                target=ast.Name(field),
                annotation=ast.Constant(
                    TYPE_MAP[schema.__lexicon_type__]
                    # Params are optional unless listed as required.
                    + (" | None" if field not in (fragment.required or []) else "")
                ),
                value=None,
                simple=True,
            )
            for field, schema in fragment.properties.items()
        ] or [ast.Pass()]
    if isinstance(fragment, LQuery):
        return []
    return [ast.Pass()]


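# Assembles an ast.Module for one lexicon document: each non-"main" def
# becomes a _meta_*/_frag_* class pair, then either the "main" fragment or a
# BaseExport instance is bound to the lexicon's name at module level.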
def create_lexicon(lexicon: LLexicon):
    lex_nsid = str(lexicon.id).replace("-", "_")

    body: list[ast.stmt] = [
        ast.ImportFrom(
            "src.lexicon.model",
            [ast.alias("BaseFragment"), ast.alias("BaseExport")],
            level=0,
            lineno=0,
        ),
    ]
    names_for_main = []
    # Emit every named fragment first; "main" is handled separately below so
    # that it can reference the generated _frag_* classes.
    for key, fragment in sorted(lexicon.defs.items(), key=lambda x: x[0] == "main"):
        if key == "main":
            continue
        names_for_main.append(key)
        body.append(
            ast.ClassDef(
                bases=[ast.Name("BaseFragment")],
                name="_meta_" + key,
                body=[
                    ast.Assign(
                        targets=[ast.Name("__lexicon__")],
                        value=ast.Constant(lex_nsid),
                        lineno=0,
                    ),
                    ast.Assign(
                        targets=[ast.Name("__fragment__")],
                        value=ast.Constant(key),
                        lineno=0,
                    ),
                ],
            )
        )
        body.append(
            ast.ClassDef(
                "_frag_" + key,
                bases=[ast.Name(TYPE_MAP[fragment.__lexicon_type__])],
                keywords=[ast.keyword("metaclass", ast.Name("_meta_" + key))],
                body=[
                    ast.Assign(
                        [ast.Name("__qualname__")], ast.Constant(key), lineno=0
                    ),
                    *content_for_type(fragment),
                ],
            ),
        )
    if "main" in lexicon.defs:
        # A "main" def becomes the module-level export, with every other
        # fragment attached to its metaclass by name.
        fragment = lexicon.defs["main"]
        body.append(
            ast.ClassDef(
                bases=[ast.Name("BaseFragment")],
                name="_meta_main",
                body=[
                    ast.Assign(
                        targets=[ast.Name("__lexicon__")],
                        value=ast.Constant(lex_nsid),
                        lineno=0,
                    ),
                    ast.Assign(
                        targets=[ast.Name("__fragment__")],
                        value=ast.Constant("main"),
                        lineno=0,
                    ),
                    *[
                        ast.Assign(
                            targets=[ast.Name(name)],
                            value=ast.Name("_frag_" + name),
                            lineno=0,
                        )
                        for name in names_for_main
                    ],
                ],
            )
        )
        body.append(
            ast.ClassDef(
                "_frag_main",
                bases=[ast.Name(TYPE_MAP[fragment.__lexicon_type__])],
                keywords=[ast.keyword("metaclass", ast.Name("_meta_main"))],
                body=[
                    ast.Assign(
                        [ast.Name("__qualname__")], ast.Constant("main"), lineno=0
                    ),
                    *content_for_type(fragment),
                ],
            ),
        )
        body.append(
            ast.Assign(
                targets=[ast.Name(lexicon.id.name)],
                value=ast.Name("_frag_main"),
                lineno=0,
            )
        )
    else:
        # Without a "main" def, export a BaseExport instance that carries the
        # named fragments as attributes.
        body.append(
            ast.ClassDef(
                bases=[ast.Name("BaseExport")],
                name="_export",
                body=[
                    ast.Assign(
                        targets=[ast.Name("__lexicon__")],
                        value=ast.Constant(lex_nsid),
                        lineno=0,
                    ),
                    *[
                        ast.Assign(
                            targets=[ast.Name(name)],
                            value=ast.Name("_frag_" + name),
                            lineno=0,
                        )
                        for name in names_for_main
                    ],
                ],
            )
        )
        body.append(
            ast.Assign(
                targets=[ast.Name(lexicon.id.name)],
                value=ast.Call(ast.Name("_export")),
                lineno=0,
            )
        )
    return ast.Module(
        body=body,
    )


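# Writes the generated module for a lexicon to lex/<authority>/<name>.py,
# creating the authority directories as needed.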
def convert_lexicon(lexicon: LLexicon):
    lexicon_authority_store = pathlib.Path(LEXICON_STORE, *lexicon.id.domain_authority)
    os.makedirs(lexicon_authority_store, exist_ok=True)
    lexicon_store = pathlib.Path(LEXICON_STORE, *lexicon.id.segments).with_suffix(".py")
    with lexicon_store.open("w", encoding="utf-8") as file:
        file.write(ast.unparse(create_lexicon(lexicon)))


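# Rewrites the __init__.py of every package along the changed NSID's
# authority path: intermediate packages re-export their subpackages, the leaf
# package re-exports the name defined in each generated module.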
def regenerate_dunder_inits(changed: NSID):
    segments = ["", *changed.domain_authority]
    lng = len(segments) - 1
    for idx in range(len(segments)):
        segment_path = pathlib.Path(LEXICON_STORE, *segments[: idx + 1])
        files = list(segment_path.glob("*"))
        file_content = ast.unparse(
            ast.Module(
                [
                    *[
                        ast.ImportFrom(
                            # `from . import pkg` at intermediate levels,
                            # `from .mod import mod` at the leaf level.
                            module=None if idx < lng else file.stem,
                            names=[ast.alias(file.stem)],
                            level=1,
                            lineno=0,
                        )
                        for file in files
                        if not file.name.startswith("__")
                    ],
                    ast.Assign(
                        [ast.Name("__all__")],
                        ast.List(
                            [
                                ast.Constant(file.stem)
                                for file in files
                                if not file.name.startswith("__")
                            ]
                        ),
                        lineno=0,
                    ),
                ]
            )
        )
        with (segment_path / "__init__.py").open("w", encoding="utf-8") as fw:
            fw.write(file_content)