25 namespace foreign_storage {
38 const size_t buffer_size,
40 const char line_delim) {
41 if (read_size == 0 || buffer[read_size - 1] != line_delim) {
42 CHECK(buffer_size > read_size);
43 static_cast<char*
>(buffer)[read_size] = line_delim;
45 }
else if (read_size > 1 && buffer[read_size - 2] == line_delim) {
/**
 * Returns the index of the first element of cumulative_sizes that is strictly
 * greater than byte_offset.
 *
 * @param cumulative_sizes values searched with std::upper_bound; they must be sorted in
 * ascending order for the search to be meaningful
 * @param byte_offset offset to look up
 * @return zero-based index of the first element greater than byte_offset
 * @throws std::runtime_error if no element is greater than byte_offset (this includes
 * an empty cumulative_sizes)
 */
size_t offset_to_index(const std::vector<size_t>& cumulative_sizes, size_t byte_offset) {
  // upper_bound (not lower_bound): an offset equal to an element resolves to the
  // following index.
  const auto iterator =
      std::upper_bound(cumulative_sizes.begin(), cumulative_sizes.end(), byte_offset);
  if (iterator == cumulative_sizes.end()) {
    throw std::runtime_error{"Invalid offset into cumulative_sizes"};
  }
  return iterator - cumulative_sizes.begin();
}
68 return file_size - header_size + 1;
92 , scan_finished_(
false)
94 , total_bytes_read_(0) {
97 throw std::runtime_error{
"An error occurred when attempting to open file \"" +
98 file_path +
"\". " + strerror(errno)};
103 fseek(
file_, 0, SEEK_END);
112 throw std::runtime_error{
"An error occurred when attempting to open file \"" +
113 file_path +
"\". " + strerror(errno)};
119 const rapidjson::Value& value)
121 , scan_finished_(
true)
123 , total_bytes_read_(0) {
126 throw std::runtime_error{
"An error occurred when attempting to open file \"" +
127 file_path +
"\". " + strerror(errno)};
135 rapidjson::Value& value,
136 rapidjson::Document::AllocatorType& allocator)
const {
153 throw std::runtime_error{
"An error occurred when attempting to open file \"" +
156 fseek(
file_, 0, SEEK_END);
157 size_t new_file_size = ftell(
file_);
163 throw std::runtime_error{
"An error occurred when attempting to read offset " +
165 " in file: \"" +
file_path_ +
"\". " + strerror(errno)};
197 if (
arch_.get()->read_next_header()) {
200 throw std::runtime_error{
"Invalid archive entry"};
208 bool success =
arch_.get()->read_next_header();
219 if (dest_buffer !=
nullptr) {
228 char ArchiveWrapper::ArchiveWrapper::peekNextChar() {
229 CHECK(block_chars_remaining_ > 0);
230 return static_cast<const char*
>(current_block_)[0];
252 , archive_(file_path)
253 , initial_scan_(
true)
254 , scan_finished_(
false)
256 , current_index_(-1) {
263 const rapidjson::Value& value)
277 size_t buffer_size) {
278 size_t remaining_size = read_size;
279 char*
dest =
static_cast<char*
>(buffer);
286 remaining_size -= copy_size;
289 size_t bytes_read = read_size - remaining_size;
303 size_t bytes_read =
readInternal(buffer, max_size - 1, max_size);
371 std::optional<std::string> str = std::nullopt;
378 auto first_line = std::make_optional<std::string>();
380 reader.consumeFirstLine(first_line);
381 return first_line.value();
385 char* dest_buffer =
nullptr;
387 if (dest_str.has_value()) {
388 auto& str = dest_str.value();
389 str.resize(str.length() + 1);
390 dest_buffer = str.data() + str.length() - 1;
406 while (n_bytes > 0) {
435 int entry_number = 0;
454 throw std::runtime_error{
455 "Foreign table refreshed with APPEND mode missing archive entry \"" +
457 boost::filesystem::path(
file_path_).filename().string() +
"\"."};
480 size_t last_size = 0;
481 size_t file_index = -1;
482 size_t num_file_entries = 0;
490 if (num_file_entries == 1) {
498 for (
size_t zero_index = 0; zero_index < file_index; zero_index++) {
517 rapidjson::Value& value,
518 rapidjson::Document::AllocatorType& allocator)
const {
535 , starting_offset_(0)
536 , is_end_of_last_file_(
false) {}
540 const rapidjson::Value& value)
544 , starting_offset_(0)
545 , is_end_of_last_file_(
false) {
550 if (value.HasMember(
"starting_offset")) {
555 CHECK(value.HasMember(
"files_metadata"));
556 CHECK(value[
"files_metadata"].IsArray());
561 rapidjson::Document::AllocatorType& allocator)
const {
570 rapidjson::Value files_metadata(rapidjson::kArrayType);
571 for (
size_t index = 0; index <
files_.size(); index++) {
572 rapidjson::Value file_metadata(rapidjson::kObjectType);
573 files_[index]->serialize(file_metadata, allocator);
574 files_metadata.PushBack(file_metadata, allocator);
576 value.AddMember(
"files_metadata", files_metadata, allocator);
580 size_t total_size = 0;
582 total_size +=
files_[index]->getRemainingSize();
588 bool size_known =
true;
590 size_known = size_known &&
files_[index]->isRemainingSizeKnown();
597 for (
const auto& file :
files_) {
598 first_line_by_file_path.merge(file->getFirstLineForEachFile());
600 return first_line_by_file_path;
609 return files_.back()->getCurrentFilePath();
618 auto rolled_off_files =
620 if (!rolled_off_files.empty()) {
628 return rolled_off_files;
634 const std::optional<size_t>& max_file_count)
637 if (max_file_count.has_value() && file_paths.size() > max_file_count.value()) {
638 file_paths.erase(file_paths.begin(),
639 file_paths.begin() + (file_paths.size() - max_file_count.value()));
641 for (
const auto& file_path : file_paths) {
648 const rapidjson::Value& value)
653 files_.emplace_back(std::make_unique<CompressedFileReader>(
656 value[
"files_metadata"].GetArray()[index]));
658 files_.emplace_back(std::make_unique<SingleTextFileReader>(
661 value[
"files_metadata"].GetArray()[index]));
672 if (
files_.back()->isScanFinished()) {
686 std::set<std::string> new_locations;
689 if (boost::filesystem::is_directory(
file_path_)) {
692 for (
const auto& path : all_file_paths) {
694 new_locations.insert(path);
710 if (last_file_index > 0) {
714 if (!
files_.back()->isScanFinished()) {
722 if (new_locations.size() > 0) {
723 for (
const auto& location : new_locations) {
764 size_t read_size = size;
769 size_t bytes_read =
files_[index].get()->readRegion(buffer, offset - base, read_size);
DEVICE auto upper_bound(ARGS &&...args)
bool contains(const T &container, const U &element)
virtual bool isScanFinished() const =0
void checkForMoreRows(size_t file_offset, const shared::FilePathOptions &options, const ForeignServer *server_options, const UserMapping *user_mapping) override
std::vector< std::string > sourcenames_
void skipHeader() override
virtual std::string getFirstLine() const =0
size_t offset_to_index(const std::vector< size_t > &cumulative_sizes, size_t byte_offset)
std::vector< int > archive_entry_index_
shared utility for globbing files, paths can be specified as either a single file, directory or wildcards
std::vector< std::string > getAllFilePaths(const shared::FilePathOptions &file_path_options) const override
int getCurrentEntryIndex() const
bool isScanFinished() const override
std::string getFirstLine() const override
bool isEndOfLastFile() override
std::vector< size_t > cumulative_sizes_
void throw_removed_row_in_file_error(const std::string &file_path)
CompressedFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
virtual std::vector< std::string > getAllFilePaths(const shared::FilePathOptions &file_path_options) const =0
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
FirstLineByFilePath getFirstLineForEachFile() const override
size_t readRegion(void *buffer, size_t offset, size_t size) override
void checkForMoreRows(size_t file_offset, const shared::FilePathOptions &options, const ForeignServer *server_options, const UserMapping *user_mapping) override
bool is_end_of_last_file_
SingleTextFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
size_t readInternal(void *buffer, size_t read_size, size_t buffer_size)
std::map< std::string, std::string > FirstLineByFilePath
std::set< std::string > check_for_rolled_off_file_paths(const std::vector< std::string > &all_file_paths, std::vector< std::string > &processed_file_paths)
MultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
void insertFile(std::string location)
import_export::CopyParams copy_params_
void throw_removed_file_error(const std::string &file_path)
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
ImportHeaderRow has_header
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
bool isScanFinished() const override
void consumeDataFromCurrentEntry(size_t size, char *dest_buffer=nullptr)
bool isRemainingSizeKnown() override
FirstLineByFilePath getFirstLineForEachFile() const override
::FILE * fopen(const char *filename, const char *mode)
bool currentEntryFinished() const
LocalMultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params, const shared::FilePathOptions &options, const std::optional< size_t > &max_file_count)
size_t readRegion(void *buffer, size_t offset, size_t size) override
void checkForMoreRows(size_t file_offset, const shared::FilePathOptions &options, const ForeignServer *server_options, const UserMapping *user_mapping) override
void skipBytes(size_t n_bytes)
void consumeFirstLine(std::optional< std::string > &dest_str)
bool g_enable_smem_group_by true
size_t get_data_size(size_t file_size, size_t header_size)
std::unique_ptr< Archive > arch_
bool isScanFinished() const override
const void * current_block_
static constexpr size_t DEFAULT_HEADER_READ_SIZE
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
void adjust_eof(size_t &read_size, const size_t buffer_size, char *buffer, const char line_delim)
SingleFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
size_t read(void *buffer, size_t max_size) override
void skipToEntry(int entry_number)
shared utility for mime-types
bool g_enable_watchdog false
size_t currentEntryDataAvailable() const
std::string getCurrentFilePath() const override
std::vector< std::string > local_glob_filter_sort_files(const std::string &file_path, const FilePathOptions &options, const bool recurse)
std::string getFirstLine() const override
std::string getCurrentFilePath() const override
size_t getRemainingSize() override
void skipHeader() override
std::vector< std::string > file_locations_
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
virtual std::set< std::string > checkForRolledOffFiles(const shared::FilePathOptions &file_path_options)
size_t file_size(const int fd)
bool isEndOfLastFile() override
bool is_compressed_file_extension(const std::string &location)
std::vector< std::unique_ptr< FileReader > > files_
std::vector< size_t > cumulative_sizes_
size_t read(void *buffer, size_t max_size) override
size_t block_chars_remaining_