Commit adb68450 authored by Max Kellermann's avatar Max Kellermann

input/curl: move code into class CurlMulti

Move all global variables there, and keep just one global variable: the pointer to the CurlMulti instance. Prepares for the next commit.
parent 2520f6fe
...@@ -179,18 +179,40 @@ struct input_curl { ...@@ -179,18 +179,40 @@ struct input_curl {
}; };
/** /**
* This class monitors all CURL file descriptors. * Manager for the global CURLM object.
*/ */
class CurlSockets final : private MultiSocketMonitor { class CurlMulti final : private MultiSocketMonitor {
CURLM *const multi;
public: public:
CurlSockets(EventLoop &_loop) CurlMulti(EventLoop &_loop, CURLM *_multi);
:MultiSocketMonitor(_loop) {}
~CurlMulti() {
curl_multi_cleanup(multi);
}
bool Add(input_curl *c, Error &error);
void Remove(input_curl *c);
/**
* Check for finished HTTP responses.
*
* Runs in the I/O thread. The caller must not hold locks.
*/
void ReadInfo();
/**
* Give control to CURL.
*
* Runs in the I/O thread. The caller must not hold locks.
*/
void Perform();
using MultiSocketMonitor::InvalidateSockets; using MultiSocketMonitor::InvalidateSockets;
private:
void UpdateSockets(); void UpdateSockets();
private:
virtual int PrepareSockets() override; virtual int PrepareSockets() override;
virtual void DispatchSockets() override; virtual void DispatchSockets() override;
}; };
...@@ -202,16 +224,17 @@ static struct curl_slist *http_200_aliases; ...@@ -202,16 +224,17 @@ static struct curl_slist *http_200_aliases;
static const char *proxy, *proxy_user, *proxy_password; static const char *proxy, *proxy_user, *proxy_password;
static unsigned proxy_port; static unsigned proxy_port;
static struct { static CurlMulti *curl_multi;
CURLM *multi;
CurlSockets *sockets;
} curl;
static constexpr Domain http_domain("http"); static constexpr Domain http_domain("http");
static constexpr Domain curl_domain("curl"); static constexpr Domain curl_domain("curl");
static constexpr Domain curlm_domain("curlm"); static constexpr Domain curlm_domain("curlm");
CurlMulti::CurlMulti(EventLoop &_loop, CURLM *_multi)
:MultiSocketMonitor(_loop), multi(_multi)
{
}
/** /**
* Find a request by its CURL "easy" handle. * Find a request by its CURL "easy" handle.
* *
...@@ -239,7 +262,7 @@ input_curl_resume(struct input_curl *c) ...@@ -239,7 +262,7 @@ input_curl_resume(struct input_curl *c)
if (c->paused) { if (c->paused) {
c->paused = false; c->paused = false;
curl_easy_pause(c->easy, CURLPAUSE_CONT); curl_easy_pause(c->easy, CURLPAUSE_CONT);
curl.sockets->InvalidateSockets(); curl_multi->InvalidateSockets();
} }
} }
...@@ -279,8 +302,8 @@ input_curl_fd_events(int fd, fd_set *rfds, fd_set *wfds, fd_set *efds) ...@@ -279,8 +302,8 @@ input_curl_fd_events(int fd, fd_set *rfds, fd_set *wfds, fd_set *efds)
* *
* Runs in the I/O thread. No lock needed. * Runs in the I/O thread. No lock needed.
*/ */
void inline void
CurlSockets::UpdateSockets() CurlMulti::UpdateSockets()
{ {
assert(io_thread_inside()); assert(io_thread_inside());
...@@ -291,7 +314,7 @@ CurlSockets::UpdateSockets() ...@@ -291,7 +314,7 @@ CurlSockets::UpdateSockets()
FD_ZERO(&efds); FD_ZERO(&efds);
int max_fd; int max_fd;
CURLMcode mcode = curl_multi_fdset(curl.multi, &rfds, &wfds, CURLMcode mcode = curl_multi_fdset(multi, &rfds, &wfds,
&efds, &max_fd); &efds, &max_fd);
if (mcode != CURLM_OK) { if (mcode != CURLM_OK) {
FormatError(curlm_domain, FormatError(curlm_domain,
...@@ -315,14 +338,14 @@ CurlSockets::UpdateSockets() ...@@ -315,14 +338,14 @@ CurlSockets::UpdateSockets()
/** /**
* Runs in the I/O thread. No lock needed. * Runs in the I/O thread. No lock needed.
*/ */
static bool inline bool
input_curl_easy_add(struct input_curl *c, Error &error) CurlMulti::Add(struct input_curl *c, Error &error)
{ {
assert(io_thread_inside()); assert(io_thread_inside());
assert(c != nullptr); assert(c != nullptr);
assert(c->easy != nullptr); assert(c->easy != nullptr);
CURLMcode mcode = curl_multi_add_handle(curl.multi, c->easy); CURLMcode mcode = curl_multi_add_handle(multi, c->easy);
if (mcode != CURLM_OK) { if (mcode != CURLM_OK) {
error.Format(curlm_domain, mcode, error.Format(curlm_domain, mcode,
"curl_multi_add_handle() failed: %s", "curl_multi_add_handle() failed: %s",
...@@ -330,7 +353,7 @@ input_curl_easy_add(struct input_curl *c, Error &error) ...@@ -330,7 +353,7 @@ input_curl_easy_add(struct input_curl *c, Error &error)
return false; return false;
} }
curl.sockets->InvalidateSockets(); InvalidateSockets();
return true; return true;
} }
...@@ -347,11 +370,17 @@ input_curl_easy_add_indirect(struct input_curl *c, Error &error) ...@@ -347,11 +370,17 @@ input_curl_easy_add_indirect(struct input_curl *c, Error &error)
bool result; bool result;
BlockingCall(io_thread_get(), [c, &error, &result](){ BlockingCall(io_thread_get(), [c, &error, &result](){
result = input_curl_easy_add(c, error); result = curl_multi->Add(c, error);
}); });
return result; return result;
} }
inline void
CurlMulti::Remove(input_curl *c)
{
curl_multi_remove_handle(multi, c->easy);
}
/** /**
* Frees the current "libcurl easy" handle, and everything associated * Frees the current "libcurl easy" handle, and everything associated
* with it. * with it.
...@@ -367,7 +396,8 @@ input_curl_easy_free(struct input_curl *c) ...@@ -367,7 +396,8 @@ input_curl_easy_free(struct input_curl *c)
if (c->easy == nullptr) if (c->easy == nullptr)
return; return;
curl_multi_remove_handle(curl.multi, c->easy); curl_multi->Remove(c);
curl_easy_cleanup(c->easy); curl_easy_cleanup(c->easy);
c->easy = nullptr; c->easy = nullptr;
...@@ -386,7 +416,7 @@ input_curl_easy_free_indirect(struct input_curl *c) ...@@ -386,7 +416,7 @@ input_curl_easy_free_indirect(struct input_curl *c)
{ {
BlockingCall(io_thread_get(), [c](){ BlockingCall(io_thread_get(), [c](){
input_curl_easy_free(c); input_curl_easy_free(c);
curl.sockets->InvalidateSockets(); curl_multi->InvalidateSockets();
}); });
assert(c->easy == nullptr); assert(c->easy == nullptr);
...@@ -439,28 +469,23 @@ input_curl_handle_done(CURL *easy_handle, CURLcode result) ...@@ -439,28 +469,23 @@ input_curl_handle_done(CURL *easy_handle, CURLcode result)
* *
* Runs in the I/O thread. The caller must not hold locks. * Runs in the I/O thread. The caller must not hold locks.
*/ */
static void inline void
input_curl_info_read(void) CurlMulti::ReadInfo()
{ {
assert(io_thread_inside()); assert(io_thread_inside());
CURLMsg *msg; CURLMsg *msg;
int msgs_in_queue; int msgs_in_queue;
while ((msg = curl_multi_info_read(curl.multi, while ((msg = curl_multi_info_read(multi,
&msgs_in_queue)) != nullptr) { &msgs_in_queue)) != nullptr) {
if (msg->msg == CURLMSG_DONE) if (msg->msg == CURLMSG_DONE)
input_curl_handle_done(msg->easy_handle, msg->data.result); input_curl_handle_done(msg->easy_handle, msg->data.result);
} }
} }
/** inline void
* Give control to CURL. CurlMulti::Perform()
*
* Runs in the I/O thread. The caller must not hold locks.
*/
static void
input_curl_perform(void)
{ {
assert(io_thread_inside()); assert(io_thread_inside());
...@@ -468,7 +493,7 @@ input_curl_perform(void) ...@@ -468,7 +493,7 @@ input_curl_perform(void)
do { do {
int running_handles; int running_handles;
mcode = curl_multi_perform(curl.multi, &running_handles); mcode = curl_multi_perform(multi, &running_handles);
} while (mcode == CURLM_CALL_MULTI_PERFORM); } while (mcode == CURLM_CALL_MULTI_PERFORM);
if (mcode != CURLM_OK && mcode != CURLM_CALL_MULTI_PERFORM) if (mcode != CURLM_OK && mcode != CURLM_CALL_MULTI_PERFORM)
...@@ -478,12 +503,12 @@ input_curl_perform(void) ...@@ -478,12 +503,12 @@ input_curl_perform(void)
} }
int int
CurlSockets::PrepareSockets() CurlMulti::PrepareSockets()
{ {
UpdateSockets(); UpdateSockets();
long timeout2; long timeout2;
CURLMcode mcode = curl_multi_timeout(curl.multi, &timeout2); CURLMcode mcode = curl_multi_timeout(multi, &timeout2);
if (mcode == CURLM_OK) { if (mcode == CURLM_OK) {
if (timeout2 >= 0 && timeout2 < 10) if (timeout2 >= 0 && timeout2 < 10)
/* CURL 7.21.1 likes to report "timeout=0", /* CURL 7.21.1 likes to report "timeout=0",
...@@ -502,10 +527,10 @@ CurlSockets::PrepareSockets() ...@@ -502,10 +527,10 @@ CurlSockets::PrepareSockets()
} }
void void
CurlSockets::DispatchSockets() CurlMulti::DispatchSockets()
{ {
input_curl_perform(); Perform();
input_curl_info_read(); ReadInfo();
} }
/* /*
...@@ -540,13 +565,13 @@ input_curl_init(const config_param &param, Error &error) ...@@ -540,13 +565,13 @@ input_curl_init(const config_param &param, Error &error)
""); "");
} }
curl.multi = curl_multi_init(); CURLM *multi = curl_multi_init();
if (curl.multi == nullptr) { if (multi == nullptr) {
error.Set(curl_domain, 0, "curl_multi_init() failed"); error.Set(curl_domain, 0, "curl_multi_init() failed");
return false; return false;
} }
curl.sockets = new CurlSockets(io_thread_get()); curl_multi = new CurlMulti(io_thread_get(), multi);
return true; return true;
} }
...@@ -555,11 +580,9 @@ static void ...@@ -555,11 +580,9 @@ static void
input_curl_finish(void) input_curl_finish(void)
{ {
BlockingCall(io_thread_get(), [](){ BlockingCall(io_thread_get(), [](){
delete curl.sockets; delete curl_multi;
}); });
curl_multi_cleanup(curl.multi);
curl_slist_free_all(http_200_aliases); curl_slist_free_all(http_200_aliases);
curl_global_cleanup(); curl_global_cleanup();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment