21 namespace foreign_storage {
27 std::set<int> column_filter_set,
28 const std::string& full_path,
29 const bool track_rejected_rows)
30 : buffer_size(buffer_size)
31 , buffer_alloc_size(buffer_size)
32 , copy_params(copy_params)
35 , full_path(full_path)
36 , track_rejected_rows(track_rejected_rows) {
37 if (buffer_size > 0) {
42 if (column_filter_set.find(column->columnId) == column_filter_set.end()) {
46 if (column->columnType.is_dict_encoded_string() ||
47 (column->columnType.is_array() &&
IS_STRING(column->columnType.get_subtype()) &&
49 auto dict_descriptor =
50 getCatalog()->getMetadataForDict(column->columnType.get_comp_param(),
true);
51 string_dictionary = dict_descriptor->stringDict.get();
54 std::make_unique<import_export::TypedImportBuffer>(column, string_dictionary));
60 const std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
61 const bool skip_dict_encoding) {
62 std::map<int, DataBlockPtr>
result;
63 std::vector<std::pair<const size_t, std::future<int8_t*>>>
64 encoded_data_block_ptrs_futures;
66 for (
const auto& import_buffer : import_buffers) {
67 if (import_buffer ==
nullptr) {
71 if (import_buffer->getTypeInfo().is_number() ||
72 import_buffer->getTypeInfo().is_time() ||
73 import_buffer->getTypeInfo().get_type() ==
kBOOLEAN) {
75 }
else if (import_buffer->getTypeInfo().is_string()) {
76 auto string_payload_ptr = import_buffer->getStringBuffer();
77 if (import_buffer->getTypeInfo().get_compression() ==
kENCODING_NONE) {
83 if (!skip_dict_encoding) {
84 auto column_id = import_buffer->getColumnDesc()->columnId;
85 encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
88 import_buffer->addDictEncodedString(*string_payload_ptr);
89 return import_buffer->getStringDictBuffer();
93 }
else if (import_buffer->getTypeInfo().is_geometry()) {
94 auto geo_payload_ptr = import_buffer->getGeoStringBuffer();
97 CHECK(import_buffer->getTypeInfo().get_type() ==
kARRAY);
98 if (
IS_STRING(import_buffer->getTypeInfo().get_subtype())) {
100 import_buffer->addDictEncodedStringArray(*import_buffer->getStringArrayBuffer());
101 p.
arraysPtr = import_buffer->getStringArrayDictBuffer();
103 p.
arraysPtr = import_buffer->getArrayBuffer();
106 result[import_buffer->getColumnDesc()->columnId] = p;
109 if (!skip_dict_encoding) {
111 for (
auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
112 encoded_ptr_future.second.wait();
114 for (
auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
115 result[encoded_ptr_future.first].
numbersPtr = encoded_ptr_future.second.get();
123 return datum.size() > 0 && (datum[0] ==
'.' || isdigit(datum[0]) || datum[0] ==
'-') &&
124 datum.find_first_of(
"ABCDEFabcdef") == std::string_view::npos;
130 const std::string_view lat_str,
132 std::vector<double>& coords,
133 const bool is_lon_lat_order) {
134 double lon = std::atof(std::string(lon_str).c_str());
138 lat = std::atof(std::string(lat_str).c_str());
142 if (!is_lon_lat_order) {
152 if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
158 if (!pt.transform(ti)) {
165 coords.push_back(lon);
166 coords.push_back(lat);
172 const std::list<const ColumnDescriptor*>& columns,
173 std::list<const ColumnDescriptor*>::iterator& cd_it,
174 const size_t starting_col_idx,
176 size_t col_idx = starting_col_idx;
178 for (; cd_it != columns.end(); cd_it++) {
180 const auto& col_ti = cd->columnType;
181 if (col_ti.is_geometry()) {
190 col_idx += col_ti.get_physical_cols();
193 for (
int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
210 std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
214 std::shared_ptr<Catalog_Namespace::Catalog> catalog) {
220 import_buffers[col_idx]->add_value(cd, copy_params.
null_str,
true, copy_params);
223 std::vector<double> coords;
224 std::vector<double> bounds;
225 std::vector<int> ring_sizes;
226 std::vector<int> poly_rings;
230 import_ti, coords, bounds, ring_sizes, poly_rings);
245 std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
248 std::list<const ColumnDescriptor*>::iterator& cd_it,
249 std::vector<std::string_view>& row,
252 size_t first_row_index,
253 size_t row_index_plus_one,
254 std::shared_ptr<Catalog_Namespace::Catalog> catalog) {
256 auto col_ti = cd->columnType;
257 SQLTypes col_type = col_ti.get_type();
260 auto starting_col_idx = col_idx;
262 auto const& geo_string = row[import_idx];
266 std::vector<double> coords;
267 std::vector<double> bounds;
268 std::vector<int> ring_sizes;
269 std::vector<int> poly_rings;
273 if (import_ti.get_output_srid() == 4326) {
277 import_ti.set_input_srid(srid0);
283 geo_string, row[import_idx], import_ti, coords, copy_params.
lonlat)) {
284 throw std::runtime_error(
"Cannot read lon/lat to insert into POINT column " +
289 if (is_null || geo_string.empty() || geo_string ==
"NULL") {
291 import_ti, coords, bounds, ring_sizes, poly_rings);
296 std::string(geo_string),
303 std::string msg =
"Failed to extract valid geometry from row " +
305 " for column " + cd->columnName;
306 throw std::runtime_error(msg);
311 throw std::runtime_error(
"Imported geometry doesn't match the type of column " +
318 if (is_null && col_ti.get_notnull()) {
319 throw std::runtime_error(
"NULL value provided for column (" + cd->columnName +
320 ") with NOT NULL constraint.");
325 *catalog, cd, import_buffers, col_idx, coords, bounds, ring_sizes, poly_rings);
328 import_buffers[starting_col_idx]->add_value(
329 cd, copy_params.
null_str,
true, copy_params);
334 const std::string& null_indicator) {
343 throw std::runtime_error(
"NULL value provided for column (" + column->
columnName +
344 ") with NOT NULL constraint.");
bool geo_promoted_type_match(const SQLTypes a, const SQLTypes b)
bool is_null_datum(const DatumStringType &datum, const std::string &null_indicator)
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
ParseBufferRequest(const ParseBufferRequest &request)=delete
std::vector< std::string > * stringsPtr
std::vector< ArrayDatum > * arraysPtr
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, const bool skip_dict_encoding=false)
static void processGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
const import_export::CopyParams copy_params
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings)
HOST DEVICE SQLTypes get_type() const
void getColumns(std::vector< double > &coords) const
static void fillRejectedRowWithInvalidData(const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str, const std::string_view lat_str, SQLTypeInfo &ti, std::vector< double > &coords, const bool is_lon_lat_order)
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool force_null=false)
future< Result > async(Fn &&fn, Args &&...args)
CONSTEXPR DEVICE bool is_null(const T &value)
bool geo_validate_geometry
static bool getGeoColumns(const std::string &wkt_or_wkb_hex, SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool validate_with_geos_if_available)
specifies the content in-memory of a row in the column metadata table
std::list< const ColumnDescriptor * > getColumns() const
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
static void processInvalidGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, const ColumnDescriptor *cd, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
static bool isNullDatum(const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
static bool isCoordinateScalar(const std::string_view datum)
HOST DEVICE bool get_notnull() const
std::unique_ptr< char[]> buffer
DEVICE void swap(ARGS &&...args)