/*
 * Copyright 2003-2021 The Music Player Daemon Project
 * http://www.musicpd.org
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include "UringInputPlugin.hxx"
#include "../AsyncInputStream.hxx"
#include "event/Call.hxx"
#include "event/Loop.hxx"
#include "system/Error.hxx"
#include "io/Open.hxx"
#include "io/UniqueFileDescriptor.hxx"
#include "io/uring/ReadOperation.hxx"
#include "io/uring/Queue.hxx"
#include "util/RuntimeError.hxx"

#include <algorithm> // for std::min()
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring> // for memcpy()
#include <memory>
#include <stdexcept>

#include <sys/stat.h>

/**
 * Read at most this number of bytes in each read request.
 */
static constexpr size_t URING_MAX_READ = 256 * 1024;

/**
 * Do not buffer more than this number of bytes.  It should be a
 * reasonable limit that doesn't make low-end machines suffer too
 * much, but doesn't cause stuttering on high-latency lines.
 */
static constexpr size_t URING_MAX_BUFFERED = 512 * 1024;

/**
 * Resume the stream at this number of bytes after it has been paused.
 */
static constexpr size_t URING_RESUME_AT = 384 * 1024;

/**
 * The #EventLoop passed to InitUringInputPlugin(); all read
 * operations of this plugin are submitted from inside it.
 */
static EventLoop *uring_input_event_loop;

/**
 * The queue obtained from EventLoop::GetUring() during
 * InitUringInputPlugin().  While this is nullptr,
 * OpenUringInputStream() declines to open anything.
 */
static Uring::Queue *uring_input_queue;

/**
 * An #AsyncInputStream implementation which reads a regular file
 * through #Uring::ReadOperation requests.  Exactly one read request
 * is in flight at any time; the next one is submitted from the
 * completion callback of the previous one.
 */
class UringInputStream final : public AsyncInputStream, Uring::ReadHandler {
	/**
	 * The queue which all read operations are submitted to.
	 */
	Uring::Queue &uring;

	UniqueFileDescriptor fd;

	/**
	 * The file offset of the next read request.
	 */
	uint64_t next_offset = 0;

	/**
	 * The read request currently in flight, or nullptr if there
	 * is none.
	 */
	std::unique_ptr<Uring::ReadOperation> read_operation;

public:
	/**
	 * Marks the stream as ready and seekable and submits the
	 * first read request from inside the event loop.
	 *
	 * @param _size the size of the file in bytes
	 */
	UringInputStream(EventLoop &event_loop, Uring::Queue &_uring,
			 const char *path,
			 UniqueFileDescriptor &&_fd,
			 offset_type _size, Mutex &_mutex)
		:AsyncInputStream(event_loop,
				  path, _mutex,
				  URING_MAX_BUFFERED,
				  URING_RESUME_AT),
		 uring(_uring),
		 fd(std::move(_fd))
	{
		size = _size;
		seekable = true;
		SetReady();

		BlockingCall(GetEventLoop(), [this](){
			SubmitRead();
		});
	}

	~UringInputStream() noexcept override {
		/* the pending read must be cancelled from inside the
		   event loop */
		BlockingCall(GetEventLoop(), [this](){
			CancelRead();
		});
	}

private:
	/**
	 * Submit a new read request at #next_offset unless the end of
	 * the file has been reached; pauses the stream if there is no
	 * room in the buffer.  There must be no pending read request.
	 */
	void SubmitRead() noexcept;

	/**
	 * Cancel the pending read request (if any).
	 */
	void CancelRead() noexcept {
		/* ownership is given up here instead of deleting the
		   object; NOTE(review): presumably a cancelled
		   operation disposes of itself once the kernel is
		   done with it - confirm against
		   Uring::ReadOperation::Cancel() */
		if (read_operation)
			read_operation.release()->Cancel();
	}

protected:
	/* virtual methods from AsyncInputStream */
	void DoResume() override;
	void DoSeek(offset_type new_offset) override;

private:
	/* virtual methods from class Uring::ReadHandler */
	void OnRead(std::unique_ptr<std::byte[]> buffer,
		    std::size_t size) noexcept override;
	void OnReadError(int error) noexcept override;
};

void
UringInputStream::SubmitRead() noexcept
{
	assert(!read_operation);

	/* nothing left to read? */
	int64_t remaining = size - next_offset;
	if (remaining <= 0)
		return;

	auto w = PrepareWriteBuffer();
	if (w.empty()) {
		/* no room in the buffer; DoResume() will submit the
		   next read */
		Pause();
		return;
	}

	read_operation = std::make_unique<Uring::ReadOperation>();
	read_operation->Start(uring, fd, next_offset,
			      std::min(w.size, URING_MAX_READ),
			      *this);
}

void
UringInputStream::DoResume()
{
	SubmitRead();
}

void
UringInputStream::DoSeek(offset_type new_offset)
{
	/* the pending read (if any) refers to the old offset */
	CancelRead();

	next_offset = offset = new_offset;
	SeekDone();
	SubmitRead();
}

void
UringInputStream::OnRead(std::unique_ptr<std::byte[]> data,
			 std::size_t nbytes) noexcept
{
	/* the operation has completed; clear it so SubmitRead() can
	   start a new one */
	read_operation.reset();

	const std::lock_guard<Mutex> protect(mutex);

	if (nbytes == 0) {
		/* SubmitRead() only issues a request while
		   next_offset is below the known file size, so an
		   empty read means the file is shorter than
		   expected */
		postponed_exception = std::make_exception_ptr(std::runtime_error("Premature end of file"));
		InvokeOnAvailable();
		return;
	}

	/* copy the data into the stream buffer and continue reading */
	auto w = PrepareWriteBuffer();
	assert(w.size >= nbytes);
	memcpy(w.data, data.get(), nbytes);
	CommitWriteBuffer(nbytes);
	next_offset += nbytes;
	SubmitRead();
}

void
UringInputStream::OnReadError(int error) noexcept
{
	read_operation.reset();

	const std::lock_guard<Mutex> protect(mutex);

	/* store the error for later delivery and notify the
	   handler */
	postponed_exception = std::make_exception_ptr(MakeErrno(error, "Read failed"));
	InvokeOnAvailable();
}

/**
 * Open the specified file as a #UringInputStream.
 *
 * Throws if fstat() fails or if the path does not refer to a regular
 * file.  NOTE(review): OpenReadOnly() presumably throws as well if
 * the file cannot be opened - confirm in io/Open.hxx.
 *
 * @return the new stream, or nullptr if no io_uring queue is
 * available (i.e. InitUringInputPlugin() has not stored one)
 */
InputStreamPtr
OpenUringInputStream(const char *path, Mutex &mutex)
{
	if (uring_input_queue == nullptr)
		return nullptr;

	// TODO: use IORING_OP_OPENAT
	auto fd = OpenReadOnly(path);

	// TODO: use IORING_OP_STATX
	struct stat st;
	if (fstat(fd.Get(), &st) < 0)
		throw FormatErrno("Failed to access %s", path);

	if (!S_ISREG(st.st_mode))
		throw FormatRuntimeError("Not a regular file: %s", path);

	return std::make_unique<UringInputStream>(*uring_input_event_loop,
						  *uring_input_queue,
						  path, std::move(fd),
						  st.st_size, mutex);
}

/**
 * Remember the given #EventLoop and obtain its io_uring queue.  The
 * EventLoop::GetUring() call is made from inside the event loop via
 * BlockingCall().
 */
void
InitUringInputPlugin(EventLoop &event_loop) noexcept
{
	uring_input_event_loop = &event_loop;

	BlockingCall(event_loop, [](){
		uring_input_queue = uring_input_event_loop->GetUring();
	});
}