OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetDataWrapper.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ParquetDataWrapper.h"
18 
19 #include <queue>
20 
21 #include <arrow/filesystem/localfs.h>
22 #include <boost/filesystem.hpp>
23 
24 #include "Catalog/Catalog.h"
26 #include "FsiChunkUtils.h"
27 #include "LazyParquetChunkLoader.h"
28 #include "ParquetShared.h"
29 #include "Shared/JsonUtils.h"
30 #include "Shared/SysDefinitions.h"
31 #include "Shared/file_path_util.h"
32 #include "Shared/misc.h"
33 #include "Utils/DdlUtils.h"
34 
35 namespace foreign_storage {
36 
37 namespace {
38 void reduce_metadata(std::shared_ptr<ChunkMetadata> reduce_to,
39  std::shared_ptr<ChunkMetadata> reduce_from) {
40  CHECK(reduce_to->sqlType == reduce_from->sqlType);
41  reduce_to->numBytes += reduce_from->numBytes;
42  reduce_to->numElements += reduce_from->numElements;
43  reduce_to->chunkStats.has_nulls |= reduce_from->chunkStats.has_nulls;
44 
45  auto column_type = reduce_to->sqlType;
46  column_type = column_type.is_array() ? column_type.get_elem_type() : column_type;
47 
48  // metadata reducution is done at metadata scan time, both string & geometry
49  // columns have no valid stats to reduce beyond `has_nulls`
50  if (column_type.is_string() || column_type.is_geometry()) {
51  // Reset to invalid range, as formerly valid metadata
52  // needs to be invalidated during an append for these types
53  reduce_to->chunkStats.max = reduce_from->chunkStats.max;
54  reduce_to->chunkStats.min = reduce_from->chunkStats.min;
55  return;
56  }
57 
58  ForeignStorageBuffer buffer_to;
59  buffer_to.initEncoder(column_type);
60  auto encoder_to = buffer_to.getEncoder();
61  encoder_to->resetChunkStats(reduce_to->chunkStats);
62 
63  ForeignStorageBuffer buffer_from;
64  buffer_from.initEncoder(column_type);
65  auto encoder_from = buffer_from.getEncoder();
66  encoder_from->resetChunkStats(reduce_from->chunkStats);
67 
68  encoder_to->reduceStats(*encoder_from);
69  auto updated_metadata = std::make_shared<ChunkMetadata>();
70  encoder_to->getMetadata(updated_metadata);
71  reduce_to->chunkStats = updated_metadata->chunkStats;
72 }
73 } // namespace
74 
76  : do_metadata_stats_validation_(true), db_id_(-1), foreign_table_(nullptr) {}
77 
79  std::shared_ptr<arrow::fs::FileSystem> file_system)
80  : do_metadata_stats_validation_(false)
81  , db_id_(-1)
82  , foreign_table_(foreign_table)
83  , last_fragment_index_(0)
84  , last_fragment_row_count_(0)
85  , total_row_count_(0)
86  , last_file_row_count_(0)
87  , last_row_group_(0)
88  , is_restored_(false)
89  , file_system_(file_system)
90  , file_reader_cache_(std::make_unique<FileReaderMap>()) {}
91 
93  const ForeignTable* foreign_table,
94  const bool do_metadata_stats_validation)
95  : do_metadata_stats_validation_(do_metadata_stats_validation)
96  , db_id_(db_id)
97  , foreign_table_(foreign_table)
98  , last_fragment_index_(0)
99  , last_fragment_row_count_(0)
100  , total_row_count_(0)
101  , last_file_row_count_(0)
102  , last_row_group_(0)
103  , is_restored_(false)
104  , schema_(std::make_unique<ForeignTableSchema>(db_id, foreign_table))
105  , file_reader_cache_(std::make_unique<FileReaderMap>()) {
106  auto& server_options = foreign_table->foreign_server->options;
107  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
108  file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
109  } else {
110  UNREACHABLE();
111  }
112 }
113 
117 
118  last_row_group_ = 0;
121  total_row_count_ = 0;
123  file_reader_cache_->clear();
124 }
125 
// Returns the descriptors of the schema's logical and physical columns whose column id
// lies inside `column_interval`; both `start` and `end` are treated as inclusive.
// NOTE(review): the embedded listing numbers jump from 127 to 129, so one source line
// is missing from this copy -- presumably the one that defines `catalog`, which is
// checked below. Confirm against the upstream file.
126 std::list<const ColumnDescriptor*> ParquetDataWrapper::getColumnsToInitialize(
127  const Interval<ColumnType>& column_interval) {
129  CHECK(catalog);
130  const auto& columns = schema_->getLogicalAndPhysicalColumns();
131  auto column_start = column_interval.start;
132  auto column_end = column_interval.end;
133  std::list<const ColumnDescriptor*> columns_to_init;
134  for (const auto column : columns) {
135  auto column_id = column->columnId;
 // Keep only the columns whose id is within the inclusive [start, end] range.
136  if (column_id >= column_start && column_id <= column_end) {
137  columns_to_init.push_back(column);
138  }
139  }
140  return columns_to_init;
141 }
142 
144  const int fragment_index,
145  const Interval<ColumnType>& column_interval,
146  const ChunkToBufferMap& required_buffers,
147  const bool reserve_buffers_and_set_stats) {
148  for (const auto column : getColumnsToInitialize(column_interval)) {
149  Chunk_NS::Chunk chunk{column, false};
150  ChunkKey data_chunk_key;
151  if (column->columnType.is_varlen_indeed()) {
152  data_chunk_key = {
153  db_id_, foreign_table_->tableId, column->columnId, fragment_index, 1};
154  CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
155  auto data_buffer = shared::get_from_map(required_buffers, data_chunk_key);
156  chunk.setBuffer(data_buffer);
157 
158  ChunkKey index_chunk_key{
159  db_id_, foreign_table_->tableId, column->columnId, fragment_index, 2};
160  CHECK(required_buffers.find(index_chunk_key) != required_buffers.end());
161  auto index_buffer = shared::get_from_map(required_buffers, index_chunk_key);
162  chunk.setIndexBuffer(index_buffer);
163  } else {
164  data_chunk_key = {
165  db_id_, foreign_table_->tableId, column->columnId, fragment_index};
166  CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
167  auto data_buffer = shared::get_from_map(required_buffers, data_chunk_key);
168  chunk.setBuffer(data_buffer);
169  }
170  chunk.initEncoder();
171  if (reserve_buffers_and_set_stats) {
172  const auto metadata_it = chunk_metadata_map_.find(data_chunk_key);
173  CHECK(metadata_it != chunk_metadata_map_.end());
174  auto buffer = chunk.getBuffer();
175  auto& metadata = metadata_it->second;
176  auto encoder = buffer->getEncoder();
177  encoder->resetChunkStats(metadata->chunkStats);
178  encoder->setNumElems(metadata->numElements);
179  if ((column->columnType.is_string() &&
180  column->columnType.get_compression() == kENCODING_NONE) ||
181  column->columnType.is_geometry()) {
182  // non-dictionary string or geometry WKT string
183  auto index_buffer = chunk.getIndexBuf();
184  index_buffer->reserve(sizeof(StringOffsetT) * (metadata->numElements + 1));
185  } else if (!column->columnType.is_fixlen_array() && column->columnType.is_array()) {
186  auto index_buffer = chunk.getIndexBuf();
187  index_buffer->reserve(sizeof(ArrayOffsetT) * (metadata->numElements + 1));
188  } else {
189  size_t num_bytes_to_reserve =
190  metadata->numElements * column->columnType.get_size();
191  buffer->reserve(num_bytes_to_reserve);
192  }
193  }
194  }
195 }
196 
200 }
201 
// Called from metadataScanRowGroupMetadata() when moveToNextFragment() reports that the
// incoming row group does not fit into the current fragment.
// The statements visible here close the last row group interval of the looked-up
// fragment entry at `last_row_group_` and refresh `last_file_row_count_` for
// `file_path`.
// NOTE(review): listing lines 204 and 208-210 are missing from this copy. They
// presumably hold the map lookup that initializes `last_fragment_entry` and the
// statements that start the new fragment, ending in the
// `RowGroupInterval{file_path, row_group}` argument below. Confirm against upstream.
202 void ParquetDataWrapper::addNewFragment(int row_group, const std::string& file_path) {
203  const auto last_fragment_entry =
205  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
206 
 // The previous fragment's final interval ends at the last row group processed so far.
207  last_fragment_entry->second.back().end_index = last_row_group_;
211  RowGroupInterval{file_path, row_group});
212  setLastFileRowCount(file_path);
213 }
214 
// Reports whether `file_path` differs from the file referenced by the most recent row
// group interval of the looked-up fragment entry. When that entry holds no intervals,
// the visible code returns true.
// NOTE(review): listing lines 217 and 223-224 are missing from this copy. They
// presumably hold the map lookup that initializes `last_fragment_entry` and a nested
// statement whose closing brace is the `}` on listing line 225. Confirm against
// upstream.
215 bool ParquetDataWrapper::isNewFile(const std::string& file_path) const {
216  const auto last_fragment_entry =
218  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
219 
220  // The entry for the first fragment starts out as an empty vector
221  if (last_fragment_entry->second.empty()) {
222  // File roll off can result in empty older fragments.
225  }
226  return true;
227  } else {
 // Otherwise compare against the file of the last recorded interval.
228  return (last_fragment_entry->second.back().file_path != file_path);
229  }
230 }
231 
// Appends a row group interval for `file_path`, constructed as
// `RowGroupInterval{file_path, 0}`, to the looked-up fragment entry and refreshes
// `last_file_row_count_` for that file. If the entry already holds intervals, the last
// one is first closed at `last_row_group_`.
// NOTE(review): listing lines 234 and 240-241 are missing from this copy. They
// presumably hold the map lookup that initializes `last_fragment_entry` and a nested
// statement whose closing brace is the `}` on listing line 242. Confirm against
// upstream.
232 void ParquetDataWrapper::addNewFile(const std::string& file_path) {
233  const auto last_fragment_entry =
235  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
236 
237  // The entry for the first fragment starts out as an empty vector
238  if (last_fragment_entry->second.empty()) {
239  // File roll off can result in empty older fragments.
242  }
243  } else {
 // Close the interval of the previous file at the last row group processed so far.
244  last_fragment_entry->second.back().end_index = last_row_group_;
245  }
246  last_fragment_entry->second.emplace_back(RowGroupInterval{file_path, 0});
247  setLastFileRowCount(file_path);
248 }
249 
250 void ParquetDataWrapper::setLastFileRowCount(const std::string& file_path) {
251  auto reader = file_reader_cache_->getOrInsert(file_path, file_system_);
252  last_file_row_count_ = reader->parquet_reader()->metadata()->num_rows();
253 }
254 
257  CHECK(catalog);
258  std::vector<std::string> new_file_paths;
259  auto processed_file_paths = getOrderedProcessedFilePaths();
260  if (foreign_table_->isAppendMode() && !processed_file_paths.empty()) {
261  auto all_file_paths = getAllFilePaths();
263  const auto rolled_off_files =
264  shared::check_for_rolled_off_file_paths(all_file_paths, processed_file_paths);
265  updateMetadataForRolledOffFiles(rolled_off_files);
266  }
267 
268  for (const auto& file_path : processed_file_paths) {
269  if (!shared::contains(all_file_paths, file_path)) {
270  throw_removed_file_error(file_path);
271  }
272  }
273 
274  // For multi-file appends, reprocess the last file in order to account for appends
275  // that may have occurred to this file. For single file appends, reprocess file if new
276  // rows have been added.
277  if (!processed_file_paths.empty()) {
278  // Single file append
279  if (all_file_paths.size() == 1) {
280  CHECK_EQ(processed_file_paths.size(), size_t(1));
281  CHECK_EQ(processed_file_paths[0], all_file_paths[0]);
282  }
283 
284  const auto& last_file_path = processed_file_paths.back();
285  // Since an existing file is being appended to we need to update the cached
286  // FileReader as the existing one will be out of date.
287  auto reader = file_reader_cache_->insert(last_file_path, file_system_);
288  size_t row_count = reader->parquet_reader()->metadata()->num_rows();
289  if (row_count < last_file_row_count_) {
290  throw_removed_row_in_file_error(last_file_path);
291  } else if (row_count > last_file_row_count_) {
292  removeMetadataForLastFile(last_file_path);
293  new_file_paths.emplace_back(last_file_path);
294  }
295  }
296 
297  for (const auto& file_path : all_file_paths) {
298  if (!shared::contains(processed_file_paths, file_path)) {
299  new_file_paths.emplace_back(file_path);
300  }
301  }
302  } else {
303  CHECK(chunk_metadata_map_.empty());
304  new_file_paths = getAllFilePaths();
306  }
307 
308  if (!new_file_paths.empty()) {
309  metadataScanFiles(new_file_paths);
310  }
311 }
312 
314  const std::set<std::string>& rolled_off_files) {
315  if (!rolled_off_files.empty()) {
316  std::set<int32_t> deleted_fragment_ids;
317  std::optional<int32_t> partially_deleted_fragment_id;
318  std::vector<std::string> remaining_files_in_partially_deleted_fragment;
319  for (auto& [fragment_id, row_group_interval_vec] :
321  for (auto it = row_group_interval_vec.begin();
322  it != row_group_interval_vec.end();) {
323  if (shared::contains(rolled_off_files, it->file_path)) {
324  it = row_group_interval_vec.erase(it);
325  } else {
326  remaining_files_in_partially_deleted_fragment.emplace_back(it->file_path);
327  it++;
328  }
329  }
330  if (row_group_interval_vec.empty()) {
331  deleted_fragment_ids.emplace(fragment_id);
332  } else {
333  CHECK(!remaining_files_in_partially_deleted_fragment.empty());
334  partially_deleted_fragment_id = fragment_id;
335  break;
336  }
337  }
338 
339  for (auto it = chunk_metadata_map_.begin(); it != chunk_metadata_map_.end();) {
340  const auto& chunk_key = it->first;
341  if (shared::contains(deleted_fragment_ids, chunk_key[CHUNK_KEY_FRAGMENT_IDX])) {
342  auto& chunk_metadata = it->second;
343  chunk_metadata->numElements = 0;
344  chunk_metadata->numBytes = 0;
345  it++;
346  } else if (partially_deleted_fragment_id.has_value() &&
347  chunk_key[CHUNK_KEY_FRAGMENT_IDX] == partially_deleted_fragment_id) {
348  // Metadata for the partially deleted fragment will be re-populated.
349  it = chunk_metadata_map_.erase(it);
350  } else {
351  it++;
352  }
353  }
354 
355  if (partially_deleted_fragment_id.has_value()) {
356  // Create map of row group to row group metadata for remaining files in the
357  // fragment.
358  auto row_group_metadata_map =
359  getRowGroupMetadataMap(remaining_files_in_partially_deleted_fragment);
360 
361  // Re-populate metadata for remaining row groups in partially deleted fragment.
362  auto column_interval =
363  Interval<ColumnType>{schema_->getLogicalAndPhysicalColumns().front()->columnId,
364  schema_->getLogicalAndPhysicalColumns().back()->columnId};
365  auto row_group_intervals = shared::get_from_map(
366  fragment_to_row_group_interval_map_, partially_deleted_fragment_id.value());
367  for (const auto& row_group_interval : row_group_intervals) {
368  for (auto row_group = row_group_interval.start_index;
369  row_group <= row_group_interval.end_index;
370  row_group++) {
371  auto itr =
372  row_group_metadata_map.find({row_group_interval.file_path, row_group});
373  CHECK(itr != row_group_metadata_map.end());
374  updateChunkMetadataForFragment(column_interval,
375  itr->second.column_chunk_metadata,
376  partially_deleted_fragment_id.value());
377  }
378  }
379  }
380  }
381 }
382 
384  std::vector<std::string> file_paths;
385  for (const auto& entry : fragment_to_row_group_interval_map_) {
386  for (const auto& row_group_interval : entry.second) {
387  if (file_paths.empty() || file_paths.back() != row_group_interval.file_path) {
388  file_paths.emplace_back(row_group_interval.file_path);
389  }
390  }
391  }
392  return file_paths;
393 }
394 
395 std::vector<std::string> ParquetDataWrapper::getAllFilePaths() {
396  auto timer = DEBUG_TIMER(__func__);
397  std::vector<std::string> found_file_paths;
398  auto file_path = getFullFilePath(foreign_table_);
399  const auto file_path_options = getFilePathOptions(foreign_table_);
400  auto& server_options = foreign_table_->foreign_server->options;
401  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
402  found_file_paths = shared::local_glob_filter_sort_files(file_path, file_path_options);
403  } else {
404  UNREACHABLE();
405  }
406  return found_file_paths;
407 }
408 
409 void ParquetDataWrapper::metadataScanFiles(const std::vector<std::string>& file_paths) {
410  auto row_group_metadata = getRowGroupMetadataForFilePaths(file_paths);
411  metadataScanRowGroupMetadata(row_group_metadata);
412 }
413 
415  const std::list<RowGroupMetadata>& row_group_metadata) {
416  auto column_interval =
417  Interval<ColumnType>{schema_->getLogicalAndPhysicalColumns().front()->columnId,
418  schema_->getLogicalAndPhysicalColumns().back()->columnId};
419  for (const auto& row_group_metadata_item : row_group_metadata) {
420  const auto& column_chunk_metadata = row_group_metadata_item.column_chunk_metadata;
421  auto column_chunk_metadata_iter = column_chunk_metadata.begin();
422  const auto import_row_count = (*column_chunk_metadata_iter)->numElements;
423  auto row_group = row_group_metadata_item.row_group_index;
424  const auto& file_path = row_group_metadata_item.file_path;
425  if (moveToNextFragment(import_row_count)) {
426  addNewFragment(row_group, file_path);
427  } else if (isNewFile(file_path)) {
428  CHECK_EQ(row_group, 0);
429  addNewFile(file_path);
430  }
431  last_row_group_ = row_group;
433  column_interval, column_chunk_metadata, last_fragment_index_);
434 
435  last_fragment_row_count_ += import_row_count;
436  total_row_count_ += import_row_count;
437  }
439 }
440 
442  const std::vector<std::string>& file_paths) const {
443  LazyParquetChunkLoader chunk_loader(
445  return chunk_loader.metadataScan(file_paths, *schema_, do_metadata_stats_validation_);
446 }
447 
449  const Interval<ColumnType>& column_interval,
450  const std::list<std::shared_ptr<ChunkMetadata>>& column_chunk_metadata,
451  int32_t fragment_id) {
452  CHECK_EQ(static_cast<int>(column_chunk_metadata.size()),
453  schema_->numLogicalAndPhysicalColumns());
454  auto column_chunk_metadata_iter = column_chunk_metadata.begin();
455  for (auto column_id = column_interval.start; column_id <= column_interval.end;
456  column_id++, column_chunk_metadata_iter++) {
457  CHECK(column_chunk_metadata_iter != column_chunk_metadata.end());
458  const auto column_descriptor = schema_->getColumnDescriptor(column_id);
459  const auto& type_info = column_descriptor->columnType;
460  ChunkKey const data_chunk_key =
461  type_info.is_varlen_indeed()
462  ? ChunkKey{db_id_, foreign_table_->tableId, column_id, fragment_id, 1}
463  : ChunkKey{db_id_, foreign_table_->tableId, column_id, fragment_id};
464  std::shared_ptr<ChunkMetadata> chunk_metadata = *column_chunk_metadata_iter;
465 
466  if (chunk_metadata_map_.find(data_chunk_key) == chunk_metadata_map_.end()) {
467  chunk_metadata_map_[data_chunk_key] = chunk_metadata;
468  } else {
469  reduce_metadata(chunk_metadata_map_[data_chunk_key], chunk_metadata);
470  }
471  }
472 }
473 
474 bool ParquetDataWrapper::moveToNextFragment(size_t new_rows_count) const {
475  return (last_fragment_row_count_ + new_rows_count) >
476  static_cast<size_t>(foreign_table_->maxFragRows);
477 }
478 
480  ChunkMetadataVector& chunk_metadata_vector) {
482  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
483  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
484  }
485 }
486 
488  const int logical_column_id,
489  const int fragment_id,
490  const ChunkToBufferMap& required_buffers,
491  AbstractBuffer* delete_buffer) {
492  const auto& row_group_intervals =
494  // File roll off can lead to an empty row group interval vector.
495  if (row_group_intervals.empty()) {
496  return;
497  }
498 
500  CHECK(catalog);
501  const ColumnDescriptor* logical_column =
502  schema_->getColumnDescriptor(logical_column_id);
503  auto parquet_column_index = schema_->getParquetColumnIndex(logical_column_id);
504 
505  const Interval<ColumnType> column_interval = {
506  logical_column_id,
507  logical_column_id + logical_column->columnType.get_physical_cols()};
508  initializeChunkBuffers(fragment_id, column_interval, required_buffers, true);
509 
510  const bool is_dictionary_encoded_string_column =
511  logical_column->columnType.is_dict_encoded_string() ||
512  (logical_column->columnType.is_array() &&
513  logical_column->columnType.get_elem_type().is_dict_encoded_string());
514 
515  StringDictionary* string_dictionary = nullptr;
516  if (is_dictionary_encoded_string_column) {
517  auto dict_descriptor =
518  catalog->getMetadataForDict(logical_column->columnType.get_comp_param(), true);
519  CHECK(dict_descriptor);
520  string_dictionary = dict_descriptor->stringDict.get();
521  }
522 
523  std::list<Chunk_NS::Chunk> chunks;
524  for (int column_id = column_interval.start; column_id <= column_interval.end;
525  ++column_id) {
526  auto column_descriptor = schema_->getColumnDescriptor(column_id);
527  Chunk_NS::Chunk chunk{column_descriptor, false};
528  if (column_descriptor->columnType.is_varlen_indeed()) {
529  ChunkKey data_chunk_key = {
530  db_id_, foreign_table_->tableId, column_id, fragment_id, 1};
531  auto buffer = shared::get_from_map(required_buffers, data_chunk_key);
532  chunk.setBuffer(buffer);
533  ChunkKey index_chunk_key = {
534  db_id_, foreign_table_->tableId, column_id, fragment_id, 2};
535  auto index_buffer = shared::get_from_map(required_buffers, index_chunk_key);
536  chunk.setIndexBuffer(index_buffer);
537  } else {
538  ChunkKey chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
539  auto buffer = shared::get_from_map(required_buffers, chunk_key);
540  chunk.setBuffer(buffer);
541  }
542  chunks.emplace_back(chunk);
543  }
544 
545  std::unique_ptr<RejectedRowIndices> rejected_row_indices;
546  if (delete_buffer) {
547  rejected_row_indices = std::make_unique<RejectedRowIndices>();
548  }
549  LazyParquetChunkLoader chunk_loader(
551  auto metadata = chunk_loader.loadChunk(row_group_intervals,
552  parquet_column_index,
553  chunks,
554  string_dictionary,
555  rejected_row_indices.get());
556 
557  if (delete_buffer) {
558  // all modifying operations on `delete_buffer` must be synchronized as it is a
559  // shared buffer
560  std::unique_lock<std::mutex> delete_buffer_lock(delete_buffer_mutex_);
561 
562  CHECK(!chunks.empty());
563  CHECK(chunks.begin()->getBuffer()->hasEncoder());
564  auto num_rows_in_chunk = chunks.begin()->getBuffer()->getEncoder()->getNumElems();
565 
566  // ensure delete buffer is sized appropriately
567  if (delete_buffer->size() < num_rows_in_chunk) {
568  auto remaining_rows = num_rows_in_chunk - delete_buffer->size();
569  std::vector<int8_t> data(remaining_rows, false);
570  delete_buffer->append(data.data(), remaining_rows);
571  }
572 
573  // compute a logical OR with current `delete_buffer` contents and this chunk's
574  // rejected indices
575  CHECK(rejected_row_indices);
576  auto delete_buffer_data = delete_buffer->getMemoryPtr();
577  for (const auto& rejected_index : *rejected_row_indices) {
578  CHECK_GT(delete_buffer->size(), static_cast<size_t>(rejected_index));
579  delete_buffer_data[rejected_index] = true;
580  }
581  }
582 
583  auto metadata_iter = metadata.begin();
584  for (int column_id = column_interval.start; column_id <= column_interval.end;
585  ++column_id, ++metadata_iter) {
586  auto column = schema_->getColumnDescriptor(column_id);
587  ChunkKey data_chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
588  if (column->columnType.is_varlen_indeed()) {
589  data_chunk_key.emplace_back(1);
590  }
591  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
592 
593  // Allocate a new shared_ptr for metadata so we don't modify the old one, which may
594  // be used by the executor
595  auto cached_metadata_previous =
596  shared::get_from_map(chunk_metadata_map_, data_chunk_key);
597  shared::get_from_map(chunk_metadata_map_, data_chunk_key) =
598  std::make_shared<ChunkMetadata>();
599  auto cached_metadata = shared::get_from_map(chunk_metadata_map_, data_chunk_key);
600  *cached_metadata = *cached_metadata_previous;
601 
602  CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
603  cached_metadata->numBytes =
604  shared::get_from_map(required_buffers, data_chunk_key)->size();
605 
606  // for certain types, update the metadata statistics
607  // should update the cache, and the internal chunk_metadata_map_
608  if (is_dictionary_encoded_string_column || logical_column->columnType.is_geometry()) {
609  CHECK(metadata_iter != metadata.end());
610  cached_metadata->chunkStats = (*metadata_iter)->chunkStats;
611 
612  // Update stats on buffer so it is saved in cache
613  shared::get_from_map(required_buffers, data_chunk_key)
614  ->getEncoder()
615  ->resetChunkStats(cached_metadata->chunkStats);
616  }
617  }
618 }
619 
621  const ChunkToBufferMap& optional_buffers,
622  AbstractBuffer* delete_buffer) {
623  ChunkToBufferMap buffers_to_load;
624  buffers_to_load.insert(required_buffers.begin(), required_buffers.end());
625  buffers_to_load.insert(optional_buffers.begin(), optional_buffers.end());
626 
627  CHECK(!buffers_to_load.empty());
628 
629  std::set<ForeignStorageMgr::ParallelismHint> col_frag_hints;
630  for (const auto& [chunk_key, buffer] : buffers_to_load) {
631  CHECK_EQ(buffer->size(), static_cast<size_t>(0));
632  col_frag_hints.emplace(
633  schema_->getLogicalColumn(chunk_key[CHUNK_KEY_COLUMN_IDX])->columnId,
634  chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
635  }
636 
637  const logger::ThreadLocalIds parent_thread_local_ids = logger::thread_local_ids();
638 
639  std::function<void(const std::set<ForeignStorageMgr::ParallelismHint>&)> lambda =
640  [&, this](const std::set<ForeignStorageMgr::ParallelismHint>& hint_set) {
641  // Enable debug timers
642  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
643  DEBUG_TIMER_NEW_THREAD(parent_thread_local_ids.thread_id_);
644 
645  for (const auto& [col_id, frag_id] : hint_set) {
647  col_id, frag_id, buffers_to_load, delete_buffer);
648  VLOG(1) << "Loaded key " << db_id_ << "," << foreign_table_->tableId << ","
649  << col_id << "," << frag_id;
650  }
651  };
652 
655 
656  VLOG(1) << "Populating chunk from parquet source using " + std::to_string(num_threads) +
657  " threads.";
658  auto futures = create_futures_for_workers(col_frag_hints, num_threads, lambda);
659 
660  // We wait on all futures, then call get because we want all threads to have finished
661  // before we propagate a potential exception.
662  for (auto& future : futures) {
663  future.wait();
664  }
665 
666  for (auto& future : futures) {
667  future.get();
668  }
669 }
670 
671 void set_value(rapidjson::Value& json_val,
672  const RowGroupInterval& value,
673  rapidjson::Document::AllocatorType& allocator) {
674  json_val.SetObject();
675  json_utils::add_value_to_object(json_val, value.file_path, "file_path", allocator);
676  json_utils::add_value_to_object(json_val, value.start_index, "start_index", allocator);
677  json_utils::add_value_to_object(json_val, value.end_index, "end_index", allocator);
678 }
679 
680 void get_value(const rapidjson::Value& json_val, RowGroupInterval& value) {
681  CHECK(json_val.IsObject());
682  json_utils::get_value_from_object(json_val, value.file_path, "file_path");
683  json_utils::get_value_from_object(json_val, value.start_index, "start_index");
684  json_utils::get_value_from_object(json_val, value.end_index, "end_index");
685 }
686 
688  rapidjson::Document d;
689  d.SetObject();
690 
693  "fragment_to_row_group_interval_map",
694  d.GetAllocator());
695  json_utils::add_value_to_object(d, last_row_group_, "last_row_group", d.GetAllocator());
697  d, last_fragment_index_, "last_fragment_index", d.GetAllocator());
699  d, last_fragment_row_count_, "last_fragment_row_count", d.GetAllocator());
701  d, total_row_count_, "total_row_count", d.GetAllocator());
703  d, last_file_row_count_, "last_file_row_count", d.GetAllocator());
704  return json_utils::write_to_string(d);
705 }
706 
708  const std::string& file_path,
709  const ChunkMetadataVector& chunk_metadata_vector) {
710  auto d = json_utils::read_from_file(file_path);
711  CHECK(d.IsObject());
712 
714  d, fragment_to_row_group_interval_map_, "fragment_to_row_group_interval_map");
716  json_utils::get_value_from_object(d, last_fragment_index_, "last_fragment_index");
718  d, last_fragment_row_count_, "last_fragment_row_count");
720  if (d.HasMember("last_file_row_count")) {
721  json_utils::get_value_from_object(d, last_file_row_count_, "last_file_row_count");
722  }
723 
724  CHECK(chunk_metadata_map_.empty());
725  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_vector) {
726  chunk_metadata_map_[chunk_key] = chunk_metadata;
727  }
728  is_restored_ = true;
729 }
730 
732  return is_restored_;
733 }
734 
736  LazyParquetChunkLoader chunk_loader(
738  auto file_paths = getAllFilePaths();
739  if (file_paths.empty()) {
740  throw ForeignStorageException{"No file found at \"" +
742  }
743  return chunk_loader.previewFiles(file_paths, num_rows, *foreign_table_);
744 }
745 
// Drops the cached metadata that refers to `last_file_path` so that the file can be
// scanned again (fetchChunkMetadata() calls this when the file's row count has grown).
// Visible steps:
//  1. find the first fragment that has a row group interval for `last_file_path`;
//  2. erase chunk metadata for that fragment and all later ones, and recompute
//     `total_row_count_` from the fragments that remain;
//  3. reset the "last fragment / last row group" bookkeeping to the preceding fragment;
//  4. re-scan the intervals of the first affected fragment that precede the last file.
// NOTE(review): listing lines 749, 762, 804, 807 and 816 are missing from this copy
// (the gaps are marked below); confirm the details against the upstream file.
746 void ParquetDataWrapper::removeMetadataForLastFile(const std::string& last_file_path) {
747  std::optional<int32_t> first_deleted_fragment_id;
 // NOTE(review): listing line 749 (the rest of this `for` header) is missing.
748  for (auto it = fragment_to_row_group_interval_map_.begin();
750  const auto fragment_id = it->first;
751  const auto& row_group_intervals = it->second;
752  for (const auto& row_group_interval : row_group_intervals) {
753  if (first_deleted_fragment_id.has_value()) {
754  // All subsequent fragments should map to the last file.
755  CHECK_EQ(last_file_path, row_group_interval.file_path);
756  } else if (last_file_path == row_group_interval.file_path) {
757  first_deleted_fragment_id = fragment_id;
758  }
759  }
 // NOTE(review): listing line 762 (the body of this branch) is missing; presumably it
 // erases fragments that come after the first affected one -- confirm.
760  if (first_deleted_fragment_id.has_value() &&
761  first_deleted_fragment_id.value() < fragment_id) {
763  } else {
764  it++;
765  }
766  }
 // The last file must have been referenced by at least one fragment.
767  CHECK(first_deleted_fragment_id.has_value());
768 
 // Erase chunk metadata from the first affected fragment onwards. For the fragments
 // that stay, record one element count per fragment and check that every chunk of the
 // same fragment reports the same count.
769  std::map<int32_t, size_t> remaining_fragments_row_counts;
770  for (auto it = chunk_metadata_map_.begin(); it != chunk_metadata_map_.end();) {
771  auto fragment_id = it->first[CHUNK_KEY_FRAGMENT_IDX];
772  if (fragment_id >= first_deleted_fragment_id.value()) {
773  it = chunk_metadata_map_.erase(it);
774  } else {
775  auto fragment_count_it = remaining_fragments_row_counts.find(fragment_id);
776  if (fragment_count_it == remaining_fragments_row_counts.end()) {
777  remaining_fragments_row_counts[fragment_id] = it->second->numElements;
778  } else {
779  CHECK_EQ(remaining_fragments_row_counts[fragment_id], it->second->numElements);
780  }
781  it++;
782  }
783  }
784 
 // The total row count is now the sum over the remaining fragments.
785  total_row_count_ = 0;
786  for (const auto [fragment_id, row_count] : remaining_fragments_row_counts) {
787  total_row_count_ += row_count;
788  }
789 
790  // Re-populate metadata for last fragment with deleted rows, excluding metadata for the
791  // last file.
 // `auto` deduces a value type here, so the erase below works on a local object and
 // not on the entry stored in the map.
792  auto row_group_intervals_to_scan = shared::get_from_map(
793  fragment_to_row_group_interval_map_, first_deleted_fragment_id.value());
794  auto it = std::find_if(row_group_intervals_to_scan.begin(),
795  row_group_intervals_to_scan.end(),
796  [&last_file_path](const auto& row_group_interval) {
797  return row_group_interval.file_path == last_file_path;
798  });
799  CHECK(it != row_group_intervals_to_scan.end());
 // Keep only the intervals that come before the first one for the last file.
800  row_group_intervals_to_scan.erase(it, row_group_intervals_to_scan.end());
801 
802  if (first_deleted_fragment_id.value() > 0) {
 // Step the bookkeeping back to the fragment preceding the first affected one.
 // NOTE(review): listing lines 804 and 807 are missing (the left-hand side of the
 // assignment from `remaining_fragments_row_counts` and the initializer of
 // `last_row_group_intervals`).
803  last_fragment_index_ = first_deleted_fragment_id.value() - 1;
805  shared::get_from_map(remaining_fragments_row_counts, last_fragment_index_);
806  const auto& last_row_group_intervals =
808  if (last_row_group_intervals.empty()) {
809  last_row_group_ = 0;
810  } else {
811  last_row_group_ = last_row_group_intervals.back().end_index;
812  }
813  fragment_to_row_group_interval_map_.erase(first_deleted_fragment_id.value());
814  } else {
 // The first fragment itself is affected, so no rows may remain.
 // NOTE(review): listing line 816 is missing here.
815  CHECK_EQ(total_row_count_, size_t(0));
817  }
818 
 // Re-scan whatever part of the first affected fragment precedes the last file.
819  if (!row_group_intervals_to_scan.empty()) {
820  metadataScanRowGroupIntervals(row_group_intervals_to_scan);
821  }
822 }
823 
825  const std::vector<RowGroupInterval>& row_group_intervals) {
826  std::vector<std::string> file_paths;
827  for (const auto& row_group_interval : row_group_intervals) {
828  file_paths.emplace_back(row_group_interval.file_path);
829  }
830  auto row_group_metadata_map = getRowGroupMetadataMap(file_paths);
831  std::list<RowGroupMetadata> row_group_metadata;
832  for (const auto& row_group_interval : row_group_intervals) {
833  for (auto row_group = row_group_interval.start_index;
834  row_group <= row_group_interval.end_index;
835  row_group++) {
836  row_group_metadata.emplace_back(shared::get_from_map(
837  row_group_metadata_map, {row_group_interval.file_path, row_group}));
838  }
839  }
840  metadataScanRowGroupMetadata(row_group_metadata);
841 }
842 
843 std::map<FilePathAndRowGroup, RowGroupMetadata>
845  const std::vector<std::string>& file_paths) const {
846  auto row_group_metadata = getRowGroupMetadataForFilePaths(file_paths);
847  std::map<FilePathAndRowGroup, RowGroupMetadata> row_group_metadata_map;
848  for (const auto& row_group_metadata_item : row_group_metadata) {
849  row_group_metadata_map[{row_group_metadata_item.file_path,
850  row_group_metadata_item.row_group_index}] =
851  row_group_metadata_item;
852  }
853  return row_group_metadata_map;
854 }
855 } // namespace foreign_storage
bool contains(const T &container, const U &element)
Definition: misc.h:204
std::string getSerializedDataWrapper() const override
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< int > ChunkKey
Definition: types.h:36
std::unique_ptr< FileReaderMap > file_reader_cache_
static shared::FilePathOptions getFilePathOptions(const ForeignTable *foreign_table)
void updateMetadataForRolledOffFiles(const std::set< std::string > &rolled_off_files)
std::vector< std::string > getOrderedProcessedFilePaths()
shared utility for globbing files, paths can be specified as either a single file, directory or wildcards
void setLastFileRowCount(const std::string &file_path)
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata_vector) override
std::vector< std::string > getAllFilePaths()
void throw_removed_row_in_file_error(const std::string &file_path)
std::unique_ptr< ForeignTableSchema > schema_
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
size_t get_num_threads(const ForeignTable &table)
virtual int8_t * getMemoryPtr()=0
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: JsonUtils.h:270
void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
#define UNREACHABLE()
Definition: Logger.h:338
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
#define DEBUG_TIMER_NEW_THREAD(parent_thread_id)
Definition: Logger.h:417
void reduce_metadata(std::shared_ptr< ChunkMetadata > reduce_to, std::shared_ptr< ChunkMetadata > reduce_from)
void get_value(const rapidjson::Value &json_val, FileRegion &file_region)
Definition: CsvShared.cpp:44
#define CHECK_GT(x, y)
Definition: Logger.h:305
void metadataScanRowGroupIntervals(const std::vector< RowGroupInterval > &row_group_intervals)
void initEncoder(const SQLTypeInfo &tmp_sql_type)
std::set< std::string > check_for_rolled_off_file_paths(const std::vector< std::string > &all_file_paths, std::vector< std::string > &processed_file_paths)
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
std::string to_string(char const *&&v)
virtual bool resetChunkStats(const ChunkStats &)
: Reset chunk level stats (min, max, nulls) using new values from the argument.
Definition: Encoder.h:274
int32_t StringOffsetT
Definition: sqltypes.h:1495
rapidjson::Document read_from_file(const std::string &file_path)
Definition: JsonUtils.cpp:201
void throw_removed_file_error(const std::string &file_path)
This file contains the class specification and related data structures for Catalog.
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void addNewFile(const std::string &file_path)
DataPreview getDataPreview(const size_t num_rows)
int get_physical_cols() const
Definition: sqltypes.h:432
std::list< std::unique_ptr< ChunkMetadata > > loadChunk(const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary=nullptr, RejectedRowIndices *rejected_row_indices=nullptr)
static SysCatalog & instance()
Definition: SysCatalog.h:343
std::vector< std::future< void > > create_futures_for_workers(const Container &items, size_t max_threads, std::function< void(const Container &)> lambda)
Definition: FsiChunkUtils.h:74
void metadataScanRowGroupMetadata(const std::list< RowGroupMetadata > &row_group_metadata)
void loadBuffersUsingLazyParquetChunkLoader(const int logical_column_id, const int fragment_id, const ChunkToBufferMap &required_buffers, AbstractBuffer *delete_buffer)
void addNewFragment(int row_group, const std::string &file_path)
void initializeChunkBuffers(const int fragment_index, const Interval< ColumnType > &column_interval, const ChunkToBufferMap &required_buffers, const bool reserve_buffers_and_set_stats=false)
void updateChunkMetadataForFragment(const Interval< ColumnType > &column_interval, const std::list< std::shared_ptr< ChunkMetadata >> &column_chunk_metadata, int32_t fragment_id)
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
std::string write_to_string(const rapidjson::Document &document)
Definition: JsonUtils.cpp:225
An AbstractBuffer is a unit of data management for a data manager.
specifies the content in-memory of a row in the column metadata table
bool isNewFile(const std::string &file_path) const
bool moveToNextFragment(size_t new_rows_count) const
bool g_enable_smem_group_by true
std::shared_ptr< arrow::fs::FileSystem > file_system_
bool isAppendMode() const
Checks if the table is in append mode.
std::list< RowGroupMetadata > metadataScan(const std::vector< std::string > &file_paths, const ForeignTableSchema &schema, const bool do_metadata_stats_validation=true)
Perform a metadata scan for the paths specified.
void removeMetadataForLastFile(const std::string &last_file_path)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
std::list< const ColumnDescriptor * > getColumnsToInitialize(const Interval< ColumnType > &column_interval)
static bool allowFileRollOff(const ForeignTable *foreign_table)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:62
void metadataScanFiles(const std::vector< std::string > &file_paths)
int32_t ArrayOffsetT
Definition: sqltypes.h:1496
LocalIdsScopeGuard setNewThreadId() const
Definition: Logger.cpp:538
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: JsonUtils.h:255
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:402
const ForeignServer * foreign_server
Definition: ForeignTable.h:57
bool g_enable_watchdog false
Definition: Execute.cpp:80
#define CHECK(condition)
Definition: Logger.h:291
bool is_geometry() const
Definition: sqltypes.h:597
#define DEBUG_TIMER(name)
Definition: Logger.h:412
std::vector< std::string > local_glob_filter_sort_files(const std::string &file_path, const FilePathOptions &options, const bool recurse)
bool is_dict_encoded_string() const
Definition: sqltypes.h:643
SQLTypeInfo columnType
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40
std::map< FilePathAndRowGroup, RowGroupMetadata > getRowGroupMetadataMap(const std::vector< std::string > &file_paths) const
ThreadId thread_id_
Definition: Logger.h:138
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:977
void set_value(rapidjson::Value &json_val, const FileRegion &file_region, rapidjson::Document::AllocatorType &allocator)
Definition: CsvShared.cpp:26
ThreadLocalIds thread_local_ids()
Definition: Logger.cpp:882
bool is_array() const
Definition: sqltypes.h:585
#define VLOG(n)
Definition: Logger.h:388
std::list< RowGroupMetadata > getRowGroupMetadataForFilePaths(const std::vector< std::string > &file_paths) const