Commit 25354b9d authored by Max Kellermann's avatar Max Kellermann

Merge branch 'v0.22.x'

parents ee720064 25b01940
......@@ -172,7 +172,15 @@ FileDescriptor::CreatePipe(FileDescriptor &r, FileDescriptor &w) noexcept
#endif
}
#ifndef _WIN32
#ifdef _WIN32
void
FileDescriptor::SetBinaryMode() noexcept
{
_setmode(fd, _O_BINARY);
}
#else // !_WIN32
bool
FileDescriptor::CreatePipeNonBlock(FileDescriptor &r,
......
......@@ -147,10 +147,13 @@ public:
#ifdef _WIN32
void EnableCloseOnExec() noexcept {}
void DisableCloseOnExec() noexcept {}
void SetBinaryMode() noexcept;
#else
static bool CreatePipeNonBlock(FileDescriptor &r,
FileDescriptor &w) noexcept;
void SetBinaryMode() noexcept {}
/**
* Enable non-blocking mode on this file descriptor.
*/
......
......@@ -44,7 +44,11 @@ public:
void Close() noexcept override {}
int GetVolume() override {
auto future = COMWorker::Async([&]() -> int {
auto com_worker = wasapi_output_get_com_worker(output);
if (!com_worker)
return -1;
auto future = com_worker->Async([&]() -> int {
HRESULT result;
float volume_level;
......@@ -55,9 +59,9 @@ public:
result = endpoint_volume->GetMasterVolumeLevelScalar(
&volume_level);
if (FAILED(result)) {
throw FormatHResultError(result,
"Unable to get master "
"volume level");
throw MakeHResultError(result,
"Unable to get master "
"volume level");
}
} else {
auto session_volume =
......@@ -65,7 +69,7 @@ public:
result = session_volume->GetMasterVolume(&volume_level);
if (FAILED(result)) {
throw FormatHResultError(
throw MakeHResultError(
result, "Unable to get master volume");
}
}
......@@ -76,7 +80,11 @@ public:
}
void SetVolume(unsigned volume) override {
COMWorker::Async([&]() {
auto com_worker = wasapi_output_get_com_worker(output);
if (!com_worker)
throw std::runtime_error("Cannot set WASAPI volume");
com_worker->Async([&]() {
HRESULT result;
const float volume_level = volume / 100.0f;
......@@ -87,7 +95,7 @@ public:
result = endpoint_volume->SetMasterVolumeLevelScalar(
volume_level, nullptr);
if (FAILED(result)) {
throw FormatHResultError(
throw MakeHResultError(
result,
"Unable to set master volume level");
}
......@@ -98,7 +106,7 @@ public:
result = session_volume->SetMasterVolume(volume_level,
nullptr);
if (FAILED(result)) {
throw FormatHResultError(
throw MakeHResultError(
result, "Unable to set master volume");
}
}
......
......@@ -33,8 +33,8 @@ GetBufferSizeInFrames(IAudioClient &client)
HRESULT result = client.GetBufferSize(&buffer_size_in_frames);
if (FAILED(result))
throw FormatHResultError(result,
"Unable to get audio client buffer size");
throw MakeHResultError(result,
"Unable to get audio client buffer size");
return buffer_size_in_frames;
}
......@@ -46,8 +46,8 @@ GetCurrentPaddingFrames(IAudioClient &client)
HRESULT result = client.GetCurrentPadding(&padding_frames);
if (FAILED(result))
throw FormatHResultError(result,
"Failed to get current padding");
throw MakeHResultError(result,
"Failed to get current padding");
return padding_frames;
}
......@@ -59,7 +59,7 @@ GetMixFormat(IAudioClient &client)
HRESULT result = client.GetMixFormat(&f);
if (FAILED(result))
throw FormatHResultError(result, "GetMixFormat failed");
throw MakeHResultError(result, "GetMixFormat failed");
return ComHeapPtr{f};
}
......@@ -69,7 +69,7 @@ Start(IAudioClient &client)
{
HRESULT result = client.Start();
if (FAILED(result))
throw FormatHResultError(result, "Failed to start client");
throw MakeHResultError(result, "Failed to start client");
}
inline void
......@@ -77,7 +77,7 @@ Stop(IAudioClient &client)
{
HRESULT result = client.Stop();
if (FAILED(result))
throw FormatHResultError(result, "Failed to stop client");
throw MakeHResultError(result, "Failed to stop client");
}
inline void
......@@ -85,7 +85,7 @@ SetEventHandle(IAudioClient &client, HANDLE h)
{
HRESULT result = client.SetEventHandle(h);
if (FAILED(result))
throw FormatHResultError(result, "Unable to set event handle");
throw MakeHResultError(result, "Unable to set event handle");
}
template<typename T>
......@@ -95,7 +95,7 @@ GetService(IAudioClient &client)
T *p = nullptr;
HRESULT result = client.GetService(IID_PPV_ARGS(&p));
if (FAILED(result))
throw FormatHResultError(result, "Unable to get service");
throw MakeHResultError(result, "Unable to get service");
return ComPtr{p};
}
......
......@@ -33,8 +33,8 @@ GetDefaultAudioEndpoint(IMMDeviceEnumerator &e)
HRESULT result = e.GetDefaultAudioEndpoint(eRender, eMultimedia,
&device);
if (FAILED(result))
throw FormatHResultError(result,
"Unable to get default device for multimedia");
throw MakeHResultError(result,
"Unable to get default device for multimedia");
return ComPtr{device};
}
......@@ -47,7 +47,7 @@ EnumAudioEndpoints(IMMDeviceEnumerator &e)
HRESULT result = e.EnumAudioEndpoints(eRender, DEVICE_STATE_ACTIVE,
&dc);
if (FAILED(result))
throw FormatHResultError(result, "Unable to enumerate devices");
throw MakeHResultError(result, "Unable to enumerate devices");
return ComPtr{dc};
}
......@@ -59,7 +59,7 @@ GetCount(IMMDeviceCollection &dc)
HRESULT result = dc.GetCount(&count);
if (FAILED(result))
throw FormatHResultError(result, "Collection->GetCount failed");
throw MakeHResultError(result, "Collection->GetCount failed");
return count;
}
......@@ -71,7 +71,7 @@ Item(IMMDeviceCollection &dc, UINT i)
auto result = dc.Item(i, &device);
if (FAILED(result))
throw FormatHResultError(result, "Collection->Item failed");
throw MakeHResultError(result, "Collection->Item failed");
return ComPtr{device};
}
......@@ -83,7 +83,7 @@ GetState(IMMDevice &device)
HRESULT result = device.GetState(&state);;
if (FAILED(result))
throw FormatHResultError(result, "Unable to get device status");
throw MakeHResultError(result, "Unable to get device status");
return state;
}
......@@ -96,7 +96,7 @@ Activate(IMMDevice &device)
HRESULT result = device.Activate(__uuidof(T), CLSCTX_ALL,
nullptr, (void **)&p);
if (FAILED(result))
throw FormatHResultError(result, "Unable to activate device");
throw MakeHResultError(result, "Unable to activate device");
return ComPtr{p};
}
......@@ -108,8 +108,8 @@ OpenPropertyStore(IMMDevice &device)
HRESULT result = device.OpenPropertyStore(STGM_READ, &property_store);
if (FAILED(result))
throw FormatHResultError(result,
"Device->OpenPropertyStore failed");
throw MakeHResultError(result,
"Device->OpenPropertyStore failed");
return ComPtr{property_store};
}
......
......@@ -20,10 +20,13 @@
#ifndef MPD_WASAPI_OUTPUT_FOR_MIXER_HXX
#define MPD_WASAPI_OUTPUT_FOR_MIXER_HXX
#include <memory>
struct IMMDevice;
struct IAudioClient;
class AudioOutput;
class WasapiOutput;
class COMWorker;
[[gnu::pure]]
WasapiOutput &
......@@ -34,6 +37,10 @@ bool
wasapi_is_exclusive(WasapiOutput &output) noexcept;
[[gnu::pure]]
std::shared_ptr<COMWorker>
wasapi_output_get_com_worker(WasapiOutput &output) noexcept;
[[gnu::pure]]
IMMDevice *
wasapi_output_get_device(WasapiOutput &output) noexcept;
......
......@@ -29,17 +29,9 @@
class COM {
public:
COM() {
if (HRESULT result = CoInitializeEx(nullptr, COINIT_MULTITHREADED);
if (HRESULT result = CoInitializeEx(nullptr, COINIT_APARTMENTTHREADED|COINIT_DISABLE_OLE1DDE);
FAILED(result)) {
throw FormatHResultError(
result,
"Unable to initialize COM with COINIT_MULTITHREADED");
}
}
COM(bool) {
if (HRESULT result = CoInitializeEx(nullptr, COINIT_APARTMENTTHREADED);
FAILED(result)) {
throw FormatHResultError(
throw MakeHResultError(
result,
"Unable to initialize COM with COINIT_APARTMENTTHREADED");
}
......
......@@ -85,7 +85,7 @@ public:
::CoCreateInstance(class_id, unknown_outer, class_context,
__uuidof(T), reinterpret_cast<void **>(&ptr));
if (FAILED(result)) {
throw FormatHResultError(result, "Unable to create instance");
throw MakeHResultError(result, "Unable to create instance");
}
}
......
......@@ -21,13 +21,11 @@
#include "Com.hxx"
#include "thread/Name.hxx"
Mutex COMWorker::mutex;
unsigned int COMWorker::reference_count = 0;
std::optional<COMWorker::COMWorkerThread> COMWorker::thread;
void COMWorker::COMWorkerThread::Work() noexcept {
void
COMWorker::Work() noexcept
{
SetThreadName("COM Worker");
COM com{true};
COM com;
while (true) {
if (!running_flag.test_and_set()) {
return;
......
......@@ -22,76 +22,46 @@
#include "WinEvent.hxx"
#include "thread/Future.hxx"
#include "thread/Mutex.hxx"
#include "thread/Thread.hxx"
#include <boost/lockfree/spsc_queue.hpp>
#include <mutex>
#include <optional>
#include <windows.h>
// Worker thread for all COM operation
class COMWorker {
private:
class COMWorkerThread : public Thread {
public:
COMWorkerThread() : Thread{BIND_THIS_METHOD(Work)} {}
private:
friend class COMWorker;
void Work() noexcept;
void Finish() noexcept {
running_flag.clear();
event.Set();
}
void Push(const std::function<void()> &function) {
spsc_buffer.push(function);
event.Set();
}
Thread thread{BIND_THIS_METHOD(Work)};
boost::lockfree::spsc_queue<std::function<void()>> spsc_buffer{32};
std::atomic_flag running_flag = true;
WinEvent event{};
};
boost::lockfree::spsc_queue<std::function<void()>> spsc_buffer{32};
std::atomic_flag running_flag = true;
WinEvent event{};
public:
static void Aquire() {
std::unique_lock locker(mutex);
if (reference_count == 0) {
thread.emplace();
thread->Start();
}
++reference_count;
COMWorker() {
thread.Start();
}
static void Release() noexcept {
std::unique_lock locker(mutex);
--reference_count;
if (reference_count == 0) {
thread->Finish();
thread->Join();
thread.reset();
}
~COMWorker() noexcept {
Finish();
thread.Join();
}
template <typename Function, typename... Args>
static auto Async(Function &&function, Args &&...args) {
using R = std::invoke_result_t<std::decay_t<Function>,
std::decay_t<Args>...>;
COMWorker(const COMWorker &) = delete;
COMWorker &operator=(const COMWorker &) = delete;
template<typename Function>
auto Async(Function &&function) {
using R = std::invoke_result_t<std::decay_t<Function>>;
auto promise = std::make_shared<Promise<R>>();
auto future = promise->get_future();
thread->Push([function = std::forward<Function>(function),
args = std::make_tuple(std::forward<Args>(args)...),
Push([function = std::forward<Function>(function),
promise = std::move(promise)]() mutable {
try {
if constexpr (std::is_void_v<R>) {
std::apply(std::forward<Function>(function),
std::move(args));
std::invoke(std::forward<Function>(function));
promise->set_value();
} else {
promise->set_value(std::apply(
std::forward<Function>(function),
std::move(args)));
promise->set_value(std::invoke(std::forward<Function>(function)));
}
} catch (...) {
promise->set_exception(std::current_exception());
......@@ -101,9 +71,17 @@ public:
}
private:
static Mutex mutex;
static unsigned int reference_count;
static std::optional<COMWorkerThread> thread;
void Finish() noexcept {
running_flag.clear();
event.Set();
}
void Push(const std::function<void()> &function) {
spsc_buffer.push(function);
event.Set();
}
void Work() noexcept;
};
#endif
......@@ -18,6 +18,7 @@
*/
#include "HResult.hxx"
#include "system/Error.hxx"
#include <cassert>
#include <cstdarg>
......@@ -27,11 +28,21 @@
std::string
HResultCategory::message(int Errcode) const
{
char buffer[256];
/* FormatMessage() supports some HRESULT values (depending on
the Windows version) */
if (FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS,
nullptr, Errcode, 0,
buffer, sizeof(buffer),
nullptr))
return buffer;
const auto msg = HRESULTToString(Errcode);
if (!msg.empty())
return std::string(msg);
char buffer[11]; // "0x12345678\0"
int size = snprintf(buffer, sizeof(buffer), "0x%1x", Errcode);
assert(2 <= size && size <= 10);
return std::string(buffer, size);
......
......@@ -50,6 +50,8 @@ case x:
C(AUDCLNT_E_SERVICE_NOT_RUNNING);
C(AUDCLNT_E_UNSUPPORTED_FORMAT);
C(AUDCLNT_E_WRONG_ENDPOINT_TYPE);
C(AUDCLNT_E_NOT_INITIALIZED);
C(AUDCLNT_E_NOT_STOPPED);
C(CO_E_NOTINITIALIZED);
C(E_INVALIDARG);
C(E_OUTOFMEMORY);
......@@ -74,6 +76,13 @@ static inline const std::error_category &hresult_category() noexcept {
return hresult_category_instance;
}
inline std::system_error
MakeHResultError(HRESULT result, const char *msg) noexcept
{
return std::system_error(std::error_code(result, hresult_category()),
msg);
}
gcc_printf(2, 3) std::system_error
FormatHResultError(HRESULT result, const char *fmt, ...) noexcept;
......
......@@ -29,6 +29,7 @@
#include "pcm/Convert.hxx"
#include "fs/Path.hxx"
#include "fs/NarrowPath.hxx"
#include "io/FileDescriptor.hxx"
#include "util/ConstBuffer.hxx"
#include "util/StaticFifoBuffer.hxx"
#include "util/OptionDef.hxx"
......@@ -101,26 +102,21 @@ public:
}
};
int
main(int argc, char **argv)
try {
const auto c = ParseCommandLine(argc, argv);
SetLogThreshold(c.verbose ? LogLevel::DEBUG : LogLevel::INFO);
const GlobalInit init(c.config_path);
const size_t in_frame_size = c.in_audio_format.GetFrameSize();
PcmConvert state(c.in_audio_format, c.out_audio_format);
static void
RunConvert(PcmConvert &convert, size_t in_frame_size,
FileDescriptor in_fd, FileDescriptor out_fd)
{
in_fd.SetBinaryMode();
out_fd.SetBinaryMode();
StaticFifoBuffer<uint8_t, 4096> buffer;
StaticFifoBuffer<std::byte, 4096> buffer;
while (true) {
{
const auto dest = buffer.Write();
assert(!dest.empty());
ssize_t nbytes = read(0, dest.data, dest.size);
ssize_t nbytes = in_fd.Read(dest.data, dest.size);
if (nbytes <= 0)
break;
......@@ -136,20 +132,31 @@ try {
buffer.Consume(src.size);
auto output = state.Convert({src.data, src.size});
[[maybe_unused]] ssize_t ignored = write(1, output.data,
output.size);
auto output = convert.Convert({src.data, src.size});
out_fd.FullWrite(output.data, output.size);
}
while (true) {
auto output = state.Flush();
auto output = convert.Flush();
if (output.IsNull())
break;
[[maybe_unused]] ssize_t ignored = write(1, output.data,
output.size);
out_fd.FullWrite(output.data, output.size);
}
}
int
main(int argc, char **argv)
try {
const auto c = ParseCommandLine(argc, argv);
SetLogThreshold(c.verbose ? LogLevel::DEBUG : LogLevel::INFO);
const GlobalInit init(c.config_path);
PcmConvert state(c.in_audio_format, c.out_audio_format);
RunConvert(state, c.in_audio_format.GetFrameSize(),
FileDescriptor(STDIN_FILENO),
FileDescriptor(STDOUT_FILENO));
return EXIT_SUCCESS;
} catch (...) {
......
......@@ -164,6 +164,8 @@ static int
dump_input_stream(InputStream &is, FileDescriptor out,
offset_type seek, size_t chunk_size)
{
out.SetBinaryMode();
std::unique_lock<Mutex> lock(is.mutex);
if (seek > 0)
......
......@@ -31,6 +31,7 @@
#include "util/StringBuffer.hxx"
#include "util/RuntimeError.hxx"
#include "util/ScopeExit.hxx"
#include "util/StaticFifoBuffer.hxx"
#include "util/PrintException.hxx"
#include "LogBackend.hxx"
......@@ -113,8 +114,11 @@ LoadAudioOutput(const ConfigData &config, EventLoop &event_loop,
}
static void
run_output(AudioOutput &ao, AudioFormat audio_format)
RunOutput(AudioOutput &ao, AudioFormat audio_format,
FileDescriptor in_fd)
{
in_fd.SetBinaryMode();
/* open the audio output */
ao.Enable();
......@@ -126,33 +130,40 @@ run_output(AudioOutput &ao, AudioFormat audio_format)
fprintf(stderr, "audio_format=%s\n",
ToString(audio_format).c_str());
size_t frame_size = audio_format.GetFrameSize();
const size_t in_frame_size = audio_format.GetFrameSize();
/* play */
size_t length = 0;
char buffer[4096];
StaticFifoBuffer<std::byte, 4096> buffer;
while (true) {
if (length < sizeof(buffer)) {
ssize_t nbytes = read(0, buffer + length,
sizeof(buffer) - length);
{
const auto dest = buffer.Write();
assert(!dest.empty());
ssize_t nbytes = in_fd.Read(dest.data, dest.size);
if (nbytes <= 0)
break;
length += (size_t)nbytes;
buffer.Append(nbytes);
}
size_t play_length = (length / frame_size) * frame_size;
if (play_length > 0) {
size_t consumed = ao.Play(buffer, play_length);
auto src = buffer.Read();
assert(!src.empty());
assert(consumed <= length);
assert(consumed % frame_size == 0);
src.size -= src.size % in_frame_size;
if (src.empty())
continue;
length -= consumed;
memmove(buffer, buffer + consumed, length);
}
size_t consumed = ao.Play(src.data, src.size);
assert(consumed <= src.size);
assert(consumed % in_frame_size == 0);
buffer.Consume(consumed);
}
ao.Drain();
}
int main(int argc, char **argv)
......@@ -174,7 +185,7 @@ try {
/* do it */
run_output(*ao, c.audio_format);
RunOutput(*ao, c.audio_format, FileDescriptor(STDIN_FILENO));
/* cleanup and exit */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment