29 size_t const fragment_index,
30 const std::shared_ptr<ResultSet>& rs)
31 : fragment_info_(fragment_info), fragment_index_(fragment_index), rs_(rs) {
36 return rs_->getRowAtNoTranslations(index);
40 const size_t index)
const {
41 return rs_->getRowAt(index);
45 return rs_->rowCount();
54 return rs_->entryCount();
62 return rs_->getColType(col_idx);
67 const std::vector<InputTableInfo>& table_infos,
72 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
76 CHECK(table_desc_for_update);
77 VLOG(1) <<
"Executor " << executor_id_
78 <<
" is executing update/delete work unit:" << ra_exe_unit_in;
80 const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
84 CHECK_GT(ra_exe_unit.input_descs.size(), size_t(0));
85 const auto& outer_table_key = ra_exe_unit.input_descs[0].getTableKey();
86 CHECK_EQ(outer_table_key, table_infos.front().table_key);
87 const auto& outer_fragments = table_infos.front().info.fragments;
89 std::vector<FragmentsPerTable> fragments = {{{0, 0}, {0}}};
90 for (
size_t tab_idx = 1; tab_idx < ra_exe_unit.input_descs.size(); tab_idx++) {
91 const auto& table_key = ra_exe_unit.input_descs[tab_idx].getTableKey();
92 CHECK_EQ(table_infos[tab_idx].table_key, table_key);
93 const auto& fragmentsPerTable = table_infos[tab_idx].info.fragments;
95 for (
size_t innerFragId = 0; innerFragId < fragmentsPerTable.size(); innerFragId++) {
98 fragments.push_back(entry);
101 if (outer_fragments.empty()) {
105 const auto max_tuple_count_fragment_it = std::max_element(
106 outer_fragments.begin(), outer_fragments.end(), [](
const auto&
a,
const auto& b) {
107 return a.getNumTuples() < b.getNumTuples();
109 CHECK(max_tuple_count_fragment_it != outer_fragments.end());
110 int64_t global_max_groups_buffer_entry_guess =
111 max_tuple_count_fragment_it->getNumTuples();
113 global_max_groups_buffer_entry_guess = std::min(
114 2 * global_max_groups_buffer_entry_guess, static_cast<int64_t>(100
'000'000));
117 auto query_comp_desc = std::make_unique<QueryCompilationDescriptor>();
120 query_mem_desc = query_comp_desc->compile(global_max_groups_buffer_entry_guess,
132 CHECK(query_mem_desc);
136 query_mem_desc->setThreadsCanReuseGroupByBuffers(
false);
139 for (
size_t fragment_index = 0; fragment_index < outer_fragments.size();
141 const int64_t crt_fragment_tuple_count =
142 outer_fragments[fragment_index].getNumTuples();
143 if (crt_fragment_tuple_count == 0) {
149 auto skip_frag = skipFragment(ra_exe_unit.input_descs[0],
150 outer_fragments[fragment_index],
151 ra_exe_unit.simple_quals,
154 if (skip_frag.first) {
155 VLOG(2) <<
"Update/delete skipping fragment with table id: "
156 << outer_fragments[fragment_index].physicalTableId
157 <<
", fragment id: " << fragment_index;
160 fragments[0] = {outer_table_key, {fragment_index}};
175 std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
176 kernel_queue_time_ms_ +=
timer_stop(clock_begin);
178 current_fragment_kernel.
run(
this, 0, shared_context);
181 if (proj_fragment_results.empty()) {
184 const auto& proj_fragment_result = proj_fragment_results[0];
185 const auto proj_result_set = proj_fragment_result.first;
186 CHECK(proj_result_set);
187 cb({outer_fragments[fragment_index], fragment_index, proj_result_set},
188 table_update_metadata);
194 table_optimizer.recomputeMetadataUnlocked(table_update_metadata);
196 return table_update_metadata;
SQLTypeInfo getColumnType(const size_t col_idx) const
bool is_agg(const Analyzer::Expr *expr)
class for a per-database catalog. also includes metadata for the current database and the current use...
const std::vector< uint64_t > & getFragOffsets()
size_t const getFragmentIndex() const
Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanu...
TypeR::rep timer_stop(Type clock_begin)
bool g_enable_auto_metadata_update
TableUpdateMetadata executeUpdate(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &table_infos, const TableDescriptor *updated_table_desc, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const UpdateLogForFragment::Callback &cb, const bool is_agg)
Container for compilation results and assorted options for a single execution unit.
std::function< void(const UpdateLogForFragment &, TableUpdateMetadata &)> Callback
std::vector< TargetValue > getEntryAt(const size_t index) const override
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
FragmentInfoType const & getFragmentInfo() const
std::vector< TargetValue > getTranslatedEntryAt(const size_t index) const override
std::shared_ptr< ResultSet > rs_
void run(Executor *executor, const size_t thread_idx, SharedKernelContext &shared_context)
std::unordered_map< shared::TableKey, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
FragmentInfoType const & fragment_info_
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
std::vector< size_t > fragment_ids
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
UpdateLogForFragment(FragmentInfoType const &fragment_info, size_t const, const std::shared_ptr< ResultSet > &rs)
size_t const getRowCount() const override
Descriptor for the fragments required for an execution kernel.
size_t const getEntryCount() const override