24 namespace foreign_storage {
34 while (i >= static_cast<int64_t>(start)) {
35 if (buffer[i] == line_delim) {
42 "Unable to find an end of line character after reading " +
49 const boost::regex& line_start_regex) {
50 return boost::regex_search(std::string{buffer + start, end - start + 1},
52 boost::regex_constants::match_continuous);
58 if (it != foreign_table->
options.end()) {
75 const char* buffer_end,
77 const std::optional<boost::regex>& line_start_regex) {
79 bool row_found{
false};
80 while (!row_found && row_end <= buffer_end) {
81 if (*row_end == line_delim) {
82 if (row_end == buffer_end) {
84 }
else if (line_start_regex.has_value()) {
88 <<
"'" << line_start_regex.value() <<
"' not found in: '"
89 << std::string{curr, row_end - curr + 1ULL} <<
"'";
90 auto row_str =
get_next_row(row_end + 1, buffer_end, line_delim, {});
92 row_str.c_str(), 0, row_str.length() - 1, line_start_regex.value())) {
93 row_end += row_str.length() + 1;
94 if (row_end == buffer_end) {
97 row_str =
get_next_row(row_end + 1, buffer_end, line_delim, {});
107 return std::string{curr,
static_cast<size_t>(row_end - curr - 1)};
114 const std::optional<boost::regex>& line_start_regex,
115 const boost::regex& line_regex,
116 bool remove_non_matches) {
118 auto buffer_end = buffer + end;
119 auto curr = buffer + start;
120 while (curr <= buffer_end) {
121 auto row_str =
get_next_row(curr, buffer_end, line_delim, line_start_regex);
122 curr += row_str.length() + 1;
123 if (remove_non_matches) {
124 if (boost::regex_match(row_str, line_regex)) {
135 const std::string& option_name) {
136 if (
auto it = foreign_table->
options.find(option_name);
137 it != foreign_table->
options.end()) {
138 if (boost::iequals(it->second,
"TRUE")) {
140 }
else if (boost::iequals(it->second,
"FALSE")) {
143 throw std::runtime_error{
"Invalid boolean value specified for \"" + option_name +
144 "\" foreign table option. "
145 "Value must be either 'true' or 'false'."};
161 bool convert_data_blocks,
162 bool columns_are_pre_filtered,
163 bool skip_dict_encoding)
const {
166 const char* buffer_end = request.
buffer.get() + request.
end_pos;
168 std::vector<size_t> row_offsets;
171 size_t current_row_id = 0;
172 size_t row_count = 0;
174 std::vector<std::string> parsed_columns_str;
175 parsed_columns_str.reserve(logical_column_count);
176 std::vector<std::string_view> parsed_columns_sv;
177 parsed_columns_sv.reserve(logical_column_count);
183 auto curr = buffer_start;
184 while (curr < buffer_end && remaining_row_count > 0) {
188 curr += row_str.length() + 1;
189 current_row_id = row_count++;
190 remaining_row_count--;
192 bool skip_all_columns =
195 [](
const auto& import_buffer) {
return !import_buffer; });
196 if (!skip_all_columns) {
199 bool set_all_nulls =
false;
201 parsed_columns_str.clear();
202 parsed_columns_sv.clear();
205 logical_column_count,
210 current_row_id = row_count--;
211 remaining_row_count++;
216 result.rejected_rows.insert(current_row_id);
217 auto cd_it = columns.begin();
225 size_t parsed_column_index = 0;
226 size_t import_buffer_index = 0;
228 for (
auto cd_it = columns.begin(); cd_it != columns.end(); ++cd_it) {
230 const auto& column_type = cd->columnType;
235 (set_all_nulls ||
isNullDatum(parsed_columns_sv[parsed_column_index],
238 }
catch (
const std::exception& e) {
240 result.rejected_rows.insert(current_row_id);
242 columns, cd_it, import_buffer_index, request);
248 if (column_type.is_geometry()) {
249 auto starting_import_buffer_index = import_buffer_index;
261 }
catch (
const std::exception& e) {
263 result.rejected_rows.insert(current_row_id);
265 columns, cd_it, starting_import_buffer_index, request);
272 for (
int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
277 auto& column_sv = parsed_columns_sv[parsed_column_index];
284 parsed_columns_sv[parsed_column_index],
287 }
catch (
const std::exception& e) {
289 result.rejected_rows.insert(current_row_id);
291 columns, cd_it, import_buffer_index, request);
297 parsed_column_index++;
298 import_buffer_index++;
302 for (
int i = 0; i < column_type.get_physical_cols(); i++) {
303 import_buffer_index++;
306 parsed_column_index++;
307 import_buffer_index++;
313 }
catch (
const std::exception& e) {
315 "\" in row \"" + row_str +
"\" in file \"" +
321 result.row_offsets = row_offsets;
322 result.row_count = row_count;
323 if (convert_data_blocks) {
324 result.column_id_to_data_blocks_map =
331 const std::string& row_str,
332 const boost::regex& line_regex,
333 size_t logical_column_count,
334 std::vector<std::string>& parsed_columns_str,
335 std::vector<std::string_view>& parsed_columns_sv,
336 const std::string& file_path)
const {
338 bool set_all_nulls{
false};
339 if (boost::regex_match(row_str, match, line_regex)) {
340 auto matched_column_count = match.size() - 1 + parsed_columns_sv.size();
341 if (logical_column_count != matched_column_count) {
343 logical_column_count, matched_column_count, file_path);
345 CHECK_GT(match.size(),
static_cast<size_t>(1));
346 for (
size_t i = 1; i < match.size(); i++) {
347 parsed_columns_str.emplace_back(match[i].str());
348 parsed_columns_sv.emplace_back(parsed_columns_str.back());
351 parsed_columns_str.clear();
353 std::vector<std::string_view>(logical_column_count, std::string_view{});
354 set_all_nulls =
true;
356 return set_all_nulls;
364 if (has_header.has_value()) {
365 if (has_header.value()) {
375 it != foreign_table->
options.end()) {
376 copy_params.buffer_size = std::stoi(it->second);
379 it != foreign_table->
options.end()) {
380 copy_params.threads = std::stoi(it->second);
382 copy_params.geo_validate_geometry =
384 .value_or(copy_params.geo_validate_geometry);
390 std::unique_ptr<
char[]>& buffer,
393 const size_t buffer_first_row_index,
394 unsigned int& num_rows_in_buffer,
396 CHECK_GT(buffer_size, static_cast<size_t>(0));
398 size_t end_pos = buffer_size - 1;
399 bool found_end_pos{
false};
400 while (!found_end_pos) {
403 buffer.get(), buffer_size, start_pos, end_pos, copy_params.
line_delim);
406 found_end_pos =
true;
413 CHECK_GT(end_pos, static_cast<size_t>(0));
414 auto old_end_pos = end_pos;
422 old_end_pos = end_pos;
429 found_end_pos =
true;
431 found_end_pos =
true;
433 }
catch (InsufficientBufferSizeException& e) {
438 start_pos = buffer_size;
441 end_pos = buffer_size - 1;
444 CHECK(found_end_pos);
461 for (
const auto& [file_path,
line] : first_line_by_file_path) {
466 CHECK(line_start_regex.has_value());
468 "\" does not match line start regex \"" +
469 line_start_regex.value() +
"\""};
virtual bool isScanFinished() const =0
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
static const std::string BUFFER_SIZE_KEY
import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const override
static const std::string LINE_REGEX_KEY
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, const bool skip_dict_encoding=false)
static const std::string HEADER_KEY
RegexFileBufferParser(const ForeignTable *foreign_table)
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
const import_export::CopyParams copy_params
virtual bool isEndOfLastFile()=0
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
const bool track_rejected_rows
std::unique_ptr< ForeignTableSchema > foreign_table_schema
static void setMaxBufferResize(size_t max_buffer_resize)
std::string get_line_regex(const ForeignTable *foreign_table)
CONSTEXPR DEVICE bool is_null(const T &value)
ParseBufferResult parseBuffer(ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false, bool skip_dict_encoding=false) const override
virtual bool shouldRemoveNonMatches() const
static size_t max_buffer_resize_
std::optional< bool > validate_and_get_bool_value(const ForeignTable *foreign_table, const std::string &option_name)
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
std::list< const ColumnDescriptor * > getColumns() const
size_t findRowEndPosition(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FileReader *file_reader) const override
std::optional< std::string > get_line_start_regex(const ForeignTable *foreign_table)
void extend_buffer(std::unique_ptr< char[]> &buffer, size_t &buffer_size, size_t &alloc_size, FILE *file, foreign_storage::FileReader *file_reader, size_t max_buffer_resize)
std::string get_next_row(const char *curr, const char *buffer_end, char line_delim, const std::optional< boost::regex > &line_start_regex)
static constexpr const char * GEO_VALIDATE_GEOMETRY_KEY
std::optional< boost::regex > line_start_regex_
size_t find_last_end_of_line(const char *buffer, size_t buffer_size, size_t start, size_t end, char line_delim)
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
virtual bool regexMatchColumns(const std::string &row_str, const boost::regex &line_regex, size_t logical_column_count, std::vector< std::string > &parsed_columns_str, std::vector< std::string_view > &parsed_columns_sv, const std::string &file_path) const
static size_t getMaxBufferResize()
static const std::string THREADS_KEY
static const std::string LINE_START_REGEX_KEY
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
size_t get_row_count(const char *buffer, size_t start, size_t end, char line_delim, const std::optional< boost::regex > &line_start_regex, const boost::regex &line_regex, bool remove_non_matches)
static constexpr size_t MAX_STRLEN
static size_t max_buffer_resize
virtual FirstLineByFilePath getFirstLineForEachFile() const =0
std::string getFilePath() const
std::unique_ptr< char[]> buffer
bool line_starts_with_regex(const char *buffer, size_t start, size_t end, const boost::regex &line_start_regex)
void validateFiles(const FileReader *file_reader, const ForeignTable *foreign_table) const override
virtual bool shouldTruncateStringValues() const