Loop.cxx 4.94 KB
Newer Older
1
/*
Max Kellermann's avatar
Max Kellermann committed
2
 * Copyright 2003-2017 The Music Player Daemon Project
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
 * http://www.musicpd.org
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include "config.h"
#include "Loop.hxx"
22 23
#include "SocketMonitor.hxx"
#include "IdleMonitor.hxx"
24
#include "DeferEvent.hxx"
25
#include "util/ScopeExit.hxx"
26

27
EventLoop::EventLoop(ThreadId _thread)
28
	:SocketMonitor(*this), quit(false), thread(_thread)
29
{
30
	SocketMonitor::Open(SocketDescriptor(wake_fd.Get()));
31 32 33 34 35 36 37 38 39 40 41
}

EventLoop::~EventLoop()
{
	assert(idle.empty());
	assert(timers.empty());
}

void
EventLoop::Break()
{
42
	if (quit.exchange(true))
43 44
		return;

45
	wake_fd.Write();
46 47
}

48 49
bool
EventLoop::Abandon(int _fd, SocketMonitor &m)
50
{
51
	assert(IsInside());
52

53 54
	poll_result.Clear(&m);
	return poll_group.Abandon(_fd);
55
}
56

57 58 59
bool
EventLoop::RemoveFD(int _fd, SocketMonitor &m)
{
60
	assert(IsInside());
61

62 63
	poll_result.Clear(&m);
	return poll_group.Remove(_fd);
64 65 66 67 68
}

void
EventLoop::AddIdle(IdleMonitor &i)
{
69
	assert(IsInside());
70

71
	idle.push_back(i);
72
	again = true;
73 74 75 76 77
}

void
EventLoop::RemoveIdle(IdleMonitor &i)
{
78
	assert(IsInside());
79

80
	idle.erase(idle.iterator_to(i));
81 82 83
}

void
84
EventLoop::AddTimer(TimerEvent &t, std::chrono::steady_clock::duration d)
85
{
86
	assert(IsInside());
87

88
	t.due = now + d;
89
	timers.insert(t);
90
	again = true;
91 92 93
}

void
94
EventLoop::CancelTimer(TimerEvent &t)
95
{
96
	assert(IsInside());
97

98
	timers.erase(timers.iterator_to(t));
99 100
}

101 102 103 104 105 106 107 108 109 110 111 112 113
/**
 * Convert the given timeout specification to a milliseconds integer,
 * to be used by functions like poll() and epoll_wait().  Any negative
 * value (= never times out) is translated to the magic value -1.
 */
static constexpr int
ExportTimeoutMS(std::chrono::steady_clock::duration timeout)
{
	return timeout >= timeout.zero()
		? int(std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count())
		: -1;
}

114 115 116
void
EventLoop::Run()
{
117 118
	if (thread.IsNull())
		thread = ThreadId::GetCurrent();
119

120
	assert(IsInside());
121
	assert(!quit);
122
	assert(busy);
123

124 125 126
	SocketMonitor::Schedule(SocketMonitor::READ);
	AtScopeExit(this) { SocketMonitor::Cancel(); };

127
	do {
128
		now = std::chrono::steady_clock::now();
129
		again = false;
130 131 132

		/* invoke timers */

133
		std::chrono::steady_clock::duration timeout;
134 135 136
		while (true) {
			auto i = timers.begin();
			if (i == timers.end()) {
137
				timeout = std::chrono::steady_clock::duration(-1);
138 139 140
				break;
			}

141 142
			TimerEvent &t = *i;
			timeout = t.due - now;
143
			if (timeout > timeout.zero())
144 145 146 147
				break;

			timers.erase(i);

148
			t.Run();
149 150 151 152 153 154 155 156

			if (quit)
				return;
		}

		/* invoke idle */

		while (!idle.empty()) {
157
			IdleMonitor &m = idle.front();
158 159 160 161 162 163 164
			idle.pop_front();
			m.Run();

			if (quit)
				return;
		}

165
		/* try to handle DeferEvents without WakeFD
166
		   overhead */
167 168 169 170 171 172 173 174 175 176 177
		{
			const std::lock_guard<Mutex> lock(mutex);
			HandleDeferred();
			busy = false;

			if (again)
				/* re-evaluate timers because one of
				   the IdleMonitors may have added a
				   new timeout */
				continue;
		}
178 179 180

		/* wait for new event */

181
		poll_group.ReadEvents(poll_result, ExportTimeoutMS(timeout));
182

183
		now = std::chrono::steady_clock::now();
184

185 186 187 188
		{
			const std::lock_guard<Mutex> lock(mutex);
			busy = true;
		}
189

190
		/* invoke sockets */
191 192 193
		for (int i = 0; i < poll_result.GetSize(); ++i) {
			auto events = poll_result.GetEvents(i);
			if (events != 0) {
194 195
				if (quit)
					break;
196 197 198

				auto m = (SocketMonitor *)poll_result.GetObject(i);
				m->Dispatch(events);
199 200 201
			}
		}

202
		poll_result.Reset();
203

204
	} while (!quit);
205

206
#ifndef NDEBUG
207
	assert(busy);
208
	assert(thread.IsInside());
209
#endif
210 211
}

212
void
213
EventLoop::AddDeferred(DeferEvent &d) noexcept
214
{
215 216 217 218
	bool must_wake;

	{
		const std::lock_guard<Mutex> lock(mutex);
219
		if (d.IsPending())
220
			return;
221

222
		/* we don't need to wake up the EventLoop if another
223
		   DeferEvent has already done it */
224
		must_wake = !busy && deferred.empty();
225

226
		deferred.push_back(d);
227 228
		again = true;
	}
229

230 231
	if (must_wake)
		wake_fd.Write();
232 233 234
}

void
235
EventLoop::RemoveDeferred(DeferEvent &d) noexcept
236
{
237
	const std::lock_guard<Mutex> protect(mutex);
238

239
	if (d.IsPending())
240
		deferred.erase(deferred.iterator_to(d));
241 242
}

243 244
void
EventLoop::HandleDeferred()
245
{
246
	while (!deferred.empty() && !quit) {
247
		auto &m = deferred.front();
248
		assert(m.IsPending());
249 250 251

		deferred.pop_front();

252
		const ScopeUnlock unlock(mutex);
253 254
		m.RunDeferred();
	}
255
}
256

257
bool
258
EventLoop::OnSocketReady(gcc_unused unsigned flags) noexcept
259
{
260 261
	assert(IsInside());

262 263
	wake_fd.Read();

264
	const std::lock_guard<Mutex> lock(mutex);
265
	HandleDeferred();
266 267 268

	return true;
}