Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

Merge branch 'rwsem-optimizations'

Merge rwsem optimizations from Michel Lespinasse:
"These patches extend Alex Shi's work (which added write lock stealing
on the rwsem slow path) in order to provide rwsem write lock stealing
on the fast path (that is, without taking the rwsem's wait_lock).

I have unfortunately been unable to push this through -next before due
to Ingo Molnar / David Howells / Peter Zijlstra being busy with other
things. However, this has gotten some attention from Rik van Riel and
Davidlohr Bueso who both commented that they felt this was ready for
v3.10, and Ingo Molnar has said that he was OK with me pushing
directly to you. So, here goes :)

Davidlohr got the following test results from pgbench running on a
quad-core laptop:

| db_size | clients | tps-vanilla | tps-rwsem |
+---------+----------+----------------+--------------+
| 160 MB | 1 | 5803 | 6906 | + 19.0%
| 160 MB | 2 | 13092 | 15931 |
| 160 MB | 4 | 29412 | 33021 |
| 160 MB | 8 | 32448 | 34626 |
| 160 MB | 16 | 32758 | 33098 |
| 160 MB | 20 | 26940 | 31343 | + 16.3%
| 160 MB | 30 | 25147 | 28961 |
| 160 MB | 40 | 25484 | 26902 |
| 160 MB | 50 | 24528 | 25760 |
------------------------------------------------------
| 1.6 GB | 1 | 5733 | 7729 | + 34.8%
| 1.6 GB | 2 | 9411 | 19009 | + 101.9%
| 1.6 GB | 4 | 31818 | 33185 |
| 1.6 GB | 8 | 33700 | 34550 |
| 1.6 GB | 16 | 32751 | 33079 |
| 1.6 GB | 20 | 30919 | 31494 |
| 1.6 GB | 30 | 28540 | 28535 |
| 1.6 GB | 40 | 26380 | 27054 |
| 1.6 GB | 50 | 25241 | 25591 |
------------------------------------------------------
| 7.6 GB | 1 | 5779 | 6224 |
| 7.6 GB | 2 | 10897 | 13611 | + 24.9%
| 7.6 GB | 4 | 32683 | 33108 |
| 7.6 GB | 8 | 33968 | 34712 |
| 7.6 GB | 16 | 32287 | 32895 |
| 7.6 GB | 20 | 27770 | 31689 | + 14.1%
| 7.6 GB | 30 | 26739 | 29003 |
| 7.6 GB | 40 | 24901 | 26683 |
| 7.6 GB | 50 | 17115 | 25925 | + 51.5%
------------------------------------------------------

(Davidlohr also has one additional patch which further improves
throughput, though I will ask him to send it directly to you as I have
suggested some minor changes)."

* emailed patches from Michel Lespinasse <walken@google.com>:
rwsem: no need for explicit signed longs
x86 rwsem: avoid taking slow path when stealing write lock
rwsem: do not block readers at head of queue if other readers are active
rwsem: implement support for write lock stealing on the fastpath
rwsem: simplify __rwsem_do_wake
rwsem: skip initial trylock in rwsem_down_write_failed
rwsem: avoid taking wait_lock in rwsem_down_write_failed
rwsem: use cmpxchg for trying to steal write lock
rwsem: more aggressive lock stealing in rwsem_down_write_failed
rwsem: simplify rwsem_down_write_failed
rwsem: simplify rwsem_down_read_failed
rwsem: move rwsem_down_failed_common code into rwsem_down_{read,write}_failed
rwsem: shorter spinlocked section in rwsem_down_failed_common()
rwsem: make the waiter type an enumeration rather than a bitmask

+153 -153
+21 -7
arch/x86/include/asm/rwsem.h
··· 105 105 asm volatile("# beginning down_write\n\t" 106 106 LOCK_PREFIX " xadd %1,(%2)\n\t" 107 107 /* adds 0xffff0001, returns the old value */ 108 - " test %1,%1\n\t" 109 - /* was the count 0 before? */ 108 + " test " __ASM_SEL(%w1,%k1) "," __ASM_SEL(%w1,%k1) "\n\t" 109 + /* was the active mask 0 before? */ 110 110 " jz 1f\n" 111 111 " call call_rwsem_down_write_failed\n" 112 112 "1:\n" ··· 126 126 */ 127 127 static inline int __down_write_trylock(struct rw_semaphore *sem) 128 128 { 129 - long ret = cmpxchg(&sem->count, RWSEM_UNLOCKED_VALUE, 130 - RWSEM_ACTIVE_WRITE_BIAS); 131 - if (ret == RWSEM_UNLOCKED_VALUE) 132 - return 1; 133 - return 0; 129 + long result, tmp; 130 + asm volatile("# beginning __down_write_trylock\n\t" 131 + " mov %0,%1\n\t" 132 + "1:\n\t" 133 + " test " __ASM_SEL(%w1,%k1) "," __ASM_SEL(%w1,%k1) "\n\t" 134 + /* was the active mask 0 before? */ 135 + " jnz 2f\n\t" 136 + " mov %1,%2\n\t" 137 + " add %3,%2\n\t" 138 + LOCK_PREFIX " cmpxchg %2,%0\n\t" 139 + " jnz 1b\n\t" 140 + "2:\n\t" 141 + " sete %b1\n\t" 142 + " movzbl %b1, %k1\n\t" 143 + "# ending __down_write_trylock\n\t" 144 + : "+m" (sem->count), "=&a" (result), "=&r" (tmp) 145 + : "er" (RWSEM_ACTIVE_WRITE_BIAS) 146 + : "memory", "cc"); 147 + return result; 134 148 } 135 149 136 150 /*
+16 -22
lib/rwsem-spinlock.c
··· 9 9 #include <linux/sched.h> 10 10 #include <linux/export.h> 11 11 12 + enum rwsem_waiter_type { 13 + RWSEM_WAITING_FOR_WRITE, 14 + RWSEM_WAITING_FOR_READ 15 + }; 16 + 12 17 struct rwsem_waiter { 13 18 struct list_head list; 14 19 struct task_struct *task; 15 - unsigned int flags; 16 - #define RWSEM_WAITING_FOR_READ 0x00000001 17 - #define RWSEM_WAITING_FOR_WRITE 0x00000002 20 + enum rwsem_waiter_type type; 18 21 }; 19 22 20 23 int rwsem_is_locked(struct rw_semaphore *sem) ··· 70 67 71 68 waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list); 72 69 73 - if (!wakewrite) { 74 - if (waiter->flags & RWSEM_WAITING_FOR_WRITE) 75 - goto out; 76 - goto dont_wake_writers; 77 - } 78 - 79 - /* 80 - * as we support write lock stealing, we can't set sem->activity 81 - * to -1 here to indicate we get the lock. Instead, we wake it up 82 - * to let it go get it again. 83 - */ 84 - if (waiter->flags & RWSEM_WAITING_FOR_WRITE) { 85 - wake_up_process(waiter->task); 70 + if (waiter->type == RWSEM_WAITING_FOR_WRITE) { 71 + if (wakewrite) 72 + /* Wake up a writer. Note that we do not grant it the 73 + * lock - it will have to acquire it when it runs. 
*/ 74 + wake_up_process(waiter->task); 86 75 goto out; 87 76 } 88 77 89 78 /* grant an infinite number of read locks to the front of the queue */ 90 - dont_wake_writers: 91 79 woken = 0; 92 - while (waiter->flags & RWSEM_WAITING_FOR_READ) { 80 + do { 93 81 struct list_head *next = waiter->list.next; 94 82 95 83 list_del(&waiter->list); ··· 90 96 wake_up_process(tsk); 91 97 put_task_struct(tsk); 92 98 woken++; 93 - if (list_empty(&sem->wait_list)) 99 + if (next == &sem->wait_list) 94 100 break; 95 101 waiter = list_entry(next, struct rwsem_waiter, list); 96 - } 102 + } while (waiter->type != RWSEM_WAITING_FOR_WRITE); 97 103 98 104 sem->activity += woken; 99 105 ··· 138 144 139 145 /* set up my own style of waitqueue */ 140 146 waiter.task = tsk; 141 - waiter.flags = RWSEM_WAITING_FOR_READ; 147 + waiter.type = RWSEM_WAITING_FOR_READ; 142 148 get_task_struct(tsk); 143 149 144 150 list_add_tail(&waiter.list, &sem->wait_list); ··· 195 201 /* set up my own style of waitqueue */ 196 202 tsk = current; 197 203 waiter.task = tsk; 198 - waiter.flags = RWSEM_WAITING_FOR_WRITE; 204 + waiter.type = RWSEM_WAITING_FOR_WRITE; 199 205 list_add_tail(&waiter.list, &sem->wait_list); 200 206 201 207 /* wait for someone to release the lock */
+116 -124
lib/rwsem.c
··· 4 4 * Derived from arch/i386/kernel/semaphore.c 5 5 * 6 6 * Writer lock-stealing by Alex Shi <alex.shi@intel.com> 7 + * and Michel Lespinasse <walken@google.com> 7 8 */ 8 9 #include <linux/rwsem.h> 9 10 #include <linux/sched.h> ··· 31 30 32 31 EXPORT_SYMBOL(__init_rwsem); 33 32 33 + enum rwsem_waiter_type { 34 + RWSEM_WAITING_FOR_WRITE, 35 + RWSEM_WAITING_FOR_READ 36 + }; 37 + 34 38 struct rwsem_waiter { 35 39 struct list_head list; 36 40 struct task_struct *task; 37 - unsigned int flags; 38 - #define RWSEM_WAITING_FOR_READ 0x00000001 39 - #define RWSEM_WAITING_FOR_WRITE 0x00000002 41 + enum rwsem_waiter_type type; 40 42 }; 41 43 42 - /* Wake types for __rwsem_do_wake(). Note that RWSEM_WAKE_NO_ACTIVE and 43 - * RWSEM_WAKE_READ_OWNED imply that the spinlock must have been kept held 44 - * since the rwsem value was observed. 45 - */ 46 - #define RWSEM_WAKE_ANY 0 /* Wake whatever's at head of wait list */ 47 - #define RWSEM_WAKE_NO_ACTIVE 1 /* rwsem was observed with no active thread */ 48 - #define RWSEM_WAKE_READ_OWNED 2 /* rwsem was observed to be read owned */ 44 + enum rwsem_wake_type { 45 + RWSEM_WAKE_ANY, /* Wake whatever's at head of wait list */ 46 + RWSEM_WAKE_READERS, /* Wake readers only */ 47 + RWSEM_WAKE_READ_OWNED /* Waker thread holds the read lock */ 48 + }; 49 49 50 50 /* 51 51 * handle the lock release when processes blocked on it that can now run ··· 59 57 * - writers are only woken if downgrading is false 60 58 */ 61 59 static struct rw_semaphore * 62 - __rwsem_do_wake(struct rw_semaphore *sem, int wake_type) 60 + __rwsem_do_wake(struct rw_semaphore *sem, enum rwsem_wake_type wake_type) 63 61 { 64 62 struct rwsem_waiter *waiter; 65 63 struct task_struct *tsk; 66 64 struct list_head *next; 67 - signed long woken, loop, adjustment; 65 + long oldcount, woken, loop, adjustment; 68 66 69 67 waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list); 70 - if (!(waiter->flags & RWSEM_WAITING_FOR_WRITE)) 71 - goto readers_only; 72 - 73 - if 
(wake_type == RWSEM_WAKE_READ_OWNED) 74 - /* Another active reader was observed, so wakeup is not 75 - * likely to succeed. Save the atomic op. 76 - */ 68 + if (waiter->type == RWSEM_WAITING_FOR_WRITE) { 69 + if (wake_type == RWSEM_WAKE_ANY) 70 + /* Wake writer at the front of the queue, but do not 71 + * grant it the lock yet as we want other writers 72 + * to be able to steal it. Readers, on the other hand, 73 + * will block as they will notice the queued writer. 74 + */ 75 + wake_up_process(waiter->task); 77 76 goto out; 77 + } 78 78 79 - /* Wake up the writing waiter and let the task grab the sem: */ 80 - wake_up_process(waiter->task); 81 - goto out; 82 - 83 - readers_only: 84 - /* If we come here from up_xxxx(), another thread might have reached 85 - * rwsem_down_failed_common() before we acquired the spinlock and 86 - * woken up a waiter, making it now active. We prefer to check for 87 - * this first in order to not spend too much time with the spinlock 88 - * held if we're not going to be able to wake up readers in the end. 89 - * 90 - * Note that we do not need to update the rwsem count: any writer 91 - * trying to acquire rwsem will run rwsem_down_write_failed() due 92 - * to the waiting threads and block trying to acquire the spinlock. 93 - * 94 - * We use a dummy atomic update in order to acquire the cache line 95 - * exclusively since we expect to succeed and run the final rwsem 96 - * count adjustment pretty soon. 79 + /* Writers might steal the lock before we grant it to the next reader. 80 + * We prefer to do the first reader grant before counting readers 81 + * so we can bail out early if a writer stole the lock. 
97 82 */ 98 - if (wake_type == RWSEM_WAKE_ANY && 99 - rwsem_atomic_update(0, sem) < RWSEM_WAITING_BIAS) 100 - /* Someone grabbed the sem for write already */ 101 - goto out; 83 + adjustment = 0; 84 + if (wake_type != RWSEM_WAKE_READ_OWNED) { 85 + adjustment = RWSEM_ACTIVE_READ_BIAS; 86 + try_reader_grant: 87 + oldcount = rwsem_atomic_update(adjustment, sem) - adjustment; 88 + if (unlikely(oldcount < RWSEM_WAITING_BIAS)) { 89 + /* A writer stole the lock. Undo our reader grant. */ 90 + if (rwsem_atomic_update(-adjustment, sem) & 91 + RWSEM_ACTIVE_MASK) 92 + goto out; 93 + /* Last active locker left. Retry waking readers. */ 94 + goto try_reader_grant; 95 + } 96 + } 102 97 103 98 /* Grant an infinite number of read locks to the readers at the front 104 99 * of the queue. Note we increment the 'active part' of the count by ··· 111 112 waiter = list_entry(waiter->list.next, 112 113 struct rwsem_waiter, list); 113 114 114 - } while (waiter->flags & RWSEM_WAITING_FOR_READ); 115 + } while (waiter->type != RWSEM_WAITING_FOR_WRITE); 115 116 116 - adjustment = woken * RWSEM_ACTIVE_READ_BIAS; 117 - if (waiter->flags & RWSEM_WAITING_FOR_READ) 117 + adjustment = woken * RWSEM_ACTIVE_READ_BIAS - adjustment; 118 + if (waiter->type != RWSEM_WAITING_FOR_WRITE) 118 119 /* hit end of list above */ 119 120 adjustment -= RWSEM_WAITING_BIAS; 120 121 121 - rwsem_atomic_add(adjustment, sem); 122 + if (adjustment) 123 + rwsem_atomic_add(adjustment, sem); 122 124 123 125 next = sem->wait_list.next; 124 - for (loop = woken; loop > 0; loop--) { 126 + loop = woken; 127 + do { 125 128 waiter = list_entry(next, struct rwsem_waiter, list); 126 129 next = waiter->list.next; 127 130 tsk = waiter->task; ··· 131 130 waiter->task = NULL; 132 131 wake_up_process(tsk); 133 132 put_task_struct(tsk); 134 - } 133 + } while (--loop); 135 134 136 135 sem->wait_list.next = next; 137 136 next->prev = &sem->wait_list; ··· 140 139 return sem; 141 140 } 142 141 143 - /* Try to get write sem, caller holds 
sem->wait_lock: */ 144 - static int try_get_writer_sem(struct rw_semaphore *sem, 145 - struct rwsem_waiter *waiter) 146 - { 147 - struct rwsem_waiter *fwaiter; 148 - long oldcount, adjustment; 149 - 150 - /* only steal when first waiter is writing */ 151 - fwaiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list); 152 - if (!(fwaiter->flags & RWSEM_WAITING_FOR_WRITE)) 153 - return 0; 154 - 155 - adjustment = RWSEM_ACTIVE_WRITE_BIAS; 156 - /* Only one waiter in the queue: */ 157 - if (fwaiter == waiter && waiter->list.next == &sem->wait_list) 158 - adjustment -= RWSEM_WAITING_BIAS; 159 - 160 - try_again_write: 161 - oldcount = rwsem_atomic_update(adjustment, sem) - adjustment; 162 - if (!(oldcount & RWSEM_ACTIVE_MASK)) { 163 - /* No active lock: */ 164 - struct task_struct *tsk = waiter->task; 165 - 166 - list_del(&waiter->list); 167 - smp_mb(); 168 - put_task_struct(tsk); 169 - tsk->state = TASK_RUNNING; 170 - return 1; 171 - } 172 - /* some one grabbed the sem already */ 173 - if (rwsem_atomic_update(-adjustment, sem) & RWSEM_ACTIVE_MASK) 174 - return 0; 175 - goto try_again_write; 176 - } 177 - 178 142 /* 179 - * wait for a lock to be granted 143 + * wait for the read lock to be granted 180 144 */ 181 - static struct rw_semaphore __sched * 182 - rwsem_down_failed_common(struct rw_semaphore *sem, 183 - unsigned int flags, signed long adjustment) 145 + struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem) 184 146 { 147 + long count, adjustment = -RWSEM_ACTIVE_READ_BIAS; 185 148 struct rwsem_waiter waiter; 186 149 struct task_struct *tsk = current; 187 - signed long count; 188 - 189 - set_task_state(tsk, TASK_UNINTERRUPTIBLE); 190 150 191 151 /* set up my own style of waitqueue */ 192 - raw_spin_lock_irq(&sem->wait_lock); 193 152 waiter.task = tsk; 194 - waiter.flags = flags; 153 + waiter.type = RWSEM_WAITING_FOR_READ; 195 154 get_task_struct(tsk); 196 155 156 + raw_spin_lock_irq(&sem->wait_lock); 197 157 if 
(list_empty(&sem->wait_list)) 198 158 adjustment += RWSEM_WAITING_BIAS; 199 159 list_add_tail(&waiter.list, &sem->wait_list); ··· 162 200 /* we're now waiting on the lock, but no longer actively locking */ 163 201 count = rwsem_atomic_update(adjustment, sem); 164 202 165 - /* If there are no active locks, wake the front queued process(es) up. 203 + /* If there are no active locks, wake the front queued process(es). 166 204 * 167 - * Alternatively, if we're called from a failed down_write(), there 168 - * were already threads queued before us and there are no active 169 - * writers, the lock must be read owned; so we try to wake any read 170 - * locks that were queued ahead of us. */ 171 - if (count == RWSEM_WAITING_BIAS) 172 - sem = __rwsem_do_wake(sem, RWSEM_WAKE_NO_ACTIVE); 173 - else if (count > RWSEM_WAITING_BIAS && 174 - adjustment == -RWSEM_ACTIVE_WRITE_BIAS) 175 - sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED); 205 + * If there are no writers and we are first in the queue, 206 + * wake our own waiter to join the existing active readers ! 
207 + */ 208 + if (count == RWSEM_WAITING_BIAS || 209 + (count > RWSEM_WAITING_BIAS && 210 + adjustment != -RWSEM_ACTIVE_READ_BIAS)) 211 + sem = __rwsem_do_wake(sem, RWSEM_WAKE_ANY); 176 212 177 213 raw_spin_unlock_irq(&sem->wait_lock); 178 214 179 215 /* wait to be given the lock */ 180 - for (;;) { 216 + while (true) { 217 + set_task_state(tsk, TASK_UNINTERRUPTIBLE); 181 218 if (!waiter.task) 182 219 break; 183 - 184 - raw_spin_lock_irq(&sem->wait_lock); 185 - /* Try to get the writer sem, may steal from the head writer: */ 186 - if (flags == RWSEM_WAITING_FOR_WRITE) 187 - if (try_get_writer_sem(sem, &waiter)) { 188 - raw_spin_unlock_irq(&sem->wait_lock); 189 - return sem; 190 - } 191 - raw_spin_unlock_irq(&sem->wait_lock); 192 220 schedule(); 193 - set_task_state(tsk, TASK_UNINTERRUPTIBLE); 194 221 } 195 222 196 223 tsk->state = TASK_RUNNING; ··· 188 237 } 189 238 190 239 /* 191 - * wait for the read lock to be granted 192 - */ 193 - struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem) 194 - { 195 - return rwsem_down_failed_common(sem, RWSEM_WAITING_FOR_READ, 196 - -RWSEM_ACTIVE_READ_BIAS); 197 - } 198 - 199 - /* 200 - * wait for the write lock to be granted 240 + * wait until we successfully acquire the write lock 201 241 */ 202 242 struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem) 203 243 { 204 - return rwsem_down_failed_common(sem, RWSEM_WAITING_FOR_WRITE, 205 - -RWSEM_ACTIVE_WRITE_BIAS); 244 + long count, adjustment = -RWSEM_ACTIVE_WRITE_BIAS; 245 + struct rwsem_waiter waiter; 246 + struct task_struct *tsk = current; 247 + 248 + /* set up my own style of waitqueue */ 249 + waiter.task = tsk; 250 + waiter.type = RWSEM_WAITING_FOR_WRITE; 251 + 252 + raw_spin_lock_irq(&sem->wait_lock); 253 + if (list_empty(&sem->wait_list)) 254 + adjustment += RWSEM_WAITING_BIAS; 255 + list_add_tail(&waiter.list, &sem->wait_list); 256 + 257 + /* we're now waiting on the lock, but no longer actively locking */ 258 + count = 
rwsem_atomic_update(adjustment, sem); 259 + 260 + /* If there were already threads queued before us and there are no 261 + * active writers, the lock must be read owned; so we try to wake 262 + * any read locks that were queued ahead of us. */ 263 + if (count > RWSEM_WAITING_BIAS && 264 + adjustment == -RWSEM_ACTIVE_WRITE_BIAS) 265 + sem = __rwsem_do_wake(sem, RWSEM_WAKE_READERS); 266 + 267 + /* wait until we successfully acquire the lock */ 268 + set_task_state(tsk, TASK_UNINTERRUPTIBLE); 269 + while (true) { 270 + if (!(count & RWSEM_ACTIVE_MASK)) { 271 + /* Try acquiring the write lock. */ 272 + count = RWSEM_ACTIVE_WRITE_BIAS; 273 + if (!list_is_singular(&sem->wait_list)) 274 + count += RWSEM_WAITING_BIAS; 275 + if (cmpxchg(&sem->count, RWSEM_WAITING_BIAS, count) == 276 + RWSEM_WAITING_BIAS) 277 + break; 278 + } 279 + 280 + raw_spin_unlock_irq(&sem->wait_lock); 281 + 282 + /* Block until there are no active lockers. */ 283 + do { 284 + schedule(); 285 + set_task_state(tsk, TASK_UNINTERRUPTIBLE); 286 + } while ((count = sem->count) & RWSEM_ACTIVE_MASK); 287 + 288 + raw_spin_lock_irq(&sem->wait_lock); 289 + } 290 + 291 + list_del(&waiter.list); 292 + raw_spin_unlock_irq(&sem->wait_lock); 293 + tsk->state = TASK_RUNNING; 294 + 295 + return sem; 206 296 } 207 297 208 298 /*