29 #include <tbb/parallel_for.h>
30 #include <tbb/task_arena.h>
36 #include <string_view>
41 const int64_t generation)
42 : string_dict_(sd), string_dict_key_(string_dict_key), generation_(generation) {}
53 const std::vector<std::string>& strings)
const {
55 std::vector<int32_t> string_ids(strings.size());
// Fragment of the non-const bulk get-or-add routine. The listing this text was
// extracted from dropped the first signature line and several body lines.
// NOTE(review): presumably StringDictionaryProxy::getOrAddTransientBulk — confirm.
61 const std::vector<std::string>& strings) {
63 const size_t num_strings = strings.size();
// One output id slot per input string.
64 std::vector<int32_t> string_ids(num_strings);
// Empty input is special-cased (the branch body is not visible here).
65 if (num_strings == 0) {
75 const size_t num_strings_not_found =
// The exclusive lock on rw_mutex_ is only acquired when the count computed
// above is non-zero; the loop below then walks every input index.
77 if (num_strings_not_found > 0) {
78 std::lock_guard<std::shared_mutex> write_lock(
rw_mutex_);
79 for (
size_t string_idx = 0; string_idx < num_strings; ++string_idx) {
88 template <
typename String>
93 if (emplaced.second) {
96 transient_id = emplaced.first->second;
101 template <
typename String>
107 std::lock_guard<std::shared_mutex> write_lock(
rw_mutex_);
112 return getOrAddTransientImpl<std::string const&>(str);
116 return getOrAddTransientImpl<std::string_view const>(sv);
120 std::shared_lock<std::shared_mutex> read_lock(
rw_mutex_);
130 template <
typename String>
137 std::shared_lock<std::shared_mutex> read_lock(
rw_mutex_);
150 CHECK(proxy_ptr !=
nullptr);
158 CHECK(proxy_ptr !=
nullptr);
166 CHECK(proxy_ptr !=
nullptr);
168 std::string str(c_str_ptr);
169 return proxy->getOrAddTransient(str);
173 if (inline_int_null_value<int32_t>() == string_id) {
176 std::shared_lock<std::shared_mutex> read_lock(
rw_mutex_);
// Fragment of the bulk id -> string conversion (presumably
// StringDictionaryProxy::getStrings; the first signature line is not visible).
190 const std::vector<int32_t>& string_ids)
const {
191 std::vector<std::string> strings;
192 if (!string_ids.empty()) {
// Size the output once up front; one entry is appended per input id below.
193 strings.reserve(string_ids.size());
194 for (
const auto string_id : string_ids) {
// Non-negative ids are resolved through the underlying string dictionary.
195 if (string_id >= 0) {
196 strings.emplace_back(
string_dict_->getString(string_id));
197 }
else if (inline_int_null_value<int32_t>() == string_id) {
// The int32 null sentinel yields an empty string.
198 strings.emplace_back(
"");
// Handling of the remaining ids is on lines missing from this listing.
208 template <
typename String>
210 const String& lookup_string)
const {
// Fragment of the numeric translation-map builder (presumably
// StringDictionaryProxy::buildNumericTranslationMap; the first signature line
// and the construction of translation_map are not visible in this listing).
218 const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos)
const {
// At least one string op is required.
220 CHECK(string_op_infos.size());
// An empty map is returned as-is.
222 if (translation_map.empty()) {
223 return translation_map;
226 const StringOps_Namespace::StringOps string_ops(string_op_infos);
228 const size_t num_transient_entries = translation_map.numTransients();
229 if (num_transient_entries) {
230 const int32_t map_domain_start = translation_map.domainStart();
// Above 10000 transient entries the id range [map_domain_start, -1) is
// processed through a tbb::blocked_range; otherwise the plain loop further
// down covers the same id range.
231 if (num_transient_entries > 10000UL) {
233 tbb::blocked_range<int32_t>(map_domain_start, -1),
234 [&](
const tbb::blocked_range<int32_t>& r) {
235 const int32_t start_idx = r.begin();
236 const int32_t end_idx = r.end();
237 for (int32_t source_string_id = start_idx; source_string_id < end_idx;
238 ++source_string_id) {
// Each map slot receives the numeric result of the op chain for that string
// (how source_string is obtained is on a line missing from this listing).
240 translation_map[source_string_id] = string_ops.numericEval(source_string);
244 for (int32_t source_string_id = map_domain_start; source_string_id < -1;
245 ++source_string_id) {
247 translation_map[source_string_id] = string_ops.numericEval(source_string);
// Raw storage pointer handed, together with generation_ and the op infos, to a
// call whose name is not visible here.
252 Datum* translation_map_stored_entries_ptr = translation_map.storageData();
255 translation_map_stored_entries_ptr,
generation_, string_op_infos);
// The map is marked as having no untranslated strings before being returned.
257 translation_map.setNumUntranslatedStrings(0UL);
261 return translation_map;
267 const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos)
const {
271 if (id_map.
empty()) {
275 const StringOps_Namespace::StringOps string_ops(string_op_infos);
279 size_t num_transient_strings_not_translated = 0UL;
280 if (num_transient_entries) {
281 std::vector<std::string> transient_lookup_strings(num_transient_entries);
282 if (string_ops.size()) {
285 transient_lookup_strings.rbegin(),
286 [&](std::string
const* ptr) {
return string_ops(*ptr); });
290 transient_lookup_strings.rbegin(),
291 [](std::string
const* ptr) {
return *ptr; });
304 num_transient_strings_not_translated =
311 int32_t* translation_map_stored_entries_ptr = id_map.
storageData();
313 auto dest_transient_lookup_callback = [dest_proxy, translation_map_stored_entries_ptr](
314 const std::string_view& source_string,
315 const int32_t source_string_id) {
316 translation_map_stored_entries_ptr[source_string_id] =
318 return translation_map_stored_entries_ptr[source_string_id] ==
323 const size_t num_persisted_strings_not_translated =
326 translation_map_stored_entries_ptr,
329 num_dest_transients > 0UL,
330 dest_transient_lookup_callback,
335 const size_t num_total_entries =
338 const size_t num_strings_not_translated =
339 num_transient_strings_not_translated + num_persisted_strings_not_translated;
340 CHECK_LE(num_strings_not_translated, num_total_entries);
348 num_dest_transients > 0 ? -1 - static_cast<int32_t>(num_dest_transients) : 0);
351 const size_t num_entries_translated = num_total_entries - num_strings_not_translated;
352 const float match_pct =
353 100.0 *
static_cast<float>(num_entries_translated) / num_total_entries;
354 VLOG(1) << std::fixed << std::setprecision(2) << match_pct <<
"% ("
355 << num_entries_translated <<
" entries) from dictionary ("
356 <<
string_dict_->getDictKey() <<
") with " << num_total_entries
357 <<
" total entries ( " << num_transient_entries <<
" literals)"
358 <<
" translated to dictionary (" << dest_proxy->
string_dict_->getDictKey()
359 <<
") with " << num_dest_entries <<
" total entries ("
// Fragment of the lock-ordering helper (presumably order_translation_locks;
// the leading signature lines are not visible). Both lock objects are passed in
// by reference and locked here.
// NOTE(review): ordering the acquisitions by dictionary key looks intended to
// keep concurrent translations from taking the two mutexes in opposite order —
// confirm.
367 std::shared_lock<std::shared_mutex>& source_proxy_read_lock,
368 std::unique_lock<std::shared_mutex>& dest_proxy_write_lock) {
// Same key on both sides: only the destination write lock is taken in the
// visible lines.
369 if (source_dict_key == dest_dict_key) {
371 dest_proxy_write_lock.lock();
372 }
else if (source_dict_key < dest_dict_key) {
// Source key sorts first: source read lock, then destination write lock.
373 source_proxy_read_lock.lock();
374 dest_proxy_write_lock.lock();
// Remaining case (branch header not visible): destination first, then source.
376 dest_proxy_write_lock.lock();
377 source_proxy_read_lock.lock();
384 const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos)
const {
386 const auto& dest_dict_id = dest_proxy->
getDictKey();
388 std::shared_lock<std::shared_mutex> source_proxy_read_lock(
rw_mutex_, std::defer_lock);
389 std::unique_lock<std::shared_mutex> dest_proxy_write_lock(dest_proxy->
rw_mutex_,
392 source_dict_id, dest_dict_id, source_proxy_read_lock, dest_proxy_write_lock);
// Fragment of the union translation-map builder (presumably
// StringDictionaryProxy::buildUnionTranslationMapToOtherProxy; the first
// signature lines and several body lines are missing from this listing).
398 const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos)
const {
402 const auto& dest_dict_id = dest_proxy->
getDictKey();
// Both locks are created deferred and then acquired by the call ending on
// listing line 407, which receives the two dictionary keys and both locks.
403 std::shared_lock<std::shared_mutex> source_proxy_read_lock(
rw_mutex_, std::defer_lock);
404 std::unique_lock<std::shared_mutex> dest_proxy_write_lock(dest_proxy->
rw_mutex_,
407 source_dict_id, dest_dict_id, source_proxy_read_lock, dest_proxy_write_lock);
411 if (id_map.empty()) {
414 const auto num_untranslated_strings = id_map.numUntranslatedStrings();
// Extra work is only needed when some source strings have no destination id.
415 if (num_untranslated_strings > 0) {
416 const size_t total_post_translation_dest_transients =
418 constexpr
size_t max_allowed_transients =
419 static_cast<size_t>(std::numeric_limits<int32_t>::max() -
// Exceeding the transient-id limit raises std::runtime_error.
421 if (total_post_translation_dest_transients > max_allowed_transients) {
422 std::stringstream ss;
// NOTE(review): the message reads "translation to dictionary" but streams
// getDictKey(), i.e. this (source) proxy's key; dest_dict_id looks like the
// intended value — confirm before changing the message.
423 ss <<
"Union translation to dictionary " <<
getDictKey() <<
" would result in "
424 << total_post_translation_dest_transients
425 <<
" transient entries, which is more than limit of " << max_allowed_transients
427 throw std::runtime_error(ss.str());
429 const int32_t map_domain_start = id_map.domainStart();
430 const int32_t map_domain_end = id_map.domainEnd();
432 const StringOps_Namespace::StringOps string_ops(string_op_infos);
// String ops are applied to the source string only when at least one is
// configured.
433 const bool has_string_ops = string_ops.size();
// First pass: negative ids in [map_domain_start, -1).
439 for (int32_t source_string_id = map_domain_start; source_string_id < -1;
440 ++source_string_id) {
444 has_string_ops ? string_ops(source_string) : source_string);
445 id_map[source_string_id] = dest_string_id;
// Second pass: non-negative ids in [0, map_domain_end), with the source string
// read from the underlying dictionary.
449 for (int32_t source_string_id = 0; source_string_id < map_domain_end;
450 ++source_string_id) {
452 const auto source_string =
string_dict_->getString(source_string_id);
454 has_string_ops ? string_ops(source_string) : source_string);
455 id_map[source_string_id] = dest_string_id;
// Range start becomes -1 - num_dest_transients when the destination has
// transients, otherwise 0.
463 id_map.setRangeStart(
464 num_dest_transients > 0 ? -1 - static_cast<int32_t>(num_dest_transients) : 0);
468 template <
typename T>
471 const bool is_simple,
472 const char escape)
const {
480 if (is_like_impl(str.c_str(), str.size(), pattern.c_str(), pattern.size(), escape)) {
487 template std::vector<int32_t> StringDictionaryProxy::getLike<int32_t>(
488 const std::string& pattern,
490 const bool is_simple,
491 const char escape)
const;
493 template std::vector<int64_t> StringDictionaryProxy::getLike<int64_t>(
494 const std::string& pattern,
496 const bool is_simple,
497 const char escape)
const;
// Fragment of the string comparison helper (presumably do_compare; the first
// signature line and the per-branch return statements are missing from this
// listing).
502 const std::string& pattern,
503 const std::string& comp_operator) {
// Three-way comparison of str against pattern; the branches below select on
// the textual operator.
504 int res = str.compare(pattern);
505 if (comp_operator ==
"<") {
507 }
else if (comp_operator ==
"<=") {
509 }
else if (comp_operator ==
"=") {
511 }
else if (comp_operator ==
">") {
513 }
else if (comp_operator ==
">=") {
515 }
else if (comp_operator ==
"<>") {
// Any operator string other than the six above is rejected.
518 throw std::runtime_error(
"unsupported string compare operator");
524 const std::string& pattern,
525 const std::string& comp_operator)
const {
539 const std::string& pattern,
541 return regexp_like(str.c_str(), str.size(), pattern.c_str(), pattern.size(), escape);
547 const char escape)
const {
559 return string_dict_->getOrAdd(str);
// Fragment of the (pointer, length) accessor (presumably
// StringDictionaryProxy::getStringBytes; the first signature line is not
// visible in this listing).
563 int32_t string_id)
const noexcept {
// Non-negative ids are forwarded to the underlying dictionary.
564 if (string_id >= 0) {
565 return string_dict_.get()->getStringBytes(string_id);
// Otherwise the id is converted to an index into transient_string_vec_, which
// is bounds-checked and read under a shared lock on rw_mutex_.
567 unsigned const string_index = transientIdToIndex(string_id);
568 std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
569 CHECK_LT(string_index, transient_string_vec_.size());
570 std::string
const*
const str_ptr = transient_string_vec_[string_index];
// NOTE(review): the returned pointer refers to the stored std::string's buffer
// and the read lock is released on return; this assumes transient strings are
// not destroyed or modified while the caller uses the bytes — confirm.
571 return {str_ptr->c_str(), str_ptr->size()};
577 CHECK_LE(num_storage_entries, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
578 return num_storage_entries;
586 static_cast<size_t>(std::numeric_limits<int32_t>::max()) - 1);
587 return num_transient_entries;
591 std::shared_lock<std::shared_mutex> read_lock(
rw_mutex_);
600 std::shared_lock<std::shared_mutex> read_lock(
rw_mutex_);
607 constexpr int32_t max_transient_id = -2;
611 int32_t
const string_id = max_transient_id - index;
612 serial_callback(str, string_id);
629 void operator()(std::string
const& str, int32_t
const string_id)
override {
632 void operator()(std::string_view
const sv, int32_t
const old_id)
override {
648 using Lambda = std::function<int32_t(std::string const&)>;
655 ,
string_to_id_(sdp->string_dict_->makeLambdaStringToId()) {}
656 void operator()(std::string
const& str, int32_t
const old_id)
override {
662 void operator()(std::string_view
const, int32_t
const string_id)
override {
663 UNREACHABLE() <<
"StringNetworkCallback requires a std::string.";
673 std::unique_ptr<StringDictionary::StringCallback> serial_callback;
675 serial_callback = std::make_unique<StringNetworkCallback>(
this, id_map);
677 serial_callback = std::make_unique<StringLocalCallback>(
this, id_map);
685 if (generation == -1) {
688 if (generation_ != -1) {
692 generation_ = generation;
696 const std::vector<std::string>& strings,
698 const bool take_read_lock)
const {
699 const size_t num_strings = strings.size();
700 if (num_strings == 0) {
// Fragment of the templated bulk transient lookup (presumably
// StringDictionaryProxy::transientLookupBulk; the line carrying the function
// name and the string_ids parameter is missing from this listing).
714 template <
typename String>
716 const std::vector<String>& lookup_strings,
718 const bool take_read_lock)
const {
719 const size_t num_strings = lookup_strings.size();
// With take_read_lock set, a shared lock on rw_mutex_ is held for the rest of
// the scope; otherwise read_lock is a default-constructed shared_lock that is
// not associated with any mutex.
720 auto read_lock = take_read_lock ? std::shared_lock<std::shared_mutex>(
rw_mutex_)
721 : std::shared_lock<std::shared_mutex>();
// Inputs smaller than this threshold take the branch below.
// NOTE(review): presumably the serial path, with the parallel variant used
// otherwise — the branch bodies are not visible here; confirm.
726 constexpr
size_t tbb_parallel_threshold{20000};
727 if (num_strings < tbb_parallel_threshold) {
// Fragment of the serial bulk lookup (presumably
// StringDictionaryProxy::transientLookupBulkUnlocked; the line carrying the
// function name and the per-string lookup lines are missing from this listing).
// No lock is taken in the visible lines.
734 template <
typename String>
736 const std::vector<String>& lookup_strings,
737 int32_t* string_ids)
const {
738 const size_t num_strings = lookup_strings.size();
// Running count of inputs for which the (not visible) lookup found no id.
739 size_t num_strings_not_found = 0;
740 for (
size_t string_idx = 0; string_idx < num_strings; ++string_idx) {
748 num_strings_not_found++;
// The count of misses is the function's result.
751 return num_strings_not_found;
// Fragment of the TBB-parallel bulk lookup (presumably
// StringDictionaryProxy::transientLookupBulkParallelUnlocked; the line carrying
// the function name and several body lines are missing from this listing).
// Writes one id per input into string_ids and returns how many inputs were
// not found. No lock is taken in the visible lines.
754 template <
typename String>
756 const std::vector<String>& lookup_strings,
757 int32_t* string_ids)
const {
758 const size_t num_lookup_strings = lookup_strings.size();
759 const size_t target_inputs_per_thread = 20000L;
761 std::thread::hardware_concurrency(), num_lookup_strings, target_inputs_per_thread);
// One miss counter per arena thread, zero-initialised and summed after the
// parallel section.
765 std::vector<size_t> num_strings_not_found_per_thread(thread_info.
num_threads, 0UL);
// The arena caps concurrency at thread_info.num_threads, which is also the
// size of the counter vector indexed by the arena thread index below.
767 tbb::task_arena limited_arena(thread_info.
num_threads);
768 limited_arena.execute([&] {
770 tbb::blocked_range<size_t>(
772 [&](
const tbb::blocked_range<size_t>& r) {
773 const size_t start_idx = r.begin();
774 const size_t end_idx = r.end();
775 size_t num_local_strings_not_found = 0;
776 for (
size_t string_idx = start_idx; string_idx < end_idx; ++string_idx) {
780 string_ids[string_idx] =
783 num_local_strings_not_found++;
786 const size_t tbb_thread_idx = tbb::this_task_arena::current_thread_index();
// Accumulate instead of assign: simple_partitioner splits the input into
// several sub-ranges and a single arena thread may execute more than one of
// them, so a plain assignment would discard the misses counted for earlier
// sub-ranges run by the same thread. Each slot is only written by its own
// thread, so no extra synchronisation is needed.
787 num_strings_not_found_per_thread[tbb_thread_idx] += num_local_strings_not_found;
789 tbb::simple_partitioner());
// Reduce the per-thread counters into the total returned to the caller.
791 size_t num_strings_not_found = 0;
792 for (int64_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
793 num_strings_not_found += num_strings_not_found_per_thread[thread_idx];
795 return num_strings_not_found;
void eachStringSerially(StringDictionary::StringCallback &) const
int32_t getOrAddTransientImpl(String)
void setNumUntranslatedStrings(const size_t num_untranslated_strings)
const shared::StringDictKey string_dict_key_
std::pair< const char *, size_t > getStringBytes(int32_t string_id) const noexcept
size_t transientEntryCountUnlocked() const
StringLocalCallback(StringDictionaryProxy *sdp, StringDictionaryProxy::IdMap &id_map)
Lambda const string_to_id_
int64_t num_elems_per_thread
StringDictionaryProxy::IdMap & id_map_
size_t entryCount() const
Returns the number of total string entries for this proxy, both stored in the underlying dictionary a...
size_t numTransients() const
int32_t getIdOfStringNoGeneration(const std::string &str) const
std::function< int32_t(std::string const &)> Lambda
std::string getStringUnlocked(const int32_t string_id) const
size_t storageEntryCount() const
Returns the number of string entries in the underlying string dictionary, at this proxy's generation_...
StringDictionary * getDictionary() const noexcept
size_t transientLookupBulkUnlocked(const std::vector< String > &lookup_strings, int32_t *string_ids) const
StringDictionaryProxy * sdp_
void operator()(std::string const &str, int32_t const string_id) override
size_t transientLookupBulk(const std::vector< String > &lookup_strings, int32_t *string_ids, const bool take_read_lock) const
std::string getString(int32_t string_id) const
Constants for Builtin SQL Types supported by HEAVY.AI.
RUNTIME_EXPORT DEVICE bool string_ilike_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, char escape_char)
IdMap buildIntersectionTranslationMapToOtherProxyUnlocked(const StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
size_t transientLookupBulkParallelUnlocked(const std::vector< String > &lookup_strings, int32_t *string_ids) const
int32_t getIdOfStringFromClient(String const &) const
std::vector< int32_t > getTransientBulk(const std::vector< std::string > &strings) const
Executes read-only lookup of a vector of strings and returns a vector of their integer ids...
TranslationMap< Datum > buildNumericTranslationMap(const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
Builds a vectorized string_id translation map from this proxy to dest_proxy.
std::vector< int32_t > getCompare(const std::string &pattern, const std::string &comp_operator) const
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
StringNetworkCallback(StringDictionaryProxy *sdp, StringDictionaryProxy::IdMap &id_map)
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< StringDictionary > string_dict_
IdMap transientUnion(StringDictionaryProxy const &)
std::vector< std::string const * > transient_string_vec_
void setRangeEnd(const int32_t range_end)
RUNTIME_EXPORT DEVICE bool string_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
void operator()(std::string const &str, int32_t const old_id) override
int32_t lookupTransientStringUnlocked(const String &lookup_string) const
std::vector< std::string > getStrings(const std::vector< int32_t > &string_ids) const
size_t getTransientBulkImpl(const std::vector< std::string > &strings, int32_t *string_ids, const bool take_read_lock) const
void order_translation_locks(const shared::StringDictKey &source_dict_key, const shared::StringDictKey &dest_dict_key, std::shared_lock< std::shared_mutex > &source_read_lock, std::shared_lock< std::shared_mutex > &dest_read_lock)
void operator()(std::string_view const sv, int32_t const old_id) override
static int32_t transientIndexToId(unsigned const index)
void updateGeneration(const int64_t generation) noexcept
size_t transientEntryCount() const
Returns the number of transient string entries for this proxy,.
OUTPUT transform(INPUT const &input, FUNC const &func)
Functions to support the LIKE and ILIKE operator in SQL. Only single-byte character set is supported ...
IdMap buildUnionTranslationMapToOtherProxy(StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_types) const
TransientMap transient_str_to_int_
StringDictionaryProxy(StringDictionaryProxy const &)=delete
void setRangeStart(const int32_t range_start)
int32_t getOrAddTransient(const std::string &)
size_t entryCountUnlocked() const
std::vector< T > getLike(const std::string &pattern, const bool icase, const bool is_simple, const char escape) const
void operator()(std::string_view const, int32_t const string_id) override
RUNTIME_EXPORT DEVICE bool string_like_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, char escape_char)
bool do_compare(const std::string &str, const std::string &pattern, const std::string &comp_operator)
StringDictionaryProxy * sdp_
int32_t getOrAddTransientUnlocked(String const &)
bool operator!=(StringDictionaryProxy const &) const
std::vector< int32_t > getRegexpLike(const std::string &pattern, const char escape) const
int32_t getOrAdd(const std::string &str) noexcept
bool operator==(StringDictionaryProxy const &) const
std::vector< T > const & getVectorMap() const
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
std::vector< int32_t > getOrAddTransientBulk(const std::vector< std::string > &strings)
IdMap buildIntersectionTranslationMapToOtherProxy(const StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
std::shared_mutex rw_mutex_
DEVICE RUNTIME_EXPORT int32_t StringDictionaryProxy_getStringId(int8_t *proxy_ptr, char *c_str_ptr)
#define DEBUG_TIMER(name)
DEVICE RUNTIME_EXPORT size_t StringDictionaryProxy_getStringLength(int8_t *proxy_ptr, int32_t string_id)
const shared::StringDictKey & getDictKey() const noexcept
RUNTIME_EXPORT DEVICE bool regexp_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
int32_t getIdOfString(const std::string &str) const
static unsigned transientIdToIndex(int32_t const id)
int64_t getGeneration() const noexcept
int32_t truncate_to_generation(const int32_t id, const size_t generation)
DEVICE RUNTIME_EXPORT const char * StringDictionaryProxy_getStringBytes(int8_t *proxy_ptr, int32_t string_id)
StringDictionaryProxy::IdMap & id_map_
RUNTIME_EXPORT DEVICE bool string_ilike(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)