Commit 395191bd authored by Max Kellermann's avatar Max Kellermann

rtsp_client: use the I/O thread

Make the code portable.
parent ec7d8fb6
......@@ -294,6 +294,7 @@ src_mpd_SOURCES = \
src/client_message.c \
src/client_subscribe.h \
src/client_subscribe.c \
src/tcp_socket.c src/tcp_socket.h \
src/udp_server.c src/udp_server.h \
src/server_socket.c \
src/listen.c \
......@@ -1126,6 +1127,7 @@ test_run_output_SOURCES = test/run_output.c \
src/conf.c src/tokenizer.c src/utils.c src/string_util.c src/log.c \
src/io_thread.c src/io_thread.h \
src/udp_server.c src/udp_server.h \
src/tcp_socket.c src/tcp_socket.h \
src/audio_check.c \
src/audio_format.c \
src/audio_parser.c \
......
......@@ -22,13 +22,16 @@
*/
#include "rtsp_client.h"
#include "tcp_socket.h"
#include "glib_compat.h"
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <stdlib.h>
#include <sys/time.h>
#ifdef WIN32
#define WINVER 0x0501
......@@ -37,8 +40,6 @@
#else
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/poll.h>
#include <netdb.h>
#endif
......@@ -78,6 +79,9 @@ rtspcl_open(void)
{
struct rtspcl_data *rtspcld;
rtspcld = g_new0(struct rtspcl_data, 1);
rtspcld->mutex = g_mutex_new();
rtspcld->cond = g_cond_new();
rtspcld->received_lines = g_queue_new();
rtspcld->useragent = "RTSPClient";
return rtspcld;
}
......@@ -223,33 +227,141 @@ get_tcp_connect_by_host(int sd, const char *host, short destport,
get_tcp_connect(sd, addr, error_r);
}
static void
rtsp_client_flush_received(struct rtspcl_data *rtspcld)
{
char *line;
while ((line = g_queue_pop_head(rtspcld->received_lines)) != NULL)
g_free(line);
}
static size_t
rtsp_client_socket_data(const void *_data, size_t length, void *ctx)
{
struct rtspcl_data *rtspcld = ctx;
g_mutex_lock(rtspcld->mutex);
if (rtspcld->tcp_socket == NULL) {
g_mutex_unlock(rtspcld->mutex);
return 0;
}
const bool was_empty = g_queue_is_empty(rtspcld->received_lines);
bool added = false;
const char *data = _data, *end = data + length, *p = data, *eol;
while ((eol = memchr(p, '\n', end - p)) != NULL) {
const char *next = eol + 1;
if (rtspcld->received_lines->length < 64) {
if (eol > p && eol[-1] == '\r')
--eol;
g_queue_push_tail(rtspcld->received_lines,
g_strndup(p, eol - p));
added = true;
}
p = next;
}
if (was_empty && added)
g_cond_broadcast(rtspcld->cond);
g_mutex_unlock(rtspcld->mutex);
return p - data;
}
static void
rtsp_client_socket_error(GError *error, void *ctx)
{
struct rtspcl_data *rtspcld = ctx;
g_warning("%s", error->message);
g_error_free(error);
g_mutex_lock(rtspcld->mutex);
rtsp_client_flush_received(rtspcld);
struct tcp_socket *s = rtspcld->tcp_socket;
rtspcld->tcp_socket = NULL;
g_cond_broadcast(rtspcld->cond);
g_mutex_unlock(rtspcld->mutex);
if (s != NULL)
tcp_socket_free(s);
}
static void
rtsp_client_socket_disconnected(void *ctx)
{
struct rtspcl_data *rtspcld = ctx;
g_mutex_lock(rtspcld->mutex);
rtsp_client_flush_received(rtspcld);
struct tcp_socket *s = rtspcld->tcp_socket;
rtspcld->tcp_socket = NULL;
g_cond_broadcast(rtspcld->cond);
g_mutex_unlock(rtspcld->mutex);
if (s != NULL)
tcp_socket_free(s);
}
static const struct tcp_socket_handler rtsp_client_socket_handler = {
.data = rtsp_client_socket_data,
.error = rtsp_client_socket_error,
.disconnected = rtsp_client_socket_disconnected,
};
bool
rtspcl_connect(struct rtspcl_data *rtspcld, const char *host, short destport,
const char *sid, GError **error_r)
{
assert(rtspcld->tcp_socket == NULL);
unsigned short myport = 0;
struct sockaddr_in name;
socklen_t namelen = sizeof(name);
if ((rtspcld->fd = open_tcp_socket(NULL, &myport, error_r)) == -1)
int fd = open_tcp_socket(NULL, &myport, error_r);
if (fd < 0)
return false;
if (!get_tcp_connect_by_host(rtspcld->fd, host, destport, error_r))
if (!get_tcp_connect_by_host(fd, host, destport, error_r))
return false;
getsockname(rtspcld->fd, (struct sockaddr*)&name, &namelen);
getsockname(fd, (struct sockaddr*)&name, &namelen);
memcpy(&rtspcld->local_addr, &name.sin_addr,sizeof(struct in_addr));
sprintf(rtspcld->url, "rtsp://%s/%s", inet_ntoa(name.sin_addr), sid);
getpeername(rtspcld->fd, (struct sockaddr*)&name, &namelen);
getpeername(fd, (struct sockaddr*)&name, &namelen);
memcpy(&rtspcld->host_addr, &name.sin_addr, sizeof(struct in_addr));
rtspcld->tcp_socket = tcp_socket_new(fd, &rtsp_client_socket_handler,
rtspcld);
return true;
}
static void
rtspcl_disconnect(struct rtspcl_data *rtspcld)
{
if (rtspcld->fd > 0) close(rtspcld->fd);
rtspcld->fd = 0;
g_mutex_lock(rtspcld->mutex);
rtsp_client_flush_received(rtspcld);
g_mutex_unlock(rtspcld->mutex);
if (rtspcld->tcp_socket != NULL) {
tcp_socket_free(rtspcld->tcp_socket);
rtspcld->tcp_socket = NULL;
}
}
static void
......@@ -263,8 +375,11 @@ void
rtspcl_close(struct rtspcl_data *rtspcld)
{
rtspcl_disconnect(rtspcld);
g_queue_free(rtspcld->received_lines);
rtspcl_remove_all_exthds(rtspcld);
g_free(rtspcld->session);
g_cond_free(rtspcld->cond);
g_mutex_free(rtspcld->mutex);
g_free(rtspcld);
}
......@@ -294,40 +409,51 @@ rtspcl_add_exthds(struct rtspcl_data *rtspcld, const char *key, char *data)
* returned string in line is always null terminated, maxlen-1 is maximum string length
*/
static int
read_line(int fd, char *line, int maxlen, int timeout, int no_poll)
read_line(struct rtspcl_data *rtspcld, char *line, int maxlen,
int timeout)
{
int i, rval;
int count = 0;
struct pollfd pfds;
char ch;
*line = 0;
pfds.events = POLLIN;
pfds.fd = fd;
for (i = 0;i < maxlen; i++) {
if (no_poll || poll(&pfds, 1, timeout))
rval=read(fd,&ch,1);
else return 0;
if (rval == -1) {
if (errno == EAGAIN) return 0;
g_warning("%s:read error: %s\n", __func__, strerror(errno));
return -1;
g_mutex_lock(rtspcld->mutex);
GTimeVal end_time;
if (timeout >= 0) {
g_get_current_time(&end_time);
end_time.tv_sec += timeout / 1000;
timeout %= 1000;
end_time.tv_usec = timeout * 1000;
if (end_time.tv_usec > 1000000) {
end_time.tv_usec -= 1000000;
++end_time.tv_sec;
}
}
while (true) {
if (!g_queue_is_empty(rtspcld->received_lines)) {
/* success, copy to buffer */
char *p = g_queue_pop_head(rtspcld->received_lines);
g_mutex_unlock(rtspcld->mutex);
g_strlcpy(line, p, maxlen);
g_free(p);
return strlen(line);
}
if (rval == 0) {
g_debug("%s:disconnected on the other end\n", __func__);
if (rtspcld->tcp_socket == NULL) {
/* error */
g_mutex_unlock(rtspcld->mutex);
return -1;
}
if(ch == '\n') {
*line = 0;
return count;
if (timeout < 0) {
g_cond_wait(rtspcld->cond, rtspcld->mutex);
} else if (!g_cond_timed_wait(rtspcld->cond, rtspcld->mutex,
&end_time)) {
g_mutex_unlock(rtspcld->mutex);
return 0;
}
if (ch == '\r') continue;
*line++ = ch;
count++;
if (count >= maxlen - 1) break;
}
*line = 0;
return count;
}
/*
......@@ -346,13 +472,9 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd,
char reql[128];
const char delimiters[] = " ";
char *token, *dp;
int dsize = 0,rval;
int dsize = 0;
int timeout = 5000; // msec unit
fd_set rdfds;
int fdmax = 0;
struct timeval tout = {.tv_sec=10, .tv_usec=0};
if (!rtspcld) {
g_set_error_literal(error_r, rtsp_client_quark(), 0,
"not connected");
......@@ -393,8 +515,7 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd,
if (content_type && content)
strncat(req, content, sizeof(req));
rval = write(rtspcld->fd, req, strlen(req));
if (rval < 0) {
if (!tcp_socket_send(rtspcld->tcp_socket, req, strlen(req))) {
g_set_error(error_r, rtsp_client_quark(), errno,
"write error: %s",
g_strerror(errno));
......@@ -403,17 +524,7 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd,
if (!get_response) return true;
while (true) {
FD_ZERO(&rdfds);
FD_SET(rtspcld->fd, &rdfds);
fdmax = rtspcld->fd;
select(fdmax + 1, &rdfds, NULL, NULL, &tout);
if (FD_ISSET(rtspcld->fd, &rdfds)) {
break;
}
}
if (read_line(rtspcld->fd, line, sizeof(line), timeout, 0) <= 0) {
if (read_line(rtspcld, line, sizeof(line), timeout) <= 0) {
g_set_error_literal(error_r, rtsp_client_quark(), 0,
"request failed");
return false;
......@@ -443,7 +554,7 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd,
struct key_data *cur_kd = *kd;
struct key_data *new_kd = NULL;
while (read_line(rtspcld->fd, line, sizeof(line), timeout, 0) > 0) {
while (read_line(rtspcld, line, sizeof(line), timeout) > 0) {
timeout = 1000; // once it started, it shouldn't take a long time
if (new_kd != NULL && line[0] == ' ') {
const char *j = line;
......
......@@ -42,7 +42,13 @@ struct key_data {
};
struct rtspcl_data {
int fd;
GMutex *mutex;
GCond *cond;
GQueue *received_lines;
struct tcp_socket *tcp_socket;
char url[128];
int cseq;
struct key_data *exthds;
......
/*
* Copyright (C) 2003-2011 The Music Player Daemon Project
* http://www.musicpd.org
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "tcp_socket.h"
#include "fifo_buffer.h"
#include "io_thread.h"
#include <assert.h>
#include <string.h>
#ifdef WIN32
#define WINVER 0x0501
#include <ws2tcpip.h>
#include <winsock.h>
#else
#include <sys/socket.h>
#include <netinet/in.h>
#endif
struct tcp_socket {
const struct tcp_socket_handler *handler;
void *handler_ctx;
GMutex *mutex;
GIOChannel *channel;
GSource *in_source, *out_source;
struct fifo_buffer *input, *output;
};
static gboolean
tcp_event(GIOChannel *source, GIOCondition condition, gpointer data);
static void
tcp_socket_schedule_read(struct tcp_socket *s)
{
assert(s->input != NULL);
assert(!fifo_buffer_is_full(s->input));
if (s->in_source != NULL)
return;
s->in_source = g_io_create_watch(s->channel,
G_IO_IN|G_IO_ERR|G_IO_HUP);
g_source_set_callback(s->in_source, (GSourceFunc)tcp_event, s, NULL);
g_source_attach(s->in_source, io_thread_context());
}
static void
tcp_socket_unschedule_read(struct tcp_socket *s)
{
if (s->in_source == NULL)
return;
g_source_destroy(s->in_source);
g_source_unref(s->in_source);
s->in_source = NULL;
}
static void
tcp_socket_schedule_write(struct tcp_socket *s)
{
assert(s->output != NULL);
assert(!fifo_buffer_is_empty(s->output));
if (s->out_source != NULL)
return;
s->out_source = g_io_create_watch(s->channel, G_IO_OUT);
g_source_set_callback(s->out_source, (GSourceFunc)tcp_event, s, NULL);
g_source_attach(s->out_source, io_thread_context());
}
static void
tcp_socket_unschedule_write(struct tcp_socket *s)
{
if (s->out_source == NULL)
return;
g_source_destroy(s->out_source);
g_source_unref(s->out_source);
s->out_source = NULL;
}
/**
* Close the socket. Caller must lock the mutex.
*/
static void
tcp_socket_close(struct tcp_socket *s)
{
tcp_socket_unschedule_read(s);
tcp_socket_unschedule_write(s);
if (s->channel != NULL) {
g_io_channel_unref(s->channel);
s->channel = NULL;
}
if (s->input != NULL) {
fifo_buffer_free(s->input);
s->input = NULL;
}
if (s->output != NULL) {
fifo_buffer_free(s->output);
s->output = NULL;
}
}
static gpointer
tcp_socket_close_callback(gpointer data)
{
struct tcp_socket *s = data;
g_mutex_lock(s->mutex);
tcp_socket_close(s);
g_mutex_unlock(s->mutex);
return NULL;
}
static void
tcp_socket_close_indirect(struct tcp_socket *s)
{
io_thread_call(tcp_socket_close_callback, s);
assert(s->channel == NULL);
assert(s->in_source == NULL);
assert(s->out_source == NULL);
}
static void
tcp_handle_input(struct tcp_socket *s)
{
size_t length;
const void *p = fifo_buffer_read(s->input, &length);
if (p == NULL)
return;
g_mutex_unlock(s->mutex);
size_t consumed = s->handler->data(p, length, s->handler_ctx);
g_mutex_lock(s->mutex);
if (consumed > 0 && s->input != NULL)
fifo_buffer_consume(s->input, consumed);
}
static bool
tcp_in_event(struct tcp_socket *s)
{
assert(s != NULL);
assert(s->channel != NULL);
g_mutex_lock(s->mutex);
size_t max_length;
void *p = fifo_buffer_write(s->input, &max_length);
if (p == NULL) {
GError *error = g_error_new_literal(tcp_socket_quark(), 0,
"buffer overflow");
tcp_socket_close(s);
g_mutex_unlock(s->mutex);
s->handler->error(error, s->handler_ctx);
return false;
}
gsize bytes_read;
GError *error = NULL;
GIOStatus status = g_io_channel_read_chars(s->channel,
p, max_length,
&bytes_read, &error);
switch (status) {
case G_IO_STATUS_NORMAL:
fifo_buffer_append(s->input, bytes_read);
tcp_handle_input(s);
g_mutex_unlock(s->mutex);
return true;
case G_IO_STATUS_AGAIN:
/* try again later */
g_mutex_unlock(s->mutex);
return true;
case G_IO_STATUS_EOF:
/* peer disconnected */
tcp_socket_close(s);
g_mutex_unlock(s->mutex);
s->handler->disconnected(s->handler_ctx);
return false;
case G_IO_STATUS_ERROR:
/* I/O error */
tcp_socket_close(s);
g_mutex_unlock(s->mutex);
s->handler->error(error, s->handler_ctx);
return false;
}
/* unreachable */
assert(false);
return true;
}
static bool
tcp_out_event(struct tcp_socket *s)
{
assert(s != NULL);
assert(s->channel != NULL);
g_mutex_lock(s->mutex);
size_t length;
const void *p = fifo_buffer_read(s->output, &length);
if (p == NULL) {
/* no more data in the output buffer, remove the
output event */
tcp_socket_unschedule_write(s);
g_mutex_unlock(s->mutex);
return false;
}
gsize bytes_written;
GError *error = NULL;
GIOStatus status = g_io_channel_write_chars(s->channel, p, length,
&bytes_written, &error);
switch (status) {
case G_IO_STATUS_NORMAL:
fifo_buffer_consume(s->output, bytes_written);
g_mutex_unlock(s->mutex);
return true;
case G_IO_STATUS_AGAIN:
tcp_socket_schedule_write(s);
g_mutex_unlock(s->mutex);
return true;
case G_IO_STATUS_EOF:
/* peer disconnected */
tcp_socket_close(s);
g_mutex_unlock(s->mutex);
s->handler->disconnected(s->handler_ctx);
return false;
case G_IO_STATUS_ERROR:
/* I/O error */
tcp_socket_close(s);
g_mutex_unlock(s->mutex);
s->handler->error(error, s->handler_ctx);
return false;
}
/* unreachable */
g_mutex_unlock(s->mutex);
assert(false);
return true;
}
static gboolean
tcp_event(G_GNUC_UNUSED GIOChannel *source, GIOCondition condition,
gpointer data)
{
struct tcp_socket *s = data;
assert(source == s->channel);
switch (condition) {
case G_IO_IN:
case G_IO_PRI:
return tcp_in_event(s);
case G_IO_OUT:
return tcp_out_event(s);
case G_IO_ERR:
case G_IO_HUP:
case G_IO_NVAL:
tcp_socket_close(s);
s->handler->disconnected(s->handler_ctx);
return false;
}
/* unreachable */
assert(false);
return false;
}
struct tcp_socket *
tcp_socket_new(int fd,
const struct tcp_socket_handler *handler, void *ctx)
{
assert(fd >= 0);
assert(handler != NULL);
assert(handler->data != NULL);
assert(handler->error != NULL);
assert(handler->disconnected != NULL);
struct tcp_socket *s = g_new(struct tcp_socket, 1);
s->handler = handler;
s->handler_ctx = ctx;
s->mutex = g_mutex_new();
g_mutex_lock(s->mutex);
#ifndef G_OS_WIN32
s->channel = g_io_channel_unix_new(fd);
#else
s->channel = g_io_channel_win32_new_socket(fd);
#endif
/* GLib is responsible for closing the file descriptor */
g_io_channel_set_close_on_unref(s->channel, true);
/* NULL encoding means the stream is binary safe */
g_io_channel_set_encoding(s->channel, NULL, NULL);
/* no buffering */
g_io_channel_set_buffered(s->channel, false);
s->input = fifo_buffer_new(4096);
s->output = fifo_buffer_new(4096);
s->in_source = NULL;
s->out_source = NULL;
tcp_socket_schedule_read(s);
g_mutex_unlock(s->mutex);
return s;
}
void
tcp_socket_free(struct tcp_socket *s)
{
tcp_socket_close_indirect(s);
g_mutex_free(s->mutex);
g_free(s);
}
bool
tcp_socket_send(struct tcp_socket *s, const void *data, size_t length)
{
assert(s != NULL);
g_mutex_lock(s->mutex);
if (s->output == NULL || s->channel == NULL) {
/* already disconnected */
g_mutex_unlock(s->mutex);
return false;
}
size_t max_length;
void *p = fifo_buffer_write(s->output, &max_length);
if (p == NULL || max_length < length) {
/* buffer is full */
g_mutex_unlock(s->mutex);
return false;
}
memcpy(p, data, length);
fifo_buffer_append(s->output, length);
tcp_socket_schedule_write(s);
g_mutex_unlock(s->mutex);
return true;
}
/*
* Copyright (C) 2003-2011 The Music Player Daemon Project
* http://www.musicpd.org
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef MPD_TCP_SOCKET_H
#define MPD_TCP_SOCKET_H
#include <glib.h>
#include <stdbool.h>
#include <stddef.h>
struct sockaddr;
struct tcp_socket_handler {
/**
* New data has arrived.
*
* @return the number of bytes consumed; 0 if more data is
* needed
*/
size_t (*data)(const void *data, size_t length, void *ctx);
void (*error)(GError *error, void *ctx);
void (*disconnected)(void *ctx);
};
static inline GQuark
tcp_socket_quark(void)
{
return g_quark_from_static_string("tcp_socket");
}
G_GNUC_MALLOC
struct tcp_socket *
tcp_socket_new(int fd,
const struct tcp_socket_handler *handler, void *ctx);
void
tcp_socket_free(struct tcp_socket *s);
bool
tcp_socket_send(struct tcp_socket *s, const void *data, size_t length);
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment