Loop.cxx 6.13 KB
Newer Older
1
/*
Max Kellermann's avatar
Max Kellermann committed
2
 * Copyright 2003-2020 The Music Player Daemon Project
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
 * http://www.musicpd.org
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include "Loop.hxx"
21
#include "TimerEvent.hxx"
22 23
#include "SocketMonitor.hxx"
#include "IdleMonitor.hxx"
24
#include "DeferEvent.hxx"
25
#include "util/ScopeExit.hxx"
26

27 28 29 30 31 32
#ifdef HAVE_URING
#include "UringManager.hxx"
#include "util/PrintException.hxx"
#include <stdio.h>
#endif

33 34 35 36 37 38 39
constexpr bool
EventLoop::TimerCompare::operator()(const TimerEvent &a,
				    const TimerEvent &b) const noexcept
{
	return a.due < b.due;
}

40
EventLoop::EventLoop(ThreadId _thread)
41
	:SocketMonitor(*this),
42 43 44 45 46 47 48
	 /* if this instance is hosted by an EventThread (no ThreadId
	    known yet) then we're not yet alive until the thread is
	    started; for the main EventLoop instance, we assume it's
	    already alive, because nobody but EventThread will call
	    SetAlive() */
	 alive(!_thread.IsNull()),
	 quit(false),
49
	 thread(_thread)
50
{
51
	SocketMonitor::Open(SocketDescriptor(wake_fd.Get()));
52 53
}

54
EventLoop::~EventLoop() noexcept
55 56 57 58 59
{
	assert(idle.empty());
	assert(timers.empty());
}

60 61 62 63 64 65
#ifdef HAVE_URING

Uring::Queue *
EventLoop::GetUring() noexcept
{
	if (!uring_initialized) {
66
		uring_initialized = true;
67 68 69 70 71 72 73 74 75 76 77 78 79
		try {
			uring = std::make_unique<Uring::Manager>(*this);
		} catch (...) {
			fprintf(stderr, "Failed to initialize io_uring: ");
			PrintException(std::current_exception());
		}
	}

	return uring.get();
}

#endif

80
void
81
EventLoop::Break() noexcept
82
{
83
	if (quit.exchange(true))
84 85
		return;

86
	wake_fd.Write();
87 88
}

89
bool
90
EventLoop::Abandon(int _fd, SocketMonitor &m)  noexcept
91
{
92
	assert(!IsAlive() || IsInside());
93

94 95
	poll_result.Clear(&m);
	return poll_group.Abandon(_fd);
96
}
97

98
bool
99
EventLoop::RemoveFD(int _fd, SocketMonitor &m) noexcept
100
{
101
	assert(!IsAlive() || IsInside());
102

103 104
	poll_result.Clear(&m);
	return poll_group.Remove(_fd);
105 106 107
}

void
108
EventLoop::AddIdle(IdleMonitor &i) noexcept
109
{
110
	assert(IsInside());
111

112
	idle.push_back(i);
113
	again = true;
114 115 116
}

void
117
EventLoop::RemoveIdle(IdleMonitor &i) noexcept
118
{
119
	assert(IsInside());
120

121
	idle.erase(idle.iterator_to(i));
122 123 124
}

void
125
EventLoop::AddTimer(TimerEvent &t, Event::Duration d) noexcept
126
{
127
	assert(IsInside());
128

129
	t.due = now + d;
130
	timers.insert(t);
131
	again = true;
132 133
}

134
inline Event::Duration
135 136
EventLoop::HandleTimers() noexcept
{
137
	Event::Duration timeout;
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153

	while (!quit) {
		auto i = timers.begin();
		if (i == timers.end())
			break;

		TimerEvent &t = *i;
		timeout = t.due - now;
		if (timeout > timeout.zero())
			return timeout;

		timers.erase(i);

		t.Run();
	}

154
	return Event::Duration(-1);
155 156
}

157 158 159 160 161 162
/**
 * Convert the given timeout specification to a milliseconds integer,
 * to be used by functions like poll() and epoll_wait().  Any negative
 * value (= never times out) is translated to the magic value -1.
 */
static constexpr int
163
ExportTimeoutMS(Event::Duration timeout)
164 165
{
	return timeout >= timeout.zero()
166 167
		/* round up (+1) to avoid unnecessary wakeups */
		? int(std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count()) + 1
168 169 170
		: -1;
}

171
void
172
EventLoop::Run() noexcept
173
{
174 175
	if (thread.IsNull())
		thread = ThreadId::GetCurrent();
176

177
	assert(IsInside());
178
	assert(!quit);
179
	assert(alive);
180
	assert(busy);
181

182
	SocketMonitor::Schedule(SocketMonitor::READ);
183
	AtScopeExit(this) {
184 185 186 187 188 189 190 191 192
#ifdef HAVE_URING
		/* make sure that the Uring::Manager gets destructed
		   from within the EventThread, or else its
		   destruction in another thread will cause assertion
		   failures */
		uring.reset();
		uring_initialized = false;
#endif

193 194
		SocketMonitor::Cancel();
	};
195

196
	do {
197
		now = std::chrono::steady_clock::now();
198
		again = false;
199 200 201

		/* invoke timers */

202 203 204
		const auto timeout = HandleTimers();
		if (quit)
			break;
205 206 207 208

		/* invoke idle */

		while (!idle.empty()) {
209
			IdleMonitor &m = idle.front();
210 211 212 213 214 215 216
			idle.pop_front();
			m.Run();

			if (quit)
				return;
		}

217
		/* try to handle DeferEvents without WakeFD
218
		   overhead */
219 220 221 222 223 224 225 226 227 228 229
		{
			const std::lock_guard<Mutex> lock(mutex);
			HandleDeferred();
			busy = false;

			if (again)
				/* re-evaluate timers because one of
				   the IdleMonitors may have added a
				   new timeout */
				continue;
		}
230 231 232

		/* wait for new event */

233
		poll_group.ReadEvents(poll_result, ExportTimeoutMS(timeout));
234

235
		now = std::chrono::steady_clock::now();
236

237 238 239 240
		{
			const std::lock_guard<Mutex> lock(mutex);
			busy = true;
		}
241

242
		/* invoke sockets */
243
		for (size_t i = 0; i < poll_result.GetSize(); ++i) {
244 245
			auto events = poll_result.GetEvents(i);
			if (events != 0) {
246 247
				if (quit)
					break;
248 249 250

				auto m = (SocketMonitor *)poll_result.GetObject(i);
				m->Dispatch(events);
251 252 253
			}
		}

254
		poll_result.Reset();
255

256
	} while (!quit);
257

258
#ifndef NDEBUG
259
	assert(thread.IsInside());
260
#endif
261 262
}

263
void
264
EventLoop::AddDeferred(DeferEvent &d) noexcept
265
{
266 267 268 269
	bool must_wake;

	{
		const std::lock_guard<Mutex> lock(mutex);
270
		if (d.IsPending())
271
			return;
272

273
		/* we don't need to wake up the EventLoop if another
274
		   DeferEvent has already done it */
275
		must_wake = !busy && deferred.empty();
276

277
		deferred.push_back(d);
278 279
		again = true;
	}
280

281 282
	if (must_wake)
		wake_fd.Write();
283 284 285
}

void
286
EventLoop::RemoveDeferred(DeferEvent &d) noexcept
287
{
288
	const std::lock_guard<Mutex> protect(mutex);
289

290
	if (d.IsPending())
291
		deferred.erase(deferred.iterator_to(d));
292 293
}

294
void
295
EventLoop::HandleDeferred() noexcept
296
{
297
	while (!deferred.empty() && !quit) {
298
		auto &m = deferred.front();
299
		assert(m.IsPending());
300 301 302

		deferred.pop_front();

303
		const ScopeUnlock unlock(mutex);
304 305
		m.RunDeferred();
	}
306
}
307

308
bool
Rosen Penev's avatar
Rosen Penev committed
309
EventLoop::OnSocketReady([[maybe_unused]] unsigned flags) noexcept
310
{
311 312
	assert(IsInside());

313 314
	wake_fd.Read();

315
	const std::lock_guard<Mutex> lock(mutex);
316
	HandleDeferred();
317 318 319

	return true;
}