Commit dfd28aff authored by Daniel Lehman's avatar Daniel Lehman Committed by Alexandre Julliard

msvcrt: Implement Concurrency::event.

parent 4db4d61b
......@@ -550,19 +550,128 @@ unsigned int __cdecl _GetConcurrency(void)
return val;
}
#define EVT_RUNNING (void*)1
#define EVT_WAITING NULL
struct thread_wait;
typedef struct thread_wait_entry
{
struct thread_wait *wait;
struct thread_wait_entry *next;
struct thread_wait_entry *prev;
} thread_wait_entry;
typedef struct thread_wait
{
void *signaled;
int pending_waits;
thread_wait_entry entries[1];
} thread_wait;
typedef struct
{
volatile void *wait;
void *reset;
thread_wait_entry *waiters;
INT_PTR signaled;
critical_section cs;
} event;
static inline PLARGE_INTEGER evt_timeout(PLARGE_INTEGER pTime, unsigned int timeout)
{
if(timeout == COOPERATIVE_TIMEOUT_INFINITE) return NULL;
pTime->QuadPart = (ULONGLONG)timeout * -10000;
return pTime;
}
static void evt_add_queue(thread_wait_entry **head, thread_wait_entry *entry)
{
if(*head) {
entry->next = *head;
entry->prev = (*head)->prev;
(*head)->prev->next = entry;
(*head)->prev = entry;
} else {
entry->next = entry;
entry->prev = entry;
*head = entry;
}
}
static void evt_remove(thread_wait_entry **head, thread_wait_entry *entry)
{
entry->next->prev = entry->prev;
entry->prev->next = entry->next;
if(*head == entry)
*head = entry->next == entry ? NULL : entry->next;
}
static MSVCRT_size_t evt_end_wait(thread_wait *wait, event **events, int count)
{
MSVCRT_size_t i, ret = COOPERATIVE_WAIT_TIMEOUT;
for(i = 0; i < count; i++) {
critical_section_lock(&events[i]->cs);
if(events[i] == wait->signaled) ret = i;
evt_remove(&events[i]->waiters, &wait->entries[i]);
critical_section_unlock(&events[i]->cs);
}
return ret;
}
static inline int evt_transition(void **state, void *from, void *to)
{
return InterlockedCompareExchangePointer(state, to, from) == from;
}
static MSVCRT_size_t evt_wait(thread_wait *wait, event **events, int count, MSVCRT_bool wait_all, unsigned int timeout)
{
int i;
NTSTATUS status;
LARGE_INTEGER ntto;
wait->signaled = EVT_RUNNING;
wait->pending_waits = wait_all ? count : 1;
for(i = 0; i < count; i++) {
wait->entries[i].wait = wait;
critical_section_lock(&events[i]->cs);
evt_add_queue(&events[i]->waiters, &wait->entries[i]);
if(events[i]->signaled) {
if(!InterlockedDecrement(&wait->pending_waits)) {
wait->signaled = events[i];
critical_section_unlock(&events[i]->cs);
return evt_end_wait(wait, events, i+1);
}
}
critical_section_unlock(&events[i]->cs);
}
if(!timeout)
return evt_end_wait(wait, events, count);
if(!evt_transition(&wait->signaled, EVT_RUNNING, EVT_WAITING))
return evt_end_wait(wait, events, count);
status = NtWaitForKeyedEvent(keyed_event, wait, 0, evt_timeout(&ntto, timeout));
if(status && !evt_transition(&wait->signaled, EVT_WAITING, EVT_RUNNING))
NtWaitForKeyedEvent(keyed_event, wait, 0, NULL);
return evt_end_wait(wait, events, count);
}
/* ??0event@Concurrency@@QAE@XZ */
/* ??0event@Concurrency@@QEAA@XZ */
DEFINE_THISCALL_WRAPPER(event_ctor, 4)
event* __thiscall event_ctor(event *this)
{
FIXME("(%p) stub\n", this);
TRACE("(%p)\n", this);
this->waiters = NULL;
this->signaled = FALSE;
critical_section_ctor(&this->cs);
return this;
}
......@@ -571,7 +680,10 @@ event* __thiscall event_ctor(event *this)
DEFINE_THISCALL_WRAPPER(event_dtor, 4)
void __thiscall event_dtor(event *this)
{
FIXME("(%p) stub\n", this);
TRACE("(%p)\n", this);
critical_section_dtor(&this->cs);
if(this->waiters)
ERR("there's a wait on destroyed event\n");
}
/* ?reset@event@Concurrency@@QAEXXZ */
......@@ -579,7 +691,22 @@ void __thiscall event_dtor(event *this)
DEFINE_THISCALL_WRAPPER(event_reset, 4)
void __thiscall event_reset(event *this)
{
FIXME("(%p) stub\n", this);
thread_wait_entry *entry;
TRACE("(%p)\n", this);
critical_section_lock(&this->cs);
if(this->signaled) {
this->signaled = FALSE;
if(this->waiters) {
entry = this->waiters;
do {
InterlockedIncrement(&entry->wait->pending_waits);
entry = entry->next;
} while (entry != this->waiters);
}
}
critical_section_unlock(&this->cs);
}
/* ?set@event@Concurrency@@QAEXXZ */
......@@ -587,26 +714,65 @@ void __thiscall event_reset(event *this)
DEFINE_THISCALL_WRAPPER(event_set, 4)
void __thiscall event_set(event *this)
{
FIXME("(%p) stub\n", this);
thread_wait_entry *entry;
TRACE("(%p)\n", this);
critical_section_lock(&this->cs);
if(!this->signaled) {
this->signaled = TRUE;
if(this->waiters) {
entry = this->waiters;
do {
if(!InterlockedDecrement(&entry->wait->pending_waits)) {
if(InterlockedExchangePointer(&entry->wait->signaled, this) == EVT_WAITING)
NtReleaseKeyedEvent(keyed_event, entry->wait, 0, NULL);
}
entry = entry->next;
} while (entry != this->waiters);
}
}
critical_section_unlock(&this->cs);
}
/* ?wait@event@Concurrency@@QAEII@Z */
/* ?wait@event@Concurrency@@QEAA_KI@Z */
DEFINE_THISCALL_WRAPPER(event_wait, 8)
size_t __thiscall event_wait(event *this, unsigned int timeout)
MSVCRT_size_t __thiscall event_wait(event *this, unsigned int timeout)
{
FIXME("(%p %u) stub\n", this, timeout);
return COOPERATIVE_WAIT_TIMEOUT;
thread_wait wait;
MSVCRT_size_t signaled;
TRACE("(%p %u)\n", this, timeout);
critical_section_lock(&this->cs);
signaled = this->signaled;
critical_section_unlock(&this->cs);
if(!timeout) return signaled ? 0 : COOPERATIVE_WAIT_TIMEOUT;
return signaled ? 0 : evt_wait(&wait, &this, 1, FALSE, timeout);
}
/* ?wait_for_multiple@event@Concurrency@@SAIPAPAV12@I_NI@Z */
/* ?wait_for_multiple@event@Concurrency@@SA_KPEAPEAV12@_K_NI@Z */
int __cdecl event_wait_for_multiple(event **events, MSVCRT_size_t count, MSVCRT_bool wait_all, unsigned int timeout)
{
FIXME("(%p %ld %d %u) stub\n", events, count, wait_all, timeout);
return COOPERATIVE_WAIT_TIMEOUT;
}
thread_wait *wait;
MSVCRT_size_t ret;
TRACE("(%p %ld %d %u)\n", events, count, wait_all, timeout);
if(count == 0)
return 0;
wait = heap_alloc(FIELD_OFFSET(thread_wait, entries[count]));
if(!wait)
throw_bad_alloc("bad allocation");
ret = evt_wait(wait, events, count, wait_all, timeout);
heap_free(wait);
return ret;
}
#endif
#if _MSVCR_VER >= 110
......
......@@ -1181,6 +1181,7 @@ extern char* __cdecl __unDName(char *,const char*,int,malloc_func_t,free_func_t,
#define UCRTBASE_SCANF_MASK (0x0007)
#define COOPERATIVE_TIMEOUT_INFINITE ((unsigned int)-1)
#define COOPERATIVE_WAIT_TIMEOUT ~0
typedef enum {
......@@ -1380,4 +1381,18 @@ typedef struct {
_FPIEEE_VALUE Result;
} _FPIEEE_RECORD, *_PFPIEEE_RECORD;
static inline void* __WINE_ALLOC_SIZE(1) heap_alloc(size_t len)
{
return HeapAlloc(GetProcessHeap(), 0, len);
}
static inline void* __WINE_ALLOC_SIZE(1) heap_alloc_zero(size_t len)
{
return HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, len);
}
static inline BOOL heap_free(void *mem)
{
return HeapFree(GetProcessHeap(), 0, mem);
}
#endif /* __WINE_MSVCRT_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment