Commit 1437f8bd authored by Robert Shearman's avatar Robert Shearman Committed by Alexandre Julliard

rpcrt4: Use the thread pool in the RPC server for processing packets.

parent 1ddc722e
......@@ -53,7 +53,6 @@ WINE_DEFAULT_DEBUG_CHANNEL(rpc);
typedef struct _RpcPacket
{
struct _RpcPacket* next;
struct _RpcConnection* conn;
RpcPktHdr* hdr;
RPC_MESSAGE* msg;
......@@ -103,20 +102,9 @@ static HANDLE mgr_mutex;
/* set when server thread has finished opening connections */
static HANDLE server_ready_event;
static CRITICAL_SECTION spacket_cs;
static CRITICAL_SECTION_DEBUG spacket_cs_debug =
{
0, 0, &spacket_cs,
{ &spacket_cs_debug.ProcessLocksList, &spacket_cs_debug.ProcessLocksList },
0, 0, { (DWORD_PTR)(__FILE__ ": spacket_cs") }
};
static CRITICAL_SECTION spacket_cs = { &spacket_cs_debug, -1, 0, 0, 0, 0 };
static RpcPacket* spacket_head;
static RpcPacket* spacket_tail;
static HANDLE server_sem;
static LONG worker_count, worker_free, worker_tls;
static LONG worker_tls;
static UUID uuid_nil;
......@@ -165,34 +153,6 @@ static RpcServerInterface* RPCRT4_find_interface(UUID* object,
return cif;
}
static void RPCRT4_push_packet(RpcPacket* packet)
{
packet->next = NULL;
EnterCriticalSection(&spacket_cs);
if (spacket_tail) {
spacket_tail->next = packet;
spacket_tail = packet;
} else {
spacket_head = packet;
spacket_tail = packet;
}
LeaveCriticalSection(&spacket_cs);
}
static RpcPacket* RPCRT4_pop_packet(void)
{
RpcPacket* packet;
EnterCriticalSection(&spacket_cs);
packet = spacket_head;
if (packet) {
spacket_head = packet->next;
if (!spacket_head) spacket_tail = NULL;
}
LeaveCriticalSection(&spacket_cs);
if (packet) packet->next = NULL;
return packet;
}
typedef struct {
PRPC_MESSAGE msg;
void* buf;
......@@ -350,52 +310,12 @@ fail:
static DWORD CALLBACK RPCRT4_worker_thread(LPVOID the_arg)
{
DWORD obj;
RpcPacket* pkt;
for (;;) {
/* idle timeout after 5s */
obj = WaitForSingleObject(server_sem, 5000);
if (obj == WAIT_TIMEOUT) {
/* if another idle thread exist, self-destruct */
if (worker_free > 1) break;
continue;
}
pkt = RPCRT4_pop_packet();
if (!pkt) continue;
InterlockedDecrement(&worker_free);
for (;;) {
RPCRT4_process_packet(pkt->conn, pkt->hdr, pkt->msg);
HeapFree(GetProcessHeap(), 0, pkt);
/* try to grab another packet here without waiting
* on the semaphore, in case it hits max */
pkt = RPCRT4_pop_packet();
if (!pkt) break;
/* decrement semaphore */
WaitForSingleObject(server_sem, 0);
}
InterlockedIncrement(&worker_free);
}
InterlockedDecrement(&worker_free);
InterlockedDecrement(&worker_count);
RpcPacket *pkt = the_arg;
RPCRT4_process_packet(pkt->conn, pkt->hdr, pkt->msg);
HeapFree(GetProcessHeap(), 0, pkt);
return 0;
}
static void RPCRT4_create_worker_if_needed(void)
{
if (!worker_free && worker_count < MAX_THREADS) {
HANDLE thread;
InterlockedIncrement(&worker_count);
InterlockedIncrement(&worker_free);
thread = CreateThread(NULL, 0, RPCRT4_worker_thread, NULL, 0, NULL);
if (thread) CloseHandle(thread);
else {
InterlockedDecrement(&worker_free);
InterlockedDecrement(&worker_count);
}
}
}
static DWORD CALLBACK RPCRT4_io_thread(LPVOID the_arg)
{
RpcConnection* conn = (RpcConnection*)the_arg;
......@@ -429,9 +349,7 @@ static DWORD CALLBACK RPCRT4_io_thread(LPVOID the_arg)
packet->conn = conn;
packet->hdr = hdr;
packet->msg = msg;
RPCRT4_create_worker_if_needed();
RPCRT4_push_packet(packet);
ReleaseSemaphore(server_sem, 1, NULL);
QueueUserWorkItem(RPCRT4_worker_thread, packet, WT_EXECUTEDEFAULT);
#endif
msg = NULL;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment