24 #include <boost/filesystem.hpp>
28 namespace bf = boost::filesystem;
32 size_t space_used = 0;
33 if (bf::exists(dir)) {
34 for (
const auto& file : bf::recursive_directory_iterator(dir)) {
35 if (bf::is_regular_file(file.path())) {
48 LOG(
FATAL) <<
"Disk cache needs to evict data to make space, but no data found in "
56 namespace File_Namespace {
60 ss <<
"Dump Cache:\n";
62 ss <<
" " <<
show_chunk(key) <<
" num_pages: " << buf->pageCount()
63 <<
", is dirty: " << buf->isDirty() <<
"\n";
72 :
FileMgr(config.page_size, config.meta_page_size) {
91 auto& header_vec = open_files_result.header_infos;
92 std::sort(header_vec.begin(), header_vec.end());
97 VLOG(3) <<
"Number of Headers in Vector: " << header_vec.size();
98 if (header_vec.size() > 0) {
99 auto startIt = header_vec.begin();
100 ChunkKey lastChunkKey = startIt->chunkKey;
101 for (
auto it = header_vec.begin() + 1; it != header_vec.end(); ++it) {
102 if (it->chunkKey != lastChunkKey) {
104 lastChunkKey = it->chunkKey;
121 CHECK(bf::is_directory(path))
122 <<
"Specified path '" <<
fileMgrBasePath_ <<
"' for disk cache is not a directory.";
125 boost::regex table_filter(
"table_([0-9]+)_([0-9]+)");
126 for (
const auto& file : bf::directory_iterator(path)) {
128 auto file_name = file.path().filename().string();
129 if (boost::regex_match(file_name, match, table_filter)) {
130 int32_t db_id = std::stoi(match[1]);
131 int32_t tb_id = std::stoi(match[2]);
134 <<
"Trying to read data for existing table";
136 std::make_unique<TableFileMgr>(file.path().string()));
154 auto& [pair, table_dir] = *tables_it;
155 return table_dir->getEpoch();
162 auto& [pair, table_dir] = *tables_it;
163 table_dir->incrementEpoch();
170 table_it->second->writeAndSyncEpochToDisk();
197 size_t space_used = 0;
198 ChunkKey min_table_key{db_id, tb_id};
199 ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
200 for (
auto it =
chunkIndex_.lower_bound(min_table_key);
203 auto& [key, buffer] = *it;
204 space_used += (buffer->numChunkPages() *
page_size_);
210 int32_t tb_id)
const {
212 size_t space_used = 0;
213 ChunkKey min_table_key{db_id, tb_id};
214 ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
215 for (
auto it =
chunkIndex_.lower_bound(min_table_key);
218 auto& [key, buffer] = *it;
229 space += table_it->second->getReservedSpace();
238 return chunk_space + meta_space + subdir_space;
251 VLOG(2) <<
"Checkpointing " << describeSelf() <<
" (" << db_id <<
", " << tb_id
252 <<
") epoch: " << epoch(db_id, tb_id);
253 writeDirtyBuffers(db_id, tb_id);
255 writeAndSyncEpochToDisk(db_id, tb_id);
256 incrementEpoch(db_id, tb_id);
260 void CachingFileMgr::createTableFileMgrIfNoneExists(
const int32_t db_id,
261 const int32_t tb_id) {
264 if (table_dirs_.find(table_pair) == table_dirs_.end()) {
266 table_pair, std::make_unique<TableFileMgr>(getTableFileMgrPath(db_id, tb_id)));
271 const size_t page_size,
272 const size_t num_bytes) {
275 createTableFileMgrIfNoneExists(db_id, tb_id);
276 return FileMgr::createBufferUnlocked(key, page_size, num_bytes);
281 const std::vector<HeaderInfo>::const_iterator& startIt,
282 const std::vector<HeaderInfo>::const_iterator& endIt) {
283 if (startIt->pageId != -1) {
290 createTableFileMgrIfNoneExists(db_id, tb_id);
291 auto buffer = FileMgr::createBufferFromHeaders(key, startIt, endIt);
292 if (buffer->isMissingPages()) {
295 buffer->freeChunkPages();
308 const size_t num_bytes) {
309 CHECK(!src_buffer->
isDirty()) <<
"Cannot cache dirty buffers.";
310 deleteBufferIfExists(key);
315 return FileMgr::putBuffer(key, src_buffer, num_bytes);
318 void CachingFileMgr::incrementAllEpochs() {
320 for (
auto& table_dir : table_dirs_) {
321 table_dir.second->incrementEpoch();
325 void CachingFileMgr::removeTableFileMgr(int32_t db_id, int32_t tb_id) {
328 auto it = table_dirs_.find({db_id, tb_id});
329 if (it != table_dirs_.end()) {
330 it->second->removeDiskContent();
331 table_dirs_.erase(it);
335 void CachingFileMgr::removeTableBuffers(int32_t db_id, int32_t tb_id) {
338 ChunkKey min_table_key{db_id, tb_id};
339 ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
340 for (
auto it = chunkIndex_.lower_bound(min_table_key);
341 it != chunkIndex_.upper_bound(max_table_key);) {
342 it = deleteBufferUnlocked(it);
348 const size_t num_bytes) {
354 const std::vector<HeaderInfo>::const_iterator& headerStartIt,
355 const std::vector<HeaderInfo>::const_iterator& headerEndIt) {
360 bool CachingFileMgr::updatePageIfDeleted(
FileInfo* file_info,
372 file_info->
freePage(page_num,
false, page_epoch);
378 void CachingFileMgr::writeDirtyBuffers(int32_t db_id, int32_t tb_id) {
380 ChunkKey min_table_key{db_id, tb_id};
381 ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
383 for (
auto chunk_it = chunkIndex_.lower_bound(min_table_key);
384 chunk_it != chunkIndex_.upper_bound(max_table_key);
386 if (
auto [key, buf] = *chunk_it; buf->isDirty()) {
388 buf->freeMetadataPages();
389 buf->writeMetadata(epoch(db_id, tb_id));
390 buf->clearDirtyBits();
396 void CachingFileMgr::deleteBufferIfExists(
const ChunkKey& key) {
398 auto chunk_it = chunkIndex_.find(key);
399 if (chunk_it != chunkIndex_.end()) {
400 deleteBufferUnlocked(chunk_it);
404 size_t CachingFileMgr::getNumDataChunks()
const {
406 size_t num_chunks = 0;
407 for (
auto [key, buf] : chunkIndex_) {
408 if (buf->hasDataPages()) {
415 void CachingFileMgr::deleteCacheIfTooLarge() {
417 closeRemovePhysical();
418 bf::create_directory(fileMgrBasePath_);
419 LOG(
INFO) <<
"Cache path over limit. Existing cache deleted.";
423 Page CachingFileMgr::requestFreePage(
size_t pageSize,
const bool isMetadata) {
424 std::lock_guard<std::mutex> lock(getPageMutex_);
425 int32_t pageNum = -1;
427 auto candidateFiles = fileIndex_.equal_range(pageSize);
429 for (
auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
430 FileInfo* fileInfo = getFileInfoForFileId(fileIt->second);
440 if (getMaxMetaFiles() > getNumMetaFiles()) {
441 fileInfo = createFileInfo(pageSize, num_pages_per_metadata_file_);
444 if (getMaxDataFiles() > getNumDataFiles()) {
445 fileInfo = createFileInfo(pageSize, num_pages_per_data_file_);
453 fileInfo = isMetadata ? evictMetadataPages() : evictPages();
458 CHECK(pageNum != -1);
462 std::vector<ChunkKey> CachingFileMgr::getKeysForTable(int32_t db_id,
463 int32_t tb_id)
const {
464 std::vector<ChunkKey> keys;
465 ChunkKey min_table_key{db_id, tb_id};
466 ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
467 for (
auto it = chunkIndex_.lower_bound(min_table_key);
468 it != chunkIndex_.upper_bound(max_table_key);
470 keys.emplace_back(it->first);
480 const auto keys = getKeysForTable(db_id, tb_id);
481 for (
const auto& key : keys) {
482 auto chunk_it = chunkIndex_.find(key);
483 CHECK(chunk_it != chunkIndex_.end());
484 auto& buf = chunk_it->second;
489 CHECK(buf->getMetadataPage().pageVersions.size() > 0);
491 getFileInfoForFileId(buf->getMetadataPage().pageVersions.front().page.fileId);
495 deleteBufferUnlocked(chunk_it);
498 deleteWrapperFile(db_id, tb_id);
499 CHECK(file_info) <<
"FileInfo with freed page not found";
509 if (!buf->hasDataPages()) {
520 CHECK(buf->getMultiPage().front().pageVersions.size() > 0);
521 file_info = getFileInfoForFileId(
522 buf->getMultiPage().front().pageVersions.front().page.fileId);
524 auto pages_freed = buf->freeChunkPages();
525 CHECK(pages_freed > 0) <<
"failed to evict a page";
526 CHECK(file_info) <<
"FileInfo with freed page not found";
530 void CachingFileMgr::touchKey(
const ChunkKey& key)
const {
531 chunk_evict_alg_.touchChunk(key);
535 void CachingFileMgr::removeKey(
const ChunkKey& key)
const {
537 chunk_evict_alg_.removeChunk(key);
540 ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
541 for (
auto it = chunkIndex_.lower_bound(table_key);
542 it != chunkIndex_.upper_bound(max_table_key);
544 if (it->first != key) {
551 table_evict_alg_.removeChunk(table_key);
554 size_t CachingFileMgr::getFilesSize()
const {
557 for (
const auto& [
id, file] : files_) {
563 size_t CachingFileMgr::getTableFileMgrsSize()
const {
565 size_t space_used = 0;
566 for (
const auto& [pair, table_dir] : table_dirs_) {
567 space_used += table_dir->getReservedSpace();
572 size_t CachingFileMgr::getNumDataFiles()
const {
574 return fileIndex_.count(page_size_);
577 size_t CachingFileMgr::getNumMetaFiles()
const {
579 return fileIndex_.count(metadata_page_size_);
582 std::vector<ChunkKey> CachingFileMgr::getChunkKeysForPrefix(
585 std::vector<ChunkKey> chunks;
586 for (
auto [key, buf] : chunkIndex_) {
588 if (buf->hasDataPages()) {
589 chunks.emplace_back(key);
597 void CachingFileMgr::removeChunkKeepMetadata(
const ChunkKey& key) {
598 if (isBufferOnDevice(key)) {
599 auto chunkIt = chunkIndex_.find(key);
600 CHECK(chunkIt != chunkIndex_.end());
601 auto& buf = chunkIt->second;
602 if (buf->hasDataPages()) {
603 buf->freeChunkPages();
604 chunk_evict_alg_.removeChunk(key);
609 size_t CachingFileMgr::getNumChunksWithMetadata()
const {
612 for (
const auto& [key, buf] : chunkIndex_) {
613 if (buf->hasEncoder()) {
620 std::string CachingFileMgr::dumpKeysWithMetadata()
const {
622 std::string ret_string =
"CFM keys with metadata:\n";
623 for (
const auto& [key, buf] : chunkIndex_) {
624 if (buf->hasEncoder()) {
631 std::string CachingFileMgr::dumpKeysWithChunkData()
const {
633 std::string ret_string =
"CFM keys with chunk data:\n";
634 for (
const auto& [key, buf] : chunkIndex_) {
635 if (buf->hasDataPages()) {
642 std::unique_ptr<CachingFileMgr> CachingFileMgr::reconstruct()
const {
644 DiskCacheLevel::none,
648 metadata_page_size_};
649 return std::make_unique<CachingFileMgr>(config);
652 void CachingFileMgr::deleteWrapperFile(int32_t db, int32_t tb) {
654 auto it = table_dirs_.find({db, tb});
655 CHECK(it != table_dirs_.end()) <<
"Wrapper does not exist.";
656 it->second->deleteWrapperFile();
659 void CachingFileMgr::writeWrapperFile(
const std::string& doc, int32_t db, int32_t tb) {
660 createTableFileMgrIfNoneExists(db, tb);
661 auto wrapper_size = doc.size();
662 CHECK_LE(wrapper_size, getMaxWrapperSize())
663 <<
"Wrapper is too big to fit into the cache";
664 while (wrapper_size > getAvailableWrapperSpace()) {
665 evictMetadataPages();
668 table_dirs_.at({db, tb})->writeWrapperFile(doc);
671 bool CachingFileMgr::hasWrapperFile(int32_t db_id, int32_t table_id)
const {
673 auto it = table_dirs_.find({db_id, table_id});
674 if (it != table_dirs_.end()) {
675 return it->second->hasWrapperFile();
689 void CachingFileMgr::setMaxSizes() {
690 size_t max_meta_space = std::floor(max_size_ * METADATA_SPACE_PERCENTAGE);
691 size_t max_meta_file_space = std::floor(max_size_ * METADATA_FILE_SPACE_PERCENTAGE);
692 max_wrapper_space_ = max_meta_space - max_meta_file_space;
693 auto max_data_space = max_size_ - max_meta_space;
694 auto meta_file_size = metadata_page_size_ * num_pages_per_metadata_file_;
695 auto data_file_size = page_size_ * num_pages_per_data_file_;
696 max_num_data_files_ = max_data_space / data_file_size;
697 max_num_meta_files_ = max_meta_file_space / meta_file_size;
698 CHECK_GT(max_num_data_files_, 0U) <<
"Cannot create a cache of size " << max_size_
699 <<
". Not enough space to create a data file.";
700 CHECK_GT(max_num_meta_files_, 0U) <<
"Cannot create a cache of size " << max_size_
701 <<
". Not enough space to create a metadata file.";
704 std::optional<FileBuffer*> CachingFileMgr::getBufferIfExists(
const ChunkKey& key) {
706 auto chunk_it = chunkIndex_.find(key);
707 if (chunk_it == chunkIndex_.end()) {
710 return getBufferUnlocked(key);
713 ChunkKeyToChunkMap::iterator CachingFileMgr::deleteBufferUnlocked(
714 const ChunkKeyToChunkMap::iterator chunk_it,
716 removeKey(chunk_it->first);
717 return FileMgr::deleteBufferUnlocked(chunk_it, purge);
720 void CachingFileMgr::getChunkMetadataVecForKeyPrefix(
723 FileMgr::getChunkMetadataVecForKeyPrefix(chunkMetadataVec, keyPrefix);
724 for (
const auto& [key, meta] : chunkMetadataVec) {
730 const size_t num_bytes)
const {
732 return FileMgr::getBufferUnlocked(key, num_bytes);
735 void CachingFileMgr::free_page(std::pair<FileInfo*, int32_t>&& page) {
736 page.first->freePageDeferred(page.second);
739 std::set<ChunkKey> CachingFileMgr::getKeysWithMetadata()
const {
741 std::set<ChunkKey> ret;
742 for (
const auto& [key, buf] : chunkIndex_) {
743 if (buf->hasEncoder()) {
750 size_t CachingFileMgr::getMaxDataFilesSize()
const {
751 if (limit_data_size_) {
752 return *limit_data_size_;
754 return getMaxDataFiles() * getDataFileSize();
757 TableFileMgr::TableFileMgr(
const std::string& table_path)
758 : table_path_(table_path)
759 , epoch_file_path_(table_path_ +
"/" +
FileMgr::EPOCH_FILENAME)
760 , wrapper_file_path_(table_path_ +
"/" +
CachingFileMgr::WRAPPER_FILE_NAME)
762 , is_checkpointed_(
true) {
767 <<
"' for cache table data is not a directory.";
771 <<
"Found epoch file '" <<
epoch_file_path_ <<
"' which is not a regular file";
773 <<
"Found epoch file '" <<
epoch_file_path_ <<
"' which is not of expected size";
788 <<
"Epoch greater than maximum allowed value (" <<
epoch_.
ceiling() <<
" > "
801 CHECK(status == 0) <<
"Could not flush epoch file to disk";
807 CHECK(status == 0) <<
"Could not sync epoch file to disk";
819 for (
const auto& file : bf::recursive_directory_iterator(
table_path_)) {
820 if (bf::is_regular_file(file.path())) {
837 "\". The error was: " + std::strerror(errno)};
851 os <<
"ForeignTables";
858 <<
static_cast<int32_t
>(disk_cache_level);
const size_t metadata_page_size_
size_t getTableFileMgrSpaceReserved(int32_t db_id, int32_t tb_id) const
std::vector< int > ChunkKey
OpenFilesResult openFiles()
std::ostream & operator<<(std::ostream &os, DiskCacheLevel disk_cache_level)
LRUEvictionAlgorithm table_evict_alg_
void removeDiskContent() const
Removes all disk data for the subdir.
const ChunkKey evictNextChunk() override
This file details an extension of the FileMgr that can contain pages from multiple tables (CachingFil...
size_t size_of_dir(const std::string &dir)
heavyai::shared_mutex table_dirs_mutex_
std::string get_dir_name_for_table(int db_id, int tb_id)
A logical page (Page) belongs to a file on disk.
void setMaxSizes()
Sets the maximum number of files/space for each type of storage based on the maximum size...
void writeAndSyncEpochToDisk()
Write and flush the epoch to the epoch file on disk.
~CachingFileMgr() override
DEVICE void sort(ARGS &&...args)
std::string describeSelf() const override
describes this FileMgr for logging purposes.
size_t getSpaceReservedByTable(int32_t db_id, int32_t tb_id) const
void closeRemovePhysical() override
Closes files and removes the caching directory.
static int64_t min_allowable_epoch()
std::string getFileMgrBasePath() const
size_t getMetadataSpaceReservedByTable(int32_t db_id, int32_t tb_id) const
std::pair< FILE *, std::string > create(const std::string &basePath, const int fileId, const size_t pageSize, const size_t numPages)
Represents/provides access to contiguous data stored in the file system.
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
static int64_t max_allowable_epoch()
bool hasWrapperFile() const
std::string fileMgrBasePath_
ChunkKey get_table_key(const ChunkKey &key)
std::string show_chunk(const ChunkKey &key)
CachingFileMgr(const DiskCacheConfig &config)
void freePage(int32_t pageId, const bool isRolloff, int32_t epoch)
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
std::shared_lock< T > shared_lock
std::string dumpEvictionQueue()
void deleteCacheIfTooLarge()
When the cache is read from disk, we don't know which chunks were least recently used. Rather than trying to evict random pages to get down to size, we just reset the cache to make sure we have space.
heavyai::shared_mutex table_mutex_
void incrementAllEpochs()
Increment epochs for each table in the CFM.
constexpr int32_t DELETE_CONTINGENT
A FileInfo type has a file pointer and metadata about a file.
std::string wrapper_file_path_
std::string epoch_file_path_
ChunkKeyToChunkMap chunkIndex_
void removeTableBuffers(int32_t db_id, int32_t tb_id)
Erases and cleans up all buffers for a table.
std::unique_lock< T > unique_lock
void deleteWrapperFile() const
Deletes only the wrapper file on disk.
An AbstractBuffer is a unit of data management for a data manager.
void incrementEpoch()
increment the epoch for this subdir (not synced to disk).
bool g_enable_smem_group_by true
size_t read(FILE *f, const size_t offset, const size_t size, int8_t *buf, const std::string &file_path)
Reads the specified number of bytes from the offset position in file f into buf.
void writeAndSyncEpochToDisk()
void removeTableFileMgr(int32_t db_id, int32_t tb_id)
Removes the subdirectory content for a table.
constexpr int32_t ROLLOFF_CONTINGENT
ChunkKey evict_chunk_or_fail(LRUEvictionAlgorithm &alg)
heavyai::shared_mutex chunkIndexMutex_
int32_t maxRollbackEpochs_
void writeWrapperFile(const std::string &doc) const
Writes wrapper file to disk.
static size_t byte_size()
size_t getReservedSpace() const
Returns the disk space used (in bytes) for the subdir.
std::map< TablePair, std::unique_ptr< TableFileMgr > > table_dirs_
FILE * open(int file_id)
Opens the file with the given id; fatal crash on error.
void readTableFileMgrs()
Checks for any sub-directories containing table-specific data and creates epochs from found files...
std::pair< int, int > get_table_prefix(const ChunkKey &key)
void initializeNumThreads(size_t num_reader_threads=0)
void init(const size_t num_reader_threads)
Initializes a CFM, parsing any existing files and initializing data structures appropriately (current...
heavyai::shared_mutex files_rw_mutex_
void closePhysicalUnlocked()
bool in_same_table(const ChunkKey &left_key, const ChunkKey &right_key)
size_t num_reader_threads
std::pair< const int32_t, const int32_t > TablePair
unsigned nextFileId_
number of threads used when loading data
size_t getChunkSpaceReservedByTable(int32_t db_id, int32_t tb_id) const
size_t file_size(const int fd)
int32_t getEpoch() const
Returns the current epoch (locked)
std::string getTableFileMgrPath(int32_t db, int32_t tb) const
void clearForTable(int32_t db_id, int32_t tb_id)
Removes all data related to the given table (pages and subdirectories).
A FileMgr capable of limiting its size and storing data from multiple tables in a shared directory...
LRUEvictionAlgorithm chunk_evict_alg_
FileBuffer * createBufferFromHeaders(const ChunkKey &key, const std::vector< HeaderInfo >::const_iterator &startIt, const std::vector< HeaderInfo >::const_iterator &endIt) override
Creates a buffer and initializes it with info read from files on disk.