OmniSciDB  a5dc49c757
import_export::Importer Class Reference

#include <Importer.h>


Classes

struct  GeoFileLayerInfo
 

Public Types

enum  GeoFileLayerContents { GeoFileLayerContents::EMPTY, GeoFileLayerContents::GEO, GeoFileLayerContents::NON_GEO, GeoFileLayerContents::UNSUPPORTED_GEO }
 

Public Member Functions

 Importer (Catalog_Namespace::Catalog &c, const TableDescriptor *t, const std::string &f, const CopyParams &p)
 
 Importer (Loader *providedLoader, const std::string &f, const CopyParams &p)
 
 ~Importer () override
 
ImportStatus import (const Catalog_Namespace::SessionInfo *session_info) override
 
ImportStatus importDelimited (const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info) override
 
ImportStatus importGDAL (const std::map< std::string, std::string > &colname_to_src, const Catalog_Namespace::SessionInfo *session_info, const bool is_raster)
 
const CopyParams & get_copy_params () const
 
const std::list< const ColumnDescriptor * > & get_column_descs () const
 
void load (const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, const Catalog_Namespace::SessionInfo *session_info)
 
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > & get_import_buffers_vec ()
 
std::vector< std::unique_ptr< TypedImportBuffer > > & get_import_buffers (int i)
 
const bool * get_is_array () const
 
Catalog_Namespace::Catalog & getCatalog ()
 
void checkpoint (const std::vector< Catalog_Namespace::TableEpochInfo > &table_epochs)
 
auto getLoader () const
 
- Public Member Functions inherited from import_export::DataStreamSink
 DataStreamSink ()
 
 DataStreamSink (const CopyParams &copy_params, const std::string file_path)
 
virtual ~DataStreamSink ()
 
const CopyParams & get_copy_params () const
 
void import_compressed (std::vector< std::string > &file_paths, const Catalog_Namespace::SessionInfo *session_info)
 
- Public Member Functions inherited from import_export::AbstractImporter
virtual ~AbstractImporter ()=default
 

Static Public Member Functions

static ImportStatus get_import_status (const std::string &id)
 
static void set_import_status (const std::string &id, const ImportStatus is)
 
static const std::list< ColumnDescriptor > gdalToColumnDescriptors (const std::string &fileName, const bool is_raster, const std::string &geoColumnName, const CopyParams &copy_params)
 
static void readMetadataSampleGDAL (const std::string &fileName, const std::string &geoColumnName, std::map< std::string, std::vector< std::string >> &metadata, int rowLimit, const CopyParams &copy_params)
 
static bool gdalFileExists (const std::string &path, const CopyParams &copy_params)
 
static bool gdalFileOrDirectoryExists (const std::string &path, const CopyParams &copy_params)
 
static std::vector< std::string > gdalGetAllFilesInArchive (const std::string &archive_path, const CopyParams &copy_params)
 
static std::vector< GeoFileLayerInfo > gdalGetLayersInGeoFile (const std::string &file_name, const CopyParams &copy_params)
 
static void set_geo_physical_import_buffer (const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool force_null=false)
 
static void set_geo_physical_import_buffer_columnar (const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< std::vector< double >> &coords_column, std::vector< std::vector< double >> &bounds_column, std::vector< std::vector< int >> &ring_sizes_column, std::vector< std::vector< int >> &poly_rings_column)
 

Private Member Functions

ImportStatus importGDALGeo (const std::map< std::string, std::string > &colname_to_src, const Catalog_Namespace::SessionInfo *session_info)
 
ImportStatus importGDALRaster (const Catalog_Namespace::SessionInfo *session_info)
 

Static Private Member Functions

static bool gdalStatInternal (const std::string &path, const CopyParams &copy_params, bool also_dir)
 
static Geospatial::GDAL::DataSourceUqPtr openGDALDataSource (const std::string &fileName, const CopyParams &copy_params)
 
static const std::list< ColumnDescriptor > gdalToColumnDescriptorsGeo (const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
 
static const std::list< ColumnDescriptor > gdalToColumnDescriptorsRaster (const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
 

Private Attributes

std::string import_id
 
size_t file_size
 
size_t max_threads
 
char * buffer [2]
 
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
 
std::unique_ptr< Loader > loader
 
std::unique_ptr< bool[]> is_array_a
 

Static Private Attributes

static std::mutex init_gdal_mutex
 

Additional Inherited Members

- Protected Member Functions inherited from import_export::DataStreamSink
ImportStatus archivePlumber (const Catalog_Namespace::SessionInfo *session_info)
 
- Protected Attributes inherited from import_export::DataStreamSink
CopyParams copy_params
 
const std::string file_path
 
FILE * p_file = nullptr
 
ImportStatus import_status_
 
heavyai::shared_mutex import_mutex_
 
size_t total_file_size {0}
 
std::vector< size_t > file_offsets
 
std::mutex file_offsets_mutex
 

Detailed Description

Definition at line 784 of file Importer.h.

Member Enumeration Documentation

enum import_export::Importer::GeoFileLayerContents

Enumerator
EMPTY 
GEO 
NON_GEO 
UNSUPPORTED_GEO 

Definition at line 837 of file Importer.h.

837 { EMPTY, GEO, NON_GEO, UNSUPPORTED_GEO };

Constructor & Destructor Documentation

import_export::Importer::Importer ( Catalog_Namespace::Catalog &  c,
const TableDescriptor *  t,
const std::string &  f,
const CopyParams &  p 
)

Definition at line 172 of file Importer.cpp.

176  : Importer(new Loader(c, t), f, p) {}
import_export::Importer::Importer ( Loader *  providedLoader,
const std::string &  f,
const CopyParams &  p 
)

Definition at line 178 of file Importer.cpp.

References buffer, import_export::DataStreamSink::file_path, file_size, import_id, is_array_a, kARRAY, loader, max_threads, and import_export::DataStreamSink::p_file.

179  : DataStreamSink(p, f), loader(providedLoader) {
180  import_id = boost::filesystem::path(file_path).filename().string();
181  file_size = 0;
182  max_threads = 0;
183  p_file = nullptr;
184  buffer[0] = nullptr;
185  buffer[1] = nullptr;
186  // we may be overallocating a little more memory here due to dropping phy cols.
187  // it shouldn't be an issue because iteration of it is not supposed to go OOB.
188  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
189  int i = 0;
190  bool has_array = false;
191  // TODO: replace this ugly way of skipping phy cols once if isPhyGeo is defined
192  int skip_physical_cols = 0;
193  for (auto& p : loader->get_column_descs()) {
194  // phy geo columns can't be in input file
195  if (skip_physical_cols-- > 0) {
196  continue;
197  }
198  // neither are rowid or $deleted$
199  // note: columns can be added after rowid/$deleted$
200  if (p->isVirtualCol || p->isDeletedCol) {
201  continue;
202  }
203  skip_physical_cols = p->columnType.get_physical_cols();
204  if (p->columnType.get_type() == kARRAY) {
205  is_array.get()[i] = true;
206  has_array = true;
207  } else {
208  is_array.get()[i] = false;
209  }
210  ++i;
211  }
212  if (has_array) {
213  is_array_a = std::unique_ptr<bool[]>(is_array.release());
214  } else {
215  is_array_a = std::unique_ptr<bool[]>(nullptr);
216  }
217 }
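
A minimal end-to-end usage sketch: assuming a valid Catalog reference cat, a TableDescriptor pointer td, and an optional SessionInfo pointer session are already available, a delimited file can be imported through the public interface. The helper function name, file path, and CopyParams settings are illustrative only.

#include <Importer.h>

void run_csv_import(Catalog_Namespace::Catalog& cat,
                    const TableDescriptor* td,
                    const Catalog_Namespace::SessionInfo* session) {
  import_export::CopyParams copy_params;  // defaults; adjust delimiter, quoting, etc. as needed
  import_export::Importer importer(cat, td, "/path/to/data.csv", copy_params);
  // import() delegates to DataStreamSink::archivePlumber(), which reaches
  // importDelimited() for (possibly compressed) delimited input
  const import_export::ImportStatus status = importer.import(session);
  LOG(INFO) << "rows completed: " << status.rows_completed
            << ", rows rejected: " << status.rows_rejected;
}

The import id under which progress can later be queried (see get_import_status()) is the file name component of the path, as set by the delegating constructor above.
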
import_export::Importer::~Importer ( )
override

Definition at line 219 of file Importer.cpp.

References buffer, and import_export::DataStreamSink::p_file.

219  {
220  if (p_file != nullptr) {
221  fclose(p_file);
222  }
223  if (buffer[0] != nullptr) {
224  free(buffer[0]);
225  }
226  if (buffer[1] != nullptr) {
227  free(buffer[1]);
228  }
229 }

Member Function Documentation

void import_export::Importer::checkpoint ( const std::vector< Catalog_Namespace::TableEpochInfo > &  table_epochs)

Definition at line 3524 of file Importer.cpp.

References DEBUG_TIMING, Data_Namespace::DISK_LEVEL, logger::ERROR, measure< TimeT >::execution(), StorageType::FOREIGN_TABLE, import_buffers_vec, import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, logger::INFO, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, and LOG.

Referenced by importDelimited(), importGDALGeo(), and importGDALRaster().

3525  {
3526  if (loader->getTableDesc()->storageType != StorageType::FOREIGN_TABLE) {
3529  // rollback to starting epoch - undo all the added records
3530  loader->setTableEpochs(table_epochs);
3531  } else {
3532  loader->checkpoint();
3533  }
3534  }
3535 
3536  if (loader->getTableDesc()->persistenceLevel ==
3537  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
3538  // tables
3539  auto ms = measure<>::execution([&]() {
3541  if (!import_status_.load_failed) {
3542  for (auto& p : import_buffers_vec[0]) {
3543  if (!p->stringDictCheckpoint()) {
3544  LOG(ERROR) << "Checkpointing Dictionary for Column "
3545  << p->getColumnDesc()->columnName << " failed.";
3546  import_status_.load_failed = true;
3547  import_status_.load_msg = "Dictionary checkpoint failed";
3548  break;
3549  }
3550  }
3551  }
3552  });
3553  if (DEBUG_TIMING) {
3554  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3555  << std::endl;
3556  }
3557  }
3558 }
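
Condensed from its call sites (importDelimited(), importGDALGeo() and importGDALRaster()), the intended pattern is to capture the table epochs before any rows are loaded and hand them back to checkpoint() once loading finishes, so a failed load can be rolled back. A sketch of that flow inside the importer:

// before loading: remember the starting epochs of the target table
auto table_epochs = loader->getTableEpochs();

// ... run the per-thread import workers, which may set import_status_.load_failed ...

// after loading: rolls back to the captured epochs on failure, otherwise checkpoints
// the loader and, for disk-resident tables, the column string dictionaries
checkpoint(table_epochs);
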

bool import_export::Importer::gdalFileExists ( const std::string &  path,
const CopyParams &  copy_params 
)
static

Definition at line 5053 of file Importer.cpp.

References gdalStatInternal().

Referenced by DBHandler::check_geospatial_files(), DBHandler::detect_column_types(), DBHandler::get_all_files_in_archive(), DBHandler::get_first_geo_file_in_archive(), DBHandler::get_layers_in_geo_file(), and DBHandler::importGeoTableSingle().

5053  {
5054  return gdalStatInternal(path, copy_params, false);
5055 }
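
Because the check is static, it can be run before constructing an Importer. A small sketch; the path is illustrative, and the S3-related CopyParams fields only matter for remote (GDAL VSI) paths:

import_export::CopyParams copy_params;        // set s3_region / s3_access_key / ... only if needed
const std::string path = "/path/to/geo.shp";  // local or VSI path (illustrative)
if (import_export::Importer::gdalFileExists(path, copy_params)) {
  // safe to run column detection or start the geo import
}
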

bool import_export::Importer::gdalFileOrDirectoryExists ( const std::string &  path,
const CopyParams &  copy_params 
)
static

Definition at line 5058 of file Importer.cpp.

References gdalStatInternal().

Referenced by DBHandler::detect_column_types(), DBHandler::get_layers_in_geo_file(), and DBHandler::importGeoTableSingle().

5059  {
5060  return gdalStatInternal(path, copy_params, true);
5061 }

std::vector< std::string > import_export::Importer::gdalGetAllFilesInArchive ( const std::string &  archive_path,
const CopyParams &  copy_params 
)
static

Definition at line 5130 of file Importer.cpp.

References import_export::gdalGatherFilesInArchiveRecursive(), Geospatial::GDAL::init(), import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, and Geospatial::GDAL::setAuthorizationTokens().

Referenced by anonymous_namespace{DBHandler.cpp}::find_first_geo_file_in_archive(), and DBHandler::get_all_files_in_archive().

5132  {
5133  // lazy init GDAL
5140 
5141  // prepare to gather files
5142  std::vector<std::string> files;
5143 
5144  // gather the files recursively
5145  gdalGatherFilesInArchiveRecursive(archive_path, files);
5146 
5147  // convert to relative paths inside archive
5148  for (auto& file : files) {
5149  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
5150  }
5151 
5152  // done
5153  return files;
5154 }
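
A sketch of listing the members of an archive; because the loop above strips the archive prefix, the returned names are relative to the archive root. The archive path is illustrative:

import_export::CopyParams copy_params;
const std::string archive_path = "/path/to/shapefiles.zip";  // illustrative
const auto files =
    import_export::Importer::gdalGetAllFilesInArchive(archive_path, copy_params);
for (const auto& relative_name : files) {
  LOG(INFO) << "archive member: " << relative_name;
}
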

std::vector< Importer::GeoFileLayerInfo > import_export::Importer::gdalGetLayersInGeoFile ( const std::string &  file_name,
const CopyParams &  copy_params 
)
static

Definition at line 5157 of file Importer.cpp.

References CHECK, EMPTY, GEO, Geospatial::GDAL::init(), NON_GEO, openGDALDataSource(), import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, Geospatial::GDAL::setAuthorizationTokens(), and UNSUPPORTED_GEO.

Referenced by DBHandler::get_layers_in_geo_file(), and DBHandler::importGeoTableSingle().

5159  {
5160  // lazy init GDAL
5167 
5168  // prepare to gather layer info
5169  std::vector<GeoFileLayerInfo> layer_info;
5170 
5171  // open the data set
5173  if (poDS == nullptr) {
5174  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5175  file_name);
5176  }
5177 
5178  // enumerate the layers
5179  for (auto&& poLayer : poDS->GetLayers()) {
5181  // prepare to read this layer
5182  poLayer->ResetReading();
5183  // skip layer if empty
5184  if (poLayer->GetFeatureCount() > 0) {
5185  // first read layer geo type
5186  auto ogr_type = wkbFlatten(poLayer->GetGeomType());
5187  if (ogr_type == wkbUnknown) {
5188  // layer geo type unknown, so try reading from the first feature
5189  Geospatial::GDAL::FeatureUqPtr first_feature(poLayer->GetNextFeature());
5190  CHECK(first_feature);
5191  auto const* ogr_geometry = first_feature->GetGeometryRef();
5192  if (ogr_geometry) {
5193  ogr_type = wkbFlatten(ogr_geometry->getGeometryType());
5194  } else {
5195  ogr_type = wkbNone;
5196  }
5197  }
5198  switch (ogr_type) {
5199  case wkbNone:
5200  // no geo
5201  contents = GeoFileLayerContents::NON_GEO;
5202  break;
5203  case wkbPoint:
5204  case wkbMultiPoint:
5205  case wkbLineString:
5206  case wkbMultiLineString:
5207  case wkbPolygon:
5208  case wkbMultiPolygon:
5209  // layer has supported geo
5210  contents = GeoFileLayerContents::GEO;
5211  break;
5212  default:
5213  // layer has unsupported geometry
5215  break;
5216  }
5217  }
5218  // store info for this layer
5219  layer_info.emplace_back(poLayer->GetName(), contents);
5220  }
5221 
5222  // done
5223  return layer_info;
5224 }
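
A sketch of enumerating layers and keeping only the importable ones. The file name is illustrative, and the GeoFileLayerInfo field names (name, contents) are assumed from the emplace_back() call above:

import_export::CopyParams copy_params;
const auto layers =
    import_export::Importer::gdalGetLayersInGeoFile("/path/to/layers.gdb", copy_params);
for (const auto& layer : layers) {
  // layer.contents is one of EMPTY, GEO, NON_GEO, UNSUPPORTED_GEO
  if (layer.contents == import_export::Importer::GeoFileLayerContents::GEO) {
    LOG(INFO) << "importable layer: " << layer.name;
  }
}
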

bool import_export::Importer::gdalStatInternal ( const std::string &  path,
const CopyParams &  copy_params,
bool  also_dir 
)
static private

Definition at line 5018 of file Importer.cpp.

References Geospatial::GDAL::init(), run_benchmark_import::result, import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, and Geospatial::GDAL::setAuthorizationTokens().

Referenced by gdalFileExists(), and gdalFileOrDirectoryExists().

5020  {
5021  // lazy init GDAL
5028 
5029 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
5030  // clear GDAL stat cache
5031  // without this, file existence will be cached, even if authentication changes
5032  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
5033  VSICurlClearCache();
5034 #endif
5035 
5036  // stat path
5037  VSIStatBufL sb;
5038  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
5039  if (result < 0) {
5040  return false;
5041  }
5042 
5043  // exists?
5044  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
5045  return true;
5046  } else if (VSI_ISREG(sb.st_mode)) {
5047  return true;
5048  }
5049  return false;
5050 }

const std::list< ColumnDescriptor > import_export::Importer::gdalToColumnDescriptors ( const std::string &  fileName,
const bool  is_raster,
const std::string &  geoColumnName,
const CopyParams copy_params 
)
static

Definition at line 4820 of file Importer.cpp.

References gdalToColumnDescriptorsGeo(), and gdalToColumnDescriptorsRaster().

Referenced by DBHandler::detect_column_types().

4824  {
4825  if (is_raster) {
4826  return gdalToColumnDescriptorsRaster(file_name, geo_column_name, copy_params);
4827  }
4828  return gdalToColumnDescriptorsGeo(file_name, geo_column_name, copy_params);
4829 }
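
For example, detecting a schema from a geo file before creating the target table; the file name and geo column name are illustrative:

import_export::CopyParams copy_params;
const auto column_descriptors = import_export::Importer::gdalToColumnDescriptors(
    "/path/to/parcels.geojson", /*is_raster=*/false, "geom", copy_params);
for (const auto& cd : column_descriptors) {
  // cd.columnType carries the detected SQLTypeInfo for each field
  LOG(INFO) << "detected column: " << cd.columnName;
}
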

const std::list< ColumnDescriptor > import_export::Importer::gdalToColumnDescriptorsGeo ( const std::string &  fileName,
const std::string &  geoColumnName,
const CopyParams &  copy_params 
)
static private

Definition at line 4904 of file Importer.cpp.

References import_export::CopyParams::add_metadata_columns, CHECK, ColumnDescriptor::columnName, ColumnDescriptor::columnType, import_export::CopyParams::geo_coords_comp_param, import_export::CopyParams::geo_coords_encoding, import_export::CopyParams::geo_coords_srid, import_export::CopyParams::geo_coords_type, import_export::CopyParams::geo_explode_collections, import_export::CopyParams::geo_layer_name, import_export::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), kARRAY, kENCODING_DICT, kLINESTRING, kMULTILINESTRING, kMULTIPOINT, kMULTIPOLYGON, kPOINT, kPOLYGON, kTEXT, import_export::anonymous_namespace{Importer.cpp}::ogr_to_type(), openGDALDataSource(), import_export::parse_add_metadata_columns(), import_export::PROMOTE_LINESTRING_TO_MULTILINESTRING, import_export::PROMOTE_POINT_TO_MULTIPOINT, import_export::PROMOTE_POLYGON_TO_MULTIPOLYGON, SQLTypeInfo::set_comp_param(), SQLTypeInfo::set_compression(), SQLTypeInfo::set_fixed_size(), SQLTypeInfo::set_input_srid(), SQLTypeInfo::set_output_srid(), SQLTypeInfo::set_subtype(), SQLTypeInfo::set_type(), and ColumnDescriptor::sourceName.

Referenced by gdalToColumnDescriptors().

4907  {
4908  std::list<ColumnDescriptor> cds;
4909 
4911  if (poDS == nullptr) {
4912  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
4913  file_name);
4914  }
4915  if (poDS->GetLayerCount() == 0) {
4916  throw std::runtime_error("gdalToColumnDescriptors Error: Geo file " + file_name +
4917  " has no layers");
4918  }
4919 
4920  OGRLayer& layer =
4922 
4923  layer.ResetReading();
4924  // TODO(andrewseidl): support multiple features
4925  Geospatial::GDAL::FeatureUqPtr poFeature(layer.GetNextFeature());
4926  if (poFeature == nullptr) {
4927  throw std::runtime_error("No features found in " + file_name);
4928  }
4929  // get fields as regular columns
4930  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4931  CHECK(poFDefn);
4932  int iField;
4933  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4934  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4935  auto typePair = ogr_to_type(poFieldDefn->GetType());
4936  ColumnDescriptor cd;
4937  cd.columnName = poFieldDefn->GetNameRef();
4938  cd.sourceName = poFieldDefn->GetNameRef();
4939  SQLTypeInfo ti;
4940  if (typePair.second) {
4941  ti.set_type(kARRAY);
4942  ti.set_subtype(typePair.first);
4943  } else {
4944  ti.set_type(typePair.first);
4945  }
4946  if (typePair.first == kTEXT) {
4948  ti.set_comp_param(0);
4949  }
4950  ti.set_fixed_size();
4951  cd.columnType = ti;
4952  cds.push_back(cd);
4953  }
4954  // try getting the geo column type from the layer
4955  auto ogr_type = wkbFlatten(layer.GetGeomType());
4956  if (ogr_type == wkbUnknown) {
4957  // layer geo type unknown, so try the feature (that we already got)
4958  auto const* ogr_geometry = poFeature->GetGeometryRef();
4959  if (ogr_geometry) {
4960  ogr_type = wkbFlatten(ogr_geometry->getGeometryType());
4961  }
4962  }
4963  // do we have a geo column?
4964  if (ogr_type != wkbNone) {
4965  ColumnDescriptor cd;
4966  cd.columnName = geo_column_name;
4967  cd.sourceName = geo_column_name;
4968 
4969  // if exploding, override any collection type to child type
4971  if (ogr_type == wkbMultiPolygon) {
4972  ogr_type = wkbPolygon;
4973  } else if (ogr_type == wkbMultiLineString) {
4974  ogr_type = wkbLineString;
4975  } else if (ogr_type == wkbMultiPoint) {
4976  ogr_type = wkbPoint;
4977  }
4978  }
4979 
4980  // convert to internal type
4981  // this will throw if the type is unsupported
4982  SQLTypes geoType = ogr_to_type(ogr_type);
4983 
4984  // promote column type? (unless exploding)
4986  if (PROMOTE_POINT_TO_MULTIPOINT && geoType == kPOINT) {
4987  geoType = kMULTIPOINT;
4988  } else if (PROMOTE_LINESTRING_TO_MULTILINESTRING && geoType == kLINESTRING) {
4989  geoType = kMULTILINESTRING;
4990  } else if (PROMOTE_POLYGON_TO_MULTIPOLYGON && geoType == kPOLYGON) {
4991  geoType = kMULTIPOLYGON;
4992  }
4993  }
4994 
4995  // build full internal type
4996  SQLTypeInfo ti;
4997  ti.set_type(geoType);
5003  cd.columnType = ti;
5004 
5005  cds.push_back(cd);
5006  }
5007 
5008  // metadata columns?
5009  auto metadata_column_infos =
5011  for (auto& mci : metadata_column_infos) {
5012  cds.push_back(std::move(mci.column_descriptor));
5013  }
5014 
5015  return cds;
5016 }

const std::list< ColumnDescriptor > import_export::Importer::gdalToColumnDescriptorsRaster ( const std::string &  fileName,
const std::string &  geoColumnName,
const CopyParams &  copy_params 
)
static private

Definition at line 4832 of file Importer.cpp.

References import_export::CopyParams::add_metadata_columns, ColumnDescriptor::columnName, ColumnDescriptor::columnType, import_export::anonymous_namespace{Importer.cpp}::convert_raster_point_transform(), import_export::anonymous_namespace{Importer.cpp}::convert_raster_point_type(), import_export::RasterImporter::detect(), import_export::RasterImporter::getBandNamesAndSQLTypes(), import_export::RasterImporter::getPointNamesAndSQLTypes(), Geospatial::GDAL::init(), kENCODING_GEOINT, kGEOMETRY, kPOINT, import_export::parse_add_metadata_columns(), import_export::CopyParams::raster_import_bands, import_export::CopyParams::raster_import_dimensions, import_export::CopyParams::raster_point_compute_angle, import_export::CopyParams::raster_point_transform, import_export::CopyParams::raster_point_type, import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, SQLTypeInfo::set_comp_param(), SQLTypeInfo::set_compression(), SQLTypeInfo::set_fixed_size(), SQLTypeInfo::set_input_srid(), SQLTypeInfo::set_output_srid(), SQLTypeInfo::set_subtype(), SQLTypeInfo::set_type(), Geospatial::GDAL::setAuthorizationTokens(), and ColumnDescriptor::sourceName.

Referenced by gdalToColumnDescriptors().

4835  {
4836  // lazy init GDAL
4843 
4844  // prepare for metadata column
4845  auto metadata_column_infos =
4847 
4848  // create a raster importer and do the detect
4849  RasterImporter raster_importer;
4850  raster_importer.detect(
4851  file_name,
4857  false,
4858  metadata_column_infos);
4859 
4860  // prepare to capture column descriptors
4861  std::list<ColumnDescriptor> cds;
4862 
4863  // get the point column info
4864  auto const point_names_and_sql_types = raster_importer.getPointNamesAndSQLTypes();
4865 
4866  // create the columns for the point in the specified type
4867  for (auto const& [col_name, sql_type] : point_names_and_sql_types) {
4868  ColumnDescriptor cd;
4869  cd.columnName = cd.sourceName = col_name;
4870  cd.columnType.set_type(sql_type);
4871  // hardwire other POINT attributes for now
4872  if (sql_type == kPOINT) {
4874  cd.columnType.set_input_srid(4326);
4875  cd.columnType.set_output_srid(4326);
4877  cd.columnType.set_comp_param(32);
4878  }
4879  cds.push_back(cd);
4880  }
4881 
4882  // get the names and types for the band column(s)
4883  auto const band_names_and_types = raster_importer.getBandNamesAndSQLTypes();
4884 
4885  // add column descriptors for each band
4886  for (auto const& [band_name, sql_type] : band_names_and_types) {
4887  ColumnDescriptor cd;
4888  cd.columnName = cd.sourceName = band_name;
4889  cd.columnType.set_type(sql_type);
4891  cds.push_back(cd);
4892  }
4893 
4894  // metadata columns?
4895  for (auto& mci : metadata_column_infos) {
4896  cds.push_back(std::move(mci.column_descriptor));
4897  }
4898 
4899  // return the results
4900  return cds;
4901 }

const std::list<const ColumnDescriptor*>& import_export::Importer::get_column_descs ( ) const
inline

Definition at line 801 of file Importer.h.

References loader.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

801  {
802  return loader->get_column_descs();
803  }

const CopyParams& import_export::Importer::get_copy_params ( ) const
inline

Definition at line 800 of file Importer.h.

References import_export::DataStreamSink::copy_params.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

800 { return copy_params; }


std::vector<std::unique_ptr<TypedImportBuffer> >& import_export::Importer::get_import_buffers ( int  i)
inline

Definition at line 810 of file Importer.h.

References import_buffers_vec.

Referenced by import_export::import_thread_delimited(), and import_export::import_thread_shapefile().

810  {
811  return import_buffers_vec[i];
812  }

std::vector<std::vector<std::unique_ptr<TypedImportBuffer> > >& import_export::Importer::get_import_buffers_vec ( )
inline

Definition at line 807 of file Importer.h.

References import_buffers_vec.

807  {
808  return import_buffers_vec;
809  }
ImportStatus import_export::Importer::get_import_status ( const std::string &  id)
static

Definition at line 231 of file Importer.cpp.

References import_export::import_status_map, and import_export::status_mutex.

Referenced by DBHandler::import_table_status().

231  {
233  auto it = import_status_map.find(import_id);
234  if (it == import_status_map.end()) {
235  throw std::runtime_error("Import status not found for id: " + import_id);
236  }
237  return it->second;
238 }
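
Statuses are keyed by the import id, which for file-based imports is the file name component set in the constructor. A polling sketch (the id is illustrative); note that an unknown id throws:

try {
  const auto status = import_export::Importer::get_import_status("trips.csv");
  LOG(INFO) << "completed " << status.rows_completed << " of ~" << status.rows_estimated
            << " estimated rows, rejected " << status.rows_rejected;
} catch (const std::runtime_error& e) {
  LOG(ERROR) << e.what();  // "Import status not found for id: ..."
}
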

const bool* import_export::Importer::get_is_array ( ) const
inline

Definition at line 813 of file Importer.h.

References is_array_a.

Referenced by import_export::import_thread_delimited().

813 { return is_array_a.get(); }

Catalog_Namespace::Catalog& import_export::Importer::getCatalog ( )
inline

Definition at line 847 of file Importer.h.

References loader.

Referenced by import_export::TypedImportBuffer::convert_arrow_val_to_import_buffer(), and import_export::import_thread_delimited().

847  {
848  return loader->getCatalog();
849  }

auto import_export::Importer::getLoader ( ) const
inline

Definition at line 870 of file Importer.h.

References loader.

870  {
871  return loader.get();
872  }
ImportStatus import_export::Importer::import ( const Catalog_Namespace::SessionInfo *  session_info)
override virtual

Implements import_export::AbstractImporter.

Definition at line 4361 of file Importer.cpp.

References import_export::DataStreamSink::archivePlumber().

4361  {
4362  return DataStreamSink::archivePlumber(session_info);
4363 }

ImportStatus import_export::Importer::importDelimited ( const std::string &  file_path,
const bool  decompressed,
const Catalog_Namespace::SessionInfo *  session_info 
)
override virtual

Implements import_export::DataStreamSink.

Definition at line 4365 of file Importer.cpp.

References threading_serial::async(), CHECK, checkpoint(), logger::ERROR, import_export::DataStreamSink::file_offsets, import_export::DataStreamSink::file_offsets_mutex, file_size, import_export::delimited_parser::find_row_end_pos(), heavyai::fopen(), Catalog_Namespace::SessionInfo::get_session_id(), Executor::getExecutor(), import_buffers_vec, import_id, import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, import_export::import_thread_delimited(), logger::INFO, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, LOG, max_threads, import_export::num_import_threads(), import_export::DataStreamSink::p_file, import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_estimated, import_export::ImportStatus::rows_rejected, set_import_status(), logger::thread_id(), import_export::DataStreamSink::total_file_size, Executor::UNITARY_EXECUTOR_ID, and VLOG.

4368  {
4370  auto query_session = session_info ? session_info->get_session_id() : "";
4371 
4372  if (!p_file) {
4373  p_file = fopen(file_path.c_str(), "rb");
4374  }
4375  if (!p_file) {
4376  throw std::runtime_error("failed to open file '" + file_path +
4377  "': " + strerror(errno));
4378  }
4379 
4380  if (!decompressed) {
4381  (void)fseek(p_file, 0, SEEK_END);
4382  file_size = ftell(p_file);
4383  }
4384 
4386  VLOG(1) << "Delimited import # threads: " << max_threads;
4387 
4388  // deal with small files
4389  size_t alloc_size = copy_params.buffer_size;
4390  if (!decompressed && file_size < alloc_size) {
4391  alloc_size = file_size;
4392  }
4393 
4394  for (size_t i = 0; i < max_threads; i++) {
4395  import_buffers_vec.emplace_back();
4396  for (const auto cd : loader->get_column_descs()) {
4397  import_buffers_vec[i].emplace_back(
4398  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
4399  }
4400  }
4401 
4402  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
4403  size_t current_pos = 0;
4404  size_t end_pos;
4405  size_t begin_pos = 0;
4406 
4407  (void)fseek(p_file, current_pos, SEEK_SET);
4408  size_t size =
4409  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
4410 
4411  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
4412  loader->getTableDesc()->tableId};
4413  auto table_epochs = loader->getTableEpochs();
4415  {
4416  std::list<std::future<ImportStatus>> threads;
4417 
4418  // use a stack to track thread_ids which must not overlap among threads
4419  // because thread_id is used to index import_buffers_vec[]
4420  std::stack<size_t> stack_thread_ids;
4421  for (size_t i = 0; i < max_threads; i++) {
4422  stack_thread_ids.push(i);
4423  }
4424  // added for true row index on error
4425  size_t first_row_index_this_buffer = 0;
4426 
4427  while (size > 0) {
4428  unsigned int num_rows_this_buffer = 0;
4429  CHECK(scratch_buffer);
4430  end_pos = delimited_parser::find_row_end_pos(alloc_size,
4431  scratch_buffer,
4432  size,
4433  copy_params,
4434  first_row_index_this_buffer,
4435  num_rows_this_buffer,
4436  p_file);
4437 
4438  // unput residual
4439  int nresidual = size - end_pos;
4440  std::unique_ptr<char[]> unbuf;
4441  if (nresidual > 0) {
4442  unbuf = std::make_unique<char[]>(nresidual);
4443  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
4444  }
4445 
4446  // get a thread_id not in use
4447  auto thread_id = stack_thread_ids.top();
4448  stack_thread_ids.pop();
4449  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
4450 
4451  threads.push_back(std::async(std::launch::async,
4453  thread_id,
4454  this,
4455  std::move(scratch_buffer),
4456  begin_pos,
4457  end_pos,
4458  end_pos,
4459  first_row_index_this_buffer,
4460  session_info,
4461  executor));
4462 
4463  first_row_index_this_buffer += num_rows_this_buffer;
4464 
4465  current_pos += end_pos;
4466  scratch_buffer = std::make_unique<char[]>(alloc_size);
4467  CHECK(scratch_buffer);
4468  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
4469  size = nresidual +
4470  fread(scratch_buffer.get() + nresidual, 1, alloc_size - nresidual, p_file);
4471 
4472  begin_pos = 0;
4473  while (threads.size() > 0) {
4474  int nready = 0;
4475  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4476  it != threads.end();) {
4477  auto& p = *it;
4478  std::chrono::milliseconds span(0);
4479  if (p.wait_for(span) == std::future_status::ready) {
4480  auto ret_import_status = p.get();
4481  {
4483  import_status_ += ret_import_status;
4484  if (ret_import_status.load_failed) {
4486  }
4487  }
4488  // sum up current total file offsets
4489  size_t total_file_offset{0};
4490  if (decompressed) {
4491  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4492  for (const auto file_offset : file_offsets) {
4493  total_file_offset += file_offset;
4494  }
4495  }
4496  // estimate number of rows per current total file offset
4497  if (decompressed ? total_file_offset : current_pos) {
4499  (decompressed ? (float)total_file_size / total_file_offset
4500  : (float)file_size / current_pos) *
4501  import_status_.rows_completed;
4502  }
4503 
4504  LOG(INFO) << "rows_completed " << import_status_.rows_completed
4505  << ", rows_estimated " << import_status_.rows_estimated
4506  << ", total_file_size " << total_file_size << ", total_file_offset "
4507  << total_file_offset;
4509  // recall thread_id for reuse
4510  stack_thread_ids.push(ret_import_status.thread_id);
4511  threads.erase(it++);
4512  ++nready;
4513  } else {
4514  ++it;
4515  }
4516  }
4517 
4518  if (nready == 0) {
4519  std::this_thread::yield();
4520  }
4521 
4522  // on eof, wait all threads to finish
4523  if (0 == size) {
4524  continue;
4525  }
4526 
4527  // keep reading if any free thread slot
4528  // this is one of the major difference from old threading model !!
4529  if (threads.size() < max_threads) {
4530  break;
4531  }
4534  break;
4535  }
4536  }
4539  import_status_.load_failed = true;
4540  // todo use better message
4541  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
4542  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4543  break;
4544  }
4546  LOG(ERROR) << "Load failed, the issue was: " + import_status_.load_msg;
4547  break;
4548  }
4549  }
4550 
4551  // join dangling threads in case of LOG(ERROR) above
4552  for (auto& p : threads) {
4553  p.wait();
4554  }
4555  }
4556 
4557  checkpoint(table_epochs);
4558 
4559  fclose(p_file);
4560  p_file = nullptr;
4561  return import_status_;
4562 }
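
importDelimited() is normally reached through import() / archivePlumber(), which handles archives and compressed input. A direct call is only a reasonable sketch for a plain, uncompressed file; the importer, session and path below are assumed to exist:

// decompressed == false: the file size is taken from the file itself (see the
// fseek/ftell logic above) and progress is estimated against that size
auto status = importer.importDelimited("/path/to/data.csv", /*decompressed=*/false, session);
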

ImportStatus import_export::Importer::importGDAL ( const std::map< std::string, std::string > &  colname_to_src,
const Catalog_Namespace::SessionInfo *  session_info,
const bool  is_raster 
)

Definition at line 5226 of file Importer.cpp.

References importGDALGeo(), and importGDALRaster().

Referenced by QueryRunner::ImportDriver::importGeoTable().

5229  {
5230  if (is_raster) {
5231  return importGDALRaster(session_info);
5232  }
5233  return importGDALGeo(columnNameToSourceNameMap, session_info);
5234 }
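
A sketch of a geo (non-raster) import in which every target column maps to a source field of the same name; importer and session are assumed to exist, and the identity mapping is illustrative:

// build the column-name -> source-name map expected by importGDAL()
std::map<std::string, std::string> colname_to_src;
for (const auto* cd : importer.get_column_descs()) {
  colname_to_src[cd->columnName] = cd->columnName;
}
const auto status = importer.importGDAL(colname_to_src, session, /*is_raster=*/false);
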

ImportStatus import_export::Importer::importGDALGeo ( const std::map< std::string, std::string > &  colname_to_src,
const Catalog_Namespace::SessionInfo *  session_info 
)
private

Definition at line 5236 of file Importer.cpp.

References threading_serial::async(), CHECK, CHECK_EQ, checkpoint(), logger::ERROR, g_enable_non_kernel_time_query_interrupt, Catalog_Namespace::SessionInfo::get_session_id(), Executor::getExecutor(), import_export::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), import_buffers_vec, import_id, import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, import_export::import_thread_shapefile(), import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, LOG, max_threads, import_export::num_import_threads(), openGDALDataSource(), import_export::parse_add_metadata_columns(), import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_estimated, import_export::ImportStatus::rows_rejected, set_import_status(), logger::thread_id(), toString(), Executor::UNITARY_EXECUTOR_ID, and VLOG.

Referenced by importGDAL().

5238  {
5239  // initial status
5242  if (poDS == nullptr) {
5243  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5244  file_path);
5245  }
5246 
5247  OGRLayer& layer =
5249 
5250  // get the number of features in this layer
5251  size_t numFeatures = layer.GetFeatureCount();
5252 
5253  // build map of metadata field (additional columns) name to index
5254  // use shared_ptr since we need to pass it to the worker
5255  FieldNameToIndexMapType fieldNameToIndexMap;
5256  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5257  CHECK(poFDefn);
5258  size_t numFields = poFDefn->GetFieldCount();
5259  for (size_t iField = 0; iField < numFields; iField++) {
5260  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5261  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
5262  }
5263 
5264  // the geographic spatial reference we want to put everything in
5265  Geospatial::GDAL::SpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
5266  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
5267 
5268 #if GDAL_VERSION_MAJOR >= 3
5269  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order
5270  // this results in X and Y being transposed for angle-based
5271  // coordinate systems. This restores the previous behavior.
5272  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
5273 #endif
5274 
5275 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5276  // just one "thread"
5277  max_threads = 1;
5278 #else
5279  // how many threads to use
5281 #endif
5282  VLOG(1) << "GDAL import # threads: " << max_threads;
5283 
5284  // metadata columns?
5285  auto const metadata_column_infos =
5287 
5288  // import geo table is specifically handled in both DBHandler and QueryRunner
5289  // that is separate path against a normal SQL execution
5290  // so we here explicitly enroll the import session to allow interruption
5291  // while importing geo table
5292  auto query_session = session_info ? session_info->get_session_id() : "";
5293  auto query_submitted_time = ::toString(std::chrono::system_clock::now());
5295  auto is_session_already_registered = false;
5296  {
5298  executor->getSessionLock());
5299  is_session_already_registered =
5300  executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
5301  }
5302  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty() &&
5303  !is_session_already_registered) {
5304  executor->enrollQuerySession(query_session,
5305  "IMPORT_GEO_TABLE",
5306  query_submitted_time,
5308  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5309  }
5310  ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
5311  // reset the runtime query interrupt status
5312  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
5313  executor->clearQuerySessionStatus(query_session, query_submitted_time);
5314  }
5315  };
5316 
5317  // make an import buffer for each thread
5318  CHECK_EQ(import_buffers_vec.size(), 0u);
5320  for (size_t i = 0; i < max_threads; i++) {
5321  for (const auto cd : loader->get_column_descs()) {
5322  import_buffers_vec[i].emplace_back(
5323  new TypedImportBuffer(cd, loader->getStringDict(cd)));
5324  }
5325  }
5326 
5327 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5328  // threads
5329  std::list<std::future<ImportStatus>> threads;
5330 
5331  // use a stack to track thread_ids which must not overlap among threads
5332  // because thread_id is used to index import_buffers_vec[]
5333  std::stack<size_t> stack_thread_ids;
5334  for (size_t i = 0; i < max_threads; i++) {
5335  stack_thread_ids.push(i);
5336  }
5337 #endif
5338 
5339  // checkpoint the table
5340  auto table_epochs = loader->getTableEpochs();
5341 
5342  // reset the layer
5343  layer.ResetReading();
5344 
5345  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
5346 
5347  // make a features buffer for each thread
5348  std::vector<FeaturePtrVector> features(max_threads);
5349 
5350  // make one of these for each thread, based on the first feature's SR
5351  std::vector<std::unique_ptr<OGRCoordinateTransformation>> coordinate_transformations(
5352  max_threads);
5353 
5354  // for each feature...
5355  size_t firstFeatureThisChunk = 0;
5356  while (firstFeatureThisChunk < numFeatures) {
5357  // how many features this chunk
5358  size_t numFeaturesThisChunk =
5359  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
5360 
5361 // get a thread_id not in use
5362 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5363  size_t thread_id = 0;
5364 #else
5365  auto thread_id = stack_thread_ids.top();
5366  stack_thread_ids.pop();
5367  CHECK(thread_id < max_threads);
5368 #endif
5369 
5370  // fill features buffer for new thread
5371  for (size_t i = 0; i < numFeaturesThisChunk; i++) {
5372  features[thread_id].emplace_back(layer.GetNextFeature());
5373  }
5374 
5375  // construct a coordinate transformation for each thread, if needed
5376  // some features may not have geometry, so look for the first one that does
5377  if (coordinate_transformations[thread_id] == nullptr) {
5378  for (auto const& feature : features[thread_id]) {
5379  auto const* geometry = feature->GetGeometryRef();
5380  if (geometry) {
5381  auto const* geometry_sr = geometry->getSpatialReference();
5382  // if the SR is non-null and non-empty and different from what we want
5383  // we need to make a reusable CoordinateTransformation
5384  if (geometry_sr &&
5385 #if GDAL_VERSION_MAJOR >= 3
5386  !geometry_sr->IsEmpty() &&
5387 #endif
5388  !geometry_sr->IsSame(poGeographicSR.get())) {
5389  // validate the SR before trying to use it
5390  if (geometry_sr->Validate() != OGRERR_NONE) {
5391  throw std::runtime_error("Incoming geo has invalid Spatial Reference");
5392  }
5393  // create the OGRCoordinateTransformation that will be used for
5394  // all the features in this chunk
5395  coordinate_transformations[thread_id].reset(
5396  OGRCreateCoordinateTransformation(geometry_sr, poGeographicSR.get()));
5397  if (coordinate_transformations[thread_id] == nullptr) {
5398  throw std::runtime_error(
5399  "Failed to create a GDAL CoordinateTransformation for incoming geo");
5400  }
5401  }
5402  // once we find at least one geometry with an SR, we're done
5403  break;
5404  }
5405  }
5406  }
5407 
5408 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5409  // call worker function directly
5410  auto ret_import_status =
5412  this,
5413  coordinate_transformations[thread_id].get(),
5414  std::move(features[thread_id]),
5415  firstFeatureThisChunk,
5416  numFeaturesThisChunk,
5417  fieldNameToIndexMap,
5418  columnNameToSourceNameMap,
5419  session_info,
5420  executor.get(),
5421  metadata_column_infos);
5422  import_status += ret_import_status;
5423  import_status.rows_estimated = ((float)firstFeatureThisChunk / (float)numFeatures) *
5424  import_status.rows_completed;
5425  set_import_status(import_id, import_status);
5426 #else
5427  // fire up that thread to import this geometry
5428  threads.push_back(std::async(std::launch::async,
5430  thread_id,
5431  this,
5432  coordinate_transformations[thread_id].get(),
5433  std::move(features[thread_id]),
5434  firstFeatureThisChunk,
5435  numFeaturesThisChunk,
5436  fieldNameToIndexMap,
5437  columnNameToSourceNameMap,
5438  session_info,
5439  executor.get(),
5440  metadata_column_infos));
5441 
5442  // let the threads run
5443  while (threads.size() > 0) {
5444  int nready = 0;
5445  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
5446  it != threads.end();) {
5447  auto& p = *it;
5448  std::chrono::milliseconds span(
5449  0); //(std::distance(it, threads.end()) == 1? 1: 0);
5450  if (p.wait_for(span) == std::future_status::ready) {
5451  auto ret_import_status = p.get();
5452  {
5454  import_status_ += ret_import_status;
5456  ((float)firstFeatureThisChunk / (float)numFeatures) *
5460  break;
5461  }
5462  }
5463  // recall thread_id for reuse
5464  stack_thread_ids.push(ret_import_status.thread_id);
5465 
5466  threads.erase(it++);
5467  ++nready;
5468  } else {
5469  ++it;
5470  }
5471  }
5472 
5473  if (nready == 0) {
5474  std::this_thread::yield();
5475  }
5476 
5477  // keep reading if any free thread slot
5478  // this is one of the major difference from old threading model !!
5479  if (threads.size() < max_threads) {
5480  break;
5481  }
5482  }
5483 #endif
5484 
5485  // out of rows?
5486 
5489  import_status_.load_failed = true;
5490  // todo use better message
5491  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
5492  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
5493  break;
5494  }
5496  LOG(ERROR) << "A call to the Loader failed in GDAL, Please review the logs for "
5497  "more details";
5498  break;
5499  }
5500 
5501  firstFeatureThisChunk += numFeaturesThisChunk;
5502  }
5503 
5504 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5505  // wait for any remaining threads
5506  if (threads.size()) {
5507  for (auto& p : threads) {
5508  // wait for the thread
5509  p.wait();
5510  // get the result and update the final import status
5511  auto ret_import_status = p.get();
5512  import_status_ += ret_import_status;
5515  }
5516  }
5517 #endif
5518 
5519  checkpoint(table_epochs);
5520 
5521  return import_status_;
5522 }

ImportStatus import_export::Importer::importGDALRaster ( const Catalog_Namespace::SessionInfo *  session_info)
private

Definition at line 5524 of file Importer.cpp.

References threading_serial::async(), CHECK, CHECK_EQ, CHECK_LE, anonymous_namespace{Importer.cpp}::check_session_interrupted(), checkpoint(), ColumnDescriptor::columnName, ColumnDescriptor::columnType, Geospatial::compress_coords(), import_export::anonymous_namespace{Importer.cpp}::convert_raster_point_transform(), import_export::anonymous_namespace{Importer.cpp}::convert_raster_point_type(), import_export::RasterImporter::detect(), logger::ERROR, f(), g_enable_non_kernel_time_query_interrupt, SQLTypeInfo::get_output_srid(), Catalog_Namespace::SessionInfo::get_session_id(), SQLTypeInfo::get_subtype(), SQLTypeInfo::get_type(), import_export::RasterImporter::getBandNamesAndSQLTypes(), import_export::RasterImporter::getBandNullValue(), import_export::RasterImporter::getBandsHeight(), import_export::RasterImporter::getBandsWidth(), Executor::getExecutor(), import_export::RasterImporter::getNumBands(), import_export::RasterImporter::getPointNamesAndSQLTypes(), import_export::RasterImporter::getProjectedPixelCoords(), import_export::RasterImporter::getRawPixels(), import_export::RasterImporter::import(), import_buffers_vec, import_id, import_export::DataStreamSink::import_status_, logger::INFO, ColumnDescriptor::isGeoPhyCol, kARRAY, kBIGINT, kDOUBLE, kFLOAT, kINT, kMaxRasterScanlinesPerThread, kNULLT, kPOINT, kSMALLINT, kTINYINT, load(), import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, loader, LOG, max_threads, NULL_DOUBLE, NULL_FLOAT, NULL_INT, NULL_SMALLINT, import_export::num_import_threads(), import_export::parse_add_metadata_columns(), import_export::BadRowsTracker::rows, import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_estimated, import_export::ImportStatus::rows_rejected, set_import_status(), logger::thread_id(), timer_start(), TIMER_STOP, to_string(), toString(), Executor::UNITARY_EXECUTOR_ID, UNLIKELY, and VLOG.

Referenced by importGDAL().

5525  {
5526  // initial status
5528 
5529  // metadata columns?
5530  auto const metadata_column_infos =
5532 
5533  // create a raster importer and do the detect
5534  RasterImporter raster_importer;
5535  raster_importer.detect(
5536  file_path,
5542  true,
5543  metadata_column_infos);
5544 
5545  // get the table columns and count actual columns
5546  auto const& column_descs = loader->get_column_descs();
5547  uint32_t num_table_cols{0u};
5548  for (auto const* cd : column_descs) {
5549  if (!cd->isGeoPhyCol) {
5550  num_table_cols++;
5551  }
5552  }
5553 
5554  // how many bands do we have?
5555  auto num_bands = raster_importer.getNumBands();
5556 
5557  // get point columns info
5558  auto const point_names_and_sql_types = raster_importer.getPointNamesAndSQLTypes();
5559 
5560  // validate that the table column count matches
5561  auto num_expected_cols = num_bands;
5562  num_expected_cols += point_names_and_sql_types.size();
5563  num_expected_cols += metadata_column_infos.size();
5564  if (num_expected_cols != num_table_cols) {
5565  throw std::runtime_error(
5566  "Raster Import aborted. Band/Column count mismatch (file requires " +
5567  std::to_string(num_expected_cols) + ", table has " +
5568  std::to_string(num_table_cols) + ")");
5569  }
5570 
5571  // validate the point column names and types
5572  // if we're importing the coords as a POINT, then the first column
5573  // must be a POINT (two physical columns, POINT and TINYINT[])
5574  // if we're not, the first two columns must be the matching type
5575  // optionally followed by an angle column
5576  auto cd_itr = column_descs.begin();
5577  for (auto const& [col_name, sql_type] : point_names_and_sql_types) {
5578  if (sql_type == kPOINT) {
5579  // POINT column
5580  {
5581  auto const* cd = *cd_itr++;
5582  if (cd->columnName != col_name) {
5583  throw std::runtime_error("Column '" + cd->columnName +
5584  "' overridden name invalid (must be '" + col_name +
5585  "')");
5586  }
5587  auto const cd_type = cd->columnType.get_type();
5588  if (cd_type != kPOINT) {
5589  throw std::runtime_error("Column '" + cd->columnName +
5590  "' overridden type invalid (must be POINT)");
5591  }
5592  if (cd->columnType.get_output_srid() != 4326) {
5593  throw std::runtime_error("Column '" + cd->columnName +
5594  "' overridden SRID invalid (must be 4326)");
5595  }
5596  }
5597  // TINYINT[] coords sub-column
5598  {
5599  // if the above is true, this must be true
5600  auto const* cd = *cd_itr++;
5601  CHECK(cd->columnType.get_type() == kARRAY);
5602  CHECK(cd->columnType.get_subtype() == kTINYINT);
5603  }
5604  } else {
5605  // column of the matching name and type
5606  auto const* cd = *cd_itr++;
5607  if (cd->columnName != col_name) {
5608  throw std::runtime_error("Column '" + cd->columnName +
5609  "' overridden name invalid (must be '" + col_name +
5610  "')");
5611  }
5612  auto const cd_type = cd->columnType.get_type();
5613  if (cd_type != sql_type) {
5614  throw std::runtime_error("Column '" + cd->columnName +
5615  "' overridden type invalid (must be " +
5616  to_string(sql_type) + ")");
5617  }
5618  }
5619  }
5620 
5621  // validate the band column types
5622  // any Immerse overriding to other types will currently be rejected
5623  auto const band_names_and_types = raster_importer.getBandNamesAndSQLTypes();
5624  if (band_names_and_types.size() != num_bands) {
5625  throw std::runtime_error("Column/Band count mismatch when validating types");
5626  }
5627  for (uint32_t i = 0; i < num_bands; i++) {
5628  auto const* cd = *cd_itr++;
5629  auto const cd_type = cd->columnType.get_type();
5630  auto const sql_type = band_names_and_types[i].second;
5631  if (cd_type != sql_type) {
5632  throw std::runtime_error("Band Column '" + cd->columnName +
5633  "' overridden type invalid (must be " +
5634  to_string(sql_type) + ")");
5635  }
5636  }
5637 
5638  // validate metadata column
5639  for (auto const& mci : metadata_column_infos) {
5640  auto const* cd = *cd_itr++;
5641  if (mci.column_descriptor.columnName != cd->columnName) {
5642  throw std::runtime_error("Metadata Column '" + cd->columnName +
5643  "' overridden name invalid (must be '" +
5644  mci.column_descriptor.columnName + "')");
5645  }
5646  auto const cd_type = cd->columnType.get_type();
5647  auto const md_type = mci.column_descriptor.columnType.get_type();
5648  if (cd_type != md_type) {
5649  throw std::runtime_error("Metadata Column '" + cd->columnName +
5650  "' overridden type invalid (must be " +
5651  to_string(md_type) + ")");
5652  }
5653  }
5654 
5655  // geo table import is handled separately in both DBHandler and QueryRunner;
5656  // it takes a different path from normal SQL execution, so we explicitly
5657  // enroll the import session here to allow interruption while the geo
5658  // table is being imported
5659  auto query_session = session_info ? session_info->get_session_id() : "";
5660  auto query_submitted_time = ::toString(std::chrono::system_clock::now());
5662  auto is_session_already_registered = false;
5663  {
5665  executor->getSessionLock());
5666  is_session_already_registered =
5667  executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
5668  }
5669  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty() &&
5670  !is_session_already_registered) {
5671  executor->enrollQuerySession(query_session,
5672  "IMPORT_GEO_TABLE",
5673  query_submitted_time,
5675  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5676  }
5677  ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
5678  // reset the runtime query interrupt status
5679  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
5680  executor->clearQuerySessionStatus(query_session, query_submitted_time);
5681  }
5682  };
5683 
5684  // how many threads are we gonna use?
5686  VLOG(1) << "GDAL import # threads: " << max_threads;
5687 
5689  throw std::runtime_error("Invalid CopyParams.raster_scanlines_per_thread! (" +
5691  ")");
5692  }
5693  const int max_scanlines_per_thread =
5698  VLOG(1) << "Raster Importer: Max scanlines per thread: " << max_scanlines_per_thread;
5699 
5700  // prepare to checkpoint the table
5701  auto table_epochs = loader->getTableEpochs();
5702 
5703  // start wall clock
5704  auto wall_timer = timer_start();
5705 
5706  // start the import
5707  raster_importer.import(
5708  max_threads,
5709  copy_params.threads == 0); // NOTE: `max_threads` may change after this call
5710 
5711  // make an import buffer for each thread
5712  CHECK_EQ(import_buffers_vec.size(), 0u);
5714  for (size_t i = 0; i < max_threads; i++) {
5715  for (auto const& cd : loader->get_column_descs()) {
5716  import_buffers_vec[i].emplace_back(
5717  new TypedImportBuffer(cd, loader->getStringDict(cd)));
5718  }
5719  }
5720 
5721  // status and times
5722  using ThreadReturn = std::tuple<ImportStatus, std::array<float, 3>>;
5723 
5724  // get the band dimensions
5725  auto const band_size_x = raster_importer.getBandsWidth();
5726  auto const band_size_y = raster_importer.getBandsHeight();
5727 
5728  // allocate raw pixel buffers per thread
5729  std::vector<RasterImporter::RawPixels> raw_pixel_bytes_per_thread(max_threads);
5730  for (size_t i = 0; i < max_threads; i++) {
5731  raw_pixel_bytes_per_thread[i].resize(band_size_x * max_scanlines_per_thread *
5732  sizeof(double));
5733  }
5734 
5735  // just the sql type of the first point column (if any)
5736  auto const point_sql_type = point_names_and_sql_types.size()
5737  ? point_names_and_sql_types.begin()->second
5738  : kNULLT;
5739 
5740  // lambda for importing to raw data buffers (threadable)
5741  auto import_rows =
5742  [&](const size_t thread_idx, const int y_start, const int y_end) -> ThreadReturn {
5743  // this thread's import buffers
5744  auto& import_buffers = import_buffers_vec[thread_idx];
5745 
5746  // this thread's raw pixel bytes
5747  auto& raw_pixel_bytes = raw_pixel_bytes_per_thread[thread_idx];
5748 
5749  // clear the buffers
5750  for (auto& col_buffer : import_buffers) {
5751  col_buffer->clear();
5752  }
5753 
5754  // prepare to iterate columns
5755  auto col_itr = column_descs.begin();
5756  int col_idx{0};
5757 
5758  float proj_s{0.0f};
5759  if (point_sql_type != kNULLT) {
5760  // the first two columns (either lon/lat or POINT/coords)
5761  auto const* cd_col0 = *col_itr++;
5762  auto const* cd_col1 = *col_itr++;
5763  auto const* cd_angle =
5764  copy_params.raster_point_compute_angle ? *col_itr++ : nullptr;
5765 
5766  // compute and add x and y
5767  auto proj_timer = timer_start();
5768  for (int y = y_start; y < y_end; y++) {
5769  // get projected pixel coords for this scan-line
5770  auto const coords = raster_importer.getProjectedPixelCoords(thread_idx, y);
5771 
5772  // add to buffers
5773  for (int x = 0; x < band_size_x; x++) {
5774  // this point and angle
5775  auto const& [dx, dy, angle] = coords[x];
5776 
5777  // store the point
5778  switch (point_sql_type) {
5779  case kPOINT: {
5780  // add empty value to POINT buffer
5781  TDatum td_point;
5782  import_buffers[0]->add_value(cd_col0, td_point, false);
5783 
5784  // convert lon/lat to bytes (compressed or not) and add to POINT coords
5785  // buffer
5786  auto const compressed_coords =
5787  Geospatial::compress_coords({dx, dy}, cd_col0->columnType);
5788  std::vector<TDatum> td_coords_data;
5789  for (auto const& cc : compressed_coords) {
5790  TDatum td_byte;
5791  td_byte.val.int_val = cc;
5792  td_coords_data.push_back(td_byte);
5793  }
5794  TDatum td_coords;
5795  td_coords.val.arr_val = td_coords_data;
5796  td_coords.is_null = false;
5797  import_buffers[1]->add_value(cd_col1, td_coords, false);
5798  } break;
5799  case kFLOAT:
5800  case kDOUBLE: {
5801  TDatum td;
5802  td.is_null = false;
5803  td.val.real_val = dx;
5804  import_buffers[0]->add_value(cd_col0, td, false);
5805  td.val.real_val = dy;
5806  import_buffers[1]->add_value(cd_col1, td, false);
5807  } break;
5808  case kSMALLINT:
5809  case kINT: {
5810  TDatum td;
5811  td.is_null = false;
5812  td.val.int_val = static_cast<int64_t>(x);
5813  import_buffers[0]->add_value(cd_col0, td, false);
5814  td.val.int_val = static_cast<int64_t>(y);
5815  import_buffers[1]->add_value(cd_col1, td, false);
5816  } break;
5817  default:
5818  CHECK(false);
5819  }
5820 
5821  // angle?
5823  CHECK(cd_angle);
5824  TDatum td;
5825  td.is_null = false;
5826  td.val.real_val = static_cast<double>(angle);
5827  import_buffers[2]->add_value(cd_angle, td, false);
5828  }
5829  }
5830  }
5831  proj_s = TIMER_STOP(proj_timer);
5832  col_idx += (copy_params.raster_point_compute_angle ? 3 : 2);
5833  }
5834 
5835  // prepare to accumulate read and conv times
5836  float read_s{0.0f};
5837  float conv_s{0.0f};
5838 
5839  // y_end is one past the actual end, so don't add 1
5840  auto const num_rows = y_end - y_start;
5841  auto const num_elems = band_size_x * num_rows;
5842 
5843  ImportStatus thread_import_status;
5844 
5845  bool read_block_failed = false;
5846 
5847  // prepare to store which band values in which rows are null
5848  boost::dynamic_bitset<> row_band_nulls;
5850  row_band_nulls.resize(num_elems * num_bands);
5851  }
5852 
5853  auto set_row_band_null = [&](const int row, const uint32_t band) {
5854  auto const bit_index = (row * num_bands) + band;
5855  row_band_nulls.set(bit_index);
5856  };
5857  auto all_row_bands_null = [&](const int row) -> bool {
5858  auto const first_bit_index = row * num_bands;
5859  bool all_null = true;
5860  for (auto i = first_bit_index; i < first_bit_index + num_bands; i++) {
5861  all_null = all_null && row_band_nulls.test(i);
5862  }
5863  return all_null;
5864  };
5865 
5866  // for each band/column
5867  for (uint32_t band_idx = 0; band_idx < num_bands; band_idx++) {
5868  // the corresponding column
5869  auto const* cd_band = *col_itr;
5870  CHECK(cd_band);
5871 
5872  // data type to read as
5873  auto const cd_type = cd_band->columnType.get_type();
5874 
5875  // read the scanlines (will do a data type conversion if necessary)
5876  try {
5877  auto read_timer = timer_start();
5878  raster_importer.getRawPixels(
5879  thread_idx, band_idx, y_start, num_rows, cd_type, raw_pixel_bytes);
5880  read_s += TIMER_STOP(read_timer);
5881  } catch (std::runtime_error& e) {
5882  // report error
5883  LOG(ERROR) << e.what();
5884  // abort this block
5885  read_block_failed = true;
5886  break;
5887  }
5888 
5889  // null value?
5890  auto const [null_value, null_value_valid] =
5891  raster_importer.getBandNullValue(band_idx);
5892 
5893  // copy to this thread's import buffers
5894  // convert any nulls we find
5895  auto conv_timer = timer_start();
5896  TDatum td;
5897  switch (cd_type) {
5898  case kSMALLINT: {
5899  const int16_t* values =
5900  reinterpret_cast<const int16_t*>(raw_pixel_bytes.data());
5901  for (int idx = 0; idx < num_elems; idx++) {
5902  auto const& value = values[idx];
5903  if (null_value_valid && value == static_cast<int16_t>(null_value)) {
5904  td.is_null = true;
5905  td.val.int_val = NULL_SMALLINT;
5907  set_row_band_null(idx, band_idx);
5908  }
5909  } else {
5910  td.is_null = false;
5911  td.val.int_val = static_cast<int64_t>(value);
5912  }
5913  import_buffers[col_idx]->add_value(cd_band, td, false);
5914  }
5915  } break;
5916  case kINT: {
5917  const int32_t* values =
5918  reinterpret_cast<const int32_t*>(raw_pixel_bytes.data());
5919  for (int idx = 0; idx < num_elems; idx++) {
5920  auto const& value = values[idx];
5921  if (null_value_valid && value == static_cast<int32_t>(null_value)) {
5922  td.is_null = true;
5923  td.val.int_val = NULL_INT;
5925  set_row_band_null(idx, band_idx);
5926  }
5927  } else {
5928  td.is_null = false;
5929  td.val.int_val = static_cast<int64_t>(value);
5930  }
5931  import_buffers[col_idx]->add_value(cd_band, td, false);
5932  }
5933  } break;
5934  case kBIGINT: {
5935  const uint32_t* values =
5936  reinterpret_cast<const uint32_t*>(raw_pixel_bytes.data());
5937  for (int idx = 0; idx < num_elems; idx++) {
5938  auto const& value = values[idx];
5939  if (null_value_valid && value == static_cast<uint32_t>(null_value)) {
5940  td.is_null = true;
5941  td.val.int_val = NULL_INT;
5943  set_row_band_null(idx, band_idx);
5944  }
5945  } else {
5946  td.is_null = false;
5947  td.val.int_val = static_cast<int64_t>(value);
5948  }
5949  import_buffers[col_idx]->add_value(cd_band, td, false);
5950  }
5951  } break;
5952  case kFLOAT: {
5953  const float* values = reinterpret_cast<const float*>(raw_pixel_bytes.data());
5954  for (int idx = 0; idx < num_elems; idx++) {
5955  auto const& value = values[idx];
5956  if (null_value_valid && value == static_cast<float>(null_value)) {
5957  td.is_null = true;
5958  td.val.real_val = NULL_FLOAT;
5960  set_row_band_null(idx, band_idx);
5961  }
5962  } else {
5963  td.is_null = false;
5964  td.val.real_val = static_cast<double>(value);
5965  }
5966  import_buffers[col_idx]->add_value(cd_band, td, false);
5967  }
5968  } break;
5969  case kDOUBLE: {
5970  const double* values = reinterpret_cast<const double*>(raw_pixel_bytes.data());
5971  for (int idx = 0; idx < num_elems; idx++) {
5972  auto const& value = values[idx];
5973  if (null_value_valid && value == null_value) {
5974  td.is_null = true;
5975  td.val.real_val = NULL_DOUBLE;
5977  set_row_band_null(idx, band_idx);
5978  }
5979  } else {
5980  td.is_null = false;
5981  td.val.real_val = value;
5982  }
5983  import_buffers[col_idx]->add_value(cd_band, td, false);
5984  }
5985  } break;
5986  default:
5987  CHECK(false);
5988  }
5989  conv_s += TIMER_STOP(conv_timer);
5990 
5991  // next column
5992  col_idx++;
5993  col_itr++;
5994  }
5995 
5996  if (read_block_failed) {
5997  // discard block data
5998  for (auto& col_buffer : import_buffers) {
5999  col_buffer->clear();
6000  }
6001  thread_import_status.rows_estimated = 0;
6002  thread_import_status.rows_completed = 0;
6003  thread_import_status.rows_rejected = num_elems;
6004  } else {
6005  // metadata columns?
6006  for (auto const& mci : metadata_column_infos) {
6007  auto const* cd_band = *col_itr++;
6008  CHECK(cd_band);
6009  for (int i = 0; i < num_elems; i++) {
6010  import_buffers[col_idx]->add_value(cd_band, mci.value, false, copy_params);
6011  }
6012  col_idx++;
6013  }
6014 
6015  // drop rows where all band columns are null?
6016  int num_dropped_as_all_null = 0;
6018  // capture rows where ALL the band values (only) were NULL
6019  // count rows first (implies two passes on the bitset but
6020  // still quicker than building the row set if not needed,
6021  // in the case where ALL rows are to be dropped)
6022  for (int row = 0; row < num_elems; row++) {
6023  if (all_row_bands_null(row)) {
6024  num_dropped_as_all_null++;
6025  }
6026  }
6027  // delete those rows from ALL column buffers (including coords and metadata)
6028  if (num_dropped_as_all_null == num_elems) {
6029  // all rows need dropping, just clear (fast)
6030  for (auto& col_buffer : import_buffers) {
6031  col_buffer->clear();
6032  }
6033  } else if (num_dropped_as_all_null > 0) {
6034  // drop "bad" rows selectively (slower)
6035  // build row set to drop
6036  BadRowsTracker bad_rows_tracker;
6037  for (int row = 0; row < num_elems; row++) {
6038  if (all_row_bands_null(row)) {
6039  bad_rows_tracker.rows.emplace(static_cast<int64_t>(row));
6040  }
6041  }
6042  // then delete rows
6043  for (auto& col_buffer : import_buffers) {
6044  auto const* cd = col_buffer->getColumnDesc();
6045  CHECK(cd);
6046  auto const col_type = cd->columnType.get_type();
6047  col_buffer->del_values(col_type, &bad_rows_tracker);
6048  }
6049  }
6050  }
6051 
6052  // final count
6053  CHECK_LE(num_dropped_as_all_null, num_elems);
6054  auto const actual_num_elems = num_elems - num_dropped_as_all_null;
6055  thread_import_status.rows_estimated = actual_num_elems;
6056  thread_import_status.rows_completed = actual_num_elems;
6057  thread_import_status.rows_rejected = 0;
6058  }
6059 
6060  // done
6061  return {std::move(thread_import_status), {proj_s, read_s, conv_s}};
6062  };
6063 
6064  // time the phases
6065  float total_proj_s{0.0f};
6066  float total_read_s{0.0f};
6067  float total_conv_s{0.0f};
6068  float total_load_s{0.0f};
6069 
6070  const int min_scanlines_per_thread = 8;
6071  const int max_scanlines_per_block = max_scanlines_per_thread * max_threads;
6072  for (int block_y = 0; block_y < band_size_y;
6073  block_y += (max_threads * max_scanlines_per_thread)) {
6074  using Future = std::future<ThreadReturn>;
6075  std::vector<Future> futures;
6076  const int scanlines_in_block =
6077  std::min(band_size_y - block_y, max_scanlines_per_block);
6078  const int pixels_in_block = scanlines_in_block * band_size_x;
6079  const int block_max_scanlines_per_thread =
6080  std::max((scanlines_in_block + static_cast<int>(max_threads) - 1) /
6081  static_cast<int>(max_threads),
6082  min_scanlines_per_thread);
6083  VLOG(1) << "Raster Importer: scanlines_in_block: " << scanlines_in_block
6084  << ", block_max_scanlines_per_thread: " << block_max_scanlines_per_thread;
6085 
6086  auto block_wall_timer = timer_start();
6087  // run max_threads scanlines at once
6088  for (size_t thread_id = 0; thread_id < max_threads; thread_id++) {
6089  const int y_start = block_y + thread_id * block_max_scanlines_per_thread;
6090  if (y_start < band_size_y) {
6091  const int y_end = std::min(y_start + block_max_scanlines_per_thread, band_size_y);
6092  if (y_start < y_end) {
6093  futures.emplace_back(
6094  std::async(std::launch::async, import_rows, thread_id, y_start, y_end));
6095  }
6096  }
6097  }
6098 
6099  // wait for the threads to finish and
6100  // accumulate the results and times
6101  float proj_s{0.0f}, read_s{0.0f}, conv_s{0.0f}, load_s{0.0f};
6102  size_t thread_idx = 0;
6103  for (auto& future : futures) {
6104  auto const [import_status, times] = future.get();
6105  import_status_ += import_status;
6106  proj_s += times[0];
6107  read_s += times[1];
6108  conv_s += times[2];
6109  // We load the data in thread order so we can get deterministic row-major raster
6110  // ordering
6111  // Todo: We should consider invoking the load on another thread in a ping-pong
6112  // fashion so we can simultaneously read the next batch of data
6113  auto thread_load_timer = timer_start();
6114  // only try to load this thread's data if valid
6115  if (import_status.rows_completed > 0) {
6116  load(import_buffers_vec[thread_idx], import_status.rows_completed, session_info);
6117  }
6118  load_s += TIMER_STOP(thread_load_timer);
6119  ++thread_idx;
6120  }
6121 
6122  // average times over all threads (except for load which is single-threaded)
6123  total_proj_s += (proj_s / float(futures.size()));
6124  total_read_s += (read_s / float(futures.size()));
6125  total_conv_s += (conv_s / float(futures.size()));
6126  total_load_s += load_s;
6127 
6128  // update the status
6130 
6131  // more debug
6132  auto const block_wall_s = TIMER_STOP(block_wall_timer);
6133  auto const scanlines_per_second = scanlines_in_block / block_wall_s;
6134  auto const rows_per_second = pixels_in_block / block_wall_s;
6135  LOG(INFO) << "Raster Importer: Loaded " << scanlines_in_block
6136  << " scanlines starting at " << block_y << " out of " << band_size_y
6137  << " in " << block_wall_s << "s at " << scanlines_per_second
6138  << " scanlines/s and " << rows_per_second << " rows/s";
6139 
6140  // check for interrupt
6141  if (UNLIKELY(check_session_interrupted(query_session, executor.get()))) {
6142  import_status_.load_failed = true;
6143  import_status_.load_msg = "Raster Import interrupted";
6144  throw QueryExecutionError(ErrorCode::INTERRUPTED);
6145  }
6146 
6147  // hit max_reject?
6149  break;
6150  }
6151  }
6152 
6153  // checkpoint
6154  auto checkpoint_timer = timer_start();
6155  checkpoint(table_epochs);
6156  auto const checkpoint_s = TIMER_STOP(checkpoint_timer);
6157 
6158  // stop wall clock
6159  auto const total_wall_s = TIMER_STOP(wall_timer);
6160 
6161  // report
6162  auto const total_scanlines_per_second = float(band_size_y) / total_wall_s;
6163  auto const total_rows_per_second =
6164  float(band_size_x) * float(band_size_y) / total_wall_s;
6165  LOG(INFO) << "Raster Importer: Imported "
6166  << static_cast<uint64_t>(band_size_x) * static_cast<uint64_t>(band_size_y)
6167  << " rows";
6168  LOG(INFO) << "Raster Importer: Total Import Time " << total_wall_s << "s at "
6169  << total_scanlines_per_second << " scanlines/s and " << total_rows_per_second
6170  << " rows/s";
6171 
6172  // if we hit max_reject, throw an exception now to report the error and abort any
6173  // multi-file loop
6175  std::string msg = "Raster Importer: Import aborted after failing to read " +
6177  " rows/pixels (limit " + std::to_string(copy_params.max_reject) +
6178  ")";
6179  import_status_.load_msg = msg;
6181  throw std::runtime_error(msg);
6182  }
6183 
6184  // phase times (with proportions)
6185  auto proj_pct = float(int(total_proj_s / total_wall_s * 1000.0f) * 0.1f);
6186  auto read_pct = float(int(total_read_s / total_wall_s * 1000.0f) * 0.1f);
6187  auto conv_pct = float(int(total_conv_s / total_wall_s * 1000.0f) * 0.1f);
6188  auto load_pct = float(int(total_load_s / total_wall_s * 1000.0f) * 0.1f);
6189  auto cpnt_pct = float(int(checkpoint_s / total_wall_s * 1000.0f) * 0.1f);
6190 
6191  VLOG(1) << "Raster Importer: Import timing breakdown:";
6192  VLOG(1) << " Project " << total_proj_s << "s (" << proj_pct << "%)";
6193  VLOG(1) << " Read " << total_read_s << "s (" << read_pct << "%)";
6194  VLOG(1) << " Convert " << total_conv_s << "s (" << conv_pct << "%)";
6195  VLOG(1) << " Load " << total_load_s << "s (" << load_pct << "%)";
6196  VLOG(1) << " Checkpoint " << checkpoint_s << "s (" << cpnt_pct << "%)";
6197 
6198  // all done
6199  return import_status_;
6200 }
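The block loop above splits the raster into horizontal stripes: each outer iteration covers at most max_scanlines_per_thread * max_threads scanlines, and within a block every worker receives a contiguous [y_start, y_end) range computed by a ceiling division that is clamped to a minimum of min_scanlines_per_thread rows. A standalone sketch of that partitioning arithmetic, using made-up dimensions rather than the RasterImporter band size, is:

// Sketch of the scanline partitioning used by the block loop above (example values).
#include <algorithm>
#include <cstdio>

int main() {
  const int band_size_y = 1000;             // total scanlines (example value)
  const int max_threads = 4;                // worker count (example value)
  const int max_scanlines_per_thread = 32;  // per-thread cap (example value)
  const int min_scanlines_per_thread = 8;   // avoid tiny work units
  const int max_scanlines_per_block = max_scanlines_per_thread * max_threads;

  for (int block_y = 0; block_y < band_size_y; block_y += max_scanlines_per_block) {
    const int scanlines_in_block =
        std::min(band_size_y - block_y, max_scanlines_per_block);
    // ceil-divide the block across the threads, but never below the minimum
    const int per_thread = std::max(
        (scanlines_in_block + max_threads - 1) / max_threads, min_scanlines_per_thread);
    for (int t = 0; t < max_threads; t++) {
      const int y_start = block_y + t * per_thread;
      if (y_start >= band_size_y) {
        break;
      }
      const int y_end = std::min(y_start + per_thread, band_size_y);
      if (y_start < y_end) {
        std::printf("block %d thread %d: scanlines [%d, %d)\n", block_y, t, y_start, y_end);
      }
    }
  }
  return 0;
}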

void import_export::Importer::load ( const std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t  row_count,
const Catalog_Namespace::SessionInfo *  session_info 
)

Definition at line 3514 of file Importer.cpp.

References import_export::DataStreamSink::import_mutex_, import_export::DataStreamSink::import_status_, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, and loader.

Referenced by import_export::import_thread_delimited(), import_export::import_thread_shapefile(), and importGDALRaster().

3516  {
3517  if (!loader->loadNoCheckpoint(import_buffers, row_count, session_info)) {
3519  import_status_.load_failed = true;
3520  import_status_.load_msg = loader->getErrorMessage();
3521  }
3522 }
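Note that load() delegates to Loader::loadNoCheckpoint() and only records a failure flag and message; the table is checkpointed once, via checkpoint(table_epochs), after all chunks have been loaded (see importGDALGeo() and importGDALRaster() above). A toy sketch of that split, with FakeLoader standing in for the real Loader API, is:

// Sketch of the per-chunk load / single final checkpoint split (stand-in types only).
#include <iostream>
#include <string>
#include <vector>

struct FakeLoader {
  bool loadNoCheckpoint(const std::vector<int>& rows) {
    return !rows.empty();  // pretend the load succeeds for non-empty chunks
  }
  void checkpoint() { std::cout << "checkpoint once at end\n"; }
};

int main() {
  FakeLoader loader;
  bool load_failed = false;
  std::string load_msg;
  for (auto const& chunk : std::vector<std::vector<int>>{{1, 2}, {3}, {4, 5, 6}}) {
    if (!loader.loadNoCheckpoint(chunk)) {
      load_failed = true;
      load_msg = "loader reported an error";
      break;
    }
  }
  if (!load_failed) {
    loader.checkpoint();
  }
  return load_failed ? 1 : 0;
}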

Geospatial::GDAL::DataSourceUqPtr import_export::Importer::openGDALDataSource ( const std::string &  fileName,
const CopyParams &  copy_params 
)
staticprivate

Definition at line 4583 of file Importer.cpp.

References Geospatial::GDAL::init(), import_export::kGeoFile, Geospatial::GDAL::openDataSource(), import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, Geospatial::GDAL::setAuthorizationTokens(), import_export::CopyParams::source_type, and to_string().

Referenced by gdalGetLayersInGeoFile(), gdalToColumnDescriptorsGeo(), importGDALGeo(), and readMetadataSampleGDAL().

4585  {
4593  throw std::runtime_error("Unexpected CopyParams.source_type (" +
4594  std::to_string(static_cast<int>(copy_params.source_type)) +
4595  ")");
4596  }
4598 }

void import_export::Importer::readMetadataSampleGDAL ( const std::string &  fileName,
const std::string &  geoColumnName,
std::map< std::string, std::vector< std::string >> &  metadata,
int  rowLimit,
const CopyParams &  copy_params 
)
static

Definition at line 4625 of file Importer.cpp.

References import_export::CopyParams::add_metadata_columns, CHECK, import_export::CopyParams::geo_layer_name, import_export::anonymous_namespace{Importer.cpp}::getLayerWithSpecifiedName(), openGDALDataSource(), and import_export::parse_add_metadata_columns().

Referenced by DBHandler::detect_column_types().

4630  {
4632  openGDALDataSource(file_name, copy_params));
4633  if (datasource == nullptr) {
4634  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
4635  file_name);
4636  }
4637 
4638  OGRLayer& layer =
4639  getLayerWithSpecifiedName(copy_params.geo_layer_name, datasource, file_name);
4640 
4641  auto const* feature_defn = layer.GetLayerDefn();
4642  CHECK(feature_defn);
4643 
4644  // metadata columns?
4645  auto const metadata_column_infos =
4647 
4648  // get limited feature count
4649  // typeof GetFeatureCount() is different between GDAL 1.x (int32_t) and 2.x (int64_t)
4650  auto const feature_count = static_cast<uint64_t>(layer.GetFeatureCount());
4651  auto const num_features = std::min(static_cast<uint64_t>(row_limit), feature_count);
4652 
4653  // prepare sample data map
4654  for (int field_index = 0; field_index < feature_defn->GetFieldCount(); field_index++) {
4655  auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4656  CHECK(column_name);
4657  sample_data[column_name] = {};
4658  }
4659  sample_data[geo_column_name] = {};
4660  for (auto const& mci : metadata_column_infos) {
4661  sample_data[mci.column_descriptor.columnName] = {};
4662  }
4663 
4664  // prepare to read
4665  layer.ResetReading();
4666 
4667  // read features (up to limited count)
4668  uint64_t feature_index{0u};
4669  while (feature_index < num_features) {
4670  // get (and take ownership of) feature
4671  Geospatial::GDAL::FeatureUqPtr feature(layer.GetNextFeature());
4672  if (!feature) {
4673  break;
4674  }
4675 
4676  // get feature geometry
4677  auto const* geometry = feature->GetGeometryRef();
4678  if (geometry == nullptr) {
4679  break;
4680  }
4681 
4682  // validate geom type (again?)
4683  switch (wkbFlatten(geometry->getGeometryType())) {
4684  case wkbPoint:
4685  case wkbMultiPoint:
4686  case wkbLineString:
4687  case wkbMultiLineString:
4688  case wkbPolygon:
4689  case wkbMultiPolygon:
4690  break;
4691  default:
4692  throw std::runtime_error("Unsupported geometry type: " +
4693  std::string(geometry->getGeometryName()));
4694  }
4695 
4696  // populate sample data for regular field columns
4697  for (int field_index = 0; field_index < feature->GetFieldCount(); field_index++) {
4698  auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4699  sample_data[column_name].push_back(feature->GetFieldAsString(field_index));
4700  }
4701 
4702  // populate sample data for metadata columns?
4703  for (auto const& mci : metadata_column_infos) {
4704  sample_data[mci.column_descriptor.columnName].push_back(mci.value);
4705  }
4706 
4707  // populate sample data for geo column with WKT string
4708  char* wkts = nullptr;
4709  geometry->exportToWkt(&wkts);
4710  CHECK(wkts);
4711  sample_data[geo_column_name].push_back(wkts);
4712  CPLFree(wkts);
4713 
4714  // next feature
4715  feature_index++;
4716  }
4717 }
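The output map is keyed by every regular field name, the geo column name (populated with WKT strings), and any add_metadata_columns names, with at most rowLimit sample values per key. A hedged usage sketch follows; the include path, the file name "cities.geojson", and the geo column name "geom" are illustrative assumptions, not values taken from the code above.

// Illustrative sketch only; the path, file, and column names are assumptions.
#include <iostream>
#include <map>
#include <string>
#include <vector>

#include "ImportExport/Importer.h"  // header location may differ by build layout

int main() {
  import_export::CopyParams copy_params;
  // a geo source, so the GDAL path above is taken
  copy_params.source_type = import_export::SourceType::kGeoFile;
  std::map<std::string, std::vector<std::string>> sample_data;
  import_export::Importer::readMetadataSampleGDAL(
      "cities.geojson", "geom", sample_data, /*rowLimit=*/10, copy_params);
  for (auto const& [column_name, values] : sample_data) {
    std::cout << column_name << ": " << values.size() << " sample value(s)\n";
  }
  return 0;
}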

void import_export::Importer::set_geo_physical_import_buffer ( const Catalog_Namespace::Catalog &  catalog,
const ColumnDescriptor *  cd,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
std::vector< double > &  coords,
std::vector< double > &  bounds,
std::vector< int > &  ring_sizes,
std::vector< int > &  poly_rings,
const bool  force_null = false 
)
static

Definition at line 1636 of file Importer.cpp.

References ColumnDescriptor::columnId, ColumnDescriptor::columnType, Geospatial::compress_coords(), SQLTypeInfo::get_type(), Catalog_Namespace::Catalog::getMetadataForColumn(), Geospatial::is_null_point(), kLINESTRING, kMULTILINESTRING, kMULTIPOINT, kMULTIPOLYGON, kPOINT, kPOLYGON, NULL_ARRAY_DOUBLE, NULL_DOUBLE, and ColumnDescriptor::tableId.

Referenced by import_export::TypedImportBuffer::convert_arrow_val_to_import_buffer(), Parser::AddColumnStmt::execute(), import_export::fill_missing_columns(), import_export::import_thread_delimited(), foreign_storage::TextFileBufferParser::processGeoColumn(), and foreign_storage::TextFileBufferParser::processInvalidGeoColumn().

1645  {
1646  const auto col_ti = cd->columnType;
1647  const auto col_type = col_ti.get_type();
1648  auto columnId = cd->columnId;
1649  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1650  bool is_null_geo = false;
1651  bool is_null_point = false;
1652  if (!col_ti.get_notnull()) {
1653  // Check for NULL geo
1654  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1655  is_null_point = true;
1656  coords.clear();
1657  }
1658  is_null_geo = coords.empty();
1659  if (is_null_point) {
1660  coords.push_back(NULL_ARRAY_DOUBLE);
1661  coords.push_back(NULL_DOUBLE);
1662  // Treating POINT coords as notnull, need to store actual encoding
1663  // [un]compressed+[not]null
1664  is_null_geo = false;
1665  }
1666  }
1667  if (force_null) {
1668  is_null_geo = true;
1669  }
1670  TDatum tdd_coords;
1671  // Get the raw data representing [optionally compressed] non-NULL geo's coords.
1672  // One exception - NULL POINT geo: coords need to be processed to encode nullness
1673  // in a fixlen array, compressed and uncompressed.
1674  if (!is_null_geo) {
1675  std::vector<uint8_t> compressed_coords = Geospatial::compress_coords(coords, col_ti);
1676  tdd_coords.val.arr_val.reserve(compressed_coords.size());
1677  for (auto cc : compressed_coords) {
1678  tdd_coords.val.arr_val.emplace_back();
1679  tdd_coords.val.arr_val.back().val.int_val = cc;
1680  }
1681  }
1682  tdd_coords.is_null = is_null_geo;
1683  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false);
1684 
1685  if (col_type == kMULTILINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1686  // Create [linest]ring_sizes array value and add it to the physical column
1687  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1688  TDatum tdd_ring_sizes;
1689  tdd_ring_sizes.val.arr_val.reserve(ring_sizes.size());
1690  if (!is_null_geo) {
1691  for (auto ring_size : ring_sizes) {
1692  tdd_ring_sizes.val.arr_val.emplace_back();
1693  tdd_ring_sizes.val.arr_val.back().val.int_val = ring_size;
1694  }
1695  }
1696  tdd_ring_sizes.is_null = is_null_geo;
1697  import_buffers[col_idx++]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
1698  }
1699 
1700  if (col_type == kMULTIPOLYGON) {
1701  // Create poly_rings array value and add it to the physical column
1702  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1703  TDatum tdd_poly_rings;
1704  tdd_poly_rings.val.arr_val.reserve(poly_rings.size());
1705  if (!is_null_geo) {
1706  for (auto num_rings : poly_rings) {
1707  tdd_poly_rings.val.arr_val.emplace_back();
1708  tdd_poly_rings.val.arr_val.back().val.int_val = num_rings;
1709  }
1710  }
1711  tdd_poly_rings.is_null = is_null_geo;
1712  import_buffers[col_idx++]->add_value(cd_poly_rings, tdd_poly_rings, false);
1713  }
1714 
1715  if (col_type == kLINESTRING || col_type == kMULTILINESTRING || col_type == kPOLYGON ||
1716  col_type == kMULTIPOLYGON || col_type == kMULTIPOINT) {
1717  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1718  TDatum tdd_bounds;
1719  tdd_bounds.val.arr_val.reserve(bounds.size());
1720  if (!is_null_geo) {
1721  for (auto b : bounds) {
1722  tdd_bounds.val.arr_val.emplace_back();
1723  tdd_bounds.val.arr_val.back().val.real_val = b;
1724  }
1725  }
1726  tdd_bounds.is_null = is_null_geo;
1727  import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false);
1728  }
1729 }
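Depending on the geometry type, the helper above advances col_idx past a different number of physical columns: the coords TINYINT array is always written, ring_sizes is added for MULTILINESTRING/POLYGON/MULTIPOLYGON, poly_rings only for MULTIPOLYGON, and bounds for every type except POINT. A small standalone sketch of that column-count rule (using a local stand-in enum instead of the real SQLTypes header) is:

// Sketch: how many physical-column import buffers the helper above advances past.
#include <cstdio>

enum class GeoType { POINT, MULTIPOINT, LINESTRING, MULTILINESTRING, POLYGON, MULTIPOLYGON };

static int physical_geo_columns(GeoType t) {
  int n = 1;  // coords array is always written
  if (t == GeoType::MULTILINESTRING || t == GeoType::POLYGON || t == GeoType::MULTIPOLYGON) {
    n++;      // [linest]ring_sizes
  }
  if (t == GeoType::MULTIPOLYGON) {
    n++;      // poly_rings
  }
  if (t != GeoType::POINT) {
    n++;      // bounds
  }
  return n;
}

int main() {
  std::printf("POINT: %d, POLYGON: %d, MULTIPOLYGON: %d\n",
              physical_geo_columns(GeoType::POINT),
              physical_geo_columns(GeoType::POLYGON),
              physical_geo_columns(GeoType::MULTIPOLYGON));
  // expected output: POINT: 1, POLYGON: 3, MULTIPOLYGON: 4
  return 0;
}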

void import_export::Importer::set_geo_physical_import_buffer_columnar ( const Catalog_Namespace::Catalog &  catalog,
const ColumnDescriptor *  cd,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
std::vector< std::vector< double >> &  coords_column,
std::vector< std::vector< double >> &  bounds_column,
std::vector< std::vector< int >> &  ring_sizes_column,
std::vector< std::vector< int >> &  poly_rings_column 
)
static

Definition at line 1731 of file Importer.cpp.

References CHECK, ColumnDescriptor::columnId, ColumnDescriptor::columnType, Geospatial::compress_coords(), SQLTypeInfo::get_type(), Catalog_Namespace::Catalog::getMetadataForColumn(), Geospatial::is_null_point(), kLINESTRING, kMULTILINESTRING, kMULTIPOINT, kMULTIPOLYGON, kPOINT, kPOLYGON, NULL_ARRAY_DOUBLE, NULL_DOUBLE, and ColumnDescriptor::tableId.

Referenced by DBHandler::fillGeoColumns().

1739  {
1740  const auto col_ti = cd->columnType;
1741  const auto col_type = col_ti.get_type();
1742  auto columnId = cd->columnId;
1743 
1744  auto coords_row_count = coords_column.size();
1745  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1746  for (auto& coords : coords_column) {
1747  bool is_null_geo = false;
1748  bool is_null_point = false;
1749  if (!col_ti.get_notnull()) {
1750  // Check for NULL geo
1751  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1752  is_null_point = true;
1753  coords.clear();
1754  }
1755  is_null_geo = coords.empty();
1756  if (is_null_point) {
1757  coords.push_back(NULL_ARRAY_DOUBLE);
1758  coords.push_back(NULL_DOUBLE);
1759  // Treating POINT coords as notnull, need to store actual encoding
1760  // [un]compressed+[not]null
1761  is_null_geo = false;
1762  }
1763  }
1764  std::vector<TDatum> td_coords_data;
1765  if (!is_null_geo) {
1766  std::vector<uint8_t> compressed_coords =
1767  Geospatial::compress_coords(coords, col_ti);
1768  for (auto const& cc : compressed_coords) {
1769  TDatum td_byte;
1770  td_byte.val.int_val = cc;
1771  td_coords_data.push_back(td_byte);
1772  }
1773  }
1774  TDatum tdd_coords;
1775  tdd_coords.val.arr_val = td_coords_data;
1776  tdd_coords.is_null = is_null_geo;
1777  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
1778  }
1779  col_idx++;
1780 
1781  if (col_type == kMULTILINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1782  if (ring_sizes_column.size() != coords_row_count) {
1783  CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
1784  }
1785  // Create [linest]ring_sizes array value and add it to the physical column
1786  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1787  for (auto const& ring_sizes : ring_sizes_column) {
1788  bool is_null_geo = false;
1789  if (!col_ti.get_notnull()) {
1790  // Check for NULL geo
1791  is_null_geo = ring_sizes.empty();
1792  }
1793  std::vector<TDatum> td_ring_sizes;
1794  for (auto const& ring_size : ring_sizes) {
1795  TDatum td_ring_size;
1796  td_ring_size.val.int_val = ring_size;
1797  td_ring_sizes.push_back(td_ring_size);
1798  }
1799  TDatum tdd_ring_sizes;
1800  tdd_ring_sizes.val.arr_val = td_ring_sizes;
1801  tdd_ring_sizes.is_null = is_null_geo;
1802  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
1803  }
1804  col_idx++;
1805  }
1806 
1807  if (col_type == kMULTIPOLYGON) {
1808  if (poly_rings_column.size() != coords_row_count) {
1809  CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
1810  }
1811  // Create poly_rings array value and add it to the physical column
1812  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1813  for (auto const& poly_rings : poly_rings_column) {
1814  bool is_null_geo = false;
1815  if (!col_ti.get_notnull()) {
1816  // Check for NULL geo
1817  is_null_geo = poly_rings.empty();
1818  }
1819  std::vector<TDatum> td_poly_rings;
1820  for (auto const& num_rings : poly_rings) {
1821  TDatum td_num_rings;
1822  td_num_rings.val.int_val = num_rings;
1823  td_poly_rings.push_back(td_num_rings);
1824  }
1825  TDatum tdd_poly_rings;
1826  tdd_poly_rings.val.arr_val = td_poly_rings;
1827  tdd_poly_rings.is_null = is_null_geo;
1828  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
1829  }
1830  col_idx++;
1831  }
1832 
1833  if (col_type == kLINESTRING || col_type == kMULTILINESTRING || col_type == kPOLYGON ||
1834  col_type == kMULTIPOLYGON || col_type == kMULTIPOINT) {
1835  if (bounds_column.size() != coords_row_count) {
1836  CHECK(false) << "Geometry import columnar: bounds column size mismatch";
1837  }
1838  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1839  for (auto const& bounds : bounds_column) {
1840  bool is_null_geo = false;
1841  if (!col_ti.get_notnull()) {
1842  // Check for NULL geo
1843  is_null_geo = (bounds.empty() || bounds[0] == NULL_ARRAY_DOUBLE);
1844  }
1845  std::vector<TDatum> td_bounds_data;
1846  for (auto const& b : bounds) {
1847  TDatum td_double;
1848  td_double.val.real_val = b;
1849  td_bounds_data.push_back(td_double);
1850  }
1851  TDatum tdd_bounds;
1852  tdd_bounds.val.arr_val = td_bounds_data;
1853  tdd_bounds.is_null = is_null_geo;
1854  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
1855  }
1856  col_idx++;
1857  }
1858 }

void import_export::Importer::set_import_status ( const std::string &  id,
const ImportStatus  is 
)
static

Definition at line 240 of file Importer.cpp.

References import_export::ImportStatus::elapsed, import_export::ImportStatus::end, import_id, import_export::import_status_map, import_export::ImportStatus::start, and import_export::status_mutex.

Referenced by importDelimited(), importGDALGeo(), importGDALRaster(), import_export::ForeignDataImporter::importGeneralS3(), and anonymous_namespace{ForeignDataImporter.cpp}::load_foreign_data_buffers().

240  {
242  is.end = std::chrono::steady_clock::now();
243  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
245 }
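set_import_status() stamps the end time, recomputes the elapsed duration, and stores the status in the shared import_status_map under the importer's id, all under the status mutex. A simplified, self-contained sketch of that bookkeeping (local stand-in types; the real ImportStatus and map live in Importer.h/Importer.cpp) is:

// Sketch of the status bookkeeping above, with stand-in types.
#include <chrono>
#include <cstddef>
#include <iostream>
#include <map>
#include <mutex>
#include <string>

struct Status {
  std::chrono::steady_clock::time_point start{std::chrono::steady_clock::now()};
  std::chrono::steady_clock::time_point end;
  std::chrono::milliseconds elapsed{0};
  size_t rows_completed{0};
};

static std::mutex status_mutex;
static std::map<std::string, Status> status_map;

static void set_status(const std::string& id, Status s) {
  std::lock_guard<std::mutex> lock(status_mutex);
  s.end = std::chrono::steady_clock::now();
  s.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(s.end - s.start);
  status_map[id] = s;  // latest status wins for this import id
}

int main() {
  Status s;
  s.rows_completed = 42;
  set_status("import-1", s);
  std::cout << "elapsed ms: " << status_map["import-1"].elapsed.count() << "\n";
  return 0;
}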

Member Data Documentation

char* import_export::Importer::buffer[2]
private

Definition at line 898 of file Importer.h.

Referenced by Importer(), and ~Importer().

size_t import_export::Importer::file_size
private

Definition at line 896 of file Importer.h.

Referenced by importDelimited(), and Importer().

std::vector<std::vector<std::unique_ptr<TypedImportBuffer> > > import_export::Importer::import_buffers_vec
private

Definition at line 899 of file Importer.h.
std::mutex import_export::Importer::init_gdal_mutex
staticprivate

Definition at line 902 of file Importer.h.

std::unique_ptr<bool[]> import_export::Importer::is_array_a
private

Definition at line 901 of file Importer.h.

Referenced by get_is_array(), and Importer().

std::unique_ptr<Loader> import_export::Importer::loader
private

Definition at line 900 of file Importer.h.
size_t import_export::Importer::max_threads
private

Definition at line 897 of file Importer.h.

Referenced by importDelimited(), Importer(), importGDALGeo(), and importGDALRaster().


The documentation for this class was generated from the following files:

Importer.h
Importer.cpp