at v2.6.19 2277 lines 58 kB view raw
1/* call.c: Rx call routines 2 * 3 * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. 4 * Written by David Howells (dhowells@redhat.com) 5 * 6 * This program is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU General Public License 8 * as published by the Free Software Foundation; either version 9 * 2 of the License, or (at your option) any later version. 10 */ 11 12#include <linux/sched.h> 13#include <linux/slab.h> 14#include <linux/module.h> 15#include <rxrpc/rxrpc.h> 16#include <rxrpc/transport.h> 17#include <rxrpc/peer.h> 18#include <rxrpc/connection.h> 19#include <rxrpc/call.h> 20#include <rxrpc/message.h> 21#include "internal.h" 22 23__RXACCT_DECL(atomic_t rxrpc_call_count); 24__RXACCT_DECL(atomic_t rxrpc_message_count); 25 26LIST_HEAD(rxrpc_calls); 27DECLARE_RWSEM(rxrpc_calls_sem); 28 29unsigned rxrpc_call_rcv_timeout = HZ/3; 30static unsigned rxrpc_call_acks_timeout = HZ/3; 31static unsigned rxrpc_call_dfr_ack_timeout = HZ/20; 32static unsigned short rxrpc_call_max_resend = HZ/10; 33 34const char *rxrpc_call_states[] = { 35 "COMPLETE", 36 "ERROR", 37 "SRVR_RCV_OPID", 38 "SRVR_RCV_ARGS", 39 "SRVR_GOT_ARGS", 40 "SRVR_SND_REPLY", 41 "SRVR_RCV_FINAL_ACK", 42 "CLNT_SND_ARGS", 43 "CLNT_RCV_REPLY", 44 "CLNT_GOT_REPLY" 45}; 46 47const char *rxrpc_call_error_states[] = { 48 "NO_ERROR", 49 "LOCAL_ABORT", 50 "PEER_ABORT", 51 "LOCAL_ERROR", 52 "REMOTE_ERROR" 53}; 54 55const char *rxrpc_pkts[] = { 56 "?00", 57 "data", "ack", "busy", "abort", "ackall", "chall", "resp", "debug", 58 "?09", "?10", "?11", "?12", "?13", "?14", "?15" 59}; 60 61static const char *rxrpc_acks[] = { 62 "---", "REQ", "DUP", "SEQ", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL", 63 "-?-" 64}; 65 66static const char _acktype[] = "NA-"; 67 68static void rxrpc_call_receive_packet(struct rxrpc_call *call); 69static void rxrpc_call_receive_data_packet(struct rxrpc_call *call, 70 struct rxrpc_message *msg); 71static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call, 72 struct rxrpc_message *msg); 73static void rxrpc_call_definitively_ACK(struct rxrpc_call *call, 74 rxrpc_seq_t higest); 75static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest); 76static int __rxrpc_call_read_data(struct rxrpc_call *call); 77 78static int rxrpc_call_record_ACK(struct rxrpc_call *call, 79 struct rxrpc_message *msg, 80 rxrpc_seq_t seq, 81 size_t count); 82 83static int rxrpc_call_flush(struct rxrpc_call *call); 84 85#define _state(call) \ 86 _debug("[[[ state %s ]]]", rxrpc_call_states[call->app_call_state]); 87 88static void rxrpc_call_default_attn_func(struct rxrpc_call *call) 89{ 90 wake_up(&call->waitq); 91} 92 93static void rxrpc_call_default_error_func(struct rxrpc_call *call) 94{ 95 wake_up(&call->waitq); 96} 97 98static void rxrpc_call_default_aemap_func(struct rxrpc_call *call) 99{ 100 switch (call->app_err_state) { 101 case RXRPC_ESTATE_LOCAL_ABORT: 102 call->app_abort_code = -call->app_errno; 103 case RXRPC_ESTATE_PEER_ABORT: 104 call->app_errno = -ECONNABORTED; 105 default: 106 break; 107 } 108} 109 110static void __rxrpc_call_acks_timeout(unsigned long _call) 111{ 112 struct rxrpc_call *call = (struct rxrpc_call *) _call; 113 114 _debug("ACKS TIMEOUT %05lu", jiffies - call->cjif); 115 116 call->flags |= RXRPC_CALL_ACKS_TIMO; 117 rxrpc_krxiod_queue_call(call); 118} 119 120static void __rxrpc_call_rcv_timeout(unsigned long _call) 121{ 122 struct rxrpc_call *call = (struct rxrpc_call *) _call; 123 124 _debug("RCV TIMEOUT %05lu", jiffies - call->cjif); 125 126 call->flags |= RXRPC_CALL_RCV_TIMO; 127 rxrpc_krxiod_queue_call(call); 128} 129 130static void __rxrpc_call_ackr_timeout(unsigned long _call) 131{ 132 struct rxrpc_call *call = (struct rxrpc_call *) _call; 133 134 _debug("ACKR TIMEOUT %05lu",jiffies - call->cjif); 135 136 call->flags |= RXRPC_CALL_ACKR_TIMO; 137 rxrpc_krxiod_queue_call(call); 138} 139 140/*****************************************************************************/ 141/* 142 * calculate a timeout based on an RTT value 143 */ 144static inline unsigned long __rxrpc_rtt_based_timeout(struct rxrpc_call *call, 145 unsigned long val) 146{ 147 unsigned long expiry = call->conn->peer->rtt / (1000000 / HZ); 148 149 expiry += 10; 150 if (expiry < HZ / 25) 151 expiry = HZ / 25; 152 if (expiry > HZ) 153 expiry = HZ; 154 155 _leave(" = %lu jiffies", expiry); 156 return jiffies + expiry; 157} /* end __rxrpc_rtt_based_timeout() */ 158 159/*****************************************************************************/ 160/* 161 * create a new call record 162 */ 163static inline int __rxrpc_create_call(struct rxrpc_connection *conn, 164 struct rxrpc_call **_call) 165{ 166 struct rxrpc_call *call; 167 168 _enter("%p", conn); 169 170 /* allocate and initialise a call record */ 171 call = (struct rxrpc_call *) get_zeroed_page(GFP_KERNEL); 172 if (!call) { 173 _leave(" ENOMEM"); 174 return -ENOMEM; 175 } 176 177 atomic_set(&call->usage, 1); 178 179 init_waitqueue_head(&call->waitq); 180 spin_lock_init(&call->lock); 181 INIT_LIST_HEAD(&call->link); 182 INIT_LIST_HEAD(&call->acks_pendq); 183 INIT_LIST_HEAD(&call->rcv_receiveq); 184 INIT_LIST_HEAD(&call->rcv_krxiodq_lk); 185 INIT_LIST_HEAD(&call->app_readyq); 186 INIT_LIST_HEAD(&call->app_unreadyq); 187 INIT_LIST_HEAD(&call->app_link); 188 INIT_LIST_HEAD(&call->app_attn_link); 189 190 init_timer(&call->acks_timeout); 191 call->acks_timeout.data = (unsigned long) call; 192 call->acks_timeout.function = __rxrpc_call_acks_timeout; 193 194 init_timer(&call->rcv_timeout); 195 call->rcv_timeout.data = (unsigned long) call; 196 call->rcv_timeout.function = __rxrpc_call_rcv_timeout; 197 198 init_timer(&call->ackr_dfr_timo); 199 call->ackr_dfr_timo.data = (unsigned long) call; 200 call->ackr_dfr_timo.function = __rxrpc_call_ackr_timeout; 201 202 call->conn = conn; 203 call->ackr_win_bot = 1; 204 call->ackr_win_top = call->ackr_win_bot + RXRPC_CALL_ACK_WINDOW_SIZE - 1; 205 call->ackr_prev_seq = 0; 206 call->app_mark = RXRPC_APP_MARK_EOF; 207 call->app_attn_func = rxrpc_call_default_attn_func; 208 call->app_error_func = rxrpc_call_default_error_func; 209 call->app_aemap_func = rxrpc_call_default_aemap_func; 210 call->app_scr_alloc = call->app_scratch; 211 212 call->cjif = jiffies; 213 214 _leave(" = 0 (%p)", call); 215 216 *_call = call; 217 218 return 0; 219} /* end __rxrpc_create_call() */ 220 221/*****************************************************************************/ 222/* 223 * create a new call record for outgoing calls 224 */ 225int rxrpc_create_call(struct rxrpc_connection *conn, 226 rxrpc_call_attn_func_t attn, 227 rxrpc_call_error_func_t error, 228 rxrpc_call_aemap_func_t aemap, 229 struct rxrpc_call **_call) 230{ 231 DECLARE_WAITQUEUE(myself, current); 232 233 struct rxrpc_call *call; 234 int ret, cix, loop; 235 236 _enter("%p", conn); 237 238 /* allocate and initialise a call record */ 239 ret = __rxrpc_create_call(conn, &call); 240 if (ret < 0) { 241 _leave(" = %d", ret); 242 return ret; 243 } 244 245 call->app_call_state = RXRPC_CSTATE_CLNT_SND_ARGS; 246 if (attn) 247 call->app_attn_func = attn; 248 if (error) 249 call->app_error_func = error; 250 if (aemap) 251 call->app_aemap_func = aemap; 252 253 _state(call); 254 255 spin_lock(&conn->lock); 256 set_current_state(TASK_INTERRUPTIBLE); 257 add_wait_queue(&conn->chanwait, &myself); 258 259 try_again: 260 /* try to find an unused channel */ 261 for (cix = 0; cix < 4; cix++) 262 if (!conn->channels[cix]) 263 goto obtained_chan; 264 265 /* no free channels - wait for one to become available */ 266 ret = -EINTR; 267 if (signal_pending(current)) 268 goto error_unwait; 269 270 spin_unlock(&conn->lock); 271 272 schedule(); 273 set_current_state(TASK_INTERRUPTIBLE); 274 275 spin_lock(&conn->lock); 276 goto try_again; 277 278 /* got a channel - now attach to the connection */ 279 obtained_chan: 280 remove_wait_queue(&conn->chanwait, &myself); 281 set_current_state(TASK_RUNNING); 282 283 /* concoct a unique call number */ 284 next_callid: 285 call->call_id = htonl(++conn->call_counter); 286 for (loop = 0; loop < 4; loop++) 287 if (conn->channels[loop] && 288 conn->channels[loop]->call_id == call->call_id) 289 goto next_callid; 290 291 rxrpc_get_connection(conn); 292 conn->channels[cix] = call; /* assign _after_ done callid check loop */ 293 do_gettimeofday(&conn->atime); 294 call->chan_ix = htonl(cix); 295 296 spin_unlock(&conn->lock); 297 298 down_write(&rxrpc_calls_sem); 299 list_add_tail(&call->call_link, &rxrpc_calls); 300 up_write(&rxrpc_calls_sem); 301 302 __RXACCT(atomic_inc(&rxrpc_call_count)); 303 *_call = call; 304 305 _leave(" = 0 (call=%p cix=%u)", call, cix); 306 return 0; 307 308 error_unwait: 309 remove_wait_queue(&conn->chanwait, &myself); 310 set_current_state(TASK_RUNNING); 311 spin_unlock(&conn->lock); 312 313 free_page((unsigned long) call); 314 _leave(" = %d", ret); 315 return ret; 316} /* end rxrpc_create_call() */ 317 318/*****************************************************************************/ 319/* 320 * create a new call record for incoming calls 321 */ 322int rxrpc_incoming_call(struct rxrpc_connection *conn, 323 struct rxrpc_message *msg, 324 struct rxrpc_call **_call) 325{ 326 struct rxrpc_call *call; 327 unsigned cix; 328 int ret; 329 330 cix = ntohl(msg->hdr.cid) & RXRPC_CHANNELMASK; 331 332 _enter("%p,%u,%u", conn, ntohl(msg->hdr.callNumber), cix); 333 334 /* allocate and initialise a call record */ 335 ret = __rxrpc_create_call(conn, &call); 336 if (ret < 0) { 337 _leave(" = %d", ret); 338 return ret; 339 } 340 341 call->pkt_rcv_count = 1; 342 call->app_call_state = RXRPC_CSTATE_SRVR_RCV_OPID; 343 call->app_mark = sizeof(uint32_t); 344 345 _state(call); 346 347 /* attach to the connection */ 348 ret = -EBUSY; 349 call->chan_ix = htonl(cix); 350 call->call_id = msg->hdr.callNumber; 351 352 spin_lock(&conn->lock); 353 354 if (!conn->channels[cix] || 355 conn->channels[cix]->app_call_state == RXRPC_CSTATE_COMPLETE || 356 conn->channels[cix]->app_call_state == RXRPC_CSTATE_ERROR 357 ) { 358 conn->channels[cix] = call; 359 rxrpc_get_connection(conn); 360 ret = 0; 361 } 362 363 spin_unlock(&conn->lock); 364 365 if (ret < 0) { 366 free_page((unsigned long) call); 367 call = NULL; 368 } 369 370 if (ret == 0) { 371 down_write(&rxrpc_calls_sem); 372 list_add_tail(&call->call_link, &rxrpc_calls); 373 up_write(&rxrpc_calls_sem); 374 __RXACCT(atomic_inc(&rxrpc_call_count)); 375 *_call = call; 376 } 377 378 _leave(" = %d [%p]", ret, call); 379 return ret; 380} /* end rxrpc_incoming_call() */ 381 382/*****************************************************************************/ 383/* 384 * free a call record 385 */ 386void rxrpc_put_call(struct rxrpc_call *call) 387{ 388 struct rxrpc_connection *conn = call->conn; 389 struct rxrpc_message *msg; 390 391 _enter("%p{u=%d}",call,atomic_read(&call->usage)); 392 393 /* sanity check */ 394 if (atomic_read(&call->usage) <= 0) 395 BUG(); 396 397 /* to prevent a race, the decrement and the de-list must be effectively 398 * atomic */ 399 spin_lock(&conn->lock); 400 if (likely(!atomic_dec_and_test(&call->usage))) { 401 spin_unlock(&conn->lock); 402 _leave(""); 403 return; 404 } 405 406 if (conn->channels[ntohl(call->chan_ix)] == call) 407 conn->channels[ntohl(call->chan_ix)] = NULL; 408 409 spin_unlock(&conn->lock); 410 411 wake_up(&conn->chanwait); 412 413 rxrpc_put_connection(conn); 414 415 /* clear the timers and dequeue from krxiod */ 416 del_timer_sync(&call->acks_timeout); 417 del_timer_sync(&call->rcv_timeout); 418 del_timer_sync(&call->ackr_dfr_timo); 419 420 rxrpc_krxiod_dequeue_call(call); 421 422 /* clean up the contents of the struct */ 423 if (call->snd_nextmsg) 424 rxrpc_put_message(call->snd_nextmsg); 425 426 if (call->snd_ping) 427 rxrpc_put_message(call->snd_ping); 428 429 while (!list_empty(&call->acks_pendq)) { 430 msg = list_entry(call->acks_pendq.next, 431 struct rxrpc_message, link); 432 list_del(&msg->link); 433 rxrpc_put_message(msg); 434 } 435 436 while (!list_empty(&call->rcv_receiveq)) { 437 msg = list_entry(call->rcv_receiveq.next, 438 struct rxrpc_message, link); 439 list_del(&msg->link); 440 rxrpc_put_message(msg); 441 } 442 443 while (!list_empty(&call->app_readyq)) { 444 msg = list_entry(call->app_readyq.next, 445 struct rxrpc_message, link); 446 list_del(&msg->link); 447 rxrpc_put_message(msg); 448 } 449 450 while (!list_empty(&call->app_unreadyq)) { 451 msg = list_entry(call->app_unreadyq.next, 452 struct rxrpc_message, link); 453 list_del(&msg->link); 454 rxrpc_put_message(msg); 455 } 456 457 module_put(call->owner); 458 459 down_write(&rxrpc_calls_sem); 460 list_del(&call->call_link); 461 up_write(&rxrpc_calls_sem); 462 463 __RXACCT(atomic_dec(&rxrpc_call_count)); 464 free_page((unsigned long) call); 465 466 _leave(" [destroyed]"); 467} /* end rxrpc_put_call() */ 468 469/*****************************************************************************/ 470/* 471 * actually generate a normal ACK 472 */ 473static inline int __rxrpc_call_gen_normal_ACK(struct rxrpc_call *call, 474 rxrpc_seq_t seq) 475{ 476 struct rxrpc_message *msg; 477 struct kvec diov[3]; 478 __be32 aux[4]; 479 int delta, ret; 480 481 /* ACKs default to DELAY */ 482 if (!call->ackr.reason) 483 call->ackr.reason = RXRPC_ACK_DELAY; 484 485 _proto("Rx %05lu Sending ACK { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }", 486 jiffies - call->cjif, 487 ntohs(call->ackr.maxSkew), 488 ntohl(call->ackr.firstPacket), 489 ntohl(call->ackr.previousPacket), 490 ntohl(call->ackr.serial), 491 rxrpc_acks[call->ackr.reason], 492 call->ackr.nAcks); 493 494 aux[0] = htonl(call->conn->peer->if_mtu); /* interface MTU */ 495 aux[1] = htonl(1444); /* max MTU */ 496 aux[2] = htonl(16); /* rwind */ 497 aux[3] = htonl(4); /* max packets */ 498 499 diov[0].iov_len = sizeof(struct rxrpc_ackpacket); 500 diov[0].iov_base = &call->ackr; 501 diov[1].iov_len = call->ackr_pend_cnt + 3; 502 diov[1].iov_base = call->ackr_array; 503 diov[2].iov_len = sizeof(aux); 504 diov[2].iov_base = &aux; 505 506 /* build and send the message */ 507 ret = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK, 508 3, diov, GFP_KERNEL, &msg); 509 if (ret < 0) 510 goto out; 511 512 msg->seq = seq; 513 msg->hdr.seq = htonl(seq); 514 msg->hdr.flags |= RXRPC_SLOW_START_OK; 515 516 ret = rxrpc_conn_sendmsg(call->conn, msg); 517 rxrpc_put_message(msg); 518 if (ret < 0) 519 goto out; 520 call->pkt_snd_count++; 521 522 /* count how many actual ACKs there were at the front */ 523 for (delta = 0; delta < call->ackr_pend_cnt; delta++) 524 if (call->ackr_array[delta] != RXRPC_ACK_TYPE_ACK) 525 break; 526 527 call->ackr_pend_cnt -= delta; /* all ACK'd to this point */ 528 529 /* crank the ACK window around */ 530 if (delta == 0) { 531 /* un-ACK'd window */ 532 } 533 else if (delta < RXRPC_CALL_ACK_WINDOW_SIZE) { 534 /* partially ACK'd window 535 * - shuffle down to avoid losing out-of-sequence packets 536 */ 537 call->ackr_win_bot += delta; 538 call->ackr_win_top += delta; 539 540 memmove(&call->ackr_array[0], 541 &call->ackr_array[delta], 542 call->ackr_pend_cnt); 543 544 memset(&call->ackr_array[call->ackr_pend_cnt], 545 RXRPC_ACK_TYPE_NACK, 546 sizeof(call->ackr_array) - call->ackr_pend_cnt); 547 } 548 else { 549 /* fully ACK'd window 550 * - just clear the whole thing 551 */ 552 memset(&call->ackr_array, 553 RXRPC_ACK_TYPE_NACK, 554 sizeof(call->ackr_array)); 555 } 556 557 /* clear this ACK */ 558 memset(&call->ackr, 0, sizeof(call->ackr)); 559 560 out: 561 if (!call->app_call_state) 562 printk("___ STATE 0 ___\n"); 563 return ret; 564} /* end __rxrpc_call_gen_normal_ACK() */ 565 566/*****************************************************************************/ 567/* 568 * note the reception of a packet in the call's ACK records and generate an 569 * appropriate ACK packet if necessary 570 * - returns 0 if packet should be processed, 1 if packet should be ignored 571 * and -ve on an error 572 */ 573static int rxrpc_call_generate_ACK(struct rxrpc_call *call, 574 struct rxrpc_header *hdr, 575 struct rxrpc_ackpacket *ack) 576{ 577 struct rxrpc_message *msg; 578 rxrpc_seq_t seq; 579 unsigned offset; 580 int ret = 0, err; 581 u8 special_ACK, do_ACK, force; 582 583 _enter("%p,%p { seq=%d tp=%d fl=%02x }", 584 call, hdr, ntohl(hdr->seq), hdr->type, hdr->flags); 585 586 seq = ntohl(hdr->seq); 587 offset = seq - call->ackr_win_bot; 588 do_ACK = RXRPC_ACK_DELAY; 589 special_ACK = 0; 590 force = (seq == 1); 591 592 if (call->ackr_high_seq < seq) 593 call->ackr_high_seq = seq; 594 595 /* deal with generation of obvious special ACKs first */ 596 if (ack && ack->reason == RXRPC_ACK_PING) { 597 special_ACK = RXRPC_ACK_PING_RESPONSE; 598 ret = 1; 599 goto gen_ACK; 600 } 601 602 if (seq < call->ackr_win_bot) { 603 special_ACK = RXRPC_ACK_DUPLICATE; 604 ret = 1; 605 goto gen_ACK; 606 } 607 608 if (seq >= call->ackr_win_top) { 609 special_ACK = RXRPC_ACK_EXCEEDS_WINDOW; 610 ret = 1; 611 goto gen_ACK; 612 } 613 614 if (call->ackr_array[offset] != RXRPC_ACK_TYPE_NACK) { 615 special_ACK = RXRPC_ACK_DUPLICATE; 616 ret = 1; 617 goto gen_ACK; 618 } 619 620 /* okay... it's a normal data packet inside the ACK window */ 621 call->ackr_array[offset] = RXRPC_ACK_TYPE_ACK; 622 623 if (offset < call->ackr_pend_cnt) { 624 } 625 else if (offset > call->ackr_pend_cnt) { 626 do_ACK = RXRPC_ACK_OUT_OF_SEQUENCE; 627 call->ackr_pend_cnt = offset; 628 goto gen_ACK; 629 } 630 631 if (hdr->flags & RXRPC_REQUEST_ACK) { 632 do_ACK = RXRPC_ACK_REQUESTED; 633 } 634 635 /* generate an ACK on the final packet of a reply just received */ 636 if (hdr->flags & RXRPC_LAST_PACKET) { 637 if (call->conn->out_clientflag) 638 force = 1; 639 } 640 else if (!(hdr->flags & RXRPC_MORE_PACKETS)) { 641 do_ACK = RXRPC_ACK_REQUESTED; 642 } 643 644 /* re-ACK packets previously received out-of-order */ 645 for (offset++; offset < RXRPC_CALL_ACK_WINDOW_SIZE; offset++) 646 if (call->ackr_array[offset] != RXRPC_ACK_TYPE_ACK) 647 break; 648 649 call->ackr_pend_cnt = offset; 650 651 /* generate an ACK if we fill up the window */ 652 if (call->ackr_pend_cnt >= RXRPC_CALL_ACK_WINDOW_SIZE) 653 force = 1; 654 655 gen_ACK: 656 _debug("%05lu ACKs pend=%u norm=%s special=%s%s", 657 jiffies - call->cjif, 658 call->ackr_pend_cnt, 659 rxrpc_acks[do_ACK], 660 rxrpc_acks[special_ACK], 661 force ? " immediate" : 662 do_ACK == RXRPC_ACK_REQUESTED ? " merge-req" : 663 hdr->flags & RXRPC_LAST_PACKET ? " finalise" : 664 " defer" 665 ); 666 667 /* send any pending normal ACKs if need be */ 668 if (call->ackr_pend_cnt > 0) { 669 /* fill out the appropriate form */ 670 call->ackr.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE); 671 call->ackr.maxSkew = htons(min(call->ackr_high_seq - seq, 672 65535U)); 673 call->ackr.firstPacket = htonl(call->ackr_win_bot); 674 call->ackr.previousPacket = call->ackr_prev_seq; 675 call->ackr.serial = hdr->serial; 676 call->ackr.nAcks = call->ackr_pend_cnt; 677 678 if (do_ACK == RXRPC_ACK_REQUESTED) 679 call->ackr.reason = do_ACK; 680 681 /* generate the ACK immediately if necessary */ 682 if (special_ACK || force) { 683 err = __rxrpc_call_gen_normal_ACK( 684 call, do_ACK == RXRPC_ACK_DELAY ? 0 : seq); 685 if (err < 0) { 686 ret = err; 687 goto out; 688 } 689 } 690 } 691 692 if (call->ackr.reason == RXRPC_ACK_REQUESTED) 693 call->ackr_dfr_seq = seq; 694 695 /* start the ACK timer if not running if there are any pending deferred 696 * ACKs */ 697 if (call->ackr_pend_cnt > 0 && 698 call->ackr.reason != RXRPC_ACK_REQUESTED && 699 !timer_pending(&call->ackr_dfr_timo) 700 ) { 701 unsigned long timo; 702 703 timo = rxrpc_call_dfr_ack_timeout + jiffies; 704 705 _debug("START ACKR TIMER for cj=%lu", timo - call->cjif); 706 707 spin_lock(&call->lock); 708 mod_timer(&call->ackr_dfr_timo, timo); 709 spin_unlock(&call->lock); 710 } 711 else if ((call->ackr_pend_cnt == 0 || 712 call->ackr.reason == RXRPC_ACK_REQUESTED) && 713 timer_pending(&call->ackr_dfr_timo) 714 ) { 715 /* stop timer if no pending ACKs */ 716 _debug("CLEAR ACKR TIMER"); 717 del_timer_sync(&call->ackr_dfr_timo); 718 } 719 720 /* send a special ACK if one is required */ 721 if (special_ACK) { 722 struct rxrpc_ackpacket ack; 723 struct kvec diov[2]; 724 uint8_t acks[1] = { RXRPC_ACK_TYPE_ACK }; 725 726 /* fill out the appropriate form */ 727 ack.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE); 728 ack.maxSkew = htons(min(call->ackr_high_seq - seq, 729 65535U)); 730 ack.firstPacket = htonl(call->ackr_win_bot); 731 ack.previousPacket = call->ackr_prev_seq; 732 ack.serial = hdr->serial; 733 ack.reason = special_ACK; 734 ack.nAcks = 0; 735 736 _proto("Rx Sending s-ACK" 737 " { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }", 738 ntohs(ack.maxSkew), 739 ntohl(ack.firstPacket), 740 ntohl(ack.previousPacket), 741 ntohl(ack.serial), 742 rxrpc_acks[ack.reason], 743 ack.nAcks); 744 745 diov[0].iov_len = sizeof(struct rxrpc_ackpacket); 746 diov[0].iov_base = &ack; 747 diov[1].iov_len = sizeof(acks); 748 diov[1].iov_base = acks; 749 750 /* build and send the message */ 751 err = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK, 752 hdr->seq ? 2 : 1, diov, 753 GFP_KERNEL, 754 &msg); 755 if (err < 0) { 756 ret = err; 757 goto out; 758 } 759 760 msg->seq = seq; 761 msg->hdr.seq = htonl(seq); 762 msg->hdr.flags |= RXRPC_SLOW_START_OK; 763 764 err = rxrpc_conn_sendmsg(call->conn, msg); 765 rxrpc_put_message(msg); 766 if (err < 0) { 767 ret = err; 768 goto out; 769 } 770 call->pkt_snd_count++; 771 } 772 773 out: 774 if (hdr->seq) 775 call->ackr_prev_seq = hdr->seq; 776 777 _leave(" = %d", ret); 778 return ret; 779} /* end rxrpc_call_generate_ACK() */ 780 781/*****************************************************************************/ 782/* 783 * handle work to be done on a call 784 * - includes packet reception and timeout processing 785 */ 786void rxrpc_call_do_stuff(struct rxrpc_call *call) 787{ 788 _enter("%p{flags=%lx}", call, call->flags); 789 790 /* handle packet reception */ 791 if (call->flags & RXRPC_CALL_RCV_PKT) { 792 _debug("- receive packet"); 793 call->flags &= ~RXRPC_CALL_RCV_PKT; 794 rxrpc_call_receive_packet(call); 795 } 796 797 /* handle overdue ACKs */ 798 if (call->flags & RXRPC_CALL_ACKS_TIMO) { 799 _debug("- overdue ACK timeout"); 800 call->flags &= ~RXRPC_CALL_ACKS_TIMO; 801 rxrpc_call_resend(call, call->snd_seq_count); 802 } 803 804 /* handle lack of reception */ 805 if (call->flags & RXRPC_CALL_RCV_TIMO) { 806 _debug("- reception timeout"); 807 call->flags &= ~RXRPC_CALL_RCV_TIMO; 808 rxrpc_call_abort(call, -EIO); 809 } 810 811 /* handle deferred ACKs */ 812 if (call->flags & RXRPC_CALL_ACKR_TIMO || 813 (call->ackr.nAcks > 0 && call->ackr.reason == RXRPC_ACK_REQUESTED) 814 ) { 815 _debug("- deferred ACK timeout: cj=%05lu r=%s n=%u", 816 jiffies - call->cjif, 817 rxrpc_acks[call->ackr.reason], 818 call->ackr.nAcks); 819 820 call->flags &= ~RXRPC_CALL_ACKR_TIMO; 821 822 if (call->ackr.nAcks > 0 && 823 call->app_call_state != RXRPC_CSTATE_ERROR) { 824 /* generate ACK */ 825 __rxrpc_call_gen_normal_ACK(call, call->ackr_dfr_seq); 826 call->ackr_dfr_seq = 0; 827 } 828 } 829 830 _leave(""); 831 832} /* end rxrpc_call_do_stuff() */ 833 834/*****************************************************************************/ 835/* 836 * send an abort message at call or connection level 837 * - must be called with call->lock held 838 * - the supplied error code is sent as the packet data 839 */ 840static int __rxrpc_call_abort(struct rxrpc_call *call, int errno) 841{ 842 struct rxrpc_connection *conn = call->conn; 843 struct rxrpc_message *msg; 844 struct kvec diov[1]; 845 int ret; 846 __be32 _error; 847 848 _enter("%p{%08x},%p{%d},%d", 849 conn, ntohl(conn->conn_id), call, ntohl(call->call_id), errno); 850 851 /* if this call is already aborted, then just wake up any waiters */ 852 if (call->app_call_state == RXRPC_CSTATE_ERROR) { 853 spin_unlock(&call->lock); 854 call->app_error_func(call); 855 _leave(" = 0"); 856 return 0; 857 } 858 859 rxrpc_get_call(call); 860 861 /* change the state _with_ the lock still held */ 862 call->app_call_state = RXRPC_CSTATE_ERROR; 863 call->app_err_state = RXRPC_ESTATE_LOCAL_ABORT; 864 call->app_errno = errno; 865 call->app_mark = RXRPC_APP_MARK_EOF; 866 call->app_read_buf = NULL; 867 call->app_async_read = 0; 868 869 _state(call); 870 871 /* ask the app to translate the error code */ 872 call->app_aemap_func(call); 873 874 spin_unlock(&call->lock); 875 876 /* flush any outstanding ACKs */ 877 del_timer_sync(&call->acks_timeout); 878 del_timer_sync(&call->rcv_timeout); 879 del_timer_sync(&call->ackr_dfr_timo); 880 881 if (rxrpc_call_is_ack_pending(call)) 882 __rxrpc_call_gen_normal_ACK(call, 0); 883 884 /* send the abort packet only if we actually traded some other 885 * packets */ 886 ret = 0; 887 if (call->pkt_snd_count || call->pkt_rcv_count) { 888 /* actually send the abort */ 889 _proto("Rx Sending Call ABORT { data=%d }", 890 call->app_abort_code); 891 892 _error = htonl(call->app_abort_code); 893 894 diov[0].iov_len = sizeof(_error); 895 diov[0].iov_base = &_error; 896 897 ret = rxrpc_conn_newmsg(conn, call, RXRPC_PACKET_TYPE_ABORT, 898 1, diov, GFP_KERNEL, &msg); 899 if (ret == 0) { 900 ret = rxrpc_conn_sendmsg(conn, msg); 901 rxrpc_put_message(msg); 902 } 903 } 904 905 /* tell the app layer to let go */ 906 call->app_error_func(call); 907 908 rxrpc_put_call(call); 909 910 _leave(" = %d", ret); 911 return ret; 912} /* end __rxrpc_call_abort() */ 913 914/*****************************************************************************/ 915/* 916 * send an abort message at call or connection level 917 * - the supplied error code is sent as the packet data 918 */ 919int rxrpc_call_abort(struct rxrpc_call *call, int error) 920{ 921 spin_lock(&call->lock); 922 923 return __rxrpc_call_abort(call, error); 924 925} /* end rxrpc_call_abort() */ 926 927/*****************************************************************************/ 928/* 929 * process packets waiting for this call 930 */ 931static void rxrpc_call_receive_packet(struct rxrpc_call *call) 932{ 933 struct rxrpc_message *msg; 934 struct list_head *_p; 935 936 _enter("%p", call); 937 938 rxrpc_get_call(call); /* must not go away too soon if aborted by 939 * app-layer */ 940 941 while (!list_empty(&call->rcv_receiveq)) { 942 /* try to get next packet */ 943 _p = NULL; 944 spin_lock(&call->lock); 945 if (!list_empty(&call->rcv_receiveq)) { 946 _p = call->rcv_receiveq.next; 947 list_del_init(_p); 948 } 949 spin_unlock(&call->lock); 950 951 if (!_p) 952 break; 953 954 msg = list_entry(_p, struct rxrpc_message, link); 955 956 _proto("Rx %05lu Received %s packet (%%%u,#%u,%c%c%c%c%c)", 957 jiffies - call->cjif, 958 rxrpc_pkts[msg->hdr.type], 959 ntohl(msg->hdr.serial), 960 msg->seq, 961 msg->hdr.flags & RXRPC_JUMBO_PACKET ? 'j' : '-', 962 msg->hdr.flags & RXRPC_MORE_PACKETS ? 'm' : '-', 963 msg->hdr.flags & RXRPC_LAST_PACKET ? 'l' : '-', 964 msg->hdr.flags & RXRPC_REQUEST_ACK ? 'r' : '-', 965 msg->hdr.flags & RXRPC_CLIENT_INITIATED ? 'C' : 'S' 966 ); 967 968 switch (msg->hdr.type) { 969 /* deal with data packets */ 970 case RXRPC_PACKET_TYPE_DATA: 971 /* ACK the packet if necessary */ 972 switch (rxrpc_call_generate_ACK(call, &msg->hdr, 973 NULL)) { 974 case 0: /* useful packet */ 975 rxrpc_call_receive_data_packet(call, msg); 976 break; 977 case 1: /* duplicate or out-of-window packet */ 978 break; 979 default: 980 rxrpc_put_message(msg); 981 goto out; 982 } 983 break; 984 985 /* deal with ACK packets */ 986 case RXRPC_PACKET_TYPE_ACK: 987 rxrpc_call_receive_ack_packet(call, msg); 988 break; 989 990 /* deal with abort packets */ 991 case RXRPC_PACKET_TYPE_ABORT: { 992 __be32 _dbuf, *dp; 993 994 dp = skb_header_pointer(msg->pkt, msg->offset, 995 sizeof(_dbuf), &_dbuf); 996 if (dp == NULL) 997 printk("Rx Received short ABORT packet\n"); 998 999 _proto("Rx Received Call ABORT { data=%d }", 1000 (dp ? ntohl(*dp) : 0)); 1001 1002 spin_lock(&call->lock); 1003 call->app_call_state = RXRPC_CSTATE_ERROR; 1004 call->app_err_state = RXRPC_ESTATE_PEER_ABORT; 1005 call->app_abort_code = (dp ? ntohl(*dp) : 0); 1006 call->app_errno = -ECONNABORTED; 1007 call->app_mark = RXRPC_APP_MARK_EOF; 1008 call->app_read_buf = NULL; 1009 call->app_async_read = 0; 1010 1011 /* ask the app to translate the error code */ 1012 call->app_aemap_func(call); 1013 _state(call); 1014 spin_unlock(&call->lock); 1015 call->app_error_func(call); 1016 break; 1017 } 1018 default: 1019 /* deal with other packet types */ 1020 _proto("Rx Unsupported packet type %u (#%u)", 1021 msg->hdr.type, msg->seq); 1022 break; 1023 } 1024 1025 rxrpc_put_message(msg); 1026 } 1027 1028 out: 1029 rxrpc_put_call(call); 1030 _leave(""); 1031} /* end rxrpc_call_receive_packet() */ 1032 1033/*****************************************************************************/ 1034/* 1035 * process next data packet 1036 * - as the next data packet arrives: 1037 * - it is queued on app_readyq _if_ it is the next one expected 1038 * (app_ready_seq+1) 1039 * - it is queued on app_unreadyq _if_ it is not the next one expected 1040 * - if a packet placed on app_readyq completely fills a hole leading up to 1041 * the first packet on app_unreadyq, then packets now in sequence are 1042 * tranferred to app_readyq 1043 * - the application layer can only see packets on app_readyq 1044 * (app_ready_qty bytes) 1045 * - the application layer is prodded every time a new packet arrives 1046 */ 1047static void rxrpc_call_receive_data_packet(struct rxrpc_call *call, 1048 struct rxrpc_message *msg) 1049{ 1050 const struct rxrpc_operation *optbl, *op; 1051 struct rxrpc_message *pmsg; 1052 struct list_head *_p; 1053 int ret, lo, hi, rmtimo; 1054 __be32 opid; 1055 1056 _enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq); 1057 1058 rxrpc_get_message(msg); 1059 1060 /* add to the unready queue if we'd have to create a hole in the ready 1061 * queue otherwise */ 1062 if (msg->seq != call->app_ready_seq + 1) { 1063 _debug("Call add packet %d to unreadyq", msg->seq); 1064 1065 /* insert in seq order */ 1066 list_for_each(_p, &call->app_unreadyq) { 1067 pmsg = list_entry(_p, struct rxrpc_message, link); 1068 if (pmsg->seq > msg->seq) 1069 break; 1070 } 1071 1072 list_add_tail(&msg->link, _p); 1073 1074 _leave(" [unreadyq]"); 1075 return; 1076 } 1077 1078 /* next in sequence - simply append into the call's ready queue */ 1079 _debug("Call add packet %d to readyq (+%Zd => %Zd bytes)", 1080 msg->seq, msg->dsize, call->app_ready_qty); 1081 1082 spin_lock(&call->lock); 1083 call->app_ready_seq = msg->seq; 1084 call->app_ready_qty += msg->dsize; 1085 list_add_tail(&msg->link, &call->app_readyq); 1086 1087 /* move unready packets to the readyq if we got rid of a hole */ 1088 while (!list_empty(&call->app_unreadyq)) { 1089 pmsg = list_entry(call->app_unreadyq.next, 1090 struct rxrpc_message, link); 1091 1092 if (pmsg->seq != call->app_ready_seq + 1) 1093 break; 1094 1095 /* next in sequence - just move list-to-list */ 1096 _debug("Call transfer packet %d to readyq (+%Zd => %Zd bytes)", 1097 pmsg->seq, pmsg->dsize, call->app_ready_qty); 1098 1099 call->app_ready_seq = pmsg->seq; 1100 call->app_ready_qty += pmsg->dsize; 1101 list_move_tail(&pmsg->link, &call->app_readyq); 1102 } 1103 1104 /* see if we've got the last packet yet */ 1105 if (!list_empty(&call->app_readyq)) { 1106 pmsg = list_entry(call->app_readyq.prev, 1107 struct rxrpc_message, link); 1108 if (pmsg->hdr.flags & RXRPC_LAST_PACKET) { 1109 call->app_last_rcv = 1; 1110 _debug("Last packet on readyq"); 1111 } 1112 } 1113 1114 switch (call->app_call_state) { 1115 /* do nothing if call already aborted */ 1116 case RXRPC_CSTATE_ERROR: 1117 spin_unlock(&call->lock); 1118 _leave(" [error]"); 1119 return; 1120 1121 /* extract the operation ID from an incoming call if that's not 1122 * yet been done */ 1123 case RXRPC_CSTATE_SRVR_RCV_OPID: 1124 spin_unlock(&call->lock); 1125 1126 /* handle as yet insufficient data for the operation ID */ 1127 if (call->app_ready_qty < 4) { 1128 if (call->app_last_rcv) 1129 /* trouble - last packet seen */ 1130 rxrpc_call_abort(call, -EINVAL); 1131 1132 _leave(""); 1133 return; 1134 } 1135 1136 /* pull the operation ID out of the buffer */ 1137 ret = rxrpc_call_read_data(call, &opid, sizeof(opid), 0); 1138 if (ret < 0) { 1139 printk("Unexpected error from read-data: %d\n", ret); 1140 if (call->app_call_state != RXRPC_CSTATE_ERROR) 1141 rxrpc_call_abort(call, ret); 1142 _leave(""); 1143 return; 1144 } 1145 call->app_opcode = ntohl(opid); 1146 1147 /* locate the operation in the available ops table */ 1148 optbl = call->conn->service->ops_begin; 1149 lo = 0; 1150 hi = call->conn->service->ops_end - optbl; 1151 1152 while (lo < hi) { 1153 int mid = (hi + lo) / 2; 1154 op = &optbl[mid]; 1155 if (call->app_opcode == op->id) 1156 goto found_op; 1157 if (call->app_opcode > op->id) 1158 lo = mid + 1; 1159 else 1160 hi = mid; 1161 } 1162 1163 /* search failed */ 1164 kproto("Rx Client requested operation %d from %s service", 1165 call->app_opcode, call->conn->service->name); 1166 rxrpc_call_abort(call, -EINVAL); 1167 _leave(" [inval]"); 1168 return; 1169 1170 found_op: 1171 _proto("Rx Client requested operation %s from %s service", 1172 op->name, call->conn->service->name); 1173 1174 /* we're now waiting for the argument block (unless the call 1175 * was aborted) */ 1176 spin_lock(&call->lock); 1177 if (call->app_call_state == RXRPC_CSTATE_SRVR_RCV_OPID || 1178 call->app_call_state == RXRPC_CSTATE_SRVR_SND_REPLY) { 1179 if (!call->app_last_rcv) 1180 call->app_call_state = 1181 RXRPC_CSTATE_SRVR_RCV_ARGS; 1182 else if (call->app_ready_qty > 0) 1183 call->app_call_state = 1184 RXRPC_CSTATE_SRVR_GOT_ARGS; 1185 else 1186 call->app_call_state = 1187 RXRPC_CSTATE_SRVR_SND_REPLY; 1188 call->app_mark = op->asize; 1189 call->app_user = op->user; 1190 } 1191 spin_unlock(&call->lock); 1192 1193 _state(call); 1194 break; 1195 1196 case RXRPC_CSTATE_SRVR_RCV_ARGS: 1197 /* change state if just received last packet of arg block */ 1198 if (call->app_last_rcv) 1199 call->app_call_state = RXRPC_CSTATE_SRVR_GOT_ARGS; 1200 spin_unlock(&call->lock); 1201 1202 _state(call); 1203 break; 1204 1205 case RXRPC_CSTATE_CLNT_RCV_REPLY: 1206 /* change state if just received last packet of reply block */ 1207 rmtimo = 0; 1208 if (call->app_last_rcv) { 1209 call->app_call_state = RXRPC_CSTATE_CLNT_GOT_REPLY; 1210 rmtimo = 1; 1211 } 1212 spin_unlock(&call->lock); 1213 1214 if (rmtimo) { 1215 del_timer_sync(&call->acks_timeout); 1216 del_timer_sync(&call->rcv_timeout); 1217 del_timer_sync(&call->ackr_dfr_timo); 1218 } 1219 1220 _state(call); 1221 break; 1222 1223 default: 1224 /* deal with data reception in an unexpected state */ 1225 printk("Unexpected state [[[ %u ]]]\n", call->app_call_state); 1226 __rxrpc_call_abort(call, -EBADMSG); 1227 _leave(""); 1228 return; 1229 } 1230 1231 if (call->app_call_state == RXRPC_CSTATE_CLNT_RCV_REPLY && 1232 call->app_last_rcv) 1233 BUG(); 1234 1235 /* otherwise just invoke the data function whenever we can satisfy its desire for more 1236 * data 1237 */ 1238 _proto("Rx Received Op Data: st=%u qty=%Zu mk=%Zu%s", 1239 call->app_call_state, call->app_ready_qty, call->app_mark, 1240 call->app_last_rcv ? " last-rcvd" : ""); 1241 1242 spin_lock(&call->lock); 1243 1244 ret = __rxrpc_call_read_data(call); 1245 switch (ret) { 1246 case 0: 1247 spin_unlock(&call->lock); 1248 call->app_attn_func(call); 1249 break; 1250 case -EAGAIN: 1251 spin_unlock(&call->lock); 1252 break; 1253 case -ECONNABORTED: 1254 spin_unlock(&call->lock); 1255 break; 1256 default: 1257 __rxrpc_call_abort(call, ret); 1258 break; 1259 } 1260 1261 _state(call); 1262 1263 _leave(""); 1264 1265} /* end rxrpc_call_receive_data_packet() */ 1266 1267/*****************************************************************************/ 1268/* 1269 * received an ACK packet 1270 */ 1271static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call, 1272 struct rxrpc_message *msg) 1273{ 1274 struct rxrpc_ackpacket _ack, *ap; 1275 rxrpc_serial_net_t serial; 1276 rxrpc_seq_t seq; 1277 int ret; 1278 1279 _enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq); 1280 1281 /* extract the basic ACK record */ 1282 ap = skb_header_pointer(msg->pkt, msg->offset, sizeof(_ack), &_ack); 1283 if (ap == NULL) { 1284 printk("Rx Received short ACK packet\n"); 1285 return; 1286 } 1287 msg->offset += sizeof(_ack); 1288 1289 serial = ap->serial; 1290 seq = ntohl(ap->firstPacket); 1291 1292 _proto("Rx Received ACK %%%d { b=%hu m=%hu f=%u p=%u s=%u r=%s n=%u }", 1293 ntohl(msg->hdr.serial), 1294 ntohs(ap->bufferSpace), 1295 ntohs(ap->maxSkew), 1296 seq, 1297 ntohl(ap->previousPacket), 1298 ntohl(serial), 1299 rxrpc_acks[ap->reason], 1300 call->ackr.nAcks 1301 ); 1302 1303 /* check the other side isn't ACK'ing a sequence number I haven't sent 1304 * yet */ 1305 if (ap->nAcks > 0 && 1306 (seq > call->snd_seq_count || 1307 seq + ap->nAcks - 1 > call->snd_seq_count)) { 1308 printk("Received ACK (#%u-#%u) for unsent packet\n", 1309 seq, seq + ap->nAcks - 1); 1310 rxrpc_call_abort(call, -EINVAL); 1311 _leave(""); 1312 return; 1313 } 1314 1315 /* deal with RTT calculation */ 1316 if (serial) { 1317 struct rxrpc_message *rttmsg; 1318 1319 /* find the prompting packet */ 1320 spin_lock(&call->lock); 1321 if (call->snd_ping && call->snd_ping->hdr.serial == serial) { 1322 /* it was a ping packet */ 1323 rttmsg = call->snd_ping; 1324 call->snd_ping = NULL; 1325 spin_unlock(&call->lock); 1326 1327 if (rttmsg) { 1328 rttmsg->rttdone = 1; 1329 rxrpc_peer_calculate_rtt(call->conn->peer, 1330 rttmsg, msg); 1331 rxrpc_put_message(rttmsg); 1332 } 1333 } 1334 else { 1335 struct list_head *_p; 1336 1337 /* it ought to be a data packet - look in the pending 1338 * ACK list */ 1339 list_for_each(_p, &call->acks_pendq) { 1340 rttmsg = list_entry(_p, struct rxrpc_message, 1341 link); 1342 if (rttmsg->hdr.serial == serial) { 1343 if (rttmsg->rttdone) 1344 /* never do RTT twice without 1345 * resending */ 1346 break; 1347 1348 rttmsg->rttdone = 1; 1349 rxrpc_peer_calculate_rtt( 1350 call->conn->peer, rttmsg, msg); 1351 break; 1352 } 1353 } 1354 spin_unlock(&call->lock); 1355 } 1356 } 1357 1358 switch (ap->reason) { 1359 /* deal with negative/positive acknowledgement of data 1360 * packets */ 1361 case RXRPC_ACK_REQUESTED: 1362 case RXRPC_ACK_DELAY: 1363 case RXRPC_ACK_IDLE: 1364 rxrpc_call_definitively_ACK(call, seq - 1); 1365 1366 case RXRPC_ACK_DUPLICATE: 1367 case RXRPC_ACK_OUT_OF_SEQUENCE: 1368 case RXRPC_ACK_EXCEEDS_WINDOW: 1369 call->snd_resend_cnt = 0; 1370 ret = rxrpc_call_record_ACK(call, msg, seq, ap->nAcks); 1371 if (ret < 0) 1372 rxrpc_call_abort(call, ret); 1373 break; 1374 1375 /* respond to ping packets immediately */ 1376 case RXRPC_ACK_PING: 1377 rxrpc_call_generate_ACK(call, &msg->hdr, ap); 1378 break; 1379 1380 /* only record RTT on ping response packets */ 1381 case RXRPC_ACK_PING_RESPONSE: 1382 if (call->snd_ping) { 1383 struct rxrpc_message *rttmsg; 1384 1385 /* only do RTT stuff if the response matches the 1386 * retained ping */ 1387 rttmsg = NULL; 1388 spin_lock(&call->lock); 1389 if (call->snd_ping && 1390 call->snd_ping->hdr.serial == ap->serial) { 1391 rttmsg = call->snd_ping; 1392 call->snd_ping = NULL; 1393 } 1394 spin_unlock(&call->lock); 1395 1396 if (rttmsg) { 1397 rttmsg->rttdone = 1; 1398 rxrpc_peer_calculate_rtt(call->conn->peer, 1399 rttmsg, msg); 1400 rxrpc_put_message(rttmsg); 1401 } 1402 } 1403 break; 1404 1405 default: 1406 printk("Unsupported ACK reason %u\n", ap->reason); 1407 break; 1408 } 1409 1410 _leave(""); 1411} /* end rxrpc_call_receive_ack_packet() */ 1412 1413/*****************************************************************************/ 1414/* 1415 * record definitive ACKs for all messages up to and including the one with the 1416 * 'highest' seq 1417 */ 1418static void rxrpc_call_definitively_ACK(struct rxrpc_call *call, 1419 rxrpc_seq_t highest) 1420{ 1421 struct rxrpc_message *msg; 1422 int now_complete; 1423 1424 _enter("%p{ads=%u},%u", call, call->acks_dftv_seq, highest); 1425 1426 while (call->acks_dftv_seq < highest) { 1427 call->acks_dftv_seq++; 1428 1429 _proto("Definitive ACK on packet #%u", call->acks_dftv_seq); 1430 1431 /* discard those at front of queue until message with highest 1432 * ACK is found */ 1433 spin_lock(&call->lock); 1434 msg = NULL; 1435 if (!list_empty(&call->acks_pendq)) { 1436 msg = list_entry(call->acks_pendq.next, 1437 struct rxrpc_message, link); 1438 list_del_init(&msg->link); /* dequeue */ 1439 if (msg->state == RXRPC_MSG_SENT) 1440 call->acks_pend_cnt--; 1441 } 1442 spin_unlock(&call->lock); 1443 1444 /* insanity check */ 1445 if (!msg) 1446 panic("%s(): acks_pendq unexpectedly empty\n", 1447 __FUNCTION__); 1448 1449 if (msg->seq != call->acks_dftv_seq) 1450 panic("%s(): Packet #%u expected at front of acks_pendq" 1451 " (#%u found)\n", 1452 __FUNCTION__, call->acks_dftv_seq, msg->seq); 1453 1454 /* discard the message */ 1455 msg->state = RXRPC_MSG_DONE; 1456 rxrpc_put_message(msg); 1457 } 1458 1459 /* if all sent packets are definitively ACK'd then prod any sleepers just in case */ 1460 now_complete = 0; 1461 spin_lock(&call->lock); 1462 if (call->acks_dftv_seq == call->snd_seq_count) { 1463 if (call->app_call_state != RXRPC_CSTATE_COMPLETE) { 1464 call->app_call_state = RXRPC_CSTATE_COMPLETE; 1465 _state(call); 1466 now_complete = 1; 1467 } 1468 } 1469 spin_unlock(&call->lock); 1470 1471 if (now_complete) { 1472 del_timer_sync(&call->acks_timeout); 1473 del_timer_sync(&call->rcv_timeout); 1474 del_timer_sync(&call->ackr_dfr_timo); 1475 call->app_attn_func(call); 1476 } 1477 1478 _leave(""); 1479} /* end rxrpc_call_definitively_ACK() */ 1480 1481/*****************************************************************************/ 1482/* 1483 * record the specified amount of ACKs/NAKs 1484 */ 1485static int rxrpc_call_record_ACK(struct rxrpc_call *call, 1486 struct rxrpc_message *msg, 1487 rxrpc_seq_t seq, 1488 size_t count) 1489{ 1490 struct rxrpc_message *dmsg; 1491 struct list_head *_p; 1492 rxrpc_seq_t highest; 1493 unsigned ix; 1494 size_t chunk; 1495 char resend, now_complete; 1496 u8 acks[16]; 1497 1498 _enter("%p{apc=%u ads=%u},%p,%u,%Zu", 1499 call, call->acks_pend_cnt, call->acks_dftv_seq, 1500 msg, seq, count); 1501 1502 /* handle re-ACK'ing of definitively ACK'd packets (may be out-of-order 1503 * ACKs) */ 1504 if (seq <= call->acks_dftv_seq) { 1505 unsigned delta = call->acks_dftv_seq - seq; 1506 1507 if (count <= delta) { 1508 _leave(" = 0 [all definitively ACK'd]"); 1509 return 0; 1510 } 1511 1512 seq += delta; 1513 count -= delta; 1514 msg->offset += delta; 1515 } 1516 1517 highest = seq + count - 1; 1518 resend = 0; 1519 while (count > 0) { 1520 /* extract up to 16 ACK slots at a time */ 1521 chunk = min(count, sizeof(acks)); 1522 count -= chunk; 1523 1524 memset(acks, 2, sizeof(acks)); 1525 1526 if (skb_copy_bits(msg->pkt, msg->offset, &acks, chunk) < 0) { 1527 printk("Rx Received short ACK packet\n"); 1528 _leave(" = -EINVAL"); 1529 return -EINVAL; 1530 } 1531 msg->offset += chunk; 1532 1533 /* check that the ACK set is valid */ 1534 for (ix = 0; ix < chunk; ix++) { 1535 switch (acks[ix]) { 1536 case RXRPC_ACK_TYPE_ACK: 1537 break; 1538 case RXRPC_ACK_TYPE_NACK: 1539 resend = 1; 1540 break; 1541 default: 1542 printk("Rx Received unsupported ACK state" 1543 " %u\n", acks[ix]); 1544 _leave(" = -EINVAL"); 1545 return -EINVAL; 1546 } 1547 } 1548 1549 _proto("Rx ACK of packets #%u-#%u " 1550 "[%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c] (pend=%u)", 1551 seq, (unsigned) (seq + chunk - 1), 1552 _acktype[acks[0x0]], 1553 _acktype[acks[0x1]], 1554 _acktype[acks[0x2]], 1555 _acktype[acks[0x3]], 1556 _acktype[acks[0x4]], 1557 _acktype[acks[0x5]], 1558 _acktype[acks[0x6]], 1559 _acktype[acks[0x7]], 1560 _acktype[acks[0x8]], 1561 _acktype[acks[0x9]], 1562 _acktype[acks[0xA]], 1563 _acktype[acks[0xB]], 1564 _acktype[acks[0xC]], 1565 _acktype[acks[0xD]], 1566 _acktype[acks[0xE]], 1567 _acktype[acks[0xF]], 1568 call->acks_pend_cnt 1569 ); 1570 1571 /* mark the packets in the ACK queue as being provisionally 1572 * ACK'd */ 1573 ix = 0; 1574 spin_lock(&call->lock); 1575 1576 /* find the first packet ACK'd/NAK'd here */ 1577 list_for_each(_p, &call->acks_pendq) { 1578 dmsg = list_entry(_p, struct rxrpc_message, link); 1579 if (dmsg->seq == seq) 1580 goto found_first; 1581 _debug("- %u: skipping #%u", ix, dmsg->seq); 1582 } 1583 goto bad_queue; 1584 1585 found_first: 1586 do { 1587 _debug("- %u: processing #%u (%c) apc=%u", 1588 ix, dmsg->seq, _acktype[acks[ix]], 1589 call->acks_pend_cnt); 1590 1591 if (acks[ix] == RXRPC_ACK_TYPE_ACK) { 1592 if (dmsg->state == RXRPC_MSG_SENT) 1593 call->acks_pend_cnt--; 1594 dmsg->state = RXRPC_MSG_ACKED; 1595 } 1596 else { 1597 if (dmsg->state == RXRPC_MSG_ACKED) 1598 call->acks_pend_cnt++; 1599 dmsg->state = RXRPC_MSG_SENT; 1600 } 1601 ix++; 1602 seq++; 1603 1604 _p = dmsg->link.next; 1605 dmsg = list_entry(_p, struct rxrpc_message, link); 1606 } while(ix < chunk && 1607 _p != &call->acks_pendq && 1608 dmsg->seq == seq); 1609 1610 if (ix < chunk) 1611 goto bad_queue; 1612 1613 spin_unlock(&call->lock); 1614 } 1615 1616 if (resend) 1617 rxrpc_call_resend(call, highest); 1618 1619 /* if all packets are provisionally ACK'd, then wake up anyone who's 1620 * waiting for that */ 1621 now_complete = 0; 1622 spin_lock(&call->lock); 1623 if (call->acks_pend_cnt == 0) { 1624 if (call->app_call_state == RXRPC_CSTATE_SRVR_RCV_FINAL_ACK) { 1625 call->app_call_state = RXRPC_CSTATE_COMPLETE; 1626 _state(call); 1627 } 1628 now_complete = 1; 1629 } 1630 spin_unlock(&call->lock); 1631 1632 if (now_complete) { 1633 _debug("- wake up waiters"); 1634 del_timer_sync(&call->acks_timeout); 1635 del_timer_sync(&call->rcv_timeout); 1636 del_timer_sync(&call->ackr_dfr_timo); 1637 call->app_attn_func(call); 1638 } 1639 1640 _leave(" = 0 (apc=%u)", call->acks_pend_cnt); 1641 return 0; 1642 1643 bad_queue: 1644 panic("%s(): acks_pendq in bad state (packet #%u absent)\n", 1645 __FUNCTION__, seq); 1646 1647} /* end rxrpc_call_record_ACK() */ 1648 1649/*****************************************************************************/ 1650/* 1651 * transfer data from the ready packet queue to the asynchronous read buffer 1652 * - since this func is the only one going to look at packets queued on 1653 * app_readyq, we don't need a lock to modify or access them, only to modify 1654 * the queue pointers 1655 * - called with call->lock held 1656 * - the buffer must be in kernel space 1657 * - returns: 1658 * 0 if buffer filled 1659 * -EAGAIN if buffer not filled and more data to come 1660 * -EBADMSG if last packet received and insufficient data left 1661 * -ECONNABORTED if the call has in an error state 1662 */ 1663static int __rxrpc_call_read_data(struct rxrpc_call *call) 1664{ 1665 struct rxrpc_message *msg; 1666 size_t qty; 1667 int ret; 1668 1669 _enter("%p{as=%d buf=%p qty=%Zu/%Zu}", 1670 call, 1671 call->app_async_read, call->app_read_buf, 1672 call->app_ready_qty, call->app_mark); 1673 1674 /* check the state */ 1675 switch (call->app_call_state) { 1676 case RXRPC_CSTATE_SRVR_RCV_ARGS: 1677 case RXRPC_CSTATE_CLNT_RCV_REPLY: 1678 if (call->app_last_rcv) { 1679 printk("%s(%p,%p,%Zd):" 1680 " Inconsistent call state (%s, last pkt)", 1681 __FUNCTION__, 1682 call, call->app_read_buf, call->app_mark, 1683 rxrpc_call_states[call->app_call_state]); 1684 BUG(); 1685 } 1686 break; 1687 1688 case RXRPC_CSTATE_SRVR_RCV_OPID: 1689 case RXRPC_CSTATE_SRVR_GOT_ARGS: 1690 case RXRPC_CSTATE_CLNT_GOT_REPLY: 1691 break; 1692 1693 case RXRPC_CSTATE_SRVR_SND_REPLY: 1694 if (!call->app_last_rcv) { 1695 printk("%s(%p,%p,%Zd):" 1696 " Inconsistent call state (%s, not last pkt)", 1697 __FUNCTION__, 1698 call, call->app_read_buf, call->app_mark, 1699 rxrpc_call_states[call->app_call_state]); 1700 BUG(); 1701 } 1702 _debug("Trying to read data from call in SND_REPLY state"); 1703 break; 1704 1705 case RXRPC_CSTATE_ERROR: 1706 _leave(" = -ECONNABORTED"); 1707 return -ECONNABORTED; 1708 1709 default: 1710 printk("reading in unexpected state [[[ %u ]]]\n", 1711 call->app_call_state); 1712 BUG(); 1713 } 1714 1715 /* handle the case of not having an async buffer */ 1716 if (!call->app_async_read) { 1717 if (call->app_mark == RXRPC_APP_MARK_EOF) { 1718 ret = call->app_last_rcv ? 0 : -EAGAIN; 1719 } 1720 else { 1721 if (call->app_mark >= call->app_ready_qty) { 1722 call->app_mark = RXRPC_APP_MARK_EOF; 1723 ret = 0; 1724 } 1725 else { 1726 ret = call->app_last_rcv ? -EBADMSG : -EAGAIN; 1727 } 1728 } 1729 1730 _leave(" = %d [no buf]", ret); 1731 return 0; 1732 } 1733 1734 while (!list_empty(&call->app_readyq) && call->app_mark > 0) { 1735 msg = list_entry(call->app_readyq.next, 1736 struct rxrpc_message, link); 1737 1738 /* drag as much data as we need out of this packet */ 1739 qty = min(call->app_mark, msg->dsize); 1740 1741 _debug("reading %Zu from skb=%p off=%lu", 1742 qty, msg->pkt, msg->offset); 1743 1744 if (call->app_read_buf) 1745 if (skb_copy_bits(msg->pkt, msg->offset, 1746 call->app_read_buf, qty) < 0) 1747 panic("%s: Failed to copy data from packet:" 1748 " (%p,%p,%Zd)", 1749 __FUNCTION__, 1750 call, call->app_read_buf, qty); 1751 1752 /* if that packet is now empty, discard it */ 1753 call->app_ready_qty -= qty; 1754 msg->dsize -= qty; 1755 1756 if (msg->dsize == 0) { 1757 list_del_init(&msg->link); 1758 rxrpc_put_message(msg); 1759 } 1760 else { 1761 msg->offset += qty; 1762 } 1763 1764 call->app_mark -= qty; 1765 if (call->app_read_buf) 1766 call->app_read_buf += qty; 1767 } 1768 1769 if (call->app_mark == 0) { 1770 call->app_async_read = 0; 1771 call->app_mark = RXRPC_APP_MARK_EOF; 1772 call->app_read_buf = NULL; 1773 1774 /* adjust the state if used up all packets */ 1775 if (list_empty(&call->app_readyq) && call->app_last_rcv) { 1776 switch (call->app_call_state) { 1777 case RXRPC_CSTATE_SRVR_RCV_OPID: 1778 call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY; 1779 call->app_mark = RXRPC_APP_MARK_EOF; 1780 _state(call); 1781 del_timer_sync(&call->rcv_timeout); 1782 break; 1783 case RXRPC_CSTATE_SRVR_GOT_ARGS: 1784 call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY; 1785 _state(call); 1786 del_timer_sync(&call->rcv_timeout); 1787 break; 1788 default: 1789 call->app_call_state = RXRPC_CSTATE_COMPLETE; 1790 _state(call); 1791 del_timer_sync(&call->acks_timeout); 1792 del_timer_sync(&call->ackr_dfr_timo); 1793 del_timer_sync(&call->rcv_timeout); 1794 break; 1795 } 1796 } 1797 1798 _leave(" = 0"); 1799 return 0; 1800 } 1801 1802 if (call->app_last_rcv) { 1803 _debug("Insufficient data (%Zu/%Zu)", 1804 call->app_ready_qty, call->app_mark); 1805 call->app_async_read = 0; 1806 call->app_mark = RXRPC_APP_MARK_EOF; 1807 call->app_read_buf = NULL; 1808 1809 _leave(" = -EBADMSG"); 1810 return -EBADMSG; 1811 } 1812 1813 _leave(" = -EAGAIN"); 1814 return -EAGAIN; 1815} /* end __rxrpc_call_read_data() */ 1816 1817/*****************************************************************************/ 1818/* 1819 * attempt to read the specified amount of data from the call's ready queue 1820 * into the buffer provided 1821 * - since this func is the only one going to look at packets queued on 1822 * app_readyq, we don't need a lock to modify or access them, only to modify 1823 * the queue pointers 1824 * - if the buffer pointer is NULL, then data is merely drained, not copied 1825 * - if flags&RXRPC_CALL_READ_BLOCK, then the function will wait until there is 1826 * enough data or an error will be generated 1827 * - note that the caller must have added the calling task to the call's wait 1828 * queue beforehand 1829 * - if flags&RXRPC_CALL_READ_ALL, then an error will be generated if this 1830 * function doesn't read all available data 1831 */ 1832int rxrpc_call_read_data(struct rxrpc_call *call, 1833 void *buffer, size_t size, int flags) 1834{ 1835 int ret; 1836 1837 _enter("%p{arq=%Zu},%p,%Zd,%x", 1838 call, call->app_ready_qty, buffer, size, flags); 1839 1840 spin_lock(&call->lock); 1841 1842 if (unlikely(!!call->app_read_buf)) { 1843 spin_unlock(&call->lock); 1844 _leave(" = -EBUSY"); 1845 return -EBUSY; 1846 } 1847 1848 call->app_mark = size; 1849 call->app_read_buf = buffer; 1850 call->app_async_read = 1; 1851 call->app_read_count++; 1852 1853 /* read as much data as possible */ 1854 ret = __rxrpc_call_read_data(call); 1855 switch (ret) { 1856 case 0: 1857 if (flags & RXRPC_CALL_READ_ALL && 1858 (!call->app_last_rcv || call->app_ready_qty > 0)) { 1859 _leave(" = -EBADMSG"); 1860 __rxrpc_call_abort(call, -EBADMSG); 1861 return -EBADMSG; 1862 } 1863 1864 spin_unlock(&call->lock); 1865 call->app_attn_func(call); 1866 _leave(" = 0"); 1867 return ret; 1868 1869 case -ECONNABORTED: 1870 spin_unlock(&call->lock); 1871 _leave(" = %d [aborted]", ret); 1872 return ret; 1873 1874 default: 1875 __rxrpc_call_abort(call, ret); 1876 _leave(" = %d", ret); 1877 return ret; 1878 1879 case -EAGAIN: 1880 spin_unlock(&call->lock); 1881 1882 if (!(flags & RXRPC_CALL_READ_BLOCK)) { 1883 _leave(" = -EAGAIN"); 1884 return -EAGAIN; 1885 } 1886 1887 /* wait for the data to arrive */ 1888 _debug("blocking for data arrival"); 1889 1890 for (;;) { 1891 set_current_state(TASK_INTERRUPTIBLE); 1892 if (!call->app_async_read || signal_pending(current)) 1893 break; 1894 schedule(); 1895 } 1896 set_current_state(TASK_RUNNING); 1897 1898 if (signal_pending(current)) { 1899 _leave(" = -EINTR"); 1900 return -EINTR; 1901 } 1902 1903 if (call->app_call_state == RXRPC_CSTATE_ERROR) { 1904 _leave(" = -ECONNABORTED"); 1905 return -ECONNABORTED; 1906 } 1907 1908 _leave(" = 0"); 1909 return 0; 1910 } 1911 1912} /* end rxrpc_call_read_data() */ 1913 1914/*****************************************************************************/ 1915/* 1916 * write data to a call 1917 * - the data may not be sent immediately if it doesn't fill a buffer 1918 * - if we can't queue all the data for buffering now, siov[] will have been 1919 * adjusted to take account of what has been sent 1920 */ 1921int rxrpc_call_write_data(struct rxrpc_call *call, 1922 size_t sioc, 1923 struct kvec *siov, 1924 u8 rxhdr_flags, 1925 gfp_t alloc_flags, 1926 int dup_data, 1927 size_t *size_sent) 1928{ 1929 struct rxrpc_message *msg; 1930 struct kvec *sptr; 1931 size_t space, size, chunk, tmp; 1932 char *buf; 1933 int ret; 1934 1935 _enter("%p,%Zu,%p,%02x,%x,%d,%p", 1936 call, sioc, siov, rxhdr_flags, alloc_flags, dup_data, 1937 size_sent); 1938 1939 *size_sent = 0; 1940 size = 0; 1941 ret = -EINVAL; 1942 1943 /* can't send more if we've sent last packet from this end */ 1944 switch (call->app_call_state) { 1945 case RXRPC_CSTATE_SRVR_SND_REPLY: 1946 case RXRPC_CSTATE_CLNT_SND_ARGS: 1947 break; 1948 case RXRPC_CSTATE_ERROR: 1949 ret = call->app_errno; 1950 default: 1951 goto out; 1952 } 1953 1954 /* calculate how much data we've been given */ 1955 sptr = siov; 1956 for (; sioc > 0; sptr++, sioc--) { 1957 if (!sptr->iov_len) 1958 continue; 1959 1960 if (!sptr->iov_base) 1961 goto out; 1962 1963 size += sptr->iov_len; 1964 } 1965 1966 _debug("- size=%Zu mtu=%Zu", size, call->conn->mtu_size); 1967 1968 do { 1969 /* make sure there's a message under construction */ 1970 if (!call->snd_nextmsg) { 1971 /* no - allocate a message with no data yet attached */ 1972 ret = rxrpc_conn_newmsg(call->conn, call, 1973 RXRPC_PACKET_TYPE_DATA, 1974 0, NULL, alloc_flags, 1975 &call->snd_nextmsg); 1976 if (ret < 0) 1977 goto out; 1978 _debug("- allocated new message [ds=%Zu]", 1979 call->snd_nextmsg->dsize); 1980 } 1981 1982 msg = call->snd_nextmsg; 1983 msg->hdr.flags |= rxhdr_flags; 1984 1985 /* deal with zero-length terminal packet */ 1986 if (size == 0) { 1987 if (rxhdr_flags & RXRPC_LAST_PACKET) { 1988 ret = rxrpc_call_flush(call); 1989 if (ret < 0) 1990 goto out; 1991 } 1992 break; 1993 } 1994 1995 /* work out how much space current packet has available */ 1996 space = call->conn->mtu_size - msg->dsize; 1997 chunk = min(space, size); 1998 1999 _debug("- [before] space=%Zu chunk=%Zu", space, chunk); 2000 2001 while (!siov->iov_len) 2002 siov++; 2003 2004 /* if we are going to have to duplicate the data then coalesce 2005 * it too */ 2006 if (dup_data) { 2007 /* don't allocate more that 1 page at a time */ 2008 if (chunk > PAGE_SIZE) 2009 chunk = PAGE_SIZE; 2010 2011 /* allocate a data buffer and attach to the message */ 2012 buf = kmalloc(chunk, alloc_flags); 2013 if (unlikely(!buf)) { 2014 if (msg->dsize == 2015 sizeof(struct rxrpc_header)) { 2016 /* discard an empty msg and wind back 2017 * the seq counter */ 2018 rxrpc_put_message(msg); 2019 call->snd_nextmsg = NULL; 2020 call->snd_seq_count--; 2021 } 2022 2023 ret = -ENOMEM; 2024 goto out; 2025 } 2026 2027 tmp = msg->dcount++; 2028 set_bit(tmp, &msg->dfree); 2029 msg->data[tmp].iov_base = buf; 2030 msg->data[tmp].iov_len = chunk; 2031 msg->dsize += chunk; 2032 *size_sent += chunk; 2033 size -= chunk; 2034 2035 /* load the buffer with data */ 2036 while (chunk > 0) { 2037 tmp = min(chunk, siov->iov_len); 2038 memcpy(buf, siov->iov_base, tmp); 2039 buf += tmp; 2040 siov->iov_base += tmp; 2041 siov->iov_len -= tmp; 2042 if (!siov->iov_len) 2043 siov++; 2044 chunk -= tmp; 2045 } 2046 } 2047 else { 2048 /* we want to attach the supplied buffers directly */ 2049 while (chunk > 0 && 2050 msg->dcount < RXRPC_MSG_MAX_IOCS) { 2051 tmp = msg->dcount++; 2052 msg->data[tmp].iov_base = siov->iov_base; 2053 msg->data[tmp].iov_len = siov->iov_len; 2054 msg->dsize += siov->iov_len; 2055 *size_sent += siov->iov_len; 2056 size -= siov->iov_len; 2057 chunk -= siov->iov_len; 2058 siov++; 2059 } 2060 } 2061 2062 _debug("- [loaded] chunk=%Zu size=%Zu", chunk, size); 2063 2064 /* dispatch the message when full, final or requesting ACK */ 2065 if (msg->dsize >= call->conn->mtu_size || rxhdr_flags) { 2066 ret = rxrpc_call_flush(call); 2067 if (ret < 0) 2068 goto out; 2069 } 2070 2071 } while(size > 0); 2072 2073 ret = 0; 2074 out: 2075 _leave(" = %d (%Zd queued, %Zd rem)", ret, *size_sent, size); 2076 return ret; 2077 2078} /* end rxrpc_call_write_data() */ 2079 2080/*****************************************************************************/ 2081/* 2082 * flush outstanding packets to the network 2083 */ 2084static int rxrpc_call_flush(struct rxrpc_call *call) 2085{ 2086 struct rxrpc_message *msg; 2087 int ret = 0; 2088 2089 _enter("%p", call); 2090 2091 rxrpc_get_call(call); 2092 2093 /* if there's a packet under construction, then dispatch it now */ 2094 if (call->snd_nextmsg) { 2095 msg = call->snd_nextmsg; 2096 call->snd_nextmsg = NULL; 2097 2098 if (msg->hdr.flags & RXRPC_LAST_PACKET) { 2099 msg->hdr.flags &= ~RXRPC_MORE_PACKETS; 2100 if (call->app_call_state != RXRPC_CSTATE_CLNT_SND_ARGS) 2101 msg->hdr.flags |= RXRPC_REQUEST_ACK; 2102 } 2103 else { 2104 msg->hdr.flags |= RXRPC_MORE_PACKETS; 2105 } 2106 2107 _proto("Sending DATA message { ds=%Zu dc=%u df=%02lu }", 2108 msg->dsize, msg->dcount, msg->dfree); 2109 2110 /* queue and adjust call state */ 2111 spin_lock(&call->lock); 2112 list_add_tail(&msg->link, &call->acks_pendq); 2113 2114 /* decide what to do depending on current state and if this is 2115 * the last packet */ 2116 ret = -EINVAL; 2117 switch (call->app_call_state) { 2118 case RXRPC_CSTATE_SRVR_SND_REPLY: 2119 if (msg->hdr.flags & RXRPC_LAST_PACKET) { 2120 call->app_call_state = 2121 RXRPC_CSTATE_SRVR_RCV_FINAL_ACK; 2122 _state(call); 2123 } 2124 break; 2125 2126 case RXRPC_CSTATE_CLNT_SND_ARGS: 2127 if (msg->hdr.flags & RXRPC_LAST_PACKET) { 2128 call->app_call_state = 2129 RXRPC_CSTATE_CLNT_RCV_REPLY; 2130 _state(call); 2131 } 2132 break; 2133 2134 case RXRPC_CSTATE_ERROR: 2135 ret = call->app_errno; 2136 default: 2137 spin_unlock(&call->lock); 2138 goto out; 2139 } 2140 2141 call->acks_pend_cnt++; 2142 2143 mod_timer(&call->acks_timeout, 2144 __rxrpc_rtt_based_timeout(call, 2145 rxrpc_call_acks_timeout)); 2146 2147 spin_unlock(&call->lock); 2148 2149 ret = rxrpc_conn_sendmsg(call->conn, msg); 2150 if (ret == 0) 2151 call->pkt_snd_count++; 2152 } 2153 2154 out: 2155 rxrpc_put_call(call); 2156 2157 _leave(" = %d", ret); 2158 return ret; 2159 2160} /* end rxrpc_call_flush() */ 2161 2162/*****************************************************************************/ 2163/* 2164 * resend NAK'd or unacknowledged packets up to the highest one specified 2165 */ 2166static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest) 2167{ 2168 struct rxrpc_message *msg; 2169 struct list_head *_p; 2170 rxrpc_seq_t seq = 0; 2171 2172 _enter("%p,%u", call, highest); 2173 2174 _proto("Rx Resend required"); 2175 2176 /* handle too many resends */ 2177 if (call->snd_resend_cnt >= rxrpc_call_max_resend) { 2178 _debug("Aborting due to too many resends (rcv=%d)", 2179 call->pkt_rcv_count); 2180 rxrpc_call_abort(call, 2181 call->pkt_rcv_count > 0 ? -EIO : -ETIMEDOUT); 2182 _leave(""); 2183 return; 2184 } 2185 2186 spin_lock(&call->lock); 2187 call->snd_resend_cnt++; 2188 for (;;) { 2189 /* determine which the next packet we might need to ACK is */ 2190 if (seq <= call->acks_dftv_seq) 2191 seq = call->acks_dftv_seq; 2192 seq++; 2193 2194 if (seq > highest) 2195 break; 2196 2197 /* look for the packet in the pending-ACK queue */ 2198 list_for_each(_p, &call->acks_pendq) { 2199 msg = list_entry(_p, struct rxrpc_message, link); 2200 if (msg->seq == seq) 2201 goto found_msg; 2202 } 2203 2204 panic("%s(%p,%d):" 2205 " Inconsistent pending-ACK queue (ds=%u sc=%u sq=%u)\n", 2206 __FUNCTION__, call, highest, 2207 call->acks_dftv_seq, call->snd_seq_count, seq); 2208 2209 found_msg: 2210 if (msg->state != RXRPC_MSG_SENT) 2211 continue; /* only un-ACK'd packets */ 2212 2213 rxrpc_get_message(msg); 2214 spin_unlock(&call->lock); 2215 2216 /* send each message again (and ignore any errors we might 2217 * incur) */ 2218 _proto("Resending DATA message { ds=%Zu dc=%u df=%02lu }", 2219 msg->dsize, msg->dcount, msg->dfree); 2220 2221 if (rxrpc_conn_sendmsg(call->conn, msg) == 0) 2222 call->pkt_snd_count++; 2223 2224 rxrpc_put_message(msg); 2225 2226 spin_lock(&call->lock); 2227 } 2228 2229 /* reset the timeout */ 2230 mod_timer(&call->acks_timeout, 2231 __rxrpc_rtt_based_timeout(call, rxrpc_call_acks_timeout)); 2232 2233 spin_unlock(&call->lock); 2234 2235 _leave(""); 2236} /* end rxrpc_call_resend() */ 2237 2238/*****************************************************************************/ 2239/* 2240 * handle an ICMP error being applied to a call 2241 */ 2242void rxrpc_call_handle_error(struct rxrpc_call *call, int local, int errno) 2243{ 2244 _enter("%p{%u},%d", call, ntohl(call->call_id), errno); 2245 2246 /* if this call is already aborted, then just wake up any waiters */ 2247 if (call->app_call_state == RXRPC_CSTATE_ERROR) { 2248 call->app_error_func(call); 2249 } 2250 else { 2251 /* tell the app layer what happened */ 2252 spin_lock(&call->lock); 2253 call->app_call_state = RXRPC_CSTATE_ERROR; 2254 _state(call); 2255 if (local) 2256 call->app_err_state = RXRPC_ESTATE_LOCAL_ERROR; 2257 else 2258 call->app_err_state = RXRPC_ESTATE_REMOTE_ERROR; 2259 call->app_errno = errno; 2260 call->app_mark = RXRPC_APP_MARK_EOF; 2261 call->app_read_buf = NULL; 2262 call->app_async_read = 0; 2263 2264 /* map the error */ 2265 call->app_aemap_func(call); 2266 2267 del_timer_sync(&call->acks_timeout); 2268 del_timer_sync(&call->rcv_timeout); 2269 del_timer_sync(&call->ackr_dfr_timo); 2270 2271 spin_unlock(&call->lock); 2272 2273 call->app_error_func(call); 2274 } 2275 2276 _leave(""); 2277} /* end rxrpc_call_handle_error() */