29 #include <boost/functional/hash.hpp>
33 #include <unordered_map>
55 constexpr
char const* cache_item_type_str[]{
56 "Perfect Join Hashtable",
57 "Baseline Join Hashtable",
58 "Bounding Box Intersect Join Hashtable",
59 "Hashing Scheme for Join Hashtable",
60 "Baseline Join Hashtable's Approximated Cardinality",
61 "Bounding Box Intersect Join Hashtable's Auto Tuner's Parameters",
64 static_assert(
sizeof(cache_item_type_str) /
sizeof(*cache_item_type_str) ==
66 return os << cache_item_type_str[item_type];
90 : query_plan_hash_(query_plan_hash),
metrics_({0, mem_size, compute_time}) {}
// Accessor for the query plan hash: returns the current value of the
// query_plan_hash_ member by value. Declared const, so calling it does not
// modify the object.
92 QueryPlanHash getQueryPlanHash()
const {
return query_plan_hash_; }
102 const std::array<size_t, CacheMetricType::NUM_METRIC_TYPE>& getMetrics()
const {
106 void setComputeTime(
size_t compute_time) {
110 void setMemSize(
const size_t mem_size) {
115 std::ostringstream oss;
116 oss <<
"Query plan hash: " << query_plan_hash_
125 std::array<size_t, CacheMetricType::NUM_METRIC_TYPE>
metrics_;
132 std::unordered_map<DeviceIdentifier, std::vector<std::shared_ptr<CacheItemMetric>>>;
150 constexpr
QueryPlanHash UNITARY_TABLE_ID_HASH_VALUE = 1703092966009212028;
151 return UNITARY_TABLE_ID_HASH_VALUE;
155 const std::vector<ChunkKey>& chunk_keys,
157 std::unordered_set<size_t> alternative_table_keys;
158 if (!chunk_keys.empty() && chunk_keys.front().size() > 2 &&
159 chunk_keys.front()[1] > 0) {
160 auto& chunk_key = chunk_keys.front();
164 std::vector<int> alternative_table_key{chunk_key[0], chunk_key[1]};
166 }
else if (inner_table_key.
table_id > 0) {
168 alternative_table_keys.insert(inner_table_key.
hash());
176 return alternative_table_keys;
186 size_t total_cache_size,
187 size_t max_cache_item_size,
194 for (
int gpu_device_identifier = num_gpus; gpu_device_identifier >= 1;
195 --gpu_device_identifier) {
197 std::vector<std::shared_ptr<CacheItemMetric>>());
201 std::vector<std::shared_ptr<CacheItemMetric>>());
205 LOG(
INFO) <<
"The total cache size of " << cache_item_type
206 <<
" is set too low, so we suggest raising it larger than 256MB";
209 if (max_cache_item_size < 1024 * 1024 * 10) {
211 <<
"The maximum item size of " << cache_item_type
212 <<
" that can be cached is set too low, we suggest raising it larger than 10MB";
215 LOG(
INFO) <<
"The maximum item size of " << cache_item_type
216 <<
" is set larger than its total cache size, so we force to set the "
217 "maximum item size as equal to the total cache size";
224 CacheMetricInfoMap::mapped_type
const& metrics) {
225 auto same_hash = [key](
auto itr) {
return itr->getQueryPlanHash() == key; };
226 return std::find_if(metrics.cbegin(), metrics.cend(), same_hash);
231 CacheMetricInfoMap::mapped_type
const& metrics) {
233 return itr == metrics.cend() ?
nullptr : *itr;
261 auto same_hash = [key](
auto itr) {
return itr.first == key; };
266 : std::make_optional(itr->second);
273 size_t compute_time) {
277 if (cached_metric->getMemSize() != mem_size) {
282 cached_metric->incRefCount();
283 return cached_metric;
286 auto cache_metric = std::make_shared<CacheItemMetric>(key, compute_time, mem_size);
290 cache_metric->incRefCount();
291 return itr->second.emplace_back(std::move(cache_metric));
297 if (itr != cache_metrics.cend()) {
298 cache_metrics.erase(itr);
304 metrics.erase(metrics.begin(), metrics.begin() + offset);
308 size_t item_size)
const {
312 const auto current_cache_size = it->second;
314 return rem < 0 ? item_size : item_size - rem;
322 <<
"] clear cache metrics (# items: " << kv.first <<
", " << kv.second
334 size_t item_size)
const {
343 CHECK(current_cache_size.has_value());
344 auto cache_size_after_addition = *current_cache_size + item_size;
358 CHECK(current_cache_size.has_value());
363 CHECK_LE(size, *current_cache_size);
372 [](
const std::shared_ptr<CacheItemMetric>& left,
373 const std::shared_ptr<CacheItemMetric>& right) {
374 auto& elem1_metrics = left->getMetrics();
375 auto& elem2_metrics = right->getMetrics();
377 if (elem1_metrics[i] != elem2_metrics[i]) {
378 return elem1_metrics[i] < elem2_metrics[i];
386 std::ostringstream oss;
387 oss <<
"Current memory consumption of caches for each device:\n";
389 oss <<
"\t\tDevice " << kv.first <<
" : " << kv.second <<
" bytes\n";
397 if (new_total_cache_size > 0) {
402 if (new_max_cache_item_size > 0) {
421 template <
typename CACHED_ITEM_TYPE,
typename META_INFO_TYPE>
424 CACHED_ITEM_TYPE item,
425 std::shared_ptr<CacheItemMetric> item_metric_ptr,
426 std::optional<META_INFO_TYPE> metadata = std::nullopt)
448 template <
typename CACHED_ITEM_TYPE,
typename META_INFO_TYPE>
453 std::unordered_map<DeviceIdentifier, std::shared_ptr<CachedItemContainer>>;
455 std::unordered_map<CacheItemType, std::shared_ptr<PerDeviceCacheItemContainer>>;
459 size_t total_cache_size,
460 size_t max_item_size,
462 for (
auto& item_type : item_types) {
467 auto item_container = std::make_shared<PerDeviceCacheItemContainer>();
468 for (
int gpu_device_identifier = num_gpus; gpu_device_identifier >= 1;
469 --gpu_device_identifier) {
470 item_container->emplace(gpu_device_identifier,
471 std::make_shared<CachedItemContainer>());
474 std::make_shared<CachedItemContainer>());
485 std::optional<META_INFO_TYPE> meta_info = std::nullopt) = 0;
488 CACHED_ITEM_TYPE item_ptr,
493 std::optional<META_INFO_TYPE> meta_info = std::nullopt) = 0;
500 std::unordered_set<QueryPlanHash>& key_set,
505 auto candidate_it = std::find_if(
509 return cached_item.key == key;
511 if (candidate_it != m.end()) {
512 candidate_it->setDirty();
517 auto candidate_it = std::find_if(
521 return cached_item.key == key;
523 return candidate_it != m.end() && candidate_it->isDirty();
526 virtual std::string
toString()
const = 0;
533 auto device_type_container_itr =
534 item_type_container_itr->second->find(device_identifier);
535 return device_type_container_itr != item_type_container_itr->second->end()
536 ? device_type_container_itr->second
542 std::optional<CachedItem<CACHED_ITEM_TYPE, META_INFO_TYPE>>
547 std::lock_guard<std::mutex>& lock) {
548 auto candidate_it = std::find_if(
552 return cached_item.key == key;
554 if (candidate_it != m.end()) {
555 if (candidate_it->isDirty()) {
557 key, item_type, device_identifier, lock, candidate_it->meta_info);
560 return *candidate_it;
569 return container ? container->size() : 0;
576 return std::count_if(container->begin(),
578 [](
const auto& cached_item) {
return cached_item.isDirty(); });
585 return std::count_if(container->begin(),
587 [](
const auto& cached_item) {
return !cached_item.isDirty(); });
594 auto current_size_opt = metric_tracker.getCurrentCacheSize(device_identifier);
595 return current_size_opt ? current_size_opt.value() : 0;
603 return cache_metric_tracker.getCacheItemMetric(key, device_identifier);
607 if (new_total_cache_size > 0) {
614 if (new_max_cache_item_size > 0) {
629 container->erase(container->begin(), container->begin() + offset);
641 auto& left_metrics = left.
item_metric->getMetrics();
642 auto& right_metrics = right.item_metric->getMetrics();
644 if (left_metrics[i] != right_metrics[i]) {
645 return left_metrics[i] < right_metrics[i];
657 return metric_iter->second;
678 std::lock_guard<std::mutex>& lock,
679 std::optional<META_INFO_TYPE> meta_info = std::nullopt)
const = 0;
686 std::lock_guard<std::mutex>& lock,
687 std::optional<META_INFO_TYPE> meta_info = std::nullopt) = 0;
693 size_t required_size,
694 std::lock_guard<std::mutex>& lock,
695 std::optional<META_INFO_TYPE> meta_info = std::nullopt) = 0;
Defines data structures for the semantic analysis phase of query processing.
CACHED_ITEM_TYPE cached_item
std::mutex & getCacheLock() const
std::unordered_map< CacheItemType, std::shared_ptr< PerDeviceCacheItemContainer >> PerTypeCacheItemContainer
static std::string getDeviceIdentifierString(DeviceIdentifier device_identifier)
virtual std::string toString() const =0
size_t calculateRequiredSpaceForItemAddition(DeviceIdentifier device_identifier, size_t item_size) const
std::shared_ptr< CacheItemMetric > putNewCacheItemMetric(QueryPlanHash key, DeviceIdentifier device_identifier, size_t mem_size, size_t compute_time)
std::optional< CachedItem< CACHED_ITEM_TYPE, META_INFO_TYPE > > getCachedItemWithoutConsideringMetaInfo(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, CachedItemContainer &m, std::lock_guard< std::mutex > &lock)
size_t getCurrentNumCachedItems(CacheItemType item_type, DeviceIdentifier device_identifier) const
CacheMetricTracker & getMetricTracker(CacheItemType item_type)
std::unordered_map< DeviceIdentifier, size_t > CacheSizeMap
DataRecycler(const std::vector< CacheItemType > &item_types, size_t total_cache_size, size_t max_item_size, int num_gpus)
std::vector< CachedItem< std::optional< HashType >, EMPTY_META_INFO >> CachedItemContainer
std::ostream & operator<<(std::ostream &os, const SessionInfo &session_info)
CacheMetricTracker const & getMetricTracker(CacheItemType item_type) const
CacheItemMetric(QueryPlanHash query_plan_hash, size_t compute_time, size_t mem_size)
std::unordered_set< CacheItemType > cache_item_types_
DEVICE void sort(ARGS &&...args)
std::optional< size_t > getCurrentCacheSize(DeviceIdentifier key) const
size_t getCurrentNumCleanCachedItems(CacheItemType item_type, DeviceIdentifier device_identifier) const
std::shared_ptr< CachedItemContainer > getCachedItemContainer(CacheItemType item_type, DeviceIdentifier device_identifier) const
void setMaxCacheItemSize(size_t new_max_cache_item_size)
void markCachedItemAsDirtyImpl(QueryPlanHash key, CachedItemContainer &m) const
void setMaxCacheItemSize(CacheItemType item_type, size_t new_max_cache_item_size)
CacheAvailability canAddItem(DeviceIdentifier device_identifier, size_t item_size) const
void clearCacheMetricTracker()
PerTypeCacheItemContainer const & getItemCache() const
PerTypeCacheMetricTracker metric_tracker_
size_t getCurrentCacheSizeForDevice(CacheItemType item_type, DeviceIdentifier device_identifier) const
void setCurrentCacheSize(DeviceIdentifier device_identifier, size_t bytes)
size_t getMaxCacheItemSize() const
std::vector< std::shared_ptr< CacheItemMetric > > & getCacheItemMetrics(DeviceIdentifier device_identifier)
void removeCacheItemMetric(QueryPlanHash key, DeviceIdentifier device_identifier)
std::optional< META_INFO_TYPE > meta_info
virtual void initCache()=0
size_t getTotalCacheSize() const
bool isCachedItemDirty(QueryPlanHash key, CachedItemContainer &m) const
std::shared_ptr< CacheItemMetric > getCacheItemMetric(QueryPlanHash key, DeviceIdentifier device_identifier) const
std::string toString() const
PerTypeCacheItemContainer cached_items_container_
size_t max_cache_item_size_
CacheSizeMap current_cache_size_in_bytes_
void updateCurrentCacheSize(DeviceIdentifier device_identifier, CacheUpdateAction action, size_t size)
virtual void putItemToCache(QueryPlanHash key, CACHED_ITEM_TYPE item_ptr, CacheItemType item_type, DeviceIdentifier device_identifier, size_t item_size, size_t compute_time, std::optional< META_INFO_TYPE > meta_info=std::nullopt)=0
std::unordered_map< CacheItemType, CacheMetricTracker > PerTypeCacheMetricTracker
std::shared_ptr< CacheItemMetric > getCachedItemMetric(CacheItemType item_type, DeviceIdentifier device_identifier, QueryPlanHash key) const
static QueryPlanHash getUnitaryTableKey()
static std::unordered_set< size_t > getAlternativeTableKeys(const std::vector< ChunkKey > &chunk_keys, const shared::TableKey &inner_table_key)
std::string toString(const Executor::ExtModuleKinds &kind)
void setTotalCacheSize(size_t new_total_cache_size)
void setTotalCacheSize(CacheItemType item_type, size_t new_total_cache_size)
static std::shared_ptr< CacheItemMetric > getCacheItemMetricImpl(QueryPlanHash key, CacheMetricInfoMap::mapped_type const &metrics)
size_t getCurrentNumDirtyCachedItems(CacheItemType item_type, DeviceIdentifier device_identifier) const
std::unordered_map< DeviceIdentifier, std::shared_ptr< CachedItemContainer >> PerDeviceCacheItemContainer
void sortCacheContainerByQueryMetric(CacheItemType item_type, DeviceIdentifier device_identifier)
void sortCacheInfoByQueryMetric(DeviceIdentifier device_identifier)
std::unordered_set< CacheItemType > const & getCacheItemType() const
CacheMetricTracker(CacheItemType cache_item_type, size_t total_cache_size, size_t max_cache_item_size, int num_gpus=0)
virtual ~DataRecycler()=default
std::unordered_map< DeviceIdentifier, std::vector< std::shared_ptr< CacheItemMetric >>> CacheMetricInfoMap
static CacheMetricInfoMap::mapped_type::const_iterator getCacheItemMetricItr(QueryPlanHash key, CacheMetricInfoMap::mapped_type const &metrics)
std::size_t hash_value(RexAbstractInput const &rex_ab_input)
CachedItem(QueryPlanHash hashed_plan, CACHED_ITEM_TYPE item, std::shared_ptr< CacheItemMetric > item_metric_ptr, std::optional< META_INFO_TYPE > metadata=std::nullopt)
virtual bool hasItemInCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::lock_guard< std::mutex > &lock, std::optional< META_INFO_TYPE > meta_info=std::nullopt) const =0
bool g_enable_watchdog false
CacheMetricInfoMap cache_metrics_
virtual void clearCache()=0
virtual void removeItemFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::lock_guard< std::mutex > &lock, std::optional< META_INFO_TYPE > meta_info=std::nullopt)=0
Basic constructors and methods of the row set interface.
virtual CACHED_ITEM_TYPE getItemFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::optional< META_INFO_TYPE > meta_info=std::nullopt)=0
virtual void markCachedItemAsDirty(size_t table_key, std::unordered_set< QueryPlanHash > &key_set, CacheItemType item_type, DeviceIdentifier device_identifier)=0
void removeMetricFromBeginning(DeviceIdentifier device_identifier, int offset)
Execution unit for relational algebra. It's a low-level description of any relational algebra operation...
static constexpr DeviceIdentifier CPU_DEVICE_IDENTIFIER
void removeCachedItemFromBeginning(CacheItemType item_type, DeviceIdentifier device_identifier, int offset)
virtual void cleanupCacheForInsertion(CacheItemType item_type, DeviceIdentifier device_identifier, size_t required_size, std::lock_guard< std::mutex > &lock, std::optional< META_INFO_TYPE > meta_info=std::nullopt)=0
std::shared_ptr< CacheItemMetric > item_metric
std::array< size_t, CacheMetricType::NUM_METRIC_TYPE > metrics_