Commit e3b048c1 authored by Nikolay Sivov's avatar Nikolay Sivov Committed by Alexandre Julliard

mfplat: Add support for work item priority.

parent 1654116d
......@@ -123,7 +123,9 @@
@ stdcall MFLockWorkQueue(long)
@ stdcall MFPutWaitingWorkItem(long long ptr ptr)
@ stdcall MFPutWorkItem(long ptr ptr)
@ stdcall MFPutWorkItem2(long long ptr ptr)
@ stdcall MFPutWorkItemEx(long ptr)
@ stdcall MFPutWorkItemEx2(long long ptr)
@ stub MFRecordError
@ stdcall MFRemovePeriodicCallback(long)
@ stdcall MFScheduleWorkItem(ptr ptr int64 ptr)
......
......@@ -59,10 +59,17 @@ struct work_item
} u;
};
static const TP_CALLBACK_PRIORITY priorities[] =
{
TP_CALLBACK_PRIORITY_HIGH,
TP_CALLBACK_PRIORITY_NORMAL,
TP_CALLBACK_PRIORITY_LOW,
};
struct queue
{
TP_POOL *pool;
TP_CALLBACK_ENVIRON_V3 env;
TP_CALLBACK_ENVIRON_V3 envs[ARRAY_SIZE(priorities)];
CRITICAL_SECTION cs;
struct list pending_items;
};
......@@ -160,16 +167,23 @@ static void release_work_item(struct work_item *item)
static void init_work_queue(MFASYNC_WORKQUEUE_TYPE queue_type, struct queue *queue)
{
unsigned int max_thread;
TP_CALLBACK_ENVIRON_V3 env;
unsigned int max_thread, i;
queue->pool = CreateThreadpool(NULL);
memset(&queue->env, 0, sizeof(queue->env));
queue->env.Version = 3;
queue->env.Size = sizeof(queue->env);
queue->env.Pool = queue->pool;
queue->env.CleanupGroup = CreateThreadpoolCleanupGroup();
queue->env.CleanupGroupCancelCallback = standard_queue_cleanup_callback;
queue->env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL;
memset(&env, 0, sizeof(env));
env.Version = 3;
env.Size = sizeof(env);
env.Pool = queue->pool;
env.CleanupGroup = CreateThreadpoolCleanupGroup();
env.CleanupGroupCancelCallback = standard_queue_cleanup_callback;
env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL;
for (i = 0; i < ARRAY_SIZE(queue->envs); ++i)
{
queue->envs[i] = env;
queue->envs[i].CallbackPriority = priorities[i];
}
list_init(&queue->pending_items);
InitializeCriticalSection(&queue->cs);
......@@ -267,7 +281,7 @@ static void shutdown_queue(struct queue *queue)
if (!queue->pool)
return;
CloseThreadpoolCleanupGroupMembers(queue->env.CleanupGroup, TRUE, NULL);
CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL);
CloseThreadpool(queue->pool);
queue->pool = NULL;
......@@ -339,15 +353,23 @@ static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void
release_work_item(item);
}
static HRESULT queue_submit_item(struct queue *queue, IMFAsyncResult *result)
static HRESULT queue_submit_item(struct queue *queue, LONG priority, IMFAsyncResult *result)
{
TP_CALLBACK_PRIORITY callback_priority;
struct work_item *item;
TP_WORK *work_object;
if (!(item = alloc_work_item(queue, result)))
return E_OUTOFMEMORY;
work_object = CreateThreadpoolWork(standard_queue_worker, item, (TP_CALLBACK_ENVIRON *)&queue->env);
if (priority == 0)
callback_priority = TP_CALLBACK_PRIORITY_NORMAL;
else if (priority < 0)
callback_priority = TP_CALLBACK_PRIORITY_LOW;
else
callback_priority = TP_CALLBACK_PRIORITY_HIGH;
work_object = CreateThreadpoolWork(standard_queue_worker, item,
(TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]);
SubmitThreadpoolWork(work_object);
TRACE("dispatched %p.\n", result);
......@@ -355,7 +377,7 @@ static HRESULT queue_submit_item(struct queue *queue, IMFAsyncResult *result)
return S_OK;
}
static HRESULT queue_put_work_item(DWORD queue_id, IMFAsyncResult *result)
static HRESULT queue_put_work_item(DWORD queue_id, LONG priority, IMFAsyncResult *result)
{
struct queue *queue;
HRESULT hr;
......@@ -363,7 +385,7 @@ static HRESULT queue_put_work_item(DWORD queue_id, IMFAsyncResult *result)
if (FAILED(hr = grab_queue(queue_id, &queue)))
return hr;
hr = queue_submit_item(queue, result);
hr = queue_submit_item(queue, priority, result);
return hr;
}
......@@ -380,7 +402,7 @@ static HRESULT invoke_async_callback(IMFAsyncResult *result)
if (FAILED(lock_user_queue(queue)))
queue = MFASYNC_CALLBACK_QUEUE_STANDARD;
hr = queue_put_work_item(queue, result);
hr = queue_put_work_item(queue, 0, result);
unlock_user_queue(queue);
......@@ -486,7 +508,8 @@ static HRESULT queue_submit_wait(struct queue *queue, HANDLE event, LONG priorit
else
callback = waiting_item_callback;
item->u.wait_object = CreateThreadpoolWait(callback, item, (TP_CALLBACK_ENVIRON *)&queue->env);
item->u.wait_object = CreateThreadpoolWait(callback, item,
(TP_CALLBACK_ENVIRON *)&queue->envs[TP_CALLBACK_PRIORITY_NORMAL]);
SetThreadpoolWait(item->u.wait_object, event, NULL);
TRACE("dispatched %p.\n", result);
......@@ -519,7 +542,8 @@ static HRESULT queue_submit_timer(struct queue *queue, IMFAsyncResult *result, I
filetime.dwLowDateTime = t.u.LowPart;
filetime.dwHighDateTime = t.u.HighPart;
item->u.timer_object = CreateThreadpoolTimer(callback, item, (TP_CALLBACK_ENVIRON *)&queue->env);
item->u.timer_object = CreateThreadpoolTimer(callback, item,
(TP_CALLBACK_ENVIRON *)&queue->envs[TP_CALLBACK_PRIORITY_NORMAL]);
SetThreadpoolTimer(item->u.timer_object, &filetime, period, 0);
TRACE("dispatched %p.\n", result);
......@@ -828,7 +852,27 @@ HRESULT WINAPI MFPutWorkItem(DWORD queue, IMFAsyncCallback *callback, IUnknown *
if (FAILED(hr = MFCreateAsyncResult(NULL, callback, state, &result)))
return hr;
hr = queue_put_work_item(queue, result);
hr = queue_put_work_item(queue, 0, result);
IMFAsyncResult_Release(result);
return hr;
}
/***********************************************************************
* MFPutWorkItem2 (mfplat.@)
*/
HRESULT WINAPI MFPutWorkItem2(DWORD queue, LONG priority, IMFAsyncCallback *callback, IUnknown *state)
{
IMFAsyncResult *result;
HRESULT hr;
TRACE("%#x, %d, %p, %p.\n", queue, priority, callback, state);
if (FAILED(hr = MFCreateAsyncResult(NULL, callback, state, &result)))
return hr;
hr = queue_put_work_item(queue, priority, result);
IMFAsyncResult_Release(result);
......@@ -842,7 +886,17 @@ HRESULT WINAPI MFPutWorkItemEx(DWORD queue, IMFAsyncResult *result)
{
TRACE("%#x, %p\n", queue, result);
return queue_put_work_item(queue, result);
return queue_put_work_item(queue, 0, result);
}
/***********************************************************************
* MFPutWorkItemEx2 (mfplat.@)
*/
HRESULT WINAPI MFPutWorkItemEx2(DWORD queue, LONG priority, IMFAsyncResult *result)
{
TRACE("%#x, %d, %p\n", queue, priority, result);
return queue_put_work_item(queue, priority, result);
}
/***********************************************************************
......
......@@ -173,7 +173,9 @@ HRESULT WINAPI MFTEnumEx(GUID category, UINT32 flags, const MFT_REGISTER_TYPE_IN
HRESULT WINAPI MFInvokeCallback(IMFAsyncResult *result);
HRESULT WINAPI MFLockPlatform(void);
HRESULT WINAPI MFPutWorkItem(DWORD queue, IMFAsyncCallback *callback, IUnknown *state);
HRESULT WINAPI MFPutWorkItem2(DWORD queue, LONG priority, IMFAsyncCallback *callback, IUnknown *state);
HRESULT WINAPI MFPutWorkItemEx(DWORD queue, IMFAsyncResult *result);
HRESULT WINAPI MFPutWorkItemEx2(DWORD queue, LONG priority, IMFAsyncResult *result);
HRESULT WINAPI MFScheduleWorkItem(IMFAsyncCallback *callback, IUnknown *state, INT64 timeout, MFWORKITEM_KEY *key);
HRESULT WINAPI MFScheduleWorkItemEx(IMFAsyncResult *result, INT64 timeout, MFWORKITEM_KEY *key);
HRESULT WINAPI MFTRegister(CLSID clsid, GUID category, LPWSTR name, UINT32 flags, UINT32 cinput,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment