34 #include <thrift/protocol/TBinaryProtocol.h>
35 #include <thrift/transport/TSocket.h>
36 #include <thrift/transport/TTransportUtils.h>
37 #include <type_traits>
43 #include "gen-cpp/CalciteServer.h"
45 #include "rapidjson/document.h"
49 using namespace rapidjson;
50 using namespace apache::thrift;
51 using namespace apache::thrift::protocol;
52 using namespace apache::thrift::transport;
// Variadic wrapper around execlp(): takes the program path, two JVM debug
// options, and the remaining ("standard") arguments.
// NOTE(review): the line carrying the function name and the `path` parameter
// is not visible in this excerpt.
55 template <
typename XDEBUG_OPTION,
56 typename REMOTE_DEBUG_OPTION,
57 typename... REMAINING_ARGS>
59 XDEBUG_OPTION&& x_debug,
60 REMOTE_DEBUG_OPTION&& remote_debug,
61 REMAINING_ARGS&&... standard_args) {
// With ENABLE_JAVA_REMOTE_DEBUG defined, x_debug and remote_debug are passed
// along together with the forwarded standard arguments.
62 #ifdef ENABLE_JAVA_REMOTE_DEBUG
64 path, x_debug, remote_debug, std::forward<REMAINING_ARGS>(standard_args)...);
// This call forwards only the standard arguments; the two debug options are
// not passed. Presumably this is the branch taken when the macro is not
// defined - the preprocessor line in between is not visible here.
66 return execlp(path, std::forward<REMAINING_ARGS>(standard_args)...);
// Assembles the java command line for the Calcite server jar from the
// arguments below and launches it. Logs FATAL when the launch fails and INFO
// once the server has been started.
73 const std::string& data_dir,
74 const size_t calcite_max_mem,
75 const std::string& ssl_trust_store,
76 const std::string& ssl_trust_password_X,
77 const std::string& ssl_keystore,
78 const std::string& ssl_keystore_password_X,
79 const std::string& ssl_key_file,
80 const std::string& db_config_file,
81 const std::string& udf_filename) {
// JVM remote-debugging options: a JDWP agent on a dt_socket transport,
// server mode, not suspended at start, address 5005.
83 std::string
const xDebug =
"-Xdebug";
84 std::string
const remoteDebug =
85 "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005";
// Command-line pieces. By the naming used below, a variable ending in P holds
// a flag and the matching variable ending in D holds that flag's value.
87 std::string jarP =
"-jar";
89 root_abs_path +
"/bin/calcite-1.0-SNAPSHOT-jar-with-dependencies.jar";
90 std::string extensionsP =
"-e";
91 std::string extensionsD = root_abs_path +
"/QueryEngine/";
92 std::string dataP =
"-d";
93 std::string dataD = data_dir;
94 std::string localPortP =
"-p";
96 std::string dbPortP =
"-m";
98 std::string TrustStoreP =
"-T";
99 std::string TrustPasswdP =
"-P";
100 std::string ConfigFileP =
"-c";
101 std::string KeyStoreP =
"-Y";
102 std::string KeyStorePasswdP =
"-Z";
104 std::string logDirectory =
106 std::string userDefinedFunctionsP =
"";
107 std::string userDefinedFunctionsD =
"";
// The UDF flag and its value stay empty unless a UDF file name was supplied.
109 if (!udf_filename.empty()) {
110 userDefinedFunctionsP +=
"-u";
111 userDefinedFunctionsD += udf_filename;
// The passwords are used only when no db config file is given; with a config
// file both are left empty, so they are not added to the argument vector below.
// NOTE(review): command-line arguments are commonly visible to other local
// users through process listings - confirm this exposure is acceptable.
116 std::string key_store_password = (db_config_file ==
"") ? ssl_keystore_password_X :
"";
117 std::string trust_store_password = (db_config_file ==
"") ? ssl_trust_password_X :
"";
// Argument vector: the fixed arguments first, then each optional SSL/config
// pair, added only when its value is non-empty.
120 std::vector<std::string> args_vec;
121 args_vec.push_back(
"java");
122 args_vec.push_back(xDebug);
123 args_vec.push_back(remoteDebug);
124 args_vec.push_back(xmxP);
125 args_vec.push_back(logDirectory);
126 args_vec.push_back(jarP);
127 args_vec.push_back(jarD);
128 args_vec.push_back(extensionsP);
129 args_vec.push_back(extensionsD);
130 args_vec.push_back(dataP);
131 args_vec.push_back(dataD);
132 args_vec.push_back(localPortP);
133 args_vec.push_back(localPortD);
134 args_vec.push_back(dbPortP);
135 args_vec.push_back(dbPortD);
136 if (!ssl_trust_store.empty()) {
137 args_vec.push_back(TrustStoreP);
138 args_vec.push_back(ssl_trust_store);
140 if (!trust_store_password.empty()) {
141 args_vec.push_back(TrustPasswdP);
142 args_vec.push_back(trust_store_password);
144 if (!ssl_keystore.empty()) {
145 args_vec.push_back(KeyStoreP);
146 args_vec.push_back(ssl_keystore);
148 if (!key_store_password.empty()) {
149 args_vec.push_back(KeyStorePasswdP);
150 args_vec.push_back(key_store_password);
152 if (!db_config_file.empty()) {
153 args_vec.push_back(ConfigFileP);
154 args_vec.push_back(db_config_file);
// Process-creation structures for CreateProcess, zeroed before use; cb is set
// to the structure size.
157 STARTUPINFO startup_info;
158 PROCESS_INFORMATION proc_info;
159 ZeroMemory(&startup_info,
sizeof(startup_info));
160 startup_info.cb =
sizeof(startup_info);
161 ZeroMemory(&proc_info,
sizeof(proc_info));
// The command line is widened element by element through the wstring range
// constructor. NOTE(review): `args` is built on lines not visible here; this
// conversion presumably assumes plain ASCII content - confirm.
163 std::wstring wargs = std::wstring(
args.begin(),
args.end());
164 const auto ret = CreateProcess(NULL,
165 (LPWSTR)wargs.c_str(),
175 LOG(
FATAL) <<
"Failed to start Calcite server " << GetLastError();
// Two C-string argument lists follow. The second one additionally carries the
// userDefinedFunctionsP / userDefinedFunctionsD pair; the first is guarded by
// udf_filename being empty.
182 if (udf_filename.empty()) {
187 logDirectory.c_str(),
199 ssl_trust_store.c_str(),
200 TrustPasswdP.c_str(),
201 trust_store_password.c_str(),
203 ssl_keystore.c_str(),
204 KeyStorePasswdP.c_str(),
205 key_store_password.c_str(),
207 db_config_file.c_str(),
214 logDirectory.c_str(),
226 ssl_trust_store.c_str(),
227 TrustPasswdP.c_str(),
228 trust_store_password.c_str(),
230 ssl_keystore.c_str(),
231 KeyStorePasswdP.c_str(),
232 key_store_password.c_str(),
234 db_config_file.c_str(),
235 userDefinedFunctionsP.c_str(),
236 userDefinedFunctionsD.c_str(),
// The failure report includes errsv both as a number and as strerror() text.
242 LOG(
FATAL) <<
"Failed to start Calcite server [errno=" << errsv
243 <<
"]: " << strerror(errsv);
245 LOG(
INFO) <<
"Successfully started Calcite server";
// Opens a buffered client transport to "localhost" through connMgr_, wraps it
// in a TBinaryProtocol, and returns a CalciteServerClient on that protocol
// together with the transport (client first, transport second).
251 std::pair<std::shared_ptr<CalciteServerClient>, std::shared_ptr<TTransport>>
253 const auto transport = connMgr_->open_buffered_client_transport(
"localhost",
// Thrift exceptions and general std::exception are caught separately; the
// handler bodies are not visible in this excerpt.
264 }
catch (TException& tx) {
266 }
catch (std::exception& ex) {
269 std::shared_ptr<TProtocol> protocol(
new TBinaryProtocol(transport));
270 std::shared_ptr<CalciteServerClient> client;
271 client.reset(
new CalciteServerClient(protocol));
// NOTE(review): `ret` is declared but not used by the visible lines; the
// return value is built with std::make_pair instead.
272 std::pair<std::shared_ptr<CalciteServerClient>, std::shared_ptr<TTransport>> ret;
273 return std::make_pair(client, transport);
// Runs the Calcite server as a daemon: first deals with a server that already
// answers pings, then polls with ping() until the server responds or the
// retry budget is used up.
278 const std::string& data_dir,
279 const size_t calcite_max_mem,
280 const std::string& udf_filename) {
281 LOG(
INFO) <<
"Running Calcite server as a daemon";
// A ping result greater than -1 at this point means something is already
// answering; it is treated as an orphaned server and asked to shut down.
284 int ping_time = ping();
285 if (ping_time > -1) {
288 <<
"Appears to be orphaned Calcite server already running, shutting it down";
289 LOG(
ERROR) <<
"Please check that you are not trying to run two servers on same port";
290 LOG(
ERROR) <<
"Attempting to shutdown orphaned Calcite server";
292 auto clientP = getClient(remote_calcite_port_);
293 clientP.first->shutdown();
294 clientP.second->close();
295 LOG(
ERROR) <<
"orphaned Calcite server shutdown";
// A Thrift failure during that shutdown is logged at ERROR with its reason.
297 }
catch (TException& tx) {
298 LOG(
ERROR) <<
"Failed to shutdown orphaned Calcite server, reason: " << tx.what();
310 ssl_keystore_password_,
// Initial 200 ms wait, then one ping per iteration with a 100 ms sleep between
// attempts. The loop counter starts at 2 and runs up to retry_max inclusive.
316 std::this_thread::sleep_for(std::chrono::milliseconds(200));
318 for (
int i = 2; i <= retry_max; i++) {
319 int ping_time = ping(i, retry_max);
320 if (ping_time > -1) {
// The reported start time is derived from the loop counter (i * 100), i.e. an
// estimate based on the sleep schedule, not a measured duration.
321 LOG(
INFO) <<
"Calcite server start took " << i * 100 <<
" ms ";
322 LOG(
INFO) <<
"ping took " << ping_time <<
" ms ";
323 server_available_ =
true;
327 std::this_thread::sleep_for(std::chrono::milliseconds(100));
// Marks the server unavailable and logs at FATAL.
// NOTE(review): presumably reached only after every retry failed - the end of
// the loop is not visible here.
330 server_available_ =
false;
331 LOG(
FATAL) <<
"Could not connect to Calcite remote server running on port [" << port
// Opens a client on remote_calcite_port_, issues a ping to the server, and
// closes the transport again.
341 auto clientP = getClient(remote_calcite_port_);
342 clientP.first->ping();
343 clientP.second->close();
// A Thrift error is logged only once retry_num has reached max_retry, so
// earlier attempts do not produce this ERROR line.
347 }
catch (TException& tx) {
348 if (retry_num >= max_retry) {
349 LOG(
ERROR) <<
"Problems connecting to Calcite. Thrift error - " << tx.what();
// Constructor fragment: server_available_ starts out false, the service
// timeout and keepalive settings are stored in members, and the rest of the
// setup is delegated to init().
356 const int calcite_port,
357 const std::string& data_dir,
358 const size_t calcite_max_mem,
359 const size_t service_timeout,
360 const bool service_keepalive,
361 const std::string& udf_filename)
362 : server_available_(
false)
363 , service_timeout_(service_timeout)
364 , service_keepalive_(service_keepalive) {
365 init(db_port, calcite_port, data_dir, calcite_max_mem, udf_filename);
// Logs the Calcite port and base data directory, creates the Thrift
// connection manager, and dispatches on the value of calcite_port.
369 const int calcite_port,
370 const std::string& data_dir,
371 const size_t calcite_max_mem,
372 const std::string& udf_filename) {
373 LOG(
INFO) <<
"Creating Calcite Handler, Calcite Port is " << calcite_port
374 <<
" base data dir is " << data_dir;
375 connMgr_ = std::make_shared<ThriftClientConnection>();
// A negative port is rejected with a CHECK failure whose message says JNI mode
// is no longer supported.
376 if (calcite_port < 0) {
377 CHECK(
false) <<
"JNI mode no longer supported.";
// Port 0 is handled as its own case; the body of this branch is not visible
// in this excerpt.
379 if (calcite_port == 0) {
385 runServer(db_port, calcite_port, data_dir, calcite_max_mem, udf_filename);
// Constructor fragment: initialises the service timeout/keepalive members and
// the SSL and config-file members from the corresponding fields of
// system_parameters.
391 const std::string& data_dir,
392 const std::string& udf_filename)
393 : service_timeout_(system_parameters.calcite_timeout)
394 , service_keepalive_(system_parameters.calcite_keepalive)
395 , ssl_trust_store_(system_parameters.ssl_trust_store)
396 , ssl_trust_password_(system_parameters.ssl_trust_password)
397 , ssl_key_file_(system_parameters.ssl_key_file)
398 , ssl_keystore_(system_parameters.ssl_keystore)
399 , ssl_keystore_password_(system_parameters.ssl_keystore_password)
400 , ssl_ca_file_(system_parameters.ssl_trust_ca_file)
401 , db_config_file_(system_parameters.config_file) {
// Forwards the (catalog, table) metadata update to the Calcite server, closes
// the transport, and logs the elapsed time in milliseconds.
413 clientP.first->updateMetadata(catalog, table);
414 clientP.second->close();
416 LOG(
INFO) <<
"Time to updateMetadata " << ms <<
" (ms)";
// The "server is not up" case is logged at INFO here, whereas the whitelist
// getters in this file log the same message at FATAL.
418 LOG(
INFO) <<
"Not routing to Calcite, server is not up";
// Builds a DBObject keyed on the accessed catalog and throws
// std::runtime_error naming the user and the database when access is refused.
// NOTE(review): the privilege test that guards the throw is not visible in
// this excerpt.
425 const auto db_name = accessed_catalog.
name();
427 db_object.
loadKey(accessed_catalog);
432 throw std::runtime_error(
"Unauthorized Access: user " + user.userLoggable() +
433 " is not allowed to access database " + db_name +
".");
// Walks the given table/view name lists, looks each one up in the catalog, and
// throws std::runtime_error for unknown objects, unsupported operations, and
// privilege violations.
438 std::vector<std::vector<std::string>> tableOrViewNames,
// NOTE(review): the loop variable is taken by value, so each inner vector is
// copied per iteration; `const auto&` would avoid that if the body does not
// modify the element - confirm against the full body.
441 for (
auto tableOrViewName : tableOrViewNames) {
// Element 0 of each entry is the name used for the metadata lookup.
449 catalog->getMetadataForTable(tableOrViewName[0],
false);
// NOTE(review): the message reads "table of view"; "table or view" looks
// intended. Left untouched here because it is runtime text.
452 throw std::runtime_error(
"unknown table of view: " + tableOrViewName[0]);
456 key.
dbId = catalog->getCurrentDB().dbId;
462 std::vector<DBObject> privObjects{dbobject};
465 throw std::runtime_error(
"Operation not supported for object " +
471 throw std::runtime_error(
"Violation of access privileges: user " +
473 " has no proper privileges for object " +
// sql_string is taken by value and moved into the call below, so it must not
// be read again after that point.
481 std::string sql_string,
482 const TQueryParsingOption& query_parsing_option,
483 const TOptimizationOption& optimization_option,
484 const std::string& calcite_session_id) {
486 std::move(sql_string),
487 query_parsing_option,
// The branch below is entered only when privilege checking was requested and
// the query is not an explain request.
490 if (query_parsing_option.check_privileges && !query_parsing_option.is_explain) {
// Passes each of the four table lists of plan.primary_accessed_objects
// (selected from, inserted into, updated in, deleted from) to a call whose
// head is not visible in this excerpt - presumably checkPermissionForTables
// with the matching privilege; confirm.
498 TPlanResult plan)
const {
504 plan.primary_accessed_objects.tables_selected_from,
508 plan.primary_accessed_objects.tables_inserted_into,
512 plan.primary_accessed_objects.tables_updated_in,
516 plan.primary_accessed_objects.tables_deleted_from,
// Asks the Calcite server for completion hints for sql_string at the given
// cursor position, restricted to visible_tables.
523 const std::vector<std::string>& visible_tables,
524 const std::string sql_string,
526 std::vector<TCompletionHint> hints;
// `catalog` here is the name of the session's current database.
530 const auto catalog =
cat.getCurrentDB().dbName;
// `hints` is passed as the first argument of the remote call, which fills it.
532 client.first->getCompletionHints(
533 hints, user, session, catalog, visible_tables, sql_string, cursor);
// Parses the JSON text in `ra` and, for every entry of its "rels" array whose
// "relOp" is "EnumerableTableScan", appends element 1 of that entry's "table"
// array to the result list.
538 std::vector<std::string> v_db_obj;
// NOTE(review): no parse-error or member-presence check is visible before
// document["rels"] is indexed - confirm that malformed input cannot reach here.
540 document.Parse(ra.c_str());
541 const Value& rels = document[
"rels"];
542 CHECK(rels.IsArray());
543 for (
auto& v : rels.GetArray()) {
544 std::string relOp(v[
"relOp"].GetString());
// std::string::compare returns 0 on equality, hence the negation.
545 if (!relOp.compare(
"EnumerableTableScan")) {
547 auto t = v[
"table"].GetArray();
548 x = t[1].GetString();
549 v_db_obj.push_back(x);
// Sends the SQL string to the Calcite server for processing, logs timing, and
// maps failures of the remote call onto exceptions or log lines.
557 const std::string sql_string,
558 const TQueryParsingOption& query_parsing_option,
559 const TOptimizationOption& optimization_option,
560 const std::string& calcite_session_id) {
563 const auto&
cat = user_session_info->getCatalog();
// `catalog` is the name of the session's current database.
565 const std::string catalog =
cat.getCurrentDB().dbName;
566 LOG(
INFO) <<
"User " << user <<
" catalog " << catalog <<
" sql '"
568 LOG(
IR) <<
"SQL query\n"
575 std::vector<TRestriction> trestrictions;
// Session id passed to the server: the caller-supplied calcite_session_id when
// it is non-empty, otherwise the id of the user session.
585 clientP.first->process(ret,
587 calcite_session_id.empty()
588 ? user_session_info->get_session_id()
589 : calcite_session_id,
592 query_parsing_option,
595 clientP.second->close();
// The locally measured time minus the server-reported time is clamped at 0
// when the server reports at least as much time as was measured.
600 << (ms > ret.execution_time_ms ? ms - ret.execution_time_ms : 0)
601 <<
" (ms), Time in Java Calcite server " << ret.execution_time_ms
// A parse failure reported by the server is rethrown as std::invalid_argument
// carrying the server's whyUp text.
603 }
catch (InvalidParseRequest& e) {
604 throw std::invalid_argument(e.whyUp);
// Transport errors: a timeout is logged at WARNING; a FATAL log about the
// communication error follows. The control flow between the two is not visible
// in this excerpt.
605 }
catch (
const TTransportException& ex) {
606 if (ex.getType() == TTransportException::TIMED_OUT) {
607 LOG(
WARNING) <<
"Calcite request timed out: " << ex.what();
609 LOG(
FATAL) <<
"Error occurred trying to communicate with Calcite server, the "
611 << ex.what() <<
"', heavydb restart will be required";
614 }
catch (
const std::exception& ex) {
616 <<
"Error occurred trying to communicate with Calcite server, the error was: '"
617 << ex.what() <<
"', heavydb restart will be required";
621 LOG(
FATAL) <<
"Not routing to Calcite, server is not up";
// Fetches the extension-function whitelist from the Calcite server into
// `whitelist` (filled by the remote call), closes the transport, and logs the
// text at verbosity level 1.
629 std::string whitelist;
632 clientP.first->getExtensionFunctionWhitelist(whitelist);
633 clientP.second->close();
634 VLOG(1) << whitelist;
// Logged at FATAL; per the message this is the server-not-available case.
637 LOG(
FATAL) <<
"Not routing to Calcite, server is not up";
// Fetches the user-defined-function whitelist from the Calcite server into
// `whitelist` (filled by the remote call), closes the transport, and logs the
// text at verbosity level 1.
647 std::string whitelist;
650 clientP.first->getUserDefinedFunctionWhitelist(whitelist);
651 clientP.second->close();
652 VLOG(1) <<
"User defined functions whitelist loaded from Calcite: " << whitelist;
// Logged at FATAL; per the message this is the server-not-available case.
655 LOG(
FATAL) <<
"Not routing to Calcite, server is not up";
// Asks the Calcite server to shut down and closes the transport. The INFO line
// is emitted only when `log` is true.
669 LOG_IF(
INFO, log) <<
"Shutting down Calcite server";
672 clientP.first->shutdown();
673 clientP.second->close();
// Errors whose text equals one of the three messages below are not reported;
// any other error is written to std::cerr.
// NOTE(review): matching on exact what() text depends on the wording used by
// the Thrift library - confirm it still matches after library upgrades.
674 }
catch (
const std::exception& e) {
675 if (std::string(e.what()) !=
"connect() failed: Connection refused" &&
676 std::string(e.what()) !=
"socket open() error: Connection refused" &&
677 std::string(e.what()) !=
"No more data to read.") {
678 std::cerr <<
"Error shutting down Calcite server: " << e.what() << std::endl;
// Fetches the runtime extension-function whitelist from the Calcite server
// into `whitelist` (filled by the remote call), closes the transport, and logs
// the text at verbosity level 1.
693 std::string whitelist;
695 clientP.first->getRuntimeExtensionFunctionWhitelist(whitelist);
696 clientP.second->close();
697 VLOG(1) <<
"Runtime extension functions whitelist loaded from Calcite: " << whitelist;
// Logged at FATAL; per the message this is the server-not-available case.
700 LOG(
FATAL) <<
"Not routing to Calcite, server is not up";
// Sends the given UDF and UDTF definitions, together with the isruntime flag,
// to the Calcite server and closes the transport afterwards.
708 const std::vector<TUserDefinedFunction>& udfs,
709 const std::vector<TUserDefinedTableFunction>& udtfs,
713 clientP.first->setRuntimeExtensionFunctions(udfs, udtfs, isruntime);
714 clientP.second->close();
// Logged at FATAL; per the message this is the server-not-available case.
716 LOG(
FATAL) <<
"Not routing to Calcite, server is not up";
// Copies the four boolean flags into a TQueryParsingOption and returns it.
722 bool check_privileges,
723 bool is_explain_detail) {
724 TQueryParsingOption query_parsing_info;
725 query_parsing_info.legacy_syntax = legacy_syntax;
726 query_parsing_info.is_explain = is_explain;
727 query_parsing_info.check_privileges = check_privileges;
728 query_parsing_info.is_explain_detail = is_explain_detail;
// Less-or-equal on two bools: the only failing combination is
// is_explain_detail == true with is_explain == false.
730 CHECK_LE(is_explain_detail, is_explain);
731 return query_parsing_info;
// Copies the optimisation flags and the filter push-down information into a
// TOptimizationOption and returns it; no validation is applied to the visible
// assignments.
735 bool is_view_optimize,
736 bool enable_watchdog,
737 const std::vector<TFilterPushDownInfo>& filter_push_down_info,
738 bool distributed_mode) {
739 TOptimizationOption optimization_option;
740 optimization_option.filter_push_down_info = filter_push_down_info;
741 optimization_option.is_view_optimize = is_view_optimize;
742 optimization_option.enable_watchdog = enable_watchdog;
743 optimization_option.distributed_mode = distributed_mode;
744 return optimization_option;
std::vector< std::string > get_db_objects(const std::string ra)
int wrapped_execlp(char const *path, XDEBUG_OPTION &&x_debug, REMOTE_DEBUG_OPTION &&remote_debug, REMAINING_ARGS &&...standard_args)
TPlanResult processImpl(query_state::QueryStateProxy, std::string sql_string, const TQueryParsingOption &query_parsing_option, const TOptimizationOption &optimization_option, const std::string &calcite_session_id)
std::once_flag shutdown_once_flag_
Class for a per-database catalog. Also includes metadata for the current database and the current user.
static TimeT::rep execution(F func, Args &&...args)
std::string get_root_abs_path()
void init(const int db_port, const int port, const std::string &data_dir, const size_t calcite_max_mem, const std::string &udf_filename)
static std::string const getInternalSessionProxyUserName()
const std::string kDefaultLogDirName
static const AccessPrivileges INSERT_INTO_TABLE
TQueryParsingOption getCalciteQueryParsingOption(bool legacy_syntax, bool is_explain, bool check_privileges, bool is_explain_detail)
void setRuntimeExtensionFunctions(const std::vector< TUserDefinedFunction > &udfs, const std::vector< TUserDefinedTableFunction > &udtfs, bool isruntime=true)
std::shared_ptr< ThriftClientConnection > connMgr_
void setPrivileges(const AccessPrivileges &privs)
Timer createTimer(char const *event_name)
TOptimizationOption getCalciteOptimizationOption(bool is_view_optimize, bool enable_watchdog, const std::vector< TFilterPushDownInfo > &filter_push_down_info, bool distributed_mode)
static const AccessPrivileges SELECT_FROM_TABLE
#define LOG_IF(severity, condition)
virtual void updateMetadata(std::string catalog, std::string table)
This file contains the class specification and related data structures for Catalog.
void runServer(const int db_port, const int port, const std::string &data_dir, const size_t calcite_max_mem, const std::string &udf_filename)
static SysCatalog & instance()
static void start_calcite_server_as_daemon(const int db_port, const int port, const std::string &data_dir, const size_t calcite_max_mem, const std::string &ssl_trust_store, const std::string &ssl_trust_password_X, const std::string &ssl_keystore, const std::string &ssl_keystore_password_X, const std::string &ssl_key_file, const std::string &db_config_file, const std::string &udf_filename)
std::vector< TCompletionHint > getCompletionHints(const Catalog_Namespace::SessionInfo &session_info, const std::vector< std::string > &visible_tables, const std::string sql_string, const int cursor)
static const AccessPrivileges DELETE_FROM_TABLE
int ping(int retry_num=0, int max_retry=50)
std::string getUserDefinedFunctionWhitelist()
std::string get_session_id() const
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
void checkPermissionForTables(const Catalog_Namespace::SessionInfo &session_info, std::vector< std::vector< std::string >> tableOrViewNames, AccessPrivileges tablePrivs, AccessPrivileges viewPrivs)
void check_db_access(const Catalog_Namespace::SessionInfo &session_info, const Catalog_Namespace::Catalog &accessed_catalog)
static const AccessPrivileges SELECT_FROM_VIEW
Catalog & getCatalog() const
void inner_close_calcite_server(bool log)
static const AccessPrivileges ACCESS
bool g_enable_watchdog false
std::pair< std::shared_ptr< CalciteServerClient >, std::shared_ptr< TTransport > > getClient(int port)
TPlanResult process(query_state::QueryStateProxy, std::string sql_string, const TQueryParsingOption &query_parsing_option, const TOptimizationOption &optimization_option, const std::string &calcite_session_id="")
static const AccessPrivileges UPDATE_IN_TABLE
void checkAccessedObjectsPrivileges(query_state::QueryStateProxy query_state_prox, TPlanResult plan) const
std::shared_ptr< Catalog_Namespace::SessionInfo const > getConstSessionInfo() const
std::string getExtensionFunctionWhitelist()
const UserMetadata & get_currentUser() const
std::string getRuntimeExtensionFunctionWhitelist()
void close_calcite_server(bool log=true)