Commit f5f74184 authored by Rémi Bernon's avatar Rémi Bernon Committed by Alexandre Julliard

winegstreamer: Use an atomic queue for wg_transform input buffers.

And push them one by one until an output buffer is generated, to avoid generating multiple output buffers without a backing wg_sample. This makes zero-copy more efficient for games which queue multiple input buffers before checking output, such as Yakuza 4. Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=45988 Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=47084 Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=49715 Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=52183Signed-off-by: 's avatarRémi Bernon <rbernon@codeweavers.com>
parent 7e046aa4
......@@ -51,8 +51,10 @@ struct wg_transform
GstPad *my_src, *my_sink;
GstPad *their_sink, *their_src;
GstSegment segment;
GstBufferList *input;
guint input_max_length;
GstAtomicQueue *input_queue;
guint output_plane_align;
struct wg_sample *output_wg_sample;
GstAtomicQueue *output_queue;
......@@ -215,9 +217,11 @@ NTSTATUS wg_transform_destroy(void *args)
{
struct wg_transform *transform = args;
GstSample *sample;
GstBuffer *buffer;
if (transform->input)
gst_buffer_list_unref(transform->input);
while ((buffer = gst_atomic_queue_pop(transform->input_queue)))
gst_buffer_unref(buffer);
gst_atomic_queue_unref(transform->input_queue);
gst_element_set_state(transform->container, GST_STATE_NULL);
......@@ -336,7 +340,7 @@ NTSTATUS wg_transform_create(void *args)
return STATUS_NO_MEMORY;
if (!(transform->container = gst_bin_new("wg_transform")))
goto out;
if (!(transform->input = gst_buffer_list_new()))
if (!(transform->input_queue = gst_atomic_queue_new(8)))
goto out;
if (!(transform->output_queue = gst_atomic_queue_new(8)))
goto out;
......@@ -503,8 +507,8 @@ out:
wg_allocator_destroy(transform->allocator);
if (transform->output_queue)
gst_atomic_queue_unref(transform->output_queue);
if (transform->input)
gst_buffer_list_unref(transform->input);
if (transform->input_queue)
gst_atomic_queue_unref(transform->input_queue);
if (transform->container)
{
gst_element_set_state(transform->container, GST_STATE_NULL);
......@@ -530,7 +534,7 @@ NTSTATUS wg_transform_push_data(void *args)
GstBuffer *buffer;
guint length;
length = gst_buffer_list_length(transform->input);
length = gst_atomic_queue_length(transform->input_queue);
if (length >= transform->input_max_length)
{
GST_INFO("Refusing %u bytes, %u buffers already queued", sample->size, length);
......@@ -556,7 +560,7 @@ NTSTATUS wg_transform_push_data(void *args)
GST_BUFFER_DURATION(buffer) = sample->duration * 100;
if (!(sample->flags & WG_SAMPLE_FLAG_SYNC_POINT))
GST_BUFFER_FLAG_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT);
gst_buffer_list_insert(transform->input, -1, buffer);
gst_atomic_queue_push(transform->input_queue, buffer);
params->result = S_OK;
return STATUS_SUCCESS;
......@@ -705,48 +709,52 @@ static NTSTATUS read_transform_output_data(GstBuffer *buffer, GstCaps *caps, gsi
return STATUS_SUCCESS;
}
NTSTATUS wg_transform_read_data(void *args)
static bool get_transform_output(struct wg_transform *transform, struct wg_sample *sample)
{
struct wg_transform_read_data_params *params = args;
struct wg_transform *transform = params->transform;
struct wg_sample *sample = params->sample;
struct wg_format *format = params->format;
GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *output_buffer;
GstBufferList *input;
GstCaps *output_caps;
bool discard_data;
NTSTATUS status;
GstBuffer *input_buffer;
/* Provide the sample for transform_request_sample to pick it up */
InterlockedIncrement(&sample->refcount);
InterlockedExchangePointer((void **)&transform->output_wg_sample, sample);
if (!gst_buffer_list_length(transform->input))
GST_DEBUG("Not input buffer queued");
else if ((input = gst_buffer_list_new()))
while (!(transform->output_sample = gst_atomic_queue_pop(transform->output_queue)))
{
ret = gst_pad_push_list(transform->my_src, transform->input);
transform->input = input;
}
else
if (!(input_buffer = gst_atomic_queue_pop(transform->input_queue)))
break;
if ((ret = gst_pad_push(transform->my_src, input_buffer)))
{
GST_ERROR("Failed to allocate new input queue");
ret = GST_FLOW_ERROR;
GST_ERROR("Failed to push transform input, error %d", ret);
break;
}
}
/* Remove the sample so transform_request_sample cannot use it */
if (InterlockedExchangePointer((void **)&transform->output_wg_sample, NULL))
InterlockedDecrement(&sample->refcount);
if (ret)
return ret == GST_FLOW_OK;
}
NTSTATUS wg_transform_read_data(void *args)
{
struct wg_transform_read_data_params *params = args;
struct wg_transform *transform = params->transform;
struct wg_sample *sample = params->sample;
struct wg_format *format = params->format;
GstBuffer *output_buffer;
GstCaps *output_caps;
bool discard_data;
NTSTATUS status;
if (!transform->output_sample && !get_transform_output(transform, sample))
{
GST_ERROR("Failed to push transform input, error %d", ret);
wg_allocator_release_sample(transform->allocator, sample, false);
return STATUS_UNSUCCESSFUL;
}
if (!transform->output_sample && !(transform->output_sample = gst_atomic_queue_pop(transform->output_queue)))
if (!transform->output_sample)
{
sample->size = 0;
params->result = MF_E_TRANSFORM_NEED_MORE_INPUT;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment