Commit 7ba4d119 authored by Sebastian Lackner's avatar Sebastian Lackner Committed by Alexandre Julliard

ntdll: Use condition variable for RtlQueueWorkItem implementation.

parent 05863c07
...@@ -39,12 +39,13 @@ WINE_DEFAULT_DEBUG_CHANNEL(threadpool); ...@@ -39,12 +39,13 @@ WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
#define WORKER_TIMEOUT 30000 /* 30 seconds */ #define WORKER_TIMEOUT 30000 /* 30 seconds */
/* threadpool_cs must be held while modifying the following elements */
static struct list work_item_list = LIST_INIT(work_item_list);
static LONG num_workers; static LONG num_workers;
static LONG num_work_items;
static LONG num_busy_workers; static LONG num_busy_workers;
static LONG num_items_processed;
static struct list work_item_list = LIST_INIT(work_item_list); static RTL_CONDITION_VARIABLE threadpool_cond = RTL_CONDITION_VARIABLE_INIT;
static HANDLE work_item_event;
static RTL_CRITICAL_SECTION threadpool_cs; static RTL_CRITICAL_SECTION threadpool_cs;
static RTL_CRITICAL_SECTION_DEBUG critsect_debug = static RTL_CRITICAL_SECTION_DEBUG critsect_debug =
...@@ -84,80 +85,44 @@ static inline LONG interlocked_dec( PLONG dest ) ...@@ -84,80 +85,44 @@ static inline LONG interlocked_dec( PLONG dest )
static void WINAPI worker_thread_proc(void * param) static void WINAPI worker_thread_proc(void * param)
{ {
interlocked_inc(&num_workers); struct list *item;
struct work_item *work_item_ptr, work_item;
/* free the work item memory sooner to reduce memory usage */ LARGE_INTEGER timeout;
while (TRUE) timeout.QuadPart = -(WORKER_TIMEOUT * (ULONGLONG)10000);
{
if (num_work_items > 0)
{
struct list *item;
RtlEnterCriticalSection(&threadpool_cs);
item = list_head(&work_item_list);
if (item)
{
struct work_item *work_item_ptr = LIST_ENTRY(item, struct work_item, entry);
struct work_item work_item;
list_remove(&work_item_ptr->entry);
interlocked_dec(&num_work_items);
RtlLeaveCriticalSection(&threadpool_cs);
work_item = *work_item_ptr;
RtlFreeHeap(GetProcessHeap(), 0, work_item_ptr);
TRACE("executing %p(%p)\n", work_item.function, work_item.context);
interlocked_inc(&num_busy_workers);
/* do the work */ RtlEnterCriticalSection( &threadpool_cs );
work_item.function(work_item.context); num_workers++;
interlocked_dec(&num_busy_workers); for (;;)
} {
else if ((item = list_head( &work_item_list )))
RtlLeaveCriticalSection(&threadpool_cs);
}
else
{ {
NTSTATUS status; work_item_ptr = LIST_ENTRY( item, struct work_item, entry );
LARGE_INTEGER timeout; list_remove( &work_item_ptr->entry );
timeout.QuadPart = -(WORKER_TIMEOUT * (ULONGLONG)10000); num_busy_workers++;
status = NtWaitForSingleObject(work_item_event, FALSE, &timeout); num_items_processed++;
if (status != STATUS_WAIT_0) RtlLeaveCriticalSection( &threadpool_cs );
break;
/* copy item to stack and do the work */
work_item = *work_item_ptr;
RtlFreeHeap( GetProcessHeap(), 0, work_item_ptr );
TRACE("executing %p(%p)\n", work_item.function, work_item.context);
work_item.function( work_item.context );
RtlEnterCriticalSection( &threadpool_cs );
num_busy_workers--;
} }
else if (RtlSleepConditionVariableCS( &threadpool_cond, &threadpool_cs, &timeout ) != STATUS_SUCCESS)
break;
} }
interlocked_dec(&num_workers); num_workers--;
RtlLeaveCriticalSection( &threadpool_cs );
RtlExitUserThread(0); RtlExitUserThread( 0 );
/* never reached */ /* never reached */
} }
static NTSTATUS add_work_item_to_queue(struct work_item *work_item)
{
NTSTATUS status;
RtlEnterCriticalSection(&threadpool_cs);
list_add_tail(&work_item_list, &work_item->entry);
num_work_items++;
RtlLeaveCriticalSection(&threadpool_cs);
if (!work_item_event)
{
HANDLE sem;
status = NtCreateSemaphore(&sem, SEMAPHORE_ALL_ACCESS, NULL, 1, INT_MAX);
if (interlocked_cmpxchg_ptr( &work_item_event, sem, 0 ))
NtClose(sem); /* somebody beat us to it */
}
else
status = NtReleaseSemaphore(work_item_event, 1, NULL);
return status;
}
/*********************************************************************** /***********************************************************************
* RtlQueueWorkItem (NTDLL.@) * RtlQueueWorkItem (NTDLL.@)
* *
...@@ -184,6 +149,7 @@ NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function, PVOID Context, ...@@ -184,6 +149,7 @@ NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function, PVOID Context,
{ {
HANDLE thread; HANDLE thread;
NTSTATUS status; NTSTATUS status;
LONG items_processed;
struct work_item *work_item = RtlAllocateHeap(GetProcessHeap(), 0, sizeof(struct work_item)); struct work_item *work_item = RtlAllocateHeap(GetProcessHeap(), 0, sizeof(struct work_item));
if (!work_item) if (!work_item)
...@@ -195,39 +161,40 @@ NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function, PVOID Context, ...@@ -195,39 +161,40 @@ NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function, PVOID Context,
if (Flags & ~WT_EXECUTELONGFUNCTION) if (Flags & ~WT_EXECUTELONGFUNCTION)
FIXME("Flags 0x%x not supported\n", Flags); FIXME("Flags 0x%x not supported\n", Flags);
status = add_work_item_to_queue(work_item); RtlEnterCriticalSection( &threadpool_cs );
list_add_tail( &work_item_list, &work_item->entry );
status = (num_workers > num_busy_workers) ? STATUS_SUCCESS : STATUS_UNSUCCESSFUL;
items_processed = num_items_processed;
RtlLeaveCriticalSection( &threadpool_cs );
/* FIXME: tune this algorithm to not be as aggressive with creating threads /* FIXME: tune this algorithm to not be as aggressive with creating threads
* if WT_EXECUTELONGFUNCTION isn't specified */ * if WT_EXECUTELONGFUNCTION isn't specified */
if ((status == STATUS_SUCCESS) && if (status == STATUS_SUCCESS)
((num_workers == 0) || (num_workers == num_busy_workers))) RtlWakeConditionVariable( &threadpool_cond );
else
{ {
status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
NULL, 0, 0, worker_thread_proc, NULL, &thread, NULL );
worker_thread_proc, NULL, &thread, NULL );
if (status == STATUS_SUCCESS)
NtClose( thread );
/* NOTE: we don't care if we couldn't create the thread if there is at /* NOTE: we don't care if we couldn't create the thread if there is at
* least one other available to process the request */ * least one other available to process the request */
if ((num_workers > 0) && (status != STATUS_SUCCESS)) if (status == STATUS_SUCCESS)
status = STATUS_SUCCESS; NtClose( thread );
} else
{
if (status != STATUS_SUCCESS) RtlEnterCriticalSection( &threadpool_cs );
{ if (num_workers > 0 || num_items_processed != items_processed)
RtlEnterCriticalSection(&threadpool_cs); status = STATUS_SUCCESS;
else
interlocked_dec(&num_work_items); list_remove( &work_item->entry );
list_remove(&work_item->entry); RtlLeaveCriticalSection( &threadpool_cs );
RtlFreeHeap(GetProcessHeap(), 0, work_item);
RtlLeaveCriticalSection(&threadpool_cs);
return status; if (status != STATUS_SUCCESS)
RtlFreeHeap( GetProcessHeap(), 0, work_item );
}
} }
return STATUS_SUCCESS; return status;
} }
/*********************************************************************** /***********************************************************************
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment