OpenLB 1.7
Loading...
Searching...
No Matches
threadPool.h
Go to the documentation of this file.
1/* This file is part of the OpenLB library
2 *
3 * Copyright (C) 2022 Adrian Kummerlaender
4 * E-mail contact: info@openlb.net
5 * The most recent release of OpenLB can be downloaded at
6 * <http://www.openlb.net/>
7 *
8 * This program is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU General Public License
10 * as published by the Free Software Foundation; either version 2
11 * of the License, or (at your option) any later version.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public
19 * License along with this program; if not, write to the Free
20 * Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
21 * Boston, MA 02110-1301, USA.
22 */
23
24#ifndef THREAD_POOL_H
25#define THREAD_POOL_H
26
27#include <thread>
28#include <mutex>
29#include <atomic>
30#include <condition_variable>
31#include <queue>
32#include <functional>
33#include <future>
34#include <vector>
35
36#include "core/olbDebug.h"
37#include "io/ostreamManager.h"
38
39namespace olb {
40
42
48private:
49 void work(unsigned iThread)
50 {
51 while (_active) {
52 std::unique_lock lock(_mutex);
53 _available.wait(lock, [&]() {
54 return !_queue.empty()
55 || !_active;
56 });
57 if (_active) {
58 std::function<void()> task = std::move(_queue.front());
59 _queue.pop();
60 lock.unlock();
61 task();
62 --_taskCount;
63 if (_waiting) {
64 _done.notify_one();
65 }
66 }
67 }
68 }
69
70 mutable std::mutex _mutex;
71
72 std::atomic<bool> _waiting;
73 std::atomic<bool> _active;
74
75 std::atomic<std::size_t> _taskCount;
76
77 std::condition_variable _available;
78 std::condition_variable _done;
79
80 std::vector<std::thread> _threads;
81 std::queue<std::function<void()>> _queue;
82
83 bool _initialized;
84
85public:
87 _mutex{},
88 _waiting{false},
89 _active{true},
90 _taskCount{0},
91 _available{},
92 _done{},
93 _threads{1},
94 _queue{},
95 _initialized{false}
96 { }
97
99 void init(int nThreads, bool verbose)
100 {
101 OLB_PRECONDITION(!_initialized);
102 OstreamManager clout(std::cout, "ThreadPool");
103 if (nThreads > 1) {
104 _threads.resize(nThreads);
105 }
106 for (unsigned iThread=0; iThread < _threads.size(); ++iThread) {
107 _threads[iThread] = std::thread(&ThreadPool::work, this, iThread);
108 }
109 if (verbose) {
110 clout << "Sucessfully initialized, numThreads=" << std::to_string(_threads.size()) << std::endl;
111 }
112 _initialized = true;
113 }
114
116 {
117 _active = false;
118 _available.notify_all();
119 for (std::thread& thread : _threads) {
120 thread.join();
121 }
122 }
123
125 unsigned size() const
126 {
127 return _threads.size();
128 }
129
131 template <typename F>
133 {
134 OLB_PRECONDITION(_initialized);
135 {
136 const std::scoped_lock lock(_mutex);
137 _queue.push(f);
138 }
139 ++_taskCount;
140 _available.notify_one();
141 }
142
144 template <typename F, typename R = std::invoke_result_t<std::decay_t<F>>>
145 std::future<R> schedule(F&& f)
146 {
147 OLB_PRECONDITION(_initialized);
148 auto packagedF = std::make_shared<std::packaged_task<R()>>(f);
149 {
150 const std::scoped_lock lock(_mutex);
151 _queue.push([packagedF]() {
152 (*packagedF)();
153 });
154 }
155 ++_taskCount;
156 _available.notify_one();
157 return packagedF->get_future();
158 }
159
161 void wait()
162 {
163 OLB_PRECONDITION(_initialized);
164 _waiting = true;
165 std::unique_lock lock(_mutex);
166 _done.wait(lock, [&]() {
167 return _taskCount == 0;
168 });
169 _waiting = false;
170 }
171
173 template <typename T>
174 void waitFor(std::vector<std::future<T>>& futures)
175 {
176 for (auto& future : futures) {
177 future.wait();
178 }
179 }
180
181};
182
183}
184
185#endif
class for marking output with some text
Pool of threads for CPU-based background processing.
Definition threadPool.h:47
void init(int nThreads, bool verbose)
Initialization to be called by olbInit.
Definition threadPool.h:99
void scheduleAndForget(F &&f)
Schedule F, tracking neither its return value nor completion.
Definition threadPool.h:132
unsigned size() const
Returns number of threads.
Definition threadPool.h:125
std::future< R > schedule(F &&f)
Schedule F and return future of its return value.
Definition threadPool.h:145
void waitFor(std::vector< std::future< T > > &futures)
Blocks until all tasks producing the given futures are completed.
Definition threadPool.h:174
void wait()
Blocks until all tasks are completed.
Definition threadPool.h:161
Top level namespace for all of OpenLB.
#define OLB_PRECONDITION(COND)
Definition olbDebug.h:46