Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

net/rds: Get rid of "wait_clean_list_grace" and add locking

Waiting for activity on the "clean_list" to quiesce is no substitute
for proper locking.

We can have multiple threads competing for "llist_del_first"
via "rds_ib_reuse_mr", and a single thread competing
for "llist_del_all" and "llist_del_first" via "rds_ib_flush_mr_pool".

Since "llist_del_first" depends on "list->first->next" not to change
in the midst of the operation, simply waiting for all current calls
to "rds_ib_reuse_mr" to quiesce across all CPUs is woefully inadequate:

By the time "wait_clean_list_grace" is done iterating over all CPUs to see
that there is no concurrent caller to "rds_ib_reuse_mr", a new caller may
have just shown up on the first CPU.

Furthermore, <linux/llist.h> explicitly calls out the need for locking:
* Cases where locking is needed:
* If we have multiple consumers with llist_del_first used in one consumer,
* and llist_del_first or llist_del_all used in other consumers,
* then a lock is needed.

Also, while at it, drop the unused "pool" parameter
from "list_to_llist_nodes".

Signed-off-by: Gerd Rausch <gerd.rausch@oracle.com>
Acked-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
Signed-off-by: David S. Miller <davem@davemloft.net>

Authored by Gerd Rausch; committed by David S. Miller.
c9467447 2c7da8e6

+18 -37
+1
net/rds/ib_mr.h
···
 98   98    struct llist_head free_list;   /* unused MRs */
 99   99    struct llist_head clean_list;  /* unused & unmapped MRs */
100  100    wait_queue_head_t flush_wait;
     101  + spinlock_t clean_lock;         /* "clean_list" concurrency */
101  102
102  103    atomic_t free_pinned;          /* memory pinned by free MRs */
103  104    unsigned long max_items;
+17 -37
net/rds/ib_rdma.c
···
 40   40
 41   41    struct workqueue_struct *rds_ib_mr_wq;
 42   42
 43       - static DEFINE_PER_CPU(unsigned long, clean_list_grace);
 44       - #define CLEAN_LIST_BUSY_BIT 0
 45       -
 46   43    static struct rds_ib_device *rds_ib_get_device(__be32 ipaddr)
 47   44    {
 48   45            struct rds_ib_device *rds_ibdev;
···
192  195    {
193  196            struct rds_ib_mr *ibmr = NULL;
194  197            struct llist_node *ret;
195       -         unsigned long *flag;
     198  +         unsigned long flags;
196  199
197       -         preempt_disable();
198       -         flag = this_cpu_ptr(&clean_list_grace);
199       -         set_bit(CLEAN_LIST_BUSY_BIT, flag);
     200  +         spin_lock_irqsave(&pool->clean_lock, flags);
200  201            ret = llist_del_first(&pool->clean_list);
     202  +         spin_unlock_irqrestore(&pool->clean_lock, flags);
201  203            if (ret) {
202  204                    ibmr = llist_entry(ret, struct rds_ib_mr, llnode);
203  205                    if (pool->pool_type == RDS_IB_MR_8K_POOL)
···
205  209                    rds_ib_stats_inc(s_ib_rdma_mr_1m_reused);
206  210            }
207  211
208       -         clear_bit(CLEAN_LIST_BUSY_BIT, flag);
209       -         preempt_enable();
210  212            return ibmr;
211       - }
212       -
213       - static inline void wait_clean_list_grace(void)
214       - {
215       -         int cpu;
216       -         unsigned long *flag;
217       -
218       -         for_each_online_cpu(cpu) {
219       -                 flag = &per_cpu(clean_list_grace, cpu);
220       -                 while (test_bit(CLEAN_LIST_BUSY_BIT, flag))
221       -                         cpu_relax();
222       -         }
223  213    }
224  214
225  215    void rds_ib_sync_mr(void *trans_private, int direction)
···
306  324     * of clusters. Each cluster has linked llist nodes of
307  325     * MR_CLUSTER_SIZE mrs that are ready for reuse.
308  326     */
309       - static void list_to_llist_nodes(struct rds_ib_mr_pool *pool,
310       -                                 struct list_head *list,
     327  + static void list_to_llist_nodes(struct list_head *list,
311  328                                    struct llist_node **nodes_head,
312  329                                    struct llist_node **nodes_tail)
313  330    {
···
383  402             */
384  403            dirty_to_clean = llist_append_to_list(&pool->drop_list, &unmap_list);
385  404            dirty_to_clean += llist_append_to_list(&pool->free_list, &unmap_list);
386       -         if (free_all)
     405  +         if (free_all) {
     406  +                 unsigned long flags;
     407  +
     408  +                 spin_lock_irqsave(&pool->clean_lock, flags);
387  409                    llist_append_to_list(&pool->clean_list, &unmap_list);
     410  +                 spin_unlock_irqrestore(&pool->clean_lock, flags);
     411  +         }
388  412
389  413            free_goal = rds_ib_flush_goal(pool, free_all);
390  414
···
402  416            rds_ib_unreg_fmr(&unmap_list, &nfreed, &unpinned, free_goal);
403  417
404  418            if (!list_empty(&unmap_list)) {
405       -                 /* we have to make sure that none of the things we're about
406       -                  * to put on the clean list would race with other cpus trying
407       -                  * to pull items off. The llist would explode if we managed to
408       -                  * remove something from the clean list and then add it back again
409       -                  * while another CPU was spinning on that same item in llist_del_first.
410       -                  *
411       -                  * This is pretty unlikely, but just in case wait for an llist grace period
412       -                  * here before adding anything back into the clean list.
413       -                  */
414       -                 wait_clean_list_grace();
     419  +                 unsigned long flags;
415  420
416       -                 list_to_llist_nodes(pool, &unmap_list, &clean_nodes, &clean_tail);
     421  +                 list_to_llist_nodes(&unmap_list, &clean_nodes, &clean_tail);
417  422                    if (ibmr_ret) {
418  423                            *ibmr_ret = llist_entry(clean_nodes, struct rds_ib_mr, llnode);
419  424                            clean_nodes = clean_nodes->next;
420  425                    }
421  426                    /* more than one entry in llist nodes */
422       -                 if (clean_nodes)
     427  +                 if (clean_nodes) {
     428  +                         spin_lock_irqsave(&pool->clean_lock, flags);
423  429                            llist_add_batch(clean_nodes, clean_tail,
424  430                                            &pool->clean_list);
425       -
     431  +                         spin_unlock_irqrestore(&pool->clean_lock, flags);
     432  +                 }
426  433            }
427  434
428  435            atomic_sub(unpinned, &pool->free_pinned);
···
589  610            init_llist_head(&pool->free_list);
590  611            init_llist_head(&pool->drop_list);
591  612            init_llist_head(&pool->clean_list);
     613  +         spin_lock_init(&pool->clean_lock);
592  614            mutex_init(&pool->flush_lock);
593  615            init_waitqueue_head(&pool->flush_wait);
594  616            INIT_DELAYED_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);