27 #include <boost/filesystem.hpp>
28 #include <boost/noncopyable.hpp>
29 #include <boost/tokenizer.hpp>
30 #include <condition_variable>
40 #include <string_view>
47 #if defined(ENABLE_IMPORT_PARQUET)
59 #include <boost/geometry/index/rtree.hpp>
70 namespace import_export {
276 const std::vector<OptionalStringVector>& string_array_vec) {
280 for (
auto& p : string_array_vec) {
284 for (
const auto& str : *p) {
286 throw std::runtime_error(
"String too long for dictionary encoding.");
293 std::vector<std::vector<std::string_view>> string_view_array_vec;
294 for (
auto& p : string_array_vec) {
298 auto& array = string_view_array_vec.emplace_back();
299 for (
const auto& str : *p) {
300 array.emplace_back(str);
304 std::vector<std::vector<int32_t>> ids_array(0);
308 for (i = 0, j = 0; i < string_array_vec.size(); ++i) {
309 if (!string_array_vec[i]) {
313 auto& p = ids_array[j++];
314 size_t len = p.size() *
sizeof(int32_t);
316 memcpy(
a, &p[0], len);
318 ArrayDatum(len, reinterpret_cast<int8_t*>(
a),
false));
338 return reinterpret_cast<int8_t*
>(
int_buffer_->data());
359 return sizeof((*bool_buffer_)[0]);
361 return sizeof((*tinyint_buffer_)[0]);
363 return sizeof((*smallint_buffer_)[0]);
365 return sizeof((*int_buffer_)[0]);
369 return sizeof((*bigint_buffer_)[0]);
371 return sizeof((*float_buffer_)[0]);
373 return sizeof((*double_buffer_)[0]);
377 return sizeof((*bigint_buffer_)[0]);
500 const arrow::Array& data,
501 const bool exact_type_match,
506 const std::string_view val,
509 const bool check_not_null =
true);
517 template <
typename DATA_TYPE>
519 const arrow::Array& array,
520 std::vector<DATA_TYPE>& buffer,
523 template <
typename DATA_TYPE>
528 const std::vector<std::unique_ptr<TypedImportBuffer>>&
import_buffers);
559 std::function<bool(const std::vector<std::unique_ptr<TypedImportBuffer>>&,
560 std::vector<DataBlockPtr>&,
593 virtual bool load(
const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
594 const size_t row_count,
597 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
598 const size_t row_count,
601 virtual std::vector<Catalog_Namespace::TableEpochInfo>
getTableEpochs()
const;
603 const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs);
614 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
621 std::vector<size_t>& all_shard_row_counts,
623 const size_t row_count,
624 const size_t shard_count,
635 bool loadToShard(
const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
641 std::vector<OneShardBuffers>& all_shard_import_buffers,
642 std::vector<size_t>& all_shard_row_counts,
644 const size_t row_count,
645 const size_t shard_count,
648 std::vector<OneShardBuffers>& all_shard_import_buffers,
649 std::vector<size_t>& all_shard_row_counts,
651 const size_t row_count,
652 const size_t shard_count,
664 std::chrono::steady_clock::time_point
start;
665 std::chrono::steady_clock::time_point
end;
669 std::chrono::duration<size_t, std::milli>
elapsed;
674 :
start(std::chrono::steady_clock::now())
697 : copy_params(copy_params), file_path(file_path) {}
701 const bool decompressed,
703 #ifdef ENABLE_IMPORT_PARQUET
704 virtual void import_parquet(std::vector<std::string>& file_paths,
706 virtual void import_local_parquet(
733 #ifdef ENABLE_IMPORT_PARQUET
734 void import_local_parquet(
const std::string&
file_path,
754 const std::vector<std::vector<std::string>>&
raw_rows,
757 const std::vector<std::vector<std::string>>::const_iterator& row_begin,
758 const std::vector<std::vector<std::string>>::const_iterator& row_end,
762 const std::vector<std::vector<std::string>>::const_iterator& row_begin,
763 const std::vector<std::vector<std::string>>::const_iterator& row_end,
764 const std::vector<SQLTypes>& best_types);
767 const std::vector<SQLTypes>& rest_types);
770 const std::string& file_path,
771 const bool decompressed,
777 #if defined(ENABLE_IMPORT_PARQUET)
778 std::optional<foreign_storage::DataPreview> data_preview_;
788 const std::string&
f,
795 const bool decompressed,
799 const bool is_raster);
802 return loader->get_column_descs();
804 void load(
const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
814 #ifdef ENABLE_IMPORT_PARQUET
815 void import_local_parquet(
const std::string& file_path,
821 const std::string& fileName,
822 const bool is_raster,
823 const std::string& geoColumnName,
826 const std::string& fileName,
827 const std::string& geoColumnName,
828 std::map<std::string, std::vector<std::string>>& metadata,
835 const std::string& archive_path,
845 const std::string& file_name,
848 return loader->getCatalog();
853 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
855 std::vector<double>& coords,
856 std::vector<double>& bounds,
857 std::vector<int>& ring_sizes,
858 std::vector<int>& poly_rings,
859 const bool force_null =
false);
863 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
865 std::vector<std::vector<double>>& coords_column,
866 std::vector<std::vector<double>>& bounds_column,
867 std::vector<std::vector<int>>& ring_sizes_column,
868 std::vector<std::vector<int>>& poly_rings_column);
869 void checkpoint(
const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs);
879 const std::string& fileName,
887 const std::string& fileName,
888 const std::string& geoColumnName,
891 const std::string& fileName,
892 const std::string& geoColumnName,
916 const std::string& copy_from_source,
921 #endif // _IMPORTER_H_
std::pair< size_t, size_t > ArraySliceRange
Loader(Catalog_Namespace::Catalog &c, const TableDescriptor *t, LoadCallbackType load_callback=nullptr)
const std::list< const ColumnDescriptor * > & get_column_descs() const
HOST DEVICE SQLTypes get_subtype() const
virtual std::vector< Catalog_Namespace::TableEpochInfo > getTableEpochs() const
ImportStatus importDelimited(const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info) override
ImportStatus importGDAL(const std::map< std::string, std::string > &colname_to_src, const Catalog_Namespace::SessionInfo *session_info, const bool is_raster)
const SQLTypeInfo & getTypeInfo() const
StringDictionary * getStringDictionary() const
ImportStatus importDelimited(const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info) override
HOST DEVICE int get_size() const
void addBigint(const int64_t v)
OptionalStringVector & addStringArray()
void addSmallint(const int16_t v)
class for a per-database catalog. also includes metadata for the current database and the current use...
TypedImportBuffer(const ColumnDescriptor *col_desc, StringDictionary *string_dict)
void import_compressed(std::vector< std::string > &file_paths, const Catalog_Namespace::SessionInfo *session_info)
void detect_row_delimiter()
virtual ~DataStreamSink()
const TableDescriptor * getTableDesc() const
static void set_geo_physical_import_buffer_columnar(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< std::vector< double >> &coords_column, std::vector< std::vector< double >> &bounds_column, std::vector< std::vector< int >> &ring_sizes_column, std::vector< std::vector< int >> &poly_rings_column)
void dropColumns(const std::vector< int > &columns)
std::vector< std::string > * string_buffer_
void addString(const std::string_view v)
std::vector< ArrayDatum > * array_buffer_
void find_best_sqltypes_and_headers()
std::vector< SQLTypeInfo > getBestColumnTypes() const
StringDictionary * string_dict_
void load(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
std::atomic< int > nerrors
std::optional< std::vector< std::string >> OptionalStringVector
static ArrayDatum composeNullArray(const SQLTypeInfo &ti)
void addDouble(const double v)
std::vector< std::unique_ptr< TypedImportBuffer > > fill_missing_columns(const Catalog_Namespace::Catalog *cat, Fragmenter_Namespace::InsertData &insert_data)
Importer(Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
std::vector< int16_t > * smallint_buffer_
const bool * get_is_array() const
std::vector< ArrayDatum > * getStringArrayDictBuffer() const
const TableDescriptor * table_desc_
virtual void checkpoint()
std::vector< std::unique_ptr< TypedImportBuffer > > setup_column_loaders(const TableDescriptor *td, Loader *loader)
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > & get_import_buffers_vec()
ImportStatus importGDALGeo(const std::map< std::string, std::string > &colname_to_src, const Catalog_Namespace::SessionInfo *session_info)
std::vector< SQLTypes > best_sqltypes
static std::vector< GeoFileLayerInfo > gdalGetLayersInGeoFile(const std::string &file_name, const CopyParams ©_params)
std::chrono::duration< size_t, std::milli > elapsed
void distributeToShards(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
const CopyParams & get_copy_params() const
std::vector< float > * float_buffer_
HOST DEVICE SQLTypes get_type() const
std::vector< std::unique_ptr< TypedImportBuffer > > & get_import_buffers(int i)
static bool gdalStatInternal(const std::string &path, const CopyParams ©_params, bool also_dir)
ImportStatus import_status_
GeoFileLayerInfo(const std::string &name_, GeoFileLayerContents contents_)
static bool gdalFileExists(const std::string &path, const CopyParams ©_params)
std::unique_ptr< AbstractImporter > create_importer(Catalog_Namespace::Catalog &catalog, const TableDescriptor *td, const std::string ©_from_source, const import_export::CopyParams ©_params)
std::vector< double > * double_buffer_
void addFloat(const float v)
Fragmenter_Namespace::InsertData insert_data_
std::vector< std::string > * getStringBuffer() const
size_t add_values(const ColumnDescriptor *cd, const TColumn &data)
DataStreamSink(const CopyParams ©_params, const std::string file_path)
ImportStatus & operator+=(const ImportStatus &is)
void addStringArray(const OptionalStringVector &arr)
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool force_null=false)
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
GeoFileLayerContents contents
This file contains the class specification and related data structures for Catalog.
void addGeoString(const std::string_view v)
virtual ImportStatus importDelimited(const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info)=0
static SQLTypes detect_sqltype(const std::string &str)
std::vector< EncodingType > find_best_encodings(const std::vector< std::vector< std::string >>::const_iterator &row_begin, const std::vector< std::vector< std::string >>::const_iterator &row_end, const std::vector< SQLTypes > &best_types)
auto del_values(std::vector< DATA_TYPE > &buffer, BadRowsTracker *const bad_rows_tracker)
void setAddingColumns(const bool adding_columns)
std::vector< std::unique_ptr< TypedImportBuffer >> OneShardBuffers
std::vector< int32_t > * int_buffer_
std::vector< ArrayDatum > * string_array_dict_buffer_
static std::vector< DataBlockPtr > get_data_block_pointers(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers)
CONSTEXPR DEVICE bool is_null(const T &value)
void addBoolean(const int8_t v)
void * checked_malloc(const size_t size)
std::vector< uint8_t > * string_dict_i8_buffer_
void addDictEncodedStringArray(const std::vector< OptionalStringVector > &string_array_vec)
void addTinyint(const int8_t v)
std::vector< OptionalStringVector > * string_array_buffer_
static void readMetadataSampleGDAL(const std::string &fileName, const std::string &geoColumnName, std::map< std::string, std::vector< std::string >> &metadata, int rowLimit, const CopyParams ©_params)
std::vector< int64_t > * bigint_buffer_
int8_t * getAsBytes() const
bool stringDictCheckpoint()
void addInt(const int32_t v)
void getOrAddBulkArray(const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
Catalog_Namespace::Catalog & getCatalog()
void find_best_sqltypes()
std::vector< std::vector< std::string > > raw_rows
virtual bool load(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
specifies the content in-memory of a row in the column metadata table
void fillShardRow(const size_t row_index, OneShardBuffers &shard_output_buffers, const OneShardBuffers &import_buffers)
bool isAddingColumns() const
std::vector< EncodingType > best_encodings
std::vector< int8_t > * bool_buffer_
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
std::mutex file_offsets_mutex
bool g_enable_smem_group_by true
boost::filesystem::path file_path
size_t getElementSize() const
int8_t * getStringDictBuffer() const
static bool gdalFileOrDirectoryExists(const std::string &path, const CopyParams ©_params)
std::unique_ptr< bool[]> is_array_a
std::vector< std::unique_ptr< TypedImportBuffer > > * import_buffers
bool checkpoint() noexcept
static void set_import_status(const std::string &id, const ImportStatus is)
Detector(const boost::filesystem::path &fp, CopyParams &cp)
static Geospatial::GDAL::DataSourceUqPtr openGDALDataSource(const std::string &fileName, const CopyParams ©_params)
virtual bool loadNoCheckpoint(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
HOST DEVICE EncodingType get_compression() const
std::vector< int32_t > * string_dict_i32_buffer_
static bool more_restrictive_sqltype(const SQLTypes a, const SQLTypes b)
std::list< const ColumnDescriptor * > column_descs_
std::unique_ptr< OGRDataSource, DataSourceDeleter > DataSourceUqPtr
static const std::list< ColumnDescriptor > gdalToColumnDescriptorsGeo(const std::string &fileName, const std::string &geoColumnName, const CopyParams ©_params)
static ArrayDatum composeNullPointCoords(const SQLTypeInfo &coords_ti, const SQLTypeInfo &geo_ti)
void addArray(const ArrayDatum &v)
bool loadToShard(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const TableDescriptor *shard_table, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
Catalog_Namespace::Catalog & getCatalog() const
ImportStatus archivePlumber(const Catalog_Namespace::SessionInfo *session_info)
std::string getErrorMessage()
torch::Tensor f(torch::Tensor x, torch::Tensor W_target, torch::Tensor b_target)
std::chrono::duration< double > timeout
std::vector< std::string > * getGeoStringBuffer() const
std::vector< std::vector< std::string > > get_sample_rows(size_t n)
bool detect_headers(const std::vector< SQLTypes > &first_types, const std::vector< SQLTypes > &rest_types)
void addDictStringWithTruncation(std::string_view v)
const ColumnDescriptor * column_desc_
size_t add_arrow_values(const ColumnDescriptor *cd, const arrow::Array &data, const bool exact_type_match, const ArraySliceRange &slice_range, BadRowsTracker *bad_rows_tracker)
void checkpoint(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
std::chrono::steady_clock::time_point start
std::vector< uint16_t > * string_dict_i16_buffer_
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint, const Catalog_Namespace::SessionInfo *session_info)
static const std::list< ColumnDescriptor > gdalToColumnDescriptorsRaster(const std::string &fileName, const std::string &geoColumnName, const CopyParams ©_params)
std::vector< SQLTypes > detect_column_types(const std::vector< std::string > &row)
std::vector< std::string > get_headers()
Catalog_Namespace::Catalog & catalog_
const CopyParams & get_copy_params() const
bool g_enable_watchdog false
std::vector< int8_t > * tinyint_buffer_
ImportStatus importGDALRaster(const Catalog_Namespace::SessionInfo *session_info)
static ImportStatus get_import_status(const std::string &id)
size_t convert_arrow_val_to_import_buffer(const ColumnDescriptor *cd, const arrow::Array &array, std::vector< DATA_TYPE > &buffer, const ArraySliceRange &slice_range, BadRowsTracker *const bad_rows_tracker)
const ColumnDescriptor * getColumnDesc() const
StringDictionary * getStringDict(const ColumnDescriptor *cd) const
void distributeToShardsExistingColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
static const std::list< ColumnDescriptor > gdalToColumnDescriptors(const std::string &fileName, const bool is_raster, const std::string &geoColumnName, const CopyParams ©_params)
static std::vector< std::string > gdalGetAllFilesInArchive(const std::string &archive_path, const CopyParams ©_params)
The data to be inserted using the fragment manager.
static constexpr size_t MAX_STRLEN
void addDefaultValues(const ColumnDescriptor *cd, size_t num_rows)
std::vector< OptionalStringVector > * getStringArrayBuffer() const
LoadCallbackType load_callback_
void distributeToShardsNewColumns(std::vector< OneShardBuffers > &all_shard_import_buffers, std::vector< size_t > &all_shard_row_counts, const OneShardBuffers &import_buffers, const size_t row_count, const size_t shard_count, const Catalog_Namespace::SessionInfo *session_info)
std::shared_timed_mutex shared_mutex
static std::mutex init_gdal_mutex
std::vector< size_t > file_offsets
void add_value(const ColumnDescriptor *cd, const std::string_view val, const bool is_null, const CopyParams ©_params, const bool check_not_null=true)
std::map< int, StringDictionary * > dict_map_
std::vector< std::string > * geo_string_buffer_
std::chrono::steady_clock::time_point end
std::vector< ArrayDatum > * getArrayBuffer() const
heavyai::shared_mutex import_mutex_
std::unique_ptr< Loader > loader
std::function< bool(const std::vector< std::unique_ptr< TypedImportBuffer >> &, std::vector< DataBlockPtr > &, size_t)> LoadCallbackType
void addDictEncodedString(const std::vector< std::string > &string_vec)
const std::list< const ColumnDescriptor * > & get_column_descs() const
const std::string file_path
virtual void setTableEpochs(const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)