/* * Server-side pipe management * * Copyright (C) 1998 Alexandre Julliard * Copyright (C) 2001 Mike McCormack * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * * TODO: * message mode */ #include "config.h" #include "wine/port.h" #include <assert.h> #include <fcntl.h> #include <string.h> #include <stdio.h> #include <stdlib.h> #include <sys/time.h> #include <sys/types.h> #ifdef HAVE_SYS_SOCKET_H #include <sys/socket.h> #endif #include <time.h> #include <unistd.h> #include "winbase.h" #include "file.h" #include "handle.h" #include "thread.h" #include "request.h" enum pipe_state { ps_none, ps_idle_server, ps_wait_open, ps_connected_server, ps_wait_disconnect, ps_disconnected_server, ps_wait_connect }; struct wait_info { struct thread *thread; void *func; void *overlapped; }; struct named_pipe; struct pipe_server { struct object obj; struct fd *fd; enum pipe_state state; struct pipe_client *client; struct named_pipe *pipe; struct pipe_server *next; struct pipe_server *prev; struct timeout_user *flush_poll; struct event *event; struct wait_info wait; }; struct pipe_client { struct object obj; struct fd *fd; struct pipe_server *server; struct wait_info wait; }; struct connect_wait { struct wait_info wait; struct connect_wait *next; }; struct named_pipe { struct object obj; /* object header */ unsigned int pipemode; unsigned int maxinstances; unsigned int outsize; unsigned int insize; unsigned int timeout; unsigned int instances; struct pipe_server *servers; struct connect_wait *connect_waiters; }; static void named_pipe_dump( struct object *obj, int verbose ); static void named_pipe_destroy( struct object *obj ); static const struct object_ops named_pipe_ops = { sizeof(struct named_pipe), /* size */ named_pipe_dump, /* dump */ no_add_queue, /* add_queue */ NULL, /* remove_queue */ NULL, /* signaled */ NULL, /* satisfied */ no_get_fd, /* get_fd */ named_pipe_destroy /* destroy */ }; /* common to clients and servers */ static int pipe_end_get_poll_events( struct fd *fd ); static int pipe_end_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags ); /* server end functions */ static void pipe_server_dump( struct object *obj, int verbose ); static struct fd *pipe_server_get_fd( struct object *obj ); static void pipe_server_destroy( struct object *obj); static int pipe_server_flush( struct fd *fd, struct event **event ); static const struct object_ops pipe_server_ops = { sizeof(struct pipe_server), /* size */ pipe_server_dump, /* dump */ default_fd_add_queue, /* add_queue */ default_fd_remove_queue, /* remove_queue */ default_fd_signaled, /* signaled */ no_satisfied, /* satisfied */ pipe_server_get_fd, /* get_fd */ pipe_server_destroy /* destroy */ }; static const struct fd_ops pipe_server_fd_ops = { pipe_end_get_poll_events, /* get_poll_events */ default_poll_event, /* poll_event */ pipe_server_flush, /* flush */ pipe_end_get_info, /* get_file_info */ no_queue_async /* queue_async */ }; /* client end functions */ static void pipe_client_dump( struct object *obj, int verbose ); static struct fd *pipe_client_get_fd( struct object *obj ); static void pipe_client_destroy( struct object *obj ); static int pipe_client_flush( struct fd *fd, struct event **event ); static const struct object_ops pipe_client_ops = { sizeof(struct pipe_client), /* size */ pipe_client_dump, /* dump */ default_fd_add_queue, /* add_queue */ default_fd_remove_queue, /* remove_queue */ default_fd_signaled, /* signaled */ no_satisfied, /* satisfied */ pipe_client_get_fd, /* get_fd */ pipe_client_destroy /* destroy */ }; static const struct fd_ops pipe_client_fd_ops = { pipe_end_get_poll_events, /* get_poll_events */ default_poll_event, /* poll_event */ pipe_client_flush, /* flush */ pipe_end_get_info, /* get_file_info */ no_queue_async /* queue_async */ }; static void named_pipe_dump( struct object *obj, int verbose ) { struct named_pipe *pipe = (struct named_pipe *) obj; assert( obj->ops == &named_pipe_ops ); fprintf( stderr, "named pipe %p\n" ,pipe); } static void pipe_server_dump( struct object *obj, int verbose ) { struct pipe_server *server = (struct pipe_server *) obj; assert( obj->ops == &pipe_server_ops ); fprintf( stderr, "named pipe server %p (state %d)\n", server, server->state ); } static void pipe_client_dump( struct object *obj, int verbose ) { struct pipe_client *client = (struct pipe_client *) obj; assert( obj->ops == &pipe_server_ops ); fprintf( stderr, "named pipe client %p (server state %d)\n", client, client->server->state ); } static void named_pipe_destroy( struct object *obj) { struct named_pipe *pipe = (struct named_pipe *) obj; assert( !pipe->servers ); assert( !pipe->instances ); } static void notify_waiter( struct wait_info *wait, unsigned int status ) { if( wait->thread && wait->func && wait->overlapped ) { /* queue a system APC, to notify a waiting thread */ thread_queue_apc( wait->thread, NULL, wait->func, APC_ASYNC, 1, wait->overlapped, (void *)status, NULL ); } if( wait->thread ) release_object( wait->thread ); wait->thread = NULL; } static void set_waiter( struct wait_info *wait, void *func, void *ov ) { wait->thread = (struct thread *) grab_object( current ); wait->func = func; wait->overlapped = ov; } static void notify_connect_waiters( struct named_pipe *pipe ) { struct connect_wait *cw, **x = &pipe->connect_waiters; while( *x ) { cw = *x; notify_waiter( &cw->wait, STATUS_SUCCESS ); release_object( pipe ); *x = cw->next; free( cw ); } } static void queue_connect_waiter( struct named_pipe *pipe, void *func, void *overlapped ) { struct connect_wait *waiter; waiter = mem_alloc( sizeof(*waiter) ); if( waiter ) { set_waiter( &waiter->wait, func, overlapped ); waiter->next = pipe->connect_waiters; pipe->connect_waiters = waiter; grab_object( pipe ); } } static struct fd *pipe_client_get_fd( struct object *obj ) { struct pipe_client *client = (struct pipe_client *) obj; if( client->fd ) return (struct fd *) grab_object( client->fd ); set_error( STATUS_PIPE_DISCONNECTED ); return NULL; } static struct fd *pipe_server_get_fd( struct object *obj ) { struct pipe_server *server = (struct pipe_server *) obj; switch(server->state) { case ps_connected_server: case ps_wait_disconnect: assert( server->fd ); return (struct fd *) grab_object( server->fd ); case ps_wait_open: case ps_idle_server: set_error( STATUS_PIPE_LISTENING ); break; case ps_disconnected_server: case ps_wait_connect: set_error( STATUS_PIPE_DISCONNECTED ); break; default: assert( 0 ); } return NULL; } static void notify_empty( struct pipe_server *server ) { if( !server->flush_poll ) return; assert( server->state == ps_connected_server ); assert( server->event ); remove_timeout_user( server->flush_poll ); server->flush_poll = NULL; set_event( server->event ); release_object( server->event ); server->event = NULL; } static void do_disconnect( struct pipe_server *server ) { /* we may only have a server fd, if the client disconnected */ if( server->client ) { assert( server->client->server == server ); assert( server->client->fd ); release_object( server->client->fd ); server->client->fd = NULL; } assert( server->fd ); release_object( server->fd ); server->fd = NULL; } static void pipe_server_destroy( struct object *obj) { struct pipe_server *server = (struct pipe_server *)obj; assert( obj->ops == &pipe_server_ops ); if( server->fd ) { notify_empty( server ); do_disconnect( server ); } if( server->client ) { server->client->server = NULL; server->client = NULL; } notify_waiter( &server->wait, STATUS_HANDLES_CLOSED ); assert( server->pipe->instances ); server->pipe->instances--; /* remove server from pipe's server list */ if( server->next ) server->next->prev = server->prev; if( server->prev ) server->prev->next = server->next; else server->pipe->servers = server->next; release_object( server->pipe ); } static void pipe_client_destroy( struct object *obj) { struct pipe_client *client = (struct pipe_client *)obj; struct pipe_server *server = client->server; assert( obj->ops == &pipe_client_ops ); notify_waiter( &client->wait, STATUS_HANDLES_CLOSED ); if( server ) { notify_empty( server ); switch( server->state ) { case ps_connected_server: /* Don't destroy the server's fd here as we can't do a successful flush without it. */ server->state = ps_wait_disconnect; release_object( client->fd ); client->fd = NULL; break; case ps_disconnected_server: server->state = ps_wait_connect; break; default: assert( 0 ); } assert( server->client ); server->client = NULL; client->server = NULL; } assert( !client->fd ); } static int pipe_end_get_poll_events( struct fd *fd ) { return POLLIN | POLLOUT; /* FIXME */ } static int pipe_data_remaining( struct pipe_server *server ) { struct pollfd pfd; int fd; assert( server->client ); fd = get_unix_fd( server->client->fd ); if( fd < 0 ) return 0; pfd.fd = fd; pfd.events = POLLIN; pfd.revents = 0; if( 0 > poll( &pfd, 1, 0 ) ) return 0; return pfd.revents&POLLIN; } static void check_flushed( void *arg ) { struct pipe_server *server = (struct pipe_server*) arg; assert( server->event ); if( pipe_data_remaining( server ) ) { struct timeval tv; gettimeofday( &tv, 0 ); add_timeout( &tv, 100 ); server->flush_poll = add_timeout_user( &tv, check_flushed, server ); } else notify_empty( server ); } static int pipe_server_flush( struct fd *fd, struct event **event ) { struct pipe_server *server = get_fd_user( fd ); if( !server ) return 0; if( server->state != ps_connected_server ) return 0; /* FIXME: if multiple threads flush the same pipe, maybe should create a list of processes to notify */ if( server->flush_poll ) return 0; if( pipe_data_remaining( server ) ) { struct timeval tv; /* this kind of sux - there's no unix way to be alerted when a pipe becomes empty */ server->event = create_event( NULL, 0, 0, 0 ); if( !server->event ) return 0; gettimeofday( &tv, 0 ); add_timeout( &tv, 100 ); server->flush_poll = add_timeout_user( &tv, check_flushed, server ); *event = server->event; } return 0; } static int pipe_client_flush( struct fd *fd, struct event **event ) { /* FIXME: what do we have to do for this? */ return 0; } static int pipe_end_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags ) { if (reply) { reply->type = FILE_TYPE_PIPE; reply->attr = 0; reply->access_time = 0; reply->write_time = 0; reply->size_high = 0; reply->size_low = 0; reply->links = 0; reply->index_high = 0; reply->index_low = 0; reply->serial = 0; } *flags = 0; return FD_TYPE_DEFAULT; } static struct named_pipe *create_named_pipe( const WCHAR *name, size_t len ) { struct named_pipe *pipe; pipe = create_named_object( sync_namespace, &named_pipe_ops, name, len ); if( pipe ) { if( get_error() != STATUS_OBJECT_NAME_COLLISION ) { /* initialize it if it didn't already exist */ pipe->servers = 0; pipe->instances = 0; pipe->connect_waiters = NULL; } } return pipe; } static struct named_pipe *open_named_pipe( const WCHAR *name, size_t len ) { struct object *obj; if ((obj = find_object( sync_namespace, name, len ))) { if (obj->ops == &named_pipe_ops) return (struct named_pipe *)obj; release_object( obj ); set_error( STATUS_OBJECT_TYPE_MISMATCH ); } else set_error( STATUS_OBJECT_NAME_NOT_FOUND ); return NULL; } static struct pipe_server *get_pipe_server_obj( struct process *process, obj_handle_t handle, unsigned int access ) { struct object *obj; obj = get_handle_obj( process, handle, access, &pipe_server_ops ); return (struct pipe_server *) obj; } static struct pipe_server *create_pipe_server( struct named_pipe *pipe ) { struct pipe_server *server; server = alloc_object( &pipe_server_ops ); if( !server ) return NULL; server->fd = NULL; server->pipe = pipe; server->state = ps_none; server->client = NULL; server->flush_poll = NULL; server->wait.thread = NULL; /* add to list of pipe servers */ if ((server->next = pipe->servers)) server->next->prev = server; server->prev = NULL; pipe->servers = server; grab_object( pipe ); return server; } static struct pipe_client *create_pipe_client( struct pipe_server *server ) { struct pipe_client *client; client = alloc_object( &pipe_client_ops ); if( !client ) return NULL; client->fd = NULL; client->server = server; client->wait.thread = NULL; return client; } static struct pipe_server *find_server( struct named_pipe *pipe, enum pipe_state state ) { struct pipe_server *x; for( x = pipe->servers; x; x = x->next ) if( x->state == state ) break; if( !x ) return NULL; return (struct pipe_server *) grab_object( x ); } DECL_HANDLER(create_named_pipe) { struct named_pipe *pipe; struct pipe_server *server; reply->handle = 0; pipe = create_named_pipe( get_req_data(), get_req_data_size() ); if( !pipe ) return; if( get_error() != STATUS_OBJECT_NAME_COLLISION ) { pipe->insize = req->insize; pipe->outsize = req->outsize; pipe->maxinstances = req->maxinstances; pipe->timeout = req->timeout; pipe->pipemode = req->pipemode; } else { set_error( 0 ); /* clear the name collision */ if( pipe->maxinstances <= pipe->instances ) { set_error( STATUS_PIPE_BUSY ); release_object( pipe ); return; } if( ( pipe->maxinstances != req->maxinstances ) || ( pipe->timeout != req->timeout ) || ( pipe->pipemode != req->pipemode ) ) { set_error( STATUS_ACCESS_DENIED ); release_object( pipe ); return; } } server = create_pipe_server( pipe ); if(server) { server->state = ps_idle_server; reply->handle = alloc_handle( current->process, server, GENERIC_READ|GENERIC_WRITE, 0 ); server->pipe->instances++; release_object( server ); } release_object( pipe ); } DECL_HANDLER(open_named_pipe) { struct pipe_server *server; struct pipe_client *client; struct named_pipe *pipe; int fds[2]; reply->handle = 0; pipe = open_named_pipe( get_req_data(), get_req_data_size() ); if ( !pipe ) { set_error( STATUS_NO_SUCH_FILE ); return; } for( server = pipe->servers; server; server = server->next ) if( ( server->state==ps_idle_server ) || ( server->state==ps_wait_open ) ) break; release_object( pipe ); if ( !server ) { set_error( STATUS_PIPE_NOT_AVAILABLE ); return; } client = create_pipe_client( server ); if( client ) { if( !socketpair( PF_UNIX, SOCK_STREAM, 0, fds ) ) { assert( !client->fd ); assert( !server->fd ); client->fd = create_anonymous_fd( &pipe_server_fd_ops, fds[1], &client->obj ); server->fd = create_anonymous_fd( &pipe_server_fd_ops, fds[0], &server->obj ); if (client->fd && server->fd) { if( server->state == ps_wait_open ) notify_waiter( &server->wait, STATUS_SUCCESS ); assert( !server->wait.thread ); server->state = ps_connected_server; server->client = client; client->server = server; reply->handle = alloc_handle( current->process, client, req->access, req->inherit ); } } else file_set_error(); release_object( client ); } } DECL_HANDLER(connect_named_pipe) { struct pipe_server *server; server = get_pipe_server_obj(current->process, req->handle, 0); if(!server) return; switch( server->state ) { case ps_idle_server: case ps_wait_connect: assert( !server->fd ); server->state = ps_wait_open; set_waiter( &server->wait, req->func, req->overlapped ); notify_connect_waiters( server->pipe ); break; case ps_connected_server: assert( server->fd ); set_error( STATUS_PIPE_CONNECTED ); break; case ps_disconnected_server: set_error( STATUS_PIPE_BUSY ); break; case ps_wait_disconnect: set_error( STATUS_NO_DATA_DETECTED ); break; default: set_error( STATUS_INVALID_HANDLE ); break; } release_object(server); } DECL_HANDLER(wait_named_pipe) { struct named_pipe *pipe; struct pipe_server *server; if (!(pipe = open_named_pipe( get_req_data(), get_req_data_size() ))) { set_error( STATUS_PIPE_NOT_AVAILABLE ); return; } server = find_server( pipe, ps_wait_open ); if( server ) { /* there's already a server waiting for a client to connect */ struct wait_info wait; set_waiter( &wait, req->func, req->overlapped ); notify_waiter( &wait, STATUS_SUCCESS ); release_object( server ); } else queue_connect_waiter( pipe, req->func, req->overlapped ); release_object( pipe ); } DECL_HANDLER(disconnect_named_pipe) { struct pipe_server *server; reply->fd = -1; server = get_pipe_server_obj( current->process, req->handle, 0 ); if( !server ) return; switch( server->state ) { case ps_connected_server: assert( server->fd ); assert( server->client ); assert( server->client->fd ); notify_empty( server ); notify_waiter( &server->client->wait, STATUS_PIPE_DISCONNECTED ); /* Dump the client and server fds, but keep the pointers around - client loses all waiting data */ server->state = ps_disconnected_server; do_disconnect( server ); reply->fd = flush_cached_fd( current->process, req->handle ); break; case ps_wait_disconnect: assert( !server->client ); assert( server->fd ); do_disconnect( server ); server->state = ps_wait_connect; reply->fd = flush_cached_fd( current->process, req->handle ); break; default: set_error( STATUS_PIPE_DISCONNECTED ); } release_object( server ); } DECL_HANDLER(get_named_pipe_info) { struct pipe_server *server; server = get_pipe_server_obj( current->process, req->handle, 0 ); if(!server) return; reply->flags = server->pipe->pipemode; reply->maxinstances = server->pipe->maxinstances; reply->insize = server->pipe->insize; reply->outsize = server->pipe->outsize; release_object(server); }