Commit 6cb72539 authored by Max Kellermann's avatar Max Kellermann

db/upnp/WorkQueue: rename attributes

parent c13facda
...@@ -60,27 +60,27 @@ class WorkQueue { ...@@ -60,27 +60,27 @@ class WorkQueue {
}; };
// Configuration // Configuration
std::string m_name; const std::string name;
size_t m_high; const size_t high;
size_t m_low; const size_t low;
// Status // Status
// Worker threads having called exit // Worker threads having called exit
unsigned int m_workers_exited; unsigned n_workers_exited;
bool m_ok; bool ok;
// Per-thread data. The data is not used currently, this could be // Per-thread data. The data is not used currently, this could be
// a set<pthread_t> // a set<pthread_t>
std::unordered_map<pthread_t, WQTData> m_worker_threads; std::unordered_map<pthread_t, WQTData> threads;
// Synchronization // Synchronization
std::queue<T> m_queue; std::queue<T> queue;
Cond m_ccond; Cond client_cond;
Cond m_wcond; Cond worker_cond;
Mutex m_mutex; Mutex mutex;
// Client/Worker threads currently waiting for a job // Client/Worker threads currently waiting for a job
unsigned int m_clients_waiting; unsigned n_clients_waiting;
unsigned int m_workers_waiting; unsigned n_workers_waiting;
public: public:
/** Create a WorkQueue /** Create a WorkQueue
...@@ -89,11 +89,11 @@ public: ...@@ -89,11 +89,11 @@ public:
* meaning no limit. hi == -1 means that the queue is disabled. * meaning no limit. hi == -1 means that the queue is disabled.
* @param lo minimum count of tasks before worker starts. Default 1. * @param lo minimum count of tasks before worker starts. Default 1.
*/ */
WorkQueue(const char *name, size_t hi = 0, size_t lo = 1) WorkQueue(const char *_name, size_t hi = 0, size_t lo = 1)
:m_name(name), m_high(hi), m_low(lo), :name(_name), high(hi), low(lo),
m_workers_exited(0), n_workers_exited(0),
m_ok(true), ok(true),
m_clients_waiting(0), m_workers_waiting(0) n_clients_waiting(0), n_workers_waiting(0)
{ {
} }
...@@ -111,17 +111,17 @@ public: ...@@ -111,17 +111,17 @@ public:
*/ */
bool start(int nworkers, void *(*workproc)(void *), void *arg) bool start(int nworkers, void *(*workproc)(void *), void *arg)
{ {
const ScopeLock protect(m_mutex); const ScopeLock protect(mutex);
for (int i = 0; i < nworkers; i++) { for (int i = 0; i < nworkers; i++) {
int err; int err;
pthread_t thr; pthread_t thr;
if ((err = pthread_create(&thr, 0, workproc, arg))) { if ((err = pthread_create(&thr, 0, workproc, arg))) {
LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n", LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
m_name.c_str(), err)); name.c_str(), err));
return false; return false;
} }
m_worker_threads.insert(std::make_pair(thr, WQTData())); threads.insert(std::make_pair(thr, WQTData()));
} }
return true; return true;
} }
...@@ -132,29 +132,29 @@ public: ...@@ -132,29 +132,29 @@ public:
*/ */
bool put(T t) bool put(T t)
{ {
const ScopeLock protect(m_mutex); const ScopeLock protect(mutex);
if (!ok()) { if (!IsOK()) {
LOGERR(("WorkQueue::put:%s: !ok or mutex_lock failed\n", LOGERR(("WorkQueue::put:%s: !ok or mutex_lock failed\n",
m_name.c_str())); name.c_str()));
return false; return false;
} }
while (ok() && m_high > 0 && m_queue.size() >= m_high) { while (IsOK() && high > 0 && queue.size() >= high) {
// Keep the order: we test ok() AFTER the sleep... // Keep the order: we test IsOK() AFTER the sleep...
m_clients_waiting++; n_clients_waiting++;
m_ccond.wait(m_mutex); client_cond.wait(mutex);
if (!ok()) { if (!IsOK()) {
m_clients_waiting--; n_clients_waiting--;
return false; return false;
} }
m_clients_waiting--; n_clients_waiting--;
} }
m_queue.push(t); queue.push(t);
if (m_workers_waiting > 0) { if (n_workers_waiting > 0) {
// Just wake one worker, there is only one new task. // Just wake one worker, there is only one new task.
m_wcond.signal(); worker_cond.signal();
} }
return true; return true;
...@@ -179,24 +179,24 @@ public: ...@@ -179,24 +179,24 @@ public:
*/ */
bool waitIdle() bool waitIdle()
{ {
const ScopeLock protect(m_mutex); const ScopeLock protect(mutex);
if (!ok()) { if (!IsOK()) {
LOGERR(("WorkQueue::waitIdle:%s: not ok or can't lock\n", LOGERR(("WorkQueue::waitIdle:%s: not ok or can't lock\n",
m_name.c_str())); name.c_str()));
return false; return false;
} }
// We're done when the queue is empty AND all workers are back // We're done when the queue is empty AND all workers are back
// waiting for a task. // waiting for a task.
while (ok() && (m_queue.size() > 0 || while (IsOK() && (queue.size() > 0 ||
m_workers_waiting != m_worker_threads.size())) { n_workers_waiting != threads.size())) {
m_clients_waiting++; n_clients_waiting++;
m_ccond.wait(m_mutex); client_cond.wait(mutex);
m_clients_waiting--; n_clients_waiting--;
} }
return ok(); return IsOK();
} }
...@@ -207,33 +207,33 @@ public: ...@@ -207,33 +207,33 @@ public:
*/ */
void setTerminateAndWait() void setTerminateAndWait()
{ {
const ScopeLock protect(m_mutex); const ScopeLock protect(mutex);
if (m_worker_threads.empty()) if (threads.empty())
// Already called ? // Already called ?
return; return;
// Wait for all worker threads to have called workerExit() // Wait for all worker threads to have called workerExit()
m_ok = false; ok = false;
while (m_workers_exited < m_worker_threads.size()) { while (n_workers_exited < threads.size()) {
m_wcond.broadcast(); worker_cond.broadcast();
m_clients_waiting++; n_clients_waiting++;
m_ccond.wait(m_mutex); client_cond.wait(mutex);
m_clients_waiting--; n_clients_waiting--;
} }
// Perform the thread joins and compute overall status // Perform the thread joins and compute overall status
// Workers return (void*)1 if ok // Workers return (void*)1 if ok
while (!m_worker_threads.empty()) { while (!threads.empty()) {
void *status; void *status;
auto it = m_worker_threads.begin(); auto it = threads.begin();
pthread_join(it->first, &status); pthread_join(it->first, &status);
m_worker_threads.erase(it); threads.erase(it);
} }
// Reset to start state. // Reset to start state.
m_workers_exited = m_clients_waiting = m_workers_waiting = 0; n_workers_exited = n_clients_waiting = n_workers_waiting = 0;
m_ok = true; ok = true;
} }
/** Take task from queue. Called from worker. /** Take task from queue. Called from worker.
...@@ -243,34 +243,34 @@ public: ...@@ -243,34 +243,34 @@ public:
*/ */
bool take(T &tp) bool take(T &tp)
{ {
const ScopeLock protect(m_mutex); const ScopeLock protect(mutex);
if (!ok()) { if (!IsOK()) {
return false; return false;
} }
while (ok() && m_queue.size() < m_low) { while (IsOK() && queue.size() < low) {
m_workers_waiting++; n_workers_waiting++;
if (m_queue.empty()) if (queue.empty())
m_ccond.broadcast(); client_cond.broadcast();
m_wcond.wait(m_mutex); worker_cond.wait(mutex);
if (!ok()) { if (!IsOK()) {
// !ok is a normal condition when shutting down // !ok is a normal condition when shutting down
if (ok()) { if (IsOK()) {
LOGERR(("WorkQueue::take:%s: cond_wait failed or !ok\n", LOGERR(("WorkQueue::take:%s: cond_wait failed or !ok\n",
m_name.c_str())); name.c_str()));
} }
m_workers_waiting--; n_workers_waiting--;
return false; return false;
} }
m_workers_waiting--; n_workers_waiting--;
} }
tp = m_queue.front(); tp = queue.front();
m_queue.pop(); queue.pop();
if (m_clients_waiting > 0) { if (n_clients_waiting > 0) {
// No reason to wake up more than one client thread // No reason to wake up more than one client thread
m_ccond.signal(); client_cond.signal();
} }
return true; return true;
} }
...@@ -279,23 +279,23 @@ public: ...@@ -279,23 +279,23 @@ public:
* *
* This would happen after an unrecoverable error, or when * This would happen after an unrecoverable error, or when
* the queue is terminated by the client. Workers never exit normally, * the queue is terminated by the client. Workers never exit normally,
* except when the queue is shut down (at which point m_ok is set to * except when the queue is shut down (at which point ok is set to
* false by the shutdown code anyway). The thread must return/exit * false by the shutdown code anyway). The thread must return/exit
* immediately after calling this. * immediately after calling this.
*/ */
void workerExit() void workerExit()
{ {
const ScopeLock protect(m_mutex); const ScopeLock protect(mutex);
m_workers_exited++; n_workers_exited++;
m_ok = false; ok = false;
m_ccond.broadcast(); client_cond.broadcast();
} }
private: private:
bool ok() bool IsOK()
{ {
return m_ok && m_workers_exited == 0 && !m_worker_threads.empty(); return ok && n_workers_exited == 0 && !threads.empty();
} }
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment