Commit db9fc914 authored by Sebastian Lackner's avatar Sebastian Lackner Committed by Alexandre Julliard

ntdll: Implement TpSimpleTryPost and basic threadpool infrastructure.

parent b6cf2653
...@@ -970,6 +970,9 @@ ...@@ -970,6 +970,9 @@
@ stdcall RtlxOemStringToUnicodeSize(ptr) RtlOemStringToUnicodeSize @ stdcall RtlxOemStringToUnicodeSize(ptr) RtlOemStringToUnicodeSize
@ stdcall RtlxUnicodeStringToAnsiSize(ptr) RtlUnicodeStringToAnsiSize @ stdcall RtlxUnicodeStringToAnsiSize(ptr) RtlUnicodeStringToAnsiSize
@ stdcall RtlxUnicodeStringToOemSize(ptr) RtlUnicodeStringToOemSize @ stdcall RtlxUnicodeStringToOemSize(ptr) RtlUnicodeStringToOemSize
@ stdcall TpAllocPool(ptr ptr)
@ stdcall TpReleasePool(ptr)
@ stdcall TpSimpleTryPost(ptr ptr ptr)
@ stdcall -ret64 VerSetConditionMask(int64 long long) @ stdcall -ret64 VerSetConditionMask(int64 long long)
@ stdcall WinSqmIsOptedIn() @ stdcall WinSqmIsOptedIn()
@ stdcall ZwAcceptConnectPort(ptr long ptr long long ptr) NtAcceptConnectPort @ stdcall ZwAcceptConnectPort(ptr long ptr long long ptr) NtAcceptConnectPort
......
...@@ -48,7 +48,7 @@ static BOOL init_threadpool(void) ...@@ -48,7 +48,7 @@ static BOOL init_threadpool(void)
if (!pTpAllocPool) if (!pTpAllocPool)
{ {
skip("Threadpool functions not supported, skipping tests\n"); win_skip("Threadpool functions not supported, skipping tests\n");
return FALSE; return FALSE;
} }
...@@ -105,6 +105,7 @@ static void test_tp_simple(void) ...@@ -105,6 +105,7 @@ static void test_tp_simple(void)
environment.Version = 9999; environment.Version = 9999;
environment.Pool = pool; environment.Pool = pool;
status = pTpSimpleTryPost(simple_cb, semaphore, &environment); status = pTpSimpleTryPost(simple_cb, semaphore, &environment);
todo_wine
ok(status == STATUS_INVALID_PARAMETER || broken(!status) /* Vista/2008 */, ok(status == STATUS_INVALID_PARAMETER || broken(!status) /* Vista/2008 */,
"TpSimpleTryPost unexpectedly returned status %x\n", status); "TpSimpleTryPost unexpectedly returned status %x\n", status);
if (!status) if (!status)
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
* Thread pooling * Thread pooling
* *
* Copyright (c) 2006 Robert Shearman * Copyright (c) 2006 Robert Shearman
* Copyright (c) 2014-2015 Sebastian Lackner
* *
* This library is free software; you can redistribute it and/or * This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public * modify it under the terms of the GNU Lesser General Public
...@@ -37,6 +38,10 @@ ...@@ -37,6 +38,10 @@
WINE_DEFAULT_DEBUG_CHANNEL(threadpool); WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
/*
* Old thread pooling API
*/
#define OLD_WORKER_TIMEOUT 30000 /* 30 seconds */ #define OLD_WORKER_TIMEOUT 30000 /* 30 seconds */
#define EXPIRE_NEVER (~(ULONGLONG)0) #define EXPIRE_NEVER (~(ULONGLONG)0)
#define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */ #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
...@@ -127,6 +132,65 @@ struct timer_queue ...@@ -127,6 +132,65 @@ struct timer_queue
HANDLE thread; HANDLE thread;
}; };
/*
* Object-oriented thread pooling API
*/
#define THREADPOOL_WORKER_TIMEOUT 5000
/* internal threadpool representation */
struct threadpool
{
LONG refcount;
LONG objcount;
BOOL shutdown;
CRITICAL_SECTION cs;
/* pool of work items, locked via .cs */
struct list pool;
RTL_CONDITION_VARIABLE update_event;
/* information about worker threads, locked via .cs */
int max_workers;
int min_workers;
int num_workers;
int num_busy_workers;
};
enum threadpool_objtype
{
TP_OBJECT_TYPE_SIMPLE
};
/* internal threadpool object representation */
struct threadpool_object
{
LONG refcount;
BOOL shutdown;
/* read-only information */
enum threadpool_objtype type;
struct threadpool *pool;
PVOID userdata;
/* information about the pool, locked via .pool->cs */
struct list pool_entry;
LONG num_pending_callbacks;
LONG num_running_callbacks;
/* arguments for callback */
union
{
struct
{
PTP_SIMPLE_CALLBACK callback;
} simple;
} u;
};
static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
{
return (struct threadpool *)pool;
}
static void CALLBACK threadpool_worker_proc( void *param );
static struct threadpool *default_threadpool = NULL;
static inline LONG interlocked_inc( PLONG dest ) static inline LONG interlocked_inc( PLONG dest )
{ {
return interlocked_xchg_add( dest, 1 ) + 1; return interlocked_xchg_add( dest, 1 ) + 1;
...@@ -1044,3 +1108,393 @@ NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer, ...@@ -1044,3 +1108,393 @@ NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
return status; return status;
} }
/***********************************************************************
* tp_threadpool_alloc (internal)
*
* Allocates a new threadpool object.
*/
static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
{
struct threadpool *pool;
pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
if (!pool)
return STATUS_NO_MEMORY;
pool->refcount = 1;
pool->objcount = 0;
pool->shutdown = FALSE;
RtlInitializeCriticalSection( &pool->cs );
pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
list_init( &pool->pool );
RtlInitializeConditionVariable( &pool->update_event );
pool->max_workers = 500;
pool->min_workers = 0;
pool->num_workers = 0;
pool->num_busy_workers = 0;
TRACE( "allocated threadpool %p\n", pool );
*out = pool;
return STATUS_SUCCESS;
}
/***********************************************************************
* tp_threadpool_shutdown (internal)
*
* Prepares the shutdown of a threadpool object and notifies all worker
* threads to terminate (after all remaining work items have been
* processed).
*/
static void tp_threadpool_shutdown( struct threadpool *pool )
{
assert( pool != default_threadpool );
pool->shutdown = TRUE;
RtlWakeAllConditionVariable( &pool->update_event );
}
/***********************************************************************
* tp_threadpool_release (internal)
*
* Releases a reference to a threadpool object.
*/
static BOOL tp_threadpool_release( struct threadpool *pool )
{
if (interlocked_dec( &pool->refcount ))
return FALSE;
TRACE( "destroying threadpool %p\n", pool );
assert( pool->shutdown );
assert( !pool->objcount );
assert( list_empty( &pool->pool ) );
pool->cs.DebugInfo->Spare[0] = 0;
RtlDeleteCriticalSection( &pool->cs );
RtlFreeHeap( GetProcessHeap(), 0, pool );
return TRUE;
}
/***********************************************************************
* tp_threadpool_lock (internal)
*
* Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
* block. When the lock is acquired successfully, it is guaranteed that
* there is at least one worker thread to process tasks.
*/
static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON *environment )
{
struct threadpool *pool = NULL;
NTSTATUS status = STATUS_SUCCESS;
if (environment)
pool = (struct threadpool *)environment->Pool;
if (!pool)
{
if (!default_threadpool)
{
status = tp_threadpool_alloc( &pool );
if (status != STATUS_SUCCESS)
return status;
if (interlocked_cmpxchg_ptr( (void *)&default_threadpool, pool, NULL ) != NULL)
{
tp_threadpool_shutdown( pool );
tp_threadpool_release( pool );
}
}
pool = default_threadpool;
}
RtlEnterCriticalSection( &pool->cs );
/* Make sure that the threadpool has at least one thread. */
if (!pool->num_workers)
{
HANDLE thread;
status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
threadpool_worker_proc, pool, &thread, NULL );
if (status == STATUS_SUCCESS)
{
interlocked_inc( &pool->refcount );
pool->num_workers++;
NtClose( thread );
}
}
/* Keep a reference, and increment objcount to ensure that the
* last thread doesn't terminate. */
if (status == STATUS_SUCCESS)
{
interlocked_inc( &pool->refcount );
pool->objcount++;
}
RtlLeaveCriticalSection( &pool->cs );
if (status != STATUS_SUCCESS)
return status;
*out = pool;
return STATUS_SUCCESS;
}
/***********************************************************************
* tp_threadpool_unlock (internal)
*
* Releases a lock on a threadpool.
*/
static void tp_threadpool_unlock( struct threadpool *pool )
{
RtlEnterCriticalSection( &pool->cs );
pool->objcount--;
RtlLeaveCriticalSection( &pool->cs );
tp_threadpool_release( pool );
}
/***********************************************************************
* tp_object_initialize (internal)
*
* Initializes members of a threadpool object.
*/
static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
PVOID userdata, TP_CALLBACK_ENVIRON *environment )
{
object->refcount = 1;
object->shutdown = FALSE;
object->pool = pool;
object->userdata = userdata;
memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
object->num_pending_callbacks = 0;
object->num_running_callbacks = 0;
if (environment)
FIXME( "environment not implemented yet\n" );
TRACE( "allocated object %p of type %u\n", object, object->type );
}
/***********************************************************************
* tp_object_submit (internal)
*
* Submits a threadpool object to the associcated threadpool. This
* function has to be VOID because TpPostWork can never fail on Windows.
*/
static void tp_object_submit( struct threadpool_object *object )
{
struct threadpool *pool = object->pool;
NTSTATUS status = STATUS_UNSUCCESSFUL;
assert( !object->shutdown );
assert( !pool->shutdown );
RtlEnterCriticalSection( &pool->cs );
/* Start new worker threads if required. */
if (pool->num_busy_workers >= pool->num_workers &&
pool->num_workers < pool->max_workers)
{
HANDLE thread;
status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
threadpool_worker_proc, pool, &thread, NULL );
if (status == STATUS_SUCCESS)
{
interlocked_inc( &pool->refcount );
pool->num_workers++;
NtClose( thread );
}
}
/* Queue work item and increment refcount. */
interlocked_inc( &object->refcount );
if (!object->num_pending_callbacks++)
list_add_tail( &pool->pool, &object->pool_entry );
/* No new thread started - wake up one existing thread. */
if (status != STATUS_SUCCESS)
{
assert( pool->num_workers > 0 );
RtlWakeConditionVariable( &pool->update_event );
}
RtlLeaveCriticalSection( &pool->cs );
}
/***********************************************************************
* tp_object_shutdown (internal)
*
* Marks a threadpool object for shutdown (which means that no further
* tasks can be submitted).
*/
static void tp_object_shutdown( struct threadpool_object *object )
{
object->shutdown = TRUE;
}
/***********************************************************************
* tp_object_release (internal)
*
* Releases a reference to a threadpool object.
*/
static BOOL tp_object_release( struct threadpool_object *object )
{
if (interlocked_dec( &object->refcount ))
return FALSE;
TRACE( "destroying object %p of type %u\n", object, object->type );
assert( object->shutdown );
assert( !object->num_pending_callbacks );
assert( !object->num_running_callbacks );
tp_threadpool_unlock( object->pool );
RtlFreeHeap( GetProcessHeap(), 0, object );
return TRUE;
}
/***********************************************************************
* threadpool_worker_proc (internal)
*/
static void CALLBACK threadpool_worker_proc( void *param )
{
struct threadpool *pool = param;
LARGE_INTEGER timeout;
struct list *ptr;
TRACE( "starting worker thread for pool %p\n", pool );
RtlEnterCriticalSection( &pool->cs );
for (;;)
{
while ((ptr = list_head( &pool->pool )))
{
struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
assert( object->num_pending_callbacks > 0 );
/* If further pending callbacks are queued, move the work item to
* the end of the pool list. Otherwise remove it from the pool. */
list_remove( &object->pool_entry );
if (--object->num_pending_callbacks)
list_add_tail( &pool->pool, &object->pool_entry );
/* Leave critical section and do the actual callback. */
object->num_running_callbacks++;
pool->num_busy_workers++;
RtlLeaveCriticalSection( &pool->cs );
switch (object->type)
{
case TP_OBJECT_TYPE_SIMPLE:
{
TRACE( "executing simple callback %p(NULL, %p)\n",
object->u.simple.callback, object->userdata );
object->u.simple.callback( NULL, object->userdata );
TRACE( "callback %p returned\n", object->u.simple.callback );
break;
}
default:
assert(0);
break;
}
RtlEnterCriticalSection( &pool->cs );
pool->num_busy_workers--;
object->num_running_callbacks--;
tp_object_release( object );
}
/* Shutdown worker thread if requested. */
if (pool->shutdown)
break;
/* Wait for new tasks or until the timeout expires. A thread only terminates
* when no new tasks are available, and the number of threads can be
* decreased without violating the min_workers limit. An exception is when
* min_workers == 0, then objcount is used to detect if the last thread
* can be terminated. */
timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
!list_head( &pool->pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
(!pool->min_workers && !pool->objcount)))
{
break;
}
}
pool->num_workers--;
RtlLeaveCriticalSection( &pool->cs );
TRACE( "terminating worker thread for pool %p\n", pool );
tp_threadpool_release( pool );
}
/***********************************************************************
* TpAllocPool (NTDLL.@)
*/
NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
{
TRACE( "%p %p\n", out, reserved );
if (reserved)
FIXME( "reserved argument is nonzero (%p)", reserved );
return tp_threadpool_alloc( (struct threadpool **)out );
}
/***********************************************************************
* TpReleasePool (NTDLL.@)
*/
VOID WINAPI TpReleasePool( TP_POOL *pool )
{
struct threadpool *this = impl_from_TP_POOL( pool );
TRACE( "%p\n", pool );
tp_threadpool_shutdown( this );
tp_threadpool_release( this );
}
/***********************************************************************
* TpSimpleTryPost (NTDLL.@)
*/
NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
TP_CALLBACK_ENVIRON *environment )
{
struct threadpool_object *object;
struct threadpool *pool;
NTSTATUS status;
TRACE( "%p %p %p\n", callback, userdata, environment );
object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
if (!object)
return STATUS_NO_MEMORY;
status = tp_threadpool_lock( &pool, environment );
if (status)
{
RtlFreeHeap( GetProcessHeap(), 0, object );
return status;
}
object->type = TP_OBJECT_TYPE_SIMPLE;
object->u.simple.callback = callback;
tp_object_initialize( object, pool, userdata, environment );
tp_object_submit( object );
tp_object_shutdown( object );
tp_object_release( object );
return STATUS_SUCCESS;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment