Barretenberg
The ZK-SNARK library at the core of Aztec
thread.cpp
#include "thread.hpp"
#include "log.hpp"
#include "throw_or_abort.hpp"

#include <cstdlib>
#include <string>

#ifndef NO_MULTITHREADING
#include <thread>

namespace {
uint32_t& get_num_cores_ref()
{
    static thread_local const char* val = std::getenv("HARDWARE_CONCURRENCY");
    static thread_local uint32_t cores =
        val != nullptr ? static_cast<uint32_t>(std::stoul(val)) : env_hardware_concurrency();
    return cores;
}
} // namespace
#endif

namespace bb {
// only for testing purposes currently
void set_parallel_for_concurrency([[maybe_unused]] size_t num_cores)
{
#ifdef NO_MULTITHREADING
    throw_or_abort("Cannot set hardware concurrency when multithreading is disabled.");
#else
    get_num_cores_ref() = static_cast<uint32_t>(num_cores);
#endif
}

size_t get_num_cpus()
{
#ifdef NO_MULTITHREADING
    return 1;
#else
    return static_cast<size_t>(get_num_cores_ref());
#endif
}
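// Usage note (sketch): the cached core count above is thread_local, so HARDWARE_CONCURRENCY is read
// once per thread and set_parallel_for_concurrency() only overrides the value for the calling thread:
//
//   bb::set_parallel_for_concurrency(4);
//   // bb::get_num_cpus() now returns 4 on this thread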
} // namespace bb

namespace bb {
// 64 core aws r5.
// pippenger run: pippenger_bench/1048576
// coset_fft run: coset_fft_bench_parallel/4194304
// proof run: 2m gate ultraplonk. average of 5.

// pippenger: 179ms
// coset_fft: 54776us
// proof: 11.33s
void parallel_for_omp(size_t num_iterations, const std::function<void(size_t)>& func);

// pippenger: 163ms
// coset_fft: 59993us
// proof: 11.11s
void parallel_for_moody(size_t num_iterations, const std::function<void(size_t)>& func);

// pippenger: 154ms
// coset_fft: 92997us
// proof: 10.84s
void parallel_for_spawning(size_t num_iterations, const std::function<void(size_t)>& func);

// pippenger: 178ms
// coset_fft: 70207us
// proof: 11.55s
void parallel_for_queued(size_t num_iterations, const std::function<void(size_t)>& func);

// pippenger: 152ms
// coset_fft: 56658us
// proof: 11.28s
void parallel_for_atomic_pool(size_t num_iterations, const std::function<void(size_t)>& func);

void parallel_for_mutex_pool(size_t num_iterations, const std::function<void(size_t)>& func);

void parallel_for(size_t num_iterations, const std::function<void(size_t)>& func)
{
#ifdef NO_MULTITHREADING
    for (size_t i = 0; i < num_iterations; ++i) {
        func(i);
    }
#else
#ifdef OMP_MULTITHREADING
    parallel_for_omp(num_iterations, func);
#else
    // parallel_for_spawning(num_iterations, func);
    // parallel_for_moody(num_iterations, func);
    // parallel_for_atomic_pool(num_iterations, func);
    parallel_for_mutex_pool(num_iterations, func);
    // parallel_for_queued(num_iterations, func);
#endif
#endif
}
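// Usage sketch (illustrative): parallel_for hands each index in [0, num_iterations) to the
// selected backend, e.g. to square every element of a vector:
//
//   std::vector<uint64_t> values(1 << 20, 3);
//   parallel_for(values.size(), [&](size_t i) { values[i] *= values[i]; });
//
// The callback must be safe to invoke concurrently for distinct indices.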
void parallel_for_range(size_t num_points,
                        const std::function<void(size_t, size_t)>& func,
                        size_t no_multhreading_if_less_or_equal)
{
    if (num_points <= no_multhreading_if_less_or_equal) {
        func(0, num_points);
        return;
    }
    // Get number of cpus we can split into
    const size_t num_cpus = get_num_cpus();

    // Compute the size of a single chunk
    const size_t chunk_size = (num_points / num_cpus) + (num_points % num_cpus == 0 ? 0 : 1);
    // Parallelize over chunks
    parallel_for(num_cpus, [num_points, chunk_size, &func](size_t chunk_index) {
        // If num_points is small, sometimes we need fewer CPUs
        if (chunk_size * chunk_index > num_points) {
            return;
        }
        // Compute the current chunk size (can differ in case it's the last chunk)
        size_t current_chunk_size = std::min(num_points - (chunk_size * chunk_index), chunk_size);
        if (current_chunk_size == 0) {
            return;
        }
        size_t start = chunk_index * chunk_size;
        size_t end = chunk_index * chunk_size + current_chunk_size;
        func(start, end);
    });
};
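// Usage sketch (illustrative, with a hypothetical per-point accumulate()): the callback receives
// contiguous [start, end) sub-ranges, which avoids one std::function call per point:
//
//   parallel_for_range(
//       points.size(),
//       [&](size_t start, size_t end) {
//           for (size_t i = start; i < end; ++i) {
//               accumulate(points[i]);
//           }
//       },
//       /*no_multhreading_if_less_or_equal=*/16);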
void parallel_for_heuristic(size_t num_points,
                            const std::function<void(size_t, size_t, size_t)>& func,
                            size_t heuristic_cost)
{
    // We take the maximum observed parallel_for overhead (388 us ≈ 388,000 ns) and round it up.
    // The goal of this check is to avoid significantly (10x) increasing processing time for small workloads, so we
    // accept not triggering parallel_for even when it would have made a medium workload up to half a millisecond
    // faster.
    constexpr size_t PARALLEL_FOR_COST = 400000;
    // Get number of cpus we can split into
    const size_t num_cpus = get_num_cpus();

    // Compute the size of a single chunk
    const size_t chunk_size = (num_points / num_cpus) + (num_points % num_cpus == 0 ? 0 : 1);

    // Compute the cost of all operations done by other threads
    const size_t offset_cost = (num_points - chunk_size) * heuristic_cost;

    // If starting parallel_for would take longer than the work we'd offload, just compute serially
    if (offset_cost < PARALLEL_FOR_COST) {
        func(0, num_points, 0);
        return;
    }
    // Parallelize over chunks
    parallel_for(num_cpus, [num_points, chunk_size, &func](size_t chunk_index) {
        // If num_points is small, sometimes we need fewer CPUs
        if (chunk_size * chunk_index > num_points) {
            return;
        }
        // Compute the current chunk size (can differ in case it's the last chunk)
        size_t current_chunk_size = std::min(num_points - (chunk_size * chunk_index), chunk_size);
        if (current_chunk_size == 0) {
            return;
        }
        size_t start = chunk_index * chunk_size;
        size_t end = chunk_index * chunk_size + current_chunk_size;

        func(start, end, chunk_index);
    });
};
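// Usage sketch (illustrative, with hypothetical process() and COST_PER_POINT): the caller passes an
// estimated cost per point in the same nanosecond-scale units as PARALLEL_FOR_COST, so small
// workloads stay on the calling thread:
//
//   constexpr size_t COST_PER_POINT = 50; // assumed per-iteration cost estimate
//   parallel_for_heuristic(
//       coefficients.size(),
//       [&](size_t start, size_t end, size_t chunk_index) {
//           for (size_t i = start; i < end; ++i) {
//               process(coefficients[i], chunk_index);
//           }
//       },
//       COST_PER_POINT);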
MultithreadData calculate_thread_data(size_t num_iterations, size_t min_iterations_per_thread)
{
    size_t num_threads = calculate_num_threads(num_iterations, min_iterations_per_thread);
    const size_t thread_size = num_iterations / num_threads;

    // Compute the index bounds for each thread
    std::vector<size_t> start(num_threads);
    std::vector<size_t> end(num_threads);
    for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
        start[thread_idx] = thread_idx * thread_size;
        end[thread_idx] = (thread_idx == num_threads - 1) ? num_iterations : (thread_idx + 1) * thread_size;
    }

    return MultithreadData{ num_threads, start, end };
}
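// Usage sketch (illustrative, assuming the struct exposes num_threads/start/end as constructed above):
// the returned bounds are typically consumed inside a parallel_for over thread indices:
//
//   MultithreadData data = calculate_thread_data(num_iterations, /*min_iterations_per_thread=*/64);
//   parallel_for(data.num_threads, [&](size_t thread_idx) {
//       for (size_t i = data.start[thread_idx]; i < data.end[thread_idx]; ++i) {
//           // per-iteration work (hypothetical)
//       }
//   });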
size_t calculate_num_threads(size_t num_iterations, size_t min_iterations_per_thread)
{
    size_t max_num_threads = get_num_cpus(); // number of available threads
    size_t desired_num_threads = num_iterations / min_iterations_per_thread;
    size_t num_threads = std::min(desired_num_threads, max_num_threads); // fewer than max if justified
    num_threads = num_threads > 0 ? num_threads : 1; // ensure num_threads is at least 1
    return num_threads;
}
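// Worked example (illustrative): with num_iterations = 1000, min_iterations_per_thread = 100 and
// 16 available CPUs, desired_num_threads = 10, so 10 threads are used; with num_iterations = 50
// the desired count of 0 is clamped up to 1.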
size_t calculate_num_threads_pow2(size_t num_iterations, size_t min_iterations_per_thread)
{
    size_t max_num_threads = get_num_cpus_pow2(); // number of available threads (power of 2)
    size_t desired_num_threads = num_iterations / min_iterations_per_thread;
    desired_num_threads = static_cast<size_t>(1ULL << numeric::get_msb(desired_num_threads));
    size_t num_threads = std::min(desired_num_threads, max_num_threads); // fewer than max if justified
    num_threads = num_threads > 0 ? num_threads : 1; // ensure num_threads is at least 1
    return num_threads;
}
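// Worked example (illustrative): with num_iterations = 1000 and min_iterations_per_thread = 100,
// desired_num_threads = 10 and is rounded down to 8 (the largest power of two not exceeding it),
// then capped at get_num_cpus_pow2().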
} // namespace bb