SUNRPC: rpc_timeout_upcall_queue should not sleep

The function rpc_timeout_upcall_queue runs from a workqueue, and hence
sleeping is not recommended. Convert the protection of the upcall queue
from a mutex to a spinlock.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
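
For illustration only, here is a minimal userspace C sketch of the locking
pattern the patch adopts: hold the queue's spinlock just long enough to splice
the pending messages onto a private list, then drop the lock and run the
destroy callbacks (which may sleep) on that private list. All names below
(pipe_queue, msg_node, timeout_upcall_queue, ...) are hypothetical stand-ins,
not the SUNRPC code itself.

/*
 * Illustrative userspace sketch (NOT the SUNRPC code): empty a message
 * queue from a context where we must not sleep while holding the lock.
 * Pattern: splice the queue onto a private list under a spinlock, drop
 * the lock, then run the (possibly sleeping) destroy callbacks unlocked.
 */
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct msg_node {
	struct msg_node *next;
	int err;			/* error code handed to the message owner */
};

struct pipe_queue {
	pthread_spinlock_t lock;	/* protects head and len only */
	struct msg_node *head;
	int len;
	void (*destroy_msg)(struct msg_node *);	/* may sleep: never call it locked */
};

/* Analogue of the timeout handler: runs from deferred-work context. */
static void timeout_upcall_queue(struct pipe_queue *q, int err)
{
	struct msg_node *free_list, *msg;
	void (*destroy)(struct msg_node *);

	pthread_spin_lock(&q->lock);
	destroy = q->destroy_msg;
	free_list = q->head;		/* splice the whole queue ... */
	q->head = NULL;			/* ... onto a private list */
	q->len = 0;
	pthread_spin_unlock(&q->lock);

	/* The destroy callback runs with no locks held. */
	while ((msg = free_list) != NULL) {
		free_list = msg->next;
		msg->err = err;
		destroy(msg);
	}
}

static void destroy_msg(struct msg_node *m)
{
	printf("dropping message, err=%d\n", m->err);
	free(m);
}

int main(void)
{
	struct pipe_queue q = { .destroy_msg = destroy_msg };

	pthread_spin_init(&q.lock, PTHREAD_PROCESS_PRIVATE);
	for (int i = 0; i < 2; i++) {	/* queue two dummy messages */
		struct msg_node *m = calloc(1, sizeof(*m));
		m->next = q.head;
		q.head = m;
		q.len++;
	}
	timeout_upcall_queue(&q, -ETIMEDOUT);
	pthread_spin_destroy(&q.lock);
	return 0;
}

The point of the splice is that no callback is ever invoked under the
spinlock, so the queue can be drained from contexts where sleeping under
the lock would be a bug.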

+58 -38
net/sunrpc/rpc_pipe.c
···
 
 #define RPC_UPCALL_TIMEOUT (30*HZ)
 
-static void
-__rpc_purge_list(struct rpc_inode *rpci, struct list_head *head, int err)
+static void rpc_purge_list(struct rpc_inode *rpci, struct list_head *head,
+		void (*destroy_msg)(struct rpc_pipe_msg *), int err)
 {
 	struct rpc_pipe_msg *msg;
-	void (*destroy_msg)(struct rpc_pipe_msg *);
 
-	destroy_msg = rpci->ops->destroy_msg;
-	while (!list_empty(head)) {
+	if (list_empty(head))
+		return;
+	do {
 		msg = list_entry(head->next, struct rpc_pipe_msg, list);
-		list_del_init(&msg->list);
+		list_del(&msg->list);
 		msg->errno = err;
 		destroy_msg(msg);
-	}
-}
-
-static void
-__rpc_purge_upcall(struct inode *inode, int err)
-{
-	struct rpc_inode *rpci = RPC_I(inode);
-
-	__rpc_purge_list(rpci, &rpci->pipe, err);
-	rpci->pipelen = 0;
+	} while (!list_empty(head));
 	wake_up(&rpci->waitq);
 }
 
 static void
 rpc_timeout_upcall_queue(void *data)
 {
+	LIST_HEAD(free_list);
 	struct rpc_inode *rpci = (struct rpc_inode *)data;
 	struct inode *inode = &rpci->vfs_inode;
+	void (*destroy_msg)(struct rpc_pipe_msg *);
 
-	mutex_lock(&inode->i_mutex);
-	if (rpci->ops == NULL)
-		goto out;
-	if (rpci->nreaders == 0 && !list_empty(&rpci->pipe))
-		__rpc_purge_upcall(inode, -ETIMEDOUT);
-out:
-	mutex_unlock(&inode->i_mutex);
+	spin_lock(&inode->i_lock);
+	if (rpci->ops == NULL) {
+		spin_unlock(&inode->i_lock);
+		return;
+	}
+	destroy_msg = rpci->ops->destroy_msg;
+	if (rpci->nreaders == 0) {
+		list_splice_init(&rpci->pipe, &free_list);
+		rpci->pipelen = 0;
+	}
+	spin_unlock(&inode->i_lock);
+	rpc_purge_list(rpci, &free_list, destroy_msg, -ETIMEDOUT);
 }
 
 int
···
 	struct rpc_inode *rpci = RPC_I(inode);
 	int res = -EPIPE;
 
-	mutex_lock(&inode->i_mutex);
+	spin_lock(&inode->i_lock);
 	if (rpci->ops == NULL)
 		goto out;
 	if (rpci->nreaders) {
···
 		res = 0;
 	}
 out:
-	mutex_unlock(&inode->i_mutex);
+	spin_unlock(&inode->i_lock);
 	wake_up(&rpci->waitq);
 	return res;
 }
···
 rpc_close_pipes(struct inode *inode)
 {
 	struct rpc_inode *rpci = RPC_I(inode);
+	struct rpc_pipe_ops *ops;
 
 	mutex_lock(&inode->i_mutex);
-	if (rpci->ops != NULL) {
+	ops = rpci->ops;
+	if (ops != NULL) {
+		LIST_HEAD(free_list);
+
+		spin_lock(&inode->i_lock);
 		rpci->nreaders = 0;
-		__rpc_purge_list(rpci, &rpci->in_upcall, -EPIPE);
-		__rpc_purge_upcall(inode, -EPIPE);
-		rpci->nwriters = 0;
-		if (rpci->ops->release_pipe)
-			rpci->ops->release_pipe(inode);
+		list_splice_init(&rpci->in_upcall, &free_list);
+		list_splice_init(&rpci->pipe, &free_list);
+		rpci->pipelen = 0;
 		rpci->ops = NULL;
+		spin_unlock(&inode->i_lock);
+		rpc_purge_list(rpci, &free_list, ops->destroy_msg, -EPIPE);
+		rpci->nwriters = 0;
+		if (ops->release_pipe)
+			ops->release_pipe(inode);
+		cancel_delayed_work(&rpci->queue_timeout);
+		flush_scheduled_work();
 	}
 	rpc_inode_setowner(inode, NULL);
 	mutex_unlock(&inode->i_mutex);
-	cancel_delayed_work(&rpci->queue_timeout);
-	flush_scheduled_work();
 }
 
 static struct inode *
···
 		goto out;
 	msg = (struct rpc_pipe_msg *)filp->private_data;
 	if (msg != NULL) {
+		spin_lock(&inode->i_lock);
 		msg->errno = -EAGAIN;
-		list_del_init(&msg->list);
+		list_del(&msg->list);
+		spin_unlock(&inode->i_lock);
 		rpci->ops->destroy_msg(msg);
 	}
 	if (filp->f_mode & FMODE_WRITE)
 		rpci->nwriters --;
-	if (filp->f_mode & FMODE_READ)
+	if (filp->f_mode & FMODE_READ) {
 		rpci->nreaders --;
-	if (!rpci->nreaders)
-		__rpc_purge_upcall(inode, -EAGAIN);
+		if (rpci->nreaders == 0) {
+			LIST_HEAD(free_list);
+			spin_lock(&inode->i_lock);
+			list_splice_init(&rpci->pipe, &free_list);
+			rpci->pipelen = 0;
+			spin_unlock(&inode->i_lock);
+			rpc_purge_list(rpci, &free_list,
+					rpci->ops->destroy_msg, -EAGAIN);
+		}
+	}
 	if (rpci->ops->release_pipe)
 		rpci->ops->release_pipe(inode);
 out:
···
 	}
 	msg = filp->private_data;
 	if (msg == NULL) {
+		spin_lock(&inode->i_lock);
 		if (!list_empty(&rpci->pipe)) {
 			msg = list_entry(rpci->pipe.next,
 					struct rpc_pipe_msg,
···
 			filp->private_data = msg;
 			msg->copied = 0;
 		}
+		spin_unlock(&inode->i_lock);
 		if (msg == NULL)
 			goto out_unlock;
 	}
···
 	res = rpci->ops->upcall(filp, msg, buf, len);
 	if (res < 0 || msg->len == msg->copied) {
 		filp->private_data = NULL;
-		list_del_init(&msg->list);
+		spin_lock(&inode->i_lock);
+		list_del(&msg->list);
+		spin_unlock(&inode->i_lock);
 		rpci->ops->destroy_msg(msg);
 	}
 out_unlock: