OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DataRecycler.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "Analyzer/Analyzer.h"
25 #include "QueryEngine/ResultSet.h"
27 #include "Shared/misc.h"
28 
29 #include <boost/functional/hash.hpp>
30 
31 #include <algorithm>
32 #include <ostream>
33 #include <unordered_map>
34 
// Placeholder META_INFO_TYPE for recyclers whose cached items carry no extra metadata
// (e.g. CachedItem<std::optional<HashType>, EMPTY_META_INFO>).
struct EMPTY_META_INFO {
};
36 
37 // Kinds of items the data recycler can cache and reuse (CacheItemType).
// The enumerator order must match the string table in operator<< below, which is
// indexed by the enumerator value.
// NOTE(review): the enum header (line 38) and line 51 are missing from this listing.
39  PERFECT_HT = 0, // Perfect hashtable
40  BASELINE_HT, // Baseline hashtable
41  BBOX_INTERSECT_HT, // Bounding box intersect hashtable
42  HT_HASHING_SCHEME, // Hashtable layout
43  BASELINE_HT_APPROX_CARD, // Approximated cardinality for baseline hashtable
44  BBOX_INTERSECT_AUTO_TUNER_PARAM, // Bounding box intersect auto tuner's params
45  QUERY_RESULTSET, // query resultset
46  CHUNK_METADATA, // query resultset's chunk metadata
47  // TODO (yoonmin): support the following items for recycling
48  // COUNTALL_CARD_EST, Cardinality of query result
49  // NDV_CARD_EST, # of distinct values
50  // FILTER_SEL Selectivity of (pushed-down) filter node
52 };
53 
// Streams a human-readable name for item_type by indexing a string table with the
// enumerator value, so the table must list names in CacheItemType declaration order.
// NOTE(review): the right-hand side of the static_assert (line 65) is missing from this
// listing; presumably it ties the table length to the number of enumerators — confirm.
// Values outside the table (e.g. a sentinel enumerator) would index out of bounds.
54 inline std::ostream& operator<<(std::ostream& os, CacheItemType const item_type) {
55  constexpr char const* cache_item_type_str[]{
56  "Perfect Join Hashtable",
57  "Baseline Join Hashtable",
58  "Bounding Box Intersect Join Hashtable",
59  "Hashing Scheme for Join Hashtable",
60  "Baseline Join Hashtable's Approximated Cardinality",
61  "Bounding Box Intersect Join Hashtable's Auto Tuner's Parameters",
62  "Query ResultSet",
63  "Chunk Metadata"};
64  static_assert(sizeof(cache_item_type_str) / sizeof(*cache_item_type_str) ==
66  return os << cache_item_type_str[item_type];
67 }
68 
69 // CacheAvailability: whether an item that is about to be cached fits, given the cache's
70 // size limits (the per-item maximum size and the total cache size)
// NOTE(review): the enum header (line 71) is missing from this listing.
72  AVAILABLE, // item can be cached as is
73  AVAILABLE_AFTER_CLEANUP, // item can be cached after removing already cached items
74  UNAVAILABLE // item cannot be cached due to size limitation
75 };
76 
78 
79 // The order of the enum values determines which cached items are evicted first when
80 // a new item needs to be cached but there is not enough space to hold it.
81 // `REF_COUNT` counts how many times a cached item has been referenced during its
82 // lifetime, as a numerical estimate of how useful the cached item is
83 // (it is not an exact reference count at time T, as std::shared_ptr maintains).
85 
86 // Usage metrics of a single cached item, keyed by the hash of its query plan DAG
88  public:
89  CacheItemMetric(QueryPlanHash query_plan_hash, size_t compute_time, size_t mem_size)
90  : query_plan_hash_(query_plan_hash), metrics_({0, mem_size, compute_time}) {}
91 
92  QueryPlanHash getQueryPlanHash() const { return query_plan_hash_; }
93 
94  void incRefCount() { ++metrics_[CacheMetricType::REF_COUNT]; }
95 
96  size_t getRefCount() const { return metrics_[CacheMetricType::REF_COUNT]; }
97 
98  size_t getComputeTime() const { return metrics_[CacheMetricType::COMPUTE_TIME]; }
99 
100  size_t getMemSize() const { return metrics_[CacheMetricType::MEM_SIZE]; }
101 
102  const std::array<size_t, CacheMetricType::NUM_METRIC_TYPE>& getMetrics() const {
103  return metrics_;
104  }
105 
106  void setComputeTime(size_t compute_time) {
107  metrics_[CacheMetricType::COMPUTE_TIME] = compute_time;
108  }
109 
// Presumably overwrites the MEM_SIZE metric with mem_size.
// NOTE(review): the body (line 111) is missing from this listing — confirm.
110  void setMemSize(const size_t mem_size) {
112  }
113 
114  std::string toString() const {
115  std::ostringstream oss;
116  oss << "Query plan hash: " << query_plan_hash_
117  << ", compute_time: " << metrics_[CacheMetricType::COMPUTE_TIME]
118  << ", mem_size: " << metrics_[CacheMetricType::MEM_SIZE]
119  << ", ref_count: " << metrics_[CacheMetricType::REF_COUNT];
120  return oss.str();
121  }
122 
123  private:
// plan hash identifying the cached item; fixed at construction
124  const QueryPlanHash query_plan_hash_;
// REF_COUNT / MEM_SIZE / COMPUTE_TIME values, indexed by CacheMetricType
125  std::array<size_t, CacheMetricType::NUM_METRIC_TYPE> metrics_;
126 };
127 
128 // Device key used by every per-device map: 0 = CPU, 1 ~ N = GPU-1 ~ GPU-N
129 using DeviceIdentifier = size_t;
// bytes currently cached on each device
130 using CacheSizeMap = std::unordered_map<DeviceIdentifier, size_t>;
// metrics of the items cached on each device
131 using CacheMetricInfoMap =
132  std::unordered_map<DeviceIdentifier, std::vector<std::shared_ptr<CacheItemMetric>>>;
133 
135  public:
137 
138  static std::string getDeviceIdentifierString(DeviceIdentifier device_identifier) {
139  std::string device_type = device_identifier == CPU_DEVICE_IDENTIFIER ? "CPU" : "GPU-";
140  return device_identifier != CPU_DEVICE_IDENTIFIER
141  ? device_type.append(std::to_string(device_identifier))
142  : device_type;
143  }
144 
146  // C++20 will support constexpr vector
147  // Before we have C++20, let's use a pre-computed constant which is retrieved from
148  // std::vector<int> unitary_table_identifier = {-1, -1};
149  // UNITARY_TABLE_ID_HASH_VALUE = boost::hash_value(unitary_table_identifier);
150  constexpr QueryPlanHash UNITARY_TABLE_ID_HASH_VALUE = 1703092966009212028;
151  return UNITARY_TABLE_ID_HASH_VALUE;
152  }
153 
154  static std::unordered_set<size_t> getAlternativeTableKeys(
155  const std::vector<ChunkKey>& chunk_keys,
156  const shared::TableKey& inner_table_key) {
157  std::unordered_set<size_t> alternative_table_keys;
158  if (!chunk_keys.empty() && chunk_keys.front().size() > 2 &&
159  chunk_keys.front()[1] > 0) {
160  auto& chunk_key = chunk_keys.front();
161  // the actual chunks fetched per device can be different but they constitute the
162  // same table in the same db, so we can exploit this to create an alternative table
163  // key
164  std::vector<int> alternative_table_key{chunk_key[0], chunk_key[1]};
165  alternative_table_keys.insert(boost::hash_value(alternative_table_key));
166  } else if (inner_table_key.table_id > 0) {
167  // use this path if chunk_keys is empty
168  alternative_table_keys.insert(inner_table_key.hash());
169  } else {
170  // this can happen when we use synthetic table generated by table function such as
171  // generate_series, i.e., SELECT ... FROM table(generate_series(...)) ...
172  // then we try to manage them via predefined static chunk key
173  // and remove them "all" when necessary
174  alternative_table_keys.insert(DataRecyclerUtil::getUnitaryTableKey());
175  }
176  return alternative_table_keys;
177  }
178 };
179 
180 // Tracks, for one cache item type, 1) per-cached-item metrics (e.g., perfect ht-1,
181 // perfect ht-2, baseline ht-1, ...) and 2) the current cache size per device
182 // (e.g., the perfect-ht cache size, the baseline-ht cache size, ...).
184  public:
// CacheMetricTracker constructor. Its first signature line (185, per the cross-reference
// `CacheMetricTracker(CacheItemType cache_item_type, size_t total_cache_size, ...)`) is
// missing from this listing, as are lines 200 and 202.
// Registers an empty metric vector and a zero cache size for GPU devices num_gpus ... 1
// (and, in the partially visible lines 200-202, presumably the CPU), then logs advisory
// messages when the configured sizes look too small.
186  size_t total_cache_size,
187  size_t max_cache_item_size,
188  int num_gpus = 0)
189  : item_type_(cache_item_type)
190  , total_cache_size_(total_cache_size)
191  , max_cache_item_size_(max_cache_item_size) {
192  // initialize cache metrics for each device: CPU (id 0) and GPU-1 ~ GPU-N (ids 1 ~ N)
193  // Currently we only consider maintaining our cache in CPU-memory
194  for (int gpu_device_identifier = num_gpus; gpu_device_identifier >= 1;
195  --gpu_device_identifier) {
196  cache_metrics_.emplace(gpu_device_identifier,
197  std::vector<std::shared_ptr<CacheItemMetric>>());
198  current_cache_size_in_bytes_.emplace(gpu_device_identifier, 0);
199  }
201  std::vector<std::shared_ptr<CacheItemMetric>>());
203
204  if (total_cache_size_ < 1024 * 1024 * 256) {
205  LOG(INFO) << "The total cache size of " << cache_item_type
206  << " is set too low, so we suggest raising it larger than 256MB";
207  }
208
209  if (max_cache_item_size < 1024 * 1024 * 10) {
210  LOG(INFO)
211  << "The maximum item size of " << cache_item_type
212  << " that can be cached is set too low, we suggest raising it larger than 10MB";
213  }
214  if (max_cache_item_size > total_cache_size_) {
215  LOG(INFO) << "The maximum item size of " << cache_item_type
216  << " is set larger than its total cache size, so we force to set the "
217  "maximum item size as equal to the total cache size";
// NOTE(review): BUG — this assigns to the constructor parameter, not to the member.
// max_cache_item_size_ was already initialized from the parameter (line 191) and the
// parameter is not read afterwards, so the clamp announced by the log message never
// takes effect. It should be `max_cache_item_size_ = total_cache_size_;`.
218  max_cache_item_size = total_cache_size_;
219  }
220  }
221 
222  static inline CacheMetricInfoMap::mapped_type::const_iterator getCacheItemMetricItr(
223  QueryPlanHash key,
224  CacheMetricInfoMap::mapped_type const& metrics) {
225  auto same_hash = [key](auto itr) { return itr->getQueryPlanHash() == key; };
226  return std::find_if(metrics.cbegin(), metrics.cend(), same_hash);
227  }
228 
229  static inline std::shared_ptr<CacheItemMetric> getCacheItemMetricImpl(
230  QueryPlanHash key,
231  CacheMetricInfoMap::mapped_type const& metrics) {
232  auto itr = getCacheItemMetricItr(key, metrics);
233  return itr == metrics.cend() ? nullptr : *itr;
234  }
235 
236  std::vector<std::shared_ptr<CacheItemMetric>>& getCacheItemMetrics(
237  DeviceIdentifier device_identifier) {
238  auto itr = cache_metrics_.find(device_identifier);
239  CHECK(itr != cache_metrics_.end());
240  return itr->second;
241  }
242 
243  std::shared_ptr<CacheItemMetric> getCacheItemMetric(
244  QueryPlanHash key,
245  DeviceIdentifier device_identifier) const {
246  auto itr = cache_metrics_.find(device_identifier);
247  return itr == cache_metrics_.cend() ? nullptr
248  : getCacheItemMetricImpl(key, itr->second);
249  }
250 
251  void setCurrentCacheSize(DeviceIdentifier device_identifier, size_t bytes) {
252  if (bytes > total_cache_size_) {
253  return;
254  }
255  auto itr = current_cache_size_in_bytes_.find(device_identifier);
256  CHECK(itr != current_cache_size_in_bytes_.end());
257  itr->second = bytes;
258  }
259 
260  std::optional<size_t> getCurrentCacheSize(DeviceIdentifier key) const {
261  auto same_hash = [key](auto itr) { return itr.first == key; };
262  auto itr = std::find_if(current_cache_size_in_bytes_.cbegin(),
264  same_hash);
265  return itr == current_cache_size_in_bytes_.cend() ? std::nullopt
266  : std::make_optional(itr->second);
267  }
268 
// Registers (or refreshes) the metric of the item cached under `key` on
// device_identifier and returns it:
//  - if a metric with the same key and the same mem_size exists, its ref count is
//    bumped and that metric is returned;
//  - if the existing metric has a different mem_size, its bytes are released from the
//    device's cache size and it is removed, then a fresh metric is created;
//  - a fresh metric adds mem_size to the device's cache size and starts with one use.
// NOTE(review): line 278 (the head of the call whose arguments are on line 279) is
// missing from this listing; the arguments match updateCurrentCacheSize — confirm.
269  std::shared_ptr<CacheItemMetric> putNewCacheItemMetric(
270  QueryPlanHash key,
271  DeviceIdentifier device_identifier,
272  size_t mem_size,
273  size_t compute_time) {
274  auto itr = cache_metrics_.find(device_identifier);
275  CHECK(itr != cache_metrics_.end());
276  if (auto cached_metric = getCacheItemMetricImpl(key, itr->second)) {
277  if (cached_metric->getMemSize() != mem_size) {
279  device_identifier, CacheUpdateAction::REMOVE, cached_metric->getMemSize());
280  removeCacheItemMetric(key, device_identifier);
281  } else {
282  cached_metric->incRefCount();
283  return cached_metric;
284  }
285  }
286  auto cache_metric = std::make_shared<CacheItemMetric>(key, compute_time, mem_size);
287  updateCurrentCacheSize(device_identifier, CacheUpdateAction::ADD, mem_size);
288  // we add the item to cache after we create it during query runtime
289  // so it is used at least once
290  cache_metric->incRefCount();
// emplace_back returns a reference to the stored element (C++17); it is copied out
291  return itr->second.emplace_back(std::move(cache_metric));
292  }
293 
294  void removeCacheItemMetric(QueryPlanHash key, DeviceIdentifier device_identifier) {
295  auto& cache_metrics = getCacheItemMetrics(device_identifier);
296  auto itr = getCacheItemMetricItr(key, cache_metrics);
297  if (itr != cache_metrics.cend()) {
298  cache_metrics.erase(itr);
299  }
300  }
301 
302  void removeMetricFromBeginning(DeviceIdentifier device_identifier, int offset) {
303  auto metrics = getCacheItemMetrics(device_identifier);
304  metrics.erase(metrics.begin(), metrics.begin() + offset);
305  }
306 
// Body of calculateRequiredSpaceForItemAddition(device_identifier, item_size); its first
// signature line (307) is missing from this listing. Returns how many bytes must be
// freed on the device so that an item of item_size bytes fits in the remaining budget.
// NOTE(review): `total_cache_size_ - current_cache_size` is computed in size_t and then
// narrowed to long, so the `rem < 0` branch relies on wrap-around conversion. Also, if
// rem > item_size, `item_size - rem` wraps to a huge size_t. Presumably this is only
// called when canAddItem reported AVAILABLE_AFTER_CLEANUP (current + item_size > total,
// hence rem < item_size) — confirm with callers.
308  size_t item_size) const {
309  auto it = current_cache_size_in_bytes_.find(device_identifier);
310  CHECK(it != current_cache_size_in_bytes_.end());
311  CHECK_LE(item_size, total_cache_size_);
312  const auto current_cache_size = it->second;
313  long rem = total_cache_size_ - current_cache_size;
314  return rem < 0 ? item_size : item_size - rem;
315  }
316 
// Body of clearCacheMetricTracker(); its signature (line 317) and line 325 are missing
// from this listing. For every device it checks that the recorded cache size ends at 0,
// then clears every device's metric vector.
// NOTE(review): line 319 copies the device's metric vector, and the copy is not used
// in the visible lines. The log prints kv.first (the device id) under "# items" —
// probably cache_item_metrics.size() was intended — and emits "]" twice. Only devices
// with id > 0 (GPUs, per the DeviceIdentifier comment) are logged. Presumably the
// missing line 325 resets the device's size, since line 326 expects it to be 0.
318  for (auto& kv : current_cache_size_in_bytes_) {
319  auto cache_item_metrics = getCacheItemMetrics(kv.first);
320  if (kv.first > 0) {
321  VLOG(1) << "[" << item_type_ << "]"
322  << "] clear cache metrics (# items: " << kv.first << ", " << kv.second
323  << " bytes)";
324  }
326  CHECK_EQ(getCurrentCacheSize(kv.first).value(), 0u);
327  }
328  for (auto& kv : cache_metrics_) {
329  kv.second.clear();
330  }
331  }
332 
// Body of canAddItem(device_identifier, item_size). The signature line (333) and the
// return statements on lines 336, 347 and 351 are missing from this listing. It
// classifies an item of item_size bytes as too large for the per-item or total limit,
// as fitting only after evicting cached items, or as fitting as is. Presumably these
// return UNAVAILABLE, AVAILABLE_AFTER_CLEANUP and AVAILABLE respectively, per the
// CacheAvailability comments — confirm.
334  size_t item_size) const {
335  if (item_size > max_cache_item_size_ || item_size > total_cache_size_) {
337  }
338  // now we know that the cache can hold the new item, since its size does not exceed
339  // the per-item maximum size limit or the total cache size
340  // check whether we need to remove some (or all) of the cached items to make room
341  // for the new item
342  auto current_cache_size = getCurrentCacheSize(device_identifier);
343  CHECK(current_cache_size.has_value());
344  auto cache_size_after_addition = *current_cache_size + item_size;
345  if (cache_size_after_addition > total_cache_size_) {
346  // if so, we need to remove cached items to hold the new one within the cache
348  }
349  // the cache has sufficient space to hold the new item,
350  // so there is no need to remove any cached item
352  }
353 
// Adds `size` bytes to (ADD) or subtracts them from (any other action) the current
// cache size of device_identifier. Line 355 (the `action` parameter) and line 362 are
// missing from this listing.
// NOTE(review): setCurrentCacheSize ignores totals above total_cache_size_, so an ADD
// that would exceed the budget silently leaves the recorded size unchanged.
354  void updateCurrentCacheSize(DeviceIdentifier device_identifier,
356  size_t size) {
357  auto current_cache_size = getCurrentCacheSize(device_identifier);
358  CHECK(current_cache_size.has_value());
359  if (action == CacheUpdateAction::ADD) {
360  setCurrentCacheSize(device_identifier, current_cache_size.value() + size);
361  } else {
362 // (line 362 missing from this listing)
363  CHECK_LE(size, *current_cache_size);
364  setCurrentCacheSize(device_identifier, current_cache_size.value() - size);
365  }
366  }
367 
// Body of sortCacheInfoByQueryMetric(device_identifier); its signature (line 368) is
// missing from this listing. It sorts the device's metrics in ascending lexicographic
// order of getMetrics(), comparing slots in CacheMetricType index order. As a result,
// the items that should be evicted first come first, ready for
// removeMetricFromBeginning.
369  auto& metric_cache = getCacheItemMetrics(device_identifier);
370  std::sort(metric_cache.begin(),
371  metric_cache.end(),
372  [](const std::shared_ptr<CacheItemMetric>& left,
373  const std::shared_ptr<CacheItemMetric>& right) {
374  auto& elem1_metrics = left->getMetrics();
375  auto& elem2_metrics = right->getMetrics();
376  for (size_t i = 0; i < CacheMetricType::NUM_METRIC_TYPE; ++i) {
377  if (elem1_metrics[i] != elem2_metrics[i]) {
378  return elem1_metrics[i] < elem2_metrics[i];
379  }
380  }
381  return false;
382  });
383  }
384 
385  std::string toString() const {
386  std::ostringstream oss;
387  oss << "Current memory consumption of caches for each device:\n";
388  for (auto& kv : current_cache_size_in_bytes_) {
389  oss << "\t\tDevice " << kv.first << " : " << kv.second << " bytes\n";
390  }
391  return oss.str();
392  }
393 
394  size_t getTotalCacheSize() const { return total_cache_size_; }
395  size_t getMaxCacheItemSize() const { return max_cache_item_size_; }
396  void setTotalCacheSize(size_t new_total_cache_size) {
397  if (new_total_cache_size > 0) {
398  total_cache_size_ = new_total_cache_size;
399  }
400  }
401  void setMaxCacheItemSize(size_t new_max_cache_item_size) {
402  if (new_max_cache_item_size > 0) {
403  max_cache_item_size_ = new_max_cache_item_size;
404  }
405  }
406 
407  private:
// NOTE(review): the member declarations on lines 408-410, 415 and 418 (item_type_,
// total_cache_size_, max_cache_item_size_, cache_metrics_ and
// current_cache_size_in_bytes_, per the cross-references) are missing from this listing.
411  // metadata of the cached items that belong to the cache of a specific device:
412  // 1) ref_count: how many times the cached item has been recycled
413  // 2) memory_usage: the size of the cached item in bytes
414  // 3) compute_time: the elapsed time needed to generate the cached item
416
417  // the total amount of currently cached data, in bytes, per device
419 };
420 
// One cache entry: the cached item, its metric, optional meta info and a dirty flag.
// NOTE(review): line 423 (the constructor head, `CachedItem(QueryPlanHash hashed_plan,`
// per the cross-reference) and line 436 (`QueryPlanHash key;`) are missing from this
// listing.
421 template <typename CACHED_ITEM_TYPE, typename META_INFO_TYPE>
422 struct CachedItem {
424  CACHED_ITEM_TYPE item,
425  std::shared_ptr<CacheItemMetric> item_metric_ptr,
426  std::optional<META_INFO_TYPE> metadata = std::nullopt)
427  : key(hashed_plan)
428  , cached_item(item)
429  , item_metric(item_metric_ptr)
430  , meta_info(metadata)
431  , dirty(false) {}
432
// A dirty entry is stale: getCachedItemWithoutConsideringMetaInfo evicts it on lookup
// instead of returning it.
433  void setDirty() { dirty = true; }
434  bool isDirty() const { return dirty; }
435
437  CACHED_ITEM_TYPE cached_item;
// shared with the metric tracker's per-device metric vector
438  std::shared_ptr<CacheItemMetric> item_metric;
439  std::optional<META_INFO_TYPE> meta_info;
440  bool dirty;
441 };
442 
443 // The main (abstract) base class of the data recyclers.
444 // Note that tests which directly access APIs that update/modify/delete
445 // (meta)data may need to disable the data recycler explicitly before running,
446 // so that the test scenarios behave as expected,
447 // e.g., UpdelStorageTest, which calls the fragmenter's updateColumn API.
// NOTE(review): the class head (line 449) and the alias heads on lines 452
// (PerDeviceCacheItemContainer) and 454 (PerTypeCacheItemContainer) are missing from
// this listing; the names come from the cross-references.
448 template <typename CACHED_ITEM_TYPE, typename META_INFO_TYPE>
450  public:
// cached entries of one (item type, device) pair
451  using CachedItemContainer = std::vector<CachedItem<CACHED_ITEM_TYPE, META_INFO_TYPE>>;
// PerDeviceCacheItemContainer: device -> container of cached entries
453  std::unordered_map<DeviceIdentifier, std::shared_ptr<CachedItemContainer>>;
// PerTypeCacheItemContainer: item type -> per-device containers
455  std::unordered_map<CacheItemType, std::shared_ptr<PerDeviceCacheItemContainer>>;
// item type -> metric tracker for that type
456  using PerTypeCacheMetricTracker = std::unordered_map<CacheItemType, CacheMetricTracker>;
457 
458  DataRecycler(const std::vector<CacheItemType>& item_types,
459  size_t total_cache_size,
460  size_t max_item_size,
461  int num_gpus) {
462  for (auto& item_type : item_types) {
463  cache_item_types_.insert(item_type);
464  metric_tracker_.emplace(
465  item_type,
466  CacheMetricTracker(item_type, total_cache_size, max_item_size, num_gpus));
467  auto item_container = std::make_shared<PerDeviceCacheItemContainer>();
468  for (int gpu_device_identifier = num_gpus; gpu_device_identifier >= 1;
469  --gpu_device_identifier) {
470  item_container->emplace(gpu_device_identifier,
471  std::make_shared<CachedItemContainer>());
472  }
473  item_container->emplace(DataRecyclerUtil::CPU_DEVICE_IDENTIFIER,
474  std::make_shared<CachedItemContainer>());
475  cached_items_container_.emplace(item_type, item_container);
476  }
477  }
478 
// Virtual destructor: concrete recyclers are presumably owned and destroyed through
// base pointers.
479  virtual ~DataRecycler() = default;
480
// Public recycler API, implemented by the derived recyclers (not visible here).
// NOTE(review): default arguments on virtual functions are bound statically, so a
// call through a derived type uses the derived declaration's defaults.
481  virtual CACHED_ITEM_TYPE getItemFromCache(
482  QueryPlanHash key,
483  CacheItemType item_type,
484  DeviceIdentifier device_identifier,
485  std::optional<META_INFO_TYPE> meta_info = std::nullopt) = 0;
486
// item_size / compute_time match the inputs of CacheMetricTracker::putNewCacheItemMetric;
// presumably implementations forward them there — confirm in the derived classes.
487  virtual void putItemToCache(QueryPlanHash key,
488  CACHED_ITEM_TYPE item_ptr,
489  CacheItemType item_type,
490  DeviceIdentifier device_identifier,
491  size_t item_size,
492  size_t compute_time,
493  std::optional<META_INFO_TYPE> meta_info = std::nullopt) = 0;
494
495  virtual void initCache() = 0;
496
497  virtual void clearCache() = 0;
498
499  virtual void markCachedItemAsDirty(size_t table_key,
500  std::unordered_set<QueryPlanHash>& key_set,
501  CacheItemType item_type,
502  DeviceIdentifier device_identifier) = 0;
503 
// Body of markCachedItemAsDirtyImpl(key, m); its signature (line 504) is missing from
// this listing. It flags the first entry of m whose key matches as dirty and is a
// no-op if no entry matches.
505  auto candidate_it = std::find_if(
506  m.begin(),
507  m.end(),
508  [&key](const CachedItem<CACHED_ITEM_TYPE, META_INFO_TYPE>& cached_item) {
509  return cached_item.key == key;
510  });
511  if (candidate_it != m.end()) {
512  candidate_it->setDirty();
513  }
514  }
515 
// Body of isCachedItemDirty(key, m); its signature (line 516) is missing from this
// listing. Returns true iff m contains an entry with `key` and the first such entry is
// dirty.
517  auto candidate_it = std::find_if(
518  m.begin(),
519  m.end(),
520  [&key](const CachedItem<CACHED_ITEM_TYPE, META_INFO_TYPE>& cached_item) {
521  return cached_item.key == key;
522  });
523  return candidate_it != m.end() && candidate_it->isDirty();
524  }
525 
// Human-readable summary of the recycler; format defined by each derived recycler.
526  virtual std::string toString() const = 0;
527 
528  std::shared_ptr<CachedItemContainer> getCachedItemContainer(
529  CacheItemType item_type,
530  DeviceIdentifier device_identifier) const {
531  auto item_type_container_itr = cached_items_container_.find(item_type);
532  if (item_type_container_itr != cached_items_container_.end()) {
533  auto device_type_container_itr =
534  item_type_container_itr->second->find(device_identifier);
535  return device_type_container_itr != item_type_container_itr->second->end()
536  ? device_type_container_itr->second
537  : nullptr;
538  }
539  return nullptr;
540  }
541 
// Looks up `key` in m, ignoring meta info. A clean hit is returned by copy. A dirty hit
// is evicted and reported as a miss (std::nullopt), and so is a missing key.
// The caller must hold cache_lock_ (the `lock` parameter).
// NOTE(review): lines 543 and 546 (QueryPlanHash key, CachedItemContainer& m, per the
// cross-reference) and 556 are missing from this listing; the arguments on line 557
// match removeItemFromCache — confirm.
542  std::optional<CachedItem<CACHED_ITEM_TYPE, META_INFO_TYPE>>
544  CacheItemType item_type,
545  DeviceIdentifier device_identifier,
547  std::lock_guard<std::mutex>& lock) {
548  auto candidate_it = std::find_if(
549  m.begin(),
550  m.end(),
551  [&key](const CachedItem<CACHED_ITEM_TYPE, META_INFO_TYPE>& cached_item) {
552  return cached_item.key == key;
553  });
554  if (candidate_it != m.end()) {
555  if (candidate_it->isDirty()) {
557  key, item_type, device_identifier, lock, candidate_it->meta_info);
558  return std::nullopt;
559  }
560  return *candidate_it;
561  }
562  return std::nullopt;
563  }
564 
// Body of getCurrentNumCachedItems(item_type, device_identifier); its first signature
// line (565) is missing from this listing. Under cache_lock_, returns the number of
// entries (dirty or clean) in the container, or 0 if the container does not exist.
566  DeviceIdentifier device_identifier) const {
567  std::lock_guard<std::mutex> lock(cache_lock_);
568  auto container = getCachedItemContainer(item_type, device_identifier);
569  return container ? container->size() : 0;
570  }
571 
// Body of getCurrentNumDirtyCachedItems(item_type, device_identifier); its first
// signature line (572) is missing from this listing. Under cache_lock_, counts the
// dirty entries of the container.
// NOTE(review): unlike getCurrentNumCachedItems, this dereferences `container` without
// a null check; getCachedItemContainer returns nullptr for an unknown type or device.
573  DeviceIdentifier device_identifier) const {
574  std::lock_guard<std::mutex> lock(cache_lock_);
575  auto container = getCachedItemContainer(item_type, device_identifier);
576  return std::count_if(container->begin(),
577  container->end(),
578  [](const auto& cached_item) { return cached_item.isDirty(); });
579  }
580 
// Body of getCurrentNumCleanCachedItems(item_type, device_identifier); its first
// signature line (581) is missing from this listing. Under cache_lock_, counts the
// non-dirty entries of the container.
// NOTE(review): `container` is dereferenced without a null check; getCachedItemContainer
// returns nullptr for an unknown type or device.
582  DeviceIdentifier device_identifier) const {
583  std::lock_guard<std::mutex> lock(cache_lock_);
584  auto container = getCachedItemContainer(item_type, device_identifier);
585  return std::count_if(container->begin(),
586  container->end(),
587  [](const auto& cached_item) { return !cached_item.isDirty(); });
588  }
589 
// Body of getCurrentCacheSizeForDevice(item_type, device_identifier); its first
// signature line (590) is missing from this listing. Under cache_lock_, returns the
// bytes recorded for the device, or 0 if the tracker does not know the device.
// NOTE(review): `auto metric_tracker = ...` deep-copies the whole CacheMetricTracker
// (all per-device metric vectors and sizes) while holding the lock; `const auto&` would
// avoid the copy.
591  DeviceIdentifier device_identifier) const {
592  std::lock_guard<std::mutex> lock(cache_lock_);
593  auto metric_tracker = getMetricTracker(item_type);
594  auto current_size_opt = metric_tracker.getCurrentCacheSize(device_identifier);
595  return current_size_opt ? current_size_opt.value() : 0;
596  }
597 
598  std::shared_ptr<CacheItemMetric> getCachedItemMetric(CacheItemType item_type,
599  DeviceIdentifier device_identifier,
600  QueryPlanHash key) const {
601  std::lock_guard<std::mutex> lock(cache_lock_);
602  auto cache_metric_tracker = getMetricTracker(item_type);
603  return cache_metric_tracker.getCacheItemMetric(key, device_identifier);
604  }
605 
606  void setTotalCacheSize(CacheItemType item_type, size_t new_total_cache_size) {
607  if (new_total_cache_size > 0) {
608  std::lock_guard<std::mutex> lock(cache_lock_);
609  getMetricTracker(item_type).setTotalCacheSize(new_total_cache_size);
610  }
611  }
612 
613  void setMaxCacheItemSize(CacheItemType item_type, size_t new_max_cache_item_size) {
614  if (new_max_cache_item_size > 0) {
615  std::lock_guard<std::mutex> lock(cache_lock_);
616  getMetricTracker(item_type).setMaxCacheItemSize(new_max_cache_item_size);
617  }
618  }
619 
620  protected:
// Body of removeCachedItemFromBeginning(item_type, device_identifier, offset); its first
// signature line (621) is missing from this listing.
// NOTE(review): offset is not range-checked; a negative offset or one larger than the
// container size makes the erase range invalid (undefined behavior).
622  DeviceIdentifier device_identifier,
623  int offset) {
624  // removes the first `offset` cached items (indices [0, offset)),
625  // so call this function after sorting the cached item container,
626  // and call it under the proper locking scheme
627  auto container = getCachedItemContainer(item_type, device_identifier);
628  CHECK(container);
629  container->erase(container->begin(), container->begin() + offset);
630  }
631 
// Body of sortCacheContainerByQueryMetric(item_type, device_identifier). The first
// signature line (632) and the lambda's parameter lines (639-640) are missing from this
// listing. It sorts the cached entries ascending by their item_metric's getMetrics(),
// compared lexicographically in CacheMetricType index order, i.e. the same ordering
// CacheMetricTracker::sortCacheInfoByQueryMetric uses.
633  DeviceIdentifier device_identifier) {
634  // should call this function under the proper locking scheme
635  auto container = getCachedItemContainer(item_type, device_identifier);
636  CHECK(container);
637  std::sort(container->begin(),
638  container->end(),
641  auto& left_metrics = left.item_metric->getMetrics();
642  auto& right_metrics = right.item_metric->getMetrics();
643  for (size_t i = 0; i < CacheMetricType::NUM_METRIC_TYPE; ++i) {
644  if (left_metrics[i] != right_metrics[i]) {
645  return left_metrics[i] < right_metrics[i];
646  }
647  }
648  return false;
649  });
650  }
651 
652  std::mutex& getCacheLock() const { return cache_lock_; }
653 
// Body of the non-const getMetricTracker(item_type); its signature (line 654) is missing
// from this listing. Returns the tracker registered for item_type and CHECK-fails if
// item_type was not passed to the constructor.
655  auto metric_iter = metric_tracker_.find(item_type);
656  CHECK(metric_iter != metric_tracker_.end());
657  return metric_iter->second;
658  }
659 
// Body of the const getMetricTracker(item_type); its signature (line 660) is missing
// from this listing. It delegates to the non-const overload through const_cast. That
// overload only reads (find + CHECK), and the result is returned as a const reference.
661  return const_cast<DataRecycler*>(this)->getMetricTracker(item_type);
662  }
663 
664  std::unordered_set<CacheItemType> const& getCacheItemType() const {
665  return cache_item_types_;
666  }
667 
// Closing brace of getItemCache() (declared `PerTypeCacheItemContainer const&
// getItemCache() const` per the cross-reference); lines 668-669 are missing from this
// listing.
670  }
671 
672  private:
// Hooks implemented by the derived recyclers. Each takes the caller's
// std::lock_guard<std::mutex>& as evidence that cache_lock_ is already held.
// NOTE(review): default arguments on virtual functions are bound statically.
673  // internally called under the proper locking scheme
674  virtual bool hasItemInCache(
675  QueryPlanHash key,
676  CacheItemType item_type,
677  DeviceIdentifier device_identifier,
678  std::lock_guard<std::mutex>& lock,
679  std::optional<META_INFO_TYPE> meta_info = std::nullopt) const = 0;
680
681  // internally called under the proper locking scheme
682  virtual void removeItemFromCache(
683  QueryPlanHash key,
684  CacheItemType item_type,
685  DeviceIdentifier device_identifier,
686  std::lock_guard<std::mutex>& lock,
687  std::optional<META_INFO_TYPE> meta_info = std::nullopt) = 0;
688
689  // internally called under the proper locking scheme;
// required_size presumably mirrors calculateRequiredSpaceForItemAddition — confirm
690  virtual void cleanupCacheForInsertion(
691  CacheItemType item_type,
692  DeviceIdentifier device_identifier,
693  size_t required_size,
694  std::lock_guard<std::mutex>& lock,
695  std::optional<META_INFO_TYPE> meta_info = std::nullopt) = 0;
696 
697  // the set of cache item types this recycler supports
698  std::unordered_set<CacheItemType> cache_item_types_;
699
700  // one cache metric tracker per cache item type
// NOTE(review): the declarations on lines 701 (metric_tracker_) and 704
// (cached_items_container_) are missing from this listing.
702
703  // per-device cached item containers for each cached item type
705
// guards the containers and trackers; mutable so const accessors can lock it
706  mutable std::mutex cache_lock_;
707 };
Defines data structures for the semantic analysis phase of query processing.
CACHED_ITEM_TYPE cached_item
Definition: DataRecycler.h:437
std::mutex & getCacheLock() const
Definition: DataRecycler.h:652
std::unordered_map< CacheItemType, std::shared_ptr< PerDeviceCacheItemContainer >> PerTypeCacheItemContainer
Definition: DataRecycler.h:455
#define CHECK_EQ(x, y)
Definition: Logger.h:301
CacheUpdateAction
Definition: DataRecycler.h:77
size_t DeviceIdentifier
Definition: DataRecycler.h:129
static std::string getDeviceIdentifierString(DeviceIdentifier device_identifier)
Definition: DataRecycler.h:138
virtual std::string toString() const =0
size_t calculateRequiredSpaceForItemAddition(DeviceIdentifier device_identifier, size_t item_size) const
Definition: DataRecycler.h:307
std::shared_ptr< CacheItemMetric > putNewCacheItemMetric(QueryPlanHash key, DeviceIdentifier device_identifier, size_t mem_size, size_t compute_time)
Definition: DataRecycler.h:269
std::optional< CachedItem< CACHED_ITEM_TYPE, META_INFO_TYPE > > getCachedItemWithoutConsideringMetaInfo(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, CachedItemContainer &m, std::lock_guard< std::mutex > &lock)
Definition: DataRecycler.h:543
size_t getCurrentNumCachedItems(CacheItemType item_type, DeviceIdentifier device_identifier) const
Definition: DataRecycler.h:565
CacheMetricTracker & getMetricTracker(CacheItemType item_type)
Definition: DataRecycler.h:654
CacheItemType item_type_
Definition: DataRecycler.h:408
std::unordered_map< DeviceIdentifier, size_t > CacheSizeMap
Definition: DataRecycler.h:130
DataRecycler(const std::vector< CacheItemType > &item_types, size_t total_cache_size, size_t max_item_size, int num_gpus)
Definition: DataRecycler.h:458
bool isDirty() const
Definition: DataRecycler.h:434
#define LOG(tag)
Definition: Logger.h:285
std::vector< CachedItem< std::optional< HashType >, EMPTY_META_INFO >> CachedItemContainer
Definition: DataRecycler.h:451
std::ostream & operator<<(std::ostream &os, const SessionInfo &session_info)
Definition: SessionInfo.cpp:57
CacheMetricTracker const & getMetricTracker(CacheItemType item_type) const
Definition: DataRecycler.h:660
void setDirty()
Definition: DataRecycler.h:433
CacheItemMetric(QueryPlanHash query_plan_hash, size_t compute_time, size_t mem_size)
Definition: DataRecycler.h:89
std::unordered_set< CacheItemType > cache_item_types_
Definition: DataRecycler.h:698
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
QueryPlanHash key
Definition: DataRecycler.h:436
std::optional< size_t > getCurrentCacheSize(DeviceIdentifier key) const
Definition: DataRecycler.h:260
size_t getCurrentNumCleanCachedItems(CacheItemType item_type, DeviceIdentifier device_identifier) const
Definition: DataRecycler.h:581
std::shared_ptr< CachedItemContainer > getCachedItemContainer(CacheItemType item_type, DeviceIdentifier device_identifier) const
Definition: DataRecycler.h:528
void setMaxCacheItemSize(size_t new_max_cache_item_size)
Definition: DataRecycler.h:401
void markCachedItemAsDirtyImpl(QueryPlanHash key, CachedItemContainer &m) const
Definition: DataRecycler.h:504
void setMaxCacheItemSize(CacheItemType item_type, size_t new_max_cache_item_size)
Definition: DataRecycler.h:613
CacheAvailability canAddItem(DeviceIdentifier device_identifier, size_t item_size) const
Definition: DataRecycler.h:333
void clearCacheMetricTracker()
Definition: DataRecycler.h:317
std::string to_string(char const *&&v)
PerTypeCacheItemContainer const & getItemCache() const
Definition: DataRecycler.h:668
CacheAvailability
Definition: DataRecycler.h:71
PerTypeCacheMetricTracker metric_tracker_
Definition: DataRecycler.h:701
size_t getCurrentCacheSizeForDevice(CacheItemType item_type, DeviceIdentifier device_identifier) const
Definition: DataRecycler.h:590
std::mutex cache_lock_
Definition: DataRecycler.h:706
void setCurrentCacheSize(DeviceIdentifier device_identifier, size_t bytes)
Definition: DataRecycler.h:251
size_t getMaxCacheItemSize() const
Definition: DataRecycler.h:395
std::vector< std::shared_ptr< CacheItemMetric > > & getCacheItemMetrics(DeviceIdentifier device_identifier)
Definition: DataRecycler.h:236
void removeCacheItemMetric(QueryPlanHash key, DeviceIdentifier device_identifier)
Definition: DataRecycler.h:294
CacheItemType
Definition: DataRecycler.h:38
std::optional< META_INFO_TYPE > meta_info
Definition: DataRecycler.h:439
virtual void initCache()=0
size_t getTotalCacheSize() const
Definition: DataRecycler.h:394
bool isCachedItemDirty(QueryPlanHash key, CachedItemContainer &m) const
Definition: DataRecycler.h:516
std::shared_ptr< CacheItemMetric > getCacheItemMetric(QueryPlanHash key, DeviceIdentifier device_identifier) const
Definition: DataRecycler.h:243
std::string toString() const
Definition: DataRecycler.h:385
PerTypeCacheItemContainer cached_items_container_
Definition: DataRecycler.h:704
size_t max_cache_item_size_
Definition: DataRecycler.h:410
CacheSizeMap current_cache_size_in_bytes_
Definition: DataRecycler.h:418
void updateCurrentCacheSize(DeviceIdentifier device_identifier, CacheUpdateAction action, size_t size)
Definition: DataRecycler.h:354
virtual void putItemToCache(QueryPlanHash key, CACHED_ITEM_TYPE item_ptr, CacheItemType item_type, DeviceIdentifier device_identifier, size_t item_size, size_t compute_time, std::optional< META_INFO_TYPE > meta_info=std::nullopt)=0
std::unordered_map< CacheItemType, CacheMetricTracker > PerTypeCacheMetricTracker
Definition: DataRecycler.h:456
std::shared_ptr< CacheItemMetric > getCachedItemMetric(CacheItemType item_type, DeviceIdentifier device_identifier, QueryPlanHash key) const
Definition: DataRecycler.h:598
static QueryPlanHash getUnitaryTableKey()
Definition: DataRecycler.h:145
static std::unordered_set< size_t > getAlternativeTableKeys(const std::vector< ChunkKey > &chunk_keys, const shared::TableKey &inner_table_key)
Definition: DataRecycler.h:154
std::string toString(const Executor::ExtModuleKinds &kind)
Definition: Execute.h:1703
void setTotalCacheSize(size_t new_total_cache_size)
Definition: DataRecycler.h:396
void setTotalCacheSize(CacheItemType item_type, size_t new_total_cache_size)
Definition: DataRecycler.h:606
static std::shared_ptr< CacheItemMetric > getCacheItemMetricImpl(QueryPlanHash key, CacheMetricInfoMap::mapped_type const &metrics)
Definition: DataRecycler.h:229
size_t getCurrentNumDirtyCachedItems(CacheItemType item_type, DeviceIdentifier device_identifier) const
Definition: DataRecycler.h:572
std::unordered_map< DeviceIdentifier, std::shared_ptr< CachedItemContainer >> PerDeviceCacheItemContainer
Definition: DataRecycler.h:453
CacheMetricType
Definition: DataRecycler.h:84
void sortCacheContainerByQueryMetric(CacheItemType item_type, DeviceIdentifier device_identifier)
Definition: DataRecycler.h:632
#define CHECK_LE(x, y)
Definition: Logger.h:304
void sortCacheInfoByQueryMetric(DeviceIdentifier device_identifier)
Definition: DataRecycler.h:368
std::unordered_set< CacheItemType > const & getCacheItemType() const
Definition: DataRecycler.h:664
CacheMetricTracker(CacheItemType cache_item_type, size_t total_cache_size, size_t max_cache_item_size, int num_gpus=0)
Definition: DataRecycler.h:185
virtual ~DataRecycler()=default
size_t QueryPlanHash
std::unordered_map< DeviceIdentifier, std::vector< std::shared_ptr< CacheItemMetric >>> CacheMetricInfoMap
Definition: DataRecycler.h:132
static CacheMetricInfoMap::mapped_type::const_iterator getCacheItemMetricItr(QueryPlanHash key, CacheMetricInfoMap::mapped_type const &metrics)
Definition: DataRecycler.h:222
std::size_t hash_value(RexAbstractInput const &rex_ab_input)
Definition: RelAlgDag.cpp:3548
CachedItem(QueryPlanHash hashed_plan, CACHED_ITEM_TYPE item, std::shared_ptr< CacheItemMetric > item_metric_ptr, std::optional< META_INFO_TYPE > metadata=std::nullopt)
Definition: DataRecycler.h:423
virtual bool hasItemInCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::lock_guard< std::mutex > &lock, std::optional< META_INFO_TYPE > meta_info=std::nullopt) const =0
bool g_enable_watchdog false
Definition: Execute.cpp:80
CacheMetricInfoMap cache_metrics_
Definition: DataRecycler.h:415
#define CHECK(condition)
Definition: Logger.h:291
virtual void clearCache()=0
virtual void removeItemFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::lock_guard< std::mutex > &lock, std::optional< META_INFO_TYPE > meta_info=std::nullopt)=0
Basic constructors and methods of the row set interface.
virtual CACHED_ITEM_TYPE getItemFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier, std::optional< META_INFO_TYPE > meta_info=std::nullopt)=0
virtual void markCachedItemAsDirty(size_t table_key, std::unordered_set< QueryPlanHash > &key_set, CacheItemType item_type, DeviceIdentifier device_identifier)=0
void removeMetricFromBeginning(DeviceIdentifier device_identifier, int offset)
Definition: DataRecycler.h:302
Execution unit for relational algebra. It's a low-level description of any relational algebra operati...
static constexpr DeviceIdentifier CPU_DEVICE_IDENTIFIER
Definition: DataRecycler.h:136
void removeCachedItemFromBeginning(CacheItemType item_type, DeviceIdentifier device_identifier, int offset)
Definition: DataRecycler.h:621
virtual void cleanupCacheForInsertion(CacheItemType item_type, DeviceIdentifier device_identifier, size_t required_size, std::lock_guard< std::mutex > &lock, std::optional< META_INFO_TYPE > meta_info=std::nullopt)=0
std::shared_ptr< CacheItemMetric > item_metric
Definition: DataRecycler.h:438
#define VLOG(n)
Definition: Logger.h:388
std::array< size_t, CacheMetricType::NUM_METRIC_TYPE > metrics_
Definition: DataRecycler.h:90
size_t hash() const