HttpdOutputPlugin.cxx 8.58 KB
Newer Older
/*
 * Copyright 2003-2018 The Music Player Daemon Project
 * http://www.musicpd.org
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

20
#include "config.h"
21 22 23
#include "HttpdOutputPlugin.hxx"
#include "HttpdInternal.hxx"
#include "HttpdClient.hxx"
24
#include "output/OutputAPI.hxx"
25
#include "encoder/EncoderInterface.hxx"
26
#include "encoder/Configured.hxx"
27
#include "net/UniqueSocketDescriptor.hxx"
28
#include "net/SocketAddress.hxx"
29
#include "net/ToString.hxx"
Max Kellermann's avatar
Max Kellermann committed
30
#include "Page.hxx"
31
#include "IcyMetaDataServer.hxx"
32
#include "event/Call.hxx"
33
#include "util/Domain.hxx"
34
#include "util/DeleteDisposer.hxx"
35
#include "Log.hxx"
36
#include "config/Net.hxx"
37 38 39

#include <assert.h>

Max Kellermann's avatar
Max Kellermann committed
40
#include <string.h>
41 42
#include <errno.h>

43
/* Domain object named "httpd_output" (presumably used to tag this
   plugin's log messages - confirm against util/Domain.hxx) */
const Domain httpd_output_domain("httpd_output");
44

45
inline
46
HttpdOutput::HttpdOutput(EventLoop &_loop, const ConfigBlock &block)
47
	:AudioOutput(FLAG_ENABLE_DISABLE|FLAG_PAUSE),
48
	 ServerSocket(_loop),
49
	 prepared_encoder(CreateConfiguredEncoder(block)),
50
	 defer_broadcast(_loop, BIND_THIS_METHOD(OnDeferredBroadcast))
51 52
{
	/* read configuration */
53 54 55
	name = block.GetBlockValue("name", "Set name in config");
	genre = block.GetBlockValue("genre", "Set genre in config");
	website = block.GetBlockValue("website", "Set website in config");
56

57
	clients_max = block.GetBlockValue("max_clients", 0u);
58

59
	/* set up bind_to_address */
60

61
	ServerSocketAddGeneric(*this, block.GetBlockValue("bind_to_address"), block.GetBlockValue("port", 8000u));
62

63
	/* determine content type */
64
	content_type = prepared_encoder->GetMimeType();
65 66
	if (content_type == nullptr)
		content_type = "application/octet-stream";
67 68 69 70
}

inline void
HttpdOutput::Bind()
71
{
72 73 74 75 76 77 78 79
	open = false;

	BlockingCall(GetEventLoop(), [this](){
			ServerSocket::Open();
		});
}

inline void
80
HttpdOutput::Unbind() noexcept
81 82 83 84 85 86
{
	assert(!open);

	BlockingCall(GetEventLoop(), [this](){
			ServerSocket::Close();
		});
87 88
}

89
/**
90
 * Creates a new #HttpdClient object and adds it into the
91
 * HttpdOutput.clients linked list.
92
 */
93
inline void
94
HttpdOutput::AddClient(UniqueSocketDescriptor fd) noexcept
95
{
96
	auto *client = new HttpdClient(*this, std::move(fd), GetEventLoop(),
97
				       !encoder->ImplementsTag());
98
	clients.push_front(*client);
99 100

	/* pass metadata to client */
101 102
	if (metadata != nullptr)
		clients.front().PushMetaData(metadata);
103 104
}

105
void
106
HttpdOutput::OnDeferredBroadcast() noexcept
107 108 109 110
{
	/* this method runs in the IOThread; it broadcasts pages from
	   our own queue to all clients */

111
	const std::lock_guard<Mutex> protect(mutex);
112 113

	while (!pages.empty()) {
114
		PagePtr page = std::move(pages.front());
115 116 117 118 119 120 121 122 123 124 125
		pages.pop();

		for (auto &client : clients)
			client.PushPage(page);
	}

	/* wake up the client that may be waiting for the queue to be
	   flushed */
	cond.broadcast();
}

126
void
127
HttpdOutput::OnAccept(UniqueSocketDescriptor fd,
128
		      SocketAddress, gcc_unused int uid) noexcept
129 130 131 132
{
	/* the listener socket has become readable - a client has
	   connected */

133
	const std::lock_guard<Mutex> protect(mutex);
134

135 136
	/* can we allow additional client */
	if (open && (clients_max == 0 || clients.size() < clients_max))
137
		AddClient(std::move(fd));
138 139
}

140
/**
 * Read pending encoder output into the internal buffer and wrap it
 * in a new #Page.
 *
 * @return the new page, or nullptr if the encoder produced no data
 */
PagePtr
HttpdOutput::ReadPage()
{
	if (unflushed_input >= 65536) {
		/* a lot of input went into the encoder without any
		   output coming back - force a flush to avoid buffer
		   underruns; a failing flush is deliberately
		   ignored */
		try {
			encoder->Flush();
		} catch (...) {
		}

		unflushed_input = 0;
	}

	/* keep reading until the encoder runs dry or the buffer is
	   full */
	size_t filled = 0;
	while (true) {
		const size_t n = encoder->Read(buffer + filled,
					       sizeof(buffer) - filled);
		if (n == 0)
			break;

		/* the encoder delivered something, so the input is
		   no longer "unflushed" */
		unflushed_input = 0;

		filled += n;
		if (filled >= sizeof(buffer))
			break;
	}

	if (filled > 0)
		return std::make_shared<Page>(buffer, filled);

	return nullptr;
}

174 175
/**
 * Open the prepared encoder for the given audio format and capture
 * its initial output as the stream header.
 *
 * @param audio_format passed by non-const reference to the prepared
 * encoder's Open() method
 */
inline void
HttpdOutput::OpenEncoder(AudioFormat &audio_format)
{
	encoder = prepared_encoder->Open(audio_format);

	/* whatever the encoder emits right after being opened is the
	   header; it is kept because every new client needs to
	   receive it first */
	header = ReadPage();

	/* start counting unflushed input from scratch */
	unflushed_input = 0;
}

187
void
188
HttpdOutput::Open(AudioFormat &audio_format)
189
{
190 191
	assert(!open);
	assert(clients.empty());
192

193 194
	const std::lock_guard<Mutex> protect(mutex);

195
	OpenEncoder(audio_format);
196 197 198

	/* initialize other attributes */

199
	timer = new Timer(audio_format);
200

201
	open = true;
202
	pause = false;
203 204
}

205 206
void
HttpdOutput::Close() noexcept
207
{
208
	assert(open);
209

210
	delete timer;
211

212
	BlockingCall(GetEventLoop(), [this](){
213
			defer_broadcast.Cancel();
214

215 216
			const std::lock_guard<Mutex> protect(mutex);
			open = false;
217
			clients.clear_and_dispose(DeleteDisposer());
218
		});
219

220
	header.reset();
221

222
	delete encoder;
223 224 225
}

void
226
HttpdOutput::RemoveClient(HttpdClient &client) noexcept
227
{
228 229 230 231
	assert(!clients.empty());

	clients.erase_and_dispose(clients.iterator_to(client),
				  DeleteDisposer());
232 233 234
}

void
235
HttpdOutput::SendHeader(HttpdClient &client) const noexcept
236
{
237
	if (header != nullptr)
238
		client.PushPage(header);
239 240
}

241
std::chrono::steady_clock::duration
242
HttpdOutput::Delay() const noexcept
243
{
244
	if (!LockHasClients() && pause) {
245 246 247 248
		/* if there's no client and this output is paused,
		   then httpd_output_pause() will not do anything, it
		   will not fill the buffer and it will not update the
		   timer; therefore, we reset the timer here */
249
		timer->Reset();
250 251 252 253

		/* some arbitrary delay that is long enough to avoid
		   consuming too much CPU, and short enough to notice
		   new clients quickly enough */
254
		return std::chrono::seconds(1);
255 256
	}

257
	return timer->IsStarted()
258
		? timer->GetDelay()
259
		: std::chrono::steady_clock::duration::zero();
260 261
}

262
void
263
HttpdOutput::BroadcastPage(PagePtr page) noexcept
264
{
265
	assert(page != nullptr);
266

267 268 269 270
	{
		const std::lock_guard<Mutex> lock(mutex);
		pages.emplace(std::move(page));
	}
271

272
	defer_broadcast.Schedule();
273 274
}

275 276
void
HttpdOutput::BroadcastFromEncoder()
277
{
278
	/* synchronize with the IOThread */
279 280 281 282
	{
		const std::lock_guard<Mutex> lock(mutex);
		while (!pages.empty())
			cond.wait(mutex);
283
	}
284

285 286
	bool empty = true;

287 288 289
	PagePtr page;
	while ((page = ReadPage()) != nullptr) {
		const std::lock_guard<Mutex> lock(mutex);
290
		pages.emplace(std::move(page));
291
		empty = false;
292
	}
293

294
	if (!empty)
295
		defer_broadcast.Schedule();
296 297
}

298 299
inline void
HttpdOutput::EncodeAndPlay(const void *chunk, size_t size)
300
{
301
	encoder->Write(chunk, size);
302

303
	unflushed_input += size;
304

305
	BroadcastFromEncoder();
306 307
}

308
size_t
309
HttpdOutput::Play(const void *chunk, size_t size)
310
{
311 312
	pause = false;

313 314
	if (LockHasClients())
		EncodeAndPlay(chunk, size);
315

316 317 318
	if (!timer->IsStarted())
		timer->Start();
	timer->Add(size);
319 320 321 322

	return size;
}

323
bool
324
HttpdOutput::Pause()
325
{
326 327
	pause = true;

328
	if (LockHasClients()) {
329
		static const char silence[1020] = { 0 };
330
		Play(silence, sizeof(silence));
331
	}
332 333

	return true;
334 335
}

336
void
337
HttpdOutput::SendTag(const Tag &tag)
338
{
339
	if (encoder->ImplementsTag()) {
340
		/* embed encoder tags */
341 342 343

		/* flush the current stream, and end it */

344 345
		try {
			encoder->PreTag();
346
		} catch (...) {
347 348 349
			/* ignore */
		}

350
		BroadcastFromEncoder();
351 352 353

		/* send the tag to the encoder - which starts a new
		   stream now */
354

355 356
		try {
			encoder->SendTag(tag);
357
			encoder->Flush();
358
		} catch (...) {
359 360
			/* ignore */
		}
361 362 363 364 365

		/* the first page generated by the encoder will now be
		   used as the new "header" page, which is sent to all
		   new clients */

366
		auto page = ReadPage();
367
		if (page != nullptr) {
368
			header = page;
369
			BroadcastPage(page);
370
		}
371 372 373
	} else {
		/* use Icy-Metadata */

374
		static constexpr TagType types[] = {
375 376 377 378
			TAG_ALBUM, TAG_ARTIST, TAG_TITLE,
			TAG_NUM_OF_ITEM_TYPES
		};

379
		metadata = icy_server_metadata_page(tag, &types[0]);
380
		if (metadata != nullptr) {
381
			const std::lock_guard<Mutex> protect(mutex);
382 383
			for (auto &client : clients)
				client.PushMetaData(metadata);
384
		}
385
	}
386 387
}

388
inline void
389
HttpdOutput::CancelAllClients() noexcept
390
{
391
	const std::lock_guard<Mutex> protect(mutex);
392 393

	while (!pages.empty()) {
394
		PagePtr page = std::move(pages.front());
395 396 397
		pages.pop();
	}

398 399
	for (auto &client : clients)
		client.CancelQueue();
400 401

	cond.broadcast();
402 403
}

404
void
405
HttpdOutput::Cancel() noexcept
406
{
407
	BlockingCall(GetEventLoop(), [this](){
408
			CancelAllClients();
409
		});
410 411
}

412
const struct AudioOutputPlugin httpd_output_plugin = {
413 414
	"httpd",
	nullptr,
415
	&HttpdOutput::Create,
416
	nullptr,
417
};