25 #include <string_view>
33 return c == copy_params.
line_delim || c ==
'\n' || c ==
'\r';
36 inline void trim_space(
const char*& field_begin,
const char*& field_end) {
37 while (field_begin < field_end && (*field_begin ==
' ' || *field_begin ==
'\r')) {
40 while (field_begin < field_end &&
41 (*(field_end - 1) ==
' ' || *(field_end - 1) ==
'\r')) {
47 const char*& field_end,
49 auto quote_begin = field_begin, quote_end = field_end;
53 if (copy_params.
quoted && quote_end - quote_begin > 0) {
54 if (*quote_begin == copy_params.
quote && *(quote_end - 1) == copy_params.
quote) {
55 field_begin = ++quote_begin;
56 field_end = (quote_begin == quote_end) ? quote_end : --quote_end;
59 "Unable to trim quotes.");
65 namespace import_export {
66 namespace delimited_parser {
72 if (begin == 0 || (begin > 0 && buffer[begin - 1] == copy_params.
line_delim)) {
76 const char* buf = buffer + begin;
77 for (i = 0; i < end - begin; i++) {
88 unsigned int& num_rows_this_buffer,
89 size_t buffer_first_row_index,
92 size_t last_line_delim_pos = 0;
93 const char* current = buffer + offset;
95 while (current < buffer + size) {
96 while (!in_quote && current < buffer + size) {
99 last_line_delim_pos = current - buffer;
100 ++num_rows_this_buffer;
101 }
else if (*current == copy_params.
quote) {
107 while (in_quote && current < buffer + size) {
109 if ((*current == copy_params.
escape) && (current < buffer + size - 1) &&
110 (*(current + 1) == copy_params.
quote)) {
112 }
else if (*current == copy_params.
quote) {
119 while (current < buffer + size) {
121 last_line_delim_pos = current - buffer;
122 ++num_rows_this_buffer;
128 if (last_line_delim_pos <= 0) {
129 size_t excerpt_length = std::min<size_t>(50, size);
130 std::string buffer_excerpt{buffer, buffer + excerpt_length};
132 std::string quote(1, copy_params.
quote);
133 std::string error_message =
134 "Unable to find a matching end quote for the quote character '" + quote +
136 " characters. Please ensure that all data fields are correctly formatted "
137 "or update the \"buffer_size\" option appropriately. Row number: " +
139 ". First few characters in row: " + buffer_excerpt;
142 std::string error_message =
143 "Unable to find an end of line character after reading " +
145 " characters. Please ensure that the correct \"line_delimiter\" option is "
146 "specified or update the \"buffer_size\" option appropriately. Row number: " +
148 ". First few characters in row: " + buffer_excerpt;
153 return last_line_delim_pos + 1;
167 std::unique_ptr<
char[]>& buffer,
170 const size_t buffer_first_row_index,
171 unsigned int& num_rows_in_buffer,
174 bool found_end_pos{
false};
175 bool in_quote{
false};
178 CHECK(file !=
nullptr || file_reader !=
nullptr);
180 while (!found_end_pos) {
186 buffer_first_row_index,
189 found_end_pos =
true;
197 offset = buffer_size;
205 template <
typename T>
208 const char* entire_buf_end,
210 const bool* is_array,
212 std::vector<std::unique_ptr<
char[]>>& tmp_buffers,
213 bool& try_single_thread,
214 bool filter_empty_lines) {
215 const char*
field = buf;
217 bool in_quote =
false;
218 bool in_array =
false;
219 bool has_escape =
false;
220 bool strip_quotes =
false;
221 try_single_thread =
false;
222 for (p = buf; p < entire_buf_end; ++p) {
223 if (*p == copy_params.
escape && p < entire_buf_end - 1 &&
224 *(p + 1) == copy_params.
quote) {
227 }
else if (copy_params.
quoted && *p == copy_params.
quote) {
228 in_quote = !in_quote;
232 }
else if (!in_quote && is_array !=
nullptr && *p == copy_params.
array_begin &&
233 is_array[row.size()]) {
235 while (p < entire_buf_end - 1) {
244 if (!has_escape && !strip_quotes) {
245 const char* field_end = p;
249 row.emplace_back(field, field_end - field);
251 tmp_buffers.emplace_back(std::make_unique<
char[]>(p - field + 1));
252 auto field_buf = tmp_buffers.back().get();
254 for (; i < p -
field; i++, j++) {
255 if (has_escape && field[i] == copy_params.
escape &&
256 field[i + 1] == copy_params.
quote) {
257 field_buf[j] = copy_params.
quote;
260 field_buf[j] = field[i];
263 const char* field_begin = field_buf;
264 const char* field_end = field_buf + j;
269 row.emplace_back(field_begin, field_end - field_begin);
273 strip_quotes =
false;
275 if (
is_eol(*p, copy_params)) {
277 if (filter_empty_lines) {
278 while (p + 1 < buf_end &&
is_eol(*(p + 1), copy_params)) {
283 if (p + 1 < buf_end && *p ==
'\r' && *(p + 1) ==
'\n') {
297 try_single_thread =
true;
301 try_single_thread =
true;
306 template const char*
get_row(
const char* buf,
308 const char* entire_buf_end,
310 const bool* is_array,
311 std::vector<std::string>& row,
312 std::vector<std::unique_ptr<
char[]>>& tmp_buffers,
313 bool& try_single_thread,
314 bool filter_empty_lines);
316 template const char*
get_row(
const char* buf,
318 const char* entire_buf_end,
320 const bool* is_array,
321 std::vector<std::string_view>& row,
322 std::vector<std::unique_ptr<
char[]>>& tmp_buffers,
323 bool& try_single_thread,
324 bool filter_empty_lines);
328 std::vector<std::string>& string_vec,
329 bool truncate_values) {
330 if (s == copy_params.
null_str || s ==
"NULL" || s.size() < 1 || s.empty()) {
334 throw std::runtime_error(
"Malformed Array :" + s);
337 std::string row(s.c_str() + 1, s.length() - 2);
343 bool try_single_thread =
false;
346 std::vector<std::unique_ptr<char[]>> tmp_buffers;
348 row.c_str() + row.length(),
349 row.c_str() + row.length(),
357 for (
size_t i = 0; i < string_vec.size(); ++i) {
359 if (truncate_values) {
362 throw std::runtime_error(
"Array String too long : " + string_vec[i] +
" max is " +
369 for (
auto& value : string_vec) {
370 if (value == copy_params.
null_str || value ==
"NULL" || value.empty()) {
382 auto old_buffer = std::move(buffer);
383 alloc_size = std::min(max_buffer_resize, alloc_size * 2);
384 LOG(
INFO) <<
"Setting import thread buffer allocation size to " << alloc_size
386 buffer = std::make_unique<char[]>(alloc_size);
388 memcpy(buffer.get(), old_buffer.get(), buffer_size);
390 CHECK(file !=
nullptr || file_reader !=
nullptr);
391 if (file !=
nullptr) {
392 fread_size = fread(buffer.get() + buffer_size, 1, alloc_size - buffer_size, file);
394 fread_size = file_reader->
read(buffer.get() + buffer_size, alloc_size - buffer_size);
396 buffer_size += fread_size;
bool is_eol(const char &c, const import_export::CopyParams &copy_params)
virtual bool isScanFinished() const =0
virtual size_t read(void *buffer, size_t max_size)=0
void trim_quotes(const char *&field_begin, const char *&field_end, const import_export::CopyParams &copy_params)
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
size_t find_end(const char *buffer, size_t size, const import_export::CopyParams &copy_params, unsigned int &num_rows_this_buffer, size_t buffer_first_row_index, bool &in_quote, size_t offset)
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
void parse_string_array(const std::string &s, const import_export::CopyParams &copy_params, std::vector< std::string > &string_vec, bool truncate_values)
Parses given string array and inserts into given vector of strings.
size_t get_max_buffer_resize()
Gets the maximum size to which thread buffers should be automatically resized.
void set_max_buffer_resize(const size_t max_buffer_resize_param)
Sets the maximum size to which thread buffers should be automatically resized. This function is only ...
void extend_buffer(std::unique_ptr< char[]> &buffer, size_t &buffer_size, size_t &alloc_size, FILE *file, foreign_storage::FileReader *file_reader, size_t max_buffer_resize)
void trim_space(const char *&field_begin, const char *&field_end)
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
static constexpr size_t MAX_STRLEN
static size_t max_buffer_resize
static constexpr size_t max_import_buffer_resize_byte_size
static const std::string trim_space(const char *field, const size_t len)