21 #include <arrow/filesystem/localfs.h>
22 #include <boost/filesystem.hpp>
34 namespace foreign_storage {
40 std::shared_ptr<arrow::fs::FileSystem> file_system)
81 num_row_groups_ = file_reader->parquet_reader()->metadata()->num_row_groups();
103 std::optional<Fragmenter_Namespace::InsertData>
getInsertData()
const override;
106 std::pair<std::map<int, Chunk_NS::Chunk>, std::map<int, StringDictionary*>>
111 const size_t num_rows_rejected);
124 const size_t num_rows_rejected) {
130 const std::map<int, Chunk_NS::Chunk>& chunks) {
132 size_t num_rows = chunks.begin()->second.getBuffer()->getEncoder()->getNumElems();
133 for (
const auto& [column_id, chunk] : chunks) {
134 auto column_descriptor = chunk.getColumnDesc();
135 CHECK(chunk.getBuffer()->getEncoder()->getNumElems() == num_rows);
137 auto buffer = chunk.getBuffer();
139 if (column_descriptor->columnType.is_array()) {
141 block_ptr.
arraysPtr = array_buffer->getBufferPtr();
142 }
else if ((column_descriptor->columnType.is_string() &&
143 !column_descriptor->columnType.is_dict_encoded_string()) ||
144 column_descriptor->columnType.is_geometry()) {
146 block_ptr.
stringsPtr = string_buffer->getBufferPtr();
148 block_ptr.
numbersPtr = buffer->getMemoryPtr();
158 std::pair<std::map<int, Chunk_NS::Chunk>, std::map<int, StringDictionary*>>
160 std::map<int, Chunk_NS::Chunk> chunks;
161 std::map<int, StringDictionary*> string_dictionaries;
165 const bool is_dictionary_encoded_string_column =
166 column_descriptor->columnType.is_dict_encoded_string() ||
167 (column_descriptor->columnType.is_array() &&
168 column_descriptor->columnType.get_elem_type().is_dict_encoded_string());
170 if (is_dictionary_encoded_string_column) {
171 auto dict_descriptor = catalog->getMetadataForDict(
172 column_descriptor->columnType.get_comp_param(),
true);
173 CHECK(dict_descriptor);
174 auto string_dictionary = dict_descriptor->stringDict.get();
175 string_dictionaries[column_descriptor->columnId] = string_dictionary;
180 if (column_descriptor->columnType.is_varlen_indeed()) {
181 chunk.setIndexBuffer(
nullptr);
184 chunks[column_descriptor->columnId] = chunk;
186 return {chunks, string_dictionaries};
192 : foreign_table_(foreign_table), db_id_(db_id), schema_(schema) {
194 if (column_descriptor->columnType.is_array()) {
196 std::make_unique<TypedParquetStorageBuffer<ArrayDatum>>();
197 }
else if ((column_descriptor->columnType.is_string() &&
198 !column_descriptor->columnType.is_dict_encoded_string()) ||
199 column_descriptor->columnType.is_geometry()) {
201 std::make_unique<TypedParquetStorageBuffer<std::string>>();
204 std::make_unique<ForeignStorageBuffer>();
219 return schema_->numLogicalColumns();
227 : db_id_(-1), foreign_table_(nullptr), num_threads_(1) {}
233 , foreign_table_(foreign_table)
239 file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
247 std::set<std::string> file_paths;
249 arrow::Result<arrow::fs::FileInfo> file_info_result;
250 arrow::Result<std::vector<arrow::fs::FileInfo>> selector_result;
252 auto get_file_info_timer =
DEBUG_TIMER(
"GetFileInfo-file_info");
253 file_info_result =
file_system_->GetFileInfo(file_path);
255 if (!file_info_result.ok()) {
258 auto& file_info = file_info_result.ValueOrDie();
259 if (file_info.type() == arrow::fs::FileType::NotFound) {
261 }
else if (file_info.type() == arrow::fs::FileType::File) {
262 file_paths.emplace(file_path);
264 CHECK_EQ(arrow::fs::FileType::Directory, file_info.type());
265 arrow::fs::FileSelector file_selector{};
266 file_selector.base_dir = file_path;
267 file_selector.recursive =
true;
269 auto get_file_info_timer =
DEBUG_TIMER(
"GetFileInfo-selector");
270 selector_result =
file_system_->GetFileInfo(file_selector);
272 if (!selector_result.ok()) {
275 auto& file_info_vector = selector_result.ValueOrDie();
276 for (
const auto& file_info : file_info_vector) {
277 if (file_info.type() == arrow::fs::FileType::File) {
278 file_paths.emplace(file_info.path());
302 std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>>
316 auto import_batch_result =
318 auto [chunks, string_dictionaries] = import_batch_result->getChunksAndDictionaries();
324 for (
const auto& [column_id, dict] : string_dictionaries) {
334 std::optional<RowGroupInterval> next_row_group;
339 size_t num_rows_completed, num_rows_rejected;
340 if (next_row_group.has_value()) {
341 std::tie(num_rows_completed, num_rows_rejected) = chunk_loader.loadRowGroups(
344 return import_batch_result;
348 import_batch_result->populateImportStatus(num_rows_completed, num_rows_rejected);
349 import_batch_result->populateInsertData(chunks);
351 return import_batch_result;
355 const std::string& file_path,
std::set< std::string > getAllFilePaths()
void advanceToNextRowGroup()
std::shared_mutex row_group_interval_tracker_mutex_
void populateImportStatus(const size_t num_rows_completed, const size_t num_rows_rejected)
std::pair< std::map< int, Chunk_NS::Chunk >, std::map< int, StringDictionary * > > getChunksAndDictionaries() const
std::shared_ptr< arrow::fs::FileSystem > file_system_
std::vector< std::string > * stringsPtr
void setNumThreads(const int num_threads)
std::unique_ptr< AbstractRowGroupIntervalTracker > row_group_interval_tracker_
std::vector< ArrayDatum > * arraysPtr
std::unique_ptr< ForeignTableSchema > schema_
static const std::string LOCAL_FILE_STORAGE_TYPE
void throw_file_access_error(const std::string &file_path, const std::string &message)
std::optional< Fragmenter_Namespace::InsertData > insert_data_
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata_vector) override
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
std::shared_mutex string_dictionaries_per_column_mutex_
std::optional< RowGroupInterval > getNextRowGroupInterval() override
std::vector< std::pair< const ColumnDescriptor *, StringDictionary * > > getStringDictionaries() const
void throw_file_not_found_error(const std::string &file_path)
void setBuffer(AbstractBuffer *b)
std::set< std::string > file_paths_
This file contains the class specification and related data structures for Catalog.
std::set< std::string >::const_iterator current_file_iter_
const ForeignTableSchema * schema_
static SysCatalog & instance()
static const std::string STORAGE_TYPE_KEY
bool isRestored() const override
const ReaderPtr getOrInsert(const std::string &path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
std::unique_lock< T > unique_lock
An AbstractBuffer is a unit of data management for a data manager.
FileReaderMap * file_reader_cache_
const std::list< const ColumnDescriptor * > & getLogicalAndPhysicalColumns() const
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
const ForeignTable * foreign_table_
std::map< int, std::unique_ptr< AbstractBuffer > > import_buffers_
std::optional< Fragmenter_Namespace::InsertData > getInsertData() const override
std::string getSerializedDataWrapper() const override
void populateInsertData(const std::map< int, Chunk_NS::Chunk > &chunks)
const ForeignTable * foreign_table_
RowGroupIntervalTracker(const std::set< std::string > &file_paths, FileReaderMap *file_reader_cache, std::shared_ptr< arrow::fs::FileSystem > file_system)
int current_row_group_index_
import_export::ImportStatus getImportStatus() const override
std::unique_ptr< FileReaderMap > file_reader_cache_
const ForeignServer * foreign_server
bool g_enable_watchdog false
#define DEBUG_TIMER(name)
The data to be inserted using the fragment manager.
int getMaxNumUsefulThreads() const
std::shared_ptr< arrow::fs::FileSystem > file_system_
std::unique_ptr< import_export::ImportBatchResult > getNextImportBatch()
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
ParquetImportBatchResult()=default
import_export::ImportStatus import_status_
std::vector< std::pair< const ColumnDescriptor *, StringDictionary * > > string_dictionaries_per_column_