QueryFragmentDescriptor::QueryFragmentDescriptor(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<InputTableInfo>& query_infos,
    const std::vector<Data_Namespace::MemoryInfo>& gpu_mem_infos,
    const double gpu_input_mem_limit_percent,
    std::vector<size_t> allowed_outer_fragment_indices)
    : allowed_outer_fragment_indices_(allowed_outer_fragment_indices)
    , gpu_input_mem_limit_percent_(gpu_input_mem_limit_percent) {
  const size_t input_desc_count{ra_exe_unit.input_descs.size()};
  CHECK_EQ(query_infos.size(), input_desc_count);
  for (size_t table_idx = 0; table_idx < input_desc_count; ++table_idx) {
    const auto& table_key = ra_exe_unit.input_descs[table_idx].getTableKey();
    // ...
  }

  // Record how much memory each GPU can devote to input chunks.
  for (size_t device_id = 0; device_id < gpu_mem_infos.size(); device_id++) {
    const auto& gpu_mem_info = gpu_mem_infos[device_id];
    available_gpu_mem_bytes_[device_id] =
        gpu_mem_info.maxNumPages * gpu_mem_info.pageSize;
  }
}
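
// Collects, for every input table of the execution unit, a pointer to that
// table's fragment list into the provided all_tables_fragments map (the first
// occurrence of a table key wins).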
void QueryFragmentDescriptor::computeAllTablesFragments(
    std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<InputTableInfo>& query_infos) {
  for (size_t tab_idx = 0; tab_idx < ra_exe_unit.input_descs.size(); ++tab_idx) {
    const auto& table_key = ra_exe_unit.input_descs[tab_idx].getTableKey();
    CHECK_EQ(query_infos[tab_idx].table_key, table_key);
    const auto& fragments = query_infos[tab_idx].info.fragments;
    if (!all_tables_fragments.count(table_key)) {
      all_tables_fragments.insert(std::make_pair(table_key, &fragments));
    }
  }
}
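
// Top-level entry point: computes the per-row fetch size over the outer (LHS)
// tables and then appears to dispatch to one of the specialized builders,
// e.g. buildMultifragKernelMap() when multi-fragment kernels are enabled.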
void QueryFragmentDescriptor::buildFragmentKernelMap(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const ExecutorDeviceType& device_type,
    const bool enable_multifrag_kernels,
    const bool enable_inner_join_fragment_skipping,
    Executor* executor) {
  // Collect the keys of all nest-level-0 (outer) inputs to size a fetched row.
  std::set<shared::TableKey> lhs_table_keys;
  for (const auto& input_desc : ra_exe_unit.input_descs) {
    if (input_desc.getNestLevel() == 0) {
      lhs_table_keys.insert(input_desc.getTableKey());
    }
  }
  const auto num_bytes_for_row = executor->getNumBytesForFetchedRow(lhs_table_keys);
  // ...
  } else if (enable_multifrag_kernels) {
    buildMultifragKernelMap(ra_exe_unit,
                            frag_offsets,
                            device_count,
                            num_bytes_for_row,
                            device_type,
                            enable_inner_join_fragment_skipping,
                            executor);
  }
  // ...
}
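
// Builds one ExecutionKernelDescriptor per (non-skipped) fragment of the given
// table and registers it under the device chosen for that fragment. The
// deleted-column chunk metadata appears to be consulted to report per-fragment
// tuple counts.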
void QueryFragmentDescriptor::buildFragmentPerKernelForTable(
    const TableFragments* fragments,
    const RelAlgExecutionUnit& ra_exe_unit,
    const InputDescriptor& table_desc,
    const bool is_temporary_table,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const size_t num_bytes_for_row,
    const ChunkMetadataVector& deleted_chunk_metadata_vec,
    const std::optional<size_t> table_desc_offset,
    const ExecutorDeviceType& device_type,
    Executor* executor) {
  auto get_fragment_tuple_count = [&deleted_chunk_metadata_vec, &is_temporary_table](
                                      const auto& fragment) -> std::optional<size_t> {
    // ...
    if (is_temporary_table) {
      // ...
    }
    if (deleted_chunk_metadata_vec.empty()) {
      return fragment.getNumTuples();
    }
    const auto fragment_id = fragment.fragmentId;
    // ...
    if (static_cast<size_t>(fragment_id) < deleted_chunk_metadata_vec.size()) {
      const auto& chunk_metadata = deleted_chunk_metadata_vec[fragment_id];
      if (chunk_metadata.second->chunkStats.max.tinyintval == 1) {
        // ...
      }
    }
    return fragment.getNumTuples();
  };

  for (size_t i = 0; i < fragments->size(); i++) {
    // ...
    const auto& fragment = (*fragments)[i];
    const auto skip_frag = executor->skipFragment(
        table_desc, fragment, ra_exe_unit.simple_quals, frag_offsets, i);
    if (skip_frag.first) {
      // ...
    }
    const int chosen_device_count =
        // ...
    // ...
    const int device_id =
        // ...
            ? fragment.deviceIds[static_cast<int>(memory_level)]
            : fragment.shard % chosen_device_count;
    // ...
    ExecutionKernelDescriptor execution_kernel_desc{
        device_id, {}, get_fragment_tuple_count(fragment)};
    if (table_desc_offset) {
      const auto frag_ids =
          executor->getTableFragmentIndices(ra_exe_unit,
                                            // ...
                                            executor->getInnerTabIdToJoinCond());
      const auto& table_key = ra_exe_unit.input_descs[*table_desc_offset].getTableKey();
      execution_kernel_desc.fragments.emplace_back(
          /* ... */);
    } else {
      for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
        const auto frag_ids =
            executor->getTableFragmentIndices(ra_exe_unit,
                                              // ...
                                              executor->getInnerTabIdToJoinCond());
        const auto& table_key = ra_exe_unit.input_descs[j].getTableKey();
        // ...
        execution_kernel_desc.fragments.emplace_back(
            /* ... */);
      }
    }
    // Either a new per-device kernel list is created for device_id ...
    // ...
        std::vector<ExecutionKernelDescriptor>{std::move(execution_kernel_desc)}));
    // ... or the descriptor is appended to the device's existing list.
    itr->second.emplace_back(std::move(execution_kernel_desc));
    // ...
  }
}
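
// UNION ALL variant: iterates over every input descriptor (rather than only
// the outer table), gathers deleted-column chunk metadata for each physical
// table, and then logs, per device, which table each kernel's first fragment
// list refers to.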
void QueryFragmentDescriptor::buildFragmentPerKernelMapForUnion(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const size_t num_bytes_for_row,
    const ExecutorDeviceType& device_type,
    Executor* executor) {
  for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
    auto const& table_desc = ra_exe_unit.input_descs[j];
    const auto& table_key = table_desc.getTableKey();
    // ...
    auto data_mgr = executor->getDataMgr();
    // ...
    bool is_temporary_table = false;
    if (table_key.table_id > 0) {
      // ...
      is_temporary_table = true;
      // ...
      const auto deleted_cd = executor->plan_state_->getDeletedColForTable(table_key);
      // ...
      ChunkKey chunk_key_prefix = {
          table_key.db_id, table_key.table_id, deleted_cd->columnId};
      data_mgr->getChunkMetadataVecForKeyPrefix(deleted_chunk_metadata_vec,
                                                chunk_key_prefix);
      // ...
    }
    // ...
    // Debug logging: collect the table id of the first fragment list of every
    // kernel built so far for device 0.
    std::vector<int> table_ids =
        // ...
        [](auto&& vec, auto& exe_kern) {
          vec.push_back(exe_kern.fragments[0].table_key.table_id);
          return vec;
        });
    VLOG(1) << "execution_kernels_per_device_.size()="
            // ...
            << " execution_kernels_per_device_[0][*].fragments[0].table_id="
            // ...
  }
}
void QueryFragmentDescriptor::buildFragmentPerKernelMap(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const size_t num_bytes_for_row,
    const ExecutorDeviceType& device_type,
    Executor* executor) {
  const auto& outer_table_desc = ra_exe_unit.input_descs.front();
  const auto& outer_table_key = outer_table_desc.getTableKey();
  // ...
  const auto outer_fragments = it->second;
  // ...
  bool is_temporary_table = false;
  if (outer_table_key.table_id > 0) {
    // ...
    const auto td = catalog->getMetadataForTable(outer_table_key.table_id);
    // ...
    is_temporary_table = true;
    // ...
    const auto deleted_cd = catalog->getDeletedColumnIfRowsDeleted(td);
    // ...
    td->fragmenter->getFragmenterId();
    auto frags = td->fragmenter->getFragmentsForQuery().fragments;
    for (auto frag : frags) {
      auto chunk_meta_it =
          frag.getChunkMetadataMapPhysical().find(deleted_cd->columnId);
      if (chunk_meta_it != frag.getChunkMetadataMapPhysical().end()) {
        const auto& chunk_meta = chunk_meta_it->second;
        ChunkKey chunk_key_prefix = {outer_table_key.db_id,
                                     outer_table_key.table_id,
                                     deleted_cd->columnId,
                                     frag.fragmentId};
        deleted_chunk_metadata_vec.emplace_back(
            std::pair{chunk_key_prefix, chunk_meta});
      }
    }
  }
  // ...
  buildFragmentPerKernelForTable(/* ... */,
                                 deleted_chunk_metadata_vec,
                                 /* ... */);
}
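
// Multi-fragment kernels: one kernel per device, with each qualifying outer
// fragment (after skipFragment / skipFragmentInnerJoins pruning) appended to
// the per-table fragment lists of that device's single kernel descriptor.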
void QueryFragmentDescriptor::buildMultifragKernelMap(
    const RelAlgExecutionUnit& ra_exe_unit,
    const std::vector<uint64_t>& frag_offsets,
    const int device_count,
    const size_t num_bytes_for_row,
    const ExecutorDeviceType& device_type,
    const bool enable_inner_join_fragment_skipping,
    Executor* executor) {
  // ...
  const auto& outer_table_desc = ra_exe_unit.input_descs.front();
  const auto& outer_table_key = outer_table_desc.getTableKey();
  // ...
  const auto outer_fragments = it->second;
  // ...
  const auto inner_table_id_to_join_condition = executor->getInnerTabIdToJoinCond();
  // ...
  for (size_t outer_frag_id = 0; outer_frag_id < outer_fragments->size();
       ++outer_frag_id) {
    // ...
    const auto& fragment = (*outer_fragments)[outer_frag_id];
    auto skip_frag = executor->skipFragment(outer_table_desc,
                                            fragment,
                                            ra_exe_unit.simple_quals,
                                            frag_offsets,
                                            outer_frag_id);
    if (enable_inner_join_fragment_skipping &&
        (skip_frag == std::pair<bool, int64_t>(false, -1))) {
      skip_frag = executor->skipFragmentInnerJoins(
          outer_table_desc, ra_exe_unit, fragment, frag_offsets, outer_frag_id);
    }
    if (skip_frag.first) {
      // ...
    }
    const int device_id =
        // ...
            : fragment.shard % device_count;
    // ...
    for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
      const auto& table_key = ra_exe_unit.input_descs[j].getTableKey();
      // ...
      const auto frag_ids =
          executor->getTableFragmentIndices(ra_exe_unit,
                                            // ...
                                            inner_table_id_to_join_condition);
      // ...
      std::vector<ExecutionKernelDescriptor> kernel_descs{
          /* ... */};
      // ...
      auto& kernel_frag_list = execution_kernel.fragments;
      if (kernel_frag_list.size() < j + 1) {
        // ...
      } else {
        CHECK_EQ(kernel_frag_list[j].table_key, table_key);
        auto& curr_frag_ids = kernel_frag_list[j].fragment_ids;
        for (const int frag_id : frag_ids) {
          if (std::find(curr_frag_ids.begin(), curr_frag_ids.end(), frag_id) ==
              curr_frag_ids.end()) {
            curr_frag_ids.push_back(frag_id);
          }
        }
      }
    }
  }
}
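
// QueryFragmentDescriptor::terminateDispatchMaybe(size_t& tuple_count,
//     const RelAlgExecutionUnit& ra_exe_unit,
//     const ExecutionKernelDescriptor& kernel) const
// Appears to let kernel dispatch stop early once the tuples produced so far
// reach the query's sample/limit threshold.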
  const auto sample_query_limit =
      // ...
  // ...
      tuple_count >= sample_query_limit) {
    // ...
  }
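
// Tracks the running tuple count per device and warns when the bytes required
// for the input chunks would exceed the configured share of that device's
// available GPU memory.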
void QueryFragmentDescriptor::checkDeviceMemoryUsage(
    const Fragmenter_Namespace::FragmentInfo& fragment,
    const int device_id,
    const size_t num_bytes_for_row) {
  // ...
  const size_t gpu_bytes_limit =
      // ...
  // ...
    LOG(WARNING) << "Not enough memory on device " << device_id
                 << " for input chunks totaling "
                 // ...
                 << " bytes (available device memory: " << gpu_bytes_limit
                 << " bytes)";
  // ...
}
std::ostream& operator<<(std::ostream& os, const FragmentsPerTable& fragments_per_table) {
  os << fragments_per_table.table_key << ", fragment_ids";
  for (size_t i = 0; i < fragments_per_table.fragment_ids.size(); ++i) {
    os << (i ? ' ' : '(') << fragments_per_table.fragment_ids[i];
  }
  // ...
}