/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <algorithm>
#include <iomanip>
#include <limits>

#include "DataMgr/BufferMgr/Buffer.h"
#include "DataMgr/BufferMgr/BufferMgr.h"
#include "Logger/Logger.h"
#include "Shared/measure.h"

using namespace std;

namespace Buffer_Namespace {

std::string BufferMgr::keyToString(const ChunkKey& key) {
  std::ostringstream oss;

  oss << " key: ";
  for (auto sub_key : key) {
    oss << sub_key << ",";
  }
  return oss.str();
}

/// Constructor
BufferMgr::BufferMgr(const int device_id,
                     const size_t max_buffer_pool_size,
                     const size_t min_slab_size,
                     const size_t max_slab_size,
                     const size_t default_slab_size,
                     const size_t page_size,
                     AbstractBufferMgr* parent_mgr)
    : AbstractBufferMgr(device_id)
    , max_buffer_pool_size_(max_buffer_pool_size)
    , min_slab_size_(min_slab_size)
    , max_slab_size_(max_slab_size)
    , default_slab_size_(default_slab_size)
    , page_size_(page_size)
    , num_pages_allocated_(0)
    , allocations_capped_(false)
    , parent_mgr_(parent_mgr)
    , max_buffer_id_(0)
    , buffer_epoch_(0) {
  CHECK_GT(max_buffer_pool_size_, size_t(0));
  CHECK_GT(page_size_, size_t(0));
  // TODO: change checks on run-time configurable slab size variables to exceptions
  CHECK_GT(min_slab_size_, size_t(0));
  CHECK_GT(max_slab_size_, size_t(0));
  CHECK_GT(default_slab_size_, size_t(0));
  CHECK_LE(min_slab_size_, max_slab_size_);
  CHECK_GE(default_slab_size_, min_slab_size_);
  CHECK_LE(default_slab_size_, max_slab_size_);
  CHECK_EQ(min_slab_size_ % page_size_, size_t(0));
  CHECK_EQ(max_slab_size_ % page_size_, size_t(0));
  CHECK_EQ(default_slab_size_ % page_size_, size_t(0));

  max_num_pages_ = max_buffer_pool_size_ / page_size_;
  max_num_pages_per_slab_ = max_slab_size_ / page_size_;
  min_num_pages_per_slab_ = min_slab_size_ / page_size_;
  default_num_pages_per_slab_ = default_slab_size_ / page_size_;
  current_max_num_pages_per_slab_ =
      max_num_pages_per_slab_;  // current_max_num_pages_per_slab_ will drop as
                                // allocations fail - this is the high water mark
}
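
// Illustrative arithmetic (not from the original source): with a 4 GiB pool,
// page_size_ = 512 bytes, and max_slab_size_ = default_slab_size_ = 2 GiB,
// the checks above pass and the derived page counts are:
//   max_num_pages_              = 4294967296 / 512 = 8388608
//   max_num_pages_per_slab_     = 2147483648 / 512 = 4194304
//   default_num_pages_per_slab_ = 2147483648 / 512 = 4194304
// so the pool can hold at most two full-size slabs.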

/// Destructor
BufferMgr::~BufferMgr() {
  clear();
}

void BufferMgr::reinit() {
  num_pages_allocated_ = 0;
  current_max_num_pages_per_slab_ =
      max_num_pages_per_slab_;  // current_max_num_pages_per_slab_ will drop as
                                // allocations fail - this is the high water mark
  allocations_capped_ = false;
}

void BufferMgr::clear() {
  std::lock_guard<std::mutex> sized_segs_lock(sized_segs_mutex_);
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  std::lock_guard<std::mutex> unsized_segs_lock(unsized_segs_mutex_);

  for (auto& buf : chunk_index_) {
    delete buf.second->buffer;
  }

  chunk_index_.clear();
  slabs_.clear();
  slab_segments_.clear();
  unsized_segs_.clear();
  buffer_epoch_ = 0;
}

/// Throws a runtime_error if the Chunk already exists
AbstractBuffer* BufferMgr::createBuffer(const ChunkKey& chunk_key,
                                        const size_t chunk_page_size,
                                        const size_t initial_size) {
  // LOG(INFO) << printMap();
  size_t actual_chunk_page_size = chunk_page_size;
  if (actual_chunk_page_size == 0) {
    actual_chunk_page_size = page_size_;
  }

  // chunk_page_size is just for recording dirty pages
  {
    std::lock_guard<std::mutex> lock(chunk_index_mutex_);
    CHECK(chunk_index_.find(chunk_key) == chunk_index_.end());
    BufferSeg buffer_seg(BufferSeg(-1, 0, USED));
    buffer_seg.chunk_key = chunk_key;
    std::lock_guard<std::mutex> unsized_segs_lock(unsized_segs_mutex_);
    unsized_segs_.push_back(buffer_seg);  // race condition?
    chunk_index_[chunk_key] =
        std::prev(unsized_segs_.end(),
                  1);  // need to do this before allocating the Buffer, because doing so
                       // could change the segment used
  }
  // The following should be safe outside the lock because the first thing the Buffer
  // constructor does is pin (and it's still in unsized segs at this point, so it can't
  // be evicted).
  try {
    allocateBuffer(chunk_index_[chunk_key], actual_chunk_page_size, initial_size);
  } catch (const OutOfMemory&) {
    auto buffer_it = chunk_index_.find(chunk_key);
    CHECK(buffer_it != chunk_index_.end());
    buffer_it->second->buffer =
        nullptr;  // the constructor failed for the buffer object, so make sure to mark
                  // it null so deleteBuffer doesn't try to delete it
    deleteBuffer(chunk_key);
    throw;
  }
  CHECK(initial_size == 0 || chunk_index_[chunk_key]->buffer->getMemoryPtr());
  // chunk_index_[chunk_key]->buffer->pin();
  std::lock_guard<std::mutex> lock(chunk_index_mutex_);
  return chunk_index_[chunk_key]->buffer;
}
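
// Flow note (illustrative, not from the original source): a brand-new chunk
// first lands in unsized_segs_ as a placeholder segment with slab_num == -1.
// The device-specific allocateBuffer override constructs the Buffer, which
// pins itself and reserves real slab pages via reserveBuffer/findFreeBuffer;
// the placeholder is then erased through removeSegment's slab_num < 0 branch
// once the segment has moved into slab_segments_.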

BufferList::iterator BufferMgr::evict(BufferList::iterator& evict_start,
                                      const size_t num_pages_requested,
                                      const int slab_num) {
  // We can assume here that the buffer for evict_start either doesn't exist
  // (evict_start is the first buffer) or was not free, so we don't need to
  // merge it.
  auto evict_it = evict_start;
  size_t num_pages = 0;
  size_t start_page = evict_start->start_page;
  while (num_pages < num_pages_requested) {
    if (evict_it->mem_status == USED) {
      CHECK(evict_it->buffer->getPinCount() < 1);
    }
    num_pages += evict_it->num_pages;
    if (evict_it->mem_status == USED && evict_it->chunk_key.size() > 0) {
      chunk_index_.erase(evict_it->chunk_key);
    }
    if (evict_it->buffer != nullptr) {
      // If we don't delete buffers here, we lose the reference to them later
      // and leak the memory.
      delete evict_it->buffer;
    }
    evict_it = slab_segments_[slab_num].erase(
        evict_it);  // erase returns the next iterator - safe if we ever move
                    // to a vector (as opposed to erase(evict_it++))
  }
  BufferSeg data_seg(
      start_page, num_pages_requested, USED, buffer_epoch_++);  // until we can
  // data_seg.pinCount++;
  data_seg.slab_num = slab_num;
  auto data_seg_it =
      slab_segments_[slab_num].insert(evict_it, data_seg);  // will insert before evict_it
  if (num_pages_requested < num_pages) {
    size_t excess_pages = num_pages - num_pages_requested;
    if (evict_it != slab_segments_[slab_num].end() &&
        evict_it->mem_status == FREE) {  // need to merge with current page
      evict_it->start_page = start_page + num_pages_requested;
      evict_it->num_pages += excess_pages;
    } else {  // need to insert a free seg before evict_it for excess_pages
      BufferSeg free_seg(start_page + num_pages_requested, excess_pages, FREE);
      slab_segments_[slab_num].insert(evict_it, free_seg);
    }
  }
  return data_seg_it;
}
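
// Worked example (illustrative, not from the original source): suppose a slab
// holds segments [USED 10 pages][USED 6 pages][FREE 4 pages] and 12 pages are
// requested starting at the first segment. The loop consumes the two USED
// segments (16 pages total), a USED segment of 12 pages is inserted in their
// place, and the 4 excess pages are merged into the following FREE segment,
// which grows to 8 pages.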

BufferList::iterator BufferMgr::reserveBuffer(
    BufferList::iterator& seg_it,
    const size_t num_bytes) {  // assumes buffer is already pinned

  size_t num_pages_requested = (num_bytes + page_size_ - 1) / page_size_;
  size_t num_pages_extra_needed = num_pages_requested - seg_it->num_pages;

  if (num_pages_requested < seg_it->num_pages) {
    // We already have enough pages in the existing segment
    return seg_it;
  }
  // First check for a free segment after seg_it
  int slab_num = seg_it->slab_num;
  if (slab_num >= 0) {  // not a dummy page
    BufferList::iterator next_it = std::next(seg_it);
    if (next_it != slab_segments_[slab_num].end() && next_it->mem_status == FREE &&
        next_it->num_pages >= num_pages_extra_needed) {
      // Then we can just use the next BufferSeg, which happens to be free
      size_t leftover_pages = next_it->num_pages - num_pages_extra_needed;
      seg_it->num_pages = num_pages_requested;
      next_it->num_pages = leftover_pages;
      next_it->start_page = seg_it->start_page + seg_it->num_pages;
      return seg_it;
    }
  }
  // If we're here then we couldn't keep the buffer in its existing slot;
  // we need to find a new segment, copy the data over, and then delete the old one
  auto new_seg_it = findFreeBuffer(num_bytes);

  // Below should be in copy constructor for BufferSeg?
  new_seg_it->buffer = seg_it->buffer;
  new_seg_it->chunk_key = seg_it->chunk_key;
  int8_t* old_mem = new_seg_it->buffer->mem_;
  new_seg_it->buffer->mem_ =
      slabs_[new_seg_it->slab_num] + new_seg_it->start_page * page_size_;

  // Now we need to copy over the memory. Only do this if the old segment is
  // valid (i.e. not a new segment with an unallocated buffer).
  if (seg_it->start_page >= 0 && seg_it->buffer->mem_ != 0) {
    new_seg_it->buffer->writeData(old_mem,
                                  new_seg_it->buffer->size(),
                                  0,
                                  new_seg_it->buffer->getType(),
                                  device_id_);
  }
  // Decrement pin count to reverse effect above
  removeSegment(seg_it);
  {
    std::lock_guard<std::mutex> lock(chunk_index_mutex_);
    chunk_index_[new_seg_it->chunk_key] = new_seg_it;
  }

  return new_seg_it;
}
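
// A note on the rounding above (illustrative, not from the original source):
// (num_bytes + page_size_ - 1) / page_size_ is integer ceiling division, so a
// request is always rounded up to whole pages. For example, with
// page_size_ = 512, num_bytes = 513 yields (513 + 511) / 512 = 2 pages.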

BufferList::iterator BufferMgr::findFreeBufferInSlab(const size_t slab_num,
                                                     const size_t num_pages_requested) {
  for (auto buffer_it = slab_segments_[slab_num].begin();
       buffer_it != slab_segments_[slab_num].end();
       ++buffer_it) {
    if (buffer_it->mem_status == FREE && buffer_it->num_pages >= num_pages_requested) {
      // start_page doesn't change
      size_t excess_pages = buffer_it->num_pages - num_pages_requested;
      buffer_it->num_pages = num_pages_requested;
      buffer_it->mem_status = USED;
      buffer_it->last_touched = buffer_epoch_++;
      buffer_it->slab_num = slab_num;
      if (excess_pages > 0) {
        BufferSeg free_seg(
            buffer_it->start_page + num_pages_requested, excess_pages, FREE);
        auto temp_it = buffer_it;  // this should make a copy and not be a reference
                                   // - as we do not want to increment buffer_it
        temp_it++;
        slab_segments_[slab_num].insert(temp_it, free_seg);
      }
      return buffer_it;
    }
  }
  // If we're here then we did not find a free buffer of sufficient size in this slab;
  // return the end iterator
  return slab_segments_[slab_num].end();
}
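
// Worked example (illustrative, not from the original source): this is a
// first-fit scan. If a slab contains [USED 8][FREE 20][FREE 40] and 16 pages
// are requested, the 20-page FREE segment is chosen, marked USED with 16
// pages, and a new 4-page FREE segment is inserted directly after it.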

BufferList::iterator BufferMgr::findFreeBuffer(size_t num_bytes) {
  size_t num_pages_requested = (num_bytes + page_size_ - 1) / page_size_;
  if (num_pages_requested > max_num_pages_per_slab_) {
    throw TooBigForSlab(num_bytes);
  }

  size_t num_slabs = slab_segments_.size();

  for (size_t slab_num = 0; slab_num != num_slabs; ++slab_num) {
    auto seg_it = findFreeBufferInSlab(slab_num, num_pages_requested);
    if (seg_it != slab_segments_[slab_num].end()) {
      return seg_it;
    }
  }

  // If we're here then we didn't find a free segment of sufficient size.
  // First we see if we can add another slab.
  while (!allocations_capped_ && num_pages_allocated_ < max_num_pages_) {
    size_t allocated_num_pages{0};
    try {
      size_t pages_left = max_num_pages_ - num_pages_allocated_;
      if (pages_left < current_max_num_pages_per_slab_) {
        current_max_num_pages_per_slab_ = pages_left;
      }
      if (num_pages_requested <=
          current_max_num_pages_per_slab_) {  // don't try to allocate if the
                                              // new slab won't be big enough
        if (default_num_pages_per_slab_ <= current_max_num_pages_per_slab_) {
          allocated_num_pages =
              std::max(default_num_pages_per_slab_, num_pages_requested);
        } else {
          allocated_num_pages = current_max_num_pages_per_slab_;
        }
        const auto slab_in_bytes = allocated_num_pages * page_size_;
        VLOG(1) << "Try to allocate SLAB of " << allocated_num_pages << " pages ("
                << slab_in_bytes << " bytes) on " << getStringMgrType() << ":"
                << device_id_;
        auto alloc_ms = measure<>::execution([&]() { addSlab(slab_in_bytes); });
        LOG(INFO) << "ALLOCATION slab of " << allocated_num_pages << " pages ("
                  << slab_in_bytes << "B) created in " << alloc_ms << " ms "
                  << getStringMgrType() << ":" << device_id_;
      } else {
        break;
      }
      // if here then addSlab succeeded
      CHECK_GT(allocated_num_pages, size_t(0));
      num_pages_allocated_ += allocated_num_pages;
      return findFreeBufferInSlab(
          num_slabs,
          num_pages_requested);  // has to succeed, since we made sure to request a slab
                                 // big enough to accommodate the request
    } catch (std::runtime_error& error) {  // failed to allocate slab
      LOG(INFO) << "ALLOCATION Attempted slab of " << allocated_num_pages << " pages ("
                << (allocated_num_pages * page_size_) << "B) failed "
                << getStringMgrType() << ":" << device_id_;
      // Check if there is any point in halving the current max slab size and trying
      // again. If the request won't fit in half the available size, try once at full
      // size. If we have already tried at full size and failed, then break, as there
      // could still be room enough for a later request but not for this current one.
      if (num_pages_requested > current_max_num_pages_per_slab_ / 2 &&
          current_max_num_pages_per_slab_ != num_pages_requested) {
        current_max_num_pages_per_slab_ = num_pages_requested;
      } else {
        current_max_num_pages_per_slab_ /= 2;
        if (current_max_num_pages_per_slab_ < min_num_pages_per_slab_) {
          allocations_capped_ = true;
          // dump out the slabs and their sizes
          LOG(INFO) << "ALLOCATION Capped " << current_max_num_pages_per_slab_
                    << " Minimum size = " << min_num_pages_per_slab_ << " "
                    << getStringMgrType() << ":" << device_id_;
        }
      }
    }
  }

  if (num_pages_allocated_ == 0 && allocations_capped_) {
    throw FailedToCreateFirstSlab(num_bytes);
  }

  // If we're here then we can't add a slab - so we need to evict

  size_t min_score = std::numeric_limits<size_t>::max();
  // We're going for the lowest score here, like golf.
  // The score reflects how recently the evicted segments were touched
  // (originally the sum of last_touched over all evicted segments, now their
  // max - see below), so evicting fewer and older pages lowers the score.
  BufferList::iterator best_eviction_start = slab_segments_[0].end();
  int best_eviction_start_slab = -1;
  int slab_num = 0;

  for (auto slab_it = slab_segments_.begin(); slab_it != slab_segments_.end();
       ++slab_it, ++slab_num) {
    for (auto buffer_it = slab_it->begin(); buffer_it != slab_it->end(); ++buffer_it) {
      // Note there are some shortcuts we could take here - like we should never consider
      // a USED buffer coming after a free buffer, as we would have used the FREE buffer,
      // but we won't worry about this for now.

      // We can't evict pinned buffers - only normal USED buffers

      // if (buffer_it->mem_status == FREE || buffer_it->buffer->getPinCount() == 0) {
      size_t page_count = 0;
      size_t score = 0;
      bool solution_found = false;
      auto evict_it = buffer_it;
      for (; evict_it != slab_segments_[slab_num].end(); ++evict_it) {
        // the pin count should never go up - only down - because we have a
        // global lock on the buffer pool and the pin count only increments
        // on getChunk
        if (evict_it->mem_status == USED && evict_it->buffer->getPinCount() > 0) {
          break;
        }
        page_count += evict_it->num_pages;
        if (evict_it->mem_status == USED) {
          // MAT changed from
          // score += evict_it->last_touched;
          // The issue was thrashing when going from 8M fragment size chunks back to 64M:
          // the large chunks were being evicted before the small ones, as the summed
          // score of many small chunks was larger than that of one large chunk, so the
          // algorithm would always evict a large chunk. Under memory pressure a query
          // would evict its own current chunks and cause reloads, rather than evicting
          // several smaller, unused, older chunks.
          score = std::max(score, static_cast<size_t>(evict_it->last_touched));
        }
        if (page_count >= num_pages_requested) {
          solution_found = true;
          break;
        }
      }
      if (solution_found && score < min_score) {
        min_score = score;
        best_eviction_start = buffer_it;
        best_eviction_start_slab = slab_num;
      } else if (evict_it == slab_segments_[slab_num].end()) {
        // this means that every segment after this will fail as well, so our search has
        // proven futile
        // throw std::runtime_error ("Couldn't evict chunks to get free space");
        break;
        // in reality we should try to rearrange the buffer to get more contiguous free
        // space
      }
      // the other possibility is ending at PINNED - do nothing in this case
      //}
    }
  }
  if (best_eviction_start == slab_segments_[0].end()) {
    LOG(ERROR) << "ALLOCATION failed to find " << num_bytes << "B throwing out of memory "
               << getStringMgrType() << ":" << device_id_;
    VLOG(2) << printSlabs();
    throw OutOfMemory(num_bytes);
  }
  LOG(INFO) << "ALLOCATION failed to find " << num_bytes << "B free. Forcing Eviction."
            << " Eviction start " << best_eviction_start->start_page
            << " Number pages requested " << num_pages_requested
            << " Best Eviction Start Slab " << best_eviction_start_slab << " "
            << getStringMgrType() << ":" << device_id_;
  best_eviction_start =
      evict(best_eviction_start, num_pages_requested, best_eviction_start_slab);
  return best_eviction_start;
}
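
// Worked example of the capping strategy above (illustrative, not from the
// original source): with min_slab_size_ = 256 MB, an initial cap of 2 GB, and
// a small request outstanding, each failed slab allocation halves the cap:
// 2 GB -> 1 GB -> 512 MB -> 256 MB. When a further failure would drop the cap
// below min_num_pages_per_slab_, allocations_capped_ is set and no more slabs
// are attempted. A request larger than half the current cap instead gets one
// retry at exactly its own size before the halving resumes.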

std::string BufferMgr::printSlab(size_t slab_num) {
  std::ostringstream tss;
  // size_t last_end = 0;
  tss << "Slab St.Page Pages Touch" << std::endl;
  for (auto segment : slab_segments_[slab_num]) {
    tss << setfill(' ') << setw(4) << slab_num;
    // tss << " BSN: " << setfill(' ') << setw(2) << segment.slab_num;
    tss << setfill(' ') << setw(8) << segment.start_page;
    tss << setfill(' ') << setw(8) << segment.num_pages;
    // tss << " GAP: " << setfill(' ') << setw(7) << segment.start_page - last_end;
    // last_end = segment.start_page + segment.num_pages;
    tss << setfill(' ') << setw(7) << segment.last_touched;
    // tss << " PC: " << setfill(' ') << setw(2) << segment.buffer->getPinCount();
    if (segment.mem_status == FREE) {
      tss << " FREE"
          << " ";
    } else {
      tss << " PC: " << setfill(' ') << setw(2) << segment.buffer->getPinCount();
      tss << " USED - Chunk: ";

      for (auto&& key_elem : segment.chunk_key) {
        tss << key_elem << ",";
      }
    }
    tss << std::endl;
  }
  return tss.str();
}

std::string BufferMgr::printSlabs() {
  std::ostringstream tss;
  tss << std::endl
      << "Slabs Contents: "
      << " " << getStringMgrType() << ":" << device_id_ << std::endl;
  size_t num_slabs = slab_segments_.size();
  for (size_t slab_num = 0; slab_num != num_slabs; ++slab_num) {
    tss << printSlab(slab_num);
  }
  tss << "--------------------" << std::endl;
  return tss.str();
}

void BufferMgr::clearSlabs() {
  std::lock_guard<std::mutex> lock(global_mutex_);
  bool pinned_exists = false;
  for (auto& segment_list : slab_segments_) {
    for (auto& segment : segment_list) {
      if (segment.mem_status == FREE) {
        // no need to free
      } else if (segment.buffer->getPinCount() < 1) {
        deleteBuffer(segment.chunk_key, true);
      } else {
        pinned_exists = true;
      }
    }
  }
  if (!pinned_exists) {
    // let's actually clear the buffer from memory
    LOG(INFO) << getStringMgrType() << ":" << device_id_ << " clear slab memory";
    freeAllMem();
    clear();
    reinit();
  } else {
    LOG(INFO) << getStringMgrType() << ":" << device_id_ << " keep slab memory (pinned).";
  }
}

// return the maximum size this buffer pool can reach, in bytes
size_t BufferMgr::getMaxSize() {
  return page_size_ * max_num_pages_;
}

// return how many bytes are currently allocated across all slabs
size_t BufferMgr::getAllocated() {
  return num_pages_allocated_ * page_size_;
}

// return whether further slab allocations have been capped after repeated failures
bool BufferMgr::isAllocationCapped() {
  return allocations_capped_;
}

size_t BufferMgr::getPageSize() {
  return page_size_;
}

// return the size of the chunks in use, in bytes
size_t BufferMgr::getInUseSize() {
  size_t in_use = 0;
  for (auto& segment_list : slab_segments_) {
    for (auto& segment : segment_list) {
      if (segment.mem_status != FREE) {
        in_use += segment.num_pages * page_size_;
      }
    }
  }
  return in_use;
}

std::string BufferMgr::printSeg(BufferList::iterator& seg_it) {
  std::ostringstream tss;
  tss << "SN: " << setfill(' ') << setw(2) << seg_it->slab_num;
  tss << " SP: " << setfill(' ') << setw(7) << seg_it->start_page;
  tss << " NP: " << setfill(' ') << setw(7) << seg_it->num_pages;
  tss << " LT: " << setfill(' ') << setw(7) << seg_it->last_touched;
  if (seg_it->mem_status == FREE) {
    tss << " FREE"
        << " ";
  } else {
    // only USED segments have a buffer to report a pin count for
    tss << " PC: " << setfill(' ') << setw(2) << seg_it->buffer->getPinCount();
    tss << " USED - Chunk: ";
    for (auto vec_it = seg_it->chunk_key.begin(); vec_it != seg_it->chunk_key.end();
         ++vec_it) {
      tss << *vec_it << ",";
    }
    tss << std::endl;
  }
  return tss.str();
}

std::string BufferMgr::printMap() {
  std::ostringstream tss;
  int seg_num = 1;
  tss << std::endl
      << "Map Contents: "
      << " " << getStringMgrType() << ":" << device_id_ << std::endl;
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  for (auto seg_it = chunk_index_.begin(); seg_it != chunk_index_.end();
       ++seg_it, ++seg_num) {
    // tss << "Map Entry " << seg_num << ": ";
    // for (auto vec_it = seg_it->first.begin(); vec_it != seg_it->first.end();
    //      ++vec_it) {
    //   tss << *vec_it << ",";
    // }
    // tss << " " << std::endl;
    tss << printSeg(seg_it->second);
  }
  tss << "--------------------" << std::endl;
  return tss.str();
}

void BufferMgr::printSegs() {
  int seg_num = 1;
  int slab_num = 1;
  LOG(INFO) << std::endl << " " << getStringMgrType() << ":" << device_id_;
  for (auto slab_it = slab_segments_.begin(); slab_it != slab_segments_.end();
       ++slab_it, ++slab_num) {
    LOG(INFO) << "Slab Num: " << slab_num << " " << getStringMgrType() << ":"
              << device_id_;
    for (auto seg_it = slab_it->begin(); seg_it != slab_it->end(); ++seg_it, ++seg_num) {
      LOG(INFO) << "Segment: " << seg_num << " " << getStringMgrType() << ":"
                << device_id_;
      printSeg(seg_it);
      LOG(INFO) << " " << getStringMgrType() << ":" << device_id_;
    }
    LOG(INFO) << "--------------------"
              << " " << getStringMgrType() << ":" << device_id_;
  }
}

bool BufferMgr::isBufferOnDevice(const ChunkKey& key) {
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  return chunk_index_.find(key) != chunk_index_.end();
}

/// This method throws a runtime_error when deleting a Chunk that does not exist.
void BufferMgr::deleteBuffer(const ChunkKey& key, const bool) {
  // Note: purge is unused
  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);

  // look up the buffer for the Chunk in chunk_index_
  auto buffer_it = chunk_index_.find(key);
  CHECK(buffer_it != chunk_index_.end());
  auto seg_it = buffer_it->second;
  chunk_index_.erase(buffer_it);
  chunk_index_lock.unlock();
  std::lock_guard<std::mutex> sized_segs_lock(sized_segs_mutex_);
  if (seg_it->buffer) {
    delete seg_it->buffer;  // delete the Buffer for the segment
    seg_it->buffer = nullptr;
  }
  removeSegment(seg_it);
}

void BufferMgr::deleteBuffersWithPrefix(const ChunkKey& key_prefix, const bool) {
  // Note: purge is unused
  // look up the buffers for the Chunk in chunk_index_
  std::lock_guard<std::mutex> sized_segs_lock(
      sized_segs_mutex_);  // take this lock early to prevent deadlock with
                           // reserveBuffer, which needs sized_segs_mutex_ and
                           // then chunk_index_mutex_
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  auto prefix_upper_bound = key_prefix;
  prefix_upper_bound.emplace_back(std::numeric_limits<ChunkKey::value_type>::max());
  for (auto buffer_it = chunk_index_.lower_bound(key_prefix),
            end_chunk_it = chunk_index_.upper_bound(prefix_upper_bound);
       buffer_it != end_chunk_it;) {
    auto seg_it = buffer_it->second;
    if (seg_it->buffer) {
      if (seg_it->buffer->getPinCount() != 0) {
        // leave the buffer and buffer segment in place; they are in use elsewhere. Once
        // unpinned, the buffer will be inaccessible and evicted
        buffer_it++;
        continue;
      }
      delete seg_it->buffer;  // delete the Buffer for the segment
      seg_it->buffer = nullptr;
    }
    removeSegment(seg_it);
    chunk_index_.erase(buffer_it++);
  }
}

void BufferMgr::removeSegment(BufferList::iterator& seg_it) {
  // Note: does not delete the buffer, as it may be moved somewhere else
  int slab_num = seg_it->slab_num;
  // cout << "Slab num: " << slab_num << endl;
  if (slab_num < 0) {
    std::lock_guard<std::mutex> unsized_segs_lock(unsized_segs_mutex_);
    unsized_segs_.erase(seg_it);
  } else {
    if (seg_it != slab_segments_[slab_num].begin()) {
      auto prev_it = std::prev(seg_it);
      // LOG(INFO) << "PrevIt: " << " " << getStringMgrType() << ":" << device_id_;
      // printSeg(prev_it);
      if (prev_it->mem_status == FREE) {
        seg_it->start_page = prev_it->start_page;
        seg_it->num_pages += prev_it->num_pages;
        slab_segments_[slab_num].erase(prev_it);
      }
    }
    auto next_it = std::next(seg_it);
    if (next_it != slab_segments_[slab_num].end()) {
      if (next_it->mem_status == FREE) {
        seg_it->num_pages += next_it->num_pages;
        slab_segments_[slab_num].erase(next_it);
      }
    }
    seg_it->mem_status = FREE;
    // seg_it->pin_count = 0;
    seg_it->buffer = nullptr;
  }
}

void BufferMgr::checkpoint() {
  std::lock_guard<std::mutex> lock(global_mutex_);  // granular lock
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);

  for (auto& chunk_itr : chunk_index_) {
    // checks that the buffer is an actual chunk (not just a buffer) and is dirty
    auto& buffer_itr = chunk_itr.second;
    if (buffer_itr->chunk_key[0] != -1 && buffer_itr->buffer->isDirty()) {
      parent_mgr_->putBuffer(buffer_itr->chunk_key, buffer_itr->buffer);
      buffer_itr->buffer->clearDirtyBits();
    }
  }
}

void BufferMgr::checkpoint(const int db_id, const int tb_id) {
  std::lock_guard<std::mutex> lock(global_mutex_);  // granular lock
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);

  ChunkKey key_prefix;
  key_prefix.push_back(db_id);
  key_prefix.push_back(tb_id);
  auto start_chunk_it = chunk_index_.lower_bound(key_prefix);
  if (start_chunk_it == chunk_index_.end()) {
    return;
  }

  auto buffer_it = start_chunk_it;
  while (buffer_it != chunk_index_.end() &&
         std::search(buffer_it->first.begin(),
                     buffer_it->first.begin() + key_prefix.size(),
                     key_prefix.begin(),
                     key_prefix.end()) != buffer_it->first.begin() + key_prefix.size()) {
    if (buffer_it->second->chunk_key[0] != -1 &&
        buffer_it->second->buffer->isDirty()) {  // checks that the buffer is an actual
                                                 // chunk (not just a buffer) and is
                                                 // dirty
      parent_mgr_->putBuffer(buffer_it->second->chunk_key, buffer_it->second->buffer);
      buffer_it->second->buffer->clearDirtyBits();
    }
    buffer_it++;
  }
}

/// Returns a pointer to the chunk with the specified key. If the chunk is not
/// in the buffer pool, it is fetched from the parent buffer manager.
AbstractBuffer* BufferMgr::getBuffer(const ChunkKey& key, const size_t num_bytes) {
  std::lock_guard<std::mutex> lock(global_mutex_);  // granular lock

  std::unique_lock<std::mutex> sized_segs_lock(sized_segs_mutex_);
  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);
  auto buffer_it = chunk_index_.find(key);
  bool found_buffer = buffer_it != chunk_index_.end();
  chunk_index_lock.unlock();
  if (found_buffer) {
    CHECK(buffer_it->second->buffer);
    buffer_it->second->buffer->pin();
    sized_segs_lock.unlock();

    buffer_it->second->last_touched = buffer_epoch_++;  // race

    auto buffer_size = buffer_it->second->buffer->size();
    if (buffer_size < num_bytes) {
      // need to fetch the part of the buffer we don't have - up to num_bytes
      VLOG(1) << ToString(getMgrType())
              << ": Fetching buffer from parent manager. Reason: increased buffer size. "
                 "Buffer size: "
              << buffer_size << ", num bytes to fetch: " << num_bytes
              << ", chunk key: " << keyToString(key);
      parent_mgr_->fetchBuffer(key, buffer_it->second->buffer, num_bytes);
    }
    return buffer_it->second->buffer;
  } else {  // if it wasn't in the pool, then we need to fetch it
    sized_segs_lock.unlock();
    // createBuffer pins for us
    AbstractBuffer* buffer = createBuffer(key, page_size_, num_bytes);
    try {
      VLOG(1) << ToString(getMgrType())
              << ": Fetching buffer from parent manager. Reason: cache miss. Num bytes "
                 "to fetch: "
              << num_bytes << ", chunk key: " << keyToString(key);
      parent_mgr_->fetchBuffer(
          key, buffer, num_bytes);  // this should put the buffer in a BufferSegment
    } catch (const foreign_storage::ForeignStorageException& error) {
      deleteBuffer(key);  // buffer failed to load, ensure it is cleaned up
      LOG(WARNING) << "Get chunk - Could not load chunk " << keyToString(key)
                   << " from foreign storage. Error was " << error.what();
      throw;
    } catch (const std::exception& error) {
      LOG(FATAL) << "Get chunk - Could not find chunk " << keyToString(key)
                 << " in buffer pool or parent buffer pools. Error was " << error.what();
    }
    return buffer;
  }
}
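
// Typical caller pattern (illustrative sketch, not from the original source):
//   AbstractBuffer* buf = buffer_mgr->getBuffer(chunk_key, num_bytes);  // pinned
//   ... read through buf->getMemoryPtr() ...
//   buf->unPin();  // make the chunk evictable again
// getBuffer pins the chunk before releasing the pool locks, so the eviction
// scan in findFreeBuffer (which stops at segments with a positive pin count)
// cannot reclaim it while the caller is still using it.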

void BufferMgr::fetchBuffer(const ChunkKey& key,
                            AbstractBuffer* dest_buffer,
                            const size_t num_bytes) {
  std::unique_lock<std::mutex> lock(global_mutex_);  // granular lock
  std::unique_lock<std::mutex> sized_segs_lock(sized_segs_mutex_);
  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);

  auto buffer_it = chunk_index_.find(key);
  bool found_buffer = buffer_it != chunk_index_.end();
  chunk_index_lock.unlock();
  AbstractBuffer* buffer;
  if (!found_buffer) {
    sized_segs_lock.unlock();
    CHECK(parent_mgr_ != nullptr);
    buffer = createBuffer(key, page_size_, num_bytes);  // will pin buffer
    try {
      VLOG(1) << ToString(getMgrType())
              << ": Fetching buffer from parent manager. Reason: cache miss. Num bytes "
                 "to fetch: "
              << num_bytes << ", chunk key: " << keyToString(key);
      parent_mgr_->fetchBuffer(key, buffer, num_bytes);
    } catch (const foreign_storage::ForeignStorageException& error) {
      deleteBuffer(key);  // buffer failed to load, ensure it is cleaned up
      LOG(WARNING) << "Could not fetch parent chunk " << keyToString(key)
                   << " from foreign storage. Error was " << error.what();
      throw;
    } catch (std::runtime_error& error) {
      LOG(FATAL) << "Could not fetch parent buffer " << keyToString(key)
                 << " error: " << error.what();
    }
  } else {
    buffer = buffer_it->second->buffer;
    buffer->pin();
    auto buffer_size = buffer->size();
    if (num_bytes > buffer_size) {
      try {
        VLOG(1) << ToString(getMgrType())
                << ": Fetching buffer from parent manager. Reason: increased buffer "
                   "size. Buffer size: "
                << buffer_size << ", num bytes to fetch: " << num_bytes
                << ", chunk key: " << keyToString(key);
        parent_mgr_->fetchBuffer(key, buffer, num_bytes);
      } catch (const foreign_storage::ForeignStorageException& error) {
        LOG(WARNING) << "Could not fetch parent chunk " << keyToString(key)
                     << " from foreign storage. Error was " << error.what();
        throw;
      } catch (std::runtime_error& error) {
        LOG(FATAL) << "Could not fetch parent buffer " << keyToString(key)
                   << " error: " << error.what();
      }
    }
    sized_segs_lock.unlock();
  }
  lock.unlock();
  buffer->copyTo(dest_buffer, num_bytes);
  buffer->unPin();
}

AbstractBuffer* BufferMgr::putBuffer(const ChunkKey& key,
                                     AbstractBuffer* src_buffer,
                                     const size_t num_bytes) {
  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);
  auto buffer_it = chunk_index_.find(key);
  bool found_buffer = buffer_it != chunk_index_.end();
  chunk_index_lock.unlock();
  AbstractBuffer* buffer;
  if (!found_buffer) {
    buffer = createBuffer(key, page_size_);
  } else {
    buffer = buffer_it->second->buffer;
  }
  size_t old_buffer_size = buffer->size();
  size_t new_buffer_size = num_bytes == 0 ? src_buffer->size() : num_bytes;
  CHECK(!buffer->isDirty());

  if (src_buffer->isUpdated()) {
    //@todo use dirty flags to only flush pages of the chunk that need to
    // be flushed
    buffer->write((int8_t*)src_buffer->getMemoryPtr(),
                  new_buffer_size,
                  0,
                  src_buffer->getType(),
                  src_buffer->getDeviceId());
  } else if (src_buffer->isAppended()) {
    CHECK(old_buffer_size < new_buffer_size);
    buffer->append((int8_t*)src_buffer->getMemoryPtr() + old_buffer_size,
                   new_buffer_size - old_buffer_size,
                   src_buffer->getType(),
                   src_buffer->getDeviceId());
  } else {
    UNREACHABLE();
  }
  src_buffer->clearDirtyBits();
  buffer->syncEncoder(src_buffer);
  return buffer;
}
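
// Behavior note (illustrative, not from the original source): putBuffer has
// two write paths. An updated source rewrites the destination from offset 0;
// an appended source copies only the new tail. For example, growing a chunk
// from 96 to 128 bytes copies the 32 bytes starting at offset 96.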

int BufferMgr::getBufferId() {
  std::lock_guard<std::mutex> lock(buffer_id_mutex_);
  return max_buffer_id_++;
}

/// client is responsible for deleting memory allocated for b->mem_
AbstractBuffer* BufferMgr::alloc(const size_t num_bytes) {
  std::lock_guard<std::mutex> lock(global_mutex_);
  ChunkKey chunk_key = {-1, getBufferId()};
  return createBuffer(chunk_key, page_size_, num_bytes);
}

void BufferMgr::free(AbstractBuffer* buffer) {
  std::lock_guard<std::mutex> lock(global_mutex_);  // hack for now
  Buffer* casted_buffer = dynamic_cast<Buffer*>(buffer);
  if (casted_buffer == nullptr) {
    LOG(FATAL) << "Wrong buffer type - expects base class pointer to Buffer type.";
  }
  deleteBuffer(casted_buffer->seg_it_->chunk_key);
}

size_t BufferMgr::getNumChunks() {
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  return chunk_index_.size();
}

size_t BufferMgr::size() {
  return num_pages_allocated_;
}

size_t BufferMgr::getMaxBufferPoolSize() {
  return max_buffer_pool_size_;
}

size_t BufferMgr::getMaxSlabSize() {
  return max_slab_size_;
}

void BufferMgr::getChunkMetadataVecForKeyPrefix(ChunkMetadataVector& chunk_metadata_vec,
                                                const ChunkKey& key_prefix) {
  LOG(FATAL) << "getChunkMetadataVecForPrefix not supported for BufferMgr.";
}

const std::vector<BufferList>& BufferMgr::getSlabSegments() {
  return slab_segments_;
}

void BufferMgr::removeTableRelatedDS(const int db_id, const int table_id) {
  UNREACHABLE();
}
}  // namespace Buffer_Namespace
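
// Usage sketch (illustrative only; BufferMgr is abstract here - allocateBuffer,
// addSlab, and freeAllMem are implemented by device-specific subclasses such as
// the CPU and GPU buffer managers - and the constructor arguments below are
// hypothetical):
//
//   constexpr size_t kPageSize = 512;
//   auto cpu_mgr = std::make_unique<CpuBufferMgr>(/*device_id=*/0,
//                                                 /*max_buffer_pool_size=*/1 << 30,
//                                                 /*min_slab_size=*/1 << 26,
//                                                 /*max_slab_size=*/1 << 28,
//                                                 /*default_slab_size=*/1 << 28,
//                                                 kPageSize,
//                                                 parent_mgr);
//   ChunkKey key{db_id, table_id, column_id, fragment_id};
//   AbstractBuffer* chunk = cpu_mgr->getBuffer(key, /*num_bytes=*/4096);
//   // ... use chunk->getMemoryPtr() ...
//   chunk->unPin();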