Commit e848a00d authored by Zebediah Figura's avatar Zebediah Figura Committed by Alexandre Julliard

winegstreamer: Map the output sample buffer in the Unix library.

parent e103c75a
...@@ -179,7 +179,13 @@ struct wg_parser_event ...@@ -179,7 +179,13 @@ struct wg_parser_event
enum wg_parser_event_type type; enum wg_parser_event_type type;
union union
{ {
GstBuffer *buffer; struct
{
/* pts and duration are in 100-nanosecond units. */
uint64_t pts, duration;
uint32_t size;
bool discontinuity, preroll, delta, has_pts, has_duration;
} buffer;
struct struct
{ {
uint64_t position, stop; uint64_t position, stop;
...@@ -198,6 +204,8 @@ struct wg_parser_stream ...@@ -198,6 +204,8 @@ struct wg_parser_stream
pthread_cond_t event_cond, event_empty_cond; pthread_cond_t event_cond, event_empty_cond;
struct wg_parser_event event; struct wg_parser_event event;
GstBuffer *buffer;
GstMapInfo map_info;
bool flushing, eos, enabled, has_caps; bool flushing, eos, enabled, has_caps;
...@@ -226,6 +234,9 @@ struct unix_funcs ...@@ -226,6 +234,9 @@ struct unix_funcs
void (CDECL *wg_parser_stream_disable)(struct wg_parser_stream *stream); void (CDECL *wg_parser_stream_disable)(struct wg_parser_stream *stream);
bool (CDECL *wg_parser_stream_get_event)(struct wg_parser_stream *stream, struct wg_parser_event *event); bool (CDECL *wg_parser_stream_get_event)(struct wg_parser_stream *stream, struct wg_parser_event *event);
bool (CDECL *wg_parser_stream_copy_buffer)(struct wg_parser_stream *stream,
void *data, uint32_t offset, uint32_t size);
void (CDECL *wg_parser_stream_release_buffer)(struct wg_parser_stream *stream);
void (CDECL *wg_parser_stream_notify_qos)(struct wg_parser_stream *stream, void (CDECL *wg_parser_stream_notify_qos)(struct wg_parser_stream *stream,
bool underflow, double proportion, int64_t diff, uint64_t timestamp); bool underflow, double proportion, int64_t diff, uint64_t timestamp);
......
...@@ -568,11 +568,13 @@ static bool amt_to_wg_format(const AM_MEDIA_TYPE *mt, struct wg_format *format) ...@@ -568,11 +568,13 @@ static bool amt_to_wg_format(const AM_MEDIA_TYPE *mt, struct wg_format *format)
/* Fill and send a single IMediaSample. */ /* Fill and send a single IMediaSample. */
static HRESULT send_sample(struct parser_source *pin, IMediaSample *sample, static HRESULT send_sample(struct parser_source *pin, IMediaSample *sample,
GstBuffer *buf, GstMapInfo *info, gsize offset, gsize size, DWORD bytes_per_second) const struct wg_parser_event *event, uint32_t offset, uint32_t size, DWORD bytes_per_second)
{ {
HRESULT hr; HRESULT hr;
BYTE *ptr = NULL; BYTE *ptr = NULL;
TRACE("offset %u, size %u, sample size %u\n", offset, size, IMediaSample_GetSize(sample));
hr = IMediaSample_SetActualDataLength(sample, size); hr = IMediaSample_SetActualDataLength(sample, size);
if(FAILED(hr)){ if(FAILED(hr)){
WARN("SetActualDataLength failed: %08x\n", hr); WARN("SetActualDataLength failed: %08x\n", hr);
...@@ -581,37 +583,48 @@ static HRESULT send_sample(struct parser_source *pin, IMediaSample *sample, ...@@ -581,37 +583,48 @@ static HRESULT send_sample(struct parser_source *pin, IMediaSample *sample,
IMediaSample_GetPointer(sample, &ptr); IMediaSample_GetPointer(sample, &ptr);
memcpy(ptr, &info->data[offset], size); if (!unix_funcs->wg_parser_stream_copy_buffer(pin->wg_stream, ptr, offset, size))
{
if (GST_BUFFER_PTS_IS_VALID(buf)) { /* The GStreamer pin has been flushed. */
REFERENCE_TIME rtStart, ptsStart = buf->pts; return S_OK;
}
if (offset > 0)
ptsStart = buf->pts + gst_util_uint64_scale(offset, GST_SECOND, bytes_per_second); if (event->u.buffer.has_pts)
rtStart = ((ptsStart / 100) - pin->seek.llCurrent) * pin->seek.dRate; {
REFERENCE_TIME start_pts = event->u.buffer.pts;
if (GST_BUFFER_DURATION_IS_VALID(buf)) {
REFERENCE_TIME rtStop, tStart, tStop, ptsStop = buf->pts + buf->duration; if (offset)
if (offset + size < info->size) start_pts += gst_util_uint64_scale(offset, 10000000, bytes_per_second);
ptsStop = buf->pts + gst_util_uint64_scale(offset + size, GST_SECOND, bytes_per_second); start_pts -= pin->seek.llCurrent;
tStart = ptsStart / 100; start_pts *= pin->seek.dRate;
tStop = ptsStop / 100;
rtStop = ((ptsStop / 100) - pin->seek.llCurrent) * pin->seek.dRate; if (event->u.buffer.has_duration)
TRACE("Current time on %p: %i to %i ms\n", pin, (int)(rtStart / 10000), (int)(rtStop / 10000)); {
IMediaSample_SetTime(sample, &rtStart, rtStop >= 0 ? &rtStop : NULL); REFERENCE_TIME end_pts = event->u.buffer.pts + event->u.buffer.duration;
IMediaSample_SetMediaTime(sample, &tStart, &tStop);
} else { if (offset + size < event->u.buffer.size)
IMediaSample_SetTime(sample, rtStart >= 0 ? &rtStart : NULL, NULL); end_pts = event->u.buffer.pts + gst_util_uint64_scale(offset + size, 10000000, bytes_per_second);
end_pts -= pin->seek.llCurrent;
end_pts *= pin->seek.dRate;
IMediaSample_SetTime(sample, &start_pts, &end_pts);
IMediaSample_SetMediaTime(sample, &start_pts, &end_pts);
}
else
{
IMediaSample_SetTime(sample, &start_pts, NULL);
IMediaSample_SetMediaTime(sample, NULL, NULL); IMediaSample_SetMediaTime(sample, NULL, NULL);
} }
} else { }
else
{
IMediaSample_SetTime(sample, NULL, NULL); IMediaSample_SetTime(sample, NULL, NULL);
IMediaSample_SetMediaTime(sample, NULL, NULL); IMediaSample_SetMediaTime(sample, NULL, NULL);
} }
IMediaSample_SetDiscontinuity(sample, !offset && GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DISCONT)); IMediaSample_SetDiscontinuity(sample, !offset && event->u.buffer.discontinuity);
IMediaSample_SetPreroll(sample, GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_LIVE)); IMediaSample_SetPreroll(sample, event->u.buffer.preroll);
IMediaSample_SetSyncPoint(sample, !GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DELTA_UNIT)); IMediaSample_SetSyncPoint(sample, !event->u.buffer.delta);
if (!pin->pin.pin.peer) if (!pin->pin.pin.peer)
hr = VFW_E_NOT_CONNECTED; hr = VFW_E_NOT_CONNECTED;
...@@ -625,23 +638,21 @@ static HRESULT send_sample(struct parser_source *pin, IMediaSample *sample, ...@@ -625,23 +638,21 @@ static HRESULT send_sample(struct parser_source *pin, IMediaSample *sample,
/* Send a single GStreamer buffer (splitting it into multiple IMediaSamples if /* Send a single GStreamer buffer (splitting it into multiple IMediaSamples if
* necessary). */ * necessary). */
static void send_buffer(struct parser_source *pin, GstBuffer *buf) static void send_buffer(struct parser_source *pin, const struct wg_parser_event *event)
{ {
HRESULT hr; HRESULT hr;
IMediaSample *sample; IMediaSample *sample;
GstMapInfo info;
gst_buffer_map(buf, &info, GST_MAP_READ);
if (IsEqualGUID(&pin->pin.pin.mt.formattype, &FORMAT_WaveFormatEx) if (IsEqualGUID(&pin->pin.pin.mt.formattype, &FORMAT_WaveFormatEx)
&& (IsEqualGUID(&pin->pin.pin.mt.subtype, &MEDIASUBTYPE_PCM) && (IsEqualGUID(&pin->pin.pin.mt.subtype, &MEDIASUBTYPE_PCM)
|| IsEqualGUID(&pin->pin.pin.mt.subtype, &MEDIASUBTYPE_IEEE_FLOAT))) || IsEqualGUID(&pin->pin.pin.mt.subtype, &MEDIASUBTYPE_IEEE_FLOAT)))
{ {
WAVEFORMATEX *format = (WAVEFORMATEX *)pin->pin.pin.mt.pbFormat; WAVEFORMATEX *format = (WAVEFORMATEX *)pin->pin.pin.mt.pbFormat;
gsize offset = 0; uint32_t offset = 0;
while (offset < info.size)
while (offset < event->u.buffer.size)
{ {
gsize advance; uint32_t advance;
hr = BaseOutputPinImpl_GetDeliveryBuffer(&pin->pin, &sample, NULL, NULL, 0); hr = BaseOutputPinImpl_GetDeliveryBuffer(&pin->pin, &sample, NULL, NULL, 0);
...@@ -652,9 +663,9 @@ static void send_buffer(struct parser_source *pin, GstBuffer *buf) ...@@ -652,9 +663,9 @@ static void send_buffer(struct parser_source *pin, GstBuffer *buf)
break; break;
} }
advance = min(IMediaSample_GetSize(sample), info.size - offset); advance = min(IMediaSample_GetSize(sample), event->u.buffer.size - offset);
hr = send_sample(pin, sample, buf, &info, offset, advance, format->nAvgBytesPerSec); hr = send_sample(pin, sample, event, offset, advance, format->nAvgBytesPerSec);
IMediaSample_Release(sample); IMediaSample_Release(sample);
...@@ -675,15 +686,13 @@ static void send_buffer(struct parser_source *pin, GstBuffer *buf) ...@@ -675,15 +686,13 @@ static void send_buffer(struct parser_source *pin, GstBuffer *buf)
} }
else else
{ {
hr = send_sample(pin, sample, buf, &info, 0, info.size, 0); hr = send_sample(pin, sample, event, 0, event->u.buffer.size, 0);
IMediaSample_Release(sample); IMediaSample_Release(sample);
} }
} }
gst_buffer_unmap(buf, &info); unix_funcs->wg_parser_stream_release_buffer(pin->wg_stream);
gst_buffer_unref(buf);
} }
static DWORD CALLBACK stream_thread(void *arg) static DWORD CALLBACK stream_thread(void *arg)
...@@ -710,7 +719,7 @@ static DWORD CALLBACK stream_thread(void *arg) ...@@ -710,7 +719,7 @@ static DWORD CALLBACK stream_thread(void *arg)
switch (event.type) switch (event.type)
{ {
case WG_PARSER_EVENT_BUFFER: case WG_PARSER_EVENT_BUFFER:
send_buffer(pin, event.u.buffer); send_buffer(pin, &event);
break; break;
case WG_PARSER_EVENT_EOS: case WG_PARSER_EVENT_EOS:
......
...@@ -410,14 +410,56 @@ static bool CDECL wg_parser_stream_get_event(struct wg_parser_stream *stream, st ...@@ -410,14 +410,56 @@ static bool CDECL wg_parser_stream_get_event(struct wg_parser_stream *stream, st
} }
*event = stream->event; *event = stream->event;
if (stream->event.type != WG_PARSER_EVENT_BUFFER)
{
stream->event.type = WG_PARSER_EVENT_NONE; stream->event.type = WG_PARSER_EVENT_NONE;
pthread_cond_signal(&stream->event_empty_cond);
}
pthread_mutex_unlock(&parser->mutex);
return true;
}
static bool CDECL wg_parser_stream_copy_buffer(struct wg_parser_stream *stream,
void *data, uint32_t offset, uint32_t size)
{
struct wg_parser *parser = stream->parser;
pthread_mutex_lock(&parser->mutex);
if (!stream->buffer)
{
pthread_mutex_unlock(&parser->mutex); pthread_mutex_unlock(&parser->mutex);
pthread_cond_signal(&stream->event_empty_cond); return false;
}
assert(stream->event.type == WG_PARSER_EVENT_BUFFER);
assert(offset < stream->map_info.size);
assert(offset + size <= stream->map_info.size);
memcpy(data, stream->map_info.data + offset, size);
pthread_mutex_unlock(&parser->mutex);
return true; return true;
} }
static void CDECL wg_parser_stream_release_buffer(struct wg_parser_stream *stream)
{
struct wg_parser *parser = stream->parser;
pthread_mutex_lock(&parser->mutex);
assert(stream->event.type == WG_PARSER_EVENT_BUFFER);
gst_buffer_unmap(stream->buffer, &stream->map_info);
gst_buffer_unref(stream->buffer);
stream->buffer = NULL;
stream->event.type = WG_PARSER_EVENT_NONE;
pthread_mutex_unlock(&parser->mutex);
pthread_cond_signal(&stream->event_empty_cond);
}
static bool CDECL wg_parser_stream_seek(struct wg_parser_stream *stream, double rate, static bool CDECL wg_parser_stream_seek(struct wg_parser_stream *stream, double rate,
uint64_t start_pos, uint64_t stop_pos, DWORD start_flags, DWORD stop_flags) uint64_t start_pos, uint64_t stop_pos, DWORD start_flags, DWORD stop_flags)
{ {
...@@ -482,7 +524,8 @@ static void no_more_pads(GstElement *element, gpointer user) ...@@ -482,7 +524,8 @@ static void no_more_pads(GstElement *element, gpointer user)
pthread_cond_signal(&parser->init_cond); pthread_cond_signal(&parser->init_cond);
} }
static GstFlowReturn queue_stream_event(struct wg_parser_stream *stream, const struct wg_parser_event *event) static GstFlowReturn queue_stream_event(struct wg_parser_stream *stream,
const struct wg_parser_event *event, GstBuffer *buffer)
{ {
struct wg_parser *parser = stream->parser; struct wg_parser *parser = stream->parser;
...@@ -501,7 +544,18 @@ static GstFlowReturn queue_stream_event(struct wg_parser_stream *stream, const s ...@@ -501,7 +544,18 @@ static GstFlowReturn queue_stream_event(struct wg_parser_stream *stream, const s
GST_DEBUG("Filter is flushing; discarding event."); GST_DEBUG("Filter is flushing; discarding event.");
return GST_FLOW_FLUSHING; return GST_FLOW_FLUSHING;
} }
if (buffer)
{
assert(GST_IS_BUFFER(buffer));
if (!gst_buffer_map(buffer, &stream->map_info, GST_MAP_READ))
{
pthread_mutex_unlock(&parser->mutex);
GST_ERROR("Failed to map buffer.\n");
return GST_FLOW_ERROR;
}
}
stream->event = *event; stream->event = *event;
stream->buffer = buffer;
pthread_mutex_unlock(&parser->mutex); pthread_mutex_unlock(&parser->mutex);
pthread_cond_signal(&stream->event_cond); pthread_cond_signal(&stream->event_cond);
GST_LOG("Event queued."); GST_LOG("Event queued.");
...@@ -535,7 +589,7 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event) ...@@ -535,7 +589,7 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
stream_event.u.segment.position = segment->position / 100; stream_event.u.segment.position = segment->position / 100;
stream_event.u.segment.stop = segment->stop / 100; stream_event.u.segment.stop = segment->stop / 100;
stream_event.u.segment.rate = segment->rate * segment->applied_rate; stream_event.u.segment.rate = segment->rate * segment->applied_rate;
queue_stream_event(stream, &stream_event); queue_stream_event(stream, &stream_event, NULL);
} }
break; break;
...@@ -545,7 +599,7 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event) ...@@ -545,7 +599,7 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
struct wg_parser_event stream_event; struct wg_parser_event stream_event;
stream_event.type = WG_PARSER_EVENT_EOS; stream_event.type = WG_PARSER_EVENT_EOS;
queue_stream_event(stream, &stream_event); queue_stream_event(stream, &stream_event, NULL);
} }
else else
{ {
...@@ -564,16 +618,11 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event) ...@@ -564,16 +618,11 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
stream->flushing = true; stream->flushing = true;
pthread_cond_signal(&stream->event_empty_cond); pthread_cond_signal(&stream->event_empty_cond);
switch (stream->event.type) if (stream->event.type == WG_PARSER_EVENT_BUFFER)
{ {
case WG_PARSER_EVENT_NONE: gst_buffer_unmap(stream->buffer, &stream->map_info);
case WG_PARSER_EVENT_EOS: gst_buffer_unref(stream->buffer);
case WG_PARSER_EVENT_SEGMENT: stream->buffer = NULL;
break;
case WG_PARSER_EVENT_BUFFER:
gst_buffer_unref(stream->event.u.buffer);
break;
} }
stream->event.type = WG_PARSER_EVENT_NONE; stream->event.type = WG_PARSER_EVENT_NONE;
...@@ -625,9 +674,18 @@ static GstFlowReturn got_data_sink(GstPad *pad, GstObject *parent, GstBuffer *bu ...@@ -625,9 +674,18 @@ static GstFlowReturn got_data_sink(GstPad *pad, GstObject *parent, GstBuffer *bu
} }
stream_event.type = WG_PARSER_EVENT_BUFFER; stream_event.type = WG_PARSER_EVENT_BUFFER;
stream_event.u.buffer = buffer;
/* Transfer our reference to the buffer to the object. */ if ((stream_event.u.buffer.has_pts = GST_BUFFER_PTS_IS_VALID(buffer)))
if ((ret = queue_stream_event(stream, &stream_event)) != GST_FLOW_OK) stream_event.u.buffer.pts = GST_BUFFER_PTS(buffer) / 100;
if ((stream_event.u.buffer.has_duration = GST_BUFFER_DURATION_IS_VALID(buffer)))
stream_event.u.buffer.duration = GST_BUFFER_DURATION(buffer) / 100;
stream_event.u.buffer.discontinuity = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DISCONT);
stream_event.u.buffer.preroll = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_LIVE);
stream_event.u.buffer.delta = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT);
stream_event.u.buffer.size = gst_buffer_get_size(buffer);
/* Transfer our reference to the buffer to the stream object. */
if ((ret = queue_stream_event(stream, &stream_event, buffer)) != GST_FLOW_OK)
gst_buffer_unref(buffer); gst_buffer_unref(buffer);
return ret; return ret;
} }
...@@ -1613,6 +1671,8 @@ static const struct unix_funcs funcs = ...@@ -1613,6 +1671,8 @@ static const struct unix_funcs funcs =
wg_parser_stream_disable, wg_parser_stream_disable,
wg_parser_stream_get_event, wg_parser_stream_get_event,
wg_parser_stream_copy_buffer,
wg_parser_stream_release_buffer,
wg_parser_stream_notify_qos, wg_parser_stream_notify_qos,
wg_parser_stream_seek, wg_parser_stream_seek,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment