OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DBEngine.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "DBEngine.h"
18 #include <boost/filesystem.hpp>
19 #include <stdexcept>
22 #include "LockMgr/LockMgr.h"
23 #include "Parser/ParserWrapper.h"
25 #include "QueryEngine/Execute.h"
28 #include "Shared/SysDefinitions.h"
31 
32 extern bool g_enable_thrift_logs;
33 extern bool g_enable_union;
34 extern bool g_serialize_temp_tables;
35 
36 namespace EmbeddedDatabase {
37 
38 class DBEngineImpl;
39 
43 class CursorImpl : public Cursor {
44  public:
45  CursorImpl(std::shared_ptr<ResultSet> result_set, std::vector<std::string> col_names)
46  : result_set_(result_set), col_names_(col_names) {}
47 
49  col_names_.clear();
50  record_batch_.reset();
51  result_set_.reset();
52  }
53 
54  size_t getColCount() { return result_set_ ? result_set_->colCount() : 0; }
55 
56  size_t getRowCount() { return result_set_ ? result_set_->rowCount() : 0; }
57 
59  if (result_set_) {
60  auto row = result_set_->getNextRow(true, false);
61  return row.empty() ? Row() : Row(row);
62  }
63  return Row();
64  }
65 
66  ColumnType getColType(uint32_t col_num) {
67  if (col_num < getColCount()) {
68  SQLTypeInfo type_info = result_set_->getColType(col_num);
69  return sqlToColumnType(type_info.get_type());
70  }
71  return ColumnType::UNKNOWN;
72  }
73 
74  std::shared_ptr<arrow::RecordBatch> getArrowRecordBatch() {
75  if (record_batch_) {
76  return record_batch_;
77  }
78  auto col_count = getColCount();
79  if (col_count > 0) {
80  auto row_count = getRowCount();
81  if (row_count > 0) {
82  auto converter =
83  std::make_unique<ArrowResultSetConverter>(result_set_, col_names_, -1);
84  record_batch_ = converter->convertToArrow();
85  return record_batch_;
86  }
87  }
88  return nullptr;
89  }
90 
91  private:
92  std::shared_ptr<ResultSet> result_set_;
93  std::vector<std::string> col_names_;
94  std::shared_ptr<arrow::RecordBatch> record_batch_;
95 };
96 
100 class DBEngineImpl : public DBEngine {
101  public:
103 
105 
106  bool init(const std::string& cmd_line) {
107  static bool initialized{false};
108  if (initialized) {
109  throw std::runtime_error("Database engine already initialized");
110  }
111 
113 
114  // Split the command line into parameters
115  std::vector<std::string> parameters;
116  if (!cmd_line.empty()) {
117  parameters = boost::program_options::split_unix(cmd_line);
118  }
119 
120  // Generate command line to initialize CommandLineOptions for DBHandler
121  const char* log_option = "omnisci_dbe";
122  std::vector<const char*> cstrings;
123  cstrings.push_back(log_option);
124  for (auto& param : parameters) {
125  cstrings.push_back(param.c_str());
126  }
127  int argc = cstrings.size();
128  const char** argv = cstrings.data();
129 
130  CommandLineOptions prog_config_opts(log_option);
131  if (prog_config_opts.parse_command_line(argc, argv, false)) {
132  throw std::runtime_error("DBE paramerameters parsing failed");
133  }
134 
135  if (!g_enable_thrift_logs) {
136  apache::thrift::GlobalOutput.setOutputFunction([](const char* msg) {});
137  }
138 
139  auto base_path = prog_config_opts.base_path;
140 
141  // Check path to the database
142  bool is_new_db = base_path.empty() || !catalogExists(base_path);
143  if (is_new_db) {
144  base_path = createCatalog(base_path);
145  if (base_path.empty()) {
146  throw std::runtime_error("Database directory could not be created");
147  }
148  }
149  prog_config_opts.base_path = base_path;
150  prog_config_opts.init_logging();
151 
152  prog_config_opts.system_parameters.omnisci_server_port = -1;
153  prog_config_opts.system_parameters.calcite_keepalive = true;
154 
155  try {
156  db_handler_ =
157  std::make_shared<DBHandler>(prog_config_opts.db_leaves,
158  prog_config_opts.string_leaves,
159  prog_config_opts.base_path,
160  prog_config_opts.allow_multifrag,
161  prog_config_opts.jit_debug,
162  prog_config_opts.intel_jit_profile,
163  prog_config_opts.read_only,
164  prog_config_opts.allow_loop_joins,
165  prog_config_opts.enable_rendering,
166  prog_config_opts.renderer_prefer_igpu,
167  prog_config_opts.renderer_vulkan_timeout_ms,
168  prog_config_opts.enable_auto_clear_render_mem,
169  prog_config_opts.render_oom_retry_threshold,
170  prog_config_opts.render_mem_bytes,
171  prog_config_opts.max_concurrent_render_sessions,
172  prog_config_opts.reserved_gpu_mem,
173  prog_config_opts.render_compositor_use_last_gpu,
174  prog_config_opts.num_reader_threads,
175  prog_config_opts.authMetadata,
176  prog_config_opts.system_parameters,
177  prog_config_opts.enable_legacy_syntax,
178  prog_config_opts.idle_session_duration,
179  prog_config_opts.max_session_duration,
180  prog_config_opts.udf_file_name,
181  prog_config_opts.udf_compiler_path,
182  prog_config_opts.udf_compiler_options,
183 #ifdef ENABLE_GEOS
184  prog_config_opts.libgeos_so_filename,
185 #endif
186  prog_config_opts.disk_cache_config,
187  is_new_db);
188  } catch (const std::exception& e) {
189  LOG(FATAL) << "Failed to initialize database handler: " << e.what();
190  }
191  db_handler_->connect(session_id_,
195  base_path_ = base_path;
196  initialized = true;
197  return true;
198  }
199 
200  std::shared_ptr<CursorImpl> sql_execute_dbe(const TSessionId& session_id,
201  const std::string& query_str,
202  const bool column_format,
203  const int32_t first_n,
204  const int32_t at_most_n) {
205  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
208  nullptr,
209  0,
210  0),
211  {}};
213  db_handler_->sql_execute(
214  result, session_id, query_str, column_format, first_n, at_most_n, locks);
215  auto& targets = result.getTargetsMeta();
216  std::vector<std::string> col_names;
217  for (const auto target : targets) {
218  col_names.push_back(target.get_resname());
219  }
220  return std::make_shared<CursorImpl>(result.getRows(), col_names);
221  }
222 
223  void executeDDL(const std::string& query) {
224  auto res = sql_execute_dbe(session_id_, query, false, -1, -1);
225  }
226 
227  void importArrowTable(const std::string& name,
228  std::shared_ptr<arrow::Table>& table,
229  uint64_t fragment_size) {
230  setArrowTable(name, table);
231  try {
232  auto session = db_handler_->get_session_copy(session_id_);
233  TableDescriptor td;
234  td.tableName = name;
235  td.userId = session.get_currentUser().userId;
236  td.storageType = "ARROW:" + name;
238  td.isView = false;
239  td.fragmenter = nullptr;
241  td.maxFragRows = fragment_size > 0 ? fragment_size : DEFAULT_FRAGMENT_ROWS;
245  td.keyMetainfo = "[]";
246 
247  std::list<ColumnDescriptor> cols;
248  std::vector<Parser::SharedDictionaryDef> dictionaries;
249  auto catalog = session.get_catalog_ptr();
250  // nColumns
251  catalog->createTable(td, cols, dictionaries, false);
253  session.get_currentUser(), td.tableName, TableDBObjectType, *catalog);
254  } catch (...) {
255  releaseArrowTable(name);
256  throw;
257  }
258  releaseArrowTable(name);
259  }
260 
261  std::shared_ptr<CursorImpl> executeDML(const std::string& query) {
262  return sql_execute_dbe(session_id_, query, false, -1, -1);
263  }
264 
265  std::shared_ptr<CursorImpl> executeRA(const std::string& query) {
266  return sql_execute_dbe(session_id_, query, false, -1, -1);
267  }
268 
269  std::vector<std::string> getTables() {
270  std::vector<std::string> table_names;
271  auto catalog = db_handler_->get_session_copy(session_id_).get_catalog_ptr();
272  if (catalog) {
273  const auto tables = catalog->getAllTableMetadata();
274  for (const auto td : tables) {
275  if (td->shard >= 0) {
276  // skip shards, they're not standalone tables
277  continue;
278  }
279  table_names.push_back(td->tableName);
280  }
281  } else {
282  throw std::runtime_error("System catalog uninitialized");
283  }
284  return table_names;
285  }
286 
287  std::vector<ColumnDetails> getTableDetails(const std::string& table_name) {
288  std::vector<ColumnDetails> result;
289  auto catalog = db_handler_->get_session_copy(session_id_).get_catalog_ptr();
290  if (catalog) {
291  auto metadata = catalog->getMetadataForTable(table_name, false);
292  if (metadata) {
293  const auto col_descriptors =
294  catalog->getAllColumnMetadataForTable(metadata->tableId, false, true, false);
295  const auto deleted_cd = catalog->getDeletedColumn(metadata);
296  for (const auto cd : col_descriptors) {
297  if (cd == deleted_cd) {
298  continue;
299  }
300  ColumnDetails col_details;
301  col_details.col_name = cd->columnName;
302  auto ct = cd->columnType;
303  SQLTypes sql_type = ct.get_type();
304  EncodingType sql_enc = ct.get_compression();
305  col_details.col_type = sqlToColumnType(sql_type);
306  col_details.encoding = sqlToColumnEncoding(sql_enc);
307  col_details.nullable = !ct.get_notnull();
308  col_details.is_array = (sql_type == kARRAY);
309  if (IS_GEO(sql_type)) {
310  col_details.precision = static_cast<int>(ct.get_subtype());
311  col_details.scale = ct.get_output_srid();
312  } else {
313  col_details.precision = ct.get_precision();
314  col_details.scale = ct.get_scale();
315  }
316  if (col_details.encoding == ColumnEncoding::DICT) {
317  // have to get the actual size of the encoding from the dictionary
318  // definition
319  const int dict_id = ct.get_comp_param();
320  auto dd = catalog->getMetadataForDict(dict_id, false);
321  if (dd) {
322  col_details.comp_param = dd->dictNBits;
323  } else {
324  throw std::runtime_error("Dictionary definition for column doesn't exist");
325  }
326  } else {
327  col_details.comp_param = ct.get_comp_param();
328  if (ct.is_date_in_days() && col_details.comp_param == 0) {
329  col_details.comp_param = 32;
330  }
331  }
332  result.push_back(col_details);
333  }
334  }
335  }
336  return result;
337  }
338 
339  bool setDatabase(std::string& db_name) {
340  auto& sys_cat = Catalog_Namespace::SysCatalog::instance();
341  auto& user = db_handler_->get_session_copy(session_id_).get_currentUser();
342  sys_cat.switchDatabase(db_name, user.userName);
343  return true;
344  }
345 
346  bool login(std::string& db_name, std::string& user_name, const std::string& password) {
347  db_handler_->disconnect(session_id_);
348  db_handler_->connect(session_id_, user_name, password, db_name);
349  return true;
350  }
351 
352  protected:
353  void reset() {
354  if (db_handler_) {
355  db_handler_->disconnect(session_id_);
356  db_handler_->shutdown();
357  }
359  db_handler_.reset();
360 
362  if (is_temp_db_) {
363  boost::filesystem::remove_all(base_path_);
364  }
365  base_path_.clear();
366  }
367 
368  bool catalogExists(const std::string& base_path) {
369  if (!boost::filesystem::exists(base_path)) {
370  return false;
371  }
372  for (auto& subdir : system_folders_) {
373  std::string path = base_path + "/" + subdir;
374  if (!boost::filesystem::exists(path)) {
375  return false;
376  }
377  }
378  return true;
379  }
380 
381  void cleanCatalog(const std::string& base_path) {
382  if (boost::filesystem::exists(base_path)) {
383  for (auto& subdir : system_folders_) {
384  std::string path = base_path + "/" + subdir;
385  if (boost::filesystem::exists(path)) {
386  boost::filesystem::remove_all(path);
387  }
388  }
389  }
390  }
391 
392  std::string createCatalog(const std::string& base_path) {
393  std::string root_dir = base_path;
394  if (base_path.empty()) {
396  auto tmp_path = boost::filesystem::temp_directory_path(error);
397  if (boost::system::errc::success != error.value()) {
398  std::cerr << error.message() << std::endl;
399  return "";
400  }
401  tmp_path /= "omnidbe_%%%%-%%%%-%%%%";
402  auto uniq_path = boost::filesystem::unique_path(tmp_path, error);
403  if (boost::system::errc::success != error.value()) {
404  std::cerr << error.message() << std::endl;
405  return "";
406  }
407  root_dir = uniq_path.string();
408  is_temp_db_ = true;
409  }
410  if (!boost::filesystem::exists(root_dir)) {
411  if (!boost::filesystem::create_directory(root_dir)) {
412  std::cerr << "Cannot create database directory: " << root_dir << std::endl;
413  return "";
414  }
415  }
416  size_t absent_count = 0;
417  for (auto& sub_dir : system_folders_) {
418  std::string path = root_dir + "/" + sub_dir;
419  if (!boost::filesystem::exists(path)) {
420  if (!boost::filesystem::create_directory(path)) {
421  std::cerr << "Cannot create database subdirectory: " << path << std::endl;
422  return "";
423  }
424  ++absent_count;
425  }
426  }
427  if ((absent_count > 0) && (absent_count < system_folders_.size())) {
428  std::cerr << "Database directory structure is broken: " << root_dir << std::endl;
429  return "";
430  }
431  return root_dir;
432  }
433 
434  private:
435  std::string base_path_;
436  std::string session_id_;
437  std::shared_ptr<DBHandler> db_handler_;
439  std::string udf_filename_;
440 
441  std::vector<std::string> system_folders_ = {shared::kCatalogDirectoryName,
444 };
445 
446 namespace {
448 }
449 
450 std::shared_ptr<DBEngine> DBEngine::create(const std::string& cmd_line) {
451  const std::lock_guard<std::mutex> lock(engine_create_mutex);
452  auto engine = std::make_shared<DBEngineImpl>();
453  if (!engine->init(cmd_line)) {
454  throw std::runtime_error("DBE initialization failed");
455  }
456  return engine;
457 }
458 
462  return (DBEngineImpl*)ptr;
463 }
464 inline const DBEngineImpl* getImpl(const DBEngine* ptr) {
465  return (const DBEngineImpl*)ptr;
466 }
467 
470 void DBEngine::executeDDL(const std::string& query) {
471  DBEngineImpl* engine = getImpl(this);
472  engine->executeDDL(query);
473 }
474 
475 std::shared_ptr<Cursor> DBEngine::executeDML(const std::string& query) {
476  DBEngineImpl* engine = getImpl(this);
477  return engine->executeDML(query);
478 }
479 
480 std::shared_ptr<Cursor> DBEngine::executeRA(const std::string& query) {
481  DBEngineImpl* engine = getImpl(this);
482  return engine->executeRA(query);
483 }
484 
485 void DBEngine::importArrowTable(const std::string& name,
486  std::shared_ptr<arrow::Table>& table,
487  uint64_t fragment_size) {
488  DBEngineImpl* engine = getImpl(this);
489  engine->importArrowTable(name, table, fragment_size);
490 }
491 
492 std::vector<std::string> DBEngine::getTables() {
493  DBEngineImpl* engine = getImpl(this);
494  return engine->getTables();
495 }
496 
497 std::vector<ColumnDetails> DBEngine::getTableDetails(const std::string& table_name) {
498  DBEngineImpl* engine = getImpl(this);
499  return engine->getTableDetails(table_name);
500 }
501 
502 bool DBEngine::setDatabase(std::string& db_name) {
503  DBEngineImpl* engine = getImpl(this);
504  return engine->setDatabase(db_name);
505 }
506 
507 bool DBEngine::login(std::string& db_name,
508  std::string& user_name,
509  const std::string& password) {
510  DBEngineImpl* engine = getImpl(this);
511  return engine->login(db_name, user_name, password);
512 }
513 
516 inline CursorImpl* getImpl(Cursor* ptr) {
517  return (CursorImpl*)ptr;
518 }
519 
520 inline const CursorImpl* getImpl(const Cursor* ptr) {
521  return (const CursorImpl*)ptr;
522 }
523 
527  CursorImpl* cursor = getImpl(this);
528  return cursor->getColCount();
529 }
530 
532  CursorImpl* cursor = getImpl(this);
533  return cursor->getRowCount();
534 }
535 
537  CursorImpl* cursor = getImpl(this);
538  return cursor->getNextRow();
539 }
540 
541 ColumnType Cursor::getColType(uint32_t col_num) {
542  CursorImpl* cursor = getImpl(this);
543  return cursor->getColType(col_num);
544 }
545 
546 std::shared_ptr<arrow::RecordBatch> Cursor::getArrowRecordBatch() {
547  CursorImpl* cursor = getImpl(this);
548  return cursor->getArrowRecordBatch();
549 }
550 } // namespace EmbeddedDatabase
Classes used to wrap parser calls for calcite redirection.
std::vector< std::unique_ptr< lockmgr::AbstractLockContainer< const TableDescriptor * >>> LockedTableDescriptors
Definition: LockMgr.h:272
const std::string kDataDirectoryName
bool setDatabase(std::string &db_name)
Definition: DBEngine.cpp:339
std::shared_ptr< DBHandler > db_handler_
Definition: DBEngine.cpp:437
std::shared_ptr< CursorImpl > sql_execute_dbe(const TSessionId &session_id, const std::string &query_str, const bool column_format, const int32_t first_n, const int32_t at_most_n)
Definition: DBEngine.cpp:200
SQLTypes
Definition: sqltypes.h:65
std::string tableName
unsigned renderer_vulkan_timeout_ms
std::vector< LeafHostInfo > string_leaves
std::string udf_compiler_path
void importArrowTable(const std::string &name, std::shared_ptr< arrow::Table > &table, uint64_t fragment_size=0)
Definition: DBEngine.cpp:485
std::shared_ptr< arrow::RecordBatch > getArrowRecordBatch()
Definition: DBEngine.cpp:74
#define LOG(tag)
Definition: Logger.h:285
std::string storageType
#define DEFAULT_MAX_CHUNK_SIZE
bool setDatabase(std::string &db_name)
Definition: DBEngine.cpp:502
std::vector< ColumnDetails > getTableDetails(const std::string &table_name)
Definition: DBEngine.cpp:287
void setArrowTable(std::string name, std::shared_ptr< arrow::Table > table)
void executeDDL(const std::string &query)
Definition: DBEngine.cpp:223
bool init(const std::string &cmd_line)
Definition: DBEngine.cpp:106
bool catalogExists(const std::string &base_path)
Definition: DBEngine.cpp:368
boost::optional< int > parse_command_line(int argc, char const *const *argv, const bool should_init_logging=false)
const std::string kDefaultExportDirName
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
void createDBObject(const UserMetadata &user, const std::string &objectName, DBObjectType type, const Catalog_Namespace::Catalog &catalog, int32_t objectId=-1)
void releaseArrowTable(std::string name)
ColumnType getColType(uint32_t col_num)
Definition: DBEngine.cpp:66
std::vector< ColumnDetails > getTableDetails(const std::string &table_name)
Definition: DBEngine.cpp:497
ColumnType sqlToColumnType(const SQLTypes &type)
Definition: DBETypes.cpp:111
bool login(std::string &db_name, std::string &user_name, const std::string &password)
Definition: DBEngine.cpp:346
EncodingType
Definition: sqltypes.h:240
#define DEFAULT_MAX_ROWS
size_t max_concurrent_render_sessions
Supported runtime functions management and retrieval.
static SysCatalog & instance()
Definition: SysCatalog.h:343
std::shared_ptr< Cursor > executeDML(const std::string &query)
Definition: DBEngine.cpp:475
std::shared_ptr< ResultSet > result_set_
Definition: DBEngine.cpp:92
std::vector< LeafHostInfo > db_leaves
const std::string kDefaultDbName
CursorImpl(std::shared_ptr< ResultSet > result_set, std::vector< std::string > col_names)
Definition: DBEngine.cpp:45
void importArrowTable(const std::string &name, std::shared_ptr< arrow::Table > &table, uint64_t fragment_size)
Definition: DBEngine.cpp:227
std::shared_ptr< arrow::RecordBatch > record_batch_
Definition: DBEngine.cpp:94
std::shared_ptr< CursorImpl > executeRA(const std::string &query)
Definition: DBEngine.cpp:265
std::shared_ptr< arrow::RecordBatch > getArrowRecordBatch()
Definition: DBEngine.cpp:546
std::vector< std::string > col_names_
Definition: DBEngine.cpp:93
std::string createCatalog(const std::string &base_path)
Definition: DBEngine.cpp:392
std::string keyMetainfo
AuthMetadata authMetadata
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
bool g_serialize_temp_tables
Definition: Catalog.cpp:109
#define DEFAULT_PAGE_SIZE
const std::string kRootUsername
std::vector< std::string > getTables()
Definition: DBEngine.cpp:492
static std::shared_ptr< DBEngine > create(const std::string &cmd_line)
Definition: DBEngine.cpp:450
const std::string kDefaultRootPasswd
#define DEFAULT_FRAGMENT_ROWS
void shutdown()
Definition: Logger.cpp:401
Fragmenter_Namespace::FragmenterType fragType
Data_Namespace::MemoryLevel persistenceLevel
std::vector< std::string > udf_compiler_options
ColumnEncoding sqlToColumnEncoding(const EncodingType &type)
Definition: DBETypes.cpp:162
const std::string kCatalogDirectoryName
bool login(std::string &db_name, std::string &user_name, const std::string &password)
Definition: DBEngine.cpp:507
bool g_enable_watchdog false
Definition: Execute.cpp:80
def error_code
Definition: report.py:234
bool g_enable_union
ColumnType getColType(uint32_t col_num)
Definition: DBEngine.cpp:541
DBEngineImpl * getImpl(DBEngine *ptr)
Definition: DBEngine.cpp:461
File_Namespace::DiskCacheConfig disk_cache_config
string name
Definition: setup.in.py:72
void executeDDL(const std::string &query)
Definition: DBEngine.cpp:470
bool g_enable_thrift_logs
Definition: HeavyDB.cpp:298
#define IS_GEO(T)
Definition: sqltypes.h:310
std::vector< std::string > system_folders_
Definition: DBEngine.cpp:441
std::shared_ptr< CursorImpl > executeDML(const std::string &query)
Definition: DBEngine.cpp:261
std::shared_ptr< Cursor > executeRA(const std::string &query)
Definition: DBEngine.cpp:480
void cleanCatalog(const std::string &base_path)
Definition: DBEngine.cpp:381
SystemParameters system_parameters
std::vector< std::string > getTables()
Definition: DBEngine.cpp:269