Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
3
4import argparse
5import json
6import pprint
7import time
8
9from lib import YnlFamily, Netlink, NlError
10
11
12class YnlEncoder(json.JSONEncoder):
13 def default(self, obj):
14 if isinstance(obj, bytes):
15 return bytes.hex(obj)
16 if isinstance(obj, set):
17 return list(obj)
18 return json.JSONEncoder.default(self, obj)
19
20
21def main():
22 description = """
23 YNL CLI utility - a general purpose netlink utility that uses YAML
24 specs to drive protocol encoding and decoding.
25 """
26 epilog = """
27 The --multi option can be repeated to include several do operations
28 in the same netlink payload.
29 """
30
31 parser = argparse.ArgumentParser(description=description,
32 epilog=epilog)
33 parser.add_argument('--spec', dest='spec', type=str, required=True)
34 parser.add_argument('--schema', dest='schema', type=str)
35 parser.add_argument('--no-schema', action='store_true')
36 parser.add_argument('--json', dest='json_text', type=str)
37
38 group = parser.add_mutually_exclusive_group()
39 group.add_argument('--do', dest='do', metavar='DO-OPERATION', type=str)
40 group.add_argument('--multi', dest='multi', nargs=2, action='append',
41 metavar=('DO-OPERATION', 'JSON_TEXT'), type=str)
42 group.add_argument('--dump', dest='dump', metavar='DUMP-OPERATION', type=str)
43 group.add_argument('--list-ops', action='store_true')
44 group.add_argument('--list-msgs', action='store_true')
45
46 parser.add_argument('--sleep', dest='sleep', type=int)
47 parser.add_argument('--subscribe', dest='ntf', type=str)
48 parser.add_argument('--replace', dest='flags', action='append_const',
49 const=Netlink.NLM_F_REPLACE)
50 parser.add_argument('--excl', dest='flags', action='append_const',
51 const=Netlink.NLM_F_EXCL)
52 parser.add_argument('--create', dest='flags', action='append_const',
53 const=Netlink.NLM_F_CREATE)
54 parser.add_argument('--append', dest='flags', action='append_const',
55 const=Netlink.NLM_F_APPEND)
56 parser.add_argument('--process-unknown', action=argparse.BooleanOptionalAction)
57 parser.add_argument('--output-json', action='store_true')
58 parser.add_argument('--dbg-small-recv', default=0, const=4000,
59 action='store', nargs='?', type=int)
60 args = parser.parse_args()
61
62 def output(msg):
63 if args.output_json:
64 print(json.dumps(msg, cls=YnlEncoder))
65 else:
66 pprint.PrettyPrinter().pprint(msg)
67
68 if args.no_schema:
69 args.schema = ''
70
71 attrs = {}
72 if args.json_text:
73 attrs = json.loads(args.json_text)
74
75 ynl = YnlFamily(args.spec, args.schema, args.process_unknown,
76 recv_size=args.dbg_small_recv)
77 if args.dbg_small_recv:
78 ynl.set_recv_dbg(True)
79
80 if args.ntf:
81 ynl.ntf_subscribe(args.ntf)
82
83 if args.sleep:
84 time.sleep(args.sleep)
85
86 if args.list_ops:
87 for op_name, op in ynl.ops.items():
88 print(op_name, " [", ", ".join(op.modes), "]")
89 if args.list_msgs:
90 for op_name, op in ynl.msgs.items():
91 print(op_name, " [", ", ".join(op.modes), "]")
92
93 try:
94 if args.do:
95 reply = ynl.do(args.do, attrs, args.flags)
96 output(reply)
97 if args.dump:
98 reply = ynl.dump(args.dump, attrs)
99 output(reply)
100 if args.multi:
101 ops = [ (item[0], json.loads(item[1]), args.flags or []) for item in args.multi ]
102 reply = ynl.do_multi(ops)
103 output(reply)
104 except NlError as e:
105 print(e)
106 exit(1)
107
108 if args.ntf:
109 ynl.check_ntf()
110 output(ynl.async_msg_queue)
111
112
113if __name__ == "__main__":
114 main()