Commit 2362380b authored by Alexandre Julliard's avatar Alexandre Julliard

Added separate queue for "system" APCs that get called even when the

thread is not in an alertable state. Specify the select_request timeout as absolute value so that we can restart the request when interrupted.
parent d3f2581d
......@@ -340,6 +340,7 @@ struct queue_apc_request
{
REQUEST_HEADER; /* request header */
IN handle_t handle; /* thread handle */
IN int user; /* user or system apc? */
IN void* func; /* function to call */
IN void* param; /* param for function to call */
};
......@@ -349,6 +350,7 @@ struct queue_apc_request
struct get_apc_request
{
REQUEST_HEADER; /* request header */
IN int alertable; /* is thread alertable? */
OUT void* func; /* function to call */
OUT int type; /* function type */
OUT VARARG(args,ptrs); /* function arguments */
......@@ -412,13 +414,15 @@ struct select_request
{
REQUEST_HEADER; /* request header */
IN int flags; /* wait flags (see below) */
IN int timeout; /* timeout in ms */
IN int sec; /* absolute timeout */
IN int usec; /* absolute timeout */
OUT int signaled; /* signaled handle */
IN VARARG(handles,handles); /* handles to select on */
};
#define SELECT_ALL 1
#define SELECT_ALERTABLE 2
#define SELECT_TIMEOUT 4
#define SELECT_ALL 1
#define SELECT_ALERTABLE 2
#define SELECT_INTERRUPTIBLE 4
#define SELECT_TIMEOUT 8
/* Create an event */
......@@ -1576,7 +1580,7 @@ union generic_request
struct async_result_request async_result;
};
#define SERVER_PROTOCOL_VERSION 33
#define SERVER_PROTOCOL_VERSION 34
/* ### make_requests end ### */
/* Everything above this line is generated automatically by tools/make_requests */
......
......@@ -16,11 +16,30 @@
/***********************************************************************
* get_timeout
*/
inline static void get_timeout( struct timeval *when, int timeout )
{
gettimeofday( when, 0 );
if (timeout)
{
long sec = timeout / 1000;
if ((when->tv_usec += (timeout - 1000*sec) * 1000) >= 1000000)
{
when->tv_usec -= 1000000;
when->tv_sec++;
}
when->tv_sec += sec;
}
}
/***********************************************************************
* call_apcs
*
* Call outstanding APCs.
*/
static void call_apcs(void)
static void call_apcs( BOOL alertable )
{
FARPROC proc = NULL;
FILETIME ft;
......@@ -32,6 +51,7 @@ static void call_apcs(void)
SERVER_START_REQ
{
struct get_apc_request *req = server_alloc_req( sizeof(*req), sizeof(args) );
req->alertable = alertable;
if (!server_call( REQ_GET_APC ))
{
type = req->type;
......@@ -119,6 +139,7 @@ DWORD WINAPI WaitForMultipleObjectsEx( DWORD count, const HANDLE *handles,
BOOL alertable )
{
int i, ret;
struct timeval tv;
if (count > MAXIMUM_WAIT_OBJECTS)
{
......@@ -126,24 +147,33 @@ DWORD WINAPI WaitForMultipleObjectsEx( DWORD count, const HANDLE *handles,
return WAIT_FAILED;
}
SERVER_START_REQ
if (timeout == INFINITE) tv.tv_sec = tv.tv_usec = 0;
else get_timeout( &tv, timeout );
for (;;)
{
struct select_request *req = server_alloc_req( sizeof(*req), count * sizeof(int) );
int *data = server_data_ptr( req );
SERVER_START_REQ
{
struct select_request *req = server_alloc_req( sizeof(*req), count * sizeof(int) );
int *data = server_data_ptr( req );
req->flags = 0;
req->timeout = timeout;
for (i = 0; i < count; i++) data[i] = handles[i];
req->flags = SELECT_INTERRUPTIBLE;
req->sec = tv.tv_sec;
req->usec = tv.tv_usec;
for (i = 0; i < count; i++) data[i] = handles[i];
if (wait_all) req->flags |= SELECT_ALL;
if (alertable) req->flags |= SELECT_ALERTABLE;
if (timeout != INFINITE) req->flags |= SELECT_TIMEOUT;
if (wait_all) req->flags |= SELECT_ALL;
if (alertable) req->flags |= SELECT_ALERTABLE;
if (timeout != INFINITE) req->flags |= SELECT_TIMEOUT;
server_call( REQ_SELECT );
ret = req->signaled;
server_call( REQ_SELECT );
ret = req->signaled;
}
SERVER_END_REQ;
if (ret != STATUS_USER_APC) break;
call_apcs( alertable );
if (alertable) break;
}
SERVER_END_REQ;
if (ret == STATUS_USER_APC) call_apcs();
return ret;
}
......
......@@ -638,6 +638,7 @@ DWORD WINAPI QueueUserAPC( PAPCFUNC func, HANDLE hthread, ULONG_PTR data )
{
struct queue_apc_request *req = server_alloc_req( sizeof(*req), 0 );
req->handle = hthread;
req->user = 1;
req->func = func;
req->param = (void *)data;
ret = !server_call( REQ_QUEUE_APC );
......
......@@ -213,25 +213,25 @@ DECL_HANDLER(create_async)
static void async_poll_event( struct object *obj, int event )
{
struct async *ov = (struct async *) obj;
/* queue an APC in the client thread to do our dirty work */
ov->obj.ops->remove_queue(&ov->obj,&ov->wait);
/* FIXME: this should be a function pointer */
event = serial_async_poll_event(obj,event);
thread_queue_apc(ov->thread, NULL, ov->func, APC_ASYNC, 3,
thread_queue_apc(ov->thread, NULL, ov->func, APC_ASYNC, 1, 3,
ov->client_overlapped, ov->buffer, event);
}
}
/* handler for async i/o timeouts */
static void overlapped_timeout (void *private)
{
struct async *ov = (struct async *) private;
ov->obj.ops->remove_queue(&ov->obj,&ov->wait);
thread_queue_apc(ov->thread, NULL, ov->func, APC_ASYNC, 3,
thread_queue_apc(ov->thread, NULL, ov->func, APC_ASYNC, 1, 3,
ov->client_overlapped,ov->buffer, 0);
}
......
......@@ -363,14 +363,21 @@ static int wait_for_debug_event( int timeout )
struct debug_ctx *debug_ctx = current->debug_ctx;
struct object *obj = &debug_ctx->obj;
int flags = 0;
struct timeval tv;
if (!debug_ctx) /* current thread is not a debugger */
{
set_error( STATUS_INVALID_HANDLE );
return 0;
}
if (timeout != -1) flags = SELECT_TIMEOUT;
return sleep_on( 1, &obj, flags, timeout, build_wait_debug_reply );
if (timeout != -1)
{
flags = SELECT_TIMEOUT;
gettimeofday( &tv, 0 );
add_timeout( &tv, timeout );
}
else tv.tv_sec = tv.tv_usec = 0;
return sleep_on( 1, &obj, flags, tv.tv_sec, tv.tv_usec, build_wait_debug_reply );
}
/* continue a debug event */
......@@ -603,7 +610,7 @@ DECL_HANDLER(exception_event)
{
struct object *obj = &event->obj;
current->context = context;
sleep_on( 1, &obj, 0, -1, build_exception_event_reply );
sleep_on( 1, &obj, 0, 0, 0, build_exception_event_reply );
release_object( event );
}
}
......
......@@ -771,8 +771,12 @@ DECL_HANDLER(wait_process)
}
else
{
struct timeval timeout;
struct object *obj = &current->info->obj;
sleep_on( 1, &obj, SELECT_TIMEOUT, req->timeout, build_wait_process_reply );
gettimeofday( &timeout, 0 );
add_timeout( &timeout, req->timeout );
sleep_on( 1, &obj, SELECT_TIMEOUT, timeout.tv_sec, timeout.tv_usec,
build_wait_process_reply );
}
}
......
......@@ -64,7 +64,7 @@ static void dump_thread( struct object *obj, int verbose );
static int thread_signaled( struct object *obj, struct thread *thread );
extern void thread_poll_event( struct object *obj, int event );
static void destroy_thread( struct object *obj );
static struct thread_apc *thread_dequeue_apc( struct thread *thread );
static struct thread_apc *thread_dequeue_apc( struct thread *thread, int system_only );
static const struct object_ops thread_ops =
{
......@@ -147,8 +147,10 @@ struct thread *create_thread( int fd, struct process *process )
thread->queue = NULL;
thread->info = NULL;
thread->wait = NULL;
thread->apc_head = NULL;
thread->apc_tail = NULL;
thread->system_apc.head = NULL;
thread->system_apc.tail = NULL;
thread->user_apc.head = NULL;
thread->user_apc.tail = NULL;
thread->error = 0;
thread->pass_fd = -1;
thread->request_fd = NULL;
......@@ -211,7 +213,7 @@ static void destroy_thread( struct object *obj )
if (thread->next) thread->next->prev = thread->prev;
if (thread->prev) thread->prev->next = thread->next;
else first_thread = thread->next;
while ((apc = thread_dequeue_apc( thread ))) free( apc );
while ((apc = thread_dequeue_apc( thread, 0 ))) free( apc );
if (thread->info) release_object( thread->info );
if (thread->queue) release_object( thread->queue );
if (thread->buffer != (void *)-1) munmap( thread->buffer, MAX_REQUEST_LENGTH );
......@@ -354,7 +356,7 @@ static void end_wait( struct thread *thread )
/* build the thread wait structure */
static int wait_on( int count, struct object *objects[], int flags,
int timeout, sleep_reply func )
int sec, int usec, sleep_reply func )
{
struct thread_wait *wait;
struct wait_queue_entry *entry;
......@@ -368,8 +370,8 @@ static int wait_on( int count, struct object *objects[], int flags,
wait->reply = func;
if (flags & SELECT_TIMEOUT)
{
gettimeofday( &wait->timeout, 0 );
add_timeout( &wait->timeout, timeout );
wait->timeout.tv_sec = sec;
wait->timeout.tv_usec = usec;
}
for (i = 0, entry = wait->queues; i < count; i++, entry++)
......@@ -425,7 +427,8 @@ static int check_wait( struct thread *thread, struct object **object )
}
other_checks:
if ((wait->flags & SELECT_ALERTABLE) && thread->apc_head) return STATUS_USER_APC;
if ((wait->flags & SELECT_INTERRUPTIBLE) && thread->system_apc.head) return STATUS_USER_APC;
if ((wait->flags & SELECT_ALERTABLE) && thread->user_apc.head) return STATUS_USER_APC;
if (wait->flags & SELECT_TIMEOUT)
{
struct timeval now;
......@@ -469,10 +472,10 @@ static void thread_timeout( void *ptr )
}
/* sleep on a list of objects */
int sleep_on( int count, struct object *objects[], int flags, int timeout, sleep_reply func )
int sleep_on( int count, struct object *objects[], int flags, int sec, int usec, sleep_reply func )
{
assert( !current->wait );
if (!wait_on( count, objects, flags, timeout, func )) return 0;
if (!wait_on( count, objects, flags, sec, usec, func )) return 0;
if (wake_thread( current )) return 1;
/* now we need to wait */
if (flags & SELECT_TIMEOUT)
......@@ -488,7 +491,7 @@ int sleep_on( int count, struct object *objects[], int flags, int timeout, sleep
}
/* select on a list of handles */
static int select_on( int count, handle_t *handles, int flags, int timeout )
static int select_on( int count, handle_t *handles, int flags, int sec, int usec )
{
int ret = 0;
int i;
......@@ -504,7 +507,7 @@ static int select_on( int count, handle_t *handles, int flags, int timeout )
if (!(objects[i] = get_handle_obj( current->process, handles[i], SYNCHRONIZE, NULL )))
break;
}
if (i == count) ret = sleep_on( count, objects, flags, timeout, build_select_reply );
if (i == count) ret = sleep_on( count, objects, flags, sec, usec, build_select_reply );
while (--i >= 0) release_object( objects[i] );
return ret;
}
......@@ -528,15 +531,16 @@ void wake_up( struct object *obj, int max )
/* queue an async procedure call */
int thread_queue_apc( struct thread *thread, struct object *owner, void *func,
enum apc_type type, int nb_args, ... )
enum apc_type type, int system, int nb_args, ... )
{
struct thread_apc *apc;
struct apc_queue *queue = system ? &thread->system_apc : &thread->user_apc;
/* cancel a possible previous APC with the same owner */
if (owner) thread_cancel_apc( thread, owner );
if (owner) thread_cancel_apc( thread, owner, system );
if (!(apc = mem_alloc( sizeof(*apc) + (nb_args-1)*sizeof(apc->args[0]) ))) return 0;
apc->prev = thread->apc_tail;
apc->prev = queue->tail;
apc->next = NULL;
apc->owner = owner;
apc->func = func;
......@@ -550,40 +554,44 @@ int thread_queue_apc( struct thread *thread, struct object *owner, void *func,
for (i = 0; i < nb_args; i++) apc->args[i] = va_arg( args, void * );
va_end( args );
}
thread->apc_tail = apc;
queue->tail = apc;
if (!apc->prev) /* first one */
{
thread->apc_head = apc;
queue->head = apc;
if (thread->wait && wake_thread( thread )) send_reply( thread );
}
return 1;
}
/* cancel the async procedure call owned by a specific object */
void thread_cancel_apc( struct thread *thread, struct object *owner )
void thread_cancel_apc( struct thread *thread, struct object *owner, int system )
{
struct thread_apc *apc;
for (apc = thread->apc_head; apc; apc = apc->next)
struct apc_queue *queue = system ? &thread->system_apc : &thread->user_apc;
for (apc = queue->head; apc; apc = apc->next)
{
if (apc->owner != owner) continue;
if (apc->next) apc->next->prev = apc->prev;
else thread->apc_tail = apc->prev;
else queue->tail = apc->prev;
if (apc->prev) apc->prev->next = apc->next;
else thread->apc_head = apc->next;
else queue->head = apc->next;
free( apc );
return;
}
}
/* remove the head apc from the queue; the returned pointer must be freed by the caller */
static struct thread_apc *thread_dequeue_apc( struct thread *thread )
static struct thread_apc *thread_dequeue_apc( struct thread *thread, int system_only )
{
struct thread_apc *apc = thread->apc_head;
if (apc)
struct thread_apc *apc;
struct apc_queue *queue = &thread->system_apc;
if (!queue->head && !system_only) queue = &thread->user_apc;
if ((apc = queue->head))
{
if (apc->next) apc->next->prev = NULL;
else thread->apc_tail = NULL;
thread->apc_head = apc->next;
else queue->tail = NULL;
queue->head = apc->next;
}
return apc;
}
......@@ -811,7 +819,7 @@ DECL_HANDLER(resume_thread)
DECL_HANDLER(select)
{
int count = get_req_data_size(req) / sizeof(int);
if (!select_on( count, get_req_data(req), req->flags, req->timeout ))
if (!select_on( count, get_req_data(req), req->flags, req->sec, req->usec ))
req->signaled = -1;
}
......@@ -821,7 +829,7 @@ DECL_HANDLER(queue_apc)
struct thread *thread;
if ((thread = get_thread_from_handle( req->handle, THREAD_SET_CONTEXT )))
{
thread_queue_apc( thread, NULL, req->func, APC_USER, 1, req->param );
thread_queue_apc( thread, NULL, req->func, APC_USER, !req->user, 1, req->param );
release_object( thread );
}
}
......@@ -834,7 +842,7 @@ DECL_HANDLER(get_apc)
for (;;)
{
if (!(apc = thread_dequeue_apc( current )))
if (!(apc = thread_dequeue_apc( current, !req->alertable )))
{
/* no more APCs */
req->func = NULL;
......
......@@ -30,6 +30,11 @@ enum run_state
TERMINATED /* terminated */
};
struct apc_queue
{
struct thread_apc *head;
struct thread_apc *tail;
};
struct thread
{
......@@ -45,8 +50,8 @@ struct thread
struct msg_queue *queue; /* message queue */
struct startup_info*info; /* startup info for child process */
struct thread_wait *wait; /* current wait condition if sleeping */
struct thread_apc *apc_head; /* queue of async procedure calls */
struct thread_apc *apc_tail; /* queue of async procedure calls */
struct apc_queue system_apc; /* queue of system async procedure calls */
struct apc_queue user_apc; /* queue of user async procedure calls */
unsigned int error; /* current error code */
struct object *request_fd; /* fd for receiving client requests */
int pass_fd; /* fd to pass to the client */
......@@ -92,10 +97,10 @@ extern void remove_queue( struct object *obj, struct wait_queue_entry *entry );
extern void kill_thread( struct thread *thread, int violent_death );
extern void wake_up( struct object *obj, int max );
extern int sleep_on( int count, struct object *objects[], int flags,
int timeout, sleep_reply func );
int sec, int usec, sleep_reply func );
extern int thread_queue_apc( struct thread *thread, struct object *owner, void *func,
enum apc_type type, int nb_args, ... );
extern void thread_cancel_apc( struct thread *thread, struct object *owner );
enum apc_type type, int system, int nb_args, ... );
extern void thread_cancel_apc( struct thread *thread, struct object *owner, int system );
extern struct thread_snapshot *thread_snap( int *count );
/* ptrace functions */
......
......@@ -78,7 +78,7 @@ static void timer_callback( void *private )
/* queue an APC */
if (timer->thread)
thread_queue_apc( timer->thread, &timer->obj, timer->callback, APC_TIMER, 3,
thread_queue_apc( timer->thread, &timer->obj, timer->callback, APC_TIMER, 0, 3,
(void *)timer->when.tv_sec, (void *)timer->when.tv_usec, timer->arg );
if (timer->period) /* schedule the next expiration */
......@@ -103,7 +103,7 @@ static void cancel_timer( struct timer *timer )
}
if (timer->thread)
{
thread_cancel_apc( timer->thread, &timer->obj );
thread_cancel_apc( timer->thread, &timer->obj, 0 );
timer->thread = NULL;
}
}
......
......@@ -466,12 +466,14 @@ static void dump_unload_dll_request( const struct unload_dll_request *req )
static void dump_queue_apc_request( const struct queue_apc_request *req )
{
fprintf( stderr, " handle=%d,", req->handle );
fprintf( stderr, " user=%d,", req->user );
fprintf( stderr, " func=%p,", req->func );
fprintf( stderr, " param=%p", req->param );
}
static void dump_get_apc_request( const struct get_apc_request *req )
{
fprintf( stderr, " alertable=%d", req->alertable );
}
static void dump_get_apc_reply( const struct get_apc_request *req )
......@@ -537,7 +539,8 @@ static void dump_open_process_reply( const struct open_process_request *req )
static void dump_select_request( const struct select_request *req )
{
fprintf( stderr, " flags=%d,", req->flags );
fprintf( stderr, " timeout=%d,", req->timeout );
fprintf( stderr, " sec=%d,", req->sec );
fprintf( stderr, " usec=%d,", req->usec );
fprintf( stderr, " handles=" );
cur_pos += dump_varargs_handles( req );
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment