OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
StringDictionary Class Reference

#include <StringDictionary.h>

+ Collaboration diagram for StringDictionary:

Classes

struct  compare_cache_value_t
 
struct  PayloadString
 
class  StringCallback
 
struct  StringIdxEntry
 

Public Member Functions

 StringDictionary (const shared::StringDictKey &dict_key, const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
 
 StringDictionary (const LeafHostInfo &host, const shared::StringDictKey &dict_key)
 
 ~StringDictionary () noexcept
 
const shared::StringDictKey & getDictKey () const noexcept
 
void eachStringSerially (int64_t const generation, StringCallback &) const
 
std::function< int32_t(std::string const &)> makeLambdaStringToId () const
 
int32_t getOrAdd (const std::string &str) noexcept
 
template<class T , class String >
size_t getBulk (const std::vector< String > &string_vec, T *encoded_vec) const
 
template<class T , class String >
size_t getBulk (const std::vector< String > &string_vec, T *encoded_vec, const int64_t generation) const
 
template<class T , class String >
void getOrAddBulk (const std::vector< String > &string_vec, T *encoded_vec)
 
template<class T , class String >
void getOrAddBulkParallel (const std::vector< String > &string_vec, T *encoded_vec)
 
template<class String >
void getOrAddBulkArray (const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
 
template<class String >
int32_t getIdOfString (const String &) const
 
std::string getString (int32_t string_id) const
 
std::string_view getStringView (int32_t string_id) const
 
std::pair< char *, size_t > getStringBytes (int32_t string_id) const noexcept
 
size_t storageEntryCount () const
 
template<typename T >
std::vector< T > getLike (const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
 
template<typename T >
std::vector< T > getLikeImpl (const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
 
std::vector< int32_t > getCompare (const std::string &pattern, const std::string &comp_operator, const size_t generation)
 
std::vector< int32_t > getRegexpLike (const std::string &pattern, const char escape, const size_t generation) const
 
std::vector< std::string > copyStrings () const
 
std::vector< std::string_view > getStringViews () const
 
std::vector< std::string_view > getStringViews (const size_t generation) const
 
std::vector< int32_t > buildDictionaryTranslationMap (const std::shared_ptr< StringDictionary > dest_dict, StringLookupCallback const &dest_transient_lookup_callback) const
 
size_t buildDictionaryTranslationMap (const StringDictionary *dest_dict, int32_t *translated_ids, const int64_t source_generation, const int64_t dest_generation, const bool dest_has_transients, StringLookupCallback const &dest_transient_lookup_callback, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
 
void buildDictionaryNumericTranslationMap (Datum *translated_ids, const int64_t source_generation, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
 
bool checkpoint () noexcept
 
bool isClient () const noexcept
 
void update_leaf (const LeafHostInfo &host_info)
 
size_t computeCacheSize () const
 
template<>
std::vector< int32_t > getLike (const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
 
template<>
std::vector< int64_t > getLike (const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
 

Static Public Member Functions

static void populate_string_ids (std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::vector< std::string const * > &transient_string_vec={})
 Populates provided dest_ids vector with string ids corresponding to given source strings. More...
 
static void populate_string_array_ids (std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
 

Static Public Attributes

static constexpr int32_t INVALID_STR_ID = -1
 
static constexpr size_t MAX_STRLEN = (1 << 15) - 1
 
static constexpr size_t MAX_STRCOUNT = (1U << 31) - 1
 

Private Member Functions

void processDictionaryFutures (std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &dictionary_futures)
 
size_t getNumStringsFromStorage (const size_t storage_slots) const noexcept
 
bool fillRateIsHigh (const size_t num_strings) const noexcept
 
void increaseHashTableCapacity () noexcept
 
template<class String >
void increaseHashTableCapacityFromStorageAndMemory (const size_t str_count, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< string_dict_hash_t > &input_strings_hashes) noexcept
 
int32_t getOrAddImpl (const std::string_view &str) noexcept
 
template<class String >
void hashStrings (const std::vector< String > &string_vec, std::vector< string_dict_hash_t > &hashes) const noexcept
 
int32_t getUnlocked (const std::string_view sv) const noexcept
 
std::string getStringUnlocked (int32_t string_id) const noexcept
 
std::string_view getStringViewUnlocked (int32_t string_id) const noexcept
 
std::string getStringChecked (const int string_id) const noexcept
 
std::string_view getStringViewChecked (const int string_id) const noexcept
 
std::pair< char *, size_t > getStringBytesChecked (const int string_id) const noexcept
 
template<class String >
uint32_t computeBucket (const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
 
template<class String >
uint32_t computeBucketFromStorageAndMemory (const string_dict_hash_t input_string_hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
 
uint32_t computeUniqueBucketWithHash (const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
 
void checkAndConditionallyIncreasePayloadCapacity (const size_t write_length)
 
void checkAndConditionallyIncreaseOffsetCapacity (const size_t write_length)
 
template<class String >
void appendToStorage (const String str) noexcept
 
template<class String >
void appendToStorageBulk (const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
 
PayloadString getStringFromStorage (const int string_id) const noexcept
 
std::string_view getStringFromStorageFast (const int string_id) const noexcept
 
void addPayloadCapacity (const size_t min_capacity_requested=0) noexcept
 
void addOffsetCapacity (const size_t min_capacity_requested=0) noexcept
 
size_t addStorageCapacity (int fd, const size_t min_capacity_requested=0) noexcept
 
void * addMemoryCapacity (void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
 
void invalidateInvertedIndex () noexcept
 
std::vector< int32_t > getEquals (std::string pattern, std::string comp_operator, size_t generation)
 
void buildSortedCache ()
 
void insertInSortedCache (std::string str, int32_t str_id)
 
void sortCache (std::vector< int32_t > &cache)
 
void mergeSortedCache (std::vector< int32_t > &temp_sorted_cache)
 
compare_cache_value_t * binary_search_cache (const std::string &pattern) const
 

Private Attributes

const shared::StringDictKey dict_key_
 
const std::string folder_
 
size_t str_count_
 
size_t collisions_
 
std::vector< int32_t > string_id_string_dict_hash_table_
 
std::vector< string_dict_hash_t > hash_cache_
 
std::vector< int32_t > sorted_cache
 
bool isTemp_
 
bool materialize_hashes_
 
std::string offsets_path_
 
int payload_fd_
 
int offset_fd_
 
StringIdxEntry * offset_map_
 
char * payload_map_
 
size_t offset_file_size_
 
size_t payload_file_size_
 
size_t payload_file_off_
 
std::shared_mutex rw_mutex_
 
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_i32_cache_
 
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int64_t > > like_i64_cache_
 
size_t like_cache_size_
 
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
 
size_t regex_cache_size_
 
std::map< std::string, int32_t > equal_cache_
 
size_t equal_cache_size_
 
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
 
size_t compare_cache_size_
 
std::shared_ptr< std::vector< std::string > > strings_cache_
 
size_t strings_cache_size_
 
std::unique_ptr< StringDictionaryClient > client_
 
std::unique_ptr< StringDictionaryClient > client_no_timeout_
 
char * CANARY_BUFFER {nullptr}
 
size_t canary_buffer_size = 0
 

Friends

class StringLocalCallback
 

Detailed Description

Definition at line 54 of file StringDictionary.h.

Constructor & Destructor Documentation

StringDictionary::StringDictionary ( const shared::StringDictKey &  dict_key,
const std::string &  folder,
const bool  isTemp,
const bool  recover,
const bool  materializeHashes = false,
size_t  initial_capacity = 256 
)

Definition at line 121 of file StringDictionary.cpp.

References addOffsetCapacity(), addPayloadCapacity(), threading_serial::async(), CHECK_EQ, heavyai::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_open(), collisions_, heavyai::file_size(), getNumStringsFromStorage(), getStringFromStorage(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), INVALID_STR_ID, isTemp_, LOG, materialize_hashes_, offset_fd_, offset_file_size_, offset_map_, offsets_path_, payload_fd_, payload_file_size_, payload_map_, processDictionaryFutures(), anonymous_namespace{StringDictionary.cpp}::round_up_p2(), rw_mutex_, str_count_, string_id_string_dict_hash_table_, VLOG, and logger::WARNING.

127  : dict_key_(dict_key)
128  , folder_(folder)
129  , str_count_(0)
131  , hash_cache_(initial_capacity)
132  , isTemp_(isTemp)
133  , materialize_hashes_(materializeHashes)
134  , payload_fd_(-1)
135  , offset_fd_(-1)
136  , offset_map_(nullptr)
137  , payload_map_(nullptr)
138  , offset_file_size_(0)
139  , payload_file_size_(0)
140  , payload_file_off_(0)
141  , like_cache_size_(0)
142  , regex_cache_size_(0)
143  , equal_cache_size_(0)
145  , strings_cache_(nullptr)
146  , strings_cache_size_(0) {
147  if (!isTemp && folder.empty()) {
148  return;
149  }
150 
151  // initial capacity must be a power of two for efficient bucket computation
152  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
153  if (!isTemp_) {
154  boost::filesystem::path storage_path(folder);
155  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
156  const auto payload_path =
157  (storage_path / boost::filesystem::path("DictPayload")).string();
158  payload_fd_ = checked_open(payload_path.c_str(), recover);
159  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
162  }
163  bool storage_is_empty = false;
164  if (payload_file_size_ == 0) {
165  storage_is_empty = true;
167  }
168  if (offset_file_size_ == 0) {
170  }
171  if (!isTemp_) { // we never mmap or recover temp dictionaries
172  payload_map_ =
173  reinterpret_cast<char*>(heavyai::checked_mmap(payload_fd_, payload_file_size_));
174  offset_map_ = reinterpret_cast<StringIdxEntry*>(
176  if (recover) {
177  const size_t bytes = heavyai::file_size(offset_fd_);
178  if (bytes % sizeof(StringIdxEntry) != 0) {
179  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
180  }
181  const uint64_t str_count =
182  storage_is_empty ? 0 : getNumStringsFromStorage(bytes / sizeof(StringIdxEntry));
183  collisions_ = 0;
184  // at this point we know the size of the StringDict we need to load
185  // so lets reallocate the vector to the correct size
186  const uint64_t max_entries =
187  std::max(round_up_p2(str_count * 2 + 1),
188  round_up_p2(std::max(initial_capacity, static_cast<size_t>(1))));
189  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
190  string_id_string_dict_hash_table_.swap(new_str_ids);
191  if (materialize_hashes_) {
192  std::vector<string_dict_hash_t> new_hash_cache(max_entries / 2);
193  hash_cache_.swap(new_hash_cache);
194  }
195  // Bail early if we know we don't have strings to add (i.e. a new or empty
196  // dictionary)
197  if (str_count == 0) {
198  return;
199  }
200 
201  unsigned string_id = 0;
202  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
203 
204  uint32_t thread_inits = 0;
205  const auto thread_count = std::thread::hardware_concurrency();
206  const uint32_t items_per_thread = std::max<uint32_t>(
207  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
208  std::vector<std::future<std::vector<std::pair<string_dict_hash_t, unsigned int>>>>
209  dictionary_futures;
210  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
211  dictionary_futures.emplace_back(std::async(
212  std::launch::async, [string_id, str_count, items_per_thread, this] {
213  std::vector<std::pair<string_dict_hash_t, unsigned int>> hashVec;
214  for (uint32_t curr_id = string_id;
215  curr_id < string_id + items_per_thread && curr_id < str_count;
216  curr_id++) {
217  const auto recovered = getStringFromStorage(curr_id);
218  if (recovered.canary) {
219  // hit the canary, recovery finished
220  break;
221  } else {
222  std::string_view temp(recovered.c_str_ptr, recovered.size);
223  hashVec.emplace_back(std::make_pair(hash_string(temp), temp.size()));
224  }
225  }
226  return hashVec;
227  }));
228  thread_inits++;
229  if (thread_inits % thread_count == 0) {
230  processDictionaryFutures(dictionary_futures);
231  }
232  }
233  // gather last few threads
234  if (dictionary_futures.size() != 0) {
235  processDictionaryFutures(dictionary_futures);
236  }
237  VLOG(1) << "Opened string dictionary " << folder << " # Strings: " << str_count_
238  << " Hash table size: " << string_id_string_dict_hash_table_.size()
239  << " Fill rate: "
240  << static_cast<double>(str_count_) * 100.0 /
242  << "% Collisions: " << collisions_;
243  }
244  }
245 }
StringIdxEntry * offset_map_
#define CHECK_EQ(x, y)
Definition: Logger.h:301
string_dict_hash_t hash_string(const std::string_view &str)
#define LOG(tag)
Definition: Logger.h:285
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
std::vector< string_dict_hash_t > hash_cache_
std::string offsets_path_
std::shared_mutex rw_mutex_
future< Result > async(Fn &&fn, Args &&...args)
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
const std::string folder_
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
int checked_open(const char *path, const bool recover)
const shared::StringDictKey dict_key_
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &dictionary_futures)
size_t getNumStringsFromStorage(const size_t storage_slots) const noexcept
const uint64_t round_up_p2(const uint64_t num)
std::vector< int32_t > string_id_string_dict_hash_table_
PayloadString getStringFromStorage(const int string_id) const noexcept
size_t file_size(const int fd)
Definition: heavyai_fs.cpp:33
#define VLOG(n)
Definition: Logger.h:388
void * checked_mmap(const int fd, const size_t sz)
Definition: heavyai_fs.cpp:40

+ Here is the call graph for this function:

StringDictionary::StringDictionary ( const LeafHostInfo &  host,
const shared::StringDictKey &  dict_key 
)

Definition at line 354 of file StringDictionary.cpp.

356  : dict_key_(dict_key)
357  , folder_("DB_" + std::to_string(dict_key.db_id) + "_DICT_" +
358  std::to_string(dict_key.dict_id))
359  , strings_cache_(nullptr)
360  , client_(new StringDictionaryClient(host, {dict_key.db_id, dict_key.dict_id}, true))
std::string to_string(char const *&&v)
std::unique_ptr< StringDictionaryClient > client_
std::shared_ptr< std::vector< std::string > > strings_cache_
const std::string folder_
const shared::StringDictKey dict_key_
StringDictionary::~StringDictionary ( )
noexcept

Definition at line 364 of file StringDictionary.cpp.

References CANARY_BUFFER, CHECK, CHECK_GE, heavyai::checked_munmap(), heavyai::close(), isClient(), isTemp_, offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

364  {
365  free(CANARY_BUFFER);
366  if (isClient()) {
367  return;
368  }
369  if (payload_map_) {
370  if (!isTemp_) {
374  CHECK_GE(payload_fd_, 0);
376  CHECK_GE(offset_fd_, 0);
378  } else {
380  free(payload_map_);
381  free(offset_map_);
382  }
383  }
384 }
StringIdxEntry * offset_map_
bool isClient() const noexcept
#define CHECK_GE(x, y)
Definition: Logger.h:306
#define CHECK(condition)
Definition: Logger.h:291
void checked_munmap(void *addr, size_t length)
Definition: heavyai_fs.cpp:53
void close(const int fd)
Definition: heavyai_fs.cpp:70

+ Here is the call graph for this function:

Member Function Documentation

void * StringDictionary::addMemoryCapacity ( void *  addr,
size_t &  mem_size,
const size_t  min_capacity_requested = 0 
)
private noexcept

Definition at line 1605 of file StringDictionary.cpp.

References CHECK, and anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE.

1607  {
1608  const size_t canary_buff_size_to_add =
1609  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1610  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1611  if (canary_buffer_size < canary_buff_size_to_add) {
1612  CANARY_BUFFER =
1613  reinterpret_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1614  canary_buffer_size = canary_buff_size_to_add;
1616  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1617  }
1618  void* new_addr = realloc(addr, mem_size + canary_buff_size_to_add);
1619  CHECK(new_addr);
1620  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
1621  CHECK(memcpy(write_addr, CANARY_BUFFER, canary_buff_size_to_add));
1622  mem_size += canary_buff_size_to_add;
1623  return new_addr;
1624 }
#define CHECK(condition)
Definition: Logger.h:291
void StringDictionary::addOffsetCapacity ( const size_t  min_capacity_requested = 0)
private noexcept

Definition at line 1575 of file StringDictionary.cpp.

Referenced by checkAndConditionallyIncreaseOffsetCapacity(), and StringDictionary().

1575  {
1576  if (!isTemp_) {
1577  offset_file_size_ += addStorageCapacity(offset_fd_, min_capacity_requested);
1578  } else {
1579  offset_map_ = static_cast<StringIdxEntry*>(
1580  addMemoryCapacity(offset_map_, offset_file_size_, min_capacity_requested));
1581  }
1582 }
StringIdxEntry * offset_map_
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept

+ Here is the caller graph for this function:

void StringDictionary::addPayloadCapacity ( const size_t  min_capacity_requested = 0)
private noexcept

Definition at line 1566 of file StringDictionary.cpp.

Referenced by checkAndConditionallyIncreasePayloadCapacity(), and StringDictionary().

1566  {
1567  if (!isTemp_) {
1568  payload_file_size_ += addStorageCapacity(payload_fd_, min_capacity_requested);
1569  } else {
1570  payload_map_ = static_cast<char*>(
1571  addMemoryCapacity(payload_map_, payload_file_size_, min_capacity_requested));
1572  }
1573 }
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept

+ Here is the caller graph for this function:

size_t StringDictionary::addStorageCapacity ( int  fd,
const size_t  min_capacity_requested = 0 
)
private noexcept

Definition at line 1584 of file StringDictionary.cpp.

References CHECK, CHECK_NE, anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE, and File_Namespace::write().

1586  {
1587  const size_t canary_buff_size_to_add =
1588  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1589  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1590 
1591  if (canary_buffer_size < canary_buff_size_to_add) {
1592  CANARY_BUFFER = static_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1593  canary_buffer_size = canary_buff_size_to_add;
1595  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1596  }
1597 
1598  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1599  const auto write_return = write(fd, CANARY_BUFFER, canary_buff_size_to_add);
1600  CHECK(write_return > 0 &&
1601  (static_cast<size_t>(write_return) == canary_buff_size_to_add));
1602  return canary_buff_size_to_add;
1603 }
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:143
#define CHECK_NE(x, y)
Definition: Logger.h:302
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

template<class String >
void StringDictionary::appendToStorage ( const String  str)
private noexcept

Definition at line 1511 of file StringDictionary.cpp.

References StringDictionary::StringIdxEntry::size.

Referenced by getOrAddBulk().

1511  {
1512  // write the payload
1514  memcpy(payload_map_ + payload_file_off_, str.data(), str.size());
1515 
1516  // write the offset and length
1517  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
1518  payload_file_off_ += str.size(); // Need to increment after we've defined str_meta
1519 
1521  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
1522 }
StringIdxEntry * offset_map_
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::appendToStorageBulk ( const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids,
const size_t  sum_new_strings_lengths 
)
private noexcept

Definition at line 1525 of file StringDictionary.cpp.

Referenced by getOrAddBulkParallel().

1528  {
1529  const size_t num_strings = string_memory_ids.size();
1530 
1531  checkAndConditionallyIncreasePayloadCapacity(sum_new_strings_lengths);
1532  checkAndConditionallyIncreaseOffsetCapacity(sizeof(StringIdxEntry) * num_strings);
1533 
1534  for (size_t i = 0; i < num_strings; ++i) {
1535  const size_t string_idx = string_memory_ids[i];
1536  const String str = input_strings[string_idx];
1537  const size_t str_size(str.size());
1538  memcpy(payload_map_ + payload_file_off_, str.data(), str_size);
1539  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str_size};
1540  payload_file_off_ += str_size; // Need to increment after we've defined str_meta
1541  memcpy(offset_map_ + str_count_ + i, &str_meta, sizeof(str_meta));
1542  }
1543 }
StringIdxEntry * offset_map_
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)

+ Here is the caller graph for this function:

compare_cache_value_t* StringDictionary::binary_search_cache ( const std::string &  pattern) const
private
void StringDictionary::buildDictionaryNumericTranslationMap ( Datum *  translated_ids,
const int64_t  source_generation,
const std::vector< StringOps_Namespace::StringOpInfo > &  string_op_infos 
) const

Definition at line 2028 of file StringDictionary.cpp.

References CHECK, CHECK_GE, CHECK_GT, CHECK_LE, DEBUG_TIMER, getStringFromStorageFast(), ThreadInfo::num_elems_per_thread, ThreadInfo::num_threads, threading_serial::parallel_for(), rw_mutex_, and str_count_.

2031  {
2032  auto timer = DEBUG_TIMER(__func__);
2033  CHECK_GE(source_generation, 0L);
2034  CHECK_GT(string_op_infos.size(), 0UL);
2035  CHECK(!string_op_infos.back().getReturnType().is_string());
2036  const int64_t num_source_strings = source_generation;
2037 
2038  // We can bail early if there are no source strings to translate
2039  if (num_source_strings == 0L) {
2040  return;
2041  }
2042 
2043  // If here we should have a local dictionary
2044  // Note case of transient source dictionaries that aren't
2045  // seen as remote (they have no client_no_timeout_) is covered
2046  // by early bail above on num_source_strings == 0
2047 
2048  std::shared_lock<std::shared_mutex> source_read_lock(rw_mutex_);
2049 
2050  // For source dictionary we cap the number of entries
2051  // to be translated/translated to at the supplied
2052  // generation arguments, if valid (i.e. >= 0), otherwise
2053  // just the size of each dictionary
2054 
2055  CHECK_LE(num_source_strings, static_cast<int64_t>(str_count_));
2056 
2057  constexpr int64_t target_strings_per_thread{1000};
2058  const ThreadInfo thread_info(
2059  std::thread::hardware_concurrency(), num_source_strings, target_strings_per_thread);
2060  CHECK_GE(thread_info.num_threads, 1L);
2061  CHECK_GE(thread_info.num_elems_per_thread, 1L);
2062 
2063  // We use a tbb::task_arena to cap the number of threads, which has
2064  // in other contexts been shown to exhibit better performance when low
2065  // numbers of threads are needed than just letting tbb figure the number of threads,
2066  // but should benchmark in this specific context
2067 
2068  const StringOps_Namespace::StringOps string_ops(string_op_infos);
2069  CHECK_GT(string_ops.size(), 0UL);
2070 
2071  tbb::task_arena limited_arena(thread_info.num_threads);
2072  // The below logic, by executing low-level private variable accesses on both
2073  // dictionaries, is less clean than a previous variant that simply called
2074  // `getStringViews` from the source dictionary and then called `getBulk` on the
2075  // destination dictionary, but this version gets significantly better performance
2076  // (~2X), likely due to eliminating the overhead of writing out the string views and
2077  // then reading them back in (along with the associated cache misses)
2078  limited_arena.execute([&] {
2080  tbb::blocked_range<int32_t>(
2081  0, num_source_strings, thread_info.num_elems_per_thread /* tbb grain_size */),
2082  [&](const tbb::blocked_range<int32_t>& r) {
2083  const int32_t start_idx = r.begin();
2084  const int32_t end_idx = r.end();
2085  for (int32_t source_string_id = start_idx; source_string_id != end_idx;
2086  ++source_string_id) {
2087  const std::string source_str =
2088  std::string(getStringFromStorageFast(source_string_id));
2089  translated_ids[source_string_id] = string_ops.numericEval(source_str);
2090  }
2091  });
2092  });
2093 }
#define CHECK_GE(x, y)
Definition: Logger.h:306
std::shared_mutex rw_mutex_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
#define CHECK_GT(x, y)
Definition: Logger.h:305
#define CHECK_LE(x, y)
Definition: Logger.h:304
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::buildDictionaryTranslationMap ( const std::shared_ptr< StringDictionary >  dest_dict,
StringLookupCallback const &  dest_transient_lookup_callback 
) const

Definition at line 1844 of file StringDictionary.cpp.

References DEBUG_TIMER, and storageEntryCount().

1846  {
1847  auto timer = DEBUG_TIMER(__func__);
1848  const size_t num_source_strings = storageEntryCount();
1849  const size_t num_dest_strings = dest_dict->storageEntryCount();
1850  std::vector<int32_t> translated_ids(num_source_strings);
1851 
1852  buildDictionaryTranslationMap(dest_dict.get(),
1853  translated_ids.data(),
1854  num_source_strings,
1855  num_dest_strings,
1856  true, // Just assume true for dest_has_transients as this
1857  // function is only used for testing currently
1858  dest_transient_lookup_callback,
1859  {});
1860  return translated_ids;
1861 }
std::vector< int32_t > buildDictionaryTranslationMap(const std::shared_ptr< StringDictionary > dest_dict, StringLookupCallback const &dest_transient_lookup_callback) const
size_t storageEntryCount() const
#define DEBUG_TIMER(name)
Definition: Logger.h:412

+ Here is the call graph for this function:

size_t StringDictionary::buildDictionaryTranslationMap ( const StringDictionary *  dest_dict,
int32_t *  translated_ids,
const int64_t  source_generation,
const int64_t  dest_generation,
const bool  dest_has_transients,
StringLookupCallback const &  dest_transient_lookup_callback,
const std::vector< StringOps_Namespace::StringOpInfo > &  string_op_infos 
) const

Definition at line 1881 of file StringDictionary.cpp.

References CHECK_GE, CHECK_LE, client_no_timeout_, computeBucket(), DEBUG_TIMER, getDictKey(), getStringFromStorageFast(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), INVALID_STR_ID, materialize_hashes_, ThreadInfo::num_elems_per_thread, ThreadInfo::num_threads, order_translation_locks(), threading_serial::parallel_for(), rw_mutex_, str_count_, and string_id_string_dict_hash_table_.

1888  {
1889  auto timer = DEBUG_TIMER(__func__);
1890  CHECK_GE(source_generation, 0L);
1891  CHECK_GE(dest_generation, 0L);
1892  const int64_t num_source_strings = source_generation;
1893  const int64_t num_dest_strings = dest_generation;
1894 
1895  // We can bail early if there are no source strings to translate
1896  if (num_source_strings == 0L) {
1897  return 0;
1898  }
1899 
1900  // If here we should have local dictionaries.
1901  // Note case of transient source dictionaries that aren't
1902  // seen as remote (they have no client_no_timeout_) is covered
1903  // by early bail above on num_source_strings == 0
1904  if (dest_dict->client_no_timeout_) {
1905  throw std::runtime_error(
1906  "Cannot translate between a local source and remote destination dictionary.");
1907  }
1908 
1909  // Sort this/source dict and dest dict on folder_ so we can enforce
1910  // lock ordering and avoid deadlocks
1911  std::shared_lock<std::shared_mutex> source_read_lock(rw_mutex_, std::defer_lock);
1912  std::shared_lock<std::shared_mutex> dest_read_lock(dest_dict->rw_mutex_,
1913  std::defer_lock);
1915  getDictKey(), dest_dict->getDictKey(), source_read_lock, dest_read_lock);
1916 
1917  // For both source and destination dictionaries we cap the max
1918  // entries to be translated/translated to at the supplied
1919  // generation arguments, if valid (i.e. >= 0), otherwise just the
1920  // size of each dictionary
1921 
1922  CHECK_LE(num_source_strings, static_cast<int64_t>(str_count_));
1923  CHECK_LE(num_dest_strings, static_cast<int64_t>(dest_dict->str_count_));
1924  const bool dest_dictionary_is_empty = (num_dest_strings == 0);
1925 
1926  constexpr int64_t target_strings_per_thread{1000};
1927  const ThreadInfo thread_info(
1928  std::thread::hardware_concurrency(), num_source_strings, target_strings_per_thread);
1929  CHECK_GE(thread_info.num_threads, 1L);
1930  CHECK_GE(thread_info.num_elems_per_thread, 1L);
1931 
1932  // We use a tbb::task_arena to cap the number of threads, which has
1933  // in other contexts been shown to exhibit better performance when low
1934  // numbers of threads are needed than just letting tbb figure the number of threads,
1935  // but we should benchmark in this specific context
1936 
1937  const StringOps_Namespace::StringOps string_ops(string_op_infos);
1938  const bool has_string_ops = string_ops.size();
1939 
1940  tbb::task_arena limited_arena(thread_info.num_threads);
1941  std::vector<size_t> num_strings_not_translated_per_thread(thread_info.num_threads, 0UL);
1942  constexpr bool short_circuit_empty_dictionary_translations{false};
1943  limited_arena.execute([&] {
1944  if (short_circuit_empty_dictionary_translations && dest_dictionary_is_empty) {
1946  tbb::blocked_range<int32_t>(
1947  0,
1948  num_source_strings,
1949  thread_info.num_elems_per_thread /* tbb grain_size */),
1950  [&](const tbb::blocked_range<int32_t>& r) {
1951  const int32_t start_idx = r.begin();
1952  const int32_t end_idx = r.end();
1953  for (int32_t string_idx = start_idx; string_idx != end_idx; ++string_idx) {
1954  translated_ids[string_idx] = INVALID_STR_ID;
1955  }
1956  },
1957  tbb::simple_partitioner());
1958  num_strings_not_translated_per_thread[0] += num_source_strings;
1959  } else {
1960  // The below logic, by executing low-level private variable accesses on both
1961  // dictionaries, is less clean than a previous variant that simply called
1962  // `getStringViews` from the source dictionary and then called `getBulk` on the
1963  // destination dictionary, but this version gets significantly better performance
1964  // (~2X), likely due to eliminating the overhead of writing out the string views and
1965  // then reading them back in (along with the associated cache misses)
1967  tbb::blocked_range<int32_t>(
1968  0,
1969  num_source_strings,
1970  thread_info.num_elems_per_thread /* tbb grain_size */),
1971  [&](const tbb::blocked_range<int32_t>& r) {
1972  const int32_t start_idx = r.begin();
1973  const int32_t end_idx = r.end();
1974  size_t num_strings_not_translated = 0;
1975  std::string string_ops_storage; // Needs to be thread local to back
1976  // string_view returned by string_ops()
1977  for (int32_t source_string_id = start_idx; source_string_id != end_idx;
1978  ++source_string_id) {
1979  const std::string_view source_str =
1980  has_string_ops ? string_ops(getStringFromStorageFast(source_string_id),
1981  string_ops_storage)
1982  : getStringFromStorageFast(source_string_id);
1983 
1984  if (source_str.empty()) {
1985  translated_ids[source_string_id] = inline_int_null_value<int32_t>();
1986  continue;
1987  }
1988  // Get the hash from this/the source dictionary's cache, as the function
1989  // will be the same for the dest_dict, sparing us having to recompute it
1990 
1991  // Todo(todd): Remove option to turn string hash cache off or at least
1992  // make a constexpr to avoid these branches when we expect it to be always
1993  // on going forward
1994  const string_dict_hash_t hash = (materialize_hashes_ && !has_string_ops)
1995  ? hash_cache_[source_string_id]
1996  : hash_string(source_str);
1997  const uint32_t hash_bucket = dest_dict->computeBucket(
1998  hash, source_str, dest_dict->string_id_string_dict_hash_table_);
1999  const auto translated_string_id =
2000  dest_dict->string_id_string_dict_hash_table_[hash_bucket];
2001  translated_ids[source_string_id] = translated_string_id;
2002 
2003  if (translated_string_id == StringDictionary::INVALID_STR_ID ||
2004  translated_string_id >= num_dest_strings) {
2005  if (dest_has_transients) {
2006  num_strings_not_translated +=
2007  dest_transient_lookup_callback(source_str, source_string_id);
2008  } else {
2009  num_strings_not_translated++;
2010  }
2011  continue;
2012  }
2013  }
2014  const size_t tbb_thread_idx = tbb::this_task_arena::current_thread_index();
2015  num_strings_not_translated_per_thread[tbb_thread_idx] +=
2016  num_strings_not_translated;
2017  },
2018  tbb::simple_partitioner());
2019  }
2020  });
2021  size_t total_num_strings_not_translated = 0;
2022  for (int64_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
2023  total_num_strings_not_translated += num_strings_not_translated_per_thread[thread_idx];
2024  }
2025  return total_num_strings_not_translated;
2026 }
string_dict_hash_t hash_string(const std::string_view &str)
const shared::StringDictKey & getDictKey() const noexcept
std::vector< string_dict_hash_t > hash_cache_
#define CHECK_GE(x, y)
Definition: Logger.h:306
std::shared_mutex rw_mutex_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
static constexpr int32_t INVALID_STR_ID
void order_translation_locks(const shared::StringDictKey &source_dict_key, const shared::StringDictKey &dest_dict_key, std::shared_lock< std::shared_mutex > &source_read_lock, std::shared_lock< std::shared_mutex > &dest_read_lock)
#define CHECK_LE(x, y)
Definition: Logger.h:304
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
uint32_t string_dict_hash_t
#define DEBUG_TIMER(name)
Definition: Logger.h:412
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

void StringDictionary::buildSortedCache ( )
private

Definition at line 1674 of file StringDictionary.cpp.

References mergeSortedCache(), sortCache(), sorted_cache, and str_count_.

Referenced by getCompare().

1674  {
1675  // This method is not thread-safe.
1676  const auto cur_cache_size = sorted_cache.size();
1677  std::vector<int32_t> temp_sorted_cache;
1678  for (size_t i = cur_cache_size; i < str_count_; i++) {
1679  temp_sorted_cache.push_back(i);
1680  }
1681  sortCache(temp_sorted_cache);
1682  mergeSortedCache(temp_sorted_cache);
1683 }
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
void sortCache(std::vector< int32_t > &cache)
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::checkAndConditionallyIncreaseOffsetCapacity ( const size_t  write_length)
private

Definition at line 1490 of file StringDictionary.cpp.

References addOffsetCapacity(), CHECK, CHECK_GE, heavyai::checked_mmap(), heavyai::checked_munmap(), isTemp_, offset_fd_, offset_file_size_, offset_map_, and str_count_.

1491  {
1492  const size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
1493  if (offset_file_off + write_length >= offset_file_size_) {
1494  const size_t min_capacity_needed =
1495  write_length - (offset_file_size_ - offset_file_off);
1496  if (!isTemp_) {
1497  CHECK_GE(offset_fd_, 0);
1499  addOffsetCapacity(min_capacity_needed);
1500  CHECK(offset_file_off + write_length <= offset_file_size_);
1501  offset_map_ = reinterpret_cast<StringIdxEntry*>(
1503  } else {
1504  addOffsetCapacity(min_capacity_needed);
1505  CHECK(offset_file_off + write_length <= offset_file_size_);
1506  }
1507  }
1508 }
StringIdxEntry * offset_map_
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
#define CHECK_GE(x, y)
Definition: Logger.h:306
#define CHECK(condition)
Definition: Logger.h:291
void checked_munmap(void *addr, size_t length)
Definition: heavyai_fs.cpp:53
void * checked_mmap(const int fd, const size_t sz)
Definition: heavyai_fs.cpp:40

+ Here is the call graph for this function:

void StringDictionary::checkAndConditionallyIncreasePayloadCapacity ( const size_t  write_length)
private

Definition at line 1471 of file StringDictionary.cpp.

References addPayloadCapacity(), CHECK, CHECK_GE, heavyai::checked_mmap(), heavyai::checked_munmap(), isTemp_, payload_fd_, payload_file_off_, payload_file_size_, and payload_map_.

1472  {
1473  if (payload_file_off_ + write_length > payload_file_size_) {
1474  const size_t min_capacity_needed =
1475  write_length - (payload_file_size_ - payload_file_off_);
1476  if (!isTemp_) {
1477  CHECK_GE(payload_fd_, 0);
1479  addPayloadCapacity(min_capacity_needed);
1480  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1481  payload_map_ =
1482  reinterpret_cast<char*>(heavyai::checked_mmap(payload_fd_, payload_file_size_));
1483  } else {
1484  addPayloadCapacity(min_capacity_needed);
1485  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1486  }
1487  }
1488 }
#define CHECK_GE(x, y)
Definition: Logger.h:306
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
#define CHECK(condition)
Definition: Logger.h:291
void checked_munmap(void *addr, size_t length)
Definition: heavyai_fs.cpp:53
void * checked_mmap(const int fd, const size_t sz)
Definition: heavyai_fs.cpp:40

+ Here is the call graph for this function:

bool StringDictionary::checkpoint ( )
noexcept

Definition at line 1651 of file StringDictionary.cpp.

References CHECK, client_, heavyai::fsync(), isClient(), isTemp_, heavyai::msync(), offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

Referenced by import_export::TypedImportBuffer::stringDictCheckpoint().

1651  {
1652  if (isClient()) {
1653  try {
1654  return client_->checkpoint();
1655  } catch (...) {
1656  return false;
1657  }
1658  }
1659  CHECK(!isTemp_);
1660  bool ret = true;
1661  ret = ret &&
1662  (heavyai::msync((void*)offset_map_, offset_file_size_, /*async=*/false) == 0);
1663  ret = ret &&
1664  (heavyai::msync((void*)payload_map_, payload_file_size_, /*async=*/false) == 0);
1665  ret = ret && (heavyai::fsync(offset_fd_) == 0);
1666  ret = ret && (heavyai::fsync(payload_fd_) == 0);
1667  return ret;
1668 }
StringIdxEntry * offset_map_
bool isClient() const noexcept
std::unique_ptr< StringDictionaryClient > client_
int msync(void *addr, size_t length, bool async)
Definition: heavyai_fs.cpp:57
int fsync(int fd)
Definition: heavyai_fs.cpp:62
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
uint32_t StringDictionary::computeBucket ( const string_dict_hash_t  hash,
const String &  input_string,
const std::vector< int32_t > &  string_id_string_dict_hash_table 
) const
privatenoexcept

Definition at line 1373 of file StringDictionary.cpp.

Referenced by buildDictionaryTranslationMap(), getBulk(), and getOrAddBulk().

1376  {
1377  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
1378  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
1379  while (true) {
1380  const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1381  if (candidate_string_id ==
1382  INVALID_STR_ID) { // In this case it means the slot is available for use
1383  break;
1384  }
1385  if ((materialize_hashes_ && hash == hash_cache_[candidate_string_id]) ||
1387  const auto candidate_string = getStringFromStorageFast(candidate_string_id);
1388  if (input_string.size() == candidate_string.size() &&
1389  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1390  // found the string
1391  break;
1392  }
1393  }
1394  // wrap around
1395  if (++bucket == string_dict_hash_table_size) {
1396  bucket = 0;
1397  }
1398  }
1399  return bucket;
1400 }
std::vector< string_dict_hash_t > hash_cache_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

template<class String >
uint32_t StringDictionary::computeBucketFromStorageAndMemory ( const string_dict_hash_t  input_string_hash,
const String &  input_string,
const std::vector< int32_t > &  string_id_string_dict_hash_table,
const size_t  storage_high_water_mark,
const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids 
) const
privatenoexcept

memcmp(input_string.data(), candidate_storage_string.c_str_ptr, input_string.size())) {

Definition at line 1403 of file StringDictionary.cpp.

Referenced by getOrAddBulkParallel().

1409  {
1410  uint32_t bucket = input_string_hash & (string_id_string_dict_hash_table.size() - 1);
1411  while (true) {
1412  const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1413  if (candidate_string_id ==
1414  INVALID_STR_ID) { // In this case it means the slot is available for use
1415  break;
1416  }
1417  if (!materialize_hashes_ || (input_string_hash == hash_cache_[candidate_string_id])) {
1418  if (candidate_string_id > 0 &&
1419  static_cast<size_t>(candidate_string_id) >= storage_high_water_mark) {
1420  // The candidate string is not in storage yet but in our string_memory_ids temp
1421  // buffer
1422  size_t memory_offset =
1423  static_cast<size_t>(candidate_string_id - storage_high_water_mark);
1424  const String candidate_string = input_strings[string_memory_ids[memory_offset]];
1425  if (input_string.size() == candidate_string.size() &&
1426  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1427  // found the string in the temp memory buffer
1428  break;
1429  }
1430  } else {
1431  // The candidate string is in storage, need to fetch it for comparison
1432  const auto candidate_storage_string =
1433  getStringFromStorageFast(candidate_string_id);
1434  if (input_string.size() == candidate_storage_string.size() &&
1435  !memcmp(input_string.data(),
1436  candidate_storage_string.data(),
1437  input_string.size())) {
1440  // found the string in storage
1441  break;
1442  }
1443  }
1444  }
1445  if (++bucket == string_id_string_dict_hash_table.size()) {
1446  bucket = 0;
1447  }
1448  }
1449  return bucket;
1450 }
std::vector< string_dict_hash_t > hash_cache_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

size_t StringDictionary::computeCacheSize ( ) const

Definition at line 2111 of file StringDictionary.cpp.

References compare_cache_size_, equal_cache_size_, hash_cache_, like_cache_size_, regex_cache_size_, rw_mutex_, sorted_cache, string_id_string_dict_hash_table_, and strings_cache_size_.

2111  {
2112  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
2113  return string_id_string_dict_hash_table_.size() * sizeof(int32_t) +
2114  hash_cache_.size() * sizeof(string_dict_hash_t) +
2115  sorted_cache.size() * sizeof(int32_t) + like_cache_size_ + regex_cache_size_ +
2117 }
std::vector< string_dict_hash_t > hash_cache_
std::shared_mutex rw_mutex_
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_
std::vector< int32_t > sorted_cache
uint32_t StringDictionary::computeUniqueBucketWithHash ( const string_dict_hash_t  hash,
const std::vector< int32_t > &  string_id_string_dict_hash_table 
)
privatenoexcept

Definition at line 1452 of file StringDictionary.cpp.

Referenced by increaseHashTableCapacity(), and processDictionaryFutures().

1454  {
1455  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
1456  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
1457  while (true) {
1458  if (string_id_string_dict_hash_table[bucket] ==
1459  INVALID_STR_ID) { // In this case it means the slot is available for use
1460  break;
1461  }
1462  collisions_++;
1463  // wrap around
1464  if (++bucket == string_dict_hash_table_size) {
1465  bucket = 0;
1466  }
1467  }
1468  return bucket;
1469 }
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

std::vector< std::string > StringDictionary::copyStrings ( ) const

Definition at line 1191 of file StringDictionary.cpp.

References gpu_enabled::accumulate(), threading_serial::async(), CHECK_EQ, CHECK_GT, CHECK_LE, gpu_enabled::copy(), cpu_threads(), getStringUnlocked(), isClient(), rw_mutex_, str_count_, strings_cache_, and strings_cache_size_.

1191  {
1192  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
1193  if (isClient()) {
1194  // TODO(miyu): support remote string dictionary
1195  throw std::runtime_error(
1196  "copying dictionaries from remote server is not supported yet.");
1197  }
1198 
1199  if (strings_cache_) {
1200  return *strings_cache_;
1201  }
1202 
1203  strings_cache_ = std::make_shared<std::vector<std::string>>();
1204  strings_cache_->reserve(str_count_);
1205  const bool multithreaded = str_count_ > 10000;
1206  const auto worker_count =
1207  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
1208  CHECK_GT(worker_count, 0UL);
1209  std::vector<std::vector<std::string>> worker_results(worker_count);
1210  std::vector<size_t> string_size(worker_count, 0);
1211  auto copy = [this, &string_size](std::vector<std::string>& str_list,
1212  const size_t worker_idx,
1213  const size_t start_id,
1214  const size_t end_id) {
1215  CHECK_LE(start_id, end_id);
1216  str_list.reserve(end_id - start_id);
1217  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
1218  auto str = getStringUnlocked(string_id);
1219  string_size[worker_idx] += str.size();
1220  str_list.push_back(str);
1221  }
1222  };
1223  if (multithreaded) {
1224  std::vector<std::future<void>> workers;
1225  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
1226  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
1227  worker_idx < worker_count && start < str_count_;
1228  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
1229  workers.push_back(std::async(std::launch::async,
1230  copy,
1231  std::ref(worker_results[worker_idx]),
1232  worker_idx,
1233  start,
1234  end));
1235  }
1236  for (auto& worker : workers) {
1237  worker.get();
1238  }
1239  } else {
1240  CHECK_EQ(worker_results.size(), size_t(1));
1241  copy(worker_results[0], 0, 0, str_count_);
1242  }
1243 
1244  for (const auto& worker_result : worker_results) {
1245  strings_cache_->insert(
1246  strings_cache_->end(), worker_result.begin(), worker_result.end());
1247  }
1249  std::accumulate(string_size.begin(), string_size.end(), size_t(0));
1250  return *strings_cache_;
1251 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
bool isClient() const noexcept
std::shared_mutex rw_mutex_
#define CHECK_GT(x, y)
Definition: Logger.h:305
std::string getStringUnlocked(int32_t string_id) const noexcept
future< Result > async(Fn &&fn, Args &&...args)
std::shared_ptr< std::vector< std::string > > strings_cache_
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
#define CHECK_LE(x, y)
Definition: Logger.h:304
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

void StringDictionary::eachStringSerially ( int64_t const  generation,
StringCallback serial_callback 
) const

Definition at line 276 of file StringDictionary.cpp.

References CHECK_LE, client_, getStringFromStorageFast(), isClient(), anonymous_namespace{Utm.h}::n, rw_mutex_, storageEntryCount(), and str_count_.

Referenced by makeLambdaStringToId().

277  {
278  if (isClient()) {
279  // copyStrings() is not supported when isClient().
280  std::string str; // Import buffer. Placing outside of loop should reduce allocations.
281  size_t const n = std::min(static_cast<size_t>(generation), storageEntryCount());
282  CHECK_LE(n, static_cast<size_t>(std::numeric_limits<int32_t>::max()) + 1);
283  for (unsigned id = 0; id < n; ++id) {
284  {
285  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
286  client_->get_string(str, id);
287  }
288  serial_callback(str, id);
289  }
290  } else {
291  size_t const n = std::min(static_cast<size_t>(generation), str_count_);
292  CHECK_LE(n, static_cast<size_t>(std::numeric_limits<int32_t>::max()) + 1);
293  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
294  for (unsigned id = 0; id < n; ++id) {
295  serial_callback(getStringFromStorageFast(static_cast<int>(id)), id);
296  }
297  }
298 }
bool isClient() const noexcept
size_t storageEntryCount() const
std::shared_mutex rw_mutex_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
std::unique_ptr< StringDictionaryClient > client_
#define CHECK_LE(x, y)
Definition: Logger.h:304
constexpr double n
Definition: Utm.h:38

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool StringDictionary::fillRateIsHigh ( const size_t  num_strings) const
privatenoexcept

Definition at line 1253 of file StringDictionary.cpp.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

1253  {
1254  return string_id_string_dict_hash_table_.size() <= num_strings * 2;
1255 }
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the caller graph for this function:

template<class T , class String >
size_t StringDictionary::getBulk ( const std::vector< String > &  string_vec,
T *  encoded_vec 
) const

Definition at line 495 of file StringDictionary.cpp.

496  {
497  return getBulk(string_vec, encoded_vec, -1L /* generation */);
498 }
size_t getBulk(const std::vector< String > &string_vec, T *encoded_vec) const
template<class T , class String >
size_t StringDictionary::getBulk ( const std::vector< String > &  string_vec,
T *  encoded_vec,
const int64_t  generation 
) const

Definition at line 508 of file StringDictionary.cpp.

References CHECK_GE, computeBucket(), dict_key_, anonymous_namespace{StringDictionary.cpp}::hash_string(), INVALID_STR_ID, MAX_STRLEN, ThreadInfo::num_elems_per_thread, ThreadInfo::num_threads, threading_serial::parallel_for(), rw_mutex_, storageEntryCount(), string_id_string_dict_hash_table_, and anonymous_namespace{StringDictionary.cpp}::throw_string_too_long_error().

510  {
511  constexpr int64_t target_strings_per_thread{1000};
512  const int64_t num_lookup_strings = string_vec.size();
513  if (num_lookup_strings == 0) {
514  return 0;
515  }
516 
517  const ThreadInfo thread_info(
518  std::thread::hardware_concurrency(), num_lookup_strings, target_strings_per_thread);
519  CHECK_GE(thread_info.num_threads, 1L);
520  CHECK_GE(thread_info.num_elems_per_thread, 1L);
521 
522  std::vector<size_t> num_strings_not_found_per_thread(thread_info.num_threads, 0UL);
523 
524  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
525  const int64_t num_dict_strings = generation >= 0 ? generation : storageEntryCount();
526  const bool dictionary_is_empty = (num_dict_strings == 0);
527  if (dictionary_is_empty) {
528  tbb::parallel_for(tbb::blocked_range<int64_t>(0, num_lookup_strings),
529  [&](const tbb::blocked_range<int64_t>& r) {
530  const int64_t start_idx = r.begin();
531  const int64_t end_idx = r.end();
532  for (int64_t string_idx = start_idx; string_idx < end_idx;
533  ++string_idx) {
534  encoded_vec[string_idx] = StringDictionary::INVALID_STR_ID;
535  }
536  });
537  return num_lookup_strings;
538  }
539  // If we're here the generation-capped dictionary has strings in it
540  // that we need to look up against
541 
542  tbb::task_arena limited_arena(thread_info.num_threads);
543  limited_arena.execute([&] {
545  tbb::blocked_range<int64_t>(
546  0, num_lookup_strings, thread_info.num_elems_per_thread /* tbb grain_size */),
547  [&](const tbb::blocked_range<int64_t>& r) {
548  const int64_t start_idx = r.begin();
549  const int64_t end_idx = r.end();
550  size_t num_strings_not_found = 0;
551  for (int64_t string_idx = start_idx; string_idx != end_idx; ++string_idx) {
552  const auto& input_string = string_vec[string_idx];
553  if (input_string.empty()) {
554  encoded_vec[string_idx] = inline_int_null_value<T>();
555  continue;
556  }
557  if (input_string.size() > StringDictionary::MAX_STRLEN) {
558  throw_string_too_long_error(input_string, dict_key_);
559  }
560  const string_dict_hash_t input_string_hash = hash_string(input_string);
561  uint32_t hash_bucket = computeBucket(
562  input_string_hash, input_string, string_id_string_dict_hash_table_);
563  // Will either be legit id or INVALID_STR_ID
564  const auto string_id = string_id_string_dict_hash_table_[hash_bucket];
565  if (string_id == StringDictionary::INVALID_STR_ID ||
566  string_id >= num_dict_strings) {
567  encoded_vec[string_idx] = StringDictionary::INVALID_STR_ID;
568  num_strings_not_found++;
569  continue;
570  }
571  encoded_vec[string_idx] = string_id;
572  }
573  const size_t tbb_thread_idx = tbb::this_task_arena::current_thread_index();
574  num_strings_not_found_per_thread[tbb_thread_idx] = num_strings_not_found;
575  },
576  tbb::simple_partitioner());
577  });
578 
579  size_t num_strings_not_found = 0;
580  for (int64_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
581  num_strings_not_found += num_strings_not_found_per_thread[thread_idx];
582  }
583  return num_strings_not_found;
584 }
void throw_string_too_long_error(std::string_view str, const shared::StringDictKey &dict_key)
string_dict_hash_t hash_string(const std::string_view &str)
size_t storageEntryCount() const
#define CHECK_GE(x, y)
Definition: Logger.h:306
std::shared_mutex rw_mutex_
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
static constexpr int32_t INVALID_STR_ID
const shared::StringDictKey dict_key_
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_
static constexpr size_t MAX_STRLEN

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getCompare ( const std::string &  pattern,
const std::string &  comp_operator,
const size_t  generation 
)

Definition at line 981 of file StringDictionary.cpp.

References anonymous_namespace{Utm.h}::a, buildSortedCache(), client_, compare_cache_, compare_cache_size_, getEquals(), getStringFromStorage(), isClient(), gpu_enabled::lower_bound(), rw_mutex_, sorted_cache, str_count_, string_eq(), and string_lt().

983  {
984  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
985  if (isClient()) {
986  return client_->get_compare(pattern, comp_operator, generation);
987  }
988  std::vector<int32_t> ret;
989  if (str_count_ == 0) {
990  return ret;
991  }
992  if (sorted_cache.size() < str_count_) {
993  if (comp_operator == "=" || comp_operator == "<>") {
994  return getEquals(pattern, comp_operator, generation);
995  }
996 
998  }
999  auto cache_index = compare_cache_.get(pattern);
1000 
1001  if (!cache_index) {
1002  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
1003  const auto cache_itr = std::lower_bound(
1004  sorted_cache.begin(),
1005  sorted_cache.end(),
1006  pattern,
1007  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
1008  auto a_str = this->getStringFromStorage(a);
1009  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
1010  });
1011 
1012  if (cache_itr == sorted_cache.end()) {
1013  cache_index->index = sorted_cache.size() - 1;
1014  cache_index->diff = 1;
1015  } else {
1016  const auto cache_str = getStringFromStorage(*cache_itr);
1017  if (!string_eq(
1018  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
1019  cache_index->index = cache_itr - sorted_cache.begin() - 1;
1020  cache_index->diff = 1;
1021  } else {
1022  cache_index->index = cache_itr - sorted_cache.begin();
1023  cache_index->diff = 0;
1024  }
1025  }
1026 
1027  compare_cache_.put(pattern, cache_index);
1028  compare_cache_size_ += (pattern.size() + sizeof(cache_index));
1029  }
1030 
1031  // Since we have a cache in the form of a vector of ints which is sorted according to
1032  // the corresponding strings in the dictionary, all we need is the index of the element
1033  // which is equal to the pattern that we are trying to match, or the index of the “biggest”
1034  // element smaller than the pattern, to perform all the comparison operators over
1035  // strings. The search function guarantees we have such an index, so now it is just a
1036  // matter of including all the elements in the result vector.
1037 
1038  // For < operator if the index that we have points to the element which is equal to
1039  // the pattern that we are searching for we simply get all the elements less than the
1040  // index. If the element pointed by the index is not equal to the pattern we are
1041  // comparing with we also need to include that index in result vector, except when the
1042  // index points to 0 and the pattern is lesser than the smallest value in the string
1043  // dictionary.
1044 
1045  if (comp_operator == "<") {
1046  size_t idx = cache_index->index;
1047  if (cache_index->diff) {
1048  idx = cache_index->index + 1;
1049  if (cache_index->index == 0 && cache_index->diff > 0) {
1050  idx = cache_index->index;
1051  }
1052  }
1053  for (size_t i = 0; i < idx; i++) {
1054  ret.push_back(sorted_cache[i]);
1055  }
1056 
1057  // For <= operator if the index that we have points to the element which is equal to
1058  // the pattern that we are searching for we want to include the element pointed by
1059  // the index in the result set. If the element pointed by the index is not equal to
1060  // the pattern we are comparing with we just want to include all the ids with index
1061  // less than the index that is cached, except when pattern that we are searching for
1062  // is smaller than the smallest string in the dictionary.
1063 
1064  } else if (comp_operator == "<=") {
1065  size_t idx = cache_index->index + 1;
1066  if (cache_index == 0 && cache_index->diff > 0) {
1067  idx = cache_index->index;
1068  }
1069  for (size_t i = 0; i < idx; i++) {
1070  ret.push_back(sorted_cache[i]);
1071  }
1072 
1073  // For > operator we want to get all the elements with index greater than the index
1074  // that we have except, when the pattern we are searching for is lesser than the
1075  // smallest string in the dictionary we also want to include the id of the index
1076  // that we have.
1077 
1078  } else if (comp_operator == ">") {
1079  size_t idx = cache_index->index + 1;
1080  if (cache_index->index == 0 && cache_index->diff > 0) {
1081  idx = cache_index->index;
1082  }
1083  for (size_t i = idx; i < sorted_cache.size(); i++) {
1084  ret.push_back(sorted_cache[i]);
1085  }
1086 
1087  // For >= operator when the indexed element that we have points to element which is
1088  // equal to the pattern we are searching for we want to include that in the result
1089  // vector. If the index that we have does not point to the string which is equal to
1090  // the pattern we are searching we don’t want to include that id into the result
1091  // vector except when the index is 0.
1092 
1093  } else if (comp_operator == ">=") {
1094  size_t idx = cache_index->index;
1095  if (cache_index->diff) {
1096  idx = cache_index->index + 1;
1097  if (cache_index->index == 0 && cache_index->diff > 0) {
1098  idx = cache_index->index;
1099  }
1100  }
1101  for (size_t i = idx; i < sorted_cache.size(); i++) {
1102  ret.push_back(sorted_cache[i]);
1103  }
1104  } else if (comp_operator == "=") {
1105  if (!cache_index->diff) {
1106  ret.push_back(sorted_cache[cache_index->index]);
1107  }
1108 
1109  // For <> operator it is simple matter of not including id of string which is equal
1110  // to pattern we are searching for.
1111  } else if (comp_operator == "<>") {
1112  if (!cache_index->diff) {
1113  size_t idx = cache_index->index;
1114  for (size_t i = 0; i < idx; i++) {
1115  ret.push_back(sorted_cache[i]);
1116  }
1117  ++idx;
1118  for (size_t i = idx; i < sorted_cache.size(); i++) {
1119  ret.push_back(sorted_cache[i]);
1120  }
1121  } else {
1122  for (size_t i = 0; i < sorted_cache.size(); i++) {
1123  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
1124  }
1125  }
1126 
1127  } else {
1128  std::runtime_error("Unsupported string comparison operator");
1129  }
1130  return ret;
1131 }
bool isClient() const noexcept
RUNTIME_EXPORT DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:336
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
std::shared_mutex rw_mutex_
std::unique_ptr< StringDictionaryClient > client_
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:308
constexpr double a
Definition: Utm.h:32
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
DEVICE auto lower_bound(ARGS &&...args)
Definition: gpu_enabled.h:78
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

const shared::StringDictKey & StringDictionary::getDictKey ( ) const
noexcept

Definition at line 320 of file StringDictionary.cpp.

References dict_key_.

Referenced by RowSetMemoryOwner::addStringProxyIntersectionTranslationMap(), RowSetMemoryOwner::addStringProxyNumericTranslationMap(), RowSetMemoryOwner::addStringProxyUnionTranslationMap(), and buildDictionaryTranslationMap().

320  {
321  return dict_key_;
322 }
const shared::StringDictKey dict_key_

+ Here is the caller graph for this function:

std::vector< int32_t > StringDictionary::getEquals ( std::string  pattern,
std::string  comp_operator,
size_t  generation 
)
private

Definition at line 920 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, cpu_threads(), equal_cache_, equal_cache_size_, getStringUnlocked(), MAX_STRLEN, run_benchmark_import::result, and str_count_.

Referenced by getCompare().

922  {
923  std::vector<int32_t> result;
924  auto eq_id_itr = equal_cache_.find(pattern);
925  int32_t eq_id = MAX_STRLEN + 1;
926  int32_t cur_size = str_count_;
927  if (eq_id_itr != equal_cache_.end()) {
928  eq_id = eq_id_itr->second;
929  if (comp_operator == "=") {
930  result.push_back(eq_id);
931  } else {
932  for (int32_t idx = 0; idx <= cur_size; idx++) {
933  if (idx == eq_id) {
934  continue;
935  }
936  result.push_back(idx);
937  }
938  }
939  } else {
940  std::vector<std::thread> workers;
941  int worker_count = cpu_threads();
942  CHECK_GT(worker_count, 0);
943  std::vector<std::vector<int32_t>> worker_results(worker_count);
944  CHECK_LE(generation, str_count_);
945  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
946  workers.emplace_back(
947  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
948  for (size_t string_id = worker_idx; string_id < generation;
949  string_id += worker_count) {
950  const auto str = getStringUnlocked(string_id);
951  if (str == pattern) {
952  worker_results[worker_idx].push_back(string_id);
953  }
954  }
955  });
956  }
957  for (auto& worker : workers) {
958  worker.join();
959  }
960  for (const auto& worker_result : worker_results) {
961  result.insert(result.end(), worker_result.begin(), worker_result.end());
962  }
963  if (result.size() > 0) {
964  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
965  equal_cache_size_ += (pattern.size() + (result.size() * sizeof(int32_t)));
966  CHECK(it_ok.second);
967  eq_id = result[0];
968  }
969  if (comp_operator == "<>") {
970  for (int32_t idx = 0; idx <= cur_size; idx++) {
971  if (idx == eq_id) {
972  continue;
973  }
974  result.push_back(idx);
975  }
976  }
977  }
978  return result;
979 }
#define CHECK_GT(x, y)
Definition: Logger.h:305
std::string getStringUnlocked(int32_t string_id) const noexcept
std::map< std::string, int32_t > equal_cache_
#define CHECK_LE(x, y)
Definition: Logger.h:304
#define CHECK(condition)
Definition: Logger.h:291
static constexpr size_t MAX_STRLEN
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
template int32_t StringDictionary::getIdOfString ( const String &  ) const

Definition at line 751 of file StringDictionary.cpp.

References client_, getUnlocked(), isClient(), and rw_mutex_.

751  {
752  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
753  if (isClient()) {
754  if constexpr (std::is_same_v<std::string, std::decay_t<String>>) {
755  return client_->get(str);
756  } else {
757  return client_->get(std::string(str));
758  }
759  }
760  return getUnlocked(str);
761 }
bool isClient() const noexcept
std::shared_mutex rw_mutex_
std::unique_ptr< StringDictionaryClient > client_
int32_t getUnlocked(const std::string_view sv) const noexcept

+ Here is the call graph for this function:

template<typename T >
std::vector<T> StringDictionary::getLike ( const std::string &  pattern,
const bool  icase,
const bool  is_simple,
const char  escape,
const size_t  generation 
) const
template<>
std::vector<int32_t> StringDictionary::getLike ( const std::string &  pattern,
const bool  icase,
const bool  is_simple,
const char  escape,
const size_t  generation 
) const

Definition at line 869 of file StringDictionary.cpp.

References CHECK, and run_benchmark_import::result.

873  {
874  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
875  if (isClient()) {
876  return client_->get_like_i32(pattern, icase, is_simple, escape, generation);
877  }
878  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
879  const auto it = like_i32_cache_.find(cache_key);
880  if (it != like_i32_cache_.end()) {
881  return it->second;
882  }
883 
884  auto result = getLikeImpl<int32_t>(pattern, icase, is_simple, escape, generation);
885  // place result into cache for reuse if similar query
886  const auto it_ok = like_i32_cache_.insert(std::make_pair(cache_key, result));
887  like_cache_size_ += (pattern.size() + 3 + (result.size() * sizeof(int32_t)));
888 
889  CHECK(it_ok.second);
890 
891  return result;
892 }
bool isClient() const noexcept
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_i32_cache_
std::shared_mutex rw_mutex_
std::unique_ptr< StringDictionaryClient > client_
#define CHECK(condition)
Definition: Logger.h:291
template<>
std::vector<int64_t> StringDictionary::getLike ( const std::string &  pattern,
const bool  icase,
const bool  is_simple,
const char  escape,
const size_t  generation 
) const

Definition at line 895 of file StringDictionary.cpp.

References CHECK, and run_benchmark_import::result.

899  {
900  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
901  if (isClient()) {
902  return client_->get_like_i64(pattern, icase, is_simple, escape, generation);
903  }
904  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
905  const auto it = like_i64_cache_.find(cache_key);
906  if (it != like_i64_cache_.end()) {
907  return it->second;
908  }
909 
910  auto result = getLikeImpl<int64_t>(pattern, icase, is_simple, escape, generation);
911  // place result into cache for reuse if similar query
912  const auto it_ok = like_i64_cache_.insert(std::make_pair(cache_key, result));
913  like_cache_size_ += (pattern.size() + 3 + (result.size() * sizeof(int64_t)));
914 
915  CHECK(it_ok.second);
916 
917  return result;
918 }
bool isClient() const noexcept
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int64_t > > like_i64_cache_
std::shared_mutex rw_mutex_
std::unique_ptr< StringDictionaryClient > client_
#define CHECK(condition)
Definition: Logger.h:291
template<typename T >
std::vector< T > StringDictionary::getLikeImpl ( const std::string &  pattern,
const bool  icase,
const bool  is_simple,
const char  escape,
const size_t  generation 
) const

Definition at line 818 of file StringDictionary.cpp.

References gpu_enabled::copy(), cpu_threads(), getStringUnlocked(), threading_serial::parallel_for(), gpu_enabled::partial_sum(), run_benchmark_import::result, string_ilike(), string_ilike_simple(), string_like(), and string_like_simple().

822  {
823  constexpr size_t grain_size{1000};
824  auto is_like_impl = icase ? is_simple ? string_ilike_simple : string_ilike
825  : is_simple ? string_like_simple
826  : string_like;
827  auto const num_threads = static_cast<size_t>(cpu_threads());
828  std::vector<std::vector<T>> worker_results(num_threads);
829  tbb::task_arena limited_arena(num_threads);
830  limited_arena.execute([&] {
831  tbb::parallel_for(
832  tbb::blocked_range<size_t>(0, generation, grain_size),
833  [&is_like_impl, &pattern, &escape, &worker_results, this](
834  const tbb::blocked_range<size_t>& range) {
835  auto& result_vector =
836  worker_results[tbb::this_task_arena::current_thread_index()];
837  for (size_t i = range.begin(); i < range.end(); ++i) {
838  const auto str = getStringUnlocked(i);
839  if (is_like_impl(
840  str.c_str(), str.size(), pattern.c_str(), pattern.size(), escape)) {
841  result_vector.push_back(i);
842  }
843  }
844  });
845  });
846  // partial_sum to get 1) a start offset for each thread and 2) the total # elems
847  std::vector<size_t> start_offsets(num_threads + 1, 0);
848  auto vec_size = [](std::vector<T> const& vec) { return vec.size(); };
849  auto begin = boost::make_transform_iterator(worker_results.begin(), vec_size);
850  auto end = boost::make_transform_iterator(worker_results.end(), vec_size);
851  std::partial_sum(begin, end, start_offsets.begin() + 1); // first element is 0
852 
853  std::vector<T> result(start_offsets[num_threads]);
854  limited_arena.execute([&] {
855  tbb::parallel_for(
856  tbb::blocked_range<size_t>(0, num_threads, 1),
857  [&worker_results, &result, &start_offsets](
858  const tbb::blocked_range<size_t>& range) {
859  auto& result_vector = worker_results[range.begin()];
860  auto const start_offset = start_offsets[range.begin()];
861  std::copy(
862  result_vector.begin(), result_vector.end(), result.begin() + start_offset);
863  },
864  tbb::static_partitioner());
865  });
866  return result;
867 }
RUNTIME_EXPORT DEVICE bool string_ilike_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, char escape_char)
Definition: StringLike.cpp:61
std::string getStringUnlocked(int32_t string_id) const noexcept
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
RUNTIME_EXPORT DEVICE bool string_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:250
DEVICE void partial_sum(ARGS &&...args)
Definition: gpu_enabled.h:87
RUNTIME_EXPORT DEVICE bool string_like_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, char escape_char)
Definition: StringLike.cpp:43
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
int cpu_threads()
Definition: thread_count.h:25
RUNTIME_EXPORT DEVICE bool string_ilike(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:261

+ Here is the call graph for this function:

size_t StringDictionary::getNumStringsFromStorage ( const size_t  storage_slots) const
private noexcept

Method to retrieve number of strings in storage via a binary search for the first canary

Parameters
storage_slots: number of storage entries we should search to find the minimum canary
Returns
number of strings in storage

Definition at line 331 of file StringDictionary.cpp.

References CHECK_GE.

Referenced by StringDictionary().

332  {
333  if (storage_slots == 0) {
334  return 0;
335  }
336  // Must use signed integers since final binary search step can wrap to max size_t value
337  // if dictionary is empty
338  int64_t min_bound = 0;
339  int64_t max_bound = storage_slots - 1;
340  int64_t guess{0};
341  while (min_bound <= max_bound) {
342  guess = (max_bound + min_bound) / 2;
343  CHECK_GE(guess, 0);
344  if (getStringFromStorage(guess).canary) {
345  max_bound = guess - 1;
346  } else {
347  min_bound = guess + 1;
348  }
349  }
350  CHECK_GE(guess + (min_bound > guess ? 1 : 0), 0);
351  return guess + (min_bound > guess ? 1 : 0);
352 }
#define CHECK_GE(x, y)
Definition: Logger.h:306
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the caller graph for this function:

int32_t StringDictionary::getOrAdd ( const std::string &  str)
noexcept

Definition at line 388 of file StringDictionary.cpp.

References CHECK_EQ.

388  {
389  if (isClient()) {
390  std::vector<int32_t> string_ids;
391  client_->get_or_add_bulk(string_ids, std::vector<std::string>{str});
392  CHECK_EQ(size_t(1), string_ids.size());
393  return string_ids.front();
394  }
395  return getOrAddImpl(str);
396 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
bool isClient() const noexcept
std::unique_ptr< StringDictionaryClient > client_
int32_t getOrAddImpl(const std::string_view &str) noexcept
template<class T , class String >
template void StringDictionary::getOrAddBulk ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)

Definition at line 597 of file StringDictionary.cpp.

References appendToStorage(), CHECK, CHECK_LT, computeBucket(), dict_key_, fillRateIsHigh(), g_enable_stringdict_parallel, getOrAddBulkParallel(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), increaseHashTableCapacity(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, rw_mutex_, str_count_, and string_id_string_dict_hash_table_.

Referenced by import_export::TypedImportBuffer::addDictEncodedString(), ArrowForeignStorageBase::convertArrowDictionary(), ArrowForeignStorageBase::createDictionaryEncodedColumn(), data_conversion::StringViewToStringDictEncoder< IdType >::encodeAndAppendData(), foreign_storage::ParquetStringEncoder< V >::encodeAndCopyContiguous(), getOrAddBulkArray(), and populate_string_ids().

598  {
599  if (g_enable_stringdict_parallel) {
600  getOrAddBulkParallel(input_strings, output_string_ids);
601  return;
602  }
603  // Single-thread path.
604  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
605 
606  const size_t initial_str_count = str_count_;
607  size_t idx = 0;
608  for (const auto& input_string : input_strings) {
609  if (input_string.empty()) {
610  output_string_ids[idx++] = inline_int_null_value<T>();
611  continue;
612  }
613  CHECK(input_string.size() <= MAX_STRLEN);
614 
615  const string_dict_hash_t input_string_hash = hash_string(input_string);
616  uint32_t hash_bucket =
617  computeBucket(input_string_hash, input_string, string_id_string_dict_hash_table_);
618  if (string_id_string_dict_hash_table_[hash_bucket] != INVALID_STR_ID) {
619  output_string_ids[idx++] = string_id_string_dict_hash_table_[hash_bucket];
620  continue;
621  }
622  // need to add record to dictionary
623  // check there is room
624  if (str_count_ > static_cast<size_t>(max_valid_int_value<T>())) {
625  throw_encoding_error<T>(input_string, dict_key_);
626  }
627  CHECK_LT(str_count_, MAX_STRCOUNT)
628  << "Maximum number (" << str_count_
629  << ") of Dictionary encoded Strings reached for this column, offset path "
630  "for column is "
631  << offsets_path_;
632  if (fillRateIsHigh(str_count_)) {
633  // resize when more than 50% is full
634  increaseHashTableCapacity();
635  hash_bucket = computeBucket(
636  input_string_hash, input_string, string_id_string_dict_hash_table_);
637  }
638  appendToStorage(input_string);
639 
640  if (materialize_hashes_) {
641  hash_cache_[str_count_] = input_string_hash;
642  }
643  const int32_t string_id = static_cast<int32_t>(str_count_);
644  string_id_string_dict_hash_table_[hash_bucket] = string_id;
645  output_string_ids[idx++] = string_id;
646  ++str_count_;
647  }
648  const size_t num_strings_added = str_count_ - initial_str_count;
649  if (num_strings_added > 0) {
650  invalidateInvertedIndex();
651  }
652 }
void increaseHashTableCapacity() noexcept
string_dict_hash_t hash_string(const std::string_view &str)
std::vector< string_dict_hash_t > hash_cache_
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
std::shared_mutex rw_mutex_
static constexpr size_t MAX_STRCOUNT
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
static constexpr int32_t INVALID_STR_ID
void appendToStorage(const String str) noexcept
const shared::StringDictKey dict_key_
#define CHECK_LT(x, y)
Definition: Logger.h:303
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:291
std::vector< int32_t > string_id_string_dict_hash_table_
static constexpr size_t MAX_STRLEN
void getOrAddBulkParallel(const std::vector< String > &string_vec, T *encoded_vec)
bool g_enable_stringdict_parallel

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
template void StringDictionary::getOrAddBulkArray ( const std::vector< std::vector< String >> &  string_array_vec,
std::vector< std::vector< int32_t >> &  ids_array_vec 
)

Definition at line 447 of file StringDictionary.cpp.

References client_no_timeout_, and getOrAddBulk().

Referenced by import_export::TypedImportBuffer::addDictEncodedStringArray().

449  {
450  if (client_no_timeout_) {
451  client_no_timeout_->get_or_add_bulk_array(ids_array_vec, string_array_vec);
452  return;
453  }
454 
455  ids_array_vec.resize(string_array_vec.size());
456  for (size_t i = 0; i < string_array_vec.size(); i++) {
457  auto& strings = string_array_vec[i];
458  auto& ids = ids_array_vec[i];
459  ids.resize(strings.size());
460  getOrAddBulk(strings, &ids[0]);
461  }
462 }
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
std::unique_ptr< StringDictionaryClient > client_no_timeout_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class T , class String >
void StringDictionary::getOrAddBulkParallel ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)

Definition at line 655 of file StringDictionary.cpp.

References appendToStorageBulk(), CHECK, CHECK_LT, computeBucketFromStorageAndMemory(), dict_key_, fillRateIsHigh(), hash_cache_, hashStrings(), increaseHashTableCapacityFromStorageAndMemory(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, rw_mutex_, str_count_, and string_id_string_dict_hash_table_.

Referenced by getOrAddBulk().

656  {
657  // Compute hashes of the input strings up front, and in parallel,
658  // as the string hashing does not need to be behind the subsequent write_lock
659  std::vector<string_dict_hash_t> input_strings_hashes(input_strings.size());
660  hashStrings(input_strings, input_strings_hashes);
661 
662  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
663  size_t shadow_str_count =
664  str_count_; // Need to shadow str_count_ now with bulk add methods
665  const size_t storage_high_water_mark = shadow_str_count;
666  std::vector<size_t> string_memory_ids;
667  size_t sum_new_string_lengths = 0;
668  string_memory_ids.reserve(input_strings.size());
669  size_t input_string_idx{0};
670  for (const auto& input_string : input_strings) {
671  // Currently we make empty strings null
672  if (input_string.empty()) {
673  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
674  continue;
675  }
676  // TODO: Recover gracefully if an input string is too long
677  CHECK(input_string.size() <= MAX_STRLEN);
678 
679  if (fillRateIsHigh(shadow_str_count)) {
680  // resize when more than 50% is full
681  increaseHashTableCapacityFromStorageAndMemory(shadow_str_count,
682  storage_high_water_mark,
683  input_strings,
684  string_memory_ids,
685  input_strings_hashes);
686  }
687  // Compute the hash for this input_string
688  const string_dict_hash_t input_string_hash = input_strings_hashes[input_string_idx];
689 
690  const uint32_t hash_bucket =
691  computeBucketFromStorageAndMemory(input_string_hash,
692  input_string,
693  string_id_string_dict_hash_table_,
694  storage_high_water_mark,
695  input_strings,
696  string_memory_ids);
697 
698  // If the hash bucket is not empty, that is our string id
699  // (computeBucketFromStorageAndMemory) already checked to ensure the input string and
700  // bucket string are equal)
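701  if (string_id_string_dict_hash_table_[hash_bucket] != INVALID_STR_ID) {
702  output_string_ids[input_string_idx++] =
703  string_id_string_dict_hash_table_[hash_bucket];
704  continue;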
705  }
706  // Did not find string, so need to add record to dictionary
707  // First check there is room
708  if (shadow_str_count > static_cast<size_t>(max_valid_int_value<T>())) {
709  throw_encoding_error<T>(input_string, dict_key_);
710  }
711  CHECK_LT(shadow_str_count, MAX_STRCOUNT)
712  << "Maximum number (" << shadow_str_count
713  << ") of Dictionary encoded Strings reached for this column, offset path "
714  "for column is "
715  << offsets_path_;
716 
717  string_memory_ids.push_back(input_string_idx);
718  sum_new_string_lengths += input_string.size();
719  string_id_string_dict_hash_table_[hash_bucket] =
720  static_cast<int32_t>(shadow_str_count);
721  if (materialize_hashes_) {
722  hash_cache_[shadow_str_count] = input_string_hash;
723  }
724  output_string_ids[input_string_idx++] = shadow_str_count++;
725  }
726  appendToStorageBulk(input_strings, string_memory_ids, sum_new_string_lengths);
727  const size_t num_strings_added = shadow_str_count - str_count_;
728  str_count_ = shadow_str_count;
729  if (num_strings_added > 0) {
730  invalidateInvertedIndex();
731  }
732 }
uint32_t computeBucketFromStorageAndMemory(const string_dict_hash_t input_string_hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
std::vector< string_dict_hash_t > hash_cache_
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
std::shared_mutex rw_mutex_
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
void appendToStorageBulk(const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
const shared::StringDictKey dict_key_
#define CHECK_LT(x, y)
Definition: Logger.h:303
void hashStrings(const std::vector< String > &string_vec, std::vector< string_dict_hash_t > &hashes) const noexcept
void increaseHashTableCapacityFromStorageAndMemory(const size_t str_count, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< string_dict_hash_t > &input_strings_hashes) noexcept
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:291
std::vector< int32_t > string_id_string_dict_hash_table_
static constexpr size_t MAX_STRLEN

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int32_t StringDictionary::getOrAddImpl ( const std::string_view &  str)
private noexcept

Definition at line 1313 of file StringDictionary.cpp.

References CHECK, CHECK_LT, and anonymous_namespace{StringDictionary.cpp}::hash_string().

1313  {
1314  // @TODO(wei) treat empty string as NULL for now
1315  if (str.size() == 0) {
1316  return inline_int_null_value<int32_t>();
1317  }
1318  CHECK(str.size() <= MAX_STRLEN);
1319  const string_dict_hash_t hash = hash_string(str);
1320  {
1321  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
1322  const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1323  if (string_id_string_dict_hash_table_[bucket] != INVALID_STR_ID) {
1324  return string_id_string_dict_hash_table_[bucket];
1325  }
1326  }
1327  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
1328  if (fillRateIsHigh(str_count_)) {
1329  // resize when more than 50% is full
1330  increaseHashTableCapacity();
1331  }
1332  // need to recalculate the bucket in case it changed before
1333  // we got the lock
1334  const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
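1335  if (string_id_string_dict_hash_table_[bucket] == INVALID_STR_ID) {
1336  CHECK_LT(str_count_, MAX_STRCOUNT)
1337  << "Maximum number (" << str_count_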
1338  << ") of Dictionary encoded Strings reached for this column, offset path "
1339  "for column is "
1340  << offsets_path_;
1341  appendToStorage(str);
1342  string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
1343  if (materialize_hashes_) {
1344  hash_cache_[str_count_] = hash;
1345  }
1346  ++str_count_;
1347  invalidateInvertedIndex();
1348  }
1349  return string_id_string_dict_hash_table_[bucket];
1350 }
void increaseHashTableCapacity() noexcept
string_dict_hash_t hash_string(const std::string_view &str)
std::vector< string_dict_hash_t > hash_cache_
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
std::shared_mutex rw_mutex_
static constexpr size_t MAX_STRCOUNT
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
static constexpr int32_t INVALID_STR_ID
void appendToStorage(const String str) noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:303
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:291
std::vector< int32_t > string_id_string_dict_hash_table_
static constexpr size_t MAX_STRLEN

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getRegexpLike ( const std::string &  pattern,
const char  escape,
const size_t  generation 
) const

Definition at line 1143 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_regexp_like(), isClient(), regex_cache_, regex_cache_size_, run_benchmark_import::result, rw_mutex_, and str_count_.

1145  {
1146  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
1147  if (isClient()) {
1148  return client_->get_regexp_like(pattern, escape, generation);
1149  }
1150  const auto cache_key = std::make_pair(pattern, escape);
1151  const auto it = regex_cache_.find(cache_key);
1152  if (it != regex_cache_.end()) {
1153  return it->second;
1154  }
1155  std::vector<int32_t> result;
1156  std::vector<std::thread> workers;
1157  int worker_count = cpu_threads();
1158  CHECK_GT(worker_count, 0);
1159  std::vector<std::vector<int32_t>> worker_results(worker_count);
1160  CHECK_LE(generation, str_count_);
1161  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
1162  workers.emplace_back([&worker_results,
1163  &pattern,
1164  generation,
1165  escape,
1166  worker_idx,
1167  worker_count,
1168  this]() {
1169  for (size_t string_id = worker_idx; string_id < generation;
1170  string_id += worker_count) {
1171  const auto str = getStringUnlocked(string_id);
1172  if (is_regexp_like(str, pattern, escape)) {
1173  worker_results[worker_idx].push_back(string_id);
1174  }
1175  }
1176  });
1177  }
1178  for (auto& worker : workers) {
1179  worker.join();
1180  }
1181  for (const auto& worker_result : worker_results) {
1182  result.insert(result.end(), worker_result.begin(), worker_result.end());
1183  }
1184  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
1185  regex_cache_size_ += (pattern.size() + 1 + (result.size() * sizeof(int32_t)));
1186  CHECK(it_ok.second);
1187 
1188  return result;
1189 }
bool isClient() const noexcept
std::shared_mutex rw_mutex_
#define CHECK_GT(x, y)
Definition: Logger.h:305
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
#define CHECK_LE(x, y)
Definition: Logger.h:304
#define CHECK(condition)
Definition: Logger.h:291
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

std::string StringDictionary::getString ( int32_t  string_id) const

Definition at line 773 of file StringDictionary.cpp.

References client_, getStringUnlocked(), isClient(), and rw_mutex_.

Referenced by StringValueConverter::convertToColumnarFormatFromDict(), and populate_string_ids().

773  {
774  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
775  if (isClient()) {
776  std::string ret;
777  client_->get_string(ret, string_id);
778  return ret;
779  }
780  return getStringUnlocked(string_id);
781 }
bool isClient() const noexcept
std::shared_mutex rw_mutex_
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::pair< char *, size_t > StringDictionary::getStringBytes ( int32_t  string_id) const
noexcept

Definition at line 800 of file StringDictionary.cpp.

References CHECK, CHECK_LE, and CHECK_LT.

801  {
802  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
803  CHECK(!isClient());
804  CHECK_LE(0, string_id);
805  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
806  return getStringBytesChecked(string_id);
807 }
bool isClient() const noexcept
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
std::shared_mutex rw_mutex_
#define CHECK_LT(x, y)
Definition: Logger.h:303
#define CHECK_LE(x, y)
Definition: Logger.h:304
#define CHECK(condition)
Definition: Logger.h:291
std::pair< char *, size_t > StringDictionary::getStringBytesChecked ( const int  string_id) const
private noexcept

Definition at line 1365 of file StringDictionary.cpp.

References CHECK.

1366  {
1367  const auto str_canary = getStringFromStorage(string_id);
1368  CHECK(!str_canary.canary);
1369  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
1370 }
#define CHECK(condition)
Definition: Logger.h:291
PayloadString getStringFromStorage(const int string_id) const noexcept
std::string StringDictionary::getStringChecked ( const int  string_id) const
private noexcept

Definition at line 1352 of file StringDictionary.cpp.

References CHECK.

Referenced by increaseHashTableCapacity().

1352  {
1353  const auto str_canary = getStringFromStorage(string_id);
1354  CHECK(!str_canary.canary);
1355  return std::string(str_canary.c_str_ptr, str_canary.size);
1356 }
#define CHECK(condition)
Definition: Logger.h:291
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the caller graph for this function:

StringDictionary::PayloadString StringDictionary::getStringFromStorage ( const int  string_id) const
private noexcept

Definition at line 1551 of file StringDictionary.cpp.

References CHECK_GE, StringDictionary::StringIdxEntry::off, and StringDictionary::StringIdxEntry::size.

Referenced by getCompare(), mergeSortedCache(), sortCache(), and StringDictionary().

1552  {
1553  if (!isTemp_) {
1554  CHECK_GE(payload_fd_, 0);
1555  CHECK_GE(offset_fd_, 0);
1556  }
1557  CHECK_GE(string_id, 0);
1558  const StringIdxEntry* str_meta = offset_map_ + string_id;
1559  if (str_meta->size == 0xffff) {
1560  // hit the canary
1561  return {nullptr, 0, true};
1562  }
1563  return {payload_map_ + str_meta->off, str_meta->size, false};
1564 }
StringIdxEntry * offset_map_
#define CHECK_GE(x, y)
Definition: Logger.h:306

+ Here is the caller graph for this function:

std::string_view StringDictionary::getStringFromStorageFast ( const int  string_id) const
private noexcept

Definition at line 1545 of file StringDictionary.cpp.

References StringDictionary::StringIdxEntry::off, and StringDictionary::StringIdxEntry::size.

Referenced by buildDictionaryNumericTranslationMap(), buildDictionaryTranslationMap(), eachStringSerially(), and getStringViews().

1546  {
1547  const StringIdxEntry* str_meta = offset_map_ + string_id;
1548  return {payload_map_ + str_meta->off, str_meta->size};
1549 }
StringIdxEntry * offset_map_

+ Here is the caller graph for this function:

std::string StringDictionary::getStringUnlocked ( int32_t  string_id) const
private noexcept

Definition at line 789 of file StringDictionary.cpp.

References CHECK_LT.

Referenced by copyStrings(), getEquals(), getLikeImpl(), getRegexpLike(), and getString().

789  {
790  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
791  return getStringChecked(string_id);
792 }
std::string getStringChecked(const int string_id) const noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:303

+ Here is the caller graph for this function:

std::string_view StringDictionary::getStringView ( int32_t  string_id) const

Definition at line 783 of file StringDictionary.cpp.

References CHECK, getStringViewUnlocked(), isClient(), and rw_mutex_.

Referenced by data_conversion::anonymous_namespace{StringViewSource.h}::get_materialized_string_views().

783  {
784  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
785  CHECK(!isClient()) << "use of this function is unsupported in distributed";
786  return getStringViewUnlocked(string_id);
787 }
bool isClient() const noexcept
std::shared_mutex rw_mutex_
#define CHECK(condition)
Definition: Logger.h:291
std::string_view getStringViewUnlocked(int32_t string_id) const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::string_view StringDictionary::getStringViewChecked ( const int  string_id) const
private noexcept

Definition at line 1358 of file StringDictionary.cpp.

References CHECK.

1359  {
1360  const auto str_canary = getStringFromStorage(string_id);
1361  CHECK(!str_canary.canary);
1362  return std::string_view{str_canary.c_str_ptr, str_canary.size};
1363 }
#define CHECK(condition)
Definition: Logger.h:291
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< std::string_view > StringDictionary::getStringViews ( ) const

Definition at line 1840 of file StringDictionary.cpp.

References storageEntryCount().

1840  {
1841  return getStringViews(storageEntryCount());
1842 }
size_t storageEntryCount() const
std::vector< std::string_view > getStringViews() const

+ Here is the call graph for this function:

std::vector< std::string_view > StringDictionary::getStringViews ( const size_t  generation) const

Definition at line 1787 of file StringDictionary.cpp.

References CHECK_GE, CHECK_LE, DEBUG_TIMER, getStringFromStorageFast(), MAX_STRCOUNT, ThreadInfo::num_elems_per_thread, ThreadInfo::num_threads, threading_serial::parallel_for(), rw_mutex_, and storageEntryCount().

1788  {
1789  auto timer = DEBUG_TIMER(__func__);
1790  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
1791  const int64_t num_strings = generation >= 0 ? generation : storageEntryCount();
1792  CHECK_LE(num_strings, static_cast<int64_t>(StringDictionary::MAX_STRCOUNT));
1793  // The CHECK_LE below is currently redundant with the check
1794  // above against MAX_STRCOUNT, however given we iterate using
1795  // int32_t types for efficiency (to match type expected by
1796  // getStringFromStorageFast), check that the # of strings is also
1797  // in the int32_t range in case MAX_STRCOUNT is changed
1798 
1799  // Todo(todd): consider aliasing the max logical type width
1800  // (currently int32_t) throughout StringDictionary
1801  CHECK_LE(num_strings, std::numeric_limits<int32_t>::max());
1802 
1803  std::vector<std::string_view> string_views(num_strings);
1804  // We can bail early if the generation-specified dictionary is empty
1805  if (num_strings == 0) {
1806  return string_views;
1807  }
1808  constexpr int64_t tbb_parallel_threshold{1000};
1809  if (num_strings < tbb_parallel_threshold) {
1810  // Use int32_t to match type expected by getStringFromStorageFast
1811  for (int32_t string_idx = 0; string_idx < num_strings; ++string_idx) {
1812  string_views[string_idx] = getStringFromStorageFast(string_idx);
1813  }
1814  } else {
1815  constexpr int64_t target_strings_per_thread{1000};
1816  const ThreadInfo thread_info(
1817  std::thread::hardware_concurrency(), num_strings, target_strings_per_thread);
1818  CHECK_GE(thread_info.num_threads, 1L);
1819  CHECK_GE(thread_info.num_elems_per_thread, 1L);
1820 
1821  tbb::task_arena limited_arena(thread_info.num_threads);
1822  limited_arena.execute([&] {
1823  tbb::parallel_for(
1824  tbb::blocked_range<int64_t>(
1825  0, num_strings, thread_info.num_elems_per_thread /* tbb grain_size */),
1826  [&](const tbb::blocked_range<int64_t>& r) {
1827  // r should be in range of int32_t per CHECK above
1828  const int32_t start_idx = r.begin();
1829  const int32_t end_idx = r.end();
1830  for (int32_t string_idx = start_idx; string_idx != end_idx; ++string_idx) {
1831  string_views[string_idx] = getStringFromStorageFast(string_idx);
1832  }
1833  },
1834  tbb::simple_partitioner());
1835  });
1836  }
1837  return string_views;
1838 }
size_t storageEntryCount() const
#define CHECK_GE(x, y)
Definition: Logger.h:306
std::shared_mutex rw_mutex_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr size_t MAX_STRCOUNT
#define CHECK_LE(x, y)
Definition: Logger.h:304
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
#define DEBUG_TIMER(name)
Definition: Logger.h:412

+ Here is the call graph for this function:

std::string_view StringDictionary::getStringViewUnlocked ( int32_t  string_id) const
private noexcept

Definition at line 794 of file StringDictionary.cpp.

References CHECK_LT.

Referenced by getStringView().

795  {
796  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
797  return getStringViewChecked(string_id);
798 }
#define CHECK_LT(x, y)
Definition: Logger.h:303
std::string_view getStringViewChecked(const int string_id) const noexcept

+ Here is the caller graph for this function:

int32_t StringDictionary::getUnlocked ( const std::string_view  sv) const
private noexcept

Definition at line 766 of file StringDictionary.cpp.

References anonymous_namespace{StringDictionary.cpp}::hash_string().

Referenced by getIdOfString().

766  {
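767  const string_dict_hash_t hash = hash_string(sv);
768  auto str_id = string_id_string_dict_hash_table_[computeBucket(
769  hash, sv, string_id_string_dict_hash_table_)];
770  return str_id;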
771 }
string_dict_hash_t hash_string(const std::string_view &str)
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::hashStrings ( const std::vector< String > &  string_vec,
std::vector< string_dict_hash_t > &  hashes 
) const
private noexcept

Method to hash a vector of strings in parallel.

Parameters
string_vec - input vector of strings to be hashed
hashes - space for the output; should be pre-sized to match string_vec size

Definition at line 478 of file StringDictionary.cpp.

References CHECK_EQ, anonymous_namespace{StringDictionary.cpp}::hash_string(), and threading_serial::parallel_for().

Referenced by getOrAddBulkParallel().

480  {
481  CHECK_EQ(string_vec.size(), hashes.size());
482 
483  tbb::parallel_for(tbb::blocked_range<size_t>(0, string_vec.size()),
484  [&string_vec, &hashes](const tbb::blocked_range<size_t>& r) {
485  for (size_t curr_id = r.begin(); curr_id != r.end(); ++curr_id) {
486  if (string_vec[curr_id].empty()) {
487  continue;
488  }
489  hashes[curr_id] = hash_string(string_vec[curr_id]);
490  }
491  });
492 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
string_dict_hash_t hash_string(const std::string_view &str)
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::increaseHashTableCapacity ( )
private noexcept

Definition at line 1257 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), getStringChecked(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), INVALID_STR_ID, materialize_hashes_, str_count_, and string_id_string_dict_hash_table_.

Referenced by getOrAddBulk().

1257  {
1258  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
1259  INVALID_STR_ID);
1260 
1261  if (materialize_hashes_) {
1262  for (size_t i = 0; i != str_count_; ++i) {
1263  const string_dict_hash_t hash = hash_cache_[i];
1264  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1265  new_str_ids[bucket] = i;
1266  }
1267  hash_cache_.resize(hash_cache_.size() * 2);
1268  } else {
1269  for (size_t i = 0; i != str_count_; ++i) {
1270  const auto str = getStringChecked(i);
1271  const string_dict_hash_t hash = hash_string(str);
1272  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1273  new_str_ids[bucket] = i;
1274  }
1275  }
1276  string_id_string_dict_hash_table_.swap(new_str_ids);
1277 }
string_dict_hash_t hash_string(const std::string_view &str)
std::string getStringChecked(const int string_id) const noexcept
std::vector< string_dict_hash_t > hash_cache_
static constexpr int32_t INVALID_STR_ID
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::increaseHashTableCapacityFromStorageAndMemory ( const size_t  str_count,
const size_t  storage_high_water_mark,
const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids,
const std::vector< string_dict_hash_t > &  input_strings_hashes 
)
private noexcept

Definition at line 1280 of file StringDictionary.cpp.

References anonymous_namespace{StringDictionary.cpp}::hash_string().

Referenced by getOrAddBulkParallel().

1286  {
1287  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
1288  INVALID_STR_ID);
1289  if (materialize_hashes_) {
1290  for (size_t i = 0; i != str_count; ++i) {
1291  const string_dict_hash_t hash = hash_cache_[i];
1292  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1293  new_str_ids[bucket] = i;
1294  }
1295  hash_cache_.resize(hash_cache_.size() * 2);
1296  } else {
1297  for (size_t storage_idx = 0; storage_idx != storage_high_water_mark; ++storage_idx) {
1298  const auto storage_string = getStringChecked(storage_idx);
1299  const string_dict_hash_t hash = hash_string(storage_string);
1300  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1301  new_str_ids[bucket] = storage_idx;
1302  }
1303  for (size_t memory_idx = 0; memory_idx != string_memory_ids.size(); ++memory_idx) {
1304  const size_t string_memory_id = string_memory_ids[memory_idx];
1305  const uint32_t bucket = computeUniqueBucketWithHash(
1306  input_strings_hashes[string_memory_id], new_str_ids);
1307  new_str_ids[bucket] = storage_high_water_mark + memory_idx;
1308  }
1309  }
1310  string_id_string_dict_hash_table_.swap(new_str_ids);
1311 }
string_dict_hash_t hash_string(const std::string_view &str)
std::string getStringChecked(const int string_id) const noexcept
std::vector< string_dict_hash_t > hash_cache_
static constexpr int32_t INVALID_STR_ID
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::insertInSortedCache ( std::string  str,
int32_t  str_id 
)
private
void StringDictionary::invalidateInvertedIndex ( )
private noexcept

Definition at line 1626 of file StringDictionary.cpp.

References compare_cache_, compare_cache_size_, equal_cache_, equal_cache_size_, like_cache_size_, like_i32_cache_, like_i64_cache_, regex_cache_, regex_cache_size_, and gpu_enabled::swap().

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

1626  {
1627  if (!like_i32_cache_.empty()) {
1628  decltype(like_i32_cache_)().swap(like_i32_cache_);
1629  }
1630  if (!like_i64_cache_.empty()) {
1631  decltype(like_i64_cache_)().swap(like_i64_cache_);
1632  }
1633  if (!regex_cache_.empty()) {
1634  decltype(regex_cache_)().swap(regex_cache_);
1635  }
1636  if (!equal_cache_.empty()) {
1637  decltype(equal_cache_)().swap(equal_cache_);
1638  }
1639  compare_cache_.invalidateInvertedIndex();
1640 
1641  like_cache_size_ = 0;
1642  regex_cache_size_ = 0;
1643  equal_cache_size_ = 0;
1644  compare_cache_size_ = 0;
1645 }
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int64_t > > like_i64_cache_
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_i32_cache_
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::map< std::string, int32_t > equal_cache_
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool StringDictionary::isClient ( ) const
noexcept

Definition at line 1670 of file StringDictionary.cpp.

References client_.

Referenced by checkpoint(), copyStrings(), eachStringSerially(), getCompare(), getIdOfString(), getRegexpLike(), getString(), getStringView(), makeLambdaStringToId(), storageEntryCount(), and ~StringDictionary().

1670  {
1671  return static_cast<bool>(client_);
1672 }
std::unique_ptr< StringDictionaryClient > client_

+ Here is the caller graph for this function:

std::function< int32_t(std::string const &)> StringDictionary::makeLambdaStringToId ( ) const

Definition at line 263 of file StringDictionary.cpp.

References CHECK, eachStringSerially(), INVALID_STR_ID, and isClient().

264  {
265  CHECK(isClient());
266  constexpr size_t big_gen = static_cast<size_t>(std::numeric_limits<size_t>::max());
267  MapMaker map_maker;
268  eachStringSerially(big_gen, map_maker);
269  return [map{map_maker.moveMap()}](std::string const& str) {
270  auto const itr = map.find(str);
271  return itr == map.cend() ? INVALID_STR_ID : itr->second;
272  };
273 }
bool isClient() const noexcept
static constexpr int32_t INVALID_STR_ID
void eachStringSerially(int64_t const generation, StringCallback &) const
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

void StringDictionary::mergeSortedCache ( std::vector< int32_t > &  temp_sorted_cache)
private

Definition at line 1698 of file StringDictionary.cpp.

References getStringFromStorage(), sorted_cache, and string_lt().

Referenced by buildSortedCache().

1698  {
1699  // this method is not thread safe
1700  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1701  size_t t_idx = 0, s_idx = 0, idx = 0;
1702  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1703  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1704  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1705  const auto insert_from_temp_cache =
1706  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1707  if (insert_from_temp_cache) {
1708  updated_cache[idx] = temp_sorted_cache[t_idx++];
1709  } else {
1710  updated_cache[idx] = sorted_cache[s_idx++];
1711  }
1712  }
1713  while (t_idx < temp_sorted_cache.size()) {
1714  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1715  }
1716  while (s_idx < sorted_cache.size()) {
1717  updated_cache[idx++] = sorted_cache[s_idx++];
1718  }
1719  sorted_cache.swap(updated_cache);
1720 }
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:308
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::populate_string_array_ids ( std::vector< std::vector< int32_t >> &  dest_array_ids,
StringDictionary *  dest_dict,
const std::vector< std::vector< int32_t >> &  source_array_ids,
const StringDictionary *  source_dict 
)
static

Definition at line 1746 of file StringDictionary.cpp.

References threading_serial::async(), populate_string_ids(), and logger::thread_id().

Referenced by DictionaryValueConverter< TARGET_TYPE >::processArrayBuffer().

1750  {
1751  dest_array_ids.resize(source_array_ids.size());
1752 
1753  std::atomic<size_t> row_idx{0};
1754  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1755  int thread_id) {
1756  for (;;) {
1757  auto row = row_idx.fetch_add(1);
1758 
1759  if (row >= dest_array_ids.size()) {
1760  return;
1761  }
1762  const auto& source_ids = source_array_ids[row];
1763  auto& dest_ids = dest_array_ids[row];
1764  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1765  }
1766  };
1767 
1768  const int num_worker_threads = std::thread::hardware_concurrency();
1769 
1770  if (source_array_ids.size() / num_worker_threads > 10) {
1771  std::vector<std::future<void>> worker_threads;
1772  for (int i = 0; i < num_worker_threads; ++i) {
1773  worker_threads.push_back(std::async(std::launch::async, processor, i));
1774  }
1775 
1776  for (auto& child : worker_threads) {
1777  child.wait();
1778  }
1779  for (auto& child : worker_threads) {
1780  child.get();
1781  }
1782  } else {
1783  processor(0);
1784  }
1785 }
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::vector< std::string const * > &transient_string_vec={})
Populates provided dest_ids vector with string ids corresponding to given source strings.
future< Result > async(Fn &&fn, Args &&...args)
ThreadId thread_id()
Definition: Logger.cpp:879

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::populate_string_ids ( std::vector< int32_t > &  dest_ids,
StringDictionary *  dest_dict,
const std::vector< int32_t > &  source_ids,
const StringDictionary *  source_dict,
const std::vector< std::string const * > &  transient_string_vec = {} 
)
static

Populates provided dest_ids vector with string ids corresponding to given source strings.

Given a vector of source string ids and corresponding source dictionary, this method populates a vector of destination string ids by either returning the string id of matching strings in the destination dictionary or creating new entries in the dictionary. Source string ids can also be transient if they were created by a function (e.g., LOWER/UPPER functions). An ordered vector of transient string values, indexed via the transient string id, is provided in order to handle this use case.

Parameters
dest_ids - vector of destination string ids to be populated
dest_dict - destination dictionary
source_ids - vector of source string ids for which destination ids are needed
source_dict - source dictionary
transient_string_vec - ordered vector of string value pointers

Definition at line 1722 of file StringDictionary.cpp.

References CHECK_LT, getOrAddBulk(), getString(), and StringDictionaryProxy::transientIdToIndex().

Referenced by populate_string_array_ids(), and DictionaryValueConverter< TARGET_TYPE >::processBuffer().

1727  {
1728  std::vector<std::string> strings;
1729 
1730  for (const int32_t source_id : source_ids) {
1731  if (source_id == std::numeric_limits<int32_t>::min()) {
1732  strings.emplace_back("");
1733  } else if (source_id < 0) {
1734  unsigned const string_index = StringDictionaryProxy::transientIdToIndex(source_id);
1735  CHECK_LT(string_index, transient_string_vec.size()) << "source_id=" << source_id;
1736  strings.emplace_back(*transient_string_vec[string_index]);
1737  } else {
1738  strings.push_back(source_dict->getString(source_id));
1739  }
1740  }
1741 
1742  dest_ids.resize(strings.size());
1743  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1744 }
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
#define CHECK_LT(x, y)
Definition: Logger.h:303
std::string getString(int32_t string_id) const
static unsigned transientIdToIndex(int32_t const id)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::processDictionaryFutures ( std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &  dictionary_futures)
private

Definition at line 300 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), hash_cache_, materialize_hashes_, payload_file_off_, str_count_, and string_id_string_dict_hash_table_.

Referenced by StringDictionary().

302  {
303  for (auto& dictionary_future : dictionary_futures) {
304  dictionary_future.wait();
305  const auto hashVec = dictionary_future.get();
306  for (const auto& hash : hashVec) {
307  const uint32_t bucket =
308  computeUniqueBucketWithHash(hash.first, string_id_string_dict_hash_table_);
309  payload_file_off_ += hash.second;
310  string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
311  if (materialize_hashes_) {
312  hash_cache_[str_count_] = hash.first;
313  }
314  ++str_count_;
315  }
316  }
317  dictionary_futures.clear();
318 }
std::vector< string_dict_hash_t > hash_cache_
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::sortCache ( std::vector< int32_t > &  cache)
private

Definition at line 1685 of file StringDictionary.cpp.

References anonymous_namespace{Utm.h}::a, getStringFromStorage(), gpu_enabled::sort(), and string_lt().

Referenced by buildSortedCache().

1685  {
1686  // This method is not thread-safe.
1687 
1688  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1689  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1690 
1691  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1692  auto a_str = this->getStringFromStorage(a);
1693  auto b_str = this->getStringFromStorage(b);
1694  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1695  });
1696 }
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:308
constexpr double a
Definition: Utm.h:32
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t StringDictionary::storageEntryCount ( ) const

Definition at line 809 of file StringDictionary.cpp.

References client_, isClient(), rw_mutex_, and str_count_.

Referenced by buildDictionaryTranslationMap(), eachStringSerially(), getBulk(), and getStringViews().

809  {
810  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
811  if (isClient()) {
812  return client_->storage_entry_count();
813  }
814  return str_count_;
815 }
bool isClient() const noexcept
std::shared_mutex rw_mutex_
std::unique_ptr< StringDictionaryClient > client_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::update_leaf ( const LeafHostInfo host_info)

Definition at line 386 of file StringDictionary.cpp.

386 {}

Friends And Related Function Documentation

friend class StringLocalCallback
friend

Definition at line 79 of file StringDictionary.h.

Member Data Documentation

char* StringDictionary::CANARY_BUFFER {nullptr}
private

Definition at line 310 of file StringDictionary.h.

Referenced by ~StringDictionary().

size_t StringDictionary::canary_buffer_size = 0
private

Definition at line 311 of file StringDictionary.h.

std::unique_ptr<StringDictionaryClient> StringDictionary::client_
mutable private
std::unique_ptr<StringDictionaryClient> StringDictionary::client_no_timeout_
mutable private

Definition at line 308 of file StringDictionary.h.

Referenced by buildDictionaryTranslationMap(), and getOrAddBulkArray().

size_t StringDictionary::collisions_
private

Definition at line 279 of file StringDictionary.h.

Referenced by StringDictionary().

DictionaryCache<std::string, compare_cache_value_t> StringDictionary::compare_cache_
mutable private

Definition at line 303 of file StringDictionary.h.

Referenced by getCompare(), and invalidateInvertedIndex().

size_t StringDictionary::compare_cache_size_
mutable private

Definition at line 304 of file StringDictionary.h.

Referenced by computeCacheSize(), getCompare(), and invalidateInvertedIndex().

const shared::StringDictKey StringDictionary::dict_key_
private

Definition at line 276 of file StringDictionary.h.

Referenced by getBulk(), getDictKey(), getOrAddBulk(), and getOrAddBulkParallel().

std::map<std::string, int32_t> StringDictionary::equal_cache_
mutable private

Definition at line 301 of file StringDictionary.h.

Referenced by getEquals(), and invalidateInvertedIndex().

size_t StringDictionary::equal_cache_size_
mutable private

Definition at line 302 of file StringDictionary.h.

Referenced by computeCacheSize(), getEquals(), and invalidateInvertedIndex().

const std::string StringDictionary::folder_
private

Definition at line 277 of file StringDictionary.h.

size_t StringDictionary::like_cache_size_
mutable private

Definition at line 298 of file StringDictionary.h.

Referenced by computeCacheSize(), and invalidateInvertedIndex().

std::map<std::tuple<std::string, bool, bool, char>, std::vector<int32_t> > StringDictionary::like_i32_cache_
mutable private

Definition at line 295 of file StringDictionary.h.

Referenced by invalidateInvertedIndex().

std::map<std::tuple<std::string, bool, bool, char>, std::vector<int64_t> > StringDictionary::like_i64_cache_
mutable private

Definition at line 297 of file StringDictionary.h.

Referenced by invalidateInvertedIndex().

bool StringDictionary::materialize_hashes_
private
constexpr size_t StringDictionary::MAX_STRCOUNT = (1U << 31) - 1
static
int StringDictionary::offset_fd_
private
size_t StringDictionary::offset_file_size_
private
StringIdxEntry* StringDictionary::offset_map_
private
std::string StringDictionary::offsets_path_
private

Definition at line 285 of file StringDictionary.h.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and StringDictionary().

int StringDictionary::payload_fd_
private
size_t StringDictionary::payload_file_off_
private
size_t StringDictionary::payload_file_size_
private
char* StringDictionary::payload_map_
private
std::map<std::pair<std::string, char>, std::vector<int32_t> > StringDictionary::regex_cache_
mutable private

Definition at line 299 of file StringDictionary.h.

Referenced by getRegexpLike(), and invalidateInvertedIndex().

size_t StringDictionary::regex_cache_size_
mutable private

Definition at line 300 of file StringDictionary.h.

Referenced by computeCacheSize(), getRegexpLike(), and invalidateInvertedIndex().

std::vector<int32_t> StringDictionary::sorted_cache
private
std::vector<int32_t> StringDictionary::string_id_string_dict_hash_table_
private
std::shared_ptr<std::vector<std::string> > StringDictionary::strings_cache_
mutable private

Definition at line 305 of file StringDictionary.h.

Referenced by copyStrings().

size_t StringDictionary::strings_cache_size_
mutable private

Definition at line 306 of file StringDictionary.h.

Referenced by computeCacheSize(), and copyStrings().


The documentation for this class was generated from the following files: