OmniSciDB  a5dc49c757
cuda_mapd_rt.cu
1 #include <cuda.h>
2 #include <float.h>
3 #include <stdint.h>
4 #include <stdio.h>
5 #include <limits>
6 #include "BufferCompaction.h"
7 #include "ExtensionFunctions.hpp"
8 #include "GpuRtConstants.h"
9 #include "HyperLogLogRank.h"
10 
11 #if CUDA_VERSION < 10000
12 static_assert(false, "CUDA v10.0 or later is required.");
13 #endif
14 
15 #if (defined(__CUDA_ARCH__) && __CUDA_ARCH__ < 350)
16 static_assert(false, "CUDA Compute Capability of 3.5 or greater is required.");
17 #endif
18 
19 extern "C" __device__ int64_t get_thread_index() {
20  return threadIdx.x;
21 }
22 
23 extern "C" __device__ int64_t get_block_index() {
24  return blockIdx.x;
25 }
26 
27 extern "C" __device__ int32_t pos_start_impl(const int32_t* row_index_resume) {
28  return blockIdx.x * blockDim.x + threadIdx.x;
29 }
30 
31 extern "C" __device__ int32_t group_buff_idx_impl() {
32  return pos_start_impl(NULL);
33 }
34 
35 extern "C" __device__ int32_t pos_step_impl() {
36  return blockDim.x * gridDim.x;
37 }
38 
39 extern "C" __device__ int8_t thread_warp_idx(const int8_t warp_sz) {
40  return threadIdx.x % warp_sz;
41 }
42 
43 extern "C" __device__ const int64_t* init_shared_mem_nop(
44  const int64_t* groups_buffer,
45  const int32_t groups_buffer_size) {
46  return groups_buffer;
47 }
48 
49 extern "C" __device__ void write_back_nop(int64_t* dest, int64_t* src, const int32_t sz) {
50 }
51 
52 /*
53  * Just declares and returns a dynamic shared memory pointer. The total size must be
54  * set properly at kernel launch time.
55  */
56 extern "C" __device__ int64_t* declare_dynamic_shared_memory() {
57  extern __shared__ int64_t shared_mem_buffer[];
58  return shared_mem_buffer;
59 }
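// Launch-side note (an illustrative assumption, not code from this file): the buffer
// declared above is only as large as the dynamic shared memory size passed as the third
// kernel launch configuration parameter, e.g. for a hypothetical query kernel:
//
//   query_kernel<<<grid_dim, block_dim, shared_mem_bytes>>>(/* args */);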
60 
66 extern "C" __device__ const int64_t* init_shared_mem(const int64_t* global_groups_buffer,
67  const int32_t groups_buffer_size) {
68  // dynamic shared memory declaration
69  extern __shared__ int64_t shared_groups_buffer[];
70 
71  // the buffer size is assumed to be a multiple of 64-bit units (bytes >> 3 below),
72  // so it is safe to have each thread copy one 64-bit element at a time
73  const int32_t buffer_units = groups_buffer_size >> 3;
74 
75  for (int32_t pos = threadIdx.x; pos < buffer_units; pos += blockDim.x) {
76  shared_groups_buffer[pos] = global_groups_buffer[pos];
77  }
78  __syncthreads();
79  return shared_groups_buffer;
80 }
81 
82 #define init_group_by_buffer_gpu_impl init_group_by_buffer_gpu
83 
84 #include "GpuInitGroups.cu"
85 
86 #undef init_group_by_buffer_gpu_impl
87 
88 // Dynamic watchdog: monitors up to 128 SMs (the size of dw_sm_cycle_start). E.g. a GP100
89 // configuration may have 60: 6 Graphics Processing Clusters (GPCs) * 10 SMs each
90 // TODO(Saman): move these into a kernel parameter, allocated and initialized through CUDA
91 __device__ int64_t dw_sm_cycle_start[128]; // Set from host before launching the kernel
92 // TODO(Saman): make this cycle budget something constant in codegen level
93 __device__ int64_t dw_cycle_budget = 0; // Set from host before launching the kernel
94 __device__ int32_t dw_abort = 0; // TBD: set from host (async)
95 __device__ int32_t runtime_interrupt_flag = 0;
96 
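// Host-side sketch (a hypothetical helper, not part of this runtime; the executor performs
// the actual initialization elsewhere): the watchdog globals above can be set from the host
// before the kernel launch with cudaMemcpyToSymbol, roughly as follows.
inline cudaError_t init_dynamic_watchdog_sketch(const int64_t cycle_budget) {
  int64_t cleared_cycle_starts[128] = {};
  int32_t no_abort = 0;
  cudaError_t err = cudaMemcpyToSymbol(
      dw_sm_cycle_start, cleared_cycle_starts, sizeof(cleared_cycle_starts));
  if (err != cudaSuccess) {
    return err;
  }
  err = cudaMemcpyToSymbol(dw_cycle_budget, &cycle_budget, sizeof(cycle_budget));
  if (err != cudaSuccess) {
    return err;
  }
  return cudaMemcpyToSymbol(dw_abort, &no_abort, sizeof(no_abort));
}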
97 __inline__ __device__ uint32_t get_smid(void) {
98  uint32_t ret;
99  asm("mov.u32 %0, %%smid;" : "=r"(ret));
100  return ret;
101 }
102 
103 /*
104  * The main objective of this function is to return true if either of the following
105  * two scenarios happens:
106  * 1. the host has requested that kernel execution be aborted
107  * 2. kernel execution has taken more clock cycles than it was initially allowed
108  * The assumption is that all (or none of the) threads within a block return true for
109  * the watchdog, and the first thread within each block compares the clock cycles
110  * recorded for its occupying SM with the allowed budget. It also assumes that all
111  * threads entering this function are active (no critical edge exposure).
112  * NOTE: dw_cycle_budget, dw_abort, and dw_sm_cycle_start[] are all variables in global
113  * memory scope.
114  */
115 extern "C" __device__ bool dynamic_watchdog() {
116  // check for dynamic watchdog, if triggered all threads return true
117  if (dw_cycle_budget == 0LL) {
118  return false; // Uninitialized watchdog can't check time
119  }
120  if (dw_abort == 1) {
121  return true; // Received host request to abort
122  }
123  uint32_t smid = get_smid();
124  if (smid >= 128) {
125  return false;
126  }
127  __shared__ volatile int64_t dw_block_cycle_start; // Thread block shared cycle start
128  __shared__ volatile bool
129  dw_should_terminate; // all threads within a block should return together if
130  // watchdog criteria is met
131 
132  // Thread 0 either initializes or reads the initial clock cycle; the result is stored
133  // in shared memory. Since all threads within a block share the same SM, there's no
134  // point in using more threads here.
135  if (threadIdx.x == 0) {
136  dw_block_cycle_start = 0LL;
137  int64_t cycle_count = static_cast<int64_t>(clock64());
138  // Make sure the block hasn't switched SMs
139  if (smid == get_smid()) {
140  dw_block_cycle_start = static_cast<int64_t>(
141  atomicCAS(reinterpret_cast<unsigned long long*>(&dw_sm_cycle_start[smid]),
142  0ULL,
143  static_cast<unsigned long long>(cycle_count)));
144  }
145 
146  int64_t cycles = cycle_count - dw_block_cycle_start;
147  if ((smid == get_smid()) && (dw_block_cycle_start > 0LL) &&
148  (cycles > dw_cycle_budget)) {
149  // Check if we're out of time on this particular SM
150  dw_should_terminate = true;
151  } else {
152  dw_should_terminate = false;
153  }
154  }
155  __syncthreads();
156  return dw_should_terminate;
157 }
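// Usage sketch (an assumption about the generated kernel code, which is not part of this
// file): query kernels would typically poll the watchdog inside their row loop and bail
// out with an error code once it fires, e.g.
//
//   if (dynamic_watchdog()) {
//     *error_code = ERR_OUT_OF_TIME;  // hypothetical error-code slot
//     return;
//   }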
158 
159 extern "C" __device__ bool check_interrupt() {
160  return runtime_interrupt_flag == 1;
161 }
162 
163 template <typename T = unsigned long long>
164 inline __device__ T get_empty_key() {
165  return EMPTY_KEY_64;
166 }
167 
168 template <>
169 inline __device__ unsigned int get_empty_key() {
170  return EMPTY_KEY_32;
171 }
172 
173 template <typename T>
174 inline __device__ int64_t* get_matching_group_value(int64_t* groups_buffer,
175  const uint32_t h,
176  const T* key,
177  const uint32_t key_count,
178  const uint32_t row_size_quad) {
179  const T empty_key = get_empty_key<T>();
180  uint32_t off = h * row_size_quad;
181  auto row_ptr = reinterpret_cast<T*>(groups_buffer + off);
182  {
183  const T old = atomicCAS(row_ptr, empty_key, *key);
184  if (empty_key == old && key_count > 1) {
185  for (size_t i = 1; i <= key_count - 1; ++i) {
186  atomicExch(row_ptr + i, key[i]);
187  }
188  }
189  }
190  if (key_count > 1) {
191  while (atomicAdd(row_ptr + key_count - 1, 0) == empty_key) {
192  // spin until the winning thread has finished writing the entire key and the init
193  // value
194  }
195  }
196  bool match = true;
197  for (uint32_t i = 0; i < key_count; ++i) {
198  if (row_ptr[i] != key[i]) {
199  match = false;
200  break;
201  }
202  }
203 
204  if (match) {
205  auto row_ptr_i8 = reinterpret_cast<int8_t*>(row_ptr + key_count);
206  return reinterpret_cast<int64_t*>(align_to_int64(row_ptr_i8));
207  }
208  return NULL;
209 }
210 
211 extern "C" __device__ int64_t* get_matching_group_value(int64_t* groups_buffer,
212  const uint32_t h,
213  const int64_t* key,
214  const uint32_t key_count,
215  const uint32_t key_width,
216  const uint32_t row_size_quad) {
217  switch (key_width) {
218  case 4:
219  return get_matching_group_value(groups_buffer,
220  h,
221  reinterpret_cast<const unsigned int*>(key),
222  key_count,
223  row_size_quad);
224  case 8:
225  return get_matching_group_value(groups_buffer,
226  h,
227  reinterpret_cast<const unsigned long long*>(key),
228  key_count,
229  row_size_quad);
230  default:
231  return NULL;
232  }
233 }
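// Minimal probing sketch (a hypothetical helper, not part of this file; the real caller is
// the group-by runtime included below): a NULL result means the slot holds a different key,
// so the caller keeps probing linearly until a matching or empty row is found.
__device__ int64_t* probe_group_slots_sketch(int64_t* groups_buffer,
                                             const uint32_t entry_count,
                                             const uint32_t start_h,
                                             const int64_t* key,
                                             const uint32_t key_count,
                                             const uint32_t key_width,
                                             const uint32_t row_size_quad) {
  uint32_t h = start_h;
  do {
    int64_t* row = get_matching_group_value(
        groups_buffer, h, key, key_count, key_width, row_size_quad);
    if (row) {
      return row;  // matching or newly claimed entry
    }
    h = (h + 1) % entry_count;  // linear probing
  } while (h != start_h);
  return NULL;  // the buffer is full
}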
234 
235 template <typename T>
236 __device__ int32_t get_matching_group_value_columnar_slot(int64_t* groups_buffer,
237  const uint32_t entry_count,
238  const uint32_t h,
239  const T* key,
240  const uint32_t key_count) {
241  const T empty_key = get_empty_key<T>();
242  const uint64_t old =
243  atomicCAS(reinterpret_cast<T*>(groups_buffer + h), empty_key, *key);
244  // the winning thread proceeds with writing the rest of the keys
245  if (old == empty_key) {
246  uint32_t offset = h + entry_count;
247  for (size_t i = 1; i < key_count; ++i) {
248  *reinterpret_cast<T*>(groups_buffer + offset) = key[i];
249  offset += entry_count;
250  }
251  }
252 
253  __threadfence();
254  // for all threads except the winning thread, the memory contents of the keys at
255  // the hash offset are checked again. In case of a complete match the hash offset
256  // is returned, otherwise -1 is returned
257  if (old != empty_key) {
258  uint32_t offset = h;
259  for (uint32_t i = 0; i < key_count; ++i) {
260  if (*reinterpret_cast<T*>(groups_buffer + offset) != key[i]) {
261  return -1;
262  }
263  offset += entry_count;
264  }
265  }
266  return h;
267 }
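// Layout note (illustrative): keys are stored column-wise, so for entry h the i-th key
// component lives at groups_buffer[h + i * entry_count]; the re-check loop above walks
// exactly those strided slots before declaring a match.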
268 
269 extern "C" __device__ int32_t
270 get_matching_group_value_columnar_slot(int64_t* groups_buffer,
271  const uint32_t entry_count,
272  const uint32_t h,
273  const int64_t* key,
274  const uint32_t key_count,
275  const uint32_t key_width) {
276  switch (key_width) {
277  case 4:
278  return get_matching_group_value_columnar_slot(
279  groups_buffer,
280  entry_count,
281  h,
282  reinterpret_cast<const unsigned int*>(key),
283  key_count);
284  case 8:
285  return get_matching_group_value_columnar_slot(
286  groups_buffer,
287  entry_count,
288  h,
289  reinterpret_cast<const unsigned long long*>(key),
290  key_count);
291  default:
292  return -1;
293  }
294 }
295 
296 extern "C" __device__ int64_t* get_matching_group_value_columnar(
297  int64_t* groups_buffer,
298  const uint32_t h,
299  const int64_t* key,
300  const uint32_t key_qw_count,
301  const size_t entry_count) {
302  uint32_t off = h;
303  {
304  const uint64_t old = atomicCAS(
305  reinterpret_cast<unsigned long long*>(groups_buffer + off), EMPTY_KEY_64, *key);
306  if (EMPTY_KEY_64 == old) {
307  for (size_t i = 0; i < key_qw_count; ++i) {
308  groups_buffer[off] = key[i];
309  off += entry_count;
310  }
311  return &groups_buffer[off];
312  }
313  }
314  __syncthreads();
315  off = h;
316  for (size_t i = 0; i < key_qw_count; ++i) {
317  if (groups_buffer[off] != key[i]) {
318  return NULL;
319  }
320  off += entry_count;
321  }
322  return &groups_buffer[off];
323 }
324 
325 #include "GroupByRuntime.cpp"
327 #include "MurmurHash.cpp"
328 #include "TopKRuntime.cpp"
329 
330 __device__ int64_t atomicMax64(int64_t* address, int64_t val) {
331  unsigned long long int* address_as_ull = (unsigned long long int*)address;
332  unsigned long long int old = *address_as_ull, assumed;
333 
334  do {
335  assumed = old;
336  old = atomicCAS(address_as_ull, assumed, max((long long)val, (long long)assumed));
337  } while (assumed != old);
338 
339  return old;
340 }
341 
342 __device__ int64_t atomicMin64(int64_t* address, int64_t val) {
343  unsigned long long int* address_as_ull = (unsigned long long int*)address;
344  unsigned long long int old = *address_as_ull, assumed;
345 
346  do {
347  assumed = old;
348  old = atomicCAS(address_as_ull, assumed, min((long long)val, (long long)assumed));
349  } while (assumed != old);
350 
351  return old;
352 }
353 
354 #if (defined(__CUDA_ARCH__) && __CUDA_ARCH__ < 600)
355 __device__ double atomicAdd(double* address, double val) {
356  unsigned long long int* address_as_ull = (unsigned long long int*)address;
357  unsigned long long int old = *address_as_ull, assumed;
358 
359  do {
360  assumed = old;
361  old = atomicCAS(address_as_ull,
362  assumed,
363  __double_as_longlong(val + __longlong_as_double(assumed)));
364 
365  // Note: uses integer comparison to avoid hang in case of NaN (since NaN != NaN)
366  } while (assumed != old);
367 
368  return __longlong_as_double(old);
369 }
370 #endif
371 
372 __device__ double atomicMax(double* address, double val) {
373  unsigned long long int* address_as_ull = (unsigned long long int*)address;
374  unsigned long long int old = *address_as_ull, assumed;
375 
376  do {
377  assumed = old;
378  old = atomicCAS(address_as_ull,
379  assumed,
380  __double_as_longlong(max(val, __longlong_as_double(assumed))));
381 
382  // Note: uses integer comparison to avoid hang in case of NaN (since NaN != NaN)
383  } while (assumed != old);
384 
385  return __longlong_as_double(old);
386 }
387 
388 __device__ float atomicMax(float* address, float val) {
389  int* address_as_int = (int*)address;
390  int old = *address_as_int, assumed;
391 
392  do {
393  assumed = old;
394  old = atomicCAS(
395  address_as_int, assumed, __float_as_int(max(val, __int_as_float(assumed))));
396 
397  // Note: uses integer comparison to avoid hang in case of NaN (since NaN != NaN)
398  } while (assumed != old);
399 
400  return __int_as_float(old);
401 }
402 
403 __device__ double atomicMin(double* address, double val) {
404  unsigned long long int* address_as_ull = (unsigned long long int*)address;
405  unsigned long long int old = *address_as_ull, assumed;
406 
407  do {
408  assumed = old;
409  old = atomicCAS(address_as_ull,
410  assumed,
411  __double_as_longlong(min(val, __longlong_as_double(assumed))));
412  } while (assumed != old);
413 
414  return __longlong_as_double(old);
415 }
416 
417 __device__ float atomicMin(float* address, float val) {
418  int* address_as_int = (int*)address;
419  int old = *address_as_int, assumed;
420 
421  do {
422  assumed = old;
423  old = atomicCAS(
424  address_as_int, assumed, __float_as_int(min(val, __int_as_float(assumed))));
425  } while (assumed != old);
426 
427  return __int_as_float(old);
428 }
429 
430 extern "C" __device__ uint64_t agg_count_shared(uint64_t* agg, const int64_t val) {
431  return static_cast<uint64_t>(atomicAdd(reinterpret_cast<uint32_t*>(agg), 1U));
432 }
433 
434 extern "C" __device__ uint64_t agg_count_if_shared(uint64_t* agg, const int64_t cond) {
435  return cond ? static_cast<uint64_t>(atomicAdd(reinterpret_cast<uint32_t*>(agg), 1U))
436  : static_cast<uint64_t>(*(reinterpret_cast<uint32_t*>(agg)));
437 }
438 
439 extern "C" __device__ uint32_t agg_count_int32_shared(uint32_t* agg, const int32_t val) {
440  return atomicAdd(agg, 1U);
441 }
442 
443 extern "C" __device__ uint32_t agg_count_if_int32_shared(uint32_t* agg,
444  const int32_t cond) {
445  return cond ? atomicAdd(agg, 1U) : *agg;
446 }
447 
448 extern "C" __device__ uint64_t agg_count_double_shared(uint64_t* agg, const double val) {
449  return agg_count_shared(agg, val);
450 }
451 
452 extern "C" __device__ uint32_t agg_count_float_shared(uint32_t* agg, const float val) {
453  return agg_count_int32_shared(agg, val);
454 }
455 
456 extern "C" __device__ int64_t agg_sum_shared(int64_t* agg, const int64_t val) {
457  return atomicAdd(reinterpret_cast<unsigned long long*>(agg), val);
458 }
459 
460 extern "C" __device__ int32_t agg_sum_int32_shared(int32_t* agg, const int32_t val) {
461  return atomicAdd(agg, val);
462 }
463 
464 extern "C" __device__ void agg_sum_float_shared(int32_t* agg, const float val) {
465  atomicAdd(reinterpret_cast<float*>(agg), val);
466 }
467 
468 extern "C" __device__ void agg_sum_double_shared(int64_t* agg, const double val) {
469  atomicAdd(reinterpret_cast<double*>(agg), val);
470 }
471 
472 extern "C" __device__ int64_t agg_sum_if_shared(int64_t* agg,
473  const int64_t val,
474  const int8_t cond) {
475  static_assert(sizeof(int64_t) == sizeof(unsigned long long));
476  if (cond) {
477  return atomicAdd(reinterpret_cast<unsigned long long*>(agg), val);
478  }
479  return *agg;
480 }
481 
482 extern "C" __device__ int32_t agg_sum_if_int32_shared(int32_t* agg,
483  const int32_t val,
484  const int8_t cond) {
485  if (cond) {
486  return atomicAdd(agg, val);
487  }
488  return *agg;
489 }
490 
491 extern "C" __device__ void agg_sum_if_float_shared(int32_t* agg,
492  const float val,
493  const int8_t cond) {
494  if (cond) {
495  atomicAdd(reinterpret_cast<float*>(agg), val);
496  }
497 }
498 
499 extern "C" __device__ void agg_sum_if_double_shared(int64_t* agg,
500  const double val,
501  const int8_t cond) {
502  if (cond) {
503  atomicAdd(reinterpret_cast<double*>(agg), val);
504  }
505 }
506 
507 extern "C" __device__ void agg_max_shared(int64_t* agg, const int64_t val) {
508  atomicMax64(agg, val);
509 }
510 
511 extern "C" __device__ void agg_max_int32_shared(int32_t* agg, const int32_t val) {
512  atomicMax(agg, val);
513 }
514 
515 extern "C" __device__ void agg_max_double_shared(int64_t* agg, const double val) {
516  atomicMax(reinterpret_cast<double*>(agg), val);
517 }
518 
519 extern "C" __device__ void agg_max_float_shared(int32_t* agg, const float val) {
520  atomicMax(reinterpret_cast<float*>(agg), val);
521 }
522 
523 extern "C" __device__ void agg_min_shared(int64_t* agg, const int64_t val) {
524  atomicMin64(agg, val);
525 }
526 
527 extern "C" __device__ void agg_min_int32_shared(int32_t* agg, const int32_t val) {
528  atomicMin(agg, val);
529 }
530 
531 #if CUDA_VERSION > 10000 && defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 700
532 __device__ void atomicMax16(int16_t* agg, const int16_t val) {
533  unsigned short int* address_as_us = reinterpret_cast<unsigned short int*>(agg);
534  unsigned short int old = *address_as_us, assumed;
535 
536  do {
537  assumed = old;
538  old = atomicCAS(address_as_us,
539  assumed,
540  static_cast<unsigned short>(max(static_cast<short int>(val),
541  static_cast<short int>(assumed))));
542  } while (assumed != old);
543 }
544 #else
545 __device__ void atomicMax16(int16_t* agg, const int16_t val) {
546  // properly align the input pointer:
547  unsigned int* base_address_u32 =
548  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(agg) & ~0x3);
549 
550  unsigned int old_value = *base_address_u32;
551  unsigned int swap_value, compare_value;
552  do {
553  compare_value = old_value;
554  swap_value =
555  (reinterpret_cast<size_t>(agg) & 0x2)
556  ? static_cast<unsigned int>(max(static_cast<int16_t>(old_value >> 16), val))
557  << 16 |
558  (old_value & 0xFFFF)
559  : (old_value & 0xFFFF0000) |
560  static_cast<unsigned int>(
561  max(static_cast<int16_t>(old_value & 0xFFFF), val));
562  old_value = atomicCAS(base_address_u32, compare_value, swap_value);
563  } while (old_value != compare_value);
564 }
565 #endif
566 
567 __device__ void atomicMax8(int8_t* agg, const int8_t val) {
568  // properly align the input pointer:
569  unsigned int* base_address_u32 =
570  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(agg) & ~0x3);
571 
572  // __byte_perm(unsigned int A, unsigned int B, unsigned int s):
573  // if s == 0x3214 returns {A[31..24], A[23..16], A[15..8], B[7..0]}
574  // if s == 0x3240 returns {A[31..24], A[23..16], B[7...0], A[7..0]}
575  // if s == 0x3410 returns {A[31..24], B[7....0], A[15..8], A[7..0]}
576  // if s == 0x4210 returns {B[7....0], A[23..16], A[15..8], A[7..0]}
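  // Worked example (illustrative): with A == 0x11223344, B == 0xAB, and s == 0x3240,
  // __byte_perm(A, B, s) returns 0x1122AB44, i.e. only the byte selected by the low two
  // address bits is replaced while the rest of the 32-bit word is preserved.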
577  constexpr unsigned int byte_permutations[] = {0x3214, 0x3240, 0x3410, 0x4210};
578  unsigned int old_value = *base_address_u32;
579  unsigned int swap_value, compare_value;
580  do {
581  compare_value = old_value;
582  auto max_value = static_cast<unsigned int>(
583  // compare val with its corresponding bits in the compare_value
584  max(val,
585  static_cast<int8_t>(__byte_perm(
586  compare_value, 0, (reinterpret_cast<size_t>(agg) & 0x3) | 0x4440))));
587  swap_value = __byte_perm(
588  compare_value, max_value, byte_permutations[reinterpret_cast<size_t>(agg) & 0x3]);
589  old_value = atomicCAS(base_address_u32, compare_value, swap_value);
590  } while (compare_value != old_value);
591 }
592 
593 #if CUDA_VERSION > 10000 && defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 700
594 __device__ void atomicMin16(int16_t* agg, const int16_t val) {
595  unsigned short int* address_as_us = reinterpret_cast<unsigned short int*>(agg);
596  unsigned short int old = *address_as_us, assumed;
597 
598  do {
599  assumed = old;
600  old = atomicCAS(address_as_us,
601  assumed,
602  static_cast<unsigned short>(min(static_cast<short int>(val),
603  static_cast<short int>(assumed))));
604  } while (assumed != old);
605 }
606 #else
607 __device__ void atomicMin16(int16_t* agg, const int16_t val) {
608  // properly align the input pointer:
609  unsigned int* base_address_u32 =
610  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(agg) & ~0x3);
611 
612  unsigned int old_value = *base_address_u32;
613  unsigned int swap_value, compare_value;
614  do {
615  compare_value = old_value;
616  swap_value =
617  (reinterpret_cast<size_t>(agg) & 0x2)
618  ? static_cast<unsigned int>(min(static_cast<int16_t>(old_value >> 16), val))
619  << 16 |
620  (old_value & 0xFFFF)
621  : (old_value & 0xFFFF0000) |
622  static_cast<unsigned int>(
623  min(static_cast<int16_t>(old_value & 0xFFFF), val));
624  old_value = atomicCAS(base_address_u32, compare_value, swap_value);
625  } while (old_value != compare_value);
626 }
627 #endif
628 
629 __device__ void atomicMin16SkipVal(int16_t* agg,
630  const int16_t val,
631  const int16_t skip_val) {
632  // properly align the input pointer:
633  unsigned int* base_address_u32 =
634  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(agg) & ~0x3);
635 
636  unsigned int old_value = *base_address_u32;
637  unsigned int swap_value, compare_value;
638  do {
639  compare_value = old_value;
640  int16_t selected_old_val = (reinterpret_cast<size_t>(agg) & 0x2)
641  ? static_cast<int16_t>(old_value >> 16)
642  : static_cast<int16_t>(old_value & 0xFFFF);
643 
644  swap_value =
645  (reinterpret_cast<size_t>(agg) & 0x2)
646  ? static_cast<unsigned int>(
647  selected_old_val == skip_val ? val : min(selected_old_val, val))
648  << 16 |
649  (old_value & 0xFFFF)
650  : (old_value & 0xFFFF0000) |
651  static_cast<unsigned int>(
652  selected_old_val == skip_val ? val : min(selected_old_val, val));
653  old_value = atomicCAS(base_address_u32, compare_value, swap_value);
654  } while (old_value != compare_value);
655 }
656 
657 __device__ void atomicMin8(int8_t* agg, const int8_t val) {
658  // properly align the input pointer:
659  unsigned int* base_address_u32 =
660  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(agg) & ~0x3);
661 
662  constexpr unsigned int byte_permutations[] = {0x3214, 0x3240, 0x3410, 0x4210};
663  unsigned int old_value = *base_address_u32;
664  unsigned int swap_value, compare_value;
665  do {
666  compare_value = old_value;
667  auto min_value = static_cast<unsigned int>(
668  min(val,
669  static_cast<int8_t>(__byte_perm(
670  compare_value, 0, (reinterpret_cast<size_t>(agg) & 0x3) | 0x4440))));
671  swap_value = __byte_perm(
672  compare_value, min_value, byte_permutations[reinterpret_cast<size_t>(agg) & 0x3]);
673  old_value = atomicCAS(base_address_u32, compare_value, swap_value);
674  } while (compare_value != old_value);
675 }
676 
677 __device__ void atomicMin8SkipVal(int8_t* agg, const int8_t val, const int8_t skip_val) {
678  // properly align the input pointer:
679  unsigned int* base_address_u32 =
680  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(agg) & ~0x3);
681 
682  constexpr unsigned int byte_permutations[] = {0x3214, 0x3240, 0x3410, 0x4210};
683  unsigned int old_value = *base_address_u32;
684  unsigned int swap_value, compare_value;
685  do {
686  compare_value = old_value;
687  int8_t selected_old_val = static_cast<int8_t>(
688  __byte_perm(compare_value, 0, (reinterpret_cast<size_t>(agg) & 0x3) | 0x4440));
689  auto min_value = static_cast<unsigned int>(
690  selected_old_val == skip_val ? val : min(val, selected_old_val));
691  swap_value = __byte_perm(
692  compare_value, min_value, byte_permutations[reinterpret_cast<size_t>(agg) & 0x3]);
693  old_value = atomicCAS(base_address_u32, compare_value, swap_value);
694  } while (compare_value != old_value);
695 }
696 
697 extern "C" __device__ void agg_max_int16_shared(int16_t* agg, const int16_t val) {
698  return atomicMax16(agg, val);
699 }
700 
701 extern "C" __device__ void agg_max_int8_shared(int8_t* agg, const int8_t val) {
702  return atomicMax8(agg, val);
703 }
704 
705 extern "C" __device__ void agg_min_int16_shared(int16_t* agg, const int16_t val) {
706  return atomicMin16(agg, val);
707 }
708 
709 extern "C" __device__ void agg_min_int8_shared(int8_t* agg, const int8_t val) {
710  return atomicMin8(agg, val);
711 }
712 
713 extern "C" __device__ void agg_min_double_shared(int64_t* agg, const double val) {
714  atomicMin(reinterpret_cast<double*>(agg), val);
715 }
716 
717 extern "C" __device__ void agg_min_float_shared(int32_t* agg, const float val) {
718  atomicMin(reinterpret_cast<float*>(agg), val);
719 }
720 
721 extern "C" __device__ void agg_id_shared(int64_t* agg, const int64_t val) {
722  *agg = val;
723 }
724 
725 extern "C" __device__ int8_t* agg_id_varlen_shared(int8_t* varlen_buffer,
726  const int64_t offset,
727  const int8_t* value,
728  const int64_t size_bytes) {
729  for (auto i = 0; i < size_bytes; i++) {
730  varlen_buffer[offset + i] = value[i];
731  }
732  return &varlen_buffer[offset];
733 }
734 
735 extern "C" __device__ int32_t checked_single_agg_id_shared(int64_t* agg,
736  const int64_t val,
737  const int64_t null_val) {
738  unsigned long long int* address_as_ull = reinterpret_cast<unsigned long long int*>(agg);
739  unsigned long long int old = *address_as_ull, assumed;
740 
741  if (val == null_val) {
742  return 0;
743  }
744 
745  do {
746  if (static_cast<int64_t>(old) != null_val) {
747  if (static_cast<int64_t>(old) != val) {
748  // see Execute::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
749  return 15;
750  } else {
751  break;
752  }
753  }
754 
755  assumed = old;
756  old = atomicCAS(address_as_ull, assumed, val);
757  } while (assumed != old);
758 
759  return 0;
760 }
761 
762 #define DEF_AGG_ID_INT_SHARED(n) \
763  extern "C" __device__ void agg_id_int##n##_shared(int##n##_t* agg, \
764  const int##n##_t val) { \
765  *agg = val; \
766  }
767 
768 DEF_AGG_ID_INT_SHARED(32)
769 DEF_AGG_ID_INT_SHARED(16)
770 DEF_AGG_ID_INT_SHARED(8)
771 
772 #undef DEF_AGG_ID_INT_SHARED
773 
774 extern "C" __device__ void agg_id_double_shared(int64_t* agg, const double val) {
775  *agg = *(reinterpret_cast<const int64_t*>(&val));
776 }
777 
778 extern "C" __device__ int32_t checked_single_agg_id_double_shared(int64_t* agg,
779  const double val,
780  const double null_val) {
781  unsigned long long int* address_as_ull = reinterpret_cast<unsigned long long int*>(agg);
782  unsigned long long int old = *address_as_ull, assumed;
783 
784  if (val == null_val) {
785  return 0;
786  }
787 
788  do {
789  if (static_cast<int64_t>(old) != __double_as_longlong(null_val)) {
790  if (static_cast<int64_t>(old) != __double_as_longlong(val)) {
791  // see Execute::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
792  return 15;
793  } else {
794  break;
795  }
796  }
797 
798  assumed = old;
799  old = atomicCAS(address_as_ull, assumed, __double_as_longlong(val));
800  } while (assumed != old);
801 
802  return 0;
803 }
804 
805 extern "C" __device__ void agg_id_double_shared_slow(int64_t* agg, const double* val) {
806  *agg = *(reinterpret_cast<const int64_t*>(val));
807 }
808 
809 extern "C" __device__ int32_t
810 checked_single_agg_id_double_shared_slow(int64_t* agg,
811  const double* valp,
812  const double null_val) {
813  unsigned long long int* address_as_ull = reinterpret_cast<unsigned long long int*>(agg);
814  unsigned long long int old = *address_as_ull, assumed;
815  double val = *valp;
816 
817  if (val == null_val) {
818  return 0;
819  }
820 
821  do {
822  if (static_cast<int64_t>(old) != __double_as_longlong(null_val)) {
823  if (static_cast<int64_t>(old) != __double_as_longlong(val)) {
824  // see Execute::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
825  return 15;
826  } else {
827  break;
828  }
829  }
830 
831  assumed = old;
832  old = atomicCAS(address_as_ull, assumed, __double_as_longlong(val));
833  } while (assumed != old);
834 
835  return 0;
836 }
837 
838 extern "C" __device__ void agg_id_float_shared(int32_t* agg, const float val) {
839  *agg = __float_as_int(val);
840 }
841 
842 extern "C" __device__ int32_t checked_single_agg_id_float_shared(int32_t* agg,
843  const float val,
844  const float null_val) {
845  int* address_as_ull = reinterpret_cast<int*>(agg);
846  int old = *address_as_ull, assumed;
847 
848  if (val == null_val) {
849  return 0;
850  }
851 
852  do {
853  if (old != __float_as_int(null_val)) {
854  if (old != __float_as_int(val)) {
855  // see Execute::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES
856  return 15;
857  } else {
858  break;
859  }
860  }
861 
862  assumed = old;
863  old = atomicCAS(address_as_ull, assumed, __float_as_int(val));
864  } while (assumed != old);
865 
866  return 0;
867 }
868 
869 #define DEF_SKIP_AGG(base_agg_func) \
870  extern "C" __device__ ADDR_T base_agg_func##_skip_val_shared( \
871  ADDR_T* agg, const DATA_T val, const DATA_T skip_val) { \
872  if (val != skip_val) { \
873  return base_agg_func##_shared(agg, val); \
874  } \
875  return 0; \
876  }
877 
878 #define DATA_T int64_t
879 #define ADDR_T uint64_t
880 DEF_SKIP_AGG(agg_count)
881 DEF_SKIP_AGG(agg_count_if)
882 #undef DATA_T
883 #undef ADDR_T
884 
885 #define DATA_T int32_t
886 #define ADDR_T uint32_t
887 DEF_SKIP_AGG(agg_count_int32)
888 DEF_SKIP_AGG(agg_count_if_int32)
889 #undef DATA_T
890 #undef ADDR_T
891 
892 // Initial value for nullable column is INT32_MIN
893 extern "C" __device__ void agg_max_int32_skip_val_shared(int32_t* agg,
894  const int32_t val,
895  const int32_t skip_val) {
896  if (val != skip_val) {
897  agg_max_int32_shared(agg, val);
898  }
899 }
900 
901 extern "C" __device__ void agg_max_int16_skip_val_shared(int16_t* agg,
902  const int16_t val,
903  const int16_t skip_val) {
904  if (val != skip_val) {
905  agg_max_int16_shared(agg, val);
906  }
907 }
908 
909 extern "C" __device__ void agg_min_int16_skip_val_shared(int16_t* agg,
910  const int16_t val,
911  const int16_t skip_val) {
912  if (val != skip_val) {
913  atomicMin16SkipVal(agg, val, skip_val);
914  }
915 }
916 
917 extern "C" __device__ void agg_max_int8_skip_val_shared(int8_t* agg,
918  const int8_t val,
919  const int8_t skip_val) {
920  if (val != skip_val) {
921  agg_max_int8_shared(agg, val);
922  }
923 }
924 
925 extern "C" __device__ void agg_min_int8_skip_val_shared(int8_t* agg,
926  const int8_t val,
927  const int8_t skip_val) {
928  if (val != skip_val) {
929  atomicMin8SkipVal(agg, val, skip_val);
930  }
931 }
932 
933 __device__ int32_t atomicMin32SkipVal(int32_t* address,
934  int32_t val,
935  const int32_t skip_val) {
936  int32_t old = atomicExch(address, INT_MAX);
937  return atomicMin(address, old == skip_val ? val : min(old, val));
938 }
939 
940 extern "C" __device__ void agg_min_int32_skip_val_shared(int32_t* agg,
941  const int32_t val,
942  const int32_t skip_val) {
943  if (val != skip_val) {
944  atomicMin32SkipVal(agg, val, skip_val);
945  }
946 }
947 
948 __device__ int32_t atomicSum32SkipVal(int32_t* address,
949  const int32_t val,
950  const int32_t skip_val) {
951  unsigned int* address_as_int = (unsigned int*)address;
952  int32_t old = atomicExch(address_as_int, 0);
953  int32_t old2 = atomicAdd(address_as_int, old == skip_val ? val : (val + old));
954  return old == skip_val ? old2 : (old2 + old);
955 }
956 
957 extern "C" __device__ int32_t agg_sum_int32_skip_val_shared(int32_t* agg,
958  const int32_t val,
959  const int32_t skip_val) {
960  if (val != skip_val) {
961  const int32_t old = atomicSum32SkipVal(agg, val, skip_val);
962  return old;
963  }
964  return 0;
965 }
966 
967 extern "C" __device__ int32_t agg_sum_if_int32_skip_val_shared(int32_t* agg,
968  const int32_t val,
969  const int32_t skip_val,
970  const int8_t cond) {
971  return cond ? agg_sum_int32_skip_val_shared(agg, val, skip_val) : *agg;
972 }
973 
974 __device__ int64_t atomicSum64SkipVal(int64_t* address,
975  const int64_t val,
976  const int64_t skip_val) {
977  unsigned long long int* address_as_ull = (unsigned long long int*)address;
978  int64_t old = atomicExch(address_as_ull, 0);
979  int64_t old2 = atomicAdd(address_as_ull, old == skip_val ? val : (val + old));
980  return old == skip_val ? old2 : (old2 + old);
981 }
982 
983 extern "C" __device__ int64_t agg_sum_skip_val_shared(int64_t* agg,
984  const int64_t val,
985  const int64_t skip_val) {
986  if (val != skip_val) {
987  return atomicSum64SkipVal(agg, val, skip_val);
988  }
989  return 0;
990 }
991 
992 extern "C" __device__ int64_t agg_sum_if_skip_val_shared(int64_t* agg,
993  const int64_t val,
994  const int64_t skip_val,
995  const int8_t cond) {
996  return cond ? agg_sum_skip_val_shared(agg, val, skip_val) : *agg;
997 }
998 
999 __device__ int64_t atomicMin64SkipVal(int64_t* address,
1000  int64_t val,
1001  const int64_t skip_val) {
1002  unsigned long long int* address_as_ull =
1003  reinterpret_cast<unsigned long long int*>(address);
1004  unsigned long long int old = *address_as_ull, assumed;
1005 
1006  do {
1007  assumed = old;
1008  old = atomicCAS(address_as_ull,
1009  assumed,
1010  assumed == skip_val ? val : min((long long)val, (long long)assumed));
1011  } while (assumed != old);
1012 
1013  return old;
1014 }
1015 
1016 extern "C" __device__ void agg_min_skip_val_shared(int64_t* agg,
1017  const int64_t val,
1018  const int64_t skip_val) {
1019  if (val != skip_val) {
1020  atomicMin64SkipVal(agg, val, skip_val);
1021  }
1022 }
1023 
1024 __device__ int64_t atomicMax64SkipVal(int64_t* address,
1025  int64_t val,
1026  const int64_t skip_val) {
1027  unsigned long long int* address_as_ull =
1028  reinterpret_cast<unsigned long long int*>(address);
1029  unsigned long long int old = *address_as_ull, assumed;
1030 
1031  do {
1032  assumed = old;
1033  old = atomicCAS(address_as_ull,
1034  assumed,
1035  assumed == skip_val ? val : max((long long)val, (long long)assumed));
1036  } while (assumed != old);
1037 
1038  return old;
1039 }
1040 
1041 extern "C" __device__ void agg_max_skip_val_shared(int64_t* agg,
1042  const int64_t val,
1043  const int64_t skip_val) {
1044  if (val != skip_val) {
1045  atomicMax64SkipVal(agg, val, skip_val);
1046  }
1047 }
1048 
1049 #undef DEF_SKIP_AGG
1050 #define DEF_SKIP_AGG(base_agg_func) \
1051  extern "C" __device__ ADDR_T base_agg_func##_skip_val_shared( \
1052  ADDR_T* agg, const DATA_T val, const DATA_T skip_val) { \
1053  if (val != skip_val) { \
1054  return base_agg_func##_shared(agg, val); \
1055  } \
1056  return *agg; \
1057  }
1058 
1059 #define DATA_T double
1060 #define ADDR_T uint64_t
1061 DEF_SKIP_AGG(agg_count_double)
1062 #undef ADDR_T
1063 #undef DATA_T
1064 
1065 #define DATA_T float
1066 #define ADDR_T uint32_t
1067 DEF_SKIP_AGG(agg_count_float)
1068 #undef ADDR_T
1069 #undef DATA_T
1070 
1071 // Initial value for nullable column is FLOAT_MIN
1072 extern "C" __device__ void agg_max_float_skip_val_shared(int32_t* agg,
1073  const float val,
1074  const float skip_val) {
1075  if (__float_as_int(val) != __float_as_int(skip_val)) {
1076  float old = atomicExch(reinterpret_cast<float*>(agg), -FLT_MAX);
1077  atomicMax(reinterpret_cast<float*>(agg),
1078  __float_as_int(old) == __float_as_int(skip_val) ? val : fmaxf(old, val));
1079  }
1080 }
1081 
1082 __device__ float atomicMinFltSkipVal(int32_t* address, float val, const float skip_val) {
1083  float old = atomicExch(reinterpret_cast<float*>(address), FLT_MAX);
1084  return atomicMin(
1085  reinterpret_cast<float*>(address),
1086  __float_as_int(old) == __float_as_int(skip_val) ? val : fminf(old, val));
1087 }
1088 
1089 extern "C" __device__ void agg_min_float_skip_val_shared(int32_t* agg,
1090  const float val,
1091  const float skip_val) {
1092  if (__float_as_int(val) != __float_as_int(skip_val)) {
1093  atomicMinFltSkipVal(agg, val, skip_val);
1094  }
1095 }
1096 
1097 __device__ void atomicSumFltSkipVal(float* address,
1098  const float val,
1099  const float skip_val) {
1100  float old = atomicExch(address, 0.f);
1101  atomicAdd(address, __float_as_int(old) == __float_as_int(skip_val) ? val : (val + old));
1102 }
1103 
1104 extern "C" __device__ void agg_sum_float_skip_val_shared(int32_t* agg,
1105  const float val,
1106  const float skip_val) {
1107  if (__float_as_int(val) != __float_as_int(skip_val)) {
1108  atomicSumFltSkipVal(reinterpret_cast<float*>(agg), val, skip_val);
1109  }
1110 }
1111 
1112 extern "C" __device__ void agg_sum_if_float_skip_val_shared(int32_t* agg,
1113  const float val,
1114  const float skip_val,
1115  const int8_t cond) {
1116  if (cond) {
1117  agg_sum_float_skip_val_shared(agg, val, skip_val);
1118  }
1119 }
1120 
1121 __device__ void atomicSumDblSkipVal(double* address,
1122  const double val,
1123  const double skip_val) {
1124  unsigned long long int* address_as_ull = (unsigned long long int*)address;
1125  double old = __longlong_as_double(atomicExch(address_as_ull, __double_as_longlong(0.)));
1126  atomicAdd(
1127  address,
1128  __double_as_longlong(old) == __double_as_longlong(skip_val) ? val : (val + old));
1129 }
1130 
1131 extern "C" __device__ void agg_sum_double_skip_val_shared(int64_t* agg,
1132  const double val,
1133  const double skip_val) {
1134  if (__double_as_longlong(val) != __double_as_longlong(skip_val)) {
1135  atomicSumDblSkipVal(reinterpret_cast<double*>(agg), val, skip_val);
1136  }
1137 }
1138 
1139 extern "C" __device__ void agg_sum_if_double_skip_val_shared(int64_t* agg,
1140  const double val,
1141  const double skip_val,
1142  const int8_t cond) {
1143  if (cond) {
1144  agg_sum_double_skip_val_shared(agg, val, skip_val);
1145  }
1146 }
1147 
1148 __device__ double atomicMinDblSkipVal(double* address,
1149  double val,
1150  const double skip_val) {
1151  unsigned long long int* address_as_ull =
1152  reinterpret_cast<unsigned long long int*>(address);
1153  unsigned long long int old = *address_as_ull;
1154  unsigned long long int skip_val_as_ull =
1155  *reinterpret_cast<const unsigned long long*>(&skip_val);
1156  unsigned long long int assumed;
1157 
1158  do {
1159  assumed = old;
1160  old = atomicCAS(address_as_ull,
1161  assumed,
1162  assumed == skip_val_as_ull
1163  ? *reinterpret_cast<unsigned long long*>(&val)
1164  : __double_as_longlong(min(val, __longlong_as_double(assumed))));
1165  } while (assumed != old);
1166 
1167  return __longlong_as_double(old);
1168 }
1169 
1170 extern "C" __device__ void agg_min_double_skip_val_shared(int64_t* agg,
1171  const double val,
1172  const double skip_val) {
1173  if (val != skip_val) {
1174  atomicMinDblSkipVal(reinterpret_cast<double*>(agg), val, skip_val);
1175  }
1176 }
1177 
1178 extern "C" __device__ void agg_max_double_skip_val_shared(int64_t* agg,
1179  const double val,
1180  const double skip_val) {
1181  if (__double_as_longlong(val) != __double_as_longlong(skip_val)) {
1182  double old = __longlong_as_double(atomicExch(
1183  reinterpret_cast<unsigned long long int*>(agg), __double_as_longlong(-DBL_MAX)));
1184  atomicMax(reinterpret_cast<double*>(agg),
1185  __double_as_longlong(old) == __double_as_longlong(skip_val)
1186  ? val
1187  : fmax(old, val));
1188  }
1189 }
1190 
1191 #undef DEF_SKIP_AGG
1192 
1193 extern "C" __device__ bool slotEmptyKeyCAS(int64_t* slot,
1194  int64_t new_val,
1195  int64_t init_val) {
1196  auto slot_address = reinterpret_cast<unsigned long long int*>(slot);
1197  const auto empty_key =
1198  static_cast<unsigned long long int*>(static_cast<void*>(&init_val));
1199  const auto new_val_cast =
1200  static_cast<unsigned long long int*>(static_cast<void*>(&new_val));
1201 
1202  const auto old_val = atomicCAS(slot_address, *empty_key, *new_val_cast);
1203  if (old_val == *empty_key) {
1204  return true;
1205  } else {
1206  return false;
1207  }
1208 }
1209 
1210 extern "C" __device__ bool slotEmptyKeyCAS_int32(int32_t* slot,
1211  int32_t new_val,
1212  int32_t init_val) {
1213  unsigned int* slot_address = reinterpret_cast<unsigned int*>(slot);
1214  unsigned int compare_value = static_cast<unsigned int>(init_val);
1215  unsigned int swap_value = static_cast<unsigned int>(new_val);
1216 
1217  const unsigned int old_value = atomicCAS(slot_address, compare_value, swap_value);
1218  return old_value == compare_value;
1219 }
1220 
1221 extern "C" __device__ bool slotEmptyKeyCAS_int16(int16_t* slot,
1222  int16_t new_val,
1223  int16_t init_val) {
1224  unsigned int* base_slot_address =
1225  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(slot) & ~0x3);
1226  unsigned int old_value = *base_slot_address;
1227  unsigned int swap_value, compare_value;
1228  do {
1229  compare_value = old_value;
1230  // exit criteria: if init_val does not exist in the slot (some other thread has
1231  // succeeded)
1232  if (static_cast<unsigned int>(init_val) !=
1233  __byte_perm(
1234  compare_value, 0, (reinterpret_cast<size_t>(slot) & 0x2 ? 0x3244 : 0x4410))) {
1235  return false;
1236  }
1237  swap_value = __byte_perm(compare_value,
1238  static_cast<unsigned int>(new_val),
1239  (reinterpret_cast<size_t>(slot) & 0x2) ? 0x5410 : 0x3254);
1240  old_value = atomicCAS(base_slot_address, compare_value, swap_value);
1241  } while (compare_value != old_value);
1242  return true;
1243 }
1244 
1245 extern "C" __device__ bool slotEmptyKeyCAS_int8(int8_t* slot,
1246  int8_t new_val,
1247  int8_t init_val) {
1248  // properly align the slot address:
1249  unsigned int* base_slot_address =
1250  reinterpret_cast<unsigned int*>(reinterpret_cast<size_t>(slot) & ~0x3);
1251  constexpr unsigned int byte_permutations[] = {0x3214, 0x3240, 0x3410, 0x4210};
1252  unsigned int old_value = *base_slot_address;
1253  unsigned int swap_value, compare_value;
1254  do {
1255  compare_value = old_value;
1256  // exit criteria: if init_val does not exist in the slot (some other thread has
1257  // succeeded)
1258  if (static_cast<unsigned int>(init_val) !=
1259  __byte_perm(compare_value, 0, (reinterpret_cast<size_t>(slot) & 0x3) | 0x4440)) {
1260  return false;
1261  }
1262  swap_value = __byte_perm(compare_value,
1263  static_cast<unsigned int>(new_val),
1264  byte_permutations[reinterpret_cast<size_t>(slot) & 0x3]);
1265  old_value = atomicCAS(base_slot_address, compare_value, swap_value);
1266  } while (compare_value != old_value);
1267  return true;
1268 }
1269 
1270 #include "../Utils/ChunkIter.cpp"
1271 #include "DateTruncate.cpp"
1272 #include "ExtractFromTime.cpp"
1273 #define EXECUTE_INCLUDE
1274 #include "ArrayOps.cpp"
1275 #include "DateAdd.cpp"
1276 #include "GeoOps.cpp"
1277 #include "StringFunctions.cpp"
1278 #undef EXECUTE_INCLUDE
1279 #include "../Utils/Regexp.cpp"
1280 #include "../Utils/StringLike.cpp"
1281 
1282 extern "C" __device__ StringView string_decode(int8_t* chunk_iter_, int64_t pos) {
1283  // TODO(alex): de-dup, the x64 version is basically identical
1284  auto chunk_iter = reinterpret_cast<ChunkIter*>(chunk_iter_);
1285  VarlenDatum vd;
1286  bool is_end;
1287  ChunkIter_get_nth(chunk_iter, pos, false, &vd, &is_end);
1288  // CHECK(!is_end); <--- this is the difference (re: above comment)
1289  return vd.is_null ? StringView{nullptr, 0u}
1290  : StringView{reinterpret_cast<char const*>(vd.pointer), vd.length};
1291 }
1292 
1293 extern "C" __device__ void linear_probabilistic_count(uint8_t* bitmap,
1294  const uint32_t bitmap_bytes,
1295  const uint8_t* key_bytes,
1296  const uint32_t key_len) {
1297  const uint32_t bit_pos = MurmurHash3(key_bytes, key_len, 0) % (bitmap_bytes * 8);
1298  const uint32_t word_idx = bit_pos / 32;
1299  const uint32_t bit_idx = bit_pos % 32;
1300  atomicOr(((uint32_t*)bitmap) + word_idx, 1 << bit_idx);
1301 }
1302 
1303 extern "C" __device__ void agg_count_distinct_bitmap_gpu(int64_t* agg,
1304  const int64_t val,
1305  const int64_t min_val,
1306  const int64_t bucket_size,
1307  const int64_t base_dev_addr,
1308  const int64_t base_host_addr,
1309  const uint64_t sub_bitmap_count,
1310  const uint64_t bitmap_bytes) {
1311  constexpr unsigned bitmap_element_size = 8 * sizeof(uint32_t);
1312  auto bitmap_idx = static_cast<uint64_t>(val - min_val);
1313  if (1 < bucket_size) {
1314  bitmap_idx /= static_cast<uint64_t>(bucket_size);
1315  }
1316  uint64_t const word_idx = bitmap_idx / bitmap_element_size;
1317  uint32_t const bit_idx = bitmap_idx % bitmap_element_size;
1318  int64_t const agg_offset = *agg - base_host_addr;
1319  int64_t const thread_offset = (threadIdx.x & (sub_bitmap_count - 1)) * bitmap_bytes;
1320  auto* bitmap = reinterpret_cast<uint32_t*>(base_dev_addr + agg_offset + thread_offset);
1321  atomicOr(bitmap + word_idx, 1u << bit_idx);
1322 }
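// Worked example (illustrative): with val == 37, min_val == 10, and bucket_size == 1,
// bitmap_idx == 27, so the atomicOr above sets bit 27 of word 0 in this thread's
// sub-bitmap (each 32-bit word covers bitmap_element_size == 32 distinct values).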
1323 
1324 extern "C" __device__ void agg_count_distinct_bitmap_skip_val_gpu(
1325  int64_t* agg,
1326  const int64_t val,
1327  const int64_t min_val,
1328  const int64_t bucket_size,
1329  const int64_t skip_val,
1330  const int64_t base_dev_addr,
1331  const int64_t base_host_addr,
1332  const uint64_t sub_bitmap_count,
1333  const uint64_t bitmap_bytes) {
1334  if (val != skip_val) {
1335  agg_count_distinct_bitmap_gpu(agg,
1336  val,
1337  min_val,
1338  bucket_size,
1339  base_dev_addr,
1340  base_host_addr,
1341  sub_bitmap_count,
1342  bitmap_bytes);
1343  }
1344 }
1345 
1346 extern "C" __device__ void agg_approximate_count_distinct_gpu(
1347  int64_t* agg,
1348  const int64_t key,
1349  const uint32_t b,
1350  const int64_t base_dev_addr,
1351  const int64_t base_host_addr) {
1352  const uint64_t hash = MurmurHash64A(&key, sizeof(key), 0);
1353  const uint32_t index = hash >> (64 - b);
1354  const int32_t rank = get_rank(hash << b, 64 - b);
1355  const int64_t host_addr = *agg;
1356  int32_t* M = (int32_t*)(base_dev_addr + host_addr - base_host_addr);
1357  atomicMax(&M[index], rank);
1358 }
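// Reading of the HyperLogLog update above (illustrative): the top b bits of the 64-bit
// MurmurHash select one of 2^b registers, get_rank() computes the rank (position of the
// leading set bit) of the remaining 64 - b bits, and atomicMax keeps the largest rank
// observed for each register.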
1359 
1360 extern "C" __device__ void force_sync() {
1361  __threadfence_block();
1362 }
1363 
1364 extern "C" __device__ void sync_warp() {
1365  __syncwarp();
1366 }
1367 
1375 extern "C" __device__ void sync_warp_protected(int64_t thread_pos, int64_t row_count) {
1376  // only syncing if NOT within the same warp as those threads experiencing the critical
1377  // edge
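  // Worked example (illustrative): with row_count == 100 the last valid row index is 99
  // and (99 | 0x1F) == 127, so threads with thread_pos <= 95 synchronize, while the warp
  // covering positions 96..127 (the one exposed to the critical edge) skips __syncwarp().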
1378  if ((((row_count - 1) | 0x1F) - thread_pos) >= 32) {
1379  __syncwarp();
1380  }
1381 }
1382 
1383 extern "C" __device__ void sync_threadblock() {
1384  __syncthreads();
1385 }
1386 
1387 /*
1388  * Currently, this function is only used to handle non-grouped aggregates in COUNT
1389  * queries (when GPU shared memory is used). Later, we should generate code for this
1390  * depending on the type of the aggregate functions.
1391  * TODO: we should use one contiguous global memory buffer, rather than the current
1392  * default behaviour of multiple buffers, one per aggregate. Once that's resolved, we
1393  * can make this much cleaner.
1394  */
1395 extern "C" __device__ void write_back_non_grouped_agg(int64_t* input_buffer,
1396  int64_t* output_buffer,
1397  const int32_t agg_idx) {
1398  if (threadIdx.x == agg_idx) {
1399  agg_sum_shared(output_buffer, input_buffer[agg_idx]);
1400  }
1401 }