OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
FileReader.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
20 #include "Shared/JsonUtils.h"
21 #include "Shared/file_path_util.h"
22 #include "Shared/file_type.h"
23 #include "Shared/misc.h"
24 
25 namespace foreign_storage {
26 
27 namespace {
37 void adjust_eof(size_t& read_size,
38  const size_t buffer_size,
39  char* buffer,
40  const char line_delim) {
41  if (read_size == 0 || buffer[read_size - 1] != line_delim) {
42  CHECK(buffer_size > read_size);
43  static_cast<char*>(buffer)[read_size] = line_delim;
44  read_size++;
45  } else if (read_size > 1 && buffer[read_size - 2] == line_delim) {
46  // Extra newline may have been due to the file encoding
47  // and may disappear during an append
48  read_size--;
49  }
50 }
51 
/**
 * Maps a byte offset to the index of the entry that contains it.
 *
 * @param cumulative_sizes running totals of entry sizes. The binary search used here
 * requires the values to be in non-decreasing order.
 * @param byte_offset offset to locate
 * @return index of the first element of cumulative_sizes that is strictly greater than
 * byte_offset (entries whose cumulative size equals the offset are skipped over)
 * @throws std::runtime_error if no element is greater than byte_offset, which includes
 * the case of an empty cumulative_sizes
 */
size_t offset_to_index(const std::vector<size_t>& cumulative_sizes, size_t byte_offset) {
  const auto first = cumulative_sizes.begin();
  const auto last = cumulative_sizes.end();
  const auto match = std::upper_bound(first, last, byte_offset);
  if (match == last) {
    throw std::runtime_error{"Invalid offset into cumulative_sizes"};
  }
  // match is never before first, so the iterator difference is non-negative and the
  // conversion to size_t is value-preserving.
  return static_cast<size_t>(match - first);
}
65 
/**
 * Computes the number of data bytes in a file once its header is excluded, plus one
 * spare byte.
 *
 * @param file_size total size of the file in bytes
 * @param header_size number of leading bytes occupied by the header
 * @return file_size - header_size + 1, or 0 when header_size exceeds file_size. The
 * saturation prevents the unsigned subtraction from wrapping to a huge value when the
 * file is shorter than the recorded header (for example after it was truncated).
 */
size_t get_data_size(size_t file_size, size_t header_size) {
  if (header_size > file_size) {
    // header_size == file_size + 1 already produced 0 before; anything larger used to
    // wrap around instead.
    return 0;
  }
  // Add 1 byte for possible need to insert a newline
  return file_size - header_size + 1;
}
70 
71 } // namespace
72 
73 SingleFileReader::SingleFileReader(const std::string& file_path,
74  const import_export::CopyParams& copy_params)
75  : FileReader(file_path, copy_params) {}
76 
78  return {{file_path_, getFirstLine()}};
79 }
80 
82  return isScanFinished();
83 }
84 
86  return file_path_;
87 }
88 
// Constructs a reader for a fresh scan of a plain text file.
// Opens file_path in binary mode, calls skipHeader(), marks the scan finished when
// data_size_ is 0, and finally seeks to header_offset_.
// Throws std::runtime_error (with strerror(errno) appended) if fopen fails or if the
// final seek to header_offset_ fails.
89 SingleTextFileReader::SingleTextFileReader(const std::string& file_path,
90  const import_export::CopyParams& copy_params)
91  : SingleFileReader(file_path, copy_params)
92  , scan_finished_(false)
93  , header_offset_(0)
94  , total_bytes_read_(0) {
95  file_ = fopen(file_path.c_str(), "rb");
96  if (!file_) {
97  throw std::runtime_error{"An error occurred when attempting to open file \"" +
98  file_path + "\". " + strerror(errno)};
99  }
100 
101  // Skip header and record offset
102  skipHeader();
// NOTE(review): the result of this fseek is ignored, unlike the checked seek below.
103  fseek(file_, 0, SEEK_END);
104 
// NOTE(review): listing line 105 is absent from this extract, so no assignment to
// data_size_ is visible before it is tested below; confirm against the original file.
106  // Empty file
107  if (data_size_ == 0) {
108  scan_finished_ = true;
109  }
110 
// Rewind from the end of the file to the position recorded in header_offset_.
// NOTE(review): the message says "open" although this is a seek failure; file_ is also
// still open when this throws.
111  if (fseek(file_, static_cast<long int>(header_offset_), SEEK_SET) != 0) {
112  throw std::runtime_error{"An error occurred when attempting to open file \"" +
113  file_path + "\". " + strerror(errno)};
114  };
115 }
116 
117 SingleTextFileReader::SingleTextFileReader(const std::string& file_path,
118  const import_export::CopyParams& copy_params,
119  const rapidjson::Value& value)
120  : SingleFileReader(file_path, copy_params)
121  , scan_finished_(true)
122  , header_offset_(0)
123  , total_bytes_read_(0) {
124  file_ = fopen(file_path.c_str(), "rb");
125  if (!file_) {
126  throw std::runtime_error{"An error occurred when attempting to open file \"" +
127  file_path + "\". " + strerror(errno)};
128  }
129  json_utils::get_value_from_object(value, header_offset_, "header_offset");
130  json_utils::get_value_from_object(value, total_bytes_read_, "total_bytes_read");
131  json_utils::get_value_from_object(value, data_size_, "data_size");
132 }
133 
135  rapidjson::Value& value,
136  rapidjson::Document::AllocatorType& allocator) const {
138  json_utils::add_value_to_object(value, header_offset_, "header_offset", allocator);
140  value, total_bytes_read_, "total_bytes_read", allocator);
141  json_utils::add_value_to_object(value, data_size_, "data_size", allocator);
142 }
143 
145  const shared::FilePathOptions& options,
146  const ForeignServer* server_options,
147  const UserMapping* user_mapping) {
149  // Re-open file and check if there is any new data in it
150  fclose(file_);
151  file_ = fopen(file_path_.c_str(), "rb");
152  if (!file_) {
153  throw std::runtime_error{"An error occurred when attempting to open file \"" +
154  file_path_ + "\". " + strerror(errno)};
155  }
156  fseek(file_, 0, SEEK_END);
157  size_t new_file_size = ftell(file_);
158  size_t new_data_size = get_data_size(new_file_size, header_offset_);
159  if (new_data_size < data_size_) {
161  }
162  if (fseek(file_, static_cast<long int>(file_offset + header_offset_), SEEK_SET) != 0) {
163  throw std::runtime_error{"An error occurred when attempting to read offset " +
164  std::to_string(file_offset + header_offset_) +
165  " in file: \"" + file_path_ + "\". " + strerror(errno)};
166  }
167  if (new_data_size > data_size_) {
168  scan_finished_ = false;
169  total_bytes_read_ = file_offset;
170  data_size_ = new_data_size;
171  }
172 }
173 
176  header_offset_ = getFirstLine().length() + 1;
177  }
178 }
179 
181  std::ifstream file{file_path_};
182  CHECK(file.good());
183  std::string line;
184  std::getline(file, line, copy_params_.line_delim);
185  file.close();
186  return line;
187 }
188 
192 void ArchiveWrapper::skipToEntry(int entry_number) {
193  if (current_entry_ >= entry_number) {
194  resetArchive();
195  }
196  while (current_entry_ < entry_number) {
197  if (arch_.get()->read_next_header()) {
198  current_entry_++;
199  } else {
200  throw std::runtime_error{"Invalid archive entry"};
201  }
202  }
203  fetchBlock();
204 }
205 
206 // Go to next consecutive entry
208  bool success = arch_.get()->read_next_header();
209  if (success) {
210  current_entry_++;
211  fetchBlock();
212  }
213  return success;
214 }
215 
216 void ArchiveWrapper::consumeDataFromCurrentEntry(size_t size, char* dest_buffer) {
217  CHECK(size <= block_chars_remaining_);
218  block_chars_remaining_ -= size;
219  if (dest_buffer != nullptr) {
220  memcpy(dest_buffer, current_block_, size);
221  }
222  current_block_ = static_cast<const char*>(current_block_) + size;
223  if (block_chars_remaining_ == 0) {
224  fetchBlock();
225  }
226 }
227 
228 char ArchiveWrapper::ArchiveWrapper::peekNextChar() {
229  CHECK(block_chars_remaining_ > 0);
230  return static_cast<const char*>(current_block_)[0];
231 }
232 
234  arch_.reset(new PosixFileArchive(file_path_, false));
236  // We will increment to 0 when reading first entry
237  current_entry_ = -1;
238 }
239 
241  int64_t offset;
242  auto ok =
243  arch_.get()->read_data_block(&current_block_, &block_chars_remaining_, &offset);
244  if (!ok) {
246  }
247 }
248 
249 CompressedFileReader::CompressedFileReader(const std::string& file_path,
250  const import_export::CopyParams& copy_params)
251  : SingleFileReader(file_path, copy_params)
252  , archive_(file_path)
253  , initial_scan_(true)
254  , scan_finished_(false)
255  , current_offset_(0)
256  , current_index_(-1) {
257  // Initialize first entry
258  nextEntry();
259 }
260 
261 CompressedFileReader::CompressedFileReader(const std::string& file_path,
262  const import_export::CopyParams& copy_params,
263  const rapidjson::Value& value)
264  : CompressedFileReader(file_path, copy_params) {
265  scan_finished_ = true;
266  initial_scan_ = false;
267  sourcenames_.clear();
268  archive_entry_index_.clear();
269  cumulative_sizes_.clear();
270  json_utils::get_value_from_object(value, sourcenames_, "sourcenames");
271  json_utils::get_value_from_object(value, cumulative_sizes_, "cumulative_sizes");
272  json_utils::get_value_from_object(value, archive_entry_index_, "archive_entry_index");
273 }
274 
276  size_t read_size,
277  size_t buffer_size) {
278  size_t remaining_size = read_size;
279  char* dest = static_cast<char*>(buffer);
280  while (remaining_size > 0 && !archive_.currentEntryFinished()) {
281  size_t copy_size = (archive_.currentEntryDataAvailable() < remaining_size)
283  : remaining_size;
284  // copy data into dest
285  archive_.consumeDataFromCurrentEntry(copy_size, dest);
286  remaining_size -= copy_size;
287  dest += copy_size;
288  }
289  size_t bytes_read = read_size - remaining_size;
290  if (archive_.currentEntryFinished() && (bytes_read < read_size)) {
291  adjust_eof(
292  bytes_read, buffer_size, static_cast<char*>(buffer), copy_params_.line_delim);
293  current_offset_ += bytes_read;
294  nextEntry();
295  } else {
296  current_offset_ += bytes_read;
297  }
298  return bytes_read;
299 }
300 
301 size_t CompressedFileReader::read(void* buffer, size_t max_size) {
302  // Leave one extra char in case we need to insert a delimiter
303  size_t bytes_read = readInternal(buffer, max_size - 1, max_size);
304  return bytes_read;
305 }
306 
// Reads `size` bytes starting at logical byte `offset` into `buffer` and returns the
// count reported by readInternal(). The offset is mapped through cumulative_sizes_ to an
// index, and archive_entry_index_[index] names the archive entry to read from.
307 size_t CompressedFileReader::readRegion(void* buffer, size_t offset, size_t size) {
// NOTE(review): listing line 308 is absent from this extract.
309 
310  // Determine where in the archive we are
311  size_t index = offset_to_index(cumulative_sizes_, offset);
312  CHECK(archive_entry_index_.size() > index);
313  auto archive_entry = archive_entry_index_[index];
314  current_index_ = static_cast<int>(index);
315 
316  // If we are in the wrong entry or too far in the right one skip to the correct entry
317  if (archive_entry != archive_.getCurrentEntryIndex() ||
318  (archive_entry == archive_.getCurrentEntryIndex() && offset < current_offset_)) {
319  archive_.skipToEntry(archive_entry);
320  skipHeader();
// After repositioning, current_offset_ becomes the previous cumulative size, or 0 when
// index is 0.
321  current_offset_ = 0;
322  if (index > 0) {
323  current_offset_ = cumulative_sizes_[index - 1];
324  }
325  }
// Advance by the distance between the current position and the requested offset.
326  skipBytes(offset - current_offset_);
// size is passed as both read_size and buffer_size.
327  return readInternal(buffer, size, size);
328 }
329 
334  do {
335  // Go to the next index
336  current_index_++;
337  if (static_cast<int>(cumulative_sizes_.size()) < current_index_) {
339  }
340  if (!initial_scan_) {
341  // Entry # in the archive is known and might not be the next one in the file
342  if (static_cast<int>(archive_entry_index_.size()) > current_index_) {
344  skipHeader();
345  } else {
346  scan_finished_ = true;
347  return;
348  }
349  } else {
350  // Read next header in archive and save the sourcename
351  if (archive_.nextEntry()) {
352  // read headers until one has data
353  CHECK(sourcenames_.size() == archive_entry_index_.size());
354  sourcenames_.emplace_back(archive_.entryName());
356  skipHeader();
357  } else {
358  scan_finished_ = true;
359  initial_scan_ = false;
360  return;
361  }
362  }
363  } while (archive_.currentEntryFinished());
364 }
365 
371  std::optional<std::string> str = std::nullopt;
372  consumeFirstLine(str);
373  }
374 }
375 
378  auto first_line = std::make_optional<std::string>();
379  first_line.value().reserve(DEFAULT_HEADER_READ_SIZE);
380  reader.consumeFirstLine(first_line);
381  return first_line.value();
382 }
383 
// Consumes bytes from the current archive entry one at a time until the entry is
// finished or the loop breaks. When dest_str holds a string, each consumed byte is
// appended to it; when it is empty (nullopt), dest_buffer stays nullptr and the bytes
// are consumed without being stored.
384 void CompressedFileReader::consumeFirstLine(std::optional<std::string>& dest_str) {
385  char* dest_buffer = nullptr;
386  while (!archive_.currentEntryFinished()) {
387  if (dest_str.has_value()) {
// Grow the string by one character and point dest_buffer at the new last slot, so the
// next consumed byte lands at the end of the string.
388  auto& str = dest_str.value();
389  str.resize(str.length() + 1);
390  dest_buffer = str.data() + str.length() - 1;
391  }
// NOTE(review): listing line 392 is absent from this extract; it presumably opens the
// condition under which the byte below is consumed and the loop stops. Confirm against
// the original file.
393  archive_.consumeDataFromCurrentEntry(1, dest_buffer);
394  break;
395  }
396  archive_.consumeDataFromCurrentEntry(1, dest_buffer);
397  }
398 }
399 
// Advances past n_bytes of the current archive entry.
// current_offset_ is increased by the full n_bytes up front, before the loop runs, even
// though the loop can return early at the end of the entry.
404 void CompressedFileReader::skipBytes(size_t n_bytes) {
405  current_offset_ += n_bytes;
406  while (n_bytes > 0) {
// NOTE(review): listing line 407 is absent from this extract; it presumably tests for
// the end of the entry before the early return below. Confirm against the original file.
408  // We've reached the end of the entry
409  return;
410  }
411  // Keep fetching blocks/entries until we've gone through N bytes
412  if (archive_.currentEntryDataAvailable() <= n_bytes) {
413  n_bytes -= archive_.currentEntryDataAvailable();
// NOTE(review): listing lines 414 and 416 are absent from this extract; the statements
// that actually consume data from the archive in each branch are not visible here.
415  } else {
417  n_bytes = 0;
418  }
419  }
420 }
421 
423  const shared::FilePathOptions& options,
424  const ForeignServer* server_options,
425  const UserMapping* user_mapping) {
426  CHECK(initial_scan_ == false);
427  size_t initial_entries = archive_entry_index_.size();
428 
429  // Reset all entry indexes for existing items
430  for (size_t index = 0; index < archive_entry_index_.size(); index++) {
431  archive_entry_index_[index] = -1;
432  }
433 
434  // Read headers and determine location of existing and new files
435  int entry_number = 0;
437  while (archive_.nextEntry()) {
438  auto it = find(sourcenames_.begin(), sourcenames_.end(), archive_.entryName());
439  if (it != sourcenames_.end()) {
440  // Record new index of already read file
441  auto index = it - sourcenames_.begin();
442  archive_entry_index_[index] = entry_number;
443  } else {
444  // Append new source file
445  sourcenames_.emplace_back(archive_.entryName());
446  archive_entry_index_.emplace_back(entry_number);
447  }
448  entry_number++;
449  }
450 
451  // Error if we are missing a file from a previous scan
452  for (size_t index = 0; index < archive_entry_index_.size(); index++) {
453  if (archive_entry_index_[index] == -1) {
454  throw std::runtime_error{
455  "Foreign table refreshed with APPEND mode missing archive entry \"" +
456  sourcenames_[index] + "\" from file \"" +
457  boost::filesystem::path(file_path_).filename().string() + "\"."};
458  }
459  }
460 
462  if (initial_entries < archive_entry_index_.size()) {
463  // We found more files
464  current_index_ = static_cast<int>(initial_entries) - 1;
466  // iterate through new entries until we get one with data
467  do {
468  nextEntry();
469  } while (archive_.currentEntryFinished() &&
470  current_index_ < static_cast<int>(archive_entry_index_.size()));
471 
473  scan_finished_ = false;
474  }
475  } else {
476  // No new files but this may be an archive of a single file
477  // Check if we only have one file and check if it has more data
478  // May have still have multiple entries with some empty that are ignored
479  // like directories
480  size_t last_size = 0;
481  size_t file_index = -1;
482  size_t num_file_entries = 0;
483  for (size_t index = 0; index < cumulative_sizes_.size(); index++) {
484  if (cumulative_sizes_[index] > last_size) {
485  file_index = index;
486  num_file_entries++;
487  last_size = cumulative_sizes_[index];
488  }
489  }
490  if (num_file_entries == 1) {
491  current_index_ = static_cast<int>(file_index);
492  current_offset_ = 0;
493  size_t last_eof = cumulative_sizes_[file_index];
494 
495  // reset cumulative_sizes_ with initial zero sizes
496  auto old_cumulative_sizes = std::move(cumulative_sizes_);
497  cumulative_sizes_ = {};
498  for (size_t zero_index = 0; zero_index < file_index; zero_index++) {
499  cumulative_sizes_.emplace_back(0);
500  }
501 
502  // Go to Index of file and read to where we left off
504  skipHeader();
505  skipBytes(last_eof);
507  scan_finished_ = false;
508  } else {
509  // There was no new data, so put back the old data structure
510  cumulative_sizes_ = std::move(old_cumulative_sizes);
511  }
512  }
513  }
514 }
515 
517  rapidjson::Value& value,
518  rapidjson::Document::AllocatorType& allocator) const {
519  // Should be done initial scan
522 
523  json_utils::add_value_to_object(value, sourcenames_, "sourcenames", allocator);
525  value, cumulative_sizes_, "cumulative_sizes", allocator);
527  value, archive_entry_index_, "archive_entry_index", allocator);
528 };
529 
530 MultiFileReader::MultiFileReader(const std::string& file_path,
531  const import_export::CopyParams& copy_params)
532  : FileReader(file_path, copy_params)
533  , current_index_(0)
534  , current_offset_(0)
535  , starting_offset_(0)
536  , is_end_of_last_file_(false) {}
537 
538 MultiFileReader::MultiFileReader(const std::string& file_path,
539  const import_export::CopyParams& copy_params,
540  const rapidjson::Value& value)
541  : FileReader(file_path, copy_params)
542  , current_index_(0)
543  , current_offset_(0)
544  , starting_offset_(0)
545  , is_end_of_last_file_(false) {
546  json_utils::get_value_from_object(value, file_locations_, "file_locations");
547  json_utils::get_value_from_object(value, cumulative_sizes_, "cumulative_sizes");
548  json_utils::get_value_from_object(value, current_offset_, "current_offset");
549  json_utils::get_value_from_object(value, current_index_, "current_index");
550  if (value.HasMember("starting_offset")) {
551  json_utils::get_value_from_object(value, starting_offset_, "starting_offset");
552  }
553 
554  // Validate files_metadata here, but objects will be recreated by child class
555  CHECK(value.HasMember("files_metadata"));
556  CHECK(value["files_metadata"].IsArray());
557  CHECK(file_locations_.size() == value["files_metadata"].GetArray().Size());
558 }
559 
// Writes this reader's state into the JSON object `value`, allocating through
// `allocator`: file_locations, cumulative_sizes, current_offset, current_index and
// starting_offset, plus a "files_metadata" array with one object per entry of files_,
// each filled by that file's own serialize().
560 void MultiFileReader::serialize(rapidjson::Value& value,
561  rapidjson::Document::AllocatorType& allocator) const {
562  json_utils::add_value_to_object(value, file_locations_, "file_locations", allocator);
// NOTE(review): listing line 563 is absent from this extract; the line below is the
// argument list of a call whose opening is not visible here.
564  value, cumulative_sizes_, "cumulative_sizes", allocator);
565  json_utils::add_value_to_object(value, current_offset_, "current_offset", allocator);
566  json_utils::add_value_to_object(value, current_index_, "current_index", allocator);
567  json_utils::add_value_to_object(value, starting_offset_, "starting_offset", allocator);
568 
569  // Serialize metadata from all files
570  rapidjson::Value files_metadata(rapidjson::kArrayType);
571  for (size_t index = 0; index < files_.size(); index++) {
572  rapidjson::Value file_metadata(rapidjson::kObjectType);
573  files_[index]->serialize(file_metadata, allocator);
574  files_metadata.PushBack(file_metadata, allocator);
575  }
576  value.AddMember("files_metadata", files_metadata, allocator);
577 };
578 
580  size_t total_size = 0;
581  for (size_t index = current_index_; index < files_.size(); index++) {
582  total_size += files_[index]->getRemainingSize();
583  }
584  return total_size;
585 }
586 
588  bool size_known = true;
589  for (size_t index = current_index_; index < files_.size(); index++) {
590  size_known = size_known && files_[index]->isRemainingSizeKnown();
591  }
592  return size_known;
593 }
594 
596  FirstLineByFilePath first_line_by_file_path;
597  for (const auto& file : files_) {
598  first_line_by_file_path.merge(file->getFirstLineForEachFile());
599  }
600  return first_line_by_file_path;
601 }
602 
604  return (isScanFinished() || is_end_of_last_file_);
605 }
606 
608  if (isScanFinished()) {
609  return files_.back()->getCurrentFilePath();
610  }
611  CHECK_LT(current_index_, files_.size());
612  return files_[current_index_]->getCurrentFilePath();
613 }
614 
616  const shared::FilePathOptions& file_path_options) {
617  auto all_file_paths = getAllFilePaths(file_path_options);
618  auto rolled_off_files =
620  if (!rolled_off_files.empty()) {
621  files_.erase(files_.begin(), files_.begin() + rolled_off_files.size());
622  CHECK_LE(rolled_off_files.size(), cumulative_sizes_.size());
623  starting_offset_ = cumulative_sizes_[rolled_off_files.size() - 1];
624  cumulative_sizes_.erase(cumulative_sizes_.begin(),
625  cumulative_sizes_.begin() + rolled_off_files.size());
626  current_index_ -= rolled_off_files.size();
627  }
628  return rolled_off_files;
629 }
630 
631 LocalMultiFileReader::LocalMultiFileReader(const std::string& file_path,
632  const import_export::CopyParams& copy_params,
633  const shared::FilePathOptions& options,
634  const std::optional<size_t>& max_file_count)
635  : MultiFileReader(file_path, copy_params) {
636  auto file_paths = shared::local_glob_filter_sort_files(file_path, options);
637  if (max_file_count.has_value() && file_paths.size() > max_file_count.value()) {
638  file_paths.erase(file_paths.begin(),
639  file_paths.begin() + (file_paths.size() - max_file_count.value()));
640  }
641  for (const auto& file_path : file_paths) {
642  insertFile(file_path);
643  }
644 }
645 
// Restores a local multi-file reader from serialized state. The base constructor loads
// the shared state from `value`; this constructor then rebuilds one reader per entry of
// file_locations_, passing the matching element of value["files_metadata"] to it.
646 LocalMultiFileReader::LocalMultiFileReader(const std::string& file_path,
647  const import_export::CopyParams& copy_params,
648  const rapidjson::Value& value)
649  : MultiFileReader(file_path, copy_params, value) {
650  // Constructs file from files_metadata
651  for (size_t index = 0; index < file_locations_.size(); index++) {
// NOTE(review): listing line 652 is absent from this extract; it presumably holds the
// condition that selects CompressedFileReader over SingleTextFileReader. Confirm against
// the original file.
653  files_.emplace_back(std::make_unique<CompressedFileReader>(
654  file_locations_[index],
655  copy_params_,
656  value["files_metadata"].GetArray()[index]));
657  } else {
658  files_.emplace_back(std::make_unique<SingleTextFileReader>(
659  file_locations_[index],
660  copy_params_,
661  value["files_metadata"].GetArray()[index]));
662  }
663  }
664 }
665 
666 void LocalMultiFileReader::insertFile(std::string location) {
667  if (shared::is_compressed_file_extension(location)) {
668  files_.emplace_back(std::make_unique<CompressedFileReader>(location, copy_params_));
669  } else {
670  files_.emplace_back(std::make_unique<SingleTextFileReader>(location, copy_params_));
671  }
672  if (files_.back()->isScanFinished()) {
673  // skip any initially empty files
674  files_.pop_back();
675  } else {
676  file_locations_.push_back(location);
677  }
678 }
679 
681  size_t file_offset,
682  const shared::FilePathOptions& file_path_options,
683  const ForeignServer* server_options,
684  const UserMapping* user_mapping) {
685  // Look for new files
686  std::set<std::string> new_locations;
688  CHECK(file_offset == current_offset_);
689  if (boost::filesystem::is_directory(file_path_)) {
690  // Find all files in this directory
691  auto all_file_paths = getAllFilePaths(file_path_options);
692  for (const auto& path : all_file_paths) {
693  if (!shared::contains(file_locations_, path)) {
694  new_locations.insert(path);
695  }
696  }
697 
698  for (const auto& file_path : file_locations_) {
699  if (!shared::contains(all_file_paths, file_path)) {
700  throw_removed_file_error(file_path);
701  }
702  }
703  }
704 
705  if (!files_.empty()) {
706  // Check if last file has new data
707  size_t base = starting_offset_;
708  CHECK_GT(current_index_, size_t(0));
709  auto last_file_index = current_index_ - 1;
710  if (last_file_index > 0) {
711  base = cumulative_sizes_[last_file_index - 1];
712  }
713  files_.back()->checkForMoreRows(current_offset_ - base, file_path_options);
714  if (!files_.back()->isScanFinished()) {
715  // Go back to the last file, if more rows are found.
716  current_index_ = last_file_index;
717  is_end_of_last_file_ = false;
718  cumulative_sizes_.pop_back();
719  }
720  }
721 
722  if (new_locations.size() > 0) {
723  for (const auto& location : new_locations) {
724  insertFile(location);
725  }
726  }
727 }
728 
729 std::vector<std::string> LocalMultiFileReader::getAllFilePaths(
730  const shared::FilePathOptions& file_path_options) const {
731  return shared::local_glob_filter_sort_files(file_path_, file_path_options);
732 }
733 
// Reads up to max_size - 1 bytes from the file at current_index_ into buffer and
// returns the number of bytes placed there. Returns 0 when the scan is already
// finished. When the current file finishes, adjust_eof() normalizes the trailing
// delimiter and the reader moves on to the next file.
734 size_t MultiFileReader::read(void* buffer, size_t max_size) {
735  if (isScanFinished()) {
736  return 0;
737  }
// NOTE(review): max_size - 1 wraps around if max_size is 0; callers presumably always
// pass a positive size. Confirm.
738  // Leave one extra char in case we need to insert a delimiter
739  size_t bytes_read = files_[current_index_].get()->read(buffer, max_size - 1);
740  if (files_[current_index_].get()->isScanFinished()) {
741  adjust_eof(bytes_read, max_size, static_cast<char*>(buffer), copy_params_.line_delim);
742  }
743  current_offset_ += bytes_read;
// NOTE(review): the bounds test below comes after files_[current_index_] was already
// indexed on the lines above.
744  if (current_index_ < files_.size() && files_[current_index_].get()->isScanFinished()) {
// NOTE(review): listing line 745 is absent from this extract.
746  current_index_++;
747  is_end_of_last_file_ = true;
748  } else {
749  is_end_of_last_file_ = false;
750  }
751  return bytes_read;
752 }
753 
// Reads `size` bytes starting at logical byte `offset` (an offset across all files)
// into buffer and returns the number of bytes placed there. The file is chosen with
// offset_to_index() over cumulative_sizes_, and the read is delegated to that file's
// readRegion() with the offset made relative to the file's base.
754 size_t MultiFileReader::readRegion(void* buffer, size_t offset, size_t size) {
// NOTE(review): listing line 755 is absent from this extract.
756  // Get file index
757  auto index = offset_to_index(cumulative_sizes_, offset);
758  // Get offset into this file
// The base is the previous cumulative size, or starting_offset_ for the first file.
759  size_t base = starting_offset_;
760  if (index > 0) {
761  base = cumulative_sizes_[index - 1];
762  }
763 
764  size_t read_size = size;
// A region ending exactly at this file's cumulative size includes the file's last byte.
765  if (offset + size == cumulative_sizes_[index]) {
766  // Skip the last byte as it may have been an inserted delimiter
767  read_size--;
768  }
769  size_t bytes_read = files_[index].get()->readRegion(buffer, offset - base, read_size);
770 
771  if (offset + size == cumulative_sizes_[index]) {
772  // Re-insert delimiter
773  static_cast<char*>(buffer)[size - 1] = copy_params_.line_delim;
774  bytes_read++;
775  }
776 
777  return bytes_read;
778 }
779 
780 } // namespace foreign_storage
DEVICE auto upper_bound(ARGS &&...args)
Definition: gpu_enabled.h:123
bool contains(const T &container, const U &element)
Definition: misc.h:204
virtual bool isScanFinished() const =0
void checkForMoreRows(size_t file_offset, const shared::FilePathOptions &options, const ForeignServer *server_options, const UserMapping *user_mapping) override
Definition: FileReader.cpp:144
std::vector< std::string > sourcenames_
Definition: FileReader.h:339
virtual std::string getFirstLine() const =0
size_t offset_to_index(const std::vector< size_t > &cumulative_sizes, size_t byte_offset)
Definition: FileReader.cpp:57
std::vector< int > archive_entry_index_
Definition: FileReader.h:342
shared utility for globbing files, paths can be specified as either a single file, directory or wildcards
std::vector< std::string > getAllFilePaths(const shared::FilePathOptions &file_path_options) const override
Definition: FileReader.cpp:729
bool isScanFinished() const override
Definition: FileReader.h:186
std::string getFirstLine() const override
Definition: FileReader.cpp:180
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:337
void throw_removed_row_in_file_error(const std::string &file_path)
CompressedFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:249
virtual std::vector< std::string > getAllFilePaths(const shared::FilePathOptions &file_path_options) const =0
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: JsonUtils.h:270
FirstLineByFilePath getFirstLineForEachFile() const override
Definition: FileReader.cpp:77
size_t readRegion(void *buffer, size_t offset, size_t size) override
Definition: FileReader.cpp:307
void checkForMoreRows(size_t file_offset, const shared::FilePathOptions &options, const ForeignServer *server_options, const UserMapping *user_mapping) override
Definition: FileReader.cpp:422
SingleTextFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:89
size_t readInternal(void *buffer, size_t read_size, size_t buffer_size)
Definition: FileReader.cpp:275
std::map< std::string, std::string > FirstLineByFilePath
Definition: FileReader.h:37
#define CHECK_GT(x, y)
Definition: Logger.h:305
std::set< std::string > check_for_rolled_off_file_paths(const std::vector< std::string > &all_file_paths, std::vector< std::string > &processed_file_paths)
MultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:530
std::string to_string(char const *&&v)
void insertFile(std::string location)
Definition: FileReader.cpp:666
import_export::CopyParams copy_params_
Definition: FileReader.h:128
void throw_removed_file_error(const std::string &file_path)
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
Definition: FileReader.cpp:134
ImportHeaderRow has_header
Definition: CopyParams.h:46
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
Definition: FileReader.cpp:516
bool isScanFinished() const override
Definition: FileReader.h:280
void consumeDataFromCurrentEntry(size_t size, char *dest_buffer=nullptr)
Definition: FileReader.cpp:216
bool isRemainingSizeKnown() override
Definition: FileReader.cpp:587
FirstLineByFilePath getFirstLineForEachFile() const override
Definition: FileReader.cpp:595
::FILE * fopen(const char *filename, const char *mode)
Definition: heavyai_fs.cpp:74
bool currentEntryFinished() const
Definition: FileReader.h:234
LocalMultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params, const shared::FilePathOptions &options, const std::optional< size_t > &max_file_count)
Definition: FileReader.cpp:631
size_t readRegion(void *buffer, size_t offset, size_t size) override
Definition: FileReader.cpp:754
void checkForMoreRows(size_t file_offset, const shared::FilePathOptions &options, const ForeignServer *server_options, const UserMapping *user_mapping) override
Definition: FileReader.cpp:680
void consumeFirstLine(std::optional< std::string > &dest_str)
Definition: FileReader.cpp:384
bool g_enable_smem_group_by true
size_t get_data_size(size_t file_size, size_t header_size)
Definition: FileReader.cpp:66
std::unique_ptr< Archive > arch_
Definition: FileReader.h:257
bool isScanFinished() const override
Definition: FileReader.h:362
#define CHECK_LT(x, y)
Definition: Logger.h:303
#define CHECK_LE(x, y)
Definition: Logger.h:304
static constexpr size_t DEFAULT_HEADER_READ_SIZE
Definition: FileReader.h:149
tuple line
Definition: parse_ast.py:10
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: JsonUtils.h:255
void adjust_eof(size_t &read_size, const size_t buffer_size, char *buffer, const char line_delim)
Definition: FileReader.cpp:37
SingleFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:73
size_t read(void *buffer, size_t max_size) override
Definition: FileReader.cpp:734
void skipToEntry(int entry_number)
Definition: FileReader.cpp:192
shared utility for mime-types
bool g_enable_watchdog false
Definition: Execute.cpp:80
#define CHECK(condition)
Definition: Logger.h:291
size_t currentEntryDataAvailable() const
Definition: FileReader.h:236
std::string getCurrentFilePath() const override
Definition: FileReader.cpp:607
std::vector< std::string > local_glob_filter_sort_files(const std::string &file_path, const FilePathOptions &options, const bool recurse)
std::string getFirstLine() const override
Definition: FileReader.cpp:376
std::string getCurrentFilePath() const override
Definition: FileReader.cpp:85
size_t getRemainingSize() override
Definition: FileReader.cpp:579
std::vector< std::string > file_locations_
Definition: FileReader.h:381
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
Definition: FileReader.cpp:560
virtual std::set< std::string > checkForRolledOffFiles(const shared::FilePathOptions &file_path_options)
Definition: FileReader.cpp:615
size_t file_size(const int fd)
Definition: heavyai_fs.cpp:33
bool is_compressed_file_extension(const std::string &location)
Definition: file_type.cpp:49
std::vector< std::unique_ptr< FileReader > > files_
Definition: FileReader.h:380
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:384
size_t read(void *buffer, size_t max_size) override
Definition: FileReader.cpp:301