ForeignDataImporter.cpp
/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "ForeignDataImporter.h"

#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <filesystem>

#include "Archive/S3Archive.h"
#include "DataMgr/ForeignStorage/ForeignDataWrapperFactory.h"
#include "DataMgr/ForeignStorage/ForeignStorageException.h"
#include "DataMgr/ForeignStorage/FsiChunkUtils.h"
#include "DataMgr/ForeignStorage/ParquetImporter.h"
#include "Importer.h"
#include "Parser/ParserNode.h"
#include "Shared/file_path_util.h"
#include "Shared/measure.h"
#include "Shared/misc.h"
#include "Shared/scope.h"
#include "UserMapping.h"

extern bool g_enable_legacy_delimited_import;
#ifdef ENABLE_IMPORT_PARQUET
extern bool g_enable_legacy_parquet_import;
#endif
extern bool g_enable_fsi_regex_import;
namespace {
ChunkMetadataVector metadata_scan(foreign_storage::ForeignDataWrapper* data_wrapper,
                                  foreign_storage::ForeignTable* foreign_table) {
  ChunkMetadataVector metadata_vector;
  try {
    data_wrapper->populateChunkMetadata(
        metadata_vector);  // explicitly invoke a metadata scan on data wrapper
  } catch (const foreign_storage::MetadataScanInfeasibleFragmentSizeException&
               metadata_scan_exception) {
    // if a metadata scan exception is thrown, check to see if we can adjust
    // the fragment size and retry

    auto min_feasible_fragment_size = metadata_scan_exception.min_feasible_fragment_size_;
    if (min_feasible_fragment_size < 0) {
      throw;  // no valid fragment size returned by exception
    }
    foreign_table->maxFragRows = min_feasible_fragment_size;
    data_wrapper->populateChunkMetadata(
        metadata_vector);  // attempt another metadata scan; note, we assume that the
                           // metadata scan can be reentered safely after throwing the
                           // exception
  }
  return metadata_vector;
}
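// Note: only a single retry is attempted above; if the second
// populateChunkMetadata() call also throws, the exception propagates to the
// caller unchanged.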

std::string get_import_id(const import_export::CopyParams& copy_params,
                          const std::string& copy_from_source) {
  if (copy_params.source_type == import_export::SourceType::kDelimitedFile ||
      copy_params.source_type == import_export::SourceType::kRegexParsedFile
#ifdef ENABLE_IMPORT_PARQUET
      || copy_params.source_type == import_export::SourceType::kParquetFile
#endif
  ) {
    return boost::filesystem::path(copy_from_source).filename().string();
  }

  return copy_from_source;
}

void validate_copy_params(const import_export::CopyParams& copy_params) {
  if (copy_params.source_type == import_export::SourceType::kRegexParsedFile) {
    validate_regex_parser_options(copy_params);
  }
}

struct FragmentBuffers {
  std::map<ChunkKey, std::unique_ptr<foreign_storage::ForeignStorageBuffer>>
      fragment_buffers_owner;
  ChunkToBufferMap fragment_buffers;
  std::unique_ptr<foreign_storage::ForeignStorageBuffer> delete_buffer;
};

std::unique_ptr<FragmentBuffers> create_fragment_buffers(
    const int32_t fragment_id,
    Catalog_Namespace::Catalog& catalog,
    const TableDescriptor* table) {
  auto columns = catalog.getAllColumnMetadataForTable(table->tableId, false, false, true);

  std::set<ChunkKey> fragment_keys;
  for (const auto col_desc : columns) {
    ChunkKey key{
        catalog.getDatabaseId(), table->tableId, col_desc->columnId, fragment_id};
    if (col_desc->columnType.is_varlen_indeed()) {
      auto data_key = key;
      data_key.push_back(1);
      fragment_keys.insert(data_key);
      auto index_key = key;
      index_key.push_back(2);
      fragment_keys.insert(index_key);
    } else {
      fragment_keys.insert(key);
    }
  }

  // create buffers
  std::unique_ptr<FragmentBuffers> frag_buffers = std::make_unique<FragmentBuffers>();
  frag_buffers->delete_buffer = std::make_unique<foreign_storage::ForeignStorageBuffer>();
  for (const auto& key : fragment_keys) {
    frag_buffers->fragment_buffers_owner[key] =
        std::make_unique<foreign_storage::ForeignStorageBuffer>();
    frag_buffers->fragment_buffers[key] =
        shared::get_from_map(frag_buffers->fragment_buffers_owner, key).get();
  }

  return frag_buffers;
}
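// For illustration, the keys built above for a table with one fixed-width
// column (column id 1) and one varlen column (column id 2) are:
//   {db_id, table_id, 1, fragment_id}     -> fixed-width data chunk
//   {db_id, table_id, 2, fragment_id, 1}  -> varlen data chunk
//   {db_id, table_id, 2, fragment_id, 2}  -> varlen index (offsets) chunk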

void load_foreign_data_buffers(
    Fragmenter_Namespace::InsertDataLoader::InsertConnector* connector,
    Catalog_Namespace::Catalog& catalog,
    const TableDescriptor* table,
    const Catalog_Namespace::SessionInfo* session_info,
    const import_export::CopyParams& copy_params,
    const std::string& copy_from_source,
    import_export::ImportStatus& import_status,
    std::mutex& communication_mutex,
    bool& continue_loading,
    bool& load_failed,
    bool& data_wrapper_error_occured,
    std::condition_variable& buffers_to_load_condition,
    std::list<std::unique_ptr<FragmentBuffers>>& buffers_to_load) {
  Fragmenter_Namespace::InsertDataLoader insert_data_loader(*connector);
  while (true) {
    {
      std::unique_lock communication_lock(communication_mutex);
      buffers_to_load_condition.wait(communication_lock, [&]() {
        return !buffers_to_load.empty() || !continue_loading ||
               data_wrapper_error_occured;
      });
      if ((buffers_to_load.empty() && !continue_loading) || data_wrapper_error_occured) {
        return;
      }
    }

    CHECK(!buffers_to_load.empty());

    try {
      std::unique_ptr<FragmentBuffers> grouped_fragment_buffers;
      {
        std::unique_lock communication_lock(communication_mutex);
        grouped_fragment_buffers.reset(buffers_to_load.front().release());
        buffers_to_load.pop_front();
        buffers_to_load_condition.notify_all();
      }
      auto& fragment_buffers = grouped_fragment_buffers->fragment_buffers;
      auto& delete_buffer = grouped_fragment_buffers->delete_buffer;

      // get chunks for import
      Fragmenter_Namespace::InsertChunks insert_chunks{
          table->tableId, catalog.getDatabaseId(), {}, {}};

      // create chunks from buffers
      for (const auto& [key, buffer] : fragment_buffers) {
        const auto col_id = key[CHUNK_KEY_COLUMN_IDX];
        const auto table_id = key[CHUNK_KEY_TABLE_IDX];
        const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);

        if (col_desc->columnType.is_varlen_indeed()) {
          CHECK(key.size() > CHUNK_KEY_VARLEN_IDX);  // check for varlen key
          if (key[CHUNK_KEY_VARLEN_IDX] == 1) {  // data key
            auto index_key = key;
            index_key[CHUNK_KEY_VARLEN_IDX] = 2;
            insert_chunks.chunks[col_id] = Chunk_NS::Chunk::getChunk(
                col_desc,
                buffer,
                shared::get_from_map(fragment_buffers, index_key),
                false);
          }
        } else {  // regular non-varlen case with no index buffer
          insert_chunks.chunks[col_id] =
              Chunk_NS::Chunk::getChunk(col_desc, buffer, nullptr, false);
        }
      }

      // mark which row indices are valid for import
      auto row_count = fragment_buffers.begin()
                           ->second->getEncoder()
                           ->getNumElems();  // assume all chunks have the same number
                                             // of rows; this is validated at a lower
                                             // level
      insert_chunks.valid_row_indices.reserve(row_count);
      for (size_t irow = 0; irow < row_count; ++irow) {
        if (delete_buffer->size() > 0) {
          CHECK_LE(irow, delete_buffer->size());
          if (delete_buffer->getMemoryPtr()[irow]) {
            continue;
          }
        }
        insert_chunks.valid_row_indices.emplace_back(irow);
      }
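      // The delete buffer, when populated by the data wrapper, holds one
      // byte per row; a non-zero byte marks the row as deleted, so the row
      // is skipped above and later counted in rows_rejected.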

      // import chunks
      insert_data_loader.insertChunks(*session_info, insert_chunks);

      CHECK_LE(insert_chunks.valid_row_indices.size(), row_count);
      import_status.rows_rejected += row_count - insert_chunks.valid_row_indices.size();
      import_status.rows_completed += insert_chunks.valid_row_indices.size();
      if (import_status.rows_rejected > copy_params.max_reject) {
        import_status.load_failed = true;
        import_status.load_msg =
            "Load was cancelled due to max reject rows being reached";
        import_export::Importer::set_import_status(
            get_import_id(copy_params, copy_from_source), import_status);
        std::unique_lock communication_lock(communication_mutex);
        load_failed = true;
        buffers_to_load_condition.notify_all();
        return;
      }
      import_export::Importer::set_import_status(
          get_import_id(copy_params, copy_from_source), import_status);
    } catch (...) {
      {
        std::unique_lock communication_lock(communication_mutex);
        load_failed = true;
        buffers_to_load_condition.notify_all();
      }
      throw;
    }
  }
}

import_export::ImportStatus import_foreign_data(
    const int32_t max_fragment_id,
    Fragmenter_Namespace::InsertDataLoader::InsertConnector* connector,
    Catalog_Namespace::Catalog& catalog,
    const TableDescriptor* table,
    foreign_storage::ForeignDataWrapper* data_wrapper,
    const Catalog_Namespace::SessionInfo* session_info,
    const import_export::CopyParams& copy_params,
    const std::string& copy_from_source,
    const size_t maximum_num_fragments_buffered) {
  import_export::ImportStatus import_status;

  std::mutex communication_mutex;
  bool continue_loading =
      true;  // when false, signals that the last buffer to load has been added, and
             // loading should cease after loading remaining buffers
  bool load_failed =
      false;  // signals loading has failed and buffer population should cease
  bool data_wrapper_error_occured = false;  // signals an error occurred during buffer
                                            // population and loading should cease
  std::condition_variable buffers_to_load_condition;
  std::list<std::unique_ptr<FragmentBuffers>> buffers_to_load;

  // launch separate thread to load processed fragment buffers
  auto load_future = std::async(std::launch::async,
                                load_foreign_data_buffers,
                                connector,
                                std::ref(catalog),
                                table,
                                session_info,
                                std::cref(copy_params),
                                std::cref(copy_from_source),
                                std::ref(import_status),
                                std::ref(communication_mutex),
                                std::ref(continue_loading),
                                std::ref(load_failed),
                                std::ref(data_wrapper_error_occured),
                                std::ref(buffers_to_load_condition),
                                std::ref(buffers_to_load));

  for (int32_t fragment_id = 0; fragment_id <= max_fragment_id; ++fragment_id) {
    {
      std::unique_lock communication_lock(communication_mutex);
      buffers_to_load_condition.wait(communication_lock, [&]() {
        return buffers_to_load.size() < maximum_num_fragments_buffered || load_failed;
      });
      if (load_failed) {
        break;
      }
    }
    auto grouped_fragment_buffers = create_fragment_buffers(fragment_id, catalog, table);
    auto& fragment_buffers = grouped_fragment_buffers->fragment_buffers;
    auto& delete_buffer = grouped_fragment_buffers->delete_buffer;

    // get the buffers, accounting for the possibility of the requested fragment id
    // being out of bounds
    try {
      data_wrapper->populateChunkBuffers(fragment_buffers, {}, delete_buffer.get());
    } catch (const foreign_storage::RequestedFragmentIdOutOfBoundsException&) {
      break;
    } catch (...) {
      std::unique_lock communication_lock(communication_mutex);
      data_wrapper_error_occured = true;
      buffers_to_load_condition.notify_all();
      throw;
    }

    std::unique_lock communication_lock(communication_mutex);
    buffers_to_load.emplace_back(std::move(grouped_fragment_buffers));
    buffers_to_load_condition.notify_all();
  }

  {  // data wrapper processing has finished, notify loading thread
    std::unique_lock communication_lock(communication_mutex);
    continue_loading = false;
    buffers_to_load_condition.notify_all();
  }

  // any exceptions in the separate loading thread will occur here
  load_future.get();

  return import_status;
}
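// Note: import_foreign_data() is the producer half of a bounded
// producer/consumer pipeline: it populates fragment buffers through the data
// wrapper while load_foreign_data_buffers() drains `buffers_to_load` on a
// separate thread, and the condition variable applies back-pressure once
// `maximum_num_fragments_buffered` fragments are queued.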

#ifdef HAVE_AWS_S3
struct DownloadedObjectToProcess {
  std::string object_key;
  std::atomic<bool> is_downloaded;
  std::string download_file_path;
  std::string import_file_path;
};

size_t get_number_of_digits(const size_t number) {
  return std::to_string(number).length();
}
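// e.g. get_number_of_digits(100) == 3; with 100 objects the zero-padded
// filenames built below run "000" through "099", so lexicographical order
// matches download order.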

std::tuple<std::string, import_export::CopyParams> get_local_copy_source_and_params(
    const import_export::CopyParams& s3_copy_params,
    std::vector<DownloadedObjectToProcess>& objects_to_process,
    const size_t begin_object_index,
    const size_t end_object_index) {
  import_export::CopyParams local_copy_params = s3_copy_params;
  // remove any members from `local_copy_params` that are only intended to be used at a
  // higher level
  local_copy_params.s3_access_key.clear();
  local_copy_params.s3_secret_key.clear();
  local_copy_params.s3_session_token.clear();
  local_copy_params.s3_region.clear();
  local_copy_params.s3_endpoint.clear();

  local_copy_params.regex_path_filter = std::nullopt;
  local_copy_params.file_sort_order_by = "PATHNAME";  // see comment below
  local_copy_params.file_sort_regex = std::nullopt;

  CHECK_GT(end_object_index, begin_object_index);
  CHECK_LT(begin_object_index, objects_to_process.size());

  size_t num_objects = end_object_index - begin_object_index;
  auto& first_object = objects_to_process[begin_object_index];
  std::string first_path = first_object.download_file_path;
  std::string temp_dir = first_path + "_import";

  if (!std::filesystem::create_directory(temp_dir)) {
    throw std::runtime_error("failed to create temporary directory for import: " +
                             temp_dir);
  }

  // construct a directory with files to import
  //
  // NOTE:
  // * files are moved into `temp_dir` in the exact order that they appear in
  // `objects_to_process`
  //
  // * the `PATHNAME` option is set for `file_sort_order_by` in order to
  // guarantee that import occurs in the order specified by the user, provided
  // the data wrapper correctly supports the `PATHNAME` option
  //
  // * filenames are chosen such that they appear in lexicographical order by
  // pathname, and thus require padding by an appropriate number of zeros
  //
  // * filenames must keep their suffixes for some features of data wrappers
  // to work correctly, especially in the case of archived data (such as
  // `.zip` or `.gz` or any variant)
  std::filesystem::path temp_dir_path{temp_dir};
  size_t counter = 0;
  size_t num_zero = get_number_of_digits(num_objects);
  for (size_t i = begin_object_index; i < end_object_index; ++i) {
    auto& object = objects_to_process[i];
    std::filesystem::path old_path = object.download_file_path;
    auto counter_str = std::to_string(counter++);
    auto zero_padded_counter_str =
        std::string(num_zero - counter_str.length(), '0') + counter_str;
    auto new_path = (temp_dir_path / zero_padded_counter_str).string() +
                    std::filesystem::path{object.object_key}.extension().string();
    std::filesystem::rename(old_path, new_path);
    object.import_file_path = new_path;
  }
  return {temp_dir, local_copy_params};
}
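// For example (hypothetical object keys), a batch containing "bucket/a.csv"
// and "bucket/b.csv" yields temp_dir entries "0.csv" and "1.csv"; the
// returned copy params then import them in that order via the PATHNAME sort.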
#endif

}  // namespace

namespace import_export {

ForeignDataImporter::ForeignDataImporter(const std::string& copy_from_source,
                                         const CopyParams& copy_params,
                                         const TableDescriptor* table)
    : copy_from_source_(copy_from_source), copy_params_(copy_params), table_(table) {
  connector_ = std::make_unique<Fragmenter_Namespace::LocalInsertConnector>();
}

void ForeignDataImporter::finalize(
    const Catalog_Namespace::SessionInfo& parent_session_info,
    ImportStatus& import_status,
    const std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>>&
        string_dictionaries) {
  if (table_->persistenceLevel ==
      Data_Namespace::MemoryLevel::DISK_LEVEL) {  // only checkpoint disk-resident
                                                  // tables
    if (!import_status.load_failed) {
      auto timer = DEBUG_TIMER("Dictionary Checkpointing");
      for (const auto& [column_descriptor, string_dictionary] : string_dictionaries) {
        if (!string_dictionary->checkpoint()) {
          LOG(ERROR) << "Checkpointing Dictionary for Column "
                     << column_descriptor->columnName << " failed.";
          import_status.load_failed = true;
          import_status.load_msg = "Dictionary checkpoint failed";
          break;
        }
      }
    }
  }
  if (import_status.load_failed) {
    connector_->rollback(parent_session_info, table_->tableId);
  } else {
    connector_->checkpoint(parent_session_info, table_->tableId);
  }
}

void ForeignDataImporter::finalize(
    const Catalog_Namespace::SessionInfo& parent_session_info,
    ImportStatus& import_status,
    const int32_t table_id) {
  auto& catalog = parent_session_info.getCatalog();

  auto logical_columns =
      catalog.getAllColumnMetadataForTable(table_id, false, false, false);

  std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>> string_dictionaries;
  for (const auto& column_descriptor : logical_columns) {
    if (!column_descriptor->columnType.is_dict_encoded_string()) {
      continue;
    }
    auto dict_descriptor =
        catalog.getMetadataForDict(column_descriptor->columnType.get_comp_param(), true);
    string_dictionaries.emplace_back(column_descriptor,
                                     dict_descriptor->stringDict.get());
  }

  finalize(parent_session_info, import_status, string_dictionaries);
}

// This value is used only if it is non-zero; it is for testing purposes only
size_t ForeignDataImporter::proxy_foreign_table_fragment_size_ = 0;

namespace {
int32_t get_proxy_foreign_table_fragment_size(
    const size_t maximum_num_fragments_buffered,
    const size_t max_import_batch_row_count,
    const Catalog_Namespace::SessionInfo& parent_session_info,
    const int32_t table_id) {
  if (ForeignDataImporter::proxy_foreign_table_fragment_size_ != 0) {
    return ForeignDataImporter::proxy_foreign_table_fragment_size_;
  }

  if (max_import_batch_row_count != 0) {
    return max_import_batch_row_count;
  }

  // This number is chosen as a reasonable default value to reserve for
  // intermediate buffering during import; it is about 2GB of memory. Note,
  // depending on the actual size of var len values, this heuristic target may
  // be off. NOTE: `maximum_num_fragments_buffered` scales the allowed buffer
  // size with the assumption that in the worst case all buffers may be
  // buffered at once.
  const size_t max_buffer_byte_size =
      2 * 1024UL * 1024UL * 1024UL / maximum_num_fragments_buffered;

  auto& catalog = parent_session_info.getCatalog();

  auto logical_columns =
      catalog.getAllColumnMetadataForTable(table_id, false, false, false);

  size_t row_byte_size = 0;
  for (const auto& column_descriptor : logical_columns) {
    auto type = column_descriptor->columnType;
    size_t field_byte_length = 0;
    if (type.is_varlen_indeed()) {
      // use a heuristic where varlen types are assumed to be 256 bytes in length
      field_byte_length = 256;
    } else {
      field_byte_length = type.get_size();
    }
    row_byte_size += field_byte_length;
  }

  return std::min<size_t>((max_buffer_byte_size + row_byte_size - 1) / row_byte_size,
                          DEFAULT_FRAGMENT_ROWS);
}
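// Worked example of the heuristic above, for a hypothetical schema: with
// maximum_num_fragments_buffered = 3, max_buffer_byte_size is ~715MB
// (2GB / 3); a table with two 8-byte columns and one varlen column (assumed
// 256 bytes) has row_byte_size = 272, giving a fragment size of roughly
// 2.6M rows, well under the DEFAULT_FRAGMENT_ROWS cap.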
}  // namespace

ImportStatus ForeignDataImporter::importGeneral(
    const Catalog_Namespace::SessionInfo* session_info,
    const std::string& copy_from_source,
    const CopyParams& copy_params) {
  auto& catalog = session_info->getCatalog();

  CHECK(foreign_storage::is_valid_source_type(copy_params));

  // validate copy params before import in order to print user friendly messages
  validate_copy_params(copy_params);

  ImportStatus import_status;
  {
    auto& current_user = session_info->get_currentUser();
    auto [server, user_mapping, foreign_table] =
        foreign_storage::create_proxy_fsi_objects(copy_from_source,
                                                  copy_params,
                                                  catalog.getDatabaseId(),
                                                  table_,
                                                  current_user.userId);

    // maximum number of fragments buffered in memory at any one time, affects
    // `maxFragRows` heuristic below
    const size_t maximum_num_fragments_buffered = 3;
    // set fragment size for proxy foreign table during import
    foreign_table->maxFragRows =
        get_proxy_foreign_table_fragment_size(maximum_num_fragments_buffered,
                                              copy_params.max_import_batch_row_count,
                                              *session_info,
                                              table_->tableId);

    // log for debugging purposes
    LOG(INFO) << "Import fragment row count is " << foreign_table->maxFragRows
              << " for table " << table_->tableName;

    auto data_wrapper =
        foreign_storage::ForeignDataWrapperFactory::createForGeneralImport(
            copy_params,
            catalog.getDatabaseId(),
            foreign_table.get(),
            user_mapping.get());

    int32_t max_fragment_id = std::numeric_limits<int32_t>::max();
    if (!data_wrapper->isLazyFragmentFetchingEnabled()) {
      ChunkMetadataVector metadata_vector =
          metadata_scan(data_wrapper.get(), foreign_table.get());
      if (metadata_vector.empty()) {  // an empty data source
        return {};
      }
      max_fragment_id = 0;
      for (const auto& [key, _] : metadata_vector) {
        max_fragment_id = std::max(max_fragment_id, key[CHUNK_KEY_FRAGMENT_IDX]);
      }
      CHECK_GE(max_fragment_id, 0);
    }

    import_status = import_foreign_data(max_fragment_id,
                                        connector_.get(),
                                        catalog,
                                        table_,
                                        data_wrapper.get(),
                                        session_info,
                                        copy_params,
                                        copy_from_source,
                                        maximum_num_fragments_buffered);

  }  // this scope ensures that fsi proxy objects are destroyed prior to checkpoint

  finalize(*session_info, import_status, table_->tableId);

  return import_status;
}

ImportStatus ForeignDataImporter::importGeneral(
    const Catalog_Namespace::SessionInfo* session_info) {
  return importGeneral(session_info, copy_from_source_, copy_params_);
}

void ForeignDataImporter::setDefaultImportPath(const std::string& base_path) {
  auto data_dir_path = boost::filesystem::canonical(base_path);
  default_import_path_ = (data_dir_path / shared::kDefaultImportDirName).string();
}

ImportStatus ForeignDataImporter::importGeneralS3(
    const Catalog_Namespace::SessionInfo* session_info) {
  CHECK(shared::is_s3_uri(copy_from_source_));

  if (!(copy_params_.source_type == import_export::SourceType::kDelimitedFile ||
#if ENABLE_IMPORT_PARQUET
        copy_params_.source_type == import_export::SourceType::kParquetFile ||
#endif
        copy_params_.source_type == import_export::SourceType::kRegexParsedFile)) {
    throw std::runtime_error("Attempting to load S3 resource '" + copy_from_source_ +
                             "' for unsupported 'source_type' (must be 'DELIMITED_FILE'"
#if ENABLE_IMPORT_PARQUET
                             ", 'PARQUET_FILE'"
#endif
                             " or 'REGEX_PARSED_FILE'");
  }

  shared::validate_sort_options(shared::FilePathOptions{copy_params_.regex_path_filter,
                                                        copy_params_.file_sort_order_by,
                                                        copy_params_.file_sort_regex});

#ifdef HAVE_AWS_S3

  auto uuid = boost::uuids::random_generator()();
  std::string base_path = "s3-import-" + boost::uuids::to_string(uuid);
  auto import_path = std::filesystem::path(default_import_path_) / base_path;

  // Ensure files & dirs are cleaned up, regardless of outcome
  ScopeGuard cleanup_guard = [&] {
    if (std::filesystem::exists(import_path)) {
      std::filesystem::remove_all(import_path);
    }
  };

  auto s3_archive = std::make_unique<S3Archive>(copy_from_source_,
                                                copy_params_.s3_access_key,
                                                copy_params_.s3_secret_key,
                                                copy_params_.s3_session_token,
                                                copy_params_.s3_region,
                                                copy_params_.s3_endpoint,
                                                copy_params_.plain_text,
                                                copy_params_.regex_path_filter,
                                                copy_params_.file_sort_order_by,
                                                copy_params_.file_sort_regex,
                                                import_path);
  s3_archive->init_for_read();

  const auto bucket_name = s3_archive->url_part(4);

  auto object_keys = s3_archive->get_objkeys();
  std::vector<DownloadedObjectToProcess> objects_to_process(object_keys.size());
  size_t object_count = 0;
  for (const auto& objkey : object_keys) {
    auto& object = objects_to_process[object_count++];
    object.object_key = objkey;
    object.is_downloaded = false;
  }

  ImportStatus aggregate_import_status;
  const int num_download_threads = copy_params_.s3_max_concurrent_downloads;

  std::mutex communication_mutex;
  bool continue_downloading = true;
  bool download_exception_occured = false;

  std::condition_variable files_download_condition;

  auto is_downloading_finished = [&] {
    std::unique_lock communication_lock(communication_mutex);
    return !continue_downloading || download_exception_occured;
  };

  std::function<void(const std::vector<size_t>&)> download_objects =
      [&](const std::vector<size_t>& partition) {
        for (const auto& index : partition) {
          DownloadedObjectToProcess& object = objects_to_process[index];
          const std::string& obj_key = object.object_key;
          if (is_downloading_finished()) {
            return;
          }
          std::exception_ptr eptr;  // unused
          std::string local_file_path;
          std::string exception_what;
          bool exception_occured = false;

          try {
            local_file_path = s3_archive->land(obj_key,
                                               eptr,
                                               false,
                                               /*allow_named_pipe_use=*/false,
                                               /*track_file_path=*/false);
          } catch (const std::exception& e) {
            exception_what = e.what();
            exception_occured = true;
          }

          if (is_downloading_finished()) {
            return;
          }
          if (exception_occured) {
            {
              std::unique_lock communication_lock(communication_mutex);
              download_exception_occured = true;
            }
            files_download_condition.notify_all();
            throw std::runtime_error("failed to fetch s3 object: '" + obj_key +
                                     "': " + exception_what);
          }

          object.download_file_path = local_file_path;
          object.is_downloaded =
              true;  // this variable is atomic and therefore acts as a lock; it must
                     // be set last to ensure no data race

          files_download_condition.notify_all();
        }
      };

  std::function<void()> import_local_files = [&]() {
    for (size_t object_index = 0; object_index < object_count;) {
      {
        std::unique_lock communication_lock(communication_mutex);
        files_download_condition.wait(
            communication_lock,
            [&download_exception_occured, object_index, &objects_to_process]() {
              return objects_to_process[object_index].is_downloaded ||
                     download_exception_occured;
            });
        if (download_exception_occured) {  // do not wait for object index if a
                                           // download error has occurred
          return;
        }
      }

      // find the largest range of files to import
      size_t end_object_index = object_count;
      for (size_t i = object_index + 1; i < object_count; ++i) {
        if (!objects_to_process[i].is_downloaded) {
          end_object_index = i;
          break;
        }
      }

      ImportStatus local_import_status;
      std::string local_import_dir;
      try {
        CopyParams local_copy_params;
        std::tie(local_import_dir, local_copy_params) = get_local_copy_source_and_params(
            copy_params_, objects_to_process, object_index, end_object_index);
        local_import_status =
            importGeneral(session_info, local_import_dir, local_copy_params);
        // clean up temporary files
        std::filesystem::remove_all(local_import_dir);
      } catch (const std::exception& except) {
        // replace all occurrences of file names with the object keys for
        // users
        std::string what = except.what();

        for (size_t i = object_index; i < end_object_index; ++i) {
          auto& object = objects_to_process[i];
          what = boost::regex_replace(what,
                                      boost::regex{object.import_file_path},
                                      bucket_name + "/" + object.object_key);
        }
        {
          std::unique_lock communication_lock(communication_mutex);
          continue_downloading = false;
        }
        // clean up temporary files
        std::filesystem::remove_all(local_import_dir);
        throw std::runtime_error(what);
      }
      aggregate_import_status += local_import_status;
      import_export::Importer::set_import_status(copy_from_source_,
                                                 aggregate_import_status);
      if (aggregate_import_status.load_failed) {
        {
          std::unique_lock communication_lock(communication_mutex);
          continue_downloading = false;
        }
        return;
      }

      object_index =
          end_object_index;  // all objects in range [object_index, end_object_index)
                             // are correctly imported at this point in execution;
                             // move on to the next range
    }
  };

  std::vector<size_t> partition_range(object_count);
  std::iota(partition_range.begin(), partition_range.end(), 0);
  auto download_futures = foreign_storage::create_futures_for_workers(
      partition_range, num_download_threads, download_objects);

  auto import_future = std::async(std::launch::async, import_local_files);

  for (auto& future : download_futures) {
    future.wait();
  }
  import_future.get();  // may throw an exception

  // get any remaining exceptions
  for (auto& future : download_futures) {
    future.get();
  }
  return aggregate_import_status;

#else
  throw std::runtime_error("AWS S3 support not available");

  return {};
#endif
}

#ifdef ENABLE_IMPORT_PARQUET
ImportStatus ForeignDataImporter::importParquet(
    const Catalog_Namespace::SessionInfo* session_info) {
  auto& catalog = session_info->getCatalog();

  CHECK(copy_params_.source_type == import_export::SourceType::kParquetFile);

  auto& current_user = session_info->get_currentUser();
  auto server = foreign_storage::ForeignDataWrapperFactory::createForeignServerProxy(
      catalog.getDatabaseId(), current_user.userId, copy_from_source_, copy_params_);

  auto user_mapping =
      foreign_storage::ForeignDataWrapperFactory::createUserMappingProxyIfApplicable(
          catalog.getDatabaseId(),
          current_user.userId,
          copy_from_source_,
          copy_params_,
          server.get());

  auto foreign_table =
      foreign_storage::ForeignDataWrapperFactory::createForeignTableProxy(
          catalog.getDatabaseId(), table_, copy_from_source_, copy_params_, server.get());

  foreign_table->validateOptionValues();

  auto data_wrapper = foreign_storage::ForeignDataWrapperFactory::createForImport(
      foreign_storage::DataWrapperType::PARQUET,
      catalog.getDatabaseId(),
      foreign_table.get(),
      user_mapping.get());

  if (auto parquet_import =
          dynamic_cast<foreign_storage::ParquetImporter*>(data_wrapper.get())) {
    Fragmenter_Namespace::InsertDataLoader insert_data_loader(*connector_);

    // determine the number of threads to use at each level

    int max_threads = 0;
    if (copy_params_.threads == 0) {
      max_threads = static_cast<int>(std::min(
          static_cast<size_t>(std::thread::hardware_concurrency()), g_max_import_threads));
    } else {
      max_threads = static_cast<int>(copy_params_.threads);
    }
    CHECK_GT(max_threads, 0);

    int num_importer_threads =
        std::min<int>(max_threads, parquet_import->getMaxNumUsefulThreads());
    parquet_import->setNumThreads(num_importer_threads);
    int num_outer_thread = 1;
    for (int thread_count = 1; thread_count <= max_threads; ++thread_count) {
      if (thread_count * num_importer_threads <= max_threads) {
        num_outer_thread = thread_count;
      }
    }
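    // Example: with max_threads = 8 and num_importer_threads = 3, the loop
    // above settles on num_outer_thread = 2, so at most 2 * 3 = 6 threads
    // run concurrently across the two levels.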

    std::shared_mutex import_status_mutex;
    ImportStatus import_status;  // manually update

    auto import_failed = [&import_status_mutex, &import_status] {
      std::shared_lock import_status_lock(import_status_mutex);
      return import_status.load_failed;
    };

    std::vector<std::future<void>> futures;

    for (int ithread = 0; ithread < num_outer_thread; ++ithread) {
      futures.emplace_back(std::async(std::launch::async, [&] {
        while (true) {
          auto batch_result = parquet_import->getNextImportBatch();
          if (import_failed()) {
            break;
          }
          auto batch = batch_result->getInsertData();
          if (!batch || import_failed()) {
            break;
          }
          insert_data_loader.insertData(*session_info, *batch);

          auto batch_import_status = batch_result->getImportStatus();
          {
            std::unique_lock import_status_lock(import_status_mutex);
            import_status.rows_completed += batch_import_status.rows_completed;
            import_status.rows_rejected += batch_import_status.rows_rejected;
            if (import_status.rows_rejected > copy_params_.max_reject) {
              import_status.load_failed = true;
              import_status.load_msg =
                  "Load was cancelled due to max reject rows being reached";
              break;
            }
          }
        }
      }));
    }

    for (auto& future : futures) {
      future.wait();
    }

    for (auto& future : futures) {
      future.get();
    }

    if (import_status.load_failed) {
      foreign_table.reset();  // this is to avoid calling the TableDescriptor dtor after
                              // the rollback in the checkpoint below
    }

    finalize(*session_info, import_status, parquet_import->getStringDictionaries());

    return import_status;
  }

  UNREACHABLE();
  return {};
}
#endif

ImportStatus ForeignDataImporter::import(
    const Catalog_Namespace::SessionInfo* session_info) {
  if (shared::is_s3_uri(copy_from_source_)) {
    return importGeneralS3(session_info);
  }
  return importGeneral(session_info);
}

}  // namespace import_export