Reactos
at master 3516 lines 111 kB view raw
1/* 2 * Thread pooling 3 * 4 * Copyright (c) 2006 Robert Shearman 5 * Copyright (c) 2014-2016 Sebastian Lackner 6 * 7 * This library is free software; you can redistribute it and/or 8 * modify it under the terms of the GNU Lesser General Public 9 * License as published by the Free Software Foundation; either 10 * version 2.1 of the License, or (at your option) any later version. 11 * 12 * This library is distributed in the hope that it will be useful, 13 * but WITHOUT ANY WARRANTY; without even the implied warranty of 14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 15 * Lesser General Public License for more details. 16 * 17 * You should have received a copy of the GNU Lesser General Public 18 * License along with this library; if not, write to the Free Software 19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA 20 */ 21 22 23#ifdef __REACTOS__ 24#include <rtl_vista.h> 25#define NDEBUG 26#include "wine/list.h" 27#include <debug.h> 28 29#define ERR(fmt, ...) DPRINT1(fmt, ##__VA_ARGS__) 30#define FIXME(fmt, ...) DPRINT(fmt, ##__VA_ARGS__) 31#define WARN(fmt, ...) DPRINT(fmt, ##__VA_ARGS__) 32#define TRACE(fmt, ...) DPRINT(fmt, ##__VA_ARGS__) 33#ifndef ARRAY_SIZE 34#define ARRAY_SIZE(_x) (sizeof((_x))/sizeof((_x)[0])) 35#endif 36 37typedef void (CALLBACK *PNTAPCFUNC)(ULONG_PTR,ULONG_PTR,ULONG_PTR); 38typedef void (CALLBACK *PRTL_THREAD_START_ROUTINE)(LPVOID); 39typedef DWORD (CALLBACK *PRTL_WORK_ITEM_ROUTINE)(LPVOID); 40typedef void (NTAPI *RTL_WAITORTIMERCALLBACKFUNC)(PVOID,BOOLEAN); 41typedef VOID (CALLBACK *PRTL_OVERLAPPED_COMPLETION_ROUTINE)(DWORD,DWORD,LPVOID); 42 43typedef void (CALLBACK *PTP_IO_CALLBACK)(PTP_CALLBACK_INSTANCE,void*,void*,IO_STATUS_BLOCK*,PTP_IO); 44NTSYSAPI NTSTATUS WINAPI TpSimpleTryPost(PTP_SIMPLE_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *); 45#define PRTL_WORK_ITEM_ROUTINE WORKERCALLBACKFUNC 46 47#define CRITICAL_SECTION RTL_CRITICAL_SECTION 48#define GetProcessHeap() RtlGetProcessHeap() 49#define GetCurrentProcess() NtCurrentProcess() 50#define GetCurrentThread() NtCurrentThread() 51#define GetCurrentThreadId() HandleToULong(NtCurrentTeb()->ClientId.UniqueThread) 52#else 53#include <assert.h> 54#include <stdarg.h> 55#include <limits.h> 56 57#include "ntstatus.h" 58#define WIN32_NO_STATUS 59#include "winternl.h" 60 61#include "wine/debug.h" 62#include "wine/list.h" 63 64#include "ntdll_misc.h" 65 66WINE_DEFAULT_DEBUG_CHANNEL(threadpool); 67#endif 68 69/* 70 * Old thread pooling API 71 */ 72 73struct rtl_work_item 74{ 75 PRTL_WORK_ITEM_ROUTINE function; 76 PVOID context; 77}; 78 79#define EXPIRE_NEVER (~(ULONGLONG)0) 80#define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */ 81 82#ifndef __REACTOS__ 83static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug; 84#endif 85 86static struct 87{ 88 HANDLE compl_port; 89 RTL_CRITICAL_SECTION threadpool_compl_cs; 90} 91old_threadpool = 92{ 93 NULL, /* compl_port */ 94#ifdef __REACTOS__ 95 {0}, /* threadpool_compl_cs */ 96#else 97 { &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */ 98#endif 99}; 100 101#ifndef __REACTOS__ 102static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug = 103{ 104 0, 0, &old_threadpool.threadpool_compl_cs, 105 { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList }, 106 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") } 107}; 108#endif 109 110struct timer_queue; 111struct queue_timer 112{ 113 struct timer_queue *q; 114 struct list entry; 115 ULONG runcount; /* number of callbacks pending execution */ 116 RTL_WAITORTIMERCALLBACKFUNC callback; 117 PVOID param; 118 DWORD period; 119 ULONG flags; 120 ULONGLONG expire; 121 BOOL destroy; /* timer should be deleted; once set, never unset */ 122 HANDLE event; /* removal event */ 123}; 124 125struct timer_queue 126{ 127 DWORD magic; 128 RTL_CRITICAL_SECTION cs; 129 struct list timers; /* sorted by expiration time */ 130 BOOL quit; /* queue should be deleted; once set, never unset */ 131 HANDLE event; 132 HANDLE thread; 133}; 134 135/* 136 * Object-oriented thread pooling API 137 */ 138 139#define THREADPOOL_WORKER_TIMEOUT 5000 140#define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1) 141 142/* internal threadpool representation */ 143struct threadpool 144{ 145 LONG refcount; 146 LONG objcount; 147 BOOL shutdown; 148 CRITICAL_SECTION cs; 149 /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */ 150 struct list pools[3]; 151 RTL_CONDITION_VARIABLE update_event; 152 /* information about worker threads, locked via .cs */ 153 int max_workers; 154 int min_workers; 155 int num_workers; 156 int num_busy_workers; 157 HANDLE compl_port; 158 TP_POOL_STACK_INFORMATION stack_info; 159}; 160 161enum threadpool_objtype 162{ 163 TP_OBJECT_TYPE_SIMPLE, 164 TP_OBJECT_TYPE_WORK, 165 TP_OBJECT_TYPE_TIMER, 166 TP_OBJECT_TYPE_WAIT, 167 TP_OBJECT_TYPE_IO, 168}; 169 170struct io_completion 171{ 172 IO_STATUS_BLOCK iosb; 173 ULONG_PTR cvalue; 174}; 175 176/* internal threadpool object representation */ 177struct threadpool_object 178{ 179 void *win32_callback; /* leave space for kernelbase to store win32 callback */ 180 LONG refcount; 181 BOOL shutdown; 182 /* read-only information */ 183 enum threadpool_objtype type; 184 struct threadpool *pool; 185 struct threadpool_group *group; 186 PVOID userdata; 187 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback; 188 PTP_SIMPLE_CALLBACK finalization_callback; 189 BOOL may_run_long; 190 HMODULE race_dll; 191 TP_CALLBACK_PRIORITY priority; 192 /* information about the group, locked via .group->cs */ 193 struct list group_entry; 194 BOOL is_group_member; 195 /* information about the pool, locked via .pool->cs */ 196 struct list pool_entry; 197 RTL_CONDITION_VARIABLE finished_event; 198 RTL_CONDITION_VARIABLE group_finished_event; 199 HANDLE completed_event; 200 LONG num_pending_callbacks; 201 LONG num_running_callbacks; 202 LONG num_associated_callbacks; 203 /* arguments for callback */ 204 union 205 { 206 struct 207 { 208 PTP_SIMPLE_CALLBACK callback; 209 } simple; 210 struct 211 { 212 PTP_WORK_CALLBACK callback; 213 } work; 214 struct 215 { 216 PTP_TIMER_CALLBACK callback; 217 /* information about the timer, locked via timerqueue.cs */ 218 BOOL timer_initialized; 219 BOOL timer_pending; 220 struct list timer_entry; 221 BOOL timer_set; 222 ULONGLONG timeout; 223 LONG period; 224 LONG window_length; 225 } timer; 226 struct 227 { 228 PTP_WAIT_CALLBACK callback; 229 LONG signaled; 230 /* information about the wait object, locked via waitqueue.cs */ 231 struct waitqueue_bucket *bucket; 232 BOOL wait_pending; 233 struct list wait_entry; 234 ULONGLONG timeout; 235 HANDLE handle; 236 DWORD flags; 237 RTL_WAITORTIMERCALLBACKFUNC rtl_callback; 238 } wait; 239 struct 240 { 241 PTP_IO_CALLBACK callback; 242 /* locked via .pool->cs */ 243 unsigned int pending_count, skipped_count, completion_count, completion_max; 244 BOOL shutting_down; 245 struct io_completion *completions; 246 } io; 247 } u; 248}; 249 250/* internal threadpool instance representation */ 251struct threadpool_instance 252{ 253 struct threadpool_object *object; 254 DWORD threadid; 255 BOOL associated; 256 BOOL may_run_long; 257 struct 258 { 259 CRITICAL_SECTION *critical_section; 260 HANDLE mutex; 261 HANDLE semaphore; 262 LONG semaphore_count; 263 HANDLE event; 264 HMODULE library; 265 } cleanup; 266}; 267 268/* internal threadpool group representation */ 269struct threadpool_group 270{ 271 LONG refcount; 272 BOOL shutdown; 273 CRITICAL_SECTION cs; 274 /* list of group members, locked via .cs */ 275 struct list members; 276}; 277 278#ifndef __REACTOS__ 279/* global timerqueue object */ 280static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug; 281#endif 282 283static struct 284{ 285 CRITICAL_SECTION cs; 286 LONG objcount; 287 BOOL thread_running; 288 struct list pending_timers; 289 RTL_CONDITION_VARIABLE update_event; 290} 291timerqueue = 292{ 293#ifdef __REACTOS__ 294 {0}, /* cs */ 295#else 296 { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */ 297#endif 298 0, /* objcount */ 299 FALSE, /* thread_running */ 300 LIST_INIT( timerqueue.pending_timers ), /* pending_timers */ 301#if __REACTOS__ 302 0, 303#else 304 RTL_CONDITION_VARIABLE_INIT /* update_event */ 305#endif 306}; 307 308#ifndef __REACTOS__ 309static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug = 310{ 311 0, 0, &timerqueue.cs, 312 { &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList }, 313 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") } 314}; 315 316/* global waitqueue object */ 317static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug; 318#endif 319 320static struct 321{ 322 CRITICAL_SECTION cs; 323 LONG num_buckets; 324 struct list buckets; 325} 326waitqueue = 327{ 328#ifdef __REACTOS__ 329 {0}, /* cs */ 330#else 331 { &waitqueue_debug, -1, 0, 0, 0, 0 }, /* cs */ 332#endif 333 0, /* num_buckets */ 334 LIST_INIT( waitqueue.buckets ) /* buckets */ 335}; 336 337#ifndef __REACTOS__ 338static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug = 339{ 340 0, 0, &waitqueue.cs, 341 { &waitqueue_debug.ProcessLocksList, &waitqueue_debug.ProcessLocksList }, 342 0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") } 343}; 344#endif 345 346struct waitqueue_bucket 347{ 348 struct list bucket_entry; 349 LONG objcount; 350 struct list reserved; 351 struct list waiting; 352 HANDLE update_event; 353 BOOL alertable; 354}; 355 356#ifndef __REACTOS__ 357/* global I/O completion queue object */ 358static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug; 359#endif 360 361static struct 362{ 363 CRITICAL_SECTION cs; 364 LONG objcount; 365 BOOL thread_running; 366 HANDLE port; 367 RTL_CONDITION_VARIABLE update_event; 368} 369ioqueue = 370{ 371#ifdef __REACTOS__ 372 .cs = {0}, 373#else 374 .cs = { &ioqueue_debug, -1, 0, 0, 0, 0 }, 375#endif 376}; 377 378#ifndef __REACTOS__ 379static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug = 380{ 381 0, 0, &ioqueue.cs, 382 { &ioqueue_debug.ProcessLocksList, &ioqueue_debug.ProcessLocksList }, 383 0, 0, { (DWORD_PTR)(__FILE__ ": ioqueue.cs") } 384}; 385#endif 386 387static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool ) 388{ 389 return (struct threadpool *)pool; 390} 391 392static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work ) 393{ 394 struct threadpool_object *object = (struct threadpool_object *)work; 395 assert( object->type == TP_OBJECT_TYPE_WORK ); 396 return object; 397} 398 399static inline struct threadpool_object *impl_from_TP_TIMER( TP_TIMER *timer ) 400{ 401 struct threadpool_object *object = (struct threadpool_object *)timer; 402 assert( object->type == TP_OBJECT_TYPE_TIMER ); 403 return object; 404} 405 406static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait ) 407{ 408 struct threadpool_object *object = (struct threadpool_object *)wait; 409 assert( object->type == TP_OBJECT_TYPE_WAIT ); 410 return object; 411} 412 413static inline struct threadpool_object *impl_from_TP_IO( TP_IO *io ) 414{ 415 struct threadpool_object *object = (struct threadpool_object *)io; 416 assert( object->type == TP_OBJECT_TYPE_IO ); 417 return object; 418} 419 420static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group ) 421{ 422 return (struct threadpool_group *)group; 423} 424 425static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE *instance ) 426{ 427 return (struct threadpool_instance *)instance; 428} 429 430#ifdef __REACTOS__ 431ULONG NTAPI threadpool_worker_proc(PVOID param ); 432#else 433static void CALLBACK threadpool_worker_proc( void *param ); 434#endif 435static void tp_object_submit( struct threadpool_object *object, BOOL signaled ); 436static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread ); 437static void tp_object_prepare_shutdown( struct threadpool_object *object ); 438static BOOL tp_object_release( struct threadpool_object *object ); 439static struct threadpool *default_threadpool = NULL; 440 441static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size) 442{ 443 unsigned int new_capacity, max_capacity; 444 void *new_elements; 445 446 if (count <= *capacity) 447 return TRUE; 448 449 max_capacity = ~(SIZE_T)0 / size; 450 if (count > max_capacity) 451 return FALSE; 452 453 new_capacity = max(4, *capacity); 454 while (new_capacity < count && new_capacity <= max_capacity / 2) 455 new_capacity *= 2; 456 if (new_capacity < count) 457 new_capacity = max_capacity; 458 459 if (!(new_elements = RtlReAllocateHeap( GetProcessHeap(), 0, *elements, new_capacity * size ))) 460 return FALSE; 461 462 *elements = new_elements; 463 *capacity = new_capacity; 464 465 return TRUE; 466} 467 468static void set_thread_name(const WCHAR *name) 469{ 470#ifndef __REACTOS__ // This is impossible on non vista+ 471 THREAD_NAME_INFORMATION info; 472 473 RtlInitUnicodeString(&info.ThreadName, name); 474 NtSetInformationThread(GetCurrentThread(), ThreadNameInformation, &info, sizeof(info)); 475#endif 476} 477 478#ifndef __REACTOS__ 479static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void *userdata ) 480{ 481 struct rtl_work_item *item = userdata; 482 483 TRACE("executing %p(%p)\n", item->function, item->context); 484 item->function( item->context ); 485 486 RtlFreeHeap( GetProcessHeap(), 0, item ); 487} 488 489/*********************************************************************** 490 * RtlQueueWorkItem (NTDLL.@) 491 * 492 * Queues a work item into a thread in the thread pool. 493 * 494 * PARAMS 495 * function [I] Work function to execute. 496 * context [I] Context to pass to the work function when it is executed. 497 * flags [I] Flags. See notes. 498 * 499 * RETURNS 500 * Success: STATUS_SUCCESS. 501 * Failure: Any NTSTATUS code. 502 * 503 * NOTES 504 * Flags can be one or more of the following: 505 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread. 506 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread. 507 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent. 508 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time. 509 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token. 510 */ 511NTSTATUS WINAPI RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG flags ) 512{ 513 TP_CALLBACK_ENVIRON environment; 514 struct rtl_work_item *item; 515 NTSTATUS status; 516 517 TRACE( "%p %p %lu\n", function, context, flags ); 518 519 item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) ); 520 if (!item) 521 return STATUS_NO_MEMORY; 522 523 memset( &environment, 0, sizeof(environment) ); 524 environment.Version = 1; 525 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0; 526 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0; 527 528 item->function = function; 529 item->context = context; 530 531 status = TpSimpleTryPost( process_rtl_work_item, item, &environment ); 532 if (status) RtlFreeHeap( GetProcessHeap(), 0, item ); 533 return status; 534} 535 536/*********************************************************************** 537 * iocp_poller - get completion events and run callbacks 538 */ 539static DWORD CALLBACK iocp_poller(LPVOID Arg) 540{ 541 HANDLE cport = Arg; 542 543 while( TRUE ) 544 { 545 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback; 546 LPVOID overlapped; 547 IO_STATUS_BLOCK iosb; 548#ifdef __REACTOS__ 549 NTSTATUS res = NtRemoveIoCompletion( cport, (PVOID)&callback, (PVOID)&overlapped, &iosb, NULL ); 550#else 551 NTSTATUS res = NtRemoveIoCompletion( cport, (PULONG_PTR)&callback, (PULONG_PTR)&overlapped, &iosb, NULL ); 552#endif 553 if (res) 554 { 555 ERR("NtRemoveIoCompletion failed: 0x%lx\n", res); 556 } 557 else 558 { 559 DWORD transferred = 0; 560 DWORD err = 0; 561 562 if (iosb.Status == STATUS_SUCCESS) 563 transferred = iosb.Information; 564 else 565 err = RtlNtStatusToDosError(iosb.Status); 566 567 callback( err, transferred, overlapped ); 568 } 569 } 570 return 0; 571} 572 573/*********************************************************************** 574 * RtlSetIoCompletionCallback (NTDLL.@) 575 * 576 * Binds a handle to a thread pool's completion port, and possibly 577 * starts a non-I/O thread to monitor this port and call functions back. 578 * 579 * PARAMS 580 * FileHandle [I] Handle to bind to a completion port. 581 * Function [I] Callback function to call on I/O completions. 582 * Flags [I] Not used. 583 * 584 * RETURNS 585 * Success: STATUS_SUCCESS. 586 * Failure: Any NTSTATUS code. 587 * 588 */ 589NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags) 590{ 591 IO_STATUS_BLOCK iosb; 592 FILE_COMPLETION_INFORMATION info; 593 594 if (Flags) FIXME("Unknown value Flags=0x%lx\n", Flags); 595 596 if (!old_threadpool.compl_port) 597 { 598 NTSTATUS res = STATUS_SUCCESS; 599 600 RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs); 601 if (!old_threadpool.compl_port) 602 { 603 HANDLE cport; 604 605 res = NtCreateIoCompletion( &cport, IO_COMPLETION_ALL_ACCESS, NULL, 0 ); 606 if (!res) 607 { 608 /* FIXME native can start additional threads in case of e.g. hung callback function. */ 609 res = RtlQueueWorkItem( iocp_poller, cport, WT_EXECUTEDEFAULT ); 610 if (!res) 611 old_threadpool.compl_port = cport; 612 else 613 NtClose( cport ); 614 } 615 } 616 RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs); 617 if (res) return res; 618 } 619 620 info.CompletionPort = old_threadpool.compl_port; 621 info.CompletionKey = (ULONG_PTR)Function; 622 623 return NtSetInformationFile( FileHandle, &iosb, &info, sizeof(info), FileCompletionInformation ); 624} 625#endif 626 627static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout ) 628{ 629 if (timeout == INFINITE) return NULL; 630 pTime->QuadPart = (ULONGLONG)timeout * -10000; 631 return pTime; 632} 633 634/************************** Timer Queue Impl **************************/ 635 636static void queue_remove_timer(struct queue_timer *t) 637{ 638 /* We MUST hold the queue cs while calling this function. This ensures 639 that we cannot queue another callback for this timer. The runcount 640 being zero makes sure we don't have any already queued. */ 641 struct timer_queue *q = t->q; 642 643 assert(t->runcount == 0); 644 assert(t->destroy); 645 646 list_remove(&t->entry); 647 if (t->event) 648 NtSetEvent(t->event, NULL); 649 RtlFreeHeap(GetProcessHeap(), 0, t); 650 651 if (q->quit && list_empty(&q->timers)) 652 NtSetEvent(q->event, NULL); 653} 654 655static void timer_cleanup_callback(struct queue_timer *t) 656{ 657 struct timer_queue *q = t->q; 658 RtlEnterCriticalSection(&q->cs); 659 660 assert(0 < t->runcount); 661 --t->runcount; 662 663 if (t->destroy && t->runcount == 0) 664 queue_remove_timer(t); 665 666 RtlLeaveCriticalSection(&q->cs); 667} 668 669static DWORD WINAPI timer_callback_wrapper(LPVOID p) 670{ 671 struct queue_timer *t = p; 672 t->callback(t->param, TRUE); 673 timer_cleanup_callback(t); 674 return 0; 675} 676 677static inline ULONGLONG queue_current_time(void) 678{ 679 LARGE_INTEGER now, freq; 680 NtQueryPerformanceCounter(&now, &freq); 681 return now.QuadPart * 1000 / freq.QuadPart; 682} 683 684static void queue_add_timer(struct queue_timer *t, ULONGLONG time, 685 BOOL set_event) 686{ 687 /* We MUST hold the queue cs while calling this function. */ 688 struct timer_queue *q = t->q; 689 struct list *ptr = &q->timers; 690 691 assert(!q->quit || (t->destroy && time == EXPIRE_NEVER)); 692 693 if (time != EXPIRE_NEVER) 694 LIST_FOR_EACH(ptr, &q->timers) 695 { 696 struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry); 697 if (time < cur->expire) 698 break; 699 } 700 list_add_before(ptr, &t->entry); 701 702 t->expire = time; 703 704 /* If we insert at the head of the list, we need to expire sooner 705 than expected. */ 706 if (set_event && &t->entry == list_head(&q->timers)) 707 NtSetEvent(q->event, NULL); 708} 709 710static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time, 711 BOOL set_event) 712{ 713 /* We MUST hold the queue cs while calling this function. */ 714 list_remove(&t->entry); 715 queue_add_timer(t, time, set_event); 716} 717 718static void queue_timer_expire(struct timer_queue *q) 719{ 720 struct queue_timer *t = NULL; 721 722 RtlEnterCriticalSection(&q->cs); 723 if (list_head(&q->timers)) 724 { 725 ULONGLONG now, next; 726 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry); 727 if (!t->destroy && t->expire <= ((now = queue_current_time()))) 728 { 729 ++t->runcount; 730 if (t->period) 731 { 732 next = t->expire + t->period; 733 /* avoid trigger cascade if overloaded / hibernated */ 734 if (next < now) 735 next = now + t->period; 736 } 737 else 738 next = EXPIRE_NEVER; 739 queue_move_timer(t, next, FALSE); 740 } 741 else 742 t = NULL; 743 } 744 RtlLeaveCriticalSection(&q->cs); 745 746 if (t) 747 { 748 if (t->flags & WT_EXECUTEINTIMERTHREAD) 749 timer_callback_wrapper(t); 750 else 751 { 752 ULONG flags 753 = (t->flags 754 & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD 755 | WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION)); 756 NTSTATUS status = RtlQueueWorkItem(timer_callback_wrapper, t, flags); 757 if (status != STATUS_SUCCESS) 758 timer_cleanup_callback(t); 759 } 760 } 761} 762 763static ULONG queue_get_timeout(struct timer_queue *q) 764{ 765 struct queue_timer *t; 766 ULONG timeout = INFINITE; 767 768 RtlEnterCriticalSection(&q->cs); 769 if (list_head(&q->timers)) 770 { 771 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry); 772 assert(!t->destroy || t->expire == EXPIRE_NEVER); 773 774 if (t->expire != EXPIRE_NEVER) 775 { 776 ULONGLONG time = queue_current_time(); 777 timeout = t->expire < time ? 0 : t->expire - time; 778 } 779 } 780 RtlLeaveCriticalSection(&q->cs); 781 782 return timeout; 783} 784 785#ifdef __REACTOS__ 786ULONG NTAPI timer_queue_thread_proc(PVOID p) 787#else 788static void WINAPI timer_queue_thread_proc(LPVOID p) 789#endif 790{ 791 struct timer_queue *q = p; 792 ULONG timeout_ms; 793 794 set_thread_name(L"wine_threadpool_timer_queue"); 795 timeout_ms = INFINITE; 796 for (;;) 797 { 798 LARGE_INTEGER timeout; 799 NTSTATUS status; 800 BOOL done = FALSE; 801 802 status = NtWaitForSingleObject( 803 q->event, FALSE, get_nt_timeout(&timeout, timeout_ms)); 804 805 if (status == STATUS_WAIT_0) 806 { 807 /* There are two possible ways to trigger the event. Either 808 we are quitting and the last timer got removed, or a new 809 timer got put at the head of the list so we need to adjust 810 our timeout. */ 811 RtlEnterCriticalSection(&q->cs); 812 if (q->quit && list_empty(&q->timers)) 813 done = TRUE; 814 RtlLeaveCriticalSection(&q->cs); 815 } 816 else if (status == STATUS_TIMEOUT) 817 queue_timer_expire(q); 818 819 if (done) 820 break; 821 822 timeout_ms = queue_get_timeout(q); 823 } 824 825 NtClose(q->event); 826 RtlDeleteCriticalSection(&q->cs); 827 q->magic = 0; 828 RtlFreeHeap(GetProcessHeap(), 0, q); 829 RtlExitUserThread( 0 ); 830#ifdef __REACTOS__ 831 return STATUS_SUCCESS; 832#endif 833} 834 835#ifndef __REACTOS__ 836 837static void queue_destroy_timer(struct queue_timer *t) 838{ 839 /* We MUST hold the queue cs while calling this function. */ 840 t->destroy = TRUE; 841 if (t->runcount == 0) 842 /* Ensure a timer is promptly removed. If callbacks are pending, 843 it will be removed after the last one finishes by the callback 844 cleanup wrapper. */ 845 queue_remove_timer(t); 846 else 847 /* Make sure no destroyed timer masks an active timer at the head 848 of the sorted list. */ 849 queue_move_timer(t, EXPIRE_NEVER, FALSE); 850} 851 852/*********************************************************************** 853 * RtlCreateTimerQueue (NTDLL.@) 854 * 855 * Creates a timer queue object and returns a handle to it. 856 * 857 * PARAMS 858 * NewTimerQueue [O] The newly created queue. 859 * 860 * RETURNS 861 * Success: STATUS_SUCCESS. 862 * Failure: Any NTSTATUS code. 863 */ 864NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue) 865{ 866 NTSTATUS status; 867 struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q); 868 if (!q) 869 return STATUS_NO_MEMORY; 870 871 RtlInitializeCriticalSection(&q->cs); 872 list_init(&q->timers); 873 q->quit = FALSE; 874 q->magic = TIMER_QUEUE_MAGIC; 875 status = NtCreateEvent(&q->event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE); 876 if (status != STATUS_SUCCESS) 877 { 878 RtlFreeHeap(GetProcessHeap(), 0, q); 879 return status; 880 } 881 status = RtlCreateUserThread(GetCurrentProcess(), NULL, FALSE, 0, 0, 0, 882 timer_queue_thread_proc, q, &q->thread, NULL); 883 if (status != STATUS_SUCCESS) 884 { 885 NtClose(q->event); 886 RtlFreeHeap(GetProcessHeap(), 0, q); 887 return status; 888 } 889 890 *NewTimerQueue = q; 891 return STATUS_SUCCESS; 892} 893 894/*********************************************************************** 895 * RtlDeleteTimerQueueEx (NTDLL.@) 896 * 897 * Deletes a timer queue object. 898 * 899 * PARAMS 900 * TimerQueue [I] The timer queue to destroy. 901 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE, 902 * wait until all timers are finished firing before 903 * returning. Otherwise, return immediately and set the 904 * event when all timers are done. 905 * 906 * RETURNS 907 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not. 908 * Failure: Any NTSTATUS code. 909 */ 910NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent) 911{ 912 struct timer_queue *q = TimerQueue; 913 struct queue_timer *t, *temp; 914 HANDLE thread; 915 NTSTATUS status; 916 917 if (!q || q->magic != TIMER_QUEUE_MAGIC) 918 return STATUS_INVALID_HANDLE; 919 920 thread = q->thread; 921 922 RtlEnterCriticalSection(&q->cs); 923 q->quit = TRUE; 924 if (list_head(&q->timers)) 925 /* When the last timer is removed, it will signal the timer thread to 926 exit... */ 927 LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry) 928 queue_destroy_timer(t); 929 else 930 /* However if we have none, we must do it ourselves. */ 931 NtSetEvent(q->event, NULL); 932 RtlLeaveCriticalSection(&q->cs); 933 934 if (CompletionEvent == INVALID_HANDLE_VALUE) 935 { 936 NtWaitForSingleObject(thread, FALSE, NULL); 937 status = STATUS_SUCCESS; 938 } 939 else 940 { 941 if (CompletionEvent) 942 { 943 FIXME("asynchronous return on completion event unimplemented\n"); 944 NtWaitForSingleObject(thread, FALSE, NULL); 945 NtSetEvent(CompletionEvent, NULL); 946 } 947 status = STATUS_PENDING; 948 } 949 950 NtClose(thread); 951 return status; 952} 953 954static struct timer_queue *get_timer_queue(HANDLE TimerQueue) 955{ 956 static struct timer_queue *default_timer_queue; 957 958 if (TimerQueue) 959 return TimerQueue; 960 else 961 { 962 if (!default_timer_queue) 963 { 964 HANDLE q; 965 NTSTATUS status = RtlCreateTimerQueue(&q); 966 if (status == STATUS_SUCCESS) 967 { 968 PVOID p = InterlockedCompareExchangePointer( (void **) &default_timer_queue, q, NULL ); 969 if (p) 970 /* Got beat to the punch. */ 971 RtlDeleteTimerQueueEx(q, NULL); 972 } 973 } 974 return default_timer_queue; 975 } 976} 977 978/*********************************************************************** 979 * RtlCreateTimer (NTDLL.@) 980 * 981 * Creates a new timer associated with the given queue. 982 * 983 * PARAMS 984 * TimerQueue [I] The queue to hold the timer. 985 * NewTimer [O] The newly created timer. 986 * Callback [I] The callback to fire. 987 * Parameter [I] The argument for the callback. 988 * DueTime [I] The delay, in milliseconds, before first firing the 989 * timer. 990 * Period [I] The period, in milliseconds, at which to fire the timer 991 * after the first callback. If zero, the timer will only 992 * fire once. It still needs to be deleted with 993 * RtlDeleteTimer. 994 * Flags [I] Flags controlling the execution of the callback. In 995 * addition to the WT_* thread pool flags (see 996 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and 997 * WT_EXECUTEONLYONCE are supported. 998 * 999 * RETURNS 1000 * Success: STATUS_SUCCESS. 1001 * Failure: Any NTSTATUS code. 1002 */ 1003NTSTATUS WINAPI RtlCreateTimer(HANDLE TimerQueue, HANDLE *NewTimer, 1004 RTL_WAITORTIMERCALLBACKFUNC Callback, 1005 PVOID Parameter, DWORD DueTime, DWORD Period, 1006 ULONG Flags) 1007{ 1008 NTSTATUS status; 1009 struct queue_timer *t; 1010 struct timer_queue *q = get_timer_queue(TimerQueue); 1011 1012 if (!q) return STATUS_NO_MEMORY; 1013 if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE; 1014 1015 t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t); 1016 if (!t) 1017 return STATUS_NO_MEMORY; 1018 1019 t->q = q; 1020 t->runcount = 0; 1021 t->callback = Callback; 1022 t->param = Parameter; 1023 t->period = Period; 1024 t->flags = Flags; 1025 t->destroy = FALSE; 1026 t->event = NULL; 1027 1028 status = STATUS_SUCCESS; 1029 RtlEnterCriticalSection(&q->cs); 1030 if (q->quit) 1031 status = STATUS_INVALID_HANDLE; 1032 else 1033 queue_add_timer(t, queue_current_time() + DueTime, TRUE); 1034 RtlLeaveCriticalSection(&q->cs); 1035 1036 if (status == STATUS_SUCCESS) 1037 *NewTimer = t; 1038 else 1039 RtlFreeHeap(GetProcessHeap(), 0, t); 1040 1041 return status; 1042} 1043 1044/*********************************************************************** 1045 * RtlUpdateTimer (NTDLL.@) 1046 * 1047 * Changes the time at which a timer expires. 1048 * 1049 * PARAMS 1050 * TimerQueue [I] The queue that holds the timer. 1051 * Timer [I] The timer to update. 1052 * DueTime [I] The delay, in milliseconds, before next firing the timer. 1053 * Period [I] The period, in milliseconds, at which to fire the timer 1054 * after the first callback. If zero, the timer will not 1055 * refire once. It still needs to be deleted with 1056 * RtlDeleteTimer. 1057 * 1058 * RETURNS 1059 * Success: STATUS_SUCCESS. 1060 * Failure: Any NTSTATUS code. 1061 */ 1062NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer, 1063 DWORD DueTime, DWORD Period) 1064{ 1065 struct queue_timer *t = Timer; 1066 struct timer_queue *q = t->q; 1067 1068 RtlEnterCriticalSection(&q->cs); 1069 /* Can't change a timer if it was once-only or destroyed. */ 1070 if (t->expire != EXPIRE_NEVER) 1071 { 1072 t->period = Period; 1073 queue_move_timer(t, queue_current_time() + DueTime, TRUE); 1074 } 1075 RtlLeaveCriticalSection(&q->cs); 1076 1077 return STATUS_SUCCESS; 1078} 1079 1080/*********************************************************************** 1081 * RtlDeleteTimer (NTDLL.@) 1082 * 1083 * Cancels a timer-queue timer. 1084 * 1085 * PARAMS 1086 * TimerQueue [I] The queue that holds the timer. 1087 * Timer [I] The timer to update. 1088 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE, 1089 * wait until the timer is finished firing all pending 1090 * callbacks before returning. Otherwise, return 1091 * immediately and set the timer is done. 1092 * 1093 * RETURNS 1094 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not, 1095 or if the completion event is NULL. 1096 * Failure: Any NTSTATUS code. 1097 */ 1098NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer, 1099 HANDLE CompletionEvent) 1100{ 1101 struct queue_timer *t = Timer; 1102 struct timer_queue *q; 1103 NTSTATUS status = STATUS_PENDING; 1104 HANDLE event = NULL; 1105 1106 if (!Timer) 1107 return STATUS_INVALID_PARAMETER_1; 1108 q = t->q; 1109 if (CompletionEvent == INVALID_HANDLE_VALUE) 1110 { 1111 status = NtCreateEvent(&event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE); 1112 if (status == STATUS_SUCCESS) 1113 status = STATUS_PENDING; 1114 } 1115 else if (CompletionEvent) 1116 event = CompletionEvent; 1117 1118 RtlEnterCriticalSection(&q->cs); 1119 t->event = event; 1120 if (t->runcount == 0 && event) 1121 status = STATUS_SUCCESS; 1122 queue_destroy_timer(t); 1123 RtlLeaveCriticalSection(&q->cs); 1124 1125 if (CompletionEvent == INVALID_HANDLE_VALUE && event) 1126 { 1127 if (status == STATUS_PENDING) 1128 { 1129 NtWaitForSingleObject(event, FALSE, NULL); 1130 status = STATUS_SUCCESS; 1131 } 1132 NtClose(event); 1133 } 1134 1135 return status; 1136} 1137#endif 1138/*********************************************************************** 1139 * timerqueue_thread_proc (internal) 1140 */ 1141#ifdef __REACTOS__ 1142ULONG NTAPI timerqueue_thread_proc(PVOID param ) 1143#else 1144static void CALLBACK timerqueue_thread_proc( void *param ) 1145#endif 1146{ 1147 ULONGLONG timeout_lower, timeout_upper, new_timeout; 1148 struct threadpool_object *other_timer; 1149 LARGE_INTEGER now, timeout; 1150 struct list *ptr; 1151 1152 TRACE( "starting timer queue thread\n" ); 1153 set_thread_name(L"wine_threadpool_timerqueue"); 1154 1155 RtlEnterCriticalSection( &timerqueue.cs ); 1156 for (;;) 1157 { 1158 NtQuerySystemTime( &now ); 1159 1160 /* Check for expired timers. */ 1161 while ((ptr = list_head( &timerqueue.pending_timers ))) 1162 { 1163 struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry ); 1164 assert( timer->type == TP_OBJECT_TYPE_TIMER ); 1165 assert( timer->u.timer.timer_pending ); 1166 if (timer->u.timer.timeout > now.QuadPart) 1167 break; 1168 1169 /* Queue a new callback in one of the worker threads. */ 1170 list_remove( &timer->u.timer.timer_entry ); 1171 timer->u.timer.timer_pending = FALSE; 1172 tp_object_submit( timer, FALSE ); 1173 1174 /* Insert the timer back into the queue, except it's marked for shutdown. */ 1175 if (timer->u.timer.period && !timer->shutdown) 1176 { 1177 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000; 1178 if (timer->u.timer.timeout <= now.QuadPart) 1179 timer->u.timer.timeout = now.QuadPart + 1; 1180 1181 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers, 1182 struct threadpool_object, u.timer.timer_entry ) 1183 { 1184 assert( other_timer->type == TP_OBJECT_TYPE_TIMER ); 1185 if (timer->u.timer.timeout < other_timer->u.timer.timeout) 1186 break; 1187 } 1188 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry ); 1189 timer->u.timer.timer_pending = TRUE; 1190 } 1191 } 1192 1193 timeout_lower = timeout_upper = MAXLONGLONG; 1194 1195 /* Determine next timeout and use the window length to optimize wakeup times. */ 1196 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers, 1197 struct threadpool_object, u.timer.timer_entry ) 1198 { 1199 assert( other_timer->type == TP_OBJECT_TYPE_TIMER ); 1200 if (other_timer->u.timer.timeout >= timeout_upper) 1201 break; 1202 1203 timeout_lower = other_timer->u.timer.timeout; 1204 new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000; 1205 if (new_timeout < timeout_upper) 1206 timeout_upper = new_timeout; 1207 } 1208 1209 /* Wait for timer update events or until the next timer expires. */ 1210 if (timerqueue.objcount) 1211 { 1212 timeout.QuadPart = timeout_lower; 1213 RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, &timeout ); 1214 continue; 1215 } 1216 1217 /* All timers have been destroyed, if no new timers are created 1218 * within some amount of time, then we can shutdown this thread. */ 1219 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000; 1220 if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, 1221 &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount) 1222 { 1223 break; 1224 } 1225 } 1226 1227 timerqueue.thread_running = FALSE; 1228 RtlLeaveCriticalSection( &timerqueue.cs ); 1229 1230 TRACE( "terminating timer queue thread\n" ); 1231 RtlExitUserThread( 0 ); 1232#ifdef __REACTOS__ 1233 return STATUS_SUCCESS; 1234#endif 1235} 1236 1237/*********************************************************************** 1238 * tp_new_worker_thread (internal) 1239 * 1240 * Create and account a new worker thread for the desired pool. 1241 */ 1242static NTSTATUS tp_new_worker_thread( struct threadpool *pool ) 1243{ 1244 HANDLE thread; 1245 NTSTATUS status; 1246 1247 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 1248 pool->stack_info.StackReserve, pool->stack_info.StackCommit, 1249 threadpool_worker_proc, pool, &thread, NULL ); 1250 if (status == STATUS_SUCCESS) 1251 { 1252 InterlockedIncrement( &pool->refcount ); 1253 pool->num_workers++; 1254 NtClose( thread ); 1255 } 1256 return status; 1257} 1258 1259/*********************************************************************** 1260 * tp_timerqueue_lock (internal) 1261 * 1262 * Acquires a lock on the global timerqueue. When the lock is acquired 1263 * successfully, it is guaranteed that the timer thread is running. 1264 */ 1265static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer ) 1266{ 1267 NTSTATUS status = STATUS_SUCCESS; 1268 assert( timer->type == TP_OBJECT_TYPE_TIMER ); 1269 1270 timer->u.timer.timer_initialized = FALSE; 1271 timer->u.timer.timer_pending = FALSE; 1272 timer->u.timer.timer_set = FALSE; 1273 timer->u.timer.timeout = 0; 1274 timer->u.timer.period = 0; 1275 timer->u.timer.window_length = 0; 1276 1277 RtlEnterCriticalSection( &timerqueue.cs ); 1278 1279 /* Make sure that the timerqueue thread is running. */ 1280 if (!timerqueue.thread_running) 1281 { 1282 HANDLE thread; 1283 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0, 1284 timerqueue_thread_proc, NULL, &thread, NULL ); 1285 if (status == STATUS_SUCCESS) 1286 { 1287 timerqueue.thread_running = TRUE; 1288 NtClose( thread ); 1289 } 1290 } 1291 1292 if (status == STATUS_SUCCESS) 1293 { 1294 timer->u.timer.timer_initialized = TRUE; 1295 timerqueue.objcount++; 1296 } 1297 1298 RtlLeaveCriticalSection( &timerqueue.cs ); 1299 return status; 1300} 1301 1302/*********************************************************************** 1303 * tp_timerqueue_unlock (internal) 1304 * 1305 * Releases a lock on the global timerqueue. 1306 */ 1307static void tp_timerqueue_unlock( struct threadpool_object *timer ) 1308{ 1309 assert( timer->type == TP_OBJECT_TYPE_TIMER ); 1310 1311 RtlEnterCriticalSection( &timerqueue.cs ); 1312 if (timer->u.timer.timer_initialized) 1313 { 1314 /* If timer was pending, remove it. */ 1315 if (timer->u.timer.timer_pending) 1316 { 1317 list_remove( &timer->u.timer.timer_entry ); 1318 timer->u.timer.timer_pending = FALSE; 1319 } 1320 1321 /* If the last timer object was destroyed, then wake up the thread. */ 1322 if (!--timerqueue.objcount) 1323 { 1324 assert( list_empty( &timerqueue.pending_timers ) ); 1325 RtlWakeAllConditionVariable( &timerqueue.update_event ); 1326 } 1327 1328 timer->u.timer.timer_initialized = FALSE; 1329 } 1330 RtlLeaveCriticalSection( &timerqueue.cs ); 1331} 1332 1333/*********************************************************************** 1334 * waitqueue_thread_proc (internal) 1335 */ 1336#ifdef __REACTOS__ 1337void NTAPI waitqueue_thread_proc(PVOID param ) 1338#else 1339static void CALLBACK waitqueue_thread_proc( void *param ) 1340#endif 1341{ 1342 struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS]; 1343 HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1]; 1344 struct waitqueue_bucket *bucket = param; 1345 struct threadpool_object *wait, *next; 1346 LARGE_INTEGER now, timeout; 1347 DWORD num_handles; 1348 NTSTATUS status; 1349 1350 TRACE( "starting wait queue thread\n" ); 1351 set_thread_name(L"wine_threadpool_waitqueue"); 1352 1353 RtlEnterCriticalSection( &waitqueue.cs ); 1354 1355 for (;;) 1356 { 1357 NtQuerySystemTime( &now ); 1358 timeout.QuadPart = MAXLONGLONG; 1359 num_handles = 0; 1360 1361 LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct threadpool_object, 1362 u.wait.wait_entry ) 1363 { 1364 assert( wait->type == TP_OBJECT_TYPE_WAIT ); 1365 if (wait->u.wait.timeout <= now.QuadPart) 1366 { 1367 /* Wait object timed out. */ 1368 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE)) 1369 { 1370 list_remove( &wait->u.wait.wait_entry ); 1371 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry ); 1372 } 1373 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD))) 1374 { 1375 InterlockedIncrement( &wait->refcount ); 1376 wait->num_pending_callbacks++; 1377 RtlEnterCriticalSection( &wait->pool->cs ); 1378 tp_object_execute( wait, TRUE ); 1379 RtlLeaveCriticalSection( &wait->pool->cs ); 1380 tp_object_release( wait ); 1381 } 1382 else tp_object_submit( wait, FALSE ); 1383 } 1384 else 1385 { 1386 if (wait->u.wait.timeout < timeout.QuadPart) 1387 timeout.QuadPart = wait->u.wait.timeout; 1388 1389 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS ); 1390 InterlockedIncrement( &wait->refcount ); 1391 objects[num_handles] = wait; 1392 handles[num_handles] = wait->u.wait.handle; 1393 num_handles++; 1394 } 1395 } 1396 1397 if (!bucket->objcount) 1398 { 1399 /* All wait objects have been destroyed, if no new wait objects are created 1400 * within some amount of time, then we can shutdown this thread. */ 1401 assert( num_handles == 0 ); 1402 RtlLeaveCriticalSection( &waitqueue.cs ); 1403 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000; 1404 status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, bucket->alertable, &timeout ); 1405 RtlEnterCriticalSection( &waitqueue.cs ); 1406 1407 if (status == STATUS_TIMEOUT && !bucket->objcount) 1408 break; 1409 } 1410 else 1411 { 1412 handles[num_handles] = bucket->update_event; 1413 RtlLeaveCriticalSection( &waitqueue.cs ); 1414 status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, bucket->alertable, &timeout ); 1415 RtlEnterCriticalSection( &waitqueue.cs ); 1416 1417 if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles) 1418 { 1419 wait = objects[status - STATUS_WAIT_0]; 1420 assert( wait->type == TP_OBJECT_TYPE_WAIT ); 1421 if (wait->u.wait.bucket) 1422 { 1423 /* Wait object signaled. */ 1424 assert( wait->u.wait.bucket == bucket ); 1425 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE)) 1426 { 1427 list_remove( &wait->u.wait.wait_entry ); 1428 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry ); 1429 } 1430 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD))) 1431 { 1432 wait->u.wait.signaled++; 1433 wait->num_pending_callbacks++; 1434 RtlEnterCriticalSection( &wait->pool->cs ); 1435 tp_object_execute( wait, TRUE ); 1436 RtlLeaveCriticalSection( &wait->pool->cs ); 1437 } 1438 else tp_object_submit( wait, TRUE ); 1439 } 1440 else 1441 WARN("wait object %p triggered while object was destroyed\n", wait); 1442 } 1443 1444 /* Release temporary references to wait objects. */ 1445 while (num_handles) 1446 { 1447 wait = objects[--num_handles]; 1448 assert( wait->type == TP_OBJECT_TYPE_WAIT ); 1449 tp_object_release( wait ); 1450 } 1451 } 1452 1453 /* Try to merge bucket with other threads. */ 1454 if (waitqueue.num_buckets > 1 && bucket->objcount && 1455 bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3) 1456 { 1457 struct waitqueue_bucket *other_bucket; 1458 LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry ) 1459 { 1460 if (other_bucket != bucket && other_bucket->objcount && other_bucket->alertable == bucket->alertable && 1461 other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3) 1462 { 1463 other_bucket->objcount += bucket->objcount; 1464 bucket->objcount = 0; 1465 1466 /* Update reserved list. */ 1467 LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry ) 1468 { 1469 assert( wait->type == TP_OBJECT_TYPE_WAIT ); 1470 wait->u.wait.bucket = other_bucket; 1471 } 1472 list_move_tail( &other_bucket->reserved, &bucket->reserved ); 1473 1474 /* Update waiting list. */ 1475 LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry ) 1476 { 1477 assert( wait->type == TP_OBJECT_TYPE_WAIT ); 1478 wait->u.wait.bucket = other_bucket; 1479 } 1480 list_move_tail( &other_bucket->waiting, &bucket->waiting ); 1481 1482 /* Move bucket to the end, to keep the probability of 1483 * newly added wait objects as small as possible. */ 1484 list_remove( &bucket->bucket_entry ); 1485 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry ); 1486 1487 NtSetEvent( other_bucket->update_event, NULL ); 1488 break; 1489 } 1490 } 1491 } 1492 } 1493 1494 /* Remove this bucket from the list. */ 1495 list_remove( &bucket->bucket_entry ); 1496 if (!--waitqueue.num_buckets) 1497 assert( list_empty( &waitqueue.buckets ) ); 1498 1499 RtlLeaveCriticalSection( &waitqueue.cs ); 1500 1501 TRACE( "terminating wait queue thread\n" ); 1502 1503 assert( bucket->objcount == 0 ); 1504 assert( list_empty( &bucket->reserved ) ); 1505 assert( list_empty( &bucket->waiting ) ); 1506 NtClose( bucket->update_event ); 1507 1508 RtlFreeHeap( GetProcessHeap(), 0, bucket ); 1509 RtlExitUserThread( 0 ); 1510} 1511 1512/*********************************************************************** 1513 * tp_waitqueue_lock (internal) 1514 */ 1515static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait ) 1516{ 1517 struct waitqueue_bucket *bucket; 1518 NTSTATUS status; 1519 HANDLE thread; 1520 BOOL alertable = (wait->u.wait.flags & WT_EXECUTEINIOTHREAD) != 0; 1521 assert( wait->type == TP_OBJECT_TYPE_WAIT ); 1522 1523 wait->u.wait.signaled = 0; 1524 wait->u.wait.bucket = NULL; 1525 wait->u.wait.wait_pending = FALSE; 1526 wait->u.wait.timeout = 0; 1527 wait->u.wait.handle = INVALID_HANDLE_VALUE; 1528 1529 RtlEnterCriticalSection( &waitqueue.cs ); 1530 1531 /* Try to assign to existing bucket if possible. */ 1532 LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry ) 1533 { 1534 if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS && bucket->alertable == alertable) 1535 { 1536 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry ); 1537 wait->u.wait.bucket = bucket; 1538 bucket->objcount++; 1539 1540 status = STATUS_SUCCESS; 1541 goto out; 1542 } 1543 } 1544 1545 /* Create a new bucket and corresponding worker thread. */ 1546 bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) ); 1547 if (!bucket) 1548 { 1549 status = STATUS_NO_MEMORY; 1550 goto out; 1551 } 1552 1553 bucket->objcount = 0; 1554 bucket->alertable = alertable; 1555 list_init( &bucket->reserved ); 1556 list_init( &bucket->waiting ); 1557 1558 status = NtCreateEvent( &bucket->update_event, EVENT_ALL_ACCESS, 1559 NULL, SynchronizationEvent, FALSE ); 1560 if (status) 1561 { 1562 RtlFreeHeap( GetProcessHeap(), 0, bucket ); 1563 goto out; 1564 } 1565 1566 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0, 1567 (PTHREAD_START_ROUTINE)waitqueue_thread_proc, bucket, &thread, NULL ); 1568 if (status == STATUS_SUCCESS) 1569 { 1570 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry ); 1571 waitqueue.num_buckets++; 1572 1573 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry ); 1574 wait->u.wait.bucket = bucket; 1575 bucket->objcount++; 1576 1577 NtClose( thread ); 1578 } 1579 else 1580 { 1581 NtClose( bucket->update_event ); 1582 RtlFreeHeap( GetProcessHeap(), 0, bucket ); 1583 } 1584 1585out: 1586 RtlLeaveCriticalSection( &waitqueue.cs ); 1587 return status; 1588} 1589 1590/*********************************************************************** 1591 * tp_waitqueue_unlock (internal) 1592 */ 1593static void tp_waitqueue_unlock( struct threadpool_object *wait ) 1594{ 1595 assert( wait->type == TP_OBJECT_TYPE_WAIT ); 1596 1597 RtlEnterCriticalSection( &waitqueue.cs ); 1598 if (wait->u.wait.bucket) 1599 { 1600 struct waitqueue_bucket *bucket = wait->u.wait.bucket; 1601 assert( bucket->objcount > 0 ); 1602 1603 list_remove( &wait->u.wait.wait_entry ); 1604 wait->u.wait.bucket = NULL; 1605 bucket->objcount--; 1606 1607 NtSetEvent( bucket->update_event, NULL ); 1608 } 1609 RtlLeaveCriticalSection( &waitqueue.cs ); 1610} 1611 1612#ifdef __REACTOS__ 1613ULONG NTAPI ioqueue_thread_proc(PVOID param ) 1614#else 1615static void CALLBACK ioqueue_thread_proc( void *param ) 1616#endif 1617{ 1618 struct io_completion *completion; 1619 struct threadpool_object *io; 1620 IO_STATUS_BLOCK iosb; 1621#ifdef __REACTOS__ 1622 PVOID key, value; 1623#else 1624 ULONG_PTR key, value; 1625#endif 1626 BOOL destroy, skip; 1627 NTSTATUS status; 1628 1629 TRACE( "starting I/O completion thread\n" ); 1630 set_thread_name(L"wine_threadpool_ioqueue"); 1631 1632 RtlEnterCriticalSection( &ioqueue.cs ); 1633 1634 for (;;) 1635 { 1636 RtlLeaveCriticalSection( &ioqueue.cs ); 1637 if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value, &iosb, NULL ))) 1638 ERR("NtRemoveIoCompletion failed, status %#lx.\n", status); 1639 RtlEnterCriticalSection( &ioqueue.cs ); 1640 1641 destroy = skip = FALSE; 1642 io = (struct threadpool_object *)key; 1643 1644 TRACE( "io %p, iosb.Status %#lx.\n", io, iosb.Status ); 1645 1646 if (io && (io->shutdown || io->u.io.shutting_down)) 1647 { 1648 RtlEnterCriticalSection( &io->pool->cs ); 1649 if (!io->u.io.pending_count) 1650 { 1651 if (io->u.io.skipped_count) 1652 --io->u.io.skipped_count; 1653 1654 if (io->u.io.skipped_count) 1655 skip = TRUE; 1656 else 1657 destroy = TRUE; 1658 } 1659 RtlLeaveCriticalSection( &io->pool->cs ); 1660 if (skip) continue; 1661 } 1662 1663 if (destroy) 1664 { 1665 --ioqueue.objcount; 1666 TRACE( "Releasing io %p.\n", io ); 1667 io->shutdown = TRUE; 1668 tp_object_release( io ); 1669 } 1670 else if (io) 1671 { 1672 RtlEnterCriticalSection( &io->pool->cs ); 1673 1674 TRACE( "pending_count %u.\n", io->u.io.pending_count ); 1675 1676 if (io->u.io.pending_count) 1677 { 1678 --io->u.io.pending_count; 1679 if (!array_reserve((void **)&io->u.io.completions, &io->u.io.completion_max, 1680 io->u.io.completion_count + 1, sizeof(*io->u.io.completions))) 1681 { 1682 ERR( "Failed to allocate memory.\n" ); 1683 RtlLeaveCriticalSection( &io->pool->cs ); 1684 continue; 1685 } 1686 1687 completion = &io->u.io.completions[io->u.io.completion_count++]; 1688 completion->iosb = iosb; 1689#ifdef __REACTOS__ 1690 completion->cvalue = (ULONG_PTR)value; 1691#else 1692 completion->cvalue = value; 1693#endif 1694 1695 tp_object_submit( io, FALSE ); 1696 } 1697 RtlLeaveCriticalSection( &io->pool->cs ); 1698 } 1699 1700 if (!ioqueue.objcount) 1701 { 1702 /* All I/O objects have been destroyed; if no new objects are 1703 * created within some amount of time, then we can shutdown this 1704 * thread. */ 1705 LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000}; 1706 if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs, 1707 &timeout) == STATUS_TIMEOUT && !ioqueue.objcount) 1708 break; 1709 } 1710 } 1711 1712 ioqueue.thread_running = FALSE; 1713 RtlLeaveCriticalSection( &ioqueue.cs ); 1714 1715 TRACE( "terminating I/O completion thread\n" ); 1716 1717 RtlExitUserThread( 0 ); 1718 1719#ifdef __REACTOS__ 1720 return STATUS_SUCCESS; 1721#endif 1722} 1723 1724static NTSTATUS tp_ioqueue_lock( struct threadpool_object *io, HANDLE file ) 1725{ 1726 NTSTATUS status = STATUS_SUCCESS; 1727 1728 assert( io->type == TP_OBJECT_TYPE_IO ); 1729 1730 RtlEnterCriticalSection( &ioqueue.cs ); 1731 1732 if (!ioqueue.port && (status = NtCreateIoCompletion( &ioqueue.port, 1733 IO_COMPLETION_ALL_ACCESS, NULL, 0 ))) 1734 { 1735 RtlLeaveCriticalSection( &ioqueue.cs ); 1736 return status; 1737 } 1738 1739 if (!ioqueue.thread_running) 1740 { 1741 HANDLE thread; 1742 1743 if (!(status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 1744 0, 0, 0, ioqueue_thread_proc, NULL, &thread, NULL ))) 1745 { 1746 ioqueue.thread_running = TRUE; 1747 NtClose( thread ); 1748 } 1749 } 1750 1751 if (status == STATUS_SUCCESS) 1752 { 1753 FILE_COMPLETION_INFORMATION info; 1754 IO_STATUS_BLOCK iosb; 1755 1756#ifdef __REACTOS__ 1757 info.Port = ioqueue.port; 1758 info.Key = io; 1759#else 1760 info.CompletionPort = ioqueue.port; 1761 info.CompletionKey = (ULONG_PTR)io; 1762#endif 1763 1764 status = NtSetInformationFile( file, &iosb, &info, sizeof(info), FileCompletionInformation ); 1765 } 1766 1767 if (status == STATUS_SUCCESS) 1768 { 1769 if (!ioqueue.objcount++) 1770 RtlWakeConditionVariable( &ioqueue.update_event ); 1771 } 1772 1773 RtlLeaveCriticalSection( &ioqueue.cs ); 1774 return status; 1775} 1776 1777/*********************************************************************** 1778 * tp_threadpool_alloc (internal) 1779 * 1780 * Allocates a new threadpool object. 1781 */ 1782static NTSTATUS tp_threadpool_alloc( struct threadpool **out ) 1783{ 1784#ifdef __REACTOS__ 1785 IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->ProcessEnvironmentBlock->ImageBaseAddress ); 1786#else 1787 IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->Peb->ImageBaseAddress ); 1788#endif 1789 struct threadpool *pool; 1790 unsigned int i; 1791 1792 pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) ); 1793 if (!pool) 1794 return STATUS_NO_MEMORY; 1795 1796 pool->refcount = 1; 1797 pool->objcount = 0; 1798 pool->shutdown = FALSE; 1799 1800#ifdef __REACTOS__ 1801 RtlInitializeCriticalSection( &pool->cs ); 1802#else 1803 RtlInitializeCriticalSectionEx( &pool->cs, 0, RTL_CRITICAL_SECTION_FLAG_FORCE_DEBUG_INFO ); 1804 1805 pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs"); 1806#endif 1807 1808 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i) 1809 list_init( &pool->pools[i] ); 1810 RtlInitializeConditionVariable( &pool->update_event ); 1811 1812 pool->max_workers = 500; 1813 pool->min_workers = 0; 1814 pool->num_workers = 0; 1815 pool->num_busy_workers = 0; 1816 pool->stack_info.StackReserve = nt->OptionalHeader.SizeOfStackReserve; 1817 pool->stack_info.StackCommit = nt->OptionalHeader.SizeOfStackCommit; 1818 1819 TRACE( "allocated threadpool %p\n", pool ); 1820 1821 *out = pool; 1822 return STATUS_SUCCESS; 1823} 1824 1825/*********************************************************************** 1826 * tp_threadpool_shutdown (internal) 1827 * 1828 * Prepares the shutdown of a threadpool object and notifies all worker 1829 * threads to terminate (after all remaining work items have been 1830 * processed). 1831 */ 1832static void tp_threadpool_shutdown( struct threadpool *pool ) 1833{ 1834 assert( pool != default_threadpool ); 1835 1836 pool->shutdown = TRUE; 1837 RtlWakeAllConditionVariable( &pool->update_event ); 1838} 1839 1840/*********************************************************************** 1841 * tp_threadpool_release (internal) 1842 * 1843 * Releases a reference to a threadpool object. 1844 */ 1845static BOOL tp_threadpool_release( struct threadpool *pool ) 1846{ 1847 unsigned int i; 1848 1849 if (InterlockedDecrement( &pool->refcount )) 1850 return FALSE; 1851 1852 TRACE( "destroying threadpool %p\n", pool ); 1853 1854 assert( pool->shutdown ); 1855 assert( !pool->objcount ); 1856 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i) 1857 assert( list_empty( &pool->pools[i] ) ); 1858#ifndef __REACTOS__ 1859 pool->cs.DebugInfo->Spare[0] = 0; 1860#endif 1861 RtlDeleteCriticalSection( &pool->cs ); 1862 1863 RtlFreeHeap( GetProcessHeap(), 0, pool ); 1864 return TRUE; 1865} 1866 1867/*********************************************************************** 1868 * tp_threadpool_lock (internal) 1869 * 1870 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON 1871 * block. When the lock is acquired successfully, it is guaranteed that 1872 * there is at least one worker thread to process tasks. 1873 */ 1874static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON *environment ) 1875{ 1876 struct threadpool *pool = NULL; 1877 NTSTATUS status = STATUS_SUCCESS; 1878 1879 if (environment) 1880 { 1881#ifndef __REACTOS__ //Windows 7 stuff 1882 /* Validate environment parameters. */ 1883 if (environment->Version == 3) 1884 { 1885 TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3 *)environment; 1886 1887 switch (environment3->CallbackPriority) 1888 { 1889 case TP_CALLBACK_PRIORITY_HIGH: 1890 case TP_CALLBACK_PRIORITY_NORMAL: 1891 case TP_CALLBACK_PRIORITY_LOW: 1892 break; 1893 default: 1894 return STATUS_INVALID_PARAMETER; 1895 } 1896 } 1897#endif 1898 pool = (struct threadpool *)environment->Pool; 1899 } 1900 1901 if (!pool) 1902 { 1903 if (!default_threadpool) 1904 { 1905 status = tp_threadpool_alloc( &pool ); 1906 if (status != STATUS_SUCCESS) 1907 return status; 1908 1909 if (InterlockedCompareExchangePointer( (void *)&default_threadpool, pool, NULL ) != NULL) 1910 { 1911 tp_threadpool_shutdown( pool ); 1912 tp_threadpool_release( pool ); 1913 } 1914 } 1915 1916 pool = default_threadpool; 1917 } 1918 1919 RtlEnterCriticalSection( &pool->cs ); 1920 1921 /* Make sure that the threadpool has at least one thread. */ 1922 if (!pool->num_workers) 1923 status = tp_new_worker_thread( pool ); 1924 1925 /* Keep a reference, and increment objcount to ensure that the 1926 * last thread doesn't terminate. */ 1927 if (status == STATUS_SUCCESS) 1928 { 1929 InterlockedIncrement( &pool->refcount ); 1930 pool->objcount++; 1931 } 1932 1933 RtlLeaveCriticalSection( &pool->cs ); 1934 1935 if (status != STATUS_SUCCESS) 1936 return status; 1937 1938 *out = pool; 1939 return STATUS_SUCCESS; 1940} 1941 1942/*********************************************************************** 1943 * tp_threadpool_unlock (internal) 1944 * 1945 * Releases a lock on a threadpool. 1946 */ 1947static void tp_threadpool_unlock( struct threadpool *pool ) 1948{ 1949 RtlEnterCriticalSection( &pool->cs ); 1950 pool->objcount--; 1951 RtlLeaveCriticalSection( &pool->cs ); 1952 tp_threadpool_release( pool ); 1953} 1954 1955/*********************************************************************** 1956 * tp_group_alloc (internal) 1957 * 1958 * Allocates a new threadpool group object. 1959 */ 1960static NTSTATUS tp_group_alloc( struct threadpool_group **out ) 1961{ 1962 struct threadpool_group *group; 1963 1964 group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) ); 1965 if (!group) 1966 return STATUS_NO_MEMORY; 1967 1968 group->refcount = 1; 1969 group->shutdown = FALSE; 1970 1971#ifdef __REACTOS__ 1972 RtlInitializeCriticalSection( &group->cs ); 1973#else 1974 RtlInitializeCriticalSectionEx( &group->cs, 0, RTL_CRITICAL_SECTION_FLAG_FORCE_DEBUG_INFO ); 1975 1976 group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs"); 1977#endif 1978 1979 list_init( &group->members ); 1980 1981 TRACE( "allocated group %p\n", group ); 1982 1983 *out = group; 1984 return STATUS_SUCCESS; 1985} 1986 1987/*********************************************************************** 1988 * tp_group_shutdown (internal) 1989 * 1990 * Marks the group object for shutdown. 1991 */ 1992static void tp_group_shutdown( struct threadpool_group *group ) 1993{ 1994 group->shutdown = TRUE; 1995} 1996 1997/*********************************************************************** 1998 * tp_group_release (internal) 1999 * 2000 * Releases a reference to a group object. 2001 */ 2002static BOOL tp_group_release( struct threadpool_group *group ) 2003{ 2004 if (InterlockedDecrement( &group->refcount )) 2005 return FALSE; 2006 2007 TRACE( "destroying group %p\n", group ); 2008 2009 assert( group->shutdown ); 2010 assert( list_empty( &group->members ) ); 2011 2012#ifndef __REACTOS__ 2013 group->cs.DebugInfo->Spare[0] = 0; 2014#endif 2015 RtlDeleteCriticalSection( &group->cs ); 2016 2017 RtlFreeHeap( GetProcessHeap(), 0, group ); 2018 return TRUE; 2019} 2020 2021/*********************************************************************** 2022 * tp_object_initialize (internal) 2023 * 2024 * Initializes members of a threadpool object. 2025 */ 2026static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool, 2027 PVOID userdata, TP_CALLBACK_ENVIRON *environment ) 2028{ 2029 BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE); 2030 2031 object->refcount = 1; 2032 object->shutdown = FALSE; 2033 2034 object->pool = pool; 2035 object->group = NULL; 2036 object->userdata = userdata; 2037 object->group_cancel_callback = NULL; 2038 object->finalization_callback = NULL; 2039 object->may_run_long = 0; 2040 object->race_dll = NULL; 2041 object->priority = TP_CALLBACK_PRIORITY_NORMAL; 2042 2043 memset( &object->group_entry, 0, sizeof(object->group_entry) ); 2044 object->is_group_member = FALSE; 2045 2046 memset( &object->pool_entry, 0, sizeof(object->pool_entry) ); 2047 RtlInitializeConditionVariable( &object->finished_event ); 2048 RtlInitializeConditionVariable( &object->group_finished_event ); 2049 object->completed_event = NULL; 2050 object->num_pending_callbacks = 0; 2051 object->num_running_callbacks = 0; 2052 object->num_associated_callbacks = 0; 2053 2054 if (environment) 2055 { 2056 if (environment->Version != 1 && environment->Version != 3) 2057 FIXME( "unsupported environment version %lu\n", environment->Version ); 2058 2059 object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup ); 2060 object->group_cancel_callback = environment->CleanupGroupCancelCallback; 2061 object->finalization_callback = environment->FinalizationCallback; 2062 object->may_run_long = environment->u.s.LongFunction != 0; 2063 object->race_dll = environment->RaceDll; 2064#ifndef __REACTOS__ //Windows 7 stuff 2065 if (environment->Version == 3) 2066 { 2067 TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3 *)environment; 2068 2069 object->priority = environment_v3->CallbackPriority; 2070 assert( object->priority < ARRAY_SIZE(pool->pools) ); 2071 } 2072#endif 2073 if (environment->ActivationContext) 2074 FIXME( "activation context not supported yet\n" ); 2075 2076 if (environment->u.s.Persistent) 2077 FIXME( "persistent threads not supported yet\n" ); 2078 } 2079 2080 if (object->race_dll) 2081 LdrAddRefDll( 0, object->race_dll ); 2082 2083 TRACE( "allocated object %p of type %u\n", object, object->type ); 2084 2085 /* For simple callbacks we have to run tp_object_submit before adding this object 2086 * to the cleanup group. As soon as the cleanup group members are released ->shutdown 2087 * will be set, and tp_object_submit would fail with an assertion. */ 2088 2089 if (is_simple_callback) 2090 tp_object_submit( object, FALSE ); 2091 2092 if (object->group) 2093 { 2094 struct threadpool_group *group = object->group; 2095 InterlockedIncrement( &group->refcount ); 2096 2097 RtlEnterCriticalSection( &group->cs ); 2098 list_add_tail( &group->members, &object->group_entry ); 2099 object->is_group_member = TRUE; 2100 RtlLeaveCriticalSection( &group->cs ); 2101 } 2102 2103 if (is_simple_callback) 2104 tp_object_release( object ); 2105} 2106 2107static void tp_object_prio_queue( struct threadpool_object *object ) 2108{ 2109 ++object->pool->num_busy_workers; 2110 list_add_tail( &object->pool->pools[object->priority], &object->pool_entry ); 2111} 2112 2113/*********************************************************************** 2114 * tp_object_submit (internal) 2115 * 2116 * Submits a threadpool object to the associated threadpool. This 2117 * function has to be VOID because TpPostWork can never fail on Windows. 2118 */ 2119static void tp_object_submit( struct threadpool_object *object, BOOL signaled ) 2120{ 2121 struct threadpool *pool = object->pool; 2122 NTSTATUS status = STATUS_UNSUCCESSFUL; 2123 2124 assert( !object->shutdown ); 2125 assert( !pool->shutdown ); 2126 2127 RtlEnterCriticalSection( &pool->cs ); 2128 2129 /* Start new worker threads if required. */ 2130 if (pool->num_busy_workers >= pool->num_workers && 2131 pool->num_workers < pool->max_workers) 2132 status = tp_new_worker_thread( pool ); 2133 2134 /* Queue work item and increment refcount. */ 2135 InterlockedIncrement( &object->refcount ); 2136 if (!object->num_pending_callbacks++) 2137 tp_object_prio_queue( object ); 2138 2139 /* Count how often the object was signaled. */ 2140 if (object->type == TP_OBJECT_TYPE_WAIT && signaled) 2141 object->u.wait.signaled++; 2142 2143 /* No new thread started - wake up one existing thread. */ 2144 if (status != STATUS_SUCCESS) 2145 { 2146 assert( pool->num_workers > 0 ); 2147 RtlWakeConditionVariable( &pool->update_event ); 2148 } 2149 2150 RtlLeaveCriticalSection( &pool->cs ); 2151} 2152 2153/*********************************************************************** 2154 * tp_object_cancel (internal) 2155 * 2156 * Cancels all currently pending callbacks for a specific object. 2157 */ 2158static void tp_object_cancel( struct threadpool_object *object ) 2159{ 2160 struct threadpool *pool = object->pool; 2161 LONG pending_callbacks = 0; 2162 2163 RtlEnterCriticalSection( &pool->cs ); 2164 if (object->num_pending_callbacks) 2165 { 2166 pending_callbacks = object->num_pending_callbacks; 2167 object->num_pending_callbacks = 0; 2168 list_remove( &object->pool_entry ); 2169 2170 if (object->type == TP_OBJECT_TYPE_WAIT) 2171 object->u.wait.signaled = 0; 2172 } 2173 if (object->type == TP_OBJECT_TYPE_IO) 2174 { 2175 object->u.io.skipped_count += object->u.io.pending_count; 2176 object->u.io.pending_count = 0; 2177 } 2178 RtlLeaveCriticalSection( &pool->cs ); 2179 2180 while (pending_callbacks--) 2181 tp_object_release( object ); 2182} 2183 2184static BOOL object_is_finished( struct threadpool_object *object, BOOL group ) 2185{ 2186 if (object->num_pending_callbacks) 2187 return FALSE; 2188 if (object->type == TP_OBJECT_TYPE_IO && object->u.io.pending_count) 2189 return FALSE; 2190 2191 if (group) 2192 return !object->num_running_callbacks; 2193 else 2194 return !object->num_associated_callbacks; 2195} 2196 2197/*********************************************************************** 2198 * tp_object_wait (internal) 2199 * 2200 * Waits until all pending and running callbacks of a specific object 2201 * have been processed. 2202 */ 2203static void tp_object_wait( struct threadpool_object *object, BOOL group_wait ) 2204{ 2205 struct threadpool *pool = object->pool; 2206 2207 RtlEnterCriticalSection( &pool->cs ); 2208 while (!object_is_finished( object, group_wait )) 2209 { 2210 if (group_wait) 2211 RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL ); 2212 else 2213 RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL ); 2214 } 2215 RtlLeaveCriticalSection( &pool->cs ); 2216} 2217 2218static void tp_ioqueue_unlock( struct threadpool_object *io ) 2219{ 2220 assert( io->type == TP_OBJECT_TYPE_IO ); 2221 2222 RtlEnterCriticalSection( &ioqueue.cs ); 2223 2224 assert(ioqueue.objcount); 2225 2226 if (!io->shutdown && !--ioqueue.objcount) 2227 NtSetIoCompletion( ioqueue.port, 0, 0, STATUS_SUCCESS, 0 ); 2228 2229 RtlLeaveCriticalSection( &ioqueue.cs ); 2230} 2231 2232/*********************************************************************** 2233 * tp_object_prepare_shutdown (internal) 2234 * 2235 * Prepares a threadpool object for shutdown. 2236 */ 2237static void tp_object_prepare_shutdown( struct threadpool_object *object ) 2238{ 2239 if (object->type == TP_OBJECT_TYPE_TIMER) 2240 tp_timerqueue_unlock( object ); 2241 else if (object->type == TP_OBJECT_TYPE_WAIT) 2242 tp_waitqueue_unlock( object ); 2243 else if (object->type == TP_OBJECT_TYPE_IO) 2244 tp_ioqueue_unlock( object ); 2245} 2246 2247/*********************************************************************** 2248 * tp_object_release (internal) 2249 * 2250 * Releases a reference to a threadpool object. 2251 */ 2252static BOOL tp_object_release( struct threadpool_object *object ) 2253{ 2254 if (InterlockedDecrement( &object->refcount )) 2255 return FALSE; 2256 2257 TRACE( "destroying object %p of type %u\n", object, object->type ); 2258 2259 assert( object->shutdown ); 2260 assert( !object->num_pending_callbacks ); 2261 assert( !object->num_running_callbacks ); 2262 assert( !object->num_associated_callbacks ); 2263 2264 /* release reference to the group */ 2265 if (object->group) 2266 { 2267 struct threadpool_group *group = object->group; 2268 2269 RtlEnterCriticalSection( &group->cs ); 2270 if (object->is_group_member) 2271 { 2272 list_remove( &object->group_entry ); 2273 object->is_group_member = FALSE; 2274 } 2275 RtlLeaveCriticalSection( &group->cs ); 2276 2277 tp_group_release( group ); 2278 } 2279 2280 tp_threadpool_unlock( object->pool ); 2281 2282 if (object->race_dll) 2283 LdrUnloadDll( object->race_dll ); 2284 2285 if (object->completed_event && object->completed_event != INVALID_HANDLE_VALUE) 2286 NtSetEvent( object->completed_event, NULL ); 2287 2288 RtlFreeHeap( GetProcessHeap(), 0, object ); 2289 return TRUE; 2290} 2291 2292static struct list *threadpool_get_next_item( const struct threadpool *pool ) 2293{ 2294 struct list *ptr; 2295 unsigned int i; 2296 2297 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i) 2298 { 2299 if ((ptr = list_head( &pool->pools[i] ))) 2300 break; 2301 } 2302 2303 return ptr; 2304} 2305 2306/*********************************************************************** 2307 * tp_object_execute (internal) 2308 * 2309 * Executes a threadpool object callback, object->pool->cs has to be 2310 * held. 2311 */ 2312static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread ) 2313{ 2314 TP_CALLBACK_INSTANCE *callback_instance; 2315 struct threadpool_instance instance; 2316 struct io_completion completion; 2317 struct threadpool *pool = object->pool; 2318 TP_WAIT_RESULT wait_result = 0; 2319 NTSTATUS status; 2320 2321 object->num_pending_callbacks--; 2322 2323 /* For wait objects check if they were signaled or have timed out. */ 2324 if (object->type == TP_OBJECT_TYPE_WAIT) 2325 { 2326 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT; 2327 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--; 2328 } 2329 else if (object->type == TP_OBJECT_TYPE_IO) 2330 { 2331 assert( object->u.io.completion_count ); 2332 completion = object->u.io.completions[--object->u.io.completion_count]; 2333 } 2334 2335 /* Leave critical section and do the actual callback. */ 2336 object->num_associated_callbacks++; 2337 object->num_running_callbacks++; 2338 RtlLeaveCriticalSection( &pool->cs ); 2339 if (wait_thread) RtlLeaveCriticalSection( &waitqueue.cs ); 2340 2341 /* Initialize threadpool instance struct. */ 2342 callback_instance = (TP_CALLBACK_INSTANCE *)&instance; 2343 instance.object = object; 2344 instance.threadid = GetCurrentThreadId(); 2345 instance.associated = TRUE; 2346 instance.may_run_long = object->may_run_long; 2347 instance.cleanup.critical_section = NULL; 2348 instance.cleanup.mutex = NULL; 2349 instance.cleanup.semaphore = NULL; 2350 instance.cleanup.semaphore_count = 0; 2351 instance.cleanup.event = NULL; 2352 instance.cleanup.library = NULL; 2353 2354 switch (object->type) 2355 { 2356 case TP_OBJECT_TYPE_SIMPLE: 2357 { 2358 TRACE( "executing simple callback %p(%p, %p)\n", 2359 object->u.simple.callback, callback_instance, object->userdata ); 2360 object->u.simple.callback( callback_instance, object->userdata ); 2361 TRACE( "callback %p returned\n", object->u.simple.callback ); 2362 break; 2363 } 2364 2365 case TP_OBJECT_TYPE_WORK: 2366 { 2367 TRACE( "executing work callback %p(%p, %p, %p)\n", 2368 object->u.work.callback, callback_instance, object->userdata, object ); 2369 object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object ); 2370 TRACE( "callback %p returned\n", object->u.work.callback ); 2371 break; 2372 } 2373 2374 case TP_OBJECT_TYPE_TIMER: 2375 { 2376 TRACE( "executing timer callback %p(%p, %p, %p)\n", 2377 object->u.timer.callback, callback_instance, object->userdata, object ); 2378 object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object ); 2379 TRACE( "callback %p returned\n", object->u.timer.callback ); 2380 break; 2381 } 2382 2383 case TP_OBJECT_TYPE_WAIT: 2384 { 2385 TRACE( "executing wait callback %p(%p, %p, %p, %lu)\n", 2386 object->u.wait.callback, callback_instance, object->userdata, object, wait_result ); 2387 object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result ); 2388 TRACE( "callback %p returned\n", object->u.wait.callback ); 2389 break; 2390 } 2391 2392 case TP_OBJECT_TYPE_IO: 2393 { 2394 TRACE( "executing I/O callback %p(%p, %p, %#Ix, %p, %p)\n", 2395 object->u.io.callback, callback_instance, object->userdata, 2396 completion.cvalue, &completion.iosb, (TP_IO *)object ); 2397 object->u.io.callback( callback_instance, object->userdata, 2398 (void *)completion.cvalue, &completion.iosb, (TP_IO *)object ); 2399 TRACE( "callback %p returned\n", object->u.io.callback ); 2400 break; 2401 } 2402 2403 default: 2404 assert(0); 2405 break; 2406 } 2407 2408 /* Execute finalization callback. */ 2409 if (object->finalization_callback) 2410 { 2411 TRACE( "executing finalization callback %p(%p, %p)\n", 2412 object->finalization_callback, callback_instance, object->userdata ); 2413 object->finalization_callback( callback_instance, object->userdata ); 2414 TRACE( "callback %p returned\n", object->finalization_callback ); 2415 } 2416 2417 /* Execute cleanup tasks. */ 2418 if (instance.cleanup.critical_section) 2419 { 2420 RtlLeaveCriticalSection( instance.cleanup.critical_section ); 2421 } 2422 if (instance.cleanup.mutex) 2423 { 2424 status = NtReleaseMutant( instance.cleanup.mutex, NULL ); 2425 if (status != STATUS_SUCCESS) goto skip_cleanup; 2426 } 2427 if (instance.cleanup.semaphore) 2428 { 2429 status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL ); 2430 if (status != STATUS_SUCCESS) goto skip_cleanup; 2431 } 2432 if (instance.cleanup.event) 2433 { 2434 status = NtSetEvent( instance.cleanup.event, NULL ); 2435 if (status != STATUS_SUCCESS) goto skip_cleanup; 2436 } 2437 if (instance.cleanup.library) 2438 { 2439 LdrUnloadDll( instance.cleanup.library ); 2440 } 2441 2442skip_cleanup: 2443 if (wait_thread) RtlEnterCriticalSection( &waitqueue.cs ); 2444 RtlEnterCriticalSection( &pool->cs ); 2445 2446 /* Simple callbacks are automatically shutdown after execution. */ 2447 if (object->type == TP_OBJECT_TYPE_SIMPLE) 2448 { 2449 tp_object_prepare_shutdown( object ); 2450 object->shutdown = TRUE; 2451 } 2452 2453 object->num_running_callbacks--; 2454 if (object_is_finished( object, TRUE )) 2455 RtlWakeAllConditionVariable( &object->group_finished_event ); 2456 2457 if (instance.associated) 2458 { 2459 object->num_associated_callbacks--; 2460 if (object_is_finished( object, FALSE )) 2461 RtlWakeAllConditionVariable( &object->finished_event ); 2462 } 2463} 2464 2465/*********************************************************************** 2466 * threadpool_worker_proc (internal) 2467 */ 2468#ifdef __REACTOS__ 2469ULONG NTAPI threadpool_worker_proc(PVOID param ) 2470#else 2471static void CALLBACK threadpool_worker_proc( void *param ) 2472#endif 2473{ 2474 struct threadpool *pool = param; 2475 LARGE_INTEGER timeout; 2476 struct list *ptr; 2477 2478 TRACE( "starting worker thread for pool %p\n", pool ); 2479 set_thread_name(L"wine_threadpool_worker"); 2480 2481 RtlEnterCriticalSection( &pool->cs ); 2482 for (;;) 2483 { 2484 while ((ptr = threadpool_get_next_item( pool ))) 2485 { 2486 struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry ); 2487 assert( object->num_pending_callbacks > 0 ); 2488 2489 /* If further pending callbacks are queued, move the work item to 2490 * the end of the pool list. Otherwise remove it from the pool. */ 2491 list_remove( &object->pool_entry ); 2492 if (object->num_pending_callbacks > 1) 2493 tp_object_prio_queue( object ); 2494 2495 tp_object_execute( object, FALSE ); 2496 2497 assert(pool->num_busy_workers); 2498 pool->num_busy_workers--; 2499 2500 tp_object_release( object ); 2501 } 2502 2503 /* Shutdown worker thread if requested. */ 2504 if (pool->shutdown) 2505 break; 2506 2507 /* Wait for new tasks or until the timeout expires. A thread only terminates 2508 * when no new tasks are available, and the number of threads can be 2509 * decreased without violating the min_workers limit. An exception is when 2510 * min_workers == 0, then objcount is used to detect if the last thread 2511 * can be terminated. */ 2512 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000; 2513 if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT && 2514 !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) || 2515 (!pool->min_workers && !pool->objcount))) 2516 { 2517 break; 2518 } 2519 } 2520 pool->num_workers--; 2521 RtlLeaveCriticalSection( &pool->cs ); 2522 2523 TRACE( "terminating worker thread for pool %p\n", pool ); 2524 tp_threadpool_release( pool ); 2525 RtlExitUserThread( 0 ); 2526#ifdef __REACTOS__ 2527 return STATUS_SUCCESS; 2528#endif 2529} 2530 2531/*********************************************************************** 2532 * TpAllocCleanupGroup (NTDLL.@) 2533 */ 2534NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out ) 2535{ 2536 TRACE( "%p\n", out ); 2537 2538 return tp_group_alloc( (struct threadpool_group **)out ); 2539} 2540 2541/*********************************************************************** 2542 * TpAllocIoCompletion (NTDLL.@) 2543 */ 2544NTSTATUS WINAPI TpAllocIoCompletion( TP_IO **out, HANDLE file, PTP_IO_CALLBACK callback, 2545 void *userdata, TP_CALLBACK_ENVIRON *environment ) 2546{ 2547 struct threadpool_object *object; 2548 struct threadpool *pool; 2549 NTSTATUS status; 2550 2551 TRACE( "%p %p %p %p %p\n", out, file, callback, userdata, environment ); 2552 2553 if (!(object = RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*object) ))) 2554 return STATUS_NO_MEMORY; 2555 2556 if ((status = tp_threadpool_lock( &pool, environment ))) 2557 { 2558 RtlFreeHeap( GetProcessHeap(), 0, object ); 2559 return status; 2560 } 2561 2562 object->type = TP_OBJECT_TYPE_IO; 2563 object->u.io.callback = callback; 2564 if (!(object->u.io.completions = RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object->u.io.completions) ))) 2565 { 2566 tp_threadpool_unlock( pool ); 2567 RtlFreeHeap( GetProcessHeap(), 0, object ); 2568 return status; 2569 } 2570 2571 if ((status = tp_ioqueue_lock( object, file ))) 2572 { 2573 tp_threadpool_unlock( pool ); 2574 RtlFreeHeap( GetProcessHeap(), 0, object->u.io.completions ); 2575 RtlFreeHeap( GetProcessHeap(), 0, object ); 2576 return status; 2577 } 2578 2579 tp_object_initialize( object, pool, userdata, environment ); 2580 2581 *out = (TP_IO *)object; 2582 return STATUS_SUCCESS; 2583} 2584 2585/*********************************************************************** 2586 * TpAllocPool (NTDLL.@) 2587 */ 2588NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved ) 2589{ 2590 TRACE( "%p %p\n", out, reserved ); 2591 2592 if (reserved) 2593 FIXME( "reserved argument is nonzero (%p)\n", reserved ); 2594 2595 return tp_threadpool_alloc( (struct threadpool **)out ); 2596} 2597 2598/*********************************************************************** 2599 * TpAllocTimer (NTDLL.@) 2600 */ 2601NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID userdata, 2602 TP_CALLBACK_ENVIRON *environment ) 2603{ 2604 struct threadpool_object *object; 2605 struct threadpool *pool; 2606 NTSTATUS status; 2607 2608 TRACE( "%p %p %p %p\n", out, callback, userdata, environment ); 2609 2610 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) ); 2611 if (!object) 2612 return STATUS_NO_MEMORY; 2613 2614 status = tp_threadpool_lock( &pool, environment ); 2615 if (status) 2616 { 2617 RtlFreeHeap( GetProcessHeap(), 0, object ); 2618 return status; 2619 } 2620 2621 object->type = TP_OBJECT_TYPE_TIMER; 2622 object->u.timer.callback = callback; 2623 2624 status = tp_timerqueue_lock( object ); 2625 if (status) 2626 { 2627 tp_threadpool_unlock( pool ); 2628 RtlFreeHeap( GetProcessHeap(), 0, object ); 2629 return status; 2630 } 2631 2632 tp_object_initialize( object, pool, userdata, environment ); 2633 2634 *out = (TP_TIMER *)object; 2635 return STATUS_SUCCESS; 2636} 2637 2638static NTSTATUS tp_alloc_wait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata, 2639 TP_CALLBACK_ENVIRON *environment, DWORD flags ) 2640{ 2641 struct threadpool_object *object; 2642 struct threadpool *pool; 2643 NTSTATUS status; 2644 2645 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) ); 2646 if (!object) 2647 return STATUS_NO_MEMORY; 2648 2649 status = tp_threadpool_lock( &pool, environment ); 2650 if (status) 2651 { 2652 RtlFreeHeap( GetProcessHeap(), 0, object ); 2653 return status; 2654 } 2655 2656 object->type = TP_OBJECT_TYPE_WAIT; 2657 object->u.wait.callback = callback; 2658 object->u.wait.flags = flags; 2659 2660 status = tp_waitqueue_lock( object ); 2661 if (status) 2662 { 2663 tp_threadpool_unlock( pool ); 2664 RtlFreeHeap( GetProcessHeap(), 0, object ); 2665 return status; 2666 } 2667 2668 tp_object_initialize( object, pool, userdata, environment ); 2669 2670 *out = (TP_WAIT *)object; 2671 return STATUS_SUCCESS; 2672} 2673 2674/*********************************************************************** 2675 * TpAllocWait (NTDLL.@) 2676 */ 2677NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata, 2678 TP_CALLBACK_ENVIRON *environment ) 2679{ 2680 TRACE( "%p %p %p %p\n", out, callback, userdata, environment ); 2681 return tp_alloc_wait( out, callback, userdata, environment, WT_EXECUTEONLYONCE ); 2682} 2683 2684/*********************************************************************** 2685 * TpAllocWork (NTDLL.@) 2686 */ 2687NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata, 2688 TP_CALLBACK_ENVIRON *environment ) 2689{ 2690 struct threadpool_object *object; 2691 struct threadpool *pool; 2692 NTSTATUS status; 2693 2694 TRACE( "%p %p %p %p\n", out, callback, userdata, environment ); 2695 2696 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) ); 2697 if (!object) 2698 return STATUS_NO_MEMORY; 2699 2700 status = tp_threadpool_lock( &pool, environment ); 2701 if (status) 2702 { 2703 RtlFreeHeap( GetProcessHeap(), 0, object ); 2704 return status; 2705 } 2706 2707 object->type = TP_OBJECT_TYPE_WORK; 2708 object->u.work.callback = callback; 2709 tp_object_initialize( object, pool, userdata, environment ); 2710 2711 *out = (TP_WORK *)object; 2712 return STATUS_SUCCESS; 2713} 2714 2715/*********************************************************************** 2716 * TpCancelAsyncIoOperation (NTDLL.@) 2717 */ 2718void WINAPI TpCancelAsyncIoOperation( TP_IO *io ) 2719{ 2720 struct threadpool_object *this = impl_from_TP_IO( io ); 2721 2722 TRACE( "%p\n", io ); 2723 2724 RtlEnterCriticalSection( &this->pool->cs ); 2725 2726 TRACE("pending_count %u.\n", this->u.io.pending_count); 2727 2728 this->u.io.pending_count--; 2729 if (object_is_finished( this, TRUE )) 2730 RtlWakeAllConditionVariable( &this->group_finished_event ); 2731 if (object_is_finished( this, FALSE )) 2732 RtlWakeAllConditionVariable( &this->finished_event ); 2733 2734 RtlLeaveCriticalSection( &this->pool->cs ); 2735} 2736 2737/*********************************************************************** 2738 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@) 2739 */ 2740VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit ) 2741{ 2742 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance ); 2743 2744 TRACE( "%p %p\n", instance, crit ); 2745 2746 if (!this->cleanup.critical_section) 2747 this->cleanup.critical_section = crit; 2748} 2749 2750/*********************************************************************** 2751 * TpCallbackMayRunLong (NTDLL.@) 2752 */ 2753NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance ) 2754{ 2755 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance ); 2756 struct threadpool_object *object = this->object; 2757 struct threadpool *pool; 2758 NTSTATUS status = STATUS_SUCCESS; 2759 2760 TRACE( "%p\n", instance ); 2761 2762 if (this->threadid != GetCurrentThreadId()) 2763 { 2764 ERR("called from wrong thread, ignoring\n"); 2765 return STATUS_UNSUCCESSFUL; /* FIXME */ 2766 } 2767 2768 if (this->may_run_long) 2769 return STATUS_SUCCESS; 2770 2771 pool = object->pool; 2772 RtlEnterCriticalSection( &pool->cs ); 2773 2774 /* Start new worker threads if required. */ 2775 if (pool->num_busy_workers >= pool->num_workers) 2776 { 2777 if (pool->num_workers < pool->max_workers) 2778 { 2779 status = tp_new_worker_thread( pool ); 2780 } 2781 else 2782 { 2783 status = STATUS_TOO_MANY_THREADS; 2784 } 2785 } 2786 2787 RtlLeaveCriticalSection( &pool->cs ); 2788 this->may_run_long = TRUE; 2789 return status; 2790} 2791 2792/*********************************************************************** 2793 * TpCallbackReleaseMutexOnCompletion (NTDLL.@) 2794 */ 2795VOID WINAPI TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE mutex ) 2796{ 2797 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance ); 2798 2799 TRACE( "%p %p\n", instance, mutex ); 2800 2801 if (!this->cleanup.mutex) 2802 this->cleanup.mutex = mutex; 2803} 2804 2805/*********************************************************************** 2806 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@) 2807 */ 2808VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count ) 2809{ 2810 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance ); 2811 2812 TRACE( "%p %p %lu\n", instance, semaphore, count ); 2813 2814 if (!this->cleanup.semaphore) 2815 { 2816 this->cleanup.semaphore = semaphore; 2817 this->cleanup.semaphore_count = count; 2818 } 2819} 2820 2821/*********************************************************************** 2822 * TpCallbackSetEventOnCompletion (NTDLL.@) 2823 */ 2824VOID WINAPI TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE event ) 2825{ 2826 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance ); 2827 2828 TRACE( "%p %p\n", instance, event ); 2829 2830 if (!this->cleanup.event) 2831 this->cleanup.event = event; 2832} 2833 2834/*********************************************************************** 2835 * TpCallbackUnloadDllOnCompletion (NTDLL.@) 2836 */ 2837VOID WINAPI TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE *instance, HMODULE module ) 2838{ 2839 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance ); 2840 2841 TRACE( "%p %p\n", instance, module ); 2842 2843 if (!this->cleanup.library) 2844 this->cleanup.library = module; 2845} 2846 2847/*********************************************************************** 2848 * TpDisassociateCallback (NTDLL.@) 2849 */ 2850VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance ) 2851{ 2852 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance ); 2853 struct threadpool_object *object = this->object; 2854 struct threadpool *pool; 2855 2856 TRACE( "%p\n", instance ); 2857 2858 if (this->threadid != GetCurrentThreadId()) 2859 { 2860 ERR("called from wrong thread, ignoring\n"); 2861 return; 2862 } 2863 2864 if (!this->associated) 2865 return; 2866 2867 pool = object->pool; 2868 RtlEnterCriticalSection( &pool->cs ); 2869 2870 object->num_associated_callbacks--; 2871 if (object_is_finished( object, FALSE )) 2872 RtlWakeAllConditionVariable( &object->finished_event ); 2873 2874 RtlLeaveCriticalSection( &pool->cs ); 2875 this->associated = FALSE; 2876} 2877 2878/*********************************************************************** 2879 * TpIsTimerSet (NTDLL.@) 2880 */ 2881BOOL WINAPI TpIsTimerSet( TP_TIMER *timer ) 2882{ 2883 struct threadpool_object *this = impl_from_TP_TIMER( timer ); 2884 2885 TRACE( "%p\n", timer ); 2886 2887 return this->u.timer.timer_set; 2888} 2889 2890/*********************************************************************** 2891 * TpPostWork (NTDLL.@) 2892 */ 2893VOID WINAPI TpPostWork( TP_WORK *work ) 2894{ 2895 struct threadpool_object *this = impl_from_TP_WORK( work ); 2896 2897 TRACE( "%p\n", work ); 2898 2899 tp_object_submit( this, FALSE ); 2900} 2901 2902/*********************************************************************** 2903 * TpReleaseCleanupGroup (NTDLL.@) 2904 */ 2905VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group ) 2906{ 2907 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group ); 2908 2909 TRACE( "%p\n", group ); 2910 2911 tp_group_shutdown( this ); 2912 tp_group_release( this ); 2913} 2914 2915/*********************************************************************** 2916 * TpReleaseCleanupGroupMembers (NTDLL.@) 2917 */ 2918VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata ) 2919{ 2920 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group ); 2921 struct threadpool_object *object, *next; 2922 struct list members; 2923 2924 TRACE( "%p %u %p\n", group, cancel_pending, userdata ); 2925 2926 RtlEnterCriticalSection( &this->cs ); 2927 2928 /* Unset group, increase references, and mark objects for shutdown */ 2929 LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry ) 2930 { 2931 assert( object->group == this ); 2932 assert( object->is_group_member ); 2933 2934 if (InterlockedIncrement( &object->refcount ) == 1) 2935 { 2936 /* Object is basically already destroyed, but group reference 2937 * was not deleted yet. We can safely ignore this object. */ 2938 InterlockedDecrement( &object->refcount ); 2939 list_remove( &object->group_entry ); 2940 object->is_group_member = FALSE; 2941 continue; 2942 } 2943 2944 object->is_group_member = FALSE; 2945 tp_object_prepare_shutdown( object ); 2946 } 2947 2948 /* Move members to a new temporary list */ 2949 list_init( &members ); 2950 list_move_tail( &members, &this->members ); 2951 2952 RtlLeaveCriticalSection( &this->cs ); 2953 2954 /* Cancel pending callbacks if requested */ 2955 if (cancel_pending) 2956 { 2957 LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry ) 2958 { 2959 tp_object_cancel( object ); 2960 } 2961 } 2962 2963 /* Wait for remaining callbacks to finish */ 2964 LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry ) 2965 { 2966 tp_object_wait( object, TRUE ); 2967 2968 if (!object->shutdown) 2969 { 2970 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */ 2971 if (cancel_pending && object->group_cancel_callback) 2972 { 2973 TRACE( "executing group cancel callback %p(%p, %p)\n", 2974 object->group_cancel_callback, object->userdata, userdata ); 2975 object->group_cancel_callback( object->userdata, userdata ); 2976 TRACE( "callback %p returned\n", object->group_cancel_callback ); 2977 } 2978 2979 if (object->type != TP_OBJECT_TYPE_SIMPLE) 2980 tp_object_release( object ); 2981 } 2982 2983 object->shutdown = TRUE; 2984 tp_object_release( object ); 2985 } 2986} 2987 2988/*********************************************************************** 2989 * TpReleaseIoCompletion (NTDLL.@) 2990 */ 2991void WINAPI TpReleaseIoCompletion( TP_IO *io ) 2992{ 2993 struct threadpool_object *this = impl_from_TP_IO( io ); 2994 BOOL can_destroy; 2995 2996 TRACE( "%p\n", io ); 2997 2998 RtlEnterCriticalSection( &this->pool->cs ); 2999 this->u.io.shutting_down = TRUE; 3000 can_destroy = !this->u.io.pending_count && !this->u.io.skipped_count; 3001 RtlLeaveCriticalSection( &this->pool->cs ); 3002 3003 if (can_destroy) 3004 { 3005 tp_object_prepare_shutdown( this ); 3006 this->shutdown = TRUE; 3007 tp_object_release( this ); 3008 } 3009} 3010 3011/*********************************************************************** 3012 * TpReleasePool (NTDLL.@) 3013 */ 3014VOID WINAPI TpReleasePool( TP_POOL *pool ) 3015{ 3016 struct threadpool *this = impl_from_TP_POOL( pool ); 3017 3018 TRACE( "%p\n", pool ); 3019 3020 tp_threadpool_shutdown( this ); 3021 tp_threadpool_release( this ); 3022} 3023 3024/*********************************************************************** 3025 * TpReleaseTimer (NTDLL.@) 3026 */ 3027VOID WINAPI TpReleaseTimer( TP_TIMER *timer ) 3028{ 3029 struct threadpool_object *this = impl_from_TP_TIMER( timer ); 3030 3031 TRACE( "%p\n", timer ); 3032 3033 tp_object_prepare_shutdown( this ); 3034 this->shutdown = TRUE; 3035 tp_object_release( this ); 3036} 3037 3038/*********************************************************************** 3039 * TpReleaseWait (NTDLL.@) 3040 */ 3041VOID WINAPI TpReleaseWait( TP_WAIT *wait ) 3042{ 3043 struct threadpool_object *this = impl_from_TP_WAIT( wait ); 3044 3045 TRACE( "%p\n", wait ); 3046 3047 tp_object_prepare_shutdown( this ); 3048 this->shutdown = TRUE; 3049 tp_object_release( this ); 3050} 3051 3052/*********************************************************************** 3053 * TpReleaseWork (NTDLL.@) 3054 */ 3055VOID WINAPI TpReleaseWork( TP_WORK *work ) 3056{ 3057 struct threadpool_object *this = impl_from_TP_WORK( work ); 3058 3059 TRACE( "%p\n", work ); 3060 3061 tp_object_prepare_shutdown( this ); 3062 this->shutdown = TRUE; 3063 tp_object_release( this ); 3064} 3065 3066/*********************************************************************** 3067 * TpSetPoolMaxThreads (NTDLL.@) 3068 */ 3069VOID WINAPI TpSetPoolMaxThreads( TP_POOL *pool, DWORD maximum ) 3070{ 3071 struct threadpool *this = impl_from_TP_POOL( pool ); 3072 3073 TRACE( "%p %lu\n", pool, maximum ); 3074 3075 RtlEnterCriticalSection( &this->cs ); 3076 this->max_workers = max( maximum, 1 ); 3077 this->min_workers = min( this->min_workers, this->max_workers ); 3078 RtlLeaveCriticalSection( &this->cs ); 3079} 3080 3081/*********************************************************************** 3082 * TpSetPoolMinThreads (NTDLL.@) 3083 */ 3084BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum ) 3085{ 3086 struct threadpool *this = impl_from_TP_POOL( pool ); 3087 NTSTATUS status = STATUS_SUCCESS; 3088 3089 TRACE( "%p %lu\n", pool, minimum ); 3090 3091 RtlEnterCriticalSection( &this->cs ); 3092 3093 while (this->num_workers < minimum) 3094 { 3095 status = tp_new_worker_thread( this ); 3096 if (status != STATUS_SUCCESS) 3097 break; 3098 } 3099 3100 if (status == STATUS_SUCCESS) 3101 { 3102 this->min_workers = minimum; 3103 this->max_workers = max( this->min_workers, this->max_workers ); 3104 } 3105 3106 RtlLeaveCriticalSection( &this->cs ); 3107 return !status; 3108} 3109 3110/*********************************************************************** 3111 * TpSetTimer (NTDLL.@) 3112 */ 3113VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length ) 3114{ 3115 struct threadpool_object *this = impl_from_TP_TIMER( timer ); 3116 struct threadpool_object *other_timer; 3117 BOOL submit_timer = FALSE; 3118 ULONGLONG timestamp; 3119 3120 TRACE( "%p %p %lu %lu\n", timer, timeout, period, window_length ); 3121 3122 RtlEnterCriticalSection( &timerqueue.cs ); 3123 3124 assert( this->u.timer.timer_initialized ); 3125 this->u.timer.timer_set = timeout != NULL; 3126 3127 /* Convert relative timeout to absolute timestamp and handle a timeout 3128 * of zero, which means that the timer is submitted immediately. */ 3129 if (timeout) 3130 { 3131 timestamp = timeout->QuadPart; 3132 if ((LONGLONG)timestamp < 0) 3133 { 3134 LARGE_INTEGER now; 3135 NtQuerySystemTime( &now ); 3136 timestamp = now.QuadPart - timestamp; 3137 } 3138 else if (!timestamp) 3139 { 3140 if (!period) 3141 timeout = NULL; 3142 else 3143 { 3144 LARGE_INTEGER now; 3145 NtQuerySystemTime( &now ); 3146 timestamp = now.QuadPart + (ULONGLONG)period * 10000; 3147 } 3148 submit_timer = TRUE; 3149 } 3150 } 3151 3152 /* First remove existing timeout. */ 3153 if (this->u.timer.timer_pending) 3154 { 3155 list_remove( &this->u.timer.timer_entry ); 3156 this->u.timer.timer_pending = FALSE; 3157 } 3158 3159 /* If the timer was enabled, then add it back to the queue. */ 3160 if (timeout) 3161 { 3162 this->u.timer.timeout = timestamp; 3163 this->u.timer.period = period; 3164 this->u.timer.window_length = window_length; 3165 3166 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers, 3167 struct threadpool_object, u.timer.timer_entry ) 3168 { 3169 assert( other_timer->type == TP_OBJECT_TYPE_TIMER ); 3170 if (this->u.timer.timeout < other_timer->u.timer.timeout) 3171 break; 3172 } 3173 list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry ); 3174 3175 /* Wake up the timer thread when the timeout has to be updated. */ 3176 if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry ) 3177 RtlWakeAllConditionVariable( &timerqueue.update_event ); 3178 3179 this->u.timer.timer_pending = TRUE; 3180 } 3181 3182 RtlLeaveCriticalSection( &timerqueue.cs ); 3183 3184 if (submit_timer) 3185 tp_object_submit( this, FALSE ); 3186} 3187 3188/*********************************************************************** 3189 * TpSetWait (NTDLL.@) 3190 */ 3191VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout ) 3192{ 3193 struct threadpool_object *this = impl_from_TP_WAIT( wait ); 3194 ULONGLONG timestamp = MAXLONGLONG; 3195 3196 TRACE( "%p %p %p\n", wait, handle, timeout ); 3197 3198 RtlEnterCriticalSection( &waitqueue.cs ); 3199 3200 assert( this->u.wait.bucket ); 3201 this->u.wait.handle = handle; 3202 3203 if (handle || this->u.wait.wait_pending) 3204 { 3205 struct waitqueue_bucket *bucket = this->u.wait.bucket; 3206 list_remove( &this->u.wait.wait_entry ); 3207 3208 /* Convert relative timeout to absolute timestamp. */ 3209 if (handle && timeout) 3210 { 3211 timestamp = timeout->QuadPart; 3212 if ((LONGLONG)timestamp < 0) 3213 { 3214 LARGE_INTEGER now; 3215 NtQuerySystemTime( &now ); 3216 timestamp = now.QuadPart - timestamp; 3217 } 3218 } 3219 3220 /* Add wait object back into one of the queues. */ 3221 if (handle) 3222 { 3223 list_add_tail( &bucket->waiting, &this->u.wait.wait_entry ); 3224 this->u.wait.wait_pending = TRUE; 3225 this->u.wait.timeout = timestamp; 3226 } 3227 else 3228 { 3229 list_add_tail( &bucket->reserved, &this->u.wait.wait_entry ); 3230 this->u.wait.wait_pending = FALSE; 3231 } 3232 3233 /* Wake up the wait queue thread. */ 3234 NtSetEvent( bucket->update_event, NULL ); 3235 } 3236 3237 RtlLeaveCriticalSection( &waitqueue.cs ); 3238} 3239 3240/*********************************************************************** 3241 * TpSimpleTryPost (NTDLL.@) 3242 */ 3243NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata, 3244 TP_CALLBACK_ENVIRON *environment ) 3245{ 3246 struct threadpool_object *object; 3247 struct threadpool *pool; 3248 NTSTATUS status; 3249 3250 TRACE( "%p %p %p\n", callback, userdata, environment ); 3251 3252 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) ); 3253 if (!object) 3254 return STATUS_NO_MEMORY; 3255 3256 status = tp_threadpool_lock( &pool, environment ); 3257 if (status) 3258 { 3259 RtlFreeHeap( GetProcessHeap(), 0, object ); 3260 return status; 3261 } 3262 3263 object->type = TP_OBJECT_TYPE_SIMPLE; 3264 object->u.simple.callback = callback; 3265 tp_object_initialize( object, pool, userdata, environment ); 3266 3267 return STATUS_SUCCESS; 3268} 3269 3270/*********************************************************************** 3271 * TpStartAsyncIoOperation (NTDLL.@) 3272 */ 3273void WINAPI TpStartAsyncIoOperation( TP_IO *io ) 3274{ 3275 struct threadpool_object *this = impl_from_TP_IO( io ); 3276 3277 TRACE( "%p\n", io ); 3278 3279 RtlEnterCriticalSection( &this->pool->cs ); 3280 3281 this->u.io.pending_count++; 3282 3283 RtlLeaveCriticalSection( &this->pool->cs ); 3284} 3285 3286/*********************************************************************** 3287 * TpWaitForIoCompletion (NTDLL.@) 3288 */ 3289void WINAPI TpWaitForIoCompletion( TP_IO *io, BOOL cancel_pending ) 3290{ 3291 struct threadpool_object *this = impl_from_TP_IO( io ); 3292 3293 TRACE( "%p %d\n", io, cancel_pending ); 3294 3295 if (cancel_pending) 3296 tp_object_cancel( this ); 3297 tp_object_wait( this, FALSE ); 3298} 3299 3300/*********************************************************************** 3301 * TpWaitForTimer (NTDLL.@) 3302 */ 3303VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending ) 3304{ 3305 struct threadpool_object *this = impl_from_TP_TIMER( timer ); 3306 3307 TRACE( "%p %d\n", timer, cancel_pending ); 3308 3309 if (cancel_pending) 3310 tp_object_cancel( this ); 3311 tp_object_wait( this, FALSE ); 3312} 3313 3314/*********************************************************************** 3315 * TpWaitForWait (NTDLL.@) 3316 */ 3317VOID WINAPI TpWaitForWait( TP_WAIT *wait, BOOL cancel_pending ) 3318{ 3319 struct threadpool_object *this = impl_from_TP_WAIT( wait ); 3320 3321 TRACE( "%p %d\n", wait, cancel_pending ); 3322 3323 if (cancel_pending) 3324 tp_object_cancel( this ); 3325 tp_object_wait( this, FALSE ); 3326} 3327 3328/*********************************************************************** 3329 * TpWaitForWork (NTDLL.@) 3330 */ 3331VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending ) 3332{ 3333 struct threadpool_object *this = impl_from_TP_WORK( work ); 3334 3335 TRACE( "%p %u\n", work, cancel_pending ); 3336 3337 if (cancel_pending) 3338 tp_object_cancel( this ); 3339 tp_object_wait( this, FALSE ); 3340} 3341 3342/*********************************************************************** 3343 * TpSetPoolStackInformation (NTDLL.@) 3344 */ 3345NTSTATUS WINAPI TpSetPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info ) 3346{ 3347 struct threadpool *this = impl_from_TP_POOL( pool ); 3348 3349 TRACE( "%p %p\n", pool, stack_info ); 3350 3351 if (!stack_info) 3352 return STATUS_INVALID_PARAMETER; 3353 3354 RtlEnterCriticalSection( &this->cs ); 3355 this->stack_info = *stack_info; 3356 RtlLeaveCriticalSection( &this->cs ); 3357 3358 return STATUS_SUCCESS; 3359} 3360 3361/*********************************************************************** 3362 * TpQueryPoolStackInformation (NTDLL.@) 3363 */ 3364NTSTATUS WINAPI TpQueryPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info ) 3365{ 3366 struct threadpool *this = impl_from_TP_POOL( pool ); 3367 3368 TRACE( "%p %p\n", pool, stack_info ); 3369 3370 if (!stack_info) 3371 return STATUS_INVALID_PARAMETER; 3372 3373 RtlEnterCriticalSection( &this->cs ); 3374 *stack_info = this->stack_info; 3375 RtlLeaveCriticalSection( &this->cs ); 3376 3377 return STATUS_SUCCESS; 3378} 3379 3380#ifndef __REACTOS__ 3381static void CALLBACK rtl_wait_callback( TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result ) 3382{ 3383 struct threadpool_object *object = impl_from_TP_WAIT(wait); 3384 object->u.wait.rtl_callback( userdata, result != STATUS_WAIT_0 ); 3385} 3386 3387/*********************************************************************** 3388 * RtlRegisterWait (NTDLL.@) 3389 * 3390 * Registers a wait for a handle to become signaled. 3391 * 3392 * PARAMS 3393 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it. 3394 * Object [I] Object to wait to become signaled. 3395 * Callback [I] Callback function to execute when the wait times out or the handle is signaled. 3396 * Context [I] Context to pass to the callback function when it is executed. 3397 * Milliseconds [I] Number of milliseconds to wait before timing out. 3398 * Flags [I] Flags. See notes. 3399 * 3400 * RETURNS 3401 * Success: STATUS_SUCCESS. 3402 * Failure: Any NTSTATUS code. 3403 * 3404 * NOTES 3405 * Flags can be one or more of the following: 3406 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread. 3407 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread. 3408 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent. 3409 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time. 3410 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token. 3411 */ 3412NTSTATUS WINAPI RtlRegisterWait( HANDLE *out, HANDLE handle, RTL_WAITORTIMERCALLBACKFUNC callback, 3413 void *context, ULONG milliseconds, ULONG flags ) 3414{ 3415 struct threadpool_object *object; 3416 TP_CALLBACK_ENVIRON environment; 3417 LARGE_INTEGER timeout; 3418 NTSTATUS status; 3419 TP_WAIT *wait; 3420 3421 TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %lu, flags %lx\n", 3422 out, handle, callback, context, milliseconds, flags ); 3423 3424 memset( &environment, 0, sizeof(environment) ); 3425 environment.Version = 1; 3426 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0; 3427 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0; 3428 3429 flags &= (WT_EXECUTEONLYONCE | WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD); 3430 if ((status = tp_alloc_wait( &wait, rtl_wait_callback, context, &environment, flags ))) 3431 return status; 3432 3433 object = impl_from_TP_WAIT(wait); 3434 object->u.wait.rtl_callback = callback; 3435 3436 RtlEnterCriticalSection( &waitqueue.cs ); 3437 TpSetWait( (TP_WAIT *)object, handle, get_nt_timeout( &timeout, milliseconds ) ); 3438 3439 *out = object; 3440 RtlLeaveCriticalSection( &waitqueue.cs ); 3441 3442 return STATUS_SUCCESS; 3443} 3444 3445/*********************************************************************** 3446 * RtlDeregisterWaitEx (NTDLL.@) 3447 * 3448 * Cancels a wait operation and frees the resources associated with calling 3449 * RtlRegisterWait(). 3450 * 3451 * PARAMS 3452 * WaitObject [I] Handle to the wait object to free. 3453 * 3454 * RETURNS 3455 * Success: STATUS_SUCCESS. 3456 * Failure: Any NTSTATUS code. 3457 */ 3458NTSTATUS WINAPI RtlDeregisterWaitEx( HANDLE handle, HANDLE event ) 3459{ 3460 struct threadpool_object *object = handle; 3461 NTSTATUS status; 3462 3463 TRACE( "handle %p, event %p\n", handle, event ); 3464 3465 if (!object) return STATUS_INVALID_HANDLE; 3466 3467 TpSetWait( (TP_WAIT *)object, NULL, NULL ); 3468 3469 if (event == INVALID_HANDLE_VALUE) TpWaitForWait( (TP_WAIT *)object, TRUE ); 3470 else 3471 { 3472 assert( object->completed_event == NULL ); 3473 object->completed_event = event; 3474 } 3475 3476 RtlEnterCriticalSection( &object->pool->cs ); 3477 if (object->num_pending_callbacks + object->num_running_callbacks 3478 + object->num_associated_callbacks) status = STATUS_PENDING; 3479 else status = STATUS_SUCCESS; 3480 RtlLeaveCriticalSection( &object->pool->cs ); 3481 3482 TpReleaseWait( (TP_WAIT *)object ); 3483 return status; 3484} 3485 3486/*********************************************************************** 3487 * RtlDeregisterWait (NTDLL.@) 3488 * 3489 * Cancels a wait operation and frees the resources associated with calling 3490 * RtlRegisterWait(). 3491 * 3492 * PARAMS 3493 * WaitObject [I] Handle to the wait object to free. 3494 * 3495 * RETURNS 3496 * Success: STATUS_SUCCESS. 3497 * Failure: Any NTSTATUS code. 3498 */ 3499NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle) 3500{ 3501 return RtlDeregisterWaitEx(WaitHandle, NULL); 3502} 3503#endif 3504 3505#ifdef __REACTOS__ 3506VOID 3507NTAPI 3508RtlpInitializeThreadPooling( 3509 VOID) 3510{ 3511 RtlInitializeCriticalSection(&old_threadpool.threadpool_compl_cs); 3512 RtlInitializeCriticalSection(&timerqueue.cs); 3513 RtlInitializeCriticalSection(&waitqueue.cs); 3514 RtlInitializeCriticalSection(&ioqueue.cs); 3515} 3516#endif