#include <condition_variable>

#include <rapidjson/document.h>
#include <boost/filesystem.hpp>
namespace foreign_storage {
AbstractTextFileDataWrapper::AbstractTextFileDataWrapper()
    // ...
    , foreign_table_(nullptr)
    , user_mapping_(nullptr)
    , disable_cache_(false)
    , is_first_file_scan_call_(true)
    , is_file_scan_in_progress_(false)
    , iterative_scan_last_fragment_id_(-1) {}
AbstractTextFileDataWrapper::AbstractTextFileDataWrapper(
    /* ... */,
    const ForeignTable* foreign_table)
    // ...
    , foreign_table_(foreign_table)
    , user_mapping_(nullptr)
    , disable_cache_(false)
    , is_first_file_scan_call_(true)
    , is_file_scan_in_progress_(false)
    , iterative_scan_last_fragment_id_(-1) {}
AbstractTextFileDataWrapper::AbstractTextFileDataWrapper(
    /* ... */,
    const ForeignTable* foreign_table,
    const UserMapping* user_mapping,
    const bool disable_cache)
    // ...
    , foreign_table_(foreign_table)
    , user_mapping_(user_mapping)
    , disable_cache_(disable_cache)
    , is_first_file_scan_call_(true)
    , is_file_scan_in_progress_(false)
    , iterative_scan_last_fragment_id_(-1) {}
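// Annotation (not in the original source): the last three initializers hold the
// iterative-scan state used further down. is_first_file_scan_call_ makes the first
// dispatch allocate the residual buffer, is_file_scan_in_progress_ marks a scan that
// has started but not yet consumed every fragment, and
// iterative_scan_last_fragment_id_ records the last fully-read fragment so requests
// for earlier fragments can be recognized as already processed.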
void throw_fragment_id_out_of_bounds_error(const TableDescriptor* table,
                                           const int32_t fragment_id,
                                           const int32_t max_fragment_id) {
  throw RequestedFragmentIdOutOfBoundsException{
      "Attempting to populate fragment id " + std::to_string(fragment_id) +
      " for foreign table " + table->tableName +
      " which is greater than the maximum fragment id of " +
      std::to_string(max_fragment_id) + "."};
}
std::set<const ColumnDescriptor*> get_columns(const ChunkToBufferMap& buffers,
                                              const Catalog_Namespace::Catalog& catalog,
                                              const int32_t table_id,
                                              const int fragment_id) {
  CHECK(!buffers.empty());
  std::set<const ColumnDescriptor*> columns;
  for (const auto& entry : buffers) {
    // ... (resolve the ColumnDescriptor for this chunk key's column id)
    columns.emplace(column);
  }
  return columns;
}
void AbstractTextFileDataWrapper::populateChunkMapForColumns(
    const std::set<const ColumnDescriptor*>& columns,
    const int fragment_id,
    const ChunkToBufferMap& buffers,
    std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map) {
  for (const auto column : columns) {
    // ... (build the data chunk key for this column and fragment)
    init_chunk_for_column(data_chunk_key,
                          chunk_metadata_map_,
                          buffers,
                          column_id_to_chunk_map[column->columnId]);
  }
}
void AbstractTextFileDataWrapper::populateChunkBuffers(
    const ChunkToBufferMap& required_buffers,
    const ChunkToBufferMap& optional_buffers,
    AbstractBuffer* delete_buffer) {
  // ... (get the catalog and the fragment id from the buffer keys)
  CHECK(!required_buffers.empty());

  auto required_columns =
      get_columns(required_buffers, *catalog, foreign_table_->tableId, fragment_id);
  std::map<int, Chunk_NS::Chunk> column_id_to_chunk_map;
  populateChunkMapForColumns(
      required_columns, fragment_id, required_buffers, column_id_to_chunk_map);

  if (!optional_buffers.empty()) {
    auto optional_columns =
        get_columns(optional_buffers, *catalog, foreign_table_->tableId, fragment_id);
    populateChunkMapForColumns(
        optional_columns, fragment_id, optional_buffers, column_id_to_chunk_map);
  }
  populateChunks(column_id_to_chunk_map, fragment_id, delete_buffer);
  // ...
}
void AbstractTextFileDataWrapper::updateMetadata(
    std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
    int fragment_id) {
  // ...
  for (auto& entry : column_id_to_chunk_map) {
    // ... (look up the column and build its data chunk key)
    if (column->columnType.is_varlen_indeed()) {
      data_chunk_key.emplace_back(1);
    }
    // Allocate a new shared_ptr so the previous metadata object is not mutated.
    auto cached_metadata_previous =
        shared::get_from_map(chunk_metadata_map_, data_chunk_key);
    shared::get_from_map(chunk_metadata_map_, data_chunk_key) =
        std::make_shared<ChunkMetadata>();
    auto cached_metadata = shared::get_from_map(chunk_metadata_map_, data_chunk_key);
    *cached_metadata = *cached_metadata_previous;
    auto chunk_metadata =
        entry.second.getBuffer()->getEncoder()->getMetadata(column->columnType);
    cached_metadata->chunkStats.max = chunk_metadata->chunkStats.max;
    cached_metadata->chunkStats.min = chunk_metadata->chunkStats.min;
    cached_metadata->chunkStats.has_nulls = chunk_metadata->chunkStats.has_nulls;
    cached_metadata->numBytes = entry.second.getBuffer()->size();
    // ...
  }
}
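// Annotation: overwriting the map entry with a freshly allocated ChunkMetadata,
// instead of mutating the object in place, keeps any shared_ptr previously handed
// out (e.g. to the executor) valid while the min/max/has_nulls stats and the byte
// size are refreshed from the chunk's encoder.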
void throw_unexpected_number_of_items(const size_t& num_expected,
                                      const size_t& num_loaded,
                                      const std::string& item_type,
                                      const std::string& foreign_table_name) {
  try {
    foreign_storage::throw_unexpected_number_of_items(
        num_expected, num_loaded, item_type);
  } catch (const foreign_storage::ForeignStorageException& except) {
    throw foreign_storage::ForeignStorageException(
        std::string(except.what()) + " Foreign table: " + foreign_table_name);
  }
}
ParseFileRegionResult parse_file_regions(
    const FileRegions& file_regions,
    const size_t start_index,
    const size_t end_index,
    FileReader& file_reader,
    ParseBufferRequest& parse_file_request,
    const std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
    const TextFileBufferParser& parser) {
  ParseFileRegionResult load_file_region_result{};
  load_file_region_result.file_offset = file_regions[start_index].first_row_file_offset;
  load_file_region_result.row_count = 0;

  ParseBufferResult result;
  for (size_t i = start_index; i <= end_index; i++) {
    auto read_size = file_reader.readRegion(parse_file_request.buffer.get(),
                                            file_regions[i].first_row_file_offset,
                                            file_regions[i].region_size);
    if (file_regions[i].region_size != read_size) {
      // ... (throw_unexpected_number_of_items for the short read)
    }
    // ...
    parse_file_request.end_pos = file_regions[i].region_size;
    // ...
    parse_file_request.file_offset = file_regions[i].first_row_file_offset;
    // ...
    result = parser.parseBuffer(parse_file_request, (i == end_index));
    // ...
    for (const auto& rejected_row_index : result.rejected_rows) {
      load_file_region_result.rejected_row_indices.insert(
          load_file_region_result.row_count + rejected_row_index);
    }
    load_file_region_result.row_count += result.row_count;
  }
  load_file_region_result.column_id_to_data_blocks_map =
      result.column_id_to_data_blocks_map;
  return load_file_region_result;
}
size_t get_buffer_size(const import_export::CopyParams& copy_params,
                       const bool size_known,
                       const size_t file_size) {
  size_t buffer_size = copy_params.buffer_size;
  if (size_known && file_size < buffer_size) {
    buffer_size = file_size + 1;  // +1 for a trailing line delimiter, if missing
  }
  return buffer_size;
}

size_t get_buffer_size(const FileRegions& file_regions) {
  size_t buffer_size = 0;
  for (const auto& file_region : file_regions) {
    buffer_size = std::max(buffer_size, file_region.region_size);
  }
  return buffer_size;
}
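// Worked example (illustrative values, not from the source): with size_known == true,
// file_size == 100 bytes, and a configured buffer size of 32 MiB, the first overload
// returns 101, i.e. file_size + 1, leaving room for a trailing newline if the file
// lacks one. The FileRegions overload instead sizes the buffer to the largest single
// region, since each region must fit in one readRegion() call.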
size_t get_thread_count(const import_export::CopyParams& copy_params,
                        const bool size_known,
                        const size_t file_size,
                        const size_t buffer_size) {
  size_t thread_count = copy_params.threads;
  if (thread_count == 0) {
    thread_count = std::thread::hardware_concurrency();
  }
  if (size_known && file_size > 0) {
    size_t num_buffers_in_file = (file_size + buffer_size - 1) / buffer_size;
    if (num_buffers_in_file < thread_count) {
      thread_count = num_buffers_in_file;
    }
  }
  CHECK_GT(thread_count, static_cast<size_t>(0));
  return thread_count;
}
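// Worked example (illustrative values): copy_params.threads == 0 on an 8-core machine
// gives thread_count = 8; a known 10 MiB file with a 4 MiB buffer spans
// ceil(10 / 4) = (10 + 4 - 1) / 4 = 3 buffers, so the count is capped at 3. There is
// no point spawning threads that would never receive a buffer.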
size_t get_thread_count(const import_export::CopyParams& copy_params,
                        const FileRegions& file_regions) {
  size_t thread_count = copy_params.threads;
  if (thread_count == 0) {
    thread_count =
        std::min<size_t>(std::thread::hardware_concurrency(), file_regions.size());
  }
  CHECK_GT(thread_count, static_cast<size_t>(0));
  return thread_count;
}
void resize_delete_buffer(AbstractBuffer* delete_buffer,
                          const size_t chunk_element_count) {
  if (delete_buffer->size() < chunk_element_count) {
    auto remaining_rows = chunk_element_count - delete_buffer->size();
    std::vector<int8_t> data(remaining_rows, false);
    delete_buffer->append(data.data(), remaining_rows);
  }
}
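// Example (illustrative): if the fragment now holds 8 rows but delete_buffer->size()
// is 5, three zero-initialized bytes are appended so that every row in the fragment
// has a "not deleted" flag before any rejected rows are marked true.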
// ...
std::unique_lock<std::mutex> deferred_requests_lock(
    multi_threading_params_.deferred_requests_mutex);
// ...
void AbstractTextFileDataWrapper::populateChunks(
    std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
    int fragment_id,
    AbstractBuffer* delete_buffer) {
  CHECK(!column_id_to_chunk_map.empty());

  // ... (iterative-scan path: resume the in-progress scan for this fragment)
  auto file_regions_it_one_ahead =
      fragment_id_to_file_regions_map_.find(fragment_id + 1);
  // ...
  IterativeFileScanParameters file_scan_param{
      column_id_to_chunk_map, fragment_id, delete_buffer};
  // ...

  // Non-iterative path: parse the fragment's recorded file regions in parallel.
  const auto& file_regions = file_regions_it->second;
  // File roll-off can leave a fragment with no file regions.
  if (file_regions.empty()) {
    // ...
  }
  // ...
  const int batch_size = (file_regions.size() + thread_count - 1) / thread_count;

  std::vector<ParseBufferRequest> parse_file_requests{};
  parse_file_requests.reserve(thread_count);
  std::set<int> column_filter_set;
  for (const auto& pair : column_id_to_chunk_map) {
    column_filter_set.insert(pair.first);
  }
  std::vector<std::unique_ptr<FileReader>> file_readers;
  rapidjson::Value reader_metadata(rapidjson::kObjectType);
  rapidjson::Document d;
  // ...
  file_reader_->serialize(reader_metadata, d.GetAllocator());
  // ...
  std::vector<size_t> start_indices, end_indices;
  // ...
  for (size_t i = 0; i < file_regions.size(); i += batch_size) {
    parse_file_requests.emplace_back(buffer_size,
                                     /* ... */,
                                     delete_buffer != nullptr);
    auto start_index = i;
    start_indices.emplace_back(start_index);
    end_indices.emplace_back(
        std::min<size_t>(start_index + batch_size - 1, file_regions.size() - 1));
    // ...
    file_readers.emplace_back(std::make_unique<LocalMultiFileReader>(
        file_path, copy_params, reader_metadata));
  }

  CHECK_EQ(start_indices.size(), file_readers.size());
  CHECK_EQ(start_indices.size(), parse_file_requests.size());
  CHECK_EQ(start_indices.size(), end_indices.size());
  std::vector<std::future<ParseFileRegionResult>> futures{};
  for (size_t i = 0; i < file_readers.size(); i++) {
    futures.emplace_back(async(parse_file_regions,
                               std::ref(file_regions),
                               start_indices[i],
                               end_indices[i],
                               std::ref(*(file_readers[i])),
                               std::ref(parse_file_requests[i]),
                               std::ref(column_id_to_chunk_map),
                               std::ref(parser)));
  }

  for (auto& future : futures) {
    future.wait();
  }

  std::vector<ParseFileRegionResult> load_file_region_results{};
  for (auto& future : futures) {
    load_file_region_results.emplace_back(future.get());
  }
  std::set<size_t> chunk_rejected_row_indices;
  size_t chunk_offset = 0;
  for (auto result : load_file_region_results) {
    for (auto& [column_id, chunk] : column_id_to_chunk_map) {
      chunk.appendData(
          result.column_id_to_data_blocks_map[column_id], result.row_count, 0);
    }
    for (const auto& rejected_row_index : result.rejected_row_indices) {
      chunk_rejected_row_indices.insert(rejected_row_index + chunk_offset);
    }
    chunk_offset += result.row_count;
  }

  if (delete_buffer) {
    // ... (size the delete buffer to cover every row in the fragment)
    auto delete_buffer_data = delete_buffer->getMemoryPtr();
    for (const auto rejected_row_index : chunk_rejected_row_indices) {
      delete_buffer_data[rejected_row_index] = true;
    }
  }
}
size_t num_rows_to_process(const size_t start_row_index,
                           const size_t max_fragment_size,
                           const size_t rows_remaining) {
  size_t start_position_in_fragment = start_row_index % max_fragment_size;
  return std::min<size_t>(rows_remaining,
                          max_fragment_size - start_position_in_fragment);
}
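// Worked example (illustrative): with max_fragment_size == 4 and start_row_index == 10,
// the scan is 2 rows into the current fragment (10 % 4 == 2), so at most 4 - 2 == 2 of
// the remaining rows are processed before the fragment boundary is reached; with
// rows_remaining == 10 the function returns 2.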
std::vector<size_t> partition_by_fragment(const size_t start_row_index,
                                          const size_t max_fragment_size,
                                          const size_t buffer_row_count) {
  CHECK(buffer_row_count > 0);
  std::vector<size_t> partitions{};
  size_t remaining_rows_in_last_fragment;
  if (start_row_index % max_fragment_size == 0) {
    remaining_rows_in_last_fragment = 0;
  } else {
    remaining_rows_in_last_fragment =
        max_fragment_size - (start_row_index % max_fragment_size);
  }
  if (buffer_row_count <= remaining_rows_in_last_fragment) {
    partitions.emplace_back(buffer_row_count);
  } else {
    if (remaining_rows_in_last_fragment > 0) {
      partitions.emplace_back(remaining_rows_in_last_fragment);
    }
    size_t remaining_buffer_row_count =
        buffer_row_count - remaining_rows_in_last_fragment;
    while (remaining_buffer_row_count > 0) {
      partitions.emplace_back(
          std::min<size_t>(remaining_buffer_row_count, max_fragment_size));
      remaining_buffer_row_count -= partitions.back();
    }
  }
  return partitions;
}
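// Worked example (illustrative): start_row_index == 10, max_fragment_size == 4,
// buffer_row_count == 10. The current fragment has 4 - (10 % 4) == 2 free rows, so
// the buffer is split into partitions {2, 4, 4}: finish the partially filled
// fragment, then fill the next two fragments completely.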
std::optional<ParseBufferRequest> get_next_scan_request(
    MetadataScanMultiThreadingParams& multi_threading_params) {
  std::unique_lock<std::mutex> pending_requests_lock(
      multi_threading_params.pending_requests_mutex);
  multi_threading_params.pending_requests_condition.wait(
      pending_requests_lock, [&multi_threading_params] {
        // ... (wake when a request is pending or processing should stop)
      });
  // ...
  pending_requests_lock.unlock();
  // ...
  return std::move(request);
}
void add_file_region(std::map<int, FileRegions>& fragment_id_to_file_regions_map,
                     int fragment_id,
                     size_t first_row_index,
                     const ParseBufferResult& result,
                     const std::string& file_path) {
  fragment_id_to_file_regions_map[fragment_id].emplace_back(
      // ... (FileRegion built from file_path, row offsets, and row count)
  );
}
void update_stats(Encoder* encoder,
                  const SQLTypeInfo& column_type,
                  DataBlockPtr data_block,
                  const size_t row_count) {
  // ... (update the encoder's stats from the parsed data block)
}
foreign_storage::ForeignStorageCache* get_cache_if_enabled(
    std::shared_ptr<Catalog_Namespace::Catalog>& catalog,
    const bool disable_cache) {
  if (!disable_cache && catalog->getDataMgr()
                            .getPersistentStorageMgr()
                            ->getDiskCacheConfig()
                            .isEnabledForFSI()) {
    return catalog->getDataMgr().getPersistentStorageMgr()->getDiskCache();
  }
  return nullptr;
}
void cache_blocks(std::map<ChunkKey, Chunk_NS::Chunk>& cached_chunks,
                  DataBlockPtr data_block,
                  size_t row_count,
                  ChunkKey& chunk_key,
                  const ColumnDescriptor* column,
                  bool is_first_block,
                  bool disable_cache) {
  // ... (bail out early when caching is disabled or unavailable)
  if (cached_chunks.find(chunk_key) == cached_chunks.end()) {
    cached_chunks[chunk_key].setBuffer(
        cache->getChunkBufferForPrecaching(chunk_key, is_first_block));
    if (column->columnType.is_varlen_indeed()) {
      // ... (varlen columns also need an index buffer)
      cached_chunks[chunk_key].setIndexBuffer(
          cache->getChunkBufferForPrecaching(index_key, is_first_block));
    }
    if (is_first_block) {
      cached_chunks[chunk_key].initEncoder();
    }
  }
  cached_chunks[chunk_key].appendData(data_block, row_count, 0);
}
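// Annotation: blocks are appended to cache chunks as they are parsed, so the disk
// cache is warmed during the scan itself. The encoder is only initialized on the
// first block of a chunk, and variable-length columns receive a separate index
// buffer alongside the data buffer.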
void append_data_block_to_chunk(
    const foreign_storage::IterativeFileScanParameters& file_scan_param,
    DataBlockPtr data_block,
    size_t row_count,
    const int column_id,
    const ColumnDescriptor* column,
    const size_t element_count_required) {
  // ... (look up this column's chunk and condition variable)
  {
    std::unique_lock<std::mutex> chunk_lock(file_scan_param.getChunkMutex(column_id));
    conditional_variable.wait(chunk_lock, [element_count_required, &chunk]() {
      return chunk.getBuffer()->getEncoder()->getNumElems() == element_count_required;
    });
    chunk.appendData(data_block, row_count, 0);
  }
  // ... (notify waiters that the chunk grew)
}
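// Annotation: the wait predicate above is what serializes out-of-order workers. A
// thread may only append once the chunk holds exactly element_count_required
// elements, i.e. once every earlier batch for this column has been appended, so rows
// land in file order even though buffers are parsed concurrently.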
std::pair<std::map<int, DataBlockPtr>, std::map<int, DataBlockPtr>>
partition_data_blocks(const std::map<int, const ColumnDescriptor*>& column_by_id,
                      const std::map<int, DataBlockPtr>& data_blocks) {
  std::map<int, DataBlockPtr> dict_encoded_data_blocks;
  std::map<int, DataBlockPtr> none_dict_encoded_data_blocks;
  for (auto& [column_id, data_block] : data_blocks) {
    const auto column = column_by_id.at(column_id);
    if (column->columnType.is_dict_encoded_string()) {
      dict_encoded_data_blocks[column_id] = data_block;
    } else {
      none_dict_encoded_data_blocks[column_id] = data_block;
    }
  }
  return {dict_encoded_data_blocks, none_dict_encoded_data_blocks};
}
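// Annotation: splitting dictionary-encoded string blocks from the rest lets
// populate_chunks_using_data_blocks() append the non-dictionary blocks immediately
// while the dictionary blocks wait on their asynchronous string-encoding futures
// (see below).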
void update_delete_buffer(
    const ParseBufferRequest& request,
    const ParseBufferResult& result,
    const foreign_storage::IterativeFileScanParameters& file_scan_param,
    const size_t start_position_in_fragment) {
  // ... (no-op unless file_scan_param.delete_buffer is set)
  auto chunk_offset = start_position_in_fragment;
  // ... (grow the delete buffer, under its mutex, to cover the new rows)
  auto delete_buffer_data = delete_buffer->getMemoryPtr();
  for (const auto& rejected_row_index : result.rejected_rows) {
    CHECK(rejected_row_index + chunk_offset < delete_buffer->size());
    delete_buffer_data[rejected_row_index + chunk_offset] = true;
  }
}
void populate_chunks_using_data_blocks(
    MetadataScanMultiThreadingParams& multi_threading_params,
    int fragment_id,
    const ParseBufferRequest& request,
    ParseBufferResult& result,
    std::map<int, const ColumnDescriptor*>& column_by_id,
    std::map<int, FileRegions>& fragment_id_to_file_regions_map,
    const foreign_storage::IterativeFileScanParameters& file_scan_param,
    const size_t expected_current_element_count) {
  // ...
  // Dictionary-encoded string columns are encoded off-thread; each future yields the
  // pointer to the encoded block once the dictionary ids are ready.
  std::vector<std::pair<const size_t, std::future<int8_t*>>>
      encoded_data_block_ptrs_futures;
  for (const auto& import_buffer : request.import_buffers) {
    if (import_buffer == nullptr) {
      continue;
    }
    if (import_buffer->getTypeInfo().is_dict_encoded_string()) {
      auto string_payload_ptr = import_buffer->getStringBuffer();
      // ...
      auto column_id = import_buffer->getColumnDesc()->columnId;
      encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
          column_id, async([&import_buffer, string_payload_ptr] {
            import_buffer->addDictEncodedString(*string_payload_ptr);
            return import_buffer->getStringDictBuffer();
          })));
    }
  }

  auto process_subset_of_data_blocks =
      [&](const std::map<int, DataBlockPtr>& data_blocks) {
        for (auto& [column_id, data_block] : data_blocks) {
          const auto column = column_by_id[column_id];
          // ... (append_data_block_to_chunk(..., expected_current_element_count))
        }
      };

  // Append the non-dictionary blocks first, then wait for the encoded blocks.
  auto [dict_encoded_data_blocks, none_dict_encoded_data_blocks] =
      partition_data_blocks(column_by_id, result.column_id_to_data_blocks_map);
  process_subset_of_data_blocks(none_dict_encoded_data_blocks);

  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
    encoded_ptr_future.second.wait();
  }
  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
    CHECK_GT(dict_encoded_data_blocks.count(encoded_ptr_future.first), 0UL);
    dict_encoded_data_blocks[encoded_ptr_future.first].numbersPtr =
        encoded_ptr_future.second.get();
  }
  process_subset_of_data_blocks(dict_encoded_data_blocks);
  // ...
}
void process_data_blocks(MetadataScanMultiThreadingParams& multi_threading_params,
                         int fragment_id,
                         const ParseBufferRequest& request,
                         ParseBufferResult& result,
                         std::map<int, const ColumnDescriptor*>& column_by_id,
                         std::map<int, FileRegions>& fragment_id_to_file_regions_map) {
  // ... (record the file region, then update each column's encoder and cache)
  const auto column = column_by_id[column_id];
  // ... (build the chunk key)
  if (column->columnType.is_varlen_indeed()) {
    chunk_key.emplace_back(1);
  }
  // ... (on first use, create the chunk's encoder buffer via
  //      std::make_unique<ForeignStorageBuffer>())
  // ...
}
void add_request_to_pool(MetadataScanMultiThreadingParams& multi_threading_params,
                         ParseBufferRequest& request) {
  std::unique_lock<std::mutex> completed_requests_queue_lock(
      multi_threading_params.request_pool_mutex);
  multi_threading_params.request_pool.emplace(std::move(request));
  completed_requests_queue_lock.unlock();
  multi_threading_params.request_pool_condition.notify_all();
}
void scan_metadata(MetadataScanMultiThreadingParams& multi_threading_params,
                   std::map<int, FileRegions>& fragment_id_to_file_regions_map,
                   const TextFileBufferParser& parser) {
  std::map<int, const ColumnDescriptor*> column_by_id{};
  while (true) {
    auto request_opt = get_next_scan_request(multi_threading_params);
    if (!request_opt.has_value()) {
      break;
    }
    auto& request = request_opt.value();
    try {
      if (column_by_id.empty()) {
        for (const auto column : request.getColumns()) {
          column_by_id[column->columnId] = column;
        }
      }
      auto partitions = partition_by_fragment(
          request.first_row_index, request.getMaxFragRows(), request.buffer_row_count);
      request.begin_pos = 0;
      size_t row_index = request.first_row_index;
      for (const auto partition : partitions) {
        request.process_row_count = partition;
        for (const auto& import_buffer : request.import_buffers) {
          if (import_buffer != nullptr) {
            import_buffer->clear();
          }
        }
        // ... (parse this partition into result)
        int fragment_id = row_index / request.getMaxFragRows();
        process_data_blocks(multi_threading_params,
                            fragment_id,
                            request,
                            result,
                            column_by_id,
                            fragment_id_to_file_regions_map);
        row_index += result.row_count;
        request.begin_pos = result.row_offsets.back() - request.file_offset;
      }
    } catch (...) {
      // ... (flag the failure so other threads stop processing)
      std::lock_guard<std::mutex> pending_requests_lock(
          multi_threading_params.pending_requests_mutex);
      // ...
    }
    add_request_to_pool(multi_threading_params, request);
  }
}
ParseBufferRequest get_request_from_pool(
    MetadataScanMultiThreadingParams& multi_threading_params) {
  std::unique_lock<std::mutex> request_pool_lock(
      multi_threading_params.request_pool_mutex);
  multi_threading_params.request_pool_condition.wait(
      request_pool_lock,
      [&multi_threading_params] { return !multi_threading_params.request_pool.empty(); });
  auto request = std::move(multi_threading_params.request_pool.front());
  multi_threading_params.request_pool.pop();
  request_pool_lock.unlock();
  CHECK(request.buffer);
  return request;
}
bool request_pool_non_empty(MetadataScanMultiThreadingParams& multi_threading_params) {
  std::unique_lock<std::mutex> request_pool_lock(
      multi_threading_params.request_pool_mutex);
  return !multi_threading_params.request_pool.empty();
}
void defer_scan_request(MetadataScanMultiThreadingParams& multi_threading_params,
                        ParseBufferRequest& request) {
  std::unique_lock<std::mutex> deferred_requests_lock(
      multi_threading_params.deferred_requests_mutex);
  // ... (move the request onto the deferred queue)
}
void dispatch_all_deferred_requests(
    MetadataScanMultiThreadingParams& multi_threading_params) {
  std::unique_lock<std::mutex> deferred_requests_lock(
      multi_threading_params.deferred_requests_mutex);
  {
    std::unique_lock<std::mutex> pending_requests_lock(
        multi_threading_params.pending_requests_mutex);
    // ... (move each deferred request onto the pending queue and notify workers)
  }
}
void dispatch_scan_request(MetadataScanMultiThreadingParams& multi_threading_params,
                           ParseBufferRequest& request) {
  {
    std::unique_lock<std::mutex> pending_requests_lock(
        multi_threading_params.pending_requests_mutex);
    // ... (enqueue the request)
  }
  multi_threading_params.pending_requests_condition.notify_all();
}
void populate_chunks(MetadataScanMultiThreadingParams& multi_threading_params,
                     std::map<int, FileRegions>& fragment_id_to_file_regions_map,
                     const TextFileBufferParser& parser,
                     foreign_storage::IterativeFileScanParameters& file_scan_param) {
  std::map<int, const ColumnDescriptor*> column_by_id{};
  while (true) {
    auto request_opt = get_next_scan_request(multi_threading_params);
    if (!request_opt.has_value()) {
      break;
    }
    auto& request = request_opt.value();
    try {
      if (column_by_id.empty()) {
        for (const auto column : request.getColumns()) {
          column_by_id[column->columnId] = column;
        }
      }
      for (size_t num_rows_left_to_process =
               request.buffer_row_count - request.processed_row_count;
           num_rows_left_to_process > 0;
           num_rows_left_to_process =
               request.buffer_row_count - request.processed_row_count) {
        for (const auto& import_buffer : request.import_buffers) {
          if (import_buffer != nullptr) {
            import_buffer->clear();
          }
        }
        // ... (parse the next batch; derive row_index, fragment_id, and result)
        size_t start_position_in_fragment = row_index % request.getMaxFragRows();
        populate_chunks_using_data_blocks(multi_threading_params,
                                          fragment_id,
                                          request,
                                          result,
                                          column_by_id,
                                          fragment_id_to_file_regions_map,
                                          file_scan_param,
                                          start_position_in_fragment);
        update_delete_buffer(
            request, result, file_scan_param, start_position_in_fragment);
      }
    } catch (...) {
      std::lock_guard<std::mutex> pending_requests_lock(
          multi_threading_params.pending_requests_mutex);
      // ...
    }
    // ...
  }
}
void resize_buffer_if_needed(std::unique_ptr<char[]>& buffer,
                             size_t& buffer_size,
                             const size_t alloc_size) {
  if (buffer_size < alloc_size) {
    buffer = std::make_unique<char[]>(alloc_size);
    buffer_size = alloc_size;
  }
}
void dispatch_scan_requests(
    const foreign_storage::ForeignTable* table,
    const size_t& buffer_size,
    const std::string& file_path,
    FileReader& file_reader,
    const import_export::CopyParams& copy_params,
    MetadataScanMultiThreadingParams& multi_threading_params,
    size_t& first_row_index_in_buffer,
    size_t& current_file_offset,
    const TextFileBufferParser& parser,
    const foreign_storage::IterativeFileScanParameters* file_scan_param,
    foreign_storage::AbstractTextFileDataWrapper::ResidualBuffer&
        iterative_residual_buffer,
    const bool is_first_file_scan_call,
    int& iterative_scan_last_fragment_id) {
  auto& alloc_size = iterative_residual_buffer.alloc_size;
  auto& residual_buffer = iterative_residual_buffer.residual_data;
  auto& residual_buffer_size = iterative_residual_buffer.residual_buffer_size;
  auto& residual_buffer_alloc_size = iterative_residual_buffer.residual_buffer_alloc_size;

  if (is_first_file_scan_call) {
    alloc_size = buffer_size;
    residual_buffer = std::make_unique<char[]>(alloc_size);
    residual_buffer_size = 0;
    residual_buffer_alloc_size = alloc_size;
  }
  // ...
  bool current_fragment_fully_read_during_iterative_scan =
      file_scan_param && file_scan_param->fragment_id < iterative_scan_last_fragment_id;

  while (/* ... */ &&
         (/* ... */ || !current_fragment_fully_read_during_iterative_scan)) {
    {
      std::lock_guard<std::mutex> pending_requests_lock(
          multi_threading_params.pending_requests_mutex);
      // ... (stop if another thread flagged an error)
    }
    auto request = get_request_from_pool(multi_threading_params);
    // ...
    if (residual_buffer_size > 0) {
      memcpy(request.buffer.get(), residual_buffer.get(), residual_buffer_size);
    }
    size_t size = residual_buffer_size;
    size += file_reader.read(request.buffer.get() + residual_buffer_size,
                             alloc_size - residual_buffer_size);

    if (size == 0) {
      // ... (nothing left to read; recycle the request)
    } else if (size == 1 && request.buffer[0] == copy_params.line_delim) {
      // A lone trailing line delimiter does not produce a row.
      // ...
      current_file_offset++;
      // ...
    }
    unsigned int num_rows_in_buffer = 0;
    request.end_pos = parser.findRowEndPosition(alloc_size,
                                                request.buffer,
                                                size,
                                                copy_params,
                                                first_row_index_in_buffer,
                                                num_rows_in_buffer,
                                                &file_reader);
    request.buffer_size = size;
    request.buffer_alloc_size = alloc_size;
    request.first_row_index = first_row_index_in_buffer;
    request.file_offset = current_file_offset;
    request.buffer_row_count = num_rows_in_buffer;
    request.processed_row_count = 0;
    request.begin_pos = 0;

    residual_buffer_size = size - request.end_pos;
    if (residual_buffer_size > 0) {
      resize_buffer_if_needed(residual_buffer, residual_buffer_alloc_size, alloc_size);
      memcpy(residual_buffer.get(),
             request.buffer.get() + request.end_pos,
             residual_buffer_size);
    }

    current_file_offset += request.end_pos;
    first_row_index_in_buffer += num_rows_in_buffer;

    if (num_rows_in_buffer > 0) {
      dispatch_scan_request(multi_threading_params, request);
    } else {
      // ...
    }

    if (file_scan_param) {
      const int32_t last_fragment_index =
          first_row_index_in_buffer / table->maxFragRows;
      if (last_fragment_index > file_scan_param->fragment_id) {
        iterative_scan_last_fragment_id = last_fragment_index;
        // ... (pause the iterative scan at the fragment boundary)
      }
    }
  }

  std::unique_lock<std::mutex> pending_requests_queue_lock(
      multi_threading_params.pending_requests_mutex);
  multi_threading_params.pending_requests_condition.wait(
      pending_requests_queue_lock, [&multi_threading_params] {
        // ... (wait until all dispatched requests have been picked up)
      });
  pending_requests_queue_lock.unlock();
}
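// Annotation: the residual buffer is what lets fixed-size reads cross row boundaries.
// findRowEndPosition() returns the offset just past the last complete row; the tail
// beyond it (a partial row) is copied into residual_data, and the next loop iteration
// memcpys that tail to the front of the next request buffer before reading more bytes
// from the file, so no row is ever split across two parse requests.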
void dispatch_scan_requests_with_exception_handling(
    const foreign_storage::ForeignTable* table,
    const size_t& buffer_size,
    const std::string& file_path,
    FileReader& file_reader,
    const import_export::CopyParams& copy_params,
    MetadataScanMultiThreadingParams& multi_threading_params,
    size_t& first_row_index_in_buffer,
    size_t& current_file_offset,
    const TextFileBufferParser& parser,
    const foreign_storage::IterativeFileScanParameters* file_scan_param,
    foreign_storage::AbstractTextFileDataWrapper::ResidualBuffer&
        iterative_residual_buffer,
    const bool is_first_file_scan_call,
    int& iterative_scan_last_fragment_id) {
  try {
    dispatch_scan_requests(table, buffer_size, file_path, file_reader, copy_params,
                           multi_threading_params, first_row_index_in_buffer,
                           current_file_offset, parser, file_scan_param,
                           iterative_residual_buffer, is_first_file_scan_call,
                           iterative_scan_last_fragment_id);
  } catch (...) {
    std::unique_lock<std::mutex> pending_requests_lock(
        multi_threading_params.pending_requests_mutex);
    // ... (record the exception and wake any waiting threads)
  }
}
void dispatch_scan_requests_with_exception_handling(
    const foreign_storage::ForeignTable* table,
    const size_t& buffer_size,
    const std::string& file_path,
    FileReader& file_reader,
    const import_export::CopyParams& copy_params,
    MetadataScanMultiThreadingParams& multi_threading_params,
    size_t& first_row_index_in_buffer,
    size_t& current_file_offset,
    const TextFileBufferParser& parser,
    const foreign_storage::IterativeFileScanParameters* file_scan_param,
    foreign_storage::AbstractTextFileDataWrapper::ResidualBuffer&
        iterative_residual_buffer,
    const bool is_first_file_scan_call) {
  // ... (forwards to the overload above with a placeholder last-fragment id)
  dispatch_scan_requests_with_exception_handling(
      table, buffer_size, file_path, file_reader, copy_params,
      multi_threading_params, first_row_index_in_buffer, current_file_offset,
      parser, file_scan_param, iterative_residual_buffer, is_first_file_scan_call,
      /* ... */);
}
void add_placeholder_metadata(
    const ColumnDescriptor* column,
    const ForeignTable* foreign_table,
    const int db_id,
    const size_t start_row,
    const size_t total_num_rows,
    std::map<ChunkKey, std::shared_ptr<ChunkMetadata>>& chunk_metadata_map) {
  // ... (build the base chunk key for this column)
  if (column->columnType.is_varlen_indeed()) {
    chunk_key.emplace_back(1);
  }
  // ...
  int start_fragment = start_row / foreign_table->maxFragRows;
  int end_fragment{0};
  if (total_num_rows > 0) {
    end_fragment = (total_num_rows - 1) / foreign_table->maxFragRows;
  }
  for (int fragment_id = start_fragment; fragment_id <= end_fragment; fragment_id++) {
    size_t num_elements =
        (static_cast<size_t>(foreign_table->maxFragRows * (fragment_id + 1)) >
         total_num_rows)
            ? total_num_rows % foreign_table->maxFragRows
            : foreign_table->maxFragRows;
    // ...
    chunk_metadata_map[chunk_key] =
        get_placeholder_metadata(column->columnType, num_elements);
  }
}
void initialize_non_append_mode_scan(
    const std::map<ChunkKey, std::shared_ptr<ChunkMetadata>>& chunk_metadata_map,
    const std::map<int, FileRegions>& fragment_id_to_file_regions_map,
    const foreign_storage::OptionsMap& server_options,
    std::unique_ptr<FileReader>& file_reader,
    const std::string& file_path,
    const import_export::CopyParams& copy_params,
    const shared::FilePathOptions& file_path_options,
    const std::optional<size_t>& max_file_count,
    const foreign_storage::ForeignTable* foreign_table,
    const foreign_storage::UserMapping* user_mapping,
    const foreign_storage::TextFileBufferParser& parser,
    std::function<std::string()> get_s3_key,
    size_t& num_rows,
    size_t& append_start_offset) {
  CHECK(chunk_metadata_map.empty());
  CHECK(fragment_id_to_file_regions_map.empty());
  // ... (choose the reader implementation from the server's storage type)
  file_reader = std::make_unique<LocalMultiFileReader>(
      file_path, copy_params, file_path_options, max_file_count);
  // ...
  append_start_offset = 0;
}
void AbstractTextFileDataWrapper::populateChunkMetadata(
    ChunkMetadataVector& chunk_metadata_vector) {
  // ...
  std::set<std::string> rolled_off_files;
  // ... (append mode: check for rolled-off files and newly appended data)

  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
  for (auto column : columns) {
    column_by_id[column->columnId] = column;
  }
  // ...
  std::set<int> columns_to_scan;
  for (auto column : columns) {
    // ... (only columns that cannot skip the metadata scan)
    columns_to_scan.insert(column->columnId);
  }
  // ...
  std::vector<std::future<void>> futures{};
  for (size_t i = 0; i < thread_count; i++) {
    multi_threading_params.request_pool.emplace(buffer_size,
                                                /* ... */);
    // ... (launch a scan_metadata worker with std::ref(multi_threading_params))
  }
  // ... (dispatch scan requests, then join the workers)
  for (auto& future : futures) {
    future.get();
  }

  // Finalize the metadata gathered for each chunk key:
  // ...
  auto column_entry = column_by_id.find(chunk_key[CHUNK_KEY_COLUMN_IDX]);
  CHECK(column_entry != column_by_id.end());
  const auto& column_type = column_entry->second->columnType;
  auto chunk_metadata = buffer->getEncoder()->getMetadata(column_type);
  chunk_metadata->numElements = buffer->getEncoder()->getNumElems();
  const auto& cached_chunks = multi_threading_params.cached_chunks;
  if (!column_type.is_varlen_indeed()) {
    chunk_metadata->numBytes = column_type.get_size() * chunk_metadata->numElements;
  } else if (auto chunk_entry = cached_chunks.find(chunk_key);
             chunk_entry != cached_chunks.end()) {
    auto cached_buffer = chunk_entry->second.getBuffer();
    CHECK(cached_buffer);
    chunk_metadata->numBytes = cached_buffer->size();
    buffer->setSize(cached_buffer->size());
  } else {
    chunk_metadata->numBytes = buffer->size();
  }
  // ...
  for (auto column : columns) {
    // ... (add placeholder metadata for columns that were skipped)
  }
  // ...
  if (!rolled_off_files.empty()) {
    updateRolledOffChunks(rolled_off_files, column_by_id);
  }
  // ...
  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
  // ...
}
void AbstractTextFileDataWrapper::iterativeFileScan(
    ChunkMetadataVector& chunk_metadata_vector,
    IterativeFileScanParameters& file_scan_param) {
  // ...
  CHECK(!foreign_table_->isAppendMode())
      << " iterative file scan can not be used with APPEND mode.";
  // ...
  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
  for (auto column : columns) {
    column_by_id[column->columnId] = column;
  }
  // ...
  std::set<int> columns_to_scan;
  for (auto column : columns) {
    columns_to_scan.insert(column->columnId);
  }
  // ...
  std::vector<std::future<void>> futures{};
  // ... (launch populate_chunks workers, each receiving std::ref(file_scan_param))
  for (auto& future : futures) {
    // ...
    future.get();
  }
  // ...
}
void AbstractTextFileDataWrapper::updateRolledOffChunks(
    const std::set<std::string>& rolled_off_files,
    const std::map<int32_t, const ColumnDescriptor*>& column_by_id) {
  std::set<int32_t> deleted_fragment_ids;
  std::optional<int32_t> partially_deleted_fragment_id;
  std::optional<size_t> partially_deleted_fragment_row_count;
  for (auto& [fragment_id, file_regions] : fragment_id_to_file_regions_map_) {
    bool file_region_deleted{false};
    for (auto it = file_regions.begin(); it != file_regions.end();) {
      if (shared::contains(rolled_off_files, it->file_path)) {
        it = file_regions.erase(it);
        file_region_deleted = true;
      } else {
        it++;
      }
    }
    if (file_regions.empty()) {
      deleted_fragment_ids.emplace(fragment_id);
    } else if (file_region_deleted) {
      partially_deleted_fragment_id = fragment_id;
      partially_deleted_fragment_row_count = 0;
      for (const auto& file_region : file_regions) {
        partially_deleted_fragment_row_count.value() += file_region.row_count;
      }
    }
  }

  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
    if (shared::contains(deleted_fragment_ids, chunk_key[CHUNK_KEY_FRAGMENT_IDX])) {
      chunk_metadata->numElements = 0;
      chunk_metadata->numBytes = 0;
    } else if (chunk_key[CHUNK_KEY_FRAGMENT_IDX] == partially_deleted_fragment_id) {
      CHECK(partially_deleted_fragment_row_count.has_value());
      auto old_chunk_stats = chunk_metadata->chunkStats;
      auto cd = shared::get_from_map(column_by_id, chunk_key[CHUNK_KEY_COLUMN_IDX]);
      chunk_metadata = get_placeholder_metadata(
          cd->columnType, partially_deleted_fragment_row_count.value());
      // The old stats are still valid, since roll-off only deletes rows.
      chunk_metadata->chunkStats = old_chunk_stats;
    }
  }
}
std::string AbstractTextFileDataWrapper::getSerializedDataWrapper() const {
  rapidjson::Document d;
  d.SetObject();
  // ...
  add_value_to_object(d,
                      fragment_id_to_file_regions_map_,
                      "fragment_id_to_file_regions_map",
                      d.GetAllocator());
  // ...
  rapidjson::Value reader_metadata(rapidjson::kObjectType);
  file_reader_->serialize(reader_metadata, d.GetAllocator());
  d.AddMember("reader_metadata", reader_metadata, d.GetAllocator());
  // ...
  return write_to_string(d);
}
void AbstractTextFileDataWrapper::restoreDataWrapperInternals(
    const std::string& file_path,
    const ChunkMetadataVector& chunk_metadata) {
  auto d = read_from_file(file_path);
  CHECK(d.IsObject());
  // ... (restore fragment_id_to_file_regions_map_ and append state)
  CHECK(d.HasMember("reader_metadata"));
  // ...
  file_reader_ = std::make_unique<LocalMultiFileReader>(
      full_file_path, copy_params, d["reader_metadata"]);
  // ...
  for (auto& pair : chunk_metadata) {
    // ... (re-create encoder buffers, then restore pair.second->numElements and
    //      pair.second->chunkStats on each encoder)
  }
  // ...
}

}  // namespace foreign_storage