BaselineJoinHashTable.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryEngine/JoinHashTable/BaselineJoinHashTable.h"
18 
19 #include <future>
20 
24 #include "QueryEngine/Execute.h"
31 
32 // for now, we only consider the CPU hash table recycler
33 // todo (yoonmin): support GPU hashtable cache without regression
34 std::unique_ptr<HashtableRecycler> BaselineJoinHashTable::hash_table_cache_ =
35     std::make_unique<HashtableRecycler>(CacheItemType::BASELINE_HT,
36                                         DataRecyclerUtil::CPU_DEVICE_IDENTIFIER);
37 std::unique_ptr<HashingSchemeRecycler> BaselineJoinHashTable::hash_table_layout_cache_ =
38  std::make_unique<HashingSchemeRecycler>();
39 
41 std::shared_ptr<BaselineJoinHashTable> BaselineJoinHashTable::getInstance(
42  const std::shared_ptr<Analyzer::BinOper> condition,
43  const std::vector<InputTableInfo>& query_infos,
44  const Data_Namespace::MemoryLevel memory_level,
45  const JoinType join_type,
46  const HashType preferred_hash_type,
47  const int device_count,
48  ColumnCacheMap& column_cache,
49  Executor* executor,
50  const HashTableBuildDagMap& hashtable_build_dag_map,
51  const RegisteredQueryHint& query_hints,
52  const TableIdToNodeMap& table_id_to_node_map) {
53  decltype(std::chrono::steady_clock::now()) ts1, ts2;
54 
55  auto hash_type = preferred_hash_type;
56  if (query_hints.force_one_to_many_hash_join) {
57  LOG(INFO) << "A user's query hint forced the join operation to use OneToMany hash "
58  "join layout";
59  hash_type = HashType::OneToMany;
60  }
61 
62  if (VLOGGING(1)) {
63  VLOG(1) << "Building keyed hash table " << getHashTypeString(hash_type)
64  << " for qual: " << condition->toString();
65  ts1 = std::chrono::steady_clock::now();
66  }
67  auto inner_outer_pairs =
68  HashJoin::normalizeColumnPairs(condition.get(), executor->getTemporaryTables());
69  const auto& inner_outer_cols = inner_outer_pairs.first;
70  const auto& col_pairs_string_op_infos = inner_outer_pairs.second;
71  auto join_hash_table = std::shared_ptr<BaselineJoinHashTable>(
72  new BaselineJoinHashTable(condition,
73  join_type,
74  query_infos,
75  memory_level,
76  column_cache,
77  executor,
78  inner_outer_cols,
79  col_pairs_string_op_infos,
80  device_count,
81  query_hints,
82  hashtable_build_dag_map,
83  table_id_to_node_map));
84  try {
85  join_hash_table->reify(hash_type);
86  } catch (const TableMustBeReplicated& e) {
87  // Throw a runtime error to abort the query
88  join_hash_table->freeHashBufferMemory();
89  throw std::runtime_error(e.what());
90  } catch (const HashJoinFail& e) {
91  // HashJoinFail exceptions log an error and trigger a retry with a join loop (if
92  // possible)
93  join_hash_table->freeHashBufferMemory();
94  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
95  "involved in equijoin | ") +
96  e.what());
97  } catch (const ColumnarConversionNotSupported& e) {
98  throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
99  e.what());
100  } catch (const OutOfMemory& e) {
101  throw HashJoinFail(
102  std::string("Ran out of memory while building hash tables for equijoin | ") +
103  e.what());
104  } catch (const JoinHashTableTooBig& e) {
105  throw e;
106  } catch (const std::exception& e) {
107  throw std::runtime_error(
108  std::string("Fatal error while attempting to build hash tables for join: ") +
109  e.what());
110  }
111  if (VLOGGING(1)) {
112  ts2 = std::chrono::steady_clock::now();
113  VLOG(1) << "Built keyed hash table "
114  << getHashTypeString(join_hash_table->getHashType()) << " in "
115  << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
116  << " ms";
117  }
118  return join_hash_table;
119 }
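
The catch clauses above normalize most build failures into HashJoinFail, which the executor treats as a signal to retry the query with a join loop rather than abort (only JoinHashTableTooBig is rethrown unchanged). A minimal standalone sketch of that translate-and-rethrow policy; all names below are illustrative, not part of this file:

#include <iostream>
#include <new>
#include <stdexcept>
#include <string>

struct HashJoinFail : std::runtime_error {
  using std::runtime_error::runtime_error;
};

// Stand-in for the hash table build: rethrows low-level failures as the one
// exception type the caller knows how to recover from.
void build_or_translate(bool fail) {
  try {
    if (fail) {
      throw std::bad_alloc();  // plays the role of OutOfMemory et al.
    }
  } catch (const std::exception& e) {
    throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
                       e.what());
  }
}

int main() {
  try {
    build_or_translate(true);
  } catch (const HashJoinFail& e) {
    std::cout << "retrying with a join loop: " << e.what() << '\n';
  }
}
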
120 
121 BaselineJoinHashTable::BaselineJoinHashTable(
122     const std::shared_ptr<Analyzer::BinOper> condition,
123  const JoinType join_type,
124  const std::vector<InputTableInfo>& query_infos,
125  const Data_Namespace::MemoryLevel memory_level,
126  ColumnCacheMap& column_cache,
127  Executor* executor,
128  const std::vector<InnerOuter>& inner_outer_pairs,
129  const std::vector<InnerOuterStringOpInfos>& col_pairs_string_op_infos,
130  const int device_count,
131  const RegisteredQueryHint& query_hints,
132  const HashTableBuildDagMap& hashtable_build_dag_map,
133  const TableIdToNodeMap& table_id_to_node_map)
134  : condition_(condition)
135  , join_type_(join_type)
136  , query_infos_(query_infos)
137  , memory_level_(memory_level)
138  , executor_(executor)
139  , column_cache_(column_cache)
140  , inner_outer_pairs_(inner_outer_pairs)
141  , inner_outer_string_op_infos_pairs_(col_pairs_string_op_infos)
142  , device_count_(device_count)
143  , query_hints_(query_hints)
144  , needs_dict_translation_(false)
145  , hashtable_build_dag_map_(hashtable_build_dag_map)
146  , table_id_to_node_map_(table_id_to_node_map)
147  , rowid_size_(sizeof(int32_t)) {
148   CHECK_GT(device_count_, 0);
149   hash_tables_for_device_.resize(std::max(device_count_, 1));
150 }
151 
152 size_t BaselineJoinHashTable::getShardCountForCondition(
153     const Analyzer::BinOper* condition,
154  const Executor* executor,
155  const std::vector<InnerOuter>& inner_outer_pairs) {
156  for (const auto& inner_outer_pair : inner_outer_pairs) {
157  const auto pair_shard_count = get_shard_count(inner_outer_pair, executor);
158  if (pair_shard_count) {
159  return pair_shard_count;
160  }
161  }
162  return 0;
163 }
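
getShardCountForCondition() simply returns the first non-zero shard count found across the inner/outer column pairs. A standalone sketch of that "first non-zero wins" scan, with a plain vector of precomputed counts standing in for get_shard_count():

#include <cstddef>
#include <vector>

size_t first_nonzero_shard_count(const std::vector<size_t>& pair_shard_counts) {
  for (const size_t count : pair_shard_counts) {
    if (count) {
      return count;  // the first sharded pair decides
    }
  }
  return 0;  // no inner/outer pair is sharded
}
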
164 
165 std::string BaselineJoinHashTable::toString(const ExecutorDeviceType device_type,
166                                             const int device_id,
167  bool raw) const {
168  auto buffer = getJoinHashBuffer(device_type, device_id);
169  if (!buffer) {
170  return "EMPTY";
171  }
172  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
173  auto hash_table = hash_tables_for_device_[device_id];
174  CHECK(hash_table);
175  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
176 #ifdef HAVE_CUDA
177  std::unique_ptr<int8_t[]> buffer_copy;
178  if (device_type == ExecutorDeviceType::GPU) {
179  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
180 
181  auto data_mgr = executor_->getDataMgr();
182  auto device_allocator = std::make_unique<CudaAllocator>(
183  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
184  device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
185  }
186  auto ptr1 = buffer_copy ? buffer_copy.get() : buffer;
187 #else
188  auto ptr1 = buffer;
189 #endif // HAVE_CUDA
190  auto ptr2 = ptr1 + offsetBufferOff();
191  auto ptr3 = ptr1 + countBufferOff();
192  auto ptr4 = ptr1 + payloadBufferOff();
193  CHECK(hash_table);
194  const auto layout = getHashType();
195  return HashTable::toString(
196  "keyed",
197  getHashTypeString(layout),
198       getKeyComponentCount() + (layout == HashType::OneToOne ? 1 : 0),
199       getKeyComponentWidth(),
200       hash_table->getEntryCount(),
201  ptr1,
202  ptr2,
203  ptr3,
204  ptr4,
205  buffer_size,
206  raw);
207 }
208 
209 std::set<DecodedJoinHashBufferEntry> BaselineJoinHashTable::toSet(
210  const ExecutorDeviceType device_type,
211  const int device_id) const {
212  auto buffer = getJoinHashBuffer(device_type, device_id);
213  auto hash_table = getHashTableForDevice(device_id);
214  CHECK(hash_table);
215  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
216 #ifdef HAVE_CUDA
217  std::unique_ptr<int8_t[]> buffer_copy;
218  if (device_type == ExecutorDeviceType::GPU) {
219  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
220  auto data_mgr = executor_->getDataMgr();
221  auto device_allocator = std::make_unique<CudaAllocator>(
222  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
223  device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
224  }
225  auto ptr1 = buffer_copy ? buffer_copy.get() : buffer;
226 #else
227  auto ptr1 = buffer;
228 #endif // HAVE_CUDA
229  auto ptr2 = ptr1 + offsetBufferOff();
230  auto ptr3 = ptr1 + countBufferOff();
231  auto ptr4 = ptr1 + payloadBufferOff();
232  const auto layout = hash_table->getLayout();
233   return HashTable::toSet(getKeyComponentCount() + (layout == HashType::OneToOne ? 1 : 0),
234                           getKeyComponentWidth(),
235                           hash_table->getEntryCount(),
236  ptr1,
237  ptr2,
238  ptr3,
239  ptr4,
240  buffer_size);
241 }
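
Both toString() and toSet() decode the table from a single base pointer plus three section offsets; when the buffer lives on the GPU, a host staging copy is made first. A simplified sketch of that pointer arithmetic (hypothetical names; in the real code the offsets come from offsetBufferOff(), countBufferOff(), and payloadBufferOff()):

#include <cstddef>
#include <cstdint>

struct HashTableSections {
  const int8_t* keys;     // base: (optional) composite key dictionary
  const int8_t* offsets;  // one-to-many: per-entry offsets into the payload
  const int8_t* counts;   // one-to-many: per-entry match counts
  const int8_t* payload;  // matching row ids
};

HashTableSections split_buffer(const int8_t* base,
                               size_t offsets_off,
                               size_t counts_off,
                               size_t payload_off) {
  return {base, base + offsets_off, base + counts_off, base + payload_off};
}
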
242 
243 bool needs_dictionary_translation(
244     const std::vector<InnerOuter>& inner_outer_pairs,
245  const std::vector<InnerOuterStringOpInfos>& inner_outer_string_op_infos_pairs,
246  const Executor* executor) {
247  const auto num_col_pairs = inner_outer_pairs.size();
248  CHECK_EQ(num_col_pairs, inner_outer_string_op_infos_pairs.size());
249  for (size_t col_pair_idx = 0; col_pair_idx < num_col_pairs; ++col_pair_idx) {
250  if (needs_dictionary_translation(inner_outer_pairs[col_pair_idx],
251  inner_outer_string_op_infos_pairs[col_pair_idx],
252  executor)) {
253  return true;
254  }
255  }
256  return false;
257 }
258 
259 void BaselineJoinHashTable::reify(const HashType preferred_layout) {
260  auto timer = DEBUG_TIMER(__func__);
261   CHECK_LT(0, device_count_);
262   HashJoin::checkHashJoinReplicationConstraint(
263       getInnerTableId(),
264       getShardCountForCondition(
265           condition_.get(), executor_, inner_outer_pairs_),
266       executor_);
267 
268  auto layout = preferred_layout;
269  if (condition_->is_bbox_intersect_oper()) {
270  CHECK_EQ(inner_outer_pairs_.size(), size_t(1));
271 
272  if (inner_outer_pairs_[0].second->get_type_info().is_array()) {
273  layout = HashType::ManyToMany;
274  } else {
275  layout = HashType::OneToMany;
276  }
277  try {
278  reifyWithLayout(layout);
279  return;
280  } catch (const std::exception& e) {
281  VLOG(1) << "Caught exception while building baseline hash table for bounding box "
282  "intersection: "
283  << e.what();
284  throw;
285  }
286  }
287 
288  // Automatically prefer One-To-Many layouts when string operations are involved as these
289  // tend to be cardinality-reducing operations.
290  // Todo(todd): Ostensibly only string ops on the rhs/inner expression cause rhs dups and
291  // so we may be too conservative here, but validate
292 
293  for (const auto& inner_outer_string_op_infos : inner_outer_string_op_infos_pairs_) {
294  if (inner_outer_string_op_infos.first.size() ||
295  inner_outer_string_op_infos.second.size()) {
296  layout = HashType::OneToMany;
297  break;
298  }
299  }
300 
301  try {
302  reifyWithLayout(layout);
303  } catch (const std::exception& e) {
304  VLOG(1) << "Caught exception while building baseline hash table: " << e.what();
305  // In perfect hash we CHECK that the layout is not OnetoMany here, but for baseline
306  // we are catching all exceptions, so should determine if that is safe first
307  // before we would CHECK and not throw an exception here
308  if (layout == HashType::OneToMany) {
309  throw(e);
310     }
312     reifyWithLayout(HashType::OneToMany);
313  }
314 }
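
The control flow above amounts to: try the preferred layout; if the build fails and the layout was not already OneToMany, rebuild with OneToMany; a OneToMany failure is final. A compilable sketch of that fallback, with the build step injected as a callable (hypothetical names):

#include <functional>
#include <stdexcept>

enum class HashType { OneToOne, OneToMany };

void build_with_fallback(HashType preferred,
                         const std::function<void(HashType)>& build) {
  try {
    build(preferred);
  } catch (const std::exception&) {
    if (preferred == HashType::OneToMany) {
      throw;  // no more permissive layout to fall back to
    }
    build(HashType::OneToMany);  // retry, tolerating duplicate keys
  }
}
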
315 
316 void BaselineJoinHashTable::reifyWithLayout(const HashType layout) {
317   const auto& query_info = get_inner_query_info(getInnerTableId(), query_infos_).info;
318  if (query_info.fragments.empty()) {
319  return;
320  }
321 
322  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
323  if (total_entries > HashJoin::MAX_NUM_HASH_ENTRIES) {
324  throw TooManyHashEntries();
325  }
326 
327  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
328  std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>> fragments_per_device;
329  std::vector<ColumnsForDevice> columns_per_device;
330  const auto shard_count = shardCount();
331  auto entries_per_device =
332  get_entries_per_device(total_entries, shard_count, device_count_, memory_level_);
333  auto data_mgr = executor_->getDataMgr();
334   // the cached hash table lookup logic is similar to that of the perfect join hash table
335  // first, prepare fragment lists per device
336  std::vector<ChunkKey> chunk_key_per_device;
337  for (int device_id = 0; device_id < device_count_; ++device_id) {
338  fragments_per_device.emplace_back(
339  shard_count
340  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
341  : query_info.fragments);
342     if (memory_level_ == Data_Namespace::GPU_LEVEL) {
343       dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(
344           data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id)));
344  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id)));
345  }
346  const auto chunk_key = genChunkKey(fragments_per_device[device_id]);
347  chunk_key_per_device.emplace_back(std::move(chunk_key));
348  }
349 
350  // prepare per-device cache key
351  auto inner_outer_pairs =
352  HashJoin::normalizeColumnPairs(condition_.get(), executor_->getTemporaryTables());
353  const auto& inner_outer_cols = inner_outer_pairs.first;
354  const auto& col_pairs_string_op_infos = inner_outer_pairs.second;
355   auto hashtable_access_path_info =
356       HashtableRecycler::getHashtableAccessPathInfo(inner_outer_cols,
357                                                     col_pairs_string_op_infos,
358                                                     condition_->get_optype(),
359                                                     join_type_,
360                                                     hashtable_build_dag_map_,
361                                                     device_count_,
362                                                     shard_count,
363                                                     fragments_per_device,
364                                                     executor_);
365  hashtable_cache_key_ = hashtable_access_path_info.hashed_query_plan_dag;
366  hashtable_cache_meta_info_ = hashtable_access_path_info.meta_info;
367  table_keys_ = hashtable_access_path_info.table_keys;
368 
369  // the actual chunks fetched per device can be different but they constitute the same
370  // table in the same db, so we can exploit this to create an alternative table key
371  if (table_keys_.empty()) {
372  const auto& inner_table_key = getInnerTableId();
373  table_keys_ =
374  DataRecyclerUtil::getAlternativeTableKeys(chunk_key_per_device, inner_table_key);
375  }
376  CHECK(!table_keys_.empty());
377 
378  if (HashtableRecycler::isInvalidHashTableCacheKey(hashtable_cache_key_) &&
379  getInnerTableId().table_id > 0) {
380     // sometimes we cannot retrieve the query plan DAG, so try the recycler cache
381     // with the old-fashioned cache key if we deal with a hash table of a non-temporary table
382  for (int device_id = 0; device_id < device_count_; ++device_id) {
383  const auto num_tuples = std::accumulate(fragments_per_device[device_id].begin(),
384  fragments_per_device[device_id].end(),
385  size_t(0),
386  [](const auto& sum, const auto& fragment) {
387  return sum + fragment.getNumTuples();
388  });
389       AlternativeCacheKeyForBaselineHashJoin cache_key{inner_outer_pairs_,
390                                                        inner_outer_string_op_infos_pairs_,
391                                                        num_tuples,
392  condition_->get_optype(),
393  join_type_,
394  chunk_key_per_device[device_id]};
395  hashtable_cache_key_[device_id] = getAlternativeCacheKey(cache_key);
396  }
397  }
398 
399  // register a mapping between cache key and input tables of the hash table
400  const auto invalid_cache_key =
401  HashtableRecycler::isInvalidHashTableCacheKey(hashtable_cache_key_);
402  if (!invalid_cache_key) {
403  if (!shard_count) {
404  hash_table_cache_->addQueryPlanDagForTableKeys(hashtable_cache_key_.front(),
405  table_keys_);
406  } else {
407  std::for_each(hashtable_cache_key_.cbegin(),
408  hashtable_cache_key_.cend(),
409  [this](QueryPlanHash key) {
410  hash_table_cache_->addQueryPlanDagForTableKeys(key, table_keys_);
411  });
412  }
413  }
414 
415  // now, let's try to check whether we have a cached hash table for this join qual
416  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
417 
418  // todo (yoonmin) : support dictionary proxy cache for join including string op(s)
419  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
420  std::unique_lock<std::mutex> str_proxy_translation_lock(str_proxy_translation_mutex_);
421  if (str_proxy_translation_maps_.empty()) {
422       const auto composite_key_info = HashJoin::getCompositeKeyInfo(
423           inner_outer_pairs_, executor_, inner_outer_string_op_infos_pairs_);
424       str_proxy_translation_maps_ = HashJoin::translateCompositeStrDictProxies(
425           composite_key_info, inner_outer_string_op_infos_pairs_, executor_);
426       CHECK_EQ(str_proxy_translation_maps_.size(), inner_outer_pairs_.size());
427     }
428  }
429 
430   auto allow_hashtable_recycling =
431       HashtableRecycler::isSafeToCacheHashtable(table_id_to_node_map_,
432                                                 needs_dict_translation_,
433                                                 inner_outer_string_op_infos_pairs_,
434                                                 getInnerTableId());
435   bool has_invalid_cached_hash_table = false;
436   if (effective_memory_level == Data_Namespace::CPU_LEVEL &&
437       HashJoin::canAccessHashTable(
438           allow_hashtable_recycling, invalid_cache_key, join_type_)) {
439  // build a hash table on CPU, and we have a chance to recycle the cached one if
440  // available
441  for (int device_id = 0; device_id < device_count_; ++device_id) {
442  auto hash_table =
443           initHashTableOnCpuFromCache(hashtable_cache_key_[device_id],
444                                       CacheItemType::BASELINE_HT,
445                                       DataRecyclerUtil::CPU_DEVICE_IDENTIFIER);
446       if (hash_table) {
447  hash_tables_for_device_[device_id] = hash_table;
448  } else {
449  has_invalid_cached_hash_table = true;
450  break;
451  }
452  }
453 
454  if (has_invalid_cached_hash_table) {
455  hash_tables_for_device_.clear();
456  hash_tables_for_device_.resize(device_count_);
457  } else {
458       if (memory_level_ == Data_Namespace::GPU_LEVEL) {
459 #ifdef HAVE_CUDA
460  for (int device_id = 0; device_id < device_count_; ++device_id) {
461  auto cpu_hash_table = std::dynamic_pointer_cast<BaselineHashTable>(
462  hash_tables_for_device_[device_id]);
463  if (cpu_hash_table->getEntryCount()) {
464  copyCpuHashTableToGpu(cpu_hash_table, device_id, data_mgr);
465  }
466  }
467 #else
468  UNREACHABLE();
469 #endif
470  }
471  return;
472  }
473  }
474 
475  // we have no cached hash table for this qual
476  // so, start building the hash table by fetching columns for devices
477  for (int device_id = 0; device_id < device_count_; ++device_id) {
478  const auto columns_for_device =
479  fetchColumnsForDevice(fragments_per_device[device_id],
480  device_id,
481                               memory_level_ == Data_Namespace::GPU_LEVEL
482                                   ? dev_buff_owners[device_id].get()
483                                   : nullptr);
483  : nullptr);
484  columns_per_device.push_back(columns_for_device);
485  }
486 
487  auto hashtable_layout_type = layout;
488  if (hashtable_layout_type == HashType::OneToMany) {
489  CHECK(!columns_per_device.front().join_columns.empty());
490  size_t tuple_count;
491  std::tie(tuple_count, std::ignore) = approximateTupleCount(columns_per_device);
492  const auto entry_count = 2 * std::max(tuple_count, size_t(1));
493  // reset entries per device with one to many info
494  entries_per_device =
495  get_entries_per_device(entry_count, shard_count, device_count_, memory_level_);
496  }
497  std::vector<std::future<void>> init_threads;
498  for (int device_id = 0; device_id < device_count_; ++device_id) {
499     BaselineHashTableEntryInfo hash_table_entry_info(
500         entries_per_device,
501         columns_per_device[device_id].join_columns.front().num_elems,
502         rowid_size_,
503         getKeyComponentCount(),
504         getKeyComponentWidth(),
505         hashtable_layout_type,
506         join_type_ == JoinType::WINDOW_FUNCTION_FRAMING);
507     init_threads.push_back(std::async(std::launch::async,
508                                       &BaselineJoinHashTable::reifyForDevice,
509                                       this,
510                                       columns_per_device[device_id],
511                                       hashtable_layout_type,
512                                       device_id,
513                                       hash_table_entry_info,
514                                       logger::thread_local_ids()));
515   }
516  for (auto& init_thread : init_threads) {
517  init_thread.wait();
518  }
519  for (auto& init_thread : init_threads) {
520  init_thread.get();
521  }
522 }
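
Note the two-phase drain of init_threads at the end of reifyWithLayout(): every future is wait()ed on before any is get(). This guarantees all device builds have finished before the first stored exception is rethrown, so no worker is still writing into hash_tables_for_device_ while the stack unwinds. The same pattern in isolation:

#include <future>
#include <vector>

void drain(std::vector<std::future<void>>& init_threads) {
  for (auto& t : init_threads) {
    t.wait();  // phase 1: let every device build run to completion
  }
  for (auto& t : init_threads) {
    t.get();   // phase 2: rethrow the first captured exception, if any
  }
}
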
523 
525  const std::vector<ColumnsForDevice>& columns_per_device) const {
526  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
527   CountDistinctDescriptor count_distinct_desc{
528       CountDistinctImplType::Bitmap,
529       0,
530       0,
531       11,
532       true,
533       effective_memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
534           ? ExecutorDeviceType::GPU
535           : ExecutorDeviceType::CPU,
536       1};
537  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();
538 
539  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
540 
541  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
542  int thread_count = cpu_threads();
543  std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
544  auto hll_result = &hll_buffer_all_cpus[0];
545 
546  approximate_distinct_tuples(hll_result,
547  count_distinct_desc.bitmap_sz_bits,
548  padded_size_bytes,
549  columns_per_device.front().join_columns,
550  columns_per_device.front().join_column_types,
551  thread_count);
552  for (int i = 1; i < thread_count; ++i) {
553  hll_unify(hll_result,
554  hll_result + i * padded_size_bytes,
555  size_t(1) << count_distinct_desc.bitmap_sz_bits);
556  }
557  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits), 0);
558  }
559 #ifdef HAVE_CUDA
560  auto data_mgr = executor_->getDataMgr();
561  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count_);
562  for (auto& host_hll_buffer : host_hll_buffers) {
563  host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
564  }
565  std::vector<std::future<void>> approximate_distinct_device_threads;
566  for (int device_id = 0; device_id < device_count_; ++device_id) {
567  approximate_distinct_device_threads.emplace_back(std::async(
568         std::launch::async,
569         [device_id,
570  &columns_per_device,
571  &count_distinct_desc,
572  data_mgr,
573  &host_hll_buffers] {
574  auto allocator = std::make_unique<CudaAllocator>(
575  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
576  auto device_hll_buffer =
577  allocator->alloc(count_distinct_desc.bitmapPaddedSizeBytes());
578  data_mgr->getCudaMgr()->zeroDeviceMem(
579  device_hll_buffer,
580  count_distinct_desc.bitmapPaddedSizeBytes(),
581  device_id,
582             getQueryEngineCudaStreamForDevice(device_id));
583         const auto& columns_for_device = columns_per_device[device_id];
584  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(
585  columns_for_device.join_columns, *allocator);
586  auto join_column_types_gpu = transfer_vector_of_flat_objects_to_gpu(
587  columns_for_device.join_column_types, *allocator);
588  const auto key_handler =
589  GenericKeyHandler(columns_for_device.join_columns.size(),
590  true,
591  join_columns_gpu,
592  join_column_types_gpu,
593  nullptr,
594  nullptr);
595  const auto key_handler_gpu =
596  transfer_flat_object_to_gpu(key_handler, *allocator);
597         approximate_distinct_tuples_on_device(
598             reinterpret_cast<uint8_t*>(device_hll_buffer),
599  count_distinct_desc.bitmap_sz_bits,
600  key_handler_gpu,
601  columns_for_device.join_columns[0].num_elems);
602 
603  auto& host_hll_buffer = host_hll_buffers[device_id];
604  allocator->copyFromDevice(&host_hll_buffer[0],
605  device_hll_buffer,
606  count_distinct_desc.bitmapPaddedSizeBytes());
607  }));
608  }
609  for (auto& child : approximate_distinct_device_threads) {
610  child.get();
611  }
612  CHECK_EQ(Data_Namespace::MemoryLevel::GPU_LEVEL, effective_memory_level);
613  auto& result_hll_buffer = host_hll_buffers.front();
614  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
615  for (int device_id = 1; device_id < device_count_; ++device_id) {
616  auto& host_hll_buffer = host_hll_buffers[device_id];
617  hll_unify(hll_result,
618  reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
619  size_t(1) << count_distinct_desc.bitmap_sz_bits);
620  }
621  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits), 0);
622 #else
623  UNREACHABLE();
624  return {0, 0};
625 #endif // HAVE_CUDA
626 }
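
The merging step above relies on a basic HyperLogLog property: the union of two sketches is the element-wise max of their registers, so per-thread (CPU) or per-device (GPU) buffers can all be folded into the first buffer before hll_size() reads off the estimate. A minimal sketch of that merge:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

void unify_registers(std::vector<uint8_t>& acc, const std::vector<uint8_t>& other) {
  // union of HLL sketches == element-wise max over the 2^b registers
  for (size_t i = 0; i < acc.size(); ++i) {
    acc[i] = std::max(acc[i], other[i]);
  }
}

The caller (reifyWithLayout) then doubles the estimate, entry_count = 2 * max(tuple_count, 1), to keep the one-to-many table sparsely loaded.
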
627 
628 ColumnsForDevice BaselineJoinHashTable::fetchColumnsForDevice(
629     const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
630  const int device_id,
631  DeviceAllocator* dev_buff_owner) {
632  const auto effective_memory_level =
633       get_effective_memory_level(memory_level_, needs_dict_translation_);
634 
635  std::vector<JoinColumn> join_columns;
636  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
637  std::vector<JoinColumnTypeInfo> join_column_types;
638  std::vector<JoinBucketInfo> join_bucket_info;
639  std::vector<std::shared_ptr<void>> malloc_owner;
640  for (const auto& inner_outer_pair : inner_outer_pairs_) {
641  const auto inner_col = inner_outer_pair.first;
642  const auto inner_cd = get_column_descriptor_maybe(inner_col->getColumnKey());
643  if (inner_cd && inner_cd->isVirtualCol) {
644       throw FailedToJoinOnVirtualColumn();
645     }
646  join_columns.emplace_back(fetchJoinColumn(inner_col,
647  fragments,
648  effective_memory_level,
649  device_id,
650  chunks_owner,
651  dev_buff_owner,
652  malloc_owner,
653  executor_,
654  &column_cache_));
655  const auto& ti = inner_col->get_type_info();
656  join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
657  0,
658                                                       0,
659                                                       inline_fixed_encoding_null_val(ti),
660                                                       isBitwiseEq(),
661                                                       0,
662                                                       get_join_column_type_kind(ti)});
663   }
664  return {join_columns, join_column_types, chunks_owner, join_bucket_info, malloc_owner};
665 }
666 
667 void BaselineJoinHashTable::reifyForDevice(
668     const ColumnsForDevice& columns_for_device,
669  const HashType layout,
670  const int device_id,
671  const BaselineHashTableEntryInfo hash_table_entries_info,
672  const logger::ThreadLocalIds parent_thread_local_ids) {
673  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
674  DEBUG_TIMER_NEW_THREAD(parent_thread_local_ids.thread_id_);
675  const auto effective_memory_level =
676       get_effective_memory_level(memory_level_, needs_dict_translation_);
677   const auto err = initHashTableForDevice(columns_for_device.join_columns,
678  columns_for_device.join_column_types,
679  columns_for_device.join_buckets,
680  layout,
681  effective_memory_level,
682  hash_table_entries_info,
683  device_id);
684  if (err) {
685  throw HashJoinFail(
686  std::string("Unrecognized error when initializing baseline hash table (") +
687  std::to_string(err) + std::string(")"));
688  }
689 }
690 
691 size_t BaselineJoinHashTable::shardCount() const {
692   if (memory_level_ != Data_Namespace::GPU_LEVEL) {
693     return 0;
694   }
695   return BaselineJoinHashTable::getShardCountForCondition(
696       condition_.get(), executor_, inner_outer_pairs_);
697 }
698 
699 size_t BaselineJoinHashTable::getKeyComponentWidth() const {
700   // todo: relax the assumption that all keys have the same width
701  // (i.e., 3 join keys having different integer types: (smallint, int, bigint)
702  // current: 3 * 8 = 24 bytes / ideal: 2 + 4 + 8 = 14 bytes)
703  for (const auto& inner_outer_pair : inner_outer_pairs_) {
704  const auto inner_col = inner_outer_pair.first;
705  const auto& inner_col_ti = inner_col->get_type_info();
706  if (inner_col_ti.get_logical_size() > 4) {
707  CHECK_EQ(8, inner_col_ti.get_logical_size());
708  return 8;
709  }
710  }
711  return 4;
712 }
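
Worked example of the widening rule above: for join keys typed (smallint, int, bigint) the logical sizes are (2, 4, 8), so the presence of a component wider than 4 bytes forces every component to 8; for (smallint, int) the result is 4. The same rule in isolation:

#include <vector>

int key_component_width(const std::vector<int>& logical_sizes) {
  for (const int size : logical_sizes) {
    if (size > 4) {
      return 8;  // one wide key component widens all of them
    }
  }
  return 4;
}
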
713 
714 size_t BaselineJoinHashTable::getKeyComponentCount() const {
715   return inner_outer_pairs_.size();
716 }
717 
718 Data_Namespace::MemoryLevel BaselineJoinHashTable::getEffectiveMemoryLevel(
719     const std::vector<InnerOuter>& inner_outer_pairs) const {
720   if (needs_dictionary_translation(
721           inner_outer_pairs, inner_outer_string_op_infos_pairs_, executor_)) {
722     needs_dict_translation_ = true;
723     return Data_Namespace::CPU_LEVEL;
724   }
725  return memory_level_;
726 }
727 
728 void BaselineJoinHashTable::copyCpuHashTableToGpu(
729     std::shared_ptr<BaselineHashTable>& cpu_hash_table,
730  const int device_id,
731  Data_Namespace::DataMgr* data_mgr) {
732   BaselineJoinHashTableBuilder builder;
733 
734  builder.allocateDeviceMemory(
735  cpu_hash_table->getHashTableEntryInfo(), device_id, executor_, query_hints_);
736  auto gpu_target_hash_table = builder.getHashTable();
737  CHECK(gpu_target_hash_table);
738  const auto gpu_buff = gpu_target_hash_table->getGpuBuffer();
739  if (gpu_buff) {
740  auto allocator = std::make_unique<CudaAllocator>(
741  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
742  allocator->copyToDevice(
743  gpu_buff,
744  cpu_hash_table->getCpuBuffer(),
745  cpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU));
746  }
747  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
748  hash_tables_for_device_[device_id] = std::move(gpu_target_hash_table);
749 }
750 
751 StrProxyTranslationMapsPtrsAndOffsets decomposeStrDictTranslationMaps(
752     const std::vector<const StringDictionaryProxy::IdMap*>& str_proxy_translation_maps) {
753  StrProxyTranslationMapsPtrsAndOffsets translation_map_ptrs_and_offsets;
754  // First element of pair is vector of int32_t* pointing to translation map "vector"
755  // Second element of pair is vector of int32_t of min inner dictionary ids (offsets)
756  const size_t num_translation_maps = str_proxy_translation_maps.size();
757  translation_map_ptrs_and_offsets.first.reserve(num_translation_maps);
758  translation_map_ptrs_and_offsets.second.reserve(num_translation_maps);
759  for (const auto& str_proxy_translation_map : str_proxy_translation_maps) {
760  if (str_proxy_translation_map) {
761  translation_map_ptrs_and_offsets.first.emplace_back(
762  str_proxy_translation_map->data());
763  translation_map_ptrs_and_offsets.second.emplace_back(
764  str_proxy_translation_map->domainStart());
765  } else {
766  // dummy values
767  translation_map_ptrs_and_offsets.first.emplace_back(nullptr);
768  translation_map_ptrs_and_offsets.second.emplace_back(0);
769  }
770  }
771  return translation_map_ptrs_and_offsets;
772 }
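
The function above is a structure-of-arrays decomposition: an array of nullable map objects becomes two parallel flat vectors (data pointers and domain offsets) that a key handler can consume as device-friendly buffers. A self-contained sketch with a simplified stand-in for StringDictionaryProxy::IdMap:

#include <cstdint>
#include <utility>
#include <vector>

struct IdMapStub {            // stands in for StringDictionaryProxy::IdMap
  std::vector<int32_t> ids;   // translated inner dictionary ids
  int32_t domain_start{0};    // minimum inner dictionary id (the offset)
};

std::pair<std::vector<const int32_t*>, std::vector<int32_t>> decompose(
    const std::vector<const IdMapStub*>& maps) {
  std::pair<std::vector<const int32_t*>, std::vector<int32_t>> out;
  out.first.reserve(maps.size());
  out.second.reserve(maps.size());
  for (const auto* map : maps) {
    out.first.push_back(map ? map->ids.data() : nullptr);  // dummy if absent
    out.second.push_back(map ? map->domain_start : 0);
  }
  return out;
}
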
773 
774 int BaselineJoinHashTable::initHashTableForDevice(
775     const std::vector<JoinColumn>& join_columns,
776  const std::vector<JoinColumnTypeInfo>& join_column_types,
777  const std::vector<JoinBucketInfo>& join_bucket_info,
778  const HashType layout,
779  const Data_Namespace::MemoryLevel effective_memory_level,
780  const BaselineHashTableEntryInfo hash_table_entry_info,
781  const int device_id) {
782  auto timer = DEBUG_TIMER(__func__);
783  const auto key_component_count = getKeyComponentCount();
784  int err = 0;
785  decltype(std::chrono::steady_clock::now()) ts1, ts2;
786  ts1 = std::chrono::steady_clock::now();
787   auto allow_hashtable_recycling =
788       HashtableRecycler::isSafeToCacheHashtable(table_id_to_node_map_,
789                                                 needs_dict_translation_,
790                                                 inner_outer_string_op_infos_pairs_,
791                                                 getInnerTableId());
792  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
793  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
794 
795     const auto composite_key_info = HashJoin::getCompositeKeyInfo(
796         inner_outer_pairs_, executor_, inner_outer_string_op_infos_pairs_);
797 
798  CHECK(!join_columns.empty());
799 
800     if (memory_level_ == Data_Namespace::MemoryLevel::CPU_LEVEL) {
801       CHECK_EQ(device_id, 0);
802  }
803  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
804  std::shared_ptr<HashTable> hash_table{nullptr};
805     const auto str_proxy_translation_map_ptrs_and_offsets =
806         decomposeStrDictTranslationMaps(str_proxy_translation_maps_);
807     BaselineJoinHashTableBuilder builder;
808 
809  const auto key_handler =
810  GenericKeyHandler(key_component_count,
811  true,
812  &join_columns[0],
813  &join_column_types[0],
814  &str_proxy_translation_map_ptrs_and_offsets.first[0],
815  &str_proxy_translation_map_ptrs_and_offsets.second[0]);
816  err = builder.initHashTableOnCpu(&key_handler,
817  composite_key_info,
818  join_columns,
819  join_column_types,
820  join_bucket_info,
821  str_proxy_translation_map_ptrs_and_offsets,
822  hash_table_entry_info,
823  join_type_,
824  executor_,
825  query_hints_);
826  hash_tables_for_device_[device_id] = builder.getHashTable();
827  ts2 = std::chrono::steady_clock::now();
828  auto hashtable_build_time =
829  std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count();
830  if (!err && allow_hashtable_recycling && hash_tables_for_device_[device_id] &&
831       hash_tables_for_device_[device_id]->getHashTableBufferSize(
832           ExecutorDeviceType::CPU) > 0) {
833     // add ht-related items to cache iff we have a valid hashtable
834     putHashTableOnCpuToCache(hashtable_cache_key_[device_id],
835                              CacheItemType::BASELINE_HT,
836                              hash_tables_for_device_[device_id],
837                              DataRecyclerUtil::CPU_DEVICE_IDENTIFIER,
838                              hashtable_build_time);
839 
840  hash_table_layout_cache_->putItemToCache(
841  hashtable_cache_key_[device_id],
842         hash_tables_for_device_[device_id]->getLayout(),
843         CacheItemType::HT_HASHING_SCHEME,
844         DataRecyclerUtil::CPU_DEVICE_IDENTIFIER,
845         0,
846  0,
847  {});
848  }
849  // Transfer the hash table on the GPU if we've only built it on CPU
850  // but the query runs on GPU (join on dictionary encoded columns).
851  // Don't transfer the buffer if there was an error since we'll bail anyway.
852  if (memory_level_ == Data_Namespace::GPU_LEVEL && !err) {
853 #ifdef HAVE_CUDA
854  auto cpu_hash_table = std::dynamic_pointer_cast<BaselineHashTable>(
855  hash_tables_for_device_[device_id]);
856  if (cpu_hash_table->getEntryCount()) {
857  copyCpuHashTableToGpu(cpu_hash_table, device_id, executor_->getDataMgr());
858  }
859 #else
860  CHECK(false);
861 #endif
862  }
863  } else {
864 #ifdef HAVE_CUDA
865     BaselineJoinHashTableBuilder builder;
866 
867  auto data_mgr = executor_->getDataMgr();
868  CudaAllocator allocator(
869  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
870  auto join_column_types_gpu =
871  transfer_vector_of_flat_objects_to_gpu(join_column_types, allocator);
872  auto join_columns_gpu =
873  transfer_vector_of_flat_objects_to_gpu(join_columns, allocator);
874  const auto key_handler = GenericKeyHandler(key_component_count,
875  true,
876  join_columns_gpu,
877  join_column_types_gpu,
878  nullptr,
879  nullptr);
880 
881  err = builder.initHashTableOnGpu(&key_handler,
882  join_columns,
883  join_type_,
884  hash_table_entry_info,
885  device_id,
886  executor_,
887  query_hints_);
888  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
889  hash_tables_for_device_[device_id] = builder.getHashTable();
890  if (!err && allow_hashtable_recycling && hash_tables_for_device_[device_id]) {
891  // add layout to cache iff we have a valid hashtable
892  hash_table_layout_cache_->putItemToCache(
893  hashtable_cache_key_[device_id],
894           hash_tables_for_device_[device_id]->getLayout(),
895           CacheItemType::HT_HASHING_SCHEME,
896           DataRecyclerUtil::CPU_DEVICE_IDENTIFIER,
897           0,
898  0,
899  {});
900  }
901 #else
902  UNREACHABLE();
903 #endif
904  }
905  return err;
906 }
907 
908 #define LL_CONTEXT executor_->cgen_state_->context_
909 #define LL_BUILDER executor_->cgen_state_->ir_builder_
910 #define LL_INT(v) executor_->cgen_state_->llInt(v)
911 #define LL_FP(v) executor_->cgen_state_->llFp(v)
912 #define ROW_FUNC executor_->cgen_state_->row_func_
913 
914 llvm::Value* BaselineJoinHashTable::codegenSlot(const CompilationOptions& co,
915                                                 const size_t index) {
916   AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
917   CHECK(getHashType() == HashType::OneToOne);
918  const auto key_component_width = getKeyComponentWidth();
919  CHECK(key_component_width == 4 || key_component_width == 8);
920  auto key_buff_lv = codegenKey(co);
921  const auto hash_ptr = hashPtr(index);
922  const auto key_ptr_lv =
923  LL_BUILDER.CreatePointerCast(key_buff_lv, llvm::Type::getInt8PtrTy(LL_CONTEXT));
924  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
925  const auto hash_table = getHashTableForDevice(size_t(0));
926  return executor_->cgen_state_->emitExternalCall(
927  "baseline_hash_join_idx_" + std::to_string(key_component_width * 8),
929  {hash_ptr, key_ptr_lv, key_size_lv, LL_INT(hash_table->getEntryCount())});
930 }
931 
932 HashJoinMatchingSet BaselineJoinHashTable::codegenMatchingSet(
933     const CompilationOptions& co,
934  const size_t index) {
935  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
936  const auto hash_table = getHashTableForDevice(size_t(0));
937  CHECK(hash_table);
938  const auto key_component_width = getKeyComponentWidth();
939  CHECK(key_component_width == 4 || key_component_width == 8);
940  auto key_buff_lv = codegenKey(co);
941   CHECK(getHashType() == HashType::OneToMany);
942   auto hash_ptr = HashJoin::codegenHashTableLoad(index, executor_);
943  const auto composite_dict_ptr_type =
944  llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
945  const auto composite_key_dict =
946  hash_ptr->getType()->isPointerTy()
947  ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
948  : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
949  const auto key_component_count = getKeyComponentCount();
950  const auto key = executor_->cgen_state_->emitExternalCall(
951  "get_composite_key_index_" + std::to_string(key_component_width * 8),
953  {key_buff_lv,
954  LL_INT(key_component_count),
955  composite_key_dict,
956  LL_INT(hash_table->getEntryCount())});
957  auto one_to_many_ptr = hash_ptr;
958  if (one_to_many_ptr->getType()->isPointerTy()) {
959  one_to_many_ptr =
960  LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
961  } else {
962  CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
963  }
964  const auto composite_key_dict_size = offsetBufferOff();
965  one_to_many_ptr =
966  LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
967   return HashJoin::codegenMatchingSet(
968       {one_to_many_ptr, key, LL_INT(int64_t(0)), LL_INT(hash_table->getEntryCount() - 1)},
969  false,
970  false,
971  false,
972       getComponentBufferSize(),
973       executor_);
974 }
975 
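The IR emitted above performs, at runtime, a three-step probe of the one-to-many layout: resolve the composite key to an entry index, then use that index to read an (offset, count) pair delimiting the matching row ids in the payload section. A host-side sketch of those semantics (hypothetical names, not the generated code):

#include <cstdint>
#include <vector>

struct MatchingSetSketch {
  const int32_t* row_ids;  // first matching row id
  int32_t count;           // number of matches
};

MatchingSetSketch probe(int64_t entry_idx,                    // from the key dictionary
                        const std::vector<int32_t>& offsets,  // offsetBufferOff() section
                        const std::vector<int32_t>& counts,   // countBufferOff() section
                        const std::vector<int32_t>& payload) {  // payloadBufferOff() section
  if (entry_idx < 0) {
    return {nullptr, 0};  // key not present in the hash table
  }
  return {payload.data() + offsets[entry_idx], counts[entry_idx]};
}
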
976 size_t BaselineJoinHashTable::offsetBufferOff() const noexcept {
977  return getKeyBufferSize();
978 }
979 
980 size_t BaselineJoinHashTable::countBufferOff() const noexcept {
981   if (layoutRequiresAdditionalBuffers(getHashType())) {
982     return offsetBufferOff() + getComponentBufferSize();
983   } else {
984  return getKeyBufferSize();
985  }
986 }
987 
988 size_t BaselineJoinHashTable::payloadBufferOff() const noexcept {
989   if (layoutRequiresAdditionalBuffers(getHashType())) {
990     return countBufferOff() + getComponentBufferSize();
991   } else {
992  return getKeyBufferSize();
993  }
994 }
995 
996 size_t BaselineJoinHashTable::getKeyBufferSize() const noexcept {
997   const auto key_component_width = getKeyComponentWidth();
998  CHECK(key_component_width == 4 || key_component_width == 8);
999  const auto key_component_count = getKeyComponentCount();
1000  auto hash_table = getHashTableForDevice(size_t(0));
1001  CHECK(hash_table);
1002  if (layoutRequiresAdditionalBuffers(hash_table->getLayout())) {
1003  return hash_table->getEntryCount() * key_component_count * key_component_width;
1004  } else {
1005  return hash_table->getEntryCount() * (key_component_count + 1) * key_component_width;
1006  }
1007 }
1008 
1009 size_t BaselineJoinHashTable::getComponentBufferSize() const noexcept {
1010   const auto hash_table = getHashTableForDevice(size_t(0));
1011  return hash_table->getEntryCount() * sizeof(int32_t);
1012 }
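
Together, getKeyBufferSize() and getComponentBufferSize() define the layout the three offset accessors expose: keys first, then (for one-to-many layouts) an offsets section and a counts section of entry_count int32s each, then the payload. A worked example with illustrative numbers:

#include <cstddef>
#include <cstdint>
#include <iostream>

int main() {
  const size_t entry_count = 1000;
  const size_t key_component_count = 2;  // two join key columns
  const size_t key_component_width = 8;  // widened per getKeyComponentWidth()
  const size_t key_buffer = entry_count * key_component_count * key_component_width;
  const size_t component = entry_count * sizeof(int32_t);   // getComponentBufferSize()
  std::cout << "offsets at byte " << key_buffer             // offsetBufferOff()
            << ", counts at " << key_buffer + component     // countBufferOff()
            << ", payload at " << key_buffer + 2 * component  // payloadBufferOff()
            << '\n';
}
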
1013 
1014 llvm::Value* BaselineJoinHashTable::codegenKey(const CompilationOptions& co) {
1015   AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1016  const auto key_component_width = getKeyComponentWidth();
1017  CHECK(key_component_width == 4 || key_component_width == 8);
1018  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
1019  llvm::Value* key_buff_lv{nullptr};
1020  switch (key_component_width) {
1021  case 4:
1022  key_buff_lv =
1023  LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
1024  break;
1025  case 8:
1026  key_buff_lv =
1027  LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
1028  break;
1029  default:
1030  CHECK(false);
1031  }
1032 
1033  CodeGenerator code_generator(executor_);
1034  for (size_t i = 0; i < getKeyComponentCount(); ++i) {
1035  const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(
1036  key_buff_lv->getType()->getScalarType()->getPointerElementType(),
1037  key_buff_lv,
1038  LL_INT(i));
1039  const auto& inner_outer_pair = inner_outer_pairs_[i];
1040  const auto outer_col = inner_outer_pair.second;
1041  const auto key_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col);
1042  const auto val_col_var =
1043  dynamic_cast<const Analyzer::ColumnVar*>(inner_outer_pair.first);
1044  if (key_col_var && val_col_var &&
1045       self_join_not_covered_by_left_deep_tree(
1046           key_col_var,
1047  val_col_var,
1048  get_max_rte_scan_table(executor_->cgen_state_->scan_idx_to_hash_pos_))) {
1049     throw std::runtime_error(
1050         "Query execution failed because the query contains an unsupported self-join "
1051         "pattern. The join condition of the self-join likely requires multiple "
1052         "left-deep join trees, which is not supported for now. Please consider "
1053         "rewriting the table order in the "
1054         "FROM clause.");
1055  }
1056  auto key_lv = HashJoin::codegenColOrStringOper(
1057  outer_col, inner_outer_string_op_infos_pairs_[i].second, code_generator, co);
1058  const auto key_lv_ext =
1059  LL_BUILDER.CreateSExt(key_lv, get_int_type(key_component_width * 8, LL_CONTEXT));
1060  LL_BUILDER.CreateStore(key_lv_ext, key_comp_dest_lv);
1061  }
1062  return key_buff_lv;
1063 }
1064 
1065 llvm::Value* BaselineJoinHashTable::hashPtr(const size_t index) {
1066  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1067  auto hash_ptr = HashJoin::codegenHashTableLoad(index, executor_);
1068  const auto pi8_type = llvm::Type::getInt8PtrTy(LL_CONTEXT);
1069  return hash_ptr->getType()->isPointerTy()
1070  ? LL_BUILDER.CreatePointerCast(hash_ptr, pi8_type)
1071  : LL_BUILDER.CreateIntToPtr(hash_ptr, pi8_type);
1072 }
1073 
1074 #undef ROW_FUNC
1075 #undef LL_INT
1076 #undef LL_BUILDER
1077 #undef LL_CONTEXT
1078 
1079 shared::TableKey BaselineJoinHashTable::getInnerTableId() const noexcept {
1080   try {
1081     return getInnerTableId(inner_outer_pairs_);
1082  } catch (...) {
1083  CHECK(false);
1084  }
1085  return {0, 0};
1086 }
1087 
1088 int BaselineJoinHashTable::getInnerTableRteIdx() const noexcept {
1089   CHECK(!inner_outer_pairs_.empty());
1090  const auto first_inner_col = inner_outer_pairs_.front().first;
1091  return first_inner_col->get_rte_idx();
1092 }
1093 
1094 HashType BaselineJoinHashTable::getHashType() const noexcept {
1095   auto hash_table = getHashTableForDevice(size_t(0));
1096  CHECK(hash_table);
1097  if (layout_override_) {
1098  return *layout_override_;
1099  } else {
1100  return hash_table->getLayout();
1101  }
1102 }
1103 
1104 shared::TableKey BaselineJoinHashTable::getInnerTableId(
1105     const std::vector<InnerOuter>& inner_outer_pairs) {
1106  CHECK(!inner_outer_pairs.empty());
1107  const auto first_inner_col = inner_outer_pairs.front().first;
1108  return first_inner_col->getTableKey();
1109 }
1110 
1111 std::shared_ptr<HashTable> BaselineJoinHashTable::initHashTableOnCpuFromCache(
1112     QueryPlanHash key,
1113  CacheItemType item_type,
1114  DeviceIdentifier device_identifier) {
1115  auto timer = DEBUG_TIMER(__func__);
1116  VLOG(1) << "Checking CPU hash table cache.";
1117   CHECK(hash_table_cache_);
1118   return hash_table_cache_->getItemFromCache(key, item_type, device_identifier);
1119 }
1120 
1121 void BaselineJoinHashTable::putHashTableOnCpuToCache(
1122     QueryPlanHash key,
1123  CacheItemType item_type,
1124  std::shared_ptr<HashTable> hashtable_ptr,
1125  DeviceIdentifier device_identifier,
1126  size_t hashtable_building_time) {
1127   CHECK(hash_table_cache_);
1128   CHECK(hashtable_ptr && !hashtable_ptr->getGpuBuffer());
1129  hash_table_cache_->putItemToCache(
1130  key,
1131  hashtable_ptr,
1132  item_type,
1133  device_identifier,
1134  hashtable_ptr->getHashTableBufferSize(ExecutorDeviceType::CPU),
1135  hashtable_building_time);
1136 }
1137 
1138 bool BaselineJoinHashTable::isBitwiseEq() const {
1139   return condition_->get_optype() == kBW_EQ;
1140 }
1141 
1142 ChunkKey BaselineJoinHashTable::genChunkKey(
1143     const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments) const {
1144  std::vector<int> fragment_ids;
1145  std::for_each(
1146  fragments.cbegin(), fragments.cend(), [&fragment_ids](const auto& fragment) {
1147  fragment_ids.push_back(fragment.fragmentId);
1148  });
1149  return fragment_ids;
1150 }