OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
StringDictionary.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "Shared/DatumFetchers.h"
19 #include "StringOps/StringOps.h"
20 
21 #include <tbb/parallel_for.h>
22 #include <tbb/task_arena.h>
23 #include <algorithm>
24 #include <boost/filesystem/operations.hpp>
25 #include <boost/filesystem/path.hpp>
26 #include <boost/iterator/transform_iterator.hpp>
27 #include <boost/sort/spreadsort/string_sort.hpp>
28 #include <functional>
29 #include <future>
30 #include <iostream>
31 #include <numeric>
32 #include <string_view>
33 #include <thread>
34 #include <type_traits>
35 
36 // TODO(adb): fixup
37 #ifdef _WIN32
38 #include <fcntl.h>
39 #include <io.h>
40 #else
41 #include <sys/fcntl.h>
42 #endif
43 
44 #include "Logger/Logger.h"
45 #include "OSDependent/heavyai_fs.h"
46 #include "Shared/sqltypes.h"
47 #include "Shared/thread_count.h"
48 #include "StringDictionaryClient.h"
49 #include "Utils/Regexp.h"
50 #include "Utils/StringLike.h"
51 
52 #include "LeafHostInfo.h"
53 #include "Shared/measure.h"
54 
56 
57 namespace {
58 
60 
61 int checked_open(const char* path, const bool recover) {
62  auto fd = heavyai::open(path, O_RDWR | O_CREAT | (recover ? O_APPEND : O_TRUNC), 0644);
63  if (fd > 0) {
64  return fd;
65  }
66  auto err = std::string("Dictionary path ") + std::string(path) +
67  std::string(" does not exist.");
68  LOG(ERROR) << err;
69  throw DictPayloadUnavailable(err);
70 }
71 
// Rounds `num` up to the next power of two, capping the result at UINT32_MAX.
// An input of 0 wraps during the computation and therefore also yields UINT32_MAX.
const uint64_t round_up_p2(const uint64_t num) {
  // Copy the highest set bit of (num - 1) into every lower bit position. Shifts up to
  // 16 fill 32 bits, which is all that matters because larger results are capped below.
  uint64_t smeared = num - 1;
  for (unsigned shift = 1; shift <= 16; shift <<= 1) {
    smeared |= smeared >> shift;
  }
  const uint64_t rounded = smeared + 1;
  // TODO MAT deal with case where filesize has been increased but reality is
  // we are constrained to 2^31.
  // In that situation this calculation will wrap to zero
  const bool out_of_range = (rounded == 0) || (rounded > UINT32_MAX);
  return out_of_range ? static_cast<uint64_t>(UINT32_MAX) : rounded;
}
89 
90 string_dict_hash_t hash_string(const std::string_view& str) {
91  string_dict_hash_t str_hash = 1;
92  // rely on fact that unsigned overflow is defined and wraps
93  for (size_t i = 0; i < str.size(); ++i) {
94  str_hash = str_hash * 997 + str[i];
95  }
96  return str_hash;
97 }
98 
// Decides how to split `num_elems` work items across worker threads.
//
// Uses at most `max_thread_count` threads (at least one), aiming for roughly
// `target_elems_per_thread` items per thread.
struct ThreadInfo {
  int64_t num_threads{0};           // number of workers to use; always >= 1
  int64_t num_elems_per_thread{0};  // ceil(num_elems / num_threads); always >= 1

  ThreadInfo(const int64_t max_thread_count,
             const int64_t num_elems,
             const int64_t target_elems_per_thread) {
    // A non-positive target would make the ceil-division below divide by zero.
    const int64_t target = std::max(target_elems_per_thread, int64_t(1));
    const int64_t threads_for_target = (num_elems + target - 1) / target;
    // Clamp to at least one thread. With num_elems == 0, threads_for_target is 0, and
    // the old code then divided by num_threads == 0.
    num_threads = std::max(
        std::min(std::max(max_thread_count, int64_t(1)), threads_for_target), int64_t(1));
    num_elems_per_thread =
        std::max((num_elems + num_threads - 1) / num_threads, int64_t(1));
  }
};
113 
114 } // namespace
115 
117 constexpr int32_t StringDictionary::INVALID_STR_ID;
118 constexpr size_t StringDictionary::MAX_STRLEN;
119 constexpr size_t StringDictionary::MAX_STRCOUNT;
120 
122  const std::string& folder,
123  const bool isTemp,
124  const bool recover,
125  const bool materializeHashes,
126  size_t initial_capacity)
127  : dict_key_(dict_key)
128  , folder_(folder)
129  , str_count_(0)
130  , string_id_string_dict_hash_table_(initial_capacity, INVALID_STR_ID)
131  , hash_cache_(initial_capacity)
132  , isTemp_(isTemp)
133  , materialize_hashes_(materializeHashes)
134  , payload_fd_(-1)
135  , offset_fd_(-1)
136  , offset_map_(nullptr)
137  , payload_map_(nullptr)
138  , offset_file_size_(0)
139  , payload_file_size_(0)
140  , payload_file_off_(0)
141  , like_cache_size_(0)
142  , regex_cache_size_(0)
143  , equal_cache_size_(0)
144  , compare_cache_size_(0)
145  , strings_cache_(nullptr)
146  , strings_cache_size_(0) {
147  if (!isTemp && folder.empty()) {
148  return;
149  }
150 
151  // initial capacity must be a power of two for efficient bucket computation
152  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
153  if (!isTemp_) {
154  boost::filesystem::path storage_path(folder);
155  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
156  const auto payload_path =
157  (storage_path / boost::filesystem::path("DictPayload")).string();
158  payload_fd_ = checked_open(payload_path.c_str(), recover);
159  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
162  }
163  bool storage_is_empty = false;
164  if (payload_file_size_ == 0) {
165  storage_is_empty = true;
167  }
168  if (offset_file_size_ == 0) {
170  }
171  if (!isTemp_) { // we never mmap or recover temp dictionaries
172  payload_map_ =
173  reinterpret_cast<char*>(heavyai::checked_mmap(payload_fd_, payload_file_size_));
174  offset_map_ = reinterpret_cast<StringIdxEntry*>(
176  if (recover) {
177  const size_t bytes = heavyai::file_size(offset_fd_);
178  if (bytes % sizeof(StringIdxEntry) != 0) {
179  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
180  }
181  const uint64_t str_count =
182  storage_is_empty ? 0 : getNumStringsFromStorage(bytes / sizeof(StringIdxEntry));
183  collisions_ = 0;
184  // at this point we know the size of the StringDict we need to load
185  // so lets reallocate the vector to the correct size
186  const uint64_t max_entries =
187  std::max(round_up_p2(str_count * 2 + 1),
188  round_up_p2(std::max(initial_capacity, static_cast<size_t>(1))));
189  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
190  string_id_string_dict_hash_table_.swap(new_str_ids);
191  if (materialize_hashes_) {
192  std::vector<string_dict_hash_t> new_hash_cache(max_entries / 2);
193  hash_cache_.swap(new_hash_cache);
194  }
195  // Bail early if we know we don't have strings to add (i.e. a new or empty
196  // dictionary)
197  if (str_count == 0) {
198  return;
199  }
200 
201  unsigned string_id = 0;
202  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
203 
204  uint32_t thread_inits = 0;
205  const auto thread_count = std::thread::hardware_concurrency();
206  const uint32_t items_per_thread = std::max<uint32_t>(
207  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
208  std::vector<std::future<std::vector<std::pair<string_dict_hash_t, unsigned int>>>>
209  dictionary_futures;
210  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
211  dictionary_futures.emplace_back(std::async(
212  std::launch::async, [string_id, str_count, items_per_thread, this] {
213  std::vector<std::pair<string_dict_hash_t, unsigned int>> hashVec;
214  for (uint32_t curr_id = string_id;
215  curr_id < string_id + items_per_thread && curr_id < str_count;
216  curr_id++) {
217  const auto recovered = getStringFromStorage(curr_id);
218  if (recovered.canary) {
219  // hit the canary, recovery finished
220  break;
221  } else {
222  std::string_view temp(recovered.c_str_ptr, recovered.size);
223  hashVec.emplace_back(std::make_pair(hash_string(temp), temp.size()));
224  }
225  }
226  return hashVec;
227  }));
228  thread_inits++;
229  if (thread_inits % thread_count == 0) {
230  processDictionaryFutures(dictionary_futures);
231  }
232  }
233  // gather last few threads
234  if (dictionary_futures.size() != 0) {
235  processDictionaryFutures(dictionary_futures);
236  }
237  VLOG(1) << "Opened string dictionary " << folder << " # Strings: " << str_count_
238  << " Hash table size: " << string_id_string_dict_hash_table_.size()
239  << " Fill rate: "
240  << static_cast<double>(str_count_) * 100.0 /
242  << "% Collisions: " << collisions_;
243  }
244  }
245 }
246 
247 namespace {
249  std::unordered_map<std::string, int32_t> map_;
250 
251  public:
252  void operator()(std::string const& str, int32_t const string_id) override {
253  auto const emplaced = map_.emplace(str, string_id);
254  CHECK(emplaced.second) << "str(" << str << ") string_id(" << string_id << ')';
255  }
256  void operator()(std::string_view const, int32_t const string_id) override {
257  UNREACHABLE() << "MapMaker must be called with a std::string.";
258  }
259  std::unordered_map<std::string, int32_t> moveMap() { return std::move(map_); }
260 };
261 } // namespace
262 
263 std::function<int32_t(std::string const&)> StringDictionary::makeLambdaStringToId()
264  const {
265  CHECK(isClient());
266  constexpr size_t big_gen = static_cast<size_t>(std::numeric_limits<size_t>::max());
267  MapMaker map_maker;
268  eachStringSerially(big_gen, map_maker);
269  return [map{map_maker.moveMap()}](std::string const& str) {
270  auto const itr = map.find(str);
271  return itr == map.cend() ? INVALID_STR_ID : itr->second;
272  };
273 }
274 
275 // Call serial_callback for each (string/_view, string_id). Must be called serially.
276 void StringDictionary::eachStringSerially(int64_t const generation,
277  StringCallback& serial_callback) const {
278  if (isClient()) {
279  // copyStrings() is not supported when isClient().
280  std::string str; // Import buffer. Placing outside of loop should reduce allocations.
281  size_t const n = std::min(static_cast<size_t>(generation), storageEntryCount());
282  CHECK_LE(n, static_cast<size_t>(std::numeric_limits<int32_t>::max()) + 1);
283  for (unsigned id = 0; id < n; ++id) {
284  {
285  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
286  client_->get_string(str, id);
287  }
288  serial_callback(str, id);
289  }
290  } else {
291  size_t const n = std::min(static_cast<size_t>(generation), str_count_);
292  CHECK_LE(n, static_cast<size_t>(std::numeric_limits<int32_t>::max()) + 1);
293  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
294  for (unsigned id = 0; id < n; ++id) {
295  serial_callback(getStringFromStorageFast(static_cast<int>(id)), id);
296  }
297  }
298 }
299 
301  std::vector<std::future<std::vector<std::pair<string_dict_hash_t, unsigned int>>>>&
302  dictionary_futures) {
303  for (auto& dictionary_future : dictionary_futures) {
304  dictionary_future.wait();
305  const auto hashVec = dictionary_future.get();
306  for (const auto& hash : hashVec) {
307  const uint32_t bucket =
309  payload_file_off_ += hash.second;
310  string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
311  if (materialize_hashes_) {
312  hash_cache_[str_count_] = hash.first;
313  }
314  ++str_count_;
315  }
316  }
317  dictionary_futures.clear();
318 }
319 
321  return dict_key_;
322 }
323 
332  const size_t storage_slots) const noexcept {
333  if (storage_slots == 0) {
334  return 0;
335  }
336  // Must use signed integers since final binary search step can wrap to max size_t value
337  // if dictionary is empty
338  int64_t min_bound = 0;
339  int64_t max_bound = storage_slots - 1;
340  int64_t guess{0};
341  while (min_bound <= max_bound) {
342  guess = (max_bound + min_bound) / 2;
343  CHECK_GE(guess, 0);
344  if (getStringFromStorage(guess).canary) {
345  max_bound = guess - 1;
346  } else {
347  min_bound = guess + 1;
348  }
349  }
350  CHECK_GE(guess + (min_bound > guess ? 1 : 0), 0);
351  return guess + (min_bound > guess ? 1 : 0);
352 }
353 
355  const shared::StringDictKey& dict_key)
356  : dict_key_(dict_key)
357  , folder_("DB_" + std::to_string(dict_key.db_id) + "_DICT_" +
358  std::to_string(dict_key.dict_id))
359  , strings_cache_(nullptr)
360  , client_(new StringDictionaryClient(host, {dict_key.db_id, dict_key.dict_id}, true))
362  new StringDictionaryClient(host, {dict_key.db_id, dict_key.dict_id}, false)) {}
363 
365  free(CANARY_BUFFER);
366  if (isClient()) {
367  return;
368  }
369  if (payload_map_) {
370  if (!isTemp_) {
374  CHECK_GE(payload_fd_, 0);
376  CHECK_GE(offset_fd_, 0);
378  } else {
380  free(payload_map_);
381  free(offset_map_);
382  }
383  }
384 }
385 
387 
388 int32_t StringDictionary::getOrAdd(const std::string& str) noexcept {
389  if (isClient()) {
390  std::vector<int32_t> string_ids;
391  client_->get_or_add_bulk(string_ids, std::vector<std::string>{str});
392  CHECK_EQ(size_t(1), string_ids.size());
393  return string_ids.front();
394  }
395  return getOrAddImpl(str);
396 }
397 
398 namespace {
399 
400 template <class T>
401 void throw_encoding_error(std::string_view str, const shared::StringDictKey& dict_key) {
402  std::ostringstream oss;
403  oss << "The text encoded column using dictionary " << dict_key
404  << " has exceeded it's limit of " << sizeof(T) * 8 << " bits ("
405  << static_cast<size_t>(max_valid_int_value<T>() + 1) << " unique values) "
406  << "while attempting to add the new string '" << str << "'. ";
407 
408  if (sizeof(T) < 4) {
409  // Todo: Implement automatic type widening for dictionary-encoded text
410  // columns/all fixed length columm types (at least if not defined
411  // with fixed encoding size), or short of that, ALTER TABLE
412  // COLUMN TYPE to at least allow the user to do this manually
413  // without re-creating the table
414 
415  oss << "To load more data, please re-create the table with "
416  << "this column as type TEXT ENCODING DICT(" << sizeof(T) * 2 * 8 << ") ";
417  if (sizeof(T) == 1) {
418  oss << "or TEXT ENCODING DICT(32) ";
419  }
420  oss << "and reload your data.";
421  } else {
422  // Todo: Implement TEXT ENCODING DICT(64) type which should essentially
423  // preclude overflows.
424  oss << "Currently dictionary-encoded text columns support a maximum of "
426  << " strings. Consider recreating the table with "
427  << "this column as type TEXT ENCODING NONE and reloading your data.";
428  }
429  LOG(ERROR) << oss.str();
430  throw std::runtime_error(oss.str());
431 }
432 
433 void throw_string_too_long_error(std::string_view str,
434  const shared::StringDictKey& dict_key) {
435  std::ostringstream oss;
436  oss << "The string '" << str << " could not be inserted into the dictionary "
437  << dict_key << " because it exceeded the maximum allowable "
438  << "length of " << StringDictionary::MAX_STRLEN << " characters (string was "
439  << str.size() << " characters).";
440  LOG(ERROR) << oss.str();
441  throw std::runtime_error(oss.str());
442 }
443 
444 } // namespace
445 
446 template <class String>
448  const std::vector<std::vector<String>>& string_array_vec,
449  std::vector<std::vector<int32_t>>& ids_array_vec) {
450  if (client_no_timeout_) {
451  client_no_timeout_->get_or_add_bulk_array(ids_array_vec, string_array_vec);
452  return;
453  }
454 
455  ids_array_vec.resize(string_array_vec.size());
456  for (size_t i = 0; i < string_array_vec.size(); i++) {
457  auto& strings = string_array_vec[i];
458  auto& ids = ids_array_vec[i];
459  ids.resize(strings.size());
460  getOrAddBulk(strings, &ids[0]);
461  }
462 }
463 
465  const std::vector<std::vector<std::string>>& string_array_vec,
466  std::vector<std::vector<int32_t>>& ids_array_vec);
467 
469  const std::vector<std::vector<std::string_view>>& string_array_vec,
470  std::vector<std::vector<int32_t>>& ids_array_vec);
471 
477 template <class String>
479  const std::vector<String>& string_vec,
480  std::vector<string_dict_hash_t>& hashes) const noexcept {
481  CHECK_EQ(string_vec.size(), hashes.size());
482 
483  tbb::parallel_for(tbb::blocked_range<size_t>(0, string_vec.size()),
484  [&string_vec, &hashes](const tbb::blocked_range<size_t>& r) {
485  for (size_t curr_id = r.begin(); curr_id != r.end(); ++curr_id) {
486  if (string_vec[curr_id].empty()) {
487  continue;
488  }
489  hashes[curr_id] = hash_string(string_vec[curr_id]);
490  }
491  });
492 }
493 
494 template <class T, class String>
495 size_t StringDictionary::getBulk(const std::vector<String>& string_vec,
496  T* encoded_vec) const {
497  return getBulk(string_vec, encoded_vec, -1L /* generation */);
498 }
499 
500 template size_t StringDictionary::getBulk(const std::vector<std::string>& string_vec,
501  uint8_t* encoded_vec) const;
502 template size_t StringDictionary::getBulk(const std::vector<std::string>& string_vec,
503  uint16_t* encoded_vec) const;
504 template size_t StringDictionary::getBulk(const std::vector<std::string>& string_vec,
505  int32_t* encoded_vec) const;
506 
507 template <class T, class String>
508 size_t StringDictionary::getBulk(const std::vector<String>& string_vec,
509  T* encoded_vec,
510  const int64_t generation) const {
511  constexpr int64_t target_strings_per_thread{1000};
512  const int64_t num_lookup_strings = string_vec.size();
513  if (num_lookup_strings == 0) {
514  return 0;
515  }
516 
517  const ThreadInfo thread_info(
518  std::thread::hardware_concurrency(), num_lookup_strings, target_strings_per_thread);
519  CHECK_GE(thread_info.num_threads, 1L);
520  CHECK_GE(thread_info.num_elems_per_thread, 1L);
521 
522  std::vector<size_t> num_strings_not_found_per_thread(thread_info.num_threads, 0UL);
523 
524  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
525  const int64_t num_dict_strings = generation >= 0 ? generation : storageEntryCount();
526  const bool dictionary_is_empty = (num_dict_strings == 0);
527  if (dictionary_is_empty) {
528  tbb::parallel_for(tbb::blocked_range<int64_t>(0, num_lookup_strings),
529  [&](const tbb::blocked_range<int64_t>& r) {
530  const int64_t start_idx = r.begin();
531  const int64_t end_idx = r.end();
532  for (int64_t string_idx = start_idx; string_idx < end_idx;
533  ++string_idx) {
534  encoded_vec[string_idx] = StringDictionary::INVALID_STR_ID;
535  }
536  });
537  return num_lookup_strings;
538  }
539  // If we're here the generation-capped dictionary has strings in it
540  // that we need to look up against
541 
542  tbb::task_arena limited_arena(thread_info.num_threads);
543  limited_arena.execute([&] {
545  tbb::blocked_range<int64_t>(
546  0, num_lookup_strings, thread_info.num_elems_per_thread /* tbb grain_size */),
547  [&](const tbb::blocked_range<int64_t>& r) {
548  const int64_t start_idx = r.begin();
549  const int64_t end_idx = r.end();
550  size_t num_strings_not_found = 0;
551  for (int64_t string_idx = start_idx; string_idx != end_idx; ++string_idx) {
552  const auto& input_string = string_vec[string_idx];
553  if (input_string.empty()) {
554  encoded_vec[string_idx] = inline_int_null_value<T>();
555  continue;
556  }
557  if (input_string.size() > StringDictionary::MAX_STRLEN) {
558  throw_string_too_long_error(input_string, dict_key_);
559  }
560  const string_dict_hash_t input_string_hash = hash_string(input_string);
561  uint32_t hash_bucket = computeBucket(
562  input_string_hash, input_string, string_id_string_dict_hash_table_);
563  // Will either be legit id or INVALID_STR_ID
564  const auto string_id = string_id_string_dict_hash_table_[hash_bucket];
565  if (string_id == StringDictionary::INVALID_STR_ID ||
566  string_id >= num_dict_strings) {
567  encoded_vec[string_idx] = StringDictionary::INVALID_STR_ID;
568  num_strings_not_found++;
569  continue;
570  }
571  encoded_vec[string_idx] = string_id;
572  }
573  const size_t tbb_thread_idx = tbb::this_task_arena::current_thread_index();
574  num_strings_not_found_per_thread[tbb_thread_idx] = num_strings_not_found;
575  },
576  tbb::simple_partitioner());
577  });
578 
579  size_t num_strings_not_found = 0;
580  for (int64_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
581  num_strings_not_found += num_strings_not_found_per_thread[thread_idx];
582  }
583  return num_strings_not_found;
584 }
585 
586 template size_t StringDictionary::getBulk(const std::vector<std::string>& string_vec,
587  uint8_t* encoded_vec,
588  const int64_t generation) const;
589 template size_t StringDictionary::getBulk(const std::vector<std::string>& string_vec,
590  uint16_t* encoded_vec,
591  const int64_t generation) const;
592 template size_t StringDictionary::getBulk(const std::vector<std::string>& string_vec,
593  int32_t* encoded_vec,
594  const int64_t generation) const;
595 
596 template <class T, class String>
597 void StringDictionary::getOrAddBulk(const std::vector<String>& input_strings,
598  T* output_string_ids) {
600  getOrAddBulkParallel(input_strings, output_string_ids);
601  return;
602  }
603  // Single-thread path.
604  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
605 
606  const size_t initial_str_count = str_count_;
607  size_t idx = 0;
608  for (const auto& input_string : input_strings) {
609  if (input_string.empty()) {
610  output_string_ids[idx++] = inline_int_null_value<T>();
611  continue;
612  }
613  CHECK(input_string.size() <= MAX_STRLEN);
614 
615  const string_dict_hash_t input_string_hash = hash_string(input_string);
616  uint32_t hash_bucket =
617  computeBucket(input_string_hash, input_string, string_id_string_dict_hash_table_);
619  output_string_ids[idx++] = string_id_string_dict_hash_table_[hash_bucket];
620  continue;
621  }
622  // need to add record to dictionary
623  // check there is room
624  if (str_count_ > static_cast<size_t>(max_valid_int_value<T>())) {
625  throw_encoding_error<T>(input_string, dict_key_);
626  }
628  << "Maximum number (" << str_count_
629  << ") of Dictionary encoded Strings reached for this column, offset path "
630  "for column is "
631  << offsets_path_;
632  if (fillRateIsHigh(str_count_)) {
633  // resize when more than 50% is full
635  hash_bucket = computeBucket(
636  input_string_hash, input_string, string_id_string_dict_hash_table_);
637  }
638  appendToStorage(input_string);
639 
640  if (materialize_hashes_) {
641  hash_cache_[str_count_] = input_string_hash;
642  }
643  const int32_t string_id = static_cast<int32_t>(str_count_);
644  string_id_string_dict_hash_table_[hash_bucket] = string_id;
645  output_string_ids[idx++] = string_id;
646  ++str_count_;
647  }
648  const size_t num_strings_added = str_count_ - initial_str_count;
649  if (num_strings_added > 0) {
651  }
652 }
653 
654 template <class T, class String>
655 void StringDictionary::getOrAddBulkParallel(const std::vector<String>& input_strings,
656  T* output_string_ids) {
657  // Compute hashes of the input strings up front, and in parallel,
658  // as the string hashing does not need to be behind the subsequent write_lock
659  std::vector<string_dict_hash_t> input_strings_hashes(input_strings.size());
660  hashStrings(input_strings, input_strings_hashes);
661 
662  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
663  size_t shadow_str_count =
664  str_count_; // Need to shadow str_count_ now with bulk add methods
665  const size_t storage_high_water_mark = shadow_str_count;
666  std::vector<size_t> string_memory_ids;
667  size_t sum_new_string_lengths = 0;
668  string_memory_ids.reserve(input_strings.size());
669  size_t input_string_idx{0};
670  for (const auto& input_string : input_strings) {
671  // Currently we make empty strings null
672  if (input_string.empty()) {
673  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
674  continue;
675  }
676  // TODO: Recover gracefully if an input string is too long
677  CHECK(input_string.size() <= MAX_STRLEN);
678 
679  if (fillRateIsHigh(shadow_str_count)) {
680  // resize when more than 50% is full
682  storage_high_water_mark,
683  input_strings,
684  string_memory_ids,
685  input_strings_hashes);
686  }
687  // Compute the hash for this input_string
688  const string_dict_hash_t input_string_hash = input_strings_hashes[input_string_idx];
689 
690  const uint32_t hash_bucket =
691  computeBucketFromStorageAndMemory(input_string_hash,
692  input_string,
694  storage_high_water_mark,
695  input_strings,
696  string_memory_ids);
697 
698  // If the hash bucket is not empty, that is our string id
699  // (computeBucketFromStorageAndMemory) already checked to ensure the input string and
700  // bucket string are equal)
702  output_string_ids[input_string_idx++] =
704  continue;
705  }
706  // Did not find string, so need to add record to dictionary
707  // First check there is room
708  if (shadow_str_count > static_cast<size_t>(max_valid_int_value<T>())) {
709  throw_encoding_error<T>(input_string, dict_key_);
710  }
711  CHECK_LT(shadow_str_count, MAX_STRCOUNT)
712  << "Maximum number (" << shadow_str_count
713  << ") of Dictionary encoded Strings reached for this column, offset path "
714  "for column is "
715  << offsets_path_;
716 
717  string_memory_ids.push_back(input_string_idx);
718  sum_new_string_lengths += input_string.size();
719  string_id_string_dict_hash_table_[hash_bucket] =
720  static_cast<int32_t>(shadow_str_count);
721  if (materialize_hashes_) {
722  hash_cache_[shadow_str_count] = input_string_hash;
723  }
724  output_string_ids[input_string_idx++] = shadow_str_count++;
725  }
726  appendToStorageBulk(input_strings, string_memory_ids, sum_new_string_lengths);
727  const size_t num_strings_added = shadow_str_count - str_count_;
728  str_count_ = shadow_str_count;
729  if (num_strings_added > 0) {
731  }
732 }
733 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
734  uint8_t* encoded_vec);
735 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
736  uint16_t* encoded_vec);
737 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
738  int32_t* encoded_vec);
739 
740 template void StringDictionary::getOrAddBulk(
741  const std::vector<std::string_view>& string_vec,
742  uint8_t* encoded_vec);
743 template void StringDictionary::getOrAddBulk(
744  const std::vector<std::string_view>& string_vec,
745  uint16_t* encoded_vec);
746 template void StringDictionary::getOrAddBulk(
747  const std::vector<std::string_view>& string_vec,
748  int32_t* encoded_vec);
749 
750 template <class String>
751 int32_t StringDictionary::getIdOfString(const String& str) const {
752  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
753  if (isClient()) {
754  if constexpr (std::is_same_v<std::string, std::decay_t<String>>) {
755  return client_->get(str);
756  } else {
757  return client_->get(std::string(str));
758  }
759  }
760  return getUnlocked(str);
761 }
762 
763 template int32_t StringDictionary::getIdOfString(const std::string&) const;
764 template int32_t StringDictionary::getIdOfString(const std::string_view&) const;
765 
766 int32_t StringDictionary::getUnlocked(const std::string_view sv) const noexcept {
767  const string_dict_hash_t hash = hash_string(sv);
768  auto str_id = string_id_string_dict_hash_table_[computeBucket(
769  hash, sv, string_id_string_dict_hash_table_)];
770  return str_id;
771 }
772 
773 std::string StringDictionary::getString(int32_t string_id) const {
774  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
775  if (isClient()) {
776  std::string ret;
777  client_->get_string(ret, string_id);
778  return ret;
779  }
780  return getStringUnlocked(string_id);
781 }
782 
// Returns a non-owning view of the string with id `string_id`. Local dictionaries only:
// fails a CHECK on client (distributed) dictionaries.
// NOTE(review): the shared lock is released on return, so the view presumably points
// into dictionary storage that later appends could move or remap; confirm lifetime
// guarantees before holding the view across writes.
std::string_view StringDictionary::getStringView(int32_t string_id) const {
  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
  CHECK(!isClient()) << "use of this function is unsupported in distributed";
  return getStringViewUnlocked(string_id);
}
788 
// Returns a copy of the string with id `string_id` without taking rw_mutex_
// (the caller is expected to hold it). Only the upper bound is checked here; any
// lower-bound check is left to getStringChecked() (not visible here — TODO confirm).
std::string StringDictionary::getStringUnlocked(int32_t string_id) const noexcept {
  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
  return getStringChecked(string_id);
}
793 
795  int32_t string_id) const noexcept {
796  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
797  return getStringViewChecked(string_id);
798 }
799 
// Returns a (pointer, size) pair for the bytes of string `string_id`, as produced by
// getStringBytesChecked(). Local dictionaries only; the id must be in [0, str_count_).
// NOTE(review): the pointer presumably refers into dictionary storage and is not
// NUL-terminated; verify lifetime and termination against getStringBytesChecked().
std::pair<char*, size_t> StringDictionary::getStringBytes(
    int32_t string_id) const noexcept {
  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
  CHECK(!isClient());
  CHECK_LE(0, string_id);
  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
  return getStringBytesChecked(string_id);
}
808 
810  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
811  if (isClient()) {
812  return client_->storage_entry_count();
813  }
814  return str_count_;
815 }
816 
817 template <typename T>
818 std::vector<T> StringDictionary::getLikeImpl(const std::string& pattern,
819  const bool icase,
820  const bool is_simple,
821  const char escape,
822  const size_t generation) const {
823  constexpr size_t grain_size{1000};
824  auto is_like_impl = icase ? is_simple ? string_ilike_simple : string_ilike
825  : is_simple ? string_like_simple
826  : string_like;
827  auto const num_threads = static_cast<size_t>(cpu_threads());
828  std::vector<std::vector<T>> worker_results(num_threads);
829  tbb::task_arena limited_arena(num_threads);
830  limited_arena.execute([&] {
832  tbb::blocked_range<size_t>(0, generation, grain_size),
833  [&is_like_impl, &pattern, &escape, &worker_results, this](
834  const tbb::blocked_range<size_t>& range) {
835  auto& result_vector =
836  worker_results[tbb::this_task_arena::current_thread_index()];
837  for (size_t i = range.begin(); i < range.end(); ++i) {
838  const auto str = getStringUnlocked(i);
839  if (is_like_impl(
840  str.c_str(), str.size(), pattern.c_str(), pattern.size(), escape)) {
841  result_vector.push_back(i);
842  }
843  }
844  });
845  });
846  // partial_sum to get 1) a start offset for each thread and 2) the total # elems
847  std::vector<size_t> start_offsets(num_threads + 1, 0);
848  auto vec_size = [](std::vector<T> const& vec) { return vec.size(); };
849  auto begin = boost::make_transform_iterator(worker_results.begin(), vec_size);
850  auto end = boost::make_transform_iterator(worker_results.end(), vec_size);
851  std::partial_sum(begin, end, start_offsets.begin() + 1); // first element is 0
852 
853  std::vector<T> result(start_offsets[num_threads]);
854  limited_arena.execute([&] {
856  tbb::blocked_range<size_t>(0, num_threads, 1),
857  [&worker_results, &result, &start_offsets](
858  const tbb::blocked_range<size_t>& range) {
859  auto& result_vector = worker_results[range.begin()];
860  auto const start_offset = start_offsets[range.begin()];
861  std::copy(
862  result_vector.begin(), result_vector.end(), result.begin() + start_offset);
863  },
864  tbb::static_partitioner());
865  });
866  return result;
867 }
868 template <>
869 std::vector<int32_t> StringDictionary::getLike<int32_t>(const std::string& pattern,
870  const bool icase,
871  const bool is_simple,
872  const char escape,
873  const size_t generation) const {
874  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
875  if (isClient()) {
876  return client_->get_like_i32(pattern, icase, is_simple, escape, generation);
877  }
878  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
879  const auto it = like_i32_cache_.find(cache_key);
880  if (it != like_i32_cache_.end()) {
881  return it->second;
882  }
883 
884  auto result = getLikeImpl<int32_t>(pattern, icase, is_simple, escape, generation);
885  // place result into cache for reuse if similar query
886  const auto it_ok = like_i32_cache_.insert(std::make_pair(cache_key, result));
887  like_cache_size_ += (pattern.size() + 3 + (result.size() * sizeof(int32_t)));
888 
889  CHECK(it_ok.second);
890 
891  return result;
892 }
893 
894 template <>
895 std::vector<int64_t> StringDictionary::getLike<int64_t>(const std::string& pattern,
896  const bool icase,
897  const bool is_simple,
898  const char escape,
899  const size_t generation) const {
900  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
901  if (isClient()) {
902  return client_->get_like_i64(pattern, icase, is_simple, escape, generation);
903  }
904  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
905  const auto it = like_i64_cache_.find(cache_key);
906  if (it != like_i64_cache_.end()) {
907  return it->second;
908  }
909 
910  auto result = getLikeImpl<int64_t>(pattern, icase, is_simple, escape, generation);
911  // place result into cache for reuse if similar query
912  const auto it_ok = like_i64_cache_.insert(std::make_pair(cache_key, result));
913  like_cache_size_ += (pattern.size() + 3 + (result.size() * sizeof(int64_t)));
914 
915  CHECK(it_ok.second);
916 
917  return result;
918 }
919 
920 std::vector<int32_t> StringDictionary::getEquals(std::string pattern,
921  std::string comp_operator,
922  size_t generation) {
923  std::vector<int32_t> result;
924  auto eq_id_itr = equal_cache_.find(pattern);
925  int32_t eq_id = MAX_STRLEN + 1;
926  int32_t cur_size = str_count_;
927  if (eq_id_itr != equal_cache_.end()) {
928  eq_id = eq_id_itr->second;
929  if (comp_operator == "=") {
930  result.push_back(eq_id);
931  } else {
932  for (int32_t idx = 0; idx <= cur_size; idx++) {
933  if (idx == eq_id) {
934  continue;
935  }
936  result.push_back(idx);
937  }
938  }
939  } else {
940  std::vector<std::thread> workers;
941  int worker_count = cpu_threads();
942  CHECK_GT(worker_count, 0);
943  std::vector<std::vector<int32_t>> worker_results(worker_count);
944  CHECK_LE(generation, str_count_);
945  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
946  workers.emplace_back(
947  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
948  for (size_t string_id = worker_idx; string_id < generation;
949  string_id += worker_count) {
950  const auto str = getStringUnlocked(string_id);
951  if (str == pattern) {
952  worker_results[worker_idx].push_back(string_id);
953  }
954  }
955  });
956  }
957  for (auto& worker : workers) {
958  worker.join();
959  }
960  for (const auto& worker_result : worker_results) {
961  result.insert(result.end(), worker_result.begin(), worker_result.end());
962  }
963  if (result.size() > 0) {
964  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
965  equal_cache_size_ += (pattern.size() + (result.size() * sizeof(int32_t)));
966  CHECK(it_ok.second);
967  eq_id = result[0];
968  }
969  if (comp_operator == "<>") {
970  for (int32_t idx = 0; idx <= cur_size; idx++) {
971  if (idx == eq_id) {
972  continue;
973  }
974  result.push_back(idx);
975  }
976  }
977  }
978  return result;
979 }
980 
981 std::vector<int32_t> StringDictionary::getCompare(const std::string& pattern,
982  const std::string& comp_operator,
983  const size_t generation) {
984  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
985  if (isClient()) {
986  return client_->get_compare(pattern, comp_operator, generation);
987  }
988  std::vector<int32_t> ret;
989  if (str_count_ == 0) {
990  return ret;
991  }
992  if (sorted_cache.size() < str_count_) {
993  if (comp_operator == "=" || comp_operator == "<>") {
994  return getEquals(pattern, comp_operator, generation);
995  }
996 
998  }
999  auto cache_index = compare_cache_.get(pattern);
1000 
1001  if (!cache_index) {
1002  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
1003  const auto cache_itr = std::lower_bound(
1004  sorted_cache.begin(),
1005  sorted_cache.end(),
1006  pattern,
1007  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
1008  auto a_str = this->getStringFromStorage(a);
1009  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
1010  });
1011 
1012  if (cache_itr == sorted_cache.end()) {
1013  cache_index->index = sorted_cache.size() - 1;
1014  cache_index->diff = 1;
1015  } else {
1016  const auto cache_str = getStringFromStorage(*cache_itr);
1017  if (!string_eq(
1018  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
1019  cache_index->index = cache_itr - sorted_cache.begin() - 1;
1020  cache_index->diff = 1;
1021  } else {
1022  cache_index->index = cache_itr - sorted_cache.begin();
1023  cache_index->diff = 0;
1024  }
1025  }
1026 
1027  compare_cache_.put(pattern, cache_index);
1028  compare_cache_size_ += (pattern.size() + sizeof(cache_index));
1029  }
1030 
1031  // since we have a cache in form of vector of ints which is sorted according to
1032  // corresponding strings in the dictionary all we need is the index of the element
1033  // which equal to the pattern that we are trying to match or the index of “biggest”
1034  // element smaller than the pattern, to perform all the comparison operators over
1035  // string. The search function guarantees we have such index so now it is just the
1036  // matter to include all the elements in the result vector.
1037 
1038  // For < operator if the index that we have points to the element which is equal to
1039  // the pattern that we are searching for we simply get all the elements less than the
1040  // index. If the element pointed by the index is not equal to the pattern we are
1041  // comparing with we also need to include that index in result vector, except when the
1042  // index points to 0 and the pattern is lesser than the smallest value in the string
1043  // dictionary.
1044 
1045  if (comp_operator == "<") {
1046  size_t idx = cache_index->index;
1047  if (cache_index->diff) {
1048  idx = cache_index->index + 1;
1049  if (cache_index->index == 0 && cache_index->diff > 0) {
1050  idx = cache_index->index;
1051  }
1052  }
1053  for (size_t i = 0; i < idx; i++) {
1054  ret.push_back(sorted_cache[i]);
1055  }
1056 
1057  // For <= operator if the index that we have points to the element which is equal to
1058  // the pattern that we are searching for we want to include the element pointed by
1059  // the index in the result set. If the element pointed by the index is not equal to
1060  // the pattern we are comparing with we just want to include all the ids with index
1061  // less than the index that is cached, except when pattern that we are searching for
1062  // is smaller than the smallest string in the dictionary.
1063 
1064  } else if (comp_operator == "<=") {
1065  size_t idx = cache_index->index + 1;
1066  if (cache_index == 0 && cache_index->diff > 0) {
1067  idx = cache_index->index;
1068  }
1069  for (size_t i = 0; i < idx; i++) {
1070  ret.push_back(sorted_cache[i]);
1071  }
1072 
1073  // For > operator we want to get all the elements with index greater than the index
1074  // that we have except, when the pattern we are searching for is lesser than the
1075  // smallest string in the dictionary we also want to include the id of the index
1076  // that we have.
1077 
1078  } else if (comp_operator == ">") {
1079  size_t idx = cache_index->index + 1;
1080  if (cache_index->index == 0 && cache_index->diff > 0) {
1081  idx = cache_index->index;
1082  }
1083  for (size_t i = idx; i < sorted_cache.size(); i++) {
1084  ret.push_back(sorted_cache[i]);
1085  }
1086 
1087  // For >= operator when the indexed element that we have points to element which is
1088  // equal to the pattern we are searching for we want to include that in the result
1089  // vector. If the index that we have does not point to the string which is equal to
1090  // the pattern we are searching we don’t want to include that id into the result
1091  // vector except when the index is 0.
1092 
1093  } else if (comp_operator == ">=") {
1094  size_t idx = cache_index->index;
1095  if (cache_index->diff) {
1096  idx = cache_index->index + 1;
1097  if (cache_index->index == 0 && cache_index->diff > 0) {
1098  idx = cache_index->index;
1099  }
1100  }
1101  for (size_t i = idx; i < sorted_cache.size(); i++) {
1102  ret.push_back(sorted_cache[i]);
1103  }
1104  } else if (comp_operator == "=") {
1105  if (!cache_index->diff) {
1106  ret.push_back(sorted_cache[cache_index->index]);
1107  }
1108 
1109  // For <> operator it is simple matter of not including id of string which is equal
1110  // to pattern we are searching for.
1111  } else if (comp_operator == "<>") {
1112  if (!cache_index->diff) {
1113  size_t idx = cache_index->index;
1114  for (size_t i = 0; i < idx; i++) {
1115  ret.push_back(sorted_cache[i]);
1116  }
1117  ++idx;
1118  for (size_t i = idx; i < sorted_cache.size(); i++) {
1119  ret.push_back(sorted_cache[i]);
1120  }
1121  } else {
1122  for (size_t i = 0; i < sorted_cache.size(); i++) {
1123  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
1124  }
1125  }
1126 
1127  } else {
1128  std::runtime_error("Unsupported string comparison operator");
1129  }
1130  return ret;
1131 }
1132 
1133 namespace {
1134 
1135 bool is_regexp_like(const std::string& str,
1136  const std::string& pattern,
1137  const char escape) {
1138  return regexp_like(str.c_str(), str.size(), pattern.c_str(), pattern.size(), escape);
1139 }
1140 
1141 } // namespace
1142 
1143 std::vector<int32_t> StringDictionary::getRegexpLike(const std::string& pattern,
1144  const char escape,
1145  const size_t generation) const {
1146  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
1147  if (isClient()) {
1148  return client_->get_regexp_like(pattern, escape, generation);
1149  }
1150  const auto cache_key = std::make_pair(pattern, escape);
1151  const auto it = regex_cache_.find(cache_key);
1152  if (it != regex_cache_.end()) {
1153  return it->second;
1154  }
1155  std::vector<int32_t> result;
1156  std::vector<std::thread> workers;
1157  int worker_count = cpu_threads();
1158  CHECK_GT(worker_count, 0);
1159  std::vector<std::vector<int32_t>> worker_results(worker_count);
1160  CHECK_LE(generation, str_count_);
1161  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
1162  workers.emplace_back([&worker_results,
1163  &pattern,
1164  generation,
1165  escape,
1166  worker_idx,
1167  worker_count,
1168  this]() {
1169  for (size_t string_id = worker_idx; string_id < generation;
1170  string_id += worker_count) {
1171  const auto str = getStringUnlocked(string_id);
1172  if (is_regexp_like(str, pattern, escape)) {
1173  worker_results[worker_idx].push_back(string_id);
1174  }
1175  }
1176  });
1177  }
1178  for (auto& worker : workers) {
1179  worker.join();
1180  }
1181  for (const auto& worker_result : worker_results) {
1182  result.insert(result.end(), worker_result.begin(), worker_result.end());
1183  }
1184  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
1185  regex_cache_size_ += (pattern.size() + 1 + (result.size() * sizeof(int32_t)));
1186  CHECK(it_ok.second);
1187 
1188  return result;
1189 }
1190 
1191 std::vector<std::string> StringDictionary::copyStrings() const {
1192  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
1193  if (isClient()) {
1194  // TODO(miyu): support remote string dictionary
1195  throw std::runtime_error(
1196  "copying dictionaries from remote server is not supported yet.");
1197  }
1198 
1199  if (strings_cache_) {
1200  return *strings_cache_;
1201  }
1202 
1203  strings_cache_ = std::make_shared<std::vector<std::string>>();
1204  strings_cache_->reserve(str_count_);
1205  const bool multithreaded = str_count_ > 10000;
1206  const auto worker_count =
1207  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
1208  CHECK_GT(worker_count, 0UL);
1209  std::vector<std::vector<std::string>> worker_results(worker_count);
1210  std::vector<size_t> string_size(worker_count, 0);
1211  auto copy = [this, &string_size](std::vector<std::string>& str_list,
1212  const size_t worker_idx,
1213  const size_t start_id,
1214  const size_t end_id) {
1215  CHECK_LE(start_id, end_id);
1216  str_list.reserve(end_id - start_id);
1217  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
1218  auto str = getStringUnlocked(string_id);
1219  string_size[worker_idx] += str.size();
1220  str_list.push_back(str);
1221  }
1222  };
1223  if (multithreaded) {
1224  std::vector<std::future<void>> workers;
1225  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
1226  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
1227  worker_idx < worker_count && start < str_count_;
1228  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
1229  workers.push_back(std::async(std::launch::async,
1230  copy,
1231  std::ref(worker_results[worker_idx]),
1232  worker_idx,
1233  start,
1234  end));
1235  }
1236  for (auto& worker : workers) {
1237  worker.get();
1238  }
1239  } else {
1240  CHECK_EQ(worker_results.size(), size_t(1));
1241  copy(worker_results[0], 0, 0, str_count_);
1242  }
1243 
1244  for (const auto& worker_result : worker_results) {
1245  strings_cache_->insert(
1246  strings_cache_->end(), worker_result.begin(), worker_result.end());
1247  }
1249  std::accumulate(string_size.begin(), string_size.end(), size_t(0));
1250  return *strings_cache_;
1251 }
1252 
1253 bool StringDictionary::fillRateIsHigh(const size_t num_strings) const noexcept {
1254  return string_id_string_dict_hash_table_.size() <= num_strings * 2;
1255 }
1256 
  // Rebuilds the id hash table at twice its current size and swaps it in. Every id in
  // [0, str_count_) is re-inserted with computeUniqueBucketWithHash, which only looks
  // for a free slot: the existing entries are already distinct, so no string
  // comparison is needed.
  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
                                   INVALID_STR_ID);

  if (materialize_hashes_) {
    // Hashes are cached per id, so no string has to be re-read or re-hashed.
    for (size_t i = 0; i != str_count_; ++i) {
      const string_dict_hash_t hash = hash_cache_[i];
      const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
      new_str_ids[bucket] = i;
    }
    // Grow the hash cache in step with the table.
    hash_cache_.resize(hash_cache_.size() * 2);
  } else {
    // No cached hashes: fetch each stored string and hash it again.
    for (size_t i = 0; i != str_count_; ++i) {
      const auto str = getStringChecked(i);
      const string_dict_hash_t hash = hash_string(str);
      const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
      new_str_ids[bucket] = i;
    }
  }
  string_id_string_dict_hash_table_.swap(new_str_ids);
}
1278 
template <class String>
    const size_t str_count,  // str_count_ is only persisted strings, so need transient
                             // shadow count
    const size_t storage_high_water_mark,
    const std::vector<String>& input_strings,
    const std::vector<size_t>& string_memory_ids,
    const std::vector<string_dict_hash_t>& input_strings_hashes) noexcept {
  // Doubles the id hash table during a bulk insert, when some ids may exist only in the
  // caller's in-memory batch and not yet in storage.
  //   str_count               - all ids assigned so far (persisted plus pending).
  //   storage_high_water_mark - ids below this are already in storage.
  //   string_memory_ids       - pending id (storage_high_water_mark + k) has its string
  //                             at input_strings[string_memory_ids[k]] and its hash at
  //                             input_strings_hashes[string_memory_ids[k]].
  // NOTE(review): input_strings is not read in this body.
  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
                                   INVALID_STR_ID);
  if (materialize_hashes_) {
    // Relies on hash_cache_ holding a hash for every id below str_count, pending ids
    // included.
    for (size_t i = 0; i != str_count; ++i) {
      const string_dict_hash_t hash = hash_cache_[i];
      const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
      new_str_ids[bucket] = i;
    }
    hash_cache_.resize(hash_cache_.size() * 2);
  } else {
    // Persisted ids: re-read and re-hash the stored strings.
    for (size_t storage_idx = 0; storage_idx != storage_high_water_mark; ++storage_idx) {
      const auto storage_string = getStringChecked(storage_idx);
      const string_dict_hash_t hash = hash_string(storage_string);
      const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
      new_str_ids[bucket] = storage_idx;
    }
    // Pending ids: reuse the precomputed hashes of the in-memory batch.
    for (size_t memory_idx = 0; memory_idx != string_memory_ids.size(); ++memory_idx) {
      const size_t string_memory_id = string_memory_ids[memory_idx];
      const uint32_t bucket = computeUniqueBucketWithHash(
          input_strings_hashes[string_memory_id], new_str_ids);
      new_str_ids[bucket] = storage_high_water_mark + memory_idx;
    }
  }
  string_id_string_dict_hash_table_.swap(new_str_ids);
}
1312 
1313 int32_t StringDictionary::getOrAddImpl(const std::string_view& str) noexcept {
1314  // @TODO(wei) treat empty string as NULL for now
1315  if (str.size() == 0) {
1316  return inline_int_null_value<int32_t>();
1317  }
1318  CHECK(str.size() <= MAX_STRLEN);
1319  const string_dict_hash_t hash = hash_string(str);
1320  {
1321  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
1322  const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1323  if (string_id_string_dict_hash_table_[bucket] != INVALID_STR_ID) {
1324  return string_id_string_dict_hash_table_[bucket];
1325  }
1326  }
1327  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
1328  if (fillRateIsHigh(str_count_)) {
1329  // resize when more than 50% is full
1330  increaseHashTableCapacity();
1331  }
1332  // need to recalculate the bucket in case it changed before
1333  // we got the lock
1334  const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1335  if (string_id_string_dict_hash_table_[bucket] == INVALID_STR_ID) {
1336  CHECK_LT(str_count_, MAX_STRCOUNT)
1337  << "Maximum number (" << str_count_
1338  << ") of Dictionary encoded Strings reached for this column, offset path "
1339  "for column is "
1340  << offsets_path_;
1341  appendToStorage(str);
1342  string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
1343  if (materialize_hashes_) {
1344  hash_cache_[str_count_] = hash;
1345  }
1346  ++str_count_;
1347  invalidateInvertedIndex();
1348  }
1349  return string_id_string_dict_hash_table_[bucket];
1350 }
1351 
1352 std::string StringDictionary::getStringChecked(const int string_id) const noexcept {
1353  const auto str_canary = getStringFromStorage(string_id);
1354  CHECK(!str_canary.canary);
1355  return std::string(str_canary.c_str_ptr, str_canary.size);
1356 }
1357 
    const int string_id) const noexcept {
  // Non-owning view of the stored bytes for string_id; CHECK-fails on a canary slot.
  // The view points into payload_map_, which is reassigned when the payload region is
  // grown, so the view is presumably invalidated by later inserts; confirm with callers.
  const auto str_canary = getStringFromStorage(string_id);
  CHECK(!str_canary.canary);
  return std::string_view{str_canary.c_str_ptr, str_canary.size};
}
1364 
    const int string_id) const noexcept {
  // Returns {pointer into payload_map_, byte length} for string_id (not NUL-terminated
  // as far as this code shows); CHECK-fails on a canary slot.
  const auto str_canary = getStringFromStorage(string_id);
  CHECK(!str_canary.canary);
  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
}
1371 
template <class String>
    const string_dict_hash_t hash,
    const String& input_string,
    const std::vector<int32_t>& string_id_string_dict_hash_table) const noexcept {
  // Linear-probing lookup. Returns either the bucket holding the id of the stored string
  // equal to input_string, or the first empty (INVALID_STR_ID) bucket on the probe
  // sequence, i.e. where input_string would be inserted.
  // NOTE(review): the loop terminates only if the string is present or an empty bucket
  // exists; insert paths grow the table via fillRateIsHigh() to keep one available.
  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
  // Masking with (size - 1) assumes a power-of-two table size (the visible growth paths
  // only ever double it).
  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
  while (true) {
    const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
    if (candidate_string_id ==
        INVALID_STR_ID) {  // In this case it means the slot is available for use
      break;
    }
    // With materialized hashes, a cheap hash comparison filters candidates before the
    // byte comparison; otherwise every candidate's bytes are compared.
    if ((materialize_hashes_ && hash == hash_cache_[candidate_string_id]) ||
        !materialize_hashes_) {
      const auto candidate_string = getStringFromStorageFast(candidate_string_id);
      if (input_string.size() == candidate_string.size() &&
          !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
        // found the string
        break;
      }
    }
    // wrap around
    if (++bucket == string_dict_hash_table_size) {
      bucket = 0;
    }
  }
  return bucket;
}
1401 
1402 template <class String>
1404  const string_dict_hash_t input_string_hash,
1405  const String& input_string,
1406  const std::vector<int32_t>& string_id_string_dict_hash_table,
1407  const size_t storage_high_water_mark,
1408  const std::vector<String>& input_strings,
1409  const std::vector<size_t>& string_memory_ids) const noexcept {
1410  uint32_t bucket = input_string_hash & (string_id_string_dict_hash_table.size() - 1);
1411  while (true) {
1412  const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1413  if (candidate_string_id ==
1414  INVALID_STR_ID) { // In this case it means the slot is available for use
1415  break;
1416  }
1417  if (!materialize_hashes_ || (input_string_hash == hash_cache_[candidate_string_id])) {
1418  if (candidate_string_id > 0 &&
1419  static_cast<size_t>(candidate_string_id) >= storage_high_water_mark) {
1420  // The candidate string is not in storage yet but in our string_memory_ids temp
1421  // buffer
1422  size_t memory_offset =
1423  static_cast<size_t>(candidate_string_id - storage_high_water_mark);
1424  const String candidate_string = input_strings[string_memory_ids[memory_offset]];
1425  if (input_string.size() == candidate_string.size() &&
1426  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1427  // found the string in the temp memory buffer
1428  break;
1429  }
1430  } else {
1431  // The candidate string is in storage, need to fetch it for comparison
1432  const auto candidate_storage_string =
1433  getStringFromStorageFast(candidate_string_id);
1434  if (input_string.size() == candidate_storage_string.size() &&
1435  !memcmp(input_string.data(),
1436  candidate_storage_string.data(),
1437  input_string.size())) {
1440  // found the string in storage
1441  break;
1442  }
1443  }
1444  }
1445  if (++bucket == string_id_string_dict_hash_table.size()) {
1446  bucket = 0;
1447  }
1448  }
1449  return bucket;
1450 }
1451 
    const string_dict_hash_t hash,
    const std::vector<int32_t>& string_id_string_dict_hash_table) noexcept {
  // Returns the first empty (INVALID_STR_ID) bucket on hash's linear-probe sequence. No
  // string comparison is made, so this is only valid when the string is known not to be
  // in the table, e.g. when re-inserting existing ids into a freshly allocated table.
  // Each occupied bucket skipped increments collisions_.
  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
  while (true) {
    if (string_id_string_dict_hash_table[bucket] ==
        INVALID_STR_ID) {  // In this case it means the slot is available for use
      break;
    }
    collisions_++;
    // wrap around
    if (++bucket == string_dict_hash_table_size) {
      bucket = 0;
    }
  }
  return bucket;
}
1470 
    const size_t write_length) {
  // Ensures write_length more bytes fit in the payload region at payload_file_off_,
  // growing it if not. File-backed dictionaries extend the payload file and re-map it
  // (payload_map_ changes); temp dictionaries grow their heap buffer.
  if (payload_file_off_ + write_length > payload_file_size_) {
    // Bytes needed beyond the currently free tail.
    const size_t min_capacity_needed =
        write_length - (payload_file_size_ - payload_file_off_);
    if (!isTemp_) {
      CHECK_GE(payload_fd_, 0);
      // NOTE(review): no unmap of the previous payload_map_ is visible before it is
      // replaced below; confirm the old mapping is released.
      addPayloadCapacity(min_capacity_needed);
      CHECK(payload_file_off_ + write_length <= payload_file_size_);
      payload_map_ =
          reinterpret_cast<char*>(heavyai::checked_mmap(payload_fd_, payload_file_size_));
    } else {
      addPayloadCapacity(min_capacity_needed);
      CHECK(payload_file_off_ + write_length <= payload_file_size_);
    }
  }
}
1489 
    const size_t write_length) {
  // Ensures write_length more bytes fit in the offsets region after the str_count_
  // StringIdxEntry records already written, growing it if not.
  const size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
  // NOTE(review): this uses >= where the payload variant uses >, so it also grows when
  // the write would fit exactly; harmless but inconsistent.
  if (offset_file_off + write_length >= offset_file_size_) {
    const size_t min_capacity_needed =
        write_length - (offset_file_size_ - offset_file_off);
    if (!isTemp_) {
      CHECK_GE(offset_fd_, 0);
      addOffsetCapacity(min_capacity_needed);
      CHECK(offset_file_off + write_length <= offset_file_size_);
      // NOTE(review): the re-map expression below is incomplete in this copy (the
      // reinterpret_cast has no operand); presumably it mirrors the payload variant,
      // i.e. heavyai::checked_mmap(offset_fd_, offset_file_size_). Confirm and restore.
      offset_map_ = reinterpret_cast<StringIdxEntry*>(
    } else {
      addOffsetCapacity(min_capacity_needed);
      CHECK(offset_file_off + write_length <= offset_file_size_);
    }
  }
}
1509 
1510 template <class String>
1511 void StringDictionary::appendToStorage(const String str) noexcept {
1512  // write the payload
1513  checkAndConditionallyIncreasePayloadCapacity(str.size());
1514  memcpy(payload_map_ + payload_file_off_, str.data(), str.size());
1515 
1516  // write the offset and length
1517  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
1518  payload_file_off_ += str.size(); // Need to increment after we've defined str_meta
1519 
1520  checkAndConditionallyIncreaseOffsetCapacity(sizeof(str_meta));
1521  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
1522 }
1523 
template <class String>
    const std::vector<String>& input_strings,
    const std::vector<size_t>& string_memory_ids,
    const size_t sum_new_strings_lengths) noexcept {
  // Writes the pending strings input_strings[string_memory_ids[i]] to storage in one
  // pass. Payload bytes are appended at payload_file_off_, and the i-th offset entry goes
  // to slot str_count_ + i. Capacity for all of them is reserved up front, so
  // sum_new_strings_lengths must be at least the total size of those strings.
  // str_count_ is not advanced here; presumably the caller does that.
  const size_t num_strings = string_memory_ids.size();

  checkAndConditionallyIncreasePayloadCapacity(sum_new_strings_lengths);
  checkAndConditionallyIncreaseOffsetCapacity(sizeof(StringIdxEntry) * num_strings);

  for (size_t i = 0; i < num_strings; ++i) {
    const size_t string_idx = string_memory_ids[i];
    // NOTE(review): copies the string when String is std::string; a const reference
    // would suffice.
    const String str = input_strings[string_idx];
    const size_t str_size(str.size());
    memcpy(payload_map_ + payload_file_off_, str.data(), str_size);
    StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str_size};
    payload_file_off_ += str_size;  // Need to increment after we've defined str_meta
    memcpy(offset_map_ + str_count_ + i, &str_meta, sizeof(str_meta));
  }
}
1544 
    const int string_id) const noexcept {
  // Unchecked lookup: no fd, id-range, or canary checks (getStringFromStorage is the
  // checked variant). Returns {pointer into payload_map_, size} for string_id.
  const StringIdxEntry* str_meta = offset_map_ + string_id;
  return {payload_map_ + str_meta->off, str_meta->size};
}
1550 
    const int string_id) const noexcept {
  // Checked lookup of the stored bytes for string_id. File-backed dictionaries must have
  // both descriptors open, and string_id must be non-negative.
  // NOTE(review): there is no upper-bound check of string_id against the mapped size.
  if (!isTemp_) {
    CHECK_GE(payload_fd_, 0);
    CHECK_GE(offset_fd_, 0);
  }
  CHECK_GE(string_id, 0);
  const StringIdxEntry* str_meta = offset_map_ + string_id;
  // Newly added capacity is filled with 0xff bytes (addStorageCapacity /
  // addMemoryCapacity), so an unwritten entry's size reads as 0xffff, presumably
  // because the size field is 16 bits wide (confirm against StringIdxEntry).
  if (str_meta->size == 0xffff) {
    // hit the canary
    return {nullptr, 0, true};
  }
  return {payload_map_ + str_meta->off, str_meta->size, false};
}
1565 
1566 void StringDictionary::addPayloadCapacity(const size_t min_capacity_requested) noexcept {
1567  if (!isTemp_) {
1568  payload_file_size_ += addStorageCapacity(payload_fd_, min_capacity_requested);
1569  } else {
1570  payload_map_ = static_cast<char*>(
1571  addMemoryCapacity(payload_map_, payload_file_size_, min_capacity_requested));
1572  }
1573 }
1574 
1575 void StringDictionary::addOffsetCapacity(const size_t min_capacity_requested) noexcept {
1576  if (!isTemp_) {
1577  offset_file_size_ += addStorageCapacity(offset_fd_, min_capacity_requested);
1578  } else {
1579  offset_map_ = static_cast<StringIdxEntry*>(
1580  addMemoryCapacity(offset_map_, offset_file_size_, min_capacity_requested));
1581  }
1582 }
1583 
    int fd,
    const size_t min_capacity_requested) noexcept {
  // Appends 0xff "canary" bytes to the end of file fd and returns how many were written:
  // the smallest multiple of SYSTEM_PAGE_SIZE strictly greater than
  // min_capacity_requested, or 1024 pages, whichever is larger.
  const size_t canary_buff_size_to_add =
      std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
               (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);

  // CANARY_BUFFER is a scratch buffer of 0xff bytes, grown on demand.
  // NOTE(review): if it is shared across dictionaries, it is modified here without any
  // lock visible in this function; confirm callers cannot race on it.
  if (canary_buffer_size < canary_buff_size_to_add) {
    CANARY_BUFFER = static_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
    canary_buffer_size = canary_buff_size_to_add;
    CHECK(CANARY_BUFFER);
    memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
  }

  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
  const auto write_return = write(fd, CANARY_BUFFER, canary_buff_size_to_add);
  // A short write is treated as fatal.
  CHECK(write_return > 0 &&
        (static_cast<size_t>(write_return) == canary_buff_size_to_add));
  return canary_buff_size_to_add;
}
1604 
    size_t& mem_size,
    const size_t min_capacity_requested) noexcept {
  // Heap counterpart of addStorageCapacity. It grows `addr` (mem_size bytes) by the same
  // page-rounded amount via realloc and fills the new tail with 0xff canary bytes. It
  // then adds the growth to mem_size and returns the possibly relocated buffer.
  const size_t canary_buff_size_to_add =
      std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
               (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
  if (canary_buffer_size < canary_buff_size_to_add) {
    CANARY_BUFFER =
        reinterpret_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
    canary_buffer_size = canary_buff_size_to_add;
    CHECK(CANARY_BUFFER);
    memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
  }
  void* new_addr = realloc(addr, mem_size + canary_buff_size_to_add);
  CHECK(new_addr);
  // Fill only the newly added tail; the existing mem_size bytes are preserved by realloc.
  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
  CHECK(memcpy(write_addr, CANARY_BUFFER, canary_buff_size_to_add));
  mem_size += canary_buff_size_to_add;
  return new_addr;
}
1625 
  // Drops the derived query caches (LIKE, regex, equality, comparison) and zeroes their
  // size accounting; getOrAddImpl calls this after adding a string. Swapping with a
  // default-constructed container releases whatever storage the old one held.
  // sorted_cache is left intact; it is only ever extended for new ids.
  // NOTE(review): strings_cache_ (returned by copyStrings) is not reset here; confirm it
  // is invalidated elsewhere when strings are added.
  if (!like_i32_cache_.empty()) {
    decltype(like_i32_cache_)().swap(like_i32_cache_);
  }
  if (!like_i64_cache_.empty()) {
    decltype(like_i64_cache_)().swap(like_i64_cache_);
  }
  if (!regex_cache_.empty()) {
    decltype(regex_cache_)().swap(regex_cache_);
  }
  if (!equal_cache_.empty()) {
    decltype(equal_cache_)().swap(equal_cache_);
  }
  compare_cache_.invalidateInvertedIndex();

  like_cache_size_ = 0;
  regex_cache_size_ = 0;
  equal_cache_size_ = 0;
  compare_cache_size_ = 0;
}
1646 
// TODO 5 Mar 2021 Nothing will undo the writes to dictionary currently on a failed
// load. The next write to the dictionary that does checkpoint will make the
// uncheckpointed data be written to disk. The only option is a table truncate, and that
// assumes the dictionary is not replicated.
  // Makes the dictionary durable by synchronously flushing the mapped offset and payload
  // regions (msync) and then fsync-ing both files. It returns true only if every step
  // succeeds, and evaluation stops at the first failure. Remote (client) dictionaries
  // delegate to the server and report false if that call throws.
  if (isClient()) {
    try {
      return client_->checkpoint();
    } catch (...) {
      return false;
    }
  }
  // Temp (in-memory) dictionaries have nothing to persist.
  CHECK(!isTemp_);
  bool ret = true;
  ret = ret &&
        (heavyai::msync((void*)offset_map_, offset_file_size_, /*async=*/false) == 0);
  ret = ret &&
        (heavyai::msync((void*)payload_map_, payload_file_size_, /*async=*/false) == 0);
  ret = ret && (heavyai::fsync(offset_fd_) == 0);
  ret = ret && (heavyai::fsync(payload_fd_) == 0);
  return ret;
}
1669 
1670 bool StringDictionary::isClient() const noexcept {
1671  return static_cast<bool>(client_);
1672 }
1673 
  // This method is not thread-safe.
  // Extends sorted_cache (ids ordered by string_lt on their strings) to cover ids added
  // since the last build. The new ids [sorted_cache.size(), str_count_) are sorted on
  // their own and then merged into the existing sorted sequence.
  const auto cur_cache_size = sorted_cache.size();
  std::vector<int32_t> temp_sorted_cache;
  for (size_t i = cur_cache_size; i < str_count_; i++) {
    temp_sorted_cache.push_back(i);
  }
  sortCache(temp_sorted_cache);
  mergeSortedCache(temp_sorted_cache);
}
1684 
1685 void StringDictionary::sortCache(std::vector<int32_t>& cache) {
1686  // This method is not thread-safe.
1687 
1688  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1689  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1690 
1691  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1692  auto a_str = this->getStringFromStorage(a);
1693  auto b_str = this->getStringFromStorage(b);
1694  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1695  });
1696 }
1697 
1698 void StringDictionary::mergeSortedCache(std::vector<int32_t>& temp_sorted_cache) {
1699  // this method is not thread safe
1700  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1701  size_t t_idx = 0, s_idx = 0, idx = 0;
1702  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1703  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1704  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1705  const auto insert_from_temp_cache =
1706  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1707  if (insert_from_temp_cache) {
1708  updated_cache[idx] = temp_sorted_cache[t_idx++];
1709  } else {
1710  updated_cache[idx] = sorted_cache[s_idx++];
1711  }
1712  }
1713  while (t_idx < temp_sorted_cache.size()) {
1714  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1715  }
1716  while (s_idx < sorted_cache.size()) {
1717  updated_cache[idx++] = sorted_cache[s_idx++];
1718  }
1719  sorted_cache.swap(updated_cache);
1720 }
1721 
1723  std::vector<int32_t>& dest_ids,
1724  StringDictionary* dest_dict,
1725  const std::vector<int32_t>& source_ids,
1726  const StringDictionary* source_dict,
1727  const std::vector<std::string const*>& transient_string_vec) {
1728  std::vector<std::string> strings;
1729 
1730  for (const int32_t source_id : source_ids) {
1731  if (source_id == std::numeric_limits<int32_t>::min()) {
1732  strings.emplace_back("");
1733  } else if (source_id < 0) {
1734  unsigned const string_index = StringDictionaryProxy::transientIdToIndex(source_id);
1735  CHECK_LT(string_index, transient_string_vec.size()) << "source_id=" << source_id;
1736  strings.emplace_back(*transient_string_vec[string_index]);
1737  } else {
1738  strings.push_back(source_dict->getString(source_id));
1739  }
1740  }
1741 
1742  dest_ids.resize(strings.size());
1743  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1744 }
1745 
1747  std::vector<std::vector<int32_t>>& dest_array_ids,
1748  StringDictionary* dest_dict,
1749  const std::vector<std::vector<int32_t>>& source_array_ids,
1750  const StringDictionary* source_dict) {
1751  dest_array_ids.resize(source_array_ids.size());
1752 
1753  std::atomic<size_t> row_idx{0};
1754  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1755  int thread_id) {
1756  for (;;) {
1757  auto row = row_idx.fetch_add(1);
1758 
1759  if (row >= dest_array_ids.size()) {
1760  return;
1761  }
1762  const auto& source_ids = source_array_ids[row];
1763  auto& dest_ids = dest_array_ids[row];
1764  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1765  }
1766  };
1767 
1768  const int num_worker_threads = std::thread::hardware_concurrency();
1769 
1770  if (source_array_ids.size() / num_worker_threads > 10) {
1771  std::vector<std::future<void>> worker_threads;
1772  for (int i = 0; i < num_worker_threads; ++i) {
1773  worker_threads.push_back(std::async(std::launch::async, processor, i));
1774  }
1775 
1776  for (auto& child : worker_threads) {
1777  child.wait();
1778  }
1779  for (auto& child : worker_threads) {
1780  child.get();
1781  }
1782  } else {
1783  processor(0);
1784  }
1785 }
1786 
1787 std::vector<std::string_view> StringDictionary::getStringViews(
1788  const size_t generation) const {
1789  auto timer = DEBUG_TIMER(__func__);
1790  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
1791  const int64_t num_strings = generation >= 0 ? generation : storageEntryCount();
1792  CHECK_LE(num_strings, static_cast<int64_t>(StringDictionary::MAX_STRCOUNT));
1793  // The CHECK_LE below is currently redundant with the check
1794  // above against MAX_STRCOUNT, however given we iterate using
1795  // int32_t types for efficiency (to match type expected by
1796  // getStringFromStorageFast, check that the # of strings is also
1797  // in the int32_t range in case MAX_STRCOUNT is changed
1798 
1799  // Todo(todd): consider aliasing the max logical type width
1800  // (currently int32_t) throughout StringDictionary
1801  CHECK_LE(num_strings, std::numeric_limits<int32_t>::max());
1802 
1803  std::vector<std::string_view> string_views(num_strings);
1804  // We can bail early if the generation-specified dictionary is empty
1805  if (num_strings == 0) {
1806  return string_views;
1807  }
1808  constexpr int64_t tbb_parallel_threshold{1000};
1809  if (num_strings < tbb_parallel_threshold) {
1810  // Use int32_t to match type expected by getStringFromStorageFast
1811  for (int32_t string_idx = 0; string_idx < num_strings; ++string_idx) {
1812  string_views[string_idx] = getStringFromStorageFast(string_idx);
1813  }
1814  } else {
1815  constexpr int64_t target_strings_per_thread{1000};
1816  const ThreadInfo thread_info(
1817  std::thread::hardware_concurrency(), num_strings, target_strings_per_thread);
1818  CHECK_GE(thread_info.num_threads, 1L);
1819  CHECK_GE(thread_info.num_elems_per_thread, 1L);
1820 
1821  tbb::task_arena limited_arena(thread_info.num_threads);
1822  limited_arena.execute([&] {
1824  tbb::blocked_range<int64_t>(
1825  0, num_strings, thread_info.num_elems_per_thread /* tbb grain_size */),
1826  [&](const tbb::blocked_range<int64_t>& r) {
1827  // r should be in range of int32_t per CHECK above
1828  const int32_t start_idx = r.begin();
1829  const int32_t end_idx = r.end();
1830  for (int32_t string_idx = start_idx; string_idx != end_idx; ++string_idx) {
1831  string_views[string_idx] = getStringFromStorageFast(string_idx);
1832  }
1833  },
1834  tbb::simple_partitioner());
1835  });
1836  }
1837  return string_views;
1838 }
1839 
1840 std::vector<std::string_view> StringDictionary::getStringViews() const {
1842 }
1843 
1845  const std::shared_ptr<StringDictionary> dest_dict,
1846  StringLookupCallback const& dest_transient_lookup_callback) const {
1847  auto timer = DEBUG_TIMER(__func__);
1848  const size_t num_source_strings = storageEntryCount();
1849  const size_t num_dest_strings = dest_dict->storageEntryCount();
1850  std::vector<int32_t> translated_ids(num_source_strings);
1851 
1852  buildDictionaryTranslationMap(dest_dict.get(),
1853  translated_ids.data(),
1854  num_source_strings,
1855  num_dest_strings,
1856  true, // Just assume true for dest_has_transients as this
1857  // function is only used for testing currently
1858  dest_transient_lookup_callback,
1859  {});
1860  return translated_ids;
1861 }
1862 
1864  const shared::StringDictKey& dest_dict_key,
1865  std::shared_lock<std::shared_mutex>& source_read_lock,
1866  std::shared_lock<std::shared_mutex>& dest_read_lock) {
1867  const bool dicts_are_same = (source_dict_key == dest_dict_key);
1868  const bool source_dict_is_locked_first = (source_dict_key < dest_dict_key);
1869  if (dicts_are_same) {
1870  // dictionaries are same, only take one write lock
1871  dest_read_lock.lock();
1872  } else if (source_dict_is_locked_first) {
1873  source_read_lock.lock();
1874  dest_read_lock.lock();
1875  } else {
1876  dest_read_lock.lock();
1877  source_read_lock.lock();
1878  }
1879 }
1880 
1882  const StringDictionary* dest_dict,
1883  int32_t* translated_ids,
1884  const int64_t source_generation,
1885  const int64_t dest_generation,
1886  const bool dest_has_transients,
1887  StringLookupCallback const& dest_transient_lookup_callback,
1888  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) const {
1889  auto timer = DEBUG_TIMER(__func__);
1890  CHECK_GE(source_generation, 0L);
1891  CHECK_GE(dest_generation, 0L);
1892  const int64_t num_source_strings = source_generation;
1893  const int64_t num_dest_strings = dest_generation;
1894 
1895  // We can bail early if there are no source strings to translate
1896  if (num_source_strings == 0L) {
1897  return 0;
1898  }
1899 
1900  // If here we should have local dictionaries.
1901  // Note case of transient source dictionaries that aren't
1902  // seen as remote (they have no client_no_timeout_) is covered
1903  // by early bail above on num_source_strings == 0
1904  if (dest_dict->client_no_timeout_) {
1905  throw std::runtime_error(
1906  "Cannot translate between a local source and remote destination dictionary.");
1907  }
1908 
1909  // Acquire the read locks on this/source dict and dest dict in dictionary-key
1910  // order so we can enforce lock ordering and avoid deadlocks
1911  std::shared_lock<std::shared_mutex> source_read_lock(rw_mutex_, std::defer_lock);
1912  std::shared_lock<std::shared_mutex> dest_read_lock(dest_dict->rw_mutex_,
1913  std::defer_lock);
1915  getDictKey(), dest_dict->getDictKey(), source_read_lock, dest_read_lock);
1916 
1917  // For both source and destination dictionaries we cap the max
1918  // entries to be translated/translated to at the supplied
1919  // generation arguments, if valid (i.e. >= 0), otherwise just the
1920  // size of each dictionary
1921 
1922  CHECK_LE(num_source_strings, static_cast<int64_t>(str_count_));
1923  CHECK_LE(num_dest_strings, static_cast<int64_t>(dest_dict->str_count_));
1924  const bool dest_dictionary_is_empty = (num_dest_strings == 0);
1925 
1926  constexpr int64_t target_strings_per_thread{1000};
1927  const ThreadInfo thread_info(
1928  std::thread::hardware_concurrency(), num_source_strings, target_strings_per_thread);
1929  CHECK_GE(thread_info.num_threads, 1L);
1930  CHECK_GE(thread_info.num_elems_per_thread, 1L);
1931 
1932  // We use a tbb::task_arena to cap the number of threads; in other contexts this
1933  // has been shown to perform better when only a low number of threads is needed
1934  // than letting tbb choose the number of threads itself, but it should be
1935  // benchmarked in this specific context
1936 
1937  const StringOps_Namespace::StringOps string_ops(string_op_infos);
1938  const bool has_string_ops = string_ops.size();
1939 
1940  tbb::task_arena limited_arena(thread_info.num_threads);
1941  std::vector<size_t> num_strings_not_translated_per_thread(thread_info.num_threads, 0UL);
1942  constexpr bool short_circuit_empty_dictionary_translations{false};
1943  limited_arena.execute([&] {
1944  if (short_circuit_empty_dictionary_translations && dest_dictionary_is_empty) {
1946  tbb::blocked_range<int32_t>(
1947  0,
1948  num_source_strings,
1949  thread_info.num_elems_per_thread /* tbb grain_size */),
1950  [&](const tbb::blocked_range<int32_t>& r) {
1951  const int32_t start_idx = r.begin();
1952  const int32_t end_idx = r.end();
1953  for (int32_t string_idx = start_idx; string_idx != end_idx; ++string_idx) {
1954  translated_ids[string_idx] = INVALID_STR_ID;
1955  }
1956  },
1957  tbb::simple_partitioner());
1958  num_strings_not_translated_per_thread[0] += num_source_strings;
1959  } else {
1960  // The below logic, by executing low-level private variable accesses on both
1961  // dictionaries, is less clean than a previous variant that simply called
1962  // `getStringViews` from the source dictionary and then called `getBulk` on the
1963  // destination dictionary, but this version gets significantly better performance
1964  // (~2X), likely due to eliminating the overhead of writing out the string views and
1965  // then reading them back in (along with the associated cache misses)
1967  tbb::blocked_range<int32_t>(
1968  0,
1969  num_source_strings,
1970  thread_info.num_elems_per_thread /* tbb grain_size */),
1971  [&](const tbb::blocked_range<int32_t>& r) {
1972  const int32_t start_idx = r.begin();
1973  const int32_t end_idx = r.end();
1974  size_t num_strings_not_translated = 0;
1975  std::string string_ops_storage; // Needs to be thread local to back
1976  // string_view returned by string_ops()
1977  for (int32_t source_string_id = start_idx; source_string_id != end_idx;
1978  ++source_string_id) {
1979  const std::string_view source_str =
1980  has_string_ops ? string_ops(getStringFromStorageFast(source_string_id),
1981  string_ops_storage)
1982  : getStringFromStorageFast(source_string_id);
1983 
1984  if (source_str.empty()) {
1985  translated_ids[source_string_id] = inline_int_null_value<int32_t>();
1986  continue;
1987  }
1988  // Get the hash from this/the source dictionary's cache, as the function
1989  // will be the same for the dest_dict, sparing us having to recompute it
1990 
1991  // Todo(todd): Remove option to turn string hash cache off or at least
1992  // make a constexpr to avoid these branches when we expect it to be always
1993  // on going forward
1994  const string_dict_hash_t hash = (materialize_hashes_ && !has_string_ops)
1995  ? hash_cache_[source_string_id]
1996  : hash_string(source_str);
1997  const uint32_t hash_bucket = dest_dict->computeBucket(
1998  hash, source_str, dest_dict->string_id_string_dict_hash_table_);
1999  const auto translated_string_id =
2000  dest_dict->string_id_string_dict_hash_table_[hash_bucket];
2001  translated_ids[source_string_id] = translated_string_id;
2002 
2003  if (translated_string_id == StringDictionary::INVALID_STR_ID ||
2004  translated_string_id >= num_dest_strings) {
2005  if (dest_has_transients) {
2006  num_strings_not_translated +=
2007  dest_transient_lookup_callback(source_str, source_string_id);
2008  } else {
2009  num_strings_not_translated++;
2010  }
2011  continue;
2012  }
2013  }
2014  const size_t tbb_thread_idx = tbb::this_task_arena::current_thread_index();
2015  num_strings_not_translated_per_thread[tbb_thread_idx] +=
2016  num_strings_not_translated;
2017  },
2018  tbb::simple_partitioner());
2019  }
2020  });
2021  size_t total_num_strings_not_translated = 0;
2022  for (int64_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
2023  total_num_strings_not_translated += num_strings_not_translated_per_thread[thread_idx];
2024  }
2025  return total_num_strings_not_translated;
2026 }
2027 
2029  Datum* translated_ids,
2030  const int64_t source_generation,
2031  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) const {
2032  auto timer = DEBUG_TIMER(__func__);
2033  CHECK_GE(source_generation, 0L);
2034  CHECK_GT(string_op_infos.size(), 0UL);
2035  CHECK(!string_op_infos.back().getReturnType().is_string());
2036  const int64_t num_source_strings = source_generation;
2037 
2038  // We can bail early if there are no source strings to translate
2039  if (num_source_strings == 0L) {
2040  return;
2041  }
2042 
2043  // If here we should have a local dictionary
2044  // Note case of transient source dictionaries that aren't
2045  // seen as remote (they have no client_no_timeout_) is covered
2046  // by early bail above on num_source_strings == 0
2047 
2048  std::shared_lock<std::shared_mutex> source_read_lock(rw_mutex_);
2049 
2050  // For source dictionary we cap the number of entries
2051  // to be translated/translated to at the supplied
2052  // generation arguments, if valid (i.e. >= 0), otherwise
2053  // just the size of each dictionary
2054 
2055  CHECK_LE(num_source_strings, static_cast<int64_t>(str_count_));
2056 
2057  constexpr int64_t target_strings_per_thread{1000};
2058  const ThreadInfo thread_info(
2059  std::thread::hardware_concurrency(), num_source_strings, target_strings_per_thread);
2060  CHECK_GE(thread_info.num_threads, 1L);
2061  CHECK_GE(thread_info.num_elems_per_thread, 1L);
2062 
2063  // We use a tbb::task_arena to cap the number of threads; in other contexts this
2064  // has been shown to perform better when only a low number of threads is needed
2065  // than letting tbb choose the number of threads itself, but it should be
2066  // benchmarked in this specific context
2067 
2068  const StringOps_Namespace::StringOps string_ops(string_op_infos);
2069  CHECK_GT(string_ops.size(), 0UL);
2070 
2071  tbb::task_arena limited_arena(thread_info.num_threads);
2072  // The below logic, by executing low-level private variable accesses on both
2073  // dictionaries, is less clean than a previous variant that simply called
2074  // `getStringViews` from the source dictionary and then called `getBulk` on the
2075  // destination dictionary, but this version gets significantly better performance
2076  // (~2X), likely due to eliminating the overhead of writing out the string views and
2077  // then reading them back in (along with the associated cache misses)
2078  limited_arena.execute([&] {
2080  tbb::blocked_range<int32_t>(
2081  0, num_source_strings, thread_info.num_elems_per_thread /* tbb grain_size */),
2082  [&](const tbb::blocked_range<int32_t>& r) {
2083  const int32_t start_idx = r.begin();
2084  const int32_t end_idx = r.end();
2085  for (int32_t source_string_id = start_idx; source_string_id != end_idx;
2086  ++source_string_id) {
2087  const std::string source_str =
2088  std::string(getStringFromStorageFast(source_string_id));
2089  translated_ids[source_string_id] = string_ops.numericEval(source_str);
2090  }
2091  });
2092  });
2093 }
2094 
2095 void translate_string_ids(std::vector<int32_t>& dest_ids,
2096  const LeafHostInfo& dict_server_host,
2097  const shared::StringDictKey& dest_dict_key,
2098  const std::vector<int32_t>& source_ids,
2099  const shared::StringDictKey& source_dict_key,
2100  const int32_t dest_generation) {
2101  shared::StringDictKey temp_dict_key(-1, -1);
2102  StringDictionaryClient string_client(
2103  dict_server_host, {temp_dict_key.db_id, temp_dict_key.dict_id}, false);
2104  string_client.translate_string_ids(dest_ids,
2105  {dest_dict_key.db_id, dest_dict_key.dict_id},
2106  source_ids,
2107  {source_dict_key.db_id, source_dict_key.dict_id},
2108  dest_generation);
2109 }
2110 
2112  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
2113  return string_id_string_dict_hash_table_.size() * sizeof(int32_t) +
2114  hash_cache_.size() * sizeof(string_dict_hash_t) +
2115  sorted_cache.size() * sizeof(int32_t) + like_cache_size_ + regex_cache_size_ +
2117 }
void throw_string_too_long_error(std::string_view str, const shared::StringDictKey &dict_key)
StringIdxEntry * offset_map_
void translate_string_ids(std::vector< int32_t > &dest_ids, const DictRef dest_dict_ref, const std::vector< int32_t > &source_ids, const DictRef source_dict_ref, const int32_t dest_generation)
#define CHECK_EQ(x, y)
Definition: Logger.h:301
bool isClient() const noexcept
void increaseHashTableCapacity() noexcept
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
string_dict_hash_t hash_string(const std::string_view &str)
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
std::vector< int32_t > getRegexpLike(const std::string &pattern, const char escape, const size_t generation) const
void operator()(std::string_view const, int32_t const string_id) override
size_t getBulk(const std::vector< String > &string_vec, T *encoded_vec) const
int64_t num_elems_per_thread
Definition: ThreadInfo.h:23
client_no_timeout_(new StringDictionaryClient(host,{dict_key.db_id, dict_key.dict_id}, false))
std::vector< int32_t > buildDictionaryTranslationMap(const std::shared_ptr< StringDictionary > dest_dict, StringLookupCallback const &dest_transient_lookup_callback) const
RUNTIME_EXPORT DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:336
std::vector< std::string > copyStrings() const
void buildDictionaryNumericTranslationMap(Datum *translated_ids, const int64_t source_generation, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
uint64_t off
const shared::StringDictKey & getDictKey() const noexcept
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int64_t > > like_i64_cache_
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
uint64_t size
size_t storageEntryCount() const
#define LOG(tag)
Definition: Logger.h:285
StringDictionary(const shared::StringDictKey &dict_key, const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
uint32_t computeBucketFromStorageAndMemory(const string_dict_hash_t input_string_hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
std::string getStringChecked(const int string_id) const noexcept
std::vector< string_dict_hash_t > hash_cache_
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
#define UNREACHABLE()
Definition: Logger.h:338
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
bool fillRateIsHigh(const size_t num_strings) const noexcept
#define CHECK_GE(x, y)
Definition: Logger.h:306
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
Constants for Builtin SQL Types supported by HEAVY.AI.
RUNTIME_EXPORT DEVICE bool string_ilike_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, char escape_char)
Definition: StringLike.cpp:61
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_i32_cache_
std::string offsets_path_
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::vector< std::string const * > &transient_string_vec={})
Populates provided dest_ids vector with string ids corresponding to given source strings.
std::vector< T > getLikeImpl(const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
std::unordered_map< std::string, int32_t > map_
std::shared_mutex rw_mutex_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
#define CHECK_GT(x, y)
Definition: Logger.h:305
int32_t getOrAdd(const std::string &str) noexcept
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::string to_string(char const *&&v)
size_t computeCacheSize() const
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:143
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:308
constexpr double a
Definition: Utm.h:32
int32_t getIdOfString(const String &) const
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
std::string_view getStringView(int32_t string_id) const
static constexpr size_t MAX_STRCOUNT
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
future< Result > async(Fn &&fn, Args &&...args)
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
int64_t num_threads
Definition: ThreadInfo.h:22
std::vector< int32_t > getCompare(const std::string &pattern, const std::string &comp_operator, const size_t generation)
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
#define CHECK_NE(x, y)
Definition: Logger.h:302
int open(const char *path, int flags, int mode)
Definition: heavyai_fs.cpp:66
RUNTIME_EXPORT DEVICE bool string_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:250
void appendToStorageBulk(const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
DEVICE void partial_sum(ARGS &&...args)
Definition: gpu_enabled.h:87
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
std::map< std::string, int32_t > equal_cache_
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
void order_translation_locks(const shared::StringDictKey &source_dict_key, const shared::StringDictKey &dest_dict_key, std::shared_lock< std::shared_mutex > &source_read_lock, std::shared_lock< std::shared_mutex > &dest_read_lock)
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
void getOrAddBulkArray(const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
Functions to support the LIKE and ILIKE operator in SQL. Only single-byte character set is supported ...
int32_t getUnlocked(const std::string_view sv) const noexcept
void appendToStorage(const String str) noexcept
int checked_open(const char *path, const bool recover)
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
const shared::StringDictKey dict_key_
std::unordered_map< std::string, int32_t > moveMap()
std::pair< char *, size_t > getStringBytes(int32_t string_id) const noexcept
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &dictionary_futures)
void translate_string_ids(std::vector< int32_t > &dest_ids, const LeafHostInfo &dict_server_host, const shared::StringDictKey &dest_dict_key, const std::vector< int32_t > &source_ids, const shared::StringDictKey &source_dict_key, const int32_t dest_generation)
bool checkpoint() noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:303
void throw_encoding_error(std::string_view str, const shared::StringDictKey &dict_key)
RUNTIME_EXPORT DEVICE bool string_like_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, char escape_char)
Definition: StringLike.cpp:43
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
DEVICE auto lower_bound(ARGS &&...args)
Definition: gpu_enabled.h:78
#define CHECK_LE(x, y)
Definition: Logger.h:304
void eachStringSerially(int64_t const generation, StringCallback &) const
size_t getNumStringsFromStorage(const size_t storage_slots) const noexcept
void update_leaf(const LeafHostInfo &host_info)
std::string getString(int32_t string_id) const
void hashStrings(const std::vector< String > &string_vec, std::vector< string_dict_hash_t > &hashes) const noexcept
bool g_cache_string_hash
std::vector< std::string_view > getStringViews() const
int msync(void *addr, size_t length, bool async)
Definition: heavyai_fs.cpp:57
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
void increaseHashTableCapacityFromStorageAndMemory(const size_t str_count, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< string_dict_hash_t > &input_strings_hashes) noexcept
ThreadId thread_id()
Definition: Logger.cpp:879
int fsync(int fd)
Definition: heavyai_fs.cpp:62
std::function< int32_t(std::string const &)> makeLambdaStringToId() const
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)
static void populate_string_array_ids(std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
void checked_munmap(void *addr, size_t length)
Definition: heavyai_fs.cpp:53
const uint64_t round_up_p2(const uint64_t num)
std::string_view getStringViewUnlocked(int32_t string_id) const noexcept
std::vector< int32_t > string_id_string_dict_hash_table_
ThreadInfo(const int64_t max_thread_count, const int64_t num_elems, const int64_t target_elems_per_thread)
void sortCache(std::vector< int32_t > &cache)
std::string_view getStringViewChecked(const int string_id) const noexcept
static constexpr size_t MAX_STRLEN
void getOrAddBulkParallel(const std::vector< String > &string_vec, T *encoded_vec)
std::function< bool(std::string_view, int32_t string_id)> StringLookupCallback
bool g_enable_stringdict_parallel
PayloadString getStringFromStorage(const int string_id) const noexcept
void close(const int fd)
Definition: heavyai_fs.cpp:70
constexpr double n
Definition: Utm.h:38
int cpu_threads()
Definition: thread_count.h:25
int get_page_size()
Definition: heavyai_fs.cpp:29
Definition: Datum.h:71
RUNTIME_EXPORT DEVICE bool regexp_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: Regexp.cpp:39
size_t file_size(const int fd)
Definition: heavyai_fs.cpp:33
static unsigned transientIdToIndex(int32_t const id)
void operator()(std::string const &str, int32_t const string_id) override
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
std::vector< int32_t > sorted_cache
#define VLOG(n)
Definition: Logger.h:388
int32_t getOrAddImpl(const std::string_view &str) noexcept
~StringDictionary() noexcept
void * checked_mmap(const int fd, const size_t sz)
Definition: heavyai_fs.cpp:40
RUNTIME_EXPORT DEVICE bool string_ilike(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:261