OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
FileReader.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <optional>
20 
21 #include <boost/filesystem.hpp>
22 #include "rapidjson/document.h"
23 
25 #include "Catalog/ForeignTable.h"
27 
28 namespace shared {
29 struct FilePathOptions;
30 }
31 
32 namespace foreign_storage {
33 
34 struct ForeignServer;
35 struct UserMapping;
36 
37 using FirstLineByFilePath = std::map<std::string, std::string>;
38 
39 // File reader
40 // Supports an initial full scan with calls to read()
41 // When the entire object has been read isScanFinished() returns true
42 // Previously read data can then be re-read with readRegion()
43 class FileReader {
44  public:
45  FileReader(const std::string& file_path, const import_export::CopyParams& copy_params)
46  : copy_params_(copy_params), file_path_(file_path){};
47  virtual ~FileReader() = default;
48 
57  virtual size_t read(void* buffer, size_t max_size) = 0;
58 
62  virtual bool isScanFinished() const = 0;
63 
73  virtual size_t readRegion(void* buffer, size_t offset, size_t size) = 0;
74 
78  virtual size_t getRemainingSize() = 0;
79 
83  virtual bool isRemainingSizeKnown() = 0;
93  virtual void checkForMoreRows(size_t file_offset,
94  const shared::FilePathOptions& options,
95  const ForeignServer* server_options = nullptr,
96  const UserMapping* user_mapping = nullptr) {
97  throw std::runtime_error{"APPEND mode not yet supported for this table."};
98  }
99 
108  virtual void serialize(rapidjson::Value& value,
109  rapidjson::Document::AllocatorType& allocator) const = 0;
110 
114  virtual FirstLineByFilePath getFirstLineForEachFile() const = 0;
115 
120  virtual bool isEndOfLastFile() = 0;
121 
125  virtual std::string getCurrentFilePath() const = 0;
126 
127  protected:
129  std::string file_path_;
130 };
131 
132 class SingleFileReader : public FileReader {
133  public:
134  SingleFileReader(const std::string& file_path,
135  const import_export::CopyParams& copy_params);
136 
137  ~SingleFileReader() override = default;
138 
140 
141  bool isEndOfLastFile() override;
142 
143  std::string getCurrentFilePath() const override;
144 
145  protected:
146  virtual std::string getFirstLine() const = 0;
147  virtual void skipHeader() = 0;
148 
149  static constexpr size_t DEFAULT_HEADER_READ_SIZE{1024};
150 };
151 
152 // Single uncompressed file, that supports FSEEK for faster random access
154  public:
155  SingleTextFileReader(const std::string& file_path,
156  const import_export::CopyParams& copy_params);
157  SingleTextFileReader(const std::string& file_path,
158  const import_export::CopyParams& copy_params,
159  const rapidjson::Value& value);
160  ~SingleTextFileReader() override { fclose(file_); }
161 
162  // Delete copy assignment to prevent copying resource pointer
165 
166  size_t read(void* buffer, size_t max_size) override {
167  size_t bytes_read = fread(buffer, 1, max_size, file_);
168  if (!scan_finished_) {
169  scan_finished_ = feof(file_);
170  }
171 
172  total_bytes_read_ += bytes_read;
173  return bytes_read;
174  }
175 
176  size_t readRegion(void* buffer, size_t offset, size_t size) override {
178  if (fseek(file_, static_cast<long int>(offset + header_offset_), SEEK_SET) != 0) {
179  throw std::runtime_error{"An error occurred when attempting to read offset " +
180  std::to_string(offset) + " in file: \"" + file_path_ +
181  "\". " + strerror(errno)};
182  }
183  return fread(buffer, 1, size, file_);
184  }
185 
186  bool isScanFinished() const override { return scan_finished_; }
187 
188  size_t getRemainingSize() override { return data_size_ - total_bytes_read_; }
189 
190  bool isRemainingSizeKnown() override { return true; };
191 
192  void checkForMoreRows(size_t file_offset,
193  const shared::FilePathOptions& options,
194  const ForeignServer* server_options,
195  const UserMapping* user_mapping) override;
196 
197  void serialize(rapidjson::Value& value,
198  rapidjson::Document::AllocatorType& allocator) const override;
199 
200  private:
201  std::string getFirstLine() const override;
202  void skipHeader() override;
203 
204  std::FILE* file_;
205  // Size of data in file
206  size_t data_size_;
207  // We've reached the end of the file
209 
210  // Size of the header in bytes
212 
214 };
215 
217  public:
218  ArchiveWrapper(const std::string& file_path)
219  : current_block_(nullptr)
221  , current_entry_(-1)
222  , file_path_(file_path) {
223  resetArchive();
224  }
225 
229  void skipToEntry(int entry_number);
230 
231  // Go to next consecutive entry in archive
232  bool nextEntry();
233 
234  bool currentEntryFinished() const { return (block_chars_remaining_ == 0); }
235 
237 
238  // Consume given amount of data from current block, copying into dest_buffer if set
239  void consumeDataFromCurrentEntry(size_t size, char* dest_buffer = nullptr);
240 
241  // Return the next char from the buffer without consuming it
242  char peekNextChar();
243 
244  int getCurrentEntryIndex() const { return current_entry_; }
245 
246  // Reset archive, start reading again from the first entry
247  void resetArchive();
248 
249  std::string entryName() { return arch_->entryName(); }
250 
251  private:
255  void fetchBlock();
256 
257  std::unique_ptr<Archive> arch_;
258  // Pointer to current uncompressed block from the archive
259  const void* current_block_;
260  // Number of chars remaining in the current block
262  // Index of entry of the archive file
264 
265  std::string file_path_;
266 };
267 
268 // Single archive, does not support random access
270  public:
271  CompressedFileReader(const std::string& file_path,
272  const import_export::CopyParams& copy_params);
273  CompressedFileReader(const std::string& file_path,
274  const import_export::CopyParams& copy_params,
275  const rapidjson::Value& value);
276  size_t read(void* buffer, size_t max_size) override;
277 
278  size_t readRegion(void* buffer, size_t offset, size_t size) override;
279 
280  bool isScanFinished() const override { return scan_finished_; }
281 
282  bool isRemainingSizeKnown() override { return false; };
283  size_t getRemainingSize() override { return 0; }
284 
285  void serialize(rapidjson::Value& value,
286  rapidjson::Document::AllocatorType& allocator) const override;
287 
288  private:
292  void resetArchive();
293 
294  void checkForMoreRows(size_t file_offset,
295  const shared::FilePathOptions& options,
296  const ForeignServer* server_options,
297  const UserMapping* user_mapping) override;
298 
302  void nextEntry();
303 
307  void skipHeader() override;
308 
313  void skipBytes(size_t n_bytes);
314 
315  // Read bytes in current entry adjusting for EOF
316  size_t readInternal(void* buffer, size_t read_size, size_t buffer_size);
317 
318  std::string getFirstLine() const override;
319 
320  void consumeFirstLine(std::optional<std::string>& dest_str);
321 
323 
324  // Are we doing initial scan or an append
326  // We've reached the end of the last file
328 
329  // Overall number of bytes read in the archive (minus headers)
331 
332  // Index of current entry in order they appear in
333  // cumulative_sizes_/sourcenames_/archive_entry_index_
335 
336  // Size of each file + all previous files
337  std::vector<size_t> cumulative_sizes_;
338  // Names of the file in the archive
339  std::vector<std::string> sourcenames_;
340  // Index of the entry in the archive
341  // Order can change during append operation
342  std::vector<int> archive_entry_index_;
343 };
344 
345 // Combines several archives into single object
346 class MultiFileReader : public FileReader {
347  public:
348  MultiFileReader(const std::string& file_path,
349  const import_export::CopyParams& copy_params);
350  MultiFileReader(const std::string& file_path,
351  const import_export::CopyParams& copy_params,
352  const rapidjson::Value& value);
353 
354  size_t getRemainingSize() override;
355 
356  bool isRemainingSizeKnown() override;
357 
358  size_t read(void* buffer, size_t max_size) override;
359 
360  size_t readRegion(void* buffer, size_t offset, size_t size) override;
361 
362  bool isScanFinished() const override { return (current_index_ >= files_.size()); }
363 
364  void serialize(rapidjson::Value& value,
365  rapidjson::Document::AllocatorType& allocator) const override;
366 
368 
369  bool isEndOfLastFile() override;
370 
371  std::string getCurrentFilePath() const override;
372 
373  virtual std::set<std::string> checkForRolledOffFiles(
374  const shared::FilePathOptions& file_path_options);
375 
376  protected:
377  virtual std::vector<std::string> getAllFilePaths(
378  const shared::FilePathOptions& file_path_options) const = 0;
379 
380  std::vector<std::unique_ptr<FileReader>> files_;
381  std::vector<std::string> file_locations_;
382 
383  // Size of each file + all previous files
384  std::vector<size_t> cumulative_sizes_;
385  // Current file being read
387  // Overall number of bytes read in the directory (minus headers)
389 
391 
393 };
394 
395 // Single file or directory with multiple files
397  public:
398  LocalMultiFileReader(const std::string& file_path,
399  const import_export::CopyParams& copy_params,
400  const shared::FilePathOptions& options,
401  const std::optional<size_t>& max_file_count);
402 
403  LocalMultiFileReader(const std::string& file_path,
404  const import_export::CopyParams& copy_params,
405  const rapidjson::Value& value);
406 
407  void checkForMoreRows(size_t file_offset,
408  const shared::FilePathOptions& options,
409  const ForeignServer* server_options,
410  const UserMapping* user_mapping) override;
411 
412  private:
413  std::vector<std::string> getAllFilePaths(
414  const shared::FilePathOptions& file_path_options) const override;
415 
416  void insertFile(std::string location);
417 };
418 
419 } // namespace foreign_storage
virtual bool isScanFinished() const =0
void checkForMoreRows(size_t file_offset, const shared::FilePathOptions &options, const ForeignServer *server_options, const UserMapping *user_mapping) override
Definition: FileReader.cpp:144
std::vector< std::string > sourcenames_
Definition: FileReader.h:339
virtual std::string getFirstLine() const =0
virtual size_t read(void *buffer, size_t max_size)=0
std::vector< int > archive_entry_index_
Definition: FileReader.h:342
std::vector< std::string > getAllFilePaths(const shared::FilePathOptions &file_path_options) const override
Definition: FileReader.cpp:729
bool isScanFinished() const override
Definition: FileReader.h:186
std::string getFirstLine() const override
Definition: FileReader.cpp:180
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:337
CompressedFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:249
virtual std::vector< std::string > getAllFilePaths(const shared::FilePathOptions &file_path_options) const =0
virtual size_t readRegion(void *buffer, size_t offset, size_t size)=0
FirstLineByFilePath getFirstLineForEachFile() const override
Definition: FileReader.cpp:77
size_t readRegion(void *buffer, size_t offset, size_t size) override
Definition: FileReader.h:176
size_t readRegion(void *buffer, size_t offset, size_t size) override
Definition: FileReader.cpp:307
void checkForMoreRows(size_t file_offset, const shared::FilePathOptions &options, const ForeignServer *server_options, const UserMapping *user_mapping) override
Definition: FileReader.cpp:422
SingleTextFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:89
FileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.h:45
size_t readInternal(void *buffer, size_t read_size, size_t buffer_size)
Definition: FileReader.cpp:275
std::map< std::string, std::string > FirstLineByFilePath
Definition: FileReader.h:37
virtual bool isEndOfLastFile()=0
virtual size_t getRemainingSize()=0
MultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:530
virtual ~FileReader()=default
std::string to_string(char const *&&v)
void insertFile(std::string location)
Definition: FileReader.cpp:666
import_export::CopyParams copy_params_
Definition: FileReader.h:128
virtual void checkForMoreRows(size_t file_offset, const shared::FilePathOptions &options, const ForeignServer *server_options=nullptr, const UserMapping *user_mapping=nullptr)
Definition: FileReader.h:93
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
Definition: FileReader.cpp:134
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
Definition: FileReader.cpp:516
bool isScanFinished() const override
Definition: FileReader.h:280
void consumeDataFromCurrentEntry(size_t size, char *dest_buffer=nullptr)
Definition: FileReader.cpp:216
bool isRemainingSizeKnown() override
Definition: FileReader.cpp:587
SingleTextFileReader & operator=(const SingleTextFileReader &)=delete
FirstLineByFilePath getFirstLineForEachFile() const override
Definition: FileReader.cpp:595
ArchiveWrapper(const std::string &file_path)
Definition: FileReader.h:218
bool currentEntryFinished() const
Definition: FileReader.h:234
LocalMultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params, const shared::FilePathOptions &options, const std::optional< size_t > &max_file_count)
Definition: FileReader.cpp:631
size_t readRegion(void *buffer, size_t offset, size_t size) override
Definition: FileReader.cpp:754
void checkForMoreRows(size_t file_offset, const shared::FilePathOptions &options, const ForeignServer *server_options, const UserMapping *user_mapping) override
Definition: FileReader.cpp:680
void consumeFirstLine(std::optional< std::string > &dest_str)
Definition: FileReader.cpp:384
virtual void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const =0
std::unique_ptr< Archive > arch_
Definition: FileReader.h:257
bool isScanFinished() const override
Definition: FileReader.h:362
static constexpr size_t DEFAULT_HEADER_READ_SIZE
Definition: FileReader.h:149
SingleFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:73
size_t read(void *buffer, size_t max_size) override
Definition: FileReader.cpp:734
void skipToEntry(int entry_number)
Definition: FileReader.cpp:192
#define CHECK(condition)
Definition: Logger.h:291
size_t currentEntryDataAvailable() const
Definition: FileReader.h:236
virtual bool isRemainingSizeKnown()=0
std::string getCurrentFilePath() const override
Definition: FileReader.cpp:607
std::string getFirstLine() const override
Definition: FileReader.cpp:376
std::string getCurrentFilePath() const override
Definition: FileReader.cpp:85
size_t getRemainingSize() override
Definition: FileReader.cpp:579
virtual FirstLineByFilePath getFirstLineForEachFile() const =0
std::vector< std::string > file_locations_
Definition: FileReader.h:381
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
Definition: FileReader.cpp:560
size_t read(void *buffer, size_t max_size) override
Definition: FileReader.h:166
virtual std::set< std::string > checkForRolledOffFiles(const shared::FilePathOptions &file_path_options)
Definition: FileReader.cpp:615
std::vector< std::unique_ptr< FileReader > > files_
Definition: FileReader.h:380
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:384
size_t read(void *buffer, size_t max_size) override
Definition: FileReader.cpp:301
virtual std::string getCurrentFilePath() const =0
~SingleFileReader() override=default