19 #include <boost/filesystem.hpp>
26 #ifdef ENABLE_IMPORT_PARQUET
32 namespace foreign_storage {
47 CHECK_GT(required_buffers.size(), 0U) <<
"Must populate at least one buffer";
51 chunk_size_validator.validateChunkSizes(required_buffers);
52 chunk_size_validator.validateChunkSizes(optional_buffers);
55 }
catch (
const std::runtime_error& error) {
59 for (
const auto& [chunk_key, buffer] : required_buffers) {
60 if (
auto file_buffer = dynamic_cast<File_Namespace::FileBuffer*>(buffer)) {
61 file_buffer->freeChunkPages();
64 for (
const auto& [chunk_key, buffer] : optional_buffers) {
65 if (
auto file_buffer = dynamic_cast<File_Namespace::FileBuffer*>(buffer)) {
66 file_buffer->freeChunkPages();
85 const size_t num_bytes) {
92 CHECK(destination_buffer);
98 buffer->copyTo(destination_buffer, num_bytes);
110 for (
const auto& key : column_keys) {
117 chunk_key, column_keys, data_wrapper.getCachedParallelismLevel());
121 CHECK(required_buffers.find(chunk_key) != required_buffers.end());
126 buffer->
copyTo(destination_buffer, num_bytes);
134 : std::runtime_error(error_message) {}
155 for (
auto& [key, meta] : chunk_metadata) {
166 }
catch (RestoreDataWrapperException& e) {
168 LOG(
ERROR) <<
"An error occurred while attempting to restore data wrapper using "
169 "disk cached metadata. Clearing cached data for table and proceeding "
170 "with a new data wrapper instance. Database ID: "
171 << db_id <<
", table ID: " << table_id <<
", error: " << e.what();
172 chunk_metadata.clear();
209 auto doc =
getDataWrapper(chunk_key_prefix)->getSerializedDataWrapper();
218 const bool evict_cached_entries) {
222 if (evict_cached_entries) {
237 std::vector<ChunkKey> old_chunk_keys =
242 for (
auto& key : old_chunk_keys) {
273 int last_frag_id = 0;
277 for (
const auto& [key, metadata] : cached_metadata) {
286 const std::vector<ChunkKey>& old_chunk_keys) {
295 }
catch (std::runtime_error& e) {
302 const std::vector<ChunkKey>& old_chunk_keys) {
311 }
catch (std::runtime_error& e) {
317 const std::vector<ChunkKey>& old_chunk_keys,
319 int64_t total_time{0};
320 auto fragment_refresh_start_time = std::chrono::high_resolution_clock::now();
322 if (old_chunk_keys.empty()) {
329 std::set<ChunkKey> chunk_keys_to_be_cached;
332 std::set<ChunkKey> chunk_keys_in_fragment;
333 for (
const auto& chunk_key : old_chunk_keys) {
340 if (chunk_keys_in_fragment.size() > 0) {
341 auto required_buffers =
345 chunk_keys_in_fragment.clear();
350 auto current_time = std::chrono::high_resolution_clock::now();
351 total_time += std::chrono::duration_cast<std::chrono::seconds>(
352 current_time - fragment_refresh_start_time)
355 LOG(
WARNING) <<
"Refresh time exceeded for table key: { " << table_key[0]
356 <<
", " << table_key[1] <<
" } after fragment id: " << fragment_id;
359 fragment_refresh_start_time = std::chrono::high_resolution_clock::now();
366 chunk_keys_in_fragment.insert(column_keys.begin(), column_keys.end());
367 chunk_keys_to_be_cached.insert(column_keys.begin(), column_keys.end());
371 if (chunk_keys_in_fragment.size() > 0) {
372 auto required_buffers =
389 if (boost::filesystem::exists(wrapper_file)) {
394 data_wrapper->restoreDataWrapperInternals(
396 }
catch (std::exception& e) {
397 throw RestoreDataWrapperException(e.what());
419 size_t total_size = 0U;
420 for (
const auto& key : key_set) {
430 std::set<ChunkKey> optional_keys;
435 for (
const auto& keys : {same_fragment_keys, diff_fragment_keys}) {
436 for (
const auto& key : keys) {
438 for (
const auto& column_key : column_keys) {
442 if (total_chunk_size > max_size) {
443 return optional_keys;
445 for (
const auto& column_key : column_keys) {
446 optional_keys.emplace(column_key);
450 return optional_keys;
454 size_t num_bytes = 0;
458 auto metadata = meta.begin()->second;
464 num_bytes = (metadata->sqlType.is_string())
469 num_bytes = metadata->numBytes;
std::lock_guard< T > lock_guard
bool set_comp(const ChunkKey &left, const ChunkKey &right)
void refreshAppendTableInCache(const ChunkKey &table_key, const std::vector< ChunkKey > &old_chunk_keys)
std::vector< int > ChunkKey
bool isMetadataCached(const ChunkKey &) const
size_t getBufferSize(const ChunkKey &key) const
bool is_table_key(const ChunkKey &key)
bool is_varlen_data_key(const ChunkKey &key)
std::set< ChunkKey > getOptionalChunkKeySetAndNormalizeCache(const ChunkKey &chunk_key, const std::set< ChunkKey > &required_chunk_keys, const ForeignDataWrapper::ParallelismLevel parallelism_level)
bool hasStoredDataWrapper(int32_t db, int32_t tb) const
std::shared_ptr< ForeignDataWrapper > getDataWrapper(const ChunkKey &chunk_key) const
void validateChunkSize(const AbstractBuffer *buffer) const
void eraseChunk(const ChunkKey &chunk_key)
size_t getMaxChunkDataSize() const
void clearForTablePrefix(const ChunkKey &)
void storeDataWrapper(const std::string &doc, int32_t db_id, int32_t tb_id)
#define CHUNK_KEY_FRAGMENT_IDX
CachingForeignStorageMgr(ForeignStorageCache *cache)
size_t maxFetchSize(int32_t db_id) const override
static void checkIfS3NeedsToBeEnabled(const ChunkKey &chunk_key)
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
bool is_append_table_chunk_key(const ChunkKey &chunk_key)
bool hasMaxFetchSize() const override
std::set< ChunkKey > get_column_key_set(const ChunkKey &destination_chunk_key)
void removeTableRelatedDS(const int db_id, const int table_id) override
void refreshTableInCache(const ChunkKey &table_key)
ChunkKey get_table_key(const ChunkKey &key)
ForeignStorageCache * disk_cache_
std::string show_chunk(const ChunkKey &key)
std::set< ChunkKey > getOptionalKeysWithinSizeLimit(const ChunkKey &chunk_key, const std::set< ChunkKey, decltype(set_comp)* > &same_fragment_keys, const std::set< ChunkKey, decltype(set_comp)* > &diff_fragment_keys) const override
bool is_in_memory_system_table
bool is_table_enabled_on_node(const ChunkKey &key)
void eraseDataWrapper(const ChunkKey &key) override
bool is_in_memory_system_table_chunk_key(const ChunkKey &chunk_key)
void getChunkMetadataVecFromDataWrapper(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix)
void clearTable(const ChunkKey &table_key)
This file contains the class specification and related data structures for Catalog.
void refreshNonAppendTableInCache(const ChunkKey &table_key, const std::vector< ChunkKey > &old_chunk_keys)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix) override
void getCachedMetadataVecForKeyPrefix(ChunkMetadataVector &, const ChunkKey &) const
bool hasStoredDataWrapperMetadata(int32_t db_id, int32_t table_id) const
void updateFragmenterMetadata(const ChunkToBufferMap &) const
constexpr int64_t MAX_REFRESH_TIME_IN_SECONDS
void refreshChunksInCacheByFragment(const std::vector< ChunkKey > &old_chunk_keys, int last_frag_id)
bool createDataWrapperIfNotExists(const ChunkKey &chunk_key) override
void fetchBuffer(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes) override
void evictChunkFromCache(const ChunkKey &chunk_key) override
#define CHUNK_KEY_TABLE_IDX
RestoreDataWrapperException(const std::string &error_message)
bool has_table_prefix(const ChunkKey &key)
An AbstractBuffer is a unit of data management for a data manager.
virtual void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer=nullptr)=0
void populateChunkBuffersSafely(ForeignDataWrapper &data_wrapper, ChunkToBufferMap &required_buffers, ChunkToBufferMap &optional_buffers)
ChunkToBufferMap getChunkBuffersForCaching(const std::set< ChunkKey > &chunk_keys) const
void refreshTable(const ChunkKey &table_key, const bool evict_cached_entries) override
bool fragment_maps_to_leaf(const ChunkKey &key)
bool is_shardable_key(const ChunkKey &key)
std::string getSerializedWrapperPath(int32_t db_id, int32_t tb_id) const
std::map< ChunkKey, std::shared_ptr< ForeignDataWrapper > > data_wrapper_map_
void cacheMetadataVec(const ChunkMetadataVector &)
const foreign_storage::ForeignTable & get_foreign_table_for_key(const ChunkKey &key)
bool isChunkCached(const ChunkKey &chunk_key) const override
V & get_from_map(std::map< K, V, comp > &map, const K &key)
size_t get_max_chunk_size(const ChunkKey &key)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix) override
virtual void eraseDataWrapper(const ChunkKey &table_key)
std::vector< ChunkKey > getCachedChunksForKeyPrefix(const ChunkKey &) const
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
std::pair< int, int > get_table_prefix(const ChunkKey &key)
void checkpoint(const int32_t db_id, const int32_t tb_id)
void createDataWrapperUnlocked(int32_t db, int32_t tb)
File_Namespace::FileBuffer * getCachedChunkIfExists(const ChunkKey &)
bool hasCachedMetadataForKeyPrefix(const ChunkKey &) const
void clearTempChunkBufferMapEntriesForTable(const ChunkKey &table_key)
void fetchBuffer(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes) override
void removeTableRelatedDS(const int db_id, const int table_id) override
size_t getRequiredBuffersSize(const ChunkKey &chunk_key) const
ChunkKey get_fragment_key(const ChunkKey &key)
std::shared_mutex data_wrapper_mutex_
bool is_varlen_key(const ChunkKey &key)
int getHighestCachedFragId(const ChunkKey &table_key)