27 #include <boost/filesystem.hpp>
28 #include <boost/lexical_cast.hpp>
40 namespace File_Namespace {
42 GlobalFileMgr::GlobalFileMgr(
const int32_t device_id,
43 std::shared_ptr<ForeignStorageInterface> fsi,
44 std::string base_path,
45 const size_t num_reader_threads,
46 const size_t page_size,
47 const size_t metadata_page_size)
48 : AbstractBufferMgr(device_id)
50 , basePath_(base_path)
51 , num_reader_threads_(num_reader_threads)
54 , page_size_(page_size)
55 , metadata_page_size_(metadata_page_size) {
67 if (boost::filesystem::exists(path)) {
68 if (!boost::filesystem::is_directory(path)) {
69 LOG(
FATAL) <<
"Specified path is not a directory.";
72 if (!boost::filesystem::create_directory(path)) {
73 LOG(
FATAL) <<
"Could not create data directory";
82 fileMgrsIt->second->checkpoint();
92 size_t num_chunks = 0;
95 num_chunks += fileMgrsIt->second->getNumChunks();
107 if (keyPrefix[0] != -1) {
108 return getFileMgr(keyPrefix)->deleteBuffersWithPrefix(keyPrefix, purge);
113 const int32_t tb_id) {
115 AbstractBufferMgr* fm =
nullptr;
116 const auto file_mgr_key = std::make_pair(db_id, tb_id);
125 const auto file_mgr_key = std::make_pair(db_id, tb_id);
142 if (file_mgr_params.
epoch != -1 &&
161 const auto file_mgr_key = std::make_pair(db_id, tb_id);
162 auto max_rollback_epochs =
165 auto s = std::make_shared<FileMgr>(
194 const auto file_mgr_key = std::make_pair(db_id, tb_id);
195 const auto foreign_buffer_manager =
fsi_->lookupBufferManager(db_id, tb_id);
196 if (foreign_buffer_manager) {
199 return foreign_buffer_manager;
201 int32_t max_rollback_epochs{-1};
206 auto s = std::make_shared<FileMgr>(
218 const int table_id) {
219 const auto table_key = std::make_pair(db_id, table_id);
229 std::shared_ptr<FileMgr> file_mgr) {
243 if ((fileMgr != 0) && (fileMgr != fm)) {
248 chunkIt->second->write((int8_t*)chunkIt->second, chunkIt->second->size(), 0);
256 if (
auto fm = dynamic_cast<File_Namespace::FileMgr*>(abm)) {
257 fm->closeRemovePhysical();
258 }
else if (dynamic_cast<ForeignStorageBufferMgr*>(abm)) {
259 abm->removeTableRelatedDS(db_id, tb_id);
260 fsi_->dropBufferManager(db_id, tb_id);
265 const auto file_mgr_key = std::make_pair(db_id, tb_id);
266 auto u = std::make_unique<FileMgr>(0,
this, file_mgr_key,
true);
267 u->closeRemovePhysical();
277 const int32_t start_epoch) {
278 AbstractBufferMgr* opened_fm =
findFileMgr(db_id, tb_id);
284 const auto file_mgr_key = std::make_pair(db_id, tb_id);
288 auto u = std::make_unique<FileMgr>(
298 AbstractBufferMgr* opened_fm =
findFileMgr(db_id, tb_id);
300 return dynamic_cast<FileMgr*
>(opened_fm)->lastCheckpointedEpoch();
303 const auto file_mgr_key = std::make_pair(db_id, tb_id);
304 auto u = std::make_unique<FileMgr>(0,
this, file_mgr_key,
true);
305 const auto epoch = u->lastCheckpointedEpoch();
311 AbstractBufferMgr* fm =
getFileMgr(db_id, tb_id);
313 dynamic_cast<FileMgr*
>(fm)->resetEpochFloor();
318 AbstractBufferMgr* opened_fm =
findFileMgr(db_id, tb_id);
328 auto u = std::make_unique<FileMgr>(0,
this, file_mgr_key,
true);
331 return lazy_initialized_stats_[file_mgr_key];
340 file_mgr->compactFiles();
void writeFileMgrData(FileMgr *fileMgr=0)
void deleteBuffersWithPrefix(const ChunkKey &keyPrefix, const bool purge=true) override
std::vector< int > ChunkKey
void deleteFileMgr(const int32_t db_id, const int32_t tb_id)
int32_t epoch_
number of threads used when loading data
std::shared_ptr< ForeignStorageInterface > fsi_
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
std::map< TablePair, std::shared_ptr< FileMgr > > ownedFileMgrs_
size_t getNumChunks() override
int32_t lastCheckpointedEpoch() const
Returns the value of the epoch at the last checkpoint.
std::shared_lock< T > shared_lock
std::map< TablePair, AbstractBufferMgr * > allFileMgrs_
void resetTableEpochFloor(const int32_t db_id, const int32_t tb_id)
void setTableEpoch(const int32_t db_id, const int32_t tb_id, const int32_t start_epoch)
StorageStats getStorageStats(const int32_t db_id, const int32_t tb_id)
AbstractBufferMgr * findFileMgrUnlocked(const int32_t db_id, const int32_t tb_id)
ChunkKeyToChunkMap chunkIndex_
std::unique_lock< T > unique_lock
std::shared_ptr< FileMgr > getSharedFileMgr(const int db_id, const int table_id)
bool existsDiffBetweenFileMgrParamsAndFileMgr(FileMgr *file_mgr, const FileMgrParams &file_mgr_params) const
void compactDataFiles(const int32_t db_id, const int32_t tb_id)
AbstractBufferMgr * getFileMgr(const int32_t db_id, const int32_t tb_id)
size_t num_reader_threads_
The OS file system path containing the files.
std::map< TablePair, int32_t > max_rollback_epochs_per_table_
void setFileMgr(const int db_id, const int table_id, std::shared_ptr< FileMgr > file_mgr)
void closeFileMgr(const int32_t db_id, const int32_t tb_id)
void setFileMgrParams(const int32_t db_id, const int32_t tb_id, const FileMgrParams &file_mgr_params)
AbstractBufferMgr * findFileMgr(const int32_t db_id, const int32_t tb_id)
std::map< TablePair, StorageStats > lazy_initialized_stats_
int32_t maxRollbackEpochs()
Returns the value of max_rollback_epochs.
std::pair< const int32_t, const int32_t > TablePair
int32_t max_rollback_epochs
void removeTableRelatedDS(const int32_t db_id, const int32_t tb_id) override
heavyai::shared_mutex fileMgrs_mutex_
A selection of helper methods for File I/O.
bool dbConvert_
used to set FileMgr metadata_page_size_
size_t getTableEpoch(const int32_t db_id, const int32_t tb_id)