OmniSciDB
a5dc49c757
|
Based on StreamInsert code but using binary columnar format for inserting a stream of rows with optional transformations from stdin to a DB table. More...
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <cstring>
#include <iostream>
#include <iterator>
#include <string>
#include "Logger/Logger.h"
#include "RowToColumnLoader.h"
#include "Shared/ThriftClient.h"
#include "Shared/clean_boost_regex.hpp"
#include "Shared/sqltypes.h"
#include <chrono>
#include <thread>
#include <boost/program_options.hpp>
#include <librdkafka/rdkafkacpp.h>
Go to the source code of this file.
Classes | |
class | RebalanceCb |
class | ConsumeCb |
class | EventCb |
struct | stuff |
Macros | |
#define | MAX_FIELD_LEN 20000 |
Functions | |
bool | msg_consume (RdKafka::Message *message, RowToColumnLoader &row_loader, import_export::CopyParams copy_params, const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &transformations, const bool remove_quotes) |
void | kafka_insert (RowToColumnLoader &row_loader, const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &transformations, const import_export::CopyParams ©_params, const bool remove_quotes, std::string group_id, std::string topic, std::string brokers) |
int | main (int argc, char **argv) |
Variables | |
bool | print_error_data = false |
bool | print_transformation = false |
static bool | run = true |
static bool | exit_eof = false |
static int | eof_cnt = 0 |
static int | partition_cnt = 0 |
static long | msg_cnt = 0 |
static int64_t | msg_bytes = 0 |
Based on StreamInsert code but using binary columnar format for inserting a stream of rows with optional transformations from stdin to a DB table.
Definition in file KafkaImporter.cpp.
#define MAX_FIELD_LEN 20000 |
Definition at line 44 of file KafkaImporter.cpp.
Referenced by msg_consume().
void kafka_insert | ( | RowToColumnLoader & | row_loader, |
const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> & | transformations, | ||
const import_export::CopyParams & | copy_params, | ||
const bool | remove_quotes, | ||
std::string | group_id, | ||
std::string | topic, | ||
std::string | brokers | ||
) |
Definition at line 291 of file KafkaImporter.cpp.
References import_export::CopyParams::batch_size, File_Namespace::create(), RowToColumnLoader::do_load(), logger::ERROR, logger::FATAL, logger::INFO, LOG, msg_bytes, msg_cnt, msg_consume(), run_benchmark_import::rows_loaded, run, and setup::version.
Referenced by main().
int main | ( | int | argc, |
char ** | argv | ||
) |
Definition at line 476 of file KafkaImporter.cpp.
References BINARY, BINARY_SSL, logger::LogOptions::get_options(), HTTP, HTTPS, logger::init(), kafka_insert(), logger::LogOptions::max_files_, anonymous_namespace{Utm.h}::n, print_error_data, print_transformation, foreign_storage::anonymous_namespace{LogFileBufferParser.cpp}::remove_quotes(), run_benchmark_import::required, and run.
bool msg_consume | ( | RdKafka::Message * | message, |
RowToColumnLoader & | row_loader, | ||
import_export::CopyParams | copy_params, | ||
const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> & | transformations, | ||
const bool | remove_quotes | ||
) |
Definition at line 84 of file KafkaImporter.cpp.
References RowToColumnLoader::convert_string_to_column(), import_export::CopyParams::delimiter, eof_cnt, logger::ERROR, exit_eof, field(), RowToColumnLoader::get_row_descriptor(), import_export::CopyParams::line_delim, LOG, MAX_FIELD_LEN, msg_bytes, msg_cnt, import_export::CopyParams::null_str, partition_cnt, print_error_data, RowToColumnLoader::print_row_with_delim(), print_transformation, run, and VLOG.
Referenced by kafka_insert().
|
static |
Definition at line 51 of file KafkaImporter.cpp.
Referenced by msg_consume(), and RebalanceCb::rebalance_cb().
|
static |
Definition at line 50 of file KafkaImporter.cpp.
Referenced by msg_consume().
|
static |
Definition at line 54 of file KafkaImporter.cpp.
Referenced by kafka_insert(), and msg_consume().
|
static |
Definition at line 53 of file KafkaImporter.cpp.
Referenced by kafka_insert(), and msg_consume().
|
static |
Definition at line 52 of file KafkaImporter.cpp.
Referenced by msg_consume(), and RebalanceCb::rebalance_cb().
bool print_error_data = false |
Definition at line 46 of file KafkaImporter.cpp.
Referenced by main(), msg_consume(), and stream_insert().
bool print_transformation = false |
Definition at line 47 of file KafkaImporter.cpp.
Referenced by main(), msg_consume(), and stream_insert().
|
static |
Definition at line 49 of file KafkaImporter.cpp.
Referenced by com.mapd.tests.ConcurrencyTest::createBarrier(), TableArchiver::dumpTable(), EventCb::event_cb(), kafka_insert(), main(), msg_consume(), CommandLineOptions::parse_command_line(), TableArchiver::restoreTable(), com.mapd.tests.ForeignStorageConcurrencyTest::runTest(), com.mapd.tests.ReductionConcurrencyTest::runTest(), com.mapd.tests.DistributedConcurrencyTest::runTest(), com.mapd.tests.SelectUpdateDeleteDifferentTables::runTest(), com.mapd.tests.AlterDropTruncateValidateConcurrencyTest::runTest(), com.mapd.tests.CtasItasSelectUpdelConcurrencyTest::runTest(), com.mapd.tests.ForeignTableRefreshConcurrencyTest::runTest(), com.mapd.tests.ImportAlterValidateSelectConcurrencyTest::runTest(), com.mapd.tests.UpdateDeleteInsertConcurrencyTest::runTest(), com.mapd.tests.RuntimeInterruptConcurrencyTest::runTest(), com.mapd.tests.CatalogConcurrencyTest::runTest(), com.mapd.tests.EagainConcurrencyTest::runTest(), com.mapd.tests.SelectCopyFromDeleteConcurrencyTest::runTest(), anonymous_namespace{TableArchiver.cpp}::simple_file_cat(), com.mapd.parser.server.test.TestDBServer::testThreadedCall(), com.mapd.parser.server.test.TestServer::testThreadedCall(), and com.mapd.tests.CalciteViewsConcurrencyTest::testViewsResolutionConcurrency().