31 using namespace ::apache::thrift;
34 switch (ct.col_type.type) {
35 case TDatumType::BIGINT:
37 case TDatumType::BOOL:
39 case TDatumType::DATE:
41 case TDatumType::DECIMAL:
43 case TDatumType::DOUBLE:
45 case TDatumType::FLOAT:
51 if (ct.col_type.precision == 0) {
56 case TDatumType::TIME:
58 case TDatumType::TIMESTAMP:
60 case TDatumType::SMALLINT:
62 case TDatumType::TINYINT:
64 case TDatumType::POINT:
66 case TDatumType::MULTIPOINT:
68 case TDatumType::LINESTRING:
70 case TDatumType::MULTILINESTRING:
72 case TDatumType::POLYGON:
74 case TDatumType::MULTIPOLYGON:
77 LOG(
FATAL) <<
"Unsupported TColumnType found, should not be possible";
83 if (ct.col_type.is_array) {
85 ct.col_type.precision,
97 ct.col_type.precision,
110 ct.col_type.precision,
112 ct.col_type.nullable,
119 std::vector<TStringValue> row,
121 std::ostringstream out;
123 for (TStringValue ts : row) {
137 std::vector<SQLTypeInfo> column_type_info_vector,
138 std::vector<TColumn>& input_col_vec) {
139 for (
size_t idx = 0; idx < failed_column; idx++) {
140 switch (column_type_info_vector[idx].get_type()) {
142 input_col_vec[idx].nulls.pop_back();
143 input_col_vec[idx].data.arr_col.pop_back();
148 input_col_vec[idx].nulls.pop_back();
149 input_col_vec[idx].data.str_col.pop_back();
161 input_col_vec[idx].nulls.pop_back();
162 input_col_vec[idx].data.int_col.pop_back();
166 input_col_vec[idx].nulls.pop_back();
167 input_col_vec[idx].data.real_col.pop_back();
175 input_col_vec[idx].nulls.pop_back();
176 input_col_vec[idx].data.str_col.pop_back();
179 LOG(
FATAL) <<
"Trying to process an unsupported datatype, should be impossible";
189 switch (column_type_info.
get_type()) {
191 LOG(
FATAL) <<
"Trying to process ARRAY at item level something is wrong";
203 input_col.nulls.push_back(
true);
204 input_col.data.str_col.emplace_back(
"");
207 input_col.nulls.push_back(
false);
208 switch (column_type_info.
get_type()) {
211 input_col.data.str_col.push_back(
221 input_col.data.str_col.push_back(ts.str_val);
224 LOG(
FATAL) <<
" trying to process a STRING transport type not handled "
240 input_col.nulls.push_back(
true);
241 input_col.data.int_col.push_back(0);
243 input_col.nulls.push_back(
false);
245 switch (column_type_info.
get_type()) {
248 input_col.data.int_col.push_back(d.
intval);
253 input_col.data.int_col.push_back(d.
bigintval);
259 input_col.data.int_col.push_back(d.
tinyintval);
264 input_col.data.int_col.push_back(d.
bigintval);
267 LOG(
FATAL) <<
" trying to process an INT transport type not handled "
275 input_col.nulls.push_back(
true);
276 input_col.data.real_col.push_back(0);
279 input_col.nulls.push_back(
false);
281 switch (column_type_info.
get_type()) {
283 input_col.data.real_col.push_back(d.
floatval);
286 input_col.data.real_col.push_back(d.
doubleval);
289 LOG(
FATAL) <<
" trying to process a REAL transport type not handled "
295 LOG(
FATAL) <<
"Trying to process an unsupported datatype, should be impossible";
304 std::vector<TStringValue> row,
307 uint64_t curr_col = 0;
308 for (TStringValue ts : row) {
312 std::vector<std::string> arr_ele;
314 ts.str_val, copy_params, arr_ele);
316 for (std::string item : arr_ele) {
317 boost::algorithm::trim(item);
320 tsa.is_null = (tsa.str_val.empty() || tsa.str_val == copy_params.
null_str);
334 }
catch (
const std::exception& e) {
337 LOG(
ERROR) <<
"Input exception thrown: " << e.what()
338 <<
". Row discarded, issue at column : " << (curr_col + 1)
348 const std::string& user_name,
349 const std::string& passwd,
350 const std::string& db_name,
351 const std::string& table_name)
352 : user_name_(user_name)
355 , table_name_(table_name)
356 , conn_details_(conn_details) {
359 TTableDetails table_details;
371 for (TColumnType ct : row_desc_) {
376 for (TColumnType column : row_desc_) {
390 }
catch (TDBException& e) {
391 std::cerr << e.error_msg << std::endl;
392 }
catch (TException& te) {
393 std::cerr <<
"Thrift error on connect: " << te.what() << std::endl;
400 }
catch (TDBException& e) {
401 std::cerr << e.error_msg << std::endl;
402 }
catch (TException& te) {
403 std::cerr <<
"Thrift error on close: " << te.what() << std::endl;
410 std::cout <<
" Waiting " << copy_params.
retry_wait
411 <<
" secs to retry Inserts , will try " << (copy_params.
retry_count - tries)
412 <<
" times more " << std::endl;
413 std::this_thread::sleep_for(std::chrono::seconds(copy_params.
retry_wait));
422 for (
size_t tries = 0; tries < copy_params.
retry_count;
428 std::cout << nrows <<
" Rows Inserted, " << nskipped <<
" rows skipped."
438 }
catch (TDBException& e) {
439 std::cerr <<
"Exception trying to insert data " << e.error_msg << std::endl;
441 }
catch (TException& te) {
442 std::cerr <<
"Exception trying to insert data " << te.what() << std::endl;
446 std::cerr <<
"Retries exhausted program terminated" << std::endl;
TRowDescriptor get_row_descriptor()
std::string print_row_with_delim(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
void remove_partial_row(size_t failed_column, std::vector< SQLTypeInfo > column_type_info_vector, std::vector< TColumn > &input_col_vec)
std::vector< SQLTypeInfo > column_type_info_
std::shared_ptr< HeavyClient > client_
void do_load(int &nrows, int &nskipped, import_export::CopyParams copy_params)
HOST DEVICE SQLTypes get_type() const
std::vector< TColumn > input_columns_
void populate_TColumn(TStringValue ts, SQLTypeInfo column_type_info, TColumn &input_col, const import_export::CopyParams &copy_params)
std::vector< SQLTypeInfo > array_column_type_info_
RowToColumnLoader(const ThriftClientConnection &conn_details, const std::string &user_name, const std::string &passwd, const std::string &db_name, const std::string &table_name)
Utility Function to convert rows to input columns for loading via load_table_binary_columnar.
bool convert_string_to_column(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
void parse_string_array(const std::string &s, const import_export::CopyParams &copy_params, std::vector< std::string > &string_vec, bool truncate_values)
Parses given string array and inserts into given vector of strings.
Datum StringToDatum(const std::string_view s, SQLTypeInfo &ti)
void wait_disconnect_reconnect_retry(size_t tries, import_export::CopyParams copy_params)
int get_precision() const
SQLTypeInfo create_sql_type_info_from_col_type(const TColumnType &ct)
SQLTypeInfo create_array_sql_type_info_from_col_type(const TColumnType &ct)
std::shared_ptr< TProtocol > get_protocol()
void createConnection(const ThriftClientConnection &con)
ThriftClientConnection conn_details_
SQLTypes get_sql_types(const TColumnType &ct)