OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RegexFileBufferParser.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
21 #include "Shared/StringTransform.h"
23 
24 namespace foreign_storage {
25 namespace {
27 
28 size_t find_last_end_of_line(const char* buffer,
29  size_t buffer_size,
30  size_t start,
31  size_t end,
32  char line_delim) {
33  int64_t i = end;
34  while (i >= static_cast<int64_t>(start)) {
35  if (buffer[i] == line_delim) {
36  return i;
37  } else {
38  i--;
39  }
40  }
42  "Unable to find an end of line character after reading " +
43  std::to_string(buffer_size) + " characters."};
44 }
45 
46 bool line_starts_with_regex(const char* buffer,
47  size_t start,
48  size_t end,
49  const boost::regex& line_start_regex) {
50  return boost::regex_search(std::string{buffer + start, end - start + 1},
51  line_start_regex,
52  boost::regex_constants::match_continuous);
53 }
54 
55 std::optional<std::string> get_line_start_regex(const ForeignTable* foreign_table) {
56  if (foreign_table) {
57  auto it = foreign_table->options.find(RegexFileBufferParser::LINE_START_REGEX_KEY);
58  if (it != foreign_table->options.end()) {
59  return it->second;
60  }
61  }
62  return {};
63 }
64 
65 std::string get_line_regex(const ForeignTable* foreign_table) {
66  if (foreign_table) {
67  auto it = foreign_table->options.find(RegexFileBufferParser::LINE_REGEX_KEY);
68  CHECK(it != foreign_table->options.end());
69  return it->second;
70  }
71  return {};
72 }
73 
74 std::string get_next_row(const char* curr,
75  const char* buffer_end,
76  char line_delim,
77  const std::optional<boost::regex>& line_start_regex) {
78  auto row_end = curr;
79  bool row_found{false};
80  while (!row_found && row_end <= buffer_end) {
81  if (*row_end == line_delim) {
82  if (row_end == buffer_end) {
83  row_found = true;
84  } else if (line_start_regex.has_value()) {
85  // When a LINE_START_REGEX option is present, concatenate the following lines
86  // until a line that starts with the specified regex is found.
87  CHECK(line_starts_with_regex(curr, 0, row_end - curr, line_start_regex.value()))
88  << "'" << line_start_regex.value() << "' not found in: '"
89  << std::string{curr, row_end - curr + 1ULL} << "'";
90  auto row_str = get_next_row(row_end + 1, buffer_end, line_delim, {});
91  while (!line_starts_with_regex(
92  row_str.c_str(), 0, row_str.length() - 1, line_start_regex.value())) {
93  row_end += row_str.length() + 1;
94  if (row_end == buffer_end) {
95  break;
96  }
97  row_str = get_next_row(row_end + 1, buffer_end, line_delim, {});
98  }
99  row_found = true;
100  } else {
101  row_found = true;
102  }
103  }
104  row_end++;
105  }
106  CHECK(row_found);
107  return std::string{curr, static_cast<size_t>(row_end - curr - 1)};
108 }
109 
110 size_t get_row_count(const char* buffer,
111  size_t start,
112  size_t end,
113  char line_delim,
114  const std::optional<boost::regex>& line_start_regex,
115  const boost::regex& line_regex,
116  bool remove_non_matches) {
117  size_t row_count{0};
118  auto buffer_end = buffer + end;
119  auto curr = buffer + start;
120  while (curr <= buffer_end) {
121  auto row_str = get_next_row(curr, buffer_end, line_delim, line_start_regex);
122  curr += row_str.length() + 1;
123  if (remove_non_matches) {
124  if (boost::regex_match(row_str, line_regex)) {
125  row_count++;
126  }
127  } else {
128  row_count++;
129  }
130  }
131  return row_count;
132 }
133 
134 std::optional<bool> validate_and_get_bool_value(const ForeignTable* foreign_table,
135  const std::string& option_name) {
136  if (auto it = foreign_table->options.find(option_name);
137  it != foreign_table->options.end()) {
138  if (boost::iequals(it->second, "TRUE")) {
139  return true;
140  } else if (boost::iequals(it->second, "FALSE")) {
141  return false;
142  } else {
143  throw std::runtime_error{"Invalid boolean value specified for \"" + option_name +
144  "\" foreign table option. "
145  "Value must be either 'true' or 'false'."};
146  }
147  }
148  return std::nullopt;
149 }
150 } // namespace
151 
// Member-initializer list of the RegexFileBufferParser constructor (the declarator line
// is not present in this listing). Both members are derived from the foreign table's
// options through get_line_regex() and get_line_start_regex().
153  : line_regex_(get_line_regex(foreign_table))
154  , line_start_regex_(get_line_start_regex(foreign_table)) {}
155 
// Remainder of the parseBuffer parameter list and its body (the opening lines of the
// signature and a few callee-name lines are not present in this listing).
//
// Splits the request's buffer range [begin_pos, end_pos) into rows with get_next_row(),
// matches each row against line_regex_ to obtain column values, and adds those values
// to request.import_buffers. Failures while handling a row are recorded in
// result.rejected_rows when request.track_rejected_rows is set and rethrown otherwise.
// NOTE(review): `result` is declared on a line that is not shown here.
161  bool convert_data_blocks,
162  bool columns_are_pre_filtered,
163  bool skip_dict_encoding) const {
164  CHECK(request.buffer);
165  char* buffer_start = request.buffer.get() + request.begin_pos;
166  const char* buffer_end = request.buffer.get() + request.end_pos;
167 
// The first recorded offset is the file position at which this buffer range starts.
168  std::vector<size_t> row_offsets;
169  row_offsets.emplace_back(request.file_offset + request.begin_pos);
170 
171  size_t current_row_id = 0;
172  size_t row_count = 0;
173  auto logical_column_count = request.foreign_table_schema->getLogicalColumns().size();
// Both vectors are cleared and refilled for every row; capacity is reserved once.
174  std::vector<std::string> parsed_columns_str;
175  parsed_columns_str.reserve(logical_column_count);
176  std::vector<std::string_view> parsed_columns_sv;
177  parsed_columns_sv.reserve(logical_column_count);
178 
180 
181  std::string row_str;
182  size_t remaining_row_count = request.process_row_count;
183  auto curr = buffer_start;
184  while (curr < buffer_end && remaining_row_count > 0) {
185  try {
// get_next_row() treats its end pointer as inclusive, hence buffer_end - 1. The
// returned row excludes its delimiter, so curr advances by length + 1.
186  row_str = get_next_row(
187  curr, buffer_end - 1, request.copy_params.line_delim, line_start_regex_);
188  curr += row_str.length() + 1;
189  current_row_id = row_count++;
190  remaining_row_count--;
191 
// When every import buffer is null, the row is only counted and no values are parsed.
192  bool skip_all_columns =
193  std::all_of(request.import_buffers.begin(),
194  request.import_buffers.end(),
195  [](const auto& import_buffer) { return !import_buffer; });
196  if (!skip_all_columns) {
197  auto columns = request.getColumns();
198 
199  bool set_all_nulls = false;
200  try {
201  parsed_columns_str.clear();
202  parsed_columns_sv.clear();
203  set_all_nulls = regexMatchColumns(row_str,
204  line_regex_,
205  logical_column_count,
206  parsed_columns_str,
207  parsed_columns_sv,
208  request.getFilePath());
// A row that did not match the line regex is dropped when shouldRemoveNonMatches()
// is true: the row counters updated above are reverted before moving on.
209  if (set_all_nulls && shouldRemoveNonMatches()) {
210  current_row_id = row_count--;
211  remaining_row_count++;
212  continue;
213  }
214  } catch (const ForeignStorageException& e) {
215  if (request.track_rejected_rows) {
216  result.rejected_rows.insert(current_row_id);
217  auto cd_it = columns.begin();
218  fillRejectedRowWithInvalidData(columns, cd_it, 0, request);
219  continue;
220  } else {
221  throw;
222  }
223  }
224 
// parsed_column_index walks the regex captures; import_buffer_index walks
// request.import_buffers.
225  size_t parsed_column_index = 0;
226  size_t import_buffer_index = 0;
227 
228  for (auto cd_it = columns.begin(); cd_it != columns.end(); ++cd_it) {
229  auto cd = *cd_it;
230  const auto& column_type = cd->columnType;
231  if (request.import_buffers[import_buffer_index]) {
232  bool is_null = false;
233  try {
234  is_null =
235  (set_all_nulls || isNullDatum(parsed_columns_sv[parsed_column_index],
236  cd,
237  request.copy_params.null_str));
238  } catch (const std::exception& e) {
239  if (request.track_rejected_rows) {
240  result.rejected_rows.insert(current_row_id);
242  columns, cd_it, import_buffer_index, request);
243  break; // skip rest of row
244  } else {
245  throw;
246  }
247  }
248  if (column_type.is_geometry()) {
// Remember the index at which this geo column started so that a rejected row can be
// filled from there.
249  auto starting_import_buffer_index = import_buffer_index;
250  try {
252  import_buffer_index,
253  request.copy_params,
254  cd_it,
255  parsed_columns_sv,
256  parsed_column_index,
257  is_null,
258  request.first_row_index,
259  row_count,
260  request.getCatalog());
261  } catch (const std::exception& e) {
262  if (request.track_rejected_rows) {
263  result.rejected_rows.insert(current_row_id);
265  columns, cd_it, starting_import_buffer_index, request);
266  break; // skip rest of row
267  } else {
268  throw;
269  }
270  }
271  // Skip remaining physical columns
272  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
273  ++cd_it;
274  }
275  } else {
276  try {
277  auto& column_sv = parsed_columns_sv[parsed_column_index];
// Over-long string values are cut to StringDictionary::MAX_STRLEN when
// shouldTruncateStringValues() is true.
278  if (column_type.is_string() && shouldTruncateStringValues() &&
279  column_sv.length() > StringDictionary::MAX_STRLEN) {
280  column_sv = column_sv.substr(0, StringDictionary::MAX_STRLEN);
281  }
282  request.import_buffers[import_buffer_index]->add_value(
283  cd,
284  parsed_columns_sv[parsed_column_index],
285  is_null,
286  request.copy_params);
287  } catch (const std::exception& e) {
288  if (request.track_rejected_rows) {
289  result.rejected_rows.insert(current_row_id);
291  columns, cd_it, import_buffer_index, request);
292  break; // skip rest of row
293  } else {
294  throw;
295  }
296  }
297  parsed_column_index++;
298  import_buffer_index++;
299  }
300  } else {
// No import buffer for this column: step over it and its physical columns.
301  // Skip column
302  for (int i = 0; i < column_type.get_physical_cols(); i++) {
303  import_buffer_index++;
304  cd_it++;
305  }
306  parsed_column_index++;
307  import_buffer_index++;
308  }
309  }
310  }
311  } catch (const ForeignStorageException& e) {
312  throw;
313  } catch (const std::exception& e) {
// Any other exception is rethrown as a ForeignStorageException that names the row
// and the file.
314  throw ForeignStorageException("Parsing failure \"" + std::string(e.what()) +
315  "\" in row \"" + row_str + "\" in file \"" +
316  request.getFilePath() + "\"");
317  }
318  }
// The closing offset is the file position just past the last row that was consumed.
319  row_offsets.emplace_back(request.file_offset + (curr - request.buffer.get()));
320 
321  result.row_offsets = row_offsets;
322  result.row_count = row_count;
323  if (convert_data_blocks) {
324  result.column_id_to_data_blocks_map =
325  convertImportBuffersToDataBlocks(request.import_buffers, skip_dict_encoding);
326  }
327  return result;
328 }
329 
// Parameter list and body of regexMatchColumns (the line carrying the return type and
// function name is not present in this listing).
//
// Matches the whole of `row_str` against `line_regex`. On a match, each capture group is
// appended to `parsed_columns_str` and a view of it to `parsed_columns_sv`, and false is
// returned. Without a match, `parsed_columns_sv` is replaced by `logical_column_count`
// empty views and true is returned, signalling that every column should be set to null.
331  const std::string& row_str,
332  const boost::regex& line_regex,
333  size_t logical_column_count,
334  std::vector<std::string>& parsed_columns_str,
335  std::vector<std::string_view>& parsed_columns_sv,
336  const std::string& file_path) const {
337  boost::smatch match;
338  bool set_all_nulls{false};
339  if (boost::regex_match(row_str, match, line_regex)) {
// match.size() - 1 is the number of capture groups (entry 0 is the whole match);
// views already present in parsed_columns_sv are added to the count.
340  auto matched_column_count = match.size() - 1 + parsed_columns_sv.size();
341  if (logical_column_count != matched_column_count) {
// NOTE(review): the callee name for the following arguments is on a line missing from
// this listing; presumably throw_number_of_columns_mismatch_error - confirm.
343  logical_column_count, matched_column_count, file_path);
344  }
345  CHECK_GT(match.size(), static_cast<size_t>(1));
346  for (size_t i = 1; i < match.size(); i++) {
// NOTE(review): each view refers to the string that was just appended, so it stays
// valid only while parsed_columns_str does not reallocate; confirm that callers reserve
// enough capacity before calling.
347  parsed_columns_str.emplace_back(match[i].str());
348  parsed_columns_sv.emplace_back(parsed_columns_str.back());
349  }
350  } else {
351  parsed_columns_str.clear();
352  parsed_columns_sv =
353  std::vector<std::string_view>(logical_column_count, std::string_view{});
354  set_all_nulls = true;
355  }
356  return set_all_nulls;
357 }
358 
// Parameter list and body of validateAndGetCopyParams (the line carrying the return
// type and function name is not present in this listing).
//
// Builds a CopyParams object from the foreign table's options: header handling, buffer
// size, thread count and geometry validation. validate_and_get_bool_value() throws for
// boolean options with an invalid value.
360  const ForeignTable* foreign_table) const {
361  import_export::CopyParams copy_params{};
362  copy_params.plain_text = true;
363  auto has_header = validate_and_get_bool_value(foreign_table, HEADER_KEY);
364  if (has_header.has_value()) {
365  if (has_header.value()) {
366  copy_params.has_header = import_export::ImportHeaderRow::kHasHeader;
367  } else {
368  copy_params.has_header = import_export::ImportHeaderRow::kNoHeader;
369  }
370  } else {
371  // By default, regex parsed files are not assumed to have headers.
372  copy_params.has_header = import_export::ImportHeaderRow::kNoHeader;
373  }
// std::stoi throws std::invalid_argument / std::out_of_range for values that are not
// valid integers; those exceptions are not handled here.
374  if (auto it = foreign_table->options.find(BUFFER_SIZE_KEY);
375  it != foreign_table->options.end()) {
376  copy_params.buffer_size = std::stoi(it->second);
377  }
378  if (auto it = foreign_table->options.find(AbstractFileStorageDataWrapper::THREADS_KEY);
379  it != foreign_table->options.end()) {
380  copy_params.threads = std::stoi(it->second);
381  }
// NOTE(review): the call that produces the optional on the right-hand side is on a line
// missing from this listing; the existing value is kept when that optional is empty.
382  copy_params.geo_validate_geometry =
384  .value_or(copy_params.geo_validate_geometry);
385  return copy_params;
386 }
387 
// Parameter list and body of findRowEndPosition (the line carrying the return type and
// function name, and a few other lines, are not present in this listing).
//
// Finds the position in `buffer` at which the last complete row ends, stores the number
// of rows up to that position in `num_rows_in_buffer`, and returns the position plus
// one. When no line delimiter can be found, the buffer is extended and the search is
// retried, unless alloc_size has reached max_buffer_resize or the file reader reports
// that the scan is finished, in which case the exception is rethrown.
389  size_t& alloc_size,
390  std::unique_ptr<char[]>& buffer,
391  size_t& buffer_size,
392  const import_export::CopyParams& copy_params,
393  const size_t buffer_first_row_index,
394  unsigned int& num_rows_in_buffer,
395  foreign_storage::FileReader* file_reader) const {
396  CHECK_GT(buffer_size, static_cast<size_t>(0));
397  size_t start_pos{0};
398  size_t end_pos = buffer_size - 1;
399  bool found_end_pos{false};
400  while (!found_end_pos) {
401  try {
402  end_pos = find_last_end_of_line(
403  buffer.get(), buffer_size, start_pos, end_pos, copy_params.line_delim);
404  if (file_reader->isEndOfLastFile()) {
// At the end of the last file the buffer is expected to end with a line delimiter.
405  CHECK_EQ(end_pos, buffer_size - 1);
406  found_end_pos = true;
407  } else if (line_start_regex_.has_value()) {
408  // When a LINE_START_REGEX option is present and the file reader is not at the end
409  // of file, return the position of the end of line before the last line that
410  // matches the line start regex, since the last line that matches the line start
411  // regex in this buffer may still have to include/concatenate lines beyond this
412  // buffer.
413  CHECK_GT(end_pos, static_cast<size_t>(0));
414  auto old_end_pos = end_pos;
415  end_pos = find_last_end_of_line(buffer.get(),
416  buffer_size,
417  start_pos,
418  old_end_pos - 1,
419  copy_params.line_delim);
// Walk backwards one line at a time until the line in (end_pos, old_end_pos] starts
// with the line start regex.
420  while (!line_starts_with_regex(
421  buffer.get(), end_pos + 1, old_end_pos, line_start_regex_.value())) {
422  old_end_pos = end_pos;
423  end_pos = find_last_end_of_line(buffer.get(),
424  buffer_size,
425  start_pos,
426  old_end_pos - 1,
427  copy_params.line_delim);
428  }
429  found_end_pos = true;
430  } else {
431  found_end_pos = true;
432  }
433  } catch (InsufficientBufferSizeException& e) {
// NOTE(review): max_buffer_resize is defined on a line missing from this listing.
435  if (alloc_size >= max_buffer_resize || file_reader->isScanFinished()) {
436  throw;
437  }
// Only the newly added part of the buffer has to be searched on the next attempt.
438  start_pos = buffer_size;
440  buffer, buffer_size, alloc_size, nullptr, file_reader, max_buffer_resize);
441  end_pos = buffer_size - 1;
442  }
443  }
444  CHECK(found_end_pos);
// NOTE(review): two arguments of this get_row_count call are on lines missing from
// this listing.
445  num_rows_in_buffer = get_row_count(buffer.get(),
446  0,
447  end_pos,
448  copy_params.line_delim,
450  line_regex_,
452  return end_pos + 1;
453 }
454 
// Last parameter and body of validateFiles (the start of the signature is not present
// in this listing).
//
// When a line start regex is configured, takes the first line of every file from the
// file reader and throws ForeignStorageException for a non-empty first line that fails
// the check below.
456  const ForeignTable* foreign_table) const {
457  if (line_start_regex_.has_value()) {
458  // When a LINE_START_REGEX option is specified, at least the first line in each file
459  // has to start with the specified regex.
460  auto first_line_by_file_path = file_reader->getFirstLineForEachFile();
461  for (const auto& [file_path, line] : first_line_by_file_path) {
// NOTE(review): the callee name of the check below is on a line missing from this
// listing; presumably a negated line_starts_with_regex call - confirm.
462  if (!line.empty() &&
464  line.c_str(), 0, line.length() - 1, line_start_regex_.value())) {
// The option is read again as text so that the pattern can be quoted in the message.
465  auto line_start_regex = get_line_start_regex(foreign_table);
466  CHECK(line_start_regex.has_value());
467  throw ForeignStorageException{"First line in file \"" + file_path +
468  "\" does not match line start regex \"" +
469  line_start_regex.value() + "\""};
470  }
471  }
472  }
473 }
474 
477 }
478 
480  return max_buffer_resize_;
481 }
482 
484  return false;
485 }
486 
488  return false;
489 }
490 } // namespace foreign_storage
#define CHECK_EQ(x, y)
Definition: Logger.h:301
virtual bool isScanFinished() const =0
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const override
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, const bool skip_dict_encoding=false)
RegexFileBufferParser(const ForeignTable *foreign_table)
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
const import_export::CopyParams copy_params
virtual bool isEndOfLastFile()=0
#define CHECK_GT(x, y)
Definition: Logger.h:305
std::string to_string(char const *&&v)
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
std::unique_ptr< ForeignTableSchema > foreign_table_schema
static void setMaxBufferResize(size_t max_buffer_resize)
std::string get_line_regex(const ForeignTable *foreign_table)
CONSTEXPR DEVICE bool is_null(const T &value)
ParseBufferResult parseBuffer(ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false, bool skip_dict_encoding=false) const override
std::optional< bool > validate_and_get_bool_value(const ForeignTable *foreign_table, const std::string &option_name)
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
std::list< const ColumnDescriptor * > getColumns() const
size_t findRowEndPosition(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FileReader *file_reader) const override
std::optional< std::string > get_line_start_regex(const ForeignTable *foreign_table)
void extend_buffer(std::unique_ptr< char[]> &buffer, size_t &buffer_size, size_t &alloc_size, FILE *file, foreign_storage::FileReader *file_reader, size_t max_buffer_resize)
std::string get_next_row(const char *curr, const char *buffer_end, char line_delim, const std::optional< boost::regex > &line_start_regex)
static constexpr const char * GEO_VALIDATE_GEOMETRY_KEY
Definition: ForeignTable.h:49
std::optional< boost::regex > line_start_regex_
tuple line
Definition: parse_ast.py:10
size_t find_last_end_of_line(const char *buffer, size_t buffer_size, size_t start, size_t end, char line_delim)
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
virtual bool regexMatchColumns(const std::string &row_str, const boost::regex &line_regex, size_t logical_column_count, std::vector< std::string > &parsed_columns_str, std::vector< std::string_view > &parsed_columns_sv, const std::string &file_path) const
#define CHECK(condition)
Definition: Logger.h:291
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
size_t get_row_count(const char *buffer, size_t start, size_t end, char line_delim, const std::optional< boost::regex > &line_start_regex, const boost::regex &line_regex, bool remove_non_matches)
static constexpr size_t MAX_STRLEN
virtual FirstLineByFilePath getFirstLineForEachFile() const =0
bool line_starts_with_regex(const char *buffer, size_t start, size_t end, const boost::regex &line_start_regex)
void validateFiles(const FileReader *file_reader, const ForeignTable *foreign_table) const override