// SPDX-License-Identifier: GPL-2.0-or-later
/* AF_RXRPC sendmsg() implementation.
 *
 * Copyright (C) 2007, 2016 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/net.h>
#include <linux/gfp.h>
#include <linux/skbuff.h>
#include <linux/export.h>
#include <linux/sched/signal.h>

#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

/*
 * Propose an abort to be made in the I/O thread.
 */
bool rxrpc_propose_abort(struct rxrpc_call *call, s32 abort_code, int error,
			 enum rxrpc_abort_reason why)
{
	_enter("{%d},%d,%d,%u", call->debug_id, abort_code, error, why);

	if (!call->send_abort && !rxrpc_call_is_complete(call)) {
		call->send_abort_why = why;
		call->send_abort_err = error;
		call->send_abort_seq = 0;
		trace_rxrpc_abort_call(call, abort_code);
		/* Request abort locklessly vs rxrpc_input_call_event(). */
		smp_store_release(&call->send_abort, abort_code);
		rxrpc_poke_call(call, rxrpc_call_poke_abort);
		return true;
	}

	return false;
}

/*
 * Wait for a call to become connected. Interruption here doesn't cause the
 * call to be aborted.
 */
static int rxrpc_wait_to_be_connected(struct rxrpc_call *call, long *timeo)
{
	DECLARE_WAITQUEUE(myself, current);
	int ret = 0;

	_enter("%d", call->debug_id);

	if (rxrpc_call_state(call) != RXRPC_CALL_CLIENT_AWAIT_CONN)
		goto no_wait;

	add_wait_queue_exclusive(&call->waitq, &myself);

	for (;;) {
		switch (call->interruptibility) {
		case RXRPC_INTERRUPTIBLE:
		case RXRPC_PREINTERRUPTIBLE:
			set_current_state(TASK_INTERRUPTIBLE);
			break;
		case RXRPC_UNINTERRUPTIBLE:
		default:
			set_current_state(TASK_UNINTERRUPTIBLE);
			break;
		}

		if (rxrpc_call_state(call) != RXRPC_CALL_CLIENT_AWAIT_CONN)
			break;
		if ((call->interruptibility == RXRPC_INTERRUPTIBLE ||
		     call->interruptibility == RXRPC_PREINTERRUPTIBLE) &&
		    signal_pending(current)) {
			ret = sock_intr_errno(*timeo);
			break;
		}
		*timeo = schedule_timeout(*timeo);
	}

	remove_wait_queue(&call->waitq, &myself);
	__set_current_state(TASK_RUNNING);

no_wait:
	if (ret == 0 && rxrpc_call_is_complete(call))
		ret = call->error;

	_leave(" = %d", ret);
	return ret;
}

/*
 * Return true if there's sufficient Tx queue space.
 */
static bool rxrpc_check_tx_space(struct rxrpc_call *call, rxrpc_seq_t *_tx_win)
{
	rxrpc_seq_t tx_bottom = READ_ONCE(call->tx_bottom);

	if (_tx_win)
		*_tx_win = tx_bottom;
	return call->send_top - tx_bottom < 256;
}

/*
 * Wait for space to appear in the Tx queue or a signal to occur.
 */
static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
					 struct rxrpc_call *call,
					 long *timeo)
{
	for (;;) {
		set_current_state(TASK_INTERRUPTIBLE);
		if (rxrpc_check_tx_space(call, NULL))
			return 0;

		if (rxrpc_call_is_complete(call))
			return call->error;

		if (signal_pending(current))
			return sock_intr_errno(*timeo);

		trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
		*timeo = schedule_timeout(*timeo);
	}
}

/*
 * Wait for space to appear in the Tx queue uninterruptibly, but with
 * a timeout of 2*RTT if no progress was made and a signal occurred.
 */
static int rxrpc_wait_for_tx_window_waitall(struct rxrpc_sock *rx,
					    struct rxrpc_call *call)
{
	rxrpc_seq_t tx_start, tx_win;
	signed long rtt, timeout;

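	/* The smoothed RTT is stored scaled by 8 (TCP-style), so unscale
	 * it before converting to jiffies.
	 */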
	rtt = READ_ONCE(call->srtt_us) >> 3;
	rtt = usecs_to_jiffies(rtt) * 2;
	if (rtt < 2)
		rtt = 2;

	timeout = rtt;
	tx_start = READ_ONCE(call->tx_bottom);

	for (;;) {
		set_current_state(TASK_UNINTERRUPTIBLE);

		if (rxrpc_check_tx_space(call, &tx_win))
			return 0;

		if (rxrpc_call_is_complete(call))
			return call->error;

		if (timeout == 0 &&
		    tx_win == tx_start && signal_pending(current))
			return -EINTR;

		if (tx_win != tx_start) {
			timeout = rtt;
			tx_start = tx_win;
		}

		trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
		timeout = schedule_timeout(timeout);
	}
}

/*
 * Wait for space to appear in the Tx queue uninterruptibly.
 */
static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
					    struct rxrpc_call *call,
					    long *timeo)
{
	for (;;) {
		set_current_state(TASK_UNINTERRUPTIBLE);
		if (rxrpc_check_tx_space(call, NULL))
			return 0;

		if (rxrpc_call_is_complete(call))
			return call->error;

		trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
		*timeo = schedule_timeout(*timeo);
	}
}

/*
 * wait for space to appear in the transmit/ACK window
 * - caller holds the socket locked
 */
static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,
				    struct rxrpc_call *call,
				    long *timeo,
				    bool waitall)
{
	DECLARE_WAITQUEUE(myself, current);
	int ret;

	_enter(",{%u,%u,%u}",
	       call->tx_bottom, call->tx_top, call->tx_winsize);

	add_wait_queue(&call->waitq, &myself);

	switch (call->interruptibility) {
	case RXRPC_INTERRUPTIBLE:
		if (waitall)
			ret = rxrpc_wait_for_tx_window_waitall(rx, call);
		else
			ret = rxrpc_wait_for_tx_window_intr(rx, call, timeo);
		break;
	case RXRPC_PREINTERRUPTIBLE:
	case RXRPC_UNINTERRUPTIBLE:
	default:
		ret = rxrpc_wait_for_tx_window_nonintr(rx, call, timeo);
		break;
	}

	remove_wait_queue(&call->waitq, &myself);
	set_current_state(TASK_RUNNING);
	_leave(" = %d", ret);
	return ret;
}

/*
 * Notify the owner of the call that the transmit phase is ended and the last
 * packet has been queued.
 */
static void rxrpc_notify_end_tx(struct rxrpc_sock *rx, struct rxrpc_call *call,
				rxrpc_notify_end_tx_t notify_end_tx)
{
	if (notify_end_tx)
		notify_end_tx(&rx->sk, call, call->user_call_ID);
}

/*
 * Queue a DATA packet for transmission and poke the I/O thread to send it
 * if the transmission queue was previously empty.
 */
static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
			       struct rxrpc_txbuf *txb,
			       rxrpc_notify_end_tx_t notify_end_tx)
{
	struct rxrpc_txqueue *sq = call->send_queue;
	rxrpc_seq_t seq = txb->seq;
	bool poke, last = txb->flags & RXRPC_LAST_PACKET;
	int ix = seq & RXRPC_TXQ_MASK;

	rxrpc_inc_stat(call->rxnet, stat_tx_data);

	ASSERTCMP(txb->seq, ==, call->send_top + 1);

	if (last)
		trace_rxrpc_txqueue(call, rxrpc_txqueue_queue_last);
	else
		trace_rxrpc_txqueue(call, rxrpc_txqueue_queue);

	if (WARN_ON_ONCE(sq->bufs[ix]))
		trace_rxrpc_tq(call, sq, seq, rxrpc_tq_queue_dup);
	else
		trace_rxrpc_tq(call, sq, seq, rxrpc_tq_queue);

	/* Add the packet to the call's output buffer */
	poke = (READ_ONCE(call->tx_bottom) == call->send_top);
	sq->bufs[ix] = txb;
	/* Order send_top after the queue->next pointer and txb content. */
	smp_store_release(&call->send_top, seq);
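	/* (This release pairs with the I/O thread's acquire read of
	 *  ->send_top when it picks up newly queued packets to transmit.)
	 */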
	if (last) {
		set_bit(RXRPC_CALL_TX_NO_MORE, &call->flags);
		rxrpc_notify_end_tx(rx, call, notify_end_tx);
		call->send_queue = NULL;
	}

	if (poke)
		rxrpc_poke_call(call, rxrpc_call_poke_start);
}

/*
 * Allocate a new txqueue unit and add it to the transmission queue.
 */
static int rxrpc_alloc_txqueue(struct sock *sk, struct rxrpc_call *call)
{
	struct rxrpc_txqueue *tq;

	tq = kzalloc(sizeof(*tq), sk->sk_allocation);
	if (!tq)
		return -ENOMEM;

	tq->xmit_ts_base = KTIME_MIN;
	for (int i = 0; i < RXRPC_NR_TXQUEUE; i++)
		tq->segment_xmit_ts[i] = UINT_MAX;

	if (call->send_queue) {
		tq->qbase = call->send_top + 1;
		call->send_queue->next = tq;
		call->send_queue = tq;
	} else if (WARN_ON(call->tx_queue)) {
		kfree(tq);
		return -ENOMEM;
	} else {
		/* We start at seq 1, so pretend seq 0 is hard-acked. */
		tq->nr_reported_acks = 1;
		tq->segment_acked = 1UL;
		tq->qbase = 0;
		call->tx_qbase = 0;
		call->send_queue = tq;
		call->tx_qtail = tq;
		call->tx_queue = tq;
	}

	trace_rxrpc_tq(call, tq, call->send_top, rxrpc_tq_alloc);
	return 0;
}

/*
 * send data through a socket
 * - must be called in process context
 * - The caller holds the call user access mutex, but not the socket lock.
 */
static int rxrpc_send_data(struct rxrpc_sock *rx,
			   struct rxrpc_call *call,
			   struct msghdr *msg, size_t len,
			   rxrpc_notify_end_tx_t notify_end_tx,
			   bool *_dropped_lock)
{
	struct rxrpc_txbuf *txb;
	struct sock *sk = &rx->sk;
	enum rxrpc_call_state state;
	long timeo;
	bool more = msg->msg_flags & MSG_MORE;
	int ret, copied = 0;

	if (test_bit(RXRPC_CALL_TX_NO_MORE, &call->flags)) {
		trace_rxrpc_abort(call->debug_id, rxrpc_sendmsg_late_send,
				  call->cid, call->call_id, call->rx_consumed,
				  0, -EPROTO);
		return -EPROTO;
	}

	timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);

	ret = rxrpc_wait_to_be_connected(call, &timeo);
	if (ret < 0)
		return ret;

	if (call->conn->state == RXRPC_CONN_CLIENT_UNSECURED) {
		ret = rxrpc_init_client_conn_security(call->conn);
		if (ret < 0)
			return ret;
	}

	/* this should be in poll */
	sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);

reload:
	txb = call->tx_pending;
	call->tx_pending = NULL;
	if (txb)
		rxrpc_see_txbuf(txb, rxrpc_txbuf_see_send_more);

	ret = -EPIPE;
	if (sk->sk_shutdown & SEND_SHUTDOWN)
		goto maybe_error;
	state = rxrpc_call_state(call);
	ret = -ESHUTDOWN;
	if (state >= RXRPC_CALL_COMPLETE)
		goto maybe_error;
	ret = -EPROTO;
	if (state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
	    state != RXRPC_CALL_SERVER_ACK_REQUEST &&
	    state != RXRPC_CALL_SERVER_SEND_REPLY) {
		/* Request phase complete for this client call */
		trace_rxrpc_abort(call->debug_id, rxrpc_sendmsg_late_send,
				  call->cid, call->call_id, call->rx_consumed,
				  0, -EPROTO);
		goto maybe_error;
	}

	ret = -EMSGSIZE;
	if (call->tx_total_len != -1) {
		if (len - copied > call->tx_total_len)
			goto maybe_error;
		if (!more && len - copied != call->tx_total_len)
			goto maybe_error;
	}

	do {
		if (!txb) {
			size_t remain;

			_debug("alloc");

			if (!rxrpc_check_tx_space(call, NULL))
				goto wait_for_space;

			/* See if we need to begin/extend the Tx queue. */
			if (!call->send_queue || !((call->send_top + 1) & RXRPC_TXQ_MASK)) {
				ret = rxrpc_alloc_txqueue(sk, call);
				if (ret < 0)
					goto maybe_error;
			}

			/* Work out the maximum size of a packet. Assume that
			 * the security header is going to be in the padded
			 * region (enc blocksize), but the trailer is not.
			 */
			remain = more ? INT_MAX : msg_data_left(msg);
			txb = call->conn->security->alloc_txbuf(call, remain, sk->sk_allocation);
			if (!txb) {
				ret = -ENOMEM;
				goto maybe_error;
			}
		}

		_debug("append");

		/* append next segment of data to the current buffer */
		if (msg_data_left(msg) > 0) {
			size_t copy = umin(txb->space, msg_data_left(msg));

			_debug("add %zu", copy);
			if (!copy_from_iter_full(txb->data + txb->offset,
						 copy, &msg->msg_iter))
				goto efault;
			_debug("added");
			txb->space -= copy;
			txb->len += copy;
			txb->offset += copy;
			copied += copy;
			if (call->tx_total_len != -1)
				call->tx_total_len -= copy;
		}

		/* check for the far side aborting the call or a network error
		 * occurring */
		if (rxrpc_call_is_complete(call))
			goto call_terminated;

		/* add the packet to the send queue if it's now full */
		if (!txb->space ||
		    (msg_data_left(msg) == 0 && !more)) {
			if (msg_data_left(msg) == 0 && !more)
				txb->flags |= RXRPC_LAST_PACKET;

			ret = call->security->secure_packet(call, txb);
			if (ret < 0)
				goto out;
			rxrpc_queue_packet(rx, call, txb, notify_end_tx);
			txb = NULL;
		}
	} while (msg_data_left(msg) > 0);

success:
	ret = copied;
	if (rxrpc_call_is_complete(call) &&
	    call->error < 0)
		ret = call->error;
out:
	call->tx_pending = txb;
	_leave(" = %d", ret);
	return ret;

call_terminated:
	rxrpc_put_txbuf(txb, rxrpc_txbuf_put_send_aborted);
	_leave(" = %d", call->error);
	return call->error;

maybe_error:
	if (copied)
		goto success;
	goto out;

efault:
	ret = -EFAULT;
	goto out;

wait_for_space:
	ret = -EAGAIN;
	if (msg->msg_flags & MSG_DONTWAIT)
		goto maybe_error;
	mutex_unlock(&call->user_mutex);
	*_dropped_lock = true;
	ret = rxrpc_wait_for_tx_window(rx, call, &timeo,
				       msg->msg_flags & MSG_WAITALL);
	if (ret < 0)
		goto maybe_error;
	if (call->interruptibility == RXRPC_INTERRUPTIBLE) {
		if (mutex_lock_interruptible(&call->user_mutex) < 0) {
			ret = sock_intr_errno(timeo);
			goto maybe_error;
		}
	} else {
		mutex_lock(&call->user_mutex);
	}
	*_dropped_lock = false;
	goto reload;
}

/*
 * extract control messages from the sendmsg() control buffer
 */
static int rxrpc_sendmsg_cmsg(struct msghdr *msg, struct rxrpc_send_params *p)
{
	struct cmsghdr *cmsg;
	bool got_user_ID = false;
	int len;

	if (msg->msg_controllen == 0)
		return -EINVAL;

	for_each_cmsghdr(cmsg, msg) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		len = cmsg->cmsg_len - sizeof(struct cmsghdr);
		_debug("CMSG %d, %d, %d",
		       cmsg->cmsg_level, cmsg->cmsg_type, len);

		if (cmsg->cmsg_level != SOL_RXRPC)
			continue;

		switch (cmsg->cmsg_type) {
		case RXRPC_USER_CALL_ID:
			if (msg->msg_flags & MSG_CMSG_COMPAT) {
				if (len != sizeof(u32))
					return -EINVAL;
				p->call.user_call_ID = *(u32 *)CMSG_DATA(cmsg);
			} else {
				if (len != sizeof(unsigned long))
					return -EINVAL;
				p->call.user_call_ID = *(unsigned long *)
					CMSG_DATA(cmsg);
			}
			got_user_ID = true;
			break;

		case RXRPC_ABORT:
			if (p->command != RXRPC_CMD_SEND_DATA)
				return -EINVAL;
			p->command = RXRPC_CMD_SEND_ABORT;
			if (len != sizeof(p->abort_code))
				return -EINVAL;
			p->abort_code = *(unsigned int *)CMSG_DATA(cmsg);
			if (p->abort_code == 0)
				return -EINVAL;
			break;

		case RXRPC_CHARGE_ACCEPT:
			if (p->command != RXRPC_CMD_SEND_DATA)
				return -EINVAL;
			p->command = RXRPC_CMD_CHARGE_ACCEPT;
			if (len != 0)
				return -EINVAL;
			break;

		case RXRPC_EXCLUSIVE_CALL:
			p->exclusive = true;
			if (len != 0)
				return -EINVAL;
			break;

		case RXRPC_UPGRADE_SERVICE:
			p->upgrade = true;
			if (len != 0)
				return -EINVAL;
			break;

		case RXRPC_TX_LENGTH:
			if (p->call.tx_total_len != -1 || len != sizeof(__s64))
				return -EINVAL;
			p->call.tx_total_len = *(__s64 *)CMSG_DATA(cmsg);
			if (p->call.tx_total_len < 0)
				return -EINVAL;
			break;

		case RXRPC_SET_CALL_TIMEOUT:
			if (len & 3 || len < 4 || len > 12)
				return -EINVAL;
			memcpy(&p->call.timeouts, CMSG_DATA(cmsg), len);
			p->call.nr_timeouts = len / 4;
			if (p->call.timeouts.hard > INT_MAX / HZ)
				return -ERANGE;
			if (p->call.nr_timeouts >= 2 && p->call.timeouts.idle > 60 * 60 * 1000)
				return -ERANGE;
			if (p->call.nr_timeouts >= 3 && p->call.timeouts.normal > 60 * 60 * 1000)
				return -ERANGE;
			break;

		default:
			return -EINVAL;
		}
	}

	if (!got_user_ID)
		return -EINVAL;
	if (p->call.tx_total_len != -1 && p->command != RXRPC_CMD_SEND_DATA)
		return -EINVAL;
	_leave(" = 0");
	return 0;
}
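
/*
 * Example (illustrative, userspace): attaching the mandatory
 * RXRPC_USER_CALL_ID control message to a sendmsg() call. The ID value
 * here is arbitrary; it simply tags the call in subsequent recvmsg()
 * traffic. Other SOL_RXRPC options parsed above (RXRPC_TX_LENGTH,
 * RXRPC_ABORT, RXRPC_SET_CALL_TIMEOUT, ...) are attached the same way.
 *
 *	unsigned long user_id = 1;
 *	char control[CMSG_SPACE(sizeof(user_id))];
 *	struct msghdr msg = {
 *		.msg_control	= control,
 *		.msg_controllen	= sizeof(control),
 *	};
 *	struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
 *
 *	cmsg->cmsg_level = SOL_RXRPC;
 *	cmsg->cmsg_type	 = RXRPC_USER_CALL_ID;
 *	cmsg->cmsg_len	 = CMSG_LEN(sizeof(user_id));
 *	memcpy(CMSG_DATA(cmsg), &user_id, sizeof(user_id));
 */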

/*
 * Create a new client call for sendmsg().
 * - Called with the socket lock held, which it must release.
 * - If it returns a call, the call's lock will need releasing by the caller.
 */
static struct rxrpc_call *
rxrpc_new_client_call_for_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg,
				  struct rxrpc_send_params *p)
	__releases(&rx->sk.sk_lock)
	__acquires(&call->user_mutex)
{
	struct rxrpc_conn_parameters cp;
	struct rxrpc_peer *peer;
	struct rxrpc_call *call;
	struct key *key;

	DECLARE_SOCKADDR(struct sockaddr_rxrpc *, srx, msg->msg_name);

	_enter("");

	if (!msg->msg_name) {
		release_sock(&rx->sk);
		return ERR_PTR(-EDESTADDRREQ);
	}

	peer = rxrpc_lookup_peer(rx->local, srx, GFP_KERNEL);
	if (!peer) {
		release_sock(&rx->sk);
		return ERR_PTR(-ENOMEM);
	}

	key = rx->key;
	if (key && !rx->key->payload.data[0])
		key = NULL;

	memset(&cp, 0, sizeof(cp));
	cp.local		= rx->local;
	cp.peer			= peer;
	cp.key			= rx->key;
	cp.security_level	= rx->min_sec_level;
	cp.exclusive		= rx->exclusive | p->exclusive;
	cp.upgrade		= p->upgrade;
	cp.service_id		= srx->srx_service;
	call = rxrpc_new_client_call(rx, &cp, &p->call, GFP_KERNEL,
				     atomic_inc_return(&rxrpc_debug_id));
	/* The socket is now unlocked */

	rxrpc_put_peer(peer, rxrpc_peer_put_application);
	_leave(" = %p\n", call);
	return call;
}

/*
 * send a message forming part of a client call through an RxRPC socket
 * - caller holds the socket locked
 * - the socket may be either a client socket or a server socket
 */
int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
{
	struct rxrpc_call *call;
	bool dropped_lock = false;
	int ret;

	struct rxrpc_send_params p = {
		.call.tx_total_len	= -1,
		.call.user_call_ID	= 0,
		.call.nr_timeouts	= 0,
		.call.interruptibility	= RXRPC_INTERRUPTIBLE,
		.abort_code		= 0,
		.command		= RXRPC_CMD_SEND_DATA,
		.exclusive		= false,
		.upgrade		= false,
	};

	_enter("");

	ret = rxrpc_sendmsg_cmsg(msg, &p);
	if (ret < 0)
		goto error_release_sock;

	if (p.command == RXRPC_CMD_CHARGE_ACCEPT) {
		ret = -EINVAL;
		if (rx->sk.sk_state != RXRPC_SERVER_LISTENING)
			goto error_release_sock;
		ret = rxrpc_user_charge_accept(rx, p.call.user_call_ID);
		goto error_release_sock;
	}

	call = rxrpc_find_call_by_user_ID(rx, p.call.user_call_ID);
	if (!call) {
		ret = -EBADSLT;
		if (p.command != RXRPC_CMD_SEND_DATA)
			goto error_release_sock;
		call = rxrpc_new_client_call_for_sendmsg(rx, msg, &p);
		/* The socket is now unlocked... */
		if (IS_ERR(call))
			return PTR_ERR(call);
		/* ... and we have the call lock. */
		p.call.nr_timeouts = 0;
		ret = 0;
		if (rxrpc_call_is_complete(call))
			goto out_put_unlock;
	} else {
		switch (rxrpc_call_state(call)) {
		case RXRPC_CALL_CLIENT_AWAIT_CONN:
		case RXRPC_CALL_SERVER_RECV_REQUEST:
			if (p.command == RXRPC_CMD_SEND_ABORT)
				break;
			fallthrough;
		case RXRPC_CALL_UNINITIALISED:
		case RXRPC_CALL_SERVER_PREALLOC:
			rxrpc_put_call(call, rxrpc_call_put_sendmsg);
			ret = -EBUSY;
			goto error_release_sock;
		default:
			break;
		}

		ret = mutex_lock_interruptible(&call->user_mutex);
		release_sock(&rx->sk);
		if (ret < 0) {
			ret = -ERESTARTSYS;
			goto error_put;
		}

		if (p.call.tx_total_len != -1) {
			ret = -EINVAL;
			if (call->tx_total_len != -1 ||
			    call->tx_pending ||
			    call->tx_top != 0)
				goto out_put_unlock;
			call->tx_total_len = p.call.tx_total_len;
		}
	}

	switch (p.call.nr_timeouts) {
	case 3:
		WRITE_ONCE(call->next_rx_timo, p.call.timeouts.normal);
		fallthrough;
	case 2:
		WRITE_ONCE(call->next_req_timo, p.call.timeouts.idle);
		fallthrough;
	case 1:
		if (p.call.timeouts.hard > 0) {
			ktime_t delay = ms_to_ktime(p.call.timeouts.hard * MSEC_PER_SEC);

			WRITE_ONCE(call->expect_term_by,
				   ktime_add(ktime_get_real(), delay));
			trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_hard);
			rxrpc_poke_call(call, rxrpc_call_poke_set_timeout);
		}
		break;
	}

	if (rxrpc_call_is_complete(call)) {
		/* it's too late for this call */
		ret = -ESHUTDOWN;
		goto out_put_unlock;
	}

	switch (p.command) {
	case RXRPC_CMD_SEND_ABORT:
		rxrpc_propose_abort(call, p.abort_code, -ECONNABORTED,
				    rxrpc_abort_call_sendmsg);
		ret = 0;
		break;
	case RXRPC_CMD_SEND_DATA:
		ret = rxrpc_send_data(rx, call, msg, len, NULL, &dropped_lock);
		break;
	default:
		ret = -EINVAL;
		break;
	}

out_put_unlock:
	if (!dropped_lock)
		mutex_unlock(&call->user_mutex);
error_put:
	rxrpc_put_call(call, rxrpc_call_put_sendmsg);
	_leave(" = %d", ret);
	return ret;

error_release_sock:
	release_sock(&rx->sk);
	return ret;
}

/**
 * rxrpc_kernel_send_data - Allow a kernel service to send data on a call
 * @sock: The socket the call is on
 * @call: The call to send data through
 * @msg: The data to send
 * @len: The amount of data to send
 * @notify_end_tx: Notification that the last packet is queued.
 *
 * Allow a kernel service to send data on a call. The call must be in a state
 * appropriate to sending data. No control data should be supplied in @msg,
 * nor should an address be supplied. MSG_MORE should be flagged if there's
 * more data to come, otherwise this data will end the transmission phase.
 *
 * Return: %0 if successful and a negative error code otherwise.
 */
int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
			   struct msghdr *msg, size_t len,
			   rxrpc_notify_end_tx_t notify_end_tx)
{
	bool dropped_lock = false;
	int ret;

	_enter("{%d},", call->debug_id);

	ASSERTCMP(msg->msg_name, ==, NULL);
	ASSERTCMP(msg->msg_control, ==, NULL);

	mutex_lock(&call->user_mutex);

	ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len,
			      notify_end_tx, &dropped_lock);
	if (ret == -ESHUTDOWN)
		ret = call->error;

	if (!dropped_lock)
		mutex_unlock(&call->user_mutex);
	_leave(" = %d", ret);
	return ret;
}
EXPORT_SYMBOL(rxrpc_kernel_send_data);
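
/*
 * Example (illustrative sketch; the function and buffer here are
 * hypothetical, not part of this API): a kernel service sending a complete
 * request from a kvec. MSG_MORE is left unset, so this buffer ends the
 * transmission phase.
 *
 *	static int example_send_request(struct socket *sock,
 *					struct rxrpc_call *call,
 *					void *buf, size_t len)
 *	{
 *		struct msghdr msg = { .msg_flags = 0 };
 *		struct kvec iov = { .iov_base = buf, .iov_len = len };
 *
 *		iov_iter_kvec(&msg.msg_iter, ITER_SOURCE, &iov, 1, len);
 *		return rxrpc_kernel_send_data(sock, call, &msg, len, NULL);
 *	}
 */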

/**
 * rxrpc_kernel_abort_call - Allow a kernel service to abort a call
 * @sock: The socket the call is on
 * @call: The call to be aborted
 * @abort_code: The abort code to stick into the ABORT packet
 * @error: Local error value
 * @why: Indication as to why.
 *
 * Allow a kernel service to abort a call if it's still in an abortable state.
 *
 * Return: %true if the call was aborted, %false if it was already complete.
 */
bool rxrpc_kernel_abort_call(struct socket *sock, struct rxrpc_call *call,
			     u32 abort_code, int error, enum rxrpc_abort_reason why)
{
	bool aborted;

	_enter("{%d},%d,%d,%u", call->debug_id, abort_code, error, why);

	mutex_lock(&call->user_mutex);
	aborted = rxrpc_propose_abort(call, abort_code, error, why);
	mutex_unlock(&call->user_mutex);
	return aborted;
}
EXPORT_SYMBOL(rxrpc_kernel_abort_call);
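
/*
 * Example (illustrative; RX_USER_ABORT is one plausible abort code, and the
 * reason value would be whichever enum rxrpc_abort_reason entry fits the
 * situation): cancelling a call the service no longer wants:
 *
 *	rxrpc_kernel_abort_call(sock, call, RX_USER_ABORT, -ECANCELED, why);
 */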

/**
 * rxrpc_kernel_set_tx_length - Set the total Tx length on a call
 * @sock: The socket the call is on
 * @call: The call to be informed
 * @tx_total_len: The amount of data to be transmitted for this call
 *
 * Allow a kernel service to set the total transmit length on a call. This
 * allows buffer-to-packet encrypt-and-copy to be performed.
 *
 * This function is primarily for use in setting the reply length, since the
 * request length can be set when beginning the call.
 */
void rxrpc_kernel_set_tx_length(struct socket *sock, struct rxrpc_call *call,
				s64 tx_total_len)
{
	WARN_ON(call->tx_total_len != -1);
	call->tx_total_len = tx_total_len;
}
EXPORT_SYMBOL(rxrpc_kernel_set_tx_length);
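
/*
 * Example (illustrative): a server that knows its reply will be exactly
 * 65536 bytes long could declare that before queueing the first reply
 * segment:
 *
 *	rxrpc_kernel_set_tx_length(sock, call, 65536);
 */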