Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1// SPDX-License-Identifier: GPL-2.0-or-later
2/* In-kernel rxperf server for testing purposes.
3 *
4 * Copyright (C) 2022 Red Hat, Inc. All Rights Reserved.
5 * Written by David Howells (dhowells@redhat.com)
6 */
7
8#define pr_fmt(fmt) "rxperf: " fmt
9#include <linux/module.h>
10#include <linux/slab.h>
11#include <crypto/krb5.h>
12#include <net/sock.h>
13#include <net/af_rxrpc.h>
14#define RXRPC_TRACE_ONLY_DEFINE_ENUMS
15#include <trace/events/rxrpc.h>
16
17MODULE_DESCRIPTION("rxperf test server (afs)");
18MODULE_AUTHOR("Red Hat, Inc.");
19MODULE_LICENSE("GPL");
20
/*
 * Protocol constants.
 *
 * These are kept as preprocessor macros (rather than an enum) because
 * RX_PERF_SERVICE is also expanded through __stringify() when the server key
 * description is built.
 */
#define RXPERF_PORT		7009	/* Transport port the server binds to */
#define RX_PERF_SERVICE		147	/* rxrpc service ID served here */
#define RX_PERF_VERSION		3	/* Only parameter-block version accepted */

/* Operation types carried in the "type" field of the parameter block. */
#define RX_PERF_SEND		0
#define RX_PERF_RECV		1
#define RX_PERF_RPC		3
#define RX_PERF_FILE		4	/* Recognised, but rejected as unsupported */

#define RX_PERF_MAGIC_COOKIE	0x4711
29
30struct rxperf_proto_params {
31 __be32 version;
32 __be32 type;
33 __be32 rsize;
34 __be32 wsize;
35} __packed;
36
37static const u8 rxperf_magic_cookie[] = { 0x00, 0x00, 0x47, 0x11 };
38static const u8 secret[8] = { 0xa7, 0x83, 0x8a, 0xcb, 0xc7, 0x83, 0xec, 0x94 };
39
/*
 * Server-side call life cycle.  A call starts in AWAIT_PARAMS and ends in
 * COMPLETE, whether it succeeded or failed.
 */
enum rxperf_call_state {
	/* Server: waiting for the fixed parameter block */
	RXPERF_CALL_SV_AWAIT_PARAMS,
	/* Server: waiting for the request data */
	RXPERF_CALL_SV_AWAIT_REQUEST,
	/* Server: transmitting the reply */
	RXPERF_CALL_SV_REPLYING,
	/* Server: reply sent, waiting for the final ACK */
	RXPERF_CALL_SV_AWAIT_ACK,
	/* Completed or failed */
	RXPERF_CALL_COMPLETE,
};
47
48struct rxperf_call {
49 struct rxrpc_call *rxcall;
50 struct iov_iter iter;
51 struct kvec kvec[1];
52 struct work_struct work;
53 const char *type;
54 size_t iov_len;
55 size_t req_len; /* Size of request blob */
56 size_t reply_len; /* Size of reply blob */
57 unsigned int debug_id;
58 unsigned int operation_id;
59 struct rxperf_proto_params params;
60 __be32 tmp[2];
61 s32 abort_code;
62 enum rxperf_call_state state;
63 short error;
64 unsigned short unmarshal;
65 u16 service_id;
66 int (*deliver)(struct rxperf_call *call);
67 void (*processor)(struct work_struct *work);
68};
69
/* Module-wide state, set up by rxperf_init() and rxperf_open_socket(). */
static struct socket *rxperf_socket;		/* The listening rxrpc socket */
static struct key *rxperf_sec_keyring;		/* Ring of security/crypto keys */
static struct workqueue_struct *rxperf_workqueue; /* Runs call and charge work */

/* Forward declarations for handlers referenced before their definitions. */
static void rxperf_charge_preallocation(struct work_struct *work);
static void rxperf_deliver_to_call(struct work_struct *work);
static int rxperf_deliver_param_block(struct rxperf_call *call);
static int rxperf_deliver_request(struct rxperf_call *call);
static int rxperf_process_call(struct rxperf_call *call);

/* Work item that tops up the incoming-call preallocation. */
static DECLARE_WORK(rxperf_charge_preallocation_work,
		    rxperf_charge_preallocation);
82
83static inline void rxperf_set_call_state(struct rxperf_call *call,
84 enum rxperf_call_state to)
85{
86 call->state = to;
87}
88
89static inline void rxperf_set_call_complete(struct rxperf_call *call,
90 int error, s32 remote_abort)
91{
92 if (call->state != RXPERF_CALL_COMPLETE) {
93 call->abort_code = remote_abort;
94 call->error = error;
95 call->state = RXPERF_CALL_COMPLETE;
96 }
97}
98
/*
 * Discard a preallocated call that was never used: the user call ID is the
 * rxperf_call pointer handed over when the preallocation was charged.
 */
static void rxperf_rx_discard_new_call(struct rxrpc_call *rxcall,
				       unsigned long user_call_ID)
{
	struct rxperf_call *call = (struct rxperf_call *)user_call_ID;

	kfree(call);
}
104
105static void rxperf_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
106 unsigned long user_call_ID)
107{
108 queue_work(rxperf_workqueue, &rxperf_charge_preallocation_work);
109}
110
111static void rxperf_queue_call_work(struct rxperf_call *call)
112{
113 queue_work(rxperf_workqueue, &call->work);
114}
115
116static void rxperf_notify_rx(struct sock *sk, struct rxrpc_call *rxcall,
117 unsigned long call_user_ID)
118{
119 struct rxperf_call *call = (struct rxperf_call *)call_user_ID;
120
121 if (call->state != RXPERF_CALL_COMPLETE)
122 rxperf_queue_call_work(call);
123}
124
125static void rxperf_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
126{
127 struct rxperf_call *call = (struct rxperf_call *)user_call_ID;
128
129 call->rxcall = rxcall;
130}
131
132static void rxperf_notify_end_reply_tx(struct sock *sock,
133 struct rxrpc_call *rxcall,
134 unsigned long call_user_ID)
135{
136 rxperf_set_call_state((struct rxperf_call *)call_user_ID,
137 RXPERF_CALL_SV_AWAIT_ACK);
138}
139
140static const struct rxrpc_kernel_ops rxperf_rxrpc_callback_ops = {
141 .notify_new_call = rxperf_rx_new_call,
142 .discard_new_call = rxperf_rx_discard_new_call,
143 .user_attach_call = rxperf_rx_attach,
144};
145
146/*
147 * Charge the incoming call preallocation.
148 */
149static void rxperf_charge_preallocation(struct work_struct *work)
150{
151 struct rxperf_call *call;
152
153 for (;;) {
154 call = kzalloc(sizeof(*call), GFP_KERNEL);
155 if (!call)
156 break;
157
158 call->type = "unset";
159 call->debug_id = atomic_inc_return(&rxrpc_debug_id);
160 call->deliver = rxperf_deliver_param_block;
161 call->state = RXPERF_CALL_SV_AWAIT_PARAMS;
162 call->service_id = RX_PERF_SERVICE;
163 call->iov_len = sizeof(call->params);
164 call->kvec[0].iov_len = sizeof(call->params);
165 call->kvec[0].iov_base = &call->params;
166 iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len);
167 INIT_WORK(&call->work, rxperf_deliver_to_call);
168
169 if (rxrpc_kernel_charge_accept(rxperf_socket,
170 rxperf_notify_rx,
171 (unsigned long)call,
172 GFP_KERNEL,
173 call->debug_id) < 0)
174 break;
175 call = NULL;
176 }
177
178 kfree(call);
179}
180
181/*
182 * Open an rxrpc socket and bind it to be a server for callback notifications
183 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
184 */
185static int rxperf_open_socket(void)
186{
187 struct sockaddr_rxrpc srx;
188 struct socket *socket;
189 int ret;
190
191 ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET6,
192 &socket);
193 if (ret < 0)
194 goto error_1;
195
196 socket->sk->sk_allocation = GFP_NOFS;
197
198 /* bind the callback manager's address to make this a server socket */
199 memset(&srx, 0, sizeof(srx));
200 srx.srx_family = AF_RXRPC;
201 srx.srx_service = RX_PERF_SERVICE;
202 srx.transport_type = SOCK_DGRAM;
203 srx.transport_len = sizeof(srx.transport.sin6);
204 srx.transport.sin6.sin6_family = AF_INET6;
205 srx.transport.sin6.sin6_port = htons(RXPERF_PORT);
206
207 ret = rxrpc_sock_set_min_security_level(socket->sk,
208 RXRPC_SECURITY_ENCRYPT);
209 if (ret < 0)
210 goto error_2;
211
212 ret = rxrpc_sock_set_security_keyring(socket->sk, rxperf_sec_keyring);
213
214 ret = kernel_bind(socket, (struct sockaddr_unsized *)&srx, sizeof(srx));
215 if (ret < 0)
216 goto error_2;
217
218 rxrpc_kernel_set_notifications(socket, &rxperf_rxrpc_callback_ops);
219
220 ret = kernel_listen(socket, INT_MAX);
221 if (ret < 0)
222 goto error_2;
223
224 rxperf_socket = socket;
225 rxperf_charge_preallocation(&rxperf_charge_preallocation_work);
226 return 0;
227
228error_2:
229 sock_release(socket);
230error_1:
231 pr_err("Can't set up rxperf socket: %d\n", ret);
232 return ret;
233}
234
235/*
236 * close the rxrpc socket rxperf was using
237 */
238static void rxperf_close_socket(void)
239{
240 kernel_listen(rxperf_socket, 0);
241 kernel_sock_shutdown(rxperf_socket, SHUT_RDWR);
242 flush_workqueue(rxperf_workqueue);
243 sock_release(rxperf_socket);
244}
245
246/*
247 * Log remote abort codes that indicate that we have a protocol disagreement
248 * with the server.
249 */
250static void rxperf_log_error(struct rxperf_call *call, s32 remote_abort)
251{
252 static int max = 0;
253 const char *msg;
254 int m;
255
256 switch (remote_abort) {
257 case RX_EOF: msg = "unexpected EOF"; break;
258 case RXGEN_CC_MARSHAL: msg = "client marshalling"; break;
259 case RXGEN_CC_UNMARSHAL: msg = "client unmarshalling"; break;
260 case RXGEN_SS_MARSHAL: msg = "server marshalling"; break;
261 case RXGEN_SS_UNMARSHAL: msg = "server unmarshalling"; break;
262 case RXGEN_DECODE: msg = "opcode decode"; break;
263 case RXGEN_SS_XDRFREE: msg = "server XDR cleanup"; break;
264 case RXGEN_CC_XDRFREE: msg = "client XDR cleanup"; break;
265 case -32: msg = "insufficient data"; break;
266 default:
267 return;
268 }
269
270 m = max;
271 if (m < 3) {
272 max = m + 1;
273 pr_info("Peer reported %s failure on %s\n", msg, call->type);
274 }
275}
276
277/*
278 * deliver messages to a call
279 */
280static void rxperf_deliver_to_call(struct work_struct *work)
281{
282 struct rxperf_call *call = container_of(work, struct rxperf_call, work);
283 enum rxperf_call_state state;
284 u32 abort_code, remote_abort = 0;
285 int ret = 0;
286
287 if (call->state == RXPERF_CALL_COMPLETE)
288 return;
289
290 while (state = call->state,
291 state == RXPERF_CALL_SV_AWAIT_PARAMS ||
292 state == RXPERF_CALL_SV_AWAIT_REQUEST ||
293 state == RXPERF_CALL_SV_AWAIT_ACK
294 ) {
295 if (state == RXPERF_CALL_SV_AWAIT_ACK) {
296 if (!rxrpc_kernel_check_life(rxperf_socket, call->rxcall))
297 goto call_complete;
298 return;
299 }
300
301 ret = call->deliver(call);
302 if (ret == 0)
303 ret = rxperf_process_call(call);
304
305 switch (ret) {
306 case 0:
307 continue;
308 case -EINPROGRESS:
309 case -EAGAIN:
310 return;
311 case -ECONNABORTED:
312 rxperf_log_error(call, call->abort_code);
313 goto call_complete;
314 case -EOPNOTSUPP:
315 abort_code = RXGEN_OPCODE;
316 rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
317 abort_code, ret,
318 rxperf_abort_op_not_supported);
319 goto call_complete;
320 case -ENOTSUPP:
321 abort_code = RX_USER_ABORT;
322 rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
323 abort_code, ret,
324 rxperf_abort_op_not_supported);
325 goto call_complete;
326 case -EIO:
327 pr_err("Call %u in bad state %u\n",
328 call->debug_id, call->state);
329 fallthrough;
330 case -ENODATA:
331 case -EBADMSG:
332 case -EMSGSIZE:
333 case -ENOMEM:
334 case -EFAULT:
335 rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
336 RXGEN_SS_UNMARSHAL, ret,
337 rxperf_abort_unmarshal_error);
338 goto call_complete;
339 default:
340 rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
341 RX_CALL_DEAD, ret,
342 rxperf_abort_general_error);
343 goto call_complete;
344 }
345 }
346
347call_complete:
348 rxperf_set_call_complete(call, ret, remote_abort);
349 /* The call may have been requeued */
350 rxrpc_kernel_shutdown_call(rxperf_socket, call->rxcall);
351 rxrpc_kernel_put_call(rxperf_socket, call->rxcall);
352 cancel_work(&call->work);
353 kfree(call);
354}
355
356/*
357 * Extract a piece of data from the received data socket buffers.
358 */
359static int rxperf_extract_data(struct rxperf_call *call, bool want_more)
360{
361 u32 remote_abort = 0;
362 int ret;
363
364 ret = rxrpc_kernel_recv_data(rxperf_socket, call->rxcall, &call->iter,
365 &call->iov_len, want_more, &remote_abort,
366 &call->service_id);
367 pr_debug("Extract i=%zu l=%zu m=%u ret=%d\n",
368 iov_iter_count(&call->iter), call->iov_len, want_more, ret);
369 if (ret == 0 || ret == -EAGAIN)
370 return ret;
371
372 if (ret == 1) {
373 switch (call->state) {
374 case RXPERF_CALL_SV_AWAIT_REQUEST:
375 rxperf_set_call_state(call, RXPERF_CALL_SV_REPLYING);
376 break;
377 case RXPERF_CALL_COMPLETE:
378 pr_debug("premature completion %d", call->error);
379 return call->error;
380 default:
381 break;
382 }
383 return 0;
384 }
385
386 rxperf_set_call_complete(call, ret, remote_abort);
387 return ret;
388}
389
390/*
391 * Grab the operation ID from an incoming manager call.
392 */
393static int rxperf_deliver_param_block(struct rxperf_call *call)
394{
395 u32 version;
396 int ret;
397
398 /* Extract the parameter block */
399 ret = rxperf_extract_data(call, true);
400 if (ret < 0)
401 return ret;
402
403 version = ntohl(call->params.version);
404 call->operation_id = ntohl(call->params.type);
405 call->deliver = rxperf_deliver_request;
406
407 if (version != RX_PERF_VERSION) {
408 pr_info("Version mismatch %x\n", version);
409 return -ENOTSUPP;
410 }
411
412 switch (call->operation_id) {
413 case RX_PERF_SEND:
414 call->type = "send";
415 call->reply_len = 0;
416 call->iov_len = 4; /* Expect req size */
417 break;
418 case RX_PERF_RECV:
419 call->type = "recv";
420 call->req_len = 0;
421 call->iov_len = 4; /* Expect reply size */
422 break;
423 case RX_PERF_RPC:
424 call->type = "rpc";
425 call->iov_len = 8; /* Expect req size and reply size */
426 break;
427 case RX_PERF_FILE:
428 call->type = "file";
429 fallthrough;
430 default:
431 return -EOPNOTSUPP;
432 }
433
434 rxperf_set_call_state(call, RXPERF_CALL_SV_AWAIT_REQUEST);
435 return call->deliver(call);
436}
437
438/*
439 * Deliver the request data.
440 */
441static int rxperf_deliver_request(struct rxperf_call *call)
442{
443 int ret;
444
445 switch (call->unmarshal) {
446 case 0:
447 call->kvec[0].iov_len = call->iov_len;
448 call->kvec[0].iov_base = call->tmp;
449 iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len);
450 call->unmarshal++;
451 fallthrough;
452 case 1:
453 ret = rxperf_extract_data(call, true);
454 if (ret < 0)
455 return ret;
456
457 switch (call->operation_id) {
458 case RX_PERF_SEND:
459 call->type = "send";
460 call->req_len = ntohl(call->tmp[0]);
461 call->reply_len = 0;
462 break;
463 case RX_PERF_RECV:
464 call->type = "recv";
465 call->req_len = 0;
466 call->reply_len = ntohl(call->tmp[0]);
467 break;
468 case RX_PERF_RPC:
469 call->type = "rpc";
470 call->req_len = ntohl(call->tmp[0]);
471 call->reply_len = ntohl(call->tmp[1]);
472 break;
473 default:
474 pr_info("Can't parse extra params\n");
475 return -EIO;
476 }
477
478 pr_debug("CALL op=%s rq=%zx rp=%zx\n",
479 call->type, call->req_len, call->reply_len);
480
481 call->iov_len = call->req_len;
482 iov_iter_discard(&call->iter, READ, call->req_len);
483 call->unmarshal++;
484 fallthrough;
485 case 2:
486 ret = rxperf_extract_data(call, true);
487 if (ret < 0)
488 return ret;
489
490 /* Deal with the terminal magic cookie. */
491 call->iov_len = 4;
492 call->kvec[0].iov_len = call->iov_len;
493 call->kvec[0].iov_base = call->tmp;
494 iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len);
495 call->unmarshal++;
496 fallthrough;
497 case 3:
498 ret = rxperf_extract_data(call, false);
499 if (ret < 0)
500 return ret;
501 call->unmarshal++;
502 fallthrough;
503 default:
504 return 0;
505 }
506}
507
508/*
509 * Process a call for which we've received the request.
510 */
511static int rxperf_process_call(struct rxperf_call *call)
512{
513 struct msghdr msg = {};
514 struct bio_vec bv;
515 struct kvec iov[1];
516 ssize_t n;
517 size_t reply_len = call->reply_len, len;
518
519 rxrpc_kernel_set_tx_length(rxperf_socket, call->rxcall,
520 reply_len + sizeof(rxperf_magic_cookie));
521
522 while (reply_len > 0) {
523 len = umin(reply_len, PAGE_SIZE);
524 bvec_set_page(&bv, ZERO_PAGE(0), len, 0);
525 iov_iter_bvec(&msg.msg_iter, WRITE, &bv, 1, len);
526 msg.msg_flags = MSG_MORE;
527 n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg,
528 len, rxperf_notify_end_reply_tx);
529 if (n < 0)
530 return n;
531 if (n == 0)
532 return -EIO;
533 reply_len -= n;
534 }
535
536 len = sizeof(rxperf_magic_cookie);
537 iov[0].iov_base = (void *)rxperf_magic_cookie;
538 iov[0].iov_len = len;
539 iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len);
540 msg.msg_flags = 0;
541 n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg, len,
542 rxperf_notify_end_reply_tx);
543 if (n >= 0)
544 return 0; /* Success */
545
546 if (n == -ENOMEM)
547 rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
548 RXGEN_SS_MARSHAL, -ENOMEM,
549 rxperf_abort_oom);
550 return n;
551}
552
553/*
554 * Add an rxkad key to the security keyring.
555 */
556static int rxperf_add_rxkad_key(struct key *keyring)
557{
558 key_ref_t kref;
559 int ret;
560
561 kref = key_create_or_update(make_key_ref(keyring, true),
562 "rxrpc_s",
563 __stringify(RX_PERF_SERVICE) ":2",
564 secret,
565 sizeof(secret),
566 KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH
567 | KEY_USR_VIEW,
568 KEY_ALLOC_NOT_IN_QUOTA);
569
570 if (IS_ERR(kref)) {
571 pr_err("Can't allocate rxperf server key: %ld\n", PTR_ERR(kref));
572 return PTR_ERR(kref);
573 }
574
575 ret = key_link(keyring, key_ref_to_ptr(kref));
576 if (ret < 0)
577 pr_err("Can't link rxperf server key: %d\n", ret);
578 key_ref_put(kref);
579 return ret;
580}
581
#ifdef CONFIG_RXGK
/*
 * Add a yfs-rxgk key to the security keyring.
 *
 * Creates (or updates) an "rxrpc_s" server key for the given Kerberos
 * encryption type and links it into @keyring.  The key material is the fixed
 * byte sequence { 0, 1, 2, ... } of the length the enctype requires.
 *
 * An enctype that is unavailable, or whose key would not fit the local
 * buffer, is skipped and reported as success.  Returns 0 on success or skip,
 * or a negative error code.
 */
static int rxperf_add_yfs_rxgk_key(struct key *keyring, u32 enctype)
{
	const struct krb5_enctype *krb5 = crypto_krb5_find_enctype(enctype);
	key_ref_t kref;
	char name[64];
	int ret;
	u8 key[32];

	if (!krb5 || krb5->key_len > sizeof(key))
		return 0;

	/* The key is just { 0, 1, 2, 3, 4, ... } */
	for (int i = 0; i < krb5->key_len; i++)
		key[i] = i;

	/* Bounded formatting so the description can never overrun name[]. */
	snprintf(name, sizeof(name), "%u:6:1:%u", RX_PERF_SERVICE, enctype);

	kref = key_create_or_update(make_key_ref(keyring, true),
				    "rxrpc_s", name,
				    key, krb5->key_len,
				    KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH |
				    KEY_USR_VIEW,
				    KEY_ALLOC_NOT_IN_QUOTA);

	if (IS_ERR(kref)) {
		pr_err("Can't allocate rxperf server key: %ld\n", PTR_ERR(kref));
		return PTR_ERR(kref);
	}

	ret = key_link(keyring, key_ref_to_ptr(kref));
	if (ret < 0)
		pr_err("Can't link rxperf server key: %d\n", ret);
	/* Drop the reference obtained from key_create_or_update(). */
	key_ref_put(kref);
	return ret;
}
#endif
622
623/*
624 * Initialise the rxperf server.
625 */
626static int __init rxperf_init(void)
627{
628 struct key *keyring;
629 int ret = -ENOMEM;
630
631 pr_info("Server registering\n");
632
633 rxperf_workqueue = alloc_workqueue("rxperf", WQ_PERCPU, 0);
634 if (!rxperf_workqueue)
635 goto error_workqueue;
636
637 keyring = keyring_alloc("rxperf_server",
638 GLOBAL_ROOT_UID, GLOBAL_ROOT_GID, current_cred(),
639 KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH |
640 KEY_POS_WRITE |
641 KEY_USR_VIEW | KEY_USR_READ | KEY_USR_SEARCH |
642 KEY_USR_WRITE |
643 KEY_OTH_VIEW | KEY_OTH_READ | KEY_OTH_SEARCH,
644 KEY_ALLOC_NOT_IN_QUOTA,
645 NULL, NULL);
646 if (IS_ERR(keyring)) {
647 pr_err("Can't allocate rxperf server keyring: %ld\n",
648 PTR_ERR(keyring));
649 goto error_keyring;
650 }
651 rxperf_sec_keyring = keyring;
652 ret = rxperf_add_rxkad_key(keyring);
653 if (ret < 0)
654 goto error_key;
655#ifdef CONFIG_RXGK
656 ret = rxperf_add_yfs_rxgk_key(keyring, KRB5_ENCTYPE_AES128_CTS_HMAC_SHA1_96);
657 if (ret < 0)
658 goto error_key;
659 ret = rxperf_add_yfs_rxgk_key(keyring, KRB5_ENCTYPE_AES256_CTS_HMAC_SHA1_96);
660 if (ret < 0)
661 goto error_key;
662 ret = rxperf_add_yfs_rxgk_key(keyring, KRB5_ENCTYPE_AES128_CTS_HMAC_SHA256_128);
663 if (ret < 0)
664 goto error_key;
665 ret = rxperf_add_yfs_rxgk_key(keyring, KRB5_ENCTYPE_AES256_CTS_HMAC_SHA384_192);
666 if (ret < 0)
667 goto error_key;
668 ret = rxperf_add_yfs_rxgk_key(keyring, KRB5_ENCTYPE_CAMELLIA128_CTS_CMAC);
669 if (ret < 0)
670 goto error_key;
671 ret = rxperf_add_yfs_rxgk_key(keyring, KRB5_ENCTYPE_CAMELLIA256_CTS_CMAC);
672 if (ret < 0)
673 goto error_key;
674#endif
675
676 ret = rxperf_open_socket();
677 if (ret < 0)
678 goto error_socket;
679 return 0;
680
681error_socket:
682error_key:
683 key_put(rxperf_sec_keyring);
684error_keyring:
685 destroy_workqueue(rxperf_workqueue);
686 rcu_barrier();
687error_workqueue:
688 pr_err("Failed to register: %d\n", ret);
689 return ret;
690}
691late_initcall(rxperf_init); /* Must be called after net/ to create socket */
692
693static void __exit rxperf_exit(void)
694{
695 pr_info("Server unregistering.\n");
696
697 rxperf_close_socket();
698 key_put(rxperf_sec_keyring);
699 destroy_workqueue(rxperf_workqueue);
700 rcu_barrier();
701}
702module_exit(rxperf_exit);
703