bool HashtableRecycler::hasItemInCache(
    QueryPlanHash key,
    CacheItemType item_type,
    DeviceIdentifier device_identifier,
    std::lock_guard<std::mutex>& lock,
    std::optional<HashtableCacheMetaInfo> meta_info) const {
  if (!g_enable_data_recycler || !g_use_hashtable_cache ||
      key == EMPTY_HASHED_PLAN_DAG_KEY) {
    return false;
  }
  auto hashtable_cache = getCachedItemContainer(item_type, device_identifier);
  CHECK(hashtable_cache);
  auto candidate_ht_it = std::find_if(
      hashtable_cache->begin(), hashtable_cache->end(), [&key](const auto& cached_item) {
        return cached_item.key == key;
      });
  if (candidate_ht_it != hashtable_cache->end()) {
    if (item_type == BBOX_INTERSECT_HT) {
      // a key match alone is not enough for a bounding-box-intersect
      // hashtable: its bucket meta info must also be compatible
      CHECK(candidate_ht_it->meta_info &&
            candidate_ht_it->meta_info->bbox_intersect_meta_info);
      CHECK(meta_info && meta_info->bbox_intersect_meta_info);
      if (checkHashtableForBoundingBoxIntersectBucketCompatability(
              *candidate_ht_it->meta_info->bbox_intersect_meta_info,
              *meta_info->bbox_intersect_meta_info)) {
        return true;
      }
    } else {
      return true;
    }
  }
  return false;
}
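
// The lookup above is a linear scan of the per-device container, matching
// solely on the query-plan hash key. A minimal, self-contained sketch of that
// pattern follows; CachedEntry, probe, and the payload field are hypothetical
// stand-ins for the real CachedItemContainer entries:
//
//   #include <algorithm>
//   #include <cstddef>
//   #include <vector>
//
//   struct CachedEntry {
//     size_t key;   // stands in for QueryPlanHash
//     int payload;  // stands in for the cached hashtable
//   };
//
//   const CachedEntry* probe(const std::vector<CachedEntry>& cache, size_t key) {
//     auto it = std::find_if(cache.begin(), cache.end(),
//                            [&key](const auto& item) { return item.key == key; });
//     return it == cache.end() ? nullptr : &*it;
//   }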
std::shared_ptr<HashTable> HashtableRecycler::getItemFromCache(
    QueryPlanHash key,
    CacheItemType item_type,
    DeviceIdentifier device_identifier,
    std::optional<HashtableCacheMetaInfo> meta_info) {
  if (!g_enable_data_recycler || !g_use_hashtable_cache ||
      key == EMPTY_HASHED_PLAN_DAG_KEY) {
    return nullptr;
  }
  auto hashtable_cache = getCachedItemContainer(item_type, device_identifier);
  std::lock_guard<std::mutex> lock(getCacheLock());
  auto candidate_ht = getCachedItemWithoutConsideringMetaInfo(
      key, item_type, device_identifier, *hashtable_cache, lock);
  if (candidate_ht) {
    bool can_return_cached_item = false;
    if (item_type == BBOX_INTERSECT_HT) {
      // reusable only when the cached bucket meta info is compatible with the
      // meta info of the requested hashtable
      CHECK(candidate_ht->meta_info &&
            candidate_ht->meta_info->bbox_intersect_meta_info);
      CHECK(meta_info && meta_info->bbox_intersect_meta_info);
      if (checkHashtableForBoundingBoxIntersectBucketCompatability(
              *candidate_ht->meta_info->bbox_intersect_meta_info,
              *meta_info->bbox_intersect_meta_info)) {
        can_return_cached_item = true;
      }
    } else {
      can_return_cached_item = true;
    }
    if (can_return_cached_item) {
      CHECK(!candidate_ht->isDirty());
      candidate_ht->item_metric->incRefCount();
      VLOG(1) << "[" << item_type << ", "
              << DataRecyclerUtil::getDeviceIdentifierString(device_identifier)
              << "] Recycle item in a cache (key: " << key << ")";
      return candidate_ht->cached_item;
    }
  }
  return nullptr;
}
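
// Taken together with hasItemInCache() above, the typical caller flow is:
// compute the cache key from the query plan DAG, try getItemFromCache(), and
// build a new hashtable only on a miss. A hedged sketch; everything except
// the recycler methods (cache_key, buildHashTable, ht_size, build_time) is an
// illustrative placeholder, not part of the recycler API:
//
//   auto cached_ht = recycler->getItemFromCache(
//       cache_key, item_type, device_identifier, meta_info);
//   if (!cached_ht) {
//     cached_ht = buildHashTable(...);  // hypothetical builder
//     recycler->putItemToCache(cache_key, cached_ht, item_type,
//                              device_identifier, ht_size, build_time, meta_info);
//   }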
void HashtableRecycler::putItemToCache(QueryPlanHash key,
                                       std::shared_ptr<HashTable> item_ptr,
                                       CacheItemType item_type,
                                       DeviceIdentifier device_identifier,
                                       size_t item_size,
                                       size_t compute_time,
                                       std::optional<HashtableCacheMetaInfo> meta_info) {
  if (!g_enable_data_recycler || !g_use_hashtable_cache ||
      key == EMPTY_HASHED_PLAN_DAG_KEY) {
    return;
  }
  std::lock_guard<std::mutex> lock(getCacheLock());
  auto has_cached_ht = hasItemInCache(key, item_type, device_identifier, lock, meta_info);
  if (has_cached_ht) {
    // a cached hashtable already exists for this key; if it has been marked
    // dirty, evict it so the freshly built hashtable can replace it
    auto hashtable_cache = getCachedItemContainer(item_type, device_identifier);
    auto candidate_it =
        std::find_if(hashtable_cache->begin(),
                     hashtable_cache->end(),
                     [&key](const auto& cached_item) { return cached_item.key == key; });
    bool found_candidate = false;
    if (candidate_it != hashtable_cache->end()) {
      if (item_type == BBOX_INTERSECT_HT) {
        CHECK(candidate_it->meta_info &&
              candidate_it->meta_info->bbox_intersect_meta_info);
        CHECK(meta_info && meta_info->bbox_intersect_meta_info);
        if (checkHashtableForBoundingBoxIntersectBucketCompatability(
                *candidate_it->meta_info->bbox_intersect_meta_info,
                *meta_info->bbox_intersect_meta_info)) {
          found_candidate = true;
        }
      } else {
        found_candidate = true;
      }
      if (found_candidate && candidate_it->isDirty()) {
        removeItemFromCache(
            key, item_type, device_identifier, lock, candidate_it->meta_info);
        has_cached_ht = false;
      }
    }
  }
  if (!has_cached_ht) {
    auto& metric_tracker = getMetricTracker(item_type);
    auto cache_status = metric_tracker.canAddItem(device_identifier, item_size);
    if (cache_status == CacheAvailability::UNAVAILABLE) {
      // the hashtable is too large to ever fit in the cache
      LOG(INFO) << "Caching hash table fails: hash table is too large";
      return;
    } else if (cache_status == CacheAvailability::AVAILABLE_AFTER_CLEANUP) {
      // the item fits only after evicting enough existing entries
      auto required_size = metric_tracker.calculateRequiredSpaceForItemAddition(
          device_identifier, item_size);
      cleanupCacheForInsertion(
          item_type, device_identifier, required_size, lock, meta_info);
    }
    auto hashtable_cache = getCachedItemContainer(item_type, device_identifier);
    auto new_cache_metric_ptr = metric_tracker.putNewCacheItemMetric(
        key, device_identifier, item_size, compute_time);
    CHECK_EQ(item_size, new_cache_metric_ptr->getMemSize());
    VLOG(1) << "[" << item_type << ", "
            << DataRecyclerUtil::getDeviceIdentifierString(device_identifier)
            << "] Put item to cache (key: " << key << ")";
    hashtable_cache->emplace_back(key, item_ptr, new_cache_metric_ptr, meta_info);
  }
}
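
// Note the three-stage flow of putItemToCache(): (1) a clean cached entry
// with the same key makes the put a no-op; (2) a dirty entry with the same
// key is evicted first so the fresh hashtable replaces it; (3) insertion is
// admitted only when the metric tracker confirms the item fits, triggering a
// cleanup of older entries beforehand when necessary.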
void HashtableRecycler::removeItemFromCache(
    QueryPlanHash key,
    CacheItemType item_type,
    DeviceIdentifier device_identifier,
    std::lock_guard<std::mutex>& lock,
    std::optional<HashtableCacheMetaInfo> meta_info) {
  // ... (early-out guards elided in the listing)
  auto& cache_metrics = getMetricTracker(item_type);
  // record the hashtable's size before dropping its metric
  auto cache_metric = cache_metrics.getCacheItemMetric(key, device_identifier);
  CHECK(cache_metric);
  auto hashtable_size = cache_metric->getMemSize();
  // remove the cached item from the per-device container
  auto hashtable_container = getCachedItemContainer(item_type, device_identifier);
  auto filter = [key](auto const& item) { return item.key == key; };
  auto itr =
      std::find_if(hashtable_container->cbegin(), hashtable_container->cend(), filter);
  if (itr == hashtable_container->cend()) {
    return;
  } else {
    VLOG(1) << "[" << item_type << ", "
            << DataRecyclerUtil::getDeviceIdentifierString(device_identifier)
            << "] remove cached item from cache (key: " << key << ")";
    hashtable_container->erase(itr);
  }
  // drop the metric and shrink the tracked cache size accordingly
  cache_metrics.removeCacheItemMetric(key, device_identifier);
  cache_metrics.updateCurrentCacheSize(
      device_identifier, CacheUpdateAction::REMOVE, hashtable_size);
}
void HashtableRecycler::cleanupCacheForInsertion(
    CacheItemType item_type,
    DeviceIdentifier device_identifier,
    size_t required_size,
    std::lock_guard<std::mutex>& lock,
    std::optional<HashtableCacheMetaInfo> meta_info) {
  int elimination_target_offset = 0;
  size_t removed_size = 0;
  auto& metric_tracker = getMetricTracker(item_type);
  // evict more than the bare minimum (half of the total cache capacity) so a
  // burst of insertions does not pay the cleanup cost every time; tests keep
  // the exact requested size to stay deterministic
  auto actual_space_to_free = metric_tracker.getTotalCacheSize() / 2;
  if (!g_is_test_env && required_size < actual_space_to_free) {
    required_size = actual_space_to_free;
  }
  metric_tracker.sortCacheInfoByQueryMetric(device_identifier);
  auto cached_item_metrics = metric_tracker.getCacheItemMetrics(device_identifier);
  sortCacheContainerByQueryMetric(item_type, device_identifier);
  // walk the metrics in eviction order until enough space has been collected
  for (auto& metric : cached_item_metrics) {
    auto target_size = metric->getMemSize();
    ++elimination_target_offset;
    removed_size += target_size;
    if (removed_size > required_size) {
      break;
    }
  }
  // evict the first `elimination_target_offset` items and their metrics
  removeCachedItemFromBeginning(item_type, device_identifier, elimination_target_offset);
  metric_tracker.removeMetricFromBeginning(device_identifier, elimination_target_offset);
  metric_tracker.updateCurrentCacheSize(
      device_identifier, CacheUpdateAction::REMOVE, removed_size);
}
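
// The cleanup above intentionally over-frees: outside of test environments it
// raises required_size to half of the total cache capacity. A self-contained
// sketch of the accounting loop (the helper name and byte figures are made up
// for illustration):
//
//   #include <cstddef>
//   #include <vector>
//
//   // returns how many leading items of the eviction-ordered list to drop
//   int itemsToEvict(const std::vector<size_t>& sorted_item_sizes,
//                    size_t required_size) {
//     int offset = 0;
//     size_t removed = 0;
//     for (auto size : sorted_item_sizes) {
//       ++offset;
//       removed += size;
//       if (removed > required_size) {
//         break;
//       }
//     }
//     return offset;
//   }
//
//   // e.g. a 4 GB cache asked to free 256 MB raises required_size to 2 GB,
//   // then drops items from the front of the sorted metric list until more
//   // than 2 GB has been reclaimed.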
void HashtableRecycler::clearCache() {
  std::lock_guard<std::mutex> lock(getCacheLock());
  for (auto& item_type : getCacheItemType()) {
    getMetricTracker(item_type).clearCacheMetricTracker();
    auto item_cache = getItemCache().find(item_type)->second;
    for (auto& kv : *item_cache) {
      if (!kv.second->empty()) {
        VLOG(1) << "[" << item_type << ", "
                << DataRecyclerUtil::getDeviceIdentifierString(kv.first)
                << "] clear cache (# items: " << kv.second->size() << ")";
        kv.second->clear();
      }
    }
  }
}
void HashtableRecycler::markCachedItemAsDirty(size_t table_key,
                                              std::unordered_set<QueryPlanHash>& key_set,
                                              CacheItemType item_type,
                                              DeviceIdentifier device_identifier) {
  // ... (recycler-enabled / empty-key-set guards and lock acquisition elided)
  auto hashtable_cache = getCachedItemContainer(item_type, device_identifier);
  for (auto key : key_set) {
    markCachedItemAsDirtyImpl(key, *hashtable_cache);
  }
}
void HashtableRecycler::removeCachedHashtableBuiltFromSyntheticTable(
    CacheItemType item_type,
    DeviceIdentifier device_identifier,
    std::lock_guard<std::mutex>& lock) {
  auto hashtable_cache = getCachedItemContainer(item_type, device_identifier);
  CHECK(hashtable_cache);
  // hashtables built from the synthetic (unitary) table are tracked in the
  // table-key-to-DAG map; drop every cache entry registered under that key
  auto key_set_it = table_key_to_query_plan_dag_map_.find(getUnitaryTableKey());
  if (key_set_it != table_key_to_query_plan_dag_map_.end()) {
    auto& key_set = key_set_it->second;
    for (auto key : key_set) {
      removeItemFromCache(key, item_type, device_identifier, lock);
    }
    // ... (cleanup of the now-stale key-set mapping elided in the listing)
  }
}
std::string HashtableRecycler::toString() const {
  std::ostringstream oss;
  oss << "A current status of the Hashtable Recycler:\n";
  for (auto& item_type : getCacheItemType()) {
    oss << "\t" << item_type;
    auto& metric_tracker = getMetricTracker(item_type);
    oss << "\n\t# cached hashtables:\n";
    auto item_cache = getItemCache().find(item_type)->second;
    for (auto& cache_container : *item_cache) {
      oss << "\t\t" << DataRecyclerUtil::getDeviceIdentifierString(cache_container.first)
          << ", # hashtables: " << cache_container.second->size() << "\n";
      for (auto& ht : *cache_container.second) {
        oss << "\t\t\tHT] " << ht.item_metric->toString() << "\n";
      }
    }
    oss << "\t" << metric_tracker.toString() << "\n";
  }
  return oss.str();
}
bool HashtableRecycler::checkHashtableForBoundingBoxIntersectBucketCompatability(
    const BoundingBoxIntersectMetaInfo& candidate,
    const BoundingBoxIntersectMetaInfo& target) const {
  // bucket dimensions must line up one-to-one
  if (candidate.bucket_sizes.size() != target.bucket_sizes.size()) {
    return false;
  }
  for (size_t i = 0; i < candidate.bucket_sizes.size(); i++) {
    if (candidate.bucket_sizes[i] != target.bucket_sizes[i]) {
      return false;
    }
  }
  auto threshold_check = candidate.bbox_intersect_bucket_threshold ==
                         target.bbox_intersect_bucket_threshold;
  auto hashtable_size_check = candidate.bbox_intersect_max_table_size_bytes ==
                              target.bbox_intersect_max_table_size_bytes;
  return threshold_check && hashtable_size_check;
}
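
// Conceptually, bucket compatibility means the candidate's bucket dimensions
// line up with the requested ones: a hashtable bucketized as {2.0, 2.0} is
// not interchangeable with one bucketized as {1.0, 1.0}, even under the same
// cache key. A minimal sketch of such an element-wise check (a hypothetical
// helper, not the recycler's exact predicate):
//
//   #include <vector>
//
//   bool bucketsMatch(const std::vector<double>& candidate,
//                     const std::vector<double>& target) {
//     return candidate == target;  // same dimension count, same sizes
//   }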
size_t HashtableRecycler::getJoinColumnInfoHash(
    std::vector<const Analyzer::ColumnVar*>& inner_cols,
    std::vector<const Analyzer::ColumnVar*>& outer_cols,
    Executor* executor) {
  auto hashed_join_col_info = EMPTY_HASHED_PLAN_DAG_KEY;
  boost::hash_combine(
      hashed_join_col_info,
      executor->getQueryPlanDagCache().translateColVarsToInfoHash(inner_cols, false));
  boost::hash_combine(
      hashed_join_col_info,
      executor->getQueryPlanDagCache().translateColVarsToInfoHash(outer_cols, false));
  return hashed_join_col_info;
}
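
// boost::hash_combine folds each component into a running seed, so identical
// components combined in the same order always reproduce the same composite
// key. A self-contained sketch of the idiom (the function name and inputs are
// made up for illustration):
//
//   #include <boost/functional/hash.hpp>
//   #include <cstddef>
//
//   size_t combineJoinColumnHashes(size_t inner_cols_hash, size_t outer_cols_hash) {
//     size_t seed = 0;
//     boost::hash_combine(seed, inner_cols_hash);
//     boost::hash_combine(seed, outer_cols_hash);
//     return seed;  // order matters: swapping the two calls changes the key
//   }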
bool HashtableRecycler::isSafeToCacheHashtable(
    const TableIdToNodeMap& table_id_to_node_map,
    bool need_dict_translation,
    const std::vector<InnerOuterStringOpInfos>& inner_outer_string_op_info_pairs,
    const shared::TableKey& table_key) {
  // ... (early-outs, e.g. for string-op infos, elided in the listing)
  auto getNodeByTableId =
      [&table_id_to_node_map](
          const shared::TableKey& table_key_param) -> const RelAlgNode* {
    auto it = table_id_to_node_map.find(table_key_param);
    if (it != table_id_to_node_map.end()) {
      return it->second;
    }
    return nullptr;
  };
  bool found_sort_node = false;
  bool found_project_node = false;
  // an intermediate (non-physical) table is registered under a negated table
  // id, so negate it again to resolve the rel-alg node that produced it
  const auto origin_table_id = table_key.table_id * -1;
  const auto inner_node = getNodeByTableId({table_key.db_id, origin_table_id});
  // ... (handling of an unresolved inner node elided in the listing)
  auto sort_node = dynamic_cast<const RelSort*>(inner_node);
  if (sort_node) {
    found_sort_node = true;
  } else {
    auto project_node = dynamic_cast<const RelProject*>(inner_node);
    if (project_node) {
      found_project_node = true;
    }
  }
  // a hashtable built on top of a sort node, or on a projection that requires
  // dictionary translation, must not be cached
  return !(found_sort_node || (found_project_node && need_dict_translation));
}
bool HashtableRecycler::isInvalidHashTableCacheKey(
    const std::vector<QueryPlanHash>& cache_keys) {
  // a key list is invalid if it is empty or contains an empty-plan-DAG key
  return cache_keys.empty() ||
         std::any_of(cache_keys.cbegin(), cache_keys.cend(), [](QueryPlanHash key) {
           return key == EMPTY_HASHED_PLAN_DAG_KEY;
         });
}
HashtableAccessPathInfo HashtableRecycler::getHashtableAccessPathInfo(
    const std::vector<InnerOuter>& inner_outer_pairs,
    const std::vector<InnerOuterStringOpInfos>& inner_outer_string_op_infos_pairs,
    const SQLOps op_type,
    const JoinType join_type,
    const HashTableBuildDagMap& hashtable_build_dag_map,
    int device_count,
    int shard_count,
    const std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>>& frags_for_device,
    Executor* executor) {
  std::vector<const Analyzer::ColumnVar*> inner_cols_vec, outer_cols_vec;
  size_t join_qual_info = EMPTY_HASHED_PLAN_DAG_KEY;
  // fold every property of the join qual (columns, operator, join type, and
  // column types) into a single hash value
  for (auto& join_col_pair : inner_outer_pairs) {
    inner_cols_vec.push_back(join_col_pair.first);
    boost::hash_combine(join_qual_info,
                        executor->getQueryPlanDagCache().getJoinColumnsInfoHash(
                            /* ... args elided in the listing ... */));
    boost::hash_combine(join_qual_info, op_type);
    boost::hash_combine(join_qual_info, join_type);
    boost::hash_combine(join_qual_info, join_col_pair.first->get_type_info().toString());
    auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(join_col_pair.second);
    outer_cols_vec.push_back(outer_col_var);
    if (join_col_pair.first->get_type_info().is_dict_encoded_string()) {
      // for a dict-encoded join column, the outer column also shapes the key
      boost::hash_combine(join_qual_info,
                          executor->getQueryPlanDagCache().getJoinColumnsInfoHash(
                              /* ... args elided in the listing ... */));
      boost::hash_combine(join_qual_info, outer_col_var->get_type_info().toString());
    }
  }
  if (inner_outer_string_op_infos_pairs.size()) {
    boost::hash_combine(join_qual_info, ::toString(inner_outer_string_op_infos_pairs));
  }
  auto join_cols_info = getJoinColumnInfoHash(inner_cols_vec, outer_cols_vec, executor);
  HashtableAccessPathInfo access_path_info(device_count);  // one cache key per device
  auto it = hashtable_build_dag_map.find(join_cols_info);
  if (it != hashtable_build_dag_map.end()) {
    size_t hashtable_access_path = EMPTY_HASHED_PLAN_DAG_KEY;
    boost::hash_combine(hashtable_access_path, it->second.inner_cols_access_path);
    boost::hash_combine(hashtable_access_path, join_qual_info);
    if (inner_cols_vec.front()->get_type_info().is_dict_encoded_string()) {
      boost::hash_combine(hashtable_access_path, it->second.outer_cols_access_path);
    }
    boost::hash_combine(hashtable_access_path, shard_count);
    if (!shard_count) {
      // unsharded build: every device shares one hashtable, so fold a single
      // fragment list and hand the same cache key to all devices
      auto cache_key_for_device = hashtable_access_path;
      const auto frag_list = collectFragmentIds(frags_for_device[0]);
      boost::hash_combine(cache_key_for_device, frag_list);
      for (int i = 0; i < device_count; ++i) {
        access_path_info.hashed_query_plan_dag[i] = cache_key_for_device;
      }
    } else {
      // sharded build: each device builds over its own fragments, so each
      // device gets a distinct cache key
      for (int i = 0; i < device_count; ++i) {
        const auto frag_list_for_device = collectFragmentIds(frags_for_device[i]);
        auto cache_key_for_device = hashtable_access_path;
        boost::hash_combine(cache_key_for_device, frag_list_for_device);
        access_path_info.hashed_query_plan_dag[i] = cache_key_for_device;
      }
    }
    access_path_info.table_keys = it->second.inputTableKeys;
  }
  return access_path_info;
}
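
// The per-device keys derived above share one access-path prefix and differ
// only in the fragment list folded in last, so sharded builds get distinct
// keys per device while unsharded builds reuse a single key. A self-contained
// sketch of that derivation (the function name and fragment ids are made up):
//
//   #include <boost/functional/hash.hpp>
//   #include <cstddef>
//   #include <vector>
//
//   std::vector<size_t> perDeviceKeys(
//       size_t access_path_hash,
//       const std::vector<std::vector<int>>& frag_ids_per_device) {
//     std::vector<size_t> keys;
//     for (const auto& frag_ids : frag_ids_per_device) {
//       auto key = access_path_hash;
//       boost::hash_combine(key, frag_ids);  // boost hashes the whole vector
//       keys.push_back(key);
//     }
//     return keys;
//   }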
std::tuple<QueryPlanHash,
           std::shared_ptr<HashTable>,
           std::optional<HashtableCacheMetaInfo>>
HashtableRecycler::getCachedHashtableWithoutCacheKey(std::set<size_t>& visited,
                                                     CacheItemType hash_table_type,
                                                     DeviceIdentifier device_identifier) {
  std::lock_guard<std::mutex> lock(getCacheLock());
  auto hashtable_cache = getCachedItemContainer(hash_table_type, device_identifier);
  // hand back the first cached hashtable whose key has not been visited yet
  for (auto& ht : *hashtable_cache) {
    if (!visited.count(ht.key)) {
      return std::make_tuple(ht.key, ht.cached_item, ht.meta_info);
    }
  }
  return std::make_tuple(EMPTY_HASHED_PLAN_DAG_KEY, nullptr, std::nullopt);
}
void HashtableRecycler::addQueryPlanDagForTableKeys(
    size_t hashed_query_plan_dag,
    const std::unordered_set<size_t>& table_keys) {
  std::lock_guard<std::mutex> lock(getCacheLock());
  for (auto table_key : table_keys) {
    auto itr = table_key_to_query_plan_dag_map_.try_emplace(table_key).first;
    itr->second.insert(hashed_query_plan_dag);
  }
}
std::optional<std::unordered_set<size_t>>
HashtableRecycler::getMappedQueryPlanDagsWithTableKey(size_t table_key) const {
  std::lock_guard<std::mutex> lock(getCacheLock());
  auto it = table_key_to_query_plan_dag_map_.find(table_key);
  return it != table_key_to_query_plan_dag_map_.end() ? std::make_optional(it->second)
                                                      : std::nullopt;
}