35 for (
auto it = meta_vec.begin(); it != meta_vec.end();) {
42 namespace foreign_storage {
46 const size_t num_bytes) {
57 auto storage_type_entry = foreign_table->foreign_server->options.find(
60 if (storage_type_entry == foreign_table->foreign_server->options.end()) {
64 bool is_s3_storage_type =
66 if (is_s3_storage_type) {
67 throw ForeignStorageException{
68 "Query cannot be executed for S3 backed foreign table because AWS S3 support is "
69 "currently disabled."};
85 int64_t actual_chunk_size = buffer->
size();
92 for (
const auto& [chunk_key, buffer] : buffers) {
93 int64_t actual_chunk_size = buffer->
size();
101 const int column_id)
const {
107 std::stringstream error_stream;
108 error_stream <<
"Chunk populated by data wrapper which is " << actual_chunk_size
109 <<
" bytes exceeds maximum byte size limit of " <<
max_chunk_size_ <<
"."
111 <<
", column name : " << column_name;
112 throw ForeignStorageException(error_stream.str());
117 const size_t num_bytes) {
118 ChunkSizeValidator chunk_size_validator(chunk_key);
121 CHECK(destination_buffer);
125 chunk_size_validator.validateChunkSize(destination_buffer);
145 chunk_key, column_keys,
getDataWrapper(chunk_key)->getNonCachedParallelismLevel());
146 if (optional_keys.size()) {
151 column_keys.erase(chunk_key);
153 required_buffers[chunk_key] = destination_buffer;
155 getDataWrapper(chunk_key)->populateChunkBuffers(required_buffers, optional_buffers);
156 chunk_size_validator.validateChunkSizes(required_buffers);
157 chunk_size_validator.validateChunkSizes(optional_buffers);
163 for (
const auto& [key, buffer] : buffers) {
166 auto column = catalog->getMetadataForColumn(key[CHUNK_KEY_TABLE_IDX],
168 if (column->columnType.is_varlen_indeed() &&
172 auto foreign_table = catalog->getForeignTable(key[CHUNK_KEY_TABLE_IDX]);
173 auto fragmenter = foreign_table->fragmenter;
179 auto chunk_metadata = std::make_shared<ChunkMetadata>();
180 encoder->getMetadata(chunk_metadata);
181 fragmenter->updateColumnChunkMetadata(
205 buffer->
copyTo(destination_buffer, num_bytes);
217 throw ForeignStorageException{
218 "Query cannot be executed for foreign table because FSI is currently disabled."};
231 getDataWrapper(key_prefix)->populateChunkMetadata(chunk_metadata);
240 const ChunkKey table_key{db_id, table_id};
245 mock_it->second->unsetParentWrapper();
253 return FOREIGN_STORAGE_MGR;
257 return ToString(FOREIGN_STORAGE_MGR);
277 std::shared_ptr<MockForeignDataWrapper> data_wrapper) {
282 data_wrapper->setParentWrapper(wrapper_iter->second);
294 auto foreign_table = catalog->getForeignTable(tb_id);
297 foreign_table->foreign_server->data_wrapper_type, db_id, foreign_table);
320 const bool evict_cached_entries) {
325 if (evict_cached_entries ||
326 !catalog->getForeignTable(table_key[CHUNK_KEY_TABLE_IDX])->isAppendMode()) {
345 std::numeric_limits<int>::max()};
383 const size_t page_size,
384 const size_t initial_size) {
391 const size_t num_bytes) {
442 return catalog->getForeignTable(table_id)->maxChunkSize;
446 std::set<ChunkKey> chunk_keys;
453 auto foreign_table = catalog->getForeignTable(table_id);
455 ForeignTableSchema schema{db_id, foreign_table};
456 auto logical_column = schema.getLogicalColumn(destination_column_id);
457 auto logical_column_id = logical_column->columnId;
459 for (
auto column_id = logical_column_id;
460 column_id <= (logical_column_id + logical_column->columnType.get_physical_cols());
462 auto column = schema.getColumnDescriptor(column_id);
463 if (column->columnType.is_varlen_indeed()) {
464 ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id, 1};
465 chunk_keys.emplace(data_chunk_key);
467 ChunkKey index_chunk_key{db_id, table_id, column->columnId, fragment_id, 2};
468 chunk_keys.emplace(index_chunk_key);
470 ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id};
471 chunk_keys.emplace(data_chunk_key);
478 std::vector<ChunkKey> chunk_keys;
485 auto foreign_table = catalog->getForeignTable(table_id);
487 ForeignTableSchema schema{db_id, foreign_table};
488 auto logical_column = schema.getLogicalColumn(destination_column_id);
489 auto logical_column_id = logical_column->columnId;
491 for (
auto column_id = logical_column_id;
492 column_id <= (logical_column_id + logical_column->columnType.get_physical_cols());
494 auto column = schema.getColumnDescriptor(column_id);
495 if (column->columnType.is_varlen_indeed()) {
496 ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id, 1};
497 chunk_keys.emplace_back(data_chunk_key);
499 ChunkKey index_chunk_key{db_id, table_id, column->columnId, fragment_id, 2};
500 chunk_keys.emplace_back(index_chunk_key);
502 ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id};
503 chunk_keys.emplace_back(data_chunk_key);
516 if ((left[CHUNK_KEY_DB_IDX] < right[CHUNK_KEY_DB_IDX]) ||
522 if (left.size() < right.size()) {
534 for (
const auto& key : key_set) {
543 const std::set<ChunkKey>& chunk_keys) {
546 for (
const auto& chunk_key : chunk_keys) {
549 chunk_buffer_map[chunk_key]->resetToEmpty();
551 return chunk_buffer_map;
555 const std::map<
ChunkKey, std::set<ParallelismHint>>& hints_per_table) {
560 std::pair<std::set<ChunkKey, decltype(set_comp)*>,
561 std::set<ChunkKey, decltype(set_comp)*>>
564 const std::set<ChunkKey>& required_chunk_keys,
567 auto same_fragment_keys = std::set<ChunkKey, decltype(set_comp)*>(
set_comp);
568 auto diff_fragment_keys = std::set<ChunkKey, decltype(set_comp)*>(
set_comp);
574 for (
const auto& hint : table_hints->second) {
575 const auto& [column_id, fragment_id] = hint;
577 optional_chunk_key.push_back(column_id);
579 optional_chunk_key.push_back(chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
581 optional_chunk_key.push_back(fragment_id);
590 if (optional_chunk_key[CHUNK_KEY_FRAGMENT_IDX] ==
591 chunk_key[CHUNK_KEY_FRAGMENT_IDX]) {
592 same_fragment_keys.emplace(optional_chunk_key);
594 diff_fragment_keys.emplace(optional_chunk_key);
598 return {same_fragment_keys, diff_fragment_keys};
605 std::set<ChunkKey> optional_keys;
606 for (
const auto& keys : {same_fragment_keys, diff_fragment_keys}) {
607 for (
auto key : keys) {
609 for (
auto column_key : column_keys) {
610 optional_keys.emplace(column_key);
614 return optional_keys;
619 const std::set<ChunkKey>& required_chunk_keys,
625 auto [same_fragment_keys, diff_fragment_keys] =
631 std::set<ChunkKey> optional_keys_to_delete;
632 if (!optional_keys.empty()) {
633 for (
const auto& key : optional_keys) {
636 auto all_keys_cached =
637 std::all_of(key_set.begin(), key_set.end(), [
this](
const ChunkKey& key) {
642 if (all_keys_cached) {
643 optional_keys_to_delete.insert(key_set.begin(), key_set.end());
650 for (
const auto& key : optional_keys_to_delete) {
651 optional_keys.erase(key);
653 return optional_keys;
std::lock_guard< T > lock_guard
bool set_comp(const ChunkKey &left, const ChunkKey &right)
void clearTempChunkBufferMapEntriesForTableUnlocked(const ChunkKey &table_key)
bool contains(const T &container, const U &element)
std::vector< int > ChunkKey
static std::unique_ptr< ForeignDataWrapper > create(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table)
AbstractBuffer * alloc(const size_t num_bytes) override
virtual std::set< ChunkKey > getOptionalKeysWithinSizeLimit(const ChunkKey &chunk_key, const std::set< ChunkKey, decltype(set_comp)* > &same_fragment_keys, const std::set< ChunkKey, decltype(set_comp)* > &diff_fragment_keys) const
bool fetchBufferIfTempBufferMapEntryExists(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes)
bool is_system_table_chunk_key(const ChunkKey &chunk_key)
bool is_table_key(const ChunkKey &key)
virtual bool createDataWrapperIfNotExists(const ChunkKey &chunk_key)
std::set< ChunkKey > getOptionalChunkKeySetAndNormalizeCache(const ChunkKey &chunk_key, const std::set< ChunkKey > &required_chunk_keys, const ForeignDataWrapper::ParallelismLevel parallelism_level)
void filter_metadata_by_leaf(ChunkMetadataVector &meta_vec, const ChunkKey &key_prefix)
std::shared_ptr< ForeignDataWrapper > getDataWrapper(const ChunkKey &chunk_key) const
void validateChunkSize(const AbstractBuffer *buffer) const
AbstractBuffer * putBuffer(const ChunkKey &chunk_key, AbstractBuffer *source_buffer, const size_t num_bytes) override
ChunkSizeValidator(const ChunkKey &chunk_key)
#define CHUNK_KEY_FRAGMENT_IDX
static void checkIfS3NeedsToBeEnabled(const ChunkKey &chunk_key)
size_t getNumChunks() override
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
void free(AbstractBuffer *buffer) override
std::shared_ptr< Catalog_Namespace::Catalog > catalog_
std::set< ChunkKey > get_column_key_set(const ChunkKey &destination_chunk_key)
const ColumnDescriptor * column_
void removeTableRelatedDS(const int db_id, const int table_id) override
virtual bool isChunkCached(const ChunkKey &chunk_key) const
bool is_replicated_table_chunk_key(const ChunkKey &chunk_key)
ChunkKey get_table_key(const ChunkKey &key)
std::pair< std::set< ChunkKey, decltype(set_comp)* >, std::set< ChunkKey, decltype(set_comp)* > > getPrefetchSets(const ChunkKey &chunk_key, const std::set< ChunkKey > &required_chunk_keys, const ForeignDataWrapper::ParallelismLevel parallelism_level) const
bool is_table_enabled_on_node(const ChunkKey &key)
std::shared_lock< T > shared_lock
This file contains the class specification and related data structures for Catalog.
std::shared_mutex parallelism_hints_mutex_
bool isAllocationCapped() override
void updateFragmenterMetadata(const ChunkToBufferMap &) const
bool key_does_not_shard_to_leaf(const ChunkKey &key)
static SysCatalog & instance()
static const std::string STORAGE_TYPE_KEY
bool contains_fragment_key(const std::set< ChunkKey > &key_set, const ChunkKey &target_key)
bool is_varlen_index_key(const ChunkKey &key)
bool hasDataWrapperForChunk(const ChunkKey &chunk_key) const
void fetchBuffer(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes) override
bool isDatawrapperRestored(const ChunkKey &chunk_key)
std::shared_mutex temp_chunk_buffer_map_mutex_
std::map< ChunkKey, std::set< ParallelismHint > > parallelism_hints_per_table_
std::unique_lock< T > unique_lock
#define CHUNK_KEY_TABLE_IDX
bool has_table_prefix(const ChunkKey &key)
An AbstractBuffer is a unit of data management for a data manager.
size_t getAllocated() override
std::string getStringMgrType() override
const ForeignTable * foreign_table_
bool fragment_maps_to_leaf(const ChunkKey &key)
bool is_shardable_key(const ChunkKey &key)
size_t getMaxSize() override
std::map< ChunkKey, std::shared_ptr< ForeignDataWrapper > > data_wrapper_map_
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
size_t get_max_chunk_size(const ChunkKey &key)
virtual void refreshTable(const ChunkKey &table_key, const bool evict_cached_entries)
void setDataWrapper(const ChunkKey &table_key, std::shared_ptr< MockForeignDataWrapper > data_wrapper)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix) override
AbstractBuffer * getBuffer(const ChunkKey &chunk_key, const size_t num_bytes) override
ChunkToBufferMap allocateTempBuffersForChunks(const std::set< ChunkKey > &chunk_keys)
void deleteBuffersWithPrefix(const ChunkKey &chunk_key_prefix, const bool purge) override
#define CHUNK_KEY_VARLEN_IDX
virtual void eraseDataWrapper(const ChunkKey &table_key)
std::map< ChunkKey, std::shared_ptr< MockForeignDataWrapper > > mocked_wrapper_map_
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
std::pair< int, int > get_table_prefix(const ChunkKey &key)
AbstractBuffer * createBuffer(const ChunkKey &chunk_key, const size_t page_size, const size_t initial_size) override
Encoder * getEncoder() const
void setParallelismHints(const std::map< ChunkKey, std::set< ParallelismHint >> &hints_per_table)
void validateChunkSizes(const ChunkToBufferMap &buffers) const
void createDataWrapperUnlocked(int32_t db, int32_t tb)
virtual bool hasMaxFetchSize() const
MgrType getMgrType() override
void clearTempChunkBufferMapEntriesForTable(const ChunkKey &table_key)
void throwChunkSizeViolatedError(const int64_t actual_chunk_size, const int column_id=-1) const
std::map< ChunkKey, std::unique_ptr< AbstractBuffer > > temp_chunk_buffer_map_
size_t getInUseSize() override
#define CHUNK_KEY_COLUMN_IDX
void deleteBuffer(const ChunkKey &chunk_key, const bool purge) override
void checkpoint() override
virtual void evictChunkFromCache(const ChunkKey &chunk_key)
ChunkKey get_fragment_key(const ChunkKey &key)
static const std::string S3_STORAGE_TYPE
std::vector< ChunkKey > get_column_key_vec(const ChunkKey &destination_chunk_key)
std::shared_mutex data_wrapper_mutex_
virtual size_t maxFetchSize(int32_t db_id) const
std::string printSlabs() override
bool is_varlen_key(const ChunkKey &key)
bool isBufferOnDevice(const ChunkKey &chunk_key) override