Commit 68d92bce authored by Sebastian Lackner's avatar Sebastian Lackner Committed by Alexandre Julliard

ntdll: Implement threadpool cleanup group functions.

parent 7d9ec31e
......@@ -970,7 +970,10 @@
@ stdcall RtlxOemStringToUnicodeSize(ptr) RtlOemStringToUnicodeSize
@ stdcall RtlxUnicodeStringToAnsiSize(ptr) RtlUnicodeStringToAnsiSize
@ stdcall RtlxUnicodeStringToOemSize(ptr) RtlUnicodeStringToOemSize
@ stdcall TpAllocCleanupGroup(ptr)
@ stdcall TpAllocPool(ptr ptr)
@ stdcall TpReleaseCleanupGroup(ptr)
@ stdcall TpReleaseCleanupGroupMembers(ptr long ptr)
@ stdcall TpReleasePool(ptr)
@ stdcall TpSetPoolMaxThreads(ptr long)
@ stdcall TpSetPoolMinThreads(ptr long)
......
......@@ -168,9 +168,14 @@ struct threadpool_object
/* read-only information */
enum threadpool_objtype type;
struct threadpool *pool;
struct threadpool_group *group;
PVOID userdata;
/* information about the group, locked via .group->cs */
struct list group_entry;
BOOL is_group_member;
/* information about the pool, locked via .pool->cs */
struct list pool_entry;
RTL_CONDITION_VARIABLE finished_event;
LONG num_pending_callbacks;
LONG num_running_callbacks;
/* arguments for callback */
......@@ -183,12 +188,30 @@ struct threadpool_object
} u;
};
/* internal threadpool group representation */
struct threadpool_group
{
LONG refcount;
BOOL shutdown;
CRITICAL_SECTION cs;
/* list of group members, locked via .cs */
struct list members;
};
static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
{
return (struct threadpool *)pool;
}
static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
{
return (struct threadpool_group *)group;
}
static void CALLBACK threadpool_worker_proc( void *param );
static void tp_object_submit( struct threadpool_object *object );
static void tp_object_shutdown( struct threadpool_object *object );
static BOOL tp_object_release( struct threadpool_object *object );
static struct threadpool *default_threadpool = NULL;
static inline LONG interlocked_inc( PLONG dest )
......@@ -1261,6 +1284,65 @@ static void tp_threadpool_unlock( struct threadpool *pool )
}
/***********************************************************************
* tp_group_alloc (internal)
*
* Allocates a new threadpool group object.
*/
static NTSTATUS tp_group_alloc( struct threadpool_group **out )
{
struct threadpool_group *group;
group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
if (!group)
return STATUS_NO_MEMORY;
group->refcount = 1;
group->shutdown = FALSE;
RtlInitializeCriticalSection( &group->cs );
group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
list_init( &group->members );
TRACE( "allocated group %p\n", group );
*out = group;
return STATUS_SUCCESS;
}
/***********************************************************************
* tp_group_shutdown (internal)
*
* Marks the group object for shutdown.
*/
static void tp_group_shutdown( struct threadpool_group *group )
{
group->shutdown = TRUE;
}
/***********************************************************************
* tp_group_release (internal)
*
* Releases a reference to a group object.
*/
static BOOL tp_group_release( struct threadpool_group *group )
{
if (interlocked_dec( &group->refcount ))
return FALSE;
TRACE( "destroying group %p\n", group );
assert( group->shutdown );
assert( list_empty( &group->members ) );
group->cs.DebugInfo->Spare[0] = 0;
RtlDeleteCriticalSection( &group->cs );
RtlFreeHeap( GetProcessHeap(), 0, group );
return TRUE;
}
/***********************************************************************
* tp_object_initialize (internal)
*
* Initializes members of a threadpool object.
......@@ -1268,20 +1350,58 @@ static void tp_threadpool_unlock( struct threadpool *pool )
static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
PVOID userdata, TP_CALLBACK_ENVIRON *environment )
{
BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
object->refcount = 1;
object->shutdown = FALSE;
object->pool = pool;
object->group = NULL;
object->userdata = userdata;
memset( &object->group_entry, 0, sizeof(object->group_entry) );
object->is_group_member = FALSE;
memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
RtlInitializeConditionVariable( &object->finished_event );
object->num_pending_callbacks = 0;
object->num_running_callbacks = 0;
if (environment)
FIXME( "environment not implemented yet\n" );
{
if (environment->Version != 1)
FIXME( "unsupported environment version %u\n", environment->Version );
object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
WARN( "environment not fully implemented yet\n" );
}
TRACE( "allocated object %p of type %u\n", object, object->type );
/* For simple callbacks we have to run tp_object_submit before adding this object
* to the cleanup group. As soon as the cleanup group members are released ->shutdown
* will be set, and tp_object_submit would fail with an assertion. */
if (is_simple_callback)
tp_object_submit( object );
if (object->group)
{
struct threadpool_group *group = object->group;
interlocked_inc( &group->refcount );
RtlEnterCriticalSection( &group->cs );
list_add_tail( &group->members, &object->group_entry );
object->is_group_member = TRUE;
RtlLeaveCriticalSection( &group->cs );
}
if (is_simple_callback)
{
tp_object_shutdown( object );
tp_object_release( object );
}
}
/***********************************************************************
......@@ -1331,6 +1451,45 @@ static void tp_object_submit( struct threadpool_object *object )
}
/***********************************************************************
* tp_object_cancel (internal)
*
* Cancels all currently pending callbacks for a specific object.
*/
static void tp_object_cancel( struct threadpool_object *object )
{
struct threadpool *pool = object->pool;
LONG pending_callbacks = 0;
RtlEnterCriticalSection( &pool->cs );
if (object->num_pending_callbacks)
{
pending_callbacks = object->num_pending_callbacks;
object->num_pending_callbacks = 0;
list_remove( &object->pool_entry );
}
RtlLeaveCriticalSection( &pool->cs );
while (pending_callbacks--)
tp_object_release( object );
}
/***********************************************************************
* tp_object_wait (internal)
*
* Waits until all pending and running callbacks of a specific object
* have been processed.
*/
static void tp_object_wait( struct threadpool_object *object )
{
struct threadpool *pool = object->pool;
RtlEnterCriticalSection( &pool->cs );
while (object->num_pending_callbacks || object->num_running_callbacks)
RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
RtlLeaveCriticalSection( &pool->cs );
}
/***********************************************************************
* tp_object_shutdown (internal)
*
* Marks a threadpool object for shutdown (which means that no further
......@@ -1357,6 +1516,22 @@ static BOOL tp_object_release( struct threadpool_object *object )
assert( !object->num_pending_callbacks );
assert( !object->num_running_callbacks );
/* release reference to the group */
if (object->group)
{
struct threadpool_group *group = object->group;
RtlEnterCriticalSection( &group->cs );
if (object->is_group_member)
{
list_remove( &object->group_entry );
object->is_group_member = FALSE;
}
RtlLeaveCriticalSection( &group->cs );
tp_group_release( group );
}
tp_threadpool_unlock( object->pool );
RtlFreeHeap( GetProcessHeap(), 0, object );
......@@ -1412,6 +1587,8 @@ static void CALLBACK threadpool_worker_proc( void *param )
RtlEnterCriticalSection( &pool->cs );
pool->num_busy_workers--;
object->num_running_callbacks--;
if (!object->num_pending_callbacks && !object->num_running_callbacks)
RtlWakeAllConditionVariable( &object->finished_event );
tp_object_release( object );
}
......@@ -1439,6 +1616,17 @@ static void CALLBACK threadpool_worker_proc( void *param )
tp_threadpool_release( pool );
}
/***********************************************************************
* TpAllocCleanupGroup (NTDLL.@)
*/
NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
{
TRACE( "%p\n", out );
return tp_group_alloc( (struct threadpool_group **)out );
}
/***********************************************************************
* TpAllocPool (NTDLL.@)
*/
......@@ -1453,6 +1641,80 @@ NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
}
/***********************************************************************
* TpReleaseCleanupGroup (NTDLL.@)
*/
VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
{
struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
TRACE( "%p\n", group );
tp_group_shutdown( this );
tp_group_release( this );
}
/***********************************************************************
* TpReleaseCleanupGroupMembers (NTDLL.@)
*/
VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata )
{
struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
struct threadpool_object *object, *next;
struct list members;
TRACE( "%p %u %p\n", group, cancel_pending, userdata );
RtlEnterCriticalSection( &this->cs );
/* Unset group, increase references, and mark objects for shutdown */
LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
{
assert( object->group == this );
assert( object->is_group_member );
/* Simple callbacks are very special. The user doesn't hold any reference, so
* they would be released too early. Add one additional temporary reference. */
if (object->type == TP_OBJECT_TYPE_SIMPLE)
{
if (interlocked_inc( &object->refcount ) == 1)
{
/* Object is basically already destroyed, but group reference
* was not deleted yet. We can safely ignore this object. */
interlocked_dec( &object->refcount );
list_remove( &object->group_entry );
object->is_group_member = FALSE;
continue;
}
}
object->is_group_member = FALSE;
tp_object_shutdown( object );
}
/* Move members to a new temporary list */
list_init( &members );
list_move_tail( &members, &this->members );
RtlLeaveCriticalSection( &this->cs );
/* Cancel pending callbacks if requested */
if (cancel_pending)
{
LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
{
tp_object_cancel( object );
}
}
/* Wait for remaining callbacks to finish */
LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
{
tp_object_wait( object );
tp_object_release( object );
}
}
/***********************************************************************
* TpReleasePool (NTDLL.@)
*/
VOID WINAPI TpReleasePool( TP_POOL *pool )
......@@ -1542,9 +1804,5 @@ NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
object->u.simple.callback = callback;
tp_object_initialize( object, pool, userdata, environment );
tp_object_submit( object );
tp_object_shutdown( object );
tp_object_release( object );
return STATUS_SUCCESS;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment