Barretenberg
The ZK-SNARK library at the core of Aztec
Loading...
Searching...
No Matches
parallel_for_mutex_pool.cpp
Go to the documentation of this file.
4#ifndef NO_MULTITHREADING
5#include "log.hpp"
6#include "thread.hpp"
7#include <atomic>
8#include <condition_variable>
9#include <functional>
10#include <mutex>
11#include <queue>
12#include <thread>
13#include <vector>
14
16
17namespace {
18
19class ThreadPool {
20 public:
21 ThreadPool(size_t num_threads);
22 ThreadPool(const ThreadPool& other) = delete;
23 ThreadPool(ThreadPool&& other) = delete;
24 ~ThreadPool();
25
26 ThreadPool& operator=(const ThreadPool& other) = delete;
27 ThreadPool& operator=(ThreadPool&& other) = delete;
28
29 void start_tasks(size_t num_iterations, const std::function<void(size_t)>& func)
30 {
32 {
33 std::unique_lock<std::mutex> lock(tasks_mutex);
34 task_ = func;
35 num_iterations_ = num_iterations;
36 iteration_ = 0;
37 complete_ = 0;
38 }
39 condition.notify_all();
40
41 do_iterations();
42
43 {
44 // BB_BENCH_NAME("spinning main thread");
45 std::unique_lock<std::mutex> lock(tasks_mutex);
46 complete_condition_.wait(lock, [this] { return complete_ == num_iterations_; });
47 }
48 }
49
50 private:
52 std::vector<std::thread> workers;
53 std::mutex tasks_mutex;
54 std::function<void(size_t)> task_;
55 size_t num_iterations_ = 0;
56 size_t iteration_ = 0;
57 size_t complete_ = 0;
59 std::condition_variable complete_condition_;
60 bool stop = false;
61
62 BB_NO_PROFILE void worker_loop(size_t thread_index);
63
64 void do_iterations()
65 {
66 while (true) {
67 size_t iteration = 0;
68 {
69 std::unique_lock<std::mutex> lock(tasks_mutex);
70 if (iteration_ == num_iterations_) {
71 return;
72 }
73 iteration = iteration_++;
74 }
75 // BB_BENCH_NAME("do_iterations()");
76 task_(iteration);
77 {
78 std::unique_lock<std::mutex> lock(tasks_mutex);
79 if (++complete_ == num_iterations_) {
80 complete_condition_.notify_one();
81 return;
82 }
83 }
84 }
85 }
86};
87
88ThreadPool::ThreadPool(size_t num_threads)
89{
90 workers.reserve(num_threads);
91 for (size_t i = 0; i < num_threads; ++i) {
92 workers.emplace_back(&ThreadPool::worker_loop, this, i);
93 }
94}
95
96ThreadPool::~ThreadPool()
97{
98 {
99 std::unique_lock<std::mutex> lock(tasks_mutex);
100 stop = true;
101 }
102 condition.notify_all();
103 for (auto& worker : workers) {
104 worker.join();
105 }
106}
107
108void ThreadPool::worker_loop(size_t /*unused*/)
109{
110 // info("created worker ", worker_num);
111 while (true) {
112 {
113 std::unique_lock<std::mutex> lock(tasks_mutex);
114 condition.wait(lock, [this] { return (iteration_ < num_iterations_) || stop; });
115
116 if (stop) {
117 break;
118 }
119 }
120 // Make sure nested stats accounting works under multithreading
121 // Note: parent is a thread-local variable.
123 do_iterations();
124 }
125 // info("worker exit ", worker_num);
126}
127} // namespace
128
129namespace bb {
134void parallel_for_mutex_pool(size_t num_iterations, const std::function<void(size_t)>& func)
135{
136 static ThreadPool pool(get_num_cpus() - 1);
137 // Note that if this is used safely, we don't need the std::atomic_bool (can use bool), but if we are catching the
138 // mess up case of nesting parallel_for this should be atomic
139 static std::atomic_bool nested = false;
140 // Check if we are already in a nested parallel_for_mutex_pool call
141 bool expected = false;
142 if (!nested.compare_exchange_strong(expected, true)) {
143 // Run single-threaded if nested
144 for (size_t i = 0; i < num_iterations; ++i) {
145 func(i);
146 }
147 return;
148 }
149 // info("starting job with iterations: ", num_iterations);
150 pool.start_tasks(num_iterations, func);
151 // info("done");
152 nested = false;
153}
154} // namespace bb
155#endif
#define BB_NO_PROFILE
Entry point for Barretenberg command-line interface.
void parallel_for_mutex_pool(size_t num_iterations, const std::function< void(size_t)> &func)
size_t get_num_cpus()
Definition thread.cpp:33
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
Definition tuple.hpp:13
static thread_local TimeStatsEntry * parent
Definition bb_bench.hpp:83