24 #include <boost/algorithm/string.hpp>
25 #include <boost/algorithm/string/trim.hpp>
40 #include <boost/program_options.hpp>
42 #define MAX_FIELD_LEN 20000
51 const std::map<std::string,
52 std::pair<std::unique_ptr<boost::regex>,
53 std::unique_ptr<std::string>>>& transformations,
56 std::ios_base::sync_with_stdio(
false);
57 std::istream_iterator<char> eos;
58 std::cin >> std::noskipws;
59 std::istream_iterator<char> iit(std::cin);
66 bool backEscape =
false;
71 const std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>*>
72 xforms(row_desc.size());
73 for (
size_t i = 0; i < row_desc.size(); i++) {
74 auto it = transformations.find(row_desc[i].col_name);
75 if (it != transformations.end()) {
76 xforms[i] = &(it->second);
82 std::vector<TStringValue> row;
89 bool end_of_field = (*iit == copy_params.
delimiter);
94 end_of_row = (row_desc[row.size()].col_type.type != TDatumType::STR) ||
95 (row.size() == row_desc.size() - 1);
97 size_t l = copy_params.
null_str.size();
99 strncmp(field + field_i - l, copy_params.
null_str.c_str(), l) == 0) {
104 if (!end_of_field && !end_of_row) {
107 field[field_i++] = *iit;
109 field[field_i] =
'\0';
112 ts.str_val = std::string(field);
113 ts.is_null = (ts.str_val.empty() || ts.str_val == copy_params.
null_str);
114 auto xform = row.size() < row_desc.size() ? xforms[row.size()] :
nullptr;
115 if (!ts.is_null && xform !=
nullptr) {
117 std::cout <<
"\ntransforming\n" << ts.str_val <<
"\nto\n";
119 ts.str_val = boost::regex_replace(ts.str_val, *xform->first, *xform->second);
120 if (ts.str_val.empty()) {
124 std::cout << ts.str_val << std::endl;
129 if (end_of_row || (row.size() > row_desc.size())) {
136 }
else if (backEscape || !remove_quotes || *iit !=
'\"') {
137 field[field_i++] = *iit;
145 std::cerr <<
"String too long for buffer." << std::endl;
147 std::cerr << field << std::endl;
154 if (row.size() == row_desc.size()) {
158 if (!record_loaded) {
163 if (read_rows % copy_params.
batch_size == 0) {
164 row_loader.
do_load(nrows, nskipped, copy_params);
169 std::cerr <<
"Incorrect number of columns for row: ";
172 if (row.size() > row_desc.size()) {
183 if (read_rows % copy_params.
batch_size != 0) {
184 LOG(
INFO) <<
" read_rows " << read_rows;
185 row_loader.
do_load(nrows, nskipped, copy_params);
189 int main(
int argc,
char** argv) {
190 std::string server_host(
"localhost");
194 bool skip_host_verify =
false;
195 std::string ca_cert_name{
""};
196 std::string table_name;
198 std::string user_name;
200 std::string delim_str(
","), nulls(
"\\N"), line_delim_str(
"\n"), quoted(
"false");
201 size_t batch_size = 10000;
202 size_t retry_count = 10;
203 size_t retry_wait = 5;
205 std::vector<std::string> xforms;
206 std::map<std::string,
207 std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>>
211 namespace po = boost::program_options;
213 po::options_description desc(
"Options");
214 desc.add_options()(
"help,h",
"Print help messages ");
216 "table", po::value<std::string>(&table_name)->
required(),
"Table Name");
218 "database", po::value<std::string>(&db_name)->
required(),
"Database Name");
220 "user,u", po::value<std::string>(&user_name)->
required(),
"User Name");
222 "passwd,p", po::value<std::string>(&passwd)->
required(),
"User Password");
223 desc.add_options()(
"host",
224 po::value<std::string>(&server_host)->default_value(server_host),
225 "HeavyDB Server Hostname");
227 "port", po::value<int>(&port)->default_value(port),
"HeavyDB Server Port Number");
228 desc.add_options()(
"http",
229 po::bool_switch(&http)->default_value(http)->implicit_value(
true),
230 "Use HTTP transport");
231 desc.add_options()(
"https",
232 po::bool_switch(&https)->default_value(https)->implicit_value(
true),
233 "Use HTTPS transport");
234 desc.add_options()(
"skip-verify",
235 po::bool_switch(&skip_host_verify)
236 ->default_value(skip_host_verify)
237 ->implicit_value(
true),
238 "Don't verify SSL certificate validity");
241 po::value<std::string>(&ca_cert_name)->default_value(ca_cert_name),
242 "Path to trusted server certificate. Initiates an encrypted connection");
243 desc.add_options()(
"delim",
244 po::value<std::string>(&delim_str)->default_value(delim_str),
246 desc.add_options()(
"null", po::value<std::string>(&nulls),
"NULL string");
247 desc.add_options()(
"line", po::value<std::string>(&line_delim_str),
"Line delimiter");
250 po::value<std::string>("ed),
251 "Whether the source contains quoted fields (true/false, default false)");
252 desc.add_options()(
"batch",
253 po::value<size_t>(&batch_size)->default_value(batch_size),
254 "Insert batch size");
255 desc.add_options()(
"retry_count",
256 po::value<size_t>(&retry_count)->default_value(retry_count),
257 "Number of time to retry an insert");
258 desc.add_options()(
"retry_wait",
259 po::value<size_t>(&retry_wait)->default_value(retry_wait),
260 "wait in secs between retries");
261 desc.add_options()(
"transform,t",
262 po::value<std::vector<std::string>>(&xforms)->multitoken(),
263 "Column Transformations");
264 desc.add_options()(
"print_error",
"Print Error Rows");
265 desc.add_options()(
"print_transform",
"Print Transformations");
267 po::positional_options_description positionalOptions;
268 positionalOptions.add(
"table", 1);
269 positionalOptions.add(
"database", 1);
275 po::variables_map vm;
278 po::store(po::command_line_parser(argc, argv)
280 .positional(positionalOptions)
283 if (vm.count(
"help")) {
284 std::cout <<
"Usage: <table name> <database name> {-u|--user} <user> {-p|--passwd} "
285 "<password> [{--host} "
286 "<hostname>][--port <port number>][--delim <delimiter>][--null <null "
287 "string>][--line <line "
288 "delimiter>][--batch <batch size>][{-t|--transform} transformation "
289 "[--quoted <true|false>] "
290 "...][--retry_count <num_of_retries>] [--retry_wait <wait in "
291 "secs>][--print_error][--print_transform]\n\n";
292 std::cout << desc << std::endl;
295 if (vm.count(
"print_error")) {
298 if (vm.count(
"print_transform")) {
303 }
catch (boost::program_options::error& e) {
304 std::cerr <<
"Usage Error: " << e.what() << std::endl;
314 }
else if (!ca_cert_name.empty()) {
320 char delim = delim_str[0];
322 if (delim_str.size() < 2 ||
323 (delim_str[1] !=
'x' && delim_str[1] !=
't' && delim_str[1] !=
'n')) {
324 std::cerr <<
"Incorrect delimiter string: " << delim_str << std::endl;
327 if (delim_str[1] ==
't') {
329 }
else if (delim_str[1] ==
'n') {
332 std::string d(delim_str);
334 delim = (char)std::stoi(d,
nullptr, 16);
337 if (isprint(delim)) {
338 std::cout <<
"Field Delimiter: " << delim << std::endl;
339 }
else if (delim ==
'\t') {
340 std::cout <<
"Field Delimiter: "
341 <<
"\\t" << std::endl;
342 }
else if (delim ==
'\n') {
343 std::cout <<
"Field Delimiter: "
347 std::cout <<
"Field Delimiter: \\x" << std::hex << (int)delim << std::endl;
349 char line_delim = line_delim_str[0];
350 if (line_delim ==
'\\') {
351 if (line_delim_str.size() < 2 ||
352 (line_delim_str[1] !=
'x' && line_delim_str[1] !=
't' &&
353 line_delim_str[1] !=
'n')) {
354 std::cerr <<
"Incorrect delimiter string: " << line_delim_str << std::endl;
357 if (line_delim_str[1] ==
't') {
359 }
else if (line_delim_str[1] ==
'n') {
362 std::string d(line_delim_str);
364 line_delim = (char)std::stoi(d,
nullptr, 16);
367 if (isprint(line_delim)) {
368 std::cout <<
"Line Delimiter: " << line_delim << std::endl;
369 }
else if (line_delim ==
'\t') {
370 std::cout <<
"Line Delimiter: "
371 <<
"\\t" << std::endl;
372 }
else if (line_delim ==
'\n') {
373 std::cout <<
"Line Delimiter: "
377 std::cout <<
"Line Delimiter: \\x" << std::hex << (int)line_delim << std::endl;
379 std::cout <<
"Null String: " << nulls << std::endl;
380 std::cout <<
"Insert Batch Size: " << std::dec << batch_size << std::endl;
382 if (quoted ==
"true") {
383 remove_quotes =
true;
386 for (
auto& t : xforms) {
387 auto n = t.find_first_of(
':');
388 if (
n == std::string::npos) {
389 std::cerr <<
"Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
393 std::string col_name = t.substr(0,
n);
394 if (t.size() <
n + 3 || t[
n + 1] !=
's' || t[
n + 2] !=
'/') {
395 std::cerr <<
"Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
400 auto n2 = t.find_first_of(
'/', n1);
401 if (n2 == std::string::npos) {
402 std::cerr <<
"Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
406 std::string regex_str = t.substr(n1, n2 - n1);
408 n2 = t.find_first_of(
'/', n1);
409 if (n2 == std::string::npos) {
410 std::cerr <<
"Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
414 std::string fmt_str = t.substr(n1, n2 - n1);
415 std::cout <<
"transform " << col_name <<
": s/" << regex_str <<
"/" << fmt_str <<
"/"
417 transformations[col_name] =
418 std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>(
419 std::unique_ptr<boost::regex>(
new boost::regex(regex_str)),
420 std::unique_ptr<std::string>(
new std::string(fmt_str)));
424 delim, nulls, line_delim, batch_size, retry_count, retry_wait);
427 server_host, port, conn_type, skip_host_verify, ca_cert_name, ca_cert_name),
433 stream_insert(row_loader, transformations, copy_params, remove_quotes);
TRowDescriptor get_row_descriptor()
std::string print_row_with_delim(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
Constants for Builtin SQL Types supported by HEAVY.AI.
void do_load(int &nrows, int &nskipped, import_export::CopyParams copy_params)
bool print_transformation
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
void init(LogOptions const &log_opts)
Utility Function to convert rows to input columns for loading via load_table_binary_columnar.
bool convert_string_to_column(std::vector< TStringValue > row, const import_export::CopyParams &copy_params)
std::string remove_quotes(const std::string &value)
void stream_insert(RowToColumnLoader &row_loader, const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &transformations, const import_export::CopyParams &copy_params, const bool remove_quotes)
boost::program_options::options_description const & get_options() const