Commit e9064241 authored by Robert Shearman's avatar Robert Shearman Committed by Alexandre Julliard

rpcrt4: Create a server thread for each protseq.

parent 12c19620
......@@ -93,12 +93,6 @@ static BOOL std_listen;
static LONG manual_listen_count;
/* total listeners including auto listeners */
static LONG listen_count;
/* set on change of configuration (e.g. listening on new protseq) */
static HANDLE mgr_event;
/* mutex for ensuring only one thread can change state at a time */
static HANDLE mgr_mutex;
/* set when server thread has finished opening connections */
static HANDLE server_ready_event;
static UUID uuid_nil;
......@@ -355,30 +349,28 @@ static void RPCRT4_new_client(RpcConnection* conn)
static DWORD CALLBACK RPCRT4_server_thread(LPVOID the_arg)
{
HANDLE m_event = mgr_event, b_handle;
HANDLE m_event, b_handle;
HANDLE *objs = NULL;
DWORD count, res;
RpcServerProtseq* cps;
RpcServerProtseq* cps = the_arg;
RpcConnection* conn;
RpcConnection* cconn;
BOOL set_ready_event = FALSE;
TRACE("(the_arg == ^%p)\n", the_arg);
m_event = cps->mgr_event;
for (;;) {
EnterCriticalSection(&server_cs);
/* open and count connections */
count = 1;
cps = protseqs;
while (cps) {
conn = cps->conn;
while (conn) {
RPCRT4_OpenConnection(conn);
if (rpcrt4_conn_get_wait_object(conn))
count++;
conn = conn->Next;
}
cps = cps->Next;
conn = cps->conn;
while (conn) {
RPCRT4_OpenConnection(conn);
if (rpcrt4_conn_get_wait_object(conn))
count++;
conn = conn->Next;
}
/* make array of connections */
if (objs)
......@@ -388,22 +380,18 @@ static DWORD CALLBACK RPCRT4_server_thread(LPVOID the_arg)
objs[0] = m_event;
count = 1;
cps = protseqs;
while (cps) {
conn = cps->conn;
while (conn) {
if ((objs[count] = rpcrt4_conn_get_wait_object(conn)))
count++;
conn = conn->Next;
}
cps = cps->Next;
conn = cps->conn;
while (conn) {
if ((objs[count] = rpcrt4_conn_get_wait_object(conn)))
count++;
conn = conn->Next;
}
LeaveCriticalSection(&server_cs);
if (set_ready_event)
{
/* signal to function that changed state that we are now sync'ed */
SetEvent(server_ready_event);
SetEvent(cps->server_ready_event);
set_ready_event = FALSE;
}
......@@ -412,7 +400,7 @@ static DWORD CALLBACK RPCRT4_server_thread(LPVOID the_arg)
if (res == WAIT_OBJECT_0) {
if (!std_listen)
{
SetEvent(server_ready_event);
SetEvent(cps->server_ready_event);
break;
}
set_ready_event = TRUE;
......@@ -424,16 +412,10 @@ static DWORD CALLBACK RPCRT4_server_thread(LPVOID the_arg)
b_handle = objs[res - WAIT_OBJECT_0];
/* find which connection got a RPC */
EnterCriticalSection(&server_cs);
conn = NULL;
cps = protseqs;
while (cps) {
conn = cps->conn;
while (conn) {
if (b_handle == rpcrt4_conn_get_wait_object(conn)) break;
conn = conn->Next;
}
if (conn) break;
cps = cps->Next;
conn = cps->conn;
while (conn) {
if (b_handle == rpcrt4_conn_get_wait_object(conn)) break;
conn = conn->Next;
}
cconn = NULL;
if (conn)
......@@ -447,14 +429,10 @@ static DWORD CALLBACK RPCRT4_server_thread(LPVOID the_arg)
HeapFree(GetProcessHeap(), 0, objs);
EnterCriticalSection(&server_cs);
/* close connections */
cps = protseqs;
while (cps) {
conn = cps->conn;
while (conn) {
RPCRT4_CloseConnection(conn);
conn = conn->Next;
}
cps = cps->Next;
conn = cps->conn;
while (conn) {
RPCRT4_CloseConnection(conn);
conn = conn->Next;
}
LeaveCriticalSection(&server_cs);
return 0;
......@@ -462,23 +440,49 @@ static DWORD CALLBACK RPCRT4_server_thread(LPVOID the_arg)
/* tells the server thread that the state has changed and waits for it to
* make the changes */
static void RPCRT4_sync_with_server_thread(void)
static void RPCRT4_sync_with_server_thread(RpcServerProtseq *ps)
{
/* make sure we are the only thread sync'ing the server state, otherwise
* there is a race with the server thread setting an older state and setting
* the server_ready_event when the new state hasn't yet been applied */
WaitForSingleObject(mgr_mutex, INFINITE);
WaitForSingleObject(ps->mgr_mutex, INFINITE);
SetEvent(mgr_event);
SetEvent(ps->mgr_event);
/* wait for server thread to make the requested changes before returning */
WaitForSingleObject(server_ready_event, INFINITE);
WaitForSingleObject(ps->server_ready_event, INFINITE);
ReleaseMutex(mgr_mutex);
ReleaseMutex(ps->mgr_mutex);
}
static RPC_STATUS RPCRT4_start_listen_protseq(RpcServerProtseq *ps, BOOL auto_listen)
{
RPC_STATUS status = RPC_S_OK;
HANDLE server_thread;
EnterCriticalSection(&listen_cs);
if (ps->is_listening) goto done;
if (!ps->mgr_mutex) ps->mgr_mutex = CreateMutexW(NULL, FALSE, NULL);
if (!ps->mgr_event) ps->mgr_event = CreateEventW(NULL, FALSE, FALSE, NULL);
if (!ps->server_ready_event) ps->server_ready_event = CreateEventW(NULL, FALSE, FALSE, NULL);
server_thread = CreateThread(NULL, 0, RPCRT4_server_thread, ps, 0, NULL);
if (!server_thread)
{
status = RPC_S_OUT_OF_RESOURCES;
goto done;
}
ps->is_listening = TRUE;
CloseHandle(server_thread);
done:
LeaveCriticalSection(&listen_cs);
return status;
}
static RPC_STATUS RPCRT4_start_listen(BOOL auto_listen)
{
RPC_STATUS status = RPC_S_ALREADY_LISTENING;
RpcServerProtseq *cps;
TRACE("\n");
......@@ -486,19 +490,26 @@ static RPC_STATUS RPCRT4_start_listen(BOOL auto_listen)
if (auto_listen || (manual_listen_count++ == 0))
{
status = RPC_S_OK;
if (++listen_count == 1) {
HANDLE server_thread;
/* first listener creates server thread */
if (!mgr_mutex) mgr_mutex = CreateMutexW(NULL, FALSE, NULL);
if (!mgr_event) mgr_event = CreateEventW(NULL, FALSE, FALSE, NULL);
if (!server_ready_event) server_ready_event = CreateEventW(NULL, FALSE, FALSE, NULL);
if (++listen_count == 1)
std_listen = TRUE;
server_thread = CreateThread(NULL, 0, RPCRT4_server_thread, NULL, 0, NULL);
CloseHandle(server_thread);
}
}
LeaveCriticalSection(&listen_cs);
if (std_listen)
{
cps = protseqs;
while (cps && status == RPC_S_OK)
{
status = RPCRT4_start_listen_protseq(cps, TRUE);
/* make sure server is actually listening on the interface before
* returning */
if (status == RPC_S_OK)
RPCRT4_sync_with_server_thread(cps);
cps = cps->Next;
}
}
return status;
}
......@@ -508,9 +519,16 @@ static void RPCRT4_stop_listen(BOOL auto_listen)
if (auto_listen || (--manual_listen_count == 0))
{
if (listen_count != 0 && --listen_count == 0) {
RpcServerProtseq *cps;
std_listen = FALSE;
LeaveCriticalSection(&listen_cs);
RPCRT4_sync_with_server_thread();
cps = protseqs;
while (cps) {
RPCRT4_sync_with_server_thread(cps);
cps = cps->Next;
}
return;
}
assert(listen_count >= 0);
......@@ -520,17 +538,26 @@ static void RPCRT4_stop_listen(BOOL auto_listen)
static RPC_STATUS RPCRT4_use_protseq(RpcServerProtseq* ps)
{
RPCRT4_CreateConnection(&ps->conn, TRUE, ps->Protseq, NULL, ps->Endpoint,
NULL, NULL, NULL);
RPC_STATUS status;
status = RPCRT4_CreateConnection(&ps->conn, TRUE, ps->Protseq, NULL,
ps->Endpoint, NULL, NULL, NULL);
if (status != RPC_S_OK)
return status;
EnterCriticalSection(&server_cs);
ps->Next = protseqs;
protseqs = ps;
LeaveCriticalSection(&server_cs);
if (std_listen) RPCRT4_sync_with_server_thread();
if (std_listen)
{
status = RPCRT4_start_listen_protseq(ps, FALSE);
if (status == RPC_S_OK)
RPCRT4_sync_with_server_thread(ps);
}
return RPC_S_OK;
return status;
}
/***********************************************************************
......@@ -758,13 +785,8 @@ RPC_STATUS WINAPI RpcServerRegisterIf2( RPC_IF_HANDLE IfSpec, UUID* MgrTypeUuid,
ifs = sif;
LeaveCriticalSection(&server_cs);
if (sif->Flags & RPC_IF_AUTOLISTEN) {
RPCRT4_start_listen(TRUE);
/* make sure server is actually listening on the interface before
* returning */
RPCRT4_sync_with_server_thread();
}
if (sif->Flags & RPC_IF_AUTOLISTEN)
RPCRT4_start_listen(TRUE);
return RPC_S_OK;
}
......@@ -882,7 +904,7 @@ RPC_STATUS WINAPI RpcServerRegisterAuthInfoW( RPC_WSTR ServerPrincName, unsigned
*/
RPC_STATUS WINAPI RpcServerListen( UINT MinimumCallThreads, UINT MaxCalls, UINT DontWait )
{
RPC_STATUS status;
RPC_STATUS status = RPC_S_OK;
TRACE("(%u,%u,%u)\n", MinimumCallThreads, MaxCalls, DontWait);
......@@ -891,9 +913,6 @@ RPC_STATUS WINAPI RpcServerListen( UINT MinimumCallThreads, UINT MaxCalls, UINT
status = RPCRT4_start_listen(FALSE);
if (status == RPC_S_OK)
RPCRT4_sync_with_server_thread();
if (DontWait || (status != RPC_S_OK)) return status;
return RpcMgmtWaitServerListen();
......
......@@ -30,6 +30,15 @@ typedef struct _RpcServerProtseq
LPSTR Endpoint;
UINT MaxCalls;
RpcConnection* conn;
/* is the server currently listening? */
BOOL is_listening;
/* set on change of configuration (e.g. listening on new protseq) */
HANDLE mgr_event;
/* mutex for ensuring only one thread can change state at a time */
HANDLE mgr_mutex;
/* set when server thread has finished opening connections */
HANDLE server_ready_event;
} RpcServerProtseq;
typedef struct _RpcServerInterface
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment