Reactos
1/*
2 * Thread pooling
3 *
4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
20 */
21
22
23#ifdef __REACTOS__
24#include <rtl_vista.h>
25#define NDEBUG
26#include "wine/list.h"
27#include <debug.h>
28
29#define ERR(fmt, ...) DPRINT1(fmt, ##__VA_ARGS__)
30#define FIXME(fmt, ...) DPRINT(fmt, ##__VA_ARGS__)
31#define WARN(fmt, ...) DPRINT(fmt, ##__VA_ARGS__)
32#define TRACE(fmt, ...) DPRINT(fmt, ##__VA_ARGS__)
33#ifndef ARRAY_SIZE
34#define ARRAY_SIZE(_x) (sizeof((_x))/sizeof((_x)[0]))
35#endif
36
37typedef void (CALLBACK *PNTAPCFUNC)(ULONG_PTR,ULONG_PTR,ULONG_PTR);
38typedef void (CALLBACK *PRTL_THREAD_START_ROUTINE)(LPVOID);
39typedef DWORD (CALLBACK *PRTL_WORK_ITEM_ROUTINE)(LPVOID);
40typedef void (NTAPI *RTL_WAITORTIMERCALLBACKFUNC)(PVOID,BOOLEAN);
41typedef VOID (CALLBACK *PRTL_OVERLAPPED_COMPLETION_ROUTINE)(DWORD,DWORD,LPVOID);
42
43typedef void (CALLBACK *PTP_IO_CALLBACK)(PTP_CALLBACK_INSTANCE,void*,void*,IO_STATUS_BLOCK*,PTP_IO);
44NTSYSAPI NTSTATUS WINAPI TpSimpleTryPost(PTP_SIMPLE_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
45#define PRTL_WORK_ITEM_ROUTINE WORKERCALLBACKFUNC
46
47#define CRITICAL_SECTION RTL_CRITICAL_SECTION
48#define GetProcessHeap() RtlGetProcessHeap()
49#define GetCurrentProcess() NtCurrentProcess()
50#define GetCurrentThread() NtCurrentThread()
51#define GetCurrentThreadId() HandleToULong(NtCurrentTeb()->ClientId.UniqueThread)
52#else
53#include <assert.h>
54#include <stdarg.h>
55#include <limits.h>
56
57#include "ntstatus.h"
58#define WIN32_NO_STATUS
59#include "winternl.h"
60
61#include "wine/debug.h"
62#include "wine/list.h"
63
64#include "ntdll_misc.h"
65
66WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
67#endif
68
69/*
70 * Old thread pooling API
71 */
72
73struct rtl_work_item
74{
75 PRTL_WORK_ITEM_ROUTINE function;
76 PVOID context;
77};
78
79#define EXPIRE_NEVER (~(ULONGLONG)0)
80#define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
81
82#ifndef __REACTOS__
83static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug;
84#endif
85
86static struct
87{
88 HANDLE compl_port;
89 RTL_CRITICAL_SECTION threadpool_compl_cs;
90}
91old_threadpool =
92{
93 NULL, /* compl_port */
94#ifdef __REACTOS__
95 {0}, /* threadpool_compl_cs */
96#else
97 { &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
98#endif
99};
100
101#ifndef __REACTOS__
102static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
103{
104 0, 0, &old_threadpool.threadpool_compl_cs,
105 { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
106 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
107};
108#endif
109
110struct timer_queue;
111struct queue_timer
112{
113 struct timer_queue *q;
114 struct list entry;
115 ULONG runcount; /* number of callbacks pending execution */
116 RTL_WAITORTIMERCALLBACKFUNC callback;
117 PVOID param;
118 DWORD period;
119 ULONG flags;
120 ULONGLONG expire;
121 BOOL destroy; /* timer should be deleted; once set, never unset */
122 HANDLE event; /* removal event */
123};
124
125struct timer_queue
126{
127 DWORD magic;
128 RTL_CRITICAL_SECTION cs;
129 struct list timers; /* sorted by expiration time */
130 BOOL quit; /* queue should be deleted; once set, never unset */
131 HANDLE event;
132 HANDLE thread;
133};
134
135/*
136 * Object-oriented thread pooling API
137 */
138
139#define THREADPOOL_WORKER_TIMEOUT 5000
140#define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
141
142/* internal threadpool representation */
143struct threadpool
144{
145 LONG refcount;
146 LONG objcount;
147 BOOL shutdown;
148 CRITICAL_SECTION cs;
149 /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
150 struct list pools[3];
151 RTL_CONDITION_VARIABLE update_event;
152 /* information about worker threads, locked via .cs */
153 int max_workers;
154 int min_workers;
155 int num_workers;
156 int num_busy_workers;
157 HANDLE compl_port;
158 TP_POOL_STACK_INFORMATION stack_info;
159};
160
161enum threadpool_objtype
162{
163 TP_OBJECT_TYPE_SIMPLE,
164 TP_OBJECT_TYPE_WORK,
165 TP_OBJECT_TYPE_TIMER,
166 TP_OBJECT_TYPE_WAIT,
167 TP_OBJECT_TYPE_IO,
168};
169
170struct io_completion
171{
172 IO_STATUS_BLOCK iosb;
173 ULONG_PTR cvalue;
174};
175
176/* internal threadpool object representation */
177struct threadpool_object
178{
179 void *win32_callback; /* leave space for kernelbase to store win32 callback */
180 LONG refcount;
181 BOOL shutdown;
182 /* read-only information */
183 enum threadpool_objtype type;
184 struct threadpool *pool;
185 struct threadpool_group *group;
186 PVOID userdata;
187 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback;
188 PTP_SIMPLE_CALLBACK finalization_callback;
189 BOOL may_run_long;
190 HMODULE race_dll;
191 TP_CALLBACK_PRIORITY priority;
192 /* information about the group, locked via .group->cs */
193 struct list group_entry;
194 BOOL is_group_member;
195 /* information about the pool, locked via .pool->cs */
196 struct list pool_entry;
197 RTL_CONDITION_VARIABLE finished_event;
198 RTL_CONDITION_VARIABLE group_finished_event;
199 HANDLE completed_event;
200 LONG num_pending_callbacks;
201 LONG num_running_callbacks;
202 LONG num_associated_callbacks;
203 /* arguments for callback */
204 union
205 {
206 struct
207 {
208 PTP_SIMPLE_CALLBACK callback;
209 } simple;
210 struct
211 {
212 PTP_WORK_CALLBACK callback;
213 } work;
214 struct
215 {
216 PTP_TIMER_CALLBACK callback;
217 /* information about the timer, locked via timerqueue.cs */
218 BOOL timer_initialized;
219 BOOL timer_pending;
220 struct list timer_entry;
221 BOOL timer_set;
222 ULONGLONG timeout;
223 LONG period;
224 LONG window_length;
225 } timer;
226 struct
227 {
228 PTP_WAIT_CALLBACK callback;
229 LONG signaled;
230 /* information about the wait object, locked via waitqueue.cs */
231 struct waitqueue_bucket *bucket;
232 BOOL wait_pending;
233 struct list wait_entry;
234 ULONGLONG timeout;
235 HANDLE handle;
236 DWORD flags;
237 RTL_WAITORTIMERCALLBACKFUNC rtl_callback;
238 } wait;
239 struct
240 {
241 PTP_IO_CALLBACK callback;
242 /* locked via .pool->cs */
243 unsigned int pending_count, skipped_count, completion_count, completion_max;
244 BOOL shutting_down;
245 struct io_completion *completions;
246 } io;
247 } u;
248};
249
250/* internal threadpool instance representation */
251struct threadpool_instance
252{
253 struct threadpool_object *object;
254 DWORD threadid;
255 BOOL associated;
256 BOOL may_run_long;
257 struct
258 {
259 CRITICAL_SECTION *critical_section;
260 HANDLE mutex;
261 HANDLE semaphore;
262 LONG semaphore_count;
263 HANDLE event;
264 HMODULE library;
265 } cleanup;
266};
267
268/* internal threadpool group representation */
269struct threadpool_group
270{
271 LONG refcount;
272 BOOL shutdown;
273 CRITICAL_SECTION cs;
274 /* list of group members, locked via .cs */
275 struct list members;
276};
277
278#ifndef __REACTOS__
279/* global timerqueue object */
280static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug;
281#endif
282
283static struct
284{
285 CRITICAL_SECTION cs;
286 LONG objcount;
287 BOOL thread_running;
288 struct list pending_timers;
289 RTL_CONDITION_VARIABLE update_event;
290}
291timerqueue =
292{
293#ifdef __REACTOS__
294 {0}, /* cs */
295#else
296 { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
297#endif
298 0, /* objcount */
299 FALSE, /* thread_running */
300 LIST_INIT( timerqueue.pending_timers ), /* pending_timers */
301#if __REACTOS__
302 0,
303#else
304 RTL_CONDITION_VARIABLE_INIT /* update_event */
305#endif
306};
307
308#ifndef __REACTOS__
309static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
310{
311 0, 0, &timerqueue.cs,
312 { &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList },
313 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
314};
315
316/* global waitqueue object */
317static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug;
318#endif
319
320static struct
321{
322 CRITICAL_SECTION cs;
323 LONG num_buckets;
324 struct list buckets;
325}
326waitqueue =
327{
328#ifdef __REACTOS__
329 {0}, /* cs */
330#else
331 { &waitqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
332#endif
333 0, /* num_buckets */
334 LIST_INIT( waitqueue.buckets ) /* buckets */
335};
336
337#ifndef __REACTOS__
338static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug =
339{
340 0, 0, &waitqueue.cs,
341 { &waitqueue_debug.ProcessLocksList, &waitqueue_debug.ProcessLocksList },
342 0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
343};
344#endif
345
346struct waitqueue_bucket
347{
348 struct list bucket_entry;
349 LONG objcount;
350 struct list reserved;
351 struct list waiting;
352 HANDLE update_event;
353 BOOL alertable;
354};
355
356#ifndef __REACTOS__
357/* global I/O completion queue object */
358static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug;
359#endif
360
361static struct
362{
363 CRITICAL_SECTION cs;
364 LONG objcount;
365 BOOL thread_running;
366 HANDLE port;
367 RTL_CONDITION_VARIABLE update_event;
368}
369ioqueue =
370{
371#ifdef __REACTOS__
372 .cs = {0},
373#else
374 .cs = { &ioqueue_debug, -1, 0, 0, 0, 0 },
375#endif
376};
377
378#ifndef __REACTOS__
379static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug =
380{
381 0, 0, &ioqueue.cs,
382 { &ioqueue_debug.ProcessLocksList, &ioqueue_debug.ProcessLocksList },
383 0, 0, { (DWORD_PTR)(__FILE__ ": ioqueue.cs") }
384};
385#endif
386
387static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
388{
389 return (struct threadpool *)pool;
390}
391
392static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work )
393{
394 struct threadpool_object *object = (struct threadpool_object *)work;
395 assert( object->type == TP_OBJECT_TYPE_WORK );
396 return object;
397}
398
399static inline struct threadpool_object *impl_from_TP_TIMER( TP_TIMER *timer )
400{
401 struct threadpool_object *object = (struct threadpool_object *)timer;
402 assert( object->type == TP_OBJECT_TYPE_TIMER );
403 return object;
404}
405
406static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait )
407{
408 struct threadpool_object *object = (struct threadpool_object *)wait;
409 assert( object->type == TP_OBJECT_TYPE_WAIT );
410 return object;
411}
412
413static inline struct threadpool_object *impl_from_TP_IO( TP_IO *io )
414{
415 struct threadpool_object *object = (struct threadpool_object *)io;
416 assert( object->type == TP_OBJECT_TYPE_IO );
417 return object;
418}
419
420static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
421{
422 return (struct threadpool_group *)group;
423}
424
425static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE *instance )
426{
427 return (struct threadpool_instance *)instance;
428}
429
430#ifdef __REACTOS__
431ULONG NTAPI threadpool_worker_proc(PVOID param );
432#else
433static void CALLBACK threadpool_worker_proc( void *param );
434#endif
435static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
436static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread );
437static void tp_object_prepare_shutdown( struct threadpool_object *object );
438static BOOL tp_object_release( struct threadpool_object *object );
439static struct threadpool *default_threadpool = NULL;
440
441static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
442{
443 unsigned int new_capacity, max_capacity;
444 void *new_elements;
445
446 if (count <= *capacity)
447 return TRUE;
448
449 max_capacity = ~(SIZE_T)0 / size;
450 if (count > max_capacity)
451 return FALSE;
452
453 new_capacity = max(4, *capacity);
454 while (new_capacity < count && new_capacity <= max_capacity / 2)
455 new_capacity *= 2;
456 if (new_capacity < count)
457 new_capacity = max_capacity;
458
459 if (!(new_elements = RtlReAllocateHeap( GetProcessHeap(), 0, *elements, new_capacity * size )))
460 return FALSE;
461
462 *elements = new_elements;
463 *capacity = new_capacity;
464
465 return TRUE;
466}
467
468static void set_thread_name(const WCHAR *name)
469{
470#ifndef __REACTOS__ // This is impossible on non vista+
471 THREAD_NAME_INFORMATION info;
472
473 RtlInitUnicodeString(&info.ThreadName, name);
474 NtSetInformationThread(GetCurrentThread(), ThreadNameInformation, &info, sizeof(info));
475#endif
476}
477
478#ifndef __REACTOS__
479static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void *userdata )
480{
481 struct rtl_work_item *item = userdata;
482
483 TRACE("executing %p(%p)\n", item->function, item->context);
484 item->function( item->context );
485
486 RtlFreeHeap( GetProcessHeap(), 0, item );
487}
488
489/***********************************************************************
490 * RtlQueueWorkItem (NTDLL.@)
491 *
492 * Queues a work item into a thread in the thread pool.
493 *
494 * PARAMS
495 * function [I] Work function to execute.
496 * context [I] Context to pass to the work function when it is executed.
497 * flags [I] Flags. See notes.
498 *
499 * RETURNS
500 * Success: STATUS_SUCCESS.
501 * Failure: Any NTSTATUS code.
502 *
503 * NOTES
504 * Flags can be one or more of the following:
505 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
506 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
507 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
508 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
509 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
510 */
511NTSTATUS WINAPI RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG flags )
512{
513 TP_CALLBACK_ENVIRON environment;
514 struct rtl_work_item *item;
515 NTSTATUS status;
516
517 TRACE( "%p %p %lu\n", function, context, flags );
518
519 item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
520 if (!item)
521 return STATUS_NO_MEMORY;
522
523 memset( &environment, 0, sizeof(environment) );
524 environment.Version = 1;
525 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
526 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
527
528 item->function = function;
529 item->context = context;
530
531 status = TpSimpleTryPost( process_rtl_work_item, item, &environment );
532 if (status) RtlFreeHeap( GetProcessHeap(), 0, item );
533 return status;
534}
535
536/***********************************************************************
537 * iocp_poller - get completion events and run callbacks
538 */
539static DWORD CALLBACK iocp_poller(LPVOID Arg)
540{
541 HANDLE cport = Arg;
542
543 while( TRUE )
544 {
545 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback;
546 LPVOID overlapped;
547 IO_STATUS_BLOCK iosb;
548#ifdef __REACTOS__
549 NTSTATUS res = NtRemoveIoCompletion( cport, (PVOID)&callback, (PVOID)&overlapped, &iosb, NULL );
550#else
551 NTSTATUS res = NtRemoveIoCompletion( cport, (PULONG_PTR)&callback, (PULONG_PTR)&overlapped, &iosb, NULL );
552#endif
553 if (res)
554 {
555 ERR("NtRemoveIoCompletion failed: 0x%lx\n", res);
556 }
557 else
558 {
559 DWORD transferred = 0;
560 DWORD err = 0;
561
562 if (iosb.Status == STATUS_SUCCESS)
563 transferred = iosb.Information;
564 else
565 err = RtlNtStatusToDosError(iosb.Status);
566
567 callback( err, transferred, overlapped );
568 }
569 }
570 return 0;
571}
572
573/***********************************************************************
574 * RtlSetIoCompletionCallback (NTDLL.@)
575 *
576 * Binds a handle to a thread pool's completion port, and possibly
577 * starts a non-I/O thread to monitor this port and call functions back.
578 *
579 * PARAMS
580 * FileHandle [I] Handle to bind to a completion port.
581 * Function [I] Callback function to call on I/O completions.
582 * Flags [I] Not used.
583 *
584 * RETURNS
585 * Success: STATUS_SUCCESS.
586 * Failure: Any NTSTATUS code.
587 *
588 */
589NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
590{
591 IO_STATUS_BLOCK iosb;
592 FILE_COMPLETION_INFORMATION info;
593
594 if (Flags) FIXME("Unknown value Flags=0x%lx\n", Flags);
595
596 if (!old_threadpool.compl_port)
597 {
598 NTSTATUS res = STATUS_SUCCESS;
599
600 RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
601 if (!old_threadpool.compl_port)
602 {
603 HANDLE cport;
604
605 res = NtCreateIoCompletion( &cport, IO_COMPLETION_ALL_ACCESS, NULL, 0 );
606 if (!res)
607 {
608 /* FIXME native can start additional threads in case of e.g. hung callback function. */
609 res = RtlQueueWorkItem( iocp_poller, cport, WT_EXECUTEDEFAULT );
610 if (!res)
611 old_threadpool.compl_port = cport;
612 else
613 NtClose( cport );
614 }
615 }
616 RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
617 if (res) return res;
618 }
619
620 info.CompletionPort = old_threadpool.compl_port;
621 info.CompletionKey = (ULONG_PTR)Function;
622
623 return NtSetInformationFile( FileHandle, &iosb, &info, sizeof(info), FileCompletionInformation );
624}
625#endif
626
627static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout )
628{
629 if (timeout == INFINITE) return NULL;
630 pTime->QuadPart = (ULONGLONG)timeout * -10000;
631 return pTime;
632}
633
634/************************** Timer Queue Impl **************************/
635
636static void queue_remove_timer(struct queue_timer *t)
637{
638 /* We MUST hold the queue cs while calling this function. This ensures
639 that we cannot queue another callback for this timer. The runcount
640 being zero makes sure we don't have any already queued. */
641 struct timer_queue *q = t->q;
642
643 assert(t->runcount == 0);
644 assert(t->destroy);
645
646 list_remove(&t->entry);
647 if (t->event)
648 NtSetEvent(t->event, NULL);
649 RtlFreeHeap(GetProcessHeap(), 0, t);
650
651 if (q->quit && list_empty(&q->timers))
652 NtSetEvent(q->event, NULL);
653}
654
655static void timer_cleanup_callback(struct queue_timer *t)
656{
657 struct timer_queue *q = t->q;
658 RtlEnterCriticalSection(&q->cs);
659
660 assert(0 < t->runcount);
661 --t->runcount;
662
663 if (t->destroy && t->runcount == 0)
664 queue_remove_timer(t);
665
666 RtlLeaveCriticalSection(&q->cs);
667}
668
669static DWORD WINAPI timer_callback_wrapper(LPVOID p)
670{
671 struct queue_timer *t = p;
672 t->callback(t->param, TRUE);
673 timer_cleanup_callback(t);
674 return 0;
675}
676
677static inline ULONGLONG queue_current_time(void)
678{
679 LARGE_INTEGER now, freq;
680 NtQueryPerformanceCounter(&now, &freq);
681 return now.QuadPart * 1000 / freq.QuadPart;
682}
683
684static void queue_add_timer(struct queue_timer *t, ULONGLONG time,
685 BOOL set_event)
686{
687 /* We MUST hold the queue cs while calling this function. */
688 struct timer_queue *q = t->q;
689 struct list *ptr = &q->timers;
690
691 assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
692
693 if (time != EXPIRE_NEVER)
694 LIST_FOR_EACH(ptr, &q->timers)
695 {
696 struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
697 if (time < cur->expire)
698 break;
699 }
700 list_add_before(ptr, &t->entry);
701
702 t->expire = time;
703
704 /* If we insert at the head of the list, we need to expire sooner
705 than expected. */
706 if (set_event && &t->entry == list_head(&q->timers))
707 NtSetEvent(q->event, NULL);
708}
709
710static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
711 BOOL set_event)
712{
713 /* We MUST hold the queue cs while calling this function. */
714 list_remove(&t->entry);
715 queue_add_timer(t, time, set_event);
716}
717
718static void queue_timer_expire(struct timer_queue *q)
719{
720 struct queue_timer *t = NULL;
721
722 RtlEnterCriticalSection(&q->cs);
723 if (list_head(&q->timers))
724 {
725 ULONGLONG now, next;
726 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
727 if (!t->destroy && t->expire <= ((now = queue_current_time())))
728 {
729 ++t->runcount;
730 if (t->period)
731 {
732 next = t->expire + t->period;
733 /* avoid trigger cascade if overloaded / hibernated */
734 if (next < now)
735 next = now + t->period;
736 }
737 else
738 next = EXPIRE_NEVER;
739 queue_move_timer(t, next, FALSE);
740 }
741 else
742 t = NULL;
743 }
744 RtlLeaveCriticalSection(&q->cs);
745
746 if (t)
747 {
748 if (t->flags & WT_EXECUTEINTIMERTHREAD)
749 timer_callback_wrapper(t);
750 else
751 {
752 ULONG flags
753 = (t->flags
754 & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD
755 | WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION));
756 NTSTATUS status = RtlQueueWorkItem(timer_callback_wrapper, t, flags);
757 if (status != STATUS_SUCCESS)
758 timer_cleanup_callback(t);
759 }
760 }
761}
762
763static ULONG queue_get_timeout(struct timer_queue *q)
764{
765 struct queue_timer *t;
766 ULONG timeout = INFINITE;
767
768 RtlEnterCriticalSection(&q->cs);
769 if (list_head(&q->timers))
770 {
771 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
772 assert(!t->destroy || t->expire == EXPIRE_NEVER);
773
774 if (t->expire != EXPIRE_NEVER)
775 {
776 ULONGLONG time = queue_current_time();
777 timeout = t->expire < time ? 0 : t->expire - time;
778 }
779 }
780 RtlLeaveCriticalSection(&q->cs);
781
782 return timeout;
783}
784
785#ifdef __REACTOS__
786ULONG NTAPI timer_queue_thread_proc(PVOID p)
787#else
788static void WINAPI timer_queue_thread_proc(LPVOID p)
789#endif
790{
791 struct timer_queue *q = p;
792 ULONG timeout_ms;
793
794 set_thread_name(L"wine_threadpool_timer_queue");
795 timeout_ms = INFINITE;
796 for (;;)
797 {
798 LARGE_INTEGER timeout;
799 NTSTATUS status;
800 BOOL done = FALSE;
801
802 status = NtWaitForSingleObject(
803 q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
804
805 if (status == STATUS_WAIT_0)
806 {
807 /* There are two possible ways to trigger the event. Either
808 we are quitting and the last timer got removed, or a new
809 timer got put at the head of the list so we need to adjust
810 our timeout. */
811 RtlEnterCriticalSection(&q->cs);
812 if (q->quit && list_empty(&q->timers))
813 done = TRUE;
814 RtlLeaveCriticalSection(&q->cs);
815 }
816 else if (status == STATUS_TIMEOUT)
817 queue_timer_expire(q);
818
819 if (done)
820 break;
821
822 timeout_ms = queue_get_timeout(q);
823 }
824
825 NtClose(q->event);
826 RtlDeleteCriticalSection(&q->cs);
827 q->magic = 0;
828 RtlFreeHeap(GetProcessHeap(), 0, q);
829 RtlExitUserThread( 0 );
830#ifdef __REACTOS__
831 return STATUS_SUCCESS;
832#endif
833}
834
835#ifndef __REACTOS__
836
837static void queue_destroy_timer(struct queue_timer *t)
838{
839 /* We MUST hold the queue cs while calling this function. */
840 t->destroy = TRUE;
841 if (t->runcount == 0)
842 /* Ensure a timer is promptly removed. If callbacks are pending,
843 it will be removed after the last one finishes by the callback
844 cleanup wrapper. */
845 queue_remove_timer(t);
846 else
847 /* Make sure no destroyed timer masks an active timer at the head
848 of the sorted list. */
849 queue_move_timer(t, EXPIRE_NEVER, FALSE);
850}
851
852/***********************************************************************
853 * RtlCreateTimerQueue (NTDLL.@)
854 *
855 * Creates a timer queue object and returns a handle to it.
856 *
857 * PARAMS
858 * NewTimerQueue [O] The newly created queue.
859 *
860 * RETURNS
861 * Success: STATUS_SUCCESS.
862 * Failure: Any NTSTATUS code.
863 */
864NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
865{
866 NTSTATUS status;
867 struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
868 if (!q)
869 return STATUS_NO_MEMORY;
870
871 RtlInitializeCriticalSection(&q->cs);
872 list_init(&q->timers);
873 q->quit = FALSE;
874 q->magic = TIMER_QUEUE_MAGIC;
875 status = NtCreateEvent(&q->event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
876 if (status != STATUS_SUCCESS)
877 {
878 RtlFreeHeap(GetProcessHeap(), 0, q);
879 return status;
880 }
881 status = RtlCreateUserThread(GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
882 timer_queue_thread_proc, q, &q->thread, NULL);
883 if (status != STATUS_SUCCESS)
884 {
885 NtClose(q->event);
886 RtlFreeHeap(GetProcessHeap(), 0, q);
887 return status;
888 }
889
890 *NewTimerQueue = q;
891 return STATUS_SUCCESS;
892}
893
894/***********************************************************************
895 * RtlDeleteTimerQueueEx (NTDLL.@)
896 *
897 * Deletes a timer queue object.
898 *
899 * PARAMS
900 * TimerQueue [I] The timer queue to destroy.
901 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
902 * wait until all timers are finished firing before
903 * returning. Otherwise, return immediately and set the
904 * event when all timers are done.
905 *
906 * RETURNS
907 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
908 * Failure: Any NTSTATUS code.
909 */
910NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
911{
912 struct timer_queue *q = TimerQueue;
913 struct queue_timer *t, *temp;
914 HANDLE thread;
915 NTSTATUS status;
916
917 if (!q || q->magic != TIMER_QUEUE_MAGIC)
918 return STATUS_INVALID_HANDLE;
919
920 thread = q->thread;
921
922 RtlEnterCriticalSection(&q->cs);
923 q->quit = TRUE;
924 if (list_head(&q->timers))
925 /* When the last timer is removed, it will signal the timer thread to
926 exit... */
927 LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
928 queue_destroy_timer(t);
929 else
930 /* However if we have none, we must do it ourselves. */
931 NtSetEvent(q->event, NULL);
932 RtlLeaveCriticalSection(&q->cs);
933
934 if (CompletionEvent == INVALID_HANDLE_VALUE)
935 {
936 NtWaitForSingleObject(thread, FALSE, NULL);
937 status = STATUS_SUCCESS;
938 }
939 else
940 {
941 if (CompletionEvent)
942 {
943 FIXME("asynchronous return on completion event unimplemented\n");
944 NtWaitForSingleObject(thread, FALSE, NULL);
945 NtSetEvent(CompletionEvent, NULL);
946 }
947 status = STATUS_PENDING;
948 }
949
950 NtClose(thread);
951 return status;
952}
953
954static struct timer_queue *get_timer_queue(HANDLE TimerQueue)
955{
956 static struct timer_queue *default_timer_queue;
957
958 if (TimerQueue)
959 return TimerQueue;
960 else
961 {
962 if (!default_timer_queue)
963 {
964 HANDLE q;
965 NTSTATUS status = RtlCreateTimerQueue(&q);
966 if (status == STATUS_SUCCESS)
967 {
968 PVOID p = InterlockedCompareExchangePointer( (void **) &default_timer_queue, q, NULL );
969 if (p)
970 /* Got beat to the punch. */
971 RtlDeleteTimerQueueEx(q, NULL);
972 }
973 }
974 return default_timer_queue;
975 }
976}
977
978/***********************************************************************
979 * RtlCreateTimer (NTDLL.@)
980 *
981 * Creates a new timer associated with the given queue.
982 *
983 * PARAMS
984 * TimerQueue [I] The queue to hold the timer.
985 * NewTimer [O] The newly created timer.
986 * Callback [I] The callback to fire.
987 * Parameter [I] The argument for the callback.
988 * DueTime [I] The delay, in milliseconds, before first firing the
989 * timer.
990 * Period [I] The period, in milliseconds, at which to fire the timer
991 * after the first callback. If zero, the timer will only
992 * fire once. It still needs to be deleted with
993 * RtlDeleteTimer.
994 * Flags [I] Flags controlling the execution of the callback. In
995 * addition to the WT_* thread pool flags (see
996 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
997 * WT_EXECUTEONLYONCE are supported.
998 *
999 * RETURNS
1000 * Success: STATUS_SUCCESS.
1001 * Failure: Any NTSTATUS code.
1002 */
1003NTSTATUS WINAPI RtlCreateTimer(HANDLE TimerQueue, HANDLE *NewTimer,
1004 RTL_WAITORTIMERCALLBACKFUNC Callback,
1005 PVOID Parameter, DWORD DueTime, DWORD Period,
1006 ULONG Flags)
1007{
1008 NTSTATUS status;
1009 struct queue_timer *t;
1010 struct timer_queue *q = get_timer_queue(TimerQueue);
1011
1012 if (!q) return STATUS_NO_MEMORY;
1013 if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
1014
1015 t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
1016 if (!t)
1017 return STATUS_NO_MEMORY;
1018
1019 t->q = q;
1020 t->runcount = 0;
1021 t->callback = Callback;
1022 t->param = Parameter;
1023 t->period = Period;
1024 t->flags = Flags;
1025 t->destroy = FALSE;
1026 t->event = NULL;
1027
1028 status = STATUS_SUCCESS;
1029 RtlEnterCriticalSection(&q->cs);
1030 if (q->quit)
1031 status = STATUS_INVALID_HANDLE;
1032 else
1033 queue_add_timer(t, queue_current_time() + DueTime, TRUE);
1034 RtlLeaveCriticalSection(&q->cs);
1035
1036 if (status == STATUS_SUCCESS)
1037 *NewTimer = t;
1038 else
1039 RtlFreeHeap(GetProcessHeap(), 0, t);
1040
1041 return status;
1042}
1043
1044/***********************************************************************
1045 * RtlUpdateTimer (NTDLL.@)
1046 *
1047 * Changes the time at which a timer expires.
1048 *
1049 * PARAMS
1050 * TimerQueue [I] The queue that holds the timer.
1051 * Timer [I] The timer to update.
1052 * DueTime [I] The delay, in milliseconds, before next firing the timer.
1053 * Period [I] The period, in milliseconds, at which to fire the timer
1054 * after the first callback. If zero, the timer will not
1055 * refire once. It still needs to be deleted with
1056 * RtlDeleteTimer.
1057 *
1058 * RETURNS
1059 * Success: STATUS_SUCCESS.
1060 * Failure: Any NTSTATUS code.
1061 */
1062NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer,
1063 DWORD DueTime, DWORD Period)
1064{
1065 struct queue_timer *t = Timer;
1066 struct timer_queue *q = t->q;
1067
1068 RtlEnterCriticalSection(&q->cs);
1069 /* Can't change a timer if it was once-only or destroyed. */
1070 if (t->expire != EXPIRE_NEVER)
1071 {
1072 t->period = Period;
1073 queue_move_timer(t, queue_current_time() + DueTime, TRUE);
1074 }
1075 RtlLeaveCriticalSection(&q->cs);
1076
1077 return STATUS_SUCCESS;
1078}
1079
1080/***********************************************************************
1081 * RtlDeleteTimer (NTDLL.@)
1082 *
1083 * Cancels a timer-queue timer.
1084 *
1085 * PARAMS
1086 * TimerQueue [I] The queue that holds the timer.
1087 * Timer [I] The timer to update.
1088 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1089 * wait until the timer is finished firing all pending
1090 * callbacks before returning. Otherwise, return
1091 * immediately and set the timer is done.
1092 *
1093 * RETURNS
1094 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1095 or if the completion event is NULL.
1096 * Failure: Any NTSTATUS code.
1097 */
1098NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
1099 HANDLE CompletionEvent)
1100{
1101 struct queue_timer *t = Timer;
1102 struct timer_queue *q;
1103 NTSTATUS status = STATUS_PENDING;
1104 HANDLE event = NULL;
1105
1106 if (!Timer)
1107 return STATUS_INVALID_PARAMETER_1;
1108 q = t->q;
1109 if (CompletionEvent == INVALID_HANDLE_VALUE)
1110 {
1111 status = NtCreateEvent(&event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
1112 if (status == STATUS_SUCCESS)
1113 status = STATUS_PENDING;
1114 }
1115 else if (CompletionEvent)
1116 event = CompletionEvent;
1117
1118 RtlEnterCriticalSection(&q->cs);
1119 t->event = event;
1120 if (t->runcount == 0 && event)
1121 status = STATUS_SUCCESS;
1122 queue_destroy_timer(t);
1123 RtlLeaveCriticalSection(&q->cs);
1124
1125 if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1126 {
1127 if (status == STATUS_PENDING)
1128 {
1129 NtWaitForSingleObject(event, FALSE, NULL);
1130 status = STATUS_SUCCESS;
1131 }
1132 NtClose(event);
1133 }
1134
1135 return status;
1136}
1137#endif
1138/***********************************************************************
1139 * timerqueue_thread_proc (internal)
1140 */
1141#ifdef __REACTOS__
1142ULONG NTAPI timerqueue_thread_proc(PVOID param )
1143#else
1144static void CALLBACK timerqueue_thread_proc( void *param )
1145#endif
1146{
1147 ULONGLONG timeout_lower, timeout_upper, new_timeout;
1148 struct threadpool_object *other_timer;
1149 LARGE_INTEGER now, timeout;
1150 struct list *ptr;
1151
1152 TRACE( "starting timer queue thread\n" );
1153 set_thread_name(L"wine_threadpool_timerqueue");
1154
1155 RtlEnterCriticalSection( &timerqueue.cs );
1156 for (;;)
1157 {
1158 NtQuerySystemTime( &now );
1159
1160 /* Check for expired timers. */
1161 while ((ptr = list_head( &timerqueue.pending_timers )))
1162 {
1163 struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1164 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1165 assert( timer->u.timer.timer_pending );
1166 if (timer->u.timer.timeout > now.QuadPart)
1167 break;
1168
1169 /* Queue a new callback in one of the worker threads. */
1170 list_remove( &timer->u.timer.timer_entry );
1171 timer->u.timer.timer_pending = FALSE;
1172 tp_object_submit( timer, FALSE );
1173
1174 /* Insert the timer back into the queue, except it's marked for shutdown. */
1175 if (timer->u.timer.period && !timer->shutdown)
1176 {
1177 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1178 if (timer->u.timer.timeout <= now.QuadPart)
1179 timer->u.timer.timeout = now.QuadPart + 1;
1180
1181 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1182 struct threadpool_object, u.timer.timer_entry )
1183 {
1184 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1185 if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1186 break;
1187 }
1188 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1189 timer->u.timer.timer_pending = TRUE;
1190 }
1191 }
1192
1193 timeout_lower = timeout_upper = MAXLONGLONG;
1194
1195 /* Determine next timeout and use the window length to optimize wakeup times. */
1196 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1197 struct threadpool_object, u.timer.timer_entry )
1198 {
1199 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1200 if (other_timer->u.timer.timeout >= timeout_upper)
1201 break;
1202
1203 timeout_lower = other_timer->u.timer.timeout;
1204 new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1205 if (new_timeout < timeout_upper)
1206 timeout_upper = new_timeout;
1207 }
1208
1209 /* Wait for timer update events or until the next timer expires. */
1210 if (timerqueue.objcount)
1211 {
1212 timeout.QuadPart = timeout_lower;
1213 RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, &timeout );
1214 continue;
1215 }
1216
1217 /* All timers have been destroyed, if no new timers are created
1218 * within some amount of time, then we can shutdown this thread. */
1219 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1220 if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1221 &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1222 {
1223 break;
1224 }
1225 }
1226
1227 timerqueue.thread_running = FALSE;
1228 RtlLeaveCriticalSection( &timerqueue.cs );
1229
1230 TRACE( "terminating timer queue thread\n" );
1231 RtlExitUserThread( 0 );
1232#ifdef __REACTOS__
1233 return STATUS_SUCCESS;
1234#endif
1235}
1236
1237/***********************************************************************
1238 * tp_new_worker_thread (internal)
1239 *
1240 * Create and account a new worker thread for the desired pool.
1241 */
1242static NTSTATUS tp_new_worker_thread( struct threadpool *pool )
1243{
1244 HANDLE thread;
1245 NTSTATUS status;
1246
1247 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0,
1248 pool->stack_info.StackReserve, pool->stack_info.StackCommit,
1249 threadpool_worker_proc, pool, &thread, NULL );
1250 if (status == STATUS_SUCCESS)
1251 {
1252 InterlockedIncrement( &pool->refcount );
1253 pool->num_workers++;
1254 NtClose( thread );
1255 }
1256 return status;
1257}
1258
1259/***********************************************************************
1260 * tp_timerqueue_lock (internal)
1261 *
1262 * Acquires a lock on the global timerqueue. When the lock is acquired
1263 * successfully, it is guaranteed that the timer thread is running.
1264 */
1265static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer )
1266{
1267 NTSTATUS status = STATUS_SUCCESS;
1268 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1269
1270 timer->u.timer.timer_initialized = FALSE;
1271 timer->u.timer.timer_pending = FALSE;
1272 timer->u.timer.timer_set = FALSE;
1273 timer->u.timer.timeout = 0;
1274 timer->u.timer.period = 0;
1275 timer->u.timer.window_length = 0;
1276
1277 RtlEnterCriticalSection( &timerqueue.cs );
1278
1279 /* Make sure that the timerqueue thread is running. */
1280 if (!timerqueue.thread_running)
1281 {
1282 HANDLE thread;
1283 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
1284 timerqueue_thread_proc, NULL, &thread, NULL );
1285 if (status == STATUS_SUCCESS)
1286 {
1287 timerqueue.thread_running = TRUE;
1288 NtClose( thread );
1289 }
1290 }
1291
1292 if (status == STATUS_SUCCESS)
1293 {
1294 timer->u.timer.timer_initialized = TRUE;
1295 timerqueue.objcount++;
1296 }
1297
1298 RtlLeaveCriticalSection( &timerqueue.cs );
1299 return status;
1300}
1301
1302/***********************************************************************
1303 * tp_timerqueue_unlock (internal)
1304 *
1305 * Releases a lock on the global timerqueue.
1306 */
1307static void tp_timerqueue_unlock( struct threadpool_object *timer )
1308{
1309 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1310
1311 RtlEnterCriticalSection( &timerqueue.cs );
1312 if (timer->u.timer.timer_initialized)
1313 {
1314 /* If timer was pending, remove it. */
1315 if (timer->u.timer.timer_pending)
1316 {
1317 list_remove( &timer->u.timer.timer_entry );
1318 timer->u.timer.timer_pending = FALSE;
1319 }
1320
1321 /* If the last timer object was destroyed, then wake up the thread. */
1322 if (!--timerqueue.objcount)
1323 {
1324 assert( list_empty( &timerqueue.pending_timers ) );
1325 RtlWakeAllConditionVariable( &timerqueue.update_event );
1326 }
1327
1328 timer->u.timer.timer_initialized = FALSE;
1329 }
1330 RtlLeaveCriticalSection( &timerqueue.cs );
1331}
1332
1333/***********************************************************************
1334 * waitqueue_thread_proc (internal)
1335 */
1336#ifdef __REACTOS__
1337void NTAPI waitqueue_thread_proc(PVOID param )
1338#else
1339static void CALLBACK waitqueue_thread_proc( void *param )
1340#endif
1341{
1342 struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
1343 HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
1344 struct waitqueue_bucket *bucket = param;
1345 struct threadpool_object *wait, *next;
1346 LARGE_INTEGER now, timeout;
1347 DWORD num_handles;
1348 NTSTATUS status;
1349
1350 TRACE( "starting wait queue thread\n" );
1351 set_thread_name(L"wine_threadpool_waitqueue");
1352
1353 RtlEnterCriticalSection( &waitqueue.cs );
1354
1355 for (;;)
1356 {
1357 NtQuerySystemTime( &now );
1358 timeout.QuadPart = MAXLONGLONG;
1359 num_handles = 0;
1360
1361 LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct threadpool_object,
1362 u.wait.wait_entry )
1363 {
1364 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1365 if (wait->u.wait.timeout <= now.QuadPart)
1366 {
1367 /* Wait object timed out. */
1368 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1369 {
1370 list_remove( &wait->u.wait.wait_entry );
1371 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1372 }
1373 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1374 {
1375 InterlockedIncrement( &wait->refcount );
1376 wait->num_pending_callbacks++;
1377 RtlEnterCriticalSection( &wait->pool->cs );
1378 tp_object_execute( wait, TRUE );
1379 RtlLeaveCriticalSection( &wait->pool->cs );
1380 tp_object_release( wait );
1381 }
1382 else tp_object_submit( wait, FALSE );
1383 }
1384 else
1385 {
1386 if (wait->u.wait.timeout < timeout.QuadPart)
1387 timeout.QuadPart = wait->u.wait.timeout;
1388
1389 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1390 InterlockedIncrement( &wait->refcount );
1391 objects[num_handles] = wait;
1392 handles[num_handles] = wait->u.wait.handle;
1393 num_handles++;
1394 }
1395 }
1396
1397 if (!bucket->objcount)
1398 {
1399 /* All wait objects have been destroyed, if no new wait objects are created
1400 * within some amount of time, then we can shutdown this thread. */
1401 assert( num_handles == 0 );
1402 RtlLeaveCriticalSection( &waitqueue.cs );
1403 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1404 status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, bucket->alertable, &timeout );
1405 RtlEnterCriticalSection( &waitqueue.cs );
1406
1407 if (status == STATUS_TIMEOUT && !bucket->objcount)
1408 break;
1409 }
1410 else
1411 {
1412 handles[num_handles] = bucket->update_event;
1413 RtlLeaveCriticalSection( &waitqueue.cs );
1414 status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, bucket->alertable, &timeout );
1415 RtlEnterCriticalSection( &waitqueue.cs );
1416
1417 if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1418 {
1419 wait = objects[status - STATUS_WAIT_0];
1420 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1421 if (wait->u.wait.bucket)
1422 {
1423 /* Wait object signaled. */
1424 assert( wait->u.wait.bucket == bucket );
1425 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1426 {
1427 list_remove( &wait->u.wait.wait_entry );
1428 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1429 }
1430 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1431 {
1432 wait->u.wait.signaled++;
1433 wait->num_pending_callbacks++;
1434 RtlEnterCriticalSection( &wait->pool->cs );
1435 tp_object_execute( wait, TRUE );
1436 RtlLeaveCriticalSection( &wait->pool->cs );
1437 }
1438 else tp_object_submit( wait, TRUE );
1439 }
1440 else
1441 WARN("wait object %p triggered while object was destroyed\n", wait);
1442 }
1443
1444 /* Release temporary references to wait objects. */
1445 while (num_handles)
1446 {
1447 wait = objects[--num_handles];
1448 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1449 tp_object_release( wait );
1450 }
1451 }
1452
1453 /* Try to merge bucket with other threads. */
1454 if (waitqueue.num_buckets > 1 && bucket->objcount &&
1455 bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1456 {
1457 struct waitqueue_bucket *other_bucket;
1458 LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1459 {
1460 if (other_bucket != bucket && other_bucket->objcount && other_bucket->alertable == bucket->alertable &&
1461 other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1462 {
1463 other_bucket->objcount += bucket->objcount;
1464 bucket->objcount = 0;
1465
1466 /* Update reserved list. */
1467 LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1468 {
1469 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1470 wait->u.wait.bucket = other_bucket;
1471 }
1472 list_move_tail( &other_bucket->reserved, &bucket->reserved );
1473
1474 /* Update waiting list. */
1475 LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1476 {
1477 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1478 wait->u.wait.bucket = other_bucket;
1479 }
1480 list_move_tail( &other_bucket->waiting, &bucket->waiting );
1481
1482 /* Move bucket to the end, to keep the probability of
1483 * newly added wait objects as small as possible. */
1484 list_remove( &bucket->bucket_entry );
1485 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1486
1487 NtSetEvent( other_bucket->update_event, NULL );
1488 break;
1489 }
1490 }
1491 }
1492 }
1493
1494 /* Remove this bucket from the list. */
1495 list_remove( &bucket->bucket_entry );
1496 if (!--waitqueue.num_buckets)
1497 assert( list_empty( &waitqueue.buckets ) );
1498
1499 RtlLeaveCriticalSection( &waitqueue.cs );
1500
1501 TRACE( "terminating wait queue thread\n" );
1502
1503 assert( bucket->objcount == 0 );
1504 assert( list_empty( &bucket->reserved ) );
1505 assert( list_empty( &bucket->waiting ) );
1506 NtClose( bucket->update_event );
1507
1508 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1509 RtlExitUserThread( 0 );
1510}
1511
1512/***********************************************************************
1513 * tp_waitqueue_lock (internal)
1514 */
1515static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
1516{
1517 struct waitqueue_bucket *bucket;
1518 NTSTATUS status;
1519 HANDLE thread;
1520 BOOL alertable = (wait->u.wait.flags & WT_EXECUTEINIOTHREAD) != 0;
1521 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1522
1523 wait->u.wait.signaled = 0;
1524 wait->u.wait.bucket = NULL;
1525 wait->u.wait.wait_pending = FALSE;
1526 wait->u.wait.timeout = 0;
1527 wait->u.wait.handle = INVALID_HANDLE_VALUE;
1528
1529 RtlEnterCriticalSection( &waitqueue.cs );
1530
1531 /* Try to assign to existing bucket if possible. */
1532 LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1533 {
1534 if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS && bucket->alertable == alertable)
1535 {
1536 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1537 wait->u.wait.bucket = bucket;
1538 bucket->objcount++;
1539
1540 status = STATUS_SUCCESS;
1541 goto out;
1542 }
1543 }
1544
1545 /* Create a new bucket and corresponding worker thread. */
1546 bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1547 if (!bucket)
1548 {
1549 status = STATUS_NO_MEMORY;
1550 goto out;
1551 }
1552
1553 bucket->objcount = 0;
1554 bucket->alertable = alertable;
1555 list_init( &bucket->reserved );
1556 list_init( &bucket->waiting );
1557
1558 status = NtCreateEvent( &bucket->update_event, EVENT_ALL_ACCESS,
1559 NULL, SynchronizationEvent, FALSE );
1560 if (status)
1561 {
1562 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1563 goto out;
1564 }
1565
1566 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
1567 (PTHREAD_START_ROUTINE)waitqueue_thread_proc, bucket, &thread, NULL );
1568 if (status == STATUS_SUCCESS)
1569 {
1570 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1571 waitqueue.num_buckets++;
1572
1573 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1574 wait->u.wait.bucket = bucket;
1575 bucket->objcount++;
1576
1577 NtClose( thread );
1578 }
1579 else
1580 {
1581 NtClose( bucket->update_event );
1582 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1583 }
1584
1585out:
1586 RtlLeaveCriticalSection( &waitqueue.cs );
1587 return status;
1588}
1589
1590/***********************************************************************
1591 * tp_waitqueue_unlock (internal)
1592 */
1593static void tp_waitqueue_unlock( struct threadpool_object *wait )
1594{
1595 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1596
1597 RtlEnterCriticalSection( &waitqueue.cs );
1598 if (wait->u.wait.bucket)
1599 {
1600 struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1601 assert( bucket->objcount > 0 );
1602
1603 list_remove( &wait->u.wait.wait_entry );
1604 wait->u.wait.bucket = NULL;
1605 bucket->objcount--;
1606
1607 NtSetEvent( bucket->update_event, NULL );
1608 }
1609 RtlLeaveCriticalSection( &waitqueue.cs );
1610}
1611
1612#ifdef __REACTOS__
1613ULONG NTAPI ioqueue_thread_proc(PVOID param )
1614#else
1615static void CALLBACK ioqueue_thread_proc( void *param )
1616#endif
1617{
1618 struct io_completion *completion;
1619 struct threadpool_object *io;
1620 IO_STATUS_BLOCK iosb;
1621#ifdef __REACTOS__
1622 PVOID key, value;
1623#else
1624 ULONG_PTR key, value;
1625#endif
1626 BOOL destroy, skip;
1627 NTSTATUS status;
1628
1629 TRACE( "starting I/O completion thread\n" );
1630 set_thread_name(L"wine_threadpool_ioqueue");
1631
1632 RtlEnterCriticalSection( &ioqueue.cs );
1633
1634 for (;;)
1635 {
1636 RtlLeaveCriticalSection( &ioqueue.cs );
1637 if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value, &iosb, NULL )))
1638 ERR("NtRemoveIoCompletion failed, status %#lx.\n", status);
1639 RtlEnterCriticalSection( &ioqueue.cs );
1640
1641 destroy = skip = FALSE;
1642 io = (struct threadpool_object *)key;
1643
1644 TRACE( "io %p, iosb.Status %#lx.\n", io, iosb.Status );
1645
1646 if (io && (io->shutdown || io->u.io.shutting_down))
1647 {
1648 RtlEnterCriticalSection( &io->pool->cs );
1649 if (!io->u.io.pending_count)
1650 {
1651 if (io->u.io.skipped_count)
1652 --io->u.io.skipped_count;
1653
1654 if (io->u.io.skipped_count)
1655 skip = TRUE;
1656 else
1657 destroy = TRUE;
1658 }
1659 RtlLeaveCriticalSection( &io->pool->cs );
1660 if (skip) continue;
1661 }
1662
1663 if (destroy)
1664 {
1665 --ioqueue.objcount;
1666 TRACE( "Releasing io %p.\n", io );
1667 io->shutdown = TRUE;
1668 tp_object_release( io );
1669 }
1670 else if (io)
1671 {
1672 RtlEnterCriticalSection( &io->pool->cs );
1673
1674 TRACE( "pending_count %u.\n", io->u.io.pending_count );
1675
1676 if (io->u.io.pending_count)
1677 {
1678 --io->u.io.pending_count;
1679 if (!array_reserve((void **)&io->u.io.completions, &io->u.io.completion_max,
1680 io->u.io.completion_count + 1, sizeof(*io->u.io.completions)))
1681 {
1682 ERR( "Failed to allocate memory.\n" );
1683 RtlLeaveCriticalSection( &io->pool->cs );
1684 continue;
1685 }
1686
1687 completion = &io->u.io.completions[io->u.io.completion_count++];
1688 completion->iosb = iosb;
1689#ifdef __REACTOS__
1690 completion->cvalue = (ULONG_PTR)value;
1691#else
1692 completion->cvalue = value;
1693#endif
1694
1695 tp_object_submit( io, FALSE );
1696 }
1697 RtlLeaveCriticalSection( &io->pool->cs );
1698 }
1699
1700 if (!ioqueue.objcount)
1701 {
1702 /* All I/O objects have been destroyed; if no new objects are
1703 * created within some amount of time, then we can shutdown this
1704 * thread. */
1705 LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000};
1706 if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs,
1707 &timeout) == STATUS_TIMEOUT && !ioqueue.objcount)
1708 break;
1709 }
1710 }
1711
1712 ioqueue.thread_running = FALSE;
1713 RtlLeaveCriticalSection( &ioqueue.cs );
1714
1715 TRACE( "terminating I/O completion thread\n" );
1716
1717 RtlExitUserThread( 0 );
1718
1719#ifdef __REACTOS__
1720 return STATUS_SUCCESS;
1721#endif
1722}
1723
1724static NTSTATUS tp_ioqueue_lock( struct threadpool_object *io, HANDLE file )
1725{
1726 NTSTATUS status = STATUS_SUCCESS;
1727
1728 assert( io->type == TP_OBJECT_TYPE_IO );
1729
1730 RtlEnterCriticalSection( &ioqueue.cs );
1731
1732 if (!ioqueue.port && (status = NtCreateIoCompletion( &ioqueue.port,
1733 IO_COMPLETION_ALL_ACCESS, NULL, 0 )))
1734 {
1735 RtlLeaveCriticalSection( &ioqueue.cs );
1736 return status;
1737 }
1738
1739 if (!ioqueue.thread_running)
1740 {
1741 HANDLE thread;
1742
1743 if (!(status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
1744 0, 0, 0, ioqueue_thread_proc, NULL, &thread, NULL )))
1745 {
1746 ioqueue.thread_running = TRUE;
1747 NtClose( thread );
1748 }
1749 }
1750
1751 if (status == STATUS_SUCCESS)
1752 {
1753 FILE_COMPLETION_INFORMATION info;
1754 IO_STATUS_BLOCK iosb;
1755
1756#ifdef __REACTOS__
1757 info.Port = ioqueue.port;
1758 info.Key = io;
1759#else
1760 info.CompletionPort = ioqueue.port;
1761 info.CompletionKey = (ULONG_PTR)io;
1762#endif
1763
1764 status = NtSetInformationFile( file, &iosb, &info, sizeof(info), FileCompletionInformation );
1765 }
1766
1767 if (status == STATUS_SUCCESS)
1768 {
1769 if (!ioqueue.objcount++)
1770 RtlWakeConditionVariable( &ioqueue.update_event );
1771 }
1772
1773 RtlLeaveCriticalSection( &ioqueue.cs );
1774 return status;
1775}
1776
1777/***********************************************************************
1778 * tp_threadpool_alloc (internal)
1779 *
1780 * Allocates a new threadpool object.
1781 */
1782static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
1783{
1784#ifdef __REACTOS__
1785 IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->ProcessEnvironmentBlock->ImageBaseAddress );
1786#else
1787 IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->Peb->ImageBaseAddress );
1788#endif
1789 struct threadpool *pool;
1790 unsigned int i;
1791
1792 pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1793 if (!pool)
1794 return STATUS_NO_MEMORY;
1795
1796 pool->refcount = 1;
1797 pool->objcount = 0;
1798 pool->shutdown = FALSE;
1799
1800#ifdef __REACTOS__
1801 RtlInitializeCriticalSection( &pool->cs );
1802#else
1803 RtlInitializeCriticalSectionEx( &pool->cs, 0, RTL_CRITICAL_SECTION_FLAG_FORCE_DEBUG_INFO );
1804
1805 pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1806#endif
1807
1808 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1809 list_init( &pool->pools[i] );
1810 RtlInitializeConditionVariable( &pool->update_event );
1811
1812 pool->max_workers = 500;
1813 pool->min_workers = 0;
1814 pool->num_workers = 0;
1815 pool->num_busy_workers = 0;
1816 pool->stack_info.StackReserve = nt->OptionalHeader.SizeOfStackReserve;
1817 pool->stack_info.StackCommit = nt->OptionalHeader.SizeOfStackCommit;
1818
1819 TRACE( "allocated threadpool %p\n", pool );
1820
1821 *out = pool;
1822 return STATUS_SUCCESS;
1823}
1824
1825/***********************************************************************
1826 * tp_threadpool_shutdown (internal)
1827 *
1828 * Prepares the shutdown of a threadpool object and notifies all worker
1829 * threads to terminate (after all remaining work items have been
1830 * processed).
1831 */
1832static void tp_threadpool_shutdown( struct threadpool *pool )
1833{
1834 assert( pool != default_threadpool );
1835
1836 pool->shutdown = TRUE;
1837 RtlWakeAllConditionVariable( &pool->update_event );
1838}
1839
1840/***********************************************************************
1841 * tp_threadpool_release (internal)
1842 *
1843 * Releases a reference to a threadpool object.
1844 */
1845static BOOL tp_threadpool_release( struct threadpool *pool )
1846{
1847 unsigned int i;
1848
1849 if (InterlockedDecrement( &pool->refcount ))
1850 return FALSE;
1851
1852 TRACE( "destroying threadpool %p\n", pool );
1853
1854 assert( pool->shutdown );
1855 assert( !pool->objcount );
1856 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1857 assert( list_empty( &pool->pools[i] ) );
1858#ifndef __REACTOS__
1859 pool->cs.DebugInfo->Spare[0] = 0;
1860#endif
1861 RtlDeleteCriticalSection( &pool->cs );
1862
1863 RtlFreeHeap( GetProcessHeap(), 0, pool );
1864 return TRUE;
1865}
1866
1867/***********************************************************************
1868 * tp_threadpool_lock (internal)
1869 *
1870 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1871 * block. When the lock is acquired successfully, it is guaranteed that
1872 * there is at least one worker thread to process tasks.
1873 */
1874static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON *environment )
1875{
1876 struct threadpool *pool = NULL;
1877 NTSTATUS status = STATUS_SUCCESS;
1878
1879 if (environment)
1880 {
1881#ifndef __REACTOS__ //Windows 7 stuff
1882 /* Validate environment parameters. */
1883 if (environment->Version == 3)
1884 {
1885 TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1886
1887 switch (environment3->CallbackPriority)
1888 {
1889 case TP_CALLBACK_PRIORITY_HIGH:
1890 case TP_CALLBACK_PRIORITY_NORMAL:
1891 case TP_CALLBACK_PRIORITY_LOW:
1892 break;
1893 default:
1894 return STATUS_INVALID_PARAMETER;
1895 }
1896 }
1897#endif
1898 pool = (struct threadpool *)environment->Pool;
1899 }
1900
1901 if (!pool)
1902 {
1903 if (!default_threadpool)
1904 {
1905 status = tp_threadpool_alloc( &pool );
1906 if (status != STATUS_SUCCESS)
1907 return status;
1908
1909 if (InterlockedCompareExchangePointer( (void *)&default_threadpool, pool, NULL ) != NULL)
1910 {
1911 tp_threadpool_shutdown( pool );
1912 tp_threadpool_release( pool );
1913 }
1914 }
1915
1916 pool = default_threadpool;
1917 }
1918
1919 RtlEnterCriticalSection( &pool->cs );
1920
1921 /* Make sure that the threadpool has at least one thread. */
1922 if (!pool->num_workers)
1923 status = tp_new_worker_thread( pool );
1924
1925 /* Keep a reference, and increment objcount to ensure that the
1926 * last thread doesn't terminate. */
1927 if (status == STATUS_SUCCESS)
1928 {
1929 InterlockedIncrement( &pool->refcount );
1930 pool->objcount++;
1931 }
1932
1933 RtlLeaveCriticalSection( &pool->cs );
1934
1935 if (status != STATUS_SUCCESS)
1936 return status;
1937
1938 *out = pool;
1939 return STATUS_SUCCESS;
1940}
1941
1942/***********************************************************************
1943 * tp_threadpool_unlock (internal)
1944 *
1945 * Releases a lock on a threadpool.
1946 */
1947static void tp_threadpool_unlock( struct threadpool *pool )
1948{
1949 RtlEnterCriticalSection( &pool->cs );
1950 pool->objcount--;
1951 RtlLeaveCriticalSection( &pool->cs );
1952 tp_threadpool_release( pool );
1953}
1954
1955/***********************************************************************
1956 * tp_group_alloc (internal)
1957 *
1958 * Allocates a new threadpool group object.
1959 */
1960static NTSTATUS tp_group_alloc( struct threadpool_group **out )
1961{
1962 struct threadpool_group *group;
1963
1964 group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
1965 if (!group)
1966 return STATUS_NO_MEMORY;
1967
1968 group->refcount = 1;
1969 group->shutdown = FALSE;
1970
1971#ifdef __REACTOS__
1972 RtlInitializeCriticalSection( &group->cs );
1973#else
1974 RtlInitializeCriticalSectionEx( &group->cs, 0, RTL_CRITICAL_SECTION_FLAG_FORCE_DEBUG_INFO );
1975
1976 group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
1977#endif
1978
1979 list_init( &group->members );
1980
1981 TRACE( "allocated group %p\n", group );
1982
1983 *out = group;
1984 return STATUS_SUCCESS;
1985}
1986
1987/***********************************************************************
1988 * tp_group_shutdown (internal)
1989 *
1990 * Marks the group object for shutdown.
1991 */
1992static void tp_group_shutdown( struct threadpool_group *group )
1993{
1994 group->shutdown = TRUE;
1995}
1996
1997/***********************************************************************
1998 * tp_group_release (internal)
1999 *
2000 * Releases a reference to a group object.
2001 */
2002static BOOL tp_group_release( struct threadpool_group *group )
2003{
2004 if (InterlockedDecrement( &group->refcount ))
2005 return FALSE;
2006
2007 TRACE( "destroying group %p\n", group );
2008
2009 assert( group->shutdown );
2010 assert( list_empty( &group->members ) );
2011
2012#ifndef __REACTOS__
2013 group->cs.DebugInfo->Spare[0] = 0;
2014#endif
2015 RtlDeleteCriticalSection( &group->cs );
2016
2017 RtlFreeHeap( GetProcessHeap(), 0, group );
2018 return TRUE;
2019}
2020
2021/***********************************************************************
2022 * tp_object_initialize (internal)
2023 *
2024 * Initializes members of a threadpool object.
2025 */
2026static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
2027 PVOID userdata, TP_CALLBACK_ENVIRON *environment )
2028{
2029 BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
2030
2031 object->refcount = 1;
2032 object->shutdown = FALSE;
2033
2034 object->pool = pool;
2035 object->group = NULL;
2036 object->userdata = userdata;
2037 object->group_cancel_callback = NULL;
2038 object->finalization_callback = NULL;
2039 object->may_run_long = 0;
2040 object->race_dll = NULL;
2041 object->priority = TP_CALLBACK_PRIORITY_NORMAL;
2042
2043 memset( &object->group_entry, 0, sizeof(object->group_entry) );
2044 object->is_group_member = FALSE;
2045
2046 memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
2047 RtlInitializeConditionVariable( &object->finished_event );
2048 RtlInitializeConditionVariable( &object->group_finished_event );
2049 object->completed_event = NULL;
2050 object->num_pending_callbacks = 0;
2051 object->num_running_callbacks = 0;
2052 object->num_associated_callbacks = 0;
2053
2054 if (environment)
2055 {
2056 if (environment->Version != 1 && environment->Version != 3)
2057 FIXME( "unsupported environment version %lu\n", environment->Version );
2058
2059 object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
2060 object->group_cancel_callback = environment->CleanupGroupCancelCallback;
2061 object->finalization_callback = environment->FinalizationCallback;
2062 object->may_run_long = environment->u.s.LongFunction != 0;
2063 object->race_dll = environment->RaceDll;
2064#ifndef __REACTOS__ //Windows 7 stuff
2065 if (environment->Version == 3)
2066 {
2067 TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
2068
2069 object->priority = environment_v3->CallbackPriority;
2070 assert( object->priority < ARRAY_SIZE(pool->pools) );
2071 }
2072#endif
2073 if (environment->ActivationContext)
2074 FIXME( "activation context not supported yet\n" );
2075
2076 if (environment->u.s.Persistent)
2077 FIXME( "persistent threads not supported yet\n" );
2078 }
2079
2080 if (object->race_dll)
2081 LdrAddRefDll( 0, object->race_dll );
2082
2083 TRACE( "allocated object %p of type %u\n", object, object->type );
2084
2085 /* For simple callbacks we have to run tp_object_submit before adding this object
2086 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
2087 * will be set, and tp_object_submit would fail with an assertion. */
2088
2089 if (is_simple_callback)
2090 tp_object_submit( object, FALSE );
2091
2092 if (object->group)
2093 {
2094 struct threadpool_group *group = object->group;
2095 InterlockedIncrement( &group->refcount );
2096
2097 RtlEnterCriticalSection( &group->cs );
2098 list_add_tail( &group->members, &object->group_entry );
2099 object->is_group_member = TRUE;
2100 RtlLeaveCriticalSection( &group->cs );
2101 }
2102
2103 if (is_simple_callback)
2104 tp_object_release( object );
2105}
2106
2107static void tp_object_prio_queue( struct threadpool_object *object )
2108{
2109 ++object->pool->num_busy_workers;
2110 list_add_tail( &object->pool->pools[object->priority], &object->pool_entry );
2111}
2112
2113/***********************************************************************
2114 * tp_object_submit (internal)
2115 *
2116 * Submits a threadpool object to the associated threadpool. This
2117 * function has to be VOID because TpPostWork can never fail on Windows.
2118 */
2119static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
2120{
2121 struct threadpool *pool = object->pool;
2122 NTSTATUS status = STATUS_UNSUCCESSFUL;
2123
2124 assert( !object->shutdown );
2125 assert( !pool->shutdown );
2126
2127 RtlEnterCriticalSection( &pool->cs );
2128
2129 /* Start new worker threads if required. */
2130 if (pool->num_busy_workers >= pool->num_workers &&
2131 pool->num_workers < pool->max_workers)
2132 status = tp_new_worker_thread( pool );
2133
2134 /* Queue work item and increment refcount. */
2135 InterlockedIncrement( &object->refcount );
2136 if (!object->num_pending_callbacks++)
2137 tp_object_prio_queue( object );
2138
2139 /* Count how often the object was signaled. */
2140 if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
2141 object->u.wait.signaled++;
2142
2143 /* No new thread started - wake up one existing thread. */
2144 if (status != STATUS_SUCCESS)
2145 {
2146 assert( pool->num_workers > 0 );
2147 RtlWakeConditionVariable( &pool->update_event );
2148 }
2149
2150 RtlLeaveCriticalSection( &pool->cs );
2151}
2152
2153/***********************************************************************
2154 * tp_object_cancel (internal)
2155 *
2156 * Cancels all currently pending callbacks for a specific object.
2157 */
2158static void tp_object_cancel( struct threadpool_object *object )
2159{
2160 struct threadpool *pool = object->pool;
2161 LONG pending_callbacks = 0;
2162
2163 RtlEnterCriticalSection( &pool->cs );
2164 if (object->num_pending_callbacks)
2165 {
2166 pending_callbacks = object->num_pending_callbacks;
2167 object->num_pending_callbacks = 0;
2168 list_remove( &object->pool_entry );
2169
2170 if (object->type == TP_OBJECT_TYPE_WAIT)
2171 object->u.wait.signaled = 0;
2172 }
2173 if (object->type == TP_OBJECT_TYPE_IO)
2174 {
2175 object->u.io.skipped_count += object->u.io.pending_count;
2176 object->u.io.pending_count = 0;
2177 }
2178 RtlLeaveCriticalSection( &pool->cs );
2179
2180 while (pending_callbacks--)
2181 tp_object_release( object );
2182}
2183
2184static BOOL object_is_finished( struct threadpool_object *object, BOOL group )
2185{
2186 if (object->num_pending_callbacks)
2187 return FALSE;
2188 if (object->type == TP_OBJECT_TYPE_IO && object->u.io.pending_count)
2189 return FALSE;
2190
2191 if (group)
2192 return !object->num_running_callbacks;
2193 else
2194 return !object->num_associated_callbacks;
2195}
2196
2197/***********************************************************************
2198 * tp_object_wait (internal)
2199 *
2200 * Waits until all pending and running callbacks of a specific object
2201 * have been processed.
2202 */
2203static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
2204{
2205 struct threadpool *pool = object->pool;
2206
2207 RtlEnterCriticalSection( &pool->cs );
2208 while (!object_is_finished( object, group_wait ))
2209 {
2210 if (group_wait)
2211 RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
2212 else
2213 RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
2214 }
2215 RtlLeaveCriticalSection( &pool->cs );
2216}
2217
2218static void tp_ioqueue_unlock( struct threadpool_object *io )
2219{
2220 assert( io->type == TP_OBJECT_TYPE_IO );
2221
2222 RtlEnterCriticalSection( &ioqueue.cs );
2223
2224 assert(ioqueue.objcount);
2225
2226 if (!io->shutdown && !--ioqueue.objcount)
2227 NtSetIoCompletion( ioqueue.port, 0, 0, STATUS_SUCCESS, 0 );
2228
2229 RtlLeaveCriticalSection( &ioqueue.cs );
2230}
2231
2232/***********************************************************************
2233 * tp_object_prepare_shutdown (internal)
2234 *
2235 * Prepares a threadpool object for shutdown.
2236 */
2237static void tp_object_prepare_shutdown( struct threadpool_object *object )
2238{
2239 if (object->type == TP_OBJECT_TYPE_TIMER)
2240 tp_timerqueue_unlock( object );
2241 else if (object->type == TP_OBJECT_TYPE_WAIT)
2242 tp_waitqueue_unlock( object );
2243 else if (object->type == TP_OBJECT_TYPE_IO)
2244 tp_ioqueue_unlock( object );
2245}
2246
2247/***********************************************************************
2248 * tp_object_release (internal)
2249 *
2250 * Releases a reference to a threadpool object.
2251 */
2252static BOOL tp_object_release( struct threadpool_object *object )
2253{
2254 if (InterlockedDecrement( &object->refcount ))
2255 return FALSE;
2256
2257 TRACE( "destroying object %p of type %u\n", object, object->type );
2258
2259 assert( object->shutdown );
2260 assert( !object->num_pending_callbacks );
2261 assert( !object->num_running_callbacks );
2262 assert( !object->num_associated_callbacks );
2263
2264 /* release reference to the group */
2265 if (object->group)
2266 {
2267 struct threadpool_group *group = object->group;
2268
2269 RtlEnterCriticalSection( &group->cs );
2270 if (object->is_group_member)
2271 {
2272 list_remove( &object->group_entry );
2273 object->is_group_member = FALSE;
2274 }
2275 RtlLeaveCriticalSection( &group->cs );
2276
2277 tp_group_release( group );
2278 }
2279
2280 tp_threadpool_unlock( object->pool );
2281
2282 if (object->race_dll)
2283 LdrUnloadDll( object->race_dll );
2284
2285 if (object->completed_event && object->completed_event != INVALID_HANDLE_VALUE)
2286 NtSetEvent( object->completed_event, NULL );
2287
2288 RtlFreeHeap( GetProcessHeap(), 0, object );
2289 return TRUE;
2290}
2291
2292static struct list *threadpool_get_next_item( const struct threadpool *pool )
2293{
2294 struct list *ptr;
2295 unsigned int i;
2296
2297 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
2298 {
2299 if ((ptr = list_head( &pool->pools[i] )))
2300 break;
2301 }
2302
2303 return ptr;
2304}
2305
2306/***********************************************************************
2307 * tp_object_execute (internal)
2308 *
2309 * Executes a threadpool object callback, object->pool->cs has to be
2310 * held.
2311 */
2312static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread )
2313{
2314 TP_CALLBACK_INSTANCE *callback_instance;
2315 struct threadpool_instance instance;
2316 struct io_completion completion;
2317 struct threadpool *pool = object->pool;
2318 TP_WAIT_RESULT wait_result = 0;
2319 NTSTATUS status;
2320
2321 object->num_pending_callbacks--;
2322
2323 /* For wait objects check if they were signaled or have timed out. */
2324 if (object->type == TP_OBJECT_TYPE_WAIT)
2325 {
2326 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2327 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2328 }
2329 else if (object->type == TP_OBJECT_TYPE_IO)
2330 {
2331 assert( object->u.io.completion_count );
2332 completion = object->u.io.completions[--object->u.io.completion_count];
2333 }
2334
2335 /* Leave critical section and do the actual callback. */
2336 object->num_associated_callbacks++;
2337 object->num_running_callbacks++;
2338 RtlLeaveCriticalSection( &pool->cs );
2339 if (wait_thread) RtlLeaveCriticalSection( &waitqueue.cs );
2340
2341 /* Initialize threadpool instance struct. */
2342 callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2343 instance.object = object;
2344 instance.threadid = GetCurrentThreadId();
2345 instance.associated = TRUE;
2346 instance.may_run_long = object->may_run_long;
2347 instance.cleanup.critical_section = NULL;
2348 instance.cleanup.mutex = NULL;
2349 instance.cleanup.semaphore = NULL;
2350 instance.cleanup.semaphore_count = 0;
2351 instance.cleanup.event = NULL;
2352 instance.cleanup.library = NULL;
2353
2354 switch (object->type)
2355 {
2356 case TP_OBJECT_TYPE_SIMPLE:
2357 {
2358 TRACE( "executing simple callback %p(%p, %p)\n",
2359 object->u.simple.callback, callback_instance, object->userdata );
2360 object->u.simple.callback( callback_instance, object->userdata );
2361 TRACE( "callback %p returned\n", object->u.simple.callback );
2362 break;
2363 }
2364
2365 case TP_OBJECT_TYPE_WORK:
2366 {
2367 TRACE( "executing work callback %p(%p, %p, %p)\n",
2368 object->u.work.callback, callback_instance, object->userdata, object );
2369 object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2370 TRACE( "callback %p returned\n", object->u.work.callback );
2371 break;
2372 }
2373
2374 case TP_OBJECT_TYPE_TIMER:
2375 {
2376 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2377 object->u.timer.callback, callback_instance, object->userdata, object );
2378 object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2379 TRACE( "callback %p returned\n", object->u.timer.callback );
2380 break;
2381 }
2382
2383 case TP_OBJECT_TYPE_WAIT:
2384 {
2385 TRACE( "executing wait callback %p(%p, %p, %p, %lu)\n",
2386 object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2387 object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2388 TRACE( "callback %p returned\n", object->u.wait.callback );
2389 break;
2390 }
2391
2392 case TP_OBJECT_TYPE_IO:
2393 {
2394 TRACE( "executing I/O callback %p(%p, %p, %#Ix, %p, %p)\n",
2395 object->u.io.callback, callback_instance, object->userdata,
2396 completion.cvalue, &completion.iosb, (TP_IO *)object );
2397 object->u.io.callback( callback_instance, object->userdata,
2398 (void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
2399 TRACE( "callback %p returned\n", object->u.io.callback );
2400 break;
2401 }
2402
2403 default:
2404 assert(0);
2405 break;
2406 }
2407
2408 /* Execute finalization callback. */
2409 if (object->finalization_callback)
2410 {
2411 TRACE( "executing finalization callback %p(%p, %p)\n",
2412 object->finalization_callback, callback_instance, object->userdata );
2413 object->finalization_callback( callback_instance, object->userdata );
2414 TRACE( "callback %p returned\n", object->finalization_callback );
2415 }
2416
2417 /* Execute cleanup tasks. */
2418 if (instance.cleanup.critical_section)
2419 {
2420 RtlLeaveCriticalSection( instance.cleanup.critical_section );
2421 }
2422 if (instance.cleanup.mutex)
2423 {
2424 status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2425 if (status != STATUS_SUCCESS) goto skip_cleanup;
2426 }
2427 if (instance.cleanup.semaphore)
2428 {
2429 status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2430 if (status != STATUS_SUCCESS) goto skip_cleanup;
2431 }
2432 if (instance.cleanup.event)
2433 {
2434 status = NtSetEvent( instance.cleanup.event, NULL );
2435 if (status != STATUS_SUCCESS) goto skip_cleanup;
2436 }
2437 if (instance.cleanup.library)
2438 {
2439 LdrUnloadDll( instance.cleanup.library );
2440 }
2441
2442skip_cleanup:
2443 if (wait_thread) RtlEnterCriticalSection( &waitqueue.cs );
2444 RtlEnterCriticalSection( &pool->cs );
2445
2446 /* Simple callbacks are automatically shutdown after execution. */
2447 if (object->type == TP_OBJECT_TYPE_SIMPLE)
2448 {
2449 tp_object_prepare_shutdown( object );
2450 object->shutdown = TRUE;
2451 }
2452
2453 object->num_running_callbacks--;
2454 if (object_is_finished( object, TRUE ))
2455 RtlWakeAllConditionVariable( &object->group_finished_event );
2456
2457 if (instance.associated)
2458 {
2459 object->num_associated_callbacks--;
2460 if (object_is_finished( object, FALSE ))
2461 RtlWakeAllConditionVariable( &object->finished_event );
2462 }
2463}
2464
2465/***********************************************************************
2466 * threadpool_worker_proc (internal)
2467 */
2468#ifdef __REACTOS__
2469ULONG NTAPI threadpool_worker_proc(PVOID param )
2470#else
2471static void CALLBACK threadpool_worker_proc( void *param )
2472#endif
2473{
2474 struct threadpool *pool = param;
2475 LARGE_INTEGER timeout;
2476 struct list *ptr;
2477
2478 TRACE( "starting worker thread for pool %p\n", pool );
2479 set_thread_name(L"wine_threadpool_worker");
2480
2481 RtlEnterCriticalSection( &pool->cs );
2482 for (;;)
2483 {
2484 while ((ptr = threadpool_get_next_item( pool )))
2485 {
2486 struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2487 assert( object->num_pending_callbacks > 0 );
2488
2489 /* If further pending callbacks are queued, move the work item to
2490 * the end of the pool list. Otherwise remove it from the pool. */
2491 list_remove( &object->pool_entry );
2492 if (object->num_pending_callbacks > 1)
2493 tp_object_prio_queue( object );
2494
2495 tp_object_execute( object, FALSE );
2496
2497 assert(pool->num_busy_workers);
2498 pool->num_busy_workers--;
2499
2500 tp_object_release( object );
2501 }
2502
2503 /* Shutdown worker thread if requested. */
2504 if (pool->shutdown)
2505 break;
2506
2507 /* Wait for new tasks or until the timeout expires. A thread only terminates
2508 * when no new tasks are available, and the number of threads can be
2509 * decreased without violating the min_workers limit. An exception is when
2510 * min_workers == 0, then objcount is used to detect if the last thread
2511 * can be terminated. */
2512 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2513 if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2514 !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2515 (!pool->min_workers && !pool->objcount)))
2516 {
2517 break;
2518 }
2519 }
2520 pool->num_workers--;
2521 RtlLeaveCriticalSection( &pool->cs );
2522
2523 TRACE( "terminating worker thread for pool %p\n", pool );
2524 tp_threadpool_release( pool );
2525 RtlExitUserThread( 0 );
2526#ifdef __REACTOS__
2527 return STATUS_SUCCESS;
2528#endif
2529}
2530
2531/***********************************************************************
2532 * TpAllocCleanupGroup (NTDLL.@)
2533 */
2534NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
2535{
2536 TRACE( "%p\n", out );
2537
2538 return tp_group_alloc( (struct threadpool_group **)out );
2539}
2540
2541/***********************************************************************
2542 * TpAllocIoCompletion (NTDLL.@)
2543 */
2544NTSTATUS WINAPI TpAllocIoCompletion( TP_IO **out, HANDLE file, PTP_IO_CALLBACK callback,
2545 void *userdata, TP_CALLBACK_ENVIRON *environment )
2546{
2547 struct threadpool_object *object;
2548 struct threadpool *pool;
2549 NTSTATUS status;
2550
2551 TRACE( "%p %p %p %p %p\n", out, file, callback, userdata, environment );
2552
2553 if (!(object = RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*object) )))
2554 return STATUS_NO_MEMORY;
2555
2556 if ((status = tp_threadpool_lock( &pool, environment )))
2557 {
2558 RtlFreeHeap( GetProcessHeap(), 0, object );
2559 return status;
2560 }
2561
2562 object->type = TP_OBJECT_TYPE_IO;
2563 object->u.io.callback = callback;
2564 if (!(object->u.io.completions = RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object->u.io.completions) )))
2565 {
2566 tp_threadpool_unlock( pool );
2567 RtlFreeHeap( GetProcessHeap(), 0, object );
2568 return status;
2569 }
2570
2571 if ((status = tp_ioqueue_lock( object, file )))
2572 {
2573 tp_threadpool_unlock( pool );
2574 RtlFreeHeap( GetProcessHeap(), 0, object->u.io.completions );
2575 RtlFreeHeap( GetProcessHeap(), 0, object );
2576 return status;
2577 }
2578
2579 tp_object_initialize( object, pool, userdata, environment );
2580
2581 *out = (TP_IO *)object;
2582 return STATUS_SUCCESS;
2583}
2584
2585/***********************************************************************
2586 * TpAllocPool (NTDLL.@)
2587 */
2588NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
2589{
2590 TRACE( "%p %p\n", out, reserved );
2591
2592 if (reserved)
2593 FIXME( "reserved argument is nonzero (%p)\n", reserved );
2594
2595 return tp_threadpool_alloc( (struct threadpool **)out );
2596}
2597
2598/***********************************************************************
2599 * TpAllocTimer (NTDLL.@)
2600 */
2601NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID userdata,
2602 TP_CALLBACK_ENVIRON *environment )
2603{
2604 struct threadpool_object *object;
2605 struct threadpool *pool;
2606 NTSTATUS status;
2607
2608 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2609
2610 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2611 if (!object)
2612 return STATUS_NO_MEMORY;
2613
2614 status = tp_threadpool_lock( &pool, environment );
2615 if (status)
2616 {
2617 RtlFreeHeap( GetProcessHeap(), 0, object );
2618 return status;
2619 }
2620
2621 object->type = TP_OBJECT_TYPE_TIMER;
2622 object->u.timer.callback = callback;
2623
2624 status = tp_timerqueue_lock( object );
2625 if (status)
2626 {
2627 tp_threadpool_unlock( pool );
2628 RtlFreeHeap( GetProcessHeap(), 0, object );
2629 return status;
2630 }
2631
2632 tp_object_initialize( object, pool, userdata, environment );
2633
2634 *out = (TP_TIMER *)object;
2635 return STATUS_SUCCESS;
2636}
2637
2638static NTSTATUS tp_alloc_wait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2639 TP_CALLBACK_ENVIRON *environment, DWORD flags )
2640{
2641 struct threadpool_object *object;
2642 struct threadpool *pool;
2643 NTSTATUS status;
2644
2645 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2646 if (!object)
2647 return STATUS_NO_MEMORY;
2648
2649 status = tp_threadpool_lock( &pool, environment );
2650 if (status)
2651 {
2652 RtlFreeHeap( GetProcessHeap(), 0, object );
2653 return status;
2654 }
2655
2656 object->type = TP_OBJECT_TYPE_WAIT;
2657 object->u.wait.callback = callback;
2658 object->u.wait.flags = flags;
2659
2660 status = tp_waitqueue_lock( object );
2661 if (status)
2662 {
2663 tp_threadpool_unlock( pool );
2664 RtlFreeHeap( GetProcessHeap(), 0, object );
2665 return status;
2666 }
2667
2668 tp_object_initialize( object, pool, userdata, environment );
2669
2670 *out = (TP_WAIT *)object;
2671 return STATUS_SUCCESS;
2672}
2673
2674/***********************************************************************
2675 * TpAllocWait (NTDLL.@)
2676 */
2677NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2678 TP_CALLBACK_ENVIRON *environment )
2679{
2680 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2681 return tp_alloc_wait( out, callback, userdata, environment, WT_EXECUTEONLYONCE );
2682}
2683
2684/***********************************************************************
2685 * TpAllocWork (NTDLL.@)
2686 */
2687NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata,
2688 TP_CALLBACK_ENVIRON *environment )
2689{
2690 struct threadpool_object *object;
2691 struct threadpool *pool;
2692 NTSTATUS status;
2693
2694 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2695
2696 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2697 if (!object)
2698 return STATUS_NO_MEMORY;
2699
2700 status = tp_threadpool_lock( &pool, environment );
2701 if (status)
2702 {
2703 RtlFreeHeap( GetProcessHeap(), 0, object );
2704 return status;
2705 }
2706
2707 object->type = TP_OBJECT_TYPE_WORK;
2708 object->u.work.callback = callback;
2709 tp_object_initialize( object, pool, userdata, environment );
2710
2711 *out = (TP_WORK *)object;
2712 return STATUS_SUCCESS;
2713}
2714
2715/***********************************************************************
2716 * TpCancelAsyncIoOperation (NTDLL.@)
2717 */
2718void WINAPI TpCancelAsyncIoOperation( TP_IO *io )
2719{
2720 struct threadpool_object *this = impl_from_TP_IO( io );
2721
2722 TRACE( "%p\n", io );
2723
2724 RtlEnterCriticalSection( &this->pool->cs );
2725
2726 TRACE("pending_count %u.\n", this->u.io.pending_count);
2727
2728 this->u.io.pending_count--;
2729 if (object_is_finished( this, TRUE ))
2730 RtlWakeAllConditionVariable( &this->group_finished_event );
2731 if (object_is_finished( this, FALSE ))
2732 RtlWakeAllConditionVariable( &this->finished_event );
2733
2734 RtlLeaveCriticalSection( &this->pool->cs );
2735}
2736
2737/***********************************************************************
2738 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2739 */
2740VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit )
2741{
2742 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2743
2744 TRACE( "%p %p\n", instance, crit );
2745
2746 if (!this->cleanup.critical_section)
2747 this->cleanup.critical_section = crit;
2748}
2749
2750/***********************************************************************
2751 * TpCallbackMayRunLong (NTDLL.@)
2752 */
2753NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance )
2754{
2755 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2756 struct threadpool_object *object = this->object;
2757 struct threadpool *pool;
2758 NTSTATUS status = STATUS_SUCCESS;
2759
2760 TRACE( "%p\n", instance );
2761
2762 if (this->threadid != GetCurrentThreadId())
2763 {
2764 ERR("called from wrong thread, ignoring\n");
2765 return STATUS_UNSUCCESSFUL; /* FIXME */
2766 }
2767
2768 if (this->may_run_long)
2769 return STATUS_SUCCESS;
2770
2771 pool = object->pool;
2772 RtlEnterCriticalSection( &pool->cs );
2773
2774 /* Start new worker threads if required. */
2775 if (pool->num_busy_workers >= pool->num_workers)
2776 {
2777 if (pool->num_workers < pool->max_workers)
2778 {
2779 status = tp_new_worker_thread( pool );
2780 }
2781 else
2782 {
2783 status = STATUS_TOO_MANY_THREADS;
2784 }
2785 }
2786
2787 RtlLeaveCriticalSection( &pool->cs );
2788 this->may_run_long = TRUE;
2789 return status;
2790}
2791
2792/***********************************************************************
2793 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2794 */
2795VOID WINAPI TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE mutex )
2796{
2797 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2798
2799 TRACE( "%p %p\n", instance, mutex );
2800
2801 if (!this->cleanup.mutex)
2802 this->cleanup.mutex = mutex;
2803}
2804
2805/***********************************************************************
2806 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2807 */
2808VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count )
2809{
2810 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2811
2812 TRACE( "%p %p %lu\n", instance, semaphore, count );
2813
2814 if (!this->cleanup.semaphore)
2815 {
2816 this->cleanup.semaphore = semaphore;
2817 this->cleanup.semaphore_count = count;
2818 }
2819}
2820
2821/***********************************************************************
2822 * TpCallbackSetEventOnCompletion (NTDLL.@)
2823 */
2824VOID WINAPI TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE event )
2825{
2826 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2827
2828 TRACE( "%p %p\n", instance, event );
2829
2830 if (!this->cleanup.event)
2831 this->cleanup.event = event;
2832}
2833
2834/***********************************************************************
2835 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2836 */
2837VOID WINAPI TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE *instance, HMODULE module )
2838{
2839 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2840
2841 TRACE( "%p %p\n", instance, module );
2842
2843 if (!this->cleanup.library)
2844 this->cleanup.library = module;
2845}
2846
2847/***********************************************************************
2848 * TpDisassociateCallback (NTDLL.@)
2849 */
2850VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
2851{
2852 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2853 struct threadpool_object *object = this->object;
2854 struct threadpool *pool;
2855
2856 TRACE( "%p\n", instance );
2857
2858 if (this->threadid != GetCurrentThreadId())
2859 {
2860 ERR("called from wrong thread, ignoring\n");
2861 return;
2862 }
2863
2864 if (!this->associated)
2865 return;
2866
2867 pool = object->pool;
2868 RtlEnterCriticalSection( &pool->cs );
2869
2870 object->num_associated_callbacks--;
2871 if (object_is_finished( object, FALSE ))
2872 RtlWakeAllConditionVariable( &object->finished_event );
2873
2874 RtlLeaveCriticalSection( &pool->cs );
2875 this->associated = FALSE;
2876}
2877
2878/***********************************************************************
2879 * TpIsTimerSet (NTDLL.@)
2880 */
2881BOOL WINAPI TpIsTimerSet( TP_TIMER *timer )
2882{
2883 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2884
2885 TRACE( "%p\n", timer );
2886
2887 return this->u.timer.timer_set;
2888}
2889
2890/***********************************************************************
2891 * TpPostWork (NTDLL.@)
2892 */
2893VOID WINAPI TpPostWork( TP_WORK *work )
2894{
2895 struct threadpool_object *this = impl_from_TP_WORK( work );
2896
2897 TRACE( "%p\n", work );
2898
2899 tp_object_submit( this, FALSE );
2900}
2901
2902/***********************************************************************
2903 * TpReleaseCleanupGroup (NTDLL.@)
2904 */
2905VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
2906{
2907 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2908
2909 TRACE( "%p\n", group );
2910
2911 tp_group_shutdown( this );
2912 tp_group_release( this );
2913}
2914
2915/***********************************************************************
2916 * TpReleaseCleanupGroupMembers (NTDLL.@)
2917 */
2918VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata )
2919{
2920 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2921 struct threadpool_object *object, *next;
2922 struct list members;
2923
2924 TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2925
2926 RtlEnterCriticalSection( &this->cs );
2927
2928 /* Unset group, increase references, and mark objects for shutdown */
2929 LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2930 {
2931 assert( object->group == this );
2932 assert( object->is_group_member );
2933
2934 if (InterlockedIncrement( &object->refcount ) == 1)
2935 {
2936 /* Object is basically already destroyed, but group reference
2937 * was not deleted yet. We can safely ignore this object. */
2938 InterlockedDecrement( &object->refcount );
2939 list_remove( &object->group_entry );
2940 object->is_group_member = FALSE;
2941 continue;
2942 }
2943
2944 object->is_group_member = FALSE;
2945 tp_object_prepare_shutdown( object );
2946 }
2947
2948 /* Move members to a new temporary list */
2949 list_init( &members );
2950 list_move_tail( &members, &this->members );
2951
2952 RtlLeaveCriticalSection( &this->cs );
2953
2954 /* Cancel pending callbacks if requested */
2955 if (cancel_pending)
2956 {
2957 LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2958 {
2959 tp_object_cancel( object );
2960 }
2961 }
2962
2963 /* Wait for remaining callbacks to finish */
2964 LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2965 {
2966 tp_object_wait( object, TRUE );
2967
2968 if (!object->shutdown)
2969 {
2970 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2971 if (cancel_pending && object->group_cancel_callback)
2972 {
2973 TRACE( "executing group cancel callback %p(%p, %p)\n",
2974 object->group_cancel_callback, object->userdata, userdata );
2975 object->group_cancel_callback( object->userdata, userdata );
2976 TRACE( "callback %p returned\n", object->group_cancel_callback );
2977 }
2978
2979 if (object->type != TP_OBJECT_TYPE_SIMPLE)
2980 tp_object_release( object );
2981 }
2982
2983 object->shutdown = TRUE;
2984 tp_object_release( object );
2985 }
2986}
2987
2988/***********************************************************************
2989 * TpReleaseIoCompletion (NTDLL.@)
2990 */
2991void WINAPI TpReleaseIoCompletion( TP_IO *io )
2992{
2993 struct threadpool_object *this = impl_from_TP_IO( io );
2994 BOOL can_destroy;
2995
2996 TRACE( "%p\n", io );
2997
2998 RtlEnterCriticalSection( &this->pool->cs );
2999 this->u.io.shutting_down = TRUE;
3000 can_destroy = !this->u.io.pending_count && !this->u.io.skipped_count;
3001 RtlLeaveCriticalSection( &this->pool->cs );
3002
3003 if (can_destroy)
3004 {
3005 tp_object_prepare_shutdown( this );
3006 this->shutdown = TRUE;
3007 tp_object_release( this );
3008 }
3009}
3010
3011/***********************************************************************
3012 * TpReleasePool (NTDLL.@)
3013 */
3014VOID WINAPI TpReleasePool( TP_POOL *pool )
3015{
3016 struct threadpool *this = impl_from_TP_POOL( pool );
3017
3018 TRACE( "%p\n", pool );
3019
3020 tp_threadpool_shutdown( this );
3021 tp_threadpool_release( this );
3022}
3023
3024/***********************************************************************
3025 * TpReleaseTimer (NTDLL.@)
3026 */
3027VOID WINAPI TpReleaseTimer( TP_TIMER *timer )
3028{
3029 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3030
3031 TRACE( "%p\n", timer );
3032
3033 tp_object_prepare_shutdown( this );
3034 this->shutdown = TRUE;
3035 tp_object_release( this );
3036}
3037
3038/***********************************************************************
3039 * TpReleaseWait (NTDLL.@)
3040 */
3041VOID WINAPI TpReleaseWait( TP_WAIT *wait )
3042{
3043 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3044
3045 TRACE( "%p\n", wait );
3046
3047 tp_object_prepare_shutdown( this );
3048 this->shutdown = TRUE;
3049 tp_object_release( this );
3050}
3051
3052/***********************************************************************
3053 * TpReleaseWork (NTDLL.@)
3054 */
3055VOID WINAPI TpReleaseWork( TP_WORK *work )
3056{
3057 struct threadpool_object *this = impl_from_TP_WORK( work );
3058
3059 TRACE( "%p\n", work );
3060
3061 tp_object_prepare_shutdown( this );
3062 this->shutdown = TRUE;
3063 tp_object_release( this );
3064}
3065
3066/***********************************************************************
3067 * TpSetPoolMaxThreads (NTDLL.@)
3068 */
3069VOID WINAPI TpSetPoolMaxThreads( TP_POOL *pool, DWORD maximum )
3070{
3071 struct threadpool *this = impl_from_TP_POOL( pool );
3072
3073 TRACE( "%p %lu\n", pool, maximum );
3074
3075 RtlEnterCriticalSection( &this->cs );
3076 this->max_workers = max( maximum, 1 );
3077 this->min_workers = min( this->min_workers, this->max_workers );
3078 RtlLeaveCriticalSection( &this->cs );
3079}
3080
3081/***********************************************************************
3082 * TpSetPoolMinThreads (NTDLL.@)
3083 */
3084BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
3085{
3086 struct threadpool *this = impl_from_TP_POOL( pool );
3087 NTSTATUS status = STATUS_SUCCESS;
3088
3089 TRACE( "%p %lu\n", pool, minimum );
3090
3091 RtlEnterCriticalSection( &this->cs );
3092
3093 while (this->num_workers < minimum)
3094 {
3095 status = tp_new_worker_thread( this );
3096 if (status != STATUS_SUCCESS)
3097 break;
3098 }
3099
3100 if (status == STATUS_SUCCESS)
3101 {
3102 this->min_workers = minimum;
3103 this->max_workers = max( this->min_workers, this->max_workers );
3104 }
3105
3106 RtlLeaveCriticalSection( &this->cs );
3107 return !status;
3108}
3109
3110/***********************************************************************
3111 * TpSetTimer (NTDLL.@)
3112 */
3113VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
3114{
3115 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3116 struct threadpool_object *other_timer;
3117 BOOL submit_timer = FALSE;
3118 ULONGLONG timestamp;
3119
3120 TRACE( "%p %p %lu %lu\n", timer, timeout, period, window_length );
3121
3122 RtlEnterCriticalSection( &timerqueue.cs );
3123
3124 assert( this->u.timer.timer_initialized );
3125 this->u.timer.timer_set = timeout != NULL;
3126
3127 /* Convert relative timeout to absolute timestamp and handle a timeout
3128 * of zero, which means that the timer is submitted immediately. */
3129 if (timeout)
3130 {
3131 timestamp = timeout->QuadPart;
3132 if ((LONGLONG)timestamp < 0)
3133 {
3134 LARGE_INTEGER now;
3135 NtQuerySystemTime( &now );
3136 timestamp = now.QuadPart - timestamp;
3137 }
3138 else if (!timestamp)
3139 {
3140 if (!period)
3141 timeout = NULL;
3142 else
3143 {
3144 LARGE_INTEGER now;
3145 NtQuerySystemTime( &now );
3146 timestamp = now.QuadPart + (ULONGLONG)period * 10000;
3147 }
3148 submit_timer = TRUE;
3149 }
3150 }
3151
3152 /* First remove existing timeout. */
3153 if (this->u.timer.timer_pending)
3154 {
3155 list_remove( &this->u.timer.timer_entry );
3156 this->u.timer.timer_pending = FALSE;
3157 }
3158
3159 /* If the timer was enabled, then add it back to the queue. */
3160 if (timeout)
3161 {
3162 this->u.timer.timeout = timestamp;
3163 this->u.timer.period = period;
3164 this->u.timer.window_length = window_length;
3165
3166 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
3167 struct threadpool_object, u.timer.timer_entry )
3168 {
3169 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
3170 if (this->u.timer.timeout < other_timer->u.timer.timeout)
3171 break;
3172 }
3173 list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
3174
3175 /* Wake up the timer thread when the timeout has to be updated. */
3176 if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
3177 RtlWakeAllConditionVariable( &timerqueue.update_event );
3178
3179 this->u.timer.timer_pending = TRUE;
3180 }
3181
3182 RtlLeaveCriticalSection( &timerqueue.cs );
3183
3184 if (submit_timer)
3185 tp_object_submit( this, FALSE );
3186}
3187
3188/***********************************************************************
3189 * TpSetWait (NTDLL.@)
3190 */
3191VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout )
3192{
3193 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3194 ULONGLONG timestamp = MAXLONGLONG;
3195
3196 TRACE( "%p %p %p\n", wait, handle, timeout );
3197
3198 RtlEnterCriticalSection( &waitqueue.cs );
3199
3200 assert( this->u.wait.bucket );
3201 this->u.wait.handle = handle;
3202
3203 if (handle || this->u.wait.wait_pending)
3204 {
3205 struct waitqueue_bucket *bucket = this->u.wait.bucket;
3206 list_remove( &this->u.wait.wait_entry );
3207
3208 /* Convert relative timeout to absolute timestamp. */
3209 if (handle && timeout)
3210 {
3211 timestamp = timeout->QuadPart;
3212 if ((LONGLONG)timestamp < 0)
3213 {
3214 LARGE_INTEGER now;
3215 NtQuerySystemTime( &now );
3216 timestamp = now.QuadPart - timestamp;
3217 }
3218 }
3219
3220 /* Add wait object back into one of the queues. */
3221 if (handle)
3222 {
3223 list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
3224 this->u.wait.wait_pending = TRUE;
3225 this->u.wait.timeout = timestamp;
3226 }
3227 else
3228 {
3229 list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
3230 this->u.wait.wait_pending = FALSE;
3231 }
3232
3233 /* Wake up the wait queue thread. */
3234 NtSetEvent( bucket->update_event, NULL );
3235 }
3236
3237 RtlLeaveCriticalSection( &waitqueue.cs );
3238}
3239
3240/***********************************************************************
3241 * TpSimpleTryPost (NTDLL.@)
3242 */
3243NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
3244 TP_CALLBACK_ENVIRON *environment )
3245{
3246 struct threadpool_object *object;
3247 struct threadpool *pool;
3248 NTSTATUS status;
3249
3250 TRACE( "%p %p %p\n", callback, userdata, environment );
3251
3252 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
3253 if (!object)
3254 return STATUS_NO_MEMORY;
3255
3256 status = tp_threadpool_lock( &pool, environment );
3257 if (status)
3258 {
3259 RtlFreeHeap( GetProcessHeap(), 0, object );
3260 return status;
3261 }
3262
3263 object->type = TP_OBJECT_TYPE_SIMPLE;
3264 object->u.simple.callback = callback;
3265 tp_object_initialize( object, pool, userdata, environment );
3266
3267 return STATUS_SUCCESS;
3268}
3269
3270/***********************************************************************
3271 * TpStartAsyncIoOperation (NTDLL.@)
3272 */
3273void WINAPI TpStartAsyncIoOperation( TP_IO *io )
3274{
3275 struct threadpool_object *this = impl_from_TP_IO( io );
3276
3277 TRACE( "%p\n", io );
3278
3279 RtlEnterCriticalSection( &this->pool->cs );
3280
3281 this->u.io.pending_count++;
3282
3283 RtlLeaveCriticalSection( &this->pool->cs );
3284}
3285
3286/***********************************************************************
3287 * TpWaitForIoCompletion (NTDLL.@)
3288 */
3289void WINAPI TpWaitForIoCompletion( TP_IO *io, BOOL cancel_pending )
3290{
3291 struct threadpool_object *this = impl_from_TP_IO( io );
3292
3293 TRACE( "%p %d\n", io, cancel_pending );
3294
3295 if (cancel_pending)
3296 tp_object_cancel( this );
3297 tp_object_wait( this, FALSE );
3298}
3299
3300/***********************************************************************
3301 * TpWaitForTimer (NTDLL.@)
3302 */
3303VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
3304{
3305 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3306
3307 TRACE( "%p %d\n", timer, cancel_pending );
3308
3309 if (cancel_pending)
3310 tp_object_cancel( this );
3311 tp_object_wait( this, FALSE );
3312}
3313
3314/***********************************************************************
3315 * TpWaitForWait (NTDLL.@)
3316 */
3317VOID WINAPI TpWaitForWait( TP_WAIT *wait, BOOL cancel_pending )
3318{
3319 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3320
3321 TRACE( "%p %d\n", wait, cancel_pending );
3322
3323 if (cancel_pending)
3324 tp_object_cancel( this );
3325 tp_object_wait( this, FALSE );
3326}
3327
3328/***********************************************************************
3329 * TpWaitForWork (NTDLL.@)
3330 */
3331VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
3332{
3333 struct threadpool_object *this = impl_from_TP_WORK( work );
3334
3335 TRACE( "%p %u\n", work, cancel_pending );
3336
3337 if (cancel_pending)
3338 tp_object_cancel( this );
3339 tp_object_wait( this, FALSE );
3340}
3341
3342/***********************************************************************
3343 * TpSetPoolStackInformation (NTDLL.@)
3344 */
3345NTSTATUS WINAPI TpSetPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info )
3346{
3347 struct threadpool *this = impl_from_TP_POOL( pool );
3348
3349 TRACE( "%p %p\n", pool, stack_info );
3350
3351 if (!stack_info)
3352 return STATUS_INVALID_PARAMETER;
3353
3354 RtlEnterCriticalSection( &this->cs );
3355 this->stack_info = *stack_info;
3356 RtlLeaveCriticalSection( &this->cs );
3357
3358 return STATUS_SUCCESS;
3359}
3360
3361/***********************************************************************
3362 * TpQueryPoolStackInformation (NTDLL.@)
3363 */
3364NTSTATUS WINAPI TpQueryPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info )
3365{
3366 struct threadpool *this = impl_from_TP_POOL( pool );
3367
3368 TRACE( "%p %p\n", pool, stack_info );
3369
3370 if (!stack_info)
3371 return STATUS_INVALID_PARAMETER;
3372
3373 RtlEnterCriticalSection( &this->cs );
3374 *stack_info = this->stack_info;
3375 RtlLeaveCriticalSection( &this->cs );
3376
3377 return STATUS_SUCCESS;
3378}
3379
3380#ifndef __REACTOS__
3381static void CALLBACK rtl_wait_callback( TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result )
3382{
3383 struct threadpool_object *object = impl_from_TP_WAIT(wait);
3384 object->u.wait.rtl_callback( userdata, result != STATUS_WAIT_0 );
3385}
3386
3387/***********************************************************************
3388 * RtlRegisterWait (NTDLL.@)
3389 *
3390 * Registers a wait for a handle to become signaled.
3391 *
3392 * PARAMS
3393 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
3394 * Object [I] Object to wait to become signaled.
3395 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
3396 * Context [I] Context to pass to the callback function when it is executed.
3397 * Milliseconds [I] Number of milliseconds to wait before timing out.
3398 * Flags [I] Flags. See notes.
3399 *
3400 * RETURNS
3401 * Success: STATUS_SUCCESS.
3402 * Failure: Any NTSTATUS code.
3403 *
3404 * NOTES
3405 * Flags can be one or more of the following:
3406 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
3407 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
3408 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
3409 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
3410 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
3411 */
3412NTSTATUS WINAPI RtlRegisterWait( HANDLE *out, HANDLE handle, RTL_WAITORTIMERCALLBACKFUNC callback,
3413 void *context, ULONG milliseconds, ULONG flags )
3414{
3415 struct threadpool_object *object;
3416 TP_CALLBACK_ENVIRON environment;
3417 LARGE_INTEGER timeout;
3418 NTSTATUS status;
3419 TP_WAIT *wait;
3420
3421 TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %lu, flags %lx\n",
3422 out, handle, callback, context, milliseconds, flags );
3423
3424 memset( &environment, 0, sizeof(environment) );
3425 environment.Version = 1;
3426 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
3427 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
3428
3429 flags &= (WT_EXECUTEONLYONCE | WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD);
3430 if ((status = tp_alloc_wait( &wait, rtl_wait_callback, context, &environment, flags )))
3431 return status;
3432
3433 object = impl_from_TP_WAIT(wait);
3434 object->u.wait.rtl_callback = callback;
3435
3436 RtlEnterCriticalSection( &waitqueue.cs );
3437 TpSetWait( (TP_WAIT *)object, handle, get_nt_timeout( &timeout, milliseconds ) );
3438
3439 *out = object;
3440 RtlLeaveCriticalSection( &waitqueue.cs );
3441
3442 return STATUS_SUCCESS;
3443}
3444
3445/***********************************************************************
3446 * RtlDeregisterWaitEx (NTDLL.@)
3447 *
3448 * Cancels a wait operation and frees the resources associated with calling
3449 * RtlRegisterWait().
3450 *
3451 * PARAMS
3452 * WaitObject [I] Handle to the wait object to free.
3453 *
3454 * RETURNS
3455 * Success: STATUS_SUCCESS.
3456 * Failure: Any NTSTATUS code.
3457 */
3458NTSTATUS WINAPI RtlDeregisterWaitEx( HANDLE handle, HANDLE event )
3459{
3460 struct threadpool_object *object = handle;
3461 NTSTATUS status;
3462
3463 TRACE( "handle %p, event %p\n", handle, event );
3464
3465 if (!object) return STATUS_INVALID_HANDLE;
3466
3467 TpSetWait( (TP_WAIT *)object, NULL, NULL );
3468
3469 if (event == INVALID_HANDLE_VALUE) TpWaitForWait( (TP_WAIT *)object, TRUE );
3470 else
3471 {
3472 assert( object->completed_event == NULL );
3473 object->completed_event = event;
3474 }
3475
3476 RtlEnterCriticalSection( &object->pool->cs );
3477 if (object->num_pending_callbacks + object->num_running_callbacks
3478 + object->num_associated_callbacks) status = STATUS_PENDING;
3479 else status = STATUS_SUCCESS;
3480 RtlLeaveCriticalSection( &object->pool->cs );
3481
3482 TpReleaseWait( (TP_WAIT *)object );
3483 return status;
3484}
3485
3486/***********************************************************************
3487 * RtlDeregisterWait (NTDLL.@)
3488 *
3489 * Cancels a wait operation and frees the resources associated with calling
3490 * RtlRegisterWait().
3491 *
3492 * PARAMS
3493 * WaitObject [I] Handle to the wait object to free.
3494 *
3495 * RETURNS
3496 * Success: STATUS_SUCCESS.
3497 * Failure: Any NTSTATUS code.
3498 */
3499NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle)
3500{
3501 return RtlDeregisterWaitEx(WaitHandle, NULL);
3502}
3503#endif
3504
3505#ifdef __REACTOS__
3506VOID
3507NTAPI
3508RtlpInitializeThreadPooling(
3509 VOID)
3510{
3511 RtlInitializeCriticalSection(&old_threadpool.threadpool_compl_cs);
3512 RtlInitializeCriticalSection(&timerqueue.cs);
3513 RtlInitializeCriticalSection(&waitqueue.cs);
3514 RtlInitializeCriticalSection(&ioqueue.cs);
3515}
3516#endif