// hasItemInCache: returns whether an entry keyed by `key` exists in the result set
// cache container.
    std::lock_guard<std::mutex>& lock,
    std::optional<ResultSetMetaInfo> meta_info) const {
  // ...
  CHECK(resultset_cache);
  auto candidate_resultset_it = std::find_if(
      resultset_cache->begin(), resultset_cache->end(), [&key](const auto& cached_item) {
        return cached_item.key == key;
      });
  return candidate_resultset_it != resultset_cache->end();
}
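
// getItemFromCache: looks up the entry for `key`. A dirty entry is evicted via
// removeItemFromCache; otherwise the cached result set is deep-copied (copy(),
// setCached(true), initStatus()) so the caller gets an independent instance while
// the cached one stays in place and its ref-count metric is bumped.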
    std::optional<ResultSetMetaInfo> meta_info) {
  // ...
  CHECK(resultset_cache);
  auto candidate_resultset_it = std::find_if(
      resultset_cache->begin(), resultset_cache->end(), [&key](const auto& cached_item) {
        return cached_item.key == key;
      });
  if (candidate_resultset_it != resultset_cache->end()) {
    CHECK(candidate_resultset_it->meta_info);
    if (candidate_resultset_it->isDirty()) {
      removeItemFromCache(
          key, item_type, device_identifier, lock, candidate_resultset_it->meta_info);
      // ...
    }
    auto candidate_resultset = candidate_resultset_it->cached_item;
    decltype(std::chrono::steady_clock::now()) ts1, ts2;
    ts1 = std::chrono::steady_clock::now();
    // ...
    auto copied_rs = candidate_resultset->copy();
    // ...
    copied_rs->setCached(true);
    copied_rs->initStatus();
    candidate_resultset_it->item_metric->incRefCount();
    ts2 = std::chrono::steady_clock::now();
    VLOG(1) << "[" << item_type << ", "
            // ...
            << "] Get cached query resultset from cache (key: " << key
            << ", copying it takes "
            << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
            // ...
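
// getOutputMetaInfo: returns the target meta info of the cached result set for
// `key`, first evicting the entry if it has been marked dirty.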
  CHECK(resultset_cache);
  auto candidate_resultset_it = std::find_if(
      resultset_cache->begin(), resultset_cache->end(), [&key](const auto& cached_item) {
        return cached_item.key == key;
      });
  if (candidate_resultset_it != resultset_cache->end()) {
    CHECK(candidate_resultset_it->meta_info);
    if (candidate_resultset_it->isDirty()) {
      removeItemFromCache(
          // ...
          candidate_resultset_it->meta_info);
      // ...
    }
    auto candidate_resultset = candidate_resultset_it->cached_item;
    auto output_meta_info = candidate_resultset->getTargetMetaInfo();
    return output_meta_info;
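
// putItemToCache: caches a new result set under `key`. An existing entry that is
// dirty, or whose columnar output layout differs from the new result set, is
// removed first; the metric tracker then checks size limits (freeing space if
// needed) before the new item is registered and appended to the container.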
    std::optional<ResultSetMetaInfo> meta_info) {
  // ...
  CHECK(meta_info.has_value());
  // ...
  auto candidate_resultset_it = std::find_if(
      resultset_cache->begin(), resultset_cache->end(), [&key](const auto& cached_item) {
        return cached_item.key == key;
      });
  bool has_cached_resultset = false;
  bool need_to_cleanup = false;
  if (candidate_resultset_it != resultset_cache->end()) {
    has_cached_resultset = true;
    CHECK(candidate_resultset_it->meta_info);
    if (candidate_resultset_it->isDirty()) {
      need_to_cleanup = true;
    } else if (candidate_resultset_it->cached_item->didOutputColumnar() !=
               item_ptr->didOutputColumnar()) {
      // ...
      need_to_cleanup = true;
      VLOG(1) << "Failed to recycle query resultset: mismatched cached resultset layout";
    }
  }
  if (need_to_cleanup) {
    // ...
    removeItemFromCache(
        key, item_type, device_identifier, lock, candidate_resultset_it->meta_info);
    has_cached_resultset = false;
  }
  if (!has_cached_resultset) {
    // ...
    if (metric_tracker.getMaxCacheItemSize() != /* ... */) {
      // ...
    }
    auto cache_status = metric_tracker.canAddItem(device_identifier, item_size);
    // ...
      LOG(INFO) << "Failed to keep a query resultset: the size of the resultset ("
                << item_size << " bytes) exceeds the current system limit ("
                // ...
    // ...
      auto required_size = metric_tracker.calculateRequiredSpaceForItemAddition(
          device_identifier, item_size);
      // ...
      LOG(INFO) << "Cleanup cached query resultset(s) to make a free space ("
                << required_size << " bytes) to cache a new resultset";
    // ...
    auto new_cache_metric_ptr = metric_tracker.putNewCacheItemMetric(
        key, device_identifier, item_size, compute_time);
    CHECK_EQ(item_size, new_cache_metric_ptr->getMemSize());
    item_ptr->setCached(true);
    item_ptr->initStatus();
    VLOG(1) << "[" << item_type << ", "
            // ...
            << "] Put query resultset to cache (key: " << key << ")";
    resultset_cache->emplace_back(key, item_ptr, new_cache_metric_ptr, meta_info);
    if (!meta_info->input_table_keys.empty()) {
      // ...
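
// removeItemFromCache: erases the entry for `key` from its container after
// invalidating the cached result set's chunks, then drops the corresponding cache
// metric and subtracts the released bytes from the current cache size.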
    std::lock_guard<std::mutex>& lock,
    std::optional<ResultSetMetaInfo> meta_info) {
  // ...
  auto filter = [key](auto const& item) { return item.key == key; };
  auto itr =
      std::find_if(resultset_container->cbegin(), resultset_container->cend(), filter);
  if (itr == resultset_container->cend()) {
    // ...
  }
  itr->cached_item->invalidateResultSetChunks();
  VLOG(1) << "[" << item_type << ", "
          // ...
          << "] Remove item from cache (key: " << key << ")";
  resultset_container->erase(itr);
  // ...
  auto cache_metric = cache_metrics.getCacheItemMetric(key, device_identifier);
  // ...
  auto resultset_size = cache_metric->getMemSize();
  cache_metrics.removeCacheItemMetric(key, device_identifier);
  cache_metrics.updateCurrentCacheSize(
      // ...
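
// cleanupCacheForInsertion: evicts cached result sets, in query-metric order from
// the front of the sorted metric list, until enough space is freed; the amount to
// free starts at `required_size` and may be widened toward half of the total cache
// size (moderate_free_space).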
    size_t required_size,
    std::lock_guard<std::mutex>& lock,
    std::optional<ResultSetMetaInfo> meta_info) {
  int elimination_target_offset = 0;
  size_t removed_size = 0;
  // ...
  auto actual_space_to_free = required_size;
  // ...
  double moderate_free_space =
      static_cast<double>(metric_tracker.getTotalCacheSize()) / 2;
  // ...
    actual_space_to_free = moderate_free_space;
  // ...
  metric_tracker.sortCacheInfoByQueryMetric(device_identifier);
  auto cached_item_metrics = metric_tracker.getCacheItemMetrics(device_identifier);
  // ...
  for (auto& metric : cached_item_metrics) {
    auto target_size = metric->getMemSize();
    ++elimination_target_offset;
    removed_size += target_size;
    if (removed_size > actual_space_to_free) {
      // ...
    }
  }
  // ...
  metric_tracker.removeMetricFromBeginning(device_identifier, elimination_target_offset);
  // ...
  metric_tracker.updateCurrentCacheSize(
      // ...
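
// clearCache: iterates over every per-device container of the result set cache and
// logs, for each non-empty one, how many cached items are being cleared.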
  auto item_cache = getItemCache().find(item_type)->second;
  for (auto& kv : *item_cache) {
    if (!kv.second->empty()) {
      VLOG(1) << "[" << item_type << ", "
              // ...
              << "] clear cache (# items: " << kv.second->size() << ")";
      // ...
    }
  }
    std::unordered_set<QueryPlanHash>& key_set,
    // ...
  for (auto key : key_set) {
    // ...
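
// toString: builds a human-readable summary of the recycler: the item type, each
// cached result set's metric, and the metric tracker's state.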
  std::ostringstream oss;
  oss << "A current status of the query resultSet Recycler:\n";
  // ...
  oss << "\t" << item_type;
  // ...
  oss << "\n\t# cached query resultsets:\n";
  auto item_cache = getItemCache().find(item_type)->second;
  for (auto& cache_container : *item_cache) {
    // ...
        << ", # query resultsets: " << cache_container.second->size() << "\n";
    for (auto& ht : *cache_container.second) {
      oss << "\t\t\tHT] " << ht.item_metric->toString() << "\n";
    }
  }
  // ...
  oss << "\t" << metric_tracker.toString() << "\n";
std::tuple<QueryPlanHash, ResultSetPtr, std::optional<ResultSetMetaInfo>>
// ...
  auto resultset_cache =
      // ...
  for (auto& rs : *resultset_cache) {
    if (!visited.count(rs.key)) {
      return std::make_tuple(rs.key, rs.cached_item, rs.meta_info);
    }
  }
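
// addQueryPlanDagForTableKeys: records, for every input table key, that the hashed
// query plan DAG depends on it, so entries keyed by that plan hash can later be
// marked dirty per table key.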
    size_t hashed_query_plan_dag,
    const std::unordered_set<size_t>& table_keys,
    std::lock_guard<std::mutex>& lock) {
  for (auto table_key : table_keys) {
    // ...
    itr->second.insert(hashed_query_plan_dag);
  }
}

// ...
std::optional<std::unordered_set<size_t>>
// ...
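
// getTargetExprs: returns the target expressions recorded in the meta info of the
// cached entry for `key`; the entry is expected to exist (CHECKed below).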
  CHECK(resultset_cache);
  auto candidate_resultset_it = std::find_if(
      resultset_cache->begin(), resultset_cache->end(), [&key](const auto& cached_item) {
        return cached_item.key == key;
      });
  CHECK(candidate_resultset_it != resultset_cache->end());
  CHECK(candidate_resultset_it->meta_info);
  return candidate_resultset_it->meta_info->getTargetExprs();
}