Commit cffc78ad authored by Max Kellermann's avatar Max Kellermann

InputStream: store references instead of pointers

parent dcf55c7e
...@@ -285,7 +285,7 @@ size_t decoder_read(struct decoder *decoder, ...@@ -285,7 +285,7 @@ size_t decoder_read(struct decoder *decoder,
if (input_stream_available(is)) if (input_stream_available(is))
break; break;
is->cond->wait(*is->mutex); is->cond.wait(is->mutex);
} }
nbytes = input_stream_read(is, buffer, length, &error); nbytes = input_stream_read(is, buffer, length, &error);
......
...@@ -24,14 +24,13 @@ ...@@ -24,14 +24,13 @@
void void
input_stream_signal_client(struct input_stream *is) input_stream_signal_client(struct input_stream *is)
{ {
if (is->cond != NULL) is->cond.broadcast();
is->cond->broadcast();
} }
void void
input_stream_set_ready(struct input_stream *is) input_stream_set_ready(struct input_stream *is)
{ {
const ScopeLock protect(*is->mutex); const ScopeLock protect(is->mutex);
if (!is->ready) { if (!is->ready) {
is->ready = true; is->ready = true;
......
...@@ -50,11 +50,10 @@ input_stream_open(const char *url, ...@@ -50,11 +50,10 @@ input_stream_open(const char *url,
is = plugin->open(url, mutex, cond, &error); is = plugin->open(url, mutex, cond, &error);
if (is != NULL) { if (is != NULL) {
assert(is->plugin != NULL); assert(is->plugin.close != NULL);
assert(is->plugin->close != NULL); assert(is->plugin.read != NULL);
assert(is->plugin->read != NULL); assert(is->plugin.eof != NULL);
assert(is->plugin->eof != NULL); assert(!is->seekable || is->plugin.seek != NULL);
assert(!is->seekable || is->plugin->seek != NULL);
is = input_rewind_open(is); is = input_rewind_open(is);
...@@ -73,35 +72,31 @@ bool ...@@ -73,35 +72,31 @@ bool
input_stream_check(struct input_stream *is, GError **error_r) input_stream_check(struct input_stream *is, GError **error_r)
{ {
assert(is != NULL); assert(is != NULL);
assert(is->plugin != NULL);
return is->plugin->check == NULL || return is->plugin.check == NULL ||
is->plugin->check(is, error_r); is->plugin.check(is, error_r);
} }
void void
input_stream_update(struct input_stream *is) input_stream_update(struct input_stream *is)
{ {
assert(is != NULL); assert(is != NULL);
assert(is->plugin != NULL);
if (is->plugin->update != NULL) if (is->plugin.update != NULL)
is->plugin->update(is); is->plugin.update(is);
} }
void void
input_stream_wait_ready(struct input_stream *is) input_stream_wait_ready(struct input_stream *is)
{ {
assert(is != NULL); assert(is != NULL);
assert(is->mutex != NULL);
assert(is->cond != NULL);
while (true) { while (true) {
input_stream_update(is); input_stream_update(is);
if (is->ready) if (is->ready)
break; break;
is->cond->wait(*is->mutex); is->cond.wait(is->mutex);
} }
} }
...@@ -109,10 +104,8 @@ void ...@@ -109,10 +104,8 @@ void
input_stream_lock_wait_ready(struct input_stream *is) input_stream_lock_wait_ready(struct input_stream *is)
{ {
assert(is != NULL); assert(is != NULL);
assert(is->mutex != NULL);
assert(is->cond != NULL);
const ScopeLock protect(*is->mutex); const ScopeLock protect(is->mutex);
input_stream_wait_ready(is); input_stream_wait_ready(is);
} }
...@@ -173,12 +166,11 @@ input_stream_seek(struct input_stream *is, goffset offset, int whence, ...@@ -173,12 +166,11 @@ input_stream_seek(struct input_stream *is, goffset offset, int whence,
GError **error_r) GError **error_r)
{ {
assert(is != NULL); assert(is != NULL);
assert(is->plugin != NULL);
if (is->plugin->seek == NULL) if (is->plugin.seek == NULL)
return false; return false;
return is->plugin->seek(is, offset, whence, error_r); return is->plugin.seek(is, offset, whence, error_r);
} }
bool bool
...@@ -186,16 +178,11 @@ input_stream_lock_seek(struct input_stream *is, goffset offset, int whence, ...@@ -186,16 +178,11 @@ input_stream_lock_seek(struct input_stream *is, goffset offset, int whence,
GError **error_r) GError **error_r)
{ {
assert(is != NULL); assert(is != NULL);
assert(is->plugin != NULL);
if (is->plugin->seek == NULL) if (is->plugin.seek == NULL)
return false; return false;
if (is->mutex == NULL) const ScopeLock protect(is->mutex);
/* no locking */
return input_stream_seek(is, offset, whence, error_r);
const ScopeLock protect(*is->mutex);
return input_stream_seek(is, offset, whence, error_r); return input_stream_seek(is, offset, whence, error_r);
} }
...@@ -203,10 +190,9 @@ struct tag * ...@@ -203,10 +190,9 @@ struct tag *
input_stream_tag(struct input_stream *is) input_stream_tag(struct input_stream *is)
{ {
assert(is != NULL); assert(is != NULL);
assert(is->plugin != NULL);
return is->plugin->tag != NULL return is->plugin.tag != NULL
? is->plugin->tag(is) ? is->plugin.tag(is)
: NULL; : NULL;
} }
...@@ -214,16 +200,11 @@ struct tag * ...@@ -214,16 +200,11 @@ struct tag *
input_stream_lock_tag(struct input_stream *is) input_stream_lock_tag(struct input_stream *is)
{ {
assert(is != NULL); assert(is != NULL);
assert(is->plugin != NULL);
if (is->plugin->tag == NULL) if (is->plugin.tag == NULL)
return nullptr; return nullptr;
if (is->mutex == NULL) const ScopeLock protect(is->mutex);
/* no locking */
return input_stream_tag(is);
const ScopeLock protect(*is->mutex);
return input_stream_tag(is); return input_stream_tag(is);
} }
...@@ -231,10 +212,9 @@ bool ...@@ -231,10 +212,9 @@ bool
input_stream_available(struct input_stream *is) input_stream_available(struct input_stream *is)
{ {
assert(is != NULL); assert(is != NULL);
assert(is->plugin != NULL);
return is->plugin->available != NULL return is->plugin.available != NULL
? is->plugin->available(is) ? is->plugin.available(is)
: true; : true;
} }
...@@ -245,7 +225,7 @@ input_stream_read(struct input_stream *is, void *ptr, size_t size, ...@@ -245,7 +225,7 @@ input_stream_read(struct input_stream *is, void *ptr, size_t size,
assert(ptr != NULL); assert(ptr != NULL);
assert(size > 0); assert(size > 0);
return is->plugin->read(is, ptr, size, error_r); return is->plugin.read(is, ptr, size, error_r);
} }
size_t size_t
...@@ -255,35 +235,26 @@ input_stream_lock_read(struct input_stream *is, void *ptr, size_t size, ...@@ -255,35 +235,26 @@ input_stream_lock_read(struct input_stream *is, void *ptr, size_t size,
assert(ptr != NULL); assert(ptr != NULL);
assert(size > 0); assert(size > 0);
if (is->mutex == NULL) const ScopeLock protect(is->mutex);
/* no locking */
return input_stream_read(is, ptr, size, error_r);
const ScopeLock protect(*is->mutex);
return input_stream_read(is, ptr, size, error_r); return input_stream_read(is, ptr, size, error_r);
} }
void input_stream_close(struct input_stream *is) void input_stream_close(struct input_stream *is)
{ {
is->plugin->close(is); is->plugin.close(is);
} }
bool input_stream_eof(struct input_stream *is) bool input_stream_eof(struct input_stream *is)
{ {
return is->plugin->eof(is); return is->plugin.eof(is);
} }
bool bool
input_stream_lock_eof(struct input_stream *is) input_stream_lock_eof(struct input_stream *is)
{ {
assert(is != NULL); assert(is != NULL);
assert(is->plugin != NULL);
if (is->mutex == NULL)
/* no locking */
return input_stream_eof(is);
const ScopeLock protect(*is->mutex); const ScopeLock protect(is->mutex);
return input_stream_eof(is); return input_stream_eof(is);
} }
...@@ -34,7 +34,7 @@ struct input_stream { ...@@ -34,7 +34,7 @@ struct input_stream {
/** /**
* the plugin which implements this input stream * the plugin which implements this input stream
*/ */
const struct input_plugin *plugin; const struct input_plugin &plugin;
/** /**
* The absolute URI which was used to open this stream. May * The absolute URI which was used to open this stream. May
...@@ -50,7 +50,7 @@ struct input_stream { ...@@ -50,7 +50,7 @@ struct input_stream {
* This object is allocated by the client, and the client is * This object is allocated by the client, and the client is
* responsible for freeing it. * responsible for freeing it.
*/ */
Mutex *mutex; Mutex &mutex;
/** /**
* A cond that gets signalled when the state of this object * A cond that gets signalled when the state of this object
...@@ -60,7 +60,7 @@ struct input_stream { ...@@ -60,7 +60,7 @@ struct input_stream {
* This object is allocated by the client, and the client is * This object is allocated by the client, and the client is
* responsible for freeing it. * responsible for freeing it.
*/ */
Cond *cond; Cond &cond;
/** /**
* indicates whether the stream is ready for reading and * indicates whether the stream is ready for reading and
...@@ -90,8 +90,8 @@ struct input_stream { ...@@ -90,8 +90,8 @@ struct input_stream {
input_stream(const input_plugin &_plugin, input_stream(const input_plugin &_plugin,
const char *_uri, Mutex &_mutex, Cond &_cond) const char *_uri, Mutex &_mutex, Cond &_cond)
:plugin(&_plugin), uri(g_strdup(_uri)), :plugin(_plugin), uri(g_strdup(_uri)),
mutex(&_mutex), cond(&_cond), mutex(_mutex), cond(_cond),
ready(false), seekable(false), ready(false), seekable(false),
size(-1), offset(0), size(-1), offset(0),
mime(nullptr) { mime(nullptr) {
...@@ -108,14 +108,14 @@ gcc_nonnull(1) ...@@ -108,14 +108,14 @@ gcc_nonnull(1)
static inline void static inline void
input_stream_lock(struct input_stream *is) input_stream_lock(struct input_stream *is)
{ {
is->mutex->lock(); is->mutex.lock();
} }
gcc_nonnull(1) gcc_nonnull(1)
static inline void static inline void
input_stream_unlock(struct input_stream *is) input_stream_unlock(struct input_stream *is)
{ {
is->mutex->unlock(); is->mutex.unlock();
} }
#endif #endif
...@@ -517,7 +517,7 @@ wavpack_streamdecode(struct decoder * decoder, struct input_stream *is) ...@@ -517,7 +517,7 @@ wavpack_streamdecode(struct decoder * decoder, struct input_stream *is)
struct wavpack_input isp, isp_wvc; struct wavpack_input isp, isp_wvc;
bool can_seek = is->seekable; bool can_seek = is->seekable;
is_wvc = wavpack_open_wvc(decoder, is->uri, *is->mutex, *is->cond, is_wvc = wavpack_open_wvc(decoder, is->uri, is->mutex, is->cond,
&isp_wvc); &isp_wvc);
if (is_wvc != NULL) { if (is_wvc != NULL) {
open_flags |= OPEN_WVC; open_flags |= OPEN_WVC;
......
...@@ -462,12 +462,12 @@ input_curl_abort_all_requests(GError *error) ...@@ -462,12 +462,12 @@ input_curl_abort_all_requests(GError *error)
input_curl_easy_free(c); input_curl_easy_free(c);
const ScopeLock protect(*c->base.mutex); const ScopeLock protect(c->base.mutex);
c->postponed_error = g_error_copy(error); c->postponed_error = g_error_copy(error);
c->base.ready = true; c->base.ready = true;
c->base.cond->broadcast(); c->base.cond.broadcast();
} }
g_error_free(error); g_error_free(error);
...@@ -487,7 +487,7 @@ input_curl_request_done(struct input_curl *c, CURLcode result, long status) ...@@ -487,7 +487,7 @@ input_curl_request_done(struct input_curl *c, CURLcode result, long status)
assert(c->easy == NULL); assert(c->easy == NULL);
assert(c->postponed_error == NULL); assert(c->postponed_error == NULL);
const ScopeLock protect(*c->base.mutex); const ScopeLock protect(c->base.mutex);
if (result != CURLE_OK) { if (result != CURLE_OK) {
c->postponed_error = g_error_new(curl_quark(), result, c->postponed_error = g_error_new(curl_quark(), result,
...@@ -501,7 +501,7 @@ input_curl_request_done(struct input_curl *c, CURLcode result, long status) ...@@ -501,7 +501,7 @@ input_curl_request_done(struct input_curl *c, CURLcode result, long status)
c->base.ready = true; c->base.ready = true;
c->base.cond->broadcast(); c->base.cond.broadcast();
} }
static void static void
...@@ -735,7 +735,7 @@ static bool ...@@ -735,7 +735,7 @@ static bool
fill_buffer(struct input_curl *c, GError **error_r) fill_buffer(struct input_curl *c, GError **error_r)
{ {
while (c->easy != NULL && c->buffers.empty()) while (c->easy != NULL && c->buffers.empty())
c->base.cond->wait(*c->base.mutex); c->base.cond.wait(c->base.mutex);
if (c->postponed_error != NULL) { if (c->postponed_error != NULL) {
g_propagate_error(error_r, c->postponed_error); g_propagate_error(error_r, c->postponed_error);
...@@ -855,9 +855,9 @@ input_curl_read(struct input_stream *is, void *ptr, size_t size, ...@@ -855,9 +855,9 @@ input_curl_read(struct input_stream *is, void *ptr, size_t size,
is->offset += (goffset)nbytes; is->offset += (goffset)nbytes;
if (c->paused && curl_total_buffer_size(c) < CURL_RESUME_AT) { if (c->paused && curl_total_buffer_size(c) < CURL_RESUME_AT) {
c->base.mutex->unlock(); c->base.mutex.unlock();
io_thread_call(input_curl_resume, c); io_thread_call(input_curl_resume, c);
c->base.mutex->lock(); c->base.mutex.lock();
} }
return nbytes; return nbytes;
...@@ -974,7 +974,7 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream) ...@@ -974,7 +974,7 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream)
if (size == 0) if (size == 0)
return 0; return 0;
const ScopeLock protect(*c->base.mutex); const ScopeLock protect(c->base.mutex);
if (curl_total_buffer_size(c) + size >= CURL_MAX_BUFFERED) { if (curl_total_buffer_size(c) + size >= CURL_MAX_BUFFERED) {
c->paused = true; c->paused = true;
...@@ -984,7 +984,7 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream) ...@@ -984,7 +984,7 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream)
c->buffers.emplace_back(ptr, size); c->buffers.emplace_back(ptr, size);
c->base.ready = true; c->base.ready = true;
c->base.cond->broadcast(); c->base.cond.broadcast();
return size; return size;
} }
...@@ -1108,7 +1108,7 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, ...@@ -1108,7 +1108,7 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence,
/* close the old connection and open a new one */ /* close the old connection and open a new one */
c->base.mutex->unlock(); c->base.mutex.unlock();
input_curl_easy_free_indirect(c); input_curl_easy_free_indirect(c);
c->buffers.clear(); c->buffers.clear();
...@@ -1137,10 +1137,10 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, ...@@ -1137,10 +1137,10 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence,
if (!input_curl_easy_add_indirect(c, error_r)) if (!input_curl_easy_add_indirect(c, error_r))
return false; return false;
c->base.mutex->lock(); c->base.mutex.lock();
while (!c->base.ready) while (!c->base.ready)
c->base.cond->wait(*c->base.mutex); c->base.cond.wait(c->base.mutex);
if (c->postponed_error != NULL) { if (c->postponed_error != NULL) {
g_propagate_error(error_r, c->postponed_error); g_propagate_error(error_r, c->postponed_error);
......
...@@ -62,7 +62,7 @@ struct RewindInputStream { ...@@ -62,7 +62,7 @@ struct RewindInputStream {
RewindInputStream(input_stream *_input) RewindInputStream(input_stream *_input)
:base(rewind_input_plugin, _input->uri, :base(rewind_input_plugin, _input->uri,
*_input->mutex, *_input->cond), _input->mutex, _input->cond),
input(_input), tail(0) { input(_input), tail(0) {
} }
......
...@@ -174,7 +174,7 @@ input_soup_session_callback(G_GNUC_UNUSED SoupSession *session, ...@@ -174,7 +174,7 @@ input_soup_session_callback(G_GNUC_UNUSED SoupSession *session,
assert(msg == s->msg); assert(msg == s->msg);
assert(!s->completed); assert(!s->completed);
const ScopeLock protect(*s->base.mutex); const ScopeLock protect(s->base.mutex);
if (!s->base.ready) if (!s->base.ready)
s->CopyError(msg); s->CopyError(msg);
...@@ -183,7 +183,7 @@ input_soup_session_callback(G_GNUC_UNUSED SoupSession *session, ...@@ -183,7 +183,7 @@ input_soup_session_callback(G_GNUC_UNUSED SoupSession *session,
s->alive = false; s->alive = false;
s->completed = true; s->completed = true;
s->base.cond->broadcast(); s->base.cond.broadcast();
} }
static void static void
...@@ -191,10 +191,10 @@ input_soup_got_headers(SoupMessage *msg, gpointer user_data) ...@@ -191,10 +191,10 @@ input_soup_got_headers(SoupMessage *msg, gpointer user_data)
{ {
SoupInputStream *s = (SoupInputStream *)user_data; SoupInputStream *s = (SoupInputStream *)user_data;
s->base.mutex->lock(); s->base.mutex.lock();
if (!s->CopyError(msg)) { if (!s->CopyError(msg)) {
s->base.mutex->unlock(); s->base.mutex.unlock();
soup_session_cancel_message(soup_session, msg, soup_session_cancel_message(soup_session, msg,
SOUP_STATUS_CANCELLED); SOUP_STATUS_CANCELLED);
...@@ -202,8 +202,8 @@ input_soup_got_headers(SoupMessage *msg, gpointer user_data) ...@@ -202,8 +202,8 @@ input_soup_got_headers(SoupMessage *msg, gpointer user_data)
} }
s->base.ready = true; s->base.ready = true;
s->base.cond->broadcast(); s->base.cond.broadcast();
s->base.mutex->unlock(); s->base.mutex.unlock();
soup_message_body_set_accumulate(msg->response_body, false); soup_message_body_set_accumulate(msg->response_body, false);
} }
...@@ -215,7 +215,7 @@ input_soup_got_chunk(SoupMessage *msg, SoupBuffer *chunk, gpointer user_data) ...@@ -215,7 +215,7 @@ input_soup_got_chunk(SoupMessage *msg, SoupBuffer *chunk, gpointer user_data)
assert(msg == s->msg); assert(msg == s->msg);
const ScopeLock protect(*s->base.mutex); const ScopeLock protect(s->base.mutex);
g_queue_push_tail(s->buffers, soup_buffer_copy(chunk)); g_queue_push_tail(s->buffers, soup_buffer_copy(chunk));
s->total_buffered += chunk->length; s->total_buffered += chunk->length;
...@@ -225,8 +225,8 @@ input_soup_got_chunk(SoupMessage *msg, SoupBuffer *chunk, gpointer user_data) ...@@ -225,8 +225,8 @@ input_soup_got_chunk(SoupMessage *msg, SoupBuffer *chunk, gpointer user_data)
soup_session_pause_message(soup_session, msg); soup_session_pause_message(soup_session, msg);
} }
s->base.cond->broadcast(); s->base.cond.broadcast();
s->base.mutex->unlock(); s->base.mutex.unlock();
} }
static void static void
...@@ -236,14 +236,14 @@ input_soup_got_body(G_GNUC_UNUSED SoupMessage *msg, gpointer user_data) ...@@ -236,14 +236,14 @@ input_soup_got_body(G_GNUC_UNUSED SoupMessage *msg, gpointer user_data)
assert(msg == s->msg); assert(msg == s->msg);
const ScopeLock protect(*s->base.mutex); const ScopeLock protect(s->base.mutex);
s->base.ready = true; s->base.ready = true;
s->eof = true; s->eof = true;
s->alive = false; s->alive = false;
s->base.cond->broadcast(); s->base.cond.broadcast();
s->base.mutex->unlock(); s->base.mutex.unlock();
} }
inline bool inline bool
...@@ -261,7 +261,7 @@ SoupInputStream::WaitData() ...@@ -261,7 +261,7 @@ SoupInputStream::WaitData()
assert(current_consumed == 0); assert(current_consumed == 0);
base.cond->wait(*base.mutex); base.cond.wait(base.mutex);
} }
} }
...@@ -339,22 +339,22 @@ input_soup_cancel(gpointer data) ...@@ -339,22 +339,22 @@ input_soup_cancel(gpointer data)
SoupInputStream::~SoupInputStream() SoupInputStream::~SoupInputStream()
{ {
base.mutex->lock(); base.mutex.lock();
if (!completed) { if (!completed) {
/* the messages's session callback hasn't been invoked /* the messages's session callback hasn't been invoked
yet; cancel it and wait for completion */ yet; cancel it and wait for completion */
base.mutex->unlock(); base.mutex.unlock();
io_thread_call(input_soup_cancel, this); io_thread_call(input_soup_cancel, this);
base.mutex->lock(); base.mutex.lock();
while (!completed) while (!completed)
base.cond->wait(*base.mutex); base.cond.wait(base.mutex);
} }
base.mutex->unlock(); base.mutex.unlock();
SoupBuffer *buffer; SoupBuffer *buffer;
while ((buffer = (SoupBuffer *)g_queue_pop_head(buffers)) != NULL) while ((buffer = (SoupBuffer *)g_queue_pop_head(buffers)) != NULL)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment