/* pthread_workqueue latency benchmark */
1/*
2 * Copyright (c) 2011 Joakim Johansson <jocke@tbricks.com>.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21#include <stdio.h>
22#include <stdlib.h>
23#include <ctype.h>
24#include <string.h>
25#include <errno.h>
26#include <time.h>
27
28#ifndef _WIN32
29# include <unistd.h>
30# include <pthread.h>
31# include <sys/time.h>
32#endif
33
34#include "latency.h"
35
// One target workqueue per priority level; events are dispatched onto these
// and their queueing latency is recorded in the matching statistics slot.
pthread_workqueue_t workqueues[WORKQUEUE_COUNT];
struct wq_statistics workqueue_statistics[WORKQUEUE_COUNT];
// Generator-side workqueues that emit the simulated event bursts.
struct wq_event_generator workqueue_generator[GENERATOR_WORKQUEUE_COUNT];

// Aggregate of all per-queue statistics; filled in by _gather_statistics().
struct wq_statistics global_statistics;
unsigned int global_stats_used = 0;   // number of queues that contributed to the aggregate

// Used by the generator thread to block until a whole tick has been consumed.
pthread_mutex_t generator_mutex;
pthread_cond_t generator_condition;
static unsigned int events_processed;   // outstanding events this tick, decremented by workers

#define PERCENTILE_COUNT 8
double percentiles[PERCENTILE_COUNT] = {50.0, 80.0, 98.0, 99.0, 99.5, 99.8, 99.9, 99.99};
mytime_t real_start, real_end;   // wall-clock bounds of the whole benchmark run
50
51#ifdef __APPLE__
52
53#include <assert.h>
54#include <CoreServices/CoreServices.h>
55#include <mach/mach.h>
56#include <mach/mach_time.h>
57
58static mach_timebase_info_data_t sTimebaseInfo;
59
60// From http://developer.apple.com/library/mac/#qa/qa2004/qa1398.html
61unsigned long gettime(void)
62{
63 return (mach_absolute_time() * sTimebaseInfo.numer / sTimebaseInfo.denom);
64}
65
66#else
67
68static mytime_t gettime(void)
69{
70#ifdef __linux__
71 struct timespec ts;
72 if (clock_gettime(CLOCK_MONOTONIC, &ts) != 0)
73 fprintf(stderr, "Failed to get high resolution clock! errno = %d\n", errno);
74 return ((ts.tv_sec * NANOSECONDS_PER_SECOND) + ts.tv_nsec);
75#elif defined(_WIN32)
76 LARGE_INTEGER now;
77 LARGE_INTEGER freq;
78 if (!QueryPerformanceCounter(&now) )
79 fprintf(stderr, "Failed to get performance counter!\n");
80 if (!QueryPerformanceFrequency(&freq) )
81 fprintf(stderr, "Failed to get performance frequency!\n");
82
83 return (mytime_t)(now.QuadPart * NANOSECONDS_PER_SECOND / freq.QuadPart);
84#else
85 struct timespec ts;
86 if (clock_gettime(CLOCK_HIGHRES, &ts) != 0)
87 fprintf(stderr, "Failed to get high resolution clock! errno = %d\n", errno);
88 return ((ts.tv_sec * NANOSECONDS_PER_SECOND) + ts.tv_nsec);
89#endif
90}
91
92#endif
93
94#ifdef _WIN32
95
96static void my_sleep(unsigned long nanoseconds) {
97 LARGE_INTEGER start, end;
98 LARGE_INTEGER freq;
99
100 QueryPerformanceCounter(&start);
101 QueryPerformanceFrequency(&freq);
102
103 // sleep with ms resolution ...
104 Sleep(nanoseconds / 1000000);
105
106 // ... and busy-wait afterwards, until the requested delay was reached
107 QueryPerformanceCounter(&end);
108 while( (end.QuadPart - start.QuadPart) * NANOSECONDS_PER_SECOND / freq.QuadPart < nanoseconds ){
109 YieldProcessor();
110 QueryPerformanceCounter(&end);
111 }
112
113}
114
115#else
116
117// real resolution on solaris is at best system clock tick, i.e. 100Hz unless having the
118// high res system clock (1000Hz in that case)
119
120static void my_sleep(unsigned long nanoseconds)
121{
122 struct timespec timeout0;
123 struct timespec timeout1;
124 struct timespec* tmp;
125 struct timespec* t0 = &timeout0;
126 struct timespec* t1 = &timeout1;
127
128 t0->tv_sec = nanoseconds / NANOSECONDS_PER_SECOND;
129 t0->tv_nsec = nanoseconds % NANOSECONDS_PER_SECOND;
130
131 while ((nanosleep(t0, t1) == (-1)) && (errno == EINTR))
132 {
133 tmp = t0;
134 t0 = t1;
135 t1 = tmp;
136 }
137
138 return;
139}
140
141#endif
142
143static void _process_data(void* context)
144{
145 struct wq_event *event = (struct wq_event *) context;
146 mytime_t elapsed_time;
147
148 elapsed_time = gettime() - event->start_time;
149
150 workqueue_statistics[event->queue_index].avg = ((workqueue_statistics[event->queue_index].count * workqueue_statistics[event->queue_index].avg) + elapsed_time) / (workqueue_statistics[event->queue_index].count + 1);
151 workqueue_statistics[event->queue_index].total += elapsed_time;
152 workqueue_statistics[event->queue_index].count += 1;
153
154 if (elapsed_time < workqueue_statistics[event->queue_index].min ||
155 workqueue_statistics[event->queue_index].min == 0)
156 workqueue_statistics[event->queue_index].min = elapsed_time;
157
158 if (elapsed_time > workqueue_statistics[event->queue_index].max)
159 workqueue_statistics[event->queue_index].max = elapsed_time;
160
161 if ((elapsed_time / 1000) < DISTRIBUTION_BUCKETS)
162 workqueue_statistics[event->queue_index].distribution[(int)(elapsed_time / 1000)] += 1;
163 else
164 workqueue_statistics[event->queue_index].distribution[DISTRIBUTION_BUCKETS-1] += 1;
165
166 // allow generator thread to continue when all events have been processed
167 if (atomic_dec_nv(&events_processed) == 0)
168 {
169 pthread_mutex_lock(&generator_mutex);
170 pthread_cond_signal(&generator_condition);
171 pthread_mutex_unlock(&generator_mutex);
172 }
173 return;
174}
175
176// Perform a small microburst for this tick
177static void _event_tick(void* context)
178{
179 struct wq_event *current_event;
180 long i, generator_workqueue = (long) context;
181
182 for (i = 0; i < EVENTS_GENERATED_PER_TICK; i++)
183 {
184 current_event = &workqueue_generator[generator_workqueue].wq_events[i];
185 current_event->start_time = gettime();
186 current_event->queue_index = (current_event->start_time % WORKQUEUE_COUNT);
187
188 (void) pthread_workqueue_additem_np(workqueues[current_event->queue_index], _process_data, current_event, NULL, NULL);
189 }
190
191 return;
192}
193
// Drives the benchmark: for TOTAL_TICKS_TO_RUN ticks, paces the event rate
// (busy-wait or nanosleep, depending on the requested frequency vs. the
// system clock resolution), fires a burst of events on every generator
// workqueue and blocks until _process_data() has consumed all of them.
static void _generate_simulated_events()
{
    unsigned long i = 0, tick;
    mytime_t overhead;
    mytime_t start, current, overhead_start = 0, overhead_end = 0;

    start = current = gettime();

    for (tick = 0; tick < TOTAL_TICKS_TO_RUN; tick++)
    {
        // anchor this lap at the end of the previous tick's processing, and
        // compute how long that processing took
        start = current = overhead_end;
        overhead = overhead_end - overhead_start;

        // wait until we have waited proper amount of time for current rate
        // we should remove overhead of previous lap to not lag behind in data rate
        // one call to gethrtime() alone is around 211ns on Nehalem 2.93
        // use busy waiting in case the frequency is higher than the supported resolution of nanosleep()

        if (overhead > EVENT_TIME_SLICE)
        {
            printf("Warning: Event processing overhead > event time slice, readjust test parameters.\n");
        }
        else
        if ((EVENT_GENERATION_FREQUENCY > SYSTEM_CLOCK_RESOLUTION) || FORCE_BUSY_LOOP)
        {
            // spin until the remainder of the slice has elapsed
            while ((current - start) < (EVENT_TIME_SLICE - overhead))
                current = gettime();
        }
        else
        {
            my_sleep(EVENT_TIME_SLICE - overhead);
        }

        overhead_start = gettime();

        events_processed = GENERATOR_WORKQUEUE_COUNT * EVENTS_GENERATED_PER_TICK; // number of items that will be processed

#if (LATENCY_RUN_GENERATOR_IN_MAIN_THREAD == 0)
        // fan the burst out to the generator workqueues...
        for (i = 0; i < GENERATOR_WORKQUEUE_COUNT; i++)
            (void) pthread_workqueue_additem_np(workqueue_generator[i].wq, _event_tick, (void *) i, NULL, NULL);
#else
        // ...or run a single burst inline on this thread
        _event_tick((void *)i);
#endif

        // wait for all events to be processed
        pthread_mutex_lock(&generator_mutex);
        while (events_processed > 0)
            pthread_cond_wait(&generator_condition, &generator_mutex);
        pthread_mutex_unlock(&generator_mutex);

        overhead_end = gettime();
    }

    return;
}
249
250static void _gather_statistics(unsigned long queue_index)
251{
252 unsigned long i;
253
254 if (workqueue_statistics[queue_index].count > 0)
255 {
256 global_stats_used ++;
257
258 global_statistics.avg = ((global_statistics.count * global_statistics.avg) + (workqueue_statistics[queue_index].avg * workqueue_statistics[queue_index].count)) / (global_statistics.count + workqueue_statistics[queue_index].count);
259 global_statistics.total += workqueue_statistics[queue_index].total;
260 global_statistics.count += workqueue_statistics[queue_index].count;
261
262 if (workqueue_statistics[queue_index].min < global_statistics.min || global_statistics.min == 0)
263 global_statistics.min = workqueue_statistics[queue_index].min;
264
265 if (workqueue_statistics[queue_index].max > global_statistics.max)
266 global_statistics.max = workqueue_statistics[queue_index].max;
267
268 for (i = 0; i < DISTRIBUTION_BUCKETS; i++)
269 global_statistics.distribution[i] += workqueue_statistics[queue_index].distribution[i];
270 }
271
272 return;
273}
274
275void _print_statistics()
276{
277 unsigned long i, j, total_events = 0, last_percentile = 0, accumulated_percentile = 0;
278
279 printf("Collecting statistics...\n");
280
281 for (i = 0; i < WORKQUEUE_COUNT; i++)
282 _gather_statistics(i);
283
284 printf("Test is done, run time was %.3f seconds, %.1fM events generated and processed.\n", (double)((double)(real_end - real_start) / (double) NANOSECONDS_PER_SECOND), total_events/1000000.0);
285
286 //FIXME - casting from mytime_t (u_long) to int will truncate the result
287 printf("Global dispatch queue aggregate statistics for %d queues: %dM events, min = %d ns, avg = %.1f ns, max = %d ns\n",
288 global_stats_used, global_statistics.count/1000000, (int) global_statistics.min, global_statistics.avg, (int) global_statistics.max);
289
290 printf("\nDistribution:\n");
291 for (i = 0; i < DISTRIBUTION_BUCKETS; i++)
292 {
293 printf("%3ld us: %d ", i, global_statistics.distribution[i]);
294 for (j=0; j<(((double) global_statistics.distribution[i] / (double) global_statistics.count) * 400.0); j++)
295 printf("*");
296 printf("\n");
297 }
298
299 printf("\nPercentiles:\n");
300
301 for (i = 0; i < DISTRIBUTION_BUCKETS; i++)
302 {
303 while ((last_percentile < PERCENTILE_COUNT) && ((100.0 * ((double) accumulated_percentile / (double) global_statistics.count)) > percentiles[last_percentile]))
304 {
305 printf("%.2f < %ld us\n", percentiles[last_percentile], i-1);
306 last_percentile++;
307 }
308 accumulated_percentile += global_statistics.distribution[i];
309 }
310
311 while ((last_percentile < PERCENTILE_COUNT) && ((100.0 * ((double) accumulated_percentile / (double) global_statistics.count)) > percentiles[last_percentile]))
312 {
313 printf("%.2f > %d us\n", percentiles[last_percentile], DISTRIBUTION_BUCKETS-1);
314 last_percentile++;
315 }
316
317 return;
318}
319
// Entry point: sets up the generator and target workqueues, runs the
// simulated event load and prints the gathered latency statistics.
int main(void)
{
    int i;
    pthread_workqueue_attr_t attr;

#ifdef __APPLE__
    // needed once so gettime() can convert mach ticks to nanoseconds
    (void) mach_timebase_info(&sTimebaseInfo);
#endif

#ifdef MAKE_STATIC
    pthread_workqueue_init_np();
#endif

    // zero all shared state before any workqueue thread can touch it
    memset(&workqueues, 0, sizeof(workqueues));
    memset(&workqueue_statistics, 0, sizeof(workqueue_statistics));
    memset(&global_statistics, 0, sizeof(global_statistics));
    memset(&workqueue_generator, 0, sizeof(workqueue_generator));

    pthread_mutex_init(&generator_mutex, NULL);
    pthread_cond_init(&generator_condition, NULL);

    if (pthread_workqueue_attr_init_np(&attr) != 0)
        fprintf(stderr, "Failed to set workqueue attributes\n");

    // generator workqueues: created with overcommit enabled, each with a
    // pre-allocated event array for its per-tick burst
    for (i = 0; i < GENERATOR_WORKQUEUE_COUNT; i++)
    {
        if (pthread_workqueue_attr_setqueuepriority_np(&attr, i) != 0)
            fprintf(stderr, "Failed to set workqueue priority\n");

        if (pthread_workqueue_attr_setovercommit_np(&attr, 1) != 0)
            fprintf(stderr, "Failed to set workqueue overcommit\n");

        // NOTE(review): malloc result is not checked before the memset —
        // assumed to succeed in this benchmark setup
        workqueue_generator[i].wq_events = malloc(sizeof(struct wq_event) * EVENTS_GENERATED_PER_TICK);
        memset(workqueue_generator[i].wq_events, 0, (sizeof(struct wq_event) * EVENTS_GENERATED_PER_TICK));

        if (pthread_workqueue_create_np(&workqueue_generator[i].wq, &attr) != 0)
            fprintf(stderr, "Failed to create workqueue\n");
    }

    // target workqueues, one per priority level; attr is re-initialized so
    // the overcommit flag set in the generator loop does not carry over
    for (i = 0; i < WORKQUEUE_COUNT; i++)
    {
        if (pthread_workqueue_attr_init_np(&attr) != 0)
            fprintf(stderr, "Failed to set workqueue attributes\n");

        if (pthread_workqueue_attr_setqueuepriority_np(&attr, i) != 0)
            fprintf(stderr, "Failed to set workqueue priority\n");

        if (pthread_workqueue_create_np(&workqueues[i], &attr) != 0)
            fprintf(stderr, "Failed to create workqueue\n");
    }

    if (SLEEP_BEFORE_START > 0)
    {
        printf("Sleeping for %d seconds to allow for processor set configuration...\n",SLEEP_BEFORE_START);
        sleep(SLEEP_BEFORE_START);
    }

    printf("%d workqueues, running for %d seconds at %d Hz, %d events per tick.\n",WORKQUEUE_COUNT, SECONDS_TO_RUN, EVENT_GENERATION_FREQUENCY, EVENTS_GENERATED_PER_TICK);

    printf("Running %d generator threads at %dK events/s, the aggregated data rate is %dK events/s. %.2f MB is used for %.2fK events.\n",
           GENERATOR_WORKQUEUE_COUNT,AGGREGATE_DATA_RATE_PER_SECOND/1000, TOTAL_DATA_PER_SECOND/1000,
           (double) GENERATOR_WORKQUEUE_COUNT * ((sizeof(struct wq_event) * EVENTS_GENERATED_PER_TICK + sizeof(workqueues))/(1024.0*1024.0)),
           GENERATOR_WORKQUEUE_COUNT * EVENTS_GENERATED_PER_TICK/1000.0);

    // time the whole benchmark run with the same clock used for latencies
    real_start = gettime();

    _generate_simulated_events();

    real_end = gettime();

    _print_statistics();

    return 0;
}