OmniSciDB  a5dc49c757
Execute.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryEngine/Execute.h"
18 
19 #include <llvm/Transforms/Utils/BasicBlockUtils.h>
20 #include <boost/filesystem/operations.hpp>
21 #include <boost/filesystem/path.hpp>
22 
23 #ifdef HAVE_CUDA
24 #include <cuda.h>
25 #endif // HAVE_CUDA
26 #include <chrono>
27 #include <ctime>
28 #include <future>
29 #include <memory>
30 #include <mutex>
31 #include <numeric>
32 #include <set>
33 #include <thread>
34 #include <type_traits>
35 
36 #include "Catalog/Catalog.h"
37 #include "CudaMgr/CudaMgr.h"
41 #include "Parser/ParserNode.h"
73 #include "Shared/checked_alloc.h"
74 #include "Shared/measure.h"
75 #include "Shared/misc.h"
76 #include "Shared/scope.h"
77 #include "Shared/shard_key.h"
78 #include "Shared/threading.h"
79 
80 bool g_enable_watchdog{false};
90 size_t g_cpu_sub_task_size{500'000};
91 bool g_enable_filter_function{true};
92 unsigned g_dynamic_watchdog_time_limit{10000};
93 bool g_allow_cpu_retry{true};
94 bool g_allow_query_step_cpu_retry{true};
95 bool g_null_div_by_zero{false};
96 unsigned g_trivial_loop_join_threshold{1000};
97 bool g_from_table_reordering{true};
98 bool g_inner_join_fragment_skipping{true};
99 extern bool g_enable_smem_group_by;
100 extern std::unique_ptr<llvm::Module> udf_gpu_module;
101 extern std::unique_ptr<llvm::Module> udf_cpu_module;
102 bool g_enable_filter_push_down{false};
103 float g_filter_push_down_low_frac{-1.0f};
104 float g_filter_push_down_high_frac{-1.0f};
105 size_t g_filter_push_down_passing_row_ubound{0};
106 bool g_enable_columnar_output{false};
107 bool g_enable_left_join_filter_hoisting{true};
108 bool g_optimize_row_initialization{true};
109 bool g_enable_bbox_intersect_hashjoin{true};
110 size_t g_num_tuple_threshold_switch_to_baseline{100000};
111 size_t g_ratio_num_hash_entry_to_num_tuple_switch_to_baseline{100};
112 bool g_enable_distance_rangejoin{true};
113 bool g_enable_hashjoin_many_to_many{true};
114 size_t g_bbox_intersect_max_table_size_bytes{1024 * 1024 * 1024};
115 double g_bbox_intersect_target_entries_per_bin{1.3};
116 bool g_strip_join_covered_quals{false};
117 size_t g_constrained_by_in_threshold{10};
118 size_t g_default_max_groups_buffer_entry_guess{16384};
119 size_t g_big_group_threshold{g_default_max_groups_buffer_entry_guess};
120 bool g_enable_window_functions{true};
121 bool g_enable_table_functions{true};
122 bool g_enable_ml_functions{true};
123 bool g_restrict_ml_model_metadata_to_superusers{false};
124 bool g_enable_dev_table_functions{false};
125 bool g_enable_geo_ops_on_uncompressed_coords{true};
126 bool g_enable_rf_prop_table_functions{true};
127 bool g_allow_memory_status_log{true};
128 size_t g_max_memory_allocation_size{2000000000}; // set to max slab size
129 size_t g_min_memory_allocation_size{
130  256}; // minimum memory allocation required for projection query output buffer
131  // without pre-flight count
132 bool g_enable_bump_allocator{false};
133 double g_bump_allocator_step_reduction{0.75};
134 bool g_enable_direct_columnarization{true};
135 extern bool g_enable_string_functions;
136 bool g_enable_lazy_fetch{true};
137 bool g_enable_runtime_query_interrupt{true};
138 bool g_enable_non_kernel_time_query_interrupt{true};
139 bool g_use_estimator_result_cache{true};
140 unsigned g_pending_query_interrupt_freq{1000};
141 double g_running_query_interrupt_freq{0.1};
142 size_t g_gpu_smem_threshold{
143  4096}; // GPU shared memory threshold (in bytes), if larger
144  // buffer sizes are required we do not use GPU shared
145  // memory optimizations. Setting this to 0 means unlimited
146  // (subject to other dynamically calculated caps)
147 bool g_enable_smem_grouped_non_count_agg{
148  true}; // enable use of shared memory when performing group-by with select non-count
149  // aggregates
150 bool g_enable_smem_non_grouped_agg{
151  true}; // enable optimizations for using GPU shared memory in implementation of
152  // non-grouped aggregates
153 bool g_is_test_env{false}; // operating under a unit test environment. Currently only
154  // limits the allocation for the output buffer arena
155  // and data recycler test
156 size_t g_enable_parallel_linearization{
157  10000}; // # rows that we are trying to linearize varlen col in parallel
158 bool g_enable_data_recycler{true};
159 bool g_use_hashtable_cache{true};
160 bool g_use_query_resultset_cache{true};
161 bool g_use_chunk_metadata_cache{true};
162 bool g_allow_auto_resultset_caching{false};
163 bool g_allow_query_step_skipping{true};
164 size_t g_hashtable_cache_total_bytes{size_t(1) << 32};
165 size_t g_max_cacheable_hashtable_size_bytes{size_t(1) << 31};
166 size_t g_query_resultset_cache_total_bytes{size_t(1) << 32};
167 size_t g_max_cacheable_query_resultset_size_bytes{size_t(1) << 31};
168 size_t g_auto_resultset_caching_threshold{size_t(1) << 20};
169 bool g_optimize_cuda_block_and_grid_sizes{false};
170 
171 size_t g_approx_quantile_buffer{1000};
172 size_t g_approx_quantile_centroids{300};
173 
174 bool g_enable_automatic_ir_metadata{true};
175 
176 size_t g_max_log_length{500};
177 
178 bool g_enable_executor_resource_mgr{true};
179 
180 double g_executor_resource_mgr_cpu_result_mem_ratio{0.8};
181 size_t g_executor_resource_mgr_cpu_result_mem_bytes{Executor::auto_cpu_mem_bytes};
182 double g_executor_resource_mgr_per_query_max_cpu_slots_ratio{0.9};
183 double g_executor_resource_mgr_per_query_max_cpu_result_mem_ratio{0.8};
184 
185 // Todo: rework ConcurrentResourceGrantPolicy and ExecutorResourcePool to allow
186 // thresholds for concurrent oversubscription, rather than just boolean allowed/disallowed
187 bool g_executor_resource_mgr_allow_cpu_kernel_concurrency{true};
188 bool g_executor_resource_mgr_allow_cpu_gpu_kernel_concurrency{true};
189 // Whether a single query can oversubscribe CPU slots should be controlled with
190 // g_executor_resource_mgr_per_query_max_cpu_slots_ratio
191 bool g_executor_resource_mgr_allow_cpu_slot_oversubscription_concurrency{false};
192 // Whether a single query can oversubscribe CPU memory should be controlled with
193 // g_executor_resource_mgr_per_query_max_cpu_result_mem_ratio
194 bool g_executor_resource_mgr_allow_cpu_result_mem_oversubscription_concurrency{false};
195 double g_executor_resource_mgr_max_available_resource_use_ratio{0.8};
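// A minimal sketch (not part of the original source) of how the per-query ratios
// above could translate into absolute caps; the helper names are illustrative and
// the real policy lives in the executor resource manager / resource pool.
namespace {
[[maybe_unused]] size_t sketch_max_per_query_cpu_slots(const size_t total_cpu_slots) {
  return static_cast<size_t>(total_cpu_slots *
                             g_executor_resource_mgr_per_query_max_cpu_slots_ratio);
}
[[maybe_unused]] size_t sketch_max_per_query_cpu_result_mem(
    const size_t total_cpu_result_mem_bytes) {
  return static_cast<size_t>(
      total_cpu_result_mem_bytes *
      g_executor_resource_mgr_per_query_max_cpu_result_mem_ratio);
}
}  // namespace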
196 
197 bool g_use_cpu_mem_pool_for_output_buffers{false};
198 
199 extern bool g_cache_string_hash;
200 extern bool g_allow_memory_status_log;
201 
202 int const Executor::max_gpu_count;
203 
204 std::map<Executor::ExtModuleKinds, std::string> Executor::extension_module_sources;
205 
206 extern std::unique_ptr<llvm::Module> read_llvm_module_from_bc_file(
207  const std::string& udf_ir_filename,
208  llvm::LLVMContext& ctx);
209 extern std::unique_ptr<llvm::Module> read_llvm_module_from_ir_file(
210  const std::string& udf_ir_filename,
211  llvm::LLVMContext& ctx,
212  bool is_gpu = false);
213 extern std::unique_ptr<llvm::Module> read_llvm_module_from_ir_string(
214  const std::string& udf_ir_string,
215  llvm::LLVMContext& ctx,
216  bool is_gpu = false);
217 
218 namespace {
219 // This function is notably different from that in RelAlgExecutor because it already
220 // expects SPI values and therefore needs to avoid that transformation.
221 void prepare_string_dictionaries(const std::unordered_set<PhysicalInput>& phys_inputs) {
222  for (const auto [col_id, table_id, db_id] : phys_inputs) {
223  foreign_storage::populate_string_dictionary(table_id, col_id, db_id);
224  }
225 }
226 
227 bool is_empty_table(Fragmenter_Namespace::AbstractFragmenter* fragmenter) {
228  const auto& fragments = fragmenter->getFragmentsForQuery().fragments;
229  // The fragmenter always returns at least one fragment, even when the table is empty.
230  return (fragments.size() == 1 && fragments[0].getChunkMetadataMap().empty());
231 }
232 } // namespace
233 
234 namespace foreign_storage {
235 // Foreign tables skip the population of dictionaries during metadata scan. This function
236 // will populate a dictionary's missing entries by fetching any unpopulated chunks.
237 void populate_string_dictionary(int32_t table_id, int32_t col_id, int32_t db_id) {
238  const auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
239  CHECK(catalog);
240  if (const auto foreign_table = dynamic_cast<const ForeignTable*>(
241  catalog->getMetadataForTable(table_id, false))) {
242  const auto col_desc = catalog->getMetadataForColumn(table_id, col_id);
243  if (col_desc->columnType.is_dict_encoded_type()) {
244  auto& fragmenter = foreign_table->fragmenter;
245  CHECK(fragmenter != nullptr);
246  if (is_empty_table(fragmenter.get())) {
247  return;
248  }
249  for (const auto& frag : fragmenter->getFragmentsForQuery().fragments) {
250  ChunkKey chunk_key = {db_id, table_id, col_id, frag.fragmentId};
251  // If the key is sharded across leaves, only populate fragments that are sharded
252  // to this leaf.
253  if (key_does_not_shard_to_leaf(chunk_key)) {
254  continue;
255  }
256 
257  const ChunkMetadataMap& metadata_map = frag.getChunkMetadataMap();
258  CHECK(metadata_map.find(col_id) != metadata_map.end());
259  if (auto& meta = metadata_map.at(col_id); meta->isPlaceholder()) {
260  // When this goes out of scope it will stay in CPU cache but become
261  // evictable
262  auto chunk = Chunk_NS::Chunk::getChunk(col_desc,
263  &(catalog->getDataMgr()),
264  chunk_key,
266  0,
267  0,
268  0);
269  }
270  }
271  }
272  }
273 }
274 } // namespace foreign_storage
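// For reference, the ChunkKey built above (and again in the per-kernel byte
// accounting further down) is simply the four-component key
// {db_id, table_id, column_id, fragment_id}. A minimal sketch of that convention;
// the helper name is illustrative, not part of this file.
namespace {
[[maybe_unused]] ChunkKey sketch_make_chunk_key(const int32_t db_id,
                                                const int32_t table_id,
                                                const int32_t col_id,
                                                const int32_t fragment_id) {
  return {db_id, table_id, col_id, fragment_id};
}
}  // namespace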
275 
276 Executor::Executor(const ExecutorId executor_id,
277  Data_Namespace::DataMgr* data_mgr,
278  const size_t block_size_x,
279  const size_t grid_size_x,
280  const size_t max_gpu_slab_size,
281  const std::string& debug_dir,
282  const std::string& debug_file)
283  : executor_id_(executor_id)
284  , context_(new llvm::LLVMContext())
285  , cgen_state_(new CgenState({}, false, this))
286  , block_size_x_(block_size_x)
287  , grid_size_x_(grid_size_x)
288  , max_gpu_slab_size_(max_gpu_slab_size)
289  , debug_dir_(debug_dir)
290  , debug_file_(debug_file)
291  , data_mgr_(data_mgr)
292  , temporary_tables_(nullptr)
295  update_extension_modules();
296 }
297 
302  auto root_path = heavyai::get_root_abs_path();
303  auto template_path = root_path + "/QueryEngine/RuntimeFunctions.bc";
304  CHECK(boost::filesystem::exists(template_path));
306  template_path;
307 #ifdef ENABLE_GEOS
308  auto rt_geos_path = root_path + "/QueryEngine/GeosRuntime.bc";
309  CHECK(boost::filesystem::exists(rt_geos_path));
311  rt_geos_path;
312 #endif
313 #ifdef HAVE_CUDA
314  auto rt_libdevice_path = get_cuda_libdevice_dir() + "/libdevice.10.bc";
315  if (boost::filesystem::exists(rt_libdevice_path)) {
317  rt_libdevice_path;
318  } else {
319  LOG(WARNING) << "File " << rt_libdevice_path
320  << " does not exist; support for some UDF "
321  "functions might not be available.";
322  }
323 #endif
324  }
325 }
326 
327 void Executor::reset(bool discard_runtime_modules_only) {
328  // TODO: keep cached results that do not depend on runtime UDF/UDTFs
329  auto qe = QueryEngine::getInstance();
330  qe->s_code_accessor->clear();
331  qe->s_stubs_accessor->clear();
332  qe->cpu_code_accessor->clear();
333  qe->gpu_code_accessor->clear();
334  qe->tf_code_accessor->clear();
335 
336  if (discard_runtime_modules_only) {
337  extension_modules_.erase(Executor::ExtModuleKinds::rt_udf_cpu_module);
338 #ifdef HAVE_CUDA
339  extension_modules_.erase(Executor::ExtModuleKinds::rt_udf_gpu_module);
340 #endif
341  cgen_state_->module_ = nullptr;
342  } else {
343  extension_modules_.clear();
344  cgen_state_.reset();
345  context_.reset(new llvm::LLVMContext());
346  cgen_state_.reset(new CgenState({}, false, this));
347  }
348 }
349 
350 void Executor::update_extension_modules(bool update_runtime_modules_only) {
351  auto read_module = [&](Executor::ExtModuleKinds module_kind,
352  const std::string& source) {
353  /*
354  source can be either a filename of a LLVM IR
355  or LLVM BC source, or a string containing
356  LLVM IR code.
357  */
358  CHECK(!source.empty());
359  switch (module_kind) {
363  return read_llvm_module_from_bc_file(source, getContext());
364  }
366  return read_llvm_module_from_ir_file(source, getContext(), false);
367  }
369  return read_llvm_module_from_ir_file(source, getContext(), true);
370  }
372  return read_llvm_module_from_ir_string(source, getContext(), false);
373  }
375  return read_llvm_module_from_ir_string(source, getContext(), true);
376  }
377  default: {
378  UNREACHABLE();
379  return std::unique_ptr<llvm::Module>();
380  }
381  }
382  };
383  auto update_module = [&](Executor::ExtModuleKinds module_kind,
384  bool erase_not_found = false) {
385  auto it = Executor::extension_module_sources.find(module_kind);
386  if (it != Executor::extension_module_sources.end()) {
387  auto llvm_module = read_module(module_kind, it->second);
388  if (llvm_module) {
389  extension_modules_[module_kind] = std::move(llvm_module);
390  } else if (erase_not_found) {
391  extension_modules_.erase(module_kind);
392  } else {
393  if (extension_modules_.find(module_kind) == extension_modules_.end()) {
394  LOG(WARNING) << "Failed to update " << ::toString(module_kind)
395  << " LLVM module. The module will be unavailable.";
396  } else {
397  LOG(WARNING) << "Failed to update " << ::toString(module_kind)
398  << " LLVM module. Using the existing module.";
399  }
400  }
401  } else {
402  if (erase_not_found) {
403  extension_modules_.erase(module_kind);
404  } else {
405  if (extension_modules_.find(module_kind) == extension_modules_.end()) {
406  LOG(WARNING) << "Source of " << ::toString(module_kind)
407  << " LLVM module is unavailable. The module will be unavailable.";
408  } else {
409  LOG(WARNING) << "Source of " << ::toString(module_kind)
410  << " LLVM module is unavailable. Using the existing module.";
411  }
412  }
413  }
414  };
415 
416  if (!update_runtime_modules_only) {
417  // required compile-time modules; their requirements are enforced
418  // by Executor::initialize_extension_module_sources():
420 #ifdef ENABLE_GEOS
422 #endif
423  // load-time modules, these are optional:
424  update_module(Executor::ExtModuleKinds::udf_cpu_module, true);
425 #ifdef HAVE_CUDA
426  update_module(Executor::ExtModuleKinds::udf_gpu_module, true);
428 #endif
429  }
430  // run-time modules, these are optional and erasable:
431  update_module(Executor::ExtModuleKinds::rt_udf_cpu_module, true);
432 #ifdef HAVE_CUDA
433  update_module(Executor::ExtModuleKinds::rt_udf_gpu_module, true);
434 #endif
435 }
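// A usage sketch (assumed, not taken from this file): registering run-time UDF IR
// and refreshing only the run-time modules, which is the path reset(true) /
// update_extension_modules(true) are designed for. The IR string and executor
// variable below are placeholders.
//
//   Executor::extension_module_sources[Executor::ExtModuleKinds::rt_udf_cpu_module] =
//       rt_udf_cpu_ir;  // LLVM IR text produced by the UDF compiler
//   executor->update_extension_modules(/*update_runtime_modules_only=*/true);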
436 
437 // Used by StubGenerator::generateStub
439  : executor_(executor)
440  , lock_queue_clock_(timer_start())
441  , lock_(executor_.compilation_mutex_)
442  , cgen_state_(std::move(executor_.cgen_state_)) // store old CgenState instance
443 {
444  executor_.compilation_queue_time_ms_ += timer_stop(lock_queue_clock_);
445  executor_.cgen_state_.reset(new CgenState(0, false, &executor));
446 }
447 
449  Executor& executor,
450  const bool allow_lazy_fetch,
451  const std::vector<InputTableInfo>& query_infos,
452  const PlanState::DeletedColumnsMap& deleted_cols_map,
453  const RelAlgExecutionUnit* ra_exe_unit)
454  : executor_(executor)
455  , lock_queue_clock_(timer_start())
457  , cgen_state_(std::move(executor_.cgen_state_)) // store old CgenState instance
458 {
459  executor_.compilation_queue_time_ms_ += timer_stop(lock_queue_clock_);
460  // nukeOldState creates new CgenState and PlanState instances for
461  // the subsequent code generation. It also resets
462  // kernel_queue_time_ms_ and compilation_queue_time_ms_ that we do
463  // not currently restore. Should we accumulate these timings?
464  executor_.nukeOldState(allow_lazy_fetch, query_infos, deleted_cols_map, ra_exe_unit);
465 }
466 
468  // prevent memory leak from hoisted literals
469  for (auto& p : executor_.cgen_state_->row_func_hoisted_literals_) {
470  auto inst = llvm::dyn_cast<llvm::LoadInst>(p.first);
471  if (inst && inst->getNumUses() == 0 && inst->getParent() == nullptr) {
472  // The llvm::Value instance stored in p.first is created by the
473  // CodeGenerator::codegenHoistedConstantsPlaceholders method.
474  p.first->deleteValue();
475  }
476  }
477  executor_.cgen_state_->row_func_hoisted_literals_.clear();
478 
479  // move generated StringDictionaryTranslationMgrs, TreeModelPredictionMgrs, and
480  // InValueBitmaps to the old CgenState instance, as the execution of the generated
481  // code still uses these objects
482 
483  for (auto& bm : executor_.cgen_state_->in_values_bitmaps_) {
484  cgen_state_->moveInValuesBitmap(bm);
485  }
486  executor_.cgen_state_->in_values_bitmaps_.clear();
487 
488  for (auto& str_dict_translation_mgr :
489  executor_.cgen_state_->str_dict_translation_mgrs_) {
490  cgen_state_->moveStringDictionaryTranslationMgr(std::move(str_dict_translation_mgr));
491  }
492  executor_.cgen_state_->str_dict_translation_mgrs_.clear();
493 
494  for (auto& tree_model_prediction_mgr :
495  executor_.cgen_state_->tree_model_prediction_mgrs_) {
496  cgen_state_->moveTreeModelPredictionMgr(std::move(tree_model_prediction_mgr));
497  }
498  executor_.cgen_state_->tree_model_prediction_mgrs_.clear();
499 
500  // Delete worker module that may have been set by
501  // set_module_shallow_copy. If QueryMustRunOnCpu is thrown, the
502  // worker module is not instantiated, so the worker module needs to
503  // be deleted conditionally [see "Managing LLVM modules" comment in
504  // CgenState.h]:
505  if (executor_.cgen_state_->module_) {
506  delete executor_.cgen_state_->module_;
507  }
508 
509  // restore the old CgenState instance
510  executor_.cgen_state_.reset(cgen_state_.release());
511 }
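// The manager above follows a save/swap/restore pattern: stash the executor's
// current CgenState on construction, hand the executor a fresh one for code
// generation, then migrate any artifacts the generated code still needs and
// restore the stashed state on destruction. A minimal, generic sketch of the same
// idea (the ScopedSwap name is illustrative):
namespace {
template <typename T>
class ScopedSwap {
 public:
  ScopedSwap(std::unique_ptr<T>& slot, std::unique_ptr<T> fresh)
      : slot_(slot), saved_(std::move(slot)) {
    slot_ = std::move(fresh);  // install the temporary state
  }
  ~ScopedSwap() {
    slot_ = std::move(saved_);  // restore the original state on scope exit
  }

 private:
  std::unique_ptr<T>& slot_;
  std::unique_ptr<T> saved_;
};
}  // namespace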
512 
513 std::shared_ptr<Executor> Executor::getExecutor(
514  const ExecutorId executor_id,
515  const std::string& debug_dir,
516  const std::string& debug_file,
517  const SystemParameters& system_parameters) {
519  auto it = executors_.find(executor_id);
520  if (it != executors_.end()) {
521  return it->second;
522  }
524  auto executor = std::make_shared<Executor>(executor_id,
525  &data_mgr,
526  system_parameters.cuda_block_size,
527  system_parameters.cuda_grid_size,
528  system_parameters.max_gpu_slab_size,
529  debug_dir,
530  debug_file);
531  CHECK(executors_.insert(std::make_pair(executor_id, executor)).second);
532  return executor;
533 }
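// Typical (assumed) call site: executors are memoized by id, so repeated lookups
// with the same executor_id return the same instance.
//
//   SystemParameters sys_params;
//   auto executor = Executor::getExecutor(executor_id, "", "", sys_params);
//   CHECK_EQ(executor.get(),
//            Executor::getExecutor(executor_id, "", "", sys_params).get());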
534 
536  switch (memory_level) {
540  execute_mutex_); // Don't flush memory while queries are running
541 
542  if (memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
543  // The hash table cache uses CPU memory not managed by the buffer manager. In the
544  // future, we should manage these allocations with the buffer manager directly.
545  // For now, assume the user wants to purge the hash table cache when they clear
546  // CPU memory (currently used in ExecuteTest to lower memory pressure)
547  // TODO: Move JoinHashTableCacheInvalidator to Executor::clearExternalCaches();
549  }
550  Executor::clearExternalCaches(true, nullptr, 0);
552  break;
553  }
554  default: {
555  throw std::runtime_error(
556  "Clearing memory levels other than the CPU level or GPU level is not "
557  "supported.");
558  }
559  }
560 }
561 
563  return g_is_test_env ? 100000000 : (1UL << 32) + kArenaBlockOverhead;
564 }
565 
567  const shared::StringDictKey& dict_id_in,
568  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
569  const bool with_generation) const {
570  CHECK(row_set_mem_owner);
571  std::lock_guard<std::mutex> lock(
572  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
573  return row_set_mem_owner->getOrAddStringDictProxy(dict_id_in, with_generation);
574 }
575 
577  const shared::StringDictKey& dict_key_in,
578  const bool with_generation) {
579  const int dict_id{dict_key_in.dict_id < 0 ? REGULAR_DICT(dict_key_in.dict_id)
580  : dict_key_in.dict_id};
581  const auto catalog =
583  if (catalog) {
584  const auto dd = catalog->getMetadataForDict(dict_id);
585  if (dd) {
586  auto dict_key = dict_key_in;
587  dict_key.dict_id = dict_id;
588  CHECK(dd->stringDict);
589  CHECK_LE(dd->dictNBits, 32);
590  const int64_t generation =
591  with_generation ? string_dictionary_generations_.getGeneration(dict_key) : -1;
592  return addStringDict(dd->stringDict, dict_key, generation);
593  }
594  }
596  if (!lit_str_dict_proxy_) {
597  DictRef literal_dict_ref(dict_key_in.db_id, DictRef::literalsDictId);
598  std::shared_ptr<StringDictionary> tsd = std::make_shared<StringDictionary>(
599  literal_dict_ref, "", false, true, g_cache_string_hash);
600  lit_str_dict_proxy_ = std::make_shared<StringDictionaryProxy>(
601  tsd, shared::StringDictKey{literal_dict_ref.dbId, literal_dict_ref.dictId}, 0);
602  }
603  return lit_str_dict_proxy_.get();
604 }
605 
607  const shared::StringDictKey& source_dict_key,
608  const shared::StringDictKey& dest_dict_key,
609  const RowSetMemoryOwner::StringTranslationType translation_type,
610  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
611  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
612  const bool with_generation) const {
613  CHECK(row_set_mem_owner);
614  std::lock_guard<std::mutex> lock(
615  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
616  return row_set_mem_owner->getOrAddStringProxyTranslationMap(
617  source_dict_key, dest_dict_key, with_generation, translation_type, string_op_infos);
618 }
619 
622  const StringDictionaryProxy* source_proxy,
623  StringDictionaryProxy* dest_proxy,
624  const std::vector<StringOps_Namespace::StringOpInfo>& source_string_op_infos,
625  const std::vector<StringOps_Namespace::StringOpInfo>& dest_string_op_infos,
626  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) const {
627  CHECK(row_set_mem_owner);
628  std::lock_guard<std::mutex> lock(
629  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
630  // First translate lhs onto itself if there are string ops
631  if (!dest_string_op_infos.empty()) {
632  row_set_mem_owner->addStringProxyUnionTranslationMap(
633  dest_proxy, dest_proxy, dest_string_op_infos);
634  }
635  return row_set_mem_owner->addStringProxyIntersectionTranslationMap(
636  source_proxy, dest_proxy, source_string_op_infos);
637 }
638 
641  const shared::StringDictKey& source_dict_key,
642  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
643  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
644  const bool with_generation) const {
645  CHECK(row_set_mem_owner);
646  std::lock_guard<std::mutex> lock(
647  str_dict_mutex_); // TODO: can we use RowSetMemOwner state mutex here?
648  return row_set_mem_owner->getOrAddStringProxyNumericTranslationMap(
649  source_dict_key, with_generation, string_op_infos);
650 }
651 
653  const shared::StringDictKey& source_dict_key_in,
654  const shared::StringDictKey& dest_dict_key_in,
655  const bool with_generation,
656  const RowSetMemoryOwner::StringTranslationType translation_type,
657  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
658  const auto source_proxy = getOrAddStringDictProxy(source_dict_key_in, with_generation);
659  const auto dest_proxy = getOrAddStringDictProxy(dest_dict_key_in, with_generation);
661  return addStringProxyIntersectionTranslationMap(
662  source_proxy, dest_proxy, string_op_infos);
663  } else {
664  return addStringProxyUnionTranslationMap(source_proxy, dest_proxy, string_op_infos);
665  }
666 }
667 
670  const shared::StringDictKey& source_dict_key_in,
671  const bool with_generation,
672  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
673  const auto source_proxy = getOrAddStringDictProxy(source_dict_key_in, with_generation);
674  return addStringProxyNumericTranslationMap(source_proxy, string_op_infos);
675 }
676 
678  ApproxQuantileDescriptor const desc,
679  double const q) {
680  static_assert(std::is_trivially_copyable_v<ApproxQuantileDescriptor>);
681  std::lock_guard<std::mutex> lock(state_mutex_);
682  auto t_digest = std::make_unique<quantile::TDigest>(
683  q, &t_digest_allocators_[thread_idx], desc.buffer_size, desc.centroids_size);
684  return t_digests_.emplace_back(std::move(t_digest)).get();
685 }
686 
687 void RowSetMemoryOwner::reserveTDigestMemory(size_t thread_idx, size_t capacity) {
688  std::unique_lock<std::mutex> lock(state_mutex_);
689  if (t_digest_allocators_.size() <= thread_idx) {
690  t_digest_allocators_.resize(thread_idx + 1u);
691  }
692  if (t_digest_allocators_[thread_idx].capacity()) {
693  // This can only happen when a thread_idx is re-used. In other words,
694  // two or more kernels have launched (serially!) using the same thread_idx.
695  // This is ok since TDigestAllocator does not own the memory it allocates.
696  VLOG(2) << "Replacing t_digest_allocators_[" << thread_idx << "].";
697  }
698  lock.unlock();
699  // The lock is released here because allocate() acquires the same state_mutex_.
700  // The corresponding deallocation happens in ~DramArena().
701  int8_t* const buffer = allocate(capacity, thread_idx);
702  lock.lock();
703  t_digest_allocators_[thread_idx] = TDigestAllocator(buffer, capacity);
704 }
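// The unlock()/lock() above avoids self-deadlock: allocate() takes the same
// state_mutex_, so the lock must not be held across that call. A minimal, generic
// sketch of the pattern (names are illustrative):
namespace {
template <typename Alloc>
void sketch_unlock_around_reentrant_call(std::mutex& m, Alloc alloc, int8_t*& slot) {
  std::unique_lock<std::mutex> lock(m);
  // ... inspect or resize state guarded by m ...
  lock.unlock();  // drop the lock: alloc() may acquire the same mutex
  int8_t* const buffer = alloc();
  lock.lock();  // re-take it before publishing into the guarded state
  slot = buffer;
}
}  // namespace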
705 
706 bool Executor::isCPUOnly() const {
707  CHECK(data_mgr_);
708  return !data_mgr_->getCudaMgr();
709 }
710 
712  const Analyzer::ColumnVar* col_var) const {
713  return get_column_descriptor_maybe(col_var->getColumnKey());
714 }
715 
717  const Analyzer::ColumnVar* col_var,
718  int n) const {
719  const auto cd = getColumnDescriptor(col_var);
720  if (!cd || n > cd->columnType.get_physical_cols()) {
721  return nullptr;
722  }
723  auto column_key = col_var->getColumnKey();
724  column_key.column_id += n;
725  return get_column_descriptor_maybe(column_key);
726 }
727 
728 const std::shared_ptr<RowSetMemoryOwner> Executor::getRowSetMemoryOwner() const {
729  return row_set_mem_owner_;
730 }
731 
733  return temporary_tables_;
734 }
735 
737  const shared::TableKey& table_key) const {
738  return input_table_info_cache_.getTableInfo(table_key);
739 }
740 
742  const shared::TableKey& table_key) const {
743  return table_generations_.getGeneration(table_key);
744 }
745 
747  return agg_col_range_cache_.getColRange(phys_input);
748 }
749 
750 namespace {
751 
752 void log_system_memory_info_impl(std::string const& mem_log,
753  size_t executor_id,
754  size_t log_time_ms,
755  std::string const& log_tag,
756  size_t const thread_idx) {
757  std::ostringstream oss;
758  oss << mem_log;
759  oss << " (" << log_tag << ", EXECUTOR-" << executor_id << ", THREAD-" << thread_idx
760  << ", TOOK: " << log_time_ms << " ms)";
761  VLOG(1) << oss.str();
762 }
763 } // namespace
764 
765 void Executor::logSystemCPUMemoryStatus(std::string const& log_tag,
766  size_t const thread_idx) const {
768  auto timer = timer_start();
769  std::ostringstream oss;
770  oss << getDataMgr()->getSystemMemoryUsage();
772  oss.str(), executor_id_, timer_stop(timer), log_tag, thread_idx);
773  }
774 }
775 
776 void Executor::logSystemGPUMemoryStatus(std::string const& log_tag,
777  size_t const thread_idx) const {
778 #ifdef HAVE_CUDA
779  if (g_allow_memory_status_log && getDataMgr() && getDataMgr()->gpusPresent() &&
780  getDataMgr()->getCudaMgr()) {
781  auto timer = timer_start();
782  auto mem_log = getDataMgr()->getCudaMgr()->getCudaMemoryUsageInString();
784  mem_log, executor_id_, timer_stop(timer), log_tag, thread_idx);
785  }
786 #endif
787 }
788 
789 namespace {
790 
791 size_t get_col_byte_width(const shared::ColumnKey& column_key) {
792  if (column_key.table_id < 0) {
793  // We have an intermediate results table
794 
795  // Todo(todd): Get more accurate representation of column width
796  // for intermediate tables
797  return size_t(8);
798  } else {
799  const auto cd = Catalog_Namespace::get_metadata_for_column(column_key);
800  const auto& ti = cd->columnType;
801  const auto sz = ti.get_size();
802  if (sz < 0) {
803  // for varlen types, only account for the pointer/size for each row, for now
804  if (ti.is_logical_geo_type()) {
805  // Don't count size for logical geo types, as they are
806  // backed by physical columns
807  return size_t(0);
808  } else {
809  return size_t(16);
810  }
811  } else {
812  return sz;
813  }
814  }
815 }
816 
817 } // anonymous namespace
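// Worked example (assumed column mix, illustration only) of the accounting above:
// a BIGINT column counts 8 bytes, a 32-bit dict-encoded TEXT column 4 bytes, a
// none-encoded varlen column 16 bytes (pointer plus length), and a logical geo
// column 0 bytes (its physical coord columns are counted separately).
namespace {
constexpr size_t kSketchPerRowByteWidth =
    size_t(8) + size_t(4) + size_t(16) + size_t(0);
static_assert(kSketchPerRowByteWidth == size_t(28),
              "illustrative per-row byte width");
}  // namespace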
818 
819 std::map<shared::ColumnKey, size_t> Executor::getColumnByteWidthMap(
820  const std::set<shared::TableKey>& table_ids_to_fetch,
821  const bool include_lazy_fetched_cols) const {
822  std::map<shared::ColumnKey, size_t> col_byte_width_map;
823 
824  for (const auto& fetched_col : plan_state_->getColumnsToFetch()) {
825  if (table_ids_to_fetch.count({fetched_col.db_id, fetched_col.table_id}) == 0) {
826  continue;
827  }
828  const size_t col_byte_width = get_col_byte_width(fetched_col);
829  CHECK(col_byte_width_map.insert({fetched_col, col_byte_width}).second);
830  }
831  if (include_lazy_fetched_cols) {
832  for (const auto& lazy_fetched_col : plan_state_->getColumnsToNotFetch()) {
833  if (table_ids_to_fetch.count({lazy_fetched_col.db_id, lazy_fetched_col.table_id}) ==
834  0) {
835  continue;
836  }
837  const size_t col_byte_width = get_col_byte_width(lazy_fetched_col);
838  CHECK(col_byte_width_map.insert({lazy_fetched_col, col_byte_width}).second);
839  }
840  }
841  return col_byte_width_map;
842 }
843 
845  const std::set<shared::TableKey>& table_ids_to_fetch) const {
846  size_t num_bytes = 0;
847  if (!plan_state_) {
848  return 0;
849  }
850  for (const auto& fetched_col : plan_state_->getColumnsToFetch()) {
851  if (table_ids_to_fetch.count({fetched_col.db_id, fetched_col.table_id}) == 0) {
852  continue;
853  }
854 
855  if (fetched_col.table_id < 0) {
856  num_bytes += 8;
857  } else {
859  {fetched_col.db_id, fetched_col.table_id, fetched_col.column_id});
860  const auto& ti = cd->columnType;
861  const auto sz = ti.get_size();
862  if (sz < 0) {
863  // for varlen types, only account for the pointer/size for each row, for now
864  if (!ti.is_logical_geo_type()) {
865  // Don't count size for logical geo types, as they are
866  // backed by physical columns
867  num_bytes += 16;
868  }
869  } else {
870  num_bytes += sz;
871  }
872  }
873  }
874  return num_bytes;
875 }
876 
878  const ExecutorDeviceType device_type,
879  const std::vector<InputDescriptor>& input_descs,
880  const std::vector<InputTableInfo>& query_infos,
881  const std::vector<std::pair<int32_t, FragmentsList>>& kernel_fragment_lists) const {
882  using TableFragmentId = std::pair<shared::TableKey, int32_t>;
883  using TableFragmentSizeMap = std::map<TableFragmentId, size_t>;
884 
885  /* Calculate bytes per column */
886 
887  // Only fetch lhs table ids for now...
888  // Allows us to cleanly lower number of kernels in flight to save
889  // buffer pool space, but is not a perfect estimate when big rhs
890  // join tables are involved. Will revisit.
891 
892  std::set<shared::TableKey> lhs_table_keys;
893  for (const auto& input_desc : input_descs) {
894  if (input_desc.getNestLevel() == 0) {
895  lhs_table_keys.insert(input_desc.getTableKey());
896  }
897  }
898 
899  const bool include_lazy_fetch_cols = device_type == ExecutorDeviceType::CPU;
900  const auto column_byte_width_map =
901  getColumnByteWidthMap(lhs_table_keys, include_lazy_fetch_cols);
902 
903  /* Calculate the byte width per row (sum of all column widths)
904  Assumes each fragment touches the same columns, which is a DB-wide
905  invariant for now */
906 
907  size_t const byte_width_per_row =
908  std::accumulate(column_byte_width_map.begin(),
909  column_byte_width_map.end(),
910  size_t(0),
911  [](size_t sum, auto& col_entry) { return sum + col_entry.second; });
912 
913  /* Calculate num tuples for all fragments */
914 
915  TableFragmentSizeMap all_table_fragments_size_map;
916 
917  for (auto& query_info : query_infos) {
918  const auto& table_key = query_info.table_key;
919  for (const auto& frag : query_info.info.fragments) {
920  const int32_t frag_id = frag.fragmentId;
921  const TableFragmentId table_frag_id = std::make_pair(table_key, frag_id);
922  const size_t fragment_num_tuples = frag.getNumTuples(); // num_tuples;
923  all_table_fragments_size_map.insert(
924  std::make_pair(table_frag_id, fragment_num_tuples));
925  }
926  }
927 
928  /* Calculate num tuples only for fragments actually touched by query
929  Also calculate the num bytes needed for each kernel */
930 
931  TableFragmentSizeMap query_table_fragments_size_map;
932  std::vector<size_t> bytes_per_kernel;
933  bytes_per_kernel.reserve(kernel_fragment_lists.size());
934 
935  size_t max_kernel_bytes{0};
936 
937  for (auto& kernel_frag_list : kernel_fragment_lists) {
938  size_t kernel_bytes{0};
939  const auto frag_list = kernel_frag_list.second;
940  for (const auto& table_frags : frag_list) {
941  const auto& table_key = table_frags.table_key;
942  for (const size_t frag_id : table_frags.fragment_ids) {
943  const TableFragmentId table_frag_id = std::make_pair(table_key, frag_id);
944  const size_t fragment_num_tuples = all_table_fragments_size_map[table_frag_id];
945  kernel_bytes += fragment_num_tuples * byte_width_per_row;
946  query_table_fragments_size_map.insert(
947  std::make_pair(table_frag_id, fragment_num_tuples));
948  }
949  }
950  bytes_per_kernel.emplace_back(kernel_bytes);
951  if (kernel_bytes > max_kernel_bytes) {
952  max_kernel_bytes = kernel_bytes;
953  }
954  }
955 
956  /* Calculate bytes per chunk touched by the query */
957 
958  std::map<ChunkKey, size_t> all_chunks_byte_sizes_map;
959  constexpr int32_t subkey_min = std::numeric_limits<int32_t>::min();
960 
961  for (const auto& col_byte_width_entry : column_byte_width_map) {
962  // Build a chunk key prefix of (db_id, table_id, column_id)
963  const int32_t db_id = col_byte_width_entry.first.db_id;
964  const int32_t table_id = col_byte_width_entry.first.table_id;
965  const int32_t col_id = col_byte_width_entry.first.column_id;
966  const size_t col_byte_width = col_byte_width_entry.second;
967  const shared::TableKey table_key(db_id, table_id);
968 
969  const auto frag_start =
970  query_table_fragments_size_map.lower_bound({table_key, subkey_min});
971  for (auto frag_itr = frag_start; frag_itr != query_table_fragments_size_map.end() &&
972  frag_itr->first.first == table_key;
973  frag_itr++) {
974  const ChunkKey chunk_key = {db_id, table_id, col_id, frag_itr->first.second};
975  const size_t chunk_byte_size = col_byte_width * frag_itr->second;
976  all_chunks_byte_sizes_map.insert({chunk_key, chunk_byte_size});
977  }
978  }
979 
980  size_t total_chunk_bytes{0};
981  const size_t num_chunks = all_chunks_byte_sizes_map.size();
982  std::vector<std::pair<ChunkKey, size_t>> chunks_with_byte_sizes;
983  chunks_with_byte_sizes.reserve(num_chunks);
984  for (const auto& chunk_byte_size_entry : all_chunks_byte_sizes_map) {
985  chunks_with_byte_sizes.emplace_back(
986  std::make_pair(chunk_byte_size_entry.first, chunk_byte_size_entry.second));
987  // Add here, post mapping of the chunks, to make sure chunks are deduped and we get an
988  // accurate size estimate
989  total_chunk_bytes += chunk_byte_size_entry.second;
990  }
991  // Don't allow scaling of bytes per kernel launch for GPU yet, as we're not set up for
992  // this at this point
993  const bool bytes_scales_per_kernel = device_type == ExecutorDeviceType::CPU;
994 
995  // Return ChunkRequestInfo
996 
997  return {device_type,
998  chunks_with_byte_sizes,
999  num_chunks,
1000  total_chunk_bytes,
1001  bytes_per_kernel,
1002  max_kernel_bytes,
1003  bytes_scales_per_kernel};
1004 }
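// Worked example (illustrative numbers): with a 28-byte row as sketched above, a
// kernel whose fragment list covers fragments of 1'000'000 and 250'000 tuples is
// charged 28 * 1'250'000 bytes, and max_kernel_bytes tracks the largest such total
// across all kernels.
namespace {
constexpr size_t kSketchKernelBytes =
    size_t(28) * (size_t(1'000'000) + size_t(250'000));
static_assert(kSketchKernelBytes == size_t(35'000'000),
              "illustrative per-kernel byte total");
}  // namespace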
1005 
1007  const std::vector<Analyzer::Expr*>& target_exprs) const {
1008  CHECK(plan_state_);
1009  for (const auto target_expr : target_exprs) {
1010  if (plan_state_->isLazyFetchColumn(target_expr)) {
1011  return true;
1012  }
1013  }
1014  return false;
1015 }
1016 
1017 std::vector<ColumnLazyFetchInfo> Executor::getColLazyFetchInfo(
1018  const std::vector<Analyzer::Expr*>& target_exprs) const {
1019  CHECK(plan_state_);
1020  std::vector<ColumnLazyFetchInfo> col_lazy_fetch_info;
1021  for (const auto target_expr : target_exprs) {
1022  if (!plan_state_->isLazyFetchColumn(target_expr)) {
1023  col_lazy_fetch_info.emplace_back(
1024  ColumnLazyFetchInfo{false, -1, SQLTypeInfo(kNULLT, false)});
1025  } else {
1026  const auto col_var = dynamic_cast<const Analyzer::ColumnVar*>(target_expr);
1027  CHECK(col_var);
1028  auto rte_idx = (col_var->get_rte_idx() == -1) ? 0 : col_var->get_rte_idx();
1029  const auto cd = get_column_descriptor_maybe(col_var->getColumnKey());
1030  if (cd && IS_GEO(cd->columnType.get_type())) {
1031  // Geo coords cols will be processed in sequence. So we only need to track the
1032  // first coords col in lazy fetch info.
1033  {
1034  auto col_key = col_var->getColumnKey();
1035  col_key.column_id += 1;
1036  const auto cd0 = get_column_descriptor(col_key);
1037  const auto col0_ti = cd0->columnType;
1038  CHECK(!cd0->isVirtualCol);
1039  const auto col0_var = makeExpr<Analyzer::ColumnVar>(col0_ti, col_key, rte_idx);
1040  const auto local_col0_id = plan_state_->getLocalColumnId(col0_var.get(), false);
1041  col_lazy_fetch_info.emplace_back(
1042  ColumnLazyFetchInfo{true, local_col0_id, col0_ti});
1043  }
1044  } else {
1045  auto local_col_id = plan_state_->getLocalColumnId(col_var, false);
1046  const auto& col_ti = col_var->get_type_info();
1047  col_lazy_fetch_info.emplace_back(ColumnLazyFetchInfo{true, local_col_id, col_ti});
1048  }
1049  }
1050  }
1051  return col_lazy_fetch_info;
1052 }
1053 
1058 }
1059 
1060 std::vector<int8_t> Executor::serializeLiterals(
1061  const std::unordered_map<int, CgenState::LiteralValues>& literals,
1062  const int device_id) {
1063  if (literals.empty()) {
1064  return {};
1065  }
1066  const auto dev_literals_it = literals.find(device_id);
1067  CHECK(dev_literals_it != literals.end());
1068  const auto& dev_literals = dev_literals_it->second;
1069  size_t lit_buf_size{0};
1070  std::vector<std::string> real_strings;
1071  std::vector<std::vector<double>> double_array_literals;
1072  std::vector<std::vector<int8_t>> align64_int8_array_literals;
1073  std::vector<std::vector<int32_t>> int32_array_literals;
1074  std::vector<std::vector<int8_t>> align32_int8_array_literals;
1075  std::vector<std::vector<int8_t>> int8_array_literals;
1076  for (const auto& lit : dev_literals) {
1077  lit_buf_size = CgenState::addAligned(lit_buf_size, CgenState::literalBytes(lit));
1078  if (lit.which() == 7) {
1079  const auto p = boost::get<std::string>(&lit);
1080  CHECK(p);
1081  real_strings.push_back(*p);
1082  } else if (lit.which() == 8) {
1083  const auto p = boost::get<std::vector<double>>(&lit);
1084  CHECK(p);
1085  double_array_literals.push_back(*p);
1086  } else if (lit.which() == 9) {
1087  const auto p = boost::get<std::vector<int32_t>>(&lit);
1088  CHECK(p);
1089  int32_array_literals.push_back(*p);
1090  } else if (lit.which() == 10) {
1091  const auto p = boost::get<std::vector<int8_t>>(&lit);
1092  CHECK(p);
1093  int8_array_literals.push_back(*p);
1094  } else if (lit.which() == 11) {
1095  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
1096  CHECK(p);
1097  if (p->second == 64) {
1098  align64_int8_array_literals.push_back(p->first);
1099  } else if (p->second == 32) {
1100  align32_int8_array_literals.push_back(p->first);
1101  } else {
1102  CHECK(false);
1103  }
1104  }
1105  }
1106  if (lit_buf_size > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
1107  throw TooManyLiterals();
1108  }
1109  int16_t crt_real_str_off = lit_buf_size;
1110  for (const auto& real_str : real_strings) {
1111  CHECK_LE(real_str.size(), static_cast<size_t>(std::numeric_limits<int16_t>::max()));
1112  lit_buf_size += real_str.size();
1113  }
1114  if (double_array_literals.size() > 0) {
1115  lit_buf_size = align(lit_buf_size, sizeof(double));
1116  }
1117  int16_t crt_double_arr_lit_off = lit_buf_size;
1118  for (const auto& double_array_literal : double_array_literals) {
1119  CHECK_LE(double_array_literal.size(),
1120  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
1121  lit_buf_size += double_array_literal.size() * sizeof(double);
1122  }
1123  if (align64_int8_array_literals.size() > 0) {
1124  lit_buf_size = align(lit_buf_size, sizeof(uint64_t));
1125  }
1126  int16_t crt_align64_int8_arr_lit_off = lit_buf_size;
1127  for (const auto& align64_int8_array_literal : align64_int8_array_literals) {
1128  CHECK_LE(align64_int8_array_literals.size(),
1129  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
1130  lit_buf_size += align64_int8_array_literal.size();
1131  }
1132  if (int32_array_literals.size() > 0) {
1133  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
1134  }
1135  int16_t crt_int32_arr_lit_off = lit_buf_size;
1136  for (const auto& int32_array_literal : int32_array_literals) {
1137  CHECK_LE(int32_array_literal.size(),
1138  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
1139  lit_buf_size += int32_array_literal.size() * sizeof(int32_t);
1140  }
1141  if (align32_int8_array_literals.size() > 0) {
1142  lit_buf_size = align(lit_buf_size, sizeof(int32_t));
1143  }
1144  int16_t crt_align32_int8_arr_lit_off = lit_buf_size;
1145  for (const auto& align32_int8_array_literal : align32_int8_array_literals) {
1146  CHECK_LE(align32_int8_array_literals.size(),
1147  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
1148  lit_buf_size += align32_int8_array_literal.size();
1149  }
1150  int16_t crt_int8_arr_lit_off = lit_buf_size;
1151  for (const auto& int8_array_literal : int8_array_literals) {
1152  CHECK_LE(int8_array_literal.size(),
1153  static_cast<size_t>(std::numeric_limits<int16_t>::max()));
1154  lit_buf_size += int8_array_literal.size();
1155  }
1156  unsigned crt_real_str_idx = 0;
1157  unsigned crt_double_arr_lit_idx = 0;
1158  unsigned crt_align64_int8_arr_lit_idx = 0;
1159  unsigned crt_int32_arr_lit_idx = 0;
1160  unsigned crt_align32_int8_arr_lit_idx = 0;
1161  unsigned crt_int8_arr_lit_idx = 0;
1162  std::vector<int8_t> serialized(lit_buf_size);
1163  size_t off{0};
1164  for (const auto& lit : dev_literals) {
1165  const auto lit_bytes = CgenState::literalBytes(lit);
1166  off = CgenState::addAligned(off, lit_bytes);
1167  switch (lit.which()) {
1168  case 0: {
1169  const auto p = boost::get<int8_t>(&lit);
1170  CHECK(p);
1171  serialized[off - lit_bytes] = *p;
1172  break;
1173  }
1174  case 1: {
1175  const auto p = boost::get<int16_t>(&lit);
1176  CHECK(p);
1177  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
1178  break;
1179  }
1180  case 2: {
1181  const auto p = boost::get<int32_t>(&lit);
1182  CHECK(p);
1183  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
1184  break;
1185  }
1186  case 3: {
1187  const auto p = boost::get<int64_t>(&lit);
1188  CHECK(p);
1189  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
1190  break;
1191  }
1192  case 4: {
1193  const auto p = boost::get<float>(&lit);
1194  CHECK(p);
1195  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
1196  break;
1197  }
1198  case 5: {
1199  const auto p = boost::get<double>(&lit);
1200  CHECK(p);
1201  memcpy(&serialized[off - lit_bytes], p, lit_bytes);
1202  break;
1203  }
1204  case 6: {
1205  const auto p = boost::get<std::pair<std::string, shared::StringDictKey>>(&lit);
1206  CHECK(p);
1207  const auto str_id =
1209  ? getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
1210  ->getOrAddTransient(p->first)
1211  : getStringDictionaryProxy(p->second, row_set_mem_owner_, true)
1212  ->getIdOfString(p->first);
1213  memcpy(&serialized[off - lit_bytes], &str_id, lit_bytes);
1214  break;
1215  }
1216  case 7: {
1217  const auto p = boost::get<std::string>(&lit);
1218  CHECK(p);
1219  int32_t off_and_len = crt_real_str_off << 16;
1220  const auto& crt_real_str = real_strings[crt_real_str_idx];
1221  off_and_len |= static_cast<int16_t>(crt_real_str.size());
1222  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1223  memcpy(&serialized[crt_real_str_off], crt_real_str.data(), crt_real_str.size());
1224  ++crt_real_str_idx;
1225  crt_real_str_off += crt_real_str.size();
1226  break;
1227  }
1228  case 8: {
1229  const auto p = boost::get<std::vector<double>>(&lit);
1230  CHECK(p);
1231  int32_t off_and_len = crt_double_arr_lit_off << 16;
1232  const auto& crt_double_arr_lit = double_array_literals[crt_double_arr_lit_idx];
1233  int32_t len = crt_double_arr_lit.size();
1234  CHECK_EQ((len >> 16), 0);
1235  off_and_len |= static_cast<int16_t>(len);
1236  int32_t double_array_bytesize = len * sizeof(double);
1237  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1238  memcpy(&serialized[crt_double_arr_lit_off],
1239  crt_double_arr_lit.data(),
1240  double_array_bytesize);
1241  ++crt_double_arr_lit_idx;
1242  crt_double_arr_lit_off += double_array_bytesize;
1243  break;
1244  }
1245  case 9: {
1246  const auto p = boost::get<std::vector<int32_t>>(&lit);
1247  CHECK(p);
1248  int32_t off_and_len = crt_int32_arr_lit_off << 16;
1249  const auto& crt_int32_arr_lit = int32_array_literals[crt_int32_arr_lit_idx];
1250  int32_t len = crt_int32_arr_lit.size();
1251  CHECK_EQ((len >> 16), 0);
1252  off_and_len |= static_cast<int16_t>(len);
1253  int32_t int32_array_bytesize = len * sizeof(int32_t);
1254  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1255  memcpy(&serialized[crt_int32_arr_lit_off],
1256  crt_int32_arr_lit.data(),
1257  int32_array_bytesize);
1258  ++crt_int32_arr_lit_idx;
1259  crt_int32_arr_lit_off += int32_array_bytesize;
1260  break;
1261  }
1262  case 10: {
1263  const auto p = boost::get<std::vector<int8_t>>(&lit);
1264  CHECK(p);
1265  int32_t off_and_len = crt_int8_arr_lit_off << 16;
1266  const auto& crt_int8_arr_lit = int8_array_literals[crt_int8_arr_lit_idx];
1267  int32_t len = crt_int8_arr_lit.size();
1268  CHECK_EQ((len >> 16), 0);
1269  off_and_len |= static_cast<int16_t>(len);
1270  int32_t int8_array_bytesize = len;
1271  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1272  memcpy(&serialized[crt_int8_arr_lit_off],
1273  crt_int8_arr_lit.data(),
1274  int8_array_bytesize);
1275  ++crt_int8_arr_lit_idx;
1276  crt_int8_arr_lit_off += int8_array_bytesize;
1277  break;
1278  }
1279  case 11: {
1280  const auto p = boost::get<std::pair<std::vector<int8_t>, int>>(&lit);
1281  CHECK(p);
1282  if (p->second == 64) {
1283  int32_t off_and_len = crt_align64_int8_arr_lit_off << 16;
1284  const auto& crt_align64_int8_arr_lit =
1285  align64_int8_array_literals[crt_align64_int8_arr_lit_idx];
1286  int32_t len = crt_align64_int8_arr_lit.size();
1287  CHECK_EQ((len >> 16), 0);
1288  off_and_len |= static_cast<int16_t>(len);
1289  int32_t align64_int8_array_bytesize = len;
1290  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1291  memcpy(&serialized[crt_align64_int8_arr_lit_off],
1292  crt_align64_int8_arr_lit.data(),
1293  align64_int8_array_bytesize);
1294  ++crt_align64_int8_arr_lit_idx;
1295  crt_align64_int8_arr_lit_off += align64_int8_array_bytesize;
1296  } else if (p->second == 32) {
1297  int32_t off_and_len = crt_align32_int8_arr_lit_off << 16;
1298  const auto& crt_align32_int8_arr_lit =
1299  align32_int8_array_literals[crt_align32_int8_arr_lit_idx];
1300  int32_t len = crt_align32_int8_arr_lit.size();
1301  CHECK_EQ((len >> 16), 0);
1302  off_and_len |= static_cast<int16_t>(len);
1303  int32_t align32_int8_array_bytesize = len;
1304  memcpy(&serialized[off - lit_bytes], &off_and_len, lit_bytes);
1305  memcpy(&serialized[crt_align32_int8_arr_lit_off],
1306  crt_align32_int8_arr_lit.data(),
1307  align32_int8_array_bytesize);
1308  ++crt_align32_int8_arr_lit_idx;
1309  crt_align32_int8_arr_lit_off += align32_int8_array_bytesize;
1310  } else {
1311  CHECK(false);
1312  }
1313  break;
1314  }
1315  default:
1316  CHECK(false);
1317  }
1318  }
1319  return serialized;
1320 }
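// The 32-bit "off_and_len" slots written above pack a 16-bit byte offset into the
// literal buffer in the high half and a 16-bit length in the low half, which is why
// the running offsets are int16_t and each payload size is CHECK'd against the
// int16_t maximum. A minimal sketch of the packing and its inverse (helper names
// are illustrative):
namespace {
[[maybe_unused]] int32_t sketch_pack_off_and_len(const int16_t off, const int16_t len) {
  return (static_cast<int32_t>(off) << 16) | static_cast<uint16_t>(len);
}
[[maybe_unused]] std::pair<int16_t, int16_t> sketch_unpack_off_and_len(
    const int32_t off_and_len) {
  return {static_cast<int16_t>(off_and_len >> 16),
          static_cast<int16_t>(off_and_len & 0xffff)};
}
}  // namespace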
1321 
1322 int Executor::deviceCount(const ExecutorDeviceType device_type) const {
1323  if (device_type == ExecutorDeviceType::GPU) {
1324  return cudaMgr()->getDeviceCount();
1325  } else {
1326  return 1;
1327  }
1328 }
1329 
1331  const Data_Namespace::MemoryLevel memory_level) const {
1332  return memory_level == GPU_LEVEL ? deviceCount(ExecutorDeviceType::GPU)
1334 }
1335 
1336 // TODO(alex): remove or split
1337 std::pair<int64_t, int32_t> Executor::reduceResults(const SQLAgg agg,
1338  const SQLTypeInfo& ti,
1339  const int64_t agg_init_val,
1340  const int8_t out_byte_width,
1341  const int64_t* out_vec,
1342  const size_t out_vec_sz,
1343  const bool is_group_by,
1344  const bool float_argument_input) {
1345  switch (agg) {
1346  case kAVG:
1347  case kSUM:
1348  case kSUM_IF:
1349  if (0 != agg_init_val) {
1350  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
1351  int64_t agg_result = agg_init_val;
1352  for (size_t i = 0; i < out_vec_sz; ++i) {
1353  agg_sum_skip_val(&agg_result, out_vec[i], agg_init_val);
1354  }
1355  return {agg_result, 0};
1356  } else {
1357  CHECK(ti.is_fp());
1358  switch (out_byte_width) {
1359  case 4: {
1360  int agg_result = static_cast<int32_t>(agg_init_val);
1361  for (size_t i = 0; i < out_vec_sz; ++i) {
1363  &agg_result,
1364  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1365  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1366  }
1367  const int64_t converted_bin =
1368  float_argument_input
1369  ? static_cast<int64_t>(agg_result)
1370  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
1371  return {converted_bin, 0};
1372  break;
1373  }
1374  case 8: {
1375  int64_t agg_result = agg_init_val;
1376  for (size_t i = 0; i < out_vec_sz; ++i) {
1378  &agg_result,
1379  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1380  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1381  }
1382  return {agg_result, 0};
1383  break;
1384  }
1385  default:
1386  CHECK(false);
1387  }
1388  }
1389  }
1390  if (ti.is_integer() || ti.is_decimal() || ti.is_time()) {
1391  int64_t agg_result = 0;
1392  for (size_t i = 0; i < out_vec_sz; ++i) {
1393  agg_result += out_vec[i];
1394  }
1395  return {agg_result, 0};
1396  } else {
1397  CHECK(ti.is_fp());
1398  switch (out_byte_width) {
1399  case 4: {
1400  float r = 0.;
1401  for (size_t i = 0; i < out_vec_sz; ++i) {
1402  r += *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i]));
1403  }
1404  const auto float_bin = *reinterpret_cast<const int32_t*>(may_alias_ptr(&r));
1405  const int64_t converted_bin =
1406  float_argument_input ? float_bin : float_to_double_bin(float_bin, true);
1407  return {converted_bin, 0};
1408  }
1409  case 8: {
1410  double r = 0.;
1411  for (size_t i = 0; i < out_vec_sz; ++i) {
1412  r += *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i]));
1413  }
1414  return {*reinterpret_cast<const int64_t*>(may_alias_ptr(&r)), 0};
1415  }
1416  default:
1417  CHECK(false);
1418  }
1419  }
1420  break;
1421  case kCOUNT:
1422  case kCOUNT_IF: {
1423  uint64_t agg_result = 0;
1424  for (size_t i = 0; i < out_vec_sz; ++i) {
1425  const uint64_t out = static_cast<uint64_t>(out_vec[i]);
1426  agg_result += out;
1427  }
1428  return {static_cast<int64_t>(agg_result), 0};
1429  }
1430  case kMIN: {
1431  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
1432  int64_t agg_result = agg_init_val;
1433  for (size_t i = 0; i < out_vec_sz; ++i) {
1434  agg_min_skip_val(&agg_result, out_vec[i], agg_init_val);
1435  }
1436  return {agg_result, 0};
1437  } else {
1438  switch (out_byte_width) {
1439  case 4: {
1440  int32_t agg_result = static_cast<int32_t>(agg_init_val);
1441  for (size_t i = 0; i < out_vec_sz; ++i) {
1443  &agg_result,
1444  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1445  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1446  }
1447  const int64_t converted_bin =
1448  float_argument_input
1449  ? static_cast<int64_t>(agg_result)
1450  : float_to_double_bin(static_cast<int32_t>(agg_result), true);
1451  return {converted_bin, 0};
1452  }
1453  case 8: {
1454  int64_t agg_result = agg_init_val;
1455  for (size_t i = 0; i < out_vec_sz; ++i) {
1457  &agg_result,
1458  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1459  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1460  }
1461  return {agg_result, 0};
1462  }
1463  default:
1464  CHECK(false);
1465  }
1466  }
1467  }
1468  case kMAX:
1469  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
1470  int64_t agg_result = agg_init_val;
1471  for (size_t i = 0; i < out_vec_sz; ++i) {
1472  agg_max_skip_val(&agg_result, out_vec[i], agg_init_val);
1473  }
1474  return {agg_result, 0};
1475  } else {
1476  switch (out_byte_width) {
1477  case 4: {
1478  int32_t agg_result = static_cast<int32_t>(agg_init_val);
1479  for (size_t i = 0; i < out_vec_sz; ++i) {
1481  &agg_result,
1482  *reinterpret_cast<const float*>(may_alias_ptr(&out_vec[i])),
1483  *reinterpret_cast<const float*>(may_alias_ptr(&agg_init_val)));
1484  }
1485  const int64_t converted_bin =
1486  float_argument_input ? static_cast<int64_t>(agg_result)
1487  : float_to_double_bin(agg_result, !ti.get_notnull());
1488  return {converted_bin, 0};
1489  }
1490  case 8: {
1491  int64_t agg_result = agg_init_val;
1492  for (size_t i = 0; i < out_vec_sz; ++i) {
1494  &agg_result,
1495  *reinterpret_cast<const double*>(may_alias_ptr(&out_vec[i])),
1496  *reinterpret_cast<const double*>(may_alias_ptr(&agg_init_val)));
1497  }
1498  return {agg_result, 0};
1499  }
1500  default:
1501  CHECK(false);
1502  }
1503  }
1504  case kSINGLE_VALUE: {
1505  int64_t agg_result = agg_init_val;
1506  for (size_t i = 0; i < out_vec_sz; ++i) {
1507  if (out_vec[i] != agg_init_val) {
1508  if (agg_result == agg_init_val) {
1509  agg_result = out_vec[i];
1510  } else if (out_vec[i] != agg_result) {
1511  return {agg_result, int32_t(ErrorCode::SINGLE_VALUE_FOUND_MULTIPLE_VALUES)};
1512  }
1513  }
1514  }
1515  return {agg_result, 0};
1516  }
1517  case kSAMPLE: {
1518  int64_t agg_result = agg_init_val;
1519  for (size_t i = 0; i < out_vec_sz; ++i) {
1520  if (out_vec[i] != agg_init_val) {
1521  agg_result = out_vec[i];
1522  break;
1523  }
1524  }
1525  return {agg_result, 0};
1526  }
1527  default:
1528  UNREACHABLE() << "Unsupported SQLAgg: " << agg;
1529  }
1530  abort();
1531 }
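// reduceResults folds per-device (or per-fragment) partial aggregates on the host,
// with agg_init_val doubling as the "empty" sentinel that the *_skip_val helpers
// ignore. A minimal standalone sketch of that idea for SUM (assumed semantics,
// illustrative only):
namespace {
[[maybe_unused]] int64_t sketch_sum_skip_val(const int64_t* vals,
                                             const size_t n,
                                             const int64_t skip_val) {
  int64_t acc = skip_val;
  for (size_t i = 0; i < n; ++i) {
    if (vals[i] == skip_val) {
      continue;  // this partial was never updated; treat it as empty
    }
    acc = (acc == skip_val) ? vals[i] : acc + vals[i];
  }
  return acc;
}
}  // namespace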
1532 
1533 namespace {
1534 
1536  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1537  std::vector<TargetInfo> const& targets) {
1538  auto& first = results_per_device.front().first;
1539  CHECK(first);
1540  auto const first_target_idx = result_set::first_dict_encoded_idx(targets);
1541  if (first_target_idx) {
1542  first->translateDictEncodedColumns(targets, *first_target_idx);
1543  }
1544  for (size_t dev_idx = 1; dev_idx < results_per_device.size(); ++dev_idx) {
1545  const auto& next = results_per_device[dev_idx].first;
1546  CHECK(next);
1547  if (first_target_idx) {
1548  next->translateDictEncodedColumns(targets, *first_target_idx);
1549  }
1550  first->append(*next);
1551  }
1552  return std::move(first);
1553 }
1554 
1556  TargetInfo operator()(Analyzer::Expr const* const target_expr) const {
1557  return get_target_info(target_expr, g_bigint_count);
1558  }
1559 };
1560 
1561 } // namespace
1562 
1564  const RelAlgExecutionUnit& ra_exe_unit) {
1565  auto timer = DEBUG_TIMER(__func__);
1566  auto& results_per_device = shared_context.getFragmentResults();
1567  auto const targets = shared::transform<std::vector<TargetInfo>>(
1568  ra_exe_unit.target_exprs, GetTargetInfo{});
1569  if (results_per_device.empty()) {
1570  return std::make_shared<ResultSet>(targets,
1574  blockSize(),
1575  gridSize());
1576  }
1577  using IndexedResultSet = std::pair<ResultSetPtr, std::vector<size_t>>;
1578  std::sort(results_per_device.begin(),
1579  results_per_device.end(),
1580  [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
1581  CHECK_GE(lhs.second.size(), size_t(1));
1582  CHECK_GE(rhs.second.size(), size_t(1));
1583  return lhs.second.front() < rhs.second.front();
1584  });
1585 
1586  return get_merged_result(results_per_device, targets);
1587 }
1588 
1590  const RelAlgExecutionUnit& ra_exe_unit,
1591  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1592  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1593  const QueryMemoryDescriptor& query_mem_desc) const {
1594  auto timer = DEBUG_TIMER(__func__);
1595  if (ra_exe_unit.estimator) {
1596  return reduce_estimator_results(ra_exe_unit, results_per_device);
1597  }
1598 
1599  if (results_per_device.empty()) {
1600  auto const targets = shared::transform<std::vector<TargetInfo>>(
1601  ra_exe_unit.target_exprs, GetTargetInfo{});
1602  return std::make_shared<ResultSet>(targets,
1605  nullptr,
1606  blockSize(),
1607  gridSize());
1608  }
1609 
1610  if (query_mem_desc.threadsCanReuseGroupByBuffers()) {
1611  auto unique_results = getUniqueThreadSharedResultSets(results_per_device);
1613  unique_results,
1614  row_set_mem_owner,
1615  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc));
1616  }
1618  results_per_device,
1619  row_set_mem_owner,
1620  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc));
1621 }
1622 
1623 std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>
1625  const std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device)
1626  const {
1627  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>> unique_thread_results;
1628  if (results_per_device.empty()) {
1629  return unique_thread_results;
1630  }
1631  auto max_ti = [](int acc, auto& e) { return std::max(acc, e.first->getThreadIdx()); };
1632  int const max_thread_idx =
1633  std::accumulate(results_per_device.begin(), results_per_device.end(), -1, max_ti);
1634  std::vector<bool> seen_thread_idxs(max_thread_idx + 1, false);
1635  for (const auto& result : results_per_device) {
1636  const int32_t result_thread_idx = result.first->getThreadIdx();
1637  if (!seen_thread_idxs[result_thread_idx]) {
1638  seen_thread_idxs[result_thread_idx] = true;
1639  unique_thread_results.emplace_back(result);
1640  }
1641  }
1642  return unique_thread_results;
1643 }
1644 
1645 namespace {
1646 
1648  const size_t executor_id,
1649  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1650  int64_t* compilation_queue_time) {
1651  auto clock_begin = timer_start();
1652  // ResultSetReductionJIT::codegen takes the compilation lock if new code will be generated
1653  *compilation_queue_time = timer_stop(clock_begin);
1654  const auto& this_result_set = results_per_device[0].first;
1655  ResultSetReductionJIT reduction_jit(this_result_set->getQueryMemDesc(),
1656  this_result_set->getTargetInfos(),
1657  this_result_set->getTargetInitVals(),
1658  executor_id);
1659  return reduction_jit.codegen();
1660 };
1661 
1662 } // namespace
1663 
1665  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1666  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1667  const QueryMemoryDescriptor& query_mem_desc) const {
1668  auto timer = DEBUG_TIMER(__func__);
1669  std::shared_ptr<ResultSet> reduced_results;
1670 
1671  const auto& first = results_per_device.front().first;
1672 
1673  if (query_mem_desc.getQueryDescriptionType() ==
1675  results_per_device.size() > 1) {
1676  const auto total_entry_count = std::accumulate(
1677  results_per_device.begin(),
1678  results_per_device.end(),
1679  size_t(0),
1680  [](const size_t init, const std::pair<ResultSetPtr, std::vector<size_t>>& rs) {
1681  const auto& r = rs.first;
1682  return init + r->getQueryMemDesc().getEntryCount();
1683  });
1684  CHECK(total_entry_count);
1685  auto query_mem_desc = first->getQueryMemDesc();
1686  query_mem_desc.setEntryCount(total_entry_count);
1687  reduced_results = std::make_shared<ResultSet>(first->getTargetInfos(),
1690  row_set_mem_owner,
1691  blockSize(),
1692  gridSize());
1693  auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
1694  reduced_results->initializeStorage();
1695  switch (query_mem_desc.getEffectiveKeyWidth()) {
1696  case 4:
1697  first->getStorage()->moveEntriesToBuffer<int32_t>(
1698  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
1699  break;
1700  case 8:
1701  first->getStorage()->moveEntriesToBuffer<int64_t>(
1702  result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
1703  break;
1704  default:
1705  CHECK(false);
1706  }
1707  } else {
1708  reduced_results = first;
1709  }
1710 
1711  int64_t compilation_queue_time = 0;
1712  const auto reduction_code =
1713  get_reduction_code(executor_id_, results_per_device, &compilation_queue_time);
1714 
1715  for (size_t i = 1; i < results_per_device.size(); ++i) {
1716  reduced_results->getStorage()->reduce(
1717  *(results_per_device[i].first->getStorage()), {}, reduction_code, executor_id_);
1718  }
1719  reduced_results->addCompilationQueueTime(compilation_queue_time);
1720  reduced_results->invalidateCachedRowCount();
1721  return reduced_results;
1722 }
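// --- Illustrative sketch (not part of Execute.cpp) ---
// The sizing step used above, in isolation: before the per-device storages are
// reduced into one result, the merged buffer is given an entry count equal to the
// sum of the per-device entry counts. `DeviceResult` is a hypothetical stand-in
// for a (ResultSetPtr, fragment-ids) pair.
#include <cstddef>
#include <numeric>
#include <vector>

struct DeviceResult {
  size_t entry_count;
};

size_t total_entry_count(const std::vector<DeviceResult>& per_device) {
  return std::accumulate(per_device.begin(),
                         per_device.end(),
                         size_t(0),
                         [](size_t acc, const DeviceResult& r) {
                           return acc + r.entry_count;
                         });
}
// --- end sketch ---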
1723 
1725  const RelAlgExecutionUnit& ra_exe_unit,
1726  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>>& results_per_device,
1727  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
1728  const QueryMemoryDescriptor& query_mem_desc) const {
1729  if (results_per_device.size() == 1) {
1730  return std::move(results_per_device.front().first);
1731  }
1732  const auto top_n =
1733  ra_exe_unit.sort_info.limit.value_or(0) + ra_exe_unit.sort_info.offset;
1735  for (const auto& result : results_per_device) {
1736  auto rows = result.first;
1737  CHECK(rows);
1738  if (!rows) {
1739  continue;
1740  }
1741  SpeculativeTopNMap that(
1742  *rows,
1743  ra_exe_unit.target_exprs,
1744  std::max(size_t(10000 * std::max(1, static_cast<int>(log(top_n)))), top_n));
1745  m.reduce(that);
1746  }
1747  CHECK_EQ(size_t(1), ra_exe_unit.sort_info.order_entries.size());
1748  const auto desc = ra_exe_unit.sort_info.order_entries.front().is_desc;
1749  return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
1750 }
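// --- Illustrative sketch (not part of Execute.cpp) ---
// The per-device map sizing heuristic used above, isolated so the numbers are easy
// to check: the map holds at least top_n entries and otherwise 10000 times a
// logarithmic factor. For example, with top_n = 100, log(100) is ~4.6, so the map
// is sized at max(10000 * 4, 100) = 40000 entries. Assumes top_n > 0.
#include <algorithm>
#include <cmath>
#include <cstddef>

size_t speculative_top_n_map_size(size_t top_n) {
  const int log_factor = std::max(1, static_cast<int>(std::log(top_n)));
  return std::max(size_t(10000 * log_factor), top_n);
}
// --- end sketch ---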
1751 
1752 std::unordered_set<int> get_available_gpus(const Data_Namespace::DataMgr* data_mgr) {
1753  CHECK(data_mgr);
1754  std::unordered_set<int> available_gpus;
1755  if (data_mgr->gpusPresent()) {
1756  CHECK(data_mgr->getCudaMgr());
1757  const int gpu_count = data_mgr->getCudaMgr()->getDeviceCount();
1758  CHECK_GT(gpu_count, 0);
1759  for (int gpu_id = 0; gpu_id < gpu_count; ++gpu_id) {
1760  available_gpus.insert(gpu_id);
1761  }
1762  }
1763  return available_gpus;
1764 }
1765 
1766 size_t get_context_count(const ExecutorDeviceType device_type,
1767  const size_t cpu_count,
1768  const size_t gpu_count) {
1769  return device_type == ExecutorDeviceType::GPU ? gpu_count
1770  : static_cast<size_t>(cpu_count);
1771 }
1772 
1773 namespace {
1774 
1775 // Compute a very conservative guess for the output buffer entry count, using no
1776 // information other than the number of tuples in each table, multiplied
1777 // together.
1778 size_t compute_buffer_entry_guess(const std::vector<InputTableInfo>& query_infos,
1779  const RelAlgExecutionUnit& ra_exe_unit) {
1780  // we can use filtered_count_all's result if available
1781  if (ra_exe_unit.scan_limit) {
1782  VLOG(1)
1783  << "Exploiting a result of filtered count query as output buffer entry count: "
1784  << ra_exe_unit.scan_limit;
1785  return ra_exe_unit.scan_limit;
1786  }
1788  using checked_size_t = boost::multiprecision::number<
1789  boost::multiprecision::cpp_int_backend<64,
1790  64,
1791  boost::multiprecision::unsigned_magnitude,
1792  boost::multiprecision::checked,
1793  void>>;
1794  checked_size_t checked_max_groups_buffer_entry_guess = 1;
1795  // Cap the rough approximation at 100M entries; it's unlikely we can do a great job
1796  // for the baseline group layout with that many entries anyway.
1797  constexpr size_t max_groups_buffer_entry_guess_cap = 100000000;
1798  // Check for overflows since we're multiplying potentially big table sizes.
1799  try {
1800  for (const auto& table_info : query_infos) {
1801  CHECK(!table_info.info.fragments.empty());
1802  checked_size_t table_cardinality = 0;
1803  std::for_each(table_info.info.fragments.begin(),
1804  table_info.info.fragments.end(),
1805  [&table_cardinality](const FragmentInfo& frag_info) {
1806  table_cardinality += frag_info.getNumTuples();
1807  });
1808  checked_max_groups_buffer_entry_guess *= table_cardinality;
1809  }
1810  } catch (...) {
1811  checked_max_groups_buffer_entry_guess = max_groups_buffer_entry_guess_cap;
1812  VLOG(1) << "Detected overflow while approximating output buffer entry count, "
1813  "resetting it to "
1814  << max_groups_buffer_entry_guess_cap;
1815  }
1816  size_t max_groups_buffer_entry_guess =
1817  std::min(static_cast<size_t>(checked_max_groups_buffer_entry_guess),
1818  max_groups_buffer_entry_guess_cap);
1819  VLOG(1) << "Set the approximated output entry count to: "
1820  << max_groups_buffer_entry_guess;
1821  return max_groups_buffer_entry_guess;
1822 }
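// --- Illustrative sketch (not part of Execute.cpp) ---
// The overflow-checked product above, reduced to its essentials: a checked 64-bit
// boost::multiprecision integer throws on overflow, which is what lets the guess
// fall back to the cap when the tables are large. `checked_cardinality_product` is
// a hypothetical helper, not an OmniSciDB function.
#include <algorithm>
#include <cstddef>
#include <vector>
#include <boost/multiprecision/cpp_int.hpp>

size_t checked_cardinality_product(const std::vector<size_t>& table_cardinalities,
                                   const size_t cap) {
  using checked_size_t = boost::multiprecision::number<
      boost::multiprecision::cpp_int_backend<64,
                                             64,
                                             boost::multiprecision::unsigned_magnitude,
                                             boost::multiprecision::checked,
                                             void>>;
  checked_size_t guess = 1;
  try {
    for (const auto cardinality : table_cardinalities) {
      guess *= cardinality;
    }
  } catch (...) {
    return cap;  // multiplication overflowed: fall back to the cap
  }
  return std::min(static_cast<size_t>(guess), cap);
}
// --- end sketch ---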
1823 
1824 std::string get_table_name(const InputDescriptor& input_desc) {
1825  const auto source_type = input_desc.getSourceType();
1826  if (source_type == InputSourceType::TABLE) {
1827  const auto& table_key = input_desc.getTableKey();
1828  CHECK_GT(table_key.table_id, 0);
1829  const auto td = Catalog_Namespace::get_metadata_for_table(table_key);
1830  CHECK(td);
1831  return td->tableName;
1832  } else {
1833  return "$TEMPORARY_TABLE" + std::to_string(-input_desc.getTableKey().table_id);
1834  }
1835 }
1836 
1838  size_t watchdog_max_projected_rows_per_device,
1839  const ExecutorDeviceType device_type,
1840  const int device_count) {
1841  if (device_type == ExecutorDeviceType::GPU) {
1842  return device_count * watchdog_max_projected_rows_per_device;
1843  }
1844  return watchdog_max_projected_rows_per_device;
1845 }
1846 
1848  const std::vector<InputTableInfo>& table_infos,
1849  const ExecutorDeviceType device_type,
1850  const int device_count) {
1851  for (const auto target_expr : ra_exe_unit.target_exprs) {
1852  if (dynamic_cast<const Analyzer::AggExpr*>(target_expr)) {
1853  return;
1854  }
1855  }
1856  size_t watchdog_max_projected_rows_per_device =
1858  if (ra_exe_unit.query_hint.isHintRegistered(
1860  watchdog_max_projected_rows_per_device =
1862  VLOG(1) << "Set the watchdog per device maximum projection limit: "
1863  << watchdog_max_projected_rows_per_device << " by a query hint";
1864  }
1865  if (!ra_exe_unit.scan_limit && table_infos.size() == 1 &&
1866  table_infos.front().info.getPhysicalNumTuples() <
1867  watchdog_max_projected_rows_per_device) {
1868  // Allow a query with no scan limit to run on small tables
1869  return;
1870  }
1871  if (ra_exe_unit.use_bump_allocator) {
1872  // Bump allocator removes the scan limit (and any knowledge of the size of the output
1873  // relative to the size of the input), so we bypass this check for now
1874  return;
1875  }
1876  if (ra_exe_unit.sort_info.algorithm != SortAlgorithm::StreamingTopN &&
1877  ra_exe_unit.groupby_exprs.size() == 1 && !ra_exe_unit.groupby_exprs.front() &&
1878  (!ra_exe_unit.scan_limit ||
1879  ra_exe_unit.scan_limit >
1881  watchdog_max_projected_rows_per_device, device_type, device_count))) {
1882  std::vector<std::string> table_names;
1883  const auto& input_descs = ra_exe_unit.input_descs;
1884  for (const auto& input_desc : input_descs) {
1885  table_names.push_back(get_table_name(input_desc));
1886  }
1887  if (!ra_exe_unit.scan_limit) {
1888  throw WatchdogException(
1889  "Projection query would require a scan without a limit on table(s): " +
1890  boost::algorithm::join(table_names, ", "));
1891  } else {
1892  throw WatchdogException(
1893  "Projection query output result set on table(s): " +
1894  boost::algorithm::join(table_names, ", ") + " would contain " +
1895  std::to_string(ra_exe_unit.scan_limit) +
1896  " rows, which is more than the current system limit of " +
1898  watchdog_max_projected_rows_per_device, device_type, device_count)));
1899  }
1900  }
1901 }
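// --- Illustrative sketch (not part of Execute.cpp) ---
// The effective projection limit the watchdog above compares against: the
// per-device maximum is scaled by the device count on GPU and used as-is on CPU.
// All names here are hypothetical simplifications of the real check.
#include <cstddef>
#include <stdexcept>

enum class DeviceKind { CPU, GPU };

size_t effective_projection_limit(size_t per_device_limit,
                                  DeviceKind device_kind,
                                  int device_count) {
  return device_kind == DeviceKind::GPU ? device_count * per_device_limit
                                        : per_device_limit;
}

void check_projection_limit(size_t scan_limit,
                            size_t per_device_limit,
                            DeviceKind device_kind,
                            int device_count) {
  if (scan_limit >
      effective_projection_limit(per_device_limit, device_kind, device_count)) {
    throw std::runtime_error("projection would exceed the watchdog row limit");
  }
}
// --- end sketch ---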
1902 
1903 } // namespace
1904 
1905 size_t get_loop_join_size(const std::vector<InputTableInfo>& query_infos,
1906  const RelAlgExecutionUnit& ra_exe_unit) {
1907  const auto inner_table_key = ra_exe_unit.input_descs.back().getTableKey();
1908 
1909  std::optional<size_t> inner_table_idx;
1910  for (size_t i = 0; i < query_infos.size(); ++i) {
1911  if (query_infos[i].table_key == inner_table_key) {
1912  inner_table_idx = i;
1913  break;
1914  }
1915  }
1916  CHECK(inner_table_idx);
1917  return query_infos[*inner_table_idx].info.getNumTuples();
1918 }
1919 
1920 namespace {
1921 
1922 template <typename T>
1923 std::vector<std::string> expr_container_to_string(const T& expr_container) {
1924  std::vector<std::string> expr_strs;
1925  for (const auto& expr : expr_container) {
1926  if (!expr) {
1927  expr_strs.emplace_back("NULL");
1928  } else {
1929  expr_strs.emplace_back(expr->toString());
1930  }
1931  }
1932  return expr_strs;
1933 }
1934 
1935 template <>
1936 std::vector<std::string> expr_container_to_string(
1937  const std::list<Analyzer::OrderEntry>& expr_container) {
1938  std::vector<std::string> expr_strs;
1939  for (const auto& expr : expr_container) {
1940  expr_strs.emplace_back(expr.toString());
1941  }
1942  return expr_strs;
1943 }
1944 
1945 std::string sort_algorithm_to_string(const SortAlgorithm algorithm) {
1946  switch (algorithm) {
1948  return "ResultSet";
1950  return "Speculative Top N";
1952  return "Streaming Top N";
1953  }
1954  UNREACHABLE();
1955  return "";
1956 }
1957 
1958 } // namespace
1959 
1961  // todo(yoonmin): replace the cache key with a DAG representation of the query plan
1962  // instead of the ra_exe_unit description, if possible
1963  std::ostringstream os;
1964  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
1965  const auto& scan_desc = input_col_desc->getScanDesc();
1966  os << scan_desc.getTableKey() << "," << input_col_desc->getColId() << ","
1967  << scan_desc.getNestLevel();
1968  table_keys.emplace(scan_desc.getTableKey());
1969  }
1970  if (!ra_exe_unit.simple_quals.empty()) {
1971  for (const auto& qual : ra_exe_unit.simple_quals) {
1972  if (qual) {
1973  os << qual->toString() << ",";
1974  }
1975  }
1976  }
1977  if (!ra_exe_unit.quals.empty()) {
1978  for (const auto& qual : ra_exe_unit.quals) {
1979  if (qual) {
1980  os << qual->toString() << ",";
1981  }
1982  }
1983  }
1984  if (!ra_exe_unit.join_quals.empty()) {
1985  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
1986  const auto& join_condition = ra_exe_unit.join_quals[i];
1987  os << std::to_string(i) << ::toString(join_condition.type);
1988  for (const auto& qual : join_condition.quals) {
1989  if (qual) {
1990  os << qual->toString() << ",";
1991  }
1992  }
1993  }
1994  }
1995  if (!ra_exe_unit.groupby_exprs.empty()) {
1996  for (const auto& qual : ra_exe_unit.groupby_exprs) {
1997  if (qual) {
1998  os << qual->toString() << ",";
1999  }
2000  }
2001  }
2002  for (const auto& expr : ra_exe_unit.target_exprs) {
2003  if (expr) {
2004  os << expr->toString() << ",";
2005  }
2006  }
2007  os << ::toString(ra_exe_unit.estimator == nullptr);
2008  os << std::to_string(ra_exe_unit.scan_limit);
2009  key = os.str();
2010 }
2011 
2013  return key == other.key;
2014 }
2015 
2017  return boost::hash_value(key);
2018 }
2019 
2021  return table_keys.find(table_key) != table_keys.end();
2022 }
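// --- Illustrative sketch (not part of Execute.cpp) ---
// The cache-key pattern used above: stream the unit's descriptive pieces into one
// string, compare keys by string equality, and hash the string with
// boost::hash_value. `make_key`/`hash_key` and the plain string inputs are
// hypothetical stand-ins for the real column/qual/target descriptions.
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>
#include <boost/functional/hash.hpp>

std::string make_key(const std::vector<std::string>& parts) {
  std::ostringstream os;
  for (const auto& part : parts) {
    os << part << ",";
  }
  return os.str();
}

size_t hash_key(const std::string& key) {
  return boost::hash_value(key);
}
// --- end sketch ---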
2023 
2024 std::ostream& operator<<(std::ostream& os, const RelAlgExecutionUnit& ra_exe_unit) {
2025  os << "\n\tExtracted Query Plan Dag Hash: " << ra_exe_unit.query_plan_dag_hash;
2026  os << "\n\tTable/Col/Levels: ";
2027  for (const auto& input_col_desc : ra_exe_unit.input_col_descs) {
2028  const auto& scan_desc = input_col_desc->getScanDesc();
2029  os << "(" << scan_desc.getTableKey() << ", " << input_col_desc->getColId() << ", "
2030  << scan_desc.getNestLevel() << ") ";
2031  }
2032  if (!ra_exe_unit.simple_quals.empty()) {
2033  os << "\n\tSimple Quals: "
2035  ", ");
2036  }
2037  if (!ra_exe_unit.quals.empty()) {
2038  os << "\n\tQuals: "
2039  << boost::algorithm::join(expr_container_to_string(ra_exe_unit.quals), ", ");
2040  }
2041  if (!ra_exe_unit.join_quals.empty()) {
2042  os << "\n\tJoin Quals: ";
2043  for (size_t i = 0; i < ra_exe_unit.join_quals.size(); i++) {
2044  const auto& join_condition = ra_exe_unit.join_quals[i];
2045  os << "\t\t" << std::to_string(i) << " " << ::toString(join_condition.type);
2046  os << boost::algorithm::join(expr_container_to_string(join_condition.quals), ", ");
2047  }
2048  }
2049  if (!ra_exe_unit.groupby_exprs.empty()) {
2050  os << "\n\tGroup By: "
2052  ", ");
2053  }
2054  os << "\n\tProjected targets: "
2056  os << "\n\tHas Estimator: " << ::toString(ra_exe_unit.estimator == nullptr);
2057  os << "\n\tSort Info: ";
2058  const auto& sort_info = ra_exe_unit.sort_info;
2059  os << "\n\t Order Entries: "
2060  << boost::algorithm::join(expr_container_to_string(sort_info.order_entries), ", ");
2061  os << "\n\t Algorithm: " << sort_algorithm_to_string(sort_info.algorithm);
2062  std::string limit_str = sort_info.limit ? std::to_string(*sort_info.limit) : "N/A";
2063  os << "\n\t Limit: " << limit_str;
2064  os << "\n\t Offset: " << std::to_string(sort_info.offset);
2065  os << "\n\tScan Limit: " << std::to_string(ra_exe_unit.scan_limit);
2066  os << "\n\tBump Allocator: " << ::toString(ra_exe_unit.use_bump_allocator);
2067  if (ra_exe_unit.union_all) {
2068  os << "\n\tUnion: " << std::string(*ra_exe_unit.union_all ? "UNION ALL" : "UNION");
2069  }
2070  return os;
2071 }
2072 
2073 namespace {
2074 
2076  const size_t new_scan_limit) {
2077  return {ra_exe_unit_in.input_descs,
2078  ra_exe_unit_in.input_col_descs,
2079  ra_exe_unit_in.simple_quals,
2080  ra_exe_unit_in.quals,
2081  ra_exe_unit_in.join_quals,
2082  ra_exe_unit_in.groupby_exprs,
2083  ra_exe_unit_in.target_exprs,
2084  ra_exe_unit_in.target_exprs_original_type_infos,
2085  ra_exe_unit_in.estimator,
2086  ra_exe_unit_in.sort_info,
2087  new_scan_limit,
2088  ra_exe_unit_in.query_hint,
2089  ra_exe_unit_in.query_plan_dag_hash,
2090  ra_exe_unit_in.hash_table_build_plan_dag,
2091  ra_exe_unit_in.table_id_to_node_map,
2092  ra_exe_unit_in.use_bump_allocator,
2093  ra_exe_unit_in.union_all,
2094  ra_exe_unit_in.query_state};
2095 }
2096 
2097 } // namespace
2098 
2099 ResultSetPtr Executor::executeWorkUnit(size_t& max_groups_buffer_entry_guess,
2100  const bool is_agg,
2101  const std::vector<InputTableInfo>& query_infos,
2102  const RelAlgExecutionUnit& ra_exe_unit_in,
2103  const CompilationOptions& co,
2104  const ExecutionOptions& eo,
2105  RenderInfo* render_info,
2106  const bool has_cardinality_estimation,
2107  ColumnCacheMap& column_cache) {
2108  VLOG(1) << "Executor " << executor_id_ << " is executing work unit:" << ra_exe_unit_in;
2109  ScopeGuard cleanup_post_execution = [this] {
2110  // cleanup/unpin GPU buffer allocations
2111  // TODO: separate out this state into a single object
2112  VLOG(1) << "Perform post execution clearance for Executor " << executor_id_;
2113  plan_state_.reset(nullptr);
2114  if (cgen_state_) {
2115  cgen_state_->in_values_bitmaps_.clear();
2116  cgen_state_->str_dict_translation_mgrs_.clear();
2117  cgen_state_->tree_model_prediction_mgrs_.clear();
2118  }
2119  row_set_mem_owner_->clearNonOwnedGroupByBuffers();
2120  };
2121 
2122  try {
2123  auto result = executeWorkUnitImpl(max_groups_buffer_entry_guess,
2124  is_agg,
2125  true,
2126  query_infos,
2127  ra_exe_unit_in,
2128  co,
2129  eo,
2131  render_info,
2132  has_cardinality_estimation,
2133  column_cache);
2134  if (result) {
2135  result->setKernelQueueTime(kernel_queue_time_ms_);
2136  result->addCompilationQueueTime(compilation_queue_time_ms_);
2137  if (eo.just_validate) {
2138  result->setValidationOnlyRes();
2139  }
2140  }
2141  return result;
2142  } catch (const CompilationRetryNewScanLimit& e) {
2143  auto result =
2144  executeWorkUnitImpl(max_groups_buffer_entry_guess,
2145  is_agg,
2146  false,
2147  query_infos,
2148  replace_scan_limit(ra_exe_unit_in, e.new_scan_limit_),
2149  co,
2150  eo,
2152  render_info,
2153  has_cardinality_estimation,
2154  column_cache);
2155  if (result) {
2156  result->setKernelQueueTime(kernel_queue_time_ms_);
2157  result->addCompilationQueueTime(compilation_queue_time_ms_);
2158  if (eo.just_validate) {
2159  result->setValidationOnlyRes();
2160  }
2161  }
2162  return result;
2163  }
2164 }
2165 
2167  size_t& max_groups_buffer_entry_guess,
2168  const bool is_agg,
2169  const bool allow_single_frag_table_opt,
2170  const std::vector<InputTableInfo>& query_infos,
2171  const RelAlgExecutionUnit& ra_exe_unit_in,
2172  const CompilationOptions& co,
2173  const ExecutionOptions& eo,
2174  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
2175  RenderInfo* render_info,
2176  const bool has_cardinality_estimation,
2177  ColumnCacheMap& column_cache) {
2178  INJECT_TIMER(Exec_executeWorkUnit);
2179  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
2180  const auto device_type = getDeviceTypeForTargets(ra_exe_unit, co.device_type);
2181  CHECK(!query_infos.empty());
2182  if (!max_groups_buffer_entry_guess) {
2183  // The query has failed the first execution attempt because of running out
2184  // of group by slots. Make the conservative choice: allocate fragment size
2185  // slots and run on the CPU.
2186  CHECK(device_type == ExecutorDeviceType::CPU);
2187  max_groups_buffer_entry_guess =
2188  compute_buffer_entry_guess(query_infos, ra_exe_unit_in);
2189  }
2190 
2191  int8_t crt_min_byte_width{MAX_BYTE_WIDTH_SUPPORTED};
2192  CompilationOptions copied_co = co;
2193  copied_co.device_type = device_type;
2194  do {
2195  SharedKernelContext shared_context(query_infos);
2196  ColumnFetcher column_fetcher(this, column_cache);
2197  ScopeGuard scope_guard = [&column_fetcher] {
2198  column_fetcher.freeLinearizedBuf();
2199  column_fetcher.freeTemporaryCpuLinearizedIdxBuf();
2200  };
2201  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
2202  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
2203  if (eo.executor_type == ExecutorType::Native) {
2204  try {
2205  INJECT_TIMER(query_step_compilation);
2206  query_mem_desc_owned =
2207  query_comp_desc_owned->compile(max_groups_buffer_entry_guess,
2208  crt_min_byte_width,
2209  has_cardinality_estimation,
2210  ra_exe_unit,
2211  query_infos,
2212  deleted_cols_map,
2213  column_fetcher,
2214  copied_co,
2215  eo,
2216  render_info,
2217  this);
2218  CHECK(query_mem_desc_owned);
2219  crt_min_byte_width = query_comp_desc_owned->getMinByteWidth();
2220  } catch (CompilationRetryNoCompaction& e) {
2221  VLOG(1) << e.what();
2222  crt_min_byte_width = MAX_BYTE_WIDTH_SUPPORTED;
2223  continue;
2224  }
2225  } else {
2226  plan_state_.reset(new PlanState(false, query_infos, deleted_cols_map, this));
2227  plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
2228  CHECK(!query_mem_desc_owned);
2229  query_mem_desc_owned.reset(
2231  }
2232  if (eo.just_explain) {
2233  return executeExplain(*query_comp_desc_owned);
2234  }
2235 
2236  if (query_mem_desc_owned->canUsePerDeviceCardinality(ra_exe_unit)) {
2237  auto const max_rows_per_device =
2238  query_mem_desc_owned->getMaxPerDeviceCardinality(ra_exe_unit);
2239  if (max_rows_per_device && *max_rows_per_device >= 0 &&
2240  *max_rows_per_device < query_mem_desc_owned->getEntryCount()) {
2241  VLOG(1) << "Setting the max per device cardinality of {max_rows_per_device} as "
2242  "the new scan limit: "
2243  << *max_rows_per_device;
2244  throw CompilationRetryNewScanLimit(*max_rows_per_device);
2245  }
2246  }
2247 
2248  if (!eo.just_validate) {
2249  int available_cpus = cpu_threads();
2250  auto available_gpus = get_available_gpus(data_mgr_);
2251 
2252  const auto context_count =
2253  get_context_count(device_type, available_cpus, available_gpus.size());
2254  try {
2255  auto kernels = createKernels(shared_context,
2256  ra_exe_unit,
2257  column_fetcher,
2258  query_infos,
2259  eo,
2260  is_agg,
2261  allow_single_frag_table_opt,
2262  context_count,
2263  *query_comp_desc_owned,
2264  *query_mem_desc_owned,
2265  render_info,
2266  available_gpus,
2267  available_cpus);
2268  if (!kernels.empty()) {
2269  row_set_mem_owner_->setKernelMemoryAllocator(kernels.size());
2270  }
2272  launchKernelsViaResourceMgr(shared_context,
2273  std::move(kernels),
2274  query_comp_desc_owned->getDeviceType(),
2275  ra_exe_unit.input_descs,
2276  *query_mem_desc_owned);
2277  } else {
2279  shared_context, std::move(kernels), query_comp_desc_owned->getDeviceType());
2280  }
2281 
2282  } catch (QueryExecutionError& e) {
2283  if (eo.with_dynamic_watchdog && interrupted_.load() &&
2284  e.hasErrorCode(ErrorCode::OUT_OF_TIME)) {
2285  throw QueryExecutionError(ErrorCode::INTERRUPTED);
2286  }
2287  if (e.hasErrorCode(ErrorCode::INTERRUPTED)) {
2288  throw QueryExecutionError(ErrorCode::INTERRUPTED);
2289  }
2290  if (e.hasErrorCode(ErrorCode::OVERFLOW_OR_UNDERFLOW) &&
2291  static_cast<size_t>(crt_min_byte_width << 1) <= sizeof(int64_t)) {
2292  crt_min_byte_width <<= 1;
2293  continue;
2294  }
2295  throw;
2296  }
2297  }
2298  if (is_agg) {
2299  if (eo.allow_runtime_query_interrupt && ra_exe_unit.query_state) {
2300  // update query status to let user know we are now in the reduction phase
2301  std::string curRunningSession{""};
2302  std::string curRunningQuerySubmittedTime{""};
2303  bool sessionEnrolled = false;
2304  {
2307  curRunningSession = getCurrentQuerySession(session_read_lock);
2308  curRunningQuerySubmittedTime = ra_exe_unit.query_state->getQuerySubmittedTime();
2309  sessionEnrolled =
2310  checkIsQuerySessionEnrolled(curRunningSession, session_read_lock);
2311  }
2312  if (!curRunningSession.empty() && !curRunningQuerySubmittedTime.empty() &&
2313  sessionEnrolled) {
2314  updateQuerySessionStatus(curRunningSession,
2315  curRunningQuerySubmittedTime,
2317  }
2318  }
2319  try {
2320  if (eo.estimate_output_cardinality) {
2321  for (const auto& result : shared_context.getFragmentResults()) {
2322  auto row = result.first->getNextRow(false, false);
2323  CHECK_EQ(1u, row.size());
2324  auto scalar_r = boost::get<ScalarTargetValue>(&row[0]);
2325  CHECK(scalar_r);
2326  auto p = boost::get<int64_t>(scalar_r);
2327  CHECK(p);
2328  // todo(yoonmin): sort the frag_ids to make it consistent for later usage
2329  auto frag_ids = result.second;
2330  VLOG(1) << "Filtered cardinality for fragments-{" << ::toString(result.second)
2331  << "} : " << static_cast<size_t>(*p);
2332  ra_exe_unit_in.per_device_cardinality.emplace_back(result.second,
2333  static_cast<size_t>(*p));
2334  result.first->moveToBegin();
2335  }
2336  }
2337  return collectAllDeviceResults(shared_context,
2338  ra_exe_unit,
2339  *query_mem_desc_owned,
2340  query_comp_desc_owned->getDeviceType(),
2341  row_set_mem_owner);
2342  } catch (ReductionRanOutOfSlots&) {
2343  throw QueryExecutionError(ErrorCode::OUT_OF_SLOTS);
2344  } catch (OverflowOrUnderflow&) {
2345  crt_min_byte_width <<= 1;
2346  continue;
2347  } catch (QueryExecutionError& e) {
2348  VLOG(1) << "Error received! error_code: " << e.getErrorCode()
2349  << ", what(): " << e.what();
2350  throw QueryExecutionError(e.getErrorCode());
2351  }
2352  }
2353  return resultsUnion(shared_context, ra_exe_unit);
2354 
2355  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
2356 
2357  return std::make_shared<ResultSet>(std::vector<TargetInfo>{},
2360  nullptr,
2361  blockSize(),
2362  gridSize());
2363 }
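// --- Illustrative sketch (not part of Execute.cpp) ---
// The retry structure of the loop above, in isolation: on an overflow/underflow
// error the minimum aggregate byte width is doubled and the compile-and-execute
// step is retried until the width exceeds sizeof(int64_t). `run_once` is a
// hypothetical stand-in for one attempt; the error type is likewise made up.
#include <cstddef>
#include <cstdint>
#include <functional>
#include <stdexcept>

struct WidthOverflowError : std::runtime_error {
  WidthOverflowError() : std::runtime_error("overflow or underflow") {}
};

// Returns true once an attempt succeeds, false if every width up to 8 bytes fails.
bool run_with_widening_retries(const std::function<bool(int8_t)>& run_once) {
  int8_t crt_min_byte_width = 1;
  do {
    try {
      return run_once(crt_min_byte_width);
    } catch (const WidthOverflowError&) {
      crt_min_byte_width <<= 1;  // widen the aggregate slots and retry
    }
  } while (static_cast<size_t>(crt_min_byte_width) <= sizeof(int64_t));
  return false;
}
// --- end sketch ---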
2364 
2366  const RelAlgExecutionUnit& ra_exe_unit_in,
2367  const InputTableInfo& table_info,
2368  const CompilationOptions& co,
2369  const ExecutionOptions& eo,
2371  PerFragmentCallBack& cb,
2372  const std::set<size_t>& fragment_indexes_param) {
2373  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
2374  ColumnCacheMap column_cache;
2375 
2376  std::vector<InputTableInfo> table_infos{table_info};
2377  SharedKernelContext kernel_context(table_infos);
2378 
2379  ColumnFetcher column_fetcher(this, column_cache);
2380  auto query_comp_desc_owned = std::make_unique<QueryCompilationDescriptor>();
2381  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc_owned;
2382  {
2383  query_mem_desc_owned =
2384  query_comp_desc_owned->compile(0,
2385  8,
2386  /*has_cardinality_estimation=*/false,
2387  ra_exe_unit,
2388  table_infos,
2389  deleted_cols_map,
2390  column_fetcher,
2391  co,
2392  eo,
2393  nullptr,
2394  this);
2395  }
2396  CHECK(query_mem_desc_owned);
2397  CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
2398  const auto table_key = ra_exe_unit.input_descs[0].getTableKey();
2399  const auto& outer_fragments = table_info.info.fragments;
2400 
2401  std::set<size_t> fragment_indexes;
2402  if (fragment_indexes_param.empty()) {
2403  // An empty `fragment_indexes_param` set implies executing
2404  // the query for all fragments in the table. In this
2405  // case, populate `fragment_indexes` with all fragment indexes.
2406  for (size_t i = 0; i < outer_fragments.size(); i++) {
2407  fragment_indexes.emplace(i);
2408  }
2409  } else {
2410  fragment_indexes = fragment_indexes_param;
2411  }
2412 
2413  {
2414  auto clock_begin = timer_start();
2415  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
2416  kernel_queue_time_ms_ += timer_stop(clock_begin);
2417 
2418  for (auto fragment_index : fragment_indexes) {
2419  // We may want to consider in the future allowing this to execute on devices other
2420  // than CPU
2421  FragmentsList fragments_list{{table_key, {fragment_index}}};
2422  ExecutionKernel kernel(ra_exe_unit,
2423  co.device_type,
2424  /*device_id=*/0,
2425  eo,
2426  column_fetcher,
2427  *query_comp_desc_owned,
2428  *query_mem_desc_owned,
2429  fragments_list,
2431  /*render_info=*/nullptr,
2432  /*rowid_lookup_key=*/-1);
2433  kernel.run(this, 0, kernel_context);
2434  }
2435  }
2436 
2437  const auto& all_fragment_results = kernel_context.getFragmentResults();
2438 
2439  for (const auto& [result_set_ptr, result_fragment_indexes] : all_fragment_results) {
2440  CHECK_EQ(result_fragment_indexes.size(), 1);
2441  cb(result_set_ptr, outer_fragments[result_fragment_indexes[0]]);
2442  }
2443 }
2444 
2446  const TableFunctionExecutionUnit exe_unit,
2447  const std::vector<InputTableInfo>& table_infos,
2448  const CompilationOptions& co,
2449  const ExecutionOptions& eo) {
2450  INJECT_TIMER(Exec_executeTableFunction);
2451  if (eo.just_validate) {
2453  /*entry_count=*/0,
2455  return std::make_shared<ResultSet>(
2456  target_exprs_to_infos(exe_unit.target_exprs, query_mem_desc),
2457  co.device_type,
2458  ResultSet::fixupQueryMemoryDescriptor(query_mem_desc),
2459  this->getRowSetMemoryOwner(),
2460  this->blockSize(),
2461  this->gridSize());
2462  }
2463 
2464  // Avoid compiling functions that set the sizer at runtime if the device is GPU.
2465  // This should also be fixed in the python script to minimize the number of
2466  // QueryMustRunOnCpu exceptions.
2469  throw QueryMustRunOnCpu();
2470  }
2471 
2472  ColumnCacheMap column_cache; // Note: if we add retries to the table function
2473  // framework, we may want to move this up a level
2474 
2475  ColumnFetcher column_fetcher(this, column_cache);
2477 
2478  if (exe_unit.table_func.containsPreFlightFn()) {
2479  std::shared_ptr<CompilationContext> compilation_context;
2480  {
2481  Executor::CgenStateManager cgenstate_manager(*this,
2482  false,
2483  table_infos,
2485  nullptr); // locks compilation_mutex
2487  TableFunctionCompilationContext tf_compilation_context(this, pre_flight_co);
2488  compilation_context =
2489  tf_compilation_context.compile(exe_unit, true /* emit_only_preflight_fn*/);
2490  }
2491  exe_context.execute(exe_unit,
2492  table_infos,
2493  compilation_context,
2494  column_fetcher,
2496  this,
2497  true /* is_pre_launch_udtf */);
2498  }
2499  std::shared_ptr<CompilationContext> compilation_context;
2500  {
2501  Executor::CgenStateManager cgenstate_manager(*this,
2502  false,
2503  table_infos,
2505  nullptr); // locks compilation_mutex
2506  TableFunctionCompilationContext tf_compilation_context(this, co);
2507  compilation_context =
2508  tf_compilation_context.compile(exe_unit, false /* emit_only_preflight_fn */);
2509  }
2510  return exe_context.execute(exe_unit,
2511  table_infos,
2512  compilation_context,
2513  column_fetcher,
2514  co.device_type,
2515  this,
2516  false /* is_pre_launch_udtf */);
2517 }
2518 
2520  return std::make_shared<ResultSet>(query_comp_desc.getIR());
2521 }
2522 
2524  const RelAlgExecutionUnit& ra_exe_unit,
2525  const std::shared_ptr<RowSetMemoryOwner>& row_set_mem_owner) {
2526  TransientDictIdVisitor dict_id_visitor;
2527 
2528  auto visit_expr =
2529  [this, &dict_id_visitor, &row_set_mem_owner](const Analyzer::Expr* expr) {
2530  if (!expr) {
2531  return;
2532  }
2533  const auto& dict_key = dict_id_visitor.visit(expr);
2534  if (dict_key.dict_id >= 0) {
2535  auto sdp = getStringDictionaryProxy(dict_key, row_set_mem_owner, true);
2536  CHECK(sdp);
2537  TransientStringLiteralsVisitor visitor(sdp, this);
2538  visitor.visit(expr);
2539  }
2540  };
2541 
2542  for (const auto& group_expr : ra_exe_unit.groupby_exprs) {
2543  visit_expr(group_expr.get());
2544  }
2545 
2546  for (const auto& group_expr : ra_exe_unit.quals) {
2547  visit_expr(group_expr.get());
2548  }
2549 
2550  for (const auto& group_expr : ra_exe_unit.simple_quals) {
2551  visit_expr(group_expr.get());
2552  }
2553 
2554  const auto visit_target_expr = [&](const Analyzer::Expr* target_expr) {
2555  const auto& target_type = target_expr->get_type_info();
2556  if (!target_type.is_string() || target_type.get_compression() == kENCODING_DICT) {
2557  const auto agg_expr = dynamic_cast<const Analyzer::AggExpr*>(target_expr);
2558  if (agg_expr) {
2559  // The following agg types require taking into account transient string values
2560  if (agg_expr->get_is_distinct() || agg_expr->get_aggtype() == kSINGLE_VALUE ||
2561  agg_expr->get_aggtype() == kSAMPLE || agg_expr->get_aggtype() == kMODE) {
2562  visit_expr(agg_expr->get_arg());
2563  }
2564  } else {
2565  visit_expr(target_expr);
2566  }
2567  }
2568  };
2569  const auto& target_exprs = ra_exe_unit.target_exprs;
2570  std::for_each(target_exprs.begin(), target_exprs.end(), visit_target_expr);
2571  const auto& target_exprs_union = ra_exe_unit.target_exprs_union;
2572  std::for_each(target_exprs_union.begin(), target_exprs_union.end(), visit_target_expr);
2573 }
2574 
2576  const RelAlgExecutionUnit& ra_exe_unit,
2577  const ExecutorDeviceType requested_device_type) {
2578  if (!getDataMgr()->gpusPresent()) {
2579  return ExecutorDeviceType::CPU;
2580  }
2581  for (const auto target_expr : ra_exe_unit.target_exprs) {
2582  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2583  if (!ra_exe_unit.groupby_exprs.empty() &&
2584  !isArchPascalOrLater(requested_device_type)) {
2585  if ((agg_info.agg_kind == kAVG || agg_info.agg_kind == kSUM ||
2586  agg_info.agg_kind == kSUM_IF) &&
2587  agg_info.agg_arg_type.get_type() == kDOUBLE) {
2588  return ExecutorDeviceType::CPU;
2589  }
2590  }
2591  if (dynamic_cast<const Analyzer::RegexpExpr*>(target_expr)) {
2592  return ExecutorDeviceType::CPU;
2593  }
2594  }
2595  return requested_device_type;
2596 }
2597 
2598 namespace {
2599 
2600 int64_t inline_null_val(const SQLTypeInfo& ti, const bool float_argument_input) {
2601  CHECK(ti.is_number() || ti.is_time() || ti.is_boolean() || ti.is_string());
2602  if (ti.is_fp()) {
2603  if (float_argument_input && ti.get_type() == kFLOAT) {
2604  int64_t float_null_val = 0;
2605  *reinterpret_cast<float*>(may_alias_ptr(&float_null_val)) =
2606  static_cast<float>(inline_fp_null_val(ti));
2607  return float_null_val;
2608  }
2609  const auto double_null_val = inline_fp_null_val(ti);
2610  return *reinterpret_cast<const int64_t*>(may_alias_ptr(&double_null_val));
2611  }
2612  return inline_int_null_val(ti);
2613 }
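// --- Illustrative sketch (not part of Execute.cpp) ---
// How a floating-point NULL sentinel ends up in an int64_t aggregate slot, as in
// inline_null_val above: the bit pattern is copied, not the numeric value. memcpy
// is used here instead of may_alias_ptr purely to keep the sketch standalone.
#include <cstdint>
#include <cstring>

int64_t pack_float_null_into_slot(float float_null_sentinel) {
  int64_t slot = 0;
  std::memcpy(&slot, &float_null_sentinel, sizeof(float));  // low 32 bits hold the float
  return slot;
}

int64_t pack_double_null_into_slot(double double_null_sentinel) {
  int64_t slot = 0;
  std::memcpy(&slot, &double_null_sentinel, sizeof(double));
  return slot;
}
// --- end sketch ---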
2614 
2615 void fill_entries_for_empty_input(std::vector<TargetInfo>& target_infos,
2616  std::vector<int64_t>& entry,
2617  const std::vector<Analyzer::Expr*>& target_exprs,
2619  for (size_t target_idx = 0; target_idx < target_exprs.size(); ++target_idx) {
2620  const auto target_expr = target_exprs[target_idx];
2621  const auto agg_info = get_target_info(target_expr, g_bigint_count);
2622  CHECK(agg_info.is_agg);
2623  target_infos.push_back(agg_info);
2624  if (g_cluster) {
2625  const auto executor = query_mem_desc.getExecutor();
2626  CHECK(executor);
2627  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
2628  CHECK(row_set_mem_owner);
2629  const auto& count_distinct_desc =
2630  query_mem_desc.getCountDistinctDescriptor(target_idx);
2631  if (count_distinct_desc.impl_type_ == CountDistinctImplType::Bitmap) {
2632  CHECK(row_set_mem_owner);
2633  // TODO: can we detect thread idx here?
2634  constexpr size_t thread_idx{0};
2635  const auto bitmap_size = count_distinct_desc.bitmapPaddedSizeBytes();
2636  row_set_mem_owner->initCountDistinctBufferAllocator(bitmap_size, thread_idx);
2637  auto count_distinct_buffer =
2638  row_set_mem_owner->allocateCountDistinctBuffer(bitmap_size, thread_idx);
2639  entry.push_back(reinterpret_cast<int64_t>(count_distinct_buffer));
2640  continue;
2641  }
2642  if (count_distinct_desc.impl_type_ == CountDistinctImplType::UnorderedSet) {
2643  auto count_distinct_set = new CountDistinctSet();
2644  CHECK(row_set_mem_owner);
2645  row_set_mem_owner->addCountDistinctSet(count_distinct_set);
2646  entry.push_back(reinterpret_cast<int64_t>(count_distinct_set));
2647  continue;
2648  }
2649  }
2650  const bool float_argument_input = takes_float_argument(agg_info);
2651  if (shared::is_any<kCOUNT, kCOUNT_IF, kAPPROX_COUNT_DISTINCT>(agg_info.agg_kind)) {
2652  entry.push_back(0);
2653  } else if (shared::is_any<kAVG>(agg_info.agg_kind)) {
2654  entry.push_back(0);
2655  entry.push_back(0);
2656  } else if (shared::is_any<kSINGLE_VALUE, kSAMPLE>(agg_info.agg_kind)) {
2657  if (agg_info.sql_type.is_geometry() && !agg_info.is_varlen_projection) {
2658  for (int i = 0; i < agg_info.sql_type.get_physical_coord_cols() * 2; i++) {
2659  entry.push_back(0);
2660  }
2661  } else if (agg_info.sql_type.is_varlen()) {
2662  entry.push_back(0);
2663  entry.push_back(0);
2664  } else {
2665  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
2666  }
2667  } else {
2668  entry.push_back(inline_null_val(agg_info.sql_type, float_argument_input));
2669  }
2670  }
2671 }
2672 
2674  const std::vector<Analyzer::Expr*>& target_exprs_in,
2676  const ExecutorDeviceType device_type) {
2677  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_copies;
2678  std::vector<Analyzer::Expr*> target_exprs;
2679  for (const auto target_expr : target_exprs_in) {
2680  const auto target_expr_copy =
2681  std::dynamic_pointer_cast<Analyzer::AggExpr>(target_expr->deep_copy());
2682  CHECK(target_expr_copy);
2683  auto ti = target_expr->get_type_info();
2684  ti.set_notnull(false);
2685  target_expr_copy->set_type_info(ti);
2686  if (target_expr_copy->get_arg()) {
2687  auto arg_ti = target_expr_copy->get_arg()->get_type_info();
2688  arg_ti.set_notnull(false);
2689  target_expr_copy->get_arg()->set_type_info(arg_ti);
2690  }
2691  target_exprs_owned_copies.push_back(target_expr_copy);
2692  target_exprs.push_back(target_expr_copy.get());
2693  }
2694  std::vector<TargetInfo> target_infos;
2695  std::vector<int64_t> entry;
2696  fill_entries_for_empty_input(target_infos, entry, target_exprs, query_mem_desc);
2697  const auto executor = query_mem_desc.getExecutor();
2698  CHECK(executor);
2699  // todo(yoonmin): Can we avoid initializing DramArena for this empty-result case?
2700  auto row_set_mem_owner = executor->getRowSetMemoryOwner();
2701  CHECK(row_set_mem_owner);
2702  auto rs = std::make_shared<ResultSet>(target_infos,
2703  device_type,
2705  row_set_mem_owner,
2706  executor->blockSize(),
2707  executor->gridSize());
2708  rs->allocateStorage();
2709  rs->fillOneEntry(entry);
2710  return rs;
2711 }
2712 
2713 } // namespace
2714 
2716  SharedKernelContext& shared_context,
2717  const RelAlgExecutionUnit& ra_exe_unit,
2719  const ExecutorDeviceType device_type,
2720  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner) {
2721  auto timer = DEBUG_TIMER(__func__);
2722  auto& result_per_device = shared_context.getFragmentResults();
2723  if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() ==
2726  ra_exe_unit.target_exprs, query_mem_desc, device_type);
2727  }
2728  if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) {
2729  try {
2730  return reduceSpeculativeTopN(
2731  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
2732  } catch (const std::bad_alloc&) {
2733  throw SpeculativeTopNFailed("Failed during multi-device reduction.");
2734  }
2735  }
2736  const auto shard_count =
2737  device_type == ExecutorDeviceType::GPU
2739  : 0;
2740 
2741  if (shard_count && !result_per_device.empty()) {
2742  return collectAllDeviceShardedTopResults(shared_context, ra_exe_unit, device_type);
2743  }
2744  return reduceMultiDeviceResults(
2745  ra_exe_unit, result_per_device, row_set_mem_owner, query_mem_desc);
2746 }
2747 
2748 namespace {
2759 size_t permute_storage_columnar(const ResultSetStorage* input_storage,
2760  const QueryMemoryDescriptor& input_query_mem_desc,
2761  const ResultSetStorage* output_storage,
2762  size_t output_row_index,
2763  const QueryMemoryDescriptor& output_query_mem_desc,
2764  const std::vector<uint32_t>& top_permutation) {
2765  const auto output_buffer = output_storage->getUnderlyingBuffer();
2766  const auto input_buffer = input_storage->getUnderlyingBuffer();
2767  for (const auto sorted_idx : top_permutation) {
2768  // permuting all group-columns in this result set into the final buffer:
2769  for (size_t group_idx = 0; group_idx < input_query_mem_desc.getKeyCount();
2770  group_idx++) {
2771  const auto input_column_ptr =
2772  input_buffer + input_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
2773  sorted_idx * input_query_mem_desc.groupColWidth(group_idx);
2774  const auto output_column_ptr =
2775  output_buffer +
2776  output_query_mem_desc.getPrependedGroupColOffInBytes(group_idx) +
2777  output_row_index * output_query_mem_desc.groupColWidth(group_idx);
2778  memcpy(output_column_ptr,
2779  input_column_ptr,
2780  output_query_mem_desc.groupColWidth(group_idx));
2781  }
2782  // permuting all agg-columns in this result set into the final buffer:
2783  for (size_t slot_idx = 0; slot_idx < input_query_mem_desc.getSlotCount();
2784  slot_idx++) {
2785  const auto input_column_ptr =
2786  input_buffer + input_query_mem_desc.getColOffInBytes(slot_idx) +
2787  sorted_idx * input_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
2788  const auto output_column_ptr =
2789  output_buffer + output_query_mem_desc.getColOffInBytes(slot_idx) +
2790  output_row_index * output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx);
2791  memcpy(output_column_ptr,
2792  input_column_ptr,
2793  output_query_mem_desc.getPaddedSlotWidthBytes(slot_idx));
2794  }
2795  ++output_row_index;
2796  }
2797  return output_row_index;
2798 }
2799 
2809 size_t permute_storage_row_wise(const ResultSetStorage* input_storage,
2810  const ResultSetStorage* output_storage,
2811  size_t output_row_index,
2812  const QueryMemoryDescriptor& output_query_mem_desc,
2813  const std::vector<uint32_t>& top_permutation) {
2814  const auto output_buffer = output_storage->getUnderlyingBuffer();
2815  const auto input_buffer = input_storage->getUnderlyingBuffer();
2816  for (const auto sorted_idx : top_permutation) {
2817  const auto row_ptr = input_buffer + sorted_idx * output_query_mem_desc.getRowSize();
2818  memcpy(output_buffer + output_row_index * output_query_mem_desc.getRowSize(),
2819  row_ptr,
2820  output_query_mem_desc.getRowSize());
2821  ++output_row_index;
2822  }
2823  return output_row_index;
2824 }
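// --- Illustrative sketch (not part of Execute.cpp) ---
// The row-wise permutation copy above on plain byte buffers: each sorted index
// selects a fixed-size row in the input buffer, which is appended at the current
// output position. The caller is assumed to have sized the output buffer for
// output_row_index + top_permutation.size() rows.
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

size_t permute_rows(const uint8_t* input_buffer,
                    uint8_t* output_buffer,
                    size_t output_row_index,
                    size_t row_size,
                    const std::vector<uint32_t>& top_permutation) {
  for (const auto sorted_idx : top_permutation) {
    std::memcpy(output_buffer + output_row_index * row_size,
                input_buffer + sorted_idx * row_size,
                row_size);
    ++output_row_index;
  }
  return output_row_index;
}
// --- end sketch ---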
2825 } // namespace
2826 
2827 // Collect top results from each device, stitch them together and sort. Partial
2828 // results from each device are guaranteed to be disjoint because we only go down
2829 // this path when one of the columns involved is a shard key.
2831  SharedKernelContext& shared_context,
2832  const RelAlgExecutionUnit& ra_exe_unit,
2833  const ExecutorDeviceType device_type) const {
2834  auto& result_per_device = shared_context.getFragmentResults();
2835  const auto first_result_set = result_per_device.front().first;
2836  CHECK(first_result_set);
2837  auto top_query_mem_desc = first_result_set->getQueryMemDesc();
2838  CHECK(!top_query_mem_desc.hasInterleavedBinsOnGpu());
2839  const auto top_n =
2840  ra_exe_unit.sort_info.limit.value_or(0) + ra_exe_unit.sort_info.offset;
2841  top_query_mem_desc.setEntryCount(0);
2842  for (auto& result : result_per_device) {
2843  const auto result_set = result.first;
2844  CHECK(result_set);
2845  result_set->sort(ra_exe_unit.sort_info.order_entries, top_n, device_type, this);
2846  size_t new_entry_cnt = top_query_mem_desc.getEntryCount() + result_set->rowCount();
2847  top_query_mem_desc.setEntryCount(new_entry_cnt);
2848  }
2849  auto top_result_set = std::make_shared<ResultSet>(first_result_set->getTargetInfos(),
2850  first_result_set->getDeviceType(),
2851  top_query_mem_desc,
2852  first_result_set->getRowSetMemOwner(),
2853  blockSize(),
2854  gridSize());
2855  auto top_storage = top_result_set->allocateStorage();
2856  size_t top_output_row_idx{0};
2857  for (auto& result : result_per_device) {
2858  const auto result_set = result.first;
2859  CHECK(result_set);
2860  const auto& top_permutation = result_set->getPermutationBuffer();
2861  CHECK_LE(top_permutation.size(), top_n);
2862  if (top_query_mem_desc.didOutputColumnar()) {
2863  top_output_row_idx = permute_storage_columnar(result_set->getStorage(),
2864  result_set->getQueryMemDesc(),
2865  top_storage,
2866  top_output_row_idx,
2867  top_query_mem_desc,
2868  top_permutation);
2869  } else {
2870  top_output_row_idx = permute_storage_row_wise(result_set->getStorage(),
2871  top_storage,
2872  top_output_row_idx,
2873  top_query_mem_desc,
2874  top_permutation);
2875  }
2876  }
2877  CHECK_EQ(top_output_row_idx, top_query_mem_desc.getEntryCount());
2878  return top_result_set;
2879 }
2880 
2881 std::unordered_map<shared::TableKey, const Analyzer::BinOper*>
2883  std::unordered_map<shared::TableKey, const Analyzer::BinOper*> id_to_cond;
2884  const auto& join_info = plan_state_->join_info_;
2885  CHECK_EQ(join_info.equi_join_tautologies_.size(), join_info.join_hash_tables_.size());
2886  for (size_t i = 0; i < join_info.join_hash_tables_.size(); ++i) {
2887  const auto& inner_table_key = join_info.join_hash_tables_[i]->getInnerTableId();
2888  id_to_cond.insert(
2889  std::make_pair(inner_table_key, join_info.equi_join_tautologies_[i].get()));
2890  }
2891  return id_to_cond;
2892 }
2893 
2894 namespace {
2895 
2896 bool has_lazy_fetched_columns(const std::vector<ColumnLazyFetchInfo>& fetched_cols) {
2897  for (const auto& col : fetched_cols) {
2898  if (col.is_lazily_fetched) {
2899  return true;
2900  }
2901  }
2902  return false;
2903 }
2904 
2905 } // namespace
2906 
2907 std::vector<std::unique_ptr<ExecutionKernel>> Executor::createKernels(
2908  SharedKernelContext& shared_context,
2909  const RelAlgExecutionUnit& ra_exe_unit,
2910  ColumnFetcher& column_fetcher,
2911  const std::vector<InputTableInfo>& table_infos,
2912  const ExecutionOptions& eo,
2913  const bool is_agg,
2914  const bool allow_single_frag_table_opt,
2915  const size_t context_count,
2916  const QueryCompilationDescriptor& query_comp_desc,
2918  RenderInfo* render_info,
2919  std::unordered_set<int>& available_gpus,
2920  int& available_cpus) {
2921  std::vector<std::unique_ptr<ExecutionKernel>> execution_kernels;
2922 
2923  QueryFragmentDescriptor fragment_descriptor(
2924  ra_exe_unit,
2925  table_infos,
2926  query_comp_desc.getDeviceType() == ExecutorDeviceType::GPU
2928  : std::vector<Data_Namespace::MemoryInfo>{},
2931  CHECK(!ra_exe_unit.input_descs.empty());
2932 
2933  const auto device_type = query_comp_desc.getDeviceType();
2934  const bool uses_lazy_fetch =
2935  plan_state_->allow_lazy_fetch_ &&
2937  const bool use_multifrag_kernel = (device_type == ExecutorDeviceType::GPU) &&
2938  eo.allow_multifrag && (!uses_lazy_fetch || is_agg);
2939  const auto device_count = deviceCount(device_type);
2940  CHECK_GT(device_count, 0);
2941 
2942  fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
2943  shared_context.getFragOffsets(),
2944  device_count,
2945  device_type,
2946  use_multifrag_kernel,
2948  this);
2949  if (eo.with_watchdog && fragment_descriptor.shouldCheckWorkUnitWatchdog()) {
2950  checkWorkUnitWatchdog(ra_exe_unit, table_infos, device_type, device_count);
2951  }
2952 
2953  if (use_multifrag_kernel) {
2954  VLOG(1) << "Creating multifrag execution kernels";
2955  VLOG(1) << query_mem_desc.toString();
2956 
2957  // NB: We should never be on this path when the query is retried because of running
2958  // out of group by slots; also, for scan only queries on CPU we want the
2959  // high-granularity, fragment by fragment execution instead. For scan only queries on
2960  // GPU, we want the multifrag kernel path to save the overhead of allocating an output
2961  // buffer per fragment.
2962  auto multifrag_kernel_dispatch = [&ra_exe_unit,
2963  &execution_kernels,
2964  &column_fetcher,
2965  &eo,
2966  &query_comp_desc,
2967  &query_mem_desc,
2968  render_info](const int device_id,
2969  const FragmentsList& frag_list,
2970  const int64_t rowid_lookup_key) {
2971  execution_kernels.emplace_back(
2972  std::make_unique<ExecutionKernel>(ra_exe_unit,
2974  device_id,
2975  eo,
2976  column_fetcher,
2977  query_comp_desc,
2978  query_mem_desc,
2979  frag_list,
2981  render_info,
2982  rowid_lookup_key));
2983  };
2984  fragment_descriptor.assignFragsToMultiDispatch(multifrag_kernel_dispatch);
2985  } else {
2986  VLOG(1) << "Creating one execution kernel per fragment";
2987  VLOG(1) << query_mem_desc.toString();
2988 
2989  if (!ra_exe_unit.use_bump_allocator && allow_single_frag_table_opt &&
2991  table_infos.size() == 1 && table_infos.front().table_key.table_id > 0) {
2992  const auto max_frag_size =
2993  table_infos.front().info.getFragmentNumTuplesUpperBound();
2994  if (max_frag_size < query_mem_desc.getEntryCount()) {
2995  LOG(INFO) << "Lowering scan limit from " << query_mem_desc.getEntryCount()
2996  << " to match max fragment size " << max_frag_size
2997  << " for kernel per fragment execution path.";
2998  throw CompilationRetryNewScanLimit(max_frag_size);
2999  }
3000  }
3001 
3002  size_t frag_list_idx{0};
3003  auto fragment_per_kernel_dispatch = [&ra_exe_unit,
3004  &execution_kernels,
3005  &column_fetcher,
3006  &eo,
3007  &frag_list_idx,
3008  &device_type,
3009  &query_comp_desc,
3010  &query_mem_desc,
3011  render_info](const int device_id,
3012  const FragmentsList& frag_list,
3013  const int64_t rowid_lookup_key) {
3014  if (!frag_list.size()) {
3015  return;
3016  }
3017  CHECK_GE(device_id, 0);
3018 
3019  execution_kernels.emplace_back(
3020  std::make_unique<ExecutionKernel>(ra_exe_unit,
3021  device_type,
3022  device_id,
3023  eo,
3024  column_fetcher,
3025  query_comp_desc,
3026  query_mem_desc,
3027  frag_list,
3029  render_info,
3030  rowid_lookup_key));
3031  ++frag_list_idx;
3032  };
3033 
3034  fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
3035  ra_exe_unit);
3036  }
3037  return execution_kernels;
3038 }
3039 
3041  std::vector<std::unique_ptr<ExecutionKernel>>&& kernels,
3042  const ExecutorDeviceType device_type,
3043  const size_t requested_num_threads) {
3044 #ifdef HAVE_TBB
3045  const size_t num_threads =
3046  requested_num_threads == Executor::auto_num_threads
3047  ? std::min(kernels.size(), static_cast<size_t>(cpu_threads()))
3048  : requested_num_threads;
3049  tbb::task_arena local_arena(num_threads);
3050 #else
3051  const size_t num_threads = cpu_threads();
3052 #endif
3053  shared_context.setNumAllocatedThreads(num_threads);
3054  LOG(EXECUTOR) << "Launching query step with " << num_threads << " threads.";
3056  // A hack to obtain an execution unit to use later for results collection.
3057  const RelAlgExecutionUnit* ra_exe_unit =
3058  kernels.empty() ? nullptr : &kernels[0]->ra_exe_unit_;
3059 
3060 #ifdef HAVE_TBB
3061  if (g_enable_cpu_sub_tasks && device_type == ExecutorDeviceType::CPU) {
3062  shared_context.setThreadPool(&tg);
3063  }
3064  ScopeGuard pool_guard([&shared_context]() { shared_context.setThreadPool(nullptr); });
3065 #endif // HAVE_TBB
3066 
3067  VLOG(1) << "Launching " << kernels.size() << " kernels for query on "
3068  << (device_type == ExecutorDeviceType::CPU ? "CPU"s : "GPU"s)
3069  << " using pool of " << num_threads << " threads.";
3070  size_t kernel_idx = 1;
3071 
3072  for (auto& kernel : kernels) {
3073  CHECK(kernel.get());
3074 #ifdef HAVE_TBB
3075  local_arena.execute([&] {
3076 #endif
3077  tg.run([this,
3078  &kernel,
3079  &shared_context,
3080  parent_thread_local_ids = logger::thread_local_ids(),
3081  num_threads,
3082  crt_kernel_idx = kernel_idx++] {
3083  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
3084  DEBUG_TIMER_NEW_THREAD(parent_thread_local_ids.thread_id_);
3085  // Keep thread_idx monotonic with kernel launch order, so that optimizations
3086  // such as launching kernels whose data is already in the pool first become possible
3087 #ifdef HAVE_TBB
3088  const size_t old_thread_idx = crt_kernel_idx % num_threads;
3089  const size_t thread_idx = tbb::this_task_arena::current_thread_index();
3090  LOG(EXECUTOR) << "Thread idx: " << thread_idx
3091  << " Old thread idx: " << old_thread_idx;
3092 #else
3093  const size_t thread_idx = crt_kernel_idx % num_threads;
3094 #endif
3095  kernel->run(this, thread_idx, shared_context);
3096  });
3097 #ifdef HAVE_TBB
3098  }); // local_arena.execute[&]
3099 #endif
3100  }
3101 #ifdef HAVE_TBB
3102  local_arena.execute([&] { tg.wait(); });
3103 #else
3104  tg.wait();
3105 #endif
3106 
3107  for (auto& exec_ctx : shared_context.getTlsExecutionContext()) {
3108  // The first arg is used for GPU only; that's not our case.
3109  // TODO: add QueryExecutionContext::getRowSet() interface
3110  // for our case.
3111  if (exec_ctx) {
3112  ResultSetPtr results;
3113  if (ra_exe_unit->estimator) {
3114  results = std::shared_ptr<ResultSet>(exec_ctx->estimator_result_set_.release());
3115  } else {
3116  results = exec_ctx->getRowSet(*ra_exe_unit, exec_ctx->query_mem_desc_);
3117  }
3118  shared_context.addDeviceResults(std::move(results), {});
3119  }
3120  }
3121 }
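// --- Illustrative sketch (not part of Execute.cpp, assumes TBB is available) ---
// The launch pattern used above: a task_arena bounded to the thread budget, a
// task_group for the query step, and a thread index derived from kernel launch
// order so earlier kernels map to lower thread slots. `Kernel` is a hypothetical
// stand-in for ExecutionKernel::run; num_threads is assumed to be >= 1.
#include <cstddef>
#include <functional>
#include <vector>
#include <tbb/task_arena.h>
#include <tbb/task_group.h>

using Kernel = std::function<void(size_t /*thread_idx*/)>;

void launch_kernels(std::vector<Kernel>& kernels, size_t num_threads) {
  tbb::task_arena arena(static_cast<int>(num_threads));
  tbb::task_group tg;
  size_t kernel_idx = 1;
  for (auto& kernel : kernels) {
    Kernel* kernel_ptr = &kernel;
    const size_t crt_kernel_idx = kernel_idx++;
    arena.execute([&tg, kernel_ptr, crt_kernel_idx, num_threads] {
      tg.run([kernel_ptr, crt_kernel_idx, num_threads] {
        const size_t thread_idx = crt_kernel_idx % num_threads;
        (*kernel_ptr)(thread_idx);
      });
    });
  }
  arena.execute([&tg] { tg.wait(); });  // wait for all kernels inside the arena
}
// --- end sketch ---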
3122 
3124  SharedKernelContext& shared_context,
3125  std::vector<std::unique_ptr<ExecutionKernel>>&& kernels,
3126  const ExecutorDeviceType device_type) {
3127  auto clock_begin = timer_start();
3128  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
3129  kernel_queue_time_ms_ += timer_stop(clock_begin);
3130 
3132  shared_context, std::move(kernels), device_type, Executor::auto_num_threads);
3133 }
3134 
3136  SharedKernelContext& shared_context,
3137  std::vector<std::unique_ptr<ExecutionKernel>>&& kernels,
3138  const ExecutorDeviceType device_type,
3139  const std::vector<InputDescriptor>& input_descs,
3141  // CPU queries in general, plus some GPU queries (i.e. certain types of top-k sorts),
3142  // can generate more kernels than there are cores/GPU devices, so for now handle this
3143  // by capping the number of requested slots at the number of actual devices
3144  const size_t num_kernels = kernels.size();
3145  constexpr bool cap_slots = false;
3146  const size_t num_compute_slots =
3147  cap_slots
3148  ? std::min(num_kernels,
3150  ->get_resource_info(
3151  device_type == ExecutorDeviceType::GPU
3154  .second)
3155  : num_kernels;
3156  const size_t cpu_result_mem_bytes_per_kernel =
3157  query_mem_desc.getBufferSizeBytes(device_type);
3158 
3159  std::vector<std::pair<int32_t, FragmentsList>> kernel_fragments_list;
3160  kernel_fragments_list.reserve(num_kernels);
3161  for (auto& kernel : kernels) {
3162  const auto device_id = kernel->get_chosen_device_id();
3163  const auto frag_list = kernel->get_fragment_list();
3164  if (!frag_list.empty()) {
3165  kernel_fragments_list.emplace_back(std::make_pair(device_id, frag_list));
3166  }
3167  }
3168  const auto chunk_request_info = getChunkRequestInfo(
3169  device_type, input_descs, shared_context.getQueryInfos(), kernel_fragments_list);
3170 
3171  auto gen_resource_request_info = [device_type,
3172  num_compute_slots,
3173  cpu_result_mem_bytes_per_kernel,
3174  &chunk_request_info,
3175  &query_mem_desc]() {
3176  if (device_type == ExecutorDeviceType::GPU) {
3178  device_type,
3179  static_cast<size_t>(0), // priority_level
3180  static_cast<size_t>(0), // cpu_slots
3181  static_cast<size_t>(0), // min_cpu_slots,
3182  num_compute_slots, // gpu_slots
3183  num_compute_slots, // min_gpu_slots
3184  cpu_result_mem_bytes_per_kernel * num_compute_slots, // cpu_result_mem,
3185  cpu_result_mem_bytes_per_kernel * num_compute_slots, // min_cpu_result_mem,
3186  chunk_request_info, // chunks needed
3187  false); // output_buffers_reusable_intra_thread
3188  } else {
3189  const size_t min_cpu_slots{1};
3190  const size_t min_cpu_result_mem =
3191  query_mem_desc.threadsCanReuseGroupByBuffers()
3192  ? cpu_result_mem_bytes_per_kernel * min_cpu_slots
3193  : cpu_result_mem_bytes_per_kernel * num_compute_slots;
3195  device_type,
3196  static_cast<size_t>(0), // priority_level
3197  num_compute_slots, // cpu_slots
3198  min_cpu_slots, // min_cpu_slots
3199  size_t(0), // gpu_slots
3200  size_t(0), // min_gpu_slots
3201  cpu_result_mem_bytes_per_kernel * num_compute_slots, // cpu_result_mem
3202  min_cpu_result_mem, // min_cpu_result_mem
3203  chunk_request_info, // chunks needed
3204  query_mem_desc
3205  .threadsCanReuseGroupByBuffers()); // output_buffers_reusable_intra_thread
3206  }
3207  };
3208 
3209  const auto resource_request_info = gen_resource_request_info();
3210 
3211  auto clock_begin = timer_start();
3212  const bool is_empty_request =
3213  resource_request_info.cpu_slots == 0UL && resource_request_info.gpu_slots == 0UL;
3214  auto resource_handle =
3215  is_empty_request ? nullptr
3216  : executor_resource_mgr_->request_resources(resource_request_info);
3217  const auto num_cpu_threads =
3218  is_empty_request ? 0UL : resource_handle->get_resource_grant().cpu_slots;
3219  if (device_type == ExecutorDeviceType::GPU) {
3220  const auto num_gpu_slots =
3221  is_empty_request ? 0UL : resource_handle->get_resource_grant().gpu_slots;
3222  VLOG(1) << "In Executor::LaunchKernels executor " << getExecutorId() << " requested "
3223  << "between " << resource_request_info.min_gpu_slots << " and "
3224  << resource_request_info.gpu_slots << " GPU slots, and was granted "
3225  << num_gpu_slots << " GPU slots.";
3226  } else {
3227  VLOG(1) << "In Executor::LaunchKernels executor " << getExecutorId() << " requested "
3228  << "between " << resource_request_info.min_cpu_slots << " and "
3229  << resource_request_info.cpu_slots << " CPU slots, and was granted "
3230  << num_cpu_threads << " CPU slots.";
3231  }
3232  kernel_queue_time_ms_ += timer_stop(clock_begin);
3233  launchKernelsImpl(shared_context, std::move(kernels), device_type, num_cpu_threads);
3234 }
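// [Illustrative sketch, not part of Execute.cpp] A simplified, hypothetical model of the
// CPU-side request built by gen_resource_request_info() above: ask for up to one slot per
// kernel but accept as little as one, and size the result-memory ask the same way. The
// ResourceRequest type and make_cpu_request() below are stand-ins for illustration only,
// not the real ExecutorResourceMgr_Namespace API.
#include <cstddef>

namespace resource_request_example {

struct ResourceRequest {
  std::size_t cpu_slots{0};
  std::size_t min_cpu_slots{0};
  std::size_t cpu_result_mem{0};
  std::size_t min_cpu_result_mem{0};
};

inline ResourceRequest make_cpu_request(const std::size_t num_kernels,
                                        const std::size_t result_mem_per_kernel,
                                        const bool buffers_reusable_across_threads) {
  ResourceRequest req;
  req.cpu_slots = num_kernels;  // ideally one worker thread per kernel
  req.min_cpu_slots = 1;        // a single thread is still enough to make progress
  req.cpu_result_mem = result_mem_per_kernel * num_kernels;
  // When threads can reuse group-by buffers, only the minimum grant needs its own buffer.
  req.min_cpu_result_mem =
      result_mem_per_kernel * (buffers_reusable_across_threads ? 1 : num_kernels);
  return req;
}

}  // namespace resource_request_example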
3235 
3237  const RelAlgExecutionUnit& ra_exe_unit,
3238  const ExecutorDeviceType device_type,
3239  const size_t table_idx,
3240  const size_t outer_frag_idx,
3241  std::map<shared::TableKey, const TableFragments*>& selected_tables_fragments,
3242  const std::unordered_map<shared::TableKey, const Analyzer::BinOper*>&
3243  inner_table_id_to_join_condition) {
3244  const auto& table_key = ra_exe_unit.input_descs[table_idx].getTableKey();
3245  auto table_frags_it = selected_tables_fragments.find(table_key);
3246  CHECK(table_frags_it != selected_tables_fragments.end());
3247  const auto& outer_input_desc = ra_exe_unit.input_descs[0];
3248  const auto outer_table_fragments_it =
3249  selected_tables_fragments.find(outer_input_desc.getTableKey());
3250  const auto outer_table_fragments = outer_table_fragments_it->second;
3251  CHECK(outer_table_fragments_it != selected_tables_fragments.end());
3252  CHECK_LT(outer_frag_idx, outer_table_fragments->size());
3253  if (!table_idx) {
3254  return {outer_frag_idx};
3255  }
3256  const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx];
3257  auto& inner_frags = table_frags_it->second;
3258  CHECK_LT(size_t(1), ra_exe_unit.input_descs.size());
3259  std::vector<size_t> all_frag_ids;
3260  for (size_t inner_frag_idx = 0; inner_frag_idx < inner_frags->size();
3261  ++inner_frag_idx) {
3262  const auto& inner_frag_info = (*inner_frags)[inner_frag_idx];
3263  if (skipFragmentPair(outer_fragment_info,
3264  inner_frag_info,
3265  table_idx,
3266  inner_table_id_to_join_condition,
3267  ra_exe_unit,
3268  device_type)) {
3269  continue;
3270  }
3271  all_frag_ids.push_back(inner_frag_idx);
3272  }
3273  return all_frag_ids;
3274 }
3275 
3276 // Returns true iff the join between two fragments cannot yield any results, per
3277 // shard information. The pair can be skipped to avoid full broadcast.
3279  const Fragmenter_Namespace::FragmentInfo& outer_fragment_info,
3280  const Fragmenter_Namespace::FragmentInfo& inner_fragment_info,
3281  const int table_idx,
3282  const std::unordered_map<shared::TableKey, const Analyzer::BinOper*>&
3283  inner_table_id_to_join_condition,
3284  const RelAlgExecutionUnit& ra_exe_unit,
3285  const ExecutorDeviceType device_type) {
3286  if (device_type != ExecutorDeviceType::GPU) {
3287  return false;
3288  }
3289  CHECK(table_idx >= 0 &&
3290  static_cast<size_t>(table_idx) < ra_exe_unit.input_descs.size());
3291  const auto& inner_table_key = ra_exe_unit.input_descs[table_idx].getTableKey();
3292  // Both tables need to be sharded the same way.
3293  if (outer_fragment_info.shard == -1 || inner_fragment_info.shard == -1 ||
3294  outer_fragment_info.shard == inner_fragment_info.shard) {
3295  return false;
3296  }
3297  const Analyzer::BinOper* join_condition{nullptr};
3298  if (ra_exe_unit.join_quals.empty()) {
3299  CHECK(!inner_table_id_to_join_condition.empty());
3300  auto condition_it = inner_table_id_to_join_condition.find(inner_table_key);
3301  CHECK(condition_it != inner_table_id_to_join_condition.end());
3302  join_condition = condition_it->second;
3303  CHECK(join_condition);
3304  } else {
3305  CHECK_EQ(plan_state_->join_info_.equi_join_tautologies_.size(),
3306  plan_state_->join_info_.join_hash_tables_.size());
3307  for (size_t i = 0; i < plan_state_->join_info_.join_hash_tables_.size(); ++i) {
3308  if (plan_state_->join_info_.join_hash_tables_[i]->getInnerTableRteIdx() ==
3309  table_idx) {
3310  CHECK(!join_condition);
3311  join_condition = plan_state_->join_info_.equi_join_tautologies_[i].get();
3312  }
3313  }
3314  }
3315  if (!join_condition) {
3316  return false;
3317  }
3318  // TODO(adb): support fragment skipping based on the bounding box intersect operator
3319  if (join_condition->is_bbox_intersect_oper()) {
3320  return false;
3321  }
3322  size_t shard_count{0};
3323  if (dynamic_cast<const Analyzer::ExpressionTuple*>(
3324  join_condition->get_left_operand())) {
3325  auto inner_outer_pairs =
3326  HashJoin::normalizeColumnPairs(join_condition, getTemporaryTables()).first;
3328  join_condition, this, inner_outer_pairs);
3329  } else {
3330  shard_count = get_shard_count(join_condition, this);
3331  }
3332  if (shard_count && !ra_exe_unit.join_quals.empty()) {
3333  plan_state_->join_info_.sharded_range_table_indices_.emplace(table_idx);
3334  }
3335  return shard_count;
3336 }
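// [Illustrative example, not in the original] Under an equi-join on the shard key with,
// say, 4 shards, an outer fragment from shard 2 can only match inner fragments from
// shard 2: a (shard 2, shard 3) pair is reported as skippable above, while a
// (shard 2, shard 2) pair, or any pair involving an unsharded fragment (shard == -1),
// is kept.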
3337 
3338 namespace {
3339 
3341  const auto& table_key = col_desc->getScanDesc().getTableKey();
3342  const auto col_id = col_desc->getColId();
3343  return get_column_descriptor_maybe({table_key, col_id});
3344 }
3345 
3346 } // namespace
3347 
3348 std::map<shared::TableKey, std::vector<uint64_t>> get_table_id_to_frag_offsets(
3349  const std::vector<InputDescriptor>& input_descs,
3350  const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments) {
3351  std::map<shared::TableKey, std::vector<uint64_t>> tab_id_to_frag_offsets;
3352  for (auto& desc : input_descs) {
3353  const auto fragments_it = all_tables_fragments.find(desc.getTableKey());
3354  CHECK(fragments_it != all_tables_fragments.end());
3355  const auto& fragments = *fragments_it->second;
3356  std::vector<uint64_t> frag_offsets(fragments.size(), 0);
3357  for (size_t i = 0, off = 0; i < fragments.size(); ++i) {
3358  frag_offsets[i] = off;
3359  off += fragments[i].getNumTuples();
3360  }
3361  tab_id_to_frag_offsets.insert(std::make_pair(desc.getTableKey(), frag_offsets));
3362  }
3363  return tab_id_to_frag_offsets;
3364 }
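// [Illustrative example, not in the original] For a table whose fragments hold
// {100, 250, 50} tuples, the loop above produces frag_offsets = {0, 100, 350}, i.e. the
// exclusive prefix sum of the fragment sizes, giving each fragment's starting row offset.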
3365 
3366 std::pair<std::vector<std::vector<int64_t>>, std::vector<std::vector<uint64_t>>>
3368  const RelAlgExecutionUnit& ra_exe_unit,
3369  const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
3370  const std::vector<InputDescriptor>& input_descs,
3371  const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments) {
3372  std::vector<std::vector<int64_t>> all_num_rows;
3373  std::vector<std::vector<uint64_t>> all_frag_offsets;
3374  const auto tab_id_to_frag_offsets =
3375  get_table_id_to_frag_offsets(input_descs, all_tables_fragments);
3376  std::unordered_map<size_t, size_t> outer_id_to_num_row_idx;
3377  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
3378  std::vector<int64_t> num_rows;
3379  std::vector<uint64_t> frag_offsets;
3380  if (!ra_exe_unit.union_all) {
3381  CHECK_EQ(selected_frag_ids.size(), input_descs.size());
3382  }
3383  for (size_t tab_idx = 0; tab_idx < input_descs.size(); ++tab_idx) {
3384  const auto frag_id = ra_exe_unit.union_all ? 0 : selected_frag_ids[tab_idx];
3385  const auto fragments_it =
3386  all_tables_fragments.find(input_descs[tab_idx].getTableKey());
3387  CHECK(fragments_it != all_tables_fragments.end());
3388  const auto& fragments = *fragments_it->second;
3389  if (ra_exe_unit.join_quals.empty() || tab_idx == 0 ||
3390  plan_state_->join_info_.sharded_range_table_indices_.count(tab_idx)) {
3391  const auto& fragment = fragments[frag_id];
3392  num_rows.push_back(fragment.getNumTuples());
3393  } else {
3394  size_t total_row_count{0};
3395  for (const auto& fragment : fragments) {
3396  total_row_count += fragment.getNumTuples();
3397  }
3398  num_rows.push_back(total_row_count);
3399  }
3400  const auto frag_offsets_it =
3401  tab_id_to_frag_offsets.find(input_descs[tab_idx].getTableKey());
3402  CHECK(frag_offsets_it != tab_id_to_frag_offsets.end());
3403  const auto& offsets = frag_offsets_it->second;
3404  CHECK_LT(frag_id, offsets.size());
3405  frag_offsets.push_back(offsets[frag_id]);
3406  }
3407  all_num_rows.push_back(num_rows);
3408  // Fragment offsets of the outer table should ONLY be used by rowid for now.
3409  all_frag_offsets.push_back(frag_offsets);
3410  }
3411  return {all_num_rows, all_frag_offsets};
3412 }
3413 
3414 // Columns of a hash-joined inner fact table whose fetch is not deferred must be
3415 // fetched from all of the table's fragments.
3417  const RelAlgExecutionUnit& ra_exe_unit,
3418  const FragmentsList& selected_fragments) const {
3419  const auto& input_descs = ra_exe_unit.input_descs;
3420  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
3421  if (nest_level < 1 ||
3422  inner_col_desc.getScanDesc().getSourceType() != InputSourceType::TABLE ||
3423  ra_exe_unit.join_quals.empty() || input_descs.size() < 2 ||
3424  (ra_exe_unit.join_quals.empty() &&
3425  plan_state_->isLazyFetchColumn(inner_col_desc))) {
3426  return false;
3427  }
3428  const auto& table_key = inner_col_desc.getScanDesc().getTableKey();
3429  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
3430  CHECK_EQ(table_key, selected_fragments[nest_level].table_key);
3431  const auto& fragments = selected_fragments[nest_level].fragment_ids;
3432  return fragments.size() > 1;
3433 }
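// [Illustrative example, not in the original] The check above returns true when the
// column belongs to an inner table of a join (nest level >= 1) and more than one of that
// table's fragments was selected for the kernel, e.g. selected fragment ids {3, 7};
// with a single selected fragment such as {3} it returns false.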
3434 
3436  const ColumnDescriptor* cd,
3437  const InputColDescriptor& inner_col_desc,
3438  const RelAlgExecutionUnit& ra_exe_unit,
3439  const FragmentsList& selected_fragments,
3440  const Data_Namespace::MemoryLevel memory_level) const {
3441  const int nest_level = inner_col_desc.getScanDesc().getNestLevel();
3442  const auto& table_key = inner_col_desc.getScanDesc().getTableKey();
3443  CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
3444  CHECK_EQ(table_key, selected_fragments[nest_level].table_key);
3445  const auto& fragments = selected_fragments[nest_level].fragment_ids;
3446  auto need_linearize =
3447  cd->columnType.is_array() ||
3449  return table_key.table_id > 0 && need_linearize && fragments.size() > 1;
3450 }
3451 
3452 std::ostream& operator<<(std::ostream& os, FetchResult const& fetch_result) {
3453  return os << "col_buffers" << shared::printContainer(fetch_result.col_buffers)
3454  << " num_rows" << shared::printContainer(fetch_result.num_rows)
3455  << " frag_offsets" << shared::printContainer(fetch_result.frag_offsets);
3456 }
3457 
3459  const ColumnFetcher& column_fetcher,
3460  const RelAlgExecutionUnit& ra_exe_unit,
3461  const int device_id,
3462  const Data_Namespace::MemoryLevel memory_level,
3463  const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
3464  const FragmentsList& selected_fragments,
3465  std::list<ChunkIter>& chunk_iterators,
3466  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
3467  DeviceAllocator* device_allocator,
3468  const size_t thread_idx,
3469  const bool allow_runtime_interrupt) {
3470  auto timer = DEBUG_TIMER(__func__);
3472  const auto& col_global_ids = ra_exe_unit.input_col_descs;
3473  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
3474  std::vector<size_t> local_col_to_frag_pos;
3475  buildSelectedFragsMapping(selected_fragments_crossjoin,
3476  local_col_to_frag_pos,
3477  col_global_ids,
3478  selected_fragments,
3479  ra_exe_unit);
3480 
3482  selected_fragments_crossjoin);
3483  std::vector<std::vector<const int8_t*>> all_frag_col_buffers;
3484  std::vector<std::vector<int64_t>> all_num_rows;
3485  std::vector<std::vector<uint64_t>> all_frag_offsets;
3486  for (const auto& selected_frag_ids : frag_ids_crossjoin) {
3487  std::vector<const int8_t*> frag_col_buffers(
3488  plan_state_->global_to_local_col_ids_.size());
3489  for (const auto& col_id : col_global_ids) {
3490  if (allow_runtime_interrupt) {
3491  bool isInterrupted = false;
3492  {
3495  const auto query_session = getCurrentQuerySession(session_read_lock);
3496  isInterrupted =
3497  checkIsQuerySessionInterrupted(query_session, session_read_lock);
3498  }
3499  if (isInterrupted) {
3500  throw QueryExecutionError(ErrorCode::INTERRUPTED);
3501  }
3502  }
3503  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3504  throw QueryExecutionError(ErrorCode::INTERRUPTED);
3505  }
3506  CHECK(col_id);
3507  const auto cd = try_get_column_descriptor(col_id.get());
3508  if (cd && cd->isVirtualCol) {
3509  CHECK_EQ("rowid", cd->columnName);
3510  continue;
3511  }
3512  const auto& table_key = col_id->getScanDesc().getTableKey();
3513  const auto fragments_it = all_tables_fragments.find(table_key);
3514  CHECK(fragments_it != all_tables_fragments.end());
3515  const auto fragments = fragments_it->second;
3516  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3517  CHECK(it != plan_state_->global_to_local_col_ids_.end());
3518  CHECK_LT(static_cast<size_t>(it->second),
3519  plan_state_->global_to_local_col_ids_.size());
3520  const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
3521  if (!fragments->size()) {
3522  return {};
3523  }
3524  CHECK_LT(frag_id, fragments->size());
3525  auto memory_level_for_column = memory_level;
3526  const shared::ColumnKey tbl_col_key{col_id->getScanDesc().getTableKey(),
3527  col_id->getColId()};
3528  if (!plan_state_->isColumnToFetch(tbl_col_key)) {
3529  memory_level_for_column = Data_Namespace::CPU_LEVEL;
3530  }
3531  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
3532  frag_col_buffers[it->second] =
3533  column_fetcher.getResultSetColumn(col_id.get(),
3534  memory_level_for_column,
3535  device_id,
3536  device_allocator,
3537  thread_idx);
3538  } else {
3539  if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
3540  // determine if we need special treatment to linearize a multi-frag table,
3541  // i.e., a column that is classified as a varlen type such as an array;
3542  // for now, we only support fixed-length arrays that contain
3543  // geo point coordinates, but more types can be supported this way
3545  cd, *col_id, ra_exe_unit, selected_fragments, memory_level)) {
3546  bool for_lazy_fetch = false;
3547  if (plan_state_->isColumnToNotFetch(tbl_col_key)) {
3548  for_lazy_fetch = true;
3549  VLOG(2) << "Try to linearize lazy fetch column (col_id: " << cd->columnId
3550  << ", col_name: " << cd->columnName << ")";
3551  }
3552  frag_col_buffers[it->second] = column_fetcher.linearizeColumnFragments(
3553  col_id->getScanDesc().getTableKey(),
3554  col_id->getColId(),
3555  all_tables_fragments,
3556  chunks,
3557  chunk_iterators,
3558  for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
3559  for_lazy_fetch ? 0 : device_id,
3560  device_allocator,
3561  thread_idx);
3562  } else {
3563  frag_col_buffers[it->second] = column_fetcher.getAllTableColumnFragments(
3564  col_id->getScanDesc().getTableKey(),
3565  col_id->getColId(),
3566  all_tables_fragments,
3567  memory_level_for_column,
3568  device_id,
3569  device_allocator,
3570  thread_idx);
3571  }
3572  } else {
3573  frag_col_buffers[it->second] = column_fetcher.getOneTableColumnFragment(
3574  col_id->getScanDesc().getTableKey(),
3575  frag_id,
3576  col_id->getColId(),
3577  all_tables_fragments,
3578  chunks,
3579  chunk_iterators,
3580  memory_level_for_column,
3581  device_id,
3582  device_allocator);
3583  }
3584  }
3585  }
3586  all_frag_col_buffers.push_back(frag_col_buffers);
3587  }
3588  std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags(
3589  ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments);
3590  return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
3591 }
3592 
3593 namespace {
3595  std::vector<InputDescriptor> const& input_descs) {
3596  auto const has_table_key = [&table_key](InputDescriptor const& input_desc) {
3597  return table_key == input_desc.getTableKey();
3598  };
3599  return std::find_if(input_descs.begin(), input_descs.end(), has_table_key) -
3600  input_descs.begin();
3601 }
3602 
3604  const shared::TableKey& table_key,
3605  std::list<std::shared_ptr<InputColDescriptor const>> const& input_col_descs) {
3606  auto const has_table_key = [&table_key](auto const& input_desc) {
3607  return table_key == input_desc->getScanDesc().getTableKey();
3608  };
3609  return std::distance(
3610  input_col_descs.begin(),
3611  std::find_if(input_col_descs.begin(), input_col_descs.end(), has_table_key));
3612 }
3613 
3614 std::list<std::shared_ptr<const InputColDescriptor>> get_selected_input_col_descs(
3615  const shared::TableKey& table_key,
3616  std::list<std::shared_ptr<InputColDescriptor const>> const& input_col_descs) {
3617  std::list<std::shared_ptr<const InputColDescriptor>> selected;
3618  for (auto const& input_col_desc : input_col_descs) {
3619  if (table_key == input_col_desc->getScanDesc().getTableKey()) {
3620  selected.push_back(input_col_desc);
3621  }
3622  }
3623  return selected;
3624 }
3625 
3626 // Set N consecutive elements of frag_col_buffers to ptr in the range of local_col_id.
3627 void set_mod_range(std::vector<int8_t const*>& frag_col_buffers,
3628  int8_t const* const ptr,
3629  size_t const local_col_id,
3630  size_t const N) {
3631  size_t const begin = local_col_id - local_col_id % N; // N divides begin
3632  size_t const end = begin + N;
3633  CHECK_LE(end, frag_col_buffers.size()) << (void*)ptr << ' ' << local_col_id << ' ' << N;
3634  for (size_t i = begin; i < end; ++i) {
3635  frag_col_buffers[i] = ptr;
3636  }
3637 }
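// [Illustrative example, not in the original] With N = 2 and local_col_id = 5,
// begin = 5 - 5 % 2 = 4 and end = 6, so frag_col_buffers[4] and frag_col_buffers[5]
// are both set to ptr.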
3638 } // namespace
3639 
3640 // fetchChunks() assumes that multiple inputs imply a JOIN.
3641 // fetchUnionChunks() assumes that multiple inputs imply a UNION ALL.
3643  const ColumnFetcher& column_fetcher,
3644  const RelAlgExecutionUnit& ra_exe_unit,
3645  const int device_id,
3646  const Data_Namespace::MemoryLevel memory_level,
3647  const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
3648  const FragmentsList& selected_fragments,
3649  std::list<ChunkIter>& chunk_iterators,
3650  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
3651  DeviceAllocator* device_allocator,
3652  const size_t thread_idx,
3653  const bool allow_runtime_interrupt) {
3654  auto timer = DEBUG_TIMER(__func__);
3656 
3657  CHECK_EQ(1u, selected_fragments.size());
3658  CHECK_LE(2u, ra_exe_unit.input_descs.size());
3659  CHECK_LE(2u, ra_exe_unit.input_col_descs.size());
3660  auto const& input_descs = ra_exe_unit.input_descs;
3661  const auto& selected_table_key = selected_fragments.front().table_key;
3662  size_t const input_descs_index =
3663  get_selected_input_descs_index(selected_table_key, input_descs);
3664  CHECK_LT(input_descs_index, input_descs.size());
3665  size_t const input_col_descs_index =
3666  get_selected_input_col_descs_index(selected_table_key, ra_exe_unit.input_col_descs);
3667  CHECK_LT(input_col_descs_index, ra_exe_unit.input_col_descs.size());
3668  VLOG(2) << "selected_table_key=" << selected_table_key
3669  << " input_descs_index=" << input_descs_index
3670  << " input_col_descs_index=" << input_col_descs_index
3671  << " input_descs=" << shared::printContainer(input_descs)
3672  << " ra_exe_unit.input_col_descs="
3673  << shared::printContainer(ra_exe_unit.input_col_descs);
3674 
3675  std::list<std::shared_ptr<const InputColDescriptor>> selected_input_col_descs =
3676  get_selected_input_col_descs(selected_table_key, ra_exe_unit.input_col_descs);
3677  std::vector<std::vector<size_t>> selected_fragments_crossjoin;
3678 
3680  selected_fragments_crossjoin, selected_fragments, ra_exe_unit);
3681 
3683  selected_fragments_crossjoin);
3684 
3685  if (allow_runtime_interrupt) {
3686  bool isInterrupted = false;
3687  {
3690  const auto query_session = getCurrentQuerySession(session_read_lock);
3691  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
3692  }
3693  if (isInterrupted) {
3694  throw QueryExecutionError(ErrorCode::INTERRUPTED);
3695  }
3696  }
3697  std::vector<const int8_t*> frag_col_buffers(
3698  plan_state_->global_to_local_col_ids_.size());
3699  for (const auto& col_id : selected_input_col_descs) {
3700  CHECK(col_id);
3701  const auto cd = try_get_column_descriptor(col_id.get());
3702  if (cd && cd->isVirtualCol) {
3703  CHECK_EQ("rowid", cd->columnName);
3704  continue;
3705  }
3706  const auto fragments_it = all_tables_fragments.find(selected_table_key);
3707  CHECK(fragments_it != all_tables_fragments.end());
3708  const auto fragments = fragments_it->second;
3709  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3710  CHECK(it != plan_state_->global_to_local_col_ids_.end());
3711  size_t const local_col_id = it->second;
3712  CHECK_LT(local_col_id, plan_state_->global_to_local_col_ids_.size());
3713  constexpr size_t frag_id = 0;
3714  if (fragments->empty()) {
3715  return {};
3716  }
3717  MemoryLevel const memory_level_for_column =
3718  plan_state_->isColumnToFetch({selected_table_key, col_id->getColId()})
3719  ? memory_level
3721  int8_t const* ptr;
3722  if (col_id->getScanDesc().getSourceType() == InputSourceType::RESULT) {
3723  ptr = column_fetcher.getResultSetColumn(
3724  col_id.get(), memory_level_for_column, device_id, device_allocator, thread_idx);
3725  } else if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
3726  ptr = column_fetcher.getAllTableColumnFragments(selected_table_key,
3727  col_id->getColId(),
3728  all_tables_fragments,
3729  memory_level_for_column,
3730  device_id,
3731  device_allocator,
3732  thread_idx);
3733  } else {
3734  ptr = column_fetcher.getOneTableColumnFragment(selected_table_key,
3735  frag_id,
3736  col_id->getColId(),
3737  all_tables_fragments,
3738  chunks,
3739  chunk_iterators,
3740  memory_level_for_column,
3741  device_id,
3742  device_allocator);
3743  }
3744  // Set frag_col_buffers[i] = ptr for every i in the input_descs.size()-sized block containing local_col_id.
3745  set_mod_range(frag_col_buffers, ptr, local_col_id, input_descs.size());
3746  }
3747  auto const [num_rows, frag_offsets] = getRowCountAndOffsetForAllFrags(
3748  ra_exe_unit, frag_ids_crossjoin, input_descs, all_tables_fragments);
3749 
3750  VLOG(2) << "frag_col_buffers=" << shared::printContainer(frag_col_buffers)
3751  << " num_rows=" << shared::printContainer(num_rows)
3752  << " frag_offsets=" << shared::printContainer(frag_offsets)
3753  << " input_descs_index=" << input_descs_index
3754  << " input_col_descs_index=" << input_col_descs_index;
3755  return {{std::move(frag_col_buffers)},
3756  {{num_rows[0][input_descs_index]}},
3757  {{frag_offsets[0][input_descs_index]}}};
3758 }
3759 
3760 std::vector<size_t> Executor::getFragmentCount(const FragmentsList& selected_fragments,
3761  const size_t scan_idx,
3762  const RelAlgExecutionUnit& ra_exe_unit) {
3763  if ((ra_exe_unit.input_descs.size() > size_t(2) || !ra_exe_unit.join_quals.empty()) &&
3764  scan_idx > 0 &&
3765  !plan_state_->join_info_.sharded_range_table_indices_.count(scan_idx) &&
3766  !selected_fragments[scan_idx].fragment_ids.empty()) {
3767  // Fetch all fragments
3768  return {size_t(0)};
3769  }
3770 
3771  return selected_fragments[scan_idx].fragment_ids;
3772 }
3773 
3775  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
3776  std::vector<size_t>& local_col_to_frag_pos,
3777  const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
3778  const FragmentsList& selected_fragments,
3779  const RelAlgExecutionUnit& ra_exe_unit) {
3780  local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
3781  size_t frag_pos{0};
3782  const auto& input_descs = ra_exe_unit.input_descs;
3783  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
3784  const auto& table_key = input_descs[scan_idx].getTableKey();
3785  CHECK_EQ(selected_fragments[scan_idx].table_key, table_key);
3786  selected_fragments_crossjoin.push_back(
3787  getFragmentCount(selected_fragments, scan_idx, ra_exe_unit));
3788  for (const auto& col_id : col_global_ids) {
3789  CHECK(col_id);
3790  const auto& input_desc = col_id->getScanDesc();
3791  if (input_desc.getTableKey() != table_key ||
3792  input_desc.getNestLevel() != static_cast<int>(scan_idx)) {
3793  continue;
3794  }
3795  auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3796  CHECK(it != plan_state_->global_to_local_col_ids_.end());
3797  CHECK_LT(static_cast<size_t>(it->second),
3798  plan_state_->global_to_local_col_ids_.size());
3799  local_col_to_frag_pos[it->second] = frag_pos;
3800  }
3801  ++frag_pos;
3802  }
3803 }
3804 
3806  std::vector<std::vector<size_t>>& selected_fragments_crossjoin,
3807  const FragmentsList& selected_fragments,
3808  const RelAlgExecutionUnit& ra_exe_unit) {
3809  const auto& input_descs = ra_exe_unit.input_descs;
3810  for (size_t scan_idx = 0; scan_idx < input_descs.size(); ++scan_idx) {
3811  // selected_fragments is set from execution_kernel.fragments in assignFragsToKernelDispatch
3812  if (selected_fragments[0].table_key == input_descs[scan_idx].getTableKey()) {
3813  selected_fragments_crossjoin.push_back({size_t(1)});
3814  }
3815  }
3816 }
3817 
3818 namespace {
3819 
3821  public:
3822  OutVecOwner(const std::vector<int64_t*>& out_vec) : out_vec_(out_vec) {}
3824  for (auto out : out_vec_) {
3825  delete[] out;
3826  }
3827  }
3828 
3829  private:
3830  std::vector<int64_t*> out_vec_;
3831 };
3832 } // namespace
3833 
3835  const RelAlgExecutionUnit& ra_exe_unit,
3836  const CompilationResult& compilation_result,
3837  const bool hoist_literals,
3838  ResultSetPtr* results,
3839  const std::vector<Analyzer::Expr*>& target_exprs,
3840  const ExecutorDeviceType device_type,
3841  std::vector<std::vector<const int8_t*>>& col_buffers,
3842  QueryExecutionContext* query_exe_context,
3843  const std::vector<std::vector<int64_t>>& num_rows,
3844  const std::vector<std::vector<uint64_t>>& frag_offsets,
3845  Data_Namespace::DataMgr* data_mgr,
3846  const int device_id,
3847  const uint32_t start_rowid,
3848  const uint32_t num_tables,
3849  const bool allow_runtime_interrupt,
3850  RenderInfo* render_info,
3851  const bool optimize_cuda_block_and_grid_sizes,
3852  const int64_t rows_to_process) {
3854  auto timer = DEBUG_TIMER(__func__);
3855  CHECK(!results || !(*results));
3856  if (col_buffers.empty()) {
3857  return 0;
3858  }
3859 
3860  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
3861  if (render_info) {
3862  // TODO(adb): make sure that we either never get here in the CPU case, or if we do get
3863  // here, we are in non-insitu mode.
3864  CHECK(render_info->useCudaBuffers() || !render_info->isInSitu())
3865  << "CUDA disabled rendering in the executePlanWithoutGroupBy query path is "
3866  "currently unsupported.";
3867  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
3868  }
3869 
3870  int32_t error_code = 0;
3871  std::vector<int64_t*> out_vec;
3872  const auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
3873  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
3874  std::unique_ptr<OutVecOwner> output_memory_scope;
3875  if (allow_runtime_interrupt) {
3876  bool isInterrupted = false;
3877  {
3880  const auto query_session = getCurrentQuerySession(session_read_lock);
3881  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
3882  }
3883  if (isInterrupted) {
3884  throw QueryExecutionError(ErrorCode::INTERRUPTED);
3885  }
3886  }
3887  if (g_enable_dynamic_watchdog && interrupted_.load()) {
3888  throw QueryExecutionError(ErrorCode::INTERRUPTED);
3889  }
3890  if (device_type == ExecutorDeviceType::CPU) {
3891  CpuCompilationContext* cpu_generated_code =
3892  dynamic_cast<CpuCompilationContext*>(compilation_result.generated_code.get());
3893  CHECK(cpu_generated_code);
3894  out_vec = query_exe_context->launchCpuCode(ra_exe_unit,
3895  cpu_generated_code,
3896  hoist_literals,
3897  hoist_buf,
3898  col_buffers,
3899  num_rows,
3900  frag_offsets,
3901  0,
3902  &error_code,
3903  start_rowid,
3904  num_tables,
3905  join_hash_table_ptrs,
3906  rows_to_process);
3907  output_memory_scope.reset(new OutVecOwner(out_vec));
3908  } else {
3909  GpuCompilationContext* gpu_generated_code =
3910  dynamic_cast<GpuCompilationContext*>(compilation_result.generated_code.get());
3911  CHECK(gpu_generated_code);
3912  try {
3913  out_vec = query_exe_context->launchGpuCode(
3914  ra_exe_unit,
3915  gpu_generated_code,
3916  hoist_literals,
3917  hoist_buf,
3918  col_buffers,
3919  num_rows,
3920  frag_offsets,
3921  0,
3922  data_mgr,
3923  blockSize(),
3924  gridSize(),
3925  device_id,
3926  compilation_result.gpu_smem_context.getSharedMemorySize(),
3927  &error_code,
3928  num_tables,
3929  allow_runtime_interrupt,
3930  join_hash_table_ptrs,
3931  render_allocator_map_ptr,
3932  optimize_cuda_block_and_grid_sizes);
3933  output_memory_scope.reset(new OutVecOwner(out_vec));
3934  } catch (const OutOfMemory&) {
3935  return int32_t(ErrorCode::OUT_OF_GPU_MEM);
3936  } catch (const std::exception& e) {
3937  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
3938  }
3939  }
3940  if (heavyai::IsAny<ErrorCode::OVERFLOW_OR_UNDERFLOW,
3941  ErrorCode::DIV_BY_ZERO,
3942  ErrorCode::OUT_OF_TIME,
3943  ErrorCode::INTERRUPTED,
3944  ErrorCode::SINGLE_VALUE_FOUND_MULTIPLE_VALUES,
3945  ErrorCode::GEOS,
3946  ErrorCode::WIDTH_BUCKET_INVALID_ARGUMENT,
3947  ErrorCode::BBOX_OVERLAPS_LIMIT_EXCEEDED>::check(error_code)) {
3948  return error_code;
3949  }
3950  if (ra_exe_unit.estimator) {
3951  CHECK(!error_code);
3952  if (results) {
3953  *results =
3954  std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
3955  }
3956  return 0;
3957  }
3958  // Delayed results extraction (used for sub-fragments) is expected for the estimator only.
3959  CHECK(results);
3960  std::vector<int64_t> reduced_outs;
3961  const auto num_frags = col_buffers.size();
3962  const size_t entry_count =
3963  device_type == ExecutorDeviceType::GPU
3964  ? (compilation_result.gpu_smem_context.isSharedMemoryUsed()
3965  ? 1
3966  : blockSize() * gridSize() * num_frags)
3967  : num_frags;
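  // [Illustrative example, hypothetical values] On GPU without shared-memory reduction,
  // blockSize() = 1024, gridSize() = 128 and num_frags = 2 give entry_count =
  // 1024 * 128 * 2 = 262144 partial values per output slot to reduce below; with the
  // shared-memory path entry_count is 1, and on CPU it is simply num_frags.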
3968  if (size_t(1) == entry_count) {
3969  for (auto out : out_vec) {
3970  CHECK(out);
3971  reduced_outs.push_back(*out);
3972  }
3973  } else {
3974  size_t out_vec_idx = 0;
3975 
3976  for (const auto target_expr : target_exprs) {
3977  const auto agg_info = get_target_info(target_expr, g_bigint_count);
3978  CHECK(agg_info.is_agg || dynamic_cast<Analyzer::Constant*>(target_expr))
3979  << target_expr->toString();
3980 
3981  const int num_iterations = agg_info.sql_type.is_geometry()
3982  ? agg_info.sql_type.get_physical_coord_cols()
3983  : 1;
3984 
3985  for (int i = 0; i < num_iterations; i++) {
3986  int64_t val1;
3987  const bool float_argument_input = takes_float_argument(agg_info);
3988  if (is_distinct_target(agg_info) ||
3989  shared::is_any<kAPPROX_QUANTILE, kMODE>(agg_info.agg_kind)) {
3990  bool const check = shared::
3991  is_any<kCOUNT, kAPPROX_COUNT_DISTINCT, kAPPROX_QUANTILE, kMODE, kCOUNT_IF>(
3992  agg_info.agg_kind);
3993  CHECK(check) << agg_info.agg_kind;
3994  val1 = out_vec[out_vec_idx][0];
3995  error_code = 0;
3996  } else {
3997  const auto chosen_bytes = static_cast<size_t>(
3998  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx));
3999  std::tie(val1, error_code) = Executor::reduceResults(
4000  agg_info.agg_kind,
4001  agg_info.sql_type,
4002  query_exe_context->getAggInitValForIndex(out_vec_idx),
4003  float_argument_input ? sizeof(int32_t) : chosen_bytes,
4004  out_vec[out_vec_idx],
4005  entry_count,
4006  false,
4007  float_argument_input);
4008  }
4009  if (error_code) {
4010  break;
4011  }
4012  reduced_outs.push_back(val1);
4013  if (agg_info.agg_kind == kAVG ||
4014  (agg_info.agg_kind == kSAMPLE &&
4015  (agg_info.sql_type.is_varlen() || agg_info.sql_type.is_geometry()))) {
4016  const auto chosen_bytes = static_cast<size_t>(
4017  query_exe_context->query_mem_desc_.getPaddedSlotWidthBytes(out_vec_idx +
4018  1));
4019  int64_t val2;
4020  std::tie(val2, error_code) = Executor::reduceResults(
4021  agg_info.agg_kind == kAVG ? kCOUNT : agg_info.agg_kind,
4022  agg_info.sql_type,
4023  query_exe_context->getAggInitValForIndex(out_vec_idx + 1),
4024  float_argument_input ? sizeof(int32_t) : chosen_bytes,
4025  out_vec[out_vec_idx + 1],
4026  entry_count,
4027  false,
4028  false);
4029  if (error_code) {
4030  break;
4031  }
4032  reduced_outs.push_back(val2);
4033  ++out_vec_idx;
4034  }
4035  ++out_vec_idx;
4036  }
4037  }
4038  }
4039 
4040  if (error_code) {
4041  return error_code;
4042  }
4043 
4044  CHECK_EQ(size_t(1), query_exe_context->query_buffers_->result_sets_.size());
4045  auto rows_ptr = std::shared_ptr<ResultSet>(
4046  query_exe_context->query_buffers_->result_sets_[0].release());
4047  rows_ptr->fillOneEntry(reduced_outs);
4048  *results = std::move(rows_ptr);
4049  return error_code;
4050 }
4051 
4052 namespace {
4053 
4054 bool check_rows_less_than_needed(const ResultSetPtr& results, const size_t scan_limit) {
4055  CHECK(scan_limit);
4056  return results && results->rowCount() < scan_limit;
4057 }
4058 
4059 } // namespace
4060 
4062  const RelAlgExecutionUnit& ra_exe_unit,
4063  const CompilationResult& compilation_result,
4064  const bool hoist_literals,
4065  ResultSetPtr* results,
4066  const ExecutorDeviceType device_type,
4067  std::vector<std::vector<const int8_t*>>& col_buffers,
4068  const std::vector<size_t> outer_tab_frag_ids,
4069  QueryExecutionContext* query_exe_context,
4070  const std::vector<std::vector<int64_t>>& num_rows,
4071  const std::vector<std::vector<uint64_t>>& frag_offsets,
4072  Data_Namespace::DataMgr* data_mgr,
4073  const int device_id,
4074  const shared::TableKey& outer_table_key,
4075  const int64_t scan_limit,
4076  const uint32_t start_rowid,
4077  const uint32_t num_tables,
4078  const bool allow_runtime_interrupt,
4079  RenderInfo* render_info,
4080  const bool optimize_cuda_block_and_grid_sizes,
4081  const int64_t rows_to_process) {
4082  auto timer = DEBUG_TIMER(__func__);
4084  // TODO: get results via a separate method, but need to do something with literals.
4085  CHECK(!results || !(*results));
4086  if (col_buffers.empty()) {
4087  return 0;
4088  }
4089  CHECK_NE(ra_exe_unit.groupby_exprs.size(), size_t(0));
4090  // TODO(alex):
4091  // 1. Optimize size (make keys more compact).
4092  // 2. Resize on overflow.
4093  // 3. Optimize runtime.
4094  auto hoist_buf = serializeLiterals(compilation_result.literal_values, device_id);
4095  int32_t error_code = 0;
4096  const auto join_hash_table_ptrs = getJoinHashTablePtrs(device_type, device_id);
4097  if (allow_runtime_interrupt) {
4098  bool isInterrupted = false;
4099  {
4102  const auto query_session = getCurrentQuerySession(session_read_lock);
4103  isInterrupted = checkIsQuerySessionInterrupted(query_session, session_read_lock);
4104  }
4105  if (isInterrupted) {
4106  throw QueryExecutionError(ErrorCode::INTERRUPTED);
4107  }
4108  }
4109  if (g_enable_dynamic_watchdog && interrupted_.load()) {
4110  return int32_t(ErrorCode::INTERRUPTED);
4111  }
4112 
4113  RenderAllocatorMap* render_allocator_map_ptr = nullptr;
4114  if (render_info && render_info->useCudaBuffers()) {
4115  render_allocator_map_ptr = render_info->render_allocator_map_ptr.get();
4116  }
4117 
4118  VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
4119  << " ra_exe_unit.input_descs="
4120  << shared::printContainer(ra_exe_unit.input_descs)
4121  << " ra_exe_unit.input_col_descs="
4122  << shared::printContainer(ra_exe_unit.input_col_descs)
4123  << " ra_exe_unit.scan_limit=" << ra_exe_unit.scan_limit
4124  << " num_rows=" << shared::printContainer(num_rows)
4125  << " frag_offsets=" << shared::printContainer(frag_offsets)
4126  << " query_exe_context->query_buffers_->num_rows_="
4127  << query_exe_context->query_buffers_->num_rows_
4128  << " query_exe_context->query_mem_desc_.getEntryCount()="
4129  << query_exe_context->query_mem_desc_.getEntryCount()
4130  << " device_id=" << device_id << " outer_table_key=" << outer_table_key
4131  << " scan_limit=" << scan_limit << " start_rowid=" << start_rowid
4132  << " num_tables=" << num_tables;
4133 
4134  RelAlgExecutionUnit ra_exe_unit_copy = ra_exe_unit;
4135  // For UNION ALL, filter out input_descs and input_col_descs that are not associated
4136  // with outer_table_id.
4137  if (ra_exe_unit_copy.union_all) {
4138  // Sort outer_table_id first, then pop the rest off of ra_exe_unit_copy.input_descs.
4139  std::stable_sort(ra_exe_unit_copy.input_descs.begin(),
4140  ra_exe_unit_copy.input_descs.end(),
4141  [outer_table_key](auto const& a, auto const& b) {
4142  return a.getTableKey() == outer_table_key &&
4143  b.getTableKey() != outer_table_key;
4144  });
4145  while (!ra_exe_unit_copy.input_descs.empty() &&
4146  ra_exe_unit_copy.input_descs.back().getTableKey() != outer_table_key) {
4147  ra_exe_unit_copy.input_descs.pop_back();
4148  }
4149  // Filter ra_exe_unit_copy.input_col_descs.
4150  ra_exe_unit_copy.input_col_descs.remove_if(
4151  [outer_table_key](auto const& input_col_desc) {
4152  return input_col_desc->getScanDesc().getTableKey() != outer_table_key;
4153  });
4154  query_exe_context->query_mem_desc_.setEntryCount(ra_exe_unit_copy.scan_limit);
4155  }
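  // [Illustrative example, not in the original] For UNION ALL over tables A and B with
  // outer_table_key = A, the stable_sort/pop_back above leaves only A's input_descs, the
  // remove_if drops B's input_col_descs, and the output entry count is reset to
  // ra_exe_unit_copy.scan_limit.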
4156 
4157  if (device_type == ExecutorDeviceType::CPU) {
4158  const int32_t scan_limit_for_query =
4159  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
4160  const int32_t max_matched = scan_limit_for_query == 0
4161  ? query_exe_context->query_mem_desc_.getEntryCount()
4162  : scan_limit_for_query;
4163  CpuCompilationContext* cpu_generated_code =
4164  dynamic_cast<CpuCompilationContext*>(compilation_result.generated_code.get());
4165  CHECK(cpu_generated_code);
4166  query_exe_context->launchCpuCode(ra_exe_unit_copy,
4167  cpu_generated_code,
4168  hoist_literals,
4169  hoist_buf,
4170  col_buffers,
4171  num_rows,
4172  frag_offsets,
4173  max_matched,
4174  &error_code,
4175  start_rowid,
4176  num_tables,
4177  join_hash_table_ptrs,
4178  rows_to_process);
4179  } else {
4180  try {
4181  GpuCompilationContext* gpu_generated_code =
4182  dynamic_cast<GpuCompilationContext*>(compilation_result.generated_code.get());
4183  CHECK(gpu_generated_code);
4184  query_exe_context->launchGpuCode(
4185  ra_exe_unit_copy,
4186  gpu_generated_code,
4187  hoist_literals,
4188  hoist_buf,
4189  col_buffers,
4190  num_rows,
4191  frag_offsets,
4192  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit,
4193  data_mgr,
4194  blockSize(),
4195  gridSize(),
4196  device_id,
4197  compilation_result.gpu_smem_context.getSharedMemorySize(),
4198  &error_code,
4199  num_tables,
4200  allow_runtime_interrupt,
4201  join_hash_table_ptrs,
4202  render_allocator_map_ptr,
4203  optimize_cuda_block_and_grid_sizes);
4204  } catch (const OutOfMemory&) {
4205  return int32_t(ErrorCode::OUT_OF_GPU_MEM);
4206  } catch (const OutOfRenderMemory&) {
4207  return int32_t(ErrorCode::OUT_OF_RENDER_MEM);
4208  } catch (const StreamingTopNNotSupportedInRenderQuery&) {
4209  return int32_t(ErrorCode::STREAMING_TOP_N_NOT_SUPPORTED_IN_RENDER_QUERY);
4210  } catch (const std::exception& e) {
4211  LOG(FATAL) << "Error launching the GPU kernel: " << e.what();
4212  }
4213  }
4214 
4215  if (heavyai::IsAny<ErrorCode::OVERFLOW_OR_UNDERFLOW,
4216  ErrorCode::DIV_BY_ZERO,
4217  ErrorCode::OUT_OF_TIME,
4218  ErrorCode::INTERRUPTED,
4219  ErrorCode::SINGLE_VALUE_FOUND_MULTIPLE_VALUES,
4220  ErrorCode::GEOS,
4221  ErrorCode::WIDTH_BUCKET_INVALID_ARGUMENT,
4222  ErrorCode::BBOX_OVERLAPS_LIMIT_EXCEEDED>::check(error_code)) {
4223  return error_code;
4224  }
4225 
4226  if (results && error_code != int32_t(ErrorCode::OVERFLOW_OR_UNDERFLOW) &&
4227  error_code != int32_t(ErrorCode::DIV_BY_ZERO) && !render_allocator_map_ptr) {
4228  *results = query_exe_context->getRowSet(ra_exe_unit_copy,
4229  query_exe_context->query_mem_desc_);
4230  CHECK(*results);
4231  VLOG(2) << "results->rowCount()=" << (*results)->rowCount();
4232  (*results)->holdLiterals(hoist_buf);
4233  }
4234  if (error_code < 0 && render_allocator_map_ptr) {
4235  auto const adjusted_scan_limit =
4236  ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
4237  // More rows passed the filter than available slots. We don't have a count to check,
4238  // so assume we met the limit if a scan limit is set
4239  if (adjusted_scan_limit != 0) {
4240  return 0;
4241  } else {
4242  return error_code;
4243  }
4244  }
4245  if (results && error_code &&
4246  (!scan_limit || check_rows_less_than_needed(*results, scan_limit))) {
4247  return error_code; // unlucky, not enough results and we ran out of slots
4248  }
4249 
4250  return 0;
4251 }
4252 
4253 std::vector<int8_t*> Executor::getJoinHashTablePtrs(const ExecutorDeviceType device_type,
4254  const int device_id) {
4255  std::vector<int8_t*> table_ptrs;
4256  const auto& join_hash_tables = plan_state_->join_info_.join_hash_tables_;
4257  for (auto hash_table : join_hash_tables) {
4258  if (!hash_table) {
4259  CHECK(table_ptrs.empty());
4260  return {};
4261  }
4262  table_ptrs.push_back(hash_table->getJoinHashBuffer(
4263  device_type, device_type == ExecutorDeviceType::GPU ? device_id : 0));
4264  }
4265  return table_ptrs;
4266 }
4267 
4268 void Executor::nukeOldState(const bool allow_lazy_fetch,
4269  const std::vector<InputTableInfo>& query_infos,
4270  const PlanState::DeletedColumnsMap& deleted_cols_map,
4271  const RelAlgExecutionUnit* ra_exe_unit) {
4274  const bool contains_left_deep_outer_join =
4275  ra_exe_unit && std::find_if(ra_exe_unit->join_quals.begin(),
4276  ra_exe_unit->join_quals.end(),
4277  [](const JoinCondition& join_condition) {
4278  return join_condition.type == JoinType::LEFT;
4279  }) != ra_exe_unit->join_quals.end();
4280  cgen_state_.reset(
4281  new CgenState(query_infos.size(), contains_left_deep_outer_join, this));
4282  plan_state_.reset(new PlanState(allow_lazy_fetch && !contains_left_deep_outer_join,
4283  query_infos,
4284  deleted_cols_map,
4285  this));
4286 }
4287 
4288 void Executor::preloadFragOffsets(const std::vector<InputDescriptor>& input_descs,
4289  const std::vector<InputTableInfo>& query_infos) {
4291  const auto ld_count = input_descs.size();
4292  auto frag_off_ptr = get_arg_by_name(cgen_state_->row_func_, "frag_row_off");
4293  for (size_t i = 0; i < ld_count; ++i) {
4294  CHECK_LT(i, query_infos.size());
4295  const auto frag_count = query_infos[i].info.fragments.size();
4296  if (i > 0) {
4297  cgen_state_->frag_offsets_.push_back(nullptr);
4298  } else {
4299  if (frag_count > 1) {
4300  cgen_state_->frag_offsets_.push_back(cgen_state_->ir_builder_.CreateLoad(
4301  frag_off_ptr->getType()->getPointerElementType(), frag_off_ptr));
4302  } else {
4303  cgen_state_->frag_offsets_.push_back(nullptr);
4304  }
4305  }
4306  }
4307 }
4308 
4310  const std::shared_ptr<Analyzer::BinOper>& qual_bin_oper,
4311  const std::vector<InputTableInfo>& query_infos,
4312  const MemoryLevel memory_level,
4313  const JoinType join_type,
4314  const HashType preferred_hash_type,
4315  ColumnCacheMap& column_cache,
4316  const HashTableBuildDagMap& hashtable_build_dag_map,
4317  const RegisteredQueryHint& query_hint,
4318  const TableIdToNodeMap& table_id_to_node_map) {
4319  if (!g_enable_bbox_intersect_hashjoin && qual_bin_oper->is_bbox_intersect_oper()) {
4320  return {nullptr,
4321  "Bounding box intersection disabled, attempting to fall back to loop join"};
4322  }
4323  if (g_enable_dynamic_watchdog && interrupted_.load()) {
4324  throw QueryExecutionError(ErrorCode::INTERRUPTED);
4325  }
4326  try {
4327  auto tbl = HashJoin::getInstance(qual_bin_oper,
4328  query_infos,
4329  memory_level,
4330  join_type,
4331  preferred_hash_type,
4332  deviceCountForMemoryLevel(memory_level),
4333  column_cache,
4334  this,
4335  hashtable_build_dag_map,
4336  query_hint,
4337  table_id_to_node_map);
4338  return {tbl, ""};
4339  } catch (const HashJoinFail& e) {
4340  return {nullptr, e.what()};
4341  }
4342 }
4343 
4344 int8_t Executor::warpSize() const {
4345  const auto& dev_props = cudaMgr()->getAllDeviceProperties();
4346  CHECK(!dev_props.empty());
4347  return dev_props.front().warpSize;
4348 }
4349 
4350 // TODO(adb): should these three functions have consistent semantics if cuda mgr does not
4351 // exist?
4352 unsigned Executor::gridSize() const {
4353  CHECK(data_mgr_);
4354  const auto cuda_mgr = data_mgr_->getCudaMgr();
4355  if (!cuda_mgr) {
4356  return 0;
4357  }
4358  return grid_size_x_ ? grid_size_x_ : 2 * cuda_mgr->getMinNumMPsForAllDevices();
4359 }
4360 
4361 unsigned Executor::numBlocksPerMP() const {
4362  return std::max((unsigned)2,
4363  shared::ceil_div(grid_size_x_, cudaMgr()->getMinNumMPsForAllDevices()));
4364 }
4365 
4366 unsigned Executor::blockSize() const {
4367  CHECK(data_mgr_);
4368  const auto cuda_mgr = data_mgr_->getCudaMgr();
4369  if (!cuda_mgr) {
4370  return 0;
4371  }
4372  const auto& dev_props = cuda_mgr->getAllDeviceProperties();
4373  return block_size_x_ ? block_size_x_ : dev_props.front().maxThreadsPerBlock;
4374 }
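// [Illustrative example, hypothetical device] With grid_size_x_ = 0 and a GPU whose
// minimum SM count across devices is 80, gridSize() returns 2 * 80 = 160 and
// numBlocksPerMP() returns max(2, ceil(0 / 80)) = 2; an explicit grid_size_x_ of 320
// would give gridSize() = 320 and numBlocksPerMP() = ceil(320 / 80) = 4. blockSize()
// likewise falls back to the device's maxThreadsPerBlock (e.g. 1024) when
// block_size_x_ is 0.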
4375 
4376 void Executor::setGridSize(unsigned grid_size) {
4377  grid_size_x_ = grid_size;
4378 }
4379 
4381  grid_size_x_ = 0;
4382 }
4383 
4384 void Executor::setBlockSize(unsigned block_size) {
4385  block_size_x_ = block_size;
4386 }
4387 
4389  block_size_x_ = 0;
4390 }
4391 
4393  return max_gpu_slab_size_;
4394 }
4395 
4396 int64_t Executor::deviceCycles(int milliseconds) const {
4397  const auto& dev_props = cudaMgr()->getAllDeviceProperties();
4398  return static_cast<int64_t>(dev_props.front().clockKhz) * milliseconds;
4399 }
4400 
4401 llvm::Value* Executor::castToFP(llvm::Value* value,
4402  SQLTypeInfo const& from_ti,
4403  SQLTypeInfo const& to_ti) {
4405  if (value->getType()->isIntegerTy() && from_ti.is_number() && to_ti.is_fp() &&
4406  (!from_ti.is_fp() || from_ti.get_size() != to_ti.get_size())) {
4407  llvm::Type* fp_type{nullptr};
4408  switch (to_ti.get_size()) {
4409  case 4:
4410  fp_type = llvm::Type::getFloatTy(cgen_state_->context_);
4411  break;
4412  case 8:
4413  fp_type = llvm::Type::getDoubleTy(cgen_state_->context_);
4414  break;
4415  default:
4416  LOG(FATAL) << "Unsupported FP size: " << to_ti.get_size();
4417  }
4418  value = cgen_state_->ir_builder_.CreateSIToFP(value, fp_type);
4419  if (from_ti.get_scale()) {
4420  value = cgen_state_->ir_builder_.CreateFDiv(
4421  value,
4422  llvm::ConstantFP::get(value->getType(), exp_to_scale(from_ti.get_scale())));
4423  }
4424  }
4425  return value;
4426 }
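// [Illustrative example, not in the original] Casting a DECIMAL(10, 2) value stored as
// the integer 12345 to DOUBLE emits a SIToFP to 12345.0 followed by an FDiv by
// exp_to_scale(2) = 100, yielding 123.45.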
4427 
4428 llvm::Value* Executor::castToIntPtrTyIn(llvm::Value* val, const size_t bitWidth) {
4430  CHECK(val->getType()->isPointerTy());
4431 
4432  const auto val_ptr_type = static_cast<llvm::PointerType*>(val->getType());
4433  const auto val_type = val_ptr_type->getPointerElementType();
4434  size_t val_width = 0;
4435  if (val_type->isIntegerTy()) {
4436  val_width = val_type->getIntegerBitWidth();
4437  } else {
4438  if (val_type->isFloatTy()) {
4439  val_width = 32;
4440  } else {
4441  CHECK(val_type->isDoubleTy());
4442  val_width = 64;
4443  }
4444  }
4445  CHECK_LT(size_t(0), val_width);
4446  if (bitWidth == val_width) {
4447  return val;
4448  }
4449  return cgen_state_->ir_builder_.CreateBitCast(
4450  val, llvm::PointerType::get(get_int_type(bitWidth, cgen_state_->context_), 0));
4451 }
4452 
4453 #define EXECUTE_INCLUDE
4454 #include "ArrayOps.cpp"
4455 #include "DateAdd.cpp"
4456 #include "GeoOps.cpp"
4457 #include "RowFunctionOps.cpp"
4458 #include "StringFunctions.cpp"
4460 #undef EXECUTE_INCLUDE
4461 
4462 namespace {
4464  const ColumnDescriptor* deleted_cd,
4465  const shared::TableKey& table_key) {
4466  auto deleted_cols_it = deleted_cols_map.find(table_key);
4467  if (deleted_cols_it == deleted_cols_map.end()) {
4468  CHECK(deleted_cols_map.insert(std::make_pair(table_key, deleted_cd)).second);
4469  } else {
4470  CHECK_EQ(deleted_cd, deleted_cols_it->second);
4471  }
4472 }
4473 } // namespace
4474 
4475 std::tuple<RelAlgExecutionUnit, PlanState::DeletedColumnsMap> Executor::addDeletedColumn(
4476  const RelAlgExecutionUnit& ra_exe_unit,
4477  const CompilationOptions& co) {
4478  if (!co.filter_on_deleted_column) {
4479  return std::make_tuple(ra_exe_unit, PlanState::DeletedColumnsMap{});
4480  }
4481  auto ra_exe_unit_with_deleted = ra_exe_unit;
4482  PlanState::DeletedColumnsMap deleted_cols_map;
4483  for (const auto& input_table : ra_exe_unit_with_deleted.input_descs) {
4484  if (input_table.getSourceType() != InputSourceType::TABLE) {
4485  continue;
4486  }
4487  const auto& table_key = input_table.getTableKey();
4488  const auto catalog =
4490  CHECK(catalog);
4491  const auto td = catalog->getMetadataForTable(table_key.table_id);
4492  CHECK(td);
4493  const auto deleted_cd = catalog->getDeletedColumnIfRowsDeleted(td);
4494  if (!deleted_cd) {
4495  continue;
4496  }
4497  CHECK(deleted_cd->columnType.is_boolean());
4498  // check deleted column is not already present
4499  bool found = false;
4500  for (const auto& input_col : ra_exe_unit_with_deleted.input_col_descs) {
4501  if (input_col.get()->getColId() == deleted_cd->columnId &&
4502  input_col.get()->getScanDesc().getTableKey() == table_key &&
4503  input_col.get()->getScanDesc().getNestLevel() == input_table.getNestLevel()) {
4504  found = true;
4505  add_deleted_col_to_map(deleted_cols_map, deleted_cd, table_key);
4506  break;
4507  }
4508  }
4509  if (!found) {
4510  // add deleted column
4511  ra_exe_unit_with_deleted.input_col_descs.emplace_back(
4512  new InputColDescriptor(deleted_cd->columnId,
4513  deleted_cd->tableId,
4514  table_key.db_id,
4515  input_table.getNestLevel()));
4516  add_deleted_col_to_map(deleted_cols_map, deleted_cd, table_key);
4517  }
4518  }
4519  return std::make_tuple(ra_exe_unit_with_deleted, deleted_cols_map);
4520 }
4521 
4522 namespace {
4523 // Note(Wamsi): `get_hpt_overflow_underflow_safe_scaled_values` returns `true` as the first
4524 // element of the returned tuple for safely scaled epoch values and `false` for values
4525 // that would overflow/underflow.
4526 std::tuple<bool, int64_t, int64_t> get_hpt_overflow_underflow_safe_scaled_values(
4527  const int64_t chunk_min,
4528  const int64_t chunk_max,
4529  const SQLTypeInfo& lhs_type,
4530  const SQLTypeInfo& rhs_type) {
4531  const int32_t ldim = lhs_type.get_dimension();
4532  const int32_t rdim = rhs_type.get_dimension();
4533  CHECK(ldim != rdim);
4534  const auto scale = DateTimeUtils::get_timestamp_precision_scale(abs(rdim - ldim));
4535  if (ldim > rdim) {
4536  // LHS type precision is more than RHS col type. No chance of overflow/underflow.
4537  return {true, chunk_min / scale, chunk_max / scale};
4538  }
4539 
4540  using checked_int64_t = boost::multiprecision::number<
4541  boost::multiprecision::cpp_int_backend<64,
4542  64,
4543  boost::multiprecision::signed_magnitude,
4544  boost::multiprecision::checked,
4545  void>>;
4546 
4547  try {
4548  auto ret =
4549  std::make_tuple(true,
4550  int64_t(checked_int64_t(chunk_min) * checked_int64_t(scale)),
4551  int64_t(checked_int64_t(chunk_max) * checked_int64_t(scale)));
4552  return ret;
4553  } catch (const std::overflow_error& e) {
4554  // noop
4555  }
4556  return std::make_tuple(false, chunk_min, chunk_max);
4557 }
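// [Illustrative example, not in the original] With ldim = 6 and rdim = 3 the scale is
// get_timestamp_precision_scale(3) = 1000 and the chunk stats are divided by 1000;
// with ldim = 3 and rdim = 6 they are instead multiplied by 1000 using checked 64-bit
// arithmetic, and {false, chunk_min, chunk_max} is returned if that multiply overflows.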
4558 
4559 } // namespace
4560 
4562  const InputDescriptor& table_desc,
4563  const Fragmenter_Namespace::FragmentInfo& fragment) {
4564  // Skip temporary tables
4565  const auto& table_key = table_desc.getTableKey();
4566  if (table_key.table_id < 0) {
4567  return false;
4568  }
4569 
4570  const auto catalog =
4572  CHECK(catalog);
4573  const auto td = catalog->getMetadataForTable(fragment.physicalTableId);
4574  CHECK(td);
4575  const auto deleted_cd = catalog->getDeletedColumnIfRowsDeleted(td);
4576  if (!deleted_cd) {
4577  return false;
4578  }
4579 
4580  const auto& chunk_type = deleted_cd->columnType;
4581  CHECK(chunk_type.is_boolean());
4582 
4583  const auto deleted_col_id = deleted_cd->columnId;
4584  auto chunk_meta_it = fragment.getChunkMetadataMap().find(deleted_col_id);
4585  if (chunk_meta_it != fragment.getChunkMetadataMap().end()) {
4586  const int64_t chunk_min =
4587  extract_min_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4588  const int64_t chunk_max =
4589  extract_max_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4590  if (chunk_min == 1 && chunk_max == 1) { // skip the fragment if the delete chunk's
4591  // metadata says the full bytemap is true (signifying all rows are deleted)
4592  return true;
4593  }
4594  }
4595  return false;
4596 }
4597 
4599  const Analyzer::BinOper* comp_expr,
4600  const Analyzer::ColumnVar* lhs_col,
4601  const Fragmenter_Namespace::FragmentInfo& fragment,
4602  const Analyzer::Constant* rhs_const) const {
4603  auto col_id = lhs_col->getColumnKey().column_id;
4604  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
4605  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
4607  }
4608  double chunk_min{0.};
4609  double chunk_max{0.};
4610  const auto& chunk_type = lhs_col->get_type_info();
4611  chunk_min = extract_min_stat_fp_type(chunk_meta_it->second->chunkStats, chunk_type);
4612  chunk_max = extract_max_stat_fp_type(chunk_meta_it->second->chunkStats, chunk_type);
4613  if (chunk_min > chunk_max) {
4615  }
4616 
4617  const auto datum_fp = rhs_const->get_constval();
4618  const auto rhs_type = rhs_const->get_type_info().get_type();
4619  CHECK(rhs_type == kFLOAT || rhs_type == kDOUBLE);
4620 
4621  // Do we need to codegen the constant like the integer path does?
4622  const auto rhs_val = rhs_type == kFLOAT ? datum_fp.floatval : datum_fp.doubleval;
4623 
4624  // TODO: dedup the following comparison code with the integer/timestamp path; it is
4625  // slightly tricky to do cleanly as we do not have rowid on this path
4626  switch (comp_expr->get_optype()) {
4627  case kGE:
4628  if (chunk_max < rhs_val) {
4630  }
4631  break;
4632  case kGT:
4633  if (chunk_max <= rhs_val) {
4635  }
4636  break;
4637  case kLE:
4638  if (chunk_min > rhs_val) {
4640  }
4641  break;
4642  case kLT:
4643  if (chunk_min >= rhs_val) {
4645  }
4646  break;
4647  case kEQ:
4648  if (chunk_min > rhs_val || chunk_max < rhs_val) {
4650  }
4651  break;
4652  default:
4653  break;
4654  }
4656 }
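// [Illustrative example, not in the original] For a qual "x > 10.0" (kGT) over a
// fragment whose chunk stats for x are [min = 1.5, max = 9.75], chunk_max <= rhs_val
// holds, so no row in the fragment can satisfy the predicate and the fragment is
// reported as skippable.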
4657 
4658 std::pair<bool, int64_t> Executor::skipFragment(
4659  const InputDescriptor& table_desc,
4660  const Fragmenter_Namespace::FragmentInfo& fragment,
4661  const std::list<std::shared_ptr<Analyzer::Expr>>& simple_quals,
4662  const std::vector<uint64_t>& frag_offsets,
4663  const size_t frag_idx) {
4664  // First check to see if all of the fragment is deleted, in which case we know we can skip it
4665  if (isFragmentFullyDeleted(table_desc, fragment)) {
4666  VLOG(2) << "Skipping deleted fragment with table id: " << fragment.physicalTableId
4667  << ", fragment id: " << frag_idx;
4668  return {true, -1};
4669  }
4670 
4671  for (const auto& simple_qual : simple_quals) {
4672  const auto comp_expr =
4673  std::dynamic_pointer_cast<const Analyzer::BinOper>(simple_qual);
4674  if (!comp_expr) {
4675  // is this possible?
4676  return {false, -1};
4677  }
4678  const auto lhs = comp_expr->get_left_operand();
4679  auto lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs);
4680  if (!lhs_col || !lhs_col->getColumnKey().table_id || lhs_col->get_rte_idx()) {
4681  // See if lhs is a simple cast that was allowed through normalize_simple_predicate
4682  auto lhs_uexpr = dynamic_cast<const Analyzer::UOper*>(lhs);
4683  if (lhs_uexpr) {
4684  CHECK(lhs_uexpr->get_optype() ==
4685  kCAST); // We should have only been passed a cast expression
4686  lhs_col = dynamic_cast<const Analyzer::ColumnVar*>(lhs_uexpr->get_operand());
4687  if (!lhs_col || !lhs_col->getColumnKey().table_id || lhs_col->get_rte_idx()) {
4688  continue;
4689  }
4690  } else {
4691  continue;
4692  }
4693  }
4694  const auto rhs = comp_expr->get_right_operand();
4695  const auto rhs_const = dynamic_cast<const Analyzer::Constant*>(rhs);
4696  if (!rhs_const) {
4697  // is this possible?
4698  return {false, -1};
4699  }
4700  if (!lhs->get_type_info().is_integer() && !lhs->get_type_info().is_time() &&
4701  !lhs->get_type_info().is_fp()) {
4702  continue;
4703  }
4704  if (lhs->get_type_info().is_fp()) {
4705  const auto fragment_skip_status =
4706  canSkipFragmentForFpQual(comp_expr.get(), lhs_col, fragment, rhs_const);
4707  switch (fragment_skip_status) {
4708  case FragmentSkipStatus::SKIPPABLE:
4709  return {true, -1};
4710  case FragmentSkipStatus::INVALID:
4711  return {false, -1};
4712  case FragmentSkipStatus::NOT_SKIPPABLE:
4713  continue;
4714  default:
4715  UNREACHABLE();
4716  }
4717  }
4718 
4719  // Everything below is logic for integer and integer-backed timestamps
4720  // TODO: Factor out into a separate function, as was done for canSkipFragmentForFpQual above
4721 
4722  if (lhs_col->get_type_info().is_timestamp() &&
4723  rhs_const->get_type_info().is_any<kTIME>()) {
4724  // when casting from a timestamp to time
4725  // it is not possible to get a valid range,
4726  // so we can't skip any fragment
4727  continue;
4728  }
4729 
4730  const int col_id = lhs_col->getColumnKey().column_id;
4731  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
4732  int64_t chunk_min{0};
4733  int64_t chunk_max{0};
4734  bool is_rowid{false};
4735  size_t start_rowid{0};
4736  const auto& table_key = table_desc.getTableKey();
4737  if (chunk_meta_it == fragment.getChunkMetadataMap().end()) {
4738  auto cd = get_column_descriptor({table_key, col_id});
4739  if (cd->isVirtualCol) {
4740  CHECK(cd->columnName == "rowid");
4741  const auto& table_generation = getTableGeneration(table_key);
4742  start_rowid = table_generation.start_rowid;
4743  chunk_min = frag_offsets[frag_idx] + start_rowid;
4744  chunk_max = frag_offsets[frag_idx + 1] - 1 + start_rowid;
4745  is_rowid = true;
4746  }
4747  } else {
4748  const auto& chunk_type = lhs_col->get_type_info();
4749  chunk_min =
4750  extract_min_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4751  chunk_max =
4752  extract_max_stat_int_type(chunk_meta_it->second->chunkStats, chunk_type);
4753  }
4754  if (chunk_min > chunk_max) {
4755  // invalid metadata range, do not skip fragment
4756  return {false, -1};
4757  }
4758  if (lhs->get_type_info().is_timestamp() &&
4759  (lhs_col->get_type_info().get_dimension() !=
4760  rhs_const->get_type_info().get_dimension()) &&
4761  (lhs_col->get_type_info().is_high_precision_timestamp() ||
4762  rhs_const->get_type_info().is_high_precision_timestamp())) {
4763  // If the original timestamp lhs col has a different precision,
4764  // the column metadata holds values in the original precision,
4765  // therefore adjust the rhs value to match the lhs precision.
4766 
4767  // Note(Wamsi): We adjust the rhs const value instead of the lhs value so as not to
4768  // artificially limit the lhs column range. RHS overflow/underflow has already
4769  // been validated in `TimeGM::get_overflow_underflow_safe_epoch`.
4770  bool is_valid;
4771  std::tie(is_valid, chunk_min, chunk_max) =
4773  chunk_min, chunk_max, lhs_col->get_type_info(), rhs_const->get_type_info());
4774  if (!is_valid) {
4775  VLOG(4) << "Overflow/Underflow detected in fragment skipping logic.\nChunk min "
4776  "value: "
4777  << std::to_string(chunk_min)
4778  << "\nChunk max value: " << std::to_string(chunk_max)
4779  << "\nLHS col precision is: "
4780  << std::to_string(lhs_col->get_type_info().get_dimension())
4781  << "\nRHS precision is: "
4782  << std::to_string(rhs_const->get_type_info().get_dimension()) << ".";
4783  return {false, -1};
4784  }
4785  }
4786  if (lhs_col->get_type_info().is_timestamp() && rhs_const->get_type_info().is_date()) {
4787  // It is obvious that a cast from timestamp to date is happening here,
4788  // so we have to correct the chunk min and max values by lowering their precision
4789  // to that of the date
4790  chunk_min = DateTruncateHighPrecisionToDate(
4791  chunk_min, pow(10, lhs_col->get_type_info().get_dimension()));
4792  chunk_max = DateTruncateHighPrecisionToDate(
4793  chunk_max, pow(10, lhs_col->get_type_info().get_dimension()));
4794  }
4795  llvm::LLVMContext local_context;
4796  CgenState local_cgen_state(local_context);
4797  CodeGenerator code_generator(&local_cgen_state, nullptr);
4798 
4799  const auto rhs_val =
4800  CodeGenerator::codegenIntConst(rhs_const, &local_cgen_state)->getSExtValue();
4801 
4802  switch (comp_expr->get_optype()) {
4803  case kGE:
4804  if (chunk_max < rhs_val) {
4805  return {true, -1};
4806  }
4807  break;
4808  case kGT:
4809  if (chunk_max <= rhs_val) {
4810  return {true, -1};
4811  }
4812  break;
4813  case kLE:
4814  if (chunk_min > rhs_val) {
4815  return {true, -1};
4816  }
4817  break;
4818  case kLT:
4819  if (chunk_min >= rhs_val) {
4820  return {true, -1};
4821  }
4822  break;
4823  case kEQ:
4824  if (chunk_min > rhs_val || chunk_max < rhs_val) {
4825  return {true, -1};
4826  } else if (is_rowid) {
4827  return {false, rhs_val - start_rowid};
4828  }
4829  break;
4830  default:
4831  break;
4832  }
4833  }
4834  return {false, -1};
4835 }
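// Illustrative sketch (standalone, not part of Execute.cpp): for the virtual rowid column
// skipFragment() derives the chunk range from the fragment offsets rather than from chunk
// metadata, and an equality qual produces a rowid lookup key (the constant minus the
// table's start_rowid) instead of a skip. rowid_eq_lookup() below is a hypothetical
// condensation of that branch.

#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

// Mirrors the kEQ-on-rowid case: returns {skip?, rowid_lookup_key (or -1)}.
std::pair<bool, int64_t> rowid_eq_lookup(const std::vector<uint64_t>& frag_offsets,
                                         size_t frag_idx,
                                         int64_t start_rowid,
                                         int64_t rhs_val) {
  const int64_t chunk_min = static_cast<int64_t>(frag_offsets[frag_idx]) + start_rowid;
  const int64_t chunk_max = static_cast<int64_t>(frag_offsets[frag_idx + 1]) - 1 + start_rowid;
  if (rhs_val < chunk_min || rhs_val > chunk_max) {
    return {true, -1};  // constant falls outside this fragment: skip it
  }
  return {false, rhs_val - start_rowid};  // scan, but remember the rowid lookup key
}

int main() {
  const std::vector<uint64_t> frag_offsets{0, 32000000, 64000000};  // fragment row boundaries
  const auto [skip, key] = rowid_eq_lookup(frag_offsets, 1, 0, 40000000);
  std::cout << skip << " " << key << "\n";  // 0 40000000: fragment 1 holds the requested row
}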
4836 
4837 /*
4838  * skipFragmentInnerJoins processes all quals stored in the execution unit's
4839  * join_quals and gathers all the ones that meet the "simple_qual" characteristics
4840  * (logical expressions with AND operations, etc.). It then uses the skipFragment function
4841  * to decide whether the fragment should be skipped or not. The fragment will be skipped
4842  * if at least one of these skipFragment calls returns true in its first value.
4843  * - The code depends on skipFragment's output to have a meaningful (anything but -1)
4844  * second value only if its first value is "false".
4845  * - It is assumed that {false, n > -1} has higher priority than {true, -1},
4846  * i.e., we only skip if none of the quals trigger the code to update the
4847  * rowid_lookup_key
4848  * - Only AND operations are valid and considered:
4849  * - `select * from t1,t2 where A and B and C`: A, B, and C are considered for causing
4850  * the skip
4851  * - `select * from t1,t2 where (A or B) and C`: only C is considered
4852  * - `select * from t1,t2 where A or B`: none are considered (no skipping).
4853  * - NOTE: (re: intermediate projections) the following two queries are fundamentally
4854  * implemented differently, which causes the first one to skip correctly, but the second
4855  * one will not skip.
4856  * - e.g. #1, select * from t1 join t2 on (t1.i=t2.i) where (A and B); -- skips if
4857  * possible
4858  * - e.g. #2, select * from t1 join t2 on (t1.i=t2.i and A and B); -- intermediate
4859  * projection, no skipping
4860  */
4861 std::pair<bool, int64_t> Executor::skipFragmentInnerJoins(
4862  const InputDescriptor& table_desc,
4863  const RelAlgExecutionUnit& ra_exe_unit,
4864  const Fragmenter_Namespace::FragmentInfo& fragment,
4865  const std::vector<uint64_t>& frag_offsets,
4866  const size_t frag_idx) {
4867  std::pair<bool, int64_t> skip_frag{false, -1};
4868  for (auto& inner_join : ra_exe_unit.join_quals) {
4869  if (inner_join.type != JoinType::INNER) {
4870  continue;
4871  }
4872 
4873  // extracting all the conjunctive simple_quals from the quals stored for the inner
4874  // join
4875  std::list<std::shared_ptr<Analyzer::Expr>> inner_join_simple_quals;
4876  for (auto& qual : inner_join.quals) {
4877  auto temp_qual = qual_to_conjunctive_form(qual);
4878  inner_join_simple_quals.insert(inner_join_simple_quals.begin(),
4879  temp_qual.simple_quals.begin(),
4880  temp_qual.simple_quals.end());
4881  }
4882  auto temp_skip_frag = skipFragment(
4883  table_desc, fragment, inner_join_simple_quals, frag_offsets, frag_idx);
4884  if (temp_skip_frag.second != -1) {
4885  skip_frag.second = temp_skip_frag.second;
4886  return skip_frag;
4887  } else {
4888  skip_frag.first = skip_frag.first || temp_skip_frag.first;
4889  }
4890  }
4891  return skip_frag;
4892 }
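// Illustrative sketch (standalone, not part of Execute.cpp): only top-level AND conjuncts
// can participate in fragment skipping, since an OR tree cannot be split into quals that
// independently prove a fragment irrelevant. Expr and collect_conjuncts() below are
// hypothetical stand-ins for the role qual_to_conjunctive_form() plays above.

#include <iostream>
#include <memory>
#include <string>
#include <vector>

struct Expr {
  std::string op;  // "AND", "OR", or a leaf predicate such as "A"
  std::shared_ptr<Expr> lhs, rhs;
};

// Walks top-level ANDs only; OR sub-trees are kept whole and therefore cannot act
// as "simple quals" for skipping.
void collect_conjuncts(const std::shared_ptr<Expr>& e, std::vector<std::string>& out) {
  if (e->op == "AND") {
    collect_conjuncts(e->lhs, out);
    collect_conjuncts(e->rhs, out);
  } else if (e->op == "OR") {
    out.push_back("(" + e->lhs->op + " OR " + e->rhs->op + ")");  // opaque to skipping
  } else {
    out.push_back(e->op);  // candidate simple qual
  }
}

int main() {
  // where A and B and (C or D)
  auto leaf = [](const std::string& name) {
    return std::make_shared<Expr>(Expr{name, nullptr, nullptr});
  };
  auto or_cd = std::make_shared<Expr>(Expr{"OR", leaf("C"), leaf("D")});
  auto root = std::make_shared<Expr>(
      Expr{"AND", std::make_shared<Expr>(Expr{"AND", leaf("A"), leaf("B")}), or_cd});
  std::vector<std::string> conjuncts;
  collect_conjuncts(root, conjuncts);
  for (const auto& c : conjuncts) {
    std::cout << c << "\n";  // A, B, (C OR D) -- only A and B can serve as simple quals
  }
}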
4893 
4895  const std::unordered_set<PhysicalInput>& phys_inputs) {
4896  AggregatedColRange agg_col_range_cache;
4897  std::unordered_set<shared::TableKey> phys_table_keys;
4898  for (const auto& phys_input : phys_inputs) {
4899  phys_table_keys.emplace(phys_input.db_id, phys_input.table_id);
4900  }
4901  std::vector<InputTableInfo> query_infos;
4902  for (const auto& table_key : phys_table_keys) {
4903  query_infos.emplace_back(InputTableInfo{table_key, getTableInfo(table_key)});
4904  }
4905  for (const auto& phys_input : phys_inputs) {
4906  auto db_id = phys_input.db_id;
4907  auto table_id = phys_input.table_id;
4908  auto column_id = phys_input.col_id;
4909  const auto cd =
4910  Catalog_Namespace::get_metadata_for_column({db_id, table_id, column_id});
4911  CHECK(cd);
4912  if (ExpressionRange::typeSupportsRange(cd->columnType)) {
4913  const auto col_var = std::make_unique<Analyzer::ColumnVar>(
4914  cd->columnType, shared::ColumnKey{db_id, table_id, column_id}, 0);
4915  const auto col_range = getLeafColumnRange(col_var.get(), query_infos, this, false);
4916  agg_col_range_cache.setColRange(phys_input, col_range);
4917  }
4918  }
4919  return agg_col_range_cache;
4920 }
4921 
4923  const std::unordered_set<PhysicalInput>& phys_inputs) {
4924  StringDictionaryGenerations string_dictionary_generations;
4925  // Foreign tables may not have populated dictionaries for encoded columns. If this is
4926  // the case then we need to populate them here to make sure that the generations are set
4927  // correctly.
4928  prepare_string_dictionaries(phys_inputs);
4929  for (const auto& phys_input : phys_inputs) {
4930  const auto catalog =
4931  Catalog_Namespace::SysCatalog::instance().getCatalog(phys_input.db_id);
4932  CHECK(catalog);
4933  const auto cd = catalog->getMetadataForColumn(phys_input.table_id, phys_input.col_id);
4934  CHECK(cd);
4935  const auto& col_ti =
4936  cd->columnType.is_array() ? cd->columnType.get_elem_type() : cd->columnType;
4937  if (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT) {
4938  const auto& dict_key = col_ti.getStringDictKey();
4939  const auto dd = catalog->getMetadataForDict(dict_key.dict_id);
4940  CHECK(dd && dd->stringDict);
4941  string_dictionary_generations.setGeneration(dict_key,
4942  dd->stringDict->storageEntryCount());
4943  }
4944  }
4945  return string_dictionary_generations;
4946 }
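// Illustrative sketch (standalone, not part of Execute.cpp): the generation recorded above
// is the dictionary's entry count at query setup time; the assumption in this sketch is
// that it serves as an upper bound on the string ids the query treats as valid, so entries
// appended afterwards are ignored. ToyDictionary below is hypothetical.

#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

class ToyDictionary {
 public:
  int32_t getOrAdd(const std::string& s) {
    entries_.push_back(s);
    return static_cast<int32_t>(entries_.size()) - 1;
  }
  size_t storageEntryCount() const { return entries_.size(); }
  // Visibility bounded by a generation captured at query start.
  bool visible(int32_t id, size_t generation) const {
    return id >= 0 && static_cast<size_t>(id) < generation;
  }

 private:
  std::vector<std::string> entries_;
};

int main() {
  ToyDictionary dict;
  dict.getOrAdd("red");
  dict.getOrAdd("green");
  const size_t generation = dict.storageEntryCount();  // snapshot: 2 entries
  const auto late_id = dict.getOrAdd("blue");          // appended after the snapshot
  std::cout << dict.visible(0, generation) << " " << dict.visible(late_id, generation)
            << "\n";  // 1 0
}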
4947 
4949  const std::unordered_set<shared::TableKey>& phys_table_keys) {
4950  TableGenerations table_generations;
4951  for (const auto& table_key : phys_table_keys) {
4952  const auto table_info = getTableInfo(table_key);
4953  table_generations.setGeneration(
4954  table_key,
4955  TableGeneration{static_cast<int64_t>(table_info.getPhysicalNumTuples()), 0});
4956  }
4957  return table_generations;
4958 }
4959 
4960 void Executor::setupCaching(const std::unordered_set<PhysicalInput>& phys_inputs,
4961  const std::unordered_set<shared::TableKey>& phys_table_ids) {
4962  row_set_mem_owner_ =
4963  std::make_shared<RowSetMemoryOwner>(Executor::getArenaBlockSize(), executor_id_);
4964  row_set_mem_owner_->setDictionaryGenerations(
4965  computeStringDictionaryGenerations(phys_inputs));
4966  agg_col_range_cache_ = computeColRangesCache(phys_inputs);
4967  table_generations_ = computeTableGenerations(phys_table_ids);
4968 }
4969 
4971  return recycler_mutex_;
4972 }
4973 
4975  return query_plan_dag_cache_;
4976 }
4977 
4980 }
4981 
4983  return executor_session_mutex_;
4984 }
4985 
4988  return current_query_session_;
4989 }
4990 
4992  const QuerySessionId& candidate_query_session,
4994  // a candidate query session is considered the current session only if it is
4995  // non-empty and equal to current_query_session_
4996  return !candidate_query_session.empty() &&
4997  (current_query_session_ == candidate_query_session);
4998 }
4999 
5000 // used only for testing
5002  const QuerySessionId& candidate_query_session,
5004  if (queries_session_map_.count(candidate_query_session) &&
5005  !queries_session_map_.at(candidate_query_session).empty()) {
5006  return queries_session_map_.at(candidate_query_session)
5007  .begin()
5008  ->second.getQueryStatus();
5009  }
5010  return QuerySessionStatus::QueryStatus::UNDEFINED;
5011 }
5012 
5016 }
5017 
5019  const QuerySessionId& query_session_id,
5020  const std::string& query_str,
5021  const std::string& query_submitted_time) {
5022  if (!query_session_id.empty()) {
5023  // if the session is valid, update 1) the exact executor id and 2) the query status
5026  query_session_id, query_submitted_time, executor_id_, write_lock);
5027  updateQuerySessionStatusWithLock(query_session_id,
5028  query_submitted_time,
5029  QuerySessionStatus::QueryStatus::PENDING_EXECUTOR,
5030  write_lock);
5031  }
5032  return {query_session_id, query_str};
5033 }
5034 
5036  // check whether we are okay to execute the "pending" query
5037  // i.e., before running the query check if this query session is "ALREADY" interrupted
5039  if (query_session.empty()) {
5040  return;
5041  }
5042  if (queries_interrupt_flag_.find(query_session) == queries_interrupt_flag_.end()) {
5043  // something went wrong, since we assume this is the caller's responsibility
5044  // (this function should be called only for an enrolled query session)
5045  if (!queries_session_map_.count(query_session)) {
5046  VLOG(1) << "Interrupting pending query is not available since the query session is "
5047  "not enrolled";
5048  } else {
5049  // here the query session is enrolled but the interrupt flag is not registered
5050  VLOG(1)
5051  << "Interrupting pending query is not available since its interrupt flag is "
5052  "not registered";
5053  }
5054  return;
5055  }
5056  if (queries_interrupt_flag_[query_session]) {
5057  throw QueryExecutionError(ErrorCode::INTERRUPTED);
5058  }
5059 }
5060 
5062  const std::string& submitted_time_str) {
5064  // clear the interrupt-related info for a finished query
5065  if (query_session.empty()) {
5066  return;
5067  }
5068  removeFromQuerySessionList(query_session, submitted_time_str, session_write_lock);
5069  if (query_session.compare(current_query_session_) == 0) {
5070  invalidateRunningQuerySession(session_write_lock);
5071  resetInterrupt();
5072  }
5073 }
5074 
5076  const QuerySessionId& query_session,
5077  const std::string& submitted_time_str,
5078  const QuerySessionStatus::QueryStatus new_query_status) {
5079  // update the running query session's current status
5081  if (query_session.empty()) {
5082  return;
5083  }
5084  if (new_query_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
5085  current_query_session_ = query_session;
5086  }
5088  query_session, submitted_time_str, new_query_status, session_write_lock);
5089 }
5090 
5092  const QuerySessionId& query_session,
5093  const std::string& query_str,
5094  const std::string& submitted_time_str,
5095  const size_t executor_id,
5096  const QuerySessionStatus::QueryStatus query_session_status) {
5097  // enroll the query session into the Executor's session map
5099  if (query_session.empty()) {
5100  return;
5101  }
5102 
5103  addToQuerySessionList(query_session,
5104  query_str,
5105  submitted_time_str,
5106  executor_id,
5107  query_session_status,
5108  session_write_lock);
5109 
5110  if (query_session_status == QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
5111  current_query_session_ = query_session;
5112  }
5113 }
5114 
5117  return queries_session_map_.size();
5118 }
5119 
5121  const QuerySessionId& query_session,
5122  const std::string& query_str,
5123  const std::string& submitted_time_str,
5124  const size_t executor_id,
5125  const QuerySessionStatus::QueryStatus query_status,
5127  // an internal API that enrolls the query session into the Executor's session map
5128  if (queries_session_map_.count(query_session)) {
5129  if (queries_session_map_.at(query_session).count(submitted_time_str)) {
5130  queries_session_map_.at(query_session).erase(submitted_time_str);
5131  queries_session_map_.at(query_session)
5132  .emplace(submitted_time_str,
5133  QuerySessionStatus(query_session,
5134  executor_id,
5135  query_str,
5136  submitted_time_str,
5137  query_status));
5138  } else {
5139  queries_session_map_.at(query_session)
5140  .emplace(submitted_time_str,
5141  QuerySessionStatus(query_session,
5142  executor_id,
5143  query_str,
5144  submitted_time_str,
5145  query_status));
5146  }
5147  } else {
5148  std::map<std::string, QuerySessionStatus> executor_per_query_map;
5149  executor_per_query_map.emplace(
5150  submitted_time_str,
5152  query_session, executor_id, query_str, submitted_time_str, query_status));
5153  queries_session_map_.emplace(query_session, executor_per_query_map);
5154  }
5155  return queries_interrupt_flag_.emplace(query_session, false).second;
5156 }
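// Illustrative sketch (standalone, not part of Execute.cpp): addToQuerySessionList keeps a
// two-level map of session id -> (submitted time -> status) plus a per-session interrupt
// flag. ToySessionStatus and the maps below are hypothetical, mirroring only the shape of
// that bookkeeping.

#include <iostream>
#include <map>
#include <string>
#include <unordered_map>

struct ToySessionStatus {
  size_t executor_id;
  std::string query_str;
  std::string status;  // e.g. "PENDING_EXECUTOR", "RUNNING_QUERY_KERNEL"
};

int main() {
  // session id -> { submitted time -> status }, one entry per outstanding query
  std::map<std::string, std::map<std::string, ToySessionStatus>> session_map;
  // session id -> interrupt requested?
  std::unordered_map<std::string, bool> interrupt_flags;

  const std::string session{"session-abc"};
  session_map[session].emplace(
      "2024-01-01 00:00:00",
      ToySessionStatus{7, "SELECT COUNT(*) FROM t", "PENDING_EXECUTOR"});
  interrupt_flags.emplace(session, false);  // same insertion pattern as the code above

  // A later interrupt request only flips the flag, which running queries check.
  interrupt_flags[session] = true;
  std::cout << session_map[session].size() << " " << interrupt_flags[session] << "\n";  // 1 1
}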
5157 
5159  const QuerySessionId& query_session,
5160  const std::string& submitted_time_str,
5161  const QuerySessionStatus::QueryStatus updated_query_status,
5163  // an internal API that updates query session status
5164  if (query_session.empty()) {
5165  return false;
5166  }
5167  if (queries_session_map_.count(query_session)) {
5168  for (auto& query_status : queries_session_map_.at(query_session)) {
5169  auto target_submitted_t_str = query_status.second.getQuerySubmittedTime();
5170  // no time difference --> found the target query status
5171  if (submitted_time_str.compare(target_submitted_t_str) == 0) {
5172  auto prev_status = query_status.second.getQueryStatus();
5173  if (prev_status == updated_query_status) {
5174  return false;
5175  }
5176  query_status.second.setQueryStatus(updated_query_status);
5177  return true;
5178  }
5179  }
5180  }
5181  return false;
5182 }
5183 
5185  const QuerySessionId& query_session,
5186  const std::string& submitted_time_str,
5187  const size_t executor_id,
5189  // update the executor id of the query session
5190  if (query_session.empty()) {
5191  return false;
5192  }
5193  if (queries_session_map_.count(query_session)) {
5194  auto storage = queries_session_map_.at(query_session);
5195  for (auto it = storage.begin(); it != storage.end(); it++) {
5196  auto target_submitted_t_str = it->second.getQuerySubmittedTime();
5197  // no time difference --> found the target query status
5198  if (submitted_time_str.compare(target_submitted_t_str) == 0) {
5199  queries_session_map_.at(query_session)
5200  .at(submitted_time_str)
5201  .setExecutorId(executor_id);
5202  return true;
5203  }
5204  }
5205  }
5206  return false;
5207 }
5208 
5210  const QuerySessionId& query_session,
5211  const std::string& submitted_time_str,
5213  if (query_session.empty()) {
5214  return false;
5215  }
5216  if (queries_session_map_.count(query_session)) {
5217  auto& storage = queries_session_map_.at(query_session);
5218  if (storage.size() > 1) {
5219  // in this case we only remove query executor info
5220  for (auto it = storage.begin(); it != storage.end(); it++) {
5221  auto target_submitted_t_str = it->second.getQuerySubmittedTime();
5222  // no time difference && same executor id --> found the target query
5223  if (it->second.getExecutorId() == executor_id_ &&
5224  submitted_time_str.compare(target_submitted_t_str) == 0) {
5225  storage.erase(it);
5226  return true;
5227  }
5228  }
5229  } else if (storage.size() == 1) {
5230  // here this session only has a single query executor
5231  // so we clear both executor info and its interrupt flag
5232  queries_session_map_.erase(query_session);
5233  queries_interrupt_flag_.erase(query_session);
5234  if (interrupted_.load()) {
5235  interrupted_.store(false);
5236  }
5237  return true;
5238  }
5239  }
5240  return false;
5241 }
5242 
5244  const QuerySessionId& query_session,
5246  if (query_session.empty()) {
5247  return;
5248  }
5249  if (queries_interrupt_flag_.find(query_session) != queries_interrupt_flag_.end()) {
5250  queries_interrupt_flag_[query_session] = true;
5251  }
5252 }
5253 
5255  const QuerySessionId& query_session,
5257  if (query_session.empty()) {
5258  return false;
5259  }
5260  auto flag_it = queries_interrupt_flag_.find(query_session);
5261  return !query_session.empty() && flag_it != queries_interrupt_flag_.end() &&
5262  flag_it->second;
5263 }
5264 
5266  const QuerySessionId& query_session,
5268  if (query_session.empty()) {
5269  return false;
5270  }
5271  return !query_session.empty() && queries_session_map_.count(query_session);
5272 }
5273 
5275  const double runtime_query_check_freq,
5276  const unsigned pending_query_check_freq) const {
5277  // The only scenario in which we intentionally call this function is
5278  // to allow runtime query interrupt in QueryRunner for test cases.
5279  // Because a test machine's default settings do not allow runtime query interrupt,
5280  // we have to turn it on within test code if necessary.
5282  g_pending_query_interrupt_freq = pending_query_check_freq;
5283  g_running_query_interrupt_freq = runtime_query_check_freq;
5286  }
5287 }
5288 
5290  const size_t cache_value) {
5293  cardinality_cache_[cache_key] = cache_value;
5294  VLOG(1) << "Put estimated cardinality to the cache";
5295  }
5296 }
5297 
5299  const CardinalityCacheKey& cache_key) {
5302  cardinality_cache_.find(cache_key) != cardinality_cache_.end()) {
5303  VLOG(1) << "Reuse cached cardinality";
5304  return {true, cardinality_cache_[cache_key]};
5305  }
5306  return {false, -1};
5307 }
5308 
5312  cardinality_cache_.clear();
5313  }
5314 }
5315 
5319  for (auto it = cardinality_cache_.begin(); it != cardinality_cache_.end();) {
5320  if (it->first.containsTableKey(table_key)) {
5321  it = cardinality_cache_.erase(it);
5322  } else {
5323  it++;
5324  }
5325  }
5326  }
5327 }
5328 
5329 std::vector<QuerySessionStatus> Executor::getQuerySessionInfo(
5330  const QuerySessionId& query_session,
5332  if (!queries_session_map_.empty() && queries_session_map_.count(query_session)) {
5333  auto& query_infos = queries_session_map_.at(query_session);
5334  std::vector<QuerySessionStatus> ret;
5335  for (auto& info : query_infos) {
5336  ret.emplace_back(query_session,
5337  info.second.getExecutorId(),
5338  info.second.getQueryStr(),
5339  info.second.getQuerySubmittedTime(),
5340  info.second.getQueryStatus());
5341  }
5342  return ret;
5343  }
5344  return {};
5345 }
5346 
5347 const std::vector<size_t> Executor::getExecutorIdsRunningQuery(
5348  const QuerySessionId& interrupt_session) const {
5349  std::vector<size_t> res;
5351  auto it = queries_session_map_.find(interrupt_session);
5352  if (it != queries_session_map_.end()) {
5353  for (auto& kv : it->second) {
5354  if (kv.second.getQueryStatus() ==
5355  QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL) {
5356  res.push_back(kv.second.getExecutorId());
5357  }
5358  }
5359  }
5360  return res;
5361 }
5362 
5364  // this function should be called within an executor which is assigned
5365  // to the specific query thread (that indicates we have already enrolled the session)
5366  // check whether this is called from a non-unitary executor
5368  return false;
5369  };
5371  auto flag_it = queries_interrupt_flag_.find(current_query_session_);
5372  return !current_query_session_.empty() && flag_it != queries_interrupt_flag_.end() &&
5373  flag_it->second;
5374 }
5375 
5377  // this function is called under the recycler lock
5378  // e.g., QueryPlanDagExtractor::extractQueryPlanDagImpl()
5379  latest_query_plan_extracted_ = query_plan_dag;
5380 }
5381 
5385 }
5386 
5388  const size_t num_cpu_slots,
5389  const size_t num_gpu_slots,
5390  const size_t cpu_result_mem,
5391  const size_t cpu_buffer_pool_mem,
5392  const size_t gpu_buffer_pool_mem,
5393  const double per_query_max_cpu_slots_ratio,
5394  const double per_query_max_cpu_result_mem_ratio,
5395  const bool allow_cpu_kernel_concurrency,
5396  const bool allow_cpu_gpu_kernel_concurrency,
5397  const bool allow_cpu_slot_oversubscription_concurrency,
5398  const bool allow_cpu_result_mem_oversubscription_concurrency,
5399  const double max_available_resource_use_ratio) {
5400  const double per_query_max_pinned_cpu_buffer_pool_mem_ratio{1.0};
5401  const double per_query_max_pageable_cpu_buffer_pool_mem_ratio{0.5};
5403  num_cpu_slots,
5404  num_gpu_slots,
5405  cpu_result_mem,
5406  cpu_buffer_pool_mem,
5407  gpu_buffer_pool_mem,
5408  per_query_max_cpu_slots_ratio,
5409  per_query_max_cpu_result_mem_ratio,
5410  per_query_max_pinned_cpu_buffer_pool_mem_ratio,
5411  per_query_max_pageable_cpu_buffer_pool_mem_ratio,
5412  allow_cpu_kernel_concurrency,
5413  allow_cpu_gpu_kernel_concurrency,
5414  allow_cpu_slot_oversubscription_concurrency,
5415  true, // allow_gpu_slot_oversubscription
5416  allow_cpu_result_mem_oversubscription_concurrency,
5417  max_available_resource_use_ratio);
5418 }
5419 
5422  throw std::runtime_error(
5423  "Executor queue cannot be paused as it requires Executor Resource Manager to be "
5424  "enabled");
5425  }
5426  executor_resource_mgr_->pause_process_queue();
5427 }
5428 
5431  throw std::runtime_error(
5432  "Executor queue cannot be resumed as it requires Executor Resource Manager to be "
5433  "enabled");
5434  }
5435  executor_resource_mgr_->resume_process_queue();
5436 }
5437 
5439  const ExecutorResourceMgr_Namespace::ResourceType resource_type) {
5441  throw std::runtime_error(
5442  "ExecutorResourceMgr must be enabled to obtain executor resource pool stats.");
5443  }
5444  return executor_resource_mgr_->get_resource_info(resource_type).second;
5445 }
5446 
5450  throw std::runtime_error(
5451  "ExecutorResourceMgr must be enabled to obtain executor resource pool stats.");
5452  }
5453  return executor_resource_mgr_->get_resource_info();
5454 }
5455 
5457  const ExecutorResourceMgr_Namespace::ResourceType resource_type,
5458  const size_t resource_quantity) {
5460  throw std::runtime_error(
5461  "ExecutorResourceMgr must be enabled to set executor resource pool resource.");
5462  }
5463  executor_resource_mgr_->set_resource(resource_type, resource_quantity);
5464 }
5465 
5468  const ExecutorResourceMgr_Namespace::ResourceType resource_type) {
5470  throw std::runtime_error(
5471  "ExecutorResourceMgr must be enabled to set executor concurrent resource grant "
5472  "policy.");
5473  }
5474  return executor_resource_mgr_->get_concurrent_resource_grant_policy(resource_type);
5475 }
5476 
5479  concurrent_resource_grant_policy) {
5481  throw std::runtime_error(
5482  "ExecutorResourceMgr must be enabled to set executor concurrent resource grant "
5483  "policy.");
5484  }
5485  executor_resource_mgr_->set_concurrent_resource_grant_policy(
5486  concurrent_resource_grant_policy);
5487 }
5488 
5489 std::map<int, std::shared_ptr<Executor>> Executor::executors_;
5490 
5491 // contains the interrupt flag's status per query session
5493 // contains a list of queries per query session
5495 // session lock
5497 
5500 
5504 
5506 std::mutex Executor::kernel_mutex_;
5507 
5508 std::shared_ptr<ExecutorResourceMgr_Namespace::ExecutorResourceMgr>
5510 
5513 std::unordered_map<CardinalityCacheKey, size_t> Executor::cardinality_cache_;
5514 // Executor has a single global result set recycler holder
5515 // which contains two recyclers related to query resultset
5518 
5519 // Useful for debugging.
5520 std::string Executor::dumpCache() const {
5521  std::stringstream ss;
5522  ss << "colRangeCache: ";
5523  for (auto& [phys_input, exp_range] : agg_col_range_cache_.asMap()) {
5524  ss << "{" << phys_input.col_id << ", " << phys_input.table_id
5525  << "} = " << exp_range.toString() << ", ";
5526  }
5527  ss << "stringDictGenerations: ";
5528  for (auto& [key, val] : row_set_mem_owner_->getStringDictionaryGenerations().asMap()) {
5529  ss << key << " = " << val << ", ";
5530  }
5531  ss << "tableGenerations: ";
5532  for (auto& [key, val] : table_generations_.asMap()) {
5533  ss << key << " = {" << val.tuple_count << ", " << val.start_rowid << "}, ";
5534  }
5535  ss << "\n";
5536  return ss.str();
5537 }