const auto& outer_tab_frag_ids = frag_list[0].fragment_ids;
// ...
auto data_mgr = executor->getDataMgr();
executor->logSystemCPUMemoryStatus("Before Query Execution", thread_idx);
// ...
executor->logSystemGPUMemoryStatus("Before Query Execution", thread_idx);
// ...

// Buffers for the input chunks, plus the GPU lock and device allocator that are only
// set up when this kernel runs on a GPU device.
auto chunk_iterators_ptr = std::make_shared<std::list<ChunkIter>>();
std::list<std::shared_ptr<Chunk_NS::Chunk>> chunks;
std::unique_ptr<std::lock_guard<std::mutex>> gpu_lock;
std::unique_ptr<CudaAllocator> device_allocator;
// ...
gpu_lock.reset(
    new std::lock_guard<std::mutex>(executor->gpu_exec_mutex_[chosen_device_id]));
device_allocator = std::make_unique<CudaAllocator>(/* ... */);
// ...

// Fetch the input chunks for this kernel's fragment list. Both fetch paths (regular
// and UNION ALL) receive all_tables_fragments, *chunk_iterators_ptr, and
// device_allocator.get(), among other arguments elided here.
std::shared_ptr<FetchResult> fetch_result(new FetchResult);
// ...
std::map<shared::TableKey, const TableFragments*> all_tables_fragments;
// ...
if (fetch_result->num_rows.empty()) {
  // ... (nothing was fetched for this kernel)
}
// ...
LOG(INFO) << "Dynamic Watchdog budget: CPU: " /* ... */;
// ... (an out-of-memory failure while fetching is rethrown, with
//      ErrorCode::OUT_OF_CPU_MEM covering the CPU case)

// External (non-native) execution path: joins are rejected, a query memory descriptor
// is built through group_by_and_aggregate, and the query runs outside the engine.
// ...
  throw std::runtime_error("Joins not supported through external execution");
// ...
  GroupByAndAggregate group_by_and_aggregate(executor,
                                             /* ... */
                                             executor->row_set_mem_owner_,
                                             /* ... */);
  const auto query_mem_desc =
      group_by_and_aggregate.initQueryMemoryDescriptor(false, 0, 8, nullptr, false);
  device_results_ = run_query_external(/* ... */,
                                       *fetch_result,
                                       executor->plan_state_.get(),
                                       /* ... */);
// ...

std::unique_ptr<QueryExecutionContext> query_exe_context_owned;
// ...

// Count the total number of input rows across all fragments assigned to this kernel.
int64_t total_num_input_rows{-1};
// ...
total_num_input_rows = 0;
std::for_each(fetch_result->num_rows.begin(),
              fetch_result->num_rows.end(),
              [&total_num_input_rows](const std::vector<int64_t>& frag_row_count) {
                total_num_input_rows = std::accumulate(frag_row_count.begin(),
                                                       frag_row_count.end(),
                                                       total_num_input_rows);
              });
// ...
VLOG(2) << "total_num_input_rows=" << total_num_input_rows;
// ...
if (total_num_input_rows == 0) {
  // ... (nothing to execute for this kernel)
}
// ...

uint32_t start_rowid{0};
// ...
const auto& all_frag_row_offsets = shared_context.getFragOffsets();
start_rowid = all_frag_row_offsets[frag_list.begin()->fragment_ids.front()];
// ...
query_mem_desc.setAvailableCpuThreads(
    get_available_cpu_threads_per_task(executor, shared_context));
// ...

// Decide whether this kernel can be split into row-range sub-tasks that run on the
// shared thread pool.
bool can_run_subkernels = shared_context.getThreadPool() != nullptr;
// ... (further conditions are ANDed into can_run_subkernels)
if (can_run_subkernels) {
  size_t total_rows = fetch_result->num_rows[0][0];
  size_t sub_size = g_cpu_sub_task_size;
  for (size_t sub_start = start_rowid; sub_start < total_rows; sub_start += sub_size) {
    sub_size = (sub_start + sub_size > total_rows) ? total_rows - sub_start : sub_size;
    auto subtask = std::make_shared<KernelSubtask>(*this,
                                                   /* ... */
                                                   total_num_input_rows,
                                                   /* ... */);
    shared_context.getThreadPool()->run(
        [subtask, executor] { subtask->run(executor); });
  }
  // ...
}
// ...

// Build the query execution context. Among the inputs elided here are
// total_num_input_rows, fetch_result->col_buffers, fetch_result->frag_offsets,
// executor->getRowSetMemoryOwner(), and query_mem_desc.sortOnGpu().
query_exe_context_owned = /* ... */;
// ...
CHECK(query_exe_context);
// ...
bool optimize_cuda_block_and_grid_sizes = /* ... */;
// ...
executor->logSystemCPUMemoryStatus("After Query Memory Initialization", thread_idx);
// ...

// Launch the compiled kernel. Both launch variants (with and without group-by) consume
// fetch_result->col_buffers, fetch_result->num_rows, and fetch_result->frag_offsets,
// and both honor optimize_cuda_block_and_grid_sizes.
// ...
VLOG(1) << "outer_table_key=" << outer_table_key /* ... */;
// ...

// Keep alive any chunks whose buffers must outlive kernel execution.
std::list<std::shared_ptr<Chunk_NS::Chunk>> chunks_to_hold;
for (const auto& chunk : chunks) {
  if (need_to_hold_chunk(chunk.get(), /* ... */, chosen_device_type)) {
    chunks_to_hold.push_back(chunk);
  }
}
// ...
VLOG(1) << "null device_results.";  // logged when the kernel produced no result set
// ...
shared_context.addDeviceResults(std::move(device_results_), outer_tab_frag_ids);
executor->logSystemCPUMemoryStatus("After Query Execution", thread_idx);
// ...
executor->logSystemGPUMemoryStatus("After Query Execution", thread_idx);