Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
M
mpd
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Registry
Registry
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Иван Мажукин
mpd
Commits
4f021cbc
Commit
4f021cbc
authored
Aug 24, 2011
by
Max Kellermann
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
input/curl: use the I/O thread
Background buffering and better timeout handling. This patch sort of obsoletes the input_plugin method buffer().
parent
ba31d176
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
652 additions
and
221 deletions
+652
-221
curl_input_plugin.c
src/input/curl_input_plugin.c
+652
-221
No files found.
src/input/curl_input_plugin.c
View file @
4f021cbc
...
...
@@ -23,6 +23,7 @@
#include "conf.h"
#include "tag.h"
#include "icy_metadata.h"
#include "io_thread.h"
#include "glib_compat.h"
#include <assert.h>
...
...
@@ -73,14 +74,29 @@ struct input_curl {
/** the curl handles */
CURL
*
easy
;
CURLM
*
multi
;
/** the GMainLoop source used to poll all CURL file
descriptors */
GSource
*
source
;
/** the source id of #source */
guint
source_id
;
/** a linked list of all registered GPollFD objects */
GSList
*
fds
;
/** list of buffers, where input_curl_writefunction() appends
to, and input_curl_read() reads from them */
GQueue
*
buffers
;
/** has something been added to the buffers list? */
bool
buffered
;
#if LIBCURL_VERSION_NUM >= 0x071200
/**
* Is the connection currently paused? That happens when the
* buffer was getting too large. It will be unpaused when the
* buffer is below the threshold again.
*/
bool
paused
;
#endif
/** error message provided by libcurl */
char
error
[
CURL_ERROR_SIZE
];
...
...
@@ -94,6 +110,8 @@ struct input_curl {
/** the tag object ready to be requested via
input_stream_tag() */
struct
tag
*
tag
;
GError
*
postponed_error
;
};
/** libcurl should accept "ICY 200 OK" */
...
...
@@ -103,12 +121,525 @@ static struct curl_slist *http_200_aliases;
static
const
char
*
proxy
,
*
proxy_user
,
*
proxy_password
;
static
unsigned
proxy_port
;
static
struct
{
GStaticMutex
mutex
;
GCond
*
cond
;
CURLM
*
multi
;
/**
* A linked list of all active HTTP requests. An active
* request is one that doesn't have the "eof" flag set.
*/
GSList
*
requests
;
/**
* The GMainLoop source used to poll all CURL file
* descriptors.
*/
GSource
*
source
;
/**
* The source id of #source.
*/
guint
source_id
;
GSList
*
fds
;
/**
* When this is non-zero, then an update of #fds is scheduled.
*/
guint
dirty_source_id
;
#if LIBCURL_VERSION_NUM >= 0x070f04
/**
* Did CURL give us a timeout? If yes, then we need to call
* curl_multi_perform(), even if there was no event on any
* file descriptor.
*/
bool
timeout
;
/**
* The absolute time stamp when the timeout expires. This is
* used in the GSource method check().
*/
GTimeVal
absolute_timeout
;
#endif
}
curl
=
{
.
mutex
=
G_STATIC_MUTEX_INIT
,
};
static
inline
GQuark
curl_quark
(
void
)
{
return
g_quark_from_static_string
(
"curl"
);
}
/**
* Find a request by its CURL "easy" handle.
*
* The caller must lock the mutex.
*/
static
struct
input_curl
*
input_curl_find_request
(
CURL
*
easy
)
{
for
(
GSList
*
i
=
curl
.
requests
;
i
!=
NULL
;
i
=
g_slist_next
(
i
))
{
struct
input_curl
*
c
=
i
->
data
;
if
(
c
->
easy
==
easy
)
return
c
;
}
return
NULL
;
}
#if LIBCURL_VERSION_NUM >= 0x071200
static
gpointer
input_curl_resume
(
gpointer
data
)
{
struct
input_curl
*
c
=
data
;
if
(
c
->
paused
)
{
curl_easy_pause
(
c
->
easy
,
CURLPAUSE_CONT
);
c
->
paused
=
false
;
}
return
NULL
;
}
#endif
/**
* Calculates the GLib event bit mask for one file descriptor,
* obtained from three #fd_set objects filled by curl_multi_fdset().
*/
static
gushort
input_curl_fd_events
(
int
fd
,
fd_set
*
rfds
,
fd_set
*
wfds
,
fd_set
*
efds
)
{
gushort
events
=
0
;
if
(
FD_ISSET
(
fd
,
rfds
))
{
events
|=
G_IO_IN
|
G_IO_HUP
|
G_IO_ERR
;
FD_CLR
(
fd
,
rfds
);
}
if
(
FD_ISSET
(
fd
,
wfds
))
{
events
|=
G_IO_OUT
|
G_IO_ERR
;
FD_CLR
(
fd
,
wfds
);
}
if
(
FD_ISSET
(
fd
,
efds
))
{
events
|=
G_IO_HUP
|
G_IO_ERR
;
FD_CLR
(
fd
,
efds
);
}
return
events
;
}
/**
* Updates all registered GPollFD objects, unregisters old ones,
* registers new ones.
*
* The caller must lock the mutex. Runs in the I/O thread.
*/
static
void
curl_update_fds
(
void
)
{
fd_set
rfds
,
wfds
,
efds
;
FD_ZERO
(
&
rfds
);
FD_ZERO
(
&
wfds
);
FD_ZERO
(
&
efds
);
int
max_fd
;
CURLMcode
mcode
=
curl_multi_fdset
(
curl
.
multi
,
&
rfds
,
&
wfds
,
&
efds
,
&
max_fd
);
if
(
mcode
!=
CURLM_OK
)
{
g_warning
(
"curl_multi_fdset() failed: %s
\n
"
,
curl_multi_strerror
(
mcode
));
return
;
}
GSList
*
fds
=
curl
.
fds
;
curl
.
fds
=
NULL
;
while
(
fds
!=
NULL
)
{
GPollFD
*
poll_fd
=
fds
->
data
;
gushort
events
=
input_curl_fd_events
(
poll_fd
->
fd
,
&
rfds
,
&
wfds
,
&
efds
);
assert
(
poll_fd
->
events
!=
0
);
fds
=
g_slist_remove
(
fds
,
poll_fd
);
if
(
events
!=
poll_fd
->
events
)
g_source_remove_poll
(
curl
.
source
,
poll_fd
);
if
(
events
!=
0
)
{
if
(
events
!=
poll_fd
->
events
)
{
poll_fd
->
events
=
events
;
g_source_add_poll
(
curl
.
source
,
poll_fd
);
}
curl
.
fds
=
g_slist_prepend
(
curl
.
fds
,
poll_fd
);
}
else
{
g_free
(
poll_fd
);
}
}
for
(
int
fd
=
0
;
fd
<=
max_fd
;
++
fd
)
{
gushort
events
=
input_curl_fd_events
(
fd
,
&
rfds
,
&
wfds
,
&
efds
);
if
(
events
!=
0
)
{
GPollFD
*
poll_fd
=
g_new
(
GPollFD
,
1
);
poll_fd
->
fd
=
fd
;
poll_fd
->
events
=
events
;
g_source_add_poll
(
curl
.
source
,
poll_fd
);
curl
.
fds
=
g_slist_prepend
(
curl
.
fds
,
poll_fd
);
}
}
}
/**
* Callback for curl_schedule_update() that runs in the I/O thread.
*/
static
gboolean
input_curl_dirty_callback
(
G_GNUC_UNUSED
gpointer
data
)
{
g_static_mutex_lock
(
&
curl
.
mutex
);
assert
(
curl
.
dirty_source_id
!=
0
||
curl
.
requests
==
NULL
);
curl
.
dirty_source_id
=
0
;
curl_update_fds
();
g_static_mutex_unlock
(
&
curl
.
mutex
);
return
false
;
}
/**
* Schedule a refresh of curl.fds. Does nothing if that is already
* scheduled.
*
* The caller must lock the mutex.
*/
static
void
input_curl_schedule_update
(
void
)
{
if
(
curl
.
dirty_source_id
!=
0
)
/* already scheduled */
return
;
curl
.
dirty_source_id
=
io_thread_idle_add
(
input_curl_dirty_callback
,
NULL
);
}
static
bool
input_curl_easy_add
(
struct
input_curl
*
c
,
GError
**
error_r
)
{
assert
(
c
!=
NULL
);
assert
(
c
->
easy
!=
NULL
);
assert
(
input_curl_find_request
(
c
->
easy
)
==
NULL
);
curl
.
requests
=
g_slist_prepend
(
curl
.
requests
,
c
);
CURLMcode
mcode
=
curl_multi_add_handle
(
curl
.
multi
,
c
->
easy
);
if
(
mcode
!=
CURLM_OK
)
{
g_set_error
(
error_r
,
curl_quark
(),
mcode
,
"curl_multi_add_handle() failed: %s"
,
curl_multi_strerror
(
mcode
));
return
false
;
}
input_curl_schedule_update
();
return
true
;
}
/**
* Frees the current "libcurl easy" handle, and everything associated
* with it.
*
* The caller must lock the mutex.
*/
static
void
input_curl_easy_free
(
struct
input_curl
*
c
)
{
assert
(
c
!=
NULL
);
if
(
c
->
easy
==
NULL
)
return
;
curl
.
requests
=
g_slist_remove
(
curl
.
requests
,
c
);
curl_multi_remove_handle
(
curl
.
multi
,
c
->
easy
);
curl_easy_cleanup
(
c
->
easy
);
c
->
easy
=
NULL
;
curl_slist_free_all
(
c
->
request_headers
);
c
->
request_headers
=
NULL
;
g_free
(
c
->
range
);
c
->
range
=
NULL
;
c
->
base
.
ready
=
true
;
}
static
gpointer
input_curl_easy_free_callback
(
gpointer
data
)
{
struct
input_curl
*
c
=
data
;
g_static_mutex_lock
(
&
curl
.
mutex
);
input_curl_easy_free
(
c
);
curl_update_fds
();
g_static_mutex_unlock
(
&
curl
.
mutex
);
return
NULL
;
}
/**
* Frees the current "libcurl easy" handle, and everything associated
* with it.
*
* The mutex must not be locked.
*/
static
void
input_curl_easy_free_indirect
(
struct
input_curl
*
c
)
{
io_thread_call
(
input_curl_easy_free_callback
,
c
);
assert
(
c
->
easy
==
NULL
);
}
/**
* Aborts and frees a running HTTP request.
*
* The caller must lock the mutex. Runs in the I/O thread.
*/
static
void
input_curl_request_abort
(
struct
input_curl
*
c
,
GError
*
error
)
{
assert
(
c
!=
NULL
);
assert
(
c
->
postponed_error
==
NULL
);
assert
(
error
!=
NULL
);
input_curl_easy_free
(
c
);
c
->
postponed_error
=
error
;
g_cond_broadcast
(
curl
.
cond
);
}
/**
* Abort and free all HTTP requests.
*
* The caller must lock the mutex. Runs in the I/O thread.
*/
static
void
input_curl_abort_all_requests
(
GError
*
error
)
{
while
(
curl
.
requests
!=
NULL
)
{
struct
input_curl
*
is
=
curl
.
requests
->
data
;
input_curl_request_abort
(
is
,
g_error_copy
(
error
));
}
g_error_free
(
error
);
}
/**
* A HTTP request is finished.
*
* The caller must lock the mutex. Runs in the I/O thread.
*/
static
void
input_curl_request_done
(
struct
input_curl
*
c
,
CURLcode
result
,
long
status
)
{
assert
(
c
->
easy
==
NULL
);
assert
(
c
->
base
.
ready
);
if
(
result
!=
CURLE_OK
)
{
GError
*
error
=
g_error_new
(
curl_quark
(),
result
,
"curl failed: %s"
,
c
->
error
);
input_curl_request_abort
(
c
,
error
);
}
else
if
(
status
<
200
||
status
>=
300
)
{
GError
*
error
=
g_error_new
(
curl_quark
(),
0
,
"got HTTP status %ld"
,
status
);
input_curl_request_abort
(
c
,
error
);
}
else
{
g_cond_broadcast
(
curl
.
cond
);
}
}
static
void
input_curl_handle_done
(
CURL
*
easy_handle
,
CURLcode
result
)
{
struct
input_curl
*
c
=
input_curl_find_request
(
easy_handle
);
assert
(
c
!=
NULL
);
long
status
=
0
;
curl_easy_getinfo
(
easy_handle
,
CURLINFO_RESPONSE_CODE
,
&
status
);
input_curl_easy_free
(
c
);
input_curl_request_done
(
c
,
result
,
status
);
}
/**
* Check for finished HTTP responses.
*
* The caller must lock the mutex. Runs in the I/O thread.
*/
static
void
input_curl_info_read
(
void
)
{
CURLMsg
*
msg
;
int
msgs_in_queue
;
while
((
msg
=
curl_multi_info_read
(
curl
.
multi
,
&
msgs_in_queue
))
!=
NULL
)
{
if
(
msg
->
msg
==
CURLMSG_DONE
)
input_curl_handle_done
(
msg
->
easy_handle
,
msg
->
data
.
result
);
}
}
/**
* Give control to CURL.
*
* The caller must lock the mutex. Runs in the I/O thread.
*/
static
bool
input_curl_perform
(
void
)
{
CURLMcode
mcode
;
do
{
int
running_handles
;
mcode
=
curl_multi_perform
(
curl
.
multi
,
&
running_handles
);
}
while
(
mcode
==
CURLM_CALL_MULTI_PERFORM
);
if
(
mcode
!=
CURLM_OK
&&
mcode
!=
CURLM_CALL_MULTI_PERFORM
)
{
GError
*
error
=
g_error_new
(
curl_quark
(),
mcode
,
"curl_multi_perform() failed: %s"
,
curl_multi_strerror
(
mcode
));
input_curl_abort_all_requests
(
error
);
return
false
;
}
return
true
;
}
/*
* GSource methods
*
*/
/**
* The GSource prepare() method implementation.
*/
static
gboolean
input_curl_source_prepare
(
G_GNUC_UNUSED
GSource
*
source
,
gint
*
timeout_r
)
{
curl_update_fds
();
#if LIBCURL_VERSION_NUM >= 0x070f04
curl
.
timeout
=
false
;
long
timeout2
;
CURLMcode
mcode
=
curl_multi_timeout
(
curl
.
multi
,
&
timeout2
);
if
(
mcode
==
CURLM_OK
)
{
if
(
timeout2
>=
0
)
{
g_source_get_current_time
(
source
,
&
curl
.
absolute_timeout
);
g_time_val_add
(
&
curl
.
absolute_timeout
,
timeout2
*
1000
);
}
if
(
timeout2
>=
0
&&
timeout2
<
10
)
/* CURL 7.21.1 likes to report "timeout=0",
which means we're running in a busy loop.
Quite a bad idea to waste so much CPU.
Let's use a lower limit of 10ms. */
timeout2
=
10
;
*
timeout_r
=
timeout2
;
curl
.
timeout
=
timeout2
>=
0
;
}
else
g_warning
(
"curl_multi_timeout() failed: %s
\n
"
,
curl_multi_strerror
(
mcode
));
#else
(
void
)
timeout_r
;
#endif
return
false
;
}
/**
* The GSource check() method implementation.
*/
static
gboolean
input_curl_source_check
(
G_GNUC_UNUSED
GSource
*
source
)
{
#if LIBCURL_VERSION_NUM >= 0x070f04
if
(
curl
.
timeout
)
{
/* when a timeout has expired, we need to call
curl_multi_perform(), even if there was no file
descriptor event */
GTimeVal
now
;
g_source_get_current_time
(
source
,
&
now
);
if
(
now
.
tv_sec
>
curl
.
absolute_timeout
.
tv_sec
||
(
now
.
tv_sec
==
curl
.
absolute_timeout
.
tv_sec
&&
now
.
tv_usec
>=
curl
.
absolute_timeout
.
tv_usec
))
return
true
;
}
#endif
for
(
GSList
*
i
=
curl
.
fds
;
i
!=
NULL
;
i
=
i
->
next
)
{
GPollFD
*
poll_fd
=
i
->
data
;
if
(
poll_fd
->
revents
!=
0
)
return
true
;
}
return
false
;
}
/**
* The GSource dispatch() method implementation. The callback isn't
* used, because we're handling all events directly.
*/
static
gboolean
input_curl_source_dispatch
(
G_GNUC_UNUSED
GSource
*
source
,
G_GNUC_UNUSED
GSourceFunc
callback
,
G_GNUC_UNUSED
gpointer
user_data
)
{
g_static_mutex_lock
(
&
curl
.
mutex
);
if
(
input_curl_perform
())
input_curl_info_read
();
g_static_mutex_unlock
(
&
curl
.
mutex
);
return
true
;
}
/**
* The vtable for our GSource implementation. Unfortunately, we
* cannot declare it "const", because g_source_new() takes a non-const
* pointer, for whatever reason.
*/
static
GSourceFuncs
curl_source_funcs
=
{
.
prepare
=
input_curl_source_prepare
,
.
check
=
input_curl_source_check
,
.
dispatch
=
input_curl_source_dispatch
,
};
/*
* input_plugin methods
*
*/
static
bool
input_curl_init
(
const
struct
config_param
*
param
,
G_GNUC_UNUSED
GError
**
error_r
)
...
...
@@ -138,20 +669,61 @@ input_curl_init(const struct config_param *param,
""
);
}
curl
.
multi
=
curl_multi_init
();
if
(
curl
.
multi
==
NULL
)
{
g_set_error
(
error_r
,
curl_quark
(),
0
,
"curl_multi_init() failed"
);
return
false
;
}
curl
.
cond
=
g_cond_new
();
curl
.
source
=
g_source_new
(
&
curl_source_funcs
,
sizeof
(
*
curl
.
source
));
curl
.
source_id
=
g_source_attach
(
curl
.
source
,
io_thread_context
());
return
true
;
}
static
gpointer
curl_destroy_sources
(
G_GNUC_UNUSED
gpointer
data
)
{
g_source_destroy
(
curl
.
source
);
if
(
curl
.
dirty_source_id
!=
0
)
{
GSource
*
source
=
g_main_context_find_source_by_id
(
io_thread_context
(),
curl
.
dirty_source_id
);
assert
(
source
!=
NULL
);
curl
.
dirty_source_id
=
0
;
g_source_destroy
(
source
);
}
return
NULL
;
}
static
void
input_curl_finish
(
void
)
{
assert
(
curl
.
requests
==
NULL
);
io_thread_call
(
curl_destroy_sources
,
NULL
);
curl_multi_cleanup
(
curl
.
multi
);
g_cond_free
(
curl
.
cond
);
curl_slist_free_all
(
http_200_aliases
);
curl_global_cleanup
();
}
#if LIBCURL_VERSION_NUM >= 0x071200
/**
* Determine the total sizes of all buffers, including portions that
* have already been consumed.
*
* The caller must lock the mutex.
*/
G_GNUC_PURE
static
size_t
...
...
@@ -168,6 +740,8 @@ curl_total_buffer_size(const struct input_curl *c)
return
total
;
}
#endif
static
void
buffer_free_callback
(
gpointer
data
,
G_GNUC_UNUSED
gpointer
user_data
)
{
...
...
@@ -186,28 +760,6 @@ input_curl_flush_buffers(struct input_curl *c)
}
/**
* Frees the current "libcurl easy" handle, and everything associated
* with it.
*/
static
void
input_curl_easy_free
(
struct
input_curl
*
c
)
{
if
(
c
->
easy
!=
NULL
)
{
curl_multi_remove_handle
(
c
->
multi
,
c
->
easy
);
curl_easy_cleanup
(
c
->
easy
);
c
->
easy
=
NULL
;
}
curl_slist_free_all
(
c
->
request_headers
);
c
->
request_headers
=
NULL
;
g_free
(
c
->
range
);
c
->
range
=
NULL
;
c
->
base
.
ready
=
true
;
}
/**
* Frees this stream (but not the input_stream struct itself).
*/
static
void
...
...
@@ -217,12 +769,9 @@ input_curl_free(struct input_curl *c)
tag_free
(
c
->
tag
);
g_free
(
c
->
meta_name
);
input_curl_easy_free
(
c
);
input_curl_easy_free
_indirect
(
c
);
input_curl_flush_buffers
(
c
);
if
(
c
->
multi
!=
NULL
)
curl_multi_cleanup
(
c
->
multi
);
g_queue_free
(
c
->
buffers
);
g_free
(
c
->
url
);
...
...
@@ -241,116 +790,14 @@ input_curl_tag(struct input_stream *is)
}
static
bool
input_curl_multi_info_read
(
struct
input_curl
*
c
,
GError
**
error_r
)
{
CURLMsg
*
msg
;
int
msgs_in_queue
;
while
((
msg
=
curl_multi_info_read
(
c
->
multi
,
&
msgs_in_queue
))
!=
NULL
)
{
if
(
msg
->
msg
==
CURLMSG_DONE
)
{
CURLcode
result
=
msg
->
data
.
result
;
input_curl_easy_free
(
c
);
if
(
result
!=
CURLE_OK
)
{
g_set_error
(
error_r
,
curl_quark
(),
result
,
"curl failed: %s"
,
c
->
error
);
return
false
;
}
}
}
return
true
;
}
/**
* Wait for the libcurl socket.
*
* @return -1 on error, 0 if no data is available yet, 1 if data is
* available
*/
static
int
input_curl_select
(
struct
input_curl
*
c
,
GError
**
error_r
)
{
fd_set
rfds
,
wfds
,
efds
;
int
max_fd
,
ret
;
CURLMcode
mcode
;
struct
timeval
timeout
=
{
.
tv_sec
=
1
,
.
tv_usec
=
0
,
};
assert
(
c
->
easy
!=
NULL
);
FD_ZERO
(
&
rfds
);
FD_ZERO
(
&
wfds
);
FD_ZERO
(
&
efds
);
mcode
=
curl_multi_fdset
(
c
->
multi
,
&
rfds
,
&
wfds
,
&
efds
,
&
max_fd
);
if
(
mcode
!=
CURLM_OK
)
{
g_set_error
(
error_r
,
curl_quark
(),
mcode
,
"curl_multi_fdset() failed: %s"
,
curl_multi_strerror
(
mcode
));
return
-
1
;
}
#if LIBCURL_VERSION_NUM >= 0x070f04
long
timeout2
;
mcode
=
curl_multi_timeout
(
c
->
multi
,
&
timeout2
);
if
(
mcode
!=
CURLM_OK
)
{
g_warning
(
"curl_multi_timeout() failed: %s
\n
"
,
curl_multi_strerror
(
mcode
));
return
-
1
;
}
if
(
timeout2
>=
0
)
{
if
(
timeout2
>
10000
)
timeout2
=
10000
;
timeout
.
tv_sec
=
timeout2
/
1000
;
timeout
.
tv_usec
=
(
timeout2
%
1000
)
*
1000
;
}
#endif
ret
=
select
(
max_fd
+
1
,
&
rfds
,
&
wfds
,
&
efds
,
&
timeout
);
if
(
ret
<
0
)
g_set_error
(
error_r
,
g_quark_from_static_string
(
"errno"
),
errno
,
"select() failed: %s
\n
"
,
g_strerror
(
errno
));
return
ret
;
}
static
bool
fill_buffer
(
struct
input_curl
*
c
,
GError
**
error_r
)
{
CURLMcode
mcode
=
CURLM_CALL_MULTI_PERFORM
;
while
(
c
->
easy
!=
NULL
&&
g_queue_is_empty
(
c
->
buffers
))
g_cond_wait
(
curl
.
cond
,
g_static_mutex_get_mutex
(
&
curl
.
mutex
));
while
(
c
->
easy
!=
NULL
&&
g_queue_is_empty
(
c
->
buffers
))
{
int
running_handles
;
bool
bret
;
if
(
mcode
!=
CURLM_CALL_MULTI_PERFORM
)
{
/* if we're still here, there is no input yet
- wait for input */
int
ret
=
input_curl_select
(
c
,
error_r
);
if
(
ret
<=
0
)
/* no data yet or error */
return
false
;
}
mcode
=
curl_multi_perform
(
c
->
multi
,
&
running_handles
);
if
(
mcode
!=
CURLM_OK
&&
mcode
!=
CURLM_CALL_MULTI_PERFORM
)
{
g_set_error
(
error_r
,
curl_quark
(),
mcode
,
"curl_multi_perform() failed: %s"
,
curl_multi_strerror
(
mcode
));
input_curl_easy_free
(
c
);
return
false
;
}
bret
=
input_curl_multi_info_read
(
c
,
error_r
);
if
(
!
bret
)
if
(
c
->
postponed_error
!=
NULL
)
{
g_propagate_error
(
error_r
,
c
->
postponed_error
);
c
->
postponed_error
=
NULL
;
return
false
;
}
...
...
@@ -456,12 +903,16 @@ input_curl_read(struct input_stream *is, void *ptr, size_t size,
size_t
nbytes
=
0
;
char
*
dest
=
ptr
;
g_static_mutex_lock
(
&
curl
.
mutex
);
do
{
/* fill the buffer */
success
=
fill_buffer
(
c
,
error_r
);
if
(
!
success
)
if
(
!
success
)
{
g_static_mutex_unlock
(
&
curl
.
mutex
);
return
0
;
}
/* send buffer contents */
...
...
@@ -479,6 +930,13 @@ input_curl_read(struct input_stream *is, void *ptr, size_t size,
is
->
offset
+=
(
goffset
)
nbytes
;
#if LIBCURL_VERSION_NUM >= 0x071200
if
(
c
->
paused
&&
curl_total_buffer_size
(
c
)
<
CURL_MAX_BUFFERED
)
io_thread_call
(
input_curl_resume
,
c
);
#endif
g_static_mutex_unlock
(
&
curl
.
mutex
);
return
nbytes
;
}
...
...
@@ -495,7 +953,11 @@ input_curl_eof(G_GNUC_UNUSED struct input_stream *is)
{
struct
input_curl
*
c
=
(
struct
input_curl
*
)
is
;
return
c
->
easy
==
NULL
&&
g_queue_is_empty
(
c
->
buffers
);
g_static_mutex_lock
(
&
curl
.
mutex
);
bool
eof
=
c
->
easy
==
NULL
&&
g_queue_is_empty
(
c
->
buffers
);
g_static_mutex_unlock
(
&
curl
.
mutex
);
return
eof
;
}
static
int
...
...
@@ -503,40 +965,21 @@ input_curl_buffer(struct input_stream *is, GError **error_r)
{
struct
input_curl
*
c
=
(
struct
input_curl
*
)
is
;
if
(
curl_total_buffer_size
(
c
)
>=
CURL_MAX_BUFFERED
)
return
0
;
CURLMcode
mcode
;
int
running_handles
;
bool
ret
;
c
->
buffered
=
false
;
g_static_mutex_lock
(
&
curl
.
mutex
);
if
(
!
is
->
ready
&&
c
->
easy
!=
NULL
)
/* not ready yet means the caller is waiting in a busy
loop; relax that by calling select() on the
socket */
if
(
input_curl_select
(
c
,
error_r
)
<
0
)
return
-
1
;
int
result
;
if
(
c
->
postponed_error
!=
NULL
)
{
g_propagate_error
(
error_r
,
c
->
postponed_error
);
c
->
postponed_error
=
NULL
;
result
=
-
1
;
}
else
if
(
g_queue_is_empty
(
c
->
buffers
))
result
=
0
;
else
result
=
1
;
do
{
mcode
=
curl_multi_perform
(
c
->
multi
,
&
running_handles
);
}
while
(
mcode
==
CURLM_CALL_MULTI_PERFORM
&&
g_queue_is_empty
(
c
->
buffers
));
g_static_mutex_unlock
(
&
curl
.
mutex
);
if
(
mcode
!=
CURLM_OK
&&
mcode
!=
CURLM_CALL_MULTI_PERFORM
)
{
g_set_error
(
error_r
,
curl_quark
(),
mcode
,
"curl_multi_perform() failed: %s"
,
curl_multi_strerror
(
mcode
));
input_curl_easy_free
(
c
);
return
-
1
;
}
ret
=
input_curl_multi_info_read
(
c
,
error_r
);
if
(
!
ret
)
return
-
1
;
return
c
->
buffered
;
return
result
;
}
/** called by curl when new data is available */
...
...
@@ -634,15 +1077,24 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream)
if
(
size
==
0
)
return
0
;
#if LIBCURL_VERSION_NUM >= 0x071200
if
(
curl_total_buffer_size
(
c
)
+
size
>=
CURL_MAX_BUFFERED
)
{
c
->
paused
=
true
;
return
CURL_WRITEFUNC_PAUSE
;
}
#endif
buffer
=
g_malloc
(
sizeof
(
*
buffer
)
-
sizeof
(
buffer
->
data
)
+
size
);
buffer
->
size
=
size
;
buffer
->
consumed
=
0
;
memcpy
(
buffer
->
data
,
ptr
,
size
);
g_queue_push_tail
(
c
->
buffers
,
buffer
);
c
->
buffered
=
true
;
c
->
base
.
ready
=
true
;
g_cond_broadcast
(
curl
.
cond
);
return
size
;
}
...
...
@@ -650,7 +1102,6 @@ static bool
input_curl_easy_init
(
struct
input_curl
*
c
,
GError
**
error_r
)
{
CURLcode
code
;
CURLMcode
mcode
;
c
->
easy
=
curl_easy_init
();
if
(
c
->
easy
==
NULL
)
{
...
...
@@ -659,14 +1110,6 @@ input_curl_easy_init(struct input_curl *c, GError **error_r)
return
false
;
}
mcode
=
curl_multi_add_handle
(
c
->
multi
,
c
->
easy
);
if
(
mcode
!=
CURLM_OK
)
{
g_set_error
(
error_r
,
curl_quark
(),
mcode
,
"curl_multi_add_handle() failed: %s"
,
curl_multi_strerror
(
mcode
));
return
false
;
}
curl_easy_setopt
(
c
->
easy
,
CURLOPT_USERAGENT
,
"Music Player Daemon "
VERSION
);
curl_easy_setopt
(
c
->
easy
,
CURLOPT_HEADERFUNCTION
,
...
...
@@ -712,26 +1155,6 @@ input_curl_easy_init(struct input_curl *c, GError **error_r)
}
static
bool
input_curl_send_request
(
struct
input_curl
*
c
,
GError
**
error_r
)
{
CURLMcode
mcode
;
int
running_handles
;
do
{
mcode
=
curl_multi_perform
(
c
->
multi
,
&
running_handles
);
}
while
(
mcode
==
CURLM_CALL_MULTI_PERFORM
);
if
(
mcode
!=
CURLM_OK
)
{
g_set_error
(
error_r
,
curl_quark
(),
mcode
,
"curl_multi_perform() failed: %s"
,
curl_multi_strerror
(
mcode
));
return
false
;
}
return
true
;
}
static
bool
input_curl_seek
(
struct
input_stream
*
is
,
goffset
offset
,
int
whence
,
GError
**
error_r
)
{
...
...
@@ -796,7 +1219,7 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence,
/* close the old connection and open a new one */
input_curl_easy_free
(
c
);
input_curl_easy_free
_indirect
(
c
);
input_curl_flush_buffers
(
c
);
is
->
offset
=
offset
;
...
...
@@ -818,18 +1241,34 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence,
curl_easy_setopt
(
c
->
easy
,
CURLOPT_RANGE
,
c
->
range
);
}
ret
=
input_curl_send_request
(
c
,
error_r
);
if
(
!
ret
)
g_static_mutex_lock
(
&
curl
.
mutex
);
c
->
base
.
ready
=
false
;
if
(
!
input_curl_easy_add
(
c
,
error_r
))
{
g_static_mutex_unlock
(
&
curl
.
mutex
);
return
false
;
}
while
(
!
c
->
base
.
ready
)
g_cond_wait
(
curl
.
cond
,
g_static_mutex_get_mutex
(
&
curl
.
mutex
));
if
(
c
->
postponed_error
!=
NULL
)
{
g_propagate_error
(
error_r
,
c
->
postponed_error
);
c
->
postponed_error
=
NULL
;
g_static_mutex_unlock
(
&
curl
.
mutex
);
return
false
;
}
g_static_mutex_unlock
(
&
curl
.
mutex
);
return
input_curl_multi_info_read
(
c
,
error_r
)
;
return
true
;
}
static
struct
input_stream
*
input_curl_open
(
const
char
*
url
,
GError
**
error_r
)
{
struct
input_curl
*
c
;
bool
ret
;
if
(
strncmp
(
url
,
"http://"
,
7
)
!=
0
)
return
NULL
;
...
...
@@ -840,35 +1279,27 @@ input_curl_open(const char *url, GError **error_r)
c
->
url
=
g_strdup
(
url
);
c
->
buffers
=
g_queue_new
();
c
->
multi
=
curl_multi_init
();
if
(
c
->
multi
==
NULL
)
{
g_set_error
(
error_r
,
curl_quark
(),
0
,
"curl_multi_init() failed"
);
input_curl_free
(
c
);
return
NULL
;
}
icy_clear
(
&
c
->
icy_metadata
);
c
->
tag
=
NULL
;
ret
=
input_curl_easy_init
(
c
,
error_r
);
if
(
!
ret
)
{
input_curl_free
(
c
);
return
NULL
;
}
#if LIBCURL_VERSION_NUM >= 0x071200
c
->
paused
=
false
;
#endif
ret
=
input_curl_send_request
(
c
,
error_r
);
if
(
!
ret
)
{
if
(
!
input_curl_easy_init
(
c
,
error_r
))
{
input_curl_free
(
c
);
return
NULL
;
}
ret
=
input_curl_multi_info_read
(
c
,
error_r
);
if
(
!
ret
)
{
g_static_mutex_lock
(
&
curl
.
mutex
);
if
(
!
input_curl_easy_add
(
c
,
error_r
))
{
g_static_mutex_unlock
(
&
curl
.
mutex
);
input_curl_free
(
c
);
return
NULL
;
}
g_static_mutex_unlock
(
&
curl
.
mutex
);
return
&
c
->
base
;
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment