at master 18 kB view raw
1// SPDX-License-Identifier: GPL-2.0-or-later 2/* In-kernel rxperf server for testing purposes. 3 * 4 * Copyright (C) 2022 Red Hat, Inc. All Rights Reserved. 5 * Written by David Howells (dhowells@redhat.com) 6 */ 7 8#define pr_fmt(fmt) "rxperf: " fmt 9#include <linux/module.h> 10#include <linux/slab.h> 11#include <crypto/krb5.h> 12#include <net/sock.h> 13#include <net/af_rxrpc.h> 14#define RXRPC_TRACE_ONLY_DEFINE_ENUMS 15#include <trace/events/rxrpc.h> 16 17MODULE_DESCRIPTION("rxperf test server (afs)"); 18MODULE_AUTHOR("Red Hat, Inc."); 19MODULE_LICENSE("GPL"); 20 21#define RXPERF_PORT 7009 22#define RX_PERF_SERVICE 147 23#define RX_PERF_VERSION 3 24#define RX_PERF_SEND 0 25#define RX_PERF_RECV 1 26#define RX_PERF_RPC 3 27#define RX_PERF_FILE 4 28#define RX_PERF_MAGIC_COOKIE 0x4711 29 30struct rxperf_proto_params { 31 __be32 version; 32 __be32 type; 33 __be32 rsize; 34 __be32 wsize; 35} __packed; 36 37static const u8 rxperf_magic_cookie[] = { 0x00, 0x00, 0x47, 0x11 }; 38static const u8 secret[8] = { 0xa7, 0x83, 0x8a, 0xcb, 0xc7, 0x83, 0xec, 0x94 }; 39 40enum rxperf_call_state { 41 RXPERF_CALL_SV_AWAIT_PARAMS, /* Server: Awaiting parameter block */ 42 RXPERF_CALL_SV_AWAIT_REQUEST, /* Server: Awaiting request data */ 43 RXPERF_CALL_SV_REPLYING, /* Server: Replying */ 44 RXPERF_CALL_SV_AWAIT_ACK, /* Server: Awaiting final ACK */ 45 RXPERF_CALL_COMPLETE, /* Completed or failed */ 46}; 47 48struct rxperf_call { 49 struct rxrpc_call *rxcall; 50 struct iov_iter iter; 51 struct kvec kvec[1]; 52 struct work_struct work; 53 const char *type; 54 size_t iov_len; 55 size_t req_len; /* Size of request blob */ 56 size_t reply_len; /* Size of reply blob */ 57 unsigned int debug_id; 58 unsigned int operation_id; 59 struct rxperf_proto_params params; 60 __be32 tmp[2]; 61 s32 abort_code; 62 enum rxperf_call_state state; 63 short error; 64 unsigned short unmarshal; 65 u16 service_id; 66 int (*deliver)(struct rxperf_call *call); 67 void (*processor)(struct work_struct *work); 68}; 69 70static struct socket *rxperf_socket; 71static struct key *rxperf_sec_keyring; /* Ring of security/crypto keys */ 72static struct workqueue_struct *rxperf_workqueue; 73 74static void rxperf_deliver_to_call(struct work_struct *work); 75static int rxperf_deliver_param_block(struct rxperf_call *call); 76static int rxperf_deliver_request(struct rxperf_call *call); 77static int rxperf_process_call(struct rxperf_call *call); 78static void rxperf_charge_preallocation(struct work_struct *work); 79 80static DECLARE_WORK(rxperf_charge_preallocation_work, 81 rxperf_charge_preallocation); 82 83static inline void rxperf_set_call_state(struct rxperf_call *call, 84 enum rxperf_call_state to) 85{ 86 call->state = to; 87} 88 89static inline void rxperf_set_call_complete(struct rxperf_call *call, 90 int error, s32 remote_abort) 91{ 92 if (call->state != RXPERF_CALL_COMPLETE) { 93 call->abort_code = remote_abort; 94 call->error = error; 95 call->state = RXPERF_CALL_COMPLETE; 96 } 97} 98 99static void rxperf_rx_discard_new_call(struct rxrpc_call *rxcall, 100 unsigned long user_call_ID) 101{ 102 kfree((struct rxperf_call *)user_call_ID); 103} 104 105static void rxperf_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall, 106 unsigned long user_call_ID) 107{ 108 queue_work(rxperf_workqueue, &rxperf_charge_preallocation_work); 109} 110 111static void rxperf_queue_call_work(struct rxperf_call *call) 112{ 113 queue_work(rxperf_workqueue, &call->work); 114} 115 116static void rxperf_notify_rx(struct sock *sk, struct rxrpc_call *rxcall, 117 unsigned long call_user_ID) 118{ 119 struct rxperf_call *call = (struct rxperf_call *)call_user_ID; 120 121 if (call->state != RXPERF_CALL_COMPLETE) 122 rxperf_queue_call_work(call); 123} 124 125static void rxperf_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID) 126{ 127 struct rxperf_call *call = (struct rxperf_call *)user_call_ID; 128 129 call->rxcall = rxcall; 130} 131 132static void rxperf_notify_end_reply_tx(struct sock *sock, 133 struct rxrpc_call *rxcall, 134 unsigned long call_user_ID) 135{ 136 rxperf_set_call_state((struct rxperf_call *)call_user_ID, 137 RXPERF_CALL_SV_AWAIT_ACK); 138} 139 140static const struct rxrpc_kernel_ops rxperf_rxrpc_callback_ops = { 141 .notify_new_call = rxperf_rx_new_call, 142 .discard_new_call = rxperf_rx_discard_new_call, 143 .user_attach_call = rxperf_rx_attach, 144}; 145 146/* 147 * Charge the incoming call preallocation. 148 */ 149static void rxperf_charge_preallocation(struct work_struct *work) 150{ 151 struct rxperf_call *call; 152 153 for (;;) { 154 call = kzalloc(sizeof(*call), GFP_KERNEL); 155 if (!call) 156 break; 157 158 call->type = "unset"; 159 call->debug_id = atomic_inc_return(&rxrpc_debug_id); 160 call->deliver = rxperf_deliver_param_block; 161 call->state = RXPERF_CALL_SV_AWAIT_PARAMS; 162 call->service_id = RX_PERF_SERVICE; 163 call->iov_len = sizeof(call->params); 164 call->kvec[0].iov_len = sizeof(call->params); 165 call->kvec[0].iov_base = &call->params; 166 iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len); 167 INIT_WORK(&call->work, rxperf_deliver_to_call); 168 169 if (rxrpc_kernel_charge_accept(rxperf_socket, 170 rxperf_notify_rx, 171 (unsigned long)call, 172 GFP_KERNEL, 173 call->debug_id) < 0) 174 break; 175 call = NULL; 176 } 177 178 kfree(call); 179} 180 181/* 182 * Open an rxrpc socket and bind it to be a server for callback notifications 183 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT 184 */ 185static int rxperf_open_socket(void) 186{ 187 struct sockaddr_rxrpc srx; 188 struct socket *socket; 189 int ret; 190 191 ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET6, 192 &socket); 193 if (ret < 0) 194 goto error_1; 195 196 socket->sk->sk_allocation = GFP_NOFS; 197 198 /* bind the callback manager's address to make this a server socket */ 199 memset(&srx, 0, sizeof(srx)); 200 srx.srx_family = AF_RXRPC; 201 srx.srx_service = RX_PERF_SERVICE; 202 srx.transport_type = SOCK_DGRAM; 203 srx.transport_len = sizeof(srx.transport.sin6); 204 srx.transport.sin6.sin6_family = AF_INET6; 205 srx.transport.sin6.sin6_port = htons(RXPERF_PORT); 206 207 ret = rxrpc_sock_set_min_security_level(socket->sk, 208 RXRPC_SECURITY_ENCRYPT); 209 if (ret < 0) 210 goto error_2; 211 212 ret = rxrpc_sock_set_security_keyring(socket->sk, rxperf_sec_keyring); 213 214 ret = kernel_bind(socket, (struct sockaddr_unsized *)&srx, sizeof(srx)); 215 if (ret < 0) 216 goto error_2; 217 218 rxrpc_kernel_set_notifications(socket, &rxperf_rxrpc_callback_ops); 219 220 ret = kernel_listen(socket, INT_MAX); 221 if (ret < 0) 222 goto error_2; 223 224 rxperf_socket = socket; 225 rxperf_charge_preallocation(&rxperf_charge_preallocation_work); 226 return 0; 227 228error_2: 229 sock_release(socket); 230error_1: 231 pr_err("Can't set up rxperf socket: %d\n", ret); 232 return ret; 233} 234 235/* 236 * close the rxrpc socket rxperf was using 237 */ 238static void rxperf_close_socket(void) 239{ 240 kernel_listen(rxperf_socket, 0); 241 kernel_sock_shutdown(rxperf_socket, SHUT_RDWR); 242 flush_workqueue(rxperf_workqueue); 243 sock_release(rxperf_socket); 244} 245 246/* 247 * Log remote abort codes that indicate that we have a protocol disagreement 248 * with the server. 249 */ 250static void rxperf_log_error(struct rxperf_call *call, s32 remote_abort) 251{ 252 static int max = 0; 253 const char *msg; 254 int m; 255 256 switch (remote_abort) { 257 case RX_EOF: msg = "unexpected EOF"; break; 258 case RXGEN_CC_MARSHAL: msg = "client marshalling"; break; 259 case RXGEN_CC_UNMARSHAL: msg = "client unmarshalling"; break; 260 case RXGEN_SS_MARSHAL: msg = "server marshalling"; break; 261 case RXGEN_SS_UNMARSHAL: msg = "server unmarshalling"; break; 262 case RXGEN_DECODE: msg = "opcode decode"; break; 263 case RXGEN_SS_XDRFREE: msg = "server XDR cleanup"; break; 264 case RXGEN_CC_XDRFREE: msg = "client XDR cleanup"; break; 265 case -32: msg = "insufficient data"; break; 266 default: 267 return; 268 } 269 270 m = max; 271 if (m < 3) { 272 max = m + 1; 273 pr_info("Peer reported %s failure on %s\n", msg, call->type); 274 } 275} 276 277/* 278 * deliver messages to a call 279 */ 280static void rxperf_deliver_to_call(struct work_struct *work) 281{ 282 struct rxperf_call *call = container_of(work, struct rxperf_call, work); 283 enum rxperf_call_state state; 284 u32 abort_code, remote_abort = 0; 285 int ret = 0; 286 287 if (call->state == RXPERF_CALL_COMPLETE) 288 return; 289 290 while (state = call->state, 291 state == RXPERF_CALL_SV_AWAIT_PARAMS || 292 state == RXPERF_CALL_SV_AWAIT_REQUEST || 293 state == RXPERF_CALL_SV_AWAIT_ACK 294 ) { 295 if (state == RXPERF_CALL_SV_AWAIT_ACK) { 296 if (!rxrpc_kernel_check_life(rxperf_socket, call->rxcall)) 297 goto call_complete; 298 return; 299 } 300 301 ret = call->deliver(call); 302 if (ret == 0) 303 ret = rxperf_process_call(call); 304 305 switch (ret) { 306 case 0: 307 continue; 308 case -EINPROGRESS: 309 case -EAGAIN: 310 return; 311 case -ECONNABORTED: 312 rxperf_log_error(call, call->abort_code); 313 goto call_complete; 314 case -EOPNOTSUPP: 315 abort_code = RXGEN_OPCODE; 316 rxrpc_kernel_abort_call(rxperf_socket, call->rxcall, 317 abort_code, ret, 318 rxperf_abort_op_not_supported); 319 goto call_complete; 320 case -ENOTSUPP: 321 abort_code = RX_USER_ABORT; 322 rxrpc_kernel_abort_call(rxperf_socket, call->rxcall, 323 abort_code, ret, 324 rxperf_abort_op_not_supported); 325 goto call_complete; 326 case -EIO: 327 pr_err("Call %u in bad state %u\n", 328 call->debug_id, call->state); 329 fallthrough; 330 case -ENODATA: 331 case -EBADMSG: 332 case -EMSGSIZE: 333 case -ENOMEM: 334 case -EFAULT: 335 rxrpc_kernel_abort_call(rxperf_socket, call->rxcall, 336 RXGEN_SS_UNMARSHAL, ret, 337 rxperf_abort_unmarshal_error); 338 goto call_complete; 339 default: 340 rxrpc_kernel_abort_call(rxperf_socket, call->rxcall, 341 RX_CALL_DEAD, ret, 342 rxperf_abort_general_error); 343 goto call_complete; 344 } 345 } 346 347call_complete: 348 rxperf_set_call_complete(call, ret, remote_abort); 349 /* The call may have been requeued */ 350 rxrpc_kernel_shutdown_call(rxperf_socket, call->rxcall); 351 rxrpc_kernel_put_call(rxperf_socket, call->rxcall); 352 cancel_work(&call->work); 353 kfree(call); 354} 355 356/* 357 * Extract a piece of data from the received data socket buffers. 358 */ 359static int rxperf_extract_data(struct rxperf_call *call, bool want_more) 360{ 361 u32 remote_abort = 0; 362 int ret; 363 364 ret = rxrpc_kernel_recv_data(rxperf_socket, call->rxcall, &call->iter, 365 &call->iov_len, want_more, &remote_abort, 366 &call->service_id); 367 pr_debug("Extract i=%zu l=%zu m=%u ret=%d\n", 368 iov_iter_count(&call->iter), call->iov_len, want_more, ret); 369 if (ret == 0 || ret == -EAGAIN) 370 return ret; 371 372 if (ret == 1) { 373 switch (call->state) { 374 case RXPERF_CALL_SV_AWAIT_REQUEST: 375 rxperf_set_call_state(call, RXPERF_CALL_SV_REPLYING); 376 break; 377 case RXPERF_CALL_COMPLETE: 378 pr_debug("premature completion %d", call->error); 379 return call->error; 380 default: 381 break; 382 } 383 return 0; 384 } 385 386 rxperf_set_call_complete(call, ret, remote_abort); 387 return ret; 388} 389 390/* 391 * Grab the operation ID from an incoming manager call. 392 */ 393static int rxperf_deliver_param_block(struct rxperf_call *call) 394{ 395 u32 version; 396 int ret; 397 398 /* Extract the parameter block */ 399 ret = rxperf_extract_data(call, true); 400 if (ret < 0) 401 return ret; 402 403 version = ntohl(call->params.version); 404 call->operation_id = ntohl(call->params.type); 405 call->deliver = rxperf_deliver_request; 406 407 if (version != RX_PERF_VERSION) { 408 pr_info("Version mismatch %x\n", version); 409 return -ENOTSUPP; 410 } 411 412 switch (call->operation_id) { 413 case RX_PERF_SEND: 414 call->type = "send"; 415 call->reply_len = 0; 416 call->iov_len = 4; /* Expect req size */ 417 break; 418 case RX_PERF_RECV: 419 call->type = "recv"; 420 call->req_len = 0; 421 call->iov_len = 4; /* Expect reply size */ 422 break; 423 case RX_PERF_RPC: 424 call->type = "rpc"; 425 call->iov_len = 8; /* Expect req size and reply size */ 426 break; 427 case RX_PERF_FILE: 428 call->type = "file"; 429 fallthrough; 430 default: 431 return -EOPNOTSUPP; 432 } 433 434 rxperf_set_call_state(call, RXPERF_CALL_SV_AWAIT_REQUEST); 435 return call->deliver(call); 436} 437 438/* 439 * Deliver the request data. 440 */ 441static int rxperf_deliver_request(struct rxperf_call *call) 442{ 443 int ret; 444 445 switch (call->unmarshal) { 446 case 0: 447 call->kvec[0].iov_len = call->iov_len; 448 call->kvec[0].iov_base = call->tmp; 449 iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len); 450 call->unmarshal++; 451 fallthrough; 452 case 1: 453 ret = rxperf_extract_data(call, true); 454 if (ret < 0) 455 return ret; 456 457 switch (call->operation_id) { 458 case RX_PERF_SEND: 459 call->type = "send"; 460 call->req_len = ntohl(call->tmp[0]); 461 call->reply_len = 0; 462 break; 463 case RX_PERF_RECV: 464 call->type = "recv"; 465 call->req_len = 0; 466 call->reply_len = ntohl(call->tmp[0]); 467 break; 468 case RX_PERF_RPC: 469 call->type = "rpc"; 470 call->req_len = ntohl(call->tmp[0]); 471 call->reply_len = ntohl(call->tmp[1]); 472 break; 473 default: 474 pr_info("Can't parse extra params\n"); 475 return -EIO; 476 } 477 478 pr_debug("CALL op=%s rq=%zx rp=%zx\n", 479 call->type, call->req_len, call->reply_len); 480 481 call->iov_len = call->req_len; 482 iov_iter_discard(&call->iter, READ, call->req_len); 483 call->unmarshal++; 484 fallthrough; 485 case 2: 486 ret = rxperf_extract_data(call, true); 487 if (ret < 0) 488 return ret; 489 490 /* Deal with the terminal magic cookie. */ 491 call->iov_len = 4; 492 call->kvec[0].iov_len = call->iov_len; 493 call->kvec[0].iov_base = call->tmp; 494 iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len); 495 call->unmarshal++; 496 fallthrough; 497 case 3: 498 ret = rxperf_extract_data(call, false); 499 if (ret < 0) 500 return ret; 501 call->unmarshal++; 502 fallthrough; 503 default: 504 return 0; 505 } 506} 507 508/* 509 * Process a call for which we've received the request. 510 */ 511static int rxperf_process_call(struct rxperf_call *call) 512{ 513 struct msghdr msg = {}; 514 struct bio_vec bv; 515 struct kvec iov[1]; 516 ssize_t n; 517 size_t reply_len = call->reply_len, len; 518 519 rxrpc_kernel_set_tx_length(rxperf_socket, call->rxcall, 520 reply_len + sizeof(rxperf_magic_cookie)); 521 522 while (reply_len > 0) { 523 len = umin(reply_len, PAGE_SIZE); 524 bvec_set_page(&bv, ZERO_PAGE(0), len, 0); 525 iov_iter_bvec(&msg.msg_iter, WRITE, &bv, 1, len); 526 msg.msg_flags = MSG_MORE; 527 n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg, 528 len, rxperf_notify_end_reply_tx); 529 if (n < 0) 530 return n; 531 if (n == 0) 532 return -EIO; 533 reply_len -= n; 534 } 535 536 len = sizeof(rxperf_magic_cookie); 537 iov[0].iov_base = (void *)rxperf_magic_cookie; 538 iov[0].iov_len = len; 539 iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len); 540 msg.msg_flags = 0; 541 n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg, len, 542 rxperf_notify_end_reply_tx); 543 if (n >= 0) 544 return 0; /* Success */ 545 546 if (n == -ENOMEM) 547 rxrpc_kernel_abort_call(rxperf_socket, call->rxcall, 548 RXGEN_SS_MARSHAL, -ENOMEM, 549 rxperf_abort_oom); 550 return n; 551} 552 553/* 554 * Add an rxkad key to the security keyring. 555 */ 556static int rxperf_add_rxkad_key(struct key *keyring) 557{ 558 key_ref_t kref; 559 int ret; 560 561 kref = key_create_or_update(make_key_ref(keyring, true), 562 "rxrpc_s", 563 __stringify(RX_PERF_SERVICE) ":2", 564 secret, 565 sizeof(secret), 566 KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH 567 | KEY_USR_VIEW, 568 KEY_ALLOC_NOT_IN_QUOTA); 569 570 if (IS_ERR(kref)) { 571 pr_err("Can't allocate rxperf server key: %ld\n", PTR_ERR(kref)); 572 return PTR_ERR(kref); 573 } 574 575 ret = key_link(keyring, key_ref_to_ptr(kref)); 576 if (ret < 0) 577 pr_err("Can't link rxperf server key: %d\n", ret); 578 key_ref_put(kref); 579 return ret; 580} 581 582#ifdef CONFIG_RXGK 583/* 584 * Add a yfs-rxgk key to the security keyring. 585 */ 586static int rxperf_add_yfs_rxgk_key(struct key *keyring, u32 enctype) 587{ 588 const struct krb5_enctype *krb5 = crypto_krb5_find_enctype(enctype); 589 key_ref_t kref; 590 char name[64]; 591 int ret; 592 u8 key[32]; 593 594 if (!krb5 || krb5->key_len > sizeof(key)) 595 return 0; 596 597 /* The key is just { 0, 1, 2, 3, 4, ... } */ 598 for (int i = 0; i < krb5->key_len; i++) 599 key[i] = i; 600 601 sprintf(name, "%u:6:1:%u", RX_PERF_SERVICE, enctype); 602 603 kref = key_create_or_update(make_key_ref(keyring, true), 604 "rxrpc_s", name, 605 key, krb5->key_len, 606 KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH | 607 KEY_USR_VIEW, 608 KEY_ALLOC_NOT_IN_QUOTA); 609 610 if (IS_ERR(kref)) { 611 pr_err("Can't allocate rxperf server key: %ld\n", PTR_ERR(kref)); 612 return PTR_ERR(kref); 613 } 614 615 ret = key_link(keyring, key_ref_to_ptr(kref)); 616 if (ret < 0) 617 pr_err("Can't link rxperf server key: %d\n", ret); 618 key_ref_put(kref); 619 return ret; 620} 621#endif 622 623/* 624 * Initialise the rxperf server. 625 */ 626static int __init rxperf_init(void) 627{ 628 struct key *keyring; 629 int ret = -ENOMEM; 630 631 pr_info("Server registering\n"); 632 633 rxperf_workqueue = alloc_workqueue("rxperf", WQ_PERCPU, 0); 634 if (!rxperf_workqueue) 635 goto error_workqueue; 636 637 keyring = keyring_alloc("rxperf_server", 638 GLOBAL_ROOT_UID, GLOBAL_ROOT_GID, current_cred(), 639 KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH | 640 KEY_POS_WRITE | 641 KEY_USR_VIEW | KEY_USR_READ | KEY_USR_SEARCH | 642 KEY_USR_WRITE | 643 KEY_OTH_VIEW | KEY_OTH_READ | KEY_OTH_SEARCH, 644 KEY_ALLOC_NOT_IN_QUOTA, 645 NULL, NULL); 646 if (IS_ERR(keyring)) { 647 pr_err("Can't allocate rxperf server keyring: %ld\n", 648 PTR_ERR(keyring)); 649 goto error_keyring; 650 } 651 rxperf_sec_keyring = keyring; 652 ret = rxperf_add_rxkad_key(keyring); 653 if (ret < 0) 654 goto error_key; 655#ifdef CONFIG_RXGK 656 ret = rxperf_add_yfs_rxgk_key(keyring, KRB5_ENCTYPE_AES128_CTS_HMAC_SHA1_96); 657 if (ret < 0) 658 goto error_key; 659 ret = rxperf_add_yfs_rxgk_key(keyring, KRB5_ENCTYPE_AES256_CTS_HMAC_SHA1_96); 660 if (ret < 0) 661 goto error_key; 662 ret = rxperf_add_yfs_rxgk_key(keyring, KRB5_ENCTYPE_AES128_CTS_HMAC_SHA256_128); 663 if (ret < 0) 664 goto error_key; 665 ret = rxperf_add_yfs_rxgk_key(keyring, KRB5_ENCTYPE_AES256_CTS_HMAC_SHA384_192); 666 if (ret < 0) 667 goto error_key; 668 ret = rxperf_add_yfs_rxgk_key(keyring, KRB5_ENCTYPE_CAMELLIA128_CTS_CMAC); 669 if (ret < 0) 670 goto error_key; 671 ret = rxperf_add_yfs_rxgk_key(keyring, KRB5_ENCTYPE_CAMELLIA256_CTS_CMAC); 672 if (ret < 0) 673 goto error_key; 674#endif 675 676 ret = rxperf_open_socket(); 677 if (ret < 0) 678 goto error_socket; 679 return 0; 680 681error_socket: 682error_key: 683 key_put(rxperf_sec_keyring); 684error_keyring: 685 destroy_workqueue(rxperf_workqueue); 686 rcu_barrier(); 687error_workqueue: 688 pr_err("Failed to register: %d\n", ret); 689 return ret; 690} 691late_initcall(rxperf_init); /* Must be called after net/ to create socket */ 692 693static void __exit rxperf_exit(void) 694{ 695 pr_info("Server unregistering.\n"); 696 697 rxperf_close_socket(); 698 key_put(rxperf_sec_keyring); 699 destroy_workqueue(rxperf_workqueue); 700 rcu_barrier(); 701} 702module_exit(rxperf_exit); 703