this repo has no description
at fixPythonPipStalling 292 lines 5.9 kB view raw
1#include <limits.h> 2#include <stdlib.h> 3#include <stdio.h> 4#include <string.h> 5 6#if !defined(_WIN32) 7# include <sys/wait.h> 8# if !defined(NO_CONFIG_H) 9# include "config.h" 10# endif 11# include <semaphore.h> 12#else 13# define inline _inline 14# include "../../src/windows/platform.h" 15# include "posix_semaphore.h" 16#endif 17#include "../../src/private.h" 18 19#if HAVE_ERR_H 20# include <err.h> 21#else 22# define err(rc,msg,...) do { perror(msg); exit(rc); } while (0) 23# define errx(rc,msg,...) do { puts(msg); exit(rc); } while (0) 24#endif 25 26#include <pthread_workqueue.h> 27 28static int work_cnt; 29 30/* If non-zero, extra debugging statements will be printed */ 31static int dbg = 0; 32 33static sem_t test_complete; 34static int test_rounds; 35#undef dbg_puts 36#define dbg_puts(s) if (dbg) puts(s) 37#undef dbg_printf 38#define dbg_printf(fmt,...) if (dbg) fprintf(stderr, fmt, __VA_ARGS__) 39 40void additem(pthread_workqueue_t wq, void (*func)(void *), 41 void * arg) 42{ 43 44 int rv; 45 46 rv = pthread_workqueue_additem_np(wq, *func, arg, NULL, NULL); 47 if (rv != 0) 48 errx(1, "unable to add item: %s", strerror(rv)); 49 dbg_puts("added item\n"); 50} 51 52void 53mark_progress(void) 54{ 55 static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; 56 57 pthread_mutex_lock(&mtx); 58 test_rounds--; 59 pthread_mutex_unlock(&mtx); 60 61 dbg_printf("rounds = %d\n", test_rounds); 62 if (test_rounds == 0) { 63 sem_post(&test_complete); 64 } 65} 66 67void 68sem_up(void *arg) 69{ 70 dbg_puts("semaphore UP\n"); 71 sem_post((sem_t *) arg); 72 mark_progress(); 73} 74 75void 76sem_down(void *arg) 77{ 78 dbg_puts("semaphore DOWN\n"); 79 sem_wait((sem_t *) arg); 80 dbg_puts("semaphore UP\n"); 81 sem_post((sem_t *) arg); 82 mark_progress(); 83} 84 85void 86compute(void *arg) 87{ 88 int *count = (int *) arg; 89#define nval 5000 90 int val[nval]; 91 int i,j; 92 93 /* Do some useless computation */ 94 for (i = 0; i < nval; i++) { 95 val[i] = INT_MAX; 96 } 97 for (j = 0; j < nval; j++) { 98 for (i = 0; i < nval; i++) { 99 val[i] /= 3; 100 val[i] *= 2; 101 val[i] /= 4; 102 val[i] *= 5; 103 } 104 } 105 106 if (count != NULL) 107 (*count)--; 108} 109 110 111void 112sleepy(void *msg) 113{ 114 printf("%s\n", (char *) msg); 115 if (strcmp(msg, "done") == 0) 116 exit(0); 117 sleep(random() % 6); 118} 119 120void 121lazy(void *arg) 122{ 123 sleep(3); 124 dbg_printf("item %lu complete\n", (unsigned long) arg); 125 work_cnt--; 126} 127 128void 129run_blocking_test(pthread_workqueue_t wq, int rounds) 130{ 131 long i = 0; 132 work_cnt = rounds; 133 for (i = 0; i < rounds; i++) { 134 additem(wq, lazy, (void *) i); 135 } 136 while (work_cnt > 0) 137 sleep(1); 138} 139 140void 141run_cond_wait_test(pthread_workqueue_t wq) 142{ 143 const int rounds = 10; 144 long i = 0; 145 146 sleep(3); /* Allow time for the workers to enter pthread_cond_wait() */ 147 work_cnt = rounds; 148 for (i = 0; i < rounds; i++) { 149 additem(wq, lazy, (void *) i); 150 sleep(1); 151 } 152 while (work_cnt > 0) 153 sleep(1); 154} 155 156void 157run_load_test(pthread_workqueue_t wq) 158{ 159 char buf[16]; 160 int i = 0; 161 for (i = 0; i < 1024; i++) { 162 sprintf(buf, "%d", i); 163 additem(wq, sleepy, strdup(buf)); 164 additem(wq, compute, NULL); 165 } 166 additem(wq, sleepy, "done"); 167} 168 169/* Try to overwhelm the CPU with computation requests */ 170void 171run_stress_test(pthread_workqueue_t wq, int rounds) 172{ 173 int i = 0; 174 work_cnt = rounds; 175 for (i = 0; i < rounds; i++) { 176 additem(wq, compute, &work_cnt); 177 } 178 while (work_cnt > 0) 179 sleep(1); 180} 181 182/* 183 * Ensure that the library is reinitialized after fork(2) is called. 184 */ 185void 186run_fork_test(pthread_workqueue_t wq) 187{ 188#if !defined(_WIN32) 189 pid_t pid; 190 int rv, status, timeout; 191 192 puts("fork test... "); 193 pid = fork(); 194 if (pid < 0) 195 err(1, "fork"); 196 197 if (pid == 0) { 198 /* Child */ 199 wq = NULL; 200 rv = pthread_workqueue_create_np(&wq, NULL); 201 if (rv < 0) 202 errx(1, "pthread_workqueue_create_np"); 203 work_cnt = 1; 204 timeout = 5; 205 additem(wq, compute, &work_cnt); 206 while (work_cnt > 0) { 207 sleep(1); 208 if (--timeout == 0) 209 errx(1, "work was not completed"); 210 } 211 exit(0); 212 } else { 213 /* Parent */ 214 if (wait(&status) != pid) 215 err(1, "waitpid"); 216 if (WEXITSTATUS(status) != 0) 217 errx(1, "fork test failed"); 218 puts("ok\n"); 219 } 220#else 221 puts("fork test... N/A\n"); 222#endif 223} 224 225void 226run_overcommit_test(pthread_workqueue_t wq) 227{ 228 sem_t sem; 229 pthread_workqueue_t ocwq; 230 pthread_workqueue_attr_t attr; 231 int i, rv; 232 233 (void)wq; 234 sem_init(&sem, 0, 0); 235 236 printf("pthread_workqueue_create_np() - overcommit enabled "); 237 pthread_workqueue_attr_init_np(&attr); 238 pthread_workqueue_attr_setovercommit_np(&attr, 1); 239 rv = pthread_workqueue_create_np(&ocwq, &attr); 240 if (rv != 0) 241 err(1, "failed"); 242 243 puts("ok\n"); 244 245 printf("stress test - overcommit enabled "); 246 run_stress_test(ocwq, 25); 247 puts("ok\n"); 248 249 /* FIXME: should use a multiple of the number of CPUs instead of magic number */ 250 printf("deadlock test - overcommit enabled "); 251 test_rounds = 41; 252 for (i = 0; i < 40; i++) { 253 additem(ocwq, sem_down, &sem); 254 } 255 additem(ocwq, sem_up, &sem); 256 sem_wait(&test_complete); 257 puts("ok\n"); 258} 259 260int main() { 261 pthread_workqueue_t wq; 262 int rv; 263 264#ifdef MAKE_STATIC 265 pthread_workqueue_init_np(); 266#endif 267 268 sem_init(&test_complete, 0, 0); 269 270 run_overcommit_test(NULL); 271 272 printf("pthread_workqueue_create_np().. "); 273 rv = pthread_workqueue_create_np(&wq, NULL); 274 if (rv != 0) 275 err(1, "failed"); 276 printf("ok\n"); 277 278 printf("stress test.. "); 279 run_stress_test(wq, 25); 280 printf("ok\n"); 281 282 run_fork_test(wq); 283 284 //run_deadlock_test(); 285// run_cond_wait_test(); 286// run_blocking_test(); 287 //run_load_test(); 288 289 290 puts("All tests completed.\n"); 291 exit(0); 292}