33 std::vector<const Analyzer::ColumnVar*>
const& geo_args,
35 auto it = std::find_if(
37 return cv->getTableKey() == table_key;
39 return it == geo_args.end() ?
nullptr : *it;
50 ST_CONTAIN_FORCE_TABLE_REORDERING_TARGET_FUNC.begin(),
53 [target_func_name](std::string_view func_name) {
54 return target_func_name == func_name;
60 ST_INTERSECTS_FORCE_TABLE_REORDERING_TARGET_FUNC.begin(),
63 [target_func_name](std::string_view func_name) {
64 return target_func_name == func_name;
72 std::string
const& geo_func_name,
73 const std::vector<InputTableInfo>& table_infos) {
79 auto const inner_poly_outer_pt_pair =
80 shared::is_any<kPOLYGON, kMULTIPOLYGON>(inner_type) && outer_type ==
kPOINT;
81 auto const outer_poly_inner_pt_pair =
82 shared::is_any<kPOLYGON, kMULTIPOLYGON>(outer_type) && inner_type ==
kPOINT;
83 auto const force_swap_st_contains =
85 auto const force_swap_st_intersects =
89 for (
size_t i = 0; i < table_infos.size(); i++) {
90 if (table_infos[i].table_key == inner_arg_key) {
92 }
else if (table_infos[i].table_key == outer_arg_key) {
96 size_t first_listed_idx = std::min(inner_idx, outer_idx);
97 size_t first_listed_card = table_infos[first_listed_idx].info.getNumTuples();
98 size_t last_listed_idx = std::max(inner_idx, outer_idx);
99 size_t last_listed_card = table_infos[last_listed_idx].info.getNumTuples();
100 if (first_listed_card > last_listed_card) {
101 if (inner_arg_key == table_infos[first_listed_idx].table_key) {
103 return inner_poly_outer_pt_pair &&
104 (force_swap_st_contains || force_swap_st_intersects);
107 CHECK_EQ(outer_arg_key, table_infos[first_listed_idx].table_key);
108 return outer_poly_inner_pt_pair && force_swap_st_intersects;
120 const std::vector<InputTableInfo>& table_infos,
121 const Executor* executor) {
124 geo_func_finder.
visit(qual);
126 auto const inner_table_key = (*table_key_pair).inner_table_key;
127 auto const outer_table_key = (*table_key_pair).outer_table_key;
130 CHECK_NE(inner_table_key, outer_table_key);
132 auto const inner_table_cardinality =
134 auto const outer_table_cardinality =
136 auto inner_qual_decision = inner_table_cardinality > outer_table_cardinality
146 bool needs_table_reordering = inner_table_cardinality < outer_table_cardinality;
147 const auto outer_inner_card_ratio =
148 outer_table_cardinality /
static_cast<double>(inner_table_cardinality);
150 target_geo_func_name) ||
152 target_geo_func_name)) {
159 if (inner_cv->get_rte_idx() == 0 &&
160 (inner_cv->get_type_info().get_type() ==
kPOINT)) {
162 if (needs_table_reordering && outer_inner_card_ratio > 10.0 &&
163 inner_table_cardinality < 10000) {
169 VLOG(1) <<
"Force loop-join to avoid unexpected overhead of building large "
180 const auto outer_cv =
183 auto const inner_type = inner_cv->get_type_info().get_type();
184 auto const outer_type = outer_cv->get_type_info().get_type();
191 VLOG(1) <<
"Force reordering tables to enable a hash join for "
197 if (needs_table_reordering) {
200 VLOG(1) <<
"Try to reorder tables based on table cardinality";
214 VLOG(2) <<
"Detect geo join operator, initial_inner_table(db_id: "
215 << inner_table_key.db_id <<
", table_id: " << inner_table_key.table_id
216 <<
"), cardinality: " << inner_table_cardinality
217 <<
"), initial_outer_table(db_id: " << outer_table_key.db_id
218 <<
", table_id: " << outer_table_key.table_id
219 <<
"), cardinality: " << outer_table_cardinality
220 <<
"), inner_qual_decision: " << inner_qual_decision;
221 return {200, 200, inner_qual_decision};
227 std::vector<SQLTypes> geo_types_for_func;
228 for (
size_t i = 0; i < func_oper->getArity(); i++) {
229 const auto arg_expr = func_oper->
getArg(i);
232 geo_types_for_func.push_back(ti.get_type());
235 std::regex geo_func_regex(
"ST_[\\w]*");
236 std::smatch geo_func_match;
237 const auto& func_name = func_oper->getName();
238 if (geo_types_for_func.size() == 2 &&
239 std::regex_match(func_name, geo_func_match, geo_func_regex)) {
256 const auto normalized_bin_oper =
258 const auto& inner_outer = normalized_bin_oper.first;
260 auto lhs = bin_oper->get_left_operand();
261 if (
auto lhs_tuple = dynamic_cast<const Analyzer::ExpressionTuple*>(
262 bin_oper->get_left_operand())) {
263 lhs = lhs_tuple->getTuple().front().get();
266 if (lhs == inner_outer.front().first) {
268 }
else if (lhs == inner_outer.front().second) {
274 return {200, 200, inner_qual_decision};
277 return {100, 100, inner_qual_decision};
283 const std::vector<InputTableInfo>& table_infos,
284 const Executor* executor,
285 std::vector<std::map<node_t, InnerQualDecision>>& qual_detection_res) {
286 CHECK_EQ(left_deep_join_quals.size() + 1, table_infos.size());
287 std::vector<std::map<node_t, cost_t>> join_cost_graph(table_infos.size());
291 for (
const auto& current_level_join_conditions : left_deep_join_quals) {
292 for (
const auto& qual : current_level_join_conditions.quals) {
293 std::set<int> qual_nest_levels = visitor.
visit(qual.get());
294 if (qual_nest_levels.size() != 2) {
297 int lhs_nest_level = *qual_nest_levels.begin();
299 qual_nest_levels.erase(qual_nest_levels.begin());
300 int rhs_nest_level = *qual_nest_levels.begin();
304 qual_detection_res[lhs_nest_level][rhs_nest_level] = std::get<2>(qual_costing);
305 qual_detection_res[rhs_nest_level][lhs_nest_level] = std::get<2>(qual_costing);
306 const auto edge_it = join_cost_graph[lhs_nest_level].find(rhs_nest_level);
307 auto rhs_cost = std::get<1>(qual_costing);
308 if (edge_it == join_cost_graph[lhs_nest_level].end() ||
309 edge_it->second > rhs_cost) {
310 auto lhs_cost = std::get<0>(qual_costing);
311 join_cost_graph[lhs_nest_level][rhs_nest_level] = rhs_cost;
312 join_cost_graph[rhs_nest_level][lhs_nest_level] = lhs_cost;
316 return join_cost_graph;
329 for (
auto& inbound_for_node : inbound_) {
330 inbound_for_node.erase(from);
336 std::unordered_set<node_t> roots;
337 for (
node_t candidate = 0; candidate < inbound_.size(); ++candidate) {
338 if (inbound_[candidate].empty()) {
339 roots.insert(candidate);
358 const std::vector<std::map<node_t, cost_t>>& join_cost_graph) {
364 for (
size_t level_idx = 0; level_idx < left_deep_join_quals.size(); ++level_idx) {
366 dependency_tracking.
addEdge(level_idx, level_idx + 1);
369 return dependency_tracking;
376 const std::vector<std::map<node_t, cost_t>>& join_cost_graph,
377 const std::vector<InputTableInfo>& table_infos,
378 const std::function<
bool(
const node_t lhs_nest_level,
const node_t rhs_nest_level)>&
382 std::vector<std::map<node_t, InnerQualDecision>>& qual_normalization_res) {
383 std::vector<node_t> all_nest_levels(table_infos.size());
384 std::iota(all_nest_levels.begin(), all_nest_levels.end(), 0);
385 std::vector<node_t> input_permutation;
386 std::unordered_set<node_t> visited;
387 auto dependency_tracking =
389 auto schedulable_node = [&dependency_tracking, &visited](
const node_t node) {
390 const auto nodes_ready = dependency_tracking.getRoots();
391 return nodes_ready.find(node) != nodes_ready.end() &&
392 visited.find(node) == visited.end();
394 while (visited.size() < table_infos.size()) {
396 std::vector<node_t> remaining_nest_levels;
397 std::copy_if(all_nest_levels.begin(),
398 all_nest_levels.end(),
399 std::back_inserter(remaining_nest_levels),
401 CHECK(!remaining_nest_levels.empty());
403 const auto start_it = std::max_element(
404 remaining_nest_levels.begin(), remaining_nest_levels.end(), compare_node);
405 CHECK(start_it != remaining_nest_levels.end());
406 std::priority_queue<TraversalEdge, std::vector<TraversalEdge>, decltype(compare_edge)>
407 worklist(compare_edge);
418 if (remaining_nest_levels.size() == 2 && qual_normalization_res[start].size() == 1) {
419 auto inner_qual_decision = qual_normalization_res[start].begin()->second;
420 auto join_qual = left_deep_join_quals.begin()->quals;
423 bool (*)(
const Analyzer::ColumnVar*,
const Analyzer::ColumnVar*)>;
425 auto set_new_rte_idx = [](ColvarSet& cv_set,
int new_rte) {
427 cv_set.begin(), cv_set.end(), [new_rte](
const Analyzer::ColumnVar* cv) {
428 const_cast<Analyzer::ColumnVar*
>(cv)->set_rte_idx(new_rte);
438 auto analyze_join_qual = [&start,
439 &remaining_nest_levels,
440 &inner_qual_decision,
442 compare_node](
const std::shared_ptr<Analyzer::Expr>& lhs,
443 ColvarSet& lhs_colvar_set,
444 const std::shared_ptr<Analyzer::Expr>& rhs,
445 ColvarSet& rhs_colvar_set) {
446 if (!lhs || !rhs || lhs_colvar_set.empty() || rhs_colvar_set.empty()) {
447 return std::make_pair(Decision::IGNORE, start);
450 auto alternative_it = std::find_if(
451 remaining_nest_levels.begin(),
452 remaining_nest_levels.end(),
453 [start](
const size_t nest_level) {
return start != nest_level; });
454 CHECK(alternative_it != remaining_nest_levels.end());
455 auto alternative_rte = *alternative_it;
457 Decision decision = Decision::IGNORE;
461 bool is_outer_col_valid =
false;
462 auto check_expr_is_valid_col = [&is_outer_col_valid](
const Analyzer::Expr* expr) {
463 if (
auto expr_tuple = dynamic_cast<const Analyzer::ExpressionTuple*>(expr)) {
464 for (
auto& inner_expr : expr_tuple->getTuple()) {
466 HashJoin::getHashJoinColumn<Analyzer::ColumnVar>(inner_expr.get());
468 is_outer_col_valid =
false;
473 auto cv_from_expr = HashJoin::getHashJoinColumn<Analyzer::ColumnVar>(expr);
475 is_outer_col_valid =
false;
479 is_outer_col_valid =
true;
482 inner_rte = (*lhs_colvar_set.begin())->get_rte_idx();
483 outer_rte = (*rhs_colvar_set.begin())->get_rte_idx();
484 check_expr_is_valid_col(rhs.get());
486 inner_rte = (*rhs_colvar_set.begin())->get_rte_idx();
487 outer_rte = (*lhs_colvar_set.begin())->get_rte_idx();
488 check_expr_is_valid_col(lhs.get());
490 if (inner_rte >= 0 && outer_rte >= 0) {
491 const auto inner_cardinality =
492 table_infos[inner_rte].info.getNumTuplesUpperBound();
493 const auto outer_cardinality =
494 table_infos[outer_rte].info.getNumTuplesUpperBound();
496 if (inner_rte == static_cast<int>(start)) {
501 decision = is_outer_col_valid && inner_cardinality > outer_cardinality
505 CHECK_EQ(inner_rte, static_cast<int>(alternative_rte));
507 if (compare_node(inner_rte, start)) {
510 decision = Decision::IGNORE;
514 decision = Decision::KEEP;
520 if (decision == Decision::KEEP) {
521 return std::make_pair(decision, start);
522 }
else if (decision == Decision::SWAP) {
523 return std::make_pair(decision, alternative_rte);
525 return std::make_pair(Decision::IGNORE, start);
528 auto collect_colvars = [](
const std::shared_ptr<Analyzer::Expr> expr,
530 expr->collect_column_var(cv_set,
false);
533 auto adjust_reordering_logic = [&start, &start_edge, &start_it, set_new_rte_idx](
536 ColvarSet& lhs_colvar_set,
537 ColvarSet& rhs_colvar_set) {
538 CHECK(decision == Decision::SWAP);
539 start = alternative_rte;
540 set_new_rte_idx(lhs_colvar_set, alternative_rte);
541 set_new_rte_idx(rhs_colvar_set, *start_it);
542 start_edge.join_cost = 0;
543 start_edge.nest_level = start;
549 auto rhs = bin_op->get_own_right_operand();
550 if (
auto lhs_exp = dynamic_cast<Analyzer::ExpressionTuple*>(lhs.get())) {
555 auto& lhs_exprs = lhs_exp->getTuple();
556 auto& rhs_exprs = rhs_exp->getTuple();
557 CHECK_EQ(lhs_exprs.size(), rhs_exprs.size());
558 for (
size_t i = 0; i < lhs_exprs.size(); ++i) {
559 Decision decision{Decision::IGNORE};
560 int alternative_rte_idx = -1;
563 collect_colvars(lhs_exprs.at(i), lhs_colvar_set);
564 collect_colvars(rhs_exprs.at(i), rhs_colvar_set);
566 auto investigation_res =
567 analyze_join_qual(lhs, lhs_colvar_set, rhs, rhs_colvar_set);
568 decision = investigation_res.first;
569 if (decision == Decision::KEEP) {
570 return remaining_nest_levels;
572 alternative_rte_idx = investigation_res.second;
574 if (decision == Decision::SWAP) {
575 adjust_reordering_logic(
576 decision, alternative_rte_idx, lhs_colvar_set, rhs_colvar_set);
582 collect_colvars(lhs, lhs_colvar_set);
583 collect_colvars(rhs, rhs_colvar_set);
584 auto investigation_res =
585 analyze_join_qual(lhs, lhs_colvar_set, rhs, rhs_colvar_set);
586 if (investigation_res.first == Decision::KEEP) {
587 return remaining_nest_levels;
588 }
else if (investigation_res.first == Decision::SWAP) {
589 adjust_reordering_logic(investigation_res.first,
590 investigation_res.second,
598 VLOG(2) <<
"Table reordering starting with nest level " << start;
599 for (
const auto& graph_edge : join_cost_graph[*start_it]) {
600 const node_t succ = graph_edge.first;
601 if (!schedulable_node(succ)) {
605 for (
const auto& successor_edge : join_cost_graph[succ]) {
606 if (successor_edge.first == start) {
607 start_edge.
join_cost = successor_edge.second;
610 if (compare_edge(start_edge, succ_edge)) {
611 VLOG(2) <<
"Table reordering changing start nest level from " << start
614 start_edge = succ_edge;
619 VLOG(2) <<
"Table reordering picked start nest level " << start <<
" with cost "
620 << start_edge.join_cost;
621 CHECK_EQ(start, start_edge.nest_level);
622 worklist.push(start_edge);
623 const auto it_ok = visited.insert(start);
625 while (!worklist.empty()) {
629 VLOG(1) <<
"Insert input permutation, idx: " << input_permutation.size()
632 dependency_tracking.removeNode(crt.
nest_level);
634 for (
const auto& graph_edge : join_cost_graph[crt.
nest_level]) {
635 const node_t succ = graph_edge.first;
636 if (!schedulable_node(succ)) {
640 const auto it_ok = visited.insert(succ);
645 return input_permutation;
652 const std::vector<InputTableInfo>& table_infos,
653 const Executor* executor) {
654 std::vector<std::map<node_t, InnerQualDecision>> qual_normalization_res(
657 left_deep_join_quals, table_infos, executor, qual_normalization_res);
659 const auto compare_node = [&table_infos](
const node_t lhs_nest_level,
660 const node_t rhs_nest_level) {
661 return table_infos[lhs_nest_level].info.getNumTuplesUpperBound() <
662 table_infos[rhs_nest_level].info.getNumTuplesUpperBound();
664 const auto compare_edge = [&compare_node](
const TraversalEdge& lhs_edge,
665 const TraversalEdge& rhs_edge) {
667 if (lhs_edge.join_cost == rhs_edge.join_cost) {
668 return compare_node(lhs_edge.nest_level, rhs_edge.nest_level);
670 return lhs_edge.join_cost > rhs_edge.join_cost;
676 left_deep_join_quals,
677 qual_normalization_res);
static bool is_point_poly_rewrite_target_func(std::string_view target_func_name)
static constexpr std::array< std::string_view, 6 > ST_INTERSECTS_FORCE_TABLE_REORDERING_TARGET_FUNC
std::unordered_set< node_t > getRoots() const
static bool colvar_comp(const ColumnVar *l, const ColumnVar *r)
#define IS_EQUIVALENCE(X)
SchedulingDependencyTracking build_dependency_tracking(const JoinQualsPerNestingLevel &left_deep_join_quals, const std::vector< std::map< node_t, cost_t >> &join_cost_graph)
bool is_constructed_point(const Analyzer::Expr *expr)
size_t get_table_cardinality(shared::TableKey const &table_key, Executor const *executor)
const std::vector< const Analyzer::ColumnVar * > & getGeoArgCvs() const
std::vector< JoinCondition > JoinQualsPerNestingLevel
T visit(const Analyzer::Expr *expr) const
unsigned g_trivial_loop_join_threshold
const Analyzer::ColumnVar * get_geo_cv(std::vector< const Analyzer::ColumnVar * > const &geo_args, shared::TableKey const &table_key)
std::tuple< cost_t, cost_t, InnerQualDecision > get_join_qual_cost(const Analyzer::Expr *qual, const std::vector< InputTableInfo > &table_infos, const Executor *executor)
std::vector< node_t > get_node_input_permutation(const JoinQualsPerNestingLevel &left_deep_join_quals, const std::vector< InputTableInfo > &table_infos, const Executor *executor)
static std::unordered_map< SQLTypes, cost_t > GEO_TYPE_COSTS
void removeNode(const node_t from)
std::vector< std::map< node_t, cost_t > > build_join_cost_graph(const JoinQualsPerNestingLevel &left_deep_join_quals, const std::vector< InputTableInfo > &table_infos, const Executor *executor, std::vector< std::map< node_t, InnerQualDecision >> &qual_detection_res)
const std::string & getGeoFunctionName() const
const SQLTypeInfo & get_type_info() const
SchedulingDependencyTracking(const size_t node_count)
const Analyzer::Expr * getArg(const size_t i) const
DEVICE void iota(ARGS &&...args)
void addEdge(const node_t from, const node_t to)
const std::optional< GeoJoinOperandsTableKeyPair > getJoinTableKeyPair() const
bool should_force_table_reordering(shared::TableKey const &inner_arg_key, SQLTypes const inner_type, shared::TableKey const &outer_arg_key, SQLTypes const outer_type, std::string const &geo_func_name, const std::vector< InputTableInfo > &table_infos)
static std::unordered_map< SQLTypes, cost_t > static bool force_table_reordering_st_contain_func(std::string_view target_func_name)
static constexpr std::array< std::string_view, 4 > ST_CONTAIN_FORCE_TABLE_REORDERING_TARGET_FUNC
std::vector< std::unordered_set< node_t > > inbound_
static std::pair< std::vector< InnerOuter >, std::vector< InnerOuterStringOpInfos > > normalizeColumnPairs(const Analyzer::BinOper *condition, const TemporaryTables *temporary_tables)
static bool is_poly_point_rewrite_target_func(std::string_view target_func_name)
bool any_of(std::vector< Analyzer::Expr * > const &target_exprs)
static bool force_table_reordering_st_intersects_func(std::string_view target_func_name)
std::vector< node_t > traverse_join_cost_graph(const std::vector< std::map< node_t, cost_t >> &join_cost_graph, const std::vector< InputTableInfo > &table_infos, const std::function< bool(const node_t lhs_nest_level, const node_t rhs_nest_level)> &compare_node, const std::function< bool(const TraversalEdge &, const TraversalEdge &)> &compare_edge, const JoinQualsPerNestingLevel &left_deep_join_quals, std::vector< std::map< node_t, InnerQualDecision >> &qual_normalization_res)
AccessManager::Decision Decision
InnerQualDecision inner_qual_decision
const std::shared_ptr< Analyzer::Expr > get_own_left_operand() const