/* (scraped-page residue, preserved as a comment:
 * "this repo has no description"
 * "at fixPythonPipStalling 393 lines 14 kB view raw") */
1/* 2 * Copyright (c) 2011 Joakim Johansson <jocke@tbricks.com>. 3 * 4 * @APPLE_APACHE_LICENSE_HEADER_START@ 5 * 6 * Licensed under the Apache License, Version 2.0 (the "License"); 7 * you may not use this file except in compliance with the License. 8 * You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 * 18 * @APPLE_APACHE_LICENSE_HEADER_END@ 19 */ 20 21#include <stdio.h> 22#include <stdlib.h> 23#include <ctype.h> 24#include <string.h> 25#include <errno.h> 26#include <time.h> 27 28#ifndef _WIN32 29# include <unistd.h> 30# include <pthread.h> 31# include <sys/time.h> 32#endif 33 34#include "latency.h" 35 36pthread_workqueue_t workqueues[WORKQUEUE_COUNT]; 37struct wq_statistics workqueue_statistics[WORKQUEUE_COUNT]; 38struct wq_event_generator workqueue_generator[GENERATOR_WORKQUEUE_COUNT]; 39 40struct wq_statistics global_statistics; 41unsigned int global_stats_used = 0; 42 43pthread_mutex_t generator_mutex; 44pthread_cond_t generator_condition; 45static unsigned int events_processed; 46 47#define PERCENTILE_COUNT 8 48double percentiles[PERCENTILE_COUNT] = {50.0, 80.0, 98.0, 99.0, 99.5, 99.8, 99.9, 99.99}; 49mytime_t real_start, real_end; 50 51#ifdef __APPLE__ 52 53#include <assert.h> 54#include <CoreServices/CoreServices.h> 55#include <mach/mach.h> 56#include <mach/mach_time.h> 57 58static mach_timebase_info_data_t sTimebaseInfo; 59 60// From http://developer.apple.com/library/mac/#qa/qa2004/qa1398.html 61unsigned long gettime(void) 62{ 63 return (mach_absolute_time() * sTimebaseInfo.numer / sTimebaseInfo.denom); 64} 65 66#else 67 68static mytime_t gettime(void) 69{ 70#ifdef __linux__ 71 
struct timespec ts; 72 if (clock_gettime(CLOCK_MONOTONIC, &ts) != 0) 73 fprintf(stderr, "Failed to get high resolution clock! errno = %d\n", errno); 74 return ((ts.tv_sec * NANOSECONDS_PER_SECOND) + ts.tv_nsec); 75#elif defined(_WIN32) 76 LARGE_INTEGER now; 77 LARGE_INTEGER freq; 78 if (!QueryPerformanceCounter(&now) ) 79 fprintf(stderr, "Failed to get performance counter!\n"); 80 if (!QueryPerformanceFrequency(&freq) ) 81 fprintf(stderr, "Failed to get performance frequency!\n"); 82 83 return (mytime_t)(now.QuadPart * NANOSECONDS_PER_SECOND / freq.QuadPart); 84#else 85 struct timespec ts; 86 if (clock_gettime(CLOCK_HIGHRES, &ts) != 0) 87 fprintf(stderr, "Failed to get high resolution clock! errno = %d\n", errno); 88 return ((ts.tv_sec * NANOSECONDS_PER_SECOND) + ts.tv_nsec); 89#endif 90} 91 92#endif 93 94#ifdef _WIN32 95 96static void my_sleep(unsigned long nanoseconds) { 97 LARGE_INTEGER start, end; 98 LARGE_INTEGER freq; 99 100 QueryPerformanceCounter(&start); 101 QueryPerformanceFrequency(&freq); 102 103 // sleep with ms resolution ... 104 Sleep(nanoseconds / 1000000); 105 106 // ... and busy-wait afterwards, until the requested delay was reached 107 QueryPerformanceCounter(&end); 108 while( (end.QuadPart - start.QuadPart) * NANOSECONDS_PER_SECOND / freq.QuadPart < nanoseconds ){ 109 YieldProcessor(); 110 QueryPerformanceCounter(&end); 111 } 112 113} 114 115#else 116 117// real resolution on solaris is at best system clock tick, i.e. 
100Hz unless having the 118// high res system clock (1000Hz in that case) 119 120static void my_sleep(unsigned long nanoseconds) 121{ 122 struct timespec timeout0; 123 struct timespec timeout1; 124 struct timespec* tmp; 125 struct timespec* t0 = &timeout0; 126 struct timespec* t1 = &timeout1; 127 128 t0->tv_sec = nanoseconds / NANOSECONDS_PER_SECOND; 129 t0->tv_nsec = nanoseconds % NANOSECONDS_PER_SECOND; 130 131 while ((nanosleep(t0, t1) == (-1)) && (errno == EINTR)) 132 { 133 tmp = t0; 134 t0 = t1; 135 t1 = tmp; 136 } 137 138 return; 139} 140 141#endif 142 143static void _process_data(void* context) 144{ 145 struct wq_event *event = (struct wq_event *) context; 146 mytime_t elapsed_time; 147 148 elapsed_time = gettime() - event->start_time; 149 150 workqueue_statistics[event->queue_index].avg = ((workqueue_statistics[event->queue_index].count * workqueue_statistics[event->queue_index].avg) + elapsed_time) / (workqueue_statistics[event->queue_index].count + 1); 151 workqueue_statistics[event->queue_index].total += elapsed_time; 152 workqueue_statistics[event->queue_index].count += 1; 153 154 if (elapsed_time < workqueue_statistics[event->queue_index].min || 155 workqueue_statistics[event->queue_index].min == 0) 156 workqueue_statistics[event->queue_index].min = elapsed_time; 157 158 if (elapsed_time > workqueue_statistics[event->queue_index].max) 159 workqueue_statistics[event->queue_index].max = elapsed_time; 160 161 if ((elapsed_time / 1000) < DISTRIBUTION_BUCKETS) 162 workqueue_statistics[event->queue_index].distribution[(int)(elapsed_time / 1000)] += 1; 163 else 164 workqueue_statistics[event->queue_index].distribution[DISTRIBUTION_BUCKETS-1] += 1; 165 166 // allow generator thread to continue when all events have been processed 167 if (atomic_dec_nv(&events_processed) == 0) 168 { 169 pthread_mutex_lock(&generator_mutex); 170 pthread_cond_signal(&generator_condition); 171 pthread_mutex_unlock(&generator_mutex); 172 } 173 return; 174} 175 176// Perform a small 
microburst for this tick 177static void _event_tick(void* context) 178{ 179 struct wq_event *current_event; 180 long i, generator_workqueue = (long) context; 181 182 for (i = 0; i < EVENTS_GENERATED_PER_TICK; i++) 183 { 184 current_event = &workqueue_generator[generator_workqueue].wq_events[i]; 185 current_event->start_time = gettime(); 186 current_event->queue_index = (current_event->start_time % WORKQUEUE_COUNT); 187 188 (void) pthread_workqueue_additem_np(workqueues[current_event->queue_index], _process_data, current_event, NULL, NULL); 189 } 190 191 return; 192} 193 194static void _generate_simulated_events() 195{ 196 unsigned long i = 0, tick; 197 mytime_t overhead; 198 mytime_t start, current, overhead_start = 0, overhead_end = 0; 199 200 start = current = gettime(); 201 202 for (tick = 0; tick < TOTAL_TICKS_TO_RUN; tick++) 203 { 204 start = current = overhead_end; 205 overhead = overhead_end - overhead_start; 206 207 // wait until we have waited proper amount of time for current rate 208 // we should remove overhead of previous lap to not lag behind in data rate 209 // one call to gethrtime() alone is around 211ns on Nehalem 2.93 210 // use busy waiting in case the frequency is higher than the supported resolution of nanosleep() 211 212 if (overhead > EVENT_TIME_SLICE) 213 { 214 printf("Warning: Event processing overhead > event time slice, readjust test parameters.\n"); 215 } 216 else 217 if ((EVENT_GENERATION_FREQUENCY > SYSTEM_CLOCK_RESOLUTION) || FORCE_BUSY_LOOP) 218 { 219 while ((current - start) < (EVENT_TIME_SLICE - overhead)) 220 current = gettime(); 221 } 222 else 223 { 224 my_sleep(EVENT_TIME_SLICE - overhead); 225 } 226 227 overhead_start = gettime(); 228 229 events_processed = GENERATOR_WORKQUEUE_COUNT * EVENTS_GENERATED_PER_TICK; // number of items that will be processed 230 231#if (LATENCY_RUN_GENERATOR_IN_MAIN_THREAD == 0) 232 for (i = 0; i < GENERATOR_WORKQUEUE_COUNT; i++) 233 (void) pthread_workqueue_additem_np(workqueue_generator[i].wq, 
_event_tick, (void *) i, NULL, NULL); 234#else 235 _event_tick((void *)i); 236#endif 237 238 // wait for all events to be processed 239 pthread_mutex_lock(&generator_mutex); 240 while (events_processed > 0) 241 pthread_cond_wait(&generator_condition, &generator_mutex); 242 pthread_mutex_unlock(&generator_mutex); 243 244 overhead_end = gettime(); 245 } 246 247 return; 248} 249 250static void _gather_statistics(unsigned long queue_index) 251{ 252 unsigned long i; 253 254 if (workqueue_statistics[queue_index].count > 0) 255 { 256 global_stats_used ++; 257 258 global_statistics.avg = ((global_statistics.count * global_statistics.avg) + (workqueue_statistics[queue_index].avg * workqueue_statistics[queue_index].count)) / (global_statistics.count + workqueue_statistics[queue_index].count); 259 global_statistics.total += workqueue_statistics[queue_index].total; 260 global_statistics.count += workqueue_statistics[queue_index].count; 261 262 if (workqueue_statistics[queue_index].min < global_statistics.min || global_statistics.min == 0) 263 global_statistics.min = workqueue_statistics[queue_index].min; 264 265 if (workqueue_statistics[queue_index].max > global_statistics.max) 266 global_statistics.max = workqueue_statistics[queue_index].max; 267 268 for (i = 0; i < DISTRIBUTION_BUCKETS; i++) 269 global_statistics.distribution[i] += workqueue_statistics[queue_index].distribution[i]; 270 } 271 272 return; 273} 274 275void _print_statistics() 276{ 277 unsigned long i, j, total_events = 0, last_percentile = 0, accumulated_percentile = 0; 278 279 printf("Collecting statistics...\n"); 280 281 for (i = 0; i < WORKQUEUE_COUNT; i++) 282 _gather_statistics(i); 283 284 printf("Test is done, run time was %.3f seconds, %.1fM events generated and processed.\n", (double)((double)(real_end - real_start) / (double) NANOSECONDS_PER_SECOND), total_events/1000000.0); 285 286 //FIXME - casting from mytime_t (u_long) to int will truncate the result 287 printf("Global dispatch queue aggregate 
statistics for %d queues: %dM events, min = %d ns, avg = %.1f ns, max = %d ns\n", 288 global_stats_used, global_statistics.count/1000000, (int) global_statistics.min, global_statistics.avg, (int) global_statistics.max); 289 290 printf("\nDistribution:\n"); 291 for (i = 0; i < DISTRIBUTION_BUCKETS; i++) 292 { 293 printf("%3ld us: %d ", i, global_statistics.distribution[i]); 294 for (j=0; j<(((double) global_statistics.distribution[i] / (double) global_statistics.count) * 400.0); j++) 295 printf("*"); 296 printf("\n"); 297 } 298 299 printf("\nPercentiles:\n"); 300 301 for (i = 0; i < DISTRIBUTION_BUCKETS; i++) 302 { 303 while ((last_percentile < PERCENTILE_COUNT) && ((100.0 * ((double) accumulated_percentile / (double) global_statistics.count)) > percentiles[last_percentile])) 304 { 305 printf("%.2f < %ld us\n", percentiles[last_percentile], i-1); 306 last_percentile++; 307 } 308 accumulated_percentile += global_statistics.distribution[i]; 309 } 310 311 while ((last_percentile < PERCENTILE_COUNT) && ((100.0 * ((double) accumulated_percentile / (double) global_statistics.count)) > percentiles[last_percentile])) 312 { 313 printf("%.2f > %d us\n", percentiles[last_percentile], DISTRIBUTION_BUCKETS-1); 314 last_percentile++; 315 } 316 317 return; 318} 319 320int main(void) 321{ 322 int i; 323 pthread_workqueue_attr_t attr; 324 325#ifdef __APPLE__ 326 (void) mach_timebase_info(&sTimebaseInfo); 327#endif 328 329#ifdef MAKE_STATIC 330 pthread_workqueue_init_np(); 331#endif 332 333 memset(&workqueues, 0, sizeof(workqueues)); 334 memset(&workqueue_statistics, 0, sizeof(workqueue_statistics)); 335 memset(&global_statistics, 0, sizeof(global_statistics)); 336 memset(&workqueue_generator, 0, sizeof(workqueue_generator)); 337 338 pthread_mutex_init(&generator_mutex, NULL); 339 pthread_cond_init(&generator_condition, NULL); 340 341 if (pthread_workqueue_attr_init_np(&attr) != 0) 342 fprintf(stderr, "Failed to set workqueue attributes\n"); 343 344 for (i = 0; i < 
GENERATOR_WORKQUEUE_COUNT; i++) 345 { 346 if (pthread_workqueue_attr_setqueuepriority_np(&attr, i) != 0) 347 fprintf(stderr, "Failed to set workqueue priority\n"); 348 349 if (pthread_workqueue_attr_setovercommit_np(&attr, 1) != 0) 350 fprintf(stderr, "Failed to set workqueue overcommit\n"); 351 352 workqueue_generator[i].wq_events = malloc(sizeof(struct wq_event) * EVENTS_GENERATED_PER_TICK); 353 memset(workqueue_generator[i].wq_events, 0, (sizeof(struct wq_event) * EVENTS_GENERATED_PER_TICK)); 354 355 if (pthread_workqueue_create_np(&workqueue_generator[i].wq, &attr) != 0) 356 fprintf(stderr, "Failed to create workqueue\n"); 357 } 358 359 for (i = 0; i < WORKQUEUE_COUNT; i++) 360 { 361 if (pthread_workqueue_attr_init_np(&attr) != 0) 362 fprintf(stderr, "Failed to set workqueue attributes\n"); 363 364 if (pthread_workqueue_attr_setqueuepriority_np(&attr, i) != 0) 365 fprintf(stderr, "Failed to set workqueue priority\n"); 366 367 if (pthread_workqueue_create_np(&workqueues[i], &attr) != 0) 368 fprintf(stderr, "Failed to create workqueue\n"); 369 } 370 371 if (SLEEP_BEFORE_START > 0) 372 { 373 printf("Sleeping for %d seconds to allow for processor set configuration...\n",SLEEP_BEFORE_START); 374 sleep(SLEEP_BEFORE_START); 375 } 376 377 printf("%d workqueues, running for %d seconds at %d Hz, %d events per tick.\n",WORKQUEUE_COUNT, SECONDS_TO_RUN, EVENT_GENERATION_FREQUENCY, EVENTS_GENERATED_PER_TICK); 378 379 printf("Running %d generator threads at %dK events/s, the aggregated data rate is %dK events/s. 
%.2f MB is used for %.2fK events.\n", 380 GENERATOR_WORKQUEUE_COUNT,AGGREGATE_DATA_RATE_PER_SECOND/1000, TOTAL_DATA_PER_SECOND/1000, 381 (double) GENERATOR_WORKQUEUE_COUNT * ((sizeof(struct wq_event) * EVENTS_GENERATED_PER_TICK + sizeof(workqueues))/(1024.0*1024.0)), 382 GENERATOR_WORKQUEUE_COUNT * EVENTS_GENERATED_PER_TICK/1000.0); 383 384 real_start = gettime(); 385 386 _generate_simulated_events(); 387 388 real_end = gettime(); 389 390 _print_statistics(); 391 392 return 0; 393}