Commit ad53ffc9 authored by Alexandre Julliard's avatar Alexandre Julliard

server: Make async objects waitable.

parent cc2f2f22
......@@ -41,11 +41,13 @@ struct async
unsigned int status; /* current status */
struct timeout_user *timeout;
unsigned int timeout_status; /* status to report upon timeout */
int signaled;
struct event *event;
async_data_t data; /* data for async I/O call */
};
static void async_dump( struct object *obj, int verbose );
static int async_signaled( struct object *obj, struct wait_queue_entry *entry );
static void async_destroy( struct object *obj );
static const struct object_ops async_ops =
......@@ -53,10 +55,10 @@ static const struct object_ops async_ops =
sizeof(struct async), /* size */
async_dump, /* dump */
no_get_type, /* get_type */
no_add_queue, /* add_queue */
NULL, /* remove_queue */
NULL, /* signaled */
NULL, /* satisfied */
add_queue, /* add_queue */
remove_queue, /* remove_queue */
async_signaled, /* signaled */
no_satisfied, /* satisfied */
no_signal, /* signal */
no_get_fd, /* get_fd */
no_map_access, /* map_access */
......@@ -114,6 +116,13 @@ static void async_dump( struct object *obj, int verbose )
fprintf( stderr, "Async thread=%p\n", async->thread );
}
static int async_signaled( struct object *obj, struct wait_queue_entry *entry )
{
struct async *async = (struct async *)obj;
assert( obj->ops == &async_ops );
return async->signaled;
}
static void async_destroy( struct object *obj )
{
struct async *async = (struct async *)obj;
......@@ -228,6 +237,7 @@ struct async *create_async( struct thread *thread, struct async_queue *queue, co
async->data = *data;
async->timeout = NULL;
async->queue = (struct async_queue *)grab_object( queue );
async->signaled = 0;
list_add_tail( &queue->queue, &async->queue_entry );
grab_object( async );
......@@ -306,6 +316,8 @@ void async_set_result( struct object *obj, unsigned int status, apc_param_t tota
}
if (async->event) set_event( async->event );
else if (async->queue->fd) set_fd_signaled( async->queue->fd, 1 );
async->signaled = 1;
wake_up( &async->obj, 0 );
}
}
......
......@@ -124,19 +124,6 @@ struct event *create_event( struct directory *root, const struct unicode_str *na
return event;
}
obj_handle_t alloc_wait_event( struct process *process )
{
obj_handle_t handle = 0;
struct event *event = create_event( NULL, NULL, 0, 1, 0, NULL );
if (event)
{
handle = alloc_handle( process, event, EVENT_ALL_ACCESS, 0 );
release_object( event );
}
return handle;
}
struct event *get_event_obj( struct process *process, obj_handle_t handle, unsigned int access )
{
return (struct event *)get_handle_obj( process, handle, access, &event_ops );
......
......@@ -614,21 +614,9 @@ static obj_handle_t pipe_server_ioctl( struct fd *fd, ioctl_code_t code, const a
{
case ps_idle_server:
case ps_wait_connect:
if (blocking)
{
async_data_t new_data = *async_data;
if (!(wait_handle = alloc_wait_event( current->process ))) break;
new_data.event = wait_handle;
if (!(async = fd_queue_async( server->ioctl_fd, &new_data, ASYNC_TYPE_WAIT )))
{
close_handle( current->process, wait_handle );
break;
}
}
else async = fd_queue_async( server->ioctl_fd, async_data, ASYNC_TYPE_WAIT );
if (async)
if ((async = fd_queue_async( server->ioctl_fd, async_data, ASYNC_TYPE_WAIT )))
{
if (blocking) wait_handle = alloc_handle( current->process, async, SYNCHRONIZE, 0 );
set_server_state( server, ps_wait_open );
if (server->pipe->waiters) async_wake_up( server->pipe->waiters, STATUS_SUCCESS );
release_object( async );
......@@ -913,23 +901,11 @@ static obj_handle_t named_pipe_device_ioctl( struct fd *fd, ioctl_code_t code,
if (!pipe->waiters && !(pipe->waiters = create_async_queue( NULL ))) goto done;
if (blocking)
{
async_data_t new_data = *async_data;
if (!(wait_handle = alloc_wait_event( current->process ))) goto done;
new_data.event = wait_handle;
if (!(async = create_async( current, pipe->waiters, &new_data )))
{
close_handle( current->process, wait_handle );
wait_handle = 0;
}
}
else async = create_async( current, pipe->waiters, async_data );
if (async)
if ((async = create_async( current, pipe->waiters, async_data )))
{
timeout_t when = buffer->TimeoutSpecified ? buffer->Timeout.QuadPart : pipe->timeout;
async_set_timeout( async, when, STATUS_IO_TIMEOUT );
if (blocking) wait_handle = alloc_handle( current->process, async, SYNCHRONIZE, 0 );
release_object( async );
set_error( STATUS_PENDING );
}
......
......@@ -162,7 +162,6 @@ extern struct event *create_event( struct directory *root, const struct unicode_
const struct security_descriptor *sd );
extern struct keyed_event *create_keyed_event( struct directory *root, const struct unicode_str *name,
unsigned int attr, const struct security_descriptor *sd );
extern obj_handle_t alloc_wait_event( struct process *process );
extern struct event *get_event_obj( struct process *process, obj_handle_t handle, unsigned int access );
extern struct keyed_event *get_keyed_event_obj( struct process *process, obj_handle_t handle, unsigned int access );
extern void pulse_event( struct event *event );
......
......@@ -120,7 +120,6 @@ struct sock
};
static void sock_dump( struct object *obj, int verbose );
static int sock_add_ifchange( struct sock *sock, const async_data_t *async_data );
static int sock_signaled( struct object *obj, struct wait_queue_entry *entry );
static struct fd *sock_get_fd( struct object *obj );
static void sock_destroy( struct object *obj );
......@@ -539,25 +538,19 @@ obj_handle_t sock_ioctl( struct fd *fd, ioctl_code_t code, const async_data_t *a
{
struct sock *sock = get_fd_user( fd );
obj_handle_t wait_handle = 0;
async_data_t new_data;
struct async_queue *ifchange_q;
struct async *async;
assert( sock->obj.ops == &sock_ops );
switch(code)
{
case WS_SIO_ADDRESS_LIST_CHANGE:
if (blocking)
{
if (!(wait_handle = alloc_wait_event( current->process ))) return 0;
new_data = *async_data;
new_data.event = wait_handle;
async_data = &new_data;
}
if (!sock_add_ifchange( sock, async_data ) && wait_handle)
{
close_handle( current->process, wait_handle );
return 0;
}
if (!(ifchange_q = sock_get_ifchange_q( sock ))) return 0;
if (!(async = create_async( current, ifchange_q, async_data ))) return 0;
if (blocking) wait_handle = alloc_handle( current->process, async, SYNCHRONIZE, 0 );
release_object( async );
set_error( STATUS_PENDING );
return wait_handle;
default:
set_error( STATUS_NOT_SUPPORTED );
......@@ -962,29 +955,6 @@ static void sock_set_error(void)
set_error( sock_get_ntstatus( errno ) );
}
/* add interface change notification to a socket */
static int sock_add_ifchange( struct sock *sock, const async_data_t *async_data )
{
struct async_queue *ifchange_q;
struct async *async;
if (!(ifchange_q = sock_get_ifchange_q( sock )))
return 0;
if (!(async = create_async( current, ifchange_q, async_data )))
{
if (!async_queued( ifchange_q ))
sock_destroy_ifchange_q( sock );
set_error( STATUS_NO_MEMORY );
return 0;
}
release_object( async );
set_error( STATUS_PENDING );
return 1;
}
#ifdef HAVE_LINUX_RTNETLINK_H
/* only keep one ifchange object around, all sockets waiting for wakeups will look to it */
......@@ -1204,7 +1174,7 @@ static struct async_queue *sock_get_ifchange_q( struct sock *sock )
return NULL;
/* create the ifchange notification queue */
fd = ifchange->ops->get_fd( ifchange );
fd = get_obj_fd( ifchange );
sock->ifchange_q = create_async_queue( fd );
release_object( fd );
if (!sock->ifchange_q)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment