Commit a14997ff authored by Max Kellermann's avatar Max Kellermann

event/Loop: manage all SocketEvents in a linked list

Not only those which are "ready".
parent dd94f975
...@@ -67,6 +67,8 @@ EventLoop::~EventLoop() noexcept ...@@ -67,6 +67,8 @@ EventLoop::~EventLoop() noexcept
{ {
assert(idle.empty()); assert(idle.empty());
assert(timers.empty()); assert(timers.empty());
assert(sockets.empty());
assert(ready_sockets.empty());
} }
#ifdef HAVE_URING #ifdef HAVE_URING
...@@ -117,7 +119,11 @@ EventLoop::AddFD(int fd, unsigned events, SocketEvent &event) noexcept ...@@ -117,7 +119,11 @@ EventLoop::AddFD(int fd, unsigned events, SocketEvent &event) noexcept
assert(!IsAlive() || IsInside()); assert(!IsAlive() || IsInside());
#endif #endif
return poll_group.Add(fd, events, &event); if (!poll_group.Add(fd, events, &event))
return false;
sockets.push_back(event);
return true;
} }
bool bool
...@@ -131,12 +137,13 @@ EventLoop::ModifyFD(int fd, unsigned events, SocketEvent &event) noexcept ...@@ -131,12 +137,13 @@ EventLoop::ModifyFD(int fd, unsigned events, SocketEvent &event) noexcept
} }
bool bool
EventLoop::RemoveFD(int fd) noexcept EventLoop::RemoveFD(int fd, SocketEvent &event) noexcept
{ {
#ifdef HAVE_THREADED_EVENT_LOOP #ifdef HAVE_THREADED_EVENT_LOOP
assert(!IsAlive() || IsInside()); assert(!IsAlive() || IsInside());
#endif #endif
event.unlink();
return poll_group.Remove(fd); return poll_group.Remove(fd);
} }
...@@ -213,11 +220,13 @@ EventLoop::Wait(Event::Duration timeout) noexcept ...@@ -213,11 +220,13 @@ EventLoop::Wait(Event::Duration timeout) noexcept
const auto poll_result = const auto poll_result =
poll_group.ReadEvents(ExportTimeoutMS(timeout)); poll_group.ReadEvents(ExportTimeoutMS(timeout));
ready_sockets.clear();
for (size_t i = 0; i < poll_result.GetSize(); ++i) { for (size_t i = 0; i < poll_result.GetSize(); ++i) {
auto &s = *(SocketEvent *)poll_result.GetObject(i); auto &socket_event = *(SocketEvent *)poll_result.GetObject(i);
s.SetReadyFlags(poll_result.GetEvents(i)); socket_event.SetReadyFlags(poll_result.GetEvents(i));
ready_sockets.push_back(s);
/* move from "sockets" to "ready_sockets" */
socket_event.unlink();
ready_sockets.push_back(socket_event);
} }
return poll_result.GetSize() > 0; return poll_result.GetSize() > 0;
...@@ -309,10 +318,13 @@ EventLoop::Run() noexcept ...@@ -309,10 +318,13 @@ EventLoop::Run() noexcept
/* invoke sockets */ /* invoke sockets */
while (!ready_sockets.empty() && !quit) { while (!ready_sockets.empty() && !quit) {
auto &sm = ready_sockets.front(); auto &socket_event = ready_sockets.front();
ready_sockets.pop_front();
/* move from "ready_sockets" back to "sockets" */
socket_event.unlink();
sockets.push_back(socket_event);
sm.Dispatch(); socket_event.Dispatch();
} }
} while (!quit); } while (!quit);
......
...@@ -91,18 +91,22 @@ class EventLoop final ...@@ -91,18 +91,22 @@ class EventLoop final
DeferredList deferred; DeferredList deferred;
#endif #endif
using ReadySocketList = using SocketList =
boost::intrusive::list<SocketEvent, boost::intrusive::list<SocketEvent,
boost::intrusive::member_hook<SocketEvent, boost::intrusive::base_hook<boost::intrusive::list_base_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>>>,
SocketEvent::ReadyListHook,
&SocketEvent::ready_siblings>,
boost::intrusive::constant_time_size<false>>; boost::intrusive::constant_time_size<false>>;
/** /**
* A list of scheduled #SocketEvent instances, without those
* which are ready (these are in #ready_sockets).
*/
SocketList sockets;
/**
* A linked list of #SocketEvent instances which have a * A linked list of #SocketEvent instances which have a
* non-zero "ready_flags" field, and need to be dispatched. * non-zero "ready_flags" field, and need to be dispatched.
*/ */
ReadySocketList ready_sockets; SocketList ready_sockets;
#ifdef HAVE_URING #ifdef HAVE_URING
std::unique_ptr<Uring::Manager> uring; std::unique_ptr<Uring::Manager> uring;
...@@ -188,7 +192,7 @@ public: ...@@ -188,7 +192,7 @@ public:
bool AddFD(int fd, unsigned events, SocketEvent &event) noexcept; bool AddFD(int fd, unsigned events, SocketEvent &event) noexcept;
bool ModifyFD(int fd, unsigned events, SocketEvent &event) noexcept; bool ModifyFD(int fd, unsigned events, SocketEvent &event) noexcept;
bool RemoveFD(int fd) noexcept; bool RemoveFD(int fd, SocketEvent &event) noexcept;
/** /**
* Remove the given #SocketEvent after the file descriptor * Remove the given #SocketEvent after the file descriptor
......
...@@ -69,7 +69,7 @@ SocketEvent::Schedule(unsigned flags) noexcept ...@@ -69,7 +69,7 @@ SocketEvent::Schedule(unsigned flags) noexcept
if (scheduled_flags == 0) if (scheduled_flags == 0)
success = loop.AddFD(fd.Get(), flags, *this); success = loop.AddFD(fd.Get(), flags, *this);
else if (flags == 0) else if (flags == 0)
success = loop.RemoveFD(fd.Get()); success = loop.RemoveFD(fd.Get(), *this);
else else
success = loop.ModifyFD(fd.Get(), flags, *this); success = loop.ModifyFD(fd.Get(), flags, *this);
......
...@@ -45,14 +45,11 @@ class EventLoop; ...@@ -45,14 +45,11 @@ class EventLoop;
* thread that runs the #EventLoop, except where explicitly documented * thread that runs the #EventLoop, except where explicitly documented
* as thread-safe. * as thread-safe.
*/ */
class SocketEvent { class SocketEvent final : public boost::intrusive::list_base_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>> {
friend class EventLoop; friend class EventLoop;
EventLoop &loop; EventLoop &loop;
using ReadyListHook = boost::intrusive::list_member_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
ReadyListHook ready_siblings;
using Callback = BoundMethod<void(unsigned events) noexcept>; using Callback = BoundMethod<void(unsigned events) noexcept>;
const Callback callback; const Callback callback;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment