OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
JoinHashTableQueryRuntime.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <cmath>
18 #include <cstdio>
19 #include <cstdlib>
20 
23 #include "QueryEngine/MurmurHash.h"
24 
25 DEVICE bool compare_to_key(const int8_t* entry,
26  const int8_t* key,
27  const size_t key_bytes) {
28  for (size_t i = 0; i < key_bytes; ++i) {
29  if (entry[i] != key[i]) {
30  return false;
31  }
32  }
33  return true;
34 }
35 
namespace {

// Sentinel results for slot probing (see get_matching_slot below):
// kNoMatch    - the probed slot holds a different key; keep linear probing.
// kNotPresent - the probed slot is empty (invalid key), so the key is
//               definitively absent from the table.
const int kNoMatch = -1;
const int kNotPresent = -2;

}  // namespace
42 
43 template <class T>
44 DEVICE int64_t get_matching_slot(const int8_t* hash_buff,
45  const uint32_t h,
46  const int8_t* key,
47  const size_t key_bytes) {
48  const auto lookup_result_ptr = hash_buff + h * (key_bytes + sizeof(T));
49  if (compare_to_key(lookup_result_ptr, key, key_bytes)) {
50  return *reinterpret_cast<const T*>(lookup_result_ptr + key_bytes);
51  }
52  if (*reinterpret_cast<const T*>(lookup_result_ptr) ==
53  SUFFIX(get_invalid_key) < T > ()) {
54  return kNotPresent;
55  }
56  return kNoMatch;
57 }
58 
59 template <class T>
60 FORCE_INLINE DEVICE int64_t baseline_hash_join_idx_impl(const int8_t* hash_buff,
61  const int8_t* key,
62  const size_t key_bytes,
63  const size_t entry_count) {
64  if (!entry_count) {
65  return kNoMatch;
66  }
67  const uint32_t h = MurmurHash1(key, key_bytes, 0) % entry_count;
68  int64_t matching_slot = get_matching_slot<T>(hash_buff, h, key, key_bytes);
69  if (matching_slot != kNoMatch) {
70  return matching_slot;
71  }
72  uint32_t h_probe = (h + 1) % entry_count;
73  while (h_probe != h) {
74  matching_slot = get_matching_slot<T>(hash_buff, h_probe, key, key_bytes);
75  if (matching_slot != kNoMatch) {
76  return matching_slot;
77  }
78  h_probe = (h_probe + 1) % entry_count;
79  }
80  return kNoMatch;
81 }
82 
83 extern "C" RUNTIME_EXPORT NEVER_INLINE DEVICE int64_t
84 baseline_hash_join_idx_32(const int8_t* hash_buff,
85  const int8_t* key,
86  const size_t key_bytes,
87  const size_t entry_count) {
88  return baseline_hash_join_idx_impl<int32_t>(hash_buff, key, key_bytes, entry_count);
89 }
90 
91 extern "C" RUNTIME_EXPORT NEVER_INLINE DEVICE int64_t
92 baseline_hash_join_idx_64(const int8_t* hash_buff,
93  const int8_t* key,
94  const size_t key_bytes,
95  const size_t entry_count) {
96  return baseline_hash_join_idx_impl<int64_t>(hash_buff, key, key_bytes, entry_count);
97 }
98 
99 template <typename T>
101  const double bucket_size) {
102  return static_cast<int64_t>(floor(static_cast<double>(value) * bucket_size));
103 }
104 
105 extern "C" RUNTIME_EXPORT NEVER_INLINE DEVICE int64_t
106 get_bucket_key_for_range_double(const int8_t* range_bytes,
107  const size_t range_component_index,
108  const double bucket_size) {
109  const auto range = reinterpret_cast<const double*>(range_bytes);
110  return get_bucket_key_for_value_impl(range[range_component_index], bucket_size);
111 }
112 
113 FORCE_INLINE DEVICE int64_t
115  const size_t range_component_index,
116  const double bucket_size) {
117  const auto range_ptr = reinterpret_cast<const int32_t*>(range);
118  if (range_component_index % 2 == 0) {
120  Geospatial::decompress_longitude_coord_geoint32(range_ptr[range_component_index]),
121  bucket_size);
122  } else {
124  Geospatial::decompress_latitude_coord_geoint32(range_ptr[range_component_index]),
125  bucket_size);
126  }
127 }
128 
129 extern "C" RUNTIME_EXPORT NEVER_INLINE DEVICE int64_t
131  const size_t range_component_index,
132  const double bucket_size) {
134  range, range_component_index, bucket_size);
135 }
136 
137 template <typename T>
139  const size_t key_component_count,
140  const T* composite_key_dict,
141  const size_t entry_count) {
142  const uint32_t h = MurmurHash1(key, key_component_count * sizeof(T), 0) % entry_count;
143  uint32_t off = h * key_component_count;
144  if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
145  return h;
146  }
147  uint32_t h_probe = (h + 1) % entry_count;
148  while (h_probe != h) {
149  off = h_probe * key_component_count;
150  if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
151  return h_probe;
152  }
153  if (composite_key_dict[off] == SUFFIX(get_invalid_key) < T > ()) {
154  return -1;
155  }
156  h_probe = (h_probe + 1) % entry_count;
157  }
158  return -1;
159 }
160 
161 extern "C" RUNTIME_EXPORT NEVER_INLINE DEVICE int64_t
162 get_composite_key_index_32(const int32_t* key,
163  const size_t key_component_count,
164  const int32_t* composite_key_dict,
165  const size_t entry_count) {
167  key, key_component_count, composite_key_dict, entry_count);
168 }
169 
170 extern "C" RUNTIME_EXPORT NEVER_INLINE DEVICE int64_t
171 get_composite_key_index_64(const int64_t* key,
172  const size_t key_component_count,
173  const int64_t* composite_key_dict,
174  const size_t entry_count) {
176  key, key_component_count, composite_key_dict, entry_count);
177 }
178 
179 extern "C" RUNTIME_EXPORT NEVER_INLINE DEVICE int32_t insert_sorted(int32_t* arr,
180  size_t elem_count,
181  int32_t elem) {
182  for (size_t i = 0; i < elem_count; i++) {
183  if (elem == arr[i]) {
184  return 0;
185  }
186 
187  if (elem > arr[i]) {
188  continue;
189  }
190 
191  for (size_t j = elem_count; i < j; j--) {
192  arr[j] = arr[j - 1];
193  }
194  arr[i] = elem;
195  return 1;
196  }
197 
198  arr[elem_count] = elem;
199  return 1;
200 }
201 
202 extern "C" RUNTIME_EXPORT ALWAYS_INLINE DEVICE int64_t
204  const int64_t key,
205  const int64_t min_key,
206  const int64_t max_key) {
207  if (key >= min_key && key <= max_key) {
208  return *(reinterpret_cast<int32_t*>(hash_buff) + (key - min_key));
209  }
210  return -1;
211 }
212 
// A read-only span of matched row ids returned by get_row_id_buffer_ptr:
// `buffer` points at the first row id, `element_count` is how many follow.
// {nullptr, 0} signals "no match".
struct BufferRange {
  const int32_t* buffer = nullptr;
  const int64_t element_count = 0;
};
217 
219 get_row_id_buffer_ptr(int64_t* hash_table_ptr,
220  const int64_t* key,
221  const int64_t key_component_count,
222  const int64_t entry_count,
223  const int64_t offset_buffer_ptr_offset,
224  const int64_t sub_buff_size) {
225  const int64_t min_key = 0;
226  const int64_t max_key = entry_count - 1;
227 
228  auto key_idx =
229  get_composite_key_index_64(key, key_component_count, hash_table_ptr, entry_count);
230 
231  if (key_idx < -1) {
232  return BufferRange{nullptr, 0};
233  }
234 
235  int8_t* one_to_many_ptr = reinterpret_cast<int8_t*>(hash_table_ptr);
236  one_to_many_ptr += offset_buffer_ptr_offset;
237 
238  // Returns an index used to fetch row count and row ids.
239  const auto slot = bbox_intersect_hash_join_idx(
240  reinterpret_cast<int64_t>(one_to_many_ptr), key_idx, min_key, max_key);
241  if (slot < 0) {
242  return BufferRange{nullptr, 0};
243  }
244 
245  // Offset into the row count section of buffer
246  int8_t* count_ptr = one_to_many_ptr + sub_buff_size;
247 
248  const int64_t matched_row_count = bbox_intersect_hash_join_idx(
249  reinterpret_cast<int64_t>(count_ptr), key_idx, min_key, max_key);
250 
251  // Offset into payload section, containing an array of row ids
252  int32_t* rowid_buffer = (int32_t*)(one_to_many_ptr + 2 * sub_buff_size);
253  const auto rowidoff_ptr = &rowid_buffer[slot];
254 
255  return BufferRange{rowidoff_ptr, matched_row_count};
256 }
257 
// Axis-aligned bounding box, unpacked from the four packed doubles
// (min_X, min_Y, max_X, max_Y) that callers receive in range_bytes.
struct Bounds {
  const double min_X;
  const double min_Y;
  const double max_X;
  const double max_Y;
};
264 
289 extern "C" RUNTIME_EXPORT NEVER_INLINE DEVICE int64_t
290 get_candidate_rows(int32_t* out_arr,
291  int32_t* error_code,
292  const uint32_t max_arr_size,
293  const int8_t* range_bytes,
294  const int32_t range_component_index,
295  const double bucket_size_x,
296  const double bucket_size_y,
297  const int32_t keys_count,
298  const int64_t key_component_count,
299  int64_t* hash_table_ptr,
300  const int64_t entry_count,
301  const int64_t offset_buffer_ptr_offset,
302  const int64_t sub_buff_size,
303  const int32_t max_bbox_overlaps_error_code) {
304  const auto range = reinterpret_cast<const double*>(range_bytes);
305 
306  size_t elem_count = 0;
307 
308  const auto bounds = Bounds{range[0], range[1], range[2], range[3]};
309 
310  for (int64_t x = floor(bounds.min_X * bucket_size_x);
311  x <= floor(bounds.max_X * bucket_size_x);
312  x++) {
313  for (int64_t y = floor(bounds.min_Y * bucket_size_y);
314  y <= floor(bounds.max_Y * bucket_size_y);
315  y++) {
316  int64_t cur_key[2];
317  cur_key[0] = static_cast<int64_t>(x);
318  cur_key[1] = static_cast<int64_t>(y);
319 
320  const auto buffer_range = get_row_id_buffer_ptr(hash_table_ptr,
321  cur_key,
322  key_component_count,
323  entry_count,
324  offset_buffer_ptr_offset,
325  sub_buff_size);
326 
327  for (int64_t j = 0; j < buffer_range.element_count; j++) {
328  const auto rowid = buffer_range.buffer[j];
329  elem_count += insert_sorted(out_arr, elem_count, rowid);
330  if (elem_count > max_arr_size) {
331  *error_code = max_bbox_overlaps_error_code;
332  return 0;
333  }
334  }
335  }
336  }
337 
338  return elem_count;
339 }
340 
/// Given the bounding box and the bucket size,
/// return the number of buckets the bounding box
/// will be split into.
344 extern "C" RUNTIME_EXPORT NEVER_INLINE DEVICE int32_t
345 get_num_buckets_for_bounds(const int8_t* range_bytes,
346  const int32_t range_component_index,
347  const double bucket_size_x,
348  const double bucket_size_y) {
349  const auto range = reinterpret_cast<const double*>(range_bytes);
350 
351  const auto bounds_min_x = range[0];
352  const auto bounds_min_y = range[1];
353  const auto bounds_max_x = range[2];
354  const auto bounds_max_y = range[3];
355 
356  const auto num_x =
357  floor(bounds_max_x * bucket_size_x) - floor(bounds_min_x * bucket_size_x);
358  const auto num_y =
359  floor(bounds_max_y * bucket_size_y) - floor(bounds_min_y * bucket_size_y);
360  const auto num_buckets = (num_x + 1) * (num_y + 1);
361 
362  return num_buckets;
363 }
DEVICE bool compare_to_key(const int8_t *entry, const int8_t *key, const size_t key_bytes)
ALWAYS_INLINE DEVICE BufferRange get_row_id_buffer_ptr(int64_t *hash_table_ptr, const int64_t *key, const int64_t key_component_count, const int64_t entry_count, const int64_t offset_buffer_ptr_offset, const int64_t sub_buff_size)
bool keys_are_equal(const T *key1, const T *key2, const size_t key_component_count)
DEVICE double decompress_latitude_coord_geoint32(const int32_t compressed)
DEVICE int64_t get_matching_slot(const int8_t *hash_buff, const uint32_t h, const int8_t *key, const size_t key_bytes)
RUNTIME_EXPORT NEVER_INLINE DEVICE int64_t get_composite_key_index_64(const int64_t *key, const size_t key_component_count, const int64_t *composite_key_dict, const size_t entry_count)
RUNTIME_EXPORT NEVER_INLINE DEVICE int32_t insert_sorted(int32_t *arr, size_t elem_count, int32_t elem)
#define SUFFIX(name)
RUNTIME_EXPORT NEVER_INLINE DEVICE int64_t get_bucket_key_for_range_double(const int8_t *range_bytes, const size_t range_component_index, const double bucket_size)
FORCE_INLINE DEVICE int64_t get_bucket_key_for_value_impl(const T value, const double bucket_size)
#define DEVICE
RUNTIME_EXPORT NEVER_INLINE DEVICE int64_t baseline_hash_join_idx_32(const int8_t *hash_buff, const int8_t *key, const size_t key_bytes, const size_t entry_count)
DEVICE T SUFFIX() get_invalid_key()
#define RUNTIME_EXPORT
RUNTIME_EXPORT NEVER_INLINE DEVICE int64_t get_candidate_rows(int32_t *out_arr, int32_t *error_code, const uint32_t max_arr_size, const int8_t *range_bytes, const int32_t range_component_index, const double bucket_size_x, const double bucket_size_y, const int32_t keys_count, const int64_t key_component_count, int64_t *hash_table_ptr, const int64_t entry_count, const int64_t offset_buffer_ptr_offset, const int64_t sub_buff_size, const int32_t max_bbox_overlaps_error_code)
RUNTIME_EXPORT NEVER_INLINE DEVICE int64_t get_bucket_key_for_range_compressed(const int8_t *range, const size_t range_component_index, const double bucket_size)
RUNTIME_EXPORT NEVER_INLINE DEVICE uint32_t MurmurHash1(const void *key, int len, const uint32_t seed)
Definition: MurmurHash.cpp:21
DEVICE double decompress_longitude_coord_geoint32(const int32_t compressed)
#define FORCE_INLINE
FORCE_INLINE DEVICE int64_t get_bucket_key_for_range_compressed_impl(const int8_t *range, const size_t range_component_index, const double bucket_size)
RUNTIME_EXPORT NEVER_INLINE DEVICE int64_t get_composite_key_index_32(const int32_t *key, const size_t key_component_count, const int32_t *composite_key_dict, const size_t entry_count)
FORCE_INLINE DEVICE int64_t baseline_hash_join_idx_impl(const int8_t *hash_buff, const int8_t *key, const size_t key_bytes, const size_t entry_count)
#define NEVER_INLINE
def error_code
Definition: report.py:234
#define ALWAYS_INLINE
RUNTIME_EXPORT NEVER_INLINE DEVICE int32_t get_num_buckets_for_bounds(const int8_t *range_bytes, const int32_t range_component_index, const double bucket_size_x, const double bucket_size_y)
FORCE_INLINE DEVICE int64_t get_composite_key_index_impl(const T *key, const size_t key_component_count, const T *composite_key_dict, const size_t entry_count)
RUNTIME_EXPORT NEVER_INLINE DEVICE int64_t baseline_hash_join_idx_64(const int8_t *hash_buff, const int8_t *key, const size_t key_bytes, const size_t entry_count)
RUNTIME_EXPORT ALWAYS_INLINE DEVICE int64_t bbox_intersect_hash_join_idx(int64_t hash_buff, const int64_t key, const int64_t min_key, const int64_t max_key)