Commit a357d84d authored by Max Kellermann's avatar Max Kellermann

event/DeferredMonitor: make fully thread-safe

Instead of creating a new eventfd for each DeferredMonitor instance, reuse EventLoop's eventfd, and add a std::list to EventLoop that manages the list of pending DeferredMonitors. This std::list is protected by the same mutex as the "calls" list. The bottom line is: reduced overhead because the per-instance eventfd was eliminated, slightly added overhead due to Mutex usage (but negligible), and we're thread-safe now. This subsystem is now good enough to replace EventLoop::AddCall().
parent 48c96bba
...@@ -25,7 +25,7 @@ void ...@@ -25,7 +25,7 @@ void
DeferredMonitor::Cancel() DeferredMonitor::Cancel()
{ {
#ifdef USE_INTERNAL_EVENTLOOP #ifdef USE_INTERNAL_EVENTLOOP
pending = false; loop.RemoveDeferred(*this);
#endif #endif
#ifdef USE_GLIB_EVENTLOOP #ifdef USE_GLIB_EVENTLOOP
const auto id = source_id.exchange(0); const auto id = source_id.exchange(0);
...@@ -38,8 +38,7 @@ void ...@@ -38,8 +38,7 @@ void
DeferredMonitor::Schedule() DeferredMonitor::Schedule()
{ {
#ifdef USE_INTERNAL_EVENTLOOP #ifdef USE_INTERNAL_EVENTLOOP
if (!pending.exchange(true)) loop.AddDeferred(*this);
fd.Write();
#endif #endif
#ifdef USE_GLIB_EVENTLOOP #ifdef USE_GLIB_EVENTLOOP
const unsigned id = loop.AddIdle(Callback, this); const unsigned id = loop.AddIdle(Callback, this);
...@@ -49,21 +48,6 @@ DeferredMonitor::Schedule() ...@@ -49,21 +48,6 @@ DeferredMonitor::Schedule()
#endif #endif
} }
#ifdef USE_INTERNAL_EVENTLOOP
bool
DeferredMonitor::OnSocketReady(unsigned)
{
fd.Read();
if (pending.exchange(false))
RunDeferred();
return true;
}
#endif
#ifdef USE_GLIB_EVENTLOOP #ifdef USE_GLIB_EVENTLOOP
void void
......
...@@ -23,11 +23,6 @@ ...@@ -23,11 +23,6 @@
#include "check.h" #include "check.h"
#include "Compiler.h" #include "Compiler.h"
#ifdef USE_INTERNAL_EVENTLOOP
#include "SocketMonitor.hxx"
#include "WakeFD.hxx"
#endif
#ifdef USE_GLIB_EVENTLOOP #ifdef USE_GLIB_EVENTLOOP
#include <glib.h> #include <glib.h>
#endif #endif
...@@ -39,31 +34,24 @@ class EventLoop; ...@@ -39,31 +34,24 @@ class EventLoop;
/** /**
* Defer execution of an event into an #EventLoop. * Defer execution of an event into an #EventLoop.
* *
* This class is thread-safe, however the constructor must be called * This class is thread-safe.
* from the thread that runs the #EventLoop
*/ */
class DeferredMonitor class DeferredMonitor {
#ifdef USE_INTERNAL_EVENTLOOP EventLoop &loop;
: private SocketMonitor
#endif
{
#ifdef USE_INTERNAL_EVENTLOOP #ifdef USE_INTERNAL_EVENTLOOP
std::atomic_bool pending; friend class EventLoop;
WakeFD fd; bool pending;
#endif #endif
#ifdef USE_GLIB_EVENTLOOP #ifdef USE_GLIB_EVENTLOOP
EventLoop &loop;
std::atomic<guint> source_id; std::atomic<guint> source_id;
#endif #endif
public: public:
#ifdef USE_INTERNAL_EVENTLOOP #ifdef USE_INTERNAL_EVENTLOOP
DeferredMonitor(EventLoop &_loop) DeferredMonitor(EventLoop &_loop)
:SocketMonitor(_loop), pending(false) { :loop(_loop), pending(false) {}
SocketMonitor::Open(fd.Get());
SocketMonitor::Schedule(SocketMonitor::READ);
}
#endif #endif
#ifdef USE_GLIB_EVENTLOOP #ifdef USE_GLIB_EVENTLOOP
...@@ -72,24 +60,11 @@ public: ...@@ -72,24 +60,11 @@ public:
#endif #endif
~DeferredMonitor() { ~DeferredMonitor() {
#ifdef USE_INTERNAL_EVENTLOOP
/* avoid closing the WakeFD twice */
SocketMonitor::Steal();
#endif
#ifdef USE_GLIB_EVENTLOOP
Cancel(); Cancel();
#endif
} }
EventLoop &GetEventLoop() { EventLoop &GetEventLoop() {
#ifdef USE_INTERNAL_EVENTLOOP
return SocketMonitor::GetEventLoop();
#endif
#ifdef USE_GLIB_EVENTLOOP
return loop; return loop;
#endif
} }
void Schedule(); void Schedule();
...@@ -99,10 +74,6 @@ protected: ...@@ -99,10 +74,6 @@ protected:
virtual void RunDeferred() = 0; virtual void RunDeferred() = 0;
private: private:
#ifdef USE_INTERNAL_EVENTLOOP
virtual bool OnSocketReady(unsigned flags) override final;
#endif
#ifdef USE_GLIB_EVENTLOOP #ifdef USE_GLIB_EVENTLOOP
void Run(); void Run();
static gboolean Callback(gpointer data); static gboolean Callback(gpointer data);
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "TimeoutMonitor.hxx" #include "TimeoutMonitor.hxx"
#include "SocketMonitor.hxx" #include "SocketMonitor.hxx"
#include "IdleMonitor.hxx" #include "IdleMonitor.hxx"
#include "DeferredMonitor.hxx"
#include <algorithm> #include <algorithm>
...@@ -204,6 +205,44 @@ EventLoop::AddCall(std::function<void()> &&f) ...@@ -204,6 +205,44 @@ EventLoop::AddCall(std::function<void()> &&f)
wake_fd.Write(); wake_fd.Write();
} }
void
EventLoop::AddDeferred(DeferredMonitor &d)
{
mutex.lock();
if (d.pending) {
mutex.unlock();
return;
}
assert(std::find(deferred.begin(),
deferred.end(), &d) == deferred.end());
d.pending = true;
deferred.push_back(&d);
mutex.unlock();
wake_fd.Write();
}
void
EventLoop::RemoveDeferred(DeferredMonitor &d)
{
const ScopeLock protect(mutex);
if (!d.pending) {
assert(std::find(deferred.begin(),
deferred.end(), &d) == deferred.end());
return;
}
d.pending = false;
auto i = std::find(deferred.begin(), deferred.end(), &d);
assert(i != deferred.end());
deferred.erase(i);
}
bool bool
EventLoop::OnSocketReady(gcc_unused unsigned flags) EventLoop::OnSocketReady(gcc_unused unsigned flags)
{ {
...@@ -213,6 +252,18 @@ EventLoop::OnSocketReady(gcc_unused unsigned flags) ...@@ -213,6 +252,18 @@ EventLoop::OnSocketReady(gcc_unused unsigned flags)
mutex.lock(); mutex.lock();
while (!deferred.empty() && !quit) {
DeferredMonitor &m = *deferred.front();
assert(m.pending);
deferred.pop_front();
m.pending = false;
mutex.unlock();
m.RunDeferred();
mutex.lock();
}
while (!calls.empty() && !quit) { while (!calls.empty() && !quit) {
auto f = std::move(calls.front()); auto f = std::move(calls.front());
calls.pop_front(); calls.pop_front();
......
...@@ -42,6 +42,7 @@ ...@@ -42,6 +42,7 @@
#ifdef USE_INTERNAL_EVENTLOOP #ifdef USE_INTERNAL_EVENTLOOP
class TimeoutMonitor; class TimeoutMonitor;
class IdleMonitor; class IdleMonitor;
class DeferredMonitor;
class SocketMonitor; class SocketMonitor;
#endif #endif
...@@ -91,6 +92,7 @@ class EventLoop final ...@@ -91,6 +92,7 @@ class EventLoop final
Mutex mutex; Mutex mutex;
std::list<std::function<void()>> calls; std::list<std::function<void()>> calls;
std::list<DeferredMonitor *> deferred;
unsigned now_ms; unsigned now_ms;
...@@ -162,6 +164,21 @@ public: ...@@ -162,6 +164,21 @@ public:
void AddCall(std::function<void()> &&f); void AddCall(std::function<void()> &&f);
/** /**
* Schedule a call to DeferredMonitor::RunDeferred().
*
* This method is thread-safe.
*/
void AddDeferred(DeferredMonitor &d);
/**
* Cancel a pending call to DeferredMonitor::RunDeferred().
* However after returning, the call may still be running.
*
* This method is thread-safe.
*/
void RemoveDeferred(DeferredMonitor &d);
/**
* The main function of this class. It will loop until * The main function of this class. It will loop until
* Break() gets called. Can be called only once. * Break() gets called. Can be called only once.
*/ */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment