24 #include <boost/algorithm/string.hpp>
25 #include <boost/algorithm/string/trim.hpp>
40 #include <boost/program_options.hpp>
42 #include <librdkafka/rdkafkacpp.h>
44 #define MAX_FIELD_LEN 20000
// File-scope flag controlling the consumer's main loop.
// NOTE(review): presumably cleared by a signal handler defined outside this
// excerpt — the handler is not visible here; confirm before relying on it.
49 static bool run =
true;
// Logs every topic/partition pair in the given assignment list via LOG(INFO);
// used for diagnostics when a rebalance delivers a partition list.
// NOTE(review): this excerpt is truncated by extraction — the tail of the
// streaming statement and the closing braces are missing from the visible text.
58 static void part_list_print(
const std::vector<RdKafka::TopicPartition*>& partitions) {
59 for (
unsigned int i = 0; i < partitions.size(); i++) {
60 LOG(
INFO) <<
"\t" << partitions[i]->topic() <<
"[" << partitions[i]->partition()
// Rebalance callback fragment (full signature per the trailing metadata:
// rebalance_cb(RdKafka::KafkaConsumer*, RdKafka::ErrorCode,
// std::vector<RdKafka::TopicPartition*>&) override). Logs the rebalance
// event, then assigns the delivered partitions on ERR__ASSIGN_PARTITIONS.
// NOTE(review): the head of the parameter list and the ERR__REVOKE_PARTITIONS
// branch (typically consumer->unassign()) fall in missing lines — confirm
// against the full source.
67 RdKafka::ErrorCode err,
68 std::vector<RdKafka::TopicPartition*>& partitions)
override {
69 LOG(
INFO) <<
"RebalanceCb: " << RdKafka::err2str(err) <<
": ";
73 if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
74 consumer->assign(partitions);
// msg_consume fragment — per the trailing metadata the full signature is:
//   bool msg_consume(RdKafka::Message* message, RowToColumnLoader& row_loader,
//                    import_export::CopyParams copy_params,
//                    const std::map<std::string,
//                        std::pair<std::unique_ptr<boost::regex>,
//                                  std::unique_ptr<std::string>>>& transformations,
//                    const bool remove_quotes)
// Dispatches on the Kafka message error code; for ERR_NO_ERROR it parses the
// payload into delimiter-separated fields, applies per-column regex
// transformations, and hands complete rows to the row loader.
// NOTE(review): many interior source lines are missing from this excerpt
// (error branches, loop bodies, brace closures) — do not rewrite from this view.
87 const std::map<std::string,
88 std::pair<std::unique_ptr<boost::regex>,
89 std::unique_ptr<std::string>>>& transformations,
91 switch (message->err()) {
92 case RdKafka::ERR__TIMED_OUT:
93 VLOG(1) <<
" Timed out";
96 case RdKafka::ERR_NO_ERROR: {
99 VLOG(1) <<
"Read msg at offset " << message->offset();
// Log the broker/producer timestamp when one is attached to the message.
100 RdKafka::MessageTimestamp ts;
101 ts = message->timestamp();
102 if (ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {
103 std::string tsname =
"?";
104 if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME) {
105 tsname =
"create time";
106 }
else if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME) {
107 tsname =
"log append time";
109 VLOG(1) <<
"Timestamp: " << tsname <<
" " << ts.timestamp << std::endl;
// Copy the raw payload into a local NUL-terminated buffer (+2 slack bytes).
// NOTE(review): the sprintf format-string argument falls in a missing line —
// presumably "%.*s" given the length/payload arguments below; confirm.
112 std::vector<char> buffer(message->len() + 2);
113 sprintf(buffer.data(),
115 static_cast<int>(message->len()),
116 static_cast<const char*>(message->payload()));
117 VLOG(1) <<
"Full Message received is :'" << buffer.data() <<
"'";
122 bool backEscape =
false;
// Pre-resolve the transformation (regex, replacement) pointer for each
// column in the row descriptor; nullptr entries mean "no transform".
127 const std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>*>
128 xforms(row_desc.size());
129 for (
size_t i = 0; i < row_desc.size(); i++) {
130 auto it = transformations.find(row_desc[i].col_name);
131 if (it != transformations.end()) {
132 xforms[i] = &(it->second);
138 std::vector<TStringValue>
// Scan the buffered payload character by character, splitting fields on
// copy_params.delimiter and detecting end-of-row.
141 for (
auto iit : buffer) {
143 bool end_of_field = (iit == copy_params.
delimiter);
148 end_of_row = (row_desc[row.size()].col_type.type != TDatumType::STR) ||
149 (row.size() == row_desc.size() - 1);
// Treat a trailing null_str token as a field/row terminator candidate.
151 size_t l = copy_params.
null_str.size();
153 strncmp(field + field_i - l, copy_params.
null_str.c_str(), l) == 0) {
158 if (!end_of_field && !end_of_row) {
161 field[field_i++] = iit;
163 field[field_i] =
'\0';
// Field complete: build the TStringValue, mark NULLs, and apply the
// column's regex transformation when one is configured.
166 ts.str_val = std::string(field);
167 ts.is_null = (ts.str_val.empty() || ts.str_val == copy_params.
null_str);
168 auto xform = row.size() < row_desc.size() ? xforms[row.size()] :
nullptr;
169 if (!ts.is_null && xform !=
nullptr) {
171 std::cout <<
"\ntransforming\n" << ts.str_val <<
"\nto\n";
174 boost::regex_replace(ts.str_val, *xform->first, *xform->second);
175 if (ts.str_val.empty()) {
179 std::cout << ts.str_val << std::endl;
184 if (end_of_row || (row.size() > row_desc.size())) {
191 }
// Ordinary payload character (quote characters are dropped unless escaped
// or quote removal is disabled).
else if (backEscape || !remove_quotes || iit !=
'\"') {
192 field[field_i++] = iit;
// Overflow guard: field exceeded the local buffer.
200 std::cerr <<
"String too long for buffer." << std::endl;
202 std::cerr << field << std::endl;
// Row complete: hand it to the loader; report column-count mismatches.
208 if (row.size() == row_desc.size()) {
211 if (!record_loaded) {
219 std::cerr <<
"Incorrect number of columns for row: ";
226 case RdKafka::ERR__PARTITION_EOF:
234 case RdKafka::ERR__UNKNOWN_TOPIC:
235 case RdKafka::ERR__UNKNOWN_PARTITION:
236 LOG(
ERROR) <<
"Consume failed: " << message->errstr() << std::endl;
// Default/fallback error path.
242 LOG(
ERROR) <<
"Consume failed: " << message->errstr();
// Per-message consume callback override; body is not visible in this excerpt.
// NOTE(review): presumably forwards to msg_consume via the opaque pointer —
// confirm against the full source.
250 void consume_cb(RdKafka::Message& msg,
void* opaque)
override {
// event_cb body fragment (full signature per trailing metadata:
// void event_cb(RdKafka::Event& event) override). Routes librdkafka events
// to the appropriate log stream/severity.
// NOTE(review): the method header, break statements, and the all-brokers-down
// handling tail fall in missing lines — do not rewrite from this view.
259 switch (event.type()) {
260 case RdKafka::Event::EVENT_ERROR:
261 LOG(
ERROR) <<
"ERROR (" << RdKafka::err2str(event.err()) <<
"): " << event.str();
262 if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) {
263 LOG(
ERROR) <<
"All brokers are down, we may need special handling here";
// Periodic statistics payload (enabled via statistics.interval.ms).
268 case RdKafka::Event::EVENT_STATS:
269 VLOG(2) <<
"\"STATS\": " <<
event.str();
272 case RdKafka::Event::EVENT_LOG:
273 LOG(
INFO) <<
"LOG-" <<
event.severity() <<
"-" <<
event.fac().c_str() <<
":"
274 <<
event.str().c_str();
277 case RdKafka::Event::EVENT_THROTTLE:
278 LOG(
INFO) <<
"THROTTLED: " <<
event.throttle_time() <<
"ms by "
279 <<
event.broker_name() <<
" id " << (int)event.broker_id();
// Fallback: log any other event type generically.
283 LOG(
INFO) <<
"EVENT " <<
event.type() <<
" (" << RdKafka::err2str(event.err())
284 <<
"): " << event.str();
// kafka_insert fragment — per the trailing metadata the full signature is:
//   void kafka_insert(RowToColumnLoader& row_loader,
//                     const std::map<std::string,
//                         std::pair<std::unique_ptr<boost::regex>,
//                                   std::unique_ptr<std::string>>>& transformations,
//                     const import_export::CopyParams& copy_params,
//                     const bool remove_quotes, std::string group_id,
//                     std::string topic, std::string brokers)
// Configures a librdkafka KafkaConsumer (manual commits, earliest offset
// reset, stats every 1s), subscribes to the topic, then consumes messages in
// a loop, batching rows through the loader and committing offsets after load.
// NOTE(review): conf/tconf creation, most error-branch bodies, and the main
// loop scaffolding are in missing lines — do not rewrite from this view.
293 const std::map<std::string,
294 std::pair<std::unique_ptr<boost::regex>,
295 std::unique_ptr<std::string>>>& transformations,
298 std::string group_id,
300 std::string brokers) {
302 std::string topic_str;
305 std::vector<std::string> topics;
306 bool do_conf_dump =
false;
// --- Consumer configuration (fragment) ---
317 conf->set(
"rebalance_cb", &ex_rebalance_cb, errstr);
319 if (conf->set(
"group.id", group_id, errstr) != RdKafka::Conf::CONF_OK) {
320 LOG(
FATAL) <<
"could not set group.id " << errstr;
323 if (conf->set(
"compression.codec",
"none", errstr) !=
324 RdKafka::Conf::CONF_OK) {
328 if (conf->set(
"statistics.interval.ms",
"1000", errstr) != RdKafka::Conf::CONF_OK) {
// Auto-commit disabled: offsets are committed explicitly after a
// successful load (see commitSync below).
331 if (conf->set(
"enable.auto.commit",
"false", errstr) != RdKafka::Conf::CONF_OK) {
335 if (tconf->set(
"auto.offset.reset",
"earliest", errstr) != RdKafka::Conf::CONF_OK) {
339 if (tconf->set(
"enable.auto.commit",
"false", errstr) != RdKafka::Conf::CONF_OK) {
345 topics.push_back(topic);
347 LOG(
INFO) <<
"Version " << RdKafka::version_str().c_str();
349 LOG(
INFO) << RdKafka::get_debug_contexts().c_str();
351 conf->set(
"metadata.broker.list", brokers, errstr);
354 if (!debug.empty()) {
355 if (conf->set(
"debug", debug, errstr) != RdKafka::Conf::CONF_OK) {
363 if (conf->set(
"consume_cb", &consume_cb, errstr) != RdKafka::Conf::CONF_OK) {
371 if (conf->set(
"event_cb", &ex_event_cb, errstr) != RdKafka::Conf::CONF_OK) {
375 if (conf->set(
"default_topic_conf", tconf, errstr) != RdKafka::Conf::CONF_OK) {
// --- Optional config dump (gated by do_conf_dump; fragment) ---
382 for (pass = 0; pass < 2; pass++) {
383 std::list<std::string>* dump;
386 LOG(
INFO) <<
"# Global config";
387 LOG(
INFO) <<
"===============";
389 dump = tconf->dump();
391 LOG(
INFO) <<
"===============";
394 for (std::list<std::string>::iterator it = dump->begin(); it != dump->end();) {
395 std::string ts = *it;
397 LOG(
INFO) << ts <<
" = " << *it;
400 LOG(
INFO) <<
"Dump config finished";
403 LOG(
INFO) <<
"FULL Dump config finished";
// --- Consumer creation / subscription (fragment) ---
412 LOG(
ERROR) <<
"Failed to create consumer: " << errstr;
417 LOG(
INFO) <<
" Created consumer " << consumer->name();
422 RdKafka::ErrorCode err = consumer->subscribe(topics);
424 LOG(
FATAL) <<
"Failed to subscribe to " << topics.size()
425 <<
" topics: " << RdKafka::err2str(err);
// --- Consume loop (fragment): 10s poll, batch load, explicit commit ---
431 size_t recv_rows = 0;
435 RdKafka::Message* msg = consumer->consume(10000);
436 if (msg->err() == RdKafka::ERR_NO_ERROR) {
439 msg_consume(msg, row_loader, copy_params, transformations, remove_quotes);
444 row_loader.
do_load(rows_loaded, skipped, copy_params);
447 consumer->commitSync();
465 LOG(
FATAL) <<
"Consumer shut down, probably due to an error please review logs";
// Entry point fragment: parses command-line options (connection, table,
// delimiters, Kafka topic/group/brokers, per-column regex transformations),
// decodes escaped delimiter strings, builds CopyParams and the loader, and
// invokes kafka_insert.
// NOTE(review): many interior lines are missing from this excerpt (variable
// declarations such as port/http/https/db_name/passwd/topic/brokers, several
// add_options() call heads, try/catch scaffolding, and brace closures) —
// do not rewrite from this view.
476 int main(
int argc,
char** argv) {
477 std::string server_host(
"localhost");
481 bool skip_host_verify =
false;
482 std::string ca_cert_name{
""};
483 std::string table_name;
485 std::string user_name;
487 std::string group_id;
490 std::string delim_str(
","), nulls(
"\\N"), line_delim_str(
"\n"), quoted(
"false");
491 size_t batch_size = 10000;
492 size_t retry_count = 10;
493 size_t retry_wait = 5;
495 std::vector<std::string> xforms;
496 std::map<std::string,
497 std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>>
// --- Command-line option registration (fragment) ---
501 namespace po = boost::program_options;
503 po::options_description desc(
"Options");
504 desc.add_options()(
"help,h",
"Print help messages ");
506 "table", po::value<std::string>(&table_name)->
required(),
"Table Name");
508 "database", po::value<std::string>(&db_name)->
required(),
"Database Name");
510 "user,u", po::value<std::string>(&user_name)->
required(),
"User Name");
512 "passwd,p", po::value<std::string>(&passwd)->
required(),
"User Password");
513 desc.add_options()(
"host",
514 po::value<std::string>(&server_host)->default_value(server_host),
515 "HeavyDB Server Hostname");
517 "port", po::value<int>(&port)->default_value(port),
"HeavyDB Server Port Number");
518 desc.add_options()(
"http",
519 po::bool_switch(&http)->default_value(http)->implicit_value(
true),
520 "Use HTTP transport");
521 desc.add_options()(
"https",
522 po::bool_switch(&https)->default_value(https)->implicit_value(
true),
523 "Use HTTPS transport");
524 desc.add_options()(
"skip-verify",
525 po::bool_switch(&skip_host_verify)
526 ->default_value(skip_host_verify)
527 ->implicit_value(
true),
528 "Don't verify SSL certificate validity");
531 po::value<std::string>(&ca_cert_name)->default_value(ca_cert_name),
532 "Path to trusted server certificate. Initiates an encrypted connection");
533 desc.add_options()(
"delim",
534 po::value<std::string>(&delim_str)->default_value(delim_str),
536 desc.add_options()(
"null", po::value<std::string>(&nulls),
"NULL string");
537 desc.add_options()(
"line", po::value<std::string>(&line_delim_str),
"Line delimiter");
// NOTE(review): the next line is visibly corrupted by extraction —
// `po::value<std::string>("ed),` is presumably `po::value<std::string>(&quoted),`
// for the --quoted option (its help text follows and `quoted` is compared
// against "true" further down). Confirm against the original source.
540 po::value<std::string>("ed),
541 "Whether the source contains quoted fields (true/false, default false)");
542 desc.add_options()(
"batch",
543 po::value<size_t>(&batch_size)->default_value(batch_size),
544 "Insert batch size");
545 desc.add_options()(
"retry_count",
546 po::value<size_t>(&retry_count)->default_value(retry_count),
547 "Number of time to retry an insert");
548 desc.add_options()(
"retry_wait",
549 po::value<size_t>(&retry_wait)->default_value(retry_wait),
550 "wait in secs between retries");
551 desc.add_options()(
"transform,t",
552 po::value<std::vector<std::string>>(&xforms)->multitoken(),
553 "Column Transformations");
554 desc.add_options()(
"print_error",
"Print Error Rows");
555 desc.add_options()(
"print_transform",
"Print Transformations");
556 desc.add_options()(
"topic",
557 po::value<std::string>(&topic)->
required(),
558 "Kafka topic to consume from ");
559 desc.add_options()(
"group-id",
560 po::value<std::string>(&group_id)->
required(),
561 "Group id this consumer is part of");
562 desc.add_options()(
"brokers",
563 po::value<std::string>(&brokers)->
required(),
564 "list of kafka brokers for topic");
// Positional args: <table> <database>.
566 po::positional_options_description positionalOptions;
567 positionalOptions.add(
"table", 1);
568 positionalOptions.add(
"database", 1);
// --- Parse / help / flag handling (fragment; try/catch heads missing) ---
574 po::variables_map vm;
577 po::store(po::command_line_parser(argc, argv)
579 .positional(positionalOptions)
582 if (vm.count(
"help")) {
583 std::cout <<
"Usage: <table name> <database name> {-u|--user} <user> {-p|--passwd} "
584 "<password> [{--host} "
585 "<hostname>][--port <port number>][--delim <delimiter>][--null <null "
586 "string>][--line <line "
587 "delimiter>][--batch <batch size>][{-t|--transform} transformation "
588 "[--quoted <true|false>] "
589 "...][--retry_count <num_of_retries>] [--retry_wait <wait in "
590 "secs>][--print_error][--print_transform]\n\n";
591 std::cout << desc << std::endl;
594 if (vm.count(
"print_error")) {
597 if (vm.count(
"print_transform")) {
602 }
catch (boost::program_options::error& e) {
603 std::cerr <<
"Usage Error: " << e.what() << std::endl;
613 }
else if (!ca_cert_name.empty()) {
// --- Field delimiter decoding: handles \t, \n and \xNN escapes ---
619 char delim = delim_str[0];
621 if (delim_str.size() < 2 ||
622 (delim_str[1] !=
'x' && delim_str[1] !=
't' && delim_str[1] !=
'n')) {
623 std::cerr <<
"Incorrect delimiter string: " << delim_str << std::endl;
626 if (delim_str[1] ==
't') {
628 }
else if (delim_str[1] ==
'n') {
631 std::string d(delim_str);
633 delim = (char)std::stoi(d,
nullptr, 16);
636 if (isprint(delim)) {
637 std::cout <<
"Field Delimiter: " << delim << std::endl;
638 }
else if (delim ==
'\t') {
639 std::cout <<
"Field Delimiter: "
640 <<
"\\t" << std::endl;
641 }
else if (delim ==
'\n') {
642 std::cout <<
"Field Delimiter: "
646 std::cout <<
"Field Delimiter: \\x" << std::hex << (int)delim << std::endl;
// --- Line delimiter decoding: same escape handling as the field delimiter ---
648 char line_delim = line_delim_str[0];
649 if (line_delim ==
'\\') {
650 if (line_delim_str.size() < 2 ||
651 (line_delim_str[1] !=
'x' && line_delim_str[1] !=
't' &&
652 line_delim_str[1] !=
'n')) {
653 std::cerr <<
"Incorrect delimiter string: " << line_delim_str << std::endl;
656 if (line_delim_str[1] ==
't') {
658 }
else if (line_delim_str[1] ==
'n') {
661 std::string d(line_delim_str);
663 line_delim = (char)std::stoi(d,
nullptr, 16);
666 if (isprint(line_delim)) {
667 std::cout <<
"Line Delimiter: " << line_delim << std::endl;
668 }
else if (line_delim ==
'\t') {
669 std::cout <<
"Line Delimiter: "
670 <<
"\\t" << std::endl;
671 }
else if (line_delim ==
'\n') {
672 std::cout <<
"Line Delimiter: "
676 std::cout <<
"Line Delimiter: \\x" << std::hex << (int)line_delim << std::endl;
678 std::cout <<
"Null String: " << nulls << std::endl;
679 std::cout <<
"Insert Batch Size: " << std::dec << batch_size << std::endl;
681 if (quoted ==
"true") {
682 remove_quotes =
true;
// --- Parse each --transform argument: <column name>:s/<regex>/<fmt>/ ---
685 for (
auto& t : xforms) {
686 auto n = t.find_first_of(
':');
687 if (
n == std::string::npos) {
688 std::cerr <<
"Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
692 std::string col_name = t.substr(0,
n);
693 if (t.size() <
n + 3 || t[
n + 1] !=
's' || t[
n + 2] !=
'/') {
694 std::cerr <<
"Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
699 auto n2 = t.find_first_of(
'/', n1);
700 if (n2 == std::string::npos) {
701 std::cerr <<
"Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
705 std::string regex_str = t.substr(n1, n2 - n1);
707 n2 = t.find_first_of(
'/', n1);
708 if (n2 == std::string::npos) {
709 std::cerr <<
"Transformation format: <column name>:s/<regex pattern>/<fmt string>/"
713 std::string fmt_str = t.substr(n1, n2 - n1);
714 std::cout <<
"transform " << col_name <<
": s/" << regex_str <<
"/" << fmt_str <<
"/"
// Store compiled regex + replacement string per column.
716 transformations[col_name] =
717 std::pair<std::unique_ptr<boost::regex>, std::unique_ptr<std::string>>(
718 std::unique_ptr<boost::regex>(
new boost::regex(regex_str)),
719 std::unique_ptr<std::string>(
new std::string(fmt_str)));
// --- Build CopyParams / connection / loader and run the consumer (fragment) ---
723 delim, nulls, line_delim, batch_size, retry_count, retry_wait);
726 server_host, port, conn_type, skip_host_verify, ca_cert_name, ca_cert_name),
733 row_loader, transformations, copy_params, remove_quotes, group_id, topic, brokers);
TRowDescriptor get_row_descriptor()
std::string print_row_with_delim(std::vector< TStringValue > row, const import_export::CopyParams ©_params)
void consume_cb(RdKafka::Message &msg, void *opaque) override
Constants for Builtin SQL Types supported by HEAVY.AI.
void do_load(int &nrows, int &nskipped, import_export::CopyParams copy_params)
std::pair< FILE *, std::string > create(const std::string &basePath, const int fileId, const size_t pageSize, const size_t numPages)
bool print_transformation
stuff(RowToColumnLoader &rl, import_export::CopyParams &cp)
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
bool msg_consume(RdKafka::Message *message, RowToColumnLoader &row_loader, import_export::CopyParams copy_params, const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &transformations, const bool remove_quotes)
void init(LogOptions const &log_opts)
Utility Function to convert rows to input columns for loading via load_table_binary_columnar.
bool convert_string_to_column(std::vector< TStringValue > row, const import_export::CopyParams ©_params)
void kafka_insert(RowToColumnLoader &row_loader, const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &transformations, const import_export::CopyParams ©_params, const bool remove_quotes, std::string group_id, std::string topic, std::string brokers)
std::string remove_quotes(const std::string &value)
RowToColumnLoader row_loader
void event_cb(RdKafka::Event &event) override
boost::program_options::options_description const & get_options() const
static void part_list_print(const std::vector< RdKafka::TopicPartition * > &partitions)
import_export::CopyParams copy_params
void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector< RdKafka::TopicPartition * > &partitions) override