38 #include <tbb/parallel_for.h>
45 #include <cuda_runtime.h>
46 #include <thrust/scan.h>
57 const int64_t min_inner_elem,
58 const int64_t min_outer_elem,
59 const int64_t max_outer_elem,
60 const int32_t* inner_to_outer_translation_map) {
61 const auto outer_id = inner_to_outer_translation_map[inner_elem - min_inner_elem];
62 if (outer_id > max_outer_elem || outer_id < min_outer_elem) {
72 const int64_t hash_entry_count,
73 const int32_t invalid_slot_val,
74 const int32_t cpu_thread_idx,
75 const int32_t cpu_thread_count) {
77 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
78 int32_t step = blockDim.x * gridDim.x;
80 int32_t start = cpu_thread_idx;
81 int32_t step = cpu_thread_count;
83 for (int64_t i = start; i < hash_entry_count; i += step) {
84 groups_buffer[i] = invalid_slot_val;
91 void SUFFIX(init_hash_join_buff_tbb)(int32_t* groups_buffer,
92 const int64_t hash_entry_count,
93 const int32_t invalid_slot_val) {
95 [=](const tbb::blocked_range<int64_t>& r) {
96 const auto start_idx = r.begin();
97 const auto end_idx = r.end();
98 for (auto entry_idx = start_idx; entry_idx != end_idx;
100 groups_buffer[entry_idx] = invalid_slot_val;
105 #endif // #ifdef HAVE_TBB
106 #endif // #ifndef __CUDACC__
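// Illustrative sketch (not part of the original source): init_hash_join_buff uses the
// same start/step striding on CPU and GPU, so a host-side caller can fan the
// initialization out over worker threads. The thread count and std::thread usage
// below are assumptions for the example, not this file's API.
//
//   const int32_t thread_count = 4;
//   std::vector<std::thread> workers;
//   for (int32_t t = 0; t < thread_count; ++t) {
//     workers.emplace_back([=] {
//       init_hash_join_buff(buff, hash_entry_count, invalid_slot_val, t, thread_count);
//     });
//   }
//   for (auto& w : workers) {
//     w.join();
//   }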
109 #define mapd_cas(address, compare, val) atomicCAS(address, compare, val)
110 #elif defined(_MSC_VER)
111 #define mapd_cas(address, compare, val) \
112 InterlockedCompareExchange(reinterpret_cast<volatile long*>(address), \
113 static_cast<long>(val), \
114 static_cast<long>(compare))
116 #define mapd_cas(address, compare, val) __sync_val_compare_and_swap(address, compare, val)
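// All three mapd_cas definitions implement the same contract: store `val` into
// `*address` only if it currently equals `compare`, and return the value observed
// before the attempt. A hedged sketch of how the one-to-one fill paths rely on it
// (illustrative only):
//
//   // claim a slot; succeeds iff the slot still holds invalid_slot_val
//   if (mapd_cas(entry_ptr, invalid_slot_val, static_cast<int32_t>(index)) !=
//       invalid_slot_val) {
//     // slot already taken by another row -> duplicate key for a one-to-one table
//   }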
119 template <typename HASHTABLE_FILLING_FUNC>
121 const int32_t cpu_thread_idx,
122 const int32_t cpu_thread_count,
123 HASHTABLE_FILLING_FUNC filling_func) {
125 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
126 int32_t step = blockDim.x * gridDim.x;
128 int32_t start = cpu_thread_idx;
129 int32_t step = cpu_thread_count;
134 for (auto item : col.slice(start, step)) {
135 const size_t index = item.index;
136 int64_t elem = item.element;
137 if (elem == type_info.null_val) {
138 if (type_info.uses_bw_eq) {
139 elem = type_info.translated_null_val;
147 if (sd_inner_to_outer_translation_map &&
148 (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
153 sd_inner_to_outer_translation_map);
160 if (filling_func(elem, index)) {
169 int32_t const cpu_thread_idx,
170 int32_t const cpu_thread_count) {
173 auto hashtable_filling_func = [&](auto elem, size_t index) {
177 args.type_info.min_val / args.bucket_normalization,
178 args.type_info.translated_null_val,
179 args.bucket_normalization);
180 return filling_func(index, entry_ptr, args.invalid_slot_val);
184 args, cpu_thread_idx, cpu_thread_count, hashtable_filling_func);
189 int32_t const cpu_thread_idx,
190 int32_t const cpu_thread_count) {
193 auto hashtable_filling_func = [&](auto elem, size_t index) {
195 args.buff, elem, args.type_info.min_val, args.type_info.translated_null_val);
196 return filling_func(index, entry_ptr, args.invalid_slot_val);
200 args, cpu_thread_idx, cpu_thread_count, hashtable_filling_func);
205 const int32_t cpu_thread_idx,
206 const int32_t cpu_thread_count) {
209 auto hashtable_filling_func = [&](auto elem, size_t index) {
211 return filling_func(index, entry_ptr, args.invalid_slot_val);
215 args, cpu_thread_idx, cpu_thread_count, hashtable_filling_func);
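// The three fill_hash_join_buff variants above differ only in how the target slot
// is located (bucketized, bitwise-eq, or plain get_hash_slot); each hands
// fill_hash_join_buff_impl a lambda of the shape (elem, index) -> int that returns
// nonzero when the chosen one-to-one slot was already occupied. A hedged sketch of
// an equivalent custom filler (illustrative only):
//
//   auto custom_filling_func = [&](auto elem, size_t index) {
//     auto* entry_ptr = SUFFIX(get_hash_slot)(args.buff, elem, args.type_info.min_val);
//     return filling_func(index, entry_ptr, args.invalid_slot_val);  // 0 on success
//   };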
218 template <typename HASHTABLE_FILLING_FUNC>
224 const int32_t* sd_inner_to_outer_translation_map,
225 const int32_t min_inner_elem,
226 const int32_t cpu_thread_idx,
227 const int32_t cpu_thread_count,
228 HASHTABLE_FILLING_FUNC filling_func) {
230 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
231 int32_t step = blockDim.x * gridDim.x;
233 int32_t start = cpu_thread_idx;
234 int32_t step = cpu_thread_count;
237 for (auto item : col.slice(start, step)) {
238 const size_t index = item.index;
239 int64_t elem = item.element;
241 if (shard != shard_info.shard) {
252 if (sd_inner_to_outer_translation_map &&
258 sd_inner_to_outer_translation_map);
265 if (filling_func(elem, shard, index)) {
274 const int32_t invalid_slot_val,
275 const bool for_semi_join,
279 const int32_t* sd_inner_to_outer_translation_map,
280 const int32_t min_inner_elem,
281 const int32_t cpu_thread_idx,
282 const int32_t cpu_thread_count,
283 const int64_t bucket_normalization) {
286 auto hashtable_filling_func = [&](auto elem, auto shard, size_t index) {
290 type_info.min_val / bucket_normalization,
291 type_info.translated_null_val,
292 shard_info.entry_count_per_shard,
294 shard_info.num_shards,
295 shard_info.device_count,
296 bucket_normalization);
297 return filling_func(index, entry_ptr, invalid_slot_val);
304 sd_inner_to_outer_translation_map,
308 hashtable_filling_func);
313 const int32_t invalid_slot_val,
314 const bool for_semi_join,
318 const int32_t* sd_inner_to_outer_translation_map,
319 const int32_t min_inner_elem,
320 const int32_t cpu_thread_idx,
321 const int32_t cpu_thread_count) {
324 auto hashtable_filling_func = [&](auto elem, auto shard, size_t index) {
328 shard_info.entry_count_per_shard,
330 shard_info.num_shards,
331 shard_info.device_count);
332 return filling_func(index, entry_ptr, invalid_slot_val);
339 sd_inner_to_outer_translation_map,
343 hashtable_filling_func);
346 template <typename T>
348 const int64_t entry_count,
349 const size_t key_component_count,
350 const bool with_val_slot,
351 const int32_t invalid_slot_val,
352 const int32_t cpu_thread_idx,
353 const int32_t cpu_thread_count) {
355 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
356 int32_t step = blockDim.x * gridDim.x;
358 int32_t start = cpu_thread_idx;
359 int32_t step = cpu_thread_count;
361 auto hash_entry_size = (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
363 for (int64_t h = start; h < entry_count; h += step) {
364 int64_t off = h * hash_entry_size;
365 auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
366 for (size_t i = 0; i < key_component_count; ++i) {
367 row_ptr[i] = empty_key;
370 row_ptr[key_component_count] = invalid_slot_val;
378 template <typename T>
379 DEVICE void SUFFIX(init_baseline_hash_join_buff_tbb)(int8_t* hash_buff,
380 const int64_t entry_count,
381 const size_t key_component_count,
382 const bool with_val_slot,
383 const int32_t invalid_slot_val) {
384 const auto hash_entry_size =
385 (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
388 [=](const tbb::blocked_range<int64_t>& r) {
389 const auto start_idx = r.begin();
390 const auto end_idx = r.end();
391 for (int64_t entry_idx = start_idx; entry_idx < end_idx;
393 const int64_t offset = entry_idx * hash_entry_size;
394 auto row_ptr = reinterpret_cast<T*>(hash_buff + offset);
395 for (size_t k = 0; k < key_component_count; ++k) {
396 row_ptr[k] = empty_key;
399 row_ptr[key_component_count] = invalid_slot_val;
405 #endif // #ifdef HAVE_TBB
406 #endif // #ifndef __CUDACC__
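// Layout note (illustrative): each baseline hash table entry occupies
// (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T) bytes -- the key
// components first, then an optional value slot. Initialization writes the
// empty_key sentinel into every key component and invalid_slot_val into the
// trailing value slot when present, e.g.
//
//   auto* row_ptr = reinterpret_cast<T*>(hash_buff + entry_idx * hash_entry_size);
//   // row_ptr[0 .. key_component_count) = empty_key;
//   // row_ptr[key_component_count] = invalid_slot_val (only if with_val_slot)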
409 template <typename T>
413 const size_t key_component_count,
414 const int64_t hash_entry_size) {
415 uint32_t off = h * hash_entry_size;
416 auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
419 const T old = atomicCAS(row_ptr, empty_key, *key);
420 if (empty_key == old && key_component_count > 1) {
421 for (int64_t i = 1; i <= key_component_count - 1; ++i) {
422 atomicExch(row_ptr + i, key[i]);
426 if (key_component_count > 1) {
427 while (atomicAdd(row_ptr + key_component_count - 1, 0) == empty_key) {
433 for (uint32_t i = 0; i < key_component_count; ++i) {
434 if (row_ptr[i] != key[i]) {
441 return reinterpret_cast<T*>(row_ptr + key_component_count);
448 #define cas_cst(ptr, expected, desired) \
449 (InterlockedCompareExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
450 reinterpret_cast<void*>(&desired), \
451 expected) == expected)
452 #define store_cst(ptr, val) \
453 InterlockedExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
454 reinterpret_cast<void*>(val))
455 #define load_cst(ptr) \
456 InterlockedCompareExchange(reinterpret_cast<volatile long*>(ptr), 0, 0)
458 #define cas_cst(ptr, expected, desired) \
459 __atomic_compare_exchange_n( \
460 ptr, expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
461 #define store_cst(ptr, val) __atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)
462 #define load_cst(ptr) __atomic_load_n(ptr, __ATOMIC_SEQ_CST)
465 template <typename T>
469 const size_t key_component_count,
470 const int64_t hash_entry_size) {
471 uint32_t off = h * hash_entry_size;
472 auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
475 if (UNLIKELY(*key == write_pending)) {
480 const bool success = cas_cst(row_ptr, &empty_key, write_pending);
482 if (key_component_count > 1) {
483 memcpy(row_ptr + 1, key + 1, (key_component_count - 1) * sizeof(T));
486 return reinterpret_cast<T*>(row_ptr + key_component_count);
488 while (load_cst(row_ptr) == write_pending) {
491 for (size_t i = 0; i < key_component_count; ++i) {
492 if (load_cst(row_ptr + i) != key[i]) {
496 return reinterpret_cast<T*>(row_ptr + key_component_count);
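// Both get_matching_baseline_hash_slot_at variants implement the same claim/publish
// protocol: the first key component is claimed atomically (atomicCAS on the GPU,
// cas_cst against the write_pending sentinel on the CPU), the remaining components
// are then written, and concurrent probers wait until the slot is fully published
// before comparing the rest of the key. On a match the caller receives the slot's
// payload area:
//
//   T* payload = row_ptr + key_component_count;  // value slot written by the callers below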
505 template <typename T>
508 const int64_t entry_count,
510 const size_t key_component_count,
511 const bool with_val_slot,
512 const int32_t invalid_slot_val,
513 const size_t key_size_in_bytes,
514 const size_t hash_entry_size) {
515 const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
517 hash_buff, h, key, key_component_count, hash_entry_size);
518 if (!matching_group) {
519 uint32_t h_probe = (h + 1) % entry_count;
520 while (h_probe != h) {
522 hash_buff, h_probe, key, key_component_count, hash_entry_size);
523 if (matching_group) {
526 h_probe = (h_probe + 1) % entry_count;
529 if (!matching_group) {
532 if (!with_val_slot) {
535 if (mapd_cas(matching_group, invalid_slot_val, val) != invalid_slot_val) {
541 template <typename T>
544 const int64_t entry_count,
546 const size_t key_component_count,
547 const bool with_val_slot,
548 const int32_t invalid_slot_val,
549 const size_t key_size_in_bytes,
550 const size_t hash_entry_size) {
551 const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
553 hash_buff, h, key, key_component_count, hash_entry_size);
554 if (!matching_group) {
555 uint32_t h_probe = (h + 1) % entry_count;
556 while (h_probe != h) {
558 hash_buff, h_probe, key, key_component_count, hash_entry_size);
559 if (matching_group) {
562 h_probe = (h_probe + 1) % entry_count;
565 if (!matching_group) {
568 if (!with_val_slot) {
571 mapd_cas(matching_group, invalid_slot_val, val);
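// Both write_baseline_hash_slot variants use open addressing with linear probing:
// the MurmurHash1Impl bucket is tried first, then successive slots modulo
// entry_count until a matching slot is claimed or the probe wraps back to h (table
// full). The only difference is the final value write -- the one-to-one path treats
// a failed mapd_cas as a duplicate-key error, while the semi-join path ignores the
// CAS result because any single matching row suffices. Probe-loop shape
// (illustrative recap of the code above):
//
//   uint32_t h_probe = (h + 1) % entry_count;
//   while (h_probe != h && !matching_group) {
//     matching_group = get_matching_baseline_hash_slot_at(
//         hash_buff, h_probe, key, key_component_count, hash_entry_size);
//     h_probe = (h_probe + 1) % entry_count;
//   }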
575 template <typename T, typename FILL_HANDLER>
577 const int64_t entry_count,
578 const int32_t invalid_slot_val,
579 const bool for_semi_join,
580 const size_t key_component_count,
581 const bool with_val_slot,
582 const FILL_HANDLER* f,
583 const int64_t num_elems,
584 const int32_t cpu_thread_idx,
585 const int32_t cpu_thread_count) {
587 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
588 int32_t step = blockDim.x * gridDim.x;
590 int32_t start = cpu_thread_idx;
591 int32_t step = cpu_thread_count;
595 const size_t key_size_in_bytes = key_component_count * sizeof(T);
596 const size_t hash_entry_size =
597 (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
598 auto key_buff_handler = [hash_buff,
604 &for_semi_join](const int64_t entry_idx,
605 const T* key_scratch_buffer,
606 const size_t key_component_count) {
608 return write_baseline_hash_slot_for_semi_join<T>(entry_idx,
618 return write_baseline_hash_slot<T>(entry_idx,
631 f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
632 for (auto& it : cols.slice(start, step)) {
633 const auto err = (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
644 #define mapd_add(address, val) atomicAdd(address, val)
645 #elif defined(_MSC_VER)
646 #define mapd_add(address, val) \
647 InterlockedExchangeAdd(reinterpret_cast<volatile long*>(address), \
648 static_cast<long>(val))
650 #define mapd_add(address, val) __sync_fetch_and_add(address, val)
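// mapd_add is the atomic fetch-and-add counterpart to mapd_cas; it returns the
// pre-increment value, which is exactly what the one-to-many passes below need.
// Hedged sketch (illustrative only):
//
//   mapd_add(&count_buff[entry_idx], int32_t(1));                    // count pass
//   const auto off = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;   // fill pass
//   id_buff[off] = static_cast<int32_t>(row_index);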
653 template <typename SLOT_SELECTOR>
659 const int32_t* sd_inner_to_outer_translation_map,
660 const int32_t min_inner_elem,
661 const int32_t cpu_thread_idx,
662 const int32_t cpu_thread_count
665 SLOT_SELECTOR slot_selector) {
667 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
668 int32_t step = blockDim.x * gridDim.x;
670 int32_t start = cpu_thread_idx;
671 int32_t step = cpu_thread_count;
674 for (auto item : col.slice(start, step)) {
675 int64_t elem = item.element;
676 if (elem == type_info.null_val) {
677 if (type_info.uses_bw_eq) {
678 elem = type_info.translated_null_val;
684 if (sd_inner_to_outer_translation_map &&
685 (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
690 sd_inner_to_outer_translation_map);
697 auto* entry_ptr = slot_selector(count_buff, elem);
707 const int32_t* sd_inner_to_outer_translation_map,
708 const int32_t min_inner_elem,
709 const int32_t cpu_thread_idx,
710 const int32_t cpu_thread_count
713 auto slot_sel = [&type_info](auto count_buff, auto elem) {
721 sd_inner_to_outer_translation_map,
736 const int32_t* sd_inner_to_outer_translation_map,
737 const int32_t min_inner_elem,
738 const int32_t cpu_thread_idx,
739 const int32_t cpu_thread_count
742 const int64_t bucket_normalization) {
743 auto slot_sel = [bucket_normalization, &type_info](auto count_buff, auto elem) {
746 type_info.min_val / bucket_normalization,
747 type_info.translated_null_val,
748 bucket_normalization);
755 sd_inner_to_outer_translation_map,
771 const int32_t* sd_inner_to_outer_translation_map,
772 const int32_t min_inner_elem,
773 const int32_t cpu_thread_idx,
774 const int32_t cpu_thread_count
778 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
779 int32_t step = blockDim.x * gridDim.x;
781 int32_t start = cpu_thread_idx;
782 int32_t step = cpu_thread_count;
785 for (auto item : col.slice(start, step)) {
786 int64_t elem = item.element;
787 if (elem == type_info.null_val) {
788 if (type_info.uses_bw_eq) {
789 elem = type_info.translated_null_val;
795 if (sd_inner_to_outer_translation_map &&
796 (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
801 sd_inner_to_outer_translation_map);
811 shard_info.entry_count_per_shard,
812 shard_info.num_shards,
813 shard_info.device_count);
818 template <typename T>
821 const size_t key_component_count,
822 const T* composite_key_dict,
823 const int64_t entry_count,
824 const size_t key_size_in_bytes) {
825 const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
826 uint32_t off = h * key_component_count;
827 if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
828 return &composite_key_dict[off];
830 uint32_t h_probe = (h + 1) % entry_count;
831 while (h_probe != h) {
832 off = h_probe * key_component_count;
833 if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
834 return &composite_key_dict[off];
836 h_probe = (h_probe + 1) % entry_count;
846 template <typename T, typename KEY_HANDLER>
848 const T* composite_key_dict,
849 const int64_t entry_count,
850 const KEY_HANDLER* f,
851 const int64_t num_elems
854 const int32_t cpu_thread_idx,
855 const int32_t cpu_thread_count
859 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
860 int32_t step = blockDim.x * gridDim.x;
862 int32_t start = cpu_thread_idx;
863 int32_t step = cpu_thread_count;
866 assert(composite_key_dict);
869 const size_t key_size_in_bytes = f->get_key_component_count() * sizeof(T);
870 auto key_buff_handler = [composite_key_dict,
873 key_size_in_bytes](const int64_t row_entry_idx,
874 const T* key_scratch_buff,
875 const size_t key_component_count) {
876 const auto matching_group =
882 const auto entry_idx = (matching_group - composite_key_dict) / key_component_count;
883 mapd_add(&count_buff[entry_idx], int32_t(1));
888 f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
889 for (auto& it : cols.slice(start, step)) {
890 (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
894 template <typename SLOT_SELECTOR>
896 const int64_t hash_entry_count,
901 const int32_t* sd_inner_to_outer_translation_map,
902 const int32_t min_inner_elem,
903 const int32_t cpu_thread_idx,
904 const int32_t cpu_thread_count
907 SLOT_SELECTOR slot_selector) {
908 int32_t* pos_buff = buff;
909 int32_t* count_buff = buff + hash_entry_count;
910 int32_t* id_buff = count_buff + hash_entry_count;
912 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
913 int32_t step = blockDim.x * gridDim.x;
915 int32_t start = cpu_thread_idx;
916 int32_t step = cpu_thread_count;
919 for (auto item : col.slice(start, step)) {
920 const size_t index = item.index;
921 int64_t elem = item.element;
922 if (elem == type_info.null_val) {
923 if (type_info.uses_bw_eq) {
924 elem = type_info.translated_null_val;
930 if (sd_inner_to_outer_translation_map &&
931 (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
936 sd_inner_to_outer_translation_map);
943 auto pos_ptr = slot_selector(pos_buff, elem);
944 const auto bin_idx = pos_ptr - pos_buff;
945 const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
946 id_buff[id_buff_idx] = static_cast<int32_t>(index);
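// Buffer layout used by the one-to-many fill (illustrative recap): `buff` holds
// three contiguous int32_t regions -- pos_buff and count_buff with hash_entry_count
// entries each (bucket start offsets and per-bucket write cursors), followed by
// id_buff with one slot per joined row.
//
//   int32_t* pos_buff   = buff;
//   int32_t* count_buff = buff + hash_entry_count;
//   int32_t* id_buff    = count_buff + hash_entry_count;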
950 template <typename SLOT_SELECTOR>
953 const int64_t hash_entry_count,
958 const int32_t* sd_inner_to_outer_translation_map,
959 const int32_t min_inner_elem,
960 const int32_t cpu_thread_idx,
961 const int32_t cpu_thread_count
964 SLOT_SELECTOR slot_selector) {
965 int32_t* pos_buff = buff;
966 int32_t* count_buff = buff + hash_entry_count;
967 int32_t* id_buff = count_buff + hash_entry_count;
968 int32_t* reversed_id_buff = id_buff + join_column.num_elems;
970 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
971 int32_t step = blockDim.x * gridDim.x;
973 int32_t start = cpu_thread_idx;
974 int32_t step = cpu_thread_count;
981 bool all_nulls = hash_entry_count == 1 && type_info.min_val == 0 &&
982 type_info.max_val == -1 &&
983 (*col.begin()).element == type_info.null_val;
985 int32_t thread_idx = -1;
987 thread_idx = threadIdx.x;
989 thread_idx = cpu_thread_idx;
991 if (thread_idx == 0) {
993 count_buff[0] = join_column.num_elems - 1;
994 for (size_t i = 0; i < join_column.num_elems; i++) {
995 reversed_id_buff[i] = i;
1000 for (auto item : col.slice(start, step)) {
1001 const size_t index = item.index;
1002 int64_t elem = item.element;
1003 if (elem == type_info.null_val) {
1004 elem = type_info.translated_null_val;
1007 if (sd_inner_to_outer_translation_map && elem != type_info.translated_null_val) {
1012 sd_inner_to_outer_translation_map);
1019 auto pos_ptr = slot_selector(pos_buff, elem);
1020 const auto bin_idx = pos_ptr - pos_buff;
1021 auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
1022 id_buff[id_buff_idx] = static_cast<int32_t>(index);
1023 reversed_id_buff[index] = id_buff_idx;
1028 const int64_t hash_entry_count,
1031 const bool for_window_framing
1034 const int32_t* sd_inner_to_outer_translation_map,
1035 const int32_t min_inner_elem,
1036 const int32_t cpu_thread_idx,
1037 const int32_t cpu_thread_count
1040 auto slot_sel = [&type_info](auto pos_buff, auto elem) {
1044 if (!for_window_framing) {
1051 sd_inner_to_outer_translation_map,
1065 sd_inner_to_outer_translation_map,
1077 const int64_t hash_entry_count,
1082 const int32_t* sd_inner_to_outer_translation_map,
1083 const int32_t min_inner_elem,
1084 const int32_t cpu_thread_idx,
1085 const int32_t cpu_thread_count
1088 const int64_t bucket_normalization) {
1089 auto slot_sel = [&type_info, bucket_normalization](auto pos_buff, auto elem) {
1092 type_info.min_val / bucket_normalization,
1093 type_info.translated_null_val,
1094 bucket_normalization);
1103 sd_inner_to_outer_translation_map,
1112 template <typename SLOT_SELECTOR>
1114 const int64_t hash_entry_count,
1120 const int32_t* sd_inner_to_outer_translation_map,
1121 const int32_t min_inner_elem,
1122 const int32_t cpu_thread_idx,
1123 const int32_t cpu_thread_count
1126 SLOT_SELECTOR slot_selector) {
1128 int32_t* pos_buff = buff;
1129 int32_t* count_buff = buff + hash_entry_count;
1130 int32_t* id_buff = count_buff + hash_entry_count;
1133 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
1134 int32_t step = blockDim.x * gridDim.x;
1136 int32_t start = cpu_thread_idx;
1137 int32_t step = cpu_thread_count;
1140 for (auto item : col.slice(start, step)) {
1141 const size_t index = item.index;
1142 int64_t elem = item.element;
1151 if (sd_inner_to_outer_translation_map &&
1157 sd_inner_to_outer_translation_map);
1164 auto* pos_ptr = slot_selector(pos_buff, elem);
1165 const auto bin_idx = pos_ptr - pos_buff;
1166 const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
1167 id_buff[id_buff_idx] = static_cast<int32_t>(index);
1172 const int64_t hash_entry_count,
1178 const int32_t* sd_inner_to_outer_translation_map,
1179 const int32_t min_inner_elem,
1180 const int32_t cpu_thread_idx,
1181 const int32_t cpu_thread_count
1184 auto slot_sel = [&type_info, &shard_info](auto pos_buff, auto elem) {
1188 shard_info.entry_count_per_shard,
1189 shard_info.num_shards,
1190 shard_info.device_count);
1198 sd_inner_to_outer_translation_map,
1209 const int64_t hash_entry_count,
1215 const int32_t* sd_inner_to_outer_translation_map,
1216 const int32_t min_inner_elem,
1217 const int32_t cpu_thread_idx,
1218 const int32_t cpu_thread_count
1221 const int64_t bucket_normalization) {
1222 auto slot_sel = [&shard_info, &type_info, bucket_normalization](auto pos_buff,
1227 type_info.min_val / bucket_normalization,
1228 type_info.translated_null_val,
1229 shard_info.entry_count_per_shard,
1230 shard_info.num_shards,
1231 shard_info.device_count,
1232 bucket_normalization);
1241 sd_inner_to_outer_translation_map,
1250 template <typename T, typename KEY_HANDLER>
1252 const T* composite_key_dict,
1253 const int64_t hash_entry_count,
1254 const KEY_HANDLER* f,
1255 const int64_t num_elems,
1256 const bool for_window_framing
1259 const int32_t cpu_thread_idx,
1260 const int32_t cpu_thread_count
1263 int32_t* pos_buff = buff;
1264 int32_t* count_buff = buff + hash_entry_count;
1265 int32_t* id_buff = count_buff + hash_entry_count;
1266 int32_t* reversed_id_buff = for_window_framing ? id_buff + num_elems : nullptr;
1268 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
1269 int32_t step = blockDim.x * gridDim.x;
1271 int32_t start = cpu_thread_idx;
1272 int32_t step = cpu_thread_count;
1277 assert(composite_key_dict);
1279 const size_t key_size_in_bytes = f->get_key_component_count() * sizeof(T);
1280 auto key_buff_handler = [composite_key_dict,
1287 for_window_framing](const int64_t row_index,
1288 const T* key_scratch_buff,
1289 const size_t key_component_count) {
1290 const T* matching_group =
1292 key_component_count,
1296 const auto entry_idx = (matching_group - composite_key_dict) / key_component_count;
1297 int32_t* pos_ptr = pos_buff + entry_idx;
1298 const auto bin_idx = pos_ptr - pos_buff;
1299 const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
1300 id_buff[id_buff_idx] = static_cast<int32_t>(row_index);
1301 if (for_window_framing) {
1302 reversed_id_buff[row_index] = id_buff_idx;
1308 f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
1309 for (auto& it : cols.slice(start, step)) {
1310 (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
1317 template <typename KEY_HANDLER>
1319 int32_t* row_count_buffer,
1321 const int64_t num_elems,
1322 const KEY_HANDLER* f
1325 const int32_t cpu_thread_idx,
1326 const int32_t cpu_thread_count
1330 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
1331 int32_t step = blockDim.x * gridDim.x;
1333 int32_t start = cpu_thread_idx;
1334 int32_t step = cpu_thread_count;
1337 auto key_buff_handler = [b, hll_buffer, row_count_buffer](
1338 const int64_t entry_idx,
1339 const int64_t* key_scratch_buff,
1340 const size_t key_component_count) {
1341 if (row_count_buffer) {
1342 row_count_buffer[entry_idx] += 1;
1345 const uint64_t hash =
1347 const uint32_t index = hash >> (64 - b);
1348 const auto rank = get_rank(hash << b, 64 - b);
1350 atomicMax(reinterpret_cast<int32_t*>(hll_buffer) + index, rank);
1352 hll_buffer[index] = std::max(hll_buffer[index], rank);
1361 f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
1362 for (
auto& it : cols.slice(start, step)) {
1363 (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
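// HyperLogLog update performed above (illustrative summary): the top b bits of the
// 64-bit key hash select a register and the rank of the remaining bits, as computed
// by get_rank, is max-merged into that register; row_count_buffer, when provided,
// additionally accumulates exact per-entry row counts for sizing the one-to-many
// layout.
//
//   const uint32_t index = hash >> (64 - b);        // register selector
//   const auto rank = get_rank(hash << b, 64 - b);  // rank of the remaining bits
//   hll_buffer[index] = std::max(hll_buffer[index], rank);  // CPU path; GPU uses atomicMax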
1371 __device__ double atomicMin(double* address, double val) {
1372 unsigned long long int* address_as_ull = (unsigned long long int*)address;
1373 unsigned long long int old = *address_as_ull, assumed;
1377 old = atomicCAS(address_as_ull,
1379 __double_as_longlong(min(val, __longlong_as_double(assumed))));
1380 } while (assumed != old);
1382 return __longlong_as_double(old);
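// Classic CUDA idiom for atomics the hardware does not provide natively: reinterpret
// the double as a 64-bit integer and retry atomicCAS until the stored bit pattern is
// unchanged underneath us. The same shape would give a double atomicMax; a hedged
// sketch (illustrative only, not part of the original source):
//
//   __device__ double example_atomic_max(double* address, double val) {
//     unsigned long long int* p = (unsigned long long int*)address;
//     unsigned long long int old = *p, assumed;
//     do {
//       assumed = old;
//       old = atomicCAS(p, assumed,
//                       __double_as_longlong(fmax(val, __longlong_as_double(assumed))));
//     } while (assumed != old);
//     return __longlong_as_double(old);
//   }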
1391 const double* bucket_size_thresholds
1394 const int32_t cpu_thread_idx,
1395 const int32_t cpu_thread_count
1399 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
1400 int32_t step = blockDim.x * gridDim.x;
1402 int32_t start = cpu_thread_idx;
1403 int32_t step = cpu_thread_count;
1408 double bounds[2 * N];
1409 for (size_t j = 0; j < 2 * N; j++) {
1413 for (size_t j = 0; j < N; j++) {
1414 const auto diff = bounds[j + N] - bounds[j];
1416 if (diff > bucket_size_thresholds[j]) {
1417 atomicMin(&bucket_sizes_for_thread[j], diff);
1420 if (diff < bucket_size_thresholds[j] && diff > bucket_sizes_for_thread[j]) {
1421 bucket_sizes_for_thread[j] = diff;
1430 template <typename InputIterator, typename OutputIterator>
1434 const size_t thread_count) {
1435 using ElementType = typename InputIterator::value_type;
1436 using OffsetType = typename InputIterator::difference_type;
1437 const OffsetType elem_count = last - first;
1438 if (elem_count < 10000 || thread_count <= 1) {
1439 ElementType sum = 0;
1440 for (auto iter = first; iter != last; ++iter, ++out) {
1441 *out = sum += *iter;
1446 const OffsetType step = (elem_count + thread_count - 1) / thread_count;
1447 OffsetType start_off = 0;
1448 OffsetType end_off = std::min(step, elem_count);
1449 std::vector<ElementType> partial_sums(thread_count);
1450 std::vector<std::future<void>> counter_threads;
1451 for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx,
1452 start_off = std::min(start_off + step, elem_count),
1453 end_off = std::min(start_off + step, elem_count)) {
1457 ElementType& partial_sum, const OffsetType start, const OffsetType end) {
1458 ElementType sum = 0;
1459 for (auto in_iter = first + start, out_iter = out + start;
1460 in_iter != (first + end);
1461 ++in_iter, ++out_iter) {
1462 *out_iter = sum += *in_iter;
1466 std::ref(partial_sums[thread_idx]),
1470 for (auto& child : counter_threads) {
1474 ElementType sum = 0;
1475 for (auto& s : partial_sums) {
1480 counter_threads.clear();
1481 start_off = std::min(step, elem_count);
1482 end_off = std::min(start_off + step, elem_count);
1483 for (size_t thread_idx = 0; thread_idx < thread_count - 1; ++thread_idx,
1484 start_off = std::min(start_off + step, elem_count),
1485 end_off = std::min(start_off + step, elem_count)) {
1488 [out](const ElementType prev_sum, const OffsetType start, const OffsetType end) {
1489 for (auto iter = out + start; iter != (out + end); ++iter) {
1493 partial_sums[thread_idx],
1497 for (auto& child : counter_threads) {
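// inclusive_scan is the CPU prefix-sum used below to turn per-bucket match counts
// into bucket start offsets. Hedged usage sketch (illustrative only):
//
//   std::vector<int32_t> counts{3, 0, 2, 5};
//   std::vector<int32_t> offsets(counts.size());
//   inclusive_scan(counts.begin(), counts.end(), offsets.begin(), /*thread_count=*/2);
//   // offsets == {3, 3, 5, 10}; callers shift the input by one slot to get
//   // exclusive start positions per bucket.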
1502 template <typename COUNT_MATCHES_LAUNCH_FUNCTOR, typename FILL_ROW_IDS_LAUNCH_FUNCTOR>
1504 const int64_t hash_entry_count,
1507 const int32_t* sd_inner_to_outer_translation_map,
1508 const int32_t min_inner_elem,
1509 const int32_t cpu_thread_count,
1510 const bool for_window_framing,
1511 COUNT_MATCHES_LAUNCH_FUNCTOR count_matches_func,
1512 FILL_ROW_IDS_LAUNCH_FUNCTOR fill_row_ids_func) {
1514 int32_t* pos_buff = buff;
1515 int32_t* count_buff = buff + hash_entry_count;
1516 memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1517 std::vector<std::future<void>> counter_threads;
1518 for (int32_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1523 for (auto& child : counter_threads) {
1527 std::vector<int32_t> count_copy(hash_entry_count, 0);
1528 CHECK_GT(hash_entry_count, int64_t(0));
1529 memcpy(count_copy.data() + 1, count_buff, (hash_entry_count - 1) * sizeof(int32_t));
1534 count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
1536 std::vector<std::future<void>> pos_threads;
1537 for (int32_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1540 [&](size_t thread_idx) {
1541 for (int64_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
1542 if (count_buff[i]) {
1543 pos_buff[i] = count_copy[i];
1549 for (auto& child : pos_threads) {
1552 memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1553 std::vector<std::future<void>> rowid_threads;
1554 for (int32_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1559 for (auto& child : rowid_threads) {
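// fill_one_to_many_hash_table_impl builds the pos/count/id layout in three passes,
// each fanned out over cpu_thread_count std::async tasks (illustrative summary):
//
//   // 1. count: count_matches_func tallies rows per bucket into count_buff;
//   // 2. scan:  the counts, shifted by one, run through inclusive_scan to produce
//   //           per-bucket start positions written into pos_buff;
//   // 3. fill:  count_buff is zeroed and fill_row_ids_func re-walks the column,
//   //           writing each row id at id_buff[pos_buff[bucket] + count_buff[bucket]++].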
1565 const int32_t cpu_thread_count) {
1567 auto const buff = args.buff;
1569 auto launch_count_matches = [count_buff =
1570 buff + hash_entry_info.bucketized_hash_entry_count,
1571 &args](auto cpu_thread_idx, auto cpu_thread_count) {
1581 auto launch_fill_row_ids =
1582 [hash_entry_count = hash_entry_info.bucketized_hash_entry_count, buff, args](
1583 auto cpu_thread_idx, auto cpu_thread_count) {
1597 hash_entry_info.bucketized_hash_entry_count,
1604 launch_count_matches,
1605 launch_fill_row_ids);
1610 const int32_t cpu_thread_count) {
1612 auto const buff = args.buff;
1615 auto hash_entry_count = hash_entry_info.getNormalizedHashEntryCount();
1616 auto launch_count_matches = [bucket_normalization,
1617 count_buff = buff + hash_entry_count,
1618 &args](auto cpu_thread_idx, auto cpu_thread_count) {
1627 bucket_normalization);
1629 auto launch_fill_row_ids = [bucket_normalization, hash_entry_count, buff, args](
1630 auto cpu_thread_idx, auto cpu_thread_count) {
1640 bucket_normalization);
1651 launch_count_matches,
1652 launch_fill_row_ids);
1655 template <typename COUNT_MATCHES_LAUNCH_FUNCTOR, typename FILL_ROW_IDS_LAUNCH_FUNCTOR>
1658 const int64_t hash_entry_count,
1662 const int32_t* sd_inner_to_outer_translation_map,
1663 const int32_t min_inner_elem,
1664 const int32_t cpu_thread_count,
1665 COUNT_MATCHES_LAUNCH_FUNCTOR count_matches_launcher,
1666 FILL_ROW_IDS_LAUNCH_FUNCTOR fill_row_ids_launcher) {
1668 int32_t* pos_buff = buff;
1669 int32_t* count_buff = buff + hash_entry_count;
1670 memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1671 std::vector<std::future<void>> counter_threads;
1672 for (int32_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1677 for (auto& child : counter_threads) {
1681 std::vector<int32_t> count_copy(hash_entry_count, 0);
1682 CHECK_GT(hash_entry_count, int64_t(0));
1683 memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
1685 count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
1686 std::vector<std::future<void>> pos_threads;
1687 for (int32_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1690 [&](const int32_t thread_idx) {
1691 for (int64_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
1692 if (count_buff[i]) {
1693 pos_buff[i] = count_copy[i];
1699 for (auto& child : pos_threads) {
1703 memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1704 std::vector<std::future<void>> rowid_threads;
1705 for (int32_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1710 for (auto& child : rowid_threads) {
1716 const int64_t hash_entry_count,
1720 const int32_t* sd_inner_to_outer_translation_map,
1721 const int32_t min_inner_elem,
1722 const int32_t cpu_thread_count) {
1723 auto launch_count_matches = [count_buff = buff + hash_entry_count,
1729 sd_inner_to_outer_translation_map,
1732 ](auto cpu_thread_idx, auto cpu_thread_count) {
1739 sd_inner_to_outer_translation_map,
1747 auto launch_fill_row_ids = [buff,
1754 sd_inner_to_outer_translation_map,
1757 ](auto cpu_thread_idx, auto cpu_thread_count) {
1765 sd_inner_to_outer_translation_map,
1779 sd_inner_to_outer_translation_map,
1784 launch_count_matches,
1785 launch_fill_row_ids);
1789 const int64_t entry_count,
1790 const size_t key_component_count,
1791 const bool with_val_slot,
1792 const int32_t invalid_slot_val,
1793 const int32_t cpu_thread_idx,
1794 const int32_t cpu_thread_count) {
1795 init_baseline_hash_join_buff<int32_t>(hash_join_buff,
1797 key_component_count,
1805 const int64_t entry_count,
1806 const size_t key_component_count,
1807 const bool with_val_slot,
1808 const int32_t invalid_slot_val,
1809 const int32_t cpu_thread_idx,
1810 const int32_t cpu_thread_count) {
1811 init_baseline_hash_join_buff<int64_t>(hash_join_buff,
1813 key_component_count,
1823 void init_baseline_hash_join_buff_tbb_32(int8_t* hash_join_buff,
1824 const int64_t entry_count,
1825 const size_t key_component_count,
1826 const bool with_val_slot,
1827 const int32_t invalid_slot_val) {
1828 init_baseline_hash_join_buff_tbb<int32_t>(
1829 hash_join_buff, entry_count, key_component_count, with_val_slot, invalid_slot_val);
1832 void init_baseline_hash_join_buff_tbb_64(int8_t* hash_join_buff,
1833 const int64_t entry_count,
1834 const size_t key_component_count,
1835 const bool with_val_slot,
1836 const int32_t invalid_slot_val) {
1837 init_baseline_hash_join_buff_tbb<int64_t>(
1838 hash_join_buff, entry_count, key_component_count, with_val_slot, invalid_slot_val);
1841 #endif // #ifdef HAVE_TBB
1842 #endif // #ifndef __CUDACC__
1845 const int64_t entry_count,
1846 const int32_t invalid_slot_val,
1847 const bool for_semi_join,
1848 const size_t key_component_count,
1849 const bool with_val_slot,
1851 const int64_t num_elems,
1852 const int32_t cpu_thread_idx,
1853 const int32_t cpu_thread_count) {
1854 return fill_baseline_hash_join_buff<int32_t>(hash_buff,
1858 key_component_count,
1868 const int64_t entry_count,
1869 const int32_t invalid_slot_val,
1870 const size_t key_component_count,
1871 const bool with_val_slot,
1873 const int64_t num_elems,
1874 const int32_t cpu_thread_idx,
1875 const int32_t cpu_thread_count) {
1876 return fill_baseline_hash_join_buff<int32_t>(hash_buff,
1880 key_component_count,
1889 const size_t entry_count,
1890 const int32_t invalid_slot_val,
1891 const size_t key_component_count,
1892 const bool with_val_slot,
1894 const size_t num_elems,
1895 const int32_t cpu_thread_idx,
1896 const int32_t cpu_thread_count) {
1897 return fill_baseline_hash_join_buff<int32_t>(hash_buff,
1901 key_component_count,
1910 const int64_t entry_count,
1911 const int32_t invalid_slot_val,
1912 const bool for_semi_join,
1913 const size_t key_component_count,
1914 const bool with_val_slot,
1916 const int64_t num_elems,
1917 const int32_t cpu_thread_idx,
1918 const int32_t cpu_thread_count) {
1919 return fill_baseline_hash_join_buff<int64_t>(hash_buff,
1923 key_component_count,
1933 const int64_t entry_count,
1934 const int32_t invalid_slot_val,
1935 const size_t key_component_count,
1936 const bool with_val_slot,
1938 const int64_t num_elems,
1939 const int32_t cpu_thread_idx,
1940 const int32_t cpu_thread_count) {
1941 return fill_baseline_hash_join_buff<int64_t>(hash_buff,
1945 key_component_count,
1954 const size_t entry_count,
1955 const int32_t invalid_slot_val,
1956 const size_t key_component_count,
1957 const bool with_val_slot,
1959 const size_t num_elems,
1960 const int32_t cpu_thread_idx,
1961 const int32_t cpu_thread_count) {
1962 return fill_baseline_hash_join_buff<int64_t>(hash_buff,
1966 key_component_count,
1974 template <typename T>
1977 const T* composite_key_dict,
1978 const int64_t hash_entry_count,
1979 const size_t key_component_count,
1980 const std::vector<JoinColumn>& join_column_per_key,
1981 const std::vector<JoinColumnTypeInfo>& type_info_per_key,
1982 const std::vector<JoinBucketInfo>& join_buckets_per_key,
1983 const std::vector<const int32_t*>& sd_inner_to_outer_translation_maps,
1984 const std::vector<int32_t>& sd_min_inner_elems,
1985 const size_t cpu_thread_count,
1986 const bool is_range_join,
1987 const bool is_geo_compressed,
1988 const bool for_window_framing) {
1989 int32_t* pos_buff = buff;
1990 int32_t* count_buff = buff + hash_entry_count;
1991 memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1992 std::vector<std::future<void>> counter_threads;
1993 for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1994 if (is_range_join) {
2000 &join_buckets_per_key,
2001 &join_column_per_key,
2007 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
2008 &join_column_per_key[0],
2009 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
2014 join_column_per_key[0].num_elems,
2018 } else if (join_buckets_per_key.size() > 0) {
2024 &join_buckets_per_key,
2025 &join_column_per_key,
2029 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
2030 &join_column_per_key[0],
2031 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
2036 join_column_per_key[0].num_elems,
2041 counter_threads.push_back(
2045 &key_component_count,
2047 &join_column_per_key,
2049 &sd_inner_to_outer_translation_maps,
2050 &sd_min_inner_elems,
2053 const auto key_handler =
2056 &join_column_per_key[0],
2057 &type_info_per_key[0],
2058 &sd_inner_to_outer_translation_maps[0],
2059 &sd_min_inner_elems[0]);
2064 join_column_per_key[0].num_elems,
2071 for (auto& child : counter_threads) {
2075 std::vector<int32_t> count_copy(hash_entry_count, 0);
2076 CHECK_GT(hash_entry_count, int64_t(0));
2077 memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
2079 count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
2080 std::vector<std::future<void>> pos_threads;
2081 for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
2084 [&](const int thread_idx) {
2085 for (int64_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
2086 if (count_buff[i]) {
2087 pos_buff[i] = count_copy[i];
2093 for (auto& child : pos_threads) {
2097 memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
2098 std::vector<std::future<void>> rowid_threads;
2099 for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
2100 if (is_range_join) {
2106 &join_column_per_key,
2107 &join_buckets_per_key,
2113 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
2114 &join_column_per_key[0],
2115 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
2121 join_column_per_key[0].num_elems,
2126 } else if (join_buckets_per_key.size() > 0) {
2132 &join_column_per_key,
2133 &join_buckets_per_key,
2138 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
2139 &join_column_per_key[0],
2140 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
2146 join_column_per_key[0].num_elems,
2156 key_component_count,
2157 &join_column_per_key,
2159 &sd_inner_to_outer_translation_maps,
2160 &sd_min_inner_elems,
2165 key_component_count,
2167 &join_column_per_key[0],
2168 &type_info_per_key[0],
2169 &sd_inner_to_outer_translation_maps[0],
2170 &sd_min_inner_elems[0]);
2176 join_column_per_key[0].num_elems,
2184 for (auto& child : rowid_threads) {
2191 const int32_t* composite_key_dict,
2192 const int64_t hash_entry_count,
2193 const size_t key_component_count,
2194 const std::vector<JoinColumn>& join_column_per_key,
2195 const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2196 const std::vector<JoinBucketInfo>& join_bucket_info,
2197 const std::vector<const int32_t*>& sd_inner_to_outer_translation_maps,
2198 const std::vector<int32_t>& sd_min_inner_elems,
2199 const int32_t cpu_thread_count,
2200 const bool is_range_join,
2201 const bool is_geo_compressed,
2202 const bool for_window_framing) {
2203 fill_one_to_many_baseline_hash_table<int32_t>(buff,
2206 key_component_count,
2207 join_column_per_key,
2210 sd_inner_to_outer_translation_maps,
2215 for_window_framing);
2220 const int64_t* composite_key_dict,
2221 const int64_t hash_entry_count,
2222 const size_t key_component_count,
2223 const std::vector<JoinColumn>& join_column_per_key,
2224 const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2225 const std::vector<JoinBucketInfo>& join_bucket_info,
2226 const std::vector<const int32_t*>& sd_inner_to_outer_translation_maps,
2227 const std::vector<int32_t>& sd_min_inner_elems,
2228 const int32_t cpu_thread_count,
2229 const bool is_range_join,
2230 const bool is_geo_compressed,
2231 const bool for_window_framing) {
2232 fill_one_to_many_baseline_hash_table<int64_t>(buff,
2235 key_component_count,
2236 join_column_per_key,
2239 sd_inner_to_outer_translation_maps,
2244 for_window_framing);
2249 const size_t padded_size_bytes,
2250 const std::vector<JoinColumn>& join_column_per_key,
2251 const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2252 const int thread_count) {
2253 CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
2254 CHECK(!join_column_per_key.empty());
2256 std::vector<std::future<void>> approx_distinct_threads;
2257 for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2258 approx_distinct_threads.push_back(std::async(
2260 [&join_column_per_key,
2263 hll_buffer_all_cpus,
2267 auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
2271 &join_column_per_key[0],
2272 &type_info_per_key[0],
2278 join_column_per_key[0].num_elems,
2284 for (auto& child : approx_distinct_threads) {
2290 uint8_t* hll_buffer_all_cpus,
2291 std::vector<int32_t>& row_counts,
2293 const size_t padded_size_bytes,
2294 const std::vector<JoinColumn>& join_column_per_key,
2295 const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2296 const std::vector<JoinBucketInfo>& join_buckets_per_key,
2297 const int thread_count) {
2298 CHECK_EQ(join_column_per_key.size(), join_buckets_per_key.size());
2299 CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
2300 CHECK(!join_column_per_key.empty());
2302 std::vector<std::future<void>> approx_distinct_threads;
2303 for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2304 approx_distinct_threads.push_back(std::async(
2306 [&join_column_per_key,
2307 &join_buckets_per_key,
2310 hll_buffer_all_cpus,
2314 auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
2317 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
2318 &join_column_per_key[0],
2319 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
2323 join_column_per_key[0].num_elems,
2329 for (auto& child : approx_distinct_threads) {
2334 row_counts.begin(), row_counts.end(), row_counts.begin(), thread_count);
2338 uint8_t* hll_buffer_all_cpus,
2339 std::vector<int32_t>& row_counts,
2341 const size_t padded_size_bytes,
2342 const std::vector<JoinColumn>& join_column_per_key,
2343 const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2344 const std::vector<JoinBucketInfo>& join_buckets_per_key,
2345 const bool is_compressed,
2346 const int thread_count) {
2347 CHECK_EQ(join_column_per_key.size(), join_buckets_per_key.size());
2348 CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
2349 CHECK(!join_column_per_key.empty());
2351 std::vector<std::future<void>> approx_distinct_threads;
2352 for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2353 approx_distinct_threads.push_back(std::async(
2355 [&join_column_per_key,
2356 &join_buckets_per_key,
2359 hll_buffer_all_cpus,
2364 auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
2368 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
2369 &join_column_per_key[0],
2370 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
2374 join_column_per_key[0].num_elems,
2380 for (auto& child : approx_distinct_threads) {
2385 row_counts.begin(), row_counts.end(), row_counts.begin(), thread_count);
2391 const std::vector<double>& bucket_size_thresholds,
2392 const int thread_count) {
2393 std::vector<std::vector<double>> bucket_sizes_for_threads;
2394 for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2395 bucket_sizes_for_threads.emplace_back(bucket_sizes_for_dimension.size(), 0.0);
2397 std::vector<std::future<void>> threads;
2398 for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2400 compute_bucket_sizes_impl<2>,
2401 bucket_sizes_for_threads[thread_idx].data(),
2404 bucket_size_thresholds.data(),
2408 for (auto& child : threads) {
2412 for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2413 for (size_t i = 0; i < bucket_sizes_for_dimension.size(); i++) {
2414 if (bucket_sizes_for_threads[thread_idx][i] > bucket_sizes_for_dimension[i]) {
2415 bucket_sizes_for_dimension[i] = bucket_sizes_for_threads[thread_idx][i];
2421 #endif // ifndef __CUDACC__