Commit 042c1cef authored by Rémi Bernon's avatar Rémi Bernon Committed by Alexandre Julliard

winegstreamer: Allow wg_parser_stream_get_buffer to wait on all streams.

For the WM reader, returning the earliest buffer.
parent af751b59
......@@ -86,7 +86,8 @@ void wg_parser_stream_get_preferred_format(struct wg_parser_stream *stream, stru
void wg_parser_stream_enable(struct wg_parser_stream *stream, const struct wg_format *format);
void wg_parser_stream_disable(struct wg_parser_stream *stream);
bool wg_parser_stream_get_buffer(struct wg_parser_stream *stream, struct wg_parser_buffer *buffer);
bool wg_parser_stream_get_buffer(struct wg_parser *parser, struct wg_parser_stream *stream,
struct wg_parser_buffer *buffer);
bool wg_parser_stream_copy_buffer(struct wg_parser_stream *stream,
void *data, uint32_t offset, uint32_t size);
void wg_parser_stream_release_buffer(struct wg_parser_stream *stream);
......
......@@ -202,15 +202,17 @@ void wg_parser_stream_disable(struct wg_parser_stream *stream)
__wine_unix_call(unix_handle, unix_wg_parser_stream_disable, stream);
}
bool wg_parser_stream_get_buffer(struct wg_parser_stream *stream, struct wg_parser_buffer *buffer)
bool wg_parser_stream_get_buffer(struct wg_parser *parser, struct wg_parser_stream *stream,
struct wg_parser_buffer *buffer)
{
struct wg_parser_stream_get_buffer_params params =
{
.parser = parser,
.stream = stream,
.buffer = buffer,
};
TRACE("stream %p, buffer %p.\n", stream, buffer);
TRACE("parser %p, stream %p, buffer %p.\n", parser, stream, buffer);
return !__wine_unix_call(unix_handle, unix_wg_parser_stream_get_buffer, &params);
}
......
......@@ -528,12 +528,13 @@ out:
static void wait_on_sample(struct media_stream *stream, IUnknown *token)
{
struct media_source *source = stream->parent_source;
PROPVARIANT empty_var = {.vt = VT_EMPTY};
struct wg_parser_buffer buffer;
TRACE("%p, %p\n", stream, token);
if (wg_parser_stream_get_buffer(stream->wg_stream, &buffer))
if (wg_parser_stream_get_buffer(source->wg_parser, stream->wg_stream, &buffer))
{
send_buffer(stream, &buffer, token);
}
......
......@@ -938,7 +938,7 @@ static DWORD CALLBACK stream_thread(void *arg)
continue;
}
if (wg_parser_stream_get_buffer(pin->wg_stream, &buffer))
if (wg_parser_stream_get_buffer(filter->wg_parser, pin->wg_stream, &buffer))
{
send_buffer(pin, &buffer);
}
......
......@@ -154,6 +154,7 @@ struct wg_parser_buffer
/* pts and duration are in 100-nanosecond units. */
UINT64 pts, duration;
UINT32 size;
UINT32 stream;
bool discontinuity, preroll, delta, has_pts, has_duration;
};
C_ASSERT(sizeof(struct wg_parser_buffer) == 32);
......@@ -222,6 +223,7 @@ struct wg_parser_stream_enable_params
struct wg_parser_stream_get_buffer_params
{
struct wg_parser *parser;
struct wg_parser_stream *stream;
struct wg_parser_buffer *buffer;
};
......
......@@ -96,6 +96,7 @@ struct wg_parser
struct wg_parser_stream
{
struct wg_parser *parser;
uint32_t number;
GstPad *their_src, *post_sink, *post_src, *my_sink;
GstElement *flip;
......@@ -260,43 +261,89 @@ static NTSTATUS wg_parser_stream_disable(void *args)
return S_OK;
}
static GstBuffer *wait_parser_stream_buffer(struct wg_parser *parser, struct wg_parser_stream *stream)
{
GstBuffer *buffer = NULL;
/* Note that we can both have a buffer and stream->eos, in which case we
* must return the buffer. */
while (!(buffer = stream->buffer) && !stream->eos)
pthread_cond_wait(&stream->event_cond, &parser->mutex);
return buffer;
}
static NTSTATUS wg_parser_stream_get_buffer(void *args)
{
const struct wg_parser_stream_get_buffer_params *params = args;
struct wg_parser_buffer *wg_buffer = params->buffer;
struct wg_parser_stream *stream = params->stream;
struct wg_parser *parser = stream->parser;
struct wg_parser *parser = params->parser;
GstBuffer *buffer;
unsigned int i;
pthread_mutex_lock(&parser->mutex);
while (!stream->eos && !stream->buffer)
pthread_cond_wait(&stream->event_cond, &parser->mutex);
/* Note that we can both have a buffer and stream->eos, in which case we
* must return the buffer. */
if ((buffer = stream->buffer))
if (stream)
buffer = wait_parser_stream_buffer(parser, stream);
else
{
/* FIXME: Should we use gst_segment_to_stream_time_full()? Under what
* circumstances is the stream time not equal to the buffer PTS? Note
* that this will need modification to wg_parser_stream_notify_qos() as
* well. */
if ((wg_buffer->has_pts = GST_BUFFER_PTS_IS_VALID(buffer)))
wg_buffer->pts = GST_BUFFER_PTS(buffer) / 100;
if ((wg_buffer->has_duration = GST_BUFFER_DURATION_IS_VALID(buffer)))
wg_buffer->duration = GST_BUFFER_DURATION(buffer) / 100;
wg_buffer->discontinuity = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DISCONT);
wg_buffer->preroll = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_LIVE);
wg_buffer->delta = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT);
wg_buffer->size = gst_buffer_get_size(buffer);
/* Find the earliest buffer by PTS.
*
* Native seems to behave similarly to this with the wm async reader, although our
* unit tests show that it's not entirely consistent—some frames are received
* slightly out of order. It's possible that one stream is being manually offset
* to account for decoding latency.
*
* The behaviour with the wm sync reader, when stream 0 is requested, seems
* consistent with this hypothesis, but with a much larger offset—the video
* stream seems to be "behind" by about 150 ms.
*
* The main reason for doing this is that the video and audio stream probably
* don't have quite the same "frame rate", and we don't want to force one stream
* to decode faster just to keep up with the other. Delivering samples in PTS
* order should avoid that problem. */
GstBuffer *earliest = NULL;
for (i = 0; i < parser->stream_count; ++i)
{
if (!parser->streams[i]->enabled || !(buffer = wait_parser_stream_buffer(parser, parser->streams[i])))
continue;
/* invalid PTS is GST_CLOCK_TIME_NONE == (guint64)-1, so this will prefer valid timestamps. */
if (!earliest || GST_BUFFER_PTS(buffer) < GST_BUFFER_PTS(earliest))
{
stream = parser->streams[i];
earliest = buffer;
}
}
buffer = earliest;
}
if (!buffer)
{
pthread_mutex_unlock(&parser->mutex);
return S_OK;
return S_FALSE;
}
/* FIXME: Should we use gst_segment_to_stream_time_full()? Under what
* circumstances is the stream time not equal to the buffer PTS? Note
* that this will need modification to wg_parser_stream_notify_qos() as
* well. */
if ((wg_buffer->has_pts = GST_BUFFER_PTS_IS_VALID(buffer)))
wg_buffer->pts = GST_BUFFER_PTS(buffer) / 100;
if ((wg_buffer->has_duration = GST_BUFFER_DURATION_IS_VALID(buffer)))
wg_buffer->duration = GST_BUFFER_DURATION(buffer) / 100;
wg_buffer->discontinuity = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DISCONT);
wg_buffer->preroll = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_LIVE);
wg_buffer->delta = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT);
wg_buffer->size = gst_buffer_get_size(buffer);
wg_buffer->stream = stream->number;
pthread_mutex_unlock(&parser->mutex);
return S_FALSE;
return S_OK;
}
static NTSTATUS wg_parser_stream_copy_buffer(void *args)
......@@ -691,6 +738,7 @@ static struct wg_parser_stream *create_stream(struct wg_parser *parser)
gst_segment_init(&stream->segment, GST_FORMAT_UNDEFINED);
stream->parser = parser;
stream->number = parser->stream_count;
stream->current_format.major_type = WG_MAJOR_TYPE_UNKNOWN;
pthread_cond_init(&stream->event_cond, NULL);
pthread_cond_init(&stream->event_empty_cond, NULL);
......
......@@ -1596,13 +1596,17 @@ static HRESULT wm_stream_allocate_sample(struct wm_stream *stream, DWORD size, I
return S_OK;
}
static HRESULT wm_reader_read_stream_sample(struct wm_reader *reader, struct wm_stream *stream,
struct wg_parser_buffer *buffer, INSSBuffer **sample, QWORD *pts, QWORD *duration, DWORD *flags)
static HRESULT wm_reader_read_stream_sample(struct wm_reader *reader, struct wg_parser_buffer *buffer,
INSSBuffer **sample, QWORD *pts, QWORD *duration, DWORD *flags)
{
struct wm_stream *stream;
DWORD size, capacity;
HRESULT hr;
BYTE *data;
if (!(stream = wm_reader_get_stream_by_stream_number(reader, buffer->stream + 1)))
return E_INVALIDARG;
TRACE("Got buffer for '%s' stream %p.\n", get_major_type_string(stream->format.major_type), stream);
if (FAILED(hr = wm_stream_allocate_sample(stream, buffer->size, sample)))
......@@ -1648,49 +1652,6 @@ static HRESULT wm_reader_read_stream_sample(struct wm_reader *reader, struct wm_
return S_OK;
}
/* Find the earliest buffer by PTS.
*
* Native seems to behave similarly to this with the async reader, although our
* unit tests show that it's not entirely consistent—some frames are received
* slightly out of order. It's possible that one stream is being manually offset
* to account for decoding latency.
*
* The behaviour with the synchronous reader, when stream 0 is requested, seems
* consistent with this hypothesis, but with a much larger offset—the video
* stream seems to be "behind" by about 150 ms.
*
* The main reason for doing this is that the video and audio stream probably
* don't have quite the same "frame rate", and we don't want to force one stream
* to decode faster just to keep up with the other. Delivering samples in PTS
* order should avoid that problem. */
static WORD get_earliest_buffer(struct wm_reader *reader, struct wg_parser_buffer *ret_buffer)
{
struct wg_parser_buffer buffer;
QWORD earliest_pts = UI64_MAX;
WORD stream_number = 0;
WORD i;
for (i = 0; i < reader->stream_count; ++i)
{
struct wm_stream *stream = &reader->streams[i];
if (stream->selection == WMT_OFF)
continue;
if (!wg_parser_stream_get_buffer(stream->wg_stream, &buffer))
continue;
if (buffer.has_pts && buffer.pts < earliest_pts)
{
stream_number = i + 1;
earliest_pts = buffer.pts;
*ret_buffer = buffer;
}
}
return stream_number;
}
static HRESULT wm_reader_get_stream_sample(struct wm_reader *reader, WORD stream_number,
INSSBuffer **ret_sample, QWORD *pts, QWORD *duration, DWORD *flags, WORD *ret_stream_number)
{
......@@ -1702,13 +1663,11 @@ static HRESULT wm_reader_get_stream_sample(struct wm_reader *reader, WORD stream
{
if (!stream_number)
{
if (!(stream_number = get_earliest_buffer(reader, &wg_buffer)))
if (!wg_parser_stream_get_buffer(reader->wg_parser, NULL, &wg_buffer))
{
/* All streams are disabled or EOS. */
return NS_E_NO_MORE_SAMPLES;
}
stream = wm_reader_get_stream_by_stream_number(reader, stream_number);
}
else
{
......@@ -1727,7 +1686,7 @@ static HRESULT wm_reader_get_stream_sample(struct wm_reader *reader, WORD stream
if (stream->eos)
return NS_E_NO_MORE_SAMPLES;
if (!wg_parser_stream_get_buffer(stream->wg_stream, &wg_buffer))
if (!wg_parser_stream_get_buffer(reader->wg_parser, stream->wg_stream, &wg_buffer))
{
stream->eos = true;
TRACE("End of stream.\n");
......@@ -1735,8 +1694,8 @@ static HRESULT wm_reader_get_stream_sample(struct wm_reader *reader, WORD stream
}
}
if (SUCCEEDED(hr = wm_reader_read_stream_sample(reader, stream, &wg_buffer, ret_sample, pts, duration, flags)))
*ret_stream_number = stream_number;
if (SUCCEEDED(hr = wm_reader_read_stream_sample(reader, &wg_buffer, ret_sample, pts, duration, flags)))
*ret_stream_number = wg_buffer.stream + 1;
} while (hr == S_FALSE);
return hr;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment