Commit 0b4c8bf9 authored by Jacek Caban's avatar Jacek Caban Committed by Alexandre Julliard

server: Make async_queue object a simple list instead of a server object.

parent 7f17bae7
......@@ -82,40 +82,6 @@ static const struct object_ops async_ops =
async_destroy /* destroy */
};
struct async_queue
{
struct object obj; /* object header */
struct fd *fd; /* file descriptor owning this queue */
struct list queue; /* queue of async objects */
};
static void async_queue_dump( struct object *obj, int verbose );
static void async_queue_destroy( struct object *obj );
static const struct object_ops async_queue_ops =
{
sizeof(struct async_queue), /* size */
async_queue_dump, /* dump */
no_get_type, /* get_type */
no_add_queue, /* add_queue */
NULL, /* remove_queue */
NULL, /* signaled */
NULL, /* satisfied */
no_signal, /* signal */
no_get_fd, /* get_fd */
no_map_access, /* map_access */
default_get_sd, /* get_sd */
default_set_sd, /* set_sd */
no_lookup_name, /* lookup_name */
no_link_name, /* link_name */
NULL, /* unlink_name */
no_open_file, /* open_file */
no_close_handle, /* close_handle */
async_queue_destroy /* destroy */
};
static inline void async_reselect( struct async *async )
{
if (async->queue && async->fd) fd_reselect_async( async->fd, async->queue );
......@@ -165,7 +131,6 @@ static void async_destroy( struct object *obj )
{
list_remove( &async->queue_entry );
async_reselect( async );
release_object( async->queue );
}
else if (async->fd) release_object( async->fd );
......@@ -176,18 +141,6 @@ static void async_destroy( struct object *obj )
release_object( async->thread );
}
static void async_queue_dump( struct object *obj, int verbose )
{
struct async_queue *async_queue = (struct async_queue *)obj;
assert( obj->ops == &async_queue_ops );
fprintf( stderr, "Async queue fd=%p\n", async_queue->fd );
}
static void async_queue_destroy( struct object *obj )
{
assert( obj->ops == &async_queue_ops );
}
/* notifies client thread of new status of its async request */
void async_terminate( struct async *async, unsigned int status )
{
......@@ -232,33 +185,18 @@ static void async_timeout( void *private )
async_terminate( async, async->timeout_status );
}
/* create a new async queue for a given fd */
struct async_queue *create_async_queue( struct fd *fd )
{
struct async_queue *queue = alloc_object( &async_queue_ops );
if (queue)
{
queue->fd = fd;
list_init( &queue->queue );
}
return queue;
}
/* free an async queue, cancelling all async operations */
void free_async_queue( struct async_queue *queue )
{
struct async *async;
struct async *async, *next;
if (!queue) return;
LIST_FOR_EACH_ENTRY( async, &queue->queue, struct async, queue_entry )
LIST_FOR_EACH_ENTRY_SAFE( async, next, &queue->queue, struct async, queue_entry )
{
async->completion = fd_get_completion( async->fd, &async->comp_key );
async->fd = NULL;
async_terminate( async, STATUS_HANDLES_CLOSED );
async->queue = NULL;
}
queue->fd = NULL;
async_wake_up( queue, STATUS_HANDLES_CLOSED );
release_object( queue );
}
void queue_async( struct async_queue *queue, struct async *async )
......@@ -266,7 +204,7 @@ void queue_async( struct async_queue *queue, struct async *async )
/* fd will be set to NULL in free_async_queue when fd is destroyed */
release_object( async->fd );
async->queue = (struct async_queue *)grab_object( queue );
async->queue = queue;
grab_object( async );
list_add_tail( &queue->queue, &async->queue_entry );
......
......@@ -189,9 +189,9 @@ struct fd
unsigned int signaled :1; /* is the fd signaled? */
unsigned int fs_locks :1; /* can we use filesystem locks for this fd? */
int poll_index; /* index of fd in poll array */
struct async_queue *read_q; /* async readers of this fd */
struct async_queue *write_q; /* async writers of this fd */
struct async_queue *wait_q; /* other async waiters of this fd */
struct async_queue read_q; /* async readers of this fd */
struct async_queue write_q; /* async writers of this fd */
struct async_queue wait_q; /* other async waiters of this fd */
struct completion *completion; /* completion object attached to this fd */
apc_param_t comp_key; /* completion key to set in completion events */
};
......@@ -1473,9 +1473,9 @@ static void fd_destroy( struct object *obj )
{
struct fd *fd = (struct fd *)obj;
free_async_queue( fd->read_q );
free_async_queue( fd->write_q );
free_async_queue( fd->wait_q );
free_async_queue( &fd->read_q );
free_async_queue( &fd->write_q );
free_async_queue( &fd->wait_q );
if (fd->completion) release_object( fd->completion );
remove_fd_locks( fd );
......@@ -1567,8 +1567,8 @@ static inline void unmount_fd( struct fd *fd )
{
assert( fd->inode );
async_wake_up( fd->read_q, STATUS_VOLUME_DISMOUNTED );
async_wake_up( fd->write_q, STATUS_VOLUME_DISMOUNTED );
async_wake_up( &fd->read_q, STATUS_VOLUME_DISMOUNTED );
async_wake_up( &fd->write_q, STATUS_VOLUME_DISMOUNTED );
if (fd->poll_index != -1) set_fd_events( fd, -1 );
......@@ -1603,10 +1603,10 @@ static struct fd *alloc_fd_object(void)
fd->signaled = 1;
fd->fs_locks = 1;
fd->poll_index = -1;
fd->read_q = NULL;
fd->write_q = NULL;
fd->wait_q = NULL;
fd->completion = NULL;
init_async_queue( &fd->read_q );
init_async_queue( &fd->write_q );
init_async_queue( &fd->wait_q );
list_init( &fd->inode_entry );
list_init( &fd->locks );
......@@ -1638,11 +1638,11 @@ struct fd *alloc_pseudo_fd( const struct fd_ops *fd_user_ops, struct object *use
fd->signaled = 0;
fd->fs_locks = 0;
fd->poll_index = -1;
fd->read_q = NULL;
fd->write_q = NULL;
fd->wait_q = NULL;
fd->completion = NULL;
fd->no_fd_status = STATUS_BAD_DEVICE_TYPE;
init_async_queue( &fd->read_q );
init_async_queue( &fd->write_q );
init_async_queue( &fd->wait_q );
list_init( &fd->inode_entry );
list_init( &fd->locks );
return fd;
......@@ -2014,16 +2014,16 @@ int default_fd_get_poll_events( struct fd *fd )
{
int events = 0;
if (async_waiting( fd->read_q )) events |= POLLIN;
if (async_waiting( fd->write_q )) events |= POLLOUT;
if (async_waiting( &fd->read_q )) events |= POLLIN;
if (async_waiting( &fd->write_q )) events |= POLLOUT;
return events;
}
/* default handler for poll() events */
void default_poll_event( struct fd *fd, int event )
{
if (event & (POLLIN | POLLERR | POLLHUP)) async_wake_up( fd->read_q, STATUS_ALERTED );
if (event & (POLLOUT | POLLERR | POLLHUP)) async_wake_up( fd->write_q, STATUS_ALERTED );
if (event & (POLLIN | POLLERR | POLLHUP)) async_wake_up( &fd->read_q, STATUS_ALERTED );
if (event & (POLLOUT | POLLERR | POLLHUP)) async_wake_up( &fd->write_q, STATUS_ALERTED );
/* if an error occurred, stop polling this fd to avoid busy-looping */
if (event & (POLLERR | POLLHUP)) set_fd_events( fd, -1 );
......@@ -2037,16 +2037,13 @@ int fd_queue_async( struct fd *fd, struct async *async, int type )
switch (type)
{
case ASYNC_TYPE_READ:
if (!fd->read_q && !(fd->read_q = create_async_queue( fd ))) return 0;
queue = fd->read_q;
queue = &fd->read_q;
break;
case ASYNC_TYPE_WRITE:
if (!fd->write_q && !(fd->write_q = create_async_queue( fd ))) return 0;
queue = fd->write_q;
queue = &fd->write_q;
break;
case ASYNC_TYPE_WAIT:
if (!fd->wait_q && !(fd->wait_q = create_async_queue( fd ))) return 0;
queue = fd->wait_q;
queue = &fd->wait_q;
break;
default:
queue = NULL;
......@@ -2070,13 +2067,13 @@ void fd_async_wake_up( struct fd *fd, int type, unsigned int status )
switch (type)
{
case ASYNC_TYPE_READ:
async_wake_up( fd->read_q, status );
async_wake_up( &fd->read_q, status );
break;
case ASYNC_TYPE_WRITE:
async_wake_up( fd->write_q, status );
async_wake_up( &fd->write_q, status );
break;
case ASYNC_TYPE_WAIT:
async_wake_up( fd->wait_q, status );
async_wake_up( &fd->wait_q, status );
break;
default:
assert(0);
......@@ -2101,7 +2098,7 @@ void default_fd_queue_async( struct fd *fd, struct async *async, int type, int c
/* default reselect_async() fd routine */
void default_fd_reselect_async( struct fd *fd, struct async_queue *queue )
{
if (queue == fd->read_q || queue == fd->write_q)
if (queue == &fd->read_q || queue == &fd->write_q)
{
int poll_events = fd->fd_ops->get_poll_events( fd );
int events = check_fd_events( fd, poll_events );
......
......@@ -42,6 +42,11 @@ struct iosb
void *out_data; /* output data */
};
struct async_queue
{
struct list queue; /* queue of async objects */
};
/* operations valid on file descriptor objects */
struct fd_ops
{
......@@ -173,7 +178,6 @@ extern int is_serial_fd( struct fd *fd );
extern struct object *create_serial( struct fd *fd );
/* async I/O functions */
extern struct async_queue *create_async_queue( struct fd *fd );
extern void free_async_queue( struct async_queue *queue );
extern struct async *create_async( struct fd *fd, struct thread *thread, const async_data_t *data, struct iosb *iosb );
extern struct async *create_request_async( struct fd *fd, const async_data_t *data );
......@@ -193,6 +197,11 @@ extern int async_is_blocking( struct async *async );
extern struct async *find_pending_async( struct async_queue *queue );
extern void cancel_process_asyncs( struct process *process );
static inline void init_async_queue( struct async_queue *queue )
{
list_init( &queue->queue );
}
/* access rights that require Unix read permission */
#define FILE_UNIX_READ_ACCESS (FILE_READ_DATA|FILE_READ_ATTRIBUTES|FILE_READ_EA)
......
......@@ -79,8 +79,8 @@ struct pipe_end
struct pipe_end *connection; /* the other end of the pipe */
data_size_t buffer_size;/* size of buffered data that doesn't block caller */
struct list message_queue;
struct async_queue *read_q; /* read queue */
struct async_queue *write_q; /* write queue */
struct async_queue read_q; /* read queue */
struct async_queue write_q; /* write queue */
};
struct pipe_server
......@@ -113,7 +113,7 @@ struct named_pipe
unsigned int instances;
timeout_t timeout;
struct list servers; /* list of servers using this pipe */
struct async_queue *waiters; /* list of clients waiting to connect */
struct async_queue waiters; /* list of clients waiting to connect */
};
struct named_pipe_device
......@@ -337,7 +337,7 @@ static void named_pipe_destroy( struct object *obj)
assert( list_empty( &pipe->servers ) );
assert( !pipe->instances );
free_async_queue( pipe->waiters );
free_async_queue( &pipe->waiters );
}
static struct fd *pipe_client_get_fd( struct object *obj )
......@@ -421,7 +421,7 @@ static void pipe_end_disconnect( struct pipe_end *pipe_end, unsigned int status
struct pipe_message *message, *next;
struct async *async;
if (pipe_end->fd) fd_async_wake_up( pipe_end->fd, ASYNC_TYPE_WAIT, status );
async_wake_up( pipe_end->read_q, status );
async_wake_up( &pipe_end->read_q, status );
LIST_FOR_EACH_ENTRY_SAFE( message, next, &pipe_end->message_queue, struct pipe_message, entry )
{
async = message->async;
......@@ -470,8 +470,8 @@ static void pipe_end_destroy( struct pipe_end *pipe_end )
free_message( message );
}
free_async_queue( pipe_end->read_q );
free_async_queue( pipe_end->write_q );
free_async_queue( &pipe_end->read_q );
free_async_queue( &pipe_end->write_q );
}
static void pipe_server_destroy( struct object *obj)
......@@ -756,7 +756,7 @@ static void reselect_read_queue( struct pipe_end *pipe_end )
int read_done = 0;
ignore_reselect = 1;
while (!list_empty( &pipe_end->message_queue) && (async = find_pending_async( pipe_end->read_q )))
while (!list_empty( &pipe_end->message_queue ) && (async = find_pending_async( &pipe_end->read_q )))
{
iosb = async_get_iosb( async );
message_queue_read( pipe_end, iosb );
......@@ -818,9 +818,7 @@ static int pipe_end_read( struct fd *fd, struct async *async, file_pos_t pos )
return 0;
}
if (!pipe_end->read_q && !(pipe_end->read_q = create_async_queue( fd ))) return 0;
queue_async( pipe_end->read_q, async );
queue_async( &pipe_end->read_q, async );
reselect_read_queue( pipe_end );
set_error( STATUS_PENDING );
return 1;
......@@ -840,15 +838,13 @@ static int pipe_end_write( struct fd *fd, struct async *async, file_pos_t pos )
return 0;
}
if (!write_end->write_q && !(write_end->write_q = create_async_queue( fd ))) return 0;
if (!(message = mem_alloc( sizeof(*message) ))) return 0;
message->async = (struct async *)grab_object( async );
message->iosb = async_get_iosb( async );
message->read_pos = 0;
list_add_tail( &read_end->message_queue, &message->entry );
queue_async( write_end->write_q, async );
queue_async( &write_end->write_q, async );
reselect_write_queue( write_end );
set_error( STATUS_PENDING );
return 1;
......@@ -869,9 +865,9 @@ static void pipe_end_reselect_async( struct fd *fd, struct async_queue *queue )
if (!use_server_io( pipe_end ))
default_fd_reselect_async( fd, queue );
else if (pipe_end->write_q && pipe_end->write_q == queue)
else if (&pipe_end->write_q == queue)
reselect_write_queue( pipe_end );
else if (pipe_end->read_q && pipe_end->read_q == queue)
else if (&pipe_end->read_q == queue)
reselect_read_queue( pipe_end );
}
......@@ -940,7 +936,7 @@ static int pipe_server_ioctl( struct fd *fd, ioctl_code_t code, struct async *as
if (fd_queue_async( server->ioctl_fd, async, ASYNC_TYPE_WAIT ))
{
set_server_state( server, ps_wait_open );
if (server->pipe->waiters) async_wake_up( server->pipe->waiters, STATUS_SUCCESS );
async_wake_up( &server->pipe->waiters, STATUS_SUCCESS );
set_error( STATUS_PENDING );
return 1;
}
......@@ -1025,8 +1021,8 @@ static void init_pipe_end( struct pipe_end *pipe_end, unsigned int pipe_flags, d
pipe_end->flags = pipe_flags;
pipe_end->connection = NULL;
pipe_end->buffer_size = buffer_size;
pipe_end->read_q = NULL;
pipe_end->write_q = NULL;
init_async_queue( &pipe_end->read_q );
init_async_queue( &pipe_end->write_q );
list_init( &pipe_end->message_queue );
}
......@@ -1231,18 +1227,15 @@ static int named_pipe_device_ioctl( struct fd *fd, ioctl_code_t code, struct asy
if (!(server = find_available_server( pipe )))
{
if (!pipe->waiters && !(pipe->waiters = create_async_queue( fd ))) goto done;
queue_async( pipe->waiters, async );
queue_async( &pipe->waiters, async );
when = buffer->TimeoutSpecified ? buffer->Timeout.QuadPart : pipe->timeout;
async_set_timeout( async, when, STATUS_IO_TIMEOUT );
release_object( pipe );
set_error( STATUS_PENDING );
return 1;
}
else release_object( server );
done:
release_object( server );
release_object( pipe );
return 0;
}
......@@ -1291,7 +1284,7 @@ DECL_HANDLER(create_named_pipe)
{
/* initialize it if it didn't already exist */
pipe->instances = 0;
pipe->waiters = NULL;
init_async_queue( &pipe->waiters );
list_init( &pipe->servers );
pipe->insize = req->insize;
pipe->outsize = req->outsize;
......
......@@ -112,9 +112,9 @@ struct sock
int errors[FD_MAX_EVENTS]; /* event errors */
timeout_t connect_time;/* time the socket was connected */
struct sock *deferred; /* socket that waits for a deferred accept */
struct async_queue *read_q; /* queue for asynchronous reads */
struct async_queue *write_q; /* queue for asynchronous writes */
struct async_queue *ifchange_q; /* queue for interface change notifications */
struct async_queue read_q; /* queue for asynchronous reads */
struct async_queue write_q; /* queue for asynchronous writes */
struct async_queue ifchange_q; /* queue for interface change notifications */
struct object *ifchange_obj; /* the interface change notification object */
struct list ifchange_entry; /* entry in ifchange notification list */
};
......@@ -312,16 +312,16 @@ static int sock_dispatch_asyncs( struct sock *sock, int event, int error )
{
if ( sock->flags & WSA_FLAG_OVERLAPPED )
{
if ( event & (POLLIN|POLLPRI) && async_waiting( sock->read_q ) )
if (event & (POLLIN|POLLPRI) && async_waiting( &sock->read_q ))
{
if (debug_level) fprintf( stderr, "activating read queue for socket %p\n", sock );
async_wake_up( sock->read_q, STATUS_ALERTED );
async_wake_up( &sock->read_q, STATUS_ALERTED );
event &= ~(POLLIN|POLLPRI);
}
if ( event & POLLOUT && async_waiting( sock->write_q ) )
if (event & POLLOUT && async_waiting( &sock->write_q ))
{
if (debug_level) fprintf( stderr, "activating write queue for socket %p\n", sock );
async_wake_up( sock->write_q, STATUS_ALERTED );
async_wake_up( &sock->write_q, STATUS_ALERTED );
event &= ~POLLOUT;
}
if ( event & (POLLERR|POLLHUP) )
......@@ -329,9 +329,9 @@ static int sock_dispatch_asyncs( struct sock *sock, int event, int error )
int status = sock_get_ntstatus( error );
if ( !(sock->state & FD_READ) )
async_wake_up( sock->read_q, status );
async_wake_up( &sock->read_q, status );
if ( !(sock->state & FD_WRITE) )
async_wake_up( sock->write_q, status );
async_wake_up( &sock->write_q, status );
}
}
return event;
......@@ -508,9 +508,9 @@ static int sock_get_poll_events( struct fd *fd )
/* connecting, wait for writable */
return POLLOUT;
if ( async_queued( sock->read_q ) )
if (async_queued( &sock->read_q ))
{
if ( async_waiting( sock->read_q ) ) ev |= POLLIN | POLLPRI;
if (async_waiting( &sock->read_q )) ev |= POLLIN | POLLPRI;
}
else if (smask & FD_READ || (sock->state & FD_WINE_LISTENING && mask & FD_ACCEPT))
ev |= POLLIN | POLLPRI;
......@@ -519,9 +519,9 @@ static int sock_get_poll_events( struct fd *fd )
!(sock->hmask & FD_READ) )
ev |= POLLIN;
if ( async_queued( sock->write_q ) )
if (async_queued( &sock->write_q ))
{
if ( async_waiting( sock->write_q ) ) ev |= POLLOUT;
if (async_waiting( &sock->write_q )) ev |= POLLOUT;
}
else if (smask & FD_WRITE)
ev |= POLLOUT;
......@@ -549,8 +549,7 @@ static int sock_ioctl( struct fd *fd, ioctl_code_t code, struct async *async )
return 0;
}
if (!sock_get_ifchange( sock )) return 0;
if (!sock->ifchange_q && !(sock->ifchange_q = create_async_queue( sock->fd ))) return 0;
queue_async( sock->ifchange_q, async );
queue_async( &sock->ifchange_q, async );
set_error( STATUS_PENDING );
return 1;
default:
......@@ -569,12 +568,10 @@ static void sock_queue_async( struct fd *fd, struct async *async, int type, int
switch (type)
{
case ASYNC_TYPE_READ:
if (!sock->read_q && !(sock->read_q = create_async_queue( sock->fd ))) return;
queue = sock->read_q;
queue = &sock->read_q;
break;
case ASYNC_TYPE_WRITE:
if (!sock->write_q && !(sock->write_q = create_async_queue( sock->fd ))) return;
queue = sock->write_q;
queue = &sock->write_q;
break;
default:
set_error( STATUS_INVALID_PARAMETER );
......@@ -598,7 +595,7 @@ static void sock_reselect_async( struct fd *fd, struct async_queue *queue )
{
struct sock *sock = get_fd_user( fd );
/* ignore reselect on ifchange queue */
if (sock->ifchange_q != queue)
if (&sock->ifchange_q != queue)
sock_reselect( sock );
}
......@@ -618,11 +615,11 @@ static void sock_destroy( struct object *obj )
if ( sock->deferred )
release_object( sock->deferred );
async_wake_up( sock->ifchange_q, STATUS_CANCELLED );
async_wake_up( &sock->ifchange_q, STATUS_CANCELLED );
sock_release_ifchange( sock );
free_async_queue( sock->read_q );
free_async_queue( sock->write_q );
free_async_queue( sock->ifchange_q );
free_async_queue( &sock->read_q );
free_async_queue( &sock->write_q );
free_async_queue( &sock->ifchange_q );
if (sock->event) release_object( sock->event );
if (sock->fd)
{
......@@ -648,10 +645,10 @@ static void init_sock(struct sock *sock)
sock->wparam = 0;
sock->connect_time = 0;
sock->deferred = NULL;
sock->read_q = NULL;
sock->write_q = NULL;
sock->ifchange_q = NULL;
sock->ifchange_obj = NULL;
init_async_queue( &sock->read_q );
init_async_queue( &sock->write_q );
init_async_queue( &sock->ifchange_q );
memset( sock->errors, 0, sizeof(sock->errors) );
}
......@@ -1040,7 +1037,7 @@ static void ifchange_wake_up( struct object *obj, unsigned int status )
struct sock *sock = LIST_ENTRY( ptr, struct sock, ifchange_entry );
assert( sock->ifchange_obj );
async_wake_up( sock->ifchange_q, status ); /* issue ifchange notification for the socket */
async_wake_up( &sock->ifchange_q, status ); /* issue ifchange notification for the socket */
sock_release_ifchange( sock ); /* remove socket from list and decrement ifchange refcount */
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment