OmniSciDB a5dc49c757
HashJoinRuntime.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "HashJoinRuntime.h"
18 
24 #include "Shared/shard_key.h"
25 #ifdef __CUDACC__
29 #else
30 #include "Logger/Logger.h"
31 
33 #include "Shared/likely.h"
36 
37 #ifdef HAVE_TBB
38 #include <tbb/parallel_for.h>
39 #endif
40 
41 #include <future>
42 #endif
43 
44 #if HAVE_CUDA
45 #include <cuda_runtime.h>
46 #include <thrust/scan.h>
47 #endif
48 #include "Shared/funcannotations.h"
49 
50 #include <cmath>
51 #include <numeric>
52 
53 #ifndef __CUDACC__
54 namespace {
55 
56 inline int64_t map_str_id_to_outer_dict(const int64_t inner_elem,
57  const int64_t min_inner_elem,
58  const int64_t min_outer_elem,
59  const int64_t max_outer_elem,
60  const int32_t* inner_to_outer_translation_map) {
61  const auto outer_id = inner_to_outer_translation_map[inner_elem - min_inner_elem];
62  if (outer_id > max_outer_elem || outer_id < min_outer_elem) {
63  return StringDictionary::INVALID_STR_ID;
64  }
65  return outer_id;
66 }
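// A worked example of the mapping above (values illustrative only): if the
// inner dictionary assigned some string the id 7 and min_inner_elem is 5, the
// translation map is indexed at 7 - 5 = 2. The stored outer id is returned
// as-is when it falls inside [min_outer_elem, max_outer_elem]; otherwise the
// early return above yields StringDictionary::INVALID_STR_ID, letting callers
// skip rows whose string does not exist in the outer dictionary.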
67 
68 } // namespace
69 #endif
70 
71 DEVICE void SUFFIX(init_hash_join_buff)(int32_t* groups_buffer,
72  const int64_t hash_entry_count,
73  const int32_t invalid_slot_val,
74  const int32_t cpu_thread_idx,
75  const int32_t cpu_thread_count) {
76 #ifdef __CUDACC__
77  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
78  int32_t step = blockDim.x * gridDim.x;
79 #else
80  int32_t start = cpu_thread_idx;
81  int32_t step = cpu_thread_count;
82 #endif
83  for (int64_t i = start; i < hash_entry_count; i += step) {
84  groups_buffer[i] = invalid_slot_val;
85  }
86 }
87 
88 #ifndef __CUDACC__
89 #ifdef HAVE_TBB
90 
91 void SUFFIX(init_hash_join_buff_tbb)(int32_t* groups_buffer,
92  const int64_t hash_entry_count,
93  const int32_t invalid_slot_val) {
94  tbb::parallel_for(tbb::blocked_range<int64_t>(0, hash_entry_count),
95  [=](const tbb::blocked_range<int64_t>& r) {
96  const auto start_idx = r.begin();
97  const auto end_idx = r.end();
98  for (auto entry_idx = start_idx; entry_idx != end_idx;
99  ++entry_idx) {
100  groups_buffer[entry_idx] = invalid_slot_val;
101  }
102  });
103 }
104 
105 #endif // #ifdef HAVE_TBB
106 #endif // #ifndef __CUDACC__
107 
108 #ifdef __CUDACC__
109 #define mapd_cas(address, compare, val) atomicCAS(address, compare, val)
110 #elif defined(_MSC_VER)
111 #define mapd_cas(address, compare, val) \
112  InterlockedCompareExchange(reinterpret_cast<volatile long*>(address), \
113  static_cast<long>(val), \
114  static_cast<long>(compare))
115 #else
116 #define mapd_cas(address, compare, val) __sync_val_compare_and_swap(address, compare, val)
117 #endif
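// All three mapd_cas variants implement the same contract: atomically compare
// *address to `compare`, store `val` on a match, and return the value that was
// previously at `address`. Note the MSVC intrinsic takes (destination,
// exchange, comparand), so the macro swaps the last two arguments relative to
// the GCC/Clang builtin.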
118 
119 template <typename HASHTABLE_FILLING_FUNC>
120 DEVICE int fill_hash_join_buff_impl(OneToOnePerfectJoinHashTableFillFuncArgs const args,
121  const int32_t cpu_thread_idx,
122  const int32_t cpu_thread_count,
123  HASHTABLE_FILLING_FUNC filling_func) {
124 #ifdef __CUDACC__
125  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
126  int32_t step = blockDim.x * gridDim.x;
127 #else
128  int32_t start = cpu_thread_idx;
129  int32_t step = cpu_thread_count;
130 #endif
131  auto const join_column = args.join_column;
132  auto const type_info = args.type_info;
133  JoinColumnTyped col{&join_column, &type_info};
134  for (auto item : col.slice(start, step)) {
135  const size_t index = item.index;
136  int64_t elem = item.element;
137  if (elem == type_info.null_val) {
138  if (type_info.uses_bw_eq) {
139  elem = type_info.translated_null_val;
140  } else {
141  continue;
142  }
143  }
144 #ifndef __CUDACC__
145  auto const sd_inner_to_outer_translation_map = args.sd_inner_to_outer_translation_map;
146  auto const min_inner_elem = args.min_inner_elem;
147  if (sd_inner_to_outer_translation_map &&
148  (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
149  const auto outer_id = map_str_id_to_outer_dict(elem,
150  min_inner_elem,
151  type_info.min_val,
152  type_info.max_val,
153  sd_inner_to_outer_translation_map);
154  if (outer_id == StringDictionary::INVALID_STR_ID) {
155  continue;
156  }
157  elem = outer_id;
158  }
159 #endif
160  if (filling_func(elem, index)) {
161  return -1;
162  }
163  }
164  return 0;
165 };
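// fill_hash_join_buff_impl walks the join column with one stride per CUDA
// thread (or per CPU worker), translates nulls and dictionary ids, and hands
// each surviving element to filling_func. A non-zero filler result signals a
// slot conflict, i.e. the column cannot back a one-to-one hash table, and the
// -1 lets the caller abandon this layout (typically retrying as one-to-many).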
166 
167 DEVICE int SUFFIX(fill_hash_join_buff_bucketized)(
168     OneToOnePerfectJoinHashTableFillFuncArgs const args,
169  int32_t const cpu_thread_idx,
170  int32_t const cpu_thread_count) {
171  auto filling_func = args.for_semi_join ? SUFFIX(fill_hashtable_for_semi_join)
172  : SUFFIX(fill_one_to_one_hashtable);
173  auto hashtable_filling_func = [&](auto elem, size_t index) {
174  auto entry_ptr = SUFFIX(get_bucketized_hash_slot)(
175  args.buff,
176  elem,
177  args.type_info.min_val / args.bucket_normalization,
178  args.type_info.translated_null_val,
179  args.bucket_normalization);
180  return filling_func(index, entry_ptr, args.invalid_slot_val);
181  };
182 
183  return fill_hash_join_buff_impl(
184  args, cpu_thread_idx, cpu_thread_count, hashtable_filling_func);
185 }
186 
187 DEVICE int SUFFIX(fill_hash_join_buff_bitwise_eq)(
188     OneToOnePerfectJoinHashTableFillFuncArgs const args,
189  int32_t const cpu_thread_idx,
190  int32_t const cpu_thread_count) {
191  auto filling_func = args.for_semi_join ? SUFFIX(fill_hashtable_for_semi_join)
192  : SUFFIX(fill_one_to_one_hashtable);
193  auto hashtable_filling_func = [&](auto elem, size_t index) {
194  auto entry_ptr = SUFFIX(get_hash_slot_bitwise_eq)(
195  args.buff, elem, args.type_info.min_val, args.type_info.translated_null_val);
196  return filling_func(index, entry_ptr, args.invalid_slot_val);
197  };
198 
199  return fill_hash_join_buff_impl(
200  args, cpu_thread_idx, cpu_thread_count, hashtable_filling_func);
201 }
202 
203 DEVICE int SUFFIX(fill_hash_join_buff)(
204     OneToOnePerfectJoinHashTableFillFuncArgs const args,
205  const int32_t cpu_thread_idx,
206  const int32_t cpu_thread_count) {
207  auto filling_func = args.for_semi_join ? SUFFIX(fill_hashtable_for_semi_join)
208  : SUFFIX(fill_one_to_one_hashtable);
209  auto hashtable_filling_func = [&](auto elem, size_t index) {
210  auto entry_ptr = SUFFIX(get_hash_slot)(args.buff, elem, args.type_info.min_val);
211  return filling_func(index, entry_ptr, args.invalid_slot_val);
212  };
213 
214  return fill_hash_join_buff_impl(
215  args, cpu_thread_idx, cpu_thread_count, hashtable_filling_func);
216 }
217 
218 template <typename HASHTABLE_FILLING_FUNC>
219 DEVICE int fill_hash_join_buff_sharded_impl(
220  int32_t* buff,
221  const JoinColumn join_column,
222  const JoinColumnTypeInfo type_info,
223  const ShardInfo shard_info,
224  const int32_t* sd_inner_to_outer_translation_map,
225  const int32_t min_inner_elem,
226  const int32_t cpu_thread_idx,
227  const int32_t cpu_thread_count,
228  HASHTABLE_FILLING_FUNC filling_func) {
229 #ifdef __CUDACC__
230  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
231  int32_t step = blockDim.x * gridDim.x;
232 #else
233  int32_t start = cpu_thread_idx;
234  int32_t step = cpu_thread_count;
235 #endif
236  JoinColumnTyped col{&join_column, &type_info};
237  for (auto item : col.slice(start, step)) {
238  const size_t index = item.index;
239  int64_t elem = item.element;
240  size_t shard = SHARD_FOR_KEY(elem, shard_info.num_shards);
241  if (shard != shard_info.shard) {
242  continue;
243  }
244  if (elem == type_info.null_val) {
245  if (type_info.uses_bw_eq) {
246  elem = type_info.translated_null_val;
247  } else {
248  continue;
249  }
250  }
251 #ifndef __CUDACC__
252  if (sd_inner_to_outer_translation_map &&
253  (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
254  const auto outer_id = map_str_id_to_outer_dict(elem,
255  min_inner_elem,
256  type_info.min_val,
257  type_info.max_val,
258  sd_inner_to_outer_translation_map);
259  if (outer_id == StringDictionary::INVALID_STR_ID) {
260  continue;
261  }
262  elem = outer_id;
263  }
264 #endif
265  if (filling_func(elem, shard, index)) {
266  return -1;
267  }
268  }
269  return 0;
270 }
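// The sharded variant differs from fill_hash_join_buff_impl only in the
// SHARD_FOR_KEY filter: every worker still scans the whole column but inserts
// only the elements whose shard equals shard_info.shard, so each shard's
// slice of the table can be built and probed independently per device.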
271 
272 DEVICE int SUFFIX(fill_hash_join_buff_sharded_bucketized)(
273  int32_t* buff,
274  const int32_t invalid_slot_val,
275  const bool for_semi_join,
276  const JoinColumn join_column,
277  const JoinColumnTypeInfo type_info,
278  const ShardInfo shard_info,
279  const int32_t* sd_inner_to_outer_translation_map,
280  const int32_t min_inner_elem,
281  const int32_t cpu_thread_idx,
282  const int32_t cpu_thread_count,
283  const int64_t bucket_normalization) {
284  auto filling_func = for_semi_join ? SUFFIX(fill_hashtable_for_semi_join)
285  : SUFFIX(fill_one_to_one_hashtable);
286  auto hashtable_filling_func = [&](auto elem, auto shard, size_t index) {
287  auto entry_ptr = SUFFIX(get_bucketized_hash_slot_sharded_opt)(
288  buff,
289  elem,
290  type_info.min_val / bucket_normalization,
291  type_info.translated_null_val,
292  shard_info.entry_count_per_shard,
293  shard,
294  shard_info.num_shards,
295  shard_info.device_count,
296  bucket_normalization);
297  return filling_func(index, entry_ptr, invalid_slot_val);
298  };
299 
300  return fill_hash_join_buff_sharded_impl(buff,
301  join_column,
302  type_info,
303  shard_info,
304  sd_inner_to_outer_translation_map,
305  min_inner_elem,
306  cpu_thread_idx,
307  cpu_thread_count,
308  hashtable_filling_func);
309 }
310 
311 DEVICE int SUFFIX(fill_hash_join_buff_sharded)(
312  int32_t* buff,
313  const int32_t invalid_slot_val,
314  const bool for_semi_join,
315  const JoinColumn join_column,
316  const JoinColumnTypeInfo type_info,
317  const ShardInfo shard_info,
318  const int32_t* sd_inner_to_outer_translation_map,
319  const int32_t min_inner_elem,
320  const int32_t cpu_thread_idx,
321  const int32_t cpu_thread_count) {
322  auto filling_func = for_semi_join ? SUFFIX(fill_hashtable_for_semi_join)
323  : SUFFIX(fill_one_to_one_hashtable);
324  auto hashtable_filling_func = [&](auto elem, auto shard, size_t index) {
325  auto entry_ptr = SUFFIX(get_hash_slot_sharded_opt)(buff,
326  elem,
327  type_info.min_val,
328  shard_info.entry_count_per_shard,
329  shard,
330  shard_info.num_shards,
331  shard_info.device_count);
332  return filling_func(index, entry_ptr, invalid_slot_val);
333  };
334 
335  return fill_hash_join_buff_sharded_impl(buff,
336  join_column,
337  type_info,
338  shard_info,
339  sd_inner_to_outer_translation_map,
340  min_inner_elem,
341  cpu_thread_idx,
342  cpu_thread_count,
343  hashtable_filling_func);
344 }
345 
346 template <typename T>
347 DEVICE void SUFFIX(init_baseline_hash_join_buff)(int8_t* hash_buff,
348  const int64_t entry_count,
349  const size_t key_component_count,
350  const bool with_val_slot,
351  const int32_t invalid_slot_val,
352  const int32_t cpu_thread_idx,
353  const int32_t cpu_thread_count) {
354 #ifdef __CUDACC__
355  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
356  int32_t step = blockDim.x * gridDim.x;
357 #else
358  int32_t start = cpu_thread_idx;
359  int32_t step = cpu_thread_count;
360 #endif
361  auto hash_entry_size = (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
362  const T empty_key = SUFFIX(get_invalid_key)<T>();
363  for (int64_t h = start; h < entry_count; h += step) {
364  int64_t off = h * hash_entry_size;
365  auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
366  for (size_t i = 0; i < key_component_count; ++i) {
367  row_ptr[i] = empty_key;
368  }
369  if (with_val_slot) {
370  row_ptr[key_component_count] = invalid_slot_val;
371  }
372  }
373 }
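// Baseline (composite-key) tables store one fixed-size row per entry:
// key_component_count key slots, then an optional value slot, each sizeof(T)
// bytes. For example, two 64-bit key components plus a value slot make a
// 24-byte row, so entry h lives at byte offset h * 24.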
374 
375 #ifndef __CUDACC__
376 #ifdef HAVE_TBB
377 
378 template <typename T>
379 DEVICE void SUFFIX(init_baseline_hash_join_buff_tbb)(int8_t* hash_buff,
380  const int64_t entry_count,
381  const size_t key_component_count,
382  const bool with_val_slot,
383  const int32_t invalid_slot_val) {
384  const auto hash_entry_size =
385  (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
386  const T empty_key = SUFFIX(get_invalid_key)<T>();
387  tbb::parallel_for(tbb::blocked_range<int64_t>(0, entry_count),
388  [=](const tbb::blocked_range<int64_t>& r) {
389  const auto start_idx = r.begin();
390  const auto end_idx = r.end();
391  for (int64_t entry_idx = start_idx; entry_idx < end_idx;
392  ++entry_idx) {
393  const int64_t offset = entry_idx * hash_entry_size;
394  auto row_ptr = reinterpret_cast<T*>(hash_buff + offset);
395  for (size_t k = 0; k < key_component_count; ++k) {
396  row_ptr[k] = empty_key;
397  }
398  if (with_val_slot) {
399  row_ptr[key_component_count] = invalid_slot_val;
400  }
401  }
402  });
403 }
404 
405 #endif // #ifdef HAVE_TBB
406 #endif // #ifndef __CUDACC__
407 
408 #ifdef __CUDACC__
409 template <typename T>
410 __device__ T* get_matching_baseline_hash_slot_at(int8_t* hash_buff,
411  const uint32_t h,
412  const T* key,
413  const size_t key_component_count,
414  const int64_t hash_entry_size) {
415  uint32_t off = h * hash_entry_size;
416  auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
417  const T empty_key = SUFFIX(get_invalid_key)<T>();
418  {
419  const T old = atomicCAS(row_ptr, empty_key, *key);
420  if (empty_key == old && key_component_count > 1) {
421  for (int64_t i = 1; i <= key_component_count - 1; ++i) {
422  atomicExch(row_ptr + i, key[i]);
423  }
424  }
425  }
426  if (key_component_count > 1) {
427  while (atomicAdd(row_ptr + key_component_count - 1, 0) == empty_key) {
428  // spin until the winning thread has finished writing the entire key and the init
429  // value
430  }
431  }
432  bool match = true;
433  for (uint32_t i = 0; i < key_component_count; ++i) {
434  if (row_ptr[i] != key[i]) {
435  match = false;
436  break;
437  }
438  }
439 
440  if (match) {
441  return reinterpret_cast<T*>(row_ptr + key_component_count);
442  }
443  return nullptr;
444 }
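// The CUDA path publishes a key in two steps: atomicCAS claims the slot by
// installing the first key component, then atomicExch fills in the remaining
// components. Readers spin on the *last* component, which stays equal to
// empty_key until the winning writer stores it; the scheme therefore assumes
// no valid key component ever equals the get_invalid_key sentinel.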
445 #else
446 
447 #ifdef _MSC_VER
448 #define cas_cst(ptr, expected, desired) \
449  (InterlockedCompareExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
450  reinterpret_cast<void*>(&desired), \
451  expected) == expected)
452 #define store_cst(ptr, val) \
453  InterlockedExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
454  reinterpret_cast<void*>(val))
455 #define load_cst(ptr) \
456  InterlockedCompareExchange(reinterpret_cast<volatile long*>(ptr), 0, 0)
457 #else
458 #define cas_cst(ptr, expected, desired) \
459  __atomic_compare_exchange_n( \
460  ptr, expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
461 #define store_cst(ptr, val) __atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)
462 #define load_cst(ptr) __atomic_load_n(ptr, __ATOMIC_SEQ_CST)
463 #endif
464 
465 template <typename T>
466 T* get_matching_baseline_hash_slot_at(int8_t* hash_buff,
467  const uint32_t h,
468  const T* key,
469  const size_t key_component_count,
470  const int64_t hash_entry_size) {
471  uint32_t off = h * hash_entry_size;
472  auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
473  T empty_key = SUFFIX(get_invalid_key)<T>();
474  T write_pending = SUFFIX(get_invalid_key)<T>() - 1;
475  if (UNLIKELY(*key == write_pending)) {
476  // Address the singularity case where the first column contains the pending
477  // write special value. Should never happen, but avoid doing wrong things.
478  return nullptr;
479  }
480  const bool success = cas_cst(row_ptr, &empty_key, write_pending);
481  if (success) {
482  if (key_component_count > 1) {
483  memcpy(row_ptr + 1, key + 1, (key_component_count - 1) * sizeof(T));
484  }
485  store_cst(row_ptr, *key);
486  return reinterpret_cast<T*>(row_ptr + key_component_count);
487  }
488  while (load_cst(row_ptr) == write_pending) {
489  // spin until the winning thread has finished writing the entire key
490  }
491  for (size_t i = 0; i < key_component_count; ++i) {
492  if (load_cst(row_ptr + i) != key[i]) {
493  return nullptr;
494  }
495  }
496  return reinterpret_cast<T*>(row_ptr + key_component_count);
497 }
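// The CPU path uses a three-state protocol on the first key component:
// empty_key (free), write_pending (claimed, payload incomplete), and the real
// key value (published). A sketch of the interleaving for one contended slot:
//
//   writer: cas_cst(slot, &empty_key, write_pending)   // claim the slot
//           memcpy the remaining key components         // fill the payload
//           store_cst(slot, key[0])                     // publish
//   reader: while (load_cst(slot) == write_pending) {}  // wait out the writer
//           compare all components via load_cst         // match or probe on
//
// Sequentially consistent atomics order the memcpy before the publishing
// store, so a reader that observes key[0] also observes the full key.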
498 
499 #undef load_cst
500 #undef store_cst
501 #undef cas_cst
502 
503 #endif // __CUDACC__
504 
505 template <typename T>
506 DEVICE int write_baseline_hash_slot(const int32_t val,
507  int8_t* hash_buff,
508  const int64_t entry_count,
509  const T* key,
510  const size_t key_component_count,
511  const bool with_val_slot,
512  const int32_t invalid_slot_val,
513  const size_t key_size_in_bytes,
514  const size_t hash_entry_size) {
515  const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
516  T* matching_group = get_matching_baseline_hash_slot_at(
517  hash_buff, h, key, key_component_count, hash_entry_size);
518  if (!matching_group) {
519  uint32_t h_probe = (h + 1) % entry_count;
520  while (h_probe != h) {
521  matching_group = get_matching_baseline_hash_slot_at(
522  hash_buff, h_probe, key, key_component_count, hash_entry_size);
523  if (matching_group) {
524  break;
525  }
526  h_probe = (h_probe + 1) % entry_count;
527  }
528  }
529  if (!matching_group) {
530  return -2;
531  }
532  if (!with_val_slot) {
533  return 0;
534  }
535  if (mapd_cas(matching_group, invalid_slot_val, val) != invalid_slot_val) {
536  return -1;
537  }
538  return 0;
539 }
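// write_baseline_hash_slot probes linearly from the Murmur bucket and wraps
// around at most once. The return codes separate the failure modes: -2 means
// the probe returned to h without finding the key or a free slot (table
// full); -1 means the key exists but already carries a different row id (not
// one-to-one); 0 means the key, and the row id when with_val_slot is set,
// were stored.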
540 
541 template <typename T>
542 DEVICE int write_baseline_hash_slot_for_semi_join(const int32_t val,
543  int8_t* hash_buff,
544  const int64_t entry_count,
545  const T* key,
546  const size_t key_component_count,
547  const bool with_val_slot,
548  const int32_t invalid_slot_val,
549  const size_t key_size_in_bytes,
550  const size_t hash_entry_size) {
551  const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
552  T* matching_group = get_matching_baseline_hash_slot_at(
553  hash_buff, h, key, key_component_count, hash_entry_size);
554  if (!matching_group) {
555  uint32_t h_probe = (h + 1) % entry_count;
556  while (h_probe != h) {
557  matching_group = get_matching_baseline_hash_slot_at(
558  hash_buff, h_probe, key, key_component_count, hash_entry_size);
559  if (matching_group) {
560  break;
561  }
562  h_probe = (h_probe + 1) % entry_count;
563  }
564  }
565  if (!matching_group) {
566  return -2;
567  }
568  if (!with_val_slot) {
569  return 0;
570  }
571  mapd_cas(matching_group, invalid_slot_val, val);
572  return 0;
573 }
574 
575 template <typename T, typename FILL_HANDLER>
576 DEVICE int fill_baseline_hash_join_buff(int8_t* hash_buff,
577  const int64_t entry_count,
578  const int32_t invalid_slot_val,
579  const bool for_semi_join,
580  const size_t key_component_count,
581  const bool with_val_slot,
582  const FILL_HANDLER* f,
583  const int64_t num_elems,
584  const int32_t cpu_thread_idx,
585  const int32_t cpu_thread_count) {
586 #ifdef __CUDACC__
587  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
588  int32_t step = blockDim.x * gridDim.x;
589 #else
590  int32_t start = cpu_thread_idx;
591  int32_t step = cpu_thread_count;
592 #endif
593 
594  T key_scratch_buff[g_maximum_conditions_to_coalesce];
595  const size_t key_size_in_bytes = key_component_count * sizeof(T);
596  const size_t hash_entry_size =
597  (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
598  auto key_buff_handler = [hash_buff,
599  entry_count,
600  with_val_slot,
601  invalid_slot_val,
602  key_size_in_bytes,
603  hash_entry_size,
604  &for_semi_join](const int64_t entry_idx,
605  const T* key_scratch_buffer,
606  const size_t key_component_count) {
607  if (for_semi_join) {
608  return write_baseline_hash_slot_for_semi_join<T>(entry_idx,
609  hash_buff,
610  entry_count,
611  key_scratch_buffer,
612  key_component_count,
613  with_val_slot,
614  invalid_slot_val,
615  key_size_in_bytes,
616  hash_entry_size);
617  } else {
618  return write_baseline_hash_slot<T>(entry_idx,
619  hash_buff,
620  entry_count,
621  key_scratch_buffer,
622  key_component_count,
623  with_val_slot,
624  invalid_slot_val,
625  key_size_in_bytes,
626  hash_entry_size);
627  }
628  };
629 
630  JoinColumnTuple cols(
631  f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
632  for (auto& it : cols.slice(start, step)) {
633  const auto err = (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
634  if (err) {
635  return err;
636  }
637  }
638  return 0;
639 }
640 
641 #undef mapd_cas
642 
643 #ifdef __CUDACC__
644 #define mapd_add(address, val) atomicAdd(address, val)
645 #elif defined(_MSC_VER)
646 #define mapd_add(address, val) \
647  InterlockedExchangeAdd(reinterpret_cast<volatile long*>(address), \
648  static_cast<long>(val))
649 #else
650 #define mapd_add(address, val) __sync_fetch_and_add(address, val)
651 #endif
652 
653 template <typename SLOT_SELECTOR>
654 DEVICE void count_matches_impl(int32_t* count_buff,
655  const JoinColumn join_column,
656  const JoinColumnTypeInfo type_info
657 #ifndef __CUDACC__
658  ,
659  const int32_t* sd_inner_to_outer_translation_map,
660  const int32_t min_inner_elem,
661  const int32_t cpu_thread_idx,
662  const int32_t cpu_thread_count
663 #endif
664  ,
665  SLOT_SELECTOR slot_selector) {
666 #ifdef __CUDACC__
667  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
668  int32_t step = blockDim.x * gridDim.x;
669 #else
670  int32_t start = cpu_thread_idx;
671  int32_t step = cpu_thread_count;
672 #endif
673  JoinColumnTyped col{&join_column, &type_info};
674  for (auto item : col.slice(start, step)) {
675  int64_t elem = item.element;
676  if (elem == type_info.null_val) {
677  if (type_info.uses_bw_eq) {
678  elem = type_info.translated_null_val;
679  } else {
680  continue;
681  }
682  }
683 #ifndef __CUDACC__
684  if (sd_inner_to_outer_translation_map &&
685  (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
686  const auto outer_id = map_str_id_to_outer_dict(elem,
687  min_inner_elem,
688  type_info.min_val,
689  type_info.max_val,
690  sd_inner_to_outer_translation_map);
691  if (outer_id == StringDictionary::INVALID_STR_ID) {
692  continue;
693  }
694  elem = outer_id;
695  }
696 #endif
697  auto* entry_ptr = slot_selector(count_buff, elem);
698  mapd_add(entry_ptr, int32_t(1));
699  }
700 }
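// count_matches_impl is the first of the two passes that build a one-to-many
// table: it only increments a per-bucket counter via mapd_add, applying
// exactly the same null translation and dictionary mapping as the later fill
// pass so that both passes agree on which rows participate.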
701 
702 GLOBAL void SUFFIX(count_matches)(int32_t* count_buff,
703  const JoinColumn join_column,
704  const JoinColumnTypeInfo type_info
705 #ifndef __CUDACC__
706  ,
707  const int32_t* sd_inner_to_outer_translation_map,
708  const int32_t min_inner_elem,
709  const int32_t cpu_thread_idx,
710  const int32_t cpu_thread_count
711 #endif
712 ) {
713  auto slot_sel = [&type_info](auto count_buff, auto elem) {
714  return SUFFIX(get_hash_slot)(count_buff, elem, type_info.min_val);
715  };
716  count_matches_impl(count_buff,
717  join_column,
718  type_info
719 #ifndef __CUDACC__
720  ,
721  sd_inner_to_outer_translation_map,
722  min_inner_elem,
723  cpu_thread_idx,
724  cpu_thread_count
725 #endif
726  ,
727  slot_sel);
728 }
729 
730 GLOBAL void SUFFIX(count_matches_bucketized)(
731  int32_t* count_buff,
732  const JoinColumn join_column,
733  const JoinColumnTypeInfo type_info
734 #ifndef __CUDACC__
735  ,
736  const int32_t* sd_inner_to_outer_translation_map,
737  const int32_t min_inner_elem,
738  const int32_t cpu_thread_idx,
739  const int32_t cpu_thread_count
740 #endif
741  ,
742  const int64_t bucket_normalization) {
743  auto slot_sel = [bucket_normalization, &type_info](auto count_buff, auto elem) {
744  return SUFFIX(get_bucketized_hash_slot)(count_buff,
745  elem,
746  type_info.min_val / bucket_normalization,
747  type_info.translated_null_val,
748  bucket_normalization);
749  };
750  count_matches_impl(count_buff,
751  join_column,
752  type_info
753 #ifndef __CUDACC__
754  ,
755  sd_inner_to_outer_translation_map,
756  min_inner_elem,
757  cpu_thread_idx,
758  cpu_thread_count
759 #endif
760  ,
761  slot_sel);
762 }
763 
764 GLOBAL void SUFFIX(count_matches_sharded)(
765  int32_t* count_buff,
766  const JoinColumn join_column,
767  const JoinColumnTypeInfo type_info,
768  const ShardInfo shard_info
769 #ifndef __CUDACC__
770  ,
771  const int32_t* sd_inner_to_outer_translation_map,
772  const int32_t min_inner_elem,
773  const int32_t cpu_thread_idx,
774  const int32_t cpu_thread_count
775 #endif
776 ) {
777 #ifdef __CUDACC__
778  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
779  int32_t step = blockDim.x * gridDim.x;
780 #else
781  int32_t start = cpu_thread_idx;
782  int32_t step = cpu_thread_count;
783 #endif
784  JoinColumnTyped col{&join_column, &type_info};
785  for (auto item : col.slice(start, step)) {
786  int64_t elem = item.element;
787  if (elem == type_info.null_val) {
788  if (type_info.uses_bw_eq) {
789  elem = type_info.translated_null_val;
790  } else {
791  continue;
792  }
793  }
794 #ifndef __CUDACC__
795  if (sd_inner_to_outer_translation_map &&
796  (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
797  const auto outer_id = map_str_id_to_outer_dict(elem,
798  min_inner_elem,
799  type_info.min_val,
800  type_info.max_val,
801  sd_inner_to_outer_translation_map);
802  if (outer_id == StringDictionary::INVALID_STR_ID) {
803  continue;
804  }
805  elem = outer_id;
806  }
807 #endif
808  int32_t* entry_ptr = SUFFIX(get_hash_slot_sharded)(count_buff,
809  elem,
810  type_info.min_val,
811  shard_info.entry_count_per_shard,
812  shard_info.num_shards,
813  shard_info.device_count);
814  mapd_add(entry_ptr, int32_t(1));
815  }
816 }
817 
818 template <typename T>
819 DEVICE NEVER_INLINE const T* SUFFIX(get_matching_baseline_hash_slot_readonly)(
820  const T* key,
821  const size_t key_component_count,
822  const T* composite_key_dict,
823  const int64_t entry_count,
824  const size_t key_size_in_bytes) {
825  const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
826  uint32_t off = h * key_component_count;
827  if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
828  return &composite_key_dict[off];
829  }
830  uint32_t h_probe = (h + 1) % entry_count;
831  while (h_probe != h) {
832  off = h_probe * key_component_count;
833  if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
834  return &composite_key_dict[off];
835  }
836  h_probe = (h_probe + 1) % entry_count;
837  }
838 #ifndef __CUDACC__
839  CHECK(false);
840 #else
841  assert(false);
842 #endif
843  return nullptr;
844 }
845 
846 template <typename T, typename KEY_HANDLER>
847 GLOBAL void SUFFIX(count_matches_baseline)(int32_t* count_buff,
848  const T* composite_key_dict,
849  const int64_t entry_count,
850  const KEY_HANDLER* f,
851  const int64_t num_elems
852 #ifndef __CUDACC__
853  ,
854  const int32_t cpu_thread_idx,
855  const int32_t cpu_thread_count
856 #endif
857 ) {
858 #ifdef __CUDACC__
859  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
860  int32_t step = blockDim.x * gridDim.x;
861 #else
862  int32_t start = cpu_thread_idx;
863  int32_t step = cpu_thread_count;
864 #endif
865 #ifdef __CUDACC__
866  assert(composite_key_dict);
867 #endif
868  T key_scratch_buff[g_maximum_conditions_to_coalesce];
869  const size_t key_size_in_bytes = f->get_key_component_count() * sizeof(T);
870  auto key_buff_handler = [composite_key_dict,
871  entry_count,
872  count_buff,
873  key_size_in_bytes](const int64_t row_entry_idx,
874  const T* key_scratch_buff,
875  const size_t key_component_count) {
876  const auto matching_group =
877  SUFFIX(get_matching_baseline_hash_slot_readonly)(key_scratch_buff,
878  key_component_count,
879  composite_key_dict,
880  entry_count,
881  key_size_in_bytes);
882  const auto entry_idx = (matching_group - composite_key_dict) / key_component_count;
883  mapd_add(&count_buff[entry_idx], int32_t(1));
884  return 0;
885  };
886 
887  JoinColumnTuple cols(
888  f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
889  for (auto& it : cols.slice(start, step)) {
890  (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
891  }
892 }
893 
894 template <typename SLOT_SELECTOR>
895 DEVICE void fill_row_ids_impl(int32_t* buff,
896  const int64_t hash_entry_count,
897  const JoinColumn join_column,
898  const JoinColumnTypeInfo type_info
899 #ifndef __CUDACC__
900  ,
901  const int32_t* sd_inner_to_outer_translation_map,
902  const int32_t min_inner_elem,
903  const int32_t cpu_thread_idx,
904  const int32_t cpu_thread_count
905 #endif
906  ,
907  SLOT_SELECTOR slot_selector) {
908  int32_t* pos_buff = buff;
909  int32_t* count_buff = buff + hash_entry_count;
910  int32_t* id_buff = count_buff + hash_entry_count;
911 #ifdef __CUDACC__
912  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
913  int32_t step = blockDim.x * gridDim.x;
914 #else
915  int32_t start = cpu_thread_idx;
916  int32_t step = cpu_thread_count;
917 #endif
918  JoinColumnTyped col{&join_column, &type_info};
919  for (auto item : col.slice(start, step)) {
920  const size_t index = item.index;
921  int64_t elem = item.element;
922  if (elem == type_info.null_val) {
923  if (type_info.uses_bw_eq) {
924  elem = type_info.translated_null_val;
925  } else {
926  continue;
927  }
928  }
929 #ifndef __CUDACC__
930  if (sd_inner_to_outer_translation_map &&
931  (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
932  const auto outer_id = map_str_id_to_outer_dict(elem,
933  min_inner_elem,
934  type_info.min_val,
935  type_info.max_val,
936  sd_inner_to_outer_translation_map);
937  if (outer_id == StringDictionary::INVALID_STR_ID) {
938  continue;
939  }
940  elem = outer_id;
941  }
942 #endif
943  auto pos_ptr = slot_selector(pos_buff, elem);
944  const auto bin_idx = pos_ptr - pos_buff;
945  const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
946  id_buff[id_buff_idx] = static_cast<int32_t>(index);
947  }
948 }
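// The buffer filled above is three consecutive int32 sections:
// buff = [pos | count | ids], the first two hash_entry_count wide. A small
// worked example with illustrative numbers: for hash_entry_count = 3 and
// per-bucket match counts {2, 0, 1}, the caller's exclusive scan produces
// pos = {0, 2, 2}; this pass then re-counts atomically, so the n-th match of
// bucket b lands at ids[pos[b] + n].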
949 
950 template <typename SLOT_SELECTOR>
951 DEVICE void fill_row_ids_for_window_framing_impl(
952  int32_t* buff,
953  const int64_t hash_entry_count,
954  const JoinColumn join_column,
955  const JoinColumnTypeInfo type_info
956 #ifndef __CUDACC__
957  ,
958  const int32_t* sd_inner_to_outer_translation_map,
959  const int32_t min_inner_elem,
960  const int32_t cpu_thread_idx,
961  const int32_t cpu_thread_count
962 #endif
963  ,
964  SLOT_SELECTOR slot_selector) {
965  int32_t* pos_buff = buff;
966  int32_t* count_buff = buff + hash_entry_count;
967  int32_t* id_buff = count_buff + hash_entry_count;
968  int32_t* reversed_id_buff = id_buff + join_column.num_elems;
969 #ifdef __CUDACC__
970  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
971  int32_t step = blockDim.x * gridDim.x;
972 #else
973  int32_t start = cpu_thread_idx;
974  int32_t step = cpu_thread_count;
975 
976 #endif
977  if (join_column.num_elems == 0) {
978  return;
979  }
980  JoinColumnTyped col{&join_column, &type_info};
981  bool all_nulls = hash_entry_count == 1 && type_info.min_val == 0 &&
982  type_info.max_val == -1 &&
983  (*col.begin()).element == type_info.null_val;
984  if (all_nulls) {
985  int32_t thread_idx = -1;
986 #ifdef __CUDACC__
987  thread_idx = threadIdx.x;
988 #else
989  thread_idx = cpu_thread_idx;
990 #endif
991  if (thread_idx == 0) {
992  pos_buff[0] = 0;
993  count_buff[0] = join_column.num_elems - 1;
994  for (size_t i = 0; i < join_column.num_elems; i++) {
995  reversed_id_buff[i] = i;
996  }
997  }
998  return;
999  }
1000  for (auto item : col.slice(start, step)) {
1001  const size_t index = item.index;
1002  int64_t elem = item.element;
1003  if (elem == type_info.null_val) {
1004  elem = type_info.translated_null_val;
1005  }
1006 #ifndef __CUDACC__
1007  if (sd_inner_to_outer_translation_map && elem != type_info.translated_null_val) {
1008  const auto outer_id = map_str_id_to_outer_dict(elem,
1009  min_inner_elem,
1010  type_info.min_val,
1011  type_info.max_val,
1012  sd_inner_to_outer_translation_map);
1013  if (outer_id == StringDictionary::INVALID_STR_ID) {
1014  continue;
1015  }
1016  elem = outer_id;
1017  }
1018 #endif
1019  auto pos_ptr = slot_selector(pos_buff, elem);
1020  const auto bin_idx = pos_ptr - pos_buff;
1021  auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
1022  id_buff[id_buff_idx] = static_cast<int32_t>(index);
1023  reversed_id_buff[index] = id_buff_idx;
1024  }
1025 }
1026 
1027 GLOBAL void SUFFIX(fill_row_ids)(int32_t* buff,
1028  const int64_t hash_entry_count,
1029  const JoinColumn join_column,
1030  const JoinColumnTypeInfo type_info,
1031  const bool for_window_framing
1032 #ifndef __CUDACC__
1033  ,
1034  const int32_t* sd_inner_to_outer_translation_map,
1035  const int32_t min_inner_elem,
1036  const int32_t cpu_thread_idx,
1037  const int32_t cpu_thread_count
1038 #endif
1039 ) {
1040  auto slot_sel = [&type_info](auto pos_buff, auto elem) {
1041  return SUFFIX(get_hash_slot)(pos_buff, elem, type_info.min_val);
1042  };
1043 
1044  if (!for_window_framing) {
1045  fill_row_ids_impl(buff,
1046  hash_entry_count,
1047  join_column,
1048  type_info
1049 #ifndef __CUDACC__
1050  ,
1051  sd_inner_to_outer_translation_map,
1052  min_inner_elem,
1053  cpu_thread_idx,
1054  cpu_thread_count
1055 #endif
1056  ,
1057  slot_sel);
1058  } else {
1059  fill_row_ids_for_window_framing_impl(buff,
1060  hash_entry_count,
1061  join_column,
1062  type_info
1063 #ifndef __CUDACC__
1064  ,
1065  sd_inner_to_outer_translation_map,
1066  min_inner_elem,
1067  cpu_thread_idx,
1068  cpu_thread_count
1069 #endif
1070  ,
1071  slot_sel);
1072  }
1073 }
1074 
1075 GLOBAL void SUFFIX(fill_row_ids_bucketized)(
1076  int32_t* buff,
1077  const int64_t hash_entry_count,
1078  const JoinColumn join_column,
1079  const JoinColumnTypeInfo type_info
1080 #ifndef __CUDACC__
1081  ,
1082  const int32_t* sd_inner_to_outer_translation_map,
1083  const int32_t min_inner_elem,
1084  const int32_t cpu_thread_idx,
1085  const int32_t cpu_thread_count
1086 #endif
1087  ,
1088  const int64_t bucket_normalization) {
1089  auto slot_sel = [&type_info, bucket_normalization](auto pos_buff, auto elem) {
1090  return SUFFIX(get_bucketized_hash_slot)(pos_buff,
1091  elem,
1092  type_info.min_val / bucket_normalization,
1093  type_info.translated_null_val,
1094  bucket_normalization);
1095  };
1096 
1097  fill_row_ids_impl(buff,
1098  hash_entry_count,
1099  join_column,
1100  type_info
1101 #ifndef __CUDACC__
1102  ,
1103  sd_inner_to_outer_translation_map,
1104  min_inner_elem,
1105  cpu_thread_idx,
1106  cpu_thread_count
1107 #endif
1108  ,
1109  slot_sel);
1110 }
1111 
1112 template <typename SLOT_SELECTOR>
1113 DEVICE void fill_row_ids_sharded_impl(int32_t* buff,
1114  const int64_t hash_entry_count,
1115  const JoinColumn join_column,
1116  const JoinColumnTypeInfo type_info,
1117  const ShardInfo shard_info
1118 #ifndef __CUDACC__
1119  ,
1120  const int32_t* sd_inner_to_outer_translation_map,
1121  const int32_t min_inner_elem,
1122  const int32_t cpu_thread_idx,
1123  const int32_t cpu_thread_count
1124 #endif
1125  ,
1126  SLOT_SELECTOR slot_selector) {
1127 
1128  int32_t* pos_buff = buff;
1129  int32_t* count_buff = buff + hash_entry_count;
1130  int32_t* id_buff = count_buff + hash_entry_count;
1131 
1132 #ifdef __CUDACC__
1133  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
1134  int32_t step = blockDim.x * gridDim.x;
1135 #else
1136  int32_t start = cpu_thread_idx;
1137  int32_t step = cpu_thread_count;
1138 #endif
1139  JoinColumnTyped col{&join_column, &type_info};
1140  for (auto item : col.slice(start, step)) {
1141  const size_t index = item.index;
1142  int64_t elem = item.element;
1143  if (elem == type_info.null_val) {
1144  if (type_info.uses_bw_eq) {
1145  elem = type_info.translated_null_val;
1146  } else {
1147  continue;
1148  }
1149  }
1150 #ifndef __CUDACC__
1151  if (sd_inner_to_outer_translation_map &&
1152  (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
1153  const auto outer_id = map_str_id_to_outer_dict(elem,
1154  min_inner_elem,
1155  type_info.min_val,
1156  type_info.max_val,
1157  sd_inner_to_outer_translation_map);
1158  if (outer_id == StringDictionary::INVALID_STR_ID) {
1159  continue;
1160  }
1161  elem = outer_id;
1162  }
1163 #endif
1164  auto* pos_ptr = slot_selector(pos_buff, elem);
1165  const auto bin_idx = pos_ptr - pos_buff;
1166  const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
1167  id_buff[id_buff_idx] = static_cast<int32_t>(index);
1168  }
1169 }
1170 
1171 GLOBAL void SUFFIX(fill_row_ids_sharded)(int32_t* buff,
1172  const int64_t hash_entry_count,
1173  const JoinColumn join_column,
1174  const JoinColumnTypeInfo type_info,
1175  const ShardInfo shard_info
1176 #ifndef __CUDACC__
1177  ,
1178  const int32_t* sd_inner_to_outer_translation_map,
1179  const int32_t min_inner_elem,
1180  const int32_t cpu_thread_idx,
1181  const int32_t cpu_thread_count
1182 #endif
1183 ) {
1184  auto slot_sel = [&type_info, &shard_info](auto pos_buff, auto elem) {
1185  return SUFFIX(get_hash_slot_sharded)(pos_buff,
1186  elem,
1187  type_info.min_val,
1188  shard_info.entry_count_per_shard,
1189  shard_info.num_shards,
1190  shard_info.device_count);
1191  };
1192  fill_row_ids_impl(buff,
1193  hash_entry_count,
1194  join_column,
1195  type_info
1196 #ifndef __CUDACC__
1197  ,
1198  sd_inner_to_outer_translation_map,
1199  min_inner_elem,
1200  cpu_thread_idx,
1201  cpu_thread_count
1202 #endif
1203  ,
1204  slot_sel);
1205 }
1206 
1207 GLOBAL void SUFFIX(fill_row_ids_sharded_bucketized)(
1208  int32_t* buff,
1209  const int64_t hash_entry_count,
1210  const JoinColumn join_column,
1211  const JoinColumnTypeInfo type_info,
1212  const ShardInfo shard_info
1213 #ifndef __CUDACC__
1214  ,
1215  const int32_t* sd_inner_to_outer_translation_map,
1216  const int32_t min_inner_elem,
1217  const int32_t cpu_thread_idx,
1218  const int32_t cpu_thread_count
1219 #endif
1220  ,
1221  const int64_t bucket_normalization) {
1222  auto slot_sel = [&shard_info, &type_info, bucket_normalization](auto pos_buff,
1223  auto elem) {
1224  return SUFFIX(get_bucketized_hash_slot_sharded)(
1225  pos_buff,
1226  elem,
1227  type_info.min_val / bucket_normalization,
1228  type_info.translated_null_val,
1229  shard_info.entry_count_per_shard,
1230  shard_info.num_shards,
1231  shard_info.device_count,
1232  bucket_normalization);
1233  };
1234 
1235  fill_row_ids_impl(buff,
1236  hash_entry_count,
1237  join_column,
1238  type_info
1239 #ifndef __CUDACC__
1240  ,
1241  sd_inner_to_outer_translation_map,
1242  min_inner_elem,
1243  cpu_thread_idx,
1244  cpu_thread_count
1245 #endif
1246  ,
1247  slot_sel);
1248 }
1249 
1250 template <typename T, typename KEY_HANDLER>
1251 GLOBAL void SUFFIX(fill_row_ids_baseline)(int32_t* buff,
1252  const T* composite_key_dict,
1253  const int64_t hash_entry_count,
1254  const KEY_HANDLER* f,
1255  const int64_t num_elems,
1256  const bool for_window_framing
1257 #ifndef __CUDACC__
1258  ,
1259  const int32_t cpu_thread_idx,
1260  const int32_t cpu_thread_count
1261 #endif
1262 ) {
1263  int32_t* pos_buff = buff;
1264  int32_t* count_buff = buff + hash_entry_count;
1265  int32_t* id_buff = count_buff + hash_entry_count;
1266  int32_t* reversed_id_buff = for_window_framing ? id_buff + num_elems : nullptr;
1267 #ifdef __CUDACC__
1268  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
1269  int32_t step = blockDim.x * gridDim.x;
1270 #else
1271  int32_t start = cpu_thread_idx;
1272  int32_t step = cpu_thread_count;
1273 #endif
1274 
1275  T key_scratch_buff[g_maximum_conditions_to_coalesce];
1276 #ifdef __CUDACC__
1277  assert(composite_key_dict);
1278 #endif
1279  const size_t key_size_in_bytes = f->get_key_component_count() * sizeof(T);
1280  auto key_buff_handler = [composite_key_dict,
1281  hash_entry_count,
1282  pos_buff,
1283  count_buff,
1284  id_buff,
1285  reversed_id_buff,
1286  key_size_in_bytes,
1287  for_window_framing](const int64_t row_index,
1288  const T* key_scratch_buff,
1289  const size_t key_component_count) {
1290  const T* matching_group =
1291  SUFFIX(get_matching_baseline_hash_slot_readonly)(key_scratch_buff,
1292  key_component_count,
1293  composite_key_dict,
1294  hash_entry_count,
1295  key_size_in_bytes);
1296  const auto entry_idx = (matching_group - composite_key_dict) / key_component_count;
1297  int32_t* pos_ptr = pos_buff + entry_idx;
1298  const auto bin_idx = pos_ptr - pos_buff;
1299  const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
1300  id_buff[id_buff_idx] = static_cast<int32_t>(row_index);
1301  if (for_window_framing) {
1302  reversed_id_buff[row_index] = id_buff_idx;
1303  }
1304  return 0;
1305  };
1306 
1307  JoinColumnTuple cols(
1308  f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
1309  for (auto& it : cols.slice(start, step)) {
1310  (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
1311  }
1312  return;
1313 }
1314 
1315 #undef mapd_add
1316 
1317 template <typename KEY_HANDLER>
1318 GLOBAL void SUFFIX(approximate_distinct_tuples_impl)(uint8_t* hll_buffer,
1319  int32_t* row_count_buffer,
1320  const uint32_t b,
1321  const int64_t num_elems,
1322  const KEY_HANDLER* f
1323 #ifndef __CUDACC__
1324  ,
1325  const int32_t cpu_thread_idx,
1326  const int32_t cpu_thread_count
1327 #endif
1328 ) {
1329 #ifdef __CUDACC__
1330  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
1331  int32_t step = blockDim.x * gridDim.x;
1332 #else
1333  int32_t start = cpu_thread_idx;
1334  int32_t step = cpu_thread_count;
1335 #endif
1336 
1337  auto key_buff_handler = [b, hll_buffer, row_count_buffer](
1338  const int64_t entry_idx,
1339  const int64_t* key_scratch_buff,
1340  const size_t key_component_count) {
1341  if (row_count_buffer) {
1342  row_count_buffer[entry_idx] += 1;
1343  }
1344 
1345  const uint64_t hash =
1346  MurmurHash64AImpl(key_scratch_buff, key_component_count * sizeof(int64_t), 0);
1347  const uint32_t index = hash >> (64 - b);
1348  const auto rank = get_rank(hash << b, 64 - b);
1349 #ifdef __CUDACC__
1350  atomicMax(reinterpret_cast<int32_t*>(hll_buffer) + index, rank);
1351 #else
1352  hll_buffer[index] = std::max(hll_buffer[index], rank);
1353 #endif
1354 
1355  return 0;
1356  };
1357 
1358  int64_t key_scratch_buff[g_maximum_conditions_to_coalesce];
1359 
1360  JoinColumnTuple cols(
1361  f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
1362  for (auto& it : cols.slice(start, step)) {
1363  (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
1364  }
1365 }
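// The kernel above implements standard HyperLogLog: the top b bits of the
// 64-bit Murmur hash select one of 2^b registers, and the remaining 64 - b
// bits supply the rank, i.e. one plus the number of leading zeros after the
// shift (a pattern starting with 1 has rank 1, one starting 001... has rank
// 3). Each register keeps the maximum rank seen, via atomicMax on GPU and
// std::max on CPU; row_count_buffer optionally tracks per-entry row counts.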
1366 
1367 #ifdef __CUDACC__
1368 namespace {
1369 // TODO(adb): put these in a header file so they are not duplicated between here and
1370 // cuda_mapd_rt.cu
1371 __device__ double atomicMin(double* address, double val) {
1372  unsigned long long int* address_as_ull = (unsigned long long int*)address;
1373  unsigned long long int old = *address_as_ull, assumed;
1374 
1375  do {
1376  assumed = old;
1377  old = atomicCAS(address_as_ull,
1378  assumed,
1379  __double_as_longlong(min(val, __longlong_as_double(assumed))));
1380  } while (assumed != old);
1381 
1382  return __longlong_as_double(old);
1383 }
1384 } // namespace
1385 #endif
1386 
1387 template <size_t N>
1388 GLOBAL void SUFFIX(compute_bucket_sizes_impl)(double* bucket_sizes_for_thread,
1389  const JoinColumn* join_column,
1390  const JoinColumnTypeInfo* type_info,
1391  const double* bucket_size_thresholds
1392 #ifndef __CUDACC__
1393  ,
1394  const int32_t cpu_thread_idx,
1395  const int32_t cpu_thread_count
1396 #endif
1397 ) {
1398 #ifdef __CUDACC__
1399  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
1400  int32_t step = blockDim.x * gridDim.x;
1401 #else
1402  int32_t start = cpu_thread_idx;
1403  int32_t step = cpu_thread_count;
1404 #endif
1405  JoinColumnIterator it(join_column, type_info, start, step);
1406  for (; it; ++it) {
1407  // We expect the bounds column to be (min, max) e.g. (x_min, y_min, x_max, y_max)
1408  double bounds[2 * N];
1409  for (size_t j = 0; j < 2 * N; j++) {
1410  bounds[j] = SUFFIX(fixed_width_double_decode_noinline)(it.ptr(), j);
1411  }
1412 
1413  for (size_t j = 0; j < N; j++) {
1414  const auto diff = bounds[j + N] - bounds[j];
1415 #ifdef __CUDACC__
1416  if (diff > bucket_size_thresholds[j]) {
1417  atomicMin(&bucket_sizes_for_thread[j], diff);
1418  }
1419 #else
1420  if (diff < bucket_size_thresholds[j] && diff > bucket_sizes_for_thread[j]) {
1421  bucket_sizes_for_thread[j] = diff;
1422  }
1423 #endif
1424  }
1425  }
1426 }
1427 
1428 #ifndef __CUDACC__
1429 
1430 template <typename InputIterator, typename OutputIterator>
1431 void inclusive_scan(InputIterator first,
1432  InputIterator last,
1433  OutputIterator out,
1434  const size_t thread_count) {
1435  using ElementType = typename InputIterator::value_type;
1436  using OffsetType = typename InputIterator::difference_type;
1437  const OffsetType elem_count = last - first;
1438  if (elem_count < 10000 || thread_count <= 1) {
1439  ElementType sum = 0;
1440  for (auto iter = first; iter != last; ++iter, ++out) {
1441  *out = sum += *iter;
1442  }
1443  return;
1444  }
1445 
1446  const OffsetType step = (elem_count + thread_count - 1) / thread_count;
1447  OffsetType start_off = 0;
1448  OffsetType end_off = std::min(step, elem_count);
1449  std::vector<ElementType> partial_sums(thread_count);
1450  std::vector<std::future<void>> counter_threads;
1451  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx,
1452  start_off = std::min(start_off + step, elem_count),
1453  end_off = std::min(start_off + step, elem_count)) {
1454  counter_threads.push_back(std::async(
1455  std::launch::async,
1456  [first, out](
1457  ElementType& partial_sum, const OffsetType start, const OffsetType end) {
1458  ElementType sum = 0;
1459  for (auto in_iter = first + start, out_iter = out + start;
1460  in_iter != (first + end);
1461  ++in_iter, ++out_iter) {
1462  *out_iter = sum += *in_iter;
1463  }
1464  partial_sum = sum;
1465  },
1466  std::ref(partial_sums[thread_idx]),
1467  start_off,
1468  end_off));
1469  }
1470  for (auto& child : counter_threads) {
1471  child.get();
1472  }
1473 
1474  ElementType sum = 0;
1475  for (auto& s : partial_sums) {
1476  s += sum;
1477  sum = s;
1478  }
1479 
1480  counter_threads.clear();
1481  start_off = std::min(step, elem_count);
1482  end_off = std::min(start_off + step, elem_count);
1483  for (size_t thread_idx = 0; thread_idx < thread_count - 1; ++thread_idx,
1484  start_off = std::min(start_off + step, elem_count),
1485  end_off = std::min(start_off + step, elem_count)) {
1486  counter_threads.push_back(std::async(
1487  std::launch::async,
1488  [out](const ElementType prev_sum, const OffsetType start, const OffsetType end) {
1489  for (auto iter = out + start; iter != (out + end); ++iter) {
1490  *iter += prev_sum;
1491  }
1492  },
1493  partial_sums[thread_idx],
1494  start_off,
1495  end_off));
1496  }
1497  for (auto& child : counter_threads) {
1498  child.get();
1499  }
1500 }
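// Example: on input {3, 1, 4, 1} this inclusive_scan writes {3, 4, 8, 9}.
// For large inputs the work is split into three phases: each thread scans its
// own chunk, the per-chunk totals are combined serially into running offsets,
// and a second parallel pass adds the preceding offset to every chunk except
// the first.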
1501 
1502 template <typename COUNT_MATCHES_LAUNCH_FUNCTOR, typename FILL_ROW_IDS_LAUNCH_FUNCTOR>
1503 void fill_one_to_many_hash_table_impl(int32_t* buff,
1504  const int64_t hash_entry_count,
1505  const JoinColumn& join_column,
1506  const JoinColumnTypeInfo& type_info,
1507  const int32_t* sd_inner_to_outer_translation_map,
1508  const int32_t min_inner_elem,
1509  const int32_t cpu_thread_count,
1510  const bool for_window_framing,
1511  COUNT_MATCHES_LAUNCH_FUNCTOR count_matches_func,
1512  FILL_ROW_IDS_LAUNCH_FUNCTOR fill_row_ids_func) {
1513  auto timer = DEBUG_TIMER(__func__);
1514  int32_t* pos_buff = buff;
1515  int32_t* count_buff = buff + hash_entry_count;
1516  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1517  std::vector<std::future<void>> counter_threads;
1518  for (int32_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1519  counter_threads.push_back(std::async(
1520  std::launch::async, count_matches_func, cpu_thread_idx, cpu_thread_count));
1521  }
1522 
1523  for (auto& child : counter_threads) {
1524  child.get();
1525  }
1526 
1527  std::vector<int32_t> count_copy(hash_entry_count, 0);
1528  CHECK_GT(hash_entry_count, int64_t(0));
1529  memcpy(count_copy.data() + 1, count_buff, (hash_entry_count - 1) * sizeof(int32_t));
1530 #if HAVE_CUDA
1531  thrust::inclusive_scan(count_copy.begin(), count_copy.end(), count_copy.begin());
1532 #else
1533  inclusive_scan(
1534  count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
1535 #endif
1536  std::vector<std::future<void>> pos_threads;
1537  for (int32_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1538  pos_threads.push_back(std::async(
1539  std::launch::async,
1540  [&](size_t thread_idx) {
1541  for (int64_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
1542  if (count_buff[i]) {
1543  pos_buff[i] = count_copy[i];
1544  }
1545  }
1546  },
1547  cpu_thread_idx));
1548  }
1549  for (auto& child : pos_threads) {
1550  child.get();
1551  }
1552  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1553  std::vector<std::future<void>> rowid_threads;
1554  for (int32_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1555  rowid_threads.push_back(std::async(
1556  std::launch::async, fill_row_ids_func, cpu_thread_idx, cpu_thread_count));
1557  }
1558 
1559  for (auto& child : rowid_threads) {
1560  child.get();
1561  }
1562 }
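// This driver encodes the whole one-to-many build: the first thread pool
// counts matches per bucket, the shifted memcpy plus inclusive_scan turns
// those counts into exclusive start offsets (pos), count_buff is zeroed
// again, and the second pool scatters row ids. The sharded and bucketized
// builders below reuse the same skeleton with different kernels.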
1563 
1564 void fill_one_to_many_hash_table(OneToManyPerfectJoinHashTableFillFuncArgs const args,
1565  const int32_t cpu_thread_count) {
1566  auto timer = DEBUG_TIMER(__func__);
1567  auto const buff = args.buff;
1568  auto const hash_entry_info = args.hash_entry_info;
1569  auto launch_count_matches = [count_buff =
1570  buff + hash_entry_info.bucketized_hash_entry_count,
1571  &args](auto cpu_thread_idx, auto cpu_thread_count) {
1572  SUFFIX(count_matches)
1573  (count_buff,
1574  args.join_column,
1575  args.type_info,
1576  args.sd_inner_to_outer_translation_map,
1577  args.min_inner_elem,
1578  cpu_thread_idx,
1579  cpu_thread_count);
1580  };
1581  auto launch_fill_row_ids =
1582  [hash_entry_count = hash_entry_info.bucketized_hash_entry_count, buff, args](
1583  auto cpu_thread_idx, auto cpu_thread_count) {
1584  SUFFIX(fill_row_ids)
1585  (buff,
1586  hash_entry_count,
1587  args.join_column,
1588  args.type_info,
1589  args.for_window_framing,
1590  args.sd_inner_to_outer_translation_map,
1591  args.min_inner_elem,
1592  cpu_thread_idx,
1593  cpu_thread_count);
1594  };
1595 
1596  fill_one_to_many_hash_table_impl(buff,
1597  hash_entry_info.bucketized_hash_entry_count,
1598  args.join_column,
1599  args.type_info,
1600  args.sd_inner_to_outer_translation_map,
1601  args.min_inner_elem,
1602  cpu_thread_count,
1603  args.for_window_framing,
1604  launch_count_matches,
1605  launch_fill_row_ids);
1606 }
1607 
1608 void fill_one_to_many_hash_table_bucketized(
1609     OneToManyPerfectJoinHashTableFillFuncArgs const args,
1610  const int32_t cpu_thread_count) {
1611  auto timer = DEBUG_TIMER(__func__);
1612  auto const buff = args.buff;
1613  auto const hash_entry_info = args.hash_entry_info;
1614  auto bucket_normalization = hash_entry_info.bucket_normalization;
1615  auto hash_entry_count = hash_entry_info.getNormalizedHashEntryCount();
1616  auto launch_count_matches = [bucket_normalization,
1617  count_buff = buff + hash_entry_count,
1618  &args](auto cpu_thread_idx, auto cpu_thread_count) {
1619  SUFFIX(count_matches_bucketized)
1620  (count_buff,
1621  args.join_column,
1622  args.type_info,
1623  args.sd_inner_to_outer_translation_map,
1624  args.min_inner_elem,
1625  cpu_thread_idx,
1626  cpu_thread_count,
1627  bucket_normalization);
1628  };
1629  auto launch_fill_row_ids = [bucket_normalization, hash_entry_count, buff, args](
1630  auto cpu_thread_idx, auto cpu_thread_count) {
1631  SUFFIX(fill_row_ids_bucketized)
1632  (buff,
1633  hash_entry_count,
1634  args.join_column,
1635  args.type_info,
1636  args.sd_inner_to_outer_translation_map,
1637  args.min_inner_elem,
1638  cpu_thread_idx,
1639  cpu_thread_count,
1640  bucket_normalization);
1641  };
1642 
1643  fill_one_to_many_hash_table_impl(buff,
1644  hash_entry_count,
1645  args.join_column,
1646  args.type_info,
1647  args.sd_inner_to_outer_translation_map,
1648  args.min_inner_elem,
1649  cpu_thread_count,
1650  false,
1651  launch_count_matches,
1652  launch_fill_row_ids);
1653 }
1654 
1655 template <typename COUNT_MATCHES_LAUNCH_FUNCTOR, typename FILL_ROW_IDS_LAUNCH_FUNCTOR>
1656 void fill_one_to_many_hash_table_sharded_impl(
1657  int32_t* buff,
1658  const int64_t hash_entry_count,
1659  const JoinColumn& join_column,
1660  const JoinColumnTypeInfo& type_info,
1661  const ShardInfo& shard_info,
1662  const int32_t* sd_inner_to_outer_translation_map,
1663  const int32_t min_inner_elem,
1664  const int32_t cpu_thread_count,
1665  COUNT_MATCHES_LAUNCH_FUNCTOR count_matches_launcher,
1666  FILL_ROW_IDS_LAUNCH_FUNCTOR fill_row_ids_launcher) {
1667  auto timer = DEBUG_TIMER(__func__);
1668  int32_t* pos_buff = buff;
1669  int32_t* count_buff = buff + hash_entry_count;
1670  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1671  std::vector<std::future<void>> counter_threads;
1672  for (int32_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1673  counter_threads.push_back(std::async(
1674  std::launch::async, count_matches_launcher, cpu_thread_idx, cpu_thread_count));
1675  }
1676 
1677  for (auto& child : counter_threads) {
1678  child.get();
1679  }
1680 
1681  std::vector<int32_t> count_copy(hash_entry_count, 0);
1682  CHECK_GT(hash_entry_count, int64_t(0));
1683  memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
1684  inclusive_scan(
1685  count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
1686  std::vector<std::future<void>> pos_threads;
1687  for (int32_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1688  pos_threads.push_back(std::async(
1689  std::launch::async,
1690  [&](const int32_t thread_idx) {
1691  for (int64_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
1692  if (count_buff[i]) {
1693  pos_buff[i] = count_copy[i];
1694  }
1695  }
1696  },
1697  cpu_thread_idx));
1698  }
1699  for (auto& child : pos_threads) {
1700  child.get();
1701  }
1702 
1703  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1704  std::vector<std::future<void>> rowid_threads;
1705  for (int32_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1706  rowid_threads.push_back(std::async(
1707  std::launch::async, fill_row_ids_launcher, cpu_thread_idx, cpu_thread_count));
1708  }
1709 
1710  for (auto& child : rowid_threads) {
1711  child.get();
1712  }
1713 }
1714 
1715 void fill_one_to_many_hash_table_sharded(int32_t* buff,
1716  const int64_t hash_entry_count,
1717  const JoinColumn& join_column,
1718  const JoinColumnTypeInfo& type_info,
1719  const ShardInfo& shard_info,
1720  const int32_t* sd_inner_to_outer_translation_map,
1721  const int32_t min_inner_elem,
1722  const int32_t cpu_thread_count) {
1723  auto launch_count_matches = [count_buff = buff + hash_entry_count,
1724  &join_column,
1725  &type_info,
1726  &shard_info
1727 #ifndef __CUDACC__
1728  ,
1729  sd_inner_to_outer_translation_map,
1730  min_inner_elem
1731 #endif
1732  ](auto cpu_thread_idx, auto cpu_thread_count) {
1733  return SUFFIX(count_matches_sharded)(count_buff,
1734  join_column,
1735  type_info,
1736  shard_info
1737 #ifndef __CUDACC__
1738  ,
1739  sd_inner_to_outer_translation_map,
1740  min_inner_elem,
1741  cpu_thread_idx,
1742  cpu_thread_count
1743 #endif
1744  );
1745  };
1746 
1747  auto launch_fill_row_ids = [buff,
1748  hash_entry_count,
1749  &join_column,
1750  &type_info,
1751  &shard_info
1752 #ifndef __CUDACC__
1753  ,
1754  sd_inner_to_outer_translation_map,
1755  min_inner_elem
1756 #endif
1757  ](auto cpu_thread_idx, auto cpu_thread_count) {
1758  return SUFFIX(fill_row_ids_sharded)(buff,
1759  hash_entry_count,
1760  join_column,
1761  type_info,
1762  shard_info
1763 #ifndef __CUDACC__
1764  ,
1765  sd_inner_to_outer_translation_map,
1766  min_inner_elem,
1767  cpu_thread_idx,
1768  cpu_thread_count);
1769 #endif
1770  };
1771 
1772  fill_one_to_many_hash_table_sharded_impl(buff,
1773  hash_entry_count,
1774  join_column,
1775  type_info,
1776  shard_info
1777 #ifndef __CUDACC__
1778  ,
1779  sd_inner_to_outer_translation_map,
1780  min_inner_elem,
1781  cpu_thread_count
1782 #endif
1783  ,
1784  launch_count_matches,
1785  launch_fill_row_ids);
1786 }
1787 
1788 void init_baseline_hash_join_buff_32(int8_t* hash_join_buff,
1789  const int64_t entry_count,
1790  const size_t key_component_count,
1791  const bool with_val_slot,
1792  const int32_t invalid_slot_val,
1793  const int32_t cpu_thread_idx,
1794  const int32_t cpu_thread_count) {
1795  init_baseline_hash_join_buff<int32_t>(hash_join_buff,
1796  entry_count,
1797  key_component_count,
1798  with_val_slot,
1799  invalid_slot_val,
1800  cpu_thread_idx,
1801  cpu_thread_count);
1802 }
1803 
1804 void init_baseline_hash_join_buff_64(int8_t* hash_join_buff,
1805  const int64_t entry_count,
1806  const size_t key_component_count,
1807  const bool with_val_slot,
1808  const int32_t invalid_slot_val,
1809  const int32_t cpu_thread_idx,
1810  const int32_t cpu_thread_count) {
1811  init_baseline_hash_join_buff<int64_t>(hash_join_buff,
1812  entry_count,
1813  key_component_count,
1814  with_val_slot,
1815  invalid_slot_val,
1816  cpu_thread_idx,
1817  cpu_thread_count);
1818 }
1819 
1820 #ifndef __CUDACC__
1821 #ifdef HAVE_TBB
1822 
1823 void init_baseline_hash_join_buff_tbb_32(int8_t* hash_join_buff,
1824  const int64_t entry_count,
1825  const size_t key_component_count,
1826  const bool with_val_slot,
1827  const int32_t invalid_slot_val) {
1828  init_baseline_hash_join_buff_tbb<int32_t>(
1829  hash_join_buff, entry_count, key_component_count, with_val_slot, invalid_slot_val);
1830 }
1831 
1832 void init_baseline_hash_join_buff_tbb_64(int8_t* hash_join_buff,
1833  const int64_t entry_count,
1834  const size_t key_component_count,
1835  const bool with_val_slot,
1836  const int32_t invalid_slot_val) {
1837  init_baseline_hash_join_buff_tbb<int64_t>(
1838  hash_join_buff, entry_count, key_component_count, with_val_slot, invalid_slot_val);
1839 }
1840 
1841 #endif // #ifdef HAVE_TBB
1842 #endif // #ifndef __CUDACC__
1843 
1844 int fill_baseline_hash_join_buff_32(int8_t* hash_buff,
1845  const int64_t entry_count,
1846  const int32_t invalid_slot_val,
1847  const bool for_semi_join,
1848  const size_t key_component_count,
1849  const bool with_val_slot,
1850  const GenericKeyHandler* key_handler,
1851  const int64_t num_elems,
1852  const int32_t cpu_thread_idx,
1853  const int32_t cpu_thread_count) {
1854  return fill_baseline_hash_join_buff<int32_t>(hash_buff,
1855  entry_count,
1856  invalid_slot_val,
1857  for_semi_join,
1858  key_component_count,
1859  with_val_slot,
1860  key_handler,
1861  num_elems,
1862  cpu_thread_idx,
1863  cpu_thread_count);
1864 }
1865 
1866 int bbox_intersect_fill_baseline_hash_join_buff_32(
1867  int8_t* hash_buff,
1868  const int64_t entry_count,
1869  const int32_t invalid_slot_val,
1870  const size_t key_component_count,
1871  const bool with_val_slot,
1872  const BoundingBoxIntersectKeyHandler* key_handler,
1873  const int64_t num_elems,
1874  const int32_t cpu_thread_idx,
1875  const int32_t cpu_thread_count) {
1876  return fill_baseline_hash_join_buff<int32_t>(hash_buff,
1877  entry_count,
1878  invalid_slot_val,
1879  false,
1880  key_component_count,
1881  with_val_slot,
1882  key_handler,
1883  num_elems,
1884  cpu_thread_idx,
1885  cpu_thread_count);
1886 }
1887 
1888 int range_fill_baseline_hash_join_buff_32(int8_t* hash_buff,
1889  const size_t entry_count,
1890  const int32_t invalid_slot_val,
1891  const size_t key_component_count,
1892  const bool with_val_slot,
1893  const RangeKeyHandler* key_handler,
1894  const size_t num_elems,
1895  const int32_t cpu_thread_idx,
1896  const int32_t cpu_thread_count) {
1897  return fill_baseline_hash_join_buff<int32_t>(hash_buff,
1898  entry_count,
1899  invalid_slot_val,
1900  false,
1901  key_component_count,
1902  with_val_slot,
1903  key_handler,
1904  num_elems,
1905  cpu_thread_idx,
1906  cpu_thread_count);
1907 }
1908 
1909 int fill_baseline_hash_join_buff_64(int8_t* hash_buff,
1910  const int64_t entry_count,
1911  const int32_t invalid_slot_val,
1912  const bool for_semi_join,
1913  const size_t key_component_count,
1914  const bool with_val_slot,
1915  const GenericKeyHandler* key_handler,
1916  const int64_t num_elems,
1917  const int32_t cpu_thread_idx,
1918  const int32_t cpu_thread_count) {
1919  return fill_baseline_hash_join_buff<int64_t>(hash_buff,
1920  entry_count,
1921  invalid_slot_val,
1922  for_semi_join,
1923  key_component_count,
1924  with_val_slot,
1925  key_handler,
1926  num_elems,
1927  cpu_thread_idx,
1928  cpu_thread_count);
1929 }
1930 
1931 int bbox_intersect_fill_baseline_hash_join_buff_64(
1932  int8_t* hash_buff,
1933  const int64_t entry_count,
1934  const int32_t invalid_slot_val,
1935  const size_t key_component_count,
1936  const bool with_val_slot,
1937  const BoundingBoxIntersectKeyHandler* key_handler,
1938  const int64_t num_elems,
1939  const int32_t cpu_thread_idx,
1940  const int32_t cpu_thread_count) {
1941  return fill_baseline_hash_join_buff<int64_t>(hash_buff,
1942  entry_count,
1943  invalid_slot_val,
1944  false,
1945  key_component_count,
1946  with_val_slot,
1947  key_handler,
1948  num_elems,
1949  cpu_thread_idx,
1950  cpu_thread_count);
1951 }
1952 
1953 int range_fill_baseline_hash_join_buff_64(int8_t* hash_buff,
1954  const size_t entry_count,
1955  const int32_t invalid_slot_val,
1956  const size_t key_component_count,
1957  const bool with_val_slot,
1958  const RangeKeyHandler* key_handler,
1959  const size_t num_elems,
1960  const int32_t cpu_thread_idx,
1961  const int32_t cpu_thread_count) {
1962  return fill_baseline_hash_join_buff<int64_t>(hash_buff,
1963  entry_count,
1964  invalid_slot_val,
1965  false,
1966  key_component_count,
1967  with_val_slot,
1968  key_handler,
1969  num_elems,
1970  cpu_thread_idx,
1971  cpu_thread_count);
1972 }
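
All of the fill wrappers above forward to the templated fill_baseline_hash_join_buff, differing only in the key component width (int32_t vs. int64_t) and the key handler type. A minimal sketch of the kind of width dispatch a caller might perform (fill_baseline_dispatch and key_width_bytes are hypothetical names, not this file's API):

#include <cstddef>
#include <cstdint>

int fill_baseline_dispatch(const size_t key_width_bytes,
                           int8_t* hash_buff,
                           const int64_t entry_count,
                           const int32_t invalid_slot_val,
                           const bool for_semi_join,
                           const size_t key_component_count,
                           const bool with_val_slot,
                           const GenericKeyHandler* key_handler,
                           const int64_t num_elems,
                           const int32_t cpu_thread_idx,
                           const int32_t cpu_thread_count) {
  // 4-byte key components use the int32_t instantiation; everything else
  // falls back to the 8-byte one.
  return key_width_bytes == 4
             ? fill_baseline_hash_join_buff_32(hash_buff, entry_count,
                                               invalid_slot_val, for_semi_join,
                                               key_component_count, with_val_slot,
                                               key_handler, num_elems,
                                               cpu_thread_idx, cpu_thread_count)
             : fill_baseline_hash_join_buff_64(hash_buff, entry_count,
                                               invalid_slot_val, for_semi_join,
                                               key_component_count, with_val_slot,
                                               key_handler, num_elems,
                                               cpu_thread_idx, cpu_thread_count);
}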
1973 
1974 template <typename T>
1975 void fill_one_to_many_baseline_hash_table(
1976  int32_t* buff,
1977  const T* composite_key_dict,
1978  const int64_t hash_entry_count,
1979  const size_t key_component_count,
1980  const std::vector<JoinColumn>& join_column_per_key,
1981  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
1982  const std::vector<JoinBucketInfo>& join_buckets_per_key,
1983  const std::vector<const int32_t*>& sd_inner_to_outer_translation_maps,
1984  const std::vector<int32_t>& sd_min_inner_elems,
1985  const size_t cpu_thread_count,
1986  const bool is_range_join,
1987  const bool is_geo_compressed,
1988  const bool for_window_framing) {
1989  int32_t* pos_buff = buff;
1990  int32_t* count_buff = buff + hash_entry_count;
1991  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1992  std::vector<std::future<void>> counter_threads;
1993  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1994  if (is_range_join) {
1995  counter_threads.push_back(std::async(
1996  std::launch::async,
1997  [count_buff,
1998  composite_key_dict,
1999  &hash_entry_count,
2000  &join_buckets_per_key,
2001  &join_column_per_key,
2002  &is_geo_compressed,
2003  cpu_thread_idx,
2004  cpu_thread_count] {
2005  const auto key_handler = RangeKeyHandler(
2006  is_geo_compressed,
2007  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
2008  &join_column_per_key[0],
2009  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
2010  count_matches_baseline(count_buff,
2011  composite_key_dict,
2012  hash_entry_count,
2013  &key_handler,
2014  join_column_per_key[0].num_elems,
2015  cpu_thread_idx,
2016  cpu_thread_count);
2017  }));
2018  } else if (join_buckets_per_key.size() > 0) {
2019  counter_threads.push_back(std::async(
2020  std::launch::async,
2021  [count_buff,
2022  composite_key_dict,
2023  &hash_entry_count,
2024  &join_buckets_per_key,
2025  &join_column_per_key,
2026  cpu_thread_idx,
2027  cpu_thread_count] {
2028  const auto key_handler = BoundingBoxIntersectKeyHandler(
2029  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
2030  &join_column_per_key[0],
2031  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
2032  count_matches_baseline(count_buff,
2033  composite_key_dict,
2034  hash_entry_count,
2035  &key_handler,
2036  join_column_per_key[0].num_elems,
2037  cpu_thread_idx,
2038  cpu_thread_count);
2039  }));
2040  } else {
2041  counter_threads.push_back(
2042  std::async(std::launch::async,
2043  [count_buff,
2044  composite_key_dict,
2045  &key_component_count,
2046  &hash_entry_count,
2047  &join_column_per_key,
2048  &type_info_per_key,
2049  &sd_inner_to_outer_translation_maps,
2050  &sd_min_inner_elems,
2051  cpu_thread_idx,
2052  cpu_thread_count] {
2053  const auto key_handler =
2054  GenericKeyHandler(key_component_count,
2055  true,
2056  &join_column_per_key[0],
2057  &type_info_per_key[0],
2058  &sd_inner_to_outer_translation_maps[0],
2059  &sd_min_inner_elems[0]);
2060  count_matches_baseline(count_buff,
2061  composite_key_dict,
2062  hash_entry_count,
2063  &key_handler,
2064  join_column_per_key[0].num_elems,
2065  cpu_thread_idx,
2066  cpu_thread_count);
2067  }));
2068  }
2069  }
2070 
2071  for (auto& child : counter_threads) {
2072  child.get();
2073  }
2074 
2075  std::vector<int32_t> count_copy(hash_entry_count, 0);
2076  CHECK_GT(hash_entry_count, int64_t(0));
2077  memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
2078  ::inclusive_scan(
2079  count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
2080  std::vector<std::future<void>> pos_threads;
2081  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
2082  pos_threads.push_back(std::async(
2083  std::launch::async,
2084  [&](const int thread_idx) {
2085  for (int64_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
2086  if (count_buff[i]) {
2087  pos_buff[i] = count_copy[i];
2088  }
2089  }
2090  },
2091  cpu_thread_idx));
2092  }
2093  for (auto& child : pos_threads) {
2094  child.get();
2095  }
2096 
2097  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
2098  std::vector<std::future<void>> rowid_threads;
2099  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
2100  if (is_range_join) {
2101  rowid_threads.push_back(std::async(
2102  std::launch::async,
2103  [buff,
2104  composite_key_dict,
2105  hash_entry_count,
2106  &join_column_per_key,
2107  &join_buckets_per_key,
2108  &is_geo_compressed,
2109  cpu_thread_idx,
2110  cpu_thread_count] {
2111  const auto key_handler = RangeKeyHandler(
2112  is_geo_compressed,
2113  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
2114  &join_column_per_key[0],
2115  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
2116  SUFFIX(fill_row_ids_baseline)
2117  (buff,
2118  composite_key_dict,
2119  hash_entry_count,
2120  &key_handler,
2121  join_column_per_key[0].num_elems,
2122  false,
2123  cpu_thread_idx,
2124  cpu_thread_count);
2125  }));
2126  } else if (join_buckets_per_key.size() > 0) {
2127  rowid_threads.push_back(std::async(
2128  std::launch::async,
2129  [buff,
2130  composite_key_dict,
2131  hash_entry_count,
2132  &join_column_per_key,
2133  &join_buckets_per_key,
2134  for_window_framing,
2135  cpu_thread_idx,
2136  cpu_thread_count] {
2137  const auto key_handler = BoundingBoxIntersectKeyHandler(
2138  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
2139  &join_column_per_key[0],
2140  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
2141  SUFFIX(fill_row_ids_baseline)
2142  (buff,
2143  composite_key_dict,
2144  hash_entry_count,
2145  &key_handler,
2146  join_column_per_key[0].num_elems,
2147  for_window_framing,
2148  cpu_thread_idx,
2149  cpu_thread_count);
2150  }));
2151  } else {
2152  rowid_threads.push_back(std::async(std::launch::async,
2153  [buff,
2154  composite_key_dict,
2155  hash_entry_count,
2156  key_component_count,
2157  &join_column_per_key,
2158  &type_info_per_key,
2159  &sd_inner_to_outer_translation_maps,
2160  &sd_min_inner_elems,
2161  for_window_framing,
2162  cpu_thread_idx,
2163  cpu_thread_count] {
2164  const auto key_handler = GenericKeyHandler(
2165  key_component_count,
2166  true,
2167  &join_column_per_key[0],
2168  &type_info_per_key[0],
2169  &sd_inner_to_outer_translation_maps[0],
2170  &sd_min_inner_elems[0]);
2171  SUFFIX(fill_row_ids_baseline)
2172  (buff,
2173  composite_key_dict,
2174  hash_entry_count,
2175  &key_handler,
2176  join_column_per_key[0].num_elems,
2177  for_window_framing,
2178  cpu_thread_idx,
2179  cpu_thread_count);
2180  }));
2181  }
2182  }
2183 
2184  for (auto& child : rowid_threads) {
2185  child.get();
2186  }
2187 }
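
fill_one_to_many_baseline_hash_table above builds the one-to-many layout in three parallel phases over a single buffer: count matches per entry, prefix-sum the counts into per-entry start offsets, then scatter row ids. A single-threaded sketch of the same count/scan/fill idea (one_to_many_layout_sketch and slot_of_row are illustrative stand-ins, assuming each input row has already been hashed to a slot index):

#include <cstdint>
#include <cstring>
#include <vector>

void one_to_many_layout_sketch(int32_t* buff,
                               const int64_t hash_entry_count,
                               const std::vector<int32_t>& slot_of_row) {
  int32_t* pos_buff = buff;                          // [0, H): run offsets
  int32_t* count_buff = buff + hash_entry_count;     // [H, 2H): per-entry counts
  int32_t* id_buff = buff + 2 * hash_entry_count;    // [2H, ...): row ids
  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  for (const auto slot : slot_of_row) {  // pass 1: histogram of matches
    ++count_buff[slot];
  }
  // pass 2: exclusive prefix sum turns counts into starting offsets
  int32_t offset = 0;
  for (int64_t i = 0; i < hash_entry_count; ++i) {
    pos_buff[i] = offset;
    offset += count_buff[i];
  }
  // pass 3: re-zero the counts and scatter row ids into each entry's run
  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  for (size_t row = 0; row < slot_of_row.size(); ++row) {
    const auto slot = slot_of_row[row];
    id_buff[pos_buff[slot] + count_buff[slot]++] = static_cast<int32_t>(row);
  }
}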
2188 
2189 void fill_one_to_many_baseline_hash_table_32(
2190  int32_t* buff,
2191  const int32_t* composite_key_dict,
2192  const int64_t hash_entry_count,
2193  const size_t key_component_count,
2194  const std::vector<JoinColumn>& join_column_per_key,
2195  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2196  const std::vector<JoinBucketInfo>& join_bucket_info,
2197  const std::vector<const int32_t*>& sd_inner_to_outer_translation_maps,
2198  const std::vector<int32_t>& sd_min_inner_elems,
2199  const int32_t cpu_thread_count,
2200  const bool is_range_join,
2201  const bool is_geo_compressed,
2202  const bool for_window_framing) {
2203  fill_one_to_many_baseline_hash_table<int32_t>(buff,
2204  composite_key_dict,
2205  hash_entry_count,
2206  key_component_count,
2207  join_column_per_key,
2208  type_info_per_key,
2209  join_bucket_info,
2210  sd_inner_to_outer_translation_maps,
2211  sd_min_inner_elems,
2212  cpu_thread_count,
2213  is_range_join,
2214  is_geo_compressed,
2215  for_window_framing);
2216 }
2217 
2218 void fill_one_to_many_baseline_hash_table_64(
2219  int32_t* buff,
2220  const int64_t* composite_key_dict,
2221  const int64_t hash_entry_count,
2222  const size_t key_component_count,
2223  const std::vector<JoinColumn>& join_column_per_key,
2224  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2225  const std::vector<JoinBucketInfo>& join_bucket_info,
2226  const std::vector<const int32_t*>& sd_inner_to_outer_translation_maps,
2227  const std::vector<int32_t>& sd_min_inner_elems,
2228  const int32_t cpu_thread_count,
2229  const bool is_range_join,
2230  const bool is_geo_compressed,
2231  const bool for_window_framing) {
2232  fill_one_to_many_baseline_hash_table<int64_t>(buff,
2233  composite_key_dict,
2234  hash_entry_count,
2235  key_component_count,
2236  join_column_per_key,
2237  type_info_per_key,
2238  join_bucket_info,
2239  sd_inner_to_outer_translation_maps,
2240  sd_min_inner_elems,
2241  cpu_thread_count,
2242  is_range_join,
2243  is_geo_compressed,
2244  for_window_framing);
2245 }
2246 
2247 void approximate_distinct_tuples(uint8_t* hll_buffer_all_cpus,
2248  const uint32_t b,
2249  const size_t padded_size_bytes,
2250  const std::vector<JoinColumn>& join_column_per_key,
2251  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2252  const int thread_count) {
2253  CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
2254  CHECK(!join_column_per_key.empty());
2255 
2256  std::vector<std::future<void>> approx_distinct_threads;
2257  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2258  approx_distinct_threads.push_back(std::async(
2259  std::launch::async,
2260  [&join_column_per_key,
2261  &type_info_per_key,
2262  b,
2263  hll_buffer_all_cpus,
2264  padded_size_bytes,
2265  thread_idx,
2266  thread_count] {
2267  auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
2268 
2269  const auto key_handler = GenericKeyHandler(join_column_per_key.size(),
2270  false,
2271  &join_column_per_key[0],
2272  &type_info_per_key[0],
2273  nullptr,
2274  nullptr);
2275  approximate_distinct_tuples_impl(hll_buffer,
2276  nullptr,
2277  b,
2278  join_column_per_key[0].num_elems,
2279  &key_handler,
2280  thread_idx,
2281  thread_count);
2282  }));
2283  }
2284  for (auto& child : approx_distinct_threads) {
2285  child.get();
2286  }
2287 }
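
approximate_distinct_tuples gives each worker its own HyperLogLog register block (2^b one-byte registers, spaced padded_size_bytes apart in hll_buffer_all_cpus); the caller later unions the per-thread blocks. HLL union is an elementwise register maximum, roughly as in this hedged sketch (merge_hll_buffers is illustrative, not this codebase's API):

#include <algorithm>
#include <cstddef>
#include <cstdint>

void merge_hll_buffers(uint8_t* dst,
                       const uint8_t* per_thread_buffers,
                       const size_t padded_size_bytes,
                       const uint32_t b,
                       const int thread_count) {
  const size_t num_registers = size_t(1) << b;
  for (int t = 0; t < thread_count; ++t) {
    const uint8_t* src = per_thread_buffers + t * padded_size_bytes;
    for (size_t i = 0; i < num_registers; ++i) {
      dst[i] = std::max(dst[i], src[i]);  // HLL union == register-wise max
    }
  }
}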
2288 
2289 void approximate_distinct_tuples_bbox_intersect(
2290  uint8_t* hll_buffer_all_cpus,
2291  std::vector<int32_t>& row_counts,
2292  const uint32_t b,
2293  const size_t padded_size_bytes,
2294  const std::vector<JoinColumn>& join_column_per_key,
2295  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2296  const std::vector<JoinBucketInfo>& join_buckets_per_key,
2297  const int thread_count) {
2298  CHECK_EQ(join_column_per_key.size(), join_buckets_per_key.size());
2299  CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
2300  CHECK(!join_column_per_key.empty());
2301 
2302  std::vector<std::future<void>> approx_distinct_threads;
2303  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2304  approx_distinct_threads.push_back(std::async(
2305  std::launch::async,
2306  [&join_column_per_key,
2307  &join_buckets_per_key,
2308  &row_counts,
2309  b,
2310  hll_buffer_all_cpus,
2311  padded_size_bytes,
2312  thread_idx,
2313  thread_count] {
2314  auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
2315 
2316  const auto key_handler = BoundingBoxIntersectKeyHandler(
2317  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
2318  &join_column_per_key[0],
2319  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
2320  approximate_distinct_tuples_impl(hll_buffer,
2321  row_counts.data(),
2322  b,
2323  join_column_per_key[0].num_elems,
2324  &key_handler,
2325  thread_idx,
2326  thread_count);
2327  }));
2328  }
2329  for (auto& child : approx_distinct_threads) {
2330  child.get();
2331  }
2332 
2333  ::inclusive_scan(
2334  row_counts.begin(), row_counts.end(), row_counts.begin(), thread_count);
2335 }
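
In the bounding-box variant above, row_counts collects per-input-row match counts, and the final inclusive_scan turns them into running totals, so the last element bounds how many row-id slots the one-to-many table will need. For example, counts {2, 0, 3, 1} scan to {2, 2, 5, 6}. A serial equivalent using the standard library (scan_counts is a hypothetical stand-in for the file's multi-threaded inclusive_scan):

#include <cstdint>
#include <numeric>
#include <vector>

std::vector<int32_t> scan_counts(std::vector<int32_t> counts) {
  // std::partial_sum computes an inclusive scan in place.
  std::partial_sum(counts.begin(), counts.end(), counts.begin());
  return counts;  // counts.back() is now the total match count
}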
2336 
2337 void approximate_distinct_tuples_range(
2338  uint8_t* hll_buffer_all_cpus,
2339  std::vector<int32_t>& row_counts,
2340  const uint32_t b,
2341  const size_t padded_size_bytes,
2342  const std::vector<JoinColumn>& join_column_per_key,
2343  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2344  const std::vector<JoinBucketInfo>& join_buckets_per_key,
2345  const bool is_compressed,
2346  const int thread_count) {
2347  CHECK_EQ(join_column_per_key.size(), join_buckets_per_key.size());
2348  CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
2349  CHECK(!join_column_per_key.empty());
2350 
2351  std::vector<std::future<void>> approx_distinct_threads;
2352  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2353  approx_distinct_threads.push_back(std::async(
2354  std::launch::async,
2355  [&join_column_per_key,
2356  &join_buckets_per_key,
2357  &row_counts,
2358  b,
2359  hll_buffer_all_cpus,
2360  padded_size_bytes,
2361  thread_idx,
2362  is_compressed,
2363  thread_count] {
2364  auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
2365 
2366  const auto key_handler = RangeKeyHandler(
2367  is_compressed,
2368  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
2369  &join_column_per_key[0],
2370  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
2371  approximate_distinct_tuples_impl(hll_buffer,
2372  row_counts.data(),
2373  b,
2374  join_column_per_key[0].num_elems,
2375  &key_handler,
2376  thread_idx,
2377  thread_count);
2378  }));
2379  }
2380  for (auto& child : approx_distinct_threads) {
2381  child.get();
2382  }
2383 
2384  ::inclusive_scan(
2385  row_counts.begin(), row_counts.end(), row_counts.begin(), thread_count);
2386 }
2387 
2388 void compute_bucket_sizes_on_cpu(std::vector<double>& bucket_sizes_for_dimension,
2389  const JoinColumn& join_column,
2390  const JoinColumnTypeInfo& type_info,
2391  const std::vector<double>& bucket_size_thresholds,
2392  const int thread_count) {
2393  std::vector<std::vector<double>> bucket_sizes_for_threads;
2394  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2395  bucket_sizes_for_threads.emplace_back(bucket_sizes_for_dimension.size(), 0.0);
2396  }
2397  std::vector<std::future<void>> threads;
2398  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2399  threads.push_back(std::async(std::launch::async,
2400  compute_bucket_sizes_impl<2>,
2401  bucket_sizes_for_threads[thread_idx].data(),
2402  &join_column,
2403  &type_info,
2404  bucket_size_thresholds.data(),
2405  thread_idx,
2406  thread_count));
2407  }
2408  for (auto& child : threads) {
2409  child.get();
2410  }
2411 
2412  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2413  for (size_t i = 0; i < bucket_sizes_for_dimension.size(); i++) {
2414  if (bucket_sizes_for_threads[thread_idx][i] > bucket_sizes_for_dimension[i]) {
2415  bucket_sizes_for_dimension[i] = bucket_sizes_for_threads[thread_idx][i];
2416  }
2417  }
2418  }
2419 }
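
compute_bucket_sizes_on_cpu avoids contended atomic updates by giving every thread a private bucket-size vector and merging them with an elementwise max once all threads have joined. The same pattern in isolation, as a hedged sketch (max_reduce_per_thread and its flat samples layout are illustrative):

#include <algorithm>
#include <cstddef>
#include <future>
#include <vector>

std::vector<double> max_reduce_per_thread(const std::vector<double>& samples,
                                          const size_t dims,
                                          const int thread_count) {
  // One private accumulator per thread; no synchronization needed inside the loop.
  std::vector<std::vector<double>> partials(thread_count,
                                            std::vector<double>(dims, 0.0));
  std::vector<std::future<void>> threads;
  for (int t = 0; t < thread_count; ++t) {
    threads.push_back(std::async(std::launch::async, [&, t] {
      // Strided pass: thread t sees rows t, t + thread_count, ...
      for (size_t i = t; i < samples.size() / dims; i += thread_count) {
        for (size_t d = 0; d < dims; ++d) {
          partials[t][d] = std::max(partials[t][d], samples[i * dims + d]);
        }
      }
    }));
  }
  for (auto& th : threads) {
    th.get();
  }
  std::vector<double> result(dims, 0.0);
  for (int t = 0; t < thread_count; ++t) {  // elementwise-max merge
    for (size_t d = 0; d < dims; ++d) {
      result[d] = std::max(result[d], partials[t][d]);
    }
  }
  return result;
}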
2420 
2421 #endif // ifndef __CUDACC__