Commit f17b8010 authored by Rémi Bernon's avatar Rémi Bernon Committed by Alexandre Julliard

winegstreamer: Support zero-copy output using the allocator.

Through a custom allocator, by borrowing memory from the reading thread and mapping it instead of the allocated memory. We cannot use the buffer pool to share wrapped buffers, because some decoder will hold on the acquired buffers longer than they should and we cannot remove our memory from them as long as they keep a reference. Swapping the memory on map should be safe. Signed-off-by: 's avatarRémi Bernon <rbernon@codeweavers.com>
parent b38935f1
...@@ -999,6 +999,12 @@ void wg_sample_release(struct wg_sample *wg_sample) ...@@ -999,6 +999,12 @@ void wg_sample_release(struct wg_sample *wg_sample)
{ {
struct mf_sample *mf_sample = CONTAINING_RECORD(wg_sample, struct mf_sample, wg_sample); struct mf_sample *mf_sample = CONTAINING_RECORD(wg_sample, struct mf_sample, wg_sample);
if (InterlockedOr(&wg_sample->refcount, 0))
{
ERR("Sample %p is still in use, trouble ahead!\n", wg_sample);
return;
}
IMFMediaBuffer_Unlock(mf_sample->media_buffer); IMFMediaBuffer_Unlock(mf_sample->media_buffer);
IMFMediaBuffer_Release(mf_sample->media_buffer); IMFMediaBuffer_Release(mf_sample->media_buffer);
IMFSample_Release(mf_sample->sample); IMFSample_Release(mf_sample->sample);
......
...@@ -37,7 +37,12 @@ extern NTSTATUS wg_transform_destroy(void *args) DECLSPEC_HIDDEN; ...@@ -37,7 +37,12 @@ extern NTSTATUS wg_transform_destroy(void *args) DECLSPEC_HIDDEN;
extern NTSTATUS wg_transform_push_data(void *args) DECLSPEC_HIDDEN; extern NTSTATUS wg_transform_push_data(void *args) DECLSPEC_HIDDEN;
extern NTSTATUS wg_transform_read_data(void *args) DECLSPEC_HIDDEN; extern NTSTATUS wg_transform_read_data(void *args) DECLSPEC_HIDDEN;
extern GstAllocator *wg_allocator_create(void) DECLSPEC_HIDDEN; /* wg_allocator_release_sample can be used to release any sample that was requested. */
typedef struct wg_sample *(*wg_allocator_request_sample_cb)(gsize size, void *context);
extern GstAllocator *wg_allocator_create(wg_allocator_request_sample_cb request_sample,
void *request_sample_context) DECLSPEC_HIDDEN;
extern void wg_allocator_destroy(GstAllocator *allocator) DECLSPEC_HIDDEN; extern void wg_allocator_destroy(GstAllocator *allocator) DECLSPEC_HIDDEN;
extern void wg_allocator_release_sample(GstAllocator *allocator, struct wg_sample *sample,
bool discard_data) DECLSPEC_HIDDEN;
#endif /* __WINE_WINEGSTREAMER_UNIX_PRIVATE_H */ #endif /* __WINE_WINEGSTREAMER_UNIX_PRIVATE_H */
...@@ -128,6 +128,7 @@ struct wg_sample ...@@ -128,6 +128,7 @@ struct wg_sample
/* timestamp and duration are in 100-nanosecond units. */ /* timestamp and duration are in 100-nanosecond units. */
UINT64 pts; UINT64 pts;
UINT64 duration; UINT64 duration;
LONG refcount; /* unix refcount */
UINT32 flags; UINT32 flags;
UINT32 max_size; UINT32 max_size;
UINT32 size; UINT32 size;
......
...@@ -33,20 +33,33 @@ ...@@ -33,20 +33,33 @@
#include "unix_private.h" #include "unix_private.h"
#include "wine/list.h"
GST_DEBUG_CATEGORY_EXTERN(wine); GST_DEBUG_CATEGORY_EXTERN(wine);
#define GST_CAT_DEFAULT wine #define GST_CAT_DEFAULT wine
typedef struct typedef struct
{ {
GstMemory parent; GstMemory parent;
struct list entry;
GstMemory *unix_memory; GstMemory *unix_memory;
GstMapInfo unix_map_info; GstMapInfo unix_map_info;
struct wg_sample *sample;
gsize written;
} WgMemory; } WgMemory;
typedef struct typedef struct
{ {
GstAllocator parent; GstAllocator parent;
wg_allocator_request_sample_cb request_sample;
void *request_sample_context;
pthread_mutex_t mutex;
pthread_cond_t release_cond;
struct list memory_list;
} WgAllocator; } WgAllocator;
typedef struct typedef struct
...@@ -58,6 +71,7 @@ G_DEFINE_TYPE(WgAllocator, wg_allocator, GST_TYPE_ALLOCATOR); ...@@ -58,6 +71,7 @@ G_DEFINE_TYPE(WgAllocator, wg_allocator, GST_TYPE_ALLOCATOR);
static gpointer wg_allocator_map(GstMemory *gst_memory, GstMapInfo *info, gsize maxsize) static gpointer wg_allocator_map(GstMemory *gst_memory, GstMapInfo *info, gsize maxsize)
{ {
WgAllocator *allocator = (WgAllocator *)gst_memory->allocator;
WgMemory *memory = (WgMemory *)gst_memory; WgMemory *memory = (WgMemory *)gst_memory;
if (gst_memory->parent) if (gst_memory->parent)
...@@ -65,7 +79,19 @@ static gpointer wg_allocator_map(GstMemory *gst_memory, GstMapInfo *info, gsize ...@@ -65,7 +79,19 @@ static gpointer wg_allocator_map(GstMemory *gst_memory, GstMapInfo *info, gsize
GST_LOG("memory %p, info %p, maxsize %#zx", memory, info, maxsize); GST_LOG("memory %p, info %p, maxsize %#zx", memory, info, maxsize);
info->data = memory->unix_map_info.data; pthread_mutex_lock(&allocator->mutex);
if (!memory->sample)
info->data = memory->unix_map_info.data;
else
{
InterlockedIncrement(&memory->sample->refcount);
info->data = memory->sample->data;
}
if (info->flags & GST_MAP_WRITE)
memory->written = max(memory->written, maxsize);
pthread_mutex_unlock(&allocator->mutex);
GST_INFO("Mapped memory %p to %p", memory, info->data); GST_INFO("Mapped memory %p to %p", memory, info->data);
return info->data; return info->data;
...@@ -73,12 +99,23 @@ static gpointer wg_allocator_map(GstMemory *gst_memory, GstMapInfo *info, gsize ...@@ -73,12 +99,23 @@ static gpointer wg_allocator_map(GstMemory *gst_memory, GstMapInfo *info, gsize
static void wg_allocator_unmap(GstMemory *gst_memory, GstMapInfo *info) static void wg_allocator_unmap(GstMemory *gst_memory, GstMapInfo *info)
{ {
WgAllocator *allocator = (WgAllocator *)gst_memory->allocator;
WgMemory *memory = (WgMemory *)gst_memory; WgMemory *memory = (WgMemory *)gst_memory;
if (gst_memory->parent) if (gst_memory->parent)
return wg_allocator_unmap(gst_memory->parent, info); return wg_allocator_unmap(gst_memory->parent, info);
GST_LOG("memory %p, info %p", memory, info); GST_LOG("memory %p, info %p", memory, info);
pthread_mutex_lock(&allocator->mutex);
if (memory->sample && info->data == memory->sample->data)
{
InterlockedDecrement(&memory->sample->refcount);
pthread_cond_signal(&allocator->release_cond);
}
pthread_mutex_unlock(&allocator->mutex);
} }
static void wg_allocator_init(WgAllocator *allocator) static void wg_allocator_init(WgAllocator *allocator)
...@@ -91,6 +128,10 @@ static void wg_allocator_init(WgAllocator *allocator) ...@@ -91,6 +128,10 @@ static void wg_allocator_init(WgAllocator *allocator)
allocator->parent.mem_unmap_full = wg_allocator_unmap; allocator->parent.mem_unmap_full = wg_allocator_unmap;
GST_OBJECT_FLAG_SET(allocator, GST_ALLOCATOR_FLAG_CUSTOM_ALLOC); GST_OBJECT_FLAG_SET(allocator, GST_ALLOCATOR_FLAG_CUSTOM_ALLOC);
pthread_mutex_init(&allocator->mutex, NULL);
pthread_cond_init(&allocator->release_cond, NULL);
list_init(&allocator->memory_list);
} }
static void wg_allocator_finalize(GObject *object) static void wg_allocator_finalize(GObject *object)
...@@ -99,6 +140,9 @@ static void wg_allocator_finalize(GObject *object) ...@@ -99,6 +140,9 @@ static void wg_allocator_finalize(GObject *object)
GST_LOG("allocator %p", allocator); GST_LOG("allocator %p", allocator);
pthread_cond_destroy(&allocator->release_cond);
pthread_mutex_destroy(&allocator->mutex);
G_OBJECT_CLASS(wg_allocator_parent_class)->finalize(object); G_OBJECT_CLASS(wg_allocator_parent_class)->finalize(object);
} }
...@@ -116,8 +160,15 @@ static GstMemory *wg_allocator_alloc(GstAllocator *gst_allocator, gsize size, ...@@ -116,8 +160,15 @@ static GstMemory *wg_allocator_alloc(GstAllocator *gst_allocator, gsize size,
memory->unix_memory = gst_allocator_alloc(NULL, size, params); memory->unix_memory = gst_allocator_alloc(NULL, size, params);
gst_memory_map(memory->unix_memory, &memory->unix_map_info, GST_MAP_WRITE); gst_memory_map(memory->unix_memory, &memory->unix_map_info, GST_MAP_WRITE);
GST_INFO("Allocated memory %p, unix_memory %p, data %p", memory, memory->unix_memory, pthread_mutex_lock(&allocator->mutex);
memory->unix_map_info.data);
memory->sample = allocator->request_sample(size, allocator->request_sample_context);
list_add_tail(&allocator->memory_list, &memory->entry);
pthread_mutex_unlock(&allocator->mutex);
GST_INFO("Allocated memory %p, sample %p, unix_memory %p, data %p", memory,
memory->sample, memory->unix_memory, memory->unix_map_info.data);
return (GstMemory *)memory; return (GstMemory *)memory;
} }
...@@ -128,6 +179,16 @@ static void wg_allocator_free(GstAllocator *gst_allocator, GstMemory *gst_memory ...@@ -128,6 +179,16 @@ static void wg_allocator_free(GstAllocator *gst_allocator, GstMemory *gst_memory
GST_LOG("allocator %p, memory %p", allocator, memory); GST_LOG("allocator %p, memory %p", allocator, memory);
pthread_mutex_lock(&allocator->mutex);
if (memory->sample)
InterlockedDecrement(&memory->sample->refcount);
memory->sample = NULL;
list_remove(&memory->entry);
pthread_mutex_unlock(&allocator->mutex);
gst_memory_unmap(memory->unix_memory, &memory->unix_map_info); gst_memory_unmap(memory->unix_memory, &memory->unix_map_info);
gst_memory_unref(memory->unix_memory); gst_memory_unref(memory->unix_memory);
g_slice_free(WgMemory, memory); g_slice_free(WgMemory, memory);
...@@ -145,18 +206,82 @@ static void wg_allocator_class_init(WgAllocatorClass *klass) ...@@ -145,18 +206,82 @@ static void wg_allocator_class_init(WgAllocatorClass *klass)
root_class->finalize = wg_allocator_finalize; root_class->finalize = wg_allocator_finalize;
} }
GstAllocator *wg_allocator_create(void) GstAllocator *wg_allocator_create(wg_allocator_request_sample_cb request_sample, void *request_sample_context)
{
WgAllocator *allocator;
if (!(allocator = g_object_new(wg_allocator_get_type(), NULL)))
return NULL;
allocator->request_sample = request_sample;
allocator->request_sample_context = request_sample_context;
return GST_ALLOCATOR(allocator);
}
static void release_memory_sample(WgAllocator *allocator, WgMemory *memory, bool discard_data)
{ {
return g_object_new(wg_allocator_get_type(), NULL); struct wg_sample *sample;
if (!(sample = memory->sample))
return;
while (sample->refcount > 1)
{
GST_WARNING("Waiting for sample %p to be unmapped", sample);
pthread_cond_wait(&allocator->release_cond, &allocator->mutex);
}
InterlockedDecrement(&sample->refcount);
if (memory->written && !discard_data)
{
GST_WARNING("Copying %#zx bytes from sample %p, back to memory %p", memory->written, sample, memory);
memcpy(memory->unix_map_info.data, memory->sample->data, memory->written);
}
memory->sample = NULL;
GST_INFO("Released sample %p from memory %p", sample, memory);
} }
void wg_allocator_destroy(GstAllocator *gst_allocator) void wg_allocator_destroy(GstAllocator *gst_allocator)
{ {
WgAllocator *allocator = (WgAllocator *)gst_allocator; WgAllocator *allocator = (WgAllocator *)gst_allocator;
WgMemory *memory;
GST_LOG("allocator %p", allocator); GST_LOG("allocator %p", allocator);
pthread_mutex_lock(&allocator->mutex);
LIST_FOR_EACH_ENTRY(memory, &allocator->memory_list, WgMemory, entry)
release_memory_sample(allocator, memory, true);
pthread_mutex_unlock(&allocator->mutex);
g_object_unref(allocator); g_object_unref(allocator);
GST_INFO("Destroyed buffer allocator %p", allocator); GST_INFO("Destroyed buffer allocator %p", allocator);
} }
static WgMemory *find_sample_memory(WgAllocator *allocator, struct wg_sample *sample)
{
WgMemory *memory;
LIST_FOR_EACH_ENTRY(memory, &allocator->memory_list, WgMemory, entry)
if (memory->sample == sample)
return memory;
return NULL;
}
void wg_allocator_release_sample(GstAllocator *gst_allocator, struct wg_sample *sample,
bool discard_data)
{
WgAllocator *allocator = (WgAllocator *)gst_allocator;
WgMemory *memory;
GST_LOG("allocator %p, sample %p, discard_data %u", allocator, sample, discard_data);
pthread_mutex_lock(&allocator->mutex);
if ((memory = find_sample_memory(allocator, sample)))
release_memory_sample(allocator, memory, discard_data);
else if (sample->refcount)
GST_ERROR("Couldn't find memory for sample %p", sample);
pthread_mutex_unlock(&allocator->mutex);
}
...@@ -54,6 +54,7 @@ struct wg_transform ...@@ -54,6 +54,7 @@ struct wg_transform
GstBufferList *input; GstBufferList *input;
guint input_max_length; guint input_max_length;
guint output_plane_align; guint output_plane_align;
struct wg_sample *output_wg_sample;
GstAtomicQueue *output_queue; GstAtomicQueue *output_queue;
GstSample *output_sample; GstSample *output_sample;
bool output_caps_changed; bool output_caps_changed;
...@@ -306,6 +307,20 @@ static bool transform_append_element(struct wg_transform *transform, GstElement ...@@ -306,6 +307,20 @@ static bool transform_append_element(struct wg_transform *transform, GstElement
return success; return success;
} }
static struct wg_sample *transform_request_sample(gsize size, void *context)
{
struct wg_transform *transform = context;
struct wg_sample *sample;
GST_LOG("size %#zx, context %p", size, transform);
sample = InterlockedExchangePointer((void **)&transform->output_wg_sample, NULL);
if (!sample || sample->max_size < size)
return NULL;
return sample;
}
NTSTATUS wg_transform_create(void *args) NTSTATUS wg_transform_create(void *args)
{ {
struct wg_transform_create_params *params = args; struct wg_transform_create_params *params = args;
...@@ -330,7 +345,7 @@ NTSTATUS wg_transform_create(void *args) ...@@ -330,7 +345,7 @@ NTSTATUS wg_transform_create(void *args)
goto out; goto out;
if (!(transform->output_queue = gst_atomic_queue_new(8))) if (!(transform->output_queue = gst_atomic_queue_new(8)))
goto out; goto out;
if (!(transform->allocator = wg_allocator_create())) if (!(transform->allocator = wg_allocator_create(transform_request_sample, transform)))
goto out; goto out;
transform->input_max_length = 1; transform->input_max_length = 1;
transform->output_plane_align = 0; transform->output_plane_align = 0;
...@@ -620,10 +635,22 @@ static bool copy_buffer(GstBuffer *buffer, GstCaps *caps, struct wg_sample *samp ...@@ -620,10 +635,22 @@ static bool copy_buffer(GstBuffer *buffer, GstCaps *caps, struct wg_sample *samp
static NTSTATUS read_transform_output_data(GstBuffer *buffer, GstCaps *caps, gsize plane_align, static NTSTATUS read_transform_output_data(GstBuffer *buffer, GstCaps *caps, gsize plane_align,
struct wg_sample *sample) struct wg_sample *sample)
{ {
bool ret, needs_copy;
gsize total_size; gsize total_size;
bool ret; GstMapInfo info;
if (!gst_buffer_map(buffer, &info, GST_MAP_READ))
{
GST_ERROR("Failed to map buffer %p", buffer);
sample->size = 0;
return STATUS_UNSUCCESSFUL;
}
needs_copy = info.data != sample->data;
gst_buffer_unmap(buffer, &info);
if (is_caps_video(caps)) if ((ret = !needs_copy))
total_size = sample->size = info.size;
else if (is_caps_video(caps))
ret = copy_video_buffer(buffer, caps, plane_align, sample, &total_size); ret = copy_video_buffer(buffer, caps, plane_align, sample, &total_size);
else else
ret = copy_buffer(buffer, caps, sample, &total_size); ret = copy_buffer(buffer, caps, sample, &total_size);
...@@ -655,7 +682,18 @@ static NTSTATUS read_transform_output_data(GstBuffer *buffer, GstCaps *caps, gsi ...@@ -655,7 +682,18 @@ static NTSTATUS read_transform_output_data(GstBuffer *buffer, GstCaps *caps, gsi
if (!GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT)) if (!GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT))
sample->flags |= WG_SAMPLE_FLAG_SYNC_POINT; sample->flags |= WG_SAMPLE_FLAG_SYNC_POINT;
GST_INFO("Copied %u bytes, sample %p, flags %#x", sample->size, sample, sample->flags); if (needs_copy)
{
if (is_caps_video(caps))
GST_WARNING("Copied %u bytes, sample %p, flags %#x", sample->size, sample, sample->flags);
else
GST_INFO("Copied %u bytes, sample %p, flags %#x", sample->size, sample, sample->flags);
}
else if (sample->flags & WG_SAMPLE_FLAG_INCOMPLETE)
GST_ERROR("Partial read %u bytes, sample %p, flags %#x", sample->size, sample, sample->flags);
else
GST_INFO("Read %u bytes, sample %p, flags %#x", sample->size, sample, sample->flags);
return STATUS_SUCCESS; return STATUS_SUCCESS;
} }
...@@ -665,23 +703,38 @@ NTSTATUS wg_transform_read_data(void *args) ...@@ -665,23 +703,38 @@ NTSTATUS wg_transform_read_data(void *args)
struct wg_transform *transform = params->transform; struct wg_transform *transform = params->transform;
struct wg_sample *sample = params->sample; struct wg_sample *sample = params->sample;
struct wg_format *format = params->format; struct wg_format *format = params->format;
GstBufferList *input = transform->input; GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *output_buffer; GstBuffer *output_buffer;
GstBufferList *input;
GstCaps *output_caps; GstCaps *output_caps;
GstFlowReturn ret; bool discard_data;
NTSTATUS status; NTSTATUS status;
/* Provide the sample for transform_request_sample to pick it up */
InterlockedIncrement(&sample->refcount);
InterlockedExchangePointer((void **)&transform->output_wg_sample, sample);
if (!gst_buffer_list_length(transform->input)) if (!gst_buffer_list_length(transform->input))
GST_DEBUG("Not input buffer queued"); GST_DEBUG("Not input buffer queued");
else if (!(transform->input = gst_buffer_list_new())) else if ((input = gst_buffer_list_new()))
{
ret = gst_pad_push_list(transform->my_src, transform->input);
transform->input = input;
}
else
{ {
GST_ERROR("Failed to allocate new input queue"); GST_ERROR("Failed to allocate new input queue");
gst_buffer_list_unref(input); ret = GST_FLOW_ERROR;
return STATUS_NO_MEMORY;
} }
else if ((ret = gst_pad_push_list(transform->my_src, input)))
/* Remove the sample so transform_request_sample cannot use it */
if (InterlockedExchangePointer((void **)&transform->output_wg_sample, NULL))
InterlockedDecrement(&sample->refcount);
if (ret)
{ {
GST_ERROR("Failed to push transform input, error %d", ret); GST_ERROR("Failed to push transform input, error %d", ret);
wg_allocator_release_sample(transform->allocator, sample, false);
return STATUS_UNSUCCESSFUL; return STATUS_UNSUCCESSFUL;
} }
...@@ -690,6 +743,7 @@ NTSTATUS wg_transform_read_data(void *args) ...@@ -690,6 +743,7 @@ NTSTATUS wg_transform_read_data(void *args)
sample->size = 0; sample->size = 0;
params->result = MF_E_TRANSFORM_NEED_MORE_INPUT; params->result = MF_E_TRANSFORM_NEED_MORE_INPUT;
GST_INFO("Cannot read %u bytes, no output available", sample->max_size); GST_INFO("Cannot read %u bytes, no output available", sample->max_size);
wg_allocator_release_sample(transform->allocator, sample, false);
return STATUS_SUCCESS; return STATUS_SUCCESS;
} }
...@@ -729,19 +783,41 @@ NTSTATUS wg_transform_read_data(void *args) ...@@ -729,19 +783,41 @@ NTSTATUS wg_transform_read_data(void *args)
params->result = MF_E_TRANSFORM_STREAM_CHANGE; params->result = MF_E_TRANSFORM_STREAM_CHANGE;
GST_INFO("Format changed detected, returning no output"); GST_INFO("Format changed detected, returning no output");
wg_allocator_release_sample(transform->allocator, sample, false);
return STATUS_SUCCESS; return STATUS_SUCCESS;
} }
if ((status = read_transform_output_data(output_buffer, output_caps, if ((status = read_transform_output_data(output_buffer, output_caps,
transform->output_plane_align, sample))) transform->output_plane_align, sample)))
{
wg_allocator_release_sample(transform->allocator, sample, false);
return status; return status;
}
if (!(sample->flags & WG_SAMPLE_FLAG_INCOMPLETE)) if (sample->flags & WG_SAMPLE_FLAG_INCOMPLETE)
discard_data = false;
else
{ {
/* Taint the buffer memory to make sure it cannot be reused by the buffer pool,
* for the pool to always requests new memory from the allocator, and so we can
* then always provide output sample memory to achieve zero-copy.
*
* However, some decoder keep a reference on the buffer they passed downstream,
* to re-use it later. In this case, it will not be possible to do zero-copy,
* and we should copy the data back to the buffer and leave it unchanged.
*
* Some other plugins make assumptions that the returned buffer will always have
* at least one memory attached, we cannot just remove it and need to replace the
* memory instead.
*/
if ((discard_data = gst_buffer_is_writable(output_buffer)))
gst_buffer_replace_all_memory(output_buffer, gst_allocator_alloc(NULL, 0, NULL));
gst_sample_unref(transform->output_sample); gst_sample_unref(transform->output_sample);
transform->output_sample = NULL; transform->output_sample = NULL;
} }
params->result = S_OK; params->result = S_OK;
wg_allocator_release_sample(transform->allocator, sample, discard_data);
return STATUS_SUCCESS; return STATUS_SUCCESS;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment