void allocateDeviceMemory(BucketizedHashEntryInfo hash_entry_info,
                          PerfectHashTableEntryInfo hash_table_entry_info,
                          const size_t shard_count,
                          const int device_id,
                          const int device_count,
                          const Executor* executor) {
  // Sharded tables: shards are spread round-robin across devices, so each
  // device sizes its buffer for ceil(shard_count / device_count) shards.
  const auto shards_per_device = (shard_count + device_count - 1) / device_count;
  const size_t entries_per_shard = /* ... */;
  // ... (hash_table_ is constructed here; only two arguments survive in this listing)
  hash_table_ = std::make_unique<PerfectHashTable>(/* ... */
                                                   hash_table_entry_info,
                                                   executor->getDataMgr(),
                                                   /* ... */);
  if (hash_table_entry_info.getNumKeys() == 0) {
    VLOG(1) << "Stop building a hash table based on a column: an input table is empty";
    return;
  }
  hash_table_->allocateGpuMemory(hash_table_entry_info.computeTotalNumSlots());
}
void initHashTableOnGpu(const ChunkKey& chunk_key,
                        // ... (several parameters elided in this listing)
                        const bool is_bitwise_eq,
                        // ...
                        const size_t shard_count,
                        const int32_t hash_join_invalid_val,
                        // ...
                        const int device_count,
                        const Executor* executor) {
  if (/* input table is empty */) {
    VLOG(1) << "Stop building a hash table based on a column: an input table is empty";
    return;
  }
  auto data_mgr = executor->getDataMgr();
  // ... (gpu_hash_table_err_buff, a small device buffer holding the kernel's
  //      error code, is allocated here)
  ScopeGuard cleanup_error_buff = [&data_mgr, gpu_hash_table_err_buff]() {
    data_mgr->free(gpu_hash_table_err_buff);
  };
  CHECK(gpu_hash_table_err_buff);
  auto dev_err_buff = gpu_hash_table_err_buff->getMemoryPtr();
  int err{0};
  auto allocator = std::make_unique<CudaAllocator>(/* ... */);
  allocator->copyToDevice(dev_err_buff, &err, sizeof(err));
  auto gpu_hash_table_buff = hash_table_->getGpuBuffer();

  auto timer_init = DEBUG_TIMER("Initialize GPU Perfect Hash Table");
  init_hash_join_buff_on_device(  // presumed callee; only the final argument survives
      /* ... */,
      hash_join_invalid_val);
  if (chunk_key.empty()) {
    // ...
  }
  const auto inner_col = cols.first;
  // ...
  const auto& ti = inner_col->get_type_info();
  // Translate NULL keys to a sentinel just outside the column's value range.
  auto translated_null_val = col_range.getIntMax() + 1;
  if (/* ... */) {
    translated_null_val = col_range.getIntMin() - 1;
  }
  // ...
  auto use_bucketization = inner_col->get_type_info().get_type() == kDATE;
  auto timer_fill = DEBUG_TIMER("Fill GPU Perfect Hash Table");
  // One-to-one layout: the fill arguments bundle the hash buffer, the device-side
  // error flag, and the invalid-slot sentinel (reconstructed declaration;
  // remaining fields elided).
  OneToOnePerfectJoinHashTableFillFuncArgs one_to_one_args{
      reinterpret_cast<int32_t*>(gpu_hash_table_buff),
      reinterpret_cast<int32_t*>(dev_err_buff),
      hash_join_invalid_val,
      /* ... */};
  // ... (hash_table_fill_func is chosen among the fill_hash_join_buff_on_device*
  //      variants according to sharding and bucketization)
  if (shard_count) {
    for (size_t shard = device_id; shard < shard_count; shard += device_count) {
      auto const shard_info = ShardInfo{shard, entries_per_shard, shard_count, device_count};
      hash_table_fill_func(one_to_one_args, shard_info);
    }
  } else {
    hash_table_fill_func(one_to_one_args);
  }
  // One-to-many layout: analogous argument bundle (remaining fields elided).
  OneToManyPerfectJoinHashTableFillFuncArgs one_to_many_args{
      reinterpret_cast<int32_t*>(gpu_hash_table_buff),
      /* ... */};
  if (shard_count) {
    for (size_t shard = device_id; shard < shard_count; shard += device_count) {
      auto const shard_info = ShardInfo{shard, entries_per_shard, shard_count, device_count};
      // ... (sharded one-to-many fill)
    }
  } else {
    hash_table_fill_func(one_to_many_args);
  }
  // Copy the error flag back from the device and abort the build if a kernel
  // reported a problem.
  allocator->copyFromDevice(&err, dev_err_buff, sizeof(err));
  if (err) {
    // ...
    throw std::runtime_error("Unexpected error when building perfect hash table: " +
                             std::to_string(err));
  }
}
void initOneToOneHashTableOnCpu(
    const JoinColumn& join_column,
    const ExpressionRange& col_range,
    const bool is_bitwise_eq,
    const InnerOuter& cols,
    const StringDictionaryProxy::IdMap* str_proxy_translation_map,
    const JoinType join_type,
    const BucketizedHashEntryInfo hash_entry_info,
    const PerfectHashTableEntryInfo hash_table_entry_info,
    const int32_t hash_join_invalid_val,
    const Executor* executor) {
  const auto inner_col = cols.first;
  // ...
  const auto& ti = inner_col->get_type_info();
  // ... (hash_table_ is constructed here; only its last argument survives)
  hash_table_ = std::make_unique<PerfectHashTable>(/* ... */,
                                                   hash_table_entry_info);
  if (hash_table_entry_info.getNumKeys() == 0) {
    VLOG(1) << "Stop building a hash table based on a column: an input table is empty";
    return;
  }
  auto cpu_hash_table_buff = reinterpret_cast<int32_t*>(hash_table_->getCpuBuffer());
  auto timer_init = DEBUG_TIMER("Initialize CPU One-To-One Perfect Hash Table");
#ifdef HAVE_TBB
  init_hash_join_buff_tbb(cpu_hash_table_buff,
                          /* ... */,
                          hash_join_invalid_val);
#else  // #ifdef HAVE_TBB
  std::vector<std::thread> init_cpu_buff_threads;
  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
    init_cpu_buff_threads.emplace_back([hash_entry_info,
                                        hash_join_invalid_val,
                                        /* ... */
                                        cpu_hash_table_buff] {
      init_hash_join_buff(  // presumed callee; most arguments elided in this listing
          /* ... */,
          hash_join_invalid_val,
          /* ... */);
    });
  }
  for (auto& t : init_cpu_buff_threads) {
    t.join();
  }
  init_cpu_buff_threads.clear();
#endif  // HAVE_TBB

  auto const use_bucketization = inner_col->get_type_info().get_type() == kDATE;
  auto translated_null_val = col_range.getIntMax() + 1;
  if (/* ... */) {
    translated_null_val = col_range.getIntMin() - 1;
  }
  // ...
  auto timer_fill = DEBUG_TIMER("Fill CPU One-To-One Perfect Hash Table");
  // Fill-function argument bundle (reconstructed declaration; most fields elided).
  OneToOnePerfectJoinHashTableFillFuncArgs args{
      /* ... */
      hash_join_invalid_val,
      /* ... */
      str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
      str_proxy_translation_map ? str_proxy_translation_map->domainStart() : /* ... */,
      /* ... */};
  // ...
  std::vector<std::future<int>> fill_threads;
  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
    // ... (each worker runs the one-to-one fill function via std::async)
  }
  for (auto& child : fill_threads) {
    child.wait();
  }
  for (auto& child : fill_threads) {
    // ... (per-thread error codes from child.get() are collected and checked)
  }
  // ...
}
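// Aside: a minimal, self-contained sketch of the threading pattern used by the
// non-TBB branch above: every worker initializes a strided slice of the buffer
// to the invalid-slot sentinel, then fill workers run through std::async and
// report an int error code that the caller collects with wait()/get(). All
// names here (init_slice, fill_slice, kInvalidSlot, build_cpu_table) are
// illustrative, not the real init_hash_join_buff / fill functions.
#include <cstdint>
#include <future>
#include <thread>
#include <vector>

constexpr int32_t kInvalidSlot = -1;

void init_slice(int32_t* buff, size_t entry_count, int thread_idx, int thread_count) {
  for (size_t i = thread_idx; i < entry_count; i += thread_count) {
    buff[i] = kInvalidSlot;
  }
}

int fill_slice(int32_t* buff, size_t entry_count, int thread_idx, int thread_count) {
  for (size_t i = thread_idx; i < entry_count; i += thread_count) {
    buff[i] = static_cast<int32_t>(i);  // stand-in for the real hash fill
  }
  return 0;  // nonzero would signal e.g. a one-to-one collision
}

int build_cpu_table(int32_t* buff, size_t entry_count, int thread_count) {
  std::vector<std::thread> init_threads;
  for (int t = 0; t < thread_count; ++t) {
    init_threads.emplace_back(init_slice, buff, entry_count, t, thread_count);
  }
  for (auto& t : init_threads) {
    t.join();
  }
  std::vector<std::future<int>> fill_threads;
  for (int t = 0; t < thread_count; ++t) {
    fill_threads.emplace_back(
        std::async(std::launch::async, fill_slice, buff, entry_count, t, thread_count));
  }
  for (auto& child : fill_threads) {
    child.wait();
  }
  int err = 0;
  for (auto& child : fill_threads) {
    const int thread_err = child.get();  // get() also rethrows any worker exception
    if (!err) {
      err = thread_err;  // keep the first nonzero error code
    }
  }
  return err;
}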
void initOneToManyHashTableOnCpu(
    const JoinColumn& join_column,
    const ExpressionRange& col_range,
    const bool is_bitwise_eq,
    const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
    const StringDictionaryProxy::IdMap* str_proxy_translation_map,
    const JoinType join_type,
    const BucketizedHashEntryInfo hash_entry_info,
    const PerfectHashTableEntryInfo hash_table_entry_info,
    const int32_t hash_join_invalid_val,
    const Executor* executor) {
  const auto inner_col = cols.first;
  // ...
  const auto& ti = inner_col->get_type_info();
  // ... (hash_table_ is constructed here; only its last argument survives)
  hash_table_ = std::make_unique<PerfectHashTable>(/* ... */,
                                                   hash_table_entry_info);
  if (hash_table_entry_info.getNumKeys() == 0) {
    VLOG(1) << "Stop building a hash table based on a column: an input table is empty";
    return;
  }
  auto cpu_hash_table_buff = reinterpret_cast<int32_t*>(hash_table_->getCpuBuffer());
  auto timer_init = DEBUG_TIMER("Initialize CPU One-To-Many Perfect Hash Table");
#ifdef HAVE_TBB
  init_hash_join_buff_tbb(cpu_hash_table_buff,
                          /* ... */,
                          hash_join_invalid_val);
#else  // #ifdef HAVE_TBB
  std::vector<std::future<void>> init_threads;
  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
    init_threads.emplace_back(
        // ... (a std::async task initializing this thread's stride of the
        //      buffer; among its arguments only hash_join_invalid_val survives)
        );
  }
  for (auto& child : init_threads) {
    child.wait();
  }
  for (auto& child : init_threads) {
    child.get();
  }
#endif  // HAVE_TBB

  auto timer_build = DEBUG_TIMER("Fill CPU One-To-Many Perfect Hash Table");
  auto const use_bucketization = inner_col->get_type_info().get_type() == kDATE;
  auto translated_null_val = col_range.getIntMax() + 1;
  if (/* ... */) {
    translated_null_val = col_range.getIntMin() - 1;
  }
  // ...
  // Fill-function argument bundle (reconstructed declaration; most fields elided).
  OneToManyPerfectJoinHashTableFillFuncArgs args{
      /* ... */
      str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
      str_proxy_translation_map ? str_proxy_translation_map->domainStart() : /* ... */,
      /* ... */
      hash_entry_info.bucket_normalization,
      /* ... */};
  // ... (hash_table_fill_func is fill_one_to_many_hash_table or its bucketized
  //      variant, depending on use_bucketization)
  hash_table_fill_func(args, thread_count);
}
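// Aside: a small, self-contained sketch of the NULL translation used above: a
// NULL key is remapped to a sentinel just outside the column's observed
// [min, max] range, so it gets its own slot and cannot collide with a real key.
// The struct and function names are illustrative; the real code takes the
// sentinel from the ExpressionRange (getIntMax() + 1, falling back to
// getIntMin() - 1), and the condition for the fallback is elided in this listing.
#include <cstdint>

struct IntRange {
  int64_t int_min;
  int64_t int_max;
};

int64_t translate_null_key(const int64_t key,
                           const int64_t null_val,
                           const IntRange& col_range,
                           const bool prefer_above_max) {
  if (key != null_val) {
    return key;  // real keys pass through unchanged
  }
  // Pick a sentinel that no real key in [int_min, int_max] can take.
  return prefer_above_max ? col_range.int_max + 1 : col_range.int_min - 1;
}
// E.g. with col_range = {10, 99}, NULL keys become 100 (or 9 with the fallback),
// which maps to a slot no non-NULL key can occupy.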
static size_t get_entries_per_shard(const size_t total_entry_count,
                                    const size_t shard_count) {
  return (total_entry_count + shard_count - 1) / shard_count;
}
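// Aside: the two ceiling divisions used for sharded tables, in one small,
// self-contained sketch: shards are spread round-robin across devices
// (shards_per_device), and every shard is padded to a common capacity
// (entries_per_shard), so each device can allocate
// shards_per_device * entries_per_shard slots up front. The struct and
// function below are illustrative only.
#include <cstddef>

struct ShardedLayout {
  size_t shards_per_device;
  size_t entries_per_shard;
  size_t entries_per_device;
};

ShardedLayout compute_sharded_layout(const size_t total_entry_count,
                                     const size_t shard_count,
                                     const size_t device_count) {
  const size_t shards_per_device = (shard_count + device_count - 1) / device_count;
  const size_t entries_per_shard = (total_entry_count + shard_count - 1) / shard_count;
  return {shards_per_device, entries_per_shard, shards_per_device * entries_per_shard};
}
// E.g. total_entry_count = 1000, shard_count = 6, device_count = 4:
// shards_per_device = 2 and entries_per_shard = 167, so every device reserves
// 334 slots even though devices 2 and 3 end up holding a single shard each.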