OmniSciDB  a5dc49c757
foreign_storage::LazyParquetChunkLoader Class Reference

#include <LazyParquetChunkLoader.h>


Public Member Functions

 LazyParquetChunkLoader (std::shared_ptr< arrow::fs::FileSystem > file_system, FileReaderMap *file_reader_cache, const ForeignTable *foreign_table)
 
std::list< std::unique_ptr< ChunkMetadata > > loadChunk (const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary=nullptr, RejectedRowIndices *rejected_row_indices=nullptr)
 
std::list< RowGroupMetadata > metadataScan (const std::vector< std::string > &file_paths, const ForeignTableSchema &schema, const bool do_metadata_stats_validation=true)
 Perform a metadata scan for the paths specified. More...
 
std::pair< size_t, size_t > loadRowGroups (const RowGroupInterval &row_group_interval, const std::map< int, Chunk_NS::Chunk > &chunks, const ForeignTableSchema &schema, const std::map< int, StringDictionary * > &column_dictionaries, const int num_threads=1)
 Load row groups of data into given chunks. More...
 
DataPreview previewFiles (const std::vector< std::string > &files, const size_t max_num_rows, const ForeignTable &table)
 Preview rows of data and column types in a set of files. More...
 

Static Public Member Functions

static bool isColumnMappingSupported (const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
 

Static Public Attributes

static const int batch_reader_num_elements = 4096
 

Private Member Functions

std::list< std::unique_ptr< ChunkMetadata > > appendRowGroups (const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, const ColumnDescriptor *column_descriptor, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, RejectedRowIndices *rejected_row_indices, const bool is_for_detect=false, const std::optional< int64_t > max_levels_read=std::nullopt)
 

Static Private Member Functions

static SQLTypeInfo suggestColumnMapping (const parquet::ColumnDescriptor *parquet_column)
 

Private Attributes

std::shared_ptr< arrow::fs::FileSystem > file_system_
 
FileReaderMap * file_reader_cache_
 
const ForeignTable * foreign_table_
 

Detailed Description

A lazy Parquet-to-chunk loader.

Definition at line 37 of file LazyParquetChunkLoader.h.
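
A minimal usage sketch (illustrative only: the reader cache, foreign table, and schema are assumed to be supplied by the owning data wrapper, and the file path is hypothetical):

  #include "LazyParquetChunkLoader.h"
  #include <arrow/filesystem/localfs.h>
  #include <memory>

  void scan_example(foreign_storage::FileReaderMap* reader_cache,
                    const foreign_storage::ForeignTable* foreign_table,
                    const foreign_storage::ForeignTableSchema& schema) {
    auto file_system = std::make_shared<arrow::fs::LocalFileSystem>();
    foreign_storage::LazyParquetChunkLoader loader(
        file_system, reader_cache, foreign_table);
    // Scan row group metadata for an ordered list of Parquet files.
    auto row_group_metadata =
        loader.metadataScan({"/data/part-0.parquet"}, schema);
  }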

Constructor & Destructor Documentation

foreign_storage::LazyParquetChunkLoader::LazyParquetChunkLoader ( std::shared_ptr< arrow::fs::FileSystem >  file_system,
FileReaderMap *  file_reader_cache,
const ForeignTable *  foreign_table 
)

Definition at line 2083 of file LazyParquetChunkLoader.cpp.

References CHECK, and foreign_table_.

2087  : file_system_(file_system)
2088  , file_reader_cache_(file_map)
2089  , foreign_table_(foreign_table) {
2090  CHECK(foreign_table_) << "LazyParquetChunkLoader: null Foreign Table ptr";
2091 }

Member Function Documentation

std::list< std::unique_ptr< ChunkMetadata > > foreign_storage::LazyParquetChunkLoader::appendRowGroups ( const std::vector< RowGroupInterval > &  row_group_intervals,
const int  parquet_column_index,
const ColumnDescriptor *  column_descriptor,
std::list< Chunk_NS::Chunk > &  chunks,
StringDictionary *  string_dictionary,
RejectedRowIndices *  rejected_row_indices,
const bool  is_for_detect = false,
const std::optional< int64_t >  max_levels_read = std::nullopt 
)
private

Definition at line 1828 of file LazyParquetChunkLoader.cpp.

References batch_reader_num_elements, CHECK, ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::create_parquet_encoder(), DEBUG_TIMER, Timer< TimeT >::elapsed(), file_reader_cache_, file_system_, foreign_table_, foreign_storage::ForeignTable::GEO_VALIDATE_GEOMETRY_KEY, foreign_storage::get_column_descriptor(), foreign_storage::get_parquet_table_size(), foreign_storage::OptionsContainer::getOptionAsBool(), foreign_storage::FileReaderMap::getOrInsert(), SQLTypeInfo::is_array(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::resize_values_buffer(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::set_definition_levels_for_zero_max_definition_level_case(), Timer< TimeT >::start(), Timer< TimeT >::stop(), TableDescriptor::tableName, to_string(), foreign_storage::validate_equal_column_descriptor(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_list_column_metadata_statistics(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_max_repetition_and_definition_level(), and VLOG.

Referenced by loadChunk(), and previewFiles().

1836  {
1837  auto timer = DEBUG_TIMER(__func__);
1838  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
1839  // `def_levels` and `rep_levels` below are used to store the read definition
1840  // and repetition levels of the Dremel encoding implemented by the Parquet
1841  // format
1842  std::vector<int16_t> def_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1843  std::vector<int16_t> rep_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1844  std::vector<int8_t> values;
1845 
1846  // Timing information used in logging
1847  Timer<> summary_timer;
1848  Timer<> initialization_timer_ms;
1849  Timer<> validation_timer_ms;
1850  Timer<> parquet_read_timer_ms;
1851  Timer<> encoding_timer_ms;
1852  size_t total_row_groups_read = 0;
1853 
1854  summary_timer.start();
1855 
1856  initialization_timer_ms.start();
1857  CHECK(!row_group_intervals.empty());
1858  const auto& first_file_path = row_group_intervals.front().file_path;
1859 
1860  auto first_file_reader = file_reader_cache_->getOrInsert(first_file_path, file_system_);
1861  auto first_parquet_column_descriptor =
1862  get_column_descriptor(first_file_reader, parquet_column_index);
1863  resize_values_buffer(column_descriptor, first_parquet_column_descriptor, values);
1864 
1865  const bool geo_validate_geometry =
1866      foreign_table_->getOptionAsBool(ForeignTable::GEO_VALIDATE_GEOMETRY_KEY);
1867  auto encoder = create_parquet_encoder(column_descriptor,
1868  first_parquet_column_descriptor,
1869  chunks,
1870  string_dictionary,
1871  chunk_metadata,
1872  false,
1873  false,
1874  is_for_detect,
1875  geo_validate_geometry);
1876  CHECK(encoder.get());
1877 
1878  if (rejected_row_indices) { // error tracking is enabled
1879  encoder->initializeErrorTracking();
1880  }
1881  encoder->initializeColumnType(column_descriptor->columnType);
1882  initialization_timer_ms.stop();
1883 
1884  bool early_exit = false;
1885  int64_t total_rows_read = 0;
1886  for (const auto& row_group_interval : row_group_intervals) {
1887  initialization_timer_ms.start();
1888  const auto& file_path = row_group_interval.file_path;
1889  auto file_reader = file_reader_cache_->getOrInsert(file_path, file_system_);
1890 
1891  auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
1892  CHECK(row_group_interval.start_index >= 0 &&
1893  row_group_interval.end_index < num_row_groups);
1894  CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
1895 
1896  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
1897  auto parquet_column_descriptor =
1898  get_column_descriptor(file_reader, parquet_column_index);
1899 
1900  initialization_timer_ms.stop();
1901 
1902  validation_timer_ms.start();
1903  validate_equal_column_descriptor(first_parquet_column_descriptor,
1904  parquet_column_descriptor,
1905  first_file_path,
1906  file_path);
1907 
1908  validate_max_repetition_and_definition_level(column_descriptor,
1909                                               parquet_column_descriptor);
1910  set_definition_levels_for_zero_max_definition_level_case(parquet_column_descriptor,
1911                                                           def_levels);
1912  validation_timer_ms.stop();
1913 
1914  int64_t values_read = 0;
1915  for (int row_group_index = row_group_interval.start_index;
1916  row_group_index <= row_group_interval.end_index;
1917  ++row_group_index) {
1918  total_row_groups_read++;
1919  parquet_read_timer_ms.start();
1920  auto group_reader = parquet_reader->RowGroup(row_group_index);
1921  std::shared_ptr<parquet::ColumnReader> col_reader =
1922  group_reader->Column(parquet_column_index);
1923  parquet_read_timer_ms.stop();
1924 
1925  try {
1926  while (col_reader->HasNext()) {
1927  parquet_read_timer_ms.start();
1928  int64_t levels_read =
1929      parquet::ScanAllValues(LazyParquetChunkLoader::batch_reader_num_elements,
1930                             def_levels.data(),
1931  rep_levels.data(),
1932  reinterpret_cast<uint8_t*>(values.data()),
1933  &values_read,
1934  col_reader.get());
1935  parquet_read_timer_ms.stop();
1936 
1937  encoding_timer_ms.start();
1938  if (rejected_row_indices) { // error tracking is enabled
1939  encoder->appendDataTrackErrors(def_levels.data(),
1940  rep_levels.data(),
1941  values_read,
1942  levels_read,
1943  values.data());
1944  } else { // no error tracking enabled
1945      validate_list_column_metadata_statistics(
1946          parquet_reader, // this validation only in effect for foreign tables
1947  row_group_index,
1948  parquet_column_index,
1949  def_levels.data(),
1950  levels_read,
1951  parquet_column_descriptor);
1952 
1953  encoder->appendData(def_levels.data(),
1954  rep_levels.data(),
1955  values_read,
1956  levels_read,
1957  values.data());
1958  }
1959  encoding_timer_ms.stop();
1960 
1961  if (max_rows_to_read.has_value()) {
1962  if (column_descriptor->columnType.is_array()) {
1963  auto array_encoder =
1964  dynamic_cast<ParquetArrayDetectEncoder*>(encoder.get());
1965  CHECK(array_encoder);
1966  total_rows_read = array_encoder->getArraysCount();
1967  } else {
1968  // For scalar types it is safe to assume the number of levels read is equal
1969  // to the number of rows read
1970  total_rows_read += levels_read;
1971  }
1972 
1973  if (total_rows_read >= max_rows_to_read.value()) {
1974  early_exit = true;
1975  break;
1976  }
1977  }
1978  }
1979  encoding_timer_ms.start();
1980  if (auto array_encoder = dynamic_cast<ParquetArrayEncoder*>(encoder.get())) {
1981  array_encoder->finalizeRowGroup();
1982  }
1983  encoding_timer_ms.stop();
1984  } catch (const std::exception& error) {
1985  // check for a specific error to detect a possible unexpected switch of data
1986  // source in order to respond with informative error message
1987  if (boost::regex_search(error.what(),
1988  boost::regex{"Deserializing page header failed."})) {
1989  throw ForeignStorageException(
1990  "Unable to read from foreign data source, possible cause is an unexpected "
1991  "change of source. Please use the \"REFRESH FOREIGN TABLES\" command on "
1992  "the "
1993  "foreign table "
1994  "if data source has been updated. Foreign table: " +
1995      foreign_table_->tableName);
1996  }
1997 
1998  throw ForeignStorageException(
1999  std::string(error.what()) + " Row group: " + std::to_string(row_group_index) +
2000  ", Parquet column: '" + col_reader->descr()->path()->ToDotString() +
2001  "', Parquet file: '" + file_path + "'");
2002  }
2003  if (max_rows_to_read.has_value() && early_exit) {
2004  break;
2005  }
2006  }
2007  if (max_rows_to_read.has_value() && early_exit) {
2008  break;
2009  }
2010  }
2011 
2012  encoding_timer_ms.start();
2013  if (rejected_row_indices) { // error tracking is enabled
2014  *rejected_row_indices = encoder->getRejectedRowIndices();
2015  }
2016  encoding_timer_ms.stop();
2017 
2018  summary_timer.stop();
2019 
2020  VLOG(1) << "Appended " << total_row_groups_read
2021  << " row groups to chunk. Column: " << column_descriptor->columnName
2022  << ", Column id: " << column_descriptor->columnId << ", Parquet column: "
2023  << first_parquet_column_descriptor->path()->ToDotString();
2024  VLOG(1) << "Runtime summary:";
2025  VLOG(1) << " Parquet chunk loading total time: " << summary_timer.elapsed() << "ms";
2026  VLOG(1) << " Parquet encoder initialization time: " << initialization_timer_ms.elapsed()
2027  << "ms";
2028  VLOG(1) << " Parquet metadata validation time: " << validation_timer_ms.elapsed()
2029  << "ms";
2030  VLOG(1) << " Parquet column read time: " << parquet_read_timer_ms.elapsed() << "ms";
2031  VLOG(1) << " Parquet data conversion time: " << encoding_timer_ms.elapsed() << "ms";
2032 
2033  return chunk_metadata;
2034 }
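
To illustrate the definition/repetition level streams consumed by the read loop above, consider the standard three-level Parquet LIST encoding of an optional list of optional INT32 values (a worked example for illustration, not taken from the source):

  // Rows: [1, 2], NULL, [], [3, NULL]
  //
  //   values:     1  2  3
  //   def_levels: 3  3  0  1  3  2   (3 = value present, 2 = null element,
  //                                   1 = empty list,    0 = null list)
  //   rep_levels: 0  1  0  0  0  1   (0 = entry starts a new row,
  //                                   1 = entry continues the current list)
  //
  // A batch read over these six level entries reports levels_read = 6 and
  // values_read = 3; the encoder reconstructs rows from the two level
  // streams, which is why the array case above counts rows via
  // ParquetArrayDetectEncoder::getArraysCount() rather than via levels.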


bool foreign_storage::LazyParquetChunkLoader::isColumnMappingSupported ( const ColumnDescriptor *  omnisci_column,
const parquet::ColumnDescriptor *  parquet_column 
)
static

Determine if a Parquet to OmniSci column mapping is supported.

Parameters
omnisci_column - the column descriptor of the OmniSci column
parquet_column - the column descriptor of the Parquet column
Returns
true if the column mapping is supported by LazyParquetChunkLoader, false otherwise
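
For example, a caller validating a schema might use it like this (a sketch: cd is assumed to come from the catalog and parquet_column from file_metadata->schema()->Column(i); array columns must be handled separately, since this member asserts against them):

  if (!foreign_storage::LazyParquetChunkLoader::isColumnMappingSupported(
          cd, parquet_column)) {
    throw std::runtime_error("Unsupported type mapping for column '" +
                             cd->columnName + "'");
  }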

Definition at line 2048 of file LazyParquetChunkLoader.cpp.

References CHECK, ColumnDescriptor::columnType, SQLTypeInfo::is_array(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_date_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_decimal_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_floating_point_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_geospatial_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_integral_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_none_type_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_string_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_time_mapping(), and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_timestamp_mapping().

Referenced by foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_allowed_mapping().

2050  {
2051  CHECK(!omnisci_column->columnType.is_array())
2052  << "isColumnMappingSupported should not be called on arrays";
2053  if (validate_geospatial_mapping(omnisci_column, parquet_column)) {
2054  return true;
2055  }
2056  if (validate_decimal_mapping(omnisci_column, parquet_column)) {
2057  return true;
2058  }
2059  if (validate_floating_point_mapping(omnisci_column, parquet_column)) {
2060  return true;
2061  }
2062  if (validate_integral_mapping(omnisci_column, parquet_column)) {
2063  return true;
2064  }
2065  if (validate_none_type_mapping(omnisci_column, parquet_column)) {
2066  return true;
2067  }
2068  if (validate_timestamp_mapping(omnisci_column, parquet_column)) {
2069  return true;
2070  }
2071  if (validate_time_mapping(omnisci_column, parquet_column)) {
2072  return true;
2073  }
2074  if (validate_date_mapping(omnisci_column, parquet_column)) {
2075  return true;
2076  }
2077  if (validate_string_mapping(omnisci_column, parquet_column)) {
2078  return true;
2079  }
2080  return false;
2081 }


std::list< std::unique_ptr< ChunkMetadata > > foreign_storage::LazyParquetChunkLoader::loadChunk ( const std::vector< RowGroupInterval > &  row_group_intervals,
const int  parquet_column_index,
std::list< Chunk_NS::Chunk > &  chunks,
StringDictionary *  string_dictionary = nullptr,
RejectedRowIndices *  rejected_row_indices = nullptr 
)

Load a number of row groups of a column in a parquet file into a chunk.

Parameters
row_group_intervals - a list of inclusive intervals [start,end] that specify the row groups to load
parquet_column_index - the logical column index in the parquet file (and omnisci db) of the column to load
chunks - a list containing the chunks to load
string_dictionary - a string dictionary for the column, if applicable
rejected_row_indices - optional; if specified, row errors encountered while loading are tracked in this data structure
Returns
An empty list when no metadata update is applicable, otherwise a list of ChunkMetadata shared pointers with which to update the corresponding column chunk metadata.

NOTE: if more than one chunk is supplied, the first chunk is required to be the chunk corresponding to the logical column, while the remaining chunks correspond to physical columns (in ascending order of column id). Similarly, if a metadata update is expected, the list of ChunkMetadata shared pointers returned will correspond directly to the list of chunks.
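
A sketch of a typical call (assumes a constructed loader and caller-prepared row_group_intervals and chunks, as in foreign_storage::ParquetDataWrapper::loadBuffersUsingLazyParquetChunkLoader()):

  RejectedRowIndices rejected_rows;
  auto chunk_metadata = loader.loadChunk(row_group_intervals,
                                         parquet_column_index,
                                         chunks,
                                         /*string_dictionary=*/nullptr,
                                         &rejected_rows);  // enable error tracking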

Definition at line 2093 of file LazyParquetChunkLoader.cpp.

References appendRowGroups(), and CHECK.

Referenced by foreign_storage::ParquetDataWrapper::loadBuffersUsingLazyParquetChunkLoader().

2098  {
2099  CHECK(!chunks.empty());
2100  auto const& chunk = *chunks.begin();
2101  auto column_descriptor = chunk.getColumnDesc();
2102  auto buffer = chunk.getBuffer();
2103  CHECK(buffer);
2104 
2105  try {
2106  auto metadata = appendRowGroups(row_group_intervals,
2107  parquet_column_index,
2108  column_descriptor,
2109  chunks,
2110  string_dictionary,
2111  rejected_row_indices);
2112  return metadata;
2113  } catch (const std::exception& error) {
2114  throw ForeignStorageException(error.what());
2115  }
2116 
2117  return {};
2118 }


std::pair< size_t, size_t > foreign_storage::LazyParquetChunkLoader::loadRowGroups ( const RowGroupInterval &  row_group_interval,
const std::map< int, Chunk_NS::Chunk > &  chunks,
const ForeignTableSchema &  schema,
const std::map< int, StringDictionary * > &  column_dictionaries,
const int  num_threads = 1 
)

Load row groups of data into given chunks.

Parameters
row_group_interval - specifies which row groups to load
chunks - map of column index to the chunk into which data will be loaded
schema - schema of the foreign table
column_dictionaries - a map of string dictionaries for columns that require them
num_threads - number of threads to utilize while reading (if applicable)
Returns
[num_rows_completed, num_rows_rejected] - the number of rows loaded and the number of rows rejected while loading

Note that only logical chunks are expected because the data is read in an intermediate form into the underlying buffers. This member is intended to be used for import.

NOTE: Currently, loading one row group at a time is required.
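
A sketch of an import-style call honoring that constraint (hypothetical names; the RowGroupInterval aggregate order {file_path, start_index, end_index} is assumed from its usage above):

  foreign_storage::RowGroupInterval interval{file_path, /*start=*/0, /*end=*/0};
  auto [num_rows_completed, num_rows_rejected] =
      loader.loadRowGroups(interval, chunks, schema, column_dictionaries,
                           /*num_threads=*/4);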

Definition at line 2203 of file LazyParquetChunkLoader.cpp.

References threading_serial::async(), CHECK, DEBUG_TIMER, foreign_storage::RowGroupInterval::end_index, foreign_storage::RowGroupInterval::file_path, file_system_, foreign_table_, foreign_storage::ForeignTable::GEO_VALIDATE_GEOMETRY_KEY, shared::get_from_map(), foreign_storage::get_parquet_table_size(), foreign_storage::ForeignTableSchema::getColumnDescriptor(), foreign_storage::ForeignTableSchema::getLogicalColumns(), foreign_storage::OptionsContainer::getOptionAsBool(), foreign_storage::ForeignTableSchema::getParquetColumnIndex(), foreign_storage::open_parquet_table(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::populate_encoder_map_for_import(), foreign_storage::RowGroupInterval::start_index, logger::thread_id(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_allowed_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_max_repetition_and_definition_level(), and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_number_of_columns().

2208  {
2209  auto timer = DEBUG_TIMER(__func__);
2210 
2211  const auto& file_path = row_group_interval.file_path;
2212 
2213  // do not use caching with file-readers, open a new one for every request
2214  auto file_reader_owner = open_parquet_table(file_path, file_system_);
2215  auto file_reader = file_reader_owner.get();
2216  auto file_metadata = file_reader->parquet_reader()->metadata();
2217 
2218  validate_number_of_columns(file_metadata, file_path, schema);
2219 
2220  // check for fixed length encoded columns and indicate to the user
2221  // they should not be used
2222  for (const auto column_descriptor : schema.getLogicalColumns()) {
2223  auto parquet_column_index = schema.getParquetColumnIndex(column_descriptor->columnId);
2224  auto parquet_column = file_metadata->schema()->Column(parquet_column_index);
2225  try {
2226  validate_allowed_mapping(parquet_column, column_descriptor);
2227  } catch (std::runtime_error& e) {
2228  std::stringstream error_message;
2229  error_message << e.what()
2230  << " Parquet column: " << parquet_column->path()->ToDotString()
2231  << ", HeavyDB column: " << column_descriptor->columnName
2232  << ", Parquet file: " << file_path << ".";
2233  throw std::runtime_error(error_message.str());
2234  }
2235  }
2236 
2237  CHECK(row_group_interval.start_index == row_group_interval.end_index);
2238  auto row_group_index = row_group_interval.start_index;
2239  std::map<int, ParquetRowGroupReader> row_group_reader_map;
2240 
2241  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
2242  auto group_reader = parquet_reader->RowGroup(row_group_index);
2243 
2244  std::vector<InvalidRowGroupIndices> invalid_indices_per_thread(num_threads);
2245 
2246  const bool geo_validate_geometry =
2247      foreign_table_->getOptionAsBool(ForeignTable::GEO_VALIDATE_GEOMETRY_KEY);
2248  auto encoder_map = populate_encoder_map_for_import(chunks,
2249  schema,
2250  file_reader,
2251  column_dictionaries,
2252  group_reader->metadata()->num_rows(),
2253  geo_validate_geometry);
2254 
2255  std::vector<std::set<int>> partitions(num_threads);
2256  std::map<int, int> column_id_to_thread;
2257  for (auto& [column_id, encoder] : encoder_map) {
2258  auto thread_id = column_id % num_threads;
2259  column_id_to_thread[column_id] = thread_id;
2260  partitions[thread_id].insert(column_id);
2261  }
2262 
2263  for (auto& [column_id, encoder] : encoder_map) {
2264  const auto& column_descriptor = schema.getColumnDescriptor(column_id);
2265  const auto parquet_column_index = schema.getParquetColumnIndex(column_id);
2266  auto parquet_column_descriptor =
2267  file_metadata->schema()->Column(parquet_column_index);
2268 
2269  // validate
2270  auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
2271  CHECK(row_group_interval.start_index >= 0 &&
2272  row_group_interval.end_index < num_row_groups);
2273  CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
2274  validate_max_repetition_and_definition_level(column_descriptor,
2275                                               parquet_column_descriptor);
2276 
2277  std::shared_ptr<parquet::ColumnReader> col_reader =
2278  group_reader->Column(parquet_column_index);
2279 
2280  row_group_reader_map.insert(
2281  {column_id,
2282  ParquetRowGroupReader(col_reader,
2283  column_descriptor,
2284  parquet_column_descriptor,
2285  shared::get_from_map(encoder_map, column_id).get(),
2286  invalid_indices_per_thread[shared::get_from_map(
2287  column_id_to_thread, column_id)],
2288  row_group_index,
2289  parquet_column_index,
2290  parquet_reader)});
2291  }
2292 
2293  std::vector<std::future<void>> futures;
2294  for (int ithread = 0; ithread < num_threads; ++ithread) {
2295  auto column_ids_for_thread = partitions[ithread];
2296  futures.emplace_back(
2297  std::async(std::launch::async, [&row_group_reader_map, column_ids_for_thread] {
2298  for (const auto column_id : column_ids_for_thread) {
2299  shared::get_from_map(row_group_reader_map, column_id)
2300  .readAndValidateRowGroup(); // reads and validate entire row group per
2301  // column
2302  }
2303  }));
2304  }
2305 
2306  for (auto& future : futures) {
2307  future.wait();
2308  }
2309 
2310  for (auto& future : futures) {
2311  future.get();
2312  }
2313 
2314  // merge/reduce invalid indices
2315  InvalidRowGroupIndices invalid_indices;
2316  for (auto& thread_invalid_indices : invalid_indices_per_thread) {
2317  invalid_indices.merge(thread_invalid_indices);
2318  }
2319 
2320  for (auto& [_, reader] : row_group_reader_map) {
2321  reader.eraseInvalidRowGroupData(
2322  invalid_indices); // removes invalid encoded data in buffers
2323  }
2324 
2325  // update the element count for each encoder
2326  for (const auto column_descriptor : schema.getLogicalColumns()) {
2327  auto column_id = column_descriptor->columnId;
2328  auto db_encoder = shared::get_from_map(chunks, column_id).getBuffer()->getEncoder();
2329  CHECK(static_cast<size_t>(group_reader->metadata()->num_rows()) >=
2330  invalid_indices.size());
2331  size_t updated_num_elems = db_encoder->getNumElems() +
2332  group_reader->metadata()->num_rows() -
2333  invalid_indices.size();
2334  db_encoder->setNumElems(updated_num_elems);
2335  if (column_descriptor->columnType.is_geometry()) {
2336  for (int i = 0; i < column_descriptor->columnType.get_physical_cols(); ++i) {
2337  auto db_encoder =
2338  shared::get_from_map(chunks, column_id + i + 1).getBuffer()->getEncoder();
2339  db_encoder->setNumElems(updated_num_elems);
2340  }
2341  }
2342  }
2343 
2344  return {group_reader->metadata()->num_rows() - invalid_indices.size(),
2345  invalid_indices.size()};
2346 }


std::list< RowGroupMetadata > foreign_storage::LazyParquetChunkLoader::metadataScan ( const std::vector< std::string > &  file_paths,
const ForeignTableSchema &  schema,
const bool  do_metadata_stats_validation = true 
)

Perform a metadata scan for the paths specified.

Parameters
file_paths - (ordered) files of the metadata scan
schema - schema of the foreign table to perform metadata scan for
do_metadata_stats_validation - validate stats in metadata of parquet files if true
Returns
a list of the row group metadata extracted from file_paths
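
A sketch of a typical call (assumes a constructed loader; file_paths must be ordered):

  auto row_group_metadata = loader.metadataScan(file_paths, schema);
  // With do_metadata_stats_validation left at its default (true), the stats
  // in the parquet metadata are validated; pass false to skip that step.
  // Referenced in this codebase by
  // ParquetDataWrapper::getRowGroupMetadataForFilePaths().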

Definition at line 2514 of file LazyParquetChunkLoader.cpp.

References threading_serial::async(), CHECK, DEBUG_TIMER, Timer< TimeT >::elapsed(), file_reader_cache_, file_system_, foreign_table_, foreign_storage::ForeignTable::GEO_VALIDATE_GEOMETRY_KEY, foreign_storage::get_num_threads(), foreign_storage::get_parquet_table_size(), foreign_storage::ForeignTableSchema::getLogicalAndPhysicalColumns(), foreign_storage::OptionsContainer::getOptionAsBool(), foreign_storage::FileReaderMap::initializeIfEmpty(), foreign_storage::FileReaderMap::insert(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::metadata_scan_rowgroup_interval(), foreign_storage::partition_for_threads(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::populate_encoder_map_for_metadata_scan(), Timer< TimeT >::start(), Timer< TimeT >::stop(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::throw_row_group_larger_than_fragment_size_error(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_equal_schema(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_parquet_metadata(), and VLOG.

Referenced by foreign_storage::ParquetDataWrapper::getRowGroupMetadataForFilePaths().

2517  {
2518  auto timer = DEBUG_TIMER(__func__);
2519  auto column_interval =
2520  Interval<ColumnType>{schema.getLogicalAndPhysicalColumns().front()->columnId,
2521  schema.getLogicalAndPhysicalColumns().back()->columnId};
2522  CHECK(!file_paths.empty());
2523 
2524  // The encoder map needs to be populated before we can start scanning rowgroups, so we
2525  // peel the first file_path out of the async loop below to perform population.
2526  const auto& first_path = *file_paths.begin();
2527  auto first_reader = file_reader_cache_->insert(first_path, file_system_);
2528  auto max_row_group_stats =
2529  validate_parquet_metadata(first_reader->parquet_reader()->metadata(),
2530  first_path,
2531  schema,
2532  do_metadata_stats_validation);
2533 
2534  // Iterate asynchronously over any paths beyond the first.
2535  auto table_ptr = schema.getForeignTable();
2536  CHECK(table_ptr);
2537  auto num_threads = foreign_storage::get_num_threads(*table_ptr);
2538  VLOG(1) << "Metadata scan using " << num_threads << " threads";
2539 
2540  const bool geo_validate_geometry =
2541      foreign_table_->getOptionAsBool(ForeignTable::GEO_VALIDATE_GEOMETRY_KEY);
2542  auto encoder_map = populate_encoder_map_for_metadata_scan(column_interval,
2543  schema,
2544  first_reader,
2545  do_metadata_stats_validation,
2546  geo_validate_geometry);
2547  const auto num_row_groups = get_parquet_table_size(first_reader).first;
2548  VLOG(1) << "Starting metadata scan of path " << first_path;
2549  auto row_group_metadata = metadata_scan_rowgroup_interval(
2550  encoder_map, {first_path, 0, num_row_groups - 1}, first_reader, schema);
2551  VLOG(1) << "Completed metadata scan of path " << first_path;
2552 
2553  // We want each (filepath->FileReader) pair in the cache to be initialized before we
2554  // multithread so that we are not adding keys in a concurrent environment, so we add
2555  // cache entries for each path and initialize to an empty unique_ptr if the file has not
2556  // yet been opened.
2557  // Since we have already performed the first iteration, we skip it in the thread groups
2558  // so as not to process it twice.
2559  std::vector<std::string> cache_subset;
2560  for (auto path_it = ++(file_paths.begin()); path_it != file_paths.end(); ++path_it) {
2561      file_reader_cache_->initializeIfEmpty(*path_it);
2562      cache_subset.emplace_back(*path_it);
2563  }
2564 
2565  auto paths_per_thread = partition_for_threads(cache_subset, num_threads);
2566  std::vector<std::future<std::pair<std::list<RowGroupMetadata>, MaxRowGroupSizeStats>>>
2567  futures;
2568  for (const auto& path_group : paths_per_thread) {
2569  futures.emplace_back(std::async(
2570      std::launch::async,
2571      [&](const auto& paths, const auto& file_reader_cache)
2572  -> std::pair<std::list<RowGroupMetadata>, MaxRowGroupSizeStats> {
2573  Timer<> summary_timer;
2574  Timer<> get_or_insert_reader_timer_ms;
2575  Timer<> validation_timer_ms;
2576  Timer<> metadata_scan_timer;
2577 
2578  summary_timer.start();
2579 
2580  std::list<RowGroupMetadata> reduced_metadata;
2581  MaxRowGroupSizeStats max_row_group_stats{0, 0};
2582  for (const auto& path : paths.get()) {
2583  get_or_insert_reader_timer_ms.start();
2584  auto reader = file_reader_cache.get().getOrInsert(path, file_system_);
2585  get_or_insert_reader_timer_ms.stop();
2586 
2587  validation_timer_ms.start();
2588  validate_equal_schema(first_reader, reader, first_path, path);
2589  auto local_max_row_group_stats =
2590  validate_parquet_metadata(reader->parquet_reader()->metadata(),
2591  path,
2592  schema,
2593  do_metadata_stats_validation);
2594  if (local_max_row_group_stats.max_row_group_size >
2595  max_row_group_stats.max_row_group_size) {
2596  max_row_group_stats = local_max_row_group_stats;
2597  }
2598  validation_timer_ms.stop();
2599 
2600  VLOG(1) << "Starting metadata scan of path " << path;
2601 
2602  metadata_scan_timer.start();
2603  const auto num_row_groups = get_parquet_table_size(reader).first;
2604  const auto interval = RowGroupInterval{path, 0, num_row_groups - 1};
2605  reduced_metadata.splice(
2606  reduced_metadata.end(),
2607  metadata_scan_rowgroup_interval(encoder_map, interval, reader, schema));
2608  metadata_scan_timer.stop();
2609 
2610  VLOG(1) << "Completed metadata scan of path " << path;
2611  }
2612 
2613  summary_timer.stop();
2614 
2615  VLOG(1) << "Runtime summary:";
2616  VLOG(1) << " Parquet metadata scan total time: " << summary_timer.elapsed()
2617  << "ms";
2618  VLOG(1) << " Parquet file reader opening time: "
2619  << get_or_insert_reader_timer_ms.elapsed() << "ms";
2620  VLOG(1) << " Parquet metadata validation time: "
2621  << validation_timer_ms.elapsed() << "ms";
2622  VLOG(1) << " Parquet metadata processing time: "
2623  << metadata_scan_timer.elapsed() << "ms";
2624 
2625  return {reduced_metadata, max_row_group_stats};
2626  },
2627  std::ref(path_group),
2628  std::ref(*file_reader_cache_)));
2629  }
2630 
2631  // Reduce all the row_group results.
2632  for (auto& future : futures) {
2633  auto [metadata, local_max_row_group_stats] = future.get();
2634  row_group_metadata.splice(row_group_metadata.end(), metadata);
2635  if (local_max_row_group_stats.max_row_group_size >
2636  max_row_group_stats.max_row_group_size) {
2637  max_row_group_stats = local_max_row_group_stats;
2638  }
2639  }
2640 
2641  if (max_row_group_stats.max_row_group_size > schema.getForeignTable()->maxFragRows) {
2642    throw_row_group_larger_than_fragment_size_error(
2643        max_row_group_stats, schema.getForeignTable()->maxFragRows);
2644  }
2645 
2646  return row_group_metadata;
2647 }


DataPreview foreign_storage::LazyParquetChunkLoader::previewFiles ( const std::vector< std::string > &  files,
const size_t  max_num_rows,
const ForeignTable &  table 
)

Preview rows of data and column types in a set of files.

Parameters
files - files to preview
max_num_rows - maximum number of rows to preview
table - foreign table for preview
Returns
a DataPreview instance that contains relevant preview information
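
A sketch of a typical call (the row cap is hypothetical):

  foreign_storage::DataPreview preview =
      loader.previewFiles(files, /*max_num_rows=*/100, *foreign_table);
  // preview.column_names, preview.column_types, and preview.sample_rows now
  // hold the detected schema and the sampled rows; preview.num_rejected_rows
  // counts rows that failed detection.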

Definition at line 2355 of file LazyParquetChunkLoader.cpp.

References appendRowGroups(), CHECK, CHECK_EQ, CHECK_GE, foreign_storage::PreviewContext::column_chunks, foreign_storage::PreviewContext::column_descriptors, foreign_storage::DataPreview::column_names, foreign_storage::DataPreview::column_types, ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, foreign_storage::create_futures_for_workers(), foreign_storage::PreviewContext::detect_buffers, foreign_storage::detect_geo_type(), file_reader_cache_, file_system_, foreign_storage::get_num_threads(), foreign_storage::FileReaderMap::getOrInsert(), gpu_enabled::iota(), ColumnDescriptor::isSystemCol, ColumnDescriptor::isVirtualCol, kENCODING_NONE, foreign_storage::DataPreview::num_rejected_rows, foreign_storage::PreviewContext::rejected_row_indices_per_column, foreign_storage::DataPreview::sample_rows, suggestColumnMapping(), ColumnDescriptor::tableId, and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_equal_schema().

2357  {
2358  CHECK(!files.empty());
2359 
2360  auto first_file = *files.begin();
2361  auto first_file_reader = file_reader_cache_->getOrInsert(*files.begin(), file_system_);
2362 
2363  for (auto current_file_it = ++files.begin(); current_file_it != files.end();
2364  ++current_file_it) {
2365  auto file_reader = file_reader_cache_->getOrInsert(*current_file_it, file_system_);
2366  validate_equal_schema(first_file_reader, file_reader, first_file, *current_file_it);
2367  }
2368 
2369  auto first_file_metadata = first_file_reader->parquet_reader()->metadata();
2370  auto num_columns = first_file_metadata->num_columns();
2371 
2372  DataPreview data_preview;
2373  data_preview.num_rejected_rows = 0;
2374 
2375  auto current_file_it = files.begin();
2376  while (data_preview.sample_rows.size() < max_num_rows &&
2377  current_file_it != files.end()) {
2378  size_t total_num_rows = data_preview.sample_rows.size();
2379  size_t max_num_rows_to_append = max_num_rows - data_preview.sample_rows.size();
2380 
2381  // gather enough rows in row groups to produce required samples
2382  std::vector<RowGroupInterval> row_group_intervals;
2383  for (; current_file_it != files.end(); ++current_file_it) {
2384  const auto& file_path = *current_file_it;
2385  auto file_reader = file_reader_cache_->getOrInsert(file_path, file_system_);
2386  auto file_metadata = file_reader->parquet_reader()->metadata();
2387  auto num_row_groups = file_metadata->num_row_groups();
2388  int end_row_group = 0;
2389  for (int i = 0; i < num_row_groups && total_num_rows < max_num_rows; ++i) {
2390  const size_t next_num_rows = file_metadata->RowGroup(i)->num_rows();
2391  total_num_rows += next_num_rows;
2392  end_row_group = i;
2393  }
2394  row_group_intervals.push_back(RowGroupInterval{file_path, 0, end_row_group});
2395  }
2396 
2397  PreviewContext preview_context;
2398  for (int i = 0; i < num_columns; ++i) {
2399  auto col = first_file_metadata->schema()->Column(i);
2400  ColumnDescriptor& cd = preview_context.column_descriptors.emplace_back();
2401  auto sql_type = LazyParquetChunkLoader::suggestColumnMapping(col);
2402  cd.columnType = sql_type;
2403  cd.columnName =
2404  sql_type.is_array() ? col->path()->ToDotVector()[0] + "_array" : col->name();
2405  cd.isSystemCol = false;
2406  cd.isVirtualCol = false;
2407  cd.tableId = -1;
2408  cd.columnId = i + 1;
2409  data_preview.column_names.emplace_back(cd.columnName);
2410  data_preview.column_types.emplace_back(sql_type);
2411  preview_context.detect_buffers.push_back(
2412  std::make_unique<TypedParquetDetectBuffer>());
2413  preview_context.rejected_row_indices_per_column.push_back(
2414  std::make_unique<RejectedRowIndices>());
2415  auto& detect_buffer = preview_context.detect_buffers.back();
2416  auto& chunk = preview_context.column_chunks.emplace_back(&cd);
2417  chunk.setPinnable(false);
2418  chunk.setBuffer(detect_buffer.get());
2419  }
2420 
2421  std::function<void(const std::vector<int>&)> append_row_groups_for_column =
2422  [&](const std::vector<int>& column_indices) {
2423  for (const auto& column_index : column_indices) {
2424  auto& chunk = preview_context.column_chunks[column_index];
2425  auto chunk_list = std::list<Chunk_NS::Chunk>{chunk};
2426  auto& rejected_row_indices =
2427  preview_context.rejected_row_indices_per_column[column_index];
2428  appendRowGroups(row_group_intervals,
2429  column_index,
2430  chunk.getColumnDesc(),
2431  chunk_list,
2432  nullptr,
2433  rejected_row_indices.get(),
2434  true,
2435  max_num_rows_to_append);
2436  }
2437  };
2438 
2439  auto num_threads = foreign_storage::get_num_threads(foreign_table);
2440 
2441  std::vector<int> columns(num_columns);
2442  std::iota(columns.begin(), columns.end(), 0);
2443  auto futures =
2444  create_futures_for_workers(columns, num_threads, append_row_groups_for_column);
2445  for (auto& future : futures) {
2446  future.wait();
2447  }
2448  for (auto& future : futures) {
2449  future.get();
2450  }
2451 
2452  // merge all `rejected_row_indices_per_column`
2453  auto rejected_row_indices = std::make_unique<RejectedRowIndices>();
2454  for (int i = 0; i < num_columns; ++i) {
2455  rejected_row_indices->insert(
2456  preview_context.rejected_row_indices_per_column[i]->begin(),
2457  preview_context.rejected_row_indices_per_column[i]->end());
2458  }
2459 
2460  size_t num_rows = 0;
2461  auto buffers_it = preview_context.detect_buffers.begin();
2462  for (int i = 0; i < num_columns; ++i, ++buffers_it) {
2463  CHECK(buffers_it != preview_context.detect_buffers.end());
2464  auto& strings = buffers_it->get()->getStrings();
2465  if (i == 0) {
2466  num_rows = strings.size();
2467  } else {
2468  CHECK_EQ(num_rows, strings.size());
2469  }
2470  }
2471 
2472  size_t num_rejected_rows = rejected_row_indices->size();
2473  data_preview.num_rejected_rows += num_rejected_rows;
2474  CHECK_GE(num_rows, num_rejected_rows);
2475  auto row_count = num_rows - num_rejected_rows;
2476 
2477  auto offset_row = data_preview.sample_rows.size();
2478  data_preview.sample_rows.resize(std::min(offset_row + row_count, max_num_rows));
2479 
2480  for (size_t irow = 0, rows_appended = 0;
2481  irow < num_rows && offset_row + rows_appended < max_num_rows;
2482  ++irow) {
2483  if (rejected_row_indices->find(irow) != rejected_row_indices->end()) {
2484  continue;
2485  }
2486  auto& row_data = data_preview.sample_rows[offset_row + rows_appended];
2487  row_data.resize(num_columns);
2488  auto buffers_it = preview_context.detect_buffers.begin();
2489  for (int i = 0; i < num_columns; ++i, ++buffers_it) {
2490  CHECK(buffers_it != preview_context.detect_buffers.end());
2491  auto& strings = buffers_it->get()->getStrings();
2492  row_data[i] = strings[irow];
2493  }
2494  ++rows_appended;
2495  }
2496  }
2497 
2498  // attempt to detect geo columns
2499  for (int i = 0; i < num_columns; ++i) {
2500  auto type_info = data_preview.column_types[i];
2501  if (type_info.is_string()) {
2502  auto tentative_geo_type =
2503  foreign_storage::detect_geo_type(data_preview.sample_rows, i);
2504  if (tentative_geo_type.has_value()) {
2505  data_preview.column_types[i].set_type(tentative_geo_type.value());
2506  data_preview.column_types[i].set_compression(kENCODING_NONE);
2507  }
2508  }
2509  }
2510 
2511  return data_preview;
2512 }


SQLTypeInfo foreign_storage::LazyParquetChunkLoader::suggestColumnMapping ( const parquet::ColumnDescriptor *  parquet_column)
static private

Suggest a possible Parquet to OmniSci column mapping based on heuristics.

Parameters
parquet_column - the column descriptor of the Parquet column
Returns
a supported OmniSci SQLTypeInfo given the Parquet column type

NOTE: the suggested type may be entirely inappropriate given a specific use-case; however, it is guaranteed to be an allowed mapping. For example, no attempt is ever made to detect geo types; a string type is always suggested in their place.

Definition at line 2036 of file LazyParquetChunkLoader.cpp.

References foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::is_valid_parquet_list_column(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::suggest_column_scalar_type(), and run_benchmark_import::type.

Referenced by previewFiles().

2037  {
2038  auto type = suggest_column_scalar_type(parquet_column);
2039 
2040  // array case
2041  if (is_valid_parquet_list_column(parquet_column)) {
2042  return type.get_array_type();
2043  }
2044 
2045  return type;
2046 }


Member Data Documentation

const int foreign_storage::LazyParquetChunkLoader::batch_reader_num_elements = 4096
static
FileReaderMap* foreign_storage::LazyParquetChunkLoader::file_reader_cache_
private

Definition at line 171 of file LazyParquetChunkLoader.h.

Referenced by appendRowGroups(), metadataScan(), and previewFiles().

std::shared_ptr<arrow::fs::FileSystem> foreign_storage::LazyParquetChunkLoader::file_system_
private
const ForeignTable* foreign_storage::LazyParquetChunkLoader::foreign_table_
private

The documentation for this class was generated from the following files:

LazyParquetChunkLoader.h
LazyParquetChunkLoader.cpp