OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Parser::InsertIntoTableAsSelectStmt Class Reference

#include <ParserNode.h>

+ Inheritance diagram for Parser::InsertIntoTableAsSelectStmt:
+ Collaboration diagram for Parser::InsertIntoTableAsSelectStmt:

Public Member Functions

 InsertIntoTableAsSelectStmt (const rapidjson::Value &payload)
 
 InsertIntoTableAsSelectStmt (const std::string *table_name, const std::string *select_query, std::list< std::string * > *c)
 
void populateData (QueryStateProxy, const TableDescriptor *td, bool validate_table, bool for_CTAS=false)
 
void execute (const Catalog_Namespace::SessionInfo &session, bool read_only_mode) override
 
std::string & get_table ()
 
std::string & get_select_query ()
 
- Public Member Functions inherited from Parser::DDLStmt
void setColumnDescriptor (ColumnDescriptor &cd, const ColumnDef *coldef)
 
- Public Member Functions inherited from Parser::Node
virtual ~Node ()
 

Public Attributes

std::unique_ptr< QueryConnector > leafs_connector_
 

Protected Attributes

std::vector< std::unique_ptr< std::string > > column_list_
 
std::string table_name_
 
std::string select_query_
 

Detailed Description

Definition at line 1127 of file ParserNode.h.

Constructor & Destructor Documentation

Parser::InsertIntoTableAsSelectStmt::InsertIntoTableAsSelectStmt ( const rapidjson::Value &  payload)

Definition at line 4080 of file ParserNode.cpp.

References CHECK, column_list_, json_str(), select_query_, and table_name_.

4081  {
4082  CHECK(payload.HasMember("name"));
4083  table_name_ = json_str(payload["name"]);
4084 
4085  CHECK(payload.HasMember("query"));
4086  select_query_ = json_str(payload["query"]);
4087 
4088  boost::replace_all(select_query_, "\n", " ");
4089  select_query_ = "(" + select_query_ + ")";
4090 
4091  if (payload.HasMember("columns")) {
4092  CHECK(payload["columns"].IsArray());
4093  for (auto& column : payload["columns"].GetArray()) {
4094  std::string s = json_str(column);
4095  column_list_.emplace_back(std::unique_ptr<std::string>(new std::string(s)));
4096  }
4097  }
4098 }
const std::string json_str(const rapidjson::Value &obj) noexcept
Definition: JsonAccessors.h:46
std::vector< std::unique_ptr< std::string > > column_list_
Definition: ParserNode.h:1160
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

Parser::InsertIntoTableAsSelectStmt::InsertIntoTableAsSelectStmt ( const std::string *  table_name,
const std::string *  select_query,
std::list< std::string * > *  c 
)
inline

Definition at line 1131 of file ParserNode.h.

References column_list_.

1134  : table_name_(*table_name), select_query_(*select_query) {
1135  if (c) {
1136  for (auto e : *c) {
1137  column_list_.emplace_back(e);
1138  }
1139  delete c;
1140  }
1141 
1142  delete table_name;
1143  delete select_query;
1144  }
std::vector< std::unique_ptr< std::string > > column_list_
Definition: ParserNode.h:1160

Member Function Documentation

void Parser::InsertIntoTableAsSelectStmt::execute ( const Catalog_Namespace::SessionInfo & session,
bool  read_only_mode 
)
overridevirtual

Implements Parser::DDLStmt.

Reimplemented in Parser::CreateTableAsSelectStmt.

Definition at line 4668 of file ParserNode.cpp.

References Parser::anonymous_namespace{ParserNode.cpp}::acquire_query_table_locks(), Executor::clearExternalCaches(), query_state::QueryState::create(), legacylockmgr::getExecuteReadLock(), populateData(), select_query_, STDLOG, and table_name_.

Referenced by heavydb.cursor.Cursor::executemany().

4669  {
4670  if (read_only_mode) {
4671  throw std::runtime_error("INSERT INTO TABLE invalid in read only mode.");
4672  }
4673  auto session_copy = session;
4674  auto session_ptr = std::shared_ptr<Catalog_Namespace::SessionInfo>(
4675  &session_copy, boost::null_deleter());
4676  auto query_state = query_state::QueryState::create(session_ptr, select_query_);
4677  auto stdlog = STDLOG(query_state);
4678  auto& catalog = session_ptr->getCatalog();
4679 
4680  const auto execute_read_lock = legacylockmgr::getExecuteReadLock();
4681 
4682  if (catalog.getMetadataForTable(table_name_) == nullptr) {
4683  throw std::runtime_error("ITAS failed: table " + table_name_ + " does not exist.");
4684  }
4685 
4686  auto locks = acquire_query_table_locks(
4687  catalog.name(), select_query_, query_state->createQueryStateProxy(), table_name_);
4688  const TableDescriptor* td = catalog.getMetadataForTable(table_name_);
4689 
4690  Executor::clearExternalCaches(true, td, catalog.getCurrentDB().dbId);
4691 
4692  try {
4693  populateData(query_state->createQueryStateProxy(), td, true, false);
4694  } catch (...) {
4695  throw;
4696  }
4697 }
auto getExecuteReadLock()
void populateData(QueryStateProxy, const TableDescriptor *td, bool validate_table, bool for_CTAS=false)
static std::shared_ptr< QueryState > create(ARGS &&...args)
Definition: QueryState.h:148
lockmgr::LockedTableDescriptors acquire_query_table_locks(const std::string &insert_table_db_name, const std::string &query_str, const QueryStateProxy &query_state_proxy, const std::optional< std::string > &insert_table_name={})
static void clearExternalCaches(bool for_update, const TableDescriptor *td, const int current_db_id)
Definition: Execute.h:438
#define STDLOG(...)
Definition: QueryState.h:234

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::string& Parser::InsertIntoTableAsSelectStmt::get_select_query ( )
inline

Definition at line 1155 of file ParserNode.h.

References select_query_.

1155 { return select_query_; }
std::string& Parser::InsertIntoTableAsSelectStmt::get_table ( )
inline

Definition at line 1153 of file ParserNode.h.

References table_name_.

1153 { return table_name_; }
void Parser::InsertIntoTableAsSelectStmt::populateData ( QueryStateProxy  query_state_proxy,
const TableDescriptor * td,
bool  validate_table,
bool  for_CTAS = false 
)

Definition at line 4100 of file ParserNode.cpp.

References threading_serial::async(), CHECK, Parser::check_session_interrupted(), column_list_, ColumnDescriptor::columnName, ColumnDescriptor::columnType, TargetValueConverterFactory::create(), Fragmenter_Namespace::InsertData::databaseId, logger::ERROR, import_export::fill_missing_columns(), g_cluster, g_enable_non_kernel_time_query_interrupt, g_enable_string_functions, ResultSet::GeoTargetValue, SQLTypeInfo::get_compression(), SQLTypeInfo::get_dimension(), SQLTypeInfo::get_elem_type(), SQLTypeInfo::get_scale(), SQLTypeInfo::get_size(), SQLTypeInfo::get_subtype(), SQLTypeInfo::get_type(), SQLTypeInfo::get_type_name(), Parser::LocalQueryConnector::getColumnDescriptors(), query_state::QueryState::getConstSessionInfo(), Executor::getExecutor(), query_state::QueryState::getQuerySubmittedTime(), AccessPrivileges::INSERT_INTO_TABLE, Fragmenter_Namespace::InsertDataLoader::insertData(), SQLTypeInfo::is_array(), SQLTypeInfo::is_date(), SQLTypeInfo::is_decimal(), SQLTypeInfo::is_geometry(), SQLTypeInfo::is_integer(), SQLTypeInfo::is_string(), SQLTypeInfo::is_time(), SQLTypeInfo::is_timestamp(), TableDescriptor::isView, leafs_connector_, LOG, Fragmenter_Namespace::InsertData::numRows, Parser::LocalQueryConnector::query(), run_benchmark_import::res, run_benchmark_import::result, select_query_, run_benchmark_import::start_time, table_is_temporary(), table_name_, TableDBObjectType, TableDescriptor::tableId, Fragmenter_Namespace::InsertData::tableId, logger::thread_id(), timer_start(), timer_stop(), Executor::UNITARY_EXECUTOR_ID, UNLIKELY, foreign_storage::validate_non_foreign_table_write(), and VLOG.

Referenced by execute(), and Parser::CreateTableAsSelectStmt::execute().

4103  {
4104  auto const session = query_state_proxy->getConstSessionInfo();
4105  auto& catalog = session->getCatalog();
4107  bool populate_table = false;
4108 
4109  if (leafs_connector_) {
4110  populate_table = true;
4111  } else {
4112  leafs_connector_ = std::make_unique<LocalQueryConnector>();
4113  if (!g_cluster) {
4114  populate_table = true;
4115  }
4116  }
4117 
4118  auto get_target_column_descriptors = [this, &catalog](const TableDescriptor* td) {
4119  std::vector<const ColumnDescriptor*> target_column_descriptors;
4120  if (column_list_.empty()) {
4121  auto list = catalog.getAllColumnMetadataForTable(td->tableId, false, false, false);
4122  target_column_descriptors = {std::begin(list), std::end(list)};
4123  } else {
4124  for (auto& c : column_list_) {
4125  const ColumnDescriptor* cd = catalog.getMetadataForColumn(td->tableId, *c);
4126  if (cd == nullptr) {
4127  throw std::runtime_error("Column " + *c + " does not exist.");
4128  }
4129  target_column_descriptors.push_back(cd);
4130  }
4131  }
4132 
4133  return target_column_descriptors;
4134  };
4135 
4136  bool is_temporary = table_is_temporary(td);
4137 
4138  if (validate_table) {
4139  // check access privileges
4140  if (!td) {
4141  throw std::runtime_error("Table " + table_name_ + " does not exist.");
4142  }
4143  if (td->isView) {
4144  throw std::runtime_error("Insert to views is not supported yet.");
4145  }
4146 
4147  if (!session->checkDBAccessPrivileges(DBObjectType::TableDBObjectType,
4149  table_name_)) {
4150  throw std::runtime_error("User has no insert privileges on " + table_name_ + ".");
4151  }
4152 
4153  // only validate the select query so we get the target types
4154  // correctly, but do not populate the result set
4155  LocalQueryConnector local_connector;
4156  auto result = local_connector.query(query_state_proxy, select_query_, {}, true, true);
4157  auto source_column_descriptors = local_connector.getColumnDescriptors(result, false);
4158 
4159  std::vector<const ColumnDescriptor*> target_column_descriptors =
4160  get_target_column_descriptors(td);
4161 
4162  if (source_column_descriptors.size() != target_column_descriptors.size()) {
4163  throw std::runtime_error("The number of source and target columns does not match.");
4164  }
4165 
4166  for (int i = 0; i < source_column_descriptors.size(); i++) {
4167  const ColumnDescriptor* source_cd =
4168  &(*std::next(source_column_descriptors.begin(), i));
4169  const ColumnDescriptor* target_cd = target_column_descriptors.at(i);
4170 
4171  if (source_cd->columnType.get_type() != target_cd->columnType.get_type()) {
4172  auto type_cannot_be_cast = [](const auto& col_type) {
4173  return (col_type.is_time() || col_type.is_geometry() || col_type.is_array() ||
4174  col_type.is_boolean());
4175  };
4176 
4177  if (type_cannot_be_cast(source_cd->columnType) ||
4178  type_cannot_be_cast(target_cd->columnType)) {
4179  throw std::runtime_error("Source '" + source_cd->columnName + " " +
4180  source_cd->columnType.get_type_name() +
4181  "' and target '" + target_cd->columnName + " " +
4182  target_cd->columnType.get_type_name() +
4183  "' column types do not match.");
4184  }
4185  }
4186  if (source_cd->columnType.is_array()) {
4187  if (source_cd->columnType.get_subtype() != target_cd->columnType.get_subtype()) {
4188  throw std::runtime_error("Source '" + source_cd->columnName + " " +
4189  source_cd->columnType.get_type_name() +
4190  "' and target '" + target_cd->columnName + " " +
4191  target_cd->columnType.get_type_name() +
4192  "' array column element types do not match.");
4193  }
4194  }
4195 
4196  if (target_cd->columnType.is_string() && !source_cd->columnType.is_string()) {
4197  throw std::runtime_error("Source '" + source_cd->columnName + " " +
4198  source_cd->columnType.get_type_name() +
4199  "' and target '" + target_cd->columnName + " " +
4200  target_cd->columnType.get_type_name() +
4201  "' column types do not match.");
4202  }
4203 
4204  if (source_cd->columnType.is_decimal() ||
4205  source_cd->columnType.get_elem_type().is_decimal()) {
4206  SQLTypeInfo sourceType = source_cd->columnType;
4207  SQLTypeInfo targetType = target_cd->columnType;
4208 
4209  if (source_cd->columnType.is_array()) {
4210  sourceType = source_cd->columnType.get_elem_type();
4211  targetType = target_cd->columnType.get_elem_type();
4212  }
4213 
4214  if (sourceType.get_scale() != targetType.get_scale()) {
4215  throw std::runtime_error("Source '" + source_cd->columnName + " " +
4216  source_cd->columnType.get_type_name() +
4217  "' and target '" + target_cd->columnName + " " +
4218  target_cd->columnType.get_type_name() +
4219  "' decimal columns scales do not match.");
4220  }
4221  }
4222 
4223  if (source_cd->columnType.is_string()) {
4224  if (!target_cd->columnType.is_string()) {
4225  throw std::runtime_error("Source '" + source_cd->columnName + " " +
4226  source_cd->columnType.get_type_name() +
4227  "' and target '" + target_cd->columnName + " " +
4228  target_cd->columnType.get_type_name() +
4229  "' column types do not match.");
4230  }
4231  if (source_cd->columnType.get_compression() !=
4232  target_cd->columnType.get_compression()) {
4233  throw std::runtime_error("Source '" + source_cd->columnName + " " +
4234  source_cd->columnType.get_type_name() +
4235  "' and target '" + target_cd->columnName + " " +
4236  target_cd->columnType.get_type_name() +
4237  "' columns string encodings do not match.");
4238  }
4239  }
4240 
4241  if (source_cd->columnType.is_timestamp() && target_cd->columnType.is_timestamp()) {
4242  if (source_cd->columnType.get_dimension() !=
4243  target_cd->columnType.get_dimension()) {
4244  throw std::runtime_error("Source '" + source_cd->columnName + " " +
4245  source_cd->columnType.get_type_name() +
4246  "' and target '" + target_cd->columnName + " " +
4247  target_cd->columnType.get_type_name() +
4248  "' timestamp column precisions do not match.");
4249  }
4250  }
4251 
4252  if (!source_cd->columnType.is_string() && !source_cd->columnType.is_geometry() &&
4253  !source_cd->columnType.is_integer() && !source_cd->columnType.is_decimal() &&
4254  !source_cd->columnType.is_date() && !source_cd->columnType.is_time() &&
4255  !source_cd->columnType.is_timestamp() &&
4256  source_cd->columnType.get_size() > target_cd->columnType.get_size()) {
4257  throw std::runtime_error("Source '" + source_cd->columnName + " " +
4258  source_cd->columnType.get_type_name() +
4259  "' and target '" + target_cd->columnName + " " +
4260  target_cd->columnType.get_type_name() +
4261  "' column encoding sizes do not match.");
4262  }
4263  }
4264  }
4265 
4266  if (!populate_table) {
4267  return;
4268  }
4269 
4270  int64_t total_row_count = 0;
4271  int64_t total_source_query_time_ms = 0;
4272  int64_t total_target_value_translate_time_ms = 0;
4273  int64_t total_data_load_time_ms = 0;
4274 
4276  auto target_column_descriptors = get_target_column_descriptors(td);
4277  auto outer_frag_count =
4278  leafs_connector_->getOuterFragmentCount(query_state_proxy, select_query_);
4279 
4280  size_t outer_frag_end = outer_frag_count == 0 ? 1 : outer_frag_count;
4281  auto query_session = session ? session->get_session_id() : "";
4283  std::string work_type_str = for_CTAS ? "CTAS" : "ITAS";
4284  try {
4285  for (size_t outer_frag_idx = 0; outer_frag_idx < outer_frag_end; outer_frag_idx++) {
4286  std::vector<size_t> allowed_outer_fragment_indices;
4287 
4288  if (outer_frag_count) {
4289  allowed_outer_fragment_indices.push_back(outer_frag_idx);
4290  }
4291 
4292  const auto query_clock_begin = timer_start();
4293  std::vector<AggregatedResult> query_results =
4294  leafs_connector_->query(query_state_proxy,
4295  select_query_,
4296  allowed_outer_fragment_indices,
4298  total_source_query_time_ms += timer_stop(query_clock_begin);
4299 
4300  auto start_time = query_state_proxy->getQuerySubmittedTime();
4301  auto query_str = "INSERT_DATA for " + work_type_str;
4303  // In the clean-up phase of the query execution for collecting aggregated result
4304  // of SELECT query, we remove its query session info, so we need to enroll the
4305  // session info again
4306  executor->enrollQuerySession(query_session,
4307  query_str,
4308  start_time,
4310  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
4311  }
4312 
4313  ScopeGuard clearInterruptStatus = [executor, &query_session, &start_time] {
4314  // this data population is non-kernel operation, so we manually cleanup
4315  // the query session info in the cleanup phase
4317  executor->clearQuerySessionStatus(query_session, start_time);
4318  }
4319  };
4320 
4321  for (auto& res : query_results) {
4322  if (UNLIKELY(check_session_interrupted(query_session, executor))) {
4323  throw std::runtime_error(
4324  "Query execution has been interrupted while performing " + work_type_str);
4325  }
4326  auto& result_rows = res.rs;
4327  result_rows->setGeoReturnType(ResultSet::GeoReturnType::GeoTargetValue);
4328  const auto num_rows = result_rows->rowCount();
4329 
4330  if (0 == num_rows) {
4331  continue;
4332  }
4333 
4334  total_row_count += num_rows;
4335 
4336  size_t leaf_count = leafs_connector_->leafCount();
4337 
4338  // ensure that at least 1 row is processed per block up to a maximum of 65536 rows
4339  const size_t rows_per_block =
4340  std::max(std::min(num_rows / leaf_count, size_t(64 * 1024)), size_t(1));
4341 
4342  std::vector<std::unique_ptr<TargetValueConverter>> value_converters;
4343 
4345 
4346  const int num_worker_threads = std::thread::hardware_concurrency();
4347 
4348  std::vector<size_t> thread_start_idx(num_worker_threads),
4349  thread_end_idx(num_worker_threads);
4350  bool can_go_parallel = !result_rows->isTruncated() && rows_per_block > 20000;
4351 
4352  std::atomic<size_t> crt_row_idx{0};
4353 
4354  auto do_work = [&result_rows, &value_converters, &crt_row_idx](
4355  const size_t idx,
4356  const size_t block_end,
4357  const size_t num_cols,
4358  const size_t thread_id,
4359  bool& stop_convert) {
4360  const auto result_row = result_rows->getRowAtNoTranslations(idx);
4361  if (!result_row.empty()) {
4362  size_t target_row = crt_row_idx.fetch_add(1);
4363  if (target_row >= block_end) {
4364  stop_convert = true;
4365  return;
4366  }
4367  for (unsigned int col = 0; col < num_cols; col++) {
4368  const auto& mapd_variant = result_row[col];
4369  value_converters[col]->convertToColumnarFormat(target_row, &mapd_variant);
4370  }
4371  }
4372  };
4373 
4374  auto convert_function = [&thread_start_idx,
4375  &thread_end_idx,
4376  &value_converters,
4377  &executor,
4378  &query_session,
4379  &work_type_str,
4380  &do_work](const int thread_id, const size_t block_end) {
4381  const int num_cols = value_converters.size();
4382  const size_t start = thread_start_idx[thread_id];
4383  const size_t end = thread_end_idx[thread_id];
4384  size_t idx = 0;
4385  bool stop_convert = false;
4387  size_t local_idx = 0;
4388  for (idx = start; idx < end; ++idx, ++local_idx) {
4389  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
4390  check_session_interrupted(query_session, executor))) {
4391  throw std::runtime_error(
4392  "Query execution has been interrupted while performing " +
4393  work_type_str);
4394  }
4395  do_work(idx, block_end, num_cols, thread_id, stop_convert);
4396  if (stop_convert) {
4397  break;
4398  }
4399  }
4400  } else {
4401  for (idx = start; idx < end; ++idx) {
4402  do_work(idx, block_end, num_cols, thread_id, stop_convert);
4403  if (stop_convert) {
4404  break;
4405  }
4406  }
4407  }
4408  thread_start_idx[thread_id] = idx;
4409  };
4410 
4411  auto single_threaded_value_converter =
4412  [&crt_row_idx, &value_converters, &result_rows](const size_t idx,
4413  const size_t block_end,
4414  const size_t num_cols,
4415  bool& stop_convert) {
4416  size_t target_row = crt_row_idx.fetch_add(1);
4417  if (target_row >= block_end) {
4418  stop_convert = true;
4419  return;
4420  }
4421  const auto result_row = result_rows->getNextRow(false, false);
4422  CHECK(!result_row.empty());
4423  for (unsigned int col = 0; col < num_cols; col++) {
4424  const auto& mapd_variant = result_row[col];
4425  value_converters[col]->convertToColumnarFormat(target_row, &mapd_variant);
4426  }
4427  };
4428 
4429  auto single_threaded_convert_function = [&value_converters,
4430  &thread_start_idx,
4431  &thread_end_idx,
4432  &executor,
4433  &query_session,
4434  &work_type_str,
4435  &single_threaded_value_converter](
4436  const int thread_id,
4437  const size_t block_end) {
4438  const int num_cols = value_converters.size();
4439  const size_t start = thread_start_idx[thread_id];
4440  const size_t end = thread_end_idx[thread_id];
4441  size_t idx = 0;
4442  bool stop_convert = false;
4444  size_t local_idx = 0;
4445  for (idx = start; idx < end; ++idx, ++local_idx) {
4446  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
4447  check_session_interrupted(query_session, executor))) {
4448  throw std::runtime_error(
4449  "Query execution has been interrupted while performing " +
4450  work_type_str);
4451  }
4452  single_threaded_value_converter(idx, block_end, num_cols, stop_convert);
4453  if (stop_convert) {
4454  break;
4455  }
4456  }
4457  } else {
4458  for (idx = start; idx < end; ++idx) {
4459  single_threaded_value_converter(idx, end, num_cols, stop_convert);
4460  if (stop_convert) {
4461  break;
4462  }
4463  }
4464  }
4465  thread_start_idx[thread_id] = idx;
4466  };
4467 
4468  if (can_go_parallel) {
4469  const size_t entry_count = result_rows->entryCount();
4470  for (size_t
4471  i = 0,
4472  start_entry = 0,
4473  stride = (entry_count + num_worker_threads - 1) / num_worker_threads;
4474  i < num_worker_threads && start_entry < entry_count;
4475  ++i, start_entry += stride) {
4476  const auto end_entry = std::min(start_entry + stride, entry_count);
4477  thread_start_idx[i] = start_entry;
4478  thread_end_idx[i] = end_entry;
4479  }
4480  } else {
4481  thread_start_idx[0] = 0;
4482  thread_end_idx[0] = result_rows->entryCount();
4483  }
4484 
4485  for (size_t block_start = 0; block_start < num_rows;
4486  block_start += rows_per_block) {
4487  const auto num_rows_this_itr = block_start + rows_per_block < num_rows
4488  ? rows_per_block
4489  : num_rows - block_start;
4490  crt_row_idx = 0; // reset block tracker
4491  value_converters.clear();
4492  int colNum = 0;
4493  for (const auto targetDescriptor : target_column_descriptors) {
4494  auto sourceDataMetaInfo = res.targets_meta[colNum++];
4496  num_rows_this_itr,
4497  sourceDataMetaInfo,
4498  targetDescriptor,
4499  catalog,
4500  targetDescriptor->columnType,
4501  !targetDescriptor->columnType.get_notnull(),
4502  result_rows->getRowSetMemOwner()->getLiteralStringDictProxy(),
4504  sourceDataMetaInfo.get_type_info().is_dict_encoded_string()
4505  ? executor->getStringDictionaryProxy(
4506  sourceDataMetaInfo.get_type_info().getStringDictKey(),
4507  result_rows->getRowSetMemOwner(),
4508  true)
4509  : nullptr};
4510  auto converter = factory.create(param);
4511  value_converters.push_back(std::move(converter));
4512  }
4513 
4514  const auto translate_clock_begin = timer_start();
4515  if (can_go_parallel) {
4516  std::vector<std::future<void>> worker_threads;
4517  for (int i = 0; i < num_worker_threads; ++i) {
4518  worker_threads.push_back(
4519  std::async(std::launch::async, convert_function, i, num_rows_this_itr));
4520  }
4521 
4522  for (auto& child : worker_threads) {
4523  child.wait();
4524  }
4525  for (auto& child : worker_threads) {
4526  child.get();
4527  }
4528 
4529  } else {
4530  single_threaded_convert_function(0, num_rows_this_itr);
4531  }
4532 
4533  // finalize the insert data
4534  auto finalizer_func =
4535  [](std::unique_ptr<TargetValueConverter>::pointer targetValueConverter) {
4536  targetValueConverter->finalizeDataBlocksForInsertData();
4537  };
4538 
4539  std::vector<std::future<void>> worker_threads;
4540  for (auto& converterPtr : value_converters) {
4541  worker_threads.push_back(
4542  std::async(std::launch::async, finalizer_func, converterPtr.get()));
4543  }
4544 
4545  for (auto& child : worker_threads) {
4546  child.wait();
4547  }
4548  for (auto& child : worker_threads) {
4549  child.get();
4550  }
4551 
4553  insert_data.databaseId = catalog.getCurrentDB().dbId;
4554  CHECK(td);
4555  insert_data.tableId = td->tableId;
4556  insert_data.numRows = num_rows_this_itr;
4557 
4558  for (int col_idx = 0; col_idx < target_column_descriptors.size(); col_idx++) {
4560  check_session_interrupted(query_session, executor))) {
4561  throw std::runtime_error(
4562  "Query execution has been interrupted while performing " +
4563  work_type_str);
4564  }
4565  value_converters[col_idx]->addDataBlocksToInsertData(insert_data);
4566  }
4567  total_target_value_translate_time_ms += timer_stop(translate_clock_begin);
4568 
4569  const auto data_load_clock_begin = timer_start();
4570  auto data_memory_holder =
4571  import_export::fill_missing_columns(&catalog, insert_data);
4572  insertDataLoader.insertData(*session, insert_data);
4573  total_data_load_time_ms += timer_stop(data_load_clock_begin);
4574  }
4575  }
4576  }
4577  } catch (...) {
4578  try {
4579  leafs_connector_->rollback(*session, td->tableId);
4580  } catch (std::exception& e) {
4581  LOG(ERROR) << "An error occurred during ITAS rollback attempt. Table id: "
4582  << td->tableId << ", Error: " << e.what();
4583  }
4584  throw;
4585  }
4586 
4587  int64_t total_time_ms = total_source_query_time_ms +
4588  total_target_value_translate_time_ms + total_data_load_time_ms;
4589 
4590  VLOG(1) << "CTAS/ITAS " << total_row_count << " rows loaded in " << total_time_ms
4591  << "ms (outer_frag_count=" << outer_frag_count
4592  << ", query_time=" << total_source_query_time_ms
4593  << "ms, translation_time=" << total_target_value_translate_time_ms
4594  << "ms, data_load_time=" << total_data_load_time_ms
4595  << "ms)\nquery: " << select_query_;
4596 
4597  if (!is_temporary) {
4598  leafs_connector_->checkpoint(*session, td->tableId);
4599  }
4600 }
void validate_non_foreign_table_write(const TableDescriptor *table_descriptor)
Definition: FsiUtils.h:22
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:392
HOST DEVICE int get_size() const
Definition: sqltypes.h:403
bool is_timestamp() const
Definition: sqltypes.h:1046
#define LOG(tag)
Definition: Logger.h:285
HOST DEVICE int get_scale() const
Definition: sqltypes.h:396
static const AccessPrivileges INSERT_INTO_TABLE
Definition: DBObject.h:161
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
Definition: Importer.cpp:6217
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:138
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
bool is_time() const
Definition: sqltypes.h:579
bool g_enable_string_functions
int tableId
identifies the table into which the data is being inserted
Definition: Fragmenter.h:70
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:513
size_t numRows
the number of rows being inserted
Definition: Fragmenter.h:72
future< Result > async(Fn &&fn, Args &&...args)
std::unique_ptr< QueryConnector > leafs_connector_
Definition: ParserNode.h:1157
bool is_integer() const
Definition: sqltypes.h:567
ColumnDescriptor
specifies the content in-memory of a row in the column metadata table
bool check_session_interrupted(const QuerySessionId &query_session, Executor *executor)
Definition: ParserNode.cpp:107
#define UNLIKELY(x)
Definition: likely.h:25
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:399
bool table_is_temporary(const TableDescriptor *const td)
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:393
std::string get_type_name() const
Definition: sqltypes.h:484
std::vector< std::unique_ptr< std::string > > column_list_
Definition: ParserNode.h:1160
ThreadId thread_id()
Definition: Logger.cpp:879
#define CHECK(condition)
Definition: Logger.h:291
bool is_geometry() const
Definition: sqltypes.h:597
bool g_cluster
Fragmenter_Namespace::InsertData
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:68
SQLTypeInfo columnType
static constexpr ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:423
bool is_string() const
Definition: sqltypes.h:561
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:977
bool is_decimal() const
Definition: sqltypes.h:570
std::string columnName
std::shared_ptr< Catalog_Namespace::SessionInfo const > getConstSessionInfo() const
Definition: QueryState.cpp:84
bool is_date() const
Definition: sqltypes.h:1028
bool is_array() const
Definition: sqltypes.h:585
const std::string getQuerySubmittedTime() const
Definition: QueryState.cpp:101
#define VLOG(n)
Definition: Logger.h:388
Type timer_start()
Definition: measure.h:42
std::unique_ptr< TargetValueConverter > create(ConverterCreateParameter param)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

std::vector<std::unique_ptr<std::string> > Parser::InsertIntoTableAsSelectStmt::column_list_
protected

Definition at line 1160 of file ParserNode.h.

Referenced by InsertIntoTableAsSelectStmt(), and populateData().

std::unique_ptr<QueryConnector> Parser::InsertIntoTableAsSelectStmt::leafs_connector_

Definition at line 1157 of file ParserNode.h.

Referenced by Parser::CreateTableAsSelectStmt::execute(), and populateData().

std::string Parser::InsertIntoTableAsSelectStmt::select_query_
protected
std::string Parser::InsertIntoTableAsSelectStmt::table_name_
protected

The documentation for this class was generated from the following files: