Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

pipe: Add general notification queue support

Make it possible to have a general notification queue built on top of a
standard pipe. Notifications are 'spliced' into the pipe and then read
out. splice(), vmsplice() and sendfile() are forbidden on pipes used for
notifications as post_one_notification() cannot take pipe->mutex. This
means that notifications could be posted in between individual pipe
buffers, making iov_iter_revert() difficult to effect.

The way the notification queue is used is:

(1) An application opens a pipe with a special flag and indicates the
number of messages it wishes to be able to queue at once (this can
only be set once):

pipe2(fds, O_NOTIFICATION_PIPE);
ioctl(fds[0], IOC_WATCH_QUEUE_SET_SIZE, queue_depth);

(2) The application then uses poll() and read() as normal to extract data
from the pipe. read() will return multiple notifications if the
buffer is big enough, but it will not split a notification across
buffers - rather it will return a short read or EMSGSIZE.

Notification messages include a length in the header so that the
caller can split them up.

Each message has a header that describes it:

struct watch_notification {
__u32 type:24;
__u32 subtype:8;
__u32 info;
};

The type indicates the source (eg. mount tree changes, superblock events,
keyring changes, block layer events) and the subtype indicates the event
type (eg. mount, unmount; EIO, EDQUOT; link, unlink). The info field
indicates a number of things, including the entry length, an ID assigned to
a watchpoint contributing to this buffer and type-specific flags.

Supplementary data, such as the key ID that generated an event, can be
attached in additional slots. The maximum message size is 127 bytes.
Messages may not be padded or aligned, so there is no guarantee, for
example, that the notification type will be on a 4-byte bounary.

Signed-off-by: David Howells <dhowells@redhat.com>

+1318 -76
+1
Documentation/userspace-api/ioctl/ioctl-number.rst
··· 201 201 'W' 00-1F linux/wanrouter.h conflict! (pre 3.9) 202 202 'W' 00-3F sound/asound.h conflict! 203 203 'W' 40-5F drivers/pci/switch/switchtec.c 204 + 'W' 60-61 linux/watch_queue.h 204 205 'X' all fs/xfs/xfs_fs.h, conflict! 205 206 fs/xfs/linux-2.6/xfs_ioctl32.h, 206 207 include/linux/falloc.h,
+339
Documentation/watch_queue.rst
··· 1 + ============================== 2 + General notification mechanism 3 + ============================== 4 + 5 + The general notification mechanism is built on top of the standard pipe driver 6 + whereby it effectively splices notification messages from the kernel into pipes 7 + opened by userspace. This can be used in conjunction with:: 8 + 9 + * Key/keyring notifications 10 + 11 + 12 + The notifications buffers can be enabled by: 13 + 14 + "General setup"/"General notification queue" 15 + (CONFIG_WATCH_QUEUE) 16 + 17 + This document has the following sections: 18 + 19 + .. contents:: :local: 20 + 21 + 22 + Overview 23 + ======== 24 + 25 + This facility appears as a pipe that is opened in a special mode. The pipe's 26 + internal ring buffer is used to hold messages that are generated by the kernel. 27 + These messages are then read out by read(). Splice and similar are disabled on 28 + such pipes due to them wanting to, under some circumstances, revert their 29 + additions to the ring - which might end up interleaved with notification 30 + messages. 31 + 32 + The owner of the pipe has to tell the kernel which sources it would like to 33 + watch through that pipe. Only sources that have been connected to a pipe will 34 + insert messages into it. Note that a source may be bound to multiple pipes and 35 + insert messages into all of them simultaneously. 36 + 37 + Filters may also be emplaced on a pipe so that certain source types and 38 + subevents can be ignored if they're not of interest. 39 + 40 + A message will be discarded if there isn't a slot available in the ring or if 41 + no preallocated message buffer is available. In both of these cases, read() 42 + will insert a WATCH_META_LOSS_NOTIFICATION message into the output buffer after 43 + the last message currently in the buffer has been read. 44 + 45 + Note that when producing a notification, the kernel does not wait for the 46 + consumers to collect it, but rather just continues on. This means that 47 + notifications can be generated whilst spinlocks are held and also protects the 48 + kernel from being held up indefinitely by a userspace malfunction. 49 + 50 + 51 + Message Structure 52 + ================= 53 + 54 + Notification messages begin with a short header:: 55 + 56 + struct watch_notification { 57 + __u32 type:24; 58 + __u32 subtype:8; 59 + __u32 info; 60 + }; 61 + 62 + "type" indicates the source of the notification record and "subtype" indicates 63 + the type of record from that source (see the Watch Sources section below). The 64 + type may also be "WATCH_TYPE_META". This is a special record type generated 65 + internally by the watch queue itself. There are two subtypes: 66 + 67 + * WATCH_META_REMOVAL_NOTIFICATION 68 + * WATCH_META_LOSS_NOTIFICATION 69 + 70 + The first indicates that an object on which a watch was installed was removed 71 + or destroyed and the second indicates that some messages have been lost. 72 + 73 + "info" indicates a bunch of things, including: 74 + 75 + * The length of the message in bytes, including the header (mask with 76 + WATCH_INFO_LENGTH and shift by WATCH_INFO_LENGTH__SHIFT). This indicates 77 + the size of the record, which may be between 8 and 127 bytes. 78 + 79 + * The watch ID (mask with WATCH_INFO_ID and shift by WATCH_INFO_ID__SHIFT). 80 + This indicates that caller's ID of the watch, which may be between 0 81 + and 255. Multiple watches may share a queue, and this provides a means to 82 + distinguish them. 83 + 84 + * A type-specific field (WATCH_INFO_TYPE_INFO). This is set by the 85 + notification producer to indicate some meaning specific to the type and 86 + subtype. 87 + 88 + Everything in info apart from the length can be used for filtering. 89 + 90 + The header can be followed by supplementary information. The format of this is 91 + at the discretion is defined by the type and subtype. 92 + 93 + 94 + Watch List (Notification Source) API 95 + ==================================== 96 + 97 + A "watch list" is a list of watchers that are subscribed to a source of 98 + notifications. A list may be attached to an object (say a key or a superblock) 99 + or may be global (say for device events). From a userspace perspective, a 100 + non-global watch list is typically referred to by reference to the object it 101 + belongs to (such as using KEYCTL_NOTIFY and giving it a key serial number to 102 + watch that specific key). 103 + 104 + To manage a watch list, the following functions are provided: 105 + 106 + * ``void init_watch_list(struct watch_list *wlist, 107 + void (*release_watch)(struct watch *wlist));`` 108 + 109 + Initialise a watch list. If ``release_watch`` is not NULL, then this 110 + indicates a function that should be called when the watch_list object is 111 + destroyed to discard any references the watch list holds on the watched 112 + object. 113 + 114 + * ``void remove_watch_list(struct watch_list *wlist);`` 115 + 116 + This removes all of the watches subscribed to a watch_list and frees them 117 + and then destroys the watch_list object itself. 118 + 119 + 120 + Watch Queue (Notification Output) API 121 + ===================================== 122 + 123 + A "watch queue" is the buffer allocated by an application that notification 124 + records will be written into. The workings of this are hidden entirely inside 125 + of the pipe device driver, but it is necessary to gain a reference to it to set 126 + a watch. These can be managed with: 127 + 128 + * ``struct watch_queue *get_watch_queue(int fd);`` 129 + 130 + Since watch queues are indicated to the kernel by the fd of the pipe that 131 + implements the buffer, userspace must hand that fd through a system call. 132 + This can be used to look up an opaque pointer to the watch queue from the 133 + system call. 134 + 135 + * ``void put_watch_queue(struct watch_queue *wqueue);`` 136 + 137 + This discards the reference obtained from ``get_watch_queue()``. 138 + 139 + 140 + Watch Subscription API 141 + ====================== 142 + 143 + A "watch" is a subscription on a watch list, indicating the watch queue, and 144 + thus the buffer, into which notification records should be written. The watch 145 + queue object may also carry filtering rules for that object, as set by 146 + userspace. Some parts of the watch struct can be set by the driver:: 147 + 148 + struct watch { 149 + union { 150 + u32 info_id; /* ID to be OR'd in to info field */ 151 + ... 152 + }; 153 + void *private; /* Private data for the watched object */ 154 + u64 id; /* Internal identifier */ 155 + ... 156 + }; 157 + 158 + The ``info_id`` value should be an 8-bit number obtained from userspace and 159 + shifted by WATCH_INFO_ID__SHIFT. This is OR'd into the WATCH_INFO_ID field of 160 + struct watch_notification::info when and if the notification is written into 161 + the associated watch queue buffer. 162 + 163 + The ``private`` field is the driver's data associated with the watch_list and 164 + is cleaned up by the ``watch_list::release_watch()`` method. 165 + 166 + The ``id`` field is the source's ID. Notifications that are posted with a 167 + different ID are ignored. 168 + 169 + The following functions are provided to manage watches: 170 + 171 + * ``void init_watch(struct watch *watch, struct watch_queue *wqueue);`` 172 + 173 + Initialise a watch object, setting its pointer to the watch queue, using 174 + appropriate barriering to avoid lockdep complaints. 175 + 176 + * ``int add_watch_to_object(struct watch *watch, struct watch_list *wlist);`` 177 + 178 + Subscribe a watch to a watch list (notification source). The 179 + driver-settable fields in the watch struct must have been set before this 180 + is called. 181 + 182 + * ``int remove_watch_from_object(struct watch_list *wlist, 183 + struct watch_queue *wqueue, 184 + u64 id, false);`` 185 + 186 + Remove a watch from a watch list, where the watch must match the specified 187 + watch queue (``wqueue``) and object identifier (``id``). A notification 188 + (``WATCH_META_REMOVAL_NOTIFICATION``) is sent to the watch queue to 189 + indicate that the watch got removed. 190 + 191 + * ``int remove_watch_from_object(struct watch_list *wlist, NULL, 0, true);`` 192 + 193 + Remove all the watches from a watch list. It is expected that this will be 194 + called preparatory to destruction and that the watch list will be 195 + inaccessible to new watches by this point. A notification 196 + (``WATCH_META_REMOVAL_NOTIFICATION``) is sent to the watch queue of each 197 + subscribed watch to indicate that the watch got removed. 198 + 199 + 200 + Notification Posting API 201 + ======================== 202 + 203 + To post a notification to watch list so that the subscribed watches can see it, 204 + the following function should be used:: 205 + 206 + void post_watch_notification(struct watch_list *wlist, 207 + struct watch_notification *n, 208 + const struct cred *cred, 209 + u64 id); 210 + 211 + The notification should be preformatted and a pointer to the header (``n``) 212 + should be passed in. The notification may be larger than this and the size in 213 + units of buffer slots is noted in ``n->info & WATCH_INFO_LENGTH``. 214 + 215 + The ``cred`` struct indicates the credentials of the source (subject) and is 216 + passed to the LSMs, such as SELinux, to allow or suppress the recording of the 217 + note in each individual queue according to the credentials of that queue 218 + (object). 219 + 220 + The ``id`` is the ID of the source object (such as the serial number on a key). 221 + Only watches that have the same ID set in them will see this notification. 222 + 223 + 224 + Watch Sources 225 + ============= 226 + 227 + Any particular buffer can be fed from multiple sources. Sources include: 228 + 229 + * WATCH_TYPE_KEY_NOTIFY 230 + 231 + Notifications of this type indicate changes to keys and keyrings, including 232 + the changes of keyring contents or the attributes of keys. 233 + 234 + See Documentation/security/keys/core.rst for more information. 235 + 236 + 237 + Event Filtering 238 + =============== 239 + 240 + Once a watch queue has been created, a set of filters can be applied to limit 241 + the events that are received using:: 242 + 243 + struct watch_notification_filter filter = { 244 + ... 245 + }; 246 + ioctl(fd, IOC_WATCH_QUEUE_SET_FILTER, &filter) 247 + 248 + The filter description is a variable of type:: 249 + 250 + struct watch_notification_filter { 251 + __u32 nr_filters; 252 + __u32 __reserved; 253 + struct watch_notification_type_filter filters[]; 254 + }; 255 + 256 + Where "nr_filters" is the number of filters in filters[] and "__reserved" 257 + should be 0. The "filters" array has elements of the following type:: 258 + 259 + struct watch_notification_type_filter { 260 + __u32 type; 261 + __u32 info_filter; 262 + __u32 info_mask; 263 + __u32 subtype_filter[8]; 264 + }; 265 + 266 + Where: 267 + 268 + * ``type`` is the event type to filter for and should be something like 269 + "WATCH_TYPE_KEY_NOTIFY" 270 + 271 + * ``info_filter`` and ``info_mask`` act as a filter on the info field of the 272 + notification record. The notification is only written into the buffer if:: 273 + 274 + (watch.info & info_mask) == info_filter 275 + 276 + This could be used, for example, to ignore events that are not exactly on 277 + the watched point in a mount tree. 278 + 279 + * ``subtype_filter`` is a bitmask indicating the subtypes that are of 280 + interest. Bit 0 of subtype_filter[0] corresponds to subtype 0, bit 1 to 281 + subtype 1, and so on. 282 + 283 + If the argument to the ioctl() is NULL, then the filters will be removed and 284 + all events from the watched sources will come through. 285 + 286 + 287 + Userspace Code Example 288 + ====================== 289 + 290 + A buffer is created with something like the following:: 291 + 292 + pipe2(fds, O_TMPFILE); 293 + ioctl(fds[1], IOC_WATCH_QUEUE_SET_SIZE, 256); 294 + 295 + It can then be set to receive keyring change notifications:: 296 + 297 + keyctl(KEYCTL_WATCH_KEY, KEY_SPEC_SESSION_KEYRING, fds[1], 0x01); 298 + 299 + The notifications can then be consumed by something like the following:: 300 + 301 + static void consumer(int rfd, struct watch_queue_buffer *buf) 302 + { 303 + unsigned char buffer[128]; 304 + ssize_t buf_len; 305 + 306 + while (buf_len = read(rfd, buffer, sizeof(buffer)), 307 + buf_len > 0 308 + ) { 309 + void *p = buffer; 310 + void *end = buffer + buf_len; 311 + while (p < end) { 312 + union { 313 + struct watch_notification n; 314 + unsigned char buf1[128]; 315 + } n; 316 + size_t largest, len; 317 + 318 + largest = end - p; 319 + if (largest > 128) 320 + largest = 128; 321 + memcpy(&n, p, largest); 322 + 323 + len = (n->info & WATCH_INFO_LENGTH) >> 324 + WATCH_INFO_LENGTH__SHIFT; 325 + if (len == 0 || len > largest) 326 + return; 327 + 328 + switch (n.n.type) { 329 + case WATCH_TYPE_META: 330 + got_meta(&n.n); 331 + case WATCH_TYPE_KEY_NOTIFY: 332 + saw_key_change(&n.n); 333 + break; 334 + } 335 + 336 + p += len; 337 + } 338 + } 339 + }
+137 -69
fs/pipe.c
··· 24 24 #include <linux/syscalls.h> 25 25 #include <linux/fcntl.h> 26 26 #include <linux/memcontrol.h> 27 + #include <linux/watch_queue.h> 27 28 28 29 #include <linux/uaccess.h> 29 30 #include <asm/ioctls.h> ··· 460 459 goto out; 461 460 } 462 461 462 + #ifdef CONFIG_WATCH_QUEUE 463 + if (pipe->watch_queue) { 464 + ret = -EXDEV; 465 + goto out; 466 + } 467 + #endif 468 + 463 469 /* 464 470 * Only wake up if the pipe started out empty, since 465 471 * otherwise there should be no readers waiting. ··· 636 628 int count, head, tail, mask; 637 629 638 630 switch (cmd) { 639 - case FIONREAD: 640 - __pipe_lock(pipe); 641 - count = 0; 642 - head = pipe->head; 643 - tail = pipe->tail; 644 - mask = pipe->ring_size - 1; 631 + case FIONREAD: 632 + __pipe_lock(pipe); 633 + count = 0; 634 + head = pipe->head; 635 + tail = pipe->tail; 636 + mask = pipe->ring_size - 1; 645 637 646 - while (tail != head) { 647 - count += pipe->bufs[tail & mask].len; 648 - tail++; 649 - } 650 - __pipe_unlock(pipe); 638 + while (tail != head) { 639 + count += pipe->bufs[tail & mask].len; 640 + tail++; 641 + } 642 + __pipe_unlock(pipe); 651 643 652 - return put_user(count, (int __user *)arg); 653 - default: 654 - return -ENOIOCTLCMD; 644 + return put_user(count, (int __user *)arg); 645 + 646 + #ifdef CONFIG_WATCH_QUEUE 647 + case IOC_WATCH_QUEUE_SET_SIZE: { 648 + int ret; 649 + __pipe_lock(pipe); 650 + ret = watch_queue_set_size(pipe, arg); 651 + __pipe_unlock(pipe); 652 + return ret; 653 + } 654 + 655 + case IOC_WATCH_QUEUE_SET_FILTER: 656 + return watch_queue_set_filter( 657 + pipe, (struct watch_notification_filter __user *)arg); 658 + #endif 659 + 660 + default: 661 + return -ENOIOCTLCMD; 655 662 } 656 663 } 657 664 ··· 777 754 return retval; 778 755 } 779 756 780 - static unsigned long account_pipe_buffers(struct user_struct *user, 781 - unsigned long old, unsigned long new) 757 + unsigned long account_pipe_buffers(struct user_struct *user, 758 + unsigned long old, unsigned long new) 782 759 { 783 760 return atomic_long_add_return(new - old, &user->pipe_bufs); 784 761 } 785 762 786 - static bool too_many_pipe_buffers_soft(unsigned long user_bufs) 763 + bool too_many_pipe_buffers_soft(unsigned long user_bufs) 787 764 { 788 765 unsigned long soft_limit = READ_ONCE(pipe_user_pages_soft); 789 766 790 767 return soft_limit && user_bufs > soft_limit; 791 768 } 792 769 793 - static bool too_many_pipe_buffers_hard(unsigned long user_bufs) 770 + bool too_many_pipe_buffers_hard(unsigned long user_bufs) 794 771 { 795 772 unsigned long hard_limit = READ_ONCE(pipe_user_pages_hard); 796 773 797 774 return hard_limit && user_bufs > hard_limit; 798 775 } 799 776 800 - static bool is_unprivileged_user(void) 777 + bool pipe_is_unprivileged_user(void) 801 778 { 802 779 return !capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN); 803 780 } ··· 819 796 820 797 user_bufs = account_pipe_buffers(user, 0, pipe_bufs); 821 798 822 - if (too_many_pipe_buffers_soft(user_bufs) && is_unprivileged_user()) { 799 + if (too_many_pipe_buffers_soft(user_bufs) && pipe_is_unprivileged_user()) { 823 800 user_bufs = account_pipe_buffers(user, pipe_bufs, 1); 824 801 pipe_bufs = 1; 825 802 } 826 803 827 - if (too_many_pipe_buffers_hard(user_bufs) && is_unprivileged_user()) 804 + if (too_many_pipe_buffers_hard(user_bufs) && pipe_is_unprivileged_user()) 828 805 goto out_revert_acct; 829 806 830 807 pipe->bufs = kcalloc(pipe_bufs, sizeof(struct pipe_buffer), ··· 836 813 pipe->r_counter = pipe->w_counter = 1; 837 814 pipe->max_usage = pipe_bufs; 838 815 pipe->ring_size = pipe_bufs; 816 + pipe->nr_accounted = pipe_bufs; 839 817 pipe->user = user; 840 818 mutex_init(&pipe->mutex); 841 819 return pipe; ··· 854 830 { 855 831 int i; 856 832 857 - (void) account_pipe_buffers(pipe->user, pipe->ring_size, 0); 833 + #ifdef CONFIG_WATCH_QUEUE 834 + if (pipe->watch_queue) { 835 + watch_queue_clear(pipe->watch_queue); 836 + put_watch_queue(pipe->watch_queue); 837 + } 838 + #endif 839 + 840 + (void) account_pipe_buffers(pipe->user, pipe->nr_accounted, 0); 858 841 free_uid(pipe->user); 859 842 for (i = 0; i < pipe->ring_size; i++) { 860 843 struct pipe_buffer *buf = pipe->bufs + i; ··· 937 906 if (!inode) 938 907 return -ENFILE; 939 908 909 + if (flags & O_NOTIFICATION_PIPE) { 910 + #ifdef CONFIG_WATCH_QUEUE 911 + if (watch_queue_init(inode->i_pipe) < 0) { 912 + iput(inode); 913 + return -ENOMEM; 914 + } 915 + #else 916 + return -ENOPKG; 917 + #endif 918 + } 919 + 940 920 f = alloc_file_pseudo(inode, pipe_mnt, "", 941 921 O_WRONLY | (flags & (O_NONBLOCK | O_DIRECT)), 942 922 &pipefifo_fops); ··· 978 936 int error; 979 937 int fdw, fdr; 980 938 981 - if (flags & ~(O_CLOEXEC | O_NONBLOCK | O_DIRECT)) 939 + if (flags & ~(O_CLOEXEC | O_NONBLOCK | O_DIRECT | O_NOTIFICATION_PIPE)) 982 940 return -EINVAL; 983 941 984 942 error = create_pipe_files(files, flags); ··· 1226 1184 } 1227 1185 1228 1186 /* 1229 - * Allocate a new array of pipe buffers and copy the info over. Returns the 1230 - * pipe size if successful, or return -ERROR on error. 1187 + * Resize the pipe ring to a number of slots. 1231 1188 */ 1232 - static long pipe_set_size(struct pipe_inode_info *pipe, unsigned long arg) 1189 + int pipe_resize_ring(struct pipe_inode_info *pipe, unsigned int nr_slots) 1233 1190 { 1234 1191 struct pipe_buffer *bufs; 1235 - unsigned int size, nr_slots, head, tail, mask, n; 1236 - unsigned long user_bufs; 1237 - long ret = 0; 1238 - 1239 - size = round_pipe_size(arg); 1240 - nr_slots = size >> PAGE_SHIFT; 1241 - 1242 - if (!nr_slots) 1243 - return -EINVAL; 1244 - 1245 - /* 1246 - * If trying to increase the pipe capacity, check that an 1247 - * unprivileged user is not trying to exceed various limits 1248 - * (soft limit check here, hard limit check just below). 1249 - * Decreasing the pipe capacity is always permitted, even 1250 - * if the user is currently over a limit. 1251 - */ 1252 - if (nr_slots > pipe->ring_size && 1253 - size > pipe_max_size && !capable(CAP_SYS_RESOURCE)) 1254 - return -EPERM; 1255 - 1256 - user_bufs = account_pipe_buffers(pipe->user, pipe->ring_size, nr_slots); 1257 - 1258 - if (nr_slots > pipe->ring_size && 1259 - (too_many_pipe_buffers_hard(user_bufs) || 1260 - too_many_pipe_buffers_soft(user_bufs)) && 1261 - is_unprivileged_user()) { 1262 - ret = -EPERM; 1263 - goto out_revert_acct; 1264 - } 1192 + unsigned int head, tail, mask, n; 1265 1193 1266 1194 /* 1267 1195 * We can shrink the pipe, if arg is greater than the ring occupancy. ··· 1243 1231 head = pipe->head; 1244 1232 tail = pipe->tail; 1245 1233 n = pipe_occupancy(pipe->head, pipe->tail); 1246 - if (nr_slots < n) { 1247 - ret = -EBUSY; 1248 - goto out_revert_acct; 1249 - } 1234 + if (nr_slots < n) 1235 + return -EBUSY; 1250 1236 1251 1237 bufs = kcalloc(nr_slots, sizeof(*bufs), 1252 1238 GFP_KERNEL_ACCOUNT | __GFP_NOWARN); 1253 - if (unlikely(!bufs)) { 1254 - ret = -ENOMEM; 1255 - goto out_revert_acct; 1256 - } 1239 + if (unlikely(!bufs)) 1240 + return -ENOMEM; 1257 1241 1258 1242 /* 1259 1243 * The pipe array wraps around, so just start the new one at zero ··· 1277 1269 kfree(pipe->bufs); 1278 1270 pipe->bufs = bufs; 1279 1271 pipe->ring_size = nr_slots; 1280 - pipe->max_usage = nr_slots; 1272 + if (pipe->max_usage > nr_slots) 1273 + pipe->max_usage = nr_slots; 1281 1274 pipe->tail = tail; 1282 1275 pipe->head = head; 1283 1276 1284 1277 /* This might have made more room for writers */ 1285 1278 wake_up_interruptible(&pipe->wr_wait); 1279 + return 0; 1280 + } 1281 + 1282 + /* 1283 + * Allocate a new array of pipe buffers and copy the info over. Returns the 1284 + * pipe size if successful, or return -ERROR on error. 1285 + */ 1286 + static long pipe_set_size(struct pipe_inode_info *pipe, unsigned long arg) 1287 + { 1288 + unsigned long user_bufs; 1289 + unsigned int nr_slots, size; 1290 + long ret = 0; 1291 + 1292 + #ifdef CONFIG_WATCH_QUEUE 1293 + if (pipe->watch_queue) 1294 + return -EBUSY; 1295 + #endif 1296 + 1297 + size = round_pipe_size(arg); 1298 + nr_slots = size >> PAGE_SHIFT; 1299 + 1300 + if (!nr_slots) 1301 + return -EINVAL; 1302 + 1303 + /* 1304 + * If trying to increase the pipe capacity, check that an 1305 + * unprivileged user is not trying to exceed various limits 1306 + * (soft limit check here, hard limit check just below). 1307 + * Decreasing the pipe capacity is always permitted, even 1308 + * if the user is currently over a limit. 1309 + */ 1310 + if (nr_slots > pipe->max_usage && 1311 + size > pipe_max_size && !capable(CAP_SYS_RESOURCE)) 1312 + return -EPERM; 1313 + 1314 + user_bufs = account_pipe_buffers(pipe->user, pipe->nr_accounted, nr_slots); 1315 + 1316 + if (nr_slots > pipe->max_usage && 1317 + (too_many_pipe_buffers_hard(user_bufs) || 1318 + too_many_pipe_buffers_soft(user_bufs)) && 1319 + pipe_is_unprivileged_user()) { 1320 + ret = -EPERM; 1321 + goto out_revert_acct; 1322 + } 1323 + 1324 + ret = pipe_resize_ring(pipe, nr_slots); 1325 + if (ret < 0) 1326 + goto out_revert_acct; 1327 + 1328 + pipe->max_usage = nr_slots; 1329 + pipe->nr_accounted = nr_slots; 1286 1330 return pipe->max_usage * PAGE_SIZE; 1287 1331 1288 1332 out_revert_acct: 1289 - (void) account_pipe_buffers(pipe->user, nr_slots, pipe->ring_size); 1333 + (void) account_pipe_buffers(pipe->user, nr_slots, pipe->nr_accounted); 1290 1334 return ret; 1291 1335 } 1292 1336 ··· 1347 1287 * location, so checking ->i_pipe is not enough to verify that this is a 1348 1288 * pipe. 1349 1289 */ 1350 - struct pipe_inode_info *get_pipe_info(struct file *file) 1290 + struct pipe_inode_info *get_pipe_info(struct file *file, bool for_splice) 1351 1291 { 1352 - return file->f_op == &pipefifo_fops ? file->private_data : NULL; 1292 + struct pipe_inode_info *pipe = file->private_data; 1293 + 1294 + if (file->f_op != &pipefifo_fops || !pipe) 1295 + return NULL; 1296 + #ifdef CONFIG_WATCH_QUEUE 1297 + if (for_splice && pipe->watch_queue) 1298 + return NULL; 1299 + #endif 1300 + return pipe; 1353 1301 } 1354 1302 1355 1303 long pipe_fcntl(struct file *file, unsigned int cmd, unsigned long arg) ··· 1365 1297 struct pipe_inode_info *pipe; 1366 1298 long ret; 1367 1299 1368 - pipe = get_pipe_info(file); 1300 + pipe = get_pipe_info(file, false); 1369 1301 if (!pipe) 1370 1302 return -EBADF; 1371 1303
+6 -6
fs/splice.c
··· 1122 1122 !(out->f_mode & FMODE_WRITE))) 1123 1123 return -EBADF; 1124 1124 1125 - ipipe = get_pipe_info(in); 1126 - opipe = get_pipe_info(out); 1125 + ipipe = get_pipe_info(in, true); 1126 + opipe = get_pipe_info(out, true); 1127 1127 1128 1128 if (ipipe && opipe) { 1129 1129 if (off_in || off_out) ··· 1273 1273 static long vmsplice_to_user(struct file *file, struct iov_iter *iter, 1274 1274 unsigned int flags) 1275 1275 { 1276 - struct pipe_inode_info *pipe = get_pipe_info(file); 1276 + struct pipe_inode_info *pipe = get_pipe_info(file, true); 1277 1277 struct splice_desc sd = { 1278 1278 .total_len = iov_iter_count(iter), 1279 1279 .flags = flags, ··· 1308 1308 if (flags & SPLICE_F_GIFT) 1309 1309 buf_flag = PIPE_BUF_FLAG_GIFT; 1310 1310 1311 - pipe = get_pipe_info(file); 1311 + pipe = get_pipe_info(file, true); 1312 1312 if (!pipe) 1313 1313 return -EBADF; 1314 1314 ··· 1757 1757 static long do_tee(struct file *in, struct file *out, size_t len, 1758 1758 unsigned int flags) 1759 1759 { 1760 - struct pipe_inode_info *ipipe = get_pipe_info(in); 1761 - struct pipe_inode_info *opipe = get_pipe_info(out); 1760 + struct pipe_inode_info *ipipe = get_pipe_info(in, true); 1761 + struct pipe_inode_info *opipe = get_pipe_info(out, true); 1762 1762 int ret = -EINVAL; 1763 1763 1764 1764 if (unlikely(!(in->f_mode & FMODE_READ) ||
+18 -1
include/linux/pipe_fs_i.h
··· 35 35 * @tail: The point of buffer consumption 36 36 * @max_usage: The maximum number of slots that may be used in the ring 37 37 * @ring_size: total number of buffers (should be a power of 2) 38 + * @nr_accounted: The amount this pipe accounts for in user->pipe_bufs 38 39 * @tmp_page: cached released page 39 40 * @readers: number of current readers of this pipe 40 41 * @writers: number of current writers of this pipe ··· 46 45 * @fasync_writers: writer side fasync 47 46 * @bufs: the circular array of pipe buffers 48 47 * @user: the user who created this pipe 48 + * @watch_queue: If this pipe is a watch_queue, this is the stuff for that 49 49 **/ 50 50 struct pipe_inode_info { 51 51 struct mutex mutex; ··· 55 53 unsigned int tail; 56 54 unsigned int max_usage; 57 55 unsigned int ring_size; 56 + unsigned int nr_accounted; 58 57 unsigned int readers; 59 58 unsigned int writers; 60 59 unsigned int files; ··· 66 63 struct fasync_struct *fasync_writers; 67 64 struct pipe_buffer *bufs; 68 65 struct user_struct *user; 66 + #ifdef CONFIG_WATCH_QUEUE 67 + struct watch_queue *watch_queue; 68 + #endif 69 69 }; 70 70 71 71 /* ··· 243 237 244 238 extern const struct pipe_buf_operations nosteal_pipe_buf_ops; 245 239 240 + #ifdef CONFIG_WATCH_QUEUE 241 + unsigned long account_pipe_buffers(struct user_struct *user, 242 + unsigned long old, unsigned long new); 243 + bool too_many_pipe_buffers_soft(unsigned long user_bufs); 244 + bool too_many_pipe_buffers_hard(unsigned long user_bufs); 245 + bool pipe_is_unprivileged_user(void); 246 + #endif 247 + 246 248 /* for F_SETPIPE_SZ and F_GETPIPE_SZ */ 249 + #ifdef CONFIG_WATCH_QUEUE 250 + int pipe_resize_ring(struct pipe_inode_info *pipe, unsigned int nr_slots); 251 + #endif 247 252 long pipe_fcntl(struct file *, unsigned int, unsigned long arg); 248 - struct pipe_inode_info *get_pipe_info(struct file *file); 253 + struct pipe_inode_info *get_pipe_info(struct file *file, bool for_splice); 249 254 250 255 int create_pipe_files(struct file **, int); 251 256 unsigned int round_pipe_size(unsigned long size);
+127
include/linux/watch_queue.h
··· 1 + // SPDX-License-Identifier: GPL-2.0 2 + /* User-mappable watch queue 3 + * 4 + * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved. 5 + * Written by David Howells (dhowells@redhat.com) 6 + * 7 + * See Documentation/watch_queue.rst 8 + */ 9 + 10 + #ifndef _LINUX_WATCH_QUEUE_H 11 + #define _LINUX_WATCH_QUEUE_H 12 + 13 + #include <uapi/linux/watch_queue.h> 14 + #include <linux/kref.h> 15 + #include <linux/rcupdate.h> 16 + 17 + #ifdef CONFIG_WATCH_QUEUE 18 + 19 + struct cred; 20 + 21 + struct watch_type_filter { 22 + enum watch_notification_type type; 23 + __u32 subtype_filter[1]; /* Bitmask of subtypes to filter on */ 24 + __u32 info_filter; /* Filter on watch_notification::info */ 25 + __u32 info_mask; /* Mask of relevant bits in info_filter */ 26 + }; 27 + 28 + struct watch_filter { 29 + union { 30 + struct rcu_head rcu; 31 + unsigned long type_filter[2]; /* Bitmask of accepted types */ 32 + }; 33 + u32 nr_filters; /* Number of filters */ 34 + struct watch_type_filter filters[]; 35 + }; 36 + 37 + struct watch_queue { 38 + struct rcu_head rcu; 39 + struct watch_filter __rcu *filter; 40 + struct pipe_inode_info *pipe; /* The pipe we're using as a buffer */ 41 + struct hlist_head watches; /* Contributory watches */ 42 + struct page **notes; /* Preallocated notifications */ 43 + unsigned long *notes_bitmap; /* Allocation bitmap for notes */ 44 + struct kref usage; /* Object usage count */ 45 + spinlock_t lock; 46 + unsigned int nr_notes; /* Number of notes */ 47 + unsigned int nr_pages; /* Number of pages in notes[] */ 48 + bool defunct; /* T when queues closed */ 49 + }; 50 + 51 + /* 52 + * Representation of a watch on an object. 53 + */ 54 + struct watch { 55 + union { 56 + struct rcu_head rcu; 57 + u32 info_id; /* ID to be OR'd in to info field */ 58 + }; 59 + struct watch_queue __rcu *queue; /* Queue to post events to */ 60 + struct hlist_node queue_node; /* Link in queue->watches */ 61 + struct watch_list __rcu *watch_list; 62 + struct hlist_node list_node; /* Link in watch_list->watchers */ 63 + const struct cred *cred; /* Creds of the owner of the watch */ 64 + void *private; /* Private data for the watched object */ 65 + u64 id; /* Internal identifier */ 66 + struct kref usage; /* Object usage count */ 67 + }; 68 + 69 + /* 70 + * List of watches on an object. 71 + */ 72 + struct watch_list { 73 + struct rcu_head rcu; 74 + struct hlist_head watchers; 75 + void (*release_watch)(struct watch *); 76 + spinlock_t lock; 77 + }; 78 + 79 + extern void __post_watch_notification(struct watch_list *, 80 + struct watch_notification *, 81 + const struct cred *, 82 + u64); 83 + extern struct watch_queue *get_watch_queue(int); 84 + extern void put_watch_queue(struct watch_queue *); 85 + extern void init_watch(struct watch *, struct watch_queue *); 86 + extern int add_watch_to_object(struct watch *, struct watch_list *); 87 + extern int remove_watch_from_object(struct watch_list *, struct watch_queue *, u64, bool); 88 + extern long watch_queue_set_size(struct pipe_inode_info *, unsigned int); 89 + extern long watch_queue_set_filter(struct pipe_inode_info *, 90 + struct watch_notification_filter __user *); 91 + extern int watch_queue_init(struct pipe_inode_info *); 92 + extern void watch_queue_clear(struct watch_queue *); 93 + 94 + static inline void init_watch_list(struct watch_list *wlist, 95 + void (*release_watch)(struct watch *)) 96 + { 97 + INIT_HLIST_HEAD(&wlist->watchers); 98 + spin_lock_init(&wlist->lock); 99 + wlist->release_watch = release_watch; 100 + } 101 + 102 + static inline void post_watch_notification(struct watch_list *wlist, 103 + struct watch_notification *n, 104 + const struct cred *cred, 105 + u64 id) 106 + { 107 + if (unlikely(wlist)) 108 + __post_watch_notification(wlist, n, cred, id); 109 + } 110 + 111 + static inline void remove_watch_list(struct watch_list *wlist, u64 id) 112 + { 113 + if (wlist) { 114 + remove_watch_from_object(wlist, NULL, id, true); 115 + kfree_rcu(wlist, rcu); 116 + } 117 + } 118 + 119 + /** 120 + * watch_sizeof - Calculate the information part of the size of a watch record, 121 + * given the structure size. 122 + */ 123 + #define watch_sizeof(STRUCT) (sizeof(STRUCT) << WATCH_INFO_LENGTH__SHIFT) 124 + 125 + #endif 126 + 127 + #endif /* _LINUX_WATCH_QUEUE_H */
+20
include/uapi/linux/watch_queue.h
··· 4 4 5 5 #include <linux/types.h> 6 6 #include <linux/fcntl.h> 7 + #include <linux/ioctl.h> 7 8 8 9 #define O_NOTIFICATION_PIPE O_EXCL /* Parameter to pipe2() selecting notification pipe */ 10 + 11 + #define IOC_WATCH_QUEUE_SET_SIZE _IO('W', 0x60) /* Set the size in pages */ 12 + #define IOC_WATCH_QUEUE_SET_FILTER _IO('W', 0x61) /* Set the filter */ 9 13 10 14 enum watch_notification_type { 11 15 WATCH_TYPE_META = 0, /* Special record */ ··· 43 39 #define WATCH_INFO_FLAG_5 0x00200000 44 40 #define WATCH_INFO_FLAG_6 0x00400000 45 41 #define WATCH_INFO_FLAG_7 0x00800000 42 + }; 43 + 44 + /* 45 + * Notification filtering rules (IOC_WATCH_QUEUE_SET_FILTER). 46 + */ 47 + struct watch_notification_type_filter { 48 + __u32 type; /* Type to apply filter to */ 49 + __u32 info_filter; /* Filter on watch_notification::info */ 50 + __u32 info_mask; /* Mask of relevant bits in info_filter */ 51 + __u32 subtype_filter[8]; /* Bitmask of subtypes to filter on */ 52 + }; 53 + 54 + struct watch_notification_filter { 55 + __u32 nr_filters; /* Number of filters */ 56 + __u32 __reserved; /* Must be 0 */ 57 + struct watch_notification_type_filter filters[]; 46 58 }; 47 59 48 60
+12
init/Kconfig
··· 326 326 depends on SYSCTL 327 327 default y 328 328 329 + config WATCH_QUEUE 330 + bool "General notification queue" 331 + default n 332 + help 333 + 334 + This is a general notification queue for the kernel to pass events to 335 + userspace by splicing them into pipes. It can be used in conjunction 336 + with watches for key/keyring change notifications and device 337 + notifications. 338 + 339 + See Documentation/watch_queue.rst 340 + 329 341 config CROSS_MEMORY_ATTACH 330 342 bool "Enable process_vm_readv/writev syscalls" 331 343 depends on MMU
+1
kernel/Makefile
··· 115 115 116 116 obj-$(CONFIG_HAS_IOMEM) += iomem.o 117 117 obj-$(CONFIG_RSEQ) += rseq.o 118 + obj-$(CONFIG_WATCH_QUEUE) += watch_queue.o 118 119 119 120 obj-$(CONFIG_SYSCTL_KUNIT_TEST) += sysctl-test.o 120 121
+657
kernel/watch_queue.c
··· 1 + // SPDX-License-Identifier: GPL-2.0 2 + /* Watch queue and general notification mechanism, built on pipes 3 + * 4 + * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved. 5 + * Written by David Howells (dhowells@redhat.com) 6 + * 7 + * See Documentation/watch_queue.rst 8 + */ 9 + 10 + #define pr_fmt(fmt) "watchq: " fmt 11 + #include <linux/module.h> 12 + #include <linux/init.h> 13 + #include <linux/sched.h> 14 + #include <linux/slab.h> 15 + #include <linux/printk.h> 16 + #include <linux/miscdevice.h> 17 + #include <linux/fs.h> 18 + #include <linux/mm.h> 19 + #include <linux/pagemap.h> 20 + #include <linux/poll.h> 21 + #include <linux/uaccess.h> 22 + #include <linux/vmalloc.h> 23 + #include <linux/file.h> 24 + #include <linux/security.h> 25 + #include <linux/cred.h> 26 + #include <linux/sched/signal.h> 27 + #include <linux/watch_queue.h> 28 + #include <linux/pipe_fs_i.h> 29 + 30 + MODULE_DESCRIPTION("Watch queue"); 31 + MODULE_AUTHOR("Red Hat, Inc."); 32 + MODULE_LICENSE("GPL"); 33 + 34 + #define WATCH_QUEUE_NOTE_SIZE 128 35 + #define WATCH_QUEUE_NOTES_PER_PAGE (PAGE_SIZE / WATCH_QUEUE_NOTE_SIZE) 36 + 37 + static void watch_queue_pipe_buf_release(struct pipe_inode_info *pipe, 38 + struct pipe_buffer *buf) 39 + { 40 + struct watch_queue *wqueue = (struct watch_queue *)buf->private; 41 + struct page *page; 42 + unsigned int bit; 43 + 44 + /* We need to work out which note within the page this refers to, but 45 + * the note might have been maximum size, so merely ANDing the offset 46 + * off doesn't work. OTOH, the note must've been more than zero size. 47 + */ 48 + bit = buf->offset + buf->len; 49 + if ((bit & (WATCH_QUEUE_NOTE_SIZE - 1)) == 0) 50 + bit -= WATCH_QUEUE_NOTE_SIZE; 51 + bit /= WATCH_QUEUE_NOTE_SIZE; 52 + 53 + page = buf->page; 54 + bit += page->index; 55 + 56 + set_bit(bit, wqueue->notes_bitmap); 57 + } 58 + 59 + static int watch_queue_pipe_buf_steal(struct pipe_inode_info *pipe, 60 + struct pipe_buffer *buf) 61 + { 62 + return -1; /* No. */ 63 + } 64 + 65 + /* New data written to a pipe may be appended to a buffer with this type. */ 66 + static const struct pipe_buf_operations watch_queue_pipe_buf_ops = { 67 + .confirm = generic_pipe_buf_confirm, 68 + .release = watch_queue_pipe_buf_release, 69 + .steal = watch_queue_pipe_buf_steal, 70 + .get = generic_pipe_buf_get, 71 + }; 72 + 73 + /* 74 + * Post a notification to a watch queue. 75 + */ 76 + static bool post_one_notification(struct watch_queue *wqueue, 77 + struct watch_notification *n) 78 + { 79 + void *p; 80 + struct pipe_inode_info *pipe = wqueue->pipe; 81 + struct pipe_buffer *buf; 82 + struct page *page; 83 + unsigned int head, tail, mask, note, offset, len; 84 + bool done = false; 85 + 86 + if (!pipe) 87 + return false; 88 + 89 + spin_lock_irq(&pipe->rd_wait.lock); 90 + 91 + if (wqueue->defunct) 92 + goto out; 93 + 94 + mask = pipe->ring_size - 1; 95 + head = pipe->head; 96 + tail = pipe->tail; 97 + if (pipe_full(head, tail, pipe->ring_size)) 98 + goto lost; 99 + 100 + note = find_first_bit(wqueue->notes_bitmap, wqueue->nr_notes); 101 + if (note >= wqueue->nr_notes) 102 + goto lost; 103 + 104 + page = wqueue->notes[note / WATCH_QUEUE_NOTES_PER_PAGE]; 105 + offset = note % WATCH_QUEUE_NOTES_PER_PAGE * WATCH_QUEUE_NOTE_SIZE; 106 + get_page(page); 107 + len = n->info & WATCH_INFO_LENGTH; 108 + p = kmap_atomic(page); 109 + memcpy(p + offset, n, len); 110 + kunmap_atomic(p); 111 + 112 + buf = &pipe->bufs[head & mask]; 113 + buf->page = page; 114 + buf->private = (unsigned long)wqueue; 115 + buf->ops = &watch_queue_pipe_buf_ops; 116 + buf->offset = offset; 117 + buf->len = len; 118 + buf->flags = 0; 119 + pipe->head = head + 1; 120 + 121 + if (!test_and_clear_bit(note, wqueue->notes_bitmap)) { 122 + spin_unlock_irq(&pipe->rd_wait.lock); 123 + BUG(); 124 + } 125 + wake_up_interruptible_sync_poll_locked(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM); 126 + done = true; 127 + 128 + out: 129 + spin_unlock_irq(&pipe->rd_wait.lock); 130 + if (done) 131 + kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN); 132 + return done; 133 + 134 + lost: 135 + goto out; 136 + } 137 + 138 + /* 139 + * Apply filter rules to a notification. 140 + */ 141 + static bool filter_watch_notification(const struct watch_filter *wf, 142 + const struct watch_notification *n) 143 + { 144 + const struct watch_type_filter *wt; 145 + unsigned int st_bits = sizeof(wt->subtype_filter[0]) * 8; 146 + unsigned int st_index = n->subtype / st_bits; 147 + unsigned int st_bit = 1U << (n->subtype % st_bits); 148 + int i; 149 + 150 + if (!test_bit(n->type, wf->type_filter)) 151 + return false; 152 + 153 + for (i = 0; i < wf->nr_filters; i++) { 154 + wt = &wf->filters[i]; 155 + if (n->type == wt->type && 156 + (wt->subtype_filter[st_index] & st_bit) && 157 + (n->info & wt->info_mask) == wt->info_filter) 158 + return true; 159 + } 160 + 161 + return false; /* If there is a filter, the default is to reject. */ 162 + } 163 + 164 + /** 165 + * __post_watch_notification - Post an event notification 166 + * @wlist: The watch list to post the event to. 167 + * @n: The notification record to post. 168 + * @cred: The creds of the process that triggered the notification. 169 + * @id: The ID to match on the watch. 170 + * 171 + * Post a notification of an event into a set of watch queues and let the users 172 + * know. 173 + * 174 + * The size of the notification should be set in n->info & WATCH_INFO_LENGTH and 175 + * should be in units of sizeof(*n). 176 + */ 177 + void __post_watch_notification(struct watch_list *wlist, 178 + struct watch_notification *n, 179 + const struct cred *cred, 180 + u64 id) 181 + { 182 + const struct watch_filter *wf; 183 + struct watch_queue *wqueue; 184 + struct watch *watch; 185 + 186 + if (((n->info & WATCH_INFO_LENGTH) >> WATCH_INFO_LENGTH__SHIFT) == 0) { 187 + WARN_ON(1); 188 + return; 189 + } 190 + 191 + rcu_read_lock(); 192 + 193 + hlist_for_each_entry_rcu(watch, &wlist->watchers, list_node) { 194 + if (watch->id != id) 195 + continue; 196 + n->info &= ~WATCH_INFO_ID; 197 + n->info |= watch->info_id; 198 + 199 + wqueue = rcu_dereference(watch->queue); 200 + wf = rcu_dereference(wqueue->filter); 201 + if (wf && !filter_watch_notification(wf, n)) 202 + continue; 203 + 204 + if (security_post_notification(watch->cred, cred, n) < 0) 205 + continue; 206 + 207 + post_one_notification(wqueue, n); 208 + } 209 + 210 + rcu_read_unlock(); 211 + } 212 + EXPORT_SYMBOL(__post_watch_notification); 213 + 214 + /* 215 + * Allocate sufficient pages to preallocation for the requested number of 216 + * notifications. 217 + */ 218 + long watch_queue_set_size(struct pipe_inode_info *pipe, unsigned int nr_notes) 219 + { 220 + struct watch_queue *wqueue = pipe->watch_queue; 221 + struct page **pages; 222 + unsigned long *bitmap; 223 + unsigned long user_bufs; 224 + unsigned int bmsize; 225 + int ret, i, nr_pages; 226 + 227 + if (!wqueue) 228 + return -ENODEV; 229 + if (wqueue->notes) 230 + return -EBUSY; 231 + 232 + if (nr_notes < 1 || 233 + nr_notes > 512) /* TODO: choose a better hard limit */ 234 + return -EINVAL; 235 + 236 + nr_pages = (nr_notes + WATCH_QUEUE_NOTES_PER_PAGE - 1); 237 + nr_pages /= WATCH_QUEUE_NOTES_PER_PAGE; 238 + user_bufs = account_pipe_buffers(pipe->user, pipe->nr_accounted, nr_pages); 239 + 240 + if (nr_pages > pipe->max_usage && 241 + (too_many_pipe_buffers_hard(user_bufs) || 242 + too_many_pipe_buffers_soft(user_bufs)) && 243 + pipe_is_unprivileged_user()) { 244 + ret = -EPERM; 245 + goto error; 246 + } 247 + 248 + ret = pipe_resize_ring(pipe, nr_notes); 249 + if (ret < 0) 250 + goto error; 251 + 252 + pages = kcalloc(sizeof(struct page *), nr_pages, GFP_KERNEL); 253 + if (!pages) 254 + goto error; 255 + 256 + for (i = 0; i < nr_pages; i++) { 257 + pages[i] = alloc_page(GFP_KERNEL); 258 + if (!pages[i]) 259 + goto error_p; 260 + pages[i]->index = i * WATCH_QUEUE_NOTES_PER_PAGE; 261 + } 262 + 263 + bmsize = (nr_notes + BITS_PER_LONG - 1) / BITS_PER_LONG; 264 + bmsize *= sizeof(unsigned long); 265 + bitmap = kmalloc(bmsize, GFP_KERNEL); 266 + if (!bitmap) 267 + goto error_p; 268 + 269 + memset(bitmap, 0xff, bmsize); 270 + wqueue->notes = pages; 271 + wqueue->notes_bitmap = bitmap; 272 + wqueue->nr_pages = nr_pages; 273 + wqueue->nr_notes = nr_pages * WATCH_QUEUE_NOTES_PER_PAGE; 274 + return 0; 275 + 276 + error_p: 277 + for (i = 0; i < nr_pages; i++) 278 + __free_page(pages[i]); 279 + kfree(pages); 280 + error: 281 + (void) account_pipe_buffers(pipe->user, nr_pages, pipe->nr_accounted); 282 + return ret; 283 + } 284 + 285 + /* 286 + * Set the filter on a watch queue. 287 + */ 288 + long watch_queue_set_filter(struct pipe_inode_info *pipe, 289 + struct watch_notification_filter __user *_filter) 290 + { 291 + struct watch_notification_type_filter *tf; 292 + struct watch_notification_filter filter; 293 + struct watch_type_filter *q; 294 + struct watch_filter *wfilter; 295 + struct watch_queue *wqueue = pipe->watch_queue; 296 + int ret, nr_filter = 0, i; 297 + 298 + if (!wqueue) 299 + return -ENODEV; 300 + 301 + if (!_filter) { 302 + /* Remove the old filter */ 303 + wfilter = NULL; 304 + goto set; 305 + } 306 + 307 + /* Grab the user's filter specification */ 308 + if (copy_from_user(&filter, _filter, sizeof(filter)) != 0) 309 + return -EFAULT; 310 + if (filter.nr_filters == 0 || 311 + filter.nr_filters > 16 || 312 + filter.__reserved != 0) 313 + return -EINVAL; 314 + 315 + tf = memdup_user(_filter->filters, filter.nr_filters * sizeof(*tf)); 316 + if (IS_ERR(tf)) 317 + return PTR_ERR(tf); 318 + 319 + ret = -EINVAL; 320 + for (i = 0; i < filter.nr_filters; i++) { 321 + if ((tf[i].info_filter & ~tf[i].info_mask) || 322 + tf[i].info_mask & WATCH_INFO_LENGTH) 323 + goto err_filter; 324 + /* Ignore any unknown types */ 325 + if (tf[i].type >= sizeof(wfilter->type_filter) * 8) 326 + continue; 327 + nr_filter++; 328 + } 329 + 330 + /* Now we need to build the internal filter from only the relevant 331 + * user-specified filters. 332 + */ 333 + ret = -ENOMEM; 334 + wfilter = kzalloc(struct_size(wfilter, filters, nr_filter), GFP_KERNEL); 335 + if (!wfilter) 336 + goto err_filter; 337 + wfilter->nr_filters = nr_filter; 338 + 339 + q = wfilter->filters; 340 + for (i = 0; i < filter.nr_filters; i++) { 341 + if (tf[i].type >= sizeof(wfilter->type_filter) * BITS_PER_LONG) 342 + continue; 343 + 344 + q->type = tf[i].type; 345 + q->info_filter = tf[i].info_filter; 346 + q->info_mask = tf[i].info_mask; 347 + q->subtype_filter[0] = tf[i].subtype_filter[0]; 348 + __set_bit(q->type, wfilter->type_filter); 349 + q++; 350 + } 351 + 352 + kfree(tf); 353 + set: 354 + pipe_lock(pipe); 355 + wfilter = rcu_replace_pointer(wqueue->filter, wfilter, 356 + lockdep_is_held(&pipe->mutex)); 357 + pipe_unlock(pipe); 358 + if (wfilter) 359 + kfree_rcu(wfilter, rcu); 360 + return 0; 361 + 362 + err_filter: 363 + kfree(tf); 364 + return ret; 365 + } 366 + 367 + static void __put_watch_queue(struct kref *kref) 368 + { 369 + struct watch_queue *wqueue = 370 + container_of(kref, struct watch_queue, usage); 371 + struct watch_filter *wfilter; 372 + int i; 373 + 374 + for (i = 0; i < wqueue->nr_pages; i++) 375 + __free_page(wqueue->notes[i]); 376 + 377 + wfilter = rcu_access_pointer(wqueue->filter); 378 + if (wfilter) 379 + kfree_rcu(wfilter, rcu); 380 + kfree_rcu(wqueue, rcu); 381 + } 382 + 383 + /** 384 + * put_watch_queue - Dispose of a ref on a watchqueue. 385 + * @wqueue: The watch queue to unref. 386 + */ 387 + void put_watch_queue(struct watch_queue *wqueue) 388 + { 389 + kref_put(&wqueue->usage, __put_watch_queue); 390 + } 391 + EXPORT_SYMBOL(put_watch_queue); 392 + 393 + static void free_watch(struct rcu_head *rcu) 394 + { 395 + struct watch *watch = container_of(rcu, struct watch, rcu); 396 + 397 + put_watch_queue(rcu_access_pointer(watch->queue)); 398 + put_cred(watch->cred); 399 + } 400 + 401 + static void __put_watch(struct kref *kref) 402 + { 403 + struct watch *watch = container_of(kref, struct watch, usage); 404 + 405 + call_rcu(&watch->rcu, free_watch); 406 + } 407 + 408 + /* 409 + * Discard a watch. 410 + */ 411 + static void put_watch(struct watch *watch) 412 + { 413 + kref_put(&watch->usage, __put_watch); 414 + } 415 + 416 + /** 417 + * init_watch_queue - Initialise a watch 418 + * @watch: The watch to initialise. 419 + * @wqueue: The queue to assign. 420 + * 421 + * Initialise a watch and set the watch queue. 422 + */ 423 + void init_watch(struct watch *watch, struct watch_queue *wqueue) 424 + { 425 + kref_init(&watch->usage); 426 + INIT_HLIST_NODE(&watch->list_node); 427 + INIT_HLIST_NODE(&watch->queue_node); 428 + rcu_assign_pointer(watch->queue, wqueue); 429 + } 430 + 431 + /** 432 + * add_watch_to_object - Add a watch on an object to a watch list 433 + * @watch: The watch to add 434 + * @wlist: The watch list to add to 435 + * 436 + * @watch->queue must have been set to point to the queue to post notifications 437 + * to and the watch list of the object to be watched. @watch->cred must also 438 + * have been set to the appropriate credentials and a ref taken on them. 439 + * 440 + * The caller must pin the queue and the list both and must hold the list 441 + * locked against racing watch additions/removals. 442 + */ 443 + int add_watch_to_object(struct watch *watch, struct watch_list *wlist) 444 + { 445 + struct watch_queue *wqueue = rcu_access_pointer(watch->queue); 446 + struct watch *w; 447 + 448 + hlist_for_each_entry(w, &wlist->watchers, list_node) { 449 + struct watch_queue *wq = rcu_access_pointer(w->queue); 450 + if (wqueue == wq && watch->id == w->id) 451 + return -EBUSY; 452 + } 453 + 454 + watch->cred = get_current_cred(); 455 + rcu_assign_pointer(watch->watch_list, wlist); 456 + 457 + spin_lock_bh(&wqueue->lock); 458 + kref_get(&wqueue->usage); 459 + kref_get(&watch->usage); 460 + hlist_add_head(&watch->queue_node, &wqueue->watches); 461 + spin_unlock_bh(&wqueue->lock); 462 + 463 + hlist_add_head(&watch->list_node, &wlist->watchers); 464 + return 0; 465 + } 466 + EXPORT_SYMBOL(add_watch_to_object); 467 + 468 + /** 469 + * remove_watch_from_object - Remove a watch or all watches from an object. 470 + * @wlist: The watch list to remove from 471 + * @wq: The watch queue of interest (ignored if @all is true) 472 + * @id: The ID of the watch to remove (ignored if @all is true) 473 + * @all: True to remove all objects 474 + * 475 + * Remove a specific watch or all watches from an object. A notification is 476 + * sent to the watcher to tell them that this happened. 477 + */ 478 + int remove_watch_from_object(struct watch_list *wlist, struct watch_queue *wq, 479 + u64 id, bool all) 480 + { 481 + struct watch_notification_removal n; 482 + struct watch_queue *wqueue; 483 + struct watch *watch; 484 + int ret = -EBADSLT; 485 + 486 + rcu_read_lock(); 487 + 488 + again: 489 + spin_lock(&wlist->lock); 490 + hlist_for_each_entry(watch, &wlist->watchers, list_node) { 491 + if (all || 492 + (watch->id == id && rcu_access_pointer(watch->queue) == wq)) 493 + goto found; 494 + } 495 + spin_unlock(&wlist->lock); 496 + goto out; 497 + 498 + found: 499 + ret = 0; 500 + hlist_del_init_rcu(&watch->list_node); 501 + rcu_assign_pointer(watch->watch_list, NULL); 502 + spin_unlock(&wlist->lock); 503 + 504 + /* We now own the reference on watch that used to belong to wlist. */ 505 + 506 + n.watch.type = WATCH_TYPE_META; 507 + n.watch.subtype = WATCH_META_REMOVAL_NOTIFICATION; 508 + n.watch.info = watch->info_id | watch_sizeof(n.watch); 509 + n.id = id; 510 + if (id != 0) 511 + n.watch.info = watch->info_id | watch_sizeof(n); 512 + 513 + wqueue = rcu_dereference(watch->queue); 514 + 515 + /* We don't need the watch list lock for the next bit as RCU is 516 + * protecting *wqueue from deallocation. 517 + */ 518 + if (wqueue) { 519 + post_one_notification(wqueue, &n.watch); 520 + 521 + spin_lock_bh(&wqueue->lock); 522 + 523 + if (!hlist_unhashed(&watch->queue_node)) { 524 + hlist_del_init_rcu(&watch->queue_node); 525 + put_watch(watch); 526 + } 527 + 528 + spin_unlock_bh(&wqueue->lock); 529 + } 530 + 531 + if (wlist->release_watch) { 532 + void (*release_watch)(struct watch *); 533 + 534 + release_watch = wlist->release_watch; 535 + rcu_read_unlock(); 536 + (*release_watch)(watch); 537 + rcu_read_lock(); 538 + } 539 + put_watch(watch); 540 + 541 + if (all && !hlist_empty(&wlist->watchers)) 542 + goto again; 543 + out: 544 + rcu_read_unlock(); 545 + return ret; 546 + } 547 + EXPORT_SYMBOL(remove_watch_from_object); 548 + 549 + /* 550 + * Remove all the watches that are contributory to a queue. This has the 551 + * potential to race with removal of the watches by the destruction of the 552 + * objects being watched or with the distribution of notifications. 553 + */ 554 + void watch_queue_clear(struct watch_queue *wqueue) 555 + { 556 + struct watch_list *wlist; 557 + struct watch *watch; 558 + bool release; 559 + 560 + rcu_read_lock(); 561 + spin_lock_bh(&wqueue->lock); 562 + 563 + /* Prevent new additions and prevent notifications from happening */ 564 + wqueue->defunct = true; 565 + 566 + while (!hlist_empty(&wqueue->watches)) { 567 + watch = hlist_entry(wqueue->watches.first, struct watch, queue_node); 568 + hlist_del_init_rcu(&watch->queue_node); 569 + /* We now own a ref on the watch. */ 570 + spin_unlock_bh(&wqueue->lock); 571 + 572 + /* We can't do the next bit under the queue lock as we need to 573 + * get the list lock - which would cause a deadlock if someone 574 + * was removing from the opposite direction at the same time or 575 + * posting a notification. 576 + */ 577 + wlist = rcu_dereference(watch->watch_list); 578 + if (wlist) { 579 + void (*release_watch)(struct watch *); 580 + 581 + spin_lock(&wlist->lock); 582 + 583 + release = !hlist_unhashed(&watch->list_node); 584 + if (release) { 585 + hlist_del_init_rcu(&watch->list_node); 586 + rcu_assign_pointer(watch->watch_list, NULL); 587 + 588 + /* We now own a second ref on the watch. */ 589 + } 590 + 591 + release_watch = wlist->release_watch; 592 + spin_unlock(&wlist->lock); 593 + 594 + if (release) { 595 + if (release_watch) { 596 + rcu_read_unlock(); 597 + /* This might need to call dput(), so 598 + * we have to drop all the locks. 599 + */ 600 + (*release_watch)(watch); 601 + rcu_read_lock(); 602 + } 603 + put_watch(watch); 604 + } 605 + } 606 + 607 + put_watch(watch); 608 + spin_lock_bh(&wqueue->lock); 609 + } 610 + 611 + spin_unlock_bh(&wqueue->lock); 612 + rcu_read_unlock(); 613 + } 614 + 615 + /** 616 + * get_watch_queue - Get a watch queue from its file descriptor. 617 + * @fd: The fd to query. 618 + */ 619 + struct watch_queue *get_watch_queue(int fd) 620 + { 621 + struct pipe_inode_info *pipe; 622 + struct watch_queue *wqueue = ERR_PTR(-EINVAL); 623 + struct fd f; 624 + 625 + f = fdget(fd); 626 + if (f.file) { 627 + pipe = get_pipe_info(f.file, false); 628 + if (pipe && pipe->watch_queue) { 629 + wqueue = pipe->watch_queue; 630 + kref_get(&wqueue->usage); 631 + } 632 + fdput(f); 633 + } 634 + 635 + return wqueue; 636 + } 637 + EXPORT_SYMBOL(get_watch_queue); 638 + 639 + /* 640 + * Initialise a watch queue 641 + */ 642 + int watch_queue_init(struct pipe_inode_info *pipe) 643 + { 644 + struct watch_queue *wqueue; 645 + 646 + wqueue = kzalloc(sizeof(*wqueue), GFP_KERNEL); 647 + if (!wqueue) 648 + return -ENOMEM; 649 + 650 + wqueue->pipe = pipe; 651 + kref_init(&wqueue->usage); 652 + spin_lock_init(&wqueue->lock); 653 + INIT_HLIST_HEAD(&wqueue->watches); 654 + 655 + pipe->watch_queue = wqueue; 656 + return 0; 657 + }