Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
M
mpd
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Registry
Registry
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Иван Мажукин
mpd
Commits
a8a39b6a
Commit
a8a39b6a
authored
Feb 22, 2021
by
Max Kellermann
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
output/snapcast: queue chunks
parent
f84cb6de
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
160 additions
and
25 deletions
+160
-25
Chunk.hxx
src/output/plugins/snapcast/Chunk.hxx
+55
-0
Client.cxx
src/output/plugins/snapcast/Client.cxx
+43
-6
Client.hxx
src/output/plugins/snapcast/Client.hxx
+22
-2
Internal.hxx
src/output/plugins/snapcast/Internal.hxx
+9
-4
SnapcastOutputPlugin.cxx
src/output/plugins/snapcast/SnapcastOutputPlugin.cxx
+31
-13
No files found.
src/output/plugins/snapcast/Chunk.hxx
0 → 100644
View file @
a8a39b6a
/*
* Copyright 2003-2021 The Music Player Daemon Project
* http://www.musicpd.org
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef MPD_SNAPCAST_CHUNK_HXX
#define MPD_SNAPCAST_CHUNK_HXX
#include "util/AllocatedArray.hxx"
#include <chrono>
#include <cstddef>
#include <list>
#include <memory>
#include <queue>
/**
* A chunk of data to be transmitted to connected Snapcast clients.
*/
struct
SnapcastChunk
{
std
::
chrono
::
steady_clock
::
time_point
time
;
AllocatedArray
<
std
::
byte
>
payload
;
SnapcastChunk
(
std
::
chrono
::
steady_clock
::
time_point
_time
,
AllocatedArray
<
std
::
byte
>
&&
_payload
)
noexcept
:
time
(
_time
),
payload
(
std
::
move
(
_payload
))
{}
};
using
SnapcastChunkPtr
=
std
::
shared_ptr
<
SnapcastChunk
>
;
using
SnapcastChunkQueue
=
std
::
queue
<
SnapcastChunkPtr
,
std
::
list
<
SnapcastChunkPtr
>>
;
inline
void
ClearQueue
(
SnapcastChunkQueue
&
q
)
noexcept
{
while
(
!
q
.
empty
())
q
.
pop
();
}
#endif
src/output/plugins/snapcast/Client.cxx
View file @
a8a39b6a
...
...
@@ -58,11 +58,49 @@ SnapcastClient::LockClose() noexcept
}
void
SnapcastClient
::
Push
(
SnapcastChunkPtr
chunk
)
noexcept
{
if
(
!
active
)
return
;
chunks
.
emplace
(
std
::
move
(
chunk
));
event
.
ScheduleWrite
();
}
SnapcastChunkPtr
SnapcastClient
::
LockPopQueue
()
noexcept
{
const
std
::
lock_guard
<
Mutex
>
protect
(
output
.
mutex
);
if
(
chunks
.
empty
())
return
nullptr
;
auto
chunk
=
std
::
move
(
chunks
.
front
());
chunks
.
pop
();
return
chunk
;
}
void
SnapcastClient
::
OnSocketReady
(
unsigned
flags
)
noexcept
{
if
(
flags
&
SocketEvent
::
WRITE
)
// TODO
{}
if
(
flags
&
SocketEvent
::
WRITE
)
{
constexpr
auto
max_age
=
std
::
chrono
::
milliseconds
(
500
);
const
auto
min_time
=
GetEventLoop
().
SteadyNow
()
-
max_age
;
while
(
auto
chunk
=
LockPopQueue
())
{
if
(
chunk
->
time
<
min_time
)
/* discard old chunks */
continue
;
const
ConstBuffer
<
std
::
byte
>
payload
=
chunk
->
payload
;
if
(
!
SendWireChunk
(
payload
.
ToVoid
(),
chunk
->
time
))
{
// TODO: handle EAGAIN
LockClose
();
return
;
}
}
event
.
CancelWrite
();
}
BufferedSocket
::
OnSocketReady
(
flags
);
}
...
...
@@ -187,12 +225,11 @@ SendWireChunk(SocketDescriptor s, const PackedBE16 id,
return
SendT
(
s
,
base
)
&&
SendT
(
s
,
hdr
)
&&
Send
(
s
,
payload
);
}
void
bool
SnapcastClient
::
SendWireChunk
(
ConstBuffer
<
void
>
payload
,
std
::
chrono
::
steady_clock
::
time_point
t
)
noexcept
{
if
(
active
)
::
SendWireChunk
(
GetSocket
(),
next_id
++
,
payload
,
t
);
return
::
SendWireChunk
(
GetSocket
(),
next_id
++
,
payload
,
t
);
}
BufferedSocket
::
InputResult
...
...
src/output/plugins/snapcast/Client.hxx
View file @
a8a39b6a
...
...
@@ -20,6 +20,7 @@
#ifndef MPD_OUTPUT_SNAPCAST_CLIENT_HXX
#define MPD_OUTPUT_SNAPCAST_CLIENT_HXX
#include "Chunk.hxx"
#include "event/BufferedSocket.hxx"
#include "util/IntrusiveList.hxx"
...
...
@@ -35,6 +36,11 @@ class SnapcastClient final : BufferedSocket, public IntrusiveListHook
{
SnapcastOutput
&
output
;
/**
* A queue of #Page objects to be sent to the client.
*/
SnapcastChunkQueue
chunks
;
uint16_t
next_id
=
1
;
bool
active
=
false
;
...
...
@@ -54,10 +60,24 @@ public:
void
LockClose
()
noexcept
;
void
SendWireChunk
(
ConstBuffer
<
void
>
payload
,
std
::
chrono
::
steady_clock
::
time_point
t
)
noexcept
;
/**
* Caller must lock the mutex.
*/
void
Push
(
SnapcastChunkPtr
chunk
)
noexcept
;
/**
* Caller must lock the mutex.
*/
void
Cancel
()
noexcept
{
ClearQueue
(
chunks
);
}
private
:
SnapcastChunkPtr
LockPopQueue
()
noexcept
;
bool
SendWireChunk
(
ConstBuffer
<
void
>
payload
,
std
::
chrono
::
steady_clock
::
time_point
t
)
noexcept
;
bool
SendServerSettings
(
const
SnapcastBase
&
request
)
noexcept
;
bool
SendCodecHeader
(
const
SnapcastBase
&
request
)
noexcept
;
bool
SendTime
(
const
SnapcastBase
&
request_header
,
...
...
src/output/plugins/snapcast/Internal.hxx
View file @
a8a39b6a
...
...
@@ -20,10 +20,12 @@
#ifndef MPD_OUTPUT_SNAPCAST_INTERNAL_HXX
#define MPD_OUTPUT_SNAPCAST_INTERNAL_HXX
#include "Chunk.hxx"
#include "output/Interface.hxx"
#include "output/Timer.hxx"
#include "thread/Mutex.hxx"
#include "event/ServerSocket.hxx"
#include "event/InjectEvent.hxx"
#include "util/AllocatedArray.hxx"
#include "util/IntrusiveList.hxx"
...
...
@@ -41,6 +43,8 @@ class SnapcastOutput final : AudioOutput, ServerSocket {
*/
bool
open
;
InjectEvent
inject_event
;
/**
* The configured encoder plugin.
*/
...
...
@@ -69,10 +73,12 @@ class SnapcastOutput final : AudioOutput, ServerSocket {
*/
IntrusiveList
<
SnapcastClient
>
clients
;
SnapcastChunkQueue
chunks
;
public
:
/**
* This mutex protects the listener socket
and the clien
t
*
list
.
* This mutex protects the listener socket
, the #clients lis
t
*
and the #chunks queue
.
*/
mutable
Mutex
mutex
;
...
...
@@ -161,8 +167,7 @@ public:
bool
Pause
()
override
;
private
:
void
BroadcastWireChunk
(
ConstBuffer
<
void
>
payload
,
std
::
chrono
::
steady_clock
::
time_point
t
)
noexcept
;
void
OnInject
()
noexcept
;
/* virtual methods from class ServerSocket */
void
OnAccept
(
UniqueSocketDescriptor
fd
,
...
...
src/output/plugins/snapcast/SnapcastOutputPlugin.cxx
View file @
a8a39b6a
...
...
@@ -40,6 +40,7 @@ SnapcastOutput::SnapcastOutput(EventLoop &_loop, const ConfigBlock &block)
:
AudioOutput
(
FLAG_ENABLE_DISABLE
|
FLAG_PAUSE
|
FLAG_NEED_FULLY_DEFINED_AUDIO_FORMAT
),
ServerSocket
(
_loop
),
inject_event
(
_loop
,
BIND_THIS_METHOD
(
OnInject
)),
// TODO: support other encoder plugins?
prepared_encoder
(
encoder_init
(
wave_encoder_plugin
,
block
))
{
...
...
@@ -146,16 +147,34 @@ SnapcastOutput::Close() noexcept
delete
timer
;
BlockingCall
(
GetEventLoop
(),
[
this
](){
inject_event
.
Cancel
();
const
std
::
lock_guard
<
Mutex
>
protect
(
mutex
);
open
=
false
;
clients
.
clear_and_dispose
(
DeleteDisposer
{});
});
ClearQueue
(
chunks
);
codec_header
=
nullptr
;
delete
encoder
;
}
void
SnapcastOutput
::
OnInject
()
noexcept
{
const
std
::
lock_guard
<
Mutex
>
protect
(
mutex
);
while
(
!
chunks
.
empty
())
{
const
auto
chunk
=
std
::
move
(
chunks
.
front
());
chunks
.
pop
();
for
(
auto
&
client
:
clients
)
client
.
Push
(
chunk
);
}
}
void
SnapcastOutput
::
RemoveClient
(
SnapcastClient
&
client
)
noexcept
{
assert
(
!
clients
.
empty
());
...
...
@@ -185,17 +204,6 @@ SnapcastOutput::Delay() const noexcept
:
std
::
chrono
::
steady_clock
::
duration
::
zero
();
}
inline
void
SnapcastOutput
::
BroadcastWireChunk
(
ConstBuffer
<
void
>
payload
,
std
::
chrono
::
steady_clock
::
time_point
t
)
noexcept
{
const
std
::
lock_guard
<
Mutex
>
protect
(
mutex
);
// TODO: no blocking send(), enqueue chunks, send() in I/O thread
for
(
auto
&
client
:
clients
)
client
.
SendWireChunk
(
payload
,
t
);
}
size_t
SnapcastOutput
::
Play
(
const
void
*
chunk
,
size_t
size
)
{
...
...
@@ -233,7 +241,12 @@ SnapcastOutput::Play(const void *chunk, size_t size)
if
(
nbytes
==
0
)
break
;
BroadcastWireChunk
({
buffer
,
nbytes
},
now
);
const
std
::
lock_guard
<
Mutex
>
protect
(
mutex
);
if
(
chunks
.
empty
())
inject_event
.
Schedule
();
const
ConstBuffer
payload
{
buffer
,
nbytes
};
chunks
.
emplace
(
std
::
make_shared
<
SnapcastChunk
>
(
now
,
AllocatedArray
{
payload
}));
}
return
size
;
...
...
@@ -251,7 +264,12 @@ SnapcastOutput::Pause()
void
SnapcastOutput
::
Cancel
()
noexcept
{
// TODO
const
std
::
lock_guard
<
Mutex
>
protect
(
mutex
);
ClearQueue
(
chunks
);
for
(
auto
&
client
:
clients
)
client
.
Cancel
();
}
const
struct
AudioOutputPlugin
snapcast_output_plugin
=
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment