PulseOutputPlugin.cxx 20.3 KB
Newer Older
1
/*
Max Kellermann's avatar
Max Kellermann committed
2
 * Copyright (C) 2003-2015 The Music Player Daemon Project
3
 * http://www.musicpd.org
4 5 6 7 8 9 10 11 12 13
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
14 15 16 17
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 19
 */

20
#include "config.h"
21
#include "PulseOutputPlugin.hxx"
22
#include "lib/pulse/Domain.hxx"
23
#include "lib/pulse/Error.hxx"
24
#include "lib/pulse/LogError.hxx"
25
#include "../OutputAPI.hxx"
26
#include "../Wrapper.hxx"
Max Kellermann's avatar
Max Kellermann committed
27 28
#include "mixer/MixerList.hxx"
#include "mixer/plugins/PulseMixerPlugin.hxx"
29
#include "util/Error.hxx"
30
#include "Log.hxx"
31

32 33 34
#include <pulse/thread-mainloop.h>
#include <pulse/context.h>
#include <pulse/stream.h>
35 36
#include <pulse/introspect.h>
#include <pulse/subscribe.h>
37
#include <pulse/version.h>
38

39
#include <assert.h>
40
#include <stddef.h>
41
#include <stdlib.h>
42

43
#define MPD_PULSE_NAME "Music Player Daemon"
44

45 46 47
class PulseOutput {
	friend struct AudioOutputWrapper<PulseOutput>;

48
	AudioOutput base;
49

50 51 52 53
	const char *name;
	const char *server;
	const char *sink;

54
	PulseMixer *mixer;
55 56 57 58 59 60

	struct pa_threaded_mainloop *mainloop;
	struct pa_context *context;
	struct pa_stream *stream;

	size_t writable;
61 62

	PulseOutput()
63 64 65
		:base(pulse_output_plugin),
		 mixer(nullptr),
		 mainloop(nullptr), stream(nullptr) {}
66

67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
public:
	void SetMixer(PulseMixer &_mixer);

	void ClearMixer(gcc_unused PulseMixer &old_mixer) {
		assert(mixer == &old_mixer);

		mixer = nullptr;
	}

	bool SetVolume(const pa_cvolume &volume, Error &error);

	void Lock() {
		pa_threaded_mainloop_lock(mainloop);
	}

	void Unlock() {
		pa_threaded_mainloop_unlock(mainloop);
	}

	void OnContextStateChanged(pa_context_state_t new_state);
	void OnServerLayoutChanged(pa_subscription_event_type_t t,
				   uint32_t idx);
	void OnStreamSuspended(pa_stream *_stream);
	void OnStreamStateChanged(pa_stream *_stream,
				  pa_stream_state_t new_state);
	void OnStreamWrite(size_t nbytes);

	void OnStreamSuccess() {
95
		Signal();
96 97
	}

98 99 100
	gcc_const
	static bool TestDefaultDevice();

101 102
	bool Configure(const ConfigBlock &block, Error &error);
	static PulseOutput *Create(const ConfigBlock &block, Error &error);
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138

	bool Enable(Error &error);
	void Disable();

	bool Open(AudioFormat &audio_format, Error &error);
	void Close();

	unsigned Delay();
	size_t Play(const void *chunk, size_t size, Error &error);
	void Cancel();
	bool Pause();

private:
	/**
	 * Attempt to connect asynchronously to the PulseAudio server.
	 *
	 * @return true on success, false on error
	 */
	bool Connect(Error &error);

	/**
	 * Create, set up and connect a context.
	 *
	 * Caller must lock the main loop.
	 *
	 * @return true on success, false on error
	 */
	bool SetupContext(Error &error);

	/**
	 * Frees and clears the context.
	 *
	 * Caller must lock the main loop.
	 */
	void DeleteContext();

139 140 141 142
	void Signal() {
		pa_threaded_mainloop_signal(mainloop, 0);
	}

143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
	/**
	 * Check if the context is (already) connected, and waits if
	 * not.  If the context has been disconnected, retry to
	 * connect.
	 *
	 * Caller must lock the main loop.
	 *
	 * @return true on success, false on error
	 */
	bool WaitConnection(Error &error);

	/**
	 * Create, set up and connect a context.
	 *
	 * Caller must lock the main loop.
	 *
	 * @return true on success, false on error
	 */
	bool SetupStream(const pa_sample_spec &ss, Error &error);

	/**
	 * Frees and clears the stream.
	 */
	void DeleteStream();

	/**
	 * Check if the stream is (already) connected, and waits if
	 * not.  The mainloop must be locked before calling this
	 * function.
	 *
	 * @return true on success, false on error
	 */
	bool WaitStream(Error &error);

	/**
	 * Sets cork mode on the stream.
	 */
	bool StreamPause(bool pause, Error &error);
181 182 183
};

void
184
pulse_output_lock(PulseOutput &po)
185
{
186
	po.Lock();
187 188 189
}

void
190
pulse_output_unlock(PulseOutput &po)
191
{
192
	po.Unlock();
193 194
}

195 196
inline void
PulseOutput::SetMixer(PulseMixer &_mixer)
197
{
198
	assert(mixer == nullptr);
199

200
	mixer = &_mixer;
201

202
	if (mainloop == nullptr)
203 204
		return;

205
	pa_threaded_mainloop_lock(mainloop);
206

207 208 209
	if (context != nullptr &&
	    pa_context_get_state(context) == PA_CONTEXT_READY) {
		pulse_mixer_on_connect(_mixer, context);
210

211 212 213
		if (stream != nullptr &&
		    pa_stream_get_state(stream) == PA_STREAM_READY)
			pulse_mixer_on_change(_mixer, context, stream);
214 215
	}

216
	pa_threaded_mainloop_unlock(mainloop);
217 218 219
}

void
220
pulse_output_set_mixer(PulseOutput &po, PulseMixer &pm)
221
{
222
	po.SetMixer(pm);
223 224
}

225 226
void
pulse_output_clear_mixer(PulseOutput &po, PulseMixer &pm)
227
{
228 229
	po.ClearMixer(pm);
}
230

231 232 233 234 235
inline bool
PulseOutput::SetVolume(const pa_cvolume &volume, Error &error)
{
	if (context == nullptr || stream == nullptr ||
	    pa_stream_get_state(stream) != PA_STREAM_READY) {
236
		error.Set(pulse_domain, "disconnected");
237 238 239
		return false;
	}

240 241 242 243
	pa_operation *o =
		pa_context_set_sink_input_volume(context,
						 pa_stream_get_index(stream),
						 &volume, nullptr, nullptr);
244
	if (o == nullptr) {
245
		SetPulseError(error, context,
246
			      "failed to set PulseAudio volume");
247 248 249 250 251 252 253
		return false;
	}

	pa_operation_unref(o);
	return true;
}

254 255 256 257 258 259 260
bool
pulse_output_set_volume(PulseOutput &po, const pa_cvolume *volume,
			Error &error)
{
	return po.SetVolume(*volume, error);
}

261 262 263 264 265 266 267 268 269 270 271
/**
 * \brief waits for a pulseaudio operation to finish, frees it and
 *     unlocks the mainloop
 * \param operation the operation to wait for
 * \return true if operation has finished normally (DONE state),
 *     false otherwise
 */
static bool
pulse_wait_for_operation(struct pa_threaded_mainloop *mainloop,
			 struct pa_operation *operation)
{
272 273
	assert(mainloop != nullptr);
	assert(operation != nullptr);
274

275 276 277
	pa_operation_state_t state;
	while ((state = pa_operation_get_state(operation))
	       == PA_OPERATION_RUNNING)
278 279 280 281 282 283 284 285 286 287 288 289
		pa_threaded_mainloop_wait(mainloop);

	pa_operation_unref(operation);

	return state == PA_OPERATION_DONE;
}

/**
 * Callback function for stream operation.  It just sends a signal to
 * the caller thread, to wake pulse_wait_for_operation() up.
 */
static void
290 291
pulse_output_stream_success_cb(gcc_unused pa_stream *s,
			       gcc_unused int success, void *userdata)
292
{
293
	PulseOutput &po = *(PulseOutput *)userdata;
294

295
	po.OnStreamSuccess();
296 297
}

298 299
inline void
PulseOutput::OnContextStateChanged(pa_context_state_t new_state)
300
{
301
	switch (new_state) {
302
	case PA_CONTEXT_READY:
303 304
		if (mixer != nullptr)
			pulse_mixer_on_connect(*mixer, context);
305

306
		Signal();
307 308
		break;

309 310
	case PA_CONTEXT_TERMINATED:
	case PA_CONTEXT_FAILED:
311 312
		if (mixer != nullptr)
			pulse_mixer_on_disconnect(*mixer);
313

314 315
		/* the caller thread might be waiting for these
		   states */
316
		Signal();
317 318 319 320 321 322 323 324 325 326
		break;

	case PA_CONTEXT_UNCONNECTED:
	case PA_CONTEXT_CONNECTING:
	case PA_CONTEXT_AUTHORIZING:
	case PA_CONTEXT_SETTING_NAME:
		break;
	}
}

327
static void
328 329 330 331 332 333 334 335 336 337
pulse_output_context_state_cb(struct pa_context *context, void *userdata)
{
	PulseOutput &po = *(PulseOutput *)userdata;

	po.OnContextStateChanged(pa_context_get_state(context));
}

inline void
PulseOutput::OnServerLayoutChanged(pa_subscription_event_type_t t,
				   uint32_t idx)
338
{
339 340 341 342
	pa_subscription_event_type_t facility =
		pa_subscription_event_type_t(t & PA_SUBSCRIPTION_EVENT_FACILITY_MASK);
	pa_subscription_event_type_t type =
		pa_subscription_event_type_t(t & PA_SUBSCRIPTION_EVENT_TYPE_MASK);
343

344
	if (mixer != nullptr &&
345
	    facility == PA_SUBSCRIPTION_EVENT_SINK_INPUT &&
346 347 348
	    stream != nullptr &&
	    pa_stream_get_state(stream) == PA_STREAM_READY &&
	    idx == pa_stream_get_index(stream) &&
349 350
	    (type == PA_SUBSCRIPTION_EVENT_NEW ||
	     type == PA_SUBSCRIPTION_EVENT_CHANGE))
351 352 353 354 355 356 357 358 359 360 361
		pulse_mixer_on_change(*mixer, context, stream);
}

static void
pulse_output_subscribe_cb(gcc_unused pa_context *context,
			  pa_subscription_event_type_t t,
			  uint32_t idx, void *userdata)
{
	PulseOutput &po = *(PulseOutput *)userdata;

	po.OnServerLayoutChanged(t, idx);
362 363
}

364 365
inline bool
PulseOutput::Connect(Error &error)
366
{
367
	assert(context != nullptr);
368

369
	if (pa_context_connect(context, server,
370
			       (pa_context_flags_t)0, nullptr) < 0) {
371
		SetPulseError(error, context,
372
			      "pa_context_connect() has failed");
373 374
		return false;
	}
375

376 377
	return true;
}
378

379 380
void
PulseOutput::DeleteStream()
381
{
382
	assert(stream != nullptr);
383

384
	pa_stream_set_suspended_callback(stream, nullptr, nullptr);
385

386 387
	pa_stream_set_state_callback(stream, nullptr, nullptr);
	pa_stream_set_write_callback(stream, nullptr, nullptr);
388

389 390 391
	pa_stream_disconnect(stream);
	pa_stream_unref(stream);
	stream = nullptr;
392 393
}

394 395
void
PulseOutput::DeleteContext()
396
{
397
	assert(context != nullptr);
398

399 400
	pa_context_set_state_callback(context, nullptr, nullptr);
	pa_context_set_subscribe_callback(context, nullptr, nullptr);
401

402 403 404
	pa_context_disconnect(context);
	pa_context_unref(context);
	context = nullptr;
405 406
}

407 408
bool
PulseOutput::SetupContext(Error &error)
409
{
410
	assert(mainloop != nullptr);
411

412 413 414
	context = pa_context_new(pa_threaded_mainloop_get_api(mainloop),
				 MPD_PULSE_NAME);
	if (context == nullptr) {
415
		error.Set(pulse_domain, "pa_context_new() has failed");
416 417 418
		return false;
	}

419 420 421 422
	pa_context_set_state_callback(context,
				      pulse_output_context_state_cb, this);
	pa_context_set_subscribe_callback(context,
					  pulse_output_subscribe_cb, this);
423

424 425
	if (!Connect(error)) {
		DeleteContext();
426 427 428 429
		return false;
	}

	return true;
430 431
}

432
inline bool
433
PulseOutput::Configure(const ConfigBlock &block, Error &error)
434
{
435
	if (!base.Configure(block, error))
436 437
		return false;

438 439 440
	name = block.GetBlockValue("name", "mpd_pulse");
	server = block.GetBlockValue("server");
	sink = block.GetBlockValue("sink");
441 442 443 444

	return true;
}

445
PulseOutput *
446
PulseOutput::Create(const ConfigBlock &block, Error &error)
447
{
448
	setenv("PULSE_PROP_media.role", "music", true);
449
	setenv("PULSE_PROP_application.icon_name", "mpd", true);
450

451
	auto *po = new PulseOutput();
452
	if (!po->Configure(block, error)) {
453 454
		delete po;
		return nullptr;
455 456
	}

457
	return po;
458 459
}

460 461
inline bool
PulseOutput::Enable(Error &error)
462
{
463
	assert(mainloop == nullptr);
464

465 466
	/* create the libpulse mainloop and start the thread */

467 468
	mainloop = pa_threaded_mainloop_new();
	if (mainloop == nullptr) {
469
		error.Set(pulse_domain,
470
			  "pa_threaded_mainloop_new() has failed");
471
		return false;
472 473
	}

474
	pa_threaded_mainloop_lock(mainloop);
475

476 477 478 479
	if (pa_threaded_mainloop_start(mainloop) < 0) {
		pa_threaded_mainloop_unlock(mainloop);
		pa_threaded_mainloop_free(mainloop);
		mainloop = nullptr;
480

481
		error.Set(pulse_domain,
482
			  "pa_threaded_mainloop_start() has failed");
483 484 485 486 487
		return false;
	}

	/* create the libpulse context and connect it */

488 489 490 491 492
	if (!SetupContext(error)) {
		pa_threaded_mainloop_unlock(mainloop);
		pa_threaded_mainloop_stop(mainloop);
		pa_threaded_mainloop_free(mainloop);
		mainloop = nullptr;
493
		return false;
494 495
	}

496
	pa_threaded_mainloop_unlock(mainloop);
497

498
	return true;
499 500
}

501 502
inline void
PulseOutput::Disable()
503
{
504
	assert(mainloop != nullptr);
505

506 507 508 509 510
	pa_threaded_mainloop_stop(mainloop);
	if (context != nullptr)
		DeleteContext();
	pa_threaded_mainloop_free(mainloop);
	mainloop = nullptr;
511 512
}

513 514
bool
PulseOutput::WaitConnection(Error &error)
515
{
516
	assert(mainloop != nullptr);
517

518
	pa_context_state_t state;
519

520
	if (context == nullptr && !SetupContext(error))
521
		return false;
522 523

	while (true) {
524
		state = pa_context_get_state(context);
525 526 527 528 529 530 531 532 533
		switch (state) {
		case PA_CONTEXT_READY:
			/* nothing to do */
			return true;

		case PA_CONTEXT_UNCONNECTED:
		case PA_CONTEXT_TERMINATED:
		case PA_CONTEXT_FAILED:
			/* failure */
534 535
			SetPulseError(error, context, "failed to connect");
			DeleteContext();
536 537 538 539 540 541
			return false;

		case PA_CONTEXT_CONNECTING:
		case PA_CONTEXT_AUTHORIZING:
		case PA_CONTEXT_SETTING_NAME:
			/* wait some more */
542
			pa_threaded_mainloop_wait(mainloop);
543 544
			break;
		}
545
	}
546
}
547

548 549
inline void
PulseOutput::OnStreamSuspended(gcc_unused pa_stream *_stream)
550
{
551 552
	assert(_stream == stream || stream == nullptr);
	assert(mainloop != nullptr);
553 554 555

	/* wake up the main loop to break out of the loop in
	   pulse_output_play() */
556
	Signal();
557 558
}

559
static void
560
pulse_output_stream_suspended_cb(pa_stream *stream, void *userdata)
561
{
562
	PulseOutput &po = *(PulseOutput *)userdata;
563

564 565 566 567 568 569 570 571 572 573
	po.OnStreamSuspended(stream);
}

inline void
PulseOutput::OnStreamStateChanged(pa_stream *_stream,
				  pa_stream_state_t new_state)
{
	assert(_stream == stream || stream == nullptr);
	assert(mainloop != nullptr);
	assert(context != nullptr);
574

575
	switch (new_state) {
576
	case PA_STREAM_READY:
577 578
		if (mixer != nullptr)
			pulse_mixer_on_change(*mixer, context, _stream);
579

580
		Signal();
581 582
		break;

583 584
	case PA_STREAM_FAILED:
	case PA_STREAM_TERMINATED:
585 586
		if (mixer != nullptr)
			pulse_mixer_on_disconnect(*mixer);
587

588
		Signal();
589 590 591 592 593 594 595
		break;

	case PA_STREAM_UNCONNECTED:
	case PA_STREAM_CREATING:
		break;
	}
}
596

597 598 599 600 601 602 603 604 605 606 607 608 609 610
static void
pulse_output_stream_state_cb(pa_stream *stream, void *userdata)
{
	PulseOutput &po = *(PulseOutput *)userdata;

	return po.OnStreamStateChanged(stream, pa_stream_get_state(stream));
}

inline void
PulseOutput::OnStreamWrite(size_t nbytes)
{
	assert(mainloop != nullptr);

	writable = nbytes;
611
	Signal();
612 613
}

614
static void
615
pulse_output_stream_write_cb(gcc_unused pa_stream *stream, size_t nbytes,
616 617
			     void *userdata)
{
618
	PulseOutput &po = *(PulseOutput *)userdata;
619

620
	return po.OnStreamWrite(nbytes);
621 622
}

623 624
inline bool
PulseOutput::SetupStream(const pa_sample_spec &ss, Error &error)
625
{
626
	assert(context != nullptr);
627

628 629
	/* WAVE-EX is been adopted as the speaker map for most media files */
	pa_channel_map chan_map;
630
	pa_channel_map_init_auto(&chan_map, ss.channels,
631
				 PA_CHANNEL_MAP_WAVEEX);
632 633 634
	stream = pa_stream_new(context, name, &ss, &chan_map);
	if (stream == nullptr) {
		SetPulseError(error, context,
635
			      "pa_stream_new() has failed");
636 637 638
		return false;
	}

639 640 641
	pa_stream_set_suspended_callback(stream,
					 pulse_output_stream_suspended_cb,
					 this);
642

643 644 645 646
	pa_stream_set_state_callback(stream,
				     pulse_output_stream_state_cb, this);
	pa_stream_set_write_callback(stream,
				     pulse_output_stream_write_cb, this);
647 648 649 650

	return true;
}

651 652
inline bool
PulseOutput::Open(AudioFormat &audio_format, Error &error)
653
{
654
	assert(mainloop != nullptr);
655

656
	pa_threaded_mainloop_lock(mainloop);
657

658 659
	if (context != nullptr) {
		switch (pa_context_get_state(context)) {
660 661 662 663 664 665
		case PA_CONTEXT_UNCONNECTED:
		case PA_CONTEXT_TERMINATED:
		case PA_CONTEXT_FAILED:
			/* the connection was closed meanwhile; delete
			   it, and pulse_output_wait_connection() will
			   reopen it */
666
			DeleteContext();
667 668 669 670 671 672 673 674 675 676
			break;

		case PA_CONTEXT_READY:
		case PA_CONTEXT_CONNECTING:
		case PA_CONTEXT_AUTHORIZING:
		case PA_CONTEXT_SETTING_NAME:
			break;
		}
	}

677 678
	if (!WaitConnection(error)) {
		pa_threaded_mainloop_unlock(mainloop);
679
		return false;
680
	}
681

682 683
	/* MPD doesn't support the other pulseaudio sample formats, so
	   we just force MPD to send us everything as 16 bit */
684
	audio_format.format = SampleFormat::S16;
685

686
	pa_sample_spec ss;
687
	ss.format = PA_SAMPLE_S16NE;
688 689
	ss.rate = audio_format.sample_rate;
	ss.channels = audio_format.channels;
690

691 692
	/* create a stream .. */

693 694
	if (!SetupStream(ss, error)) {
		pa_threaded_mainloop_unlock(mainloop);
695 696 697 698 699
		return false;
	}

	/* .. and connect it (asynchronously) */

700
	if (pa_stream_connect_playback(stream, sink,
701 702
				       nullptr, pa_stream_flags_t(0),
				       nullptr, nullptr) < 0) {
703
		DeleteStream();
704

705
		SetPulseError(error, context,
706
			      "pa_stream_connect_playback() has failed");
707
		pa_threaded_mainloop_unlock(mainloop);
708
		return false;
709 710
	}

711
	pa_threaded_mainloop_unlock(mainloop);
712
	return true;
713 714
}

715 716
inline void
PulseOutput::Close()
717
{
718
	assert(mainloop != nullptr);
719

720
	pa_threaded_mainloop_lock(mainloop);
721

722 723 724 725
	if (pa_stream_get_state(stream) == PA_STREAM_READY) {
		pa_operation *o =
			pa_stream_drain(stream,
					pulse_output_stream_success_cb, this);
726
		if (o == nullptr) {
727
			LogPulseError(context,
728
				      "pa_stream_drain() has failed");
729
		} else
730
			pulse_wait_for_operation(mainloop, o);
731 732
	}

733
	DeleteStream();
734

735 736 737
	if (context != nullptr &&
	    pa_context_get_state(context) != PA_CONTEXT_READY)
		DeleteContext();
738

739
	pa_threaded_mainloop_unlock(mainloop);
740 741
}

742 743
bool
PulseOutput::WaitStream(Error &error)
744
{
745
	while (true) {
746
		switch (pa_stream_get_state(stream)) {
747 748
		case PA_STREAM_READY:
			return true;
749

750 751 752
		case PA_STREAM_FAILED:
		case PA_STREAM_TERMINATED:
		case PA_STREAM_UNCONNECTED:
753
			SetPulseError(error, context,
754
				      "failed to connect the stream");
755
			return false;
756

757
		case PA_STREAM_CREATING:
758
			pa_threaded_mainloop_wait(mainloop);
759 760
			break;
		}
761
	}
762 763
}

764 765
bool
PulseOutput::StreamPause(bool pause, Error &error)
766
{
767 768 769
	assert(mainloop != nullptr);
	assert(context != nullptr);
	assert(stream != nullptr);
770

771 772
	pa_operation *o = pa_stream_cork(stream, pause,
					 pulse_output_stream_success_cb, this);
773
	if (o == nullptr) {
774
		SetPulseError(error, context,
775
			      "pa_stream_cork() has failed");
776 777 778
		return false;
	}

779 780
	if (!pulse_wait_for_operation(mainloop, o)) {
		SetPulseError(error, context,
781
			      "pa_stream_cork() has failed");
782 783 784 785
		return false;
	}

	return true;
786 787
}

788 789
inline unsigned
PulseOutput::Delay()
790
{
791
	pa_threaded_mainloop_lock(mainloop);
792

793 794 795
	unsigned result = 0;
	if (base.pause && pa_stream_is_corked(stream) &&
	    pa_stream_get_state(stream) == PA_STREAM_READY)
796 797 798
		/* idle while paused */
		result = 1000;

799
	pa_threaded_mainloop_unlock(mainloop);
800 801 802 803

	return result;
}

804 805
inline size_t
PulseOutput::Play(const void *chunk, size_t size, Error &error)
806
{
807 808
	assert(mainloop != nullptr);
	assert(stream != nullptr);
809

810
	pa_threaded_mainloop_lock(mainloop);
811 812 813

	/* check if the stream is (already) connected */

814 815
	if (!WaitStream(error)) {
		pa_threaded_mainloop_unlock(mainloop);
816 817 818
		return 0;
	}

819
	assert(context != nullptr);
820 821 822

	/* unpause if previously paused */

823 824
	if (pa_stream_is_corked(stream) && !StreamPause(false, error)) {
		pa_threaded_mainloop_unlock(mainloop);
825
		return 0;
826
	}
827 828 829

	/* wait until the server allows us to write */

830 831 832
	while (writable == 0) {
		if (pa_stream_is_suspended(stream)) {
			pa_threaded_mainloop_unlock(mainloop);
833
			error.Set(pulse_domain, "suspended");
834 835 836
			return 0;
		}

837
		pa_threaded_mainloop_wait(mainloop);
838

839 840
		if (pa_stream_get_state(stream) != PA_STREAM_READY) {
			pa_threaded_mainloop_unlock(mainloop);
841
			error.Set(pulse_domain, "disconnected");
842
			return 0;
843 844 845 846 847
		}
	}

	/* now write */

848
	if (size > writable)
849
		/* don't send more than possible */
850
		size = writable;
851

852
	writable -= size;
853

854
	int result = pa_stream_write(stream, chunk, size, nullptr,
855
				     0, PA_SEEK_RELATIVE);
856
	pa_threaded_mainloop_unlock(mainloop);
857
	if (result < 0) {
858
		SetPulseError(error, context, "pa_stream_write() failed");
859
		return 0;
860 861
	}

862
	return size;
863 864
}

865 866
inline void
PulseOutput::Cancel()
867
{
868 869
	assert(mainloop != nullptr);
	assert(stream != nullptr);
870

871
	pa_threaded_mainloop_lock(mainloop);
872

873
	if (pa_stream_get_state(stream) != PA_STREAM_READY) {
874 875
		/* no need to flush when the stream isn't connected
		   yet */
876
		pa_threaded_mainloop_unlock(mainloop);
877 878 879
		return;
	}

880
	assert(context != nullptr);
881

882 883 884
	pa_operation *o = pa_stream_flush(stream,
					  pulse_output_stream_success_cb,
					  this);
885
	if (o == nullptr) {
886 887
		LogPulseError(context, "pa_stream_flush() has failed");
		pa_threaded_mainloop_unlock(mainloop);
888 889 890
		return;
	}

891 892
	pulse_wait_for_operation(mainloop, o);
	pa_threaded_mainloop_unlock(mainloop);
893 894
}

895 896
inline bool
PulseOutput::Pause()
897
{
898 899
	assert(mainloop != nullptr);
	assert(stream != nullptr);
900

901
	pa_threaded_mainloop_lock(mainloop);
902 903 904

	/* check if the stream is (already/still) connected */

905
	Error error;
906 907
	if (!WaitStream(error)) {
		pa_threaded_mainloop_unlock(mainloop);
908
		LogError(error);
909 910 911
		return false;
	}

912
	assert(context != nullptr);
913 914 915

	/* cork the stream */

916 917
	if (!pa_stream_is_corked(stream) && !StreamPause(true, error)) {
		pa_threaded_mainloop_unlock(mainloop);
918
		LogError(error);
919 920 921
		return false;
	}

922
	pa_threaded_mainloop_unlock(mainloop);
923 924 925
	return true;
}

926 927
inline bool
PulseOutput::TestDefaultDevice()
928
{
929
	const ConfigBlock empty;
930
	PulseOutput *po = PulseOutput::Create(empty, IgnoreError());
931
	if (po == nullptr)
932 933
		return false;

934 935
	bool success = po->WaitConnection(IgnoreError());
	delete po;
936 937 938
	return success;
}

939 940 941 942 943 944 945 946
static bool
pulse_output_test_default_device(void)
{
	return PulseOutput::TestDefaultDevice();
}

typedef AudioOutputWrapper<PulseOutput> Wrapper;

947
const struct AudioOutputPlugin pulse_output_plugin = {
948 949
	"pulse",
	pulse_output_test_default_device,
950
	&Wrapper::Init,
951 952 953 954 955 956
	&Wrapper::Finish,
	&Wrapper::Enable,
	&Wrapper::Disable,
	&Wrapper::Open,
	&Wrapper::Close,
	&Wrapper::Delay,
957
	nullptr,
958
	&Wrapper::Play,
959
	nullptr,
960 961
	&Wrapper::Cancel,
	&Wrapper::Pause,
962 963

	&pulse_mixer_plugin,
964
};