/*
 * Copyright 2003-2020 The Music Player Daemon Project
 * http://www.musicpd.org
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include "Loop.hxx"
#include "SocketMonitor.hxx"
#include "IdleMonitor.hxx"
#include "DeferEvent.hxx"
#include "util/ScopeExit.hxx"

#include <limits>

#ifdef HAVE_URING
#include "UringManager.hxx"
#include "util/PrintException.hxx"
#include <stdio.h>
#endif

32
EventLoop::EventLoop(ThreadId _thread)
33
	:SocketMonitor(*this),
34 35 36 37 38 39 40
	 /* if this instance is hosted by an EventThread (no ThreadId
	    known yet) then we're not yet alive until the thread is
	    started; for the main EventLoop instance, we assume it's
	    already alive, because nobody but EventThread will call
	    SetAlive() */
	 alive(!_thread.IsNull()),
	 quit(false),
41
	 thread(_thread)
42
{
43
	SocketMonitor::Open(SocketDescriptor(wake_fd.Get()));
44 45
}

46
EventLoop::~EventLoop() noexcept
47 48 49 50 51
{
	assert(idle.empty());
	assert(timers.empty());
}

#ifdef HAVE_URING

/**
 * Return the io_uring queue of this loop, creating the
 * Uring::Manager on the first call.
 *
 * Construction is attempted only once per Run() cycle (Run() clears
 * #uring_initialized when it exits).  If it fails, the error is
 * printed to stderr and this method returns nullptr, on this call
 * and on later ones.
 *
 * @return the queue, or nullptr if io_uring could not be
 * initialized
 */
Uring::Queue *
EventLoop::GetUring() noexcept
{
	if (!uring_initialized) {
		/* remember the attempt before making it: without
		   this, each call would replace (and thereby
		   destroy) the Manager whose pointer was returned
		   earlier, and a failure would be retried and
		   logged again on every call */
		uring_initialized = true;

		try {
			uring = std::make_unique<Uring::Manager>(*this);
		} catch (...) {
			fprintf(stderr, "Failed to initialize io_uring: ");
			PrintException(std::current_exception());
		}
	}

	return uring.get();
}

#endif

71
void
72
EventLoop::Break() noexcept
73
{
74
	if (quit.exchange(true))
75 76
		return;

77
	wake_fd.Write();
78 79
}

80
bool
81
EventLoop::Abandon(int _fd, SocketMonitor &m)  noexcept
82
{
83
	assert(!IsAlive() || IsInside());
84

85 86
	poll_result.Clear(&m);
	return poll_group.Abandon(_fd);
87
}
88

89
bool
90
EventLoop::RemoveFD(int _fd, SocketMonitor &m) noexcept
91
{
92
	assert(!IsAlive() || IsInside());
93

94 95
	poll_result.Clear(&m);
	return poll_group.Remove(_fd);
96 97 98
}

void
99
EventLoop::AddIdle(IdleMonitor &i) noexcept
100
{
101
	assert(IsInside());
102

103
	idle.push_back(i);
104
	again = true;
105 106 107
}

void
108
EventLoop::RemoveIdle(IdleMonitor &i) noexcept
109
{
110
	assert(IsInside());
111

112
	idle.erase(idle.iterator_to(i));
113 114 115
}

void
116
EventLoop::AddTimer(TimerEvent &t, std::chrono::steady_clock::duration d) noexcept
117
{
118
	assert(IsInside());
119

120
	t.due = now + d;
121
	timers.insert(t);
122
	again = true;
123 124 125
}

void
126
EventLoop::CancelTimer(TimerEvent &t) noexcept
127
{
128
	assert(IsInside());
129

130
	timers.erase(timers.iterator_to(t));
131 132
}

133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
inline std::chrono::steady_clock::duration
EventLoop::HandleTimers() noexcept
{
	std::chrono::steady_clock::duration timeout;

	while (!quit) {
		auto i = timers.begin();
		if (i == timers.end())
			break;

		TimerEvent &t = *i;
		timeout = t.due - now;
		if (timeout > timeout.zero())
			return timeout;

		timers.erase(i);

		t.Run();
	}

	return std::chrono::steady_clock::duration(-1);
}

/**
 * Convert the given timeout specification to a milliseconds integer,
 * to be used by functions like poll() and epoll_wait().  Any negative
 * value (= never times out) is translated to the magic value -1.
 *
 * Non-negative values are rounded up by one millisecond.  Values too
 * large for an "int" are clamped to the largest "int" instead of
 * being narrowed, because a wrapped (possibly negative) result would
 * be mistaken for "never times out".
 */
static constexpr int
ExportTimeoutMS(std::chrono::steady_clock::duration timeout)
{
	if (timeout < timeout.zero())
		return -1;

	constexpr int max_ms = std::numeric_limits<int>::max();

	const auto ms =
		std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count();

	/* ">=" because one more millisecond is added below */
	if (ms >= max_ms)
		return max_ms;

	/* round up (+1) to avoid unnecessary wakeups */
	return static_cast<int>(ms) + 1;
}

170
void
171
EventLoop::Run() noexcept
172
{
173 174
	if (thread.IsNull())
		thread = ThreadId::GetCurrent();
175

176
	assert(IsInside());
177
	assert(!quit);
178
	assert(alive);
179
	assert(busy);
180

181
	SocketMonitor::Schedule(SocketMonitor::READ);
182
	AtScopeExit(this) {
183 184 185 186 187 188 189 190 191
#ifdef HAVE_URING
		/* make sure that the Uring::Manager gets destructed
		   from within the EventThread, or else its
		   destruction in another thread will cause assertion
		   failures */
		uring.reset();
		uring_initialized = false;
#endif

192 193
		SocketMonitor::Cancel();
	};
194

195
	do {
196
		now = std::chrono::steady_clock::now();
197
		again = false;
198 199 200

		/* invoke timers */

201 202 203
		const auto timeout = HandleTimers();
		if (quit)
			break;
204 205 206 207

		/* invoke idle */

		while (!idle.empty()) {
208
			IdleMonitor &m = idle.front();
209 210 211 212 213 214 215
			idle.pop_front();
			m.Run();

			if (quit)
				return;
		}

216
		/* try to handle DeferEvents without WakeFD
217
		   overhead */
218 219 220 221 222 223 224 225 226 227 228
		{
			const std::lock_guard<Mutex> lock(mutex);
			HandleDeferred();
			busy = false;

			if (again)
				/* re-evaluate timers because one of
				   the IdleMonitors may have added a
				   new timeout */
				continue;
		}
229 230 231

		/* wait for new event */

232
		poll_group.ReadEvents(poll_result, ExportTimeoutMS(timeout));
233

234
		now = std::chrono::steady_clock::now();
235

236 237 238 239
		{
			const std::lock_guard<Mutex> lock(mutex);
			busy = true;
		}
240

241
		/* invoke sockets */
242
		for (size_t i = 0; i < poll_result.GetSize(); ++i) {
243 244
			auto events = poll_result.GetEvents(i);
			if (events != 0) {
245 246
				if (quit)
					break;
247 248 249

				auto m = (SocketMonitor *)poll_result.GetObject(i);
				m->Dispatch(events);
250 251 252
			}
		}

253
		poll_result.Reset();
254

255
	} while (!quit);
256

257
#ifndef NDEBUG
258
	assert(thread.IsInside());
259
#endif
260 261
}

262
void
263
EventLoop::AddDeferred(DeferEvent &d) noexcept
264
{
265 266 267 268
	bool must_wake;

	{
		const std::lock_guard<Mutex> lock(mutex);
269
		if (d.IsPending())
270
			return;
271

272
		/* we don't need to wake up the EventLoop if another
273
		   DeferEvent has already done it */
274
		must_wake = !busy && deferred.empty();
275

276
		deferred.push_back(d);
277 278
		again = true;
	}
279

280 281
	if (must_wake)
		wake_fd.Write();
282 283 284
}

void
285
EventLoop::RemoveDeferred(DeferEvent &d) noexcept
286
{
287
	const std::lock_guard<Mutex> protect(mutex);
288

289
	if (d.IsPending())
290
		deferred.erase(deferred.iterator_to(d));
291 292
}

293
void
294
EventLoop::HandleDeferred() noexcept
295
{
296
	while (!deferred.empty() && !quit) {
297
		auto &m = deferred.front();
298
		assert(m.IsPending());
299 300 301

		deferred.pop_front();

302
		const ScopeUnlock unlock(mutex);
303 304
		m.RunDeferred();
	}
305
}
306

307
bool
Rosen Penev's avatar
Rosen Penev committed
308
EventLoop::OnSocketReady([[maybe_unused]] unsigned flags) noexcept
309
{
310 311
	assert(IsInside());

312 313
	wake_fd.Read();

314
	const std::lock_guard<Mutex> lock(mutex);
315
	HandleDeferred();
316 317 318

	return true;
}