at v2.6.16 2278 lines 58 kB view raw
1/* call.c: Rx call routines 2 * 3 * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. 4 * Written by David Howells (dhowells@redhat.com) 5 * 6 * This program is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU General Public License 8 * as published by the Free Software Foundation; either version 9 * 2 of the License, or (at your option) any later version. 10 */ 11 12#include <linux/sched.h> 13#include <linux/slab.h> 14#include <linux/module.h> 15#include <rxrpc/rxrpc.h> 16#include <rxrpc/transport.h> 17#include <rxrpc/peer.h> 18#include <rxrpc/connection.h> 19#include <rxrpc/call.h> 20#include <rxrpc/message.h> 21#include "internal.h" 22 23__RXACCT_DECL(atomic_t rxrpc_call_count); 24__RXACCT_DECL(atomic_t rxrpc_message_count); 25 26LIST_HEAD(rxrpc_calls); 27DECLARE_RWSEM(rxrpc_calls_sem); 28 29unsigned rxrpc_call_rcv_timeout = HZ/3; 30static unsigned rxrpc_call_acks_timeout = HZ/3; 31static unsigned rxrpc_call_dfr_ack_timeout = HZ/20; 32static unsigned short rxrpc_call_max_resend = HZ/10; 33 34const char *rxrpc_call_states[] = { 35 "COMPLETE", 36 "ERROR", 37 "SRVR_RCV_OPID", 38 "SRVR_RCV_ARGS", 39 "SRVR_GOT_ARGS", 40 "SRVR_SND_REPLY", 41 "SRVR_RCV_FINAL_ACK", 42 "CLNT_SND_ARGS", 43 "CLNT_RCV_REPLY", 44 "CLNT_GOT_REPLY" 45}; 46 47const char *rxrpc_call_error_states[] = { 48 "NO_ERROR", 49 "LOCAL_ABORT", 50 "PEER_ABORT", 51 "LOCAL_ERROR", 52 "REMOTE_ERROR" 53}; 54 55const char *rxrpc_pkts[] = { 56 "?00", 57 "data", "ack", "busy", "abort", "ackall", "chall", "resp", "debug", 58 "?09", "?10", "?11", "?12", "?13", "?14", "?15" 59}; 60 61static const char *rxrpc_acks[] = { 62 "---", "REQ", "DUP", "SEQ", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL", 63 "-?-" 64}; 65 66static const char _acktype[] = "NA-"; 67 68static void rxrpc_call_receive_packet(struct rxrpc_call *call); 69static void rxrpc_call_receive_data_packet(struct rxrpc_call *call, 70 struct rxrpc_message *msg); 71static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call, 72 struct rxrpc_message *msg); 73static void rxrpc_call_definitively_ACK(struct rxrpc_call *call, 74 rxrpc_seq_t higest); 75static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest); 76static int __rxrpc_call_read_data(struct rxrpc_call *call); 77 78static int rxrpc_call_record_ACK(struct rxrpc_call *call, 79 struct rxrpc_message *msg, 80 rxrpc_seq_t seq, 81 size_t count); 82 83static int rxrpc_call_flush(struct rxrpc_call *call); 84 85#define _state(call) \ 86 _debug("[[[ state %s ]]]", rxrpc_call_states[call->app_call_state]); 87 88static void rxrpc_call_default_attn_func(struct rxrpc_call *call) 89{ 90 wake_up(&call->waitq); 91} 92 93static void rxrpc_call_default_error_func(struct rxrpc_call *call) 94{ 95 wake_up(&call->waitq); 96} 97 98static void rxrpc_call_default_aemap_func(struct rxrpc_call *call) 99{ 100 switch (call->app_err_state) { 101 case RXRPC_ESTATE_LOCAL_ABORT: 102 call->app_abort_code = -call->app_errno; 103 case RXRPC_ESTATE_PEER_ABORT: 104 call->app_errno = -ECONNABORTED; 105 default: 106 break; 107 } 108} 109 110static void __rxrpc_call_acks_timeout(unsigned long _call) 111{ 112 struct rxrpc_call *call = (struct rxrpc_call *) _call; 113 114 _debug("ACKS TIMEOUT %05lu", jiffies - call->cjif); 115 116 call->flags |= RXRPC_CALL_ACKS_TIMO; 117 rxrpc_krxiod_queue_call(call); 118} 119 120static void __rxrpc_call_rcv_timeout(unsigned long _call) 121{ 122 struct rxrpc_call *call = (struct rxrpc_call *) _call; 123 124 _debug("RCV TIMEOUT %05lu", jiffies - call->cjif); 125 126 call->flags |= RXRPC_CALL_RCV_TIMO; 127 rxrpc_krxiod_queue_call(call); 128} 129 130static void __rxrpc_call_ackr_timeout(unsigned long _call) 131{ 132 struct rxrpc_call *call = (struct rxrpc_call *) _call; 133 134 _debug("ACKR TIMEOUT %05lu",jiffies - call->cjif); 135 136 call->flags |= RXRPC_CALL_ACKR_TIMO; 137 rxrpc_krxiod_queue_call(call); 138} 139 140/*****************************************************************************/ 141/* 142 * calculate a timeout based on an RTT value 143 */ 144static inline unsigned long __rxrpc_rtt_based_timeout(struct rxrpc_call *call, 145 unsigned long val) 146{ 147 unsigned long expiry = call->conn->peer->rtt / (1000000 / HZ); 148 149 expiry += 10; 150 if (expiry < HZ / 25) 151 expiry = HZ / 25; 152 if (expiry > HZ) 153 expiry = HZ; 154 155 _leave(" = %lu jiffies", expiry); 156 return jiffies + expiry; 157} /* end __rxrpc_rtt_based_timeout() */ 158 159/*****************************************************************************/ 160/* 161 * create a new call record 162 */ 163static inline int __rxrpc_create_call(struct rxrpc_connection *conn, 164 struct rxrpc_call **_call) 165{ 166 struct rxrpc_call *call; 167 168 _enter("%p", conn); 169 170 /* allocate and initialise a call record */ 171 call = (struct rxrpc_call *) get_zeroed_page(GFP_KERNEL); 172 if (!call) { 173 _leave(" ENOMEM"); 174 return -ENOMEM; 175 } 176 177 atomic_set(&call->usage, 1); 178 179 init_waitqueue_head(&call->waitq); 180 spin_lock_init(&call->lock); 181 INIT_LIST_HEAD(&call->link); 182 INIT_LIST_HEAD(&call->acks_pendq); 183 INIT_LIST_HEAD(&call->rcv_receiveq); 184 INIT_LIST_HEAD(&call->rcv_krxiodq_lk); 185 INIT_LIST_HEAD(&call->app_readyq); 186 INIT_LIST_HEAD(&call->app_unreadyq); 187 INIT_LIST_HEAD(&call->app_link); 188 INIT_LIST_HEAD(&call->app_attn_link); 189 190 init_timer(&call->acks_timeout); 191 call->acks_timeout.data = (unsigned long) call; 192 call->acks_timeout.function = __rxrpc_call_acks_timeout; 193 194 init_timer(&call->rcv_timeout); 195 call->rcv_timeout.data = (unsigned long) call; 196 call->rcv_timeout.function = __rxrpc_call_rcv_timeout; 197 198 init_timer(&call->ackr_dfr_timo); 199 call->ackr_dfr_timo.data = (unsigned long) call; 200 call->ackr_dfr_timo.function = __rxrpc_call_ackr_timeout; 201 202 call->conn = conn; 203 call->ackr_win_bot = 1; 204 call->ackr_win_top = call->ackr_win_bot + RXRPC_CALL_ACK_WINDOW_SIZE - 1; 205 call->ackr_prev_seq = 0; 206 call->app_mark = RXRPC_APP_MARK_EOF; 207 call->app_attn_func = rxrpc_call_default_attn_func; 208 call->app_error_func = rxrpc_call_default_error_func; 209 call->app_aemap_func = rxrpc_call_default_aemap_func; 210 call->app_scr_alloc = call->app_scratch; 211 212 call->cjif = jiffies; 213 214 _leave(" = 0 (%p)", call); 215 216 *_call = call; 217 218 return 0; 219} /* end __rxrpc_create_call() */ 220 221/*****************************************************************************/ 222/* 223 * create a new call record for outgoing calls 224 */ 225int rxrpc_create_call(struct rxrpc_connection *conn, 226 rxrpc_call_attn_func_t attn, 227 rxrpc_call_error_func_t error, 228 rxrpc_call_aemap_func_t aemap, 229 struct rxrpc_call **_call) 230{ 231 DECLARE_WAITQUEUE(myself, current); 232 233 struct rxrpc_call *call; 234 int ret, cix, loop; 235 236 _enter("%p", conn); 237 238 /* allocate and initialise a call record */ 239 ret = __rxrpc_create_call(conn, &call); 240 if (ret < 0) { 241 _leave(" = %d", ret); 242 return ret; 243 } 244 245 call->app_call_state = RXRPC_CSTATE_CLNT_SND_ARGS; 246 if (attn) 247 call->app_attn_func = attn; 248 if (error) 249 call->app_error_func = error; 250 if (aemap) 251 call->app_aemap_func = aemap; 252 253 _state(call); 254 255 spin_lock(&conn->lock); 256 set_current_state(TASK_INTERRUPTIBLE); 257 add_wait_queue(&conn->chanwait, &myself); 258 259 try_again: 260 /* try to find an unused channel */ 261 for (cix = 0; cix < 4; cix++) 262 if (!conn->channels[cix]) 263 goto obtained_chan; 264 265 /* no free channels - wait for one to become available */ 266 ret = -EINTR; 267 if (signal_pending(current)) 268 goto error_unwait; 269 270 spin_unlock(&conn->lock); 271 272 schedule(); 273 set_current_state(TASK_INTERRUPTIBLE); 274 275 spin_lock(&conn->lock); 276 goto try_again; 277 278 /* got a channel - now attach to the connection */ 279 obtained_chan: 280 remove_wait_queue(&conn->chanwait, &myself); 281 set_current_state(TASK_RUNNING); 282 283 /* concoct a unique call number */ 284 next_callid: 285 call->call_id = htonl(++conn->call_counter); 286 for (loop = 0; loop < 4; loop++) 287 if (conn->channels[loop] && 288 conn->channels[loop]->call_id == call->call_id) 289 goto next_callid; 290 291 rxrpc_get_connection(conn); 292 conn->channels[cix] = call; /* assign _after_ done callid check loop */ 293 do_gettimeofday(&conn->atime); 294 call->chan_ix = htonl(cix); 295 296 spin_unlock(&conn->lock); 297 298 down_write(&rxrpc_calls_sem); 299 list_add_tail(&call->call_link, &rxrpc_calls); 300 up_write(&rxrpc_calls_sem); 301 302 __RXACCT(atomic_inc(&rxrpc_call_count)); 303 *_call = call; 304 305 _leave(" = 0 (call=%p cix=%u)", call, cix); 306 return 0; 307 308 error_unwait: 309 remove_wait_queue(&conn->chanwait, &myself); 310 set_current_state(TASK_RUNNING); 311 spin_unlock(&conn->lock); 312 313 free_page((unsigned long) call); 314 _leave(" = %d", ret); 315 return ret; 316} /* end rxrpc_create_call() */ 317 318/*****************************************************************************/ 319/* 320 * create a new call record for incoming calls 321 */ 322int rxrpc_incoming_call(struct rxrpc_connection *conn, 323 struct rxrpc_message *msg, 324 struct rxrpc_call **_call) 325{ 326 struct rxrpc_call *call; 327 unsigned cix; 328 int ret; 329 330 cix = ntohl(msg->hdr.cid) & RXRPC_CHANNELMASK; 331 332 _enter("%p,%u,%u", conn, ntohl(msg->hdr.callNumber), cix); 333 334 /* allocate and initialise a call record */ 335 ret = __rxrpc_create_call(conn, &call); 336 if (ret < 0) { 337 _leave(" = %d", ret); 338 return ret; 339 } 340 341 call->pkt_rcv_count = 1; 342 call->app_call_state = RXRPC_CSTATE_SRVR_RCV_OPID; 343 call->app_mark = sizeof(uint32_t); 344 345 _state(call); 346 347 /* attach to the connection */ 348 ret = -EBUSY; 349 call->chan_ix = htonl(cix); 350 call->call_id = msg->hdr.callNumber; 351 352 spin_lock(&conn->lock); 353 354 if (!conn->channels[cix] || 355 conn->channels[cix]->app_call_state == RXRPC_CSTATE_COMPLETE || 356 conn->channels[cix]->app_call_state == RXRPC_CSTATE_ERROR 357 ) { 358 conn->channels[cix] = call; 359 rxrpc_get_connection(conn); 360 ret = 0; 361 } 362 363 spin_unlock(&conn->lock); 364 365 if (ret < 0) { 366 free_page((unsigned long) call); 367 call = NULL; 368 } 369 370 if (ret == 0) { 371 down_write(&rxrpc_calls_sem); 372 list_add_tail(&call->call_link, &rxrpc_calls); 373 up_write(&rxrpc_calls_sem); 374 __RXACCT(atomic_inc(&rxrpc_call_count)); 375 *_call = call; 376 } 377 378 _leave(" = %d [%p]", ret, call); 379 return ret; 380} /* end rxrpc_incoming_call() */ 381 382/*****************************************************************************/ 383/* 384 * free a call record 385 */ 386void rxrpc_put_call(struct rxrpc_call *call) 387{ 388 struct rxrpc_connection *conn = call->conn; 389 struct rxrpc_message *msg; 390 391 _enter("%p{u=%d}",call,atomic_read(&call->usage)); 392 393 /* sanity check */ 394 if (atomic_read(&call->usage) <= 0) 395 BUG(); 396 397 /* to prevent a race, the decrement and the de-list must be effectively 398 * atomic */ 399 spin_lock(&conn->lock); 400 if (likely(!atomic_dec_and_test(&call->usage))) { 401 spin_unlock(&conn->lock); 402 _leave(""); 403 return; 404 } 405 406 if (conn->channels[ntohl(call->chan_ix)] == call) 407 conn->channels[ntohl(call->chan_ix)] = NULL; 408 409 spin_unlock(&conn->lock); 410 411 wake_up(&conn->chanwait); 412 413 rxrpc_put_connection(conn); 414 415 /* clear the timers and dequeue from krxiod */ 416 del_timer_sync(&call->acks_timeout); 417 del_timer_sync(&call->rcv_timeout); 418 del_timer_sync(&call->ackr_dfr_timo); 419 420 rxrpc_krxiod_dequeue_call(call); 421 422 /* clean up the contents of the struct */ 423 if (call->snd_nextmsg) 424 rxrpc_put_message(call->snd_nextmsg); 425 426 if (call->snd_ping) 427 rxrpc_put_message(call->snd_ping); 428 429 while (!list_empty(&call->acks_pendq)) { 430 msg = list_entry(call->acks_pendq.next, 431 struct rxrpc_message, link); 432 list_del(&msg->link); 433 rxrpc_put_message(msg); 434 } 435 436 while (!list_empty(&call->rcv_receiveq)) { 437 msg = list_entry(call->rcv_receiveq.next, 438 struct rxrpc_message, link); 439 list_del(&msg->link); 440 rxrpc_put_message(msg); 441 } 442 443 while (!list_empty(&call->app_readyq)) { 444 msg = list_entry(call->app_readyq.next, 445 struct rxrpc_message, link); 446 list_del(&msg->link); 447 rxrpc_put_message(msg); 448 } 449 450 while (!list_empty(&call->app_unreadyq)) { 451 msg = list_entry(call->app_unreadyq.next, 452 struct rxrpc_message, link); 453 list_del(&msg->link); 454 rxrpc_put_message(msg); 455 } 456 457 module_put(call->owner); 458 459 down_write(&rxrpc_calls_sem); 460 list_del(&call->call_link); 461 up_write(&rxrpc_calls_sem); 462 463 __RXACCT(atomic_dec(&rxrpc_call_count)); 464 free_page((unsigned long) call); 465 466 _leave(" [destroyed]"); 467} /* end rxrpc_put_call() */ 468 469/*****************************************************************************/ 470/* 471 * actually generate a normal ACK 472 */ 473static inline int __rxrpc_call_gen_normal_ACK(struct rxrpc_call *call, 474 rxrpc_seq_t seq) 475{ 476 struct rxrpc_message *msg; 477 struct kvec diov[3]; 478 __be32 aux[4]; 479 int delta, ret; 480 481 /* ACKs default to DELAY */ 482 if (!call->ackr.reason) 483 call->ackr.reason = RXRPC_ACK_DELAY; 484 485 _proto("Rx %05lu Sending ACK { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }", 486 jiffies - call->cjif, 487 ntohs(call->ackr.maxSkew), 488 ntohl(call->ackr.firstPacket), 489 ntohl(call->ackr.previousPacket), 490 ntohl(call->ackr.serial), 491 rxrpc_acks[call->ackr.reason], 492 call->ackr.nAcks); 493 494 aux[0] = htonl(call->conn->peer->if_mtu); /* interface MTU */ 495 aux[1] = htonl(1444); /* max MTU */ 496 aux[2] = htonl(16); /* rwind */ 497 aux[3] = htonl(4); /* max packets */ 498 499 diov[0].iov_len = sizeof(struct rxrpc_ackpacket); 500 diov[0].iov_base = &call->ackr; 501 diov[1].iov_len = call->ackr_pend_cnt + 3; 502 diov[1].iov_base = call->ackr_array; 503 diov[2].iov_len = sizeof(aux); 504 diov[2].iov_base = &aux; 505 506 /* build and send the message */ 507 ret = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK, 508 3, diov, GFP_KERNEL, &msg); 509 if (ret < 0) 510 goto out; 511 512 msg->seq = seq; 513 msg->hdr.seq = htonl(seq); 514 msg->hdr.flags |= RXRPC_SLOW_START_OK; 515 516 ret = rxrpc_conn_sendmsg(call->conn, msg); 517 rxrpc_put_message(msg); 518 if (ret < 0) 519 goto out; 520 call->pkt_snd_count++; 521 522 /* count how many actual ACKs there were at the front */ 523 for (delta = 0; delta < call->ackr_pend_cnt; delta++) 524 if (call->ackr_array[delta] != RXRPC_ACK_TYPE_ACK) 525 break; 526 527 call->ackr_pend_cnt -= delta; /* all ACK'd to this point */ 528 529 /* crank the ACK window around */ 530 if (delta == 0) { 531 /* un-ACK'd window */ 532 } 533 else if (delta < RXRPC_CALL_ACK_WINDOW_SIZE) { 534 /* partially ACK'd window 535 * - shuffle down to avoid losing out-of-sequence packets 536 */ 537 call->ackr_win_bot += delta; 538 call->ackr_win_top += delta; 539 540 memmove(&call->ackr_array[0], 541 &call->ackr_array[delta], 542 call->ackr_pend_cnt); 543 544 memset(&call->ackr_array[call->ackr_pend_cnt], 545 RXRPC_ACK_TYPE_NACK, 546 sizeof(call->ackr_array) - call->ackr_pend_cnt); 547 } 548 else { 549 /* fully ACK'd window 550 * - just clear the whole thing 551 */ 552 memset(&call->ackr_array, 553 RXRPC_ACK_TYPE_NACK, 554 sizeof(call->ackr_array)); 555 } 556 557 /* clear this ACK */ 558 memset(&call->ackr, 0, sizeof(call->ackr)); 559 560 out: 561 if (!call->app_call_state) 562 printk("___ STATE 0 ___\n"); 563 return ret; 564} /* end __rxrpc_call_gen_normal_ACK() */ 565 566/*****************************************************************************/ 567/* 568 * note the reception of a packet in the call's ACK records and generate an 569 * appropriate ACK packet if necessary 570 * - returns 0 if packet should be processed, 1 if packet should be ignored 571 * and -ve on an error 572 */ 573static int rxrpc_call_generate_ACK(struct rxrpc_call *call, 574 struct rxrpc_header *hdr, 575 struct rxrpc_ackpacket *ack) 576{ 577 struct rxrpc_message *msg; 578 rxrpc_seq_t seq; 579 unsigned offset; 580 int ret = 0, err; 581 u8 special_ACK, do_ACK, force; 582 583 _enter("%p,%p { seq=%d tp=%d fl=%02x }", 584 call, hdr, ntohl(hdr->seq), hdr->type, hdr->flags); 585 586 seq = ntohl(hdr->seq); 587 offset = seq - call->ackr_win_bot; 588 do_ACK = RXRPC_ACK_DELAY; 589 special_ACK = 0; 590 force = (seq == 1); 591 592 if (call->ackr_high_seq < seq) 593 call->ackr_high_seq = seq; 594 595 /* deal with generation of obvious special ACKs first */ 596 if (ack && ack->reason == RXRPC_ACK_PING) { 597 special_ACK = RXRPC_ACK_PING_RESPONSE; 598 ret = 1; 599 goto gen_ACK; 600 } 601 602 if (seq < call->ackr_win_bot) { 603 special_ACK = RXRPC_ACK_DUPLICATE; 604 ret = 1; 605 goto gen_ACK; 606 } 607 608 if (seq >= call->ackr_win_top) { 609 special_ACK = RXRPC_ACK_EXCEEDS_WINDOW; 610 ret = 1; 611 goto gen_ACK; 612 } 613 614 if (call->ackr_array[offset] != RXRPC_ACK_TYPE_NACK) { 615 special_ACK = RXRPC_ACK_DUPLICATE; 616 ret = 1; 617 goto gen_ACK; 618 } 619 620 /* okay... it's a normal data packet inside the ACK window */ 621 call->ackr_array[offset] = RXRPC_ACK_TYPE_ACK; 622 623 if (offset < call->ackr_pend_cnt) { 624 } 625 else if (offset > call->ackr_pend_cnt) { 626 do_ACK = RXRPC_ACK_OUT_OF_SEQUENCE; 627 call->ackr_pend_cnt = offset; 628 goto gen_ACK; 629 } 630 631 if (hdr->flags & RXRPC_REQUEST_ACK) { 632 do_ACK = RXRPC_ACK_REQUESTED; 633 } 634 635 /* generate an ACK on the final packet of a reply just received */ 636 if (hdr->flags & RXRPC_LAST_PACKET) { 637 if (call->conn->out_clientflag) 638 force = 1; 639 } 640 else if (!(hdr->flags & RXRPC_MORE_PACKETS)) { 641 do_ACK = RXRPC_ACK_REQUESTED; 642 } 643 644 /* re-ACK packets previously received out-of-order */ 645 for (offset++; offset < RXRPC_CALL_ACK_WINDOW_SIZE; offset++) 646 if (call->ackr_array[offset] != RXRPC_ACK_TYPE_ACK) 647 break; 648 649 call->ackr_pend_cnt = offset; 650 651 /* generate an ACK if we fill up the window */ 652 if (call->ackr_pend_cnt >= RXRPC_CALL_ACK_WINDOW_SIZE) 653 force = 1; 654 655 gen_ACK: 656 _debug("%05lu ACKs pend=%u norm=%s special=%s%s", 657 jiffies - call->cjif, 658 call->ackr_pend_cnt, 659 rxrpc_acks[do_ACK], 660 rxrpc_acks[special_ACK], 661 force ? " immediate" : 662 do_ACK == RXRPC_ACK_REQUESTED ? " merge-req" : 663 hdr->flags & RXRPC_LAST_PACKET ? " finalise" : 664 " defer" 665 ); 666 667 /* send any pending normal ACKs if need be */ 668 if (call->ackr_pend_cnt > 0) { 669 /* fill out the appropriate form */ 670 call->ackr.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE); 671 call->ackr.maxSkew = htons(min(call->ackr_high_seq - seq, 672 65535U)); 673 call->ackr.firstPacket = htonl(call->ackr_win_bot); 674 call->ackr.previousPacket = call->ackr_prev_seq; 675 call->ackr.serial = hdr->serial; 676 call->ackr.nAcks = call->ackr_pend_cnt; 677 678 if (do_ACK == RXRPC_ACK_REQUESTED) 679 call->ackr.reason = do_ACK; 680 681 /* generate the ACK immediately if necessary */ 682 if (special_ACK || force) { 683 err = __rxrpc_call_gen_normal_ACK( 684 call, do_ACK == RXRPC_ACK_DELAY ? 0 : seq); 685 if (err < 0) { 686 ret = err; 687 goto out; 688 } 689 } 690 } 691 692 if (call->ackr.reason == RXRPC_ACK_REQUESTED) 693 call->ackr_dfr_seq = seq; 694 695 /* start the ACK timer if not running if there are any pending deferred 696 * ACKs */ 697 if (call->ackr_pend_cnt > 0 && 698 call->ackr.reason != RXRPC_ACK_REQUESTED && 699 !timer_pending(&call->ackr_dfr_timo) 700 ) { 701 unsigned long timo; 702 703 timo = rxrpc_call_dfr_ack_timeout + jiffies; 704 705 _debug("START ACKR TIMER for cj=%lu", timo - call->cjif); 706 707 spin_lock(&call->lock); 708 mod_timer(&call->ackr_dfr_timo, timo); 709 spin_unlock(&call->lock); 710 } 711 else if ((call->ackr_pend_cnt == 0 || 712 call->ackr.reason == RXRPC_ACK_REQUESTED) && 713 timer_pending(&call->ackr_dfr_timo) 714 ) { 715 /* stop timer if no pending ACKs */ 716 _debug("CLEAR ACKR TIMER"); 717 del_timer_sync(&call->ackr_dfr_timo); 718 } 719 720 /* send a special ACK if one is required */ 721 if (special_ACK) { 722 struct rxrpc_ackpacket ack; 723 struct kvec diov[2]; 724 uint8_t acks[1] = { RXRPC_ACK_TYPE_ACK }; 725 726 /* fill out the appropriate form */ 727 ack.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE); 728 ack.maxSkew = htons(min(call->ackr_high_seq - seq, 729 65535U)); 730 ack.firstPacket = htonl(call->ackr_win_bot); 731 ack.previousPacket = call->ackr_prev_seq; 732 ack.serial = hdr->serial; 733 ack.reason = special_ACK; 734 ack.nAcks = 0; 735 736 _proto("Rx Sending s-ACK" 737 " { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }", 738 ntohs(ack.maxSkew), 739 ntohl(ack.firstPacket), 740 ntohl(ack.previousPacket), 741 ntohl(ack.serial), 742 rxrpc_acks[ack.reason], 743 ack.nAcks); 744 745 diov[0].iov_len = sizeof(struct rxrpc_ackpacket); 746 diov[0].iov_base = &ack; 747 diov[1].iov_len = sizeof(acks); 748 diov[1].iov_base = acks; 749 750 /* build and send the message */ 751 err = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK, 752 hdr->seq ? 2 : 1, diov, 753 GFP_KERNEL, 754 &msg); 755 if (err < 0) { 756 ret = err; 757 goto out; 758 } 759 760 msg->seq = seq; 761 msg->hdr.seq = htonl(seq); 762 msg->hdr.flags |= RXRPC_SLOW_START_OK; 763 764 err = rxrpc_conn_sendmsg(call->conn, msg); 765 rxrpc_put_message(msg); 766 if (err < 0) { 767 ret = err; 768 goto out; 769 } 770 call->pkt_snd_count++; 771 } 772 773 out: 774 if (hdr->seq) 775 call->ackr_prev_seq = hdr->seq; 776 777 _leave(" = %d", ret); 778 return ret; 779} /* end rxrpc_call_generate_ACK() */ 780 781/*****************************************************************************/ 782/* 783 * handle work to be done on a call 784 * - includes packet reception and timeout processing 785 */ 786void rxrpc_call_do_stuff(struct rxrpc_call *call) 787{ 788 _enter("%p{flags=%lx}", call, call->flags); 789 790 /* handle packet reception */ 791 if (call->flags & RXRPC_CALL_RCV_PKT) { 792 _debug("- receive packet"); 793 call->flags &= ~RXRPC_CALL_RCV_PKT; 794 rxrpc_call_receive_packet(call); 795 } 796 797 /* handle overdue ACKs */ 798 if (call->flags & RXRPC_CALL_ACKS_TIMO) { 799 _debug("- overdue ACK timeout"); 800 call->flags &= ~RXRPC_CALL_ACKS_TIMO; 801 rxrpc_call_resend(call, call->snd_seq_count); 802 } 803 804 /* handle lack of reception */ 805 if (call->flags & RXRPC_CALL_RCV_TIMO) { 806 _debug("- reception timeout"); 807 call->flags &= ~RXRPC_CALL_RCV_TIMO; 808 rxrpc_call_abort(call, -EIO); 809 } 810 811 /* handle deferred ACKs */ 812 if (call->flags & RXRPC_CALL_ACKR_TIMO || 813 (call->ackr.nAcks > 0 && call->ackr.reason == RXRPC_ACK_REQUESTED) 814 ) { 815 _debug("- deferred ACK timeout: cj=%05lu r=%s n=%u", 816 jiffies - call->cjif, 817 rxrpc_acks[call->ackr.reason], 818 call->ackr.nAcks); 819 820 call->flags &= ~RXRPC_CALL_ACKR_TIMO; 821 822 if (call->ackr.nAcks > 0 && 823 call->app_call_state != RXRPC_CSTATE_ERROR) { 824 /* generate ACK */ 825 __rxrpc_call_gen_normal_ACK(call, call->ackr_dfr_seq); 826 call->ackr_dfr_seq = 0; 827 } 828 } 829 830 _leave(""); 831 832} /* end rxrpc_call_do_stuff() */ 833 834/*****************************************************************************/ 835/* 836 * send an abort message at call or connection level 837 * - must be called with call->lock held 838 * - the supplied error code is sent as the packet data 839 */ 840static int __rxrpc_call_abort(struct rxrpc_call *call, int errno) 841{ 842 struct rxrpc_connection *conn = call->conn; 843 struct rxrpc_message *msg; 844 struct kvec diov[1]; 845 int ret; 846 __be32 _error; 847 848 _enter("%p{%08x},%p{%d},%d", 849 conn, ntohl(conn->conn_id), call, ntohl(call->call_id), errno); 850 851 /* if this call is already aborted, then just wake up any waiters */ 852 if (call->app_call_state == RXRPC_CSTATE_ERROR) { 853 spin_unlock(&call->lock); 854 call->app_error_func(call); 855 _leave(" = 0"); 856 return 0; 857 } 858 859 rxrpc_get_call(call); 860 861 /* change the state _with_ the lock still held */ 862 call->app_call_state = RXRPC_CSTATE_ERROR; 863 call->app_err_state = RXRPC_ESTATE_LOCAL_ABORT; 864 call->app_errno = errno; 865 call->app_mark = RXRPC_APP_MARK_EOF; 866 call->app_read_buf = NULL; 867 call->app_async_read = 0; 868 869 _state(call); 870 871 /* ask the app to translate the error code */ 872 call->app_aemap_func(call); 873 874 spin_unlock(&call->lock); 875 876 /* flush any outstanding ACKs */ 877 del_timer_sync(&call->acks_timeout); 878 del_timer_sync(&call->rcv_timeout); 879 del_timer_sync(&call->ackr_dfr_timo); 880 881 if (rxrpc_call_is_ack_pending(call)) 882 __rxrpc_call_gen_normal_ACK(call, 0); 883 884 /* send the abort packet only if we actually traded some other 885 * packets */ 886 ret = 0; 887 if (call->pkt_snd_count || call->pkt_rcv_count) { 888 /* actually send the abort */ 889 _proto("Rx Sending Call ABORT { data=%d }", 890 call->app_abort_code); 891 892 _error = htonl(call->app_abort_code); 893 894 diov[0].iov_len = sizeof(_error); 895 diov[0].iov_base = &_error; 896 897 ret = rxrpc_conn_newmsg(conn, call, RXRPC_PACKET_TYPE_ABORT, 898 1, diov, GFP_KERNEL, &msg); 899 if (ret == 0) { 900 ret = rxrpc_conn_sendmsg(conn, msg); 901 rxrpc_put_message(msg); 902 } 903 } 904 905 /* tell the app layer to let go */ 906 call->app_error_func(call); 907 908 rxrpc_put_call(call); 909 910 _leave(" = %d", ret); 911 return ret; 912} /* end __rxrpc_call_abort() */ 913 914/*****************************************************************************/ 915/* 916 * send an abort message at call or connection level 917 * - the supplied error code is sent as the packet data 918 */ 919int rxrpc_call_abort(struct rxrpc_call *call, int error) 920{ 921 spin_lock(&call->lock); 922 923 return __rxrpc_call_abort(call, error); 924 925} /* end rxrpc_call_abort() */ 926 927/*****************************************************************************/ 928/* 929 * process packets waiting for this call 930 */ 931static void rxrpc_call_receive_packet(struct rxrpc_call *call) 932{ 933 struct rxrpc_message *msg; 934 struct list_head *_p; 935 936 _enter("%p", call); 937 938 rxrpc_get_call(call); /* must not go away too soon if aborted by 939 * app-layer */ 940 941 while (!list_empty(&call->rcv_receiveq)) { 942 /* try to get next packet */ 943 _p = NULL; 944 spin_lock(&call->lock); 945 if (!list_empty(&call->rcv_receiveq)) { 946 _p = call->rcv_receiveq.next; 947 list_del_init(_p); 948 } 949 spin_unlock(&call->lock); 950 951 if (!_p) 952 break; 953 954 msg = list_entry(_p, struct rxrpc_message, link); 955 956 _proto("Rx %05lu Received %s packet (%%%u,#%u,%c%c%c%c%c)", 957 jiffies - call->cjif, 958 rxrpc_pkts[msg->hdr.type], 959 ntohl(msg->hdr.serial), 960 msg->seq, 961 msg->hdr.flags & RXRPC_JUMBO_PACKET ? 'j' : '-', 962 msg->hdr.flags & RXRPC_MORE_PACKETS ? 'm' : '-', 963 msg->hdr.flags & RXRPC_LAST_PACKET ? 'l' : '-', 964 msg->hdr.flags & RXRPC_REQUEST_ACK ? 'r' : '-', 965 msg->hdr.flags & RXRPC_CLIENT_INITIATED ? 'C' : 'S' 966 ); 967 968 switch (msg->hdr.type) { 969 /* deal with data packets */ 970 case RXRPC_PACKET_TYPE_DATA: 971 /* ACK the packet if necessary */ 972 switch (rxrpc_call_generate_ACK(call, &msg->hdr, 973 NULL)) { 974 case 0: /* useful packet */ 975 rxrpc_call_receive_data_packet(call, msg); 976 break; 977 case 1: /* duplicate or out-of-window packet */ 978 break; 979 default: 980 rxrpc_put_message(msg); 981 goto out; 982 } 983 break; 984 985 /* deal with ACK packets */ 986 case RXRPC_PACKET_TYPE_ACK: 987 rxrpc_call_receive_ack_packet(call, msg); 988 break; 989 990 /* deal with abort packets */ 991 case RXRPC_PACKET_TYPE_ABORT: { 992 __be32 _dbuf, *dp; 993 994 dp = skb_header_pointer(msg->pkt, msg->offset, 995 sizeof(_dbuf), &_dbuf); 996 if (dp == NULL) 997 printk("Rx Received short ABORT packet\n"); 998 999 _proto("Rx Received Call ABORT { data=%d }", 1000 (dp ? ntohl(*dp) : 0)); 1001 1002 spin_lock(&call->lock); 1003 call->app_call_state = RXRPC_CSTATE_ERROR; 1004 call->app_err_state = RXRPC_ESTATE_PEER_ABORT; 1005 call->app_abort_code = (dp ? ntohl(*dp) : 0); 1006 call->app_errno = -ECONNABORTED; 1007 call->app_mark = RXRPC_APP_MARK_EOF; 1008 call->app_read_buf = NULL; 1009 call->app_async_read = 0; 1010 1011 /* ask the app to translate the error code */ 1012 call->app_aemap_func(call); 1013 _state(call); 1014 spin_unlock(&call->lock); 1015 call->app_error_func(call); 1016 break; 1017 } 1018 default: 1019 /* deal with other packet types */ 1020 _proto("Rx Unsupported packet type %u (#%u)", 1021 msg->hdr.type, msg->seq); 1022 break; 1023 } 1024 1025 rxrpc_put_message(msg); 1026 } 1027 1028 out: 1029 rxrpc_put_call(call); 1030 _leave(""); 1031} /* end rxrpc_call_receive_packet() */ 1032 1033/*****************************************************************************/ 1034/* 1035 * process next data packet 1036 * - as the next data packet arrives: 1037 * - it is queued on app_readyq _if_ it is the next one expected 1038 * (app_ready_seq+1) 1039 * - it is queued on app_unreadyq _if_ it is not the next one expected 1040 * - if a packet placed on app_readyq completely fills a hole leading up to 1041 * the first packet on app_unreadyq, then packets now in sequence are 1042 * tranferred to app_readyq 1043 * - the application layer can only see packets on app_readyq 1044 * (app_ready_qty bytes) 1045 * - the application layer is prodded every time a new packet arrives 1046 */ 1047static void rxrpc_call_receive_data_packet(struct rxrpc_call *call, 1048 struct rxrpc_message *msg) 1049{ 1050 const struct rxrpc_operation *optbl, *op; 1051 struct rxrpc_message *pmsg; 1052 struct list_head *_p; 1053 int ret, lo, hi, rmtimo; 1054 __be32 opid; 1055 1056 _enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq); 1057 1058 rxrpc_get_message(msg); 1059 1060 /* add to the unready queue if we'd have to create a hole in the ready 1061 * queue otherwise */ 1062 if (msg->seq != call->app_ready_seq + 1) { 1063 _debug("Call add packet %d to unreadyq", msg->seq); 1064 1065 /* insert in seq order */ 1066 list_for_each(_p, &call->app_unreadyq) { 1067 pmsg = list_entry(_p, struct rxrpc_message, link); 1068 if (pmsg->seq > msg->seq) 1069 break; 1070 } 1071 1072 list_add_tail(&msg->link, _p); 1073 1074 _leave(" [unreadyq]"); 1075 return; 1076 } 1077 1078 /* next in sequence - simply append into the call's ready queue */ 1079 _debug("Call add packet %d to readyq (+%Zd => %Zd bytes)", 1080 msg->seq, msg->dsize, call->app_ready_qty); 1081 1082 spin_lock(&call->lock); 1083 call->app_ready_seq = msg->seq; 1084 call->app_ready_qty += msg->dsize; 1085 list_add_tail(&msg->link, &call->app_readyq); 1086 1087 /* move unready packets to the readyq if we got rid of a hole */ 1088 while (!list_empty(&call->app_unreadyq)) { 1089 pmsg = list_entry(call->app_unreadyq.next, 1090 struct rxrpc_message, link); 1091 1092 if (pmsg->seq != call->app_ready_seq + 1) 1093 break; 1094 1095 /* next in sequence - just move list-to-list */ 1096 _debug("Call transfer packet %d to readyq (+%Zd => %Zd bytes)", 1097 pmsg->seq, pmsg->dsize, call->app_ready_qty); 1098 1099 call->app_ready_seq = pmsg->seq; 1100 call->app_ready_qty += pmsg->dsize; 1101 list_del_init(&pmsg->link); 1102 list_add_tail(&pmsg->link, &call->app_readyq); 1103 } 1104 1105 /* see if we've got the last packet yet */ 1106 if (!list_empty(&call->app_readyq)) { 1107 pmsg = list_entry(call->app_readyq.prev, 1108 struct rxrpc_message, link); 1109 if (pmsg->hdr.flags & RXRPC_LAST_PACKET) { 1110 call->app_last_rcv = 1; 1111 _debug("Last packet on readyq"); 1112 } 1113 } 1114 1115 switch (call->app_call_state) { 1116 /* do nothing if call already aborted */ 1117 case RXRPC_CSTATE_ERROR: 1118 spin_unlock(&call->lock); 1119 _leave(" [error]"); 1120 return; 1121 1122 /* extract the operation ID from an incoming call if that's not 1123 * yet been done */ 1124 case RXRPC_CSTATE_SRVR_RCV_OPID: 1125 spin_unlock(&call->lock); 1126 1127 /* handle as yet insufficient data for the operation ID */ 1128 if (call->app_ready_qty < 4) { 1129 if (call->app_last_rcv) 1130 /* trouble - last packet seen */ 1131 rxrpc_call_abort(call, -EINVAL); 1132 1133 _leave(""); 1134 return; 1135 } 1136 1137 /* pull the operation ID out of the buffer */ 1138 ret = rxrpc_call_read_data(call, &opid, sizeof(opid), 0); 1139 if (ret < 0) { 1140 printk("Unexpected error from read-data: %d\n", ret); 1141 if (call->app_call_state != RXRPC_CSTATE_ERROR) 1142 rxrpc_call_abort(call, ret); 1143 _leave(""); 1144 return; 1145 } 1146 call->app_opcode = ntohl(opid); 1147 1148 /* locate the operation in the available ops table */ 1149 optbl = call->conn->service->ops_begin; 1150 lo = 0; 1151 hi = call->conn->service->ops_end - optbl; 1152 1153 while (lo < hi) { 1154 int mid = (hi + lo) / 2; 1155 op = &optbl[mid]; 1156 if (call->app_opcode == op->id) 1157 goto found_op; 1158 if (call->app_opcode > op->id) 1159 lo = mid + 1; 1160 else 1161 hi = mid; 1162 } 1163 1164 /* search failed */ 1165 kproto("Rx Client requested operation %d from %s service", 1166 call->app_opcode, call->conn->service->name); 1167 rxrpc_call_abort(call, -EINVAL); 1168 _leave(" [inval]"); 1169 return; 1170 1171 found_op: 1172 _proto("Rx Client requested operation %s from %s service", 1173 op->name, call->conn->service->name); 1174 1175 /* we're now waiting for the argument block (unless the call 1176 * was aborted) */ 1177 spin_lock(&call->lock); 1178 if (call->app_call_state == RXRPC_CSTATE_SRVR_RCV_OPID || 1179 call->app_call_state == RXRPC_CSTATE_SRVR_SND_REPLY) { 1180 if (!call->app_last_rcv) 1181 call->app_call_state = 1182 RXRPC_CSTATE_SRVR_RCV_ARGS; 1183 else if (call->app_ready_qty > 0) 1184 call->app_call_state = 1185 RXRPC_CSTATE_SRVR_GOT_ARGS; 1186 else 1187 call->app_call_state = 1188 RXRPC_CSTATE_SRVR_SND_REPLY; 1189 call->app_mark = op->asize; 1190 call->app_user = op->user; 1191 } 1192 spin_unlock(&call->lock); 1193 1194 _state(call); 1195 break; 1196 1197 case RXRPC_CSTATE_SRVR_RCV_ARGS: 1198 /* change state if just received last packet of arg block */ 1199 if (call->app_last_rcv) 1200 call->app_call_state = RXRPC_CSTATE_SRVR_GOT_ARGS; 1201 spin_unlock(&call->lock); 1202 1203 _state(call); 1204 break; 1205 1206 case RXRPC_CSTATE_CLNT_RCV_REPLY: 1207 /* change state if just received last packet of reply block */ 1208 rmtimo = 0; 1209 if (call->app_last_rcv) { 1210 call->app_call_state = RXRPC_CSTATE_CLNT_GOT_REPLY; 1211 rmtimo = 1; 1212 } 1213 spin_unlock(&call->lock); 1214 1215 if (rmtimo) { 1216 del_timer_sync(&call->acks_timeout); 1217 del_timer_sync(&call->rcv_timeout); 1218 del_timer_sync(&call->ackr_dfr_timo); 1219 } 1220 1221 _state(call); 1222 break; 1223 1224 default: 1225 /* deal with data reception in an unexpected state */ 1226 printk("Unexpected state [[[ %u ]]]\n", call->app_call_state); 1227 __rxrpc_call_abort(call, -EBADMSG); 1228 _leave(""); 1229 return; 1230 } 1231 1232 if (call->app_call_state == RXRPC_CSTATE_CLNT_RCV_REPLY && 1233 call->app_last_rcv) 1234 BUG(); 1235 1236 /* otherwise just invoke the data function whenever we can satisfy its desire for more 1237 * data 1238 */ 1239 _proto("Rx Received Op Data: st=%u qty=%Zu mk=%Zu%s", 1240 call->app_call_state, call->app_ready_qty, call->app_mark, 1241 call->app_last_rcv ? " last-rcvd" : ""); 1242 1243 spin_lock(&call->lock); 1244 1245 ret = __rxrpc_call_read_data(call); 1246 switch (ret) { 1247 case 0: 1248 spin_unlock(&call->lock); 1249 call->app_attn_func(call); 1250 break; 1251 case -EAGAIN: 1252 spin_unlock(&call->lock); 1253 break; 1254 case -ECONNABORTED: 1255 spin_unlock(&call->lock); 1256 break; 1257 default: 1258 __rxrpc_call_abort(call, ret); 1259 break; 1260 } 1261 1262 _state(call); 1263 1264 _leave(""); 1265 1266} /* end rxrpc_call_receive_data_packet() */ 1267 1268/*****************************************************************************/ 1269/* 1270 * received an ACK packet 1271 */ 1272static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call, 1273 struct rxrpc_message *msg) 1274{ 1275 struct rxrpc_ackpacket _ack, *ap; 1276 rxrpc_serial_net_t serial; 1277 rxrpc_seq_t seq; 1278 int ret; 1279 1280 _enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq); 1281 1282 /* extract the basic ACK record */ 1283 ap = skb_header_pointer(msg->pkt, msg->offset, sizeof(_ack), &_ack); 1284 if (ap == NULL) { 1285 printk("Rx Received short ACK packet\n"); 1286 return; 1287 } 1288 msg->offset += sizeof(_ack); 1289 1290 serial = ap->serial; 1291 seq = ntohl(ap->firstPacket); 1292 1293 _proto("Rx Received ACK %%%d { b=%hu m=%hu f=%u p=%u s=%u r=%s n=%u }", 1294 ntohl(msg->hdr.serial), 1295 ntohs(ap->bufferSpace), 1296 ntohs(ap->maxSkew), 1297 seq, 1298 ntohl(ap->previousPacket), 1299 ntohl(serial), 1300 rxrpc_acks[ap->reason], 1301 call->ackr.nAcks 1302 ); 1303 1304 /* check the other side isn't ACK'ing a sequence number I haven't sent 1305 * yet */ 1306 if (ap->nAcks > 0 && 1307 (seq > call->snd_seq_count || 1308 seq + ap->nAcks - 1 > call->snd_seq_count)) { 1309 printk("Received ACK (#%u-#%u) for unsent packet\n", 1310 seq, seq + ap->nAcks - 1); 1311 rxrpc_call_abort(call, -EINVAL); 1312 _leave(""); 1313 return; 1314 } 1315 1316 /* deal with RTT calculation */ 1317 if (serial) { 1318 struct rxrpc_message *rttmsg; 1319 1320 /* find the prompting packet */ 1321 spin_lock(&call->lock); 1322 if (call->snd_ping && call->snd_ping->hdr.serial == serial) { 1323 /* it was a ping packet */ 1324 rttmsg = call->snd_ping; 1325 call->snd_ping = NULL; 1326 spin_unlock(&call->lock); 1327 1328 if (rttmsg) { 1329 rttmsg->rttdone = 1; 1330 rxrpc_peer_calculate_rtt(call->conn->peer, 1331 rttmsg, msg); 1332 rxrpc_put_message(rttmsg); 1333 } 1334 } 1335 else { 1336 struct list_head *_p; 1337 1338 /* it ought to be a data packet - look in the pending 1339 * ACK list */ 1340 list_for_each(_p, &call->acks_pendq) { 1341 rttmsg = list_entry(_p, struct rxrpc_message, 1342 link); 1343 if (rttmsg->hdr.serial == serial) { 1344 if (rttmsg->rttdone) 1345 /* never do RTT twice without 1346 * resending */ 1347 break; 1348 1349 rttmsg->rttdone = 1; 1350 rxrpc_peer_calculate_rtt( 1351 call->conn->peer, rttmsg, msg); 1352 break; 1353 } 1354 } 1355 spin_unlock(&call->lock); 1356 } 1357 } 1358 1359 switch (ap->reason) { 1360 /* deal with negative/positive acknowledgement of data 1361 * packets */ 1362 case RXRPC_ACK_REQUESTED: 1363 case RXRPC_ACK_DELAY: 1364 case RXRPC_ACK_IDLE: 1365 rxrpc_call_definitively_ACK(call, seq - 1); 1366 1367 case RXRPC_ACK_DUPLICATE: 1368 case RXRPC_ACK_OUT_OF_SEQUENCE: 1369 case RXRPC_ACK_EXCEEDS_WINDOW: 1370 call->snd_resend_cnt = 0; 1371 ret = rxrpc_call_record_ACK(call, msg, seq, ap->nAcks); 1372 if (ret < 0) 1373 rxrpc_call_abort(call, ret); 1374 break; 1375 1376 /* respond to ping packets immediately */ 1377 case RXRPC_ACK_PING: 1378 rxrpc_call_generate_ACK(call, &msg->hdr, ap); 1379 break; 1380 1381 /* only record RTT on ping response packets */ 1382 case RXRPC_ACK_PING_RESPONSE: 1383 if (call->snd_ping) { 1384 struct rxrpc_message *rttmsg; 1385 1386 /* only do RTT stuff if the response matches the 1387 * retained ping */ 1388 rttmsg = NULL; 1389 spin_lock(&call->lock); 1390 if (call->snd_ping && 1391 call->snd_ping->hdr.serial == ap->serial) { 1392 rttmsg = call->snd_ping; 1393 call->snd_ping = NULL; 1394 } 1395 spin_unlock(&call->lock); 1396 1397 if (rttmsg) { 1398 rttmsg->rttdone = 1; 1399 rxrpc_peer_calculate_rtt(call->conn->peer, 1400 rttmsg, msg); 1401 rxrpc_put_message(rttmsg); 1402 } 1403 } 1404 break; 1405 1406 default: 1407 printk("Unsupported ACK reason %u\n", ap->reason); 1408 break; 1409 } 1410 1411 _leave(""); 1412} /* end rxrpc_call_receive_ack_packet() */ 1413 1414/*****************************************************************************/ 1415/* 1416 * record definitive ACKs for all messages up to and including the one with the 1417 * 'highest' seq 1418 */ 1419static void rxrpc_call_definitively_ACK(struct rxrpc_call *call, 1420 rxrpc_seq_t highest) 1421{ 1422 struct rxrpc_message *msg; 1423 int now_complete; 1424 1425 _enter("%p{ads=%u},%u", call, call->acks_dftv_seq, highest); 1426 1427 while (call->acks_dftv_seq < highest) { 1428 call->acks_dftv_seq++; 1429 1430 _proto("Definitive ACK on packet #%u", call->acks_dftv_seq); 1431 1432 /* discard those at front of queue until message with highest 1433 * ACK is found */ 1434 spin_lock(&call->lock); 1435 msg = NULL; 1436 if (!list_empty(&call->acks_pendq)) { 1437 msg = list_entry(call->acks_pendq.next, 1438 struct rxrpc_message, link); 1439 list_del_init(&msg->link); /* dequeue */ 1440 if (msg->state == RXRPC_MSG_SENT) 1441 call->acks_pend_cnt--; 1442 } 1443 spin_unlock(&call->lock); 1444 1445 /* insanity check */ 1446 if (!msg) 1447 panic("%s(): acks_pendq unexpectedly empty\n", 1448 __FUNCTION__); 1449 1450 if (msg->seq != call->acks_dftv_seq) 1451 panic("%s(): Packet #%u expected at front of acks_pendq" 1452 " (#%u found)\n", 1453 __FUNCTION__, call->acks_dftv_seq, msg->seq); 1454 1455 /* discard the message */ 1456 msg->state = RXRPC_MSG_DONE; 1457 rxrpc_put_message(msg); 1458 } 1459 1460 /* if all sent packets are definitively ACK'd then prod any sleepers just in case */ 1461 now_complete = 0; 1462 spin_lock(&call->lock); 1463 if (call->acks_dftv_seq == call->snd_seq_count) { 1464 if (call->app_call_state != RXRPC_CSTATE_COMPLETE) { 1465 call->app_call_state = RXRPC_CSTATE_COMPLETE; 1466 _state(call); 1467 now_complete = 1; 1468 } 1469 } 1470 spin_unlock(&call->lock); 1471 1472 if (now_complete) { 1473 del_timer_sync(&call->acks_timeout); 1474 del_timer_sync(&call->rcv_timeout); 1475 del_timer_sync(&call->ackr_dfr_timo); 1476 call->app_attn_func(call); 1477 } 1478 1479 _leave(""); 1480} /* end rxrpc_call_definitively_ACK() */ 1481 1482/*****************************************************************************/ 1483/* 1484 * record the specified amount of ACKs/NAKs 1485 */ 1486static int rxrpc_call_record_ACK(struct rxrpc_call *call, 1487 struct rxrpc_message *msg, 1488 rxrpc_seq_t seq, 1489 size_t count) 1490{ 1491 struct rxrpc_message *dmsg; 1492 struct list_head *_p; 1493 rxrpc_seq_t highest; 1494 unsigned ix; 1495 size_t chunk; 1496 char resend, now_complete; 1497 u8 acks[16]; 1498 1499 _enter("%p{apc=%u ads=%u},%p,%u,%Zu", 1500 call, call->acks_pend_cnt, call->acks_dftv_seq, 1501 msg, seq, count); 1502 1503 /* handle re-ACK'ing of definitively ACK'd packets (may be out-of-order 1504 * ACKs) */ 1505 if (seq <= call->acks_dftv_seq) { 1506 unsigned delta = call->acks_dftv_seq - seq; 1507 1508 if (count <= delta) { 1509 _leave(" = 0 [all definitively ACK'd]"); 1510 return 0; 1511 } 1512 1513 seq += delta; 1514 count -= delta; 1515 msg->offset += delta; 1516 } 1517 1518 highest = seq + count - 1; 1519 resend = 0; 1520 while (count > 0) { 1521 /* extract up to 16 ACK slots at a time */ 1522 chunk = min(count, sizeof(acks)); 1523 count -= chunk; 1524 1525 memset(acks, 2, sizeof(acks)); 1526 1527 if (skb_copy_bits(msg->pkt, msg->offset, &acks, chunk) < 0) { 1528 printk("Rx Received short ACK packet\n"); 1529 _leave(" = -EINVAL"); 1530 return -EINVAL; 1531 } 1532 msg->offset += chunk; 1533 1534 /* check that the ACK set is valid */ 1535 for (ix = 0; ix < chunk; ix++) { 1536 switch (acks[ix]) { 1537 case RXRPC_ACK_TYPE_ACK: 1538 break; 1539 case RXRPC_ACK_TYPE_NACK: 1540 resend = 1; 1541 break; 1542 default: 1543 printk("Rx Received unsupported ACK state" 1544 " %u\n", acks[ix]); 1545 _leave(" = -EINVAL"); 1546 return -EINVAL; 1547 } 1548 } 1549 1550 _proto("Rx ACK of packets #%u-#%u " 1551 "[%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c] (pend=%u)", 1552 seq, (unsigned) (seq + chunk - 1), 1553 _acktype[acks[0x0]], 1554 _acktype[acks[0x1]], 1555 _acktype[acks[0x2]], 1556 _acktype[acks[0x3]], 1557 _acktype[acks[0x4]], 1558 _acktype[acks[0x5]], 1559 _acktype[acks[0x6]], 1560 _acktype[acks[0x7]], 1561 _acktype[acks[0x8]], 1562 _acktype[acks[0x9]], 1563 _acktype[acks[0xA]], 1564 _acktype[acks[0xB]], 1565 _acktype[acks[0xC]], 1566 _acktype[acks[0xD]], 1567 _acktype[acks[0xE]], 1568 _acktype[acks[0xF]], 1569 call->acks_pend_cnt 1570 ); 1571 1572 /* mark the packets in the ACK queue as being provisionally 1573 * ACK'd */ 1574 ix = 0; 1575 spin_lock(&call->lock); 1576 1577 /* find the first packet ACK'd/NAK'd here */ 1578 list_for_each(_p, &call->acks_pendq) { 1579 dmsg = list_entry(_p, struct rxrpc_message, link); 1580 if (dmsg->seq == seq) 1581 goto found_first; 1582 _debug("- %u: skipping #%u", ix, dmsg->seq); 1583 } 1584 goto bad_queue; 1585 1586 found_first: 1587 do { 1588 _debug("- %u: processing #%u (%c) apc=%u", 1589 ix, dmsg->seq, _acktype[acks[ix]], 1590 call->acks_pend_cnt); 1591 1592 if (acks[ix] == RXRPC_ACK_TYPE_ACK) { 1593 if (dmsg->state == RXRPC_MSG_SENT) 1594 call->acks_pend_cnt--; 1595 dmsg->state = RXRPC_MSG_ACKED; 1596 } 1597 else { 1598 if (dmsg->state == RXRPC_MSG_ACKED) 1599 call->acks_pend_cnt++; 1600 dmsg->state = RXRPC_MSG_SENT; 1601 } 1602 ix++; 1603 seq++; 1604 1605 _p = dmsg->link.next; 1606 dmsg = list_entry(_p, struct rxrpc_message, link); 1607 } while(ix < chunk && 1608 _p != &call->acks_pendq && 1609 dmsg->seq == seq); 1610 1611 if (ix < chunk) 1612 goto bad_queue; 1613 1614 spin_unlock(&call->lock); 1615 } 1616 1617 if (resend) 1618 rxrpc_call_resend(call, highest); 1619 1620 /* if all packets are provisionally ACK'd, then wake up anyone who's 1621 * waiting for that */ 1622 now_complete = 0; 1623 spin_lock(&call->lock); 1624 if (call->acks_pend_cnt == 0) { 1625 if (call->app_call_state == RXRPC_CSTATE_SRVR_RCV_FINAL_ACK) { 1626 call->app_call_state = RXRPC_CSTATE_COMPLETE; 1627 _state(call); 1628 } 1629 now_complete = 1; 1630 } 1631 spin_unlock(&call->lock); 1632 1633 if (now_complete) { 1634 _debug("- wake up waiters"); 1635 del_timer_sync(&call->acks_timeout); 1636 del_timer_sync(&call->rcv_timeout); 1637 del_timer_sync(&call->ackr_dfr_timo); 1638 call->app_attn_func(call); 1639 } 1640 1641 _leave(" = 0 (apc=%u)", call->acks_pend_cnt); 1642 return 0; 1643 1644 bad_queue: 1645 panic("%s(): acks_pendq in bad state (packet #%u absent)\n", 1646 __FUNCTION__, seq); 1647 1648} /* end rxrpc_call_record_ACK() */ 1649 1650/*****************************************************************************/ 1651/* 1652 * transfer data from the ready packet queue to the asynchronous read buffer 1653 * - since this func is the only one going to look at packets queued on 1654 * app_readyq, we don't need a lock to modify or access them, only to modify 1655 * the queue pointers 1656 * - called with call->lock held 1657 * - the buffer must be in kernel space 1658 * - returns: 1659 * 0 if buffer filled 1660 * -EAGAIN if buffer not filled and more data to come 1661 * -EBADMSG if last packet received and insufficient data left 1662 * -ECONNABORTED if the call has in an error state 1663 */ 1664static int __rxrpc_call_read_data(struct rxrpc_call *call) 1665{ 1666 struct rxrpc_message *msg; 1667 size_t qty; 1668 int ret; 1669 1670 _enter("%p{as=%d buf=%p qty=%Zu/%Zu}", 1671 call, 1672 call->app_async_read, call->app_read_buf, 1673 call->app_ready_qty, call->app_mark); 1674 1675 /* check the state */ 1676 switch (call->app_call_state) { 1677 case RXRPC_CSTATE_SRVR_RCV_ARGS: 1678 case RXRPC_CSTATE_CLNT_RCV_REPLY: 1679 if (call->app_last_rcv) { 1680 printk("%s(%p,%p,%Zd):" 1681 " Inconsistent call state (%s, last pkt)", 1682 __FUNCTION__, 1683 call, call->app_read_buf, call->app_mark, 1684 rxrpc_call_states[call->app_call_state]); 1685 BUG(); 1686 } 1687 break; 1688 1689 case RXRPC_CSTATE_SRVR_RCV_OPID: 1690 case RXRPC_CSTATE_SRVR_GOT_ARGS: 1691 case RXRPC_CSTATE_CLNT_GOT_REPLY: 1692 break; 1693 1694 case RXRPC_CSTATE_SRVR_SND_REPLY: 1695 if (!call->app_last_rcv) { 1696 printk("%s(%p,%p,%Zd):" 1697 " Inconsistent call state (%s, not last pkt)", 1698 __FUNCTION__, 1699 call, call->app_read_buf, call->app_mark, 1700 rxrpc_call_states[call->app_call_state]); 1701 BUG(); 1702 } 1703 _debug("Trying to read data from call in SND_REPLY state"); 1704 break; 1705 1706 case RXRPC_CSTATE_ERROR: 1707 _leave(" = -ECONNABORTED"); 1708 return -ECONNABORTED; 1709 1710 default: 1711 printk("reading in unexpected state [[[ %u ]]]\n", 1712 call->app_call_state); 1713 BUG(); 1714 } 1715 1716 /* handle the case of not having an async buffer */ 1717 if (!call->app_async_read) { 1718 if (call->app_mark == RXRPC_APP_MARK_EOF) { 1719 ret = call->app_last_rcv ? 0 : -EAGAIN; 1720 } 1721 else { 1722 if (call->app_mark >= call->app_ready_qty) { 1723 call->app_mark = RXRPC_APP_MARK_EOF; 1724 ret = 0; 1725 } 1726 else { 1727 ret = call->app_last_rcv ? -EBADMSG : -EAGAIN; 1728 } 1729 } 1730 1731 _leave(" = %d [no buf]", ret); 1732 return 0; 1733 } 1734 1735 while (!list_empty(&call->app_readyq) && call->app_mark > 0) { 1736 msg = list_entry(call->app_readyq.next, 1737 struct rxrpc_message, link); 1738 1739 /* drag as much data as we need out of this packet */ 1740 qty = min(call->app_mark, msg->dsize); 1741 1742 _debug("reading %Zu from skb=%p off=%lu", 1743 qty, msg->pkt, msg->offset); 1744 1745 if (call->app_read_buf) 1746 if (skb_copy_bits(msg->pkt, msg->offset, 1747 call->app_read_buf, qty) < 0) 1748 panic("%s: Failed to copy data from packet:" 1749 " (%p,%p,%Zd)", 1750 __FUNCTION__, 1751 call, call->app_read_buf, qty); 1752 1753 /* if that packet is now empty, discard it */ 1754 call->app_ready_qty -= qty; 1755 msg->dsize -= qty; 1756 1757 if (msg->dsize == 0) { 1758 list_del_init(&msg->link); 1759 rxrpc_put_message(msg); 1760 } 1761 else { 1762 msg->offset += qty; 1763 } 1764 1765 call->app_mark -= qty; 1766 if (call->app_read_buf) 1767 call->app_read_buf += qty; 1768 } 1769 1770 if (call->app_mark == 0) { 1771 call->app_async_read = 0; 1772 call->app_mark = RXRPC_APP_MARK_EOF; 1773 call->app_read_buf = NULL; 1774 1775 /* adjust the state if used up all packets */ 1776 if (list_empty(&call->app_readyq) && call->app_last_rcv) { 1777 switch (call->app_call_state) { 1778 case RXRPC_CSTATE_SRVR_RCV_OPID: 1779 call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY; 1780 call->app_mark = RXRPC_APP_MARK_EOF; 1781 _state(call); 1782 del_timer_sync(&call->rcv_timeout); 1783 break; 1784 case RXRPC_CSTATE_SRVR_GOT_ARGS: 1785 call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY; 1786 _state(call); 1787 del_timer_sync(&call->rcv_timeout); 1788 break; 1789 default: 1790 call->app_call_state = RXRPC_CSTATE_COMPLETE; 1791 _state(call); 1792 del_timer_sync(&call->acks_timeout); 1793 del_timer_sync(&call->ackr_dfr_timo); 1794 del_timer_sync(&call->rcv_timeout); 1795 break; 1796 } 1797 } 1798 1799 _leave(" = 0"); 1800 return 0; 1801 } 1802 1803 if (call->app_last_rcv) { 1804 _debug("Insufficient data (%Zu/%Zu)", 1805 call->app_ready_qty, call->app_mark); 1806 call->app_async_read = 0; 1807 call->app_mark = RXRPC_APP_MARK_EOF; 1808 call->app_read_buf = NULL; 1809 1810 _leave(" = -EBADMSG"); 1811 return -EBADMSG; 1812 } 1813 1814 _leave(" = -EAGAIN"); 1815 return -EAGAIN; 1816} /* end __rxrpc_call_read_data() */ 1817 1818/*****************************************************************************/ 1819/* 1820 * attempt to read the specified amount of data from the call's ready queue 1821 * into the buffer provided 1822 * - since this func is the only one going to look at packets queued on 1823 * app_readyq, we don't need a lock to modify or access them, only to modify 1824 * the queue pointers 1825 * - if the buffer pointer is NULL, then data is merely drained, not copied 1826 * - if flags&RXRPC_CALL_READ_BLOCK, then the function will wait until there is 1827 * enough data or an error will be generated 1828 * - note that the caller must have added the calling task to the call's wait 1829 * queue beforehand 1830 * - if flags&RXRPC_CALL_READ_ALL, then an error will be generated if this 1831 * function doesn't read all available data 1832 */ 1833int rxrpc_call_read_data(struct rxrpc_call *call, 1834 void *buffer, size_t size, int flags) 1835{ 1836 int ret; 1837 1838 _enter("%p{arq=%Zu},%p,%Zd,%x", 1839 call, call->app_ready_qty, buffer, size, flags); 1840 1841 spin_lock(&call->lock); 1842 1843 if (unlikely(!!call->app_read_buf)) { 1844 spin_unlock(&call->lock); 1845 _leave(" = -EBUSY"); 1846 return -EBUSY; 1847 } 1848 1849 call->app_mark = size; 1850 call->app_read_buf = buffer; 1851 call->app_async_read = 1; 1852 call->app_read_count++; 1853 1854 /* read as much data as possible */ 1855 ret = __rxrpc_call_read_data(call); 1856 switch (ret) { 1857 case 0: 1858 if (flags & RXRPC_CALL_READ_ALL && 1859 (!call->app_last_rcv || call->app_ready_qty > 0)) { 1860 _leave(" = -EBADMSG"); 1861 __rxrpc_call_abort(call, -EBADMSG); 1862 return -EBADMSG; 1863 } 1864 1865 spin_unlock(&call->lock); 1866 call->app_attn_func(call); 1867 _leave(" = 0"); 1868 return ret; 1869 1870 case -ECONNABORTED: 1871 spin_unlock(&call->lock); 1872 _leave(" = %d [aborted]", ret); 1873 return ret; 1874 1875 default: 1876 __rxrpc_call_abort(call, ret); 1877 _leave(" = %d", ret); 1878 return ret; 1879 1880 case -EAGAIN: 1881 spin_unlock(&call->lock); 1882 1883 if (!(flags & RXRPC_CALL_READ_BLOCK)) { 1884 _leave(" = -EAGAIN"); 1885 return -EAGAIN; 1886 } 1887 1888 /* wait for the data to arrive */ 1889 _debug("blocking for data arrival"); 1890 1891 for (;;) { 1892 set_current_state(TASK_INTERRUPTIBLE); 1893 if (!call->app_async_read || signal_pending(current)) 1894 break; 1895 schedule(); 1896 } 1897 set_current_state(TASK_RUNNING); 1898 1899 if (signal_pending(current)) { 1900 _leave(" = -EINTR"); 1901 return -EINTR; 1902 } 1903 1904 if (call->app_call_state == RXRPC_CSTATE_ERROR) { 1905 _leave(" = -ECONNABORTED"); 1906 return -ECONNABORTED; 1907 } 1908 1909 _leave(" = 0"); 1910 return 0; 1911 } 1912 1913} /* end rxrpc_call_read_data() */ 1914 1915/*****************************************************************************/ 1916/* 1917 * write data to a call 1918 * - the data may not be sent immediately if it doesn't fill a buffer 1919 * - if we can't queue all the data for buffering now, siov[] will have been 1920 * adjusted to take account of what has been sent 1921 */ 1922int rxrpc_call_write_data(struct rxrpc_call *call, 1923 size_t sioc, 1924 struct kvec *siov, 1925 u8 rxhdr_flags, 1926 gfp_t alloc_flags, 1927 int dup_data, 1928 size_t *size_sent) 1929{ 1930 struct rxrpc_message *msg; 1931 struct kvec *sptr; 1932 size_t space, size, chunk, tmp; 1933 char *buf; 1934 int ret; 1935 1936 _enter("%p,%Zu,%p,%02x,%x,%d,%p", 1937 call, sioc, siov, rxhdr_flags, alloc_flags, dup_data, 1938 size_sent); 1939 1940 *size_sent = 0; 1941 size = 0; 1942 ret = -EINVAL; 1943 1944 /* can't send more if we've sent last packet from this end */ 1945 switch (call->app_call_state) { 1946 case RXRPC_CSTATE_SRVR_SND_REPLY: 1947 case RXRPC_CSTATE_CLNT_SND_ARGS: 1948 break; 1949 case RXRPC_CSTATE_ERROR: 1950 ret = call->app_errno; 1951 default: 1952 goto out; 1953 } 1954 1955 /* calculate how much data we've been given */ 1956 sptr = siov; 1957 for (; sioc > 0; sptr++, sioc--) { 1958 if (!sptr->iov_len) 1959 continue; 1960 1961 if (!sptr->iov_base) 1962 goto out; 1963 1964 size += sptr->iov_len; 1965 } 1966 1967 _debug("- size=%Zu mtu=%Zu", size, call->conn->mtu_size); 1968 1969 do { 1970 /* make sure there's a message under construction */ 1971 if (!call->snd_nextmsg) { 1972 /* no - allocate a message with no data yet attached */ 1973 ret = rxrpc_conn_newmsg(call->conn, call, 1974 RXRPC_PACKET_TYPE_DATA, 1975 0, NULL, alloc_flags, 1976 &call->snd_nextmsg); 1977 if (ret < 0) 1978 goto out; 1979 _debug("- allocated new message [ds=%Zu]", 1980 call->snd_nextmsg->dsize); 1981 } 1982 1983 msg = call->snd_nextmsg; 1984 msg->hdr.flags |= rxhdr_flags; 1985 1986 /* deal with zero-length terminal packet */ 1987 if (size == 0) { 1988 if (rxhdr_flags & RXRPC_LAST_PACKET) { 1989 ret = rxrpc_call_flush(call); 1990 if (ret < 0) 1991 goto out; 1992 } 1993 break; 1994 } 1995 1996 /* work out how much space current packet has available */ 1997 space = call->conn->mtu_size - msg->dsize; 1998 chunk = min(space, size); 1999 2000 _debug("- [before] space=%Zu chunk=%Zu", space, chunk); 2001 2002 while (!siov->iov_len) 2003 siov++; 2004 2005 /* if we are going to have to duplicate the data then coalesce 2006 * it too */ 2007 if (dup_data) { 2008 /* don't allocate more that 1 page at a time */ 2009 if (chunk > PAGE_SIZE) 2010 chunk = PAGE_SIZE; 2011 2012 /* allocate a data buffer and attach to the message */ 2013 buf = kmalloc(chunk, alloc_flags); 2014 if (unlikely(!buf)) { 2015 if (msg->dsize == 2016 sizeof(struct rxrpc_header)) { 2017 /* discard an empty msg and wind back 2018 * the seq counter */ 2019 rxrpc_put_message(msg); 2020 call->snd_nextmsg = NULL; 2021 call->snd_seq_count--; 2022 } 2023 2024 ret = -ENOMEM; 2025 goto out; 2026 } 2027 2028 tmp = msg->dcount++; 2029 set_bit(tmp, &msg->dfree); 2030 msg->data[tmp].iov_base = buf; 2031 msg->data[tmp].iov_len = chunk; 2032 msg->dsize += chunk; 2033 *size_sent += chunk; 2034 size -= chunk; 2035 2036 /* load the buffer with data */ 2037 while (chunk > 0) { 2038 tmp = min(chunk, siov->iov_len); 2039 memcpy(buf, siov->iov_base, tmp); 2040 buf += tmp; 2041 siov->iov_base += tmp; 2042 siov->iov_len -= tmp; 2043 if (!siov->iov_len) 2044 siov++; 2045 chunk -= tmp; 2046 } 2047 } 2048 else { 2049 /* we want to attach the supplied buffers directly */ 2050 while (chunk > 0 && 2051 msg->dcount < RXRPC_MSG_MAX_IOCS) { 2052 tmp = msg->dcount++; 2053 msg->data[tmp].iov_base = siov->iov_base; 2054 msg->data[tmp].iov_len = siov->iov_len; 2055 msg->dsize += siov->iov_len; 2056 *size_sent += siov->iov_len; 2057 size -= siov->iov_len; 2058 chunk -= siov->iov_len; 2059 siov++; 2060 } 2061 } 2062 2063 _debug("- [loaded] chunk=%Zu size=%Zu", chunk, size); 2064 2065 /* dispatch the message when full, final or requesting ACK */ 2066 if (msg->dsize >= call->conn->mtu_size || rxhdr_flags) { 2067 ret = rxrpc_call_flush(call); 2068 if (ret < 0) 2069 goto out; 2070 } 2071 2072 } while(size > 0); 2073 2074 ret = 0; 2075 out: 2076 _leave(" = %d (%Zd queued, %Zd rem)", ret, *size_sent, size); 2077 return ret; 2078 2079} /* end rxrpc_call_write_data() */ 2080 2081/*****************************************************************************/ 2082/* 2083 * flush outstanding packets to the network 2084 */ 2085static int rxrpc_call_flush(struct rxrpc_call *call) 2086{ 2087 struct rxrpc_message *msg; 2088 int ret = 0; 2089 2090 _enter("%p", call); 2091 2092 rxrpc_get_call(call); 2093 2094 /* if there's a packet under construction, then dispatch it now */ 2095 if (call->snd_nextmsg) { 2096 msg = call->snd_nextmsg; 2097 call->snd_nextmsg = NULL; 2098 2099 if (msg->hdr.flags & RXRPC_LAST_PACKET) { 2100 msg->hdr.flags &= ~RXRPC_MORE_PACKETS; 2101 if (call->app_call_state != RXRPC_CSTATE_CLNT_SND_ARGS) 2102 msg->hdr.flags |= RXRPC_REQUEST_ACK; 2103 } 2104 else { 2105 msg->hdr.flags |= RXRPC_MORE_PACKETS; 2106 } 2107 2108 _proto("Sending DATA message { ds=%Zu dc=%u df=%02lu }", 2109 msg->dsize, msg->dcount, msg->dfree); 2110 2111 /* queue and adjust call state */ 2112 spin_lock(&call->lock); 2113 list_add_tail(&msg->link, &call->acks_pendq); 2114 2115 /* decide what to do depending on current state and if this is 2116 * the last packet */ 2117 ret = -EINVAL; 2118 switch (call->app_call_state) { 2119 case RXRPC_CSTATE_SRVR_SND_REPLY: 2120 if (msg->hdr.flags & RXRPC_LAST_PACKET) { 2121 call->app_call_state = 2122 RXRPC_CSTATE_SRVR_RCV_FINAL_ACK; 2123 _state(call); 2124 } 2125 break; 2126 2127 case RXRPC_CSTATE_CLNT_SND_ARGS: 2128 if (msg->hdr.flags & RXRPC_LAST_PACKET) { 2129 call->app_call_state = 2130 RXRPC_CSTATE_CLNT_RCV_REPLY; 2131 _state(call); 2132 } 2133 break; 2134 2135 case RXRPC_CSTATE_ERROR: 2136 ret = call->app_errno; 2137 default: 2138 spin_unlock(&call->lock); 2139 goto out; 2140 } 2141 2142 call->acks_pend_cnt++; 2143 2144 mod_timer(&call->acks_timeout, 2145 __rxrpc_rtt_based_timeout(call, 2146 rxrpc_call_acks_timeout)); 2147 2148 spin_unlock(&call->lock); 2149 2150 ret = rxrpc_conn_sendmsg(call->conn, msg); 2151 if (ret == 0) 2152 call->pkt_snd_count++; 2153 } 2154 2155 out: 2156 rxrpc_put_call(call); 2157 2158 _leave(" = %d", ret); 2159 return ret; 2160 2161} /* end rxrpc_call_flush() */ 2162 2163/*****************************************************************************/ 2164/* 2165 * resend NAK'd or unacknowledged packets up to the highest one specified 2166 */ 2167static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest) 2168{ 2169 struct rxrpc_message *msg; 2170 struct list_head *_p; 2171 rxrpc_seq_t seq = 0; 2172 2173 _enter("%p,%u", call, highest); 2174 2175 _proto("Rx Resend required"); 2176 2177 /* handle too many resends */ 2178 if (call->snd_resend_cnt >= rxrpc_call_max_resend) { 2179 _debug("Aborting due to too many resends (rcv=%d)", 2180 call->pkt_rcv_count); 2181 rxrpc_call_abort(call, 2182 call->pkt_rcv_count > 0 ? -EIO : -ETIMEDOUT); 2183 _leave(""); 2184 return; 2185 } 2186 2187 spin_lock(&call->lock); 2188 call->snd_resend_cnt++; 2189 for (;;) { 2190 /* determine which the next packet we might need to ACK is */ 2191 if (seq <= call->acks_dftv_seq) 2192 seq = call->acks_dftv_seq; 2193 seq++; 2194 2195 if (seq > highest) 2196 break; 2197 2198 /* look for the packet in the pending-ACK queue */ 2199 list_for_each(_p, &call->acks_pendq) { 2200 msg = list_entry(_p, struct rxrpc_message, link); 2201 if (msg->seq == seq) 2202 goto found_msg; 2203 } 2204 2205 panic("%s(%p,%d):" 2206 " Inconsistent pending-ACK queue (ds=%u sc=%u sq=%u)\n", 2207 __FUNCTION__, call, highest, 2208 call->acks_dftv_seq, call->snd_seq_count, seq); 2209 2210 found_msg: 2211 if (msg->state != RXRPC_MSG_SENT) 2212 continue; /* only un-ACK'd packets */ 2213 2214 rxrpc_get_message(msg); 2215 spin_unlock(&call->lock); 2216 2217 /* send each message again (and ignore any errors we might 2218 * incur) */ 2219 _proto("Resending DATA message { ds=%Zu dc=%u df=%02lu }", 2220 msg->dsize, msg->dcount, msg->dfree); 2221 2222 if (rxrpc_conn_sendmsg(call->conn, msg) == 0) 2223 call->pkt_snd_count++; 2224 2225 rxrpc_put_message(msg); 2226 2227 spin_lock(&call->lock); 2228 } 2229 2230 /* reset the timeout */ 2231 mod_timer(&call->acks_timeout, 2232 __rxrpc_rtt_based_timeout(call, rxrpc_call_acks_timeout)); 2233 2234 spin_unlock(&call->lock); 2235 2236 _leave(""); 2237} /* end rxrpc_call_resend() */ 2238 2239/*****************************************************************************/ 2240/* 2241 * handle an ICMP error being applied to a call 2242 */ 2243void rxrpc_call_handle_error(struct rxrpc_call *call, int local, int errno) 2244{ 2245 _enter("%p{%u},%d", call, ntohl(call->call_id), errno); 2246 2247 /* if this call is already aborted, then just wake up any waiters */ 2248 if (call->app_call_state == RXRPC_CSTATE_ERROR) { 2249 call->app_error_func(call); 2250 } 2251 else { 2252 /* tell the app layer what happened */ 2253 spin_lock(&call->lock); 2254 call->app_call_state = RXRPC_CSTATE_ERROR; 2255 _state(call); 2256 if (local) 2257 call->app_err_state = RXRPC_ESTATE_LOCAL_ERROR; 2258 else 2259 call->app_err_state = RXRPC_ESTATE_REMOTE_ERROR; 2260 call->app_errno = errno; 2261 call->app_mark = RXRPC_APP_MARK_EOF; 2262 call->app_read_buf = NULL; 2263 call->app_async_read = 0; 2264 2265 /* map the error */ 2266 call->app_aemap_func(call); 2267 2268 del_timer_sync(&call->acks_timeout); 2269 del_timer_sync(&call->rcv_timeout); 2270 del_timer_sync(&call->ackr_dfr_timo); 2271 2272 spin_unlock(&call->lock); 2273 2274 call->app_error_func(call); 2275 } 2276 2277 _leave(""); 2278} /* end rxrpc_call_handle_error() */