"""Functions that help us generate and use info.json files.
"""
import json
import hjson
import jsonschema
from collections.abc import Mapping
from functools import lru_cache
from typing import OrderedDict
from pathlib import Path
from copy import deepcopy

from milc import cli

from qmk.util import maybe_exit


def _dict_raise_on_duplicates(ordered_pairs):
    """Reject duplicate keys.

    Used as an `object_pairs_hook`: receives the (key, value) pairs of one
    decoded object and returns them as a dict.

    Raises:
        ValueError: if the same key appears more than once.
    """
    d = {}
    for k, v in ordered_pairs:
        if k in d:
            raise ValueError("duplicate key: %r" % (k,))
        d[k] = v
    return d


@lru_cache(maxsize=20)
def _json_load_impl(json_file, strict=True):
    """Load a json file from disk.

    Note: file must be a Path object (or an already-open stream such as
    stdin). The argument is part of the lru_cache key, so it must be hashable.

    Args:
        json_file: Path to read, or a readable text stream.
        strict: When true, duplicate keys inside an object are rejected.

    Returns:
        The decoded data. On any failure the error is logged and
        `maybe_exit(1)` is called; if that call returns, this function
        returns None.
    """
    pairs_hook = _dict_raise_on_duplicates if strict else None

    try:
        # Get the IO Stream for Path objects
        # Not necessary if the data is provided via stdin
        if isinstance(json_file, Path):
            # Use a context manager so the handle is closed even when parsing
            # fails; `json_file` stays a Path so the log messages below show
            # the file name rather than a stream repr.
            with json_file.open(encoding='utf-8') as stream:
                return hjson.load(stream, object_pairs_hook=pairs_hook)

        return hjson.load(json_file, object_pairs_hook=pairs_hook)

    except (json.decoder.JSONDecodeError, hjson.HjsonDecodeError) as e:
        cli.log.error('Invalid JSON encountered attempting to load {fg_cyan}%s{fg_reset}:\n\t{fg_red}%s', json_file, e)
        maybe_exit(1)
    except Exception as e:
        # Also reached for the ValueError raised on duplicate keys in strict mode.
        cli.log.error('Unknown error attempting to load {fg_cyan}%s{fg_reset}:\n\t{fg_red}%s', json_file, e)
        maybe_exit(1)


def json_load(json_file, strict=True):
    """Load a json file, returning a private copy of the decoded data.

    The underlying loader is cached; the deep copy keeps callers from
    mutating the cached object.
    """
    return deepcopy(_json_load_impl(json_file=json_file, strict=strict))


@lru_cache(maxsize=20)
def load_jsonschema(schema_name):
    """Read a jsonschema file from disk.

    Args:
        schema_name: Either a path to an existing schema file, or the bare
            name of a schema under `data/schemas/` (without the
            `.jsonschema` suffix).

    Returns:
        The decoded schema. If no schema with that name exists,
        `data/schemas/false.jsonschema` is loaded instead. The result is
        cached, so repeated calls return the same object.
    """
    direct_path = Path(schema_name)
    if direct_path.exists():
        # Always hand the loader a Path: a plain string would be treated as
        # an already-open stream and fail to load.
        return json_load(direct_path)

    schema_path = Path(f'data/schemas/{schema_name}.jsonschema')

    if not schema_path.exists():
        schema_path = Path('data/schemas/false.jsonschema')

    return json_load(schema_path)


@lru_cache(maxsize=1)
def compile_schema_store():
    """Compile all our schemas into a schema store.

    Returns:
        A dict mapping both each schema's `$id` and its file name to the
        decoded schema, for every `*.jsonschema` file in `data/schemas`.
        Files that do not decode to a dict are skipped.
    """
    schema_store = {}

    for schema_file in Path('data/schemas').glob('*.jsonschema'):
        schema_data = load_jsonschema(schema_file)
        if not isinstance(schema_data, dict):
            cli.log.debug('Skipping schema file %s', schema_file)
            continue

        # `$id`-based references
        schema_store[schema_data['$id']] = schema_data

        # Path-based references
        schema_store[Path(schema_file).name] = schema_data

    return schema_store


@lru_cache(maxsize=20)
def create_validator(schema):
    """Creates a validator for the given schema id.

    Args:
        schema: Key into the schema store (a schema `$id` or file name).

    Returns:
        The bound `validate` method of a Draft 2020-12 validator whose
        references are resolved against the schema store.

    Raises:
        KeyError: if `schema` is not present in the schema store.
    """
    schema_store = compile_schema_store()
    resolver = jsonschema.RefResolver.from_schema(schema_store[schema], store=schema_store)

    return jsonschema.Draft202012Validator(schema_store[schema], resolver=resolver).validate


def validate(data, schema):
    """Validates data against a schema.

    Returns whatever the validator callable returns; validation failures
    surface as exceptions raised by that callable.
    """
    validator = create_validator(schema)

    return validator(data)


def deep_update(origdict, newdict):
    """Update a dictionary in place, recursing into nested mappings.

    Mapping values in `newdict` are merged key by key into the matching entry
    of `origdict` (created as an empty dict when missing); every other value
    is assigned as-is and replaces whatever was there.

    Returns:
        `origdict`, after modification.
    """
    for key, value in newdict.items():
        if isinstance(value, Mapping):
            origdict[key] = deep_update(origdict.get(key, {}), value)

        else:
            origdict[key] = value

    return origdict


def merge_ordered_dicts(dicts):
    """Merges nested OrderedDict objects resulting from reading a hjson file.
    Later input dicts overrides earlier dicts for plain values.
    If any value is "!delete!", the existing value will be removed from its parent.
    Arrays will be appended. If the first entry of an array is "!reset!", the contents of the array will be cleared and replaced with RHS.
    Dictionaries will be recursively merged. If any entry is "!reset!", the contents of the dictionary will be cleared and replaced with RHS.

    The input dicts are not modified; a new OrderedDict is returned.
    """
    result = OrderedDict()

    def add_entry(target, k, v):
        if k in target and isinstance(v, (OrderedDict, dict)):
            if "!reset!" in v:
                # Work on a shallow copy so stripping the marker below does
                # not alter the caller's input.
                target[k] = v.copy()
            else:
                target[k] = merge_ordered_dicts([target[k], v])
            if "!reset!" in target[k]:
                del target[k]["!reset!"]
        elif k in target and isinstance(v, list):
            # Guard against an empty override list before peeking at v[0].
            if v and v[0] == '!reset!':
                target[k] = v[1:]
            else:
                target[k] = target[k] + v
        elif v == "!delete!" and isinstance(target, (OrderedDict, dict)):
            # Deleting a key that was never set is a no-op, not an error.
            target.pop(k, None)
        else:
            target[k] = v

    for d in dicts:
        for (k, v) in d.items():
            add_entry(result, k, v)

    return result