OmniSciDB a5dc49c757
foreign_storage::ParquetDataWrapper Class Reference

#include <ParquetDataWrapper.h>


Public Member Functions

 ParquetDataWrapper ()
 
 ParquetDataWrapper (const int db_id, const ForeignTable *foreign_table, const bool do_metadata_stats_validation=true)
 
 ParquetDataWrapper (const ForeignTable *foreign_table, std::shared_ptr< arrow::fs::FileSystem > file_system)
 Constructor intended for detect use-case only.
 
void populateChunkMetadata (ChunkMetadataVector &chunk_metadata_vector) override
 
void populateChunkBuffers (const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
 
std::string getSerializedDataWrapper () const override
 
void restoreDataWrapperInternals (const std::string &file_path, const ChunkMetadataVector &chunk_metadata_vector) override
 
bool isRestored () const override
 
ParallelismLevel getCachedParallelismLevel () const override
 
ParallelismLevel getNonCachedParallelismLevel () const override
 
DataPreview getDataPreview (const size_t num_rows)
 
- Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
 AbstractFileStorageDataWrapper ()
 
void validateServerOptions (const ForeignServer *foreign_server) const override
 
void validateTableOptions (const ForeignTable *foreign_table) const override
 
const std::set< std::string_view > & getSupportedTableOptions () const override
 
void validateUserMappingOptions (const UserMapping *user_mapping, const ForeignServer *foreign_server) const override
 
const std::set< std::string_view > & getSupportedUserMappingOptions () const override
 
const std::set< std::string > getAlterableTableOptions () const override
 
- Public Member Functions inherited from foreign_storage::ForeignDataWrapper
 ForeignDataWrapper ()=default
 
virtual ~ForeignDataWrapper ()=default
 
virtual void validateSchema (const std::list< ColumnDescriptor > &columns) const
 
virtual bool isLazyFragmentFetchingEnabled () const
 

Private Member Functions

std::list< const ColumnDescriptor * > getColumnsToInitialize (const Interval< ColumnType > &column_interval)
 
void initializeChunkBuffers (const int fragment_index, const Interval< ColumnType > &column_interval, const ChunkToBufferMap &required_buffers, const bool reserve_buffers_and_set_stats=false)
 
void fetchChunkMetadata ()
 
void loadBuffersUsingLazyParquetChunkLoader (const int logical_column_id, const int fragment_id, const ChunkToBufferMap &required_buffers, AbstractBuffer *delete_buffer)
 
std::vector< std::string > getOrderedProcessedFilePaths ()
 
std::vector< std::string > getAllFilePaths ()
 
bool moveToNextFragment (size_t new_rows_count) const
 
void finalizeFragmentMap ()
 
void addNewFragment (int row_group, const std::string &file_path)
 
bool isNewFile (const std::string &file_path) const
 
void addNewFile (const std::string &file_path)
 
void setLastFileRowCount (const std::string &file_path)
 
void resetParquetMetadata ()
 
void metadataScanFiles (const std::vector< std::string > &file_paths)
 
void metadataScanRowGroupMetadata (const std::list< RowGroupMetadata > &row_group_metadata)
 
std::list< RowGroupMetadata > getRowGroupMetadataForFilePaths (const std::vector< std::string > &file_paths) const
 
std::map< FilePathAndRowGroup, RowGroupMetadata > getRowGroupMetadataMap (const std::vector< std::string > &file_paths) const
 
void updateChunkMetadataForFragment (const Interval< ColumnType > &column_interval, const std::list< std::shared_ptr< ChunkMetadata >> &column_chunk_metadata, int32_t fragment_id)
 
void metadataScanRowGroupIntervals (const std::vector< RowGroupInterval > &row_group_intervals)
 
void updateMetadataForRolledOffFiles (const std::set< std::string > &rolled_off_files)
 
void removeMetadataForLastFile (const std::string &last_file_path)
 

Private Attributes

const bool do_metadata_stats_validation_
 
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
 
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
 
const int db_id_
 
const ForeignTable * foreign_table_
 
int last_fragment_index_
 
size_t last_fragment_row_count_
 
size_t total_row_count_
 
size_t last_file_row_count_
 
int last_row_group_
 
bool is_restored_
 
std::unique_ptr< ForeignTableSchema > schema_
 
std::shared_ptr< arrow::fs::FileSystem > file_system_
 
std::unique_ptr< FileReaderMap > file_reader_cache_
 
std::mutex delete_buffer_mutex_
 

Additional Inherited Members

- Public Types inherited from foreign_storage::ForeignDataWrapper
enum  ParallelismLevel { NONE, INTRA_FRAGMENT, INTER_FRAGMENT }
 
- Static Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static shared::FilePathOptions getFilePathOptions (const ForeignTable *foreign_table)
 
- Static Public Attributes inherited from foreign_storage::AbstractFileStorageDataWrapper
static const std::string STORAGE_TYPE_KEY = "STORAGE_TYPE"
 
static const std::string BASE_PATH_KEY = "BASE_PATH"
 
static const std::string FILE_PATH_KEY = "FILE_PATH"
 
static const std::string REGEX_PATH_FILTER_KEY = "REGEX_PATH_FILTER"
 
static const std::string LOCAL_FILE_STORAGE_TYPE = "LOCAL_FILE"
 
static const std::string S3_STORAGE_TYPE = "AWS_S3"
 
static const std::string FILE_SORT_ORDER_BY_KEY = shared::FILE_SORT_ORDER_BY_KEY
 
static const std::string FILE_SORT_REGEX_KEY = shared::FILE_SORT_REGEX_KEY
 
static const std::string ALLOW_FILE_ROLL_OFF_KEY = "ALLOW_FILE_ROLL_OFF"
 
static const std::string THREADS_KEY = "THREADS"
 
static const std::array< std::string, 1 > supported_storage_types
 
- Static Protected Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static std::string getFullFilePath (const ForeignTable *foreign_table)
 Returns the path to the source file/dir of the table. Depending on options this may result from a concatenation of server and table path options.
 
static bool allowFileRollOff (const ForeignTable *foreign_table)
 

Detailed Description

Definition at line 42 of file ParquetDataWrapper.h.
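
For orientation, the sketch below shows the wrapper's two-phase lifecycle as a caller would drive it: a metadata scan followed by buffer population. This is a minimal illustration only; `make_buffer_map` is a hypothetical helper standing in for the buffer allocation that the foreign storage manager normally performs.

#include "ParquetDataWrapper.h"

// Minimal lifecycle sketch. `make_buffer_map` is a hypothetical helper;
// buffer allocation is normally handled by the foreign storage manager.
void scan_then_load(const int db_id, const foreign_storage::ForeignTable* table) {
  foreign_storage::ParquetDataWrapper wrapper(db_id, table);

  // Phase 1: discover Parquet files and collect per-chunk metadata.
  ChunkMetadataVector chunk_metadata;
  wrapper.populateChunkMetadata(chunk_metadata);

  // Phase 2: fill the chunk buffers a query actually needs.
  ChunkToBufferMap required_buffers = make_buffer_map(chunk_metadata);  // hypothetical
  wrapper.populateChunkBuffers(required_buffers, /*optional_buffers=*/{},
                               /*delete_buffer=*/nullptr);
}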

Constructor & Destructor Documentation

foreign_storage::ParquetDataWrapper::ParquetDataWrapper ( )

foreign_storage::ParquetDataWrapper::ParquetDataWrapper ( const int db_id,
    const ForeignTable * foreign_table,
    const bool do_metadata_stats_validation = true
)

Definition at line 92 of file ParquetDataWrapper.cpp.

References file_system_, foreign_storage::ForeignTable::foreign_server, foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, foreign_storage::OptionsContainer::options, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, and UNREACHABLE.

    : do_metadata_stats_validation_(do_metadata_stats_validation)
    , db_id_(db_id)
    , foreign_table_(foreign_table)
    , last_fragment_index_(0)
    , last_fragment_row_count_(0)
    , total_row_count_(0)
    , last_file_row_count_(0)
    , last_row_group_(0)
    , is_restored_(false)
    , schema_(std::make_unique<ForeignTableSchema>(db_id, foreign_table))
    , file_reader_cache_(std::make_unique<FileReaderMap>()) {
  // Only local file storage is currently handled by this wrapper.
  auto& server_options = foreign_table->foreign_server->options;
  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
    file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
  } else {
    UNREACHABLE();
  }
}

foreign_storage::ParquetDataWrapper::ParquetDataWrapper ( const ForeignTable * foreign_table,
    std::shared_ptr< arrow::fs::FileSystem > file_system
)

Constructor intended for detect use-case only.

Definition at line 78 of file ParquetDataWrapper.cpp.

    : do_metadata_stats_validation_(false)
    , db_id_(-1)
    , foreign_table_(foreign_table)
    , last_fragment_index_(0)
    , last_fragment_row_count_(0)
    , total_row_count_(0)
    , last_file_row_count_(0)
    , last_row_group_(0)
    , is_restored_(false)
    , file_system_(file_system)
    , file_reader_cache_(std::make_unique<FileReaderMap>()) {}
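
A hedged sketch of that detect path, assuming `DataPreview` lives in the foreign_storage namespace: the wrapper is built against an explicit Arrow filesystem and never touches the catalog (`db_id_` is -1).

#include <memory>
#include <arrow/filesystem/localfs.h>
#include "ParquetDataWrapper.h"

// Detect/preview sketch: no catalog registration, just schema probing.
foreign_storage::DataPreview preview(const foreign_storage::ForeignTable* table) {
  auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
  foreign_storage::ParquetDataWrapper wrapper(table, fs);
  return wrapper.getDataPreview(/*num_rows=*/100);  // throws if no files are found
}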

Member Function Documentation

void foreign_storage::ParquetDataWrapper::addNewFile ( const std::string &  file_path)
private

Definition at line 232 of file ParquetDataWrapper.cpp.

References foreign_storage::AbstractFileStorageDataWrapper::allowFileRollOff(), CHECK, CHECK_EQ, foreign_table_, fragment_to_row_group_interval_map_, last_fragment_index_, last_row_group_, and setLastFileRowCount().

Referenced by metadataScanRowGroupMetadata().

{
  const auto last_fragment_entry =
      fragment_to_row_group_interval_map_.find(last_fragment_index_);
  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());

  // The entry for the first fragment starts out as an empty vector
  if (last_fragment_entry->second.empty()) {
    // File roll off can result in empty older fragments.
    if (!allowFileRollOff(foreign_table_)) {
      CHECK_EQ(last_fragment_index_, 0);
    }
  } else {
    last_fragment_entry->second.back().end_index = last_row_group_;
  }
  last_fragment_entry->second.emplace_back(RowGroupInterval{file_path, 0});
  setLastFileRowCount(file_path);
}


void foreign_storage::ParquetDataWrapper::addNewFragment ( int  row_group,
const std::string &  file_path 
)
private

Definition at line 202 of file ParquetDataWrapper.cpp.

References CHECK, fragment_to_row_group_interval_map_, last_fragment_index_, last_fragment_row_count_, last_row_group_, and setLastFileRowCount().

Referenced by metadataScanRowGroupMetadata().

{
  const auto last_fragment_entry =
      fragment_to_row_group_interval_map_.find(last_fragment_index_);
  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());

  last_fragment_entry->second.back().end_index = last_row_group_;
  last_fragment_index_++;
  last_fragment_row_count_ = 0;
  fragment_to_row_group_interval_map_[last_fragment_index_].emplace_back(
      RowGroupInterval{file_path, row_group});
  setLastFileRowCount(file_path);
}


void foreign_storage::ParquetDataWrapper::fetchChunkMetadata ( )
private

Definition at line 255 of file ParquetDataWrapper.cpp.

References foreign_storage::AbstractFileStorageDataWrapper::allowFileRollOff(), CHECK, CHECK_EQ, shared::check_for_rolled_off_file_paths(), chunk_metadata_map_, shared::contains(), db_id_, file_reader_cache_, file_system_, foreign_table_, getAllFilePaths(), Catalog_Namespace::SysCatalog::getCatalog(), getOrderedProcessedFilePaths(), Catalog_Namespace::SysCatalog::instance(), foreign_storage::ForeignTable::isAppendMode(), last_file_row_count_, metadataScanFiles(), removeMetadataForLastFile(), resetParquetMetadata(), foreign_storage::throw_removed_file_error(), foreign_storage::throw_removed_row_in_file_error(), and updateMetadataForRolledOffFiles().

Referenced by populateChunkMetadata().

{
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
  CHECK(catalog);
  std::vector<std::string> new_file_paths;
  auto processed_file_paths = getOrderedProcessedFilePaths();
  if (foreign_table_->isAppendMode() && !processed_file_paths.empty()) {
    auto all_file_paths = getAllFilePaths();
    if (allowFileRollOff(foreign_table_)) {
      const auto rolled_off_files =
          shared::check_for_rolled_off_file_paths(all_file_paths, processed_file_paths);
      updateMetadataForRolledOffFiles(rolled_off_files);
    }

    for (const auto& file_path : processed_file_paths) {
      if (!shared::contains(all_file_paths, file_path)) {
        throw_removed_file_error(file_path);
      }
    }

    // For multi-file appends, reprocess the last file in order to account for appends
    // that may have occurred to this file. For single file appends, reprocess file if new
    // rows have been added.
    if (!processed_file_paths.empty()) {
      // Single file append
      if (all_file_paths.size() == 1) {
        CHECK_EQ(processed_file_paths.size(), size_t(1));
        CHECK_EQ(processed_file_paths[0], all_file_paths[0]);
      }

      const auto& last_file_path = processed_file_paths.back();
      // Since an existing file is being appended to we need to update the cached
      // FileReader as the existing one will be out of date.
      auto reader = file_reader_cache_->insert(last_file_path, file_system_);
      size_t row_count = reader->parquet_reader()->metadata()->num_rows();
      if (row_count < last_file_row_count_) {
        throw_removed_row_in_file_error(last_file_path);
      } else if (row_count > last_file_row_count_) {
        removeMetadataForLastFile(last_file_path);
        new_file_paths.emplace_back(last_file_path);
      }
    }

    for (const auto& file_path : all_file_paths) {
      if (!shared::contains(processed_file_paths, file_path)) {
        new_file_paths.emplace_back(file_path);
      }
    }
  } else {
    CHECK(chunk_metadata_map_.empty());
    new_file_paths = getAllFilePaths();
    resetParquetMetadata();
  }

  if (!new_file_paths.empty()) {
    metadataScanFiles(new_file_paths);
  }
}


void foreign_storage::ParquetDataWrapper::finalizeFragmentMap ( )
private

Definition at line 197 of file ParquetDataWrapper.cpp.

References fragment_to_row_group_interval_map_, last_fragment_index_, and last_row_group_.

Referenced by metadataScanRowGroupMetadata().

{
  fragment_to_row_group_interval_map_[last_fragment_index_].back().end_index =
      last_row_group_;
}


std::vector< std::string > foreign_storage::ParquetDataWrapper::getAllFilePaths ( )
private

Definition at line 395 of file ParquetDataWrapper.cpp.

References DEBUG_TIMER, foreign_storage::ForeignTable::foreign_server, foreign_table_, foreign_storage::AbstractFileStorageDataWrapper::getFilePathOptions(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, shared::local_glob_filter_sort_files(), foreign_storage::OptionsContainer::options, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, and UNREACHABLE.

Referenced by fetchChunkMetadata(), and getDataPreview().

{
  auto timer = DEBUG_TIMER(__func__);
  std::vector<std::string> found_file_paths;
  auto file_path = getFullFilePath(foreign_table_);
  const auto file_path_options = getFilePathOptions(foreign_table_);
  auto& server_options = foreign_table_->foreign_server->options;
  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
    found_file_paths = shared::local_glob_filter_sort_files(file_path, file_path_options);
  } else {
    UNREACHABLE();
  }
  return found_file_paths;
}


ParallelismLevel foreign_storage::ParquetDataWrapper::getCachedParallelismLevel ( ) const
inline override virtual

Gets the desired level of parallelism for the data wrapper when a cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 70 of file ParquetDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTER_FRAGMENT.

std::list< const ColumnDescriptor * > foreign_storage::ParquetDataWrapper::getColumnsToInitialize ( const Interval< ColumnType > &  column_interval)
private

Definition at line 126 of file ParquetDataWrapper.cpp.

References CHECK, db_id_, foreign_storage::Interval< T >::end, Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), schema_, and foreign_storage::Interval< T >::start.

Referenced by initializeChunkBuffers().

{
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
  CHECK(catalog);
  const auto& columns = schema_->getLogicalAndPhysicalColumns();
  auto column_start = column_interval.start;
  auto column_end = column_interval.end;
  std::list<const ColumnDescriptor*> columns_to_init;
  for (const auto column : columns) {
    auto column_id = column->columnId;
    if (column_id >= column_start && column_id <= column_end) {
      columns_to_init.push_back(column);
    }
  }
  return columns_to_init;
}


DataPreview foreign_storage::ParquetDataWrapper::getDataPreview ( const size_t  num_rows)

Definition at line 735 of file ParquetDataWrapper.cpp.

References file_reader_cache_, file_system_, foreign_table_, getAllFilePaths(), and foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath().

{
  LazyParquetChunkLoader chunk_loader(
      file_system_, file_reader_cache_.get(), foreign_table_);
  auto file_paths = getAllFilePaths();
  if (file_paths.empty()) {
    throw ForeignStorageException{"No file found at \"" +
                                  getFullFilePath(foreign_table_) + "\""};
  }
  return chunk_loader.previewFiles(file_paths, num_rows, *foreign_table_);
}


ParallelismLevel foreign_storage::ParquetDataWrapper::getNonCachedParallelismLevel ( ) const
inline override virtual

Gets the desired level of parallelism for the data wrapper when no cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 72 of file ParquetDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTRA_FRAGMENT.
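
Taken together, the two overrides say: with a cache, parallelize across fragments; without one, parallelize across columns within a fragment. A hedged sketch of how a storage layer might consume the hint (the dispatch itself is hypothetical, not part of this API):

using foreign_storage::ForeignDataWrapper;

// Hypothetical dispatch sketch: choose request batching from the wrapper's hint.
void dispatch_requests(const ForeignDataWrapper& wrapper, const bool cache_enabled) {
  const auto level = cache_enabled ? wrapper.getCachedParallelismLevel()
                                   : wrapper.getNonCachedParallelismLevel();
  switch (level) {
    case ForeignDataWrapper::INTER_FRAGMENT:
      // batch optional buffers from other fragments into the same request
      break;
    case ForeignDataWrapper::INTRA_FRAGMENT:
      // batch optional buffers only from the requested fragment
      break;
    case ForeignDataWrapper::NONE:
      // request exactly the required buffers, nothing speculative
      break;
  }
}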

std::vector< std::string > foreign_storage::ParquetDataWrapper::getOrderedProcessedFilePaths ( )
private

Definition at line 383 of file ParquetDataWrapper.cpp.

References fragment_to_row_group_interval_map_.

Referenced by fetchChunkMetadata().

{
  std::vector<std::string> file_paths;
  for (const auto& entry : fragment_to_row_group_interval_map_) {
    for (const auto& row_group_interval : entry.second) {
      if (file_paths.empty() || file_paths.back() != row_group_interval.file_path) {
        file_paths.emplace_back(row_group_interval.file_path);
      }
    }
  }
  return file_paths;
}


std::list< RowGroupMetadata > foreign_storage::ParquetDataWrapper::getRowGroupMetadataForFilePaths ( const std::vector< std::string > &  file_paths) const
private

Definition at line 441 of file ParquetDataWrapper.cpp.

References do_metadata_stats_validation_, file_reader_cache_, file_system_, foreign_table_, foreign_storage::LazyParquetChunkLoader::metadataScan(), and schema_.

Referenced by getRowGroupMetadataMap(), and metadataScanFiles().

{
  LazyParquetChunkLoader chunk_loader(
      file_system_, file_reader_cache_.get(), foreign_table_);
  return chunk_loader.metadataScan(file_paths, *schema_, do_metadata_stats_validation_);
}


std::map< FilePathAndRowGroup, RowGroupMetadata > foreign_storage::ParquetDataWrapper::getRowGroupMetadataMap ( const std::vector< std::string > &  file_paths) const
private

Definition at line 844 of file ParquetDataWrapper.cpp.

References getRowGroupMetadataForFilePaths().

Referenced by metadataScanRowGroupIntervals(), and updateMetadataForRolledOffFiles().

{
  auto row_group_metadata = getRowGroupMetadataForFilePaths(file_paths);
  std::map<FilePathAndRowGroup, RowGroupMetadata> row_group_metadata_map;
  for (const auto& row_group_metadata_item : row_group_metadata) {
    row_group_metadata_map[{row_group_metadata_item.file_path,
                            row_group_metadata_item.row_group_index}] =
        row_group_metadata_item;
  }
  return row_group_metadata_map;
}


std::string foreign_storage::ParquetDataWrapper::getSerializedDataWrapper ( ) const
override virtual

Serialize the internal state of the wrapper into a file at the given path, if implemented.

Implements foreign_storage::ForeignDataWrapper.

Definition at line 687 of file ParquetDataWrapper.cpp.

References json_utils::add_value_to_object(), fragment_to_row_group_interval_map_, last_file_row_count_, last_fragment_index_, last_fragment_row_count_, last_row_group_, total_row_count_, and json_utils::write_to_string().

{
  rapidjson::Document d;
  d.SetObject();

  json_utils::add_value_to_object(d,
                                  fragment_to_row_group_interval_map_,
                                  "fragment_to_row_group_interval_map",
                                  d.GetAllocator());
  json_utils::add_value_to_object(d, last_row_group_, "last_row_group", d.GetAllocator());
  json_utils::add_value_to_object(
      d, last_fragment_index_, "last_fragment_index", d.GetAllocator());
  json_utils::add_value_to_object(
      d, last_fragment_row_count_, "last_fragment_row_count", d.GetAllocator());
  json_utils::add_value_to_object(
      d, total_row_count_, "total_row_count", d.GetAllocator());
  json_utils::add_value_to_object(
      d, last_file_row_count_, "last_file_row_count", d.GetAllocator());
  return json_utils::write_to_string(d);
}

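A hedged round-trip sketch, assuming the serialized JSON is written to an illustrative path and later restored into a freshly constructed wrapper (restoreDataWrapperInternals expects an empty chunk_metadata_map_):

#include <fstream>
#include <string>

// Round-trip sketch; the state path is illustrative only.
void round_trip(const foreign_storage::ParquetDataWrapper& live_wrapper,
                foreign_storage::ParquetDataWrapper& fresh_wrapper,
                const ChunkMetadataVector& chunk_metadata_vector) {
  const std::string state_path = "/tmp/parquet_wrapper_state.json";  // hypothetical
  std::ofstream{state_path} << live_wrapper.getSerializedDataWrapper();

  // e.g. after a restart: rebuild fragment bookkeeping and chunk metadata.
  fresh_wrapper.restoreDataWrapperInternals(state_path, chunk_metadata_vector);
  CHECK(fresh_wrapper.isRestored());
}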

void foreign_storage::ParquetDataWrapper::initializeChunkBuffers ( const int  fragment_index,
const Interval< ColumnType > &  column_interval,
const ChunkToBufferMap required_buffers,
const bool  reserve_buffers_and_set_stats = false 
)
private

Definition at line 143 of file ParquetDataWrapper.cpp.

References CHECK, chunk_metadata_map_, db_id_, foreign_table_, shared::get_from_map(), getColumnsToInitialize(), kENCODING_NONE, and TableDescriptor::tableId.

Referenced by loadBuffersUsingLazyParquetChunkLoader().

{
  for (const auto column : getColumnsToInitialize(column_interval)) {
    Chunk_NS::Chunk chunk{column, false};
    ChunkKey data_chunk_key;
    if (column->columnType.is_varlen_indeed()) {
      data_chunk_key = {
          db_id_, foreign_table_->tableId, column->columnId, fragment_index, 1};
      CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
      auto data_buffer = shared::get_from_map(required_buffers, data_chunk_key);
      chunk.setBuffer(data_buffer);

      ChunkKey index_chunk_key{
          db_id_, foreign_table_->tableId, column->columnId, fragment_index, 2};
      CHECK(required_buffers.find(index_chunk_key) != required_buffers.end());
      auto index_buffer = shared::get_from_map(required_buffers, index_chunk_key);
      chunk.setIndexBuffer(index_buffer);
    } else {
      data_chunk_key = {
          db_id_, foreign_table_->tableId, column->columnId, fragment_index};
      CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
      auto data_buffer = shared::get_from_map(required_buffers, data_chunk_key);
      chunk.setBuffer(data_buffer);
    }
    chunk.initEncoder();
    if (reserve_buffers_and_set_stats) {
      const auto metadata_it = chunk_metadata_map_.find(data_chunk_key);
      CHECK(metadata_it != chunk_metadata_map_.end());
      auto buffer = chunk.getBuffer();
      auto& metadata = metadata_it->second;
      auto encoder = buffer->getEncoder();
      encoder->resetChunkStats(metadata->chunkStats);
      encoder->setNumElems(metadata->numElements);
      if ((column->columnType.is_string() &&
           column->columnType.get_compression() == kENCODING_NONE) ||
          column->columnType.is_geometry()) {
        // non-dictionary string or geometry WKT string
        auto index_buffer = chunk.getIndexBuf();
        index_buffer->reserve(sizeof(StringOffsetT) * (metadata->numElements + 1));
      } else if (!column->columnType.is_fixlen_array() && column->columnType.is_array()) {
        auto index_buffer = chunk.getIndexBuf();
        index_buffer->reserve(sizeof(ArrayOffsetT) * (metadata->numElements + 1));
      } else {
        size_t num_bytes_to_reserve =
            metadata->numElements * column->columnType.get_size();
        buffer->reserve(num_bytes_to_reserve);
      }
    }
  }
}


bool foreign_storage::ParquetDataWrapper::isNewFile ( const std::string &  file_path) const
private

Definition at line 215 of file ParquetDataWrapper.cpp.

References foreign_storage::AbstractFileStorageDataWrapper::allowFileRollOff(), CHECK, CHECK_EQ, foreign_table_, fragment_to_row_group_interval_map_, and last_fragment_index_.

Referenced by metadataScanRowGroupMetadata().

{
  const auto last_fragment_entry =
      fragment_to_row_group_interval_map_.find(last_fragment_index_);
  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());

  // The entry for the first fragment starts out as an empty vector
  if (last_fragment_entry->second.empty()) {
    // File roll off can result in empty older fragments.
    if (!allowFileRollOff(foreign_table_)) {
      CHECK_EQ(last_fragment_index_, 0);
    }
    return true;
  } else {
    return (last_fragment_entry->second.back().file_path != file_path);
  }
}


bool foreign_storage::ParquetDataWrapper::isRestored ( ) const
override virtual

Implements foreign_storage::ForeignDataWrapper.

Definition at line 731 of file ParquetDataWrapper.cpp.

References is_restored_.

{
  return is_restored_;
}
void foreign_storage::ParquetDataWrapper::loadBuffersUsingLazyParquetChunkLoader ( const int logical_column_id,
    const int fragment_id,
    const ChunkToBufferMap & required_buffers,
    AbstractBuffer * delete_buffer
)
private

Definition at line 487 of file ParquetDataWrapper.cpp.

References Data_Namespace::AbstractBuffer::append(), CHECK, CHECK_GT, chunk_metadata_map_, ColumnDescriptor::columnType, db_id_, delete_buffer_mutex_, foreign_storage::Interval< T >::end, file_reader_cache_, file_system_, foreign_table_, fragment_to_row_group_interval_map_, SQLTypeInfo::get_comp_param(), SQLTypeInfo::get_elem_type(), shared::get_from_map(), SQLTypeInfo::get_physical_cols(), Catalog_Namespace::SysCatalog::getCatalog(), Data_Namespace::AbstractBuffer::getMemoryPtr(), initializeChunkBuffers(), Catalog_Namespace::SysCatalog::instance(), SQLTypeInfo::is_array(), SQLTypeInfo::is_dict_encoded_string(), SQLTypeInfo::is_geometry(), foreign_storage::LazyParquetChunkLoader::loadChunk(), schema_, Data_Namespace::AbstractBuffer::size(), foreign_storage::Interval< T >::start, and TableDescriptor::tableId.

Referenced by populateChunkBuffers().

{
  const auto& row_group_intervals =
      shared::get_from_map(fragment_to_row_group_interval_map_, fragment_id);
  // File roll off can lead to an empty row group interval vector.
  if (row_group_intervals.empty()) {
    return;
  }

  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
  CHECK(catalog);
  const ColumnDescriptor* logical_column =
      schema_->getColumnDescriptor(logical_column_id);
  auto parquet_column_index = schema_->getParquetColumnIndex(logical_column_id);

  const Interval<ColumnType> column_interval = {
      logical_column_id,
      logical_column_id + logical_column->columnType.get_physical_cols()};
  initializeChunkBuffers(fragment_id, column_interval, required_buffers, true);

  const bool is_dictionary_encoded_string_column =
      logical_column->columnType.is_dict_encoded_string() ||
      (logical_column->columnType.is_array() &&
       logical_column->columnType.get_elem_type().is_dict_encoded_string());

  StringDictionary* string_dictionary = nullptr;
  if (is_dictionary_encoded_string_column) {
    auto dict_descriptor =
        catalog->getMetadataForDict(logical_column->columnType.get_comp_param(), true);
    CHECK(dict_descriptor);
    string_dictionary = dict_descriptor->stringDict.get();
  }

  std::list<Chunk_NS::Chunk> chunks;
  for (int column_id = column_interval.start; column_id <= column_interval.end;
       ++column_id) {
    auto column_descriptor = schema_->getColumnDescriptor(column_id);
    Chunk_NS::Chunk chunk{column_descriptor, false};
    if (column_descriptor->columnType.is_varlen_indeed()) {
      ChunkKey data_chunk_key = {
          db_id_, foreign_table_->tableId, column_id, fragment_id, 1};
      auto buffer = shared::get_from_map(required_buffers, data_chunk_key);
      chunk.setBuffer(buffer);
      ChunkKey index_chunk_key = {
          db_id_, foreign_table_->tableId, column_id, fragment_id, 2};
      auto index_buffer = shared::get_from_map(required_buffers, index_chunk_key);
      chunk.setIndexBuffer(index_buffer);
    } else {
      ChunkKey chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
      auto buffer = shared::get_from_map(required_buffers, chunk_key);
      chunk.setBuffer(buffer);
    }
    chunks.emplace_back(chunk);
  }

  std::unique_ptr<RejectedRowIndices> rejected_row_indices;
  if (delete_buffer) {
    rejected_row_indices = std::make_unique<RejectedRowIndices>();
  }
  LazyParquetChunkLoader chunk_loader(
      file_system_, file_reader_cache_.get(), foreign_table_);
  auto metadata = chunk_loader.loadChunk(row_group_intervals,
                                         parquet_column_index,
                                         chunks,
                                         string_dictionary,
                                         rejected_row_indices.get());

  if (delete_buffer) {
    // all modifying operations on `delete_buffer` must be synchronized as it is a
    // shared buffer
    std::unique_lock<std::mutex> delete_buffer_lock(delete_buffer_mutex_);

    CHECK(!chunks.empty());
    CHECK(chunks.begin()->getBuffer()->hasEncoder());
    auto num_rows_in_chunk = chunks.begin()->getBuffer()->getEncoder()->getNumElems();

    // ensure delete buffer is sized appropriately
    if (delete_buffer->size() < num_rows_in_chunk) {
      auto remaining_rows = num_rows_in_chunk - delete_buffer->size();
      std::vector<int8_t> data(remaining_rows, false);
      delete_buffer->append(data.data(), remaining_rows);
    }

    // compute a logical OR of the current `delete_buffer` contents and this chunk's
    // rejected indices
    CHECK(rejected_row_indices);
    auto delete_buffer_data = delete_buffer->getMemoryPtr();
    for (const auto& rejected_index : *rejected_row_indices) {
      CHECK_GT(delete_buffer->size(), static_cast<size_t>(rejected_index));
      delete_buffer_data[rejected_index] = true;
    }
  }

  auto metadata_iter = metadata.begin();
  for (int column_id = column_interval.start; column_id <= column_interval.end;
       ++column_id, ++metadata_iter) {
    auto column = schema_->getColumnDescriptor(column_id);
    ChunkKey data_chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
    if (column->columnType.is_varlen_indeed()) {
      data_chunk_key.emplace_back(1);
    }
    CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());

    // Allocate a new shared_ptr for metadata so we don't modify the old one, which may
    // be in use by the executor
    auto cached_metadata_previous =
        shared::get_from_map(chunk_metadata_map_, data_chunk_key);
    shared::get_from_map(chunk_metadata_map_, data_chunk_key) =
        std::make_shared<ChunkMetadata>();
    auto cached_metadata = shared::get_from_map(chunk_metadata_map_, data_chunk_key);
    *cached_metadata = *cached_metadata_previous;

    CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
    cached_metadata->numBytes =
        shared::get_from_map(required_buffers, data_chunk_key)->size();

    // for certain types, update the metadata statistics; this should update both the
    // cache and the internal chunk_metadata_map_
    if (is_dictionary_encoded_string_column || logical_column->columnType.is_geometry()) {
      CHECK(metadata_iter != metadata.end());
      cached_metadata->chunkStats = (*metadata_iter)->chunkStats;

      // Update stats on buffer so it is saved in cache
      shared::get_from_map(required_buffers, data_chunk_key)
          ->getEncoder()
          ->resetChunkStats(cached_metadata->chunkStats);
    }
  }
}

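The delete-buffer handling above follows a grow-then-OR pattern under a mutex. Below is a standalone sketch of the same pattern, using a plain std::vector in place of AbstractBuffer to make the merge semantics explicit:

#include <cstdint>
#include <mutex>
#include <set>
#include <vector>

// Standalone sketch of the delete-buffer merge: grow the shared bitmap to the
// chunk's row count, then OR in this chunk's rejected row indices under a lock.
void merge_rejected_rows(std::vector<int8_t>& delete_bitmap,
                         std::mutex& delete_mutex,
                         const size_t num_rows_in_chunk,
                         const std::set<size_t>& rejected_row_indices) {
  std::unique_lock<std::mutex> lock(delete_mutex);
  if (delete_bitmap.size() < num_rows_in_chunk) {
    delete_bitmap.resize(num_rows_in_chunk, false);  // appended rows start as kept
  }
  for (const auto rejected_index : rejected_row_indices) {
    delete_bitmap[rejected_index] = true;  // never clears previously deleted rows
  }
}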

void foreign_storage::ParquetDataWrapper::metadataScanFiles ( const std::vector< std::string > &  file_paths)
private

Definition at line 409 of file ParquetDataWrapper.cpp.

References getRowGroupMetadataForFilePaths(), and metadataScanRowGroupMetadata().

Referenced by fetchChunkMetadata().

{
  auto row_group_metadata = getRowGroupMetadataForFilePaths(file_paths);
  metadataScanRowGroupMetadata(row_group_metadata);
}


void foreign_storage::ParquetDataWrapper::metadataScanRowGroupIntervals ( const std::vector< RowGroupInterval > &  row_group_intervals)
private

Definition at line 824 of file ParquetDataWrapper.cpp.

References shared::get_from_map(), getRowGroupMetadataMap(), and metadataScanRowGroupMetadata().

Referenced by removeMetadataForLastFile().

{
  std::vector<std::string> file_paths;
  for (const auto& row_group_interval : row_group_intervals) {
    file_paths.emplace_back(row_group_interval.file_path);
  }
  auto row_group_metadata_map = getRowGroupMetadataMap(file_paths);
  std::list<RowGroupMetadata> row_group_metadata;
  for (const auto& row_group_interval : row_group_intervals) {
    for (auto row_group = row_group_interval.start_index;
         row_group <= row_group_interval.end_index;
         row_group++) {
      row_group_metadata.emplace_back(shared::get_from_map(
          row_group_metadata_map, {row_group_interval.file_path, row_group}));
    }
  }
  metadataScanRowGroupMetadata(row_group_metadata);
}


void foreign_storage::ParquetDataWrapper::metadataScanRowGroupMetadata ( const std::list< RowGroupMetadata > &  row_group_metadata)
private

Definition at line 414 of file ParquetDataWrapper.cpp.

References addNewFile(), addNewFragment(), CHECK_EQ, finalizeFragmentMap(), isNewFile(), last_fragment_index_, last_fragment_row_count_, last_row_group_, moveToNextFragment(), schema_, total_row_count_, and updateChunkMetadataForFragment().

Referenced by metadataScanFiles(), and metadataScanRowGroupIntervals().

{
  auto column_interval =
      Interval<ColumnType>{schema_->getLogicalAndPhysicalColumns().front()->columnId,
                           schema_->getLogicalAndPhysicalColumns().back()->columnId};
  for (const auto& row_group_metadata_item : row_group_metadata) {
    const auto& column_chunk_metadata = row_group_metadata_item.column_chunk_metadata;
    auto column_chunk_metadata_iter = column_chunk_metadata.begin();
    const auto import_row_count = (*column_chunk_metadata_iter)->numElements;
    auto row_group = row_group_metadata_item.row_group_index;
    const auto& file_path = row_group_metadata_item.file_path;
    if (moveToNextFragment(import_row_count)) {
      addNewFragment(row_group, file_path);
    } else if (isNewFile(file_path)) {
      CHECK_EQ(row_group, 0);
      addNewFile(file_path);
    }
    last_row_group_ = row_group;
    updateChunkMetadataForFragment(
        column_interval, column_chunk_metadata, last_fragment_index_);

    last_fragment_row_count_ += import_row_count;
    total_row_count_ += import_row_count;
  }
  finalizeFragmentMap();
}


bool foreign_storage::ParquetDataWrapper::moveToNextFragment ( size_t  new_rows_count) const
private

Definition at line 474 of file ParquetDataWrapper.cpp.

References foreign_table_, last_fragment_row_count_, and TableDescriptor::maxFragRows.

Referenced by metadataScanRowGroupMetadata().

{
  return (last_fragment_row_count_ + new_rows_count) >
         static_cast<size_t>(foreign_table_->maxFragRows);
}

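This predicate is the only fragment-boundary rule used by the metadata scan: a row group opens a new fragment as soon as its rows would push the current fragment past maxFragRows. A standalone sketch of the resulting packing:

#include <cstddef>
#include <vector>

// Standalone sketch: assign row groups to fragments the way the metadata scan
// does, mirroring moveToNextFragment()'s overflow check verbatim.
std::vector<int> assign_fragments(const std::vector<size_t>& row_group_row_counts,
                                  const size_t max_frag_rows) {
  std::vector<int> fragment_per_row_group;
  int fragment_id = 0;
  size_t rows_in_fragment = 0;
  for (const auto row_count : row_group_row_counts) {
    if (rows_in_fragment + row_count > max_frag_rows) {
      ++fragment_id;  // moveToNextFragment() returned true
      rows_in_fragment = 0;
    }
    fragment_per_row_group.push_back(fragment_id);
    rows_in_fragment += row_count;
  }
  return fragment_per_row_group;
}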

void foreign_storage::ParquetDataWrapper::populateChunkBuffers ( const ChunkToBufferMap & required_buffers,
    const ChunkToBufferMap & optional_buffers,
    AbstractBuffer * delete_buffer
)
override virtual

Populates given chunk buffers identified by chunk keys. All provided chunk buffers are expected to be for the same fragment.

Parameters
required_buffers - chunk buffers that must always be populated
optional_buffers - chunk buffers that can be optionally populated, if the data wrapper has to scan through chunk data anyway (typically for row-wise data formats)
delete_buffer - chunk buffer for the fragment's delete column; if non-null, the data wrapper is expected to mark deleted rows in the buffer and continue processing

Implements foreign_storage::ForeignDataWrapper.

Definition at line 620 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, CHUNK_KEY_COLUMN_IDX, CHUNK_KEY_FRAGMENT_IDX, foreign_storage::create_futures_for_workers(), db_id_, DEBUG_TIMER_NEW_THREAD, foreign_table_, foreign_storage::get_num_threads(), loadBuffersUsingLazyParquetChunkLoader(), schema_, logger::ThreadLocalIds::setNewThreadId(), TableDescriptor::tableId, logger::ThreadLocalIds::thread_id_, logger::thread_local_ids(), to_string(), and VLOG.

{
  ChunkToBufferMap buffers_to_load;
  buffers_to_load.insert(required_buffers.begin(), required_buffers.end());
  buffers_to_load.insert(optional_buffers.begin(), optional_buffers.end());

  CHECK(!buffers_to_load.empty());

  std::set<ForeignStorageMgr::ParallelismHint> col_frag_hints;
  for (const auto& [chunk_key, buffer] : buffers_to_load) {
    CHECK_EQ(buffer->size(), static_cast<size_t>(0));
    col_frag_hints.emplace(
        schema_->getLogicalColumn(chunk_key[CHUNK_KEY_COLUMN_IDX])->columnId,
        chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
  }

  const logger::ThreadLocalIds parent_thread_local_ids = logger::thread_local_ids();

  std::function<void(const std::set<ForeignStorageMgr::ParallelismHint>&)> lambda =
      [&, this](const std::set<ForeignStorageMgr::ParallelismHint>& hint_set) {
        // Enable debug timers
        logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
        DEBUG_TIMER_NEW_THREAD(parent_thread_local_ids.thread_id_);

        for (const auto& [col_id, frag_id] : hint_set) {
          loadBuffersUsingLazyParquetChunkLoader(
              col_id, frag_id, buffers_to_load, delete_buffer);
          VLOG(1) << "Loaded key " << db_id_ << "," << foreign_table_->tableId << ","
                  << col_id << "," << frag_id;
        }
      };

  const auto num_threads = get_num_threads(*foreign_table_);

  VLOG(1) << "Populating chunk from parquet source using " + std::to_string(num_threads) +
                 " threads.";
  auto futures = create_futures_for_workers(col_frag_hints, num_threads, lambda);

  // We wait on all futures, then call get because we want all threads to have finished
  // before we propagate a potential exception.
  for (auto& future : futures) {
    future.wait();
  }

  for (auto& future : futures) {
    future.get();
  }
}


void foreign_storage::ParquetDataWrapper::populateChunkMetadata ( ChunkMetadataVector chunk_metadata_vector)
override virtual

Populates given chunk metadata vector with metadata for all chunks in related foreign table.

Parameters
chunk_metadata_vector - vector that will be populated with chunk metadata

Implements foreign_storage::ForeignDataWrapper.

Definition at line 479 of file ParquetDataWrapper.cpp.

References chunk_metadata_map_, and fetchChunkMetadata().

{
  fetchChunkMetadata();
  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
    chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
  }
}

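A short consumption sketch, assuming an already-constructed wrapper; the ChunkKey layout and the VLOG/CHUNK_KEY_* macros are as referenced elsewhere on this page.

// Sketch: run a metadata scan and summarize what it produced.
void log_chunk_summary(foreign_storage::ParquetDataWrapper& wrapper) {
  ChunkMetadataVector chunk_metadata;
  wrapper.populateChunkMetadata(chunk_metadata);
  for (const auto& [chunk_key, metadata] : chunk_metadata) {
    VLOG(1) << "column " << chunk_key[CHUNK_KEY_COLUMN_IDX] << ", fragment "
            << chunk_key[CHUNK_KEY_FRAGMENT_IDX] << ": " << metadata->numElements
            << " rows, " << metadata->numBytes << " bytes";
  }
}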

void foreign_storage::ParquetDataWrapper::removeMetadataForLastFile ( const std::string &  last_file_path)
private

Definition at line 746 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, CHUNK_KEY_FRAGMENT_IDX, chunk_metadata_map_, fragment_to_row_group_interval_map_, shared::get_from_map(), last_fragment_index_, last_fragment_row_count_, last_row_group_, metadataScanRowGroupIntervals(), resetParquetMetadata(), and total_row_count_.

Referenced by fetchChunkMetadata().

{
  std::optional<int32_t> first_deleted_fragment_id;
  for (auto it = fragment_to_row_group_interval_map_.begin();
       it != fragment_to_row_group_interval_map_.end();) {
    const auto fragment_id = it->first;
    const auto& row_group_intervals = it->second;
    for (const auto& row_group_interval : row_group_intervals) {
      if (first_deleted_fragment_id.has_value()) {
        // All subsequent fragments should map to the last file.
        CHECK_EQ(last_file_path, row_group_interval.file_path);
      } else if (last_file_path == row_group_interval.file_path) {
        first_deleted_fragment_id = fragment_id;
      }
    }
    if (first_deleted_fragment_id.has_value() &&
        first_deleted_fragment_id.value() < fragment_id) {
      it = fragment_to_row_group_interval_map_.erase(it);
    } else {
      it++;
    }
  }
  CHECK(first_deleted_fragment_id.has_value());

  std::map<int32_t, size_t> remaining_fragments_row_counts;
  for (auto it = chunk_metadata_map_.begin(); it != chunk_metadata_map_.end();) {
    auto fragment_id = it->first[CHUNK_KEY_FRAGMENT_IDX];
    if (fragment_id >= first_deleted_fragment_id.value()) {
      it = chunk_metadata_map_.erase(it);
    } else {
      auto fragment_count_it = remaining_fragments_row_counts.find(fragment_id);
      if (fragment_count_it == remaining_fragments_row_counts.end()) {
        remaining_fragments_row_counts[fragment_id] = it->second->numElements;
      } else {
        CHECK_EQ(remaining_fragments_row_counts[fragment_id], it->second->numElements);
      }
      it++;
    }
  }

  total_row_count_ = 0;
  for (const auto [fragment_id, row_count] : remaining_fragments_row_counts) {
    total_row_count_ += row_count;
  }

  // Re-populate metadata for last fragment with deleted rows, excluding metadata for the
  // last file.
  auto row_group_intervals_to_scan = shared::get_from_map(
      fragment_to_row_group_interval_map_, first_deleted_fragment_id.value());
  auto it = std::find_if(row_group_intervals_to_scan.begin(),
                         row_group_intervals_to_scan.end(),
                         [&last_file_path](const auto& row_group_interval) {
                           return row_group_interval.file_path == last_file_path;
                         });
  CHECK(it != row_group_intervals_to_scan.end());
  row_group_intervals_to_scan.erase(it, row_group_intervals_to_scan.end());

  if (first_deleted_fragment_id.value() > 0) {
    last_fragment_index_ = first_deleted_fragment_id.value() - 1;
    last_fragment_row_count_ =
        shared::get_from_map(remaining_fragments_row_counts, last_fragment_index_);
    const auto& last_row_group_intervals =
        shared::get_from_map(fragment_to_row_group_interval_map_, last_fragment_index_);
    if (last_row_group_intervals.empty()) {
      last_row_group_ = 0;
    } else {
      last_row_group_ = last_row_group_intervals.back().end_index;
    }
    fragment_to_row_group_interval_map_.erase(first_deleted_fragment_id.value());
  } else {
    CHECK_EQ(total_row_count_, size_t(0));
    resetParquetMetadata();
  }

  if (!row_group_intervals_to_scan.empty()) {
    metadataScanRowGroupIntervals(row_group_intervals_to_scan);
  }
}


void foreign_storage::ParquetDataWrapper::resetParquetMetadata ( )
private

Definition at line 114 of file ParquetDataWrapper.cpp.

References file_reader_cache_, fragment_to_row_group_interval_map_, last_file_row_count_, last_fragment_index_, last_fragment_row_count_, last_row_group_, and total_row_count_.

Referenced by fetchChunkMetadata(), and removeMetadataForLastFile().


void foreign_storage::ParquetDataWrapper::restoreDataWrapperInternals ( const std::string &  file_path,
const ChunkMetadataVector chunk_metadata 
)
override virtual

Restore the internal state of the data wrapper.

Parameters
file_path - location of the file created by serializeMetadata
chunk_metadata_vector - vector of chunk metadata recovered from disk

Implements foreign_storage::ForeignDataWrapper.

Definition at line 707 of file ParquetDataWrapper.cpp.

References CHECK, chunk_metadata_map_, fragment_to_row_group_interval_map_, json_utils::get_value_from_object(), is_restored_, last_file_row_count_, last_fragment_index_, last_fragment_row_count_, last_row_group_, json_utils::read_from_file(), and total_row_count_.

{
  auto d = json_utils::read_from_file(file_path);
  CHECK(d.IsObject());

  json_utils::get_value_from_object(
      d, fragment_to_row_group_interval_map_, "fragment_to_row_group_interval_map");
  json_utils::get_value_from_object(d, last_row_group_, "last_row_group");
  json_utils::get_value_from_object(d, last_fragment_index_, "last_fragment_index");
  json_utils::get_value_from_object(
      d, last_fragment_row_count_, "last_fragment_row_count");
  json_utils::get_value_from_object(d, total_row_count_, "total_row_count");
  if (d.HasMember("last_file_row_count")) {
    json_utils::get_value_from_object(d, last_file_row_count_, "last_file_row_count");
  }

  CHECK(chunk_metadata_map_.empty());
  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_vector) {
    chunk_metadata_map_[chunk_key] = chunk_metadata;
  }
  is_restored_ = true;
}


void foreign_storage::ParquetDataWrapper::setLastFileRowCount ( const std::string &  file_path)
private

Definition at line 250 of file ParquetDataWrapper.cpp.

References file_reader_cache_, file_system_, and last_file_row_count_.

Referenced by addNewFile(), and addNewFragment().

{
  auto reader = file_reader_cache_->getOrInsert(file_path, file_system_);
  last_file_row_count_ = reader->parquet_reader()->metadata()->num_rows();
}


void foreign_storage::ParquetDataWrapper::updateChunkMetadataForFragment ( const Interval< ColumnType > &  column_interval,
const std::list< std::shared_ptr< ChunkMetadata >> &  column_chunk_metadata,
int32_t  fragment_id 
)
private

Definition at line 448 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, chunk_metadata_map_, db_id_, foreign_storage::Interval< T >::end, foreign_table_, foreign_storage::anonymous_namespace{ParquetDataWrapper.cpp}::reduce_metadata(), schema_, foreign_storage::Interval< T >::start, and TableDescriptor::tableId.

Referenced by metadataScanRowGroupMetadata(), and updateMetadataForRolledOffFiles().

{
  CHECK_EQ(static_cast<int>(column_chunk_metadata.size()),
           schema_->numLogicalAndPhysicalColumns());
  auto column_chunk_metadata_iter = column_chunk_metadata.begin();
  for (auto column_id = column_interval.start; column_id <= column_interval.end;
       column_id++, column_chunk_metadata_iter++) {
    CHECK(column_chunk_metadata_iter != column_chunk_metadata.end());
    const auto column_descriptor = schema_->getColumnDescriptor(column_id);
    const auto& type_info = column_descriptor->columnType;
    ChunkKey const data_chunk_key =
        type_info.is_varlen_indeed()
            ? ChunkKey{db_id_, foreign_table_->tableId, column_id, fragment_id, 1}
            : ChunkKey{db_id_, foreign_table_->tableId, column_id, fragment_id};
    std::shared_ptr<ChunkMetadata> chunk_metadata = *column_chunk_metadata_iter;

    if (chunk_metadata_map_.find(data_chunk_key) == chunk_metadata_map_.end()) {
      chunk_metadata_map_[data_chunk_key] = chunk_metadata;
    } else {
      reduce_metadata(chunk_metadata_map_[data_chunk_key], chunk_metadata);
    }
  }
}


void foreign_storage::ParquetDataWrapper::updateMetadataForRolledOffFiles ( const std::set< std::string > &  rolled_off_files)
private

Definition at line 313 of file ParquetDataWrapper.cpp.

References CHECK, CHUNK_KEY_FRAGMENT_IDX, chunk_metadata_map_, shared::contains(), fragment_to_row_group_interval_map_, shared::get_from_map(), getRowGroupMetadataMap(), schema_, and updateChunkMetadataForFragment().

Referenced by fetchChunkMetadata().

{
  if (!rolled_off_files.empty()) {
    std::set<int32_t> deleted_fragment_ids;
    std::optional<int32_t> partially_deleted_fragment_id;
    std::vector<std::string> remaining_files_in_partially_deleted_fragment;
    for (auto& [fragment_id, row_group_interval_vec] :
         fragment_to_row_group_interval_map_) {
      for (auto it = row_group_interval_vec.begin();
           it != row_group_interval_vec.end();) {
        if (shared::contains(rolled_off_files, it->file_path)) {
          it = row_group_interval_vec.erase(it);
        } else {
          remaining_files_in_partially_deleted_fragment.emplace_back(it->file_path);
          it++;
        }
      }
      if (row_group_interval_vec.empty()) {
        deleted_fragment_ids.emplace(fragment_id);
      } else {
        CHECK(!remaining_files_in_partially_deleted_fragment.empty());
        partially_deleted_fragment_id = fragment_id;
        break;
      }
    }

    for (auto it = chunk_metadata_map_.begin(); it != chunk_metadata_map_.end();) {
      const auto& chunk_key = it->first;
      if (shared::contains(deleted_fragment_ids, chunk_key[CHUNK_KEY_FRAGMENT_IDX])) {
        auto& chunk_metadata = it->second;
        chunk_metadata->numElements = 0;
        chunk_metadata->numBytes = 0;
        it++;
      } else if (partially_deleted_fragment_id.has_value() &&
                 chunk_key[CHUNK_KEY_FRAGMENT_IDX] == partially_deleted_fragment_id) {
        // Metadata for the partially deleted fragment will be re-populated.
        it = chunk_metadata_map_.erase(it);
      } else {
        it++;
      }
    }

    if (partially_deleted_fragment_id.has_value()) {
      // Create map of row group to row group metadata for remaining files in the
      // fragment.
      auto row_group_metadata_map =
          getRowGroupMetadataMap(remaining_files_in_partially_deleted_fragment);

      // Re-populate metadata for remaining row groups in partially deleted fragment.
      auto column_interval =
          Interval<ColumnType>{schema_->getLogicalAndPhysicalColumns().front()->columnId,
                               schema_->getLogicalAndPhysicalColumns().back()->columnId};
      auto row_group_intervals = shared::get_from_map(
          fragment_to_row_group_interval_map_, partially_deleted_fragment_id.value());
      for (const auto& row_group_interval : row_group_intervals) {
        for (auto row_group = row_group_interval.start_index;
             row_group <= row_group_interval.end_index;
             row_group++) {
          auto itr =
              row_group_metadata_map.find({row_group_interval.file_path, row_group});
          CHECK(itr != row_group_metadata_map.end());
          updateChunkMetadataForFragment(column_interval,
                                         itr->second.column_chunk_metadata,
                                         partially_deleted_fragment_id.value());
        }
      }
    }
  }
}


Member Data Documentation

std::mutex foreign_storage::ParquetDataWrapper::delete_buffer_mutex_
private

Definition at line 145 of file ParquetDataWrapper.h.

Referenced by loadBuffersUsingLazyParquetChunkLoader().

const bool foreign_storage::ParquetDataWrapper::do_metadata_stats_validation_
private

Definition at line 130 of file ParquetDataWrapper.h.

Referenced by getRowGroupMetadataForFilePaths().

std::unique_ptr<FileReaderMap> foreign_storage::ParquetDataWrapper::file_reader_cache_
private
std::shared_ptr<arrow::fs::FileSystem> foreign_storage::ParquetDataWrapper::file_system_
private
bool foreign_storage::ParquetDataWrapper::is_restored_
private

Definition at line 140 of file ParquetDataWrapper.h.

Referenced by isRestored(), and restoreDataWrapperInternals().

size_t foreign_storage::ParquetDataWrapper::last_file_row_count_
private
size_t foreign_storage::ParquetDataWrapper::last_fragment_row_count_
private
size_t foreign_storage::ParquetDataWrapper::total_row_count_
private

The documentation for this class was generated from the following files:

ParquetDataWrapper.h
ParquetDataWrapper.cpp