Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

rxrpc: Rework local endpoint management

Rework the local RxRPC endpoint management.

Local endpoint objects are maintained in a flat list as before. This
should be okay as there shouldn't be more than one per open AF_RXRPC socket
(there can be fewer as local endpoints can be shared if their local service
ID is 0 and they share the same local transport parameters).

Changes:

(1) Local endpoints may now only be shared if they have local service ID 0
(ie. they're not being used for listening).

This prevents a scenario where process A is listening on the Cache
Manager port and process B contacts a fileserver - which may then
attempt to send CM requests back to B. But if A and B are sharing a
local endpoint, A will get the CM requests meant for B.

(2) We use a mutex to handle lookups and don't provide RCU-only lookups
since we only expect to access the list when opening a socket or
destroying an endpoint.

The local endpoint object is pointed to by the transport socket's
sk_user_data for the life of the transport socket - allowing us to
refer to it directly from the sk_data_ready and sk_error_report
callbacks.

(3) atomic_inc_not_zero() now exists and can be used to only share a local
endpoint if the last reference hasn't yet gone.

(4) We can remove rxrpc_local_lock - a spinlock that had to be taken with
BH processing disabled given that we assume sk_user_data won't change
under us.

(5) The transport socket is shut down before we clear the sk_user_data
pointer so that we can be sure that the transport socket's callbacks
won't be invoked once the RCU destruction is scheduled.

(6) Local endpoints have a work item that handles both destruction and
event processing. This means that destruction doesn't then need to
wait for event processing. The event queues can then be cleared after
the transport socket is shut down.

(7) Local endpoints are no longer available for resurrection beyond the
life of the sockets that had them open. As soon as their last ref
goes, they are scheduled for destruction and may not have their usage
count moved from 0.

Signed-off-by: David Howells <dhowells@redhat.com>

+277 -231
+18 -1
net/rxrpc/af_rxrpc.c
··· 102 102 103 103 switch (srx->transport.family) { 104 104 case AF_INET: 105 + if (srx->transport_len < sizeof(struct sockaddr_in)) 106 + return -EINVAL; 105 107 _debug("INET: %x @ %pI4", 106 108 ntohs(srx->transport.sin.sin_port), 107 109 &srx->transport.sin.sin_addr); ··· 837 835 rxrpc_destroy_all_calls(); 838 836 rxrpc_destroy_all_connections(); 839 837 rxrpc_destroy_all_transports(); 840 - rxrpc_destroy_all_locals(); 841 838 842 839 ASSERTCMP(atomic_read(&rxrpc_n_skbs), ==, 0); 843 840 841 + /* We need to flush the scheduled work twice because the local endpoint 842 + * records involve a work item in their destruction as they can only be 843 + * destroyed from process context. However, a connection may have a 844 + * work item outstanding - and this will pin the local endpoint record 845 + * until the connection goes away. 846 + * 847 + * Peers don't pin locals and calls pin sockets - which prevents the 848 + * module from being unloaded - so we should only need two flushes. 849 + */ 844 850 _debug("flush scheduled work"); 845 851 flush_workqueue(rxrpc_workqueue); 852 + _debug("flush scheduled work 2"); 853 + flush_workqueue(rxrpc_workqueue); 854 + _debug("synchronise RCU"); 855 + rcu_barrier(); 856 + _debug("destroy locals"); 857 + rxrpc_destroy_all_locals(); 858 + 846 859 remove_proc_entry("rxrpc_conns", init_net.proc_net); 847 860 remove_proc_entry("rxrpc_calls", init_net.proc_net); 848 861 destroy_workqueue(rxrpc_workqueue);
+31 -24
net/rxrpc/ar-internal.h
··· 170 170 }; 171 171 172 172 /* 173 - * RxRPC local transport endpoint definition 174 - * - matched by local port, address and protocol type 173 + * RxRPC local transport endpoint description 174 + * - owned by a single AF_RXRPC socket 175 + * - pointed to by transport socket struct sk_user_data 175 176 */ 176 177 struct rxrpc_local { 178 + struct rcu_head rcu; 179 + atomic_t usage; 180 + struct list_head link; 177 181 struct socket *socket; /* my UDP socket */ 178 - struct work_struct destroyer; /* endpoint destroyer */ 179 - struct work_struct acceptor; /* incoming call processor */ 180 - struct work_struct rejecter; /* packet reject writer */ 181 - struct work_struct event_processor; /* endpoint event processor */ 182 + struct work_struct processor; 182 183 struct list_head services; /* services listening on this endpoint */ 183 - struct list_head link; /* link in endpoint list */ 184 184 struct rw_semaphore defrag_sem; /* control re-enablement of IP DF bit */ 185 185 struct sk_buff_head accept_queue; /* incoming calls awaiting acceptance */ 186 186 struct sk_buff_head reject_queue; /* packets awaiting rejection */ 187 187 struct sk_buff_head event_queue; /* endpoint event packets awaiting processing */ 188 + struct mutex conn_lock; /* Client connection creation lock */ 188 189 spinlock_t lock; /* access lock */ 189 190 rwlock_t services_lock; /* lock for services list */ 190 - atomic_t usage; 191 191 int debug_id; /* debug ID for printks */ 192 + bool dead; 192 193 struct sockaddr_rxrpc srx; /* local address */ 193 194 }; 194 195 ··· 488 487 /* 489 488 * call_accept.c 490 489 */ 491 - void rxrpc_accept_incoming_calls(struct work_struct *); 490 + void rxrpc_accept_incoming_calls(struct rxrpc_local *); 492 491 struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *, unsigned long); 493 492 int rxrpc_reject_call(struct rxrpc_sock *); 494 493 ··· 528 527 */ 529 528 void rxrpc_process_connection(struct work_struct *); 530 529 void rxrpc_reject_packet(struct 
rxrpc_local *, struct sk_buff *); 531 - void rxrpc_reject_packets(struct work_struct *); 530 + void rxrpc_reject_packets(struct rxrpc_local *); 532 531 533 532 /* 534 533 * conn_object.c ··· 576 575 /* 577 576 * local_event.c 578 577 */ 579 - extern void rxrpc_process_local_events(struct work_struct *); 578 + extern void rxrpc_process_local_events(struct rxrpc_local *); 580 579 581 580 /* 582 581 * local_object.c 583 582 */ 584 - extern rwlock_t rxrpc_local_lock; 585 - 586 - struct rxrpc_local *rxrpc_lookup_local(struct sockaddr_rxrpc *); 587 - void rxrpc_put_local(struct rxrpc_local *); 583 + struct rxrpc_local *rxrpc_lookup_local(const struct sockaddr_rxrpc *); 584 + void __rxrpc_put_local(struct rxrpc_local *); 588 585 void __exit rxrpc_destroy_all_locals(void); 586 + 587 + static inline void rxrpc_get_local(struct rxrpc_local *local) 588 + { 589 + atomic_inc(&local->usage); 590 + } 591 + 592 + static inline 593 + struct rxrpc_local *rxrpc_get_local_maybe(struct rxrpc_local *local) 594 + { 595 + return atomic_inc_not_zero(&local->usage) ? local : NULL; 596 + } 597 + 598 + static inline void rxrpc_put_local(struct rxrpc_local *local) 599 + { 600 + if (atomic_dec_and_test(&local->usage)) 601 + __rxrpc_put_local(local); 602 + } 589 603 590 604 /* 591 605 * misc.c ··· 889 873 while ((skb = skb_dequeue((list))) != NULL) 890 874 rxrpc_free_skb(skb); 891 875 } 892 - 893 - static inline void __rxrpc_get_local(struct rxrpc_local *local, const char *f) 894 - { 895 - CHECK_SLAB_OKAY(&local->usage); 896 - if (atomic_inc_return(&local->usage) == 1) 897 - printk("resurrected (%s)\n", f); 898 - } 899 - 900 - #define rxrpc_get_local(LOCAL) __rxrpc_get_local((LOCAL), __func__) 901 876 902 877 #define rxrpc_get_call(CALL) \ 903 878 do { \
+5 -20
net/rxrpc/call_accept.c
··· 202 202 * accept incoming calls that need peer, transport and/or connection setting up 203 203 * - the packets we get are all incoming client DATA packets that have seq == 1 204 204 */ 205 - void rxrpc_accept_incoming_calls(struct work_struct *work) 205 + void rxrpc_accept_incoming_calls(struct rxrpc_local *local) 206 206 { 207 - struct rxrpc_local *local = 208 - container_of(work, struct rxrpc_local, acceptor); 209 207 struct rxrpc_skb_priv *sp; 210 208 struct sockaddr_rxrpc srx; 211 209 struct rxrpc_sock *rx; ··· 213 215 214 216 _enter("%d", local->debug_id); 215 217 216 - read_lock_bh(&rxrpc_local_lock); 217 - if (atomic_read(&local->usage) > 0) 218 - rxrpc_get_local(local); 219 - else 220 - local = NULL; 221 - read_unlock_bh(&rxrpc_local_lock); 222 - if (!local) { 223 - _leave(" [local dead]"); 224 - return; 225 - } 226 - 227 - process_next_packet: 228 218 skb = skb_dequeue(&local->accept_queue); 229 219 if (!skb) { 230 - rxrpc_put_local(local); 231 220 _leave("\n"); 232 221 return; 233 222 } ··· 277 292 case -ECONNRESET: /* old calls are ignored */ 278 293 case -ECONNABORTED: /* aborted calls are reaborted or ignored */ 279 294 case 0: 280 - goto process_next_packet; 295 + return; 281 296 case -ECONNREFUSED: 282 297 goto invalid_service; 283 298 case -EBUSY: ··· 293 308 busy: 294 309 rxrpc_busy(local, &srx, &whdr); 295 310 rxrpc_free_skb(skb); 296 - goto process_next_packet; 311 + return; 297 312 298 313 invalid_service: 299 314 skb->priority = RX_INVALID_OPERATION; 300 315 rxrpc_reject_packet(local, skb); 301 - goto process_next_packet; 316 + return; 302 317 303 318 /* can't change connection security type mid-flow */ 304 319 security_mismatch: 305 320 skb->priority = RX_PROTOCOL_ERROR; 306 321 rxrpc_reject_packet(local, skb); 307 - goto process_next_packet; 322 + return; 308 323 } 309 324 310 325 /*
+2 -13
net/rxrpc/conn_event.c
··· 314 314 { 315 315 CHECK_SLAB_OKAY(&local->usage); 316 316 317 - if (!atomic_inc_not_zero(&local->usage)) { 318 - printk("resurrected on reject\n"); 319 - BUG(); 320 - } 321 - 322 317 skb_queue_tail(&local->reject_queue, skb); 323 - rxrpc_queue_work(&local->rejecter); 318 + rxrpc_queue_work(&local->processor); 324 319 } 325 320 326 321 /* 327 322 * reject packets through the local endpoint 328 323 */ 329 - void rxrpc_reject_packets(struct work_struct *work) 324 + void rxrpc_reject_packets(struct rxrpc_local *local) 330 325 { 331 326 union { 332 327 struct sockaddr sa; ··· 329 334 } sa; 330 335 struct rxrpc_skb_priv *sp; 331 336 struct rxrpc_wire_header whdr; 332 - struct rxrpc_local *local; 333 337 struct sk_buff *skb; 334 338 struct msghdr msg; 335 339 struct kvec iov[2]; 336 340 size_t size; 337 341 __be32 code; 338 - 339 - local = container_of(work, struct rxrpc_local, rejecter); 340 - rxrpc_get_local(local); 341 342 342 343 _enter("%d", local->debug_id); 343 344 ··· 386 395 } 387 396 388 397 rxrpc_free_skb(skb); 389 - rxrpc_put_local(local); 390 398 } 391 399 392 - rxrpc_put_local(local); 393 400 _leave(""); 394 401 }
+7 -22
net/rxrpc/input.c
··· 594 594 { 595 595 _enter("%p,%p", local, skb); 596 596 597 - atomic_inc(&local->usage); 598 597 skb_queue_tail(&local->event_queue, skb); 599 - rxrpc_queue_work(&local->event_processor); 598 + rxrpc_queue_work(&local->processor); 600 599 } 601 600 602 601 /* ··· 663 664 /* 664 665 * handle data received on the local endpoint 665 666 * - may be called in interrupt context 667 + * 668 + * The socket is locked by the caller and this prevents the socket from being 669 + * shut down and the local endpoint from going away, thus sk_user_data will not 670 + * be cleared until this function returns. 666 671 */ 667 672 void rxrpc_data_ready(struct sock *sk) 668 673 { 669 674 struct rxrpc_skb_priv *sp; 670 - struct rxrpc_local *local; 675 + struct rxrpc_local *local = sk->sk_user_data; 671 676 struct sk_buff *skb; 672 677 int ret; 673 678 ··· 679 676 680 677 ASSERT(!irqs_disabled()); 681 678 682 - read_lock_bh(&rxrpc_local_lock); 683 - local = sk->sk_user_data; 684 - if (local && atomic_read(&local->usage) > 0) 685 - rxrpc_get_local(local); 686 - else 687 - local = NULL; 688 - read_unlock_bh(&rxrpc_local_lock); 689 - if (!local) { 690 - _leave(" [local dead]"); 691 - return; 692 - } 693 - 694 679 skb = skb_recv_datagram(sk, 0, 1, &ret); 695 680 if (!skb) { 696 - rxrpc_put_local(local); 697 681 if (ret == -EAGAIN) 698 682 return; 699 683 _debug("UDP socket error %d", ret); ··· 694 704 /* we'll probably need to checksum it (didn't call sock_recvmsg) */ 695 705 if (skb_checksum_complete(skb)) { 696 706 rxrpc_free_skb(skb); 697 - rxrpc_put_local(local); 698 707 __UDP_INC_STATS(&init_net, UDP_MIB_INERRORS, 0); 699 708 _leave(" [CSUM failed]"); 700 709 return; ··· 758 769 } 759 770 760 771 out: 761 - rxrpc_put_local(local); 762 772 return; 763 773 764 774 cant_route_call: ··· 767 779 if (sp->hdr.seq == 1) { 768 780 _debug("first packet"); 769 781 skb_queue_tail(&local->accept_queue, skb); 770 - rxrpc_queue_work(&local->acceptor); 771 - rxrpc_put_local(local); 782 + 
rxrpc_queue_work(&local->processor); 772 783 _leave(" [incoming]"); 773 784 return; 774 785 } ··· 780 793 _debug("reject type %d",sp->hdr.type); 781 794 rxrpc_reject_packet(local, skb); 782 795 } 783 - rxrpc_put_local(local); 784 796 _leave(" [no call]"); 785 797 return; 786 798 787 799 bad_message: 788 800 skb->priority = RX_PROTOCOL_ERROR; 789 801 rxrpc_reject_packet(local, skb); 790 - rxrpc_put_local(local); 791 802 _leave(" [badmsg]"); 792 803 }
+3 -7
net/rxrpc/local_event.c
··· 82 82 /* 83 83 * Process event packets targetted at a local endpoint. 84 84 */ 85 - void rxrpc_process_local_events(struct work_struct *work) 85 + void rxrpc_process_local_events(struct rxrpc_local *local) 86 86 { 87 - struct rxrpc_local *local = container_of(work, struct rxrpc_local, event_processor); 88 87 struct sk_buff *skb; 89 88 char v; 90 89 91 90 _enter(""); 92 91 93 - atomic_inc(&local->usage); 94 - 95 - while ((skb = skb_dequeue(&local->event_queue))) { 92 + skb = skb_dequeue(&local->event_queue); 93 + if (skb) { 96 94 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 97 95 98 96 _debug("{%d},{%u}", local->debug_id, sp->hdr.type); ··· 109 111 break; 110 112 } 111 113 112 - rxrpc_put_local(local); 113 114 rxrpc_free_skb(skb); 114 115 } 115 116 116 - rxrpc_put_local(local); 117 117 _leave(""); 118 118 }
+211 -144
net/rxrpc/local_object.c
··· 1 1 /* Local endpoint object management 2 2 * 3 - * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. 3 + * Copyright (C) 2016 Red Hat, Inc. All Rights Reserved. 4 4 * Written by David Howells (dhowells@redhat.com) 5 5 * 6 6 * This program is free software; you can redistribute it and/or ··· 17 17 #include <linux/slab.h> 18 18 #include <linux/udp.h> 19 19 #include <linux/ip.h> 20 + #include <linux/hashtable.h> 20 21 #include <net/sock.h> 21 22 #include <net/af_rxrpc.h> 22 23 #include "ar-internal.h" 23 24 24 - static LIST_HEAD(rxrpc_locals); 25 - DEFINE_RWLOCK(rxrpc_local_lock); 26 - static DECLARE_RWSEM(rxrpc_local_sem); 27 - static DECLARE_WAIT_QUEUE_HEAD(rxrpc_local_wq); 25 + static void rxrpc_local_processor(struct work_struct *); 26 + static void rxrpc_local_rcu(struct rcu_head *); 28 27 29 - static void rxrpc_destroy_local(struct work_struct *work); 28 + static DEFINE_MUTEX(rxrpc_local_mutex); 29 + static LIST_HEAD(rxrpc_local_endpoints); 30 30 31 31 /* 32 - * allocate a new local 32 + * Compare a local to an address. Return -ve, 0 or +ve to indicate less than, 33 + * same or greater than. 34 + * 35 + * We explicitly don't compare the RxRPC service ID as we want to reject 36 + * conflicting uses by differing services. Further, we don't want to share 37 + * addresses with different options (IPv6), so we don't compare those bits 38 + * either. 33 39 */ 34 - static 35 - struct rxrpc_local *rxrpc_alloc_local(struct sockaddr_rxrpc *srx) 40 + static long rxrpc_local_cmp_key(const struct rxrpc_local *local, 41 + const struct sockaddr_rxrpc *srx) 42 + { 43 + long diff; 44 + 45 + diff = ((local->srx.transport_type - srx->transport_type) ?: 46 + (local->srx.transport_len - srx->transport_len) ?: 47 + (local->srx.transport.family - srx->transport.family)); 48 + if (diff != 0) 49 + return diff; 50 + 51 + switch (srx->transport.family) { 52 + case AF_INET: 53 + /* If the choice of UDP port is left up to the transport, then 54 + * the endpoint record doesn't match. 
55 + */ 56 + return ((u16 __force)local->srx.transport.sin.sin_port - 57 + (u16 __force)srx->transport.sin.sin_port) ?: 58 + memcmp(&local->srx.transport.sin.sin_addr, 59 + &srx->transport.sin.sin_addr, 60 + sizeof(struct in_addr)); 61 + default: 62 + BUG(); 63 + } 64 + } 65 + 66 + /* 67 + * Allocate a new local endpoint. 68 + */ 69 + static struct rxrpc_local *rxrpc_alloc_local(const struct sockaddr_rxrpc *srx) 36 70 { 37 71 struct rxrpc_local *local; 38 72 39 73 local = kzalloc(sizeof(struct rxrpc_local), GFP_KERNEL); 40 74 if (local) { 41 - INIT_WORK(&local->destroyer, &rxrpc_destroy_local); 42 - INIT_WORK(&local->acceptor, &rxrpc_accept_incoming_calls); 43 - INIT_WORK(&local->rejecter, &rxrpc_reject_packets); 44 - INIT_WORK(&local->event_processor, &rxrpc_process_local_events); 45 - INIT_LIST_HEAD(&local->services); 75 + atomic_set(&local->usage, 1); 46 76 INIT_LIST_HEAD(&local->link); 77 + INIT_WORK(&local->processor, rxrpc_local_processor); 78 + INIT_LIST_HEAD(&local->services); 47 79 init_rwsem(&local->defrag_sem); 48 80 skb_queue_head_init(&local->accept_queue); 49 81 skb_queue_head_init(&local->reject_queue); 50 82 skb_queue_head_init(&local->event_queue); 83 + mutex_init(&local->conn_lock); 51 84 spin_lock_init(&local->lock); 52 85 rwlock_init(&local->services_lock); 53 - atomic_set(&local->usage, 1); 54 86 local->debug_id = atomic_inc_return(&rxrpc_debug_id); 55 87 memcpy(&local->srx, srx, sizeof(*srx)); 56 88 } ··· 93 61 94 62 /* 95 63 * create the local socket 96 - * - must be called with rxrpc_local_sem writelocked 64 + * - must be called with rxrpc_local_mutex locked 97 65 */ 98 - static int rxrpc_create_local(struct rxrpc_local *local) 66 + static int rxrpc_open_socket(struct rxrpc_local *local) 99 67 { 100 68 struct sock *sock; 101 69 int ret, opt; ··· 114 82 if (local->srx.transport_len > sizeof(sa_family_t)) { 115 83 _debug("bind"); 116 84 ret = kernel_bind(local->socket, 117 - (struct sockaddr *) &local->srx.transport, 85 + (struct sockaddr 
*)&local->srx.transport, 118 86 local->srx.transport_len); 119 87 if (ret < 0) { 120 - _debug("bind failed"); 88 + _debug("bind failed %d", ret); 121 89 goto error; 122 90 } 123 91 } ··· 140 108 goto error; 141 109 } 142 110 143 - write_lock_bh(&rxrpc_local_lock); 144 - list_add(&local->link, &rxrpc_locals); 145 - write_unlock_bh(&rxrpc_local_lock); 146 - 147 111 /* set the socket up */ 148 112 sock = local->socket->sk; 149 113 sock->sk_user_data = local; ··· 159 131 } 160 132 161 133 /* 162 - * create a new local endpoint using the specified UDP address 134 + * Look up or create a new local endpoint using the specified local address. 163 135 */ 164 - struct rxrpc_local *rxrpc_lookup_local(struct sockaddr_rxrpc *srx) 136 + struct rxrpc_local *rxrpc_lookup_local(const struct sockaddr_rxrpc *srx) 165 137 { 166 138 struct rxrpc_local *local; 139 + struct list_head *cursor; 140 + const char *age; 141 + long diff; 167 142 int ret; 168 143 169 - _enter("{%d,%u,%pI4+%hu}", 170 - srx->transport_type, 171 - srx->transport.family, 172 - &srx->transport.sin.sin_addr, 173 - ntohs(srx->transport.sin.sin_port)); 144 + if (srx->transport.family == AF_INET) { 145 + _enter("{%d,%u,%pI4+%hu}", 146 + srx->transport_type, 147 + srx->transport.family, 148 + &srx->transport.sin.sin_addr, 149 + ntohs(srx->transport.sin.sin_port)); 150 + } else { 151 + _enter("{%d,%u}", 152 + srx->transport_type, 153 + srx->transport.family); 154 + return ERR_PTR(-EAFNOSUPPORT); 155 + } 174 156 175 - down_write(&rxrpc_local_sem); 157 + mutex_lock(&rxrpc_local_mutex); 176 158 177 - /* see if we have a suitable local local endpoint already */ 178 - read_lock_bh(&rxrpc_local_lock); 159 + for (cursor = rxrpc_local_endpoints.next; 160 + cursor != &rxrpc_local_endpoints; 161 + cursor = cursor->next) { 162 + local = list_entry(cursor, struct rxrpc_local, link); 179 163 180 - list_for_each_entry(local, &rxrpc_locals, link) { 181 - _debug("CMP {%d,%u,%pI4+%hu}", 182 - local->srx.transport_type, 183 - 
local->srx.transport.family, 184 - &local->srx.transport.sin.sin_addr, 185 - ntohs(local->srx.transport.sin.sin_port)); 186 - 187 - if (local->srx.transport_type != srx->transport_type || 188 - local->srx.transport.family != srx->transport.family) 164 + diff = rxrpc_local_cmp_key(local, srx); 165 + if (diff < 0) 189 166 continue; 167 + if (diff > 0) 168 + break; 190 169 191 - switch (srx->transport.family) { 192 - case AF_INET: 193 - if (local->srx.transport.sin.sin_port != 194 - srx->transport.sin.sin_port) 195 - continue; 196 - if (memcmp(&local->srx.transport.sin.sin_addr, 197 - &srx->transport.sin.sin_addr, 198 - sizeof(struct in_addr)) != 0) 199 - continue; 200 - goto found_local; 201 - 202 - default: 203 - BUG(); 170 + /* Services aren't allowed to share transport sockets, so 171 + * reject that here. It is possible that the object is dying - 172 + * but it may also still have the local transport address that 173 + * we want bound. 174 + */ 175 + if (srx->srx_service) { 176 + local = NULL; 177 + goto addr_in_use; 204 178 } 179 + 180 + /* Found a match. We replace a dying object. Attempting to 181 + * bind the transport socket may still fail if we're attempting 182 + * to use a local address that the dying object is still using. 
183 + */ 184 + if (!atomic_inc_not_zero(&local->usage)) { 185 + cursor = cursor->next; 186 + list_del_init(&local->link); 187 + break; 188 + } 189 + 190 + age = "old"; 191 + goto found; 205 192 } 206 193 207 - read_unlock_bh(&rxrpc_local_lock); 208 - 209 - /* we didn't find one, so we need to create one */ 210 194 local = rxrpc_alloc_local(srx); 211 - if (!local) { 212 - up_write(&rxrpc_local_sem); 213 - return ERR_PTR(-ENOMEM); 214 - } 195 + if (!local) 196 + goto nomem; 215 197 216 - ret = rxrpc_create_local(local); 217 - if (ret < 0) { 218 - up_write(&rxrpc_local_sem); 219 - kfree(local); 220 - _leave(" = %d", ret); 221 - return ERR_PTR(ret); 222 - } 198 + ret = rxrpc_open_socket(local); 199 + if (ret < 0) 200 + goto sock_error; 223 201 224 - up_write(&rxrpc_local_sem); 202 + list_add_tail(&local->link, cursor); 203 + age = "new"; 225 204 226 - _net("LOCAL new %d {%d,%u,%pI4+%hu}", 205 + found: 206 + mutex_unlock(&rxrpc_local_mutex); 207 + 208 + _net("LOCAL %s %d {%d,%u,%pI4+%hu}", 209 + age, 227 210 local->debug_id, 228 211 local->srx.transport_type, 229 212 local->srx.transport.family, 230 213 &local->srx.transport.sin.sin_addr, 231 214 ntohs(local->srx.transport.sin.sin_port)); 232 215 233 - _leave(" = %p [new]", local); 216 + _leave(" = %p", local); 234 217 return local; 235 218 236 - found_local: 237 - rxrpc_get_local(local); 238 - read_unlock_bh(&rxrpc_local_lock); 239 - up_write(&rxrpc_local_sem); 219 + nomem: 220 + ret = -ENOMEM; 221 + sock_error: 222 + mutex_unlock(&rxrpc_local_mutex); 223 + kfree(local); 224 + _leave(" = %d", ret); 225 + return ERR_PTR(ret); 240 226 241 - _net("LOCAL old %d {%d,%u,%pI4+%hu}", 242 - local->debug_id, 243 - local->srx.transport_type, 244 - local->srx.transport.family, 245 - &local->srx.transport.sin.sin_addr, 246 - ntohs(local->srx.transport.sin.sin_port)); 247 - 248 - _leave(" = %p [reuse]", local); 249 - return local; 227 + addr_in_use: 228 + mutex_unlock(&rxrpc_local_mutex); 229 + _leave(" = -EADDRINUSE"); 230 + return 
ERR_PTR(-EADDRINUSE); 250 231 } 251 232 252 233 /* 253 - * release a local endpoint 234 + * A local endpoint reached its end of life. 254 235 */ 255 - void rxrpc_put_local(struct rxrpc_local *local) 236 + void __rxrpc_put_local(struct rxrpc_local *local) 256 237 { 257 - _enter("%p{u=%d}", local, atomic_read(&local->usage)); 258 - 259 - ASSERTCMP(atomic_read(&local->usage), >, 0); 260 - 261 - /* to prevent a race, the decrement and the dequeue must be effectively 262 - * atomic */ 263 - write_lock_bh(&rxrpc_local_lock); 264 - if (unlikely(atomic_dec_and_test(&local->usage))) { 265 - _debug("destroy local"); 266 - rxrpc_queue_work(&local->destroyer); 267 - } 268 - write_unlock_bh(&rxrpc_local_lock); 269 - _leave(""); 238 + _enter("%d", local->debug_id); 239 + rxrpc_queue_work(&local->processor); 270 240 } 271 241 272 242 /* 273 - * destroy a local endpoint 243 + * Destroy a local endpoint's socket and then hand the record to RCU to dispose 244 + * of. 245 + * 246 + * Closing the socket cannot be done from bottom half context or RCU callback 247 + * context because it might sleep. 274 248 */ 275 - static void rxrpc_destroy_local(struct work_struct *work) 249 + static void rxrpc_local_destroyer(struct rxrpc_local *local) 276 250 { 277 - struct rxrpc_local *local = 278 - container_of(work, struct rxrpc_local, destroyer); 251 + struct socket *socket = local->socket; 279 252 280 - _enter("%p{%d}", local, atomic_read(&local->usage)); 253 + _enter("%d", local->debug_id); 281 254 282 - down_write(&rxrpc_local_sem); 283 - 284 - write_lock_bh(&rxrpc_local_lock); 285 - if (atomic_read(&local->usage) > 0) { 286 - write_unlock_bh(&rxrpc_local_lock); 287 - up_read(&rxrpc_local_sem); 288 - _leave(" [resurrected]"); 255 + /* We can get a race between an incoming call packet queueing the 256 + * processor again and the work processor starting the destruction 257 + * process which will shut down the UDP socket. 
258 + */ 259 + if (local->dead) { 260 + _leave(" [already dead]"); 289 261 return; 290 262 } 263 + local->dead = true; 291 264 292 - list_del(&local->link); 293 - local->socket->sk->sk_user_data = NULL; 294 - write_unlock_bh(&rxrpc_local_lock); 295 - 296 - downgrade_write(&rxrpc_local_sem); 265 + mutex_lock(&rxrpc_local_mutex); 266 + list_del_init(&local->link); 267 + mutex_unlock(&rxrpc_local_mutex); 297 268 298 269 ASSERT(list_empty(&local->services)); 299 - ASSERT(!work_pending(&local->acceptor)); 300 - ASSERT(!work_pending(&local->rejecter)); 301 - ASSERT(!work_pending(&local->event_processor)); 302 270 303 - /* finish cleaning up the local descriptor */ 271 + if (socket) { 272 + local->socket = NULL; 273 + kernel_sock_shutdown(socket, SHUT_RDWR); 274 + socket->sk->sk_user_data = NULL; 275 + sock_release(socket); 276 + } 277 + 278 + /* At this point, there should be no more packets coming in to the 279 + * local endpoint. 280 + */ 304 281 rxrpc_purge_queue(&local->accept_queue); 305 282 rxrpc_purge_queue(&local->reject_queue); 306 283 rxrpc_purge_queue(&local->event_queue); 307 - kernel_sock_shutdown(local->socket, SHUT_RDWR); 308 - sock_release(local->socket); 309 284 310 - up_read(&rxrpc_local_sem); 285 + _debug("rcu local %d", local->debug_id); 286 + call_rcu(&local->rcu, rxrpc_local_rcu); 287 + } 288 + 289 + /* 290 + * Process events on an endpoint 291 + */ 292 + static void rxrpc_local_processor(struct work_struct *work) 293 + { 294 + struct rxrpc_local *local = 295 + container_of(work, struct rxrpc_local, processor); 296 + bool again; 297 + 298 + _enter("%d", local->debug_id); 299 + 300 + do { 301 + again = false; 302 + if (atomic_read(&local->usage) == 0) 303 + return rxrpc_local_destroyer(local); 304 + 305 + if (!skb_queue_empty(&local->accept_queue)) { 306 + rxrpc_accept_incoming_calls(local); 307 + again = true; 308 + } 309 + 310 + if (!skb_queue_empty(&local->reject_queue)) { 311 + rxrpc_reject_packets(local); 312 + again = true; 313 + } 314 + 315 + 
if (!skb_queue_empty(&local->event_queue)) { 316 + rxrpc_process_local_events(local); 317 + again = true; 318 + } 319 + } while (again); 320 + } 321 + 322 + /* 323 + * Destroy a local endpoint after the RCU grace period expires. 324 + */ 325 + static void rxrpc_local_rcu(struct rcu_head *rcu) 326 + { 327 + struct rxrpc_local *local = container_of(rcu, struct rxrpc_local, rcu); 328 + 329 + _enter("%d", local->debug_id); 330 + 331 + ASSERT(!work_pending(&local->processor)); 311 332 312 333 _net("DESTROY LOCAL %d", local->debug_id); 313 334 kfree(local); 314 - 315 - if (list_empty(&rxrpc_locals)) 316 - wake_up_all(&rxrpc_local_wq); 317 - 318 335 _leave(""); 319 336 } 320 337 321 338 /* 322 - * preemptively destroy all local local endpoint rather than waiting for 323 - * them to be destroyed 339 + * Verify the local endpoint list is empty by this point. 324 340 */ 325 341 void __exit rxrpc_destroy_all_locals(void) 326 342 { 327 - DECLARE_WAITQUEUE(myself,current); 343 + struct rxrpc_local *local; 328 344 329 345 _enter(""); 330 346 331 - /* we simply have to wait for them to go away */ 332 - if (!list_empty(&rxrpc_locals)) { 333 - set_current_state(TASK_UNINTERRUPTIBLE); 334 - add_wait_queue(&rxrpc_local_wq, &myself); 347 + if (list_empty(&rxrpc_local_endpoints)) 348 + return; 335 349 336 - while (!list_empty(&rxrpc_locals)) { 337 - schedule(); 338 - set_current_state(TASK_UNINTERRUPTIBLE); 339 - } 340 - 341 - remove_wait_queue(&rxrpc_local_wq, &myself); 342 - set_current_state(TASK_RUNNING); 350 + mutex_lock(&rxrpc_local_mutex); 351 + list_for_each_entry(local, &rxrpc_local_endpoints, link) { 352 + pr_err("AF_RXRPC: Leaked local %p {%d}\n", 353 + local, atomic_read(&local->usage)); 343 354 } 344 - 345 - _leave(""); 355 + mutex_unlock(&rxrpc_local_mutex); 356 + BUG(); 346 357 }