this repo has no description
at master 244 lines 7.8 kB view raw
1import glob 2import os 3import pathlib 4from typing import Literal 5import ast 6from src.id.nsid import NSID 7from src.lexicon.lexicon import ( 8 LLexicon, 9 LObject, 10 LParams, 11 LQuery, 12 SchemaObject, 13 lexicon, 14) 15 16 17TYPE_MAP: dict[str, str] = { 18 "null": "None", 19 "boolean": "bool", 20 "integer": "int", 21 "string": "str", 22 "bytes": "bytes", 23 # "cid-link": ..., 24 # "blob": ..., 25 "array": "list", 26 "object": "object", 27 # "params": ..., 28 "token": "str", 29 # "ref": ..., 30 # "union": ..., 31 # "unknown": ..., 32 # "record": ..., 33 # "query": ..., 34 # "procedure": ..., 35 # "subscription": ..., 36} 37 38LEXICON_STORE = pathlib.Path.cwd() / "lex" 39 40 41def content_for_type(fragment: SchemaObject): 42 if isinstance(fragment, LObject): 43 return [ 44 ast.AnnAssign( 45 target=ast.Name(field), 46 annotation=ast.Constant( 47 TYPE_MAP[schema.__lexicon_type__] 48 + (" | None" if field in (fragment.nullables or []) else "") 49 ), 50 value=None, 51 simple=True, 52 ) 53 for field, schema in fragment.properties.items() 54 ] or [ast.Pass()] 55 if isinstance(fragment, LParams): 56 return [ 57 ast.AnnAssign( 58 target=ast.Name(field), 59 annotation=ast.Constant( 60 TYPE_MAP[schema.__lexicon_type__] 61 + (" | None" if field not in (fragment.required or []) else "") 62 ), 63 value=None, 64 simple=True, 65 ) 66 for field, schema in fragment.properties.items() 67 ] or [ast.Pass()] 68 if isinstance(fragment, LQuery): 69 return [] 70 return [ast.Pass()] 71 72 73def create_lexicon(lexicon: LLexicon): 74 lex_nsid = str(lexicon.id).replace("-", "_") 75 76 body: list[ast.stmt] = [ 77 ast.ImportFrom( 78 "src.lexicon.model", 79 [ast.alias("BaseFragment"), ast.alias("BaseExport")], 80 level=0, 81 lineno=0, 82 ), 83 ] 84 names_for_main = [] 85 for key, fragment in sorted(lexicon.defs.items(), key=lambda x: x[0] == "main"): 86 if key == "main": 87 continue 88 names_for_main.append(key) 89 body.append( 90 ast.ClassDef( 91 bases=[ast.Name("BaseFragment")], 92 name="_meta_" + key, 93 body=[ 94 ast.Assign( 95 targets=[ast.Name("__lexicon__")], 96 value=ast.Constant(lex_nsid), 97 lineno=0, 98 ), 99 ast.Assign( 100 targets=[ast.Name("__fragment__")], 101 value=ast.Constant(key), 102 lineno=0, 103 ), 104 ], 105 ) 106 ) 107 body.append( 108 ast.ClassDef( 109 "_frag_" + key, 110 bases=[ast.Name(TYPE_MAP[fragment.__lexicon_type__])], 111 keywords=[ast.keyword("metaclass", ast.Name("_meta_" + key))], 112 body=[ 113 ast.Assign( 114 [ast.Name("__qualname__")], ast.Constant(key), lineno=0 115 ), 116 *content_for_type(fragment), 117 ], 118 ), 119 ) 120 if "main" in lexicon.defs: 121 fragment = lexicon.defs["main"] 122 body.append( 123 ast.ClassDef( 124 bases=[ast.Name("BaseFragment")], 125 name="_meta_main", 126 body=[ 127 ast.Assign( 128 targets=[ast.Name("__lexicon__")], 129 value=ast.Constant(lex_nsid), 130 lineno=0, 131 ), 132 ast.Assign( 133 targets=[ast.Name("__fragment__")], 134 value=ast.Constant("main"), 135 lineno=0, 136 ), 137 *[ 138 ast.Assign( 139 targets=[ast.Name(name)], 140 value=ast.Name("_frag_" + name), 141 lineno=0, 142 ) 143 for name in names_for_main 144 ], 145 ], 146 ) 147 ) 148 body.append( 149 ast.ClassDef( 150 "_frag_main", 151 bases=[ast.Name(TYPE_MAP[fragment.__lexicon_type__])], 152 keywords=[ast.keyword("metaclass", ast.Name("_meta_main"))], 153 body=[ 154 ast.Assign( 155 [ast.Name("__qualname__")], ast.Constant("main"), lineno=0 156 ), 157 *content_for_type(fragment), 158 ], 159 ), 160 ) 161 body.append( 162 ast.Assign( 163 targets=[ast.Name(lexicon.id.name)], 164 value=ast.Name("_frag_main"), 165 lineno=0, 166 ) 167 ) 168 else: 169 body.append( 170 ast.ClassDef( 171 bases=[ast.Name("BaseExport")], 172 name="_export", 173 body=[ 174 ast.Assign( 175 targets=[ast.Name("__lexicon__")], 176 value=ast.Constant(lex_nsid), 177 lineno=0, 178 ), 179 *[ 180 ast.Assign( 181 targets=[ast.Name(name)], 182 value=ast.Name("_frag_" + name), 183 lineno=0, 184 ) 185 for name in names_for_main 186 ], 187 ], 188 ) 189 ) 190 body.append( 191 ast.Assign( 192 targets=[ast.Name(lexicon.id.name)], 193 value=ast.Call(ast.Name("_export")), 194 lineno=0, 195 ) 196 ) 197 return ast.Module( 198 body=body, 199 ) 200 201 202def convert_lexicon(lexicon: LLexicon): 203 lexicon_authority_store = pathlib.Path(LEXICON_STORE, *lexicon.id.domain_authority) 204 os.makedirs(lexicon_authority_store, exist_ok=True) 205 lexicon_store = pathlib.Path(LEXICON_STORE, *lexicon.id.segments).with_suffix(".py") 206 with lexicon_store.open("w", encoding="utf-8") as file: 207 file.write(ast.unparse(create_lexicon(lexicon))) 208 209 210def regenerate_dunder_inits(changed: NSID): 211 segments = ["", *changed.domain_authority] 212 lng = len(segments) - 1 213 for idx in range(len(segments)): 214 segment_path = pathlib.Path(LEXICON_STORE, *segments[: idx + 1]) 215 files = list(segment_path.glob("*")) 216 file_content = ast.unparse( 217 ast.Module( 218 [ 219 *[ 220 ast.ImportFrom( 221 ("." if idx < lng else "." + file.with_suffix("").name), 222 [ast.alias(file.with_suffix("").name)], 223 level=0, 224 lineno=0, 225 ) 226 for file in files 227 if not file.name.startswith("__") 228 ], 229 ast.Assign( 230 [ast.Name("__all__")], 231 ast.List( 232 [ 233 ast.Constant(file.with_suffix("").name) 234 for file in files 235 if not file.name.startswith("__") 236 ] 237 ), 238 lineno=0, 239 ), 240 ] 241 ) 242 ) 243 with (segment_path / "__init__.py").open("w", encoding="utf-8") as fw: 244 fw.write(file_content)