std::unique_ptr<HashingSchemeRecycler> PerfectJoinHashTable::hash_table_layout_cache_ =
    std::make_unique<HashingSchemeRecycler>();
std::pair<InnerOuter, InnerOuterStringOpInfos> get_cols(
    const Analyzer::BinOper* qual_bin_oper,
    const TemporaryTables* temporary_tables) {
  // ...
}
BucketizedHashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const& context_ti,
                                                       ExpressionRange const& col_range,
                                                       bool const is_bw_eq) {
  using EmptyRangeSize = std::optional<size_t>;
  auto empty_range_check = [](ExpressionRange const& col_range,
                              bool const is_bw_eq) -> EmptyRangeSize {
    // ... (an empty range still needs one slot for the null key under kBW_EQ, else zero)
    return EmptyRangeSize{};
  };

  auto empty_range = empty_range_check(col_range, is_bw_eq);
  if (empty_range) {
    return {size_t(*empty_range), 1};
  }

  int64_t bucket_normalization =
      context_ti.get_type() == kDATE ? col_range.getBucket() : 1;
  auto const normalized_max = col_range.getIntMax() / bucket_normalization;
  auto const normalized_min = col_range.getIntMin() / bucket_normalization;
  return {size_t(normalized_max - normalized_min + 1 + (is_bw_eq ? 1 : 0)),
          bucket_normalization};
}
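// Illustrative sketch, not part of this file: the entry count computed above is
// max/b - min/b + 1, plus one extra slot for the null key under kBW_EQ. For a DATE
// range of [0, 864000] seconds with a one-day bucket (b = 86400), that is
// 10 - 0 + 1 = 11 entries. The standalone helper below restates the formula with
// plain integers; its name and includes are hypothetical.
#include <cstdint>

int64_t bucketized_entry_count_sketch(int64_t int_min,
                                      int64_t int_max,
                                      int64_t bucket_normalization,
                                      bool is_bw_eq) {
  const auto normalized_max = int_max / bucket_normalization;  // e.g. 864000 / 86400 == 10
  const auto normalized_min = int_min / bucket_normalization;  // e.g. 0 / 86400 == 0
  return normalized_max - normalized_min + 1 + (is_bw_eq ? 1 : 0);
}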
size_t get_hash_entry_count(const ExpressionRange& col_range, const bool is_bw_eq) {
  if (col_range.getIntMin() > col_range.getIntMax()) {
    // An empty range still needs one slot for the null key under kBW_EQ.
    return is_bw_eq ? 1 : 0;
  }
  // ...
}
bool shard_count_less_or_equal_device_count(const shared::TableKey& inner_table_key,
                                            const Executor* executor) {
  const auto inner_table_info = executor->getTableInfo(inner_table_key);
  std::unordered_set<int> device_holding_fragments;
  auto cuda_mgr = executor->getDataMgr()->getCudaMgr();
  const int device_count = cuda_mgr ? cuda_mgr->getDeviceCount() : 1;
  for (const auto& fragment : inner_table_info.fragments) {
    if (fragment.shard != -1) {
      const auto it_ok = device_holding_fragments.emplace(fragment.shard % device_count);
      // ... (a failed emplace means two shards map to the same device)
    }
  }
  // ...
}
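// Illustrative sketch, not part of this file: the emplace above fails exactly when two
// shards map to the same device under round-robin placement (shard % device_count).
// With 4 shards on 2 GPUs, shards {0, 2} hit device 0 and {1, 3} hit device 1, so the
// check fails; with 2 shards on 2 GPUs it passes. All names below are hypothetical.
#include <unordered_set>
#include <vector>

bool one_shard_per_device_sketch(const std::vector<int>& shard_ids, int device_count) {
  std::unordered_set<int> devices_seen;
  for (int shard : shard_ids) {
    if (!devices_seen.emplace(shard % device_count).second) {
      return false;  // a second shard landed on an already-occupied device
    }
  }
  return true;
}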
size_t get_shard_count(
    std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
    const Executor* executor) {
  const auto inner_col = equi_pair.first;
  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(equi_pair.second);
  if (!outer_col || inner_col->getColumnKey().table_id < 0 ||
      outer_col->getColumnKey().table_id < 0) {
    return 0;
  }
  if (outer_col->get_rte_idx()) {
    return 0;
  }
  if (inner_col->get_type_info() != outer_col->get_type_info()) {
    return 0;
  }
  const auto inner_td = get_metadata_for_table(inner_col->getTableKey());
  const auto outer_td = get_metadata_for_table(outer_col->getTableKey());
  if (inner_td->shardedColumnId == 0 || outer_td->shardedColumnId == 0 ||
      inner_td->nShards != outer_td->nShards) {
    return 0;
  }
  // ...
  // The join qualifies for sharded execution only when the shard keys are the join
  // columns themselves, in either order.
  return (inner_td->shardedColumnId == inner_col->getColumnKey().column_id &&
          outer_td->shardedColumnId == outer_col->getColumnKey().column_id) ||
                 (outer_td->shardedColumnId == inner_col->getColumnKey().column_id &&
                  inner_td->shardedColumnId == outer_col->getColumnKey().column_id)
             ? inner_td->nShards
             : 0;
}
const InputTableInfo& get_inner_query_info(
    const shared::TableKey& inner_table_key,
    const std::vector<InputTableInfo>& query_infos) {
  std::optional<size_t> ti_idx;
  for (size_t i = 0; i < query_infos.size(); ++i) {
    if (inner_table_key == query_infos[i].table_key) {
      ti_idx = i;
      break;
    }
  }
  CHECK(ti_idx);
  return query_infos[*ti_idx];
}
std::shared_ptr<PerfectJoinHashTable> PerfectJoinHashTable::getInstance(
    const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
    const std::vector<InputTableInfo>& query_infos,
    const Data_Namespace::MemoryLevel memory_level,
    const JoinType join_type,
    const HashType preferred_hash_type,
    const int device_count,
    ColumnCacheMap& column_cache,
    Executor* executor,
    const HashTableBuildDagMap& hashtable_build_dag_map,
    const RegisteredQueryHint& query_hints,
    const TableIdToNodeMap& table_id_to_node_map) {
  // ...
  const auto cols_and_string_op_infos =
      get_cols(qual_bin_oper.get(), executor->temporary_tables_);
  const auto& cols = cols_and_string_op_infos.first;
  const auto& inner_outer_string_op_infos = cols_and_string_op_infos.second;
  const auto inner_col = cols.first;
  // ...
  const auto& ti = inner_col->get_type_info();
  auto col_range =
      getExpressionRange(ti.is_string() ? cols.second : inner_col, query_infos, executor);
  if (col_range.getType() == ExpressionRangeType::Invalid) {
    throw HashJoinFail(
        "Could not compute range for the expressions involved in the equijoin");
  }
  const auto rhs_source_col_range =
      ti.is_string() ? getExpressionRange(inner_col, query_infos, executor) : col_range;
  if (ti.is_string()) {
    // ...
    if (rhs_source_col_range.getType() == ExpressionRangeType::Invalid) {
      throw HashJoinFail(
          "Could not compute range for the expressions involved in the equijoin");
    }
    if (rhs_source_col_range.getIntMin() > rhs_source_col_range.getIntMax()) {
      // The inner column's range is empty; adopt it as-is.
      CHECK_EQ(rhs_source_col_range.getIntMin(), int64_t(0));
      CHECK_EQ(rhs_source_col_range.getIntMax(), int64_t(-1));
      col_range = rhs_source_col_range;
    } else {
      col_range = ExpressionRange::makeIntRange(
          std::min(rhs_source_col_range.getIntMin(), col_range.getIntMin()),
          std::max(rhs_source_col_range.getIntMax(), col_range.getIntMax()),
          0,
          rhs_source_col_range.hasNulls());
    }
  }

  const auto bucketized_entry_count_info = get_bucketized_hash_entry_info(
      ti, col_range, qual_bin_oper->get_optype() == kBW_EQ);
  const auto bucketized_entry_count =
      bucketized_entry_count_info.getNormalizedHashEntryCount();

  size_t const rowid_size = sizeof(int32_t);
  auto const max_num_hash_entries =
      getMaximumNumHashEntriesCanHold(memory_level, executor, rowid_size);
  if (bucketized_entry_count > max_num_hash_entries) {
    throw TooManyHashEntries(generateTooManyHashEntriesErrMsg(
        bucketized_entry_count, max_num_hash_entries, memory_level));
  }

  auto const& inner_table_info =
      get_inner_query_info(inner_col->getTableKey(), query_infos).info;
  auto const num_inner_table_tuple = inner_table_info.getFragmentNumTuplesUpperBound();
  // Switch to a baseline hash join when the hash entry count is disproportionate to
  // the actual number of inner rows.
  auto const deploy_baseline_join =
      num_inner_table_tuple > g_num_tuple_threshold_switch_to_baseline &&
      g_ratio_num_hash_entry_to_num_tuple_switch_to_baseline * num_inner_table_tuple <
          bucketized_entry_count;
  if (deploy_baseline_join) {
    std::ostringstream oss;
    oss << "Switching to baseline hash join: the join column's hash value range is too "
           "wide compared to the actual number of rows ";
    oss << "(# hash entries: " << bucketized_entry_count
        << ", # rows: " << num_inner_table_tuple << ")";
    throw TooManyHashEntries(oss.str());
  }
  if (qual_bin_oper->get_optype() == kBW_EQ &&
      col_range.getIntMax() >= std::numeric_limits<int64_t>::max()) {
    throw HashJoinFail("Cannot translate null value for kBW_EQ");
  }
  decltype(std::chrono::steady_clock::now()) ts1, ts2;
  // ...
  ts1 = std::chrono::steady_clock::now();
  // ...
  auto hash_type = preferred_hash_type;
  if (query_hints.force_one_to_many_hash_join) {
    LOG(INFO) << "A user's query hint forced the join operation to use the OneToMany "
                 "hash table layout";
    hash_type = HashType::OneToMany;
  }
  auto join_hash_table = std::shared_ptr<PerfectJoinHashTable>(
      new PerfectJoinHashTable(qual_bin_oper,
                               inner_col,
                               query_infos,
                               memory_level,
                               join_type,
                               hash_type,
                               col_range,
                               rhs_source_col_range,
                               bucketized_entry_count_info,
                               column_cache,
                               executor,
                               device_count,
                               query_hints,
                               hashtable_build_dag_map,
                               table_id_to_node_map,
                               rowid_size,
                               inner_outer_string_op_infos));
  try {
    join_hash_table->reify();
  } catch (const TableMustBeReplicated& e) {
    // Free the partially built buffers before aborting the query.
    join_hash_table->freeHashBufferMemory();
    throw std::runtime_error(e.what());
  } catch (const HashJoinFail& e) {
    join_hash_table->freeHashBufferMemory();
    throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
                                   "involved in equijoin | ") +
                       e.what());
  } catch (const ColumnarConversionNotSupported& e) {
    throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
                       e.what());
  } catch (const OutOfMemory& e) {
    throw HashJoinFail(
        std::string("Ran out of memory while building hash tables for equijoin | ") +
        e.what());
  } catch (const std::exception& e) {
    throw std::runtime_error(
        std::string("Fatal error while attempting to build hash tables for join: ") +
        e.what());
  }
  // ...
  ts2 = std::chrono::steady_clock::now();
  VLOG(1) << "Built perfect hash table "
          << getHashTypeString(join_hash_table->getHashType()) << " in "
          << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
          << " ms";
  // ...
  return join_hash_table;
}
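// Hypothetical call site (a sketch; every argument comes from surrounding executor
// state, and the enum values here are just plausible picks):
//
//   auto hash_table = PerfectJoinHashTable::getInstance(qual_bin_oper,
//                                                       query_infos,
//                                                       Data_Namespace::CPU_LEVEL,
//                                                       JoinType::INNER,
//                                                       HashType::OneToOne,
//                                                       device_count,
//                                                       column_cache,
//                                                       executor,
//                                                       hashtable_build_dag_map,
//                                                       query_hints,
//                                                       table_id_to_node_map);
//
// On success the returned table is fully reified; every failure path above frees any
// partially built buffers before rethrowing.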
bool needs_dictionary_translation(
    const InnerOuter& inner_outer_col_pair,
    const InnerOuterStringOpInfos& inner_outer_string_op_infos,
    const Executor* executor) {
  if (inner_outer_string_op_infos.first.size() ||
      inner_outer_string_op_infos.second.size()) {
    return true;
  }
  auto inner_col = inner_outer_col_pair.first;
  auto outer_col_expr = inner_outer_col_pair.second;
  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
  // ...
  const auto& inner_col_key = inner_col->getColumnKey();
  const auto inner_cd = get_column_descriptor_maybe(inner_col_key);
  const auto inner_ti = get_column_type(inner_col_key.column_id,
                                        inner_col_key.table_id,
                                        inner_cd,
                                        executor->getTemporaryTables());
  // ...
  if (!inner_ti.is_string()) {
    return false;
  }
  // ...
  if (!inner_cd || !outer_cd) {
    return false;
  }
  // ...
  const auto& outer_col_key = outer_col->getColumnKey();
  const auto outer_ti = get_column_type(outer_col_key.column_id,
                                        outer_col_key.table_id,
                                        outer_cd,
                                        executor->getTemporaryTables());
  CHECK_EQ(inner_ti.is_string(), outer_ti.is_string());
  // If the two columns do not share a string dictionary, translation is required.
  if (outer_ti.getStringDictKey() != inner_ti.getStringDictKey()) {
    return true;
  }
  const auto inner_str_dict_proxy =
      executor->getStringDictionaryProxy(inner_ti.getStringDictKey(), true);
  CHECK(inner_str_dict_proxy);
  const auto outer_str_dict_proxy =
      executor->getStringDictionaryProxy(outer_ti.getStringDictKey(), true);
  CHECK(outer_str_dict_proxy);
  // Even a shared dictionary may diverge through transient proxy entries.
  return *inner_str_dict_proxy != *outer_str_dict_proxy;
}
std::vector<Fragmenter_Namespace::FragmentInfo> only_shards_for_device(
    const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
    const int device_id,
    const int device_count) {
  std::vector<Fragmenter_Namespace::FragmentInfo> shards_for_device;
  for (const auto& fragment : fragments) {
    if (fragment.shard % device_count == device_id) {
      shards_for_device.push_back(fragment);
    }
  }
  return shards_for_device;
}
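// Illustrative expectation (a sketch): with device_count == 2, device 0 receives the
// even-numbered shards (0, 2, 4, ...) and device 1 the odd-numbered ones, mirroring
// the shard % device_count placement checked in shard_count_less_or_equal_device_count
// above.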
bool PerfectJoinHashTable::isOneToOneHashPossible(
    const std::vector<ColumnsForDevice>& columns_per_device) const {
  // ...
  const auto max_unique_hash_input_entries = /* ... */;
  // ...
  for (const auto& device_columns : columns_per_device) {
    CHECK(!device_columns.join_columns.empty());
    const auto rhs_join_col_num_entries = device_columns.join_columns.front().num_elems;
    if (rhs_join_col_num_entries > max_unique_hash_input_entries) {
      VLOG(1) << "Skipping attempt to build perfect hash one-to-one table as number of "
                 "rhs column entries ("
              << rhs_join_col_num_entries << ") exceeds range for rhs join column ("
              << max_unique_hash_input_entries << ").";
      return false;
    }
  }
  return true;
}
void PerfectJoinHashTable::reify() {
  // ...
  const auto cols =
      get_cols(qual_bin_oper_.get(), executor_->temporary_tables_).first;
  const auto inner_col = cols.first;
  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
  checkHashJoinReplicationConstraint(
      inner_col->getTableKey(),
      get_shard_count(qual_bin_oper_.get(), executor_),
      executor_);
  const auto& query_info = getInnerQueryInfo(inner_col).info;
  if (query_info.fragments.empty()) {
    return;
  }
  // ...
  std::vector<std::future<void>> init_threads;
  const auto shard_count = shardCount();
  // ...
  std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>> fragments_per_device;
  std::vector<ColumnsForDevice> columns_per_device;
  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
  // ...
  std::vector<ChunkKey> chunk_key_per_device;
  auto data_mgr = executor_->getDataMgr();
  // ...
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    fragments_per_device.emplace_back(
        shard_count
            ? only_shards_for_device(query_info.fragments, device_id, device_count_)
            : query_info.fragments);
    // ...
    dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(
        data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id)));
    // ...
    const auto chunk_key =
        genChunkKey(fragments_per_device[device_id], outer_col, inner_col);
    chunk_key_per_device.emplace_back(std::move(chunk_key));
  }

  auto hashtable_access_path_info =
      HashtableRecycler::getHashtableAccessPathInfo(inner_outer_pairs_,
                                                    {inner_outer_string_op_infos_},
                                                    qual_bin_oper_->get_optype(),
                                                    join_type_,
                                                    hashtable_build_dag_map_,
                                                    device_count_,
                                                    shard_count,
                                                    fragments_per_device,
                                                    executor_);
  // ...
  table_keys_ = hashtable_access_path_info.table_keys;
  // ...
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    const auto num_tuples = std::accumulate(
        fragments_per_device[device_id].begin(),
        fragments_per_device[device_id].end(),
        size_t(0),
        [](size_t sum, const auto& fragment) { return sum + fragment.getNumTuples(); });
    // ... (derive the per-device cache key via getAlternativeCacheKey from, among
    //      others, outer_col ? outer_col : inner_col,
    //      chunk_key_per_device[device_id], and num_tuples)
  }

  auto allow_hashtable_recycling =
      HashtableRecycler::isSafeToCacheHashtable(table_id_to_node_map_,
                                                needs_dict_translation_,
                                                {inner_outer_string_op_infos_},
                                                inner_col->getTableKey());
  const bool invalid_cache_key =
      HashtableRecycler::isInvalidHashTableCacheKey(hashtable_cache_key_);
  if (!invalid_cache_key && allow_hashtable_recycling) {
    // ...
    std::for_each(hashtable_cache_key_.cbegin(),
                  hashtable_cache_key_.cend(),
                  /* ... (register each per-device cache key) */);
    // ...
    auto found_cached_one_to_many_layout =
        std::any_of(hashtable_cache_key_.cbegin(),
                    hashtable_cache_key_.cend(),
                    [](QueryPlanHash cache_key) {
                      // ...
                      return cached_hashtable_layout_type &&
                             *cached_hashtable_layout_type == HashType::OneToMany;
                    });
    if (found_cached_one_to_many_layout) {
      // ... (keep the OneToMany layout so the cached table stays usable)
    }
  }
  // ...
  if (!(col_range_ == copied_col_range)) {
    // ... (adopt the cached table's column range)
  }
  // ...
  bool has_invalid_cached_hash_table = false;
  if (/* ... && */ canAccessHashTable(
          allow_hashtable_recycling, invalid_cache_key, join_type_)) {
    // ...
    for (int device_id = 0; device_id < device_count_; ++device_id) {
      // ... (probe the CPU cache per device; on a miss:)
      has_invalid_cached_hash_table = true;
      // ...
    }
    if (has_invalid_cached_hash_table) {
      // ... (drop the cached tables and rebuild from scratch)
    }
    // ...
    for (int device_id = 0; device_id < device_count_; ++device_id) {
      // ... (move each cached CPU table to its device)
      copyCpuHashTableToGpu(cpu_hash_table,
                            cpu_hash_table->getHashTableEntryInfo(),
                            device_id,
                            data_mgr);
      // ...
    }
    // ...
  }
  // ...
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    columns_per_device.emplace_back(
        fetchColumnsForDevice(fragments_per_device[device_id],
                              device_id,
                              memory_level_ == Data_Namespace::GPU_LEVEL
                                  ? dev_buff_owners[device_id].get()
                                  : nullptr));
  }
  try {
    for (int device_id = 0; device_id < device_count_; ++device_id) {
      const auto chunk_key = genChunkKey(fragments_per_device[device_id],
                                         /* ... */);
      init_threads.push_back(std::async(std::launch::async,
                                        /* ... */,
                                        columns_per_device[device_id],
                                        /* ... */));
    }
    for (auto& init_thread : init_threads) {
      init_thread.wait();
    }
    for (auto& init_thread : init_threads) {
      init_thread.get();
    }
  } catch (const NeedsOneToManyHash& e) {
    VLOG(1) << "RHS/Inner hash join values detected to not be unique, falling back to "
               "One-to-Many hash layout.";
    // ...
    init_threads.clear();
    // ...
    CHECK_EQ(dev_buff_owners.size(), size_t(device_count_));
    // ...
    CHECK_EQ(columns_per_device.size(), size_t(device_count_));
    for (int device_id = 0; device_id < device_count_; ++device_id) {
      const auto chunk_key = genChunkKey(fragments_per_device[device_id],
                                         /* ... */);
      init_threads.push_back(std::async(std::launch::async,
                                        /* ... */,
                                        columns_per_device[device_id],
                                        /* ... */));
    }
    for (auto& init_thread : init_threads) {
      init_thread.wait();
    }
    for (auto& init_thread : init_threads) {
      init_thread.get();
    }
  }
  // ...
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    auto const cache_key = hashtable_cache_key_[device_id];
    // ...
    if (hash_table_ptr) {
      // ... (record hash_table_ptr->getLayout() in hash_table_layout_cache_,
      //      keyed by cache_key)
    }
  }
}
Data_Namespace::MemoryLevel PerfectJoinHashTable::getEffectiveMemoryLevel(
    const std::vector<InnerOuter>& inner_outer_pairs) const {
  // ...
}
ColumnsForDevice PerfectJoinHashTable::fetchColumnsForDevice(
    const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
    const int device_id,
    DeviceAllocator* dev_buff_owner) {
  std::vector<JoinColumn> join_columns;
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
  std::vector<JoinColumnTypeInfo> join_column_types;
  std::vector<JoinBucketInfo> join_bucket_info;
  std::vector<std::shared_ptr<void>> malloc_owner;
  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
  for (const auto& inner_outer_pair : inner_outer_pairs_) {
    const auto inner_col = inner_outer_pair.first;
    const auto inner_cd = get_column_descriptor_maybe(inner_col->getColumnKey());
    if (inner_cd && inner_cd->isVirtualCol) {
      // ... (joining on a virtual column is not supported)
    }
    join_columns.emplace_back(fetchJoinColumn(inner_col,
                                              fragments,
                                              effective_memory_level,
                                              device_id,
                                              chunks_owner,
                                              dev_buff_owner,
                                              malloc_owner,
                                              executor_,
                                              &column_cache_));
    const auto& ti = inner_col->get_type_info();
    // ... (record join_column_types and, where applicable, join_bucket_info)
  }
  return {join_columns, join_column_types, chunks_owner, join_bucket_info, malloc_owner};
}
void PerfectJoinHashTable::reifyForDevice(const ChunkKey& hash_table_key,
                                          const ColumnsForDevice& columns_for_device,
                                          const HashType layout,
                                          const int device_id,
                                          const logger::ThreadLocalIds thread_local_ids) {
  // ...
  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
  auto& join_column = columns_for_device.join_columns.front();
  if (layout == HashType::OneToOne) {
    const auto err = initHashTableForDevice(hash_table_key,
                                            join_column,
                                            inner_outer_pairs_.front(),
                                            layout,
                                            effective_memory_level,
                                            device_id);
    // ... (a non-zero error code triggers the One-to-Many fallback)
  } else {
    const auto err = initHashTableForDevice(hash_table_key,
                                            join_column,
                                            inner_outer_pairs_.front(),
                                            layout,
                                            effective_memory_level,
                                            device_id);
    if (err) {
      throw std::runtime_error("Unexpected error building one to many hash table: " +
                               std::to_string(err));
    }
  }
}
int PerfectJoinHashTable::initHashTableForDevice(
    const ChunkKey& chunk_key,
    const JoinColumn& join_column,
    const InnerOuter& cols,
    const HashType layout,
    const Data_Namespace::MemoryLevel effective_memory_level,
    const int device_id) {
  // ...
  const auto inner_col = cols.first;
  // ...
  const int32_t hash_join_invalid_val{-1};
  auto hashtable_layout = layout;
  auto allow_hashtable_recycling =
      HashtableRecycler::isSafeToCacheHashtable(table_id_to_node_map_,
                                                needs_dict_translation_,
                                                {inner_outer_string_op_infos_},
                                                inner_col->getTableKey());
  // ...
  auto const hash_table_size = hash_table_entry_info.computeHashTableSize();
  if (/* ... && */
      hash_table_size > executor_->maxGpuSlabSize()) {
    // ... (the table would not fit in a single GPU slab)
  }
  // ...
  CHECK(!chunk_key.empty());
  std::shared_ptr<PerfectHashTable> hash_table{nullptr};
  decltype(std::chrono::steady_clock::now()) ts1, ts2;
  ts1 = std::chrono::steady_clock::now();
  // ...
  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
    // ...
    if (hashtable_layout == HashType::OneToOne) {
      builder.initOneToOneHashTableOnCpu(join_column,
                                         /* ... */,
                                         hash_table_entry_info,
                                         hash_join_invalid_val,
                                         executor_);
    } else {
      builder.initOneToManyHashTableOnCpu(join_column,
                                          /* ... */,
                                          hash_table_entry_info,
                                          hash_join_invalid_val,
                                          executor_);
    }
    ts2 = std::chrono::steady_clock::now();
    auto const hashtable_build_time =
        std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count();
    // ...
    hash_table->setColumnNumElems(join_column.num_elems);
    if (allow_hashtable_recycling && hash_table &&
        /* ... */) {
      // ... (put the freshly built table into the CPU cache)
    }
    // ...
  } else {
    // GPU path
    // ...
    const auto& ti = inner_col->get_type_info();
    CHECK(ti.is_string());
    // ...
    builder.allocateDeviceMemory(/* ... */,
                                 hash_table_entry_info,
                                 /* ... */);
    // ...
    builder.initHashTableOnGpu(chunk_key,
                               /* ... */,
                               hash_table_entry_info,
                               /* ... */,
                               hash_join_invalid_val,
                               /* ... */);
    // ...
  }
  // ...
}
ChunkKey PerfectJoinHashTable::genChunkKey(
    const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
    const Analyzer::Expr* outer_col,
    const Analyzer::ColumnVar* inner_col) const {
  // ...
  const auto& column_key = inner_col->getColumnKey();
  ChunkKey chunk_key{column_key.db_id, column_key.table_id, column_key.column_id};
  // ...
  std::for_each(
      fragments.cbegin(), fragments.cend(), [&chunk_key](const auto& fragment) {
        // Collect all fragment ids so the hashtable cache key reflects the input.
        chunk_key.push_back(fragment.fragmentId);
      });
  const auto& ti = inner_col->get_type_info();
  if (ti.is_string()) {
    // ...
    size_t outer_elem_count =
        std::accumulate(outer_query_info.fragments.begin(),
                        outer_query_info.fragments.end(),
                        size_t(0),
                        [&chunk_key](size_t sum, const auto& fragment) {
                          chunk_key.push_back(fragment.fragmentId);
                          return sum + fragment.getNumTuples();
                        });
    chunk_key.push_back(outer_elem_count);
  }
  // ...
  return chunk_key;
}
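// Resulting key layout (a sketch based on the pushes above): {db_id, table_id,
// column_id, inner fragment ids..., [outer fragment ids..., outer element count]},
// where the bracketed suffix is appended only for dictionary-encoded string joins so
// that a change on the outer side invalidates the cached table.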
std::shared_ptr<PerfectHashTable> PerfectJoinHashTable::initHashTableOnCpuFromCache(
    QueryPlanHash key,
    CacheItemType item_type,
    DeviceIdentifier device_identifier) {
  // ...
  VLOG(1) << "Checking CPU hash table cache.";
  // ...
}
void PerfectJoinHashTable::putHashTableOnCpuToCache(
    QueryPlanHash key,
    CacheItemType item_type,
    std::shared_ptr<PerfectHashTable> hashtable_ptr,
    DeviceIdentifier device_identifier,
    size_t hashtable_building_time) {
  // ...
  CHECK(hashtable_ptr && !hashtable_ptr->getGpuBuffer());
  // ... (insert into hash_table_cache_, recording hashtable_building_time)
}
llvm::Value* PerfectJoinHashTable::codegenHashTableLoad(const size_t table_idx) {
  // ...
  if (hash_ptr->getType()->isIntegerTy(64)) {
    return hash_ptr;
  }
  CHECK(hash_ptr->getType()->isPointerTy());
  return executor_->cgen_state_->ir_builder_.CreatePtrToInt(
      hash_ptr,
      llvm::Type::getInt64Ty(executor_->cgen_state_->context_));
}
std::vector<llvm::Value*> PerfectJoinHashTable::getHashJoinArgs(
    llvm::Value* hash_ptr,
    llvm::Value* key_lv,
    const Analyzer::Expr* key_col,
    const int shard_count,
    const CompilationOptions& co) {
  // ...
  std::vector<llvm::Value*> hash_join_idx_args{
      hash_ptr,
      executor_->cgen_state_->castToTypeIn(key_lv, 64)
      /* ... (range minimum and maximum) */};
  if (shard_count) {
    const auto expected_hash_entry_count =
        get_hash_entry_count(col_range_, isBitwiseEq());
    const auto entry_count_per_shard =
        (expected_hash_entry_count + shard_count - 1) / shard_count;
    hash_join_idx_args.push_back(
        executor_->cgen_state_->llInt<uint32_t>(entry_count_per_shard));
    hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(shard_count));
    // ...
  }
  // ...
  if (!key_col_logical_ti.get_notnull() || isBitwiseEq()) {
    hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
        inline_fixed_encoding_null_val(key_col_logical_ti)));
  }
  auto special_date_bucketization_case = key_col_ti.get_type() == kDATE;
  // ...
  if (special_date_bucketization_case) {
    hash_join_idx_args.push_back(executor_->cgen_state_->llInt(/* ... */));
    // ...
  }
  hash_join_idx_args.push_back(/* ... */);
  // ...
  if (special_date_bucketization_case) {
    hash_join_idx_args.emplace_back(/* ... */);
  }
  return hash_join_idx_args;
}
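// The per-shard entry count above is a ceiling division. A minimal restatement
// (hypothetical standalone helper, plain integers):
#include <cstddef>

inline size_t ceil_div_sketch(size_t entries, size_t shard_count) {
  return (entries + shard_count - 1) / shard_count;  // e.g. (10 + 3 - 1) / 3 == 4
}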
HashJoinMatchingSet PerfectJoinHashTable::codegenMatchingSet(const CompilationOptions& co,
                                                             const size_t index) {
  // ...
  auto key_col = cols.second;
  CHECK(key_col);
  auto val_col = cols.first;
  CHECK(val_col);
  // ...
  if (key_col_var && val_col_var &&
      self_join_not_covered_by_left_deep_tree(key_col_var, val_col_var, /* ... */)) {
    throw std::runtime_error(
        "Query execution failed because the query contains an unsupported self-join "
        "pattern. We suspect the query requires multiple left-deep join trees due to "
        "the join condition of the self-join, which is not supported for now. Please "
        "consider rewriting the table order in the FROM clause.");
  }
  // ...
  auto hash_join_idx_args = getHashJoinArgs(pos_ptr, key_lv, key_col, shard_count, co);
  // ...
  const auto& key_col_ti = key_col->get_type_info();
  // ...
  auto bucketize = (key_col_ti.get_type() == kDATE);
  return HashJoin::codegenMatchingSet(hash_join_idx_args,
                                      /* ... */,
                                      !key_col_ti.get_notnull(),
                                      /* ... (including bucketize) */);
}
size_t PerfectJoinHashTable::getComponentBufferSize() const noexcept {
  // ... (each component slot holds a 32-bit row id)
  return hash_table->getEntryCount() * sizeof(int32_t);
}
void PerfectJoinHashTable::copyCpuHashTableToGpu(
    std::shared_ptr<PerfectHashTable>& cpu_hash_table,
    const PerfectHashTableEntryInfo hash_table_entry_info,
    const int device_id,
    Data_Namespace::DataMgr* data_mgr) {
  CHECK(cpu_hash_table);
  // ...
  gpu_builder.allocateDeviceMemory(/* ... */,
                                   hash_table_entry_info,
                                   /* ... */);
  std::shared_ptr<PerfectHashTable> gpu_hash_table = gpu_builder.getHashTable();
  CHECK(gpu_hash_table);
  auto gpu_buffer_ptr = gpu_hash_table->getGpuBuffer();
  if (gpu_buffer_ptr) {
    auto device_allocator = std::make_unique<CudaAllocator>(
        data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
    device_allocator->copyToDevice(gpu_buffer_ptr,
                                   cpu_hash_table->getCpuBuffer(),
                                   /* ... */);
  }
  // ...
}
std::string PerfectJoinHashTable::toString(const ExecutorDeviceType device_type,
                                           const int device_id,
                                           bool raw) const {
  // ...
#ifdef HAVE_CUDA
  std::unique_ptr<int8_t[]> buffer_copy;
  if (device_type == ExecutorDeviceType::GPU) {
    buffer_copy = std::make_unique<int8_t[]>(buffer_size);
    // ...
    auto data_mgr = executor_->getDataMgr();
    auto device_allocator = std::make_unique<CudaAllocator>(
        data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
    device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
  }
  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
#else
  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
#endif  // HAVE_CUDA
  // ...
  return HashTable::toString(/* ... */,
                             hash_table ? hash_table->getEntryCount() : 0,
                             /* ... */);
}
std::set<DecodedJoinHashBufferEntry> PerfectJoinHashTable::toSet(
    const ExecutorDeviceType device_type,
    const int device_id) const {
  // ...
#ifdef HAVE_CUDA
  std::unique_ptr<int8_t[]> buffer_copy;
  if (device_type == ExecutorDeviceType::GPU) {
    buffer_copy = std::make_unique<int8_t[]>(buffer_size);
    // ...
    auto data_mgr = executor_->getDataMgr();
    auto device_allocator = std::make_unique<CudaAllocator>(
        data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
    device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
  }
  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
#else
  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
#endif  // HAVE_CUDA
  // ...
  return HashTable::toSet(/* ... */,
                          hash_table ? hash_table->getEntryCount() : 0,
                          /* ... */);
}
llvm::Value* PerfectJoinHashTable::codegenSlot(const CompilationOptions& co,
                                               const size_t index) {
  using namespace std::string_literals;
  // ...
  const auto cols_and_string_op_infos =
      get_cols(qual_bin_oper_.get(), executor_->temporary_tables_);
  const auto& cols = cols_and_string_op_infos.first;
  const auto& inner_outer_string_op_infos = cols_and_string_op_infos.second;
  auto key_col = cols.second;
  CHECK(key_col);
  auto val_col = cols.first;
  CHECK(val_col);
  // ...
  if (key_col_var && val_col_var &&
      self_join_not_covered_by_left_deep_tree(key_col_var, val_col_var, /* ... */)) {
    throw std::runtime_error(
        "Query execution failed because the query contains an unsupported self-join "
        "pattern. We suspect the query requires multiple left-deep join trees due to "
        "the join condition of the self-join, which is not supported for now. Please "
        "consider changing the table order in the FROM clause.");
  }
  // ...
  auto key_lv = codegenColOrStringOper(
      key_col, inner_outer_string_op_infos.second, code_generator, co);
  // ...
  const auto hash_join_idx_args =
      getHashJoinArgs(hash_ptr, key_lv, key_col, shard_count, co);
  // ...
  std::string fname((key_col_ti.get_type() == kDATE) ? "bucketized_hash_join_idx"s
                                                     : "hash_join_idx"s);
  if (isBitwiseEq()) {
    fname += "_bitwise";
  }
  if (shard_count) {
    fname += "_sharded";
  }
  if (!isBitwiseEq() && !key_col_ti.get_notnull()) {
    fname += "_nullable";
  }
  return executor_->cgen_state_->emitCall(fname, hash_join_idx_args);
}
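// Example of the name assembly above (a sketch): a nullable, sharded DATE key under
// plain equality produces "bucketized_hash_join_idx_sharded_nullable", while a
// NOT NULL, unsharded integer key under kBW_EQ produces "hash_join_idx_bitwise".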
size_t get_entries_per_device(const size_t total_entries,
                              const size_t shard_count,
                              const size_t device_count,
                              const Data_Namespace::MemoryLevel memory_level) {
  const auto entries_per_shard =
      shard_count ? (total_entries + shard_count - 1) / shard_count : total_entries;
  size_t entries_per_device = entries_per_shard;
  if (memory_level == Data_Namespace::GPU_LEVEL && shard_count) {
    const auto shards_per_device = (shard_count + device_count - 1) / device_count;
    entries_per_device = entries_per_shard * shards_per_device;
  }
  return entries_per_device;
}
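// Worked example: total_entries = 100, shard_count = 6, device_count = 4 on GPU gives
// entries_per_shard = ceil(100 / 6) = 17 and shards_per_device = ceil(6 / 4) = 2, so
// each device reserves 17 * 2 = 34 entries; without sharding, every device needs all
// 100 entries.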