34 #include <boost/filesystem.hpp>
35 #include <boost/lexical_cast.hpp>
36 #include <boost/system/error_code.hpp>
49 namespace File_Namespace {
51 FileMgr::FileMgr(
const int32_t device_id,
54 const int32_t max_rollback_epochs,
55 const size_t num_reader_threads,
57 : AbstractBufferMgr(device_id)
58 , maxRollbackEpochs_(max_rollback_epochs)
61 , fileMgrKey_(file_mgr_key)
62 , page_size_(gfm->getPageSize())
63 , metadata_page_size_(gfm->getMetadataPageSize()) {
64 init(num_reader_threads, epoch);
71 const bool run_core_init)
72 : AbstractBufferMgr(device_id)
73 , maxRollbackEpochs_(-1)
76 , fileMgrKey_(file_mgr_key)
77 , page_size_(gfm->getPageSize())
78 , metadata_page_size_(gfm->getMetadataPageSize()) {
80 const std::string fileMgrDirPrefix(
"table");
81 const std::string fileMgrDirDelim(
"_");
93 : AbstractBufferMgr(0)
94 , maxRollbackEpochs_(-1)
95 , fileMgrBasePath_(base_path)
99 , page_size_(gfm->getPageSize())
100 , metadata_page_size_(gfm->getMetadataPageSize()) {
106 : AbstractBufferMgr(-1)
114 : AbstractBufferMgr(0)
115 , page_size_(page_size)
116 , metadata_page_size_(metadata_page_size) {}
121 delete chunkIt->second;
139 const std::string fileMgrDirPrefix(
"table");
140 const std::string FileMgrDirDelim(
"_");
145 if (boost::filesystem::exists(path)) {
146 if (!boost::filesystem::is_directory(path)) {
148 <<
"' for table data is not a directory.";
158 const boost::filesystem::directory_iterator& fileIterator)
const {
161 fileMetadata.
file_path = fileIterator->path().string();
162 if (!boost::filesystem::is_regular_file(fileIterator->status())) {
167 std::string extension(fileIterator->path().extension().string());
169 std::string fileStem(fileIterator->path().stem().string());
171 if (fileStem.size() > 0 && fileStem.back() ==
'.') {
172 fileStem = fileStem.substr(0, fileStem.size() - 1);
174 size_t dotPos = fileStem.find_last_of(
".");
175 if (dotPos == std::string::npos) {
176 LOG(
FATAL) <<
"File `" << fileIterator->path()
177 <<
"` does not carry page size information in the filename.";
180 fileMetadata.
file_id = boost::lexical_cast<
int>(fileStem.substr(0, dotPos));
182 boost::lexical_cast<
size_t>(fileStem.substr(dotPos + 1, fileStem.size()));
202 boost::filesystem::directory_iterator
206 int32_t file_count = 0;
207 int32_t thread_count = std::thread::hardware_concurrency();
208 std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
210 for (boost::filesystem::directory_iterator file_it(path); file_it != end_itr;
216 std::vector<HeaderInfo> temp_header_vec;
222 return temp_header_vec;
225 if (file_count % thread_count == 0) {
236 if (file_futures.size() > 0) {
240 int64_t queue_time_ms =
timer_stop(clock_begin);
241 LOG(
INFO) <<
"Completed Reading table's file metadata, Elapsed time : " << queue_time_ms
242 <<
"ms Epoch: " <<
epoch_.
ceiling() <<
" files read: " << file_count
252 void FileMgr::init(
const size_t num_reader_threads,
const int32_t epochOverride) {
257 if (epochOverride != -1) {
262 if (!open_files_result.compaction_status_file_name.empty()) {
266 CHECK(open_files_result.compaction_status_file_name.empty());
273 auto& header_vec = open_files_result.header_infos;
274 std::sort(header_vec.begin(), header_vec.end());
280 VLOG(3) <<
"Number of Headers in Vector: " << header_vec.size();
281 if (header_vec.size() > 0) {
282 ChunkKey lastChunkKey = header_vec.begin()->chunkKey;
283 auto startIt = header_vec.begin();
285 for (
auto headerIt = header_vec.begin() + 1; headerIt != header_vec.end();
287 if (headerIt->chunkKey != lastChunkKey) {
289 lastChunkKey = headerIt->chunkKey;
303 if (!boost::filesystem::create_directory(path)) {
304 LOG(
FATAL) <<
"Could not create data directory: " << path;
307 if (epochOverride != -1) {
327 size_t metadata_page_size,
328 size_t num_pages_per_metadata_file) {
329 return (file_size == (metadata_page_size * num_pages_per_metadata_file) &&
330 page_size == metadata_page_size);
340 return storage_stats;
348 if (boost::filesystem::exists(path)) {
349 if (!boost::filesystem::is_directory(path)) {
351 <<
"' for table data is not a directory.";
356 boost::filesystem::directory_iterator
358 for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr;
385 for (
const auto& file_info_entry :
files_) {
386 const auto file_info = file_info_entry.second.get();
393 file_info->pageSize * file_info->numPages;
396 file_info->freePages.size();
409 std::set<int32_t> fragment_ids;
410 for (
const auto& [chunk_key, file_buffer] :
chunkIndex_) {
413 return static_cast<uint32_t
>(fragment_ids.size());
417 std::vector<std::future<std::vector<HeaderInfo>>>& file_futures,
418 std::vector<HeaderInfo>& headerVec) {
419 for (
auto& file_future : file_futures) {
423 for (
auto& file_future : file_futures) {
424 auto tempHeaderVec = file_future.get();
425 headerVec.insert(headerVec.end(), tempHeaderVec.begin(), tempHeaderVec.end());
427 file_futures.clear();
431 const int32_t epochOverride) {
432 int32_t converted_data_epoch = 0;
433 boost::filesystem::path path(dataPathToConvertFrom);
434 if (boost::filesystem::exists(path)) {
435 if (!boost::filesystem::is_directory(path)) {
436 LOG(
FATAL) <<
"Specified path `" << path <<
"` is not a directory.";
440 if (epochOverride != -1) {
444 boost::filesystem::directory_iterator
446 int32_t maxFileId = -1;
447 int32_t fileCount = 0;
448 int32_t threadCount = std::thread::hardware_concurrency();
449 std::vector<HeaderInfo> headerVec;
450 std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
451 for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr; ++fileIt) {
454 maxFileId = std::max(maxFileId, fileMetadata.
file_id);
456 std::vector<HeaderInfo> tempHeaderVec;
462 return tempHeaderVec;
465 if (fileCount % threadCount) {
471 if (file_futures.size() > 0) {
480 std::sort(headerVec.begin(), headerVec.end());
486 if (headerVec.size() > 0) {
487 ChunkKey lastChunkKey = headerVec.begin()->chunkKey;
488 auto startIt = headerVec.begin();
490 for (
auto headerIt = headerVec.begin() + 1; headerIt != headerVec.end();
492 if (headerIt->chunkKey != lastChunkKey) {
497 auto destBuf = c_fm_->
createBuffer(lastChunkKey, srcBuf->pageSize());
499 destBuf->setSize(srcBuf->size());
503 size_t totalNumPages = srcBuf->getMultiPage().size();
504 for (
size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
505 Page srcPage = srcBuf->getMultiPage()[pageNum].current().page;
510 multiPage.
push(destPage, converted_data_epoch);
511 destBuf->multiPages_.push_back(multiPage);
512 size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
514 srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
515 destBuf->writeHeader(destPage, pageNum, converted_data_epoch,
false);
517 lastChunkKey = headerIt->chunkKey;
526 auto destBuf = c_fm_->
createBuffer(lastChunkKey, srcBuf->pageSize());
528 destBuf->setSize(srcBuf->size());
532 size_t totalNumPages = srcBuf->getMultiPage().size();
533 for (
size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
534 Page srcPage = srcBuf->getMultiPage()[pageNum].current().page;
539 multiPage.
push(destPage, converted_data_epoch);
540 destBuf->multiPages_.push_back(multiPage);
541 size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
542 copyPage(srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
543 destBuf->writeHeader(destPage, pageNum, converted_data_epoch,
false);
549 if (!boost::filesystem::create_directory(path)) {
550 LOG(
FATAL) <<
"Specified path does not exist: " << path;
557 for (
const auto& [idx, file_info] :
files_) {
560 file_info->f =
nullptr;
589 const size_t reservedHeaderSize,
590 const size_t numBytes,
591 const size_t offset) {
595 int8_t* buffer =
reinterpret_cast<int8_t*
>(
checked_malloc(numBytes));
598 size_t bytesRead = srcFileInfo->
read(
600 CHECK(bytesRead == numBytes);
601 size_t bytesWritten = destFileInfo->
write(
603 CHECK(bytesWritten == numBytes);
609 if (boost::filesystem::exists(epochFilePath)) {
610 LOG(
FATAL) <<
"Epoch file '" << epochFilePath <<
"' already exists";
620 if (!boost::filesystem::exists(epochFilePath)) {
624 if (!boost::filesystem::is_regular_file(epochFilePath)) {
625 LOG(
FATAL) <<
"Epoch file `" << epochFilePath <<
"` is not a regular file";
628 LOG(
FATAL) <<
"Epoch file `" << epochFilePath
629 <<
"` is not sized properly (current size: "
632 FILE* legacyEpochFile =
open(epochFilePath);
634 read(legacyEpochFile, 0,
sizeof(int32_t), (int8_t*)&epoch, epochFilePath);
635 close(legacyEpochFile);
642 if (!boost::filesystem::exists(epochFilePath)) {
643 LOG(
FATAL) <<
"Epoch file `" << epochFilePath <<
"` does not exist";
645 if (!boost::filesystem::is_regular_file(epochFilePath)) {
646 LOG(
FATAL) <<
"Epoch file `" << epochFilePath <<
"` is not a regular file";
649 LOG(
FATAL) <<
"Epoch file `" << epochFilePath
650 <<
"` is not sized properly (current size: "
663 CHECK(status == 0) <<
"Could not flush epoch file to disk";
669 CHECK(status == 0) <<
"Could not sync epoch file to disk";
679 const int32_t min_epoch,
682 for (
auto chunkIt = lower_bound; chunkIt !=
upper_bound; ++chunkIt) {
683 chunkIt->second->freePagesBeforeEpoch(min_epoch);
693 if (should_checkpoint) {
717 const size_t page_size,
718 const size_t num_bytes) {
721 <<
"Chunk already exists: " +
show_chunk(key);
727 const size_t page_size,
728 const size_t num_bytes) {
729 size_t actual_page_size = page_size;
730 if (actual_page_size == 0) {
739 const std::vector<HeaderInfo>::const_iterator& headerStartIt,
740 const std::vector<HeaderInfo>::const_iterator& headerEndIt) {
743 <<
"Chunk already exists for key: " <<
show_chunk(key);
761 const ChunkKeyToChunkMap::iterator chunk_it,
764 chunk_it->second->freePages();
766 delete chunk_it->second;
777 std::search(chunkIt->first.begin(),
778 chunkIt->first.begin() + keyPrefix.size(),
780 keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
791 const size_t num_bytes)
const {
794 return chunk_it->second;
799 const size_t numBytes) {
803 <<
"Aborting attempt to fetch a chunk marked dirty. Chunk inconsistency for key: "
808 if (numBytes > 0 && numBytes > chunk->
size()) {
810 << chunk->
size() <<
") than number of bytes requested (" << numBytes
813 chunk->
copyTo(destBuffer, numBytes);
818 const size_t numBytes) {
820 size_t oldChunkSize = chunk->size();
822 size_t newChunkSize = (numBytes == 0) ? srcBuffer->
size() : numBytes;
823 if (chunk->isDirty()) {
826 if (srcBuffer->
isUpdated() && chunk->isUpdated()) {
827 LOG(
FATAL) <<
"Aborting attempt to write a chunk marked dirty. Chunk inconsistency "
832 CHECK(srcBuffer->
isDirty()) <<
"putBuffer expects a dirty buffer";
838 if (0 == numBytes && !chunk->isDirty()) {
839 chunk->setSize(newChunkSize);
849 CHECK_LT(oldChunkSize, newChunkSize);
850 chunk->append((int8_t*)srcBuffer->
getMemoryPtr() + oldChunkSize,
851 newChunkSize - oldChunkSize,
858 <<
"Dirty buffer with size > 0 must be marked as isAppended() or isUpdated()";
864 chunk->syncEncoder(srcBuffer);
869 LOG(
FATAL) <<
"Operation not supported";
874 LOG(
FATAL) <<
"Operation not supported";
880 auto candidateFiles =
fileIndex_.equal_range(pageSize);
881 int32_t pageNum = -1;
882 for (
auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
897 CHECK(pageNum != -1);
903 std::vector<Page>& pages,
904 const bool isMetadata) {
908 auto candidateFiles =
fileIndex_.equal_range(pageSize);
909 size_t numPagesNeeded = numPagesRequested;
910 for (
auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
916 pages.emplace_back(fileInfo->
fileId, pageNum);
919 }
while (pageNum != -1 && numPagesNeeded > 0);
920 if (numPagesNeeded == 0) {
924 while (numPagesNeeded > 0) {
935 pages.emplace_back(fileInfo->
fileId, pageNum);
938 }
while (pageNum != -1 && numPagesNeeded > 0);
939 if (numPagesNeeded == 0) {
943 CHECK(pages.size() == numPagesRequested);
948 const size_t pageSize,
949 const size_t numPages,
950 std::vector<HeaderInfo>& headerVec) {
951 FILE*
f =
open(path);
952 auto file_info = std::make_unique<FileInfo>(
this,
960 file_info->openExistingFile(headerVec);
963 files_.emplace(fileId, std::move(file_info));
964 fileIndex_.insert(std::pair<size_t, int32_t>(pageSize, fileId));
972 if (pageSize == 0 || numPages == 0) {
973 LOG(
FATAL) <<
"File creation failed: pageSize and numPages must be greater than 0.";
986 auto fInfo = std::make_unique<FileInfo>(
this,
997 files_[fileId] = std::move(fInfo);
998 fileIndex_.insert(std::pair<size_t, int32_t>(pageSize, fileId));
1005 CHECK(
files_.find(fileId) !=
files_.end()) <<
"File does not exist for id: " << fileId;
1006 return files_.at(fileId)->f;
1011 auto chunk_it =
chunkIndex_.lower_bound(key_prefix);
1016 std::mismatch(key_prefix.begin(), key_prefix.end(), chunk_it->first.begin());
1017 return it_pair.first == key_prefix.end();
1024 auto chunkIt =
chunkIndex_.lower_bound(keyPrefix);
1029 std::search(chunkIt->first.begin(),
1030 chunkIt->first.begin() + keyPrefix.size(),
1032 keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
1033 if (chunkIt->second->hasEncoder()) {
1034 auto chunk_metadata = std::make_shared<ChunkMetadata>();
1035 chunkIt->second->encoder_->getMetadata(chunk_metadata);
1036 chunkMetadataVec.emplace_back(chunkIt->first, chunk_metadata);
1046 return chunkIt->second->numMetadataPages();
1048 throw std::runtime_error(
"Chunk was not found.");
1060 if (file_version > gfm_version) {
1061 LOG(
FATAL) <<
"DB forward compatibility is not supported. Version of HeavyDB "
1062 "software used is older than the version of DB being read: "
1074 const std::string versionFilePath(
fileMgrBasePath_ +
"/" + versionFileName);
1075 if (!boost::filesystem::exists(versionFilePath) ||
1076 !boost::filesystem::is_regular_file(versionFilePath) ||
1080 FILE* versionFile =
open(versionFilePath);
1082 read(versionFile, 0,
sizeof(int32_t), (int8_t*)&version, versionFilePath);
1089 const std::string versionFilePath(
fileMgrBasePath_ +
"/" + versionFileName);
1092 if (boost::filesystem::exists(versionFilePath)) {
1094 LOG(
INFO) <<
"Storage version file `" << versionFilePath
1095 <<
"` already exists, its current version is " << oldVersion;
1096 versionFile =
open(versionFilePath);
1098 versionFile =
create(versionFilePath,
sizeof(int32_t));
1100 write(versionFile, 0,
sizeof(int32_t), (int8_t*)&version);
1101 int32_t status = fflush(versionFile);
1103 LOG(
FATAL) <<
"Could not flush version file " << versionFilePath <<
" to disk";
1111 LOG(
FATAL) <<
"Could not sync version file " << versionFilePath <<
" to disk";
1118 LOG(
INFO) <<
"Migrating file format version from 0 to 1 for `" << versionFilePath;
1124 int32_t migrationCompleteVersion = 1;
1129 LOG(
INFO) <<
"Migrating file format version from 1 to 2";
1132 constexpr int32_t migration_complete_version{2};
1137 std::map<boost::filesystem::path, boost::filesystem::path> old_to_new_paths;
1138 for (boost::filesystem::directory_iterator it(table_data_dir), end_it; it != end_it;
1140 const auto old_path = boost::filesystem::canonical(it->path());
1141 if (boost::filesystem::is_regular_file(it->status()) &&
1143 auto new_path = old_path;
1145 old_to_new_paths[old_path] = new_path;
1148 for (
const auto& [old_path, new_path] : old_to_new_paths) {
1149 boost::filesystem::rename(old_path, new_path);
1150 LOG(
INFO) <<
"Rebrand migration: Renamed " << old_path <<
" to " << new_path;
1151 boost::filesystem::create_symlink(new_path.filename(), old_path);
1152 LOG(
INFO) <<
"Rebrand migration: Added symlink from " << old_path <<
" to "
1153 << new_path.filename();
1164 <<
"Table storage forward compatibility is not supported. Version of HeavyDB "
1165 "software used is older than the version of table being read: "
1201 std::stringstream error_message;
1202 error_message <<
"Cannot set epoch for " <<
describeSelf()
1203 <<
" lower than the minimum rollback epoch (" <<
epoch_.
floor() <<
").";
1204 throw std::runtime_error(error_message.str());
1230 CHECK(boost::filesystem::exists(file_path));
1231 boost::filesystem::remove(file_path);
1245 UNREACHABLE() <<
"Unexpected status file name: " << status_file_name;
1286 CHECK(!boost::filesystem::exists(copy_pages_status_file_path));
1287 std::ofstream status_file(copy_pages_status_file_path.string(),
1288 std::ios::out | std::ios::binary);
1289 status_file.close();
1291 std::vector<PageMapping> page_mappings;
1292 std::set<Page> touched_pages;
1293 std::set<size_t> page_sizes;
1294 for (
const auto& [file_id, file_info] :
files_) {
1295 page_sizes.emplace(file_info->pageSize);
1297 for (
auto page_size : page_sizes) {
1317 std::vector<PageMapping>& page_mappings,
1318 std::set<Page>& touched_pages) {
1319 std::vector<FileInfo*> sorted_file_infos;
1320 auto range =
fileIndex_.equal_range(page_size);
1321 for (
auto it = range.first; it != range.second; it++) {
1322 sorted_file_infos.emplace_back(
files_.at(it->second).get());
1324 if (sorted_file_infos.empty()) {
1331 sorted_file_infos.end(),
1333 return file_1->
freePages.size() < file_2->freePages.size();
1336 size_t destination_index = 0, source_index = sorted_file_infos.size() - 1;
1339 while (destination_index < source_index &&
1340 sorted_file_infos[destination_index]->
freePages.empty()) {
1341 destination_index++;
1345 while (destination_index < source_index &&
1346 sorted_file_infos[source_index]->
freePages.size() ==
1347 sorted_file_infos[source_index]->numPages) {
1351 std::set<size_t> source_used_pages;
1352 CHECK(destination_index <= source_index);
1355 int64_t total_free_pages{0};
1356 for (
size_t i = destination_index; i <= source_index; i++) {
1357 total_free_pages += sorted_file_infos[i]->numFreePages();
1360 while (destination_index < source_index) {
1361 if (source_used_pages.empty()) {
1363 auto source_file_info = sorted_file_infos[source_index];
1364 auto& free_pages = source_file_info->freePages;
1365 for (
size_t page_num = 0; page_num < source_file_info->numPages; page_num++) {
1366 if (free_pages.find(page_num) == free_pages.end()) {
1367 source_used_pages.emplace(page_num);
1372 total_free_pages -= source_file_info->numFreePages();
1376 if (total_free_pages - static_cast<int64_t>(source_used_pages.size()) < 0) {
1381 auto dest_file_info = sorted_file_infos[destination_index];
1382 while (!source_used_pages.empty() && !dest_file_info->freePages.empty()) {
1384 size_t source_page_num = *source_used_pages.begin();
1385 source_used_pages.erase(source_page_num);
1387 Page source_page{sorted_file_infos[source_index]->
fileId, source_page_num};
1389 sorted_file_infos[destination_index],
1395 if (source_used_pages.empty()) {
1399 if (dest_file_info->freePages.empty()) {
1400 destination_index++;
1413 std::vector<PageMapping>& page_mappings,
1414 std::set<Page>& touched_pages) {
1415 size_t destination_page_num = destination_file_info->
getFreePage();
1416 CHECK_NE(destination_page_num, static_cast<size_t>(-1));
1417 Page destination_page{destination_file_info->
fileId, destination_page_num};
1420 CHECK(touched_pages.find(source_page) == touched_pages.end());
1421 touched_pages.emplace(source_page);
1423 CHECK(touched_pages.find(destination_page) == touched_pages.end());
1424 touched_pages.emplace(destination_page);
1427 page_mappings.emplace_back(static_cast<size_t>(source_page.
fileId),
1430 static_cast<size_t>(destination_page.fileId),
1431 destination_page.pageNum);
1442 const Page& destination_page) {
1444 CHECK(source_file_info);
1448 CHECK(destination_file_info);
1452 auto page_size = source_file_info->
pageSize;
1453 auto buffer = std::make_unique<int8_t[]>(page_size);
1455 source_file_info->
read(source_page.
pageNum * page_size, page_size, buffer.get());
1458 auto header_size_offset =
sizeof(int32_t);
1459 size_t bytes_written = destination_file_info->
write(
1460 (destination_page.
pageNum * page_size) + header_size_offset,
1461 page_size - header_size_offset,
1462 buffer.get() + header_size_offset);
1463 CHECK_EQ(page_size - header_size_offset, bytes_written);
1464 return reinterpret_cast<int32_t*
>(buffer.get())[0];
1472 for (
const auto& page_mapping : page_mappings) {
1476 auto header_size = page_mapping.source_page_header_size;
1478 destination_file->write(
1479 page_mapping.destination_page_num * destination_file->pageSize,
1481 reinterpret_cast<int8_t*>(&header_size));
1486 source_file->write(page_mapping.source_page_num * source_file->pageSize,
1488 reinterpret_cast<int8_t*>(&free_page_header_size));
1489 source_file->freePageDeferred(page_mapping.source_page_num);
1492 for (
const auto& file_info_entry :
files_) {
1493 int32_t status = file_info_entry.second->syncToDisk();
1495 LOG(
FATAL) <<
"Could not sync file to disk";
1505 for (
const auto& [file_id, file_info] :
files_) {
1506 CHECK_EQ(file_id, file_info->fileId);
1507 if (file_info->freePages.size() == file_info->numPages) {
1508 fclose(file_info->f);
1509 file_info->f =
nullptr;
1513 boost::filesystem::remove(file_path);
1518 CHECK(boost::filesystem::exists(status_file_path));
1520 boost::filesystem::remove(status_file_path);
1529 const std::vector<PageMapping>& page_mappings) {
1534 CHECK(boost::filesystem::exists(file_path));
1535 CHECK(boost::filesystem::is_empty(file_path));
1536 std::ofstream status_file{file_path.string(), std::ios::out | std::ios::binary};
1537 int64_t page_mappings_count = page_mappings.size();
1538 status_file.write(reinterpret_cast<const char*>(&page_mappings_count),
sizeof(int64_t));
1539 status_file.write(reinterpret_cast<const char*>(page_mappings.data()),
1541 status_file.close();
1549 CHECK(boost::filesystem::exists(file_path));
1550 std::ifstream status_file{file_path.string(),
1551 std::ios::in | std::ios::binary | std::ios::ate};
1552 CHECK(status_file.is_open());
1554 status_file.seekg(0, std::ios::beg);
1555 CHECK_GE(file_size,
sizeof(int64_t));
1557 int64_t page_mappings_count;
1558 status_file.read(reinterpret_cast<char*>(&page_mappings_count),
sizeof(int64_t));
1559 auto page_mappings_byte_size = file_size -
sizeof(int64_t);
1561 CHECK_EQ(static_cast<size_t>(page_mappings_count),
1564 std::vector<PageMapping> page_mappings(page_mappings_count);
1565 status_file.read(reinterpret_cast<char*>(page_mappings.data()),
1566 page_mappings_byte_size);
1567 status_file.close();
1568 return page_mappings;
1575 const char*
const to_status) {
1576 auto from_status_file_path =
getFilePath(from_status);
1577 auto to_status_file_path =
getFilePath(to_status);
1579 readOnlyCheck(
"write file", from_status_file_path.string());
1581 CHECK(boost::filesystem::exists(from_status_file_path));
1582 CHECK(!boost::filesystem::exists(to_status_file_path));
1583 boost::filesystem::rename(from_status_file_path, to_status_file_path);
1598 for (
const auto& file_info_entry :
files_) {
1599 int32_t status = file_info_entry.second->syncToDisk();
1600 CHECK(status == 0) <<
"Could not sync file to disk";
1606 size_t num_hardware_based_threads = std::thread::hardware_concurrency();
1607 if (num_reader_threads == 0 || num_reader_threads > num_hardware_based_threads) {
1620 free_pages_.clear();
1625 const size_t num_bytes) {
1626 return new FileBuffer(
this, page_size, key, num_bytes);
1631 const std::vector<HeaderInfo>::const_iterator& headerStartIt,
1632 const std::vector<HeaderInfo>::const_iterator& headerEndIt) {
1633 return new FileBuffer(
this, key, headerStartIt, headerEndIt);
1648 auto table_epoch =
epoch(db_id, tb_id);
1685 if (buf->isDirty()) {
1686 buf->writeMetadata(
epoch());
1687 buf->clearDirtyBits();
1702 const size_t requested_file_size)
const {
1704 return create(full_path, requested_file_size);
1709 const size_t page_size,
1710 const size_t num_pages)
const {
1712 return create(base_path, file_id, page_size, num_pages);
1716 const size_t offset,
1718 const int8_t* buf)
const {
1720 return write(f, offset, size, buf);
1724 const std::optional<std::string>& file_name)
const {
1726 << (file_name.has_value() ? (
": '" + file_name.value() +
"'") :
"")
1727 <<
". Not allowed in read only mode.";
DEVICE auto upper_bound(ARGS &&...args)
int32_t openAndReadLegacyEpochFile(const std::string &epochFileName)
const size_t metadata_page_size_
std::vector< PageMapping > readPageMappingsFromStatusFile()
virtual FileBuffer * createBufferFromHeaders(const ChunkKey &key, const std::vector< HeaderInfo >::const_iterator &headerStartIt, const std::vector< HeaderInfo >::const_iterator &headerEndIt)
int32_t readVersionFromDisk(const std::string &versionFileName) const
std::vector< int > ChunkKey
OpenFilesResult openFiles()
AbstractBuffer * alloc(const size_t numBytes) override
size_t write(const size_t offset, const size_t size, const int8_t *buf)
TablePair fileMgrKey_
Global FileMgr.
uint64_t total_data_page_count
std::string getBasePath() const
FileMgr(const int32_t device_id, GlobalFileMgr *gfm, const TablePair file_mgr_key, const int32_t max_rollback_epochs=-1, const size_t num_reader_threads=0, const int32_t epoch=-1)
Constructor.
static constexpr int32_t LATEST_FILE_MGR_VERSION
void createEpochFile(const std::string &epochFileName)
bool is_page_deleted_without_checkpoint(int32_t table_epoch, int32_t page_epoch, int32_t contingent)
virtual Page requestFreePage(size_t pagesize, const bool isMetadata)
size_t getNumUsedMetadataPagesForChunkKey(const ChunkKey &chunkKey) const
void syncEncoder(const AbstractBuffer *src_buffer)
std::string get_legacy_data_file_path(const std::string &new_data_file_path)
FileBuffer * createBuffer(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0) override
Creates a chunk with the specified key and page size.
void sortAndCopyFilePagesForCompaction(size_t page_size, std::vector< PageMapping > &page_mappings, std::set< Page > &touched_pages)
uint32_t metadata_file_count
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
std::vector< HeaderInfo > header_infos
virtual FileBuffer * allocateBuffer(const size_t page_size, const ChunkKey &key, const size_t num_bytes=0)
A logical page (Page) belongs to a file on disk.
FileInfo * createFileInfo(const size_t pageSize, const size_t numPages)
Adds a file to the file manager repository.
#define CHUNK_KEY_FRAGMENT_IDX
void copySourcePageForCompaction(const Page &source_page, FileInfo *destination_file_info, std::vector< PageMapping > &page_mappings, std::set< Page > &touched_pages)
virtual int8_t * getMemoryPtr()=0
void free(AbstractBuffer *buffer) override
virtual MemoryLevel getType() const =0
void freePageImmediate(int32_t page_num)
void copyPage(Page &srcPage, FileMgr *destFileMgr, Page &destPage, const size_t reservedHeaderSize, const size_t numBytes, const size_t offset)
void setDataAndMetadataFileStats(StorageStats &storage_stats) const
void migrateToLatestFileMgrVersion()
void rollOffOldData(const int32_t epochCeiling, const bool shouldCheckpoint)
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
static constexpr char const * UPDATE_PAGE_VISIBILITY_STATUS
DEVICE void sort(ARGS &&...args)
void init(const size_t num_reader_threads, const int32_t epochOverride)
TypeR::rep timer_stop(Type clock_begin)
void deleteBuffer(const ChunkKey &key, const bool purge=true) override
Deletes the chunk with the specified key.
bool getDBConvert() const
static int64_t min_allowable_epoch()
std::string getFileMgrBasePath() const
bool is_compaction_status_file(const std::string &file_name)
std::mutex getPageMutex_
pointer to DB level metadata
#define DEFAULT_METADATA_PAGE_SIZE
static constexpr int32_t db_version_
virtual bool updatePageIfDeleted(FileInfo *file_info, ChunkKey &chunk_key, int32_t contingent, int32_t page_epoch, int32_t page_num)
deletes or recovers a page based on last checkpointed epoch.
std::optional< uint64_t > total_free_data_page_count
void writePageMappingsToStatusFile(const std::vector< PageMapping > &page_mappings)
std::pair< FILE *, std::string > create(const std::string &basePath, const int fileId, const size_t pageSize, const size_t numPages)
Represents/provides access to contiguous data stored in the file system.
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
int32_t copyPageWithoutHeaderSize(const Page &source_page, const Page &destination_page)
std::string fileMgrBasePath_
static void setNumPagesPerMetadataFile(size_t num_pages)
int32_t lastCheckpointedEpoch() const
Returns value of epoch at last checkpoint.
void fetchBuffer(const ChunkKey &key, AbstractBuffer *destBuffer, const size_t numBytes) override
std::string show_chunk(const ChunkKey &key)
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
static size_t num_pages_per_data_file_
int32_t PageHeaderSizeType
std::shared_lock< T > shared_lock
std::set< size_t > freePages
future< Result > async(Fn &&fn, Args &&...args)
size_t pageSize
file stream object for the represented file
int32_t fileMgrVersion_
the index of the next file id
void writeAndSyncVersionToDisk(const std::string &versionFileName, const int32_t version)
size_t getNumChunks() override
void removeTableRelatedDS(const int32_t db_id, const int32_t table_id) override
void processFileFutures(std::vector< std::future< std::vector< HeaderInfo >>> &file_futures, std::vector< HeaderInfo > &headerVec)
FileBuffer * getOrCreateBuffer(const ChunkKey &key)
void * checked_malloc(const size_t size)
static void setNumPagesPerDataFile(size_t num_pages)
std::optional< uint64_t > total_free_metadata_page_count
void freePagesBeforeEpoch(const int32_t min_epoch)
ChunkKeyToChunkMap chunkIndex_
PageSizeFileMMap fileIndex_
std::unique_lock< T > unique_lock
virtual ChunkKeyToChunkMap::iterator deleteBufferUnlocked(const ChunkKeyToChunkMap::iterator chunk_it, const bool purge=true)
#define CHUNK_KEY_TABLE_IDX
uint64_t total_metadata_page_count
FILE * getFileForFileId(const int32_t fileId)
Returns FILE pointer associated with requested fileId.
std::vector< std::pair< FileInfo *, int32_t > > free_pages_
static constexpr char FILE_MGR_VERSION_FILENAME[]
An AbstractBuffer is a unit of data management for a data manager.
size_t num_reader_threads_
Maps page sizes to FileInfo objects.
static size_t num_pages_per_metadata_file_
bool isBufferOnDevice(const ChunkKey &key) override
uint32_t getFragmentCount() const
FileInfo * openExistingFile(const std::string &path, const int32_t fileId, const size_t pageSize, const size_t numPages, std::vector< HeaderInfo > &headerVec)
size_t read(FILE *f, const size_t offset, const size_t size, int8_t *buf, const std::string &file_path)
Reads the specified number of bytes from the offset position in file f into buf.
void writeAndSyncEpochToDisk()
boost::filesystem::path getFilePath(const std::string &file_name) const
size_t pageNum
unique identifier of the owning file
#define DEFAULT_PAGE_SIZE
std::string compaction_status_file_name
void deleteBuffersWithPrefix(const ChunkKey &keyPrefix, const bool purge=true) override
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix) override
heavyai::shared_mutex chunkIndexMutex_
const TablePair get_fileMgrKey() const
static constexpr char EPOCH_FILENAME[]
virtual std::string describeSelf() const
virtual FileBuffer * createBufferUnlocked(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0)
heavyai::shared_mutex mutex_free_page_
int32_t maxRollbackEpochs_
FILE * createFile(const std::string &full_path, const size_t requested_file_size) const
void freePagesBeforeEpochUnlocked(const int32_t min_epoch, const ChunkKeyToChunkMap::iterator lower_bound, const ChunkKeyToChunkMap::iterator upper_bound)
DEVICE auto lower_bound(ARGS &&...args)
uint64_t total_metadata_file_size
virtual void readOnlyCheck(const std::string &action, const std::optional< std::string > &file_name={}) const
void migrateEpochFileV0()
void push(const Page &page, const int epoch)
Pushes a new page with epoch value.
void openAndReadEpochFile(const std::string &epochFileName)
size_t read(const size_t offset, const size_t size, int8_t *buf)
bool epochIsCheckpointed_
bool is_page_deleted_with_checkpoint(int32_t table_epoch, int32_t page_epoch, int32_t contingent)
StorageStats getStorageStats() const
uint64_t total_data_file_size
bool hasChunkMetadataForKeyPrefix(const ChunkKey &keyPrefix)
static size_t byte_size()
static constexpr char const * DELETE_EMPTY_FILES_STATUS
torch::Tensor f(torch::Tensor x, torch::Tensor W_target, torch::Tensor b_target)
~FileMgr() override
Destructor.
FILE * open(int file_id)
Opens the file with the given id; fatal crash on error.
bool getDBConvert() const
Index for looking up chunks.
static void renameAndSymlinkLegacyFiles(const std::string &table_data_dir)
virtual void free_page(std::pair< FileInfo *, int32_t > &&page)
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
virtual void closeRemovePhysical()
AbstractBufferMgr * getFileMgr(const int32_t db_id, const int32_t tb_id)
static constexpr char const * COPY_PAGES_STATUS
FileMetadata getMetadataForFile(const boost::filesystem::directory_iterator &fileIterator) const
void updateMappedPagesVisibility(const std::vector< PageMapping > &page_mappings)
std::optional< uint32_t > fragment_count
FileBuffer * getBuffer(const ChunkKey &key, const size_t numBytes=0) override
Returns a pointer to the chunk with the specified key.
void recoverPage(const ChunkKey &chunk_key, int32_t page_num)
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
virtual FileBuffer * getBufferUnlocked(const ChunkKey &key, const size_t numBytes=0) const
void setEpoch(const int32_t newEpoch)
bool coreInit()
Determines file path, and if exists, runs file migration and opens and reads epoch file...
FileInfo * getFileInfoForFileId(const int32_t fileId) const
void initializeNumThreads(size_t num_reader_threads=0)
void requestFreePages(size_t npages, size_t pagesize, std::vector< Page > &pages, const bool isMetadata)
Obtains free pages – creates new files if necessary – of the requested size.
void createOrMigrateTopLevelMetadata()
heavyai::shared_mutex files_rw_mutex_
void closePhysicalUnlocked()
size_t writeFile(FILE *f, const size_t offset, const size_t size, const int8_t *buf) const
std::pair< const int32_t, const int32_t > TablePair
std::map< int32_t, std::unique_ptr< FileInfo > > files_
std::string get_data_file_path(const std::string &base_path, int file_id, size_t page_size)
void renameForDelete(const std::string directoryName)
Renames a directory to DELETE_ME_<EPOCH>_<oldname>.
unsigned nextFileId_
number of threads used when loading data
int32_t epochFloor() const
bool is_metadata_file(size_t file_size, size_t page_size, size_t metadata_page_size, size_t num_pages_per_metadata_file)
size_t file_size(const int fd)
void renameCompactionStatusFile(const char *const from_status, const char *const to_status)
The MultiPage stores versions of the same logical page in a deque.
constexpr auto kLegacyDataFileExtension
static constexpr char LEGACY_EPOCH_FILENAME[]
A selection of helper methods for File I/O.
FileBuffer * putBuffer(const ChunkKey &key, AbstractBuffer *d, const size_t numBytes=0) override
Puts the contents of d into the Chunk with the given key.
void resumeFileCompaction(const std::string &status_file_name)
static constexpr char DB_META_FILENAME[]
static constexpr int32_t INVALID_VERSION
void migrateLegacyFilesV1()