Commit c0759107 authored by Nikolay Sivov's avatar Nikolay Sivov Committed by Alexandre Julliard

mfplat: Add basic support for submitting work items.

parent 3a433af2
...@@ -33,9 +33,16 @@ WINE_DEFAULT_DEBUG_CHANNEL(mfplat); ...@@ -33,9 +33,16 @@ WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
#define FIRST_USER_QUEUE_HANDLE 5 #define FIRST_USER_QUEUE_HANDLE 5
#define MAX_USER_QUEUE_HANDLES 124 #define MAX_USER_QUEUE_HANDLES 124
struct work_item
{
LONG refcount;
IMFAsyncResult *result;
};
struct queue struct queue
{ {
TP_POOL *pool; TP_POOL *pool;
TP_CALLBACK_ENVIRON env;
}; };
struct queue_handle struct queue_handle
...@@ -86,9 +93,104 @@ enum system_queue_index ...@@ -86,9 +93,104 @@ enum system_queue_index
static struct queue system_queues[SYS_QUEUE_COUNT]; static struct queue system_queues[SYS_QUEUE_COUNT];
static struct queue *get_system_queue(DWORD queue_id)
{
switch (queue_id)
{
case MFASYNC_CALLBACK_QUEUE_STANDARD:
case MFASYNC_CALLBACK_QUEUE_RT:
case MFASYNC_CALLBACK_QUEUE_IO:
case MFASYNC_CALLBACK_QUEUE_TIMER:
case MFASYNC_CALLBACK_QUEUE_MULTITHREADED:
case MFASYNC_CALLBACK_QUEUE_LONG_FUNCTION:
return &system_queues[queue_id - 1];
default:
return NULL;
}
}
static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data)
{
}
static struct work_item * alloc_work_item(struct queue *queue, IMFAsyncResult *result)
{
struct work_item *item;
item = heap_alloc_zero(sizeof(*item));
item->result = result;
IMFAsyncResult_AddRef(item->result);
item->refcount = 1;
return item;
}
static void release_work_item(struct work_item *item)
{
if (InterlockedDecrement(&item->refcount) == 0)
{
IMFAsyncResult_Release(item->result);
heap_free(item);
}
}
static void init_work_queue(MFASYNC_WORKQUEUE_TYPE queue_type, struct queue *queue) static void init_work_queue(MFASYNC_WORKQUEUE_TYPE queue_type, struct queue *queue)
{ {
unsigned int max_thread;
queue->pool = CreateThreadpool(NULL); queue->pool = CreateThreadpool(NULL);
queue->env.Version = 1;
queue->env.Pool = queue->pool;
queue->env.CleanupGroup = CreateThreadpoolCleanupGroup();
queue->env.CleanupGroupCancelCallback = standard_queue_cleanup_callback;
max_thread = (queue_type == MF_STANDARD_WORKQUEUE || queue_type == MF_WINDOW_WORKQUEUE) ? 1 : 4;
SetThreadpoolThreadMinimum(queue->pool, 1);
SetThreadpoolThreadMaximum(queue->pool, max_thread);
if (queue_type == MF_WINDOW_WORKQUEUE)
FIXME("MF_WINDOW_WORKQUEUE is not supported.\n");
}
static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
{
struct queue *queue = get_system_queue(queue_id);
MFASYNC_WORKQUEUE_TYPE queue_type;
struct queue_handle *entry;
if (!system_queues[SYS_QUEUE_STANDARD].pool)
return MF_E_SHUTDOWN;
if (queue && queue->pool)
{
*ret = queue;
return S_OK;
}
else if (queue)
{
EnterCriticalSection(&queues_section);
switch (queue_id)
{
case MFASYNC_CALLBACK_QUEUE_IO:
case MFASYNC_CALLBACK_QUEUE_MULTITHREADED:
case MFASYNC_CALLBACK_QUEUE_LONG_FUNCTION:
queue_type = MF_MULTITHREADED_WORKQUEUE;
break;
default:
queue_type = MF_STANDARD_WORKQUEUE;
}
init_work_queue(queue_type, queue);
LeaveCriticalSection(&queues_section);
*ret = queue;
return S_OK;
}
/* Handles user queues. */
if ((entry = get_queue_obj(queue_id)))
*ret = entry->obj;
return *ret ? S_OK : MF_E_INVALID_WORKQUEUE;
} }
void init_system_queues(void) void init_system_queues(void)
...@@ -113,6 +215,7 @@ static void shutdown_queue(struct queue *queue) ...@@ -113,6 +215,7 @@ static void shutdown_queue(struct queue *queue)
if (!queue->pool) if (!queue->pool)
return; return;
CloseThreadpoolCleanupGroupMembers(queue->env.CleanupGroup, TRUE, NULL);
CloseThreadpool(queue->pool); CloseThreadpool(queue->pool);
queue->pool = NULL; queue->pool = NULL;
} }
...@@ -131,6 +234,47 @@ void shutdown_system_queues(void) ...@@ -131,6 +234,47 @@ void shutdown_system_queues(void)
LeaveCriticalSection(&queues_section); LeaveCriticalSection(&queues_section);
} }
static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work)
{
struct work_item *item = context;
MFASYNCRESULT *result = (MFASYNCRESULT *)item->result;
TRACE("result object %p.\n", result);
IMFAsyncCallback_Invoke(result->pCallback, item->result);
release_work_item(item);
}
static HRESULT queue_submit_item(struct queue *queue, IMFAsyncResult *result)
{
struct work_item *item;
TP_WORK *work_object;
if (!(item = alloc_work_item(queue, result)))
return E_OUTOFMEMORY;
work_object = CreateThreadpoolWork(standard_queue_worker, item, &queue->env);
SubmitThreadpoolWork(work_object);
TRACE("dispatched %p.\n", result);
return S_OK;
}
static HRESULT queue_put_work_item(DWORD queue_id, IMFAsyncResult *result)
{
struct queue *queue;
HRESULT hr;
if (FAILED(hr = grab_queue(queue_id, &queue)))
return hr;
hr = queue_submit_item(queue, result);
return hr;
}
static HRESULT alloc_user_queue(MFASYNC_WORKQUEUE_TYPE queue_type, DWORD *queue_id) static HRESULT alloc_user_queue(MFASYNC_WORKQUEUE_TYPE queue_type, DWORD *queue_id)
{ {
struct queue_handle *entry; struct queue_handle *entry;
...@@ -440,7 +584,7 @@ HRESULT WINAPI MFPutWorkItem(DWORD queue, IMFAsyncCallback *callback, IUnknown * ...@@ -440,7 +584,7 @@ HRESULT WINAPI MFPutWorkItem(DWORD queue, IMFAsyncCallback *callback, IUnknown *
if (FAILED(hr = MFCreateAsyncResult(NULL, callback, state, &result))) if (FAILED(hr = MFCreateAsyncResult(NULL, callback, state, &result)))
return hr; return hr;
hr = MFPutWorkItemEx(queue, result); hr = queue_put_work_item(queue, result);
IMFAsyncResult_Release(result); IMFAsyncResult_Release(result);
...@@ -452,9 +596,9 @@ HRESULT WINAPI MFPutWorkItem(DWORD queue, IMFAsyncCallback *callback, IUnknown * ...@@ -452,9 +596,9 @@ HRESULT WINAPI MFPutWorkItem(DWORD queue, IMFAsyncCallback *callback, IUnknown *
*/ */
HRESULT WINAPI MFPutWorkItemEx(DWORD queue, IMFAsyncResult *result) HRESULT WINAPI MFPutWorkItemEx(DWORD queue, IMFAsyncResult *result)
{ {
FIXME("%#x, %p\n", queue, result); TRACE("%#x, %p\n", queue, result);
return E_NOTIMPL; return queue_put_work_item(queue, result);
} }
/*********************************************************************** /***********************************************************************
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment