OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RangeJoinHashTable.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
17 
19 #include "QueryEngine/Execute.h"
25 
26 // clang-format off
69 // clang-format on
70 
71 std::shared_ptr<RangeJoinHashTable> RangeJoinHashTable::getInstance(
72  const std::shared_ptr<Analyzer::BinOper> condition,
73  const Analyzer::RangeOper* range_expr,
74  const std::vector<InputTableInfo>& query_infos,
75  const Data_Namespace::MemoryLevel memory_level,
76  const JoinType join_type,
77  const int device_count,
78  ColumnCacheMap& column_cache,
79  Executor* executor,
80  const HashTableBuildDagMap& hashtable_build_dag_map,
81  const RegisteredQueryHint& query_hints,
82  const TableIdToNodeMap& table_id_to_node_map) {
83  // the hash table is built over the LHS of the range oper. we then use the lhs
84  // of the bin oper + the rhs of the range oper for the probe
85  auto range_expr_col_var =
86  dynamic_cast<const Analyzer::ColumnVar*>(range_expr->get_left_operand());
87  if (!range_expr_col_var || !range_expr_col_var->get_type_info().is_geometry()) {
88  throw HashJoinFail("Could not build hash tables for range join | " +
89  range_expr->toString());
90  }
91 
92  CHECK(range_expr_col_var->get_type_info().is_geometry());
93 
94  auto coords_column_key = range_expr_col_var->getColumnKey();
95  coords_column_key.column_id = coords_column_key.column_id + 1;
96  const auto coords_cd = Catalog_Namespace::get_metadata_for_column(coords_column_key);
97  CHECK(coords_cd);
98 
99  auto range_join_inner_col_expr = makeExpr<Analyzer::ColumnVar>(
100  coords_cd->columnType, coords_column_key, range_expr_col_var->get_rte_idx());
101 
102  std::vector<InnerOuter> inner_outer_pairs;
103  inner_outer_pairs.emplace_back(
104  InnerOuter{dynamic_cast<Analyzer::ColumnVar*>(range_join_inner_col_expr.get()),
105  condition->get_left_operand()});
106 
107  const auto& query_info =
108  get_inner_query_info(HashJoin::getInnerTableId(inner_outer_pairs), query_infos)
109  .info;
110 
111  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
112  if (total_entries > HashJoin::MAX_NUM_HASH_ENTRIES) {
113  throw TooManyHashEntries();
114  }
115 
116  const auto shard_count = memory_level == Data_Namespace::GPU_LEVEL
118  condition.get(), executor, inner_outer_pairs)
119  : 0;
120 
121  auto join_hash_table = std::make_shared<RangeJoinHashTable>(condition,
122  join_type,
123  range_expr,
124  range_join_inner_col_expr,
125  query_infos,
126  memory_level,
127  column_cache,
128  executor,
129  inner_outer_pairs,
130  device_count,
131  query_hints,
132  hashtable_build_dag_map,
133  table_id_to_node_map);
135  HashJoin::getInnerTableId(inner_outer_pairs), shard_count, executor);
136  try {
137  join_hash_table->reifyWithLayout(HashType::OneToMany);
138  } catch (const HashJoinFail& e) {
139  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
140  "involved in equijoin | ") +
141  e.what());
142  } catch (const ColumnarConversionNotSupported& e) {
143  throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
144  e.what());
145  } catch (const JoinHashTableTooBig& e) {
146  throw e;
147  } catch (const std::exception& e) {
148  LOG(FATAL) << "Fatal error while attempting to build hash tables for join: "
149  << e.what();
150  }
151 
152  return join_hash_table;
153 }
154 
156  auto timer = DEBUG_TIMER(__func__);
157  CHECK(layout == HashType::OneToMany);
158 
159  const auto& query_info =
161  .info;
162 
163  if (query_info.fragments.empty()) {
164  return;
165  }
166 
167  const auto& table_key = getInnerTableId();
168  VLOG(1) << "Reify with layout " << getHashTypeString(layout) << "for " << table_key;
169 
170  std::vector<ColumnsForDevice> columns_per_device;
171 
172  auto data_mgr = executor_->getDataMgr();
173  std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>> fragments_per_device;
174  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
175  const auto shard_count = shardCount();
176  for (int device_id = 0; device_id < device_count_; ++device_id) {
177  fragments_per_device.emplace_back(
178  shard_count
179  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
180  : query_info.fragments);
182  dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(
183  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id)));
184  }
185  // for bounding box intersection, we need to fetch columns regardless of the
186  // availability of cached hash table to calculate various params, i.e., bucket size
187  // info todo(yoonmin) : relax this
188  const auto columns_for_device =
189  fetchColumnsForDevice(fragments_per_device[device_id],
190  device_id,
192  ? dev_buff_owners[device_id].get()
193  : nullptr);
194  columns_per_device.push_back(columns_for_device);
195  }
196 
198 
199  const auto bucket_range =
200  dynamic_cast<const Analyzer::Constant*>(range_expr_->get_right_operand());
201 
202  CHECK(bucket_range);
203  CHECK(bucket_range->get_type_info().is_fp() &&
204  bucket_range->get_type_info().get_size() == 8); // TODO
205 
206  const auto bucket_range_datum = bucket_range->get_constval();
207 
208  inverse_bucket_sizes_for_dimension_.emplace_back(1. / bucket_range_datum.doubleval);
209  inverse_bucket_sizes_for_dimension_.emplace_back(1. / bucket_range_datum.doubleval);
210 
212  inverse_bucket_sizes_for_dimension_, columns_per_device, device_count_);
213 
215 
216  // to properly lookup cached hash table, we need to use join columns listed as lhs and
217  // rhs of the bbox intersect op instead of physical (and hidden) column tailored to
218  // range join expr in other words, we need to use geometry column (point) instead of its
219  // hidden array column i.e., see `get_physical_cols` function
220  std::vector<InnerOuter> inner_outer_pairs_for_cache_lookup;
221  inner_outer_pairs_for_cache_lookup.emplace_back(InnerOuter{
222  dynamic_cast<const Analyzer::ColumnVar*>(range_expr_->get_left_operand()),
223  condition_->get_left_operand()});
224  auto hashtable_access_path_info =
225  HashtableRecycler::getHashtableAccessPathInfo(inner_outer_pairs_for_cache_lookup,
226  {},
227  condition_->get_optype(),
228  join_type_,
231  shard_count,
232  fragments_per_device,
233  executor_);
234  hashtable_cache_key_ = hashtable_access_path_info.hashed_query_plan_dag;
235  table_keys_ = hashtable_access_path_info.table_keys;
236 
237  auto get_inner_table_key = [&inner_outer_pairs_for_cache_lookup]() {
238  auto col_var = inner_outer_pairs_for_cache_lookup.front().first;
239  return col_var->getTableKey();
240  };
241 
242  if (table_keys_.empty()) {
243  const auto& inner_table_key = get_inner_table_key();
245  composite_key_info_.cache_key_chunks, inner_table_key);
246  }
247  CHECK(!table_keys_.empty());
248 
254  fragments_per_device,
255  device_count_);
256 
257  if (HashtableRecycler::isInvalidHashTableCacheKey(hashtable_cache_key_) &&
258  get_inner_table_key().table_id > 0) {
259  std::vector<size_t> per_device_chunk_key;
260  for (int device_id = 0; device_id < device_count_; ++device_id) {
262  boost::hash_combine(chunk_key_hash,
263  HashJoin::collectFragmentIds(fragments_per_device[device_id]));
264  per_device_chunk_key.push_back(chunk_key_hash);
266  inner_outer_pairs_for_cache_lookup,
267  columns_per_device.front().join_columns.front().num_elems,
268  chunk_key_hash,
269  condition_->get_optype(),
272  {}};
273  hashtable_cache_key_[device_id] = getAlternativeCacheKey(cache_key);
274  hash_table_cache_->addQueryPlanDagForTableKeys(hashtable_cache_key_[device_id],
275  table_keys_);
276  }
277  }
278 
280  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
281  if (auto generic_hash_table =
282  initHashTableOnCpuFromCache(hashtable_cache_key_.front(),
285  if (auto hash_table =
286  std::dynamic_pointer_cast<BaselineHashTable>(generic_hash_table)) {
287  // See if a hash table of a different layout was returned.
288  // If it was OneToMany, we can reuse it on ManyToMany.
289  if (layout == HashType::ManyToMany &&
290  hash_table->getLayout() == HashType::OneToMany) {
291  // use the cached hash table
293  }
294 
296 #ifdef HAVE_CUDA
297  for (int device_id = 0; device_id < device_count_; ++device_id) {
298  auto gpu_hash_table = copyCpuHashTableToGpu(hash_table, device_id);
299  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
300  hash_tables_for_device_[device_id] = std::move(gpu_hash_table);
301  }
302 #else
303  UNREACHABLE();
304 #endif
305  } else {
307  CHECK_EQ(hash_tables_for_device_.size(), size_t(1));
308  // do not move hash_table to keep valid ptr of it within the hash table recycler
309  hash_tables_for_device_[0] = hash_table;
310  }
311  return;
312  }
313  }
314  }
315 
316  auto [entry_count, emitted_keys_count] =
317  computeRangeHashTableCounts(shard_count, columns_per_device);
318 
320  inverse_bucket_sizes_for_dimension_.size(), emitted_keys_count, entry_count);
321 
322  VLOG(1) << "Finalized range join hash table: entry count " << entry_count
323  << " hash table size " << hash_table_size;
324 
325  std::vector<std::future<void>> init_threads;
326  for (int device_id = 0; device_id < device_count_; ++device_id) {
327  init_threads.push_back(
330  this,
331  /* columns_for_device */ columns_per_device[device_id],
332  /* layout_type */ layout,
333  /* entry_count */ entry_count,
334  /* emitted_keys_count */ emitted_keys_count,
335  /* device_id */ device_id,
336  /* parent_thread_local_ids */ logger::thread_local_ids()));
337  }
338  for (auto& init_thread : init_threads) {
339  init_thread.wait();
340  }
341  for (auto& init_thread : init_threads) {
342  init_thread.get();
343  }
344 }
345 
// Per-device build routine. The signature line (346) is missing from this
// listing; this is presumably the member launched per device by
// reifyWithLayout's init_threads, whose arguments match these parameters.
// Behaviour:
//  - adopts the parent thread's logging ids;
//  - requires 8-byte key components;
//  - either builds the table on CPU (copying it to GPU `device_id` under
//    HAVE_CUDA when the condition on the elided line 373 holds, otherwise
//    storing it in slot 0) or builds it directly on the GPU.
// NOTE(review): lines 356, 360-361, 364, 373 and 379 are elided, including the
// condition that selects the CPU vs GPU path.
347  const ColumnsForDevice& columns_for_device,
348  const HashType layout,
349  const size_t entry_count,
350  const size_t emitted_keys_count,
351  const int device_id,
352  const logger::ThreadLocalIds parent_thread_local_ids) {
353  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
354  DEBUG_TIMER_NEW_THREAD(parent_thread_local_ids.thread_id_);
355  CHECK_EQ(getKeyComponentWidth(), size_t(8));
357  BaselineHashTableEntryInfo hash_table_entry_info(entry_count,
358  emitted_keys_count,
359  sizeof(int32_t),
362  layout,
363  false);
365  VLOG(1) << "Building range join hash table on CPU.";
366  auto hash_table = initHashTableOnCpu(columns_for_device.join_columns,
367  columns_for_device.join_column_types,
368  columns_for_device.join_buckets,
369  hash_table_entry_info);
370  CHECK(hash_table);
371 
// CPU-built table: copied to the device (CUDA build, elided condition) or kept
// as the single CPU table in slot 0.
372 #ifdef HAVE_CUDA
374  auto gpu_hash_table = copyCpuHashTableToGpu(hash_table, device_id);
375  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
376  hash_tables_for_device_[device_id] = std::move(gpu_hash_table);
377  } else {
378 #else
380 #endif
381  CHECK_EQ(hash_tables_for_device_.size(), size_t(1));
382  hash_tables_for_device_[0] = std::move(hash_table);
383 #ifdef HAVE_CUDA
384  }
385 #endif
386  } else {
// Direct GPU build; only possible in a CUDA build.
387 #ifdef HAVE_CUDA
388  auto hash_table = initHashTableOnGpu(columns_for_device.join_columns,
389  columns_for_device.join_column_types,
390  columns_for_device.join_buckets,
391  hash_table_entry_info,
392  device_id);
393  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
394  hash_tables_for_device_[device_id] = std::move(hash_table);
395 #else
396  UNREACHABLE();
397 #endif
398  }
399 }
400 // #endif
401 
402 #ifdef HAVE_CUDA
// Builds the range-join baseline hash table directly on GPU `device_id`:
//  - copies the (single) join column and the inverse bucket sizes to the device;
//  - wraps them in a RangeKeyHandler;
//  - delegates to `builder`. Its declaration is not visible in this listing;
//    presumably it sits on the elided line 413.
// Throws HashJoinFail on a non-zero builder error code.
// NOTE(review): `join_column_types` is not referenced in the visible body.
403 std::shared_ptr<BaselineHashTable> RangeJoinHashTable::initHashTableOnGpu(
404  const std::vector<JoinColumn>& join_columns,
405  const std::vector<JoinColumnTypeInfo>& join_column_types,
406  const std::vector<JoinBucketInfo>& join_bucket_info,
407  const BaselineHashTableEntryInfo hash_table_entry_info,
408  const size_t device_id) {
410 
411  VLOG(1) << "Building range join hash table on GPU.";
412 
414  auto data_mgr = executor_->getDataMgr();
415  CudaAllocator allocator(
416  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
417  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(join_columns, allocator);
418  CHECK_EQ(join_columns.size(), 1u);
419  CHECK(!join_bucket_info.empty());
420 
421  auto& inverse_bucket_sizes_for_dimension =
422  join_bucket_info[0].inverse_bucket_sizes_for_dimension;
423 
424  auto bucket_sizes_gpu = transfer_vector_of_flat_objects_to_gpu(
425  inverse_bucket_sizes_for_dimension, allocator);
426 
// The key handler refers to device-side copies of the columns and bucket sizes.
427  const auto key_handler = RangeKeyHandler(isInnerColCompressed(),
428  inverse_bucket_sizes_for_dimension.size(),
429  join_columns_gpu,
430  bucket_sizes_gpu);
431 
432  const auto err = builder.initHashTableOnGpu(&key_handler,
433  join_columns,
434  join_type_,
435  hash_table_entry_info,
436  device_id,
437  executor_,
438  query_hints_);
439  if (err) {
440  throw HashJoinFail(
441  std::string("Unrecognized error when initializing GPU range join hash table (") +
442  std::to_string(err) + std::string(")"));
443  }
444  return builder.getHashTable();
445 }
446 #endif
447 
// Builds the range-join baseline hash table on CPU via `builder`. The builder's
// declaration is not visible in this listing.
// Requires:
//  - a layout for which layoutRequiresAdditionalBuffers() holds;
//  - non-empty join columns;
//  - non-empty bucket info.
// The build is timed with steady_clock. Throws HashJoinFail on a non-zero
// builder error code. The resulting table and its build time in milliseconds
// are then passed to a call whose name is elided (lines 494-495; presumably
// cache insertion, so verify).
448 std::shared_ptr<BaselineHashTable> RangeJoinHashTable::initHashTableOnCpu(
449  const std::vector<JoinColumn>& join_columns,
450  const std::vector<JoinColumnTypeInfo>& join_column_types,
451  const std::vector<JoinBucketInfo>& join_bucket_info,
452  const BaselineHashTableEntryInfo hash_table_entry_info) {
453  auto timer = DEBUG_TIMER(__func__);
454  decltype(std::chrono::steady_clock::now()) ts1, ts2;
455  ts1 = std::chrono::steady_clock::now();
456  const auto composite_key_info =
458  CHECK(!join_columns.empty());
459  CHECK(!join_bucket_info.empty());
460 
461  CHECK(layoutRequiresAdditionalBuffers(hash_table_entry_info.getHashTableLayout()));
// One key component per bucketed dimension.
462  const auto key_component_count =
463  join_bucket_info[0].inverse_bucket_sizes_for_dimension.size();
464 
465  auto key_handler =
467  key_component_count,
468  &join_columns[0],
469  join_bucket_info[0].inverse_bucket_sizes_for_dimension.data());
470 
473  dummy_str_proxy_translation_maps_ptrs_and_offsets;
474  const auto err =
475  builder.initHashTableOnCpu(&key_handler,
476  composite_key_info,
477  join_columns,
478  join_column_types,
479  join_bucket_info,
480  dummy_str_proxy_translation_maps_ptrs_and_offsets,
481  hash_table_entry_info,
482  join_type_,
483  executor_,
484  query_hints_);
485  ts2 = std::chrono::steady_clock::now();
486  if (err) {
487  throw HashJoinFail(std::string("Unrecognized error when initializing CPU "
488  "range join hash table (") +
489  std::to_string(err) + std::string(")"));
490  }
491  std::shared_ptr<BaselineHashTable> hash_table = builder.getHashTable();
492  auto hashtable_build_time =
493  std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count();
496  hash_table,
498  hashtable_build_time);
499  return hash_table;
500 }
501 
// Body of computeRangeHashTableCounts. Its signature line (502) is missing from
// this listing.
// Obtains an approximate distinct-tuple count and an emitted-key count. The
// producing call is partly elided; presumably it is approximateTupleCount.
// The table is sized at twice the tuple count, with a minimum of 2 entries.
// Returns {entries per device, emitted key count}.
503  const size_t shard_count,
504  std::vector<ColumnsForDevice>& columns_per_device) {
506  const auto [tuple_count, emitted_keys_count] =
508  columns_per_device,
511  const auto entry_count = 2 * std::max(tuple_count, size_t(1));
512 
513  return std::make_pair(
514  get_entries_per_device(entry_count, shard_count, device_count_, memory_level_),
515  emitted_keys_count);
516 }
517 
// Body of approximateTupleCount. Its signature line (518) is missing from this
// listing.
// Estimates the number of distinct bucket keys with a HyperLogLog sketch
// (bitmap_sz_bits = 11, i.e. 2^11 entries unified per sketch) and counts the
// emitted keys. It does this either on CPU threads or concurrently on each GPU
// device.
// Returns {approximate distinct count, emitted key count}, or {0, 0} when the
// first device has no rows.
// NOTE(review): chosen_max_hashtable_size and chosen_bucket_threshold are not
// referenced in the visible body.
519  const std::vector<double>& inverse_bucket_sizes_for_dimension,
520  std::vector<ColumnsForDevice>& columns_per_device,
521  const size_t chosen_max_hashtable_size,
522  const double chosen_bucket_threshold) {
523 #ifdef _WIN32
524  // WIN32 needs have C++20 set for designated initialisation to work
525  CountDistinctDescriptor count_distinct_desc{
527  0,
528  0,
529  11,
530  true,
534  1,
535  };
536 #else
537  CountDistinctDescriptor count_distinct_desc{
539  .min_val = 0,
540  .bucket_size = 0,
541  .bitmap_sz_bits = 11,
542  .approximate = true,
546  .sub_bitmap_count = 1,
547  };
548 #endif
549  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();
550 
551  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
552  if (columns_per_device.front().join_columns.front().num_elems == 0) {
553  return std::make_pair(0, 0);
554  }
555 
556  for (auto& columns_for_device : columns_per_device) {
557  columns_for_device.setBucketInfo(inverse_bucket_sizes_for_dimension,
559  }
560 
561  // Number of keys must match dimension of buckets
562  CHECK_EQ(columns_per_device.front().join_columns.size(),
563  columns_per_device.front().join_buckets.size());
// NOTE(review): line 564 (apparently the CPU/GPU branch condition) is elided;
// the code through line 591 appears to be the CPU path.
565  const auto composite_key_info =
// One padded HLL buffer per CPU thread, laid out back to back.
567  int thread_count = cpu_threads();
568  std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
569  auto hll_result = &hll_buffer_all_cpus[0];
570 
571  std::vector<int32_t> num_keys_for_row;
572  num_keys_for_row.resize(columns_per_device.front().join_columns[0].num_elems);
573 
575  num_keys_for_row,
576  count_distinct_desc.bitmap_sz_bits,
577  padded_size_bytes,
578  columns_per_device.front().join_columns,
579  columns_per_device.front().join_column_types,
580  columns_per_device.front().join_buckets,
582  thread_count);
583 
// Merge every thread's sketch into the first one.
584  for (int i = 1; i < thread_count; ++i) {
585  hll_unify(hll_result,
586  hll_result + i * padded_size_bytes,
587  size_t(1) << count_distinct_desc.bitmap_sz_bits);
588  }
// The last element of num_keys_for_row is used as the emitted-key total;
// presumably the elided callee leaves a running (prefix) sum there. Verify.
589  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
590  num_keys_for_row.size() > 0 ? num_keys_for_row.back() : 0);
591  }
// GPU path: one async task per device fills a host-side sketch and key count.
592 #ifdef HAVE_CUDA
593  auto& data_mgr = *executor_->getDataMgr();
594  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count_);
595  for (auto& host_hll_buffer : host_hll_buffers) {
596  host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
597  }
598  std::vector<size_t> emitted_keys_count_device_threads(device_count_, 0);
599  std::vector<std::future<void>> approximate_distinct_device_threads;
600  for (int device_id = 0; device_id < device_count_; ++device_id) {
601  approximate_distinct_device_threads.emplace_back(std::async(
603  [device_id,
604  &columns_per_device,
605  &count_distinct_desc,
606  &data_mgr,
607  &host_hll_buffers,
608  &emitted_keys_count_device_threads,
609  this] {
610  auto allocator = std::make_unique<CudaAllocator>(
611  &data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
612  auto device_hll_buffer =
613  allocator->alloc(count_distinct_desc.bitmapPaddedSizeBytes());
614  data_mgr.getCudaMgr()->zeroDeviceMem(
615  device_hll_buffer,
616  count_distinct_desc.bitmapPaddedSizeBytes(),
617  device_id,
619  const auto& columns_for_device = columns_per_device[device_id];
620  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(
621  columns_for_device.join_columns, *allocator);
622 
623  CHECK_GT(columns_for_device.join_buckets.size(), 0u);
624  const auto& bucket_sizes_for_dimension =
625  columns_for_device.join_buckets[0].inverse_bucket_sizes_for_dimension;
626  auto bucket_sizes_gpu =
627  allocator->alloc(bucket_sizes_for_dimension.size() * sizeof(double));
628  allocator->copyToDevice(bucket_sizes_gpu,
629  bucket_sizes_for_dimension.data(),
630  bucket_sizes_for_dimension.size() * sizeof(double));
// NOTE(review): sized from columns_per_device.front(), while the kernel below
// is given this device's num_elems. The two agree only if every device sees
// the same row count.
631  const size_t row_counts_buffer_sz =
632  columns_per_device.front().join_columns[0].num_elems * sizeof(int32_t);
633  auto row_counts_buffer = allocator->alloc(row_counts_buffer_sz);
634  data_mgr.getCudaMgr()->zeroDeviceMem(
635  row_counts_buffer,
636  row_counts_buffer_sz,
637  device_id,
639  const auto key_handler =
641  bucket_sizes_for_dimension.size(),
642  join_columns_gpu,
643  reinterpret_cast<double*>(bucket_sizes_gpu));
644  const auto key_handler_gpu =
645  transfer_flat_object_to_gpu(key_handler, *allocator);
647  reinterpret_cast<uint8_t*>(device_hll_buffer),
648  count_distinct_desc.bitmap_sz_bits,
649  reinterpret_cast<int32_t*>(row_counts_buffer),
650  key_handler_gpu,
651  columns_for_device.join_columns[0].num_elems,
652  executor_->blockSize(),
653  executor_->gridSize());
654 
// NOTE(review): copies sizeof(int32_t) bytes from the last row-count slot into
// a zero-initialised size_t. This relies on a little-endian host layout.
655  auto& host_emitted_keys_count = emitted_keys_count_device_threads[device_id];
656  allocator->copyFromDevice(
657  &host_emitted_keys_count,
658  row_counts_buffer +
659  (columns_per_device.front().join_columns[0].num_elems - 1) *
660  sizeof(int32_t),
661  sizeof(int32_t));
662 
663  auto& host_hll_buffer = host_hll_buffers[device_id];
664  allocator->copyFromDevice(&host_hll_buffer[0],
665  device_hll_buffer,
666  count_distinct_desc.bitmapPaddedSizeBytes());
667  }));
668  }
669  for (auto& child : approximate_distinct_device_threads) {
670  child.get();
671  }
// Merge all device sketches into the first and sum the per-device key counts.
673  auto& result_hll_buffer = host_hll_buffers.front();
674  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
675  for (int device_id = 1; device_id < device_count_; ++device_id) {
676  auto& host_hll_buffer = host_hll_buffers[device_id];
677  hll_unify(hll_result,
678  reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
679  size_t(1) << count_distinct_desc.bitmap_sz_bits);
680  }
681  size_t emitted_keys_count = 0;
682  for (auto& emitted_keys_count_device : emitted_keys_count_device_threads) {
683  emitted_keys_count += emitted_keys_count_device;
684  }
685  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
686  emitted_keys_count);
687 #else
688  UNREACHABLE();
689  return {0, 0};
690 #endif // HAVE_CUDA
691 }
692 
// Shorthands for the executor's LLVM codegen state, used by the IR-emitting
// member functions that follow (they expand to expressions on `executor_`).
693 #define LL_CONTEXT executor_->cgen_state_->context_
694 #define LL_BUILDER executor_->cgen_state_->ir_builder_
695 #define LL_INT(v) executor_->cgen_state_->llInt(v)
696 #define LL_FP(v) executor_->cgen_state_->llFp(v)
697 #define ROW_FUNC executor_->cgen_state_->row_func_
698 
// Body of codegenKey. Its signature line (699) is missing from this listing;
// the preceding parameter is `co`.
// Emits IR that:
//  - allocates a key buffer of getKeyComponentCount() components, each 4 or
//    8 bytes wide;
//  - fills components 0 (x) and 1 (y) with bucket keys for the outer POINT
//    expression;
//  - shifts each bucket key by an offset unpacked from `offset_ptr`.
// Returns the key buffer.
// Throws QueryNotSupported for temporary-table geo columns and
// std::runtime_error for unsupported outer expressions. Logs FATAL for
// non-geo outer types.
700  llvm::Value* offset_ptr) {
701  const auto key_component_width = getKeyComponentWidth();
702  CHECK(key_component_width == 4 || key_component_width == 8);
703  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
704  llvm::Value* key_buff_lv{nullptr};
705  switch (key_component_width) {
706  case 4:
707  key_buff_lv =
708  LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
709  break;
710  case 8:
711  key_buff_lv =
712  LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
713  break;
714  default:
715  CHECK(false);
716  }
717 
718  const auto& inner_outer_pair = inner_outer_pairs_[0];
719  const auto outer_col = inner_outer_pair.second;
720  const auto outer_col_ti = outer_col->get_type_info();
721 
722  if (outer_col_ti.is_geometry()) {
723  CodeGenerator code_generator(executor_);
724  // TODO(adb): for points we will use the coords array, but for other
725  // geometries we will need to use the bounding box. For now only support
726  // points.
727  CHECK_EQ(outer_col_ti.get_type(), kPOINT);
728  CHECK_EQ(inverse_bucket_sizes_for_dimension_.size(), static_cast<size_t>(2));
729 
730  llvm::Value* arr_ptr{nullptr};
731  // prepare point column (arr) ptr to generate code for hash table key
732  if (auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col)) {
733  const auto col_lvs = code_generator.codegen(outer_col, true, co);
734  CHECK_EQ(col_lvs.size(), size_t(1));
735  auto column_key = outer_col_var->getColumnKey();
736  if (column_key.table_id < 0) {
737  // todo: relax this
738  throw QueryNotSupported(
739  "Geospatial columns not yet supported in this temporary table context.");
740  }
// The coords metadata is read from the column right after the geometry column.
741  column_key.column_id = column_key.column_id + 1;
742  const auto coords_cd = Catalog_Namespace::get_metadata_for_column(column_key);
743  CHECK(coords_cd);
744  const auto coords_ti = coords_cd->columnType;
745 
746  const auto array_buff_ptr = executor_->cgen_state_->emitExternalCall(
747  "array_buff",
748  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
749  {col_lvs.front(), code_generator.posArg(outer_col)});
750  CHECK(array_buff_ptr);
751  CHECK(coords_ti.get_elem_type().get_type() == kTINYINT)
752  << "Only TINYINT coordinates columns are supported in bounding box "
753  "intersection.";
754  arr_ptr =
755  code_generator.castArrayPointer(array_buff_ptr, coords_ti.get_elem_type());
756  } else if (auto geo_expr_outer_col =
757  dynamic_cast<const Analyzer::GeoOperator*>(outer_col)) {
758  const auto geo_expr_name = geo_expr_outer_col->getName();
759  if (func_resolve(geo_expr_name, "ST_Point"sv, "ST_Transform"sv, "ST_Centroid"sv)) {
760  // note that ST_SetSRID changes type info of the column, and is handled by
761  // translation phase, so when we use ST_SETSRID(ST_POINT(x, y), 4326)
762  // as a join column expression, we recognize it as ST_POINT (with SRID as 4326)
763  const auto col_lvs = code_generator.codegen(outer_col, true, co);
764  // listed functions keep point coordinates in the local variable (let say S)
765  // which is corresponding to the pointer that col_lvs[0] holds
766  // thus, all we need is to retrieve necessary coordinate from the S by varying
767  // its offset (i.e., i == 0 means x coordinate)
768  arr_ptr = LL_BUILDER.CreatePointerCast(
769  col_lvs[0], llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_));
770  } else {
771  throw std::runtime_error(
772  "RHS key of the range join operator has a geospatial function which is not "
773  "supported yet: " +
774  geo_expr_name);
775  }
776  } else {
777  throw std::runtime_error("Range join operator has an invalid rhs key: " +
778  outer_col->toString());
779  }
780 
781  // load and unpack offsets
// The packed value holds the x offset in its low 32 bits and the y offset in
// its high 32 bits; both are sign-extended to 64 bits below.
782  const auto offset =
783  LL_BUILDER.CreateLoad(offset_ptr->getType()->getPointerElementType(),
784  offset_ptr,
785  "packed_bucket_offset");
786  const auto x_offset =
787  LL_BUILDER.CreateTrunc(offset, llvm::Type::getInt32Ty(LL_CONTEXT));
788 
789  const auto y_offset_shifted =
790  LL_BUILDER.CreateLShr(offset, LL_INT(static_cast<int64_t>(32)));
791  const auto y_offset =
792  LL_BUILDER.CreateTrunc(y_offset_shifted, llvm::Type::getInt32Ty(LL_CONTEXT));
793 
794  const auto x_bucket_offset =
795  LL_BUILDER.CreateSExt(x_offset, llvm::Type::getInt64Ty(LL_CONTEXT));
796  const auto y_bucket_offset =
797  LL_BUILDER.CreateSExt(y_offset, llvm::Type::getInt64Ty(LL_CONTEXT));
798 
// i == 0 writes the x key component and i == 1 the y key component.
799  for (size_t i = 0; i < 2; i++) {
800  const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(
801  key_buff_lv->getType()->getScalarType()->getPointerElementType(),
802  key_buff_lv,
803  LL_INT(i));
804 
805  const auto funcName = isProbeCompressed() ? "get_bucket_key_for_range_compressed"
806  : "get_bucket_key_for_range_double";
807 
808  // Note that get_bucket_key_for_range_compressed will need to be
809  // specialized for future compression schemes
// NOTE(review): the return type and arguments (lines 812-813) are elided;
// presumably they include arr_ptr, i and the inverse bucket size.
810  auto bucket_key = executor_->cgen_state_->emitExternalCall(
811  funcName,
814 
815  auto bucket_key_shifted = i == 0
816  ? LL_BUILDER.CreateAdd(x_bucket_offset, bucket_key)
817  : LL_BUILDER.CreateAdd(y_bucket_offset, bucket_key);
818 
819  const auto col_lv = LL_BUILDER.CreateSExt(
820  bucket_key_shifted, get_int_type(key_component_width * 8, LL_CONTEXT));
821  LL_BUILDER.CreateStore(col_lv, key_comp_dest_lv);
822  }
823  } else {
824  LOG(FATAL) << "Range join key currently only supported for geospatial types.";
825  }
826  return key_buff_lv;
827 }
828 
// Body of codegenMatchingSet. Its signature line (829) is missing from this
// listing.
// Emits IR that:
//  - builds the probe key with codegenKey(co, range_offset);
//  - finds its slot in the composite-key dictionary via
//    get_composite_key_index_{32,64};
//  - passes the one-to-many buffer pointer (hash table base +
//    offsetBufferOff()) and that slot to the call on the elided line 871,
//    presumably HashJoin::codegenMatchingSet.
// NOTE(review): line 837 is also elided.
830  const CompilationOptions& co,
831  const size_t index,
832  llvm::Value* range_offset) {
833  const auto key_component_width = getKeyComponentWidth();
834  CHECK(key_component_width == 4 || key_component_width == 8);
835 
836  auto key_buff_lv = codegenKey(co, range_offset);
838 
// The hash table may be loaded as a pointer or as a 64-bit integer; handle both.
839  auto hash_ptr = codegenHashTableLoad(index, executor_);
840  const auto composite_dict_ptr_type =
841  llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
842 
843  const auto composite_key_dict =
844  hash_ptr->getType()->isPointerTy()
845  ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
846  : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
847 
848  const auto key_component_count = getKeyComponentCount();
849 
850  const auto funcName =
851  "get_composite_key_index_" + std::to_string(key_component_width * 8);
852 
853  const auto key = executor_->cgen_state_->emitExternalCall(funcName,
855  {key_buff_lv,
856  LL_INT(key_component_count),
857  composite_key_dict,
858  LL_INT(getEntryCount())});
859 
860  auto one_to_many_ptr = hash_ptr;
861  if (one_to_many_ptr->getType()->isPointerTy()) {
862  one_to_many_ptr =
863  LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
864  } else {
865  CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
866  }
// Skip past the composite key dictionary to reach the one-to-many buffers.
867  const auto composite_key_dict_size = offsetBufferOff();
868  one_to_many_ptr =
869  LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
870 
872  /* hash_join_idx_args_in */ {one_to_many_ptr,
873  key,
874  LL_INT(int64_t(0)),
875  LL_INT(getEntryCount() - 1)},
876  /* is_sharded */ false,
877  /* is_nullable */ false,
878  /* is_bw_eq */ false,
879  /* sub_buff_size */ getComponentBufferSize(),
880  /* executor */ executor_);
881 }
static std::vector< int > collectFragmentIds(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments)
Definition: HashJoin.cpp:461
#define LL_INT(v)
#define CHECK_EQ(x, y)
Definition: Logger.h:301
auto func_resolve
virtual HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t)=0
void setBoundingBoxIntersectionMetaInfo(size_t max_table_size_bytes, double bucket_threshold, std::vector< double > &bucket_sizes)
JoinType
Definition: sqldefs.h:238
Fragmenter_Namespace::TableInfo info
Definition: InputMetadata.h:35
llvm::Value * codegenKey(const CompilationOptions &co, llvm::Value *offset)
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
Definition: HashJoin.cpp:259
Data_Namespace::MemoryLevel getEffectiveMemoryLevel(const std::vector< InnerOuter > &inner_outer_pairs) const
size_t calculateHashTableSize(size_t number_of_dimensions, size_t emitted_keys_count, size_t entry_count) const
static bool isInvalidHashTableCacheKey(const std::vector< QueryPlanHash > &cache_keys)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
Definition: HashJoin.h:105
shared::TableKey getInnerTableId() const noexceptoverride
HashType getHashType() const noexceptoverride
std::vector< ChunkKey > cache_key_chunks
Definition: HashJoin.h:130
T * transfer_flat_object_to_gpu(const T &object, DeviceAllocator &allocator)
#define LOG(tag)
Definition: Logger.h:285
static void checkHashJoinReplicationConstraint(const shared::TableKey &table_key, const size_t shard_count, const Executor *executor)
Definition: HashJoin.cpp:796
void hll_unify(T1 *lhs, T2 *rhs, const size_t m)
Definition: HyperLogLog.h:107
llvm::Value * posArg(const Analyzer::Expr *) const
Definition: ColumnIR.cpp:590
std::vector< std::shared_ptr< HashTable > > hash_tables_for_device_
Definition: HashJoin.h:378
const std::shared_ptr< Analyzer::BinOper > condition_
llvm::Value * castArrayPointer(llvm::Value *ptr, const SQLTypeInfo &elem_ti)
#define UNREACHABLE()
Definition: Logger.h:338
void reifyWithLayout(const HashType layout) override
const ColumnDescriptor * get_metadata_for_column(const ::shared::ColumnKey &column_key)
const InputTableInfo & get_inner_query_info(const shared::TableKey &inner_table_key, const std::vector< InputTableInfo > &query_infos)
#define DEBUG_TIMER_NEW_THREAD(parent_thread_id)
Definition: Logger.h:417
CountDistinctImplType impl_type_
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
size_t hll_size(const T *M, const size_t bitmap_sz_bits)
Definition: HyperLogLog.h:88
#define CHECK_GT(x, y)
Definition: Logger.h:305
#define LL_CONTEXT
int initHashTableOnGpu(KEY_HANDLER *key_handler, const std::vector< JoinColumn > &join_columns, const JoinType join_type, const BaselineHashTableEntryInfo hash_table_entry_info, const int device_id, const Executor *executor, const RegisteredQueryHint &query_hint)
const Expr * get_left_operand() const
Definition: Analyzer.h:552
std::string to_string(char const *&&v)
#define LL_FP(v)
void setInverseBucketSizeInfo(const std::vector< double > &inverse_bucket_sizes, std::vector< ColumnsForDevice > &columns_per_device, const size_t device_count)
#define LL_BUILDER
const std::vector< JoinColumnTypeInfo > join_column_types
Definition: HashJoin.h:111
ColumnsForDevice fetchColumnsForDevice(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, DeviceAllocator *dev_buff_owner)
void approximate_distinct_tuples_on_device_range(uint8_t *hll_buffer, const uint32_t b, int32_t *row_counts_buffer, const RangeKeyHandler *key_handler, const size_t num_elems, const size_t block_size_x, const size_t grid_size_x)
future< Result > async(Fn &&fn, Args &&...args)
std::string toString() const override
Definition: Analyzer.cpp:2850
std::unordered_map< size_t, HashTableBuildDag > HashTableBuildDagMap
HashType getHashTableLayout() const
Definition: HashTable.h:53
const Data_Namespace::MemoryLevel memory_level_
std::pair< size_t, size_t > approximateTupleCount(const std::vector< double > &inverse_bucket_sizes_for_dimension, std::vector< ColumnsForDevice > &columns_per_device, const size_t chosen_max_hashtable_size, const double chosen_bucket_threshold) override
static std::unique_ptr< HashtableRecycler > hash_table_cache_
void generateCacheKey(const size_t max_hashtable_size, const double bucket_threshold, const std::vector< double > &bucket_sizes, std::vector< std::vector< Fragmenter_Namespace::FragmentInfo >> &fragments_per_device, int device_count)
std::vector< Fragmenter_Namespace::FragmentInfo > only_shards_for_device(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const int device_count)
const std::vector< InputTableInfo > & query_infos_
static constexpr size_t MAX_NUM_HASH_ENTRIES
Definition: HashJoin.h:137
static std::unordered_set< size_t > getAlternativeTableKeys(const std::vector< ChunkKey > &chunk_keys, const shared::TableKey &inner_table_key)
Definition: DataRecycler.h:154
QueryPlanHash getAlternativeCacheKey(AlternativeCacheKeyForBoundingBoxIntersection &info)
std::pair< size_t, size_t > computeRangeHashTableCounts(const size_t shard_count, std::vector< ColumnsForDevice > &columns_per_device)
HashJoinMatchingSet codegenMatchingSetWithOffset(const CompilationOptions &, const size_t, llvm::Value *)
const double bucket_threshold_
const Expr * get_right_operand() const
Definition: Analyzer.h:553
static std::shared_ptr< RangeJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > condition, const Analyzer::RangeOper *range_expr, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const JoinType join_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const HashTableBuildDagMap &hashtable_build_dag_map, const RegisteredQueryHint &query_hints, const TableIdToNodeMap &table_id_to_node_map)
virtual shared::TableKey getInnerTableId() const noexcept=0
std::unordered_map< shared::TableKey, const RelAlgNode * > TableIdToNodeMap
std::vector< llvm::Value * > codegen(const Analyzer::Expr *, const bool fetch_columns, const CompilationOptions &)
Definition: IRCodegen.cpp:30
#define CHECK_LT(x, y)
Definition: Logger.h:303
std::pair< std::vector< const int32_t * >, std::vector< int32_t >> StrProxyTranslationMapsPtrsAndOffsets
bool isProbeCompressed() const
std::unique_ptr< BaselineHashTable > getHashTable()
static std::string getHashTypeString(HashType ht) noexcept
Definition: HashJoin.h:180
LocalIdsScopeGuard setNewThreadId() const
Definition: Logger.cpp:538
size_t get_entries_per_device(const size_t total_entries, const size_t shard_count, const size_t device_count, const Data_Namespace::MemoryLevel memory_level)
std::unordered_map< shared::TableKey, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
std::shared_ptr< BaselineHashTable > initHashTableOnCpu(const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const BaselineHashTableEntryInfo hash_table_entry_info)
CUstream getQueryEngineCudaStreamForDevice(int device_num)
Definition: QueryEngine.cpp:7
bool isInnerColCompressed() const
std::size_t hash_value(RexAbstractInput const &rex_ab_input)
Definition: RelAlgDag.cpp:3548
void approximate_distinct_tuples_range(uint8_t *hll_buffer_all_cpus, std::vector< int32_t > &row_counts, const uint32_t b, const size_t padded_size_bytes, const std::vector< JoinColumn > &join_column_per_key, const std::vector< JoinColumnTypeInfo > &type_info_per_key, const std::vector< JoinBucketInfo > &join_buckets_per_key, const bool is_compressed, const int thread_count)
void putHashTableOnCpuToCache(QueryPlanHash key, CacheItemType item_type, std::shared_ptr< HashTable > hashtable_ptr, DeviceIdentifier device_identifier, size_t hashtable_building_time)
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
void reifyForDevice(const ColumnsForDevice &columns_for_device, const HashType layout, const size_t entry_count, const size_t emitted_keys_count, const int device_id, const logger::ThreadLocalIds parent_thread_local_ids)
Data_Namespace::MemoryLevel effective_memory_level_
T * transfer_vector_of_flat_objects_to_gpu(const std::vector< T > &vec, DeviceAllocator &allocator)
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
ThreadId thread_id_
Definition: Logger.h:138
std::vector< JoinBucketInfo > join_buckets
Definition: HashJoin.h:113
std::shared_ptr< HashTable > initHashTableOnCpuFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier)
const Analyzer::RangeOper * range_expr_
static constexpr DeviceIdentifier CPU_DEVICE_IDENTIFIER
Definition: DataRecycler.h:136
int cpu_threads()
Definition: thread_count.h:25
static HashtableAccessPathInfo getHashtableAccessPathInfo(const std::vector< InnerOuter > &inner_outer_pairs, const std::vector< InnerOuterStringOpInfos > &inner_outer_string_op_infos_pairs, const SQLOps op_type, const JoinType join_type, const HashTableBuildDagMap &hashtable_build_dag_map, int device_count, int shard_count, const std::vector< std::vector< Fragmenter_Namespace::FragmentInfo >> &frags_for_device, Executor *executor)
HashType
Definition: HashTable.h:19
ThreadLocalIds thread_local_ids()
Definition: Logger.cpp:882
const std::vector< JoinColumn > join_columns
Definition: HashJoin.h:110
#define VLOG(n)
Definition: Logger.h:388
static bool layoutRequiresAdditionalBuffers(HashType layout) noexcept
Definition: HashJoin.h:176
const size_t max_hashtable_size_
static CompositeKeyInfo getCompositeKeyInfo(const std::vector< InnerOuter > &inner_outer_pairs, const Executor *executor, const std::vector< InnerOuterStringOpInfos > &inner_outer_string_op_infos_pairs={})
Definition: HashJoin.cpp:470
size_t getComponentBufferSize() const noexceptoverride