#include <arrow/filesystem/localfs.h>
#include <boost/filesystem.hpp>

namespace foreign_storage {
void reduce_metadata(std::shared_ptr<ChunkMetadata> reduce_to,
                     std::shared_ptr<ChunkMetadata> reduce_from) {
  CHECK(reduce_to->sqlType == reduce_from->sqlType);
  reduce_to->numBytes += reduce_from->numBytes;
  reduce_to->numElements += reduce_from->numElements;
  reduce_to->chunkStats.has_nulls |= reduce_from->chunkStats.has_nulls;

  // For array columns, stats are tracked on the element type.
  auto column_type = reduce_to->sqlType;
  column_type = column_type.is_array() ? column_type.get_elem_type() : column_type;

  if (column_type.is_string() || column_type.is_geometry()) {
    // String and geometry columns have no comparable min/max stats; take the
    // incoming values as-is.
    reduce_to->chunkStats.max = reduce_from->chunkStats.max;
    reduce_to->chunkStats.min = reduce_from->chunkStats.min;
    return;
  }

  // ... (scratch encoders for the two stat sets are constructed here; elided
  // in this excerpt)
  encoder_to->reduceStats(*encoder_from);
  auto updated_metadata = std::make_shared<ChunkMetadata>();
  encoder_to->getMetadata(updated_metadata);
  reduce_to->chunkStats = updated_metadata->chunkStats;
}
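// Usage sketch (hypothetical caller, not part of this excerpt): when a later
// row group contributes to a chunk that already has cached metadata, the new
// metadata is folded into the cached entry rather than replacing it:
//
//   auto cached = chunk_metadata_map_[data_chunk_key];  // from prior row groups
//   reduce_metadata(cached, newly_scanned_metadata);
//   // cached->numElements and cached->chunkStats now cover all row groups.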
ParquetDataWrapper::ParquetDataWrapper()
    : do_metadata_stats_validation_(true), db_id_(-1), foreign_table_(nullptr) {}
ParquetDataWrapper::ParquetDataWrapper(
    const ForeignTable* foreign_table,
    std::shared_ptr<arrow::fs::FileSystem> file_system)
    : do_metadata_stats_validation_(false)
    , db_id_(-1)
    , foreign_table_(foreign_table)
    , last_fragment_index_(0)
    , last_fragment_row_count_(0)
    , total_row_count_(0)
    , last_file_row_count_(0)
    // ... (a few initializers elided in this excerpt)
    , file_system_(file_system) {}
ParquetDataWrapper::ParquetDataWrapper(const int db_id,
                                       const ForeignTable* foreign_table,
                                       const bool do_metadata_stats_validation)
    : do_metadata_stats_validation_(do_metadata_stats_validation)
    , db_id_(db_id)
    , foreign_table_(foreign_table)
    , last_fragment_index_(0)
    , last_fragment_row_count_(0)
    , total_row_count_(0)
    , last_file_row_count_(0)
    , is_restored_(false)
    // ... (remaining initializers elided in this excerpt)
{
  // ... (server storage-type check elided in this excerpt)
  file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
}
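// Construction sketch (hypothetical values, not part of this excerpt): the
// three-argument constructor is the one exercised for normal table access;
// the file-system constructor serves injected-filesystem and preview paths.
//
//   ParquetDataWrapper wrapper(
//       db_id, foreign_table, /*do_metadata_stats_validation=*/true);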
std::list<const ColumnDescriptor*> ParquetDataWrapper::getColumnsToInitialize(
    const Interval<ColumnType>& column_interval) {
  const auto& columns = schema_->getLogicalAndPhysicalColumns();
  auto column_start = column_interval.start;
  auto column_end = column_interval.end;
  std::list<const ColumnDescriptor*> columns_to_init;
  for (const auto column : columns) {
    auto column_id = column->columnId;
    if (column_id >= column_start && column_id <= column_end) {
      columns_to_init.push_back(column);
    }
  }
  return columns_to_init;
}
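// Example (hypothetical ids): the filter is a simple inclusive range over
// column ids, so an interval of [2, 4] on a table with columns 1..5 yields
// the descriptors for columns 2, 3, and 4, logical and physical alike:
//
//   Interval<ColumnType> interval{2, 4};
//   auto cols = getColumnsToInitialize(interval);  // descriptors for ids 2..4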
void ParquetDataWrapper::initializeChunkBuffers(
    const int fragment_index,
    const Interval<ColumnType>& column_interval,
    const ChunkToBufferMap& required_buffers,
    const bool reserve_buffers_and_set_stats) {
  for (const auto column : getColumnsToInitialize(column_interval)) {
    Chunk_NS::Chunk chunk{column, false};
    ChunkKey data_chunk_key;
    if (column->columnType.is_varlen_indeed()) {
      // Variable-length columns have a data chunk (key suffix 1) and an
      // index chunk (key suffix 2).
      data_chunk_key = {
          db_id_, foreign_table_->tableId, column->columnId, fragment_index, 1};
      CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
      auto data_buffer = required_buffers.at(data_chunk_key);
      chunk.setBuffer(data_buffer);

      ChunkKey index_chunk_key{
          db_id_, foreign_table_->tableId, column->columnId, fragment_index, 2};
      CHECK(required_buffers.find(index_chunk_key) != required_buffers.end());
      auto index_buffer = required_buffers.at(index_chunk_key);
      chunk.setIndexBuffer(index_buffer);
    } else {
      data_chunk_key = {
          db_id_, foreign_table_->tableId, column->columnId, fragment_index};
      CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
      auto data_buffer = required_buffers.at(data_chunk_key);
      chunk.setBuffer(data_buffer);
    }
    chunk.initEncoder();
    if (reserve_buffers_and_set_stats) {
      const auto metadata_it = chunk_metadata_map_.find(data_chunk_key);
      CHECK(metadata_it != chunk_metadata_map_.end());
      auto buffer = chunk.getBuffer();
      auto& metadata = metadata_it->second;
      auto encoder = buffer->getEncoder();
      encoder->resetChunkStats(metadata->chunkStats);
      encoder->setNumElems(metadata->numElements);
      if ((column->columnType.is_string() &&
           !column->columnType.is_dict_encoded_string()) ||
          column->columnType.is_geometry()) {
        // Reserve one offset per element plus a terminal offset.
        auto index_buffer = chunk.getIndexBuf();
        index_buffer->reserve(sizeof(StringOffsetT) * (metadata->numElements + 1));
      } else if (!column->columnType.is_fixlen_array() &&
                 column->columnType.is_array()) {
        auto index_buffer = chunk.getIndexBuf();
        index_buffer->reserve(sizeof(ArrayOffsetT) * (metadata->numElements + 1));
      } else {
        size_t num_bytes_to_reserve =
            metadata->numElements * column->columnType.get_size();
        buffer->reserve(num_bytes_to_reserve);
      }
    }
  }
}
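// Chunk key layout sketch (values hypothetical): a varlen column's data and
// index buffers are distinguished by a fifth key component, while
// fixed-length columns use the bare four-component key:
//
//   ChunkKey data_key{db_id, table_id, column_id, fragment_id, 1};   // data
//   ChunkKey index_key{db_id, table_id, column_id, fragment_id, 2};  // offsets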
void ParquetDataWrapper::addNewFragment(int row_group, const std::string& file_path) {
  const auto last_fragment_entry =
      fragment_to_row_group_interval_map_.find(last_fragment_index_);
  // ... (close out the previous fragment's interval and start a new one;
  // elided in this excerpt)
}

bool ParquetDataWrapper::isNewFile(const std::string& file_path) const {
  const auto last_fragment_entry =
      fragment_to_row_group_interval_map_.find(last_fragment_index_);
  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());

  // The entry for the first fragment starts out as an empty vector.
  if (last_fragment_entry->second.empty()) {
    CHECK_EQ(last_fragment_index_, 0);
    return true;
  }
  return (last_fragment_entry->second.back().file_path != file_path);
}

void ParquetDataWrapper::addNewFile(const std::string& file_path) {
  const auto last_fragment_entry =
      fragment_to_row_group_interval_map_.find(last_fragment_index_);
  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());

  if (last_fragment_entry->second.empty()) {
    CHECK_EQ(last_fragment_index_, 0);
  }
  // ... (append a new row group interval for the file; elided in this excerpt)
}
void ParquetDataWrapper::fetchChunkMetadata() {
  // Excerpt note: the append-mode reconciliation path is shown; the
  // initial-scan branch is elided.
  std::vector<std::string> new_file_paths;
  auto processed_file_paths = getOrderedProcessedFilePaths();
  const auto all_file_paths = getAllFilePaths();
  const auto rolled_off_files =
      check_for_rolled_off_file_paths(all_file_paths, processed_file_paths);
  updateMetadataForRolledOffFiles(rolled_off_files);
  for (const auto& file_path : processed_file_paths) {
    if (!shared::contains(all_file_paths, file_path)) {
      throw_removed_file_error(file_path);
    }
  }
  if (!processed_file_paths.empty()) {
    // An append can only occur to the last file that was processed.
    if (all_file_paths.size() == 1) {
      CHECK_EQ(processed_file_paths.size(), size_t(1));
      CHECK_EQ(processed_file_paths[0], all_file_paths[0]);
    }
    const auto& last_file_path = processed_file_paths.back();
    auto reader = file_reader_cache_->getOrInsert(last_file_path, file_system_);
    size_t row_count = reader->parquet_reader()->metadata()->num_rows();
    if (row_count < last_file_row_count_) {
      throw_removed_row_in_file_error(last_file_path);
    } else if (row_count > last_file_row_count_) {
      removeMetadataForLastFile(last_file_path);
      new_file_paths.emplace_back(last_file_path);
    }
  }
  for (const auto& file_path : all_file_paths) {
    if (isNewFile(file_path)) {
      new_file_paths.emplace_back(file_path);
    }
  }
  if (!new_file_paths.empty()) {
    metadataScanFiles(new_file_paths);
  }
}
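// Append-mode sketch (hypothetical timeline): if t0.parquet was fully
// processed and later grows from 100 to 150 rows, the next fetch compares
// the on-disk row count (150) against last_file_row_count_ (100), drops the
// stale metadata for t0.parquet, and rescans it alongside any files not seen
// before; a shrinking file instead raises a removed-rows error.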
void ParquetDataWrapper::updateMetadataForRolledOffFiles(
    const std::set<std::string>& rolled_off_files) {
  if (!rolled_off_files.empty()) {
    std::set<int32_t> deleted_fragment_ids;
    std::optional<int32_t> partially_deleted_fragment_id;
    std::vector<std::string> remaining_files_in_partially_deleted_fragment;
    for (auto& [fragment_id, row_group_interval_vec] :
         fragment_to_row_group_interval_map_) {
      for (auto it = row_group_interval_vec.begin();
           it != row_group_interval_vec.end();) {
        if (shared::contains(rolled_off_files, it->file_path)) {
          it = row_group_interval_vec.erase(it);
        } else {
          remaining_files_in_partially_deleted_fragment.emplace_back(it->file_path);
          it++;
        }
      }
      if (row_group_interval_vec.empty()) {
        deleted_fragment_ids.emplace(fragment_id);
      } else {
        CHECK(!remaining_files_in_partially_deleted_fragment.empty());
        partially_deleted_fragment_id = fragment_id;
        // ... (only the first surviving fragment can be partially deleted;
        // elided in this excerpt)
      }
    }

    for (auto it = chunk_metadata_map_.begin(); it != chunk_metadata_map_.end();) {
      const auto& chunk_key = it->first;
      if (shared::contains(deleted_fragment_ids, chunk_key[CHUNK_KEY_FRAGMENT_IDX])) {
        // Fully rolled-off fragments keep their metadata entries, zeroed out.
        auto& chunk_metadata = it->second;
        chunk_metadata->numElements = 0;
        chunk_metadata->numBytes = 0;
        it++;
      } else if (partially_deleted_fragment_id.has_value() &&
                 chunk_key[CHUNK_KEY_FRAGMENT_IDX] ==
                     partially_deleted_fragment_id.value()) {
        // Metadata for a partially deleted fragment is rebuilt below.
        it = chunk_metadata_map_.erase(it);
      } else {
        it++;
      }
    }

    if (partially_deleted_fragment_id.has_value()) {
      auto row_group_metadata_map =
          getRowGroupMetadataMap(remaining_files_in_partially_deleted_fragment);
      auto column_interval =
          Interval<ColumnType>{schema_->getLogicalAndPhysicalColumns().front()->columnId,
                               schema_->getLogicalAndPhysicalColumns().back()->columnId};
      const auto& row_group_intervals = shared::get_from_map(
          fragment_to_row_group_interval_map_, partially_deleted_fragment_id.value());
      for (const auto& row_group_interval : row_group_intervals) {
        for (auto row_group = row_group_interval.start_index;
             row_group <= row_group_interval.end_index;
             row_group++) {
          auto itr =
              row_group_metadata_map.find({row_group_interval.file_path, row_group});
          CHECK(itr != row_group_metadata_map.end());
          updateChunkMetadataForFragment(column_interval,
                                         itr->second.column_chunk_metadata,
                                         partially_deleted_fragment_id.value());
        }
      }
    }
  }
}
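// Roll-off sketch (hypothetical layout): with fragments {0: [a.parquet],
// 1: [a.parquet, b.parquet], 2: [b.parquet]} and a.parquet rolled off,
// fragment 0 is zeroed out, fragment 1 is partially deleted and its metadata
// rebuilt from b.parquet's remaining row groups, and fragment 2 is untouched.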
std::vector<std::string> ParquetDataWrapper::getOrderedProcessedFilePaths() {
  std::vector<std::string> file_paths;
  for (const auto& entry : fragment_to_row_group_interval_map_) {
    for (const auto& row_group_interval : entry.second) {
      if (file_paths.empty() || file_paths.back() != row_group_interval.file_path) {
        file_paths.emplace_back(row_group_interval.file_path);
      }
    }
  }
  return file_paths;
}

std::vector<std::string> ParquetDataWrapper::getAllFilePaths() {
  std::vector<std::string> found_file_paths;
  auto file_path = getFullFilePath(foreign_table_);
  const auto file_path_options = getFilePathOptions(foreign_table_);
  // ... (local files are globbed, filtered, and sorted via
  // shared::local_glob_filter_sort_files; elided in this excerpt)
  return found_file_paths;
}
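// Example (hypothetical): a table pointing at /data/events/ with files named
// events_000.parquet, events_001.parquet, ... returns the glob results
// filtered and sorted, so repeated scans observe a stable file order, which
// the append reconciliation above relies on.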
void ParquetDataWrapper::metadataScanRowGroupMetadata(
    const std::list<RowGroupMetadata>& row_group_metadata) {
  auto column_interval =
      Interval<ColumnType>{schema_->getLogicalAndPhysicalColumns().front()->columnId,
                           schema_->getLogicalAndPhysicalColumns().back()->columnId};
  for (const auto& row_group_metadata_item : row_group_metadata) {
    const auto& column_chunk_metadata = row_group_metadata_item.column_chunk_metadata;
    auto column_chunk_metadata_iter = column_chunk_metadata.begin();
    const auto import_row_count = (*column_chunk_metadata_iter)->numElements;
    auto row_group = row_group_metadata_item.row_group_index;
    const auto& file_path = row_group_metadata_item.file_path;
    // ... (start a new fragment or file as needed, then fold this row
    // group's metadata into the fragment; elided in this excerpt)
  }
}
std::list<RowGroupMetadata> ParquetDataWrapper::getRowGroupMetadataForFilePaths(
    const std::vector<std::string>& file_paths) const {
  // ... (LazyParquetChunkLoader construction elided in this excerpt)
  return chunk_loader.metadataScan(file_paths, *schema_, do_metadata_stats_validation_);
}
void ParquetDataWrapper::updateChunkMetadataForFragment(
    const Interval<ColumnType>& column_interval,
    const std::list<std::shared_ptr<ChunkMetadata>>& column_chunk_metadata,
    int32_t fragment_id) {
  CHECK_EQ(static_cast<int>(column_chunk_metadata.size()),
           schema_->numLogicalAndPhysicalColumns());
  auto column_chunk_metadata_iter = column_chunk_metadata.begin();
  for (auto column_id = column_interval.start; column_id <= column_interval.end;
       column_id++, column_chunk_metadata_iter++) {
    CHECK(column_chunk_metadata_iter != column_chunk_metadata.end());
    const auto column_descriptor = schema_->getColumnDescriptor(column_id);
    const auto& type_info = column_descriptor->columnType;
    // Varlen columns key their metadata on the data chunk (suffix 1).
    const ChunkKey data_chunk_key =
        type_info.is_varlen_indeed()
            ? ChunkKey{db_id_, foreign_table_->tableId, column_id, fragment_id, 1}
            : ChunkKey{db_id_, foreign_table_->tableId, column_id, fragment_id};
    std::shared_ptr<ChunkMetadata> chunk_metadata = *column_chunk_metadata_iter;
    // ... (insert new metadata or fold into the existing entry via
    // reduce_metadata; elided in this excerpt)
  }
}

void ParquetDataWrapper::populateChunkMetadata(
    ChunkMetadataVector& chunk_metadata_vector) {
  fetchChunkMetadata();
  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
    chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
  }
}
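// Usage sketch (hypothetical caller): the storage layer asks the wrapper for
// all chunk metadata when a table is first referenced:
//
//   ChunkMetadataVector metadata_vector;
//   wrapper.populateChunkMetadata(metadata_vector);
//   // metadata_vector now holds one (ChunkKey, ChunkMetadata) pair per chunk.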
void ParquetDataWrapper::loadBuffersUsingLazyParquetChunkLoader(
    const int logical_column_id,
    const int fragment_id,
    const ChunkToBufferMap& required_buffers,
    AbstractBuffer* delete_buffer) {
  const auto& row_group_intervals =
      shared::get_from_map(fragment_to_row_group_interval_map_, fragment_id);
  // File roll-off can leave a fragment with no row group intervals.
  if (row_group_intervals.empty()) {
    return;
  }

  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
  CHECK(catalog);
  const ColumnDescriptor* logical_column =
      schema_->getColumnDescriptor(logical_column_id);
  auto parquet_column_index = schema_->getParquetColumnIndex(logical_column_id);
  const Interval<ColumnType> column_interval = {
      logical_column_id,
      logical_column_id + logical_column->columnType.get_physical_cols()};
  initializeChunkBuffers(fragment_id, column_interval, required_buffers, true);

  const bool is_dictionary_encoded_string_column =
      logical_column->columnType.is_dict_encoded_string() ||
      (logical_column->columnType.is_array() &&
       logical_column->columnType.get_elem_type().is_dict_encoded_string());

  StringDictionary* string_dictionary = nullptr;
  if (is_dictionary_encoded_string_column) {
    auto dict_descriptor =
        catalog->getMetadataForDict(logical_column->columnType.get_comp_param(), true);
    CHECK(dict_descriptor);
    string_dictionary = dict_descriptor->stringDict.get();
  }

  std::list<Chunk_NS::Chunk> chunks;
  for (int column_id = column_interval.start; column_id <= column_interval.end;
       ++column_id) {
    auto column_descriptor = schema_->getColumnDescriptor(column_id);
    Chunk_NS::Chunk chunk{column_descriptor, false};
    if (column_descriptor->columnType.is_varlen_indeed()) {
      ChunkKey data_chunk_key = {
          db_id_, foreign_table_->tableId, column_id, fragment_id, 1};
      chunk.setBuffer(shared::get_from_map(required_buffers, data_chunk_key));
      ChunkKey index_chunk_key = {
          db_id_, foreign_table_->tableId, column_id, fragment_id, 2};
      chunk.setIndexBuffer(shared::get_from_map(required_buffers, index_chunk_key));
    } else {
      ChunkKey chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
      chunk.setBuffer(shared::get_from_map(required_buffers, chunk_key));
    }
    chunks.emplace_back(chunk);
  }

  std::unique_ptr<RejectedRowIndices> rejected_row_indices;
  if (delete_buffer) {
    rejected_row_indices = std::make_unique<RejectedRowIndices>();
  }
  // ... (LazyParquetChunkLoader construction elided in this excerpt)
  auto metadata = chunk_loader.loadChunk(row_group_intervals,
                                         parquet_column_index,
                                         chunks,
                                         string_dictionary,
                                         rejected_row_indices.get());

  if (delete_buffer) {
    // The delete buffer is shared; modifications must be synchronized.
    std::lock_guard<std::mutex> delete_buffer_lock(delete_buffer_mutex_);
    CHECK(!chunks.empty());
    CHECK(chunks.begin()->getBuffer()->hasEncoder());
    auto num_rows_in_chunk = chunks.begin()->getBuffer()->getEncoder()->getNumElems();

    // Grow the delete buffer to cover every row, defaulting to "not deleted".
    if (delete_buffer->size() < num_rows_in_chunk) {
      auto remaining_rows = num_rows_in_chunk - delete_buffer->size();
      std::vector<int8_t> data(remaining_rows, false);
      delete_buffer->append(data.data(), remaining_rows);
    }

    // Mark rejected rows as deleted.
    CHECK(rejected_row_indices);
    auto delete_buffer_data = delete_buffer->getMemoryPtr();
    for (const auto& rejected_index : *rejected_row_indices) {
      CHECK_GT(delete_buffer->size(), static_cast<size_t>(rejected_index));
      delete_buffer_data[rejected_index] = true;
    }
  }

  auto metadata_iter = metadata.begin();
  for (int column_id = column_interval.start; column_id <= column_interval.end;
       ++column_id, ++metadata_iter) {
    auto column = schema_->getColumnDescriptor(column_id);
    ChunkKey data_chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
    if (column->columnType.is_varlen_indeed()) {
      data_chunk_key.emplace_back(1);
    }
    // Replace the cached metadata entry with an updated copy.
    auto cached_metadata_previous =
        shared::get_from_map(chunk_metadata_map_, data_chunk_key);
    shared::get_from_map(chunk_metadata_map_, data_chunk_key) =
        std::make_shared<ChunkMetadata>();
    auto cached_metadata = shared::get_from_map(chunk_metadata_map_, data_chunk_key);
    *cached_metadata = *cached_metadata_previous;

    CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
    cached_metadata->numBytes =
        shared::get_from_map(required_buffers, data_chunk_key)->size();
    // ... (in the full implementation the stats refresh below is gated on
    // column type)
    CHECK(metadata_iter != metadata.end());
    cached_metadata->chunkStats = (*metadata_iter)->chunkStats;
    shared::get_from_map(required_buffers, data_chunk_key)
        ->getEncoder()
        ->resetChunkStats(cached_metadata->chunkStats);
  }
}
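// Delete-buffer sketch (hypothetical numbers): if a fragment has 1000 rows
// and rows {3, 17} fail Parquet-to-column coercion, the delete buffer is
// first grown to 1000 entries of 0 (kept), then entries 3 and 17 are set to
// 1 (deleted), so rejected rows are filtered from query results rather than
// aborting the load.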
void ParquetDataWrapper::populateChunkBuffers(const ChunkToBufferMap& required_buffers,
                                              const ChunkToBufferMap& optional_buffers,
                                              AbstractBuffer* delete_buffer) {
  ChunkToBufferMap buffers_to_load;
  buffers_to_load.insert(required_buffers.begin(), required_buffers.end());
  buffers_to_load.insert(optional_buffers.begin(), optional_buffers.end());

  CHECK(!buffers_to_load.empty());

  std::set<ForeignStorageMgr::ParallelismHint> col_frag_hints;
  for (const auto& [chunk_key, buffer] : buffers_to_load) {
    CHECK_EQ(buffer->size(), static_cast<size_t>(0));
    col_frag_hints.emplace(
        schema_->getLogicalColumn(chunk_key[CHUNK_KEY_COLUMN_IDX])->columnId,
        chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
  }

  std::function<void(const std::set<ForeignStorageMgr::ParallelismHint>&)> lambda =
      [&, this](const std::set<ForeignStorageMgr::ParallelismHint>& hint_set) {
        // ... (per-thread id scope guard and debug timer elided in this excerpt)
        for (const auto& [col_id, frag_id] : hint_set) {
          loadBuffersUsingLazyParquetChunkLoader(
              col_id, frag_id, buffers_to_load, delete_buffer);
          // VLOG(1) << ... << col_id << "," << frag_id;  (message elided in
          // this excerpt)
        }
      };

  auto num_threads = get_num_threads(*foreign_table_);
  VLOG(1) << "Populating chunk from parquet source using " +
                 std::to_string(num_threads) + " threads.";
  auto futures = create_futures_for_workers(col_frag_hints, num_threads, lambda);

  // Wait for all workers, then surface any exceptions via get().
  for (auto& future : futures) {
    future.wait();
  }
  for (auto& future : futures) {
    future.get();
  }
}
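// Parallelism sketch (hypothetical request): buffers for chunks
// (col 1, frag 0), (col 1, frag 1), and (col 2, frag 0) become three
// (column, fragment) hints; with num_threads == 2 the hints are partitioned
// across two futures, each running the lambda above over its share.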
void set_value(rapidjson::Value& json_val,
               const RowGroupInterval& value,
               rapidjson::Document::AllocatorType& allocator) {
  json_val.SetObject();
  // ... (file_path, start_index, and end_index fields serialized here;
  // elided in this excerpt)
}

void get_value(const rapidjson::Value& json_val, RowGroupInterval& value) {
  CHECK(json_val.IsObject());
  // ... (field extraction elided in this excerpt)
}

std::string ParquetDataWrapper::getSerializedDataWrapper() const {
  rapidjson::Document d;
  d.SetObject();
  json_utils::add_value_to_object(d,
                                  fragment_to_row_group_interval_map_,
                                  "fragment_to_row_group_interval_map",
                                  d.GetAllocator());
  // ... (remaining counters serialized here; elided in this excerpt)
  return json_utils::write_to_string(d);
}

void ParquetDataWrapper::restoreDataWrapperInternals(
    const std::string& file_path,
    const ChunkMetadataVector& chunk_metadata_vector) {
  auto d = json_utils::read_from_file(file_path);
  // ... (fragment map and counters restored here; elided in this excerpt)
  // The field may be absent in state serialized by older versions.
  if (d.HasMember("last_file_row_count")) {
    json_utils::get_value_from_object(d, last_file_row_count_, "last_file_row_count");
  }
  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_vector) {
    chunk_metadata_map_[chunk_key] = chunk_metadata;
  }
  is_restored_ = true;
}
DataPreview ParquetDataWrapper::getDataPreview(const size_t num_rows) {
  // ... (LazyParquetChunkLoader construction elided in this excerpt)
  auto file_paths = getAllFilePaths();
  if (file_paths.empty()) {
    // ... (throws: no files found for the table; elided in this excerpt)
  }
  return chunk_loader.previewFiles(file_paths, num_rows, *foreign_table_);
}
void ParquetDataWrapper::removeMetadataForLastFile(const std::string& last_file_path) {
  std::optional<int32_t> first_deleted_fragment_id;
  for (auto it = fragment_to_row_group_interval_map_.begin();
       it != fragment_to_row_group_interval_map_.end();) {
    const auto fragment_id = it->first;
    const auto& row_group_intervals = it->second;
    for (const auto& row_group_interval : row_group_intervals) {
      if (first_deleted_fragment_id.has_value()) {
        // Everything after the first deleted fragment must be from the last file.
        CHECK_EQ(last_file_path, row_group_interval.file_path);
      } else if (last_file_path == row_group_interval.file_path) {
        first_deleted_fragment_id = fragment_id;
      }
    }
    if (first_deleted_fragment_id.has_value() &&
        first_deleted_fragment_id.value() < fragment_id) {
      it = fragment_to_row_group_interval_map_.erase(it);
    } else {
      it++;
    }
  }
  CHECK(first_deleted_fragment_id.has_value());

  std::map<int32_t, size_t> remaining_fragments_row_counts;
  for (auto it = chunk_metadata_map_.begin(); it != chunk_metadata_map_.end();) {
    const auto fragment_id = it->first[CHUNK_KEY_FRAGMENT_IDX];
    if (fragment_id >= first_deleted_fragment_id.value()) {
      it = chunk_metadata_map_.erase(it);
    } else {
      auto fragment_count_it = remaining_fragments_row_counts.find(fragment_id);
      if (fragment_count_it == remaining_fragments_row_counts.end()) {
        remaining_fragments_row_counts[fragment_id] = it->second->numElements;
      } else {
        CHECK_EQ(remaining_fragments_row_counts[fragment_id], it->second->numElements);
      }
      it++;
    }
  }

  total_row_count_ = 0;
  for (const auto [fragment_id, row_count] : remaining_fragments_row_counts) {
    total_row_count_ += row_count;
  }

  // Row group intervals from other files in the first deleted fragment are
  // kept and rescanned below.
  auto row_group_intervals_to_scan = shared::get_from_map(
      fragment_to_row_group_interval_map_, first_deleted_fragment_id.value());
  auto it = std::find_if(row_group_intervals_to_scan.begin(),
                         row_group_intervals_to_scan.end(),
                         [&last_file_path](const auto& row_group_interval) {
                           return row_group_interval.file_path == last_file_path;
                         });
  CHECK(it != row_group_intervals_to_scan.end());
  row_group_intervals_to_scan.erase(it, row_group_intervals_to_scan.end());

  if (first_deleted_fragment_id.value() > 0) {
    last_fragment_index_ = first_deleted_fragment_id.value() - 1;
    const auto& last_row_group_intervals =
        shared::get_from_map(fragment_to_row_group_interval_map_, last_fragment_index_);
    if (last_row_group_intervals.empty()) {
      // ... (row group counters reset for an empty trailing fragment; elided)
    }
    // ... (otherwise the last row group index is restored; elided)
  } else {
    resetParquetMetadata();
  }

  if (!row_group_intervals_to_scan.empty()) {
    metadataScanRowGroupIntervals(row_group_intervals_to_scan);
  }
}
void ParquetDataWrapper::metadataScanRowGroupIntervals(
    const std::vector<RowGroupInterval>& row_group_intervals) {
  std::vector<std::string> file_paths;
  for (const auto& row_group_interval : row_group_intervals) {
    file_paths.emplace_back(row_group_interval.file_path);
  }
  auto row_group_metadata_map = getRowGroupMetadataMap(file_paths);
  std::list<RowGroupMetadata> row_group_metadata;
  for (const auto& row_group_interval : row_group_intervals) {
    for (auto row_group = row_group_interval.start_index;
         row_group <= row_group_interval.end_index;
         row_group++) {
      row_group_metadata.emplace_back(shared::get_from_map(
          row_group_metadata_map, {row_group_interval.file_path, row_group}));
    }
  }
  metadataScanRowGroupMetadata(row_group_metadata);
}
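// Example (hypothetical intervals): [{a.parquet, 0..2}, {b.parquet, 0..0}]
// expands to four (file, row group) lookups whose metadata is re-scanned in
// order, reproducing for just those row groups the bookkeeping that a full
// metadata scan of the files would have produced.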
std::map<FilePathAndRowGroup, RowGroupMetadata>
ParquetDataWrapper::getRowGroupMetadataMap(
    const std::vector<std::string>& file_paths) const {
  auto row_group_metadata = getRowGroupMetadataForFilePaths(file_paths);
  std::map<FilePathAndRowGroup, RowGroupMetadata> row_group_metadata_map;
  for (const auto& row_group_metadata_item : row_group_metadata) {
    row_group_metadata_map[{row_group_metadata_item.file_path,
                            row_group_metadata_item.row_group_index}] =
        row_group_metadata_item;
  }
  return row_group_metadata_map;
}
}  // namespace foreign_storage