22 #include <boost/graph/topological_sort.hpp>
27 : filter_push_down_enabled_(
false)
29 , execution_time_ms_(0)
30 , type_(QueryResult) {}
33 const std::vector<TargetMetaInfo>& targets_meta)
35 , targets_meta_(targets_meta)
36 , filter_push_down_enabled_(
false)
38 , execution_time_ms_(0)
39 , type_(QueryResult) {}
42 const std::vector<TargetMetaInfo>& targets_meta)
43 : result_(std::move(
result))
44 , targets_meta_(targets_meta)
45 , filter_push_down_enabled_(
false)
47 , execution_time_ms_(0)
48 , type_(QueryResult) {}
51 : targets_meta_(that.targets_meta_)
52 , pushed_down_filter_info_(that.pushed_down_filter_info_)
53 , filter_push_down_enabled_(that.filter_push_down_enabled_)
54 , success_(that.success_)
55 , execution_time_ms_(that.execution_time_ms_)
65 : targets_meta_(std::move(that.targets_meta_))
66 , pushed_down_filter_info_(std::move(that.pushed_down_filter_info_))
67 , filter_push_down_enabled_(std::move(that.filter_push_down_enabled_))
68 , success_(that.success_)
69 , execution_time_ms_(that.execution_time_ms_)
75 result_ = std::move(that.result_);
79 const std::vector<PushedDownFilterInfo>& pushed_down_filter_info,
80 bool filter_push_down_enabled)
81 : pushed_down_filter_info_(pushed_down_filter_info)
82 , filter_push_down_enabled_(filter_push_down_enabled)
84 , execution_time_ms_(0)
85 , type_(QueryResult) {}
114 result_ = std::make_shared<ResultSet>(query);
119 return getRows()->getExplanation();
136 std::unordered_map<const RelAlgNode*, int>& node_ptr_to_vert_idx) {
139 node_ptr_to_vert_idx.emplace(std::make_pair(sink, 0));
140 std::vector<const RelAlgNode*> stack(1, sink);
141 while (!stack.empty()) {
142 const auto node = stack.back();
144 if (dynamic_cast<const RelScan*>(node)) {
148 const auto input_num = node->inputCount();
151 CHECK(dynamic_cast<const RelLogicalValues*>(node) ||
152 dynamic_cast<const RelTableFunction*>(node));
156 CHECK(dynamic_cast<const RelJoin*>(node) ||
157 dynamic_cast<const RelLeftDeepInnerJoin*>(node) ||
158 dynamic_cast<const RelLogicalUnion*>(node) ||
159 dynamic_cast<const RelTableFunction*>(node));
162 CHECK(dynamic_cast<const RelLeftDeepInnerJoin*>(node) ||
163 dynamic_cast<const RelLogicalUnion*>(node) ||
164 dynamic_cast<const RelTableFunction*>(node));
166 for (
size_t i = 0; i < input_num; ++i) {
167 const auto input = node->getInput(i);
169 const bool visited = node_ptr_to_vert_idx.count(input) > 0;
171 node_ptr_to_vert_idx.insert(std::make_pair(input, node_ptr_to_vert_idx.size()));
173 boost::add_edge(node_ptr_to_vert_idx[input], node_ptr_to_vert_idx[node], graph);
175 graph[node_ptr_to_vert_idx[input]] = input;
176 stack.push_back(input);
185 std::unordered_set<Vertex> joins;
186 for (
const auto vert : vertices) {
187 if (dynamic_cast<const RelLeftDeepInnerJoin*>(graph[vert])) {
191 if (!dynamic_cast<const RelJoin*>(graph[vert])) {
194 if (boost::out_degree(vert, graph) > 1) {
195 throw std::runtime_error(
"Join used more than once not supported yet");
197 auto [oe_iter, oe_end] = boost::out_edges(vert, graph);
198 CHECK(std::next(oe_iter) == oe_end);
199 const auto out_vert = boost::target(*oe_iter, graph);
200 if (!dynamic_cast<const RelJoin*>(graph[out_vert])) {
211 const bool build_sequence) {
214 if (dynamic_cast<const RelScan*>(sink) || dynamic_cast<const RelJoin*>(sink)) {
215 throw std::runtime_error(
"Query not supported yet");
226 if (build_sequence) {
239 descs_.emplace_back(std::move(exec_desc));
243 auto checkQueryStepSkippable =
246 if (
executor_->getResultSetRecyclerHolder().hasCachedQueryResultSet(key)) {
249 const auto output_meta_info =
250 executor_->getResultSetRecyclerHolder().getOutputMetaInfo(key);
251 CHECK(output_meta_info);
260 auto& node =
graph_[vert];
262 if (dynamic_cast<const RelScan*>(node)) {
266 descs_.emplace_back(std::make_unique<RaExecutionDesc>(node));
271 auto extracted_query_plan_dag =
274 !boost::iequals(extracted_query_plan_dag.extracted_dag,
EMPTY_QUERY_PLAN)) {
276 if (
auto sort_node = dynamic_cast<const RelSort*>(node)) {
281 checkQueryStepSkippable(node, cache_key,
descs_.size() - 1);
283 return descs_.back().get();
289 const std::vector<Vertex>& vertices,
291 DAG::in_edge_iterator ie_iter, ie_end;
292 std::unordered_set<Vertex> inputs;
293 for (
const auto vert : vertices) {
294 if (
const auto sort = dynamic_cast<const RelSort*>(graph[vert])) {
295 boost::tie(ie_iter, ie_end) = boost::in_edges(vert, graph);
296 CHECK(
size_t(1) ==
sort->inputCount() && boost::next(ie_iter) == ie_end);
297 if (
sort->isLimitDelivered()) {
300 const auto in_vert = boost::source(*ie_iter, graph);
301 const auto input = graph[in_vert];
302 if (dynamic_cast<const RelScan*>(input)) {
303 throw std::runtime_error(
"Standalone sort not supported yet");
305 if (boost::out_degree(in_vert, graph) > 1) {
306 throw std::runtime_error(
"Sort's input node used by others not supported yet");
308 inputs.insert(in_vert);
312 std::vector<Vertex> new_vertices;
313 for (
const auto vert : vertices) {
314 if (inputs.count(vert)) {
317 new_vertices.push_back(vert);
330 auto itr =
descs_.rbegin();
331 return (++itr)->get();
335 const auto pushChildNodes = [&](
auto& stack,
const auto node_id) {
336 auto [in_edge_iter, in_edge_end] = boost::in_edges(
node_id,
graph_);
337 while (in_edge_iter != in_edge_end) {
338 const auto child_node_id = boost::source(*in_edge_iter,
graph_);
339 stack.push(
graph_[child_node_id]);
340 std::advance(in_edge_iter, 1);
343 auto top_node_id =
descs_.size() - 1;
344 auto top_res =
skippable_steps_.emplace(top_node_id, std::unordered_set<int>{});
345 CHECK(top_res.second);
346 for (
auto it =
descs_.begin(); it != std::prev(
descs_.end()); ++it) {
347 const auto step = it->get();
348 const auto body = step->getBody();
349 const auto step_id =
static_cast<int>(std::distance(
descs_.begin(), it));
353 std::stack<const RelAlgNode*> stack;
355 while (!stack.empty()) {
356 auto child_node = stack.top();
360 auto is_desc_body = std::find_if(
361 descs_.begin(), it, [&child_node](std::unique_ptr<RaExecutionDesc>& ptr) {
362 return ptr->getBody() == child_node;
364 if (is_desc_body != it) {
367 const auto child_step_id =
368 static_cast<int>(std::distance(
descs_.begin(), is_desc_body));
383 std::unordered_set<int> skippable_query_steps;
387 const auto& child_steps = it->second;
389 child_steps.begin(), child_steps.end(), [&](
const auto& skippable_step_id) {
390 if (skippable_step_id != cached_step_id) {
391 skippable_query_steps.insert(skippable_step_id);
397 if (!skippable_query_steps.empty()) {
398 std::vector<std::unique_ptr<RaExecutionDesc>> new_descs;
399 for (
size_t step_id = 0; step_id <
descs_.size(); ++step_id) {
400 const auto body =
descs_[step_id]->getBody();
401 if (!skippable_query_steps.count(step_id)) {
402 new_descs.push_back(std::make_unique<RaExecutionDesc>(body));
405 const auto prev_num_steps =
descs_.size();
406 if (!new_descs.empty()) {
409 VLOG(1) <<
"Skip " << prev_num_steps -
descs_.size() <<
" query steps from "
410 << prev_num_steps <<
" steps";
413 for (
const auto& desc :
descs_) {
416 auto body = desc->getBody();
425 if (after_broadcast) {
455 bool operator()(std::unique_ptr<RaExecutionDesc>
const& desc)
const {
456 return desc->getBody()->getId() == body_id_;
463 unsigned const body_id,
464 size_t const start_idx)
const {
466 auto const from_end =
descs_.size() - (start_idx + 1);
467 MatchBody
const match_body{body_id};
468 auto const itr = std::find_if(
descs_.rbegin() + from_end,
descs_.rend(), match_body);
469 return itr ==
descs_.rend() ?
nullptr : itr->get();
473 size_t num_descriptors = 0;
474 size_t crt_vertex = 0;
480 auto& node =
graph_[vert];
482 if (dynamic_cast<const RelScan*>(node)) {
487 return num_descriptors;
491 size_t steps_to_next_broadcast = 0;
500 for (
size_t i = 0; i < join_node->inputCount(); i++) {
501 const auto input = join_node->getInput(i);
502 if (dynamic_cast<const RelScan*>(input)) {
503 return steps_to_next_broadcast;
510 ++steps_to_next_broadcast;
517 return ++steps_to_next_broadcast;
520 if (
auto sort = dynamic_cast<const RelSort*>(node)) {
522 node =
sort->getInput(0);
524 if (dynamic_cast<const RelScan*>(node)) {
525 return steps_to_next_broadcast;
527 if (dynamic_cast<const RelModify*>(node)) {
530 ++steps_to_next_broadcast;
533 if (
auto project = dynamic_cast<const RelProject*>(node)) {
534 if (project->hasWindowFunctionExpr()) {
535 ++steps_to_next_broadcast;
539 for (
size_t input_idx = 0; input_idx < node->inputCount(); input_idx++) {
540 if (dynamic_cast<const RelScan*>(node->getInput(input_idx))) {
541 return steps_to_next_broadcast;
544 ++steps_to_next_broadcast;
546 return steps_to_next_broadcast;
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
std::string getExplanation()
uint64_t execution_time_ms_
bool g_use_query_resultset_cache
std::vector< Vertex > mergeSortWithInput(const std::vector< Vertex > &vertices, const DAG &graph)
bool g_allow_query_step_skipping
ExecutionResult & operator=(const ExecutionResult &that)
DEVICE void sort(ARGS &&...args)
std::shared_ptr< ResultSet > ResultSetPtr
std::unordered_set< Vertex > joins_
static SortInfo createFromSortNode(const RelSort *sort_node)
bool g_enable_data_recycler
bool operator()(std::unique_ptr< RaExecutionDesc > const &desc) const
size_t getQueryPlanDagHash() const
DAG build_dag(const RelAlgNode *sink, std::unordered_map< const RelAlgNode *, int > &node_ptr_to_vert_idx)
std::unordered_set< int > cached_query_steps_
std::vector< std::unique_ptr< RaExecutionDesc > > descs_
const std::shared_ptr< ResultSet > & getRows() const
bool filter_push_down_enabled_
bool executionFinished() const
void updateResultSet(const std::string &query_ra, RType type, bool success=true)
void setContextData(const RaExecutionDesc *context_data) const
void extractQueryStepSkippingInfo()
bool g_enable_smem_group_by true
std::vector< PushedDownFilterInfo > pushed_down_filter_info_
size_t stepsToNextBroadcast() const
std::unordered_map< const RelAlgNode *, int > node_ptr_to_vert_idx_
RaExecutionSequence(const RelAlgNode *, Executor *, const bool build_sequence=true)
const std::vector< PushedDownFilterInfo > & getPushedDownFilterInfo() const
std::unordered_set< Vertex > get_join_vertices(const std::vector< Vertex > &vertices, const DAG &graph)
std::vector< Vertex > ordering_
bool g_enable_watchdog false
constexpr char const * EMPTY_QUERY_PLAN
const RelAlgNode * getBody() const
boost::adjacency_list< boost::setS, boost::vecS, boost::bidirectionalS, const RelAlgNode * > DAG
size_t totalDescriptorsCount() const
unsigned node_id(const rapidjson::Value &ra_node) noexcept
std::vector< TargetMetaInfo > targets_meta_
DEVICE void reverse(ARGS &&...args)
RaExecutionDesc * getDescriptorByBodyId(unsigned const body_id, size_t const start_idx) const
DEVICE void swap(ARGS &&...args)
std::unordered_map< int, std::unordered_set< int > > skippable_steps_
std::unordered_map< int, QueryPlanHash > cached_resultset_keys_
std::optional< size_t > nextStepId(const bool after_broadcast) const