OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ExecuteUpdate.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryEngine/Execute.h"
18 
25 
27 
29  size_t const fragment_index,
30  const std::shared_ptr<ResultSet>& rs)
31  : fragment_info_(fragment_info), fragment_index_(fragment_index), rs_(rs) {
32  rs->setGeoReturnType(ResultSet::GeoReturnType::GeoTargetValue);
33 }
34 
35 std::vector<TargetValue> UpdateLogForFragment::getEntryAt(const size_t index) const {
36  return rs_->getRowAtNoTranslations(index);
37 }
38 
40  const size_t index) const {
41  return rs_->getRowAt(index);
42 }
43 
44 size_t const UpdateLogForFragment::getRowCount() const {
45  return rs_->rowCount();
46 }
47 
49  const {
50  return fragment_info_;
51 }
52 
54  return rs_->entryCount();
55 }
56 
58  return fragment_index_;
59 }
60 
61 SQLTypeInfo UpdateLogForFragment::getColumnType(const size_t col_idx) const {
62  return rs_->getColType(col_idx);
63 }
64 
66  const RelAlgExecutionUnit& ra_exe_unit_in,
67  const std::vector<InputTableInfo>& table_infos,
68  const TableDescriptor* table_desc_for_update,
69  const CompilationOptions& co,
70  const ExecutionOptions& eo,
72  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
74  const bool is_agg) {
75  CHECK(cb);
76  CHECK(table_desc_for_update);
77  VLOG(1) << "Executor " << executor_id_
78  << " is executing update/delete work unit:" << ra_exe_unit_in;
79 
80  const auto [ra_exe_unit, deleted_cols_map] = addDeletedColumn(ra_exe_unit_in, co);
81  ColumnCacheMap column_cache;
82 
83  ColumnFetcher column_fetcher(this, column_cache);
84  CHECK_GT(ra_exe_unit.input_descs.size(), size_t(0));
85  const auto& outer_table_key = ra_exe_unit.input_descs[0].getTableKey();
86  CHECK_EQ(outer_table_key, table_infos.front().table_key);
87  const auto& outer_fragments = table_infos.front().info.fragments;
88 
89  std::vector<FragmentsPerTable> fragments = {{{0, 0}, {0}}};
90  for (size_t tab_idx = 1; tab_idx < ra_exe_unit.input_descs.size(); tab_idx++) {
91  const auto& table_key = ra_exe_unit.input_descs[tab_idx].getTableKey();
92  CHECK_EQ(table_infos[tab_idx].table_key, table_key);
93  const auto& fragmentsPerTable = table_infos[tab_idx].info.fragments;
94  FragmentsPerTable entry = {table_key, {}};
95  for (size_t innerFragId = 0; innerFragId < fragmentsPerTable.size(); innerFragId++) {
96  entry.fragment_ids.push_back(innerFragId);
97  }
98  fragments.push_back(entry);
99  }
100 
101  if (outer_fragments.empty()) {
102  return {};
103  }
104 
105  const auto max_tuple_count_fragment_it = std::max_element(
106  outer_fragments.begin(), outer_fragments.end(), [](const auto& a, const auto& b) {
107  return a.getNumTuples() < b.getNumTuples();
108  });
109  CHECK(max_tuple_count_fragment_it != outer_fragments.end());
110  int64_t global_max_groups_buffer_entry_guess =
111  max_tuple_count_fragment_it->getNumTuples();
112  if (is_agg) {
113  global_max_groups_buffer_entry_guess = std::min(
114  2 * global_max_groups_buffer_entry_guess, static_cast<int64_t>(100'000'000));
115  }
116 
117  auto query_comp_desc = std::make_unique<QueryCompilationDescriptor>();
118  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc;
119  {
120  query_mem_desc = query_comp_desc->compile(global_max_groups_buffer_entry_guess,
121  8,
122  /*has_cardinality_estimation=*/true,
123  ra_exe_unit,
124  table_infos,
125  deleted_cols_map,
126  column_fetcher,
127  co,
128  eo,
129  nullptr,
130  this);
131  }
132  CHECK(query_mem_desc);
133  // Since we execute updates one thread/fragment at a time,
134  // buffer re-use is not applicable and can cause issues
135  // when the contents of the output buffer are written to storage
136  query_mem_desc->setThreadsCanReuseGroupByBuffers(false);
137 
138  TableUpdateMetadata table_update_metadata;
139  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
140  ++fragment_index) {
141  const int64_t crt_fragment_tuple_count =
142  outer_fragments[fragment_index].getNumTuples();
143  if (crt_fragment_tuple_count == 0) {
144  // nothing to update
145  continue;
146  }
147  SharedKernelContext shared_context(table_infos);
148  const auto& frag_offsets = shared_context.getFragOffsets();
149  auto skip_frag = skipFragment(ra_exe_unit.input_descs[0],
150  outer_fragments[fragment_index],
151  ra_exe_unit.simple_quals,
152  frag_offsets,
153  fragment_index);
154  if (skip_frag.first) {
155  VLOG(2) << "Update/delete skipping fragment with table id: "
156  << outer_fragments[fragment_index].physicalTableId
157  << ", fragment id: " << fragment_index;
158  continue;
159  }
160  fragments[0] = {outer_table_key, {fragment_index}};
161  {
162  ExecutionKernel current_fragment_kernel(ra_exe_unit,
164  0,
165  eo,
166  column_fetcher,
167  *query_comp_desc,
168  *query_mem_desc,
169  fragments,
171  /*render_info=*/nullptr,
172  /*rowid_lookup_key=*/-1);
173 
174  auto clock_begin = timer_start();
175  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
176  kernel_queue_time_ms_ += timer_stop(clock_begin);
177 
178  current_fragment_kernel.run(this, 0, shared_context);
179  }
180  const auto& proj_fragment_results = shared_context.getFragmentResults();
181  if (proj_fragment_results.empty()) {
182  continue;
183  }
184  const auto& proj_fragment_result = proj_fragment_results[0];
185  const auto proj_result_set = proj_fragment_result.first;
186  CHECK(proj_result_set);
187  cb({outer_fragments[fragment_index], fragment_index, proj_result_set},
188  table_update_metadata);
189  }
190 
192  auto td = cat.getMetadataForTable(table_desc_for_update->tableId);
193  TableOptimizer table_optimizer{td, this, cat};
194  table_optimizer.recomputeMetadataUnlocked(table_update_metadata);
195  }
196  return table_update_metadata;
197 }
SQLTypeInfo getColumnType(const size_t col_idx) const
bool is_agg(const Analyzer::Expr *expr)
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::string cat(Ts &&...args)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:143
const std::vector< uint64_t > & getFragOffsets()
size_t const getFragmentIndex() const
Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanu...
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
bool g_enable_auto_metadata_update
TableUpdateMetadata executeUpdate(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &table_infos, const TableDescriptor *updated_table_desc, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const UpdateLogForFragment::Callback &cb, const bool is_agg)
#define CHECK_GT(x, y)
Definition: Logger.h:305
Container for compilation results and assorted options for a single execution unit.
std::function< void(const UpdateLogForFragment &, TableUpdateMetadata &)> Callback
Definition: Execute.h:370
constexpr double a
Definition: Utm.h:32
std::vector< TargetValue > getEntryAt(const size_t index) const override
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
Definition: Fragmenter.h:86
FragmentInfoType const & getFragmentInfo() const
size_t fragment_index_
Definition: Execute.h:376
std::vector< TargetValue > getTranslatedEntryAt(const size_t index) const override
std::shared_ptr< ResultSet > rs_
Definition: Execute.h:377
void run(Executor *executor, const size_t thread_idx, SharedKernelContext &shared_context)
std::unordered_map< shared::TableKey, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
FragmentInfoType const & fragment_info_
Definition: Execute.h:375
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
#define CHECK(condition)
Definition: Logger.h:291
std::vector< size_t > fragment_ids
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
UpdateLogForFragment(FragmentInfoType const &fragment_info, size_t const, const std::shared_ptr< ResultSet > &rs)
size_t const getRowCount() const override
Descriptor for the fragments required for an execution kernel.
#define VLOG(n)
Definition: Logger.h:388
Type timer_start()
Definition: measure.h:42
size_t const getEntryCount() const override