34 namespace Buffer_Namespace {
36 std::string BufferMgr::keyToString(
const ChunkKey& key) {
37 std::ostringstream oss;
40 for (
auto sub_key : key) {
41 oss << sub_key <<
",";
47 BufferMgr::BufferMgr(
const int device_id,
48 const size_t max_buffer_pool_size,
49 const size_t min_slab_size,
50 const size_t max_slab_size,
51 const size_t default_slab_size,
52 const size_t page_size,
53 AbstractBufferMgr* parent_mgr)
54 : AbstractBufferMgr(device_id)
55 , max_buffer_pool_size_(max_buffer_pool_size)
56 , min_slab_size_(min_slab_size)
57 , max_slab_size_(max_slab_size)
58 , default_slab_size_(default_slab_size)
59 , page_size_(page_size)
60 , num_pages_allocated_(0)
61 , allocations_capped_(
false)
62 , parent_mgr_(parent_mgr)
106 delete buf.second->buffer;
109 chunk_index_.clear();
118 const size_t chunk_page_size,
119 const size_t initial_size) {
121 size_t actual_chunk_page_size = chunk_page_size;
122 if (actual_chunk_page_size == 0) {
131 buffer_seg.chunk_key = chunk_key;
147 buffer_it->second->buffer =
160 const size_t num_pages_requested,
161 const int slab_num) {
165 auto evict_it = evict_start;
166 size_t num_pages = 0;
167 size_t start_page = evict_start->start_page;
168 while (num_pages < num_pages_requested) {
169 if (evict_it->mem_status ==
USED) {
170 CHECK(evict_it->buffer->getPinCount() < 1);
172 num_pages += evict_it->num_pages;
173 if (evict_it->mem_status ==
USED && evict_it->chunk_key.size() > 0) {
176 if (evict_it->buffer !=
nullptr) {
179 delete evict_it->buffer;
191 if (num_pages_requested < num_pages) {
192 size_t excess_pages = num_pages - num_pages_requested;
194 evict_it->mem_status ==
FREE) {
195 evict_it->start_page = start_page + num_pages_requested;
196 evict_it->num_pages += excess_pages;
198 BufferSeg free_seg(start_page + num_pages_requested, excess_pages,
FREE);
206 BufferList::iterator& seg_it,
207 const size_t num_bytes) {
210 size_t num_pages_extra_needed = num_pages_requested - seg_it->num_pages;
212 if (num_pages_requested < seg_it->num_pages) {
217 int slab_num = seg_it->slab_num;
219 BufferList::iterator next_it = std::next(seg_it);
221 next_it->num_pages >= num_pages_extra_needed) {
223 size_t leftover_pages = next_it->num_pages - num_pages_extra_needed;
224 seg_it->num_pages = num_pages_requested;
225 next_it->num_pages = leftover_pages;
226 next_it->start_page = seg_it->start_page + seg_it->num_pages;
235 new_seg_it->buffer = seg_it->buffer;
236 new_seg_it->chunk_key = seg_it->chunk_key;
237 int8_t* old_mem = new_seg_it->buffer->mem_;
238 new_seg_it->buffer->mem_ =
243 if (seg_it->start_page >= 0 && seg_it->buffer->mem_ != 0) {
244 new_seg_it->buffer->writeData(old_mem,
245 new_seg_it->buffer->size(),
247 new_seg_it->buffer->getType(),
261 const size_t num_pages_requested) {
265 if (buffer_it->mem_status ==
FREE && buffer_it->num_pages >= num_pages_requested) {
267 size_t excess_pages = buffer_it->num_pages - num_pages_requested;
268 buffer_it->num_pages = num_pages_requested;
269 buffer_it->mem_status =
USED;
271 buffer_it->slab_num = slab_num;
272 if (excess_pages > 0) {
274 buffer_it->start_page + num_pages_requested, excess_pages,
FREE);
275 auto temp_it = buffer_it;
296 for (
size_t slab_num = 0; slab_num != num_slabs; ++slab_num) {
306 size_t allocated_num_pages{0};
312 if (num_pages_requested <=
316 allocated_num_pages =
321 const auto slab_in_bytes = allocated_num_pages *
page_size_;
322 VLOG(1) <<
"Try to allocate SLAB of " << allocated_num_pages <<
" pages ("
323 << slab_in_bytes <<
" bytes) on " << getStringMgrType() <<
":"
326 LOG(
INFO) <<
"ALLOCATION slab of " << allocated_num_pages <<
" pages ("
327 << slab_in_bytes <<
"B) created in " << alloc_ms <<
" ms "
328 << getStringMgrType() <<
":" << device_id_;
333 CHECK_GT(allocated_num_pages,
size_t(0));
334 num_pages_allocated_ += allocated_num_pages;
337 num_pages_requested);
339 }
catch (std::runtime_error& error) {
340 LOG(
INFO) <<
"ALLOCATION Attempted slab of " << allocated_num_pages <<
" pages ("
341 << (allocated_num_pages *
page_size_) <<
"B) failed "
342 << getStringMgrType() <<
":" << device_id_;
358 << getStringMgrType() <<
":" << device_id_;
370 size_t min_score = std::numeric_limits<size_t>::max();
374 BufferList::iterator best_eviction_start =
slab_segments_[0].end();
375 int best_eviction_start_slab = -1;
379 ++slab_it, ++slab_num) {
380 for (
auto buffer_it = slab_it->begin(); buffer_it != slab_it->end(); ++buffer_it) {
388 size_t page_count = 0;
390 bool solution_found =
false;
391 auto evict_it = buffer_it;
396 if (evict_it->mem_status ==
USED && evict_it->buffer->getPinCount() > 0) {
399 page_count += evict_it->num_pages;
400 if (evict_it->mem_status ==
USED) {
408 score = std::max(score, static_cast<size_t>(evict_it->last_touched));
410 if (page_count >= num_pages_requested) {
411 solution_found =
true;
415 if (solution_found && score < min_score) {
417 best_eviction_start = buffer_it;
418 best_eviction_start_slab = slab_num;
432 LOG(
ERROR) <<
"ALLOCATION failed to find " << num_bytes <<
"B throwing out of memory "
433 << getStringMgrType() <<
":" << device_id_;
437 LOG(
INFO) <<
"ALLOCATION failed to find " << num_bytes <<
"B free. Forcing Eviction."
438 <<
" Eviction start " << best_eviction_start->start_page
439 <<
" Number pages requested " << num_pages_requested
440 <<
" Best Eviction Start Slab " << best_eviction_start_slab <<
" "
441 << getStringMgrType() <<
":" << device_id_;
442 best_eviction_start =
443 evict(best_eviction_start, num_pages_requested, best_eviction_start_slab);
444 return best_eviction_start;
448 std::ostringstream tss;
450 tss <<
"Slab St.Page Pages Touch" << std::endl;
452 tss << setfill(
' ') << setw(4) << slab_num;
454 tss << setfill(
' ') << setw(8) << segment.start_page;
455 tss << setfill(
' ') << setw(8) << segment.num_pages;
458 tss << setfill(
' ') << setw(7) << segment.last_touched;
460 if (segment.mem_status ==
FREE) {
464 tss <<
" PC: " << setfill(
' ') << setw(2) << segment.buffer->getPinCount();
465 tss <<
" USED - Chunk: ";
467 for (
auto&& key_elem : segment.chunk_key) {
468 tss << key_elem <<
",";
477 std::ostringstream tss;
479 <<
"Slabs Contents: "
480 <<
" " << getStringMgrType() <<
":" << device_id_ << std::endl;
482 for (
size_t slab_num = 0; slab_num != num_slabs; ++slab_num) {
485 tss <<
"--------------------" << std::endl;
491 bool pinned_exists =
false;
493 for (
auto& segment : segment_list) {
494 if (segment.mem_status ==
FREE) {
496 }
else if (segment.buffer->getPinCount() < 1) {
499 pinned_exists =
true;
503 if (!pinned_exists) {
505 LOG(
INFO) << getStringMgrType() <<
":" << device_id_ <<
" clear slab memory";
510 LOG(
INFO) << getStringMgrType() <<
":" << device_id_ <<
" keep slab memory (pinned).";
537 for (
auto& segment : segment_list) {
538 if (segment.mem_status !=
FREE) {
547 std::ostringstream tss;
548 tss <<
"SN: " << setfill(
' ') << setw(2) << seg_it->slab_num;
549 tss <<
" SP: " << setfill(
' ') << setw(7) << seg_it->start_page;
550 tss <<
" NP: " << setfill(
' ') << setw(7) << seg_it->num_pages;
551 tss <<
" LT: " << setfill(
' ') << setw(7) << seg_it->last_touched;
552 tss <<
" PC: " << setfill(
' ') << setw(2) << seg_it->buffer->getPinCount();
553 if (seg_it->mem_status ==
FREE) {
557 tss <<
" USED - Chunk: ";
558 for (
auto vec_it = seg_it->chunk_key.begin(); vec_it != seg_it->chunk_key.end();
560 tss << *vec_it <<
",";
568 std::ostringstream tss;
572 <<
" " << getStringMgrType() <<
":" << device_id_ << std::endl;
575 ++seg_it, ++seg_num) {
585 tss <<
"--------------------" << std::endl;
592 LOG(
INFO) << std::endl <<
" " << getStringMgrType() <<
":" << device_id_;
594 ++slab_it, ++slab_num) {
595 LOG(
INFO) <<
"Slab Num: " << slab_num <<
" " << getStringMgrType() <<
":"
597 for (
auto seg_it = slab_it->begin(); seg_it != slab_it->end(); ++seg_it, ++seg_num) {
598 LOG(
INFO) <<
"Segment: " << seg_num <<
" " << getStringMgrType() <<
":"
601 LOG(
INFO) <<
" " << getStringMgrType() <<
":" << device_id_;
603 LOG(
INFO) <<
"--------------------"
604 <<
" " << getStringMgrType() <<
":" << device_id_;
625 auto seg_it = buffer_it->second;
627 chunk_index_lock.unlock();
629 if (seg_it->buffer) {
630 delete seg_it->buffer;
639 std::lock_guard<std::mutex> sized_segs_lock(
644 auto prefix_upper_bound = key_prefix;
645 prefix_upper_bound.emplace_back(std::numeric_limits<ChunkKey::value_type>::max());
646 for (
auto buffer_it =
chunk_index_.lower_bound(key_prefix),
647 end_chunk_it =
chunk_index_.upper_bound(prefix_upper_bound);
648 buffer_it != end_chunk_it;) {
649 auto seg_it = buffer_it->second;
650 if (seg_it->buffer) {
651 if (seg_it->buffer->getPinCount() != 0) {
657 delete seg_it->buffer;
658 seg_it->buffer =
nullptr;
667 int slab_num = seg_it->slab_num;
674 auto prev_it = std::prev(seg_it);
677 if (prev_it->mem_status ==
FREE) {
678 seg_it->start_page = prev_it->start_page;
679 seg_it->num_pages += prev_it->num_pages;
683 auto next_it = std::next(seg_it);
685 if (next_it->mem_status ==
FREE) {
686 seg_it->num_pages += next_it->num_pages;
690 seg_it->mem_status =
FREE;
702 auto& buffer_itr = chunk_itr.second;
703 if (buffer_itr->chunk_key[0] != -1 && buffer_itr->buffer->isDirty()) {
704 parent_mgr_->putBuffer(buffer_itr->chunk_key, buffer_itr->buffer);
705 buffer_itr->buffer->clearDirtyBits();
715 key_prefix.push_back(db_id);
716 key_prefix.push_back(tb_id);
717 auto start_chunk_it =
chunk_index_.lower_bound(key_prefix);
722 auto buffer_it = start_chunk_it;
724 std::search(buffer_it->first.begin(),
725 buffer_it->first.begin() + key_prefix.size(),
727 key_prefix.end()) != buffer_it->first.begin() + key_prefix.size()) {
728 if (buffer_it->second->chunk_key[0] != -1 &&
729 buffer_it->second->buffer->isDirty()) {
732 parent_mgr_->putBuffer(buffer_it->second->chunk_key, buffer_it->second->buffer);
733 buffer_it->second->buffer->clearDirtyBits();
748 chunk_index_lock.unlock();
750 CHECK(buffer_it->second->buffer);
751 buffer_it->second->buffer->pin();
752 sized_segs_lock.unlock();
756 auto buffer_size = buffer_it->second->buffer->size();
757 if (buffer_size < num_bytes) {
759 VLOG(1) << ToString(getMgrType())
760 <<
": Fetching buffer from parent manager. Reason: increased buffer size. "
762 << buffer_size <<
", num bytes to fetch: " << num_bytes
764 parent_mgr_->fetchBuffer(key, buffer_it->second->buffer, num_bytes);
766 return buffer_it->second->buffer;
768 sized_segs_lock.unlock();
772 VLOG(1) << ToString(getMgrType())
773 <<
": Fetching buffer from parent manager. Reason: cache miss. Num bytes "
775 << num_bytes <<
", chunk key: " <<
keyToString(key);
777 key, buffer, num_bytes);
781 <<
" from foreign storage. Error was " << error.what();
783 }
catch (
const std::exception& error) {
785 <<
" in buffer pool or parent buffer pools. Error was " << error.what();
793 const size_t num_bytes) {
800 chunk_index_lock.unlock();
803 sized_segs_lock.unlock();
807 VLOG(1) << ToString(getMgrType())
808 <<
": Fetching buffer from parent manager. Reason: cache miss. Num bytes "
810 << num_bytes <<
", chunk key: " <<
keyToString(key);
815 <<
" from foreign storage. Error was " << error.what();
817 }
catch (std::runtime_error& error) {
819 <<
" error: " << error.what();
822 buffer = buffer_it->second->buffer;
824 auto buffer_size = buffer->
size();
825 if (num_bytes > buffer_size) {
827 VLOG(1) << ToString(getMgrType())
828 <<
": Fetching buffer from parent manager. Reason: increased buffer "
829 "size. Buffer size: "
830 << buffer_size <<
", num bytes to fetch: " << num_bytes
835 <<
" from foreign storage. Error was " << error.what();
837 }
catch (std::runtime_error& error) {
839 <<
" error: " << error.what();
842 sized_segs_lock.unlock();
845 buffer->
copyTo(dest_buffer, num_bytes);
851 const size_t num_bytes) {
855 chunk_index_lock.unlock();
860 buffer = buffer_it->second->buffer;
862 size_t old_buffer_size = buffer->
size();
863 size_t new_buffer_size = num_bytes == 0 ? src_buffer->
size() : num_bytes;
875 CHECK(old_buffer_size < new_buffer_size);
877 new_buffer_size - old_buffer_size,
903 if (casted_buffer == 0) {
904 LOG(
FATAL) <<
"Wrong buffer type - expects base class pointer to Buffer type.";
928 LOG(
FATAL) <<
"getChunkMetadataVecForPrefix not supported for BufferMgr.";
size_t max_buffer_pool_num_pages_
size_t getAllocated() override
size_t min_num_pages_per_slab_
AbstractBufferMgr * parent_mgr_
std::vector< int > ChunkKey
~BufferMgr() override
Destructor.
virtual void allocateBuffer(BufferList::iterator seg_it, const size_t page_size, const size_t num_bytes)=0
AbstractBuffer * createBuffer(const ChunkKey &key, const size_t page_size=0, const size_t initial_size=0) override
Creates a chunk with the specified key and page size.
BufferList::iterator seg_it_
BufferList::iterator findFreeBuffer(size_t num_bytes)
Gets a buffer of required size and returns an iterator to it.
size_t current_max_num_pages_per_slab_
const size_t max_buffer_pool_size_
size_t getMaxSize() override
size_t max_num_pages_per_slab_
void syncEncoder(const AbstractBuffer *src_buffer)
AbstractBuffer * putBuffer(const ChunkKey &key, AbstractBuffer *d, const size_t num_bytes=0) override
static TimeT::rep execution(F func, Args &&...args)
size_t getNumChunks() override
virtual void addSlab(const size_t slab_size)=0
virtual int8_t * getMemoryPtr()=0
virtual MemoryLevel getType() const =0
size_t num_pages_allocated_
void removeSegment(BufferList::iterator &seg_it)
unsigned int buffer_epoch_
std::vector< BufferList > slab_segments_
BufferList::iterator findFreeBufferInSlab(const size_t slab_num, const size_t num_pages_requested)
size_t size()
Returns the total number of bytes allocated.
std::map< ChunkKey, BufferList::iterator > chunk_index_
std::string printSeg(BufferList::iterator &seg_it)
const size_t min_slab_size_
max number of bytes allocated for the buffer pool
BufferList::iterator reserveBuffer(BufferList::iterator &seg_it, const size_t num_bytes)
bool isAllocationCapped() override
std::string printSlabs() override
An AbstractBuffer is a unit of data management for a data manager.
virtual void write(int8_t *src, const size_t num_bytes, const size_t offset=0, const MemoryLevel src_buffer_type=CPU_LEVEL, const int src_device_id=-1)=0
bool isBufferOnDevice(const ChunkKey &key) override
Puts the contents of d into the Buffer with ChunkKey key.
This file includes the class specification for the buffer manager (BufferMgr), and related data struc...
std::mutex buffer_id_mutex_
AbstractBuffer * getBuffer(const ChunkKey &key, const size_t num_bytes=0) override
Returns the a pointer to the chunk with the specified key.
BufferList::iterator evict(BufferList::iterator &evict_start, const size_t num_pages_requested, const int slab_num)
void free(AbstractBuffer *buffer) override
std::mutex sized_segs_mutex_
void checkpoint() override
std::mutex unsized_segs_mutex_
void removeTableRelatedDS(const int db_id, const int table_id) override
const std::vector< BufferList > & getSlabSegments()
void deleteBuffersWithPrefix(const ChunkKey &key_prefix, const bool purge=true) override
const size_t default_slab_size_
size_t default_num_pages_per_slab_
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
size_t getInUseSize() override
bool g_enable_watchdog false
size_t getMaxBufferSize()
AbstractBuffer * alloc(const size_t num_bytes=0) override
client is responsible for deleting memory allocated for b->mem_
const size_t max_slab_size_
std::string printSlab(size_t slab_num)
std::mutex chunk_index_mutex_
void fetchBuffer(const ChunkKey &key, AbstractBuffer *dest_buffer, const size_t num_bytes=0) override
std::string keyToString(const ChunkKey &key)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata_vec, const ChunkKey &key_prefix) override
std::vector< int8_t * > slabs_
void deleteBuffer(const ChunkKey &key, const bool purge=true) override
Deletes the chunk with the specified key.
Note(s): Forbid Copying Idiom 4.1.
virtual void freeAllMem()=0