Commit d4139833 authored by Jacek Caban's avatar Jacek Caban Committed by Alexandre Julliard

server Introduce read queue for server-side named pipe I/O.

parent 2c392176
......@@ -474,6 +474,15 @@ struct iosb *async_get_iosb( struct async *async )
return async->iosb ? (struct iosb *)grab_object( async->iosb ) : NULL;
}
/* find the first pending async in queue */
struct async *find_pending_async( struct async_queue *queue )
{
struct async *async;
if (queue) LIST_FOR_EACH_ENTRY( async, &queue->queue, struct async, queue_entry )
if (async->status == STATUS_PENDING) return (struct async *)grab_object( async );
return NULL;
}
/* cancels all async I/O */
DECL_HANDLER(cancel_async)
{
......
......@@ -188,6 +188,7 @@ extern struct completion *fd_get_completion( struct fd *fd, apc_param_t *p_key )
extern void fd_copy_completion( struct fd *src, struct fd *dst );
extern struct iosb *create_iosb( const void *in_data, data_size_t in_size, data_size_t out_size );
extern struct iosb *async_get_iosb( struct async *async );
extern struct async *find_pending_async( struct async_queue *queue );
extern void cancel_process_asyncs( struct process *process );
/* access rights that require Unix read permission */
......
......@@ -81,6 +81,7 @@ struct pipe_end
struct pipe_end *connection; /* the other end of the pipe */
data_size_t buffer_size;/* size of buffered data that doesn't block caller */
struct list message_queue;
struct async_queue *read_q; /* read queue */
struct async_queue *write_q; /* write queue */
};
......@@ -425,6 +426,7 @@ static void pipe_end_disconnect( struct pipe_end *pipe_end, unsigned int status
struct pipe_message *message, *next;
struct async *async;
if (pipe_end->fd) fd_async_wake_up( pipe_end->fd, ASYNC_TYPE_WAIT, status );
async_wake_up( pipe_end->read_q, status );
LIST_FOR_EACH_ENTRY_SAFE( message, next, &pipe_end->message_queue, struct pipe_message, entry )
{
async = message->async;
......@@ -470,6 +472,7 @@ static void pipe_end_destroy( struct pipe_end *pipe_end )
free_message( message );
}
free_async_queue( pipe_end->read_q );
free_async_queue( pipe_end->write_q );
}
......@@ -681,10 +684,89 @@ static obj_handle_t pipe_client_flush( struct fd *fd, struct async *async, int b
return 0;
}
static void message_queue_read( struct pipe_end *pipe_end, struct iosb *iosb )
{
struct pipe_message *message;
data_size_t avail = 0;
LIST_FOR_EACH_ENTRY( message, &pipe_end->message_queue, struct pipe_message, entry )
{
avail += message->iosb->in_size - message->read_pos;
if (avail >= iosb->out_size) break;
}
iosb->out_size = min( iosb->out_size, avail );
iosb->status = STATUS_SUCCESS;
message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry );
if (!message->read_pos && message->iosb->in_size == iosb->out_size) /* fast path */
{
iosb->out_data = message->iosb->in_data;
message->iosb->in_data = NULL;
wake_message( message );
free_message( message );
}
else
{
data_size_t write_pos = 0, writing;
char *buf = NULL;
if (iosb->out_size && !(buf = iosb->out_data = malloc( iosb->out_size )))
{
iosb->out_size = 0;
iosb->status = STATUS_NO_MEMORY;
return;
}
do
{
message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry );
writing = min( iosb->out_size - write_pos, message->iosb->in_size - message->read_pos );
if (writing) memcpy( buf + write_pos, (const char *)message->iosb->in_data + message->read_pos, writing );
write_pos += writing;
message->read_pos += writing;
if (message->read_pos == message->iosb->in_size)
{
wake_message(message);
free_message(message);
}
} while (write_pos < iosb->out_size);
}
iosb->result = iosb->out_size;
}
/* We call async_terminate in our reselect implementation, which causes recursive reselect.
* We're not interested in such reselect calls, so we ignore them. */
static int ignore_reselect;
static void reselect_write_queue( struct pipe_end *pipe_end );
static void reselect_read_queue( struct pipe_end *pipe_end )
{
struct async *async;
struct iosb *iosb;
int read_done = 0;
ignore_reselect = 1;
while (!list_empty( &pipe_end->message_queue) && (async = find_pending_async( pipe_end->read_q )))
{
iosb = async_get_iosb( async );
message_queue_read( pipe_end, iosb );
async_terminate( async, iosb->result ? STATUS_ALERTED : iosb->status );
release_object( async );
release_object( iosb );
read_done = 1;
}
ignore_reselect = 0;
if (pipe_end->connection)
{
if (list_empty( &pipe_end->message_queue ))
fd_async_wake_up( pipe_end->connection->fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS );
else if (read_done)
reselect_write_queue( pipe_end->connection );
}
}
static void reselect_write_queue( struct pipe_end *pipe_end )
{
struct pipe_message *message, *next;
......@@ -777,6 +859,8 @@ static void pipe_end_reselect_async( struct fd *fd, struct async_queue *queue )
default_fd_reselect_async( fd, queue );
else if (pipe_end->write_q && pipe_end->write_q == queue)
reselect_write_queue( pipe_end );
else if (pipe_end->read_q && pipe_end->read_q == queue)
reselect_read_queue( pipe_end );
}
static inline int is_overlapped( unsigned int options )
......@@ -937,6 +1021,7 @@ static void init_pipe_end( struct pipe_end *pipe_end, unsigned int pipe_flags, d
pipe_end->flags = pipe_flags;
pipe_end->connection = NULL;
pipe_end->buffer_size = buffer_size;
pipe_end->read_q = NULL;
pipe_end->write_q = NULL;
list_init( &pipe_end->message_queue );
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment