"""Code generation for lexicon schemas.

Converts parsed ``LLexicon`` documents into Python modules (written under
``LEXICON_STORE``) and keeps the package ``__init__.py`` files in sync so the
generated tree is importable.
"""

import ast
import glob
import os
import pathlib
from typing import Literal

from src.id.nsid import NSID
from src.lexicon.lexicon import (
    LLexicon,
    LObject,
    LParams,
    LQuery,
    SchemaObject,
    lexicon,
)

# Lexicon primitive type -> Python type expression used in generated annotations.
# Commented-out keys are lexicon types with no direct Python mapping (yet).
TYPE_MAP: dict[str, str] = {
    "null": "None",
    "boolean": "bool",
    "integer": "int",
    "string": "str",
    "bytes": "bytes",
    # "cid-link": ...,
    # "blob": ...,
    "array": "list",
    "object": "object",
    # "params": ...,
    "token": "str",
    # "ref": ...,
    # "union": ...,
    # "unknown": ...,
    # "record": ...,
    # "query": ...,
    # "procedure": ...,
    # "subscription": ...,
}

# Root directory of the generated lexicon package tree.
LEXICON_STORE = pathlib.Path.cwd() / "lex"


def _assign(name: str, value: ast.expr) -> ast.Assign:
    """Return a single-target module/class-level assignment ``name = value``."""
    return ast.Assign(targets=[ast.Name(name)], value=value, lineno=0)


def _classdef(
    name: str,
    bases: list[ast.expr],
    body: list[ast.stmt],
    keywords: list[ast.keyword] | None = None,
) -> ast.ClassDef:
    """Return a ``ClassDef`` with every optional field filled in.

    On Python < 3.13, AST fields that are not passed to the constructor are
    simply absent, and ``ast.unparse`` raises ``AttributeError`` on them —
    so ``decorator_list``/``keywords``/``type_params`` are always supplied.
    """
    return ast.ClassDef(
        name=name,
        bases=bases,
        keywords=keywords or [],
        body=body,
        decorator_list=[],
        type_params=[],
        lineno=0,
    )


def _annotated_fields(fragment: SchemaObject, optional_fields: set[str]) -> list[ast.stmt]:
    """Build ``field: "Type"`` annotations for every property of *fragment*.

    Fields listed in *optional_fields* get ``| None`` appended to their
    annotation. Returns ``[Pass()]`` when there are no properties, so the
    generated class body is never empty.
    """
    fields: list[ast.stmt] = [
        ast.AnnAssign(
            target=ast.Name(field),
            annotation=ast.Constant(
                TYPE_MAP[schema.__lexicon_type__]
                + (" | None" if field in optional_fields else "")
            ),
            value=None,
            simple=True,
        )
        for field, schema in fragment.properties.items()
    ]
    return fields or [ast.Pass()]


def content_for_type(fragment: SchemaObject) -> list[ast.stmt]:
    """Return the class-body statements generated for a schema *fragment*.

    - ``LObject``: one annotation per property; ``nullables`` become optional.
    - ``LParams``: one annotation per property; anything not in ``required``
      becomes optional.
    - ``LQuery``: no body statements (the caller supplies them).
    - anything else: a bare ``pass``.
    """
    if isinstance(fragment, LObject):
        return _annotated_fields(fragment, set(fragment.nullables or []))
    if isinstance(fragment, LParams):
        required = set(fragment.required or [])
        return _annotated_fields(fragment, set(fragment.properties) - required)
    if isinstance(fragment, LQuery):
        return []
    return [ast.Pass()]


def _fragment_classes(
    lex_nsid: str,
    key: str,
    fragment: SchemaObject,
    extra_meta_body: list[ast.stmt] | None = None,
) -> list[ast.stmt]:
    """Build the ``_meta_<key>`` / ``_frag_<key>`` class pair for one def.

    The meta class (derived from ``BaseFragment``) records which lexicon and
    fragment the class came from; the frag class uses it as its metaclass and
    carries the generated field annotations.
    """
    meta = _classdef(
        "_meta_" + key,
        bases=[ast.Name("BaseFragment")],
        body=[
            _assign("__lexicon__", ast.Constant(lex_nsid)),
            _assign("__fragment__", ast.Constant(key)),
            *(extra_meta_body or []),
        ],
    )
    frag = _classdef(
        "_frag_" + key,
        bases=[ast.Name(TYPE_MAP[fragment.__lexicon_type__])],
        keywords=[ast.keyword("metaclass", ast.Name("_meta_" + key))],
        body=[
            # __qualname__ is overridden so repr/debug output shows the
            # lexicon def name instead of the mangled "_frag_" name.
            _assign("__qualname__", ast.Constant(key)),
            *content_for_type(fragment),
        ],
    )
    return [meta, frag]


def create_lexicon(lexicon: LLexicon) -> ast.Module:
    """Build the AST of a Python module representing *lexicon*.

    Non-"main" defs each become a meta/frag class pair. If a "main" def
    exists, its meta class re-exports all sibling fragments and the module
    binds ``<nsid name> = _frag_main``; otherwise a ``BaseExport`` subclass
    collecting the fragments is instantiated under the NSID name.
    """
    lex_nsid = str(lexicon.id).replace("-", "_")
    body: list[ast.stmt] = [
        ast.ImportFrom(
            "src.lexicon.model",
            [ast.alias("BaseFragment"), ast.alias("BaseExport")],
            level=0,
            lineno=0,
        ),
    ]

    # "main" is handled separately below; dict order of the remaining defs
    # is preserved.
    names_for_main = [key for key in lexicon.defs if key != "main"]
    for key in names_for_main:
        body.extend(_fragment_classes(lex_nsid, key, lexicon.defs[key]))

    # name = _frag_name re-exports, used by both the "main" and export paths.
    re_exports = [_assign(name, ast.Name("_frag_" + name)) for name in names_for_main]

    if "main" in lexicon.defs:
        body.extend(
            _fragment_classes(lex_nsid, "main", lexicon.defs["main"], re_exports)
        )
        body.append(_assign(lexicon.id.name, ast.Name("_frag_main")))
    else:
        body.append(
            _classdef(
                "_export",
                bases=[ast.Name("BaseExport")],
                body=[
                    _assign("__lexicon__", ast.Constant(lex_nsid)),
                    *re_exports,
                ],
            )
        )
        body.append(
            _assign(
                lexicon.id.name,
                # args/keywords must be explicit: absent AST fields break
                # ast.unparse on Python < 3.13.
                ast.Call(ast.Name("_export"), args=[], keywords=[]),
            )
        )
    return ast.Module(body=body, type_ignores=[])


def convert_lexicon(lexicon: LLexicon) -> None:
    """Generate *lexicon*'s Python module and write it under LEXICON_STORE."""
    authority_dir = pathlib.Path(LEXICON_STORE, *lexicon.id.domain_authority)
    authority_dir.mkdir(parents=True, exist_ok=True)
    target = pathlib.Path(LEXICON_STORE, *lexicon.id.segments).with_suffix(".py")
    target.write_text(ast.unparse(create_lexicon(lexicon)), encoding="utf-8")


def regenerate_dunder_inits(changed: NSID) -> None:
    """Rewrite ``__init__.py`` at every package level touched by *changed*.

    Walks from the store root down through the NSID's authority segments.
    Intermediate levels re-export sub-packages (``from . import sub``); the
    leaf level re-exports the generated symbol from each module
    (``from .name import name``). Each ``__init__.py`` also gets a matching
    ``__all__``.
    """
    segments = ["", *changed.domain_authority]
    leaf_idx = len(segments) - 1
    for idx in range(len(segments)):
        package_dir = pathlib.Path(LEXICON_STORE, *segments[: idx + 1])
        # Sorted for deterministic, reproducible __init__.py content;
        # dunder entries (__init__.py, __pycache__) are never re-exported.
        names = sorted(
            entry.with_suffix("").name
            for entry in package_dir.glob("*")
            if not entry.name.startswith("__")
        )
        module = ast.Module(
            body=[
                *[
                    ast.ImportFrom(
                        # level=1 is the correct way to express a relative
                        # import: unparses to "from . import x" at package
                        # levels and "from .x import x" at the leaf.
                        module=(None if idx < leaf_idx else name),
                        names=[ast.alias(name)],
                        level=1,
                        lineno=0,
                    )
                    for name in names
                ],
                ast.Assign(
                    [ast.Name("__all__")],
                    ast.List([ast.Constant(name) for name in names]),
                    lineno=0,
                ),
            ],
            type_ignores=[],
        )
        (package_dir / "__init__.py").write_text(ast.unparse(module), encoding="utf-8")