#include <llvm/Transforms/Utils/BasicBlockUtils.h>
#include <boost/filesystem/operations.hpp>
#include <boost/filesystem/path.hpp>
#include <type_traits>
bool g_enable_filter_function{true};
unsigned g_dynamic_watchdog_time_limit{10000};
bool g_allow_cpu_retry{true};
bool g_allow_query_step_cpu_retry{true};
bool g_null_div_by_zero{false};
unsigned g_trivial_loop_join_threshold{1000};
bool g_from_table_reordering{true};
bool g_inner_join_fragment_skipping{true};
extern bool g_enable_smem_group_by;
extern std::unique_ptr<llvm::Module> udf_gpu_module;
extern std::unique_ptr<llvm::Module> udf_cpu_module;
bool g_enable_filter_push_down{false};
float g_filter_push_down_low_frac{-1.0f};
float g_filter_push_down_high_frac{-1.0f};
size_t g_filter_push_down_passing_row_ubound{0};
bool g_enable_columnar_output{false};
bool g_enable_left_join_filter_hoisting{true};
bool g_optimize_row_initialization{true};
bool g_enable_bbox_intersect_hashjoin{true};
size_t g_num_tuple_threshold_switch_to_baseline{100000};
size_t g_ratio_num_hash_entry_to_num_tuple_switch_to_baseline{100};
bool g_enable_distance_rangejoin{true};
bool g_enable_hashjoin_many_to_many{true};
size_t g_bbox_intersect_max_table_size_bytes{1024 * 1024 * 1024};
double g_bbox_intersect_target_entries_per_bin{1.3};
bool g_strip_join_covered_quals{false};
size_t g_constrained_by_in_threshold{10};
size_t g_default_max_groups_buffer_entry_guess{16384};
size_t g_big_group_threshold{g_default_max_groups_buffer_entry_guess};
bool g_enable_window_functions{true};
bool g_enable_table_functions{true};
bool g_enable_ml_functions{true};
bool g_restrict_ml_model_metadata_to_superusers{false};
bool g_enable_dev_table_functions{false};
bool g_enable_geo_ops_on_uncompressed_coords{true};
bool g_enable_rf_prop_table_functions{true};
bool g_allow_memory_status_log{true};
size_t g_max_memory_allocation_size{2000000000};  // set to max slab size
size_t g_min_memory_allocation_size{
    256};  // minimum memory allocation required for projection query output buffer
           // without pre-flight count
bool g_enable_bump_allocator{false};
double g_bump_allocator_step_reduction{0.75};
bool g_enable_direct_columnarization{true};
extern bool g_enable_string_functions;
bool g_enable_lazy_fetch{true};
bool g_enable_runtime_query_interrupt{true};
bool g_enable_non_kernel_time_query_interrupt{true};
bool g_use_estimator_result_cache{true};
unsigned g_pending_query_interrupt_freq{1000};
double g_running_query_interrupt_freq{0.1};
size_t g_gpu_smem_threshold{
    4096};  // GPU shared memory threshold (in bytes). If larger buffer sizes are
            // required, we do not use GPU shared memory optimizations. Setting this to
            // 0 means unlimited (subject to other dynamically calculated caps).
bool g_enable_smem_grouped_non_count_agg{
    true};  // enable use of shared memory when performing group-by with select
            // non-count aggregates
bool g_enable_smem_non_grouped_agg{
    true};  // enable optimizations for using GPU shared memory in implementation of
            // non-grouped aggregates
bool g_is_test_env{false};  // operating under a unit test environment. Currently only
                            // limits the allocation for the output buffer arena
                            // and data recycler test
size_t g_enable_parallel_linearization{
    10000};  // # rows that we are trying to linearize varlen col in parallel
bool g_enable_data_recycler{true};
bool g_use_hashtable_cache{true};
bool g_use_query_resultset_cache{true};
bool g_use_chunk_metadata_cache{true};
bool g_allow_auto_resultset_caching{false};
bool g_allow_query_step_skipping{true};
size_t g_hashtable_cache_total_bytes{size_t(1) << 32};
size_t g_max_cacheable_hashtable_size_bytes{size_t(1) << 31};
size_t g_query_resultset_cache_total_bytes{size_t(1) << 32};
size_t g_max_cacheable_query_resultset_size_bytes{size_t(1) << 31};
size_t g_auto_resultset_caching_threshold{size_t(1) << 20};
bool g_optimize_cuda_block_and_grid_sizes{false};

size_t g_approx_quantile_buffer{1000};
size_t g_approx_quantile_centroids{300};

bool g_enable_automatic_ir_metadata{true};

size_t g_max_log_length{500};

bool g_enable_executor_resource_mgr{true};

double g_executor_resource_mgr_cpu_result_mem_ratio{0.8};
size_t g_executor_resource_mgr_cpu_result_mem_bytes{Executor::auto_cpu_mem_bytes};
double g_executor_resource_mgr_per_query_max_cpu_slots_ratio{0.9};
double g_executor_resource_mgr_per_query_max_cpu_result_mem_ratio{0.8};

// Todo: rework ConcurrentResourceGrantPolicy and ExecutorResourcePool to allow
// thresholds for concurrent oversubscription, rather than just boolean allowed/disallowed
bool g_executor_resource_mgr_allow_cpu_kernel_concurrency{true};
bool g_executor_resource_mgr_allow_cpu_gpu_kernel_concurrency{true};
// Whether a single query can oversubscribe CPU slots should be controlled with
// g_executor_resource_mgr_per_query_max_cpu_slots_ratio
bool g_executor_resource_mgr_allow_cpu_slot_oversubscription_concurrency{false};
// Whether a single query can oversubscribe CPU memory should be controlled with
// g_executor_resource_mgr_per_query_max_cpu_result_mem_ratio
bool g_executor_resource_mgr_allow_cpu_result_mem_oversubscription_concurrency{false};
double g_executor_resource_mgr_max_available_resource_use_ratio{0.8};

bool g_use_cpu_mem_pool_for_output_buffers{false};

extern bool g_cache_string_hash;
extern bool g_allow_memory_status_log;

int const Executor::max_gpu_count;

std::map<Executor::ExtModuleKinds, std::string> Executor::extension_module_sources;

extern std::unique_ptr<llvm::Module> read_llvm_module_from_bc_file(
    const std::string& udf_ir_filename,
    llvm::LLVMContext& ctx);
extern std::unique_ptr<llvm::Module> read_llvm_module_from_ir_file(
    const std::string& udf_ir_filename,
    llvm::LLVMContext& ctx,
    bool is_gpu = false);
extern std::unique_ptr<llvm::Module> read_llvm_module_from_ir_string(
    const std::string& udf_ir_string,
    llvm::LLVMContext& ctx,
    bool is_gpu = false);
// This function is notably different from that in RelAlgExecutor because it already
// expects SPI values and therefore needs to avoid that transformation.
void prepare_string_dictionaries(const std::unordered_set<PhysicalInput>& phys_inputs) {
  for (const auto [col_id, table_id, db_id] : phys_inputs) {
    foreign_storage::populate_string_dictionary(table_id, col_id, db_id);
  }
}

bool is_empty_table(Fragmenter_Namespace::AbstractFragmenter* fragmenter) {
  const auto& fragments = fragmenter->getFragmentsForQuery().fragments;
  // The fragmenter always returns at least one fragment, even when the table is empty.
  return (fragments.size() == 1 && fragments[0].getChunkMetadataMap().empty());
}
namespace foreign_storage {
// Foreign tables skip the population of dictionaries during metadata scan. This function
// will populate a dictionary's missing entries by fetching any unpopulated chunks.
  if (const auto foreign_table = dynamic_cast<const ForeignTable*>(
          catalog->getMetadataForTable(table_id, false))) {
    const auto col_desc = catalog->getMetadataForColumn(table_id, col_id);
    if (col_desc->columnType.is_dict_encoded_type()) {
      auto& fragmenter = foreign_table->fragmenter;
      CHECK(fragmenter != nullptr);
      for (const auto& frag : fragmenter->getFragmentsForQuery().fragments) {
        ChunkKey chunk_key = {db_id, table_id, col_id, frag.fragmentId};
        CHECK(metadata_map.find(col_id) != metadata_map.end());
        if (auto& meta = metadata_map.at(col_id); meta->isPlaceholder()) {
                            &(catalog->getDataMgr()),
                   const size_t block_size_x,
                   const size_t grid_size_x,
                   const size_t max_gpu_slab_size,
                   const std::string& debug_dir,
                   const std::string& debug_file)
    : executor_id_(executor_id)
    , context_(new llvm::LLVMContext())
  update_extension_modules();
  auto template_path = root_path + "/QueryEngine/RuntimeFunctions.bc";
  CHECK(boost::filesystem::exists(template_path));
  auto rt_geos_path = root_path + "/QueryEngine/GeosRuntime.bc";
  CHECK(boost::filesystem::exists(rt_geos_path));
  if (boost::filesystem::exists(rt_libdevice_path)) {
        << " does not exist; support for some UDF "
           "functions might not be available.";
  qe->s_code_accessor->clear();
  qe->s_stubs_accessor->clear();
  qe->cpu_code_accessor->clear();
  qe->gpu_code_accessor->clear();
  qe->tf_code_accessor->clear();
  if (discard_runtime_modules_only) {
    cgen_state_->module_ = nullptr;
  extension_modules_.clear();
  context_.reset(new llvm::LLVMContext());
  cgen_state_.reset(new CgenState({}, false, this));
    const std::string& source) {
  CHECK(!source.empty());
  switch (module_kind) {
  return std::unique_ptr<llvm::Module>();
    bool erase_not_found = false) {
    auto llvm_module = read_module(module_kind, it->second);
      extension_modules_[module_kind] = std::move(llvm_module);
    } else if (erase_not_found) {
      extension_modules_.erase(module_kind);
    if (extension_modules_.find(module_kind) == extension_modules_.end()) {
          << " LLVM module. The module will be unavailable.";
          << " LLVM module. Using the existing module.";
    if (erase_not_found) {
      extension_modules_.erase(module_kind);
    if (extension_modules_.find(module_kind) == extension_modules_.end()) {
          << " LLVM module is unavailable. The module will be unavailable.";
          << " LLVM module is unavailable. Using the existing module.";
  if (!update_runtime_modules_only) {
    , cgen_state_(std::move(executor_.cgen_state_))
    const bool allow_lazy_fetch,
    const std::vector<InputTableInfo>& query_infos,
  executor_.nukeOldState(allow_lazy_fetch, query_infos, deleted_cols_map, ra_exe_unit);
  for (auto& p : executor_.cgen_state_->row_func_hoisted_literals_) {
    auto inst = llvm::dyn_cast<llvm::LoadInst>(p.first);
    if (inst && inst->getNumUses() == 0 && inst->getParent() == nullptr) {
      p.first->deleteValue();
  executor_.cgen_state_->row_func_hoisted_literals_.clear();
  for (auto& bm : executor_.cgen_state_->in_values_bitmaps_) {
  executor_.cgen_state_->in_values_bitmaps_.clear();
  for (auto& str_dict_translation_mgr :
       executor_.cgen_state_->str_dict_translation_mgrs_) {
    cgen_state_->moveStringDictionaryTranslationMgr(std::move(str_dict_translation_mgr));
  executor_.cgen_state_->str_dict_translation_mgrs_.clear();
  for (auto& tree_model_prediction_mgr :
       executor_.cgen_state_->tree_model_prediction_mgrs_) {
    cgen_state_->moveTreeModelPredictionMgr(std::move(tree_model_prediction_mgr));
  executor_.cgen_state_->tree_model_prediction_mgrs_.clear();
    const std::string& debug_dir,
    const std::string& debug_file,
  auto executor = std::make_shared<Executor>(executor_id,
  CHECK(executors_.insert(std::make_pair(executor_id, executor)).second);
  switch (memory_level) {
      throw std::runtime_error(
          "Clearing memory levels other than the CPU level or GPU level is not "
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const bool with_generation) const {
  CHECK(row_set_mem_owner);
  std::lock_guard<std::mutex> lock(
  return row_set_mem_owner->getOrAddStringDictProxy(dict_id_in, with_generation);
    const bool with_generation) {
  const auto dd = catalog->getMetadataForDict(dict_id);
  auto dict_key = dict_key_in;
  CHECK(dd->stringDict);
  const int64_t generation =
      with_generation ? string_dictionary_generations_.getGeneration(dict_key) : -1;
  return addStringDict(dd->stringDict, dict_key, generation);
  if (!lit_str_dict_proxy_) {
    std::shared_ptr<StringDictionary> tsd = std::make_shared<StringDictionary>(
    lit_str_dict_proxy_ = std::make_shared<StringDictionaryProxy>(
  return lit_str_dict_proxy_.get();
    const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const bool with_generation) const {
  CHECK(row_set_mem_owner);
  std::lock_guard<std::mutex> lock(
  return row_set_mem_owner->getOrAddStringProxyTranslationMap(
      source_dict_key, dest_dict_key, with_generation, translation_type, string_op_infos);
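// The overload below builds a proxy-to-proxy translation map: when the destination side
// carries its own string ops, a union translation map is first added on the destination
// proxy, and the map actually returned is an intersection translation from the source
// proxy to the destination proxy.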
    const std::vector<StringOps_Namespace::StringOpInfo>& source_string_op_infos,
    const std::vector<StringOps_Namespace::StringOpInfo>& dest_string_op_infos,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) const {
  CHECK(row_set_mem_owner);
  std::lock_guard<std::mutex> lock(
  if (!dest_string_op_infos.empty()) {
    row_set_mem_owner->addStringProxyUnionTranslationMap(
        dest_proxy, dest_proxy, dest_string_op_infos);
  return row_set_mem_owner->addStringProxyIntersectionTranslationMap(
      source_proxy, dest_proxy, source_string_op_infos);
    const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const bool with_generation) const {
  CHECK(row_set_mem_owner);
  std::lock_guard<std::mutex> lock(
  return row_set_mem_owner->getOrAddStringProxyNumericTranslationMap(
      source_dict_key, with_generation, string_op_infos);
    const bool with_generation,
    const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
  const auto source_proxy = getOrAddStringDictProxy(source_dict_key_in, with_generation);
  const auto dest_proxy = getOrAddStringDictProxy(dest_dict_key_in, with_generation);
  return addStringProxyIntersectionTranslationMap(
      source_proxy, dest_proxy, string_op_infos);
  return addStringProxyUnionTranslationMap(source_proxy, dest_proxy, string_op_infos);
    const bool with_generation,
    const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
  const auto source_proxy = getOrAddStringDictProxy(source_dict_key_in, with_generation);
  return addStringProxyNumericTranslationMap(source_proxy, string_op_infos);
  static_assert(std::is_trivially_copyable_v<ApproxQuantileDescriptor>);
  std::lock_guard<std::mutex> lock(state_mutex_);
  auto t_digest = std::make_unique<quantile::TDigest>(
  return t_digests_.emplace_back(std::move(t_digest)).get();
  std::unique_lock<std::mutex> lock(state_mutex_);
  if (t_digest_allocators_.size() <= thread_idx) {
    t_digest_allocators_.resize(thread_idx + 1u);
  if (t_digest_allocators_[thread_idx].capacity()) {
    VLOG(2) << "Replacing t_digest_allocators_[" << thread_idx << "].";
  int8_t* const buffer = allocate(capacity, thread_idx);
  if (!cd || n > cd->columnType.get_physical_cols()) {
    std::string const& log_tag,
    size_t const thread_idx) {
  std::ostringstream oss;
  oss << " (" << log_tag << ", EXECUTOR-" << executor_id << ", THREAD-" << thread_idx
      << ", TOOK: " << log_time_ms << " ms)";
  VLOG(1) << oss.str();
    size_t const thread_idx) const {
  std::ostringstream oss;
    size_t const thread_idx) const {
  const auto& ti = cd->columnType;
  const auto sz = ti.get_size();
  if (ti.is_logical_geo_type()) {
    const std::set<shared::TableKey>& table_ids_to_fetch,
    const bool include_lazy_fetched_cols) const {
  std::map<shared::ColumnKey, size_t> col_byte_width_map;
  for (const auto& fetched_col : plan_state_->getColumnsToFetch()) {
    if (table_ids_to_fetch.count({fetched_col.db_id, fetched_col.table_id}) == 0) {
    CHECK(col_byte_width_map.insert({fetched_col, col_byte_width}).second);
  if (include_lazy_fetched_cols) {
    for (const auto& lazy_fetched_col : plan_state_->getColumnsToNotFetch()) {
      if (table_ids_to_fetch.count({lazy_fetched_col.db_id, lazy_fetched_col.table_id}) ==
      CHECK(col_byte_width_map.insert({lazy_fetched_col, col_byte_width}).second);
  return col_byte_width_map;
    const std::set<shared::TableKey>& table_ids_to_fetch) const {
  size_t num_bytes = 0;
  for (const auto& fetched_col : plan_state_->getColumnsToFetch()) {
    if (table_ids_to_fetch.count({fetched_col.db_id, fetched_col.table_id}) == 0) {
    if (fetched_col.table_id < 0) {
        {fetched_col.db_id, fetched_col.table_id, fetched_col.column_id});
    if (!ti.is_logical_geo_type()) {
    const std::vector<InputDescriptor>& input_descs,
    const std::vector<InputTableInfo>& query_infos,
    const std::vector<std::pair<int32_t, FragmentsList>>& kernel_fragment_lists) const {
  using TableFragmentId = std::pair<shared::TableKey, int32_t>;
  using TableFragmentSizeMap = std::map<TableFragmentId, size_t>;
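  // The code below estimates per-kernel input sizes: it sums the per-row byte width of
  // the columns fetched for the outer (nest level 0) tables, multiplies that width by
  // each fragment's tuple count to get per-kernel and per-chunk byte sizes, and collects
  // the per-chunk sizes into the chunk request info handed to the resource manager.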
  std::set<shared::TableKey> lhs_table_keys;
  for (const auto& input_desc : input_descs) {
    if (input_desc.getNestLevel() == 0) {
      lhs_table_keys.insert(input_desc.getTableKey());
  const auto column_byte_width_map =
  size_t const byte_width_per_row =
                      column_byte_width_map.end(),
                      [](size_t sum, auto& col_entry) { return sum + col_entry.second; });
  TableFragmentSizeMap all_table_fragments_size_map;
  for (auto& query_info : query_infos) {
    const auto& table_key = query_info.table_key;
    for (const auto& frag : query_info.info.fragments) {
      const int32_t frag_id = frag.fragmentId;
      const TableFragmentId table_frag_id = std::make_pair(table_key, frag_id);
      const size_t fragment_num_tuples = frag.getNumTuples();
      all_table_fragments_size_map.insert(
          std::make_pair(table_frag_id, fragment_num_tuples));
  TableFragmentSizeMap query_table_fragments_size_map;
  std::vector<size_t> bytes_per_kernel;
  bytes_per_kernel.reserve(kernel_fragment_lists.size());
  size_t max_kernel_bytes{0};
  for (auto& kernel_frag_list : kernel_fragment_lists) {
    size_t kernel_bytes{0};
    const auto frag_list = kernel_frag_list.second;
    for (const auto& table_frags : frag_list) {
      const auto& table_key = table_frags.table_key;
      for (const size_t frag_id : table_frags.fragment_ids) {
        const TableFragmentId table_frag_id = std::make_pair(table_key, frag_id);
        const size_t fragment_num_tuples = all_table_fragments_size_map[table_frag_id];
        kernel_bytes += fragment_num_tuples * byte_width_per_row;
        query_table_fragments_size_map.insert(
            std::make_pair(table_frag_id, fragment_num_tuples));
    bytes_per_kernel.emplace_back(kernel_bytes);
    if (kernel_bytes > max_kernel_bytes) {
      max_kernel_bytes = kernel_bytes;
  std::map<ChunkKey, size_t> all_chunks_byte_sizes_map;
  constexpr int32_t subkey_min = std::numeric_limits<int32_t>::min();
  for (const auto& col_byte_width_entry : column_byte_width_map) {
    const int32_t db_id = col_byte_width_entry.first.db_id;
    const int32_t table_id = col_byte_width_entry.first.table_id;
    const int32_t col_id = col_byte_width_entry.first.column_id;
    const size_t col_byte_width = col_byte_width_entry.second;
    const auto frag_start =
        query_table_fragments_size_map.lower_bound({table_key, subkey_min});
    for (auto frag_itr = frag_start; frag_itr != query_table_fragments_size_map.end() &&
                                     frag_itr->first.first == table_key;
      const ChunkKey chunk_key = {db_id, table_id, col_id, frag_itr->first.second};
      const size_t chunk_byte_size = col_byte_width * frag_itr->second;
      all_chunks_byte_sizes_map.insert({chunk_key, chunk_byte_size});
  size_t total_chunk_bytes{0};
  const size_t num_chunks = all_chunks_byte_sizes_map.size();
  std::vector<std::pair<ChunkKey, size_t>> chunks_with_byte_sizes;
  chunks_with_byte_sizes.reserve(num_chunks);
  for (const auto& chunk_byte_size_entry : all_chunks_byte_sizes_map) {
    chunks_with_byte_sizes.emplace_back(
        std::make_pair(chunk_byte_size_entry.first, chunk_byte_size_entry.second));
    total_chunk_bytes += chunk_byte_size_entry.second;
          chunks_with_byte_sizes,
          bytes_scales_per_kernel};
    const std::vector<Analyzer::Expr*>& target_exprs) const {
  for (const auto target_expr : target_exprs) {
    if (plan_state_->isLazyFetchColumn(target_expr)) {
    const std::vector<Analyzer::Expr*>& target_exprs) const {
  std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
  for (const auto target_expr : target_exprs) {
    if (!plan_state_->isLazyFetchColumn(target_expr)) {
      col_lazy_fetch_info.emplace_back(
      auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
      if (cd && IS_GEO(cd->columnType.get_type())) {
        auto col_key = col_var->getColumnKey();
        col_key.column_id += 1;
        const auto col0_ti = cd0->columnType;
        CHECK(!cd0->isVirtualCol);
        const auto col0_var = makeExpr<Analyzer::ColumnVar>(col0_ti, col_key, rte_idx);
        const auto local_col0_id = plan_state_->getLocalColumnId(col0_var.get(), false);
        col_lazy_fetch_info.emplace_back(
        auto local_col_id = plan_state_->getLocalColumnId(col_var, false);
        const auto& col_ti = col_var->get_type_info();
  return col_lazy_fetch_info;
    const std::unordered_map<int, CgenState::LiteralValues>& literals,
    const int device_id) {
  if (literals.empty()) {
  const auto dev_literals_it = literals.find(device_id);
  CHECK(dev_literals_it != literals.end());
  const auto& dev_literals = dev_literals_it->second;
  size_t lit_buf_size{0};
  std::vector<std::string> real_strings;
  std::vector<std::vector<double>> double_array_literals;
  std::vector<std::vector<int8_t>> align64_int8_array_literals;
  std::vector<std::vector<int32_t>> int32_array_literals;
  std::vector<std::vector<int8_t>> align32_int8_array_literals;
  std::vector<std::vector<int8_t>> int8_array_literals;
  for (const auto& lit : dev_literals) {
    if (lit.which() == 7) {
      const auto p = boost::get<std::string>(&lit);
      real_strings.push_back(*p);
    } else if (lit.which() == 8) {
      const auto p = boost::get<std::vector<double>>(&lit);
      double_array_literals.push_back(*p);
    } else if (lit.which() == 9) {
      const auto p = boost::get<std::vector<int32_t>>(&lit);
      int32_array_literals.push_back(*p);
    } else if (lit.which() == 10) {
      const auto p = boost::get<std::vector<int8_t>>(&lit);
      int8_array_literals.push_back(*p);
    } else if (lit.which() == 11) {
      const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
      if (p->second == 64) {
        align64_int8_array_literals.push_back(p->first);
      } else if (p->second == 32) {
        align32_int8_array_literals.push_back(p->first);
  if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
  int16_t crt_real_str_off = lit_buf_size;
  for (const auto& real_str : real_strings) {
    CHECK_LE(real_str.size(), static_cast<size_t>(std::numeric_limits<int16_t>::max()));
    lit_buf_size += real_str.size();
  if (double_array_literals.size() > 0) {
    lit_buf_size = align(lit_buf_size, sizeof(double));
  int16_t crt_double_arr_lit_off = lit_buf_size;
  for (const auto& double_array_literal : double_array_literals) {
    CHECK_LE(double_array_literal.size(),
             static_cast<size_t>(std::numeric_limits<int16_t>::max()));
    lit_buf_size += double_array_literal.size() * sizeof(double);
  if (align64_int8_array_literals.size() > 0) {
    lit_buf_size = align(lit_buf_size, sizeof(uint64_t));
  int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
  for (const auto& align64_int8_array_literal : align64_int8_array_literals) {
    CHECK_LE(align64_int8_array_literal.size(),
             static_cast<size_t>(std::numeric_limits<int16_t>::max()));
    lit_buf_size += align64_int8_array_literal.size();
  if (int32_array_literals.size() > 0) {
    lit_buf_size = align(lit_buf_size, sizeof(int32_t));
  int16_t crt_int32_arr_lit_off = lit_buf_size;
  for (const auto& int32_array_literal : int32_array_literals) {
    CHECK_LE(int32_array_literal.size(),
             static_cast<size_t>(std::numeric_limits<int16_t>::max()));
    lit_buf_size += int32_array_literal.size() * sizeof(int32_t);
  if (align32_int8_array_literals.size() > 0) {
    lit_buf_size = align(lit_buf_size, sizeof(int32_t));
  int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
  for (const auto& align32_int8_array_literal : align32_int8_array_literals) {
    CHECK_LE(align32_int8_array_literal.size(),
             static_cast<size_t>(std::numeric_limits<int16_t>::max()));
    lit_buf_size += align32_int8_array_literal.size();
  int16_t crt_int8_arr_lit_off = lit_buf_size;
  for (const auto& int8_array_literal : int8_array_literals) {
    CHECK_LE(int8_array_literal.size(),
             static_cast<size_t>(std::numeric_limits<int16_t>::max()));
    lit_buf_size += int8_array_literal.size();
  unsigned crt_real_str_idx = 0;
  unsigned crt_double_arr_lit_idx = 0;
  unsigned crt_align64_int8_arr_lit_idx = 0;
  unsigned crt_int32_arr_lit_idx = 0;
  unsigned crt_align32_int8_arr_lit_idx = 0;
  unsigned crt_int8_arr_lit_idx = 0;
  std::vector<int8_t> serialized(lit_buf_size);
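  // Second pass: write each literal into the serialized buffer. Variable-length
  // literals (strings and arrays) are emitted as a 32-bit descriptor whose upper 16 bits
  // hold the payload offset and whose lower 16 bits hold the payload length, with the
  // payload bytes copied to the offsets computed in the sizing pass above.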
  for (const auto& lit : dev_literals) {
    switch (lit.which()) {
        const auto p = boost::get<int8_t>(&lit);
        serialized[off - lit_bytes] = *p;
        const auto p = boost::get<int16_t>(&lit);
        memcpy(&serialized[off - lit_bytes], p, lit_bytes);
        const auto p = boost::get<int32_t>(&lit);
        memcpy(&serialized[off - lit_bytes], p, lit_bytes);
        const auto p = boost::get<int64_t>(&lit);
        memcpy(&serialized[off - lit_bytes], p, lit_bytes);
        const auto p = boost::get<float>(&lit);
        memcpy(&serialized[off - lit_bytes], p, lit_bytes);
        const auto p = boost::get<double>(&lit);
        memcpy(&serialized[off - lit_bytes], p, lit_bytes);
        const auto p = boost::get<std::pair<std::string, shared::StringDictKey>>(&lit);
        memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
        const auto p = boost::get<std::string>(&lit);
        int32_t off_and_len = crt_real_str_off << 16;
        const auto& crt_real_str = real_strings[crt_real_str_idx];
        off_and_len |= static_cast<int16_t>(crt_real_str.size());
        memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
        memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
        crt_real_str_off += crt_real_str.size();
        const auto p = boost::get<std::vector<double>>(&lit);
        int32_t off_and_len = crt_double_arr_lit_off << 16;
        const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
        int32_t len = crt_double_arr_lit.size();
        off_and_len |= static_cast<int16_t>(len);
        int32_t double_array_bytesize = len * sizeof(double);
        memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
        memcpy(&serialized[crt_double_arr_lit_off],
               crt_double_arr_lit.data(),
               double_array_bytesize);
        ++crt_double_arr_lit_idx;
        crt_double_arr_lit_off += double_array_bytesize;
        const auto p = boost::get<std::vector<int32_t>>(&lit);
        int32_t off_and_len = crt_int32_arr_lit_off << 16;
        const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
        int32_t len = crt_int32_arr_lit.size();
        off_and_len |= static_cast<int16_t>(len);
        int32_t int32_array_bytesize = len * sizeof(int32_t);
        memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
        memcpy(&serialized[crt_int32_arr_lit_off],
               crt_int32_arr_lit.data(),
               int32_array_bytesize);
        ++crt_int32_arr_lit_idx;
        crt_int32_arr_lit_off += int32_array_bytesize;
        const auto p = boost::get<std::vector<int8_t>>(&lit);
        int32_t off_and_len = crt_int8_arr_lit_off << 16;
        const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
        int32_t len = crt_int8_arr_lit.size();
        off_and_len |= static_cast<int16_t>(len);
        int32_t int8_array_bytesize = len;
        memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
        memcpy(&serialized[crt_int8_arr_lit_off],
               crt_int8_arr_lit.data(),
               int8_array_bytesize);
        ++crt_int8_arr_lit_idx;
        crt_int8_arr_lit_off += int8_array_bytesize;
        const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
        if (p->second == 64) {
          int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
          const auto& crt_align64_int8_arr_lit =
              align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
          int32_t len = crt_align64_int8_arr_lit.size();
          off_and_len |= static_cast<int16_t>(len);
          int32_t align64_int8_array_bytesize = len;
          memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
          memcpy(&serialized[crt_align64_int8_arr_lit_off],
                 crt_align64_int8_arr_lit.data(),
                 align64_int8_array_bytesize);
          ++crt_align64_int8_arr_lit_idx;
          crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
        } else if (p->second == 32) {
          int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
          const auto& crt_align32_int8_arr_lit =
              align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
          int32_t len = crt_align32_int8_arr_lit.size();
          off_and_len |= static_cast<int16_t>(len);
          int32_t align32_int8_array_bytesize = len;
          memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
          memcpy(&serialized[crt_align32_int8_arr_lit_off],
                 crt_align32_int8_arr_lit.data(),
                 align32_int8_array_bytesize);
          ++crt_align32_int8_arr_lit_idx;
          crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
    const int64_t agg_init_val,
    const int8_t out_byte_width,
    const int64_t* out_vec,
    const size_t out_vec_sz,
    const bool is_group_by,
    const bool float_argument_input) {
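  // Reduces the per-device partial aggregate values in out_vec down to a single value on
  // the host. Each entry occupies a 64-bit slot; when out_byte_width is 4 the slot holds
  // a float reinterpreted from the low 32 bits, and when it is 8 it holds a double,
  // hence the reinterpret_casts through may_alias_ptr below.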
  if (0 != agg_init_val) {
      int64_t agg_result = agg_init_val;
      for (size_t i = 0; i < out_vec_sz; ++i) {
      return {agg_result, 0};
      switch (out_byte_width) {
          int agg_result = static_cast<int32_t>(agg_init_val);
          for (size_t i = 0; i < out_vec_sz; ++i) {
                *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
                *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
          const int64_t converted_bin =
              float_argument_input
                  ? static_cast<int64_t>(agg_result)
          return {converted_bin, 0};
          int64_t agg_result = agg_init_val;
          for (size_t i = 0; i < out_vec_sz; ++i) {
                *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
                *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
          return {agg_result, 0};
    int64_t agg_result = 0;
    for (size_t i = 0; i < out_vec_sz; ++i) {
      agg_result += out_vec[i];
    return {agg_result, 0};
      switch (out_byte_width) {
          for (size_t i = 0; i < out_vec_sz; ++i) {
            r += *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i]));
          const auto float_bin = *reinterpret_cast<const int32_t*>(may_alias_ptr(&r));
          const int64_t converted_bin =
          return {converted_bin, 0};
          for (size_t i = 0; i < out_vec_sz; ++i) {
            r += *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i]));
          return {*reinterpret_cast<const int64_t*>(may_alias_ptr(&r)), 0};
      uint64_t agg_result = 0;
      for (size_t i = 0; i < out_vec_sz; ++i) {
        const uint64_t out = static_cast<uint64_t>(out_vec[i]);
      return {static_cast<int64_t>(agg_result), 0};
      int64_t agg_result = agg_init_val;
      for (size_t i = 0; i < out_vec_sz; ++i) {
      return {agg_result, 0};
      switch (out_byte_width) {
          int32_t agg_result = static_cast<int32_t>(agg_init_val);
          for (size_t i = 0; i < out_vec_sz; ++i) {
                *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
                *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
          const int64_t converted_bin =
              float_argument_input
                  ? static_cast<int64_t>(agg_result)
          return {converted_bin, 0};
          int64_t agg_result = agg_init_val;
          for (size_t i = 0; i < out_vec_sz; ++i) {
                *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
                *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
          return {agg_result, 0};
      int64_t agg_result = agg_init_val;
      for (size_t i = 0; i < out_vec_sz; ++i) {
      return {agg_result, 0};
      switch (out_byte_width) {
          int32_t agg_result = static_cast<int32_t>(agg_init_val);
          for (size_t i = 0; i < out_vec_sz; ++i) {
                *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
                *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
          const int64_t converted_bin =
              float_argument_input ? static_cast<int64_t>(agg_result)
          return {converted_bin, 0};
          int64_t agg_result = agg_init_val;
          for (size_t i = 0; i < out_vec_sz; ++i) {
                *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
                *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
          return {agg_result, 0};
      int64_t agg_result = agg_init_val;
      for (size_t i = 0; i < out_vec_sz; ++i) {
        if (out_vec[i] != agg_init_val) {
          if (agg_result == agg_init_val) {
            agg_result = out_vec[i];
          } else if (out_vec[i] != agg_result) {
            return {agg_result, int32_t(ErrorCode::SINGLE_VALUE_FOUND_MULTIPLE_VALUES)};
      return {agg_result, 0};
      int64_t agg_result = agg_init_val;
      for (size_t i = 0; i < out_vec_sz; ++i) {
        if (out_vec[i] != agg_init_val) {
          agg_result = out_vec[i];
      return {agg_result, 0};
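// The merge helper below combines per-device result sets: dictionary-encoded output
// columns are first translated to a common dictionary (when a dictionary-encoded target
// index is present), then each subsequent device's result set is appended to the first.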
    std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
    std::vector<TargetInfo> const& targets) {
  auto& first = results_per_device.front().first;
  if (first_target_idx) {
    first->translateDictEncodedColumns(targets, *first_target_idx);
  for (size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
    const auto& next = results_per_device[dev_idx].first;
    if (first_target_idx) {
      next->translateDictEncodedColumns(targets, *first_target_idx);
    first->append(*next);
  return std::move(first);
  auto const targets = shared::transform<std::vector<TargetInfo>>(
  if (results_per_device.empty()) {
    return std::make_shared<ResultSet>(targets,
  using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
                   results_per_device.end(),
                   [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
                     CHECK_GE(lhs.second.size(), size_t(1));
                     CHECK_GE(rhs.second.size(), size_t(1));
                     return lhs.second.front() < rhs.second.front();
    std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
  if (results_per_device.empty()) {
    auto const targets = shared::transform<std::vector<TargetInfo>>(
    return std::make_shared<ResultSet>(targets,
std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>
    const std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device)
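  // Deduplicates results_per_device by worker thread index: only the first result set
  // seen for each thread index is kept, so a thread-shared result set is not reduced
  // more than once.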
  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>> unique_thread_results;
  if (results_per_device.empty()) {
    return unique_thread_results;
  auto max_ti = [](int acc, auto& e) { return std::max(acc, e.first->getThreadIdx()); };
  int const max_thread_idx =
      std::accumulate(results_per_device.begin(), results_per_device.end(), -1, max_ti);
  std::vector<bool> seen_thread_idxs(max_thread_idx + 1, false);
  for (const auto& result : results_per_device) {
    const int32_t result_thread_idx = result.first->getThreadIdx();
    if (!seen_thread_idxs[result_thread_idx]) {
      seen_thread_idxs[result_thread_idx] = true;
      unique_thread_results.emplace_back(result);
  return unique_thread_results;
    const size_t executor_id,
    std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
    int64_t* compilation_queue_time) {
  *compilation_queue_time = timer_stop(clock_begin);
  const auto& this_result_set = results_per_device[0].first;
                                  this_result_set->getTargetInfos(),
                                  this_result_set->getTargetInitVals(),
  return reduction_jit.codegen();
    std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
  std::shared_ptr<ResultSet> reduced_results;
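  // When there are multiple per-device group-by result sets, a single output result set
  // is allocated with the combined entry count, the first device's entries are moved
  // into it, and each remaining result set is reduced into that storage with the
  // JIT-generated reduction code obtained below.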
  const auto& first = results_per_device.front().first;
      results_per_device.size() > 1) {
        results_per_device.begin(),
        results_per_device.end(),
        [](const size_t init, const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
          const auto& r = rs.first;
          return init + r->getQueryMemDesc().getEntryCount();
    CHECK(total_entry_count);
    auto query_mem_desc = first->getQueryMemDesc();
    reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
    auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
    reduced_results->initializeStorage();
    switch (query_mem_desc.getEffectiveKeyWidth()) {
        first->getStorage()->moveEntriesToBuffer<int32_t>(
            result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
        first->getStorage()->moveEntriesToBuffer<int64_t>(
            result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
    reduced_results = first;
  int64_t compilation_queue_time = 0;
  const auto reduction_code =
  for (size_t i = 1; i < results_per_device.size(); ++i) {
    reduced_results->getStorage()->reduce(
        *(results_per_device[i].first->getStorage()), {}, reduction_code, executor_id_);
  reduced_results->addCompilationQueueTime(compilation_queue_time);
  reduced_results->invalidateCachedRowCount();
  return reduced_results;
    std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
  if (results_per_device.size() == 1) {
    return std::move(results_per_device.front().first);
  for (const auto& result : results_per_device) {
      std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
  return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
  std::unordered_set<int> available_gpus;
    for (int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
      available_gpus.insert(gpu_id);
  return available_gpus;
    const size_t cpu_count,
    const size_t gpu_count) {
                     : static_cast<size_t>(cpu_count);
            << "Exploiting a result of filtered count query as output buffer entry count: "
  using checked_size_t = boost::multiprecision::number<
      boost::multiprecision::cpp_int_backend<64,
                                             boost::multiprecision::unsigned_magnitude,
                                             boost::multiprecision::checked,
  checked_size_t checked_max_groups_buffer_entry_guess = 1;
  constexpr size_t max_groups_buffer_entry_guess_cap = 100000000;
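  // The guess below is the product of the per-table cardinalities (each table's fragment
  // tuple counts summed), computed with checked multiprecision arithmetic so an overflow
  // falls back to the hard cap above instead of silently wrapping.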
  for (const auto& table_info : query_infos) {
    CHECK(!table_info.info.fragments.empty());
    checked_size_t table_cardinality = 0;
    std::for_each(table_info.info.fragments.begin(),
                  table_info.info.fragments.end(),
                  [&table_cardinality](const FragmentInfo& frag_info) {
                    table_cardinality += frag_info.getNumTuples();
    checked_max_groups_buffer_entry_guess *= table_cardinality;
    checked_max_groups_buffer_entry_guess = max_groups_buffer_entry_guess_cap;
    VLOG(1) << "Detect overflow when approximating output buffer entry count, "
            << max_groups_buffer_entry_guess_cap;
  size_t max_groups_buffer_entry_guess =
      std::min(static_cast<size_t>(checked_max_groups_buffer_entry_guess),
               max_groups_buffer_entry_guess_cap);
  VLOG(1) << "Set an approximated output entry count as: "
          << max_groups_buffer_entry_guess;
  return max_groups_buffer_entry_guess;
  return td->tableName;
    size_t watchdog_max_projected_rows_per_device,
    const int device_count) {
    return device_count * watchdog_max_projected_rows_per_device;
  return watchdog_max_projected_rows_per_device;
    const std::vector<InputTableInfo>& table_infos,
    const int device_count) {
  for (const auto target_expr : ra_exe_unit.target_exprs) {
    if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
  size_t watchdog_max_projected_rows_per_device =
    watchdog_max_projected_rows_per_device =
    VLOG(1) << "Set the watchdog per device maximum projection limit: "
            << watchdog_max_projected_rows_per_device << " by a query hint";
  if (!ra_exe_unit.scan_limit && table_infos.size() == 1 &&
      table_infos.front().info.getPhysicalNumTuples() <
          watchdog_max_projected_rows_per_device) {
              watchdog_max_projected_rows_per_device, device_type, device_count))) {
    std::vector<std::string> table_names;
    const auto& input_descs = ra_exe_unit.input_descs;
    for (const auto& input_desc : input_descs) {
          "Projection query would require a scan without a limit on table(s): " +
          "Projection query output result set on table(s): " +
          " rows, which is more than the current system limit of " +
              watchdog_max_projected_rows_per_device, device_type, device_count)));
  const auto inner_table_key = ra_exe_unit.input_descs.back().getTableKey();
  std::optional<size_t> inner_table_idx;
  for (size_t i = 0; i < query_infos.size(); ++i) {
    if (query_infos[i].table_key == inner_table_key) {
      inner_table_idx = i;
  CHECK(inner_table_idx);
  return query_infos[*inner_table_idx].info.getNumTuples();
template <typename T>
  std::vector<std::string> expr_strs;
  for (const auto& expr : expr_container) {
      expr_strs.emplace_back("NULL");
      expr_strs.emplace_back(expr->toString());
    const std::list<Analyzer::OrderEntry>& expr_container) {
  std::vector<std::string> expr_strs;
  for (const auto& expr : expr_container) {
    expr_strs.emplace_back(expr.toString());
  switch (algorithm) {
      return "Speculative Top N";
      return "Streaming Top N";
  std::ostringstream os;
    const auto& scan_desc = input_col_desc->getScanDesc();
    os << scan_desc.getTableKey() << "," << input_col_desc->getColId() << ","
       << scan_desc.getNestLevel();
    table_keys.emplace(scan_desc.getTableKey());
    os << qual->toString() << ",";
  if (!ra_exe_unit.quals.empty()) {
    for (const auto& qual : ra_exe_unit.quals) {
      os << qual->toString() << ",";
  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
    const auto& join_condition = ra_exe_unit.join_quals[i];
    for (const auto& qual : join_condition.quals) {
      os << qual->toString() << ",";
    os << qual->toString() << ",";
    os << expr->toString() << ",";
  return key == other.key;
  return table_keys.find(table_key) != table_keys.end();
  os << "\n\tTable/Col/Levels: ";
    const auto& scan_desc = input_col_desc->getScanDesc();
    os << "(" << scan_desc.getTableKey() << ", " << input_col_desc->getColId() << ", "
       << scan_desc.getNestLevel() << ") ";
    os << "\n\tSimple Quals: "
  if (!ra_exe_unit.quals.empty()) {
    os << "\n\tJoin Quals: ";
    for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
      const auto& join_condition = ra_exe_unit.join_quals[i];
  os << "\n\tGroup By: "
  os << "\n\tProjected targets: "
  os << "\n\tSort Info: ";
  const auto& sort_info = ra_exe_unit.sort_info;
  os << "\n\t  Order Entries: "
  std::string limit_str = sort_info.limit ? std::to_string(*sort_info.limit) : "N/A";
  os << "\n\t  Limit: " << limit_str;
    os << "\n\tUnion: " << std::string(*ra_exe_unit.union_all ? "UNION ALL" : "UNION");
    const size_t new_scan_limit) {
          ra_exe_unit_in.quals,
    const std::vector<InputTableInfo>& query_infos,
    const bool has_cardinality_estimation,
  VLOG(1) << "Executor " << executor_id_ << " is executing work unit:" << ra_exe_unit_in;
  VLOG(1) << "Perform post execution clearance for Executor " << executor_id_;
                              has_cardinality_estimation,
    result->setValidationOnlyRes();
                            has_cardinality_estimation,
    result->setValidationOnlyRes();
    size_t& max_groups_buffer_entry_guess,
    const bool allow_single_frag_table_opt,
    const std::vector<InputTableInfo>& query_infos,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const bool has_cardinality_estimation,
  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
  CHECK(!query_infos.empty());
  if (!max_groups_buffer_entry_guess) {
    max_groups_buffer_entry_guess =
  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
      query_mem_desc_owned =
          query_comp_desc_owned->compile(max_groups_buffer_entry_guess,
                                         has_cardinality_estimation,
      CHECK(query_mem_desc_owned);
      crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
      VLOG(1) << e.what();
        plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
        CHECK(!query_mem_desc_owned);
        query_mem_desc_owned.reset(
    if (query_mem_desc_owned->canUsePerDeviceCardinality(ra_exe_unit)) {
      auto const max_rows_per_device =
          query_mem_desc_owned->getMaxPerDeviceCardinality(ra_exe_unit);
      if (max_rows_per_device && *max_rows_per_device >= 0 &&
          *max_rows_per_device < query_mem_desc_owned->getEntryCount()) {
        VLOG(1) << "Setting the max per device cardinality of {max_rows_per_device} as "
                   "the new scan limit: "
                << *max_rows_per_device;
    const auto context_count =
                                              allow_single_frag_table_opt,
                                              *query_comp_desc_owned,
                                              *query_mem_desc_owned,
      if (!kernels.empty()) {
                                  query_comp_desc_owned->getDeviceType(),
                                  ra_exe_unit.input_descs,
                                  *query_mem_desc_owned);
            shared_context, std::move(kernels), query_comp_desc_owned->getDeviceType());
      if (e.hasErrorCode(ErrorCode::OVERFLOW_OR_UNDERFLOW) &&
          static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
        crt_min_byte_width <<= 1;
  std::string curRunningSession{""};
  std::string curRunningQuerySubmittedTime{""};
  bool sessionEnrolled = false;
    curRunningQuerySubmittedTime = ra_exe_unit.query_state->getQuerySubmittedTime();
  if (!curRunningSession.empty() && !curRunningQuerySubmittedTime.empty() &&
                               curRunningQuerySubmittedTime,
      auto row = result.first->getNextRow(false, false);
      auto scalar_r = boost::get<ScalarTargetValue>(&row[0]);
      auto p = boost::get<int64_t>(scalar_r);
      auto frag_ids = result.second;
              << "} : " << static_cast<size_t>(*p);
          static_cast<size_t>(*p));
      result.first->moveToBegin();
                             *query_mem_desc_owned,
                             query_comp_desc_owned->getDeviceType(),
      crt_min_byte_width <<= 1;
              << ", what(): " << e.what();
  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
  return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
    const std::set<size_t>& fragment_indexes_param) {
  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
  std::vector<InputTableInfo> table_infos{table_info};
  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
    query_mem_desc_owned =
        query_comp_desc_owned->compile(0,
  CHECK(query_mem_desc_owned);
  CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
  const auto table_key = ra_exe_unit.input_descs[0].getTableKey();
  std::set<size_t> fragment_indexes;
  if (fragment_indexes_param.empty()) {
    for (size_t i = 0; i < outer_fragments.size(); i++) {
      fragment_indexes.emplace(i);
    fragment_indexes = fragment_indexes_param;
  for (auto fragment_index : fragment_indexes) {
    FragmentsList fragments_list{{table_key, {fragment_index}}};
                           *query_comp_desc_owned,
                           *query_mem_desc_owned,
    kernel.run(this, 0, kernel_context);
  for (const auto& [result_set_ptr, result_fragment_indexes] : all_fragment_results) {
    CHECK_EQ(result_fragment_indexes.size(), 1);
    cb(result_set_ptr, outer_fragments[result_fragment_indexes[0]]);
    const std::vector<InputTableInfo>& table_infos,
  return std::make_shared<ResultSet>(
  std::shared_ptr<CompilationContext> compilation_context;
    compilation_context =
        tf_compilation_context.compile(exe_unit, true);
                             compilation_context,
  std::shared_ptr<CompilationContext> compilation_context;
    compilation_context =
        tf_compilation_context.compile(exe_unit, false);
  return exe_context.execute(exe_unit,
                             compilation_context,
  return std::make_shared<ResultSet>(query_comp_desc.getIR());
    const std::shared_ptr<RowSetMemoryOwner>& row_set_mem_owner) {
      [this, &dict_id_visitor, &row_set_mem_owner](const Analyzer::Expr* expr) {
        const auto& dict_key = dict_id_visitor.visit(expr);
        if (dict_key.dict_id >= 0) {
          visitor.visit(expr);
    visit_expr(group_expr.get());
  for (const auto& group_expr : ra_exe_unit.quals) {
    visit_expr(group_expr.get());
  for (const auto& group_expr : ra_exe_unit.simple_quals) {
    visit_expr(group_expr.get());
  const auto visit_target_expr = [&](const Analyzer::Expr* target_expr) {
    const auto& target_type = target_expr->get_type_info();
    if (!target_type.is_string() || target_type.get_compression() == kENCODING_DICT) {
      if (agg_expr->get_is_distinct() || agg_expr->get_aggtype() == kSINGLE_VALUE ||
          agg_expr->get_aggtype() == kSAMPLE || agg_expr->get_aggtype() == kMODE) {
        visit_expr(agg_expr->get_arg());
      visit_expr(target_expr);
  std::for_each(target_exprs.begin(), target_exprs.end(), visit_target_expr);
  std::for_each(target_exprs_union.begin(), target_exprs_union.end(), visit_target_expr);
  for (const auto target_expr : ra_exe_unit.target_exprs) {
    if ((agg_info.agg_kind == kAVG || agg_info.agg_kind == kSUM ||
         agg_info.agg_kind == kSUM_IF) &&
        agg_info.agg_arg_type.get_type() == kDOUBLE) {
    if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
  return requested_device_type;
  int64_t float_null_val = 0;
  *reinterpret_cast<float*>(may_alias_ptr(&float_null_val)) =
  return float_null_val;
  return *reinterpret_cast<const int64_t*>(may_alias_ptr(&double_null_val));
    std::vector<int64_t>& entry,
    const std::vector<Analyzer::Expr*>& target_exprs,
  for (size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
    const auto target_expr = target_exprs[target_idx];
    CHECK(agg_info.is_agg);
    target_infos.push_back(agg_info);
      const auto executor = query_mem_desc.getExecutor();
      auto row_set_mem_owner = executor->getRowSetMemoryOwner();
      CHECK(row_set_mem_owner);
      const auto& count_distinct_desc =
        CHECK(row_set_mem_owner);
        constexpr size_t thread_idx{0};
        const auto bitmap_size = count_distinct_desc.bitmapPaddedSizeBytes();
        row_set_mem_owner->initCountDistinctBufferAllocator(bitmap_size, thread_idx);
        auto count_distinct_buffer =
            row_set_mem_owner->allocateCountDistinctBuffer(bitmap_size, thread_idx);
        entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
        CHECK(row_set_mem_owner);
        row_set_mem_owner->addCountDistinctSet(count_distinct_set);
        entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
    if (shared::is_any<kCOUNT, kCOUNT_IF, kAPPROX_COUNT_DISTINCT>(agg_info.agg_kind)) {
    } else if (shared::is_any<kAVG>(agg_info.agg_kind)) {
    } else if (shared::is_any<kSINGLE_VALUE, kSAMPLE>(agg_info.agg_kind)) {
      if (agg_info.sql_type.is_geometry() && !agg_info.is_varlen_projection) {
        for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
      } else if (agg_info.sql_type.is_varlen()) {
        entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
      entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
    const std::vector<Analyzer::Expr*>& target_exprs_in,
  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
  std::vector<Analyzer::Expr*> target_exprs;
  for (const auto target_expr : target_exprs_in) {
    const auto target_expr_copy =
    CHECK(target_expr_copy);
    auto ti = target_expr->get_type_info();
    target_expr_copy->set_type_info(ti);
    if (target_expr_copy->get_arg()) {
      auto arg_ti = target_expr_copy->get_arg()->get_type_info();
      arg_ti.set_notnull(false);
      target_expr_copy->get_arg()->set_type_info(arg_ti);
    target_exprs_owned_copies.push_back(target_expr_copy);
    target_exprs.push_back(target_expr_copy.get());
  std::vector<TargetInfo> target_infos;
  std::vector<int64_t> entry;
  const auto executor = query_mem_desc.getExecutor();
  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
  CHECK(row_set_mem_owner);
  auto rs = std::make_shared<ResultSet>(target_infos,
                                        executor->blockSize(),
                                        executor->gridSize());
  rs->allocateStorage();
  rs->fillOneEntry(entry);
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
        ra_exe_unit.target_exprs, query_mem_desc, device_type);
        ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
  } catch (const std::bad_alloc&) {
  const auto shard_count =
  if (shard_count && !result_per_device.empty()) {
      ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
    size_t output_row_index,
    const std::vector<uint32_t>& top_permutation) {
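  // Copies the rows selected by top_permutation from the input columnar storage into the
  // output storage, one group-key column and one slot column at a time, and returns the
  // next free output row index.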
  for (const auto sorted_idx : top_permutation) {
    for (size_t group_idx = 0; group_idx < input_query_mem_desc.getKeyCount();
      const auto input_column_ptr =
      const auto output_column_ptr =
          output_row_index * output_query_mem_desc.groupColWidth(group_idx);
      memcpy(output_column_ptr,
    for (size_t slot_idx = 0; slot_idx < input_query_mem_desc.getSlotCount();
      const auto input_column_ptr =
      const auto output_column_ptr =
      memcpy(output_column_ptr,
  return output_row_index;
    size_t output_row_index,
    const std::vector<uint32_t>& top_permutation) {
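  // Row-wise variant of the permuted copy: each selected row is copied as one contiguous
  // block of getRowSize() bytes into the output buffer.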
  for (const auto sorted_idx : top_permutation) {
    const auto row_ptr = input_buffer + sorted_idx * output_query_mem_desc.getRowSize();
    memcpy(output_buffer + output_row_index * output_query_mem_desc.getRowSize(),
  return output_row_index;
  const auto first_result_set = result_per_device.front().first;
  CHECK(first_result_set);
  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
  top_query_mem_desc.setEntryCount(0);
  for (auto& result : result_per_device) {
    const auto result_set = result.first;
    size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
    top_query_mem_desc.setEntryCount(new_entry_cnt);
  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
                                                    first_result_set->getDeviceType(),
                                                    first_result_set->getRowSetMemOwner(),
  auto top_storage = top_result_set->allocateStorage();
  size_t top_output_row_idx{0};
  for (auto& result : result_per_device) {
    const auto result_set = result.first;
    const auto& top_permutation = result_set->getPermutationBuffer();
    CHECK_LE(top_permutation.size(), top_n);
    if (top_query_mem_desc.didOutputColumnar()) {
                                                  result_set->getQueryMemDesc(),
  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
  return top_result_set;
std::unordered_map<shared::TableKey, const Analyzer::BinOper*>
  std::unordered_map<shared::TableKey, const Analyzer::BinOper*> id_to_cond;
  CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
  for (size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
    const auto& inner_table_key = join_info.join_hash_tables_[i]->getInnerTableId();
        std::make_pair(inner_table_key, join_info.equi_join_tautologies_[i].get()));
  for (const auto& col : fetched_cols) {
    if (col.is_lazily_fetched) {
    const std::vector<InputTableInfo>& table_infos,
    const bool allow_single_frag_table_opt,
    const size_t context_count,
    std::unordered_set<int>& available_gpus,
    int& available_cpus) {
  std::vector<std::unique_ptr<ExecutionKernel>> execution_kernels;
          : std::vector<Data_Namespace::MemoryInfo>{},
  const bool uses_lazy_fetch =
  const auto device_count = deviceCount(device_type);

  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
                                             use_multifrag_kernel,
  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {

  if (use_multifrag_kernel) {
    VLOG(1) << "Creating multifrag execution kernels";
    auto multifrag_kernel_dispatch = [&ra_exe_unit,
                                      render_info](const int device_id,
                                                   const int64_t rowid_lookup_key) {
      execution_kernels.emplace_back(
          std::make_unique<ExecutionKernel>(ra_exe_unit,
    fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
    VLOG(1) << "Creating one execution kernel per fragment";
        table_infos.size() == 1 && table_infos.front().table_key.table_id > 0) {
      const auto max_frag_size =
          table_infos.front().info.getFragmentNumTuplesUpperBound();
              << " to match max fragment size " << max_frag_size
              << " for kernel per fragment execution path.";
    size_t frag_list_idx{0};
    auto fragment_per_kernel_dispatch = [&ra_exe_unit,
                                         render_info](const int device_id,
                                                      const int64_t rowid_lookup_key) {
      if (!frag_list.size()) {
      execution_kernels.emplace_back(
          std::make_unique<ExecutionKernel>(ra_exe_unit,
    fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
  return execution_kernels;
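// Hedged sketch (hypothetical helper, not from the original source; assumes <vector>
// and <numeric>): the branch above only changes how fragment ids are grouped into
// work units. Multifrag mode produces one kernel per device that loops over all of
// its fragments; otherwise each fragment becomes its own ExecutionKernel.
namespace sketch {
inline std::vector<std::vector<size_t>> group_fragments(size_t num_fragments,
                                                        bool use_multifrag_kernel) {
  std::vector<std::vector<size_t>> work_units;
  if (use_multifrag_kernel) {
    // One work unit holding every fragment; the kernel iterates over them internally.
    std::vector<size_t> all(num_fragments);
    std::iota(all.begin(), all.end(), size_t(0));
    work_units.push_back(std::move(all));
  } else {
    // One work unit (and therefore one kernel) per fragment.
    for (size_t frag_id = 0; frag_id < num_fragments; ++frag_id) {
      work_units.push_back({frag_id});
    }
  }
  return work_units;
}
}  // namespace sketch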
    std::vector<std::unique_ptr<ExecutionKernel>>&& kernels,
    const size_t requested_num_threads) {
  const size_t num_threads =
      ? std::min(kernels.size(), static_cast<size_t>(cpu_threads()))
      : requested_num_threads;
  tbb::task_arena local_arena(num_threads);
  LOG(EXECUTOR) << "Launching query step with " << num_threads << " threads.";
      kernels.empty() ? nullptr : &kernels[0]->ra_exe_unit_;
  shared_context.setThreadPool(&tg);
  ScopeGuard pool_guard([&shared_context]() { shared_context.setThreadPool(nullptr); });
  VLOG(1) << "Launching " << kernels.size() << " kernels for query on "
          << " using pool of " << num_threads << " threads.";
  size_t kernel_idx = 1;
  for (auto& kernel : kernels) {
    CHECK(kernel.get());
    local_arena.execute([&] {
                     crt_kernel_idx = kernel_idx++] {
        const size_t old_thread_idx = crt_kernel_idx % num_threads;
        const size_t thread_idx = tbb::this_task_arena::current_thread_index();
                << " Old thread idx: " << old_thread_idx;
        const size_t thread_idx = crt_kernel_idx % num_threads;
        kernel->run(this, thread_idx, shared_context);
  local_arena.execute([&] { tg.wait(); });

  for (auto& exec_ctx : shared_context.getTlsExecutionContext()) {
      results = std::shared_ptr<ResultSet>(exec_ctx->estimator_result_set_.release());
      results = exec_ctx->getRowSet(*ra_exe_unit, exec_ctx->query_mem_desc_);
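// Hedged sketch (standalone, hypothetical names; assumes <tbb/task_arena.h>,
// <tbb/task_group.h>, <functional> and <vector>): the launch loop above follows the
// usual TBB pattern of submitting work to a task_group from inside a task_arena and
// then waiting on the group inside the same arena, so everything runs with the
// requested number of slots.
namespace sketch {
inline void run_in_arena(size_t num_threads,
                         const std::vector<std::function<void()>>& jobs) {
  tbb::task_arena arena(static_cast<int>(num_threads));
  tbb::task_group group;
  for (const auto& job : jobs) {
    arena.execute([&] { group.run(job); });  // enqueue each kernel-like job
  }
  arena.execute([&] { group.wait(); });  // block until every job has finished
}
}  // namespace sketch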
    std::vector<std::unique_ptr<ExecutionKernel>>&& kernels,

    std::vector<std::unique_ptr<ExecutionKernel>>&& kernels,
    const std::vector<InputDescriptor>& input_descs,
  const size_t num_kernels = kernels.size();
  constexpr bool cap_slots = false;
  const size_t num_compute_slots =
      ? std::min(num_kernels,
            ->get_resource_info(
  const size_t cpu_result_mem_bytes_per_kernel =

  std::vector<std::pair<int32_t, FragmentsList>> kernel_fragments_list;
  kernel_fragments_list.reserve(num_kernels);
  for (auto& kernel : kernels) {
    const auto device_id = kernel->get_chosen_device_id();
    const auto frag_list = kernel->get_fragment_list();
    if (!frag_list.empty()) {
      kernel_fragments_list.emplace_back(std::make_pair(device_id, frag_list));
      device_type, input_descs, shared_context.getQueryInfos(), kernel_fragments_list);

  auto gen_resource_request_info = [device_type,
                                    cpu_result_mem_bytes_per_kernel,
                                    &chunk_request_info,
          static_cast<size_t>(0),
          static_cast<size_t>(0),
          static_cast<size_t>(0),
          cpu_result_mem_bytes_per_kernel * num_compute_slots,
          cpu_result_mem_bytes_per_kernel * num_compute_slots,
      const size_t min_cpu_slots{1};
      const size_t min_cpu_result_mem =
          ? cpu_result_mem_bytes_per_kernel * min_cpu_slots
          : cpu_result_mem_bytes_per_kernel * num_compute_slots;
          static_cast<size_t>(0),
          cpu_result_mem_bytes_per_kernel * num_compute_slots,
              .threadsCanReuseGroupByBuffers());

  const auto resource_request_info = gen_resource_request_info();
  const bool is_empty_request =
      resource_request_info.cpu_slots == 0UL && resource_request_info.gpu_slots == 0UL;
  auto resource_handle =
      is_empty_request ? nullptr
  const auto num_cpu_threads =
      is_empty_request ? 0UL : resource_handle->get_resource_grant().cpu_slots;
    const auto num_gpu_slots =
        is_empty_request ? 0UL : resource_handle->get_resource_grant().gpu_slots;
    VLOG(1) << "In Executor::LaunchKernels executor " << getExecutorId() << " requested "
            << "between " << resource_request_info.min_gpu_slots << " and "
            << resource_request_info.gpu_slots << " GPU slots, and was granted "
            << num_gpu_slots << " GPU slots.";
  VLOG(1) << "In Executor::LaunchKernels executor " << getExecutorId() << " requested "
          << "between " << resource_request_info.min_cpu_slots << " and "
          << resource_request_info.cpu_slots << " CPU slots, and was granted "
          << num_cpu_threads << " CPU slots.";
  launchKernelsImpl(shared_context, std::move(kernels), device_type, num_cpu_threads);
    const size_t table_idx,
    const size_t outer_frag_idx,
    std::map<shared::TableKey, const TableFragments*>& selected_tables_fragments,
    const std::unordered_map<shared::TableKey, const Analyzer::BinOper*>&
        inner_table_id_to_join_condition) {
  const auto& table_key = ra_exe_unit.input_descs[table_idx].getTableKey();
  auto table_frags_it = selected_tables_fragments.find(table_key);
  CHECK(table_frags_it != selected_tables_fragments.end());
  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
  const auto outer_table_fragments_it =
      selected_tables_fragments.find(outer_input_desc.getTableKey());
  CHECK(outer_table_fragments_it != selected_tables_fragments.end());
  const auto outer_table_fragments = outer_table_fragments_it->second;
  CHECK_LT(outer_frag_idx, outer_table_fragments->size());
    return {outer_frag_idx};
  const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
  auto& inner_frags = table_frags_it->second;
  std::vector<size_t> all_frag_ids;
  for (size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
    const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
                          inner_table_id_to_join_condition,
    all_frag_ids.push_back(inner_frag_idx);
  return all_frag_ids;
    const int table_idx,
    const std::unordered_map<shared::TableKey, const Analyzer::BinOper*>&
        inner_table_id_to_join_condition,
  CHECK(table_idx >= 0 &&
        static_cast<size_t>(table_idx) < ra_exe_unit.input_descs.size());
  const auto& inner_table_key = ra_exe_unit.input_descs[table_idx].getTableKey();
  if (outer_fragment_info.shard == -1 || inner_fragment_info.shard == -1 ||
      outer_fragment_info.shard == inner_fragment_info.shard) {
    CHECK(!inner_table_id_to_join_condition.empty());
    auto condition_it = inner_table_id_to_join_condition.find(inner_table_key);
    CHECK(condition_it != inner_table_id_to_join_condition.end());
    join_condition = condition_it->second;
    CHECK(join_condition);
             plan_state_->join_info_.join_hash_tables_.size());
    for (size_t i = 0; i < plan_state_->join_info_.join_hash_tables_.size(); ++i) {
      if (plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
        CHECK(!join_condition);
        join_condition = plan_state_->join_info_.equi_join_tautologies_[i].get();
  if (!join_condition) {
  if (join_condition->is_bbox_intersect_oper()) {
  size_t shard_count{0};
  if (dynamic_cast<const Analyzer::ExpressionTuple*>(
          join_condition->get_left_operand())) {
    auto inner_outer_pairs =
        join_condition, this, inner_outer_pairs);
  if (shard_count && !ra_exe_unit.join_quals.empty()) {
    plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);

  const auto col_id = col_desc->getColId();
    const std::vector<InputDescriptor>& input_descs,
    const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments) {
  std::map<shared::TableKey, std::vector<uint64_t>> tab_id_to_frag_offsets;
  for (auto& desc : input_descs) {
    const auto fragments_it = all_tables_fragments.find(desc.getTableKey());
    CHECK(fragments_it != all_tables_fragments.end());
    const auto& fragments = *fragments_it->second;
    std::vector<uint64_t> frag_offsets(fragments.size(), 0);
    for (size_t i = 0, off = 0; i < fragments.size(); ++i) {
      frag_offsets[i] = off;
      off += fragments[i].getNumTuples();
    tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableKey(), frag_offsets));
  return tab_id_to_frag_offsets;
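// Hedged sketch (standalone, hypothetical name; assumes <vector> and <cstdint>): the
// per-table fragment offsets built above are just an exclusive prefix sum over the
// per-fragment tuple counts, so fragment i starts at the total number of tuples in
// fragments 0..i-1.
namespace sketch {
inline std::vector<uint64_t> exclusive_prefix_sum(const std::vector<uint64_t>& counts) {
  std::vector<uint64_t> offsets(counts.size(), 0);
  uint64_t off = 0;
  for (size_t i = 0; i < counts.size(); ++i) {
    offsets[i] = off;    // fragment i starts at the running total of earlier fragments
    off += counts[i];
  }
  return offsets;        // e.g. counts {3, 5, 2} -> offsets {0, 3, 8}
}
}  // namespace sketch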
std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
    const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
    const std::vector<InputDescriptor>& input_descs,
    const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments) {
  std::vector<std::vector<int64_t>> all_num_rows;
  std::vector<std::vector<uint64_t>> all_frag_offsets;
  const auto tab_id_to_frag_offsets =
  std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
    std::vector<int64_t> num_rows;
    std::vector<uint64_t> frag_offsets;
      CHECK_EQ(selected_frag_ids.size(), input_descs.size());
    for (size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
      const auto frag_id = ra_exe_unit.union_all ? 0 : selected_frag_ids[tab_idx];
      const auto fragments_it =
          all_tables_fragments.find(input_descs[tab_idx].getTableKey());
      CHECK(fragments_it != all_tables_fragments.end());
      const auto& fragments = *fragments_it->second;
      if (ra_exe_unit.join_quals.empty() || tab_idx == 0 ||
          plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
        const auto& fragment = fragments[frag_id];
        num_rows.push_back(fragment.getNumTuples());
        size_t total_row_count{0};
        for (const auto& fragment : fragments) {
          total_row_count += fragment.getNumTuples();
        num_rows.push_back(total_row_count);
      const auto frag_offsets_it =
          tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableKey());
      CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
      const auto& offsets = frag_offsets_it->second;
        frag_offsets.push_back(offsets[frag_id]);
    all_num_rows.push_back(num_rows);
    all_frag_offsets.push_back(frag_offsets);
  return {all_num_rows, all_frag_offsets};
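// Hedged sketch (hypothetical helper, not from the original source; assumes <vector>):
// frag_ids_crossjoin enumerates every combination of one fragment id per input table.
// A minimal iterative cross join over per-table fragment id lists looks like this:
namespace sketch {
inline std::vector<std::vector<size_t>> cross_join(
    const std::vector<std::vector<size_t>>& per_table_frag_ids) {
  std::vector<std::vector<size_t>> combos{{}};  // start from the empty combination
  for (const auto& frag_ids : per_table_frag_ids) {
    std::vector<std::vector<size_t>> next;
    for (const auto& combo : combos) {
      for (const auto frag_id : frag_ids) {
        auto extended = combo;         // copy the partial combination
        extended.push_back(frag_id);   // append this table's fragment choice
        next.push_back(std::move(extended));
      }
    }
    combos = std::move(next);
  }
  return combos;  // e.g. {{0,1}, {0,1}} -> {0,0}, {0,1}, {1,0}, {1,1}
}
}  // namespace sketch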
  const auto& input_descs = ra_exe_unit.input_descs;
  if (nest_level < 1 ||
      ra_exe_unit.join_quals.empty() || input_descs.size() < 2 ||
          plan_state_->isLazyFetchColumn(inner_col_desc))) {
  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
  CHECK_EQ(table_key, selected_fragments[nest_level].table_key);
  const auto& fragments = selected_fragments[nest_level].fragment_ids;
  return fragments.size() > 1;

  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
  CHECK_EQ(table_key, selected_fragments[nest_level].table_key);
  const auto& fragments = selected_fragments[nest_level].fragment_ids;
  auto need_linearize =
  return table_key.table_id > 0 && need_linearize && fragments.size() > 1;
    const int device_id,
    const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
    std::list<ChunkIter>& chunk_iterators,
    std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
    const size_t thread_idx,
    const bool allow_runtime_interrupt) {
  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
  std::vector<size_t> local_col_to_frag_pos;
                             local_col_to_frag_pos,
      selected_fragments_crossjoin);
  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
  std::vector<std::vector<int64_t>> all_num_rows;
  std::vector<std::vector<uint64_t>> all_frag_offsets;
  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
    std::vector<const int8_t*> frag_col_buffers(
    for (const auto& col_id : col_global_ids) {
      if (allow_runtime_interrupt) {
        bool isInterrupted = false;
        if (isInterrupted) {
      if (cd && cd->isVirtualCol) {
      const auto& table_key = col_id->getScanDesc().getTableKey();
      const auto fragments_it = all_tables_fragments.find(table_key);
      CHECK(fragments_it != all_tables_fragments.end());
      const auto fragments = fragments_it->second;
      auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
      CHECK_LT(static_cast<size_t>(it->second),
      const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
      if (!fragments->size()) {
      CHECK_LT(frag_id, fragments->size());
      auto memory_level_for_column = memory_level;
                                            col_id->getColId()};
        frag_col_buffers[it->second] =
                                 memory_level_for_column,
              cd, *col_id, ra_exe_unit, selected_fragments, memory_level)) {
        bool for_lazy_fetch = false;
        if (plan_state_->isColumnToNotFetch(tbl_col_key)) {
          for_lazy_fetch = true;
          VLOG(2) << "Try to linearize lazy fetch column (col_id: " << cd->columnId
                  << ", col_name: " << cd->columnName << ")";
              col_id->getScanDesc().getTableKey(),
              all_tables_fragments,
              for_lazy_fetch ? 0 : device_id,
              col_id->getScanDesc().getTableKey(),
              all_tables_fragments,
              memory_level_for_column,
              col_id->getScanDesc().getTableKey(),
              all_tables_fragments,
              memory_level_for_column,
    all_frag_col_buffers.push_back(frag_col_buffers);
      ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
  return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
                                     std::vector<InputDescriptor> const& input_descs) {
  auto const has_table_key = [&table_key](InputDescriptor const& input_desc) {
    return table_key == input_desc.getTableKey();
  return std::find_if(input_descs.begin(), input_descs.end(), has_table_key) -
         input_descs.begin();

    std::list<std::shared_ptr<InputColDescriptor const>> const& input_col_descs) {
  auto const has_table_key = [&table_key](auto const& input_desc) {
    return table_key == input_desc->getScanDesc().getTableKey();
  return std::distance(
      input_col_descs.begin(),
      std::find_if(input_col_descs.begin(), input_col_descs.end(), has_table_key));

    std::list<std::shared_ptr<InputColDescriptor const>> const& input_col_descs) {
  std::list<std::shared_ptr<const InputColDescriptor>> selected;
  for (auto const& input_col_desc : input_col_descs) {
    if (table_key == input_col_desc->getScanDesc().getTableKey()) {
      selected.push_back(input_col_desc);

                   int8_t const* const ptr,
                   size_t const local_col_id,
  size_t const begin = local_col_id - local_col_id % N;
  size_t const end = begin + N;
  CHECK_LE(end, frag_col_buffers.size()) << (void*)ptr << ' ' << local_col_id << ' ' << N;
  for (size_t i = begin; i < end; ++i) {
    frag_col_buffers[i] = ptr;
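// Hedged sketch (standalone, hypothetical name; assumes <utility> and <cstddef>): the
// loop above writes the same chunk pointer into the whole aligned group of N slots
// that contains local_col_id. With N == 3 and local_col_id == 4, the group is the
// half-open range [3, 6), so slots 3, 4 and 5 all receive ptr.
namespace sketch {
inline std::pair<size_t, size_t> mod_range(size_t local_col_id, size_t N) {
  const size_t begin = local_col_id - local_col_id % N;  // round down to a multiple of N
  return {begin, begin + N};                             // half-open [begin, begin + N)
}
}  // namespace sketch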
    const int device_id,
    const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
    std::list<ChunkIter>& chunk_iterators,
    std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
    const size_t thread_idx,
    const bool allow_runtime_interrupt) {
  CHECK_EQ(1u, selected_fragments.size());
  auto const& input_descs = ra_exe_unit.input_descs;
  const auto& selected_table_key = selected_fragments.front().table_key;
  size_t const input_descs_index =
  CHECK_LT(input_descs_index, input_descs.size());
  size_t const input_col_descs_index =
  VLOG(2) << "selected_table_key=" << selected_table_key
          << " input_descs_index=" << input_descs_index
          << " input_col_descs_index=" << input_col_descs_index
          << " ra_exe_unit.input_col_descs="
  std::list<std::shared_ptr<const InputColDescriptor>> selected_input_col_descs =
  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
      selected_fragments_crossjoin, selected_fragments, ra_exe_unit);
      selected_fragments_crossjoin);

  if (allow_runtime_interrupt) {
    bool isInterrupted = false;
    if (isInterrupted) {
  std::vector<const int8_t*> frag_col_buffers(
  for (const auto& col_id : selected_input_col_descs) {
    if (cd && cd->isVirtualCol) {
    const auto fragments_it = all_tables_fragments.find(selected_table_key);
    CHECK(fragments_it != all_tables_fragments.end());
    const auto fragments = fragments_it->second;
    auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
    size_t const local_col_id = it->second;
    constexpr size_t frag_id = 0;
    if (fragments->empty()) {
        plan_state_->isColumnToFetch({selected_table_key, col_id->getColId()})
          col_id.get(), memory_level_for_column, device_id, device_allocator, thread_idx);
              all_tables_fragments,
              memory_level_for_column,
              all_tables_fragments,
              memory_level_for_column,
    set_mod_range(frag_col_buffers, ptr, local_col_id, input_descs.size());
      ra_exe_unit, frag_ids_crossjoin, input_descs, all_tables_fragments);
          << " input_descs_index=" << input_descs_index
          << " input_col_descs_index=" << input_col_descs_index;
  return {{std::move(frag_col_buffers)},
          {{num_rows[0][input_descs_index]}},
          {{frag_offsets[0][input_descs_index]}}};

    const size_t scan_idx,
      !plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
      !selected_fragments[scan_idx].fragment_ids.empty()) {
  return selected_fragments[scan_idx].fragment_ids;
    std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
    std::vector<size_t>& local_col_to_frag_pos,
    const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
  const auto& input_descs = ra_exe_unit.input_descs;
  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
    const auto& table_key = input_descs[scan_idx].getTableKey();
    CHECK_EQ(selected_fragments[scan_idx].table_key, table_key);
    selected_fragments_crossjoin.push_back(
    for (const auto& col_id : col_global_ids) {
      const auto& input_desc = col_id->getScanDesc();
      if (input_desc.getTableKey() != table_key ||
          input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
      auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
      CHECK_LT(static_cast<size_t>(it->second),
      local_col_to_frag_pos[it->second] = frag_pos;

    std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
  const auto& input_descs = ra_exe_unit.input_descs;
  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
    if (selected_fragments[0].table_key == input_descs[scan_idx].getTableKey()) {
      selected_fragments_crossjoin.push_back({size_t(1)});
  OutVecOwner(const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
    for (auto out : out_vec_) {

    const bool hoist_literals,
    const std::vector<Analyzer::Expr*>& target_exprs,
    std::vector<std::vector<const int8_t*>>& col_buffers,
    const std::vector<std::vector<int64_t>>& num_rows,
    const std::vector<std::vector<uint64_t>>& frag_offsets,
    const int device_id,
    const uint32_t start_rowid,
    const uint32_t num_tables,
    const bool allow_runtime_interrupt,
    const bool optimize_cuda_block_and_grid_sizes,
    const int64_t rows_to_process) {
  CHECK(!results || !(*results));
  if (col_buffers.empty()) {
        << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
           "currently unsupported.";
  std::vector<int64_t*> out_vec;
  std::unique_ptr<OutVecOwner> output_memory_scope;
  if (allow_runtime_interrupt) {
    bool isInterrupted = false;
    if (isInterrupted) {
    CHECK(cpu_generated_code);
                                    join_hash_table_ptrs,
    output_memory_scope.reset(new OutVecOwner(out_vec));
    CHECK(gpu_generated_code);
                           allow_runtime_interrupt,
                           join_hash_table_ptrs,
                           render_allocator_map_ptr,
                           optimize_cuda_block_and_grid_sizes);
      output_memory_scope.reset(new OutVecOwner(out_vec));
      return int32_t(ErrorCode::OUT_OF_GPU_MEM);
    } catch (const std::exception& e) {
      LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
                      ErrorCode::DIV_BY_ZERO,
                      ErrorCode::OUT_OF_TIME,
                      ErrorCode::INTERRUPTED,
                      ErrorCode::SINGLE_VALUE_FOUND_MULTIPLE_VALUES,
                      ErrorCode::WIDTH_BUCKET_INVALID_ARGUMENT,
                      ErrorCode::BBOX_OVERLAPS_LIMIT_EXCEEDED>::check(error_code)) {
  std::vector<int64_t> reduced_outs;
  const auto num_frags = col_buffers.size();
  const size_t entry_count =
  if (size_t(1) == entry_count) {
    for (auto out : out_vec) {
      reduced_outs.push_back(*out);
    size_t out_vec_idx = 0;
    for (const auto target_expr : target_exprs) {
      CHECK(agg_info.is_agg || dynamic_cast<Analyzer::Constant*>(target_expr))
          << target_expr->toString();
      const int num_iterations = agg_info.sql_type.is_geometry()
                                     ? agg_info.sql_type.get_physical_coord_cols()
      for (int i = 0; i < num_iterations; i++) {
            shared::is_any<kAPPROX_QUANTILE, kMODE>(agg_info.agg_kind)) {
          bool const check = shared::
              is_any<kCOUNT, kAPPROX_COUNT_DISTINCT, kAPPROX_QUANTILE, kMODE, kCOUNT_IF>(
          CHECK(check) << agg_info.agg_kind;
          val1 = out_vec[out_vec_idx][0];
          const auto chosen_bytes = static_cast<size_t>(
              float_argument_input ? sizeof(int32_t) : chosen_bytes,
              out_vec[out_vec_idx],
              float_argument_input);
        reduced_outs.push_back(val1);
        if (agg_info.agg_kind == kAVG ||
            (agg_info.agg_kind == kSAMPLE &&
             (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
          const auto chosen_bytes = static_cast<size_t>(
              agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
              float_argument_input ? sizeof(int32_t) : chosen_bytes,
              out_vec[out_vec_idx + 1],
          reduced_outs.push_back(val2);
  auto rows_ptr = std::shared_ptr<ResultSet>(
  rows_ptr->fillOneEntry(reduced_outs);
  *results = std::move(rows_ptr);

  return results && results->rowCount() < scan_limit;
    const bool hoist_literals,
    std::vector<std::vector<const int8_t*>>& col_buffers,
    const std::vector<size_t> outer_tab_frag_ids,
    const std::vector<std::vector<int64_t>>& num_rows,
    const std::vector<std::vector<uint64_t>>& frag_offsets,
    const int device_id,
    const int64_t scan_limit,
    const uint32_t start_rowid,
    const uint32_t num_tables,
    const bool allow_runtime_interrupt,
    const bool optimize_cuda_block_and_grid_sizes,
    const int64_t rows_to_process) {
  CHECK(!results || !(*results));
  if (col_buffers.empty()) {
  if (allow_runtime_interrupt) {
    bool isInterrupted = false;
    if (isInterrupted) {
      return int32_t(ErrorCode::INTERRUPTED);
  VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
          << " ra_exe_unit.input_descs="
          << " ra_exe_unit.input_col_descs="
          << " ra_exe_unit.scan_limit=" << ra_exe_unit.scan_limit
          << " query_exe_context->query_buffers_->num_rows_="
          << " query_exe_context->query_mem_desc_.getEntryCount()="
          << " device_id=" << device_id << " outer_table_key=" << outer_table_key
          << " scan_limit=" << scan_limit << " start_rowid=" << start_rowid
          << " num_tables=" << num_tables;

  std::stable_sort(ra_exe_unit_copy.input_descs.begin(),
                   [outer_table_key](auto const& a, auto const& b) {
                     return a.getTableKey() == outer_table_key &&
                            b.getTableKey() != outer_table_key;
      ra_exe_unit_copy.input_descs.back().getTableKey() != outer_table_key) {
        [outer_table_key](auto const& input_col_desc) {
          return input_col_desc->getScanDesc().getTableKey() != outer_table_key;
  const int32_t scan_limit_for_query =
  const int32_t max_matched = scan_limit_for_query == 0
                                  : scan_limit_for_query;
    CHECK(cpu_generated_code);
                                  join_hash_table_ptrs,
    CHECK(gpu_generated_code);
                           allow_runtime_interrupt,
                           join_hash_table_ptrs,
                           render_allocator_map_ptr,
                           optimize_cuda_block_and_grid_sizes);
      return int32_t(ErrorCode::OUT_OF_GPU_MEM);
      return int32_t(ErrorCode::OUT_OF_RENDER_MEM);
      return int32_t(ErrorCode::STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY);
    } catch (const std::exception& e) {
      LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
                      ErrorCode::DIV_BY_ZERO,
                      ErrorCode::OUT_OF_TIME,
                      ErrorCode::INTERRUPTED,
                      ErrorCode::SINGLE_VALUE_FOUND_MULTIPLE_VALUES,
                      ErrorCode::WIDTH_BUCKET_INVALID_ARGUMENT,
                      ErrorCode::BBOX_OVERLAPS_LIMIT_EXCEEDED>::check(error_code)) {
  if (results && error_code != int32_t(ErrorCode::OVERFLOW_OR_UNDERFLOW) &&
      error_code != int32_t(ErrorCode::DIV_BY_ZERO) && !render_allocator_map_ptr) {
    *results = query_exe_context->getRowSet(ra_exe_unit_copy,
    VLOG(2) << "results->rowCount()=" << (*results)->rowCount();
    (*results)->holdLiterals(hoist_buf);
  if (error_code < 0 && render_allocator_map_ptr) {
    auto const adjusted_scan_limit =
    if (adjusted_scan_limit != 0) {
  if (results && error_code &&
                                                   const int device_id) {
  std::vector<int8_t*> table_ptrs;
  const auto& join_hash_tables = plan_state_->join_info_.join_hash_tables_;
  for (auto hash_table : join_hash_tables) {
      CHECK(table_ptrs.empty());
    table_ptrs.push_back(hash_table->getJoinHashBuffer(

    const std::vector<InputTableInfo>& query_infos,
  const bool contains_left_deep_outer_join =
      ra_exe_unit && std::find_if(ra_exe_unit->join_quals.begin(),
      new CgenState(query_infos.size(), contains_left_deep_outer_join, this));

    const std::vector<InputTableInfo>& query_infos) {
  const auto ld_count = input_descs.size();
  for (size_t i = 0; i < ld_count; ++i) {
    const auto frag_count = query_infos[i].info.fragments.size();
    if (frag_count > 1) {
          frag_off_ptr->getType()->getPointerElementType(), frag_off_ptr));

    const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
    const std::vector<InputTableInfo>& query_infos,
    const HashType preferred_hash_type,
        "Bounding box intersection disabled, attempting to fall back to loop join"};
                                        preferred_hash_type,
                                        hashtable_build_dag_map,
                                        table_id_to_node_map);
    return {nullptr, e.what()};

  CHECK(!dev_props.empty());
  return dev_props.front().warpSize;

  return std::max((unsigned)2,

  return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
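// Worked example (values hypothetical, not from the original source): clockKhz is in
// kHz, i.e. cycles per millisecond, so multiplying by a millisecond budget yields a
// cycle budget. A 1.5 GHz device clock and a 10,000 ms limit give 15e9 cycles.
namespace sketch {
constexpr int64_t cycles_for_budget(int64_t clock_khz, int64_t milliseconds) {
  return clock_khz * milliseconds;  // kHz == cycles/ms, so the units cancel to cycles
}
static_assert(cycles_for_budget(1'500'000, 10'000) == 15'000'000'000LL, "unit check");
}  // namespace sketch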
  if (value->getType()->isIntegerTy() && from_ti.is_number() && to_ti.is_fp() &&
      fp_type = llvm::Type::getFloatTy(cgen_state_->context_);
      fp_type = llvm::Type::getDoubleTy(cgen_state_->context_);
    value = cgen_state_->ir_builder_.CreateSIToFP(value, fp_type);

  CHECK(val->getType()->isPointerTy());
  const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
  const auto val_type = val_ptr_type->getPointerElementType();
  size_t val_width = 0;
  if (val_type->isIntegerTy()) {
    val_width = val_type->getIntegerBitWidth();
    if (val_type->isFloatTy()) {
      CHECK(val_type->isDoubleTy());
  if (bitWidth == val_width) {

#define EXECUTE_INCLUDE
#undef EXECUTE_INCLUDE
  auto deleted_cols_it = deleted_cols_map.find(table_key);
  if (deleted_cols_it == deleted_cols_map.end()) {
    CHECK(deleted_cols_map.insert(std::make_pair(table_key, deleted_cd)).second);
    CHECK_EQ(deleted_cd, deleted_cols_it->second);

  auto ra_exe_unit_with_deleted = ra_exe_unit;
  for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
    const auto& table_key = input_table.getTableKey();
    const auto catalog =
    const auto td = catalog->getMetadataForTable(table_key.table_id);
    const auto deleted_cd = catalog->getDeletedColumnIfRowsDeleted(td);
    CHECK(deleted_cd->columnType.is_boolean());
    for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
      if (input_col.get()->getColId() == deleted_cd->columnId &&
          input_col.get()->getScanDesc().getTableKey() == table_key &&
          input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
    ra_exe_unit_with_deleted.input_col_descs.emplace_back(
        deleted_cd->tableId,
        input_table.getNestLevel()));
  return std::make_tuple(ra_exe_unit_with_deleted, deleted_cols_map);
    const int64_t chunk_min,
    const int64_t chunk_max,
  CHECK(ldim != rdim);
    return {true, chunk_min / scale, chunk_max / scale};
      boost::multiprecision::cpp_int_backend<64,
                                             boost::multiprecision::signed_magnitude,
                                             boost::multiprecision::checked,
    return std::make_tuple(true,
  } catch (const std::overflow_error& e) {
    return std::make_tuple(false, chunk_min, chunk_max);
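// Hedged sketch (standalone, hypothetical name; assumes <limits> and <cstdint>):
// comparing chunk metadata against a constant of a different timestamp precision
// means rescaling by a power of ten (e.g. TIMESTAMP(0) vs TIMESTAMP(3) differ by
// 10^3). Scaling down is always safe; scaling up can overflow, which is why the code
// above falls back to a checked multiprecision multiply. A cheap pre-check:
namespace sketch {
inline bool scale_up_would_overflow(int64_t value, int64_t scale) {
  // True when value * scale would not fit in int64_t (scale > 0 assumed).
  return value > 0 ? value > std::numeric_limits<int64_t>::max() / scale
                   : value < std::numeric_limits<int64_t>::min() / scale;
}
}  // namespace sketch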
  if (table_key.table_id < 0) {
  const auto catalog =
  const auto td = catalog->getMetadataForTable(fragment.physicalTableId);
  const auto deleted_cd = catalog->getDeletedColumnIfRowsDeleted(td);
  const auto& chunk_type = deleted_cd->columnType;
  CHECK(chunk_type.is_boolean());
  const auto deleted_col_id = deleted_cd->columnId;
    const int64_t chunk_min =
    const int64_t chunk_max =
    if (chunk_min == 1 && chunk_max == 1) {

  double chunk_min{0.};
  double chunk_max{0.};
  if (chunk_min > chunk_max) {
  const auto rhs_val = rhs_type == kFLOAT ? datum_fp.floatval : datum_fp.doubleval;
      if (chunk_max < rhs_val) {
      if (chunk_max <= rhs_val) {
      if (chunk_min > rhs_val) {
      if (chunk_min >= rhs_val) {
      if (chunk_min > rhs_val || chunk_max < rhs_val) {
    const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
    const std::vector<uint64_t>& frag_offsets,
    const size_t frag_idx) {
          << ", fragment id: " << frag_idx;
  for (const auto& simple_qual : simple_quals) {
    const auto comp_expr =
    if (!lhs_col || !lhs_col->getColumnKey().table_id || lhs_col->get_rte_idx()) {
      CHECK(lhs_uexpr->get_optype() ==
      if (!lhs_col || !lhs_col->getColumnKey().table_id || lhs_col->get_rte_idx()) {
    const auto rhs = comp_expr->get_right_operand();
    if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time() &&
        !lhs->get_type_info().is_fp()) {
    if (lhs->get_type_info().is_fp()) {
      const auto fragment_skip_status =
      switch (fragment_skip_status) {
    if (lhs_col->get_type_info().is_timestamp() &&
        rhs_const->get_type_info().is_any<kTIME>()) {
    const int col_id = lhs_col->getColumnKey().column_id;
    int64_t chunk_min{0};
    int64_t chunk_max{0};
    bool is_rowid{false};
    size_t start_rowid{0};
    if (cd->isVirtualCol) {
      CHECK(cd->columnName == "rowid");
      start_rowid = table_generation.start_rowid;
      chunk_min = frag_offsets[frag_idx] + start_rowid;
      chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
      const auto& chunk_type = lhs_col->get_type_info();
    if (chunk_min > chunk_max) {
    if (lhs->get_type_info().is_timestamp() &&
        (lhs_col->get_type_info().get_dimension() !=
         rhs_const->get_type_info().get_dimension()) &&
        (lhs_col->get_type_info().is_high_precision_timestamp() ||
         rhs_const->get_type_info().is_high_precision_timestamp())) {
      std::tie(is_valid, chunk_min, chunk_max) =
              chunk_min, chunk_max, lhs_col->get_type_info(), rhs_const->get_type_info());
        VLOG(4) << "Overflow/underflow detected in fragment skipping logic.\nChunk min "
                << "\nLHS col precision is: "
                << "\nRHS precision is: "
                << std::to_string(rhs_const->get_type_info().get_dimension()) << ".";
    if (lhs_col->get_type_info().is_timestamp() && rhs_const->get_type_info().is_date()) {
          chunk_min, pow(10, lhs_col->get_type_info().get_dimension()));
          chunk_max, pow(10, lhs_col->get_type_info().get_dimension()));
    llvm::LLVMContext local_context;
    CgenState local_cgen_state(local_context);
    const auto rhs_val =
    switch (comp_expr->get_optype()) {
        if (chunk_max < rhs_val) {
        if (chunk_max <= rhs_val) {
        if (chunk_min > rhs_val) {
        if (chunk_min >= rhs_val) {
        if (chunk_min > rhs_val || chunk_max < rhs_val) {
        } else if (is_rowid) {
          return {false, rhs_val - start_rowid};
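// Hedged sketch (standalone, hypothetical names; assumes <cstdint>): each comparison
// case above is an interval test against the fragment's chunk min/max metadata. For
// `col < rhs` the fragment can be skipped when even its smallest value is >= rhs, and
// so on for the other operators:
namespace sketch {
enum class CompareOp { kLt, kLe, kGt, kGe, kEq };
inline bool can_skip_fragment(CompareOp op,
                              int64_t chunk_min,
                              int64_t chunk_max,
                              int64_t rhs) {
  switch (op) {
    case CompareOp::kLt:
      return chunk_min >= rhs;  // no stored value can satisfy col < rhs
    case CompareOp::kLe:
      return chunk_min > rhs;
    case CompareOp::kGt:
      return chunk_max <= rhs;  // no stored value can satisfy col > rhs
    case CompareOp::kGe:
      return chunk_max < rhs;
    case CompareOp::kEq:
      return chunk_min > rhs || chunk_max < rhs;  // rhs lies outside [min, max]
  }
  return false;
}
}  // namespace sketch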
    const std::vector<uint64_t>& frag_offsets,
    const size_t frag_idx) {
  std::pair<bool, int64_t> skip_frag{false, -1};
  for (auto& inner_join : ra_exe_unit.join_quals) {
    std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
    for (auto& qual : inner_join.quals) {
      inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
                                     temp_qual.simple_quals.begin(),
                                     temp_qual.simple_quals.end());
        table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
    if (temp_skip_frag.second != -1) {
      skip_frag.second = temp_skip_frag.second;
      skip_frag.first = skip_frag.first || temp_skip_frag.first;
    const std::unordered_set<PhysicalInput>& phys_inputs) {
  std::unordered_set<shared::TableKey> phys_table_keys;
  for (const auto& phys_input : phys_inputs) {
    phys_table_keys.emplace(phys_input.db_id, phys_input.table_id);
  std::vector<InputTableInfo> query_infos;
  for (const auto& table_key : phys_table_keys) {
  for (const auto& phys_input : phys_inputs) {
    auto db_id = phys_input.db_id;
    auto table_id = phys_input.table_id;
    auto column_id = phys_input.col_id;
      const auto col_var = std::make_unique<Analyzer::ColumnVar>(
      agg_col_range_cache.setColRange(phys_input, col_range);
  return agg_col_range_cache;

    const std::unordered_set<PhysicalInput>& phys_inputs) {
  for (const auto& phys_input : phys_inputs) {
    const auto catalog =
    const auto cd = catalog->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
    const auto& col_ti =
        cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
    if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
      const auto& dict_key = col_ti.getStringDictKey();
      const auto dd = catalog->getMetadataForDict(dict_key.dict_id);
      CHECK(dd && dd->stringDict);
          dd->stringDict->storageEntryCount());
  return string_dictionary_generations;

    const std::unordered_set<shared::TableKey>& phys_table_keys) {
  for (const auto& table_key : phys_table_keys) {
        TableGeneration{static_cast<int64_t>(table_info.getPhysicalNumTuples()), 0});
  return table_generations;
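// Hedged sketch (hypothetical types, not the real TableGeneration API; assumes <map>
// and <cstdint>): the caches built above snapshot per-input state at query start, e.g.
// per-table generations pair a tuple count with a starting rowid so that concurrent
// appends cannot shift results mid-query.
namespace sketch {
struct Generation {
  int64_t tuple_count;
  int64_t start_rowid;
};
inline std::map<int32_t, Generation> snapshot_generations(
    const std::map<int32_t, int64_t>& physical_tuple_counts) {
  std::map<int32_t, Generation> generations;
  for (const auto& [table_id, num_tuples] : physical_tuple_counts) {
    generations.emplace(table_id, Generation{num_tuples, /*start_rowid=*/0});
  }
  return generations;
}
}  // namespace sketch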
    const std::unordered_set<shared::TableKey>& phys_table_ids) {

  return !candidate_query_session.empty() &&
      ->second.getQueryStatus();
  return QuerySessionStatus::QueryStatus::UNDEFINED;

    const std::string& query_str,
    const std::string& query_submitted_time) {
  if (!query_session_id.empty()) {
        query_session_id, query_submitted_time, executor_id_, write_lock);
                             query_submitted_time,
                             QuerySessionStatus::QueryStatus::PENDING_EXECUTOR,
  return {query_session_id, query_str};

  if (query_session.empty()) {
    VLOG(1) << "Interrupting pending query is not available since the query session is "
        << "Interrupting pending query is not available since its interrupt flag is "

    const std::string& submitted_time_str) {
  if (query_session.empty()) {

    const std::string& submitted_time_str,
  if (query_session.empty()) {
  if (new_query_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
      query_session, submitted_time_str, new_query_status, session_write_lock);

    const std::string& query_str,
    const std::string& submitted_time_str,
    const size_t executor_id,
  if (query_session.empty()) {
                                  query_session_status,
                                  session_write_lock);
  if (query_session_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {

    const std::string& query_str,
    const std::string& submitted_time_str,
    const size_t executor_id,
        .emplace(submitted_time_str,
        .emplace(submitted_time_str,
    std::map<std::string, QuerySessionStatus> executor_per_query_map;
    executor_per_query_map.emplace(
        query_session, executor_id, query_str, submitted_time_str, query_status));

    const std::string& submitted_time_str,
  if (query_session.empty()) {
    auto target_submitted_t_str = query_status.second.getQuerySubmittedTime();
    if (submitted_time_str.compare(target_submitted_t_str) == 0) {
      auto prev_status = query_status.second.getQueryStatus();
      if (prev_status == updated_query_status) {
      query_status.second.setQueryStatus(updated_query_status);

    const std::string& submitted_time_str,
    const size_t executor_id,
  if (query_session.empty()) {
  for (auto it = storage.begin(); it != storage.end(); it++) {
    auto target_submitted_t_str = it->second.getQuerySubmittedTime();
    if (submitted_time_str.compare(target_submitted_t_str) == 0) {
          .at(submitted_time_str)
          .setExecutorId(executor_id);

    const std::string& submitted_time_str,
  if (query_session.empty()) {
  if (storage.size() > 1) {
    for (auto it = storage.begin(); it != storage.end(); it++) {
      auto target_submitted_t_str = it->second.getQuerySubmittedTime();
          submitted_time_str.compare(target_submitted_t_str) == 0) {
  } else if (storage.size() == 1) {

  if (query_session.empty()) {
  if (query_session.empty()) {
  if (query_session.empty()) {

    const double runtime_query_check_freq,
    const unsigned pending_query_check_freq) const {

    const size_t cache_value) {
  VLOG(1) << "Put estimated cardinality to the cache";
  VLOG(1) << "Reuse cached cardinality";

  if (it->first.containsTableKey(table_key)) {

  std::vector<QuerySessionStatus> ret;
  for (auto& info : query_infos) {
    ret.emplace_back(query_session,
                     info.second.getExecutorId(),
                     info.second.getQueryStr(),
                     info.second.getQuerySubmittedTime(),
                     info.second.getQueryStatus());

  std::vector<size_t> res;
  for (auto& kv : it->second) {
    if (kv.second.getQueryStatus() ==
        QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
      res.push_back(kv.second.getExecutorId());
    const size_t num_cpu_slots,
    const size_t num_gpu_slots,
    const size_t cpu_result_mem,
    const size_t cpu_buffer_pool_mem,
    const size_t gpu_buffer_pool_mem,
    const double per_query_max_cpu_slots_ratio,
    const double per_query_max_cpu_result_mem_ratio,
    const bool allow_cpu_kernel_concurrency,
    const bool allow_cpu_gpu_kernel_concurrency,
    const bool allow_cpu_slot_oversubscription_concurrency,
    const bool allow_cpu_result_mem_oversubscription_concurrency,
    const double max_available_resource_use_ratio) {
  const double per_query_max_pinned_cpu_buffer_pool_mem_ratio{1.0};
  const double per_query_max_pageable_cpu_buffer_pool_mem_ratio{0.5};
      cpu_buffer_pool_mem,
      gpu_buffer_pool_mem,
      per_query_max_cpu_slots_ratio,
      per_query_max_cpu_result_mem_ratio,
      per_query_max_pinned_cpu_buffer_pool_mem_ratio,
      per_query_max_pageable_cpu_buffer_pool_mem_ratio,
      allow_cpu_kernel_concurrency,
      allow_cpu_gpu_kernel_concurrency,
      allow_cpu_slot_oversubscription_concurrency,
      allow_cpu_result_mem_oversubscription_concurrency,
      max_available_resource_use_ratio);
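// Hedged sketch (hypothetical struct and helper, not the real ExecutorResourceMgr
// API; assumes <algorithm> and <cstddef>): the initializer above forwards pool sizes
// and per-query ratio caps. Conceptually, a per-query CPU slot cap is just the pool
// size scaled by its ratio, clamped so a query always gets at least one slot.
namespace sketch {
struct ResourcePoolConfig {
  size_t num_cpu_slots;
  double per_query_max_cpu_slots_ratio;  // e.g. 0.9 keeps 10% of slots free
};
inline size_t per_query_cpu_slot_cap(const ResourcePoolConfig& config) {
  const auto cap = static_cast<size_t>(config.num_cpu_slots *
                                       config.per_query_max_cpu_slots_ratio);
  return std::max<size_t>(cap, 1);  // never starve a query of its minimum slot
}
}  // namespace sketch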
    throw std::runtime_error(
        "Executor queue cannot be paused as it requires Executor Resource Manager to be "
    throw std::runtime_error(
        "Executor queue cannot be resumed as it requires Executor Resource Manager to be "
    throw std::runtime_error(
        "ExecutorResourceMgr must be enabled to obtain executor resource pool stats.");
    throw std::runtime_error(
        "ExecutorResourceMgr must be enabled to obtain executor resource pool stats.");
    const size_t resource_quantity) {
    throw std::runtime_error(
        "ExecutorResourceMgr must be enabled to set executor resource pool resource.");
    throw std::runtime_error(
        "ExecutorResourceMgr must be enabled to set executor concurrent resource grant "
        concurrent_resource_grant_policy) {
    throw std::runtime_error(
        "ExecutorResourceMgr must be enabled to set executor concurrent resource grant "
        concurrent_resource_grant_policy);
std::shared_ptr<ExecutorResourceMgr_Namespace::ExecutorResourceMgr>

  std::stringstream ss;
  ss << "colRangeCache: ";
    ss << "{" << phys_input.col_id << ", " << phys_input.table_id
       << "} = " << exp_range.toString() << ", ";
  ss << "stringDictGenerations: ";
  for (auto& [key, val] : row_set_mem_owner_->getStringDictionaryGenerations().asMap()) {
    ss << key << " = " << val << ", ";
  ss << "tableGenerations: ";
    ss << key << " = {" << val.tuple_count << ", " << val.start_rowid << "}, ";