#include "SortUtils.cuh"

#include <thrust/copy.h>
#include <thrust/execution_policy.h>
#include <thrust/for_each.h>
#include <thrust/functional.h>
#include <thrust/partition.h>
#include <thrust/sequence.h>
#include <thrust/sort.h>

#include <cuda.h>  // driver API: cuStreamSynchronize, CUDA_SUCCESS

#include <algorithm>
#include <cstring>
#include <vector>

// Treat any CUDA driver-API result other than CUDA_SUCCESS as fatal.
#define checkCudaErrors(err) CHECK_EQ(err, CUDA_SUCCESS)
// Predicate functor: true iff the heap slot at `index` holds a real row,
// i.e. its group key is not the sentinel "empty" key.
template <class K, class I = int32_t>
struct is_taken_entry {
  is_taken_entry(const int8_t* buff, const size_t stride)
      : buff_ptr(buff), key_stride(stride) {}
  __host__ __device__ bool operator()(const I index) {
    return !is_empty_entry<K>(static_cast<size_t>(index), buff_ptr, key_stride);
  }
  const int8_t* buff_ptr;
  const size_t key_stride;
};
// Predicate functor: true iff the order-by column of the entry at `index`
// holds the column type's null sentinel. The comparison is done on the raw
// bit pattern so it also works for floating-point columns.
template <class K, class I = int32_t>
struct is_null_order_entry {
  typedef I argument_type;
  is_null_order_entry(const int8_t* base, const size_t stride, const int64_t nul)
      : oe_base(base), oe_stride(stride), null_val(nul) {}
  __host__ __device__ bool operator()(const I index) {
    const auto oe_val = *reinterpret_cast<const K*>(oe_base + index * oe_stride);
    switch (sizeof(K)) {
      case 4:
        return *reinterpret_cast<const int32_t*>(&oe_val) ==
               static_cast<int32_t>(null_val);
      case 8:
        return *reinterpret_cast<const int64_t*>(&oe_val) == null_val;
      default:
        return false;
    }
  }
  const int8_t* oe_base;
  const size_t oe_stride;
  const int64_t null_val;
};
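// Illustration (added; not part of the original file): the bit-pattern
// comparison above matters for floating-point columns, where the null
// sentinel is a specific encoding and a plain value comparison could
// misbehave (e.g. a NaN sentinel never compares equal to itself). A minimal
// host-side equivalent of the 4-byte case:
namespace {
inline bool is_null_float_bits(const float val, const int64_t null_val) {
  // Compare the float's raw bits against the low 32 bits of the sentinel.
  return *reinterpret_cast<const int32_t*>(&val) == static_cast<int32_t>(null_val);
}
}  // namespace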
// Partitions the index range [first, last) around null order-by values: with
// nulls_first, indices of null entries are moved to the front, otherwise to
// the back. Dispatches on the 4- vs 8-byte width of the order-by column.
template <typename ForwardIterator>
ForwardIterator partition_by_null(ForwardIterator first,
                                  ForwardIterator last,
                                  const int64_t null_val,
                                  const bool nulls_first,
                                  const int8_t* rows_ptr,
                                  const GroupByBufferLayoutInfo& layout) {
  if (nulls_first) {
    return (layout.col_bytes == 4)
               ? thrust::partition(first, last,
                                   is_null_order_entry<int32_t>(
                                       rows_ptr + layout.col_off, layout.row_bytes, null_val))
               : thrust::partition(first, last,
                                   is_null_order_entry<int64_t>(
                                       rows_ptr + layout.col_off, layout.row_bytes, null_val));
  }
  return (layout.col_bytes == 4)
             ? thrust::partition(first, last,
                                 thrust::not1(is_null_order_entry<int32_t>(
                                     rows_ptr + layout.col_off, layout.row_bytes, null_val)))
             : thrust::partition(first, last,
                                 thrust::not1(is_null_order_entry<int64_t>(
                                     rows_ptr + layout.col_off, layout.row_bytes, null_val)));
}
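// Usage sketch (mirrors the nullable-column path in
// pop_n_rows_from_merged_heaps_gpu below): the returned iterator is the
// boundary between the two groups, so the caller can size each range, e.g.
//
//   auto sep = partition_by_null(d_indices, d_indices + count,
//                                null_val_bit_pattern(oe_type, false),
//                                /*nulls_first=*/true, rows_ptr, layout);
//   const size_t null_count = sep - d_indices;  // nulls occupy [0, null_count)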
// Gather functor: copies the order-by key of each selected row into a dense
// key buffer, i.e. key_base[i] = key of row idx_base[i].
template <class K, class I>
struct KeyFetcher {
  KeyFetcher(K* out_base, const int8_t* src_oe_base, const size_t stride, const I* indices)
      : key_base(out_base), oe_base(src_oe_base), oe_stride(stride), idx_base(indices) {}
  __host__ __device__ void operator()(const I index) {
    key_base[index] = *reinterpret_cast<const K*>(oe_base + idx_base[index] * oe_stride);
  }
  K* key_base;
  const int8_t* oe_base;
  const size_t oe_stride;
  const I* idx_base;
};
// Reset functor: stamps the sentinel "empty" key into the row slot at `index`.
template <class K>
struct KeyReseter {
  KeyReseter(int8_t* out_base, const size_t stride, const K emp_key)
      : rows_base(out_base), key_stride(stride), empty_key(emp_key) {}
  __host__ __device__ void operator()(const size_t index) {
    K* key_ptr = reinterpret_cast<K*>(rows_base + index * key_stride);
    *key_ptr = empty_key;
  }
  int8_t* rows_base;
  const size_t key_stride;
  const K empty_key;
};
// Materializes the order-by column of the rows named by d_idx_first into the
// dense buffer d_oe_col_buffer so it can serve as the key range of a sort.
template <class K, class I>
void collect_order_entry_column(thrust::device_ptr<K>& d_oe_col_buffer,
                                const int8_t* d_src_buffer,
                                const thrust::device_ptr<I>& d_idx_first,
                                const size_t idx_count,
                                const size_t oe_offset,
                                const size_t oe_stride,
                                ThrustAllocator& allocator,
                                const int device_id) {
  auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
  thrust::for_each(thrust::cuda::par(allocator).on(qe_cuda_stream),
                   thrust::make_counting_iterator(size_t(0)),
                   thrust::make_counting_iterator(idx_count),
                   KeyFetcher<K, I>(thrust::raw_pointer_cast(d_oe_col_buffer),
                                    d_src_buffer + oe_offset,
                                    oe_stride,
                                    thrust::raw_pointer_cast(d_idx_first)));
  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
}
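// Note (added): this gather step exists because thrust::sort_by_key wants its
// keys in a contiguous range, while the order-by values live strided inside
// row-wise buffers; copying them out once avoids sorting through a strided
// access pattern.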
// Sorts the index buffer by the gathered keys; descending order uses an
// explicit thrust::greater comparator, ascending relies on the default.
template <class K, class I>
void sort_indices_by_key(thrust::device_ptr<I> d_idx_first,
                         const size_t idx_count,
                         const thrust::device_ptr<K>& d_key_buffer,
                         const bool desc,
                         ThrustAllocator& allocator,
                         const int device_id) {
  auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
  if (desc) {
    thrust::sort_by_key(thrust::cuda::par(allocator).on(qe_cuda_stream),
                        d_key_buffer,
                        d_key_buffer + idx_count,
                        d_idx_first,
                        thrust::greater<K>());
  } else {
    thrust::sort_by_key(thrust::cuda::par(allocator).on(qe_cuda_stream),
                        d_key_buffer,
                        d_key_buffer + idx_count,
                        d_idx_first);
  }
  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
}
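// Self-contained illustration (added, host-only; not part of the original
// file) of the key/index sort pattern above:
namespace {
inline void sort_indices_by_key_example() {
  float keys[] = {3.f, 1.f, 2.f};
  int32_t idx[] = {0, 1, 2};
  thrust::sort_by_key(thrust::host, keys, keys + 3, idx, thrust::greater<float>());
  // keys -> {3, 2, 1}; idx -> {0, 2, 1}: the indices now list rows in
  // descending key order, without ever moving the rows themselves.
}
}  // namespace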
// Sorts the index vector by the order-by column, dispatching on the column's
// SQL type and physical width to choose the key type.
template <class I = int32_t>
void do_radix_sort(thrust::device_ptr<I> d_idx_first,
                   const size_t idx_count,
                   const int8_t* d_src_buffer,
                   const PodOrderEntry& oe,
                   const GroupByBufferLayoutInfo& layout,
                   ThrustAllocator& allocator,
                   const int device_id) {
  const auto& oe_type = layout.oe_target_info.sql_type;
  if (oe_type.is_fp()) {
    switch (layout.col_bytes) {
      case 4: {
        auto d_oe_buffer = get_device_ptr<float>(idx_count, allocator);
        collect_order_entry_column(d_oe_buffer, d_src_buffer, d_idx_first, idx_count,
                                   layout.col_off, layout.row_bytes, allocator, device_id);
        sort_indices_by_key(
            d_idx_first, idx_count, d_oe_buffer, oe.is_desc, allocator, device_id);
        break;
      }
      case 8: {
        auto d_oe_buffer = get_device_ptr<double>(idx_count, allocator);
        collect_order_entry_column(d_oe_buffer, d_src_buffer, d_idx_first, idx_count,
                                   layout.col_off, layout.row_bytes, allocator, device_id);
        sort_indices_by_key(
            d_idx_first, idx_count, d_oe_buffer, oe.is_desc, allocator, device_id);
        break;
      }
      default:
        CHECK(false);
    }
    return;
  }
  CHECK(oe_type.is_number() || oe_type.is_time());
  switch (layout.col_bytes) {
    case 4: {
      auto d_oe_buffer = get_device_ptr<int32_t>(idx_count, allocator);
      collect_order_entry_column(d_oe_buffer, d_src_buffer, d_idx_first, idx_count,
                                 layout.col_off, layout.row_bytes, allocator, device_id);
      sort_indices_by_key(
          d_idx_first, idx_count, d_oe_buffer, oe.is_desc, allocator, device_id);
      break;
    }
    case 8: {
      auto d_oe_buffer = get_device_ptr<int64_t>(idx_count, allocator);
      collect_order_entry_column(d_oe_buffer, d_src_buffer, d_idx_first, idx_count,
                                 layout.col_off, layout.row_bytes, allocator, device_id);
      sort_indices_by_key(
          d_idx_first, idx_count, d_oe_buffer, oe.is_desc, allocator, device_id);
      break;
    }
    default:
      CHECK(false);
  }
}
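// Naming note (added): do_radix_sort does not hand-roll a radix sort; it
// gathers keys and delegates to thrust::sort_by_key, which typically lowers
// to a radix sort for primitive key types such as the float/double/int32_t/
// int64_t buffers used above.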
// Gather functor: copies the entire row idx_base[i] of the source buffer into
// slot i of the dense output buffer.
template <class I>
struct RowFetcher {
  RowFetcher(int8_t* out_base, const int8_t* in_base, const I* indices, const size_t row_sz)
      : dst_base(out_base), src_base(in_base), idx_base(indices), row_size(row_sz) {}
  __host__ __device__ void operator()(const I index) {
    memcpy(dst_base + index * row_size, src_base + idx_base[index] * row_size, row_size);
  }
  int8_t* dst_base;
  const int8_t* src_base;
  const I* idx_base;
  const size_t row_size;
};
// Stamps the sentinel empty key into row slots [first, last) of row_buffer,
// dispatching on key width. Templated on the execution policy so the same
// code can run against a host vector (thrust::host) or a device buffer.
template <typename DerivedPolicy>
void reset_keys_in_row_buffer(
    const thrust::detail::execution_policy_base<DerivedPolicy>& exec,
    int8_t* row_buffer,
    const size_t key_width,
    const size_t row_size,
    const size_t first,
    const size_t last) {
  switch (key_width) {
    case 4:
      thrust::for_each(exec,
                       thrust::make_counting_iterator(first),
                       thrust::make_counting_iterator(last),
                       KeyReseter<int32_t>(row_buffer, row_size,
                                           static_cast<int32_t>(EMPTY_KEY_32)));
      break;
    case 8:
      thrust::for_each(exec,
                       thrust::make_counting_iterator(first),
                       thrust::make_counting_iterator(last),
                       KeyReseter<int64_t>(row_buffer, row_size,
                                           static_cast<int64_t>(EMPTY_KEY_64)));
      break;
    default:
      CHECK(false);
  }
}
// Merges the per-thread streaming top-n heaps living in GPU memory and
// returns the best n rows to the host as a flat row-wise buffer.
std::vector<int8_t> pop_n_rows_from_merged_heaps_gpu(
    Data_Namespace::DataMgr* data_mgr,
    const int64_t* dev_heaps,
    const size_t heaps_size,
    const size_t n,
    const PodOrderEntry& oe,
    const GroupByBufferLayoutInfo& layout,
    const size_t group_key_bytes,
    const size_t thread_count,
    const int device_id) {
  const auto row_size = layout.row_bytes;
  CHECK_EQ(heaps_size, get_heap_size(row_size, n, thread_count));
  const int8_t* rows_ptr = reinterpret_cast<const int8_t*>(dev_heaps) +
                           get_rows_offset_of_heaps(n, thread_count);
  const auto total_entry_count = n * thread_count;
  ThrustAllocator thrust_allocator(data_mgr, device_id);
  auto d_indices = get_device_ptr<int32_t>(total_entry_count, thrust_allocator);
  auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
  thrust::sequence(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
                   d_indices,
                   d_indices + total_entry_count);
  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
  // Move the indices of occupied heap slots to the front; empty slots are
  // recognized by their sentinel group key.
  auto separator =
      (group_key_bytes == 4)
          ? thrust::partition(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
                              d_indices,
                              d_indices + total_entry_count,
                              is_taken_entry<int32_t>(rows_ptr, row_size))
          : thrust::partition(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
                              d_indices,
                              d_indices + total_entry_count,
                              is_taken_entry<int64_t>(rows_ptr, row_size));
  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
  const size_t actual_entry_count = separator - d_indices;
  if (!actual_entry_count) {
    // No rows at all: return n slots stamped with the empty key.
    std::vector<int8_t> top_rows(row_size * n);
    reset_keys_in_row_buffer(
        thrust::host, &top_rows[0], layout.col_bytes, row_size, 0, n);
    return top_rows;
  }
  const auto& oe_type = layout.oe_target_info.sql_type;
  if (oe_type.get_notnull()) {
    do_radix_sort(
        d_indices, actual_entry_count, rows_ptr, oe, layout, thrust_allocator, device_id);
  } else {
    // Nullable order-by column: segregate nulls per nulls_first, then sort
    // only the non-null range.
    auto separator = partition_by_null(d_indices,
                                       d_indices + actual_entry_count,
                                       null_val_bit_pattern(oe_type, false),
                                       oe.nulls_first,
                                       rows_ptr,
                                       layout);
    if (oe.nulls_first) {
      const size_t null_count = separator - d_indices;
      if (null_count < actual_entry_count) {
        do_radix_sort(separator,
                      actual_entry_count - null_count,
                      rows_ptr,
                      oe,
                      layout,
                      thrust_allocator,
                      device_id);
      }
    } else {
      const size_t nonnull_count = separator - d_indices;
      if (nonnull_count > 0) {
        do_radix_sort(
            d_indices, nonnull_count, rows_ptr, oe, layout, thrust_allocator, device_id);
      }
    }
  }
  // Gather the winning rows, in sorted order, into a dense device buffer.
  const auto final_entry_count = std::min(n, actual_entry_count);
  auto d_top_rows = get_device_ptr<int8_t>(row_size * n, thrust_allocator);
  thrust::for_each(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
                   thrust::make_counting_iterator(size_t(0)),
                   thrust::make_counting_iterator(final_entry_count),
                   RowFetcher<int32_t>(thrust::raw_pointer_cast(d_top_rows),
                                       rows_ptr,
                                       thrust::raw_pointer_cast(d_indices),
                                       row_size));
  checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
  if (final_entry_count < n) {
    // Fewer than n rows survived: mark the tail of the output as empty.
    reset_keys_in_row_buffer(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
                             thrust::raw_pointer_cast(d_top_rows),
                             layout.col_bytes,
                             row_size,
                             final_entry_count,
                             n);
    checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
  }
  // Copy the finished result device-to-host.
  std::vector<int8_t> top_rows(row_size * n);
  thrust::copy(d_top_rows, d_top_rows + row_size * n, top_rows.begin());
  return top_rows;
}
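// Flow summary (added for readability): the streaming top-n kernel leaves one
// n-row heap per GPU thread; the function above treats them as n * thread_count
// candidate rows and (1) enumerates slot indices, (2) partitions out empty
// slots, (3) segregates nulls according to nulls_first, (4) sorts the
// surviving indices by the order-by key, and (5) gathers the best n rows and
// copies them back to the host.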