Commit 737e67b1 authored by Mike Hearn's avatar Mike Hearn Committed by Alexandre Julliard

Propagate apartments through the intermediate threads, make listener

thread apartment scoped.
parent f4f4dff9
...@@ -1105,7 +1105,7 @@ _LocalServerThread(LPVOID param) { ...@@ -1105,7 +1105,7 @@ _LocalServerThread(LPVOID param) {
ULARGE_INTEGER newpos; ULARGE_INTEGER newpos;
ULONG res; ULONG res;
TRACE("Starting threader for %s.\n",debugstr_guid(&newClass->classIdentifier)); TRACE("Starting classfactory server thread for %s.\n",debugstr_guid(&newClass->classIdentifier));
strcpy(pipefn,PIPEPREF); strcpy(pipefn,PIPEPREF);
WINE_StringFromCLSID(&newClass->classIdentifier,pipefn+strlen(PIPEPREF)); WINE_StringFromCLSID(&newClass->classIdentifier,pipefn+strlen(PIPEPREF));
...@@ -1262,7 +1262,7 @@ HRESULT WINAPI CoRegisterClassObject( ...@@ -1262,7 +1262,7 @@ HRESULT WINAPI CoRegisterClassObject(
if (dwClsContext & CLSCTX_LOCAL_SERVER) { if (dwClsContext & CLSCTX_LOCAL_SERVER) {
DWORD tid; DWORD tid;
start_listener_thread(); start_apartment_listener_thread();
newClass->hThread = CreateThread(NULL,0,_LocalServerThread,newClass,0,&tid); newClass->hThread = CreateThread(NULL,0,_LocalServerThread,newClass,0,&tid);
} }
return S_OK; return S_OK;
......
...@@ -105,6 +105,7 @@ typedef struct tagAPARTMENT { ...@@ -105,6 +105,7 @@ typedef struct tagAPARTMENT {
IOBJECT *proxies; /* imported objects */ IOBJECT *proxies; /* imported objects */
LPUNKNOWN state; /* state object (see Co[Get,Set]State) */ LPUNKNOWN state; /* state object (see Co[Get,Set]State) */
LPVOID ErrorInfo; /* thread error info */ LPVOID ErrorInfo; /* thread error info */
DWORD listenertid; /* id of apartment_listener_thread */
} APARTMENT; } APARTMENT;
extern APARTMENT MTA, *apts; extern APARTMENT MTA, *apts;
...@@ -152,7 +153,7 @@ HRESULT MARSHAL_Disconnect_Proxies(void); ...@@ -152,7 +153,7 @@ HRESULT MARSHAL_Disconnect_Proxies(void);
HRESULT MARSHAL_GetStandardMarshalCF(LPVOID *ppv); HRESULT MARSHAL_GetStandardMarshalCF(LPVOID *ppv);
void start_listener_thread(void); void start_apartment_listener_thread(void);
extern HRESULT PIPE_GetNewPipeBuf(wine_marshal_id *mid, IRpcChannelBuffer **pipebuf); extern HRESULT PIPE_GetNewPipeBuf(wine_marshal_id *mid, IRpcChannelBuffer **pipebuf);
......
...@@ -176,7 +176,11 @@ MARSHAL_Register_Proxy(wine_marshal_id *mid,LPUNKNOWN punk) { ...@@ -176,7 +176,11 @@ MARSHAL_Register_Proxy(wine_marshal_id *mid,LPUNKNOWN punk) {
for (i=0;i<nrofproxies;i++) { for (i=0;i<nrofproxies;i++) {
if (MARSHAL_Compare_Mids(mid,&(proxies[i].mid))) { if (MARSHAL_Compare_Mids(mid,&(proxies[i].mid))) {
ERR("Already have mid?\n"); /* this will happen if the program attempts to marshal two
* objects that implement the same interface */
FIXME("need to use IPIDs, already have mid oxid=%s, oid=%s, iid=%s\n",
wine_dbgstr_longlong(mid->oxid), wine_dbgstr_longlong(mid->oid), debugstr_guid(&mid->iid));
return E_FAIL; return E_FAIL;
} }
} }
...@@ -262,7 +266,7 @@ StdMarshalImpl_MarshalInterface( ...@@ -262,7 +266,7 @@ StdMarshalImpl_MarshalInterface(
TRACE("(...,%s,...)\n",debugstr_guid(riid)); TRACE("(...,%s,...)\n",debugstr_guid(riid));
start_listener_thread(); /* just to be sure we have one running. */ start_apartment_listener_thread(); /* just to be sure we have one running. */
IUnknown_QueryInterface((LPUNKNOWN)pv,&IID_IUnknown,(LPVOID*)&pUnk); IUnknown_QueryInterface((LPUNKNOWN)pv,&IID_IUnknown,(LPVOID*)&pUnk);
mid.oxid = COM_CurrentApt()->oxid; mid.oxid = COM_CurrentApt()->oxid;
......
...@@ -91,15 +91,19 @@ static wine_rpc_request **reqs = NULL; ...@@ -91,15 +91,19 @@ static wine_rpc_request **reqs = NULL;
static int nrofreqs = 0; static int nrofreqs = 0;
/* This pipe is _thread_ based, each thread which talks to a remote /* This pipe is _thread_ based, each thread which talks to a remote
* apartment (mid) has its own pipe */ * apartment (mid) has its own pipe. The same structure is used both
* for outgoing and incoming RPCs.
*/
typedef struct _wine_pipe { typedef struct _wine_pipe {
wine_marshal_id mid; /* target mid */ wine_marshal_id mid; /* target mid */
DWORD tid; /* thread which owns this outgoing pipe */ DWORD tid; /* thread which owns this pipe */
HANDLE hPipe; HANDLE hPipe;
int pending; int pending;
HANDLE hThread; HANDLE hThread;
CRITICAL_SECTION crit; CRITICAL_SECTION crit;
APARTMENT *apt; /* apartment of the marshalling thread for the stub dispatch case */
} wine_pipe; } wine_pipe;
static wine_pipe *pipes = NULL; static wine_pipe *pipes = NULL;
...@@ -167,7 +171,7 @@ write_pipe(HANDLE hf, LPVOID ptr, DWORD size) { ...@@ -167,7 +171,7 @@ write_pipe(HANDLE hf, LPVOID ptr, DWORD size) {
return S_OK; return S_OK;
} }
static DWORD WINAPI _StubReaderThread(LPVOID); static DWORD WINAPI stub_dispatch_thread(LPVOID);
static HRESULT static HRESULT
PIPE_RegisterPipe(wine_marshal_id *mid, HANDLE hPipe, BOOL startreader) { PIPE_RegisterPipe(wine_marshal_id *mid, HANDLE hPipe, BOOL startreader) {
...@@ -187,10 +191,12 @@ PIPE_RegisterPipe(wine_marshal_id *mid, HANDLE hPipe, BOOL startreader) { ...@@ -187,10 +191,12 @@ PIPE_RegisterPipe(wine_marshal_id *mid, HANDLE hPipe, BOOL startreader) {
sprintf(pipefn,OLESTUBMGR"_%08lx%08lx",(DWORD)(mid->oxid >> 32),(DWORD)mid->oxid); sprintf(pipefn,OLESTUBMGR"_%08lx%08lx",(DWORD)(mid->oxid >> 32),(DWORD)mid->oxid);
memcpy(&(pipes[nrofpipes].mid),mid,sizeof(*mid)); memcpy(&(pipes[nrofpipes].mid),mid,sizeof(*mid));
pipes[nrofpipes].hPipe = hPipe; pipes[nrofpipes].hPipe = hPipe;
pipes[nrofpipes].apt = COM_CurrentApt();
assert( pipes[nrofpipes].apt );
InitializeCriticalSection(&(pipes[nrofpipes].crit)); InitializeCriticalSection(&(pipes[nrofpipes].crit));
nrofpipes++; nrofpipes++;
if (startreader) { if (startreader) {
pipes[nrofpipes-1].hThread = CreateThread(NULL,0,_StubReaderThread,(LPVOID)(pipes+(nrofpipes-1)),0,&(pipes[nrofpipes-1].tid)); pipes[nrofpipes-1].hThread = CreateThread(NULL,0,stub_dispatch_thread,(LPVOID)(pipes+(nrofpipes-1)),0,&(pipes[nrofpipes-1].tid));
} else { } else {
pipes[nrofpipes-1].tid = GetCurrentThreadId(); pipes[nrofpipes-1].tid = GetCurrentThreadId();
} }
...@@ -579,6 +585,8 @@ HRESULT create_marshalled_proxy(REFCLSID rclsid, REFIID iid, LPVOID *ppv) { ...@@ -579,6 +585,8 @@ HRESULT create_marshalled_proxy(REFCLSID rclsid, REFIID iid, LPVOID *ppv) {
WINE_StringFromCLSID(rclsid,pipefn+strlen(PIPEPREF)); WINE_StringFromCLSID(rclsid,pipefn+strlen(PIPEPREF));
while (tries++<MAXTRIES) { while (tries++<MAXTRIES) {
TRACE("waiting for %s\n", pipefn);
WaitNamedPipeA( pipefn, NMPWAIT_WAIT_FOREVER ); WaitNamedPipeA( pipefn, NMPWAIT_WAIT_FOREVER );
hPipe = CreateFileA( hPipe = CreateFileA(
pipefn, pipefn,
...@@ -606,6 +614,7 @@ HRESULT create_marshalled_proxy(REFCLSID rclsid, REFIID iid, LPVOID *ppv) { ...@@ -606,6 +614,7 @@ HRESULT create_marshalled_proxy(REFCLSID rclsid, REFIID iid, LPVOID *ppv) {
Sleep(1000); Sleep(1000);
continue; continue;
} }
TRACE("read marshal id from pipe\n");
CloseHandle(hPipe); CloseHandle(hPipe);
break; break;
} }
...@@ -617,6 +626,7 @@ HRESULT create_marshalled_proxy(REFCLSID rclsid, REFIID iid, LPVOID *ppv) { ...@@ -617,6 +626,7 @@ HRESULT create_marshalled_proxy(REFCLSID rclsid, REFIID iid, LPVOID *ppv) {
if (hres) goto out; if (hres) goto out;
seekto.u.LowPart = 0;seekto.u.HighPart = 0; seekto.u.LowPart = 0;seekto.u.HighPart = 0;
hres = IStream_Seek(pStm,seekto,SEEK_SET,&newpos); hres = IStream_Seek(pStm,seekto,SEEK_SET,&newpos);
TRACE("unmarshalling classfactory\n");
hres = CoUnmarshalInterface(pStm,&IID_IClassFactory,ppv); hres = CoUnmarshalInterface(pStm,&IID_IClassFactory,ppv);
out: out:
IStream_Release(pStm); IStream_Release(pStm);
...@@ -726,15 +736,21 @@ end: ...@@ -726,15 +736,21 @@ end:
return hres; return hres;
} }
static DWORD WINAPI /* This thread listens on the given pipe for requests to a particular stub manager */
_StubReaderThread(LPVOID param) { static DWORD WINAPI stub_dispatch_thread(LPVOID param)
{
wine_pipe *xpipe = (wine_pipe*)param; wine_pipe *xpipe = (wine_pipe*)param;
HANDLE xhPipe = xpipe->hPipe; HANDLE xhPipe = xpipe->hPipe;
HRESULT hres = S_OK; HRESULT hres = S_OK;
TRACE("STUB reader thread %lx\n",GetCurrentProcessId()); TRACE("starting for apartment OXID %08lx%08lx\n", (DWORD)(xpipe->mid.oxid >> 32), (DWORD)(xpipe->mid.oxid));
/* join marshalling apartment. fixme: this stuff is all very wrong, threading needs to work like native */
NtCurrentTeb()->ReservedForOle = xpipe->apt;
while (!hres) { while (!hres) {
int i; int i;
hres = COM_RpcReceive(xpipe); hres = COM_RpcReceive(xpipe);
if (hres) break; if (hres) break;
...@@ -746,21 +762,31 @@ _StubReaderThread(LPVOID param) { ...@@ -746,21 +762,31 @@ _StubReaderThread(LPVOID param) {
} }
} }
} }
FIXME("Failed with hres %lx\n",hres);
/* fixme: this thread never quits naturally */
WARN("exiting with hres %lx\n",hres);
CloseHandle(xhPipe); CloseHandle(xhPipe);
return 0; return 0;
} }
/* This thread listens on a named pipe for the entire process. It /* This thread listens on a named pipe for each apartment that exports
* deals with incoming connection requests to objects. * objects. It deals with incoming connection requests. Each time a
* client connects a separate thread is spawned for that particular
* connection.
*
* This architecture is different in native DCOM.
*/ */
static DWORD WINAPI listener_thread(LPVOID param) static DWORD WINAPI apartment_listener_thread(LPVOID param)
{ {
char pipefn[200]; char pipefn[200];
HANDLE listenPipe; HANDLE listenPipe;
APARTMENT *apt = (APARTMENT *) param;
sprintf(pipefn,OLESTUBMGR"_%08lx",GetCurrentProcessId()); /* we must join the marshalling threads apartment */
TRACE("Process listener thread starting on (%s)\n",pipefn); NtCurrentTeb()->ReservedForOle = apt;
sprintf(pipefn,OLESTUBMGR"_%08lx%08lx", (DWORD)(apt->oxid >> 32), (DWORD)(apt->oxid));
TRACE("Apartment listener thread starting on (%s)\n",pipefn);
while (1) { while (1) {
listenPipe = CreateNamedPipeA( listenPipe = CreateNamedPipeA(
...@@ -774,7 +800,7 @@ static DWORD WINAPI listener_thread(LPVOID param) ...@@ -774,7 +800,7 @@ static DWORD WINAPI listener_thread(LPVOID param)
NULL NULL
); );
if (listenPipe == INVALID_HANDLE_VALUE) { if (listenPipe == INVALID_HANDLE_VALUE) {
FIXME("pipe creation failed for %s, le is %lx\n",pipefn,GetLastError()); FIXME("pipe creation failed for %s, error %lx\n",pipefn,GetLastError());
return 1; /* permanent failure, so quit stubmgr thread */ return 1; /* permanent failure, so quit stubmgr thread */
} }
if (!ConnectNamedPipe(listenPipe,NULL)) { if (!ConnectNamedPipe(listenPipe,NULL)) {
...@@ -787,15 +813,24 @@ static DWORD WINAPI listener_thread(LPVOID param) ...@@ -787,15 +813,24 @@ static DWORD WINAPI listener_thread(LPVOID param)
return 0; return 0;
} }
void start_listener_thread() void start_apartment_listener_thread()
{ {
static BOOL running = FALSE; APARTMENT *apt = COM_CurrentApt();
DWORD tid;
assert( apt );
TRACE("apt->listenertid=%ld\n", apt->listenertid);
/* apt->listenertid is a hack which needs to die at some point, as
* it leaks information into the apartment structure. in fact,
* this thread isn't quite correct anyway as native RPC doesn't
* use a thread per apartment at all, instead the dispatch thread
* either enters the apartment to perform the RPC (for MTAs, RTAs)
* or does a context switch into it for STAs.
*/
if (!running) if (!apt->listenertid)
{ {
running = TRUE; CreateThread(NULL, 0, apartment_listener_thread, apt, 0, &apt->listenertid);
CreateThread(NULL, 0, listener_thread, NULL, 0, &tid);
Sleep(2000); /* actually we just try opening the pipe until it succeeds */
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment