Commit f1d07002 authored by Max Kellermann's avatar Max Kellermann

input/plugins: make InputStream the base class

Prepare for adding virtual methods.
parent e1383a2d
......@@ -88,9 +88,7 @@ public:
Error &error) override;
};
struct Bzip2InputStream {
InputStream base;
struct Bzip2InputStream final : public InputStream {
Bzip2ArchiveFile *archive;
bool eof;
......@@ -130,7 +128,7 @@ Bzip2InputStream::Open(Error &error)
return false;
}
base.ready = true;
SetReady();
return true;
}
......@@ -157,9 +155,10 @@ bz2_open(Path pathname, Error &error)
/* single archive handling */
Bzip2InputStream::Bzip2InputStream(Bzip2ArchiveFile &_context, const char *uri,
Mutex &mutex, Cond &cond)
:base(bz2_inputplugin, uri, mutex, cond),
Bzip2InputStream::Bzip2InputStream(Bzip2ArchiveFile &_context,
const char *_uri,
Mutex &_mutex, Cond &_cond)
:InputStream(bz2_inputplugin, _uri, _mutex, _cond),
archive(&_context), eof(false)
{
archive->Ref();
......@@ -181,7 +180,7 @@ Bzip2ArchiveFile::OpenStream(const char *path,
return nullptr;
}
return &bis->base;
return bis;
}
static void
......
......@@ -140,21 +140,19 @@ Iso9660ArchiveFile::Visit(ArchiveVisitor &visitor)
/* single archive handling */
class Iso9660InputStream {
InputStream base;
class Iso9660InputStream final : public InputStream {
Iso9660ArchiveFile &archive;
iso9660_stat_t *statbuf;
public:
Iso9660InputStream(Iso9660ArchiveFile &_archive, const char *uri,
Mutex &mutex, Cond &cond,
Iso9660InputStream(Iso9660ArchiveFile &_archive, const char *_uri,
Mutex &_mutex, Cond &_cond,
iso9660_stat_t *_statbuf)
:base(iso9660_input_plugin, uri, mutex, cond),
:InputStream(iso9660_input_plugin, _uri, _mutex, _cond),
archive(_archive), statbuf(_statbuf) {
base.ready = true;
base.size = statbuf->size;
size = statbuf->size;
SetReady();
archive.Ref();
}
......@@ -164,10 +162,6 @@ public:
archive.Unref();
}
InputStream *Get() {
return &base;
}
size_t Read(void *ptr, size_t size, Error &error);
};
......@@ -183,10 +177,8 @@ Iso9660ArchiveFile::OpenStream(const char *pathname,
return nullptr;
}
Iso9660InputStream *iis =
new Iso9660InputStream(*this, pathname, mutex, cond,
statbuf);
return iis->Get();
return new Iso9660InputStream(*this, pathname, mutex, cond,
statbuf);
}
static void
......@@ -198,22 +190,22 @@ iso9660_input_close(InputStream *is)
}
inline size_t
Iso9660InputStream::Read(void *ptr, size_t size, Error &error)
Iso9660InputStream::Read(void *ptr, size_t read_size, Error &error)
{
int readed = 0;
int no_blocks, cur_block;
size_t left_bytes = statbuf->size - base.offset;
size_t left_bytes = statbuf->size - offset;
if (left_bytes < size) {
no_blocks = CEILING(left_bytes,ISO_BLOCKSIZE);
if (left_bytes < read_size) {
no_blocks = CEILING(left_bytes, ISO_BLOCKSIZE);
} else {
no_blocks = size / ISO_BLOCKSIZE;
no_blocks = read_size / ISO_BLOCKSIZE;
}
if (no_blocks == 0)
return 0;
cur_block = base.offset / ISO_BLOCKSIZE;
cur_block = offset / ISO_BLOCKSIZE;
readed = archive.SeekRead(ptr, statbuf->lsn + cur_block,
no_blocks);
......@@ -224,11 +216,11 @@ Iso9660InputStream::Read(void *ptr, size_t size, Error &error)
(unsigned long)cur_block);
return 0;
}
if (left_bytes < size) {
if (left_bytes < read_size) {
readed = left_bytes;
}
base.offset += readed;
offset += readed;
return readed;
}
......
......@@ -97,25 +97,24 @@ ZzipArchiveFile::Visit(ArchiveVisitor &visitor)
/* single archive handling */
struct ZzipInputStream {
InputStream base;
struct ZzipInputStream final : public InputStream {
ZzipArchiveFile *archive;
ZZIP_FILE *file;
ZzipInputStream(ZzipArchiveFile &_archive, const char *uri,
Mutex &mutex, Cond &cond,
ZzipInputStream(ZzipArchiveFile &_archive, const char *_uri,
Mutex &_mutex, Cond &_cond,
ZZIP_FILE *_file)
:base(zzip_input_plugin, uri, mutex, cond),
:InputStream(zzip_input_plugin, _uri, _mutex, _cond),
archive(&_archive), file(_file) {
base.ready = true;
//we are seekable (but its not recommendent to do so)
base.seekable = true;
seekable = true;
ZZIP_STAT z_stat;
zzip_file_stat(file, &z_stat);
base.size = z_stat.st_size;
size = z_stat.st_size;
SetReady();
archive->ref.Increment();
}
......@@ -138,11 +137,9 @@ ZzipArchiveFile::OpenStream(const char *pathname,
return nullptr;
}
ZzipInputStream *zis =
new ZzipInputStream(*this, pathname,
mutex, cond,
_file);
return &zis->base;
return new ZzipInputStream(*this, pathname,
mutex, cond,
_file);
}
static void
......
......@@ -52,30 +52,30 @@ ThreadInputStream::Start(Error &error)
if (!thread.Start(ThreadFunc, this, error))
return nullptr;
return &base;
return this;
}
inline void
ThreadInputStream::ThreadFunc()
{
FormatThreadName("input:%s", base.GetPlugin().name);
FormatThreadName("input:%s", GetPlugin().name);
Lock();
if (!Open(postponed_error)) {
base.cond.broadcast();
cond.broadcast();
Unlock();
return;
}
/* we're ready, tell it to our client */
base.SetReady();
SetReady();
while (!close) {
assert(!postponed_error.IsDefined());
auto w = buffer->Write();
if (w.IsEmpty()) {
wake_cond.wait(base.mutex);
wake_cond.wait(mutex);
} else {
Unlock();
......@@ -83,7 +83,7 @@ ThreadInputStream::ThreadFunc()
size_t nbytes = Read(w.data, w.size, error);
Lock();
base.cond.broadcast();
cond.broadcast();
if (nbytes == 0) {
eof = true;
......@@ -121,7 +121,8 @@ ThreadInputStream::Check2(Error &error)
bool
ThreadInputStream::Check(InputStream *is, Error &error)
{
return Cast(is)->Check2(error);
ThreadInputStream &tis = *(ThreadInputStream *)is;
return tis.Check2(error);
}
inline bool
......@@ -133,11 +134,12 @@ ThreadInputStream::Available2()
bool
ThreadInputStream::Available(InputStream *is)
{
return Cast(is)->Available2();
ThreadInputStream &tis = *(ThreadInputStream *)is;
return tis.Available2();
}
inline size_t
ThreadInputStream::Read2(void *ptr, size_t size, Error &error)
ThreadInputStream::Read2(void *ptr, size_t read_size, Error &error)
{
while (true) {
if (postponed_error.IsDefined()) {
......@@ -147,18 +149,18 @@ ThreadInputStream::Read2(void *ptr, size_t size, Error &error)
auto r = buffer->Read();
if (!r.IsEmpty()) {
size_t nbytes = std::min(size, r.size);
size_t nbytes = std::min(read_size, r.size);
memcpy(ptr, r.data, nbytes);
buffer->Consume(nbytes);
wake_cond.broadcast();
base.offset += nbytes;
offset += nbytes;
return nbytes;
}
if (eof)
return 0;
base.cond.wait(base.mutex);
cond.wait(mutex);
}
}
......@@ -166,7 +168,8 @@ size_t
ThreadInputStream::Read(InputStream *is, void *ptr, size_t size,
Error &error)
{
return Cast(is)->Read2(ptr, size, error);
ThreadInputStream &tis = *(ThreadInputStream *)is;
return tis.Read2(ptr, size, error);
}
inline void
......@@ -187,7 +190,8 @@ ThreadInputStream::Close2()
void
ThreadInputStream::Close(InputStream *is)
{
Cast(is)->Close2();
ThreadInputStream &tis = *(ThreadInputStream *)is;
tis.Close2();
}
inline bool
......@@ -199,5 +203,6 @@ ThreadInputStream::IsEOF2()
bool
ThreadInputStream::IsEOF(InputStream *is)
{
return Cast(is)->IsEOF2();
ThreadInputStream &tis = *(ThreadInputStream *)is;
return tis.IsEOF2();
}
......@@ -24,7 +24,6 @@
#include "InputStream.hxx"
#include "thread/Thread.hxx"
#include "thread/Cond.hxx"
#include "util/Cast.hxx"
#include "util/Error.hxx"
#include <stdint.h>
......@@ -40,9 +39,7 @@ template<typename T> class CircularBuffer;
*
* This works only for "streams": unknown length, no seeking, no tags.
*/
class ThreadInputStream {
InputStream base;
class ThreadInputStream : public InputStream {
Thread thread;
/**
......@@ -71,7 +68,7 @@ public:
ThreadInputStream(const InputPlugin &_plugin,
const char *_uri, Mutex &_mutex, Cond &_cond,
size_t _buffer_size)
:base(_plugin, _uri, _mutex, _cond),
:InputStream(_plugin, _uri, _mutex, _cond),
buffer_size(_buffer_size),
buffer(nullptr),
close(false), eof(false) {}
......@@ -86,24 +83,10 @@ public:
InputStream *Start(Error &error);
protected:
void Lock() {
base.Lock();
}
void Unlock() {
base.Unlock();
}
const char *GetURI() const {
assert(thread.IsInside());
return base.GetURI();
}
void SetMimeType(const char *mime) {
void SetMimeType(const char *_mime) {
assert(thread.IsInside());
base.SetMimeType(mime);
InputStream::SetMimeType(_mime);
}
/* to be implemented by the plugin */
......@@ -145,20 +128,6 @@ protected:
virtual void Cancel() {}
private:
#if GCC_CHECK_VERSION(4,6) || defined(__clang__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Winvalid-offsetof"
#endif
static constexpr ThreadInputStream *Cast(InputStream *is) {
return ContainerCast(is, ThreadInputStream, base);
}
#if GCC_CHECK_VERSION(4,6) || defined(__clang__)
#pragma GCC diagnostic pop
#endif
void ThreadFunc();
static void ThreadFunc(void *ctx);
......
......@@ -32,7 +32,7 @@
#include "util/Error.hxx"
#include "util/StringUtil.hxx"
#include "util/ReusableArray.hxx"
#include "util/Cast.hxx"
#include "Log.hxx"
#include "event/MultiSocketMonitor.hxx"
#include "event/DeferredMonitor.hxx"
......@@ -64,8 +64,9 @@ static constexpr unsigned int default_rate = 44100; // cd quality
*/
static constexpr size_t read_buffer_size = 4096;
class AlsaInputStream final : MultiSocketMonitor, DeferredMonitor {
InputStream base;
class AlsaInputStream final
: public InputStream,
MultiSocketMonitor, DeferredMonitor {
snd_pcm_t *capture_handle;
size_t frame_size;
int frames_to_read;
......@@ -81,23 +82,23 @@ class AlsaInputStream final : MultiSocketMonitor, DeferredMonitor {
public:
AlsaInputStream(EventLoop &loop,
const char *uri, Mutex &mutex, Cond &cond,
const char *_uri, Mutex &_mutex, Cond &_cond,
snd_pcm_t *_handle, int _frame_size)
:MultiSocketMonitor(loop),
:InputStream(input_plugin_alsa, _uri, _mutex, _cond),
MultiSocketMonitor(loop),
DeferredMonitor(loop),
base(input_plugin_alsa, uri, mutex, cond),
capture_handle(_handle),
frame_size(_frame_size),
eof(false)
{
assert(uri != nullptr);
assert(_uri != nullptr);
assert(_handle != nullptr);
/* this mime type forces use of the PcmDecoderPlugin.
Needs to be generalised when/if that decoder is
updated to support other audio formats */
base.SetMimeType("audio/x-mpd-cdda-pcm");
base.SetReady();
SetMimeType("audio/x-mpd-cdda-pcm");
InputStream::SetReady();
frames_to_read = read_buffer_size / frame_size;
......@@ -115,19 +116,6 @@ public:
static InputStream *Create(const char *uri, Mutex &mutex, Cond &cond,
Error &error);
#if GCC_CHECK_VERSION(4,6) || defined(__clang__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Winvalid-offsetof"
#endif
static constexpr AlsaInputStream *Cast(InputStream *is) {
return ContainerCast(is, AlsaInputStream, base);
}
#if GCC_CHECK_VERSION(4,6) || defined(__clang__)
#pragma GCC diagnostic pop
#endif
bool Available() {
if (snd_pcm_avail(capture_handle) > frames_to_read)
return true;
......@@ -188,18 +176,17 @@ AlsaInputStream::Create(const char *uri, Mutex &mutex, Cond &cond,
return nullptr;
int frame_size = snd_pcm_format_width(format) / 8 * channels;
AlsaInputStream *stream = new AlsaInputStream(io_thread_get(),
uri, mutex, cond,
handle, frame_size);
return &stream->base;
return new AlsaInputStream(io_thread_get(),
uri, mutex, cond,
handle, frame_size);
}
inline size_t
AlsaInputStream::Read(void *ptr, size_t size, Error &error)
AlsaInputStream::Read(void *ptr, size_t read_size, Error &error)
{
assert(ptr != nullptr);
int num_frames = size / frame_size;
int num_frames = read_size / frame_size;
int ret;
while ((ret = snd_pcm_readi(capture_handle, ptr, num_frames)) < 0) {
if (Recover(ret) < 0) {
......@@ -211,7 +198,7 @@ AlsaInputStream::Read(void *ptr, size_t size, Error &error)
}
size_t nbytes = ret * frame_size;
base.offset += nbytes;
offset += nbytes;
return nbytes;
}
......@@ -244,9 +231,9 @@ AlsaInputStream::DispatchSockets()
{
waiting = false;
const ScopeLock protect(base.mutex);
const ScopeLock protect(mutex);
/* wake up the thread that is waiting for more data */
base.cond.broadcast();
cond.broadcast();
}
inline int
......@@ -389,28 +376,28 @@ alsa_input_open(const char *uri, Mutex &mutex, Cond &cond, Error &error)
static void
alsa_input_close(InputStream *is)
{
AlsaInputStream *ais = AlsaInputStream::Cast(is);
AlsaInputStream *ais = (AlsaInputStream *)is;
delete ais;
}
static bool
alsa_input_available(InputStream *is)
{
AlsaInputStream *ais = AlsaInputStream::Cast(is);
AlsaInputStream *ais = (AlsaInputStream *)is;
return ais->Available();
}
static size_t
alsa_input_read(InputStream *is, void *ptr, size_t size, Error &error)
{
AlsaInputStream *ais = AlsaInputStream::Cast(is);
AlsaInputStream *ais = (AlsaInputStream *)is;
return ais->Read(ptr, size, error);
}
static bool
alsa_input_eof(gcc_unused InputStream *is)
{
AlsaInputStream *ais = AlsaInputStream::Cast(is);
AlsaInputStream *ais = (AlsaInputStream *)is;
return ais->IsEOF();
}
......
......@@ -50,9 +50,7 @@
#include <cdio/cd_types.h>
struct CdioParanoiaInputStream {
InputStream base;
struct CdioParanoiaInputStream final : public InputStream {
cdrom_drive_t *drv;
CdIo_t *cdio;
cdrom_paranoia_t *para;
......@@ -65,9 +63,9 @@ struct CdioParanoiaInputStream {
char buffer[CDIO_CD_FRAMESIZE_RAW];
int buffer_lsn;
CdioParanoiaInputStream(const char *uri, Mutex &mutex, Cond &cond,
CdioParanoiaInputStream(const char *_uri, Mutex &_mutex, Cond &_cond,
int _trackno)
:base(input_plugin_cdio_paranoia, uri, mutex, cond),
:InputStream(input_plugin_cdio_paranoia, _uri, _mutex, _cond),
drv(nullptr), cdio(nullptr), para(nullptr),
trackno(_trackno)
{
......@@ -264,16 +262,16 @@ input_cdio_open(const char *uri,
/* seek to beginning of the track */
cdio_paranoia_seek(i->para, i->lsn_from, SEEK_SET);
i->base.ready = true;
i->base.seekable = true;
i->base.size = (i->lsn_to - i->lsn_from + 1) * CDIO_CD_FRAMESIZE_RAW;
i->seekable = true;
i->size = (i->lsn_to - i->lsn_from + 1) * CDIO_CD_FRAMESIZE_RAW;
/* hack to make MPD select the "pcm" decoder plugin */
i->base.SetMimeType(reverse_endian
i->SetMimeType(reverse_endian
? "audio/x-mpd-cdda-pcm-reverse"
: "audio/x-mpd-cdda-pcm");
i->SetReady();
return &i->base;
return i;
}
static bool
......@@ -287,26 +285,26 @@ input_cdio_seek(InputStream *is,
case SEEK_SET:
break;
case SEEK_CUR:
offset += cis->base.offset;
offset += cis->offset;
break;
case SEEK_END:
offset += cis->base.size;
offset += cis->size;
break;
}
if (offset < 0 || offset > cis->base.size) {
if (offset < 0 || offset > cis->size) {
error.Format(cdio_domain, "Invalid offset to seek %ld (%ld)",
(long int)offset, (long int)cis->base.size);
(long int)offset, (long int)cis->size);
return false;
}
/* simple case */
if (offset == cis->base.offset)
if (offset == cis->offset)
return true;
/* calculate current LSN */
cis->lsn_relofs = offset / CDIO_CD_FRAMESIZE_RAW;
cis->base.offset = offset;
cis->offset = offset;
cdio_paranoia_seek(cis->para, cis->lsn_from + cis->lsn_relofs, SEEK_SET);
......@@ -360,7 +358,7 @@ input_cdio_read(InputStream *is, void *ptr, size_t length,
}
//correct offset
diff = cis->base.offset - cis->lsn_relofs * CDIO_CD_FRAMESIZE_RAW;
diff = cis->offset - cis->lsn_relofs * CDIO_CD_FRAMESIZE_RAW;
assert(diff >= 0 && diff < CDIO_CD_FRAMESIZE_RAW);
......@@ -374,8 +372,8 @@ input_cdio_read(InputStream *is, void *ptr, size_t length,
nbytes += len;
//update offset
cis->base.offset += len;
cis->lsn_relofs = cis->base.offset / CDIO_CD_FRAMESIZE_RAW;
cis->offset += len;
cis->lsn_relofs = cis->offset / CDIO_CD_FRAMESIZE_RAW;
//update length
length -= len;
}
......
......@@ -67,9 +67,7 @@ static const size_t CURL_MAX_BUFFERED = 512 * 1024;
*/
static const size_t CURL_RESUME_AT = 384 * 1024;
struct CurlInputStream {
InputStream base;
struct CurlInputStream final : public InputStream {
/* some buffers which were passed to libcurl, which we have
too free */
char range[32];
......@@ -106,9 +104,9 @@ struct CurlInputStream {
Error postponed_error;
CurlInputStream(const char *url, Mutex &mutex, Cond &cond,
CurlInputStream(const char *_url, Mutex &_mutex, Cond &_cond,
void *_buffer)
:base(input_plugin_curl, url, mutex, cond),
:InputStream(input_plugin_curl, _url, _mutex, _cond),
request_headers(nullptr),
buffer((uint8_t *)_buffer, CURL_MAX_BUFFERED),
paused(false),
......@@ -486,7 +484,7 @@ CurlInputStream::RequestDone(CURLcode result, long status)
FreeEasy();
const ScopeLock protect(base.mutex);
const ScopeLock protect(mutex);
if (result != CURLE_OK) {
postponed_error.Format(curl_domain, result,
......@@ -497,7 +495,7 @@ CurlInputStream::RequestDone(CURLcode result, long status)
status);
}
base.SetReady();
SetReady();
}
static void
......@@ -688,7 +686,7 @@ inline bool
CurlInputStream::FillBuffer(Error &error)
{
while (easy != nullptr && buffer.IsEmpty())
base.cond.wait(base.mutex);
cond.wait(mutex);
if (postponed_error.IsDefined()) {
error = std::move(postponed_error);
......@@ -768,7 +766,7 @@ input_curl_available(InputStream *is)
}
inline size_t
CurlInputStream::Read(void *ptr, size_t size, Error &error)
CurlInputStream::Read(void *ptr, size_t read_size, Error &error)
{
size_t nbytes;
......@@ -778,22 +776,22 @@ CurlInputStream::Read(void *ptr, size_t size, Error &error)
if (!FillBuffer(error))
return 0;
nbytes = read_from_buffer(icy, buffer, ptr, size);
nbytes = read_from_buffer(icy, buffer, ptr, read_size);
} while (nbytes == 0);
if (icy.IsDefined())
CopyIcyTag();
base.offset += (InputPlugin::offset_type)nbytes;
offset += (InputPlugin::offset_type)nbytes;
if (paused && GetTotalBufferSize() < CURL_RESUME_AT) {
base.mutex.unlock();
mutex.unlock();
BlockingCall(io_thread_get(), [this](){
Resume();
});
base.mutex.lock();
mutex.lock();
}
return nbytes;
......@@ -828,11 +826,11 @@ CurlInputStream::HeaderReceived(const char *name, std::string &&value)
if (StringEqualsCaseASCII(name, "accept-ranges")) {
/* a stream with icy-metadata is not seekable */
if (!icy.IsDefined())
base.seekable = true;
seekable = true;
} else if (StringEqualsCaseASCII(name, "content-length")) {
base.size = base.offset + ParseUint64(value.c_str());
size = offset + ParseUint64(value.c_str());
} else if (StringEqualsCaseASCII(name, "content-type")) {
base.SetMimeType(std::move(value));
SetMimeType(std::move(value));
} else if (StringEqualsCaseASCII(name, "icy-name") ||
StringEqualsCaseASCII(name, "ice-name") ||
StringEqualsCaseASCII(name, "x-audiocast-name")) {
......@@ -856,7 +854,7 @@ CurlInputStream::HeaderReceived(const char *name, std::string &&value)
/* a stream with icy-metadata is not
seekable */
base.seekable = false;
seekable = false;
}
}
}
......@@ -898,13 +896,13 @@ input_curl_headerfunction(void *ptr, size_t size, size_t nmemb, void *stream)
}
inline size_t
CurlInputStream::DataReceived(const void *ptr, size_t size)
CurlInputStream::DataReceived(const void *ptr, size_t received_size)
{
assert(size > 0);
assert(received_size > 0);
const ScopeLock protect(base.mutex);
const ScopeLock protect(mutex);
if (size > buffer.GetSpace()) {
if (received_size > buffer.GetSpace()) {
paused = true;
return CURL_WRITEFUNC_PAUSE;
}
......@@ -912,23 +910,23 @@ CurlInputStream::DataReceived(const void *ptr, size_t size)
auto w = buffer.Write();
assert(!w.IsEmpty());
size_t nbytes = std::min(w.size, size);
size_t nbytes = std::min(w.size, received_size);
memcpy(w.data, ptr, nbytes);
buffer.Append(nbytes);
const size_t remaining = size - nbytes;
const size_t remaining = received_size - nbytes;
if (remaining > 0) {
w = buffer.Write();
assert(!w.IsEmpty());
assert(w.size >= remaining);
memcpy(w.data, (const uint8_t *)ptr + nbytes, remaining);
buffer.Append(size);
buffer.Append(received_size);
}
base.ready = true;
base.cond.broadcast();
return size;
ready = true;
cond.broadcast();
return received_size;
}
/** called by curl when new data is available */
......@@ -986,7 +984,7 @@ CurlInputStream::InitEasy(Error &error)
curl_easy_setopt(easy, CURLOPT_PROXYUSERPWD, proxy_auth_str);
}
CURLcode code = curl_easy_setopt(easy, CURLOPT_URL, base.GetURI());
CURLcode code = curl_easy_setopt(easy, CURLOPT_URL, GetURI());
if (code != CURLE_OK) {
error.Format(curl_domain, code,
"curl_easy_setopt() failed: %s",
......@@ -1003,16 +1001,16 @@ CurlInputStream::InitEasy(Error &error)
}
inline bool
CurlInputStream::Seek(InputPlugin::offset_type offset, int whence,
CurlInputStream::Seek(InputPlugin::offset_type new_offset, int whence,
Error &error)
{
assert(base.ready);
assert(IsReady());
if (whence == SEEK_SET && offset == base.offset)
if (whence == SEEK_SET && new_offset == offset)
/* no-op */
return true;
if (!base.seekable)
if (!IsSeekable())
return false;
/* calculate the absolute offset */
......@@ -1022,52 +1020,52 @@ CurlInputStream::Seek(InputPlugin::offset_type offset, int whence,
break;
case SEEK_CUR:
offset += base.offset;
new_offset += offset;
break;
case SEEK_END:
if (base.size < 0)
if (size < 0)
/* stream size is not known */
return false;
offset += base.size;
new_offset += size;
break;
default:
return false;
}
if (offset < 0)
if (new_offset < 0)
return false;
/* check if we can fast-forward the buffer */
while (offset > base.offset) {
while (new_offset > offset) {
auto r = buffer.Read();
if (r.IsEmpty())
break;
const size_t nbytes =
offset - base.offset < (InputPlugin::offset_type)r.size
? offset - base.offset
: r.size;
new_offset - offset < (InputPlugin::offset_type)r.size
? new_offset - offset
: r.size;
buffer.Consume(nbytes);
base.offset += nbytes;
offset += nbytes;
}
if (offset == base.offset)
if (new_offset == offset)
return true;
/* close the old connection and open a new one */
base.mutex.unlock();
mutex.unlock();
FreeEasyIndirect();
buffer.Clear();
base.offset = offset;
if (base.offset == base.size) {
offset = new_offset;
if (offset == size) {
/* seek to EOF: simulate empty result; avoid
triggering a "416 Requested Range Not Satisfiable"
response */
......@@ -1079,18 +1077,18 @@ CurlInputStream::Seek(InputPlugin::offset_type offset, int whence,
/* send the "Range" header */
if (base.offset > 0) {
sprintf(range, "%lld-", (long long)base.offset);
if (offset > 0) {
sprintf(range, "%lld-", (long long)offset);
curl_easy_setopt(easy, CURLOPT_RANGE, range);
}
base.ready = false;
ready = false;
if (!input_curl_easy_add_indirect(this, error))
return false;
base.mutex.lock();
base.WaitReady();
mutex.lock();
WaitReady();
if (postponed_error.IsDefined()) {
error = std::move(postponed_error);
......@@ -1127,7 +1125,7 @@ CurlInputStream::Open(const char *url, Mutex &mutex, Cond &cond,
return nullptr;
}
return &c->base;
return c;
}
static InputStream *
......
......@@ -36,9 +36,7 @@ extern "C" {
#include <stdio.h>
class DespotifyInputStream {
InputStream base;
class DespotifyInputStream final : public InputStream {
struct despotify_session *session;
struct ds_track *track;
Tag tag;
......@@ -46,11 +44,11 @@ class DespotifyInputStream {
size_t len_available;
bool eof;
DespotifyInputStream(const char *uri,
Mutex &mutex, Cond &cond,
DespotifyInputStream(const char *_uri,
Mutex &_mutex, Cond &_cond,
despotify_session *_session,
ds_track *_track)
:base(input_plugin_despotify, uri, mutex, cond),
:InputStream(input_plugin_despotify, _uri, _mutex, _cond),
session(_session), track(_track),
tag(mpd_despotify_tag_from_track(*track)),
len_available(0), eof(false) {
......@@ -58,8 +56,8 @@ class DespotifyInputStream {
memset(&pcm, 0, sizeof(pcm));
/* Despotify outputs pcm data */
base.SetMimeType("audio/x-mpd-cdda-pcm");
base.SetReady();
SetMimeType("audio/x-mpd-cdda-pcm");
SetReady();
}
public:
......@@ -190,7 +188,7 @@ DespotifyInputStream::Open(const char *url,
return nullptr;
}
return &ctx->base;
return ctx;
}
static InputStream *
......@@ -200,16 +198,17 @@ input_despotify_open(const char *url, Mutex &mutex, Cond &cond, Error &error)
}
inline size_t
DespotifyInputStream::Read(void *ptr, size_t size, gcc_unused Error &error)
DespotifyInputStream::Read(void *ptr, size_t read_size,
gcc_unused Error &error)
{
if (len_available == 0)
FillBuffer();
size_t to_cpy = std::min(size, len_available);
size_t to_cpy = std::min(read_size, len_available);
memcpy(ptr, pcm.buf, to_cpy);
len_available -= to_cpy;
base.offset += to_cpy;
offset += to_cpy;
return to_cpy;
}
......
......@@ -33,26 +33,24 @@ extern "C" {
#include <libavformat/avformat.h>
}
struct FfmpegInputStream {
InputStream base;
struct FfmpegInputStream final : public InputStream {
AVIOContext *h;
bool eof;
FfmpegInputStream(const char *uri, Mutex &mutex, Cond &cond,
FfmpegInputStream(const char *_uri, Mutex &_mutex, Cond &_cond,
AVIOContext *_h)
:base(input_plugin_ffmpeg, uri, mutex, cond),
:InputStream(input_plugin_ffmpeg, _uri, _mutex, _cond),
h(_h), eof(false) {
base.ready = true;
base.seekable = (h->seekable & AVIO_SEEKABLE_NORMAL) != 0;
base.size = avio_size(h);
seekable = (h->seekable & AVIO_SEEKABLE_NORMAL) != 0;
size = avio_size(h);
/* hack to make MPD select the "ffmpeg" decoder plugin
- since avio.h doesn't tell us the MIME type of the
resource, we can't select a decoder plugin, but the
"ffmpeg" plugin is quite good at auto-detection */
base.SetMimeType("audio/x-mpd-ffmpeg");
SetMimeType("audio/x-mpd-ffmpeg");
SetReady();
}
~FfmpegInputStream() {
......@@ -105,8 +103,7 @@ input_ffmpeg_open(const char *uri,
return nullptr;
}
auto *i = new FfmpegInputStream(uri, mutex, cond, h);
return &i->base;
return new FfmpegInputStream(uri, mutex, cond, h);
}
static size_t
......
......@@ -33,18 +33,16 @@
static constexpr Domain file_domain("file");
struct FileInputStream {
InputStream base;
struct FileInputStream final : public InputStream {
int fd;
FileInputStream(const char *path, int _fd, off_t size,
Mutex &mutex, Cond &cond)
:base(input_plugin_file, path, mutex, cond),
FileInputStream(const char *path, int _fd, off_t _size,
Mutex &_mutex, Cond &_cond)
:InputStream(input_plugin_file, path, _mutex, _cond),
fd(_fd) {
base.size = size;
base.seekable = true;
base.SetReady();
size = _size;
seekable = true;
SetReady();
}
~FileInputStream() {
......@@ -88,9 +86,7 @@ input_file_open(const char *filename,
posix_fadvise(fd, (off_t)0, st.st_size, POSIX_FADV_SEQUENTIAL);
#endif
FileInputStream *fis = new FileInputStream(filename, fd, st.st_size,
mutex, cond);
return &fis->base;
return new FileInputStream(filename, fd, st.st_size, mutex, cond);
}
static bool
......
......@@ -33,8 +33,8 @@ class MmsInputStream final : public ThreadInputStream {
mmsx_t *mms;
public:
MmsInputStream(const char *uri, Mutex &mutex, Cond &cond)
:ThreadInputStream(input_plugin_mms, uri, mutex, cond,
MmsInputStream(const char *_uri, Mutex &_mutex, Cond &_cond)
:ThreadInputStream(input_plugin_mms, _uri, _mutex, _cond,
MMS_BUFFER_SIZE) {
}
......@@ -89,9 +89,9 @@ input_mms_open(const char *url,
}
size_t
MmsInputStream::Read(void *ptr, size_t size, Error &error)
MmsInputStream::Read(void *ptr, size_t read_size, Error &error)
{
int nbytes = mmsx_read(nullptr, mms, (char *)ptr, size);
int nbytes = mmsx_read(nullptr, mms, (char *)ptr, read_size);
if (nbytes <= 0) {
if (nbytes < 0)
error.SetErrno("mmsx_read() failed");
......
......@@ -33,22 +33,20 @@ extern "C" {
#include <sys/stat.h>
#include <fcntl.h>
class NfsInputStream {
InputStream base;
class NfsInputStream final : public InputStream {
nfs_context *ctx;
nfsfh *fh;
public:
NfsInputStream(const char *uri,
Mutex &mutex, Cond &cond,
NfsInputStream(const char *_uri,
Mutex &_mutex, Cond &_cond,
nfs_context *_ctx, nfsfh *_fh,
InputStream::offset_type size)
:base(input_plugin_nfs, uri, mutex, cond),
InputStream::offset_type _size)
:InputStream(input_plugin_nfs, _uri, _mutex, _cond),
ctx(_ctx), fh(_fh) {
base.ready = true;
base.seekable = true;
base.size = size;
seekable = true;
size = _size;
SetReady();
}
~NfsInputStream() {
......@@ -56,16 +54,12 @@ public:
nfs_destroy_context(ctx);
}
InputStream *GetBase() {
return &base;
}
bool IsEOF() const {
return base.offset >= base.size;
return offset >= size;
}
size_t Read(void *ptr, size_t size, Error &error) {
int nbytes = nfs_read(ctx, fh, size, (char *)ptr);
size_t Read(void *ptr, size_t read_size, Error &error) {
int nbytes = nfs_read(ctx, fh, read_size, (char *)ptr);
if (nbytes < 0) {
error.SetErrno(-nbytes, "nfs_read() failed");
nbytes = 0;
......@@ -74,15 +68,16 @@ public:
return nbytes;
}
bool Seek(InputStream::offset_type offset, int whence, Error &error) {
bool Seek(InputStream::offset_type new_offset, int whence, Error &error) {
uint64_t current_offset;
int result = nfs_lseek(ctx, fh, offset, whence, &current_offset);
int result = nfs_lseek(ctx, fh, new_offset, whence,
&current_offset);
if (result < 0) {
error.SetErrno(-result, "smbc_lseek() failed");
return false;
}
base.offset = current_offset;
offset = current_offset;
return true;
}
};
......@@ -150,8 +145,7 @@ input_nfs_open(const char *uri,
return nullptr;
}
auto is = new NfsInputStream(uri, mutex, cond, ctx, fh, st.st_size);
return is->GetBase();
return new NfsInputStream(uri, mutex, cond, ctx, fh, st.st_size);
}
static size_t
......
......@@ -28,9 +28,7 @@
extern const InputPlugin rewind_input_plugin;
class RewindInputStream {
InputStream base;
class RewindInputStream final : public InputStream {
InputStream *input;
/**
......@@ -56,8 +54,8 @@ class RewindInputStream {
public:
RewindInputStream(InputStream *_input)
:base(rewind_input_plugin, _input->GetURI(),
_input->mutex, _input->cond),
:InputStream(rewind_input_plugin, _input->GetURI(),
_input->mutex, _input->cond),
input(_input), tail(0) {
}
......@@ -65,10 +63,6 @@ public:
input->Close();
}
InputStream *GetBase() {
return &base;
}
bool Check(Error &error) {
return input->Check(error);
}
......@@ -100,7 +94,7 @@ private:
* buffer contain more data for the next read operation?
*/
bool ReadingFromBuffer() const {
return tail > 0 && base.offset < input->offset;
return tail > 0 && offset < input->offset;
}
/**
......@@ -110,21 +104,20 @@ private:
* attributes.
*/
void CopyAttributes() {
InputStream *dest = &base;
const InputStream *src = input;
assert(dest != src);
assert(src != this);
if (!dest->IsReady() && src->IsReady()) {
if (!IsReady() && src->IsReady()) {
if (src->HasMimeType())
dest->SetMimeType(src->GetMimeType());
SetMimeType(src->GetMimeType());
dest->size = src->GetSize();
dest->seekable = src->IsSeekable();
dest->SetReady();
size = src->GetSize();
seekable = src->IsSeekable();
SetReady();
}
dest->offset = src->offset;
offset = src->offset;
}
};
......@@ -169,31 +162,31 @@ input_rewind_available(InputStream *is)
}
inline size_t
RewindInputStream::Read(void *ptr, size_t size, Error &error)
RewindInputStream::Read(void *ptr, size_t read_size, Error &error)
{
if (ReadingFromBuffer()) {
/* buffered read */
assert(head == (size_t)base.offset);
assert(head == (size_t)offset);
assert(tail == (size_t)input->offset);
if (size > tail - head)
size = tail - head;
if (read_size > tail - head)
read_size = tail - head;
memcpy(ptr, buffer + head, size);
head += size;
base.offset += size;
memcpy(ptr, buffer + head, read_size);
head += read_size;
offset += read_size;
return size;
return read_size;
} else {
/* pass method call to underlying stream */
size_t nbytes = input->Read(ptr, size, error);
size_t nbytes = input->Read(ptr, read_size, error);
if (input->offset > (InputPlugin::offset_type)sizeof(buffer))
/* disable buffering */
tail = 0;
else if (tail == (size_t)base.offset) {
else if (tail == (size_t)offset) {
/* append to buffer */
memcpy(buffer + tail, ptr, nbytes);
......@@ -226,25 +219,25 @@ input_rewind_eof(InputStream *is)
}
inline bool
RewindInputStream::Seek(InputPlugin::offset_type offset, int whence,
RewindInputStream::Seek(InputPlugin::offset_type new_offset, int whence,
Error &error)
{
assert(base.IsReady());
assert(IsReady());
if (whence == SEEK_SET && tail > 0 &&
offset <= (InputPlugin::offset_type)tail) {
new_offset <= (InputPlugin::offset_type)tail) {
/* buffered seek */
assert(!ReadingFromBuffer() ||
head == (size_t)base.offset);
head == (size_t)offset);
assert(tail == (size_t)input->offset);
head = (size_t)offset;
base.offset = offset;
head = (size_t)new_offset;
offset = new_offset;
return true;
} else {
bool success = input->Seek(offset, whence, error);
bool success = input->Seek(new_offset, whence, error);
CopyAttributes();
/* disable the buffer, because input has left the
......@@ -290,6 +283,5 @@ input_rewind_open(InputStream *is)
/* seekable resources don't need this plugin */
return is;
RewindInputStream *c = new RewindInputStream(is);
return c->GetBase();
return new RewindInputStream(is);
}
......@@ -28,21 +28,19 @@
#include <libsmbclient.h>
class SmbclientInputStream {
InputStream base;
class SmbclientInputStream final : public InputStream {
SMBCCTX *ctx;
int fd;
public:
SmbclientInputStream(const char *uri,
Mutex &mutex, Cond &cond,
SmbclientInputStream(const char *_uri,
Mutex &_mutex, Cond &_cond,
SMBCCTX *_ctx, int _fd, const struct stat &st)
:base(input_plugin_smbclient, uri, mutex, cond),
:InputStream(input_plugin_smbclient, _uri, _mutex, _cond),
ctx(_ctx), fd(_fd) {
base.ready = true;
base.seekable = true;
base.size = st.st_size;
seekable = true;
size = st.st_size;
SetReady();
}
~SmbclientInputStream() {
......@@ -52,17 +50,13 @@ public:
smbclient_mutex.unlock();
}
InputStream *GetBase() {
return &base;
}
bool IsEOF() const {
return base.offset >= base.size;
return offset >= size;
}
size_t Read(void *ptr, size_t size, Error &error) {
size_t Read(void *ptr, size_t read_size, Error &error) {
smbclient_mutex.lock();
ssize_t nbytes = smbc_read(fd, ptr, size);
ssize_t nbytes = smbc_read(fd, ptr, read_size);
smbclient_mutex.unlock();
if (nbytes < 0) {
error.SetErrno("smbc_read() failed");
......@@ -72,16 +66,16 @@ public:
return nbytes;
}
bool Seek(InputStream::offset_type offset, int whence, Error &error) {
bool Seek(InputStream::offset_type new_offset, int whence, Error &error) {
smbclient_mutex.lock();
off_t result = smbc_lseek(fd, offset, whence);
off_t result = smbc_lseek(fd, new_offset, whence);
smbclient_mutex.unlock();
if (result < 0) {
error.SetErrno("smbc_lseek() failed");
return false;
}
base.offset = result;
offset = result;
return true;
}
};
......@@ -144,8 +138,7 @@ input_smbclient_open(const char *uri,
return nullptr;
}
auto s = new SmbclientInputStream(uri, mutex, cond, ctx, fd, st);
return s->GetBase();
return new SmbclientInputStream(uri, mutex, cond, ctx, fd, st);
}
static size_t
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment