Commit 72bff2e4 authored by Alexandre Julliard's avatar Alexandre Julliard

server: Add support for restarting an async I/O when the client side couldn't finish it right away.

parent ce45b8b1
...@@ -2169,7 +2169,7 @@ struct read_changes_info ...@@ -2169,7 +2169,7 @@ struct read_changes_info
ULONG BufferSize; ULONG BufferSize;
}; };
static void WINAPI read_changes_apc( void *user, PIO_STATUS_BLOCK iosb, ULONG status ) static NTSTATUS read_changes_apc( void *user, PIO_STATUS_BLOCK iosb, NTSTATUS status )
{ {
struct read_changes_info *info = user; struct read_changes_info *info = user;
char path[PATH_MAX]; char path[PATH_MAX];
...@@ -2222,6 +2222,7 @@ static void WINAPI read_changes_apc( void *user, PIO_STATUS_BLOCK iosb, ULONG st ...@@ -2222,6 +2222,7 @@ static void WINAPI read_changes_apc( void *user, PIO_STATUS_BLOCK iosb, ULONG st
iosb->Information = len; iosb->Information = len;
RtlFreeHeap( GetProcessHeap(), 0, info ); RtlFreeHeap( GetProcessHeap(), 0, info );
return ret;
} }
#define FILE_NOTIFY_ALL ( \ #define FILE_NOTIFY_ALL ( \
......
...@@ -236,8 +236,8 @@ NTSTATUS WINAPI NtCreateFile( PHANDLE handle, ACCESS_MASK access, POBJECT_ATTRIB ...@@ -236,8 +236,8 @@ NTSTATUS WINAPI NtCreateFile( PHANDLE handle, ACCESS_MASK access, POBJECT_ATTRIB
/*********************************************************************** /***********************************************************************
* Asynchronous file I/O * * Asynchronous file I/O *
*/ */
static void WINAPI FILE_AsyncReadService(void*, PIO_STATUS_BLOCK, ULONG); static NTSTATUS FILE_AsyncReadService(void*, PIO_STATUS_BLOCK, NTSTATUS);
static void WINAPI FILE_AsyncWriteService(void*, PIO_STATUS_BLOCK, ULONG); static NTSTATUS FILE_AsyncWriteService(void*, PIO_STATUS_BLOCK, NTSTATUS);
typedef struct async_fileio typedef struct async_fileio
{ {
...@@ -264,13 +264,12 @@ static void fileio_terminate(async_fileio *fileio, IO_STATUS_BLOCK* iosb, NTSTAT ...@@ -264,13 +264,12 @@ static void fileio_terminate(async_fileio *fileio, IO_STATUS_BLOCK* iosb, NTSTAT
static ULONG fileio_queue_async(async_fileio* fileio, IO_STATUS_BLOCK* iosb, static ULONG fileio_queue_async(async_fileio* fileio, IO_STATUS_BLOCK* iosb,
BOOL do_read) BOOL do_read)
{ {
PIO_APC_ROUTINE apc = do_read ? FILE_AsyncReadService : FILE_AsyncWriteService; NTSTATUS status;
NTSTATUS status;
SERVER_START_REQ( register_async ) SERVER_START_REQ( register_async )
{ {
req->handle = fileio->handle; req->handle = fileio->handle;
req->async.callback = apc; req->async.callback = do_read ? FILE_AsyncReadService : FILE_AsyncWriteService;
req->async.iosb = iosb; req->async.iosb = iosb;
req->async.arg = fileio; req->async.arg = fileio;
req->async.apc = fileio->apc; req->async.apc = fileio->apc;
...@@ -338,7 +337,7 @@ NTSTATUS FILE_GetNtStatus(void) ...@@ -338,7 +337,7 @@ NTSTATUS FILE_GetNtStatus(void)
/*********************************************************************** /***********************************************************************
* FILE_AsyncReadService (INTERNAL) * FILE_AsyncReadService (INTERNAL)
*/ */
static void WINAPI FILE_AsyncReadService(void *user, PIO_STATUS_BLOCK iosb, ULONG status) static NTSTATUS FILE_AsyncReadService(void *user, PIO_STATUS_BLOCK iosb, NTSTATUS status)
{ {
async_fileio *fileio = (async_fileio*)user; async_fileio *fileio = (async_fileio*)user;
int fd, needs_close, result; int fd, needs_close, result;
...@@ -351,10 +350,8 @@ static void WINAPI FILE_AsyncReadService(void *user, PIO_STATUS_BLOCK iosb, ULON ...@@ -351,10 +350,8 @@ static void WINAPI FILE_AsyncReadService(void *user, PIO_STATUS_BLOCK iosb, ULON
/* check to see if the data is ready (non-blocking) */ /* check to see if the data is ready (non-blocking) */
if ((status = server_get_unix_fd( fileio->handle, FILE_READ_DATA, &fd, if ((status = server_get_unix_fd( fileio->handle, FILE_READ_DATA, &fd,
&needs_close, NULL, NULL ))) &needs_close, NULL, NULL )))
{
fileio_terminate(fileio, iosb, status);
break; break;
}
result = read(fd, &fileio->buffer[fileio->already], fileio->count - fileio->already); result = read(fd, &fileio->buffer[fileio->already], fileio->count - fileio->already);
if (needs_close) close( fd ); if (needs_close) close( fd );
...@@ -390,16 +387,15 @@ static void WINAPI FILE_AsyncReadService(void *user, PIO_STATUS_BLOCK iosb, ULON ...@@ -390,16 +387,15 @@ static void WINAPI FILE_AsyncReadService(void *user, PIO_STATUS_BLOCK iosb, ULON
result, fileio->already, fileio->count, result, fileio->already, fileio->count,
(status == STATUS_SUCCESS) ? "success" : "pending"); (status == STATUS_SUCCESS) ? "success" : "pending");
} }
/* queue another async operation ? */
if (status == STATUS_PENDING)
fileio_queue_async(fileio, iosb, TRUE);
else
fileio_terminate(fileio, iosb, status);
break; break;
default:
fileio_terminate(fileio, iosb, status); case STATUS_TIMEOUT:
case STATUS_IO_TIMEOUT:
if (fileio->already) status = STATUS_SUCCESS;
break; break;
} }
if (status != STATUS_PENDING) fileio_terminate(fileio, iosb, status);
return status;
} }
struct io_timeouts struct io_timeouts
...@@ -664,9 +660,9 @@ done: ...@@ -664,9 +660,9 @@ done:
/*********************************************************************** /***********************************************************************
* FILE_AsyncWriteService (INTERNAL) * FILE_AsyncWriteService (INTERNAL)
*/ */
static void WINAPI FILE_AsyncWriteService(void *ovp, IO_STATUS_BLOCK *iosb, ULONG status) static NTSTATUS FILE_AsyncWriteService(void *user, IO_STATUS_BLOCK *iosb, NTSTATUS status)
{ {
async_fileio *fileio = (async_fileio *) ovp; async_fileio *fileio = user;
int result, fd, needs_close; int result, fd, needs_close;
enum server_fd_type type; enum server_fd_type type;
...@@ -678,10 +674,8 @@ static void WINAPI FILE_AsyncWriteService(void *ovp, IO_STATUS_BLOCK *iosb, ULON ...@@ -678,10 +674,8 @@ static void WINAPI FILE_AsyncWriteService(void *ovp, IO_STATUS_BLOCK *iosb, ULON
/* write some data (non-blocking) */ /* write some data (non-blocking) */
if ((status = server_get_unix_fd( fileio->handle, FILE_WRITE_DATA, &fd, if ((status = server_get_unix_fd( fileio->handle, FILE_WRITE_DATA, &fd,
&needs_close, &type, NULL ))) &needs_close, &type, NULL )))
{
fileio_terminate(fileio, iosb, status);
break; break;
}
if (!fileio->count && (type == FD_TYPE_MAILSLOT || type == FD_TYPE_PIPE || type == FD_TYPE_SOCKET)) if (!fileio->count && (type == FD_TYPE_MAILSLOT || type == FD_TYPE_PIPE || type == FD_TYPE_SOCKET))
result = send( fd, fileio->buffer, 0, 0 ); result = send( fd, fileio->buffer, 0, 0 );
else else
...@@ -700,15 +694,15 @@ static void WINAPI FILE_AsyncWriteService(void *ovp, IO_STATUS_BLOCK *iosb, ULON ...@@ -700,15 +694,15 @@ static void WINAPI FILE_AsyncWriteService(void *ovp, IO_STATUS_BLOCK *iosb, ULON
status = (fileio->already < fileio->count) ? STATUS_PENDING : STATUS_SUCCESS; status = (fileio->already < fileio->count) ? STATUS_PENDING : STATUS_SUCCESS;
TRACE("wrote %d more bytes %u/%u so far\n", result, fileio->already, fileio->count); TRACE("wrote %d more bytes %u/%u so far\n", result, fileio->already, fileio->count);
} }
if (status == STATUS_PENDING)
fileio_queue_async(fileio, iosb, FALSE);
else
fileio_terminate(fileio, iosb, status);
break; break;
default:
fileio_terminate(fileio, iosb, status); case STATUS_TIMEOUT:
case STATUS_IO_TIMEOUT:
if (fileio->already) status = STATUS_SUCCESS;
break; break;
} }
if (status != STATUS_PENDING) fileio_terminate(fileio, iosb, status);
return status;
} }
/****************************************************************************** /******************************************************************************
...@@ -937,10 +931,11 @@ NTSTATUS WINAPI NtDeviceIoControlFile(HANDLE handle, HANDLE event, ...@@ -937,10 +931,11 @@ NTSTATUS WINAPI NtDeviceIoControlFile(HANDLE handle, HANDLE event,
/*********************************************************************** /***********************************************************************
* pipe_completion_wait (Internal) * pipe_completion_wait (Internal)
*/ */
static void CALLBACK pipe_completion_wait(void *arg, PIO_STATUS_BLOCK iosb, ULONG status) static NTSTATUS pipe_completion_wait(void *arg, PIO_STATUS_BLOCK iosb, NTSTATUS status)
{ {
TRACE("for %p, status=%08x\n", iosb, status); TRACE("for %p, status=%08x\n", iosb, status);
iosb->u.Status = status; iosb->u.Status = status;
return status;
} }
/************************************************************************** /**************************************************************************
......
...@@ -680,10 +680,12 @@ static BOOL invoke_apc( const apc_call_t *call, apc_result_t *result ) ...@@ -680,10 +680,12 @@ static BOOL invoke_apc( const apc_call_t *call, apc_result_t *result )
break; break;
} }
case APC_ASYNC_IO: case APC_ASYNC_IO:
NtCurrentTeb()->num_async_io--;
call->async_io.func( call->async_io.user, call->async_io.sb, call->async_io.status );
result->type = call->type; result->type = call->type;
result->async_io.status = ((IO_STATUS_BLOCK *)call->async_io.sb)->u.Status; result->async_io.status = call->async_io.func( call->async_io.user,
call->async_io.sb,
call->async_io.status );
if (result->async_io.status != STATUS_PENDING)
NtCurrentTeb()->num_async_io--;
break; break;
case APC_VIRTUAL_ALLOC: case APC_VIRTUAL_ALLOC:
result->type = call->type; result->type = call->type;
......
...@@ -1072,9 +1072,9 @@ static void ws2_async_terminate(ws2_async* as, IO_STATUS_BLOCK* iosb, NTSTATUS s ...@@ -1072,9 +1072,9 @@ static void ws2_async_terminate(ws2_async* as, IO_STATUS_BLOCK* iosb, NTSTATUS s
* WS2_make_async (INTERNAL) * WS2_make_async (INTERNAL)
*/ */
static void WINAPI WS2_async_recv(void*, IO_STATUS_BLOCK*, ULONG); static NTSTATUS WS2_async_recv(void*, IO_STATUS_BLOCK*, NTSTATUS);
static void WINAPI WS2_async_send(void*, IO_STATUS_BLOCK*, ULONG); static NTSTATUS WS2_async_send(void*, IO_STATUS_BLOCK*, NTSTATUS);
static void WINAPI WS2_async_shutdown( void*, IO_STATUS_BLOCK*, ULONG); static NTSTATUS WS2_async_shutdown( void*, IO_STATUS_BLOCK*, NTSTATUS);
static inline struct ws2_async* static inline struct ws2_async*
WS2_make_async(SOCKET s, enum ws2_mode mode, struct iovec *iovec, DWORD dwBufferCount, WS2_make_async(SOCKET s, enum ws2_mode mode, struct iovec *iovec, DWORD dwBufferCount,
...@@ -1137,7 +1137,7 @@ error: ...@@ -1137,7 +1137,7 @@ error:
static ULONG ws2_queue_async(struct ws2_async* wsa, IO_STATUS_BLOCK* iosb) static ULONG ws2_queue_async(struct ws2_async* wsa, IO_STATUS_BLOCK* iosb)
{ {
PIO_APC_ROUTINE apc; NTSTATUS (*apc)(void *, IO_STATUS_BLOCK *, NTSTATUS);
int type; int type;
NTSTATUS status; NTSTATUS status;
...@@ -1239,10 +1239,10 @@ out: ...@@ -1239,10 +1239,10 @@ out:
* *
* Handler for overlapped recv() operations. * Handler for overlapped recv() operations.
*/ */
static void WINAPI WS2_async_recv( void* ovp, IO_STATUS_BLOCK* iosb, ULONG status) static NTSTATUS WS2_async_recv( void* user, IO_STATUS_BLOCK* iosb, NTSTATUS status)
{ {
ws2_async* wsa = (ws2_async*) ovp; ws2_async* wsa = user;
int result, fd, err; int result = 0, fd, err;
TRACE( "(%p %p %x)\n", wsa, iosb, status ); TRACE( "(%p %p %x)\n", wsa, iosb, status );
...@@ -1250,10 +1250,8 @@ static void WINAPI WS2_async_recv( void* ovp, IO_STATUS_BLOCK* iosb, ULONG statu ...@@ -1250,10 +1250,8 @@ static void WINAPI WS2_async_recv( void* ovp, IO_STATUS_BLOCK* iosb, ULONG statu
{ {
case STATUS_ALERTED: case STATUS_ALERTED:
if ((status = wine_server_handle_to_fd( wsa->hSocket, FILE_READ_DATA, &fd, NULL ) )) if ((status = wine_server_handle_to_fd( wsa->hSocket, FILE_READ_DATA, &fd, NULL ) ))
{
ws2_async_terminate(wsa, iosb, status, 0);
break; break;
}
result = WS2_recv( fd, wsa->iovec, wsa->n_iovecs, result = WS2_recv( fd, wsa->iovec, wsa->n_iovecs,
wsa->addr, wsa->addrlen.ptr, &wsa->flags ); wsa->addr, wsa->addrlen.ptr, &wsa->flags );
wine_server_release_fd( wsa->hSocket, fd ); wine_server_release_fd( wsa->hSocket, fd );
...@@ -1279,15 +1277,10 @@ static void WINAPI WS2_async_recv( void* ovp, IO_STATUS_BLOCK* iosb, ULONG statu ...@@ -1279,15 +1277,10 @@ static void WINAPI WS2_async_recv( void* ovp, IO_STATUS_BLOCK* iosb, ULONG statu
TRACE( "Error: %x\n", err ); TRACE( "Error: %x\n", err );
} }
} }
if (status == STATUS_PENDING)
ws2_queue_async(wsa, iosb);
else
ws2_async_terminate(wsa, iosb, status, result);
break;
default:
ws2_async_terminate(wsa, iosb, status, 0);
break; break;
} }
if (status != STATUS_PENDING) ws2_async_terminate(wsa, iosb, status, result);
return status;
} }
/*********************************************************************** /***********************************************************************
...@@ -1364,10 +1357,10 @@ out: ...@@ -1364,10 +1357,10 @@ out:
* *
* Handler for overlapped send() operations. * Handler for overlapped send() operations.
*/ */
static void WINAPI WS2_async_send(void* as, IO_STATUS_BLOCK* iosb, ULONG status) static NTSTATUS WS2_async_send(void* user, IO_STATUS_BLOCK* iosb, NTSTATUS status)
{ {
ws2_async* wsa = (ws2_async*) as; ws2_async* wsa = user;
int result, fd; int result = 0, fd;
TRACE( "(%p %p %x)\n", wsa, iosb, status ); TRACE( "(%p %p %x)\n", wsa, iosb, status );
...@@ -1375,10 +1368,8 @@ static void WINAPI WS2_async_send(void* as, IO_STATUS_BLOCK* iosb, ULONG status) ...@@ -1375,10 +1368,8 @@ static void WINAPI WS2_async_send(void* as, IO_STATUS_BLOCK* iosb, ULONG status)
{ {
case STATUS_ALERTED: case STATUS_ALERTED:
if ((status = wine_server_handle_to_fd( wsa->hSocket, FILE_WRITE_DATA, &fd, NULL ) )) if ((status = wine_server_handle_to_fd( wsa->hSocket, FILE_WRITE_DATA, &fd, NULL ) ))
{
ws2_async_terminate(wsa, iosb, status, 0);
break; break;
}
/* check to see if the data is ready (non-blocking) */ /* check to see if the data is ready (non-blocking) */
result = WS2_send( fd, wsa->iovec, wsa->n_iovecs, wsa->addr, wsa->addrlen.val, wsa->flags ); result = WS2_send( fd, wsa->iovec, wsa->n_iovecs, wsa->addr, wsa->addrlen.val, wsa->flags );
wine_server_release_fd( wsa->hSocket, fd ); wine_server_release_fd( wsa->hSocket, fd );
...@@ -1407,16 +1398,10 @@ static void WINAPI WS2_async_send(void* as, IO_STATUS_BLOCK* iosb, ULONG status) ...@@ -1407,16 +1398,10 @@ static void WINAPI WS2_async_send(void* as, IO_STATUS_BLOCK* iosb, ULONG status)
TRACE( "Error: %x\n", err ); TRACE( "Error: %x\n", err );
} }
} }
if (status == STATUS_PENDING)
ws2_queue_async(wsa, iosb);
else
ws2_async_terminate(wsa, iosb, status, result);
break;
default:
ws2_async_terminate(wsa, iosb, status, 0);
break; break;
} }
if (status != STATUS_PENDING) ws2_async_terminate(wsa, iosb, status, result);
return status;
} }
/*********************************************************************** /***********************************************************************
...@@ -1424,9 +1409,9 @@ static void WINAPI WS2_async_send(void* as, IO_STATUS_BLOCK* iosb, ULONG status) ...@@ -1424,9 +1409,9 @@ static void WINAPI WS2_async_send(void* as, IO_STATUS_BLOCK* iosb, ULONG status)
* *
* Handler for shutdown() operations on overlapped sockets. * Handler for shutdown() operations on overlapped sockets.
*/ */
static void WINAPI WS2_async_shutdown( void* as, PIO_STATUS_BLOCK iosb, ULONG status ) static NTSTATUS WS2_async_shutdown( void* user, PIO_STATUS_BLOCK iosb, NTSTATUS status )
{ {
ws2_async* wsa = (ws2_async*) as; ws2_async* wsa = user;
int fd, err = 1; int fd, err = 1;
TRACE( "async %p %d\n", wsa, wsa->mode ); TRACE( "async %p %d\n", wsa, wsa->mode );
...@@ -1434,10 +1419,8 @@ static void WINAPI WS2_async_shutdown( void* as, PIO_STATUS_BLOCK iosb, ULONG st ...@@ -1434,10 +1419,8 @@ static void WINAPI WS2_async_shutdown( void* as, PIO_STATUS_BLOCK iosb, ULONG st
{ {
case STATUS_ALERTED: case STATUS_ALERTED:
if ((status = wine_server_handle_to_fd( wsa->hSocket, 0, &fd, NULL ) )) if ((status = wine_server_handle_to_fd( wsa->hSocket, 0, &fd, NULL ) ))
{
ws2_async_terminate(wsa, iosb, status, 0);
break; break;
}
switch ( wsa->mode ) switch ( wsa->mode )
{ {
case ws2m_sd_read: err = shutdown( fd, 0 ); break; case ws2m_sd_read: err = shutdown( fd, 0 ); break;
...@@ -1446,13 +1429,10 @@ static void WINAPI WS2_async_shutdown( void* as, PIO_STATUS_BLOCK iosb, ULONG st ...@@ -1446,13 +1429,10 @@ static void WINAPI WS2_async_shutdown( void* as, PIO_STATUS_BLOCK iosb, ULONG st
} }
wine_server_release_fd( wsa->hSocket, fd ); wine_server_release_fd( wsa->hSocket, fd );
status = err ? wsaErrno() : STATUS_SUCCESS; status = err ? wsaErrno() : STATUS_SUCCESS;
ws2_async_terminate(wsa, iosb, status, 0);
break;
default:
ws2_async_terminate(wsa, iosb, status, 0);
break; break;
} }
ws2_async_terminate(wsa, iosb, status, 0);
return status;
} }
/*********************************************************************** /***********************************************************************
......
...@@ -263,7 +263,7 @@ typedef union ...@@ -263,7 +263,7 @@ typedef union
struct struct
{ {
enum apc_type type; enum apc_type type;
void (__stdcall *func)(void*, void*, unsigned int); unsigned int (*func)(void*, void*, unsigned int);
void *user; void *user;
void *sb; void *sb;
unsigned int status; unsigned int status;
...@@ -4677,6 +4677,6 @@ union generic_reply ...@@ -4677,6 +4677,6 @@ union generic_reply
struct allocate_locally_unique_id_reply allocate_locally_unique_id_reply; struct allocate_locally_unique_id_reply allocate_locally_unique_id_reply;
}; };
#define SERVER_PROTOCOL_VERSION 290 #define SERVER_PROTOCOL_VERSION 291
#endif /* __WINE_WINE_SERVER_PROTOCOL_H */ #endif /* __WINE_WINE_SERVER_PROTOCOL_H */
...@@ -38,6 +38,7 @@ struct async ...@@ -38,6 +38,7 @@ struct async
struct thread *thread; /* owning thread */ struct thread *thread; /* owning thread */
struct list queue_entry; /* entry in async queue list */ struct list queue_entry; /* entry in async queue list */
struct async_queue *queue; /* queue containing this async */ struct async_queue *queue; /* queue containing this async */
unsigned int status; /* current status */
struct timeout_user *timeout; struct timeout_user *timeout;
unsigned int timeout_status; /* status to report upon timeout */ unsigned int timeout_status; /* status to report upon timeout */
struct event *event; struct event *event;
...@@ -92,6 +93,11 @@ static const struct object_ops async_queue_ops = ...@@ -92,6 +93,11 @@ static const struct object_ops async_queue_ops =
}; };
static inline void async_reselect( struct async *async )
{
if (async->queue->fd) fd_reselect_async( async->queue->fd, async->queue );
}
static void async_dump( struct object *obj, int verbose ) static void async_dump( struct object *obj, int verbose )
{ {
struct async *async = (struct async *)obj; struct async *async = (struct async *)obj;
...@@ -104,10 +110,12 @@ static void async_destroy( struct object *obj ) ...@@ -104,10 +110,12 @@ static void async_destroy( struct object *obj )
struct async *async = (struct async *)obj; struct async *async = (struct async *)obj;
assert( obj->ops == &async_ops ); assert( obj->ops == &async_ops );
list_remove( &async->queue_entry );
async_reselect( async );
if (async->timeout) remove_timeout_user( async->timeout ); if (async->timeout) remove_timeout_user( async->timeout );
if (async->event) release_object( async->event ); if (async->event) release_object( async->event );
release_object( async->queue ); release_object( async->queue );
async->queue = NULL;
release_object( async->thread ); release_object( async->thread );
} }
...@@ -119,11 +127,19 @@ static void async_queue_dump( struct object *obj, int verbose ) ...@@ -119,11 +127,19 @@ static void async_queue_dump( struct object *obj, int verbose )
} }
/* notifies client thread of new status of its async request */ /* notifies client thread of new status of its async request */
/* destroys the server side of it */
static void async_terminate( struct async *async, unsigned int status ) static void async_terminate( struct async *async, unsigned int status )
{ {
apc_call_t data; apc_call_t data;
assert( status != STATUS_PENDING );
if (async->status != STATUS_PENDING)
{
/* already terminated, just update status */
async->status = status;
return;
}
memset( &data, 0, sizeof(data) ); memset( &data, 0, sizeof(data) );
data.type = APC_ASYNC_IO; data.type = APC_ASYNC_IO;
data.async_io.func = async->data.callback; data.async_io.func = async->data.callback;
...@@ -131,11 +147,9 @@ static void async_terminate( struct async *async, unsigned int status ) ...@@ -131,11 +147,9 @@ static void async_terminate( struct async *async, unsigned int status )
data.async_io.sb = async->data.iosb; data.async_io.sb = async->data.iosb;
data.async_io.status = status; data.async_io.status = status;
thread_queue_apc( async->thread, &async->obj, &data ); thread_queue_apc( async->thread, &async->obj, &data );
async->status = status;
if (async->timeout) remove_timeout_user( async->timeout ); async_reselect( async );
async->timeout = NULL; release_object( async ); /* so that it gets destroyed when the async is done */
list_remove( &async->queue_entry );
release_object( async );
} }
/* callback for timeout on an async request */ /* callback for timeout on an async request */
...@@ -184,11 +198,12 @@ struct async *create_async( struct thread *thread, struct async_queue *queue, co ...@@ -184,11 +198,12 @@ struct async *create_async( struct thread *thread, struct async_queue *queue, co
return NULL; return NULL;
} }
async->thread = (struct thread *)grab_object( thread ); async->thread = (struct thread *)grab_object( thread );
async->event = event; async->event = event;
async->data = *data; async->status = STATUS_PENDING;
async->data = *data;
async->timeout = NULL; async->timeout = NULL;
async->queue = (struct async_queue *)grab_object( queue ); async->queue = (struct async_queue *)grab_object( queue );
list_add_tail( &queue->queue, &async->queue_entry ); list_add_tail( &queue->queue, &async->queue_entry );
grab_object( async ); grab_object( async );
...@@ -214,12 +229,24 @@ void async_set_result( struct object *obj, unsigned int status ) ...@@ -214,12 +229,24 @@ void async_set_result( struct object *obj, unsigned int status )
if (obj->ops != &async_ops) return; /* in case the client messed up the APC results */ if (obj->ops != &async_ops) return; /* in case the client messed up the APC results */
if (status == STATUS_PENDING) assert( async->status != STATUS_PENDING ); /* it must have been woken up if we get a result */
if (status == STATUS_PENDING) /* restart it */
{ {
/* FIXME: restart the async operation */ status = async->status;
async->status = STATUS_PENDING;
grab_object( async );
if (status != STATUS_ALERTED) /* it was terminated in the meantime */
async_terminate( async, status );
else
async_reselect( async );
} }
else else
{ {
if (async->timeout) remove_timeout_user( async->timeout );
async->timeout = NULL;
async->status = status;
if (async->data.apc) if (async->data.apc)
{ {
apc_call_t data; apc_call_t data;
...@@ -238,7 +265,13 @@ void async_set_result( struct object *obj, unsigned int status ) ...@@ -238,7 +265,13 @@ void async_set_result( struct object *obj, unsigned int status )
/* check if an async operation is waiting to be alerted */ /* check if an async operation is waiting to be alerted */
int async_waiting( struct async_queue *queue ) int async_waiting( struct async_queue *queue )
{ {
return queue && !list_empty( &queue->queue ); struct list *ptr;
struct async *async;
if (!queue) return 0;
if (!(ptr = list_head( &queue->queue ))) return 0;
async = LIST_ENTRY( ptr, struct async, queue_entry );
return async->status == STATUS_PENDING;
} }
/* wake up async operations on the queue */ /* wake up async operations on the queue */
......
...@@ -183,12 +183,13 @@ static enum server_fd_type dir_get_info( struct fd *fd, int *flags ); ...@@ -183,12 +183,13 @@ static enum server_fd_type dir_get_info( struct fd *fd, int *flags );
static const struct fd_ops dir_fd_ops = static const struct fd_ops dir_fd_ops =
{ {
dir_get_poll_events, /* get_poll_events */ dir_get_poll_events, /* get_poll_events */
default_poll_event, /* poll_event */ default_poll_event, /* poll_event */
no_flush, /* flush */ no_flush, /* flush */
dir_get_info, /* get_file_info */ dir_get_info, /* get_file_info */
default_fd_queue_async, /* queue_async */ default_fd_queue_async, /* queue_async */
default_fd_cancel_async /* cancel_async */ default_fd_reselect_async, /* reselect_async */
default_fd_cancel_async /* cancel_async */
}; };
static struct list change_list = LIST_INIT(change_list); static struct list change_list = LIST_INIT(change_list);
...@@ -516,12 +517,13 @@ static void inotify_poll_event( struct fd *fd, int event ); ...@@ -516,12 +517,13 @@ static void inotify_poll_event( struct fd *fd, int event );
static const struct fd_ops inotify_fd_ops = static const struct fd_ops inotify_fd_ops =
{ {
inotify_get_poll_events, /* get_poll_events */ inotify_get_poll_events, /* get_poll_events */
inotify_poll_event, /* poll_event */ inotify_poll_event, /* poll_event */
no_flush, /* flush */ no_flush, /* flush */
no_get_file_info, /* get_file_info */ no_get_file_info, /* get_file_info */
default_fd_queue_async, /* queue_async */ default_fd_queue_async, /* queue_async */
default_fd_cancel_async, /* cancel_async */ default_fd_reselect_async, /* reselect_async */
default_fd_cancel_async, /* cancel_async */
}; };
static int inotify_get_poll_events( struct fd *fd ) static int inotify_get_poll_events( struct fd *fd )
......
...@@ -1638,6 +1638,7 @@ int check_fd_events( struct fd *fd, int events ) ...@@ -1638,6 +1638,7 @@ int check_fd_events( struct fd *fd, int events )
struct pollfd pfd; struct pollfd pfd;
if (fd->unix_fd == -1) return POLLERR; if (fd->unix_fd == -1) return POLLERR;
if (fd->inode) return events; /* regular files are always signaled */
pfd.fd = fd->unix_fd; pfd.fd = fd->unix_fd;
pfd.events = events; pfd.events = events;
...@@ -1666,12 +1667,12 @@ int default_fd_get_poll_events( struct fd *fd ) ...@@ -1666,12 +1667,12 @@ int default_fd_get_poll_events( struct fd *fd )
/* default handler for poll() events */ /* default handler for poll() events */
void default_poll_event( struct fd *fd, int event ) void default_poll_event( struct fd *fd, int event )
{ {
if (event & POLLIN) async_wake_up( fd->read_q, STATUS_ALERTED ); if (event & (POLLIN | POLLERR | POLLHUP)) async_wake_up( fd->read_q, STATUS_ALERTED );
if (event & POLLOUT) async_wake_up( fd->write_q, STATUS_ALERTED ); if (event & (POLLOUT | POLLERR | POLLHUP)) async_wake_up( fd->write_q, STATUS_ALERTED );
/* if an error occurred, stop polling this fd to avoid busy-looping */ /* if an error occurred, stop polling this fd to avoid busy-looping */
if (event & (POLLERR | POLLHUP)) set_fd_events( fd, -1 ); if (event & (POLLERR | POLLHUP)) set_fd_events( fd, -1 );
else set_fd_events( fd, fd->fd_ops->get_poll_events( fd ) ); else if (!fd->inode) set_fd_events( fd, fd->fd_ops->get_poll_events( fd ) );
} }
struct async *fd_queue_async( struct fd *fd, const async_data_t *data, int type, int count ) struct async *fd_queue_async( struct fd *fd, const async_data_t *data, int type, int count )
...@@ -1725,6 +1726,11 @@ void fd_async_wake_up( struct fd *fd, int type, unsigned int status ) ...@@ -1725,6 +1726,11 @@ void fd_async_wake_up( struct fd *fd, int type, unsigned int status )
} }
} }
void fd_reselect_async( struct fd *fd, struct async_queue *queue )
{
fd->fd_ops->reselect_async( fd, queue );
}
void default_fd_queue_async( struct fd *fd, const async_data_t *data, int type, int count ) void default_fd_queue_async( struct fd *fd, const async_data_t *data, int type, int count )
{ {
int flags; int flags;
...@@ -1743,6 +1749,19 @@ void default_fd_queue_async( struct fd *fd, const async_data_t *data, int type, ...@@ -1743,6 +1749,19 @@ void default_fd_queue_async( struct fd *fd, const async_data_t *data, int type,
} }
} }
/* default reselect_async() fd routine */
void default_fd_reselect_async( struct fd *fd, struct async_queue *queue )
{
if (queue != fd->wait_q)
{
int poll_events = fd->fd_ops->get_poll_events( fd );
int events = check_fd_events( fd, poll_events );
if (events) fd->fd_ops->poll_event( fd, events );
else set_fd_events( fd, poll_events );
}
}
/* default cancel_async() fd routine */
void default_fd_cancel_async( struct fd *fd ) void default_fd_cancel_async( struct fd *fd )
{ {
async_wake_up( fd->read_q, STATUS_CANCELLED ); async_wake_up( fd->read_q, STATUS_CANCELLED );
......
...@@ -96,6 +96,7 @@ static const struct fd_ops file_fd_ops = ...@@ -96,6 +96,7 @@ static const struct fd_ops file_fd_ops =
file_flush, /* flush */ file_flush, /* flush */
file_get_info, /* get_file_info */ file_get_info, /* get_file_info */
default_fd_queue_async, /* queue_async */ default_fd_queue_async, /* queue_async */
default_fd_reselect_async, /* reselect_async */
default_fd_cancel_async /* cancel_async */ default_fd_cancel_async /* cancel_async */
}; };
......
...@@ -41,6 +41,8 @@ struct fd_ops ...@@ -41,6 +41,8 @@ struct fd_ops
enum server_fd_type (*get_file_info)(struct fd *fd, int *flags); enum server_fd_type (*get_file_info)(struct fd *fd, int *flags);
/* queue an async operation */ /* queue an async operation */
void (*queue_async)(struct fd *, const async_data_t *data, int type, int count); void (*queue_async)(struct fd *, const async_data_t *data, int type, int count);
/* selected events for async i/o need an update */
void (*reselect_async)( struct fd *, struct async_queue *queue );
/* cancel an async operation */ /* cancel an async operation */
void (*cancel_async)(struct fd *); void (*cancel_async)(struct fd *);
}; };
...@@ -70,7 +72,9 @@ extern int default_fd_get_poll_events( struct fd *fd ); ...@@ -70,7 +72,9 @@ extern int default_fd_get_poll_events( struct fd *fd );
extern void default_poll_event( struct fd *fd, int event ); extern void default_poll_event( struct fd *fd, int event );
extern struct async *fd_queue_async( struct fd *fd, const async_data_t *data, int type, int count ); extern struct async *fd_queue_async( struct fd *fd, const async_data_t *data, int type, int count );
extern void fd_async_wake_up( struct fd *fd, int type, unsigned int status ); extern void fd_async_wake_up( struct fd *fd, int type, unsigned int status );
extern void fd_reselect_async( struct fd *fd, struct async_queue *queue );
extern void default_fd_queue_async( struct fd *fd, const async_data_t *data, int type, int count ); extern void default_fd_queue_async( struct fd *fd, const async_data_t *data, int type, int count );
extern void default_fd_reselect_async( struct fd *fd, struct async_queue *queue );
extern void default_fd_cancel_async( struct fd *fd ); extern void default_fd_cancel_async( struct fd *fd );
extern void no_flush( struct fd *fd, struct event **event ); extern void no_flush( struct fd *fd, struct event **event );
extern enum server_fd_type no_get_file_info( struct fd *fd, int *flags ); extern enum server_fd_type no_get_file_info( struct fd *fd, int *flags );
......
...@@ -97,6 +97,7 @@ static const struct fd_ops mailslot_fd_ops = ...@@ -97,6 +97,7 @@ static const struct fd_ops mailslot_fd_ops =
no_flush, /* flush */ no_flush, /* flush */
mailslot_get_info, /* get_file_info */ mailslot_get_info, /* get_file_info */
mailslot_queue_async, /* queue_async */ mailslot_queue_async, /* queue_async */
default_fd_reselect_async, /* reselect_async */
default_fd_cancel_async /* cancel_async */ default_fd_cancel_async /* cancel_async */
}; };
...@@ -185,6 +186,7 @@ static const struct fd_ops mailslot_device_fd_ops = ...@@ -185,6 +186,7 @@ static const struct fd_ops mailslot_device_fd_ops =
no_flush, /* flush */ no_flush, /* flush */
mailslot_device_get_file_info, /* get_file_info */ mailslot_device_get_file_info, /* get_file_info */
default_fd_queue_async, /* queue_async */ default_fd_queue_async, /* queue_async */
default_fd_reselect_async, /* reselect_async */
default_fd_cancel_async /* cancel_async */ default_fd_cancel_async /* cancel_async */
}; };
......
...@@ -158,11 +158,12 @@ static const struct object_ops pipe_server_ops = ...@@ -158,11 +158,12 @@ static const struct object_ops pipe_server_ops =
static const struct fd_ops pipe_server_fd_ops = static const struct fd_ops pipe_server_fd_ops =
{ {
default_fd_get_poll_events, /* get_poll_events */ default_fd_get_poll_events, /* get_poll_events */
default_poll_event, /* poll_event */ default_poll_event, /* poll_event */
pipe_server_flush, /* flush */ pipe_server_flush, /* flush */
pipe_server_get_info, /* get_file_info */ pipe_server_get_info, /* get_file_info */
default_fd_queue_async, /* queue_async */ default_fd_queue_async, /* queue_async */
default_fd_reselect_async, /* reselect_async */
default_fd_cancel_async, /* cancel_async */ default_fd_cancel_async, /* cancel_async */
}; };
...@@ -197,6 +198,7 @@ static const struct fd_ops pipe_client_fd_ops = ...@@ -197,6 +198,7 @@ static const struct fd_ops pipe_client_fd_ops =
pipe_client_flush, /* flush */ pipe_client_flush, /* flush */
pipe_client_get_info, /* get_file_info */ pipe_client_get_info, /* get_file_info */
default_fd_queue_async, /* queue_async */ default_fd_queue_async, /* queue_async */
default_fd_reselect_async, /* reselect_async */
default_fd_cancel_async /* cancel_async */ default_fd_cancel_async /* cancel_async */
}; };
...@@ -233,6 +235,7 @@ static const struct fd_ops named_pipe_device_fd_ops = ...@@ -233,6 +235,7 @@ static const struct fd_ops named_pipe_device_fd_ops =
no_flush, /* flush */ no_flush, /* flush */
named_pipe_device_get_file_info, /* get_file_info */ named_pipe_device_get_file_info, /* get_file_info */
default_fd_queue_async, /* queue_async */ default_fd_queue_async, /* queue_async */
default_fd_reselect_async, /* reselect_async */
default_fd_cancel_async /* cancel_async */ default_fd_cancel_async /* cancel_async */
}; };
......
...@@ -87,6 +87,7 @@ static const struct fd_ops process_fd_ops = ...@@ -87,6 +87,7 @@ static const struct fd_ops process_fd_ops =
no_flush, /* flush */ no_flush, /* flush */
no_get_file_info, /* get_file_info */ no_get_file_info, /* get_file_info */
no_queue_async, /* queue_async */ no_queue_async, /* queue_async */
NULL, /* reselect_async */
no_cancel_async /* cancel async */ no_cancel_async /* cancel async */
}; };
......
...@@ -279,7 +279,7 @@ typedef union ...@@ -279,7 +279,7 @@ typedef union
struct struct
{ {
enum apc_type type; /* APC_ASYNC_IO */ enum apc_type type; /* APC_ASYNC_IO */
void (__stdcall *func)(void*, void*, unsigned int); unsigned int (*func)(void*, void*, unsigned int);
void *user; /* user pointer */ void *user; /* user pointer */
void *sb; /* status block */ void *sb; /* status block */
unsigned int status; /* I/O status */ unsigned int status; /* I/O status */
......
...@@ -169,6 +169,7 @@ static const struct fd_ops msg_queue_fd_ops = ...@@ -169,6 +169,7 @@ static const struct fd_ops msg_queue_fd_ops =
no_flush, /* flush */ no_flush, /* flush */
no_get_file_info, /* get_file_info */ no_get_file_info, /* get_file_info */
no_queue_async, /* queue_async */ no_queue_async, /* queue_async */
NULL, /* reselect_async */
no_cancel_async /* cancel async */ no_cancel_async /* cancel async */
}; };
......
...@@ -109,6 +109,7 @@ static const struct fd_ops master_socket_fd_ops = ...@@ -109,6 +109,7 @@ static const struct fd_ops master_socket_fd_ops =
no_flush, /* flush */ no_flush, /* flush */
no_get_file_info, /* get_file_info */ no_get_file_info, /* get_file_info */
no_queue_async, /* queue_async */ no_queue_async, /* queue_async */
NULL, /* reselect_async */
no_cancel_async /* cancel_async */ no_cancel_async /* cancel_async */
}; };
......
...@@ -109,6 +109,7 @@ static const struct fd_ops serial_fd_ops = ...@@ -109,6 +109,7 @@ static const struct fd_ops serial_fd_ops =
serial_flush, /* flush */ serial_flush, /* flush */
serial_get_info, /* get_file_info */ serial_get_info, /* get_file_info */
serial_queue_async, /* queue_async */ serial_queue_async, /* queue_async */
default_fd_reselect_async, /* reselect_async */
default_fd_cancel_async /* cancel_async */ default_fd_cancel_async /* cancel_async */
}; };
......
...@@ -85,6 +85,7 @@ static const struct fd_ops handler_fd_ops = ...@@ -85,6 +85,7 @@ static const struct fd_ops handler_fd_ops =
no_flush, /* flush */ no_flush, /* flush */
no_get_file_info, /* get_file_info */ no_get_file_info, /* get_file_info */
no_queue_async, /* queue_async */ no_queue_async, /* queue_async */
NULL, /* reselect_async */
no_cancel_async /* cancel_async */ no_cancel_async /* cancel_async */
}; };
......
...@@ -97,6 +97,7 @@ static int sock_get_poll_events( struct fd *fd ); ...@@ -97,6 +97,7 @@ static int sock_get_poll_events( struct fd *fd );
static void sock_poll_event( struct fd *fd, int event ); static void sock_poll_event( struct fd *fd, int event );
static enum server_fd_type sock_get_info( struct fd *fd, int *flags ); static enum server_fd_type sock_get_info( struct fd *fd, int *flags );
static void sock_queue_async( struct fd *fd, const async_data_t *data, int type, int count ); static void sock_queue_async( struct fd *fd, const async_data_t *data, int type, int count );
static void sock_reselect_async( struct fd *fd, struct async_queue *queue );
static void sock_cancel_async( struct fd *fd ); static void sock_cancel_async( struct fd *fd );
static int sock_get_error( int err ); static int sock_get_error( int err );
...@@ -126,6 +127,7 @@ static const struct fd_ops sock_fd_ops = ...@@ -126,6 +127,7 @@ static const struct fd_ops sock_fd_ops =
no_flush, /* flush */ no_flush, /* flush */
sock_get_info, /* get_file_info */ sock_get_info, /* get_file_info */
sock_queue_async, /* queue_async */ sock_queue_async, /* queue_async */
sock_reselect_async, /* reselect_async */
sock_cancel_async /* cancel_async */ sock_cancel_async /* cancel_async */
}; };
...@@ -556,6 +558,13 @@ static void sock_queue_async( struct fd *fd, const async_data_t *data, int type, ...@@ -556,6 +558,13 @@ static void sock_queue_async( struct fd *fd, const async_data_t *data, int type,
if ( pollev ) sock_try_event( sock, pollev ); if ( pollev ) sock_try_event( sock, pollev );
} }
static void sock_reselect_async( struct fd *fd, struct async_queue *queue )
{
struct sock *sock = get_fd_user( fd );
int events = sock_reselect( sock );
if (events) sock_try_event( sock, events );
}
static void sock_cancel_async( struct fd *fd ) static void sock_cancel_async( struct fd *fd )
{ {
struct sock *sock = get_fd_user( fd ); struct sock *sock = get_fd_user( fd );
......
...@@ -132,6 +132,7 @@ static const struct fd_ops thread_fd_ops = ...@@ -132,6 +132,7 @@ static const struct fd_ops thread_fd_ops =
no_flush, /* flush */ no_flush, /* flush */
no_get_file_info, /* get_file_info */ no_get_file_info, /* get_file_info */
no_queue_async, /* queue_async */ no_queue_async, /* queue_async */
NULL, /* reselect_async */
no_cancel_async /* cancel_async */ no_cancel_async /* cancel_async */
}; };
......
...@@ -4161,6 +4161,7 @@ static const struct ...@@ -4161,6 +4161,7 @@ static const struct
{ "MEDIA_WRITE_PROTECTED", STATUS_MEDIA_WRITE_PROTECTED }, { "MEDIA_WRITE_PROTECTED", STATUS_MEDIA_WRITE_PROTECTED },
{ "MUTANT_NOT_OWNED", STATUS_MUTANT_NOT_OWNED }, { "MUTANT_NOT_OWNED", STATUS_MUTANT_NOT_OWNED },
{ "NAME_TOO_LONG", STATUS_NAME_TOO_LONG }, { "NAME_TOO_LONG", STATUS_NAME_TOO_LONG },
{ "NOTIFY_ENUM_DIR", STATUS_NOTIFY_ENUM_DIR },
{ "NOT_ALL_ASSIGNED", STATUS_NOT_ALL_ASSIGNED }, { "NOT_ALL_ASSIGNED", STATUS_NOT_ALL_ASSIGNED },
{ "NOT_A_DIRECTORY", STATUS_NOT_A_DIRECTORY }, { "NOT_A_DIRECTORY", STATUS_NOT_A_DIRECTORY },
{ "NOT_IMPLEMENTED", STATUS_NOT_IMPLEMENTED }, { "NOT_IMPLEMENTED", STATUS_NOT_IMPLEMENTED },
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment