Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

Merge tag 'rxrpc-fixes-20190809' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs

David Howells says:

====================
Here's a couple of fixes for rxrpc:

(1) Fix refcounting of the local endpoint.

(2) Don't calculate or report packet skew information. This has been
obsolete since AFS 3.1 and so is a waste of resources.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>

+100 -83
+3 -3
net/rxrpc/af_rxrpc.c
··· 193 193 194 194 service_in_use: 195 195 write_unlock(&local->services_lock); 196 - rxrpc_put_local(local); 196 + rxrpc_unuse_local(local); 197 197 ret = -EADDRINUSE; 198 198 error_unlock: 199 199 release_sock(&rx->sk); ··· 402 402 */ 403 403 void rxrpc_kernel_probe_life(struct socket *sock, struct rxrpc_call *call) 404 404 { 405 - rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, 0, true, false, 405 + rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, true, false, 406 406 rxrpc_propose_ack_ping_for_check_life); 407 407 rxrpc_send_ack_packet(call, true, NULL); 408 408 } ··· 901 901 rxrpc_queue_work(&rxnet->service_conn_reaper); 902 902 rxrpc_queue_work(&rxnet->client_conn_reaper); 903 903 904 - rxrpc_put_local(rx->local); 904 + rxrpc_unuse_local(rx->local); 905 905 rx->local = NULL; 906 906 key_put(rx->key); 907 907 rx->key = NULL;
+5 -3
net/rxrpc/ar-internal.h
··· 254 254 */ 255 255 struct rxrpc_local { 256 256 struct rcu_head rcu; 257 - atomic_t usage; 257 + atomic_t active_users; /* Number of users of the local endpoint */ 258 + atomic_t usage; /* Number of references to the structure */ 258 259 struct rxrpc_net *rxnet; /* The network ns in which this resides */ 259 260 struct list_head link; 260 261 struct socket *socket; /* my UDP socket */ ··· 650 649 651 650 /* receive-phase ACK management */ 652 651 u8 ackr_reason; /* reason to ACK */ 653 - u16 ackr_skew; /* skew on packet being ACK'd */ 654 652 rxrpc_serial_t ackr_serial; /* serial of packet being ACK'd */ 655 653 rxrpc_serial_t ackr_first_seq; /* first sequence number received */ 656 654 rxrpc_seq_t ackr_prev_seq; /* previous sequence number received */ ··· 743 743 /* 744 744 * call_event.c 745 745 */ 746 - void rxrpc_propose_ACK(struct rxrpc_call *, u8, u16, u32, bool, bool, 746 + void rxrpc_propose_ACK(struct rxrpc_call *, u8, u32, bool, bool, 747 747 enum rxrpc_propose_ack_trace); 748 748 void rxrpc_process_call(struct work_struct *); 749 749 ··· 1002 1002 struct rxrpc_local *rxrpc_get_local(struct rxrpc_local *); 1003 1003 struct rxrpc_local *rxrpc_get_local_maybe(struct rxrpc_local *); 1004 1004 void rxrpc_put_local(struct rxrpc_local *); 1005 + struct rxrpc_local *rxrpc_use_local(struct rxrpc_local *); 1006 + void rxrpc_unuse_local(struct rxrpc_local *); 1005 1007 void rxrpc_queue_local(struct rxrpc_local *); 1006 1008 void rxrpc_destroy_all_locals(struct rxrpc_net *); 1007 1009
+6 -9
net/rxrpc/call_event.c
··· 43 43 * propose an ACK be sent 44 44 */ 45 45 static void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason, 46 - u16 skew, u32 serial, bool immediate, 47 - bool background, 46 + u32 serial, bool immediate, bool background, 48 47 enum rxrpc_propose_ack_trace why) 49 48 { 50 49 enum rxrpc_propose_ack_outcome outcome = rxrpc_propose_ack_use; ··· 68 69 if (RXRPC_ACK_UPDATEABLE & (1 << ack_reason)) { 69 70 outcome = rxrpc_propose_ack_update; 70 71 call->ackr_serial = serial; 71 - call->ackr_skew = skew; 72 72 } 73 73 if (!immediate) 74 74 goto trace; 75 75 } else if (prior > rxrpc_ack_priority[call->ackr_reason]) { 76 76 call->ackr_reason = ack_reason; 77 77 call->ackr_serial = serial; 78 - call->ackr_skew = skew; 79 78 } else { 80 79 outcome = rxrpc_propose_ack_subsume; 81 80 } ··· 134 137 * propose an ACK be sent, locking the call structure 135 138 */ 136 139 void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason, 137 - u16 skew, u32 serial, bool immediate, bool background, 140 + u32 serial, bool immediate, bool background, 138 141 enum rxrpc_propose_ack_trace why) 139 142 { 140 143 spin_lock_bh(&call->lock); 141 - __rxrpc_propose_ACK(call, ack_reason, skew, serial, 144 + __rxrpc_propose_ACK(call, ack_reason, serial, 142 145 immediate, background, why); 143 146 spin_unlock_bh(&call->lock); 144 147 } ··· 236 239 ack_ts = ktime_sub(now, call->acks_latest_ts); 237 240 if (ktime_to_ns(ack_ts) < call->peer->rtt) 238 241 goto out; 239 - rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, 0, true, false, 242 + rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, true, false, 240 243 rxrpc_propose_ack_ping_for_lost_ack); 241 244 rxrpc_send_ack_packet(call, true, NULL); 242 245 goto out; ··· 369 372 if (time_after_eq(now, t)) { 370 373 trace_rxrpc_timer(call, rxrpc_timer_exp_keepalive, now); 371 374 cmpxchg(&call->keepalive_at, t, now + MAX_JIFFY_OFFSET); 372 - rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, 0, true, true, 375 + rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, true, true, 373 376 rxrpc_propose_ack_ping_for_keepalive); 374 377 set_bit(RXRPC_CALL_EV_PING, &call->events); 375 378 } ··· 404 407 send_ack = NULL; 405 408 if (test_and_clear_bit(RXRPC_CALL_EV_ACK_LOST, &call->events)) { 406 409 call->acks_lost_top = call->tx_top; 407 - rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, 0, true, false, 410 + rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, true, false, 408 411 rxrpc_propose_ack_ping_for_lost_ack); 409 412 send_ack = &call->acks_lost_ping; 410 413 }
+28 -31
net/rxrpc/input.c
··· 196 196 * Ping the other end to fill our RTT cache and to retrieve the rwind 197 197 * and MTU parameters. 198 198 */ 199 - static void rxrpc_send_ping(struct rxrpc_call *call, struct sk_buff *skb, 200 - int skew) 199 + static void rxrpc_send_ping(struct rxrpc_call *call, struct sk_buff *skb) 201 200 { 202 201 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 203 202 ktime_t now = skb->tstamp; 204 203 205 204 if (call->peer->rtt_usage < 3 || 206 205 ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000), now)) 207 - rxrpc_propose_ACK(call, RXRPC_ACK_PING, skew, sp->hdr.serial, 206 + rxrpc_propose_ACK(call, RXRPC_ACK_PING, sp->hdr.serial, 208 207 true, true, 209 208 rxrpc_propose_ack_ping_for_params); 210 209 } ··· 418 419 /* 419 420 * Process a DATA packet, adding the packet to the Rx ring. 420 421 */ 421 - static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb, 422 - u16 skew) 422 + static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb) 423 423 { 424 424 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 425 425 enum rxrpc_call_state state; ··· 598 600 599 601 ack: 600 602 if (ack) 601 - rxrpc_propose_ACK(call, ack, skew, ack_serial, 603 + rxrpc_propose_ACK(call, ack, ack_serial, 602 604 immediate_ack, true, 603 605 rxrpc_propose_ack_input_data); 604 606 else 605 - rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, skew, serial, 607 + rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial, 606 608 false, true, 607 609 rxrpc_propose_ack_input_data); 608 610 ··· 820 822 * soft-ACK means that the packet may be discarded and retransmission 821 823 * requested. A phase is complete when all packets are hard-ACK'd. 822 824 */ 823 - static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb, 824 - u16 skew) 825 + static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb) 825 826 { 826 827 struct rxrpc_ack_summary summary = { 0 }; 827 828 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); ··· 864 867 if (buf.ack.reason == RXRPC_ACK_PING) { 865 868 _proto("Rx ACK %%%u PING Request", sp->hdr.serial); 866 869 rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE, 867 - skew, sp->hdr.serial, true, true, 870 + sp->hdr.serial, true, true, 868 871 rxrpc_propose_ack_respond_to_ping); 869 872 } else if (sp->hdr.flags & RXRPC_REQUEST_ACK) { 870 873 rxrpc_propose_ACK(call, RXRPC_ACK_REQUESTED, 871 - skew, sp->hdr.serial, true, true, 874 + sp->hdr.serial, true, true, 872 875 rxrpc_propose_ack_respond_to_ack); 873 876 } ··· 945 948 RXRPC_TX_ANNO_LAST && 946 949 summary.nr_acks == call->tx_top - hard_ack && 947 950 rxrpc_is_client_call(call)) 948 - rxrpc_propose_ACK(call, RXRPC_ACK_PING, skew, sp->hdr.serial, 951 + rxrpc_propose_ACK(call, RXRPC_ACK_PING, sp->hdr.serial, 949 952 false, true, 950 953 rxrpc_propose_ack_ping_for_lost_reply); 951 954 ··· 1001 1004 * Process an incoming call packet. 1002 1005 */ 1003 1006 static void rxrpc_input_call_packet(struct rxrpc_call *call, 1004 - struct sk_buff *skb, u16 skew) 1007 + struct sk_buff *skb) 1005 1008 { 1006 1009 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 1007 1010 unsigned long timo; ··· 1020 1023 1021 1024 switch (sp->hdr.type) { 1022 1025 case RXRPC_PACKET_TYPE_DATA: 1023 - rxrpc_input_data(call, skb, skew); 1026 + rxrpc_input_data(call, skb); 1024 1027 break; 1025 1028 1026 1029 case RXRPC_PACKET_TYPE_ACK: 1027 - rxrpc_input_ack(call, skb, skew); 1030 + rxrpc_input_ack(call, skb); 1028 1031 break; 1029 1032 1030 1033 case RXRPC_PACKET_TYPE_BUSY: ··· 1105 1108 { 1106 1109 _enter("%p,%p", local, skb); 1107 1110 1108 - skb_queue_tail(&local->event_queue, skb); 1109 - rxrpc_queue_local(local); 1111 + if (rxrpc_get_local_maybe(local)) { 1112 + skb_queue_tail(&local->event_queue, skb); 1113 + rxrpc_queue_local(local); 1114 + } else { 1115 + rxrpc_free_skb(skb, rxrpc_skb_rx_freed); 1116 + } 1110 1117 } 1111 1118 1112 1119 /* ··· 1120 1119 { 1121 1120 CHECK_SLAB_OKAY(&local->usage); 1122 1121 1123 - skb_queue_tail(&local->reject_queue, skb); 1124 - rxrpc_queue_local(local); 1122 + if (rxrpc_get_local_maybe(local)) { 1123 + skb_queue_tail(&local->reject_queue, skb); 1124 + rxrpc_queue_local(local); 1125 + } else { 1126 + rxrpc_free_skb(skb, rxrpc_skb_rx_freed); 1127 + } 1125 1128 } 1126 1129 1127 1130 /* ··· 1178 1173 struct rxrpc_peer *peer = NULL; 1179 1174 struct rxrpc_sock *rx = NULL; 1180 1175 unsigned int channel; 1181 - int skew = 0; 1182 1176 1183 1177 _enter("%p", udp_sk); 1184 1178 ··· 1305 1301 goto out; 1306 1302 } 1307 1303 1308 - /* Note the serial number skew here */ 1309 - skew = (int)sp->hdr.serial - (int)conn->hi_serial; 1310 - if (skew >= 0) { 1311 - if (skew > 0) 1312 - conn->hi_serial = sp->hdr.serial; 1313 - } else { 1314 - skew = -skew; 1315 - skew = min(skew, 65535); 1316 - } 1304 + if ((int)sp->hdr.serial - (int)conn->hi_serial > 0) 1305 + conn->hi_serial = sp->hdr.serial; 1317 1306 1318 1307 /* Call-bound packets are routed by connection channel. */ 1319 1308 channel = sp->hdr.cid & RXRPC_CHANNELMASK; ··· 1369 1372 call = rxrpc_new_incoming_call(local, rx, skb); 1370 1373 if (!call) 1371 1374 goto reject_packet; 1372 - rxrpc_send_ping(call, skb, skew); 1375 + rxrpc_send_ping(call, skb); 1373 1376 mutex_unlock(&call->user_mutex); 1374 1377 } 1375 1378 1376 - rxrpc_input_call_packet(call, skb, skew); 1379 + rxrpc_input_call_packet(call, skb); 1377 1380 goto discard; 1378 1381 1379 1382 discard:
+54 -32
net/rxrpc/local_object.c
··· 79 79 local = kzalloc(sizeof(struct rxrpc_local), GFP_KERNEL); 80 80 if (local) { 81 81 atomic_set(&local->usage, 1); 82 + atomic_set(&local->active_users, 1); 82 83 local->rxnet = rxnet; 83 84 INIT_LIST_HEAD(&local->link); 84 85 INIT_WORK(&local->processor, rxrpc_local_processor); ··· 267 266 * bind the transport socket may still fail if we're attempting 268 267 * to use a local address that the dying object is still using. 269 268 */ 270 - if (!rxrpc_get_local_maybe(local)) { 271 - cursor = cursor->next; 272 - list_del_init(&local->link); 269 + if (!rxrpc_use_local(local)) 273 270 break; 274 - } 275 271 276 272 age = "old"; 277 273 goto found; ··· 282 284 if (ret < 0) 283 285 goto sock_error; 284 286 285 - list_add_tail(&local->link, cursor); 287 + if (cursor != &rxnet->local_endpoints) 288 + list_replace(cursor, &local->link); 289 + else 290 + list_add_tail(&local->link, cursor); 286 291 age = "new"; 287 292 288 293 found: ··· 343 342 } 344 343 345 344 /* 346 - * Queue a local endpoint. 345 + * Queue a local endpoint unless it has become unreferenced and pass the 346 + * caller's reference to the work item. 347 347 */ 348 348 void rxrpc_queue_local(struct rxrpc_local *local) 349 349 { ··· 353 351 if (rxrpc_queue_work(&local->processor)) 354 352 trace_rxrpc_local(local, rxrpc_local_queued, 355 353 atomic_read(&local->usage), here); 356 - } 357 - 358 - /* 359 - * A local endpoint reached its end of life. 360 - */ 361 - static void __rxrpc_put_local(struct rxrpc_local *local) 362 - { 363 - _enter("%d", local->debug_id); 364 - rxrpc_queue_work(&local->processor); 354 + else 355 + rxrpc_put_local(local); 365 356 } 366 357 367 358 /* ··· 370 375 trace_rxrpc_local(local, rxrpc_local_put, n, here); 371 376 372 377 if (n == 0) 373 - __rxrpc_put_local(local); 378 + call_rcu(&local->rcu, rxrpc_local_rcu); 374 379 } 380 + } 381 + 382 + /* 383 + * Start using a local endpoint. 384 + */ 385 + struct rxrpc_local *rxrpc_use_local(struct rxrpc_local *local) 386 + { 387 + unsigned int au; 388 + 389 + local = rxrpc_get_local_maybe(local); 390 + if (!local) 391 + return NULL; 392 + 393 + au = atomic_fetch_add_unless(&local->active_users, 1, 0); 394 + if (au == 0) { 395 + rxrpc_put_local(local); 396 + return NULL; 397 + } 398 + 399 + return local; 400 + } 401 + 402 + /* 403 + * Cease using a local endpoint. Once the number of active users reaches 0, we 404 + * start the closure of the transport in the work processor. 405 + */ 406 + void rxrpc_unuse_local(struct rxrpc_local *local) 407 + { 408 + unsigned int au; 409 + 410 + au = atomic_dec_return(&local->active_users); 411 + if (au == 0) 412 + rxrpc_queue_local(local); 413 + else 414 + rxrpc_put_local(local); 375 415 } 376 416 377 417 /* ··· 422 392 struct rxrpc_net *rxnet = local->rxnet; 423 393 424 394 _enter("%d", local->debug_id); 425 - 426 - /* We can get a race between an incoming call packet queueing the 427 - * processor again and the work processor starting the destruction 428 - * process which will shut down the UDP socket. 429 - */ 430 - if (local->dead) { 431 - _leave(" [already dead]"); 432 - return; 433 - } 434 - local->dead = true; 435 395 436 396 mutex_lock(&rxnet->local_mutex); 437 397 list_del_init(&local->link); ··· 442 422 */ 443 423 rxrpc_purge_queue(&local->reject_queue); 444 424 rxrpc_purge_queue(&local->event_queue); 445 - 446 - _debug("rcu local %d", local->debug_id); 447 - call_rcu(&local->rcu, rxrpc_local_rcu); 448 425 } 449 426 450 427 /* 451 - * Process events on an endpoint 428 + * Process events on an endpoint. The work item carries a ref which 429 + * we must release. 452 430 */ 453 431 static void rxrpc_local_processor(struct work_struct *work) 454 432 { ··· 459 441 460 442 do { 461 443 again = false; 462 - if (atomic_read(&local->usage) == 0) 463 - return rxrpc_local_destroyer(local); 444 + if (atomic_read(&local->active_users) == 0) { 445 + rxrpc_local_destroyer(local); 446 + break; 447 + } 464 448 465 449 if (!skb_queue_empty(&local->reject_queue)) { 466 450 rxrpc_reject_packets(local); ··· 474 454 again = true; 475 455 } 476 456 } while (again); 457 + 458 + rxrpc_put_local(local); 477 459 } 478 461 /*
+1 -2
net/rxrpc/output.c
··· 87 87 *_top = top; 88 88 89 89 pkt->ack.bufferSpace = htons(8); 90 - pkt->ack.maxSkew = htons(call->ackr_skew); 90 + pkt->ack.maxSkew = htons(0); 91 91 pkt->ack.firstPacket = htonl(hard_ack + 1); 92 92 pkt->ack.previousPacket = htonl(call->ackr_prev_seq); 93 93 pkt->ack.serial = htonl(serial); ··· 228 228 if (ping) 229 229 clear_bit(RXRPC_CALL_PINGING, &call->flags); 230 230 rxrpc_propose_ACK(call, pkt->ack.reason, 231 - ntohs(pkt->ack.maxSkew), 232 231 ntohl(pkt->ack.serial), 233 232 false, true, 234 233 rxrpc_propose_ack_retry_tx);
+3 -3
net/rxrpc/recvmsg.c
··· 141 141 ASSERTCMP(call->rx_hard_ack, ==, call->rx_top); 142 142 143 143 if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) { 144 - rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, 0, serial, false, true, 144 + rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, serial, false, true, 145 145 rxrpc_propose_ack_terminal_ack); 146 146 //rxrpc_send_ack_packet(call, false, NULL); 147 147 } ··· 159 159 call->state = RXRPC_CALL_SERVER_ACK_REQUEST; 160 160 call->expect_req_by = jiffies + MAX_JIFFY_OFFSET; 161 161 write_unlock_bh(&call->state_lock); 162 - rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, 0, serial, false, true, 162 + rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial, false, true, 163 163 rxrpc_propose_ack_processing_op); 164 164 break; 165 165 default: ··· 212 212 if (after_eq(hard_ack, call->ackr_consumed + 2) || 213 213 after_eq(top, call->ackr_seen + 2) || 214 214 (hard_ack == top && after(hard_ack, call->ackr_consumed))) 215 - rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, 0, serial, 215 + rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial, 216 216 true, true, 217 217 rxrpc_propose_ack_rotate_rx); 218 218 if (call->ackr_reason && call->ackr_reason != RXRPC_ACK_DELAY)