24 namespace foreign_storage {
32 std::unique_ptr<
bool[]>& array_flags,
35 const std::list<const ColumnDescriptor*>& columns) {
36 array_flags = std::unique_ptr<bool[]>(
new bool[columns.size()]);
38 for (
const auto cd : columns) {
39 const auto& col_ti = cd->columnType;
40 phys_cols += col_ti.get_physical_cols();
41 if (cd->columnType.get_type() ==
kPOINT) {
45 if (cd->columnType.get_type() ==
kARRAY) {
46 array_flags.get()[i] =
true;
48 array_flags.get()[i] =
false;
57 const std::string& file_name) {
59 if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
65 const std::string& option_name) {
66 if (
auto it = foreign_table->
options.find(option_name);
67 it != foreign_table->
options.end()) {
68 if (it->second.length() == 1) {
71 if (it->second == std::string(
"\\n")) {
73 }
else if (it->second == std::string(
"\\t")) {
76 throw std::runtime_error{
"Invalid value specified for option \"" + option_name +
77 "\". Expected a single character, \"\\n\" or \"\\t\"."};
85 const std::string& option_name,
86 const size_t expected_num_chars) {
87 if (
auto it = foreign_table->
options.find(option_name);
88 it != foreign_table->
options.end()) {
89 if (it->second.length() != expected_num_chars) {
90 throw std::runtime_error{
"Value of \"" + option_name +
91 "\" foreign table option has the wrong number of "
92 "characters. Expected " +
101 const std::string& option_name) {
102 if (
auto it = foreign_table->
options.find(option_name);
103 it != foreign_table->
options.end()) {
104 if (boost::iequals(it->second,
"TRUE")) {
106 }
else if (boost::iequals(it->second,
"FALSE")) {
109 throw std::runtime_error{
"Invalid boolean value specified for \"" + option_name +
110 "\" foreign table option. "
111 "Value must be either 'true' or 'false'."};
123 bool convert_data_blocks,
124 bool columns_are_pre_filtered,
125 bool skip_dict_encoding)
const {
129 const char* thread_buf = request.
buffer.get() + request.
begin_pos + begin;
130 const char* thread_buf_end = request.
buffer.get() + request.
end_pos;
133 std::vector<std::string_view> row;
134 size_t row_index_plus_one = 0;
135 const char* p = thread_buf;
136 bool try_single_thread =
false;
139 std::unique_ptr<bool[]> array_flags;
146 auto num_cols = request.
getColumns().size() - phys_cols;
147 if (columns_are_pre_filtered) {
148 for (
size_t col_idx = 0; col_idx < request.
getColumns().size(); ++col_idx) {
155 size_t current_row_id = 0;
156 size_t row_count = 0;
158 std::vector<size_t> row_offsets{};
164 for (; p < thread_buf_end && remaining_row_count > 0; p++, remaining_row_count--) {
166 current_row_id = row_count;
168 std::vector<std::unique_ptr<char[]>>
170 const char* line_start = p;
171 row_index_plus_one++;
172 bool incorrect_column_count =
false;
181 !columns_are_pre_filtered);
186 result.rejected_rows.insert(current_row_id);
187 incorrect_column_count =
true;
193 size_t import_idx = 0;
198 if (incorrect_column_count) {
199 auto cd_it = columns.begin();
203 for (
auto cd_it = columns.begin(); cd_it != columns.end(); ++cd_it) {
205 const auto& col_ti = cd->columnType;
206 bool column_is_present =
208 CHECK(row.size() > import_idx || !column_is_present);
211 is_null = column_is_present
214 }
catch (
const std::exception& e) {
216 result.rejected_rows.insert(current_row_id);
225 row[import_idx] =
sv_strip(row[import_idx]);
227 if (col_ti.is_geometry()) {
229 auto starting_col_idx = col_idx;
241 }
catch (
const std::exception& e) {
243 result.rejected_rows.insert(current_row_id);
252 if (!is_null && cd->columnType ==
kPOINT &&
254 if (!columns_are_pre_filtered) {
258 if (!columns_are_pre_filtered) {
262 col_idx += col_ti.get_physical_cols();
265 for (
int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
272 cd, row[import_idx], is_null, request.
copy_params);
273 }
catch (
const std::exception& e) {
275 result.rejected_rows.insert(current_row_id);
283 if (column_is_present) {
289 }
catch (
const std::exception& e) {
291 "\" in row \"" + std::string(line_start, p) +
292 "\" in file \"" + file_path +
"\"");
297 result.row_offsets = row_offsets;
298 result.row_count = row_count;
299 if (convert_data_blocks) {
300 result.column_id_to_data_blocks_map =
310 const std::string& row,
314 const std::string& file_name)
const {
315 bool is_array =
false;
316 bool try_single_thread =
false;
317 std::vector<std::unique_ptr<char[]>> tmp_buffers;
318 std::vector<std::string_view> fields;
321 row.c_str() + row.size(),
322 row.c_str() + row.size(),
340 copy_params.delimiter = value[0];
343 it != foreign_table->
options.end()) {
344 copy_params.null_str = it->second;
347 if (has_header.has_value()) {
348 if (has_header.value()) {
356 if (
const auto& value =
359 copy_params.quote = value[0];
361 if (
const auto& value =
364 copy_params.escape = value[0];
368 copy_params.line_delim = value[0];
370 if (
const auto& value =
373 copy_params.array_delim = value[0];
375 if (
const auto& value =
378 copy_params.array_begin = value[0];
379 copy_params.array_end = value[1];
383 copy_params.geo_explode_collections =
385 .value_or(copy_params.geo_explode_collections);
387 it != foreign_table->
options.end()) {
388 copy_params.source_srid = std::stoi(it->second);
392 it != foreign_table->
options.end()) {
393 copy_params.buffer_size = std::stoi(it->second);
396 it != foreign_table->
options.end()) {
397 copy_params.threads = std::stoi(it->second);
400 .value_or(copy_params.trim_spaces);
401 copy_params.geo_validate_geometry =
403 .value_or(copy_params.geo_validate_geometry);
410 std::unique_ptr<
char[]>& buffer,
413 const size_t buffer_first_row_index,
414 unsigned int& num_rows_in_buffer,
420 buffer_first_row_index,
static const std::string GEO_EXPLODE_COLLECTIONS_KEY
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
static const std::string BUFFER_SIZE_KEY
static const std::string HEADER_KEY
static const std::string TRIM_SPACES_KEY
static const std::string ARRAY_MARKER_KEY
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, const bool skip_dict_encoding=false)
size_t find_beginning(const char *buffer, size_t begin, size_t end, const import_export::CopyParams &copy_params)
Finds the closest possible row beginning in the given buffer.
static const std::string NULLS_KEY
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
const import_export::CopyParams copy_params
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
const bool track_rejected_rows
std::unique_ptr< ForeignTableSchema > foreign_table_schema
static const std::string SOURCE_SRID_KEY
std::string validate_and_get_string_with_length(const ForeignTable *foreign_table, const std::string &option_name, const size_t expected_num_chars)
static const std::string LONLAT_KEY
CONSTEXPR DEVICE bool is_null(const T &value)
std::optional< bool > validate_and_get_bool_value(const ForeignTable *foreign_table, const std::string &option_name)
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
static const std::string LINE_DELIMITER_KEY
const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const import_export::CopyParams &copy_params, const bool *is_array, std::vector< T > &row, std::vector< std::unique_ptr< char[]>> &tmp_buffers, bool &try_single_thread, bool filter_empty_lines)
Parses the first row in the given buffer and inserts fields into given vector.
size_t findRowEndPosition(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, foreign_storage::FileReader *file_reader) const override
std::list< const ColumnDescriptor * > getColumns() const
std::string validate_and_get_delimiter(const ForeignTable *foreign_table, const std::string &option_name)
static const std::string DELIMITER_KEY
import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const override
static constexpr const char * GEO_VALIDATE_GEOMETRY_KEY
void validate_expected_column_count(std::vector< std::string_view > &row, size_t num_cols, int point_cols, const std::string &file_name)
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
static const std::string ARRAY_DELIMITER_KEY
static const std::string THREADS_KEY
bool skip_column_import(ParseBufferRequest &request, int column_idx)
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::FileReader *file_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
void validateExpectedColumnCount(const std::string &row, const import_export::CopyParams &copy_params, size_t num_cols, int point_cols, const std::string &file_name) const
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
void set_array_flags_and_geo_columns_count(std::unique_ptr< bool[]> &array_flags, int &phys_cols, int &point_cols, const std::list< const ColumnDescriptor * > &columns)
static bool isCoordinateScalar(const std::string_view datum)
static const std::string QUOTED_KEY
std::string getFilePath() const
static const std::string QUOTE_KEY
std::unique_ptr< char[]> buffer
static const std::string ESCAPE_KEY
ParseBufferResult parseBuffer(ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false, bool skip_dict_encoding=false) const override