OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RelAlgExecutionDescriptor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
20 #include "QueryEngine/RelAlgDag.h"
21 
22 #include <boost/graph/topological_sort.hpp>
23 
24 #include <algorithm>
25 
27  : filter_push_down_enabled_(false)
28  , success_(true)
29  , execution_time_ms_(0)
30  , type_(QueryResult) {}
31 
32 ExecutionResult::ExecutionResult(const std::shared_ptr<ResultSet>& rows,
33  const std::vector<TargetMetaInfo>& targets_meta)
34  : result_(rows)
35  , targets_meta_(targets_meta)
36  , filter_push_down_enabled_(false)
37  , success_(true)
38  , execution_time_ms_(0)
39  , type_(QueryResult) {}
40 
42  const std::vector<TargetMetaInfo>& targets_meta)
43  : result_(std::move(result))
44  , targets_meta_(targets_meta)
45  , filter_push_down_enabled_(false)
46  , success_(true)
47  , execution_time_ms_(0)
48  , type_(QueryResult) {}
49 
51  : targets_meta_(that.targets_meta_)
52  , pushed_down_filter_info_(that.pushed_down_filter_info_)
53  , filter_push_down_enabled_(that.filter_push_down_enabled_)
54  , success_(that.success_)
55  , execution_time_ms_(that.execution_time_ms_)
56  , type_(that.type_) {
57  if (!pushed_down_filter_info_.empty() ||
59  return;
60  }
61  result_ = that.result_;
62 }
63 
65  : targets_meta_(std::move(that.targets_meta_))
66  , pushed_down_filter_info_(std::move(that.pushed_down_filter_info_))
67  , filter_push_down_enabled_(std::move(that.filter_push_down_enabled_))
68  , success_(that.success_)
69  , execution_time_ms_(that.execution_time_ms_)
70  , type_(that.type_) {
71  if (!pushed_down_filter_info_.empty() ||
73  return;
74  }
75  result_ = std::move(that.result_);
76 }
77 
79  const std::vector<PushedDownFilterInfo>& pushed_down_filter_info,
80  bool filter_push_down_enabled)
81  : pushed_down_filter_info_(pushed_down_filter_info)
82  , filter_push_down_enabled_(filter_push_down_enabled)
83  , success_(true)
84  , execution_time_ms_(0)
85  , type_(QueryResult) {}
86 
88  if (!that.pushed_down_filter_info_.empty() ||
89  (that.filter_push_down_enabled_ && that.pushed_down_filter_info_.empty())) {
92  return *this;
93  }
94  result_ = that.result_;
96  success_ = that.success_;
98  type_ = that.type_;
99  return *this;
100 }
101 
102 const std::vector<PushedDownFilterInfo>& ExecutionResult::getPushedDownFilterInfo()
103  const {
105 }
106 
107 void ExecutionResult::updateResultSet(const std::string& query,
108  RType type,
109  bool success) {
110  targets_meta_.clear();
111  pushed_down_filter_info_.clear();
112  success_ = success;
113  type_ = type;
114  result_ = std::make_shared<ResultSet>(query);
115 }
116 
118  if (!empty()) {
119  return getRows()->getExplanation();
120  }
121  return {};
122 }
123 
124 void RaExecutionDesc::setResult(const ExecutionResult& result) {
125  result_ = result;
126  body_->setContextData(this);
127 }
128 
130  return body_;
131 }
132 
133 namespace {
134 
136  std::unordered_map<const RelAlgNode*, int>& node_ptr_to_vert_idx) {
137  DAG graph(1);
138  graph[0] = sink;
139  node_ptr_to_vert_idx.emplace(std::make_pair(sink, 0));
140  std::vector<const RelAlgNode*> stack(1, sink);
141  while (!stack.empty()) {
142  const auto node = stack.back();
143  stack.pop_back();
144  if (dynamic_cast<const RelScan*>(node)) {
145  continue;
146  }
147 
148  const auto input_num = node->inputCount();
149  switch (input_num) {
150  case 0:
151  CHECK(dynamic_cast<const RelLogicalValues*>(node) ||
152  dynamic_cast<const RelTableFunction*>(node));
153  case 1:
154  break;
155  case 2:
156  CHECK(dynamic_cast<const RelJoin*>(node) ||
157  dynamic_cast<const RelLeftDeepInnerJoin*>(node) ||
158  dynamic_cast<const RelLogicalUnion*>(node) ||
159  dynamic_cast<const RelTableFunction*>(node));
160  break;
161  default:
162  CHECK(dynamic_cast<const RelLeftDeepInnerJoin*>(node) ||
163  dynamic_cast<const RelLogicalUnion*>(node) ||
164  dynamic_cast<const RelTableFunction*>(node));
165  }
166  for (size_t i = 0; i < input_num; ++i) {
167  const auto input = node->getInput(i);
168  CHECK(input);
169  const bool visited = node_ptr_to_vert_idx.count(input) > 0;
170  if (!visited) {
171  node_ptr_to_vert_idx.insert(std::make_pair(input, node_ptr_to_vert_idx.size()));
172  }
173  boost::add_edge(node_ptr_to_vert_idx[input], node_ptr_to_vert_idx[node], graph);
174  if (!visited) {
175  graph[node_ptr_to_vert_idx[input]] = input;
176  stack.push_back(input);
177  }
178  }
179  }
180  return graph;
181 }
182 
183 std::unordered_set<Vertex> get_join_vertices(const std::vector<Vertex>& vertices,
184  const DAG& graph) {
185  std::unordered_set<Vertex> joins;
186  for (const auto vert : vertices) {
187  if (dynamic_cast<const RelLeftDeepInnerJoin*>(graph[vert])) {
188  joins.insert(vert);
189  continue;
190  }
191  if (!dynamic_cast<const RelJoin*>(graph[vert])) {
192  continue;
193  }
194  if (boost::out_degree(vert, graph) > 1) {
195  throw std::runtime_error("Join used more than once not supported yet");
196  }
197  auto [oe_iter, oe_end] = boost::out_edges(vert, graph);
198  CHECK(std::next(oe_iter) == oe_end);
199  const auto out_vert = boost::target(*oe_iter, graph);
200  if (!dynamic_cast<const RelJoin*>(graph[out_vert])) {
201  joins.insert(vert);
202  }
203  }
204  return joins;
205 }
206 
207 } // namespace
208 
210  Executor* executor,
211  const bool build_sequence) {
212  CHECK(sink);
213  CHECK(executor);
214  if (dynamic_cast<const RelScan*>(sink) || dynamic_cast<const RelJoin*>(sink)) {
215  throw std::runtime_error("Query not supported yet");
216  }
217  executor_ = executor;
219 
220  boost::topological_sort(graph_, std::back_inserter(ordering_));
221  std::reverse(ordering_.begin(), ordering_.end());
222 
223  ordering_ = mergeSortWithInput(ordering_, graph_);
224  joins_ = get_join_vertices(ordering_, graph_);
225 
226  if (build_sequence) {
227  while (next()) {
228  // noop
229  }
233  skipQuerySteps();
234  }
235  }
236 }
237 
238 RaExecutionSequence::RaExecutionSequence(std::unique_ptr<RaExecutionDesc> exec_desc) {
239  descs_.emplace_back(std::move(exec_desc));
240 }
241 
243  auto checkQueryStepSkippable =
244  [&](RelAlgNode const* node, QueryPlanHash key, size_t step_id) {
245  CHECK_GE(step_id, 0);
246  if (executor_->getResultSetRecyclerHolder().hasCachedQueryResultSet(key)) {
247  cached_query_steps_.insert(step_id);
248  cached_resultset_keys_.emplace(-node->getId(), key);
249  const auto output_meta_info =
250  executor_->getResultSetRecyclerHolder().getOutputMetaInfo(key);
251  CHECK(output_meta_info);
252  node->setOutputMetainfo(*output_meta_info);
253  }
254  };
255  while (current_vertex_ < ordering_.size()) {
256  auto vert = ordering_[current_vertex_++];
257  if (joins_.count(vert)) {
258  continue;
259  }
260  auto& node = graph_[vert];
261  CHECK(node);
262  if (dynamic_cast<const RelScan*>(node)) {
263  scan_count_++;
264  continue;
265  }
266  descs_.emplace_back(std::make_unique<RaExecutionDesc>(node));
267  auto logical_union = dynamic_cast<const RelLogicalUnion*>(node);
268  if (logical_union) {
269  has_step_for_union_ = true;
270  }
271  auto extracted_query_plan_dag =
274  !boost::iequals(extracted_query_plan_dag.extracted_dag, EMPTY_QUERY_PLAN)) {
275  SortInfo sort_info;
276  if (auto sort_node = dynamic_cast<const RelSort*>(node)) {
277  sort_info = SortInfo::createFromSortNode(sort_node);
278  }
280  node->getQueryPlanDagHash(), sort_info);
281  checkQueryStepSkippable(node, cache_key, descs_.size() - 1);
282  }
283  return descs_.back().get();
284  }
285  return nullptr;
286 }
287 
289  const std::vector<Vertex>& vertices,
290  const DAG& graph) {
291  DAG::in_edge_iterator ie_iter, ie_end;
292  std::unordered_set<Vertex> inputs;
293  for (const auto vert : vertices) {
294  if (const auto sort = dynamic_cast<const RelSort*>(graph[vert])) {
295  boost::tie(ie_iter, ie_end) = boost::in_edges(vert, graph);
296  CHECK(size_t(1) == sort->inputCount() && boost::next(ie_iter) == ie_end);
297  if (sort->isLimitDelivered()) {
298  has_limit_clause_ = true;
299  }
300  const auto in_vert = boost::source(*ie_iter, graph);
301  const auto input = graph[in_vert];
302  if (dynamic_cast<const RelScan*>(input)) {
303  throw std::runtime_error("Standalone sort not supported yet");
304  }
305  if (boost::out_degree(in_vert, graph) > 1) {
306  throw std::runtime_error("Sort's input node used by others not supported yet");
307  }
308  inputs.insert(in_vert);
309  }
310  }
311 
312  std::vector<Vertex> new_vertices;
313  for (const auto vert : vertices) {
314  if (inputs.count(vert)) {
315  continue;
316  }
317  new_vertices.push_back(vert);
318  }
319  return new_vertices;
320 }
321 
323  if (descs_.empty()) {
324  return nullptr;
325  }
326  if (descs_.size() == 1) {
327  return nullptr;
328  }
329  CHECK_GE(descs_.size(), size_t(2));
330  auto itr = descs_.rbegin();
331  return (++itr)->get();
332 }
333 
335  const auto pushChildNodes = [&](auto& stack, const auto node_id) {
336  auto [in_edge_iter, in_edge_end] = boost::in_edges(node_id, graph_);
337  while (in_edge_iter != in_edge_end) {
338  const auto child_node_id = boost::source(*in_edge_iter, graph_);
339  stack.push(graph_[child_node_id]);
340  std::advance(in_edge_iter, 1);
341  }
342  };
343  auto top_node_id = descs_.size() - 1;
344  auto top_res = skippable_steps_.emplace(top_node_id, std::unordered_set<int>{});
345  CHECK(top_res.second);
346  for (auto it = descs_.begin(); it != std::prev(descs_.end()); ++it) {
347  const auto step = it->get();
348  const auto body = step->getBody();
349  const auto step_id = static_cast<int>(std::distance(descs_.begin(), it));
350  auto res = skippable_steps_.emplace(step_id, std::unordered_set<int>{});
351  CHECK(res.second);
352  skippable_steps_[top_node_id].insert(step_id); // top-desc can skip all child descs
353  std::stack<const RelAlgNode*> stack;
354  pushChildNodes(stack, node_ptr_to_vert_idx_[body]);
355  while (!stack.empty()) {
356  auto child_node = stack.top();
357  stack.pop();
358  // descs_ is based on the topologically sorted (flattened) DAG, so we can limit the
359  // search range for child descs
360  auto is_desc_body = std::find_if(
361  descs_.begin(), it, [&child_node](std::unique_ptr<RaExecutionDesc>& ptr) {
362  return ptr->getBody() == child_node;
363  });
364  if (is_desc_body != it) {
365  // due to the topological sorting of query plan DAG, we can avoid visiting "all"
366  // child nodes
367  const auto child_step_id =
368  static_cast<int>(std::distance(descs_.begin(), is_desc_body));
369  skippable_steps_[step_id].insert(child_step_id);
370  skippable_steps_[step_id].insert(skippable_steps_[child_step_id].begin(),
371  skippable_steps_[child_step_id].end());
372  } else {
373  pushChildNodes(stack, node_ptr_to_vert_idx_[child_node]);
374  }
375  }
376  }
377 }
378 
380  CHECK_LE(cached_query_steps_.size(), descs_.size());
381 
382  // extract skippable query steps`
383  std::unordered_set<int> skippable_query_steps;
384  for (const auto cached_step_id : cached_query_steps_) {
385  auto it = skippable_steps_.find(cached_step_id);
386  CHECK(it != skippable_steps_.end());
387  const auto& child_steps = it->second;
388  std::for_each(
389  child_steps.begin(), child_steps.end(), [&](const auto& skippable_step_id) {
390  if (skippable_step_id != cached_step_id) {
391  skippable_query_steps.insert(skippable_step_id);
392  }
393  });
394  }
395 
396  // modify query step sequence based on the skippable query steps
397  if (!skippable_query_steps.empty()) {
398  std::vector<std::unique_ptr<RaExecutionDesc>> new_descs;
399  for (size_t step_id = 0; step_id < descs_.size(); ++step_id) {
400  const auto body = descs_[step_id]->getBody();
401  if (!skippable_query_steps.count(step_id)) {
402  new_descs.push_back(std::make_unique<RaExecutionDesc>(body));
403  }
404  }
405  const auto prev_num_steps = descs_.size();
406  if (!new_descs.empty()) {
407  std::swap(descs_, new_descs);
408  }
409  VLOG(1) << "Skip " << prev_num_steps - descs_.size() << " query steps from "
410  << prev_num_steps << " steps";
411  }
412 
413  for (const auto& desc : descs_) {
414  // remove cached resultset info for each desc since
415  // it is not skipped
416  auto body = desc->getBody();
417  auto it = cached_resultset_keys_.find(-body->getId());
418  if (it != cached_resultset_keys_.end()) {
419  cached_resultset_keys_.erase(it);
420  }
421  }
422 }
423 
424 std::optional<size_t> RaExecutionSequence::nextStepId(const bool after_broadcast) const {
425  if (after_broadcast) {
426  if (current_vertex_ == ordering_.size()) {
427  return std::nullopt;
428  }
429  return descs_.size() + stepsToNextBroadcast();
430  } else if (current_vertex_ == ordering_.size()) {
431  return std::nullopt;
432  } else {
433  return descs_.size();
434  }
435 }
436 
438  if (current_vertex_ == ordering_.size()) {
439  // All descriptors visited, execution finished
440  return true;
441  } else {
442  const auto next_step_id = nextStepId(true);
443  if (!next_step_id || (*next_step_id == totalDescriptorsCount())) {
444  // One step remains (the current vertex), or all remaining steps can be executed
445  // without another broadcast (i.e. on the aggregator)
446  return true;
447  }
448  }
449  return false;
450 }
451 
452 namespace {
453 struct MatchBody {
454  unsigned const body_id_;
455  bool operator()(std::unique_ptr<RaExecutionDesc> const& desc) const {
456  return desc->getBody()->getId() == body_id_;
457  }
458 };
459 } // namespace
460 
461 // Search for RaExecutionDesc* by body, starting at start_idx and decrementing to 0.
463  unsigned const body_id,
464  size_t const start_idx) const {
465  CHECK_LT(start_idx, descs_.size());
466  auto const from_end = descs_.size() - (start_idx + 1);
467  MatchBody const match_body{body_id};
468  auto const itr = std::find_if(descs_.rbegin() + from_end, descs_.rend(), match_body);
469  return itr == descs_.rend() ? nullptr : itr->get();
470 }
471 
473  size_t num_descriptors = 0;
474  size_t crt_vertex = 0;
475  while (crt_vertex < ordering_.size()) {
476  auto vert = ordering_[crt_vertex++];
477  if (joins_.count(vert)) {
478  continue;
479  }
480  auto& node = graph_[vert];
481  CHECK(node);
482  if (dynamic_cast<const RelScan*>(node)) {
483  continue;
484  }
485  ++num_descriptors;
486  }
487  return num_descriptors;
488 }
489 
491  size_t steps_to_next_broadcast = 0;
492  auto crt_vertex = current_vertex_;
493  while (crt_vertex < ordering_.size()) {
494  auto vert = ordering_[crt_vertex++];
495  auto node = graph_[vert];
496  CHECK(node);
497  if (joins_.count(vert)) {
498  auto join_node = dynamic_cast<const RelLeftDeepInnerJoin*>(node);
499  CHECK(join_node);
500  for (size_t i = 0; i < join_node->inputCount(); i++) {
501  const auto input = join_node->getInput(i);
502  if (dynamic_cast<const RelScan*>(input)) {
503  return steps_to_next_broadcast;
504  }
505  }
506  if (crt_vertex < ordering_.size() - 1) {
507  // Force the parent node of the RelLeftDeepInnerJoin to run on the aggregator.
508  // Note that crt_vertex has already been incremented once above for the join node
509  // -- increment it again to account for the parent node of the join
510  ++steps_to_next_broadcast;
511  ++crt_vertex;
512  continue;
513  } else {
514  CHECK_EQ(crt_vertex, ordering_.size() - 1);
515  // If the join node parent is the last node in the tree, run all remaining steps
516  // on the aggregator
517  return ++steps_to_next_broadcast;
518  }
519  }
520  if (auto sort = dynamic_cast<const RelSort*>(node)) {
521  CHECK_EQ(sort->inputCount(), size_t(1));
522  node = sort->getInput(0);
523  }
524  if (dynamic_cast<const RelScan*>(node)) {
525  return steps_to_next_broadcast;
526  }
527  if (dynamic_cast<const RelModify*>(node)) {
528  // Modify runs on the leaf automatically, run the same node as a noop on the
529  // aggregator
530  ++steps_to_next_broadcast;
531  continue;
532  }
533  if (auto project = dynamic_cast<const RelProject*>(node)) {
534  if (project->hasWindowFunctionExpr()) {
535  ++steps_to_next_broadcast;
536  continue;
537  }
538  }
539  for (size_t input_idx = 0; input_idx < node->inputCount(); input_idx++) {
540  if (dynamic_cast<const RelScan*>(node->getInput(input_idx))) {
541  return steps_to_next_broadcast;
542  }
543  }
544  ++steps_to_next_broadcast;
545  }
546  return steps_to_next_broadcast;
547 }
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
Definition: RelAlgDag.h:850
#define CHECK_EQ(x, y)
Definition: Logger.h:301
static ExtractedQueryPlanDag extractQueryPlanDag(const RelAlgNode *top_node, Executor *executor)
bool g_use_query_resultset_cache
Definition: Execute.cpp:160
std::vector< Vertex > mergeSortWithInput(const std::vector< Vertex > &vertices, const DAG &graph)
static size_t applyLimitClauseToCacheKey(size_t cache_key, SortInfo const &sort_info)
bool g_allow_query_step_skipping
Definition: Execute.cpp:163
ExecutionResult & operator=(const ExecutionResult &that)
const RelAlgNode * body_
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
#define CHECK_GE(x, y)
Definition: Logger.h:306
std::shared_ptr< ResultSet > ResultSetPtr
std::unordered_set< Vertex > joins_
static SortInfo createFromSortNode(const RelSort *sort_node)
bool g_enable_data_recycler
Definition: Execute.cpp:158
bool operator()(std::unique_ptr< RaExecutionDesc > const &desc) const
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:863
tuple rows
Definition: report.py:114
DAG build_dag(const RelAlgNode *sink, std::unordered_map< const RelAlgNode *, int > &node_ptr_to_vert_idx)
std::unordered_set< int > cached_query_steps_
unsigned getId() const
Definition: RelAlgDag.h:869
std::vector< std::unique_ptr< RaExecutionDesc > > descs_
const std::shared_ptr< ResultSet > & getRows() const
void updateResultSet(const std::string &query_ra, RType type, bool success=true)
void setContextData(const RaExecutionDesc *context_data) const
Definition: RelAlgDag.h:845
bool g_enable_smem_group_by true
std::vector< PushedDownFilterInfo > pushed_down_filter_info_
#define CHECK_LT(x, y)
Definition: Logger.h:303
#define CHECK_LE(x, y)
Definition: Logger.h:304
std::unordered_map< const RelAlgNode *, int > node_ptr_to_vert_idx_
RaExecutionSequence(const RelAlgNode *, Executor *, const bool build_sequence=true)
const std::vector< PushedDownFilterInfo > & getPushedDownFilterInfo() const
std::unordered_set< Vertex > get_join_vertices(const std::vector< Vertex > &vertices, const DAG &graph)
std::vector< Vertex > ordering_
size_t QueryPlanHash
bool g_enable_watchdog false
Definition: Execute.cpp:80
constexpr char const * EMPTY_QUERY_PLAN
#define CHECK(condition)
Definition: Logger.h:291
const RelAlgNode * getBody() const
bool g_cluster
boost::adjacency_list< boost::setS, boost::vecS, boost::bidirectionalS, const RelAlgNode * > DAG
unsigned node_id(const rapidjson::Value &ra_node) noexcept
Definition: RelAlgDag.cpp:973
std::vector< TargetMetaInfo > targets_meta_
DEVICE void reverse(ARGS &&...args)
Definition: gpu_enabled.h:96
RaExecutionDesc * getDescriptorByBodyId(unsigned const body_id, size_t const start_idx) const
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
std::unordered_map< int, std::unordered_set< int > > skippable_steps_
std::unordered_map< int, QueryPlanHash > cached_resultset_keys_
#define VLOG(n)
Definition: Logger.h:388
std::optional< size_t > nextStepId(const bool after_broadcast) const