Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v4.7-rc1 1286 lines 33 kB view raw
1/* Management of Tx window, Tx resend, ACKs and out-of-sequence reception 2 * 3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. 4 * Written by David Howells (dhowells@redhat.com) 5 * 6 * This program is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU General Public License 8 * as published by the Free Software Foundation; either version 9 * 2 of the License, or (at your option) any later version. 10 */ 11 12#include <linux/module.h> 13#include <linux/circ_buf.h> 14#include <linux/net.h> 15#include <linux/skbuff.h> 16#include <linux/slab.h> 17#include <linux/udp.h> 18#include <net/sock.h> 19#include <net/af_rxrpc.h> 20#include "ar-internal.h" 21 22/* 23 * propose an ACK be sent 24 */ 25void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason, 26 u32 serial, bool immediate) 27{ 28 unsigned long expiry; 29 s8 prior = rxrpc_ack_priority[ack_reason]; 30 31 ASSERTCMP(prior, >, 0); 32 33 _enter("{%d},%s,%%%x,%u", 34 call->debug_id, rxrpc_acks(ack_reason), serial, immediate); 35 36 if (prior < rxrpc_ack_priority[call->ackr_reason]) { 37 if (immediate) 38 goto cancel_timer; 39 return; 40 } 41 42 /* update DELAY, IDLE, REQUESTED and PING_RESPONSE ACK serial 43 * numbers */ 44 if (prior == rxrpc_ack_priority[call->ackr_reason]) { 45 if (prior <= 4) 46 call->ackr_serial = serial; 47 if (immediate) 48 goto cancel_timer; 49 return; 50 } 51 52 call->ackr_reason = ack_reason; 53 call->ackr_serial = serial; 54 55 switch (ack_reason) { 56 case RXRPC_ACK_DELAY: 57 _debug("run delay timer"); 58 expiry = rxrpc_soft_ack_delay; 59 goto run_timer; 60 61 case RXRPC_ACK_IDLE: 62 if (!immediate) { 63 _debug("run defer timer"); 64 expiry = rxrpc_idle_ack_delay; 65 goto run_timer; 66 } 67 goto cancel_timer; 68 69 case RXRPC_ACK_REQUESTED: 70 expiry = rxrpc_requested_ack_delay; 71 if (!expiry) 72 goto cancel_timer; 73 if (!immediate || serial == 1) { 74 _debug("run defer timer"); 75 goto run_timer; 76 } 77 78 default: 79 _debug("immediate ACK"); 80 goto cancel_timer; 81 } 82 83run_timer: 84 expiry += jiffies; 85 if (!timer_pending(&call->ack_timer) || 86 time_after(call->ack_timer.expires, expiry)) 87 mod_timer(&call->ack_timer, expiry); 88 return; 89 90cancel_timer: 91 _debug("cancel timer %%%u", serial); 92 try_to_del_timer_sync(&call->ack_timer); 93 read_lock_bh(&call->state_lock); 94 if (call->state <= RXRPC_CALL_COMPLETE && 95 !test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events)) 96 rxrpc_queue_call(call); 97 read_unlock_bh(&call->state_lock); 98} 99 100/* 101 * propose an ACK be sent, locking the call structure 102 */ 103void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason, 104 u32 serial, bool immediate) 105{ 106 s8 prior = rxrpc_ack_priority[ack_reason]; 107 108 if (prior > rxrpc_ack_priority[call->ackr_reason]) { 109 spin_lock_bh(&call->lock); 110 __rxrpc_propose_ACK(call, ack_reason, serial, immediate); 111 spin_unlock_bh(&call->lock); 112 } 113} 114 115/* 116 * set the resend timer 117 */ 118static void rxrpc_set_resend(struct rxrpc_call *call, u8 resend, 119 unsigned long resend_at) 120{ 121 read_lock_bh(&call->state_lock); 122 if (call->state >= RXRPC_CALL_COMPLETE) 123 resend = 0; 124 125 if (resend & 1) { 126 _debug("SET RESEND"); 127 set_bit(RXRPC_CALL_EV_RESEND, &call->events); 128 } 129 130 if (resend & 2) { 131 _debug("MODIFY RESEND TIMER"); 132 set_bit(RXRPC_CALL_RUN_RTIMER, &call->flags); 133 mod_timer(&call->resend_timer, resend_at); 134 } else { 135 _debug("KILL RESEND TIMER"); 136 del_timer_sync(&call->resend_timer); 137 clear_bit(RXRPC_CALL_EV_RESEND_TIMER, &call->events); 138 clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags); 139 } 140 read_unlock_bh(&call->state_lock); 141} 142 143/* 144 * resend packets 145 */ 146static void rxrpc_resend(struct rxrpc_call *call) 147{ 148 struct rxrpc_wire_header *whdr; 149 struct rxrpc_skb_priv *sp; 150 struct sk_buff *txb; 151 unsigned long *p_txb, resend_at; 152 bool stop; 153 int loop; 154 u8 resend; 155 156 _enter("{%d,%d,%d,%d},", 157 call->acks_hard, call->acks_unacked, 158 atomic_read(&call->sequence), 159 CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz)); 160 161 stop = false; 162 resend = 0; 163 resend_at = 0; 164 165 for (loop = call->acks_tail; 166 loop != call->acks_head || stop; 167 loop = (loop + 1) & (call->acks_winsz - 1) 168 ) { 169 p_txb = call->acks_window + loop; 170 smp_read_barrier_depends(); 171 if (*p_txb & 1) 172 continue; 173 174 txb = (struct sk_buff *) *p_txb; 175 sp = rxrpc_skb(txb); 176 177 if (sp->need_resend) { 178 sp->need_resend = false; 179 180 /* each Tx packet has a new serial number */ 181 sp->hdr.serial = atomic_inc_return(&call->conn->serial); 182 183 whdr = (struct rxrpc_wire_header *)txb->head; 184 whdr->serial = htonl(sp->hdr.serial); 185 186 _proto("Tx DATA %%%u { #%d }", 187 sp->hdr.serial, sp->hdr.seq); 188 if (rxrpc_send_packet(call->conn->trans, txb) < 0) { 189 stop = true; 190 sp->resend_at = jiffies + 3; 191 } else { 192 sp->resend_at = 193 jiffies + rxrpc_resend_timeout; 194 } 195 } 196 197 if (time_after_eq(jiffies + 1, sp->resend_at)) { 198 sp->need_resend = true; 199 resend |= 1; 200 } else if (resend & 2) { 201 if (time_before(sp->resend_at, resend_at)) 202 resend_at = sp->resend_at; 203 } else { 204 resend_at = sp->resend_at; 205 resend |= 2; 206 } 207 } 208 209 rxrpc_set_resend(call, resend, resend_at); 210 _leave(""); 211} 212 213/* 214 * handle resend timer expiry 215 */ 216static void rxrpc_resend_timer(struct rxrpc_call *call) 217{ 218 struct rxrpc_skb_priv *sp; 219 struct sk_buff *txb; 220 unsigned long *p_txb, resend_at; 221 int loop; 222 u8 resend; 223 224 _enter("%d,%d,%d", 225 call->acks_tail, call->acks_unacked, call->acks_head); 226 227 if (call->state >= RXRPC_CALL_COMPLETE) 228 return; 229 230 resend = 0; 231 resend_at = 0; 232 233 for (loop = call->acks_unacked; 234 loop != call->acks_head; 235 loop = (loop + 1) & (call->acks_winsz - 1) 236 ) { 237 p_txb = call->acks_window + loop; 238 smp_read_barrier_depends(); 239 txb = (struct sk_buff *) (*p_txb & ~1); 240 sp = rxrpc_skb(txb); 241 242 ASSERT(!(*p_txb & 1)); 243 244 if (sp->need_resend) { 245 ; 246 } else if (time_after_eq(jiffies + 1, sp->resend_at)) { 247 sp->need_resend = true; 248 resend |= 1; 249 } else if (resend & 2) { 250 if (time_before(sp->resend_at, resend_at)) 251 resend_at = sp->resend_at; 252 } else { 253 resend_at = sp->resend_at; 254 resend |= 2; 255 } 256 } 257 258 rxrpc_set_resend(call, resend, resend_at); 259 _leave(""); 260} 261 262/* 263 * process soft ACKs of our transmitted packets 264 * - these indicate packets the peer has or has not received, but hasn't yet 265 * given to the consumer, and so can still be discarded and re-requested 266 */ 267static int rxrpc_process_soft_ACKs(struct rxrpc_call *call, 268 struct rxrpc_ackpacket *ack, 269 struct sk_buff *skb) 270{ 271 struct rxrpc_skb_priv *sp; 272 struct sk_buff *txb; 273 unsigned long *p_txb, resend_at; 274 int loop; 275 u8 sacks[RXRPC_MAXACKS], resend; 276 277 _enter("{%d,%d},{%d},", 278 call->acks_hard, 279 CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz), 280 ack->nAcks); 281 282 if (skb_copy_bits(skb, 0, sacks, ack->nAcks) < 0) 283 goto protocol_error; 284 285 resend = 0; 286 resend_at = 0; 287 for (loop = 0; loop < ack->nAcks; loop++) { 288 p_txb = call->acks_window; 289 p_txb += (call->acks_tail + loop) & (call->acks_winsz - 1); 290 smp_read_barrier_depends(); 291 txb = (struct sk_buff *) (*p_txb & ~1); 292 sp = rxrpc_skb(txb); 293 294 switch (sacks[loop]) { 295 case RXRPC_ACK_TYPE_ACK: 296 sp->need_resend = false; 297 *p_txb |= 1; 298 break; 299 case RXRPC_ACK_TYPE_NACK: 300 sp->need_resend = true; 301 *p_txb &= ~1; 302 resend = 1; 303 break; 304 default: 305 _debug("Unsupported ACK type %d", sacks[loop]); 306 goto protocol_error; 307 } 308 } 309 310 smp_mb(); 311 call->acks_unacked = (call->acks_tail + loop) & (call->acks_winsz - 1); 312 313 /* anything not explicitly ACK'd is implicitly NACK'd, but may just not 314 * have been received or processed yet by the far end */ 315 for (loop = call->acks_unacked; 316 loop != call->acks_head; 317 loop = (loop + 1) & (call->acks_winsz - 1) 318 ) { 319 p_txb = call->acks_window + loop; 320 smp_read_barrier_depends(); 321 txb = (struct sk_buff *) (*p_txb & ~1); 322 sp = rxrpc_skb(txb); 323 324 if (*p_txb & 1) { 325 /* packet must have been discarded */ 326 sp->need_resend = true; 327 *p_txb &= ~1; 328 resend |= 1; 329 } else if (sp->need_resend) { 330 ; 331 } else if (time_after_eq(jiffies + 1, sp->resend_at)) { 332 sp->need_resend = true; 333 resend |= 1; 334 } else if (resend & 2) { 335 if (time_before(sp->resend_at, resend_at)) 336 resend_at = sp->resend_at; 337 } else { 338 resend_at = sp->resend_at; 339 resend |= 2; 340 } 341 } 342 343 rxrpc_set_resend(call, resend, resend_at); 344 _leave(" = 0"); 345 return 0; 346 347protocol_error: 348 _leave(" = -EPROTO"); 349 return -EPROTO; 350} 351 352/* 353 * discard hard-ACK'd packets from the Tx window 354 */ 355static void rxrpc_rotate_tx_window(struct rxrpc_call *call, u32 hard) 356{ 357 unsigned long _skb; 358 int tail = call->acks_tail, old_tail; 359 int win = CIRC_CNT(call->acks_head, tail, call->acks_winsz); 360 361 _enter("{%u,%u},%u", call->acks_hard, win, hard); 362 363 ASSERTCMP(hard - call->acks_hard, <=, win); 364 365 while (call->acks_hard < hard) { 366 smp_read_barrier_depends(); 367 _skb = call->acks_window[tail] & ~1; 368 rxrpc_free_skb((struct sk_buff *) _skb); 369 old_tail = tail; 370 tail = (tail + 1) & (call->acks_winsz - 1); 371 call->acks_tail = tail; 372 if (call->acks_unacked == old_tail) 373 call->acks_unacked = tail; 374 call->acks_hard++; 375 } 376 377 wake_up(&call->tx_waitq); 378} 379 380/* 381 * clear the Tx window in the event of a failure 382 */ 383static void rxrpc_clear_tx_window(struct rxrpc_call *call) 384{ 385 rxrpc_rotate_tx_window(call, atomic_read(&call->sequence)); 386} 387 388/* 389 * drain the out of sequence received packet queue into the packet Rx queue 390 */ 391static int rxrpc_drain_rx_oos_queue(struct rxrpc_call *call) 392{ 393 struct rxrpc_skb_priv *sp; 394 struct sk_buff *skb; 395 bool terminal; 396 int ret; 397 398 _enter("{%d,%d}", call->rx_data_post, call->rx_first_oos); 399 400 spin_lock_bh(&call->lock); 401 402 ret = -ECONNRESET; 403 if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) 404 goto socket_unavailable; 405 406 skb = skb_dequeue(&call->rx_oos_queue); 407 if (skb) { 408 sp = rxrpc_skb(skb); 409 410 _debug("drain OOS packet %d [%d]", 411 sp->hdr.seq, call->rx_first_oos); 412 413 if (sp->hdr.seq != call->rx_first_oos) { 414 skb_queue_head(&call->rx_oos_queue, skb); 415 call->rx_first_oos = rxrpc_skb(skb)->hdr.seq; 416 _debug("requeue %p {%u}", skb, call->rx_first_oos); 417 } else { 418 skb->mark = RXRPC_SKB_MARK_DATA; 419 terminal = ((sp->hdr.flags & RXRPC_LAST_PACKET) && 420 !(sp->hdr.flags & RXRPC_CLIENT_INITIATED)); 421 ret = rxrpc_queue_rcv_skb(call, skb, true, terminal); 422 BUG_ON(ret < 0); 423 _debug("drain #%u", call->rx_data_post); 424 call->rx_data_post++; 425 426 /* find out what the next packet is */ 427 skb = skb_peek(&call->rx_oos_queue); 428 if (skb) 429 call->rx_first_oos = rxrpc_skb(skb)->hdr.seq; 430 else 431 call->rx_first_oos = 0; 432 _debug("peek %p {%u}", skb, call->rx_first_oos); 433 } 434 } 435 436 ret = 0; 437socket_unavailable: 438 spin_unlock_bh(&call->lock); 439 _leave(" = %d", ret); 440 return ret; 441} 442 443/* 444 * insert an out of sequence packet into the buffer 445 */ 446static void rxrpc_insert_oos_packet(struct rxrpc_call *call, 447 struct sk_buff *skb) 448{ 449 struct rxrpc_skb_priv *sp, *psp; 450 struct sk_buff *p; 451 u32 seq; 452 453 sp = rxrpc_skb(skb); 454 seq = sp->hdr.seq; 455 _enter(",,{%u}", seq); 456 457 skb->destructor = rxrpc_packet_destructor; 458 ASSERTCMP(sp->call, ==, NULL); 459 sp->call = call; 460 rxrpc_get_call(call); 461 462 /* insert into the buffer in sequence order */ 463 spin_lock_bh(&call->lock); 464 465 skb_queue_walk(&call->rx_oos_queue, p) { 466 psp = rxrpc_skb(p); 467 if (psp->hdr.seq > seq) { 468 _debug("insert oos #%u before #%u", seq, psp->hdr.seq); 469 skb_insert(p, skb, &call->rx_oos_queue); 470 goto inserted; 471 } 472 } 473 474 _debug("append oos #%u", seq); 475 skb_queue_tail(&call->rx_oos_queue, skb); 476inserted: 477 478 /* we might now have a new front to the queue */ 479 if (call->rx_first_oos == 0 || seq < call->rx_first_oos) 480 call->rx_first_oos = seq; 481 482 read_lock(&call->state_lock); 483 if (call->state < RXRPC_CALL_COMPLETE && 484 call->rx_data_post == call->rx_first_oos) { 485 _debug("drain rx oos now"); 486 set_bit(RXRPC_CALL_EV_DRAIN_RX_OOS, &call->events); 487 } 488 read_unlock(&call->state_lock); 489 490 spin_unlock_bh(&call->lock); 491 _leave(" [stored #%u]", call->rx_first_oos); 492} 493 494/* 495 * clear the Tx window on final ACK reception 496 */ 497static void rxrpc_zap_tx_window(struct rxrpc_call *call) 498{ 499 struct rxrpc_skb_priv *sp; 500 struct sk_buff *skb; 501 unsigned long _skb, *acks_window; 502 u8 winsz = call->acks_winsz; 503 int tail; 504 505 acks_window = call->acks_window; 506 call->acks_window = NULL; 507 508 while (CIRC_CNT(call->acks_head, call->acks_tail, winsz) > 0) { 509 tail = call->acks_tail; 510 smp_read_barrier_depends(); 511 _skb = acks_window[tail] & ~1; 512 smp_mb(); 513 call->acks_tail = (call->acks_tail + 1) & (winsz - 1); 514 515 skb = (struct sk_buff *) _skb; 516 sp = rxrpc_skb(skb); 517 _debug("+++ clear Tx %u", sp->hdr.seq); 518 rxrpc_free_skb(skb); 519 } 520 521 kfree(acks_window); 522} 523 524/* 525 * process the extra information that may be appended to an ACK packet 526 */ 527static void rxrpc_extract_ackinfo(struct rxrpc_call *call, struct sk_buff *skb, 528 unsigned int latest, int nAcks) 529{ 530 struct rxrpc_ackinfo ackinfo; 531 struct rxrpc_peer *peer; 532 unsigned int mtu; 533 534 if (skb_copy_bits(skb, nAcks + 3, &ackinfo, sizeof(ackinfo)) < 0) { 535 _leave(" [no ackinfo]"); 536 return; 537 } 538 539 _proto("Rx ACK %%%u Info { rx=%u max=%u rwin=%u jm=%u }", 540 latest, 541 ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU), 542 ntohl(ackinfo.rwind), ntohl(ackinfo.jumbo_max)); 543 544 mtu = min(ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU)); 545 546 peer = call->conn->trans->peer; 547 if (mtu < peer->maxdata) { 548 spin_lock_bh(&peer->lock); 549 peer->maxdata = mtu; 550 peer->mtu = mtu + peer->hdrsize; 551 spin_unlock_bh(&peer->lock); 552 _net("Net MTU %u (maxdata %u)", peer->mtu, peer->maxdata); 553 } 554} 555 556/* 557 * process packets in the reception queue 558 */ 559static int rxrpc_process_rx_queue(struct rxrpc_call *call, 560 u32 *_abort_code) 561{ 562 struct rxrpc_ackpacket ack; 563 struct rxrpc_skb_priv *sp; 564 struct sk_buff *skb; 565 bool post_ACK; 566 int latest; 567 u32 hard, tx; 568 569 _enter(""); 570 571process_further: 572 skb = skb_dequeue(&call->rx_queue); 573 if (!skb) 574 return -EAGAIN; 575 576 _net("deferred skb %p", skb); 577 578 sp = rxrpc_skb(skb); 579 580 _debug("process %s [st %d]", rxrpc_pkts[sp->hdr.type], call->state); 581 582 post_ACK = false; 583 584 switch (sp->hdr.type) { 585 /* data packets that wind up here have been received out of 586 * order, need security processing or are jumbo packets */ 587 case RXRPC_PACKET_TYPE_DATA: 588 _proto("OOSQ DATA %%%u { #%u }", sp->hdr.serial, sp->hdr.seq); 589 590 /* secured packets must be verified and possibly decrypted */ 591 if (call->conn->security->verify_packet(call, skb, 592 _abort_code) < 0) 593 goto protocol_error; 594 595 rxrpc_insert_oos_packet(call, skb); 596 goto process_further; 597 598 /* partial ACK to process */ 599 case RXRPC_PACKET_TYPE_ACK: 600 if (skb_copy_bits(skb, 0, &ack, sizeof(ack)) < 0) { 601 _debug("extraction failure"); 602 goto protocol_error; 603 } 604 if (!skb_pull(skb, sizeof(ack))) 605 BUG(); 606 607 latest = sp->hdr.serial; 608 hard = ntohl(ack.firstPacket); 609 tx = atomic_read(&call->sequence); 610 611 _proto("Rx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }", 612 latest, 613 ntohs(ack.maxSkew), 614 hard, 615 ntohl(ack.previousPacket), 616 ntohl(ack.serial), 617 rxrpc_acks(ack.reason), 618 ack.nAcks); 619 620 rxrpc_extract_ackinfo(call, skb, latest, ack.nAcks); 621 622 if (ack.reason == RXRPC_ACK_PING) { 623 _proto("Rx ACK %%%u PING Request", latest); 624 rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE, 625 sp->hdr.serial, true); 626 } 627 628 /* discard any out-of-order or duplicate ACKs */ 629 if (latest - call->acks_latest <= 0) { 630 _debug("discard ACK %d <= %d", 631 latest, call->acks_latest); 632 goto discard; 633 } 634 call->acks_latest = latest; 635 636 if (call->state != RXRPC_CALL_CLIENT_SEND_REQUEST && 637 call->state != RXRPC_CALL_CLIENT_AWAIT_REPLY && 638 call->state != RXRPC_CALL_SERVER_SEND_REPLY && 639 call->state != RXRPC_CALL_SERVER_AWAIT_ACK) 640 goto discard; 641 642 _debug("Tx=%d H=%u S=%d", tx, call->acks_hard, call->state); 643 644 if (hard > 0) { 645 if (hard - 1 > tx) { 646 _debug("hard-ACK'd packet %d not transmitted" 647 " (%d top)", 648 hard - 1, tx); 649 goto protocol_error; 650 } 651 652 if ((call->state == RXRPC_CALL_CLIENT_AWAIT_REPLY || 653 call->state == RXRPC_CALL_SERVER_AWAIT_ACK) && 654 hard > tx) { 655 call->acks_hard = tx; 656 goto all_acked; 657 } 658 659 smp_rmb(); 660 rxrpc_rotate_tx_window(call, hard - 1); 661 } 662 663 if (ack.nAcks > 0) { 664 if (hard - 1 + ack.nAcks > tx) { 665 _debug("soft-ACK'd packet %d+%d not" 666 " transmitted (%d top)", 667 hard - 1, ack.nAcks, tx); 668 goto protocol_error; 669 } 670 671 if (rxrpc_process_soft_ACKs(call, &ack, skb) < 0) 672 goto protocol_error; 673 } 674 goto discard; 675 676 /* complete ACK to process */ 677 case RXRPC_PACKET_TYPE_ACKALL: 678 goto all_acked; 679 680 /* abort and busy are handled elsewhere */ 681 case RXRPC_PACKET_TYPE_BUSY: 682 case RXRPC_PACKET_TYPE_ABORT: 683 BUG(); 684 685 /* connection level events - also handled elsewhere */ 686 case RXRPC_PACKET_TYPE_CHALLENGE: 687 case RXRPC_PACKET_TYPE_RESPONSE: 688 case RXRPC_PACKET_TYPE_DEBUG: 689 BUG(); 690 } 691 692 /* if we've had a hard ACK that covers all the packets we've sent, then 693 * that ends that phase of the operation */ 694all_acked: 695 write_lock_bh(&call->state_lock); 696 _debug("ack all %d", call->state); 697 698 switch (call->state) { 699 case RXRPC_CALL_CLIENT_AWAIT_REPLY: 700 call->state = RXRPC_CALL_CLIENT_RECV_REPLY; 701 break; 702 case RXRPC_CALL_SERVER_AWAIT_ACK: 703 _debug("srv complete"); 704 call->state = RXRPC_CALL_COMPLETE; 705 post_ACK = true; 706 break; 707 case RXRPC_CALL_CLIENT_SEND_REQUEST: 708 case RXRPC_CALL_SERVER_RECV_REQUEST: 709 goto protocol_error_unlock; /* can't occur yet */ 710 default: 711 write_unlock_bh(&call->state_lock); 712 goto discard; /* assume packet left over from earlier phase */ 713 } 714 715 write_unlock_bh(&call->state_lock); 716 717 /* if all the packets we sent are hard-ACK'd, then we can discard 718 * whatever we've got left */ 719 _debug("clear Tx %d", 720 CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz)); 721 722 del_timer_sync(&call->resend_timer); 723 clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags); 724 clear_bit(RXRPC_CALL_EV_RESEND_TIMER, &call->events); 725 726 if (call->acks_window) 727 rxrpc_zap_tx_window(call); 728 729 if (post_ACK) { 730 /* post the final ACK message for userspace to pick up */ 731 _debug("post ACK"); 732 skb->mark = RXRPC_SKB_MARK_FINAL_ACK; 733 sp->call = call; 734 rxrpc_get_call(call); 735 spin_lock_bh(&call->lock); 736 if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0) 737 BUG(); 738 spin_unlock_bh(&call->lock); 739 goto process_further; 740 } 741 742discard: 743 rxrpc_free_skb(skb); 744 goto process_further; 745 746protocol_error_unlock: 747 write_unlock_bh(&call->state_lock); 748protocol_error: 749 rxrpc_free_skb(skb); 750 _leave(" = -EPROTO"); 751 return -EPROTO; 752} 753 754/* 755 * post a message to the socket Rx queue for recvmsg() to pick up 756 */ 757static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error, 758 bool fatal) 759{ 760 struct rxrpc_skb_priv *sp; 761 struct sk_buff *skb; 762 int ret; 763 764 _enter("{%d,%lx},%u,%u,%d", 765 call->debug_id, call->flags, mark, error, fatal); 766 767 /* remove timers and things for fatal messages */ 768 if (fatal) { 769 del_timer_sync(&call->resend_timer); 770 del_timer_sync(&call->ack_timer); 771 clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags); 772 } 773 774 if (mark != RXRPC_SKB_MARK_NEW_CALL && 775 !test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) { 776 _leave("[no userid]"); 777 return 0; 778 } 779 780 if (!test_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags)) { 781 skb = alloc_skb(0, GFP_NOFS); 782 if (!skb) 783 return -ENOMEM; 784 785 rxrpc_new_skb(skb); 786 787 skb->mark = mark; 788 789 sp = rxrpc_skb(skb); 790 memset(sp, 0, sizeof(*sp)); 791 sp->error = error; 792 sp->call = call; 793 rxrpc_get_call(call); 794 795 spin_lock_bh(&call->lock); 796 ret = rxrpc_queue_rcv_skb(call, skb, true, fatal); 797 spin_unlock_bh(&call->lock); 798 BUG_ON(ret < 0); 799 } 800 801 return 0; 802} 803 804/* 805 * handle background processing of incoming call packets and ACK / abort 806 * generation 807 */ 808void rxrpc_process_call(struct work_struct *work) 809{ 810 struct rxrpc_call *call = 811 container_of(work, struct rxrpc_call, processor); 812 struct rxrpc_wire_header whdr; 813 struct rxrpc_ackpacket ack; 814 struct rxrpc_ackinfo ackinfo; 815 struct msghdr msg; 816 struct kvec iov[5]; 817 enum rxrpc_call_event genbit; 818 unsigned long bits; 819 __be32 data, pad; 820 size_t len; 821 int loop, nbit, ioc, ret, mtu; 822 u32 serial, abort_code = RX_PROTOCOL_ERROR; 823 u8 *acks = NULL; 824 825 //printk("\n--------------------\n"); 826 _enter("{%d,%s,%lx} [%lu]", 827 call->debug_id, rxrpc_call_states[call->state], call->events, 828 (jiffies - call->creation_jif) / (HZ / 10)); 829 830 if (test_and_set_bit(RXRPC_CALL_PROC_BUSY, &call->flags)) { 831 _debug("XXXXXXXXXXXXX RUNNING ON MULTIPLE CPUS XXXXXXXXXXXXX"); 832 return; 833 } 834 835 /* there's a good chance we're going to have to send a message, so set 836 * one up in advance */ 837 msg.msg_name = &call->conn->trans->peer->srx.transport; 838 msg.msg_namelen = call->conn->trans->peer->srx.transport_len; 839 msg.msg_control = NULL; 840 msg.msg_controllen = 0; 841 msg.msg_flags = 0; 842 843 whdr.epoch = htonl(call->conn->epoch); 844 whdr.cid = htonl(call->cid); 845 whdr.callNumber = htonl(call->call_id); 846 whdr.seq = 0; 847 whdr.type = RXRPC_PACKET_TYPE_ACK; 848 whdr.flags = call->conn->out_clientflag; 849 whdr.userStatus = 0; 850 whdr.securityIndex = call->conn->security_ix; 851 whdr._rsvd = 0; 852 whdr.serviceId = htons(call->service_id); 853 854 memset(iov, 0, sizeof(iov)); 855 iov[0].iov_base = &whdr; 856 iov[0].iov_len = sizeof(whdr); 857 858 /* deal with events of a final nature */ 859 if (test_bit(RXRPC_CALL_EV_RELEASE, &call->events)) { 860 rxrpc_release_call(call); 861 clear_bit(RXRPC_CALL_EV_RELEASE, &call->events); 862 } 863 864 if (test_bit(RXRPC_CALL_EV_RCVD_ERROR, &call->events)) { 865 int error; 866 867 clear_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events); 868 clear_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events); 869 clear_bit(RXRPC_CALL_EV_ABORT, &call->events); 870 871 error = call->conn->trans->peer->net_error; 872 _debug("post net error %d", error); 873 874 if (rxrpc_post_message(call, RXRPC_SKB_MARK_NET_ERROR, 875 error, true) < 0) 876 goto no_mem; 877 clear_bit(RXRPC_CALL_EV_RCVD_ERROR, &call->events); 878 goto kill_ACKs; 879 } 880 881 if (test_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events)) { 882 ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE); 883 884 clear_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events); 885 clear_bit(RXRPC_CALL_EV_ABORT, &call->events); 886 887 _debug("post conn abort"); 888 889 if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR, 890 call->conn->error, true) < 0) 891 goto no_mem; 892 clear_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events); 893 goto kill_ACKs; 894 } 895 896 if (test_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events)) { 897 whdr.type = RXRPC_PACKET_TYPE_BUSY; 898 genbit = RXRPC_CALL_EV_REJECT_BUSY; 899 goto send_message; 900 } 901 902 if (test_bit(RXRPC_CALL_EV_ABORT, &call->events)) { 903 ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE); 904 905 if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR, 906 ECONNABORTED, true) < 0) 907 goto no_mem; 908 whdr.type = RXRPC_PACKET_TYPE_ABORT; 909 data = htonl(call->local_abort); 910 iov[1].iov_base = &data; 911 iov[1].iov_len = sizeof(data); 912 genbit = RXRPC_CALL_EV_ABORT; 913 goto send_message; 914 } 915 916 if (test_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events)) { 917 genbit = RXRPC_CALL_EV_ACK_FINAL; 918 919 ack.bufferSpace = htons(8); 920 ack.maxSkew = 0; 921 ack.serial = 0; 922 ack.reason = RXRPC_ACK_IDLE; 923 ack.nAcks = 0; 924 call->ackr_reason = 0; 925 926 spin_lock_bh(&call->lock); 927 ack.serial = htonl(call->ackr_serial); 928 ack.previousPacket = htonl(call->ackr_prev_seq); 929 ack.firstPacket = htonl(call->rx_data_eaten + 1); 930 spin_unlock_bh(&call->lock); 931 932 pad = 0; 933 934 iov[1].iov_base = &ack; 935 iov[1].iov_len = sizeof(ack); 936 iov[2].iov_base = &pad; 937 iov[2].iov_len = 3; 938 iov[3].iov_base = &ackinfo; 939 iov[3].iov_len = sizeof(ackinfo); 940 goto send_ACK; 941 } 942 943 if (call->events & ((1 << RXRPC_CALL_EV_RCVD_BUSY) | 944 (1 << RXRPC_CALL_EV_RCVD_ABORT)) 945 ) { 946 u32 mark; 947 948 if (test_bit(RXRPC_CALL_EV_RCVD_ABORT, &call->events)) 949 mark = RXRPC_SKB_MARK_REMOTE_ABORT; 950 else 951 mark = RXRPC_SKB_MARK_BUSY; 952 953 _debug("post abort/busy"); 954 rxrpc_clear_tx_window(call); 955 if (rxrpc_post_message(call, mark, ECONNABORTED, true) < 0) 956 goto no_mem; 957 958 clear_bit(RXRPC_CALL_EV_RCVD_BUSY, &call->events); 959 clear_bit(RXRPC_CALL_EV_RCVD_ABORT, &call->events); 960 goto kill_ACKs; 961 } 962 963 if (test_and_clear_bit(RXRPC_CALL_EV_RCVD_ACKALL, &call->events)) { 964 _debug("do implicit ackall"); 965 rxrpc_clear_tx_window(call); 966 } 967 968 if (test_bit(RXRPC_CALL_EV_LIFE_TIMER, &call->events)) { 969 write_lock_bh(&call->state_lock); 970 if (call->state <= RXRPC_CALL_COMPLETE) { 971 call->state = RXRPC_CALL_LOCALLY_ABORTED; 972 call->local_abort = RX_CALL_TIMEOUT; 973 set_bit(RXRPC_CALL_EV_ABORT, &call->events); 974 } 975 write_unlock_bh(&call->state_lock); 976 977 _debug("post timeout"); 978 if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR, 979 ETIME, true) < 0) 980 goto no_mem; 981 982 clear_bit(RXRPC_CALL_EV_LIFE_TIMER, &call->events); 983 goto kill_ACKs; 984 } 985 986 /* deal with assorted inbound messages */ 987 if (!skb_queue_empty(&call->rx_queue)) { 988 switch (rxrpc_process_rx_queue(call, &abort_code)) { 989 case 0: 990 case -EAGAIN: 991 break; 992 case -ENOMEM: 993 goto no_mem; 994 case -EKEYEXPIRED: 995 case -EKEYREJECTED: 996 case -EPROTO: 997 rxrpc_abort_call(call, abort_code); 998 goto kill_ACKs; 999 } 1000 } 1001 1002 /* handle resending */ 1003 if (test_and_clear_bit(RXRPC_CALL_EV_RESEND_TIMER, &call->events)) 1004 rxrpc_resend_timer(call); 1005 if (test_and_clear_bit(RXRPC_CALL_EV_RESEND, &call->events)) 1006 rxrpc_resend(call); 1007 1008 /* consider sending an ordinary ACK */ 1009 if (test_bit(RXRPC_CALL_EV_ACK, &call->events)) { 1010 _debug("send ACK: window: %d - %d { %lx }", 1011 call->rx_data_eaten, call->ackr_win_top, 1012 call->ackr_window[0]); 1013 1014 if (call->state > RXRPC_CALL_SERVER_ACK_REQUEST && 1015 call->ackr_reason != RXRPC_ACK_PING_RESPONSE) { 1016 /* ACK by sending reply DATA packet in this state */ 1017 clear_bit(RXRPC_CALL_EV_ACK, &call->events); 1018 goto maybe_reschedule; 1019 } 1020 1021 genbit = RXRPC_CALL_EV_ACK; 1022 1023 acks = kzalloc(call->ackr_win_top - call->rx_data_eaten, 1024 GFP_NOFS); 1025 if (!acks) 1026 goto no_mem; 1027 1028 //hdr.flags = RXRPC_SLOW_START_OK; 1029 ack.bufferSpace = htons(8); 1030 ack.maxSkew = 0; 1031 1032 spin_lock_bh(&call->lock); 1033 ack.reason = call->ackr_reason; 1034 ack.serial = htonl(call->ackr_serial); 1035 ack.previousPacket = htonl(call->ackr_prev_seq); 1036 ack.firstPacket = htonl(call->rx_data_eaten + 1); 1037 1038 ack.nAcks = 0; 1039 for (loop = 0; loop < RXRPC_ACKR_WINDOW_ASZ; loop++) { 1040 nbit = loop * BITS_PER_LONG; 1041 for (bits = call->ackr_window[loop]; bits; bits >>= 1 1042 ) { 1043 _debug("- l=%d n=%d b=%lx", loop, nbit, bits); 1044 if (bits & 1) { 1045 acks[nbit] = RXRPC_ACK_TYPE_ACK; 1046 ack.nAcks = nbit + 1; 1047 } 1048 nbit++; 1049 } 1050 } 1051 call->ackr_reason = 0; 1052 spin_unlock_bh(&call->lock); 1053 1054 pad = 0; 1055 1056 iov[1].iov_base = &ack; 1057 iov[1].iov_len = sizeof(ack); 1058 iov[2].iov_base = acks; 1059 iov[2].iov_len = ack.nAcks; 1060 iov[3].iov_base = &pad; 1061 iov[3].iov_len = 3; 1062 iov[4].iov_base = &ackinfo; 1063 iov[4].iov_len = sizeof(ackinfo); 1064 1065 switch (ack.reason) { 1066 case RXRPC_ACK_REQUESTED: 1067 case RXRPC_ACK_DUPLICATE: 1068 case RXRPC_ACK_OUT_OF_SEQUENCE: 1069 case RXRPC_ACK_EXCEEDS_WINDOW: 1070 case RXRPC_ACK_NOSPACE: 1071 case RXRPC_ACK_PING: 1072 case RXRPC_ACK_PING_RESPONSE: 1073 goto send_ACK_with_skew; 1074 case RXRPC_ACK_DELAY: 1075 case RXRPC_ACK_IDLE: 1076 goto send_ACK; 1077 } 1078 } 1079 1080 /* handle completion of security negotiations on an incoming 1081 * connection */ 1082 if (test_and_clear_bit(RXRPC_CALL_EV_SECURED, &call->events)) { 1083 _debug("secured"); 1084 spin_lock_bh(&call->lock); 1085 1086 if (call->state == RXRPC_CALL_SERVER_SECURING) { 1087 _debug("securing"); 1088 write_lock(&call->conn->lock); 1089 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) && 1090 !test_bit(RXRPC_CALL_EV_RELEASE, &call->events)) { 1091 _debug("not released"); 1092 call->state = RXRPC_CALL_SERVER_ACCEPTING; 1093 list_move_tail(&call->accept_link, 1094 &call->socket->acceptq); 1095 } 1096 write_unlock(&call->conn->lock); 1097 read_lock(&call->state_lock); 1098 if (call->state < RXRPC_CALL_COMPLETE) 1099 set_bit(RXRPC_CALL_EV_POST_ACCEPT, &call->events); 1100 read_unlock(&call->state_lock); 1101 } 1102 1103 spin_unlock_bh(&call->lock); 1104 if (!test_bit(RXRPC_CALL_EV_POST_ACCEPT, &call->events)) 1105 goto maybe_reschedule; 1106 } 1107 1108 /* post a notification of an acceptable connection to the app */ 1109 if (test_bit(RXRPC_CALL_EV_POST_ACCEPT, &call->events)) { 1110 _debug("post accept"); 1111 if (rxrpc_post_message(call, RXRPC_SKB_MARK_NEW_CALL, 1112 0, false) < 0) 1113 goto no_mem; 1114 clear_bit(RXRPC_CALL_EV_POST_ACCEPT, &call->events); 1115 goto maybe_reschedule; 1116 } 1117 1118 /* handle incoming call acceptance */ 1119 if (test_and_clear_bit(RXRPC_CALL_EV_ACCEPTED, &call->events)) { 1120 _debug("accepted"); 1121 ASSERTCMP(call->rx_data_post, ==, 0); 1122 call->rx_data_post = 1; 1123 read_lock_bh(&call->state_lock); 1124 if (call->state < RXRPC_CALL_COMPLETE) 1125 set_bit(RXRPC_CALL_EV_DRAIN_RX_OOS, &call->events); 1126 read_unlock_bh(&call->state_lock); 1127 } 1128 1129 /* drain the out of sequence received packet queue into the packet Rx 1130 * queue */ 1131 if (test_and_clear_bit(RXRPC_CALL_EV_DRAIN_RX_OOS, &call->events)) { 1132 while (call->rx_data_post == call->rx_first_oos) 1133 if (rxrpc_drain_rx_oos_queue(call) < 0) 1134 break; 1135 goto maybe_reschedule; 1136 } 1137 1138 /* other events may have been raised since we started checking */ 1139 goto maybe_reschedule; 1140 1141send_ACK_with_skew: 1142 ack.maxSkew = htons(atomic_read(&call->conn->hi_serial) - 1143 ntohl(ack.serial)); 1144send_ACK: 1145 mtu = call->conn->trans->peer->if_mtu; 1146 mtu -= call->conn->trans->peer->hdrsize; 1147 ackinfo.maxMTU = htonl(mtu); 1148 ackinfo.rwind = htonl(rxrpc_rx_window_size); 1149 1150 /* permit the peer to send us jumbo packets if it wants to */ 1151 ackinfo.rxMTU = htonl(rxrpc_rx_mtu); 1152 ackinfo.jumbo_max = htonl(rxrpc_rx_jumbo_max); 1153 1154 serial = atomic_inc_return(&call->conn->serial); 1155 whdr.serial = htonl(serial); 1156 _proto("Tx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }", 1157 serial, 1158 ntohs(ack.maxSkew), 1159 ntohl(ack.firstPacket), 1160 ntohl(ack.previousPacket), 1161 ntohl(ack.serial), 1162 rxrpc_acks(ack.reason), 1163 ack.nAcks); 1164 1165 del_timer_sync(&call->ack_timer); 1166 if (ack.nAcks > 0) 1167 set_bit(RXRPC_CALL_TX_SOFT_ACK, &call->flags); 1168 goto send_message_2; 1169 1170send_message: 1171 _debug("send message"); 1172 1173 serial = atomic_inc_return(&call->conn->serial); 1174 whdr.serial = htonl(serial); 1175 _proto("Tx %s %%%u", rxrpc_pkts[whdr.type], serial); 1176send_message_2: 1177 1178 len = iov[0].iov_len; 1179 ioc = 1; 1180 if (iov[4].iov_len) { 1181 ioc = 5; 1182 len += iov[4].iov_len; 1183 len += iov[3].iov_len; 1184 len += iov[2].iov_len; 1185 len += iov[1].iov_len; 1186 } else if (iov[3].iov_len) { 1187 ioc = 4; 1188 len += iov[3].iov_len; 1189 len += iov[2].iov_len; 1190 len += iov[1].iov_len; 1191 } else if (iov[2].iov_len) { 1192 ioc = 3; 1193 len += iov[2].iov_len; 1194 len += iov[1].iov_len; 1195 } else if (iov[1].iov_len) { 1196 ioc = 2; 1197 len += iov[1].iov_len; 1198 } 1199 1200 ret = kernel_sendmsg(call->conn->trans->local->socket, 1201 &msg, iov, ioc, len); 1202 if (ret < 0) { 1203 _debug("sendmsg failed: %d", ret); 1204 read_lock_bh(&call->state_lock); 1205 if (call->state < RXRPC_CALL_DEAD) 1206 rxrpc_queue_call(call); 1207 read_unlock_bh(&call->state_lock); 1208 goto error; 1209 } 1210 1211 switch (genbit) { 1212 case RXRPC_CALL_EV_ABORT: 1213 clear_bit(genbit, &call->events); 1214 clear_bit(RXRPC_CALL_EV_RCVD_ABORT, &call->events); 1215 goto kill_ACKs; 1216 1217 case RXRPC_CALL_EV_ACK_FINAL: 1218 write_lock_bh(&call->state_lock); 1219 if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK) 1220 call->state = RXRPC_CALL_COMPLETE; 1221 write_unlock_bh(&call->state_lock); 1222 goto kill_ACKs; 1223 1224 default: 1225 clear_bit(genbit, &call->events); 1226 switch (call->state) { 1227 case RXRPC_CALL_CLIENT_AWAIT_REPLY: 1228 case RXRPC_CALL_CLIENT_RECV_REPLY: 1229 case RXRPC_CALL_SERVER_RECV_REQUEST: 1230 case RXRPC_CALL_SERVER_ACK_REQUEST: 1231 _debug("start ACK timer"); 1232 rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, 1233 call->ackr_serial, false); 1234 default: 1235 break; 1236 } 1237 goto maybe_reschedule; 1238 } 1239 1240kill_ACKs: 1241 del_timer_sync(&call->ack_timer); 1242 if (test_and_clear_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events)) 1243 rxrpc_put_call(call); 1244 clear_bit(RXRPC_CALL_EV_ACK, &call->events); 1245 1246maybe_reschedule: 1247 if (call->events || !skb_queue_empty(&call->rx_queue)) { 1248 read_lock_bh(&call->state_lock); 1249 if (call->state < RXRPC_CALL_DEAD) 1250 rxrpc_queue_call(call); 1251 read_unlock_bh(&call->state_lock); 1252 } 1253 1254 /* don't leave aborted connections on the accept queue */ 1255 if (call->state >= RXRPC_CALL_COMPLETE && 1256 !list_empty(&call->accept_link)) { 1257 _debug("X unlinking once-pending call %p { e=%lx f=%lx c=%x }", 1258 call, call->events, call->flags, call->conn->cid); 1259 1260 read_lock_bh(&call->state_lock); 1261 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) && 1262 !test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events)) 1263 rxrpc_queue_call(call); 1264 read_unlock_bh(&call->state_lock); 1265 } 1266 1267error: 1268 clear_bit(RXRPC_CALL_PROC_BUSY, &call->flags); 1269 kfree(acks); 1270 1271 /* because we don't want two CPUs both processing the work item for one 1272 * call at the same time, we use a flag to note when it's busy; however 1273 * this means there's a race between clearing the flag and setting the 1274 * work pending bit and the work item being processed again */ 1275 if (call->events && !work_pending(&call->processor)) { 1276 _debug("jumpstart %x", call->conn->cid); 1277 rxrpc_queue_call(call); 1278 } 1279 1280 _leave(""); 1281 return; 1282 1283no_mem: 1284 _debug("out of memory"); 1285 goto maybe_reschedule; 1286}