this repo has no description
1#include <limits.h>
2#include <stdlib.h>
3#include <stdio.h>
4#include <string.h>
5
6#if !defined(_WIN32)
7# include <sys/wait.h>
8# if !defined(NO_CONFIG_H)
9# include "config.h"
10# endif
11# include <semaphore.h>
12#else
13# define inline _inline
14# include "../../src/windows/platform.h"
15# include "posix_semaphore.h"
16#endif
17#include "../../src/private.h"
18
#if HAVE_ERR_H
# include <err.h>
#else
/*
 * Minimal stand-ins for err(3)/errx(3) on platforms without <err.h>.
 * Both print a diagnostic and terminate the process with status `rc`.
 */

/* err(): print `msg` followed by the description of the current errno. */
# define err(rc,msg,...) do { perror(msg); exit(rc); } while (0)

/*
 * errx(): print a printf-style formatted message on stderr.
 * The format string is the first variadic argument so that any further
 * arguments are actually formatted instead of being silently dropped.
 */
# define errx(rc,...) do { \
    fprintf(stderr, __VA_ARGS__); \
    fputc('\n', stderr); \
    exit(rc); \
} while (0)
#endif
25
26#include <pthread_workqueue.h>
27
/*
 * Number of queued work items that have not finished yet. The test drivers
 * set it before queueing work and poll it until it drops to zero.
 */
static int work_cnt;

/* If non-zero, extra debugging statements will be printed */
static int dbg = 0;

/* Posted by mark_progress() once test_rounds reaches zero. */
static sem_t test_complete;
/* Work items the deadlock test is still waiting for. */
static int test_rounds;

/*
 * Debug output helpers. They are wrapped in do { } while (0) so that each
 * expands to exactly one statement and cannot capture a following `else`.
 */
#undef dbg_puts
#define dbg_puts(s) do { if (dbg) puts(s); } while (0)
#undef dbg_printf
#define dbg_printf(fmt,...) do { if (dbg) fprintf(stderr, fmt, __VA_ARGS__); } while (0)
39
40void additem(pthread_workqueue_t wq, void (*func)(void *),
41 void * arg)
42{
43
44 int rv;
45
46 rv = pthread_workqueue_additem_np(wq, *func, arg, NULL, NULL);
47 if (rv != 0)
48 errx(1, "unable to add item: %s", strerror(rv));
49 dbg_puts("added item\n");
50}
51
52void
53mark_progress(void)
54{
55 static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
56
57 pthread_mutex_lock(&mtx);
58 test_rounds--;
59 pthread_mutex_unlock(&mtx);
60
61 dbg_printf("rounds = %d\n", test_rounds);
62 if (test_rounds == 0) {
63 sem_post(&test_complete);
64 }
65}
66
/*
 * Work item: post the semaphore passed in `arg`, then report the item as
 * finished via mark_progress().
 */
void
sem_up(void *arg)
{
    sem_t *target = arg;

    dbg_puts("semaphore UP\n");
    sem_post(target);

    mark_progress();
}
74
/*
 * Work item: wait on the semaphore passed in `arg`, post it again so the
 * next waiter can proceed, then report the item as finished via
 * mark_progress().
 */
void
sem_down(void *arg)
{
    sem_t *target = arg;

    dbg_puts("semaphore DOWN\n");
    sem_wait(target);

    dbg_puts("semaphore UP\n");
    sem_post(target);

    mark_progress();
}
84
/*
 * Work item: burn CPU time with throw-away integer arithmetic, then
 * decrement the counter that `arg` points to.
 *
 * arg: pointer to an int counter, or NULL when no counter should be updated.
 */
void
compute(void *arg)
{
    enum { NVAL = 5000 };
    /* Serializes the decrement; the counter is shared by all workers. */
    static pthread_mutex_t count_mtx = PTHREAD_MUTEX_INITIALIZER;
    int *count = arg;
    int val[NVAL];
    int i, j;

    /* Do some useless computation */
    for (i = 0; i < NVAL; i++) {
        val[i] = INT_MAX;
    }
    for (j = 0; j < NVAL; j++) {
        for (i = 0; i < NVAL; i++) {
            val[i] /= 3;
            val[i] *= 2;
            val[i] /= 4;
            val[i] *= 5;
        }
    }

    if (count != NULL) {
        /*
         * A plain `(*count)--` is a read-modify-write; when several workers
         * run it concurrently, decrements can be lost and the caller that
         * polls the counter would never see it reach zero.
         */
        pthread_mutex_lock(&count_mtx);
        (*count)--;
        pthread_mutex_unlock(&count_mtx);
    }
}
109
110
/*
 * Work item: print the string passed in `msg`. The string "done" ends the
 * whole process with status 0; any other string is followed by a sleep of
 * random() % 6 seconds.
 */
void
sleepy(void *msg)
{
    const char *text = msg;

    printf("%s\n", text);

    if (strcmp(text, "done") != 0) {
        sleep(random() % 6);
        return;
    }

    exit(0);
}
119
120void
121lazy(void *arg)
122{
123 sleep(3);
124 dbg_printf("item %lu complete\n", (unsigned long) arg);
125 work_cnt--;
126}
127
128void
129run_blocking_test(pthread_workqueue_t wq, int rounds)
130{
131 long i = 0;
132 work_cnt = rounds;
133 for (i = 0; i < rounds; i++) {
134 additem(wq, lazy, (void *) i);
135 }
136 while (work_cnt > 0)
137 sleep(1);
138}
139
140void
141run_cond_wait_test(pthread_workqueue_t wq)
142{
143 const int rounds = 10;
144 long i = 0;
145
146 sleep(3); /* Allow time for the workers to enter pthread_cond_wait() */
147 work_cnt = rounds;
148 for (i = 0; i < rounds; i++) {
149 additem(wq, lazy, (void *) i);
150 sleep(1);
151 }
152 while (work_cnt > 0)
153 sleep(1);
154}
155
156void
157run_load_test(pthread_workqueue_t wq)
158{
159 char buf[16];
160 int i = 0;
161 for (i = 0; i < 1024; i++) {
162 sprintf(buf, "%d", i);
163 additem(wq, sleepy, strdup(buf));
164 additem(wq, compute, NULL);
165 }
166 additem(wq, sleepy, "done");
167}
168
169/* Try to overwhelm the CPU with computation requests */
170void
171run_stress_test(pthread_workqueue_t wq, int rounds)
172{
173 int i = 0;
174 work_cnt = rounds;
175 for (i = 0; i < rounds; i++) {
176 additem(wq, compute, &work_cnt);
177 }
178 while (work_cnt > 0)
179 sleep(1);
180}
181
182/*
183 * Ensure that the library is reinitialized after fork(2) is called.
184 */
185void
186run_fork_test(pthread_workqueue_t wq)
187{
188#if !defined(_WIN32)
189 pid_t pid;
190 int rv, status, timeout;
191
192 puts("fork test... ");
193 pid = fork();
194 if (pid < 0)
195 err(1, "fork");
196
197 if (pid == 0) {
198 /* Child */
199 wq = NULL;
200 rv = pthread_workqueue_create_np(&wq, NULL);
201 if (rv < 0)
202 errx(1, "pthread_workqueue_create_np");
203 work_cnt = 1;
204 timeout = 5;
205 additem(wq, compute, &work_cnt);
206 while (work_cnt > 0) {
207 sleep(1);
208 if (--timeout == 0)
209 errx(1, "work was not completed");
210 }
211 exit(0);
212 } else {
213 /* Parent */
214 if (wait(&status) != pid)
215 err(1, "waitpid");
216 if (WEXITSTATUS(status) != 0)
217 errx(1, "fork test failed");
218 puts("ok\n");
219 }
220#else
221 puts("fork test... N/A\n");
222#endif
223}
224
225void
226run_overcommit_test(pthread_workqueue_t wq)
227{
228 sem_t sem;
229 pthread_workqueue_t ocwq;
230 pthread_workqueue_attr_t attr;
231 int i, rv;
232
233 (void)wq;
234 sem_init(&sem, 0, 0);
235
236 printf("pthread_workqueue_create_np() - overcommit enabled ");
237 pthread_workqueue_attr_init_np(&attr);
238 pthread_workqueue_attr_setovercommit_np(&attr, 1);
239 rv = pthread_workqueue_create_np(&ocwq, &attr);
240 if (rv != 0)
241 err(1, "failed");
242
243 puts("ok\n");
244
245 printf("stress test - overcommit enabled ");
246 run_stress_test(ocwq, 25);
247 puts("ok\n");
248
249 /* FIXME: should use a multiple of the number of CPUs instead of magic number */
250 printf("deadlock test - overcommit enabled ");
251 test_rounds = 41;
252 for (i = 0; i < 40; i++) {
253 additem(ocwq, sem_down, &sem);
254 }
255 additem(ocwq, sem_up, &sem);
256 sem_wait(&test_complete);
257 puts("ok\n");
258}
259
260int main() {
261 pthread_workqueue_t wq;
262 int rv;
263
264#ifdef MAKE_STATIC
265 pthread_workqueue_init_np();
266#endif
267
268 sem_init(&test_complete, 0, 0);
269
270 run_overcommit_test(NULL);
271
272 printf("pthread_workqueue_create_np().. ");
273 rv = pthread_workqueue_create_np(&wq, NULL);
274 if (rv != 0)
275 err(1, "failed");
276 printf("ok\n");
277
278 printf("stress test.. ");
279 run_stress_test(wq, 25);
280 printf("ok\n");
281
282 run_fork_test(wq);
283
284 //run_deadlock_test();
285// run_cond_wait_test();
286// run_blocking_test();
287 //run_load_test();
288
289
290 puts("All tests completed.\n");
291 exit(0);
292}