Commit 39439b80 authored by Max Kellermann's avatar Max Kellermann

Client: rebase on the new BufferedSocket class

parent 396480cf
...@@ -22,11 +22,11 @@ ...@@ -22,11 +22,11 @@
#include "gcc.h" #include "gcc.h"
#include <stdbool.h>
#include <stddef.h> #include <stddef.h>
#include <stdarg.h> #include <stdarg.h>
struct sockaddr; struct sockaddr;
class EventLoop;
struct Partition; struct Partition;
class Client; class Client;
...@@ -34,7 +34,7 @@ void client_manager_init(void); ...@@ -34,7 +34,7 @@ void client_manager_init(void);
void client_manager_deinit(void); void client_manager_deinit(void);
void void
client_new(Partition &partition, client_new(EventLoop &loop, Partition &partition,
int fd, const struct sockaddr *sa, size_t sa_length, int uid); int fd, const struct sockaddr *sa, size_t sa_length, int uid);
/** /**
......
...@@ -19,92 +19,18 @@ ...@@ -19,92 +19,18 @@
#include "config.h" #include "config.h"
#include "ClientInternal.hxx" #include "ClientInternal.hxx"
#include "Main.hxx"
#include "event/Loop.hxx"
#include <assert.h> void
Client::OnSocketError(GError *error)
static gboolean
client_out_event(G_GNUC_UNUSED GIOChannel *source, GIOCondition condition,
gpointer data)
{ {
Client *client = (Client *)data; g_warning("error on client %d: %s", num, error->message);
g_error_free(error);
assert(!client->IsExpired());
if (condition != G_IO_OUT) {
client->SetExpired();
return false;
}
client_write_deferred(client);
if (client->IsExpired()) { SetExpired();
client->Close();
return false;
}
g_timer_start(client->last_activity);
if (client->output_buffer.IsEmpty()) {
/* done sending deferred buffers exist: schedule
read */
client->source_id = g_io_add_watch(client->channel,
GIOCondition(G_IO_IN|G_IO_ERR|G_IO_HUP),
client_in_event, client);
return false;
}
/* write more */
return true;
} }
gboolean void
client_in_event(G_GNUC_UNUSED GIOChannel *source, GIOCondition condition, Client::OnSocketClosed()
gpointer data)
{ {
Client *client = (Client *)data; SetExpired();
enum command_return ret;
assert(!client->IsExpired());
if (condition != G_IO_IN) {
client->SetExpired();
return false;
}
g_timer_start(client->last_activity);
ret = client_read(client);
switch (ret) {
case COMMAND_RETURN_OK:
case COMMAND_RETURN_IDLE:
case COMMAND_RETURN_ERROR:
break;
case COMMAND_RETURN_KILL:
client->Close();
main_loop->Break();
return false;
case COMMAND_RETURN_CLOSE:
client->Close();
return false;
}
if (client->IsExpired()) {
client->Close();
return false;
}
if (!client->output_buffer.IsEmpty()) {
/* deferred buffers exist: schedule write */
client->source_id = g_io_add_watch(client->channel,
GIOCondition(G_IO_OUT|G_IO_ERR|G_IO_HUP),
client_out_event, client);
return false;
}
/* read more */
return true;
} }
...@@ -26,18 +26,11 @@ static guint expire_source_id; ...@@ -26,18 +26,11 @@ static guint expire_source_id;
void void
Client::SetExpired() Client::SetExpired()
{ {
if (!IsExpired()) if (IsExpired())
client_schedule_expire(); return;
if (source_id != 0) { client_schedule_expire();
g_source_remove(source_id); BufferedSocket::Close();
source_id = 0;
}
if (channel != NULL) {
g_io_channel_unref(channel);
channel = nullptr;
}
} }
static void static void
......
...@@ -60,10 +60,8 @@ client_idle_add(Client *client, unsigned flags) ...@@ -60,10 +60,8 @@ client_idle_add(Client *client, unsigned flags)
client->idle_flags |= flags; client->idle_flags |= flags;
if (client->idle_waiting if (client->idle_waiting
&& (client->idle_flags & client->idle_subscriptions)) { && (client->idle_flags & client->idle_subscriptions))
client_idle_notify(client); client_idle_notify(client);
client_write_output(client);
}
} }
static void static void
......
...@@ -24,8 +24,8 @@ ...@@ -24,8 +24,8 @@
#include "Client.hxx" #include "Client.hxx"
#include "ClientMessage.hxx" #include "ClientMessage.hxx"
#include "CommandListBuilder.hxx" #include "CommandListBuilder.hxx"
#include "event/BufferedSocket.hxx"
#include "command.h" #include "command.h"
#include "util/PeakBuffer.hxx"
#include <set> #include <set>
#include <string> #include <string>
...@@ -42,20 +42,13 @@ enum { ...@@ -42,20 +42,13 @@ enum {
}; };
struct Partition; struct Partition;
class PeakBuffer;
class Client { class Client final : private BufferedSocket {
public: public:
Partition &partition; Partition &partition;
struct playlist &playlist; struct playlist &playlist;
struct player_control *player_control; struct player_control *player_control;
GIOChannel *channel;
guint source_id;
/** the buffer for reading lines from the #channel */
struct fifo_buffer *input;
unsigned permission; unsigned permission;
/** the uid of the client process, or -1 if unknown */ /** the uid of the client process, or -1 if unknown */
...@@ -70,8 +63,6 @@ public: ...@@ -70,8 +63,6 @@ public:
unsigned int num; /* client number */ unsigned int num; /* client number */
PeakBuffer output_buffer;
/** is this client waiting for an "idle" response? */ /** is this client waiting for an "idle" response? */
bool idle_waiting; bool idle_waiting;
...@@ -98,23 +89,35 @@ public: ...@@ -98,23 +89,35 @@ public:
*/ */
std::list<ClientMessage> messages; std::list<ClientMessage> messages;
Client(Partition &partition, Client(EventLoop &loop, Partition &partition,
int fd, int uid, int num); int fd, int uid, int num);
~Client(); ~Client();
bool IsConnected() const {
return BufferedSocket::IsDefined();
}
gcc_pure gcc_pure
bool IsSubscribed(const char *channel_name) const { bool IsSubscribed(const char *channel_name) const {
return subscriptions.find(channel_name) != subscriptions.end(); return subscriptions.find(channel_name) != subscriptions.end();
} }
gcc_pure gcc_pure
bool IsExpired() const { bool IsExpired() const {
return channel == nullptr; return !BufferedSocket::IsDefined();
} }
void Close(); void Close();
void SetExpired(); void SetExpired();
using BufferedSocket::Write;
private:
/* virtual methods from class BufferedSocket */
virtual InputResult OnSocketInput(const void *data,
size_t length) override;
virtual void OnSocketError(GError *error) override;
virtual void OnSocketClosed() override;
}; };
extern unsigned int client_max_connections; extern unsigned int client_max_connections;
...@@ -142,9 +145,6 @@ enum command_return ...@@ -142,9 +145,6 @@ enum command_return
client_process_line(Client *client, char *line); client_process_line(Client *client, char *line);
void void
client_write_deferred(Client *client);
void
client_write_output(Client *client); client_write_output(Client *client);
gboolean gboolean
......
...@@ -22,7 +22,6 @@ ...@@ -22,7 +22,6 @@
#include "ClientList.hxx" #include "ClientList.hxx"
#include "Partition.hxx" #include "Partition.hxx"
#include "fd_util.h" #include "fd_util.h"
#include "util/fifo_buffer.h"
extern "C" { extern "C" {
#include "resolver.h" #include "resolver.h"
} }
...@@ -47,45 +46,27 @@ extern "C" { ...@@ -47,45 +46,27 @@ extern "C" {
static const char GREETING[] = "OK MPD " PROTOCOL_VERSION "\n"; static const char GREETING[] = "OK MPD " PROTOCOL_VERSION "\n";
Client::Client(Partition &_partition, Client::Client(EventLoop &_loop, Partition &_partition,
int fd, int _uid, int _num) int _fd, int _uid, int _num)
:partition(_partition), :BufferedSocket(_fd, _loop, 16384, client_max_output_buffer_size),
partition(_partition),
playlist(partition.playlist), player_control(&partition.pc), playlist(partition.playlist), player_control(&partition.pc),
input(fifo_buffer_new(4096)),
permission(getDefaultPermissions()), permission(getDefaultPermissions()),
uid(_uid), uid(_uid),
last_activity(g_timer_new()), last_activity(g_timer_new()),
num(_num), num(_num),
output_buffer(16384, client_max_output_buffer_size),
idle_waiting(false), idle_flags(0), idle_waiting(false), idle_flags(0),
num_subscriptions(0) num_subscriptions(0)
{ {
assert(fd >= 0);
channel = g_io_channel_new_socket(fd);
/* GLib is responsible for closing the file descriptor */
g_io_channel_set_close_on_unref(channel, true);
/* NULL encoding means the stream is binary safe; the MPD
protocol is UTF-8 only, but we are doing this call anyway
to prevent GLib from messing around with the stream */
g_io_channel_set_encoding(channel, NULL, NULL);
/* we prefer to do buffering */
g_io_channel_set_buffered(channel, false);
source_id = g_io_add_watch(channel,
GIOCondition(G_IO_IN|G_IO_ERR|G_IO_HUP),
client_in_event, this);
} }
Client::~Client() Client::~Client()
{ {
g_timer_destroy(last_activity); g_timer_destroy(last_activity);
fifo_buffer_free(input);
} }
void void
client_new(Partition &partition, client_new(EventLoop &loop, Partition &partition,
int fd, const struct sockaddr *sa, size_t sa_length, int uid) int fd, const struct sockaddr *sa, size_t sa_length, int uid)
{ {
static unsigned int next_client_num; static unsigned int next_client_num;
...@@ -124,7 +105,7 @@ client_new(Partition &partition, ...@@ -124,7 +105,7 @@ client_new(Partition &partition,
return; return;
} }
Client *client = new Client(partition, fd, uid, Client *client = new Client(loop, partition, fd, uid,
next_client_num++); next_client_num++);
(void)send(fd, GREETING, sizeof(GREETING) - 1, 0); (void)send(fd, GREETING, sizeof(GREETING) - 1, 0);
......
...@@ -61,7 +61,6 @@ client_process_line(Client *client, char *line) ...@@ -61,7 +61,6 @@ client_process_line(Client *client, char *line)
/* send empty idle response and leave idle mode */ /* send empty idle response and leave idle mode */
client->idle_waiting = false; client->idle_waiting = false;
command_success(client); command_success(client);
client_write_output(client);
} }
/* do nothing if the client wasn't idling: the client /* do nothing if the client wasn't idling: the client
...@@ -97,7 +96,6 @@ client_process_line(Client *client, char *line) ...@@ -97,7 +96,6 @@ client_process_line(Client *client, char *line)
if (ret == COMMAND_RETURN_OK) if (ret == COMMAND_RETURN_OK)
command_success(client); command_success(client);
client_write_output(client);
client->cmd_list.Reset(); client->cmd_list.Reset();
} else { } else {
if (!client->cmd_list.Add(line)) { if (!client->cmd_list.Add(line)) {
...@@ -130,8 +128,6 @@ client_process_line(Client *client, char *line) ...@@ -130,8 +128,6 @@ client_process_line(Client *client, char *line)
if (ret == COMMAND_RETURN_OK) if (ret == COMMAND_RETURN_OK)
command_success(client); command_success(client);
client_write_output(client);
} }
} }
......
...@@ -19,91 +19,48 @@ ...@@ -19,91 +19,48 @@
#include "config.h" #include "config.h"
#include "ClientInternal.hxx" #include "ClientInternal.hxx"
#include "util/fifo_buffer.h" #include "Main.hxx"
#include "event/Loop.hxx"
#include <assert.h> #include <assert.h>
#include <string.h> #include <string.h>
static char * BufferedSocket::InputResult
client_read_line(Client *client) Client::OnSocketInput(const void *data, size_t length)
{ {
size_t length; g_timer_start(last_activity);
const char *p = (const char *)fifo_buffer_read(client->input, &length);
if (p == NULL)
return NULL;
const char *p = (const char *)data;
const char *newline = (const char *)memchr(p, '\n', length); const char *newline = (const char *)memchr(p, '\n', length);
if (newline == NULL) if (newline == NULL)
return NULL; return InputResult::MORE;
char *line = g_strndup(p, newline - p); char *line = g_strndup(p, newline - p);
fifo_buffer_consume(client->input, newline - p + 1); BufferedSocket::ConsumeInput(newline + 1 - p);
return g_strchomp(line); enum command_return result = client_process_line(this, line);
} g_free(line);
static enum command_return
client_input_received(Client *client, size_t bytesRead)
{
char *line;
fifo_buffer_append(client->input, bytesRead);
/* process all lines */
while ((line = client_read_line(client)) != NULL) {
enum command_return ret = client_process_line(client, line);
g_free(line);
if (ret == COMMAND_RETURN_KILL ||
ret == COMMAND_RETURN_CLOSE)
return ret;
if (client->IsExpired())
return COMMAND_RETURN_CLOSE;
}
return COMMAND_RETURN_OK;
}
enum command_return switch (result) {
client_read(Client *client) case COMMAND_RETURN_OK:
{ case COMMAND_RETURN_IDLE:
GError *error = NULL; case COMMAND_RETURN_ERROR:
GIOStatus status; break;
gsize bytes_read;
assert(client != NULL); case COMMAND_RETURN_KILL:
assert(client->channel != NULL); Close();
main_loop->Break();
return InputResult::CLOSED;
size_t max_length; case COMMAND_RETURN_CLOSE:
char *p = (char *)fifo_buffer_write(client->input, &max_length); Close();
if (p == NULL) { return InputResult::CLOSED;
g_warning("[%u] buffer overflow", client->num);
return COMMAND_RETURN_CLOSE;
} }
status = g_io_channel_read_chars(client->channel, p, max_length, if (IsExpired()) {
&bytes_read, &error); Close();
switch (status) { return InputResult::CLOSED;
case G_IO_STATUS_NORMAL:
return client_input_received(client, bytes_read);
case G_IO_STATUS_AGAIN:
/* try again later, after select() */
return COMMAND_RETURN_OK;
case G_IO_STATUS_EOF:
/* peer disconnected */
return COMMAND_RETURN_CLOSE;
case G_IO_STATUS_ERROR:
/* I/O error */
g_warning("failed to read from client %d: %s",
client->num, error->message);
g_error_free(error);
return COMMAND_RETURN_CLOSE;
} }
/* unreachable */ return InputResult::AGAIN;
return COMMAND_RETURN_CLOSE;
} }
...@@ -20,98 +20,9 @@ ...@@ -20,98 +20,9 @@
#include "config.h" #include "config.h"
#include "ClientInternal.hxx" #include "ClientInternal.hxx"
#include <assert.h>
#include <string.h> #include <string.h>
#include <stdio.h> #include <stdio.h>
static size_t
client_write_direct(Client *client, const void *data, size_t length)
{
assert(client != NULL);
assert(client->channel != NULL);
assert(data != NULL);
assert(length > 0);
gsize bytes_written;
GError *error = NULL;
GIOStatus status =
g_io_channel_write_chars(client->channel, (const gchar *)data,
length, &bytes_written, &error);
switch (status) {
case G_IO_STATUS_NORMAL:
return bytes_written;
case G_IO_STATUS_AGAIN:
return 0;
case G_IO_STATUS_EOF:
/* client has disconnected */
client->SetExpired();
return 0;
case G_IO_STATUS_ERROR:
/* I/O error */
client->SetExpired();
g_warning("failed to write to %i: %s",
client->num, error->message);
g_error_free(error);
return 0;
}
/* unreachable */
assert(false);
return 0;
}
void
client_write_deferred(Client *client)
{
assert(!client_is_expired(client));
while (true) {
size_t length;
const void *data = client->output_buffer.Read(&length);
if (data == nullptr)
break;
size_t nbytes = client_write_direct(client, data, length);
if (nbytes == 0)
return;
client->output_buffer.Consume(nbytes);
if (nbytes < length)
return;
g_timer_start(client->last_activity);
}
}
static void
client_defer_output(Client *client, const void *data, size_t length)
{
if (!client->output_buffer.Append(data, length)) {
g_warning("[%u] output buffer size is "
"larger than the max (%lu)",
client->num,
(unsigned long)client_max_output_buffer_size);
/* cause client to close */
client->SetExpired();
return;
}
}
void
client_write_output(Client *client)
{
if (client->IsExpired())
return;
client_write_deferred(client);
}
/** /**
* Write a block of data to the client. * Write a block of data to the client.
*/ */
...@@ -122,7 +33,7 @@ client_write(Client *client, const char *data, size_t length) ...@@ -122,7 +33,7 @@ client_write(Client *client, const char *data, size_t length)
if (client->IsExpired() || length == 0) if (client->IsExpired() || length == 0)
return; return;
client_defer_output(client, data, length); client->Write(data, length);
} }
void void
......
...@@ -46,7 +46,7 @@ static void ...@@ -46,7 +46,7 @@ static void
listen_callback(int fd, const struct sockaddr *address, listen_callback(int fd, const struct sockaddr *address,
size_t address_length, int uid, G_GNUC_UNUSED void *ctx) size_t address_length, int uid, G_GNUC_UNUSED void *ctx)
{ {
client_new(*global_partition, client_new(*main_loop, *global_partition,
fd, address, address_length, uid); fd, address, address_length, uid);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment