OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
QueryRunner.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryRunner.h"
18 
19 #include "Calcite/Calcite.h"
20 #include "Catalog/Catalog.h"
22 #include "DistributedLoader.h"
23 #include "Geospatial/ColumnNames.h"
25 #include "Logger/Logger.h"
26 #include "Parser/ParserNode.h"
27 #include "Parser/ParserWrapper.h"
36 #ifdef HAVE_RUNTIME_LIBS
38 #endif
39 #include "Shared/StringTransform.h"
40 #include "Shared/SysDefinitions.h"
42 #include "Shared/import_helpers.h"
44 #include "gen-cpp/CalciteServer.h"
45 #include "include/bcrypt.h"
46 
47 #include <boost/filesystem/operations.hpp>
48 #include <csignal>
49 #include <random>
50 
51 #define CALCITEPORT 3279
52 
53 extern size_t g_leaf_count;
54 extern bool g_enable_filter_push_down;
55 
57 
58 extern bool g_serialize_temp_tables;
60 std::mutex calcite_lock;
61 
62 using namespace Catalog_Namespace;
63 namespace {
64 
65 std::shared_ptr<Calcite> g_calcite = nullptr;
66 
67 void calcite_shutdown_handler() noexcept {
68  if (g_calcite) {
69  g_calcite->close_calcite_server();
70  g_calcite.reset();
71  }
72 }
73 
77 }
78 
79 } // namespace
80 
81 namespace QueryRunner {
82 
83 std::unique_ptr<QueryRunner> QueryRunner::qr_instance_ = nullptr;
84 
85 query_state::QueryStates QueryRunner::query_states_;
86 
// Convenience overload: forwards db_path, udf_filename, max_gpu_mem and
// reserved_gpu_mem to another QueryRunner::init() overload, passing empty ({})
// lists in the string-server / leaf-server positions and `true` in the position
// that the full overload names uses_gpus.
// NOTE(review): embedded listing lines 92 and 94 are absent from this extract, so
// two arguments of the forwarded call are not visible here (presumably the
// arguments on either side of "HyperInteractive") -- confirm against the original
// file before editing this call.
87 QueryRunner* QueryRunner::init(const char* db_path,
88  const std::string& udf_filename,
89  const size_t max_gpu_mem,
90  const int reserved_gpu_mem) {
91  return QueryRunner::init(db_path,
93  "HyperInteractive",
95  {},
96  {},
97  udf_filename,
98  true,
99  max_gpu_mem,
100  reserved_gpu_mem);
101 }
102 
// Overload of QueryRunner::init() that forwards to the full overload with an
// explicit disk_cache_config. Visible forwarded values: string_servers and
// leaf_servers as given, an empty udf filename (""), `true`, 0, 256 << 20
// (256 MiB), `false`, `false`, and disk_cache_config.
// NOTE(review): embedded listing line 103 (the start of this signature, which
// presumably declares the return type and the disk_cache_config parameter) and
// lines 108 and 110 (two call arguments) are absent from this extract -- confirm
// against the original file.
104  const char* db_path,
105  const std::vector<LeafHostInfo>& string_servers,
106  const std::vector<LeafHostInfo>& leaf_servers) {
107  return QueryRunner::init(db_path,
109  "HyperInteractive",
111  string_servers,
112  leaf_servers,
113  "",
114  true,
115  0,
116  256 << 20,
117  false,
118  false,
119  disk_cache_config);
120 }
121 
// Full init() overload: (re)creates the singleton QueryRunner stored in
// qr_instance_ and returns a non-owning pointer to it.
//
// Before constructing the runner it initializes the DDL file-path whitelist with
// db_path and the JSON list ["/"] for both whitelist arguments, and aborts
// (LOG_IF FATAL followed by CHECK) when leaf_servers is non-empty, since the
// distributed test runner is not supported.
// All parameters are passed through unchanged to the QueryRunner constructor.
// NOTE(review): embedded listing line 136 is absent from this extract; a statement
// between the comment below and the whitelist initialization may be missing.
122 QueryRunner* QueryRunner::init(const char* db_path,
123  const std::string& user,
124  const std::string& pass,
125  const std::string& db_name,
126  const std::vector<LeafHostInfo>& string_servers,
127  const std::vector<LeafHostInfo>& leaf_servers,
128  const std::string& udf_filename,
129  bool uses_gpus,
130  const size_t max_gpu_mem,
131  const int reserved_gpu_mem,
132  const bool create_user,
133  const bool create_db,
134  const File_Namespace::DiskCacheConfig* disk_cache_config) {
135  // Whitelist root path for tests by default
137  ddl_utils::FilePathWhitelist::initialize(db_path, "[\"/\"]", "[\"/\"]");
138  LOG_IF(FATAL, !leaf_servers.empty()) << "Distributed test runner not supported.";
139  CHECK(leaf_servers.empty());
140  qr_instance_.reset(new QueryRunner(db_path,
141  user,
142  pass,
143  db_name,
144  string_servers,
145  leaf_servers,
146  udf_filename,
147  uses_gpus,
148  max_gpu_mem,
149  reserved_gpu_mem,
150  create_user,
151  create_db,
152  disk_cache_config));
153  return qr_instance_.get();
154 }
155 
// Main constructor. Visible steps, in order:
//  1. Create a dispatch queue constructed with the argument 1.
//  2. CHECK that db_path and the system database file exist; derive the data
//     directory and a default disk-cache config under db_path, replaced by
//     *cache_config when one is supplied.
//  3. Create the file-local g_calcite instance (port CALCITEPORT) and register its
//     extension-function whitelist, plus the UDF whitelist when udf_filename is
//     non-empty; then hand the table-function list to Calcite with an empty UDF
//     vector and is_runtime=false.
//  4. Create a CudaMgr when built with HAVE_CUDA and uses_gpus is set; without
//     HAVE_CUDA, uses_gpus is forced to false.
//  5. If the system catalog is not yet initialized, create the DataMgr and
//     initialize the system catalog with it.
//  6. Create the query engine, optionally create the user and database, CHECK the
//     password with bcrypt_checkpw, CHECK that the user is a superuser or owns the
//     database, and build session_info_ (device type GPU, empty session id).
// NOTE(review): this extract is missing embedded listing lines 170, 174, 179, 183,
// 185-186, 194, 196-197, 200, 230, 236 and 267. Among other things the declaration
// of `user`, the right-hand side of `system_db_file`, the rest of the
// disk_cache_config initializer and the call whose arguments appear at 237-248 are
// not visible -- confirm against the original file before changing any code here.
156 QueryRunner::QueryRunner(const char* db_path,
157  const std::string& user_name,
158  const std::string& passwd,
159  const std::string& db_name,
160  const std::vector<LeafHostInfo>& string_servers,
161  const std::vector<LeafHostInfo>& leaf_servers,
162  const std::string& udf_filename,
163  bool uses_gpus,
164  const size_t max_gpu_mem,
165  const int reserved_gpu_mem,
166  const bool create_user,
167  const bool create_db,
168  const File_Namespace::DiskCacheConfig* cache_config)
169  : dispatch_queue_(std::make_unique<QueryDispatchQueue>(1)) {
171  boost::filesystem::path base_path{db_path};
172  CHECK(boost::filesystem::exists(base_path));
173  auto system_db_file =
175  CHECK(boost::filesystem::exists(system_db_file));
176  auto data_dir = base_path / shared::kDataDirectoryName;
177  File_Namespace::DiskCacheConfig disk_cache_config{
178  (base_path / shared::kDefaultDiskCacheDirName).string(),
// A caller-supplied cache configuration replaces the default built above.
180  if (cache_config) {
181  disk_cache_config = *cache_config;
182  }
184 
187  g_calcite =
188  std::make_shared<Calcite>(-1, CALCITEPORT, db_path, 1024, 5000, true, udf_filename);
189  ExtensionFunctionsWhitelist::add(g_calcite->getExtensionFunctionWhitelist());
190  if (!udf_filename.empty()) {
191  ExtensionFunctionsWhitelist::addUdfs(g_calcite->getUserDefinedFunctionWhitelist());
192  }
193 
195 #ifdef HAVE_RUNTIME_LIBS
198 #endif
199  auto udtfs = ThriftSerializers::to_thrift(
201  std::vector<TUserDefinedFunction> udfs = {};
202  g_calcite->setRuntimeExtensionFunctions(udfs, udtfs, /*is_runtime=*/false);
203 
204  std::unique_ptr<CudaMgr_Namespace::CudaMgr> cuda_mgr;
205 #ifdef HAVE_CUDA
206  if (uses_gpus) {
207  cuda_mgr = std::make_unique<CudaMgr_Namespace::CudaMgr>(-1, 0);
208  }
209 #else
210  uses_gpus = false;
211 #endif
// num_gpus is 0 whenever no CudaMgr was created above.
212  const size_t num_gpus = static_cast<size_t>(cuda_mgr ? cuda_mgr->getDeviceCount() : 0);
213  SystemParameters mapd_params;
214  mapd_params.gpu_buffer_mem_bytes = max_gpu_mem;
215  mapd_params.aggregator = !leaf_servers.empty();
216 
217  auto& sys_cat = Catalog_Namespace::SysCatalog::instance();
218 
219  g_base_path = base_path.string();
220 
221  if (!sys_cat.isInitialized()) {
222  auto data_mgr = std::make_shared<Data_Namespace::DataMgr>(data_dir.string(),
223  mapd_params,
224  std::move(cuda_mgr),
225  uses_gpus,
226  reserved_gpu_mem,
227  0,
228  disk_cache_config);
229 
231  // With the exception of cpu_result_mem, the below values essentially mirror
232  // how ExecutorResourceMgr is initialized by DBHandler for normal DB operation.
233  // The static 4GB allocation of CPU result memory is sufficient for our tests,
234  // and prevents variability based on the DBHandler approach to sizing as a fraction
235  // of CPU buffer pool mem size.
237  cpu_threads() /* num_cpu_slots */,
238  num_gpus /* num_gpu_slots */,
239  static_cast<size_t>(1UL << 32) /* cpu_result_mem */,
240  data_mgr->getCpuBufferPoolSize() /* cpu_buffer_pool_mem */,
241  data_mgr->getGpuBufferPoolSize() /* gpu_buffer_pool_mem */,
242  0.9 /* per_query_max_cpu_slots_ratio */,
243  1.0 /* per_query_max_cpu_result_mem_ratio */,
244  true /* allow_cpu_kernel_concurrency */,
245  true /* allow_cpu_gpu_kernel_concurrency */,
246  false /* allow_cpu_slot_oversubscription_concurrency */,
247  false /* allow_cpu_result_mem_oversubscription_concurrency */,
248  0.9 /* max_available_resource_use_ratio */);
249  }
250 
251  sys_cat.init(g_base_path,
252  data_mgr,
253  {},
254  g_calcite,
255  false,
256  mapd_params.aggregator,
257  string_servers);
258  }
259 
260  query_engine_ =
261  QueryEngine::createInstance(sys_cat.getDataMgr().getCudaMgr(), !uses_gpus);
262 
// Create the user only when requested and not already present.
263  if (create_user) {
264  if (!sys_cat.getMetadataForUser(user_name, user)) {
265  sys_cat.createUser(
266  user_name,
268  passwd, /*is_super=*/false, /*default_db=*/"", /*can_login=*/true},
269  g_read_only);
270  }
271  }
272  CHECK(sys_cat.getMetadataForUser(user_name, user));
273  CHECK(bcrypt_checkpw(passwd.c_str(), user.passwd_hash.c_str()) == 0);
274 
// Create the database only when requested and not already present.
275  if (create_db) {
276  if (!sys_cat.getMetadataForDB(db_name, db_metadata_)) {
277  sys_cat.createDatabase(db_name, user.userId);
278  }
279  }
280  CHECK(sys_cat.getMetadataForDB(db_name, db_metadata_));
281  CHECK(user.isSuper || (user.userId == db_metadata_.dbOwner));
282  auto cat = sys_cat.getCatalog(db_metadata_, create_db);
283  CHECK(cat);
284  session_info_ = std::make_unique<Catalog_Namespace::SessionInfo>(
285  cat, user, ExecutorDeviceType::GPU, "");
286 }
287 
288 void QueryRunner::resizeDispatchQueue(const size_t num_executors) {
289  dispatch_queue_ = std::make_unique<QueryDispatchQueue>(num_executors);
290 }
291 
292 QueryRunner::QueryRunner(std::unique_ptr<Catalog_Namespace::SessionInfo> session)
293  : session_info_(std::move(session))
294  , dispatch_queue_(std::make_unique<QueryDispatchQueue>(1)) {}
295 
// Returns the catalog pointer held by the current session (session_info_).
// NOTE(review): embedded listing line 297 is absent from this extract; a statement
// preceding the return (possibly a null check on session_info_) may be missing.
296 std::shared_ptr<Catalog_Namespace::Catalog> QueryRunner::getCatalog() const {
298  return session_info_->get_catalog_ptr();
299 }
300 
301 std::shared_ptr<Calcite> QueryRunner::getCalcite() const {
302  // TODO: Embed Calcite shared_ptr ownership in QueryRunner
303  return g_calcite;
304 }
305 
// Reports whether the DataMgr behind the session's catalog says GPUs are present.
// NOTE(review): embedded listing line 307 is absent from this extract; a statement
// preceding the return may be missing.
306 bool QueryRunner::gpusPresent() const {
308  return session_info_->getCatalog().getDataMgr().gpusPresent();
309 }
310 
// NOTE(review): the body of this function (embedded listing lines 312-313) is
// absent from this extract, so its behavior cannot be documented from here --
// consult the original file.
311 void QueryRunner::clearGpuMemory() const {
314 }
315 
// NOTE(review): the body of this function (embedded listing lines 317-318) is
// absent from this extract, so its behavior cannot be documented from here --
// consult the original file.
316 void QueryRunner::clearCpuMemory() const {
319 }
320 
// Returns the DataMgr's memory info for the requested memory level, reached
// through the session's catalog.
// NOTE(review): embedded listing line 323 is absent from this extract; a statement
// preceding the return may be missing.
321 std::vector<MemoryInfo> QueryRunner::getMemoryInfo(
322  const Data_Namespace::MemoryLevel memory_level) const {
324  return session_info_->getCatalog().getDataMgr().getMemoryInfo(memory_level);
325 }
326 
// Summarizes buffer-pool usage at `memory_level`.
//
// Walks every memory datum of every pool returned by DataMgr::getMemoryInfo() and
// returns, in this order: total buffer count, total bytes, and the number of
// distinct table, column, fragment and chunk keys.
//  - The buffer count includes every datum, even those skipped below.
//  - Bytes and key sets only cover data that are not FREE and whose chunk key has
//    at least 4 components; with current_db_only set, chunk_key[0] must also equal
//    db_metadata_.dbId.
// For CPU_LEVEL exactly one pool is expected (CHECK_EQ).
// NOTE(review): embedded listing line 331 is absent from this extract; a statement
// after the "single-node" comment may be missing.
327 BufferPoolStats QueryRunner::getBufferPoolStats(
328  const Data_Namespace::MemoryLevel memory_level,
329  const bool current_db_only) const {
330  // Only works single-node for now
332  const std::vector<MemoryInfo> memory_infos =
333  session_info_->getCatalog().getDataMgr().getMemoryInfo(memory_level);
334  if (memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
335  CHECK_EQ(memory_infos.size(), static_cast<size_t>(1));
336  }
337  std::set<std::vector<int32_t>> chunk_keys;
338  std::set<std::vector<int32_t>> table_keys;
339  std::set<std::vector<int32_t>> column_keys;
340  std::set<std::vector<int32_t>> fragment_keys;
341  size_t total_num_buffers{
342  0}; // can be greater than chunk keys set size due to table replication
343  size_t total_num_bytes{0};
344  for (auto& pool_memory_info : memory_infos) {
345  const std::vector<MemoryData>& memory_data = pool_memory_info.nodeMemoryData;
346  for (auto& memory_datum : memory_data) {
// Counted before any filtering, so free and foreign-database buffers are included.
347  total_num_buffers++;
348  const auto& chunk_key = memory_datum.chunk_key;
349  if (memory_datum.memStatus == Buffer_Namespace::MemStatus::FREE ||
350  chunk_key.size() < 4) {
351  continue;
352  }
353  if (current_db_only) {
354  if (chunk_key[0] != db_metadata_.dbId) {
355  continue;
356  }
357  }
358  total_num_bytes += (memory_datum.numPages * pool_memory_info.pageSize);
// Key prefixes: {0,1} identifies a table, {0,1,2} a column, {0,1,3} a fragment.
359  table_keys.insert({chunk_key[0], chunk_key[1]});
360  column_keys.insert({chunk_key[0], chunk_key[1], chunk_key[2]});
361  fragment_keys.insert({chunk_key[0], chunk_key[1], chunk_key[3]});
362  chunk_keys.insert(chunk_key);
363  }
364  }
365  return {total_num_buffers,
366  total_num_bytes,
367  table_keys.size(),
368  column_keys.size(),
369  fragment_keys.size(),
370  chunk_keys.size()};
371 }
372 
// Sends query_str (after pg_shim) through the catalog's Calcite manager, builds a
// RelAlgExecutor from the resulting plan and returns the query hint registered for
// the root relational-algebra node, or RegisteredQueryHint::defaults() when the
// executor reports none.
// NOTE(review): embedded listing lines 374-375 and 378 are absent from this
// extract; the declaration of `executor` used below is not visible.
373 RegisteredQueryHint QueryRunner::getParsedQueryHint(const std::string& query_str) {
376  auto query_state = create_query_state(session_info_, query_str);
377  auto& cat = session_info_->getCatalog();
379 
380  auto calcite_mgr = cat.getCalciteMgr();
381  const auto calciteQueryParsingOption =
382  calcite_mgr->getCalciteQueryParsingOption(true, false, true, false);
383  const auto calciteOptimizationOption =
384  calcite_mgr->getCalciteOptimizationOption(false, g_enable_watchdog, {}, false);
385  const auto query_ra = calcite_mgr
386  ->process(query_state->createQueryStateProxy(),
387  pg_shim(query_str),
388  calciteQueryParsingOption,
389  calciteOptimizationOption)
390  .plan_result;
391  auto ra_executor = RelAlgExecutor(executor.get(), query_ra, query_state);
392  auto query_hints =
393  ra_executor.getParsedQueryHint(ra_executor.getRootRelAlgNodeShPtr().get());
394  return query_hints ? *query_hints : RegisteredQueryHint::defaults();
395 }
396 
// Sends query_str (after pg_shim) through the catalog's Calcite manager, builds a
// RelAlgExecutor from the resulting plan and returns the shared pointer to its
// root relational-algebra node.
// NOTE(review): embedded listing lines 399-400 and 403 are absent from this
// extract; the declaration of `executor` used below is not visible.
397 std::shared_ptr<const RelAlgNode> QueryRunner::getRootNodeFromParsedQuery(
398  const std::string& query_str) {
401  auto query_state = create_query_state(session_info_, query_str);
402  auto& cat = session_info_->getCatalog();
404 
405  auto calcite_mgr = cat.getCalciteMgr();
406  const auto calciteQueryParsingOption =
407  calcite_mgr->getCalciteQueryParsingOption(true, false, true, false);
408  const auto calciteOptimizationOption =
409  calcite_mgr->getCalciteOptimizationOption(false, g_enable_watchdog, {}, false);
410  const auto query_ra = calcite_mgr
411  ->process(query_state->createQueryStateProxy(),
412  pg_shim(query_str),
413  calciteQueryParsingOption,
414  calciteOptimizationOption)
415  .plan_result;
416  auto ra_executor = RelAlgExecutor(executor.get(), query_ra, query_state);
417  return ra_executor.getRootRelAlgNodeShPtr();
418 }
419 
// Sends query_str (after pg_shim) through the catalog's Calcite manager, builds a
// RelAlgExecutor from the resulting plan and returns whatever
// RelAlgExecutor::getParsedQueryHints() reports (a nested map of
// RegisteredQueryHint values, per the visible part of the return type).
// NOTE(review): embedded listing line 420 (the start of the return type) and lines
// 423-424 and 427 are absent from this extract; the declaration of `executor` used
// below is not visible.
421  std::unordered_map<size_t, std::unordered_map<unsigned, RegisteredQueryHint>>>
422 QueryRunner::getParsedQueryHints(const std::string& query_str) {
425  auto query_state = create_query_state(session_info_, query_str);
426  auto& cat = session_info_->getCatalog();
428  auto calcite_mgr = cat.getCalciteMgr();
429  const auto calciteQueryParsingOption =
430  calcite_mgr->getCalciteQueryParsingOption(true, false, true, false);
431  const auto calciteOptimizationOption =
432  calcite_mgr->getCalciteOptimizationOption(false, g_enable_watchdog, {}, false);
433  const auto query_ra = calcite_mgr
434  ->process(query_state->createQueryStateProxy(),
435  pg_shim(query_str),
436  calciteQueryParsingOption,
437  calciteOptimizationOption)
438  .plan_result;
439  auto ra_executor = RelAlgExecutor(executor.get(), query_ra, query_state);
440  return ra_executor.getParsedQueryHints();
441 }
442 
// Sends query_str (after pg_shim) through the catalog's Calcite manager, builds a
// RelAlgExecutor from the resulting plan and returns its global query hint.
// NOTE(review): embedded listing lines 445-446 and 449 are absent from this
// extract; the declaration of `executor` used below is not visible.
443 std::optional<RegisteredQueryHint> QueryRunner::getParsedGlobalQueryHints(
444  const std::string& query_str) {
447  auto query_state = create_query_state(session_info_, query_str);
448  auto& cat = session_info_->getCatalog();
450  auto calcite_mgr = cat.getCalciteMgr();
451  const auto calciteQueryParsingOption =
452  calcite_mgr->getCalciteQueryParsingOption(true, false, true, false);
453  const auto calciteOptimizationOption =
454  calcite_mgr->getCalciteOptimizationOption(false, g_enable_watchdog, {}, false);
455  const auto query_ra = calcite_mgr
456  ->process(query_state->createQueryStateProxy(),
457  pg_shim(query_str),
458  calciteQueryParsingOption,
459  calciteOptimizationOption)
460  .plan_result;
461  auto ra_executor = RelAlgExecutor(executor.get(), query_ra, query_state);
462  return ra_executor.getGlobalQueryHint();
463 }
464 
// Sends query_str (after pg_shim) through the catalog's Calcite manager, builds a
// RelAlgExecutor from the resulting plan and returns the execution sequence it
// derives for the root relational-algebra node.
// NOTE(review): embedded listing lines 466-467 and 470 are absent from this
// extract; the declaration of `executor` used below is not visible.
465 RaExecutionSequence QueryRunner::getRaExecutionSequence(const std::string& query_str) {
468  auto query_state = create_query_state(session_info_, query_str);
469  auto& cat = session_info_->getCatalog();
471  auto calcite_mgr = cat.getCalciteMgr();
472  const auto calciteQueryParsingOption =
473  calcite_mgr->getCalciteQueryParsingOption(true, false, true, false);
474  const auto calciteOptimizationOption =
475  calcite_mgr->getCalciteOptimizationOption(false, g_enable_watchdog, {}, false);
476  const auto query_ra = calcite_mgr
477  ->process(query_state->createQueryStateProxy(),
478  pg_shim(query_str),
479  calciteQueryParsingOption,
480  calciteOptimizationOption)
481  .plan_result;
482  auto ra_executor = RelAlgExecutor(executor.get(), query_ra, query_state);
483  return ra_executor.getRaExecutionSequence(ra_executor.getRootRelAlgNodeShPtr().get(),
484  executor.get());
485 }
486 
487 // used to validate calcite ddl statements
// Strips leading newlines and then leading whitespace from a copy of stmt_str_in,
// and passes the result (after pg_shim) to the Calcite manager's process(). The
// returned plan is discarded; the call is made only for its validation effect.
// NOTE(review): embedded listing line 489 is absent from this extract; a statement
// at the top of the body may be missing.
488 void QueryRunner::validateDDLStatement(const std::string& stmt_str_in) {
490 
491  std::string stmt_str = stmt_str_in;
492  // First remove special chars
493  boost::algorithm::trim_left_if(stmt_str, boost::algorithm::is_any_of("\n"));
494  // Then remove spaces
495  boost::algorithm::trim_left(stmt_str);
496 
497  auto query_state = create_query_state(session_info_, stmt_str);
498  auto stdlog = STDLOG(query_state);
499 
500  auto& cat = session_info_->getCatalog();
501  auto calcite_mgr = cat.getCalciteMgr();
502  const auto calciteQueryParsingOption =
503  calcite_mgr->getCalciteQueryParsingOption(true, false, true, false);
504  const auto calciteOptimizationOption =
505  calcite_mgr->getCalciteOptimizationOption(false, g_enable_watchdog, {}, false);
506  calcite_mgr->process(query_state->createQueryStateProxy(),
507  pg_shim(stmt_str),
508  calciteQueryParsingOption,
509  calciteOptimizationOption);
510 }
511 
// Sends query_str (after pg_shim) through the catalog's Calcite manager, builds a
// RelAlgExecutor on the caller-supplied `executor` and returns the translator the
// executor creates for the plan's root relational-algebra node.
// NOTE(review): embedded listing lines 515-516 are absent from this extract;
// statements at the top of the body may be missing.
512 std::shared_ptr<RelAlgTranslator> QueryRunner::getRelAlgTranslator(
513  const std::string& query_str,
514  Executor* executor) {
517  auto query_state = create_query_state(session_info_, query_str);
518  auto& cat = session_info_->getCatalog();
519  auto calcite_mgr = cat.getCalciteMgr();
520  const auto calciteQueryParsingOption =
521  calcite_mgr->getCalciteQueryParsingOption(true, false, true, false);
522  const auto calciteOptimizationOption =
523  calcite_mgr->getCalciteOptimizationOption(false, g_enable_watchdog, {}, false);
524  const auto query_ra = calcite_mgr
525  ->process(query_state->createQueryStateProxy(),
526  pg_shim(query_str),
527  calciteQueryParsingOption,
528  calciteOptimizationOption)
529  .plan_result;
530  auto ra_executor = RelAlgExecutor(executor, query_ra);
531  auto root_node_shared_ptr = ra_executor.getRootRelAlgNodeShPtr();
532  return ra_executor.getRelAlgTranslator(root_node_shared_ptr.get());
533 }
534 
// Sends query_str (after pg_shim) through the catalog's Calcite manager and
// returns a QueryPlanDagInfo built from the plan's root node, both parts of the
// executor's join info for that node, and the node's RelAlgTranslator. The query
// itself is not executed here.
// NOTE(review): embedded listing lines 537-538 and 541 are absent from this
// extract; the declaration of `executor` used below is not visible.
535 QueryPlanDagInfo QueryRunner::getQueryInfoForDataRecyclerTest(
536  const std::string& query_str) {
539  auto query_state = create_query_state(session_info_, query_str);
540  auto& cat = session_info_->getCatalog();
542  auto calcite_mgr = cat.getCalciteMgr();
543  const auto calciteQueryParsingOption =
544  calcite_mgr->getCalciteQueryParsingOption(true, false, true, false);
545  const auto calciteOptimizationOption =
546  calcite_mgr->getCalciteOptimizationOption(false, g_enable_watchdog, {}, false);
547  const auto query_ra = calcite_mgr
548  ->process(query_state->createQueryStateProxy(),
549  pg_shim(query_str),
550  calciteQueryParsingOption,
551  calciteOptimizationOption)
552  .plan_result;
553  auto ra_executor = RelAlgExecutor(executor.get(), query_ra);
554  // note that we assume the test for data recycler that needs to have join_info
555  // does not contain any ORDER BY clause; this is necessary to create work_unit
556  // without actually performing the query
557  auto root_node_shared_ptr = ra_executor.getRootRelAlgNodeShPtr();
558  auto join_info = ra_executor.getJoinInfo(root_node_shared_ptr.get());
559  auto relAlgTranslator = ra_executor.getRelAlgTranslator(root_node_shared_ptr.get());
560  return {root_node_shared_ptr, join_info.first, join_info.second, relAlgTranslator};
561 }
562 
// Builds a Parser::Stmt for a DDL statement.
//
// Leading newlines and whitespace are stripped from a copy of stmt_str_in. When
// ParserWrapper classifies the text as DDL, it is processed by the Calcite manager
// and the resulting JSON plan is turned into a statement with
// Parser::create_stmt_for_json(). Any non-DDL input hits UNREACHABLE().
// NOTE(review): embedded listing lines 565-566 are absent from this extract;
// statements at the top of the body may be missing.
563 std::unique_ptr<Parser::Stmt> QueryRunner::createStatement(
564  const std::string& stmt_str_in) {
567 
568  std::string stmt_str = stmt_str_in;
569  // First remove special chars
570  boost::algorithm::trim_left_if(stmt_str, boost::algorithm::is_any_of("\n"));
571  // Then remove spaces
572  boost::algorithm::trim_left(stmt_str);
573 
574  ParserWrapper pw{stmt_str};
575 
576  auto query_state = create_query_state(session_info_, stmt_str);
577  auto stdlog = STDLOG(query_state);
578 
579  if (pw.is_ddl) {
580  const auto& cat = session_info_->getCatalog();
581  auto calcite_mgr = cat.getCalciteMgr();
582  const auto calciteQueryParsingOption =
583  calcite_mgr->getCalciteQueryParsingOption(true, false, true, false);
584  const auto calciteOptimizationOption =
585  calcite_mgr->getCalciteOptimizationOption(false, g_enable_watchdog, {}, false);
586  const auto query_json = calcite_mgr
587  ->process(query_state->createQueryStateProxy(),
588  pg_shim(stmt_str),
589  calciteQueryParsingOption,
590  calciteOptimizationOption)
591  .plan_result;
592  return Parser::create_stmt_for_json(query_json);
593  }
594 
595  // simply fail here as non-Calcite parsing is about to be removed
596  UNREACHABLE();
597  return nullptr;
598 }
599 
// Executes a DDL statement or an INSERT.
//
// Leading newlines and whitespace are stripped from a copy of stmt_str_in, and the
// text is processed by the Calcite manager when ParserWrapper classifies it as DDL
// or as an Insert.
//  - Insert: the plan is parsed as JSON, CHECKed to contain an object-valued
//    "payload" member, and executed as a Parser::InsertValuesStmt.
//  - Other DDL: run through `executor.execute(...)`.
// Input that is neither DDL nor Insert falls through and nothing is executed.
// NOTE(review): embedded listing lines 601-602 and 637 are absent from this
// extract; the declaration of the `executor` object used at 638 is not visible.
600 void QueryRunner::runDDLStatement(const std::string& stmt_str_in) {
603 
604  std::string stmt_str = stmt_str_in;
605  // First remove special chars
606  boost::algorithm::trim_left_if(stmt_str, boost::algorithm::is_any_of("\n"));
607  // Then remove spaces
608  boost::algorithm::trim_left(stmt_str);
609 
610  ParserWrapper pw{stmt_str};
611 
612  auto query_state = create_query_state(session_info_, stmt_str);
613  auto stdlog = STDLOG(query_state);
614 
615  if (pw.is_ddl || pw.getDMLType() == ParserWrapper::DMLType::Insert) {
616  auto& cat = session_info_->getCatalog();
617  auto calcite_mgr = cat.getCalciteMgr();
618  const auto calciteQueryParsingOption =
619  calcite_mgr->getCalciteQueryParsingOption(true, false, true, false);
620  const auto calciteOptimizationOption =
621  calcite_mgr->getCalciteOptimizationOption(false, g_enable_watchdog, {}, false);
622  const auto query_ra = calcite_mgr
623  ->process(query_state->createQueryStateProxy(),
624  pg_shim(stmt_str),
625  calciteQueryParsingOption,
626  calciteOptimizationOption)
627  .plan_result;
628  if (pw.getDMLType() == ParserWrapper::DMLType::Insert) {
629  rapidjson::Document ddl_query;
630  ddl_query.Parse(query_ra);
631  CHECK(ddl_query.HasMember("payload"));
632  CHECK(ddl_query["payload"].IsObject());
633  auto stmt = Parser::InsertValuesStmt(cat, ddl_query["payload"].GetObject());
634  stmt.execute(*session_info_, false /* read only */);
635  return;
636  }
638  executor.execute(false /* read only */);
639  return;
640  }
641 }
642 
// Runs a single SQL statement with explicit compilation/execution options.
// An INSERT (as classified by ParserWrapper) is routed to runDDLStatement() and
// yields nullptr; anything else is run through runSelectQuery() and its rows are
// returned.
// NOTE(review): embedded listing lines 644 and 646-647 are absent from this
// extract; the declaration of the `co` parameter moved from below is not visible.
643 std::shared_ptr<ResultSet> QueryRunner::runSQL(const std::string& query_str,
645  ExecutionOptions eo) {
648 
649  ParserWrapper pw{query_str};
650  if (pw.getDMLType() == ParserWrapper::DMLType::Insert) {
651  runDDLStatement(query_str);
652  return nullptr;
653  }
654  const auto execution_result = runSelectQuery(query_str, std::move(co), std::move(eo));
655  return execution_result->getRows();
656 }
657 
658 std::shared_ptr<ResultSet> QueryRunner::runSQL(const std::string& query_str,
659  const ExecutorDeviceType device_type,
660  const bool hoist_literals,
661  const bool allow_loop_joins) {
662  auto co = CompilationOptions::defaults(device_type);
663  co.hoist_literals = hoist_literals;
664  return runSQL(
665  query_str, std::move(co), defaultExecutionOptionsForRunSQL(allow_loop_joins));
666 }
667 
// Builds the ExecutionOptions used by the convenience runSQL()/runSelectQuery()
// overloads via positional aggregate initialization. Only g_enable_columnar_output,
// just_explain and allow_loop_joins vary; every other visible field is a fixed
// literal.
// NOTE(review): the initializer is positional, so each value's meaning depends on
// the field order of ExecutionOptions, which is not visible here. Embedded listing
// line 682 (one initializer between the two `false` entries at 681 and 683) is
// absent from this extract -- confirm against the original file.
668 ExecutionOptions QueryRunner::defaultExecutionOptionsForRunSQL(bool allow_loop_joins,
669  bool just_explain) {
670  return {g_enable_columnar_output,
671  false,
672  true,
673  just_explain,
674  allow_loop_joins,
675  false,
676  false,
677  false,
678  false,
679  10000,
680  false,
681  false,
683  false,
684  0.5,
685  1000,
686  false};
687 }
688 
// Creates a query state with an empty query string (for STDLOG) and returns
// `executor`.
// NOTE(review): embedded listing lines 690-691 and 694 are absent from this
// extract; the statement that obtains `executor` is not visible.
689 std::shared_ptr<Executor> QueryRunner::getExecutor() const {
692  auto query_state = create_query_state(session_info_, "");
693  auto stdlog = STDLOG(query_state);
695  return executor;
696 }
697 
// Runs query_str on the dispatch queue under a dedicated session so it can be
// interrupted.
//
// A new SessionInfo is created from the current catalog and user with the given
// device_type and session_id. The dispatched task parses the query with Calcite
// while holding calcite_lock, then executes it through a RelAlgExecutor. Before
// submission the query session is enrolled with status PENDING_QUEUE. The caller
// blocks on the task's future, so the lambda's by-reference captures remain valid
// for the task's lifetime. Returns the rows of the execution result.
// NOTE(review): embedded listing lines 704-705, 726, 728, 740, 768, 772 and 774 are
// absent from this extract. The declarations of `co` and `eo` inside the lambda,
// the outer `executor` used for enrollQuerySession, and one enrollQuerySession
// argument are not visible -- confirm against the original file.
698 std::shared_ptr<ResultSet> QueryRunner::runSQLWithAllowingInterrupt(
699  const std::string& query_str,
700  const std::string& session_id,
701  const ExecutorDeviceType device_type,
702  const double running_query_check_freq,
703  const unsigned pending_query_check_freq) {
706  auto current_user = session_info_->get_currentUser();
707  auto session_info = std::make_shared<Catalog_Namespace::SessionInfo>(
708  session_info_->get_catalog_ptr(), current_user, device_type, session_id);
709  auto query_state = create_query_state(session_info, query_str);
710  auto stdlog = STDLOG(query_state);
711  auto& cat = query_state->getConstSessionInfo()->getCatalog();
712  std::string query_ra{""};
713 
714  std::shared_ptr<ExecutionResult> result;
715  auto query_launch_task = std::make_shared<QueryDispatchQueue::Task>(
716  [&cat,
717  &query_ra,
718  &device_type,
719  &query_state,
720  &result,
721  &running_query_check_freq,
722  &pending_query_check_freq,
723  parent_thread_local_ids = logger::thread_local_ids()](const size_t worker_id) {
724  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
725  auto executor = Executor::getExecutor(worker_id);
727 
729  false,
730  true,
731  false,
732  true,
733  false,
734  false,
735  false,
736  false,
737  10000,
738  false,
739  false,
741  true,
742  running_query_check_freq,
743  pending_query_check_freq,
744  false};
745  {
746  // async query initiation for interrupt test
747  // incurs data race warning in TSAN since
748  // calcite_mgr is shared across multiple query threads
749  // so here we lock the manager during query parsing
750  std::lock_guard<std::mutex> calcite_lock_guard(calcite_lock);
751  auto calcite_mgr = cat.getCalciteMgr();
752  const auto calciteQueryParsingOption =
753  calcite_mgr->getCalciteQueryParsingOption(true, false, true, false);
754  const auto calciteOptimizationOption =
755  calcite_mgr->getCalciteOptimizationOption(
756  false, g_enable_watchdog, {}, false);
757  query_ra = calcite_mgr
758  ->process(query_state->createQueryStateProxy(),
759  pg_shim(query_state->getQueryStr()),
760  calciteQueryParsingOption,
761  calciteOptimizationOption)
762  .plan_result;
763  }
764  auto ra_executor = RelAlgExecutor(executor.get(), query_ra, query_state);
765  result = std::make_shared<ExecutionResult>(
766  ra_executor.executeRelAlgQuery(co, eo, false, false, nullptr));
767  });
769  executor->enrollQuerySession(session_id,
770  query_str,
771  query_state->getQuerySubmittedTime(),
773  QuerySessionStatus::QueryStatus::PENDING_QUEUE);
775  dispatch_queue_->submit(query_launch_task, /*is_update_delete=*/false);
// Block until the dispatched task has finished and populated `result`.
776  auto result_future = query_launch_task->get_future();
777  result_future.get();
778  CHECK(result);
779  return result->getRows();
780 }
781 
782 std::vector<std::shared_ptr<ResultSet>> QueryRunner::runMultipleStatements(
783  const std::string& sql,
784  const ExecutorDeviceType dt) {
785  std::vector<std::shared_ptr<ResultSet>> results;
786  // TODO: Need to properly handle escaped semicolons instead of doing a naive split().
787  auto fields = split(sql, ";");
788  for (const auto& field : fields) {
789  auto text = strip(field) + ";";
790  if (text == ";") {
791  continue;
792  }
793 
794  ParserWrapper pw{text};
795  if (pw.is_ddl || pw.getDMLType() == ParserWrapper::DMLType::Insert) {
796  runDDLStatement(text);
797  results.push_back(nullptr);
798  } else {
799  // is not DDL, then assume it's DML and try to execute
800  results.push_back(runSQL(text, dt, true, true));
801  }
802  }
803  return results;
804 }
805 
806 void QueryRunner::runImport(Parser::CopyTableStmt* import_stmt) {
807  CHECK(import_stmt);
808  import_stmt->execute(*session_info_, false /* read only */);
809 }
810 
811 std::unique_ptr<import_export::Loader> QueryRunner::getLoader(
812  const TableDescriptor* td) const {
813  auto cat = getCatalog();
814  return std::make_unique<import_export::Loader>(*cat, td);
815 }
816 
817 namespace {
818 
// Executes the query held by query_state_proxy, with an optional second pass that
// applies filter push-down.
//
// First pass: the query is parsed by Calcite and executed with
// eo.find_push_down_candidates = with_filter_push_down. If the result reports no
// pushed-down filter info, it is returned as is. Otherwise each request's
// input_prev / input_start / input_next is copied into a TFilterPushDownInfo, the
// query is re-parsed with that list in the Calcite optimization options, and it is
// executed again with find_push_down_candidates and just_calcite_explain both
// turned off; that second result is returned.
// NOTE(review): embedded listing lines 828-829, 832-833 and 837 are absent from
// this extract. The declarations of `executor`, `co` and `eo` (and therefore how
// device_type and hoist_literals are applied) are not visible -- confirm against
// the original file.
819 std::shared_ptr<ExecutionResult> run_select_query_with_filter_push_down(
820  QueryStateProxy query_state_proxy,
821  const ExecutorDeviceType device_type,
822  const bool hoist_literals,
823  const bool allow_loop_joins,
824  const bool just_explain,
825  const ExecutorExplainType explain_type,
826  const bool with_filter_push_down) {
827  auto& cat = query_state_proxy->getConstSessionInfo()->getCatalog();
830  co.explain_type = explain_type;
831 
834  eo.just_explain = just_explain;
835  eo.allow_loop_joins = allow_loop_joins;
836  eo.find_push_down_candidates = with_filter_push_down;
838 
839  auto calcite_mgr = cat.getCalciteMgr();
840  const auto calciteQueryParsingOption =
841  calcite_mgr->getCalciteQueryParsingOption(true, false, true, false);
842  auto calciteOptimizationOption =
843  calcite_mgr->getCalciteOptimizationOption(false, g_enable_watchdog, {}, false);
844  const auto query_ra = calcite_mgr
845  ->process(query_state_proxy,
846  pg_shim(query_state_proxy->getQueryStr()),
847  calciteQueryParsingOption,
848  calciteOptimizationOption)
849  .plan_result;
850  auto ra_executor = RelAlgExecutor(executor.get(), query_ra);
851  auto result = std::make_shared<ExecutionResult>(
852  ra_executor.executeRelAlgQuery(co, eo, false, false, nullptr));
853  const auto& filter_push_down_requests = result->getPushedDownFilterInfo();
854  if (!filter_push_down_requests.empty()) {
// Translate the executor's push-down requests into the Thrift form Calcite expects.
855  std::vector<TFilterPushDownInfo> filter_push_down_info;
856  for (const auto& req : filter_push_down_requests) {
857  TFilterPushDownInfo filter_push_down_info_for_request;
858  filter_push_down_info_for_request.input_prev = req.input_prev;
859  filter_push_down_info_for_request.input_start = req.input_start;
860  filter_push_down_info_for_request.input_next = req.input_next;
861  filter_push_down_info.push_back(filter_push_down_info_for_request);
862  }
863  calciteOptimizationOption.filter_push_down_info = filter_push_down_info;
864  const auto new_query_ra = calcite_mgr
865  ->process(query_state_proxy,
866  pg_shim(query_state_proxy->getQueryStr()),
867  calciteQueryParsingOption,
868  calciteOptimizationOption)
869  .plan_result;
// Second pass runs the rewritten plan for real: no candidate search, no explain.
870  auto eo_modified = eo;
871  eo_modified.find_push_down_candidates = false;
872  eo_modified.just_calcite_explain = false;
873  auto new_ra_executor = RelAlgExecutor(executor.get(), new_query_ra);
874  return std::make_shared<ExecutionResult>(
875  new_ra_executor.executeRelAlgQuery(co, eo_modified, false, false, nullptr));
876  } else {
877  return result;
878  }
879 }
880 
881 } // namespace
882 
// Returns the Calcite plan for query_str wrapped in a ResultSet.
//
// The work runs as a task on the dispatch queue: the query (after pg_shim) is
// processed by the Calcite manager with `is_explain` set to
// !is_explain_as_json_str and the given is_explain_detailed / enable_watchdog
// flags, and the resulting plan string is used to construct the ResultSet. The
// caller blocks on the task's future, so the lambda's by-reference captures remain
// valid while it runs.
// NOTE(review): embedded listing lines 887-888 and 921 are absent from this
// extract; statements at the top of the body and before submit() may be missing.
883 std::shared_ptr<ResultSet> QueryRunner::getCalcitePlan(const std::string& query_str,
884  bool enable_watchdog,
885  bool is_explain_as_json_str,
886  bool is_explain_detailed) const {
889  const auto& cat = session_info_->getCatalog();
890  auto query_state = create_query_state(session_info_, query_str);
891  auto stdlog = STDLOG(query_state);
892 
893  std::shared_ptr<ResultSet> result;
894  auto query_launch_task = std::make_shared<QueryDispatchQueue::Task>(
895  [&cat,
896  &query_str,
897  &enable_watchdog,
898  &is_explain_as_json_str,
899  &is_explain_detailed,
900  &query_state,
901  &result,
902  parent_thread_local_ids = logger::thread_local_ids()](const size_t worker_id) {
903  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
904  auto executor = Executor::getExecutor(worker_id);
905  auto calcite_mgr = cat.getCalciteMgr();
906  // Calcite returns its plan as a form of `json_str` by default,
907  // so we set `is_explain` to TRUE if `!is_explain_as_json_str`
908  const auto calciteQueryParsingOption = calcite_mgr->getCalciteQueryParsingOption(
909  true, !is_explain_as_json_str, false, is_explain_detailed);
910  const auto calciteOptimizationOption = calcite_mgr->getCalciteOptimizationOption(
911  g_enable_calcite_view_optimize, enable_watchdog, {}, false);
912  const auto query_ra = calcite_mgr
913  ->process(query_state->createQueryStateProxy(),
914  pg_shim(query_str),
915  calciteQueryParsingOption,
916  calciteOptimizationOption)
917  .plan_result;
918  result = std::make_shared<ResultSet>(query_ra);
919  return result;
920  });
922  dispatch_queue_->submit(query_launch_task, /*is_update_delete=*/false);
// Block until the dispatched task has finished and populated `result`.
923  auto result_future = query_launch_task->get_future();
924  result_future.get();
925  CHECK(result);
926  return result;
927 }
928 
// Runs a SELECT-style query and returns its ExecutionResult.
//
// One visible path delegates to run_select_query_with_filter_push_down(). The other
// dispatches a task that resets `co` to the defaults for its device type (see the
// TODO below), applies this runner's explain_type_, parses the query with Calcite
// and executes it through a RelAlgExecutor. The caller blocks on the task's
// future, so the lambda's by-reference captures remain valid while it runs.
// NOTE(review): embedded listing lines 930, 932-933, 936, 942-943, 969 and 980 are
// absent from this extract. The `co` parameter declaration, the condition guarding
// the filter-push-down path (closed by the brace at 944), the remaining arguments
// of that call, and the arguments of getCalciteOptimizationOption() are not
// visible -- confirm against the original file.
929 std::shared_ptr<ExecutionResult> QueryRunner::runSelectQuery(const std::string& query_str,
931  ExecutionOptions eo) {
934  auto query_state = create_query_state(session_info_, query_str);
935  auto stdlog = STDLOG(query_state);
937  return run_select_query_with_filter_push_down(query_state->createQueryStateProxy(),
938  co.device_type,
939  co.hoist_literals,
940  eo.allow_loop_joins,
941  eo.just_explain,
944  }
945 
946  auto& cat = session_info_->getCatalog();
947 
948  std::shared_ptr<ExecutionResult> result;
949  auto query_launch_task = std::make_shared<QueryDispatchQueue::Task>(
950  [&cat,
951  &query_str,
952  &co,
953  explain_type = this->explain_type_,
954  &eo,
955  &query_state,
956  &result,
957  parent_thread_local_ids = logger::thread_local_ids()](const size_t worker_id) {
958  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
959  auto executor = Executor::getExecutor(worker_id);
960  // TODO The next line should be deleted since it overwrites co, but then
961  // NycTaxiTest.RunSelectsEncodingDictWhereGreater fails due to co not getting
962  // reset to its default values.
963  co = CompilationOptions::defaults(co.device_type);
964  co.explain_type = explain_type;
965  auto calcite_mgr = cat.getCalciteMgr();
966  const auto calciteQueryParsingOption =
967  calcite_mgr->getCalciteQueryParsingOption(true, false, true, false);
968  const auto calciteOptimizationOption = calcite_mgr->getCalciteOptimizationOption(
970  const auto query_ra = calcite_mgr
971  ->process(query_state->createQueryStateProxy(),
972  pg_shim(query_str),
973  calciteQueryParsingOption,
974  calciteOptimizationOption)
975  .plan_result;
976  auto ra_executor = RelAlgExecutor(executor.get(), query_ra);
977  result = std::make_shared<ExecutionResult>(
978  ra_executor.executeRelAlgQuery(co, eo, false, false, nullptr));
979  });
981  dispatch_queue_->submit(query_launch_task, /*is_update_delete=*/false);
// Block until the dispatched task has finished and populated `result`.
982  auto result_future = query_launch_task->get_future();
983  result_future.get();
984  CHECK(result);
985  return result;
986 }
987 
988 std::shared_ptr<ExecutionResult> QueryRunner::runSelectQuery(
989  const std::string& query_str,
990  const ExecutorDeviceType device_type,
991  const bool hoist_literals,
992  const bool allow_loop_joins,
993  const bool just_explain) {
994  auto co = CompilationOptions::defaults(device_type);
995  co.hoist_literals = hoist_literals;
996  return runSelectQuery(query_str,
997  std::move(co),
998  defaultExecutionOptionsForRunSQL(allow_loop_joins, just_explain));
999 }
1000 
// Returns the extracted query plan DAG for `query_str`: obtains the query's
// DAG info via getQueryInfoForDataRecyclerTest() and passes its root node to
// QueryPlanDagExtractor::extractQueryPlanDag().
// NOTE(review): `executor` is not declared in the visible lines — its
// definition is presumably on a line missing from this listing; confirm.
1001 ExtractedQueryPlanDag QueryRunner::extractQueryPlanDag(const std::string& query_str) {
1002  auto query_dag_info = getQueryInfoForDataRecyclerTest(query_str);
1004  auto extracted_dag_info = QueryPlanDagExtractor::extractQueryPlanDag(
1005  query_dag_info.root_node.get(), executor);
1006  return extracted_dag_info;
1007 }
1008 
// Plans `query_str` through the catalog's Calcite manager on a dispatch-queue
// worker, wraps the plan in a RelAlgExecutor, and returns the RelAlgDag that
// the executor hands over via getOwnedRelAlgDag().
// NOTE(review): some lines (including the arguments passed to
// getCalciteOptimizationOption()) are not visible in this listing — confirm
// against the original file.
1009 std::unique_ptr<RelAlgDag> QueryRunner::getRelAlgDag(const std::string& query_str) {
1012  auto query_state = create_query_state(session_info_, query_str);
1013  auto stdlog = STDLOG(query_state);
1014  auto& cat = session_info_->getCatalog();
1015 
      // Filled in by the task below; the lambda captures these locals by
      // reference and this function waits on the task's future before returning.
1016  std::unique_ptr<RelAlgDag> rel_alg_dag;
1017  auto query_launch_task = std::make_shared<QueryDispatchQueue::Task>(
1018  [&cat,
1019  &query_str,
1020  &query_state,
1021  &rel_alg_dag,
1022  parent_thread_local_ids = logger::thread_local_ids()](const size_t worker_id) {
1023  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
1024  auto executor = Executor::getExecutor(worker_id);
1025  auto eo = ExecutionOptions::defaults();
1026  auto calcite_mgr = cat.getCalciteMgr();
1027  const auto calciteQueryParsingOption =
1028  calcite_mgr->getCalciteQueryParsingOption(true, false, true, false);
1029  const auto calciteOptimizationOption = calcite_mgr->getCalciteOptimizationOption(
1031  const auto query_ra = calcite_mgr
1032  ->process(query_state->createQueryStateProxy(),
1033  pg_shim(query_str),
1034  calciteQueryParsingOption,
1035  calciteOptimizationOption)
1036  .plan_result;
1037  auto ra_executor = RelAlgExecutor(executor.get(), query_ra);
1038  rel_alg_dag = ra_executor.getOwnedRelAlgDag();
1039  });
      // Submit as a non-update/delete task and wait for it to finish.
1041  dispatch_queue_->submit(query_launch_task, /*is_update_delete=*/false);
1042  auto result_future = query_launch_task->get_future();
1043  result_future.get();
      // The task must have produced a DAG by now.
1044  CHECK(rel_alg_dag);
1045  return rel_alg_dag;
1046 }
1047 
1048 // this function exists to test data recycler
1049 // specifically, it is tricky to get a hashtable cache key when we only know
1050 // a target query sql in test code
1051 // so this function utilizes an incorrect way to manipulate our hashtable recycler
1052 // but provides the cached hashtable for performing the test
1053 // the set "visited" contains the cached hashtable keys that we have retrieved so far
1054 // based on that, this function iterates over the hashtable cache and returns a cached one
1055 // whose hashtable cache key has not been visited yet
1056 // for instance, if we call this function with an empty "visited" set, we return
1057 // the first hashtable that the iterator visits
1058 std::tuple<QueryPlanHash,
1059  std::shared_ptr<HashTable>,
1060  std::optional<HashtableCacheMetaInfo>>
1061 QueryRunner::getCachedHashtableWithoutCacheKey(std::set<size_t>& visited,
1062  CacheItemType hash_table_type,
1063  DeviceIdentifier device_identifier) {
      // Pick the hashtable recycler that corresponds to `hash_table_type`.
      // NOTE(review): the `case` labels of this switch (and the assignment in
      // its third branch) are not visible in this listing — confirm which
      // CacheItemType each branch handles against the original file.
1064  HashtableRecycler* hash_table_cache{nullptr};
1065  switch (hash_table_type) {
1067  hash_table_cache = PerfectJoinHashTable::getHashTableCache();
1068  break;
1069  }
1071  hash_table_cache = BaselineJoinHashTable::getHashTableCache();
1072  break;
1073  }
1076  break;
1077  }
1078  default: {
1079  UNREACHABLE();
1080  break;
1081  }
1082  }
      // A recycler must have been selected; the lookup itself is delegated to it.
1083  CHECK(hash_table_cache);
1084  return hash_table_cache->getCachedHashtableWithoutCacheKey(
1085  visited, hash_table_type, device_identifier);
1086 }
1087 
// Returns the cache metric recorded for `cache_key` by asking the hashtable
// recycler selected by `hash_table_type` (getCachedItemMetric) for the given
// `device_identifier`.
// NOTE(review): the `case` labels of the switch (and the assignment in its
// third branch) are not visible in this listing — confirm against the
// original file.
1088 std::shared_ptr<CacheItemMetric> QueryRunner::getCacheItemMetric(
1089  QueryPlanHash cache_key,
1090  CacheItemType hash_table_type,
1091  DeviceIdentifier device_identifier) {
1092  HashtableRecycler* hash_table_cache{nullptr};
1093  switch (hash_table_type) {
1095  hash_table_cache = PerfectJoinHashTable::getHashTableCache();
1096  break;
1097  }
1099  hash_table_cache = BaselineJoinHashTable::getHashTableCache();
1100  break;
1101  }
1104  break;
1105  }
1106  default: {
1107  UNREACHABLE();
1108  break;
1109  }
1110  }
      // A recycler must have been selected before it is queried.
1111  CHECK(hash_table_cache);
1112  return hash_table_cache->getCachedItemMetric(
1113  hash_table_type, device_identifier, cache_key);
1114 }
1115 
// Returns how many cached items match `item_status` in the cache selected by
// `hash_table_type`. Hashtable counts are queried with
// DataRecyclerUtil::CPU_DEVICE_IDENTIFIER. When
// `with_bbox_intersect_tuning_param` is true, the count of the auto-tuner
// parameter cache is added to the hashtable count; that combination is
// CHECKed to be used only with CacheItemType::BBOX_INTERSECT_HT.
// NOTE(review): several lines (some `case` labels, the initializer of
// `auto_tuner_cache`, and the arguments of the auto-tuner count calls) are not
// visible in this listing — confirm against the original file.
1116 size_t QueryRunner::getNumberOfCachedItem(CacheItemStatus item_status,
1117  CacheItemType hash_table_type,
1118  bool with_bbox_intersect_tuning_param) const {
      // Helper: number of items in the auto-tuner parameter cache for `item_status`.
1119  auto get_num_cached_auto_tuner_param = [&item_status]() {
1120  auto auto_tuner_cache =
1122  CHECK(auto_tuner_cache);
1123  switch (item_status) {
1124  case CacheItemStatus::ALL: {
1125  return auto_tuner_cache->getCurrentNumCachedItems(
1128  }
1130  return auto_tuner_cache->getCurrentNumCleanCachedItems(
1133  }
1135  return auto_tuner_cache->getCurrentNumDirtyCachedItems(
1138  }
1139  default: {
1140  UNREACHABLE();
1141  return static_cast<size_t>(0);
1142  }
1143  }
1144  };
1145 
      // Helper: number of items in `hash_table_cache` for `item_status`, plus the
      // auto-tuner parameter count when `with_bbox_intersect_tuning_param` is set.
1146  auto get_num_cached_hashtable =
1147  [&item_status,
1148  &hash_table_type,
1149  &with_bbox_intersect_tuning_param,
1150  &get_num_cached_auto_tuner_param](HashtableRecycler* hash_table_cache) {
1151  switch (item_status) {
1152  case CacheItemStatus::ALL: {
1153  if (with_bbox_intersect_tuning_param) {
1154  // we assume additional consideration of the tuning param cache is only valid
1155  // for bounding box intersection
1156  CHECK_EQ(hash_table_type, CacheItemType::BBOX_INTERSECT_HT);
1157  return hash_table_cache->getCurrentNumCachedItems(
1158  hash_table_type, DataRecyclerUtil::CPU_DEVICE_IDENTIFIER) +
1159  get_num_cached_auto_tuner_param();
1160  }
1161  return hash_table_cache->getCurrentNumCachedItems(
1162  hash_table_type, DataRecyclerUtil::CPU_DEVICE_IDENTIFIER);
1163  }
1165  if (with_bbox_intersect_tuning_param) {
1166  CHECK_EQ(hash_table_type, CacheItemType::BBOX_INTERSECT_HT);
1167  return hash_table_cache->getCurrentNumCleanCachedItems(
1168  hash_table_type, DataRecyclerUtil::CPU_DEVICE_IDENTIFIER) +
1169  get_num_cached_auto_tuner_param();
1170  }
1171  return hash_table_cache->getCurrentNumCleanCachedItems(
1172  hash_table_type, DataRecyclerUtil::CPU_DEVICE_IDENTIFIER);
1173  }
1175  if (with_bbox_intersect_tuning_param) {
1176  CHECK_EQ(hash_table_type, CacheItemType::BBOX_INTERSECT_HT);
1177  return hash_table_cache->getCurrentNumDirtyCachedItems(
1178  hash_table_type, DataRecyclerUtil::CPU_DEVICE_IDENTIFIER) +
1179  get_num_cached_auto_tuner_param();
1180  }
1181  return hash_table_cache->getCurrentNumDirtyCachedItems(
1182  hash_table_type, DataRecyclerUtil::CPU_DEVICE_IDENTIFIER);
1183  }
1184  default: {
1185  UNREACHABLE();
1186  return static_cast<size_t>(0);
1187  }
1188  }
1189  };
1190 
      // Dispatch on the cache type: the three hashtable branches query their own
      // recycler; the remaining visible branch returns the auto-tuner count alone.
1191  switch (hash_table_type) {
1193  auto hash_table_cache = PerfectJoinHashTable::getHashTableCache();
1194  CHECK(hash_table_cache);
1195  return get_num_cached_hashtable(hash_table_cache);
1196  }
1198  auto hash_table_cache = BaselineJoinHashTable::getHashTableCache();
1199  CHECK(hash_table_cache);
1200  return get_num_cached_hashtable(hash_table_cache);
1201  }
1203  auto hash_table_cache = BoundingBoxIntersectJoinHashTable::getHashTableCache();
1204  CHECK(hash_table_cache);
1205  return get_num_cached_hashtable(hash_table_cache);
1206  }
1208  return get_num_cached_auto_tuner_param();
1209  }
1210  default: {
1211  UNREACHABLE();
1212  return 0;
1213  }
1214  }
1215  return 0;
1216 }
1217 
// Tears down the singleton: releases the instance's query engine, then
// destroys the QueryRunner instance held in qr_instance_.
// NOTE(review): qr_instance_ is dereferenced without a null check, so calling
// reset() when no instance exists would dereference a null pointer — confirm
// that callers only invoke this after initialization.
// NOTE(review): one line of this function is not visible in this listing.
1218 void QueryRunner::reset() {
1219  qr_instance_->query_engine_.reset();
1220  qr_instance_.reset(nullptr);
1222 }
1223 
// Constructs an ImportDriver by building a Catalog_Namespace::SessionInfo from
// the given catalog, user, device type and session id, and passing it to the
// QueryRunner base-class constructor. The constructor body itself is empty.
1224 ImportDriver::ImportDriver(std::shared_ptr<Catalog_Namespace::Catalog> cat,
1225  const Catalog_Namespace::UserMetadata& user,
1226  const ExecutorDeviceType dt,
1227  const std::string session_id)
1228  : QueryRunner(
1229  std::make_unique<Catalog_Namespace::SessionInfo>(cat, user, dt, session_id)) {}
1230 
// Imports the geo file at `file_path` into table `table_name` using the GDAL
// importer (non-raster path, kIsGeoRaster == false).
//   compression         - true: geo coords use kENCODING_GEOINT with comp param 32;
//                         false: kENCODING_NONE with comp param 0.
//   create_table        - true: build and run a CREATE TABLE statement from the
//                         column descriptors detected in the file.
//   explode_collections - copied into copy_params.geo_explode_collections.
// Throws std::runtime_error when (with create_table) the table already exists,
// the table name contains characters that sanitize_name() would change, or a
// column has an INTERVAL type; and when the table cannot be found before import.
// NOTE(review): two lines near the top of the body are not visible in this
// listing — confirm against the original file.
1231 void ImportDriver::importGeoTable(const std::string& file_path,
1232  const std::string& table_name,
1233  const bool compression,
1234  const bool create_table,
1235  const bool explode_collections) {
1236  using namespace import_export;
1237 
1238  static constexpr bool kIsGeoRaster{false};
1239 
1241 
1242  CopyParams copy_params;
1244  if (compression) {
1245  copy_params.geo_coords_encoding = EncodingType::kENCODING_GEOINT;
1246  copy_params.geo_coords_comp_param = 32;
1247  } else {
1248  copy_params.geo_coords_encoding = EncodingType::kENCODING_NONE;
1249  copy_params.geo_coords_comp_param = 0;
1250  }
1251  copy_params.geo_explode_collections = explode_collections;
1252 
      // Maps sanitized column name -> original column name found in the file.
1253  std::map<std::string, std::string> colname_to_src;
1254  auto& cat = session_info_->getCatalog();
1255  auto cds = Importer::gdalToColumnDescriptors(
1256  file_path, kIsGeoRaster, Geospatial::kGeoColumnName, copy_params);
1257 
      // Sanitize every column name; the CHECK fails if two columns sanitize to
      // the same name (insert() reports a duplicate key via ret.second == false).
1258  for (auto& cd : cds) {
1259  const auto col_name_sanitized = ImportHelpers::sanitize_name(cd.columnName);
1260  const auto ret =
1261  colname_to_src.insert(std::make_pair(col_name_sanitized, cd.columnName));
1262  CHECK(ret.second);
1263  cd.columnName = col_name_sanitized;
1264  }
1265 
1266  if (create_table) {
1267  const auto td = cat.getMetadataForTable(table_name);
1268  if (td != nullptr) {
1269  throw std::runtime_error(
1270  "Error: Table " + table_name +
1271  " already exists. Possible failure to correctly re-create " +
1272  shared::kDataDirectoryName + " directory.");
1273  }
      // Reject table names that sanitize_name() would alter.
1274  if (table_name != ImportHelpers::sanitize_name(table_name)) {
1275  throw std::runtime_error("Invalid characters in table name: " + table_name);
1276  }
1277 
      // Assemble "CREATE TABLE <name> (<col defs>);" one column at a time.
1278  std::string stmt{"CREATE TABLE " + table_name};
1279  std::vector<std::string> col_stmts;
1280 
1281  for (auto& cd : cds) {
1282  if (cd.columnType.get_type() == SQLTypes::kINTERVAL_DAY_TIME ||
1283  cd.columnType.get_type() == SQLTypes::kINTERVAL_YEAR_MONTH) {
1284  throw std::runtime_error(
1285  "Unsupported type: INTERVAL_DAY_TIME or INTERVAL_YEAR_MONTH for col " +
1286  cd.columnName + " (table: " + table_name + ")");
1287  }
1288 
      // DECIMAL columns reported with precision 0 and scale 0 are given
      // precision 14 and scale 7.
1289  if (cd.columnType.get_type() == SQLTypes::kDECIMAL) {
1290  if (cd.columnType.get_precision() == 0 && cd.columnType.get_scale() == 0) {
1291  cd.columnType.set_precision(14);
1292  cd.columnType.set_scale(7);
1293  }
1294  }
1295 
1296  std::string col_stmt;
1297  col_stmt.append(cd.columnName + " " + cd.columnType.get_type_name() + " ");
1298 
      // Emit an explicit ENCODING clause: the column's own compression if it has
      // one; otherwise "ENCODING NONE" for strings and for geometry columns
      // whose output SRID is 4326.
1299  if (cd.columnType.get_compression() != EncodingType::kENCODING_NONE) {
1300  col_stmt.append("ENCODING " + cd.columnType.get_compression_name() + " ");
1301  } else {
1302  if (cd.columnType.is_string()) {
1303  col_stmt.append("ENCODING NONE");
1304  } else if (cd.columnType.is_geometry()) {
1305  if (cd.columnType.get_output_srid() == 4326) {
1306  col_stmt.append("ENCODING NONE");
1307  }
1308  }
1309  }
1310  col_stmts.push_back(col_stmt);
1311  }
1312 
1313  stmt.append(" (" + boost::algorithm::join(col_stmts, ",") + ");");
1314  runDDLStatement(stmt);
1315 
1316  LOG(INFO) << "Created table: " << table_name;
1317  } else {
1318  LOG(INFO) << "Not creating table: " << table_name;
1319  }
1320 
      // Whether or not the table was just created, it must exist before import.
1321  const auto td = cat.getMetadataForTable(table_name);
1322  if (td == nullptr) {
1323  throw std::runtime_error("Error: Failed to create table " + table_name);
1324  }
1325 
      // Run the GDAL import and log the measured duration; `ms` is divided by
      // 1000.0 and reported in seconds.
1326  import_export::Importer importer(cat, td, file_path, copy_params);
1327  auto ms = measure<>::execution(
1328  [&]() { importer.importGDAL(colname_to_src, session_info_.get(), kIsGeoRaster); });
1329  LOG(INFO) << "Import Time for " << table_name << ": " << (double)ms / 1000.0 << " s";
1330 }
1331 
1332 } // namespace QueryRunner
bool g_enable_calcite_view_optimize
Definition: QueryRunner.cpp:59
Classes used to wrap parser calls for calcite redirection.
static void addUdfs(const std::string &json_func_sigs)
#define CHECK_EQ(x, y)
Definition: Logger.h:301
ImportStatus importGDAL(const std::map< std::string, std::string > &colname_to_src, const Catalog_Namespace::SessionInfo *session_info, const bool is_raster)
Definition: Importer.cpp:5226
#define CALCITEPORT
Definition: QueryRunner.cpp:51
size_t DeviceIdentifier
Definition: DataRecycler.h:129
const std::string kDataDirectoryName
static std::vector< TableFunction > get_table_funcs()
static ExtractedQueryPlanDag extractQueryPlanDag(const RelAlgNode *top_node, Executor *executor)
static void loadTestRuntimeLibs()
std::string cat(Ts &&...args)
std::unique_ptr< QueryDispatchQueue > dispatch_queue_
Definition: QueryRunner.h:332
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
std::string const & getQueryStr() const
Definition: QueryState.h:159
static void initialize(const std::string &data_dir, const std::string &allowed_import_paths, const std::string &allowed_export_paths)
Definition: DdlUtils.cpp:878
const std::string kDefaultDiskCacheDirName
static void loadRuntimeLibs(const std::string &torch_lib_path=std::string())
std::string strip(std::string_view str)
trim any whitespace from the left and right ends of a string
ImportDriver(std::shared_ptr< Catalog_Namespace::Catalog > cat, const Catalog_Namespace::UserMetadata &user, const ExecutorDeviceType dt=ExecutorDeviceType::GPU, const std::string session_id="")
#define LOG(tag)
Definition: Logger.h:285
ExecutorExplainType explain_type_
Definition: QueryRunner.h:328
std::string join(T const &container, std::string const &delim)
static void add(const std::string &json_func_sigs)
#define UNREACHABLE()
Definition: Logger.h:338
std::mutex calcite_lock
Definition: QueryRunner.cpp:60
std::optional< std::unordered_map< size_t, std::unordered_map< unsigned, RegisteredQueryHint > > > getParsedQueryHints()
std::optional< RegisteredQueryHint > getParsedQueryHint(const RelAlgNode *node)
static void init_resource_mgr(const size_t num_cpu_slots, const size_t num_gpu_slots, const size_t cpu_result_mem, const size_t cpu_buffer_pool_mem, const size_t gpu_buffer_pool_mem, const double per_query_max_cpu_slots_ratio, const double per_query_max_cpu_result_mem_ratio, const bool allow_cpu_kernel_concurrency, const bool allow_cpu_gpu_kernel_concurrency, const bool allow_cpu_slot_oversubscription_concurrency, const bool allow_cpu_result_mem_oversubscription, const double max_available_resource_use_ratio)
Definition: Execute.cpp:5387
void set_once_fatal_func(FatalFunc fatal_func)
Definition: Logger.cpp:394
Catalog_Namespace::DBMetadata db_metadata_
Definition: QueryRunner.h:330
static ExecutionOptions defaultExecutionOptionsForRunSQL(bool allow_loop_joins=true, bool just_explain=false)
ExecutorDeviceType
virtual std::shared_ptr< ResultSet > runSQL(const std::string &query_str, CompilationOptions co, ExecutionOptions eo)
const std::string kGeoColumnName
Definition: ColumnNames.h:23
std::vector< std::string > split(std::string_view str, std::string_view delim, std::optional< size_t > maxsplit)
split apart a string into a vector of substrings
#define LOG_IF(severity, condition)
Definition: Logger.h:384
static void clearMemory(const Data_Namespace::MemoryLevel memory_level)
Definition: Execute.cpp:535
static void addShutdownCallback(std::function< void()> shutdown_callback)
std::shared_ptr< const RelAlgNode > getRootRelAlgNodeShPtr() const
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:513
std::shared_ptr< QueryEngine > query_engine_
Definition: QueryRunner.h:333
bool g_enable_executor_resource_mgr
Definition: Execute.cpp:178
This file contains the class specification and related data structures for Catalog.
RaExecutionSequence getRaExecutionSequence(const RelAlgNode *root_node, Executor *executor)
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:33
bool g_enable_columnar_output
Definition: Execute.cpp:106
Supported runtime functions management and retrieval.
static SysCatalog & instance()
Definition: SysCatalog.h:343
void execute(const Catalog_Namespace::SessionInfo &session, bool read_only_mode) override
Classes representing a parse tree.
CacheItemType
Definition: DataRecycler.h:38
const std::string kDefaultDbName
std::string g_base_path
Definition: SysCatalog.cpp:62
void init(LogOptions const &log_opts)
Definition: Logger.cpp:364
std::unique_ptr< Parser::Stmt > create_stmt_for_json(const std::string &query_json)
static HashtableRecycler * getHashTableCache()
static std::shared_ptr< QueryEngine > createInstance(CudaMgr_Namespace::CudaMgr *cuda_mgr, bool cpu_only)
Definition: QueryEngine.h:97
A container for relational algebra descriptors defining the execution order for a relational algebra ...
ExecutorExplainType explain_type
bool g_enable_watchdog
virtual void runDDLStatement(const std::string &)
static std::unique_ptr< QueryRunner > qr_instance_
Definition: QueryRunner.h:326
double g_gpu_mem_limit_percent
Definition: QueryRunner.cpp:56
import_export::SourceType source_type
Definition: CopyParams.h:57
bool g_serialize_temp_tables
Definition: Catalog.cpp:109
ExecutorDeviceType device_type
std::optional< RegisteredQueryHint > getGlobalQueryHint()
void importGeoTable(const std::string &file_path, const std::string &table_name, const bool compression, const bool create_table, const bool explode_collections)
const std::string kRootUsername
static RegisteredQueryHint defaults()
Definition: QueryHint.h:379
TExtArgumentType::type to_thrift(const ExtArgumentType &t)
std::shared_ptr< ExecutionResult > run_select_query_with_filter_push_down(QueryStateProxy query_state_proxy, const ExecutorDeviceType device_type, const bool hoist_literals, const bool allow_loop_joins, const bool just_explain, const ExecutorExplainType explain_type, const bool with_filter_push_down)
ExecutionResult execute(bool read_only_mode)
bool g_read_only
Definition: heavyai_locks.h:21
std::string sanitize_name(const std::string &name, const bool underscore=false)
std::shared_ptr< Catalog_Namespace::SessionInfo > session_info_
Definition: QueryRunner.h:331
size_t QueryPlanHash
virtual std::shared_ptr< ExecutionResult > runSelectQuery(const std::string &query_str, CompilationOptions co, ExecutionOptions eo)
std::shared_ptr< Calcite > g_calcite
Definition: QueryRunner.cpp:65
const std::string kCatalogDirectoryName
static std::shared_ptr< query_state::QueryState > create_query_state(Ts &&...args)
Definition: QueryRunner.h:304
static CompilationOptions defaults(const ExecutorDeviceType device_type=ExecutorDeviceType::GPU)
bool g_enable_filter_push_down
Definition: Execute.cpp:102
std::string pg_shim(std::string const &query)
QueryPlanDagInfo getQueryInfoForDataRecyclerTest(const std::string &)
#define CHECK(condition)
Definition: Logger.h:291
static HashtableRecycler * getHashTableCache()
double gpu_input_mem_limit_percent
Serializers for query engine types to/from thrift.
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
static constexpr ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:423
size_t g_leaf_count
Definition: ParserNode.cpp:79
void init_table_functions()
std::unique_ptr< RelAlgDag > getOwnedRelAlgDag()
static constexpr DeviceIdentifier CPU_DEVICE_IDENTIFIER
Definition: DataRecycler.h:136
int cpu_threads()
Definition: thread_count.h:25
static ExecutionOptions defaults()
ExecutorExplainType
std::shared_ptr< Catalog_Namespace::SessionInfo const > getConstSessionInfo() const
Definition: QueryState.cpp:84
static BoundingBoxIntersectTuningParamRecycler * getBoundingBoxIntersectTuningParamCache()
ThreadLocalIds thread_local_ids()
Definition: Logger.cpp:882
#define STDLOG(...)
Definition: QueryState.h:234
std::atomic< bool > isSuper
Definition: SysCatalog.h:107