Commit d6a4e34a authored by Robert Shearman's avatar Robert Shearman Committed by Alexandre Julliard

- Add a default asynchronous I/O implementation.

- Make file objects use it.
parent 09d5cc23
...@@ -146,6 +146,8 @@ struct fd ...@@ -146,6 +146,8 @@ struct fd
int unix_fd; /* unix file descriptor */ int unix_fd; /* unix file descriptor */
int fs_locks; /* can we use filesystem locks for this fd? */ int fs_locks; /* can we use filesystem locks for this fd? */
int poll_index; /* index of fd in poll array */ int poll_index; /* index of fd in poll array */
struct list read_q; /* async readers of this fd */
struct list write_q; /* async writers of this fd */
}; };
static void fd_dump( struct object *obj, int verbose ); static void fd_dump( struct object *obj, int verbose );
...@@ -1073,6 +1075,9 @@ static void fd_destroy( struct object *obj ) ...@@ -1073,6 +1075,9 @@ static void fd_destroy( struct object *obj )
{ {
struct fd *fd = (struct fd *)obj; struct fd *fd = (struct fd *)obj;
async_terminate_queue( &fd->read_q, STATUS_CANCELLED );
async_terminate_queue( &fd->write_q, STATUS_CANCELLED );
remove_fd_locks( fd ); remove_fd_locks( fd );
list_remove( &fd->inode_entry ); list_remove( &fd->inode_entry );
if (fd->poll_index != -1) remove_poll_user( fd, fd->poll_index ); if (fd->poll_index != -1) remove_poll_user( fd, fd->poll_index );
...@@ -1126,6 +1131,8 @@ struct fd *alloc_fd( const struct fd_ops *fd_user_ops, struct object *user ) ...@@ -1126,6 +1131,8 @@ struct fd *alloc_fd( const struct fd_ops *fd_user_ops, struct object *user )
fd->poll_index = -1; fd->poll_index = -1;
list_init( &fd->inode_entry ); list_init( &fd->inode_entry );
list_init( &fd->locks ); list_init( &fd->locks );
list_init( &fd->read_q );
list_init( &fd->write_q );
if ((fd->poll_index = add_poll_user( fd )) == -1) if ((fd->poll_index = add_poll_user( fd )) == -1)
{ {
...@@ -1370,14 +1377,77 @@ int default_fd_signaled( struct object *obj, struct thread *thread ) ...@@ -1370,14 +1377,77 @@ int default_fd_signaled( struct object *obj, struct thread *thread )
return ret; return ret;
} }
int default_fd_get_poll_events( struct fd *fd )
{
int events = 0;
if( !list_empty( &fd->read_q ))
events |= POLLIN;
if( !list_empty( &fd->write_q ))
events |= POLLOUT;
return events;
}
/* default handler for poll() events */ /* default handler for poll() events */
void default_poll_event( struct fd *fd, int event ) void default_poll_event( struct fd *fd, int event )
{ {
/* an error occurred, stop polling this fd to avoid busy-looping */ if (!list_empty( &fd->read_q ) && (POLLIN & event) )
{
async_terminate_head( &fd->read_q, STATUS_ALERTED );
return;
}
if (!list_empty( &fd->write_q ) && (POLLOUT & event) )
{
async_terminate_head( &fd->write_q, STATUS_ALERTED );
return;
}
/* if an error occurred, stop polling this fd to avoid busy-looping */
if (event & (POLLERR | POLLHUP)) set_fd_events( fd, -1 ); if (event & (POLLERR | POLLHUP)) set_fd_events( fd, -1 );
wake_up( fd->user, 0 ); wake_up( fd->user, 0 );
} }
void default_fd_queue_async( struct fd *fd, void *apc, void *user, void *io_sb, int type, int count )
{
struct list *queue;
int events;
if (!(fd->fd_ops->get_file_info( fd ) & FD_FLAG_OVERLAPPED))
{
set_error( STATUS_INVALID_HANDLE );
return;
}
switch (type)
{
case ASYNC_TYPE_READ:
queue = &fd->read_q;
break;
case ASYNC_TYPE_WRITE:
queue = &fd->write_q;
break;
default:
set_error( STATUS_INVALID_PARAMETER );
return;
}
if (!create_async( current, NULL, queue, apc, user, io_sb ))
return;
/* Check if the new pending request can be served immediately */
events = check_fd_events( fd, fd->fd_ops->get_poll_events( fd ) );
if (events) fd->fd_ops->poll_event( fd, events );
set_fd_events( fd, fd->fd_ops->get_poll_events( fd ) );
}
void default_fd_cancel_async( struct fd *fd )
{
async_terminate_queue( &fd->read_q, STATUS_CANCELLED );
async_terminate_queue( &fd->write_q, STATUS_CANCELLED );
}
/* default flush() routine */ /* default flush() routine */
int no_flush( struct fd *fd, struct event **event ) int no_flush( struct fd *fd, struct event **event )
{ {
......
...@@ -60,8 +60,6 @@ struct file ...@@ -60,8 +60,6 @@ struct file
struct fd *fd; /* file descriptor for this file */ struct fd *fd; /* file descriptor for this file */
unsigned int access; /* file access (GENERIC_READ/WRITE) */ unsigned int access; /* file access (GENERIC_READ/WRITE) */
unsigned int options; /* file options (FILE_DELETE_ON_CLOSE, FILE_SYNCHRONOUS...) */ unsigned int options; /* file options (FILE_DELETE_ON_CLOSE, FILE_SYNCHRONOUS...) */
struct list read_q;
struct list write_q;
}; };
static void file_dump( struct object *obj, int verbose ); static void file_dump( struct object *obj, int verbose );
...@@ -69,11 +67,8 @@ static struct fd *file_get_fd( struct object *obj ); ...@@ -69,11 +67,8 @@ static struct fd *file_get_fd( struct object *obj );
static void file_destroy( struct object *obj ); static void file_destroy( struct object *obj );
static int file_get_poll_events( struct fd *fd ); static int file_get_poll_events( struct fd *fd );
static void file_poll_event( struct fd *fd, int event );
static int file_flush( struct fd *fd, struct event **event ); static int file_flush( struct fd *fd, struct event **event );
static int file_get_info( struct fd *fd ); static int file_get_info( struct fd *fd );
static void file_queue_async( struct fd *fd, void *apc, void *user, void* iosb, int type, int count );
static void file_cancel_async( struct fd *fd );
static const struct object_ops file_ops = static const struct object_ops file_ops =
{ {
...@@ -91,11 +86,11 @@ static const struct object_ops file_ops = ...@@ -91,11 +86,11 @@ static const struct object_ops file_ops =
static const struct fd_ops file_fd_ops = static const struct fd_ops file_fd_ops =
{ {
file_get_poll_events, /* get_poll_events */ file_get_poll_events, /* get_poll_events */
file_poll_event, /* poll_event */ default_poll_event, /* poll_event */
file_flush, /* flush */ file_flush, /* flush */
file_get_info, /* get_file_info */ file_get_info, /* get_file_info */
file_queue_async, /* queue_async */ default_fd_queue_async, /* queue_async */
file_cancel_async /* cancel_async */ default_fd_cancel_async /* cancel_async */
}; };
static inline int is_overlapped( const struct file *file ) static inline int is_overlapped( const struct file *file )
...@@ -164,8 +159,6 @@ static struct object *create_file( const char *nameptr, size_t len, unsigned int ...@@ -164,8 +159,6 @@ static struct object *create_file( const char *nameptr, size_t len, unsigned int
file->access = access; file->access = access;
file->options = options; file->options = options;
list_init( &file->read_q );
list_init( &file->write_q );
/* FIXME: should set error to STATUS_OBJECT_NAME_COLLISION if file existed before */ /* FIXME: should set error to STATUS_OBJECT_NAME_COLLISION if file existed before */
if (!(file->fd = alloc_fd( &file_fd_ops, &file->obj )) || if (!(file->fd = alloc_fd( &file_fd_ops, &file->obj )) ||
...@@ -233,27 +226,6 @@ static int file_get_poll_events( struct fd *fd ) ...@@ -233,27 +226,6 @@ static int file_get_poll_events( struct fd *fd )
return events; return events;
} }
static void file_poll_event( struct fd *fd, int event )
{
struct file *file = get_fd_user( fd );
assert( file->obj.ops == &file_ops );
if (is_overlapped( file ))
{
if (!list_empty( &file->read_q ) && (POLLIN & event) )
{
async_terminate_head( &file->read_q, STATUS_ALERTED );
return;
}
if (!list_empty( &file->write_q ) && (POLLOUT & event) )
{
async_terminate_head( &file->write_q, STATUS_ALERTED );
return;
}
}
default_poll_event( fd, event );
}
static int file_flush( struct fd *fd, struct event **event ) static int file_flush( struct fd *fd, struct event **event )
{ {
int ret = (fsync( get_unix_fd(fd) ) != -1); int ret = (fsync( get_unix_fd(fd) ) != -1);
...@@ -269,53 +241,6 @@ static int file_get_info( struct fd *fd ) ...@@ -269,53 +241,6 @@ static int file_get_info( struct fd *fd )
else return 0; else return 0;
} }
static void file_queue_async( struct fd *fd, void *apc, void *user, void *iosb,
int type, int count )
{
struct file *file = get_fd_user( fd );
struct list *queue;
int events;
assert( file->obj.ops == &file_ops );
if (!is_overlapped( file ))
{
set_error( STATUS_INVALID_HANDLE );
return;
}
switch (type)
{
case ASYNC_TYPE_READ:
queue = &file->read_q;
break;
case ASYNC_TYPE_WRITE:
queue = &file->write_q;
break;
default:
set_error( STATUS_INVALID_PARAMETER );
return;
}
if (!create_async( current, NULL, queue, apc, user, iosb ))
return;
/* Check if the new pending request can be served immediately */
events = check_fd_events( fd, file_get_poll_events( fd ) );
if (events) file_poll_event( fd, events );
set_fd_events( fd, file_get_poll_events( fd ));
}
static void file_cancel_async( struct fd *fd )
{
struct file *file = get_fd_user( fd );
assert( file->obj.ops == &file_ops );
async_terminate_queue( &file->read_q, STATUS_CANCELLED );
async_terminate_queue( &file->write_q, STATUS_CANCELLED );
}
static struct fd *file_get_fd( struct object *obj ) static struct fd *file_get_fd( struct object *obj )
{ {
struct file *file = (struct file *)obj; struct file *file = (struct file *)obj;
...@@ -328,11 +253,6 @@ static void file_destroy( struct object *obj ) ...@@ -328,11 +253,6 @@ static void file_destroy( struct object *obj )
struct file *file = (struct file *)obj; struct file *file = (struct file *)obj;
assert( obj->ops == &file_ops ); assert( obj->ops == &file_ops );
if (is_overlapped( file ))
{
async_terminate_queue( &file->read_q, STATUS_CANCELLED );
async_terminate_queue( &file->write_q, STATUS_CANCELLED );
}
if (file->fd) release_object( file->fd ); if (file->fd) release_object( file->fd );
} }
......
...@@ -64,7 +64,10 @@ extern int flush_cached_fd( struct process *process, obj_handle_t handle ); ...@@ -64,7 +64,10 @@ extern int flush_cached_fd( struct process *process, obj_handle_t handle );
extern int default_fd_add_queue( struct object *obj, struct wait_queue_entry *entry ); extern int default_fd_add_queue( struct object *obj, struct wait_queue_entry *entry );
extern void default_fd_remove_queue( struct object *obj, struct wait_queue_entry *entry ); extern void default_fd_remove_queue( struct object *obj, struct wait_queue_entry *entry );
extern int default_fd_signaled( struct object *obj, struct thread *thread ); extern int default_fd_signaled( struct object *obj, struct thread *thread );
extern int default_fd_get_poll_events( struct fd *fd );
extern void default_poll_event( struct fd *fd, int event ); extern void default_poll_event( struct fd *fd, int event );
extern void default_fd_queue_async( struct fd *fd, void *apc, void *user, void *io_sb, int type, int count );
extern void default_fd_cancel_async( struct fd *fd );
extern int no_flush( struct fd *fd, struct event **event ); extern int no_flush( struct fd *fd, struct event **event );
extern int no_get_file_info( struct fd *fd ); extern int no_get_file_info( struct fd *fd );
extern void no_queue_async( struct fd *fd, void* apc, void* user, void* io_sb, int type, int count); extern void no_queue_async( struct fd *fd, void* apc, void* user, void* io_sb, int type, int count);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment