Commit c2feafb9 authored by Bernhard Loos's avatar Bernhard Loos Committed by Alexandre Julliard

rpcrt4: Switch to non-overlapped named pipe operations and use a thread for the…

rpcrt4: Switch to non-overlapped named pipe operations and use a thread for the async listen operation.
parent d905ba5a
...@@ -111,8 +111,7 @@ typedef struct _RpcConnection_np ...@@ -111,8 +111,7 @@ typedef struct _RpcConnection_np
{ {
RpcConnection common; RpcConnection common;
HANDLE pipe; HANDLE pipe;
OVERLAPPED read_ovl; HANDLE listen_thread;
OVERLAPPED write_ovl;
BOOL listening; BOOL listening;
} RpcConnection_np; } RpcConnection_np;
...@@ -122,25 +121,21 @@ static RpcConnection *rpcrt4_conn_np_alloc(void) ...@@ -122,25 +121,21 @@ static RpcConnection *rpcrt4_conn_np_alloc(void)
return &npc->common; return &npc->common;
} }
static RPC_STATUS rpcrt4_conn_listen_pipe(RpcConnection_np *npc) static DWORD CALLBACK listen_thread(void *arg)
{ {
if (npc->listening) RpcConnection_np *npc = arg;
return RPC_S_OK;
npc->listening = TRUE;
for (;;) for (;;)
{ {
if (ConnectNamedPipe(npc->pipe, &npc->read_ovl)) if (ConnectNamedPipe(npc->pipe, NULL))
return RPC_S_OK; return RPC_S_OK;
switch(GetLastError()) switch(GetLastError())
{ {
case ERROR_PIPE_CONNECTED: case ERROR_PIPE_CONNECTED:
SetEvent(npc->read_ovl.hEvent);
return RPC_S_OK;
case ERROR_IO_PENDING:
/* will be completed in rpcrt4_protseq_np_wait_for_new_connection */
return RPC_S_OK; return RPC_S_OK;
case ERROR_HANDLES_CLOSED:
/* connection closed during listen */
return RPC_S_NO_CONTEXT_AVAILABLE;
case ERROR_NO_DATA_DETECTED: case ERROR_NO_DATA_DETECTED:
/* client has disconnected, retry */ /* client has disconnected, retry */
DisconnectNamedPipe( npc->pipe ); DisconnectNamedPipe( npc->pipe );
...@@ -153,12 +148,28 @@ static RPC_STATUS rpcrt4_conn_listen_pipe(RpcConnection_np *npc) ...@@ -153,12 +148,28 @@ static RPC_STATUS rpcrt4_conn_listen_pipe(RpcConnection_np *npc)
} }
} }
static RPC_STATUS rpcrt4_conn_listen_pipe(RpcConnection_np *npc)
{
if (npc->listening)
return RPC_S_OK;
npc->listening = TRUE;
npc->listen_thread = CreateThread(NULL, 0, listen_thread, npc, 0, NULL);
if (!npc->listen_thread)
{
npc->listening = FALSE;
ERR("Couldn't create listen thread (error was %d)\n", GetLastError());
return RPC_S_OUT_OF_RESOURCES;
}
return RPC_S_OK;
}
static RPC_STATUS rpcrt4_conn_create_pipe(RpcConnection *Connection, LPCSTR pname) static RPC_STATUS rpcrt4_conn_create_pipe(RpcConnection *Connection, LPCSTR pname)
{ {
RpcConnection_np *npc = (RpcConnection_np *) Connection; RpcConnection_np *npc = (RpcConnection_np *) Connection;
TRACE("listening on %s\n", pname); TRACE("listening on %s\n", pname);
npc->pipe = CreateNamedPipeA(pname, PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, npc->pipe = CreateNamedPipeA(pname, PIPE_ACCESS_DUPLEX,
PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE, PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE,
PIPE_UNLIMITED_INSTANCES, PIPE_UNLIMITED_INSTANCES,
RPC_MAX_PACKET_SIZE, RPC_MAX_PACKET_SIZE, 5000, NULL); RPC_MAX_PACKET_SIZE, RPC_MAX_PACKET_SIZE, 5000, NULL);
...@@ -170,9 +181,6 @@ static RPC_STATUS rpcrt4_conn_create_pipe(RpcConnection *Connection, LPCSTR pnam ...@@ -170,9 +181,6 @@ static RPC_STATUS rpcrt4_conn_create_pipe(RpcConnection *Connection, LPCSTR pnam
return RPC_S_CANT_CREATE_ENDPOINT; return RPC_S_CANT_CREATE_ENDPOINT;
} }
npc->read_ovl.hEvent = CreateEventW(NULL, TRUE, FALSE, NULL);
npc->write_ovl.hEvent = CreateEventW(NULL, TRUE, FALSE, NULL);
/* Note: we don't call ConnectNamedPipe here because it must be done in the /* Note: we don't call ConnectNamedPipe here because it must be done in the
* server thread as the thread must be alertable */ * server thread as the thread must be alertable */
return RPC_S_OK; return RPC_S_OK;
...@@ -231,8 +239,6 @@ static RPC_STATUS rpcrt4_conn_open_pipe(RpcConnection *Connection, LPCSTR pname, ...@@ -231,8 +239,6 @@ static RPC_STATUS rpcrt4_conn_open_pipe(RpcConnection *Connection, LPCSTR pname,
/* pipe is connected; change to message-read mode. */ /* pipe is connected; change to message-read mode. */
dwMode = PIPE_READMODE_MESSAGE; dwMode = PIPE_READMODE_MESSAGE;
SetNamedPipeHandleState(pipe, &dwMode, NULL, NULL); SetNamedPipeHandleState(pipe, &dwMode, NULL, NULL);
npc->read_ovl.hEvent = CreateEventW(NULL, TRUE, FALSE, NULL);
npc->write_ovl.hEvent = CreateEventW(NULL, TRUE, FALSE, NULL);
npc->pipe = pipe; npc->pipe = pipe;
return RPC_S_OK; return RPC_S_OK;
...@@ -360,11 +366,9 @@ static void rpcrt4_conn_np_handoff(RpcConnection_np *old_npc, RpcConnection_np * ...@@ -360,11 +366,9 @@ static void rpcrt4_conn_np_handoff(RpcConnection_np *old_npc, RpcConnection_np *
* to the child, then reopen the server binding to continue listening */ * to the child, then reopen the server binding to continue listening */
new_npc->pipe = old_npc->pipe; new_npc->pipe = old_npc->pipe;
new_npc->read_ovl = old_npc->read_ovl; new_npc->listen_thread = old_npc->listen_thread;
new_npc->write_ovl = old_npc->write_ovl;
old_npc->pipe = 0; old_npc->pipe = 0;
memset(&old_npc->read_ovl, 0, sizeof(old_npc->read_ovl)); old_npc->listen_thread = 0;
memset(&old_npc->write_ovl, 0, sizeof(old_npc->write_ovl));
old_npc->listening = FALSE; old_npc->listening = FALSE;
} }
...@@ -413,9 +417,7 @@ static int rpcrt4_conn_np_read(RpcConnection *Connection, ...@@ -413,9 +417,7 @@ static int rpcrt4_conn_np_read(RpcConnection *Connection,
while (bytes_left) while (bytes_left)
{ {
DWORD bytes_read; DWORD bytes_read;
ret = ReadFile(npc->pipe, buf, bytes_left, &bytes_read, &npc->read_ovl); ret = ReadFile(npc->pipe, buf, bytes_left, &bytes_read, NULL);
if (!ret && GetLastError() == ERROR_IO_PENDING)
ret = GetOverlappedResult(npc->pipe, &npc->read_ovl, &bytes_read, TRUE);
if (!ret && GetLastError() == ERROR_MORE_DATA) if (!ret && GetLastError() == ERROR_MORE_DATA)
ret = TRUE; ret = TRUE;
if (!ret || !bytes_read) if (!ret || !bytes_read)
...@@ -437,9 +439,7 @@ static int rpcrt4_conn_np_write(RpcConnection *Connection, ...@@ -437,9 +439,7 @@ static int rpcrt4_conn_np_write(RpcConnection *Connection,
while (bytes_left) while (bytes_left)
{ {
DWORD bytes_written; DWORD bytes_written;
ret = WriteFile(npc->pipe, buf, bytes_left, &bytes_written, &npc->write_ovl); ret = WriteFile(npc->pipe, buf, bytes_left, &bytes_written, NULL);
if (!ret && GetLastError() == ERROR_IO_PENDING)
ret = GetOverlappedResult(npc->pipe, &npc->write_ovl, &bytes_written, TRUE);
if (!ret || !bytes_written) if (!ret || !bytes_written)
break; break;
bytes_left -= bytes_written; bytes_left -= bytes_written;
...@@ -456,13 +456,9 @@ static int rpcrt4_conn_np_close(RpcConnection *Connection) ...@@ -456,13 +456,9 @@ static int rpcrt4_conn_np_close(RpcConnection *Connection)
CloseHandle(npc->pipe); CloseHandle(npc->pipe);
npc->pipe = 0; npc->pipe = 0;
} }
if (npc->read_ovl.hEvent) { if (npc->listen_thread) {
CloseHandle(npc->read_ovl.hEvent); CloseHandle(npc->listen_thread);
npc->read_ovl.hEvent = 0; npc->listen_thread = 0;
}
if (npc->write_ovl.hEvent) {
CloseHandle(npc->write_ovl.hEvent);
npc->write_ovl.hEvent = 0;
} }
return 0; return 0;
} }
...@@ -666,7 +662,7 @@ static void *rpcrt4_protseq_np_get_wait_array(RpcServerProtseq *protseq, void *p ...@@ -666,7 +662,7 @@ static void *rpcrt4_protseq_np_get_wait_array(RpcServerProtseq *protseq, void *p
conn = CONTAINING_RECORD(protseq->conn, RpcConnection_np, common); conn = CONTAINING_RECORD(protseq->conn, RpcConnection_np, common);
while (conn) { while (conn) {
rpcrt4_conn_listen_pipe(conn); rpcrt4_conn_listen_pipe(conn);
if (conn->read_ovl.hEvent) if (conn->listen_thread)
(*count)++; (*count)++;
conn = CONTAINING_RECORD(conn->common.Next, RpcConnection_np, common); conn = CONTAINING_RECORD(conn->common.Next, RpcConnection_np, common);
} }
...@@ -687,7 +683,7 @@ static void *rpcrt4_protseq_np_get_wait_array(RpcServerProtseq *protseq, void *p ...@@ -687,7 +683,7 @@ static void *rpcrt4_protseq_np_get_wait_array(RpcServerProtseq *protseq, void *p
*count = 1; *count = 1;
conn = CONTAINING_RECORD(protseq->conn, RpcConnection_np, common); conn = CONTAINING_RECORD(protseq->conn, RpcConnection_np, common);
while (conn) { while (conn) {
if ((objs[*count] = conn->read_ovl.hEvent)) if ((objs[*count] = conn->listen_thread))
(*count)++; (*count)++;
conn = CONTAINING_RECORD(conn->common.Next, RpcConnection_np, common); conn = CONTAINING_RECORD(conn->common.Next, RpcConnection_np, common);
} }
...@@ -734,12 +730,18 @@ static int rpcrt4_protseq_np_wait_for_new_connection(RpcServerProtseq *protseq, ...@@ -734,12 +730,18 @@ static int rpcrt4_protseq_np_wait_for_new_connection(RpcServerProtseq *protseq,
EnterCriticalSection(&protseq->cs); EnterCriticalSection(&protseq->cs);
conn = CONTAINING_RECORD(protseq->conn, RpcConnection_np, common); conn = CONTAINING_RECORD(protseq->conn, RpcConnection_np, common);
while (conn) { while (conn) {
if (b_handle == conn->read_ovl.hEvent) break; if (b_handle == conn->listen_thread) break;
conn = CONTAINING_RECORD(conn->common.Next, RpcConnection_np, common); conn = CONTAINING_RECORD(conn->common.Next, RpcConnection_np, common);
} }
cconn = NULL; cconn = NULL;
if (conn) if (conn)
RPCRT4_SpawnConnection(&cconn, &conn->common); {
DWORD exit_code;
if (GetExitCodeThread(conn->listen_thread, &exit_code) && exit_code == RPC_S_OK)
RPCRT4_SpawnConnection(&cconn, &conn->common);
CloseHandle(conn->listen_thread);
conn->listen_thread = 0;
}
else else
ERR("failed to locate connection for handle %p\n", b_handle); ERR("failed to locate connection for handle %p\n", b_handle);
LeaveCriticalSection(&protseq->cs); LeaveCriticalSection(&protseq->cs);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment