OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
FileBuffer.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 #include <future>
25 #include <map>
26 #include <thread>
27 #include <utility>
29 #include "Shared/File.h"
30 #include "Shared/checked_alloc.h"
31 #include "Shared/scope.h"
32 
33 using namespace std;
34 
35 namespace File_Namespace {
36 
37 FileBuffer::FileBuffer(FileMgr* fm,
38  const size_t pageSize,
39  const ChunkKey& chunkKey,
40  const size_t initialSize)
41  : AbstractBuffer(fm->getDeviceId())
42  , fm_(fm)
43  , metadataPageSize_(fm_->getMetadataPageSize())
44  , metadataPages_(metadataPageSize_)
45  , pageSize_(pageSize)
46  , chunkKey_(chunkKey) {
47  // Create a new FileBuffer
48  CHECK(fm_);
52  //@todo reintroduce initialSize - need to develop easy way of
53  // differentiating these pre-allocated pages from "written-to" pages
54  /*
55  if (initalSize > 0) {
56  // should expand to initialSize bytes
57  size_t initialNumPages = (initalSize + pageSize_ -1) / pageSize_;
58  int32_t epoch = fm_->epoch();
59  for (size_t pageNum = 0; pageNum < initialNumPages; ++pageNum) {
60  Page page = addNewMultiPage(epoch);
61  writeHeader(page,pageNum,epoch);
62  }
63  }
64  */
65 }
66 
68  const size_t pageSize,
69  const ChunkKey& chunkKey,
70  const SQLTypeInfo sqlType,
71  const size_t initialSize)
72  : AbstractBuffer(fm->getDeviceId(), sqlType)
73  , fm_(fm)
74  , metadataPageSize_(fm->getMetadataPageSize())
75  , metadataPages_(metadataPageSize_)
76  , pageSize_(pageSize)
77  , chunkKey_(chunkKey) {
78  CHECK(fm_);
81 }
82 
84  /* const size_t pageSize,*/ const ChunkKey& chunkKey,
85  const std::vector<HeaderInfo>::const_iterator& headerStartIt,
86  const std::vector<HeaderInfo>::const_iterator& headerEndIt)
87  : AbstractBuffer(fm->getDeviceId())
88  , fm_(fm)
89  , metadataPageSize_(fm->getMetadataPageSize())
90  , metadataPages_(metadataPageSize_)
91  , pageSize_(0)
92  , chunkKey_(chunkKey) {
93  // We are being assigned an existing FileBuffer on disk
94 
95  CHECK(fm_);
97  int32_t lastPageId = -1;
98  int32_t curPageId = 0;
99  for (auto vecIt = headerStartIt; vecIt != headerEndIt; ++vecIt) {
100  curPageId = vecIt->pageId;
101 
102  // We only want to read last metadata page
103  if (curPageId == -1) { // stats page
104  metadataPages_.push(vecIt->page, vecIt->versionEpoch);
105  } else {
106  if (curPageId != lastPageId) {
107  // protect from bad data on disk, and give diagnostics
108  if (fm->failOnReadError()) {
109  if (curPageId != lastPageId + 1) {
110  LOG(FATAL) << "Failure reading DB file " << show_chunk(chunkKey)
111  << " Current page " << curPageId << " last page " << lastPageId
112  << " epoch " << vecIt->versionEpoch;
113  }
114  }
115  if (lastPageId == -1) { // If we are on first real page
117  }
118  MultiPage multiPage(pageSize_);
119  multiPages_.push_back(multiPage);
120  lastPageId = curPageId;
121  }
122  multiPages_.back().push(vecIt->page, vecIt->versionEpoch);
123  }
124  }
125  if (curPageId == -1) { // meaning there was only a metadata page
127  }
128 }
129 
131  // need to free pages
132  // NOP
133 }
134 
135 void FileBuffer::reserve(const size_t numBytes) {
136  size_t numPagesRequested = (numBytes + pageSize_ - 1) / pageSize_;
137  size_t numCurrentPages = multiPages_.size();
138  auto epoch = getFileMgrEpoch();
139  for (size_t pageNum = numCurrentPages; pageNum < numPagesRequested; ++pageNum) {
140  Page page = addNewMultiPage(epoch);
141  writeHeader(page, pageNum, epoch);
142  }
143 }
144 
145 namespace {
146 size_t calculate_buffer_header_size(size_t chunk_size) {
147  // Additional 3 * sizeof(int32_t) is for headerSize, pageId, and versionEpoch
148  size_t header_size = (chunk_size + 3) * sizeof(int32_t);
149  size_t header_mod = header_size % FileBuffer::kHeaderBufferOffset;
150  if (header_mod > 0) {
151  header_size += FileBuffer::kHeaderBufferOffset - header_mod;
152  }
153  return header_size;
154 }
155 } // namespace
156 
159 }
160 
162  constexpr size_t max_chunk_size{5};
163  return calculate_buffer_header_size(max_chunk_size) + 1;
164 }
165 
166 void FileBuffer::freePage(const Page& page) {
167  freePage(page, false);
168 }
169 
170 void FileBuffer::freePage(const Page& page, const bool isRolloff) {
171  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
172  CHECK(fileInfo);
173  fileInfo->freePage(page.pageNum, isRolloff, getFileMgrEpoch());
174 }
175 
177  size_t num_pages_freed = metadataPages_.pageVersions.size();
178  for (auto metaPageIt = metadataPages_.pageVersions.begin();
179  metaPageIt != metadataPages_.pageVersions.end();
180  ++metaPageIt) {
181  freePage(metaPageIt->page, false /* isRolloff */);
182  }
183  while (metadataPages_.pageVersions.size() > 0) {
185  }
186  return num_pages_freed;
187 }
188 
190  size_t num_pages_freed = multiPages_.size();
191  for (auto multiPageIt = multiPages_.begin(); multiPageIt != multiPages_.end();
192  ++multiPageIt) {
193  for (auto pageIt = multiPageIt->pageVersions.begin();
194  pageIt != multiPageIt->pageVersions.end();
195  ++pageIt) {
196  freePage(pageIt->page, false /* isRolloff */);
197  }
198  }
199  multiPages_.clear();
200  return num_pages_freed;
201 }
202 
204  return freeMetadataPages() + freeChunkPages();
205 }
206 
208  const int32_t targetEpoch,
209  const int32_t currentEpoch) {
210  std::vector<EpochedPage> epochedPagesToFree =
211  multiPage.freePagesBeforeEpoch(targetEpoch, currentEpoch);
212  for (const auto& epochedPageToFree : epochedPagesToFree) {
213  freePage(epochedPageToFree.page, true /* isRolloff */);
214  }
215 }
216 
217 void FileBuffer::freePagesBeforeEpoch(const int32_t targetEpoch) {
218  // This method is only safe to be called within a checkpoint, after the sync and epoch
219  // increment where a failure at any point32_t in the process would lead to a safe
220  // rollback
221  auto currentEpoch = getFileMgrEpoch();
222  CHECK_LE(targetEpoch, currentEpoch);
223  freePagesBeforeEpochForMultiPage(metadataPages_, targetEpoch, currentEpoch);
224  for (auto& multiPage : multiPages_) {
225  freePagesBeforeEpochForMultiPage(multiPage, targetEpoch, currentEpoch);
226  }
227 
228  // Check if all buffer pages can be freed
229  if (size_ == 0) {
230  size_t max_historical_buffer_size{0};
231  for (auto& epoch_page : metadataPages_.pageVersions) {
232  // Create buffer that is used to get the buffer size at the epoch version
233  FileBuffer buffer{fm_, pageSize_, chunkKey_};
234  buffer.readMetadata(epoch_page.page);
235  max_historical_buffer_size = std::max(max_historical_buffer_size, buffer.size());
236  }
237 
238  // Free all chunk pages, if none of the old chunk versions has any data
239  if (max_historical_buffer_size == 0) {
240  freeChunkPages();
241  }
242  }
243 }
244 
245 struct readThreadDS {
246  FileMgr* t_fm; // ptr to FileMgr
247  size_t t_startPage; // start page for the thread
248  size_t t_endPage; // last page for the thread
249  int8_t* t_curPtr; // pointer to the current location of the target for the thread
250  size_t t_bytesLeft; // number of bytes to be read in the thread
251  size_t t_startPageOffset; // offset - used for the first page of the buffer
252  bool t_isFirstPage; // true - for first page of the buffer, false - otherwise
253  std::vector<MultiPage> multiPages; // MultiPages of the FileBuffer passed to the thread
254 };
255 
256 static size_t readForThread(FileBuffer* fileBuffer, const readThreadDS threadDS) {
257  size_t startPage = threadDS.t_startPage; // start reading at startPage, including it
258  size_t endPage = threadDS.t_endPage; // stop reading at endPage, not including it
259  int8_t* curPtr = threadDS.t_curPtr;
260  size_t bytesLeft = threadDS.t_bytesLeft;
261  size_t totalBytesRead = 0;
262  bool isFirstPage = threadDS.t_isFirstPage;
263 
264  // Traverse the logical pages
265  for (size_t pageNum = startPage; pageNum < endPage; ++pageNum) {
266  CHECK(threadDS.multiPages[pageNum].pageSize == fileBuffer->pageSize());
267  Page page = threadDS.multiPages[pageNum].current().page;
268 
269  FileInfo* fileInfo = threadDS.t_fm->getFileInfoForFileId(page.fileId);
270  CHECK(fileInfo);
271 
272  // Read the page into the destination (dst) buffer at its
273  // current (cur) location
274  size_t bytesRead = 0;
275  if (isFirstPage) {
276  bytesRead = fileInfo->read(
277  page.pageNum * fileBuffer->pageSize() + threadDS.t_startPageOffset +
278  fileBuffer->reservedHeaderSize(),
279  min(fileBuffer->pageDataSize() - threadDS.t_startPageOffset, bytesLeft),
280  curPtr);
281  isFirstPage = false;
282  } else {
283  bytesRead = fileInfo->read(
284  page.pageNum * fileBuffer->pageSize() + fileBuffer->reservedHeaderSize(),
285  min(fileBuffer->pageDataSize(), bytesLeft),
286  curPtr);
287  }
288  curPtr += bytesRead;
289  bytesLeft -= bytesRead;
290  totalBytesRead += bytesRead;
291  }
292  CHECK(bytesLeft == 0);
293 
294  return (totalBytesRead);
295 }
296 
297 void FileBuffer::read(int8_t* const dst,
298  const size_t numBytes,
299  const size_t offset,
300  const MemoryLevel dstBufferType,
301  const int32_t deviceId) {
302  if (dstBufferType != CPU_LEVEL) {
303  LOG(FATAL) << "Unsupported Buffer type";
304  }
305 
306  // variable declarations
307  size_t startPage = offset / pageDataSize_;
308  size_t startPageOffset = offset % pageDataSize_;
309  size_t numPagesToRead =
310  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
311  /*
312  if (startPage + numPagesToRead > multiPages_.size()) {
313  cout << "Start page: " << startPage << endl;
314  cout << "Num pages to read: " << numPagesToRead << endl;
315  cout << "Num multipages: " << multiPages_.size() << endl;
316  cout << "Offset: " << offset << endl;
317  cout << "Num bytes: " << numBytes << endl;
318  }
319  */
320 
321  CHECK(startPage + numPagesToRead <= multiPages_.size())
322  << "Requested page out of bounds";
323 
324  size_t numPagesPerThread = 0;
325  size_t numBytesCurrent = numBytes; // total number of bytes still to be read
326  size_t bytesRead = 0; // total number of bytes already being read
327  size_t bytesLeftForThread = 0; // number of bytes to be read in the thread
328  size_t numExtraPages = 0; // extra pages to be assigned one per thread as needed
329  size_t numThreads = fm_->getNumReaderThreads();
330  std::vector<readThreadDS>
331  threadDSArr; // array of threadDS, needed to avoid racing conditions
332 
333  if (numPagesToRead > numThreads) {
334  numPagesPerThread = numPagesToRead / numThreads;
335  numExtraPages = numPagesToRead - (numThreads * numPagesPerThread);
336  } else {
337  numThreads = numPagesToRead;
338  numPagesPerThread = 1;
339  }
340 
341  /* set threadDS for the first thread */
342  readThreadDS threadDS;
343  threadDS.t_fm = fm_;
344  threadDS.t_startPage = offset / pageDataSize_;
345  if (numExtraPages > 0) {
346  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread + 1;
347  numExtraPages--;
348  } else {
349  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread;
350  }
351  threadDS.t_curPtr = dst;
352  threadDS.t_startPageOffset = offset % pageDataSize_;
353  threadDS.t_isFirstPage = true;
354 
355  bytesLeftForThread = min(((threadDS.t_endPage - threadDS.t_startPage) * pageDataSize_ -
356  threadDS.t_startPageOffset),
357  numBytesCurrent);
358  threadDS.t_bytesLeft = bytesLeftForThread;
359  threadDS.multiPages = getMultiPage();
360 
361  if (numThreads == 1) {
362  bytesRead += readForThread(this, threadDS);
363  } else {
364  std::vector<std::future<size_t>> threads;
365 
366  for (size_t i = 0; i < numThreads; i++) {
367  threadDSArr.push_back(threadDS);
368  threads.push_back(
369  std::async(std::launch::async, readForThread, this, threadDSArr[i]));
370 
371  // calculate elements of threadDS
372  threadDS.t_fm = fm_;
373  threadDS.t_isFirstPage = false;
374  threadDS.t_curPtr += bytesLeftForThread;
375  threadDS.t_startPage +=
376  threadDS.t_endPage -
377  threadDS.t_startPage; // based on # of pages read on previous iteration
378  if (numExtraPages > 0) {
379  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread + 1;
380  numExtraPages--;
381  } else {
382  threadDS.t_endPage = threadDS.t_startPage + numPagesPerThread;
383  }
384  numBytesCurrent -= bytesLeftForThread;
385  bytesLeftForThread = min(
386  ((threadDS.t_endPage - threadDS.t_startPage) * pageDataSize_), numBytesCurrent);
387  threadDS.t_bytesLeft = bytesLeftForThread;
388  threadDS.multiPages = getMultiPage();
389  }
390 
391  for (auto& p : threads) {
392  p.wait();
393  }
394  for (auto& p : threads) {
395  bytesRead += p.get();
396  }
397  }
398  CHECK(bytesRead == numBytes);
399 }
400 
402  Page& destPage,
403  const size_t numBytes,
404  const size_t offset) {
405  CHECK_LE(offset + numBytes, pageDataSize_);
406  FileInfo* srcFileInfo = fm_->getFileInfoForFileId(srcPage.fileId);
407  FileInfo* destFileInfo = fm_->getFileInfoForFileId(destPage.fileId);
408 
409  int8_t* buffer = reinterpret_cast<int8_t*>(checked_malloc(numBytes));
410  ScopeGuard guard = [&buffer] { free(buffer); };
411  size_t bytesRead = srcFileInfo->read(
412  srcPage.pageNum * pageSize_ + offset + reservedHeaderSize_, numBytes, buffer);
413  CHECK(bytesRead == numBytes);
414  size_t bytesWritten = destFileInfo->write(
415  destPage.pageNum * pageSize_ + offset + reservedHeaderSize_, numBytes, buffer);
416  CHECK(bytesWritten == numBytes);
417 }
418 
419 Page FileBuffer::addNewMultiPage(const int32_t epoch) {
420  Page page = fm_->requestFreePage(pageSize_, false);
421  MultiPage multiPage(pageSize_);
422  multiPage.push(page, epoch);
423  multiPages_.emplace_back(multiPage);
424  return page;
425 }
426 
428  const int32_t pageId,
429  const int32_t epoch,
430  const bool writeMetadata) {
431  int32_t intHeaderSize = chunkKey_.size() + 3; // does not include chunkSize
432  vector<int32_t> header(intHeaderSize);
433  // in addition to chunkkey we need size of header, pageId, version
434  header[0] =
435  (intHeaderSize - 1) * sizeof(int32_t); // don't need to include size of headerSize
436  // value - sizeof(size_t) is for chunkSize
437  std::copy(chunkKey_.begin(), chunkKey_.end(), header.begin() + 1);
438  header[intHeaderSize - 2] = pageId;
439  header[intHeaderSize - 1] = epoch;
440  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
441  size_t pageSize = writeMetadata ? metadataPageSize_ : pageSize_;
442  fileInfo->write(
443  page.pageNum * pageSize, (intHeaderSize) * sizeof(int32_t), (int8_t*)&header[0]);
444 }
445 
446 void FileBuffer::readMetadata(const Page& page) {
447  FILE* f = fm_->getFileForFileId(page.fileId);
448  fseek(f, page.pageNum * metadataPageSize_ + reservedHeaderSize_, SEEK_SET);
449  fread((int8_t*)&pageSize_, sizeof(size_t), 1, f);
450  fread((int8_t*)&size_, sizeof(size_t), 1, f);
451  vector<int32_t> typeData(
452  NUM_METADATA); // assumes we will encode hasEncoder, bufferType,
453  // encodingType, encodingBits all as int
454  fread((int8_t*)&(typeData[0]), sizeof(int32_t), typeData.size(), f);
455  int32_t version = typeData[0];
456  CHECK(version == METADATA_VERSION); // add backward compatibility code here
457  bool has_encoder = static_cast<bool>(typeData[1]);
458  if (has_encoder) {
459  sql_type_.set_type(static_cast<SQLTypes>(typeData[2]));
460  sql_type_.set_subtype(static_cast<SQLTypes>(typeData[3]));
461  sql_type_.set_dimension(typeData[4]);
462  sql_type_.set_scale(typeData[5]);
463  sql_type_.set_notnull(static_cast<bool>(typeData[6]));
464  sql_type_.set_compression(static_cast<EncodingType>(typeData[7]));
465  sql_type_.set_comp_param(typeData[8]);
466  sql_type_.set_size(typeData[9]);
468  encoder_->readMetadata(f);
469  }
470 }
471 
472 void FileBuffer::writeMetadata(const int32_t epoch) {
473  // Right now stats page is size_ (in bytes), bufferType, encodingType,
474  // encodingDataType, numElements
475  Page page = fm_->requestFreePage(metadataPageSize_, true);
476  writeHeader(page, -1, epoch, true);
477  FILE* f = fm_->getFileForFileId(page.fileId);
478  fseek(f, page.pageNum * metadataPageSize_ + reservedHeaderSize_, SEEK_SET);
479  fwrite((int8_t*)&pageSize_, sizeof(size_t), 1, f);
480  fwrite((int8_t*)&size_, sizeof(size_t), 1, f);
481  vector<int32_t> typeData(
482  NUM_METADATA); // assumes we will encode hasEncoder, bufferType,
483  // encodingType, encodingBits all as int32_t
484  typeData[0] = METADATA_VERSION;
485  typeData[1] = static_cast<int32_t>(hasEncoder());
486  if (hasEncoder()) {
487  typeData[2] = static_cast<int32_t>(sql_type_.get_type());
488  typeData[3] = static_cast<int32_t>(sql_type_.get_subtype());
489  typeData[4] = sql_type_.get_dimension();
490  typeData[5] = sql_type_.get_scale();
491  typeData[6] = static_cast<int32_t>(sql_type_.get_notnull());
492  typeData[7] = static_cast<int32_t>(sql_type_.get_compression());
493  typeData[8] = sql_type_.get_comp_param();
494  typeData[9] = sql_type_.get_size();
495  }
496  fwrite((int8_t*)&(typeData[0]), sizeof(int32_t), typeData.size(), f);
497  if (hasEncoder()) { // redundant
498  encoder_->writeMetadata(f);
499  }
500  metadataPages_.push(page, epoch);
501 }
502 
503 void FileBuffer::append(int8_t* src,
504  const size_t numBytes,
505  const MemoryLevel srcBufferType,
506  const int32_t deviceId) {
507  setAppended();
508 
509  size_t startPage = size_ / pageDataSize_;
510  size_t startPageOffset = size_ % pageDataSize_;
511  size_t numPagesToWrite =
512  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
513  size_t bytesLeft = numBytes;
514  int8_t* curPtr = src; // a pointer to the current location in dst being written to
515  size_t initialNumPages = multiPages_.size();
516  size_ = size_ + numBytes;
517  auto epoch = getFileMgrEpoch();
518  for (size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
519  Page page;
520  if (pageNum >= initialNumPages) {
521  page = addNewMultiPage(epoch);
522  writeHeader(page, pageNum, epoch);
523  } else {
524  // we already have a new page at current
525  // epoch for this page - just grab this page
526  page = multiPages_[pageNum].current().page;
527  }
528  CHECK(page.fileId >= 0); // make sure page was initialized
529  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
530  size_t bytesWritten;
531  if (pageNum == startPage) {
532  bytesWritten = fileInfo->write(
533  page.pageNum * pageSize_ + startPageOffset + reservedHeaderSize_,
534  min(pageDataSize_ - startPageOffset, bytesLeft),
535  curPtr);
536  } else {
537  bytesWritten = fileInfo->write(page.pageNum * pageSize_ + reservedHeaderSize_,
538  min(pageDataSize_, bytesLeft),
539  curPtr);
540  }
541  curPtr += bytesWritten;
542  bytesLeft -= bytesWritten;
543  }
544  CHECK(bytesLeft == 0);
545 }
546 
547 void FileBuffer::write(int8_t* src,
548  const size_t numBytes,
549  const size_t offset,
550  const MemoryLevel srcBufferType,
551  const int32_t deviceId) {
552  CHECK(srcBufferType == CPU_LEVEL) << "Unsupported Buffer type";
553 
554  bool tempIsAppended = false;
555  setDirty();
556  if (offset < size_) {
557  setUpdated();
558  }
559  if (offset + numBytes > size_) {
560  tempIsAppended = true; // because is_appended_ could have already been true - to
561  // avoid rewriting header
562  setAppended();
563  size_ = offset + numBytes;
564  }
565 
566  size_t startPage = offset / pageDataSize_;
567  size_t startPageOffset = offset % pageDataSize_;
568  size_t numPagesToWrite =
569  (numBytes + startPageOffset + pageDataSize_ - 1) / pageDataSize_;
570  size_t bytesLeft = numBytes;
571  int8_t* curPtr = src; // a pointer to the current location in dst being written to
572  size_t initialNumPages = multiPages_.size();
573  auto epoch = getFileMgrEpoch();
574 
575  if (startPage >
576  initialNumPages) { // means there is a gap we need to allocate pages for
577  for (size_t pageNum = initialNumPages; pageNum < startPage; ++pageNum) {
578  Page page = addNewMultiPage(epoch);
579  writeHeader(page, pageNum, epoch);
580  }
581  }
582  for (size_t pageNum = startPage; pageNum < startPage + numPagesToWrite; ++pageNum) {
583  Page page;
584  if (pageNum >= initialNumPages) {
585  page = addNewMultiPage(epoch);
586  writeHeader(page, pageNum, epoch);
587  } else if (multiPages_[pageNum].current().epoch <
588  epoch) { // need to create new page b/c this current one lags epoch and we
589  // can't overwrite it also need to copy if we are on first or
590  // last page
591  Page lastPage = multiPages_[pageNum].current().page;
592  page = fm_->requestFreePage(pageSize_, false);
593  multiPages_[pageNum].push(page, epoch);
594  if (pageNum == startPage && startPageOffset > 0) {
595  // copyPage takes care of header offset so don't worry
596  // about it
597  copyPage(lastPage, page, startPageOffset, 0);
598  }
599  if (pageNum == (startPage + numPagesToWrite - 1) &&
600  bytesLeft > 0) { // bytesLeft should always > 0
601  copyPage(lastPage,
602  page,
603  pageDataSize_ - bytesLeft,
604  bytesLeft); // these would be empty if we're appending but we won't
605  // worry about it right now
606  }
607  writeHeader(page, pageNum, epoch);
608  } else {
609  // we already have a new page at current
610  // epoch for this page - just grab this page
611  page = multiPages_[pageNum].current().page;
612  }
613  CHECK(page.fileId >= 0); // make sure page was initialized
614  FileInfo* fileInfo = fm_->getFileInfoForFileId(page.fileId);
615  size_t bytesWritten;
616  if (pageNum == startPage) {
617  bytesWritten = fileInfo->write(
618  page.pageNum * pageSize_ + startPageOffset + reservedHeaderSize_,
619  min(pageDataSize_ - startPageOffset, bytesLeft),
620  curPtr);
621  } else {
622  bytesWritten = fileInfo->write(page.pageNum * pageSize_ + reservedHeaderSize_,
623  min(pageDataSize_, bytesLeft),
624  curPtr);
625  }
626  curPtr += bytesWritten;
627  bytesLeft -= bytesWritten;
628  if (tempIsAppended && pageNum == startPage + numPagesToWrite - 1) { // if last page
629  //@todo below can lead to undefined - we're overwriting num
630  // bytes valid at checkpoint
631  writeHeader(page, 0, multiPages_[0].current().epoch, true);
632  }
633  }
634  CHECK(bytesLeft == 0);
635 }
636 
638  auto [db_id, tb_id] = get_table_prefix(chunkKey_);
639  return fm_->epoch(db_id, tb_id);
640 }
641 
642 std::string FileBuffer::dump() const {
643  std::stringstream ss;
644  ss << "chunk_key = " << show_chunk(chunkKey_) << "\n";
645  ss << "has_encoder = " << (hasEncoder() ? "true\n" : "false\n");
646  ss << "size_ = " << size_ << "\n";
647  return ss.str();
648 }
649 
651  CHECK(metadataPages_.current().page.fileId != -1); // was initialized
654 }
655 
657  // Detect the case where a page is missing by comparing the amount of pages read
658  // with the metadata size.
659  return ((size() + pageDataSize_ - 1) / pageDataSize_ != multiPages_.size());
660 }
661 
663  size_t total_size = 0;
664  for (const auto& multi_page : multiPages_) {
665  total_size += multi_page.pageVersions.size();
666  }
667  return total_size;
668 }
669 
670 } // namespace File_Namespace
virtual std::vector< MultiPage > getMultiPage() const
Returns vector of MultiPages in the FileBuffer.
Definition: FileBuffer.h:147
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:392
void set_compression(EncodingType c)
Definition: sqltypes.h:481
void set_size(int s)
Definition: sqltypes.h:478
virtual int32_t epoch(int32_t db_id, int32_t tb_id) const
Returns current value of epoch - should be one greater than recorded at last checkpoint. Because FileMgr only contains buffers from one table we can just return the FileMgr's epoch instead of finding a table-specific epoch.
Definition: FileMgr.h:281
std::vector< int > ChunkKey
Definition: types.h:36
size_t write(const size_t offset, const size_t size, const int8_t *buf)
Definition: FileInfo.cpp:64
void freePagesBeforeEpochForMultiPage(MultiPage &multiPage, const int32_t targetEpoch, const int32_t currentEpoch)
Definition: FileBuffer.cpp:207
HOST DEVICE int get_size() const
Definition: sqltypes.h:403
static constexpr size_t kHeaderBufferOffset
Definition: FileBuffer.h:167
virtual Page requestFreePage(size_t pagesize, const bool isMetadata)
Definition: FileMgr.cpp:877
void freePagesBeforeEpoch(const int32_t targetEpoch)
Definition: FileBuffer.cpp:217
static size_t readForThread(FileBuffer *fileBuffer, const readThreadDS threadDS)
Definition: FileBuffer.cpp:256
A logical page (Page) belongs to a file on disk.
Definition: Page.h:46
void pop()
Purges the oldest Page.
Definition: Page.h:110
#define LOG(tag)
Definition: Logger.h:285
size_t numChunkPages() const
Definition: FileBuffer.cpp:662
Page addNewMultiPage(const int32_t epoch)
Definition: FileBuffer.cpp:419
HOST DEVICE int get_scale() const
Definition: sqltypes.h:396
void writeMetadata(const int32_t epoch)
Definition: FileBuffer.cpp:472
void write(int8_t *src, const size_t numBytes, const size_t offset=0, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
Writes the contents of source (src) into new versions of the affected logical pages.
Definition: FileBuffer.cpp:547
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
HOST DEVICE void set_subtype(SQLTypes st)
Definition: sqltypes.h:471
const size_t metadataPageSize_
Definition: FileBuffer.h:198
std::vector< MultiPage > multiPages_
Definition: FileBuffer.h:200
std::vector< MultiPage > multiPages
Definition: FileBuffer.cpp:253
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:57
#define CHECK_GT(x, y)
Definition: Logger.h:305
void initEncoder(const SQLTypeInfo &tmp_sql_type)
std::string show_chunk(const ChunkKey &key)
Definition: types.h:98
void freePage(int32_t pageId, const bool isRolloff, int32_t epoch)
Definition: FileInfo.cpp:187
size_t calculate_buffer_header_size(size_t chunk_size)
Definition: FileBuffer.cpp:146
std::deque< EpochedPage > pageVersions
Definition: Page.h:81
future< Result > async(Fn &&fn, Args &&...args)
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
int32_t fileId
Definition: Page.h:47
size_t pageSize() const override
Returns the size in bytes of each page in the FileBuffer.
Definition: FileBuffer.h:137
string version
Definition: setup.in.py:73
void set_scale(int s)
Definition: sqltypes.h:475
std::string dump() const
Definition: FileBuffer.cpp:642
FILE * getFileForFileId(const int32_t fileId)
Returns FILE pointer associated with requested fileId.
Definition: FileMgr.cpp:1003
An AbstractBuffer is a unit of data management for a data manager.
size_t pageNum
unique identifier of the owning file
Definition: Page.h:48
void append(int8_t *src, const size_t numBytes, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
Definition: FileBuffer.cpp:503
void set_comp_param(int p)
Definition: sqltypes.h:482
void writeHeader(Page &page, const int32_t pageId, const int32_t epoch, const bool writeMetadata=false)
Write header writes header at top of page in format.
Definition: FileBuffer.cpp:427
virtual size_t reservedHeaderSize() const
Definition: FileBuffer.h:144
#define CHECK_LE(x, y)
Definition: Logger.h:304
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:399
#define NUM_METADATA
Definition: FileBuffer.h:35
void set_dimension(int d)
Definition: sqltypes.h:472
void push(const Page &page, const int epoch)
Pushes a new page with epoch value.
Definition: Page.h:102
~FileBuffer() override
Destructor.
Definition: FileBuffer.cpp:130
size_t read(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:70
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:393
virtual bool failOnReadError() const
True if a read error should cause a fatal error.
Definition: FileMgr.h:366
torch::Tensor f(torch::Tensor x, torch::Tensor W_target, torch::Tensor b_target)
std::unique_ptr< Encoder > encoder_
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:402
FileBuffer(FileMgr *fm, const size_t pageSize, const ChunkKey &chunkKey, const size_t initialSize=0)
Constructs a FileBuffer object.
Definition: FileBuffer.cpp:37
std::pair< int, int > get_table_prefix(const ChunkKey &key)
Definition: types.h:62
void set_notnull(bool n)
Definition: sqltypes.h:477
#define CHECK(condition)
Definition: Logger.h:291
void reserve(const size_t numBytes) override
Definition: FileBuffer.cpp:135
list header
Definition: report.py:113
FileInfo * getFileInfoForFileId(const int32_t fileId) const
Definition: FileMgr.h:229
std::vector< EpochedPage > freePagesBeforeEpoch(const int32_t target_epoch, const int32_t current_epoch)
Definition: Page.h:117
static size_t getMinPageSize()
Definition: FileBuffer.cpp:161
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:398
void read(int8_t *const dst, const size_t numBytes=0, const size_t offset=0, const MemoryLevel dstMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
Definition: FileBuffer.cpp:297
void readMetadata(const Page &page)
Definition: FileBuffer.cpp:446
void copyPage(Page &srcPage, Page &destPage, const size_t numBytes, const size_t offset=0)
Definition: FileBuffer.cpp:401
EpochedPage current() const
Returns a reference to the most recent version of the page.
Definition: Page.h:94
virtual size_t pageDataSize() const
Returns the size in bytes of the data portion of each page in the FileBuffer.
Definition: FileBuffer.h:140
#define METADATA_VERSION
Definition: FileBuffer.h:36
The MultiPage stores versions of the same logical page in a deque.
Definition: Page.h:79
A selection of helper methods for File I/O.
size_t getNumReaderThreads()
Returns number of threads defined by parameter num-reader-threads which should be used during initial...
Definition: FileMgr.h:316
void freePage(const Page &page)
Definition: FileBuffer.cpp:166
bool isMissingPages() const
Definition: FileBuffer.cpp:656
HOST DEVICE void set_type(SQLTypes t)
Definition: sqltypes.h:470