19 #ifdef HAVE_THRIFT_MESSAGE_LIMIT
23 #ifdef HAVE_THRIFT_THREADFACTORY
24 #include <thrift/concurrency/ThreadFactory.h>
26 #include <thrift/concurrency/PlatformThreadFactory.h>
29 #include <thrift/concurrency/ThreadManager.h>
30 #include <thrift/protocol/TBinaryProtocol.h>
31 #include <thrift/server/TThreadedServer.h>
32 #include <thrift/transport/TBufferTransports.h>
33 #include <thrift/transport/THttpServer.h>
34 #include <thrift/transport/TSSLServerSocket.h>
35 #include <thrift/transport/TSSLSocket.h>
36 #include <thrift/transport/TServerSocket.h>
46 #include <boost/algorithm/string.hpp>
47 #include <boost/algorithm/string/trim.hpp>
48 #include <boost/filesystem.hpp>
49 #include <boost/locale/generator.hpp>
50 #include <boost/make_shared.hpp>
51 #include <boost/program_options.hpp>
54 #include <tbb/global_control.h>
78 using namespace ::apache::thrift;
79 using namespace ::apache::thrift::concurrency;
80 using namespace ::apache::thrift::protocol;
81 using namespace ::apache::thrift::server;
82 using namespace ::apache::thrift::transport;
107 signal(signum, handler);
109 struct sigaction act;
110 memset(&act, 0,
sizeof(act));
111 if (handler != SIG_DFL && handler != SIG_IGN) {
113 sigfillset(&act.sa_mask);
115 act.sa_handler = handler;
116 sigaction(signum, &act, NULL);
129 int expected_signal{-1};
130 if (!
g_saw_signal.compare_exchange_strong(expected_signal, signum)) {
145 if (signum == SIGABRT || signum == SIGSEGV || signum == SIGFPE
152 std::this_thread::sleep_for(std::chrono::seconds(2));
162 kill(getpid(), signum);
164 std::this_thread::sleep_for(std::chrono::seconds(5));
191 void start_server(std::shared_ptr<TThreadedServer> server,
const int port) {
195 throw std::runtime_error(std::string(
"Thrift server exited: ") +
196 std::strerror(errno));
198 }
catch (std::exception& e) {
199 LOG(
ERROR) <<
"Exception: " << e.what() <<
": port " << port << std::endl;
205 if (sessionId != g_warmup_handler->getInvalidSessionId()) {
207 g_warmup_handler->disconnect(sessionId);
209 LOG(
ERROR) <<
"Failed to disconnect warmup session, possible failure to run warmup "
216 std::string base_path,
217 std::string query_file_path) {
219 if (query_file_path.empty()) {
222 if (handler->isAggregator()) {
223 LOG(
INFO) <<
"Skipping warmup query execution on the aggregator, queries should be "
224 "run directly on the leaf nodes.";
228 LOG(
INFO) <<
"Running DB warmup with queries from " << query_file_path;
230 g_warmup_handler = handler;
232 std::string user_keyword, user_name, db_name;
233 std::ifstream query_file;
236 TSessionId sessionId = g_warmup_handler->getInvalidSessionId();
239 query_file.open(query_file_path);
240 while (std::getline(query_file, db_info)) {
241 if (db_info.length() == 0) {
244 std::istringstream iss(db_info);
245 iss >> user_keyword >> user_name >> db_name;
246 if (user_keyword.compare(0, 4,
"USER") == 0) {
249 g_warmup_handler->super_user_rights_ =
true;
250 g_warmup_handler->connect(sessionId, user_name,
"", db_name);
251 g_warmup_handler->super_user_rights_ =
false;
255 std::string single_query;
256 while (std::getline(query_file, single_query)) {
257 boost::algorithm::trim(single_query);
258 if (single_query.length() == 0 || single_query[0] ==
'-') {
261 if (single_query[0] ==
'}') {
262 single_query.clear();
265 if (single_query.find(
';') == single_query.npos) {
266 std::string multiline_query;
267 std::getline(query_file, multiline_query,
';');
268 single_query += multiline_query;
272 g_warmup_handler->sql_execute(ret, sessionId, single_query,
true,
"", -1, -1);
274 LOG(
WARNING) <<
"Exception while executing '" << single_query
277 single_query.clear();
281 g_warmup_handler->disconnect(sessionId);
282 sessionId = g_warmup_handler->getInvalidSessionId();
284 LOG(
WARNING) <<
"\nSyntax error in the file: " << query_file_path.c_str()
285 <<
" Missing expected keyword USER. Following line will be ignored: "
286 << db_info.c_str() << std::endl;
290 }
catch (
const std::exception& e) {
292 <<
"Exception while executing warmup queries. "
293 <<
"Warmup may not be fully completed. Will proceed nevertheless.\nError was: "
303 if (
auto thrift_http_server = g_thrift_http_server; thrift_http_server) {
304 thrift_http_server->stop();
306 g_thrift_http_server.reset();
308 if (
auto thrift_http_binary_server = g_thrift_http_binary_server;
309 thrift_http_binary_server) {
310 thrift_http_binary_server->stop();
312 g_thrift_http_binary_server.reset();
314 if (
auto thrift_tcp_server = g_thrift_tcp_server; thrift_tcp_server) {
315 thrift_tcp_server->stop();
317 g_thrift_tcp_server.reset();
325 int result = pthread_sigmask(SIG_BLOCK, &set, NULL);
327 throw std::runtime_error(
"heartbeat() thread startup failed");
332 VLOG(1) <<
"heartbeat thread starting";
334 using namespace std::chrono;
335 std::this_thread::sleep_for(1s);
337 VLOG(1) <<
"heartbeat thread exiting";
341 if (signum >= 1 && signum != SIGTERM) {
342 LOG(
INFO) <<
"Interrupt signal (" << signum <<
") received.";
346 if (signum == SIGABRT || signum == SIGSEGV || signum == SIGFPE
352 if (
auto db_handler = g_db_handler; db_handler) {
353 db_handler->emergency_shutdown();
366 #ifdef HAVE_THRIFT_MESSAGE_LIMIT
368 class UnboundedTBufferedTransportFactory :
public TBufferedTransportFactory {
370 UnboundedTBufferedTransportFactory() : TBufferedTransportFactory() {}
372 std::shared_ptr<TTransport> getTransport(
373 std::shared_ptr<TTransport> transport)
override {
378 class UnboundedTHttpServerTransportFactory :
public THttpServerTransportFactory {
380 UnboundedTHttpServerTransportFactory() : THttpServerTransportFactory() {}
382 std::shared_ptr<TTransport> getTransport(
383 std::shared_ptr<TTransport> transport)
override {
391 bool start_http_server =
true) {
393 LOG(
INFO) <<
"HeavyDB starting up";
397 LOG(
INFO) <<
"Initializing TBB with " << num_cpu_threads <<
" threads.";
398 tbb::global_control tbb_control(tbb::global_control::max_allowed_parallelism,
402 LOG(
INFO) <<
"TBB max concurrency: " << tbb_max_concurrency <<
" threads.";
406 #endif // HAVE_AWS_S3
407 std::set<std::unique_ptr<std::thread>> server_threads;
408 auto wait_for_server_threads = [&] {
409 for (
auto& th : server_threads) {
412 }
catch (
const std::system_error& e) {
413 if (e.code() != std::errc::invalid_argument) {
414 LOG(
WARNING) <<
"std::thread join failed: " << e.what();
416 }
catch (
const std::exception& e) {
417 LOG(
WARNING) <<
"std::thread join failed: " << e.what();
426 LOG(
INFO) <<
"HeavyDB shutting down";
436 g_db_handler.reset();
438 wait_for_server_threads();
442 #endif // HAVE_AWS_S3
449 const unsigned int wait_interval =
451 server_threads.insert(std::make_unique<std::thread>(
456 server_threads.insert(std::make_unique<std::thread>(
heartbeat));
458 if (!g_enable_thrift_logs) {
459 apache::thrift::GlobalOutput.setOutputFunction([](
const char* msg) {});
467 std::make_shared<DBHandler>(prog_config_opts.
db_leaves,
496 prog_config_opts.libgeos_so_filename,
498 #ifdef HAVE_TORCH_TFS
499 prog_config_opts.torch_lib_path,
505 <<
"No High Availability module available, please contact OmniSci support";
507 }
catch (
const std::exception& e) {
508 LOG(
FATAL) <<
"Failed to initialize service handler: " << e.what();
519 if (g_enable_fsi && g_enable_foreign_table_scheduled_refresh) {
524 std::shared_ptr<TServerSocket> tcp_socket;
525 std::shared_ptr<TServerSocket> http_socket;
526 std::shared_ptr<TServerSocket> http_binary_socket;
531 auto sslSocketFactory = std::make_shared<TSSLSocketFactory>(SSLProtocol::SSLTLS);
532 sslSocketFactory->loadCertificate(
534 sslSocketFactory->loadPrivateKey(
537 sslSocketFactory->authenticate(
true);
539 sslSocketFactory->authenticate(
false);
541 sslSocketFactory->ciphers(
"ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH");
542 tcp_socket = std::make_shared<TSSLServerSocket>(
544 if (start_http_server) {
545 http_socket = std::make_shared<TSSLServerSocket>(prog_config_opts.
http_port,
549 http_binary_socket = std::make_shared<TSSLServerSocket>(
552 LOG(
INFO) <<
" HeavyDB server using encrypted connection. Cert file ["
558 LOG(
INFO) <<
" HeavyDB server using unencrypted connection";
559 tcp_socket = std::make_shared<TServerSocket>(
561 if (start_http_server) {
562 http_socket = std::make_shared<TServerSocket>(prog_config_opts.
http_port);
571 std::shared_ptr<TProcessor> processor{std::make_shared<TrackingProcessor>(
575 std::shared_ptr<TServerTransport> tcp_st = tcp_socket;
576 #ifdef HAVE_THRIFT_MESSAGE_LIMIT
577 std::shared_ptr<TTransportFactory> tcp_tf{
578 std::make_shared<UnboundedTBufferedTransportFactory>()};
580 std::shared_ptr<TTransportFactory> tcp_tf{
581 std::make_shared<TBufferedTransportFactory>()};
583 std::shared_ptr<TProtocolFactory> tcp_pf{std::make_shared<TBinaryProtocolFactory>()};
584 g_thrift_tcp_server.reset(
new TThreadedServer(processor, tcp_st, tcp_tf, tcp_pf));
585 server_threads.insert(std::make_unique<std::thread>(
591 if (start_http_server) {
592 std::shared_ptr<TServerTransport> http_st = http_socket;
593 #ifdef HAVE_THRIFT_MESSAGE_LIMIT
594 std::shared_ptr<TTransportFactory> http_tf{
595 std::make_shared<UnboundedTHttpServerTransportFactory>()};
597 std::shared_ptr<TTransportFactory> http_tf{
598 std::make_shared<THttpServerTransportFactory>()};
600 std::shared_ptr<TProtocolFactory> http_pf{std::make_shared<TJSONProtocolFactory>()};
601 g_thrift_http_server.reset(
new TThreadedServer(processor, http_st, http_tf, http_pf));
602 server_threads.insert(std::make_unique<std::thread>(
608 std::shared_ptr<TServerTransport> http_binary_st = http_binary_socket;
609 #ifdef HAVE_THRIFT_MESSAGE_LIMIT
610 std::shared_ptr<TTransportFactory> http_binary_tf{
611 std::make_shared<UnboundedTHttpServerTransportFactory>()};
613 std::shared_ptr<TTransportFactory> http_binary_tf{
614 std::make_shared<THttpServerTransportFactory>()};
616 std::shared_ptr<TProtocolFactory> http_binary_pf{
617 std::make_shared<TBinaryProtocolFactory>()};
618 g_thrift_http_binary_server.reset(
619 new TThreadedServer(processor, http_binary_st, http_binary_tf, http_binary_pf));
620 server_threads.insert(std::make_unique<std::thread>(
632 wait_for_server_threads();
636 if (signum <= 0 || signum == SIGTERM) {
645 VLOG(1) <<
"sysconf(_SC_PAGE_SIZE): " << sysconf(_SC_PAGE_SIZE);
651 int main(
int argc,
char** argv) {
652 bool has_clust_topo =
false;
657 if (
auto return_code =
662 if (!has_clust_topo) {
668 }
catch (std::runtime_error& e) {
669 std::cerr <<
"Server Error: " << e.what() << std::endl;
671 }
catch (boost::program_options::error& e) {
672 std::cerr <<
"Usage Error: " << e.what() << std::endl;
const std::string kDataDirectoryName
void run_warmup_queries(std::shared_ptr< DBHandler > handler, std::string base_path, std::string query_file_path)
int idle_session_duration
unsigned renderer_vulkan_timeout_ms
std::vector< LeafHostInfo > string_leaves
std::string udf_compiler_path
std::shared_ptr< DBHandler > g_db_handler
std::string udf_file_name
bool renderer_enable_slab_allocation
shared utility for the db server and string dictionary server to remove old files ...
void start_server(std::shared_ptr< TThreadedServer > server, const int port)
void checkDropRenderGroupColumnsMigration() const
std::shared_ptr< TThreadedServer > g_thrift_http_binary_server
boost::optional< int > parse_command_line(int argc, char const *const *argv, const bool should_init_logging=false)
bool render_compositor_use_last_gpu
bool renderer_prefer_igpu
bool renderer_use_parallel_executors
std::atomic< int > g_saw_signal
size_t max_concurrent_render_sessions
static SysCatalog & instance()
singleton class to handle concurrancy and state for blosc library. A C++ wrapper over a pure C librar...
tbb::task_arena g_tbb_arena
size_t num_reader_threads
std::shared_ptr< apache::thrift::TConfiguration > default_tconfig()
std::vector< LeafHostInfo > db_leaves
int startHeavyDBServer(CommandLineOptions &prog_config_opts, bool start_http_server=true)
bool g_enable_http_binary_server
bool enable_auto_clear_render_mem
std::shared_ptr< TThreadedServer > g_thrift_http_server
int render_oom_retry_threshold
std::string db_query_file
AuthMetadata authMetadata
static void resolveIncompleteAlterColumnCommandsForAllCatalogs()
static void start(std::atomic< bool > &is_program_running)
void validate_base_path()
std::vector< std::string > udf_compiler_options
bool g_enable_foreign_table_scheduled_refresh
bool ssl_transport_client_auth
void register_signal_handler(int signum, void(*handler)(int))
std::shared_ptr< DBHandler > g_warmup_handler
std::atomic< bool > g_running
File_Namespace::DiskCacheConfig disk_cache_config
void file_delete(std::atomic< bool > &program_is_running, const unsigned int wait_interval_seconds, const std::string base_path)
bool g_enable_thrift_logs
void heavydb_signal_handler(int signum)
void register_signal_handlers()
void releaseWarmupSession(TSessionId &sessionId, std::ifstream &query_file) noexcept
bool enable_legacy_syntax
std::string master_address
std::shared_ptr< TThreadedServer > g_thrift_tcp_server
SystemParameters system_parameters
std::string ssl_cert_file