35 namespace File_Namespace {
38 const size_t pageSize,
40 const size_t initialSize)
43 , metadataPageSize_(fm_->getMetadataPageSize())
44 , metadataPages_(metadataPageSize_)
46 , chunkKey_(chunkKey) {
68 const size_t pageSize,
71 const size_t initialSize)
74 , metadataPageSize_(fm->getMetadataPageSize())
75 , metadataPages_(metadataPageSize_)
77 , chunkKey_(chunkKey) {
85 const std::vector<HeaderInfo>::const_iterator& headerStartIt,
86 const std::vector<HeaderInfo>::const_iterator& headerEndIt)
89 , metadataPageSize_(fm->getMetadataPageSize())
90 , metadataPages_(metadataPageSize_)
92 , chunkKey_(chunkKey) {
97 int32_t lastPageId = -1;
98 int32_t curPageId = 0;
99 for (
auto vecIt = headerStartIt; vecIt != headerEndIt; ++vecIt) {
100 curPageId = vecIt->pageId;
103 if (curPageId == -1) {
106 if (curPageId != lastPageId) {
109 if (curPageId != lastPageId + 1) {
111 <<
" Current page " << curPageId <<
" last page " << lastPageId
112 <<
" epoch " << vecIt->versionEpoch;
115 if (lastPageId == -1) {
120 lastPageId = curPageId;
122 multiPages_.back().push(vecIt->page, vecIt->versionEpoch);
125 if (curPageId == -1) {
139 for (
size_t pageNum = numCurrentPages; pageNum < numPagesRequested; ++pageNum) {
148 size_t header_size = (chunk_size + 3) *
sizeof(int32_t);
150 if (header_mod > 0) {
162 constexpr
size_t max_chunk_size{5};
186 return num_pages_freed;
193 for (
auto pageIt = multiPageIt->pageVersions.begin();
194 pageIt != multiPageIt->pageVersions.end();
200 return num_pages_freed;
208 const int32_t targetEpoch,
209 const int32_t currentEpoch) {
210 std::vector<EpochedPage> epochedPagesToFree =
212 for (
const auto& epochedPageToFree : epochedPagesToFree) {
213 freePage(epochedPageToFree.page,
true );
222 CHECK_LE(targetEpoch, currentEpoch);
230 size_t max_historical_buffer_size{0};
234 buffer.readMetadata(epoch_page.page);
235 max_historical_buffer_size = std::max(max_historical_buffer_size, buffer.size());
239 if (max_historical_buffer_size == 0) {
261 size_t totalBytesRead = 0;
265 for (
size_t pageNum = startPage; pageNum < endPage; ++pageNum) {
274 size_t bytesRead = 0;
276 bytesRead = fileInfo->
read(
283 bytesRead = fileInfo->
read(
289 bytesLeft -= bytesRead;
290 totalBytesRead += bytesRead;
292 CHECK(bytesLeft == 0);
294 return (totalBytesRead);
298 const size_t numBytes,
301 const int32_t deviceId) {
303 LOG(
FATAL) <<
"Unsupported Buffer type";
309 size_t numPagesToRead =
322 <<
"Requested page out of bounds";
324 size_t numPagesPerThread = 0;
325 size_t numBytesCurrent = numBytes;
326 size_t bytesRead = 0;
327 size_t bytesLeftForThread = 0;
328 size_t numExtraPages = 0;
330 std::vector<readThreadDS>
333 if (numPagesToRead > numThreads) {
334 numPagesPerThread = numPagesToRead / numThreads;
335 numExtraPages = numPagesToRead - (numThreads * numPagesPerThread);
337 numThreads = numPagesToRead;
338 numPagesPerThread = 1;
345 if (numExtraPages > 0) {
361 if (numThreads == 1) {
364 std::vector<std::future<size_t>> threads;
366 for (
size_t i = 0; i < numThreads; i++) {
367 threadDSArr.push_back(threadDS);
374 threadDS.
t_curPtr += bytesLeftForThread;
378 if (numExtraPages > 0) {
384 numBytesCurrent -= bytesLeftForThread;
385 bytesLeftForThread = min(
391 for (
auto& p : threads) {
394 for (
auto& p : threads) {
395 bytesRead += p.get();
398 CHECK(bytesRead == numBytes);
403 const size_t numBytes,
404 const size_t offset) {
409 int8_t* buffer =
reinterpret_cast<int8_t*
>(
checked_malloc(numBytes));
410 ScopeGuard guard = [&buffer] { free(buffer); };
411 size_t bytesRead = srcFileInfo->
read(
413 CHECK(bytesRead == numBytes);
414 size_t bytesWritten = destFileInfo->
write(
416 CHECK(bytesWritten == numBytes);
422 multiPage.
push(page, epoch);
428 const int32_t pageId,
430 const bool writeMetadata) {
431 int32_t intHeaderSize =
chunkKey_.size() + 3;
432 vector<int32_t>
header(intHeaderSize);
435 (intHeaderSize - 1) *
sizeof(int32_t);
438 header[intHeaderSize - 2] = pageId;
439 header[intHeaderSize - 1] = epoch;
443 page.
pageNum * pageSize, (intHeaderSize) *
sizeof(int32_t), (int8_t*)&header[0]);
449 fread((int8_t*)&
pageSize_,
sizeof(
size_t), 1, f);
450 fread((int8_t*)&
size_,
sizeof(
size_t), 1, f);
451 vector<int32_t> typeData(
454 fread((int8_t*)&(typeData[0]),
sizeof(int32_t), typeData.size(),
f);
457 bool has_encoder =
static_cast<bool>(typeData[1]);
479 fwrite((int8_t*)&
pageSize_,
sizeof(
size_t), 1, f);
480 fwrite((int8_t*)&
size_,
sizeof(
size_t), 1, f);
481 vector<int32_t> typeData(
485 typeData[1] =
static_cast<int32_t
>(
hasEncoder());
496 fwrite((int8_t*)&(typeData[0]),
sizeof(int32_t), typeData.size(),
f);
504 const size_t numBytes,
506 const int32_t deviceId) {
511 size_t numPagesToWrite =
513 size_t bytesLeft = numBytes;
514 int8_t* curPtr = src;
518 for (
size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
520 if (pageNum >= initialNumPages) {
531 if (pageNum == startPage) {
532 bytesWritten = fileInfo->
write(
541 curPtr += bytesWritten;
542 bytesLeft -= bytesWritten;
544 CHECK(bytesLeft == 0);
548 const size_t numBytes,
551 const int32_t deviceId) {
554 bool tempIsAppended =
false;
556 if (offset <
size_) {
559 if (offset + numBytes >
size_) {
560 tempIsAppended =
true;
563 size_ = offset + numBytes;
568 size_t numPagesToWrite =
570 size_t bytesLeft = numBytes;
571 int8_t* curPtr = src;
577 for (
size_t pageNum = initialNumPages; pageNum < startPage; ++pageNum) {
582 for (
size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
584 if (pageNum >= initialNumPages) {
594 if (pageNum == startPage && startPageOffset > 0) {
597 copyPage(lastPage, page, startPageOffset, 0);
599 if (pageNum == (startPage + numPagesToWrite - 1) &&
616 if (pageNum == startPage) {
617 bytesWritten = fileInfo->
write(
626 curPtr += bytesWritten;
627 bytesLeft -= bytesWritten;
628 if (tempIsAppended && pageNum == startPage + numPagesToWrite - 1) {
634 CHECK(bytesLeft == 0);
643 std::stringstream ss;
645 ss <<
"has_encoder = " << (
hasEncoder() ?
"true\n" :
"false\n");
646 ss <<
"size_ = " <<
size_ <<
"\n";
663 size_t total_size = 0;
665 total_size += multi_page.pageVersions.size();
virtual std::vector< MultiPage > getMultiPage() const
Returns vector of MultiPages in the FileBuffer.
HOST DEVICE SQLTypes get_subtype() const
void set_compression(EncodingType c)
virtual int32_t epoch(int32_t db_id, int32_t tb_id) const
Returns the current value of the epoch, which should be one greater than the value recorded at the last checkpoint. Because a FileMgr only contains buffers from one table, we can just return the FileMgr's epoch instead of finding a table-specific epoch.
std::vector< int > ChunkKey
size_t write(const size_t offset, const size_t size, const int8_t *buf)
void freePagesBeforeEpochForMultiPage(MultiPage &multiPage, const int32_t targetEpoch, const int32_t currentEpoch)
HOST DEVICE int get_size() const
size_t reservedHeaderSize_
static constexpr size_t kHeaderBufferOffset
virtual Page requestFreePage(size_t pagesize, const bool isMetadata)
void freePagesBeforeEpoch(const int32_t targetEpoch)
static size_t readForThread(FileBuffer *fileBuffer, const readThreadDS threadDS)
A logical page (Page) belongs to a file on disk.
void pop()
Purges the oldest Page.
size_t numChunkPages() const
Page addNewMultiPage(const int32_t epoch)
HOST DEVICE int get_scale() const
void writeMetadata(const int32_t epoch)
void write(int8_t *src, const size_t numBytes, const size_t offset=0, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
Writes the contents of source (src) into new versions of the affected logical pages.
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
HOST DEVICE void set_subtype(SQLTypes st)
const size_t metadataPageSize_
std::vector< MultiPage > multiPages_
std::vector< MultiPage > multiPages
HOST DEVICE SQLTypes get_type() const
Represents/provides access to contiguous data stored in the file system.
void initEncoder(const SQLTypeInfo &tmp_sql_type)
std::string show_chunk(const ChunkKey &key)
void freePage(int32_t pageId, const bool isRolloff, int32_t epoch)
void initMetadataAndPageDataSize()
size_t calculate_buffer_header_size(size_t chunk_size)
std::deque< EpochedPage > pageVersions
future< Result > async(Fn &&fn, Args &&...args)
void * checked_malloc(const size_t size)
DEVICE auto copy(ARGS &&...args)
void setBufferHeaderSize()
size_t pageSize() const override
Returns the size in bytes of each page in the FileBuffer.
FILE * getFileForFileId(const int32_t fileId)
Returns FILE pointer associated with requested fileId.
An AbstractBuffer is a unit of data management for a data manager.
size_t pageNum
unique identifier of the owning file
void append(int8_t *src, const size_t numBytes, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
void set_comp_param(int p)
void writeHeader(Page &page, const int32_t pageId, const int32_t epoch, const bool writeMetadata=false)
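Writes the header at the top of the page. The header is an array of chunkKey_.size() + 3 int32_t values whose last two entries are the page id and the epoch.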
virtual size_t reservedHeaderSize() const
HOST DEVICE EncodingType get_compression() const
void set_dimension(int d)
void push(const Page &page, const int epoch)
Pushes a new page with epoch value.
~FileBuffer() override
Destructor.
size_t read(const size_t offset, const size_t size, int8_t *buf)
HOST DEVICE int get_dimension() const
virtual bool failOnReadError() const
True if a read error should cause a fatal error.
torch::Tensor f(torch::Tensor x, torch::Tensor W_target, torch::Tensor b_target)
std::unique_ptr< Encoder > encoder_
size_t freeMetadataPages()
HOST DEVICE int get_comp_param() const
FileBuffer(FileMgr *fm, const size_t pageSize, const ChunkKey &chunkKey, const size_t initialSize=0)
Constructs a FileBuffer object.
std::pair< int, int > get_table_prefix(const ChunkKey &key)
void reserve(const size_t numBytes) override
FileInfo * getFileInfoForFileId(const int32_t fileId) const
int32_t getFileMgrEpoch()
std::vector< EpochedPage > freePagesBeforeEpoch(const int32_t target_epoch, const int32_t current_epoch)
static size_t getMinPageSize()
HOST DEVICE bool get_notnull() const
void read(int8_t *const dst, const size_t numBytes=0, const size_t offset=0, const MemoryLevel dstMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
void readMetadata(const Page &page)
void copyPage(Page &srcPage, Page &destPage, const size_t numBytes, const size_t offset=0)
EpochedPage current() const
Returns the most recent version of the page (returned by value, not by reference).
virtual size_t pageDataSize() const
Returns the size in bytes of the data portion of each page in the FileBuffer.
The MultiPage stores versions of the same logical page in a deque.
A selection of helper methods for File I/O.
size_t getNumReaderThreads()
Returns number of threads defined by parameter num-reader-threads which should be used during initial...
void freePage(const Page &page)
bool isMissingPages() const
HOST DEVICE void set_type(SQLTypes t)