OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Importer.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * @file Importer.h
19  * @brief Importer class for table import from file
20  *
21  */
22 
23 #ifndef _IMPORTER_H_
24 #define _IMPORTER_H_
25 
26 #include <atomic>
27 #include <boost/filesystem.hpp>
28 #include <boost/noncopyable.hpp>
29 #include <boost/tokenizer.hpp>
30 #include <condition_variable>
31 #include <cstdio>
32 #include <cstdlib>
33 #include <iostream>
34 #include <list>
35 #include <map>
36 #include <memory>
37 #include <mutex>
38 #include <set>
39 #include <string>
40 #include <string_view>
41 #include <utility>
42 
43 #include "AbstractImporter.h"
44 #include "Catalog/Catalog.h"
46 #include "DataMgr/Chunk/Chunk.h"
47 #if defined(ENABLE_IMPORT_PARQUET)
49 #endif
50 #include "Fragmenter/Fragmenter.h"
51 #include "Geospatial/GDAL.h"
53 #include "Logger/Logger.h"
55 #include "Shared/checked_alloc.h"
56 #include "Shared/fixautotools.h"
57 // Some builds of boost::geometry require iostream, but don't explicitly include it.
58 // Placing in own section to ensure it's included after iostream.
59 #include <boost/geometry/index/rtree.hpp>
60 
61 class TDatum;
62 class TColumn;
63 
64 namespace arrow {
65 
66 class Array;
67 
68 } // namespace arrow
69 
70 namespace import_export {
71 
72 class Importer;
73 
/// Pair of indices selecting a slice of an Arrow array, as consumed by
/// TypedImportBuffer::add_arrow_values. Whether the pair is interpreted as
/// (begin, end) or (offset, length) is decided in Importer.cpp — check there.
using ArraySliceRange = std::pair<std::size_t, std::size_t>;
75 
// Bookkeeping for rows rejected during import (presumably the BadRowsTracker
// that add_arrow_values()/del_values() receive — confirm).
// Lock for the non-atomic members below (presumably `rows`; confirm at the
// use sites in Importer.cpp).
77  std::mutex mutex;
// Indices of rejected rows; std::set keeps them sorted and de-duplicated.
78  std::set<int64_t> rows;
// Error counter; atomic, so increments are safe without holding `mutex`.
79  std::atomic<int> nerrors;
// Presumably identify where the tracked rows came from (input file and
// Parquet row group) for error reporting — TODO confirm.
80  std::string file_name;
81  int row_group;
83 };
84 
86  public:
// Returns an ArrayDatum for a NULL array value of SQL type `ti` (per the name;
// defined in Importer.cpp).
87  static ArrayDatum composeNullArray(const SQLTypeInfo& ti);
// Returns a NULL coordinates datum for a point column. `coords_ti` is
// presumably the physical coords column type and `geo_ti` the geo column type —
// confirm in Importer.cpp.
88  static ArrayDatum composeNullPointCoords(const SQLTypeInfo& coords_ti,
89  const SQLTypeInfo& geo_ti);
90 };
91 
92 class TypedImportBuffer : boost::noncopyable {
93  public:
// Element type of the string-array buffer. std::nullopt stands for a NULL
// array (see the `!p` null checks in addDictEncodedStringArray).
94  using OptionalStringVector = std::optional<std::vector<std::string>>;
// Allocates the typed buffer(s) that match the column's SQL type.
// The buffer pointers live in two anonymous unions, so each type sets only the
// member(s) it needs. The destructor, clear() and the getters dispatch on the
// same column type to find the live member.
// `string_dict` is kept as a raw pointer; presumably it is owned elsewhere
// (e.g. the Loader's dict_map_) — confirm.
// Unsupported types, and dictionary widths other than 1/2/4 bytes, fail a
// CHECK.
95  TypedImportBuffer(const ColumnDescriptor* col_desc, StringDictionary* string_dict)
96  : column_desc_(col_desc), string_dict_(string_dict) {
97  switch (col_desc->columnType.get_type()) {
98  case kBOOLEAN:
99  bool_buffer_ = new std::vector<int8_t>();
100  break;
101  case kTINYINT:
102  tinyint_buffer_ = new std::vector<int8_t>();
103  break;
104  case kSMALLINT:
105  smallint_buffer_ = new std::vector<int16_t>();
106  break;
107  case kINT:
108  int_buffer_ = new std::vector<int32_t>();
109  break;
// BIGINT, NUMERIC and DECIMAL all share the 64-bit integer buffer.
110  case kBIGINT:
111  case kNUMERIC:
112  case kDECIMAL:
113  bigint_buffer_ = new std::vector<int64_t>();
114  break;
115  case kFLOAT:
116  float_buffer_ = new std::vector<float>();
117  break;
118  case kDOUBLE:
119  double_buffer_ = new std::vector<double>();
120  break;
121  case kTEXT:
122  case kVARCHAR:
123  case kCHAR:
124  string_buffer_ = new std::vector<std::string>();
// Dictionary-encoded text also gets an id buffer whose element width follows
// the column's encoded size. For non-dictionary text the second union is never
// assigned, so its value stays indeterminate.
125  if (col_desc->columnType.get_compression() == kENCODING_DICT) {
126  switch (col_desc->columnType.get_size()) {
127  case 1:
128  string_dict_i8_buffer_ = new std::vector<uint8_t>();
129  break;
130  case 2:
131  string_dict_i16_buffer_ = new std::vector<uint16_t>();
132  break;
133  case 4:
134  string_dict_i32_buffer_ = new std::vector<int32_t>();
135  break;
136  default:
137  CHECK(false);
138  }
139  }
140  break;
// Date/time values are also held in the 64-bit integer buffer.
141  case kDATE:
142  case kTIME:
143  case kTIMESTAMP:
144  bigint_buffer_ = new std::vector<int64_t>();
145  break;
// String arrays keep both the raw strings and their dictionary-encoded form.
// Other arrays keep ready-made ArrayDatum values.
146  case kARRAY:
147  if (IS_STRING(col_desc->columnType.get_subtype())) {
149  string_array_buffer_ = new std::vector<OptionalStringVector>();
150  string_array_dict_buffer_ = new std::vector<ArrayDatum>();
151  } else {
152  array_buffer_ = new std::vector<ArrayDatum>();
153  }
154  break;
// Every geo type is buffered as strings.
155  case kPOINT:
156  case kMULTIPOINT:
157  case kLINESTRING:
158  case kMULTILINESTRING:
159  case kPOLYGON:
160  case kMULTIPOLYGON:
161  geo_string_buffer_ = new std::vector<std::string>();
162  break;
163  default:
164  CHECK(false);
165  }
166  }
167 
// Destructor body: deletes the buffer(s) the constructor allocated. It
// dispatches on the same column type (and, for dictionary text, the encoded
// width), so only the live union member is deleted.
169  switch (column_desc_->columnType.get_type()) {
170  case kBOOLEAN:
171  delete bool_buffer_;
172  break;
173  case kTINYINT:
174  delete tinyint_buffer_;
175  break;
176  case kSMALLINT:
177  delete smallint_buffer_;
178  break;
179  case kINT:
180  delete int_buffer_;
181  break;
182  case kBIGINT:
183  case kNUMERIC:
184  case kDECIMAL:
185  delete bigint_buffer_;
186  break;
187  case kFLOAT:
188  delete float_buffer_;
189  break;
190  case kDOUBLE:
191  delete double_buffer_;
192  break;
193  case kTEXT:
194  case kVARCHAR:
195  case kCHAR:
196  delete string_buffer_;
// No default case is needed here: widths other than 1/2/4 were already
// rejected by the constructor's CHECK.
198  switch (column_desc_->columnType.get_size()) {
199  case 1:
200  delete string_dict_i8_buffer_;
201  break;
202  case 2:
204  break;
205  case 4:
207  break;
208  }
209  }
210  break;
211  case kDATE:
212  case kTIME:
213  case kTIMESTAMP:
214  delete bigint_buffer_;
215  break;
216  case kARRAY:
218  delete string_array_buffer_;
220  } else {
221  delete array_buffer_;
222  }
223  break;
224  case kPOINT:
225  case kMULTIPOINT:
226  case kLINESTRING:
227  case kMULTILINESTRING:
228  case kPOLYGON:
229  case kMULTIPOLYGON:
230  delete geo_string_buffer_;
231  break;
232  default:
233  CHECK(false);
234  }
235  }
236 
237  void addBoolean(const int8_t v) { bool_buffer_->push_back(v); }
238 
239  void addTinyint(const int8_t v) { tinyint_buffer_->push_back(v); }
240 
241  void addSmallint(const int16_t v) { smallint_buffer_->push_back(v); }
242 
243  void addInt(const int32_t v) { int_buffer_->push_back(v); }
244 
245  void addBigint(const int64_t v) { bigint_buffer_->push_back(v); }
246 
247  void addFloat(const float v) { float_buffer_->push_back(v); }
248 
249  void addDouble(const double v) { double_buffer_->push_back(v); }
250 
251  void addString(const std::string_view v) { string_buffer_->emplace_back(v); }
252 
253  void addDictStringWithTruncation(std::string_view v) {
254  if (v.size() > StringDictionary::MAX_STRLEN) {
255  v = v.substr(0, StringDictionary::MAX_STRLEN);
256  }
257  string_buffer_->emplace_back(v);
258  }
259 
260  void addGeoString(const std::string_view v) { geo_string_buffer_->emplace_back(v); }
261 
262  void addArray(const ArrayDatum& v) { array_buffer_->push_back(v); }
263 
// addStringArray(): appends an empty, engaged (non-NULL) string array and
// returns a reference to it so the caller can fill it in place. The reference
// is invalidated by the next append to string_array_buffer_.
265  string_array_buffer_->emplace_back(std::vector<std::string>{});
266  return string_array_buffer_->back();
267  }
268
// Appends `arr` as-is (copied). std::nullopt encodes a NULL array.
270  string_array_buffer_->push_back(arr);
271  }
272
// Dictionary-encodes each string in `string_vec` (defined in Importer.cpp).
273  void addDictEncodedString(const std::vector<std::string>& string_vec);
274 
276  const std::vector<OptionalStringVector>& string_array_vec) {
// Dictionary-encodes a batch of string arrays into string_array_dict_buffer_:
//  1. Reject any element longer than StringDictionary::MAX_STRLEN. This throws
//     std::runtime_error before anything is added to the dictionary.
//  2. Translate all non-NULL arrays with a single getOrAddBulkArray() call.
//  3. Append one ArrayDatum per input array, preserving input order.
278
279  // first check data is ok
280  for (auto& p : string_array_vec) {
281  if (!p) {
282  continue;
283  }
284  for (const auto& str : *p) {
285  if (str.size() > StringDictionary::MAX_STRLEN) {
286  throw std::runtime_error("String too long for dictionary encoding.");
287  }
288  }
289  }
290
291  // to avoid copying, create a string view of each string in the
292  // `string_array_vec` where the array holding the string is *not null*
293  std::vector<std::vector<std::string_view>> string_view_array_vec;
294  for (auto& p : string_array_vec) {
295  if (!p) {
296  continue;
297  }
298  auto& array = string_view_array_vec.emplace_back();
299  for (const auto& str : *p) {
300  array.emplace_back(str);
301  }
302  }
303
304  std::vector<std::vector<int32_t>> ids_array(0);
305  string_dict_->getOrAddBulkArray(string_view_array_vec, ids_array);
306
// `i` walks the input arrays. `j` walks ids_array, which holds entries only
// for the non-NULL arrays.
307  size_t i, j;
308  for (i = 0, j = 0; i < string_array_vec.size(); ++i) {
309  if (!string_array_vec[i]) { // null array
310  string_array_dict_buffer_->push_back(
312  } else { // non-null array
// Copy the ids into a malloc'd buffer that is handed to the ArrayDatum. The
// trailing `false` is presumably the is_null flag — confirm.
313  auto& p = ids_array[j++];
314  size_t len = p.size() * sizeof(int32_t);
315  auto a = static_cast<int32_t*>(checked_malloc(len));
// NOTE(review): for an empty non-NULL array, `p` is empty and `&p[0]` is
// undefined behaviour (operator[] on an empty vector). `p.data()` is safe.
// Also confirm what checked_malloc(0) returns.
316  memcpy(a, &p[0], len);
317  string_array_dict_buffer_->push_back(
318  ArrayDatum(len, reinterpret_cast<int8_t*>(a), false));
319  }
320  }
321  }
322 
323  const SQLTypeInfo& getTypeInfo() const { return column_desc_->columnType; }
324 
325  const ColumnDescriptor* getColumnDesc() const { return column_desc_; }
326 
328 
329  int8_t* getAsBytes() const {
330  switch (column_desc_->columnType.get_type()) {
331  case kBOOLEAN:
332  return reinterpret_cast<int8_t*>(bool_buffer_->data());
333  case kTINYINT:
334  return reinterpret_cast<int8_t*>(tinyint_buffer_->data());
335  case kSMALLINT:
336  return reinterpret_cast<int8_t*>(smallint_buffer_->data());
337  case kINT:
338  return reinterpret_cast<int8_t*>(int_buffer_->data());
339  case kBIGINT:
340  case kNUMERIC:
341  case kDECIMAL:
342  return reinterpret_cast<int8_t*>(bigint_buffer_->data());
343  case kFLOAT:
344  return reinterpret_cast<int8_t*>(float_buffer_->data());
345  case kDOUBLE:
346  return reinterpret_cast<int8_t*>(double_buffer_->data());
347  case kDATE:
348  case kTIME:
349  case kTIMESTAMP:
350  return reinterpret_cast<int8_t*>(bigint_buffer_->data());
351  default:
352  abort();
353  }
354  }
355 
356  size_t getElementSize() const {
357  switch (column_desc_->columnType.get_type()) {
358  case kBOOLEAN:
359  return sizeof((*bool_buffer_)[0]);
360  case kTINYINT:
361  return sizeof((*tinyint_buffer_)[0]);
362  case kSMALLINT:
363  return sizeof((*smallint_buffer_)[0]);
364  case kINT:
365  return sizeof((*int_buffer_)[0]);
366  case kBIGINT:
367  case kNUMERIC:
368  case kDECIMAL:
369  return sizeof((*bigint_buffer_)[0]);
370  case kFLOAT:
371  return sizeof((*float_buffer_)[0]);
372  case kDOUBLE:
373  return sizeof((*double_buffer_)[0]);
374  case kDATE:
375  case kTIME:
376  case kTIMESTAMP:
377  return sizeof((*bigint_buffer_)[0]);
378  default:
379  abort();
380  }
381  }
382 
383  std::vector<std::string>* getStringBuffer() const { return string_buffer_; }
384 
385  std::vector<std::string>* getGeoStringBuffer() const { return geo_string_buffer_; }
386 
387  std::vector<ArrayDatum>* getArrayBuffer() const { return array_buffer_; }
388 
389  std::vector<OptionalStringVector>* getStringArrayBuffer() const {
390  return string_array_buffer_;
391  }
392 
// Buffer of dictionary-encoded string arrays. The constructor allocates it
// only for array columns with a string subtype.
393  std::vector<ArrayDatum>* getStringArrayDictBuffer() const {
395  }
396 
397  int8_t* getStringDictBuffer() const {
398  switch (column_desc_->columnType.get_size()) {
399  case 1:
400  return reinterpret_cast<int8_t*>(string_dict_i8_buffer_->data());
401  case 2:
402  return reinterpret_cast<int8_t*>(string_dict_i16_buffer_->data());
403  case 4:
404  return reinterpret_cast<int8_t*>(string_dict_i32_buffer_->data());
405  default:
406  abort();
407  }
408  }
409 
// Checkpoints the column's string dictionary and returns the dictionary's
// result. When no dictionary is attached, it trivially reports success.
411  if (string_dict_ == nullptr) {
412  return true;
413  }
414  return string_dict_->checkpoint();
415  }
416 
// Empties the live typed buffer(s) for this column, dispatching on the column
// type the same way the constructor does. std::vector::clear() keeps the
// allocated capacity, so the object can be refilled without starting from an
// empty allocation. Unexpected types or dictionary widths fail a CHECK.
417  void clear() {
418  switch (column_desc_->columnType.get_type()) {
419  case kBOOLEAN: {
420  bool_buffer_->clear();
421  break;
422  }
423  case kTINYINT: {
424  tinyint_buffer_->clear();
425  break;
426  }
427  case kSMALLINT: {
428  smallint_buffer_->clear();
429  break;
430  }
431  case kINT: {
432  int_buffer_->clear();
433  break;
434  }
435  case kBIGINT:
436  case kNUMERIC:
437  case kDECIMAL: {
438  bigint_buffer_->clear();
439  break;
440  }
441  case kFLOAT: {
442  float_buffer_->clear();
443  break;
444  }
445  case kDOUBLE: {
446  double_buffer_->clear();
447  break;
448  }
449  case kTEXT:
450  case kVARCHAR:
451  case kCHAR: {
452  string_buffer_->clear();
// Dictionary-encoded text also clears its id buffer of the matching width.
454  switch (column_desc_->columnType.get_size()) {
455  case 1:
456  string_dict_i8_buffer_->clear();
457  break;
458  case 2:
459  string_dict_i16_buffer_->clear();
460  break;
461  case 4:
462  string_dict_i32_buffer_->clear();
463  break;
464  default:
465  CHECK(false);
466  }
467  }
468  break;
469  }
470  case kDATE:
471  case kTIME:
472  case kTIMESTAMP:
473  bigint_buffer_->clear();
474  break;
// String arrays clear both the raw and the dictionary-encoded buffers.
475  case kARRAY: {
477  string_array_buffer_->clear();
478  string_array_dict_buffer_->clear();
479  } else {
480  array_buffer_->clear();
481  }
482  break;
483  }
484  case kPOINT:
485  case kMULTIPOINT:
486  case kLINESTRING:
487  case kMULTILINESTRING:
488  case kPOLYGON:
489  case kMULTIPOLYGON:
490  geo_string_buffer_->clear();
491  break;
492  default:
493  CHECK(false);
494  }
495  }
496 
// Appends the values of a Thrift TColumn for column `cd`. Returns a size_t
// count (presumably the number of rows appended — see Importer.cpp).
497  size_t add_values(const ColumnDescriptor* cd, const TColumn& data);
498
// Appends the `slice_range` slice of Arrow array `data`. Rows that fail to
// convert are presumably recorded in `bad_rows_tracker` — confirm.
499  size_t add_arrow_values(const ColumnDescriptor* cd,
500  const arrow::Array& data,
501  const bool exact_type_match,
502  const ArraySliceRange& slice_range,
503  BadRowsTracker* bad_rows_tracker);
504
// Parses and appends one delimited-text value according to `copy_params`.
// `is_null` marks a NULL value. `check_not_null` (default true) presumably
// enforces NOT NULL constraints — confirm.
505  void add_value(const ColumnDescriptor* cd,
506  const std::string_view val,
507  const bool is_null,
508  const CopyParams& copy_params,
509  const bool check_not_null = true);
510
// Appends one Thrift TDatum value (or a NULL when `is_null`).
511  void add_value(const ColumnDescriptor* cd, const TDatum& val, const bool is_null);
512
// Appends `num_rows` default values for column `cd` (defined in Importer.cpp).
513  void addDefaultValues(const ColumnDescriptor* cd, size_t num_rows);
514
// Presumably removes the most recently appended value — confirm.
515  void pop_value();
516
// Templated helpers behind add_arrow_values and bad-row removal (defined in
// Importer.cpp).
517  template <typename DATA_TYPE>
519  const arrow::Array& array,
520  std::vector<DATA_TYPE>& buffer,
521  const ArraySliceRange& slice_range,
522  BadRowsTracker* const bad_rows_tracker);
523  template <typename DATA_TYPE>
524  auto del_values(std::vector<DATA_TYPE>& buffer, BadRowsTracker* const bad_rows_tracker);
525  auto del_values(const SQLTypes type, BadRowsTracker* const bad_rows_tracker);
526
// Builds the DataBlockPtr list for a set of import buffers (presumably one
// entry per buffer, for insertion) — see Importer.cpp.
527  static std::vector<DataBlockPtr> get_data_block_pointers(
528  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers);
529 
// NOTE(review): neither public member is initialized by the constructor. The
// owner is presumably expected to set them before use — confirm.
530  std::vector<std::unique_ptr<TypedImportBuffer>>* import_buffers;
531  size_t col_idx;
532
533  private:
// Primary value buffers. Only the member matching column_desc_->columnType is
// live. The constructor allocates it with `new` and the destructor deletes it.
534  union {
535  std::vector<int8_t>* bool_buffer_;
536  std::vector<int8_t>* tinyint_buffer_;
537  std::vector<int16_t>* smallint_buffer_;
538  std::vector<int32_t>* int_buffer_;
539  std::vector<int64_t>* bigint_buffer_;
540  std::vector<float>* float_buffer_;
541  std::vector<double>* double_buffer_;
542  std::vector<std::string>* string_buffer_;
543  std::vector<std::string>* geo_string_buffer_;
544  std::vector<ArrayDatum>* array_buffer_;
545  std::vector<OptionalStringVector>* string_array_buffer_;
546  };
// Secondary buffers. This holds either the dictionary ids of a
// dictionary-encoded text column (element width = the column's encoded size)
// or the dictionary-encoded form of a string-array column. It is never
// assigned for columns that need neither.
547  union {
548  std::vector<uint8_t>* string_dict_i8_buffer_;
549  std::vector<uint16_t>* string_dict_i16_buffer_;
550  std::vector<int32_t>* string_dict_i32_buffer_;
551  std::vector<ArrayDatum>* string_array_dict_buffer_;
552  };
555 };
556 
557 class Loader {
// Signature of the optional load callback. It receives the import buffers,
// a vector of DataBlockPtr and a size_t (presumably the row count), and
// returns bool.
558  using LoadCallbackType =
559  std::function<bool(const std::vector<std::unique_ptr<TypedImportBuffer>>&,
560  std::vector<DataBlockPtr>&,
561  size_t)>;
562
563  public:
564  // ParquetDataWrapper
566  const TableDescriptor* t,
567  LoadCallbackType load_callback = nullptr)
568  : catalog_(c)
569  , table_desc_(t)
570  , column_descs_(c.getAllColumnMetadataForTable(t->tableId, false, false, true))
571  , load_callback_(load_callback) {
// Stores the catalog, the table, the table's column descriptors (queried from
// the catalog) and the optional callback, then runs init() (defined in
// Importer.cpp). `t` must be non-null because t->tableId is read above.
572  init();
573  }
574 
575  virtual ~Loader() {}
576 
578  const TableDescriptor* getTableDesc() const { return table_desc_; }
579  const std::list<const ColumnDescriptor*>& get_column_descs() const {
580  return column_descs_;
581  }
582 
// Returns nullptr for columns that are neither string arrays nor string
// columns satisfying the remaining condition below. Otherwise it returns the
// dictionary registered for cd->columnId. dict_map_.at() throws
// std::out_of_range if no dictionary was registered for that column.
584  if ((cd->columnType.get_type() != kARRAY ||
585  !IS_STRING(cd->columnType.get_subtype())) &&
586  (!cd->columnType.is_string() ||
588  return nullptr;
589  }
590  return dict_map_.at(cd->columnId);
591  }
592 
// Loads `row_count` rows from `import_buffers` into the table. Presumably this
// checkpoints afterwards, unlike loadNoCheckpoint() — see Importer.cpp.
593  virtual bool load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
594  const size_t row_count,
595  const Catalog_Namespace::SessionInfo* session_info);
// Same inputs as load(), but per its name it does not checkpoint.
596  virtual bool loadNoCheckpoint(
597  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
598  const size_t row_count,
599  const Catalog_Namespace::SessionInfo* session_info);
// Checkpoint and table-epoch management for the target table (defined in
// Importer.cpp).
600  virtual void checkpoint();
601  virtual std::vector<Catalog_Namespace::TableEpochInfo> getTableEpochs() const;
602  virtual void setTableEpochs(
603  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs);
604
// Flag marking that this loader fills newly added columns (presumably for
// ALTER TABLE ADD COLUMN — confirm).
605  void setAddingColumns(const bool adding_columns) { adding_columns_ = adding_columns; }
606  bool isAddingColumns() const { return adding_columns_; }
// Drops the given columns, identified by int ids (defined in Importer.cpp).
607  void dropColumns(const std::vector<int>& columns);
608  std::string getErrorMessage() { return error_msg_; };
609 
610  protected:
// Loader setup invoked by the constructor (defined in Importer.cpp).
611  void init();
612
// Shared implementation of load()/loadNoCheckpoint(). `checkpoint` presumably
// selects whether a checkpoint follows the load.
613  virtual bool loadImpl(
614  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
615  size_t row_count,
616  bool checkpoint,
617  const Catalog_Namespace::SessionInfo* session_info);
618
// The set of import buffers for one shard (presumably one per column).
619  using OneShardBuffers = std::vector<std::unique_ptr<TypedImportBuffer>>;
// Splits `row_count` rows of `import_buffers` across `shard_count` shards,
// filling one OneShardBuffers and one row count per shard.
620  void distributeToShards(std::vector<OneShardBuffers>& all_shard_import_buffers,
621  std::vector<size_t>& all_shard_row_counts,
622  const OneShardBuffers& import_buffers,
623  const size_t row_count,
624  const size_t shard_count,
625  const Catalog_Namespace::SessionInfo* session_info);
626
// Column descriptors queried from the catalog in the constructor.
629  std::list<const ColumnDescriptor*> column_descs_;
// String dictionaries keyed by column id (looked up by getStringDict).
632  std::map<int, StringDictionary*> dict_map_;
633 
634  private:
// Loads `row_count` rows of `import_buffers` into the single shard table
// `shard_table`. `checkpoint` is as in loadImpl().
635  bool loadToShard(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
636  size_t row_count,
637  const TableDescriptor* shard_table,
638  bool checkpoint,
639  const Catalog_Namespace::SessionInfo* session_info);
641  std::vector<OneShardBuffers>& all_shard_import_buffers,
642  std::vector<size_t>& all_shard_row_counts,
643  const OneShardBuffers& import_buffers,
644  const size_t row_count,
645  const size_t shard_count,
646  const Catalog_Namespace::SessionInfo* session_info);
648  std::vector<OneShardBuffers>& all_shard_import_buffers,
649  std::vector<size_t>& all_shard_row_counts,
650  const OneShardBuffers& import_buffers,
651  const size_t row_count,
652  const size_t shard_count,
653  const Catalog_Namespace::SessionInfo* session_info);
// Copies row `row_index` of `import_buffers` into `shard_output_buffers`
// (presumably appending it — see Importer.cpp).
654  void fillShardRow(const size_t row_index,
655  OneShardBuffers& shard_output_buffers,
656  const OneShardBuffers& import_buffers);
657
// Set through setAddingColumns().
658  bool adding_columns_ = false;
// Presumably serializes concurrent loads into the table — confirm in
// Importer.cpp.
659  std::mutex loader_mutex_;
// Message returned by getErrorMessage(); empty by default.
660  std::string error_msg_;
661 };
662 
// Progress and outcome of an import. Partial statuses are merged with
// operator+=.
663 struct ImportStatus {
// `start` is stamped on construction. `end` is default-constructed, i.e. the
// steady_clock epoch, until it is assigned.
664  std::chrono::steady_clock::time_point start;
665  std::chrono::steady_clock::time_point end;
// Elapsed time, in milliseconds.
669  std::chrono::duration<size_t, std::milli> elapsed;
670  bool load_failed = false;
671  std::string load_msg;
672  int thread_id; // to recall thread_id after thread exit
674  : start(std::chrono::steady_clock::now())
675  , rows_completed(0)
676  , rows_estimated(0)
677  , rows_rejected(0)
678  , elapsed(0)
679  , thread_id(0) {}
680
// Merging a failed status marks this one failed and adopts its message.
684  if (is.load_failed) {
685  load_failed = true;
686  load_msg = is.load_msg;
687  }
688
689  return *this;
690  }
691 };
692 
// Base for consumers of an input stream: delimited text, plus Parquet when
// ENABLE_IMPORT_PARQUET is defined. Detector and Importer derive from it.
694  public:
// NOTE(review): `file_path` is taken as `const std::string` by value, which
// copies the string on every call. `const std::string&` would avoid the copy.
696  DataStreamSink(const CopyParams& copy_params, const std::string file_path)
697  : copy_params(copy_params), file_path(file_path) {}
698  virtual ~DataStreamSink() {}
700  const std::string& file_path,
701  const bool decompressed,
702  const Catalog_Namespace::SessionInfo* session_info) = 0;
703 #ifdef ENABLE_IMPORT_PARQUET
704  virtual void import_parquet(std::vector<std::string>& file_paths,
705  const Catalog_Namespace::SessionInfo* session_info);
706  virtual void import_local_parquet(
707  const std::string& file_path,
708  const Catalog_Namespace::SessionInfo* session_info) = 0;
709 #endif
710  const CopyParams& get_copy_params() const {
711  return copy_params;
712  }
// Imports a list of (possibly compressed) files; defined in Importer.cpp.
713  void import_compressed(std::vector<std::string>& file_paths,
714  const Catalog_Namespace::SessionInfo* session_info);
715
716  protected:
718
720  const std::string file_path;
// C stdio handle for the input; nullptr until one is opened.
721  FILE* p_file = nullptr;
// Presumably the total input size and per-file byte offsets, used for
// progress tracking. file_offsets_mutex presumably guards file_offsets —
// confirm in Importer.cpp.
724  size_t total_file_size{0};
725  std::vector<size_t> file_offsets;
726  std::mutex file_offsets_mutex;
727 };
728 
// Samples an input file to infer its row delimiter, whether it has a header
// row, and the best SQL type and encoding per column (per the member names;
// implemented in Importer.cpp).
729 class Detector : public DataStreamSink {
730  public:
731  Detector(const boost::filesystem::path& fp, CopyParams& cp);
732
733 #ifdef ENABLE_IMPORT_PARQUET
734  void import_local_parquet(const std::string& file_path,
735  const Catalog_Namespace::SessionInfo* session_info) override;
736 #endif
// Guesses the SQL type of a single string value.
737  static SQLTypes detect_sqltype(const std::string& str);
738  std::vector<std::string> get_headers();
// Sampled rows, already split into fields.
739  std::vector<std::vector<std::string>> raw_rows;
740  std::vector<std::vector<std::string>> get_sample_rows(size_t n);
741  bool has_headers = false;
742
// Per-column type guess as full SQLTypeInfo (defined in Importer.cpp).
743  std::vector<SQLTypeInfo> getBestColumnTypes() const;
744
745  private:
746  void init();
747  void read_file();
748  void detect_row_delimiter();
749  void split_raw_data();
750  std::vector<SQLTypes> detect_column_types(const std::vector<std::string>& row);
// Ordering used to merge per-row type guesses (presumably "a is stricter than
// b" — confirm).
751  static bool more_restrictive_sqltype(const SQLTypes a, const SQLTypes b);
752  void find_best_sqltypes();
753  std::vector<SQLTypes> find_best_sqltypes(
754  const std::vector<std::vector<std::string>>& raw_rows,
755  const CopyParams& copy_params);
756  std::vector<SQLTypes> find_best_sqltypes(
757  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
758  const std::vector<std::vector<std::string>>::const_iterator& row_end,
759  const CopyParams& copy_params);
760
761  std::vector<EncodingType> find_best_encodings(
762  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
763  const std::vector<std::vector<std::string>>::const_iterator& row_end,
764  const std::vector<SQLTypes>& best_types);
765
// Decides whether the first row is a header by comparing its types with the
// types of the remaining rows.
766  bool detect_headers(const std::vector<SQLTypes>& first_types,
767  const std::vector<SQLTypes>& rest_types);
770  const std::string& file_path,
771  const bool decompressed,
772  const Catalog_Namespace::SessionInfo* session_info) override;
773  std::string raw_data;
// NOTE(review): this boost path member hides DataStreamSink's
// `const std::string file_path`. Qualify uses to avoid confusion.
774  boost::filesystem::path file_path;
// One second (duration<double> uses seconds). Presumably bounds how long the
// file is sampled — confirm.
775  std::chrono::duration<double> timeout{1};
776  std::string line1;
777 #if defined(ENABLE_IMPORT_PARQUET)
778  std::optional<foreign_storage::DataPreview> data_preview_;
779 #endif
780  std::vector<SQLTypes> best_sqltypes;
781  std::vector<EncodingType> best_encodings;
782 };
783 
// Imports delimited text, GDAL geo/raster sources, or Parquet (when
// ENABLE_IMPORT_PARQUET is defined) into a table. Rows are written through
// the Loader held in `loader`.
784 class Importer : public DataStreamSink, public AbstractImporter {
785  public:
787  const TableDescriptor* t,
788  const std::string& f,
789  const CopyParams& p);
// Uses `providedLoader` instead of creating one. Presumably it takes
// ownership via `loader` — confirm in Importer.cpp.
790  Importer(Loader* providedLoader, const std::string& f, const CopyParams& p);
791  ~Importer() override;
792  ImportStatus import(const Catalog_Namespace::SessionInfo* session_info) override;
794  const std::string& file_path,
795  const bool decompressed,
796  const Catalog_Namespace::SessionInfo* session_info) override;
797  ImportStatus importGDAL(const std::map<std::string, std::string>& colname_to_src,
798  const Catalog_Namespace::SessionInfo* session_info,
799  const bool is_raster);
800  const CopyParams& get_copy_params() const { return copy_params; }
801  const std::list<const ColumnDescriptor*>& get_column_descs() const {
802  return loader->get_column_descs();
803  }
804  void load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
805  size_t row_count,
806  const Catalog_Namespace::SessionInfo* session_info);
// All import buffer sets (presumably one set per worker thread — confirm).
807  std::vector<std::vector<std::unique_ptr<TypedImportBuffer>>>& get_import_buffers_vec() {
808  return import_buffers_vec;
809  }
// NOTE(review): `i` is an int and is not bounds-checked (operator[]).
810  std::vector<std::unique_ptr<TypedImportBuffer>>& get_import_buffers(int i) {
811  return import_buffers_vec[i];
812  }
// Per-column "is array" flags owned by is_array_a. Null if never allocated.
813  const bool* get_is_array() const { return is_array_a.get(); }
814 #ifdef ENABLE_IMPORT_PARQUET
815  void import_local_parquet(const std::string& file_path,
816  const Catalog_Namespace::SessionInfo* session_info) override;
817 #endif
// Static import-status registry keyed by import id. NOTE(review):
// set_import_status takes `is` by const value (a copy); const& would avoid it.
818  static ImportStatus get_import_status(const std::string& id);
819  static void set_import_status(const std::string& id, const ImportStatus is);
// GDAL helpers for inspecting geo/raster files (defined in Importer.cpp).
820  static const std::list<ColumnDescriptor> gdalToColumnDescriptors(
821  const std::string& fileName,
822  const bool is_raster,
823  const std::string& geoColumnName,
824  const CopyParams& copy_params);
825  static void readMetadataSampleGDAL(
826  const std::string& fileName,
827  const std::string& geoColumnName,
828  std::map<std::string, std::vector<std::string>>& metadata,
829  int rowLimit,
830  const CopyParams& copy_params);
831  static bool gdalFileExists(const std::string& path, const CopyParams& copy_params);
832  static bool gdalFileOrDirectoryExists(const std::string& path,
833  const CopyParams& copy_params);
834  static std::vector<std::string> gdalGetAllFilesInArchive(
835  const std::string& archive_path,
836  const CopyParams& copy_params);
// Name and contents classification of one layer of a geo file.
839  GeoFileLayerInfo(const std::string& name_, GeoFileLayerContents contents_)
840  : name(name_), contents(contents_) {}
841  std::string name;
843  };
844  static std::vector<GeoFileLayerInfo> gdalGetLayersInGeoFile(
845  const std::string& file_name,
846  const CopyParams& copy_params);
848  return loader->getCatalog();
849  }
// Write one geo value's coords/bounds/ring_sizes/poly_rings into the physical
// columns that accompany geo column `cd`. `col_idx` is passed by reference and
// is presumably advanced past those columns — confirm. The columnar variant
// takes one vector per row for each component.
850  static void set_geo_physical_import_buffer(
851  const Catalog_Namespace::Catalog& catalog,
852  const ColumnDescriptor* cd,
853  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
854  size_t& col_idx,
855  std::vector<double>& coords,
856  std::vector<double>& bounds,
857  std::vector<int>& ring_sizes,
858  std::vector<int>& poly_rings,
859  const bool force_null = false);
861  const Catalog_Namespace::Catalog& catalog,
862  const ColumnDescriptor* cd,
863  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
864  size_t& col_idx,
865  std::vector<std::vector<double>>& coords_column,
866  std::vector<std::vector<double>>& bounds_column,
867  std::vector<std::vector<int>>& ring_sizes_column,
868  std::vector<std::vector<int>>& poly_rings_column);
// Checkpoint step given previously captured table epochs (presumably used to
// roll back on failure — confirm in Importer.cpp).
869  void checkpoint(const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs);
// Non-owning pointer to the Loader held in `loader`.
870  auto getLoader() const {
871  return loader.get();
872  }
873
874  private:
875  static bool gdalStatInternal(const std::string& path,
876  const CopyParams& copy_params,
877  bool also_dir);
879  const std::string& fileName,
880  const CopyParams& copy_params);
881
882  ImportStatus importGDALGeo(const std::map<std::string, std::string>& colname_to_src,
883  const Catalog_Namespace::SessionInfo* session_info);
885
886  static const std::list<ColumnDescriptor> gdalToColumnDescriptorsGeo(
887  const std::string& fileName,
888  const std::string& geoColumnName,
889  const CopyParams& copy_params);
890  static const std::list<ColumnDescriptor> gdalToColumnDescriptorsRaster(
891  const std::string& fileName,
892  const std::string& geoColumnName,
893  const CopyParams& copy_params);
894
895  std::string import_id;
// NOTE(review): file_size, max_threads and buffer have no in-class
// initializers. Confirm that every constructor in Importer.cpp sets them.
896  size_t file_size;
897  size_t max_threads;
898  char* buffer[2];
899  std::vector<std::vector<std::unique_ptr<TypedImportBuffer>>> import_buffers_vec;
900  std::unique_ptr<Loader> loader;
901  std::unique_ptr<bool[]> is_array_a;
// Shared by all importers; presumably serializes one-time GDAL setup.
902  static std::mutex init_gdal_mutex;
903 };
904 
// Creates the TypedImportBuffers for table `td` (presumably one per column,
// using `loader` for string dictionaries — confirm in Importer.cpp).
905 std::vector<std::unique_ptr<TypedImportBuffer>> setup_column_loaders(
906  const TableDescriptor* td,
907  Loader* loader);
908
// Builds buffers for table columns missing from `insert_data` (defined in
// Importer.cpp).
909 std::vector<std::unique_ptr<TypedImportBuffer>> fill_missing_columns(
911  Fragmenter_Namespace::InsertData& insert_data);
912
// Factory returning the AbstractImporter implementation suited to
// `copy_from_source` and `copy_params` (defined in Importer.cpp).
913 std::unique_ptr<AbstractImporter> create_importer(
915  const TableDescriptor* td,
916  const std::string& copy_from_source,
917  const import_export::CopyParams& copy_params);
918 
919 } // namespace import_export
920 
921 #endif // _IMPORTER_H_
std::pair< size_t, size_t > ArraySliceRange
Definition: Importer.h:74
Loader(Catalog_Namespace::Catalog &c, const TableDescriptor *t, LoadCallbackType load_callback=nullptr)
Definition: Importer.h:565
const std::list< const ColumnDescriptor * > & get_column_descs() const
Definition: Importer.h:579
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:392
virtual std::vector< Catalog_Namespace::TableEpochInfo > getTableEpochs() const
Definition: Importer.cpp:4572
ImportStatus importDelimited(const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info) override
Definition: Importer.cpp:4365
ImportStatus importGDAL(const std::map< std::string, std::string > &colname_to_src, const Catalog_Namespace::SessionInfo *session_info, const bool is_raster)
Definition: Importer.cpp:5226
std::mutex loader_mutex_
Definition: Importer.h:659
const SQLTypeInfo & getTypeInfo() const
Definition: Importer.h:323
StringDictionary * getStringDictionary() const
Definition: Importer.h:327
ImportStatus importDelimited(const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info) override
Definition: Importer.cpp:3086
HOST DEVICE int get_size() const
Definition: sqltypes.h:403
void addBigint(const int64_t v)
Definition: Importer.h:245
std::string cat(Ts &&...args)
OptionalStringVector & addStringArray()
Definition: Importer.h:264
void addSmallint(const int16_t v)
Definition: Importer.h:241
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:143
Definition: sqltypes.h:76
SQLTypes
Definition: sqltypes.h:65
TypedImportBuffer(const ColumnDescriptor *col_desc, StringDictionary *string_dict)
Definition: Importer.h:95
void import_compressed(std::vector< std::string > &file_paths, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:4086
const TableDescriptor * getTableDesc() const
Definition: Importer.h:578
static void set_geo_physical_import_buffer_columnar(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< std::vector< double >> &coords_column, std::vector< std::vector< double >> &bounds_column, std::vector< std::vector< int >> &ring_sizes_column, std::vector< std::vector< int >> &poly_rings_column)
Definition: Importer.cpp:1731
void dropColumns(const std::vector< int > &columns)
Definition: Importer.cpp:3055
std::vector< std::string > * string_buffer_
Definition: Importer.h:542
void addString(const std::string_view v)
Definition: Importer.h:251
std::vector< ArrayDatum > * array_buffer_
Definition: Importer.h:544
void find_best_sqltypes_and_headers()
Definition: Importer.cpp:3337
std::vector< SQLTypeInfo > getBestColumnTypes() const
Definition: Importer.cpp:3498
StringDictionary * string_dict_
Definition: Importer.h:554
void load(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3514
std::atomic< int > nerrors
Definition: Importer.h:79
std::optional< std::vector< std::string >> OptionalStringVector
Definition: Importer.h:94
static ArrayDatum composeNullArray(const SQLTypeInfo &ti)
Definition: Importer.cpp:395
void addDouble(const double v)
Definition: Importer.h:249
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
Definition: Importer.cpp:6217
Importer(Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
Definition: Importer.cpp:172
std::vector< int16_t > * smallint_buffer_
Definition: Importer.h:537
const bool * get_is_array() const
Definition: Importer.h:813
std::vector< ArrayDatum > * getStringArrayDictBuffer() const
Definition: Importer.h:393
const TableDescriptor * table_desc_
Definition: Importer.h:628
virtual void checkpoint()
Definition: Importer.cpp:4564
std::vector< std::unique_ptr< TypedImportBuffer > > setup_column_loaders(const TableDescriptor *td, Loader *loader)
Definition: Importer.cpp:6202
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > & get_import_buffers_vec()
Definition: Importer.h:807
ImportStatus importGDALGeo(const std::map< std::string, std::string > &colname_to_src, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:5236
std::vector< SQLTypes > best_sqltypes
Definition: Importer.h:780
static std::vector< GeoFileLayerInfo > gdalGetLayersInGeoFile(const std::string &file_name, const CopyParams &copy_params)
Definition: Importer.cpp:5157
std::chrono::duration< size_t, std::milli > elapsed
Definition: Importer.h:669
void distributeToShards(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2872
const CopyParams & get_copy_params() const
Definition: Importer.h:710
std::vector< float > * float_buffer_
Definition: Importer.h:540
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
std::vector< std::unique_ptr< TypedImportBuffer > > & get_import_buffers(int i)
Definition: Importer.h:810
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
Definition: Importer.cpp:5018
GeoFileLayerInfo(const std::string &name_, GeoFileLayerContents contents_)
Definition: Importer.h:839
static bool gdalFileExists(const std::string &path, const CopyParams &copy_params)
Definition: Importer.cpp:5053
std::unique_ptr< AbstractImporter > create_importer(Catalog_Namespace::Catalog &catalog, const TableDescriptor *td, const std::string &copy_from_source, const import_export::CopyParams &copy_params)
Definition: Importer.cpp:6287
std::vector< double > * double_buffer_
Definition: Importer.h:541
void addFloat(const float v)
Definition: Importer.h:247
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:631
std::vector< std::string > * getStringBuffer() const
Definition: Importer.h:383
size_t add_values(const ColumnDescriptor *cd, const TColumn &data)
Definition: Importer.cpp:977
DataStreamSink(const CopyParams &copy_params, const std::string file_path)
Definition: Importer.h:696
ImportStatus & operator+=(const ImportStatus &is)
Definition: Importer.h:681
constexpr double a
Definition: Utm.h:32
void addStringArray(const OptionalStringVector &arr)
Definition: Importer.h:269
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool force_null=false)
Definition: Importer.cpp:1636
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:229
This file contains the class specification and related data structures for Catalog.
void addGeoString(const std::string_view v)
Definition: Importer.h:260
virtual ImportStatus importDelimited(const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info)=0
static SQLTypes detect_sqltype(const std::string &str)
Definition: Importer.cpp:3216
std::vector< EncodingType > find_best_encodings(const std::vector< std::vector< std::string >>::const_iterator &row_begin, const std::vector< std::vector< std::string >>::const_iterator &row_end, const std::vector< SQLTypes > &best_types)
Definition: Importer.cpp:3413
auto del_values(std::vector< DATA_TYPE > &buffer, BadRowsTracker *const bad_rows_tracker)
void setAddingColumns(const bool adding_columns)
Definition: Importer.h:605
std::vector< std::unique_ptr< TypedImportBuffer >> OneShardBuffers
Definition: Importer.h:619
std::vector< int32_t > * int_buffer_
Definition: Importer.h:538
std::vector< ArrayDatum > * string_array_dict_buffer_
Definition: Importer.h:551
static std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
Definition: Importer.cpp:2937
CONSTEXPR DEVICE bool is_null(const T &value)
void addBoolean(const int8_t v)
Definition: Importer.h:237
auto getLoader() const
Definition: Importer.h:870
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
std::vector< uint8_t > * string_dict_i8_buffer_
Definition: Importer.h:548
void addDictEncodedStringArray(const std::vector< OptionalStringVector > &string_array_vec)
Definition: Importer.h:275
void addTinyint(const int8_t v)
Definition: Importer.h:239
std::vector< OptionalStringVector > * string_array_buffer_
Definition: Importer.h:545
static void readMetadataSampleGDAL(const std::string &fileName, const std::string &geoColumnName, std::map< std::string, std::vector< std::string >> &metadata, int rowLimit, const CopyParams &copy_params)
Definition: Importer.cpp:4625
std::vector< int64_t > * bigint_buffer_
Definition: Importer.h:539
int8_t * getAsBytes() const
Definition: Importer.h:329
std::string error_msg_
Definition: Importer.h:660
void addInt(const int32_t v)
Definition: Importer.h:243
void getOrAddBulkArray(const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
Catalog_Namespace::Catalog & getCatalog()
Definition: Importer.h:847
std::vector< std::vector< std::string > > raw_rows
Definition: Importer.h:739
virtual bool load(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2684
specifies the content in-memory of a row in the column metadata table
void fillShardRow(const size_t row_index, OneShardBuffers &shard_output_buffers, const OneShardBuffers &import_buffers)
Definition: Importer.cpp:2739
DEVICE Array()
Definition: heavydbTypes.h:477
bool isAddingColumns() const
Definition: Importer.h:606
std::vector< EncodingType > best_encodings
Definition: Importer.h:781
std::vector< int8_t > * bool_buffer_
Definition: Importer.h:535
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:899
bool g_enable_smem_group_by true
boost::filesystem::path file_path
Definition: Importer.h:774
size_t getElementSize() const
Definition: Importer.h:356
int8_t * getStringDictBuffer() const
Definition: Importer.h:397
static bool gdalFileOrDirectoryExists(const std::string &path, const CopyParams &copy_params)
Definition: Importer.cpp:5058
std::set< int64_t > rows
Definition: Importer.h:78
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:901
std::vector< std::unique_ptr< TypedImportBuffer > > * import_buffers
Definition: Importer.h:530
bool checkpoint() noexcept
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:240
Definition: sqltypes.h:79
Definition: sqltypes.h:80
Detector(const boost::filesystem::path &fp, CopyParams &cp)
Definition: Importer.cpp:3670
static Geospatial::GDAL::DataSourceUqPtr openGDALDataSource(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4583
virtual bool loadNoCheckpoint(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2677
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:399
std::vector< int32_t > * string_dict_i32_buffer_
Definition: Importer.h:550
static bool more_restrictive_sqltype(const SQLTypes a, const SQLTypes b)
Definition: Importer.cpp:3313
std::list< const ColumnDescriptor * > column_descs_
Definition: Importer.h:629
std::unique_ptr< OGRDataSource, DataSourceDeleter > DataSourceUqPtr
Definition: GDAL.h:48
static const std::list< ColumnDescriptor > gdalToColumnDescriptorsGeo(const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
Definition: Importer.cpp:4904
static ArrayDatum composeNullPointCoords(const SQLTypeInfo &coords_ti, const SQLTypeInfo &geo_ti)
Definition: Importer.cpp:399
void addArray(const ArrayDatum &v)
Definition: Importer.h:262
bool loadToShard(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2998
Catalog_Namespace::Catalog & getCatalog() const
Definition: Importer.h:577
ImportStatus archivePlumber(const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3560
std::string getErrorMessage()
Definition: Importer.h:608
torch::Tensor f(torch::Tensor x, torch::Tensor W_target, torch::Tensor b_target)
std::chrono::duration< double > timeout
Definition: Importer.h:775
Definition: sqltypes.h:68
std::vector< std::string > * getGeoStringBuffer() const
Definition: Importer.h:385
std::vector< std::vector< std::string > > get_sample_rows(size_t n)
Definition: Importer.cpp:3464
bool detect_headers(const std::vector< SQLTypes > &first_types, const std::vector< SQLTypes > &rest_types)
Definition: Importer.cpp:3449
#define IS_STRING(T)
Definition: sqltypes.h:309
void addDictStringWithTruncation(std::string_view v)
Definition: Importer.h:253
std::string import_id
Definition: Importer.h:895
const ColumnDescriptor * column_desc_
Definition: Importer.h:553
size_t add_arrow_values(const ColumnDescriptor *cd, const arrow::Array &data, const bool exact_type_match, const ArraySliceRange &slice_range, BadRowsTracker *bad_rows_tracker)
Definition: Importer.cpp:874
void checkpoint(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
Definition: Importer.cpp:3524
std::chrono::steady_clock::time_point start
Definition: Importer.h:664
std::vector< uint16_t > * string_dict_i16_buffer_
Definition: Importer.h:549
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2905
static const std::list< ColumnDescriptor > gdalToColumnDescriptorsRaster(const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
Definition: Importer.cpp:4832
std::vector< SQLTypes > detect_column_types(const std::vector< std::string > &row)
Definition: Importer.cpp:3305
std::vector< std::string > get_headers()
Definition: Importer.cpp:3479
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:627
const CopyParams & get_copy_params() const
Definition: Importer.h:800
bool g_enable_watchdog false
Definition: Execute.cpp:80
std::vector< int8_t > * tinyint_buffer_
Definition: Importer.h:536
ImportStatus importGDALRaster(const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:5524
#define CHECK(condition)
Definition: Logger.h:291
std::string raw_data
Definition: Importer.h:773
static ImportStatus get_import_status(const std::string &id)
Definition: Importer.cpp:231
size_t convert_arrow_val_to_import_buffer(const ColumnDescriptor *cd, const arrow::Array &array, std::vector< DATA_TYPE > &buffer, const ArraySliceRange &slice_range, BadRowsTracker *const bad_rows_tracker)
const ColumnDescriptor * getColumnDesc() const
Definition: Importer.h:325
StringDictionary * getStringDict(const ColumnDescriptor *cd) const
Definition: Importer.h:583
void distributeToShardsExistingColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2813
static const std::list< ColumnDescriptor > gdalToColumnDescriptors(const std::string &fileName, const bool is_raster, const std::string &geoColumnName, const CopyParams &copy_params)
Definition: Importer.cpp:4820
static std::vector< std::string > gdalGetAllFilesInArchive(const std::string &archive_path, const CopyParams &copy_params)
Definition: Importer.cpp:5130
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:68
static constexpr size_t MAX_STRLEN
void addDefaultValues(const ColumnDescriptor *cd, size_t num_rows)
Definition: Importer.cpp:1447
Definition: sqltypes.h:72
SQLTypeInfo columnType
std::vector< OptionalStringVector > * getStringArrayBuffer() const
Definition: Importer.h:389
bool is_string() const
Definition: sqltypes.h:561
LoadCallbackType load_callback_
Definition: Importer.h:630
void distributeToShardsNewColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:2851
constexpr double n
Definition: Utm.h:38
std::shared_timed_mutex shared_mutex
static std::mutex init_gdal_mutex
Definition: Importer.h:902
std::vector< size_t > file_offsets
Definition: Importer.h:725
void add_value(const ColumnDescriptor *cd, const std::string_view val, const bool is_null, const CopyParams &copy_params, const bool check_not_null=true)
Definition: Importer.cpp:529
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:632
std::vector< std::string > * geo_string_buffer_
Definition: Importer.h:543
std::chrono::steady_clock::time_point end
Definition: Importer.h:665
std::vector< ArrayDatum > * getArrayBuffer() const
Definition: Importer.h:387
heavyai::shared_mutex import_mutex_
Definition: Importer.h:723
std::unique_ptr< Loader > loader
Definition: Importer.h:900
std::function< bool(const std::vector< std::unique_ptr< TypedImportBuffer >> &, std::vector< DataBlockPtr > &, size_t)> LoadCallbackType
Definition: Importer.h:561
void addDictEncodedString(const std::vector< std::string > &string_vec)
Definition: Importer.cpp:489
const std::list< const ColumnDescriptor * > & get_column_descs() const
Definition: Importer.h:801
const std::string file_path
Definition: Importer.h:720
virtual void setTableEpochs(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
Definition: Importer.cpp:4577