/*
 * Copyright 2003-2017 The Music Player Daemon Project
 * http://www.musicpd.org
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#ifndef MPD_THREAD_INPUT_STREAM_HXX
#define MPD_THREAD_INPUT_STREAM_HXX

#include "check.h"
#include "InputStream.hxx"
#include "thread/Thread.hxx"
#include "thread/Cond.hxx"
#include "util/HugeAllocator.hxx"
#include "util/CircularBuffer.hxx"

#include <exception>

#include <assert.h>
#include <stdint.h>

/**
 * Helper class for moving InputStream implementations with blocking
 * backend library implementation to a dedicated thread.  Data is
 * being read into a ring buffer, and that buffer is then consumed by
 * another thread using the regular #InputStream API.  This class
 * manages the thread and the buffer.
 *
 * This works only for "streams": unknown length, no seeking, no tags.
43 44 45 46 47
 *
 * The implementation must call Stop() before its destruction
 * completes.  This cannot be done in ~ThreadInputStream() because at
 * this point, the class has been morphed back to #ThreadInputStream
 * and the still-running thread will crash due to pure method call.
48
 */
49
class ThreadInputStream : public InputStream {
50 51
	const char *const plugin;

52 53 54 55 56 57 58 59 60
	Thread thread;

	/**
	 * Signalled when the thread shall be woken up: when data from
	 * the buffer has been consumed and when the stream shall be
	 * closed.
	 */
	Cond wake_cond;

61
	std::exception_ptr postponed_exception;
62

63 64 65
	HugeArray<uint8_t> allocation;

	CircularBuffer<uint8_t> buffer;
66 67 68 69

	/**
	 * Shall the stream be closed?
	 */
70
	bool close = false;
71 72 73 74

	/**
	 * Has the end of the stream been seen by the thread?
	 */
75
	bool eof = false;
76 77

public:
78
	ThreadInputStream(const char *_plugin,
79
			  const char *_uri, Mutex &_mutex,
80
			  size_t _buffer_size) noexcept;
81

82 83 84 85 86 87
#ifndef NDEBUG
	~ThreadInputStream() override {
		/* Stop() must have been called already */
		assert(!thread.IsDefined());
	}
#endif
88 89 90 91

	/**
	 * Initialize the object and start the thread.
	 */
92
	void Start();
93

94
	/* virtual methods from InputStream */
95
	void Check() override final;
96 97
	bool IsEOF() noexcept final;
	bool IsAvailable() noexcept final;
98
	size_t Read(void *ptr, size_t size) override final;
99

100
protected:
101 102 103 104 105 106
	/**
	 * Stop the thread and free the buffer.  This must be called
	 * before destruction of this object completes.
	 */
	void Stop() noexcept;

107
	void SetMimeType(const char *_mime) noexcept {
108 109
		assert(thread.IsInside());

110
		InputStream::SetMimeType(_mime);
111 112 113 114 115 116 117 118 119 120 121
	}

	/* to be implemented by the plugin */

	/**
	 * Optional initialization after entering the thread.  After
	 * this returns with success, the InputStream::ready flag is
	 * set.
	 *
	 * The #InputStream is locked.  Unlock/relock it if you do a
	 * blocking operation.
122 123
	 *
	 * Throws std::runtime_error on error.
124
	 */
125
	virtual void Open() {
126 127 128 129 130 131 132
	}

	/**
	 * Read from the stream.
	 *
	 * The #InputStream is not locked.
	 *
133 134 135
	 * Throws std::runtime_error on error.
	 *
	 * @return 0 on end-of-file
136
	 */
137
	virtual size_t ThreadRead(void *ptr, size_t size) = 0;
138 139 140 141 142 143

	/**
	 * Optional deinitialization before leaving the thread.
	 *
	 * The #InputStream is not locked.
	 */
144
	virtual void Close() noexcept {}
145 146 147 148 149 150 151

	/**
	 * Called from the client thread to cancel a Read() inside the
	 * thread.
	 *
	 * The #InputStream is not locked.
	 */
152
	virtual void Cancel() noexcept {}
153 154

private:
155
	void ThreadFunc() noexcept;
156 157 158
};

#endif