OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Parser::CopyTableStmt Class Reference

#include <ParserNode.h>

+ Inheritance diagram for Parser::CopyTableStmt:
+ Collaboration diagram for Parser::CopyTableStmt:

Public Member Functions

 CopyTableStmt (std::string *t, std::string *f, std::list< NameValueAssign * > *o)
 
 CopyTableStmt (const rapidjson::Value &payload)
 
void execute (const Catalog_Namespace::SessionInfo &session, bool read_only_mode) override
 
void execute (const Catalog_Namespace::SessionInfo &session, bool read_only_mode, const std::function< std::unique_ptr< import_export::AbstractImporter >(Catalog_Namespace::Catalog &, const TableDescriptor *, const std::string &, const import_export::CopyParams &)> &importer_factory)
 
std::string & get_table () const
 
bool get_success () const
 
bool was_deferred_copy_from () const
 
void get_deferred_copy_from_payload (std::string &table, std::string &file_name, import_export::CopyParams &copy_params, std::string &partitions)
 
- Public Member Functions inherited from Parser::DDLStmt
void setColumnDescriptor (ColumnDescriptor &cd, const ColumnDef *coldef)
 
- Public Member Functions inherited from Parser::Node
virtual ~Node ()
 

Public Attributes

std::unique_ptr< std::string > return_message
 

Private Attributes

std::unique_ptr< std::string > table_
 
std::unique_ptr< std::string > copy_from_source_pattern_
 
bool success_
 
std::list< std::unique_ptr< NameValueAssign > > options_
 
bool was_deferred_copy_from_ = false
 
std::string deferred_copy_from_file_name_
 
import_export::CopyParams deferred_copy_from_copy_params_
 
std::string deferred_copy_from_partitions_
 

Detailed Description

Definition at line 1460 of file ParserNode.h.

Constructor & Destructor Documentation

Parser::CopyTableStmt::CopyTableStmt ( std::string *  t,
std::string *  f,
std::list< NameValueAssign * > *  o 
)

Definition at line 5720 of file ParserNode.cpp.

References options_.

5723  : table_(t), copy_from_source_pattern_(f), success_(true) {
5724  if (o) {
5725  for (const auto e : *o) {
5726  options_.emplace_back(e);
5727  }
5728  delete o;
5729  }
5730 }
std::unique_ptr< std::string > copy_from_source_pattern_
Definition: ParserNode.h:1498
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1497
std::string *f — source file path/pattern argument; ownership is taken by copy_from_source_pattern_
std::list< std::unique_ptr< NameValueAssign > > options_
Definition: ParserNode.h:1500
Parser::CopyTableStmt::CopyTableStmt ( const rapidjson::Value &  payload)

Definition at line 5732 of file ParserNode.cpp.

References CHECK, copy_from_source_pattern_, json_str(), options_, Parser::anonymous_namespace{ParserNode.cpp}::parse_options(), and table_.

5732  : success_(true) {
5733  CHECK(payload.HasMember("table"));
5734  table_ = std::make_unique<std::string>(json_str(payload["table"]));
5735 
5736  CHECK(payload.HasMember("filePath"));
5737  std::string fs = json_str(payload["filePath"]);
5738  // strip leading/trailing spaces/quotes/single quotes
5739  boost::algorithm::trim_if(fs, boost::is_any_of(" \"'`"));
5740  copy_from_source_pattern_ = std::make_unique<std::string>(fs);
5741 
5742  parse_options(payload, options_);
5743 }
std::unique_ptr< std::string > copy_from_source_pattern_
Definition: ParserNode.h:1498
const std::string json_str(const rapidjson::Value &obj) noexcept
Definition: JsonAccessors.h:46
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1497
void parse_options(const rapidjson::Value &payload, std::list< std::unique_ptr< NameValueAssign >> &nameValueList, bool stringToNull=false, bool stringToInteger=false)
#define CHECK(condition)
Definition: Logger.h:291
std::list< std::unique_ptr< NameValueAssign > > options_
Definition: ParserNode.h:1500

+ Here is the call graph for this function:

Member Function Documentation

void Parser::CopyTableStmt::execute ( const Catalog_Namespace::SessionInfo &  session,
bool  read_only_mode 
)
override virtual

Implements Parser::DDLStmt.

Definition at line 5745 of file ParserNode.cpp.

References import_export::create_importer().

Referenced by heavydb.cursor.Cursor::executemany(), and QueryRunner::QueryRunner::runImport().

5746  {
5747  if (read_only_mode) {
5748  throw std::runtime_error("IMPORT invalid in read only mode.");
5749  }
5750  auto importer_factory = [](Catalog_Namespace::Catalog& catalog,
5751  const TableDescriptor* td,
5752  const std::string& copy_from_source,
5753  const import_export::CopyParams& copy_params)
5754  -> std::unique_ptr<import_export::AbstractImporter> {
5755  return import_export::create_importer(catalog, td, copy_from_source, copy_params);
5756  };
5757  return execute(session, read_only_mode, importer_factory);
5758 }
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:143
std::unique_ptr< AbstractImporter > create_importer(Catalog_Namespace::Catalog &catalog, const TableDescriptor *td, const std::string &copy_from_source, const import_export::CopyParams &copy_params)
Definition: Importer.cpp:6287
void execute(const Catalog_Namespace::SessionInfo &session, bool read_only_mode) override

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Parser::CopyTableStmt::execute ( const Catalog_Namespace::SessionInfo &  session,
bool  read_only_mode,
const std::function< std::unique_ptr< import_export::AbstractImporter >(Catalog_Namespace::Catalog &, const TableDescriptor *, const std::string &, const import_export::CopyParams &)> &  importer_factory 
)

Definition at line 5760 of file ParserNode.cpp.

References CHECK, Executor::clearExternalCaches(), copy_from_source_pattern_, deferred_copy_from_copy_params_, deferred_copy_from_file_name_, deferred_copy_from_partitions_, logger::ERROR, measure< TimeT >::execution(), g_enable_non_kernel_time_query_interrupt, Catalog_Namespace::SessionInfo::get_currentUser(), Catalog_Namespace::SessionInfo::get_session_id(), Catalog_Namespace::SessionInfo::getCatalog(), legacylockmgr::getExecuteReadLock(), Executor::getExecutor(), lockmgr::TableLockMgrImpl< T >::getWriteLockForTable(), ddl_utils::IMPORT, logger::INFO, AccessPrivileges::INSERT_INTO_TABLE, lockmgr::instance(), import_export::kGeoFile, import_export::kOdbc, import_export::kRasterFile, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, DBObject::loadKey(), LOG, import_export::CopyParams::max_reject, options_, Parser::anonymous_namespace{ParserNode.cpp}::parse_copy_params(), return_message, import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_rejected, DBObject::setPrivileges(), import_export::CopyParams::source_type, import_export::CopyParams::sql_order_by, import_export::CopyParams::sql_select, run_benchmark_import::start_time, success_, table_, TableDBObjectType, TableDescriptor::tableName, to_string(), toString(), Executor::UNITARY_EXECUTOR_ID, Catalog_Namespace::UserMetadata::userLoggable(), ddl_utils::validate_allowed_file_path(), and was_deferred_copy_from_.

Referenced by heavydb.cursor.Cursor::executemany().

5767  {
5768  if (read_only_mode) {
5769  throw std::runtime_error("COPY FROM invalid in read only mode.");
5770  }
5771 
5772  size_t total_time = 0;
5773 
5774  // Prevent simultaneous import / truncate (see TruncateTableStmt::execute)
5775  const auto execute_read_lock = legacylockmgr::getExecuteReadLock();
5776 
5777  const TableDescriptor* td{nullptr};
5778  std::unique_ptr<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>> td_with_lock;
5779  std::unique_ptr<lockmgr::WriteLock> insert_data_lock;
5780 
5781  auto& catalog = session.getCatalog();
5782 
5783  try {
5784  td_with_lock = std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
5786  catalog, *table_));
5787  td = (*td_with_lock)();
5788  insert_data_lock = std::make_unique<lockmgr::WriteLock>(
5790  } catch (const std::runtime_error& e) {
5791  // noop
5792  // TODO(adb): We're really only interested in whether the table exists or not.
5793  // Create a more refined exception.
5794  }
5795 
5796  // if the table already exists, it's locked, so check access privileges
5797  if (td) {
5798  std::vector<DBObject> privObjects;
5799  DBObject dbObject(*table_, TableDBObjectType);
5800  dbObject.loadKey(catalog);
5801  dbObject.setPrivileges(AccessPrivileges::INSERT_INTO_TABLE);
5802  privObjects.push_back(dbObject);
5803  if (!SysCatalog::instance().checkPrivileges(session.get_currentUser(), privObjects)) {
5804  throw std::runtime_error("Violation of access privileges: user " +
5805  session.get_currentUser().userLoggable() +
5806  " has no insert privileges for table " + *table_ + ".");
5807  }
5808 
5809  // invalidate cached item
5810  Executor::clearExternalCaches(true, td, catalog.getCurrentDB().dbId);
5811  }
5812 
5813  import_export::CopyParams copy_params;
5814  std::vector<std::string> warnings;
5816 
5817  boost::regex non_local_file_regex{R"(^\s*(s3|http|https)://.+)",
5818  boost::regex::extended | boost::regex::icase};
5819  if (!boost::regex_match(*copy_from_source_pattern_, non_local_file_regex) &&
5823  }
5824  // since we'll have not only posix file names but also s3/hdfs/... url
5825  // we do not expand wildcard or check file existence here.
5826  // from here on, copy_from_source contains something which may be a url
5827  // a wildcard of file names, or a sql select statement;
5828  std::string copy_from_source = *copy_from_source_pattern_;
5829 
5830  if (copy_params.source_type == import_export::SourceType::kOdbc) {
5831  copy_params.sql_select = copy_from_source;
5832  if (copy_params.sql_order_by.empty()) {
5833  throw std::runtime_error(
5834  "Option \"SQL ORDER BY\" must be specified when copying from an ODBC source.");
5835  }
5836  }
5837 
5838  std::string tr;
5839 
5840  for (auto const& warning : warnings) {
5841  tr += warning + "\n";
5842  }
5843 
5844  if (copy_params.source_type == import_export::SourceType::kGeoFile ||
5846  // geo import
5847  // we do nothing here, except stash the parameters so we can
5848  // do the import when we unwind to the top of the handler
5849  deferred_copy_from_file_name_ = copy_from_source;
5850  deferred_copy_from_copy_params_ = copy_params;
5851  was_deferred_copy_from_ = true;
5852 
5853  // the result string
5854  // @TODO simon.eves put something more useful in here
5855  // except we really can't because we haven't done the import yet!
5856  if (td) {
5857  tr += std::string("Appending geo to table '") + *table_ + std::string("'...");
5858  } else {
5859  tr += std::string("Creating table '") + *table_ +
5860  std::string("' and importing geo...");
5861  }
5862  } else {
5863  if (td) {
5864  CHECK(td_with_lock);
5865 
5866  // regular import
5867  auto importer = importer_factory(catalog, td, copy_from_source, copy_params);
5868  auto start_time = ::toString(std::chrono::system_clock::now());
5870  auto query_session = session.get_session_id();
5871  auto query_str = "COPYING " + td->tableName;
5873  executor->enrollQuerySession(query_session,
5874  query_str,
5875  start_time,
5877  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5878  }
5879 
5880  ScopeGuard clearInterruptStatus =
5881  [executor, &query_str, &query_session, &start_time, &importer] {
5882  // reset the runtime query interrupt status
5884  executor->clearQuerySessionStatus(query_session, start_time);
5885  }
5886  };
5887  import_export::ImportStatus import_result;
5888  auto ms =
5889  measure<>::execution([&]() { import_result = importer->import(&session); });
5890  total_time += ms;
5891  // results
5892  if (!import_result.load_failed &&
5893  import_result.rows_rejected > copy_params.max_reject) {
5894  LOG(ERROR) << "COPY exited early due to reject records count during multi file "
5895  "processing ";
5896  // if we have crossed the truncated load threshold
5897  import_result.load_failed = true;
5898  import_result.load_msg =
5899  "COPY exited early due to reject records count during multi file "
5900  "processing ";
5901  success_ = false;
5902  }
5903  if (!import_result.load_failed) {
5904  tr += std::string(
5905  "Loaded: " + std::to_string(import_result.rows_completed) +
5906  " recs, Rejected: " + std::to_string(import_result.rows_rejected) +
5907  " recs in " + std::to_string((double)total_time / 1000.0) + " secs");
5908  } else {
5909  tr += std::string("Loader Failed due to : " + import_result.load_msg + " in " +
5910  std::to_string((double)total_time / 1000.0) + " secs");
5911  }
5912  } else {
5913  throw std::runtime_error("Table '" + *table_ + "' must exist before COPY FROM");
5914  }
5915  }
5916  return_message.reset(new std::string(tr));
5917  LOG(INFO) << tr;
5918 }
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
std::unique_ptr< std::string > copy_from_source_pattern_
Definition: ParserNode.h:1498
auto getExecuteReadLock()
#define LOG(tag)
Definition: Logger.h:285
std::unique_ptr< std::string > return_message
Definition: ParserNode.h:1474
static const AccessPrivileges INSERT_INTO_TABLE
Definition: DBObject.h:161
std::string deferred_copy_from_partitions_
Definition: ParserNode.h:1505
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:138
std::string to_string(char const *&&v)
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:513
std::string deferred_copy_from_file_name_
Definition: ParserNode.h:1503
std::string sql_order_by
Definition: CopyParams.h:98
import_export::SourceType source_type
Definition: CopyParams.h:57
std::string toString(const Executor::ExtModuleKinds &kind)
Definition: Execute.h:1703
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1497
void validate_allowed_file_path(const std::string &file_path, const DataTransferType data_transfer_type, const bool allow_wildcards)
Definition: DdlUtils.cpp:822
void parse_copy_params(const std::list< std::unique_ptr< NameValueAssign >> &options_, import_export::CopyParams &copy_params, std::vector< std::string > &warnings, std::string &deferred_copy_from_partitions_)
std::string get_session_id() const
Definition: SessionInfo.h:93
static WriteLock getWriteLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgr.cpp:137
Catalog & getCatalog() const
Definition: SessionInfo.h:75
T & instance()
Definition: LockMgr.cpp:101
import_export::CopyParams deferred_copy_from_copy_params_
Definition: ParserNode.h:1504
#define CHECK(condition)
Definition: Logger.h:291
std::list< std::unique_ptr< NameValueAssign > > options_
Definition: ParserNode.h:1500
static void clearExternalCaches(bool for_update, const TableDescriptor *td, const int current_db_id)
Definition: Execute.h:438
static constexpr ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:423
std::string userLoggable() const
Definition: SysCatalog.cpp:158
const UserMetadata & get_currentUser() const
Definition: SessionInfo.h:88

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Parser::CopyTableStmt::get_deferred_copy_from_payload ( std::string &  table,
std::string &  file_name,
import_export::CopyParams &  copy_params,
std::string &  partitions 
)
inline

Definition at line 1485 of file ParserNode.h.

References deferred_copy_from_copy_params_, deferred_copy_from_file_name_, deferred_copy_from_partitions_, table_, and was_deferred_copy_from_.

1488  {
1489  table = *table_;
1490  file_name = deferred_copy_from_file_name_;
1491  copy_params = deferred_copy_from_copy_params_;
1492  partitions = deferred_copy_from_partitions_;
1493  was_deferred_copy_from_ = false;
1494  }
std::string deferred_copy_from_partitions_
Definition: ParserNode.h:1505
std::string deferred_copy_from_file_name_
Definition: ParserNode.h:1503
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1497
import_export::CopyParams deferred_copy_from_copy_params_
Definition: ParserNode.h:1504
bool Parser::CopyTableStmt::get_success ( ) const
inline

Definition at line 1481 of file ParserNode.h.

References success_.

1481 { return success_; }
std::string& Parser::CopyTableStmt::get_table ( ) const
inline

Definition at line 1476 of file ParserNode.h.

References CHECK, and table_.

1476  {
1477  CHECK(table_);
1478  return *table_;
1479  }
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1497
#define CHECK(condition)
Definition: Logger.h:291
bool Parser::CopyTableStmt::was_deferred_copy_from ( ) const
inline

Definition at line 1483 of file ParserNode.h.

References was_deferred_copy_from_.

1483 { return was_deferred_copy_from_; }

Member Data Documentation

std::unique_ptr<std::string> Parser::CopyTableStmt::copy_from_source_pattern_
private

Definition at line 1498 of file ParserNode.h.

Referenced by CopyTableStmt(), and execute().

import_export::CopyParams Parser::CopyTableStmt::deferred_copy_from_copy_params_
private

Definition at line 1504 of file ParserNode.h.

Referenced by execute(), and get_deferred_copy_from_payload().

std::string Parser::CopyTableStmt::deferred_copy_from_file_name_
private

Definition at line 1503 of file ParserNode.h.

Referenced by execute(), and get_deferred_copy_from_payload().

std::string Parser::CopyTableStmt::deferred_copy_from_partitions_
private

Definition at line 1505 of file ParserNode.h.

Referenced by execute(), and get_deferred_copy_from_payload().

std::list<std::unique_ptr<NameValueAssign> > Parser::CopyTableStmt::options_
private

Definition at line 1500 of file ParserNode.h.

Referenced by CopyTableStmt(), and execute().

std::unique_ptr<std::string> Parser::CopyTableStmt::return_message

Definition at line 1474 of file ParserNode.h.

Referenced by execute().

bool Parser::CopyTableStmt::success_
private

Definition at line 1499 of file ParserNode.h.

Referenced by execute(), and get_success().

std::unique_ptr<std::string> Parser::CopyTableStmt::table_
private

Definition at line 1497 of file ParserNode.h.

Referenced by CopyTableStmt(), execute(), get_deferred_copy_from_payload(), and get_table().

bool Parser::CopyTableStmt::was_deferred_copy_from_ = false
private

Definition at line 1502 of file ParserNode.h.

Referenced by execute(), get_deferred_copy_from_payload(), and was_deferred_copy_from().


The documentation for this class was generated from the following files:
- ParserNode.h
- ParserNode.cpp