27 template <
typename SIZE,
31 const size_t entry_count,
32 const int32_t invalid_slot_val,
33 const bool for_semi_join,
34 const size_t key_component_count,
35 const bool with_val_slot,
36 const KEY_HANDLER* key_handler,
37 const size_t num_elems,
38 const int32_t cpu_thread_idx,
39 const int32_t cpu_thread_count) {
40 if constexpr (std::is_same<KEY_HANDLER, GenericKeyHandler>::value) {
51 }
else if constexpr (std::is_same<KEY_HANDLER, RangeKeyHandler>::value) {
63 std::is_same<KEY_HANDLER, BoundingBoxIntersectKeyHandler>::value,
64 "Only Generic, Bounding Box Intersect, and Range Key Handlers are supported.");
77 template <
typename SIZE,
81 const size_t entry_count,
82 const int32_t invalid_slot_val,
83 const bool for_semi_join,
84 const size_t key_component_count,
85 const bool with_val_slot,
86 const KEY_HANDLER* key_handler,
87 const size_t num_elems,
88 const int32_t cpu_thread_idx,
89 const int32_t cpu_thread_count) {
90 if constexpr (std::is_same<KEY_HANDLER, GenericKeyHandler>::value) {
101 }
else if constexpr (std::is_same<KEY_HANDLER, RangeKeyHandler>::value) {
113 std::is_same<KEY_HANDLER, BoundingBoxIntersectKeyHandler>::value,
114 "Only Generic, Bounding Box Intersection, and Range Key Handlers are supported.");
127 template <
typename SIZE,
131 const size_t entry_count,
132 const int32_t invalid_slot_val,
133 const bool for_semi_join,
134 const size_t key_component_count,
135 const bool with_val_slot,
137 const KEY_HANDLER* key_handler,
138 const size_t num_elems) {
139 if constexpr (std::is_same<KEY_HANDLER, GenericKeyHandler>::value) {
149 }
else if constexpr (std::is_same<KEY_HANDLER, RangeKeyHandler>::value) {
153 std::is_same<KEY_HANDLER, BoundingBoxIntersectKeyHandler>::value,
154 "Only Generic, Bounding Box Intersection, and Range Key Handlers are supported.");
155 LOG(
FATAL) <<
"32-bit keys not yet supported for bounding box intersect.";
159 template <
typename SIZE,
163 const size_t entry_count,
164 const int32_t invalid_slot_val,
165 const bool for_semi_join,
166 const size_t key_component_count,
167 const bool with_val_slot,
169 const KEY_HANDLER* key_handler,
170 const size_t num_elems) {
171 if constexpr (std::is_same<KEY_HANDLER, GenericKeyHandler>::value) {
181 }
else if constexpr (std::is_same<KEY_HANDLER, RangeKeyHandler>::value) {
192 std::is_same<KEY_HANDLER, BoundingBoxIntersectKeyHandler>::value,
193 "Only Generic, Bounding Box Intersect, and Range Key Handlers are supported.");
205 template <
typename SIZE,
209 const SIZE* composite_key_dict,
210 const size_t hash_entry_count,
211 const size_t key_component_count,
212 const KEY_HANDLER* key_handler,
213 const size_t num_elems,
214 const bool for_window_framing) {
215 if constexpr (std::is_same<KEY_HANDLER, GenericKeyHandler>::value) {
225 std::is_same<KEY_HANDLER, BoundingBoxIntersectKeyHandler>::value ||
226 std::is_same<KEY_HANDLER, RangeKeyHandler>::value,
227 "Only Generic, Bounding Box Intersection, and Range Key Handlers are supported.");
228 LOG(
FATAL) <<
"32-bit keys not yet supported for bounding box intersect.";
232 template <
typename SIZE,
236 const SIZE* composite_key_dict,
237 const size_t hash_entry_count,
238 const size_t key_component_count,
239 const KEY_HANDLER* key_handler,
240 const size_t num_elems,
241 const bool for_window_framing) {
242 if constexpr (std::is_same<KEY_HANDLER, GenericKeyHandler>::value) {
249 }
else if constexpr (std::is_same<KEY_HANDLER, RangeKeyHandler>::value) {
251 buff, composite_key_dict, hash_entry_count, key_handler, num_elems);
254 std::is_same<KEY_HANDLER, BoundingBoxIntersectKeyHandler>::value,
255 "Only Generic, Bounding Box Intersect, and Range Key Handlers are supported.");
257 buff, composite_key_dict, hash_entry_count, key_handler, num_elems);
264 template <
class KEY_HANDLER>
267 const std::vector<JoinColumn>& join_columns,
268 const std::vector<JoinColumnTypeInfo>& join_column_types,
269 const std::vector<JoinBucketInfo>& join_bucket_info,
271 str_proxy_translation_maps_ptrs_and_offsets,
274 const Executor* executor,
283 const bool for_semi_join =
289 if (hash_table_entry_info.getNumKeys() == 0) {
290 VLOG(1) <<
"Stop building a hash table: the input table is empty";
293 auto cpu_hash_table_ptr =
hash_table_->getCpuBuffer();
295 std::vector<std::future<void>> init_cpu_buff_threads;
297 auto timer_init =
DEBUG_TIMER(
"Initialize CPU Baseline Join Hash Table");
299 switch (hash_table_entry_info.getJoinKeysSize()) {
301 init_baseline_hash_join_buff_tbb_32(cpu_hash_table_ptr,
302 hash_table_entry_info.getNumHashEntries(),
303 hash_table_entry_info.getNumJoinKeys(),
308 init_baseline_hash_join_buff_tbb_64(cpu_hash_table_ptr,
309 hash_table_entry_info.getNumHashEntries(),
310 hash_table_entry_info.getNumJoinKeys(),
317 #else // #ifdef HAVE_TBB
318 for (
int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
319 init_cpu_buff_threads.emplace_back(
std::async(
321 [keyspace_entry_count,
331 switch (key_component_width) {
334 keyspace_entry_count,
343 keyspace_entry_count,
355 for (
auto& child : init_cpu_buff_threads) {
360 std::vector<std::future<int>> fill_cpu_buff_threads;
361 auto timer_fill =
DEBUG_TIMER(
"Fill CPU Baseline Join Hash Table");
362 for (
int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
363 fill_cpu_buff_threads.emplace_back(
std::async(
367 hash_table_entry_info,
376 switch (hash_table_entry_info.getJoinKeysSize()) {
378 return fill_baseline_hash_join_buff<int32_t>(
380 hash_table_entry_info.getNumHashEntries(),
383 hash_table_entry_info.getNumJoinKeys(),
386 join_columns[0].num_elems,
391 return fill_baseline_hash_join_buff<int64_t>(
393 hash_table_entry_info.getNumHashEntries(),
396 hash_table_entry_info.getNumJoinKeys(),
399 join_columns[0].num_elems,
405 << hash_table_entry_info.getJoinKeysSize();
411 for (
auto& child : fill_cpu_buff_threads) {
412 int partial_err = child.get();
421 auto one_to_many_buff =
reinterpret_cast<int32_t*
>(
422 cpu_hash_table_ptr + hash_table_entry_info.getNumHashEntries() *
423 hash_table_entry_info.computeKeySize());
425 auto timer_init_additional_buffers =
426 DEBUG_TIMER(
"Initialize Additional Buffers for CPU Baseline Join Hash Table");
428 one_to_many_buff, hash_table_entry_info.getNumHashEntries(), -1, 0, 1);
430 bool is_geo_compressed =
false;
431 if constexpr (std::is_same_v<KEY_HANDLER, RangeKeyHandler>) {
432 if (
const auto range_handler =
433 reinterpret_cast<const RangeKeyHandler*>(key_handler)) {
434 is_geo_compressed = range_handler->is_compressed_;
437 auto timer_fill_additional_buffers =
438 DEBUG_TIMER(
"Fill Additional Buffers for CPU Baseline Join Hash Table");
440 switch (hash_table_entry_info.getJoinKeysSize()) {
442 const auto composite_key_dict =
reinterpret_cast<int32_t*
>(cpu_hash_table_ptr);
446 hash_table_entry_info.getNumHashEntries(),
447 hash_table_entry_info.getNumJoinKeys(),
451 str_proxy_translation_maps_ptrs_and_offsets.first,
452 str_proxy_translation_maps_ptrs_and_offsets.second,
454 std::is_same_v<KEY_HANDLER, RangeKeyHandler>,
460 const auto composite_key_dict =
reinterpret_cast<int64_t*
>(cpu_hash_table_ptr);
464 hash_table_entry_info.getNumHashEntries(),
465 hash_table_entry_info.getNumJoinKeys(),
469 str_proxy_translation_maps_ptrs_and_offsets.first,
470 str_proxy_translation_maps_ptrs_and_offsets.second,
472 std::is_same_v<KEY_HANDLER, RangeKeyHandler>,
486 const Executor* executor,
494 if (hash_table_size > executor->maxGpuSlabSize()) {
505 template <
class KEY_HANDLER>
507 const std::vector<JoinColumn>& join_columns,
511 const Executor* executor,
519 if (hash_table_entry_info.
getNumKeys() == 0) {
520 VLOG(1) <<
"Stop building a hash table based on a column: an input table is empty";
523 auto data_mgr = executor->getDataMgr();
524 auto allocator = std::make_unique<CudaAllocator>(
526 auto dev_err_buff = allocator->alloc(
sizeof(
int));
527 allocator->copyToDevice(dev_err_buff, &err,
sizeof(err));
528 auto gpu_hash_table_buff =
hash_table_->getGpuBuffer();
529 CHECK(gpu_hash_table_buff);
530 const bool for_semi_join =
535 auto timer_init =
DEBUG_TIMER(
"Initialize GPU Baseline Join Hash Table");
557 auto timer_fill =
DEBUG_TIMER(
"Fill GPU Baseline Join Hash Table");
560 fill_baseline_hash_join_buff_on_device<int32_t>(
567 reinterpret_cast<int*
>(dev_err_buff),
569 join_columns.front().num_elems);
570 allocator->copyFromDevice(&err, dev_err_buff,
sizeof(err));
574 fill_baseline_hash_join_buff_on_device<int64_t>(
581 reinterpret_cast<int*
>(dev_err_buff),
583 join_columns.front().num_elems);
584 allocator->copyFromDevice(&err, dev_err_buff,
sizeof(err));
594 auto one_to_many_buff =
reinterpret_cast<int32_t*
>(
598 auto timer_init_additional_buf =
599 DEBUG_TIMER(
"Initialize Additional Buffer for GPU Baseline Join Hash Table");
604 auto timer_fill_additional_buf =
605 DEBUG_TIMER(
"Fill Additional Buffer for GPU Baseline Join Hash Table");
608 const auto composite_key_dict =
reinterpret_cast<int32_t*
>(gpu_hash_table_buff);
609 fill_one_to_many_baseline_hash_table_on_device<int32_t>(
615 join_columns.front().num_elems,
621 const auto composite_key_dict =
reinterpret_cast<int64_t*
>(gpu_hash_table_buff);
622 fill_one_to_many_baseline_hash_table_on_device<int64_t>(
628 join_columns.front().num_elems,
void fill_baseline_hash_join_buff_on_device(int8_t *hash_buff, const size_t entry_count, const int32_t invalid_slot_val, const bool for_semi_join, const size_t key_component_count, const bool with_val_slot, int *dev_err_buff, const KEY_HANDLER *key_handler, const size_t num_elems)
void init_baseline_hash_join_buff_32(int8_t *hash_join_buff, const int64_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void fill_one_to_many_baseline_hash_table_64(int32_t *buff, const int64_t *composite_key_dict, const int64_t hash_entry_count, const size_t key_component_count, const std::vector< JoinColumn > &join_column_per_key, const std::vector< JoinColumnTypeInfo > &type_info_per_key, const std::vector< JoinBucketInfo > &join_bucket_info, const std::vector< const int32_t * > &sd_inner_to_outer_translation_maps, const std::vector< int32_t > &sd_min_inner_elems, const int32_t cpu_thread_count, const bool is_range_join, const bool is_geo_compressed, const bool for_window_framing)
void fill_one_to_many_baseline_hash_table_on_device(int32_t *buff, const SIZE *composite_key_dict, const size_t hash_entry_count, const size_t key_component_count, const KEY_HANDLER *key_handler, const size_t num_elems, const bool for_window_framing)
size_t computeKeySize() const
T * transfer_flat_object_to_gpu(const T &object, DeviceAllocator &allocator)
void init_baseline_hash_join_buff_on_device_64(int8_t *hash_join_buff, const int64_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val)
void fill_baseline_hash_join_buff_on_device_32(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const bool for_semi_join, const size_t key_component_count, const bool with_val_slot, int *dev_err_buff, const GenericKeyHandler *key_handler, const int64_t num_elems)
void fill_one_to_many_baseline_hash_table_32(int32_t *buff, const int32_t *composite_key_dict, const int64_t hash_entry_count, const size_t key_component_count, const std::vector< JoinColumn > &join_column_per_key, const std::vector< JoinColumnTypeInfo > &type_info_per_key, const std::vector< JoinBucketInfo > &join_bucket_info, const std::vector< const int32_t * > &sd_inner_to_outer_translation_maps, const std::vector< int32_t > &sd_min_inner_elems, const int32_t cpu_thread_count, const bool is_range_join, const bool is_geo_compressed, const bool for_window_framing)
void range_fill_baseline_hash_join_buff_on_device_64(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, int *dev_err_buff, const RangeKeyHandler *key_handler, const size_t num_elems)
#define DEBUG_TIMER_NEW_THREAD(parent_thread_id)
BaselineJoinHashTableBuilder()=default
void init_baseline_hash_join_buff_64(int8_t *hash_join_buff, const int64_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
std::unique_ptr< BaselineHashTable > hash_table_
void allocateDeviceMemory(const BaselineHashTableEntryInfo hash_table_entry_info, const int device_id, const Executor *executor, const RegisteredQueryHint &query_hint)
int initHashTableOnGpu(KEY_HANDLER *key_handler, const std::vector< JoinColumn > &join_columns, const JoinType join_type, const BaselineHashTableEntryInfo hash_table_entry_info, const int device_id, const Executor *executor, const RegisteredQueryHint &query_hint)
size_t max_join_hash_table_size
size_t computeHashTableSize() const override
void setHashLayout(HashType layout)
void init_baseline_hash_join_buff_on_device_32(int8_t *hash_join_buff, const int64_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val)
future< Result > async(Fn &&fn, Args &&...args)
void fill_one_to_many_baseline_hash_table_on_device_32(int32_t *buff, const int32_t *composite_key_dict, const int64_t hash_entry_count, const size_t key_component_count, const GenericKeyHandler *key_handler, const int64_t num_elems, const bool for_window_framing)
HashType getHashTableLayout() const
size_t getNumHashEntries() const
void bbox_intersect_fill_one_to_many_baseline_hash_table_on_device_64(int32_t *buff, const int64_t *composite_key_dict, const int64_t hash_entry_count, const BoundingBoxIntersectKeyHandler *key_handler, const int64_t num_elems)
int bbox_intersect_fill_baseline_hash_join_buff_32(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, const BoundingBoxIntersectKeyHandler *key_handler, const int64_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
int bbox_intersect_fill_baseline_hash_join_buff_64(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, const BoundingBoxIntersectKeyHandler *key_handler, const int64_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void fill_one_to_many_baseline_hash_table_on_device_64(int32_t *buff, const int64_t *composite_key_dict, const int64_t hash_entry_count, const GenericKeyHandler *key_handler, const int64_t num_elems, const bool for_window_framing)
int fill_baseline_hash_join_buff_64(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const bool for_semi_join, const size_t key_component_count, const bool with_val_slot, const GenericKeyHandler *key_handler, const int64_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void init_hash_join_buff_on_device(int32_t *buff, const int64_t entry_count, const int32_t invalid_slot_val)
int fill_baseline_hash_join_buff(int8_t *hash_buff, const size_t entry_count, const int32_t invalid_slot_val, const bool for_semi_join, const size_t key_component_count, const bool with_val_slot, const KEY_HANDLER *key_handler, const size_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void range_fill_one_to_many_baseline_hash_table_on_device_64(int32_t *buff, const int64_t *composite_key_dict, const size_t hash_entry_count, const RangeKeyHandler *key_handler, const size_t num_elems)
size_t getNumKeys() const
int range_fill_baseline_hash_join_buff_64(int8_t *hash_buff, const size_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, const RangeKeyHandler *key_handler, const size_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
HashType getHashLayout() const
std::pair< std::vector< const int32_t * >, std::vector< int32_t >> StrProxyTranslationMapsPtrsAndOffsets
std::unique_ptr< BaselineHashTable > getHashTable()
int range_fill_baseline_hash_join_buff_32(int8_t *hash_buff, const size_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, const RangeKeyHandler *key_handler, const size_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
bool isHintRegistered(const QueryHint hint) const
int initHashTableOnCpu(KEY_HANDLER *key_handler, const CompositeKeyInfo &composite_key_info, const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const StrProxyTranslationMapsPtrsAndOffsets &str_proxy_translation_maps_ptrs_and_offsets, const BaselineHashTableEntryInfo hash_table_entry_info, const JoinType join_type, const Executor *executor, const RegisteredQueryHint &query_hint)
DEVICE void SUFFIX() init_hash_join_buff(int32_t *groups_buffer, const int64_t hash_entry_count, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
CUstream getQueryEngineCudaStreamForDevice(int device_num)
void bbox_intersect_fill_baseline_hash_join_buff_on_device_64(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, int *dev_err_buff, const BoundingBoxIntersectKeyHandler *key_handler, const int64_t num_elems)
#define DEBUG_TIMER(name)
size_t getNumJoinKeys() const
void fill_baseline_hash_join_buff_on_device_64(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const bool for_semi_join, const size_t key_component_count, const bool with_val_slot, int *dev_err_buff, const GenericKeyHandler *key_handler, const int64_t num_elems)
Allocate GPU memory using GpuBuffers via DataMgr.
size_t getJoinKeysSize() const
int fill_baseline_hash_join_buff_32(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const bool for_semi_join, const size_t key_component_count, const bool with_val_slot, const GenericKeyHandler *key_handler, const int64_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
ThreadLocalIds thread_local_ids()
static bool layoutRequiresAdditionalBuffers(HashType layout) noexcept