Commit 3a2ec50d authored by Max Kellermann's avatar Max Kellermann

input/curl: move all libCURL calls to the I/O thread

This adds some overheads for indirect calls to the I/O thread, but reduces the amount of global locks. Next step will be switching to per-request locks.
parent ce9aeed4
...@@ -178,11 +178,13 @@ curl_quark(void) ...@@ -178,11 +178,13 @@ curl_quark(void)
/** /**
* Find a request by its CURL "easy" handle. * Find a request by its CURL "easy" handle.
* *
* The caller must lock the mutex. * Runs in the I/O thread. No lock needed.
*/ */
static struct input_curl * static struct input_curl *
input_curl_find_request(CURL *easy) input_curl_find_request(CURL *easy)
{ {
assert(io_thread_inside());
for (GSList *i = curl.requests; i != NULL; i = g_slist_next(i)) { for (GSList *i = curl.requests; i != NULL; i = g_slist_next(i)) {
struct input_curl *c = i->data; struct input_curl *c = i->data;
if (c->easy == easy) if (c->easy == easy)
...@@ -197,6 +199,8 @@ input_curl_find_request(CURL *easy) ...@@ -197,6 +199,8 @@ input_curl_find_request(CURL *easy)
static gpointer static gpointer
input_curl_resume(gpointer data) input_curl_resume(gpointer data)
{ {
assert(io_thread_inside());
struct input_curl *c = data; struct input_curl *c = data;
if (c->paused) { if (c->paused) {
...@@ -240,7 +244,7 @@ input_curl_fd_events(int fd, fd_set *rfds, fd_set *wfds, fd_set *efds) ...@@ -240,7 +244,7 @@ input_curl_fd_events(int fd, fd_set *rfds, fd_set *wfds, fd_set *efds)
* Updates all registered GPollFD objects, unregisters old ones, * Updates all registered GPollFD objects, unregisters old ones,
* registers new ones. * registers new ones.
* *
* The caller must lock the mutex. Runs in the I/O thread. * Runs in the I/O thread. No lock needed.
*/ */
static void static void
curl_update_fds(void) curl_update_fds(void)
...@@ -308,14 +312,11 @@ static gboolean ...@@ -308,14 +312,11 @@ static gboolean
input_curl_dirty_callback(G_GNUC_UNUSED gpointer data) input_curl_dirty_callback(G_GNUC_UNUSED gpointer data)
{ {
assert(io_thread_inside()); assert(io_thread_inside());
g_static_mutex_lock(&curl.mutex);
assert(curl.dirty_source_id != 0 || curl.requests == NULL); assert(curl.dirty_source_id != 0 || curl.requests == NULL);
curl.dirty_source_id = 0; curl.dirty_source_id = 0;
curl_update_fds(); curl_update_fds();
g_static_mutex_unlock(&curl.mutex);
return false; return false;
} }
...@@ -323,7 +324,7 @@ input_curl_dirty_callback(G_GNUC_UNUSED gpointer data) ...@@ -323,7 +324,7 @@ input_curl_dirty_callback(G_GNUC_UNUSED gpointer data)
* Schedule a refresh of curl.fds. Does nothing if that is already * Schedule a refresh of curl.fds. Does nothing if that is already
* scheduled. * scheduled.
* *
* The caller must lock the mutex. * No lock needed.
*/ */
static void static void
input_curl_schedule_update(void) input_curl_schedule_update(void)
...@@ -336,9 +337,13 @@ input_curl_schedule_update(void) ...@@ -336,9 +337,13 @@ input_curl_schedule_update(void)
io_thread_idle_add(input_curl_dirty_callback, NULL); io_thread_idle_add(input_curl_dirty_callback, NULL);
} }
/**
* Runs in the I/O thread. No lock needed.
*/
static bool static bool
input_curl_easy_add(struct input_curl *c, GError **error_r) input_curl_easy_add(struct input_curl *c, GError **error_r)
{ {
assert(io_thread_inside());
assert(c != NULL); assert(c != NULL);
assert(c->easy != NULL); assert(c->easy != NULL);
assert(input_curl_find_request(c->easy) == NULL); assert(input_curl_find_request(c->easy) == NULL);
...@@ -358,15 +363,50 @@ input_curl_easy_add(struct input_curl *c, GError **error_r) ...@@ -358,15 +363,50 @@ input_curl_easy_add(struct input_curl *c, GError **error_r)
return true; return true;
} }
struct easy_add_params {
struct input_curl *c;
GError **error_r;
};
static gpointer
input_curl_easy_add_callback(gpointer data)
{
const struct easy_add_params *params = data;
bool success = input_curl_easy_add(params->c, params->error_r);
return GUINT_TO_POINTER(success);
}
/**
* Call input_curl_easy_add() in the I/O thread. May be called from
* any thread. Caller must not hold a mutex.
*/
static bool
input_curl_easy_add_indirect(struct input_curl *c, GError **error_r)
{
assert(c != NULL);
assert(c->easy != NULL);
struct easy_add_params params = {
.c = c,
.error_r = error_r,
};
gpointer result =
io_thread_call(input_curl_easy_add_callback, &params);
return GPOINTER_TO_UINT(result);
}
/** /**
* Frees the current "libcurl easy" handle, and everything associated * Frees the current "libcurl easy" handle, and everything associated
* with it. * with it.
* *
* The caller must lock the mutex. * Runs in the I/O thread.
*/ */
static void static void
input_curl_easy_free(struct input_curl *c) input_curl_easy_free(struct input_curl *c)
{ {
assert(io_thread_inside());
assert(c != NULL); assert(c != NULL);
if (c->easy == NULL) if (c->easy == NULL)
...@@ -390,13 +430,9 @@ input_curl_easy_free_callback(gpointer data) ...@@ -390,13 +430,9 @@ input_curl_easy_free_callback(gpointer data)
{ {
struct input_curl *c = data; struct input_curl *c = data;
g_static_mutex_lock(&curl.mutex);
input_curl_easy_free(c); input_curl_easy_free(c);
curl_update_fds(); curl_update_fds();
g_static_mutex_unlock(&curl.mutex);
return NULL; return NULL;
} }
...@@ -416,7 +452,7 @@ input_curl_easy_free_indirect(struct input_curl *c) ...@@ -416,7 +452,7 @@ input_curl_easy_free_indirect(struct input_curl *c)
/** /**
* Abort and free all HTTP requests. * Abort and free all HTTP requests.
* *
* The caller must lock the mutex. Runs in the I/O thread. * Runs in the I/O thread. The caller must not hold locks.
*/ */
static void static void
input_curl_abort_all_requests(GError *error) input_curl_abort_all_requests(GError *error)
...@@ -424,6 +460,8 @@ input_curl_abort_all_requests(GError *error) ...@@ -424,6 +460,8 @@ input_curl_abort_all_requests(GError *error)
assert(io_thread_inside()); assert(io_thread_inside());
assert(error != NULL); assert(error != NULL);
g_static_mutex_lock(&curl.mutex);
while (curl.requests != NULL) { while (curl.requests != NULL) {
struct input_curl *c = curl.requests->data; struct input_curl *c = curl.requests->data;
assert(c->postponed_error == NULL); assert(c->postponed_error == NULL);
...@@ -436,12 +474,13 @@ input_curl_abort_all_requests(GError *error) ...@@ -436,12 +474,13 @@ input_curl_abort_all_requests(GError *error)
g_error_free(error); g_error_free(error);
g_cond_broadcast(curl.cond); g_cond_broadcast(curl.cond);
g_static_mutex_unlock(&curl.mutex);
} }
/** /**
* A HTTP request is finished. * A HTTP request is finished.
* *
* The caller must lock the mutex. Runs in the I/O thread. * Runs in the I/O thread. The caller must not hold locks.
*/ */
static void static void
input_curl_request_done(struct input_curl *c, CURLcode result, long status) input_curl_request_done(struct input_curl *c, CURLcode result, long status)
...@@ -451,6 +490,8 @@ input_curl_request_done(struct input_curl *c, CURLcode result, long status) ...@@ -451,6 +490,8 @@ input_curl_request_done(struct input_curl *c, CURLcode result, long status)
assert(c->easy == NULL); assert(c->easy == NULL);
assert(c->postponed_error == NULL); assert(c->postponed_error == NULL);
g_static_mutex_lock(&curl.mutex);
if (result != CURLE_OK) { if (result != CURLE_OK) {
c->postponed_error = g_error_new(curl_quark(), result, c->postponed_error = g_error_new(curl_quark(), result,
"curl failed: %s", "curl failed: %s",
...@@ -463,6 +504,7 @@ input_curl_request_done(struct input_curl *c, CURLcode result, long status) ...@@ -463,6 +504,7 @@ input_curl_request_done(struct input_curl *c, CURLcode result, long status)
c->base.ready = true; c->base.ready = true;
g_cond_broadcast(curl.cond); g_cond_broadcast(curl.cond);
g_static_mutex_unlock(&curl.mutex);
} }
static void static void
...@@ -481,7 +523,7 @@ input_curl_handle_done(CURL *easy_handle, CURLcode result) ...@@ -481,7 +523,7 @@ input_curl_handle_done(CURL *easy_handle, CURLcode result)
/** /**
* Check for finished HTTP responses. * Check for finished HTTP responses.
* *
* The caller must lock the mutex. Runs in the I/O thread. * Runs in the I/O thread. The caller must not hold locks.
*/ */
static void static void
input_curl_info_read(void) input_curl_info_read(void)
...@@ -501,7 +543,7 @@ input_curl_info_read(void) ...@@ -501,7 +543,7 @@ input_curl_info_read(void)
/** /**
* Give control to CURL. * Give control to CURL.
* *
* The caller must lock the mutex. Runs in the I/O thread. * Runs in the I/O thread. The caller must not hold locks.
*/ */
static bool static bool
input_curl_perform(void) input_curl_perform(void)
...@@ -611,13 +653,9 @@ input_curl_source_dispatch(G_GNUC_UNUSED GSource *source, ...@@ -611,13 +653,9 @@ input_curl_source_dispatch(G_GNUC_UNUSED GSource *source,
G_GNUC_UNUSED GSourceFunc callback, G_GNUC_UNUSED GSourceFunc callback,
G_GNUC_UNUSED gpointer user_data) G_GNUC_UNUSED gpointer user_data)
{ {
g_static_mutex_lock(&curl.mutex);
if (input_curl_perform()) if (input_curl_perform())
input_curl_info_read(); input_curl_info_read();
g_static_mutex_unlock(&curl.mutex);
return true; return true;
} }
...@@ -1074,9 +1112,12 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream) ...@@ -1074,9 +1112,12 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream)
if (size == 0) if (size == 0)
return 0; return 0;
g_static_mutex_lock(&curl.mutex);
#if LIBCURL_VERSION_NUM >= 0x071200 #if LIBCURL_VERSION_NUM >= 0x071200
if (curl_total_buffer_size(c) + size >= CURL_MAX_BUFFERED) { if (curl_total_buffer_size(c) + size >= CURL_MAX_BUFFERED) {
c->paused = true; c->paused = true;
g_static_mutex_unlock(&curl.mutex);
return CURL_WRITEFUNC_PAUSE; return CURL_WRITEFUNC_PAUSE;
} }
#endif #endif
...@@ -1091,6 +1132,7 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream) ...@@ -1091,6 +1132,7 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream)
c->base.ready = true; c->base.ready = true;
g_cond_broadcast(curl.cond); g_cond_broadcast(curl.cond);
g_static_mutex_unlock(&curl.mutex);
return size; return size;
} }
...@@ -1197,6 +1239,8 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, ...@@ -1197,6 +1239,8 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence,
/* check if we can fast-forward the buffer */ /* check if we can fast-forward the buffer */
g_static_mutex_lock(&curl.mutex);
while (offset > is->offset && !g_queue_is_empty(c->buffers)) { while (offset > is->offset && !g_queue_is_empty(c->buffers)) {
struct buffer *buffer; struct buffer *buffer;
size_t length; size_t length;
...@@ -1214,6 +1258,8 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, ...@@ -1214,6 +1258,8 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence,
is->offset += length; is->offset += length;
} }
g_static_mutex_unlock(&curl.mutex);
if (offset == is->offset) if (offset == is->offset)
return true; return true;
...@@ -1241,14 +1287,12 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, ...@@ -1241,14 +1287,12 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence,
curl_easy_setopt(c->easy, CURLOPT_RANGE, c->range); curl_easy_setopt(c->easy, CURLOPT_RANGE, c->range);
} }
g_static_mutex_lock(&curl.mutex);
c->base.ready = false; c->base.ready = false;
if (!input_curl_easy_add(c, error_r)) { if (!input_curl_easy_add_indirect(c, error_r))
g_static_mutex_unlock(&curl.mutex);
return false; return false;
}
g_static_mutex_lock(&curl.mutex);
while (!c->base.ready) while (!c->base.ready)
g_cond_wait(curl.cond, g_static_mutex_get_mutex(&curl.mutex)); g_cond_wait(curl.cond, g_static_mutex_get_mutex(&curl.mutex));
...@@ -1291,15 +1335,11 @@ input_curl_open(const char *url, GError **error_r) ...@@ -1291,15 +1335,11 @@ input_curl_open(const char *url, GError **error_r)
return NULL; return NULL;
} }
g_static_mutex_lock(&curl.mutex); if (!input_curl_easy_add_indirect(c, error_r)) {
if (!input_curl_easy_add(c, error_r)) {
g_static_mutex_unlock(&curl.mutex);
input_curl_free(c); input_curl_free(c);
return NULL; return NULL;
} }
g_static_mutex_unlock(&curl.mutex);
return &c->base; return &c->base;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment