#include "SortUtils.cuh"

#include <thrust/copy.h>
#include <thrust/execution_policy.h>
#include <thrust/host_vector.h>
#include <thrust/sort.h>

#include <cuda.h>
#define checkCudaErrors(err) CHECK_EQ(err, CUDA_SUCCESS)

#define FORCE_CPU_VERSION
#include "BufferCompaction.h"
#undef FORCE_CPU_VERSION

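// Sorts the order-by column values (passed as 64-bit keys) together with their row
// indices, descending or ascending as requested, then copies back just enough indices
// to build the top_n result while skipping empty group-by entries. On GPU the sort runs
// through thrust::cuda::par backed by the ThrustAllocator on the query engine's CUDA
// stream; on CPU it falls back to the host thrust::sort_by_key.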
template <class K, class V, class I>
std::vector<uint32_t> do_radix_sort(const ExecutorDeviceType device_type,
                                    ThrustAllocator& thrust_allocator,
                                    const int device_id,
                                    const int8_t* groupby_buffer,
                                    V dev_oe_col_buffer_begin,
                                    V dev_oe_col_buffer_end,
                                    I dev_idx_buff_begin,
                                    const size_t dev_idx_buff_size,
                                    const PodOrderEntry& oe,
                                    const GroupByBufferLayoutInfo& layout,
                                    const size_t top_n) {
  if (dev_idx_buff_size == 0) {
    return {};
  }
  if (oe.is_desc) {
    if (device_type == ExecutorDeviceType::GPU) {
      auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
      thrust::sort_by_key(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
                          dev_oe_col_buffer_begin,
                          dev_oe_col_buffer_end,
                          dev_idx_buff_begin,
                          thrust::greater<int64_t>());
      checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
    } else {
      thrust::sort_by_key(dev_oe_col_buffer_begin,
                          dev_oe_col_buffer_end,
                          dev_idx_buff_begin,
                          thrust::greater<int64_t>());
    }
  } else {
    if (device_type == ExecutorDeviceType::GPU) {
      auto qe_cuda_stream = getQueryEngineCudaStreamForDevice(device_id);
      thrust::sort_by_key(thrust::cuda::par(thrust_allocator).on(qe_cuda_stream),
                          dev_oe_col_buffer_begin,
                          dev_oe_col_buffer_end,
                          dev_idx_buff_begin);
      checkCudaErrors(cuStreamSynchronize(qe_cuda_stream));
    } else {
      thrust::sort_by_key(
          dev_oe_col_buffer_begin, dev_oe_col_buffer_end, dev_idx_buff_begin);
    }
  }
  // Speculatively transfer only the first top_n indices; most of the time they suffice.
  thrust::host_vector<uint32_t> host_vector_result(
      dev_idx_buff_begin, dev_idx_buff_begin + std::min(top_n, dev_idx_buff_size));
  // The sort can bring empty entries to the front (for example, an ascending sort on
  // COUNT(*) sees non-existent groups as 0). In that case re-transfer the entire index
  // buffer instead of just the first top_n entries.
  for (size_t i = 0; i < host_vector_result.size(); ++i) {
    const auto entry_idx = host_vector_result[i];
    if (is_empty_entry<K>(entry_idx, groupby_buffer, layout.row_bytes)) {
      host_vector_result = thrust::host_vector<uint32_t>(
          dev_idx_buff_begin, dev_idx_buff_begin + dev_idx_buff_size);
      break;
    }
  }
  // Drop the empty entries while collecting the final top_n indices.
  std::vector<uint32_t> result;
  result.reserve(std::min(top_n, host_vector_result.size()));
  for (size_t i = 0; i < host_vector_result.size(); ++i) {
    const auto entry_idx = host_vector_result[i];
    if (!is_empty_entry<K>(entry_idx, groupby_buffer, layout.row_bytes)) {
      result.push_back(entry_idx);
      if (result.size() >= top_n) {
        break;
      }
    }
  }
  return result;
}

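// Appends the collected NULL indices either before or after the sorted non-NULL
// indices, depending on whether NULLS FIRST or NULLS LAST was requested.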
void add_nulls(std::vector<uint32_t>& idx_buff,
               const std::vector<uint32_t>& null_idx_buff,
               const PodOrderEntry& oe) {
  if (null_idx_buff.empty()) {
    return;
  }
  const auto insertion_point = oe.nulls_first ? idx_buff.begin() : idx_buff.end();
  idx_buff.insert(insertion_point, null_idx_buff.begin(), null_idx_buff.end());
}

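// Copies a host-side vector into a scoped device buffer obtained from the
// ThrustAllocator and returns it wrapped in a thrust::device_ptr. An empty input yields
// a null device pointer, which the callers treat as a zero-length range.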
template <typename T>
thrust::device_ptr<T> get_device_copy_ptr(const thrust::host_vector<T>& host_vec,
                                          ThrustAllocator& thrust_allocator) {
  if (host_vec.empty()) {
    return thrust::device_ptr<T>(static_cast<T*>(nullptr));
  }
  const auto host_vec_bytes = host_vec.size() * sizeof(T);
  T* dev_ptr = reinterpret_cast<T*>(
      thrust_allocator.allocateScopedBuffer(align_to_int64(host_vec_bytes)));
  copy_to_nvidia_gpu(thrust_allocator.getDataMgr(),
                     reinterpret_cast<CUdeviceptr>(dev_ptr),
                     &host_vec[0],
                     host_vec_bytes,
                     thrust_allocator.getDeviceId());
  return thrust::device_ptr<T>(dev_ptr);
}

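// Floating-point order-by values are sorted through their int64 bit patterns. For
// IEEE-754 values that ordering matches the numeric ordering only within a sign class:
// non-negative values compare correctly, but negatives compare in reverse (the bit
// pattern of -2.0 reads as a larger int64 than that of -1.0). The entries are therefore
// partitioned into negative, positive and NULL groups; the two non-NULL partitions are
// sorted with opposite directions and concatenated, and the NULLs are re-attached at
// the requested end by add_nulls().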
template <class K>
std::vector<uint32_t> baseline_sort_fp(const ExecutorDeviceType device_type,
                                       const int device_id,
                                       Data_Namespace::DataMgr* data_mgr,
                                       const int8_t* groupby_buffer,
                                       const thrust::host_vector<int64_t>& oe_col_buffer,
                                       const PodOrderEntry& oe,
                                       const GroupByBufferLayoutInfo& layout,
                                       const size_t top_n,
                                       const size_t start,
                                       const size_t step) {
  thrust::host_vector<uint32_t> neg_idx_buff;
  thrust::host_vector<uint32_t> pos_idx_buff;
  std::vector<uint32_t> null_idx_buff;
  thrust::host_vector<int64_t> neg_oe_col_buffer;
  thrust::host_vector<int64_t> pos_oe_col_buffer;
  const auto slice_entry_count =
      layout.entry_count / step + (layout.entry_count % step ? 1 : 0);
  neg_idx_buff.reserve(slice_entry_count);
  pos_idx_buff.reserve(slice_entry_count);
  null_idx_buff.reserve(slice_entry_count);
  neg_oe_col_buffer.reserve(slice_entry_count);
  pos_oe_col_buffer.reserve(slice_entry_count);
  size_t oe_col_buffer_idx = 0;
  const auto& entry_ti = get_compact_type(layout.oe_target_info);
  const bool float_argument_input = takes_float_argument(layout.oe_target_info);
  // The sign bit sits in bit 31 for floats (stored in the lower half of the slot) and
  // in bit 63 for doubles and integers.
  auto is_negative =
      float_argument_input ? [](const int64_t v) -> bool { return (v & (1 << 31)) != 0; }
                           : [](const int64_t v) -> bool { return v < 0; };
  for (size_t i = start; i < layout.entry_count; i += step, ++oe_col_buffer_idx) {
    if (!is_empty_entry<K>(i, groupby_buffer, layout.row_bytes) &&
        oe_col_buffer[oe_col_buffer_idx] ==
            null_val_bit_pattern(entry_ti, float_argument_input)) {
      null_idx_buff.push_back(i);
      continue;
    }
    if (is_negative(oe_col_buffer[oe_col_buffer_idx])) {
      neg_idx_buff.push_back(i);
      neg_oe_col_buffer.push_back(oe_col_buffer[oe_col_buffer_idx]);
    } else {
      pos_idx_buff.push_back(i);
      pos_oe_col_buffer.push_back(oe_col_buffer[oe_col_buffer_idx]);
    }
  }
  std::vector<uint32_t> pos_result;
  ThrustAllocator thrust_allocator(data_mgr, device_id);
  if (device_type == ExecutorDeviceType::GPU) {
    const auto dev_pos_idx_buff = get_device_copy_ptr(pos_idx_buff, thrust_allocator);
    const auto dev_pos_oe_col_buffer =
        get_device_copy_ptr(pos_oe_col_buffer, thrust_allocator);
    pos_result = do_radix_sort<K>(device_type,
                                  thrust_allocator,
                                  device_id,
                                  groupby_buffer,
                                  dev_pos_oe_col_buffer,
                                  dev_pos_oe_col_buffer + pos_oe_col_buffer.size(),
                                  dev_pos_idx_buff,
                                  pos_idx_buff.size(),
                                  oe, layout, top_n);
  } else {
    pos_result = do_radix_sort<K>(device_type,
                                  thrust_allocator,
                                  device_id,
                                  groupby_buffer,
                                  pos_oe_col_buffer.begin(),
                                  pos_oe_col_buffer.end(),
                                  pos_idx_buff.begin(),
                                  pos_idx_buff.size(),
                                  oe, layout, top_n);
  }
  std::vector<uint32_t> neg_result;
  // Negative values order in reverse through their bit pattern, so sort them with the
  // opposite direction and splice the two partitions together below.
  PodOrderEntry reverse_oe{oe.tle_no, !oe.is_desc, oe.nulls_first};
  if (device_type == ExecutorDeviceType::GPU) {
    const auto dev_neg_idx_buff = get_device_copy_ptr(neg_idx_buff, thrust_allocator);
    const auto dev_neg_oe_col_buffer =
        get_device_copy_ptr(neg_oe_col_buffer, thrust_allocator);
    neg_result = do_radix_sort<K>(device_type,
                                  thrust_allocator,
                                  device_id,
                                  groupby_buffer,
                                  dev_neg_oe_col_buffer,
                                  dev_neg_oe_col_buffer + neg_oe_col_buffer.size(),
                                  dev_neg_idx_buff,
                                  neg_idx_buff.size(),
                                  reverse_oe, layout, top_n);
  } else {
    neg_result = do_radix_sort<K>(device_type,
                                  thrust_allocator,
                                  device_id,
                                  groupby_buffer,
                                  neg_oe_col_buffer.begin(),
                                  neg_oe_col_buffer.end(),
                                  neg_idx_buff.begin(),
                                  neg_idx_buff.size(),
                                  reverse_oe, layout, top_n);
  }
  if (oe.is_desc) {
    pos_result.insert(pos_result.end(), neg_result.begin(), neg_result.end());
    add_nulls(pos_result, null_idx_buff, oe);
    return pos_result;
  }
  neg_result.insert(neg_result.end(), pos_result.begin(), pos_result.end());
  add_nulls(neg_result, null_idx_buff, oe);
  return neg_result;
}

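// Integer flavor of the sort. The NULL sentinel produced by null_val_bit_pattern() is
// an extreme value for the type, so it naturally sorts to one end; when the requested
// NULLS FIRST/LAST placement is the opposite end for the given direction, the NULL
// entries are pulled into a separate buffer here and re-attached by add_nulls().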
template <class K>
std::vector<uint32_t> baseline_sort_int(const ExecutorDeviceType device_type,
                                        const int device_id,
                                        Data_Namespace::DataMgr* data_mgr,
                                        const int8_t* groupby_buffer,
                                        const thrust::host_vector<int64_t>& oe_col_buffer,
                                        const PodOrderEntry& oe,
                                        const GroupByBufferLayoutInfo& layout,
                                        const size_t top_n,
                                        const size_t start,
                                        const size_t step) {
  const auto& entry_ti = get_compact_type(layout.oe_target_info);
  std::vector<uint32_t> null_idx_buff;
  thrust::host_vector<uint32_t> notnull_idx_buff;
  const auto slice_entry_count =
      layout.entry_count / step + (layout.entry_count % step ? 1 : 0);
  null_idx_buff.reserve(slice_entry_count);
  notnull_idx_buff.reserve(slice_entry_count);
  thrust::host_vector<int64_t> notnull_oe_col_buffer;
  notnull_oe_col_buffer.reserve(slice_entry_count);
  size_t oe_col_buffer_idx = 0;
  for (size_t i = start; i < layout.entry_count; i += step, ++oe_col_buffer_idx) {
    if (!is_empty_entry<K>(i, groupby_buffer, layout.row_bytes) &&
        oe_col_buffer[oe_col_buffer_idx] == null_val_bit_pattern(entry_ti, false)) {
      null_idx_buff.push_back(i);
    } else {
      notnull_idx_buff.push_back(i);
      notnull_oe_col_buffer.push_back(oe_col_buffer[oe_col_buffer_idx]);
    }
  }
  std::vector<uint32_t> notnull_result;
  ThrustAllocator thrust_allocator(data_mgr, device_id);
  if (device_type == ExecutorDeviceType::GPU) {
    const auto dev_notnull_idx_buff =
        get_device_copy_ptr(notnull_idx_buff, thrust_allocator);
    const auto dev_notnull_oe_col_buffer =
        get_device_copy_ptr(notnull_oe_col_buffer, thrust_allocator);
    notnull_result =
        do_radix_sort<K>(device_type,
                         thrust_allocator,
                         device_id,
                         groupby_buffer,
                         dev_notnull_oe_col_buffer,
                         dev_notnull_oe_col_buffer + notnull_oe_col_buffer.size(),
                         dev_notnull_idx_buff,
                         notnull_idx_buff.size(),
                         oe, layout, top_n);
  } else {
    notnull_result = do_radix_sort<K>(device_type,
                                      thrust_allocator,
                                      device_id,
                                      groupby_buffer,
                                      notnull_oe_col_buffer.begin(),
                                      notnull_oe_col_buffer.end(),
                                      notnull_idx_buff.begin(),
                                      notnull_idx_buff.size(),
                                      oe, layout, top_n);
  }
  add_nulls(notnull_result, null_idx_buff, oe);
  return notnull_result;
}

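// Gathers the order-by target column into a dense host buffer of int64 slots, visiting
// every step-th entry starting at 'start'. For AVG the running sum and count occupy two
// consecutive slots; they are combined into a double and its bit pattern is stored so
// the generic int64-keyed sort above can be reused.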
template <class K>
thrust::host_vector<int64_t> collect_order_entry_column(
    const int8_t* groupby_buffer,
    const GroupByBufferLayoutInfo& layout,
    const size_t start,
    const size_t step) {
  thrust::host_vector<int64_t> oe_col_buffer;
  const auto row_ptr = groupby_buffer + start * layout.row_bytes;
  auto crt_group_ptr1 = layout.target_groupby_index >= 0
                            ? row_ptr + layout.target_groupby_index * sizeof(K)
                            : row_ptr + layout.col_off;
  const int8_t* crt_group_ptr2{nullptr};
  if (layout.oe_target_info.agg_kind == kAVG) {
    // AVG keeps its running sum and count in two consecutive slots.
    crt_group_ptr2 = crt_group_ptr1 + layout.col_bytes;
  }
  const auto entry_ti = get_compact_type(layout.oe_target_info);
  const bool float_argument_input = takes_float_argument(layout.oe_target_info);
  const auto step_bytes = layout.row_bytes * step;
  const auto col_bytes = float_argument_input ? entry_ti.get_size() : layout.col_bytes;
  for (size_t i = start; i < layout.entry_count; i += step) {
    auto val1 = read_int_from_buff(crt_group_ptr1, col_bytes > 0 ? col_bytes : sizeof(K));
    if (crt_group_ptr2) {
      const auto val2 = read_int_from_buff(crt_group_ptr2, 8);
      const auto avg_val = pair_to_double({val1, val2}, entry_ti, float_argument_input);
      val1 = *reinterpret_cast<const int64_t*>(&avg_val);
    }
    oe_col_buffer.push_back(val1);
    crt_group_ptr1 += step_bytes;
    if (crt_group_ptr2) {
      crt_group_ptr2 += step_bytes;
    }
  }
  return oe_col_buffer;
}

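// Entry point for the baseline top-n sort over a group-by buffer: it materializes the
// order-by column, then dispatches to the floating-point/AVG path, to the integer path
// when NULL placement needs special handling, or sorts the column directly (on GPU or
// CPU) when the NULL bit pattern already lands at the right end.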
template <class K>
std::vector<uint32_t> baseline_sort(const ExecutorDeviceType device_type,
                                    const int device_id,
                                    Data_Namespace::DataMgr* data_mgr,
                                    const int8_t* groupby_buffer,
                                    const PodOrderEntry& oe,
                                    const GroupByBufferLayoutInfo& layout,
                                    const size_t top_n,
                                    const size_t start,
                                    const size_t step) {
  auto oe_col_buffer = collect_order_entry_column<K>(groupby_buffer, layout, start, step);
  const auto& entry_ti = get_compact_type(layout.oe_target_info);
  CHECK(entry_ti.is_number());
  if (entry_ti.is_fp() || layout.oe_target_info.agg_kind == kAVG) {
    return baseline_sort_fp<K>(device_type, device_id, data_mgr, groupby_buffer,
                               oe_col_buffer, oe, layout, top_n, start, step);
  }
  // Integral NULLs would land at the wrong end in these two cases; handle them apart.
  if ((oe.is_desc && oe.nulls_first) || (!oe.is_desc && !oe.nulls_first)) {
    return baseline_sort_int<K>(device_type, device_id, data_mgr, groupby_buffer,
                                oe_col_buffer, oe, layout, top_n, start, step);
  }
  // Fast path: the NULL bit pattern already sorts to the requested end.
  ThrustAllocator thrust_allocator(data_mgr, device_id);
  if (device_type == ExecutorDeviceType::GPU) {
    if (oe_col_buffer.empty()) {
      return {};
    }
    const auto dev_idx_buff =
        get_device_ptr<uint32_t>(oe_col_buffer.size(), thrust_allocator);
    thrust::sequence(dev_idx_buff, dev_idx_buff + oe_col_buffer.size(), start, step);
    const auto dev_oe_col_buffer = get_device_copy_ptr(oe_col_buffer, thrust_allocator);
    return do_radix_sort<K>(device_type, thrust_allocator, device_id, groupby_buffer,
                            dev_oe_col_buffer, dev_oe_col_buffer + oe_col_buffer.size(),
                            dev_idx_buff, oe_col_buffer.size(), oe, layout, top_n);
  }
  thrust::host_vector<uint32_t> host_idx_buff(oe_col_buffer.size());
  thrust::sequence(host_idx_buff.begin(), host_idx_buff.end(), start, step);
  return do_radix_sort<K>(device_type, thrust_allocator, device_id, groupby_buffer,
                          oe_col_buffer.begin(), oe_col_buffer.end(),
                          host_idx_buff.begin(), host_idx_buff.size(), oe, layout, top_n);
}

template std::vector<uint32_t> baseline_sort<int32_t>(const ExecutorDeviceType,
    const int, Data_Namespace::DataMgr*, const int8_t*, const PodOrderEntry&,
    const GroupByBufferLayoutInfo&, const size_t, const size_t, const size_t);

template std::vector<uint32_t> baseline_sort<int64_t>(const ExecutorDeviceType,
    const int, Data_Namespace::DataMgr*, const int8_t*, const PodOrderEntry&,
    const GroupByBufferLayoutInfo&, const size_t, const size_t, const size_t);

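// A minimal usage sketch (hypothetical caller; the group-by buffer, layout and DataMgr
// wiring are assumptions, not part of this file):
//
//   const auto permutation = baseline_sort<int64_t>(ExecutorDeviceType::GPU,
//                                                   device_id,
//                                                   data_mgr,
//                                                   groupby_buffer,
//                                                   PodOrderEntry{1, /*is_desc=*/true,
//                                                                 /*nulls_first=*/false},
//                                                   layout,
//                                                   /*top_n=*/10,
//                                                   /*start=*/0,
//                                                   /*step=*/1);
//
// start and step select a strided slice of the buffer (every step-th entry beginning at
// start); the returned vector is the sorted permutation of the visited entry indices.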