Commit 9ccaa904 authored by Max Kellermann's avatar Max Kellermann

ntp_server: use the I/O thread

parent 4733c5fe
...@@ -1014,6 +1014,7 @@ test_run_ntp_server_LDADD = $(MPD_LIBS) \ ...@@ -1014,6 +1014,7 @@ test_run_ntp_server_LDADD = $(MPD_LIBS) \
$(GLIB_LIBS) $(GLIB_LIBS)
test_run_ntp_server_SOURCES = test/run_ntp_server.c \ test_run_ntp_server_SOURCES = test/run_ntp_server.c \
test/signals.c test/signals.h \ test/signals.c test/signals.h \
src/io_thread.c src/io_thread.h \
src/ntp_server.c src/ntp_server.h src/ntp_server.c src/ntp_server.h
test_run_filter_CPPFLAGS = $(AM_CPPFLAGS) test_run_filter_CPPFLAGS = $(AM_CPPFLAGS)
...@@ -1119,6 +1120,7 @@ test_run_output_LDADD = $(MPD_LIBS) \ ...@@ -1119,6 +1120,7 @@ test_run_output_LDADD = $(MPD_LIBS) \
test_run_output_SOURCES = test/run_output.c \ test_run_output_SOURCES = test/run_output.c \
test/stdbin.h \ test/stdbin.h \
src/conf.c src/tokenizer.c src/utils.c src/string_util.c src/log.c \ src/conf.c src/tokenizer.c src/utils.c src/string_util.c src/log.c \
src/io_thread.c src/io_thread.h \
src/audio_check.c \ src/audio_check.c \
src/audio_format.c \ src/audio_format.c \
src/audio_parser.c \ src/audio_parser.c \
......
...@@ -18,8 +18,10 @@ ...@@ -18,8 +18,10 @@
*/ */
#include "ntp_server.h" #include "ntp_server.h"
#include "io_thread.h"
#include <glib.h> #include <glib.h>
#include <assert.h>
#include <stddef.h> #include <stddef.h>
#include <stdint.h> #include <stdint.h>
#include <string.h> #include <string.h>
...@@ -31,7 +33,6 @@ ...@@ -31,7 +33,6 @@
#include <ws2tcpip.h> #include <ws2tcpip.h>
#include <winsock.h> #include <winsock.h>
#else #else
#include <sys/select.h>
#include <sys/socket.h> #include <sys/socket.h>
#endif #endif
...@@ -75,7 +76,7 @@ fill_time_buffer(unsigned char *buffer) ...@@ -75,7 +76,7 @@ fill_time_buffer(unsigned char *buffer)
fill_time_buffer_with_time(buffer, &current_time); fill_time_buffer_with_time(buffer, &current_time);
} }
bool static bool
ntp_server_handle(struct ntp_server *ntp) ntp_server_handle(struct ntp_server *ntp)
{ {
unsigned char buf[32]; unsigned char buf[32];
...@@ -102,25 +103,14 @@ ntp_server_handle(struct ntp_server *ntp) ...@@ -102,25 +103,14 @@ ntp_server_handle(struct ntp_server *ntp)
return num_bytes == sizeof(buf); return num_bytes == sizeof(buf);
} }
bool static gboolean
ntp_server_check(struct ntp_server *ntp, struct timeval *tout) ntp_in_event(G_GNUC_UNUSED GIOChannel *source,
G_GNUC_UNUSED GIOCondition condition,
gpointer data)
{ {
fd_set rdfds; struct ntp_server *ntp = data;
int fdmax = 0;
FD_ZERO(&rdfds); ntp_server_handle(ntp);
FD_SET(ntp->fd, &rdfds);
fdmax = ntp->fd;
if (select(fdmax + 1, &rdfds,NULL, NULL, tout) <= 0)
return false;
if (FD_ISSET(ntp->fd, &rdfds)) {
if (!ntp_server_handle(ntp)) {
g_debug("unable to send timing response\n");
return false;
}
}
return true; return true;
} }
...@@ -132,8 +122,40 @@ ntp_server_init(struct ntp_server *ntp) ...@@ -132,8 +122,40 @@ ntp_server_init(struct ntp_server *ntp)
} }
void void
ntp_server_open(struct ntp_server *ntp, int fd)
{
assert(ntp->fd < 0);
assert(fd >= 0);
ntp->fd = fd;
#ifndef G_OS_WIN32
ntp->channel = g_io_channel_unix_new(fd);
#else
ntp->channel = g_io_channel_win32_new_socket(fd);
#endif
/* NULL encoding means the stream is binary safe */
g_io_channel_set_encoding(ntp->channel, NULL, NULL);
/* no buffering */
g_io_channel_set_buffered(ntp->channel, false);
ntp->source = g_io_create_watch(ntp->channel, G_IO_IN);
g_source_set_callback(ntp->source, (GSourceFunc)ntp_in_event, ntp,
NULL);
g_source_attach(ntp->source, io_thread_context());
}
void
ntp_server_close(struct ntp_server *ntp) ntp_server_close(struct ntp_server *ntp)
{ {
if (ntp->source != NULL) {
g_source_destroy(ntp->source);
g_source_unref(ntp->source);
}
if (ntp->channel != NULL)
g_io_channel_unref(ntp->channel);
if (ntp->fd >= 0) if (ntp->fd >= 0)
close(ntp->fd); close(ntp->fd);
} }
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
#ifndef MPD_NTP_SERVER_H #ifndef MPD_NTP_SERVER_H
#define MPD_NTP_SERVER_H #define MPD_NTP_SERVER_H
#include <glib.h>
#include <stdbool.h> #include <stdbool.h>
struct timeval; struct timeval;
...@@ -27,24 +29,18 @@ struct timeval; ...@@ -27,24 +29,18 @@ struct timeval;
struct ntp_server { struct ntp_server {
unsigned short port; unsigned short port;
int fd; int fd;
GIOChannel *channel;
GSource *source;
}; };
void void
ntp_server_init(struct ntp_server *ntp); ntp_server_init(struct ntp_server *ntp);
void void
ntp_server_close(struct ntp_server *ntp); ntp_server_open(struct ntp_server *ntp, int fd);
/*
* Recv the NTP datagram from the AirTunes, send back an NTP response.
*/
bool
ntp_server_handle(struct ntp_server *ntp);
/* void
* check to see if there are any timing requests, and respond if there are any ntp_server_close(struct ntp_server *ntp);
*/
bool
ntp_server_check(struct ntp_server *ntp, struct timeval *tout);
#endif #endif
...@@ -535,15 +535,11 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd, ...@@ -535,15 +535,11 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd,
while (true) { while (true) {
FD_ZERO(&rdfds); FD_ZERO(&rdfds);
FD_SET(rtspcld->fd, &rdfds); FD_SET(rtspcld->fd, &rdfds);
FD_SET(raop_session->ntp.fd, &rdfds); fdmax = rtspcld->fd;
fdmax = raop_session->ntp.fd > rtspcld->fd ? raop_session->ntp.fd : rtspcld->fd;;
select(fdmax + 1, &rdfds, NULL, NULL, &tout); select(fdmax + 1, &rdfds, NULL, NULL, &tout);
if (FD_ISSET(rtspcld->fd, &rdfds)) { if (FD_ISSET(rtspcld->fd, &rdfds)) {
break; break;
} }
if (FD_ISSET(raop_session->ntp.fd, &rdfds)) {
ntp_server_handle(&raop_session->ntp);
}
} }
if (read_line(rtspcld->fd, line, sizeof(line), timeout, 0) <= 0) { if (read_line(rtspcld->fd, line, sizeof(line), timeout, 0) <= 0) {
...@@ -1068,20 +1064,14 @@ static bool ...@@ -1068,20 +1064,14 @@ static bool
send_audio_data(int fd, GError **error_r) send_audio_data(int fd, GError **error_r)
{ {
int i = 0; int i = 0;
struct timeval current_time, tout, rtp_time; struct timeval current_time, rtp_time;
struct raop_data *rd = raop_session->raop_list; struct raop_data *rd = raop_session->raop_list;
get_time_for_rtp(&raop_session->play_state, &rtp_time); get_time_for_rtp(&raop_session->play_state, &rtp_time);
gettimeofday(&current_time, NULL); gettimeofday(&current_time, NULL);
int diff = difference(&current_time, &rtp_time); int diff = difference(&current_time, &rtp_time);
g_usleep(-diff);
while (diff < -10000) {
tout.tv_sec = 0;
tout.tv_usec = -diff;
ntp_server_check(&raop_session->ntp, &tout);
gettimeofday(&current_time, NULL);
diff = difference(&current_time, &rtp_time);
}
gettimeofday(&raop_session->play_state.last_send, NULL); gettimeofday(&raop_session->play_state.last_send, NULL);
while (rd) { while (rd) {
if (rd->started) { if (rd->started) {
...@@ -1214,10 +1204,8 @@ raop_output_cancel(void *data) ...@@ -1214,10 +1204,8 @@ raop_output_cancel(void *data)
static bool static bool
raop_output_pause(void *data) raop_output_pause(void *data)
{ {
struct timeval tout = {.tv_sec = 0, .tv_usec = 0};
struct raop_data *rd = (struct raop_data *) data; struct raop_data *rd = (struct raop_data *) data;
ntp_server_check(&raop_session->ntp, &tout);
rd->paused = true; rd->paused = true;
return true; return true;
} }
...@@ -1284,17 +1272,18 @@ raop_output_open(void *data, struct audio_format *audio_format, GError **error_r ...@@ -1284,17 +1272,18 @@ raop_output_open(void *data, struct audio_format *audio_format, GError **error_r
if (raop_session->data_fd < 0) if (raop_session->data_fd < 0)
return false; return false;
raop_session->ntp.fd = int fd = open_udp_socket(NULL, &raop_session->ntp.port,
open_udp_socket(NULL, &raop_session->ntp.port, error_r);
error_r); if (fd < 0)
if (raop_session->ntp.fd < 0)
return false; return false;
ntp_server_open(&raop_session->ntp, fd);
raop_session->ctrl.fd = raop_session->ctrl.fd =
open_udp_socket(NULL, &raop_session->ctrl.port, open_udp_socket(NULL, &raop_session->ctrl.port,
error_r); error_r);
if (raop_session->ctrl.fd < 0) { if (raop_session->ctrl.fd < 0) {
close(raop_session->ntp.fd); ntp_server_close(&raop_session->ntp);
raop_session->ctrl.fd = -1; raop_session->ctrl.fd = -1;
g_mutex_unlock(raop_session->list_mutex); g_mutex_unlock(raop_session->list_mutex);
return false; return false;
...@@ -1324,7 +1313,6 @@ raop_output_play(void *data, const void *chunk, size_t size, ...@@ -1324,7 +1313,6 @@ raop_output_play(void *data, const void *chunk, size_t size,
{ {
//raopcl_send_sample //raopcl_send_sample
struct raop_data *rd = data; struct raop_data *rd = data;
struct timeval tout = {.tv_sec = 0, .tv_usec = 0};
size_t rval = 0, orig_size = size; size_t rval = 0, orig_size = size;
rd->paused = false; rd->paused = false;
...@@ -1335,8 +1323,6 @@ raop_output_play(void *data, const void *chunk, size_t size, ...@@ -1335,8 +1323,6 @@ raop_output_play(void *data, const void *chunk, size_t size,
g_mutex_lock(raop_session->data_mutex); g_mutex_lock(raop_session->data_mutex);
ntp_server_check(&raop_session->ntp, &tout);
if (raop_session->play_state.rtptime <= NUMSAMPLES) { if (raop_session->play_state.rtptime <= NUMSAMPLES) {
// looped over, need new reference point to calculate correct times // looped over, need new reference point to calculate correct times
raop_session->play_state.playing = false; raop_session->play_state.playing = false;
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "config.h" #include "config.h"
#include "ntp_server.h" #include "ntp_server.h"
#include "signals.h" #include "signals.h"
#include "io_thread.h"
#include <glib.h> #include <glib.h>
...@@ -39,12 +40,10 @@ ...@@ -39,12 +40,10 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#endif #endif
static bool quit = false;
void void
on_quit(void) on_quit(void)
{ {
quit = true; io_thread_quit();
} }
static int bind_host(int sd, char *hostname, unsigned long ulAddr, static int bind_host(int sd, char *hostname, unsigned long ulAddr,
...@@ -122,27 +121,25 @@ open_udp_socket(char *hostname, unsigned short *port) ...@@ -122,27 +121,25 @@ open_udp_socket(char *hostname, unsigned short *port)
int int
main(G_GNUC_UNUSED int argc, G_GNUC_UNUSED char **argv) main(G_GNUC_UNUSED int argc, G_GNUC_UNUSED char **argv)
{ {
g_thread_init(NULL);
signals_init(); signals_init();
io_thread_init();
struct ntp_server ntp; struct ntp_server ntp;
ntp_server_init(&ntp); ntp_server_init(&ntp);
ntp.fd = open_udp_socket(NULL, &ntp.port); int fd = open_udp_socket(NULL, &ntp.port);
if (ntp.fd < 0) { if (fd < 0) {
g_printerr("Failed to create UDP socket\n"); g_printerr("Failed to create UDP socket\n");
ntp_server_close(&ntp); ntp_server_close(&ntp);
return EXIT_FAILURE; return EXIT_FAILURE;
} }
while (!quit) { ntp_server_open(&ntp, fd);
struct timeval tv = {
.tv_sec = 1,
.tv_usec = 0,
};
ntp_server_check(&ntp, &tv); io_thread_run();
}
ntp_server_close(&ntp); ntp_server_close(&ntp);
io_thread_deinit();
return EXIT_SUCCESS; return EXIT_SUCCESS;
} }
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
*/ */
#include "config.h" #include "config.h"
#include "io_thread.h"
#include "output_plugin.h" #include "output_plugin.h"
#include "output_internal.h" #include "output_internal.h"
#include "output_control.h" #include "output_control.h"
...@@ -36,6 +37,7 @@ ...@@ -36,6 +37,7 @@
#include <assert.h> #include <assert.h>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include <stdlib.h>
struct playlist g_playlist; struct playlist g_playlist;
...@@ -146,6 +148,13 @@ int main(int argc, char **argv) ...@@ -146,6 +148,13 @@ int main(int argc, char **argv)
return 1; return 1;
} }
io_thread_init();
if (!io_thread_start(&error)) {
g_warning("%s", error->message);
g_error_free(error);
return EXIT_FAILURE;
}
/* initialize the audio output */ /* initialize the audio output */
if (!load_audio_output(&ao, argv[2])) if (!load_audio_output(&ao, argv[2]))
...@@ -216,6 +225,8 @@ int main(int argc, char **argv) ...@@ -216,6 +225,8 @@ int main(int argc, char **argv)
ao_plugin_finish(ao.plugin, ao.data); ao_plugin_finish(ao.plugin, ao.data);
g_mutex_free(ao.mutex); g_mutex_free(ao.mutex);
io_thread_deinit();
config_global_finish(); config_global_finish();
return 0; return 0;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment