Barretenberg
The ZK-SNARK library at the core of Aztec
Loading...
Searching...
No Matches
lmdb_store_wrapper.cpp
Go to the documentation of this file.
5#include "napi.h"
6#include <algorithm>
7#include <chrono>
8#include <cstdint>
9#include <iterator>
10#include <memory>
11#include <optional>
12#include <ratio>
13#include <stdexcept>
14#include <utility>
15
16using namespace bb::nodejs;
17using namespace bb::nodejs::lmdb_store;
18
19const uint64_t DEFAULT_MAP_SIZE = 1024UL * 1024;
20const uint64_t DEFAULT_MAX_READERS = 16;
21const uint64_t DEFAULT_CURSOR_PAGE_SIZE = 10;
22
23LMDBStoreWrapper::LMDBStoreWrapper(const Napi::CallbackInfo& info)
24 : ObjectWrap(info)
25{
26 Napi::Env env = info.Env();
27
28 size_t data_dir_index = 0;
29 std::string data_dir;
30 if (info.Length() > data_dir_index && info[data_dir_index].IsString()) {
31 data_dir = info[data_dir_index].As<Napi::String>();
32 } else {
33 throw Napi::TypeError::New(env, "Directory needs to be a string");
34 }
35
36 size_t map_size_index = 1;
37 uint64_t map_size = DEFAULT_MAP_SIZE;
38 if (info.Length() > map_size_index) {
40 map_size = info[map_size_index].As<Napi::Number>().Uint32Value();
41 } else {
42 throw Napi::TypeError::New(env, "Map size must be a number or an object");
43 }
44 }
45
46 size_t max_readers_index = 2;
48 if (info.Length() > max_readers_index) {
50 max_readers = info[max_readers_index].As<Napi::Number>().Uint32Value();
51 } else if (!info[max_readers_index].IsUndefined()) {
52 throw Napi::TypeError::New(env, "The number of readers must be a number");
53 }
54 }
55
57
59
62
68
70
72
73 // The close operation requires exclusive execution, no other operations can be run concurrently with it
75
77}
78
79Napi::Value LMDBStoreWrapper::call(const Napi::CallbackInfo& info)
80{
82}
83
84Napi::Function LMDBStoreWrapper::get_class(Napi::Env env)
85{
86 return DefineClass(env,
87 "Store",
88 {
90 });
91}
92
93// Simply verify that the store is still valid and that close has not been called
95{
96 if (_store) {
97 return;
98 }
99 throw std::runtime_error(format("LMDB store unavailable, was close already called?"));
100}
101
103{
104 verify_store();
105 _store->open_database(req.db, !req.uniqueKeys.value_or(true));
106 return { true };
107}
108
110{
111 verify_store();
113 lmdblib::KeysVector keys = req.keys;
114 _store->get(keys, vals, req.db);
115 return { vals };
116}
117
119{
120 verify_store();
122 for (const auto& entry : req.entries) {
123 key_set.insert(entry.first);
124 }
125
126 lmdblib::KeysVector keys(key_set.begin(), key_set.end());
128 _store->get(keys, vals, req.db);
129
130 std::vector<bool> exists;
131
132 for (const auto& entry : req.entries) {
133 const auto& key = entry.first;
134 const auto& requested_values = entry.second;
135
136 const auto& key_it = std::find(keys.begin(), keys.end(), key);
137 if (key_it == keys.end()) {
138 // this shouldn't happen. It means we missed a key when we created the key_set
139 exists.push_back(false);
140 continue;
141 }
142
143 // should be fine to convert this to an index in the array?
144 const auto& values = vals[static_cast<size_t>(key_it - keys.begin())];
145
146 if (!values.has_value()) {
147 exists.push_back(false);
148 continue;
149 }
150
151 // client just wanted to know if the key exists
152 if (!requested_values.has_value()) {
153 exists.push_back(true);
154 continue;
155 }
156
157 exists.push_back(std::all_of(requested_values->begin(), requested_values->end(), [&](const auto& val) {
158 return std::find(values->begin(), values->end(), val) != values->begin();
159 }));
160 }
161
162 return { exists };
163}
164
166{
167 verify_store();
168 bool reverse = req.reverse.value_or(false);
170 bool one_page = req.onePage.value_or(false);
171 lmdblib::Key key = req.key;
172
173 auto tx = _store->create_shared_read_transaction();
174 lmdblib::LMDBCursor::SharedPtr cursor = _store->create_cursor(tx, req.db);
175 bool start_ok = cursor->set_at_key(key);
176
177 if (!start_ok) {
178 // we couldn't find exactly the requested key. Find the next biggest one.
179 start_ok = cursor->set_at_key_gte(key);
180 // if we found a key that's greater _and_ we want to go in reverse order
181 // then we're actually outside the requested bounds, we need to go back one position
182 if (start_ok && reverse) {
184 // read_prev returns `true` if there's nothing more to read
185 // turn this into a "not ok" because there's nothing in the db for this cursor to read
186 start_ok = !cursor->read_prev(1, entries);
187 } else if (!start_ok && reverse) {
188 // we couldn't find a key greater than our starting point _and_ we want to go in reverse..
189 // then we start at the end of the database (the client requested to start at a key greater than anything in
190 // the DB)
191 start_ok = cursor->set_at_end();
192 }
193
194 // in case we're iterating in ascending order and we can't find the exact key or one that's greater than it
195 // then that means theren's nothing in the DB for the cursor to read
196 }
197
198 // we couldn't find a starting position
199 if (!start_ok) {
200 return { std::nullopt, {} };
201 }
202
203 auto [done, first_page] = _advance_cursor(*cursor, reverse, page_size);
204 // cursor finished after reading a single page or client only wanted the first page
205 if (done || one_page) {
206 return { std::nullopt, first_page };
207 }
208
209 auto cursor_id = cursor->id();
210 {
212 _cursors[cursor_id] = { cursor, reverse };
213 }
214
215 return { cursor_id, first_page };
216}
217
219{
220 {
222 _cursors.erase(req.cursor);
223 }
224 return { true };
225}
226
228{
230
231 {
233 data = _cursors.at(req.cursor);
234 }
235
237 auto [done, entries] = _advance_cursor(*data.cursor, data.reverse, page_size);
238 return { entries, done };
239}
240
242{
244
245 {
247 data = _cursors.at(req.cursor);
248 }
249
250 auto [done, count] = _advance_cursor_count(*data.cursor, data.reverse, req.endKey);
251 return { count, done };
252}
253
255{
256 verify_store();
258 batches.reserve(req.batches.size());
259
260 for (const auto& data : req.batches) {
261 lmdblib::LMDBStore::PutData batch{ data.second.addEntries, data.second.removeEntries, data.first };
262 batches.push_back(batch);
263 }
264
265 auto start = std::chrono::high_resolution_clock::now();
266 _store->put(batches);
267 auto end = std::chrono::high_resolution_clock::now();
268 std::chrono::duration<uint64_t, std::nano> duration_ns = end - start;
269
270 return { duration_ns.count() };
271}
272
274{
275 verify_store();
277 auto [map_size, physical_file_size] = _store->get_stats(stats);
278 return { stats, map_size, physical_file_size };
279}
280
282{
283 // prevent this store from receiving further messages
285
286 {
287 // close all of the open read cursors
288 std::lock_guard cursors(_cursor_mutex);
289 _cursors.clear();
290 }
291
292 // and finally close the database handle
293 _store.reset(nullptr);
294
295 return { true };
296}
297
299{
300 verify_store();
301 _store->copy_store(req.dstPath, req.compact.value_or(false));
302
303 return { true };
304}
305
307 bool reverse,
308 uint64_t page_size)
309{
311 bool done = reverse ? cursor.read_prev(page_size, entries) : cursor.read_next(page_size, entries);
312 return std::make_pair(done, entries);
313}
314
316 bool reverse,
317 const lmdblib::Key& end_key)
318{
319 uint64_t count = 0;
320 bool done = reverse ? cursor.count_until_prev(end_key, count) : cursor.count_until_next(end_key, count);
321 return std::make_pair(done, count);
322}
bool count_until_next(const Key &key, uint64_t &count) const
bool read_next(uint64_t numKeysToRead, KeyDupValuesVector &keyValuePairs) const
bool read_prev(uint64_t numKeysToRead, KeyDupValuesVector &keyValuePairs) const
std::shared_ptr< LMDBCursor > SharedPtr
bool count_until_prev(const Key &key, uint64_t &count) const
void register_handler(uint32_t msgType, T *self, R(T::*handler)() const, bool unique=false)
Napi::Promise process_message(const Napi::CallbackInfo &info)
StartCursorResponse start_cursor(const StartCursorRequest &req)
GetResponse get(const GetRequest &req)
static Napi::Function get_class(Napi::Env env)
BoolResponse close_cursor(const CloseCursorRequest &req)
BoolResponse open_database(const OpenDatabaseRequest &req)
bb::nodejs::AsyncMessageProcessor _msg_processor
HasResponse has(const HasRequest &req)
BatchResponse batch(const BatchRequest &req)
BoolResponse copy_store(const CopyStoreRequest &req)
std::unordered_map< uint64_t, CursorData > _cursors
static std::pair< bool, uint64_t > _advance_cursor_count(const lmdblib::LMDBCursor &cursor, bool reverse, const lmdblib::Key &end_key)
AdvanceCursorResponse advance_cursor(const AdvanceCursorRequest &req)
AdvanceCursorCountResponse advance_cursor_count(const AdvanceCursorCountRequest &req)
Napi::Value call(const Napi::CallbackInfo &)
The only instance method exposed to JavaScript. Takes a msgpack Message and returns a Promise.
static std::pair< bool, lmdblib::KeyDupValuesVector > _advance_cursor(const lmdblib::LMDBCursor &cursor, bool reverse, uint64_t page_size)
std::unique_ptr< lmdblib::LMDBStore > _store
std::string format(Args... args)
Definition log.hpp:21
void info(Args... args)
Definition log.hpp:74
const std::vector< FF > data
const uint64_t DEFAULT_MAP_SIZE
const uint64_t DEFAULT_MAX_READERS
const uint64_t DEFAULT_CURSOR_PAGE_SIZE
std::vector< Key > KeysVector
Definition types.hpp:13
std::vector< uint8_t > Key
Definition types.hpp:11
std::vector< KeyValuesPair > KeyDupValuesVector
Definition types.hpp:18
std::vector< OptionalValues > OptionalValuesVector
Definition types.hpp:17
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
Definition tuple.hpp:13