Commit 862ac14e authored by Nikolay Sivov's avatar Nikolay Sivov Committed by Alexandre Julliard

mfreadwrite/writer: Add sample/marker queue for each stream.

parent e0473521
......@@ -27,6 +27,7 @@
#include "mf_private.h"
#include "wine/debug.h"
#include "wine/list.h"
WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
......@@ -36,11 +37,30 @@ enum writer_state
SINK_WRITER_STATE_WRITING,
};
struct marker_context
{
IUnknown IUnknown_iface;
LONG refcount;
unsigned int marker_type;
void *user_context;
};
struct pending_item
{
struct list entry;
unsigned int marker_type;
IMFSample *sample;
LONGLONG timestamp;
void *user_context;
};
struct stream
{
IMFStreamSink *stream_sink;
IMFTransform *encoder;
MF_SINK_WRITER_STATISTICS stats;
struct list queue;
};
struct sink_writer
......@@ -76,6 +96,74 @@ static struct sink_writer *impl_from_events_callback_IMFAsyncCallback(IMFAsyncCa
return CONTAINING_RECORD(iface, struct sink_writer, events_callback);
}
static struct marker_context *impl_from_marker_context_IUnknown(IUnknown *iface)
{
return CONTAINING_RECORD(iface, struct marker_context, IUnknown_iface);
}
static HRESULT WINAPI marker_context_QueryInterface(IUnknown *iface, REFIID riid, void **out)
{
if (IsEqualIID(riid, &IID_IUnknown))
{
*out = iface;
IUnknown_AddRef(iface);
return S_OK;
}
*out = NULL;
return E_NOINTERFACE;
}
static ULONG WINAPI marker_context_AddRef(IUnknown *iface)
{
struct marker_context *context = impl_from_marker_context_IUnknown(iface);
return InterlockedIncrement(&context->refcount);
}
static ULONG WINAPI marker_context_Release(IUnknown *iface)
{
struct marker_context *context = impl_from_marker_context_IUnknown(iface);
LONG refcount = InterlockedDecrement(&context->refcount);
if (!refcount)
free(context);
return refcount;
}
static const IUnknownVtbl marker_context_vtbl =
{
marker_context_QueryInterface,
marker_context_AddRef,
marker_context_Release,
};
static HRESULT create_marker_context(unsigned int marker_type, void *user_context,
IUnknown **ret)
{
struct marker_context *object;
if (!(object = calloc(1, sizeof(*object))))
return E_OUTOFMEMORY;
object->IUnknown_iface.lpVtbl = &marker_context_vtbl;
object->refcount = 1;
object->marker_type = marker_type;
object->user_context = user_context;
*ret = &object->IUnknown_iface;
return S_OK;
}
static void sink_writer_release_pending_item(struct pending_item *item)
{
list_remove(&item->entry);
if (item->sample)
IMFSample_Release(item->sample);
free(item);
}
static HRESULT WINAPI sink_writer_QueryInterface(IMFSinkWriter *iface, REFIID riid, void **out)
{
TRACE("%p, %s, %p.\n", iface, debugstr_guid(riid), out);
......@@ -103,6 +191,16 @@ static ULONG WINAPI sink_writer_AddRef(IMFSinkWriter *iface)
return refcount;
}
static void sink_writer_drop_pending_items(struct stream *stream)
{
struct pending_item *item, *next;
LIST_FOR_EACH_ENTRY_SAFE(item, next, &stream->queue, struct pending_item, entry)
{
sink_writer_release_pending_item(item);
}
}
static ULONG WINAPI sink_writer_Release(IMFSinkWriter *iface)
{
struct sink_writer *writer = impl_from_IMFSinkWriter(iface);
......@@ -125,6 +223,7 @@ static ULONG WINAPI sink_writer_Release(IMFSinkWriter *iface)
IMFStreamSink_Release(stream->stream_sink);
if (stream->encoder)
IMFTransform_Release(stream->encoder);
sink_writer_drop_pending_items(stream);
}
DeleteCriticalSection(&writer->cs);
free(writer);
......@@ -148,12 +247,14 @@ static HRESULT sink_writer_add_stream(struct sink_writer *writer, IMFStreamSink
if (FAILED(hr = IMFStreamSink_GetIdentifier(stream_sink, &id))) return hr;
*index = writer->streams.count++;
stream = &writer->streams.items[*index];
stream = &writer->streams.items[*index];
memset(stream, 0, sizeof(*stream));
stream->stream_sink = stream_sink;
IMFStreamSink_AddRef(stream_sink);
memset(&stream->stats, 0, sizeof(stream->stats));
stream->stats.cb = sizeof(stream->stats);
list_init(&stream->queue);
writer->streams.next_id = max(writer->streams.next_id, id);
return hr;
......@@ -283,6 +384,147 @@ static HRESULT sink_writer_get_buffer_length(IMFSample *sample, LONGLONG *timest
return hr;
}
static HRESULT sink_writer_place_marker(struct sink_writer *writer, struct stream *stream, unsigned int marker_type,
LONGLONG timestamp, void *user_context)
{
PROPVARIANT value, context;
IUnknown *context_obj;
HRESULT hr;
if (FAILED(hr = create_marker_context(marker_type, user_context, &context_obj))) return hr;
context.vt = VT_UNKNOWN;
context.punkVal = context_obj;
value.vt = VT_I8;
value.hVal.QuadPart = timestamp;
hr = IMFStreamSink_PlaceMarker(stream->stream_sink, marker_type, marker_type == MFSTREAMSINK_MARKER_TICK ? &value : NULL,
&context);
IUnknown_Release(context_obj);
return hr;
}
static HRESULT sink_writer_queue_marker(struct sink_writer *writer, struct stream *stream, unsigned int marker_type,
LONGLONG timestamp, void *user_context)
{
struct pending_item *item;
if (list_empty(&stream->queue))
return sink_writer_place_marker(writer, stream, marker_type, timestamp, user_context);
if (!(item = calloc(1, sizeof(*item))))
return E_OUTOFMEMORY;
item->marker_type = marker_type;
item->timestamp = timestamp;
list_add_tail(&stream->queue, &item->entry);
return S_OK;
}
static HRESULT sink_writer_send_stream_tick(struct sink_writer *writer, unsigned int index, LONGLONG timestamp)
{
struct stream *stream;
if (!(stream = sink_writer_get_stream(writer, index))) return MF_E_INVALIDSTREAMNUMBER;
writer->stats.llLastStreamTickReceived = timestamp;
writer->stats.qwNumStreamTicksReceived++;
stream->stats.llLastStreamTickReceived = timestamp;
stream->stats.qwNumStreamTicksReceived++;
return sink_writer_queue_marker(writer, stream, MFSTREAMSINK_MARKER_TICK, timestamp, NULL);
}
static HRESULT sink_writer_notify_end_of_segment(struct sink_writer *writer, unsigned int index)
{
struct stream *stream;
if (!(stream = sink_writer_get_stream(writer, index))) return MF_E_INVALIDSTREAMNUMBER;
return sink_writer_queue_marker(writer, stream, MFSTREAMSINK_MARKER_ENDOFSEGMENT, 0, NULL);
}
static HRESULT sink_writer_process_sample(struct sink_writer *writer, struct stream *stream)
{
struct pending_item *item, *next;
LONGLONG timestamp;
IMFSample *sample;
HRESULT hr;
if (list_empty(&stream->queue)) return S_OK;
item = LIST_ENTRY(list_head(&stream->queue), struct pending_item, entry);
if (!item->sample) return S_OK;
IMFSample_AddRef((sample = item->sample));
sink_writer_release_pending_item(item);
writer->stats.dwNumOutstandingSinkSampleRequests--;
stream->stats.dwNumOutstandingSinkSampleRequests--;
if (FAILED(hr = IMFSample_GetSampleTime(sample, &timestamp)))
{
IMFSample_Release(sample);
writer->status = hr;
return S_OK;
}
writer->stats.llLastTimestampProcessed = timestamp;
stream->stats.llLastTimestampProcessed = timestamp;
hr = IMFStreamSink_ProcessSample(stream->stream_sink, sample);
IMFSample_Release(sample);
if (FAILED(hr)) return hr;
LIST_FOR_EACH_ENTRY_SAFE(item, next, &stream->queue, struct pending_item, entry)
{
if (item->sample) break;
sink_writer_place_marker(writer, stream, item->marker_type, item->timestamp, item->user_context);
sink_writer_release_pending_item(item);
}
return hr;
}
static HRESULT sink_writer_encode_sample(struct sink_writer *writer, struct stream *stream, IMFSample *sample)
{
struct pending_item *item;
/* FIXME: call the encoder, queue its output */
if (!(item = calloc(1, sizeof(*item))))
return E_OUTOFMEMORY;
item->sample = sample;
IMFSample_AddRef(item->sample);
list_add_tail(&stream->queue, &item->entry);
return S_OK;
}
static HRESULT sink_writer_write_sample(struct sink_writer *writer, struct stream *stream, IMFSample *sample)
{
LONGLONG timestamp;
DWORD length;
HRESULT hr;
if (FAILED(hr = sink_writer_get_buffer_length(sample, &timestamp, &length))) return hr;
stream->stats.llLastTimestampReceived = timestamp;
stream->stats.qwNumSamplesReceived++;
stream->stats.dwByteCountQueued += length;
writer->stats.llLastTimestampReceived = timestamp;
writer->stats.qwNumSamplesReceived++;
writer->stats.dwByteCountQueued += length;
if (FAILED(hr = sink_writer_encode_sample(writer, stream, sample))) return hr;
if (stream->stats.dwNumOutstandingSinkSampleRequests)
hr = sink_writer_process_sample(writer, stream);
return hr;
}
static HRESULT WINAPI sink_writer_WriteSample(IMFSinkWriter *iface, DWORD index, IMFSample *sample)
{
struct sink_writer *writer = impl_from_IMFSinkWriter(iface);
......@@ -306,8 +548,6 @@ static HRESULT WINAPI sink_writer_WriteSample(IMFSinkWriter *iface, DWORD index,
}
else if (SUCCEEDED(hr = sink_writer_get_buffer_length(sample, &timestamp, &length)))
{
/* FIXME: queue sample */
stream->stats.llLastTimestampReceived = timestamp;
stream->stats.qwNumSamplesReceived++;
stream->stats.dwByteCountQueued += length;
......@@ -315,18 +555,32 @@ static HRESULT WINAPI sink_writer_WriteSample(IMFSinkWriter *iface, DWORD index,
writer->stats.llLastTimestampReceived = timestamp;
writer->stats.qwNumSamplesReceived++;
writer->stats.dwByteCountQueued += length;
hr = sink_writer_write_sample(writer, stream, sample);
}
EnterCriticalSection(&writer->cs);
LeaveCriticalSection(&writer->cs);
return hr;
}
static HRESULT WINAPI sink_writer_SendStreamTick(IMFSinkWriter *iface, DWORD index, LONGLONG timestamp)
{
FIXME("%p, %lu, %s.\n", iface, index, wine_dbgstr_longlong(timestamp));
struct sink_writer *writer = impl_from_IMFSinkWriter(iface);
HRESULT hr;
return E_NOTIMPL;
TRACE("%p, %lu, %s.\n", iface, index, wine_dbgstr_longlong(timestamp));
EnterCriticalSection(&writer->cs);
if (writer->state != SINK_WRITER_STATE_WRITING)
hr = MF_E_INVALIDREQUEST;
else
hr = sink_writer_send_stream_tick(writer, index, timestamp);
LeaveCriticalSection(&writer->cs);
return hr;
}
static HRESULT WINAPI sink_writer_PlaceMarker(IMFSinkWriter *iface, DWORD index, void *context)
......@@ -338,9 +592,33 @@ static HRESULT WINAPI sink_writer_PlaceMarker(IMFSinkWriter *iface, DWORD index,
static HRESULT WINAPI sink_writer_NotifyEndOfSegment(IMFSinkWriter *iface, DWORD index)
{
FIXME("%p, %lu.\n", iface, index);
struct sink_writer *writer = impl_from_IMFSinkWriter(iface);
HRESULT hr = S_OK;
unsigned int i;
return E_NOTIMPL;
TRACE("%p, %lu.\n", iface, index);
EnterCriticalSection(&writer->cs);
if (writer->state != SINK_WRITER_STATE_WRITING)
hr = MF_E_INVALIDREQUEST;
else if (index == MF_SINK_WRITER_ALL_STREAMS)
{
for (i = 0; i < writer->streams.count; ++i)
{
if (FAILED(hr = sink_writer_notify_end_of_segment(writer, index)))
{
WARN("Failed to place a marker for stream %u.\n", i);
break;
}
}
}
else
hr = sink_writer_notify_end_of_segment(writer, index);
LeaveCriticalSection(&writer->cs);
return hr;
}
static HRESULT WINAPI sink_writer_Flush(IMFSinkWriter *iface, DWORD index)
......@@ -539,6 +817,8 @@ static HRESULT WINAPI sink_writer_events_callback_Invoke(IMFAsyncCallback *iface
stream->stats.llLastSinkSampleRequest = timestamp;
stream->stats.dwNumOutstandingSinkSampleRequests++;
sink_writer_process_sample(writer, stream);
break;
default:
;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment