25 , chunk_key_(chunk_key)
26 , persistent_foreign_storage_(persistent_foreign_storage) {}
29 const size_t numBytes,
32 const int dstDeviceId) {
43 const size_t numBytes,
47 buff_.insert(
buff_.end(), src, src + numBytes);
55 : AbstractBufferMgr(0), persistent_foreign_storage_(persistent_foreign_storage) {}
59 std::vector<ForeignStorageColumnBuffer> column_buffers;
61 const auto buffer = kv.second->moveBuffer();
62 column_buffers.emplace_back(
70 const size_t pageSize,
71 const size_t initialSize) {
77 return it_ok.first->second.get();
82 const size_t numBytes) {
86 return it->second.get();
91 const size_t numBytes) {
97 auto buf = file_buffer->tryZeroCopy(numBytes);
102 file_buffer->read(destBuffer->
getMemoryPtr(), numBytes);
120 std::search(chunk_it->first.begin(),
121 chunk_it->first.begin() + keyPrefix.size(),
123 keyPrefix.end()) != chunk_it->first.begin() + keyPrefix.size()) {
124 const auto& chunk_key = chunk_it->first;
125 if (chunk_key.size() == 5) {
126 if (chunk_key[4] == 1) {
127 const auto& buffer = *chunk_it->second;
128 auto type = buffer.getSqlType();
129 auto size = buffer.size();
130 auto subkey = chunk_key;
133 auto bs = index_buf.size() / index_buf.getSqlType().get_size();
134 auto chunk_metadata =
136 chunkMetadataVec.emplace_back(chunk_key, chunk_metadata);
139 const auto& buffer = *chunk_it->second;
140 auto chunk_metadata = std::make_shared<ChunkMetadata>();
141 chunk_metadata->sqlType = buffer.getSqlType();
142 buffer.getEncoder()->getMetadata(chunk_metadata);
143 chunkMetadataVec.emplace_back(chunk_key, chunk_metadata);
151 const int table_id) {
152 auto key = std::make_pair(db_id, table_id);
157 std::lock_guard<std::mutex> persistent_storage_interfaces_lock(
164 key, std::make_unique<ForeignStorageBufferMgr>(db_id, table_id, it->second));
166 return it_ok.first->second.get();
170 std::lock_guard<std::mutex> persistent_storage_interfaces_lock(
176 std::unique_ptr<PersistentForeignStorageInterface> persistent_foreign_storage) {
177 std::lock_guard<std::mutex> persistent_storage_interfaces_lock(
180 persistent_foreign_storage->getType(), std::move(persistent_foreign_storage));
185 size_t sep = type.find_first_of(
':'), sep2 = sep != std::string::npos ? sep + 1 : sep;
186 auto res = std::make_pair(
to_upper(type.substr(0, sep)), type.substr(sep2));
192 std::list<ColumnDescriptor>& cols) {
194 std::unique_lock<std::mutex> persistent_storage_interfaces_lock(
198 throw std::runtime_error(
"storage type " +
type.first +
" not supported");
200 auto& p = it->second;
201 persistent_storage_interfaces_lock.unlock();
202 p->prepareTable(db_id,
type.second, td, cols);
207 const std::list<ColumnDescriptor>& cols) {
208 const int table_id = td.
tableId;
211 std::unique_lock<std::mutex> persistent_storage_interfaces_lock(
215 throw std::runtime_error(
"storage type " +
type.first +
" not supported");
220 std::make_pair(db_id, table_id), it->second.get());
223 persistent_storage_interfaces_lock.unlock();
224 it_ok.first->second->registerTable(catalog,
PersistentForeignStorageInterface * persistent_foreign_storage_
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix) override
std::vector< int > ChunkKey
std::unordered_map< std::string, std::unique_ptr< PersistentForeignStorageInterface > > persistent_storage_interfaces_
class for a per-database catalog. also includes metadata for the current database and the current use...
void syncEncoder(const AbstractBuffer *src_buffer)
void checkpoint() override
Data_Namespace::AbstractBufferMgr * lookupBufferManager(const int db_id, const int table_id)
void append(int8_t *src, const size_t numBytes, const Data_Namespace::MemoryLevel srcBufferType=Data_Namespace::CPU_LEVEL, const int deviceId=-1) override
virtual int8_t * getMemoryPtr()=0
const ChunkKey chunk_key_
void fetchBuffer(const ChunkKey &key, Data_Namespace::AbstractBuffer *destBuffer, const size_t numBytes=0) override
std::shared_lock< T > shared_lock
This file contains the class specification and related data structures for Catalog.
void registerPersistentStorageInterface(std::unique_ptr< PersistentForeignStorageInterface > persistent_foreign_storage)
const DBMetadata & getCurrentDB() const
ForeignStorageBufferMgr(const int db_id, const int table_id, PersistentForeignStorageInterface *persistent_foreign_storage)
std::unique_lock< T > unique_lock
virtual void append(const std::vector< ForeignStorageColumnBuffer > &column_buffers)=0
ForeignStorageBuffer(const ChunkKey &chunk_key, PersistentForeignStorageInterface *persistent_foreign_storage)
An AbstractBuffer is a unit of data management for a data manager.
void registerTable(Catalog_Namespace::Catalog *catalog, const TableDescriptor &td, const std::list< ColumnDescriptor > &cols)
ids are created
std::mutex persistent_storage_interfaces_mutex_
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_index_
std::vector< int8_t > buff_
virtual void setMemoryPtr(int8_t *new_ptr)
Data_Namespace::AbstractBuffer * getBuffer(const ChunkKey &key, const size_t numBytes=0) override
virtual int8_t * tryZeroCopy(const ChunkKey &chunk_key, const SQLTypeInfo &sql_type, const size_t num_bytes)
std::map< std::pair< int, int >, PersistentForeignStorageInterface * > table_persistent_storage_interface_map_
heavyai::shared_mutex chunk_index_mutex_
void dropBufferManager(const int db_id, const int table_id)
void setSize(const size_t size)
std::map< std::pair< int, int >, std::unique_ptr< ForeignStorageBufferMgr > > managers_map_
PersistentForeignStorageInterface * persistent_foreign_storage_
void read(int8_t *const dst, const size_t numBytes, const size_t offset=0, const Data_Namespace::MemoryLevel dstBufferType=Data_Namespace::CPU_LEVEL, const int dstDeviceId=-1) override
virtual void reserve(size_t num_bytes)=0
std::pair< std::string, std::string > parseStorageType(const std::string &type)
virtual void read(const ChunkKey &chunk_key, const SQLTypeInfo &sql_type, int8_t *dest, const size_t num_bytes)=0
int8_t * tryZeroCopy(const size_t numBytes)
void prepareTable(const int db_id, TableDescriptor &td, std::list< ColumnDescriptor > &cols)
prepare table options and modify columns
Data_Namespace::AbstractBuffer * createBuffer(const ChunkKey &key, const size_t pageSize=0, const size_t initialSize=0) override