Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v3.13-rc3 442 lines 11 kB view raw
1/* RxRPC recvmsg() implementation 2 * 3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. 4 * Written by David Howells (dhowells@redhat.com) 5 * 6 * This program is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU General Public License 8 * as published by the Free Software Foundation; either version 9 * 2 of the License, or (at your option) any later version. 10 */ 11 12#include <linux/net.h> 13#include <linux/skbuff.h> 14#include <linux/export.h> 15#include <net/sock.h> 16#include <net/af_rxrpc.h> 17#include "ar-internal.h" 18 19/* 20 * removal a call's user ID from the socket tree to make the user ID available 21 * again and so that it won't be seen again in association with that call 22 */ 23void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call) 24{ 25 _debug("RELEASE CALL %d", call->debug_id); 26 27 if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) { 28 write_lock_bh(&rx->call_lock); 29 rb_erase(&call->sock_node, &call->socket->calls); 30 clear_bit(RXRPC_CALL_HAS_USERID, &call->flags); 31 write_unlock_bh(&rx->call_lock); 32 } 33 34 read_lock_bh(&call->state_lock); 35 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) && 36 !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events)) 37 rxrpc_queue_call(call); 38 read_unlock_bh(&call->state_lock); 39} 40 41/* 42 * receive a message from an RxRPC socket 43 * - we need to be careful about two or more threads calling recvmsg 44 * simultaneously 45 */ 46int rxrpc_recvmsg(struct kiocb *iocb, struct socket *sock, 47 struct msghdr *msg, size_t len, int flags) 48{ 49 struct rxrpc_skb_priv *sp; 50 struct rxrpc_call *call = NULL, *continue_call = NULL; 51 struct rxrpc_sock *rx = rxrpc_sk(sock->sk); 52 struct sk_buff *skb; 53 long timeo; 54 int copy, ret, ullen, offset, copied = 0; 55 u32 abort_code; 56 57 DEFINE_WAIT(wait); 58 59 _enter(",,,%zu,%d", len, flags); 60 61 if (flags & (MSG_OOB | MSG_TRUNC)) 62 return -EOPNOTSUPP; 63 64 ullen = msg->msg_flags & MSG_CMSG_COMPAT ? 4 : sizeof(unsigned long); 65 66 timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT); 67 msg->msg_flags |= MSG_MORE; 68 69 lock_sock(&rx->sk); 70 71 for (;;) { 72 /* return immediately if a client socket has no outstanding 73 * calls */ 74 if (RB_EMPTY_ROOT(&rx->calls)) { 75 if (copied) 76 goto out; 77 if (rx->sk.sk_state != RXRPC_SERVER_LISTENING) { 78 release_sock(&rx->sk); 79 if (continue_call) 80 rxrpc_put_call(continue_call); 81 return -ENODATA; 82 } 83 } 84 85 /* get the next message on the Rx queue */ 86 skb = skb_peek(&rx->sk.sk_receive_queue); 87 if (!skb) { 88 /* nothing remains on the queue */ 89 if (copied && 90 (msg->msg_flags & MSG_PEEK || timeo == 0)) 91 goto out; 92 93 /* wait for a message to turn up */ 94 release_sock(&rx->sk); 95 prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait, 96 TASK_INTERRUPTIBLE); 97 ret = sock_error(&rx->sk); 98 if (ret) 99 goto wait_error; 100 101 if (skb_queue_empty(&rx->sk.sk_receive_queue)) { 102 if (signal_pending(current)) 103 goto wait_interrupted; 104 timeo = schedule_timeout(timeo); 105 } 106 finish_wait(sk_sleep(&rx->sk), &wait); 107 lock_sock(&rx->sk); 108 continue; 109 } 110 111 peek_next_packet: 112 sp = rxrpc_skb(skb); 113 call = sp->call; 114 ASSERT(call != NULL); 115 116 _debug("next pkt %s", rxrpc_pkts[sp->hdr.type]); 117 118 /* make sure we wait for the state to be updated in this call */ 119 spin_lock_bh(&call->lock); 120 spin_unlock_bh(&call->lock); 121 122 if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) { 123 _debug("packet from released call"); 124 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 125 BUG(); 126 rxrpc_free_skb(skb); 127 continue; 128 } 129 130 /* determine whether to continue last data receive */ 131 if (continue_call) { 132 _debug("maybe cont"); 133 if (call != continue_call || 134 skb->mark != RXRPC_SKB_MARK_DATA) { 135 release_sock(&rx->sk); 136 rxrpc_put_call(continue_call); 137 _leave(" = %d [noncont]", copied); 138 return copied; 139 } 140 } 141 142 rxrpc_get_call(call); 143 144 /* copy the peer address and timestamp */ 145 if (!continue_call) { 146 if (msg->msg_name) { 147 size_t len = 148 sizeof(call->conn->trans->peer->srx); 149 memcpy(msg->msg_name, 150 &call->conn->trans->peer->srx, len); 151 msg->msg_namelen = len; 152 } 153 sock_recv_ts_and_drops(msg, &rx->sk, skb); 154 } 155 156 /* receive the message */ 157 if (skb->mark != RXRPC_SKB_MARK_DATA) 158 goto receive_non_data_message; 159 160 _debug("recvmsg DATA #%u { %d, %d }", 161 ntohl(sp->hdr.seq), skb->len, sp->offset); 162 163 if (!continue_call) { 164 /* only set the control data once per recvmsg() */ 165 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 166 ullen, &call->user_call_ID); 167 if (ret < 0) 168 goto copy_error; 169 ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags)); 170 } 171 172 ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv); 173 ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1); 174 call->rx_data_recv = ntohl(sp->hdr.seq); 175 176 ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten); 177 178 offset = sp->offset; 179 copy = skb->len - offset; 180 if (copy > len - copied) 181 copy = len - copied; 182 183 if (skb->ip_summed == CHECKSUM_UNNECESSARY) { 184 ret = skb_copy_datagram_iovec(skb, offset, 185 msg->msg_iov, copy); 186 } else { 187 ret = skb_copy_and_csum_datagram_iovec(skb, offset, 188 msg->msg_iov); 189 if (ret == -EINVAL) 190 goto csum_copy_error; 191 } 192 193 if (ret < 0) 194 goto copy_error; 195 196 /* handle piecemeal consumption of data packets */ 197 _debug("copied %d+%d", copy, copied); 198 199 offset += copy; 200 copied += copy; 201 202 if (!(flags & MSG_PEEK)) 203 sp->offset = offset; 204 205 if (sp->offset < skb->len) { 206 _debug("buffer full"); 207 ASSERTCMP(copied, ==, len); 208 break; 209 } 210 211 /* we transferred the whole data packet */ 212 if (sp->hdr.flags & RXRPC_LAST_PACKET) { 213 _debug("last"); 214 if (call->conn->out_clientflag) { 215 /* last byte of reply received */ 216 ret = copied; 217 goto terminal_message; 218 } 219 220 /* last bit of request received */ 221 if (!(flags & MSG_PEEK)) { 222 _debug("eat packet"); 223 if (skb_dequeue(&rx->sk.sk_receive_queue) != 224 skb) 225 BUG(); 226 rxrpc_free_skb(skb); 227 } 228 msg->msg_flags &= ~MSG_MORE; 229 break; 230 } 231 232 /* move on to the next data message */ 233 _debug("next"); 234 if (!continue_call) 235 continue_call = sp->call; 236 else 237 rxrpc_put_call(call); 238 call = NULL; 239 240 if (flags & MSG_PEEK) { 241 _debug("peek next"); 242 skb = skb->next; 243 if (skb == (struct sk_buff *) &rx->sk.sk_receive_queue) 244 break; 245 goto peek_next_packet; 246 } 247 248 _debug("eat packet"); 249 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 250 BUG(); 251 rxrpc_free_skb(skb); 252 } 253 254 /* end of non-terminal data packet reception for the moment */ 255 _debug("end rcv data"); 256out: 257 release_sock(&rx->sk); 258 if (call) 259 rxrpc_put_call(call); 260 if (continue_call) 261 rxrpc_put_call(continue_call); 262 _leave(" = %d [data]", copied); 263 return copied; 264 265 /* handle non-DATA messages such as aborts, incoming connections and 266 * final ACKs */ 267receive_non_data_message: 268 _debug("non-data"); 269 270 if (skb->mark == RXRPC_SKB_MARK_NEW_CALL) { 271 _debug("RECV NEW CALL"); 272 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &abort_code); 273 if (ret < 0) 274 goto copy_error; 275 if (!(flags & MSG_PEEK)) { 276 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 277 BUG(); 278 rxrpc_free_skb(skb); 279 } 280 goto out; 281 } 282 283 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 284 ullen, &call->user_call_ID); 285 if (ret < 0) 286 goto copy_error; 287 ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags)); 288 289 switch (skb->mark) { 290 case RXRPC_SKB_MARK_DATA: 291 BUG(); 292 case RXRPC_SKB_MARK_FINAL_ACK: 293 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &abort_code); 294 break; 295 case RXRPC_SKB_MARK_BUSY: 296 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_BUSY, 0, &abort_code); 297 break; 298 case RXRPC_SKB_MARK_REMOTE_ABORT: 299 abort_code = call->abort_code; 300 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code); 301 break; 302 case RXRPC_SKB_MARK_NET_ERROR: 303 _debug("RECV NET ERROR %d", sp->error); 304 abort_code = sp->error; 305 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &abort_code); 306 break; 307 case RXRPC_SKB_MARK_LOCAL_ERROR: 308 _debug("RECV LOCAL ERROR %d", sp->error); 309 abort_code = sp->error; 310 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, 311 &abort_code); 312 break; 313 default: 314 BUG(); 315 break; 316 } 317 318 if (ret < 0) 319 goto copy_error; 320 321terminal_message: 322 _debug("terminal"); 323 msg->msg_flags &= ~MSG_MORE; 324 msg->msg_flags |= MSG_EOR; 325 326 if (!(flags & MSG_PEEK)) { 327 _net("free terminal skb %p", skb); 328 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 329 BUG(); 330 rxrpc_free_skb(skb); 331 rxrpc_remove_user_ID(rx, call); 332 } 333 334 release_sock(&rx->sk); 335 rxrpc_put_call(call); 336 if (continue_call) 337 rxrpc_put_call(continue_call); 338 _leave(" = %d", ret); 339 return ret; 340 341copy_error: 342 _debug("copy error"); 343 release_sock(&rx->sk); 344 rxrpc_put_call(call); 345 if (continue_call) 346 rxrpc_put_call(continue_call); 347 _leave(" = %d", ret); 348 return ret; 349 350csum_copy_error: 351 _debug("csum error"); 352 release_sock(&rx->sk); 353 if (continue_call) 354 rxrpc_put_call(continue_call); 355 rxrpc_kill_skb(skb); 356 skb_kill_datagram(&rx->sk, skb, flags); 357 rxrpc_put_call(call); 358 return -EAGAIN; 359 360wait_interrupted: 361 ret = sock_intr_errno(timeo); 362wait_error: 363 finish_wait(sk_sleep(&rx->sk), &wait); 364 if (continue_call) 365 rxrpc_put_call(continue_call); 366 if (copied) 367 copied = ret; 368 _leave(" = %d [waitfail %d]", copied, ret); 369 return copied; 370 371} 372 373/** 374 * rxrpc_kernel_data_delivered - Record delivery of data message 375 * @skb: Message holding data 376 * 377 * Record the delivery of a data message. This permits RxRPC to keep its 378 * tracking correct. The socket buffer will be deleted. 379 */ 380void rxrpc_kernel_data_delivered(struct sk_buff *skb) 381{ 382 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 383 struct rxrpc_call *call = sp->call; 384 385 ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv); 386 ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1); 387 call->rx_data_recv = ntohl(sp->hdr.seq); 388 389 ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten); 390 rxrpc_free_skb(skb); 391} 392 393EXPORT_SYMBOL(rxrpc_kernel_data_delivered); 394 395/** 396 * rxrpc_kernel_is_data_last - Determine if data message is last one 397 * @skb: Message holding data 398 * 399 * Determine if data message is last one for the parent call. 400 */ 401bool rxrpc_kernel_is_data_last(struct sk_buff *skb) 402{ 403 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 404 405 ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA); 406 407 return sp->hdr.flags & RXRPC_LAST_PACKET; 408} 409 410EXPORT_SYMBOL(rxrpc_kernel_is_data_last); 411 412/** 413 * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message 414 * @skb: Message indicating an abort 415 * 416 * Get the abort code from an RxRPC abort message. 417 */ 418u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb) 419{ 420 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 421 422 ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_REMOTE_ABORT); 423 424 return sp->call->abort_code; 425} 426 427EXPORT_SYMBOL(rxrpc_kernel_get_abort_code); 428 429/** 430 * rxrpc_kernel_get_error - Get the error number from an RxRPC error message 431 * @skb: Message indicating an error 432 * 433 * Get the error number from an RxRPC error message. 434 */ 435int rxrpc_kernel_get_error_number(struct sk_buff *skb) 436{ 437 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 438 439 return sp->error; 440} 441 442EXPORT_SYMBOL(rxrpc_kernel_get_error_number);