Commit b44d3a39 authored by Zebediah Figura's avatar Zebediah Figura Committed by Alexandre Julliard

winegstreamer: Manage our own thread for read requests in the media source.

parent 38330678
......@@ -113,6 +113,21 @@ struct media_source
HANDLE no_more_pads_event;
uint64_t file_size, next_pull_offset;
HANDLE read_thread;
bool read_thread_shutdown;
pthread_mutex_t mutex;
pthread_cond_t read_cond, read_done_cond;
struct
{
void *data;
uint64_t offset;
uint32_t size;
bool done;
bool ret;
} read_request;
bool shutdown;
};
static inline struct media_stream *impl_from_IMFMediaStream(IMFMediaStream *iface)
......@@ -442,42 +457,109 @@ GstFlowReturn bytestream_wrapper_pull(GstPad *pad, GstObject *parent, guint64 of
GstBuffer **buf)
{
struct media_source *source = gst_pad_get_element_private(pad);
IMFByteStream *byte_stream = source->byte_stream;
GstBuffer *new_buffer = NULL;
ULONG bytes_read;
GstMapInfo info;
BOOL is_eof;
HRESULT hr;
bool ret;
TRACE("requesting %u bytes at %s from source %p into buffer %p\n", len, wine_dbgstr_longlong(ofs), source, *buf);
if (ofs == GST_BUFFER_OFFSET_NONE)
ofs = source->next_pull_offset;
source->next_pull_offset = ofs + len;
if (FAILED(IMFByteStream_SetCurrentPosition(byte_stream, ofs)))
return GST_FLOW_ERROR;
if (FAILED(IMFByteStream_IsEndOfStream(byte_stream, &is_eof)))
return GST_FLOW_ERROR;
if (is_eof)
if (ofs >= source->file_size)
return GST_FLOW_EOS;
if (ofs + len >= source->file_size)
len = source->file_size - ofs;
if (!(*buf))
*buf = new_buffer = gst_buffer_new_and_alloc(len);
gst_buffer_map(*buf, &info, GST_MAP_WRITE);
hr = IMFByteStream_Read(byte_stream, info.data, len, &bytes_read);
pthread_mutex_lock(&source->mutex);
assert(!source->read_request.data);
source->read_request.data = info.data;
source->read_request.offset = ofs;
source->read_request.size = len;
source->read_request.done = false;
pthread_cond_signal(&source->read_cond);
/* Note that we don't unblock this wait on GST_EVENT_FLUSH_START. We expect
* the upstream pin to flush if necessary. We should never be blocked on
* read_thread() not running. */
while (!source->read_request.done)
pthread_cond_wait(&source->read_done_cond, &source->mutex);
ret = source->read_request.ret;
pthread_mutex_unlock(&source->mutex);
gst_buffer_unmap(*buf, &info);
gst_buffer_set_size(*buf, bytes_read);
if (!ret && new_buffer)
gst_buffer_unref(new_buffer);
return ret ? GST_FLOW_OK : GST_FLOW_ERROR;
}
static bool get_read_request(struct media_source *source, void **data, uint64_t *offset, uint32_t *size)
{
pthread_mutex_lock(&source->mutex);
while (!source->shutdown && !source->read_request.data)
pthread_cond_wait(&source->read_cond, &source->mutex);
if (source->shutdown)
{
pthread_mutex_unlock(&source->mutex);
return false;
}
*data = source->read_request.data;
*offset = source->read_request.offset;
*size = source->read_request.size;
pthread_mutex_unlock(&source->mutex);
return true;
}
static void complete_read_request(struct media_source *source, bool ret)
{
pthread_mutex_lock(&source->mutex);
source->read_request.done = true;
source->read_request.ret = ret;
source->read_request.data = NULL;
pthread_mutex_unlock(&source->mutex);
pthread_cond_signal(&source->read_done_cond);
}
if (FAILED(hr))
static DWORD CALLBACK read_thread(void *arg)
{
struct media_source *source = arg;
IMFByteStream *byte_stream = source->byte_stream;
TRACE("Starting read thread for media source %p.\n", source);
while (!source->read_thread_shutdown)
{
if (new_buffer)
gst_buffer_unref(new_buffer);
return GST_FLOW_ERROR;
uint64_t offset;
ULONG ret_size;
uint32_t size;
HRESULT hr;
void *data;
if (!get_read_request(source, &data, &offset, &size))
continue;
if (SUCCEEDED(hr = IMFByteStream_SetCurrentPosition(byte_stream, offset)))
hr = IMFByteStream_Read(byte_stream, data, size, &ret_size);
if (SUCCEEDED(hr) && ret_size != size)
ERR("Unexpected short read: requested %u bytes, got %u.\n", size, ret_size);
complete_read_request(source, SUCCEEDED(hr));
}
return GST_FLOW_OK;
TRACE("Media source is shutting down; exiting.\n");
return 0;
}
static gboolean bytestream_query(GstPad *pad, GstObject *parent, GstQuery *query)
......@@ -1164,6 +1246,21 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
IMFMediaStream_Release(&stream->IMFMediaStream_iface);
}
if (source->read_thread)
{
source->read_thread_shutdown = true;
pthread_mutex_lock(&source->mutex);
source->shutdown = true;
pthread_mutex_unlock(&source->mutex);
pthread_cond_signal(&source->read_cond);
WaitForSingleObject(source->read_thread, INFINITE);
CloseHandle(source->read_thread);
}
pthread_mutex_destroy(&source->mutex);
pthread_cond_destroy(&source->read_cond);
pthread_cond_destroy(&source->read_done_cond);
if (source->stream_count)
heap_free(source->streams);
......@@ -1284,6 +1381,12 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_
if (FAILED(hr = MFAllocateWorkQueue(&object->async_commands_queue)))
goto fail;
pthread_mutex_init(&object->mutex, NULL);
pthread_cond_init(&object->read_cond, NULL);
pthread_cond_init(&object->read_done_cond, NULL);
object->read_thread = CreateThread(NULL, 0, read_thread, object, 0, NULL);
object->container = gst_bin_new(NULL);
object->bus = gst_bus_new();
gst_bus_set_sync_handler(object->bus, mf_src_bus_watch_wrapper, object, NULL);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment