Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v3.12-rc4 439 lines 11 kB view raw
1/* RxRPC recvmsg() implementation 2 * 3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. 4 * Written by David Howells (dhowells@redhat.com) 5 * 6 * This program is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU General Public License 8 * as published by the Free Software Foundation; either version 9 * 2 of the License, or (at your option) any later version. 10 */ 11 12#include <linux/net.h> 13#include <linux/skbuff.h> 14#include <linux/export.h> 15#include <net/sock.h> 16#include <net/af_rxrpc.h> 17#include "ar-internal.h" 18 19/* 20 * removal a call's user ID from the socket tree to make the user ID available 21 * again and so that it won't be seen again in association with that call 22 */ 23void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call) 24{ 25 _debug("RELEASE CALL %d", call->debug_id); 26 27 if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) { 28 write_lock_bh(&rx->call_lock); 29 rb_erase(&call->sock_node, &call->socket->calls); 30 clear_bit(RXRPC_CALL_HAS_USERID, &call->flags); 31 write_unlock_bh(&rx->call_lock); 32 } 33 34 read_lock_bh(&call->state_lock); 35 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) && 36 !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events)) 37 rxrpc_queue_call(call); 38 read_unlock_bh(&call->state_lock); 39} 40 41/* 42 * receive a message from an RxRPC socket 43 * - we need to be careful about two or more threads calling recvmsg 44 * simultaneously 45 */ 46int rxrpc_recvmsg(struct kiocb *iocb, struct socket *sock, 47 struct msghdr *msg, size_t len, int flags) 48{ 49 struct rxrpc_skb_priv *sp; 50 struct rxrpc_call *call = NULL, *continue_call = NULL; 51 struct rxrpc_sock *rx = rxrpc_sk(sock->sk); 52 struct sk_buff *skb; 53 long timeo; 54 int copy, ret, ullen, offset, copied = 0; 55 u32 abort_code; 56 57 DEFINE_WAIT(wait); 58 59 _enter(",,,%zu,%d", len, flags); 60 61 if (flags & (MSG_OOB | MSG_TRUNC)) 62 return -EOPNOTSUPP; 63 64 ullen = msg->msg_flags & MSG_CMSG_COMPAT ? 4 : sizeof(unsigned long); 65 66 timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT); 67 msg->msg_flags |= MSG_MORE; 68 69 lock_sock(&rx->sk); 70 71 for (;;) { 72 /* return immediately if a client socket has no outstanding 73 * calls */ 74 if (RB_EMPTY_ROOT(&rx->calls)) { 75 if (copied) 76 goto out; 77 if (rx->sk.sk_state != RXRPC_SERVER_LISTENING) { 78 release_sock(&rx->sk); 79 if (continue_call) 80 rxrpc_put_call(continue_call); 81 return -ENODATA; 82 } 83 } 84 85 /* get the next message on the Rx queue */ 86 skb = skb_peek(&rx->sk.sk_receive_queue); 87 if (!skb) { 88 /* nothing remains on the queue */ 89 if (copied && 90 (msg->msg_flags & MSG_PEEK || timeo == 0)) 91 goto out; 92 93 /* wait for a message to turn up */ 94 release_sock(&rx->sk); 95 prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait, 96 TASK_INTERRUPTIBLE); 97 ret = sock_error(&rx->sk); 98 if (ret) 99 goto wait_error; 100 101 if (skb_queue_empty(&rx->sk.sk_receive_queue)) { 102 if (signal_pending(current)) 103 goto wait_interrupted; 104 timeo = schedule_timeout(timeo); 105 } 106 finish_wait(sk_sleep(&rx->sk), &wait); 107 lock_sock(&rx->sk); 108 continue; 109 } 110 111 peek_next_packet: 112 sp = rxrpc_skb(skb); 113 call = sp->call; 114 ASSERT(call != NULL); 115 116 _debug("next pkt %s", rxrpc_pkts[sp->hdr.type]); 117 118 /* make sure we wait for the state to be updated in this call */ 119 spin_lock_bh(&call->lock); 120 spin_unlock_bh(&call->lock); 121 122 if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) { 123 _debug("packet from released call"); 124 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 125 BUG(); 126 rxrpc_free_skb(skb); 127 continue; 128 } 129 130 /* determine whether to continue last data receive */ 131 if (continue_call) { 132 _debug("maybe cont"); 133 if (call != continue_call || 134 skb->mark != RXRPC_SKB_MARK_DATA) { 135 release_sock(&rx->sk); 136 rxrpc_put_call(continue_call); 137 _leave(" = %d [noncont]", copied); 138 return copied; 139 } 140 } 141 142 rxrpc_get_call(call); 143 144 /* copy the peer address and timestamp */ 145 if (!continue_call) { 146 if (msg->msg_name && msg->msg_namelen > 0) 147 memcpy(msg->msg_name, 148 &call->conn->trans->peer->srx, 149 sizeof(call->conn->trans->peer->srx)); 150 sock_recv_ts_and_drops(msg, &rx->sk, skb); 151 } 152 153 /* receive the message */ 154 if (skb->mark != RXRPC_SKB_MARK_DATA) 155 goto receive_non_data_message; 156 157 _debug("recvmsg DATA #%u { %d, %d }", 158 ntohl(sp->hdr.seq), skb->len, sp->offset); 159 160 if (!continue_call) { 161 /* only set the control data once per recvmsg() */ 162 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 163 ullen, &call->user_call_ID); 164 if (ret < 0) 165 goto copy_error; 166 ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags)); 167 } 168 169 ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv); 170 ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1); 171 call->rx_data_recv = ntohl(sp->hdr.seq); 172 173 ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten); 174 175 offset = sp->offset; 176 copy = skb->len - offset; 177 if (copy > len - copied) 178 copy = len - copied; 179 180 if (skb->ip_summed == CHECKSUM_UNNECESSARY) { 181 ret = skb_copy_datagram_iovec(skb, offset, 182 msg->msg_iov, copy); 183 } else { 184 ret = skb_copy_and_csum_datagram_iovec(skb, offset, 185 msg->msg_iov); 186 if (ret == -EINVAL) 187 goto csum_copy_error; 188 } 189 190 if (ret < 0) 191 goto copy_error; 192 193 /* handle piecemeal consumption of data packets */ 194 _debug("copied %d+%d", copy, copied); 195 196 offset += copy; 197 copied += copy; 198 199 if (!(flags & MSG_PEEK)) 200 sp->offset = offset; 201 202 if (sp->offset < skb->len) { 203 _debug("buffer full"); 204 ASSERTCMP(copied, ==, len); 205 break; 206 } 207 208 /* we transferred the whole data packet */ 209 if (sp->hdr.flags & RXRPC_LAST_PACKET) { 210 _debug("last"); 211 if (call->conn->out_clientflag) { 212 /* last byte of reply received */ 213 ret = copied; 214 goto terminal_message; 215 } 216 217 /* last bit of request received */ 218 if (!(flags & MSG_PEEK)) { 219 _debug("eat packet"); 220 if (skb_dequeue(&rx->sk.sk_receive_queue) != 221 skb) 222 BUG(); 223 rxrpc_free_skb(skb); 224 } 225 msg->msg_flags &= ~MSG_MORE; 226 break; 227 } 228 229 /* move on to the next data message */ 230 _debug("next"); 231 if (!continue_call) 232 continue_call = sp->call; 233 else 234 rxrpc_put_call(call); 235 call = NULL; 236 237 if (flags & MSG_PEEK) { 238 _debug("peek next"); 239 skb = skb->next; 240 if (skb == (struct sk_buff *) &rx->sk.sk_receive_queue) 241 break; 242 goto peek_next_packet; 243 } 244 245 _debug("eat packet"); 246 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 247 BUG(); 248 rxrpc_free_skb(skb); 249 } 250 251 /* end of non-terminal data packet reception for the moment */ 252 _debug("end rcv data"); 253out: 254 release_sock(&rx->sk); 255 if (call) 256 rxrpc_put_call(call); 257 if (continue_call) 258 rxrpc_put_call(continue_call); 259 _leave(" = %d [data]", copied); 260 return copied; 261 262 /* handle non-DATA messages such as aborts, incoming connections and 263 * final ACKs */ 264receive_non_data_message: 265 _debug("non-data"); 266 267 if (skb->mark == RXRPC_SKB_MARK_NEW_CALL) { 268 _debug("RECV NEW CALL"); 269 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &abort_code); 270 if (ret < 0) 271 goto copy_error; 272 if (!(flags & MSG_PEEK)) { 273 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 274 BUG(); 275 rxrpc_free_skb(skb); 276 } 277 goto out; 278 } 279 280 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 281 ullen, &call->user_call_ID); 282 if (ret < 0) 283 goto copy_error; 284 ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags)); 285 286 switch (skb->mark) { 287 case RXRPC_SKB_MARK_DATA: 288 BUG(); 289 case RXRPC_SKB_MARK_FINAL_ACK: 290 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &abort_code); 291 break; 292 case RXRPC_SKB_MARK_BUSY: 293 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_BUSY, 0, &abort_code); 294 break; 295 case RXRPC_SKB_MARK_REMOTE_ABORT: 296 abort_code = call->abort_code; 297 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code); 298 break; 299 case RXRPC_SKB_MARK_NET_ERROR: 300 _debug("RECV NET ERROR %d", sp->error); 301 abort_code = sp->error; 302 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &abort_code); 303 break; 304 case RXRPC_SKB_MARK_LOCAL_ERROR: 305 _debug("RECV LOCAL ERROR %d", sp->error); 306 abort_code = sp->error; 307 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, 308 &abort_code); 309 break; 310 default: 311 BUG(); 312 break; 313 } 314 315 if (ret < 0) 316 goto copy_error; 317 318terminal_message: 319 _debug("terminal"); 320 msg->msg_flags &= ~MSG_MORE; 321 msg->msg_flags |= MSG_EOR; 322 323 if (!(flags & MSG_PEEK)) { 324 _net("free terminal skb %p", skb); 325 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 326 BUG(); 327 rxrpc_free_skb(skb); 328 rxrpc_remove_user_ID(rx, call); 329 } 330 331 release_sock(&rx->sk); 332 rxrpc_put_call(call); 333 if (continue_call) 334 rxrpc_put_call(continue_call); 335 _leave(" = %d", ret); 336 return ret; 337 338copy_error: 339 _debug("copy error"); 340 release_sock(&rx->sk); 341 rxrpc_put_call(call); 342 if (continue_call) 343 rxrpc_put_call(continue_call); 344 _leave(" = %d", ret); 345 return ret; 346 347csum_copy_error: 348 _debug("csum error"); 349 release_sock(&rx->sk); 350 if (continue_call) 351 rxrpc_put_call(continue_call); 352 rxrpc_kill_skb(skb); 353 skb_kill_datagram(&rx->sk, skb, flags); 354 rxrpc_put_call(call); 355 return -EAGAIN; 356 357wait_interrupted: 358 ret = sock_intr_errno(timeo); 359wait_error: 360 finish_wait(sk_sleep(&rx->sk), &wait); 361 if (continue_call) 362 rxrpc_put_call(continue_call); 363 if (copied) 364 copied = ret; 365 _leave(" = %d [waitfail %d]", copied, ret); 366 return copied; 367 368} 369 370/** 371 * rxrpc_kernel_data_delivered - Record delivery of data message 372 * @skb: Message holding data 373 * 374 * Record the delivery of a data message. This permits RxRPC to keep its 375 * tracking correct. The socket buffer will be deleted. 376 */ 377void rxrpc_kernel_data_delivered(struct sk_buff *skb) 378{ 379 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 380 struct rxrpc_call *call = sp->call; 381 382 ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv); 383 ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1); 384 call->rx_data_recv = ntohl(sp->hdr.seq); 385 386 ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten); 387 rxrpc_free_skb(skb); 388} 389 390EXPORT_SYMBOL(rxrpc_kernel_data_delivered); 391 392/** 393 * rxrpc_kernel_is_data_last - Determine if data message is last one 394 * @skb: Message holding data 395 * 396 * Determine if data message is last one for the parent call. 397 */ 398bool rxrpc_kernel_is_data_last(struct sk_buff *skb) 399{ 400 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 401 402 ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA); 403 404 return sp->hdr.flags & RXRPC_LAST_PACKET; 405} 406 407EXPORT_SYMBOL(rxrpc_kernel_is_data_last); 408 409/** 410 * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message 411 * @skb: Message indicating an abort 412 * 413 * Get the abort code from an RxRPC abort message. 414 */ 415u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb) 416{ 417 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 418 419 ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_REMOTE_ABORT); 420 421 return sp->call->abort_code; 422} 423 424EXPORT_SYMBOL(rxrpc_kernel_get_abort_code); 425 426/** 427 * rxrpc_kernel_get_error - Get the error number from an RxRPC error message 428 * @skb: Message indicating an error 429 * 430 * Get the error number from an RxRPC error message. 431 */ 432int rxrpc_kernel_get_error_number(struct sk_buff *skb) 433{ 434 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 435 436 return sp->error; 437} 438 439EXPORT_SYMBOL(rxrpc_kernel_get_error_number);