19 #include <boost/uuid/uuid_generators.hpp>
20 #include <boost/uuid/uuid_io.hpp>
37 #ifdef ENABLE_IMPORT_PARQUET
38 extern bool g_enable_legacy_parquet_import;
50 metadata_scan_exception) {
55 if (min_feasible_fragment_size < 0) {
58 foreign_table->
maxFragRows = min_feasible_fragment_size;
64 return metadata_vector;
68 const std::string& copy_from_source) {
71 #ifdef ENABLE_IMPORT_PARQUET
75 return boost::filesystem::path(copy_from_source).filename().string();
78 return copy_from_source;
88 std::map<ChunkKey, std::unique_ptr<foreign_storage::ForeignStorageBuffer>>
95 const int32_t fragment_id,
100 std::set<ChunkKey> fragment_keys;
101 for (
const auto col_desc : columns) {
104 if (col_desc->columnType.is_varlen_indeed()) {
106 data_key.push_back(1);
107 fragment_keys.insert(data_key);
108 auto index_key = key;
109 index_key.push_back(2);
110 fragment_keys.insert(index_key);
112 fragment_keys.insert(key);
117 std::unique_ptr<FragmentBuffers> frag_buffers = std::make_unique<FragmentBuffers>();
118 frag_buffers->delete_buffer = std::make_unique<foreign_storage::ForeignStorageBuffer>();
119 for (
const auto& key : fragment_keys) {
120 frag_buffers->fragment_buffers_owner[key] =
121 std::make_unique<foreign_storage::ForeignStorageBuffer>();
122 frag_buffers->fragment_buffers[key] =
135 const std::string& copy_from_source,
137 std::mutex& communication_mutex,
138 bool& continue_loading,
140 bool& data_wrapper_error_occured,
141 std::condition_variable& buffers_to_load_condition,
142 std::list<std::unique_ptr<FragmentBuffers>>& buffers_to_load) {
147 buffers_to_load_condition.wait(communication_lock, [&]() {
148 return !buffers_to_load.empty() || !continue_loading ||
149 data_wrapper_error_occured;
151 if ((buffers_to_load.empty() && !continue_loading) || data_wrapper_error_occured) {
156 CHECK(!buffers_to_load.empty());
159 std::unique_ptr<FragmentBuffers> grouped_fragment_buffers;
162 grouped_fragment_buffers.reset(buffers_to_load.front().release());
163 buffers_to_load.pop_front();
164 buffers_to_load_condition.notify_all();
166 auto& fragment_buffers = grouped_fragment_buffers->fragment_buffers;
167 auto& delete_buffer = grouped_fragment_buffers->delete_buffer;
174 for (
const auto& [key, buffer] : fragment_buffers) {
179 if (col_desc->columnType.is_varlen_indeed()) {
182 auto index_key = key;
191 insert_chunks.chunks[col_id] =
197 auto row_count = fragment_buffers.begin()
198 ->second->getEncoder()
201 insert_chunks.valid_row_indices.reserve(row_count);
202 for (
size_t irow = 0; irow < row_count; ++irow) {
203 if (delete_buffer->size() > 0) {
204 CHECK_LE(irow, delete_buffer->size());
205 if (delete_buffer->getMemoryPtr()[irow]) {
209 insert_chunks.valid_row_indices.emplace_back(irow);
213 insert_data_loader.
insertChunks(*session_info, insert_chunks);
215 CHECK_LE(insert_chunks.valid_row_indices.size(), row_count);
216 import_status.
rows_rejected += row_count - insert_chunks.valid_row_indices.size();
217 import_status.
rows_completed += insert_chunks.valid_row_indices.size();
221 "Load was cancelled due to max reject rows being reached";
223 get_import_id(copy_params, copy_from_source), import_status);
226 buffers_to_load_condition.notify_all();
230 get_import_id(copy_params, copy_from_source), import_status);
235 buffers_to_load_condition.notify_all();
243 const int32_t max_fragment_id,
250 const std::string& copy_from_source,
251 const size_t maximum_num_fragments_buffered) {
254 std::mutex communication_mutex;
255 bool continue_loading =
260 bool data_wrapper_error_occured =
false;
262 std::condition_variable buffers_to_load_condition;
263 std::list<std::unique_ptr<FragmentBuffers>> buffers_to_load;
272 std::cref(copy_params),
273 std::cref(copy_from_source),
274 std::ref(import_status),
275 std::ref(communication_mutex),
276 std::ref(continue_loading),
277 std::ref(load_failed),
278 std::ref(data_wrapper_error_occured),
279 std::ref(buffers_to_load_condition),
280 std::ref(buffers_to_load));
282 for (int32_t fragment_id = 0; fragment_id <= max_fragment_id; ++fragment_id) {
285 buffers_to_load_condition.wait(communication_lock, [&]() {
286 return buffers_to_load.size() < maximum_num_fragments_buffered || load_failed;
293 auto& fragment_buffers = grouped_fragment_buffers->fragment_buffers;
294 auto& delete_buffer = grouped_fragment_buffers->delete_buffer;
304 data_wrapper_error_occured =
true;
305 buffers_to_load_condition.notify_all();
310 buffers_to_load.emplace_back(std::move(grouped_fragment_buffers));
311 buffers_to_load_condition.notify_all();
316 continue_loading =
false;
317 buffers_to_load_condition.notify_all();
323 return import_status;
327 struct DownloadedObjectToProcess {
328 std::string object_key;
329 std::atomic<bool> is_downloaded;
330 std::string download_file_path;
331 std::string import_file_path;
334 size_t get_number_of_digits(
const size_t number) {
338 std::tuple<std::string, import_export::CopyParams> get_local_copy_source_and_params(
340 std::vector<DownloadedObjectToProcess>& objects_to_process,
341 const size_t begin_object_index,
342 const size_t end_object_index) {
356 CHECK_GT(end_object_index, begin_object_index);
357 CHECK_LT(begin_object_index, objects_to_process.size());
359 size_t num_objects = end_object_index - begin_object_index;
360 auto& first_object = objects_to_process[begin_object_index];
361 std::string first_path = first_object.download_file_path;
362 std::string temp_dir = first_path +
"_import";
364 if (!std::filesystem::create_directory(temp_dir)) {
365 throw std::runtime_error(
"failed to create temporary directory for import: " +
385 std::filesystem::path temp_dir_path{temp_dir};
387 size_t num_zero = get_number_of_digits(num_objects);
388 for (
size_t i = begin_object_index; i < end_object_index; ++i) {
389 auto&
object = objects_to_process[i];
390 std::filesystem::path old_path =
object.download_file_path;
392 auto zero_padded_counter_str =
393 std::string(num_zero - counter_str.length(),
'0') + counter_str;
394 auto new_path = (temp_dir_path / zero_padded_counter_str).
string() +
395 std::filesystem::path{
object.object_key}.extension().string();
396 std::filesystem::rename(old_path, new_path);
397 object.import_file_path = new_path;
399 return {temp_dir, local_copy_params};
405 namespace import_export {
410 : copy_from_source_(copy_from_source), copy_params_(copy_params), table_(table) {
411 connector_ = std::make_unique<Fragmenter_Namespace::LocalInsertConnector>();
417 const std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>>&
418 string_dictionaries) {
423 auto timer =
DEBUG_TIMER(
"Dictionary Checkpointing");
424 for (
const auto& [column_desciptor, string_dictionary] : string_dictionaries) {
425 if (!string_dictionary->checkpoint()) {
426 LOG(
ERROR) <<
"Checkpointing Dictionary for Column "
427 << column_desciptor->columnName <<
" failed.";
429 import_status.
load_msg =
"Dictionary checkpoint failed";
445 const int32_t table_id) {
446 auto& catalog = parent_session_info.
getCatalog();
448 auto logical_columns =
451 std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>> string_dictionaries;
452 for (
const auto& column_descriptor : logical_columns) {
453 if (!column_descriptor->columnType.is_dict_encoded_string()) {
456 auto dict_descriptor =
457 catalog.getMetadataForDict(column_descriptor->columnType.get_comp_param(),
true);
458 string_dictionaries.emplace_back(column_descriptor,
459 dict_descriptor->stringDict.get());
462 finalize(parent_session_info, import_status, string_dictionaries);
470 const size_t maximum_num_fragments_buffered,
471 const size_t max_import_batch_row_count,
473 const int32_t table_id) {
478 if (max_import_batch_row_count != 0) {
479 return max_import_batch_row_count;
488 const size_t max_buffer_byte_size =
489 2 * 1024UL * 1024UL * 1024UL / maximum_num_fragments_buffered;
491 auto& catalog = parent_session_info.
getCatalog();
493 auto logical_columns =
496 size_t row_byte_size = 0;
497 for (
const auto& column_descriptor : logical_columns) {
498 auto type = column_descriptor->columnType;
499 size_t field_byte_length = 0;
500 if (
type.is_varlen_indeed()) {
502 field_byte_length = 256;
504 field_byte_length =
type.get_size();
506 row_byte_size += field_byte_length;
509 return std::min<size_t>((max_buffer_byte_size + row_byte_size - 1) / row_byte_size,
516 const std::string& copy_from_source,
528 auto [server, user_mapping, foreign_table] =
531 catalog.getDatabaseId(),
537 const size_t maximum_num_fragments_buffered = 3;
539 foreign_table->maxFragRows =
546 LOG(
INFO) <<
"Import fragment row count is " << foreign_table->maxFragRows
552 catalog.getDatabaseId(),
556 int32_t max_fragment_id = std::numeric_limits<int32_t>::max();
557 if (!data_wrapper->isLazyFragmentFetchingEnabled()) {
560 if (metadata_vector.empty()) {
564 for (
const auto& [key, _] : metadata_vector) {
578 maximum_num_fragments_buffered);
584 return import_status;
593 auto data_dir_path = boost::filesystem::canonical(base_path);
602 #
if ENABLE_IMPORT_PARQUET
606 throw std::runtime_error(
"Attempting to load S3 resource '" +
copy_from_source_ +
607 "' for unsupported 'source_type' (must be 'DELIMITED_FILE'"
608 #
if ENABLE_IMPORT_PARQUET
611 " or 'REGEX_PARSED_FILE'");
621 auto uuid = boost::uuids::random_generator()();
627 if (std::filesystem::exists(import_path)) {
628 std::filesystem::remove_all(import_path);
643 s3_archive->init_for_read();
645 const auto bucket_name = s3_archive->url_part(4);
647 auto object_keys = s3_archive->get_objkeys();
648 std::vector<DownloadedObjectToProcess> objects_to_process(object_keys.size());
649 size_t object_count = 0;
650 for (
const auto& objkey : object_keys) {
651 auto&
object = objects_to_process[object_count++];
652 object.object_key = objkey;
653 object.is_downloaded =
false;
659 std::mutex communication_mutex;
660 bool continue_downloading =
true;
661 bool download_exception_occured =
false;
663 std::condition_variable files_download_condition;
665 auto is_downloading_finished = [&] {
667 return !continue_downloading || download_exception_occured;
670 std::function<void(const std::vector<size_t>&)> download_objects =
671 [&](
const std::vector<size_t>& partition) {
672 for (
const auto& index : partition) {
673 DownloadedObjectToProcess&
object = objects_to_process[index];
674 const std::string& obj_key =
object.object_key;
675 if (is_downloading_finished()) {
678 std::exception_ptr eptr;
679 std::string local_file_path;
680 std::string exception_what;
681 bool exception_occured =
false;
684 local_file_path = s3_archive->land(obj_key,
689 }
catch (
const std::exception& e) {
690 exception_what = e.what();
691 exception_occured =
true;
694 if (is_downloading_finished()) {
697 if (exception_occured) {
700 download_exception_occured =
true;
702 files_download_condition.notify_all();
703 throw std::runtime_error(
"failed to fetch s3 object: '" + obj_key +
704 "': " + exception_what);
707 object.download_file_path = local_file_path;
708 object.is_downloaded =
712 files_download_condition.notify_all();
716 std::function<void()> import_local_files = [&]() {
717 for (
size_t object_index = 0; object_index < object_count;) {
720 files_download_condition.wait(
722 [&download_exception_occured, object_index, &objects_to_process]() {
723 return objects_to_process[object_index].is_downloaded ||
724 download_exception_occured;
726 if (download_exception_occured) {
733 size_t end_object_index = object_count;
734 for (
size_t i = object_index + 1; i < object_count; ++i) {
735 if (!objects_to_process[i].is_downloaded) {
736 end_object_index = i;
742 std::string local_import_dir;
745 std::tie(local_import_dir, local_copy_params) = get_local_copy_source_and_params(
746 copy_params_, objects_to_process, object_index, end_object_index);
747 local_import_status =
748 importGeneral(session_info, local_import_dir, local_copy_params);
750 std::filesystem::remove_all(local_import_dir);
751 }
catch (
const std::exception& except) {
754 std::string what = except.what();
756 for (
size_t i = object_index; i < end_object_index; ++i) {
757 auto&
object = objects_to_process[i];
758 what = boost::regex_replace(what,
759 boost::regex{
object.import_file_path},
760 bucket_name +
"/" +
object.object_key);
764 continue_downloading =
false;
767 std::filesystem::remove_all(local_import_dir);
768 throw std::runtime_error(what);
770 aggregate_import_status += local_import_status;
772 aggregate_import_status);
776 continue_downloading =
false;
788 std::vector<size_t> partition_range(object_count);
789 std::iota(partition_range.begin(), partition_range.end(), 0);
791 partition_range, num_download_threads, download_objects);
795 for (
auto& future : download_futures) {
801 for (
auto& future : download_futures) {
804 return aggregate_import_status;
807 throw std::runtime_error(
"AWS S3 support not available");
813 #ifdef ENABLE_IMPORT_PARQUET
826 catalog.getDatabaseId(),
836 foreign_table->validateOptionValues();
840 catalog.getDatabaseId(),
844 if (
auto parquet_import =
845 dynamic_cast<foreign_storage::ParquetImporter*>(data_wrapper.get())) {
851 if (copy_params_.threads == 0) {
852 max_threads = std::min(static_cast<size_t>(std::thread::hardware_concurrency()),
855 max_threads =
static_cast<size_t>(copy_params_.threads);
859 int num_importer_threads =
860 std::min<int>(max_threads, parquet_import->getMaxNumUsefulThreads());
861 parquet_import->setNumThreads(num_importer_threads);
862 int num_outer_thread = 1;
863 for (
int thread_count = 1; thread_count <= max_threads; ++thread_count) {
864 if (thread_count * num_importer_threads <= max_threads) {
865 num_outer_thread = thread_count;
870 ImportStatus import_status;
872 auto import_failed = [&import_status_mutex, &import_status] {
874 return import_status.load_failed;
877 std::vector<std::future<void>> futures;
879 for (
int ithread = 0; ithread < num_outer_thread; ++ithread) {
882 auto batch_result = parquet_import->getNextImportBatch();
883 if (import_failed()) {
886 auto batch = batch_result->getInsertData();
887 if (!batch || import_failed()) {
890 insert_data_loader.insertData(*session_info, *batch);
892 auto batch_import_status = batch_result->getImportStatus();
895 import_status.rows_completed += batch_import_status.rows_completed;
896 import_status.rows_rejected += batch_import_status.rows_rejected;
897 if (import_status.rows_rejected > copy_params_.max_reject) {
898 import_status.load_failed =
true;
899 import_status.load_msg =
900 "Load was cancelled due to max reject rows being reached";
908 for (
auto& future : futures) {
912 for (
auto& future : futures) {
916 if (import_status.load_failed) {
917 foreign_table.reset();
921 finalize(*session_info, import_status, parquet_import->getStringDictionaries());
923 return import_status;
std::unique_ptr< foreign_storage::ForeignStorageBuffer > delete_buffer
static std::unique_ptr< ForeignDataWrapper > createForImport(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
std::string s3_secret_key
static int32_t proxy_foreign_table_fragment_size_
std::vector< int > ChunkKey
void insertChunks(const Catalog_Namespace::SessionInfo &session_info, const InsertChunks &insert_chunks)
static std::string default_import_path_
shared utility for globbing files, paths can be specified as either a single file, directory or wildcards
std::string copy_from_source_
class for a per-database catalog. also includes metadata for the current database and the current use...
std::string get_import_id(const import_export::CopyParams &copy_params, const std::string &copy_from_source)
bool is_s3_uri(const std::string &file_path)
bool g_enable_legacy_delimited_import
foreign_storage::ChunkToBufferMap fragment_buffers
#define CHUNK_KEY_FRAGMENT_IDX
void validate_sort_options(const FilePathOptions &options)
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
std::unique_ptr< FragmentBuffers > create_fragment_buffers(const int32_t fragment_id, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table)
size_t max_import_batch_row_count
ChunkMetadataVector metadata_scan(foreign_storage::ForeignDataWrapper *data_wrapper, foreign_storage::ForeignTable *foreign_table)
ImportStatus import(const Catalog_Namespace::SessionInfo *session_info) override
std::tuple< std::unique_ptr< foreign_storage::ForeignServer >, std::unique_ptr< foreign_storage::UserMapping >, std::unique_ptr< foreign_storage::ForeignTable > > create_proxy_fsi_objects(const std::string &copy_from_source, const import_export::CopyParams &copy_params, const int db_id, const TableDescriptor *table, const int32_t user_id)
Create proxy fsi objects for use outside FSI.
std::shared_lock< T > shared_lock
std::optional< std::string > regex_path_filter
const std::string kDefaultImportDirName
static std::unique_ptr< ForeignDataWrapper > createForGeneralImport(const import_export::CopyParams &copy_params, const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
future< Result > async(Fn &&fn, Args &&...args)
static void setDefaultImportPath(const std::string &base_path)
Classes representing a parse tree.
std::vector< std::future< void > > create_futures_for_workers(const Container &items, size_t max_threads, std::function< void(const Container &)> lambda)
std::map< ChunkKey, std::unique_ptr< foreign_storage::ForeignStorageBuffer > > fragment_buffers_owner
int32_t s3_max_concurrent_downloads
std::unique_lock< T > unique_lock
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHUNK_KEY_TABLE_IDX
int getDatabaseId() const
ForeignDataImporter(const std::string &file_path, const CopyParams &copy_params, const TableDescriptor *table)
virtual void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer=nullptr)=0
import_export::SourceType source_type
std::unique_ptr< Fragmenter_Namespace::InsertDataLoader::InsertConnector > connector_
bool is_valid_source_type(const import_export::CopyParams &copy_params)
void load_foreign_data_buffers(Fragmenter_Namespace::InsertDataLoader::InsertConnector *connector, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table, const Catalog_Namespace::SessionInfo *session_info, const import_export::CopyParams &copy_params, const std::string &copy_from_source, import_export::ImportStatus &import_status, std::mutex &communication_mutex, bool &continue_loading, bool &load_failed, bool &data_wrapper_error_occured, std::condition_variable &buffers_to_load_condition, std::list< std::unique_ptr< FragmentBuffers >> &buffers_to_load)
ImportStatus importGeneralS3(const Catalog_Namespace::SessionInfo *session_info)
static void set_import_status(const std::string &id, const ImportStatus is)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
void validate_regex_parser_options(const import_export::CopyParams &copy_params)
bool g_enable_fsi_regex_import
Catalog & getCatalog() const
#define DEFAULT_FRAGMENT_ROWS
DEVICE void iota(ARGS &&...args)
void finalize(const Catalog_Namespace::SessionInfo &parent_session_info, ImportStatus &import_status, const std::vector< std::pair< const ColumnDescriptor *, StringDictionary * >> &string_dictionaries)
static std::unique_ptr< ForeignServer > createForeignServerProxy(const int db_id, const int user_id, const std::string &file_path, const import_export::CopyParams &copy_params)
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Data_Namespace::MemoryLevel persistenceLevel
std::string s3_session_token
static std::unique_ptr< ForeignTable > createForeignTableProxy(const int db_id, const TableDescriptor *table, const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server)
#define CHUNK_KEY_VARLEN_IDX
#define DEBUG_TIMER(name)
const TableDescriptor * table_
int32_t get_proxy_foreign_table_fragment_size(const size_t maximum_num_fragments_buffered, const size_t max_import_batch_row_count, const Catalog_Namespace::SessionInfo &parent_session_info, const int32_t table_id)
virtual void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector)=0
import_export::ImportStatus import_foreign_data(const int32_t max_fragment_id, Fragmenter_Namespace::InsertDataLoader::InsertConnector *connector, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table, foreign_storage::ForeignDataWrapper *data_wrapper, const Catalog_Namespace::SessionInfo *session_info, const import_export::CopyParams &copy_params, const std::string &copy_from_source, const size_t maximum_num_fragments_buffered)
void validate_copy_params(const import_export::CopyParams &copy_params)
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
#define CHUNK_KEY_COLUMN_IDX
std::shared_timed_mutex shared_mutex
ImportStatus importGeneral(const Catalog_Namespace::SessionInfo *session_info)
std::string s3_access_key
const UserMetadata & get_currentUser() const
std::optional< std::string > file_sort_order_by
static constexpr char const * PARQUET
size_t g_max_import_threads
std::optional< std::string > file_sort_regex
static std::unique_ptr< UserMapping > createUserMappingProxyIfApplicable(const int db_id, const int user_id, const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server)