Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

SUNRPC: move waitq from RPC pipe to RPC inode

Currently, the wait queue used for polling RPC pipe changes from user space
is a part of the RPC pipe. But the pipe data itself can be released on NFS umount
before the dentry-inode pair connected to it (in case this pair is held open by
some process).
This is not a problem for almost all pipe users, because all PipeFS file
operations check the pipe reference prior to using it.
Except eventfd. It registers itself through the "poll" file operation and thus
holds a reference to the pipe wait queue. This leads to oopses on destroying the
eventfd after NFS umount (like rpc_idmapd does), since no pipe data is left by
that point.
The solution is to move the wait queue from the pipe data to the internal RPC
inode data. This looks more logical, because this wait queue is used only by
user-space processes, which already hold an inode reference.

Note: upcalls have to get pipe->dentry prior to dereferencing the wait queue to
make sure that the mount point won't disappear from underneath us.

Signed-off-by: Stanislav Kinsbursky <skinsbursky@parallels.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>

authored by

Stanislav Kinsbursky and committed by
Trond Myklebust
591ad7fe 2c9030ee

+27 -14
+1 -1
include/linux/sunrpc/rpc_pipe_fs.h
··· 28 28 int pipelen; 29 29 int nreaders; 30 30 int nwriters; 31 - wait_queue_head_t waitq; 32 31 #define RPC_PIPE_WAIT_FOR_OPEN 1 33 32 int flags; 34 33 struct delayed_work queue_timeout; ··· 40 41 struct inode vfs_inode; 41 42 void *private; 42 43 struct rpc_pipe *pipe; 44 + wait_queue_head_t waitq; 43 45 }; 44 46 45 47 static inline struct rpc_inode *
+26 -13
net/sunrpc/rpc_pipe.c
··· 57 57 } 58 58 EXPORT_SYMBOL_GPL(rpc_pipefs_notifier_unregister); 59 59 60 - static void rpc_purge_list(struct rpc_pipe *pipe, struct list_head *head, 60 + static void rpc_purge_list(wait_queue_head_t *waitq, struct list_head *head, 61 61 void (*destroy_msg)(struct rpc_pipe_msg *), int err) 62 62 { 63 63 struct rpc_pipe_msg *msg; ··· 70 70 msg->errno = err; 71 71 destroy_msg(msg); 72 72 } while (!list_empty(head)); 73 - wake_up(&pipe->waitq); 73 + wake_up(waitq); 74 74 } 75 75 76 76 static void ··· 80 80 struct rpc_pipe *pipe = 81 81 container_of(work, struct rpc_pipe, queue_timeout.work); 82 82 void (*destroy_msg)(struct rpc_pipe_msg *); 83 + struct dentry *dentry; 83 84 84 85 spin_lock(&pipe->lock); 85 86 destroy_msg = pipe->ops->destroy_msg; ··· 88 87 list_splice_init(&pipe->pipe, &free_list); 89 88 pipe->pipelen = 0; 90 89 } 90 + dentry = dget(pipe->dentry); 91 91 spin_unlock(&pipe->lock); 92 - rpc_purge_list(pipe, &free_list, destroy_msg, -ETIMEDOUT); 92 + if (dentry) { 93 + rpc_purge_list(&RPC_I(dentry->d_inode)->waitq, 94 + &free_list, destroy_msg, -ETIMEDOUT); 95 + dput(dentry); 96 + } 93 97 } 94 98 95 99 ssize_t rpc_pipe_generic_upcall(struct file *filp, struct rpc_pipe_msg *msg, ··· 131 125 rpc_queue_upcall(struct rpc_pipe *pipe, struct rpc_pipe_msg *msg) 132 126 { 133 127 int res = -EPIPE; 128 + struct dentry *dentry; 134 129 135 130 spin_lock(&pipe->lock); 136 131 if (pipe->nreaders) { ··· 147 140 pipe->pipelen += msg->len; 148 141 res = 0; 149 142 } 143 + dentry = dget(pipe->dentry); 150 144 spin_unlock(&pipe->lock); 151 - wake_up(&pipe->waitq); 145 + if (dentry) { 146 + wake_up(&RPC_I(dentry->d_inode)->waitq); 147 + dput(dentry); 148 + } 152 149 return res; 153 150 } 154 151 EXPORT_SYMBOL_GPL(rpc_queue_upcall); ··· 179 168 pipe->pipelen = 0; 180 169 pipe->dentry = NULL; 181 170 spin_unlock(&pipe->lock); 182 - rpc_purge_list(pipe, &free_list, pipe->ops->destroy_msg, -EPIPE); 171 + rpc_purge_list(&RPC_I(inode)->waitq, &free_list, 
pipe->ops->destroy_msg, -EPIPE); 183 172 pipe->nwriters = 0; 184 173 if (need_release && pipe->ops->release_pipe) 185 174 pipe->ops->release_pipe(inode); ··· 268 257 list_splice_init(&pipe->pipe, &free_list); 269 258 pipe->pipelen = 0; 270 259 spin_unlock(&pipe->lock); 271 - rpc_purge_list(pipe, &free_list, 260 + rpc_purge_list(&RPC_I(inode)->waitq, &free_list, 272 261 pipe->ops->destroy_msg, -EAGAIN); 273 262 } 274 263 } ··· 341 330 static unsigned int 342 331 rpc_pipe_poll(struct file *filp, struct poll_table_struct *wait) 343 332 { 344 - struct rpc_pipe *pipe = RPC_I(filp->f_path.dentry->d_inode)->pipe; 345 - unsigned int mask = 0; 333 + struct inode *inode = filp->f_path.dentry->d_inode; 334 + struct rpc_inode *rpci = RPC_I(inode); 335 + unsigned int mask = POLLOUT | POLLWRNORM; 346 336 347 - poll_wait(filp, &pipe->waitq, wait); 337 + poll_wait(filp, &rpci->waitq, wait); 348 338 349 - mask = POLLOUT | POLLWRNORM; 350 - if (pipe->dentry == NULL) 339 + mutex_lock(&inode->i_mutex); 340 + if (rpci->pipe == NULL) 351 341 mask |= POLLERR | POLLHUP; 352 - if (filp->private_data || !list_empty(&pipe->pipe)) 342 + else if (filp->private_data || !list_empty(&rpci->pipe->pipe)) 353 343 mask |= POLLIN | POLLRDNORM; 344 + mutex_unlock(&inode->i_mutex); 354 345 return mask; 355 346 } 356 347 ··· 556 543 INIT_LIST_HEAD(&pipe->in_downcall); 557 544 INIT_LIST_HEAD(&pipe->pipe); 558 545 pipe->pipelen = 0; 559 - init_waitqueue_head(&pipe->waitq); 560 546 INIT_DELAYED_WORK(&pipe->queue_timeout, 561 547 rpc_timeout_upcall_queue); 562 548 pipe->ops = NULL; ··· 1177 1165 inode_init_once(&rpci->vfs_inode); 1178 1166 rpci->private = NULL; 1179 1167 rpci->pipe = NULL; 1168 + init_waitqueue_head(&rpci->waitq); 1180 1169 } 1181 1170 1182 1171 int register_rpc_pipefs(void)