OmniSciDB  a5dc49c757
ResultSetReduction.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #include "DynamicWatchdog.h"
24 #include "Execute.h"
25 #include "ResultSet.h"
27 #include "ResultSetReductionJIT.h"
28 #include "RuntimeFunctions.h"
29 #include "Shared/SqlTypesLayout.h"
30 #include "Shared/likely.h"
31 #include "Shared/thread_count.h"
32 
33 #include <llvm/ExecutionEngine/GenericValue.h>
34 
35 #include <algorithm>
36 #include <future>
37 #include <numeric>
38 
39 extern bool g_enable_dynamic_watchdog;
40 
41 namespace {
42 
43 bool use_multithreaded_reduction(const size_t entry_count) {
44  return entry_count > 100000;
45 }
46 
47 size_t get_row_qw_count(const QueryMemoryDescriptor& query_mem_desc) {
48  const auto row_bytes = get_row_bytes(query_mem_desc);
49  CHECK_EQ(size_t(0), row_bytes % 8);
50  return row_bytes / 8;
51 }
52 
53 std::vector<int64_t> make_key(const int64_t* buff,
54  const size_t entry_count,
55  const size_t key_count) {
56  std::vector<int64_t> key;
57  size_t off = 0;
58  for (size_t i = 0; i < key_count; ++i) {
59  key.push_back(buff[off]);
60  off += entry_count;
61  }
62  return key;
63 }
64 
65 void fill_slots(int64_t* dst_entry,
66  const size_t dst_entry_count,
67  const int64_t* src_buff,
68  const size_t src_entry_idx,
69  const size_t src_entry_count,
70  const QueryMemoryDescriptor& query_mem_desc) {
71  const auto slot_count = query_mem_desc.getBufferColSlotCount();
72  const auto key_count = query_mem_desc.getGroupbyColCount();
73  if (query_mem_desc.didOutputColumnar()) {
74  for (size_t i = 0, dst_slot_off = 0; i < slot_count;
75  ++i, dst_slot_off += dst_entry_count) {
76  dst_entry[dst_slot_off] =
77  src_buff[slot_offset_colwise(src_entry_idx, i, key_count, src_entry_count)];
78  }
79  } else {
80  const auto row_ptr = src_buff + get_row_qw_count(query_mem_desc) * src_entry_idx;
81  const auto slot_off_quad = get_slot_off_quad(query_mem_desc);
82  for (size_t i = 0; i < slot_count; ++i) {
83  dst_entry[i] = row_ptr[slot_off_quad + i];
84  }
85  }
86 }
87 
89 void fill_empty_key_32(int32_t* key_ptr_i32, const size_t key_count) {
90  for (size_t i = 0; i < key_count; ++i) {
91  key_ptr_i32[i] = EMPTY_KEY_32;
92  }
93 }
94 
96 void fill_empty_key_64(int64_t* key_ptr_i64, const size_t key_count) {
97  for (size_t i = 0; i < key_count; ++i) {
98  key_ptr_i64[i] = EMPTY_KEY_64;
99  }
100 }
101 
102 inline int64_t get_component(const int8_t* group_by_buffer,
103  const size_t comp_sz,
104  const size_t index = 0) {
105  int64_t ret = std::numeric_limits<int64_t>::min();
106  switch (comp_sz) {
107  case 1: {
108  ret = group_by_buffer[index];
109  break;
110  }
111  case 2: {
112  const int16_t* buffer_ptr = reinterpret_cast<const int16_t*>(group_by_buffer);
113  ret = buffer_ptr[index];
114  break;
115  }
116  case 4: {
117  const int32_t* buffer_ptr = reinterpret_cast<const int32_t*>(group_by_buffer);
118  ret = buffer_ptr[index];
119  break;
120  }
121  case 8: {
122  const int64_t* buffer_ptr = reinterpret_cast<const int64_t*>(group_by_buffer);
123  ret = buffer_ptr[index];
124  break;
125  }
126  default:
127  CHECK(false);
128  }
129  return ret;
130 }
131 
132 void run_reduction_code(const size_t executor_id,
133  const ReductionCode& reduction_code,
134  int8_t* this_buff,
135  const int8_t* that_buff,
136  const int32_t start_entry_index,
137  const int32_t end_entry_index,
138  const int32_t that_entry_count,
139  const void* this_qmd,
140  const void* that_qmd,
141  const void* serialized_varlen_buffer) {
142  int err = 0;
143  if (reduction_code.func_ptr) {
144  err = reduction_code.func_ptr(this_buff,
145  that_buff,
146  start_entry_index,
147  end_entry_index,
148  that_entry_count,
149  this_qmd,
150  that_qmd,
151  serialized_varlen_buffer);
152  } else {
153  auto ret = ReductionInterpreter::run(
154  executor_id,
155  reduction_code.ir_reduce_loop.get(),
156  {ReductionInterpreter::MakeEvalValue(this_buff),
157  ReductionInterpreter::MakeEvalValue(that_buff),
158  ReductionInterpreter::MakeEvalValue(start_entry_index),
159  ReductionInterpreter::MakeEvalValue(end_entry_index),
160  ReductionInterpreter::MakeEvalValue(that_entry_count),
161  ReductionInterpreter::MakeEvalValue(this_qmd),
162  ReductionInterpreter::MakeEvalValue(that_qmd),
163  ReductionInterpreter::MakeEvalValue(serialized_varlen_buffer)});
164  err = ret.int_val;
165  }
166  if (err) {
167  if (err == int32_t(heavyai::ErrorCode::SINGLE_VALUE_FOUND_MULTIPLE_VALUES)) {
168  throw std::runtime_error("Multiple distinct values encountered");
169  } else if (err == int32_t(heavyai::ErrorCode::INTERRUPTED)) {
170  throw std::runtime_error(
171  "Query execution has interrupted during result set reduction");
172  }
173  throw std::runtime_error(
174  "Query execution has exceeded the time limit or was interrupted during result "
175  "set reduction");
176  }
177 }
178 
179 } // namespace
180 
181 void result_set::fill_empty_key(void* key_ptr,
182  const size_t key_count,
183  const size_t key_width) {
184  switch (key_width) {
185  case 4: {
186  auto key_ptr_i32 = reinterpret_cast<int32_t*>(key_ptr);
187  fill_empty_key_32(key_ptr_i32, key_count);
188  break;
189  }
190  case 8: {
191  auto key_ptr_i64 = reinterpret_cast<int64_t*>(key_ptr);
192  fill_empty_key_64(key_ptr_i64, key_count);
193  break;
194  }
195  default:
196  CHECK(false);
197  }
198 }
199 
200 // Driver method for various buffer layouts, actual work is done by reduceOne* methods.
201 // Reduces the entries of `that` into the buffer of this ResultSetStorage object.
202 void ResultSetStorage::reduce(const ResultSetStorage& that,
203  const std::vector<std::string>& serialized_varlen_buffer,
204  const ReductionCode& reduction_code,
205  const size_t executor_id) const {
206  auto entry_count = query_mem_desc_.getEntryCount();
207  CHECK_GT(entry_count, size_t(0));
215  }
216  const auto that_entry_count = that.query_mem_desc_.getEntryCount();
217  switch (query_mem_desc_.getQueryDescriptionType()) {
218  case QueryDescriptionType::GroupByBaselineHash:
219  CHECK_GE(entry_count, that_entry_count);
220  break;
221  default:
222  CHECK_EQ(entry_count, that_entry_count);
223  }
224  auto this_buff = buff_;
225  CHECK(this_buff);
226  auto that_buff = that.buff_;
227  CHECK(that_buff);
228  if (query_mem_desc_.getQueryDescriptionType() ==
229  QueryDescriptionType::GroupByBaselineHash) {
230  if (!serialized_varlen_buffer.empty()) {
231  throw std::runtime_error(
232  "Projection of variable length targets with baseline hash group by is not yet "
233  "supported in Distributed mode");
234  }
235  if (use_multithreaded_reduction(that_entry_count)) {
236  const size_t thread_count = cpu_threads();
237  std::vector<std::future<void>> reduction_threads;
238  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
239  const auto thread_entry_count =
240  (that_entry_count + thread_count - 1) / thread_count;
241  const auto start_index = thread_idx * thread_entry_count;
242  const auto end_index =
243  std::min(start_index + thread_entry_count, that_entry_count);
244  reduction_threads.emplace_back(std::async(
245  std::launch::async,
246  [this,
247  this_buff,
248  that_buff,
249  start_index,
250  end_index,
251  that_entry_count,
252  executor_id,
253  &reduction_code,
254  &that] {
255  if (reduction_code.ir_reduce_loop) {
256  run_reduction_code(executor_id,
257  reduction_code,
258  this_buff,
259  that_buff,
260  start_index,
261  end_index,
262  that_entry_count,
264  &that.query_mem_desc_,
265  nullptr);
266  } else {
267  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
268  reduceOneEntryBaseline(
269  this_buff, that_buff, entry_idx, that_entry_count, that);
270  }
271  }
272  }));
273  }
274  for (auto& reduction_thread : reduction_threads) {
275  reduction_thread.wait();
276  }
277  for (auto& reduction_thread : reduction_threads) {
278  reduction_thread.get();
279  }
280  } else {
281  if (reduction_code.ir_reduce_loop) {
282  run_reduction_code(executor_id,
283  reduction_code,
284  this_buff,
285  that_buff,
286  0,
287  that_entry_count,
288  that_entry_count,
290  &that.query_mem_desc_,
291  nullptr);
292  } else {
293  for (size_t i = 0; i < that_entry_count; ++i) {
294  reduceOneEntryBaseline(this_buff, that_buff, i, that_entry_count, that);
295  }
296  }
297  }
298  return;
299  }
300  if (use_multithreaded_reduction(entry_count)) {
301  const size_t thread_count = cpu_threads();
302  std::vector<std::future<void>> reduction_threads;
303  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
304  const auto thread_entry_count = (entry_count + thread_count - 1) / thread_count;
305  const auto start_index = thread_idx * thread_entry_count;
306  const auto end_index = std::min(start_index + thread_entry_count, entry_count);
307  if (query_mem_desc_.didOutputColumnar()) {
308  reduction_threads.emplace_back(std::async(std::launch::async,
309  [this,
310  this_buff,
311  that_buff,
312  start_index,
313  end_index,
314  &that,
315  &serialized_varlen_buffer,
316  &executor_id] {
317  reduceEntriesNoCollisionsColWise(
318  this_buff,
319  that_buff,
320  that,
321  start_index,
322  end_index,
323  serialized_varlen_buffer,
324  executor_id);
325  }));
326  } else {
327  reduction_threads.emplace_back(std::async(std::launch::async,
328  [this,
329  this_buff,
330  that_buff,
331  start_index,
332  end_index,
333  that_entry_count,
334  executor_id,
335  &reduction_code,
336  &that,
337  &serialized_varlen_buffer] {
338  CHECK(reduction_code.ir_reduce_loop);
339  run_reduction_code(
340  executor_id,
341  reduction_code,
342  this_buff,
343  that_buff,
344  start_index,
345  end_index,
346  that_entry_count,
347  &query_mem_desc_,
348  &that.query_mem_desc_,
349  &serialized_varlen_buffer);
350  }));
351  }
352  }
353  for (auto& reduction_thread : reduction_threads) {
354  reduction_thread.wait();
355  }
356  for (auto& reduction_thread : reduction_threads) {
357  reduction_thread.get();
358  }
359  } else {
360  if (query_mem_desc_.didOutputColumnar()) {
361  reduceEntriesNoCollisionsColWise(this_buff,
362  that_buff,
363  that,
364  0,
365  query_mem_desc_.getEntryCount(),
366  serialized_varlen_buffer,
367  executor_id);
368  } else {
369  CHECK(reduction_code.ir_reduce_loop);
370  run_reduction_code(executor_id,
371  reduction_code,
372  this_buff,
373  that_buff,
374  0,
375  entry_count,
376  that_entry_count,
377  &query_mem_desc_,
378  &that.query_mem_desc_,
379  &serialized_varlen_buffer);
380  }
381  }
382 }
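// A minimal standalone sketch (illustrative names, not part of the reduction
// API) of the entry partitioning used by the multi-threaded paths above: each
// of cpu_threads() workers receives a ceil-divided, contiguous entry range,
// and trailing workers may get an empty range when the entry count is small.
// It relies only on the <algorithm> include already present at the top of the
// file.
struct ExampleEntryRange {
  size_t start;
  size_t end;  // exclusive
};

// Returns the [start, end) entry range assigned to `thread_idx`; the reduction
// loops above simply do no work when start >= end.
inline ExampleEntryRange example_entry_range(const size_t entry_count,
                                             const size_t thread_count,
                                             const size_t thread_idx) {
  const size_t per_thread = (entry_count + thread_count - 1) / thread_count;
  const size_t start = thread_idx * per_thread;
  return {start, std::min(start + per_thread, entry_count)};
}
// e.g. entry_count = 10, thread_count = 4 yields [0,3), [3,6), [6,9), [9,10).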
383 
384 namespace {
385 
386 ALWAYS_INLINE void check_watchdog() {
387  if (UNLIKELY(dynamic_watchdog())) {
388  // TODO(alex): distinguish between the deadline and interrupt
389  throw std::runtime_error(
390  "Query execution has exceeded the time limit or was interrupted during result "
391  "set reduction");
392  }
393 }
394 
395 ALWAYS_INLINE void check_watchdog_with_seed(const size_t sample_seed) {
396  if (UNLIKELY((sample_seed & 0x3F) == 0 && dynamic_watchdog())) {
397  // TODO(alex): distinguish between the deadline and interrupt
398  throw std::runtime_error(
399  "Query execution has exceeded the time limit or was interrupted during result "
400  "set reduction");
401  }
402 }
403 
404 } // namespace
405 
406 void ResultSetStorage::reduceEntriesNoCollisionsColWise(
407  int8_t* this_buff,
408  const int8_t* that_buff,
409  const ResultSetStorage& that,
410  const size_t start_index,
411  const size_t end_index,
412  const std::vector<std::string>& serialized_varlen_buffer,
413  const size_t executor_id) const {
414  // TODO(adb / saman): Support column wise output when serializing distributed agg
415  // functions
416  CHECK(serialized_varlen_buffer.empty());
417 
418  const auto& col_slot_context = query_mem_desc_.getColSlotContext();
419 
420  auto this_crt_col_ptr = get_cols_ptr(this_buff, query_mem_desc_);
421  auto that_crt_col_ptr = get_cols_ptr(that_buff, query_mem_desc_);
422  auto executor = Executor::getExecutor(executor_id);
423  CHECK(executor);
424  for (size_t target_idx = 0; target_idx < targets_.size(); ++target_idx) {
425  const auto& agg_info = targets_[target_idx];
426  const auto& slots_for_col = col_slot_context.getSlotsForCol(target_idx);
427 
428  bool two_slot_target{false};
429  if (agg_info.is_agg &&
430  (agg_info.agg_kind == kAVG ||
431  (agg_info.agg_kind == kSAMPLE && agg_info.sql_type.is_varlen()))) {
432  // Note that this assumes if one of the slot pairs in a given target is an array,
433  // all slot pairs are arrays. Currently this is true for all geo targets, but we
434  // should better codify and store this information in the future
435  two_slot_target = true;
436  }
438  executor->checkNonKernelTimeInterrupted())) {
439  throw std::runtime_error(
440  "Query execution was interrupted during result set reduction");
441  }
443  check_watchdog();
444  }
445  for (size_t target_slot_idx = slots_for_col.front();
446  target_slot_idx < slots_for_col.back() + 1;
447  target_slot_idx += 2) {
448  const auto this_next_col_ptr = advance_to_next_columnar_target_buff(
449  this_crt_col_ptr, query_mem_desc_, target_slot_idx);
450  const auto that_next_col_ptr = advance_to_next_columnar_target_buff(
451  that_crt_col_ptr, query_mem_desc_, target_slot_idx);
452  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
453  if (isEmptyEntryColumnar(entry_idx, that_buff)) {
454  continue;
455  }
456  if (target_idx == 0) {
457  // copy the key from right hand side
458  copyKeyColWise(entry_idx, this_buff, that_buff);
459  }
460  auto this_ptr1 =
461  this_crt_col_ptr +
462  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx);
463  auto that_ptr1 =
464  that_crt_col_ptr +
465  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx);
466  int8_t* this_ptr2{nullptr};
467  const int8_t* that_ptr2{nullptr};
468  if (UNLIKELY(two_slot_target)) {
469  this_ptr2 =
470  this_next_col_ptr +
471  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx + 1);
472  that_ptr2 =
473  that_next_col_ptr +
474  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx + 1);
475  }
476  reduceOneSlot(this_ptr1,
477  this_ptr2,
478  that_ptr1,
479  that_ptr2,
480  agg_info,
481  target_idx,
482  target_slot_idx,
483  target_slot_idx,
484  that,
485  slots_for_col.front(),
486  serialized_varlen_buffer);
487  }
488 
489  this_crt_col_ptr = this_next_col_ptr;
490  that_crt_col_ptr = that_next_col_ptr;
491  if (UNLIKELY(two_slot_target)) {
492  this_crt_col_ptr = advance_to_next_columnar_target_buff(
493  this_crt_col_ptr, query_mem_desc_, target_slot_idx + 1);
494  that_crt_col_ptr = advance_to_next_columnar_target_buff(
495  that_crt_col_ptr, query_mem_desc_, target_slot_idx + 1);
496  }
497  }
498  }
499 }
500 
501 /*
502  * copy all keys from the columnar prepended group buffer of "that_buff" into
503  * "this_buff"
504  */
505 void ResultSetStorage::copyKeyColWise(const size_t entry_idx,
506  int8_t* this_buff,
507  const int8_t* that_buff) const {
509  for (size_t group_idx = 0; group_idx < query_mem_desc_.getGroupbyColCount();
510  group_idx++) {
511  // if the column corresponds to a group key
512  const auto column_offset_bytes =
513  query_mem_desc_.getPrependedGroupColOffInBytes(group_idx);
514  auto lhs_key_ptr = this_buff + column_offset_bytes;
515  auto rhs_key_ptr = that_buff + column_offset_bytes;
516  switch (query_mem_desc_.groupColWidth(group_idx)) {
517  case 8:
518  *(reinterpret_cast<int64_t*>(lhs_key_ptr) + entry_idx) =
519  *(reinterpret_cast<const int64_t*>(rhs_key_ptr) + entry_idx);
520  break;
521  case 4:
522  *(reinterpret_cast<int32_t*>(lhs_key_ptr) + entry_idx) =
523  *(reinterpret_cast<const int32_t*>(rhs_key_ptr) + entry_idx);
524  break;
525  case 2:
526  *(reinterpret_cast<int16_t*>(lhs_key_ptr) + entry_idx) =
527  *(reinterpret_cast<const int16_t*>(rhs_key_ptr) + entry_idx);
528  break;
529  case 1:
530  *(reinterpret_cast<int8_t*>(lhs_key_ptr) + entry_idx) =
531  *(reinterpret_cast<const int8_t*>(rhs_key_ptr) + entry_idx);
532  break;
533  default:
534  CHECK(false);
535  break;
536  }
537  }
538 }
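// Worked example of the columnar key copy above (illustrative numbers,
// assuming no extra per-column padding): with an entry count of 4 and two
// 8-byte group-by columns, the prepended key columns are laid out back to
// back, so column 0 covers bytes [0, 32) and column 1 bytes [32, 64); the key
// for entry_idx 2 in column 1 is read from and written to byte offset
// 32 + 2 * 8 = 48 in both buffers.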
539 
540 // Rewrites the entries of this ResultSetStorage object to point directly into the
541 // serialized_varlen_buffer rather than using offsets.
542 void ResultSetStorage::rewriteAggregateBufferOffsets(
543  const std::vector<std::string>& serialized_varlen_buffer) const {
544  if (serialized_varlen_buffer.empty()) {
545  return;
546  }
547 
549  auto entry_count = query_mem_desc_.getEntryCount();
550  CHECK_GT(entry_count, size_t(0));
551  CHECK(buff_);
552 
553  // Row-wise iteration, consider moving to separate function
554  for (size_t i = 0; i < entry_count; ++i) {
555  if (isEmptyEntry(i, buff_)) {
556  continue;
557  }
558  const auto key_bytes = get_key_bytes_rowwise(query_mem_desc_);
559  const auto key_bytes_with_padding = align_to_int64(key_bytes);
560  auto rowwise_targets_ptr =
561  row_ptr_rowwise(buff_, query_mem_desc_, i) + key_bytes_with_padding;
562  size_t target_slot_idx = 0;
563  for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
564  ++target_logical_idx) {
565  const auto& target_info = targets_[target_logical_idx];
566  if (target_info.sql_type.is_varlen() && target_info.is_agg) {
567  CHECK(target_info.agg_kind == kSAMPLE);
568  auto ptr1 = rowwise_targets_ptr;
569  auto slot_idx = target_slot_idx;
570  auto ptr2 = ptr1 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx);
571  auto offset = *reinterpret_cast<const int64_t*>(ptr1);
572 
573  const auto& elem_ti = target_info.sql_type.get_elem_type();
574  size_t length_to_elems =
575  target_info.sql_type.is_string() || target_info.sql_type.is_geometry()
576  ? 1
577  : elem_ti.get_size();
578  if (target_info.sql_type.is_geometry()) {
579  for (int j = 0; j < target_info.sql_type.get_physical_coord_cols(); j++) {
580  if (j > 0) {
581  ptr1 = ptr2 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx + 1);
582  ptr2 = ptr1 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx + 2);
583  slot_idx += 2;
584  length_to_elems = 4;
585  }
586  CHECK_LT(static_cast<size_t>(offset), serialized_varlen_buffer.size());
587  const auto& varlen_bytes_str = serialized_varlen_buffer[offset++];
588  const auto str_ptr =
589  reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
590  CHECK(ptr1);
591  *reinterpret_cast<int64_t*>(ptr1) = reinterpret_cast<const int64_t>(str_ptr);
592  CHECK(ptr2);
593  *reinterpret_cast<int64_t*>(ptr2) =
594  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
595  }
596  } else {
597  CHECK_LT(static_cast<size_t>(offset), serialized_varlen_buffer.size());
598  const auto& varlen_bytes_str = serialized_varlen_buffer[offset];
599  const auto str_ptr = reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
600  CHECK(ptr1);
601  *reinterpret_cast<int64_t*>(ptr1) = reinterpret_cast<const int64_t>(str_ptr);
602  CHECK(ptr2);
603  *reinterpret_cast<int64_t*>(ptr2) =
604  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
605  }
606  }
607 
608  rowwise_targets_ptr = advance_target_ptr_row_wise(
609  rowwise_targets_ptr, target_info, target_slot_idx, query_mem_desc_, false);
610  target_slot_idx = advance_slot(target_slot_idx, target_info, false);
611  }
612  }
613 
614  return;
615 }
616 
617 namespace {
618 
619 #ifdef _MSC_VER
620 #define mapd_cas(address, compare, val) \
621  InterlockedCompareExchange(reinterpret_cast<volatile long*>(address), \
622  static_cast<long>(val), \
623  static_cast<long>(compare))
624 #else
625 #define mapd_cas(address, compare, val) __sync_val_compare_and_swap(address, compare, val)
626 #endif
627 
628 GroupValueInfo get_matching_group_value_columnar_reduction(int64_t* groups_buffer,
629  const uint32_t h,
630  const int64_t* key,
631  const uint32_t key_qw_count,
632  const size_t entry_count) {
633  auto off = h;
634  const auto old_key = mapd_cas(&groups_buffer[off], EMPTY_KEY_64, *key);
635  if (old_key == EMPTY_KEY_64) {
636  for (size_t i = 0; i < key_qw_count; ++i) {
637  groups_buffer[off] = key[i];
638  off += entry_count;
639  }
640  return {&groups_buffer[off], true};
641  }
642  off = h;
643  for (size_t i = 0; i < key_qw_count; ++i) {
644  if (groups_buffer[off] != key[i]) {
645  return {nullptr, true};
646  }
647  off += entry_count;
648  }
649  return {&groups_buffer[off], false};
650 }
651 
652 #undef mapd_cas
653 
654 // TODO(alex): fix synchronization when we enable it
655 GroupValueInfo get_group_value_columnar_reduction(
656  int64_t* groups_buffer,
657  const uint32_t groups_buffer_entry_count,
658  const int64_t* key,
659  const uint32_t key_qw_count) {
660  uint32_t h = key_hash(key, key_qw_count, sizeof(int64_t)) % groups_buffer_entry_count;
661  auto matching_gvi = get_matching_group_value_columnar_reduction(
662  groups_buffer, h, key, key_qw_count, groups_buffer_entry_count);
663  if (matching_gvi.first) {
664  return matching_gvi;
665  }
666  uint32_t h_probe = (h + 1) % groups_buffer_entry_count;
667  while (h_probe != h) {
668  matching_gvi = get_matching_group_value_columnar_reduction(
669  groups_buffer, h_probe, key, key_qw_count, groups_buffer_entry_count);
670  if (matching_gvi.first) {
671  return matching_gvi;
672  }
673  h_probe = (h_probe + 1) % groups_buffer_entry_count;
674  }
675  return {nullptr, true};
676 }
677 
678 #ifdef _MSC_VER
679 #define cas_cst(ptr, expected, desired) \
680  (InterlockedCompareExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
681  reinterpret_cast<void*>(&desired), \
682  expected) == expected)
683 #define store_cst(ptr, val) \
684  InterlockedExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
685  reinterpret_cast<void*>(val))
686 #define load_cst(ptr) \
687  InterlockedCompareExchange(reinterpret_cast<volatile long*>(ptr), 0, 0)
688 #else
689 #define cas_cst(ptr, expected, desired) \
690  __atomic_compare_exchange_n( \
691  ptr, expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
692 #define store_cst(ptr, val) __atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)
693 #define load_cst(ptr) __atomic_load_n(ptr, __ATOMIC_SEQ_CST)
694 #endif
695 
696 template <typename T = int64_t>
697 GroupValueInfo get_matching_group_value_reduction(
698  int64_t* groups_buffer,
699  const uint32_t h,
700  const T* key,
701  const uint32_t key_count,
702  const QueryMemoryDescriptor& query_mem_desc,
703  const int64_t* that_buff_i64,
704  const size_t that_entry_idx,
705  const size_t that_entry_count,
706  const uint32_t row_size_quad) {
707  auto off = h * row_size_quad;
708  T empty_key = get_empty_key<T>();
709  T write_pending = get_empty_key<T>() - 1;
710  auto row_ptr = reinterpret_cast<T*>(groups_buffer + off);
711  const auto slot_off_quad = get_slot_off_quad(query_mem_desc);
712  const bool success = cas_cst(row_ptr, &empty_key, write_pending);
713  if (success) {
714  fill_slots(groups_buffer + off + slot_off_quad,
715  query_mem_desc.getEntryCount(),
716  that_buff_i64,
717  that_entry_idx,
718  that_entry_count,
719  query_mem_desc);
720  if (key_count > 1) {
721  memcpy(row_ptr + 1, key + 1, (key_count - 1) * sizeof(T));
722  }
723  store_cst(row_ptr, *key);
724  return {groups_buffer + off + slot_off_quad, true};
725  }
726  while (load_cst(row_ptr) == write_pending) {
727  // spin until the winning thread has finished writing the entire key and the init
728  // value
729  }
730  for (size_t i = 0; i < key_count; ++i) {
731  if (load_cst(row_ptr + i) != key[i]) {
732  return {nullptr, true};
733  }
734  }
735  return {groups_buffer + off + slot_off_quad, false};
736 }
737 
738 #undef load_cst
739 #undef store_cst
740 #undef cas_cst
741 
742 inline GroupValueInfo get_matching_group_value_reduction(
743  int64_t* groups_buffer,
744  const uint32_t h,
745  const int64_t* key,
746  const uint32_t key_count,
747  const size_t key_width,
748  const QueryMemoryDescriptor& query_mem_desc,
749  const int64_t* that_buff_i64,
750  const size_t that_entry_idx,
751  const size_t that_entry_count,
752  const uint32_t row_size_quad) {
753  switch (key_width) {
754  case 4:
755  return get_matching_group_value_reduction(groups_buffer,
756  h,
757  reinterpret_cast<const int32_t*>(key),
758  key_count,
759  query_mem_desc,
760  that_buff_i64,
761  that_entry_idx,
762  that_entry_count,
763  row_size_quad);
764  case 8:
765  return get_matching_group_value_reduction(groups_buffer,
766  h,
767  key,
768  key_count,
769  query_mem_desc,
770  that_buff_i64,
771  that_entry_idx,
772  that_entry_count,
773  row_size_quad);
774  default:
775  CHECK(false);
776  return {nullptr, true};
777  }
778 }
779 
780 } // namespace
781 
782 GroupValueInfo get_group_value_reduction(
783  int64_t* groups_buffer,
784  const uint32_t groups_buffer_entry_count,
785  const int64_t* key,
786  const uint32_t key_count,
787  const size_t key_width,
788  const QueryMemoryDescriptor& query_mem_desc,
789  const int64_t* that_buff_i64,
790  const size_t that_entry_idx,
791  const size_t that_entry_count,
792  const uint32_t row_size_quad) {
793  uint32_t h = key_hash(key, key_count, key_width) % groups_buffer_entry_count;
794  auto matching_gvi = get_matching_group_value_reduction(groups_buffer,
795  h,
796  key,
797  key_count,
798  key_width,
799  query_mem_desc,
800  that_buff_i64,
801  that_entry_idx,
802  that_entry_count,
803  row_size_quad);
804  if (matching_gvi.first) {
805  return matching_gvi;
806  }
807  uint32_t h_probe = (h + 1) % groups_buffer_entry_count;
808  while (h_probe != h) {
809  matching_gvi = get_matching_group_value_reduction(groups_buffer,
810  h_probe,
811  key,
812  key_count,
813  key_width,
814  query_mem_desc,
815  that_buff_i64,
816  that_entry_idx,
817  that_entry_count,
818  row_size_quad);
819  if (matching_gvi.first) {
820  return matching_gvi;
821  }
822  h_probe = (h_probe + 1) % groups_buffer_entry_count;
823  }
824  return {nullptr, true};
825 }
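// Probe-order example for the open-addressing lookups above (illustrative
// numbers): with groups_buffer_entry_count = 8 and h = 6, the slots are tried
// in the order 6, 7, 0, 1, 2, 3, 4, 5; the scan stops at the first slot whose
// key matches or that the caller manages to claim, and {nullptr, true} is
// returned only if the probe wraps all the way back to h, i.e. the buffer is
// full.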
826 
827 // Reduces entry at position that_entry_idx in that_buff into this_buff. This is
828 // the baseline layout, so the position in this_buff isn't known to be that_entry_idx.
829 void ResultSetStorage::reduceOneEntryBaseline(int8_t* this_buff,
830  const int8_t* that_buff,
831  const size_t that_entry_idx,
832  const size_t that_entry_count,
833  const ResultSetStorage& that) const {
834  if (g_enable_dynamic_watchdog) {
835  check_watchdog_with_seed(that_entry_idx);
836  }
837  const auto key_count = query_mem_desc_.getGroupbyColCount();
842  const auto key_off =
843  key_offset_colwise(that_entry_idx, 0, that_entry_count);
844  if (isEmptyEntry(that_entry_idx, that_buff)) {
845  return;
846  }
847  auto this_buff_i64 = reinterpret_cast<int64_t*>(this_buff);
848  auto that_buff_i64 = reinterpret_cast<const int64_t*>(that_buff);
849  const auto key = make_key(&that_buff_i64[key_off], that_entry_count, key_count);
850  auto [this_entry_slots, empty_entry] = get_group_value_columnar_reduction(
851  this_buff_i64, query_mem_desc_.getEntryCount(), &key[0], key_count);
852  CHECK(this_entry_slots);
853  if (empty_entry) {
854  fill_slots(this_entry_slots,
855  query_mem_desc_.getEntryCount(),
856  that_buff_i64,
857  that_entry_idx,
858  that_entry_count,
859  query_mem_desc_);
860  return;
861  }
862  reduceOneEntrySlotsBaseline(
863  this_entry_slots, that_buff_i64, that_entry_idx, that_entry_count, that);
864 }
865 
866 void ResultSetStorage::reduceOneEntrySlotsBaseline(int64_t* this_entry_slots,
867  const int64_t* that_buff,
868  const size_t that_entry_idx,
869  const size_t that_entry_count,
870  const ResultSetStorage& that) const {
872  const auto key_count = query_mem_desc_.getGroupbyColCount();
873  size_t j = 0;
874  size_t init_agg_val_idx = 0;
875  for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
876  ++target_logical_idx) {
877  const auto& target_info = targets_[target_logical_idx];
878  const auto that_slot_off = slot_offset_colwise(
879  that_entry_idx, init_agg_val_idx, key_count, that_entry_count);
880  const auto this_slot_off = init_agg_val_idx * query_mem_desc_.getEntryCount();
881  reduceOneSlotBaseline(this_entry_slots,
882  this_slot_off,
883  that_buff,
884  that_entry_count,
885  that_slot_off,
886  target_info,
887  target_logical_idx,
888  j,
889  init_agg_val_idx,
890  that);
891  if (query_mem_desc_.targetGroupbyIndicesSize() == 0) {
892  init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
893  } else {
894  if (query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) < 0) {
895  init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
896  }
897  }
898  j = advance_slot(j, target_info, false);
899  }
900 }
901 
902 void ResultSetStorage::reduceOneSlotBaseline(int64_t* this_buff,
903  const size_t this_slot,
904  const int64_t* that_buff,
905  const size_t that_entry_count,
906  const size_t that_slot,
907  const TargetInfo& target_info,
908  const size_t target_logical_idx,
909  const size_t target_slot_idx,
910  const size_t init_agg_val_idx,
911  const ResultSetStorage& that) const {
913  int8_t* this_ptr2{nullptr};
914  const int8_t* that_ptr2{nullptr};
915  if (target_info.is_agg &&
916  (target_info.agg_kind == kAVG ||
917  (target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()))) {
918  const auto this_count_off = query_mem_desc_.getEntryCount();
919  const auto that_count_off = that_entry_count;
920  this_ptr2 = reinterpret_cast<int8_t*>(&this_buff[this_slot + this_count_off]);
921  that_ptr2 = reinterpret_cast<const int8_t*>(&that_buff[that_slot + that_count_off]);
922  }
923  reduceOneSlot(reinterpret_cast<int8_t*>(&this_buff[this_slot]),
924  this_ptr2,
925  reinterpret_cast<const int8_t*>(&that_buff[that_slot]),
926  that_ptr2,
927  target_info,
928  target_logical_idx,
929  target_slot_idx,
930  init_agg_val_idx,
931  that,
932  target_slot_idx, // dummy, for now
933  {});
934 }
935 
936 // During the reduction of two result sets using the baseline strategy, we first create a
937 // big enough buffer to hold the entries for both and we move the entries from the first
938 // into it before doing the reduction as usual (into the first buffer).
939 template <class KeyType>
940 void ResultSetStorage::moveEntriesToBuffer(int8_t* new_buff,
941  const size_t new_entry_count) const {
943  CHECK_GT(new_entry_count, query_mem_desc_.getEntryCount());
944  auto new_buff_i64 = reinterpret_cast<int64_t*>(new_buff);
945  const auto key_count = query_mem_desc_.getGroupbyColCount();
948  const auto src_buff = reinterpret_cast<const int64_t*>(buff_);
949  const auto row_qw_count = get_row_qw_count(query_mem_desc_);
950  const auto key_byte_width = query_mem_desc_.getEffectiveKeyWidth();
951 
952  if (use_multithreaded_reduction(query_mem_desc_.getEntryCount())) {
953  const size_t thread_count = cpu_threads();
954  std::vector<std::future<void>> move_threads;
955 
956  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
957  const auto thread_entry_count =
958  (query_mem_desc_.getEntryCount() + thread_count - 1) / thread_count;
959  const auto start_index = thread_idx * thread_entry_count;
960  const auto end_index =
961  std::min(start_index + thread_entry_count, query_mem_desc_.getEntryCount());
962  move_threads.emplace_back(std::async(
963  std::launch::async,
964  [this,
965  src_buff,
966  new_buff_i64,
967  new_entry_count,
968  start_index,
969  end_index,
970  key_count,
971  row_qw_count,
972  key_byte_width] {
973  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
974  moveOneEntryToBuffer<KeyType>(entry_idx,
975  new_buff_i64,
976  new_entry_count,
977  key_count,
978  row_qw_count,
979  src_buff,
980  key_byte_width);
981  }
982  }));
983  }
984  for (auto& move_thread : move_threads) {
985  move_thread.wait();
986  }
987  for (auto& move_thread : move_threads) {
988  move_thread.get();
989  }
990  } else {
991  for (size_t entry_idx = 0; entry_idx < query_mem_desc_.getEntryCount(); ++entry_idx) {
992  moveOneEntryToBuffer<KeyType>(entry_idx,
993  new_buff_i64,
994  new_entry_count,
995  key_count,
996  row_qw_count,
997  src_buff,
998  key_byte_width);
999  }
1000  }
1001 }
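// Worked example for the buffer move above (illustrative counts): reducing
// three baseline-hash result sets with 100, 60 and 40 entries first allocates
// a combined buffer of 200 entries (see ResultSetManager::reduce below), moves
// the 100 entries of the first storage into it with moveEntriesToBuffer, and
// then reduces the remaining two result sets into that larger buffer.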
1002 
1003 template <class KeyType>
1004 void ResultSetStorage::moveOneEntryToBuffer(const size_t entry_index,
1005  int64_t* new_buff_i64,
1006  const size_t new_entry_count,
1007  const size_t key_count,
1008  const size_t row_qw_count,
1009  const int64_t* src_buff,
1010  const size_t key_byte_width) const {
1011  const auto key_off =
1012  query_mem_desc_.didOutputColumnar()
1013  ? key_offset_colwise(entry_index, 0, query_mem_desc_.getEntryCount())
1014  : row_qw_count * entry_index;
1015  const auto key_ptr = reinterpret_cast<const KeyType*>(&src_buff[key_off]);
1016  if (*key_ptr == get_empty_key<KeyType>()) {
1017  return;
1018  }
1019  int64_t* new_entries_ptr{nullptr};
1020  if (query_mem_desc_.didOutputColumnar()) {
1021  const auto key =
1022  make_key(&src_buff[key_off], query_mem_desc_.getEntryCount(), key_count);
1023  new_entries_ptr =
1024  get_group_value_columnar(new_buff_i64, new_entry_count, &key[0], key_count);
1025  } else {
1026  new_entries_ptr = get_group_value(new_buff_i64,
1027  new_entry_count,
1028  &src_buff[key_off],
1029  key_count,
1030  key_byte_width,
1031  row_qw_count);
1032  }
1033  CHECK(new_entries_ptr);
1034  fill_slots(new_entries_ptr,
1035  new_entry_count,
1036  src_buff,
1037  entry_index,
1038  query_mem_desc_.getEntryCount(),
1039  query_mem_desc_);
1040 }
1041 
1042 void ResultSet::initializeStorage() const {
1043  if (query_mem_desc_.didOutputColumnar()) {
1044  storage_->initializeColWise();
1045  } else {
1046  storage_->initializeRowWise();
1047  }
1048 }
1049 
1050 // Driver for reductions. Needed because the result of a reduction on the baseline
1051 // layout, which can have collisions, cannot be done in place and something needs
1052 // to take the ownership of the new result set with the bigger underlying buffer.
1053 ResultSet* ResultSetManager::reduce(std::vector<ResultSet*>& result_sets,
1054  const size_t executor_id) {
1055  CHECK(!result_sets.empty());
1056  auto result_rs = result_sets.front();
1057  CHECK(result_rs->storage_);
1058  auto& first_result = *result_rs->storage_;
1059  auto result = &first_result;
1060  const auto row_set_mem_owner = result_rs->row_set_mem_owner_;
1061  for (const auto result_set : result_sets) {
1062  CHECK_EQ(row_set_mem_owner, result_set->row_set_mem_owner_);
1063  }
1064  if (first_result.query_mem_desc_.getQueryDescriptionType() ==
1065  QueryDescriptionType::GroupByBaselineHash) {
1066  const auto total_entry_count =
1067  std::accumulate(result_sets.begin(),
1068  result_sets.end(),
1069  size_t(0),
1070  [](const size_t init, const ResultSet* rs) {
1071  return init + rs->query_mem_desc_.getEntryCount();
1072  });
1073  CHECK(total_entry_count);
1074  auto query_mem_desc = first_result.query_mem_desc_;
1075  query_mem_desc.setEntryCount(total_entry_count);
1076  rs_.reset(new ResultSet(first_result.targets_,
1077  ExecutorDeviceType::CPU,
1078  query_mem_desc,
1079  row_set_mem_owner,
1080  0,
1081  0));
1082  auto result_storage = rs_->allocateStorage(first_result.target_init_vals_);
1083  rs_->initializeStorage();
1084  switch (query_mem_desc.getEffectiveKeyWidth()) {
1085  case 4:
1086  first_result.moveEntriesToBuffer<int32_t>(result_storage->getUnderlyingBuffer(),
1087  query_mem_desc.getEntryCount());
1088  break;
1089  case 8:
1090  first_result.moveEntriesToBuffer<int64_t>(result_storage->getUnderlyingBuffer(),
1091  query_mem_desc.getEntryCount());
1092  break;
1093  default:
1094  CHECK(false);
1095  }
1096  result = rs_->storage_.get();
1097  result_rs = rs_.get();
1098  }
1099 
1100  auto& serialized_varlen_buffer = result_sets.front()->serialized_varlen_buffer_;
1101  if (!serialized_varlen_buffer.empty()) {
1102  result->rewriteAggregateBufferOffsets(serialized_varlen_buffer.front());
1103  for (auto result_it = result_sets.begin() + 1; result_it != result_sets.end();
1104  ++result_it) {
1105  auto& result_serialized_varlen_buffer = (*result_it)->serialized_varlen_buffer_;
1106  CHECK_EQ(result_serialized_varlen_buffer.size(), size_t(1));
1107  serialized_varlen_buffer.emplace_back(
1108  std::move(result_serialized_varlen_buffer.front()));
1109  }
1110  }
1111 
1112  ResultSetReductionJIT reduction_jit(result_rs->getQueryMemDesc(),
1113  result_rs->getTargetInfos(),
1114  result_rs->getTargetInitVals(),
1115  executor_id);
1116  auto reduction_code = reduction_jit.codegen();
1117  size_t ctr = 1;
1118  for (auto result_it = result_sets.begin() + 1; result_it != result_sets.end();
1119  ++result_it) {
1120  if (!serialized_varlen_buffer.empty()) {
1121  result->reduce(*((*result_it)->storage_),
1122  serialized_varlen_buffer[ctr++],
1123  reduction_code,
1124  executor_id);
1125  } else {
1126  result->reduce(*((*result_it)->storage_), {}, reduction_code, executor_id);
1127  }
1128  }
1129  return result_rs;
1130 }
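// A minimal usage sketch of the driver above (the variable names and the
// executor id are illustrative assumptions):
//
//   std::vector<ResultSet*> partial_results = ...;  // one per device / step
//   ResultSetManager rs_manager;
//   ResultSet* reduced = rs_manager.reduce(partial_results, executor_id);
//   // When the baseline-hash path allocates a new, larger ResultSet, the
//   // manager owns it; keep it alive while `reduced` is in use.
//   std::shared_ptr<ResultSet> owned = rs_manager.getOwnResultSet();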
1131 
1132 std::shared_ptr<ResultSet> ResultSetManager::getOwnResultSet() {
1133  return rs_;
1134 }
1135 
1136 void ResultSetManager::rewriteVarlenAggregates(ResultSet* result_rs) {
1137  auto& result_storage = result_rs->storage_;
1138  result_storage->rewriteAggregateBufferOffsets(
1139  result_rs->serialized_varlen_buffer_.front());
1140 }
1141 
1142 void ResultSetStorage::fillOneEntryRowWise(const std::vector<int64_t>& entry) {
1143  const auto slot_count = query_mem_desc_.getBufferColSlotCount();
1144  const auto key_count = query_mem_desc_.getGroupbyColCount();
1145  CHECK_EQ(slot_count + key_count, entry.size());
1146  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1148  CHECK_EQ(size_t(1), query_mem_desc_.getEntryCount());
1149  const auto key_off = key_offset_rowwise(0, key_count, slot_count);
1150  CHECK_EQ(query_mem_desc_.getEffectiveKeyWidth(), sizeof(int64_t));
1151  for (size_t i = 0; i < key_count; ++i) {
1152  this_buff[key_off + i] = entry[i];
1153  }
1154  const auto first_slot_off = slot_offset_rowwise(0, 0, key_count, slot_count);
1155  for (size_t i = 0; i < target_init_vals_.size(); ++i) {
1156  this_buff[first_slot_off + i] = entry[key_count + i];
1157  }
1158 }
1159 
1160 void ResultSetStorage::initializeRowWise() const {
1161  const auto key_count = query_mem_desc_.getGroupbyColCount();
1162  const auto row_size = get_row_bytes(query_mem_desc_);
1163  CHECK_EQ(row_size % 8, 0u);
1164  const auto key_bytes_with_padding =
1165  align_to_int64(get_key_bytes_rowwise(query_mem_desc_));
1167  switch (query_mem_desc_.getEffectiveKeyWidth()) {
1168  case 4: {
1169  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1170  auto row_ptr = buff_ + i * row_size;
1171  fill_empty_key_32(reinterpret_cast<int32_t*>(row_ptr), key_count);
1172  auto slot_ptr = reinterpret_cast<int64_t*>(row_ptr + key_bytes_with_padding);
1173  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1174  slot_ptr[j] = target_init_vals_[j];
1175  }
1176  }
1177  break;
1178  }
1179  case 8: {
1180  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1181  auto row_ptr = buff_ + i * row_size;
1182  fill_empty_key_64(reinterpret_cast<int64_t*>(row_ptr), key_count);
1183  auto slot_ptr = reinterpret_cast<int64_t*>(row_ptr + key_bytes_with_padding);
1184  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1185  slot_ptr[j] = target_init_vals_[j];
1186  }
1187  }
1188  break;
1189  }
1190  default:
1191  CHECK(false);
1192  }
1193 }
1194 
1195 void ResultSetStorage::fillOneEntryColWise(const std::vector<int64_t>& entry) {
1197  CHECK_EQ(size_t(1), query_mem_desc_.getEntryCount());
1198  const auto slot_count = query_mem_desc_.getBufferColSlotCount();
1199  const auto key_count = query_mem_desc_.getGroupbyColCount();
1200  CHECK_EQ(slot_count + key_count, entry.size());
1201  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1202 
1203  for (size_t i = 0; i < key_count; i++) {
1204  const auto key_offset = key_offset_colwise(0, i, 1);
1205  this_buff[key_offset] = entry[i];
1206  }
1207 
1208  for (size_t i = 0; i < target_init_vals_.size(); i++) {
1209  const auto slot_offset = slot_offset_colwise(0, i, key_count, 1);
1210  this_buff[slot_offset] = entry[key_count + i];
1211  }
1212 }
1213 
1214 void ResultSetStorage::initializeColWise() const {
1215  const auto key_count = query_mem_desc_.getGroupbyColCount();
1216  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1218  for (size_t key_idx = 0; key_idx < key_count; ++key_idx) {
1219  const auto first_key_off =
1220  key_offset_colwise(0, key_idx, query_mem_desc_.getEntryCount());
1221  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1222  this_buff[first_key_off + i] = EMPTY_KEY_64;
1223  }
1224  }
1225  for (size_t target_idx = 0; target_idx < target_init_vals_.size(); ++target_idx) {
1226  const auto first_val_off =
1227  slot_offset_colwise(0, target_idx, key_count, query_mem_desc_.getEntryCount());
1228  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1229  this_buff[first_val_off + i] = target_init_vals_[target_idx];
1230  }
1231  }
1232 }
1233 
1234 void ResultSetStorage::initializeBaselineValueSlots(int64_t* entry_slots) const {
1235  CHECK(entry_slots);
1236  if (query_mem_desc_.didOutputColumnar()) {
1237  size_t slot_off = 0;
1238  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1239  entry_slots[slot_off] = target_init_vals_[j];
1240  slot_off += query_mem_desc_.getEntryCount();
1241  }
1242  } else {
1243  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1244  entry_slots[j] = target_init_vals_[j];
1245  }
1246  }
1247 }
1248 
1249 #define AGGREGATE_ONE_VALUE( \
1250  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__) \
1251  do { \
1252  const auto sql_type = get_compact_type(agg_info__); \
1253  if (sql_type.is_fp()) { \
1254  if (chosen_bytes__ == sizeof(float)) { \
1255  agg_##agg_kind__##_float(reinterpret_cast<int32_t*>(val_ptr__), \
1256  *reinterpret_cast<const float*>(other_ptr__)); \
1257  } else { \
1258  agg_##agg_kind__##_double(reinterpret_cast<int64_t*>(val_ptr__), \
1259  *reinterpret_cast<const double*>(other_ptr__)); \
1260  } \
1261  } else { \
1262  if (chosen_bytes__ == sizeof(int32_t)) { \
1263  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1264  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1265  agg_##agg_kind__##_int32(val_ptr, *other_ptr); \
1266  } else { \
1267  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1268  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1269  agg_##agg_kind__(val_ptr, *other_ptr); \
1270  } \
1271  } \
1272  } while (0)
1273 
1274 #define AGGREGATE_ONE_NULLABLE_VALUE( \
1275  agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1276  do { \
1277  if (agg_info__.skip_null_val) { \
1278  const auto sql_type = get_compact_type(agg_info__); \
1279  if (sql_type.is_fp()) { \
1280  if (chosen_bytes__ == sizeof(float)) { \
1281  agg_##agg_kind__##_float_skip_val( \
1282  reinterpret_cast<int32_t*>(val_ptr__), \
1283  *reinterpret_cast<const float*>(other_ptr__), \
1284  *reinterpret_cast<const float*>(may_alias_ptr(&init_val__))); \
1285  } else { \
1286  agg_##agg_kind__##_double_skip_val( \
1287  reinterpret_cast<int64_t*>(val_ptr__), \
1288  *reinterpret_cast<const double*>(other_ptr__), \
1289  *reinterpret_cast<const double*>(may_alias_ptr(&init_val__))); \
1290  } \
1291  } else { \
1292  if (chosen_bytes__ == sizeof(int32_t)) { \
1293  int32_t* val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1294  const int32_t* other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1295  const auto null_val = static_cast<int32_t>(init_val__); \
1296  agg_##agg_kind__##_int32_skip_val(val_ptr, *other_ptr, null_val); \
1297  } else { \
1298  int64_t* val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1299  const int64_t* other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1300  const auto null_val = static_cast<int64_t>(init_val__); \
1301  agg_##agg_kind__##_skip_val(val_ptr, *other_ptr, null_val); \
1302  } \
1303  } \
1304  } else { \
1305  AGGREGATE_ONE_VALUE( \
1306  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__); \
1307  } \
1308  } while (0)
1309 
1310 #define AGGREGATE_ONE_COUNT(val_ptr__, other_ptr__, chosen_bytes__) \
1311  do { \
1312  if (chosen_bytes__ == sizeof(int32_t)) { \
1313  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1314  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1315  agg_sum_int32(val_ptr, *other_ptr); \
1316  } else { \
1317  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1318  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1319  agg_sum(val_ptr, *other_ptr); \
1320  } \
1321  } while (0)
1322 
1323 #define AGGREGATE_ONE_NULLABLE_COUNT( \
1324  val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1325  { \
1326  if (agg_info__.skip_null_val) { \
1327  const auto sql_type = get_compact_type(agg_info__); \
1328  if (sql_type.is_fp()) { \
1329  if (chosen_bytes__ == sizeof(float)) { \
1330  agg_sum_float_skip_val( \
1331  reinterpret_cast<int32_t*>(val_ptr__), \
1332  *reinterpret_cast<const float*>(other_ptr__), \
1333  *reinterpret_cast<const float*>(may_alias_ptr(&init_val__))); \
1334  } else { \
1335  agg_sum_double_skip_val( \
1336  reinterpret_cast<int64_t*>(val_ptr__), \
1337  *reinterpret_cast<const double*>(other_ptr__), \
1338  *reinterpret_cast<const double*>(may_alias_ptr(&init_val__))); \
1339  } \
1340  } else { \
1341  if (chosen_bytes__ == sizeof(int32_t)) { \
1342  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1343  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1344  const auto null_val = static_cast<int32_t>(init_val__); \
1345  agg_sum_int32_skip_val(val_ptr, *other_ptr, null_val); \
1346  } else { \
1347  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1348  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1349  const auto null_val = static_cast<int64_t>(init_val__); \
1350  agg_sum_skip_val(val_ptr, *other_ptr, null_val); \
1351  } \
1352  } \
1353  } else { \
1354  AGGREGATE_ONE_COUNT(val_ptr__, other_ptr__, chosen_bytes__); \
1355  } \
1356  }
1357 
1358 // to be used for 8/16-bit kMIN and kMAX only
1359 #define AGGREGATE_ONE_VALUE_SMALL( \
1360  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__) \
1361  do { \
1362  if (chosen_bytes__ == sizeof(int16_t)) { \
1363  auto val_ptr = reinterpret_cast<int16_t*>(val_ptr__); \
1364  auto other_ptr = reinterpret_cast<const int16_t*>(other_ptr__); \
1365  agg_##agg_kind__##_int16(val_ptr, *other_ptr); \
1366  } else if (chosen_bytes__ == sizeof(int8_t)) { \
1367  auto val_ptr = reinterpret_cast<int8_t*>(val_ptr__); \
1368  auto other_ptr = reinterpret_cast<const int8_t*>(other_ptr__); \
1369  agg_##agg_kind__##_int8(val_ptr, *other_ptr); \
1370  } else { \
1371  UNREACHABLE(); \
1372  } \
1373  } while (0)
1374 
1375 // to be used for 8/16-bit kMIN and kMAX only
1376 #define AGGREGATE_ONE_NULLABLE_VALUE_SMALL( \
1377  agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1378  do { \
1379  if (agg_info__.skip_null_val) { \
1380  if (chosen_bytes__ == sizeof(int16_t)) { \
1381  int16_t* val_ptr = reinterpret_cast<int16_t*>(val_ptr__); \
1382  const int16_t* other_ptr = reinterpret_cast<const int16_t*>(other_ptr__); \
1383  const auto null_val = static_cast<int16_t>(init_val__); \
1384  agg_##agg_kind__##_int16_skip_val(val_ptr, *other_ptr, null_val); \
1385  } else if (chosen_bytes__ == sizeof(int8_t)) { \
1386  int8_t* val_ptr = reinterpret_cast<int8_t*>(val_ptr__); \
1387  const int8_t* other_ptr = reinterpret_cast<const int8_t*>(other_ptr__); \
1388  const auto null_val = static_cast<int8_t>(init_val__); \
1389  agg_##agg_kind__##_int8_skip_val(val_ptr, *other_ptr, null_val); \
1390  } \
1391  } else { \
1392  AGGREGATE_ONE_VALUE_SMALL( \
1393  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__); \
1394  } \
1395  } while (0)
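// Worked example of the macro family above (assuming a nullable 8-byte integer
// slot, i.e. skip_null_val is set, the type is not floating point and
// chosen_bytes == 8): AGGREGATE_ONE_NULLABLE_VALUE(max, this_ptr1, that_ptr1,
// init_val, chosen_bytes, target_info) expands to the equivalent of
//   agg_max_skip_val(reinterpret_cast<int64_t*>(this_ptr1),
//                    *reinterpret_cast<const int64_t*>(that_ptr1),
//                    static_cast<int64_t>(init_val));
// so the right-hand value is folded into the accumulator unless it equals the
// sentinel null value taken from init_val.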
1396 
1397 int8_t result_set::get_width_for_slot(const size_t target_slot_idx,
1398  const bool float_argument_input,
1399  const QueryMemoryDescriptor& query_mem_desc) {
1400  if (float_argument_input) {
1401  return sizeof(float);
1402  }
1403  return query_mem_desc.getPaddedSlotWidthBytes(target_slot_idx);
1404 }
1405 
1406 void ResultSetStorage::reduceOneSlotSingleValue(int8_t* this_ptr1,
1407  const TargetInfo& target_info,
1408  const size_t target_slot_idx,
1409  const size_t init_agg_val_idx,
1410  const int8_t* that_ptr1) const {
1411  const bool float_argument_input = takes_float_argument(target_info);
1412  const auto chosen_bytes = result_set::get_width_for_slot(
1413  target_slot_idx, float_argument_input, query_mem_desc_);
1414  auto init_val = target_init_vals_[init_agg_val_idx];
1415 
1416  auto reduce = [&](auto const& size_tag) {
1417  using CastTarget = std::decay_t<decltype(size_tag)>;
1418  const auto lhs_proj_col = *reinterpret_cast<const CastTarget*>(this_ptr1);
1419  const auto rhs_proj_col = *reinterpret_cast<const CastTarget*>(that_ptr1);
1420  if (rhs_proj_col == init_val) {
1421  // ignore
1422  } else if (lhs_proj_col == init_val) {
1423  *reinterpret_cast<CastTarget*>(this_ptr1) = rhs_proj_col;
1424  } else if (lhs_proj_col != rhs_proj_col) {
1425  throw std::runtime_error("Multiple distinct values encountered");
1426  }
1427  };
1428 
1429  switch (chosen_bytes) {
1430  case 1: {
1432  reduce(int8_t());
1433  break;
1434  }
1435  case 2: {
1437  reduce(int16_t());
1438  break;
1439  }
1440  case 4: {
1441  reduce(int32_t());
1442  break;
1443  }
1444  case 8: {
1445  CHECK(!target_info.sql_type.is_varlen());
1446  reduce(int64_t());
1447  break;
1448  }
1449  default:
1450  LOG(FATAL) << "Invalid slot width: " << chosen_bytes;
1451  }
1452 }
1453 
1454 void ResultSetStorage::reduceOneSlot(
1455  int8_t* this_ptr1,
1456  int8_t* this_ptr2,
1457  const int8_t* that_ptr1,
1458  const int8_t* that_ptr2,
1459  const TargetInfo& target_info,
1460  const size_t target_logical_idx,
1461  const size_t target_slot_idx,
1462  const size_t init_agg_val_idx,
1463  const ResultSetStorage& that,
1464  const size_t first_slot_idx_for_target,
1465  const std::vector<std::string>& serialized_varlen_buffer) const {
1466  if (query_mem_desc_.targetGroupbyIndicesSize() > 0) {
1467  if (query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) >= 0) {
1468  return;
1469  }
1470  }
1471  CHECK_LT(init_agg_val_idx, target_init_vals_.size());
1472  const bool float_argument_input = takes_float_argument(target_info);
1473  const auto chosen_bytes = result_set::get_width_for_slot(
1474  target_slot_idx, float_argument_input, query_mem_desc_);
1475  int64_t init_val = target_init_vals_[init_agg_val_idx]; // skip_val for nullable types
1476 
1477  if (target_info.is_agg && target_info.agg_kind == kSINGLE_VALUE) {
1478  reduceOneSlotSingleValue(
1479  this_ptr1, target_info, target_logical_idx, init_agg_val_idx, that_ptr1);
1480  } else if (target_info.is_agg && target_info.agg_kind != kSAMPLE) {
1481  switch (target_info.agg_kind) {
1482  case kCOUNT:
1483  case kCOUNT_IF:
1484  case kAPPROX_COUNT_DISTINCT: {
1485  if (is_distinct_target(target_info)) {
1486  CHECK_EQ(static_cast<size_t>(chosen_bytes), sizeof(int64_t));
1487  reduceOneCountDistinctSlot(this_ptr1, that_ptr1, target_logical_idx, that);
1488  break;
1489  }
1490  CHECK_EQ(int64_t(0), init_val);
1491  AGGREGATE_ONE_COUNT(this_ptr1, that_ptr1, chosen_bytes);
1492  break;
1493  }
1494  case kAVG: {
1495  // Ignore float argument compaction for count component for fear of its overflow
1496  AGGREGATE_ONE_COUNT(this_ptr2,
1497  that_ptr2,
1498  query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx));
1499  }
1500  // fall thru
1501  case kSUM:
1502  case kSUM_IF: {
1503  AGGREGATE_ONE_NULLABLE_VALUE(
1504  sum, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1505  break;
1506  }
1507  case kMIN: {
1508  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1509  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1510  min, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1511  } else {
1512  AGGREGATE_ONE_NULLABLE_VALUE(
1513  min, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1514  }
1515  break;
1516  }
1517  case kMAX: {
1518  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1519  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1520  max, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1521  } else {
1522  AGGREGATE_ONE_NULLABLE_VALUE(
1523  max, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1524  }
1525  break;
1526  }
1527  case kAPPROX_QUANTILE:
1528  CHECK_EQ(static_cast<int8_t>(sizeof(int64_t)), chosen_bytes);
1529  reduceOneApproxQuantileSlot(this_ptr1, that_ptr1, target_logical_idx, that);
1530  break;
1531  default:
1532  UNREACHABLE() << toString(target_info.agg_kind);
1533  }
1534  } else {
1535  switch (chosen_bytes) {
1536  case 1: {
1538  const auto rhs_proj_col = *reinterpret_cast<const int8_t*>(that_ptr1);
1539  if (rhs_proj_col != init_val) {
1540  *reinterpret_cast<int8_t*>(this_ptr1) = rhs_proj_col;
1541  }
1542  break;
1543  }
1544  case 2: {
1546  const auto rhs_proj_col = *reinterpret_cast<const int16_t*>(that_ptr1);
1547  if (rhs_proj_col != init_val) {
1548  *reinterpret_cast<int16_t*>(this_ptr1) = rhs_proj_col;
1549  }
1550  break;
1551  }
1552  case 4: {
1553  CHECK(target_info.agg_kind != kSAMPLE ||
1555  const auto rhs_proj_col = *reinterpret_cast<const int32_t*>(that_ptr1);
1556  if (rhs_proj_col != init_val) {
1557  *reinterpret_cast<int32_t*>(this_ptr1) = rhs_proj_col;
1558  }
1559  break;
1560  }
1561  case 8: {
1562  auto rhs_proj_col = *reinterpret_cast<const int64_t*>(that_ptr1);
1563  if ((target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()) &&
1564  !serialized_varlen_buffer.empty()) {
1565  size_t length_to_elems{0};
1566  if (target_info.sql_type.is_geometry()) {
1567  // TODO: Assumes hard-coded sizes for geometry targets
1568  length_to_elems = target_slot_idx == first_slot_idx_for_target ? 1 : 4;
1569  } else {
1570  const auto& elem_ti = target_info.sql_type.get_elem_type();
1571  length_to_elems = target_info.sql_type.is_string() ? 1 : elem_ti.get_size();
1572  }
1573 
1574  CHECK_LT(static_cast<size_t>(rhs_proj_col), serialized_varlen_buffer.size());
1575  const auto& varlen_bytes_str = serialized_varlen_buffer[rhs_proj_col];
1576  const auto str_ptr = reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
1577  *reinterpret_cast<int64_t*>(this_ptr1) =
1578  reinterpret_cast<const int64_t>(str_ptr);
1579  *reinterpret_cast<int64_t*>(this_ptr2) =
1580  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
1581  } else {
1582  if (rhs_proj_col != init_val) {
1583  *reinterpret_cast<int64_t*>(this_ptr1) = rhs_proj_col;
1584  }
1585  if ((target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen())) {
1586  CHECK(this_ptr2 && that_ptr2);
1587  *reinterpret_cast<int64_t*>(this_ptr2) =
1588  *reinterpret_cast<const int64_t*>(that_ptr2);
1589  }
1590  }
1591 
1592  break;
1593  }
1594  default:
1595  LOG(FATAL) << "Invalid slot width: " << chosen_bytes;
1596  }
1597  }
1598 }
1599 
1600 void ResultSetStorage::reduceOneApproxQuantileSlot(int8_t* this_ptr1,
1601  const int8_t* that_ptr1,
1602  const size_t target_logical_idx,
1603  const ResultSetStorage& that) const {
1605  static_assert(sizeof(int64_t) == sizeof(quantile::TDigest*));
1606  auto* incoming = *reinterpret_cast<quantile::TDigest* const*>(that_ptr1);
1607  CHECK(incoming) << "this_ptr1=" << (void*)this_ptr1
1608  << ", that_ptr1=" << (void const*)that_ptr1
1609  << ", target_logical_idx=" << target_logical_idx;
1610  if (incoming->centroids().capacity()) {
1611  auto* accumulator = *reinterpret_cast<quantile::TDigest**>(this_ptr1);
1612  CHECK(accumulator) << "this_ptr1=" << (void*)this_ptr1
1613  << ", that_ptr1=" << (void const*)that_ptr1
1614  << ", target_logical_idx=" << target_logical_idx;
1615  accumulator->allocate();
1616  accumulator->mergeTDigest(*incoming);
1617  }
1618 }
1619 
1620 void ResultSetStorage::reduceOneCountDistinctSlot(int8_t* this_ptr1,
1621  const int8_t* that_ptr1,
1622  const size_t target_logical_idx,
1623  const ResultSetStorage& that) const {
1625  const auto& old_count_distinct_desc =
1626  query_mem_desc_.getCountDistinctDescriptor(target_logical_idx);
1627  CHECK(old_count_distinct_desc.impl_type_ != CountDistinctImplType::Invalid);
1628  const auto& new_count_distinct_desc =
1629  that.query_mem_desc_.getCountDistinctDescriptor(target_logical_idx);
1630  CHECK(old_count_distinct_desc.impl_type_ == new_count_distinct_desc.impl_type_);
1631  CHECK(this_ptr1 && that_ptr1);
1632  auto old_set_ptr = reinterpret_cast<const int64_t*>(this_ptr1);
1633  auto new_set_ptr = reinterpret_cast<const int64_t*>(that_ptr1);
1634  count_distinct_set_union(
1635  *new_set_ptr, *old_set_ptr, new_count_distinct_desc, old_count_distinct_desc);
1636 }
1637 
1638 bool ResultSetStorage::reduceSingleRow(const int8_t* row_ptr,
1639  const int8_t warp_count,
1640  const bool is_columnar,
1641  const bool replace_bitmap_ptr_with_bitmap_sz,
1642  std::vector<int64_t>& agg_vals,
1643  const QueryMemoryDescriptor& query_mem_desc,
1644  const std::vector<TargetInfo>& targets,
1645  const std::vector<int64_t>& agg_init_vals) {
1646  const size_t agg_col_count{agg_vals.size()};
1647  const auto row_size = query_mem_desc.getRowSize();
1648  CHECK_EQ(agg_col_count, query_mem_desc.getSlotCount());
1649  CHECK_GE(agg_col_count, targets.size());
1650  CHECK_EQ(is_columnar, query_mem_desc.didOutputColumnar());
1651  CHECK(query_mem_desc.hasKeylessHash());
1652  std::vector<int64_t> partial_agg_vals(agg_col_count, 0);
1653  bool discard_row = true;
1654  for (int8_t warp_idx = 0; warp_idx < warp_count; ++warp_idx) {
1655  bool discard_partial_result = true;
1656  for (size_t target_idx = 0, agg_col_idx = 0;
1657  target_idx < targets.size() && agg_col_idx < agg_col_count;
1658  ++target_idx, ++agg_col_idx) {
1659  const auto& agg_info = targets[target_idx];
1660  const bool float_argument_input = takes_float_argument(agg_info);
1661  const auto chosen_bytes = float_argument_input
1662  ? sizeof(float)
1663  : query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx);
1664  auto partial_bin_val = get_component(
1665  row_ptr + query_mem_desc.getColOnlyOffInBytes(agg_col_idx), chosen_bytes);
1666  partial_agg_vals[agg_col_idx] = partial_bin_val;
1667  if (is_distinct_target(agg_info)) {
1668  CHECK_EQ(int8_t(1), warp_count);
1669  CHECK(agg_info.is_agg &&
1670  (agg_info.agg_kind == kCOUNT || agg_info.agg_kind == kCOUNT_IF ||
1671  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT));
1672  partial_bin_val = count_distinct_set_size(
1673  partial_bin_val, query_mem_desc.getCountDistinctDescriptor(target_idx));
1674  if (replace_bitmap_ptr_with_bitmap_sz) {
1675  partial_agg_vals[agg_col_idx] = partial_bin_val;
1676  }
1677  }
1678  if (kAVG == agg_info.agg_kind) {
1679  CHECK(agg_info.is_agg && !agg_info.is_distinct);
1680  ++agg_col_idx;
1681  partial_bin_val = partial_agg_vals[agg_col_idx] =
1682  get_component(row_ptr + query_mem_desc.getColOnlyOffInBytes(agg_col_idx),
1683  query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx));
1684  }
1685  if (agg_col_idx == static_cast<size_t>(query_mem_desc.getTargetIdxForKey()) &&
1686  partial_bin_val != agg_init_vals[query_mem_desc.getTargetIdxForKey()]) {
1687  CHECK(agg_info.is_agg);
1688  discard_partial_result = false;
1689  }
1690  }
1691  row_ptr += row_size;
1692  if (discard_partial_result) {
1693  continue;
1694  }
1695  discard_row = false;
1696  for (size_t target_idx = 0, agg_col_idx = 0;
1697  target_idx < targets.size() && agg_col_idx < agg_col_count;
1698  ++target_idx, ++agg_col_idx) {
1699  auto partial_bin_val = partial_agg_vals[agg_col_idx];
1700  const auto& agg_info = targets[target_idx];
1701  const bool float_argument_input = takes_float_argument(agg_info);
1702  const auto chosen_bytes = float_argument_input
1703  ? sizeof(float)
1704  : query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx);
1705  const auto& chosen_type = get_compact_type(agg_info);
1706  if (agg_info.is_agg && agg_info.agg_kind != kSAMPLE) {
1707  try {
1708  switch (agg_info.agg_kind) {
1709  case kCOUNT:
1710  case kCOUNT_IF:
1711  case kAPPROX_COUNT_DISTINCT:
1712  AGGREGATE_ONE_NULLABLE_COUNT(
1713  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1714  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1715  agg_init_vals[agg_col_idx],
1716  chosen_bytes,
1717  agg_info);
1718  break;
1719  case kAVG:
1720  // Ignore float argument compaction for count component for fear of its
1721  // overflow
1722  AGGREGATE_ONE_COUNT(
1723  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx + 1]),
1724  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx + 1]),
1725  query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx));
1726  // fall thru
1727  case kSUM:
1728  case kSUM_IF:
1729  AGGREGATE_ONE_NULLABLE_VALUE(
1730  sum,
1731  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1732  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1733  agg_init_vals[agg_col_idx],
1734  chosen_bytes,
1735  agg_info);
1736  break;
1737  case kMIN:
1738  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1739  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1740  min,
1741  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1742  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1743  agg_init_vals[agg_col_idx],
1744  chosen_bytes,
1745  agg_info);
1746  } else {
1747  AGGREGATE_ONE_NULLABLE_VALUE(
1748  min,
1749  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1750  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1751  agg_init_vals[agg_col_idx],
1752  chosen_bytes,
1753  agg_info);
1754  }
1755  break;
1756  case kMAX:
1757  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1758  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1759  max,
1760  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1761  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1762  agg_init_vals[agg_col_idx],
1763  chosen_bytes,
1764  agg_info);
1765  } else {
1766  AGGREGATE_ONE_NULLABLE_VALUE(
1767  max,
1768  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1769  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1770  agg_init_vals[agg_col_idx],
1771  chosen_bytes,
1772  agg_info);
1773  }
1774  break;
1775  default:
1776  CHECK(false);
1777  break;
1778  }
1779  } catch (std::runtime_error& e) {
1780  // TODO(miyu): handle the case where chosen_bytes < 8
1781  LOG(ERROR) << e.what();
1782  }
1783  if (chosen_type.is_integer() || chosen_type.is_decimal()) {
1784  switch (chosen_bytes) {
1785  case 8:
1786  break;
1787  case 4: {
1788  int32_t ret = *reinterpret_cast<const int32_t*>(&agg_vals[agg_col_idx]);
1789  if (!(shared::is_any<kCOUNT, kCOUNT_IF>(agg_info.agg_kind) &&
1790  ret != agg_init_vals[agg_col_idx])) {
1791  agg_vals[agg_col_idx] = static_cast<int64_t>(ret);
1792  }
1793  break;
1794  }
1795  default:
1796  CHECK(false);
1797  }
1798  }
1799  if (kAVG == agg_info.agg_kind) {
1800  ++agg_col_idx;
1801  }
1802  } else {
1803  if (agg_info.agg_kind == kSAMPLE) {
1804  CHECK(!agg_info.sql_type.is_varlen())
1805  << "Interleaved bins reduction not supported for variable length "
1806  "arguments "
1807  "to SAMPLE";
1808  }
1809  if (agg_vals[agg_col_idx]) {
1810  if (agg_info.agg_kind == kSAMPLE) {
1811  continue;
1812  }
1813  CHECK_EQ(agg_vals[agg_col_idx], partial_bin_val);
1814  } else {
1815  agg_vals[agg_col_idx] = partial_bin_val;
1816  }
1817  }
1818  }
1819  }
1820  return discard_row;
1821 }