21 #include <tbb/parallel_for.h>
22 #include <tbb/task_arena.h>
24 #include <boost/filesystem/operations.hpp>
25 #include <boost/filesystem/path.hpp>
26 #include <boost/iterator/transform_iterator.hpp>
27 #include <boost/sort/spreadsort/string_sort.hpp>
32 #include <string_view>
34 #include <type_traits>
41 #include <sys/fcntl.h>
// Open the file read/write, creating it with mode 0644 if it does not exist.
// When `recover` is true the file is opened with O_APPEND; otherwise O_TRUNC
// is passed, which discards any existing content.
62 auto fd =
heavyai::open(path, O_RDWR | O_CREAT | (recover ? O_APPEND : O_TRUNC), 0644);
66 auto err = std::string(
"Dictionary path ") + std::string(path) +
67 std::string(
" does not exist.");
84 if (in == 0 || (in > (UINT32_MAX))) {
// Fold every character of `str` into the running hash: the accumulator is
// multiplied by 997 and the character value is added, one character at a time.
93 for (
size_t i = 0; i < str.size(); ++i) {
94 str_hash = str_hash * 997 + str[i];
// Zero unless something assigns it.
100 int64_t num_threads{0};
104 const int64_t num_elems,
105 const int64_t target_elems_per_thread) {
// Upper bound on the thread count: max_thread_count (floored at 1), further
// limited to ceil(num_elems / target_elems_per_thread). The left-hand side of
// this expression is on a line not shown here — presumably num_threads.
107 std::min(std::max(max_thread_count, int64_t(1)),
108 ((num_elems + target_elems_per_thread - 1) / target_elems_per_thread));
// Ceiling division of the elements over the threads, with a floor of one
// element per thread.
// NOTE(review): when num_elems is 0 the bound above evaluates to 0; if that is
// the value num_threads holds, the division below is a division by zero —
// confirm that callers never pass an empty input.
109 num_elems_per_thread =
110 std::max((num_elems + num_threads - 1) / num_threads, int64_t(1));
122 const std::string& folder,
125 const bool materializeHashes,
126 size_t initial_capacity)
127 : dict_key_(dict_key)
130 , string_id_string_dict_hash_table_(initial_capacity, INVALID_STR_ID)
131 , hash_cache_(initial_capacity)
133 , materialize_hashes_(materializeHashes)
136 , offset_map_(nullptr)
137 , payload_map_(nullptr)
138 , offset_file_size_(0)
139 , payload_file_size_(0)
140 , payload_file_off_(0)
141 , like_cache_size_(0)
142 , regex_cache_size_(0)
143 , equal_cache_size_(0)
144 , compare_cache_size_(0)
145 , strings_cache_(nullptr)
146 , strings_cache_size_(0) {
147 if (!isTemp && folder.empty()) {
152 CHECK_EQ(
size_t(0), (initial_capacity & (initial_capacity - 1)));
154 boost::filesystem::path storage_path(folder);
155 offsets_path_ = (storage_path / boost::filesystem::path(
"DictOffsets")).
string();
156 const auto payload_path =
157 (storage_path / boost::filesystem::path(
"DictPayload")).
string();
163 bool storage_is_empty =
false;
165 storage_is_empty =
true;
181 const uint64_t str_count =
186 const uint64_t max_entries =
188 round_up_p2(std::max(initial_capacity, static_cast<size_t>(1))));
192 std::vector<string_dict_hash_t> new_hash_cache(max_entries / 2);
197 if (str_count == 0) {
201 unsigned string_id = 0;
202 std::lock_guard<std::shared_mutex> write_lock(
rw_mutex_);
204 uint32_t thread_inits = 0;
// NOTE(review): std::thread::hardware_concurrency() is allowed to return 0
// when the value cannot be determined. thread_count is used as a divisor just
// below and as a modulus in `thread_inits % thread_count`, so a zero here
// would be a division by zero — consider clamping it to at least 1.
205 const auto thread_count = std::thread::hardware_concurrency();
// Batch size handed to each task: str_count / thread_count + 1, clamped to
// the range [2000, 200000].
206 const uint32_t items_per_thread = std::max<uint32_t>(
207 2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
208 std::vector<std::future<std::vector<std::pair<string_dict_hash_t, unsigned int>>>>
210 for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
213 std::vector<std::pair<string_dict_hash_t, unsigned int>> hashVec;
214 for (uint32_t curr_id = string_id;
215 curr_id < string_id + items_per_thread && curr_id < str_count;
218 if (recovered.canary) {
222 std::string_view temp(recovered.c_str_ptr, recovered.size);
223 hashVec.emplace_back(std::make_pair(
hash_string(temp), temp.size()));
229 if (thread_inits % thread_count == 0) {
234 if (dictionary_futures.size() != 0) {
237 VLOG(1) <<
"Opened string dictionary " << folder <<
" # Strings: " <<
str_count_
// Accumulated string -> string_id entries.
249 std::unordered_map<std::string, int32_t>
map_;
// Records one (str, string_id) pair in map_. The CHECK fails when `str` was
// already present, i.e. each string passed in is expected to be distinct.
252 void operator()(std::string
const& str, int32_t
const string_id)
override {
253 auto const emplaced = map_.emplace(str, string_id);
254 CHECK(emplaced.second) <<
"str(" << str <<
") string_id(" << string_id <<
')';
// The std::string_view overload is not supported by this callback; it only
// invokes UNREACHABLE() with an explanatory message.
256 void operator()(std::string_view
const, int32_t
const string_id)
override {
257 UNREACHABLE() <<
"MapMaker must be called with a std::string.";
// Hands the accumulated map to the caller by moving it out of this object;
// map_ is left in a moved-from state afterwards.
259 std::unordered_map<std::string, int32_t>
moveMap() {
return std::move(map_); }
266 constexpr
size_t big_gen =
static_cast<size_t>(std::numeric_limits<size_t>::max());
269 return [map{map_maker.moveMap()}](std::string
const& str) {
270 auto const itr = map.find(str);
282 CHECK_LE(n, static_cast<size_t>(std::numeric_limits<int32_t>::max()) + 1);
283 for (
unsigned id = 0;
id <
n; ++id) {
285 std::shared_lock<std::shared_mutex> read_lock(
rw_mutex_);
288 serial_callback(str,
id);
291 size_t const n = std::min(static_cast<size_t>(generation),
str_count_);
292 CHECK_LE(n, static_cast<size_t>(std::numeric_limits<int32_t>::max()) + 1);
293 std::shared_lock<std::shared_mutex> read_lock(
rw_mutex_);
294 for (
unsigned id = 0;
id <
n; ++id) {
301 std::vector<std::future<std::vector<std::pair<string_dict_hash_t, unsigned int>>>>&
302 dictionary_futures) {
303 for (
auto& dictionary_future : dictionary_futures) {
304 dictionary_future.wait();
305 const auto hashVec = dictionary_future.get();
306 for (
const auto& hash : hashVec) {
307 const uint32_t bucket =
317 dictionary_futures.clear();
332 const size_t storage_slots)
const noexcept {
// Zero slots is special-cased (the handling itself is on a line not shown
// here).
333 if (storage_slots == 0) {
// Binary search over the slot indices [0, storage_slots - 1]. The bounds are
// signed so that max_bound can drop to -1 when slot 0 itself is a canary.
338 int64_t min_bound = 0;
339 int64_t max_bound = storage_slots - 1;
341 while (min_bound <= max_bound) {
342 guess = (max_bound + min_bound) / 2;
// A canary at `guess` moves the upper bound below it. The min_bound update
// that follows belongs to the alternative branch (its `else` line is not
// shown here). This presumes every canary slot comes after every non-canary
// slot — TODO confirm.
344 if (getStringFromStorage(guess).canary) {
345 max_bound = guess - 1;
347 min_bound = guess + 1;
// The result is `guess`, plus one when the search finished by moving
// min_bound past it.
350 CHECK_GE(guess + (min_bound > guess ? 1 : 0), 0);
351 return guess + (min_bound > guess ? 1 : 0);
356 : dict_key_(dict_key)
357 , folder_(
"DB_" + std::
to_string(dict_key.db_id) +
"_DICT_" +
359 , strings_cache_(nullptr)
390 std::vector<int32_t> string_ids;
391 client_->get_or_add_bulk(string_ids, std::vector<std::string>{str});
392 CHECK_EQ(
size_t(1), string_ids.size());
393 return string_ids.front();
395 return getOrAddImpl(str);
402 std::ostringstream oss;
403 oss <<
"The text encoded column using dictionary " << dict_key
404 <<
" has exceeded it's limit of " <<
sizeof(
T) * 8 <<
" bits ("
405 << static_cast<size_t>(max_valid_int_value<T>() + 1) <<
" unique values) "
406 <<
"while attempting to add the new string '" << str <<
"'. ";
415 oss <<
"To load more data, please re-create the table with "
416 <<
"this column as type TEXT ENCODING DICT(" <<
sizeof(
T) * 2 * 8 <<
") ";
417 if (
sizeof(
T) == 1) {
418 oss <<
"or TEXT ENCODING DICT(32) ";
420 oss <<
"and reload your data.";
424 oss <<
"Currently dictionary-encoded text columns support a maximum of "
426 <<
" strings. Consider recreating the table with "
427 <<
"this column as type TEXT ENCODING NONE and reloading your data.";
430 throw std::runtime_error(oss.str());
// Builds the error text for a string that cannot be inserted, then throws it
// as std::runtime_error.
435 std::ostringstream oss;
// NOTE(review): the message opens a single quote before `str`, but the literal
// that follows does not close it — this looks like a typo in the message text.
436 oss <<
"The string '" << str <<
" could not be inserted into the dictionary "
437 << dict_key <<
" because it exceeded the maximum allowable "
439 << str.size() <<
" characters).";
441 throw std::runtime_error(oss.str());
446 template <
class String>
448 const std::vector<std::vector<String>>& string_array_vec,
449 std::vector<std::vector<int32_t>>& ids_array_vec) {
455 ids_array_vec.resize(string_array_vec.size());
456 for (
size_t i = 0; i < string_array_vec.size(); i++) {
457 auto& strings = string_array_vec[i];
458 auto& ids = ids_array_vec[i];
459 ids.resize(strings.size());
465 const std::vector<std::vector<std::string>>& string_array_vec,
466 std::vector<std::vector<int32_t>>& ids_array_vec);
469 const std::vector<std::vector<std::string_view>>& string_array_vec,
470 std::vector<std::vector<int32_t>>& ids_array_vec);
477 template <
class String>
479 const std::vector<String>& string_vec,
480 std::vector<string_dict_hash_t>& hashes)
const noexcept {
481 CHECK_EQ(string_vec.size(), hashes.size());
484 [&string_vec, &hashes](
const tbb::blocked_range<size_t>& r) {
485 for (
size_t curr_id = r.begin(); curr_id != r.end(); ++curr_id) {
486 if (string_vec[curr_id].empty()) {
489 hashes[curr_id] =
hash_string(string_vec[curr_id]);
494 template <
class T,
class String>
496 T* encoded_vec)
const {
497 return getBulk(string_vec, encoded_vec, -1L );
501 uint8_t* encoded_vec)
const;
503 uint16_t* encoded_vec)
const;
505 int32_t* encoded_vec)
const;
507 template <
class T,
class String>
510 const int64_t generation)
const {
511 constexpr int64_t target_strings_per_thread{1000};
512 const int64_t num_lookup_strings = string_vec.size();
513 if (num_lookup_strings == 0) {
518 std::thread::hardware_concurrency(), num_lookup_strings, target_strings_per_thread);
522 std::vector<size_t> num_strings_not_found_per_thread(thread_info.
num_threads, 0UL);
524 std::shared_lock<std::shared_mutex> read_lock(
rw_mutex_);
525 const int64_t num_dict_strings = generation >= 0 ? generation :
storageEntryCount();
526 const bool dictionary_is_empty = (num_dict_strings == 0);
527 if (dictionary_is_empty) {
529 [&](
const tbb::blocked_range<int64_t>& r) {
530 const int64_t start_idx = r.begin();
531 const int64_t end_idx = r.end();
532 for (int64_t string_idx = start_idx; string_idx < end_idx;
537 return num_lookup_strings;
542 tbb::task_arena limited_arena(thread_info.
num_threads);
543 limited_arena.execute([&] {
545 tbb::blocked_range<int64_t>(
547 [&](
const tbb::blocked_range<int64_t>& r) {
548 const int64_t start_idx = r.begin();
549 const int64_t end_idx = r.end();
550 size_t num_strings_not_found = 0;
551 for (int64_t string_idx = start_idx; string_idx != end_idx; ++string_idx) {
552 const auto& input_string = string_vec[string_idx];
553 if (input_string.empty()) {
554 encoded_vec[string_idx] = inline_int_null_value<T>();
566 string_id >= num_dict_strings) {
568 num_strings_not_found++;
571 encoded_vec[string_idx] = string_id;
// Publish this invocation's miss count into the slot of the arena thread that
// ran it.
// NOTE(review): the slot is assigned, not accumulated. If a single arena
// thread can execute this body for more than one sub-range, the count from the
// earlier sub-range is overwritten and the total below under-reports —
// confirm the range/grain setup guarantees one sub-range per thread, or use +=.
573 const size_t tbb_thread_idx = tbb::this_task_arena::current_thread_index();
574 num_strings_not_found_per_thread[tbb_thread_idx] = num_strings_not_found;
576 tbb::simple_partitioner());
// Sum the per-thread miss counts into the value returned to the caller.
579 size_t num_strings_not_found = 0;
580 for (int64_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
581 num_strings_not_found += num_strings_not_found_per_thread[thread_idx];
583 return num_strings_not_found;
587 uint8_t* encoded_vec,
588 const int64_t generation)
const;
590 uint16_t* encoded_vec,
591 const int64_t generation)
const;
593 int32_t* encoded_vec,
594 const int64_t generation)
const;
596 template <
class T,
class String>
598 T* output_string_ids) {
604 std::lock_guard<std::shared_mutex> write_lock(
rw_mutex_);
608 for (
const auto& input_string : input_strings) {
609 if (input_string.empty()) {
610 output_string_ids[idx++] = inline_int_null_value<T>();
616 uint32_t hash_bucket =
624 if (
str_count_ > static_cast<size_t>(max_valid_int_value<T>())) {
625 throw_encoding_error<T>(input_string,
dict_key_);
629 <<
") of Dictionary encoded Strings reached for this column, offset path "
643 const int32_t string_id =
static_cast<int32_t
>(
str_count_);
645 output_string_ids[idx++] = string_id;
648 const size_t num_strings_added =
str_count_ - initial_str_count;
649 if (num_strings_added > 0) {
654 template <
class T,
class String>
656 T* output_string_ids) {
659 std::vector<string_dict_hash_t> input_strings_hashes(input_strings.size());
662 std::lock_guard<std::shared_mutex> write_lock(
rw_mutex_);
663 size_t shadow_str_count =
665 const size_t storage_high_water_mark = shadow_str_count;
666 std::vector<size_t> string_memory_ids;
667 size_t sum_new_string_lengths = 0;
668 string_memory_ids.reserve(input_strings.size());
669 size_t input_string_idx{0};
670 for (
const auto& input_string : input_strings) {
672 if (input_string.empty()) {
673 output_string_ids[input_string_idx++] = inline_int_null_value<T>();
682 storage_high_water_mark,
685 input_strings_hashes);
690 const uint32_t hash_bucket =
694 storage_high_water_mark,
702 output_string_ids[input_string_idx++] =
708 if (shadow_str_count > static_cast<size_t>(max_valid_int_value<T>())) {
709 throw_encoding_error<T>(input_string,
dict_key_);
712 <<
"Maximum number (" << shadow_str_count
713 <<
") of Dictionary encoded Strings reached for this column, offset path "
717 string_memory_ids.push_back(input_string_idx);
718 sum_new_string_lengths += input_string.size();
720 static_cast<int32_t
>(shadow_str_count);
724 output_string_ids[input_string_idx++] = shadow_str_count++;
727 const size_t num_strings_added = shadow_str_count -
str_count_;
728 str_count_ = shadow_str_count;
729 if (num_strings_added > 0) {
734 uint8_t* encoded_vec);
736 uint16_t* encoded_vec);
738 int32_t* encoded_vec);
741 const std::vector<std::string_view>& string_vec,
742 uint8_t* encoded_vec);
744 const std::vector<std::string_view>& string_vec,
745 uint16_t* encoded_vec);
747 const std::vector<std::string_view>& string_vec,
748 int32_t* encoded_vec);
750 template <
class String>
752 std::shared_lock<std::shared_mutex> read_lock(
rw_mutex_);
754 if constexpr (std::is_same_v<std::string, std::decay_t<String>>) {
757 return client_->get(std::string(str));
768 auto str_id = string_id_string_dict_hash_table_[computeBucket(
769 hash, sv, string_id_string_dict_hash_table_)];
774 std::shared_lock<std::shared_mutex> read_lock(
rw_mutex_);
777 client_->get_string(ret, string_id);
784 std::shared_lock<std::shared_mutex> read_lock(
rw_mutex_);
785 CHECK(!
isClient()) <<
"use of this function is unsupported in distributed";
790 CHECK_LT(string_id, static_cast<int32_t>(str_count_));
791 return getStringChecked(string_id);
795 int32_t string_id)
const noexcept {
796 CHECK_LT(string_id, static_cast<int32_t>(str_count_));
797 return getStringViewChecked(string_id);
801 int32_t string_id)
const noexcept {
802 std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
805 CHECK_LT(string_id, static_cast<int32_t>(str_count_));
806 return getStringBytesChecked(string_id);
810 std::shared_lock<std::shared_mutex> read_lock(
rw_mutex_);
812 return client_->storage_entry_count();
817 template <
typename T>
820 const bool is_simple,
822 const size_t generation)
const {
823 constexpr
size_t grain_size{1000};
827 auto const num_threads =
static_cast<size_t>(
cpu_threads());
828 std::vector<std::vector<T>> worker_results(num_threads);
829 tbb::task_arena limited_arena(num_threads);
830 limited_arena.execute([&] {
832 tbb::blocked_range<size_t>(0, generation, grain_size),
833 [&is_like_impl, &pattern, &escape, &worker_results,
this](
834 const tbb::blocked_range<size_t>& range) {
835 auto& result_vector =
836 worker_results[tbb::this_task_arena::current_thread_index()];
837 for (
size_t i = range.begin(); i < range.end(); ++i) {
840 str.c_str(), str.size(), pattern.c_str(), pattern.size(), escape)) {
841 result_vector.push_back(i);
847 std::vector<size_t> start_offsets(num_threads + 1, 0);
848 auto vec_size = [](std::vector<T>
const& vec) {
return vec.size(); };
849 auto begin = boost::make_transform_iterator(worker_results.begin(), vec_size);
850 auto end = boost::make_transform_iterator(worker_results.end(), vec_size);
853 std::vector<T>
result(start_offsets[num_threads]);
854 limited_arena.execute([&] {
856 tbb::blocked_range<size_t>(0, num_threads, 1),
857 [&worker_results, &result, &start_offsets](
858 const tbb::blocked_range<size_t>& range) {
859 auto& result_vector = worker_results[range.begin()];
860 auto const start_offset = start_offsets[range.begin()];
862 result_vector.begin(), result_vector.end(), result.begin() + start_offset);
864 tbb::static_partitioner());
// int32_t specialization of getLike: LIKE matching with a result cache keyed
// on the pattern and its matching options.
869 std::vector<int32_t> StringDictionary::getLike<int32_t>(
const std::string& pattern,
871 const bool is_simple,
873 const size_t generation)
const {
// An exclusive lock is taken even though the method is const, because the
// code below inserts into like_i32_cache_ and updates like_cache_size_.
874 std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
// Remote path: the lookup is forwarded to the client (the guarding condition
// is on a line not shown here).
876 return client_->get_like_i32(pattern, icase, is_simple, escape, generation);
// The cache key is (pattern, icase, is_simple, escape); `generation` is not
// part of it.
878 const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
879 const auto it = like_i32_cache_.find(cache_key);
880 if (it != like_i32_cache_.end()) {
// Compute the matches, then store them in the cache under cache_key.
884 auto result = getLikeImpl<int32_t>(pattern, icase, is_simple, escape, generation);
886 const auto it_ok = like_i32_cache_.insert(std::make_pair(cache_key,
result));
// Size accounting for the cache: pattern length, plus 3, plus the bytes
// occupied by the result ids.
887 like_cache_size_ += (pattern.size() + 3 + (
result.size() *
sizeof(int32_t)));
895 std::vector<int64_t> StringDictionary::getLike<int64_t>(
const std::string& pattern,
897 const bool is_simple,
899 const size_t generation)
const {
900 std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
902 return client_->get_like_i64(pattern, icase, is_simple, escape, generation);
904 const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
905 const auto it = like_i64_cache_.find(cache_key);
906 if (it != like_i64_cache_.end()) {
910 auto result = getLikeImpl<int64_t>(pattern, icase, is_simple, escape, generation);
912 const auto it_ok = like_i64_cache_.insert(std::make_pair(cache_key,
result));
913 like_cache_size_ += (pattern.size() + 3 + (
result.size() *
sizeof(int64_t)));
921 std::string comp_operator,
923 std::vector<int32_t>
result;
928 eq_id = eq_id_itr->second;
929 if (comp_operator ==
"=") {
930 result.push_back(eq_id);
932 for (int32_t idx = 0; idx <= cur_size; idx++) {
936 result.push_back(idx);
940 std::vector<std::thread> workers;
943 std::vector<std::vector<int32_t>> worker_results(worker_count);
945 for (
int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
946 workers.emplace_back(
947 [&worker_results, &pattern, generation, worker_idx, worker_count,
this]() {
948 for (
size_t string_id = worker_idx; string_id < generation;
949 string_id += worker_count) {
951 if (str == pattern) {
952 worker_results[worker_idx].push_back(string_id);
957 for (
auto& worker : workers) {
960 for (
const auto& worker_result : worker_results) {
961 result.insert(result.end(), worker_result.begin(), worker_result.end());
963 if (result.size() > 0) {
964 const auto it_ok =
equal_cache_.insert(std::make_pair(pattern, result[0]));
969 if (comp_operator ==
"<>") {
970 for (int32_t idx = 0; idx <= cur_size; idx++) {
974 result.push_back(idx);
982 const std::string& comp_operator,
983 const size_t generation) {
984 std::lock_guard<std::shared_mutex> write_lock(
rw_mutex_);
986 return client_->get_compare(pattern, comp_operator, generation);
988 std::vector<int32_t> ret;
993 if (comp_operator ==
"=" || comp_operator ==
"<>") {
994 return getEquals(pattern, comp_operator, generation);
1002 cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
1007 [
this](decltype(
sorted_cache)::value_type
const&
a, decltype(pattern)& b) {
1009 return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
1014 cache_index->diff = 1;
1018 cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
1019 cache_index->index = cache_itr -
sorted_cache.begin() - 1;
1020 cache_index->diff = 1;
1023 cache_index->diff = 0;
1045 if (comp_operator ==
"<") {
1046 size_t idx = cache_index->index;
1047 if (cache_index->diff) {
1048 idx = cache_index->index + 1;
1049 if (cache_index->index == 0 && cache_index->diff > 0) {
1050 idx = cache_index->index;
1053 for (
size_t i = 0; i < idx; i++) {
1064 }
else if (comp_operator ==
"<=") {
1065 size_t idx = cache_index->index + 1;
// NOTE(review): `cache_index == 0` compares the pointer itself, whereas the
// "<", ">" and ">=" branches of this chain test `cache_index->index == 0`.
// cache_index is dereferenced on the line above, so for a valid pointer this
// condition can never be true and the assignment below never runs. Most
// likely `cache_index->index == 0` was intended — confirm and fix.
1066 if (cache_index == 0 && cache_index->diff > 0) {
1067 idx = cache_index->index;
1069 for (
size_t i = 0; i < idx; i++) {
1078 }
else if (comp_operator ==
">") {
1079 size_t idx = cache_index->index + 1;
1080 if (cache_index->index == 0 && cache_index->diff > 0) {
1081 idx = cache_index->index;
1093 }
else if (comp_operator ==
">=") {
1094 size_t idx = cache_index->index;
1095 if (cache_index->diff) {
1096 idx = cache_index->index + 1;
1097 if (cache_index->index == 0 && cache_index->diff > 0) {
1098 idx = cache_index->index;
1104 }
else if (comp_operator ==
"=") {
1105 if (!cache_index->diff) {
1111 }
else if (comp_operator ==
"<>") {
1112 if (!cache_index->diff) {
1113 size_t idx = cache_index->index;
1114 for (
size_t i = 0; i < idx; i++) {
// NOTE(review): no `throw` precedes this expression, so it only constructs a
// std::runtime_error temporary and discards it. An unsupported comparison
// operator therefore does not raise an error here — `throw` looks to be
// missing.
1128 std::runtime_error(
"Unsupported string comparison operator");
1136 const std::string& pattern,
1137 const char escape) {
1138 return regexp_like(str.c_str(), str.size(), pattern.c_str(), pattern.size(), escape);
1145 const size_t generation)
const {
1146 std::lock_guard<std::shared_mutex> write_lock(
rw_mutex_);
1148 return client_->get_regexp_like(pattern, escape, generation);
1150 const auto cache_key = std::make_pair(pattern, escape);
1155 std::vector<int32_t>
result;
1156 std::vector<std::thread> workers;
1159 std::vector<std::vector<int32_t>> worker_results(worker_count);
1161 for (
int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
1162 workers.emplace_back([&worker_results,
1169 for (
size_t string_id = worker_idx; string_id < generation;
1170 string_id += worker_count) {
1173 worker_results[worker_idx].push_back(string_id);
1178 for (
auto& worker : workers) {
1181 for (
const auto& worker_result : worker_results) {
1182 result.insert(result.end(), worker_result.begin(), worker_result.end());
1184 const auto it_ok =
regex_cache_.insert(std::make_pair(cache_key, result));
1186 CHECK(it_ok.second);
1192 std::lock_guard<std::shared_mutex> write_lock(
rw_mutex_);
1195 throw std::runtime_error(
1196 "copying dictionaries from remote server is not supported yet.");
1205 const bool multithreaded =
str_count_ > 10000;
1206 const auto worker_count =
1207 multithreaded ?
static_cast<size_t>(
cpu_threads()) :
size_t(1);
1209 std::vector<std::vector<std::string>> worker_results(worker_count);
1210 std::vector<size_t> string_size(worker_count, 0);
1211 auto copy = [
this, &string_size](std::vector<std::string>& str_list,
1212 const size_t worker_idx,
1213 const size_t start_id,
1214 const size_t end_id) {
1216 str_list.reserve(end_id - start_id);
1217 for (
size_t string_id = start_id; string_id < end_id; ++string_id) {
1219 string_size[worker_idx] += str.size();
1220 str_list.push_back(str);
1223 if (multithreaded) {
1224 std::vector<std::future<void>> workers;
1225 const auto stride = (
str_count_ + (worker_count - 1)) / worker_count;
1226 for (
size_t worker_idx = 0, start = 0, end = std::min(start + stride,
str_count_);
1227 worker_idx < worker_count && start <
str_count_;
1228 ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
1231 std::ref(worker_results[worker_idx]),
1236 for (
auto& worker : workers) {
1240 CHECK_EQ(worker_results.size(), size_t(1));
1244 for (
const auto& worker_result : worker_results) {
1246 strings_cache_->end(), worker_result.begin(), worker_result.end());
1254 return string_id_string_dict_hash_table_.size() <= num_strings * 2;
1265 new_str_ids[bucket] = i;
1273 new_str_ids[bucket] = i;
1279 template <
class String>
1281 const size_t str_count,
1283 const size_t storage_high_water_mark,
1284 const std::vector<String>& input_strings,
1285 const std::vector<size_t>& string_memory_ids,
1286 const std::vector<string_dict_hash_t>& input_strings_hashes) noexcept {
1287 std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
1289 if (materialize_hashes_) {
1290 for (
size_t i = 0; i != str_count; ++i) {
1292 const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1293 new_str_ids[bucket] = i;
1295 hash_cache_.resize(hash_cache_.size() * 2);
1297 for (
size_t storage_idx = 0; storage_idx != storage_high_water_mark; ++storage_idx) {
1298 const auto storage_string = getStringChecked(storage_idx);
1300 const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1301 new_str_ids[bucket] = storage_idx;
1303 for (
size_t memory_idx = 0; memory_idx != string_memory_ids.size(); ++memory_idx) {
1304 const size_t string_memory_id = string_memory_ids[memory_idx];
1305 const uint32_t bucket = computeUniqueBucketWithHash(
1306 input_strings_hashes[string_memory_id], new_str_ids);
1307 new_str_ids[bucket] = storage_high_water_mark + memory_idx;
1310 string_id_string_dict_hash_table_.swap(new_str_ids);
// An empty string is not stored; it maps to the 32-bit NULL sentinel.
1315 if (str.size() == 0) {
1316 return inline_int_null_value<int32_t>();
1318 CHECK(str.size() <= MAX_STRLEN);
// First attempt: look the string up while holding only the shared lock and
// return its id if the bucket is occupied.
// NOTE(review): the scope braces around this lookup are on lines not shown
// here; the shared lock has to be released before the exclusive lock below is
// acquired — confirm.
1321 std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
1322 const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1323 if (string_id_string_dict_hash_table_[bucket] != INVALID_STR_ID) {
1324 return string_id_string_dict_hash_table_[bucket];
// Not found under the shared lock: take the exclusive lock for the insert.
1327 std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
// Grow the hash table first when fillRateIsHigh reports that it is needed.
1328 if (fillRateIsHigh(str_count_)) {
1330 increaseHashTableCapacity();
// The bucket is computed again under the exclusive lock instead of reusing
// the value from the shared-lock lookup above.
1334 const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
// Empty slot: append the string to storage and record the current str_count_
// as its id in the table (and its hash in hash_cache_ when hashes are
// materialized).
1335 if (string_id_string_dict_hash_table_[bucket] == INVALID_STR_ID) {
1337 <<
"Maximum number (" << str_count_
1338 <<
") of Dictionary encoded Strings reached for this column, offset path "
1341 appendToStorage(str);
1342 string_id_string_dict_hash_table_[bucket] =
static_cast<int32_t
>(str_count_);
1343 if (materialize_hashes_) {
1344 hash_cache_[str_count_] = hash;
1347 invalidateInvertedIndex();
// Return whatever id the bucket holds at this point.
1349 return string_id_string_dict_hash_table_[bucket];
1353 const auto str_canary = getStringFromStorage(string_id);
1354 CHECK(!str_canary.canary);
1355 return std::string(str_canary.c_str_ptr, str_canary.size);
1359 const int string_id)
const noexcept {
1360 const auto str_canary = getStringFromStorage(string_id);
1361 CHECK(!str_canary.canary);
1362 return std::string_view{str_canary.c_str_ptr, str_canary.size};
1366 const int string_id)
const noexcept {
1367 const auto str_canary = getStringFromStorage(string_id);
1368 CHECK(!str_canary.canary);
1369 return std::make_pair(str_canary.c_str_ptr, str_canary.size);
1372 template <
class String>
1375 const String& input_string,
1376 const std::vector<int32_t>& string_id_string_dict_hash_table)
const noexcept {
1377 const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
1378 uint32_t bucket = hash & (string_dict_hash_table_size - 1);
1380 const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1381 if (candidate_string_id ==
1385 if ((materialize_hashes_ && hash == hash_cache_[candidate_string_id]) ||
1386 !materialize_hashes_) {
1387 const auto candidate_string = getStringFromStorageFast(candidate_string_id);
1388 if (input_string.size() == candidate_string.size() &&
1389 !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1395 if (++bucket == string_dict_hash_table_size) {
1402 template <
class String>
1405 const String& input_string,
1406 const std::vector<int32_t>& string_id_string_dict_hash_table,
1407 const size_t storage_high_water_mark,
1408 const std::vector<String>& input_strings,
1409 const std::vector<size_t>& string_memory_ids)
const noexcept {
1410 uint32_t bucket = input_string_hash & (string_id_string_dict_hash_table.size() - 1);
1412 const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1413 if (candidate_string_id ==
1417 if (!materialize_hashes_ || (input_string_hash == hash_cache_[candidate_string_id])) {
1418 if (candidate_string_id > 0 &&
1419 static_cast<size_t>(candidate_string_id) >= storage_high_water_mark) {
1422 size_t memory_offset =
1423 static_cast<size_t>(candidate_string_id - storage_high_water_mark);
1424 const String candidate_string = input_strings[string_memory_ids[memory_offset]];
1425 if (input_string.size() == candidate_string.size() &&
1426 !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1432 const auto candidate_storage_string =
1433 getStringFromStorageFast(candidate_string_id);
1434 if (input_string.size() == candidate_storage_string.size() &&
1435 !memcmp(input_string.data(),
1436 candidate_storage_string.data(),
1437 input_string.size())) {
1445 if (++bucket == string_id_string_dict_hash_table.size()) {
1454 const std::vector<int32_t>& string_id_string_dict_hash_table) noexcept {
1455 const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
1456 uint32_t bucket = hash & (string_dict_hash_table_size - 1);
1458 if (string_id_string_dict_hash_table[bucket] ==
1464 if (++bucket == string_dict_hash_table_size) {
1472 const size_t write_length) {
1474 const size_t min_capacity_needed =
1491 const size_t write_length) {
1494 const size_t min_capacity_needed =
1510 template <
class String>
1513 checkAndConditionallyIncreasePayloadCapacity(str.size());
1514 memcpy(payload_map_ + payload_file_off_, str.data(), str.size());
1518 payload_file_off_ += str.size();
1520 checkAndConditionallyIncreaseOffsetCapacity(
sizeof(str_meta));
1521 memcpy(offset_map_ + str_count_, &str_meta,
sizeof(str_meta));
1524 template <
class String>
1526 const std::vector<String>& input_strings,
1527 const std::vector<size_t>& string_memory_ids,
1528 const size_t sum_new_strings_lengths) noexcept {
1529 const size_t num_strings = string_memory_ids.size();
1531 checkAndConditionallyIncreasePayloadCapacity(sum_new_strings_lengths);
1532 checkAndConditionallyIncreaseOffsetCapacity(
sizeof(
StringIdxEntry) * num_strings);
1534 for (
size_t i = 0; i < num_strings; ++i) {
1535 const size_t string_idx = string_memory_ids[i];
1536 const String str = input_strings[string_idx];
1537 const size_t str_size(str.size());
1538 memcpy(payload_map_ + payload_file_off_, str.data(), str_size);
1539 StringIdxEntry str_meta{
static_cast<uint64_t
>(payload_file_off_), str_size};
1540 payload_file_off_ += str_size;
1541 memcpy(offset_map_ + str_count_ + i, &str_meta,
sizeof(str_meta));
1546 const int string_id)
const noexcept {
1548 return {payload_map_ + str_meta->
off, str_meta->
size};
1552 const int string_id)
const noexcept {
1559 if (str_meta->
size == 0xffff) {
1561 return {
nullptr, 0,
true};
1563 return {payload_map_ + str_meta->
off, str_meta->
size,
false};
1568 payload_file_size_ += addStorageCapacity(payload_fd_, min_capacity_requested);
1570 payload_map_ =
static_cast<char*
>(
1571 addMemoryCapacity(payload_map_, payload_file_size_, min_capacity_requested));
1577 offset_file_size_ += addStorageCapacity(offset_fd_, min_capacity_requested);
1580 addMemoryCapacity(offset_map_, offset_file_size_, min_capacity_requested));
1586 const size_t min_capacity_requested) noexcept {
1587 const size_t canary_buff_size_to_add =
// Grow the canary buffer when it is smaller than this request, then fill the
// requested length with 0xff bytes.
// NOTE(review): the realloc result overwrites CANARY_BUFFER before it is
// checked, and canary_buffer_size is updated before the CHECK; the CHECK is
// the only handling of an allocation failure.
1591 if (canary_buffer_size < canary_buff_size_to_add) {
1592 CANARY_BUFFER =
static_cast<char*
>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1593 canary_buffer_size = canary_buff_size_to_add;
1594 CHECK(CANARY_BUFFER);
1595 memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
// Seek to the end of the file and append the canary bytes. The CHECK requires
// a single write() call to write the full length, so a short write is treated
// as a failure rather than retried.
1598 CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1599 const auto write_return =
write(fd, CANARY_BUFFER, canary_buff_size_to_add);
1600 CHECK(write_return > 0 &&
1601 (static_cast<size_t>(write_return) == canary_buff_size_to_add));
// Report the number of bytes that were appended.
1602 return canary_buff_size_to_add;
1607 const size_t min_capacity_requested) noexcept {
1608 const size_t canary_buff_size_to_add =
// Same canary-buffer growth as in the file-backed variant: enlarge the buffer
// when it is smaller than this request and fill the requested length with 0xff.
1611 if (canary_buffer_size < canary_buff_size_to_add) {
1613 reinterpret_cast<char*
>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1614 canary_buffer_size = canary_buff_size_to_add;
1615 CHECK(CANARY_BUFFER);
1616 memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
// Enlarge the caller's allocation by the canary length.
// NOTE(review): any null check on new_addr is on a line not shown here —
// confirm one exists before the pointer arithmetic below.
1618 void* new_addr = realloc(addr, mem_size + canary_buff_size_to_add);
// The new tail starts mem_size bytes into the reallocated region.
1620 void* write_addr =
reinterpret_cast<void*
>(
static_cast<char*
>(new_addr) + mem_size);
// memcpy returns its destination pointer, so this CHECK only verifies that
// write_addr is non-null after the copy has already been performed.
1621 CHECK(memcpy(write_addr, CANARY_BUFFER, canary_buff_size_to_add));
// mem_size is updated in place to include the added bytes.
1622 mem_size += canary_buff_size_to_add;
1671 return static_cast<bool>(
client_);
1677 std::vector<int32_t> temp_sorted_cache;
1678 for (
size_t i = cur_cache_size; i <
str_count_; i++) {
1679 temp_sorted_cache.push_back(i);
1691 std::sort(cache.begin(), cache.end(), [
this](int32_t
a, int32_t b) {
1694 return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1700 std::vector<int32_t> updated_cache(temp_sorted_cache.size() +
sorted_cache.size());
1701 size_t t_idx = 0, s_idx = 0, idx = 0;
1702 for (; t_idx < temp_sorted_cache.size() && s_idx <
sorted_cache.size(); idx++) {
1705 const auto insert_from_temp_cache =
1706 string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1707 if (insert_from_temp_cache) {
1708 updated_cache[idx] = temp_sorted_cache[t_idx++];
1713 while (t_idx < temp_sorted_cache.size()) {
1714 updated_cache[idx++] = temp_sorted_cache[t_idx++];
// populate_string_ids: gathers one string per entry of source_ids into
// `strings`, then sizes dest_ids to match.
// NOTE(review): only part of the definition is present in this listing; the
// computation of string_index and the step that fills dest_ids are not visible.
1723 std::vector<int32_t>& dest_ids,
1725 const std::vector<int32_t>& source_ids,
1727 const std::vector<std::string const*>& transient_string_vec) {
1728 std::vector<std::string> strings;
1730 for (
const int32_t source_id : source_ids) {
// The minimum int32_t value maps to an empty string.
1731 if (source_id == std::numeric_limits<int32_t>::min()) {
1732 strings.emplace_back(
"");
1733 }
// Any other negative id is resolved through transient_string_vec, after
// checking that string_index is within bounds.
else if (source_id < 0) {
1735 CHECK_LT(string_index, transient_string_vec.size()) <<
"source_id=" << source_id;
1736 strings.emplace_back(*transient_string_vec[string_index]);
// Ids not handled above are looked up in the source dictionary (presumably the
// else branch — the branch line itself is not visible).
1738 strings.push_back(source_dict->
getString(source_id));
// One destination id per gathered string.
1742 dest_ids.resize(strings.size());
// populate_string_array_ids: per-row variant operating on vectors of id vectors.
// NOTE(review): only part of the definition is present in this listing; the
// bodies of the worker loops and the per-row translation call are not visible.
1747 std::vector<std::vector<int32_t>>& dest_array_ids,
1749 const std::vector<std::vector<int32_t>>& source_array_ids,
// One destination row per source row.
1751 dest_array_ids.resize(source_array_ids.size());
// Row counter; each processor invocation claims the next row with fetch_add.
1753 std::atomic<size_t> row_idx{0};
1754 auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1757 auto row = row_idx.fetch_add(1);
// A claimed index at or past the row count means there is no row left to take.
1759 if (row >= dest_array_ids.size()) {
1762 const auto& source_ids = source_array_ids[row];
1763 auto& dest_ids = dest_array_ids[row];
// NOTE(review): std::thread::hardware_concurrency() is allowed to return 0 when
// the value cannot be determined; it is used as a divisor on the next line
// without a visible guard — consider clamping to at least 1.
1768 const int num_worker_threads = std::thread::hardware_concurrency();
// Futures are only created when there are more than 10 rows per worker.
1770 if (source_array_ids.size() / num_worker_threads > 10) {
1771 std::vector<std::future<void>> worker_threads;
1772 for (
int i = 0; i < num_worker_threads; ++i) {
// Two passes over the futures follow; their bodies are not visible here.
1776 for (
auto& child : worker_threads) {
1779 for (
auto& child : worker_threads) {
// Builds and returns a vector of string_views with one entry per string.
// NOTE(review): presumably the generation-taking overload of getStringViews;
// the first signature line, the definition of num_strings and the loop bodies
// are not present in this listing.
1788 const size_t generation)
const {
// Shared (reader) lock on the dictionary mutex.
1790 std::shared_lock<std::shared_mutex> read_lock(
rw_mutex_);
// The loops below index with int32_t, so the count must fit in that type.
1801 CHECK_LE(num_strings, std::numeric_limits<int32_t>::max());
1803 std::vector<std::string_view> string_views(num_strings);
// Nothing to fill in: return the empty vector.
1805 if (num_strings == 0) {
1806 return string_views;
// Below this many strings a plain loop is used.
1808 constexpr int64_t tbb_parallel_threshold{1000};
1809 if (num_strings < tbb_parallel_threshold) {
1811 for (int32_t string_idx = 0; string_idx < num_strings; ++string_idx) {
// Otherwise (presumably the else branch) the work is split across a TBB arena
// limited to thread_info.num_threads, aiming at 1000 strings per thread.
1815 constexpr int64_t target_strings_per_thread{1000};
1817 std::thread::hardware_concurrency(), num_strings, target_strings_per_thread);
1821 tbb::task_arena limited_arena(thread_info.
num_threads);
1822 limited_arena.execute([&] {
1824 tbb::blocked_range<int64_t>(
1826 [&](
const tbb::blocked_range<int64_t>& r) {
// Range bounds are narrowed from int64_t to int32_t for the per-string loop.
1828 const int32_t start_idx = r.begin();
1829 const int32_t end_idx = r.end();
1830 for (int32_t string_idx = start_idx; string_idx != end_idx; ++string_idx) {
1834 tbb::simple_partitioner());
1837 return string_views;
// buildDictionaryTranslationMap overload taking the destination dictionary as a
// shared_ptr: allocates translated_ids with one slot per source string and
// returns it by value.
// NOTE(review): the call that receives translated_ids.data() and
// dest_transient_lookup_callback is only partly visible — presumably it
// delegates to the pointer-based overload; confirm.
1845 const std::shared_ptr<StringDictionary> dest_dict,
1849 const size_t num_dest_strings = dest_dict->storageEntryCount();
1850 std::vector<int32_t> translated_ids(num_source_strings);
1853 translated_ids.data(),
1858 dest_transient_lookup_callback,
1860 return translated_ids;
// order_translation_locks: locks the two supplied read locks in an order
// decided by comparing the dictionary keys.
// NOTE(review): presumably this keeps the acquisition order consistent between
// translations running in opposite directions — confirm with callers.
1865 std::shared_lock<std::shared_mutex>& source_read_lock,
1866 std::shared_lock<std::shared_mutex>& dest_read_lock) {
1867 const bool dicts_are_same = (source_dict_key == dest_dict_key);
1868 const bool source_dict_is_locked_first = (source_dict_key < dest_dict_key);
// Same dictionary on both sides: only dest_read_lock is locked in the visible lines.
1869 if (dicts_are_same) {
1871 dest_read_lock.lock();
1872 }
// Source key orders first: lock source, then destination.
else if (source_dict_is_locked_first) {
1873 source_read_lock.lock();
1874 dest_read_lock.lock();
// Remaining case (branch line not visible): lock destination, then source.
1876 dest_read_lock.lock();
1877 source_read_lock.lock();
// buildDictionaryTranslationMap (pointer-output overload): writes one entry of
// translated_ids per source string id and returns how many strings were counted
// as not translated.
// NOTE(review): only part of the definition is present in this listing; several
// conditions, the destination lookup itself and all closing lines are missing,
// so the notes below describe the visible lines only.
1883 int32_t* translated_ids,
1884 const int64_t source_generation,
1885 const int64_t dest_generation,
1886 const bool dest_has_transients,
1888 const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos)
const {
// The generations are used directly as the string counts of each dictionary.
1892 const int64_t num_source_strings = source_generation;
1893 const int64_t num_dest_strings = dest_generation;
1896 if (num_source_strings == 0L) {
// The condition guarding this throw is not visible here.
1905 throw std::runtime_error(
1906 "Cannot translate between a local source and remote destination dictionary.");
// The source lock is created deferred (not yet locked).
// NOTE(review): the destination lock's remaining constructor arguments and the
// locking call are not visible.
1911 std::shared_lock<std::shared_mutex> source_read_lock(
rw_mutex_, std::defer_lock);
1912 std::shared_lock<std::shared_mutex> dest_read_lock(dest_dict->
rw_mutex_,
1924 const bool dest_dictionary_is_empty = (num_dest_strings == 0);
// Work is split aiming at 1000 source strings per thread.
1926 constexpr int64_t target_strings_per_thread{1000};
1928 std::thread::hardware_concurrency(), num_source_strings, target_strings_per_thread);
1937 const StringOps_Namespace::StringOps string_ops(string_op_infos);
// True when at least one string op was supplied (size converted to bool).
1938 const bool has_string_ops = string_ops.size();
1940 tbb::task_arena limited_arena(thread_info.
num_threads);
// One counter slot per arena thread; summed after the parallel section.
1941 std::vector<size_t> num_strings_not_translated_per_thread(thread_info.
num_threads, 0UL);
// Compile-time switch, currently false, so the first branch below never runs.
1942 constexpr
bool short_circuit_empty_dictionary_translations{
false};
1943 limited_arena.execute([&] {
1944 if (short_circuit_empty_dictionary_translations && dest_dictionary_is_empty) {
1946 tbb::blocked_range<int32_t>(
1950 [&](
const tbb::blocked_range<int32_t>& r) {
1951 const int32_t start_idx = r.begin();
1952 const int32_t end_idx = r.end();
1953 for (int32_t string_idx = start_idx; string_idx != end_idx; ++string_idx) {
1957 tbb::simple_partitioner());
// In the short-circuit path every source string is counted as not translated.
1958 num_strings_not_translated_per_thread[0] += num_source_strings;
// General path: parallel loop over the source string ids.
1967 tbb::blocked_range<int32_t>(
1971 [&](
const tbb::blocked_range<int32_t>& r) {
1972 const int32_t start_idx = r.begin();
1973 const int32_t end_idx = r.end();
// Tallied locally per range, then added to the per-thread slot once.
1974 size_t num_strings_not_translated = 0;
1975 std::string string_ops_storage;
1977 for (int32_t source_string_id = start_idx; source_string_id != end_idx;
1978 ++source_string_id) {
1979 const std::string_view source_str =
// An empty source string maps to the int32 null sentinel.
1984 if (source_str.empty()) {
1985 translated_ids[source_string_id] = inline_int_null_value<int32_t>();
// The destination lookup expression is not visible here.
1999 const auto translated_string_id =
2001 translated_ids[source_string_id] = translated_string_id;
// Ids at or beyond the destination generation take this path (the first half
// of the condition is not visible).
2004 translated_string_id >= num_dest_strings) {
// With transients, the callback's return value is added to the tally;
// otherwise the string is counted as not translated.
2005 if (dest_has_transients) {
2006 num_strings_not_translated +=
2007 dest_transient_lookup_callback(source_str, source_string_id);
2009 num_strings_not_translated++;
// NOTE(review): this indexes a vector of thread_info.num_threads entries by the
// TBB thread index; that relies on the index staying below the arena's
// concurrency limit — confirm.
2014 const size_t tbb_thread_idx = tbb::this_task_arena::current_thread_index();
2015 num_strings_not_translated_per_thread[tbb_thread_idx] +=
2016 num_strings_not_translated;
2018 tbb::simple_partitioner());
// Sum the per-thread tallies into the return value.
2021 size_t total_num_strings_not_translated = 0;
2022 for (int64_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
2023 total_num_strings_not_translated += num_strings_not_translated_per_thread[thread_idx];
2025 return total_num_strings_not_translated;
// buildDictionaryNumericTranslationMap: evaluates the supplied string ops on
// each source string and stores the numeric result in translated_ids, indexed
// by source string id.
// NOTE(review): only part of the definition is present in this listing; how
// source_str is obtained and all closing lines are not visible.
2029 Datum* translated_ids,
2030 const int64_t source_generation,
2031 const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos)
const {
// Preconditions: at least one op, and the last op must not return a string.
2034 CHECK_GT(string_op_infos.size(), 0UL);
2035 CHECK(!string_op_infos.back().getReturnType().is_string());
// The generation is used directly as the number of source strings.
2036 const int64_t num_source_strings = source_generation;
2039 if (num_source_strings == 0L) {
// Shared (reader) lock on this dictionary's mutex.
2048 std::shared_lock<std::shared_mutex> source_read_lock(
rw_mutex_);
// Work is split aiming at 1000 source strings per thread.
2057 constexpr int64_t target_strings_per_thread{1000};
2059 std::thread::hardware_concurrency(), num_source_strings, target_strings_per_thread);
2068 const StringOps_Namespace::StringOps string_ops(string_op_infos);
2071 tbb::task_arena limited_arena(thread_info.
num_threads);
2078 limited_arena.execute([&] {
2080 tbb::blocked_range<int32_t>(
2082 [&](
const tbb::blocked_range<int32_t>& r) {
2083 const int32_t start_idx = r.begin();
2084 const int32_t end_idx = r.end();
2085 for (int32_t source_string_id = start_idx; source_string_id != end_idx;
2086 ++source_string_id) {
// source_str is an owning std::string here, not a view.
2087 const std::string source_str =
2089 translated_ids[source_string_id] = string_ops.numericEval(source_str);
// Fragment of translate_string_ids; the use of dict_server_host suggests the
// overload that takes a LeafHostInfo.
// NOTE(review): only the tail of the signature and one argument list are
// present in this listing. The visible call passes the host, a
// {db_id, dict_id} pair built from temp_dict_key, and `false` — presumably
// constructing a dictionary client; confirm against the full source.
2098 const std::vector<int32_t>& source_ids,
2100 const int32_t dest_generation) {
2103 dict_server_host, {temp_dict_key.
db_id, temp_dict_key.
dict_id},
false);
2112 std::shared_lock<std::shared_mutex> read_lock(
rw_mutex_);
void throw_string_too_long_error(std::string_view str, const shared::StringDictKey &dict_key)
StringIdxEntry * offset_map_
void translate_string_ids(std::vector< int32_t > &dest_ids, const DictRef dest_dict_ref, const std::vector< int32_t > &source_ids, const DictRef source_dict_ref, const int32_t dest_generation)
size_t payload_file_size_
bool isClient() const noexcept
void increaseHashTableCapacity() noexcept
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
string_dict_hash_t hash_string(const std::string_view &str)
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
std::vector< int32_t > getRegexpLike(const std::string &pattern, const char escape, const size_t generation) const
void operator()(std::string_view const, int32_t const string_id) override
size_t getBulk(const std::vector< String > &string_vec, T *encoded_vec) const
int64_t num_elems_per_thread
client_no_timeout_(new StringDictionaryClient(host,{dict_key.db_id, dict_key.dict_id}, false))
std::vector< int32_t > buildDictionaryTranslationMap(const std::shared_ptr< StringDictionary > dest_dict, StringLookupCallback const &dest_transient_lookup_callback) const
RUNTIME_EXPORT DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
std::vector< std::string > copyStrings() const
void buildDictionaryNumericTranslationMap(Datum *translated_ids, const int64_t source_generation, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
const shared::StringDictKey & getDictKey() const noexcept
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int64_t > > like_i64_cache_
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
size_t storageEntryCount() const
StringDictionary(const shared::StringDictKey &dict_key, const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
uint32_t computeBucketFromStorageAndMemory(const string_dict_hash_t input_string_hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
std::string getStringChecked(const int string_id) const noexcept
std::vector< string_dict_hash_t > hash_cache_
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
DEVICE void sort(ARGS &&...args)
bool fillRateIsHigh(const size_t num_strings) const noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
Constants for Builtin SQL Types supported by HEAVY.AI.
RUNTIME_EXPORT DEVICE bool string_ilike_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, char escape_char)
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_i32_cache_
std::string offsets_path_
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::vector< std::string const * > &transient_string_vec={})
Populates provided dest_ids vector with string ids corresponding to given source strings.
std::vector< T > getLikeImpl(const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
std::unordered_map< std::string, int32_t > map_
std::shared_mutex rw_mutex_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
int32_t getOrAdd(const std::string &str) noexcept
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
size_t computeCacheSize() const
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes from buf to file f, starting at the given offset.
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
int32_t getIdOfString(const String &) const
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
std::string_view getStringView(int32_t string_id) const
static constexpr size_t MAX_STRCOUNT
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
size_t compare_cache_size_
future< Result > async(Fn &&fn, Args &&...args)
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
std::vector< int32_t > getCompare(const std::string &pattern, const std::string &comp_operator, const size_t generation)
DEVICE auto copy(ARGS &&...args)
int open(const char *path, int flags, int mode)
RUNTIME_EXPORT DEVICE bool string_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
void appendToStorageBulk(const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
DEVICE void partial_sum(ARGS &&...args)
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
std::map< std::string, int32_t > equal_cache_
DEVICE auto accumulate(ARGS &&...args)
void order_translation_locks(const shared::StringDictKey &source_dict_key, const shared::StringDictKey &dest_dict_key, std::shared_lock< std::shared_mutex > &source_read_lock, std::shared_lock< std::shared_mutex > &dest_read_lock)
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
void getOrAddBulkArray(const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
Functions to support the LIKE and ILIKE operators in SQL. Only single-byte character sets are supported ...
int32_t getUnlocked(const std::string_view sv) const noexcept
void appendToStorage(const String str) noexcept
int checked_open(const char *path, const bool recover)
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
const shared::StringDictKey dict_key_
std::unordered_map< std::string, int32_t > moveMap()
std::pair< char *, size_t > getStringBytes(int32_t string_id) const noexcept
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &dictionary_futures)
void translate_string_ids(std::vector< int32_t > &dest_ids, const LeafHostInfo &dict_server_host, const shared::StringDictKey &dest_dict_key, const std::vector< int32_t > &source_ids, const shared::StringDictKey &source_dict_key, const int32_t dest_generation)
const int SYSTEM_PAGE_SIZE
bool checkpoint() noexcept
void throw_encoding_error(std::string_view str, const shared::StringDictKey &dict_key)
RUNTIME_EXPORT DEVICE bool string_like_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, char escape_char)
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
DEVICE auto lower_bound(ARGS &&...args)
void eachStringSerially(int64_t const generation, StringCallback &) const
size_t getNumStringsFromStorage(const size_t storage_slots) const noexcept
void update_leaf(const LeafHostInfo &host_info)
size_t strings_cache_size_
int64_t num_elems_per_thread
std::string getString(int32_t string_id) const
void hashStrings(const std::vector< String > &string_vec, std::vector< string_dict_hash_t > &hashes) const noexcept
std::vector< std::string_view > getStringViews() const
int msync(void *addr, size_t length, bool async)
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
void increaseHashTableCapacityFromStorageAndMemory(const size_t str_count, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< string_dict_hash_t > &input_strings_hashes) noexcept
std::function< int32_t(std::string const &)> makeLambdaStringToId() const
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)
static void populate_string_array_ids(std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define DEBUG_TIMER(name)
void checked_munmap(void *addr, size_t length)
const uint64_t round_up_p2(const uint64_t num)
std::string_view getStringViewUnlocked(int32_t string_id) const noexcept
std::vector< int32_t > string_id_string_dict_hash_table_
ThreadInfo(const int64_t max_thread_count, const int64_t num_elems, const int64_t target_elems_per_thread)
void sortCache(std::vector< int32_t > &cache)
std::string_view getStringViewChecked(const int string_id) const noexcept
static constexpr size_t MAX_STRLEN
void getOrAddBulkParallel(const std::vector< String > &string_vec, T *encoded_vec)
std::function< bool(std::string_view, int32_t string_id)> StringLookupCallback
bool g_enable_stringdict_parallel
PayloadString getStringFromStorage(const int string_id) const noexcept
RUNTIME_EXPORT DEVICE bool regexp_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
size_t file_size(const int fd)
static unsigned transientIdToIndex(int32_t const id)
void operator()(std::string const &str, int32_t const string_id) override
DEVICE void swap(ARGS &&...args)
std::vector< int32_t > sorted_cache
int32_t getOrAddImpl(const std::string_view &str) noexcept
~StringDictionary() noexcept
void * checked_mmap(const int fd, const size_t sz)
RUNTIME_EXPORT DEVICE bool string_ilike(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)