43 using namespace Data_Namespace;
46 namespace filesystem {
47 class directory_iterator;
51 namespace File_Namespace {
/// Pair of two const int32_t values; it is the type of fileMgrKey_, whose
/// .first / .second are printed as "Epoch for table (first, second)".
/// NOTE(review): presumably (database id, table id) — confirm.
98 using TablePair = std::pair<const int32_t, const int32_t>;
// Statistics fields. Plain counters/sizes are initialised to 0; the
// std::optional members start empty (no value) until something assigns them.
// NOTE(review): the enclosing type is not visible here — presumably
// StorageStats (see setDataAndMetadataFileStats); confirm.
111 int32_t epoch_floor{0};
// Metadata-file figures.
112 uint32_t metadata_file_count{0};
113 uint64_t total_metadata_file_size{0};
114 uint64_t total_metadata_page_count{0};
115 std::optional<uint64_t> total_free_metadata_page_count{};
// Data-file figures (same layout as the metadata figures above).
116 uint32_t data_file_count{0};
117 uint64_t total_data_file_size{0};
118 uint64_t total_data_page_count{0};
119 std::optional<uint64_t> total_free_data_page_count{};
// Optional: empty unless a fragment count has been supplied.
120 std::optional<uint32_t> fragment_count{};
140 size_t source_page_num,
142 int32_t destination_file_id,
143 size_t destination_page_num)
144 : source_file_id(source_file_id)
145 , source_page_num(source_page_num)
146 , source_page_header_size(source_page_header_size)
147 , destination_file_id(destination_file_id)
148 , destination_page_num(destination_page_num) {}
/// Constructor taking a device id followed by optional tuning arguments.
/// Defaults shown here: max_rollback_epochs = -1, num_reader_threads = 0,
/// epoch = -1.
/// NOTE(review): the meaning of the -1 / 0 sentinels is defined by the
/// implementation, not by this declaration — confirm before relying on them.
166 FileMgr(
const int32_t device_id,
169 const int32_t max_rollback_epochs = -1,
170 const size_t num_reader_threads = 0,
171 const int32_t epoch = -1);
/// Constructor overload whose last argument is the boolean run_core_init.
/// NOTE(review): presumably the flag decides whether core initialisation is
/// performed during construction — confirm against the definition.
174 FileMgr(
const int32_t device_id,
177 const bool run_core_init);
188 const size_t numBytes = 0)
override;
/// Override of a base-class query: takes a chunk key and returns a bool.
190 bool isBufferOnDevice(
const ChunkKey& key)
override;
/// Override that deletes the buffer identified by `key`.
/// `purge` defaults to true.
/// NOTE(review): what purging entails is decided by the definition — confirm.
195 void deleteBuffer(
const ChunkKey& key,
const bool purge =
true)
override;
/// Prefix-keyed counterpart of deleteBuffer(); `purge` again defaults to true.
/// NOTE(review): presumably applies to every buffer whose key begins with
/// `keyPrefix` — confirm.
196 void deleteBuffersWithPrefix(
const ChunkKey& keyPrefix,
197 const bool purge =
true)
override;
/// Override taking a chunk key and a byte count; it returns nothing.
/// Unlike the neighbouring declarations, `numBytes` has no default here.
202 void fetchBuffer(
const ChunkKey& key,
204 const size_t numBytes)
override;
214 const size_t numBytes = 0)
override;
/// Virtual: returns a Page (by value) for the requested page size.
/// NOTE(review): `isMetadata` presumably selects whether the page comes from
/// a metadata file or a data file — confirm.
219 virtual Page requestFreePage(
size_t pagesize,
const bool isMetadata);
223 inline std::string
printSlabs()
override {
return "Not Implemented"; }
230 return files_.at(fileId).get();
234 const boost::filesystem::directory_iterator& fileIterator)
const;
/// Copy operation whose source is `srcPage` (taken by non-const reference).
/// Size-related arguments visible here: reservedHeaderSize, numBytes, offset.
/// NOTE(review): units are presumably bytes — confirm.
236 void copyPage(
Page& srcPage,
239 const size_t reservedHeaderSize,
240 const size_t numBytes,
241 const size_t offset);
/// Multi-page counterpart of requestFreePage(): takes a page count and a
/// non-const vector reference.
/// NOTE(review): `pages` is presumably an output that receives the pages — confirm.
256 void requestFreePages(
size_t npages,
258 std::vector<Page>& pages,
259 const bool isMetadata);
262 const ChunkKey& keyPrefix)
override;
/// Boolean query keyed by a chunk-key prefix (declaration only).
/// NOTE(review): presumably true when chunk metadata exists for at least one
/// key with that prefix — confirm.
264 bool hasChunkMetadataForKeyPrefix(
const ChunkKey& keyPrefix);
/// Argument-free checkpoint override. This is the variant that the
/// two-argument overload's log message tells callers to use.
271 void checkpoint()
override;
/// Per-table checkpoint override. Not supported by this class: the body only
/// logs at FATAL severity and points callers to the argument-free checkpoint().
/// Neither db_id nor tb_id is used.
272 void checkpoint(
const int32_t db_id,
const int32_t tb_id)
override {
273 LOG(
FATAL) <<
"Operation not supported, api checkpoint() should be used instead";
281 inline virtual int32_t
epoch(int32_t db_id, int32_t tb_id)
const {
return epoch(); }
283 inline int32_t
epochFloor()
const {
return static_cast<int32_t
>(epoch_.floor()); }
286 int32_t newEpoch = epoch_.increment();
287 epochIsCheckpointed_ =
false;
291 LOG(
FATAL) <<
"Epoch for table (" << fileMgrKey_.first <<
", " << fileMgrKey_.second
292 <<
") greater than maximum allowed value of "
302 return epoch() - (epochIsCheckpointed_ ? 0 : 1);
/// Looks up a C FILE handle by integer file id (declaration only).
325 FILE* getFileForFileId(
const int32_t fileId);
/// Override returning a chunk count as size_t.
327 size_t getNumChunks()
override;
/// Const query returning a size_t for the given chunk key.
/// NOTE(review): per the name, the number of metadata pages in use for that
/// chunk — confirm.
328 size_t getNumUsedMetadataPagesForChunkKey(
const ChunkKey& chunkKey)
const;
/// Const boolean getter.
/// NOTE(review): the flag it reports is not visible here — confirm.
332 bool getDBConvert()
const;
333 void createOrMigrateTopLevelMetadata();
/// Virtual, so subclasses can replace it.
335 virtual void closeRemovePhysical();
/// Override keyed by database id and table id.
337 void removeTableRelatedDS(
const int32_t db_id,
const int32_t table_id)
override;
/// Virtual; takes a (FileInfo*, int32_t) pair by rvalue reference.
/// NOTE(review): the int32_t is presumably a page number within that file — confirm.
339 virtual void free_page(std::pair<FileInfo*, int32_t>&& page);
/// Const; maps a file name to a boost::filesystem::path.
/// NOTE(review): presumably resolved relative to this manager's directory — confirm.
343 boost::filesystem::path getFilePath(
const std::string& file_name)
const;
/// Takes the page mappings by const reference (read-only input).
346 void writePageMappingsToStatusFile(
const std::vector<PageMapping>& page_mappings);
/// Both arguments are const C strings naming the old and new status.
/// NOTE(review): presumably drawn from the *_STATUS constants of this class — confirm.
349 void renameCompactionStatusFile(
const char*
const from_status,
350 const char*
const to_status);
/// Virtual predicate-style hook returning bool; first argument is the
/// FileInfo being examined.
357 virtual bool updatePageIfDeleted(
FileInfo* file_info,
/// Virtual const method returning a std::string description of this object.
372 virtual std::string describeSelf()
const;
/// Const; returns a FILE* for `full_path` given a requested file size.
374 FILE* createFile(
const std::string& full_path,
const size_t requested_file_size)
const;
/// Overload that works from a base path plus page size and page count, and
/// returns the FILE* together with a std::string.
/// NOTE(review): the string is presumably the path of the created file — confirm.
375 std::pair<FILE*, std::string> createFile(
const std::string& base_path,
377 const size_t page_size,
378 const size_t num_pages)
const;
/// Const; takes a FILE* and an int8_t buffer and returns a size_t.
/// NOTE(review): return value is presumably the number of bytes written — confirm.
379 size_t writeFile(FILE*
f,
382 const int8_t* buf)
const;
/// Compile-time page-count defaults: 256 for data files, 4096 for metadata files.
/// NOTE(review): presumably the initial values of num_pages_per_data_file_ /
/// num_pages_per_metadata_file_ — confirm.
384 static constexpr
size_t DEFAULT_NUM_PAGES_PER_DATA_FILE{256};
385 static constexpr
size_t DEFAULT_NUM_PAGES_PER_METADATA_FILE{4096};
/// Compaction status names. All three share the "pending_data_compaction_"
/// prefix and differ only in the trailing 0 / 1 / 2.
/// NOTE(review): the numbering presumably reflects the order of the phases
/// (copy pages, update page visibility, delete empty files) — confirm.
388 static constexpr
char const* COPY_PAGES_STATUS{
"pending_data_compaction_0"};
389 static constexpr
char const* UPDATE_PAGE_VISIBILITY_STATUS{
"pending_data_compaction_1"};
390 static constexpr
char const* DELETE_EMPTY_FILES_STATUS{
"pending_data_compaction_2"};
/// Static setters taking a page count.
/// NOTE(review): presumably they assign the static num_pages_per_data_file_ /
/// num_pages_per_metadata_file_ members, affecting all instances — confirm.
394 static void setNumPagesPerDataFile(
size_t num_pages);
395 static void setNumPagesPerMetadataFile(
size_t num_pages);
/// Static helper operating on a table data directory path (declaration only).
397 static void renameAndSymlinkLegacyFiles(
const std::string& table_data_dir);
/// Well-known file names, stored as constexpr char arrays.
/// "epoch" is the legacy name and "epoch_metadata" the current one.
399 static constexpr
char LEGACY_EPOCH_FILENAME[] =
"epoch";
400 static constexpr
char EPOCH_FILENAME[] =
"epoch_metadata";
401 static constexpr
char DB_META_FILENAME[] =
"dbmeta";
402 static constexpr
char FILE_MGR_VERSION_FILENAME[] =
"filemgr_version";
/// Version constants: -1 marks an invalid version; the latest version is 2.
403 static constexpr int32_t INVALID_VERSION = -1;
404 static constexpr int32_t LATEST_FILE_MGR_VERSION = 2;
/// Constructor taking only the default data-page and metadata-page sizes.
408 FileMgr(
const size_t defaultPageSize,
const size_t defaultMetadataPageSize);
/// FileInfo objects keyed by int32_t file id; each entry is owned through a
/// unique_ptr, so erasing an entry destroys the FileInfo.
412 std::map<int32_t, std::unique_ptr<FileInfo>> files_;
/// C FILE handle, initially nullptr.
418 FILE* DBMetaFile_ =
nullptr;
/// Initially false.
/// NOTE(review): presumably set to true once initialisation completes — confirm.
425 bool isFullyInitted_{
false};
/// Returns a FileInfo* for a file described by page size and page count.
/// NOTE(review): ownership presumably stays with files_ — confirm.
446 FileInfo* createFileInfo(
const size_t pageSize,
const size_t numPages);
/// Opens the file at `path` under the given file id and page geometry.
/// `headerVec` is a non-const reference.
/// NOTE(review): presumably receives the headers found in the file — confirm.
447 FileInfo* openExistingFile(
const std::string& path,
448 const int32_t fileId,
449 const size_t pageSize,
450 const size_t numPages,
451 std::vector<HeaderInfo>& headerVec);
// Epoch-file helpers, each keyed by the epoch file's name. The legacy reader
// returns an int32_t; the current-format reader returns nothing.
452 void createEpochFile(
const std::string& epochFileName);
453 int32_t openAndReadLegacyEpochFile(
const std::string& epochFileName);
454 void openAndReadEpochFile(
const std::string& epochFileName);
455 void writeAndSyncEpochToDisk();
456 void setEpoch(
const int32_t newEpoch);
// Version-file helpers; the reader is const and returns the version as int32_t.
457 int32_t readVersionFromDisk(
const std::string& versionFileName)
const;
458 void writeAndSyncVersionToDisk(
const std::string& versionFileName,
/// Takes a vector of futures, each yielding a vector<HeaderInfo>, plus a
/// HeaderInfo vector; both are non-const references.
/// NOTE(review): presumably waits on each future and appends its headers to
/// `headerVec` — confirm.
460 void processFileFutures(std::vector<std::future<std::vector<HeaderInfo>>>& file_futures,
461 std::vector<HeaderInfo>& headerVec);
464 const size_t numBytes = 0);
467 const std::vector<HeaderInfo>::const_iterator& headerStartIt,
468 const std::vector<HeaderInfo>::const_iterator& headerEndIt);
// Migration helpers (no arguments, no return value).
// NOTE(review): presumably the V0 / V1 steps are driven by
// migrateToLatestFileMgrVersion() until LATEST_FILE_MGR_VERSION is reached — confirm.
471 void migrateToLatestFileMgrVersion();
472 void migrateEpochFileV0();
473 void migrateLegacyFilesV1();
477 void clearFileInfos();
// Compaction helpers. `page_mappings` and `touched_pages` are passed by
// non-const reference where they are accumulated, and by const reference
// where they are only read.
480 void copySourcePageForCompaction(
const Page& source_page,
482 std::vector<PageMapping>& page_mappings,
483 std::set<Page>& touched_pages);
/// Copies between two pages and returns an int32_t.
/// NOTE(review): the return value is presumably the source page's header
/// size — confirm.
484 int32_t copyPageWithoutHeaderSize(
const Page& source_page,
485 const Page& destination_page);
486 void sortAndCopyFilePagesForCompaction(
size_t page_size,
487 std::vector<PageMapping>& page_mappings,
488 std::set<Page>& touched_pages);
489 void updateMappedPagesVisibility(
const std::vector<PageMapping>& page_mappings);
490 void deleteEmptyFiles();
/// Takes the name of a compaction status file.
/// NOTE(review): presumably one of the *_STATUS constants, used to continue
/// from that phase — confirm.
491 void resumeFileCompaction(
const std::string& status_file_name);
/// Returns the page mappings by value.
492 std::vector<PageMapping> readPageMappingsFromStatusFile();
/// NOTE(review): the "Unlocked" suffix presumably means the caller must
/// already hold the relevant lock — confirm.
497 void closePhysicalUnlocked();
498 void syncFilesToDisk();
/// `num_reader_threads` defaults to 0.
/// NOTE(review): 0 presumably requests an automatically chosen thread count — confirm.
500 void initializeNumThreads(
size_t num_reader_threads = 0);
/// Virtual factory returning a FileBuffer*; `num_bytes` defaults to 0.
/// Overriders should restate the same default, because default arguments are
/// bound to the static type used at the call site.
501 virtual FileBuffer* allocateBuffer(
const size_t page_size,
503 const size_t num_bytes = 0);
506 const std::vector<HeaderInfo>::const_iterator& headerStartIt,
507 const std::vector<HeaderInfo>::const_iterator& headerEndIt);
/// Virtual; takes an iterator into the chunk map and returns an iterator of
/// the same type. `purge` defaults to true.
/// NOTE(review): the result is presumably the iterator following the erased
/// entry, and the caller presumably already holds the chunk-index lock — confirm.
508 virtual ChunkKeyToChunkMap::iterator deleteBufferUnlocked(
509 const ChunkKeyToChunkMap::iterator chunk_it,
510 const bool purge =
true);
512 const size_t numBytes = 0)
const;
/// Initialisation overloads; both accept an epoch override as their last
/// argument. The first is keyed by a reader-thread count, the second by a
/// data path to convert from.
515 void init(
const size_t num_reader_threads,
const int32_t epochOverride);
516 void init(
const std::string& dataPathToConvertFrom,
const int32_t epochOverride);
/// Takes an epoch ceiling and a flag saying whether to checkpoint.
518 void rollOffOldData(
const int32_t epochCeiling,
const bool shouldCheckpoint);
/// NOTE(review): presumably frees pages whose epoch is below `min_epoch`; the
/// "Unlocked" variant presumably expects the caller to hold the lock — confirm.
519 void freePagesBeforeEpoch(
const int32_t min_epoch);
520 void freePagesBeforeEpochUnlocked(
const int32_t min_epoch,
530 inline int32_t
epoch()
const {
return static_cast<int32_t
>(epoch_.ceiling()); }
531 void writeDirtyBuffers();
/// Const; takes the stats object by non-const reference, so results are
/// written into the caller's `storage_stats`.
533 void setDataAndMetadataFileStats(
StorageStats& storage_stats)
const;
/// Const getter returning a fragment count as uint32_t.
534 uint32_t getFragmentCount()
const;
/// Virtual const check taking an action description and an optional file
/// name that defaults to an empty optional.
/// NOTE(review): presumably rejects `action` when running read-only — confirm.
536 virtual void readOnlyCheck(
const std::string&
action,
537 const std::optional<std::string>& file_name = {})
const;
/// Starts true. It is set to false alongside epoch_.increment(), and is read
/// in the expression epoch() - (epochIsCheckpointed_ ? 0 : 1).
543 bool epochIsCheckpointed_ =
true;
/// C FILE handle for the epoch file, initially nullptr.
544 FILE* epochFile_ =
nullptr;
DEVICE auto upper_bound(ARGS &&...args)
const size_t metadata_page_size_
virtual int32_t epoch(int32_t db_id, int32_t tb_id) const
Returns the current value of the epoch, which should be one greater than the value recorded at the last checkpoint. Because a FileMgr only contains buffers from one table, we can just return the FileMgr's epoch instead of finding a table-specific epoch.
std::vector< int > ChunkKey
TablePair fileMgrKey_
Global FileMgr.
size_t getPageSize() const
virtual bool hasFileMgrKey() const
std::vector< HeaderInfo > header_infos
A logical page (Page) belongs to a file on disk.
int32_t destination_file_id
std::string printSlabs() override
size_t getMaxSize() override
std::string getFileMgrBasePath() const
std::mutex getPageMutex_
pointer to DB level metadata
Represents/provides access to contiguous data stored in the file system.
static int64_t max_allowable_epoch()
MgrType getMgrType() override
std::string fileMgrBasePath_
std::multimap< size_t, int32_t > PageSizeFileMMap
Maps logical page sizes to files.
int32_t lastCheckpointedEpoch() const
Returns the value of the epoch at the last checkpoint.
size_t getInUseSize() override
static size_t num_pages_per_data_file_
This file includes the class specification for the FILE manager (FileMgr), and related data structures.
int32_t PageHeaderSizeType
PageMapping(int32_t source_file_id, size_t source_page_num, PageHeaderSizeType source_page_header_size, int32_t destination_file_id, size_t destination_page_num)
int32_t fileMgrVersion_
the index of the next file id
void init(LogOptions const &log_opts)
ChunkKeyToChunkMap chunkIndex_
PageSizeFileMMap fileIndex_
constexpr int32_t kDbVersion
DB version for DataMgr DS and corresponding file buffer read/write code.
std::vector< std::pair< FileInfo *, int32_t > > free_pages_
An AbstractBuffer is a unit of data management for a data manager.
size_t num_reader_threads_
Maps page sizes to FileInfo objects.
static size_t num_pages_per_metadata_file_
std::map< ChunkKey, FileBuffer * > ChunkKeyToChunkMap
Maps ChunkKeys (unique ids for Chunks) to Chunk objects.
std::string compaction_status_file_name
heavyai::shared_mutex chunkIndexMutex_
const TablePair get_fileMgrKey() const
heavyai::shared_mutex mutex_free_page_
int32_t maxRollbackEpochs_
DEVICE auto lower_bound(ARGS &&...args)
void checkpoint(const int32_t db_id, const int32_t tb_id) override
virtual bool failOnReadError() const
True if a read error should cause a fatal error.
std::string getStringMgrType() override
torch::Tensor f(torch::Tensor x, torch::Tensor W_target, torch::Tensor b_target)
size_t getAllocated() override
This file contains the declaration and definition of a Page type and a MultiPage type.
FileInfo * getFileInfoForFileId(const int32_t fileId) const
heavyai::shared_mutex files_rw_mutex_
size_t destination_page_num
int32_t maxRollbackEpochs()
Returns the value of max_rollback_epochs.
std::shared_timed_mutex shared_mutex
std::pair< const int32_t, const int32_t > TablePair
unsigned nextFileId_
number of threads used when loading data
int32_t epochFloor() const
PageHeaderSizeType source_page_header_size
bool isAllocationCapped() override
size_t getMetadataPageSize() const
size_t getNumReaderThreads()
Returns number of threads defined by parameter num-reader-threads which should be used during initial...