Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

padata: Fix race on sequence number wrap

When padata_do_parallel() is called from multiple cpus for the same
padata instance, we can get object reordering on sequence number wrap
because testing for sequence number wrap and resetting the sequence
number must happen atomically but is implemented with two atomic
operations. This patch fixes this by converting the sequence number
from atomic_t to an unsigned int and protecting the access with a
spin_lock. As a side effect, we get rid of the sequence number wrap
handling because the sequence number wraps back to zero now without
the need to do anything.

Signed-off-by: Steffen Klassert <steffen.klassert@secunet.com>
Signed-off-by: Herbert Xu <herbert@gondor.apana.org.au>

authored by

Steffen Klassert and committed by
Herbert Xu
2dc9b5db 3047817b

+12 -32
+2 -4
include/linux/padata.h
··· 46 46 struct list_head list; 47 47 struct parallel_data *pd; 48 48 int cb_cpu; 49 - int seq_nr; 50 49 int info; 51 50 void (*parallel)(struct padata_priv *padata); 52 51 void (*serial)(struct padata_priv *padata); ··· 115 116 * @pinst: padata instance. 116 117 * @pqueue: percpu padata queues used for parallelization. 117 118 * @squeue: percpu padata queues used for serialuzation. 118 - * @seq_nr: The sequence number that will be attached to the next object. 119 119 * @reorder_objects: Number of objects waiting in the reorder queues. 120 120 * @refcnt: Number of objects holding a reference on this parallel_data. 121 121 * @max_seq_nr: Maximal used sequence number. ··· 127 129 struct padata_instance *pinst; 128 130 struct padata_parallel_queue __percpu *pqueue; 129 131 struct padata_serial_queue __percpu *squeue; 130 - atomic_t seq_nr; 131 132 atomic_t reorder_objects; 132 133 atomic_t refcnt; 133 - unsigned int max_seq_nr; 134 134 struct padata_cpumask cpumask; 135 135 spinlock_t lock ____cacheline_aligned; 136 + spinlock_t seq_lock; 137 + unsigned int seq_nr; 136 138 unsigned int processed; 137 139 struct timer_list timer; 138 140 };
+10 -28
kernel/padata.c
··· 29 29 #include <linux/sysfs.h> 30 30 #include <linux/rcupdate.h> 31 31 32 - #define MAX_SEQ_NR (INT_MAX - NR_CPUS) 33 32 #define MAX_OBJ_NUM 1000 34 33 35 34 static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index) ··· 42 43 return target_cpu; 43 44 } 44 45 45 - static int padata_cpu_hash(struct padata_priv *padata) 46 + static int padata_cpu_hash(struct parallel_data *pd) 46 47 { 47 48 int cpu_index; 48 - struct parallel_data *pd; 49 - 50 - pd = padata->pd; 51 49 52 50 /* 53 51 * Hash the sequence numbers to the cpus by taking 54 52 * seq_nr mod. number of cpus in use. 55 53 */ 56 - cpu_index = padata->seq_nr % cpumask_weight(pd->cpumask.pcpu); 54 + 55 + spin_lock(&pd->seq_lock); 56 + cpu_index = pd->seq_nr % cpumask_weight(pd->cpumask.pcpu); 57 + pd->seq_nr++; 58 + spin_unlock(&pd->seq_lock); 57 59 58 60 return padata_index_to_cpu(pd, cpu_index); 59 61 } ··· 132 132 padata->pd = pd; 133 133 padata->cb_cpu = cb_cpu; 134 134 135 - if (unlikely(atomic_read(&pd->seq_nr) == pd->max_seq_nr)) 136 - atomic_set(&pd->seq_nr, -1); 137 - 138 - padata->seq_nr = atomic_inc_return(&pd->seq_nr); 139 - 140 - target_cpu = padata_cpu_hash(padata); 135 + target_cpu = padata_cpu_hash(pd); 141 136 queue = per_cpu_ptr(pd->pqueue, target_cpu); 142 137 143 138 spin_lock(&queue->parallel.lock); ··· 168 173 static struct padata_priv *padata_get_next(struct parallel_data *pd) 169 174 { 170 175 int cpu, num_cpus; 171 - int next_nr, next_index; 176 + unsigned int next_nr, next_index; 172 177 struct padata_parallel_queue *queue, *next_queue; 173 178 struct padata_priv *padata; 174 179 struct padata_list *reorder; ··· 184 189 cpu = padata_index_to_cpu(pd, next_index); 185 190 next_queue = per_cpu_ptr(pd->pqueue, cpu); 186 191 187 - if (unlikely(next_nr > pd->max_seq_nr)) { 188 - next_nr = next_nr - pd->max_seq_nr - 1; 189 - next_index = next_nr % num_cpus; 190 - cpu = padata_index_to_cpu(pd, next_index); 191 - next_queue = per_cpu_ptr(pd->pqueue, cpu); 192 - pd->processed = 
0; 193 - } 194 - 195 192 padata = NULL; 196 193 197 194 reorder = &next_queue->reorder; ··· 191 204 if (!list_empty(&reorder->list)) { 192 205 padata = list_entry(reorder->list.next, 193 206 struct padata_priv, list); 194 - 195 - BUG_ON(next_nr != padata->seq_nr); 196 207 197 208 spin_lock(&reorder->lock); 198 209 list_del_init(&padata->list); ··· 387 402 /* Initialize all percpu queues used by parallel workers */ 388 403 static void padata_init_pqueues(struct parallel_data *pd) 389 404 { 390 - int cpu_index, num_cpus, cpu; 405 + int cpu_index, cpu; 391 406 struct padata_parallel_queue *pqueue; 392 407 393 408 cpu_index = 0; ··· 402 417 INIT_WORK(&pqueue->work, padata_parallel_worker); 403 418 atomic_set(&pqueue->num_obj, 0); 404 419 } 405 - 406 - num_cpus = cpumask_weight(pd->cpumask.pcpu); 407 - pd->max_seq_nr = num_cpus ? (MAX_SEQ_NR / num_cpus) * num_cpus - 1 : 0; 408 420 } 409 421 410 422 /* Allocate and initialize the internal cpumask dependend resources. */ ··· 428 446 padata_init_pqueues(pd); 429 447 padata_init_squeues(pd); 430 448 setup_timer(&pd->timer, padata_reorder_timer, (unsigned long)pd); 431 - atomic_set(&pd->seq_nr, -1); 449 + pd->seq_nr = 0; 432 450 atomic_set(&pd->reorder_objects, 0); 433 451 atomic_set(&pd->refcnt, 0); 434 452 pd->pinst = pinst;