WorkQueue.hxx 5.02 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
/*
 * Copyright (C) 2003-2014 The Music Player Daemon Project
 * http://www.musicpd.org
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#ifndef _WORKQUEUE_H_INCLUDED_
#define _WORKQUEUE_H_INCLUDED_

#include "thread/Mutex.hxx"
#include "thread/Cond.hxx"

26
#include <assert.h>
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
#include <pthread.h>

#include <string>
#include <queue>

/*
 * Logging hooks used by WorkQueue.  Both macros expand to nothing, so
 * every LOGINFO(...) / LOGERR(...) invocation is removed by the
 * preprocessor and its argument is never evaluated.
 */
#define LOGINFO(X)
#define LOGERR(X)

/**
 * A WorkQueue manages the synchronisation around a queue of work items,
 * where a number of client threads queue tasks and a number of worker
 * threads take and execute them. The goal is to introduce some level
 * of parallelism between the successive steps of a previously single
 * threaded pipeline. For example data extraction / data preparation / index
 * update, but this could have other uses.
 *
 * There is no individual task status return. In case of fatal error,
 * the client or worker sets an end condition on the queue. A second
 * queue could conceivably be used for returning individual task
 * status.
 */
template <class T>
class WorkQueue {
	// Configuration
51
	const std::string name;
52 53 54

	// Status
	// Worker threads having called exit
55 56
	unsigned n_workers_exited;
	bool ok;
57

58 59
	unsigned n_threads;
	pthread_t *threads;
60 61

	// Synchronization
62 63 64 65
	std::queue<T> queue;
	Cond client_cond;
	Cond worker_cond;
	Mutex mutex;
66 67 68 69 70 71 72 73

public:
	/** Create a WorkQueue
	 * @param name for message printing
	 * @param hi number of tasks on queue before clients blocks. Default 0
	 *    meaning no limit. hi == -1 means that the queue is disabled.
	 * @param lo minimum count of tasks before worker starts. Default 1.
	 */
74 75
	WorkQueue(const char *_name)
		:name(_name),
76
		 n_workers_exited(0),
77
		 ok(false),
78
		 n_threads(0), threads(nullptr)
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
	{
	}

	~WorkQueue() {
		setTerminateAndWait();
	}

	/** Start the worker threads.
	 *
	 * @param nworkers number of threads copies to start.
	 * @param start_routine thread function. It should loop
	 *      taking (QueueWorker::take()) and executing tasks.
	 * @param arg initial parameter to thread function.
	 * @return true if ok.
	 */
94
	bool start(unsigned nworkers, void *(*workproc)(void *), void *arg)
95
	{
96
		const ScopeLock protect(mutex);
97

98
		assert(nworkers > 0);
99
		assert(!ok);
100 101 102
		assert(n_threads == 0);
		assert(threads == nullptr);

103 104
		n_threads = nworkers;
		threads = new pthread_t[n_threads];
105

106
		for  (unsigned i = 0; i < nworkers; i++) {
107
			int err;
108
			if ((err = pthread_create(&threads[i], 0, workproc, arg))) {
109
				LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
110
					name.c_str(), err));
111 112 113
				return false;
			}
		}
114 115

		ok = true;
116 117 118 119 120 121 122
		return true;
	}

	/** Add item to work queue, called from client.
	 *
	 * Sleeps if there are already too many.
	 */
123 124
	template<typename U>
	bool put(U &&u)
125
	{
126
		const ScopeLock protect(mutex);
127

128
		queue.emplace(std::forward<U>(u));
129 130 131

		// Just wake one worker, there is only one new task.
		worker_cond.signal();
132 133 134 135 136 137 138 139 140

		return true;
	}


	/** Tell the workers to exit, and wait for them.
	 */
	void setTerminateAndWait()
	{
141
		const ScopeLock protect(mutex);
142 143

		// Wait for all worker threads to have called workerExit()
144
		ok = false;
145
		while (n_workers_exited < n_threads) {
146 147
			worker_cond.broadcast();
			client_cond.wait(mutex);
148 149 150 151
		}

		// Perform the thread joins and compute overall status
		// Workers return (void*)1 if ok
152
		for (unsigned i = 0; i < n_threads; ++i) {
153
			void *status;
154
			pthread_join(threads[i], &status);
155 156
		}

157 158 159 160
		delete[] threads;
		threads = nullptr;
		n_threads = 0;

161
		// Reset to start state.
162
		n_workers_exited = 0;
163 164 165 166 167 168 169
	}

	/** Take task from queue. Called from worker.
	 *
	 * Sleeps if there are not enough. Signal if we go to sleep on empty
	 * queue: client may be waiting for our going idle.
	 */
170
	bool take(T &tp)
171
	{
172
		const ScopeLock protect(mutex);
173

174
		if (!ok)
175 176
			return false;

177
		while (queue.empty()) {
178
			worker_cond.wait(mutex);
179
			if (!ok)
180 181 182
				return false;
		}

183
		tp = std::move(queue.front());
184
		queue.pop();
185 186 187 188 189 190 191
		return true;
	}

	/** Advertise exit and abort queue. Called from worker
	 *
	 * This would happen after an unrecoverable error, or when
	 * the queue is terminated by the client. Workers never exit normally,
192
	 * except when the queue is shut down (at which point ok is set to
193 194 195 196 197
	 * false by the shutdown code anyway). The thread must return/exit
	 * immediately after calling this.
	 */
	void workerExit()
	{
198
		const ScopeLock protect(mutex);
199

200 201 202
		n_workers_exited++;
		ok = false;
		client_cond.broadcast();
203 204 205 206
	}
};

#endif /* _WORKQUEUE_H_INCLUDED_ */