PulseOutputPlugin.cxx 20.3 KB
Newer Older
1
/*
Max Kellermann's avatar
Max Kellermann committed
2
 * Copyright (C) 2003-2014 The Music Player Daemon Project
3
 * http://www.musicpd.org
4 5 6 7 8 9 10 11 12 13
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
14 15 16 17
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 19
 */

20
#include "config.h"
21
#include "PulseOutputPlugin.hxx"
22
#include "../OutputAPI.hxx"
Max Kellermann's avatar
Max Kellermann committed
23 24
#include "mixer/MixerList.hxx"
#include "mixer/plugins/PulseMixerPlugin.hxx"
25 26
#include "util/Error.hxx"
#include "util/Domain.hxx"
27
#include "Log.hxx"
28

29 30 31
#include <pulse/thread-mainloop.h>
#include <pulse/context.h>
#include <pulse/stream.h>
32 33
#include <pulse/introspect.h>
#include <pulse/subscribe.h>
34
#include <pulse/error.h>
35
#include <pulse/version.h>
36

37
#include <assert.h>
38
#include <stddef.h>
39
#include <stdlib.h>
40

41
#define MPD_PULSE_NAME "Music Player Daemon"
42

43
struct PulseOutput {
44
	AudioOutput base;
45

46 47 48 49
	const char *name;
	const char *server;
	const char *sink;

50
	PulseMixer *mixer;
51 52 53 54 55 56

	struct pa_threaded_mainloop *mainloop;
	struct pa_context *context;
	struct pa_stream *stream;

	size_t writable;
57 58 59

	PulseOutput()
		:base(pulse_output_plugin) {}
60 61
};

62 63 64 65
static constexpr Domain pulse_output_domain("pulse_output");

static void
SetError(Error &error, pa_context *context, const char *msg)
66
{
67 68
	const int e = pa_context_errno(context);
	error.Format(pulse_output_domain, e, "%s: %s", msg, pa_strerror(e));
69 70
}

71
void
72
pulse_output_lock(PulseOutput &po)
73
{
74
	pa_threaded_mainloop_lock(po.mainloop);
75 76 77
}

void
78
pulse_output_unlock(PulseOutput &po)
79
{
80
	pa_threaded_mainloop_unlock(po.mainloop);
81 82
}

83
void
84
pulse_output_set_mixer(PulseOutput &po, PulseMixer &pm)
85
{
86
	assert(po.mixer == nullptr);
87

88
	po.mixer = &pm;
89

90
	if (po.mainloop == nullptr)
91 92
		return;

93
	pa_threaded_mainloop_lock(po.mainloop);
94

95 96 97
	if (po.context != nullptr &&
	    pa_context_get_state(po.context) == PA_CONTEXT_READY) {
		pulse_mixer_on_connect(pm, po.context);
98

99 100 101
		if (po.stream != nullptr &&
		    pa_stream_get_state(po.stream) == PA_STREAM_READY)
			pulse_mixer_on_change(pm, po.context, po.stream);
102 103
	}

104
	pa_threaded_mainloop_unlock(po.mainloop);
105 106 107
}

void
108
pulse_output_clear_mixer(PulseOutput &po, gcc_unused PulseMixer &pm)
109
{
110
	assert(po.mixer == &pm);
111

112
	po.mixer = nullptr;
113 114 115
}

bool
116
pulse_output_set_volume(PulseOutput &po, const pa_cvolume *volume,
117
			Error &error)
118 119 120
{
	pa_operation *o;

121 122
	if (po.context == nullptr || po.stream == nullptr ||
	    pa_stream_get_state(po.stream) != PA_STREAM_READY) {
123
		error.Set(pulse_output_domain, "disconnected");
124 125 126
		return false;
	}

127 128
	o = pa_context_set_sink_input_volume(po.context,
					     pa_stream_get_index(po.stream),
129 130
					     volume, nullptr, nullptr);
	if (o == nullptr) {
131
		SetError(error, po.context,
132
			 "failed to set PulseAudio volume");
133 134 135 136 137 138 139
		return false;
	}

	pa_operation_unref(o);
	return true;
}

140 141 142 143 144 145 146 147 148 149 150
/**
 * \brief waits for a pulseaudio operation to finish, frees it and
 *     unlocks the mainloop
 * \param operation the operation to wait for
 * \return true if operation has finished normally (DONE state),
 *     false otherwise
 */
static bool
pulse_wait_for_operation(struct pa_threaded_mainloop *mainloop,
			 struct pa_operation *operation)
{
151 152
	assert(mainloop != nullptr);
	assert(operation != nullptr);
153

154 155 156
	pa_operation_state_t state;
	while ((state = pa_operation_get_state(operation))
	       == PA_OPERATION_RUNNING)
157 158 159 160 161 162 163 164 165 166 167 168
		pa_threaded_mainloop_wait(mainloop);

	pa_operation_unref(operation);

	return state == PA_OPERATION_DONE;
}

/**
 * Callback function for stream operation.  It just sends a signal to
 * the caller thread, to wake pulse_wait_for_operation() up.
 */
static void
169 170
pulse_output_stream_success_cb(gcc_unused pa_stream *s,
			       gcc_unused int success, void *userdata)
171
{
172
	PulseOutput *po = (PulseOutput *)userdata;
173 174 175 176

	pa_threaded_mainloop_signal(po->mainloop, 0);
}

177
static void
178 179
pulse_output_context_state_cb(struct pa_context *context, void *userdata)
{
180
	PulseOutput *po = (PulseOutput *)userdata;
181 182 183

	switch (pa_context_get_state(context)) {
	case PA_CONTEXT_READY:
184
		if (po->mixer != nullptr)
185
			pulse_mixer_on_connect(*po->mixer, context);
186 187 188 189

		pa_threaded_mainloop_signal(po->mainloop, 0);
		break;

190 191
	case PA_CONTEXT_TERMINATED:
	case PA_CONTEXT_FAILED:
192
		if (po->mixer != nullptr)
193
			pulse_mixer_on_disconnect(*po->mixer);
194

195 196 197 198 199 200 201 202 203 204 205 206 207
		/* the caller thread might be waiting for these
		   states */
		pa_threaded_mainloop_signal(po->mainloop, 0);
		break;

	case PA_CONTEXT_UNCONNECTED:
	case PA_CONTEXT_CONNECTING:
	case PA_CONTEXT_AUTHORIZING:
	case PA_CONTEXT_SETTING_NAME:
		break;
	}
}

208 209 210 211 212
static void
pulse_output_subscribe_cb(pa_context *context,
			  pa_subscription_event_type_t t,
			  uint32_t idx, void *userdata)
{
213 214 215 216 217
	PulseOutput *po = (PulseOutput *)userdata;
	pa_subscription_event_type_t facility =
		pa_subscription_event_type_t(t & PA_SUBSCRIPTION_EVENT_FACILITY_MASK);
	pa_subscription_event_type_t type =
		pa_subscription_event_type_t(t & PA_SUBSCRIPTION_EVENT_TYPE_MASK);
218

219
	if (po->mixer != nullptr &&
220
	    facility == PA_SUBSCRIPTION_EVENT_SINK_INPUT &&
221
	    po->stream != nullptr &&
222 223 224 225
	    pa_stream_get_state(po->stream) == PA_STREAM_READY &&
	    idx == pa_stream_get_index(po->stream) &&
	    (type == PA_SUBSCRIPTION_EVENT_NEW ||
	     type == PA_SUBSCRIPTION_EVENT_CHANGE))
226
		pulse_mixer_on_change(*po->mixer, context, po->stream);
227 228
}

229 230 231 232 233 234
/**
 * Attempt to connect asynchronously to the PulseAudio server.
 *
 * @return true on success, false on error
 */
static bool
235
pulse_output_connect(PulseOutput *po, Error &error)
236
{
237 238
	assert(po != nullptr);
	assert(po->context != nullptr);
239

240 241 242 243
	if (pa_context_connect(po->context, po->server,
			       (pa_context_flags_t)0, nullptr) < 0) {
		SetError(error, po->context,
			 "pa_context_connect() has failed");
244 245
		return false;
	}
246

247 248
	return true;
}
249

250 251 252 253
/**
 * Frees and clears the stream.
 */
static void
254
pulse_output_delete_stream(PulseOutput *po)
255
{
256 257
	assert(po != nullptr);
	assert(po->stream != nullptr);
258

259
	pa_stream_set_suspended_callback(po->stream, nullptr, nullptr);
260

261 262
	pa_stream_set_state_callback(po->stream, nullptr, nullptr);
	pa_stream_set_write_callback(po->stream, nullptr, nullptr);
263

264 265
	pa_stream_disconnect(po->stream);
	pa_stream_unref(po->stream);
266
	po->stream = nullptr;
267 268
}

269 270
/**
 * Frees and clears the context.
271 272
 *
 * Caller must lock the main loop.
273 274
 */
static void
275
pulse_output_delete_context(PulseOutput *po)
276
{
277 278
	assert(po != nullptr);
	assert(po->context != nullptr);
279

280 281
	pa_context_set_state_callback(po->context, nullptr, nullptr);
	pa_context_set_subscribe_callback(po->context, nullptr, nullptr);
282

283 284
	pa_context_disconnect(po->context);
	pa_context_unref(po->context);
285
	po->context = nullptr;
286 287
}

288 289 290
/**
 * Create, set up and connect a context.
 *
291 292
 * Caller must lock the main loop.
 *
293 294 295
 * @return true on success, false on error
 */
static bool
296
pulse_output_setup_context(PulseOutput *po, Error &error)
297
{
298 299
	assert(po != nullptr);
	assert(po->mainloop != nullptr);
300

301 302
	po->context = pa_context_new(pa_threaded_mainloop_get_api(po->mainloop),
				     MPD_PULSE_NAME);
303
	if (po->context == nullptr) {
304
		error.Set(pulse_output_domain, "pa_context_new() has failed");
305 306 307 308 309
		return false;
	}

	pa_context_set_state_callback(po->context,
				      pulse_output_context_state_cb, po);
310 311
	pa_context_set_subscribe_callback(po->context,
					  pulse_output_subscribe_cb, po);
312

313
	if (!pulse_output_connect(po, error)) {
314
		pulse_output_delete_context(po);
315 316 317 318
		return false;
	}

	return true;
319 320
}

321
static AudioOutput *
322
pulse_output_init(const config_param &param, Error &error)
323
{
324
	PulseOutput *po;
325

326
	setenv("PULSE_PROP_media.role", "music", true);
327
	setenv("PULSE_PROP_application.icon_name", "mpd", true);
328

329
	po = new PulseOutput();
330
	if (!po->base.Configure(param, error)) {
331 332
		delete po;
		return nullptr;
333 334
	}

335 336 337
	po->name = param.GetBlockValue("name", "mpd_pulse");
	po->server = param.GetBlockValue("server");
	po->sink = param.GetBlockValue("sink");
338

339 340 341 342
	po->mixer = nullptr;
	po->mainloop = nullptr;
	po->context = nullptr;
	po->stream = nullptr;
343

344
	return &po->base;
345 346 347
}

static void
348
pulse_output_finish(AudioOutput *ao)
349
{
350
	PulseOutput *po = (PulseOutput *)ao;
351

352
	delete po;
353 354 355
}

static bool
356
pulse_output_enable(AudioOutput *ao, Error &error)
357
{
358
	PulseOutput *po = (PulseOutput *)ao;
359

360 361
	assert(po->mainloop == nullptr);
	assert(po->context == nullptr);
362

363 364 365
	/* create the libpulse mainloop and start the thread */

	po->mainloop = pa_threaded_mainloop_new();
366
	if (po->mainloop == nullptr) {
367 368
		error.Set(pulse_output_domain,
			  "pa_threaded_mainloop_new() has failed");
369
		return false;
370 371 372 373 374 375 376
	}

	pa_threaded_mainloop_lock(po->mainloop);

	if (pa_threaded_mainloop_start(po->mainloop) < 0) {
		pa_threaded_mainloop_unlock(po->mainloop);
		pa_threaded_mainloop_free(po->mainloop);
377
		po->mainloop = nullptr;
378

379 380
		error.Set(pulse_output_domain,
			  "pa_threaded_mainloop_start() has failed");
381 382 383 384 385
		return false;
	}

	/* create the libpulse context and connect it */

386
	if (!pulse_output_setup_context(po, error)) {
387 388 389
		pa_threaded_mainloop_unlock(po->mainloop);
		pa_threaded_mainloop_stop(po->mainloop);
		pa_threaded_mainloop_free(po->mainloop);
390
		po->mainloop = nullptr;
391
		return false;
392 393 394 395
	}

	pa_threaded_mainloop_unlock(po->mainloop);

396
	return true;
397 398
}

399
static void
400
pulse_output_disable(AudioOutput *ao)
401
{
402
	PulseOutput *po = (PulseOutput *)ao;
403

404
	assert(po->mainloop != nullptr);
405

406
	pa_threaded_mainloop_stop(po->mainloop);
407
	if (po->context != nullptr)
408 409
		pulse_output_delete_context(po);
	pa_threaded_mainloop_free(po->mainloop);
410
	po->mainloop = nullptr;
411 412
}

413 414 415 416
/**
 * Check if the context is (already) connected, and waits if not.  If
 * the context has been disconnected, retry to connect.
 *
417 418
 * Caller must lock the main loop.
 *
419 420 421
 * @return true on success, false on error
 */
static bool
422
pulse_output_wait_connection(PulseOutput *po, Error &error)
423
{
424
	assert(po->mainloop != nullptr);
425

426
	pa_context_state_t state;
427

428
	if (po->context == nullptr && !pulse_output_setup_context(po, error))
429
		return false;
430 431 432 433 434 435 436 437 438 439 440 441

	while (true) {
		state = pa_context_get_state(po->context);
		switch (state) {
		case PA_CONTEXT_READY:
			/* nothing to do */
			return true;

		case PA_CONTEXT_UNCONNECTED:
		case PA_CONTEXT_TERMINATED:
		case PA_CONTEXT_FAILED:
			/* failure */
442
			SetError(error, po->context, "failed to connect");
443 444 445 446 447 448 449 450 451 452
			pulse_output_delete_context(po);
			return false;

		case PA_CONTEXT_CONNECTING:
		case PA_CONTEXT_AUTHORIZING:
		case PA_CONTEXT_SETTING_NAME:
			/* wait some more */
			pa_threaded_mainloop_wait(po->mainloop);
			break;
		}
453
	}
454
}
455

456
static void
457
pulse_output_stream_suspended_cb(gcc_unused pa_stream *stream, void *userdata)
458
{
459
	PulseOutput *po = (PulseOutput *)userdata;
460

461 462
	assert(stream == po->stream || po->stream == nullptr);
	assert(po->mainloop != nullptr);
463 464 465 466 467 468

	/* wake up the main loop to break out of the loop in
	   pulse_output_play() */
	pa_threaded_mainloop_signal(po->mainloop, 0);
}

469 470 471
static void
pulse_output_stream_state_cb(pa_stream *stream, void *userdata)
{
472
	PulseOutput *po = (PulseOutput *)userdata;
473

474 475 476
	assert(stream == po->stream || po->stream == nullptr);
	assert(po->mainloop != nullptr);
	assert(po->context != nullptr);
477

478 479
	switch (pa_stream_get_state(stream)) {
	case PA_STREAM_READY:
480
		if (po->mixer != nullptr)
481
			pulse_mixer_on_change(*po->mixer, po->context, stream);
482 483 484 485

		pa_threaded_mainloop_signal(po->mainloop, 0);
		break;

486 487
	case PA_STREAM_FAILED:
	case PA_STREAM_TERMINATED:
488
		if (po->mixer != nullptr)
489
			pulse_mixer_on_disconnect(*po->mixer);
490

491 492 493 494 495 496 497 498
		pa_threaded_mainloop_signal(po->mainloop, 0);
		break;

	case PA_STREAM_UNCONNECTED:
	case PA_STREAM_CREATING:
		break;
	}
}
499

500
static void
501
pulse_output_stream_write_cb(gcc_unused pa_stream *stream, size_t nbytes,
502 503
			     void *userdata)
{
504
	PulseOutput *po = (PulseOutput *)userdata;
505

506
	assert(po->mainloop != nullptr);
507

508 509
	po->writable = nbytes;
	pa_threaded_mainloop_signal(po->mainloop, 0);
510 511
}

512 513 514 515 516 517 518 519
/**
 * Create, set up and connect a context.
 *
 * Caller must lock the main loop.
 *
 * @return true on success, false on error
 */
static bool
520
pulse_output_setup_stream(PulseOutput *po, const pa_sample_spec *ss,
521
			  Error &error)
522
{
523 524
	assert(po != nullptr);
	assert(po->context != nullptr);
525

526 527 528 529 530
	/* WAVE-EX is been adopted as the speaker map for most media files */
	pa_channel_map chan_map;
	pa_channel_map_init_auto(&chan_map, ss->channels,
				 PA_CHANNEL_MAP_WAVEEX);
	po->stream = pa_stream_new(po->context, po->name, ss, &chan_map);
531
	if (po->stream == nullptr) {
532
		SetError(error, po->context, "pa_stream_new() has failed");
533 534 535 536 537 538 539 540 541 542 543 544 545 546
		return false;
	}

	pa_stream_set_suspended_callback(po->stream,
					 pulse_output_stream_suspended_cb, po);

	pa_stream_set_state_callback(po->stream,
				     pulse_output_stream_state_cb, po);
	pa_stream_set_write_callback(po->stream,
				     pulse_output_stream_write_cb, po);

	return true;
}

547
static bool
548
pulse_output_open(AudioOutput *ao, AudioFormat &audio_format,
549
		  Error &error)
550
{
551
	PulseOutput *po = (PulseOutput *)ao;
552 553
	pa_sample_spec ss;

554
	assert(po->mainloop != nullptr);
555

556 557
	pa_threaded_mainloop_lock(po->mainloop);

558
	if (po->context != nullptr) {
559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576
		switch (pa_context_get_state(po->context)) {
		case PA_CONTEXT_UNCONNECTED:
		case PA_CONTEXT_TERMINATED:
		case PA_CONTEXT_FAILED:
			/* the connection was closed meanwhile; delete
			   it, and pulse_output_wait_connection() will
			   reopen it */
			pulse_output_delete_context(po);
			break;

		case PA_CONTEXT_READY:
		case PA_CONTEXT_CONNECTING:
		case PA_CONTEXT_AUTHORIZING:
		case PA_CONTEXT_SETTING_NAME:
			break;
		}
	}

577
	if (!pulse_output_wait_connection(po, error)) {
578
		pa_threaded_mainloop_unlock(po->mainloop);
579
		return false;
580
	}
581

582 583
	/* MPD doesn't support the other pulseaudio sample formats, so
	   we just force MPD to send us everything as 16 bit */
584
	audio_format.format = SampleFormat::S16;
585 586

	ss.format = PA_SAMPLE_S16NE;
587 588
	ss.rate = audio_format.sample_rate;
	ss.channels = audio_format.channels;
589

590 591
	/* create a stream .. */

592
	if (!pulse_output_setup_stream(po, &ss, error)) {
593 594 595 596 597 598
		pa_threaded_mainloop_unlock(po->mainloop);
		return false;
	}

	/* .. and connect it (asynchronously) */

599 600 601
	if (pa_stream_connect_playback(po->stream, po->sink,
				       nullptr, pa_stream_flags_t(0),
				       nullptr, nullptr) < 0) {
602
		pulse_output_delete_stream(po);
603

604 605
		SetError(error, po->context,
			 "pa_stream_connect_playback() has failed");
606
		pa_threaded_mainloop_unlock(po->mainloop);
607
		return false;
608 609
	}

610 611
	pa_threaded_mainloop_unlock(po->mainloop);

612
	return true;
613 614
}

615
static void
616
pulse_output_close(AudioOutput *ao)
617
{
618
	PulseOutput *po = (PulseOutput *)ao;
619 620
	pa_operation *o;

621
	assert(po->mainloop != nullptr);
622

623 624 625 626 627
	pa_threaded_mainloop_lock(po->mainloop);

	if (pa_stream_get_state(po->stream) == PA_STREAM_READY) {
		o = pa_stream_drain(po->stream,
				    pulse_output_stream_success_cb, po);
628
		if (o == nullptr) {
629 630 631
			FormatWarning(pulse_output_domain,
				      "pa_stream_drain() has failed: %s",
				      pa_strerror(pa_context_errno(po->context)));
632 633 634 635
		} else
			pulse_wait_for_operation(po->mainloop, o);
	}

636
	pulse_output_delete_stream(po);
637

638
	if (po->context != nullptr &&
639 640 641 642 643 644 645 646 647 648 649 650 651
	    pa_context_get_state(po->context) != PA_CONTEXT_READY)
		pulse_output_delete_context(po);

	pa_threaded_mainloop_unlock(po->mainloop);
}

/**
 * Check if the stream is (already) connected, and waits if not.  The
 * mainloop must be locked before calling this function.
 *
 * @return true on success, false on error
 */
static bool
652
pulse_output_wait_stream(PulseOutput *po, Error &error)
653
{
654 655 656 657
	while (true) {
		switch (pa_stream_get_state(po->stream)) {
		case PA_STREAM_READY:
			return true;
658

659 660 661
		case PA_STREAM_FAILED:
		case PA_STREAM_TERMINATED:
		case PA_STREAM_UNCONNECTED:
662 663
			SetError(error, po->context,
				 "failed to connect the stream");
664
			return false;
665

666 667 668 669
		case PA_STREAM_CREATING:
			pa_threaded_mainloop_wait(po->mainloop);
			break;
		}
670
	}
671 672
}

673 674 675 676
/**
 * Sets cork mode on the stream.
 */
static bool
677
pulse_output_stream_pause(PulseOutput *po, bool pause,
678
			  Error &error)
679 680 681
{
	pa_operation *o;

682 683 684
	assert(po->mainloop != nullptr);
	assert(po->context != nullptr);
	assert(po->stream != nullptr);
685 686 687

	o = pa_stream_cork(po->stream, pause,
			   pulse_output_stream_success_cb, po);
688
	if (o == nullptr) {
689
		SetError(error, po->context, "pa_stream_cork() has failed");
690 691 692 693
		return false;
	}

	if (!pulse_wait_for_operation(po->mainloop, o)) {
694
		SetError(error, po->context, "pa_stream_cork() has failed");
695 696 697 698
		return false;
	}

	return true;
699 700
}

701
static unsigned
702
pulse_output_delay(AudioOutput *ao)
703
{
704
	PulseOutput *po = (PulseOutput *)ao;
705 706 707 708
	unsigned result = 0;

	pa_threaded_mainloop_lock(po->mainloop);

709
	if (po->base.pause && pa_stream_is_corked(po->stream) &&
710 711 712 713 714 715 716 717 718
	    pa_stream_get_state(po->stream) == PA_STREAM_READY)
		/* idle while paused */
		result = 1000;

	pa_threaded_mainloop_unlock(po->mainloop);

	return result;
}

719
static size_t
720
pulse_output_play(AudioOutput *ao, const void *chunk, size_t size,
721
		  Error &error)
722
{
723
	PulseOutput *po = (PulseOutput *)ao;
724

725 726
	assert(po->mainloop != nullptr);
	assert(po->stream != nullptr);
727 728 729 730 731

	pa_threaded_mainloop_lock(po->mainloop);

	/* check if the stream is (already) connected */

732
	if (!pulse_output_wait_stream(po, error)) {
733 734 735 736
		pa_threaded_mainloop_unlock(po->mainloop);
		return 0;
	}

737
	assert(po->context != nullptr);
738 739 740

	/* unpause if previously paused */

741
	if (pa_stream_is_corked(po->stream) &&
742
	    !pulse_output_stream_pause(po, false, error)) {
743
		pa_threaded_mainloop_unlock(po->mainloop);
744
		return 0;
745
	}
746 747 748 749

	/* wait until the server allows us to write */

	while (po->writable == 0) {
750 751
		if (pa_stream_is_suspended(po->stream)) {
			pa_threaded_mainloop_unlock(po->mainloop);
752
			error.Set(pulse_output_domain, "suspended");
753 754 755
			return 0;
		}

756 757 758 759
		pa_threaded_mainloop_wait(po->mainloop);

		if (pa_stream_get_state(po->stream) != PA_STREAM_READY) {
			pa_threaded_mainloop_unlock(po->mainloop);
760
			error.Set(pulse_output_domain, "disconnected");
761
			return 0;
762 763 764 765 766 767 768 769 770 771 772
		}
	}

	/* now write */

	if (size > po->writable)
		/* don't send more than possible */
		size = po->writable;

	po->writable -= size;

773 774
	int result = pa_stream_write(po->stream, chunk, size, nullptr,
				     0, PA_SEEK_RELATIVE);
775
	pa_threaded_mainloop_unlock(po->mainloop);
776 777
	if (result < 0) {
		SetError(error, po->context, "pa_stream_write() failed");
778
		return 0;
779 780
	}

781
	return size;
782 783
}

784
static void
785
pulse_output_cancel(AudioOutput *ao)
786
{
787
	PulseOutput *po = (PulseOutput *)ao;
788 789
	pa_operation *o;

790 791
	assert(po->mainloop != nullptr);
	assert(po->stream != nullptr);
792 793 794 795 796 797 798 799 800 801

	pa_threaded_mainloop_lock(po->mainloop);

	if (pa_stream_get_state(po->stream) != PA_STREAM_READY) {
		/* no need to flush when the stream isn't connected
		   yet */
		pa_threaded_mainloop_unlock(po->mainloop);
		return;
	}

802
	assert(po->context != nullptr);
803 804

	o = pa_stream_flush(po->stream, pulse_output_stream_success_cb, po);
805
	if (o == nullptr) {
806 807 808
		FormatWarning(pulse_output_domain,
			      "pa_stream_flush() has failed: %s",
			      pa_strerror(pa_context_errno(po->context)));
809 810 811 812 813 814 815 816 817
		pa_threaded_mainloop_unlock(po->mainloop);
		return;
	}

	pulse_wait_for_operation(po->mainloop, o);
	pa_threaded_mainloop_unlock(po->mainloop);
}

static bool
818
pulse_output_pause(AudioOutput *ao)
819
{
820
	PulseOutput *po = (PulseOutput *)ao;
821

822 823
	assert(po->mainloop != nullptr);
	assert(po->stream != nullptr);
824 825 826 827 828

	pa_threaded_mainloop_lock(po->mainloop);

	/* check if the stream is (already/still) connected */

829 830
	Error error;
	if (!pulse_output_wait_stream(po, error)) {
831
		pa_threaded_mainloop_unlock(po->mainloop);
832
		LogError(error);
833 834 835
		return false;
	}

836
	assert(po->context != nullptr);
837 838 839

	/* cork the stream */

840
	if (!pa_stream_is_corked(po->stream) &&
841
	    !pulse_output_stream_pause(po, true, error)) {
842
		pa_threaded_mainloop_unlock(po->mainloop);
843
		LogError(error);
844 845 846 847 848 849 850 851 852 853 854 855 856
		return false;
	}

	pa_threaded_mainloop_unlock(po->mainloop);

	return true;
}

static bool
pulse_output_test_default_device(void)
{
	bool success;

857
	const config_param empty;
858 859
	PulseOutput *po = (PulseOutput *)
		pulse_output_init(empty, IgnoreError());
860
	if (po == nullptr)
861 862
		return false;

863
	success = pulse_output_wait_connection(po, IgnoreError());
864
	pulse_output_finish(&po->base);
865 866 867 868

	return success;
}

869
const struct AudioOutputPlugin pulse_output_plugin = {
870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885
	"pulse",
	pulse_output_test_default_device,
	pulse_output_init,
	pulse_output_finish,
	pulse_output_enable,
	pulse_output_disable,
	pulse_output_open,
	pulse_output_close,
	pulse_output_delay,
	nullptr,
	pulse_output_play,
	nullptr,
	pulse_output_cancel,
	pulse_output_pause,

	&pulse_mixer_plugin,
886
};