#ifdef HAVE_TBB
#include <tbb/parallel_sort.h>
#else
#include <thrust/sort.h>
#endif
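// Context for a window function evaluated without PARTITION BY: partitions_
// stays null and the whole input of elem_count rows is treated as a single
// partition, tracked through the dummy count/offset/payload members.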
WindowFunctionContext::WindowFunctionContext(
    const Analyzer::WindowFunction* window_func,
    const size_t elem_count,
    const ExecutorDeviceType device_type,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner)
    : window_func_(window_func)
    , partition_cache_key_(EMPTY_HASHED_PLAN_DAG_KEY)
    , sorted_partition_cache_key_(EMPTY_HASHED_PLAN_DAG_KEY)
    , partitions_(nullptr)
    , elem_count_(elem_count)
    , output_(nullptr)
    , sorted_partition_buf_(nullptr)
    , aggregate_trees_fan_out_(g_window_function_aggregation_tree_fanout)
    , aggregate_trees_depth_(nullptr)
    , ordered_partition_null_start_pos_(nullptr)
    , ordered_partition_null_end_pos_(nullptr)
    , partition_start_offset_(nullptr)
    , partition_start_(nullptr)
    , partition_end_(nullptr)
    , device_type_(device_type)
    , row_set_mem_owner_(row_set_mem_owner)
    , dummy_count_(elem_count)
    , dummy_offset_(0)
    , dummy_payload_(nullptr) {
  if (elem_count_ > 0) {
    dummy_payload_ =
        reinterpret_cast<int32_t*>(checked_malloc(elem_count_ * sizeof(int32_t)));
    std::iota(dummy_payload_, dummy_payload_ + elem_count_, int32_t(0));
  }
}
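// Partitioned variant: partition buckets come from a HashJoin table identified
// by partition_cache_key, and aggregation_tree_fan_out controls the fan-out of
// the segment trees built for window framing.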
WindowFunctionContext::WindowFunctionContext(
    const Analyzer::WindowFunction* window_func,
    const QueryPlanHash partition_cache_key,
    const std::shared_ptr<HashJoin>& partitions,
    const size_t elem_count,
    const ExecutorDeviceType device_type,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    size_t aggregation_tree_fan_out)
    : window_func_(window_func)
    , partition_cache_key_(partition_cache_key)
    , sorted_partition_cache_key_(EMPTY_HASHED_PLAN_DAG_KEY)
    , partitions_(partitions)
    , elem_count_(elem_count)
    , output_(nullptr)
    , sorted_partition_buf_(nullptr)
    , aggregate_trees_fan_out_(aggregation_tree_fan_out)
    , aggregate_trees_depth_(nullptr)
    , ordered_partition_null_start_pos_(nullptr)
    , ordered_partition_null_end_pos_(nullptr)
    , partition_start_offset_(nullptr)
    , partition_start_(nullptr)
    , partition_end_(nullptr)
    , device_type_(device_type)
    , row_set_mem_owner_(row_set_mem_owner)
    , dummy_count_(elem_count)
    , dummy_offset_(0)
    , dummy_payload_(nullptr) {
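  // Per-partition scratch buffers, zero-initialized by checked_calloc. The
  // start-offset buffer gets one extra entry (partition_count + 1) so every
  // partition has both a start and an end bound.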
  const auto partition_count = partitionCount();
  partition_start_offset_ = reinterpret_cast<int64_t*>(
      checked_calloc(partition_count + 1, sizeof(int64_t)));
  aggregate_trees_depth_ =
      reinterpret_cast<size_t*>(checked_calloc(partition_count, sizeof(size_t)));
  ordered_partition_null_start_pos_ =
      reinterpret_cast<int64_t*>(checked_calloc(partition_count, sizeof(int64_t)));
  ordered_partition_null_end_pos_ =
      reinterpret_cast<int64_t*>(checked_calloc(partition_count, sizeof(int64_t)));
  // ... (remainder of the constructor body elided in this excerpt)
}
void WindowFunctionContext::addOrderColumn(
    const int8_t* column,
    const SQLTypeInfo& ti,
    const std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner) {
  order_columns_owner_.push_back(chunks_owner);
  order_columns_.push_back(column);
  order_columns_ti_.push_back(ti);
}

void WindowFunctionContext::addColumnBufferForWindowFunctionExpression(
    const int8_t* column,
    const std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner) {
  window_func_expr_columns_owner_.push_back(chunks_owner);
  window_func_expr_columns_.push_back(column);
}

const std::vector<const int8_t*>&
WindowFunctionContext::getColumnBufferForWindowFunctionExpressions() const {
  return window_func_expr_columns_;
}
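// The index_to_* helpers below receive the sorted permutation of one partition:
// index[i] is the original row position of the i-th row in sorted order, so each
// helper scatters its result back to row order by writing through index[i].
// Example: with index = {2, 0, 1} (row 2 sorts first), index_to_row_number
// produces row_numbers = {2, 3, 1}.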
std::vector<int64_t> index_to_row_number(const int64_t* index,
                                         const size_t index_size) {
  std::vector<int64_t> row_numbers(index_size);
  for (size_t i = 0; i < index_size; ++i) {
    row_numbers[index[i]] = i + 1;
  }
  return row_numbers;
}
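// advance_current_rank returns true when row i starts a new peer group under
// the given comparator (always true for i == 0); the ranking helpers use it to
// decide when the current rank advances.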
bool advance_current_rank(
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator,
    const int64_t* index,
    const size_t i) {
  if (i == 0) {
    return true;
  }
  return comparator(index[i - 1], index[i]);
}
std::vector<int64_t> index_to_rank(
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  std::vector<int64_t> rank(index_size);
  size_t crt_rank = 1;
  for (size_t i = 0; i < index_size; ++i) {
    if (advance_current_rank(comparator, index, i)) {
      crt_rank = i + 1;
    }
    rank[index[i]] = crt_rank;
  }
  return rank;
}
std::vector<int64_t> index_to_dense_rank(
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  std::vector<int64_t> dense_rank(index_size);
  size_t crt_rank = 0;
  for (size_t i = 0; i < index_size; ++i) {
    if (advance_current_rank(comparator, index, i)) {
      ++crt_rank;
    }
    dense_rank[index[i]] = crt_rank;
  }
  return dense_rank;
}
std::vector<double> index_to_percent_rank(
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  std::vector<double> percent_rank(index_size);
  size_t crt_rank = 1;
  for (size_t i = 0; i < index_size; ++i) {
    if (advance_current_rank(comparator, index, i)) {
      crt_rank = i + 1;
    }
    percent_rank[index[i]] =
        index_size == 1 ? 0 : static_cast<double>(crt_rank - 1) / (index_size - 1);
  }
  return percent_rank;
}
std::vector<double> index_to_cume_dist(
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  std::vector<double> cume_dist(index_size);
  size_t start_peer_group = 0;
  while (start_peer_group < index_size) {
    size_t end_peer_group = start_peer_group + 1;
    while (end_peer_group < index_size &&
           !advance_current_rank(comparator, index, end_peer_group)) {
      ++end_peer_group;
    }
    for (size_t i = start_peer_group; i < end_peer_group; ++i) {
      cume_dist[index[i]] = static_cast<double>(end_peer_group) / index_size;
    }
    start_peer_group = end_peer_group;
  }
  return cume_dist;
}
std::vector<int64_t> index_to_ntile(const int64_t* index,
                                    const size_t index_size,
                                    const size_t n) {
  std::vector<int64_t> row_numbers(index_size);
  if (!n) {
    throw std::runtime_error("NTILE argument cannot be zero");
  }
  const size_t tile_size = (index_size + n - 1) / n;
  for (size_t i = 0; i < index_size; ++i) {
    row_numbers[index[i]] = i / tile_size + 1;
  }
  return row_numbers;
}
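// LAG/LEAD offsets must be integer constants; LEAD is represented as a negative
// lag so both functions can share apply_lag_to_partition.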
size_t get_int_constant_from_expr(const Analyzer::Expr* expr) {
  const auto lag_constant = dynamic_cast<const Analyzer::Constant*>(expr);
  if (!lag_constant) {
    throw std::runtime_error("LAG with non-constant lag argument not supported yet");
  }
  const auto& lag_ti = lag_constant->get_type_info();
  switch (lag_ti.get_type()) {
    case kSMALLINT:
      return lag_constant->get_constval().smallintval;
    case kINT:
      return lag_constant->get_constval().intval;
    case kBIGINT:
      return lag_constant->get_constval().bigintval;
    default:
      LOG(FATAL) << "Invalid type for the lag argument";
  }
  return 0;
}
int64_t get_lag_or_lead_argument(const Analyzer::WindowFunction* window_func) {
  CHECK(window_func->getKind() == SqlWindowFunctionKind::LAG ||
        window_func->getKind() == SqlWindowFunctionKind::LEAD);
  const auto& args = window_func->getArgs();
  if (args.size() == 3) {
    throw std::runtime_error("LAG with default not supported yet");
  }
  if (args.size() == 2) {
    const int64_t lag_or_lead =
        static_cast<int64_t>(get_int_constant_from_expr(args[1].get()));
    return window_func->getKind() == SqlWindowFunctionKind::LAG ? lag_or_lead
                                                                : -lag_or_lead;
  }
  CHECK_EQ(args.size(), size_t(1));
  return window_func->getKind() == SqlWindowFunctionKind::LAG ? 1 : -1;
}
size_t get_target_idx_for_first_or_last_value_func(
    const Analyzer::WindowFunction* window_func,
    const size_t partition_size) {
  CHECK(window_func->getKind() == SqlWindowFunctionKind::FIRST_VALUE ||
        window_func->getKind() == SqlWindowFunctionKind::LAST_VALUE);
  return window_func->getKind() == SqlWindowFunctionKind::FIRST_VALUE
             ? 0
             : partition_size - 1;
}
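// Replaces each sorted row id with the corresponding original row position,
// i.e. composes the sort permutation with original_indices in place.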
void apply_permutation_to_partition(int64_t* output_for_partition_buff,
                                    const int32_t* original_indices,
                                    const size_t partition_size) {
  std::vector<int64_t> new_output_for_partition_buff(partition_size);
  for (size_t i = 0; i < partition_size; ++i) {
    new_output_for_partition_buff[i] = original_indices[output_for_partition_buff[i]];
  }
  std::copy(new_output_for_partition_buff.begin(),
            new_output_for_partition_buff.end(),
            output_for_partition_buff);
}
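// Two passes: first shift row ids by `lag` positions in sorted order (-1 marks
// rows whose lagged position falls outside the partition), then scatter the
// shifted ids back to original row positions.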
void apply_lag_to_partition(const int64_t lag,
                            const int32_t* original_indices,
                            int64_t* sorted_indices,
                            const size_t partition_size) {
  std::vector<int64_t> lag_sorted_indices(partition_size, -1);
  for (int64_t idx = 0; idx < static_cast<int64_t>(partition_size); ++idx) {
    int64_t lag_idx = idx - lag;
    if (lag_idx < 0 || lag_idx >= static_cast<int64_t>(partition_size)) {
      continue;
    }
    lag_sorted_indices[idx] = sorted_indices[lag_idx];
  }
  std::vector<int64_t> lag_original_indices(partition_size);
  for (size_t k = 0; k < partition_size; ++k) {
    const auto lag_index = lag_sorted_indices[k];
    lag_original_indices[sorted_indices[k]] =
        lag_index != -1 ? original_indices[lag_index] : -1;
  }
  std::copy(lag_original_indices.begin(), lag_original_indices.end(), sorted_indices);
}
void apply_nth_value_to_partition(const int32_t* original_indices,
                                  int64_t* output_for_partition_buff,
                                  const size_t partition_size,
                                  const size_t target_pos) {
  CHECK_LT(target_pos, partition_size);
  const auto target_idx = original_indices[output_for_partition_buff[target_pos]];
  std::fill(
      output_for_partition_buff, output_for_partition_buff + partition_size, target_idx);
}
void apply_original_index_to_partition(const int32_t* original_indices,
                                       int64_t* output_for_partition_buff,
                                       const size_t partition_size) {
  for (size_t i = 0; i < partition_size; i++) {
    const auto target_idx = original_indices[output_for_partition_buff[i]];
    output_for_partition_buff[i] = target_idx;
  }
}
void index_to_partition_end(
    const int8_t* partition_end,
    const size_t off,
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  int64_t partition_end_handle = reinterpret_cast<int64_t>(partition_end);
  for (size_t i = 0; i < index_size; ++i) {
    if (advance_current_rank(comparator, index, i)) {
      agg_count_distinct_bitmap(&partition_end_handle, off + i - 1, 0, 0);
    }
  }
}
bool pos_is_set(const int64_t bitset, const int64_t pos) {
  return (reinterpret_cast<const int8_t*>(bitset))[pos >> 3] & (1 << (pos & 7));
}
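// The "pending outputs" handle is a pointer to a std::vector<void*> of output
// slots whose peer group's value is not yet known. Once the value arrives (and
// pos_is_set confirms a peer-group boundary bit in the bitset), it is written
// to every pending slot and the list is cleared.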
template <typename T>
void apply_window_pending_outputs_int(const int64_t handle,
                                      const int64_t value,
                                      const int64_t bitset,
                                      const int64_t pos) {
  if (!pos_is_set(bitset, pos)) {
    return;
  }
  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
  for (auto pending_output_slot : pending_output_slots) {
    *reinterpret_cast<T*>(pending_output_slot) = value;
  }
  pending_output_slots.clear();
}
extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int64(
    const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos) {
  apply_window_pending_outputs_int<int64_t>(handle, value, bitset, pos);
}

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int32(
    const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos) {
  apply_window_pending_outputs_int<int32_t>(handle, value, bitset, pos);
}

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int16(
    const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos) {
  apply_window_pending_outputs_int<int16_t>(handle, value, bitset, pos);
}

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int8(
    const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos) {
  apply_window_pending_outputs_int<int8_t>(handle, value, bitset, pos);
}
extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_double(
    const int64_t handle, const double value, const int64_t bitset, const int64_t pos) {
  if (!pos_is_set(bitset, pos)) {
    return;
  }
  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
  for (auto pending_output_slot : pending_output_slots) {
    *reinterpret_cast<double*>(pending_output_slot) = value;
  }
  pending_output_slots.clear();
}
extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_float(
    const int64_t handle, const float value, const int64_t bitset, const int64_t pos) {
  if (!pos_is_set(bitset, pos)) {
    return;
  }
  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
  for (auto pending_output_slot : pending_output_slots) {
    *reinterpret_cast<double*>(pending_output_slot) = value;
  }
  pending_output_slots.clear();
}
extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_float_columnar(
    const int64_t handle, const float value, const int64_t bitset, const int64_t pos) {
  if (!pos_is_set(bitset, pos)) {
    return;
  }
  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
  for (auto pending_output_slot : pending_output_slots) {
    *reinterpret_cast<float*>(pending_output_slot) = value;
  }
  pending_output_slots.clear();
}
extern "C" RUNTIME_EXPORT void add_window_pending_output(void* pending_output,
                                                         const int64_t handle) {
  reinterpret_cast<std::vector<void*>*>(handle)->push_back(pending_output);
}
// ... (a helper dispatching on window_func->getKind() is elided in this excerpt)
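// compute() drives the whole evaluation: allocate the output buffer, sort each
// partition by the ORDER BY keys (reusing a cached sorted permutation when one
// exists), derive null ranges and framing aggregate trees where required, run
// the window function per partition, and finally scatter results back to row
// order.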
void WindowFunctionContext::compute(
    std::unordered_map<QueryPlanHash, size_t>& sorted_partition_key_ref_count_map,
    std::unordered_map<QueryPlanHash, std::shared_ptr<std::vector<int64_t>>>&
        sorted_partition_cache,
    std::unordered_map<size_t, AggregateTreeForWindowFraming>& aggregate_tree_map) {
  size_t output_buf_sz =
      elem_count_ * window_function_buffer_element_size(window_func_->getKind());
  output_ = row_set_mem_owner_->allocate(output_buf_sz, /*thread_idx=*/0);
  const bool is_agg_func = window_function_is_aggregate(window_func_->getKind());
  bool const need_window_partition_buf =
      window_function_requires_peer_handling(window_func_) ||
      window_func_->isMissingValueFillingFunction();
  if (is_agg_func || need_window_partition_buf) {
    fillPartitionStart();
    if (need_window_partition_buf) {
      fillPartitionEnd();
    }
  }
  std::unique_ptr<int64_t[]> scratchpad;
  int64_t* intermediate_output_buffer;
  if (is_agg_func || need_window_partition_buf) {
    intermediate_output_buffer = reinterpret_cast<int64_t*>(output_);
  } else {
    output_buf_sz = sizeof(int64_t) * elem_count_;
    scratchpad.reset(new int64_t[elem_count_]);
    intermediate_output_buffer = scratchpad.get();
  }
  const bool should_parallelize{g_enable_parallel_window_partition_compute &&
                                elem_count_ >=
                                    g_parallel_window_partition_compute_threshold};
  const auto partition_count = partitionCount();
  auto cached_sorted_partition_it =
      sorted_partition_cache.find(sorted_partition_cache_key_);
  if (cached_sorted_partition_it != sorted_partition_cache.end()) {
    auto& sorted_partition = cached_sorted_partition_it->second;
    VLOG(1) << "Reuse cached sorted partition to compute window function context (key: "
            << sorted_partition_cache_key_ << ")";
    auto copy_timer = DEBUG_TIMER("Window Function Cached Sorted Partition Copy");
    std::memcpy(intermediate_output_buffer, sorted_partition->data(), output_buf_sz);
    if (need_window_partition_buf) {
      // ... (null ranges are recomputed below over the cached ordering)
    }
  } else {
    const auto sort_partitions = [&](const size_t start, const size_t end) {
      for (size_t partition_idx = start; partition_idx < end; ++partition_idx) {
        sortPartition(partition_idx,
                      intermediate_output_buffer + offsets()[partition_idx],
                      should_parallelize);
      }
    };
    if (should_parallelize) {
      auto sorted_partition_copy_timer =
          DEBUG_TIMER("Window Function Partition Sorting Parallelized");
      tbb::parallel_for(tbb::blocked_range<int64_t>(0, partition_count),
                        [&, parent_thread_local_ids = logger::thread_local_ids()](
                            const tbb::blocked_range<int64_t>& r) {
                          logger::LocalIdsScopeGuard lisg =
                              parent_thread_local_ids.setNewThreadId();
                          sort_partitions(r.begin(), r.end());
                        });
    } else {
      auto sorted_partition_copy_timer =
          DEBUG_TIMER("Window Function Partition Sorting Non-Parallelized");
      sort_partitions(0, partition_count);
    }
    auto sorted_partition_ref_cnt_it =
        sorted_partition_key_ref_count_map.find(sorted_partition_cache_key_);
    bool can_access_sorted_partition =
        sorted_partition_ref_cnt_it != sorted_partition_key_ref_count_map.end() &&
        sorted_partition_ref_cnt_it->second > 1;
    if (can_access_sorted_partition || need_window_partition_buf) {
      auto copy_timer =
          DEBUG_TIMER("Window Function Sorted Partition Copy For Caching");
      // ... (copy intermediate_output_buffer into sorted_partition_cache under
      //      sorted_partition_cache_key_)
    }
  }
  if (need_window_partition_buf) {
    const auto compute_ordered_partition_null_range = [=](const size_t start,
                                                          const size_t end) {
      for (size_t partition_idx = start; partition_idx < end; ++partition_idx) {
        computeNullRangeOfSortedPartition(
            window_func_->getOrderKeys().front()->get_type_info(),
            partition_idx,
            payload() + offsets()[partition_idx],
            intermediate_output_buffer + offsets()[partition_idx]);
      }
    };
    if (should_parallelize) {
      auto partition_computation_timer =
          DEBUG_TIMER("Window Function Ordered-Partition Null-Range Compute");
      tbb::parallel_for(tbb::blocked_range<int64_t>(0, partition_count),
                        [&, parent_thread_local_ids = logger::thread_local_ids()](
                            const tbb::blocked_range<int64_t>& r) {
                          logger::LocalIdsScopeGuard lisg =
                              parent_thread_local_ids.setNewThreadId();
                          compute_ordered_partition_null_range(r.begin(), r.end());
                        });
    } else {
      auto partition_computation_timer = DEBUG_TIMER(
          "Window Function Non-Parallelized Ordered-Partition Null-Range Compute");
      compute_ordered_partition_null_range(0, partition_count);
    }
    if (needsToBuildAggregateTree()) {
      auto const cache_key = computeAggregateTreeCacheKey();
      auto const c_it = aggregate_tree_map.find(cache_key);
      if (c_it != aggregate_tree_map.cend()) {
        VLOG(1) << "Reuse aggregate tree for window function framing";
        // ... (adopt the cached trees)
        std::memcpy(aggregate_trees_depth_,
                    c_it->second.aggregate_trees_depth_,
                    sizeof(size_t) * partition_count);
      } else {
        const auto build_aggregation_tree_for_partitions = [=](const size_t start,
                                                               const size_t end) {
          for (size_t partition_idx = start; partition_idx < end; ++partition_idx) {
            // ... (resolve the aggregate type and the input column's type info)
            const auto partition_size = counts()[partition_idx];
            buildAggregationTreeForPartition(agg_type,
                                             partition_idx,
                                             partition_size,
                                             payload() + offsets()[partition_idx],
                                             intermediate_output_buffer,
                                             input_col_ti);
          }
        };
        if (should_parallelize) {
          auto segment_tree_timer = DEBUG_TIMER(
              "Window Function Parallelized Segment Tree Construction for Partitions");
          tbb::parallel_for(tbb::blocked_range<int64_t>(0, partition_count),
                            [=, parent_thread_local_ids = logger::thread_local_ids()](
                                const tbb::blocked_range<int64_t>& r) {
                              logger::LocalIdsScopeGuard lisg =
                                  parent_thread_local_ids.setNewThreadId();
                              build_aggregation_tree_for_partitions(r.begin(), r.end());
                            });
        } else {
          auto segment_tree_timer = DEBUG_TIMER(
              "Window Function Non-Parallelized Segment Tree Construction for "
              "Partitions");
          build_aggregation_tree_for_partitions(0, partition_count);
        }
        VLOG(2) << "Put aggregate tree for the window framing";
        // ... (insert the freshly built trees into aggregate_tree_map under cache_key)
      }
    }
  }
  const auto compute_partitions = [=](const size_t start, const size_t end) {
    for (size_t partition_idx = start; partition_idx < end; ++partition_idx) {
      computePartitionBuffer(partition_idx,
                             intermediate_output_buffer + offsets()[partition_idx],
                             window_func_);
    }
  };
  if (should_parallelize) {
    auto partition_computation_timer =
        DEBUG_TIMER("Window Function Partition Compute");
    tbb::parallel_for(tbb::blocked_range<int64_t>(0, partition_count),
                      [&, parent_thread_local_ids = logger::thread_local_ids()](
                          const tbb::blocked_range<int64_t>& r) {
                        logger::LocalIdsScopeGuard lisg =
                            parent_thread_local_ids.setNewThreadId();
                        compute_partitions(r.begin(), r.end());
                      });
  } else {
    auto partition_computation_timer =
        DEBUG_TIMER("Window Function Non-Parallelized Partition Compute");
    compute_partitions(0, partition_count);
  }
  if (is_agg_func || need_window_partition_buf) {
    // ... (results were written directly into output_; no scatter needed)
    return;
  }
  auto output_i64 = reinterpret_cast<int64_t*>(output_);
  const auto payload_copy = [=](const size_t start, const size_t end) {
    for (size_t i = start; i < end; ++i) {
      output_i64[payload()[i]] = intermediate_output_buffer[i];
    }
  };
  if (should_parallelize) {
    auto payload_copy_timer =
        DEBUG_TIMER("Window Function Non-Aggregate Payload Copy Parallelized");
    tbb::parallel_for(tbb::blocked_range<int64_t>(0, elem_count_),
                      [&, parent_thread_local_ids = logger::thread_local_ids()](
                          const tbb::blocked_range<int64_t>& r) {
                        logger::LocalIdsScopeGuard lisg =
                            parent_thread_local_ids.setNewThreadId();
                        payload_copy(r.begin(), r.end());
                      });
  } else {
    auto payload_copy_timer =
        DEBUG_TIMER("Window Function Non-Aggregate Payload Copy Non-Parallelized");
    payload_copy(0, elem_count_);
  }
}
struct FindNullRange {
  int32_t const* original_col_idx_buf;
  int64_t const* ordered_col_idx_buf;
  int32_t const partition_size;
  int64_t null_bit_pattern = -1;
  template <typename T>
  IndexPair find_null_range_int(int8_t const* order_col_buf) const {
    IndexPair null_range{std::numeric_limits<int64_t>::max(),
                         std::numeric_limits<int64_t>::min()};
    auto const null_val = inline_int_null_value<T>();
    auto const casted_order_col_buf = reinterpret_cast<T const*>(order_col_buf);
    if (casted_order_col_buf[original_col_idx_buf[ordered_col_idx_buf[0]]] == null_val) {
      int64_t null_range_max = 1;
      while (null_range_max < partition_size &&
             casted_order_col_buf
                     [original_col_idx_buf[ordered_col_idx_buf[null_range_max]]] ==
                 null_val) {
        null_range_max++;
      }
      null_range.first = 0;
      null_range.second = null_range_max - 1;
    } else if (casted_order_col_buf
                   [original_col_idx_buf[ordered_col_idx_buf[partition_size - 1]]] ==
               null_val) {
      int64_t null_range_min = partition_size - 2;
      while (null_range_min >= 0 &&
             casted_order_col_buf
                     [original_col_idx_buf[ordered_col_idx_buf[null_range_min]]] ==
                 null_val) {
        null_range_min--;
      }
      null_range.first = null_range_min + 1;
      null_range.second = partition_size - 1;
    }
    return null_range;
  }
  template <typename COL_TYPE,
            typename NULL_TYPE =
                std::conditional_t<sizeof(COL_TYPE) == sizeof(int32_t), int32_t, int64_t>>
  IndexPair find_null_range_fp(int8_t const* order_col_buf) const {
    IndexPair null_range{std::numeric_limits<int64_t>::max(),
                         std::numeric_limits<int64_t>::min()};
    auto const casted_order_col_buf = reinterpret_cast<COL_TYPE const*>(order_col_buf);
    auto check_null_val = [&casted_order_col_buf, this](size_t idx) {
      return *reinterpret_cast<NULL_TYPE const*>(
                 may_alias_ptr(&casted_order_col_buf
                                   [original_col_idx_buf[ordered_col_idx_buf[idx]]])) ==
             null_bit_pattern;
    };
    if (check_null_val(0)) {
      int64_t null_range_max = 1;
      while (null_range_max < partition_size && check_null_val(null_range_max)) {
        null_range_max++;
      }
      null_range.first = 0;
      null_range.second = null_range_max - 1;
    } else if (check_null_val(partition_size - 1)) {
      int64_t null_range_min = partition_size - 2;
      while (null_range_min >= 0 && check_null_val(null_range_min)) {
        null_range_min--;
      }
      null_range.first = null_range_min + 1;
      null_range.second = partition_size - 1;
    }
    return null_range;
  }
};
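// Dispatches null-range detection on the order key's type and width:
// fixed-width integer-encoded types compare against the inline null sentinel,
// while floats and doubles compare the raw bit pattern of the null value
// (via may_alias_ptr).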
void WindowFunctionContext::computeNullRangeOfSortedPartition(
    const SQLTypeInfo& order_col_ti,
    size_t partition_idx,
    const int32_t* original_col_idx_buf,
    const int64_t* ordered_col_idx_buf) {
  IndexPair null_range;
  const auto partition_size = counts()[partition_idx];
  if (partition_size > 0) {
    if (order_col_ti.is_integer() || order_col_ti.is_decimal() ||
        order_col_ti.is_time_or_date() || order_col_ti.is_boolean()) {
      FindNullRange const null_range_info{
          original_col_idx_buf, ordered_col_idx_buf, partition_size};
      switch (order_col_ti.get_size()) {
        case 8:
          null_range =
              null_range_info.find_null_range_int<int64_t>(order_columns_.front());
          break;
        case 4:
          null_range =
              null_range_info.find_null_range_int<int32_t>(order_columns_.front());
          break;
        case 2:
          null_range =
              null_range_info.find_null_range_int<int16_t>(order_columns_.front());
          break;
        case 1:
          null_range =
              null_range_info.find_null_range_int<int8_t>(order_columns_.front());
          break;
        default:
          // ... (unexpected integer size)
          break;
      }
    } else if (order_col_ti.is_fp()) {
      const auto null_bit_pattern =
          null_val_bit_pattern(order_col_ti, order_col_ti.get_type() == kFLOAT);
      FindNullRange const null_range_info{
          original_col_idx_buf, ordered_col_idx_buf, partition_size, null_bit_pattern};
      switch (order_col_ti.get_type()) {
        case kFLOAT:
          null_range = null_range_info.find_null_range_fp<float>(order_columns_.front());
          break;
        case kDOUBLE:
          null_range =
              null_range_info.find_null_range_fp<double>(order_columns_.front());
          break;
        default:
          LOG(FATAL) << "Invalid float type";
      }
    } else {
      LOG(FATAL) << "Invalid column type for window aggregation over the frame";
    }
  }
  // ... (record null_range in ordered_partition_null_start_pos_ /
  //      ordered_partition_null_end_pos_ for this partition)
}
std::vector<WindowFunctionContext::Comparator> WindowFunctionContext::createComparator(
    size_t partition_idx) {
  std::vector<WindowFunctionContext::Comparator> partition_comparator;
  const auto& order_keys = window_func_->getOrderKeys();
  const auto& collation = window_func_->getCollation();
  CHECK_EQ(order_keys.size(), collation.size());
  for (size_t order_column_idx = 0; order_column_idx < order_columns_.size();
       ++order_column_idx) {
    auto order_column_buffer = order_columns_[order_column_idx];
    const auto order_col =
        dynamic_cast<const Analyzer::ColumnVar*>(order_keys[order_column_idx].get());
    CHECK(order_col);
    const auto& order_col_collation = collation[order_column_idx];
    auto comparator = makeComparator(order_col,
                                     order_column_buffer,
                                     payload() + offsets()[partition_idx],
                                     !order_col_collation.is_desc,
                                     order_col_collation.nulls_first);
    if (order_col_collation.is_desc) {
      comparator = [comparator](const int64_t lhs, const int64_t rhs) {
        return comparator(rhs, lhs);
      };
    }
    partition_comparator.push_back(comparator);
  }
  return partition_comparator;
}
void WindowFunctionContext::sortPartition(const size_t partition_idx,
                                          int64_t* output_for_partition_buff,
                                          bool should_parallelize) {
  const size_t partition_size{static_cast<size_t>(counts()[partition_idx])};
  if (partition_size == 0) {
    return;
  }
  std::iota(
      output_for_partition_buff, output_for_partition_buff + partition_size, int64_t(0));
  auto partition_comparator = createComparator(partition_idx);
  if (!partition_comparator.empty()) {
    const auto col_tuple_comparator = [&partition_comparator](const int64_t lhs,
                                                              const int64_t rhs) {
      for (const auto& comparator : partition_comparator) {
        const auto comparator_result = comparator(lhs, rhs);
        switch (comparator_result) {
          case WindowFunctionContext::WindowComparatorResult::LT:
            return true;
          case WindowFunctionContext::WindowComparatorResult::GT:
            return false;
          default:
            break;  // EQ: fall through to the next ORDER BY key
        }
      }
      return false;
    };
    if (should_parallelize) {
#ifdef HAVE_TBB
      tbb::parallel_sort(output_for_partition_buff,
                         output_for_partition_buff + partition_size,
                         col_tuple_comparator);
#else
      thrust::sort(output_for_partition_buff,
                   output_for_partition_buff + partition_size,
                   col_tuple_comparator);
#endif
    } else {
      std::sort(output_for_partition_buff,
                output_for_partition_buff + partition_size,
                col_tuple_comparator);
    }
  }
}
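// The comparators below return a three-way WindowComparatorResult so that ties
// on one ORDER BY key fall through to the next. Null placement follows
// nulls_first; the *_desc variants flip the null branches so that NULLS
// FIRST/LAST is preserved once createComparator swaps the arguments for DESC
// keys.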
template <class T>
WindowFunctionContext::WindowComparatorResult integer_comparator_asc(
    const int8_t* order_column_buffer,
    const SQLTypeInfo& ti,
    const int32_t* partition_indices,
    const int64_t lhs,
    const int64_t rhs,
    const bool asc_ordering,
    const bool nulls_first) {
  using Result = WindowFunctionContext::WindowComparatorResult;
  const auto values = reinterpret_cast<const T*>(order_column_buffer);
  const auto lhs_val = values[partition_indices[lhs]];
  const auto rhs_val = values[partition_indices[rhs]];
  const auto null_val = inline_fixed_encoding_null_val(ti);
  if (lhs_val == null_val && rhs_val == null_val) {
    return Result::EQ;
  }
  if (lhs_val == null_val && rhs_val != null_val) {
    return nulls_first ? Result::LT : Result::GT;
  }
  if (rhs_val == null_val && lhs_val != null_val) {
    return nulls_first ? Result::GT : Result::LT;
  }
  if (lhs_val < rhs_val) {
    return Result::LT;
  }
  if (lhs_val > rhs_val) {
    return Result::GT;
  }
  return Result::EQ;
}
template <class T>
WindowFunctionContext::WindowComparatorResult integer_comparator_desc(
    const int8_t* order_column_buffer,
    const SQLTypeInfo& ti,
    const int32_t* partition_indices,
    const int64_t lhs,
    const int64_t rhs,
    const bool asc_ordering,
    const bool nulls_first) {
  using Result = WindowFunctionContext::WindowComparatorResult;
  const auto values = reinterpret_cast<const T*>(order_column_buffer);
  const auto lhs_val = values[partition_indices[lhs]];
  const auto rhs_val = values[partition_indices[rhs]];
  const auto null_val = inline_fixed_encoding_null_val(ti);
  if (lhs_val == null_val && rhs_val == null_val) {
    return Result::EQ;
  }
  if (lhs_val == null_val && rhs_val != null_val) {
    return nulls_first ? Result::GT : Result::LT;
  }
  if (rhs_val == null_val && lhs_val != null_val) {
    return nulls_first ? Result::LT : Result::GT;
  }
  if (lhs_val < rhs_val) {
    return Result::LT;
  }
  if (lhs_val > rhs_val) {
    return Result::GT;
  }
  return Result::EQ;
}
template <class T, class NullPatternType>
WindowFunctionContext::WindowComparatorResult fp_comparator_asc(
    const int8_t* order_column_buffer,
    const SQLTypeInfo& ti,
    const int32_t* partition_indices,
    const int64_t lhs,
    const int64_t rhs,
    const bool asc_ordering,
    const bool nulls_first) {
  using Result = WindowFunctionContext::WindowComparatorResult;
  const auto values = reinterpret_cast<const T*>(order_column_buffer);
  const auto lhs_val = values[partition_indices[lhs]];
  const auto rhs_val = values[partition_indices[rhs]];
  const auto null_bit_pattern = null_val_bit_pattern(ti, ti.get_type() == kFLOAT);
  const auto lhs_bit_pattern =
      *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&lhs_val));
  const auto rhs_bit_pattern =
      *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&rhs_val));
  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern == null_bit_pattern) {
    return Result::EQ;
  }
  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern != null_bit_pattern) {
    return nulls_first ? Result::LT : Result::GT;
  }
  if (rhs_bit_pattern == null_bit_pattern && lhs_bit_pattern != null_bit_pattern) {
    return nulls_first ? Result::GT : Result::LT;
  }
  if (lhs_val < rhs_val) {
    return Result::LT;
  }
  if (lhs_val > rhs_val) {
    return Result::GT;
  }
  return Result::EQ;
}
template <class T, class NullPatternType>
WindowFunctionContext::WindowComparatorResult fp_comparator_desc(
    const int8_t* order_column_buffer,
    const SQLTypeInfo& ti,
    const int32_t* partition_indices,
    const int64_t lhs,
    const int64_t rhs,
    const bool asc_ordering,
    const bool nulls_first) {
  using Result = WindowFunctionContext::WindowComparatorResult;
  const auto values = reinterpret_cast<const T*>(order_column_buffer);
  const auto lhs_val = values[partition_indices[lhs]];
  const auto rhs_val = values[partition_indices[rhs]];
  const auto null_bit_pattern = null_val_bit_pattern(ti, ti.get_type() == kFLOAT);
  const auto lhs_bit_pattern =
      *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&lhs_val));
  const auto rhs_bit_pattern =
      *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&rhs_val));
  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern == null_bit_pattern) {
    return Result::EQ;
  }
  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern != null_bit_pattern) {
    return nulls_first ? Result::GT : Result::LT;
  }
  if (rhs_bit_pattern == null_bit_pattern && lhs_bit_pattern != null_bit_pattern) {
    return nulls_first ? Result::LT : Result::GT;
  }
  if (lhs_val < rhs_val) {
    return Result::LT;
  }
  if (lhs_val > rhs_val) {
    return Result::GT;
  }
  return Result::EQ;
}
WindowFunctionContext::Comparator WindowFunctionContext::makeComparator(
    const Analyzer::ColumnVar* col_var,
    const int8_t* order_column_buffer,
    const int32_t* partition_indices,
    const bool asc_ordering,
    const bool nulls_first) {
  const auto& ti = col_var->get_type_info();
  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
    switch (ti.get_size()) {
      case 8: {
        return [order_column_buffer, nulls_first, partition_indices, asc_ordering, &ti](
                   const int64_t lhs, const int64_t rhs) {
          return asc_ordering ? integer_comparator_asc<int64_t>(order_column_buffer,
                                                                ti,
                                                                partition_indices,
                                                                lhs,
                                                                rhs,
                                                                asc_ordering,
                                                                nulls_first)
                              : integer_comparator_desc<int64_t>(order_column_buffer,
                                                                 ti,
                                                                 partition_indices,
                                                                 lhs,
                                                                 rhs,
                                                                 asc_ordering,
                                                                 nulls_first);
        };
      }
      case 4: {
        // ... (same shape as case 8, with integer_comparator_asc/desc<int32_t>)
      }
      case 2: {
        // ... (integer_comparator_asc/desc<int16_t>)
      }
      case 1: {
        // ... (integer_comparator_asc/desc<int8_t>)
      }
      default:
        LOG(FATAL) << "Invalid type size: " << ti.get_size();
    }
  }
  if (ti.is_fp()) {
    switch (ti.get_type()) {
      case kFLOAT: {
        return [order_column_buffer, nulls_first, partition_indices, asc_ordering, &ti](
                   const int64_t lhs, const int64_t rhs) {
          return asc_ordering ? fp_comparator_asc<float, int32_t>(order_column_buffer,
                                                                  ti,
                                                                  partition_indices,
                                                                  lhs,
                                                                  rhs,
                                                                  asc_ordering,
                                                                  nulls_first)
                              : fp_comparator_desc<float, int32_t>(order_column_buffer,
                                                                   ti,
                                                                   partition_indices,
                                                                   lhs,
                                                                   rhs,
                                                                   asc_ordering,
                                                                   nulls_first);
        };
      }
      case kDOUBLE: {
        // ... (fp_comparator_asc/desc<double, int64_t>, same shape as kFLOAT)
      }
      default:
        LOG(FATAL) << "Invalid float type";
    }
  }
  throw std::runtime_error("Type not supported yet");
}
void WindowFunctionContext::computePartitionBuffer(
    const size_t partition_idx,
    int64_t* output_for_partition_buff,
    const Analyzer::WindowFunction* window_func) {
  const size_t partition_size{static_cast<size_t>(counts()[partition_idx])};
  if (partition_size == 0) {
    return;
  }
  const auto offset = offsets()[partition_idx];
  auto partition_comparator = createComparator(partition_idx);
  const auto col_tuple_comparator = [&partition_comparator](const int64_t lhs,
                                                            const int64_t rhs) {
    for (const auto& comparator : partition_comparator) {
      const auto comparator_result = comparator(lhs, rhs);
      switch (comparator_result) {
        case WindowFunctionContext::WindowComparatorResult::LT:
          return true;
        case WindowFunctionContext::WindowComparatorResult::GT:
          return false;
        default:
          break;  // EQ: fall through to the next ORDER BY key
      }
    }
    return false;
  };
  switch (window_func->getKind()) {
    case SqlWindowFunctionKind::ROW_NUMBER: {
      const auto row_numbers =
          index_to_row_number(output_for_partition_buff, partition_size);
      std::copy(row_numbers.begin(), row_numbers.end(), output_for_partition_buff);
      break;
    }
    case SqlWindowFunctionKind::RANK: {
      const auto rank =
          index_to_rank(output_for_partition_buff, partition_size, col_tuple_comparator);
      std::copy(rank.begin(), rank.end(), output_for_partition_buff);
      break;
    }
    case SqlWindowFunctionKind::DENSE_RANK: {
      const auto dense_rank = index_to_dense_rank(
          output_for_partition_buff, partition_size, col_tuple_comparator);
      std::copy(dense_rank.begin(), dense_rank.end(), output_for_partition_buff);
      break;
    }
    case SqlWindowFunctionKind::PERCENT_RANK: {
      const auto percent_rank = index_to_percent_rank(
          output_for_partition_buff, partition_size, col_tuple_comparator);
      std::copy(percent_rank.begin(),
                percent_rank.end(),
                reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
      break;
    }
    case SqlWindowFunctionKind::CUME_DIST: {
      const auto cume_dist = index_to_cume_dist(
          output_for_partition_buff, partition_size, col_tuple_comparator);
      std::copy(cume_dist.begin(),
                cume_dist.end(),
                reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
      break;
    }
    case SqlWindowFunctionKind::NTILE: {
      const auto n = get_int_constant_from_expr(window_func->getArgs().front().get());
      const auto ntile = index_to_ntile(output_for_partition_buff, partition_size, n);
      std::copy(ntile.begin(), ntile.end(), output_for_partition_buff);
      break;
    }
    case SqlWindowFunctionKind::LAG:
    case SqlWindowFunctionKind::LEAD: {
      const auto lag_or_lead = get_lag_or_lead_argument(window_func);
      const auto partition_row_offsets = payload() + offset;
      apply_lag_to_partition(
          lag_or_lead, partition_row_offsets, output_for_partition_buff, partition_size);
      break;
    }
    case SqlWindowFunctionKind::FIRST_VALUE:
    case SqlWindowFunctionKind::LAST_VALUE: {
      const auto target_idx =
          get_target_idx_for_first_or_last_value_func(window_func, partition_size);
      const auto partition_row_offsets = payload() + offset;
      apply_nth_value_to_partition(
          partition_row_offsets, output_for_partition_buff, partition_size, target_idx);
      break;
    }
    case SqlWindowFunctionKind::NTH_VALUE: {
      auto const n_value_ptr =
          dynamic_cast<Analyzer::Constant*>(window_func->getArgs()[1].get());
      CHECK(n_value_ptr);
      auto const n_value = static_cast<size_t>(n_value_ptr->get_constval().intval);
      const auto partition_row_offsets = payload() + offset;
      if (n_value < partition_size) {
        apply_nth_value_to_partition(
            partition_row_offsets, output_for_partition_buff, partition_size, n_value);
      } else {
        apply_original_index_to_partition(
            partition_row_offsets, output_for_partition_buff, partition_size);
      }
      break;
    }
    case SqlWindowFunctionKind::AVG:
    case SqlWindowFunctionKind::MIN:
    case SqlWindowFunctionKind::MAX:
    case SqlWindowFunctionKind::SUM:
    case SqlWindowFunctionKind::COUNT: {
      const auto partition_row_offsets = payload() + offset;
      if (window_function_requires_peer_handling(window_func)) {
        index_to_partition_end(partitionEnd(),
                               offset,
                               output_for_partition_buff,
                               partition_size,
                               col_tuple_comparator);
      }
      apply_permutation_to_partition(
          output_for_partition_buff, partition_row_offsets, partition_size);
      break;
    }
    default: {
      std::ostringstream oss;
      oss << "Window function not supported yet: " << ::toString(window_func->getKind());
      throw std::runtime_error(oss.str());
    }
  }
}
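// One segment tree is built per partition, instantiated on the input column's
// type. The tree's leaf depth and (derived) aggregate buffers are recorded so
// the generated code can walk the tree when evaluating a frame.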
void WindowFunctionContext::buildAggregationTreeForPartition(
    SqlWindowFunctionKind agg_type,
    size_t partition_idx,
    size_t partition_size,
    const int32_t* original_rowid_buf,
    const int64_t* ordered_rowid_buf,
    const SQLTypeInfo& input_col_ti) {
  if (!(input_col_ti.is_number() || input_col_ti.is_boolean() ||
        input_col_ti.is_time_or_date())) {
    throw QueryNotSupported("Window aggregate function over frame on a column type " +
                            ::toString(input_col_ti.get_type()) + " is not supported");
  }
  if (input_col_ti.is_time_or_date() && !allow_framing_on_time_or_date(agg_type)) {
    throw QueryNotSupported(
        "Aggregation over a window frame for a column type " +
        ::toString(input_col_ti.get_type()) +
        " must use one of the following window aggregate function: MIN / MAX / COUNT");
  }
  // ... (resolve the window expression's input column buffer)
  if (partition_size > 0) {
    const int64_t* ordered_rowid_buf_for_partition =
        ordered_rowid_buf + offsets()[partition_idx];
    switch (input_col_ti.get_type()) {
      case kTINYINT: {
        const auto segment_tree = std::make_shared<SegmentTree<int8_t, int64_t>>(
            // ... (input column buffer / type info arguments elided)
            original_rowid_buf,
            ordered_rowid_buf_for_partition,
            partition_size,
            agg_type,
            aggregate_trees_fan_out_);
        aggregate_trees_depth_[partition_idx] =
            segment_tree ? segment_tree->getLeafDepth() : 0;
        if (agg_type == SqlWindowFunctionKind::AVG) {
          aggregate_trees_.derived_aggregate_tree_for_integer_type_[partition_idx] =
              segment_tree ? segment_tree->getDerivedAggregatedValues() : nullptr;
        } else {
          aggregate_trees_.aggregate_tree_for_integer_type_[partition_idx] =
              segment_tree ? segment_tree->getAggregatedValues() : nullptr;
        }
        segment_trees_owned_.push_back(segment_tree);
        break;
      }
      // The remaining cases follow the same pattern with other leaf types:
      //   kSMALLINT                       -> SegmentTree<int16_t, int64_t>
      //   kINT                            -> SegmentTree<int32_t, int64_t>
      //   kBIGINT / kDECIMAL / time types -> SegmentTree<int64_t, int64_t>
      //   kFLOAT / kDOUBLE                -> double-typed trees stored via the
      //   aggregate_tree_for_double_type_ / derived_aggregate_tree_for_double_type_
      //   slots
      // ...
      default:
        break;
    }
  }
}
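// Partition boundaries are tracked as bitmaps with one bit per input row;
// fillPartitionStart/fillPartitionEnd set a bit at each boundary using the
// count-distinct bitmap runtime (agg_count_distinct_bitmap).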
void WindowFunctionContext::fillPartitionStart() {
  CountDistinctDescriptor partition_start_bitmap{/* one bit per input row */};
  auto bitmap_sz = partition_start_bitmap.bitmapPaddedSizeBytes();
  // ... (allocate partition_start_ and mark row 0)
  if (partition_start_offset_) {
    for (int64_t i = 0; i < partition_count - 1; ++i) {
      agg_count_distinct_bitmap(
          &partition_start_handle, partition_start_offset_[i + 1], 0, 0);
    }
  } else {
    std::vector<size_t> partition_offsets(partition_count);
    std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
    for (int64_t i = 0; i < partition_count - 1; ++i) {
      agg_count_distinct_bitmap(&partition_start_handle, partition_offsets[i], 0, 0);
    }
  }
}

void WindowFunctionContext::fillPartitionEnd() {
  CountDistinctDescriptor partition_start_bitmap{/* one bit per input row */};
  auto bitmap_sz = partition_start_bitmap.bitmapPaddedSizeBytes();
  // ... (allocate partition_end_)
  auto partition_end_handle = reinterpret_cast<int64_t>(partition_end_);
  if (partition_start_offset_) {
    for (int64_t i = 0; i < partition_count - 1; ++i) {
      // ... (mark partition_start_offset_[i + 1] - 1, skipping empty partitions)
    }
  } else {
    std::vector<size_t> partition_offsets(partition_count);
    std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
    for (int64_t i = 0; i < partition_count - 1; ++i) {
      if (partition_offsets[i] == 0) {
        continue;
      }
      agg_count_distinct_bitmap(&partition_end_handle, partition_offsets[i] - 1, 0, 0);
    }
  }
  // ... (mark the last row, elem_count_ - 1)
}

const int32_t* WindowFunctionContext::payload() const {
  if (partitions_) {
    return reinterpret_cast<const int32_t*>(
        partitions_->getJoinHashBuffer(device_type_, 0) +
        partitions_->payloadBufferOff());
  }
  return dummy_payload_;
}

const int32_t* WindowFunctionContext::offsets() const {
  if (partitions_) {
    return reinterpret_cast<const int32_t*>(
        partitions_->getJoinHashBuffer(device_type_, 0) +
        partitions_->offsetBufferOff());
  }
  return &dummy_offset_;
}

const int32_t* WindowFunctionContext::counts() const {
  if (partitions_) {
    return reinterpret_cast<const int32_t*>(
        partitions_->getJoinHashBuffer(device_type_, 0) +
        partitions_->countBufferOff());
  }
  return &dummy_count_;
}

size_t WindowFunctionContext::partitionCount() const {
  // ... (derived from the offsets/counts sections of the partition buffer)
  return partition_count;
}
const QueryPlanHash WindowFunctionContext::computeAggregateTreeCacheKey() const {
  // ... (seed cache_key from the partition key and window function)
  for (auto const& order_entry : window_func_->getCollation()) {
    boost::hash_combine(cache_key, order_entry.toString());
  }
  // ...
  return cache_key;
}
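// WindowProjectNodeContext owns the WindowFunctionContext instances of a
// projection node, keyed by target index; "activating" a context points the
// executor's active_window_function_ at it for code generation.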
void WindowProjectNodeContext::addWindowFunctionContext(
    std::unique_ptr<WindowFunctionContext> window_function_context,
    const size_t target_index) {
  const auto it_ok = window_contexts_.emplace(
      std::make_pair(target_index, std::move(window_function_context)));
  CHECK(it_ok.second);
}

const WindowFunctionContext* WindowProjectNodeContext::activateWindowFunctionContext(
    Executor* executor,
    const size_t target_index) const {
  const auto it = window_contexts_.find(target_index);
  CHECK(it != window_contexts_.end());
  executor->active_window_function_ = it->second.get();
  return executor->active_window_function_;
}
void WindowProjectNodeContext::resetWindowFunctionContext(Executor* executor) {
  executor->active_window_function_ = nullptr;
}

WindowFunctionContext* WindowProjectNodeContext::getActiveWindowFunctionContext(
    Executor* executor) {
  return executor->active_window_function_;
}

WindowProjectNodeContext* WindowProjectNodeContext::create(Executor* executor) {
  executor->window_project_node_context_owned_ =
      std::make_unique<WindowProjectNodeContext>();
  return executor->window_project_node_context_owned_.get();
}

const WindowProjectNodeContext* WindowProjectNodeContext::get(Executor* executor) {
  return executor->window_project_node_context_owned_.get();
}

void WindowProjectNodeContext::reset(Executor* executor) {
  executor->window_project_node_context_owned_ = nullptr;
  executor->active_window_function_ = nullptr;
}