Thread.cxx 11 KB
Newer Older
1
/*
Max Kellermann's avatar
Max Kellermann committed
2
 * Copyright 2003-2021 The Music Player Daemon Project
3
 * http://www.musicpd.org
4 5 6 7 8 9 10 11 12 13
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
14 15 16 17
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 19
 */

20
#include "Control.hxx"
21
#include "Error.hxx"
22
#include "Filtered.hxx"
23
#include "Client.hxx"
24
#include "Domain.hxx"
25 26
#include "lib/fmt/AudioFormatFormatter.hxx"
#include "lib/fmt/ExceptionFormatter.hxx"
27
#include "thread/Util.hxx"
28
#include "thread/Slack.hxx"
29
#include "thread/Name.hxx"
30
#include "util/StringBuffer.hxx"
31
#include "util/ScopeExit.hxx"
32
#include "util/RuntimeError.hxx"
33
#include "Log.hxx"
34

35 36
#include <cassert>

Max Kellermann's avatar
Max Kellermann committed
37
#include <string.h>
38

39
void
40
AudioOutputControl::CommandFinished() noexcept
41
{
42 43
	assert(command != Command::NONE);
	command = Command::NONE;
44

45
	client_cond.notify_one();
46 47
}

48
inline void
49
AudioOutputControl::InternalOpen2(const AudioFormat in_audio_format)
50
{
51
	assert(in_audio_format.IsValid());
52

53
	const auto cf = in_audio_format.WithMask(output->config_audio_format);
54

55
	if (open && cf != output->filter_audio_format)
56 57
		/* if the filter's output format changes, the output
		   must be reopened as well */
58
		InternalCloseOutput(playing);
59

60
	output->filter_audio_format = cf;
61

62
	if (!open) {
63
		{
64
			const ScopeUnlock unlock(mutex);
65
			output->OpenOutputAndConvert(output->filter_audio_format);
66
		}
67 68

		open = true;
69
		playing = false;
70
	} else if (in_audio_format != output->out_audio_format) {
71 72 73 74
		/* reconfigure the final ConvertFilter for its new
		   input AudioFormat */

		try {
75
			output->ConfigureConvertFilter();
76
		} catch (...) {
77
			InternalCloseOutput(false);
78
			throw;
79
		}
80
	}
81 82 83 84 85

	{
		const ScopeUnlock unlock(mutex);
		output->OpenSoftwareMixer();
	}
86 87
}

88 89 90
inline bool
AudioOutputControl::InternalEnable() noexcept
{
91 92 93 94
	if (really_enabled)
		/* already enabled */
		return true;

95 96 97
	last_error = nullptr;

	try {
98 99 100 101 102
		{
			const ScopeUnlock unlock(mutex);
			output->Enable();
		}

103
		really_enabled = true;
104
		return true;
105 106
	} catch (...) {
		LogError(std::current_exception());
107
		Failure(std::current_exception());
108 109 110 111
		return false;
	}
}

112 113 114
inline void
AudioOutputControl::InternalDisable() noexcept
{
115 116 117
	if (!really_enabled)
		return;

118
	InternalCheckClose(false);
119

120
	really_enabled = false;
121 122

	const ScopeUnlock unlock(mutex);
123 124 125
	output->Disable();
}

126
inline void
127
AudioOutputControl::InternalOpen(const AudioFormat in_audio_format,
128 129
				 const MusicPipe &pipe) noexcept
{
130 131 132 133
	/* enable the device (just in case the last enable has failed) */
	if (!InternalEnable())
		return;

134 135
	last_error = nullptr;
	fail_timer.Reset();
136
	caught_interrupted = false;
137
	skip_delay = true;
138

139 140
	AudioFormat f;

141
	try {
142 143
		try {
			f = source.Open(in_audio_format, pipe,
144 145
					output->prepared_replay_gain_filter.get(),
					output->prepared_other_replay_gain_filter.get(),
146
					*output->prepared_filter);
147
		} catch (...) {
148 149
			std::throw_with_nested(FormatRuntimeError("Failed to open filter for %s",
								  GetLogName()));
150 151 152
		}

		try {
153
			InternalOpen2(f);
154
		} catch (...) {
155
			source.Close();
156 157
			throw;
		}
158 159
	} catch (...) {
		LogError(std::current_exception());
160
		Failure(std::current_exception());
161
		return;
162
	}
163 164

	if (f != in_audio_format || f != output->out_audio_format)
165 166
		FmtDebug(output_domain, "converting in={} -> f={} -> out={}",
			 in_audio_format, f, output->out_audio_format);
167 168
}

169 170 171 172 173 174 175 176 177 178 179
inline void
AudioOutputControl::InternalCloseOutput(bool drain) noexcept
{
	assert(IsOpen());

	open = false;

	const ScopeUnlock unlock(mutex);
	output->CloseOutput(drain);
}

180
inline void
181
AudioOutputControl::InternalClose(bool drain) noexcept
182
{
183
	assert(IsOpen());
184

185
	open = false;
186 187 188 189 190 191

	{
		const ScopeUnlock unlock(mutex);
		output->Close(drain);
	}

192
	source.Close();
193 194
}

195 196 197 198 199 200 201
inline void
AudioOutputControl::InternalCheckClose(bool drain) noexcept
{
	if (IsOpen())
		InternalClose(drain);
}

202 203 204 205 206 207
/**
 * Wait until the output's delay reaches zero.
 *
 * @return true if playback should be continued, false if a command
 * was issued
 */
208
inline bool
209
AudioOutputControl::WaitForDelay(std::unique_lock<Mutex> &lock) noexcept
210 211
{
	while (true) {
212
		const auto delay = output->Delay();
213
		if (delay <= std::chrono::steady_clock::duration::zero())
214 215
			return true;

216
		(void)wake_cond.wait_for(lock, delay);
217

218
		if (command != Command::NONE)
219 220 221 222
			return false;
	}
}

223
bool
224
AudioOutputControl::FillSourceOrClose() noexcept
225
try {
226
	return source.Fill(mutex);
227
} catch (...) {
228 229 230
	FmtError(output_domain,
		 "Failed to filter for {}: {}",
		 GetLogName(), std::current_exception());
231
	InternalCloseError(std::current_exception());
232 233 234
	return false;
}

235
inline bool
236
AudioOutputControl::PlayChunk(std::unique_lock<Mutex> &lock) noexcept
237
{
238 239 240 241 242
	// ensure pending tags are flushed in all cases
	const auto *tag = source.ReadTag();
	if (tags && tag != nullptr) {
		const ScopeUnlock unlock(mutex);
		try {
243
			output->SendTag(*tag);
244 245 246
		} catch (AudioOutputInterrupted) {
			caught_interrupted = true;
			return false;
247
		} catch (...) {
248 249 250
			FmtError(output_domain,
				 "Failed to send tag to {}: {}",
				 GetLogName(), std::current_exception());
251
		}
252 253
	}

254
	while (command == Command::NONE) {
255
		const auto data = source.PeekData();
256
		if (data.empty())
257
			break;
258

259 260
		if (skip_delay)
			skip_delay = false;
261
		else if (!WaitForDelay(lock))
262 263
			break;

264 265
		size_t nbytes;

266
		try {
267
			const ScopeUnlock unlock(mutex);
268
			nbytes = output->Play(data.data, data.size);
269
			assert(nbytes > 0);
270
			assert(nbytes <= data.size);
271 272 273
		} catch (AudioOutputInterrupted) {
			caught_interrupted = true;
			return false;
274
		} catch (...) {
275
			FmtError(output_domain,
276
				 "Failed to play on {}: {}",
277
				 GetLogName(), std::current_exception());
278
			InternalCloseError(std::current_exception());
279
			return false;
280 281
		}

282
		assert(nbytes % output->out_audio_format.GetFrameSize() == 0);
283

284
		source.ConsumeData(nbytes);
285 286 287

		/* there's data to be drained from now on */
		playing = true;
288 289
	}

290 291 292
	return true;
}

293
inline bool
294
AudioOutputControl::InternalPlay(std::unique_lock<Mutex> &lock) noexcept
295
{
296
	if (!FillSourceOrClose())
297 298
		/* no chunk available */
		return false;
299

300 301
	assert(!in_playback_loop);
	in_playback_loop = true;
302

303 304 305 306 307
	AtScopeExit(this) {
		assert(in_playback_loop);
		in_playback_loop = false;
	};

308 309
	unsigned n = 0;

310
	do {
311 312 313
		if (command != Command::NONE)
			return true;

314 315 316 317 318
		if (++n >= 64) {
			/* wake up the player every now and then to
			   give it a chance to refill the pipe before
			   it runs empty */
			const ScopeUnlock unlock(mutex);
319
			client.ChunksConsumed();
320 321 322
			n = 0;
		}

323
		if (!PlayChunk(lock))
324
			break;
325
	} while (FillSourceOrClose());
326

327
	const ScopeUnlock unlock(mutex);
328
	client.ChunksConsumed();
329 330

	return true;
331 332
}

333
inline void
334
AudioOutputControl::InternalPause(std::unique_lock<Mutex> &lock) noexcept
335
{
336 337 338 339 340
	{
		const ScopeUnlock unlock(mutex);
		output->BeginPause();
	}

341 342
	pause = true;

343
	CommandFinished();
344 345

	do {
346
		if (!WaitForDelay(lock))
347 348
			break;

349 350
		bool success = false;
		try {
351 352
			const ScopeUnlock unlock(mutex);
			success = output->IteratePause();
353
		} catch (AudioOutputInterrupted) {
354
		} catch (...) {
355 356 357
			FmtError(output_domain,
				 "Failed to pause {}: {}",
				 GetLogName(), std::current_exception());
358 359 360
		}

		if (!success) {
361
			InternalClose(false);
362
			break;
363
		}
364
	} while (command == Command::NONE);
365

366
	pause = false;
367 368 369 370 371

	{
		const ScopeUnlock unlock(mutex);
		output->EndPause();
	}
372 373

	skip_delay = true;
374 375 376

	/* ignore drain commands until we got something new to play */
	playing = false;
377 378
}

379 380 381 382 383 384 385 386 387 388 389 390 391 392
static void
PlayFull(FilteredAudioOutput &output, ConstBuffer<void> _buffer)
{
	auto buffer = ConstBuffer<uint8_t>::FromVoid(_buffer);

	while (!buffer.empty()) {
		size_t nbytes = output.Play(buffer.data, buffer.size);
		assert(nbytes > 0);

		buffer.skip_front(nbytes);
	}

}

393 394 395
inline void
AudioOutputControl::InternalDrain() noexcept
{
396 397 398 399
	/* after this method finishes, there's nothing left to be
	   drained */
	playing = false;

400 401 402
	try {
		/* flush the filter and play its remaining output */

403 404
		const ScopeUnlock unlock(mutex);

405 406 407 408 409 410 411
		while (true) {
			auto buffer = source.Flush();
			if (buffer.IsNull())
				break;

			PlayFull(*output, buffer);
		}
412 413

		output->Drain();
414
	} catch (...) {
415 416 417
		FmtError(output_domain,
			 "Failed to flush filter on {}: {}",
			 GetLogName(), std::current_exception());
418 419 420
		InternalCloseError(std::current_exception());
		return;
	}
421 422
}

423
void
424
AudioOutputControl::Task() noexcept
425
{
426
	FormatThreadName("output:%s", GetName());
427

428 429
	try {
		SetThreadRealtime();
430
	} catch (...) {
431
		FmtInfo(output_domain,
432
			"OutputThread could not get realtime scheduling, continuing anyway: {}",
433
			std::current_exception());
434
	}
435

436
	SetThreadTimerSlack(std::chrono::microseconds(100));
437

438
	std::unique_lock<Mutex> lock(mutex);
439

440
	while (true) {
441
		switch (command) {
442
		case Command::NONE:
443 444 445
			/* no pending command: play (or wait for a
			   command) */

446 447
			if (open && allow_play && !caught_interrupted &&
			    InternalPlay(lock))
448 449 450 451 452 453
				/* don't wait for an event if there
				   are more chunks in the pipe */
				continue;

			woken_for_play = false;
			wake_cond.wait(lock);
454 455
			break;

456
		case Command::ENABLE:
457
			InternalEnable();
458
			CommandFinished();
459 460
			break;

461
		case Command::DISABLE:
462
			InternalDisable();
463
			CommandFinished();
464 465
			break;

466
		case Command::OPEN:
467
			InternalOpen(request.audio_format, *request.pipe);
468
			CommandFinished();
469 470
			break;

471
		case Command::CLOSE:
472
			InternalCheckClose(false);
473
			CommandFinished();
474 475
			break;

476
		case Command::PAUSE:
477
			if (!open) {
478
				/* the output has failed after
479
				   the PAUSE command was submitted; bail
480
				   out */
481
				CommandFinished();
482 483 484
				break;
			}

485 486
			caught_interrupted = false;

487
			InternalPause(lock);
488
			break;
489

490 491 492
		case Command::RELEASE:
			if (!open) {
				/* the output has failed after
493
				   the RELEASE command was submitted; bail
494 495 496 497 498
				   out */
				CommandFinished();
				break;
			}

499 500
			caught_interrupted = false;

501 502
			if (always_on) {
				/* in "always_on" mode, the output is
503
				   paused instead of being closed;
504
				   however we need to flush the
505 506 507
				   AudioOutputSource because its data
				   have been invalidated by stopping
				   the actual playback */
508
				source.Cancel();
509
				InternalPause(lock);
510 511 512 513 514
			} else {
				InternalClose(false);
				CommandFinished();
			}

515
			break;
516

517
		case Command::DRAIN:
518 519
			if (open)
				InternalDrain();
520

521
			CommandFinished();
522
			break;
523

524
		case Command::CANCEL:
525 526
			caught_interrupted = false;

527
			source.Cancel();
528

529
			if (open) {
530
				playing = false;
531
				const ScopeUnlock unlock(mutex);
532
				output->Cancel();
533 534
			}

535
			CommandFinished();
536
			break;
537

538
		case Command::KILL:
539
			InternalDisable();
540
			source.Cancel();
541
			CommandFinished();
542
			return;
543 544 545 546
		}
	}
}

547
void
548
AudioOutputControl::StartThread()
549
{
550
	assert(command == Command::NONE);
551

552 553
	killed = false;

554
	const ScopeUnlock unlock(mutex);
555
	thread.Start();
556
}