19 #include <boost/algorithm/cxx11/any_of.hpp>
29 bool operator()(std::shared_ptr<Analyzer::Expr>
const& qual) {
30 if (
auto oper = std::dynamic_pointer_cast<const Analyzer::BinOper>(qual)) {
40 std::vector<InnerOuterOrLoopQual>
result;
41 const auto lhs_tuple_expr =
43 const auto rhs_tuple_expr =
46 CHECK_EQ(static_cast<bool>(lhs_tuple_expr), static_cast<bool>(rhs_tuple_expr));
47 auto do_normalize_inner_outer_pair = [&
result, &condition](
52 auto inner_outer_pair =
57 std::make_pair(inner_outer_pair.first, inner_outer_pair.second),
false};
58 result.push_back(valid_qual);
61 result.push_back(invalid_qual);
65 const auto& lhs_tuple = lhs_tuple_expr->getTuple();
66 const auto& rhs_tuple = rhs_tuple_expr->getTuple();
67 CHECK_EQ(lhs_tuple.size(), rhs_tuple.size());
68 for (
size_t i = 0; i < lhs_tuple.size(); ++i) {
69 do_normalize_inner_outer_pair(
70 lhs_tuple[i].
get(), rhs_tuple[i].
get(),
executor_->getTemporaryTables());
89 if (dag_checker_res.first) {
90 VLOG(1) <<
"Stop DAG extraction (" << dag_checker_res.second <<
")";
94 auto& cached_dag = executor->getQueryPlanDagCache();
97 auto extracted_query_plan_dag = dag_extractor.getExtractedQueryPlanDagStr();
99 if (
auto sort_node = dynamic_cast<RelSort const*>(top_node)) {
103 auto child_dag = dag_extractor.getExtractedQueryPlanDagStr(1);
104 sort_node->getInput(0)->setQueryPlanDag(child_dag);
106 return {extracted_query_plan_dag, dag_extractor.isDagExtractionAvailable()};
111 std::optional<unsigned> left_deep_tree_id,
112 std::unordered_map<unsigned, JoinQualsPerNestingLevel> left_deep_tree_infos,
113 Executor* executor) {
119 auto& cached_dag = executor->getQueryPlanDagCache();
131 VLOG(1) <<
"Stop DAG extraction (Query plan dag cache reaches the maximum capacity)";
140 if (
auto table_func_node = dynamic_cast<const RelTableFunction*>(top_node)) {
141 for (
size_t i = 0; i < table_func_node->inputCount(); ++i) {
142 dag_extractor.
visit(table_func_node, table_func_node->getInput(i));
146 switch (num_child_node) {
151 if (
auto trans_join_node = dynamic_cast<const RelTranslatedJoin*>(top_node)) {
152 dag_extractor.
visit(trans_join_node, trans_join_node->getLHS());
153 dag_extractor.
visit(trans_join_node, trans_join_node->getRHS());
156 VLOG(1) <<
"Visit an invalid rel node while extracting query plan DAG: "
175 dag_extractor.
executor_->registerExtractedQueryPlanDag(
182 std::ostringstream oss;
188 if (cnt >= start_pos) {
189 oss << dag_node_id <<
"|";
197 std::optional<RelNodeId> retrieved_node_id) {
198 if (!retrieved_node_id) {
199 VLOG(1) <<
"Stop DAG extraction (Detect an invalid dag id)";
203 CHECK(retrieved_node_id.has_value());
211 std::optional<RelNodeId> retrieved_node_id) {
214 CHECK(retrieved_node_id.has_value());
228 for (
size_t i = 0; i < child_node->
inputCount(); i++) {
243 bool child_visited =
false;
245 if (
auto left_deep_joins = dynamic_cast<const RelLeftDeepInnerJoin*>(child_node)) {
248 VLOG(1) <<
"Stop DAG extraction (Detect non-supported join pattern)";
252 auto true_parent_node = parent_node;
253 std::shared_ptr<RelFilter> dummy_filter_node{
nullptr};
254 const auto inner_cond = left_deep_joins->getInnerCondition();
261 if (
auto cond = dynamic_cast<const RexOperator*>(inner_cond)) {
263 auto copied_inner_cond = copier.
visit(cond);
264 dummy_filter_node = std::make_shared<RelFilter>(copied_inner_cond);
266 true_parent_node = dummy_filter_node.get();
269 child_visited =
true;
270 }
else if (
auto translated_join_node =
271 dynamic_cast<const RelTranslatedJoin*>(child_node)) {
273 child_visited =
true;
276 if (!child_visited) {
288 CHECK(rel_trans_join);
307 auto fill_node_ids_to_dag_vec = [&](
const std::string& node_ids) {
308 auto node_ids_vec =
split(node_ids,
"|");
310 std::for_each(node_ids_vec.begin(),
311 std::prev(node_ids_vec.end()),
314 QueryPlanDAG current_plan_dag, after_rhs_visited, after_lhs_visited;
316 auto rhs_node = rel_trans_join->
getRHS();
317 std::unordered_set<size_t> rhs_input_keys, lhs_input_keys;
320 visit(rel_trans_join, rhs_node);
322 fill_node_ids_to_dag_vec(rhs_node->getQueryPlanDag());
328 auto lhs_node = rel_trans_join->
getLHS();
329 if (rel_trans_join->
getLHS()) {
331 visit(rel_trans_join, lhs_node);
333 fill_node_ids_to_dag_vec(lhs_node->getQueryPlanDag());
340 VLOG(1) <<
"Stop DAG extraction (Detect invalid query plan dag of join col(s))";
346 auto outer_table_identifier =
split(after_rhs_visited, current_plan_dag)[1];
347 auto hash_table_identfier =
split(after_lhs_visited, after_rhs_visited)[1];
350 auto inner_join_cols = rel_trans_join->
getJoinCols(
true);
351 auto inner_join_col_info =
353 boost::hash_combine(join_qual_info, inner_join_col_info);
354 auto outer_join_cols = rel_trans_join->
getJoinCols(
false);
355 auto outer_join_col_info =
357 boost::hash_combine(join_qual_info, outer_join_col_info);
359 std::unordered_set<size_t> collected_table_keys;
360 collected_table_keys.insert(lhs_input_keys.begin(), lhs_input_keys.end());
361 if (!inner_join_cols.empty() &&
362 inner_join_cols[0]->get_type_info().is_dict_encoded_type()) {
363 collected_table_keys.insert(rhs_input_keys.begin(), rhs_input_keys.end());
368 VLOG(2) <<
"Add hashtable access path"
369 <<
", inner join col info: " << inner_join_col_info
370 <<
" (access path: " << hash_table_identfier <<
")"
371 <<
", outer join col info: " << outer_join_col_info
372 <<
" (access path: " << outer_table_identifier <<
")";
379 std::move(collected_table_keys)));
382 VLOG(2) <<
"Add loop join access path, for LHS: " << outer_table_identifier
383 <<
", for RHS: " << hash_table_identfier <<
"\n";
398 for (
size_t input_idx = 0; input_idx < rel_left_deep_join->
inputCount(); ++input_idx) {
399 auto const input_node = rel_left_deep_join->
getInput(input_idx);
400 auto const scan_node =
dynamic_cast<const RelScan*
>(input_node);
403 target_table_key.
db_id = scan_node->getCatalog().getDatabaseId();
404 target_table_key.table_id = scan_node->getTableDescriptor()->tableId;
406 target_table_key.table_id = -1 * input_node->getId();
408 if (target_table_key == table_key) {
418 if (
auto col_var = dynamic_cast<const Analyzer::ColumnVar*>(col_info)) {
433 CHECK(rel_left_deep_join);
440 auto left_deep_tree_id = rel_left_deep_join->
getId();
442 if (!left_deep_join_info) {
444 VLOG(1) <<
"Stop DAG extraction (Detect Non-supported join pattern)";
459 for (
size_t level_idx = 0; level_idx < left_deep_join_info->size(); ++level_idx) {
460 const auto& current_level_join_conditions = left_deep_join_info->at(level_idx);
461 std::vector<const Analyzer::ColumnVar*> inner_join_cols;
462 std::vector<const Analyzer::ColumnVar*> outer_join_cols;
463 std::vector<std::shared_ptr<const Analyzer::Expr>> filter_ops;
464 int inner_input_idx{-1};
465 int outer_input_idx{-1};
466 OpInfo op_info{
"UNDEFINED",
"UNDEFINED",
"UNDEFINED"};
467 std::unordered_set<std::string> visited_filter_ops;
470 const bool found_eq_join_qual =
473 const bool nested_loop = !found_eq_join_qual;
474 const bool is_left_join = current_level_join_conditions.type ==
JoinType::LEFT;
482 bool is_geo_join{
false};
483 for (
const auto& join_qual : current_level_join_conditions.quals) {
484 auto qual_bin_oper = std::dynamic_pointer_cast<
const Analyzer::BinOper>(join_qual);
487 is_geo_join = qual_bin_oper->is_bbox_intersect_oper();
488 if (join_qual == current_level_join_conditions.quals.front()) {
490 op_info = OpInfo{
::toString(qual_bin_oper->get_optype()),
491 ::
toString(qual_bin_oper->get_qualifier()),
492 qual_bin_oper->get_type_info().to_string()};
499 if (!found_eq_join_qual && (is_left_join || col_pair_info.loop_join_qual)) {
505 if (visited_filter_ops.insert(join_qual_str).second) {
506 filter_ops.push_back(join_qual);
511 bool found_valid_col_vars =
false;
512 std::vector<const Analyzer::ColumnVar*> lhs_cvs, rhs_cvs;
513 if (col_pair_info.inner_outer.first && col_pair_info.inner_outer.second) {
515 if (
auto range_oper = dynamic_cast<const Analyzer::RangeOper*>(
516 col_pair_info.inner_outer.second)) {
517 lhs_cvs =
getColVar(range_oper->get_left_operand());
518 rhs_cvs =
getColVar(col_pair_info.inner_outer.first);
522 lhs_cvs =
getColVar(col_pair_info.inner_outer.first);
523 rhs_cvs =
getColVar(col_pair_info.inner_outer.second);
525 if (!lhs_cvs.empty() && !rhs_cvs.empty()) {
526 found_valid_col_vars =
true;
527 if (inner_input_idx == -1) {
529 get_input_idx(rel_left_deep_join, lhs_cvs.front()->getTableKey());
531 if (outer_input_idx == -1) {
533 get_input_idx(rel_left_deep_join, rhs_cvs.front()->getTableKey());
536 lhs_cvs.begin(), lhs_cvs.end(), std::back_inserter(inner_join_cols));
538 rhs_cvs.begin(), rhs_cvs.end(), std::back_inserter(outer_join_cols));
541 if (!found_valid_col_vars &&
542 visited_filter_ops.insert(join_qual_str).second) {
543 filter_ops.push_back(join_qual);
548 if (visited_filter_ops.insert(join_qual_str).second) {
549 filter_ops.push_back(join_qual);
553 if (!is_geo_join && (inner_join_cols.size() != outer_join_cols.size())) {
554 VLOG(1) <<
"Stop DAG extraction (Detect inner/outer col mismatch)";
574 if (inner_input_idx != -1 && outer_input_idx != -1) {
575 lhs = rel_left_deep_join->
getInput(inner_input_idx);
576 rhs = rel_left_deep_join->
getInput(outer_input_idx);
578 if (level_idx == 0) {
579 lhs = rel_left_deep_join->
getInput(0);
580 rhs = rel_left_deep_join->
getInput(1);
583 rhs = rel_left_deep_join->
getInput(level_idx + 1);
588 auto cur_translated_join_node =
589 std::make_shared<RelTranslatedJoin>(lhs,
596 current_level_join_conditions.type,
600 CHECK(cur_translated_join_node);
608 boost::hash_combine(cache_key, sort_info.
hashLimit());
bool isNestedLoopQual() const
std::optional< RelNodeId > addNodeIfAbsent(const RelAlgNode *)
#define IS_EQUIVALENCE(X)
void connectNodes(const RelNodeId parent_id, const RelNodeId child_id)
const RexScalar * getOuterCondition(const size_t nesting_level) const
void setQueryPlanDag(const std::string &extracted_query_plan_dag) const
constexpr QueryPlanHash EMPTY_HASHED_PLAN_DAG_KEY
const Expr * get_right_operand() const
void setRelNodeDagId(const size_t id) const
static std::pair< bool, std::string > hasNonSupportedNodeInDag(const RelAlgNode *rel_alg_node)
std::vector< const Analyzer::ColumnVar * > getJoinCols(bool lhs) const
static std::unordered_set< size_t > getScanNodeTableKey(RelAlgNode const *rel_alg_node)
size_t getQueryPlanDagHash() const
virtual T visit(const RexScalar *rex_scalar) const
const RelAlgNode * getRHS() const
size_t translateColVarsToInfoHash(std::vector< const Analyzer::ColumnVar * > &col_vars, bool col_id_only) const
DEVICE auto copy(ARGS &&...args)
bool is_bbox_intersect_oper() const
std::unique_lock< T > unique_lock
const RelAlgNode * getInput(const size_t idx) const
std::string toString(const Executor::ExtModuleKinds &kind)
const RelAlgNode * getLHS() const
std::size_t hash_value(RexAbstractInput const &rex_ab_input)
constexpr char const * EMPTY_QUERY_PLAN
static std::pair< InnerOuter, InnerOuterStringOpInfos > normalizeColumnPair(const Analyzer::Expr *lhs, const Analyzer::Expr *rhs, const TemporaryTables *temporary_tables, const bool is_bbox_intersect=false)
const Expr * get_left_operand() const
bool any_of(std::vector< Analyzer::Expr * > const &target_exprs)
const size_t inputCount() const
size_t getRelNodeDagId() const
unsigned node_id(const rapidjson::Value &ra_node) noexcept
std::vector< const Analyzer::ColumnVar * > collectColVars(const Analyzer::Expr *target)
int get_input_idx(RelAlgExecutionUnit const &ra_exe_unit, const shared::TableKey &outer_table_key)