std::unique_ptr<BoundingBoxIntersectTuningParamRecycler>
    BoundingBoxIntersectJoinHashTable::auto_tuner_cache_ =
        std::make_unique<BoundingBoxIntersectTuningParamRecycler>();
std::shared_ptr<BoundingBoxIntersectJoinHashTable>
BoundingBoxIntersectJoinHashTable::getInstance(
    const std::shared_ptr<Analyzer::BinOper> condition,
    const std::vector<InputTableInfo>& query_infos,
    // ...
    const int device_count,
    // ...
  decltype(std::chrono::steady_clock::now()) ts1, ts2;
  auto copied_query_hints = query_hints;
  if (query_hints.force_one_to_many_hash_join) {
    LOG(INFO) << "Ignoring query hint \'force_one_to_many_hash_join\' for bounding box "
                 "intersection";
    copied_query_hints.force_one_to_many_hash_join = false;
  }
  if (query_hints.force_baseline_hash_join) {
    LOG(INFO) << "Ignoring query hint \'force_baseline_hash_join\' for bounding box "
                 "intersection";
    copied_query_hints.force_baseline_hash_join = false;
  }
  std::vector<InnerOuter> inner_outer_pairs;

  if (const auto range_expr =
          dynamic_cast<const Analyzer::RangeOper*>(condition->get_right_operand())) {
    // ... delegate to RangeJoinHashTable::getInstance(condition, range_expr, ...,
    //     hashtable_build_dag_map, ..., table_id_to_node_map) ...
  }
  // ...
  CHECK(!inner_outer_pairs.empty());

  const auto getHashTableType =
      [](const std::shared_ptr<Analyzer::BinOper> condition,
         const std::vector<InnerOuter>& inner_outer_pairs) -> HashType {
    // ...
    if (condition->is_bbox_intersect_oper()) {
      CHECK_EQ(inner_outer_pairs.size(), size_t(1));
      if (inner_outer_pairs[0].first->get_type_info().is_array() &&
          inner_outer_pairs[0].second->get_type_info().is_array() &&
          // ...
          inner_outer_pairs[0].second->get_type_info().get_size() == 32) {
        // ...
      }
    }
    // ...
  };

  const auto layout = getHashTableType(condition, inner_outer_pairs);
  // ... VLOG(1) << ...
          << " for qual: " << condition->toString();
  ts1 = std::chrono::steady_clock::now();
  const auto qi_0 = query_infos[0].info.getNumTuplesUpperBound();
  const auto qi_1 = query_infos[1].info.getNumTuplesUpperBound();
  VLOG(1) << "table_key = " << query_infos[0].table_key << " has " << qi_0
          << " tuples.";
  VLOG(1) << "table_key = " << query_infos[1].table_key << " has " << qi_1
          << " tuples.";
  const auto& query_info =
      get_inner_query_info(/* inner table key */, query_infos).info;
  // ...
  auto join_hash_table =
      std::make_shared<BoundingBoxIntersectJoinHashTable>(condition,
                                                          // ...
                                                          hashtable_build_dag_map,
                                                          table_id_to_node_map);
  try {
    join_hash_table->reify(layout);
  // ...
    throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
                                   "involved in bounding box intersection | ") +
                       e.what());
  // ...
    throw HashJoinFail(
        std::string("Could not build hash tables for bounding box intersection | "
                    "Inner table too big. Attempt manual table reordering "
                    "or create a single fragment inner table. | ") +
        e.what());
  // ...
  } catch (const std::exception& e) {
    throw HashJoinFail(
        std::string("Failed to build hash tables for bounding box intersection | ") +
        e.what());
  }
  // ...
  ts2 = std::chrono::steady_clock::now();
  // ... VLOG(1) << ...
          << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
  // ...
  return join_hash_table;
}
std::vector<double> correct_uninitialized_bucket_sizes_to_thresholds(
    const std::vector<double>& bucket_sizes,
    const std::vector<double>& bucket_thresholds,
    const double initial_value) {
  std::vector<double> corrected_bucket_sizes(bucket_sizes);
  for (size_t i = 0; i != bucket_sizes.size(); ++i) {
    if (bucket_sizes[i] == initial_value) {
      corrected_bucket_sizes[i] = bucket_thresholds[i];
    }
  }
  return corrected_bucket_sizes;
}
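// Bucket sizes that never moved off the initial value are snapped to the per-dimension
// threshold above, which also keeps the later 1 / bucket_size computation away from zero.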
std::vector<double> compute_bucket_sizes(
    const std::vector<double>& bucket_thresholds,
    const Data_Namespace::MemoryLevel effective_memory_level,
    const JoinColumn& join_column,
    const JoinColumnTypeInfo& join_column_type,
    const std::vector<InnerOuter>& inner_outer_pairs,
    const Executor* executor) {
  // ...
  CHECK_EQ(inner_outer_pairs.size(), 1u);
  // ...
  const auto col = inner_outer_pairs[0].first;
  // ...
  const auto col_ti = col->get_type_info();
  CHECK(col_ti.is_array());
  // ...
  const size_t num_dims{2};
  const double initial_bin_value{0.0};
  std::vector<double> bucket_sizes(num_dims, initial_bin_value);
  CHECK_EQ(bucket_thresholds.size(), num_dims);

  VLOG(1) << "Computing x and y bucket sizes for bounding box intersection with maximum "
          // ...
  // CPU path:
    compute_bucket_sizes_on_cpu(
        bucket_sizes, join_column, join_column_type, bucket_thresholds, thread_count);
  // GPU path:
    const int device_id = 0;
    auto data_mgr = executor->getDataMgr();
    // ...
    auto device_bucket_sizes_gpu = /* ... */;
    // ...
    auto device_bucket_thresholds_gpu = /* ... */;
    // ... compute_bucket_sizes_on_device(..., join_column_type_gpu,
    //                                    device_bucket_thresholds_gpu);
    allocator.copyFromDevice(reinterpret_cast<int8_t*>(bucket_sizes.data()),
                             reinterpret_cast<int8_t*>(device_bucket_sizes_gpu),
                             bucket_sizes.size() * sizeof(double));
  // ...
  const auto corrected_bucket_sizes = correct_uninitialized_bucket_sizes_to_thresholds(
      bucket_sizes, bucket_thresholds, initial_bin_value);

  VLOG(1) << "Computed x and y bucket sizes for bounding box intersection: ("
          << corrected_bucket_sizes[0] << ", " << corrected_bucket_sizes[1] << ")";

  return corrected_bucket_sizes;
}
  HashTableProps(const size_t entry_count,
                 const size_t emitted_keys_count,
                 const size_t hash_table_size,
                 const std::vector<double>& bucket_sizes)
      : entry_count(entry_count)
      , emitted_keys_count(emitted_keys_count)
      , keys_per_bin(entry_count == 0 ? std::numeric_limits<double>::max()
                                      : emitted_keys_count / (entry_count / 2.0))
      , hash_table_size(hash_table_size)
      , bucket_sizes(bucket_sizes) {}
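  // keys_per_bin above is the emitted key count divided by the number of bins
  // (entry_count / 2, since entry_count is allocated at twice the bucket-key count);
  // an entry_count of zero is mapped to the largest double.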
std::ostream& operator<<(std::ostream& os, const HashTableProps& props) {
  os << " entry_count: " << props.entry_count << ", emitted_keys "
     // ...
}
  TuningState(const size_t bbox_intersect_max_table_size_bytes,
              const double bbox_intersect_target_entries_per_bin)
      // ...
      , chosen_bbox_intersect_threshold(-1)
      // ...
      , crt_reverse_search_iteration(0)
      , bbox_intersect_max_table_size_bytes(bbox_intersect_max_table_size_bytes)
      , bbox_intersect_target_entries_per_bin(bbox_intersect_target_entries_per_bin) {}

  // ...
  const size_t max_reverse_search_iterations{8};
  bool operator()(const HashTableProps& new_props,
                  const bool new_bbox_intersect_threshold) {
    prev_props = crt_props;
    crt_props = new_props;
    // ...
    if (hashTableTooBig() || keysPerBinIncreasing()) {
      if (hashTableTooBig()) {
        VLOG(1) << "Reached hash table size limit: "
                << bbox_intersect_max_table_size_bytes << " with "
                << crt_props.hash_table_size << " byte hash table, "
                << crt_props.keys_per_bin << " keys per bin.";
      } else if (keysPerBinIncreasing()) {
        VLOG(1) << "Keys per bin increasing from " << prev_props.keys_per_bin << " to "
                << crt_props.keys_per_bin;
        CHECK(previousIterationValid());
      }
      if (previousIterationValid()) {
        VLOG(1) << "Using previous threshold value " << chosen_bbox_intersect_threshold;
        crt_props = prev_props;
        // ...
      } else {
        CHECK(hashTableTooBig());
        crt_reverse_search_iteration++;
        chosen_bbox_intersect_threshold = new_bbox_intersect_threshold;
        // ...
        if (crt_reverse_search_iteration == max_reverse_search_iterations) {
          VLOG(1) << "Hit maximum number (" << max_reverse_search_iterations
                  << ") of reverse tuning iterations. Aborting tuning";
          // ...
        }
        if (crt_reverse_search_iteration > 1 &&
            crt_props.hash_table_size == prev_props.hash_table_size) {
          // ...
          VLOG(1) << "Hash table size not decreasing (" << crt_props.hash_table_size
                  << " bytes) and still above maximum allowed size ("
                  << bbox_intersect_max_table_size_bytes << " bytes). Aborting tuning";
          // ...
        }
        if (crt_step == 1 && crt_reverse_search_iteration == 1) {
          VLOG(1)
              << "First iteration of tuning led to hash table size over "
                 "limit. Reversing search to try larger bin sizes (previous threshold: "
              << chosen_bbox_intersect_threshold << ")";
          // ...
          tuning_direction = TuningDirection::LARGER;
        }
        // ...
      }
    }
    // ...
    chosen_bbox_intersect_threshold = new_bbox_intersect_threshold;

    if (keysPerBinUnderThreshold()) {
      VLOG(1) << "Hash table reached size " << crt_props.hash_table_size
              << " with keys per bin " << crt_props.keys_per_bin << " under threshold "
              << bbox_intersect_target_entries_per_bin
              << ". Terminating bucket size loop.";
      // ...
    }

    if (crt_reverse_search_iteration > 0) {
      // ...
      VLOG(1) << "On reverse (larger tuning direction) search found workable "
              << " hash table size of " << crt_props.hash_table_size
              << " with keys per bin " << crt_props.keys_per_bin
              << ". Terminating bucket size loop.";
      // ...
    }
    // ...
  }

  bool hashTableTooBig() const {
    return crt_props.hash_table_size > bbox_intersect_max_table_size_bytes;
  }

  bool keysPerBinIncreasing() const {
    return crt_props.keys_per_bin > prev_props.keys_per_bin;
  }

  bool previousIterationValid() const {
    return tuning_direction == TuningDirection::SMALLER && crt_step > 1;
  }

  bool keysPerBinUnderThreshold() const {
    return crt_props.keys_per_bin < bbox_intersect_target_entries_per_bin;
  }
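  // Tuning in a nutshell (from the logic above): the default SMALLER direction keeps
  // shrinking bucket thresholds until the table would exceed
  // bbox_intersect_max_table_size_bytes or keys_per_bin stops improving; if the very
  // first step already overflows, the search reverses to LARGER bins for at most
  // max_reverse_search_iterations steps, and the loop ends early once keys_per_bin
  // drops under bbox_intersect_target_entries_per_bin.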
  BucketSizeTuner(const double bucket_threshold,
                  const double step,
                  const double min_threshold,
                  const Data_Namespace::MemoryLevel effective_memory_level,
                  const std::vector<ColumnsForDevice>& columns_per_device,
                  const std::vector<InnerOuter>& inner_outer_pairs,
                  const size_t table_tuple_count,
                  const Executor* executor)
      // ...
      , bucket_thresholds_(num_dims_, bucket_threshold)
      // ...
      , min_threshold_(min_threshold)
      , effective_memory_level_(effective_memory_level)
      , columns_per_device_(columns_per_device)
      , inner_outer_pairs_(inner_outer_pairs)
      , table_tuple_count_(table_tuple_count)
      // ...
    CHECK(!columns_per_device_.empty());
  }

  bool tuneOneStep(const TuningState::TuningDirection tuning_direction) {
    return tuneOneStep(tuning_direction, step_);
  }
  bool tuneOneStep(const TuningState::TuningDirection tuning_direction,
                   const double step_overide) {
    if (table_tuple_count_ == 0) {
      // ...
    }
    if (tuning_direction == TuningState::TuningDirection::SMALLER) {
      return tuneSmallerOneStep(step_overide);
    }
    return tuneLargerOneStep(step_overide);
  }

  auto getMinBucketSize() const {
    return *std::min_element(bucket_thresholds_.begin(), bucket_thresholds_.end());
  }

  // ...
  std::vector<double> getInverseBucketSizes() {
    if (num_steps_ == 0) {
      CHECK_EQ(current_bucket_sizes_.size(), static_cast<size_t>(0));
      current_bucket_sizes_ = computeBucketSizes();
    }
    CHECK_EQ(current_bucket_sizes_.size(), num_dims_);
    std::vector<double> inverse_bucket_sizes;
    for (const auto s : current_bucket_sizes_) {
      inverse_bucket_sizes.emplace_back(1.0 / s);
    }
    return inverse_bucket_sizes;
  }
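  // The tuner hands out 1 / bucket_size per dimension; the generated probe code later
  // multiplies coordinates by these inverse sizes (see the get_bucket_key_for_range_*
  // calls in codegenKey) to map a point to its integer bucket key.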
  bool bucketThresholdsBelowMinThreshold() const {
    for (const auto& t : bucket_thresholds_) {
      if (t < min_threshold_) {
        // ...
      }
    }
    // ...
  }

  std::vector<double> computeBucketSizes() const {
    if (table_tuple_count_ == 0) {
      return std::vector<double>(num_dims_, 0);
    }
    return compute_bucket_sizes(bucket_thresholds_,
                                effective_memory_level_,
                                columns_per_device_.front().join_columns[0],
                                columns_per_device_.front().join_column_types[0],
                                inner_outer_pairs_,
                                executor_);
  }

  bool tuneSmallerOneStep(const double step_overide) {
    if (!current_bucket_sizes_.empty()) {
      CHECK_EQ(current_bucket_sizes_.size(), bucket_thresholds_.size());
      bucket_thresholds_ = current_bucket_sizes_;
      for (auto& t : bucket_thresholds_) {
        // ...
      }
    }
    if (bucketThresholdsBelowMinThreshold()) {
      VLOG(1) << "Aborting tuning for bounding box intersection as at least one bucket "
                 "size is below min threshold";
      // ...
    }
    const auto next_bucket_sizes = computeBucketSizes();
    if (next_bucket_sizes == current_bucket_sizes_) {
      VLOG(1) << "Aborting tuning for bounding box intersection as bucket size is no "
              // ...
    }
    current_bucket_sizes_ = next_bucket_sizes;
    // ...
  }

  bool tuneLargerOneStep(const double step_overide) {
    if (!current_bucket_sizes_.empty()) {
      CHECK_EQ(current_bucket_sizes_.size(), bucket_thresholds_.size());
      bucket_thresholds_ = current_bucket_sizes_;
    }
    // ...
    for (auto& t : bucket_thresholds_) {
      // ...
    }
    // ...
    current_bucket_sizes_ = bucket_thresholds_;
    // ...
  }
  size_t num_steps_{0};
  // ...
};

std::ostream& operator<<(std::ostream& os, const BucketSizeTuner& tuner) {
  os << "Step Num: " << tuner.num_steps_ << ", Threshold: " << std::fixed << "("
     // ...
     << ", Step Size: " << std::fixed << tuner.step_ << ", Min: " << std::fixed
     // ...
}
void BoundingBoxIntersectJoinHashTable::reifyWithLayout(const HashType layout) {
  // ...
  const auto& query_info =
      // ...
  // ... VLOG(1) << ...
          << ", table_id: " << table_id;
  if (query_info.fragments.empty()) {
    // ...
  }

  std::optional<double> bbox_intersect_threshold_override;
  // ...
  auto skip_hashtable_caching = false;
  // ...
    VLOG(1) << "Setting bounding box intersection bucket threshold "
               "\'bbox_intersect_bucket_threshold\' via "
            // ...
  // ...
    std::ostringstream oss;
    oss << "User requests to change a threshold \'bbox_intersect_max_table_size_bytes\' "
        // ...
    if (!bbox_intersect_threshold_override.has_value()) {
      oss << ": " << bbox_intersect_max_table_size_bytes << " -> "
          // ...
    } else {
      oss << ", but is skipped since the query hint also changes the threshold "
             "\'bbox_intersect_bucket_threshold\'";
    }
    VLOG(1) << oss.str();
  // ...
    VLOG(1) << "User requests to skip caching join hashtable for bounding box "
               "intersection and its tuned "
               "parameters for this query";
    skip_hashtable_caching = true;
  // ...
    VLOG(1) << "User requests to change a threshold \'bbox_intersect_keys_per_bin\' via "
            // ...
            << bbox_intersect_target_entries_per_bin << " -> "
            // ...
  auto allow_gpu_hashtable_build =
      // ...
  if (allow_gpu_hashtable_build) {
    if (data_mgr->gpusPresent() &&
        /* ... */) {
      VLOG(1) << "A user forces to build GPU hash table for bounding box intersection";
    } else {
      allow_gpu_hashtable_build = false;
      VLOG(1) << "A user forces to build GPU hash table for bounding box intersection "
                 "but we skip it since either GPU is not presented or CPU execution mode "
              // ...
    }
  }

  std::vector<ColumnsForDevice> columns_per_device;
  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
  if (/* ... */ ||
      allow_gpu_hashtable_build) {
    for (int device_id = 0; device_id < device_count_; ++device_id) {
      dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(
          // ...
    }
  }

  std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>> fragments_per_device;
  // ...
  size_t total_num_tuples = 0;
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    fragments_per_device.emplace_back(
        // ...
            : query_info.fragments);
    const size_t crt_num_tuples =
        std::accumulate(fragments_per_device.back().begin(),
                        fragments_per_device.back().end(),
                        size_t(0),
                        [](const auto& sum, const auto& fragment) {
                          return sum + fragment.getNumTuples();
                        });
    total_num_tuples += crt_num_tuples;
    const auto columns_for_device =
        fetchColumnsForDevice(fragments_per_device[device_id],
                              device_id,
                              allow_gpu_hashtable_build
                                  ? dev_buff_owners[device_id].get()
                                  : nullptr);
    columns_per_device.push_back(columns_for_device);
  }
  auto hashtable_access_path_info =
      // ... getHashtableAccessPathInfo(..., fragments_per_device, ...)
  // ...
  table_keys_ = hashtable_access_path_info.table_keys;

  auto get_inner_table_key = [this]() {
    // ...
    return col_var->getTableKey();
  };

  // ...
  const auto& table_key = get_inner_table_key();
  // ...
  if (bbox_intersect_threshold_override) {
    // compute bucket sizes based on the user-provided threshold
    BucketSizeTuner tuner(*bbox_intersect_threshold_override,
                          // ...
    const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();

    auto [entry_count, emitted_keys_count] =
        computeHashTableCounts(/* ... */,
                               inverse_bucket_sizes,
                               /* ... */,
                               bbox_intersect_max_table_size_bytes,
                               *bbox_intersect_threshold_override);
    // ...
    generateCacheKey(/* ... */,
                     *bbox_intersect_threshold_override,
                     inverse_bucket_sizes,
                     fragments_per_device,
                     /* ... */);
    // ...
    reifyImpl(/* ... */,
              skip_hashtable_caching,
              bbox_intersect_max_table_size_bytes,
              *bbox_intersect_threshold_override);
  } else {
    double bbox_intersect_bucket_threshold = std::numeric_limits<double>::max();
    generateCacheKey(/* ... */,
                     bbox_intersect_bucket_threshold,
                     /* ... */,
                     fragments_per_device,
                     /* ... */);
    std::vector<size_t> per_device_chunk_key;
    if (/* ... */ &&
        get_inner_table_key().table_id > 0) {
      for (int device_id = 0; device_id < device_count_; ++device_id) {
        // ...
        per_device_chunk_key.push_back(chunk_key_hash);
        // ...
      }
      // ... build an AlternativeCacheKeyForBoundingBoxIntersection from, among others,
      //     columns_per_device.front().join_columns.front().num_elems,
      //     bbox_intersect_max_table_size_bytes and bbox_intersect_bucket_threshold ...
    }
    auto cached_bucket_threshold =
        auto_tuner_cache_->getItemFromCache(hashtable_cache_key_.front(),
                                            /* ... */);
    if (cached_bucket_threshold) {
      bbox_intersect_bucket_threshold = cached_bucket_threshold->bucket_threshold;
      auto inverse_bucket_sizes = cached_bucket_threshold->bucket_sizes;
      setBoundingBoxIntersectionMetaInfo(/* ... */,
                                         bbox_intersect_bucket_threshold,
                                         inverse_bucket_sizes);
      generateCacheKey(/* ... */,
                       bbox_intersect_bucket_threshold,
                       inverse_bucket_sizes,
                       fragments_per_device,
                       /* ... */);

      if (auto hash_table =
              /* ... */) {
        VLOG(1) << "Using cached hash table bucket size";
        // ...
        reifyImpl(/* ... */,
                  hash_table->getEntryCount(),
                  hash_table->getEmittedKeysCount(),
                  skip_hashtable_caching,
                  bbox_intersect_max_table_size_bytes,
                  bbox_intersect_bucket_threshold);
      } else {
        VLOG(1) << "Computing bucket size for cached bucket threshold";
        BucketSizeTuner tuner(bbox_intersect_bucket_threshold,
                              // ...
        const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();

        auto [entry_count, emitted_keys_count] =
            computeHashTableCounts(/* ... */,
                                   inverse_bucket_sizes,
                                   /* ... */,
                                   bbox_intersect_max_table_size_bytes,
                                   bbox_intersect_bucket_threshold);
        // ...
        generateCacheKey(/* ... */,
                         bbox_intersect_bucket_threshold,
                         inverse_bucket_sizes,
                         fragments_per_device,
                         /* ... */);
        // ...
        reifyImpl(/* ... */,
                  skip_hashtable_caching,
                  bbox_intersect_max_table_size_bytes,
                  bbox_intersect_bucket_threshold);
      }
    } else {
      BucketSizeTuner tuner(
          bbox_intersect_bucket_threshold,
          // ...
      VLOG(1) << "Running auto tune logic for bounding box intersection with parameters: "
              // ...
      TuningState tuning_state(bbox_intersect_max_table_size_bytes,
                               bbox_intersect_target_entries_per_bin);
      while (tuner.tuneOneStep(tuning_state.tuning_direction)) {
        const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();

        const auto [crt_entry_count, crt_emitted_keys_count] =
            computeHashTableCounts(/* ... */,
                                   inverse_bucket_sizes,
                                   /* ... */,
                                   tuning_state.bbox_intersect_max_table_size_bytes,
                                   tuning_state.chosen_bbox_intersect_threshold);
        const auto crt_hash_table_size = calculateHashTableSize(
            inverse_bucket_sizes.size(), crt_emitted_keys_count, crt_entry_count);
        HashTableProps crt_props(crt_entry_count,
                                 crt_emitted_keys_count,
                                 crt_hash_table_size,
                                 inverse_bucket_sizes);
        VLOG(1) << "Tuner output: " << tuner << " with properties " << crt_props;
        // ...
        const auto should_continue = tuning_state(crt_props, tuner.getMinBucketSize());
        setInverseBucketSizeInfo(
            tuning_state.crt_props.bucket_sizes, columns_per_device, device_count_);
        if (!should_continue) {
          break;
        }
      }

      const auto& crt_props = tuning_state.crt_props;
      // ...
      const size_t hash_table_size =
          calculateHashTableSize(/* ... */,
                                 crt_props.emitted_keys_count,
                                 crt_props.entry_count);
      CHECK_EQ(crt_props.hash_table_size, hash_table_size);

      if (/* ... */
          hash_table_size > bbox_intersect_max_table_size_bytes) {
        VLOG(1) << "Could not find suitable parameters to create hash "
                   "table for bounding box intersection under max allowed size ("
                << bbox_intersect_max_table_size_bytes << ") bytes.";
        // ...
      }

      VLOG(1) << "Final tuner output: " << tuner << " with properties " << crt_props;
      // ...
      VLOG(1) << "Final bucket sizes: ";
      for (/* ... */) {
        VLOG(1) << "dim[" << dim
                // ...
      }
      CHECK_GE(tuning_state.chosen_bbox_intersect_threshold, double(0));
      generateCacheKey(/* ... */,
                       tuning_state.chosen_bbox_intersect_threshold,
                       /* ... */,
                       fragments_per_device,
                       /* ... */);
      const auto candidate_auto_tuner_cache_key = hashtable_cache_key_.front();
      if (skip_hashtable_caching) {
        VLOG(1) << "Skip to add tuned parameters to auto tuner";
      } else {
        // ... put tuning_state.chosen_bbox_intersect_threshold (and the tuned bucket
        //     sizes) into auto_tuner_cache_ under candidate_auto_tuner_cache_key ...
      }
      bbox_intersect_bucket_threshold = tuning_state.chosen_bbox_intersect_threshold;
      reifyImpl(/* ... */,
                crt_props.entry_count,
                crt_props.emitted_keys_count,
                skip_hashtable_caching,
                bbox_intersect_max_table_size_bytes,
                bbox_intersect_bucket_threshold);
    }
  }
}
size_t BoundingBoxIntersectJoinHashTable::calculateHashTableSize(
    size_t number_of_dimensions,
    size_t emitted_keys_count,
    size_t entry_count) const {
  const auto key_component_width = getKeyComponentWidth();
  const auto key_component_count = number_of_dimensions;
  const auto entry_size = key_component_count * key_component_width;
  const auto keys_for_all_rows = emitted_keys_count;
  const size_t one_to_many_hash_entries = 2 * entry_count + keys_for_all_rows;
  const size_t hash_table_size =
      entry_size * entry_count + one_to_many_hash_entries * sizeof(int32_t);
  return hash_table_size;
}
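// Illustrative numbers (not from the source): with 8-byte key components, 2 dimensions,
// entry_count = 1000 and emitted_keys_count = 5000, the formula gives
// (2 * 8) * 1000 + (2 * 1000 + 5000) * sizeof(int32_t) = 16000 + 28000 = 44000 bytes.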
ColumnsForDevice BoundingBoxIntersectJoinHashTable::fetchColumnsForDevice(
    const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
    const int device_id,
    DeviceAllocator* dev_buff_owner) {
  // ...
  std::vector<JoinColumn> join_columns;
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
  std::vector<JoinColumnTypeInfo> join_column_types;
  std::vector<std::shared_ptr<void>> malloc_owner;
  for (const auto& inner_outer_pair : inner_outer_pairs_) {
    const auto inner_col = inner_outer_pair.first;
    // ...
    if (inner_cd && inner_cd->isVirtualCol) {
      // ...
    }
    // ... fetchJoinColumn(..., effective_memory_level, ...) ...
    const auto& ti = inner_col->get_type_info();
    // ... record the JoinColumnTypeInfo, using inline_int_null_value<int64_t>(), ...
    // ...
        << "Bounding box intersection currently only supported for arrays.";
  }
  return {join_columns, join_column_types, chunks_owner, {}, malloc_owner};
}
std::pair<size_t, size_t> BoundingBoxIntersectJoinHashTable::computeHashTableCounts(
    const size_t shard_count,
    const std::vector<double>& inverse_bucket_sizes_for_dimension,
    std::vector<ColumnsForDevice>& columns_per_device,
    const size_t chosen_max_hashtable_size,
    const double chosen_bucket_threshold) {
  CHECK(!inverse_bucket_sizes_for_dimension.empty());
  const auto [tuple_count, emitted_keys_count] =
      approximateTupleCount(inverse_bucket_sizes_for_dimension,
                            columns_per_device,
                            chosen_max_hashtable_size,
                            chosen_bucket_threshold);
  const auto entry_count = 2 * std::max(tuple_count, size_t(1));

  return std::make_pair(
      get_entries_per_device(entry_count, shard_count, device_count_, memory_level_),
      emitted_keys_count);
}
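// The entry count is twice the approximate distinct bucket-key count (floored at one),
// which roughly targets a half-full baseline table, and is then spread across
// shards and devices according to the memory level.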
std::pair<size_t, size_t> BoundingBoxIntersectJoinHashTable::approximateTupleCount(
    const std::vector<double>& inverse_bucket_sizes_for_dimension,
    std::vector<ColumnsForDevice>& columns_per_device,
    const size_t chosen_max_hashtable_size,
    const double chosen_bucket_threshold) {
  // ...
  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();

  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
  if (columns_per_device.front().join_columns.front().num_elems == 0) {
    return std::make_pair(0, 0);
  }
  // ...
  for (size_t device_id = 0; device_id < columns_per_device.size(); ++device_id) {
    auto& columns_for_device = columns_per_device[device_id];
    columns_for_device.setBucketInfo(inverse_bucket_sizes_for_dimension,
                                     /* ... */);
  }

  CHECK_EQ(columns_per_device.front().join_columns.size(),
           columns_per_device.front().join_buckets.size());
  // ...
  const auto cached_count_info =
      getApproximateTupleCountFromCache(/* ... */);
  if (cached_count_info) {
    VLOG(1) << "Using a cached tuple count: " << cached_count_info->first
            << ", emitted keys count: " << cached_count_info->second;
    return *cached_count_info;
  }
  // ...
  std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
  auto hll_result = &hll_buffer_all_cpus[0];

  std::vector<int32_t> num_keys_for_row;
  // ...
  num_keys_for_row.resize(columns_per_device.front().join_columns[0].num_elems);

  approximate_distinct_tuples_bbox_intersect(/* ... */,
                                             count_distinct_desc.bitmap_sz_bits,
                                             /* ... */,
                                             columns_per_device.front().join_columns,
                                             columns_per_device.front().join_column_types,
                                             columns_per_device.front().join_buckets,
                                             /* ... */);
  for (int i = 1; i < thread_count; ++i) {
    hll_unify(hll_result,
              hll_result + i * padded_size_bytes,
              size_t(1) << count_distinct_desc.bitmap_sz_bits);
  }
  return std::make_pair(
      hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
      static_cast<size_t>(num_keys_for_row.size() > 0 ? num_keys_for_row.back() : 0));
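  // Above (CPU path): the distinct bucket-key count comes from a HyperLogLog estimate
  // over the per-thread sketch buffers, and the emitted-key total is read from the last
  // element of num_keys_for_row, which appears to hold a running (inclusive) per-row
  // key count.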
  auto data_mgr = executor_->getDataMgr();
  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count_);
  for (auto& host_hll_buffer : host_hll_buffers) {
    host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
  }
  std::vector<size_t> emitted_keys_count_device_threads(device_count_, 0);
  std::vector<std::future<void>> approximate_distinct_device_threads;
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    approximate_distinct_device_threads.emplace_back(std::async(
        // ...
        &columns_per_device,
        &count_distinct_desc,
        // ...
        &emitted_keys_count_device_threads] {
          auto allocator = std::make_unique<CudaAllocator>(
              // ...
          auto device_hll_buffer =
              allocator->alloc(count_distinct_desc.bitmapPaddedSizeBytes());
          data_mgr->getCudaMgr()->zeroDeviceMem(
              // ...
              count_distinct_desc.bitmapPaddedSizeBytes(),
              // ...
          const auto& columns_for_device = columns_per_device[device_id];
          // ... transfer columns_for_device.join_columns to the device via *allocator ...
          CHECK_GT(columns_for_device.join_buckets.size(), 0u);
          const auto& inverse_bucket_sizes_for_dimension =
              columns_for_device.join_buckets[0].inverse_bucket_sizes_for_dimension;
          auto inverse_bucket_sizes_gpu = allocator->alloc(
              inverse_bucket_sizes_for_dimension.size() * sizeof(double));
          allocator->copyToDevice(
              inverse_bucket_sizes_gpu,
              inverse_bucket_sizes_for_dimension.data(),
              inverse_bucket_sizes_for_dimension.size() * sizeof(double));
          const size_t row_counts_buffer_sz =
              columns_per_device.front().join_columns[0].num_elems * sizeof(int32_t);
          auto row_counts_buffer = allocator->alloc(row_counts_buffer_sz);
          data_mgr->getCudaMgr()->zeroDeviceMem(
              // ...
              row_counts_buffer_sz,
              // ...
          // ... build a key handler from inverse_bucket_sizes_for_dimension.size() and
          //     reinterpret_cast<double*>(inverse_bucket_sizes_gpu) ...
          const auto key_handler_gpu =
              // ...
          approximate_distinct_tuples_on_device_bbox_intersect(
              reinterpret_cast<uint8_t*>(device_hll_buffer),
              count_distinct_desc.bitmap_sz_bits,
              reinterpret_cast<int32_t*>(row_counts_buffer),
              key_handler_gpu,
              columns_for_device.join_columns[0].num_elems);
          // ...
          auto& host_emitted_keys_count = emitted_keys_count_device_threads[device_id];
          allocator->copyFromDevice(
              &host_emitted_keys_count,
              // ...
              (columns_per_device.front().join_columns[0].num_elems - 1) *
                  // ...
          auto& host_hll_buffer = host_hll_buffers[device_id];
          allocator->copyFromDevice(&host_hll_buffer[0],
                                    // ...
                                    count_distinct_desc.bitmapPaddedSizeBytes());
        }));
  }
  for (auto& child : approximate_distinct_device_threads) {
    child.get();
  }
  // ...
  auto& result_hll_buffer = host_hll_buffers.front();
  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
  for (int device_id = 1; device_id < device_count_; ++device_id) {
    auto& host_hll_buffer = host_hll_buffers[device_id];
    hll_unify(hll_result,
              reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
              size_t(1) << count_distinct_desc.bitmap_sz_bits);
  }
  const size_t emitted_keys_count =
      std::accumulate(emitted_keys_count_device_threads.begin(),
                      emitted_keys_count_device_threads.end(),
                      size_t(0));
  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
                        emitted_keys_count);
}
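// Above (GPU path): every device builds its own HLL sketch and per-row key counts; the
// host merges the sketches with hll_unify (effectively a register-wise max) and sums the
// per-device emitted-key counts before estimating the table size.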
void BoundingBoxIntersectJoinHashTable::setInverseBucketSizeInfo(
    const std::vector<double>& inverse_bucket_sizes,
    std::vector<ColumnsForDevice>& columns_per_device,
    const size_t device_count) {
  // ...
  CHECK_EQ(columns_per_device.size(), static_cast<size_t>(device_count));
  for (size_t device_id = 0; device_id < device_count; ++device_id) {
    auto& columns_for_device = columns_per_device[device_id];
    // ...
  }
}

// ...
  } catch (const std::exception& e) {
    VLOG(1) << "Caught exception while building baseline hash table for bounding box "
            // ...
void BoundingBoxIntersectJoinHashTable::reifyImpl(
    std::vector<ColumnsForDevice>& columns_per_device,
    const Fragmenter_Namespace::TableInfo& query_info,
    const HashType layout,
    const size_t shard_count,
    const size_t entry_count,
    const size_t emitted_keys_count,
    const bool skip_hashtable_caching,
    const size_t chosen_max_hashtable_size,
    const double chosen_bucket_threshold) {
  std::vector<std::future<void>> init_threads;
  // ...
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    const auto fragments =
        // ...
    // ... launch reifyForDevice asynchronously with, among others,
    //     columns_per_device[device_id] and skip_hashtable_caching ...
  }
  for (auto& init_thread : init_threads) {
    init_thread.wait();
  }
  for (auto& init_thread : init_threads) {
    init_thread.get();
  }
}
void BoundingBoxIntersectJoinHashTable::reifyForDevice(
    const ColumnsForDevice& columns_for_device,
    const HashType layout,
    const size_t entry_count,
    const size_t emitted_keys_count,
    const bool skip_hashtable_caching,
    const int device_id,
    const logger::ThreadLocalIds parent_thread_local_ids) {
  // ...
    VLOG(1) << "Building join hash table for bounding box intersection on CPU.";
    auto hash_table = initHashTableOnCpu(columns_for_device.join_columns,
                                         columns_for_device.join_column_types,
                                         columns_for_device.join_buckets,
                                         hash_table_entry_info,
                                         skip_hashtable_caching);
    // ...
      auto gpu_hash_table = copyCpuHashTableToGpu(hash_table, device_id);
    // ...
  // ...
    auto hash_table = initHashTableOnGpu(columns_for_device.join_columns,
                                         // ...
                                         hash_table_entry_info,
                                         // ...
std::shared_ptr<BaselineHashTable> BoundingBoxIntersectJoinHashTable::initHashTableOnCpu(
    const std::vector<JoinColumn>& join_columns,
    const std::vector<JoinColumnTypeInfo>& join_column_types,
    const std::vector<JoinBucketInfo>& join_bucket_info,
    const BaselineHashTableEntryInfo hash_table_entry_info,
    const bool skip_hashtable_caching) {
  // ...
  decltype(std::chrono::steady_clock::now()) ts1, ts2;
  ts1 = std::chrono::steady_clock::now();
  CHECK(!join_columns.empty());
  CHECK(!join_bucket_info.empty());
  // ...
  if (auto generic_hash_table =
          // ... initHashTableOnCpuFromCache(...)) {
    if (auto hash_table =
            std::dynamic_pointer_cast<BaselineHashTable>(generic_hash_table)) {
      VLOG(1) << "Using cached CPU hash table for initialization.";
      // ...
      if (hash_table_layout == hash_table->getLayout()) {
        // ...
      }
    }
  }
  // ...
  const auto key_component_count =
      join_bucket_info[0].inverse_bucket_sizes_for_dimension.size();
  // ... set up the key handler with key_component_count and
  //     join_bucket_info[0].inverse_bucket_sizes_for_dimension.data() ...
  const StrProxyTranslationMapsPtrsAndOffsets
      dummy_str_proxy_translation_maps_ptrs_and_offsets;
  // ... builder.initHashTableOnCpu(...,
  //                                dummy_str_proxy_translation_maps_ptrs_and_offsets,
  //                                hash_table_entry_info,
  //                                ...);
  ts2 = std::chrono::steady_clock::now();
  // ...
    throw HashJoinFail(std::string("Unrecognized error when initializing CPU hash table "
                                   "for bounding box intersection (") +
                       // ...
  // ...
  std::shared_ptr<BaselineHashTable> hash_table = builder.getHashTable();
  if (skip_hashtable_caching) {
    VLOG(1) << "Skip to cache join hashtable for bounding box intersection";
  } else {
    auto hashtable_build_time =
        std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count();
    // ... cache the table, recording hashtable_build_time ...
  }
  // ...
std::shared_ptr<BaselineHashTable> BoundingBoxIntersectJoinHashTable::initHashTableOnGpu(
    const std::vector<JoinColumn>& join_columns,
    const std::vector<JoinColumnTypeInfo>& join_column_types,
    const std::vector<JoinBucketInfo>& join_bucket_info,
    const BaselineHashTableEntryInfo hash_table_entry_info,
    const size_t device_id) {
  // ...
  VLOG(1) << "Building join hash table for bounding box intersection on GPU.";
  // ...
  auto data_mgr = executor_->getDataMgr();
  // ...
  CHECK(!join_bucket_info.empty());
  auto& inverse_bucket_sizes_for_dimension =
      join_bucket_info[0].inverse_bucket_sizes_for_dimension;
  auto inverse_bucket_sizes_gpu = transfer_vector_of_flat_objects_to_gpu(
      inverse_bucket_sizes_for_dimension, allocator);
  const auto key_handler =
      BoundingBoxIntersectKeyHandler(/* ... */,
                                     /* ... */,
                                     inverse_bucket_sizes_gpu);
  // ... builder.initHashTableOnGpu(..., hash_table_entry_info, ...);
  // ...
    throw HashJoinFail(std::string("Unrecognized error when initializing GPU hash table "
                                   "for bounding box intersection (") +
                       // ...
std::shared_ptr<BaselineHashTable>
BoundingBoxIntersectJoinHashTable::copyCpuHashTableToGpu(
    std::shared_ptr<BaselineHashTable>& cpu_hash_table,
    const size_t device_id) {
  // ...
  auto data_mgr = executor_->getDataMgr();
  // ...
  std::shared_ptr<BaselineHashTable> gpu_hash_table = gpu_builder.getHashTable();
  CHECK(gpu_hash_table);
  auto gpu_buffer_ptr = gpu_hash_table->getGpuBuffer();
  CHECK(gpu_buffer_ptr);
  // ...
  auto device_allocator = std::make_unique<CudaAllocator>(
      // ...
  device_allocator->copyToDevice(
      gpu_buffer_ptr,
      cpu_hash_table->getCpuBuffer(),
      // ...
  return gpu_hash_table;
}
#define LL_CONTEXT executor_->cgen_state_->context_
#define LL_BUILDER executor_->cgen_state_->ir_builder_
#define LL_INT(v) executor_->cgen_state_->llInt(v)
#define LL_FP(v) executor_->cgen_state_->llFp(v)
#define ROW_FUNC executor_->cgen_state_->row_func_
llvm::Value* BoundingBoxIntersectJoinHashTable::codegenKey(const CompilationOptions& co) {
  // ...
  CHECK(key_component_width == 4 || key_component_width == 8);
  // ...
  llvm::Value* key_buff_lv{nullptr};
  switch (key_component_width) {
    // ...
  }
  // ...
  const auto outer_geo = inner_outer_pair.second;
  const auto outer_geo_ti = outer_geo->get_type_info();
  // ...
  llvm::Value* arr_ptr = nullptr;
  // ...
  if (outer_geo_ti.is_geometry()) {
    // ...
    if (const auto outer_geo_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_geo)) {
      const auto outer_geo_col_lvs = code_generator.codegen(outer_geo_col, true, co);
      CHECK_EQ(outer_geo_col_lvs.size(), size_t(1));
      auto column_key = outer_geo_col->getColumnKey();
      column_key.column_id = column_key.column_id + 1;
      // ...
      const auto array_ptr = executor_->cgen_state_->emitExternalCall(
          // ...
          llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
          {outer_geo_col_lvs.front(), code_generator.posArg(outer_geo_col)});
      CHECK(coords_cd->columnType.get_elem_type().get_type() == kTINYINT)
          << "Bounding box intersection only supports TINYINT coordinates columns.";
      arr_ptr = code_generator.castArrayPointer(array_ptr,
                                                coords_cd->columnType.get_elem_type());
    } else if (const auto outer_geo_function_operator =
                   dynamic_cast<const Analyzer::GeoOperator*>(outer_geo)) {
      // ...
      const auto outer_geo_function_operator_lvs =
          code_generator.codegen(outer_geo_function_operator, true, co);
      CHECK_EQ(outer_geo_function_operator_lvs.size(), size_t(2));
      arr_ptr = outer_geo_function_operator_lvs.front();
    } else if (const auto outer_geo_expr =
                   dynamic_cast<const Analyzer::GeoExpr*>(outer_geo)) {
      // ...
    }
  } else if (outer_geo_ti.is_fixlen_array()) {
    // ...
    const auto outer_geo_cast_coord_array =
        // ...
    // ...
            outer_geo_cast_coord_array->get_operand());
    CHECK(outer_geo_coord_array);
    CHECK(outer_geo_coord_array->isLocalAlloc());
    CHECK_EQ(outer_geo_coord_array->getElementCount(), 2);
    // ...
    CHECK_EQ(outer_geo_ti.get_size(), int(2 * elem_size));
    const auto outer_geo_constructed_lvs = code_generator.codegen(outer_geo, true, co);
    // ...
    const auto array_ptr = outer_geo_constructed_lvs.front();
    // ... LL_BUILDER.CreateGEP(
    //         array_ptr->getType()->getScalarType()->getPointerElementType(),
    //         ...);
  }
  // ...
      << "Bounding box intersection currently only supports geospatial columns and "
         "constructed points.";

  for (size_t i = 0; i < 2; i++) {
    const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(
        key_buff_lv->getType()->getScalarType()->getPointerElementType(),
        // ...
    // ...
    const auto col_lv =
        /* ... */
            ? executor_->cgen_state_->emitExternalCall(
                  "get_bucket_key_for_range_compressed",
                  // ...
                  {arr_ptr, LL_INT(i), LL_FP(inverse_bucket_sizes_for_dimension_[i])})
            : executor_->cgen_state_->emitExternalCall(
                  "get_bucket_key_for_range_double",
                  // ...
                  {arr_ptr, LL_INT(i), LL_FP(inverse_bucket_sizes_for_dimension_[i])});
    LL_BUILDER.CreateStore(col_lv, key_comp_dest_lv);
  }
  return key_buff_lv;
}
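// codegenKey above emits one bucket-key component per dimension: the generated code
// calls get_bucket_key_for_range_compressed or get_bucket_key_for_range_double with the
// coordinate array, the dimension index, and the inverse bucket size, then stores the
// result into the key buffer used to probe the baseline hash table.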
std::vector<llvm::Value*> BoundingBoxIntersectJoinHashTable::codegenManyKey(
    const CompilationOptions& co) {
  // ...
  CHECK(key_component_width == 4 || key_component_width == 8);
  // ...
  VLOG(1) << "Performing codegen for ManyToMany";
  // ...
  const auto outer_col = inner_outer_pair.second;
  // ...
  const auto col_lvs = code_generator.codegen(outer_col, true, co);
  CHECK_EQ(col_lvs.size(), size_t(1));
  // ...
  CHECK(outer_col_var);
  const auto coords_cd =
      // ...
  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
      // ...
      llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
      {col_lvs.front(), code_generator.posArg(outer_col)});
  // ...
  array_ptr->setName("array_ptr");
  // ...
  auto num_keys_lv = executor_->cgen_state_->emitExternalCall(
      "get_num_buckets_for_bounds",
      // ...
      {/* ... */,
       LL_FP(inverse_bucket_sizes_for_dimension_[0]),
       LL_FP(inverse_bucket_sizes_for_dimension_[1])});
  num_keys_lv->setName("num_keys_lv");
  // ...
  return {num_keys_lv, array_ptr};
}
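// For the ManyToMany layout the probe key is not a single bucket: codegenManyKey returns
// {num_keys_lv, array_ptr}, i.e. how many buckets the outer bounding box spans
// (via get_num_buckets_for_bounds) plus a pointer to its coordinates; the matching-set
// codegen below walks those candidate buckets.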
HashJoinMatchingSet BoundingBoxIntersectJoinHashTable::codegenMatchingSet(
    const CompilationOptions& co,
    const size_t index) {
  if (/* ManyToMany layout */) {
    VLOG(1) << "Building codegenMatchingSet for ManyToMany";
    // ...
    CHECK(key_component_width == 4 || key_component_width == 8);
    // ...
    const auto composite_dict_ptr_type =
        llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
    const auto composite_key_dict =
        hash_ptr->getType()->isPointerTy()
            ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
            : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
    // ...
    auto one_to_many_ptr = hash_ptr;
    // ...
    if (one_to_many_ptr->getType()->isPointerTy()) {
      // ...
    } else {
      CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
    }
    // ...
    const auto out_arr_lv = LL_BUILDER.CreateAlloca(arr_type);
    out_arr_lv->setName("out_arr");
    // ...
    const auto casted_out_arr_lv =
        LL_BUILDER.CreatePointerCast(out_arr_lv, arr_type->getPointerTo());
    // ...
    const auto element_ptr = LL_BUILDER.CreateGEP(arr_type, casted_out_arr_lv, LL_INT(0));
    // ...
    auto rowid_ptr_i32 =
        // ...
    const auto error_code_ptr = LL_BUILDER.CreateAlloca(
        // ...
    const auto candidate_count_lv = executor_->cgen_state_->emitExternalCall(
        "get_candidate_rows",
        // ...
        {/* ... */,
         many_to_many_args[1],
         // ...
         many_to_many_args[0],
         LL_INT(key_component_count),
         // ...
         LL_INT(composite_key_dict_size),
         // ...
         LL_INT(int32_t(heavyai::ErrorCode::BBOX_OVERLAPS_LIMIT_EXCEEDED))});
    // ...
    const auto slot_lv = LL_INT(int64_t(0));
    // ...
    const auto error_code_lv = LL_BUILDER.CreateLoad(
        error_code_ptr->getType()->getPointerElementType(), error_code_ptr);
    return {rowid_ptr_i32, candidate_count_lv, slot_lv, error_code_lv};
  }
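  // The generated get_candidate_rows call above collects candidate row ids for every
  // bucket the outer bounding box overlaps into the stack-allocated out_arr and reports
  // BBOX_OVERLAPS_LIMIT_EXCEEDED through error_code_ptr when the candidate limit
  // (presumably kMaxBBoxOverlapsCount) is hit.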
  VLOG(1) << "Building codegenMatchingSet for Baseline";
  // ...
  CHECK(key_component_width == 4 || key_component_width == 8);
  // ...
  const auto composite_dict_ptr_type =
      llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
  const auto composite_key_dict =
      hash_ptr->getType()->isPointerTy()
          ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
          : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
  // ...
  const auto key = executor_->cgen_state_->emitExternalCall(
      "get_composite_key_index_" + std::to_string(key_component_width * 8),
      // ...
      LL_INT(key_component_count),
      // ...
  auto one_to_many_ptr = hash_ptr;
  if (one_to_many_ptr->getType()->isPointerTy()) {
    // ...
  } else {
    CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
  }
  // ...
      std::vector<llvm::Value*>{
          // ...
}
std::string BoundingBoxIntersectJoinHashTable::toString(
    const ExecutorDeviceType device_type,
    const int device_id,
    bool raw) const {
  // ...
  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
  // ...
  std::unique_ptr<int8_t[]> buffer_copy;
  // ...
    buffer_copy = std::make_unique<int8_t[]>(buffer_size);
    // ...
    auto data_mgr = executor_->getDataMgr();
    auto device_allocator = std::make_unique<CudaAllocator>(
        // ...
    device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
  // ...
  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
  // ...
  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
  // ...
                             hash_table->getEntryCount(),
  // ...
}

DecodedJoinHashBufferSet BoundingBoxIntersectJoinHashTable::toSet(
    const ExecutorDeviceType device_type,
    const int device_id) const {
  // ...
  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
  // ...
  std::unique_ptr<int8_t[]> buffer_copy;
  // ...
    buffer_copy = std::make_unique<int8_t[]>(buffer_size);
    // ...
    auto data_mgr = executor_->getDataMgr();
    auto allocator = std::make_unique<CudaAllocator>(
        // ...
    allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
  // ...
  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
  // ...
  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
  // ...
                          hash_table->getEntryCount(),
  // ...
}
Data_Namespace::MemoryLevel BoundingBoxIntersectJoinHashTable::getEffectiveMemoryLevel(
    const std::vector<InnerOuter>& inner_outer_pairs) const {
  // ...
      this->executor_->getDataMgr()->gpusPresent() &&
  // ...
}

std::shared_ptr<HashTable> BoundingBoxIntersectJoinHashTable::initHashTableOnCpuFromCache(
    QueryPlanHash key,
    CacheItemType item_type,
    DeviceIdentifier device_identifier) {
  // ...
  VLOG(1) << "Checking CPU hash table cache.";
  // ...
  auto cached_hashtable =
      hash_table_cache_->getItemFromCache(key, item_type, device_identifier, meta_info);
  if (cached_hashtable) {
    return cached_hashtable;
  }
  // ...
}

std::optional<std::pair<size_t, size_t>>
BoundingBoxIntersectJoinHashTable::getApproximateTupleCountFromCache(
    QueryPlanHash key,
    CacheItemType item_type,
    DeviceIdentifier device_identifier) {
  // ...
  auto cached_hashtable =
      hash_table_cache_->getItemFromCache(key, item_type, device_identifier, metaInfo);
  if (cached_hashtable) {
    return std::make_pair(cached_hashtable->getEntryCount() / 2,
                          cached_hashtable->getEmittedKeysCount());
  }
  return std::nullopt;
}

void BoundingBoxIntersectJoinHashTable::putHashTableOnCpuToCache(
    QueryPlanHash key,
    CacheItemType item_type,
    std::shared_ptr<HashTable> hashtable_ptr,
    DeviceIdentifier device_identifier,
    size_t hashtable_building_time) {
  // ...
  CHECK(hashtable_ptr && !hashtable_ptr->getGpuBuffer());
  // ... store hashtable_ptr into hash_table_cache_, recording
  //     hashtable_building_time, ...
}