OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
WindowFunctionContext Class Reference

#include <WindowContext.h>

+ Collaboration diagram for WindowFunctionContext:

Classes

struct  AggregateState
 

Public Types

enum  WindowComparatorResult { WindowComparatorResult::LT, WindowComparatorResult::EQ, WindowComparatorResult::GT }
 
using Comparator = std::function< WindowFunctionContext::WindowComparatorResult(const int64_t lhs, const int64_t rhs)>
 

Public Member Functions

 WindowFunctionContext (const Analyzer::WindowFunction *window_func, const size_t elem_count, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
 
 WindowFunctionContext (const Analyzer::WindowFunction *window_func, QueryPlanHash cache_key, const std::shared_ptr< HashJoin > &partitions, const size_t elem_count, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, size_t aggregation_tree_fan_out=g_window_function_aggregation_tree_fanout)
 
 WindowFunctionContext (const WindowFunctionContext &)=delete
 
WindowFunctionContext & operator= (const WindowFunctionContext &)=delete
 
 ~WindowFunctionContext ()
 
void addOrderColumn (const int8_t *column, const SQLTypeInfo &ti, const std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
 
void setSortedPartitionCacheKey (QueryPlanHash cache_key)
 
void addColumnBufferForWindowFunctionExpression (const int8_t *column, const std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
 
std::vector< Comparator > createComparator (size_t partition_idx)
 
void compute (std::unordered_map< QueryPlanHash, size_t > &sorted_partition_key_ref_count_map, std::unordered_map< QueryPlanHash, std::shared_ptr< std::vector< int64_t >>> &sorted_partition_cache, std::unordered_map< QueryPlanHash, AggregateTreeForWindowFraming > &aggregate_tree_map)
 
const Analyzer::WindowFunction * getWindowFunction () const
 
const int8_t * output () const
 
const int64_t * sortedPartition () const
 
const int64_t * aggregateState () const
 
const int64_t * aggregateStateCount () const
 
int64_t aggregateStatePendingOutputs () const
 
const int64_t * partitionStartOffset () const
 
const int64_t * partitionNumCountBuf () const
 
const std::vector< const
int8_t * > & 
getColumnBufferForWindowFunctionExpressions () const
 
const std::vector< const
int8_t * > & 
getOrderKeyColumnBuffers () const
 
const std::vector< SQLTypeInfo > & getOrderKeyColumnBufferTypes () const
 
int64_t ** getAggregationTreesForIntegerTypeWindowExpr () const
 
double ** getAggregationTreesForDoubleTypeWindowExpr () const
 
SumAndCountPair< int64_t > ** getDerivedAggregationTreesForIntegerTypeWindowExpr () const
 
SumAndCountPair< double > ** getDerivedAggregationTreesForDoubleTypeWindowExpr () const
 
size_t * getAggregateTreeDepth () const
 
size_t getAggregateTreeFanout () const
 
int64_t * getNullValueStartPos () const
 
int64_t * getNullValueEndPos () const
 
const int8_t * partitionStart () const
 
const int8_t * partitionEnd () const
 
size_t elementCount () const
 
const int32_t * payload () const
 
const int32_t * offsets () const
 
const int32_t * counts () const
 
size_t partitionCount () const
 
const bool needsToBuildAggregateTree () const
 

Static Public Attributes

static const int NUM_EXECUTION_DEVICES = 1
 

Private Member Functions

void computePartitionBuffer (const size_t partition_idx, int64_t *output_for_partition_buff, const Analyzer::WindowFunction *window_func)
 
void sortPartition (const size_t partition_idx, int64_t *output_for_partition_buff, bool should_parallelize)
 
void computeNullRangeOfSortedPartition (const SQLTypeInfo &order_col_ti, size_t partition_idx, const int32_t *original_col_idx_buf, const int64_t *ordered_col_idx_buf)
 
void buildAggregationTreeForPartition (SqlWindowFunctionKind agg_type, size_t partition_idx, size_t partition_size, const int32_t *original_rowid_buf, const int64_t *ordered_rowid_buf, const SQLTypeInfo &input_col_ti)
 
void fillPartitionStart ()
 
void fillPartitionEnd ()
 
void resizeStorageForWindowFraming (bool const for_reuse=false)
 
const QueryPlanHash computeAggregateTreeCacheKey () const
 

Static Private Member Functions

static Comparator makeComparator (const Analyzer::ColumnVar *col_var, const int8_t *partition_values, const int32_t *partition_indices, const bool asc_ordering, const bool nulls_first)
 

Private Attributes

const Analyzer::WindowFunction * window_func_
 
QueryPlanHash partition_cache_key_
 
QueryPlanHash sorted_partition_cache_key_
 
std::vector< std::vector
< std::shared_ptr
< Chunk_NS::Chunk > > > 
order_columns_owner_
 
std::vector< const int8_t * > order_columns_
 
std::vector< SQLTypeInfo > order_columns_ti_
 
std::shared_ptr< HashJoin > partitions_
 
size_t elem_count_
 
int8_t * output_
 
std::shared_ptr< std::vector
< int64_t > > 
sorted_partition_buf_
 
std::vector< std::vector
< std::shared_ptr
< Chunk_NS::Chunk > > > 
window_func_expr_columns_owner_
 
std::vector< const int8_t * > window_func_expr_columns_
 
std::vector< std::shared_ptr
< void > > 
segment_trees_owned_
 
AggregateTreeForWindowFraming aggregate_trees_
 
size_t aggregate_trees_fan_out_
 
size_t * aggregate_trees_depth_
 
int64_t * ordered_partition_null_start_pos_
 
int64_t * ordered_partition_null_end_pos_
 
int64_t * partition_start_offset_
 
int8_t * partition_start_
 
int8_t * partition_end_
 
AggregateState aggregate_state_
 
const ExecutorDeviceType device_type_
 
std::shared_ptr
< RowSetMemoryOwner > 
row_set_mem_owner_
 
const int32_t dummy_count_
 
const int32_t dummy_offset_
 
int32_t * dummy_payload_
 

Detailed Description

Definition at line 136 of file WindowContext.h.

Member Typedef Documentation

using WindowFunctionContext::Comparator = std::function<WindowFunctionContext::WindowComparatorResult(const int64_t lhs, const int64_t rhs)>

Definition at line 181 of file WindowContext.h.

Member Enumeration Documentation

Enumerator
LT 
EQ 
GT 

Definition at line 178 of file WindowContext.h.

Constructor & Destructor Documentation

WindowFunctionContext::WindowFunctionContext ( const Analyzer::WindowFunction *  window_func,
const size_t  elem_count,
const ExecutorDeviceType  device_type,
std::shared_ptr< RowSetMemoryOwner >  row_set_mem_owner 
)

Definition at line 50 of file WindowContext.cpp.

References aggregate_trees_depth_, CHECK_LE, checked_calloc(), checked_malloc(), dummy_payload_, elem_count_, Analyzer::WindowFunction::getKind(), Analyzer::WindowFunction::hasFraming(), gpu_enabled::iota(), Analyzer::WindowFunction::isMissingValueFillingFunction(), NTH_VALUE, ordered_partition_null_end_pos_, ordered_partition_null_start_pos_, partition_start_offset_, and window_func_.

55  : window_func_(window_func)
58  , partitions_(nullptr)
59  , elem_count_(elem_count)
60  , output_(nullptr)
61  , sorted_partition_buf_(nullptr)
63  , aggregate_trees_depth_(nullptr)
66  , partition_start_offset_(nullptr)
67  , partition_start_(nullptr)
68  , partition_end_(nullptr)
69  , device_type_(device_type)
70  , row_set_mem_owner_(row_set_mem_owner)
71  , dummy_count_(elem_count)
72  , dummy_offset_(0)
73  , dummy_payload_(nullptr) {
74  CHECK_LE(elem_count_, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
76  reinterpret_cast<int32_t*>(checked_malloc(elem_count_ * sizeof(int32_t)));
80  // in this case, we consider all rows of the row belong to the same and only
81  // existing partition
83  reinterpret_cast<int64_t*>(checked_calloc(2, sizeof(int64_t)));
85  aggregate_trees_depth_ = reinterpret_cast<size_t*>(checked_calloc(1, sizeof(size_t)));
87  reinterpret_cast<int64_t*>(checked_calloc(1, sizeof(int64_t)));
89  reinterpret_cast<int64_t*>(checked_calloc(1, sizeof(int64_t)));
90  }
91 }
SqlWindowFunctionKind getKind() const
Definition: Analyzer.h:2925
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
int64_t * ordered_partition_null_start_pos_
const int32_t dummy_count_
bool isMissingValueFillingFunction() const
Definition: Analyzer.h:2986
const int32_t dummy_offset_
constexpr QueryPlanHash EMPTY_HASHED_PLAN_DAG_KEY
const Analyzer::WindowFunction * window_func_
std::shared_ptr< std::vector< int64_t > > sorted_partition_buf_
size_t g_window_function_aggregation_tree_fanout
QueryPlanHash sorted_partition_cache_key_
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
QueryPlanHash partition_cache_key_
void * checked_calloc(const size_t nmemb, const size_t size)
Definition: checked_alloc.h:53
#define CHECK_LE(x, y)
Definition: Logger.h:304
DEVICE void iota(ARGS &&...args)
Definition: gpu_enabled.h:69
std::shared_ptr< HashJoin > partitions_
int64_t * partition_start_offset_
size_t * aggregate_trees_depth_
bool hasFraming() const
Definition: Analyzer.h:2961
const ExecutorDeviceType device_type_
int64_t * ordered_partition_null_end_pos_

+ Here is the call graph for this function:

WindowFunctionContext::WindowFunctionContext ( const Analyzer::WindowFunction *  window_func,
QueryPlanHash  cache_key,
const std::shared_ptr< HashJoin > &  partitions,
const size_t  elem_count,
const ExecutorDeviceType  device_type,
std::shared_ptr< RowSetMemoryOwner >  row_set_mem_owner,
size_t  aggregation_tree_fan_out = g_window_function_aggregation_tree_fanout 
)

Definition at line 94 of file WindowContext.cpp.

References aggregate_trees_depth_, CHECK, checked_calloc(), counts(), Analyzer::WindowFunction::hasFraming(), Analyzer::WindowFunction::isMissingValueFillingFunction(), ordered_partition_null_end_pos_, ordered_partition_null_start_pos_, gpu_enabled::partial_sum(), partition_start_offset_, partitionCount(), partitions_, and window_func_.

102  : window_func_(window_func)
103  , partition_cache_key_(partition_cache_key)
105  , partitions_(partitions)
106  , elem_count_(elem_count)
107  , output_(nullptr)
108  , sorted_partition_buf_(nullptr)
109  , aggregate_trees_fan_out_(aggregation_tree_fan_out)
110  , aggregate_trees_depth_(nullptr)
113  , partition_start_offset_(nullptr)
114  , partition_start_(nullptr)
115  , partition_end_(nullptr)
116  , device_type_(device_type)
117  , row_set_mem_owner_(row_set_mem_owner)
118  , dummy_count_(elem_count)
119  , dummy_offset_(0)
120  , dummy_payload_(nullptr) {
121  CHECK(partitions_); // This version should have hash table
122  size_t partition_count = partitionCount();
124  reinterpret_cast<int64_t*>(checked_calloc(partition_count + 1, sizeof(int64_t)));
127  reinterpret_cast<size_t*>(checked_calloc(partition_count, sizeof(size_t)));
129  reinterpret_cast<int64_t*>(checked_calloc(partition_count, sizeof(int64_t)));
131  reinterpret_cast<int64_t*>(checked_calloc(partition_count, sizeof(int64_t)));
132  }
133  // the first partition starts at zero position
134  std::partial_sum(counts(), counts() + partition_count, partition_start_offset_ + 1);
135 }
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
int64_t * ordered_partition_null_start_pos_
const int32_t dummy_count_
bool isMissingValueFillingFunction() const
Definition: Analyzer.h:2986
const int32_t dummy_offset_
constexpr QueryPlanHash EMPTY_HASHED_PLAN_DAG_KEY
const Analyzer::WindowFunction * window_func_
const int32_t * counts() const
std::shared_ptr< std::vector< int64_t > > sorted_partition_buf_
size_t partitionCount() const
QueryPlanHash sorted_partition_cache_key_
DEVICE void partial_sum(ARGS &&...args)
Definition: gpu_enabled.h:87
QueryPlanHash partition_cache_key_
void * checked_calloc(const size_t nmemb, const size_t size)
Definition: checked_alloc.h:53
std::shared_ptr< HashJoin > partitions_
int64_t * partition_start_offset_
#define CHECK(condition)
Definition: Logger.h:291
size_t * aggregate_trees_depth_
bool hasFraming() const
Definition: Analyzer.h:2961
const ExecutorDeviceType device_type_
int64_t * ordered_partition_null_end_pos_

+ Here is the call graph for this function:

WindowFunctionContext::WindowFunctionContext ( const WindowFunctionContext & )
delete
WindowFunctionContext::~WindowFunctionContext ( )

Member Function Documentation

void WindowFunctionContext::addColumnBufferForWindowFunctionExpression ( const int8_t *  column,
const std::vector< std::shared_ptr< Chunk_NS::Chunk >> &  chunks_owner 
)

Definition at line 166 of file WindowContext.cpp.

References window_func_expr_columns_, and window_func_expr_columns_owner_.

168  {
169  window_func_expr_columns_owner_.push_back(chunks_owner);
170  window_func_expr_columns_.push_back(column);
171 };
std::vector< std::vector< std::shared_ptr< Chunk_NS::Chunk > > > window_func_expr_columns_owner_
std::vector< const int8_t * > window_func_expr_columns_
void WindowFunctionContext::addOrderColumn ( const int8_t *  column,
const SQLTypeInfo &  ti,
const std::vector< std::shared_ptr< Chunk_NS::Chunk >> &  chunks_owner 
)

Definition at line 157 of file WindowContext.cpp.

References order_columns_, order_columns_owner_, and order_columns_ti_.

160  {
161  order_columns_owner_.push_back(chunks_owner);
162  order_columns_.push_back(column);
163  order_columns_ti_.push_back(ti);
164 }
std::vector< const int8_t * > order_columns_
std::vector< SQLTypeInfo > order_columns_ti_
std::vector< std::vector< std::shared_ptr< Chunk_NS::Chunk > > > order_columns_owner_
const int64_t * WindowFunctionContext::aggregateState ( ) const

Definition at line 993 of file WindowContext.cpp.

References aggregate_state_, CHECK, Analyzer::WindowFunction::getKind(), WindowFunctionContext::AggregateState::val, window_func_, and window_function_is_aggregate().

993  {
995  return &aggregate_state_.val;
996 }
SqlWindowFunctionKind getKind() const
Definition: Analyzer.h:2925
const Analyzer::WindowFunction * window_func_
AggregateState aggregate_state_
bool window_function_is_aggregate(const SqlWindowFunctionKind kind)
Definition: WindowContext.h:61
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

const int64_t * WindowFunctionContext::aggregateStateCount ( ) const

Definition at line 998 of file WindowContext.cpp.

References aggregate_state_, CHECK, WindowFunctionContext::AggregateState::count, Analyzer::WindowFunction::getKind(), window_func_, and window_function_is_aggregate().

998  {
1000  return &aggregate_state_.count;
1001 }
SqlWindowFunctionKind getKind() const
Definition: Analyzer.h:2925
const Analyzer::WindowFunction * window_func_
AggregateState aggregate_state_
bool window_function_is_aggregate(const SqlWindowFunctionKind kind)
Definition: WindowContext.h:61
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

int64_t WindowFunctionContext::aggregateStatePendingOutputs ( ) const

Definition at line 1013 of file WindowContext.cpp.

References aggregate_state_, CHECK, Analyzer::WindowFunction::getKind(), WindowFunctionContext::AggregateState::outputs, window_func_, and window_function_is_aggregate().

1013  {
1015  return reinterpret_cast<int64_t>(&aggregate_state_.outputs);
1016 }
SqlWindowFunctionKind getKind() const
Definition: Analyzer.h:2925
const Analyzer::WindowFunction * window_func_
AggregateState aggregate_state_
bool window_function_is_aggregate(const SqlWindowFunctionKind kind)
Definition: WindowContext.h:61
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

void WindowFunctionContext::buildAggregationTreeForPartition ( SqlWindowFunctionKind  agg_type,
size_t  partition_idx,
size_t  partition_size,
const int32_t *  original_rowid_buf,
const int64_t *  ordered_rowid_buf,
const SQLTypeInfo &  input_col_ti 
)
private

Definition at line 1473 of file WindowContext.cpp.

References AggregateTreeForWindowFraming::aggregate_tree_for_double_type_, AggregateTreeForWindowFraming::aggregate_tree_for_integer_type_, aggregate_trees_, AggregateTreeForWindowFraming::aggregate_trees_depth_, aggregate_trees_depth_, aggregate_trees_fan_out_, anonymous_namespace{WindowContext.cpp}::allow_framing_on_time_or_date(), AVG, CHECK, COUNT, decimal_to_int_type(), AggregateTreeForWindowFraming::derived_aggregate_tree_for_double_type_, AggregateTreeForWindowFraming::derived_aggregate_tree_for_integer_type_, get_int_type_by_size(), SQLTypeInfo::get_size(), SQLTypeInfo::get_type(), Analyzer::WindowFunction::getKind(), SQLTypeInfo::is_boolean(), SQLTypeInfo::is_decimal(), SQLTypeInfo::is_fp(), SQLTypeInfo::is_integer(), SQLTypeInfo::is_number(), SQLTypeInfo::is_time_or_date(), kBIGINT, kBOOLEAN, kDECIMAL, kDOUBLE, kFLOAT, kINT, kNUMERIC, kSMALLINT, kTINYINT, MAX, MIN, offsets(), ordered_partition_null_end_pos_, ordered_partition_null_start_pos_, segment_trees_owned_, toString(), run_benchmark_import::type, UNREACHABLE, window_func_, and window_func_expr_columns_.

Referenced by compute().

1479  {
1480  if (!(input_col_ti.is_number() || input_col_ti.is_boolean() ||
1481  input_col_ti.is_time_or_date())) {
1482  throw QueryNotSupported("Window aggregate function over frame on a column type " +
1483  ::toString(input_col_ti.get_type()) + " is not supported.");
1484  }
1485  if (input_col_ti.is_time_or_date() &&
1487  !(agg_type == SqlWindowFunctionKind::MIN ||
1488  agg_type == SqlWindowFunctionKind::MAX ||
1489  agg_type == SqlWindowFunctionKind::COUNT)) {
1490  throw QueryNotSupported(
1491  "Aggregation over a window frame for a column type " +
1492  ::toString(input_col_ti.get_type()) +
1493  " must use one of the following window aggregate function: MIN / MAX / COUNT");
1494  }
1495  const auto type = input_col_ti.is_decimal() ? decimal_to_int_type(input_col_ti)
1496  : input_col_ti.is_time_or_date()
1497  ? get_int_type_by_size(input_col_ti.get_size())
1498  : input_col_ti.get_type();
1499  if (partition_size > 0) {
1500  IndexPair order_col_null_range{ordered_partition_null_start_pos_[partition_idx],
1501  ordered_partition_null_end_pos_[partition_idx]};
1502  const int64_t* ordered_rowid_buf_for_partition =
1503  ordered_rowid_buf + offsets()[partition_idx];
1504  switch (type) {
1505  case kBOOLEAN:
1506  case kTINYINT: {
1507  const auto segment_tree = std::make_shared<SegmentTree<int8_t, int64_t>>(
1509  input_col_ti,
1510  original_rowid_buf,
1511  ordered_rowid_buf_for_partition,
1512  partition_size,
1513  agg_type,
1515  aggregate_trees_depth_[partition_idx] =
1516  segment_tree ? segment_tree->getLeafDepth() : 0;
1517  if (agg_type == SqlWindowFunctionKind::AVG) {
1519  segment_tree ? segment_tree->getDerivedAggregatedValues() : nullptr;
1520  } else {
1522  segment_tree ? segment_tree->getAggregatedValues() : nullptr;
1523  }
1524  segment_trees_owned_[partition_idx] = std::move(segment_tree);
1525  break;
1526  }
1527  case kSMALLINT: {
1528  const auto segment_tree = std::make_shared<SegmentTree<int16_t, int64_t>>(
1530  input_col_ti,
1531  original_rowid_buf,
1532  ordered_rowid_buf_for_partition,
1533  partition_size,
1534  agg_type,
1536  aggregate_trees_depth_[partition_idx] =
1537  segment_tree ? segment_tree->getLeafDepth() : 0;
1538  if (agg_type == SqlWindowFunctionKind::AVG) {
1540  segment_tree ? segment_tree->getDerivedAggregatedValues() : nullptr;
1541  } else {
1543  segment_tree ? segment_tree->getAggregatedValues() : nullptr;
1544  }
1545  segment_trees_owned_[partition_idx] = std::move(segment_tree);
1546  break;
1547  }
1548  case kINT: {
1549  const auto segment_tree = std::make_shared<SegmentTree<int32_t, int64_t>>(
1551  input_col_ti,
1552  original_rowid_buf,
1553  ordered_rowid_buf_for_partition,
1554  partition_size,
1555  agg_type,
1557  aggregate_trees_depth_[partition_idx] =
1558  segment_tree ? segment_tree->getLeafDepth() : 0;
1559  if (agg_type == SqlWindowFunctionKind::AVG) {
1561  segment_tree ? segment_tree->getDerivedAggregatedValues() : nullptr;
1562  } else {
1564  segment_tree ? segment_tree->getAggregatedValues() : nullptr;
1565  }
1566  segment_trees_owned_[partition_idx] = std::move(segment_tree);
1567  break;
1568  }
1569  case kDECIMAL:
1570  case kNUMERIC:
1571  case kBIGINT: {
1572  const auto segment_tree = std::make_shared<SegmentTree<int64_t, int64_t>>(
1574  input_col_ti,
1575  original_rowid_buf,
1576  ordered_rowid_buf_for_partition,
1577  partition_size,
1578  agg_type,
1580  aggregate_trees_depth_[partition_idx] =
1581  segment_tree ? segment_tree->getLeafDepth() : 0;
1582  if (agg_type == SqlWindowFunctionKind::AVG) {
1584  segment_tree ? segment_tree->getDerivedAggregatedValues() : nullptr;
1585  } else {
1587  segment_tree ? segment_tree->getAggregatedValues() : nullptr;
1588  }
1589  segment_trees_owned_[partition_idx] = std::move(segment_tree);
1590  break;
1591  }
1592  case kFLOAT: {
1593  const auto segment_tree =
1594  std::make_shared<SegmentTree<float, double>>(window_func_expr_columns_,
1595  input_col_ti,
1596  original_rowid_buf,
1597  ordered_rowid_buf_for_partition,
1598  partition_size,
1599  agg_type,
1601  aggregate_trees_depth_[partition_idx] =
1602  segment_tree ? segment_tree->getLeafDepth() : 0;
1603  if (agg_type == SqlWindowFunctionKind::AVG) {
1605  segment_tree ? segment_tree->getDerivedAggregatedValues() : nullptr;
1606  } else {
1608  segment_tree ? segment_tree->getAggregatedValues() : nullptr;
1609  }
1610  segment_trees_owned_[partition_idx] = std::move(segment_tree);
1611  break;
1612  }
1613  case kDOUBLE: {
1614  const auto segment_tree =
1615  std::make_shared<SegmentTree<double, double>>(window_func_expr_columns_,
1616  input_col_ti,
1617  original_rowid_buf,
1618  ordered_rowid_buf_for_partition,
1619  partition_size,
1620  agg_type,
1622  aggregate_trees_depth_[partition_idx] =
1623  segment_tree ? segment_tree->getLeafDepth() : 0;
1624  if (agg_type == SqlWindowFunctionKind::AVG) {
1626  segment_tree ? segment_tree->getDerivedAggregatedValues() : nullptr;
1627  } else {
1629  segment_tree ? segment_tree->getAggregatedValues() : nullptr;
1630  }
1631  segment_trees_owned_[partition_idx] = std::move(segment_tree);
1632  break;
1633  }
1634  default:
1635  UNREACHABLE();
1636  }
1637  } else {
1638  // handling a case of an empty partition
1639  aggregate_trees_depth_[partition_idx] = 0;
1640  if (input_col_ti.is_integer() || input_col_ti.is_decimal() ||
1641  input_col_ti.is_boolean() || input_col_ti.is_time_or_date()) {
1642  if (agg_type == SqlWindowFunctionKind::AVG) {
1644  nullptr;
1645  } else {
1646  aggregate_trees_.aggregate_tree_for_integer_type_[partition_idx] = nullptr;
1647  }
1648  } else {
1649  CHECK(input_col_ti.is_fp());
1650  if (agg_type == SqlWindowFunctionKind::AVG) {
1652  } else {
1653  aggregate_trees_.aggregate_tree_for_double_type_[partition_idx] = nullptr;
1654  }
1655  }
1656  }
1658 }
std::vector< SumAndCountPair< double > * > derived_aggregate_tree_for_double_type_
Definition: WindowContext.h:94
SqlWindowFunctionKind getKind() const
Definition: Analyzer.h:2925
int64_t * ordered_partition_null_start_pos_
bool allow_framing_on_time_or_date(SqlWindowFunctionKind kind)
std::vector< double * > aggregate_tree_for_double_type_
Definition: WindowContext.h:92
bool is_time_or_date() const
Definition: sqltypes.h:1032
#define UNREACHABLE()
Definition: Logger.h:338
const Analyzer::WindowFunction * window_func_
const int32_t * offsets() const
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
bool is_number() const
Definition: sqltypes.h:576
std::vector< std::shared_ptr< void > > segment_trees_owned_
bool is_boolean() const
Definition: sqltypes.h:582
std::string toString(const Executor::ExtModuleKinds &kind)
Definition: Execute.h:1703
std::pair< int64_t, int64_t > IndexPair
SQLTypes decimal_to_int_type(const SQLTypeInfo &ti)
Definition: Datum.cpp:561
std::vector< const int8_t * > window_func_expr_columns_
SQLTypes get_int_type_by_size(size_t const nbytes)
Definition: sqltypes.h:1454
AggregateTreeForWindowFraming aggregate_trees_
#define CHECK(condition)
Definition: Logger.h:291
std::vector< SumAndCountPair< int64_t > * > derived_aggregate_tree_for_integer_type_
Definition: WindowContext.h:93
Definition: sqltypes.h:72
size_t * aggregate_trees_depth_
bool is_decimal() const
Definition: sqltypes.h:570
std::vector< int64_t * > aggregate_tree_for_integer_type_
Definition: WindowContext.h:91
int64_t * ordered_partition_null_end_pos_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void WindowFunctionContext::compute ( std::unordered_map< QueryPlanHash, size_t > &  sorted_partition_key_ref_count_map,
std::unordered_map< QueryPlanHash, std::shared_ptr< std::vector< int64_t >>> &  sorted_partition_cache,
std::unordered_map< QueryPlanHash, AggregateTreeForWindowFraming > &  aggregate_tree_map 
)

Definition at line 549 of file WindowContext.cpp.

References aggregate_trees_, AggregateTreeForWindowFraming::aggregate_trees_depth_, aggregate_trees_depth_, buildAggregationTreeForPartition(), CHECK, computeAggregateTreeCacheKey(), computeNullRangeOfSortedPartition(), computePartitionBuffer(), counts(), DEBUG_TIMER, elem_count_, fillPartitionEnd(), fillPartitionStart(), g_enable_parallel_window_partition_compute, g_parallel_window_partition_compute_threshold, Analyzer::WindowFunction::getArgs(), Analyzer::WindowFunction::getKind(), Analyzer::WindowFunction::getOrderKeys(), Analyzer::WindowFunction::hasFraming(), Analyzer::WindowFunction::isMissingValueFillingFunction(), needsToBuildAggregateTree(), offsets(), output_, threading_serial::parallel_for(), partitionCount(), payload(), resizeStorageForWindowFraming(), row_set_mem_owner_, sorted_partition_buf_, sorted_partition_cache_key_, sortPartition(), logger::thread_local_ids(), toString(), VLOG, window_func_, anonymous_namespace{WindowContext.cpp}::window_function_buffer_element_size(), window_function_is_aggregate(), and window_function_requires_peer_handling().

553  {
554  auto timer = DEBUG_TIMER(__func__);
555  CHECK(!output_);
556  if (elem_count_ == 0) {
557  return;
558  }
559  size_t output_buf_sz =
561  output_ = static_cast<int8_t*>(row_set_mem_owner_->allocate(output_buf_sz,
562  /*thread_idx=*/0));
563  bool const is_agg_func = window_function_is_aggregate(window_func_->getKind());
564  bool const need_window_partition_buf =
566  if (is_agg_func || need_window_partition_buf) {
569  need_window_partition_buf) {
571  }
572  }
573  std::unique_ptr<int64_t[]> scratchpad;
574  int64_t* intermediate_output_buffer;
575  if (is_agg_func || need_window_partition_buf) {
576  intermediate_output_buffer = reinterpret_cast<int64_t*>(output_);
577  } else {
578  output_buf_sz = sizeof(int64_t) * elem_count_;
579  scratchpad.reset(new int64_t[elem_count_]);
580  intermediate_output_buffer = scratchpad.get();
581  }
582  const bool should_parallelize{g_enable_parallel_window_partition_compute &&
583  elem_count_ >=
585 
586  auto cached_sorted_partition_it =
587  sorted_partition_cache.find(sorted_partition_cache_key_);
588  if (cached_sorted_partition_it != sorted_partition_cache.end()) {
589  auto& sorted_partition = cached_sorted_partition_it->second;
590  VLOG(1) << "Reuse cached sorted partition to compute window function context (key: "
592  << ", ordering condition: " << ::toString(window_func_->getOrderKeys())
593  << ")";
594  DEBUG_TIMER("Window Function Cached Sorted Partition Copy");
595  std::memcpy(intermediate_output_buffer, sorted_partition->data(), output_buf_sz);
596  if (need_window_partition_buf) {
597  sorted_partition_buf_ = sorted_partition;
598  }
599  } else {
600  // ordering partitions if necessary
601  const auto sort_partitions = [&](const size_t start, const size_t end) {
602  for (size_t partition_idx = start; partition_idx < end; ++partition_idx) {
603  sortPartition(partition_idx,
604  intermediate_output_buffer + offsets()[partition_idx],
605  should_parallelize);
606  }
607  };
608 
609  if (should_parallelize) {
610  auto sorted_partition_copy_timer =
611  DEBUG_TIMER("Window Function Partition Sorting Parallelized");
612  tbb::parallel_for(tbb::blocked_range<int64_t>(0, partitionCount()),
613  [&, parent_thread_local_ids = logger::thread_local_ids()](
614  const tbb::blocked_range<int64_t>& r) {
616  parent_thread_local_ids.setNewThreadId();
617  sort_partitions(r.begin(), r.end());
618  });
619  } else {
620  auto sorted_partition_copy_timer =
621  DEBUG_TIMER("Window Function Partition Sorting Non-Parallelized");
622  sort_partitions(0, partitionCount());
623  }
624  auto sorted_partition_ref_cnt_it =
625  sorted_partition_key_ref_count_map.find(sorted_partition_cache_key_);
626  bool can_access_sorted_partition =
627  sorted_partition_ref_cnt_it != sorted_partition_key_ref_count_map.end() &&
628  sorted_partition_ref_cnt_it->second > 1;
629  if (can_access_sorted_partition || need_window_partition_buf) {
630  // keep the sorted partition only if it will be reused from other window function
631  // context of this query
632  sorted_partition_buf_ = std::make_shared<std::vector<int64_t>>(elem_count_);
633  DEBUG_TIMER("Window Function Sorted Partition Copy For Caching");
634  std::memcpy(
635  sorted_partition_buf_->data(), intermediate_output_buffer, output_buf_sz);
636  auto it = sorted_partition_cache.emplace(sorted_partition_cache_key_,
638  if (it.second) {
639  VLOG(1) << "Put sorted partition to cache (key: " << sorted_partition_cache_key_
640  << ", ordering condition: " << ::toString(window_func_->getOrderKeys())
641  << ")";
642  }
643  }
644  }
645 
646  if (need_window_partition_buf) {
647  const auto compute_ordered_partition_null_range = [=](const size_t start,
648  const size_t end) {
649  for (size_t partition_idx = start; partition_idx < end; ++partition_idx) {
651  window_func_->getOrderKeys().front()->get_type_info(),
652  partition_idx,
653  payload() + offsets()[partition_idx],
654  intermediate_output_buffer + offsets()[partition_idx]);
655  }
656  };
657  auto partition_count = partitionCount();
658 
659  if (should_parallelize) {
660  auto partition_compuation_timer =
661  DEBUG_TIMER("Window Function Ordered-Partition Null-Range Compute");
662  tbb::parallel_for(tbb::blocked_range<int64_t>(0, partitionCount()),
663  [&, parent_thread_local_ids = logger::thread_local_ids()](
664  const tbb::blocked_range<int64_t>& r) {
666  parent_thread_local_ids.setNewThreadId();
667  compute_ordered_partition_null_range(r.begin(), r.end());
668  });
669  } else {
670  auto partition_compuation_timer = DEBUG_TIMER(
671  "Window Function Non-Parallelized Ordered-Partition Null-Range Compute");
672  compute_ordered_partition_null_range(0, partitionCount());
673  }
674  auto const cache_key = computeAggregateTreeCacheKey();
675  auto const c_it = aggregate_tree_map.find(cache_key);
676  if (c_it != aggregate_tree_map.cend()) {
677  VLOG(1) << "Reuse aggregate tree for window function framing";
679  aggregate_trees_ = c_it->second;
680  memcpy(aggregate_trees_depth_,
682  sizeof(size_t) * partition_count);
683  } else {
685  const auto build_aggregation_tree_for_partitions = [=](const size_t start,
686  const size_t end) {
687  for (size_t partition_idx = start; partition_idx < end; ++partition_idx) {
688  // build a segment tree for the partition
689  // todo (yoonmin) : support generic window function expression
690  // i.e., when window_func_expr_columns_.size() > 1
691  SQLTypeInfo const input_col_ti =
692  window_func_->getArgs().front()->get_type_info();
693  const auto partition_size = counts()[partition_idx];
695  partition_idx,
696  partition_size,
697  payload() + offsets()[partition_idx],
698  intermediate_output_buffer,
699  input_col_ti);
700  }
701  };
703  if (should_parallelize) {
704  auto partition_compuation_timer = DEBUG_TIMER(
705  "Window Function Parallelized Segment Tree Construction for Partitions");
706  tbb::parallel_for(tbb::blocked_range<int64_t>(0, partitionCount()),
707  [=, parent_thread_local_ids = logger::thread_local_ids()](
708  const tbb::blocked_range<int64_t>& r) {
710  parent_thread_local_ids.setNewThreadId();
711  build_aggregation_tree_for_partitions(r.begin(), r.end());
712  });
713  } else {
714  auto partition_compuation_timer = DEBUG_TIMER(
715  "Window Function Non-Parallelized Segment Tree Construction for "
716  "Partitions");
717  build_aggregation_tree_for_partitions(0, partition_count);
718  }
719  }
720  CHECK(aggregate_tree_map.emplace(cache_key, aggregate_trees_).second);
721  VLOG(2) << "Put aggregate tree for the window framing";
722  }
723  }
724 
725  const auto compute_partitions = [=](const size_t start, const size_t end) {
726  for (size_t partition_idx = start; partition_idx < end; ++partition_idx) {
727  computePartitionBuffer(partition_idx,
728  intermediate_output_buffer + offsets()[partition_idx],
729  window_func_);
730  }
731  };
732 
733  if (should_parallelize) {
734  auto partition_compuation_timer = DEBUG_TIMER("Window Function Partition Compute");
735  tbb::parallel_for(tbb::blocked_range<int64_t>(0, partitionCount()),
736  [&, parent_thread_local_ids = logger::thread_local_ids()](
737  const tbb::blocked_range<int64_t>& r) {
739  parent_thread_local_ids.setNewThreadId();
740  compute_partitions(r.begin(), r.end());
741  });
742  } else {
743  auto partition_compuation_timer =
744  DEBUG_TIMER("Window Function Non-Parallelized Partition Compute");
745  compute_partitions(0, partitionCount());
746  }
747 
748  if (is_agg_func || need_window_partition_buf) {
749  // If window function is aggregate we were able to write to the final output buffer
750  // directly in computePartition and we are done.
751  return;
752  }
753 
754  auto output_i64 = reinterpret_cast<int64_t*>(output_);
755  const auto payload_copy = [=](const size_t start, const size_t end) {
756  for (size_t i = start; i < end; ++i) {
757  output_i64[payload()[i]] = intermediate_output_buffer[i];
758  }
759  };
760  if (should_parallelize) {
761  auto payload_copy_timer =
762  DEBUG_TIMER("Window Function Non-Aggregate Payload Copy Parallelized");
763  tbb::parallel_for(tbb::blocked_range<int64_t>(0, elem_count_),
764  [&, parent_thread_local_ids = logger::thread_local_ids()](
765  const tbb::blocked_range<int64_t>& r) {
767  parent_thread_local_ids.setNewThreadId();
768  payload_copy(r.begin(), r.end());
769  });
770  } else {
771  auto payload_copy_timer =
772  DEBUG_TIMER("Window Function Non-Aggregate Payload Copy Non-Parallelized");
773  payload_copy(0, elem_count_);
774  }
775 }
SqlWindowFunctionKind getKind() const
Definition: Analyzer.h:2925
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
bool isMissingValueFillingFunction() const
Definition: Analyzer.h:2986
void computeNullRangeOfSortedPartition(const SQLTypeInfo &order_col_ti, size_t partition_idx, const int32_t *original_col_idx_buf, const int64_t *ordered_col_idx_buf)
const Analyzer::WindowFunction * window_func_
const int32_t * counts() const
const int32_t * offsets() const
size_t g_parallel_window_partition_compute_threshold
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:2933
const bool needsToBuildAggregateTree() const
std::shared_ptr< std::vector< int64_t > > sorted_partition_buf_
size_t partitionCount() const
const std::vector< std::shared_ptr< Analyzer::Expr > > & getArgs() const
Definition: Analyzer.h:2927
QueryPlanHash sorted_partition_cache_key_
void buildAggregationTreeForPartition(SqlWindowFunctionKind agg_type, size_t partition_idx, size_t partition_size, const int32_t *original_rowid_buf, const int64_t *ordered_rowid_buf, const SQLTypeInfo &input_col_ti)
std::string toString(const Executor::ExtModuleKinds &kind)
Definition: Execute.h:1703
bool window_function_is_aggregate(const SqlWindowFunctionKind kind)
Definition: WindowContext.h:61
AggregateTreeForWindowFraming aggregate_trees_
void sortPartition(const size_t partition_idx, int64_t *output_for_partition_buff, bool should_parallelize)
bool window_function_requires_peer_handling(const Analyzer::WindowFunction *window_func)
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
void resizeStorageForWindowFraming(bool const for_reuse=false)
size_t window_function_buffer_element_size(const SqlWindowFunctionKind)
bool g_enable_parallel_window_partition_compute
void computePartitionBuffer(const size_t partition_idx, int64_t *output_for_partition_buff, const Analyzer::WindowFunction *window_func)
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
const QueryPlanHash computeAggregateTreeCacheKey() const
const int32_t * payload() const
size_t * aggregate_trees_depth_
bool hasFraming() const
Definition: Analyzer.h:2961
ThreadLocalIds thread_local_ids()
Definition: Logger.cpp:882
#define VLOG(n)
Definition: Logger.h:388

+ Here is the call graph for this function:

QueryPlanHash const WindowFunctionContext::computeAggregateTreeCacheKey ( ) const
private

Definition at line 1814 of file WindowContext.cpp.

References Analyzer::WindowFunction::getArgs(), Analyzer::WindowFunction::getCollation(), Analyzer::WindowFunction::getKind(), Analyzer::WindowFunction::getOrderKeys(), Analyzer::WindowFunction::getPartitionKeys(), hash_value(), toString(), and window_func_.

Referenced by compute().

1814  {
1815  // An aggregate tree is constructed per window aggregate function kind, input
1816  // expression, partition key(s) and ordering key(s).
1817  // This means that when two window definitions agree on everything listed above but
1818  // differ in their frame bound declaration,
1819  // they can share the same aggregate tree.
1820  auto cache_key = boost::hash_value(::toString(window_func_->getKind()));
1821  boost::hash_combine(cache_key, ::toString(window_func_->getArgs()));
1822  boost::hash_combine(cache_key, ::toString(window_func_->getPartitionKeys()));
1823  boost::hash_combine(cache_key, ::toString(window_func_->getOrderKeys()));
1824  for (auto& order_entry : window_func_->getCollation()) {
1825  boost::hash_combine(cache_key, order_entry.toString());
1826  }
1827  return cache_key;
1828 }
SqlWindowFunctionKind getKind() const
Definition: Analyzer.h:2925
const Analyzer::WindowFunction * window_func_
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:2933
const std::vector< OrderEntry > & getCollation() const
Definition: Analyzer.h:2951
const std::vector< std::shared_ptr< Analyzer::Expr > > & getArgs() const
Definition: Analyzer.h:2927
std::string toString(const Executor::ExtModuleKinds &kind)
Definition: Execute.h:1703
std::size_t hash_value(RexAbstractInput const &rex_ab_input)
Definition: RelAlgDag.cpp:3548
const std::vector< std::shared_ptr< Analyzer::Expr > > & getPartitionKeys() const
Definition: Analyzer.h:2929

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void WindowFunctionContext::computeNullRangeOfSortedPartition ( const SQLTypeInfo &  order_col_ti,
size_t  partition_idx,
const int32_t *  original_col_idx_buf,
const int64_t *  ordered_col_idx_buf 
)
private

Definition at line 849 of file WindowContext.cpp.

References counts(), logger::FATAL, SQLTypeInfo::get_size(), SQLTypeInfo::get_type(), SQLTypeInfo::is_boolean(), SQLTypeInfo::is_decimal(), SQLTypeInfo::is_fp(), SQLTypeInfo::is_integer(), SQLTypeInfo::is_time_or_date(), kDOUBLE, kFLOAT, LOG, null_val_bit_pattern(), order_columns_, ordered_partition_null_end_pos_, and ordered_partition_null_start_pos_.

Referenced by compute().

853  {
854  IndexPair null_range;
855  const auto partition_size = counts()[partition_idx];
856  if (partition_size > 0) {
857  if (order_col_ti.is_integer() || order_col_ti.is_decimal() ||
858  order_col_ti.is_time_or_date() || order_col_ti.is_boolean()) {
859  FindNullRange const null_range_info{
860  original_col_idx_buf, ordered_col_idx_buf, partition_size};
861  switch (order_col_ti.get_size()) {
862  case 8:
863  null_range =
864  null_range_info.find_null_range_int<int64_t>(order_columns_.front());
865  break;
866  case 4:
867  null_range =
868  null_range_info.find_null_range_int<int32_t>(order_columns_.front());
869  break;
870  case 2:
871  null_range =
872  null_range_info.find_null_range_int<int16_t>(order_columns_.front());
873  break;
874  case 1:
875  null_range =
876  null_range_info.find_null_range_int<int8_t>(order_columns_.front());
877  break;
878  default:
879  LOG(FATAL) << "Invalid type size: " << order_col_ti.get_size();
880  }
881  } else if (order_col_ti.is_fp()) {
882  const auto null_bit_pattern =
883  null_val_bit_pattern(order_col_ti, order_col_ti.get_type() == kFLOAT);
884  FindNullRange const null_range_info{
885  original_col_idx_buf, ordered_col_idx_buf, partition_size, null_bit_pattern};
886  switch (order_col_ti.get_type()) {
887  case kFLOAT:
888  null_range = null_range_info.find_null_range_fp<float>(order_columns_.front());
889  break;
890  case kDOUBLE:
891  null_range = null_range_info.find_null_range_fp<double>(order_columns_.front());
892  break;
893  default:
894  LOG(FATAL) << "Invalid float type";
895  }
896  } else {
897  LOG(FATAL) << "Invalid column type for window aggregation over the frame";
898  }
899  }
900  ordered_partition_null_start_pos_[partition_idx] = null_range.first;
901  ordered_partition_null_end_pos_[partition_idx] = null_range.second + 1;
902 }
HOST DEVICE int get_size() const
Definition: sqltypes.h:403
int64_t * ordered_partition_null_start_pos_
bool is_time_or_date() const
Definition: sqltypes.h:1032
#define LOG(tag)
Definition: Logger.h:285
bool is_fp() const
Definition: sqltypes.h:573
const int32_t * counts() const
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
int64_t null_val_bit_pattern(const SQLTypeInfo &ti, const bool float_argument_input)
bool is_integer() const
Definition: sqltypes.h:567
bool is_boolean() const
Definition: sqltypes.h:582
std::pair< int64_t, int64_t > IndexPair
std::vector< const int8_t * > order_columns_
bool is_decimal() const
Definition: sqltypes.h:570
int64_t * ordered_partition_null_end_pos_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void WindowFunctionContext::computePartitionBuffer ( const size_t  partition_idx,
int64_t *  output_for_partition_buff,
const Analyzer::WindowFunction *  window_func 
)
private

Definition at line 1312 of file WindowContext.cpp.

References anonymous_namespace{WindowContext.cpp}::apply_lag_to_partition(), anonymous_namespace{WindowContext.cpp}::apply_nth_value_to_partition(), anonymous_namespace{WindowContext.cpp}::apply_original_index_to_partition(), anonymous_namespace{WindowContext.cpp}::apply_permutation_to_partition(), run_benchmark_import::args, AVG, BACKWARD_FILL, CHECK, CHECK_EQ, CONDITIONAL_CHANGE_EVENT, gpu_enabled::copy(), COUNT, COUNT_IF, counts(), createComparator(), CUME_DIST, DENSE_RANK, FIRST_VALUE, FIRST_VALUE_IN_FRAME, FORWARD_FILL, anonymous_namespace{WindowContext.cpp}::get_int_constant_from_expr(), anonymous_namespace{WindowContext.cpp}::get_lag_or_lead_argument(), anonymous_namespace{WindowContext.cpp}::get_target_idx_for_first_or_last_value_func(), Analyzer::WindowFunction::getArgs(), Analyzer::WindowFunction::getKind(), GT, anonymous_namespace{WindowContext.cpp}::index_to_cume_dist(), anonymous_namespace{WindowContext.cpp}::index_to_dense_rank(), anonymous_namespace{WindowContext.cpp}::index_to_ntile(), anonymous_namespace{WindowContext.cpp}::index_to_partition_end(), anonymous_namespace{WindowContext.cpp}::index_to_percent_rank(), anonymous_namespace{WindowContext.cpp}::index_to_rank(), anonymous_namespace{WindowContext.cpp}::index_to_row_number(), LAG, LAG_IN_FRAME, LAST_VALUE, LAST_VALUE_IN_FRAME, LEAD, LEAD_IN_FRAME, LT, MAX, MIN, anonymous_namespace{Utm.h}::n, NTH_VALUE, NTH_VALUE_IN_FRAME, NTILE, offsets(), partitionEnd(), payload(), PERCENT_RANK, RANK, ROW_NUMBER, SUM, SUM_IF, window_func_, and window_function_requires_peer_handling().

Referenced by compute().

1315  {
1316  const size_t partition_size{static_cast<size_t>(counts()[partition_idx])};
1317  if (partition_size == 0) {
1318  return;
1319  }
1320  const auto offset = offsets()[partition_idx];
1321  auto partition_comparator = createComparator(partition_idx);
1322  const auto col_tuple_comparator = [&partition_comparator](const int64_t lhs,
1323  const int64_t rhs) {
1324  for (const auto& comparator : partition_comparator) {
1325  const auto comparator_result = comparator(lhs, rhs);
1326  switch (comparator_result) {
1328  return true;
1330  return false;
1331  default:
1332  // WindowComparatorResult::EQ: continue to next comparator
1333  continue;
1334  }
1335  }
1336  // If we get here, every key compared as WindowFunctionContext::WindowComparatorResult::EQ;
1337  // return false because the sort algorithm requires a strict weak ordering
1338  return false;
1339  };
1340  switch (window_func->getKind()) {
1342  const auto row_numbers =
1343  index_to_row_number(output_for_partition_buff, partition_size);
1344  std::copy(row_numbers.begin(), row_numbers.end(), output_for_partition_buff);
1345  break;
1346  }
1348  const auto rank =
1349  index_to_rank(output_for_partition_buff, partition_size, col_tuple_comparator);
1350  std::copy(rank.begin(), rank.end(), output_for_partition_buff);
1351  break;
1352  }
1354  const auto dense_rank = index_to_dense_rank(
1355  output_for_partition_buff, partition_size, col_tuple_comparator);
1356  std::copy(dense_rank.begin(), dense_rank.end(), output_for_partition_buff);
1357  break;
1358  }
1360  const auto percent_rank = index_to_percent_rank(
1361  output_for_partition_buff, partition_size, col_tuple_comparator);
1362  std::copy(percent_rank.begin(),
1363  percent_rank.end(),
1364  reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
1365  break;
1366  }
1368  const auto cume_dist = index_to_cume_dist(
1369  output_for_partition_buff, partition_size, col_tuple_comparator);
1370  std::copy(cume_dist.begin(),
1371  cume_dist.end(),
1372  reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
1373  break;
1374  }
1376  const auto& args = window_func->getArgs();
1377  CHECK_EQ(args.size(), size_t(1));
1378  const auto n = get_int_constant_from_expr(args.front().get());
1379  const auto ntile = index_to_ntile(output_for_partition_buff, partition_size, n);
1380  std::copy(ntile.begin(), ntile.end(), output_for_partition_buff);
1381  break;
1382  }
1385  const auto lag_or_lead = get_lag_or_lead_argument(window_func);
1386  const auto partition_row_offsets = payload() + offset;
1388  lag_or_lead, partition_row_offsets, output_for_partition_buff, partition_size);
1389  break;
1390  }
1393  const auto target_idx =
1394  get_target_idx_for_first_or_last_value_func(window_func, partition_size);
1395  const auto partition_row_offsets = payload() + offset;
1397  partition_row_offsets, output_for_partition_buff, partition_size, target_idx);
1398  break;
1399  }
1401  auto const n_value_ptr =
1402  dynamic_cast<Analyzer::Constant*>(window_func_->getArgs()[1].get());
1403  CHECK(n_value_ptr);
1404  auto const n_value = static_cast<size_t>(n_value_ptr->get_constval().intval);
1405  const auto partition_row_offsets = payload() + offset;
1406  if (n_value < partition_size) {
1408  partition_row_offsets, output_for_partition_buff, partition_size, n_value);
1409  } else {
1410  // when NTH_VALUE of the current row is NULL, we keep the NULL value in the
1411  // current row's output storage in the query output buffer, so we assign the
1412  // original index of the current row to the corresponding slot in
1413  // `output_for_partition_buff`
1415  partition_row_offsets, output_for_partition_buff, partition_size);
1416  }
1417  break;
1418  }
1434  const auto partition_row_offsets = payload() + offset;
1435  if (window_function_requires_peer_handling(window_func)) {
1437  offset,
1438  output_for_partition_buff,
1439  partition_size,
1440  col_tuple_comparator);
1441  }
1443  output_for_partition_buff, partition_row_offsets, partition_size);
1444  break;
1445  }
1446  default: {
1447  std::ostringstream oss;
1448  oss << "Window function not supported yet: " << window_func_->getKind();
1449  throw std::runtime_error(oss.str());
1450  }
1451  }
1452 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
SqlWindowFunctionKind getKind() const
Definition: Analyzer.h:2925
size_t get_target_idx_for_first_or_last_value_func(const Analyzer::WindowFunction *window_func, const size_t partition_size)
std::vector< int64_t > index_to_ntile(const int64_t *index, const size_t index_size, const size_t n)
std::vector< double > index_to_percent_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
void apply_permutation_to_partition(int64_t *output_for_partition_buff, const int32_t *original_indices, const size_t partition_size)
const Analyzer::WindowFunction * window_func_
const int32_t * counts() const
int64_t get_lag_or_lead_argument(const Analyzer::WindowFunction *window_func)
void index_to_partition_end(const int8_t *partition_end, const size_t off, const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
const int32_t * offsets() const
std::vector< int64_t > index_to_row_number(const int64_t *index, const size_t index_size)
std::vector< int64_t > index_to_dense_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
std::vector< Comparator > createComparator(size_t partition_idx)
const std::vector< std::shared_ptr< Analyzer::Expr > > & getArgs() const
Definition: Analyzer.h:2927
void apply_nth_value_to_partition(const int32_t *original_indices, int64_t *output_for_partition_buff, const size_t partition_size, const size_t target_pos)
const int8_t * partitionEnd() const
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
void apply_lag_to_partition(const int64_t lag, const int32_t *original_indices, int64_t *sorted_indices, const size_t partition_size)
void apply_original_index_to_partition(const int32_t *original_indices, int64_t *output_for_partition_buff, const size_t partition_size)
bool window_function_requires_peer_handling(const Analyzer::WindowFunction *window_func)
#define CHECK(condition)
Definition: Logger.h:291
std::vector< double > index_to_cume_dist(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
std::vector< int64_t > index_to_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
const int32_t * payload() const
size_t get_int_constant_from_expr(const Analyzer::Expr *expr)
constexpr double n
Definition: Utm.h:38

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const int32_t * WindowFunctionContext::counts ( ) const

Definition at line 1792 of file WindowContext.cpp.

References device_type_, dummy_count_, and partitions_.

Referenced by CodeGenerator::codegenFixedLengthColVarInWindow(), Executor::codegenLoadPartitionBuffers(), compute(), computeNullRangeOfSortedPartition(), computePartitionBuffer(), fillPartitionEnd(), fillPartitionStart(), partitionCount(), sortPartition(), and WindowFunctionContext().

1792  {
1793  if (partitions_) {
1794  return reinterpret_cast<const int32_t*>(
1795  partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->countBufferOff());
1796  }
1797  return &dummy_count_;
1798 }
const int32_t dummy_count_
std::shared_ptr< HashJoin > partitions_
const ExecutorDeviceType device_type_

+ Here is the caller graph for this function:

std::vector< WindowFunctionContext::Comparator > WindowFunctionContext::createComparator ( size_t  partition_idx)

Definition at line 904 of file WindowContext.cpp.

References CHECK, CHECK_EQ, Analyzer::WindowFunction::getCollation(), Analyzer::WindowFunction::getOrderKeys(), makeComparator(), offsets(), order_columns_, payload(), and window_func_.

Referenced by computePartitionBuffer(), and sortPartition().

905  {
906  // create tuple comparator
907  std::vector<WindowFunctionContext::Comparator> partition_comparator;
908  const auto& order_keys = window_func_->getOrderKeys();
909  const auto& collation = window_func_->getCollation();
910  CHECK_EQ(order_keys.size(), collation.size());
911  for (size_t order_column_idx = 0; order_column_idx < order_columns_.size();
912  ++order_column_idx) {
913  auto order_column_buffer = order_columns_[order_column_idx];
914  const auto order_col =
915  dynamic_cast<const Analyzer::ColumnVar*>(order_keys[order_column_idx].get());
916  CHECK(order_col);
917  const auto& order_col_collation = collation[order_column_idx];
918  auto comparator = makeComparator(order_col,
919  order_column_buffer,
920  payload() + offsets()[partition_idx],
921  !order_col_collation.is_desc,
922  order_col_collation.nulls_first);
923  if (order_col_collation.is_desc) {
924  comparator = [comparator](const int64_t lhs, const int64_t rhs) {
925  return comparator(rhs, lhs);
926  };
927  }
928  partition_comparator.push_back(comparator);
929  }
930  return partition_comparator;
931 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
static Comparator makeComparator(const Analyzer::ColumnVar *col_var, const int8_t *partition_values, const int32_t *partition_indices, const bool asc_ordering, const bool nulls_first)
const Analyzer::WindowFunction * window_func_
const int32_t * offsets() const
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:2933
const std::vector< OrderEntry > & getCollation() const
Definition: Analyzer.h:2951
#define CHECK(condition)
Definition: Logger.h:291
std::vector< const int8_t * > order_columns_
const int32_t * payload() const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t WindowFunctionContext::elementCount ( ) const

Definition at line 1026 of file WindowContext.cpp.

References elem_count_.

Referenced by Executor::codegenCurrentPartitionIndex().

1026  {
1027  return elem_count_;
1028 }

+ Here is the caller graph for this function:

void WindowFunctionContext::fillPartitionEnd ( )
private

Definition at line 1730 of file WindowContext.cpp.

References agg_count_distinct_bitmap(), Bitmap, checked_calloc(), counts(), CPU, elem_count_, gpu_enabled::partial_sum(), partition_end_, partition_start_offset_, partitionCount(), and partitions_.

Referenced by compute().

1730  {
1732  0,
1733  0,
1734  static_cast<int64_t>(elem_count_),
1735  false,
1737  1};
1738  auto bitmap_sz = partition_start_bitmap.bitmapPaddedSizeBytes();
1739  if (partitions_) {
1740  bitmap_sz += partitions_->isBitwiseEq() ? 1 : 0;
1741  }
1742  partition_end_ = static_cast<int8_t*>(checked_calloc(bitmap_sz, 1));
1743  auto partition_end_handle = reinterpret_cast<int64_t>(partition_end_);
1744  int64_t partition_count = partitionCount();
1746  // if we have `partition_start_offset_`, we can reuse it for this logic
1747  // but note that it has partition_count + 1 elements where the first element is zero
1748  // which means the first partition's start offset is zero
1749  // and rest of them can represent values required for this logic
1750  for (int64_t i = 0; i < partition_count - 1; ++i) {
1751  if (partition_start_offset_[i + 1] == 0) {
1752  continue;
1753  }
1755  &partition_end_handle, partition_start_offset_[i + 1] - 1, 0, 0);
1756  }
1757  if (elem_count_) {
1758  agg_count_distinct_bitmap(&partition_end_handle, elem_count_ - 1, 0, 0);
1759  }
1760  } else {
1761  std::vector<size_t> partition_offsets(partition_count);
1762  std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
1763  for (int64_t i = 0; i < partition_count - 1; ++i) {
1764  if (partition_offsets[i] == 0) {
1765  continue;
1766  }
1767  agg_count_distinct_bitmap(&partition_end_handle, partition_offsets[i] - 1, 0, 0);
1768  }
1769  if (elem_count_) {
1770  agg_count_distinct_bitmap(&partition_end_handle, elem_count_ - 1, 0, 0);
1771  }
1772  }
1773 }
RUNTIME_EXPORT ALWAYS_INLINE void agg_count_distinct_bitmap(int64_t *agg, const int64_t val, const int64_t min_val, const int64_t bucket_size)
const int32_t * counts() const
size_t partitionCount() const
DEVICE void partial_sum(ARGS &&...args)
Definition: gpu_enabled.h:87
void * checked_calloc(const size_t nmemb, const size_t size)
Definition: checked_alloc.h:53
std::shared_ptr< HashJoin > partitions_
int64_t * partition_start_offset_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void WindowFunctionContext::fillPartitionStart ( )
private

Definition at line 1696 of file WindowContext.cpp.

References agg_count_distinct_bitmap(), Bitmap, checked_calloc(), counts(), CPU, elem_count_, gpu_enabled::partial_sum(), partition_start_, partition_start_offset_, partitionCount(), and partitions_.

Referenced by compute().

1696  {
1698  0,
1699  0,
1700  static_cast<int64_t>(elem_count_),
1701  false,
1703  1};
1704  auto bitmap_sz = partition_start_bitmap.bitmapPaddedSizeBytes();
1705  if (partitions_) {
1706  bitmap_sz += partitions_->isBitwiseEq() ? 1 : 0;
1707  }
1708  partition_start_ = static_cast<int8_t*>(checked_calloc(bitmap_sz, 1));
1709  int64_t partition_count = partitionCount();
1710  auto partition_start_handle = reinterpret_cast<int64_t>(partition_start_);
1711  agg_count_distinct_bitmap(&partition_start_handle, 0, 0, 0);
1713  // if we have `partition_start_offset_`, we can reuse it for this logic
1714  // but note that it has partition_count + 1 elements where the first element is zero
1715  // which means the first partition's start offset is zero
1716  // and rest of them can represent values required for this logic
1717  for (int64_t i = 0; i < partition_count - 1; ++i) {
1719  &partition_start_handle, partition_start_offset_[i + 1], 0, 0);
1720  }
1721  } else {
1722  std::vector<size_t> partition_offsets(partition_count);
1723  std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
1724  for (int64_t i = 0; i < partition_count - 1; ++i) {
1725  agg_count_distinct_bitmap(&partition_start_handle, partition_offsets[i], 0, 0);
1726  }
1727  }
1728 }
RUNTIME_EXPORT ALWAYS_INLINE void agg_count_distinct_bitmap(int64_t *agg, const int64_t val, const int64_t min_val, const int64_t bucket_size)
const int32_t * counts() const
size_t partitionCount() const
DEVICE void partial_sum(ARGS &&...args)
Definition: gpu_enabled.h:87
void * checked_calloc(const size_t nmemb, const size_t size)
Definition: checked_alloc.h:53
std::shared_ptr< HashJoin > partitions_
int64_t * partition_start_offset_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t * WindowFunctionContext::getAggregateTreeDepth ( ) const

Definition at line 1680 of file WindowContext.cpp.

References aggregate_trees_depth_.

1680  {
1681  return aggregate_trees_depth_;
1682 }
size_t * aggregate_trees_depth_
size_t WindowFunctionContext::getAggregateTreeFanout ( ) const

Definition at line 1684 of file WindowContext.cpp.

References aggregate_trees_fan_out_.

1684  {
1685  return aggregate_trees_fan_out_;
1686 }
double ** WindowFunctionContext::getAggregationTreesForDoubleTypeWindowExpr ( ) const

Definition at line 1664 of file WindowContext.cpp.

References AggregateTreeForWindowFraming::aggregate_tree_for_double_type_, and aggregate_trees_.

1664  {
1665  return const_cast<double**>(aggregate_trees_.aggregate_tree_for_double_type_.data());
1666 }
std::vector< double * > aggregate_tree_for_double_type_
Definition: WindowContext.h:92
AggregateTreeForWindowFraming aggregate_trees_
int64_t ** WindowFunctionContext::getAggregationTreesForIntegerTypeWindowExpr ( ) const

Definition at line 1660 of file WindowContext.cpp.

References AggregateTreeForWindowFraming::aggregate_tree_for_integer_type_, and aggregate_trees_.

1660  {
1661  return const_cast<int64_t**>(aggregate_trees_.aggregate_tree_for_integer_type_.data());
1662 }
AggregateTreeForWindowFraming aggregate_trees_
std::vector< int64_t * > aggregate_tree_for_integer_type_
Definition: WindowContext.h:91
const std::vector< const int8_t * > & WindowFunctionContext::getColumnBufferForWindowFunctionExpressions ( ) const

Definition at line 174 of file WindowContext.cpp.

References window_func_expr_columns_.

174  {
175  return window_func_expr_columns_;
176 }
std::vector< const int8_t * > window_func_expr_columns_
SumAndCountPair< double > ** WindowFunctionContext::getDerivedAggregationTreesForDoubleTypeWindowExpr ( ) const

Definition at line 1675 of file WindowContext.cpp.

References aggregate_trees_, and AggregateTreeForWindowFraming::derived_aggregate_tree_for_double_type_.

1675  {
1676  return const_cast<SumAndCountPair<double>**>(
1677  aggregate_trees_.derived_aggregate_tree_for_double_type_.data());
1678 }
std::vector< SumAndCountPair< double > * > derived_aggregate_tree_for_double_type_
Definition: WindowContext.h:94
AggregateTreeForWindowFraming aggregate_trees_
SumAndCountPair< int64_t > ** WindowFunctionContext::getDerivedAggregationTreesForIntegerTypeWindowExpr ( ) const

Definition at line 1669 of file WindowContext.cpp.

References aggregate_trees_, and AggregateTreeForWindowFraming::derived_aggregate_tree_for_integer_type_.

1669  {
1670  return const_cast<SumAndCountPair<int64_t>**>(
1671  aggregate_trees_.derived_aggregate_tree_for_integer_type_.data());
1672 }
AggregateTreeForWindowFraming aggregate_trees_
std::vector< SumAndCountPair< int64_t > * > derived_aggregate_tree_for_integer_type_
Definition: WindowContext.h:93
int64_t * WindowFunctionContext::getNullValueEndPos ( ) const

Definition at line 1692 of file WindowContext.cpp.

References ordered_partition_null_end_pos_.

Referenced by Executor::codegenFrameNullRange().

1692  {
1693  return ordered_partition_null_end_pos_;
1694 }
int64_t * ordered_partition_null_end_pos_

+ Here is the caller graph for this function:

int64_t * WindowFunctionContext::getNullValueStartPos ( ) const

Definition at line 1688 of file WindowContext.cpp.

References ordered_partition_null_start_pos_.

Referenced by Executor::codegenFrameNullRange().

1688  {
1689  return ordered_partition_null_start_pos_;
1690 }
int64_t * ordered_partition_null_start_pos_

+ Here is the caller graph for this function:

const std::vector< const int8_t * > & WindowFunctionContext::getOrderKeyColumnBuffers ( ) const

Definition at line 178 of file WindowContext.cpp.

References order_columns_.

Referenced by Executor::codegenLoadOrderKeyBufPtr(), and Executor::codegenWindowFrameBounds().

179  {
180  return order_columns_;
181 }
std::vector< const int8_t * > order_columns_

+ Here is the caller graph for this function:

const std::vector< SQLTypeInfo > & WindowFunctionContext::getOrderKeyColumnBufferTypes ( ) const

Definition at line 183 of file WindowContext.cpp.

References order_columns_ti_.

Referenced by CodeGenerator::codegenFixedLengthColVar(), and Executor::codegenLoadOrderKeyBufPtr().

184  {
185  return order_columns_ti_;
186 }
std::vector< SQLTypeInfo > order_columns_ti_

+ Here is the caller graph for this function:

const Analyzer::WindowFunction * WindowFunctionContext::getWindowFunction ( ) const

Definition at line 980 of file WindowContext.cpp.

References window_func_.

Referenced by Executor::codegenCurrentPartitionIndex(), CodeGenerator::codegenFixedLengthColVar(), CodeGenerator::codegenFixedLengthColVarInWindow(), Executor::codegenLoadCurrentValueFromColBuf(), Executor::codegenLoadOrderKeyBufPtr(), Executor::codegenWindowFrameBounds(), Executor::codegenWindowFunction(), Executor::getFirstOrderColTypeInfo(), and Executor::getOrderKeyTypeName().

980  {
981  return window_func_;
982 }
const Analyzer::WindowFunction * window_func_

+ Here is the caller graph for this function:

WindowFunctionContext::Comparator WindowFunctionContext::makeComparator ( const Analyzer::ColumnVar *  col_var,
const int8_t *  partition_values,
const int32_t *  partition_indices,
const bool  asc_ordering,
const bool  nulls_first 
)
static private

Definition at line 1174 of file WindowContext.cpp.

References logger::FATAL, Analyzer::Expr::get_type_info(), kDOUBLE, kFLOAT, and LOG.

Referenced by createComparator().

1179  {
1180  const auto& ti = col_var->get_type_info();
1181  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
1182  switch (ti.get_size()) {
1183  case 8: {
1184  return [order_column_buffer, nulls_first, partition_indices, asc_ordering, &ti](
1185  const int64_t lhs, const int64_t rhs) {
1186  return asc_ordering ? integer_comparator_asc<int64_t>(order_column_buffer,
1187  ti,
1188  partition_indices,
1189  lhs,
1190  rhs,
1191  asc_ordering,
1192  nulls_first)
1193  : integer_comparator_desc<int64_t>(order_column_buffer,
1194  ti,
1195  partition_indices,
1196  lhs,
1197  rhs,
1198  asc_ordering,
1199  nulls_first);
1200  };
1201  }
1202  case 4: {
1203  return [order_column_buffer, nulls_first, partition_indices, asc_ordering, &ti](
1204  const int64_t lhs, const int64_t rhs) {
1205  return asc_ordering ? integer_comparator_asc<int32_t>(order_column_buffer,
1206  ti,
1207  partition_indices,
1208  lhs,
1209  rhs,
1210  asc_ordering,
1211  nulls_first)
1212  : integer_comparator_desc<int32_t>(order_column_buffer,
1213  ti,
1214  partition_indices,
1215  lhs,
1216  rhs,
1217  asc_ordering,
1218  nulls_first);
1219  };
1220  }
1221  case 2: {
1222  return [order_column_buffer, nulls_first, partition_indices, asc_ordering, &ti](
1223  const int64_t lhs, const int64_t rhs) {
1224  return asc_ordering ? integer_comparator_asc<int16_t>(order_column_buffer,
1225  ti,
1226  partition_indices,
1227  lhs,
1228  rhs,
1229  asc_ordering,
1230  nulls_first)
1231  : integer_comparator_desc<int16_t>(order_column_buffer,
1232  ti,
1233  partition_indices,
1234  lhs,
1235  rhs,
1236  asc_ordering,
1237  nulls_first);
1238  };
1239  }
1240  case 1: {
1241  return [order_column_buffer, nulls_first, partition_indices, asc_ordering, &ti](
1242  const int64_t lhs, const int64_t rhs) {
1243  return asc_ordering ? integer_comparator_asc<int8_t>(order_column_buffer,
1244  ti,
1245  partition_indices,
1246  lhs,
1247  rhs,
1248  asc_ordering,
1249  nulls_first)
1250  : integer_comparator_desc<int8_t>(order_column_buffer,
1251  ti,
1252  partition_indices,
1253  lhs,
1254  rhs,
1255  asc_ordering,
1256  nulls_first);
1257  };
1258  }
1259  default: {
1260  LOG(FATAL) << "Invalid type size: " << ti.get_size();
1261  }
1262  }
1263  }
1264  if (ti.is_fp()) {
1265  switch (ti.get_type()) {
1266  case kFLOAT: {
1267  return [order_column_buffer, nulls_first, partition_indices, asc_ordering, &ti](
1268  const int64_t lhs, const int64_t rhs) {
1269  return asc_ordering ? fp_comparator_asc<float, int32_t>(order_column_buffer,
1270  ti,
1271  partition_indices,
1272  lhs,
1273  rhs,
1274  asc_ordering,
1275  nulls_first)
1276  : fp_comparator_desc<float, int32_t>(order_column_buffer,
1277  ti,
1278  partition_indices,
1279  lhs,
1280  rhs,
1281  asc_ordering,
1282  nulls_first);
1283  };
1284  }
1285  case kDOUBLE: {
1286  return [order_column_buffer, nulls_first, partition_indices, asc_ordering, &ti](
1287  const int64_t lhs, const int64_t rhs) {
1288  return asc_ordering ? fp_comparator_asc<double, int64_t>(order_column_buffer,
1289  ti,
1290  partition_indices,
1291  lhs,
1292  rhs,
1293  asc_ordering,
1294  nulls_first)
1295  : fp_comparator_desc<double, int64_t>(order_column_buffer,
1296  ti,
1297  partition_indices,
1298  lhs,
1299  rhs,
1300  asc_ordering,
1301  nulls_first);
1302  };
1303  }
1304  default: {
1305  LOG(FATAL) << "Invalid float type";
1306  }
1307  }
1308  }
1309  throw std::runtime_error("Type not supported yet");
1310 }
#define LOG(tag)
Definition: Logger.h:285
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:79

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const bool WindowFunctionContext::needsToBuildAggregateTree ( ) const

Definition at line 1809 of file WindowContext.cpp.

References elem_count_, Analyzer::WindowFunction::hasAggregateTreeRequiredWindowFunc(), Analyzer::WindowFunction::hasFraming(), and window_func_.

Referenced by compute().

1809  {
1810  return window_func_->hasFraming() &&
1811  window_func_->hasAggregateTreeRequiredWindowFunc() && elem_count_ > 0;
1812 }
bool hasAggregateTreeRequiredWindowFunc() const
Definition: Analyzer.h:2973
const Analyzer::WindowFunction * window_func_
bool hasFraming() const
Definition: Analyzer.h:2961

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const int32_t * WindowFunctionContext::offsets ( ) const

Definition at line 1784 of file WindowContext.cpp.

References device_type_, dummy_offset_, and partitions_.

Referenced by buildAggregationTreeForPartition(), compute(), computePartitionBuffer(), createComparator(), and partitionCount().

1784  {
1785  if (partitions_) {
1786  return reinterpret_cast<const int32_t*>(
1787  partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->offsetBufferOff());
1788  }
1789  return &dummy_offset_;
1790 }
const int32_t dummy_offset_
std::shared_ptr< HashJoin > partitions_
const ExecutorDeviceType device_type_

+ Here is the caller graph for this function:

WindowFunctionContext& WindowFunctionContext::operator= ( const WindowFunctionContext & )
delete
const int8_t * WindowFunctionContext::output ( ) const

Definition at line 984 of file WindowContext.cpp.

References output_.

Referenced by CodeGenerator::codegenWindowPosition().

984  {
985  return output_;
986 }

+ Here is the caller graph for this function:

size_t WindowFunctionContext::partitionCount ( ) const

Definition at line 1800 of file WindowContext.cpp.

References CHECK_GE, counts(), offsets(), and partitions_.

Referenced by Executor::codegenCurrentPartitionIndex(), compute(), fillPartitionEnd(), fillPartitionStart(), resizeStorageForWindowFraming(), and WindowFunctionContext().

1800  {
1801  if (partitions_) {
1802  const auto partition_count = counts() - offsets();
1803  CHECK_GE(partition_count, 0);
1804  return partition_count;
1805  }
1806  return 1; // non-partitioned window function
1807 }
#define CHECK_GE(x, y)
Definition: Logger.h:306
const int32_t * counts() const
const int32_t * offsets() const
std::shared_ptr< HashJoin > partitions_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const int8_t * WindowFunctionContext::partitionEnd ( ) const

Definition at line 1022 of file WindowContext.cpp.

References partition_end_.

Referenced by computePartitionBuffer().

1022  {
1023  return partition_end_;
1024 }

+ Here is the caller graph for this function:

const int64_t * WindowFunctionContext::partitionNumCountBuf ( ) const

Definition at line 1008 of file WindowContext.cpp.

References CHECK, and partition_start_offset_.

Referenced by Executor::codegenCurrentPartitionIndex().

1008  {
1009  CHECK(partition_start_offset_);
1010  return partition_start_offset_ + 1;
1011 }
int64_t * partition_start_offset_
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the caller graph for this function:

const int8_t * WindowFunctionContext::partitionStart ( ) const

Definition at line 1018 of file WindowContext.cpp.

References partition_start_.

1018  {
1019  return partition_start_;
1020 }
const int64_t * WindowFunctionContext::partitionStartOffset ( ) const

Definition at line 1003 of file WindowContext.cpp.

References CHECK, and partition_start_offset_.

Referenced by Executor::codegenLoadPartitionBuffers().

1003  {
1004  CHECK(partition_start_offset_);
1005  return partition_start_offset_;
1006 }
int64_t * partition_start_offset_
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the caller graph for this function:

const int32_t * WindowFunctionContext::payload ( ) const

Definition at line 1775 of file WindowContext.cpp.

References device_type_, dummy_payload_, and partitions_.

Referenced by Executor::codegenCurrentPartitionIndex(), Executor::codegenLoadPartitionBuffers(), compute(), computePartitionBuffer(), and createComparator().

1775  {
1776  if (partitions_) {
1777  return reinterpret_cast<const int32_t*>(
1778  partitions_->getJoinHashBuffer(device_type_, 0) +
1779  partitions_->payloadBufferOff());
1780  }
1781  return dummy_payload_; // non-partitioned window function
1782 }
std::shared_ptr< HashJoin > partitions_
const ExecutorDeviceType device_type_

+ Here is the caller graph for this function:

void WindowFunctionContext::resizeStorageForWindowFraming ( bool const  for_reuse = false)
private

Definition at line 1454 of file WindowContext.cpp.

References aggregate_trees_, partitionCount(), AggregateTreeForWindowFraming::resizeStorageForWindowFraming(), and segment_trees_owned_.

Referenced by compute().

1454  {
1455  auto const partition_count = partitionCount();
1456  aggregate_trees_.resizeStorageForWindowFraming(partition_count);
1457  if (!for_reuse) {
1458  segment_trees_owned_.resize(partition_count);
1459  }
1460 }
std::vector< std::shared_ptr< void > > segment_trees_owned_
size_t partitionCount() const
AggregateTreeForWindowFraming aggregate_trees_
void resizeStorageForWindowFraming(size_t partition_count)
Definition: WindowContext.h:97

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void WindowFunctionContext::setSortedPartitionCacheKey ( QueryPlanHash  cache_key)

Definition at line 188 of file WindowContext.cpp.

References sorted_partition_cache_key_.

188  {
189  sorted_partition_cache_key_ = cache_key;
190 }
QueryPlanHash sorted_partition_cache_key_
const int64_t * WindowFunctionContext::sortedPartition ( ) const

Definition at line 988 of file WindowContext.cpp.

References CHECK, and sorted_partition_buf_.

Referenced by Executor::codegenLoadPartitionBuffers().

988  {
989  CHECK(sorted_partition_buf_);
990  return sorted_partition_buf_->data();
991 }
std::shared_ptr< std::vector< int64_t > > sorted_partition_buf_
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the caller graph for this function:

void WindowFunctionContext::sortPartition ( const size_t  partition_idx,
int64_t *  output_for_partition_buff,
bool  should_parallelize 
)
private

Definition at line 933 of file WindowContext.cpp.

References counts(), createComparator(), GT, gpu_enabled::iota(), LT, and gpu_enabled::sort().

Referenced by compute().

935  {
936  const size_t partition_size{static_cast<size_t>(counts()[partition_idx])};
937  if (partition_size == 0) {
938  return;
939  }
940  std::iota(
941  output_for_partition_buff, output_for_partition_buff + partition_size, int64_t(0));
942  auto partition_comparator = createComparator(partition_idx);
943  if (!partition_comparator.empty()) {
944  const auto col_tuple_comparator = [&partition_comparator](const int64_t lhs,
945  const int64_t rhs) {
946  for (const auto& comparator : partition_comparator) {
947  const auto comparator_result = comparator(lhs, rhs);
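948  switch (comparator_result) {
949  case WindowFunctionContext::WindowComparatorResult::LT:
950  return true;
951  case WindowFunctionContext::WindowComparatorResult::GT:
952  return false;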
953  default:
954  // WindowComparatorResult::EQ: continue to next comparator
955  continue;
956  }
957  }
958  // If here, the result was WindowFunctionContext::WindowComparatorResult::EQ for all keys;
959  // return false because the sort algorithm requires a strict weak ordering
960  return false;
961  };
962  if (should_parallelize) {
963 #ifdef HAVE_TBB
964  tbb::parallel_sort(output_for_partition_buff,
965  output_for_partition_buff + partition_size,
966  col_tuple_comparator);
967 #else
968  thrust::sort(output_for_partition_buff,
969  output_for_partition_buff + partition_size,
970  col_tuple_comparator);
971 #endif
972  } else {
973  std::sort(output_for_partition_buff,
974  output_for_partition_buff + partition_size,
975  col_tuple_comparator);
976  }
977  }
978 }
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
const int32_t * counts() const
std::vector< Comparator > createComparator(size_t partition_idx)
DEVICE void iota(ARGS &&...args)
Definition: gpu_enabled.h:69

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

AggregateState WindowFunctionContext::aggregate_state_
private
size_t* WindowFunctionContext::aggregate_trees_depth_
private
size_t WindowFunctionContext::aggregate_trees_fan_out_
private

Definition at line 322 of file WindowContext.h.

Referenced by buildAggregationTreeForPartition(), and getAggregateTreeFanout().

const ExecutorDeviceType WindowFunctionContext::device_type_
private

Definition at line 335 of file WindowContext.h.

Referenced by counts(), offsets(), and payload().

const int32_t WindowFunctionContext::dummy_count_
private

Definition at line 339 of file WindowContext.h.

Referenced by counts().

const int32_t WindowFunctionContext::dummy_offset_
private

Definition at line 340 of file WindowContext.h.

Referenced by offsets().

int32_t* WindowFunctionContext::dummy_payload_
private

Definition at line 345 of file WindowContext.h.

Referenced by payload(), WindowFunctionContext(), and ~WindowFunctionContext().

size_t WindowFunctionContext::elem_count_
private
std::vector<const int8_t*> WindowFunctionContext::order_columns_
private
std::vector<std::vector<std::shared_ptr<Chunk_NS::Chunk> > > WindowFunctionContext::order_columns_owner_
private

Definition at line 303 of file WindowContext.h.

Referenced by addOrderColumn().

std::vector<SQLTypeInfo> WindowFunctionContext::order_columns_ti_
private

Definition at line 306 of file WindowContext.h.

Referenced by addOrderColumn(), and getOrderKeyColumnBufferTypes().

int64_t* WindowFunctionContext::ordered_partition_null_end_pos_
private
int64_t* WindowFunctionContext::ordered_partition_null_start_pos_
private
int8_t* WindowFunctionContext::output_
private

Definition at line 312 of file WindowContext.h.

Referenced by compute(), and output().

QueryPlanHash WindowFunctionContext::partition_cache_key_
private

Definition at line 300 of file WindowContext.h.

int8_t* WindowFunctionContext::partition_end_
private

Definition at line 332 of file WindowContext.h.

Referenced by fillPartitionEnd(), partitionEnd(), and ~WindowFunctionContext().

int8_t* WindowFunctionContext::partition_start_
private

Definition at line 329 of file WindowContext.h.

Referenced by fillPartitionStart(), partitionStart(), and ~WindowFunctionContext().

int64_t* WindowFunctionContext::partition_start_offset_
private
std::shared_ptr<HashJoin> WindowFunctionContext::partitions_
private
std::shared_ptr<RowSetMemoryOwner> WindowFunctionContext::row_set_mem_owner_
private

Definition at line 336 of file WindowContext.h.

Referenced by compute().

std::vector<std::shared_ptr<void> > WindowFunctionContext::segment_trees_owned_
private
std::shared_ptr<std::vector<int64_t> > WindowFunctionContext::sorted_partition_buf_
private

Definition at line 313 of file WindowContext.h.

Referenced by compute(), and sortedPartition().

QueryPlanHash WindowFunctionContext::sorted_partition_cache_key_
private

Definition at line 301 of file WindowContext.h.

Referenced by compute(), and setSortedPartitionCacheKey().

std::vector<const int8_t*> WindowFunctionContext::window_func_expr_columns_
private
std::vector<std::vector<std::shared_ptr<Chunk_NS::Chunk> > > WindowFunctionContext::window_func_expr_columns_owner_
private

Definition at line 316 of file WindowContext.h.

Referenced by addColumnBufferForWindowFunctionExpression().


The documentation for this class was generated from the following files: