Commit 1617bd3f authored by Ziqing Hui's avatar Ziqing Hui Committed by Alexandre Julliard

winegstreamer: Implement wg_muxer_read_data.

parent 98310d21
...@@ -59,8 +59,11 @@ struct wg_muxer ...@@ -59,8 +59,11 @@ struct wg_muxer
GstPad *my_sink; GstPad *my_sink;
GstCaps *my_sink_caps; GstCaps *my_sink_caps;
GstAtomicQueue *output_queue;
GstBuffer *buffer;
pthread_mutex_t mutex; pthread_mutex_t mutex;
guint64 offset; guint64 offset; /* Write offset of the output buffer generated by muxer. */
struct list streams; struct list streams;
}; };
...@@ -198,8 +201,29 @@ static gboolean muxer_sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *ev ...@@ -198,8 +201,29 @@ static gboolean muxer_sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *ev
static GstFlowReturn muxer_sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buffer) static GstFlowReturn muxer_sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buffer)
{ {
GST_FIXME("Stub."); GstBuffer *buffer_writable= gst_buffer_make_writable(buffer);
return GST_FLOW_ERROR; struct wg_muxer *muxer = gst_pad_get_element_private(pad);
GST_DEBUG("muxer %p, pad %"GST_PTR_FORMAT", parent %"GST_PTR_FORMAT", buffer <%"GST_PTR_FORMAT">.",
muxer, pad, parent, buffer);
pthread_mutex_lock(&muxer->mutex);
GST_BUFFER_OFFSET(buffer_writable) = GST_BUFFER_OFFSET_NONE;
if (muxer->offset != GST_BUFFER_OFFSET_NONE)
{
GST_BUFFER_OFFSET(buffer_writable) = muxer->offset;
muxer->offset = GST_BUFFER_OFFSET_NONE;
}
gst_atomic_queue_push(muxer->output_queue, buffer_writable);
GST_DEBUG("Pushed writable buffer <%"GST_PTR_FORMAT"> to output queue %p, %u buffers in queue now.",
buffer_writable, muxer->output_queue, gst_atomic_queue_length(muxer->output_queue));
pthread_mutex_unlock(&muxer->mutex);
return GST_FLOW_OK;
} }
static void stream_free(struct wg_muxer_stream *stream) static void stream_free(struct wg_muxer_stream *stream)
...@@ -226,6 +250,8 @@ NTSTATUS wg_muxer_create(void *args) ...@@ -226,6 +250,8 @@ NTSTATUS wg_muxer_create(void *args)
pthread_mutex_init(&muxer->mutex, NULL); pthread_mutex_init(&muxer->mutex, NULL);
if (!(muxer->container = gst_bin_new("wg_muxer"))) if (!(muxer->container = gst_bin_new("wg_muxer")))
goto out; goto out;
if (!(muxer->output_queue = gst_atomic_queue_new(8)))
goto out;
/* Create sink pad. */ /* Create sink pad. */
if (!(muxer->my_sink_caps = gst_caps_from_string(params->format))) if (!(muxer->my_sink_caps = gst_caps_from_string(params->format)))
...@@ -257,6 +283,8 @@ out: ...@@ -257,6 +283,8 @@ out:
gst_object_unref(template); gst_object_unref(template);
if (muxer->my_sink_caps) if (muxer->my_sink_caps)
gst_caps_unref(muxer->my_sink_caps); gst_caps_unref(muxer->my_sink_caps);
if (muxer->output_queue)
gst_atomic_queue_unref(muxer->output_queue);
if (muxer->container) if (muxer->container)
gst_object_unref(muxer->container); gst_object_unref(muxer->container);
pthread_mutex_destroy(&muxer->mutex); pthread_mutex_destroy(&muxer->mutex);
...@@ -269,6 +297,7 @@ NTSTATUS wg_muxer_destroy(void *args) ...@@ -269,6 +297,7 @@ NTSTATUS wg_muxer_destroy(void *args)
{ {
struct wg_muxer *muxer = get_muxer(*(wg_muxer_t *)args); struct wg_muxer *muxer = get_muxer(*(wg_muxer_t *)args);
struct wg_muxer_stream *stream, *next; struct wg_muxer_stream *stream, *next;
GstBuffer *buffer;
LIST_FOR_EACH_ENTRY_SAFE(stream, next, &muxer->streams, struct wg_muxer_stream, entry) LIST_FOR_EACH_ENTRY_SAFE(stream, next, &muxer->streams, struct wg_muxer_stream, entry)
{ {
...@@ -276,6 +305,13 @@ NTSTATUS wg_muxer_destroy(void *args) ...@@ -276,6 +305,13 @@ NTSTATUS wg_muxer_destroy(void *args)
stream_free(stream); stream_free(stream);
} }
if (muxer->buffer)
gst_buffer_unref(muxer->buffer);
while ((buffer = gst_atomic_queue_pop(muxer->output_queue)))
gst_buffer_unref(buffer);
gst_atomic_queue_unref(muxer->output_queue);
gst_object_unref(muxer->my_sink); gst_object_unref(muxer->my_sink);
gst_caps_unref(muxer->my_sink_caps); gst_caps_unref(muxer->my_sink_caps);
gst_element_set_state(muxer->container, GST_STATE_NULL); gst_element_set_state(muxer->container, GST_STATE_NULL);
...@@ -454,6 +490,36 @@ NTSTATUS wg_muxer_push_sample(void *args) ...@@ -454,6 +490,36 @@ NTSTATUS wg_muxer_push_sample(void *args)
NTSTATUS wg_muxer_read_data(void *args) NTSTATUS wg_muxer_read_data(void *args)
{ {
GST_FIXME("Not implemented."); struct wg_muxer_read_data_params *params = args;
return STATUS_NOT_IMPLEMENTED; struct wg_muxer *muxer = get_muxer(params->muxer);
gsize size, copied;
/* Pop buffer from output queue. */
if (!muxer->buffer)
{
if (!(muxer->buffer = gst_atomic_queue_pop(muxer->output_queue)))
return STATUS_NO_MEMORY;
/* We may continuously read data from a same buffer multiple times.
* But we only need to set the offset at the first reading. */
if (GST_BUFFER_OFFSET_IS_VALID(muxer->buffer))
params->offset = GST_BUFFER_OFFSET(muxer->buffer);
}
/* Copy data. */
size = min(gst_buffer_get_size(muxer->buffer), params->size);
copied = gst_buffer_extract(muxer->buffer, 0, params->buffer, size);
params->size = copied;
GST_INFO("Copied %"G_GSIZE_FORMAT" bytes from buffer <%"GST_PTR_FORMAT">", copied, muxer->buffer);
/* Unref buffer if all data is read. */
gst_buffer_resize(muxer->buffer, (gssize)copied, -1);
if (!gst_buffer_get_size(muxer->buffer))
{
gst_buffer_unref(muxer->buffer);
muxer->buffer = NULL;
}
return STATUS_SUCCESS;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment