Commit 4634447d authored by Eric Pouech's avatar Eric Pouech Committed by Alexandre Julliard

- got rid of include/async.h

- fixed some overlapped issues in socket handling - moved kernel32.CancelIo implementation to ntdll
parent 3e55df39
......@@ -93,8 +93,8 @@
#include "winerror.h"
#include "wine/server.h"
#include "async.h"
#include "wine/unicode.h"
#include "thread.h"
#include "wine/debug.h"
......@@ -125,32 +125,15 @@ static inline void release_comm_fd( HANDLE handle, int fd )
* Asynchronous I/O for asynchronous wait requests *
*/
static DWORD commio_get_async_count (const async_private *ovp);
static void commio_async_cleanup (async_private *ovp);
static async_ops commio_async_ops =
{
commio_get_async_count, /* get_count */
NULL, /* call_completion */
commio_async_cleanup /* cleanup */
};
typedef struct async_commio
{
struct async_private async;
char *buffer;
HANDLE handle;
PIO_APC_ROUTINE apc_internal;
int type;
char* buffer;
int fd;
} async_commio;
static DWORD commio_get_async_count (const struct async_private *ovp)
{
return 0;
}
static void commio_async_cleanup (async_private *ovp)
{
HeapFree(GetProcessHeap(), 0, ovp );
}
/***********************************************************************/
#if !defined(TIOCINQ) && defined(FIONREAD)
......@@ -1928,17 +1911,27 @@ BOOL WINAPI GetCommModemStatus(
* This function is called while the client is waiting on the
* server, so we can't make any server calls here.
*/
static void COMM_WaitCommEventService(async_private *ovp)
static void WINAPI COMM_WaitCommEventService(void* ovp, IO_STATUS_BLOCK* iosb, ULONG status)
{
async_commio *commio = (async_commio*) ovp;
IO_STATUS_BLOCK* iosb = commio->async.iosb;
TRACE("iosb %p\n",iosb);
TRACE("iosb %p\n", iosb);
/* FIXME: detect other events */
*commio->buffer = EV_RXCHAR;
iosb->u.Status = STATUS_SUCCESS;
switch (status)
{
case STATUS_ALERTED: /* got some new stuff */
/* FIXME: detect other events */
*commio->buffer = EV_RXCHAR;
iosb->u.Status = STATUS_SUCCESS;
break;
default:
iosb->u.Status = status;
break;
}
wine_server_release_fd( commio->handle, commio->fd );
if ( ((LPOVERLAPPED)iosb)->hEvent != INVALID_HANDLE_VALUE )
NtSetEvent( ((LPOVERLAPPED)iosb)->hEvent, NULL );
HeapFree(GetProcessHeap(), 0, commio );
}
......@@ -1952,44 +1945,52 @@ static BOOL COMM_WaitCommEvent(
LPDWORD lpdwEvents, /* [out] event(s) that were detected */
LPOVERLAPPED lpOverlapped) /* [in/out] for Asynchronous waiting */
{
int fd;
async_commio *ovp;
int fd;
async_commio* commio;
NTSTATUS status;
if(!lpOverlapped)
if (!lpOverlapped)
{
SetLastError(ERROR_INVALID_PARAMETER);
return FALSE;
}
if(NtResetEvent(lpOverlapped->hEvent,NULL))
if (NtResetEvent(lpOverlapped->hEvent,NULL))
return FALSE;
fd = get_comm_fd( hFile, GENERIC_WRITE );
if(fd<0)
return FALSE;
if (fd < 0) return FALSE;
ovp = (async_commio*) HeapAlloc(GetProcessHeap(), 0, sizeof (async_commio));
if(!ovp)
commio = (async_commio*) HeapAlloc(GetProcessHeap(), 0, sizeof (async_commio));
if (!commio)
{
release_comm_fd( hFile, fd );
return FALSE;
}
ovp->async.ops = &commio_async_ops;
ovp->async.handle = hFile;
ovp->async.fd = fd; /* FIXME */
ovp->async.type = ASYNC_TYPE_WAIT;
ovp->async.func = COMM_WaitCommEventService;
ovp->async.event = lpOverlapped->hEvent;
ovp->async.iosb = (IO_STATUS_BLOCK*)lpOverlapped;
ovp->buffer = (char *)lpdwEvents;
commio->handle = hFile;
commio->type = ASYNC_TYPE_WAIT;
commio->apc_internal = COMM_WaitCommEventService;
commio->buffer = (char *)lpdwEvents;
commio->fd = fd; /* FIXME */
lpOverlapped->InternalHigh = 0;
lpOverlapped->Offset = 0;
lpOverlapped->OffsetHigh = 0;
if ( !register_new_async (&ovp->async) )
SetLastError( ERROR_IO_PENDING );
SERVER_START_REQ( register_async )
{
req->handle = hFile;
req->io_apc = COMM_WaitCommEventService;
req->io_user = commio;
req->io_sb = (IO_STATUS_BLOCK*)lpOverlapped;
req->count = 0;
status = wine_server_call( req );
}
SERVER_END_REQ;
if ( status ) SetLastError( RtlNtStatusToDosError(status) );
else NtCurrentTeb()->num_async_io++;
return FALSE;
}
......
......@@ -43,7 +43,8 @@
#include "excpt.h"
#include "wine/unicode.h"
#include "wine/debug.h"
#include "async.h"
#include "thread.h"
#include "wine/server.h"
WINE_DEFAULT_DEBUG_CHANNEL(file);
......@@ -512,7 +513,7 @@ BOOL WINAPI GetOverlappedResult(HANDLE hFile, LPOVERLAPPED lpOverlapped,
else
{
/* busy loop */
while ( (volatile DWORD)lpOverlapped->Internal == STATUS_PENDING )
while ( ((volatile OVERLAPPED*)lpOverlapped)->Internal == STATUS_PENDING )
Sleep( 10 );
}
}
......@@ -532,7 +533,7 @@ BOOL WINAPI GetOverlappedResult(HANDLE hFile, LPOVERLAPPED lpOverlapped,
}
if ( r == WAIT_FAILED )
{
ERR("wait operation failed\n");
WARN("wait operation failed\n");
return FALSE;
}
if (lpTransferred) *lpTransferred = lpOverlapped->InternalHigh;
......@@ -556,17 +557,14 @@ BOOL WINAPI GetOverlappedResult(HANDLE hFile, LPOVERLAPPED lpOverlapped,
*/
BOOL WINAPI CancelIo(HANDLE handle)
{
async_private *ovp,*t;
IO_STATUS_BLOCK io_status;
TRACE("handle = %p\n",handle);
for (ovp = NtCurrentTeb()->pending_list; ovp; ovp = t)
NtCancelIoFile(handle, &io_status);
if (io_status.u.Status)
{
t = ovp->next;
if ( ovp->handle == handle )
cancel_async ( ovp );
SetLastError( RtlNtStatusToDosError( io_status.u.Status ) );
return FALSE;
}
SleepEx(1,TRUE);
return TRUE;
}
......
......@@ -51,7 +51,6 @@
#include "winbase.h"
#include "winreg.h"
#include "winternl.h"
#include "async.h"
#include "thread.h"
#include "wine/server.h"
#include "wine/debug.h"
......@@ -605,35 +604,6 @@ NTSTATUS WINAPI NtSetTimerResolution(IN ULONG resolution,
/***********************************************************************
* check_async_list
*
* Process a status event from the server.
*/
static void WINAPI check_async_list(async_private *asp, DWORD status)
{
async_private *ovp;
DWORD ovp_status;
for( ovp = NtCurrentTeb()->pending_list; ovp && ovp != asp; ovp = ovp->next );
if(!ovp)
return;
if( status != STATUS_ALERTED )
{
ovp_status = status;
ovp->iosb->u.Status = status;
}
else ovp_status = ovp->iosb->u.Status;
if( ovp_status == STATUS_PENDING ) ovp->func( ovp );
/* This will destroy all but PENDING requests */
register_old_async( ovp );
}
/***********************************************************************
* wait_reply
*
* Wait for a reply on the waiting pipe of the current thread.
......@@ -697,7 +667,7 @@ static void call_apcs( BOOL alertable )
}
SERVER_END_REQ;
switch(type)
switch (type)
{
case APC_NONE:
return; /* no more APCs */
......@@ -714,7 +684,8 @@ static void call_apcs( BOOL alertable )
proc( arg3, time.u.LowPart, time.u.HighPart );
break;
case APC_ASYNC_IO:
check_async_list( arg1, (DWORD) arg2 );
NtCurrentTeb()->num_async_io--;
proc( arg1, (IO_STATUS_BLOCK*)arg2, (ULONG)arg3 );
break;
default:
server_protocol_error( "get_apc_request: bad type %d\n", type );
......@@ -810,7 +781,7 @@ NTSTATUS WINAPI NtYieldExecution(void)
NTSTATUS WINAPI NtDelayExecution( BOOLEAN alertable, const LARGE_INTEGER *timeout )
{
/* if alertable or async I/O in progress, we need to query the server */
if (alertable || NtCurrentTeb()->pending_list)
if (alertable || NtCurrentTeb()->num_async_io)
{
UINT flags = SELECT_INTERRUPTIBLE;
if (alertable) flags |= SELECT_ALERTABLE;
......
/*
* Structures and static functions for handling asynchronous I/O.
*
* Copyright (C) 2002 Mike McCormack, Martin Wilck
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
/*
* This file declares static functions.
* It should only be included by those source files that implement async I/O requests.
*/
#ifndef __WINE_ASYNC_H
#define __WINE_ASYNC_H
#include <thread.h>
#include <ntstatus.h>
#include <wine/server.h>
#include <winternl.h>
struct async_private;
typedef void (*async_handler)(struct async_private *ovp);
typedef void (CALLBACK *async_call_completion_func)(ULONG_PTR data);
typedef DWORD (*async_get_count)(const struct async_private *ovp);
typedef void (*async_cleanup)(struct async_private *ovp);
typedef struct async_ops
{
async_get_count get_count;
async_call_completion_func call_completion;
async_cleanup cleanup;
} async_ops;
typedef struct async_private
{
struct async_ops* ops;
HANDLE handle;
HANDLE event;
int fd;
async_handler func;
int type;
IO_STATUS_BLOCK* iosb;
struct async_private* next;
struct async_private* prev;
} async_private;
/* All functions declared static for Dll separation purposes */
static void CALLBACK call_user_apc( ULONG_PTR arg1, ULONG_PTR arg2, ULONG_PTR arg3 )
{
PAPCFUNC func = (PAPCFUNC)arg1;
func( arg2 );
}
inline static void finish_async( async_private *ovp )
{
if (ovp->prev)
ovp->prev->next = ovp->next;
else
NtCurrentTeb()->pending_list = ovp->next;
if (ovp->next)
ovp->next->prev = ovp->prev;
ovp->next = ovp->prev = NULL;
wine_server_release_fd( ovp->handle, ovp->fd );
if ( ovp->event != INVALID_HANDLE_VALUE )
NtSetEvent( ovp->event, NULL );
if ( ovp->ops->call_completion )
NtQueueApcThread( GetCurrentThread(), call_user_apc,
(ULONG_PTR)ovp->ops->call_completion, (ULONG_PTR)ovp, 0 );
else
ovp->ops->cleanup( ovp );
}
inline static NTSTATUS __register_async( async_private *ovp, const DWORD status )
{
NTSTATUS ret;
SERVER_START_REQ( register_async )
{
req->handle = ovp->handle;
req->overlapped = ovp;
req->type = ovp->type;
req->count = ovp->ops->get_count( ovp );
req->status = status;
ret = wine_server_call( req );
}
SERVER_END_REQ;
if (ret) ovp->iosb->u.Status = ret;
if ( ovp->iosb->u.Status != STATUS_PENDING )
finish_async(ovp);
return ret;
}
inline static NTSTATUS register_old_async( async_private *ovp )
{
return __register_async(ovp, ovp->iosb->u.Status);
}
inline static NTSTATUS register_new_async( async_private *ovp )
{
ovp->iosb->u.Status = STATUS_PENDING;
ovp->next = NtCurrentTeb()->pending_list;
ovp->prev = NULL;
if ( ovp->next ) ovp->next->prev = ovp;
NtCurrentTeb()->pending_list = ovp;
return __register_async( ovp, STATUS_PENDING );
}
inline static NTSTATUS cancel_async( async_private *ovp )
{
/* avoid multiple cancellations */
if ( ovp->iosb->u.Status != STATUS_PENDING )
return STATUS_SUCCESS;
ovp->iosb->u.Status = STATUS_CANCELLED;
return __register_async( ovp, STATUS_CANCELLED );
}
#endif /* __WINE_ASYNC_H */
......@@ -112,7 +112,7 @@ typedef struct _TEB
int wait_fd[2]; /* --3 214 fd for sleeping server requests */
struct debug_info *debug_info; /* --3 21c Info for debugstr functions */
void *pthread_data; /* --3 220 Data for pthread emulation */
struct async_private *pending_list; /* --3 224 list of pending async operations */
DWORD num_async_io; /* --3 224 number of pending async I/O in the server */
void *driver_data; /* --3 228 Graphics driver private data */
DWORD dpmi_vif; /* --3 22c Protected mode virtual interrupt flag */
DWORD vm86_pending; /* --3 230 Data for vm86 mode */
......
......@@ -2282,21 +2282,33 @@ struct register_async_request
struct request_header __header;
obj_handle_t handle;
int type;
void* overlapped;
void* io_apc;
void* io_sb;
void* io_user;
int count;
unsigned int status;
};
struct register_async_reply
{
struct reply_header __header;
};
#define ASYNC_TYPE_NONE 0x00
#define ASYNC_TYPE_READ 0x01
#define ASYNC_TYPE_WRITE 0x02
#define ASYNC_TYPE_WAIT 0x03
struct cancel_async_request
{
struct request_header __header;
obj_handle_t handle;
};
struct cancel_async_reply
{
struct reply_header __header;
};
struct create_named_pipe_request
{
struct request_header __header;
......@@ -3274,6 +3286,7 @@ enum request
REQ_get_serial_info,
REQ_set_serial_info,
REQ_register_async,
REQ_cancel_async,
REQ_create_named_pipe,
REQ_open_named_pipe,
REQ_connect_named_pipe,
......@@ -3460,6 +3473,7 @@ union generic_request
struct get_serial_info_request get_serial_info_request;
struct set_serial_info_request set_serial_info_request;
struct register_async_request register_async_request;
struct cancel_async_request cancel_async_request;
struct create_named_pipe_request create_named_pipe_request;
struct open_named_pipe_request open_named_pipe_request;
struct connect_named_pipe_request connect_named_pipe_request;
......@@ -3644,6 +3658,7 @@ union generic_reply
struct get_serial_info_reply get_serial_info_reply;
struct set_serial_info_reply set_serial_info_reply;
struct register_async_reply register_async_reply;
struct cancel_async_reply cancel_async_reply;
struct create_named_pipe_reply create_named_pipe_reply;
struct open_named_pipe_reply open_named_pipe_reply;
struct connect_named_pipe_reply connect_named_pipe_reply;
......@@ -3697,6 +3712,6 @@ union generic_reply
struct set_global_windows_reply set_global_windows_reply;
};
#define SERVER_PROTOCOL_VERSION 153
#define SERVER_PROTOCOL_VERSION 154
#endif /* __WINE_WINE_SERVER_PROTOCOL_H */
......@@ -6,7 +6,6 @@ VPATH = @srcdir@
MODULE = none
C_SRCS = \
async.c \
atom.c \
change.c \
class.c \
......
/*
* Server-side support for async i/o operations
*
* Copyright (C) 1998 Alexandre Julliard
* Copyright (C) 2000 Mike McCormack
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "config.h"
#include <assert.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include "handle.h"
#include "file.h"
#include "thread.h"
#include "request.h"
#include "async.h"
void destroy_async( struct async *async )
{
struct async_queue *aq = async->q;
/*fprintf(stderr,"destroyed async %p\n",async->overlapped); */
if(async->timeout)
remove_timeout_user(async->timeout);
async->timeout = NULL;
if(async->prev)
async->prev->next = async->next;
else
aq->head = async->next;
if(async->next)
async->next->prev = async->prev;
else
aq->tail = async->prev;
async->q = NULL;
async->next = NULL;
async->prev = NULL;
release_object( async->thread );
free(async);
}
void async_notify(struct async *async, int status)
{
/* fprintf(stderr,"notifying %p!\n",async->overlapped); */
async->status = status;
thread_queue_apc(async->thread, NULL, NULL, APC_ASYNC_IO, 1,
async->overlapped, (void *)status, NULL );
}
void destroy_async_queue( struct async_queue *q )
{
while(q->head)
{
async_notify(q->head, STATUS_CANCELLED);
destroy_async(q->head);
}
}
struct async *find_async(struct async_queue *q, struct thread *thread, void *overlapped)
{
struct async *async;
/* fprintf(stderr,"find_async: %p\n",overlapped); */
if(!q)
return NULL;
for(async = q->head; async; async = async->next)
if((async->overlapped==overlapped) && (async->thread == thread))
return async;
return NULL;
}
void async_insert(struct async_queue *q, struct async *async)
{
async->q = q;
async->prev = q->tail;
async->next = NULL;
if(q->tail)
q->tail->next = async;
else
q->head = async;
q->tail = async;
}
static void async_callback(void *private)
{
struct async *async = (struct async *)private;
/* fprintf(stderr,"%p timeout out\n",async->overlapped); */
async->timeout = NULL;
async_notify(async, STATUS_TIMEOUT);
destroy_async(async);
}
struct async *create_async(struct object *obj, struct thread *thread,
void *overlapped)
{
struct async *async = (struct async *) malloc(sizeof(struct async));
if(!async)
{
set_error(STATUS_NO_MEMORY);
return NULL;
}
async->obj = obj;
async->thread = (struct thread *)grab_object(thread);
async->overlapped = overlapped;
async->next = NULL;
async->prev = NULL;
async->q = NULL;
async->status = STATUS_PENDING;
async->timeout = NULL;
return async;
}
void async_add_timeout(struct async *async, int timeout)
{
if(timeout)
{
gettimeofday( &async->when, 0 );
add_timeout( &async->when, timeout );
async->timeout = add_timeout_user( &async->when, async_callback, async );
}
}
/*
* Async i/o definitions
*
* Copyright (C) 2000 Mike McCormack
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _SERVER_ASYNC_
#define _SERVER_ASYNC_
#include <sys/time.h>
#include "object.h"
struct async_queue;
struct async
{
struct object *obj;
struct thread *thread;
void *overlapped;
unsigned int status;
struct timeval when;
struct timeout_user *timeout;
struct async *next,*prev;
struct async_queue *q;
};
struct async_queue
{
struct async *head;
struct async *tail;
};
void destroy_async( struct async *async );
void destroy_async_queue( struct async_queue *q );
void async_notify(struct async *async, int status);
struct async *find_async(struct async_queue *q, struct thread *thread, void *overlapped);
void async_insert(struct async_queue *q, struct async *async);
struct async *create_async(struct object *obj, struct thread *thread,
void *overlapped);
void async_add_timeout(struct async *async, int timeout);
static inline void init_async_queue(struct async_queue *q)
{
q->head = q->tail = NULL;
}
#define IS_READY(q) (((q).head) && ((q).head->status==STATUS_PENDING))
#endif /* _SERVER_ASYNC_ */
......@@ -980,6 +980,89 @@ void unlock_fd( struct fd *fd, file_pos_t start, file_pos_t count )
/****************************************************************/
/* asynchronous operations support */
struct async
{
struct fd *fd;
struct thread *thread;
void *apc;
void *user;
void *sb;
struct timeval when;
struct timeout_user *timeout;
struct async *next;
struct async **head;
};
/* cb for timeout on an async request */
static void async_callback(void *private)
{
struct async *async = (struct async *)private;
/* fprintf(stderr, "async timeout out %p\n", async); */
async->timeout = NULL;
async_terminate( async, STATUS_TIMEOUT );
}
/* create an async on a given queue of a fd */
struct async *create_async(struct fd *fd, struct thread *thread,
int timeout, struct async **head,
void *io_apc, void *io_user, void* io_sb)
{
struct async *async = mem_alloc( sizeof(struct async) );
struct async **p;
if (!async) return NULL;
async->fd = fd;
async->thread = (struct thread *)grab_object(thread);
async->apc = io_apc;
async->user = io_user;
async->sb = io_sb;
async->head = head;
async->next = NULL;
for (p = head; *p; p = &(*p)->next);
*p = async;
if (timeout)
{
gettimeofday( &async->when, 0 );
add_timeout( &async->when, timeout );
async->timeout = add_timeout_user( &async->when, async_callback, async );
}
else async->timeout = NULL;
return async;
}
/* notifies client thread of new status of its async request */
/* destroys the server side of it */
void async_terminate( struct async *async, int status )
{
struct async** p;
thread_queue_apc( async->thread, NULL, async->apc, APC_ASYNC_IO,
1, async->user, async->sb, (void *)status );
if (async->timeout) remove_timeout_user( async->timeout );
async->timeout = NULL;
for (p = async->head; *p; p = &(*p)->next)
{
if (*p == async)
{
*p = async->next;
break;
}
}
release_object( async->thread );
free( async );
}
/****************************************************************/
/* file descriptor functions */
static void fd_dump( struct object *obj, int verbose )
......@@ -1309,7 +1392,14 @@ int no_get_file_info( struct fd *fd )
}
/* default queue_async() routine */
void no_queue_async( struct fd *fd, void* ptr, unsigned int status, int type, int count )
void no_queue_async( struct fd *fd, void* apc, void* user, void* io_sb,
int type, int count)
{
set_error( STATUS_OBJECT_TYPE_MISMATCH );
}
/* default cancel_async() routine */
void no_cancel_async( struct fd *fd )
{
set_error( STATUS_OBJECT_TYPE_MISMATCH );
}
......@@ -1338,7 +1428,7 @@ DECL_HANDLER(flush_file)
if (fd)
{
fd->fd_ops->flush( fd, &event );
if( event )
if ( event )
{
reply->event = alloc_handle( current->process, event, SYNCHRONIZE, 0 );
}
......@@ -1372,26 +1462,41 @@ DECL_HANDLER(register_async)
{
struct fd *fd = get_handle_fd_obj( current->process, req->handle, 0 );
/*
* The queue_async method must do the following:
*
* 1. Get the async_queue for the request of given type.
* 2. Call find_async() to look for the specific client request in the queue (=> NULL if not found).
* 3. If status is STATUS_PENDING:
* a) If no async request found in step 2 (new request): call create_async() to initialize one.
* b) Set request's status to STATUS_PENDING.
* c) If the "queue" field of the async request is NULL: call async_insert() to put it into the queue.
* Otherwise:
* If the async request was found in step 2, destroy it by calling destroy_async().
* 4. Carry out any operations necessary to adjust the object's poll events
* Usually: set_elect_events (obj, obj->ops->get_poll_events()).
*
* See also the implementations in file.c, serial.c, and sock.c.
*/
/*
* The queue_async method must do the following:
*
* 1. Get the async_queue for the request of given type.
* 2. Create a new asynchronous request for the selected queue
* 3. Carry out any operations necessary to adjust the object's poll events
* Usually: set_elect_events (obj, obj->ops->get_poll_events()).
* 4. When the async request is triggered, then send back (with a proper APC)
* the trigger (STATUS_ALERTED) to the thread that posted the request.
* async_destroy() is to be called: it will both notify the sender about
* the trigger and destroy the request by itself
* See also the implementations in file.c, serial.c, and sock.c.
*/
if (fd)
{
fd->fd_ops->queue_async( fd, req->overlapped, req->status, req->type, req->count );
fd->fd_ops->queue_async( fd, req->io_apc, req->io_user, req->io_sb,
req->type, req->count );
release_object( fd );
}
}
/* cancels all async I/O */
DECL_HANDLER(cancel_async)
{
struct fd *fd = get_handle_fd_obj( current->process, req->handle, 0 );
if (fd)
{
/* Note: we don't kill the queued APC_ASYNC_IO on this thread because
* NtCancelIoFile() will force the pending APC to be run. Since,
* Windows only guarantees that the current thread will have no async
* operation on the current fd when NtCancelIoFile returns, this shall
* do the work.
*/
fd->fd_ops->cancel_async( fd );
release_object( fd );
}
}
......@@ -50,7 +50,6 @@
#include "handle.h"
#include "thread.h"
#include "request.h"
#include "async.h"
struct file
{
......@@ -58,8 +57,8 @@ struct file
struct fd *fd; /* file descriptor for this file */
unsigned int access; /* file access (GENERIC_READ/WRITE) */
unsigned int options; /* file options (FILE_DELETE_ON_CLOSE, FILE_SYNCHRONOUS...) */
struct async_queue read_q;
struct async_queue write_q;
struct async *read_q;
struct async *write_q;
};
static void file_dump( struct object *obj, int verbose );
......@@ -70,7 +69,8 @@ static int file_get_poll_events( struct fd *fd );
static void file_poll_event( struct fd *fd, int event );
static int file_flush( struct fd *fd, struct event **event );
static int file_get_info( struct fd *fd );
static void file_queue_async( struct fd *fd, void *ptr, unsigned int status, int type, int count );
static void file_queue_async( struct fd *fd, void *apc, void *user, void* iosb, int type, int count );
static void file_cancel_async( struct fd *fd );
static const struct object_ops file_ops =
{
......@@ -90,7 +90,8 @@ static const struct fd_ops file_fd_ops =
file_poll_event, /* poll_event */
file_flush, /* flush */
file_get_info, /* get_file_info */
file_queue_async /* queue_async */
file_queue_async, /* queue_async */
file_cancel_async /* cancel_async */
};
static inline int is_overlapped( const struct file *file )
......@@ -141,6 +142,7 @@ static struct object *create_file( const char *nameptr, size_t len, unsigned int
case FILE_OVERWRITE: flags = O_TRUNC; break;
default: set_error( STATUS_INVALID_PARAMETER ); goto error;
}
switch(access & (GENERIC_READ | GENERIC_WRITE))
{
case 0: break;
......@@ -160,8 +162,7 @@ static struct object *create_file( const char *nameptr, size_t len, unsigned int
file->options = options;
if (is_overlapped( file ))
{
init_async_queue (&file->read_q);
init_async_queue (&file->write_q);
file->read_q = file->write_q = NULL;
}
/* FIXME: should set error to STATUS_OBJECT_NAME_COLLISION if file existed before */
......@@ -236,14 +237,14 @@ static void file_poll_event( struct fd *fd, int event )
assert( file->obj.ops == &file_ops );
if (is_overlapped( file ))
{
if( IS_READY(file->read_q) && (POLLIN & event) )
if ( file->read_q && (POLLIN & event) )
{
async_notify(file->read_q.head, STATUS_ALERTED);
async_terminate( file->read_q, STATUS_ALERTED );
return;
}
if( IS_READY(file->write_q) && (POLLOUT & event) )
if ( file->write_q && (POLLOUT & event) )
{
async_notify(file->write_q.head, STATUS_ALERTED);
async_terminate( file->write_q, STATUS_ALERTED );
return;
}
}
......@@ -266,56 +267,51 @@ static int file_get_info( struct fd *fd )
else return 0;
}
static void file_queue_async(struct fd *fd, void *ptr, unsigned int status, int type, int count)
static void file_queue_async( struct fd *fd, void *apc, void *user, void *iosb,
int type, int count )
{
struct file *file = get_fd_user( fd );
struct async *async;
struct async_queue *q;
struct async **head;
int events;
assert( file->obj.ops == &file_ops );
if (!is_overlapped( file ))
{
set_error ( STATUS_INVALID_HANDLE );
set_error( STATUS_INVALID_HANDLE );
return;
}
switch(type)
switch (type)
{
case ASYNC_TYPE_READ:
q = &file->read_q;
head = &file->read_q;
break;
case ASYNC_TYPE_WRITE:
q = &file->write_q;
head = &file->write_q;
break;
default:
set_error( STATUS_INVALID_PARAMETER );
return;
}
async = find_async ( q, current, ptr );
if ( status == STATUS_PENDING )
{
int events;
if (!create_async( fd, current, 0, head, apc, user, iosb ))
return;
if ( !async )
async = create_async ( &file->obj, current, ptr );
if ( !async )
return;
/* Check if the new pending request can be served immediately */
events = check_fd_events( fd, file_get_poll_events( fd ) );
if (events) file_poll_event( fd, events );
async->status = STATUS_PENDING;
if ( !async->q )
async_insert( q, async );
set_fd_events( fd, file_get_poll_events( fd ));
}
/* Check if the new pending request can be served immediately */
events = check_fd_events( fd, file_get_poll_events( fd ) );
if (events) file_poll_event ( fd, events );
}
else if ( async ) destroy_async ( async );
else set_error ( STATUS_INVALID_PARAMETER );
static void file_cancel_async( struct fd *fd )
{
struct file *file = get_fd_user( fd );
assert( file->obj.ops == &file_ops );
set_fd_events( fd, file_get_poll_events( fd ));
async_terminate_queue( &file->read_q, STATUS_CANCELLED );
async_terminate_queue( &file->write_q, STATUS_CANCELLED );
}
static struct fd *file_get_fd( struct object *obj )
......@@ -332,8 +328,8 @@ static void file_destroy( struct object *obj )
if (is_overlapped( file ))
{
destroy_async_queue (&file->read_q);
destroy_async_queue (&file->write_q);
async_terminate_queue( &file->read_q, STATUS_CANCELLED );
async_terminate_queue( &file->write_q, STATUS_CANCELLED );
}
if (file->fd) release_object( file->fd );
}
......
......@@ -38,8 +38,10 @@ struct fd_ops
int (*flush)(struct fd *, struct event **);
/* get file information */
int (*get_file_info)(struct fd *);
/* queue an async operation - see register_async handler in async.c*/
void (*queue_async)(struct fd *, void* ptr, unsigned int status, int type, int count);
/* queue an async operation */
void (*queue_async)(struct fd *, void* apc, void* user, void* io_sb, int type, int count);
/* cancel an async operation */
void (*cancel_async)(struct fd *);
};
/* file descriptor functions */
......@@ -65,7 +67,8 @@ extern int default_fd_signaled( struct object *obj, struct thread *thread );
extern void default_poll_event( struct fd *fd, int event );
extern int no_flush( struct fd *fd, struct event **event );
extern int no_get_file_info( struct fd *fd );
extern void no_queue_async( struct fd *fd, void* ptr, unsigned int status, int type, int count );
extern void no_queue_async( struct fd *fd, void* apc, void* user, void* io_sb, int type, int count);
extern void no_cancel_async( struct fd *fd );
extern void main_loop(void);
inline static struct fd *get_obj_fd( struct object *obj ) { return obj->ops->get_fd( obj ); }
......@@ -107,4 +110,14 @@ extern void sigio_callback(void);
extern int is_serial_fd( struct fd *fd );
extern struct object *create_serial( struct fd *fd, unsigned int options );
/* async I/O functions */
extern struct async *create_async( struct fd *fd, struct thread *thread, int timeout,
struct async **head, void *, void *, void *);
extern void async_terminate( struct async *async, int status );
static inline void async_terminate_queue( struct async **head, int status )
{
while (*head) async_terminate( *head, status );
}
#endif /* __WINE_SERVER_FILE_H */
......@@ -151,7 +151,8 @@ static const struct fd_ops pipe_server_fd_ops =
default_poll_event, /* poll_event */
pipe_server_flush, /* flush */
pipe_end_get_info, /* get_file_info */
no_queue_async /* queue_async */
no_queue_async, /* queue_async */
no_cancel_async, /* cancel_async */
};
/* client end functions */
......@@ -178,7 +179,8 @@ static const struct fd_ops pipe_client_fd_ops =
default_poll_event, /* poll_event */
pipe_client_flush, /* flush */
pipe_end_get_info, /* get_file_info */
no_queue_async /* queue_async */
no_queue_async, /* queue_async */
no_cancel_async /* cancel_async */
};
static void named_pipe_dump( struct object *obj, int verbose )
......
......@@ -76,7 +76,8 @@ static const struct fd_ops process_fd_ops =
process_poll_event, /* poll_event */
no_flush, /* flush */
no_get_file_info, /* get_file_info */
no_queue_async /* queue_async */
no_queue_async, /* queue_async */
no_cancel_async /* cancel async */
};
/* process startup info */
......
......@@ -1615,20 +1615,26 @@ enum message_type
#define SERIALINFO_SET_ERROR 0x04
/* Create / reschedule an async I/O */
/* Create an async I/O */
@REQ(register_async)
obj_handle_t handle; /* handle to comm port, socket or file */
int type;
void* overlapped;
int count;
unsigned int status;
obj_handle_t handle; /* handle to comm port, socket or file */
int type; /* type of queue to look after */
void* io_apc; /* APC routine to queue upon end of async */
void* io_sb; /* I/O status block (unique across all async on this handle) */
void* io_user; /* data to pass back to caller */
int count; /* count - usually # of bytes to be read/written */
@END
#define ASYNC_TYPE_NONE 0x00
#define ASYNC_TYPE_READ 0x01
#define ASYNC_TYPE_WRITE 0x02
#define ASYNC_TYPE_WAIT 0x03
/* Cancel all async op on a fd */
@REQ(cancel_async)
obj_handle_t handle; /* handle to comm port, socket or file */
@END
/* Create a named pipe */
@REQ(create_named_pipe)
unsigned int openmode;
......
......@@ -100,7 +100,8 @@ static const struct fd_ops master_socket_fd_ops =
master_socket_poll_event, /* poll_event */
no_flush, /* flush */
no_get_file_info, /* get_file_info */
no_queue_async /* queue_async */
no_queue_async, /* queue_async */
no_cancel_async /* cancel_async */
};
......
......@@ -231,6 +231,7 @@ DECL_HANDLER(kill_win_timer);
DECL_HANDLER(get_serial_info);
DECL_HANDLER(set_serial_info);
DECL_HANDLER(register_async);
DECL_HANDLER(cancel_async);
DECL_HANDLER(create_named_pipe);
DECL_HANDLER(open_named_pipe);
DECL_HANDLER(connect_named_pipe);
......@@ -416,6 +417,7 @@ static const req_handler req_handlers[REQ_NB_REQUESTS] =
(req_handler)req_get_serial_info,
(req_handler)req_set_serial_info,
(req_handler)req_register_async,
(req_handler)req_cancel_async,
(req_handler)req_create_named_pipe,
(req_handler)req_open_named_pipe,
(req_handler)req_connect_named_pipe,
......
......@@ -53,7 +53,6 @@
#include "handle.h"
#include "thread.h"
#include "request.h"
#include "async.h"
static void serial_dump( struct object *obj, int verbose );
static struct fd *serial_get_fd( struct object *obj );
......@@ -63,7 +62,8 @@ static int serial_get_poll_events( struct fd *fd );
static void serial_poll_event( struct fd *fd, int event );
static int serial_get_info( struct fd *fd );
static int serial_flush( struct fd *fd, struct event **event );
static void serial_queue_async(struct fd *fd, void *ptr, unsigned int status, int type, int count);
static void serial_queue_async( struct fd *fd, void *apc, void *user, void *iosb, int type, int count );
static void serial_cancel_async( struct fd *fd );
struct serial
{
......@@ -83,9 +83,9 @@ struct serial
struct termios original;
struct async_queue read_q;
struct async_queue write_q;
struct async_queue wait_q;
struct async *read_q;
struct async *write_q;
struct async *wait_q;
/* FIXME: add dcb, comm status, handler module, sharing */
};
......@@ -108,7 +108,8 @@ static const struct fd_ops serial_fd_ops =
serial_poll_event, /* poll_event */
serial_flush, /* flush */
serial_get_info, /* get_file_info */
serial_queue_async /* queue_async */
serial_queue_async, /* queue_async */
serial_cancel_async /* cancel_async */
};
/* check if the given fd is a serial port */
......@@ -144,9 +145,7 @@ struct object *create_serial( struct fd *fd, unsigned int options )
serial->writeconst = 0;
serial->eventmask = 0;
serial->commerror = 0;
init_async_queue(&serial->read_q);
init_async_queue(&serial->write_q);
init_async_queue(&serial->wait_q);
serial->read_q = serial->write_q = serial->wait_q = NULL;
if (!(serial->fd = create_anonymous_fd( &serial_fd_ops, unix_fd, &serial->obj )))
{
release_object( serial );
......@@ -165,9 +164,9 @@ static void serial_destroy( struct object *obj)
{
struct serial *serial = (struct serial *)obj;
destroy_async_queue(&serial->read_q);
destroy_async_queue(&serial->write_q);
destroy_async_queue(&serial->wait_q);
async_terminate_queue( &serial->read_q, STATUS_CANCELLED );
async_terminate_queue( &serial->write_q, STATUS_CANCELLED );
async_terminate_queue( &serial->wait_q, STATUS_CANCELLED );
if (serial->fd) release_object( serial->fd );
}
......@@ -189,12 +188,9 @@ static int serial_get_poll_events( struct fd *fd )
int events = 0;
assert( serial->obj.ops == &serial_ops );
if(IS_READY(serial->read_q))
events |= POLLIN;
if(IS_READY(serial->write_q))
events |= POLLOUT;
if(IS_READY(serial->wait_q))
events |= POLLIN;
if (serial->read_q) events |= POLLIN;
if (serial->write_q) events |= POLLOUT;
if (serial->wait_q) events |= POLLIN;
/* fprintf(stderr,"poll events are %04x\n",events); */
......@@ -225,39 +221,40 @@ static void serial_poll_event(struct fd *fd, int event)
/* fprintf(stderr,"Poll event %02x\n",event); */
if(IS_READY(serial->read_q) && (POLLIN & event) )
async_notify(serial->read_q.head,STATUS_ALERTED);
if (serial->read_q && (POLLIN & event) )
async_terminate( serial->read_q, STATUS_ALERTED );
if(IS_READY(serial->write_q) && (POLLOUT & event) )
async_notify(serial->write_q.head,STATUS_ALERTED);
if (serial->write_q && (POLLOUT & event) )
async_terminate( serial->write_q, STATUS_ALERTED );
if(IS_READY(serial->wait_q) && (POLLIN & event) )
async_notify(serial->wait_q.head,STATUS_ALERTED);
if (serial->wait_q && (POLLIN & event) )
async_terminate( serial->wait_q, STATUS_ALERTED );
set_fd_events( fd, serial_get_poll_events(fd) );
}
static void serial_queue_async(struct fd *fd, void *ptr, unsigned int status, int type, int count)
static void serial_queue_async( struct fd *fd, void *apc, void *user, void *iosb,
int type, int count )
{
struct serial *serial = get_fd_user( fd );
struct async_queue *q;
struct async *async;
struct async **head;
int timeout;
int events;
assert(serial->obj.ops == &serial_ops);
switch(type)
switch (type)
{
case ASYNC_TYPE_READ:
q = &serial->read_q;
head = &serial->read_q;
timeout = serial->readconst + serial->readmult*count;
break;
case ASYNC_TYPE_WAIT:
q = &serial->wait_q;
head = &serial->wait_q;
timeout = 0;
break;
case ASYNC_TYPE_WRITE:
q = &serial->write_q;
head = &serial->write_q;
timeout = serial->writeconst + serial->writemult*count;
break;
default:
......@@ -265,37 +262,29 @@ static void serial_queue_async(struct fd *fd, void *ptr, unsigned int status, in
return;
}
async = find_async ( q, current, ptr );
if (!create_async( fd, current, timeout, head, apc, user, iosb ))
return;
if ( status == STATUS_PENDING )
/* Check if the new pending request can be served immediately */
events = check_fd_events( fd, serial_get_poll_events( fd ) );
if (events)
{
int events;
if ( !async )
async = create_async ( &serial->obj, current, ptr );
if ( !async )
return;
/* serial_poll_event() calls set_select_events() */
serial_poll_event( fd, events );
return;
}
async->status = STATUS_PENDING;
if(!async->q)
{
async_add_timeout(async,timeout);
async_insert(q, async);
}
set_fd_events( fd, serial_get_poll_events( fd ) );
}
/* Check if the new pending request can be served immediately */
events = check_fd_events( fd, serial_get_poll_events( fd ) );
if (events)
{
/* serial_poll_event() calls set_select_events() */
serial_poll_event( fd, events );
return;
}
}
else if ( async ) destroy_async ( async );
else set_error ( STATUS_INVALID_PARAMETER );
static void serial_cancel_async( struct fd *fd )
{
struct serial *serial = get_fd_user( fd );
assert(serial->obj.ops == &serial_ops);
set_fd_events ( fd, serial_get_poll_events( fd ) );
async_terminate_queue( &serial->read_q, STATUS_CANCELLED );
async_terminate_queue( &serial->write_q, STATUS_CANCELLED );
async_terminate_queue( &serial->wait_q, STATUS_CANCELLED );
}
static int serial_flush( struct fd *fd, struct event **event )
......@@ -338,7 +327,7 @@ DECL_HANDLER(set_serial_info)
if ((serial = get_serial_obj( current->process, req->handle, 0 )))
{
/* timeouts */
if(req->flags & SERIALINFO_SET_TIMEOUTS)
if (req->flags & SERIALINFO_SET_TIMEOUTS)
{
serial->readinterval = req->readinterval;
serial->readconst = req->readconst;
......@@ -348,21 +337,17 @@ DECL_HANDLER(set_serial_info)
}
/* event mask */
if(req->flags & SERIALINFO_SET_MASK)
if (req->flags & SERIALINFO_SET_MASK)
{
serial->eventmask = req->eventmask;
if(!serial->eventmask)
if (!serial->eventmask)
{
while(serial->wait_q.head)
{
async_notify(serial->wait_q.head, STATUS_SUCCESS);
destroy_async(serial->wait_q.head);
}
async_terminate_queue( &serial->wait_q, STATUS_SUCCESS );
}
}
/* comm port error status */
if(req->flags & SERIALINFO_SET_ERROR)
if (req->flags & SERIALINFO_SET_ERROR)
{
serial->commerror = req->commerror;
}
......
......@@ -71,7 +71,8 @@ static const struct fd_ops handler_fd_ops =
handler_poll_event, /* poll_event */
no_flush, /* flush */
no_get_file_info, /* get_file_info */
no_queue_async /* queue_async */
no_queue_async, /* queue_async */
no_cancel_async /* cancel_async */
};
static struct handler *handler_sighup;
......
......@@ -100,7 +100,8 @@ static const struct fd_ops thread_fd_ops =
thread_poll_event, /* poll_event */
no_flush, /* flush */
no_get_file_info, /* get_file_info */
no_queue_async /* queue_async */
no_queue_async, /* queue_async */
no_cancel_async /* cancel_async */
};
static struct thread *first_thread;
......
......@@ -1948,9 +1948,15 @@ static void dump_register_async_request( const struct register_async_request *re
{
fprintf( stderr, " handle=%p,", req->handle );
fprintf( stderr, " type=%d,", req->type );
fprintf( stderr, " overlapped=%p,", req->overlapped );
fprintf( stderr, " count=%d,", req->count );
fprintf( stderr, " status=%08x", req->status );
fprintf( stderr, " io_apc=%p,", req->io_apc );
fprintf( stderr, " io_sb=%p,", req->io_sb );
fprintf( stderr, " io_user=%p,", req->io_user );
fprintf( stderr, " count=%d", req->count );
}
static void dump_cancel_async_request( const struct cancel_async_request *req )
{
fprintf( stderr, " handle=%p", req->handle );
}
static void dump_create_named_pipe_request( const struct create_named_pipe_request *req )
......@@ -2730,6 +2736,7 @@ static const dump_func req_dumpers[REQ_NB_REQUESTS] = {
(dump_func)dump_get_serial_info_request,
(dump_func)dump_set_serial_info_request,
(dump_func)dump_register_async_request,
(dump_func)dump_cancel_async_request,
(dump_func)dump_create_named_pipe_request,
(dump_func)dump_open_named_pipe_request,
(dump_func)dump_connect_named_pipe_request,
......@@ -2912,6 +2919,7 @@ static const dump_func reply_dumpers[REQ_NB_REQUESTS] = {
(dump_func)dump_get_serial_info_reply,
(dump_func)0,
(dump_func)0,
(dump_func)0,
(dump_func)dump_create_named_pipe_reply,
(dump_func)dump_open_named_pipe_reply,
(dump_func)0,
......@@ -3094,6 +3102,7 @@ static const char * const req_names[REQ_NB_REQUESTS] = {
"get_serial_info",
"set_serial_info",
"register_async",
"cancel_async",
"create_named_pipe",
"open_named_pipe",
"connect_named_pipe",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment