Commit df09ac51 authored by Alexandre Julliard's avatar Alexandre Julliard

server: Make async I/O queues into real objects.

parent d99ee344
......@@ -62,6 +62,35 @@ static const struct object_ops async_ops =
async_destroy /* destroy */
};
struct async_queue
{
struct object obj; /* object header */
struct fd *fd; /* file descriptor owning this queue */
struct list queue; /* queue of async objects */
};
static void async_queue_dump( struct object *obj, int verbose );
static void async_queue_destroy( struct object *obj );
static const struct object_ops async_queue_ops =
{
sizeof(struct async_queue), /* size */
async_queue_dump, /* dump */
no_add_queue, /* add_queue */
NULL, /* remove_queue */
NULL, /* signaled */
NULL, /* satisfied */
no_signal, /* signal */
no_get_fd, /* get_fd */
no_map_access, /* map_access */
no_lookup_name, /* lookup_name */
no_open_file, /* open_file */
no_close_handle, /* close_handle */
async_queue_destroy /* destroy */
};
static void async_dump( struct object *obj, int verbose )
{
struct async *async = (struct async *)obj;
......@@ -79,6 +108,21 @@ static void async_destroy( struct object *obj )
release_object( async->thread );
}
static void async_queue_dump( struct object *obj, int verbose )
{
struct async_queue *async_queue = (struct async_queue *)obj;
assert( obj->ops == &async_queue_ops );
fprintf( stderr, "Async queue fd=%p\n", async_queue->fd );
}
static void async_queue_destroy( struct object *obj )
{
struct async_queue *async_queue = (struct async_queue *)obj;
assert( obj->ops == &async_queue_ops );
async_wake_up( async_queue, STATUS_HANDLES_CLOSED );
}
/* notifies client thread of new status of its async request */
/* destroys the server side of it */
static void async_terminate( struct async *async, unsigned int status )
......@@ -108,9 +152,22 @@ static void async_timeout( void *private )
async_terminate( async, STATUS_TIMEOUT );
}
/* create a new async queue for a given fd */
struct async_queue *create_async_queue( struct fd *fd )
{
struct async_queue *queue = alloc_object( &async_queue_ops );
if (queue)
{
queue->fd = fd;
list_init( &queue->queue );
}
return queue;
}
/* create an async on a given queue of a fd */
struct async *create_async( struct thread *thread, const struct timeval *timeout,
struct list *queue, const async_data_t *data )
struct async_queue *queue, const async_data_t *data )
{
struct event *event = NULL;
struct async *async;
......@@ -128,7 +185,7 @@ struct async *create_async( struct thread *thread, const struct timeval *timeout
async->event = event;
async->data = *data;
list_add_tail( queue, &async->queue_entry );
list_add_tail( &queue->queue, &async->queue_entry );
if (timeout) async->timeout = add_timeout_user( timeout, async_timeout, async );
else async->timeout = NULL;
......@@ -164,21 +221,23 @@ void async_set_result( struct object *obj, unsigned int status )
}
}
/* terminate the async operation at the head of the queue */
void async_terminate_head( struct list *queue, unsigned int status )
/* check if an async operation is waiting to be alerted */
int async_waiting( struct async_queue *queue )
{
struct list *ptr = list_head( queue );
if (ptr) async_terminate( LIST_ENTRY( ptr, struct async, queue_entry ), status );
return queue && !list_empty( &queue->queue );
}
/* terminate all async operations on the queue */
void async_terminate_queue( struct list *queue, unsigned int status )
/* wake up async operations on the queue */
void async_wake_up( struct async_queue *queue, unsigned int status )
{
struct list *ptr, *next;
LIST_FOR_EACH_SAFE( ptr, next, queue )
if (!queue) return;
LIST_FOR_EACH_SAFE( ptr, next, &queue->queue )
{
struct async *async = LIST_ENTRY( ptr, struct async, queue_entry );
async_terminate( async, status );
if (status == STATUS_ALERTED) break; /* only wake up the first one */
}
}
......@@ -572,7 +572,7 @@ static void inotify_do_change_notify( struct dir *dir, unsigned int action,
list_add_tail( &dir->change_records, &record->entry );
}
fd_async_terminate_head( dir->fd, ASYNC_TYPE_WAIT, STATUS_ALERTED );
fd_async_wake_up( dir->fd, ASYNC_TYPE_WAIT, STATUS_ALERTED );
}
static unsigned int filter_from_event( struct inotify_event *ie )
......@@ -1097,7 +1097,7 @@ DECL_HANDLER(read_directory_changes)
/* if there's already a change in the queue, send it */
if (!list_empty( &dir->change_records ))
fd_async_terminate_head( dir->fd, ASYNC_TYPE_WAIT, STATUS_ALERTED );
fd_async_wake_up( dir->fd, ASYNC_TYPE_WAIT, STATUS_ALERTED );
/* setup the real notification */
if (!inotify_adjust_changes( dir ))
......
......@@ -171,9 +171,9 @@ struct fd
int fs_locks :1; /* can we use filesystem locks for this fd? */
int unmounted :1;/* has the device been unmounted? */
int poll_index; /* index of fd in poll array */
struct list read_q; /* async readers of this fd */
struct list write_q; /* async writers of this fd */
struct list wait_q; /* other async waiters of this fd */
struct async_queue *read_q; /* async readers of this fd */
struct async_queue *write_q; /* async writers of this fd */
struct async_queue *wait_q; /* other async waiters of this fd */
};
static void fd_dump( struct object *obj, int verbose );
......@@ -1286,9 +1286,9 @@ static void fd_destroy( struct object *obj )
{
struct fd *fd = (struct fd *)obj;
async_terminate_queue( &fd->read_q, STATUS_CANCELLED );
async_terminate_queue( &fd->write_q, STATUS_CANCELLED );
async_terminate_queue( &fd->wait_q, STATUS_CANCELLED );
if (fd->read_q) release_object( fd->read_q );
if (fd->write_q) release_object( fd->write_q );
if (fd->wait_q) release_object( fd->wait_q );
remove_fd_locks( fd );
list_remove( &fd->inode_entry );
......@@ -1330,8 +1330,8 @@ static inline void unmount_fd( struct fd *fd )
{
assert( fd->inode );
async_terminate_queue( &fd->read_q, STATUS_VOLUME_DISMOUNTED );
async_terminate_queue( &fd->write_q, STATUS_VOLUME_DISMOUNTED );
async_wake_up( fd->read_q, STATUS_VOLUME_DISMOUNTED );
async_wake_up( fd->write_q, STATUS_VOLUME_DISMOUNTED );
if (fd->poll_index != -1) set_fd_events( fd, -1 );
......@@ -1363,11 +1363,11 @@ static struct fd *alloc_fd_object(void)
fd->fs_locks = 1;
fd->unmounted = 0;
fd->poll_index = -1;
fd->read_q = NULL;
fd->write_q = NULL;
fd->wait_q = NULL;
list_init( &fd->inode_entry );
list_init( &fd->locks );
list_init( &fd->read_q );
list_init( &fd->write_q );
list_init( &fd->wait_q );
if ((fd->poll_index = add_poll_user( fd )) == -1)
{
......@@ -1394,11 +1394,11 @@ struct fd *alloc_pseudo_fd( const struct fd_ops *fd_user_ops, struct object *use
fd->fs_locks = 0;
fd->unmounted = 0;
fd->poll_index = -1;
fd->read_q = NULL;
fd->write_q = NULL;
fd->wait_q = NULL;
list_init( &fd->inode_entry );
list_init( &fd->locks );
list_init( &fd->read_q );
list_init( &fd->write_q );
list_init( &fd->wait_q );
return fd;
}
......@@ -1692,27 +1692,16 @@ int default_fd_get_poll_events( struct fd *fd )
{
int events = 0;
if (!list_empty( &fd->read_q ))
events |= POLLIN;
if (!list_empty( &fd->write_q ))
events |= POLLOUT;
if (async_waiting( fd->read_q )) events |= POLLIN;
if (async_waiting( fd->write_q )) events |= POLLOUT;
return events;
}
/* default handler for poll() events */
void default_poll_event( struct fd *fd, int event )
{
if (!list_empty( &fd->read_q ) && (POLLIN & event) )
{
async_terminate_head( &fd->read_q, STATUS_ALERTED );
return;
}
if (!list_empty( &fd->write_q ) && (POLLOUT & event) )
{
async_terminate_head( &fd->write_q, STATUS_ALERTED );
return;
}
if (event & POLLIN) async_wake_up( fd->read_q, STATUS_ALERTED );
if (event & POLLOUT) async_wake_up( fd->write_q, STATUS_ALERTED );
/* if an error occurred, stop polling this fd to avoid busy-looping */
if (event & (POLLERR | POLLHUP)) set_fd_events( fd, -1 );
......@@ -1722,18 +1711,21 @@ void default_poll_event( struct fd *fd, int event )
int fd_queue_async_timeout( struct fd *fd, const async_data_t *data, int type, int count,
const struct timeval *timeout )
{
struct list *queue;
struct async_queue *queue;
switch (type)
{
case ASYNC_TYPE_READ:
queue = &fd->read_q;
if (!fd->read_q && !(fd->read_q = create_async_queue( fd ))) return 0;
queue = fd->read_q;
break;
case ASYNC_TYPE_WRITE:
queue = &fd->write_q;
if (!fd->write_q && !(fd->write_q = create_async_queue( fd ))) return 0;
queue = fd->write_q;
break;
case ASYNC_TYPE_WAIT:
queue = &fd->wait_q;
if (!fd->wait_q && !(fd->wait_q = create_async_queue( fd ))) return 0;
queue = fd->wait_q;
break;
default:
set_error( STATUS_INVALID_PARAMETER );
......@@ -1746,41 +1738,23 @@ int fd_queue_async_timeout( struct fd *fd, const async_data_t *data, int type, i
if (!fd->inode)
set_fd_events( fd, fd->fd_ops->get_poll_events( fd ) );
else /* regular files are always ready for read and write */
if (type != ASYNC_TYPE_WAIT) async_terminate_head( queue, STATUS_ALERTED );
if (type != ASYNC_TYPE_WAIT) async_wake_up( queue, STATUS_ALERTED );
return 1;
}
void fd_async_terminate_head( struct fd *fd, int type, unsigned int status )
{
switch (type)
{
case ASYNC_TYPE_READ:
async_terminate_head( &fd->read_q, status );
break;
case ASYNC_TYPE_WRITE:
async_terminate_head( &fd->write_q, status );
break;
case ASYNC_TYPE_WAIT:
async_terminate_head( &fd->wait_q, status );
break;
default:
assert(0);
}
}
void fd_async_terminate_queue( struct fd *fd, int type, unsigned int status )
void fd_async_wake_up( struct fd *fd, int type, unsigned int status )
{
switch (type)
{
case ASYNC_TYPE_READ:
async_terminate_queue( &fd->read_q, status );
async_wake_up( fd->read_q, status );
break;
case ASYNC_TYPE_WRITE:
async_terminate_queue( &fd->write_q, status );
async_wake_up( fd->write_q, status );
break;
case ASYNC_TYPE_WAIT:
async_terminate_queue( &fd->wait_q, status );
async_wake_up( fd->wait_q, status );
break;
default:
assert(0);
......@@ -1802,9 +1776,9 @@ void default_fd_queue_async( struct fd *fd, const async_data_t *data, int type,
void default_fd_cancel_async( struct fd *fd )
{
async_terminate_queue( &fd->read_q, STATUS_CANCELLED );
async_terminate_queue( &fd->write_q, STATUS_CANCELLED );
async_terminate_queue( &fd->wait_q, STATUS_CANCELLED );
async_wake_up( fd->read_q, STATUS_CANCELLED );
async_wake_up( fd->write_q, STATUS_CANCELLED );
async_wake_up( fd->wait_q, STATUS_CANCELLED );
}
/* default flush() routine */
......
......@@ -24,6 +24,7 @@
#include "object.h"
struct fd;
struct async_queue;
typedef unsigned __int64 file_pos_t;
......@@ -70,8 +71,7 @@ extern int default_fd_get_poll_events( struct fd *fd );
extern void default_poll_event( struct fd *fd, int event );
extern int fd_queue_async_timeout( struct fd *fd, const async_data_t *data, int type,
int count, const struct timeval *timeout );
extern void fd_async_terminate_head( struct fd *fd, int type, unsigned int status );
extern void fd_async_terminate_queue( struct fd *fd, int type, unsigned int status );
extern void fd_async_wake_up( struct fd *fd, int type, unsigned int status );
extern void default_fd_queue_async( struct fd *fd, const async_data_t *data, int type, int count );
extern void default_fd_cancel_async( struct fd *fd );
extern void no_flush( struct fd *fd, struct event **event );
......@@ -124,11 +124,12 @@ extern int is_serial_fd( struct fd *fd );
extern struct object *create_serial( struct fd *fd, unsigned int options );
/* async I/O functions */
extern struct async_queue *create_async_queue( struct fd *fd );
extern struct async *create_async( struct thread *thread, const struct timeval *timeout,
struct list *queue, const async_data_t *data );
struct async_queue *queue, const async_data_t *data );
extern void async_set_result( struct object *obj, unsigned int status );
extern void async_terminate_head( struct list *queue, unsigned int status );
extern void async_terminate_queue( struct list *queue, unsigned int status );
extern int async_waiting( struct async_queue *queue );
extern void async_wake_up( struct async_queue *queue, unsigned int status );
/* access rights that require Unix read permission */
#define FILE_UNIX_READ_ACCESS (FILE_READ_DATA|FILE_READ_ATTRIBUTES|FILE_READ_EA)
......
......@@ -74,7 +74,7 @@ struct pipe_server
struct named_pipe *pipe;
struct timeout_user *flush_poll;
struct event *event;
struct list wait_q; /* only a single one can be queued */
struct async_queue *wait_q; /* only a single one can be queued */
unsigned int options; /* pipe options */
};
......@@ -96,7 +96,7 @@ struct named_pipe
unsigned int timeout;
unsigned int instances;
struct list servers; /* list of servers using this pipe */
struct list waiters; /* list of clients waiting to connect */
struct async_queue *waiters; /* list of clients waiting to connect */
};
struct named_pipe_device
......@@ -274,7 +274,7 @@ static void named_pipe_destroy( struct object *obj)
assert( list_empty( &pipe->servers ) );
assert( !pipe->instances );
async_terminate_queue( &pipe->waiters, STATUS_HANDLES_CLOSED );
if (pipe->waiters) release_object( pipe->waiters );
}
static struct fd *pipe_client_get_fd( struct object *obj )
......@@ -366,7 +366,7 @@ static void pipe_server_destroy( struct object *obj)
server->client = NULL;
}
async_terminate_head( &server->wait_q, STATUS_HANDLES_CLOSED );
release_object( server->wait_q );
assert( server->pipe->instances );
server->pipe->instances--;
......@@ -634,7 +634,7 @@ static struct pipe_server *create_pipe_server( struct named_pipe *pipe, unsigned
server->client = NULL;
server->flush_poll = NULL;
server->options = options;
list_init( &server->wait_q );
server->wait_q = create_async_queue( NULL );
list_add_head( &pipe->servers, &server->entry );
grab_object( pipe );
......@@ -718,8 +718,7 @@ static struct object *named_pipe_open_file( struct object *obj, unsigned int acc
if (client->fd && server->fd && res != 1)
{
if (server->state == ps_wait_open)
async_terminate_head( &server->wait_q, STATUS_SUCCESS );
assert( list_empty( &server->wait_q ) );
async_wake_up( server->wait_q, STATUS_SUCCESS );
server->state = ps_connected_server;
server->client = client;
client->server = server;
......@@ -753,8 +752,8 @@ DECL_HANDLER(create_named_pipe)
{
/* initialize it if it didn't already exist */
pipe->instances = 0;
pipe->waiters = NULL;
list_init( &pipe->servers );
list_init( &pipe->waiters );
pipe->insize = req->insize;
pipe->outsize = req->outsize;
pipe->maxinstances = req->maxinstances;
......@@ -805,8 +804,8 @@ DECL_HANDLER(connect_named_pipe)
case ps_wait_connect:
assert( !server->fd );
server->state = ps_wait_open;
create_async( current, NULL, &server->wait_q, &req->async );
async_terminate_queue( &server->pipe->waiters, STATUS_SUCCESS );
create_async( current, NULL, server->wait_q, &req->async );
if (server->pipe->waiters) async_wake_up( server->pipe->waiters, STATUS_SUCCESS );
set_error( STATUS_PENDING );
break;
case ps_connected_server:
......@@ -849,9 +848,14 @@ DECL_HANDLER(wait_named_pipe)
server = find_available_server( pipe );
if (!server)
{
if (!pipe->waiters && !(pipe->waiters = create_async_queue( NULL )))
{
release_object( pipe );
return;
}
if (req->timeout == NMPWAIT_WAIT_FOREVER)
{
if (create_async( current, NULL, &pipe->waiters, &req->async ))
if (create_async( current, NULL, pipe->waiters, &req->async ))
set_error( STATUS_PENDING );
}
else
......@@ -859,7 +863,7 @@ DECL_HANDLER(wait_named_pipe)
struct timeval when = current_time;
if (req->timeout == NMPWAIT_USE_DEFAULT_WAIT) add_timeout( &when, pipe->timeout );
else add_timeout( &when, req->timeout );
if (create_async( current, &when, &pipe->waiters, &req->async ))
if (create_async( current, &when, pipe->waiters, &req->async ))
set_error( STATUS_PENDING );
}
}
......
......@@ -267,7 +267,7 @@ DECL_HANDLER(set_serial_info)
serial->eventmask = req->eventmask;
if (!serial->eventmask)
{
fd_async_terminate_queue( serial->fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS );
fd_async_wake_up( serial->fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS );
}
}
......
......@@ -83,8 +83,8 @@ struct sock
obj_handle_t wparam; /* message wparam (socket handle) */
int errors[FD_MAX_EVENTS]; /* event errors */
struct sock *deferred; /* socket that waits for a deferred accept */
struct list read_q; /* queue for asynchronous reads */
struct list write_q; /* queue for asynchronous writes */
struct async_queue *read_q; /* queue for asynchronous reads */
struct async_queue *write_q; /* queue for asynchronous writes */
};
static void sock_dump( struct object *obj, int verbose );
......@@ -245,16 +245,16 @@ static void sock_wake_up( struct sock *sock, int pollev )
if ( sock->flags & WSA_FLAG_OVERLAPPED )
{
if ( pollev & (POLLIN|POLLPRI) && !list_empty( &sock->read_q ))
if ( pollev & (POLLIN|POLLPRI) && async_waiting( sock->read_q ))
{
if (debug_level) fprintf( stderr, "activating read queue for socket %p\n", sock );
async_terminate_head( &sock->read_q, STATUS_ALERTED );
async_wake_up( sock->read_q, STATUS_ALERTED );
async_active = 1;
}
if ( pollev & POLLOUT && !list_empty( &sock->write_q ))
if ( pollev & POLLOUT && async_waiting( sock->write_q ))
{
if (debug_level) fprintf( stderr, "activating write queue for socket %p\n", sock );
async_terminate_head( &sock->write_q, STATUS_ALERTED );
async_wake_up( sock->write_q, STATUS_ALERTED );
async_active = 1;
}
}
......@@ -483,9 +483,9 @@ static int sock_get_poll_events( struct fd *fd )
/* listening, wait for readable */
return (sock->hmask & FD_ACCEPT) ? 0 : POLLIN;
if (mask & (FD_READ) || (sock->flags & WSA_FLAG_OVERLAPPED && !list_empty( &sock->read_q )))
if (mask & (FD_READ) || (sock->flags & WSA_FLAG_OVERLAPPED && async_waiting( sock->read_q )))
ev |= POLLIN | POLLPRI;
if (mask & FD_WRITE || (sock->flags & WSA_FLAG_OVERLAPPED && !list_empty( &sock->write_q )))
if (mask & FD_WRITE || (sock->flags & WSA_FLAG_OVERLAPPED && async_waiting( sock->write_q )))
ev |= POLLOUT;
/* We use POLLIN with 0 bytes recv() as FD_CLOSE indication for stream sockets. */
if ( sock->type == SOCK_STREAM && ( sock->mask & ~sock->hmask & FD_CLOSE) )
......@@ -512,7 +512,7 @@ static enum server_fd_type sock_get_info( struct fd *fd, int *flags )
static void sock_queue_async( struct fd *fd, const async_data_t *data, int type, int count )
{
struct sock *sock = get_fd_user( fd );
struct list *queue;
struct async_queue *queue;
int pollev;
assert( sock->obj.ops == &sock_ops );
......@@ -526,11 +526,13 @@ static void sock_queue_async( struct fd *fd, const async_data_t *data, int type,
switch (type)
{
case ASYNC_TYPE_READ:
queue = &sock->read_q;
if (!sock->read_q && !(sock->read_q = create_async_queue( sock->fd ))) return;
queue = sock->read_q;
sock->hmask &= ~FD_CLOSE;
break;
case ASYNC_TYPE_WRITE:
queue = &sock->write_q;
if (!sock->write_q && !(sock->write_q = create_async_queue( sock->fd ))) return;
queue = sock->write_q;
break;
default:
set_error( STATUS_INVALID_PARAMETER );
......@@ -557,8 +559,8 @@ static void sock_cancel_async( struct fd *fd )
struct sock *sock = get_fd_user( fd );
assert( sock->obj.ops == &sock_ops );
async_terminate_queue( &sock->read_q, STATUS_CANCELLED );
async_terminate_queue( &sock->write_q, STATUS_CANCELLED );
async_wake_up( sock->read_q, STATUS_CANCELLED );
async_wake_up( sock->write_q, STATUS_CANCELLED );
}
static struct fd *sock_get_fd( struct object *obj )
......@@ -577,11 +579,8 @@ static void sock_destroy( struct object *obj )
if ( sock->deferred )
release_object( sock->deferred );
if ( sock->flags & WSA_FLAG_OVERLAPPED )
{
async_terminate_queue( &sock->read_q, STATUS_CANCELLED );
async_terminate_queue( &sock->write_q, STATUS_CANCELLED );
}
if (sock->read_q) release_object( sock->read_q );
if (sock->write_q) release_object( sock->write_q );
if (sock->event) release_object( sock->event );
if (sock->fd)
{
......@@ -624,13 +623,13 @@ static struct object *create_socket( int family, int type, int protocol, unsigne
sock->message = 0;
sock->wparam = 0;
sock->deferred = NULL;
sock->read_q = NULL;
sock->write_q = NULL;
if (!(sock->fd = create_anonymous_fd( &sock_fd_ops, sockfd, &sock->obj )))
{
release_object( sock );
return NULL;
}
list_init( &sock->read_q );
list_init( &sock->write_q );
sock_reselect( sock );
clear_error();
return &sock->obj;
......@@ -693,14 +692,14 @@ static struct sock *accept_socket( obj_handle_t handle )
if (sock->event) acceptsock->event = (struct event *)grab_object( sock->event );
acceptsock->flags = sock->flags;
acceptsock->deferred = NULL;
acceptsock->read_q = NULL;
acceptsock->write_q = NULL;
if (!(acceptsock->fd = create_anonymous_fd( &sock_fd_ops, acceptfd, &acceptsock->obj )))
{
release_object( acceptsock );
release_object( sock );
return NULL;
}
list_init( &acceptsock->read_q );
list_init( &acceptsock->write_q );
}
clear_error();
sock->pmask &= ~FD_ACCEPT;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment