/*
 * Copyright 2003-2017 The Music Player Daemon Project
 * http://www.musicpd.org
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include "config.h"
#include "Loop.hxx"
#include "SocketMonitor.hxx"
#include "IdleMonitor.hxx"
#include "DeferEvent.hxx"
#include "util/ScopeExit.hxx"

#include <limits>
26

27
/**
 * Construct an event loop bound to the given thread id and open the
 * SocketMonitor base class on the descriptor obtained from #wake_fd.
 */
EventLoop::EventLoop(ThreadId _thread)
	:SocketMonitor(*this),
	 quit(false), dead(false),
	 thread(_thread)
{
	const SocketDescriptor wake_socket(wake_fd.Get());
	SocketMonitor::Open(wake_socket);
}

35
EventLoop::~EventLoop() noexcept
{
	/* every timer and idle monitor is expected to have been
	   removed before the loop gets destroyed */
	assert(timers.empty());
	assert(idle.empty());
}

void
42
EventLoop::Break() noexcept
43
{
44
	if (quit.exchange(true))
45 46
		return;

47
	wake_fd.Write();
48 49
}

50
bool
51
EventLoop::Abandon(int _fd, SocketMonitor &m)  noexcept
52
{
53
	assert(IsInside());
54

55 56
	poll_result.Clear(&m);
	return poll_group.Abandon(_fd);
57
}
58

59
bool
60
EventLoop::RemoveFD(int _fd, SocketMonitor &m) noexcept
61
{
62
	assert(IsInside());
63

64 65
	poll_result.Clear(&m);
	return poll_group.Remove(_fd);
66 67 68
}

void
69
EventLoop::AddIdle(IdleMonitor &i) noexcept
70
{
71
	assert(IsInside());
72

73
	idle.push_back(i);
74
	again = true;
75 76 77
}

void
78
EventLoop::RemoveIdle(IdleMonitor &i) noexcept
79
{
80
	assert(IsInside());
81

82
	idle.erase(idle.iterator_to(i));
83 84 85
}

void
86
EventLoop::AddTimer(TimerEvent &t, std::chrono::steady_clock::duration d) noexcept
87
{
88
	assert(IsInside());
89

90
	t.due = now + d;
91
	timers.insert(t);
92
	again = true;
93 94 95
}

void
96
EventLoop::CancelTimer(TimerEvent &t) noexcept
97
{
98
	assert(IsInside());
99

100
	timers.erase(timers.iterator_to(t));
101 102
}

103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
inline std::chrono::steady_clock::duration
EventLoop::HandleTimers() noexcept
{
	std::chrono::steady_clock::duration timeout;

	while (!quit) {
		auto i = timers.begin();
		if (i == timers.end())
			break;

		TimerEvent &t = *i;
		timeout = t.due - now;
		if (timeout > timeout.zero())
			return timeout;

		timers.erase(i);

		t.Run();
	}

	return std::chrono::steady_clock::duration(-1);
}

126 127 128 129 130 131 132 133 134 135 136 137 138
/**
 * Round the given non-negative duration up to whole milliseconds.
 * std::chrono::duration_cast truncates towards zero, therefore one
 * millisecond is added whenever a remainder was cut off.
 */
static constexpr std::chrono::milliseconds::rep
CeilTimeoutMS(std::chrono::steady_clock::duration timeout)
{
	return std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count()
		+ (std::chrono::duration_cast<std::chrono::milliseconds>(timeout) < timeout
		   ? 1 : 0);
}

/**
 * Convert the given timeout specification to a milliseconds integer,
 * to be used by functions like poll() and epoll_wait().  Any negative
 * value (= never times out) is translated to the magic value -1.
 *
 * Non-negative values are rounded up to the next full millisecond;
 * truncating would turn a timer which is due in less than one
 * millisecond into a zero timeout, and the caller would spin without
 * sleeping until the timer is due.  Values which do not fit into an
 * "int" are clamped to the largest "int", so a very long timeout
 * cannot wrap around to a negative ("wait forever") value; the
 * caller merely wakes up early and calculates a new timeout.
 */
static constexpr int
ExportTimeoutMS(std::chrono::steady_clock::duration timeout)
{
	return timeout < timeout.zero()
		? -1
		: (CeilTimeoutMS(timeout) > std::numeric_limits<int>::max()
		   ? std::numeric_limits<int>::max()
		   : int(CeilTimeoutMS(timeout)));
}

139
void
140
EventLoop::Run() noexcept
141
{
142 143
	if (thread.IsNull())
		thread = ThreadId::GetCurrent();
144

145
	assert(IsInside());
146
	assert(!quit);
147
	assert(!dead);
148
	assert(busy);
149

150
	SocketMonitor::Schedule(SocketMonitor::READ);
151 152 153 154
	AtScopeExit(this) {
		dead = true;
		SocketMonitor::Cancel();
	};
155

156
	do {
157
		now = std::chrono::steady_clock::now();
158
		again = false;
159 160 161

		/* invoke timers */

162 163 164
		const auto timeout = HandleTimers();
		if (quit)
			break;
165 166 167 168

		/* invoke idle */

		while (!idle.empty()) {
169
			IdleMonitor &m = idle.front();
170 171 172 173 174 175 176
			idle.pop_front();
			m.Run();

			if (quit)
				return;
		}

177
		/* try to handle DeferEvents without WakeFD
178
		   overhead */
179 180 181 182 183 184 185 186 187 188 189
		{
			const std::lock_guard<Mutex> lock(mutex);
			HandleDeferred();
			busy = false;

			if (again)
				/* re-evaluate timers because one of
				   the IdleMonitors may have added a
				   new timeout */
				continue;
		}
190 191 192

		/* wait for new event */

193
		poll_group.ReadEvents(poll_result, ExportTimeoutMS(timeout));
194

195
		now = std::chrono::steady_clock::now();
196

197 198 199 200
		{
			const std::lock_guard<Mutex> lock(mutex);
			busy = true;
		}
201

202
		/* invoke sockets */
203
		for (size_t i = 0; i < poll_result.GetSize(); ++i) {
204 205
			auto events = poll_result.GetEvents(i);
			if (events != 0) {
206 207
				if (quit)
					break;
208 209 210

				auto m = (SocketMonitor *)poll_result.GetObject(i);
				m->Dispatch(events);
211 212 213
			}
		}

214
		poll_result.Reset();
215

216
	} while (!quit);
217

218
#ifndef NDEBUG
219
	assert(!dead);
220
	assert(busy);
221
	assert(thread.IsInside());
222
#endif
223 224
}

225
void
226
EventLoop::AddDeferred(DeferEvent &d) noexcept
227
{
228 229 230 231
	bool must_wake;

	{
		const std::lock_guard<Mutex> lock(mutex);
232
		if (d.IsPending())
233
			return;
234

235
		/* we don't need to wake up the EventLoop if another
236
		   DeferEvent has already done it */
237
		must_wake = !busy && deferred.empty();
238

239
		deferred.push_back(d);
240 241
		again = true;
	}
242

243 244
	if (must_wake)
		wake_fd.Write();
245 246 247
}

void
248
EventLoop::RemoveDeferred(DeferEvent &d) noexcept
249
{
250
	const std::lock_guard<Mutex> protect(mutex);
251

252
	if (d.IsPending())
253
		deferred.erase(deferred.iterator_to(d));
254 255
}

256
void
257
EventLoop::HandleDeferred() noexcept
258
{
259
	while (!deferred.empty() && !quit) {
260
		auto &m = deferred.front();
261
		assert(m.IsPending());
262 263 264

		deferred.pop_front();

265
		const ScopeUnlock unlock(mutex);
266 267
		m.RunDeferred();
	}
268
}
269

270
bool
271
EventLoop::OnSocketReady(gcc_unused unsigned flags) noexcept
272
{
273 274
	assert(IsInside());

275 276
	wake_fd.Read();

277
	const std::lock_guard<Mutex> lock(mutex);
278
	HandleDeferred();
279 280 281

	return true;
}