Commit ed71339d authored by Aric Stewart's avatar Aric Stewart Committed by Alexandre Julliard

strmbase: Add OutputQueue_EOS implementation.

parent 928994d9
...@@ -31,7 +31,10 @@ ...@@ -31,7 +31,10 @@
WINE_DEFAULT_DEBUG_CHANNEL(strmbase); WINE_DEFAULT_DEBUG_CHANNEL(strmbase);
enum {SAMPLE_PACKET, EOS_PACKET};
typedef struct tagQueuedEvent { typedef struct tagQueuedEvent {
int type;
struct list entry; struct list entry;
IMediaSample *pSample; IMediaSample *pSample;
...@@ -163,6 +166,7 @@ HRESULT WINAPI OutputQueue_ReceiveMultiple(OutputQueue *pOutputQueue, IMediaSamp ...@@ -163,6 +166,7 @@ HRESULT WINAPI OutputQueue_ReceiveMultiple(OutputQueue *pOutputQueue, IMediaSamp
hr = E_OUTOFMEMORY; hr = E_OUTOFMEMORY;
break; break;
} }
qev->type = SAMPLE_PACKET;
qev->pSample = ppSamples[i]; qev->pSample = ppSamples[i];
IMediaSample_AddRef(ppSamples[i]); IMediaSample_AddRef(ppSamples[i]);
list_add_tail(pOutputQueue->SampleList, &qev->entry); list_add_tail(pOutputQueue->SampleList, &qev->entry);
...@@ -182,6 +186,51 @@ HRESULT WINAPI OutputQueue_Receive(OutputQueue *pOutputQueue, IMediaSample *pSam ...@@ -182,6 +186,51 @@ HRESULT WINAPI OutputQueue_Receive(OutputQueue *pOutputQueue, IMediaSample *pSam
return OutputQueue_ReceiveMultiple(pOutputQueue,&pSample,1,&processed); return OutputQueue_ReceiveMultiple(pOutputQueue,&pSample,1,&processed);
} }
VOID WINAPI OutputQueue_SendAnyway(OutputQueue *pOutputQueue)
{
if (pOutputQueue->hThread)
{
EnterCriticalSection(&pOutputQueue->csQueue);
if (list_count(pOutputQueue->SampleList) > 0)
{
pOutputQueue->bSendAnyway = TRUE;
SetEvent(pOutputQueue->hProcessQueue);
}
LeaveCriticalSection(&pOutputQueue->csQueue);
}
}
VOID WINAPI OutputQueue_EOS(OutputQueue *pOutputQueue)
{
EnterCriticalSection(&pOutputQueue->csQueue);
if (pOutputQueue->hThread)
{
QueuedEvent *qev = HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent));
if (!qev)
{
ERR("Out of Memory\n");
LeaveCriticalSection(&pOutputQueue->csQueue);
return;
}
qev->type = EOS_PACKET;
qev->pSample = NULL;
list_add_tail(pOutputQueue->SampleList, &qev->entry);
}
else
{
IPin* ppin = NULL;
IPin_ConnectedTo((IPin*)pOutputQueue->pInputPin, &ppin);
if (ppin)
{
IPin_EndOfStream(ppin);
IPin_Release(ppin);
}
}
LeaveCriticalSection(&pOutputQueue->csQueue);
/* Covers sending the Event to the worker Thread */
OutputQueue_SendAnyway(pOutputQueue);
}
DWORD WINAPI OutputQueueImpl_ThreadProc(OutputQueue *pOutputQueue) DWORD WINAPI OutputQueueImpl_ThreadProc(OutputQueue *pOutputQueue)
{ {
do do
...@@ -189,7 +238,12 @@ DWORD WINAPI OutputQueueImpl_ThreadProc(OutputQueue *pOutputQueue) ...@@ -189,7 +238,12 @@ DWORD WINAPI OutputQueueImpl_ThreadProc(OutputQueue *pOutputQueue)
EnterCriticalSection(&pOutputQueue->csQueue); EnterCriticalSection(&pOutputQueue->csQueue);
if (list_count(pOutputQueue->SampleList) > 0 && if (list_count(pOutputQueue->SampleList) > 0 &&
(!pOutputQueue->bBatchExact || (!pOutputQueue->bBatchExact ||
list_count(pOutputQueue->SampleList) >= pOutputQueue->lBatchSize)) list_count(pOutputQueue->SampleList) >= pOutputQueue->lBatchSize ||
pOutputQueue->bSendAnyway
)
)
{
while (list_count(pOutputQueue->SampleList) > 0)
{ {
IMediaSample **ppSamples; IMediaSample **ppSamples;
LONG nSamples; LONG nSamples;
...@@ -197,25 +251,59 @@ DWORD WINAPI OutputQueueImpl_ThreadProc(OutputQueue *pOutputQueue) ...@@ -197,25 +251,59 @@ DWORD WINAPI OutputQueueImpl_ThreadProc(OutputQueue *pOutputQueue)
struct list *cursor, *cursor2; struct list *cursor, *cursor2;
int i = 0; int i = 0;
nSamples = list_count(pOutputQueue->SampleList); /* First Pass Process Samples */
ppSamples = HeapAlloc(GetProcessHeap(),0,sizeof(IMediaSample*) * nSamples); i = list_count(pOutputQueue->SampleList);
ppSamples = HeapAlloc(GetProcessHeap(),0,sizeof(IMediaSample*) * i);
nSamples = 0;
LIST_FOR_EACH_SAFE(cursor, cursor2, pOutputQueue->SampleList) LIST_FOR_EACH_SAFE(cursor, cursor2, pOutputQueue->SampleList)
{ {
QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry); QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
if (qev->type == SAMPLE_PACKET)
ppSamples[nSamples++] = qev->pSample;
else
break;
list_remove(cursor); list_remove(cursor);
ppSamples[i++] = qev->pSample;
HeapFree(GetProcessHeap(),0,qev); HeapFree(GetProcessHeap(),0,qev);
} }
if (pOutputQueue->pInputPin->pin.pConnectedTo && pOutputQueue->pInputPin->pMemInputPin) if (pOutputQueue->pInputPin->pin.pConnectedTo && pOutputQueue->pInputPin->pMemInputPin)
{ {
IMemInputPin_AddRef(pOutputQueue->pInputPin->pMemInputPin); IMemInputPin_AddRef(pOutputQueue->pInputPin->pMemInputPin);
LeaveCriticalSection(&pOutputQueue->csQueue);
IMemInputPin_ReceiveMultiple(pOutputQueue->pInputPin->pMemInputPin, ppSamples, nSamples, &nSamplesProcessed); IMemInputPin_ReceiveMultiple(pOutputQueue->pInputPin->pMemInputPin, ppSamples, nSamples, &nSamplesProcessed);
EnterCriticalSection(&pOutputQueue->csQueue);
IMemInputPin_Release(pOutputQueue->pInputPin->pMemInputPin); IMemInputPin_Release(pOutputQueue->pInputPin->pMemInputPin);
} }
for (i = 0; i < nSamples; i++) for (i = 0; i < nSamples; i++)
IUnknown_Release(ppSamples[i]); IUnknown_Release(ppSamples[i]);
HeapFree(GetProcessHeap(),0,ppSamples); HeapFree(GetProcessHeap(),0,ppSamples);
/* Process Non-Samples */
if (list_count(pOutputQueue->SampleList) > 0)
{
LIST_FOR_EACH_SAFE(cursor, cursor2, pOutputQueue->SampleList)
{
QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
if (qev->type == EOS_PACKET)
{
IPin* ppin = NULL;
IPin_ConnectedTo((IPin*)pOutputQueue->pInputPin, &ppin);
if (ppin)
{
IPin_EndOfStream(ppin);
IPin_Release(ppin);
}
}
else if (qev->type == SAMPLE_PACKET)
break;
else
FIXME("Unhandled Event type %i\n",qev->type);
list_remove(cursor);
HeapFree(GetProcessHeap(),0,qev);
}
}
}
pOutputQueue->bSendAnyway = FALSE;
} }
LeaveCriticalSection(&pOutputQueue->csQueue); LeaveCriticalSection(&pOutputQueue->csQueue);
WaitForSingleObject(pOutputQueue->hProcessQueue, INFINITE); WaitForSingleObject(pOutputQueue->hProcessQueue, INFINITE);
......
...@@ -358,6 +358,7 @@ typedef struct tagOutputQueue { ...@@ -358,6 +358,7 @@ typedef struct tagOutputQueue {
LONG lBatchSize; LONG lBatchSize;
BOOL bBatchExact; BOOL bBatchExact;
BOOL bTerminate; BOOL bTerminate;
BOOL bSendAnyway;
struct list *SampleList; struct list *SampleList;
...@@ -377,6 +378,8 @@ HRESULT WINAPI OutputQueue_Construct( BaseOutputPin *pInputPin, BOOL bAuto, ...@@ -377,6 +378,8 @@ HRESULT WINAPI OutputQueue_Construct( BaseOutputPin *pInputPin, BOOL bAuto,
HRESULT WINAPI OutputQueue_Destroy(OutputQueue *pOutputQueue); HRESULT WINAPI OutputQueue_Destroy(OutputQueue *pOutputQueue);
HRESULT WINAPI OutputQueue_ReceiveMultiple(OutputQueue *pOutputQueue, IMediaSample **ppSamples, LONG nSamples, LONG *nSamplesProcessed); HRESULT WINAPI OutputQueue_ReceiveMultiple(OutputQueue *pOutputQueue, IMediaSample **ppSamples, LONG nSamples, LONG *nSamplesProcessed);
HRESULT WINAPI OutputQueue_Receive(OutputQueue *pOutputQueue, IMediaSample *pSample); HRESULT WINAPI OutputQueue_Receive(OutputQueue *pOutputQueue, IMediaSample *pSample);
VOID WINAPI OutputQueue_EOS(OutputQueue *pOutputQueue);
VOID WINAPI OutputQueue_SendAnyway(OutputQueue *pOutputQueue);
DWORD WINAPI OutputQueueImpl_ThreadProc(OutputQueue *pOutputQueue); DWORD WINAPI OutputQueueImpl_ThreadProc(OutputQueue *pOutputQueue);
/* Dll Functions */ /* Dll Functions */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment