OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ForeignStorageInterface.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 #include "Catalog/Catalog.h"
19 #include "Shared/StringTransform.h"
20 
22  const ChunkKey& chunk_key,
23  PersistentForeignStorageInterface* persistent_foreign_storage)
24  : Data_Namespace::AbstractBuffer(0)
25  , chunk_key_(chunk_key)
26  , persistent_foreign_storage_(persistent_foreign_storage) {}
27 
28 void ForeignStorageBuffer::read(int8_t* const dst,
29  const size_t numBytes,
30  const size_t offset,
31  const Data_Namespace::MemoryLevel dstBufferType,
32  const int dstDeviceId) {
33  CHECK_EQ(size_t(0), offset);
34  CHECK_EQ(-1, dstDeviceId);
36 }
37 
38 int8_t* ForeignStorageBuffer::tryZeroCopy(const size_t numBytes) {
40 }
41 
43  const size_t numBytes,
44  const Data_Namespace::MemoryLevel srcBufferType,
45  const int deviceId) {
46  setAppended();
47  buff_.insert(buff_.end(), src, src + numBytes);
48  size_ += numBytes;
49 }
50 
52  const int db_id,
53  const int table_id,
54  PersistentForeignStorageInterface* persistent_foreign_storage)
55  : AbstractBufferMgr(0), persistent_foreign_storage_(persistent_foreign_storage) {}
56 
58  // TODO(alex)
59  std::vector<ForeignStorageColumnBuffer> column_buffers;
60  for (auto& kv : chunk_index_) {
61  const auto buffer = kv.second->moveBuffer();
62  column_buffers.emplace_back(
63  ForeignStorageColumnBuffer{kv.first, kv.second->getSqlType(), buffer});
64  }
65  persistent_foreign_storage_->append(column_buffers);
66 }
67 
69  const ChunkKey& key,
70  const size_t pageSize,
71  const size_t initialSize) {
73  const auto it_ok = chunk_index_.emplace(
74  key, std::make_unique<ForeignStorageBuffer>(key, persistent_foreign_storage_));
75  // this check fails if we create table, drop it and create again
76  // CHECK(it_ok.second);
77  return it_ok.first->second.get();
78 }
79 
81  const ChunkKey& key,
82  const size_t numBytes) {
84  const auto it = chunk_index_.find(key);
85  CHECK(it != chunk_index_.end());
86  return it->second.get();
87 }
88 
91  const size_t numBytes) {
92  CHECK(numBytes);
93  auto file_buffer = dynamic_cast<ForeignStorageBuffer*>(getBuffer(key, numBytes));
94  CHECK(file_buffer);
95 
96  // TODO: check if GPU is used
97  auto buf = file_buffer->tryZeroCopy(numBytes);
98  if (buf) {
99  destBuffer->setMemoryPtr(buf);
100  } else {
101  destBuffer->reserve(numBytes);
102  file_buffer->read(destBuffer->getMemoryPtr(), numBytes);
103  }
104  destBuffer->setSize(numBytes);
105  destBuffer->syncEncoder(file_buffer);
106 }
107 
109  ChunkMetadataVector& chunkMetadataVec,
110  const ChunkKey& keyPrefix) {
111  heavyai::unique_lock<heavyai::shared_mutex> chunk_index_write_lock(
112  chunk_index_mutex_); // is this guarding the right structure? it look slike we
113  // oly read here for chunk
114  auto chunk_it = chunk_index_.lower_bound(keyPrefix);
115  if (chunk_it == chunk_index_.end()) {
116  CHECK(false); // throw?
117  }
118 
119  while (chunk_it != chunk_index_.end() &&
120  std::search(chunk_it->first.begin(),
121  chunk_it->first.begin() + keyPrefix.size(),
122  keyPrefix.begin(),
123  keyPrefix.end()) != chunk_it->first.begin() + keyPrefix.size()) {
124  const auto& chunk_key = chunk_it->first;
125  if (chunk_key.size() == 5) {
126  if (chunk_key[4] == 1) {
127  const auto& buffer = *chunk_it->second;
128  auto type = buffer.getSqlType();
129  auto size = buffer.size();
130  auto subkey = chunk_key;
131  subkey[4] = 2;
132  auto& index_buf = *(chunk_index_.find(subkey)->second);
133  auto bs = index_buf.size() / index_buf.getSqlType().get_size();
134  auto chunk_metadata =
135  std::make_shared<ChunkMetadata>(type, size, bs, ChunkStats{});
136  chunkMetadataVec.emplace_back(chunk_key, chunk_metadata);
137  }
138  } else {
139  const auto& buffer = *chunk_it->second;
140  auto chunk_metadata = std::make_shared<ChunkMetadata>();
141  chunk_metadata->sqlType = buffer.getSqlType();
142  buffer.getEncoder()->getMetadata(chunk_metadata);
143  chunkMetadataVec.emplace_back(chunk_key, chunk_metadata);
144  }
145  chunk_it++;
146  }
147 }
148 
149 Data_Namespace::AbstractBufferMgr* ForeignStorageInterface::lookupBufferManager(
150  const int db_id,
151  const int table_id) {
152  auto key = std::make_pair(db_id, table_id);
153  if (managers_map_.count(key)) {
154  return managers_map_[key].get();
155  }
156 
157  std::lock_guard<std::mutex> persistent_storage_interfaces_lock(
159  const auto it = table_persistent_storage_interface_map_.find(key);
160  if (it == table_persistent_storage_interface_map_.end()) {
161  return nullptr;
162  }
163  const auto it_ok = managers_map_.emplace(
164  key, std::make_unique<ForeignStorageBufferMgr>(db_id, table_id, it->second));
165  CHECK(it_ok.second);
166  return it_ok.first->second.get();
167 }
168 
169 void ForeignStorageInterface::dropBufferManager(const int db_id, const int table_id) {
170  std::lock_guard<std::mutex> persistent_storage_interfaces_lock(
172  managers_map_.erase(std::make_pair(db_id, table_id));
173 }
174 
176  std::unique_ptr<PersistentForeignStorageInterface> persistent_foreign_storage) {
177  std::lock_guard<std::mutex> persistent_storage_interfaces_lock(
179  const auto it_ok = persistent_storage_interfaces_.emplace(
180  persistent_foreign_storage->getType(), std::move(persistent_foreign_storage));
181  CHECK(it_ok.second);
182 }
183 
184 std::pair<std::string, std::string> parseStorageType(const std::string& type) {
185  size_t sep = type.find_first_of(':'), sep2 = sep != std::string::npos ? sep + 1 : sep;
186  auto res = std::make_pair(to_upper(type.substr(0, sep)), type.substr(sep2));
187  return res;
188 }
189 
191  TableDescriptor& td,
192  std::list<ColumnDescriptor>& cols) {
193  auto type = parseStorageType(td.storageType);
194  std::unique_lock<std::mutex> persistent_storage_interfaces_lock(
196  const auto it = persistent_storage_interfaces_.find(type.first);
197  if (it == persistent_storage_interfaces_.end()) {
198  throw std::runtime_error("storage type " + type.first + " not supported");
199  }
200  auto& p = it->second;
201  persistent_storage_interfaces_lock.unlock();
202  p->prepareTable(db_id, type.second, td, cols);
203 }
204 
206  const TableDescriptor& td,
207  const std::list<ColumnDescriptor>& cols) {
208  const int table_id = td.tableId;
209  auto type = parseStorageType(td.storageType);
210 
211  std::unique_lock<std::mutex> persistent_storage_interfaces_lock(
213  const auto it = persistent_storage_interfaces_.find(type.first);
214  if (it == persistent_storage_interfaces_.end()) {
215  throw std::runtime_error("storage type " + type.first + " not supported");
216  }
217 
218  auto db_id = catalog->getCurrentDB().dbId;
219  const auto it_ok = table_persistent_storage_interface_map_.emplace(
220  std::make_pair(db_id, table_id), it->second.get());
221  // this check fails if we create table, drop it and create again
222  // CHECK(it_ok.second);
223  persistent_storage_interfaces_lock.unlock();
224  it_ok.first->second->registerTable(catalog,
225  it_ok.first->first,
226  type.second,
227  td,
228  cols,
229  lookupBufferManager(db_id, table_id));
230 }
PersistentForeignStorageInterface * persistent_foreign_storage_
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix) override
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< int > ChunkKey
Definition: types.h:36
std::unordered_map< std::string, std::unique_ptr< PersistentForeignStorageInterface > > persistent_storage_interfaces_
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:143
void syncEncoder(const AbstractBuffer *src_buffer)
Data_Namespace::AbstractBufferMgr * lookupBufferManager(const int db_id, const int table_id)
void append(int8_t *src, const size_t numBytes, const Data_Namespace::MemoryLevel srcBufferType=Data_Namespace::CPU_LEVEL, const int deviceId=-1) override
std::string storageType
virtual int8_t * getMemoryPtr()=0
void fetchBuffer(const ChunkKey &key, Data_Namespace::AbstractBuffer *destBuffer, const size_t numBytes=0) override
std::shared_lock< T > shared_lock
This file contains the class specification and related data structures for Catalog.
void registerPersistentStorageInterface(std::unique_ptr< PersistentForeignStorageInterface > persistent_foreign_storage)
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:265
ForeignStorageBufferMgr(const int db_id, const int table_id, PersistentForeignStorageInterface *persistent_foreign_storage)
std::unique_lock< T > unique_lock
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
virtual void append(const std::vector< ForeignStorageColumnBuffer > &column_buffers)=0
ForeignStorageBuffer(const ChunkKey &chunk_key, PersistentForeignStorageInterface *persistent_foreign_storage)
An AbstractBuffer is a unit of data management for a data manager.
void registerTable(Catalog_Namespace::Catalog *catalog, const TableDescriptor &td, const std::list< ColumnDescriptor > &cols)
ids are created
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_index_
std::vector< int8_t > buff_
std::string to_upper(const std::string &str)
virtual void setMemoryPtr(int8_t *new_ptr)
Data_Namespace::AbstractBuffer * getBuffer(const ChunkKey &key, const size_t numBytes=0) override
virtual int8_t * tryZeroCopy(const ChunkKey &chunk_key, const SQLTypeInfo &sql_type, const size_t num_bytes)
std::map< std::pair< int, int >, PersistentForeignStorageInterface * > table_persistent_storage_interface_map_
heavyai::shared_mutex chunk_index_mutex_
void dropBufferManager(const int db_id, const int table_id)
void setSize(const size_t size)
#define CHECK(condition)
Definition: Logger.h:291
std::map< std::pair< int, int >, std::unique_ptr< ForeignStorageBufferMgr > > managers_map_
PersistentForeignStorageInterface * persistent_foreign_storage_
void read(int8_t *const dst, const size_t numBytes, const size_t offset=0, const Data_Namespace::MemoryLevel dstBufferType=Data_Namespace::CPU_LEVEL, const int dstDeviceId=-1) override
virtual void reserve(size_t num_bytes)=0
std::pair< std::string, std::string > parseStorageType(const std::string &type)
virtual void read(const ChunkKey &chunk_key, const SQLTypeInfo &sql_type, int8_t *dest, const size_t num_bytes)=0
int8_t * tryZeroCopy(const size_t numBytes)
void prepareTable(const int db_id, TableDescriptor &td, std::list< ColumnDescriptor > &cols)
prepare table options and modify columns
Data_Namespace::AbstractBuffer * createBuffer(const ChunkKey &key, const size_t pageSize=0, const size_t initialSize=0) override