Commit dd81ac50 authored by Alexandre Julliard's avatar Alexandre Julliard

Convert async I/O queues to standard lists.

parent 91fc881d
...@@ -991,10 +991,23 @@ struct async ...@@ -991,10 +991,23 @@ struct async
void *sb; void *sb;
struct timeval when; struct timeval when;
struct timeout_user *timeout; struct timeout_user *timeout;
struct async *next; struct list entry;
struct async **head;
}; };
/* notifies client thread of new status of its async request */
/* destroys the server side of it */
static void async_terminate( struct async *async, int status )
{
thread_queue_apc( async->thread, NULL, async->apc, APC_ASYNC_IO,
1, async->user, async->sb, (void *)status );
if (async->timeout) remove_timeout_user( async->timeout );
async->timeout = NULL;
list_remove( &async->entry );
release_object( async->thread );
free( async );
}
/* cb for timeout on an async request */ /* cb for timeout on an async request */
static void async_callback(void *private) static void async_callback(void *private)
{ {
...@@ -1006,12 +1019,10 @@ static void async_callback(void *private) ...@@ -1006,12 +1019,10 @@ static void async_callback(void *private)
} }
/* create an async on a given queue of a fd */ /* create an async on a given queue of a fd */
struct async *create_async(struct fd *fd, struct thread *thread, struct async *create_async(struct fd *fd, struct thread *thread, int timeout, struct list *queue,
int timeout, struct async **head,
void *io_apc, void *io_user, void* io_sb) void *io_apc, void *io_user, void* io_sb)
{ {
struct async *async = mem_alloc( sizeof(struct async) ); struct async *async = mem_alloc( sizeof(struct async) );
struct async **p;
if (!async) return NULL; if (!async) return NULL;
...@@ -1020,11 +1031,8 @@ struct async *create_async(struct fd *fd, struct thread *thread, ...@@ -1020,11 +1031,8 @@ struct async *create_async(struct fd *fd, struct thread *thread,
async->apc = io_apc; async->apc = io_apc;
async->user = io_user; async->user = io_user;
async->sb = io_sb; async->sb = io_sb;
async->head = head;
async->next = NULL;
for (p = head; *p; p = &(*p)->next); list_add_tail( queue, &async->entry );
*p = async;
if (timeout) if (timeout)
{ {
...@@ -1037,29 +1045,11 @@ struct async *create_async(struct fd *fd, struct thread *thread, ...@@ -1037,29 +1045,11 @@ struct async *create_async(struct fd *fd, struct thread *thread,
return async; return async;
} }
/* notifies client thread of new status of its async request */ /* terminate the async operation at the head of the queue */
/* destroys the server side of it */ void async_terminate_head( struct list *queue, int status )
void async_terminate( struct async *async, int status )
{ {
struct async** p; struct list *ptr = list_head( queue );
if (ptr) async_terminate( LIST_ENTRY( ptr, struct async, entry ), status );
thread_queue_apc( async->thread, NULL, async->apc, APC_ASYNC_IO,
1, async->user, async->sb, (void *)status );
if (async->timeout) remove_timeout_user( async->timeout );
async->timeout = NULL;
for (p = async->head; *p; p = &(*p)->next)
{
if (*p == async)
{
*p = async->next;
break;
}
}
release_object( async->thread );
free( async );
} }
/****************************************************************/ /****************************************************************/
......
...@@ -57,8 +57,8 @@ struct file ...@@ -57,8 +57,8 @@ struct file
struct fd *fd; /* file descriptor for this file */ struct fd *fd; /* file descriptor for this file */
unsigned int access; /* file access (GENERIC_READ/WRITE) */ unsigned int access; /* file access (GENERIC_READ/WRITE) */
unsigned int options; /* file options (FILE_DELETE_ON_CLOSE, FILE_SYNCHRONOUS...) */ unsigned int options; /* file options (FILE_DELETE_ON_CLOSE, FILE_SYNCHRONOUS...) */
struct async *read_q; struct list read_q;
struct async *write_q; struct list write_q;
}; };
static void file_dump( struct object *obj, int verbose ); static void file_dump( struct object *obj, int verbose );
...@@ -160,10 +160,8 @@ static struct object *create_file( const char *nameptr, size_t len, unsigned int ...@@ -160,10 +160,8 @@ static struct object *create_file( const char *nameptr, size_t len, unsigned int
file->access = access; file->access = access;
file->options = options; file->options = options;
if (is_overlapped( file )) list_init( &file->read_q );
{ list_init( &file->write_q );
file->read_q = file->write_q = NULL;
}
/* FIXME: should set error to STATUS_OBJECT_NAME_COLLISION if file existed before */ /* FIXME: should set error to STATUS_OBJECT_NAME_COLLISION if file existed before */
if (!(file->fd = alloc_fd( &file_fd_ops, &file->obj )) || if (!(file->fd = alloc_fd( &file_fd_ops, &file->obj )) ||
...@@ -237,14 +235,14 @@ static void file_poll_event( struct fd *fd, int event ) ...@@ -237,14 +235,14 @@ static void file_poll_event( struct fd *fd, int event )
assert( file->obj.ops == &file_ops ); assert( file->obj.ops == &file_ops );
if (is_overlapped( file )) if (is_overlapped( file ))
{ {
if ( file->read_q && (POLLIN & event) ) if (!list_empty( &file->read_q ) && (POLLIN & event) )
{ {
async_terminate( file->read_q, STATUS_ALERTED ); async_terminate_head( &file->read_q, STATUS_ALERTED );
return; return;
} }
if ( file->write_q && (POLLOUT & event) ) if (!list_empty( &file->write_q ) && (POLLOUT & event) )
{ {
async_terminate( file->write_q, STATUS_ALERTED ); async_terminate_head( &file->write_q, STATUS_ALERTED );
return; return;
} }
} }
...@@ -271,7 +269,7 @@ static void file_queue_async( struct fd *fd, void *apc, void *user, void *iosb, ...@@ -271,7 +269,7 @@ static void file_queue_async( struct fd *fd, void *apc, void *user, void *iosb,
int type, int count ) int type, int count )
{ {
struct file *file = get_fd_user( fd ); struct file *file = get_fd_user( fd );
struct async **head; struct list *queue;
int events; int events;
assert( file->obj.ops == &file_ops ); assert( file->obj.ops == &file_ops );
...@@ -285,17 +283,17 @@ static void file_queue_async( struct fd *fd, void *apc, void *user, void *iosb, ...@@ -285,17 +283,17 @@ static void file_queue_async( struct fd *fd, void *apc, void *user, void *iosb,
switch (type) switch (type)
{ {
case ASYNC_TYPE_READ: case ASYNC_TYPE_READ:
head = &file->read_q; queue = &file->read_q;
break; break;
case ASYNC_TYPE_WRITE: case ASYNC_TYPE_WRITE:
head = &file->write_q; queue = &file->write_q;
break; break;
default: default:
set_error( STATUS_INVALID_PARAMETER ); set_error( STATUS_INVALID_PARAMETER );
return; return;
} }
if (!create_async( fd, current, 0, head, apc, user, iosb )) if (!create_async( fd, current, 0, queue, apc, user, iosb ))
return; return;
/* Check if the new pending request can be served immediately */ /* Check if the new pending request can be served immediately */
......
...@@ -112,12 +112,12 @@ extern struct object *create_serial( struct fd *fd, unsigned int options ); ...@@ -112,12 +112,12 @@ extern struct object *create_serial( struct fd *fd, unsigned int options );
/* async I/O functions */ /* async I/O functions */
extern struct async *create_async( struct fd *fd, struct thread *thread, int timeout, extern struct async *create_async( struct fd *fd, struct thread *thread, int timeout,
struct async **head, void *, void *, void *); struct list *queue, void *, void *, void *);
extern void async_terminate( struct async *async, int status ); extern void async_terminate_head( struct list *queue, int status );
static inline void async_terminate_queue( struct async **head, int status ) static inline void async_terminate_queue( struct list *queue, int status )
{ {
while (*head) async_terminate( *head, status ); while (!list_empty( queue )) async_terminate_head( queue, status );
} }
#endif /* __WINE_SERVER_FILE_H */ #endif /* __WINE_SERVER_FILE_H */
...@@ -83,9 +83,9 @@ struct serial ...@@ -83,9 +83,9 @@ struct serial
struct termios original; struct termios original;
struct async *read_q; struct list read_q;
struct async *write_q; struct list write_q;
struct async *wait_q; struct list wait_q;
/* FIXME: add dcb, comm status, handler module, sharing */ /* FIXME: add dcb, comm status, handler module, sharing */
}; };
...@@ -145,7 +145,9 @@ struct object *create_serial( struct fd *fd, unsigned int options ) ...@@ -145,7 +145,9 @@ struct object *create_serial( struct fd *fd, unsigned int options )
serial->writeconst = 0; serial->writeconst = 0;
serial->eventmask = 0; serial->eventmask = 0;
serial->commerror = 0; serial->commerror = 0;
serial->read_q = serial->write_q = serial->wait_q = NULL; list_init( &serial->read_q );
list_init( &serial->write_q );
list_init( &serial->wait_q );
if (!(serial->fd = create_anonymous_fd( &serial_fd_ops, unix_fd, &serial->obj ))) if (!(serial->fd = create_anonymous_fd( &serial_fd_ops, unix_fd, &serial->obj )))
{ {
release_object( serial ); release_object( serial );
...@@ -188,9 +190,9 @@ static int serial_get_poll_events( struct fd *fd ) ...@@ -188,9 +190,9 @@ static int serial_get_poll_events( struct fd *fd )
int events = 0; int events = 0;
assert( serial->obj.ops == &serial_ops ); assert( serial->obj.ops == &serial_ops );
if (serial->read_q) events |= POLLIN; if (!list_empty( &serial->read_q )) events |= POLLIN;
if (serial->write_q) events |= POLLOUT; if (!list_empty( &serial->write_q )) events |= POLLOUT;
if (serial->wait_q) events |= POLLIN; if (!list_empty( &serial->wait_q )) events |= POLLIN;
/* fprintf(stderr,"poll events are %04x\n",events); */ /* fprintf(stderr,"poll events are %04x\n",events); */
...@@ -221,14 +223,14 @@ static void serial_poll_event(struct fd *fd, int event) ...@@ -221,14 +223,14 @@ static void serial_poll_event(struct fd *fd, int event)
/* fprintf(stderr,"Poll event %02x\n",event); */ /* fprintf(stderr,"Poll event %02x\n",event); */
if (serial->read_q && (POLLIN & event) ) if (!list_empty( &serial->read_q ) && (POLLIN & event) )
async_terminate( serial->read_q, STATUS_ALERTED ); async_terminate_head( &serial->read_q, STATUS_ALERTED );
if (serial->write_q && (POLLOUT & event) ) if (!list_empty( &serial->write_q ) && (POLLOUT & event) )
async_terminate( serial->write_q, STATUS_ALERTED ); async_terminate_head( &serial->write_q, STATUS_ALERTED );
if (serial->wait_q && (POLLIN & event) ) if (!list_empty( &serial->wait_q ) && (POLLIN & event) )
async_terminate( serial->wait_q, STATUS_ALERTED ); async_terminate_head( &serial->wait_q, STATUS_ALERTED );
set_fd_events( fd, serial_get_poll_events(fd) ); set_fd_events( fd, serial_get_poll_events(fd) );
} }
...@@ -237,7 +239,7 @@ static void serial_queue_async( struct fd *fd, void *apc, void *user, void *iosb ...@@ -237,7 +239,7 @@ static void serial_queue_async( struct fd *fd, void *apc, void *user, void *iosb
int type, int count ) int type, int count )
{ {
struct serial *serial = get_fd_user( fd ); struct serial *serial = get_fd_user( fd );
struct async **head; struct list *queue;
int timeout; int timeout;
int events; int events;
...@@ -246,15 +248,15 @@ static void serial_queue_async( struct fd *fd, void *apc, void *user, void *iosb ...@@ -246,15 +248,15 @@ static void serial_queue_async( struct fd *fd, void *apc, void *user, void *iosb
switch (type) switch (type)
{ {
case ASYNC_TYPE_READ: case ASYNC_TYPE_READ:
head = &serial->read_q; queue = &serial->read_q;
timeout = serial->readconst + serial->readmult*count; timeout = serial->readconst + serial->readmult*count;
break; break;
case ASYNC_TYPE_WAIT: case ASYNC_TYPE_WAIT:
head = &serial->wait_q; queue = &serial->wait_q;
timeout = 0; timeout = 0;
break; break;
case ASYNC_TYPE_WRITE: case ASYNC_TYPE_WRITE:
head = &serial->write_q; queue = &serial->write_q;
timeout = serial->writeconst + serial->writemult*count; timeout = serial->writeconst + serial->writemult*count;
break; break;
default: default:
...@@ -262,7 +264,7 @@ static void serial_queue_async( struct fd *fd, void *apc, void *user, void *iosb ...@@ -262,7 +264,7 @@ static void serial_queue_async( struct fd *fd, void *apc, void *user, void *iosb
return; return;
} }
if (!create_async( fd, current, timeout, head, apc, user, iosb )) if (!create_async( fd, current, timeout, queue, apc, user, iosb ))
return; return;
/* Check if the new pending request can be served immediately */ /* Check if the new pending request can be served immediately */
......
...@@ -81,8 +81,8 @@ struct sock ...@@ -81,8 +81,8 @@ struct sock
obj_handle_t wparam; /* message wparam (socket handle) */ obj_handle_t wparam; /* message wparam (socket handle) */
int errors[FD_MAX_EVENTS]; /* event errors */ int errors[FD_MAX_EVENTS]; /* event errors */
struct sock *deferred; /* socket that waits for a deferred accept */ struct sock *deferred; /* socket that waits for a deferred accept */
struct async *read_q; /* Queue for asynchronous reads */ struct list read_q; /* queue for asynchronous reads */
struct async *write_q; /* Queue for asynchronous writes */ struct list write_q; /* queue for asynchronous writes */
}; };
static void sock_dump( struct object *obj, int verbose ); static void sock_dump( struct object *obj, int verbose );
...@@ -237,16 +237,16 @@ static void sock_wake_up( struct sock *sock, int pollev ) ...@@ -237,16 +237,16 @@ static void sock_wake_up( struct sock *sock, int pollev )
if ( sock->flags & WSA_FLAG_OVERLAPPED ) if ( sock->flags & WSA_FLAG_OVERLAPPED )
{ {
if ( pollev & (POLLIN|POLLPRI) && sock->read_q ) if ( pollev & (POLLIN|POLLPRI) && !list_empty( &sock->read_q ))
{ {
if (debug_level) fprintf( stderr, "activating read queue for socket %p\n", sock ); if (debug_level) fprintf( stderr, "activating read queue for socket %p\n", sock );
async_terminate( sock->read_q, STATUS_ALERTED ); async_terminate_head( &sock->read_q, STATUS_ALERTED );
async_active = 1; async_active = 1;
} }
if ( pollev & POLLOUT && sock->write_q ) if ( pollev & POLLOUT && !list_empty( &sock->write_q ))
{ {
if (debug_level) fprintf( stderr, "activating write queue for socket %p\n", sock ); if (debug_level) fprintf( stderr, "activating write queue for socket %p\n", sock );
async_terminate( sock->write_q, STATUS_ALERTED ); async_terminate_head( &sock->write_q, STATUS_ALERTED );
async_active = 1; async_active = 1;
} }
} }
...@@ -466,9 +466,9 @@ static int sock_get_poll_events( struct fd *fd ) ...@@ -466,9 +466,9 @@ static int sock_get_poll_events( struct fd *fd )
/* listening, wait for readable */ /* listening, wait for readable */
return (sock->hmask & FD_ACCEPT) ? 0 : POLLIN; return (sock->hmask & FD_ACCEPT) ? 0 : POLLIN;
if (mask & (FD_READ) || (sock->flags & WSA_FLAG_OVERLAPPED && sock->read_q)) if (mask & (FD_READ) || (sock->flags & WSA_FLAG_OVERLAPPED && !list_empty( &sock->read_q )))
ev |= POLLIN | POLLPRI; ev |= POLLIN | POLLPRI;
if (mask & FD_WRITE || (sock->flags & WSA_FLAG_OVERLAPPED && sock->write_q)) if (mask & FD_WRITE || (sock->flags & WSA_FLAG_OVERLAPPED && !list_empty( &sock->write_q )))
ev |= POLLOUT; ev |= POLLOUT;
/* We use POLLIN with 0 bytes recv() as FD_CLOSE indication for stream sockets. */ /* We use POLLIN with 0 bytes recv() as FD_CLOSE indication for stream sockets. */
if ( sock->type == SOCK_STREAM && ( sock->mask & ~sock->hmask & FD_CLOSE) ) if ( sock->type == SOCK_STREAM && ( sock->mask & ~sock->hmask & FD_CLOSE) )
...@@ -496,7 +496,7 @@ static void sock_queue_async( struct fd *fd, void *apc, void *user, void *iosb, ...@@ -496,7 +496,7 @@ static void sock_queue_async( struct fd *fd, void *apc, void *user, void *iosb,
int type, int count ) int type, int count )
{ {
struct sock *sock = get_fd_user( fd ); struct sock *sock = get_fd_user( fd );
struct async **head; struct list *queue;
int pollev; int pollev;
assert( sock->obj.ops == &sock_ops ); assert( sock->obj.ops == &sock_ops );
...@@ -510,11 +510,11 @@ static void sock_queue_async( struct fd *fd, void *apc, void *user, void *iosb, ...@@ -510,11 +510,11 @@ static void sock_queue_async( struct fd *fd, void *apc, void *user, void *iosb,
switch (type) switch (type)
{ {
case ASYNC_TYPE_READ: case ASYNC_TYPE_READ:
head = &sock->read_q; queue = &sock->read_q;
sock->hmask &= ~FD_CLOSE; sock->hmask &= ~FD_CLOSE;
break; break;
case ASYNC_TYPE_WRITE: case ASYNC_TYPE_WRITE:
head = &sock->write_q; queue = &sock->write_q;
break; break;
default: default:
set_error( STATUS_INVALID_PARAMETER ); set_error( STATUS_INVALID_PARAMETER );
...@@ -528,7 +528,7 @@ static void sock_queue_async( struct fd *fd, void *apc, void *user, void *iosb, ...@@ -528,7 +528,7 @@ static void sock_queue_async( struct fd *fd, void *apc, void *user, void *iosb,
} }
else else
{ {
if (!create_async( fd, current, 0, head, apc, user, iosb )) if (!create_async( fd, current, 0, queue, apc, user, iosb ))
return; return;
} }
...@@ -608,10 +608,8 @@ static struct object *create_socket( int family, int type, int protocol, unsigne ...@@ -608,10 +608,8 @@ static struct object *create_socket( int family, int type, int protocol, unsigne
release_object( sock ); release_object( sock );
return NULL; return NULL;
} }
if (sock->flags & WSA_FLAG_OVERLAPPED) list_init( &sock->read_q );
{ list_init( &sock->write_q );
sock->read_q = sock->write_q = NULL;
}
sock_reselect( sock ); sock_reselect( sock );
clear_error(); clear_error();
return &sock->obj; return &sock->obj;
...@@ -682,10 +680,8 @@ static struct sock *accept_socket( obj_handle_t handle ) ...@@ -682,10 +680,8 @@ static struct sock *accept_socket( obj_handle_t handle )
release_object( sock ); release_object( sock );
return NULL; return NULL;
} }
if ( acceptsock->flags & WSA_FLAG_OVERLAPPED ) list_init( &acceptsock->read_q );
{ list_init( &acceptsock->write_q );
acceptsock->read_q = acceptsock->write_q = NULL;
}
} }
clear_error(); clear_error();
sock->pmask &= ~FD_ACCEPT; sock->pmask &= ~FD_ACCEPT;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment