OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
JoinFilterPushDown.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "JoinFilterPushDown.h"
18 #include "DeepCopyVisitor.h"
19 #include "RelAlgExecutor.h"
20 
21 namespace {
22 
24  std::shared_ptr<Analyzer::Expr> visitColumnVar(
25  const Analyzer::ColumnVar* col_var) const override {
26  return makeExpr<Analyzer::ColumnVar>(
27  col_var->get_type_info(), col_var->getColumnKey(), 0);
28  }
29 };
30 
32  : public ScalarExprVisitor<std::unordered_set<InputColDescriptor>> {
33  std::unordered_set<InputColDescriptor> visitColumnVar(
34  const Analyzer::ColumnVar* col_var) const override {
35  const auto& column_key = col_var->getColumnKey();
36  return {InputColDescriptor(
37  column_key.column_id, column_key.table_id, column_key.db_id, 0)};
38  }
39 
40  public:
41  std::unordered_set<InputColDescriptor> aggregateResult(
42  const std::unordered_set<InputColDescriptor>& aggregate,
43  const std::unordered_set<InputColDescriptor>& next_result) const override {
44  auto result = aggregate;
45  result.insert(next_result.begin(), next_result.end());
46  return result;
47  }
48 };
49 
50 } // namespace
51 
60  const std::vector<std::shared_ptr<Analyzer::Expr>>& filter_expressions,
61  const CompilationOptions& co,
62  const ExecutionOptions& eo) {
63  CollectInputColumnsVisitor input_columns_visitor;
64  std::list<std::shared_ptr<Analyzer::Expr>> quals;
65  std::unordered_set<InputColDescriptor> input_column_descriptors;
66  BindFilterToOutermostVisitor bind_filter_to_outermost;
67  for (const auto& filter_expr : filter_expressions) {
68  input_column_descriptors = input_columns_visitor.aggregateResult(
69  input_column_descriptors, input_columns_visitor.visit(filter_expr.get()));
70  quals.push_back(bind_filter_to_outermost.visit(filter_expr.get()));
71  }
72  std::vector<InputDescriptor> input_descs;
73  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
74  for (const auto& input_col_desc : input_column_descriptors) {
75  if (input_descs.empty()) {
76  input_descs.push_back(input_col_desc.getScanDesc());
77  } else {
78  CHECK(input_col_desc.getScanDesc() == input_descs.front());
79  }
80  input_col_descs.push_back(std::make_shared<const InputColDescriptor>(input_col_desc));
81  }
82  const auto count_expr =
83  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
84  kCOUNT,
85  nullptr,
86  false,
87  nullptr);
88  RelAlgExecutionUnit ra_exe_unit{input_descs,
89  input_col_descs,
90  {},
91  quals,
92  {},
93  {},
94  {count_expr.get()},
95  {},
96  nullptr,
97  SortInfo(),
98  0};
99  size_t one{1};
100  ResultSetPtr filtered_result;
101  const auto table_infos = get_table_infos(input_descs, executor_);
102  CHECK_EQ(size_t(1), table_infos.size());
103  const size_t total_rows_upper_bound = table_infos.front().info.getNumTuplesUpperBound();
104  try {
105  ColumnCacheMap column_cache;
106  filtered_result = executor_->executeWorkUnit(
107  one, true, table_infos, ra_exe_unit, co, eo, nullptr, false, column_cache);
108  } catch (...) {
109  return {false, 1.0, 0};
110  }
111  const auto count_row = filtered_result->getNextRow(false, false);
112  CHECK_EQ(size_t(1), count_row.size());
113  const auto& count_tv = count_row.front();
114  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
115  CHECK(count_scalar_tv);
116  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
117  CHECK(count_ptr);
118  const auto rows_passing = *count_ptr;
119  const auto rows_total = std::max(total_rows_upper_bound, size_t(1));
120  return {true, static_cast<float>(rows_passing) / rows_total, total_rows_upper_bound};
121 }
122 
127 std::vector<PushedDownFilterInfo> RelAlgExecutor::selectFiltersToBePushedDown(
128  const RelAlgExecutor::WorkUnit& work_unit,
129  const CompilationOptions& co,
130  const ExecutionOptions& eo) {
131  const auto all_push_down_candidates =
133  work_unit.input_permutation,
134  work_unit.left_deep_join_input_sizes);
135  std::vector<PushedDownFilterInfo> selective_push_down_candidates;
136  const auto ti = get_table_infos(work_unit.exe_unit.input_descs, executor_);
138  for (const auto& candidate : all_push_down_candidates) {
139  const auto selectivity = getFilterSelectivity(candidate.filter_expressions, co, eo);
140  if (selectivity.is_valid && selectivity.isFilterSelectiveEnough()) {
141  selective_push_down_candidates.push_back(candidate);
142  }
143  }
144  }
145  return selective_push_down_candidates;
146 }
147 
149  const RaExecutionSequence& seq,
150  const CompilationOptions& co,
151  const ExecutionOptions& eo,
152  RenderInfo* render_info,
153  const int64_t queue_time_ms) {
154  // we currently do not fully support filter push down with
155  // multi-step execution and/or with subqueries
156  // TODO(Saman): add proper caching to enable filter push down for all cases
157  const auto& subqueries = getSubqueries();
158  if (seq.size() > 1 || !subqueries.empty()) {
159  if (eo.just_calcite_explain) {
160  return ExecutionResult(std::vector<PushedDownFilterInfo>{},
162  }
163  auto eo_modified = eo;
164  eo_modified.find_push_down_candidates = false;
165  eo_modified.just_calcite_explain = false;
166 
167  // Dispatch the subqueries first
168  for (auto subquery : subqueries) {
169  // Execute the subquery and cache the result.
170  RelAlgExecutor ra_executor(executor_);
171  const auto subquery_ra = subquery->getRelAlg();
172  CHECK(subquery_ra);
173  RaExecutionSequence subquery_seq(subquery_ra, executor_);
174  auto result =
175  ra_executor.executeRelAlgSeq(subquery_seq, co, eo_modified, nullptr, 0);
176  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
177  }
178  return executeRelAlgSeq(seq, co, eo_modified, render_info, queue_time_ms);
179  }
180  // else
181  return executeRelAlgSeq(seq, co, eo, render_info, queue_time_ms);
182 }
189  const std::vector<InputTableInfo>& table_infos) {
190  if (table_infos.size() < 2) {
191  return false;
192  }
193  // we currently do not support filter push down when there is a self-join involved:
194  // TODO(Saman): prevent Calcite from optimizing self-joins to remove this exclusion
195  std::unordered_set<shared::TableKey> table_keys;
196  for (auto ti : table_infos) {
197  if (table_keys.find(ti.table_key) == table_keys.end()) {
198  table_keys.insert(ti.table_key);
199  } else {
200  // a self-join is involved
201  return false;
202  }
203  }
204  // TODO(Saman): add some extra heuristics to avoid preflight count and push down if it
205  // is not going to be helpful.
206  return true;
207 }
208 
215 std::vector<PushedDownFilterInfo> find_push_down_filters(
216  const RelAlgExecutionUnit& ra_exe_unit,
217  const std::vector<size_t>& input_permutation,
218  const std::vector<size_t>& left_deep_join_input_sizes) {
219  std::vector<PushedDownFilterInfo> result;
220  if (left_deep_join_input_sizes.empty()) {
221  return result;
222  }
223  std::vector<size_t> input_size_prefix_sums(left_deep_join_input_sizes.size());
224  std::partial_sum(left_deep_join_input_sizes.begin(),
225  left_deep_join_input_sizes.end(),
226  input_size_prefix_sums.begin());
227  std::vector<int> to_original_rte_idx(ra_exe_unit.input_descs.size(),
228  ra_exe_unit.input_descs.size());
229  if (!input_permutation.empty()) {
230  CHECK_EQ(to_original_rte_idx.size(), input_permutation.size());
231  for (size_t i = 0; i < input_permutation.size(); ++i) {
232  CHECK_LT(input_permutation[i], to_original_rte_idx.size());
233  CHECK_EQ(static_cast<size_t>(to_original_rte_idx[input_permutation[i]]),
234  to_original_rte_idx.size());
235  to_original_rte_idx[input_permutation[i]] = i;
236  }
237  } else {
238  std::iota(to_original_rte_idx.begin(), to_original_rte_idx.end(), 0);
239  }
240  std::unordered_map<int, std::vector<std::shared_ptr<Analyzer::Expr>>>
241  filters_per_nesting_level;
242  for (const auto& level_conditions : ra_exe_unit.join_quals) {
244  for (const auto& cond : level_conditions.quals) {
245  const auto rte_indices = visitor.visit(cond.get());
246  if (rte_indices.size() > 1) {
247  continue;
248  }
249  const int rte_idx = (!rte_indices.empty()) ? *rte_indices.cbegin() : 0;
250  if (!rte_idx) {
251  continue;
252  }
253  CHECK_GE(rte_idx, 0);
254  CHECK_LT(static_cast<size_t>(rte_idx), to_original_rte_idx.size());
255  filters_per_nesting_level[to_original_rte_idx[rte_idx]].push_back(cond);
256  }
257  }
258  for (const auto& kv : filters_per_nesting_level) {
259  CHECK_GE(kv.first, 0);
260  CHECK_LT(static_cast<size_t>(kv.first), input_size_prefix_sums.size());
261  size_t input_prev = (kv.first > 1) ? input_size_prefix_sums[kv.first - 2] : 0;
262  size_t input_start = kv.first ? input_size_prefix_sums[kv.first - 1] : 0;
263  size_t input_next = input_size_prefix_sums[kv.first];
264  result.emplace_back(
265  PushedDownFilterInfo{kv.second, input_prev, input_start, input_next});
266  }
267  return result;
268 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
const std::vector< size_t > left_deep_join_input_sizes
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
RelAlgExecutionUnit exe_unit
ExecutionResult executeRelAlgQueryWithFilterPushDown(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
FilterSelectivity getFilterSelectivity(const std::vector< std::shared_ptr< Analyzer::Expr >> &filter_expressions, const CompilationOptions &co, const ExecutionOptions &eo)
std::vector< InputDescriptor > input_descs
#define CHECK_GE(x, y)
Definition: Logger.h:306
std::vector< PushedDownFilterInfo > selectFiltersToBePushedDown(const RelAlgExecutor::WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo)
std::shared_ptr< ResultSet > ResultSetPtr
T visit(const Analyzer::Expr *expr) const
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
std::unordered_set< InputColDescriptor > aggregateResult(const std::unordered_set< InputColDescriptor > &aggregate, const std::unordered_set< InputColDescriptor > &next_result) const override
bool to_gather_info_for_filter_selectivity(const std::vector< InputTableInfo > &table_infos)
const JoinQualsPerNestingLevel join_quals
A container for relational algebra descriptors defining the execution order for a relational algebra ...
DEVICE void partial_sum(ARGS &&...args)
Definition: gpu_enabled.h:87
std::vector< PushedDownFilterInfo > find_push_down_filters(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< size_t > &input_permutation, const std::vector< size_t > &left_deep_join_input_sizes)
bool g_bigint_count
const std::vector< size_t > input_permutation
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:79
#define CHECK_LT(x, y)
Definition: Logger.h:303
const shared::ColumnKey & getColumnKey() const
Definition: Analyzer.h:198
std::shared_ptr< Analyzer::Expr > visitColumnVar(const Analyzer::ColumnVar *col_var) const override
DEVICE void iota(ARGS &&...args)
Definition: gpu_enabled.h:69
Definition: sqldefs.h:81
std::unordered_map< shared::TableKey, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
std::unordered_set< InputColDescriptor > visitColumnVar(const Analyzer::ColumnVar *col_var) const override
#define CHECK(condition)
Definition: Logger.h:291
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
Definition: sqltypes.h:72
Executor * executor_