28 #include <gperftools/heap-profiler.h>
29 #endif // HAVE_PROFILER
31 #include "MapDRelease.h"
34 #include "gen-cpp/CalciteServer.h"
68 #ifdef HAVE_RUNTIME_LIBS
84 #include <aws/core/auth/AWSCredentialsProviderChain.h>
88 #include <sys/types.h>
90 #include <boost/algorithm/string.hpp>
91 #include <boost/filesystem.hpp>
92 #include <boost/make_shared.hpp>
93 #include <boost/process/search_path.hpp>
94 #include <boost/program_options.hpp>
95 #include <boost/tokenizer.hpp>
108 #include <arrow/api.h>
109 #include <arrow/io/api.h>
110 #include <arrow/ipc/api.h>
115 #ifdef ENABLE_IMPORT_PARQUET
116 extern bool g_enable_parquet_import_fsi;
131 #define INVALID_SESSION_ID ""
133 #define SET_REQUEST_ID(parent_request_id) \
134 if (g_uniform_request_ids_per_thrift_call && parent_request_id) \
135 logger::set_request_id(parent_request_id); \
136 else if (logger::set_new_request_id(); parent_request_id) \
137 LOG(INFO) << "This request has parent request_id(" << parent_request_id << ')'
139 #define THROW_DB_EXCEPTION(errstr) \
142 ex.error_msg = errstr; \
143 LOG(ERROR) << ex.error_msg; \
153 const int32_t user_id,
154 const std::string& dashboard_name) {
166 extern std::unique_ptr<std::string> g_libgeos_so_filename;
170 const std::vector<LeafHostInfo>& string_leaves,
171 const std::string& base_data_path,
172 const bool allow_multifrag,
173 const bool jit_debug,
174 const bool intel_jit_profile,
175 const bool read_only,
176 const bool allow_loop_joins,
177 const bool enable_rendering,
178 const bool renderer_prefer_igpu,
179 const unsigned renderer_vulkan_timeout_ms,
180 const bool renderer_use_parallel_executors,
181 const bool enable_auto_clear_render_mem,
182 const int render_oom_retry_threshold,
183 const size_t render_mem_bytes,
184 const size_t max_concurrent_render_sessions,
185 const size_t reserved_gpu_mem,
186 const bool render_compositor_use_last_gpu,
187 const bool renderer_enable_slab_allocation,
188 const size_t num_reader_threads,
191 const bool legacy_syntax,
192 const int idle_session_duration,
193 const int max_session_duration,
194 const std::string& udf_filename,
195 const std::string& clang_path,
196 const std::vector<std::string>& clang_options,
198 const std::string& libgeos_so_filename,
200 #ifdef HAVE_TORCH_TFS
201 const std::string& torch_lib_path,
204 const bool is_new_db)
205 : leaf_aggregator_(db_leaves)
206 , db_leaves_(db_leaves)
207 , string_leaves_(string_leaves)
208 , base_data_path_(base_data_path)
209 , random_gen_(std::random_device{}())
210 , session_id_dist_(0, INT32_MAX)
211 , jit_debug_(jit_debug)
212 , intel_jit_profile_(intel_jit_profile)
213 , allow_multifrag_(allow_multifrag)
214 , read_only_(read_only)
215 , allow_loop_joins_(allow_loop_joins)
216 , authMetadata_(authMetadata)
217 , system_parameters_(system_parameters)
218 , legacy_syntax_(legacy_syntax)
220 std::make_unique<QueryDispatchQueue>(system_parameters.num_executors))
221 , super_user_rights_(
false)
222 , idle_session_duration_(idle_session_duration * 60)
223 , max_session_duration_(max_session_duration * 60)
224 , enable_rendering_(enable_rendering)
225 , renderer_prefer_igpu_(renderer_prefer_igpu)
226 , renderer_vulkan_timeout_(renderer_vulkan_timeout_ms)
227 , renderer_use_parallel_executors_(renderer_use_parallel_executors)
228 , enable_auto_clear_render_mem_(enable_auto_clear_render_mem)
229 , render_oom_retry_threshold_(render_oom_retry_threshold)
230 , render_mem_bytes_(render_mem_bytes)
231 , max_concurrent_render_sessions_(max_concurrent_render_sessions)
232 , reserved_gpu_mem_(reserved_gpu_mem)
233 , render_compositor_use_last_gpu_(render_compositor_use_last_gpu)
234 , renderer_enable_slab_allocation_{renderer_enable_slab_allocation}
235 , num_reader_threads_(num_reader_threads)
237 , libgeos_so_filename_(libgeos_so_filename)
239 #ifdef HAVE_TORCH_TFS
240 , torch_lib_path_(torch_lib_path)
242 , disk_cache_config_(disk_cache_config)
243 , udf_filename_(udf_filename)
244 , clang_path_(clang_path)
245 , clang_options_(clang_options)
246 , max_num_sessions_(-1) {
248 initialize(is_new_db);
249 resetSessionsStore();
253 size_t num_cpu_slots{0};
254 size_t num_gpu_slots{0};
255 size_t cpu_result_mem{0};
256 size_t cpu_buffer_pool_mem{0};
257 size_t gpu_buffer_pool_mem{0};
258 LOG(
INFO) <<
"Initializing Executor Resource Manager";
261 LOG(
INFO) <<
"\tSetting Executor resource pool avaiable CPU threads/slots to "
262 "user-specified value of "
266 LOG(
INFO) <<
"\tSetting Executor resource pool avaiable CPU threads/slots to default "
277 LOG(
INFO) <<
"\tSetting max per-query CPU threads to ratio of "
279 << num_cpu_slots <<
" available threads, or "
289 cpu_buffer_pool_mem =
data_mgr_->getCpuBufferPoolSize();
293 const size_t system_mem_bytes = DataMgr::getTotalSystemMemory();
294 CHECK_GT(system_mem_bytes,
size_t(0));
295 const size_t remaining_cpu_mem_bytes = system_mem_bytes >= cpu_buffer_pool_mem
296 ? system_mem_bytes - cpu_buffer_pool_mem
299 std::max(static_cast<size_t>(remaining_cpu_mem_bytes *
301 static_cast<size_t>(1UL << 32));
306 gpu_buffer_pool_mem =
data_mgr_->getGpuBufferPoolSize();
322 constexpr
double buffer_pool_max_occupancy{0.95};
323 const size_t conservative_cpu_buffer_pool_mem =
324 static_cast<size_t>(cpu_buffer_pool_mem * buffer_pool_max_occupancy);
325 const size_t conservative_gpu_buffer_pool_mem =
326 static_cast<size_t>(gpu_buffer_pool_mem * buffer_pool_max_occupancy);
329 <<
"\tSetting Executor resource pool reserved space for CPU buffer pool memory to "
331 if (gpu_buffer_pool_mem > 0UL) {
332 LOG(
INFO) <<
"\tSetting Executor resource pool reserved space for GPU buffer pool "
336 LOG(
INFO) <<
"\tSetting Executor resource pool reserved space for CPU result memory to "
343 conservative_cpu_buffer_pool_mem,
344 conservative_gpu_buffer_pool_mem,
360 <<
"The product of g_num_tuple_threshold_switch_to_baseline and "
361 "g_ratio_num_hash_entry_to_num_tuple_switch_to_baseline exceeds 64 bits.";
369 for (
auto session : sessions) {
387 "Server already initialized; service restart required to activate any new "
401 LOG(
WARNING) <<
"This build isn't CUDA enabled, will run on CPU";
408 is_rendering_enabled =
false;
416 if (is_rendering_enabled) {
420 std::unique_ptr<CudaMgr_Namespace::CudaMgr> cuda_mgr;
424 cuda_mgr = std::make_unique<CudaMgr_Namespace::CudaMgr>(
432 }
catch (
const std::exception& e) {
433 LOG(
ERROR) <<
"Unable to instantiate CudaMgr, falling back to CPU-only mode. "
437 is_rendering_enabled =
false;
452 }
catch (
const std::exception& e) {
453 LOG(
FATAL) <<
"Failed to initialize data manager: " << e.what();
459 std::string udf_ast_filename(
"");
463 const auto cuda_mgr =
data_mgr_->getCudaMgr();
465 cuda_mgr ? cuda_mgr->getDeviceArch()
471 if (!cuda_udf_ir_file.empty()) {
476 }
catch (
const std::exception& e) {
477 LOG(
FATAL) <<
"Failed to initialize UDF compiler: " << e.what();
483 }
catch (
const std::exception& e) {
484 LOG(
FATAL) <<
"Failed to initialize Calcite server: " << e.what();
492 }
catch (
const std::exception& e) {
493 LOG(
FATAL) <<
"Failed to initialize extension functions: " << e.what();
498 }
catch (
const std::exception& e) {
499 LOG(
FATAL) <<
"Failed to initialize table functions factory: " << e.what();
502 #ifdef HAVE_RUNTIME_LIBS
504 #ifdef HAVE_TORCH_TFS
509 }
catch (
const std::exception& e) {
510 LOG(
ERROR) <<
"Failed to load runtime libraries: " << e.what();
511 LOG(
ERROR) <<
"Support for runtime library table functions is disabled.";
518 std::vector<TUserDefinedFunction> udfs = {};
519 calcite_->setRuntimeExtensionFunctions(udfs, udtfs,
false);
520 }
catch (
const std::exception& e) {
521 LOG(
FATAL) <<
"Failed to register compile-time table functions: " << e.what();
526 LOG(
ERROR) <<
"No GPUs detected, falling back to CPU mode";
541 }
catch (
const std::exception& e) {
542 LOG(
FATAL) <<
"Failed to initialize system catalog: " << e.what();
548 if (is_rendering_enabled) {
561 }
catch (
const std::exception& e) {
562 LOG(
ERROR) <<
"Backend rendering disabled: " << e.what();
569 if (!libgeos_so_filename_.empty()) {
570 g_libgeos_so_filename.reset(
new std::string(libgeos_so_filename_));
571 LOG(
INFO) <<
"Overriding default geos library with '" + *g_libgeos_so_filename +
"'";
587 const std::shared_ptr<Catalog_Namespace::Catalog>& catalog_ptr) {
594 std::string session_id;
599 calcite_->getInternalSessionProxyUserName(),
600 calcite_->getInternalSessionProxyPassword(),
607 std::make_shared<Catalog_Namespace::SessionInfo>(
609 CHECK(emplace_ret.second);
621 const std::string& username,
622 const std::string& dbname) {
625 std::string username2 = username;
626 std::string dbname2 = dbname;
628 std::shared_ptr<Catalog>
cat =
nullptr;
632 }
catch (std::exception& e) {
639 std::vector<DBObject> dbObjects;
640 dbObjects.push_back(dbObject);
643 " is not allowed to access database " + dbname2 +
".");
645 connect_impl(session_id, std::string(), dbname2, user_meta, cat, stdlog);
653 const std::string& inputToken,
654 const std::string& dbname) {
659 const std::string& username,
660 const std::string& passwd,
661 const std::string& dbname) {
665 std::string username2 = username;
666 std::string dbname2 = dbname;
668 std::shared_ptr<Catalog>
cat =
nullptr;
672 }
catch (std::exception& e) {
673 stdlog.appendNameValuePairs(
"user", username,
"db", dbname,
"exception", e.what());
680 std::vector<DBObject> dbObjects;
681 dbObjects.push_back(dbObject);
683 stdlog.appendNameValuePairs(
684 "user", username,
"db", dbname,
"exception",
"Missing Privileges");
686 " is not allowed to access database " + dbname2 +
".");
688 connect_impl(session_id, passwd, dbname2, user_meta, cat, stdlog);
695 const std::string& passwd,
696 const std::string& dbname,
698 std::shared_ptr<Catalog>
cat,
704 session_id = session_ptr->get_session_id();
713 ? std::vector<std::string>{{
"super"}}
728 const auto session_id = session_ptr->get_session_id();
729 std::exception_ptr leaf_exception =
nullptr;
735 leaf_exception = std::current_exception();
742 if (leaf_exception) {
743 std::rethrow_exception(leaf_exception);
748 const std::string& dbname) {
752 auto stdlog =
STDLOG(session_ptr);
754 std::string dbname2 = dbname;
757 dbname2, session_ptr->get_currentUser().userName);
758 session_ptr->set_catalog_ptr(cat);
763 }
catch (std::exception& e) {
769 const TSessionId& session1_id_or_json) {
773 auto stdlog =
STDLOG(session1_ptr);
778 std::shared_ptr<Catalog>
cat = session1_ptr->get_catalog_ptr();
780 session2_id = session2_ptr->get_session_id();
787 }
catch (std::exception& e) {
793 const TSessionId& interrupt_session_id_or_json) {
802 auto&
cat = session_ptr->getCatalog();
803 auto stdlog =
STDLOG(session_ptr);
805 const auto allow_query_interrupt =
808 const auto dbname = cat.getCurrentDB().dbName;
819 auto target_executor_ids =
820 executor->getExecutorIdsRunningQuery(query_request_info.
sessionId());
821 if (target_executor_ids.empty()) {
823 executor->getSessionLock());
824 if (executor->checkIsQuerySessionEnrolled(query_request_info.
sessionId(),
825 session_read_lock)) {
826 session_read_lock.unlock();
827 VLOG(1) <<
"Received interrupt: "
828 <<
"User " << session_ptr->get_currentUser().userLoggable()
829 <<
", Database " << dbname << std::endl;
830 executor->interrupt(query_request_info.
sessionId(),
834 for (
auto& executor_id : target_executor_ids) {
835 VLOG(1) <<
"Received interrupt: "
836 <<
"Executor " << executor_id <<
", User "
837 << session_ptr->get_currentUser().userLoggable() <<
", Database "
838 << dbname << std::endl;
840 target_executor->interrupt(query_request_info.
sessionId(),
845 LOG(
INFO) <<
"User " << session_ptr->get_currentUser().userName
846 <<
" interrupted session with database " << dbname << std::endl;
853 return TRole::type::AGGREGATOR;
855 return TRole::type::LEAF;
857 return TRole::type::SERVER;
860 const TSessionId& session_id_or_json) {
868 _return.rendering_enabled = rendering_enabled;
872 _return.poly_rendering_enabled = rendering_enabled;
874 _return.renderer_status_json =
879 const TSessionId& session_id_or_json) {
899 LOG(
INFO) <<
"get_status() called in session-less mode";
905 ret.rendering_enabled = rendering_enabled;
909 ret.poly_rendering_enabled = rendering_enabled;
911 ret.renderer_status_json =
915 _return.push_back(ret);
917 std::vector<TServerStatus> leaf_status =
919 _return.insert(_return.end(), leaf_status.begin(), leaf_status.end());
924 const TSessionId& session_id_or_json) {
930 const auto cuda_mgr =
data_mgr_->getCudaMgr();
932 ret.num_gpu_hw = cuda_mgr->getDeviceCount();
933 ret.start_gpu = cuda_mgr->getStartGpu();
934 if (ret.start_gpu >= 0) {
935 ret.num_gpu_allocated = cuda_mgr->getDeviceCount() - cuda_mgr->getStartGpu();
938 for (int16_t device_id = 0; device_id < ret.num_gpu_hw; device_id++) {
939 TGpuSpecification gpu_spec;
940 auto deviceProperties = cuda_mgr->getDeviceProperties(device_id);
941 gpu_spec.num_sm = deviceProperties->numMPs;
942 gpu_spec.clock_frequency_kHz = deviceProperties->clockKhz;
943 gpu_spec.memory = deviceProperties->globalMem;
944 gpu_spec.compute_capability_major = deviceProperties->computeMajor;
945 gpu_spec.compute_capability_minor = deviceProperties->computeMinor;
946 ret.gpu_info.push_back(gpu_spec);
951 ret.num_cpu_hw = std::thread::hardware_concurrency();
955 _return.hardware_info.push_back(ret);
959 const TSessionId& session_id_or_json) {
964 auto stdlog =
STDLOG(session_ptr);
966 auto user_metadata = session_ptr->get_currentUser();
967 _return.user = user_metadata.userName;
968 _return.database = session_ptr->getCatalog().getCurrentDB().dbName;
969 _return.start_time = session_ptr->get_start_time();
970 _return.is_super = user_metadata.isSuper;
983 <<
"element types of arrays should always be nullable";
985 const auto array_tv = boost::get<ArrayTargetValue>(&tv);
987 bool is_null = !array_tv->is_initialized();
989 const auto& vec = array_tv->get();
990 for (
const auto& elem_tv : vec) {
994 column.data.arr_col.push_back(tColumn);
995 column.nulls.push_back(is_null && !ti.
get_notnull());
997 const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
999 auto s_n = boost::get<NullableString>(scalar_tv);
1000 auto s = boost::get<std::string>(s_n);
1002 column.data.str_col.push_back(*s);
1004 column.data.str_col.emplace_back(
"");
1005 auto null_p = boost::get<void*>(s_n);
1006 CHECK(null_p && !*null_p);
1010 const auto array_tv = boost::get<ArrayTargetValue>(&tv);
1012 bool is_null = !array_tv->is_initialized();
1016 const auto& vec = array_tv->get();
1017 for (
const auto& elem_tv : vec) {
1020 column.data.arr_col.push_back(tColumn);
1021 column.nulls.push_back(
false);
1024 column.data.arr_col.push_back(tColumn);
1025 column.nulls.push_back(is_null && !ti.
get_notnull());
1030 const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
1032 if (boost::get<int64_t>(scalar_tv)) {
1033 int64_t data = *(boost::get<int64_t>(scalar_tv));
1036 double val =
static_cast<double>(data);
1038 val /= pow(10.0, std::abs(ti.
get_scale()));
1040 column.data.real_col.push_back(val);
1042 column.data.int_col.push_back(data);
1071 column.nulls.push_back(
false);
1073 }
else if (boost::get<double>(scalar_tv)) {
1074 double data = *(boost::get<double>(scalar_tv));
1075 column.data.real_col.push_back(data);
1081 }
else if (boost::get<float>(scalar_tv)) {
1083 float data = *(boost::get<float>(scalar_tv));
1084 column.data.real_col.push_back(data);
1086 }
else if (boost::get<NullableString>(scalar_tv)) {
1087 auto s_n = boost::get<NullableString>(scalar_tv);
1088 auto s = boost::get<std::string>(s_n);
1090 column.data.str_col.push_back(*s);
1092 column.data.str_col.emplace_back(
"");
1093 auto null_p = boost::get<void*>(s_n);
1094 CHECK(null_p && !*null_p);
1105 const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
1109 <<
"element types of arrays should always be nullable";
1110 const auto array_tv = boost::get<ArrayTargetValue>(&tv);
1112 if (array_tv->is_initialized()) {
1113 const auto& vec = array_tv->get();
1114 for (
const auto& elem_tv : vec) {
1116 datum.val.arr_val.push_back(scalar_col_val);
1119 datum.is_null =
false;
1121 datum.is_null =
true;
1125 if (boost::get<int64_t>(scalar_tv)) {
1126 int64_t data = *(boost::get<int64_t>(scalar_tv));
1129 double val =
static_cast<double>(data);
1131 val /= pow(10.0, std::abs(ti.
get_scale()));
1133 datum.val.real_val = val;
1135 datum.val.int_val = data;
1149 datum.is_null = (datum.val.int_val ==
NULL_INT);
1154 datum.is_null = (datum.val.int_val ==
NULL_BIGINT);
1161 datum.is_null = (datum.val.int_val ==
NULL_BIGINT);
1164 datum.is_null =
false;
1166 }
else if (boost::get<double>(scalar_tv)) {
1167 datum.val.real_val = *(boost::get<double>(scalar_tv));
1169 datum.is_null = (datum.val.real_val ==
NULL_FLOAT);
1171 datum.is_null = (datum.val.real_val ==
NULL_DOUBLE);
1173 }
else if (boost::get<float>(scalar_tv)) {
1175 datum.val.real_val = *(boost::get<float>(scalar_tv));
1176 datum.is_null = (datum.val.real_val ==
NULL_FLOAT);
1177 }
else if (boost::get<NullableString>(scalar_tv)) {
1178 auto s_n = boost::get<NullableString>(scalar_tv);
1179 auto s = boost::get<std::string>(s_n);
1181 datum.val.str_val = *s;
1183 auto null_p = boost::get<void*>(s_n);
1184 CHECK(null_p && !*null_p);
1194 TQueryResult& _return,
1196 const std::shared_ptr<Catalog_Namespace::SessionInfo> session_ptr,
1197 const std::string& query_str,
1198 const bool column_format,
1199 const std::string& nonce,
1200 const int32_t first_n,
1201 const int32_t at_most_n,
1202 const bool use_calcite) {
1203 _return.total_time_ms = 0;
1204 _return.nonce = nonce;
1206 switch (pw.getQueryType()) {
1208 _return.query_type = TQueryType::READ;
1209 VLOG(1) <<
"query type: READ";
1213 _return.query_type = TQueryType::WRITE;
1214 VLOG(1) <<
"query type: WRITE";
1218 _return.query_type = TQueryType::SCHEMA_READ;
1219 VLOG(1) <<
"query type: SCHEMA READ";
1223 _return.query_type = TQueryType::SCHEMA_WRITE;
1224 VLOG(1) <<
"query type: SCHEMA WRITE";
1228 _return.query_type = TQueryType::UNKNOWN;
1240 session_ptr->get_executor_device_type(),
1246 _return, result, query_state_proxy, column_format, first_n, at_most_n);
1253 const bool column_format,
1254 const int32_t first_n,
1255 const int32_t at_most_n) {
1257 if (result.
empty()) {
1290 const TSessionId& session_id_or_json,
1291 const std::string& query_str,
1292 const bool column_format,
1293 const std::string& nonce,
1294 const int32_t first_n,
1295 const int32_t at_most_n) {
1298 const std::string exec_ra_prefix =
"execute relalg";
1299 const bool use_calcite = !boost::starts_with(query_str, exec_ra_prefix);
1301 use_calcite ? query_str : boost::trim_copy(query_str.substr(exec_ra_prefix.size()));
1304 auto stdlog =
STDLOG(session_ptr, query_state);
1306 stdlog.appendNameValuePairs(
"nonce", nonce);
1309 ScopeGuard reset_was_deferred_copy_from = [
this, &session_ptr] {
1313 if (first_n >= 0 && at_most_n >= 0) {
1323 query_state->createQueryStateProxy(),
1324 query_state->getQueryStr(),
1331 _return.nonce = nonce;
1334 query_state->createQueryStateProxy(),
1344 std::string debug_json = timer.stopAndGetJson();
1345 if (!debug_json.empty()) {
1346 _return.__set_debug(std::move(debug_json));
1348 stdlog.appendNameValuePairs(
1349 "execution_time_ms",
1350 _return.execution_time_ms,
1352 stdlog.duration<std::chrono::milliseconds>());
1355 }
catch (
const std::exception& e) {
1356 if (strstr(e.what(),
"java.lang.NullPointerException")) {
1358 }
else if (strstr(e.what(),
"SQL Error: Encountered \";\"")) {
1360 }
else if (strstr(e.what(),
"SQL Error: Encountered \"<EOF>\" at line 0, column 0")) {
1369 const TSessionId& session_id_or_json,
1370 const std::string& query_str,
1371 const bool column_format,
1372 const int32_t first_n,
1373 const int32_t at_most_n,
1377 const std::string exec_ra_prefix =
"execute relalg";
1378 const bool use_calcite = !boost::starts_with(query_str, exec_ra_prefix);
1380 use_calcite ? query_str : boost::trim_copy(query_str.substr(exec_ra_prefix.size()));
1385 auto stdlog =
STDLOG(session_ptr, query_state);
1389 ScopeGuard reset_was_deferred_copy_from = [
this, &session_ptr] {
1393 if (first_n >= 0 && at_most_n >= 0) {
1398 query_state->createQueryStateProxy(),
1400 session_ptr->get_executor_device_type(),
1410 stdlog.appendNameValuePairs(
1411 "execution_time_ms",
1414 stdlog.duration<std::chrono::milliseconds>());
1417 }
catch (
const std::exception& e) {
1418 if (strstr(e.what(),
"java.lang.NullPointerException")) {
1420 }
else if (strstr(e.what(),
"SQL Error: Encountered \";\"")) {
1422 }
else if (strstr(e.what(),
"SQL Error: Encountered \"<EOF>\" at line 0, column 0")) {
1431 int64_t total_time_ms(0);
1443 TCreateParams create_params;
1444 if (deferred_copy_from_state->partitions ==
"REPLICATED") {
1445 create_params.is_replicated =
true;
1451 deferred_copy_from_state->table,
1452 deferred_copy_from_state->file_name,
1453 deferred_copy_from_state->copy_params,
1458 return total_time_ms;
1462 const TSessionId& session_id_or_json,
1463 const std::string& query_str,
1465 const int32_t device_id,
1466 const int32_t first_n,
1473 auto stdlog =
STDLOG(session_ptr, query_state);
1475 const auto executor_device_type = session_ptr->get_executor_device_type();
1477 if (results_device_type == TDeviceType::GPU) {
1484 if (device_id < 0 || device_id >=
data_mgr_->getCudaMgr()->getDeviceCount()) {
1486 std::string(
"Invalid device_id or unavailable GPU with this ID"));
1492 "Only read queries supported for the Arrow sql_execute_df endpoint."));
1496 "Explain is currently unsupported by the Arrow sql_execute_df endpoint."));
1502 query_state->createQueryStateProxy(),
1504 executor_device_type,
1510 const auto result_set = execution_result.
getRows();
1511 const auto executor_results_device_type = results_device_type == TDeviceType::CPU
1514 _return.execution_time_ms =
1516 const auto converter = std::make_unique<ArrowResultSetConverter>(
1519 executor_results_device_type,
1525 _return.arrow_conversion_time_ms +=
1528 std::string(arrow_result.sm_handle.begin(), arrow_result.sm_handle.end());
1529 _return.sm_size = arrow_result.sm_size;
1531 std::string(arrow_result.df_handle.begin(), arrow_result.df_handle.end());
1533 std::string(arrow_result.df_buffer.begin(), arrow_result.df_buffer.end());
1538 std::make_pair(_return.df_handle, arrow_result.serialized_cuda_handle));
1540 _return.df_size = arrow_result.df_size;
1544 const TSessionId& session_id_or_json,
1545 const std::string& query_str,
1546 const int32_t device_id,
1547 const int32_t first_n) {
1553 request_info.
json(),
1558 TArrowTransport::SHARED_MEMORY);
1563 const TDataFrame& df,
1565 const int32_t device_id) {
1569 std::string serialized_cuda_handle =
"";
1570 if (device_type == TDeviceType::GPU) {
1574 ex.error_msg = std::string(
1575 "Current data frame handle is not bookkept or been inserted "
1583 std::vector<char> sm_handle(df.sm_handle.begin(), df.sm_handle.end());
1584 std::vector<char> df_handle(df.df_handle.begin(), df.df_handle.end());
1586 sm_handle, df.sm_size, df_handle, df.df_size, serialized_cuda_handle};
1595 const TSessionId& session_id_or_json,
1596 const std::string& query_str) {
1603 stdlog.setQueryState(query_state);
1606 if (
ExplainInfo(query_str).isExplain() || pw.is_ddl || pw.is_update_dml) {
1607 throw std::runtime_error(
"Can only validate SELECT statements.");
1612 TPlanResult parse_result;
1614 std::tie(parse_result, locks) =
parse_to_ra(query_state->createQueryStateProxy(),
1615 query_state->getQueryStr(),
1620 const auto query_ra = parse_result.plan_result;
1621 _return =
validateRelAlg(query_ra, query_state->createQueryStateProxy());
1622 }
catch (
const std::exception& e) {
1638 const std::string& sql) {
1639 boost::regex id_regex{R
"(([[:alnum:]]|_|\.)+)",
1640 boost::regex::extended | boost::regex::icase};
1641 boost::sregex_token_iterator tok_it(sql.begin(), sql.end(), id_regex, 0);
1642 boost::sregex_token_iterator end;
1643 std::unordered_set<std::string> uc_column_names;
1644 std::unordered_set<std::string> uc_column_table_qualifiers;
1645 for (; tok_it != end; ++tok_it) {
1646 std::string column_name = *tok_it;
1647 std::vector<std::string> column_tokens;
1648 boost::split(column_tokens, column_name, boost::is_any_of(
"."));
1649 if (column_tokens.size() == 2) {
1651 uc_column_table_qualifiers.insert(
to_upper(column_tokens.front()));
1653 uc_column_names.insert(
to_upper(column_name));
1656 return {uc_column_names, uc_column_table_qualifiers};
1662 const TSessionId& session_id_or_json,
1663 const std::string& sql,
1668 std::vector<std::string> visible_tables;
1672 proj_tokens.uc_column_names, visible_tables, stdlog);
1674 compatible_table_names.insert(proj_tokens.uc_column_table_qualifiers.begin(),
1675 proj_tokens.uc_column_table_qualifiers.end());
1680 [&compatible_table_names](
const TCompletionHint& lhs,
const TCompletionHint& rhs) {
1681 if (lhs.type == TCompletionHintType::TABLE &&
1682 rhs.type == TCompletionHintType::TABLE) {
1685 if (compatible_table_names.find(
to_upper(lhs.hints.back())) !=
1686 compatible_table_names.end() &&
1687 compatible_table_names.find(
to_upper(rhs.hints.back())) ==
1688 compatible_table_names.end()) {
1692 return lhs.type < rhs.type;
1697 std::vector<std::string>& visible_tables,
1699 const std::string& sql,
1707 calcite_->getCompletionHints(session_info, visible_tables, sql, cursor));
1708 }
catch (
const std::exception& e) {
1710 ex.error_msg = std::string(e.what());
1714 boost::regex from_expr{R
"(\s+from\s+)", boost::regex::extended | boost::regex::icase};
1715 const size_t length_to_cursor =
1716 cursor < 0 ? sql.size() : std::min(sql.size(),
static_cast<size_t>(cursor));
1718 if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, from_expr)) {
1727 std::vector<std::string>& visible_tables,
1728 const std::string& sql,
1730 const auto last_word =
1732 boost::regex select_expr{R
"(\s*select\s+)",
1733 boost::regex::extended | boost::regex::icase};
1734 const size_t length_to_cursor =
1735 cursor < 0 ? sql.size() : std::min(sql.size(),
static_cast<size_t>(cursor));
1738 if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, select_expr)) {
1749 const std::string kFromKeyword{
"FROM"};
1750 if (boost::istarts_with(kFromKeyword, last_word)) {
1751 TCompletionHint keyword_hint;
1752 keyword_hint.type = TCompletionHintType::KEYWORD;
1753 keyword_hint.replaced = last_word;
1754 keyword_hint.hints.emplace_back(kFromKeyword);
1755 hints.push_back(keyword_hint);
1758 const std::string kSelectKeyword{
"SELECT"};
1759 if (boost::istarts_with(kSelectKeyword, last_word)) {
1760 TCompletionHint keyword_hint;
1761 keyword_hint.type = TCompletionHintType::KEYWORD;
1762 keyword_hint.replaced = last_word;
1763 keyword_hint.hints.emplace_back(kSelectKeyword);
1764 hints.push_back(keyword_hint);
1769 std::unordered_map<std::string, std::unordered_set<std::string>>
1772 std::unordered_map<std::string, std::unordered_set<std::string>> column_names_by_table;
1773 for (
auto it = table_names.begin(); it != table_names.end();) {
1774 TTableDetails table_details;
1777 }
catch (
const TDBException& e) {
1779 it = table_names.erase(it);
1782 for (
const auto& column_type : table_details.row_desc) {
1783 column_names_by_table[*it].emplace(column_type.col_name);
1787 return column_names_by_table;
1796 const std::unordered_set<std::string>& uc_column_names,
1797 std::vector<std::string>& table_names,
1799 std::unordered_set<std::string> compatible_table_names_by_column;
1800 for (
auto it = table_names.begin(); it != table_names.end();) {
1801 TTableDetails table_details;
1804 }
catch (
const TDBException& e) {
1806 it = table_names.erase(it);
1809 for (
const auto& column_type : table_details.row_desc) {
1810 if (uc_column_names.find(
to_upper(column_type.col_name)) != uc_column_names.end()) {
1811 compatible_table_names_by_column.emplace(
to_upper(*it));
1817 return compatible_table_names_by_column;
1821 const bool is_update_delete) {
1828 TQueryResult query_result;
1830 auto execute_rel_alg_task = std::make_shared<QueryDispatchQueue::Task>(
1835 parent_thread_local_ids =
1851 auto result_future = execute_rel_alg_task->get_future();
1852 result_future.get();
1855 const auto& row_desc = query_result.row_set.row_desc;
1856 const auto& targets_meta = execution_result.getTargetsMeta();
1857 CHECK_EQ(row_desc.size(), targets_meta.size());
1860 TRowDescriptor fixedup_row_desc;
1861 for (
size_t i = 0; i < row_desc.size(); i++) {
1862 const auto& col_desc = row_desc[i];
1863 auto fixedup_col_desc = col_desc;
1864 if (col_desc.col_type.encoding == TEncodingType::DICT &&
1865 col_desc.col_type.comp_param > 0) {
1866 const auto& type_info = targets_meta[i].get_type_info();
1869 type_info.getStringDictKey().db_id);
1870 const auto dd = cat->getMetadataForDict(col_desc.col_type.comp_param,
false);
1872 fixedup_col_desc.col_type.comp_param = dd->dictNBits;
1874 fixedup_row_desc.push_back(fixedup_col_desc);
1876 return fixedup_row_desc;
1880 const TSessionId& session_id_or_json) {
1884 auto session_ptr = stdlog.getConstSessionInfo();
1885 if (!session_ptr->get_currentUser().isSuper) {
1890 session_ptr->getCatalog().getCurrentDB().dbId);
1893 false,
true, session_ptr->get_currentUser().userName);
1898 const std::string& granteeName,
1899 const std::string& roleName) {
1903 const auto stdlog =
STDLOG(session_ptr);
1904 const auto current_user = session_ptr->get_currentUser();
1905 if (!current_user.isSuper) {
1907 user && current_user.userName != granteeName) {
1910 current_user.userName, granteeName,
true)) {
1912 "Only super users can check roles assignment that have not been directly "
1913 "granted to a user.");
1921 TDBObject outObject;
1922 outObject.objectName = inObject.
getName();
1923 outObject.grantee = roleName;
1976 const int type_val =
static_cast<int>(inObject.
getType());
1977 CHECK(type_val >= 0 && type_val < 6);
1983 const TDBObjectPermissions& permissions) {
1984 if (!permissions.__isset.database_permissions_) {
1987 auto perms = permissions.database_permissions_;
1990 (perms.view_sql_editor_ &&
2000 const TDBObjectPermissions& permissions) {
2001 if (!permissions.__isset.table_permissions_) {
2004 auto perms = permissions.table_permissions_;
2020 const TDBObjectPermissions& permissions) {
2021 if (!permissions.__isset.dashboard_permissions_) {
2024 auto perms = permissions.dashboard_permissions_;
2036 const TDBObjectPermissions& permissions) {
2037 if (!permissions.__isset.view_permissions_) {
2040 auto perms = permissions.view_permissions_;
2054 const TDBObjectPermissions& permissions) {
2055 CHECK(permissions.__isset.server_permissions_);
2056 auto perms = permissions.server_permissions_;
2068 const std::string& granteeName,
2069 const std::string& objectName,
2071 const TDBObjectPermissions& permissions) {
2075 auto stdlog =
STDLOG(session_ptr);
2076 auto const&
cat = session_ptr->getCatalog();
2077 auto const& current_user = session_ptr->get_currentUser();
2079 current_user.userName, granteeName,
false)) {
2081 "Users except superusers can only check privileges for self or roles granted "
2095 std::string func_name;
2096 switch (objectType) {
2099 func_name =
"database";
2103 func_name =
"table";
2107 func_name =
"dashboard";
2115 func_name =
"server";
2120 DBObject req_object(objectName, type);
2124 if (grantee_object) {
2134 const TSessionId& session_id_or_json,
2135 const std::string& roleName) {
2139 auto stdlog =
STDLOG(session_ptr);
2140 auto const& user = session_ptr->get_currentUser();
2141 if (!user.isSuper &&
2147 auto dbId = session_ptr->getCatalog().getCurrentDB().dbId;
2148 for (
auto& dbObject : *rl->getDbObjects(
true)) {
2149 if (dbObject.first.dbId != dbId) {
2155 TDBObjectsForRole.push_back(tdbObject);
2163 const TSessionId& session_id_or_json,
2164 const std::string& objectName,
2169 auto stdlog =
STDLOG(session_ptr);
2170 const auto&
cat = session_ptr->getCatalog();
2192 DBObject object_to_find(objectName, object_type);
2197 if (objectName ==
"") {
2198 object_to_find =
DBObject(-1, object_type);
2200 object_to_find =
DBObject(std::stoi(objectName), object_type);
2203 !objectName.empty()) {
2205 auto td =
cat.getMetadataForTable(objectName,
false);
2208 object_to_find =
DBObject(objectName, object_type);
2212 }
catch (
const std::exception&) {
2217 DBObject object_to_find_dblevel(
"", object_type);
2220 if (session_ptr->get_currentUser().isSuper) {
2224 session_ptr->get_currentUser().userId};
2225 dbObj.setName(
"super");
2226 TDBObjects.push_back(
2230 std::vector<std::string> grantees =
2232 session_ptr->get_currentUser().isSuper,
2233 session_ptr->get_currentUser().userName);
2234 for (
const auto& grantee : grantees) {
2237 if (gr && (object_found = gr->findDbObject(object_to_find.
getObjectKey(),
true))) {
2242 (object_found = gr->findDbObject(object_to_find_dblevel.
getObjectKey(),
true))) {
2249 std::shared_ptr<Catalog_Namespace::SessionInfo const> session_ptr,
2250 std::vector<std::string>& roles,
2251 const std::string& granteeName,
2255 if (session_ptr->get_currentUser().isSuper) {
2256 roles = grantee->getRoles(!effective);
2257 }
else if (grantee->isUser()) {
2258 if (session_ptr->get_currentUser().userName == granteeName) {
2259 roles = grantee->getRoles(!effective);
2262 "Only a superuser is authorized to request list of roles granted to another "
2266 CHECK(!grantee->isUser());
2270 session_ptr->get_currentUser().userName, granteeName,
false)) {
2271 roles = grantee->getRoles(!effective);
2282 const TSessionId& session_id_or_json,
2283 const std::string& granteeName) {
2289 auto session_ptr = stdlog.getConstSessionInfo();
2294 const TSessionId& session_id_or_json,
2295 const std::string& granteeName) {
2299 auto session_ptr = stdlog.getConstSessionInfo();
2305 const std::map<std::string, std::vector<std::string>>& table_col_names) {
2306 std::ostringstream oss;
2307 for (
const auto& [table_name, col_names] : table_col_names) {
2308 oss <<
":" << table_name;
2309 for (
const auto& col_name : col_names) {
2310 oss <<
"," << col_name;
2318 TPixelTableRowResult& _return,
2319 const TSessionId& session_id_or_json,
2320 const int64_t widget_id,
2321 const TPixel& pixel,
2322 const std::map<std::string, std::vector<std::string>>& table_col_names,
2323 const bool column_format,
2324 const int32_t pixel_radius,
2325 const std::string& nonce) {
2329 auto stdlog =
STDLOG(session_ptr,
2358 }
catch (std::exception& e) {
2365 TColumnType col_type;
2389 col_type.col_type.comp_param = 0;
2396 col_type.col_type.comp_param = dd->dictNBits;
2398 col_type.col_type.comp_param =
2411 const TSessionId& session_id_or_json,
2412 const std::string& table_name,
2413 const bool include_system_columns) {
2423 TTableDetails& _return,
2424 const TSessionId& session_id_or_json,
2425 const std::string& table_name,
2426 const std::string& database_name) {
2436 const TSessionId& session_id_or_json,
2437 const std::string& table_name) {
2449 const TSessionId& session_id_or_json,
2450 const std::string& table_name,
2451 const std::string& database_name) {
2466 CHECK(foreign_table);
2467 TTableRefreshInfo refresh_info;
2468 const auto& update_type =
2470 CHECK(update_type.has_value());
2473 }
else if (update_type.value() ==
2475 refresh_info.update_type = TTableRefreshUpdateType::APPEND;
2477 UNREACHABLE() <<
"Unexpected refresh update type: " << update_type.value();
2480 const auto& timing_type =
2482 CHECK(timing_type.has_value());
2484 refresh_info.timing_type = TTableRefreshTimingType::MANUAL;
2485 refresh_info.interval_count = -1;
2486 }
else if (timing_type.value() ==
2488 refresh_info.timing_type = TTableRefreshTimingType::SCHEDULED;
2489 const auto& start_date_time = foreign_table->getOption(
2491 CHECK(start_date_time.has_value());
2492 auto start_date_time_epoch = dateTimeParse<kTIMESTAMP>(start_date_time.value(), 0);
2493 refresh_info.start_date_time =
2495 const auto& interval =
2497 CHECK(interval.has_value());
2498 const auto& interval_str = interval.value();
2499 refresh_info.interval_count =
2500 std::stoi(interval_str.substr(0, interval_str.length() - 1));
2501 auto interval_type = std::toupper(interval_str[interval_str.length() - 1]);
2502 if (interval_type ==
'H') {
2503 refresh_info.interval_type = TTableRefreshIntervalType::HOUR;
2504 }
else if (interval_type ==
'D') {
2505 refresh_info.interval_type = TTableRefreshIntervalType::DAY;
2506 }
else if (interval_type ==
'S') {
2508 refresh_info.interval_type = TTableRefreshIntervalType::NONE;
2510 UNREACHABLE() <<
"Unexpected interval type: " << interval_str;
2513 UNREACHABLE() <<
"Unexpected refresh timing type: " << timing_type.value();
2515 if (foreign_table->last_refresh_time !=
2518 {
kTIMESTAMP}, foreign_table->last_refresh_time);
2520 if (foreign_table->next_refresh_time !=
2523 {
kTIMESTAMP}, foreign_table->next_refresh_time);
2525 return refresh_info;
2531 const std::string& table_name,
2532 const bool get_system,
2533 const bool get_physical,
2534 const std::string& database_name) {
2537 auto cat = (database_name.empty())
2538 ? &session_info->getCatalog()
2543 const auto td_with_lock =
2545 *
cat, table_name,
false);
2546 const auto td = td_with_lock();
2549 bool have_privileges_on_view_sources =
true;
2555 const auto [query_ra, locks] =
parse_to_ra(query_state->createQueryStateProxy(),
2556 query_state->getQueryStr(),
2562 calcite_->checkAccessedObjectsPrivileges(query_state->createQueryStateProxy(),
2564 }
catch (
const std::runtime_error&) {
2565 have_privileges_on_view_sources =
false;
2569 validateRelAlg(query_ra.plan_result, query_state->createQueryStateProxy());
2571 throw std::runtime_error(
2572 "Unable to access view " + table_name +
2573 ". The view may not exist, or the logged in user may not "
2574 "have permission to access the view.");
2576 }
catch (
const std::exception& e) {
2577 throw std::runtime_error(
"View '" + table_name +
2578 "' query has failed with an error: '" +
2579 std::string(e.what()) +
2580 "'.\nThe view must be dropped and re-created to "
2581 "resolve the error. \nQuery:\n" +
2582 query_state->getQueryStr());
2586 const auto col_descriptors =
cat->getAllColumnMetadataForTable(
2587 td->tableId, get_system,
true, get_physical);
2588 const auto deleted_cd =
cat->getDeletedColumn(td);
2589 for (
const auto cd : col_descriptors) {
2590 if (cd == deleted_cd) {
2596 throw std::runtime_error(
2597 "Unable to access table " + table_name +
2598 ". The table may not exist, or the logged in user may not "
2599 "have permission to access the table.");
2602 _return.fragment_size = td->maxFragRows;
2603 _return.page_size = td->fragPageSize;
2604 _return.max_rows = td->maxRows;
2606 (have_privileges_on_view_sources ? td->viewSQL
2607 :
"[Not enough privileges to see the view SQL]");
2608 _return.shard_count = td->nShards * std::max(
g_leaf_count,
size_t(1));
2609 if (td->nShards > 0) {
2610 auto cd =
cat->getMetadataForColumn(td->tableId, td->shardedColumnId);
2612 _return.sharded_column_name = cd->columnName;
2614 _return.key_metainfo = td->keyMetainfo;
2616 _return.partition_detail =
2617 td->partitions.empty()
2618 ? TPartitionDetail::DEFAULT
2620 ? TPartitionDetail::REPLICATED
2621 : (td->partitions ==
"SHARDED" ? TPartitionDetail::SHARDED
2622 : TPartitionDetail::OTHER));
2624 _return.table_type = TTableType::VIEW;
2625 }
else if (td->isTemporaryTable()) {
2626 _return.table_type = TTableType::TEMPORARY;
2627 }
else if (td->isForeignTable()) {
2628 _return.table_type = TTableType::FOREIGN;
2631 _return.table_type = TTableType::DEFAULT;
2634 }
catch (
const std::runtime_error& e) {
2640 const TSessionId& session_id_or_json,
2641 const std::string& link) {
2645 auto stdlog =
STDLOG(session_ptr);
2647 auto const&
cat = session_ptr->getCatalog();
2652 _return.view_state = ld->viewState;
2653 _return.view_name = ld->link;
2654 _return.update_time = ld->updateTime;
2655 _return.view_metadata = ld->viewMetadata;
2664 if (user_metadata.isSuper) {
2670 std::vector<DBObject> privObjects = {dbObject};
2678 const std::string& database_name) {
2679 if (database_name.empty()) {
2687 table_names = request_cat->getTableNamesForUser(session_info.
get_currentUser(),
2693 const TSessionId& session_id_or_json) {
2703 const TSessionId& session_id_or_json,
2704 const std::string& database_name) {
2711 *stdlog.getConstSessionInfo(),
2717 const TSessionId& session_id_or_json) {
2726 const TSessionId& session_id_or_json) {
2737 const bool with_table_locks) {
2741 const auto tables =
cat.getAllTableMetadataCopy();
2742 _return.reserve(
tables.size());
2744 for (
const auto& td :
tables) {
2745 if (td.shard >= 0) {
2755 ret.table_name = td.tableName;
2756 ret.is_view = td.isView;
2758 ret.shard_count = td.nShards;
2759 ret.max_rows = td.maxRows;
2760 ret.table_id = td.tableId;
2762 std::vector<TTypeInfo> col_types;
2763 std::vector<std::string> col_names;
2764 size_t num_cols = 0;
2767 TPlanResult parse_result;
2771 const auto query_ra = parse_result.plan_result;
2786 num_cols = result.row_set.row_desc.size();
2787 for (
const auto& col : result.row_set.row_desc) {
2788 if (col.is_physical) {
2792 col_types.push_back(col.col_type);
2793 col_names.push_back(col.col_name);
2795 }
catch (std::exception& e) {
2796 LOG(
WARNING) <<
"get_tables_meta: Ignoring broken view: " << td.tableName;
2801 const auto col_descriptors =
2802 cat.getAllColumnMetadataForTable(td.tableId,
false,
true,
false);
2803 const auto deleted_cd =
cat.getDeletedColumn(&td);
2804 for (
const auto cd : col_descriptors) {
2805 if (cd == deleted_cd) {
2809 col_names.push_back(cd->columnName);
2811 num_cols = col_descriptors.size();
2815 }
catch (
const std::runtime_error& e) {
2820 ret.num_cols = num_cols;
2821 std::copy(col_types.begin(), col_types.end(), std::back_inserter(ret.col_types));
2822 std::copy(col_names.begin(), col_names.end(), std::back_inserter(ret.col_names));
2824 _return.push_back(ret);
2829 const TSessionId& session_id_or_json) {
2834 auto session_ptr = stdlog.getConstSessionInfo();
2836 stdlog.setQueryState(query_state);
2842 }
catch (
const std::exception& e) {
2848 const TSessionId& session_id_or_json) {
2853 auto session_ptr = stdlog.getConstSessionInfo();
2854 std::list<Catalog_Namespace::UserMetadata> user_list;
2856 if (!session_ptr->get_currentUser().isSuper) {
2858 session_ptr->getCatalog().getCurrentDB().dbId);
2862 for (
auto u : user_list) {
2863 user_names.push_back(u.userName);
2892 auto session_ptr = stdlog.getConstSessionInfo();
2893 if (!session_ptr->get_currentUser().isSuper) {
2906 }
catch (
const std::exception& e) {
2915 auto session_ptr = stdlog.getConstSessionInfo();
2916 if (!session_ptr->get_currentUser().isSuper) {
2929 }
catch (
const std::exception& e) {
2939 auto session_ptr = stdlog.getConstSessionInfo();
2940 if (!session_ptr->get_currentUser().isSuper) {
2953 auto session_ptr = stdlog.getConstSessionInfo();
2954 if (!session_ptr->get_currentUser().isSuper) {
2959 }
catch (
const std::exception& e) {
2967 auto session_ptr = stdlog.getConstSessionInfo();
2968 if (!session_ptr->get_currentUser().isSuper) {
2973 }
catch (
const std::exception& e) {
2979 const TSessionId& leaf_session_id_or_json,
2980 const std::string& start_time_str,
2981 const std::string&
label,
2982 bool for_running_query_kernel) {
2989 auto session_ptr = stdlog.getConstSessionInfo();
2992 executor->enrollQuerySession(parent_request_info.
sessionId(),
2996 for_running_query_kernel
2997 ? QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL
2998 : QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
3002 const TSessionId& leaf_session_id_or_json,
3003 const std::string& start_time_str,
3004 const std::string&
label,
3005 bool for_running_query_kernel) {
3013 executor->clearQuerySessionStatus(parent_request_info.
sessionId(), start_time_str);
3021 const TSessionId& session_id_or_json,
3022 const std::string& memory_level) {
3027 std::vector<Data_Namespace::MemoryInfo> internal_memory;
3028 if (!memory_level.compare(
"gpu")) {
3036 for (
auto memInfo : internal_memory) {
3037 TNodeMemoryInfo nodeInfo;
3038 nodeInfo.page_size = memInfo.pageSize;
3039 nodeInfo.max_num_pages = memInfo.maxNumPages;
3040 nodeInfo.num_pages_allocated = memInfo.numPageAllocated;
3041 nodeInfo.is_allocation_capped = memInfo.isAllocationCapped;
3042 for (
auto gpu : memInfo.nodeMemoryData) {
3044 md.slab = gpu.slabNum;
3045 md.start_page = gpu.startPage;
3046 md.num_pages = gpu.numPages;
3047 md.touch = gpu.touch;
3048 md.chunk_key.insert(md.chunk_key.end(), gpu.chunk_key.begin(), gpu.chunk_key.end());
3050 nodeInfo.node_memory_data.push_back(md);
3052 _return.push_back(nodeInfo);
3057 const TSessionId& session_id_or_json) {
3062 auto session_ptr = stdlog.getConstSessionInfo();
3063 const auto& user = session_ptr->get_currentUser();
3066 for (
auto& db : dbs) {
3068 dbinfo.db_name = std::move(db.dbName);
3069 dbinfo.db_owner = std::move(db.dbOwnerName);
3070 dbinfos.push_back(std::move(dbinfo));
3075 auto executor =
get_session_ptr(session_id)->get_executor_device_type();
3078 return TExecuteMode::CPU;
3080 return TExecuteMode::GPU;
3085 return TExecuteMode::CPU;
3092 auto stdlog =
STDLOG(session_ptr);
3101 throw std::runtime_error(
"Cannot import a sharded table directly to a leaf");
3106 const std::vector<std::string>& column_names) {
3107 std::unordered_set<std::string> unique_names;
3108 for (
const auto&
name : column_names) {
3110 if (unique_names.find(lower_name) != unique_names.end()) {
3113 unique_names.insert(lower_name);
3116 for (
const auto& cd : descs) {
3117 auto iter = unique_names.find(
to_lower(cd->columnName));
3118 if (iter != unique_names.end()) {
3119 unique_names.erase(iter);
3122 if (!unique_names.empty()) {
3132 const std::vector<std::string>& column_names) {
3133 std::vector<int> desc_to_column_ids;
3134 if (column_names.empty()) {
3136 for (
const auto& cd : descs) {
3137 if (!cd->isGeoPhyCol) {
3138 desc_to_column_ids.push_back(col_idx);
3143 for (
const auto& cd : descs) {
3144 if (!cd->isGeoPhyCol) {
3146 for (
size_t j = 0; j < column_names.size(); ++j) {
3149 desc_to_column_ids.push_back(j);
3154 if (!cd->columnType.get_notnull()) {
3155 desc_to_column_ids.push_back(-1);
3158 "' cannot be omitted due to NOT NULL constraint");
3164 return desc_to_column_ids;
3168 std::ostringstream oss;
3169 oss <<
"Cache size information {";
3173 auto resultset_cache_size =
3174 executor->getResultSetRecyclerHolder()
3175 .getResultSetRecycler()
3176 ->getResultSetRecyclerMetricTracker()
3178 if (resultset_cache_size) {
3179 oss <<
"\"query_resultset\": " << *resultset_cache_size <<
" bytes, ";
3183 auto perfect_join_ht_cache_size =
3186 auto baseline_join_ht_cache_size =
3189 auto bbox_intersect_ht_cache_size =
3193 auto bbox_intersect_ht_tuner_cache_size =
3197 auto sum_hash_table_cache_size =
3198 perfect_join_ht_cache_size + baseline_join_ht_cache_size +
3199 bbox_intersect_ht_cache_size + bbox_intersect_ht_tuner_cache_size;
3200 oss <<
"\"hash_tables\": " << sum_hash_table_cache_size <<
" bytes, ";
3203 auto chunk_metadata_cache_size =
3204 executor->getResultSetRecyclerHolder()
3205 .getChunkMetadataRecycler()
3208 oss <<
"\"chunk_metadata\": " << chunk_metadata_cache_size <<
" bytes, ";
3211 auto query_plan_dag_cache_size =
3212 executor->getQueryPlanDagCache().getCurrentNodeMapSize();
3213 oss <<
"\"query_plan_dag\": " << query_plan_dag_cache_size <<
" bytes, ";
3216 oss <<
"\"compiled_GPU code\": "
3229 std::ostringstream oss;
3238 const TSessionId& session_id,
3240 std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
3244 const std::string& table_name) {
3245 auto geo_col_idx = col_idx - 1;
3246 const auto wkt_or_wkb_hex_column = import_buffers[geo_col_idx]->getGeoStringBuffer();
3247 std::vector<std::vector<double>> coords_column, bounds_column;
3248 std::vector<std::vector<int>> ring_sizes_column, poly_rings_column;
3250 const bool validate_with_geos_if_available =
false;
3251 if (num_rows != wkt_or_wkb_hex_column->size() ||
3258 validate_with_geos_if_available)) {
3259 std::ostringstream oss;
3260 oss <<
"Invalid geometry in column " << cd->
columnName;
3276 const TSessionId& session_id,
3278 std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
3279 const std::list<const ColumnDescriptor*>& cds,
3280 const std::vector<int>& desc_id_to_column_id,
3282 const std::string& table_name) {
3283 size_t skip_physical_cols = 0;
3284 size_t col_idx = 0, import_idx = 0;
3285 for (
const auto& cd : cds) {
3286 if (skip_physical_cols > 0) {
3287 CHECK(cd->isGeoPhyCol);
3288 skip_physical_cols--;
3290 }
else if (cd->columnType.is_geometry()) {
3291 skip_physical_cols = cd->columnType.get_physical_cols();
3293 if (desc_id_to_column_id[import_idx] == -1) {
3294 import_buffers[col_idx]->addDefaultValues(cd, num_rows);
3296 if (cd->columnType.is_geometry()) {
3298 session_id, catalog, import_buffers, cd, col_idx, num_rows, table_name);
3302 col_idx += skip_physical_cols;
3309 std::string
get_load_tag(
const std::string& load_tag,
const std::string& table_name) {
3310 std::ostringstream oss;
3311 oss << load_tag <<
"(" << table_name <<
")";
3316 const std::string& table_name,
3317 const std::string& file_path) {
3318 std::ostringstream oss;
3319 oss << import_tag <<
"(" << table_name <<
", file_path:" << file_path <<
")";
3325 const std::string& table_name,
3326 const std::vector<TRow>&
rows,
3327 const std::vector<std::string>& column_names) {
3334 auto session_ptr = stdlog.getConstSessionInfo();
3341 std::unique_ptr<import_export::Loader> loader;
3342 std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3345 rows.front().cols.size(),
3349 "load_table_binary");
3351 auto col_descs = loader->get_column_descs();
3354 size_t rows_completed = 0;
3355 auto const load_tag =
get_load_tag(
"load_table_binary", table_name);
3357 ScopeGuard cleanup = [&load_tag, &session_ptr]() {
3360 for (
auto const& row : rows) {
3363 for (
auto cd : col_descs) {
3364 auto mapped_idx = desc_id_to_column_id[col_idx];
3365 if (mapped_idx != -1) {
3366 import_buffers[col_idx]->add_value(
3367 cd, row.cols[mapped_idx], row.cols[mapped_idx].is_null);
3372 }
catch (
const std::exception& e) {
3373 for (
size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
3374 import_buffers[col_idx_to_pop]->pop_value();
3376 LOG(
ERROR) <<
"Input exception thrown: " << e.what()
3377 <<
". Row discarded, issue at column : " << (col_idx + 1)
3378 <<
" data :" << row;
3382 session_ptr->getCatalog(),
3385 desc_id_to_column_id,
3389 session_ptr->getCatalog(), table_name);
3390 if (!loader->load(import_buffers, rows.size(), session_ptr.get())) {
3393 }
catch (
const std::exception& e) {
3398 std::unique_ptr<lockmgr::AbstractLockContainer<const TableDescriptor*>>
3401 const std::string& table_name,
3403 std::unique_ptr<import_export::Loader>* loader,
3404 std::vector<std::unique_ptr<import_export::TypedImportBuffer>>* import_buffers,
3405 const std::vector<std::string>& column_names,
3406 std::string load_type) {
3407 if (num_cols == 0) {
3413 std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
3415 cat, table_name,
true));
3416 const auto td = (*td_with_lock)();
3427 auto col_descs = (*loader)->get_column_descs();
3429 if (column_names.empty()) {
3432 auto geo_physical_cols = std::count_if(
3433 col_descs.begin(), col_descs.end(), [](
auto cd) {
return cd->isGeoPhyCol; });
3434 const auto num_table_cols =
static_cast<size_t>(td->nColumns) - geo_physical_cols -
3435 (td->hasDeletedCol ? 2 : 1);
3436 if (num_cols != num_table_cols) {
3437 throw std::runtime_error(
"Number of columns to load (" +
std::to_string(num_cols) +
3438 ") does not match number of columns in table " +
3442 }
else if (num_cols != column_names.size()) {
3444 "Number of columns specified does not match the "
3445 "number of columns given (" +
3450 return std::move(td_with_lock);
3455 if (!column.nulls.empty()) {
3456 return column.nulls.size();
3461 return column.data.int_col.size() + column.data.arr_col.size() +
3462 column.data.real_col.size() + column.data.str_col.size();
3469 const std::string& table_name,
3470 const std::vector<TColumn>& cols,
3471 const std::vector<std::string>& column_names) {
3477 auto session_ptr = stdlog.getConstSessionInfo();
3480 std::unique_ptr<import_export::Loader> loader;
3481 std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3488 "load_table_binary_columnar");
3490 auto desc_id_to_column_id =
3493 size_t import_idx = 0;
3495 auto const load_tag =
get_load_tag(
"load_table_binary_columnar", table_name);
3497 ScopeGuard cleanup = [&load_tag, &session_ptr]() {
3501 size_t skip_physical_cols = 0;
3502 for (
auto cd : loader->get_column_descs()) {
3503 if (skip_physical_cols > 0) {
3504 CHECK(cd->isGeoPhyCol);
3505 skip_physical_cols--;
3508 auto mapped_idx = desc_id_to_column_id[import_idx];
3509 if (mapped_idx != -1) {
3510 size_t col_rows = import_buffers[col_idx]->add_values(cd, cols[mapped_idx]);
3511 if (col_rows != num_rows) {
3512 std::ostringstream oss;
3513 oss <<
"load_table_binary_columnar: Inconsistent number of rows in column "
3514 << cd->columnName <<
" , expecting " << num_rows <<
" rows, column "
3515 << col_idx <<
" has " << col_rows <<
" rows";
3521 if (cd->columnType.is_geometry()) {
3523 session_ptr->getCatalog(),
3529 skip_physical_cols = cd->columnType.get_physical_cols();
3533 if (cd->columnType.is_geometry()) {
3534 skip_physical_cols = cd->columnType.get_physical_cols();
3535 col_idx += skip_physical_cols;
3541 }
catch (
const std::exception& e) {
3542 std::ostringstream oss;
3543 oss <<
"load_table_binary_columnar: Input exception thrown: " << e.what()
3544 <<
". Issue at column : " << (col_idx + 1) <<
". Import aborted";
3548 session_ptr->getCatalog(),
3550 loader->get_column_descs(),
3551 desc_id_to_column_id,
3555 session_ptr->getCatalog(), table_name);
3556 if (!loader->load(import_buffers, num_rows, session_ptr.get())) {
3563 #define ARROW_THRIFT_THROW_NOT_OK(s) \
3565 ::arrow::Status _s = (s); \
3566 if (UNLIKELY(!_s.ok())) { \
3568 ex.error_msg = _s.ToString(); \
3569 LOG(ERROR) << s.ToString(); \
3580 auto stream_buffer =
3581 std::make_shared<arrow::Buffer>(
reinterpret_cast<const uint8_t*
>(stream.c_str()),
3582 static_cast<int64_t>(stream.size()));
3584 arrow::io::BufferReader buf_reader(stream_buffer);
3585 std::shared_ptr<arrow::RecordBatchReader> batch_reader;
3587 arrow::ipc::RecordBatchStreamReader::Open(&buf_reader));
3590 std::shared_ptr<arrow::RecordBatch> batch;
3593 if (batch ==
nullptr) {
3596 batches.emplace_back(std::move(batch));
3598 }
catch (
const std::exception& e) {
3599 LOG(
ERROR) <<
"Error parsing Arrow stream: " << e.what() <<
". Import aborted";
3607 const std::string& table_name,
3608 const std::string& arrow_stream,
3609 const bool use_column_names) {
3614 auto session_ptr = stdlog.getConstSessionInfo();
3618 if (batches.size() != 1) {
3622 std::shared_ptr<arrow::RecordBatch> batch = batches[0];
3623 std::unique_ptr<import_export::Loader> loader;
3624 std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3625 std::vector<std::string> column_names;
3626 if (use_column_names) {
3627 column_names = batch->schema()->field_names();
3630 auto schema_read_lock =
3633 static_cast<size_t>(batch->num_columns()),
3637 "load_table_binary_arrow");
3639 auto desc_id_to_column_id =
3641 size_t num_rows = 0;
3645 auto const load_tag =
get_load_tag(
"load_table_binary_arrow", table_name);
3647 ScopeGuard cleanup = [&load_tag, &session_ptr]() {
3651 for (
auto cd : loader->get_column_descs()) {
3652 if (cd->isGeoPhyCol) {
3660 auto mapped_idx = desc_id_to_column_id[col_idx];
3661 if (mapped_idx != -1) {
3662 auto& array = *batch->column(mapped_idx);
3666 size_t col_id = cd->columnId;
3673 num_rows = import_buffers[col_id - 1]->add_arrow_values(
3674 cd, array,
true, row_slice,
nullptr);
3676 if (cd->columnType.is_geometry()) {
3678 session_ptr->getCatalog(),
3689 }
catch (
const std::exception& e) {
3690 LOG(
ERROR) <<
"Input exception thrown: " << e.what()
3691 <<
". Issue at column : " << (col_idx + 1) <<
". Import aborted";
3697 session_ptr->getCatalog(),
3699 loader->get_column_descs(),
3700 desc_id_to_column_id,
3704 session_ptr->getCatalog(), table_name);
3705 if (!loader->load(import_buffers, num_rows, session_ptr.get())) {
3711 const std::string& table_name,
3712 const std::vector<TStringRow>&
rows,
3713 const std::vector<std::string>& column_names) {
3720 auto session_ptr = stdlog.getConstSessionInfo();
3725 auto const load_tag =
get_load_tag(
"load_table", table_name);
3727 ScopeGuard cleanup = [&load_tag, &session_ptr]() {
3731 std::unique_ptr<import_export::Loader> loader;
3732 std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3733 auto schema_read_lock =
3736 static_cast<size_t>(rows.front().cols.size()),
3742 auto col_descs = loader->get_column_descs();
3745 size_t rows_completed = 0;
3746 for (
auto const& row : rows) {
3747 size_t import_idx = 0;
3750 size_t skip_physical_cols = 0;
3751 for (
auto cd : col_descs) {
3752 if (skip_physical_cols > 0) {
3753 CHECK(cd->isGeoPhyCol);
3754 skip_physical_cols--;
3757 auto mapped_idx = desc_id_to_column_id[import_idx];
3758 if (mapped_idx != -1) {
3759 import_buffers[col_idx]->add_value(cd,
3760 row.cols[mapped_idx].str_val,
3761 row.cols[mapped_idx].is_null,
3765 if (cd->columnType.is_geometry()) {
3767 skip_physical_cols = cd->columnType.get_physical_cols();
3768 col_idx += skip_physical_cols;
3774 }
catch (
const std::exception& e) {
3775 LOG(
ERROR) <<
"Input exception thrown: " << e.what()
3776 <<
". Row discarded, issue at column : " << (col_idx + 1)
3777 <<
" data :" << row;
3782 if (rows.size() != 0) {
3783 const auto& row = rows[0];
3786 size_t import_idx = 0;
3787 size_t skip_physical_cols = 0;
3788 for (
auto cd : col_descs) {
3789 if (skip_physical_cols > 0) {
3790 skip_physical_cols--;
3793 auto mapped_idx = desc_id_to_column_id[import_idx];
3795 if (cd->columnType.is_geometry()) {
3796 skip_physical_cols = cd->columnType.get_physical_cols();
3797 if (mapped_idx != -1) {
3799 session_ptr->getCatalog(),
3806 col_idx += skip_physical_cols;
3811 }
catch (
const std::exception& e) {
3812 LOG(
ERROR) <<
"Input exception thrown: " << e.what()
3813 <<
". Row discarded, issue at column : " << (col_idx + 1)
3814 <<
" data :" << row;
3819 session_ptr->getCatalog(),
3822 desc_id_to_column_id,
3826 session_ptr->getCatalog(), table_name);
3827 if (!loader->load(import_buffers, rows_completed, session_ptr.get())) {
3831 }
catch (
const std::exception& e) {
3838 if (str.size() == 2 && str[0] ==
'\\') {
3839 if (str[1] ==
't') {
3841 }
else if (str[1] ==
'n') {
3843 }
else if (str[1] ==
'0') {
3845 }
else if (str[1] ==
'\'') {
3847 }
else if (str[1] ==
'\\') {
3856 switch (cp.has_header) {
3857 case TImportHeaderRow::AUTODETECT:
3860 case TImportHeaderRow::NO_HEADER:
3863 case TImportHeaderRow::HAS_HEADER:
3869 copy_params.
quoted = cp.quoted;
3870 if (cp.delimiter.length() > 0) {
3875 if (cp.null_str.length() > 0) {
3876 copy_params.
null_str = cp.null_str;
3878 if (cp.quote.length() > 0) {
3881 if (cp.escape.length() > 0) {
3884 if (cp.line_delim.length() > 0) {
3887 if (cp.array_delim.length() > 0) {
3890 if (cp.array_begin.length() > 0) {
3893 if (cp.array_end.length() > 0) {
3896 if (cp.threads != 0) {
3897 copy_params.
threads = cp.threads;
3899 if (cp.s3_access_key.length() > 0) {
3902 if (cp.s3_secret_key.length() > 0) {
3905 if (cp.s3_session_token.length() > 0) {
3908 if (cp.s3_region.length() > 0) {
3911 if (cp.s3_endpoint.length() > 0) {
3916 cp.s3_secret_key.length() == 0 && cp.s3_session_token.length() == 0) {
3917 const auto& server_credentials =
3918 Aws::Auth::DefaultAWSCredentialsProviderChain().GetAWSCredentials();
3919 copy_params.
s3_access_key = server_credentials.GetAWSAccessKeyId();
3920 copy_params.
s3_secret_key = server_credentials.GetAWSSecretKey();
3925 switch (cp.source_type) {
3926 case TSourceType::DELIMITED_FILE:
3929 case TSourceType::GEO_FILE:
3932 case TSourceType::PARQUET_FILE:
3933 #ifdef ENABLE_IMPORT_PARQUET
3939 case TSourceType::ODBC:
3941 case TSourceType::RASTER_FILE:
3948 switch (cp.geo_coords_encoding) {
3949 case TEncodingType::GEOINT:
3952 case TEncodingType::NONE:
3960 switch (cp.geo_coords_type) {
3961 case TDatumType::GEOGRAPHY:
3964 case TDatumType::GEOMETRY:
3971 switch (cp.geo_coords_srid) {
3985 switch (cp.raster_point_type) {
3986 case TRasterPointType::NONE:
3989 case TRasterPointType::AUTO:
3992 case TRasterPointType::SMALLINT:
3995 case TRasterPointType::INT:
3998 case TRasterPointType::FLOAT:
4001 case TRasterPointType::DOUBLE:
4004 case TRasterPointType::POINT:
4011 if (cp.raster_scanlines_per_thread < 0) {
4017 switch (cp.raster_point_transform) {
4018 case TRasterPointTransform::NONE:
4021 case TRasterPointTransform::AUTO:
4024 case TRasterPointTransform::FILE:
4027 case TRasterPointTransform::WORLD:
4035 copy_params.
dsn = cp.odbc_dsn;
4039 copy_params.
username = cp.odbc_username;
4040 copy_params.
password = cp.odbc_password;
4050 TCopyParams copy_params;
4052 copy_params.null_str = cp.
null_str;
4055 copy_params.has_header = TImportHeaderRow::AUTODETECT;
4058 copy_params.has_header = TImportHeaderRow::NO_HEADER;
4061 copy_params.has_header = TImportHeaderRow::HAS_HEADER;
4066 copy_params.quoted = cp.
quoted;
4067 copy_params.quote = cp.
quote;
4068 copy_params.escape = cp.
escape;
4073 copy_params.threads = cp.
threads;
4081 copy_params.source_type = TSourceType::DELIMITED_FILE;
4084 copy_params.source_type = TSourceType::GEO_FILE;
4087 copy_params.source_type = TSourceType::PARQUET_FILE;
4090 copy_params.source_type = TSourceType::RASTER_FILE;
4093 copy_params.source_type = TSourceType::ODBC;
4100 copy_params.geo_coords_encoding = TEncodingType::GEOINT;
4103 copy_params.geo_coords_encoding = TEncodingType::NONE;
4109 copy_params.geo_coords_type = TDatumType::GEOGRAPHY;
4112 copy_params.geo_coords_type = TDatumType::GEOMETRY;
4120 copy_params.geo_assign_render_groups =
false;
4125 copy_params.raster_point_type = TRasterPointType::NONE;
4128 copy_params.raster_point_type = TRasterPointType::AUTO;
4131 copy_params.raster_point_type = TRasterPointType::SMALLINT;
4134 copy_params.raster_point_type = TRasterPointType::INT;
4137 copy_params.raster_point_type = TRasterPointType::FLOAT;
4140 copy_params.raster_point_type = TRasterPointType::DOUBLE;
4143 copy_params.raster_point_type = TRasterPointType::POINT;
4152 copy_params.raster_point_transform = TRasterPointTransform::NONE;
4155 copy_params.raster_point_transform = TRasterPointTransform::AUTO;
4158 copy_params.raster_point_transform = TRasterPointTransform::FILE;
4161 copy_params.raster_point_transform = TRasterPointTransform::WORLD;
4168 copy_params.odbc_dsn = cp.
dsn;
4172 copy_params.odbc_username = cp.
username;
4173 copy_params.odbc_password = cp.
password;
4188 if (boost::istarts_with(path,
"http://") || boost::istarts_with(path,
"https://")) {
4189 if (!gdal_network) {
4191 "HTTP geo file import not supported! Update to GDAL 2.2 or later!");
4194 path =
"/vsicurl/" + path;
4195 }
else if (boost::istarts_with(path,
"s3://")) {
4196 if (!gdal_network) {
4198 "S3 geo file import not supported! Update to GDAL 2.2 or later!");
4201 boost::replace_first(path,
"s3://",
"/vsis3/");
4207 if (boost::iends_with(path,
".gz") && !boost::iends_with(path,
".tar.gz")) {
4208 path =
"/vsigzip/" + path;
4214 if (boost::iends_with(path,
".zip")) {
4216 path =
"/vsizip/" + path;
4217 }
else if (boost::iends_with(path,
".tar") || boost::iends_with(path,
".tgz") ||
4218 boost::iends_with(path,
".tar.gz")) {
4220 path =
"/vsitar/" + path;
4225 std::string path(path_in);
4228 if (boost::istarts_with(path,
"/vsizip/")) {
4229 boost::replace_first(path,
"/vsizip/",
"");
4230 }
else if (boost::istarts_with(path,
"/vsitar/")) {
4231 boost::replace_first(path,
"/vsitar/",
"");
4232 }
else if (boost::istarts_with(path,
"/vsigzip/")) {
4233 boost::replace_first(path,
"/vsigzip/",
"");
4237 if (boost::istarts_with(path,
"/vsicurl/")) {
4238 boost::replace_first(path,
"/vsicurl/",
"");
4239 }
else if (boost::istarts_with(path,
"/vsis3/")) {
4240 boost::replace_first(path,
"/vsis3/",
"s3://");
4247 if (boost::istarts_with(path,
"s3://") || boost::istarts_with(path,
"http://") ||
4248 boost::istarts_with(path,
"https://")) {
4251 return !boost::filesystem::path(path).is_absolute();
4255 auto filename = boost::filesystem::path(path).filename().string();
4269 if (boost::iends_with(path,
".shp") || boost::iends_with(path,
".geojson") ||
4270 boost::iends_with(path,
".json") || boost::iends_with(path,
".kml") ||
4271 boost::iends_with(path,
".kmz") || boost::iends_with(path,
".gdb") ||
4272 boost::iends_with(path,
".gdb.zip") || boost::iends_with(path,
".fgb")) {
4282 if (boost::iends_with(path,
".zip") && !boost::iends_with(path,
".gdb.zip")) {
4284 }
else if (boost::iends_with(path,
".tar") || boost::iends_with(path,
".tgz") ||
4285 boost::iends_with(path,
".tar.gz")) {
4294 std::vector<std::string> files =
4298 LOG(
INFO) <<
"Found " << files.size() <<
" files in Archive "
4300 for (
const auto& file : files) {
4305 bool found_suitable_file =
false;
4306 std::string file_name;
4307 for (
const auto& file : files) {
4310 found_suitable_file =
true;
4316 if (!found_suitable_file) {
4317 LOG(
INFO) <<
"Failed to find any supported geo files in Archive: " +
4327 return (!boost::istarts_with(file_path,
"s3://") &&
4328 !boost::istarts_with(file_path,
"http://") &&
4329 !boost::istarts_with(file_path,
"https://"));
4341 const TSessionId& session_id_or_json,
4342 const std::string& file_name_in,
4343 const TCopyParams& cp) {
4350 bool is_raster =
false;
4351 boost::filesystem::path file_path;
4354 std::string file_name{file_name_in};
4358 picosha2::hash256_hex_string(request_info.
sessionId()) /
4359 boost::filesystem::path(file_name).filename();
4360 file_name = temp_file_path.string();
4373 CHECK(!file_paths.empty());
4374 file_name = file_paths[0];
4389 if (geo_file.size()) {
4390 file_name = file_name + std::string(
"/") + geo_file;
4404 file_path = boost::filesystem::path(file_name);
4406 if (!boost::istarts_with(file_name,
"s3://")) {
4407 if (!boost::filesystem::path(file_name).is_absolute()) {
4409 picosha2::hash256_hex_string(request_info.
sessionId()) /
4410 boost::filesystem::path(file_name).filename();
4411 file_name = file_path.string();
4419 "\" does not exist.")
4425 "\" does not exist.");
4433 #ifdef ENABLE_IMPORT_PARQUET
4439 std::vector<std::string> headers = detector.
get_headers();
4443 _return.row_set.row_desc.resize(best_types.size());
4444 for (
size_t col_idx = 0; col_idx < best_types.size(); col_idx++) {
4446 auto& ti = best_types[col_idx];
4447 col.col_type.precision = ti.get_precision();
4448 col.col_type.scale = ti.get_scale();
4449 col.col_type.comp_param = ti.get_comp_param();
4450 if (ti.is_geometry()) {
4454 col.col_type.precision =
static_cast<int>(copy_params.
geo_coords_type);
4460 if (ti.is_array()) {
4461 col.col_type.is_array =
true;
4466 col.col_name = headers[col_idx];
4469 _return.row_set.row_desc[col_idx] = col;
4474 for (
auto row : sample_data) {
4475 sample_row.cols.clear();
4476 for (
const auto& s : row) {
4479 td.is_null = s.empty();
4480 sample_row.cols.push_back(td);
4482 _return.row_set.rows.push_back(sample_row);
4489 for (
auto cd : cds) {
4497 std::map<std::string, std::vector<std::string>> sample_data;
4503 if (sample_data.size() > 0) {
4504 for (
size_t i = 0; i < sample_data.begin()->second.size(); i++) {
4506 for (
auto cd : cds) {
4508 td.val.str_val = sample_data[cd.sourceName].at(i);
4509 td.is_null = td.val.str_val.empty();
4510 sample_row.cols.push_back(td);
4512 _return.row_set.rows.push_back(sample_row);
4518 }
catch (
const std::exception& e) {
4524 const TSessionId& session_id_or_json,
4525 const int64_t widget_id,
4526 const std::string& vega_json,
4527 const int compression_level,
4528 const std::string& nonce) {
4534 "compression_level",
4541 stdlog.appendNameValuePairs(
"nonce", nonce);
4549 auto& non_const_vega_json =
const_cast<std::string&
>(vega_json);
4554 stdlog.getSessionInfo(),
4556 std::move(non_const_vega_json),
4559 }
catch (std::exception& e) {
4566 int32_t dashboard_id,
4571 object.loadKey(catalog);
4572 object.setPrivileges(requestedPermissions);
4573 std::vector<DBObject> privs = {
object};
4583 const TCustomExpression& t_custom_expr,
4585 if (t_custom_expr.data_source_name.empty()) {
4588 CHECK(t_custom_expr.data_source_type == TDataSourceType::type::TABLE)
4589 <<
"Unexpected data source type: "
4590 <<
static_cast<int>(t_custom_expr.data_source_type);
4594 t_custom_expr.data_source_name +
"\" that does not exist.")
4597 return std::make_unique<CustomExpression>(
4598 t_custom_expr.name, t_custom_expr.expression_json, data_source_type, td->tableId);
4603 TCustomExpression t_custom_expr;
4604 t_custom_expr.id = custom_expr.
id;
4605 t_custom_expr.name = custom_expr.
name;
4608 t_custom_expr.is_deleted = custom_expr.
is_deleted;
4610 <<
"Unexpected data source type: "
4612 t_custom_expr.data_source_type = TDataSourceType::type::TABLE;
4615 t_custom_expr.data_source_name = td->
tableName;
4618 <<
"Custom expression references a deleted data source. Custom expression id: "
4619 << custom_expr.
id <<
", name: " << custom_expr.
name;
4621 return t_custom_expr;
4626 const TCustomExpression& t_custom_expr) {
4633 auto session_ptr = stdlog.getConstSessionInfo();
4634 if (!session_ptr->get_currentUser().isSuper) {
4637 auto& catalog = session_ptr->getCatalog();
4639 return catalog.createCustomExpression(
4644 const TSessionId& session_id_or_json) {
4650 auto session_ptr = stdlog.getConstSessionInfo();
4651 auto& catalog = session_ptr->getCatalog();
4653 auto custom_expressions =
4654 catalog.getCustomExpressionsForUser(session_ptr->get_currentUser());
4655 for (
const auto& custom_expression : custom_expressions) {
4662 const std::string& expression_json) {
4669 auto session_ptr = stdlog.getConstSessionInfo();
4670 if (!session_ptr->get_currentUser().isSuper) {
4673 auto& catalog = session_ptr->getCatalog();
4675 catalog.updateCustomExpression(
id, expression_json);
4679 const TSessionId& session_id_or_json,
4680 const std::vector<int32_t>& custom_expression_ids,
4681 const bool do_soft_delete) {
4688 auto session_ptr = stdlog.getConstSessionInfo();
4689 if (!session_ptr->get_currentUser().isSuper) {
4692 auto& catalog = session_ptr->getCatalog();
4694 catalog.deleteCustomExpressions(custom_expression_ids, do_soft_delete);
4699 const TSessionId& session_id_or_json,
4700 const int32_t dashboard_id) {
4705 auto session_ptr = stdlog.getConstSessionInfo();
4706 auto const&
cat = session_ptr->getCatalog();
4708 auto dash =
cat.getMetadataForDashboard(dashboard_id);
4724 const TSessionId& session_id_or_json) {
4729 auto session_ptr = stdlog.getConstSessionInfo();
4730 auto const&
cat = session_ptr->getCatalog();
4732 const auto dashes =
cat.getAllDashboardsMetadata();
4734 for (
const auto dash : dashes) {
4746 const std::shared_ptr<Catalog_Namespace::SessionInfo const>& session_ptr,
4749 const bool populate_state) {
4750 auto const&
cat = session_ptr->getCatalog();
4753 cat.getCurrentDB().dbId,
4756 TDashboard dashboard;
4758 if (populate_state) {
4765 dashboard.dashboard_owner = dash->
user;
4766 TDashboardPermissions perms;
4768 if (session_ptr->get_currentUser().isSuper) {
4769 perms.create_ =
true;
4770 perms.delete_ =
true;
4778 obj_to_find.loadKey(
cat);
4779 std::vector<std::string> grantees =
4781 session_ptr->get_currentUser().isSuper,
4782 session_ptr->get_currentUser().userName);
4783 for (
const auto& grantee : grantees) {
4786 if (gr && (object_found = gr->findDbObject(obj_to_find.getObjectKey(),
true))) {
4795 dashboard.dashboard_permissions = perms;
4796 if (objects_list.empty() ||
4797 (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.
userName)) {
4798 dashboard.is_dash_shared =
false;
4800 dashboard.is_dash_shared =
true;
4805 namespace dbhandler {
4813 std::string error_message{
"Write requests/queries are not allowed in the " +
4815 if (throw_db_exception) {
4818 throw std::runtime_error(error_message);
4825 const std::string& dashboard_name,
4826 const std::string& dashboard_state,
4827 const std::string& image_hash,
4828 const std::string& dashboard_metadata) {
4833 auto session_ptr = stdlog.getConstSessionInfo();
4836 auto&
cat = session_ptr->getCatalog();
4855 dd.
userId = session_ptr->get_currentUser().userId;
4856 dd.
user = session_ptr->get_currentUser().userName;
4859 auto id =
cat.createDashboard(dd);
4864 }
catch (
const std::exception& e) {
4870 const int32_t dashboard_id,
4871 const std::string& dashboard_name,
4872 const std::string& dashboard_owner,
4873 const std::string& dashboard_state,
4874 const std::string& image_hash,
4875 const std::string& dashboard_metadata) {
4880 auto session_ptr = stdlog.getConstSessionInfo();
4883 auto&
cat = session_ptr->getCatalog();
4893 if (
auto dash =
cat.getMetadataForDashboard(
4894 std::to_string(session_ptr->get_currentUser().userId), dashboard_name)) {
4895 if (dash->dashboardId != dashboard_id) {
4911 dd.
user = dashboard_owner;
4915 cat.replaceDashboard(dd);
4916 }
catch (
const std::exception& e) {
4922 const int32_t dashboard_id) {
4927 const std::vector<int32_t>& dashboard_ids) {
4932 auto session_ptr = stdlog.getConstSessionInfo();
4934 auto&
cat = session_ptr->getCatalog();
4940 cat.deleteMetadataForDashboards(dashboard_ids, session_ptr->get_currentUser());
4941 }
catch (
const std::exception& e) {
4947 int32_t dashboard_id,
4948 std::vector<std::string> groups) {
4952 auto&
cat = session_info.getCatalog();
4953 auto dash = cat.getMetadataForDashboard(dashboard_id);
4957 }
else if (session_info.get_currentUser().userId != dash->userId &&
4958 !session_info.get_currentUser().isSuper) {
4959 throw std::runtime_error(
4960 "User should be either owner of dashboard or super user to share/unshare it");
4962 std::vector<std::string> valid_groups;
4964 for (
auto& group : groups) {
4968 }
else if (!user_meta.
isSuper) {
4969 valid_groups.push_back(group);
4972 return valid_groups;
4976 for (
auto const& group : groups) {
4985 const std::vector<int32_t>& dashboard_ids) {
4987 std::map<std::string, std::list<int32_t>> errors;
4988 for (
auto const& dashboard_id : dashboard_ids) {
4989 auto dashboard =
cat.getMetadataForDashboard(dashboard_id);
4991 errors[
"Dashboard id does not exist"].push_back(dashboard_id);
4994 errors[
"User should be either owner of dashboard or super user to share/unshare it"]
4995 .push_back(dashboard_id);
4998 if (!errors.empty()) {
4999 std::stringstream error_stream;
5000 error_stream <<
"Share/Unshare dashboard(s) failed with error(s)\n";
5001 for (
const auto& [error, id_list] : errors) {
5002 error_stream <<
"Dashboard ids " <<
join(id_list,
", ") <<
": " << error <<
"\n";
5009 const std::vector<int32_t>& dashboard_ids,
5010 const std::vector<std::string>& groups,
5011 const TDashboardPermissions& permissions,
5012 const bool do_share) {
5015 check_read_only(do_share ?
"share_dashboards" :
"unshare_dashboards");
5016 if (!permissions.create_ && !permissions.delete_ && !permissions.edit_ &&
5017 !permissions.view_) {
5019 std::string(do_share ?
"grants" :
"revokes"));
5021 auto session_ptr = stdlog.getConstSessionInfo();
5022 auto const& catalog = session_ptr->getCatalog();
5026 std::vector<DBObject> batch_objects;
5027 for (
auto const& dashboard_id : dashboard_ids) {
5030 if (permissions.delete_) {
5033 if (permissions.create_) {
5036 if (permissions.edit_) {
5039 if (permissions.view_) {
5042 object.setPrivileges(privs);
5043 batch_objects.push_back(
object);
5046 sys_catalog.grantDBObjectPrivilegesBatch(groups, batch_objects, catalog);
5048 sys_catalog.revokeDBObjectPrivilegesBatch(groups, batch_objects, catalog);
5053 const std::vector<int32_t>& dashboard_ids,
5054 const std::vector<std::string>& groups,
5055 const TDashboardPermissions& permissions) {
5059 request_info.
sessionId(), dashboard_ids, groups, permissions,
true);
5064 const int32_t dashboard_id,
5065 const std::vector<std::string>& groups,
5066 const std::vector<std::string>& objects,
5067 const TDashboardPermissions& permissions,
5068 const bool grant_role =
false) {
5073 const std::vector<int32_t>& dashboard_ids,
5074 const std::vector<std::string>& groups,
5075 const TDashboardPermissions& permissions) {
5079 request_info.
sessionId(), dashboard_ids, groups, permissions,
false);
5083 const int32_t dashboard_id,
5084 const std::vector<std::string>& groups,
5085 const std::vector<std::string>& objects,
5086 const TDashboardPermissions& permissions) {
5091 std::vector<TDashboardGrantees>& dashboard_grantees,
5092 const TSessionId& session_id_or_json,
5093 const int32_t dashboard_id) {
5098 auto session_ptr = stdlog.getConstSessionInfo();
5099 auto const&
cat = session_ptr->getCatalog();
5101 auto dash =
cat.getMetadataForDashboard(dashboard_id);
5105 }
else if (session_ptr->get_currentUser().userId != dash->userId &&
5106 !session_ptr->get_currentUser().isSuper) {
5108 "User should be either owner of dashboard or super user to access grantees");
5110 std::vector<ObjectRoleDescriptor*> objectsList;
5112 cat.getCurrentDB().dbId,
5118 for (
auto object : objectsList) {
5119 if (user_meta.
userName == object->roleName) {
5123 TDashboardGrantees grantee;
5124 TDashboardPermissions perm;
5125 grantee.name =
object->roleName;
5126 grantee.is_user =
object->roleType;
5131 grantee.permissions = perm;
5132 dashboard_grantees.push_back(grantee);
5137 const TSessionId& session_id_or_json,
5138 const std::string& view_state,
5139 const std::string& view_metadata) {
5144 auto session_ptr = stdlog.getConstSessionInfo();
5146 auto&
cat = session_ptr->getCatalog();
5149 ld.
userId = session_ptr->get_currentUser().userId;
5154 _return =
cat.createLink(ld, 6);
5155 }
catch (
const std::exception& e) {
5161 const std::string&
name,
5162 const bool is_array) {
5165 ct.col_type.type =
type;
5166 ct.col_type.is_array = is_array;
5172 const std::list<std::string> shp_ext{
".shp",
".shx",
".dbf"};
5173 if (std::find(shp_ext.begin(),
5175 boost::algorithm::to_lower_copy(file_path.extension().string())) !=
5177 for (
auto ext : shp_ext) {
5178 auto aux_file = file_path;
5180 aux_file.replace_extension(boost::algorithm::to_upper_copy(ext)).string(),
5183 aux_file.replace_extension(ext).string(), copy_params)) {
5184 throw std::runtime_error(
"required file for shapefile does not exist: " +
5185 aux_file.filename().string());
5192 const std::string& table_name,
5193 const TRowDescriptor& rd,
5194 const TCreateParams& create_params) {
5197 auto stdlog =
STDLOG(
"table_name", table_name);
5209 std::string stmt{
"CREATE TABLE " + table_name};
5210 std::vector<std::string> col_stmts;
5212 for (
auto col : rds) {
5218 if (col.col_type.type == TDatumType::INTERVAL_DAY_TIME ||
5219 col.col_type.type == TDatumType::INTERVAL_YEAR_MONTH) {
5221 " for column: " + col.col_name);
5224 if (col.col_type.type == TDatumType::DECIMAL) {
5226 if (col.col_type.precision == 0 && col.col_type.scale == 0) {
5227 col.col_type.precision = 14;
5228 col.col_type.scale = 7;
5232 std::string col_stmt;
5233 col_stmt.append(col.col_name +
" " +
thrift_to_name(col.col_type));
5234 if (col.__isset.default_value) {
5235 col_stmt.append(
" DEFAULT " + col.default_value);
5248 col_stmt.append(
"(" +
std::to_string(col.col_type.comp_param) +
")");
5250 }
else if (col.col_type.type == TDatumType::STR) {
5252 col_stmt.append(
" ENCODING NONE");
5253 }
else if (col.col_type.type == TDatumType::POINT ||
5254 col.col_type.type == TDatumType::MULTIPOINT ||
5255 col.col_type.type == TDatumType::LINESTRING ||
5256 col.col_type.type == TDatumType::MULTILINESTRING ||
5257 col.col_type.type == TDatumType::POLYGON ||
5258 col.col_type.type == TDatumType::MULTIPOLYGON) {
5260 if (col.col_type.scale == 4326) {
5261 col_stmt.append(
" ENCODING NONE");
5264 col_stmts.push_back(col_stmt);
5269 if (create_params.is_replicated) {
5270 stmt.append(
" WITH (PARTITIONS = 'REPLICATED')");
5281 const std::string& table_name,
5282 const std::string& file_name_in,
5283 const TCopyParams& cp) {
5290 auto session_ptr = stdlog.getConstSessionInfo();
5292 LOG(
INFO) <<
"import_table " << table_name <<
" from " << file_name_in;
5295 auto&
cat = session_ptr->getCatalog();
5299 executor->enrollQuerySession(request_info.
sessionId(),
5303 QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5309 executor->clearQuerySessionStatus(request_info.sessionId(),
start_time);
5312 const auto td_with_lock =
5315 const auto td = td_with_lock();
5319 std::string copy_from_source;
5324 std::string file_name{file_name_in};
5325 auto file_path = boost::filesystem::path(file_name);
5326 if (!boost::istarts_with(file_name,
"s3://")) {
5327 if (!boost::filesystem::path(file_name).is_absolute()) {
5329 picosha2::hash256_hex_string(request_info.sessionId()) /
5330 boost::filesystem::path(file_name).filename();
5331 file_name = file_path.string();
5335 "\" does not exist.");
5343 if (boost::filesystem::path(file_path).extension() ==
".tsv") {
5347 copy_from_source = file_path.string();
5349 auto const load_tag =
get_import_tag(
"import_table", table_name, copy_from_source);
5351 ScopeGuard cleanup = [&load_tag, &session_ptr]() {
5355 session_ptr->getCatalog(), table_name);
5356 std::unique_ptr<import_export::AbstractImporter> importer;
5359 LOG(
INFO) <<
"Total Import Time: " << (double)ms / 1000.0 <<
" Seconds.";
5360 }
catch (
const TDBException& e) {
5362 }
catch (
const std::exception& e) {
5374 return (t == TDatumType::POLYGON || t == TDatumType::MULTIPOLYGON ||
5375 t == TDatumType::LINESTRING || t == TDatumType::MULTILINESTRING ||
5376 t == TDatumType::POINT || t == TDatumType::MULTIPOINT);
5380 std::stringstream ss;
5386 const std::string& file_path,
5387 const std::string& column_name,
5388 const std::string& attr,
5389 const std::string& got,
5390 const std::string& expected) {
5391 return "Issue encountered in geo/raster file '" + file_path +
5392 "' while appending to table '" + table_name +
"'. Column '" + column_name +
5393 "' " + attr +
" mismatch (got '" + got +
"', expected '" + expected +
"')";
5398 #define THROW_COLUMN_ATTR_MISMATCH_EXCEPTION(attr, got, expected) \
5399 THROW_DB_EXCEPTION("Could not append geo/raster file '" + \
5400 file_path.filename().string() + "' to table '" + table_name + \
5401 "'. Column '" + cd->columnName + "' " + attr + " mismatch (got '" + \
5402 got + "', expected '" + expected + "')");
5405 const std::string& table_name,
5406 const std::string& file_name,
5407 const TCopyParams& cp,
5408 const TRowDescriptor& row_desc,
5409 const TCreateParams& create_params) {
5424 const std::string& table_name,
5425 const std::string& file_name,
5427 const TRowDescriptor& row_desc,
5428 const TCreateParams& create_params) {
5432 std::vector<std::string> file_names;
5441 file_names.push_back(file_name);
5444 for (
auto const& file_name : file_names) {
5446 session_id, table_name, file_name, copy_params, row_desc, create_params);
5451 const std::string& table_name,
5452 const std::string& file_name_in,
5454 const TRowDescriptor& row_desc,
5455 const TCreateParams& create_params) {
5458 auto session_ptr = stdlog.getConstSessionInfo();
5461 auto&
cat = session_ptr->getCatalog();
5465 executor->enrollQuerySession(session_id,
5469 QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5475 executor->clearQuerySessionStatus(session_id,
start_time);
5479 std::string file_name{file_name_in};
5483 auto file_path =
import_path_ / picosha2::hash256_hex_string(session_id) /
5484 boost::filesystem::path(file_name).filename();
5485 file_name = file_path.string();
5489 bool is_raster =
false;
5501 if (geo_file.size()) {
5502 file_name = file_name + std::string(
"/") + geo_file;
5515 THROW_DB_EXCEPTION(
"import_geo_table called with file_type other than GEO or RASTER");
5519 VLOG(1) <<
"import_geo_table: Original filename: " << file_name_in;
5520 VLOG(1) <<
"import_geo_table: Actual filename: " << file_name;
5521 VLOG(1) <<
"import_geo_table: Raster: " << is_raster;
5522 auto const load_tag =
get_import_tag(
"import_geo_table", table_name, file_name);
5524 ScopeGuard cleanup = [&load_tag, &session_ptr]() {
5528 auto file_path = boost::filesystem::path(file_name);
5536 }
catch (
const std::exception& e) {
5546 std::vector<import_export::Importer::GeoFileLayerInfo> layer_info;
5551 }
catch (
const std::exception& e) {
5557 using LayerNameToContentsMap =
5558 std::map<std::string, import_export::Importer::GeoFileLayerContents>;
5559 LayerNameToContentsMap load_layers;
5561 <<
"import_geo_table: Found the following layers in the geo file:";
5562 for (
const auto& layer : layer_info) {
5563 switch (layer.contents) {
5565 LOG(
INFO) <<
"import_geo_table: '" << layer.name
5566 <<
"' (will import as geo table)";
5567 load_layers[layer.name] = layer.contents;
5570 LOG(
INFO) <<
"import_geo_table: '" << layer.name
5571 <<
"' (will import as regular table)";
5572 load_layers[layer.name] = layer.contents;
5575 LOG(
WARNING) <<
"import_geo_table: '" << layer.name
5576 <<
"' (will not import, unsupported geo type)";
5579 LOG(
INFO) <<
"import_geo_table: '" << layer.name <<
"' (ignoring, empty)";
5587 if (!is_raster && load_layers.size() == 0) {
5595 for (
const auto& layer : layer_info) {
5600 load_layers.clear();
5601 load_layers[layer.name] = layer.contents;
5604 }
else if (layer.contents ==
5608 }
else if (layer.contents ==
5623 if (!is_raster && row_desc.size() > 0 && load_layers.size() > 1) {
5625 "import_geo_table: Multi-layer geo import not yet supported from Immerse!");
5630 auto construct_layer_table_name = [&load_layers](
const std::string& table_name,
5631 const std::string& layer_name) {
5632 if (load_layers.size() > 1) {
5634 if (sanitized_layer_name != layer_name) {
5635 LOG(
INFO) <<
"import_geo_table: Using sanitized layer name '"
5636 << sanitized_layer_name <<
"' for table name";
5638 return table_name +
"_" + sanitized_layer_name;
5644 if (!is_raster && load_layers.size() > 1) {
5645 for (
const auto& layer : load_layers) {
5647 auto this_table_name = construct_layer_table_name(table_name, layer.first);
5650 if (
cat.getMetadataForTable(this_table_name)) {
5652 "' already exists, aborting!");
5659 std::vector<std::string> caught_exception_messages;
5662 double total_import_ms = 0.0;
5673 for (
const auto& layer : load_layers) {
5675 const auto& layer_name = layer.first;
5676 const auto& layer_contents = layer.second;
5681 auto this_table_name = construct_layer_table_name(table_name, layer_name);
5684 LOG(
INFO) <<
"import_geo_table: Creating table: " << this_table_name;
5688 if (row_desc.size() > 0) {
5700 cp_copy.geo_layer_name = layer_name;
5703 }
catch (
const std::exception& e) {
5705 caught_exception_messages.emplace_back(
"Column Type Detection failed for '" +
5706 layer_name +
"':" + e.what());
5709 rd = cds.row_set.row_desc;
5715 create_table(session_id, this_table_name, rd, create_params);
5716 }
catch (
const std::exception& e) {
5718 caught_exception_messages.emplace_back(
"Failed to create table for Layer '" +
5719 layer_name +
"':" + e.what());
5729 std::unique_ptr<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>> td_with_lock;
5730 std::unique_ptr<lockmgr::WriteLock> insert_data_lock;
5734 std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
5737 td = (*td_with_lock)();
5738 insert_data_lock = std::make_unique<lockmgr::WriteLock>(
5740 }
catch (
const std::runtime_error& e) {
5742 std::string exception_message =
"Could not import geo/raster file '" +
5743 file_path.filename().string() +
"' to table '" +
5745 "'; table does not exist or failed to create.";
5746 caught_exception_messages.emplace_back(exception_message);
5753 const auto col_descriptors =
5754 cat.getAllColumnMetadataForTable(td->tableId,
false,
false,
false);
5757 if (col_descriptors.size() != rd.size()) {
5759 std::string exception_message =
"Could not append geo/raster file '" +
5760 file_path.filename().string() +
"' to table '" +
5761 this_table_name +
"'. Column count mismatch (got " +
5764 caught_exception_messages.emplace_back(exception_message);
5772 for (
auto const* cd : col_descriptors) {
5777 auto const gtype = rd[rd_index].col_type.type;
5778 auto const etype = cd_col_type.col_type.type;
5780 if (gtype != etype) {
5792 auto const gname = rd[rd_index].col_name;
5793 auto const ename = cd->columnName;
5794 if (gname != ename) {
5796 LOG(
INFO) <<
"import_geo_table: Renaming incoming geo column to match "
5797 "existing table column name '"
5799 rd[rd_index].col_name = ename;
5804 file_path.filename().string(),
5816 }
catch (
const std::exception& e) {
5818 caught_exception_messages.emplace_back(e.what());
5822 std::map<std::string, std::string> colname_to_src;
5824 colname_to_src[r.col_name] =
5830 }
catch (
const std::exception& e) {
5832 caught_exception_messages.emplace_back(e.what());
5836 if (!is_raster && is_geo_layer) {
5846 int num_geo_columns{0};
5847 for (
auto const& col : rd) {
5852 if (num_geo_columns != 1) {
5853 std::string exception_message =
5854 "Table '" + this_table_name +
5855 "' must have exactly one geo column. Import aborted!";
5856 caught_exception_messages.emplace_back(exception_message);
5861 std::string layer_or_raster = is_raster ?
"Raster" :
"Layer";
5869 std::unique_ptr<import_export::Importer> importer;
5875 [&]() { importer->importGDAL(colname_to_src, session_ptr.get(), is_raster); });
5876 LOG(
INFO) <<
"Import of " << layer_or_raster <<
" '" << layer_name <<
"' took "
5877 << (double)ms / 1000.0 <<
"s";
5878 total_import_ms += ms;
5879 }
catch (
const std::exception& e) {
5880 std::string exception_message =
"Import of " + layer_or_raster +
" '" +
5881 this_table_name +
"' failed: " + e.what();
5882 caught_exception_messages.emplace_back(exception_message);
5888 if (caught_exception_messages.size()) {
5890 std::string combined_exception_message =
"Failed to import geo/raster file: ";
5892 for (
const auto& message : caught_exception_messages) {
5893 combined_exception_message += comma ? (
", " + message) : message;
5899 LOG(
INFO) <<
"Import Successful!";
5900 LOG(
INFO) <<
"Total Import Time: " << total_import_ms / 1000.0 <<
"s";
5904 #undef THROW_COLUMN_ATTR_MISMATCH_EXCEPTION
5907 const TSessionId& session_id_or_json,
5908 const std::string& import_id) {
5915 _return.elapsed = is.elapsed.count();
5916 _return.rows_completed = is.rows_completed;
5917 _return.rows_estimated = is.rows_estimated;
5918 _return.rows_rejected = is.rows_rejected;
5922 const TSessionId& session_id_or_json,
5923 const std::string& archive_path_in,
5924 const TCopyParams& copy_params) {
5928 "get_first_geo_file_in_archive",
5932 std::string archive_path(archive_path_in);
5937 picosha2::hash256_hex_string(request_info.
sessionId()) /
5938 boost::filesystem::path(archive_path).filename();
5939 archive_path = file_path.string();
5952 std::string geo_file =
5955 if (geo_file.size()) {
5957 _return = archive_path_in + std::string(
"/") + geo_file;
5960 _return = archive_path_in;
5964 _return = archive_path_in;
5969 const TSessionId& session_id_or_json,
5970 const std::string& archive_path_in,
5971 const TCopyParams& copy_params) {
5975 "get_all_files_in_archive",
5979 std::string archive_path(archive_path_in);
5983 picosha2::hash256_hex_string(request_info.
sessionId()) /
5984 boost::filesystem::path(archive_path).filename();
5985 archive_path = file_path.string();
6001 for (
auto& s : _return) {
6002 s = archive_path_in +
'/' + s;
6008 const TSessionId& session_id_or_json,
6009 const std::string& file_name_in,
6010 const TCopyParams& cp) {
6017 std::string file_name(file_name_in);
6025 picosha2::hash256_hex_string(request_info.
sessionId()) /
6026 boost::filesystem::path(file_name).filename();
6027 file_name = file_path.string();
6042 if (geo_file.size()) {
6043 file_name = file_name + std::string(
"/") + geo_file;
6057 auto internal_layer_info =
6061 for (
const auto& internal_layer : internal_layer_info) {
6062 TGeoFileLayerInfo layer;
6063 layer.name = internal_layer.name;
6064 switch (internal_layer.contents) {
6066 layer.contents = TGeoFileLayerContents::EMPTY;
6069 layer.contents = TGeoFileLayerContents::GEO;
6072 layer.contents = TGeoFileLayerContents::NON_GEO;
6075 layer.contents = TGeoFileLayerContents::UNSUPPORTED_GEO;
6080 _return.emplace_back(layer);
6088 #ifdef HAVE_PROFILER
6089 if (IsHeapProfilerRunning()) {
6092 HeapProfilerStart(
"omnisci");
6095 #endif // HAVE_PROFILER
6102 #ifdef HAVE_PROFILER
6103 if (!IsHeapProfilerRunning()) {
6109 #endif // HAVE_PROFILER
6113 TSessionId
const& session_id)
const {
6120 const TSessionId& session_id_or_json) {
6124 #ifdef HAVE_PROFILER
6125 if (!IsHeapProfilerRunning()) {
6128 auto profile_buff = GetHeapProfile();
6129 profile = profile_buff;
6133 #endif // HAVE_PROFILER
6142 throw std::runtime_error(
"No session with id " + session_id);
6148 const TSessionId& session_id) {
6157 if (session_id.empty()) {
6171 const std::string& table_name) {
6177 std::vector<DBObject> privObjects;
6178 privObjects.push_back(dbObject);
6181 user_metadata.userLoggable() +
6182 " has no insert privileges for table " + table_name +
".");
6190 case TExecuteMode::GPU:
6193 e.error_msg =
"Cannot switch to GPU mode in a server started in CPU-only mode.";
6197 LOG(
INFO) <<
"User " << user_name <<
" sets GPU mode.";
6199 case TExecuteMode::CPU:
6201 LOG(
INFO) <<
"User " << user_name <<
" sets CPU mode.";
6209 const std::string& query_ra,
6210 const bool column_format,
6212 const int32_t first_n,
6213 const int32_t at_most_n,
6214 const bool just_validate,
6215 const bool find_push_down_candidates,
6217 const std::optional<size_t> executor_index)
const {
6227 executor.get(), query_ra, query_state_proxy->shared_from_this());
6238 auto validate_or_explain_query =
6251 find_push_down_candidates,
6259 auto execution_time_ms =
6261 _return = ra_executor.executeRelAlgQuery(
6265 const auto rs = _return.
getRows();
6267 execution_time_ms -= rs->getQueueTime();
6271 if (!filter_push_down_info.empty()) {
6272 return filter_push_down_info;
6283 const std::vector<std::shared_ptr<Analyzer::TargetEntry>>& targets)
const {
6284 std::vector<TargetMetaInfo>
result;
6285 for (
const auto& target : targets) {
6287 CHECK(target->get_expr());
6288 result.emplace_back(target->get_resname(), target->get_expr()->get_type_info());
6294 const std::vector<std::shared_ptr<Analyzer::TargetEntry>>& targets)
const {
6295 std::vector<std::string> names;
6296 for (
const auto& target : targets) {
6298 CHECK(target->get_expr());
6299 names.push_back(target->get_resname());
6305 const std::vector<TargetMetaInfo>& targets)
const {
6306 std::vector<std::string> names;
6307 for (
const auto& target : targets) {
6308 names.push_back(target.get_resname());
6315 const std::vector<TargetMetaInfo>& targets,
6316 const ResultSet& results,
6317 const bool column_format,
6318 const int32_t first_n,
6319 const int32_t at_most_n) {
6323 if (column_format) {
6324 _return.row_set.is_columnar =
true;
6325 std::vector<TColumn> tcolumns(results.colCount());
6326 while (first_n == -1 || fetched < first_n) {
6327 const auto crt_row = results.getNextRow(
true,
true);
6328 if (crt_row.empty()) {
6332 if (at_most_n >= 0 && fetched > at_most_n) {
6336 for (
size_t i = 0; i < results.colCount(); ++i) {
6337 const auto agg_result = crt_row[i];
6341 for (
size_t i = 0; i < results.colCount(); ++i) {
6342 _return.row_set.columns.push_back(tcolumns[i]);
6345 _return.row_set.is_columnar =
false;
6346 while (first_n == -1 || fetched < first_n) {
6347 const auto crt_row = results.getNextRow(
true,
true);
6348 if (crt_row.empty()) {
6352 if (at_most_n >= 0 && fetched > at_most_n) {
6357 trow.cols.reserve(results.colCount());
6358 for (
size_t i = 0; i < results.colCount(); ++i) {
6359 const auto agg_result = crt_row[i];
6360 trow.cols.push_back(
value_to_thrift(agg_result, targets[i].get_type_info()));
6362 _return.row_set.rows.push_back(trow);
6369 const ResultSet& results,
6370 const bool column_format,
6371 const std::string
label) {
6372 CHECK_EQ(
size_t(1), results.rowCount());
6373 TColumnType proj_info;
6374 proj_info.col_name =
label;
6375 proj_info.col_type.type = TDatumType::STR;
6376 proj_info.col_type.nullable =
false;
6377 proj_info.col_type.is_array =
false;
6378 _return.row_set.row_desc.push_back(proj_info);
6379 const auto crt_row = results.getNextRow(
true,
true);
6380 const auto tv = crt_row[0];
6381 CHECK(results.getNextRow(
true,
true).empty());
6382 const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
6384 const auto s_n = boost::get<NullableString>(scalar_tv);
6386 const auto s = boost::get<std::string>(s_n);
6388 if (column_format) {
6390 tcol.data.str_col.push_back(*s);
6391 tcol.nulls.push_back(
false);
6392 _return.row_set.is_columnar =
true;
6393 _return.row_set.columns.push_back(tcol);
6396 explanation.val.str_val = *s;
6397 explanation.is_null =
false;
6399 trow.cols.push_back(explanation);
6400 _return.row_set.is_columnar =
false;
6401 _return.row_set.rows.push_back(trow);
6406 const ResultSet& results,
6407 const bool column_format) {
6412 const ResultSet& results,
6413 const bool column_format) {
6423 std::vector<DBObject> privObjects;
6427 privObjects.push_back(dbObject);
6434 if (
const auto drop_db_stmt = dynamic_cast<Parser::DropDBStmt*>(ddl)) {
6436 }
else if (
const auto rename_db_stmt = dynamic_cast<Parser::RenameDBStmt*>(ddl)) {
6437 sessions_store_->eraseByDB(*rename_db_stmt->getPreviousDatabaseName());
6438 }
else if (
const auto drop_user_stmt = dynamic_cast<Parser::DropUserStmt*>(ddl)) {
6440 }
else if (
const auto rename_user_stmt = dynamic_cast<Parser::RenameUserStmt*>(ddl)) {
6447 const bool column_format,
6449 const int32_t first_n,
6450 const int32_t at_most_n,
6451 const bool use_calcite,
6459 auto&
cat = session_ptr->getCatalog();
6465 std::ostringstream oss;
6466 oss << query_substr << post_fix;
6467 auto const reduced_query_str = oss.str();
6468 bool show_cpu_memory_stat_after_finishing_query =
false;
6469 ScopeGuard cpu_system_memory_logging = [&show_cpu_memory_stat_after_finishing_query,
6471 &reduced_query_str]() {
6472 if (show_cpu_memory_stat_after_finishing_query) {
6476 auto log_cpu_memory_status =
6477 [&reduced_query_str, &
cat, &show_cpu_memory_stat_after_finishing_query]() {
6479 show_cpu_memory_stat_after_finishing_query =
true;
6495 std::string query_ra;
6498 std::tie(result, locks) =
6500 query_ra = result.plan_result;
6502 rapidjson::Document ddl_query;
6503 ddl_query.Parse(query_ra);
6504 CHECK(ddl_query.HasMember(
"payload"));
6505 CHECK(ddl_query[
"payload"].IsObject());
6507 log_cpu_memory_status();
6512 }
else if (pw.is_ctas) {
6516 std::string query_ra;
6519 std::tie(result, locks) =
6521 query_ra = result.plan_result;
6523 if (query_ra.size()) {
6524 rapidjson::Document ddl_query;
6525 ddl_query.Parse(query_ra);
6526 CHECK(ddl_query.HasMember(
"payload"));
6527 CHECK(ddl_query[
"payload"].IsObject());
6529 log_cpu_memory_status();
6537 std::string query_ra;
6540 std::tie(result, locks) =
6542 query_ra = result.plan_result;
6544 rapidjson::Document ddl_query;
6545 ddl_query.Parse(query_ra);
6546 CHECK(ddl_query.HasMember(
"payload"));
6547 CHECK(ddl_query[
"payload"].IsObject());
6549 if (stmt.get_value_lists().size() > 1) {
6550 log_cpu_memory_status();
6556 }
else if (pw.is_validate) {
6558 if (!session_ptr->get_currentUser().isSuper) {
6559 throw std::runtime_error(
"Superuser is required to run VALIDATE");
6562 std::string query_ra;
6565 std::tie(result, locks) =
6567 query_ra = result.plan_result;
6569 rapidjson::Document ddl_query;
6570 ddl_query.Parse(query_ra);
6571 CHECK(ddl_query.HasMember(
"payload"));
6572 CHECK(ddl_query[
"payload"].IsObject());
6578 std::string output{
"Result for validate"};
6592 }
else if (pw.is_copy && !pw.is_copy_to) {
6593 std::unique_ptr<Parser::Stmt> stmt =
6599 throw std::runtime_error(
6600 "Cannot import on an individual leaf. Please import from the Aggregator.");
6605 log_cpu_memory_status();
6607 [&]() { import_stmt->execute(*session_ptr,
read_only_); }));
6613 import_stmt->get_success());
6616 if (import_stmt->was_deferred_copy_from()) {
6618 import_stmt->get_deferred_copy_from_payload(deferred_copy_from_state.
table,
6623 deferred_copy_from_state);
6634 }
else if (pw.is_ddl) {
6635 std::string query_ra;
6638 std::tie(result, locks) =
6640 query_ra = result.plan_result;
6645 }
else if (pw.is_other_explain) {
6647 throw std::runtime_error(
"EXPLAIN not yet supported for DDL or DML commands.");
6663 std::string query_ra = query_str;
6667 std::tie(result, locks) =
6669 query_ra = result.plan_result;
6672 std::string query_ra_calcite_explain;
6680 CHECK(!locks.empty());
6681 query_ra_calcite_explain =
6686 std::vector<PushedDownFilterInfo> filter_push_down_requests;
6688 auto query_session = session_ptr ? session_ptr->get_session_id() :
"";
6689 auto execute_rel_alg_task = std::make_shared<QueryDispatchQueue::Task>(
6691 &filter_push_down_requests,
6695 &query_ra_calcite_explain,
6700 executor_device_type,
6703 parent_thread_local_ids =
6713 explain.isCalciteExplain() ? query_ra_calcite_explain : query_ra,
6715 executor_device_type,
6722 if (explain.isCalciteExplain()) {
6723 if (filter_push_down_requests.empty()) {
6728 CHECK(!locks.empty());
6729 std::vector<TFilterPushDownInfo> filter_push_down_info;
6730 for (
const auto& req : filter_push_down_requests) {
6731 TFilterPushDownInfo filter_push_down_info_for_request;
6732 filter_push_down_info_for_request.input_prev = req.input_prev;
6733 filter_push_down_info_for_request.input_start = req.input_start;
6734 filter_push_down_info_for_request.input_next = req.input_next;
6735 filter_push_down_info.push_back(filter_push_down_info_for_request);
6739 filter_push_down_info,
6746 if (!filter_push_down_requests.empty()) {
6747 CHECK(!locks.empty());
6752 executor_device_type,
6757 filter_push_down_requests);
6764 !explain.isSelectExplain()) {
6765 executor->enrollQuerySession(query_session,
6769 QuerySessionStatus::QueryStatus::PENDING_QUEUE);
6772 executor->checkPendingQueryStatus(query_session);
6774 executor->clearQuerySessionStatus(query_session, submitted_time_str);
6776 throw std::runtime_error(
6777 "Query execution has been interrupted (pending query).");
6781 std::this_thread::sleep_for(std::chrono::milliseconds(10));
6784 log_cpu_memory_status();
6788 auto result_future = execute_rel_alg_task->get_future();
6789 result_future.get();
6797 std::string& query_ra,
6798 const bool column_format,
6800 const int32_t first_n,
6801 const int32_t at_most_n,
6802 const bool just_explain,
6803 const bool is_calcite_explain,
6804 const std::vector<PushedDownFilterInfo>& filter_push_down_requests) {
6806 std::vector<TFilterPushDownInfo> filter_push_down_info;
6807 for (
const auto& req : filter_push_down_requests) {
6808 TFilterPushDownInfo filter_push_down_info_for_request;
6809 filter_push_down_info_for_request.input_prev = req.input_prev;
6810 filter_push_down_info_for_request.input_start = req.input_start;
6811 filter_push_down_info_for_request.input_next = req.input_next;
6812 filter_push_down_info.push_back(filter_push_down_info_for_request);
6818 filter_push_down_info,
6830 executor_device_type,
6848 auto table_schema_lock =
6850 auto table_data_lock =
6856 throw std::runtime_error(
6857 "Query cannot be executed because use of system tables is currently "
6865 const std::vector<std::vector<std::string>>& selected_tables) {
6866 const auto info_schema_catalog =
6868 if (info_schema_catalog) {
6869 for (
const auto& table : selected_tables) {
6871 auto td = info_schema_catalog->getMetadataForTable(table[0],
false);
6882 const std::shared_ptr<Catalog_Namespace::Catalog>&
cat,
6883 const std::string& query_str,
6884 const std::vector<TFilterPushDownInfo>& filter_push_down_info,
6886 const bool check_privileges) {
6895 auto query_parsing_option =
6900 auto optimization_option =
calcite_->getCalciteOptimizationOption(
6903 filter_push_down_info,
6908 query_parsing_option,
6909 optimization_option,
6910 request_info.
json());
6915 const std::string& query_str,
6916 const std::vector<TFilterPushDownInfo>& filter_push_down_info,
6917 const bool acquire_locks,
6919 bool check_privileges) {
6924 if (pw.is_ddl || (!pw.is_validate && !pw.is_other_explain)) {
6930 std::shared_lock<heavyai::DistributedSharedMutex> cat_lock;
6932 cat_lock = std::shared_lock<heavyai::DistributedSharedMutex>(*
cat->dcatalogMutex_);
6937 filter_push_down_info,
6941 result.resolved_accessed_objects.tables_selected_from);
6943 if (acquire_locks) {
6944 std::set<std::vector<std::string>> write_only_tables;
6945 std::vector<std::vector<std::string>>
tables;
6947 tables.insert(tables.end(),
6948 result.resolved_accessed_objects.tables_updated_in.begin(),
6949 result.resolved_accessed_objects.tables_updated_in.end());
6950 tables.insert(tables.end(),
6951 result.resolved_accessed_objects.tables_deleted_from.begin(),
6952 result.resolved_accessed_objects.tables_deleted_from.end());
6955 for (
const auto& table : tables) {
6956 write_only_tables.insert(table);
6959 tables.insert(tables.end(),
6960 result.resolved_accessed_objects.tables_selected_from.begin(),
6961 result.resolved_accessed_objects.tables_selected_from.end());
6962 tables.insert(tables.end(),
6963 result.resolved_accessed_objects.tables_inserted_into.begin(),
6964 result.resolved_accessed_objects.tables_inserted_into.end());
6973 [](
const std::vector<std::string>&
a,
const std::vector<std::string>& b) {
6977 return cat_a->getDatabaseId() < cat_b->getDatabaseId();
6980 return cat->getMetadataForTable(a[0],
false)->tableId <
6981 cat->getMetadataForTable(b[0],
false)->tableId;
6986 tables.erase(unique(tables.begin(), tables.end()), tables.end());
6987 for (
const auto& table : tables) {
6994 if (write_only_tables.count(table)) {
7000 cat->getDatabaseId(), (*locks.back())())));
7002 auto lock_td = (*locks.back())();
7003 if (lock_td->is_in_memory_system_table) {
7007 cat->getDatabaseId(), lock_td)));
7012 cat->getDatabaseId(), lock_td)));
7018 return std::make_pair(result, std::move(locks));
7022 const std::string& select_query) {
7032 }
catch (std::exception& e) {
7038 const TSessionId& session_id_or_json,
7039 const int32_t table_id) {
7048 }
catch (std::exception& e) {
7054 const TSessionId& leaf_session_id_or_json,
7055 const TSessionId& parent_session_id_or_json,
7056 const std::string& serialized_rel_alg_dag,
7057 const std::string& start_time_str,
7058 const bool just_explain,
7059 const std::vector<int64_t>& outer_fragment_indices) {
7064 auto session_ptr = stdlog.getConstSessionInfo();
7068 LOG(
INFO) <<
"start_query :" << *session_ptr <<
" :" << just_explain;
7074 serialized_rel_alg_dag,
7077 outer_fragment_indices);
7078 }
catch (std::exception& e) {
7082 LOG(
INFO) <<
"start_query-COMPLETED " << time_ms <<
"ms "
7083 <<
"id is " << _return.id;
7087 const TPendingQuery& pending_query,
7088 const TSubqueryId subquery_id,
7089 const std::string& start_time_str) {
7094 LOG(
INFO) <<
"execute_query_step : id:" << pending_query.id;
7098 _return, pending_query, subquery_id, start_time_str);
7099 }
catch (std::exception& e) {
7103 LOG(
INFO) <<
"execute_query_step-COMPLETED " << time_ms <<
"ms";
7107 const TRowDescriptor& row_desc,
7108 const TQueryId query_id,
7109 const TSubqueryId subquery_id,
7110 const bool is_final_subquery_result) {
7114 LOG(
INFO) <<
"BROADCAST-SERIALIZED-ROWS id:" << query_id;
7118 serialized_rows, row_desc, query_id, subquery_id, is_final_subquery_result);
7119 }
catch (std::exception& e) {
7123 LOG(
INFO) <<
"BROADCAST-SERIALIZED-ROWS COMPLETED " << time_ms <<
"ms";
7127 const TInsertChunks& thrift_insert_chunks) {
7132 auto session_ptr = stdlog.getConstSessionInfo();
7133 auto const&
cat = session_ptr->getCatalog();
7135 thrift_insert_chunks.db_id};
7136 insert_chunks.valid_row_indices.resize(thrift_insert_chunks.valid_indices.size());
7137 std::copy(thrift_insert_chunks.valid_indices.begin(),
7138 thrift_insert_chunks.valid_indices.end(),
7142 cat.getAllColumnMetadataForTable(
insert_chunks.table_id,
false,
false,
true);
7143 CHECK_EQ(columns.size(), thrift_insert_chunks.data.size());
7145 std::list<foreign_storage::PassThroughBuffer> pass_through_buffers;
7146 auto thrift_data_it = thrift_insert_chunks.data.begin();
7147 for (
const auto col_desc : columns) {
7150 data_buffer = &pass_through_buffers.emplace_back(
7151 reinterpret_cast<const int8_t*>(thrift_data_it->data_buffer.data()),
7152 thrift_data_it->data_buffer.size());
7155 if (col_desc->columnType.is_varlen_indeed()) {
7156 CHECK(thrift_insert_chunks.num_rows == 0 ||
7157 thrift_data_it->index_buffer.size() > 0);
7158 index_buffer = &pass_through_buffers.emplace_back(
7159 reinterpret_cast<const int8_t*>(thrift_data_it->index_buffer.data()),
7160 thrift_data_it->index_buffer.size());
7168 const ChunkKey lock_chunk_key{
cat.getDatabaseId(),
7170 auto table_read_lock =
7176 auto insert_data_lock =
7180 }
catch (
const std::exception& e) {
7186 const TInsertData& thrift_insert_data) {
7191 auto session_ptr = stdlog.getConstSessionInfo();
7192 CHECK_EQ(thrift_insert_data.column_ids.size(), thrift_insert_data.data.size());
7193 CHECK(thrift_insert_data.is_default.size() == 0 ||
7194 thrift_insert_data.is_default.size() == thrift_insert_data.column_ids.size());
7195 auto const&
cat = session_ptr->getCatalog();
7197 insert_data.
databaseId = thrift_insert_data.db_id;
7198 insert_data.
tableId = thrift_insert_data.table_id;
7199 insert_data.
columnIds = thrift_insert_data.column_ids;
7200 insert_data.
is_default = thrift_insert_data.is_default;
7201 insert_data.
numRows = thrift_insert_data.num_rows;
7202 std::vector<std::unique_ptr<std::vector<std::string>>> none_encoded_string_columns;
7203 std::vector<std::unique_ptr<std::vector<ArrayDatum>>> array_columns;
7206 for (
size_t col_idx = 0; col_idx < insert_data.
columnIds.size(); ++col_idx) {
7207 const int column_id = insert_data.
columnIds[col_idx];
7209 const auto cd = cat.getMetadataForColumn(insert_data.
tableId, column_id);
7211 const auto& ti = cd->columnType;
7212 size_t rows_expected =
7216 if (ti.is_number() || ti.is_time() || ti.is_boolean()) {
7217 p.
numbersPtr = (int8_t*)thrift_insert_data.data[col_idx].fixed_len_data.data();
7218 }
else if (ti.is_string()) {
7220 p.
numbersPtr = (int8_t*)thrift_insert_data.data[col_idx].fixed_len_data.data();
7223 none_encoded_string_columns.emplace_back(
new std::vector<std::string>());
7224 auto& none_encoded_strings = none_encoded_string_columns.back();
7226 CHECK_EQ(rows_expected, thrift_insert_data.data[col_idx].var_len_data.size());
7227 for (
const auto& varlen_str : thrift_insert_data.data[col_idx].var_len_data) {
7228 none_encoded_strings->push_back(varlen_str.payload);
7232 }
else if (ti.is_geometry()) {
7233 none_encoded_string_columns.emplace_back(
new std::vector<std::string>());
7234 auto& none_encoded_strings = none_encoded_string_columns.back();
7235 CHECK_EQ(rows_expected, thrift_insert_data.data[col_idx].var_len_data.size());
7236 for (
const auto& varlen_str : thrift_insert_data.data[col_idx].var_len_data) {
7237 none_encoded_strings->push_back(varlen_str.payload);
7246 CHECK(ti.is_array());
7247 array_columns.emplace_back(
new std::vector<ArrayDatum>());
7248 auto& array_column = array_columns.back();
7249 CHECK_EQ(rows_expected, thrift_insert_data.data[col_idx].var_len_data.size());
7250 for (
const auto& t_arr_datum : thrift_insert_data.data[col_idx].var_len_data) {
7251 if (t_arr_datum.is_null) {
7252 if ((cd->columnName.find(
"_coords") != std::string::npos) &&
7253 geo_ti.get_type() ==
kPOINT) {
7255 array_column->push_back(
7257 }
else if (ti.get_size() > 0) {
7260 array_column->emplace_back(0,
nullptr,
true);
7264 arr_datum.length = t_arr_datum.payload.size();
7265 int8_t* ptr = (int8_t*)(t_arr_datum.payload.data());
7266 arr_datum.pointer = ptr;
7269 arr_datum.data_ptr = std::shared_ptr<int8_t>(ptr, [](
auto p) {});
7270 arr_datum.is_null =
false;
7271 array_column->push_back(arr_datum);
7276 insert_data.
data.push_back(p);
7278 const ChunkKey lock_chunk_key{cat.getDatabaseId(),
7279 cat.getLogicalTableId(insert_data.
tableId)};
7280 auto table_read_lock =
7282 const auto td = cat.getMetadataForTable(insert_data.
tableId);
7286 auto insert_data_lock =
7289 td->fragmenter->insertDataNoCheckpoint(insert_data);
7290 }
catch (
const std::exception& e) {
7296 const TSessionId& session_id_or_json,
7297 const int64_t widget_id,
7298 const int16_t node_idx,
7299 const std::string& vega_json) {
7303 auto session_ptr = stdlog.getConstSessionInfo();
7307 LOG(
INFO) <<
"start_render_query :" << *session_ptr <<
" :widget_id:" << widget_id
7308 <<
":vega_json:" << vega_json;
7313 auto& non_const_vega_json =
const_cast<std::string&
>(vega_json);
7321 std::move(non_const_vega_json));
7322 }
catch (std::exception& e) {
7326 LOG(
INFO) <<
"start_render_query-COMPLETED " << time_ms <<
"ms "
7327 <<
"id is " << _return.id;
7331 const TPendingRenderQuery& pending_render,
7332 const TRenderAggDataMap& merged_data) {
7340 LOG(
INFO) <<
"execute_next_render_step: id:" << pending_render.id;
7343 render_handler_->execute_next_render_step(_return, pending_render, merged_data);
7344 }
catch (std::exception& e) {
7348 LOG(
INFO) <<
"execute_next_render_step-COMPLETED id: " << pending_render.id
7349 <<
", time: " << time_ms <<
"ms ";
7356 auto session_ptr = stdlog.getConstSessionInfo();
7357 auto&
cat = session_ptr->getCatalog();
7358 cat.checkpoint(table_id);
7365 const int new_epoch) {
7370 auto session_ptr = stdlog.getConstSessionInfo();
7371 if (!session_ptr->get_currentUser().isSuper) {
7372 throw std::runtime_error(
"Only superuser can set_table_epoch");
7375 ChunkKey table_key{db_id, table_id};
7379 auto&
cat = session_ptr->getCatalog();
7380 cat.setTableEpoch(db_id, table_id, new_epoch);
7381 }
catch (
const std::runtime_error& e) {
7388 const std::string& table_name,
7389 const int new_epoch) {
7395 auto session_ptr = stdlog.getConstSessionInfo();
7396 if (!session_ptr->get_currentUser().isSuper) {
7397 throw std::runtime_error(
"Only superuser can set_table_epoch");
7401 auto&
cat = session_ptr->getCatalog();
7402 auto table_write_lock =
7404 auto table_data_write_lock =
7406 auto td =
cat.getMetadataForTable(
7409 int32_t db_id =
cat.getCurrentDB().dbId;
7411 cat.setTableEpoch(db_id, td->tableId, new_epoch);
7412 }
catch (
const std::runtime_error& e) {
7418 const int32_t db_id,
7419 const int32_t table_id) {
7424 auto session_ptr = stdlog.getConstSessionInfo();
7427 ChunkKey table_key{db_id, table_id};
7431 auto const&
cat = session_ptr->getCatalog();
7432 return cat.getTableEpoch(db_id, table_id);
7433 }
catch (
const std::runtime_error& e) {
7439 const std::string& table_name) {
7445 auto session_ptr = stdlog.getConstSessionInfo();
7448 auto&
cat = session_ptr->getCatalog();
7449 auto table_read_lock =
7451 auto table_data_read_lock =
7453 auto td =
cat.getMetadataForTable(
7456 int32_t db_id =
cat.getCurrentDB().dbId;
7458 return cat.getTableEpoch(db_id, td->tableId);
7459 }
catch (
const std::runtime_error& e) {
7465 const TSessionId& session_id_or_json,
7466 const int32_t db_id,
7467 const int32_t table_id) {
7472 auto session_ptr = stdlog.getConstSessionInfo();
7475 ChunkKey table_key{db_id, table_id};
7479 std::vector<Catalog_Namespace::TableEpochInfo> table_epochs;
7480 auto const&
cat = session_ptr->getCatalog();
7481 table_epochs =
cat.getTableEpochs(db_id, table_id);
7482 CHECK(!table_epochs.empty());
7484 for (
const auto& table_epoch : table_epochs) {
7485 TTableEpochInfo table_epoch_info;
7486 table_epoch_info.table_id = table_epoch.table_id;
7487 table_epoch_info.table_epoch = table_epoch.table_epoch;
7488 table_epoch_info.leaf_index = table_epoch.leaf_index;
7489 _return.emplace_back(table_epoch_info);
7494 const int32_t db_id,
7495 const std::vector<TTableEpochInfo>& table_epochs) {
7500 auto session_ptr = stdlog.getConstSessionInfo();
7505 if (!session_ptr->get_currentUser().isSuper) {
7509 if (table_epochs.empty()) {
7512 auto&
cat = session_ptr->getCatalog();
7513 auto logical_table_id =
cat.getLogicalTableId(table_epochs[0].table_id);
7514 std::vector<Catalog_Namespace::TableEpochInfo> table_epochs_vector;
7515 for (
const auto& table_epoch : table_epochs) {
7516 if (logical_table_id !=
cat.getLogicalTableId(table_epoch.table_id)) {
7519 table_epochs_vector.emplace_back(
7520 table_epoch.table_id, table_epoch.table_epoch, table_epoch.leaf_index);
7525 true,
cat.getMetadataForTable(logical_table_id,
false), db_id);
7526 ChunkKey table_key{db_id, logical_table_id};
7529 cat.setTableEpochs(db_id, table_epochs_vector);
7533 const TSessionId& session_id_or_json,
7534 const std::string& key,
7535 const std::string& nonce) {
7543 const TSessionId& session_id_or_json,
7544 const std::string& nonce) {
7548 _return.claims.emplace_back(
"");
7567 calcite_->close_calcite_server(
false);
7573 #define EXPOSE_THRIFT_MAP(TYPENAME) \
7575 std::map<int, const char*>::const_iterator it = \
7576 _##TYPENAME##_VALUES_TO_NAMES.begin(); \
7577 while (it != _##TYPENAME##_VALUES_TO_NAMES.end()) { \
7578 _return.insert(std::pair<std::string, std::string>( \
7579 #TYPENAME "." + std::string(it->second), std::to_string(it->first))); \
7585 const TSessionId& session_id_or_json) {
7591 for (
auto item :
params) {
7592 _return.insert(item);
7602 const TSessionId& session_id_or_json,
7603 const std::vector<TUserDefinedFunction>& udfs,
7604 const std::vector<TUserDefinedTableFunction>& udtfs,
7605 const std::map<std::string, std::string>& device_ir_map) {
7611 VLOG(1) <<
"register_runtime_extension_functions: # UDFs: " << udfs.size()
7612 <<
" # UDTFs: " << udtfs.size() << std::endl;
7621 auto session_ptr = stdlog.getConstSessionInfo();
7622 if (!session_ptr->get_currentUser().isSuper) {
7624 "Server is configured to require superuser privilege to register UDFs and "
7632 auto it_cpu = device_ir_map.find(std::string{
"cpu"});
7633 auto it_gpu = device_ir_map.find(std::string{
"gpu"});
7634 if (it_cpu != device_ir_map.end() || it_gpu != device_ir_map.end()) {
7635 if (it_cpu != device_ir_map.end()) {
7642 if (it_gpu != device_ir_map.end()) {
7652 VLOG(1) <<
"Registering runtime UDTFs:\n";
7656 for (
auto it = udtfs.begin(); it != udtfs.end(); it++) {
7657 VLOG(1) <<
"UDTF name=" << it->name << std::endl;
7662 static_cast<size_t>(it->sizerArgPos)},
7673 calcite_->setRuntimeExtensionFunctions(udfs, udtfs_,
true);
7676 std::string whitelist =
calcite_->getRuntimeExtensionFunctionWhitelist();
7677 VLOG(1) <<
"Registering runtime extension functions with CodeGen using whitelist:\n"
7685 const TSessionId& session) {
7686 for (
auto udf_name :
7688 if (std::find(_return.begin(), _return.end(), udf_name) == _return.end()) {
7689 _return.emplace_back(udf_name);
7695 const TSessionId& session) {
7696 for (
auto udf_name :
7698 if (std::find(_return.begin(), _return.end(), udf_name) == _return.end()) {
7699 _return.emplace_back(udf_name);
7705 const TSessionId& session,
7706 const std::vector<std::string>& udf_names) {
7707 for (
const std::string& udf_name : udf_names) {
7715 const TSessionId& session) {
7717 const std::string&
name = tf.getName(
true,
true);
7718 if (std::find(_return.begin(), _return.end(),
name) == _return.end()) {
7719 _return.emplace_back(name);
7725 const TSessionId& session) {
7728 const std::string&
name = tf.getName(
true,
true);
7729 if (std::find(_return.begin(), _return.end(),
name) == _return.end()) {
7730 _return.emplace_back(name);
7736 std::vector<TUserDefinedTableFunction>& _return,
7737 const TSessionId& session,
7738 const std::vector<std::string>& udtf_names) {
7739 for (
const std::string& udtf_name : udtf_names) {
7748 const std::string& query_state_str,
7749 TQueryResult& _return) {
7760 int32_t nRows = result.
getDataPtr()->rowCount();
7771 return std::unique_ptr<RexLiteral>(
7776 std::shared_ptr<Catalog_Namespace::SessionInfo const> session_ptr) {
7777 std::shared_ptr<ResultSet> rSet =
nullptr;
7778 std::vector<TargetMetaInfo> label_infos;
7780 if (!session_ptr->get_currentUser().isSuper) {
7781 throw std::runtime_error(
7782 "SHOW USER SESSIONS failed, because it can only be executed by super user.");
7785 std::vector<std::string> labels{
7786 "session_id",
"login_name",
"client_address",
"db_name"};
7787 for (
const auto&
label : labels) {
7792 std::vector<RelLogicalValues::RowValues> logical_values;
7794 for (
const auto& session_ptr : sessions) {
7796 logical_values.back().emplace_back(
7798 logical_values.back().emplace_back(
7800 logical_values.back().emplace_back(
7802 logical_values.back().emplace_back(
7803 genLiteralStr(session_ptr->getCatalog().getCurrentDB().dbName));
7807 rSet = std::shared_ptr<ResultSet>(
7814 std::shared_ptr<Catalog_Namespace::SessionInfo const> session_ptr) {
7815 std::shared_ptr<ResultSet> rSet =
nullptr;
7816 std::vector<TargetMetaInfo> label_infos;
7817 auto current_user_name = session_ptr->get_currentUser().userName;
7818 auto is_super_user = session_ptr->get_currentUser().isSuper.load();
7820 std::vector<std::string> labels{
"query_session_id",
7828 "exec_device_type"};
7829 for (
const auto&
label : labels) {
7833 std::vector<RelLogicalValues::RowValues> logical_values;
7841 for (
const auto& query_session_ptr : sessions) {
7842 std::vector<QuerySessionStatus> query_infos;
7845 executor->getSessionLock());
7846 query_infos = executor->getQuerySessionInfo(query_session_ptr->get_session_id(),
7850 const std::string getQueryStatusStr[] = {
"UNDEFINED",
7853 "RUNNING_QUERY_KERNEL",
7854 "RUNNING_REDUCTION",
7855 "RUNNING_IMPORTER"};
7856 bool is_table_import_session =
false;
7859 logical_values.back().emplace_back(
7861 auto query_status = query_info.getQueryStatus();
7862 logical_values.back().emplace_back(
genLiteralStr(getQueryStatusStr[query_status]));
7863 if (query_status == QuerySessionStatus::QueryStatus::RUNNING_IMPORTER) {
7864 is_table_import_session =
true;
7866 logical_values.back().emplace_back(
7868 logical_values.back().emplace_back(
7870 logical_values.back().emplace_back(
genLiteralStr(query_info.getQueryStr()));
7871 logical_values.back().emplace_back(
7872 genLiteralStr(query_session_ptr->get_currentUser().userName));
7873 logical_values.back().emplace_back(
7875 logical_values.back().emplace_back(
7876 genLiteralStr(query_session_ptr->getCatalog().getCurrentDB().dbName));
7878 !is_table_import_session) {
7886 rSet = std::shared_ptr<ResultSet>(
7893 const TSessionId& session_id_or_json) {
7903 for (
const auto& query_session_ptr : sessions) {
7904 const auto query_session_user_name = query_session_ptr->get_currentUser().userName;
7905 std::vector<QuerySessionStatus> query_infos;
7908 executor->getSessionLock());
7909 query_infos = executor->getQuerySessionInfo(query_session_ptr->get_session_id(),
7913 const std::string getQueryStatusStr[] = {
"UNDEFINED",
7916 "RUNNING_QUERY_KERNEL",
7917 "RUNNING_REDUCTION",
7918 "RUNNING_IMPORTER"};
7921 info.query_session_id = query_session_ptr->get_session_id();
7922 info.query_public_session_id = query_session_ptr->get_public_session_id();
7923 info.current_status = getQueryStatusStr[query_info.getQueryStatus()];
7924 info.query_str = query_info.getQueryStr();
7925 info.executor_id = query_info.getExecutorId();
7926 info.submitted = query_info.getQuerySubmittedTime();
7927 info.login_name = query_session_user_name;
7928 info.client_address = query_session_ptr->get_connection_info();
7929 info.db_name = query_session_ptr->getCatalog().getCurrentDB().dbName;
7931 info.exec_device_type =
"GPU";
7933 info.exec_device_type =
"CPU";
7936 _return.push_back(info);
7941 const std::string& target_session) {
7978 throw std::runtime_error(
7979 "Unable to interrupt running query. Query interrupt is disabled.");
7982 CHECK_EQ(target_session.length(),
static_cast<unsigned long>(8));
7983 auto target_query_session =
sessions_store_->getByPublicID(target_session);
7984 if (!target_query_session) {
7985 throw std::runtime_error(
7986 "Unable to interrupt running query. An invalid query session is given.");
7988 auto target_session_id = target_query_session->get_session_id();
7997 if (non_admin_interrupt_user) {
7998 auto target_user_name = target_query_session->get_currentUser().userName;
7999 if (target_user_name.compare(interrupt_user_name) != 0) {
8000 throw std::runtime_error(
"Unable to interrupt running query.");
8004 auto target_executor_ids = executor->getExecutorIdsRunningQuery(target_session_id);
8005 if (target_executor_ids.empty()) {
8007 executor->getSessionLock());
8008 if (executor->checkIsQuerySessionEnrolled(target_session_id, session_read_lock)) {
8009 session_read_lock.unlock();
8010 VLOG(1) <<
"Received interrupt: "
8014 executor->interrupt(target_session_id, session_info.
get_session_id());
8017 for (
auto& executor_id : target_executor_ids) {
8018 VLOG(1) <<
"Received interrupt: "
8024 target_executor->interrupt(target_session_id, session_info.
get_session_id());
8031 const std::string& cache_type,
8032 int64_t& execution_time_ms) {
8034 if (
to_upper(cache_type) ==
"CPU") {
8036 }
else if (
to_upper(cache_type) ==
"GPU") {
8038 }
else if (
to_upper(cache_type) ==
"RENDER") {
8041 throw std::runtime_error(
"Invalid cache type. Valid values are CPU,GPU or RENDER");
8047 const std::pair<std::string, std::string>& session_parameter,
8048 int64_t& execution_time_ms) {
8050 if (session_parameter.first ==
"EXECUTOR_DEVICE") {
8051 std::string parameter_value =
to_upper(session_parameter.second);
8053 if (parameter_value ==
"GPU") {
8054 executorType = TExecuteMode::type::GPU;
8055 }
else if (parameter_value ==
"CPU") {
8056 executorType = TExecuteMode::type::CPU;
8058 throw std::runtime_error(
"Cannot set the " + session_parameter.first +
" to " +
8059 session_parameter.second +
8060 ". Valid options are CPU and GPU");
8064 }
else if (session_parameter.first ==
"CURRENT_DATABASE") {
8071 TQueryResult& _return,
8072 const std::string& query_ra,
8073 std::shared_ptr<Catalog_Namespace::SessionInfo const> session_ptr) {
8075 std::string commandStr = executor.
commandStr();
8081 int64_t execution_time_ms;
8084 _return.execution_time_ms +=
8088 _return.execution_time_ms +=
8095 _return.execution_time_ms += execution_time_ms;
8102 _return.execution_time_ms += execution_time_ms;
8112 throw std::runtime_error(
"Unknown queue command.");
8115 _return.execution_time_ms +=
8119 if (!result.
empty()) {
8121 _return.execution_time_ms -= result.
getRows()->getQueueTime();
8129 const std::string& query_ra,
8130 std::shared_ptr<Catalog_Namespace::SessionInfo const> session_ptr) {
8132 std::string commandStr = executor.
commandStr();
8137 int64_t execution_time_ms;
8165 throw std::runtime_error(
"Unknwon queue command.");
8185 const std::unordered_set<shared::TableKey>& selected_table_keys)
const {
8186 bool is_in_memory_system_table_query{
false};
8187 const auto info_schema_catalog =
8189 if (info_schema_catalog) {
8190 for (
const auto& table_key : selected_table_keys) {
8191 if (table_key.db_id == info_schema_catalog->getDatabaseId()) {
8192 auto td = info_schema_catalog->getMetadataForTable(table_key.table_id,
false);
8195 is_in_memory_system_table_query =
true;
8200 return is_in_memory_system_table_query;
std::lock_guard< T > lock_guard
std::pair< size_t, size_t > ArraySliceRange
std::string validate() const
void interrupt(const TSessionId &query_session, const TSessionId &interrupt_session) override
Classes used to wrap parser calls for calcite redirection.
int64_t process_deferred_copy_from(const TSessionId &session_id)
void get_table_details_impl(TTableDetails &_return, query_state::StdLog &stdlog, const std::string &table_name, const bool get_system, const bool get_physical, const std::string &database_name={})
ForceDisconnect(const std::string &cause)
std::vector< LeafHostInfo > string_leaves_
HOST DEVICE SQLTypes get_subtype() const
void get_tables_for_database(std::vector< std::string > &_return, const TSessionId &session, const std::string &database_name) override
static void convertData(TQueryResult &_return, ExecutionResult &result, const QueryStateProxy &query_state_proxy, const bool column_format, const int32_t first_n, const int32_t at_most_n)
static void addUdfs(const std::string &json_func_sigs)
const std::vector< std::string > & clang_options_
std::string s3_secret_key
boost::filesystem::path import_path_
RType getResultType() const
std::unique_ptr< QueryDispatchQueue > dispatch_queue_
void add_vsi_archive_prefix(std::string &path)
std::vector< int > ChunkKey
std::vector< std::unique_ptr< lockmgr::AbstractLockContainer< const TableDescriptor * >>> LockedTableDescriptors
double g_running_query_interrupt_freq
int32_t raster_scanlines_per_thread
static const int32_t SERVER_USAGE
void importGeoTableSingle(const TSessionId &session, const std::string &table_name, const std::string &file_name, const import_export::CopyParams ©_params, const TRowDescriptor &row_desc, const TCreateParams &create_params)
specifies the content in-memory of a row in the link metadata view
void insert_chunks(const TSessionId &session, const TInsertChunks &insert_chunks) override
void set_table_epoch(const TSessionId &session, const int db_id, const int table_id, const int new_epoch) override
static const AccessPrivileges VIEW_DASHBOARD
const std::string kDataDirectoryName
size_t g_num_tuple_threshold_switch_to_baseline
bool raster_drop_if_all_null
void resume_executor_queue(const TSessionId &session)
static const int32_t DROP_VIEW
static std::vector< TableFunction > get_table_funcs()
void resetSessionsStore()
HOST DEVICE int get_size() const
static TableSchemaLockMgr & instance()
void importGeoTableGlobFilterSort(const TSessionId &session, const std::string &table_name, const std::string &file_name, const import_export::CopyParams ©_params, const TRowDescriptor &row_desc, const TCreateParams &create_params)
static void convertExplain(TQueryResult &_return, const ResultSet &results, const bool column_format)
const std::string getTargetQuerySessionToKill() const
void validate_configurations()
const bool renderer_use_parallel_executors_
void get_table_function_names(std::vector< std::string > &_return, const TSessionId &session) override
void insert_data(const TSessionId &session, const TInsertData &insert_data) override
bool is_a_supported_archive_file(const std::string &path)
const std::string & udf_filename_
static const int32_t ALTER_SERVER
QueryStateProxy createQueryStateProxy()
std::vector< PushedDownFilterInfo > execute_rel_alg(ExecutionResult &_return, QueryStateProxy, const std::string &query_ra, const bool column_format, const ExecutorDeviceType executor_device_type, const int32_t first_n, const int32_t at_most_n, const bool just_validate, const bool find_push_down_candidates, const ExplainInfo &explain_info, const std::optional< size_t > executor_index=std::nullopt) const
void set_license_key(TLicenseInfo &_return, const TSessionId &session, const std::string &key, const std::string &nonce) override
void clearRenderMemory(const TSessionId &session)
shared utility for globbing files, paths can be specified as either a single file, directory or wildcards
DBObjectKey getObjectKey() const
static bool has_view_permission(const AccessPrivileges &privs, const TDBObjectPermissions &permissions)
static const int32_t SELECT_FROM_VIEW
void get_all_effective_roles_for_user(std::vector< std::string > &_return, const TSessionId &session, const std::string &granteeName) override
double g_executor_resource_mgr_per_query_max_cpu_slots_ratio
bool isCalciteExplainDetail() const
static std::vector< ExtensionFunction > get_ext_funcs(const std::string &name)
class for a per-database catalog. also includes metadata for the current database and the current use...
void get_tables(std::vector< std::string > &_return, const TSessionId &session) override
void unshare_dashboard(const TSessionId &session, const int32_t dashboard_id, const std::vector< std::string > &groups, const std::vector< std::string > &objects, const TDashboardPermissions &permissions) override
std::mutex handle_to_dev_ptr_mutex_
TDatumType::type type_to_thrift(const SQLTypeInfo &type_info)
static const int32_t UPDATE_IN_VIEW
const std::string commandStr() const
std::vector< std::string > * stringsPtr
void init_executor_resource_mgr()
TRowDescriptor target_meta_infos_to_thrift(const std::vector< TargetMetaInfo > &targets)
void unshare_dashboards(const TSessionId &session, const std::vector< int32_t > &dashboard_ids, const std::vector< std::string > &groups, const TDashboardPermissions &permissions) override
static TimeT::rep execution(F func, Args &&...args)
std::vector< ArrayDatum > * arraysPtr
static void convertResult(TQueryResult &_return, const ResultSet &results, const bool column_format)
std::shared_ptr< query_state::QueryState > create_query_state(ARGS &&...args)
void sql_execute_impl(ExecutionResult &_return, QueryStateProxy, const bool column_format, const ExecutorDeviceType executor_device_type, const int32_t first_n, const int32_t at_most_n, const bool use_calcite, lockmgr::LockedTableDescriptors &locks)
static void add(const std::string &name, const TableFunctionOutputRowSizer sizer, const std::vector< ExtArgumentType > &input_args, const std::vector< ExtArgumentType > &output_args, const std::vector< ExtArgumentType > &sql_args, const std::vector< std::map< std::string, std::string >> &annotations, bool is_runtime=false)
TTableRefreshInfo get_refresh_info(const TableDescriptor *td)
logger::RequestId requestId() const
void getAllRolesForUserImpl(std::shared_ptr< Catalog_Namespace::SessionInfo const > session_ptr, std::vector< std::string > &roles, const std::string &granteeName, bool effective)
void sql_execute_gdf(TDataFrame &_return, const TSessionId &session, const std::string &query, const int32_t device_id, const int32_t first_n) override
std::string const & getQueryStr() const
static thread_local std::string client_address
void get_views(std::vector< std::string > &_return, const TSessionId &session) override
void get_runtime_table_function_names(std::vector< std::string > &_return, const TSessionId &session) override
static std::string getAstFileName(const std::string &udf_file_name)
static const int32_t CREATE_VIEW
bool raster_point_compute_angle
void add(const std::string &session_id, const DeferredCopyFromState &state)
bool path_is_relative(const std::string &path)
int64_t query_get_outer_fragment_count(const TSessionId &session, const std::string &select_query) override
void share_dashboard(const TSessionId &session, const int32_t dashboard_id, const std::vector< std::string > &groups, const std::vector< std::string > &objects, const TDashboardPermissions &permissions, const bool grant_role) override
TCopyParams copyparams_to_thrift(const import_export::CopyParams &cp)
auto getExecuteReadLock()
EncodingType thrift_to_encoding(const TEncodingType::type tEncodingType)
Data_Namespace::DataMgr & getDataMgr() const
static void loadRuntimeLibs(const std::string &torch_lib_path=std::string())
void clone_session(const TSessionId session1, const TSessionId session2)
static void set_geo_physical_import_buffer_columnar(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< std::vector< double >> &coords_column, std::vector< std::vector< double >> &bounds_column, std::vector< std::vector< int >> &ring_sizes_column, std::vector< std::vector< int >> &poly_rings_column)
bool geo_explode_collections
void check_and_invalidate_sessions(Parser::DDLStmt *ddl)
unsigned g_pending_query_interrupt_freq
std::string convert_temporal_to_iso_format(const SQLTypeInfo &type_info, int64_t unix_time)
bool has_object_privilege(const TSessionId &sessionId, const std::string &granteeName, const std::string &objectName, const TDBObjectType::type object_type, const TDBObjectPermissions &permissions) override
static void clearRTUdfs()
bool enable_calcite_view_optimize
std::vector< SQLTypeInfo > getBestColumnTypes() const
char unescape_char(std::string str)
std::unordered_map< std::string, std::unordered_set< std::string > > fill_column_names_by_table(std::vector< std::string > &table_names, query_state::StdLog &stdlog)
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
void get_custom_expressions(std::vector< TCustomExpression > &_return, const TSessionId &session) override
SystemMemoryUsage getSystemMemoryUsage() const
HOST DEVICE int get_scale() const
static const std::string MAPD_EDITION
static thread_local ClientProtocol client_protocol
static ArrayDatum composeNullArray(const SQLTypeInfo &ti)
LeafAggregator leaf_aggregator_
void get_hardware_info(TClusterHardwareInfo &_return, const TSessionId &session) override
static bool has_server_permission(const AccessPrivileges &privs, const TDBObjectPermissions &permissions)
void sql_execute(ExecutionResult &_return, const TSessionId &session, const std::string &query, const bool column_format, const int32_t first_n, const int32_t at_most_n, lockmgr::LockedTableDescriptors &locks)
static const AccessPrivileges INSERT_INTO_TABLE
void get_completion_hints(std::vector< TCompletionHint > &hints, const TSessionId &session, const std::string &sql, const int cursor) override
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
static const AccessPrivileges CREATE_DASHBOARD
std::vector< TCompletionHint > just_whitelisted_keyword_hints(const std::vector< TCompletionHint > &hints)
TSessionId getInvalidSessionId() const
void validate_sort_options(const FilePathOptions &options)
void get_tables_impl(std::vector< std::string > &table_names, const Catalog_Namespace::SessionInfo &, const GetTablesType get_tables_type, const std::string &database_name={})
std::vector< std::string > getTargetNames(const std::vector< TargetMetaInfo > &targets) const
static void add(const std::string &json_func_sigs)
int32_t get_table_epoch(const TSessionId &session, const int32_t db_id, const int32_t table_id) override
std::vector< std::unique_ptr< TypedImportBuffer > > setup_column_loaders(const TableDescriptor *td, Loader *loader)
static void value_to_thrift_column(const TargetValue &tv, const SQLTypeInfo &ti, TColumn &column)
void sql_execute_local(TQueryResult &_return, const QueryStateProxy &query_state_proxy, const std::shared_ptr< Catalog_Namespace::SessionInfo > session_ptr, const std::string &query_str, const bool column_format, const std::string &nonce, const int32_t first_n, const int32_t at_most_n, const bool use_calcite)
DEVICE void sort(ARGS &&...args)
void delete_custom_expressions(const TSessionId &session, const std::vector< int32_t > &custom_expression_ids, const bool do_soft_delete) override
void get_column_hints(std::vector< TCompletionHint > &hints, const std::string &last_word, const std::unordered_map< std::string, std::unordered_set< std::string >> &column_names_by_table)
void get_table_epochs(std::vector< TTableEpochInfo > &_return, const TSessionId &session, const int32_t db_id, const int32_t table_id) override
bool path_has_valid_filename(const std::string &path)
void execute_query_step(TStepResult &_return, const TPendingQuery &pending_query, const TSubqueryId subquery_id, const std::string &start_time_str) override
std::vector< bool > is_default
unsigned g_cpu_threads_override
const unsigned renderer_vulkan_timeout_
std::string get_mismatch_attr_warning_text(const std::string &table_name, const std::string &file_path, const std::string &column_name, const std::string &attr, const std::string &got, const std::string &expected)
void set_execution_mode_nolock(Catalog_Namespace::SessionInfo *session_ptr, const TExecuteMode::type mode)
const std::string base_data_path_
void set_execution_mode(const TSessionId &session, const TExecuteMode::type mode) override
static const int32_t ALTER_TABLE
ExtArgumentType from_thrift(const TExtArgumentType::type &t)
void initialize(const bool is_new_db)
const ResultSetPtr & getDataPtr() const
std::string connection_string
static std::vector< GeoFileLayerInfo > gdalGetLayersInGeoFile(const std::string &file_name, const CopyParams ©_params)
bool g_enable_dynamic_watchdog
const size_t render_mem_bytes_
const CopyParams & get_copy_params() const
static void init_resource_mgr(const size_t num_cpu_slots, const size_t num_gpu_slots, const size_t cpu_result_mem, const size_t cpu_buffer_pool_mem, const size_t gpu_buffer_pool_mem, const double per_query_max_cpu_slots_ratio, const double per_query_max_cpu_result_mem_ratio, const bool allow_cpu_kernel_concurrency, const bool allow_cpu_gpu_kernel_concurrency, const bool allow_cpu_slot_oversubscription_concurrency, const bool allow_cpu_result_mem_oversubscription, const double max_available_resource_use_ratio)
void start_render_query(TPendingRenderQuery &_return, const TSessionId &session, const int64_t widget_id, const int16_t node_idx, const std::string &vega_json) override
void setPrivileges(const AccessPrivileges &privs)
bool g_enable_non_kernel_time_query_interrupt
HOST DEVICE SQLTypes get_type() const
void switch_database(const TSessionId session, const std::string &dbname)
void setSessionInfo(std::shared_ptr< Catalog_Namespace::SessionInfo >)
void get_db_object_privs(std::vector< TDBObject > &_return, const TSessionId &session, const std::string &objectName, const TDBObjectType::type type) override
const std::string kInfoSchemaDbName
void krb5_connect(TKrb5Session &session, const std::string &token, const std::string &dbname) override
void get_token_based_completions(std::vector< TCompletionHint > &hints, query_state::StdLog &stdlog, std::vector< std::string > &visible_tables, const std::string &sql, const int cursor)
Timer createTimer(char const *event_name)
void check_table_load_privileges(const Catalog_Namespace::SessionInfo &session_info, const std::string &table_name)
void disconnect(const TSessionId session)
bool isForeignTable() const
static void createSimpleResult(TQueryResult &_return, const ResultSet &results, const bool column_format, const std::string label)
std::string raster_import_dimensions
double g_executor_resource_mgr_cpu_result_mem_ratio
void get_users(std::vector< std::string > &_return, const TSessionId &session) override
void dispatch_query_task(std::shared_ptr< QueryDispatchQueue::Task > query_task, const bool is_update_delete)
bool get_qualified_column_hints(std::vector< TCompletionHint > &hints, const std::string &last_word, const std::unordered_map< std::string, std::unordered_set< std::string >> &column_names_by_table)
static constexpr const char * MANUAL_REFRESH_TIMING_TYPE
void detect_column_types(TDetectResult &_return, const TSessionId &session, const std::string &file_name, const TCopyParams ©_params) override
bool g_executor_resource_mgr_allow_cpu_gpu_kernel_concurrency
static const size_t auto_cpu_mem_bytes
void initEncoder(const SQLTypeInfo &tmp_sql_type)
DeferredCopyFromSessions deferred_copy_from_sessions
void execute_distributed_copy_statement(Parser::CopyTableStmt *, const Catalog_Namespace::SessionInfo &session_info)
static bool gdalFileExists(const std::string &path, const CopyParams ©_params)
void import_table_status(TImportStatus &_return, const TSessionId &session, const std::string &import_id) override
std::unique_ptr< AbstractImporter > create_importer(Catalog_Namespace::Catalog &catalog, const TableDescriptor *td, const std::string ©_from_source, const import_export::CopyParams ©_params)
void fillMissingBuffers(const TSessionId &session, const Catalog_Namespace::Catalog &catalog, std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, const std::list< const ColumnDescriptor * > &cds, const std::vector< int > &desc_id_to_column_id, size_t num_rows, const std::string &table_name)
static TableDataLockMgr & instance()
void get_function_names(std::vector< std::string > &_return, const TSessionId &session) override
void setNumElems(const size_t num_elems)
DBObject * findDbObject(const DBObjectKey &objectKey, bool only_direct) const
void emergency_shutdown()
size_t get_column_size(const TColumn &column)
const std::string kGeoColumnName
std::string find_last_word_from_cursor(const std::string &sql, const int64_t cursor)
void set_leaf_info(const TSessionId &session, const TLeafInfo &info) override
std::vector< std::string > getTableNamesForUser(const UserMetadata &user, const GetTablesType get_tables_type) const
void sql_validate(TRowDescriptor &_return, const TSessionId &session, const std::string &query) override
std::vector< int > column_ids_by_names(const std::list< const ColumnDescriptor * > &descs, const std::vector< std::string > &column_names)
void internal_connect(TSessionId &session, const std::string &username, const std::string &dbname)
#define LOG_IF(severity, condition)
std::pair< std::string, std::string > getSessionParameter() const
int32_t get_table_epoch_by_name(const TSessionId &session, const std::string &table_name) override
static void clearMemory(const Data_Namespace::MemoryLevel memory_level)
bool is_in_memory_system_table
bool should_suggest_column_hints(const std::string &partial_query)
std::string dashboardMetadata
std::unique_ptr< lockmgr::AbstractLockContainer< const TableDescriptor * > > prepare_loader_generic(const Catalog_Namespace::SessionInfo &session_info, const std::string &table_name, size_t num_cols, std::unique_ptr< import_export::Loader > *loader, std::vector< std::unique_ptr< import_export::TypedImportBuffer >> *import_buffers, const std::vector< std::string > &column_names, std::string load_type)
std::unique_lock< WrapperType< std::shared_mutex >> ExecutorWriteLock
import_export::CopyParams copy_params
void fillGeoColumns(const TSessionId &session, const Catalog_Namespace::Catalog &catalog, std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, const ColumnDescriptor *cd, size_t &col_idx, size_t num_rows, const std::string &table_name)
static void resume_executor_queue()
size_t getCurrentCacheSizeForDevice(CacheItemType item_type, DeviceIdentifier device_identifier) const
std::unordered_map< std::string, Catalog_Namespace::SessionInfoPtr > calcite_sessions_
bool isShowQueries() const
int tableId
identifies the database into which the data is being inserted
void convertResultSet(ExecutionResult &result, const Catalog_Namespace::SessionInfo &session_info, const std::string &query_state_str, TQueryResult &_return)
std::shared_lock< T > shared_lock
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
TRowDescriptor validateRelAlg(const std::string &query_ra, QueryStateProxy query_state_proxy)
void connect_impl(TSessionId &session, const std::string &passwd, const std::string &dbname, const Catalog_Namespace::UserMetadata &user_meta, std::shared_ptr< Catalog_Namespace::Catalog > cat, query_state::StdLog &stdlog)
void addExecutionTime(int64_t execution_time_ms)
std::string getDefaultValueLiteral() const
Driver for running validation on a single node.
void log_cache_size(const Catalog_Namespace::Catalog &cat)
bool g_enable_executor_resource_mgr
std::string add_metadata_columns
size_t numRows
a vector of column ids for the row(s) being inserted
void disconnect_impl(Catalog_Namespace::SessionInfoPtr &session_ptr)
This file contains the class specification and related data structures for Catalog.
ImportHeaderRow has_header
void connect(TSessionId &session, const std::string &username, const std::string &passwd, const std::string &dbname) override
void load_table_binary_arrow(const TSessionId &session, const std::string &table_name, const std::string &arrow_stream, const bool use_column_names) override
void checkpoint(const TSessionId &session, const int32_t table_id) override
std::string ActualQuery()
bool isAlterSystemClear() const
bool isAggregator() const
bool g_enable_columnar_output
size_t g_ratio_num_hash_entry_to_num_tuple_switch_to_baseline
void delete_dashboards(const TSessionId &session, const std::vector< int32_t > &dashboard_ids) override
void add(AccessPrivileges newprivs)
bool is_reserved_name(const std::string &name)
static constexpr const char * REFRESH_START_DATE_TIME_KEY
TRole::type getServerRole() const
std::optional< std::string > regex_path_filter
void validateDashboardIdsForSharing(const Catalog_Namespace::SessionInfo &session_info, const std::vector< int32_t > &dashboard_ids)
std::shared_lock< WrapperType< std::shared_mutex >> ExecutorReadLock
const std::string kDefaultImportDirName
TColumnType populateThriftColumnType(const Catalog_Namespace::Catalog *cat, const ColumnDescriptor *cd)
Supported runtime functions management and retrieval.
std::string dashboardState
static bool has_table_permission(const AccessPrivileges &privs, const TDBObjectPermissions &permission)
void get_layers_in_geo_file(std::vector< TGeoFileLayerInfo > &_return, const TSessionId &session, const std::string &file_name, const TCopyParams ©_params) override
const size_t reserved_gpu_mem_
TColumnType create_geo_column(const TDatumType::type type, const std::string &name, const bool is_array)
std::string TTypeInfo_TypeToString(const TDatumType::type &t)
static SysCatalog & instance()
void get_first_geo_file_in_archive(std::string &_return, const TSessionId &session, const std::string &archive_path, const TCopyParams ©_params) override
void create_table(const TSessionId &session, const std::string &table_name, const TRowDescriptor &row_desc, const TCreateParams &create_params) override
void check_geospatial_files(const boost::filesystem::path file_path, const import_export::CopyParams ©_params)
CONSTEXPR DEVICE bool is_null(const T &value)
RasterPointType raster_point_type
void update_custom_expression(const TSessionId &session, const int32_t id, const std::string &expression_json) override
#define THROW_COLUMN_ATTR_MISMATCH_EXCEPTION(attr, got, expected)
std::unordered_set< std::string > uc_column_table_qualifiers
static constexpr const char * REFRESH_UPDATE_TYPE_KEY
const bool render_compositor_use_last_gpu_
QueryStateProxy createQueryStateProxy()
const DBMetadata & getCurrentDB() const
void get_function_details(std::vector< TUserDefinedFunction > &_return, const TSessionId &session, const std::vector< std::string > &udf_names) override
void get_all_files_in_archive(std::vector< std::string > &_return, const TSessionId &session, const std::string &archive_path, const TCopyParams ©_params) override
bool g_enable_system_tables
ScopeGuard pause_and_resume_executor_queue()
const size_t CALCITE_SESSION_ID_LENGTH
static const int32_t DROP_DATABASE
void setQueryState(std::shared_ptr< QueryState >)
#define EXPOSE_THRIFT_MAP(TYPENAME)
DEVICE auto copy(ARGS &&...args)
void get_tables_meta_impl(std::vector< TTableMeta > &_return, QueryStateProxy query_state_proxy, const Catalog_Namespace::SessionInfo &session_info, const bool with_table_locks=true)
bool geo_validate_geometry
void check_not_info_schema_db(const std::string &db_name, bool throw_db_exception)
static HashtableRecycler * getHashTableCache()
const int max_session_duration_
ExecutorDeviceType executor_device_type_
static void readMetadataSampleGDAL(const std::string &fileName, const std::string &geoColumnName, std::map< std::string, std::vector< std::string >> &metadata, int rowLimit, const CopyParams ©_params)
static constexpr const char * REFRESH_INTERVAL_KEY
std::shared_ptr< Catalog_Namespace::SessionInfo > get_session_ptr(const TSessionId &session_id)
ProjectionTokensForCompletion extract_projection_tokens_for_completion(const std::string &sql)
static std::shared_ptr< QueryEngine > createInstance(CudaMgr_Namespace::CudaMgr *cuda_mgr, bool cpu_only)
std::shared_ptr< Catalog_Namespace::SessionInfo > getSessionInfo() const
std::vector< LeafHostInfo > db_leaves_
static std::unordered_set< std::string > get_udfs_name(const bool is_runtime)
const File_Namespace::DiskCacheConfig & disk_cache_config_
void removeInMemoryCalciteSession(const std::string &session_id)
RequestId set_new_request_id()
#define INVALID_SESSION_ID
static bool has_database_permission(const AccessPrivileges &privs, const TDBObjectPermissions &permissions)
static const int32_t DELETE_FROM_TABLE
const std::vector< TargetMetaInfo > & getTargetsMeta() const
void validate_import_file_path_if_local(const std::string &file_path)
const std::string & clang_path_
const std::shared_ptr< ResultSet > & getRows() const
bool hasErrorCode(ErrorCode const ec) const
bool isShowUserSessions() const
bool g_executor_resource_mgr_allow_cpu_result_mem_oversubscription_concurrency
std::unique_lock< T > unique_lock
std::unique_ptr< RenderHandler > render_handler_
void get_completion_hints_unsorted(std::vector< TCompletionHint > &hints, std::vector< std::string > &visible_tables, query_state::StdLog &stdlog, const std::string &sql, const int cursor)
void alterSession(const std::string &sesson_id, ExecutionResult &result, const std::pair< std::string, std::string > &session_parameter, int64_t &execution_time_ms)
static const int32_t TRUNCATE_TABLE
Checked json field retrieval.
static bool getGeoColumns(const std::string &wkt_or_wkb_hex, SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool validate_with_geos_if_available)
const DashboardDescriptor * getMetadataForDashboard(const std::string &userId, const std::string &dashName) const
void set_cur_session(const TSessionId &parent_session, const TSessionId &leaf_session, const std::string &start_time_str, const std::string &label, bool for_running_query_kernel) override
std::string get_import_tag(const std::string &import_tag, const std::string &table_name, const std::string &file_path)
void updateResultSet(const std::string &query_ra, RType type, bool success=true)
std::shared_ptr< QueryEngine > query_engine_
SystemParameters & system_parameters_
const size_t num_reader_threads_
An AbstractBuffer is a unit of data management for a data manager.
import_export::SourceType source_type
std::string get_load_tag(const std::string &load_tag, const std::string &table_name)
#define SET_REQUEST_ID(parent_request_id)
size_t getTotalMemorySizeForDictionariesForDatabase() const
TDashboard get_dashboard_impl(const std::shared_ptr< Catalog_Namespace::SessionInfo const > &session_ptr, Catalog_Namespace::UserMetadata &user_meta, const DashboardDescriptor *dash, const bool populate_state=true)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
specifies the content in-memory of a row in the column metadata table
int32_t max_num_sessions_
void delete_dashboard(const TSessionId &session, const int32_t dashboard_id) override
std::string getName() const
static std::unique_ptr< SessionsStore > create(const std::string &base_path, size_t n_workers, int idle_session_duration, int max_session_duration, int capacity, DisconnectCallback disconnect_callback)
DataSourceType data_source_type
void get_session_info(TSessionInfo &_return, const TSessionId &session) override
std::string toString(const Executor::ExtModuleKinds &kind)
static const int32_t EDIT_DASHBOARD
static const int32_t DELETE_DASHBOARD
std::pair< TPlanResult, lockmgr::LockedTableDescriptors > parse_to_ra(QueryStateProxy, const std::string &query_str, const std::vector< TFilterPushDownInfo > &filter_push_down_info, const bool acquire_locks, const SystemParameters &system_parameters, bool check_privileges=true)
bool isOptimizedExplain() const
void create_link(std::string &_return, const TSessionId &session, const std::string &view_state, const std::string &view_metadata) override
static const int32_t INSERT_INTO_TABLE
void get_version(std::string &_return) override
RecordBatchVector loadArrowStream(const std::string &stream)
static bool supportsNetworkFileAccess()
bool g_optimize_cuda_block_and_grid_sizes
void get_tables_meta(std::vector< TTableMeta > &_return, const TSessionId &session) override
bool g_executor_resource_mgr_allow_cpu_slot_oversubscription_concurrency
int get_precision() const
void validate_allowed_file_path(const std::string &file_path, const DataTransferType data_transfer_type, const bool allow_wildcards)
const bool renderer_prefer_igpu_
heavyai::shared_mutex calcite_sessions_mtx_
DBObjectType getType() const
void get_server_status(TServerStatus &_return, const TSessionId &session) override
void setResultType(RType type)
static bool gdalFileOrDirectoryExists(const std::string &path, const CopyParams ©_params)
static ResultSet * create(std::vector< TargetMetaInfo > &label_infos, std::vector< RelLogicalValues::RowValues > &logical_values)
std::vector< std::shared_ptr< arrow::RecordBatch >> RecordBatchVector
void load_table_binary_columnar(const TSessionId &session, const std::string &table_name, const std::vector< TColumn > &cols, const std::vector< std::string > &column_names) override
bool is_info_schema_db(const std::string &db_name)
bool isSelectExplain() const
static TDBObject serialize_db_object(const std::string &roleName, const DBObject &inObject)
std::string thrift_to_encoding_name(const TTypeInfo &ti)
void fixup_geo_column_descriptor(TColumnType &col_type, const SQLTypes subtype, const int output_srid)
static const int32_t CREATE_SERVER
std::string thrift_to_name(const TTypeInfo &ti)
RuntimeUdfRegistrationPolicy runtime_udf_registration_policy
std::vector< TargetMetaInfo > getTargetMetaInfo(const std::vector< std::shared_ptr< Analyzer::TargetEntry >> &targets) const
std::string get_session_id() const
bool isJustExplain() const
static void deallocateArrowResultBuffer(const ArrowResult &result, const ExecutorDeviceType device_type, const size_t device_id, std::shared_ptr< Data_Namespace::DataMgr > &data_mgr)
std::string geo_layer_name
const bool allow_loop_joins_
void start_query(TPendingQuery &_return, const TSessionId &leaf_session, const TSessionId &parent_session, const std::string &serialized_rel_alg_dag, const std::string &start_time_str, const bool just_explain, const std::vector< int64_t > &outer_fragment_indices) override
const AccessPrivileges & getPrivileges() const
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
bool isAlterSessionSet() const
bool file_or_glob_path_exists(const std::string &path)
std::unique_ptr< HeavyDBAggHandler > agg_handler_
std::optional< std::string > default_value
void load_table(const TSessionId &session, const std::string &table_name, const std::vector< TStringRow > &rows, const std::vector< std::string > &column_names) override
void broadcast_serialized_rows(const TSerializedRows &serialized_rows, const TRowDescriptor &row_desc, const TQueryId query_id, const TSubqueryId subquery_id, const bool is_final_subquery_result) override
std::vector< std::string > get_valid_groups(const TSessionId &session, int32_t dashboard_id, std::vector< std::string > groups)
heavyai::shared_mutex custom_expressions_mutex_
TExecuteMode::type getExecutionMode(const TSessionId &session)
void clone_session(TSessionId &session2, const TSessionId &session1) override
TExtArgumentType::type to_thrift(const ExtArgumentType &t)
int32_t g_distributed_leaf_idx
std::unordered_set< std::string > get_uc_compatible_table_names_by_column(const std::unordered_set< std::string > &uc_column_names, std::vector< std::string > &table_names, query_state::StdLog &stdlog)
void get_dashboard(TDashboard &_return, const TSessionId &session, const int32_t dashboard_id) override
const bool enable_rendering_
static void addUdfIrToModule(const std::string &udf_ir_filename, const bool is_cuda_ir)
#define ARROW_THRIFT_THROW_NOT_OK(s)
void get_table_details(TTableDetails &_return, const TSessionId &session, const std::string &table_name) override
void share_dashboards(const TSessionId &session, const std::vector< int32_t > &dashboard_ids, const std::vector< std::string > &groups, const TDashboardPermissions &permissions) override
TPlanResult processCalciteRequest(QueryStateProxy, const std::shared_ptr< Catalog_Namespace::Catalog > &cat, const std::string &query_str, const std::vector< TFilterPushDownInfo > &filter_push_down_info, const SystemParameters &system_parameters, const bool check_privileges)
void get_db_objects_for_grantee(std::vector< TDBObject > &_return, const TSessionId &session, const std::string &roleName) override
ExecutionResult execute(bool read_only_mode)
void appendNameValuePairs(Pairs &&...pairs)
void get_link_view(TFrontendView &_return, const TSessionId &session, const std::string &link) override
const bool intel_jit_profile_
HOST DEVICE EncodingType get_compression() const
const bool renderer_enable_slab_allocation_
std::string returnQueueAction() const
bool is_date_in_days() const
bool g_executor_resource_mgr_allow_cpu_kernel_concurrency
std::vector< DataBlockPtr > data
the number of rows being inserted
static WriteLock getWriteLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
void disconnect(const TSessionId &session) override
void check_in_memory_system_table_query(const std::vector< std::vector< std::string >> &selected_tables)
static constexpr const char * ALL_REFRESH_UPDATE_TYPE
bool table_is_replicated(const TableDescriptor *td)
TCustomExpression create_thrift_obj_from_custom_expr(const CustomExpression &custom_expr, const Catalog &catalog)
void pause_executor_queue(const TSessionId &session)
Catalog & getCatalog() const
static void convertRows(TQueryResult &_return, QueryStateProxy query_state_proxy, const std::vector< TargetMetaInfo > &targets, const ResultSet &results, const bool column_format, const int32_t first_n, const int32_t at_most_n)
std::string sanitize_name(const std::string &name, const bool underscore=false)
void import_table(const TSessionId &session, const std::string &table_name, const std::string &file_name, const TCopyParams ©_params) override
std::unique_ptr< Catalog_Namespace::SessionsStore > sessions_store_
void register_runtime_extension_functions(const TSessionId &session, const std::vector< TUserDefinedFunction > &udfs, const std::vector< TUserDefinedTableFunction > &udtfs, const std::map< std::string, std::string > &device_ir_map) override
std::shared_ptr< Calcite > calcite_
double gpu_input_mem_limit
static ArrayDatum composeNullPointCoords(const SQLTypeInfo &coords_ti, const SQLTypeInfo &geo_ti)
Basic constructors and methods of the row set interface.
void get_table_details_for_database(TTableDetails &_return, const TSessionId &session, const std::string &table_name, const std::string &database_name) override
void get_status(std::vector< TServerStatus > &_return, const TSessionId &session) override
const std::vector< PushedDownFilterInfo > & getPushedDownFilterInfo() const
static const int32_t ACCESS
void switch_database(const TSessionId &session, const std::string &dbname) override
std::shared_ptr< Data_Namespace::DataMgr > data_mgr_
void validateGroups(const std::vector< std::string > &groups)
std::string s3_session_token
static const int32_t CREATE_DATABASE
void check_table_not_sharded(const TableDescriptor *td)
void executeDdl(TQueryResult &_return, const std::string &query_ra, std::shared_ptr< Catalog_Namespace::SessionInfo const > session_ptr)
int32_t geo_coords_comp_param
void shareOrUnshareDashboards(const TSessionId &session, const std::vector< int32_t > &dashboard_ids, const std::vector< std::string > &groups, const TDashboardPermissions &permissions, const bool do_share)
void get_queries_info(std::vector< TQueryInfo > &_return, const TSessionId &session) override
bool checkInMemorySystemTableQuery(const std::unordered_set< shared::TableKey > &tables_selected_from) const
void removeFragmenterForTable(const int table_id) const
void get_runtime_function_names(std::vector< std::string > &_return, const TSessionId &session) override
static std::map< ExtModuleKinds, std::string > extension_module_sources
static const AccessPrivileges ACCESS
void get_databases(std::vector< TDBInfo > &_return, const TSessionId &session) override
static const int32_t VIEW_DASHBOARD
bool user_can_access_table(const Catalog_Namespace::SessionInfo &, const TableDescriptor *td, const AccessPrivileges acess_priv)
std::string dashboardName
static const AccessPrivileges ALL_TABLE
std::vector< std::vector< std::string > > get_sample_rows(size_t n)
void replace_dashboard(const TSessionId &session, const int32_t dashboard_id, const std::string &dashboard_name, const std::string &dashboard_owner, const std::string &dashboard_state, const std::string &image_hash, const std::string &dashboard_metadata) override
static const int32_t VIEW_SQL_EDITOR
static constexpr const char * APPEND_REFRESH_UPDATE_TYPE
void get_result_row_for_pixel(TPixelTableRowResult &_return, const TSessionId &session, const int64_t widget_id, const TPixel &pixel, const std::map< std::string, std::vector< std::string >> &table_col_names, const bool column_format, const int32_t pixel_radius, const std::string &nonce) override
const bool legacy_syntax_
void import_geo_table(const TSessionId &session, const std::string &table_name, const std::string &file_name, const TCopyParams ©_params, const TRowDescriptor &row_desc, const TCreateParams &create_params) override
void check_read_only(const std::string &str)
HOST DEVICE int get_comp_param() const
static std::unique_ptr< RexLiteral > genLiteralStr(std::string val)
void set_table_epoch_by_name(const TSessionId &session, const std::string &table_name, const int new_epoch) override
int32_t g_distributed_num_leaves
void set_table_epochs(const TSessionId &session, const int32_t db_id, const std::vector< TTableEpochInfo > &table_epochs) override
bool g_allow_system_dashboard_update
double g_executor_resource_mgr_per_query_max_cpu_result_mem_ratio
std::string returnCacheType() const
bool g_uniform_request_ids_per_thrift_call
static const int32_t DROP_TABLE
size_t g_executor_resource_mgr_cpu_result_mem_bytes
std::string filename(char const *path)
bool g_enable_filter_push_down
std::string find_first_geo_file_in_archive(const std::string &archive_path, const import_export::CopyParams ©_params)
std::vector< std::string > get_headers()
void deallocate_df(const TSessionId &session, const TDataFrame &df, const TDeviceType::type device_type, const int32_t device_id) override
std::string pg_shim(std::string const &query)
static const int32_t INSERT_INTO_VIEW
std::string raster_import_bands
static void registerExtensionFunctions(F register_extension_functions)
Catalog_Namespace::SessionInfo get_session_copy(const TSessionId &session_id)
Encoder * getEncoder() const
bool dashboard_exists(const Catalog_Namespace::Catalog &cat, const int32_t user_id, const std::string &dashboard_name)
bool has_role(const TSessionId &sessionId, const std::string &granteeName, const std::string &roleName) override
static constexpr const char * REFRESH_TIMING_TYPE_KEY
int32_t create_custom_expression(const TSessionId &session, const TCustomExpression &custom_expression) override
void render_vega(TRenderResult &_return, const TSessionId &session, const int64_t widget_id, const std::string &vega_json, const int32_t compression_level, const std::string &nonce) override
std::map< const std::string, const PermissionFuncPtr > permissionFuncMap_
bool g_allow_memory_status_log
std::unique_ptr< Catalog_Namespace::CustomExpression > create_custom_expr_from_thrift_obj(const TCustomExpression &t_custom_expr, const Catalog &catalog)
std::unique_ptr< HeavyDBLeafHandler > leaf_handler_
std::string expression_json
std::string remove_vsi_prefixes(const std::string &path_in)
void invalidate_cur_session(const TSessionId &parent_session, const TSessionId &leaf_session, const std::string &start_time_str, const std::string &label, bool for_running_query_kernel) override
void add_vsi_network_prefix(std::string &path)
static const int32_t DELETE_FROM_VIEW
#define DEBUG_TIMER(name)
static HashtableRecycler * getHashTableCache()
static const int32_t CREATE_TABLE
static ImportStatus get_import_status(const std::string &id)
void check_table_consistency(TTableMeta &_return, const TSessionId &session, const int32_t table_id) override
void execute_rel_alg_with_filter_push_down(ExecutionResult &_return, QueryStateProxy, std::string &query_ra, const bool column_format, const ExecutorDeviceType executor_device_type, const int32_t first_n, const int32_t at_most_n, const bool just_explain, const bool just_calcite_explain, const std::vector< PushedDownFilterInfo > &filter_push_down_requests)
const int idle_session_duration_
double g_executor_resource_mgr_max_available_resource_use_ratio
void setExecutionTime(int64_t execution_time_ms)
static TDatum value_to_thrift(const TargetValue &tv, const SQLTypeInfo &ti)
static bool is_allowed_on_dashboard(const Catalog_Namespace::SessionInfo &session_info, int32_t dashboard_id, AccessPrivileges requestedPermissions)
void remove(const std::string &session_id)
static const std::list< ColumnDescriptor > gdalToColumnDescriptors(const std::string &fileName, const bool is_raster, const std::string &geoColumnName, const CopyParams ©_params)
static void clearExternalCaches(bool for_update, const TableDescriptor *td, const int current_db_id)
static const int32_t CREATE_DASHBOARD
static constexpr int NULL_REFRESH_TIME
void start_heap_profile(const TSessionId &session) override
void get_table_function_details(std::vector< TUserDefinedTableFunction > &_return, const TSessionId &session, const std::vector< std::string > &udtf_names) override
static std::vector< std::string > gdalGetAllFilesInArchive(const std::string &archive_path, const CopyParams ©_params)
static void pause_executor_queue()
The data to be inserted using the fragment manager.
void get_device_parameters(std::map< std::string, std::string > &_return, const TSessionId &session) override
std::unordered_map< std::string, std::string > ipc_handle_to_dev_ptr_
auto getExecuteWriteLock()
void interruptQuery(const Catalog_Namespace::SessionInfo &session_info, const std::string &target_session)
static std::shared_ptr< QueryEngine > getInstance()
void set_executor_device_type(ExecutorDeviceType t)
bool hasTableAccessPrivileges(const TableDescriptor *td, const Catalog_Namespace::SessionInfo &session_info)
Serializers for query engine types to/from thrift.
bool isCalciteExplain() const
std::list< DBSummary > DBSummaryList
const size_t max_concurrent_render_sessions_
void log_system_cpu_memory_status(std::string const &query, const Catalog_Namespace::Catalog &cat)
std::vector< std::string > local_glob_filter_sort_files(const std::string &file_path, const FilePathOptions &options, const bool recurse)
boost::variant< ScalarTargetValue, ArrayTargetValue, GeoTargetValue, GeoTargetValuePtr > TargetValue
void check_valid_column_names(const std::list< const ColumnDescriptor * > &descs, const std::vector< std::string > &column_names)
Catalog_Namespace::SessionInfoPtr findCalciteSession(TSessionId const &) const
static const AccessPrivileges DELETE_DASHBOARD
void get_internal_table_details(TTableDetails &_return, const TSessionId &session, const std::string &table_name, const bool include_system_columns) override
void get_roles(std::vector< std::string > &_return, const TSessionId &session) override
static const int32_t SELECT_FROM_TABLE
static const std::string MAPD_RELEASE
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
bool sanitize_column_names
static constexpr ExecutorId UNITARY_EXECUTOR_ID
std::vector< std::unique_ptr< const RexScalar >> RowValues
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
void load_table_binary(const TSessionId &session, const std::string &table_name, const std::vector< TRow > &rows, const std::vector< std::string > &column_names) override
bool isAlterSystemControlExecutorQueue() const
std::vector< TServerStatus > getLeafStatus(TSessionId session)
static HashtableRecycler * getHashTableCache()
void clear_gpu_memory(const TSessionId &session) override
bool check_and_reset_in_memory_system_table(const Catalog &catalog, const TableDescriptor &td)
HOST DEVICE bool get_notnull() const
bool isAggregator() const
void init_table_functions()
std::unordered_set< std::string > uc_column_names
void get_physical_tables(std::vector< std::string > &_return, const TSessionId &session) override
void get_dashboards(std::vector< TDashboard > &_return, const TSessionId &session) override
std::pair< std::string, std::string > compileUdf(const std::string &udf_file_name) const
const AuthMetadata & authMetadata_
import_export::CopyParams thrift_to_copyparams(const TCopyParams &cp)
unsigned g_dynamic_watchdog_time_limit
static bool has_dashboard_permission(const AccessPrivileges &privs, const TDBObjectPermissions &permissions)
TTypeInfo type_info_to_thrift(const SQLTypeInfo &ti)
static constexpr DeviceIdentifier CPU_DEVICE_IDENTIFIER
static const AccessPrivileges EDIT_DASHBOARD
std::shared_ptr< Catalog_Namespace::SessionInfo const > getConstSessionInfo() const
static const int32_t UPDATE_IN_TABLE
static constexpr size_t kDefaultSampleRowsCount
DBHandler(const std::vector< LeafHostInfo > &db_leaves, const std::vector< LeafHostInfo > &string_leaves, const std::string &base_data_path, const bool allow_multifrag, const bool jit_debug, const bool intel_jit_profile, const bool read_only, const bool allow_loop_joins, const bool enable_rendering, const bool renderer_prefer_igpu, const unsigned renderer_vulkan_timeout_ms, const bool renderer_use_parallel_executors, const bool enable_auto_clear_render_mem, const int render_oom_retry_threshold, const size_t render_mem_bytes, const size_t max_concurrent_render_sessions, const size_t reserved_gpu_mem, const bool render_compositor_use_last_gpu, const bool renderer_enable_slab_allocation, const size_t num_reader_threads, const AuthMetadata &authMetadata, SystemParameters &system_parameters, const bool legacy_syntax, const int idle_session_duration, const int max_session_duration, const std::string &udf_filename, const std::string &clang_path, const std::vector< std::string > &clang_options, const File_Namespace::DiskCacheConfig &disk_cache_config, const bool is_new_db)
std::string s3_access_key
SQLTypeInfo get_elem_type() const
void sql_execute_df(TDataFrame &_return, const TSessionId &session, const std::string &query, const TDeviceType::type device_type, const int32_t device_id, const int32_t first_n, const TArrowTransport::type transport_method) override
std::vector< int > columnIds
identifies the table into which the data is being inserted
bool g_allow_s3_server_privileges
std::shared_ptr< Catalog_Namespace::SessionInfo const > getConstSessionInfo() const
int64_t getExecutionTime() const
void get_heap_profile(std::string &_return, const TSessionId &session) override
RasterPointTransform raster_point_transform
bool hasPermission(int permission) const
ExecutionResult getQueries(std::shared_ptr< Catalog_Namespace::SessionInfo const > session_ptr)
void get_all_roles_for_user(std::vector< std::string > &_return, const TSessionId &session, const std::string &granteeName) override
const UserMetadata & get_currentUser() const
std::string const & sessionId() const
std::optional< std::string > file_sort_order_by
bool is_a_supported_geo_file(const std::string &path)
bool g_enable_runtime_query_interrupt
const std::string kInfoSchemaMigrationName
std::unique_ptr< Parser::Stmt > create_stmt_for_query(const std::string &queryStr, const Catalog_Namespace::SessionInfo &session_info)
std::string dump_table_col_names(const std::map< std::string, std::vector< std::string >> &table_col_names)
static BoundingBoxIntersectTuningParamRecycler * getBoundingBoxIntersectTuningParamCache()
void get_dashboard_grantees(std::vector< TDashboardGrantees > &_return, const TSessionId &session, const int32_t dashboard_id) override
void get_license_claims(TLicenseInfo &_return, const TSessionId &session, const std::string &nonce) override
ExecutionResult getUserSessions(std::shared_ptr< Catalog_Namespace::SessionInfo const > session_ptr)
ThreadLocalIds thread_local_ids()
void get_memory(std::vector< TNodeMemoryInfo > &_return, const TSessionId &session, const std::string &memory_level) override
void add_vsi_geo_prefix(std::string &path)
std::string credential_string
static ReadLock getReadLockForTable(Catalog_Namespace::Catalog &cat, const std::string &table_name)
ConnectionInfo getConnectionInfo() const
const std::string getQuerySubmittedTime() const
void resizeDispatchQueue(size_t queue_size)
static constexpr const char * SCHEDULE_REFRESH_TIMING_TYPE
void execute_next_render_step(TRenderStepResult &_return, const TPendingRenderQuery &pending_render, const TRenderAggDataMap &merged_data) override
void get_internal_table_details_for_database(TTableDetails &_return, const TSessionId &session, const std::string &table_name, const std::string &database_name) override
std::string const createInMemoryCalciteSession(const std::shared_ptr< Catalog_Namespace::Catalog > &catalog_ptr)
bool isPlanExplain() const
void setRequestId(logger::RequestId const request_id)
static const int32_t DROP_SERVER
void alterSystemClear(const std::string &sesson_id, ExecutionResult &result, const std::string &cache_type, int64_t &execution_time_ms)
void stop_heap_profile(const TSessionId &session) override
HOST DEVICE int get_output_srid() const
std::optional< std::string > file_sort_regex
std::atomic< bool > initialized_
static void addRTUdfs(const std::string &json_func_sigs)
bool TTypeInfo_IsGeo(const TDatumType::type &t)
int32_t create_dashboard(const TSessionId &session, const std::string &dashboard_name, const std::string &dashboard_state, const std::string &image_hash, const std::string &dashboard_metadata) override
std::string get_hostname()
void interrupt(const TSessionId query_session, const TSessionId interrupt_session)
TEncodingType::type encoding_to_thrift(const SQLTypeInfo &type_info)
std::shared_ptr< SessionInfo > SessionInfoPtr
#define THROW_DB_EXCEPTION(errstr)
EncodingType geo_coords_encoding
bool is_local_file(const std::string &file_path)
void clear_cpu_memory(const TSessionId &session) override