keyboard stuff
1"""Functions that help us generate and use info.json files.
2"""
3import json
4import hjson
5import jsonschema
6from collections.abc import Mapping
7from functools import lru_cache
8from typing import OrderedDict
9from pathlib import Path
10from copy import deepcopy
11
12from milc import cli
13
14from qmk.util import maybe_exit
15
16
17def _dict_raise_on_duplicates(ordered_pairs):
18 """Reject duplicate keys."""
19 d = {}
20 for k, v in ordered_pairs:
21 if k in d:
22 raise ValueError("duplicate key: %r" % (k,))
23 else:
24 d[k] = v
25 return d
26
27
@lru_cache(maxsize=20)
def _json_load_impl(json_file, strict=True):
    """Parse a JSON/HJSON document and return the resulting data.

    Args:
        json_file: Either a Path, which is opened as UTF-8 and closed again once parsed, or an already-open readable stream (e.g. stdin), which is read as-is and left open.
        strict: When true, duplicate keys within an object are rejected.

    Returns:
        The parsed data. On any failure the error is logged and maybe_exit(1) is called; if that call returns, this function returns None.

    Note: results are memoized by lru_cache, so callers share the returned object and must not mutate it.
    """
    pairs_hook = _dict_raise_on_duplicates if strict else None

    try:
        if isinstance(json_file, Path):
            # Use a context manager so the handle is released even when parsing fails.
            with json_file.open(encoding='utf-8') as stream:
                return hjson.load(stream, object_pairs_hook=pairs_hook)

        # Anything else is assumed to be an open stream owned by the caller.
        return hjson.load(json_file, object_pairs_hook=pairs_hook)

    except (json.decoder.JSONDecodeError, hjson.HjsonDecodeError) as e:
        # json_file is never rebound, so the log shows the path that was requested.
        cli.log.error('Invalid JSON encountered attempting to load {fg_cyan}%s{fg_reset}:\n\t{fg_red}%s', json_file, e)
        maybe_exit(1)
    except Exception as e:
        cli.log.error('Unknown error attempting to load {fg_cyan}%s{fg_reset}:\n\t{fg_red}%s', json_file, e)
        maybe_exit(1)
47
48
def json_load(json_file, strict=True):
    """Load a json file and return an independent copy of its contents.

    The parsed data comes from the memoized loader; deep-copying it means callers can modify the result without touching the cached object.
    """
    cached = _json_load_impl(json_file=json_file, strict=strict)
    return deepcopy(cached)
51
52
@lru_cache(maxsize=20)
def load_jsonschema(schema_name):
    """Read a jsonschema file from disk.

    Args:
        schema_name: Either a path (str or Path) to an existing schema file, or a bare schema name that is looked up as data/schemas/<schema_name>.jsonschema.

    Returns:
        The parsed schema. If neither location exists, data/schemas/false.jsonschema is loaded instead.
    """
    direct_path = Path(schema_name)
    if direct_path.exists():
        # Always hand json_load a Path: it only opens Path objects itself, so a plain string would be mistaken for an already-open stream.
        return json_load(direct_path)

    schema_path = Path(f'data/schemas/{schema_name}.jsonschema')

    if not schema_path.exists():
        schema_path = Path('data/schemas/false.jsonschema')

    return json_load(schema_path)
66
67
@lru_cache(maxsize=1)
def compile_schema_store():
    """Collect every data/schemas/*.jsonschema file into one lookup table.

    Each schema that parses to a dict is registered twice: under its `$id` and under its file name. Files that parse to anything else are skipped with a debug message.
    """
    store = {}

    for schema_path in Path('data/schemas').glob('*.jsonschema'):
        contents = load_jsonschema(schema_path)

        if isinstance(contents, dict):
            # `$id`-based references
            store[contents['$id']] = contents
            # Path-based references
            store[Path(schema_path).name] = contents
        else:
            cli.log.debug('Skipping schema file %s', schema_path)

    return store
87
88
@lru_cache(maxsize=20)
def create_validator(schema):
    """Return the bound `validate` callable of a Draft 2020-12 validator for the given schema key.

    `schema` is looked up in the compiled schema store; the same store is given to the resolver so references between schemas can be resolved.
    """
    store = compile_schema_store()
    target_schema = store[schema]

    ref_resolver = jsonschema.RefResolver.from_schema(target_schema, store=store)
    validator = jsonschema.Draft202012Validator(store[schema], resolver=ref_resolver)

    return validator.validate
97
98
def validate(data, schema):
    """Check `data` against the schema registered under `schema`.

    Returns whatever the validator callable built by create_validator returns.
    """
    return create_validator(schema)(data)
105
106
def deep_update(origdict, newdict):
    """Recursively merge `newdict` into `origdict`, modifying `origdict` in place.

    Mapping values in `newdict` are merged key by key into the matching entry of `origdict`. Every other value replaces what was there; the value object itself is stored, not a copy.

    If the existing entry is missing or is not itself a mapping (for example None or a string), merging starts from a fresh dict, so a mapping in `newdict` replaces it instead of raising TypeError.

    Args:
        origdict: The dictionary to update.
        newdict: The mapping whose contents take precedence.

    Returns:
        `origdict`, after the update.
    """
    for key, value in newdict.items():
        if isinstance(value, Mapping):
            current = origdict.get(key)
            if not isinstance(current, Mapping):
                # Nothing mergeable is stored here yet; the incoming mapping wins.
                current = {}
            origdict[key] = deep_update(current, value)

        else:
            origdict[key] = value

    return origdict
118
119
def merge_ordered_dicts(dicts):
    """Merge a sequence of (possibly nested) dicts, such as those produced by reading hjson files.

    The dicts are applied left to right onto a new OrderedDict. The input dicts themselves are never modified, although the result may share nested values with them.

    For a key that is already present in the merged result:
      * A dict value is merged recursively. If it contains the key "!reset!", it replaces the existing dict outright instead. Either way the "!reset!" key is removed from the stored dict.
      * A list value is appended to the existing list. If its first entry is "!reset!", the remaining entries replace the existing list instead. An empty list leaves the existing entries unchanged.
      * The value "!delete!" removes the key.
      * Any other value overrides the existing one.

    For a key that is not yet present, the value is stored as-is (including any "!reset!" marker it carries), except that "!delete!" is ignored because there is nothing to remove.

    Args:
        dicts: Iterable of dicts, in increasing order of precedence.

    Returns:
        A new OrderedDict holding the merged contents.
    """
    result = OrderedDict()

    def add_entry(target, k, v):
        if k in target and isinstance(v, (OrderedDict, dict)):
            if "!reset!" in v:
                # Work on a shallow copy so stripping the marker below cannot alter the caller's dict.
                merged = v.copy()
            else:
                merged = merge_ordered_dicts([target[k], v])
            # The marker may also come from the earlier value, which was stored verbatim.
            merged.pop("!reset!", None)
            target[k] = merged
        elif k in target and isinstance(v, list):
            # Guard against empty lists before peeking at the first entry.
            if v and v[0] == '!reset!':
                target[k] = v[1:]
            else:
                target[k] = target[k] + v
        elif v == "!delete!":
            # Deleting a key that was never defined is a no-op rather than a KeyError.
            target.pop(k, None)
        else:
            target[k] = v

    for d in dicts:
        for (k, v) in d.items():
            add_entry(result, k, v)

    return result
151 return result