Commit 06c2982a authored by Rob Shearman's avatar Rob Shearman Committed by Alexandre Julliard

rpcrt4: Factor out non-Win32 compatible parts of the ncacn_ip_tcp client code to…

rpcrt4: Factor out non-Win32 compatible parts of the ncacn_ip_tcp client code to allow it to be compiled in.
parent e2aefee8
......@@ -40,6 +40,8 @@
# ifndef EAGAIN
# define EAGAIN WSAEWOULDBLOCK
# endif
# undef errno
# define errno WSAGetLastError()
#else
# include <errno.h>
# ifdef HAVE_UNISTD_H
......@@ -64,7 +66,14 @@
# ifdef HAVE_SYS_POLL_H
# include <sys/poll.h>
# endif
# ifdef HAVE_SYS_FILIO_H
# include <sys/filio.h>
# endif
# ifdef HAVE_SYS_IOCTL_H
# include <sys/ioctl.h>
# endif
# define closesocket close
# define ioctlsocket ioctl
#endif /* defined(__MINGW32__) || defined (_MSC_VER) */
#include "windef.h"
......@@ -899,8 +908,6 @@ static RPC_STATUS rpcrt4_ip_tcp_parse_top_of_tower(const unsigned char *tower_da
return RPC_S_OK;
}
#ifdef HAVE_SOCKETPAIR
typedef struct _RpcConnection_tcp
{
RpcConnection common;
......@@ -908,6 +915,97 @@ typedef struct _RpcConnection_tcp
int cancel_fds[2];
} RpcConnection_tcp;
#ifdef HAVE_SOCKETPAIR
static BOOL rpcrt4_sock_wait_init(RpcConnection_tcp *tcpc)
{
if (socketpair(PF_UNIX, SOCK_STREAM, 0, tcpc->cancel_fds) < 0)
{
ERR("socketpair() failed: %s\n", strerror(errno));
return FALSE;
}
return TRUE;
}
static BOOL rpcrt4_sock_wait_for_recv(RpcConnection_tcp *tcpc)
{
struct pollfd pfds[2];
pfds[0].fd = tcpc->sock;
pfds[0].events = POLLIN;
pfds[1].fd = tcpc->cancel_fds[0];
pfds[1].events = POLLIN;
if (poll(pfds, 2, -1 /* infinite */) == -1 && errno != EINTR)
{
ERR("poll() failed: %s\n", strerror(errno));
return FALSE;
}
if (pfds[1].revents & POLLIN) /* canceled */
{
char dummy;
read(pfds[1].fd, &dummy, sizeof(dummy));
return FALSE;
}
return TRUE;
}
static BOOL rpcrt4_sock_wait_for_send(RpcConnection_tcp *tcpc)
{
struct pollfd pfd;
pfd.fd = tcpc->sock;
pfd.events = POLLOUT;
if (poll(&pfd, 1, -1 /* infinite */) == -1 && errno != EINTR)
{
ERR("poll() failed: %s\n", strerror(errno));
return FALSE;
}
return TRUE;
}
static void rpcrt4_sock_wait_cancel(RpcConnection_tcp *tcpc)
{
char dummy = 1;
write(tcpc->cancel_fds[1], &dummy, 1);
}
static void rpcrt4_sock_wait_destroy(RpcConnection_tcp *tcpc)
{
close(tcpc->cancel_fds[0]);
close(tcpc->cancel_fds[1]);
}
#else /* HAVE_SOCKETPAIR */
static BOOL rpcrt4_sock_wait_init(RpcConnection_tcp *tcpc)
{
/* FIXME */
return FALSE;
}
static BOOL rpcrt4_sock_wait_for_recv(RpcConnection_tcp *tcpc)
{
/* FIXME */
return FALSE;
}
static BOOL rpcrt4_sock_wait_for_send(RpcConnection_tcp *tcpc)
{
/* FIXME */
return FALSE;
}
static void rpcrt4_sock_wait_cancel(RpcConnection_tcp *tcpc)
{
/* FIXME */
}
static void rpcrt4_sock_wait_destroy(RpcConnection_tcp *tcpc)
{
/* FIXME */
}
#endif
static RpcConnection *rpcrt4_conn_tcp_alloc(void)
{
RpcConnection_tcp *tcpc;
......@@ -915,9 +1013,8 @@ static RpcConnection *rpcrt4_conn_tcp_alloc(void)
if (tcpc == NULL)
return NULL;
tcpc->sock = -1;
if (socketpair(PF_UNIX, SOCK_STREAM, 0, tcpc->cancel_fds) < 0)
if (!rpcrt4_sock_wait_init(tcpc))
{
ERR("socketpair() failed: %s\n", strerror(errno));
HeapFree(GetProcessHeap(), 0, tcpc);
return NULL;
}
......@@ -958,6 +1055,7 @@ static RPC_STATUS rpcrt4_ncacn_ip_tcp_open(RpcConnection* Connection)
for (ai_cur = ai; ai_cur; ai_cur = ai_cur->ai_next)
{
int val;
u_long nonblocking;
if (ai_cur->ai_family != AF_INET && ai_cur->ai_family != AF_INET6)
{
......@@ -991,8 +1089,9 @@ static RPC_STATUS rpcrt4_ncacn_ip_tcp_open(RpcConnection* Connection)
/* RPC depends on having minimal latency so disable the Nagle algorithm */
val = 1;
setsockopt(sock, SOL_TCP, TCP_NODELAY, &val, sizeof(val));
fcntl(sock, F_SETFL, O_NONBLOCK); /* make socket nonblocking */
setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&val, sizeof(val));
nonblocking = 1;
ioctlsocket(sock, FIONBIO, &nonblocking);
tcpc->sock = sock;
......@@ -1006,6 +1105,8 @@ static RPC_STATUS rpcrt4_ncacn_ip_tcp_open(RpcConnection* Connection)
return RPC_S_SERVER_UNAVAILABLE;
}
#ifdef HAVE_SOCKETPAIR
static RPC_STATUS rpcrt4_protseq_ncacn_ip_tcp_open_endpoint(RpcServerProtseq *protseq, const char *endpoint)
{
RPC_STATUS status = RPC_S_CANT_CREATE_ENDPOINT;
......@@ -1044,6 +1145,7 @@ static RPC_STATUS rpcrt4_protseq_ncacn_ip_tcp_open_endpoint(RpcServerProtseq *pr
struct sockaddr_storage sa;
socklen_t sa_len;
char service[NI_MAXSERV];
u_long nonblocking;
if (ai_cur->ai_family != AF_INET && ai_cur->ai_family != AF_INET6)
{
......@@ -1121,7 +1223,8 @@ static RPC_STATUS rpcrt4_protseq_ncacn_ip_tcp_open_endpoint(RpcServerProtseq *pr
* race-condition (poll() says it is readable, connection drops,
* and accept() blocks until the next connection comes...)
*/
ret = fcntl(sock, F_SETFL, O_NONBLOCK);
nonblocking = 1;
ret = ioctlsocket(sock, FIONBIO, &nonblocking);
if (ret < 0)
{
WARN("couldn't make socket non-blocking, error %d\n", ret);
......@@ -1163,6 +1266,8 @@ static RPC_STATUS rpcrt4_protseq_ncacn_ip_tcp_open_endpoint(RpcServerProtseq *pr
return status;
}
#endif
static RPC_STATUS rpcrt4_conn_tcp_handoff(RpcConnection *old_conn, RpcConnection *new_conn)
{
int ret;
......@@ -1170,6 +1275,7 @@ static RPC_STATUS rpcrt4_conn_tcp_handoff(RpcConnection *old_conn, RpcConnection
socklen_t addrsize;
RpcConnection_tcp *server = (RpcConnection_tcp*) old_conn;
RpcConnection_tcp *client = (RpcConnection_tcp*) new_conn;
u_long nonblocking;
addrsize = sizeof(address);
ret = accept(server->sock, (struct sockaddr*) &address, &addrsize);
......@@ -1178,8 +1284,8 @@ static RPC_STATUS rpcrt4_conn_tcp_handoff(RpcConnection *old_conn, RpcConnection
ERR("Failed to accept a TCP connection: error %d\n", ret);
return RPC_S_OUT_OF_RESOURCES;
}
/* reset to blocking behaviour */
fcntl(ret, F_SETFL, 0);
nonblocking = 1;
ioctlsocket(ret, FIONBIO, &nonblocking);
client->sock = ret;
TRACE("Accepted a new TCP connection\n");
return RPC_S_OK;
......@@ -1204,23 +1310,9 @@ static int rpcrt4_conn_tcp_read(RpcConnection *Connection,
}
else
{
struct pollfd pfds[2];
pfds[0].fd = tcpc->sock;
pfds[0].events = POLLIN;
pfds[1].fd = tcpc->cancel_fds[0];
pfds[1].events = POLLIN;
if (poll(pfds, 2, -1 /* infinite */) == -1 && errno != EINTR)
{
ERR("poll() failed: %s\n", strerror(errno));
return -1;
}
if (pfds[1].revents & POLLIN) /* canceled */
{
char dummy;
read(pfds[1].fd, &dummy, sizeof(dummy));
if (!rpcrt4_sock_wait_for_recv(tcpc))
return -1;
}
}
} while (bytes_read != count);
TRACE("%d %p %u -> %d\n", tcpc->sock, buffer, count, bytes_read);
return bytes_read;
......@@ -1240,15 +1332,9 @@ static int rpcrt4_conn_tcp_write(RpcConnection *Connection,
return -1;
else
{
struct pollfd pfd;
pfd.fd = tcpc->sock;
pfd.events = POLLOUT;
if (poll(&pfd, 1, -1 /* infinite */) == -1 && errno != EINTR)
{
ERR("poll() failed: %s\n", strerror(errno));
if (!rpcrt4_sock_wait_for_send(tcpc))
return -1;
}
}
} while (bytes_written != count);
TRACE("%d %p %u -> %d\n", tcpc->sock, buffer, count, bytes_written);
return bytes_written;
......@@ -1263,44 +1349,25 @@ static int rpcrt4_conn_tcp_close(RpcConnection *Connection)
if (tcpc->sock != -1)
closesocket(tcpc->sock);
tcpc->sock = -1;
close(tcpc->cancel_fds[0]);
close(tcpc->cancel_fds[1]);
rpcrt4_sock_wait_destroy(tcpc);
return 0;
}
static void rpcrt4_conn_tcp_cancel_call(RpcConnection *Connection)
{
RpcConnection_tcp *tcpc = (RpcConnection_tcp *) Connection;
char dummy = 1;
TRACE("%p\n", Connection);
write(tcpc->cancel_fds[1], &dummy, 1);
rpcrt4_sock_wait_cancel(tcpc);
}
static int rpcrt4_conn_tcp_wait_for_incoming_data(RpcConnection *Connection)
{
RpcConnection_tcp *tcpc = (RpcConnection_tcp *) Connection;
struct pollfd pfds[2];
TRACE("%p\n", Connection);
pfds[0].fd = tcpc->sock;
pfds[0].events = POLLIN;
pfds[1].fd = tcpc->cancel_fds[0];
pfds[1].events = POLLIN;
if (poll(pfds, 2, -1 /* infinite */) == -1 && errno != EINTR)
{
ERR("poll() failed: %s\n", strerror(errno));
if (!rpcrt4_sock_wait_for_recv(tcpc))
return -1;
}
if (pfds[1].revents & POLLIN) /* canceled */
{
char dummy;
read(pfds[1].fd, &dummy, sizeof(dummy));
return -1;
}
return 0;
}
......@@ -1312,6 +1379,8 @@ static size_t rpcrt4_ncacn_ip_tcp_get_top_of_tower(unsigned char *tower_data,
EPM_PROTOCOL_TCP, endpoint);
}
#ifdef HAVE_SOCKETPAIR
typedef struct _RpcServerProtseq_sock
{
RpcServerProtseq common;
......@@ -1451,6 +1520,8 @@ static int rpcrt4_protseq_sock_wait_for_new_connection(RpcServerProtseq *protseq
return 1;
}
#endif /* HAVE_SOCKETPAIR */
static RPC_STATUS rpcrt4_ncacn_ip_tcp_parse_top_of_tower(const unsigned char *tower_data,
size_t tower_size,
char **networkaddr,
......@@ -1461,8 +1532,6 @@ static RPC_STATUS rpcrt4_ncacn_ip_tcp_parse_top_of_tower(const unsigned char *to
endpoint);
}
#endif /* HAVE_SOCKETPAIR */
/**** ncacn_http support ****/
/* 60 seconds is the period native uses */
......@@ -2431,7 +2500,6 @@ static const struct connection_ops conn_protseq_list[] = {
rpcrt4_ncalrpc_parse_top_of_tower,
NULL,
},
#ifdef HAVE_SOCKETPAIR
{ "ncacn_ip_tcp",
{ EPM_PROTOCOL_NCACN, EPM_PROTOCOL_TCP },
rpcrt4_conn_tcp_alloc,
......@@ -2446,7 +2514,6 @@ static const struct connection_ops conn_protseq_list[] = {
rpcrt4_ncacn_ip_tcp_parse_top_of_tower,
NULL,
},
#endif
{ "ncacn_http",
{ EPM_PROTOCOL_NCACN, EPM_PROTOCOL_HTTP },
rpcrt4_ncacn_http_alloc,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment