
ipc/mqueue: add rbtree node caching support

When I wrote the first patch that added rbtree support for message
queue insertion, it drastically sped up the case where the queue was
very full compared with the original code. However, it slowed down the
case where the queue was empty (though not drastically).

This patch caches the last freed rbtree node struct so we can quickly
reuse it when we get a new message. This is the common path for any queue
that very frequently goes from 0 to 1 and then back to 0 messages in the
queue.

Andrew Morton didn't like that we were doing a GFP_ATOMIC allocation in
msg_insert, so this patch attempts to speculatively allocate a new node
struct outside of the spin lock when we know we need it, but will still
fall back to a GFP_ATOMIC allocation if it has to.

Once I added the caching, the various ret = ...; spin_unlock()
gyrations in mq_timedsend were getting pretty ugly, so this also
slightly refactors that function to streamline the flow of the code and
the function exit.

Finally, while working on getting performance back I made sure that all of
the node structs were always fully initialized when they were first used,
rendering the use of kzalloc unnecessary and a waste of CPU cycles.

The net result of all of this is:

1) We will avoid a GFP_ATOMIC allocation when possible, but fall back
on it when necessary.

2) We will speculatively allocate a node struct using GFP_KERNEL if our
cache is empty (and save the struct to our cache if it's still empty
after we have obtained the spin lock).

3) The performance of the common queue empty case has significantly
improved and is now much more in line with the older performance for
this case.

The performance changes are:

                        Old mqueue      new mqueue      new mqueue + caching
queue empty
send/recv               305/288ns       349/318ns       310/322ns

I don't think we'll ever be able to get the recv performance back,
because the old recv performance was a direct consequence of the old
method's abysmal send performance. The recv path simply must do more so
that the send path does not incur such a penalty at higher queue depths.

As it turns out, the new caching code also sped up the various queue-full
cases relative to my last patch. That could be because of the difference
between the syscall path in 3.3.4-rc5 and 3.3.4-rc6, or because of the
change in code flow in the mq_timedsend routine. Regardless, I'll take
it. It wasn't huge, and I *would* say it was within the margin of error,
but after many repeated runs what I'm seeing is that the old numbers
trend slightly higher (about 10 to 20ns, depending on which test is
running).

[akpm@linux-foundation.org: checkpatch fixes]
Signed-off-by: Doug Ledford <dledford@redhat.com>
Cc: Frederic Weisbecker <fweisbec@gmail.com>
Cc: Manfred Spraul <manfred@colorfullife.com>
Cc: Stephen Rothwell <sfr@canb.auug.org.au>
Cc: KOSAKI Motohiro <kosaki.motohiro@jp.fujitsu.com>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>

Authored by Doug Ledford, committed by Linus Torvalds
ce2d52cc 7820b071
+81 -23
ipc/mqueue.c
···
 	wait_queue_head_t wait_q;
 
 	struct rb_root msg_tree;
+	struct posix_msg_tree_node *node_cache;
 	struct mq_attr attr;
 
 	struct sigevent notify;
···
 		else
 			p = &(*p)->rb_right;
 	}
-	leaf = kzalloc(sizeof(*leaf), GFP_ATOMIC);
-	if (!leaf)
-		return -ENOMEM;
-	rb_init_node(&leaf->rb_node);
-	INIT_LIST_HEAD(&leaf->msg_list);
+	if (info->node_cache) {
+		leaf = info->node_cache;
+		info->node_cache = NULL;
+	} else {
+		leaf = kmalloc(sizeof(*leaf), GFP_ATOMIC);
+		if (!leaf)
+			return -ENOMEM;
+		rb_init_node(&leaf->rb_node);
+		INIT_LIST_HEAD(&leaf->msg_list);
+		info->qsize += sizeof(*leaf);
+	}
 	leaf->priority = msg->m_type;
 	rb_link_node(&leaf->rb_node, parent, p);
 	rb_insert_color(&leaf->rb_node, &info->msg_tree);
-	info->qsize += sizeof(struct posix_msg_tree_node);
 insert_msg:
 	info->attr.mq_curmsgs++;
 	info->qsize += msg->m_ts;
···
 		return NULL;
 	}
 	leaf = rb_entry(parent, struct posix_msg_tree_node, rb_node);
-	if (list_empty(&leaf->msg_list)) {
+	if (unlikely(list_empty(&leaf->msg_list))) {
 		pr_warn_once("Inconsistency in POSIX message queue, "
 			     "empty leaf node but we haven't implemented "
 			     "lazy leaf delete!\n");
 		rb_erase(&leaf->rb_node, &info->msg_tree);
-		info->qsize -= sizeof(struct posix_msg_tree_node);
-		kfree(leaf);
+		if (info->node_cache) {
+			info->qsize -= sizeof(*leaf);
+			kfree(leaf);
+		} else {
+			info->node_cache = leaf;
+		}
 		goto try_again;
 	} else {
 		msg = list_first_entry(&leaf->msg_list,
···
 		list_del(&msg->m_list);
 		if (list_empty(&leaf->msg_list)) {
 			rb_erase(&leaf->rb_node, &info->msg_tree);
-			info->qsize -= sizeof(struct posix_msg_tree_node);
-			kfree(leaf);
+			if (info->node_cache) {
+				info->qsize -= sizeof(*leaf);
+				kfree(leaf);
+			} else {
+				info->node_cache = leaf;
+			}
 		}
 	}
 	info->attr.mq_curmsgs--;
···
 	info->qsize = 0;
 	info->user = NULL;	/* set when all is ok */
 	info->msg_tree = RB_ROOT;
+	info->node_cache = NULL;
 	memset(&info->attr, 0, sizeof(info->attr));
 	info->attr.mq_maxmsg = min(ipc_ns->mq_msg_max,
 				   ipc_ns->mq_msg_default);
···
 	spin_lock(&info->lock);
 	while ((msg = msg_get(info)) != NULL)
 		free_msg(msg);
+	kfree(info->node_cache);
 	spin_unlock(&info->lock);
 
 	/* Total amount of bytes accounted for the mqueue */
···
 	struct mqueue_inode_info *info;
 	ktime_t expires, *timeout = NULL;
 	struct timespec ts;
-	int ret;
+	struct posix_msg_tree_node *new_leaf = NULL;
+	int ret = 0;
 
 	if (u_abs_timeout) {
 		int res = prepare_timeout(u_abs_timeout, &expires, &ts);
···
 	msg_ptr->m_ts = msg_len;
 	msg_ptr->m_type = msg_prio;
 
+	/*
+	 * msg_insert really wants us to have a valid, spare node struct so
+	 * it doesn't have to kmalloc a GFP_ATOMIC allocation, but it will
+	 * fall back to that if necessary.
+	 */
+	if (!info->node_cache)
+		new_leaf = kmalloc(sizeof(*new_leaf), GFP_KERNEL);
+
 	spin_lock(&info->lock);
+
+	if (!info->node_cache && new_leaf) {
+		/* Save our speculative allocation into the cache */
+		rb_init_node(&new_leaf->rb_node);
+		INIT_LIST_HEAD(&new_leaf->msg_list);
+		info->node_cache = new_leaf;
+		info->qsize += sizeof(*new_leaf);
+		new_leaf = NULL;
+	} else {
+		kfree(new_leaf);
+	}
 
 	if (info->attr.mq_curmsgs == info->attr.mq_maxmsg) {
 		if (filp->f_flags & O_NONBLOCK) {
-			spin_unlock(&info->lock);
 			ret = -EAGAIN;
 		} else {
 			wait.task = current;
 			wait.msg = (void *) msg_ptr;
 			wait.state = STATE_NONE;
 			ret = wq_sleep(info, SEND, timeout, &wait);
+			/*
+			 * wq_sleep must be called with info->lock held, and
+			 * returns with the lock released
+			 */
+			goto out_free;
 		}
-		if (ret < 0)
-			free_msg(msg_ptr);
 	} else {
 		receiver = wq_get_first_waiter(info, RECV);
 		if (receiver) {
 			pipelined_send(info, msg_ptr, receiver);
 		} else {
 			/* adds message to the queue */
-			if (msg_insert(msg_ptr, info)) {
-				free_msg(msg_ptr);
-				ret = -ENOMEM;
-				spin_unlock(&info->lock);
-				goto out_fput;
-			}
+			ret = msg_insert(msg_ptr, info);
+			if (ret)
+				goto out_unlock;
 			__do_notify(info);
 		}
 		inode->i_atime = inode->i_mtime = inode->i_ctime =
 				CURRENT_TIME;
-		spin_unlock(&info->lock);
-		ret = 0;
 	}
+out_unlock:
+	spin_unlock(&info->lock);
+out_free:
+	if (ret)
+		free_msg(msg_ptr);
 out_fput:
 	fput(filp);
 out:
···
 	struct ext_wait_queue wait;
 	ktime_t expires, *timeout = NULL;
 	struct timespec ts;
+	struct posix_msg_tree_node *new_leaf = NULL;
 
 	if (u_abs_timeout) {
 		int res = prepare_timeout(u_abs_timeout, &expires, &ts);
···
 		goto out_fput;
 	}
 
+	/*
+	 * msg_insert really wants us to have a valid, spare node struct so
+	 * it doesn't have to kmalloc a GFP_ATOMIC allocation, but it will
+	 * fall back to that if necessary.
+	 */
+	if (!info->node_cache)
+		new_leaf = kmalloc(sizeof(*new_leaf), GFP_KERNEL);
+
 	spin_lock(&info->lock);
+
+	if (!info->node_cache && new_leaf) {
+		/* Save our speculative allocation into the cache */
+		rb_init_node(&new_leaf->rb_node);
+		INIT_LIST_HEAD(&new_leaf->msg_list);
+		info->node_cache = new_leaf;
+		info->qsize += sizeof(*new_leaf);
+	} else {
+		kfree(new_leaf);
+	}
+
 	if (info->attr.mq_curmsgs == 0) {
 		if (filp->f_flags & O_NONBLOCK) {
 			spin_unlock(&info->lock);