21 #include "../Shared/shard_key.h"
26 namespace Fragmenter_Namespace {
29 std::vector<std::vector<uint8_t>>
rawData;
// Groups the row positions [0, row_count) by destination shard table and
// returns one index list per shard table (shard_count * leaf_count lists).
// NOTE(review): parts of this definition are missing from this listing,
// including the signature and the second arm of both shard_id conditionals.
34 template <
typename SRC>
40 bool duplicated_key_value) {
41 const auto n_shard_tables = shard_count * leaf_count;
42 std::vector<std::vector<size_t>> row_indices_of_shards(n_shard_tables);
43 if (!duplicated_key_value) {
// Keys differ per row: each row is bucketed from its own src[row].
44 for (
size_t row = 0; row < row_count; row++) {
// For unsigned SRC the shard id is a plain modulo of the key.
47 auto shard_id = (std::is_unsigned<SRC>::value)
48 ? src[row] % n_shard_tables
50 row_indices_of_shards[shard_id].push_back(row);
// Presumably the duplicated_key_value branch: only src[0] is read, so every
// row goes to the same shard list, which is reserved once to its final size.
53 auto shard_id = (std::is_unsigned<SRC>::value)
54 ? src[0] % n_shard_tables
56 row_indices_of_shards[shard_id].reserve(row_count);
57 for (
size_t row = 0; row < row_count; row++) {
58 row_indices_of_shards[shard_id].push_back(row);
62 return row_indices_of_shards;
// Linear search for val in vec. CHECK enforces that a match exists, and the
// zero-based position of the first match is returned.
67 typename std::vector<T>::iterator it = std::find(vec.begin(), vec.end(), val);
68 CHECK(it != vec.end());
69 return std::distance(vec.begin(), it);
// NOTE(review): only the error paths of this function are visible in this
// listing; the conditions guarding each throw are not shown.
84 const bool get_logical_size =
true) {
// Geo and array columns are rejected: their elements have no fixed length.
93 throw std::runtime_error(
"geo and array columns have variable length elements");
// Strings without encoding are rejected for the same reason.
114 throw std::runtime_error(
115 "non encoded string columns have variable length elements");
// Fallback error; the message is built from the column name.
119 throw std::runtime_error(
"not supported column type: " + cd->
columnName +
" (" +
// Locate the chunk that stores the shard key column by its column id.
130 auto find_it = insert_chunks.
chunks.find(shard_cd->columnId);
134 auto shard_count = td->nShards;
// The key values are read straight from the raw buffer of that chunk.
139 auto memory_ptr = shard_chunk.getBuffer()->getMemoryPtr();
// The same buffer is viewed as 8, 16, 32 or 64 bit unsigned keys.
// NOTE(review): the selector choosing between these casts is not visible
// here; presumably it is the element size of the shard column.
146 reinterpret_cast<uint8_t*>(memory_ptr),
152 reinterpret_cast<uint16_t*>(memory_ptr),
158 reinterpret_cast<uint32_t*>(memory_ptr),
164 reinterpret_cast<uint64_t*>(memory_ptr),
// Any other width is treated as a programming error.
167 UNREACHABLE() <<
"unexpected data element size of column";
// Position of the shard key column within insert_data.columnIds; the same
// index is used below for the is_default lookup.
178 auto shardDataBlockIndex =
indexOf(insert_data.
columnIds, shard_cd->columnId);
180 auto rowCount = insert_data.
numRows;
181 auto shardCount = td->nShards;
// Per-column flag read from insert_data.is_default for the shard column.
187 bool is_default = insert_data.
is_default[shardDataBlockIndex];
// numbersPtr of the shard data block is viewed as 8, 16, 32 or 64 bit
// unsigned keys.
// NOTE(review): the selector for these casts is not visible in this listing.
194 reinterpret_cast<uint8_t*>(shardDataBlock.
numbersPtr),
201 reinterpret_cast<uint16_t*>(shardDataBlock.
numbersPtr),
208 reinterpret_cast<uint32_t*>(shardDataBlock.
numbersPtr),
215 reinterpret_cast<uint64_t*>(shardDataBlock.
numbersPtr),
// Unlike the chunk-based path, an unknown width raises an exception here.
218 throw std::runtime_error(
"Unexpected data block element size");
// Gathers the elements src[rowIndices[i]] into dst[i] for every i, so dst
// ends up densely packed in the order given by rowIndices.
// NOTE(review): the signature line is missing from this listing.
221 template <
typename T>
223 for (
size_t row = 0; row < rowIndices.size(); row++) {
224 auto srcRowIndex = rowIndices[row];
225 dst[row] = src[srcRowIndex];
// Copies the rows of one column that belong to a shard into storage held by
// dataOwner and returns the column id, the new block and the is_default flag.
// NOTE(review): the branch conditions choosing between string, array and raw
// storage are not visible in this listing.
237 const std::vector<size_t>& rowIndices,
// When is_default is set, only row 0 is selected instead of rowIndices.
243 std::vector<size_t> single_row_idx({0ul});
244 const std::vector<size_t>&
rows = is_default ? single_row_idx : rowIndices;
// String storage: one slot per selected row.
246 auto& data = dataOwner.
stringData[columnIndex];
247 data.resize(rows.size());
// Array storage: one slot per selected row.
252 auto& data = dataOwner.
arrayData[columnIndex];
253 data.resize(rows.size());
// Raw storage: a byte buffer sized for rows.size() fixed-width elements.
259 auto& data = dataOwner.
rawData[columnIndex];
260 data.resize(rows.size() * rawArrayElementSize);
// Source and destination are viewed with the integer type matching the
// element width.
262 switch (rawArrayElementSize) {
265 reinterpret_cast<uint8_t*>(dataBlock.
numbersPtr),
266 reinterpret_cast<uint8_t*>(&data[0]));
271 reinterpret_cast<uint16_t*>(dataBlock.
numbersPtr),
272 reinterpret_cast<uint16_t*>(&data[0]));
277 reinterpret_cast<uint32_t*>(dataBlock.
numbersPtr),
278 reinterpret_cast<uint32_t*>(&data[0]));
283 reinterpret_cast<uint64_t*>(dataBlock.
numbersPtr),
284 reinterpret_cast<uint64_t*>(&data[0]));
288 throw std::runtime_error(
"Unexpected data block element size");
// The returned block points into the buffer held by dataOwner.rawData.
291 ret.
numbersPtr =
reinterpret_cast<int8_t*
>(&data[0]);
294 return {pCol->
columnId, ret, is_default};
// Builds the InsertChunks for one shard: for every source chunk a new chunk
// is created and filled with only the rows listed in rowIndices. The result
// is returned together with the list of buffers created here, which holds
// them through unique_ptr.
297 std::pair<std::list<std::unique_ptr<foreign_storage::ForeignStorageBuffer>>, InsertChunks>
301 const std::vector<size_t>& rowIndices) {
// Target is the physical shard table, in the same database as the source.
306 physical_table->tableId, insert_chunks.
db_id, {}, {}};
308 std::list<std::unique_ptr<foreign_storage::ForeignStorageBuffer>> buffers;
310 for (
const auto& [column_id, chunk] : insert_chunks.
chunks) {
311 auto column = chunk->getColumnDesc();
312 insert_chunks_for_shard.chunks[column_id] = std::make_shared<Chunk_NS::Chunk>(column);
313 auto& chunk_for_shard = *insert_chunks_for_shard.chunks[column_id];
// Each new chunk gets a freshly created ForeignStorageBuffer as data buffer.
314 chunk_for_shard.setBuffer(
315 buffers.emplace_back(std::make_unique<foreign_storage::ForeignStorageBuffer>())
// Variable-length columns get a second buffer for the index.
317 if (column->columnType.is_varlen_indeed()) {
318 chunk_for_shard.setIndexBuffer(
319 buffers.emplace_back(std::make_unique<foreign_storage::ForeignStorageBuffer>())
322 chunk_for_shard.initEncoder();
323 chunk_for_shard.appendEncodedDataAtIndices(*chunk, rowIndices);
// Sanity check: the encoder must report exactly one element per copied row.
324 CHECK_EQ(chunk_for_shard.getBuffer()->getEncoder()->getNumElems(), rowIndices.size());
328 auto row_count = rowIndices.size();
329 insert_chunks_for_shard.valid_row_indices.reserve(row_count);
// The stored index is irow, the position within the shard, not row_index.
// NOTE(review): the condition guarding the emplace_back is not visible here.
330 for (
size_t irow = 0; irow < row_count; ++irow) {
331 auto row_index = rowIndices[irow];
335 insert_chunks_for_shard.valid_row_indices.emplace_back(irow);
339 return {std::move(buffers), insert_chunks_for_shard};
// Builds the InsertData for one shard; the per-column results are obtained
// from futures and appended to shardData in column order.
// NOTE(review): several statements of this function are missing from this
// listing, including the bodies of most loops.
346 const std::vector<size_t>& rowIndices) {
// The shard data targets the physical table and has one row per index.
352 shardData.
tableId = ptd->tableId;
353 shardData.
numRows = rowIndices.size();
// lCols collects logical column ids; pCols is presumably filled in parallel
// from the physical columns so both can be indexed by the same position.
355 std::vector<const ColumnDescriptor*> pCols;
356 std::vector<int> lCols;
360 for (
const auto& cd : logicalColumns) {
361 lCols.push_back(cd->columnId);
364 auto physicalColumns =
366 for (
const auto& cd : physicalColumns) {
// One rawData slot is appended per inserted column.
371 for (
size_t col = 0; col < insert_data.
columnIds.size(); col++) {
373 dataOwner.
rawData.emplace_back();
// Per-column worker: maps the logical column id to its descriptor in pCols
// through the position of that id in lCols.
377 auto copycat = [&
cat, &dataOwner, &rowIndices, &lCols, &pCols, &insert_data](
int col) {
378 const auto lColId = insert_data.
columnIds[col];
379 const auto pCol = pCols[
indexOf(lCols, lColId)];
385 insert_data.
data[col],
389 std::vector<std::future<BlockWithColumnId>> worker_threads;
390 for (
size_t col = 0; col < insert_data.
columnIds.size(); col++) {
394 for (
auto& child : worker_threads) {
// Collect every result into the three parallel vectors of shardData.
398 for (
auto& child : worker_threads) {
399 auto shardColumnData = child.get();
400 shardData.
columnIds.push_back(shardColumnData.columnId);
401 shardData.
data.push_back(shardColumnData.block);
402 shardData.
is_default.push_back(shardColumnData.is_default);
415 return starting_leaf_index;
// Splits the incoming chunks by shard and creates one future per non-empty
// shard.
// NOTE(review): several statements are missing from this listing, including
// the body of the nShards == 0 branch and the calls made on each future.
421 const auto* td =
cat.getMetadataForTable(insert_chunks.
table_id);
// Tables without shards take a separate path.
424 if (td->nShards == 0) {
428 auto row_indices_of_shards =
431 auto insert_shard_data =
432 [
this, &session_info, &insert_chunks, &
cat, &td, &row_indices_of_shards](
434 const auto shard_tables = cat.getPhysicalTablesDescriptors(td);
// shardId is a flattened index: the remainder selects the shard table and
// the quotient selects the leaf.
// NOTE(review): stard_table_idx looks like a typo for shard_table_idx.
435 auto stard_table_idx = shardId % td->nShards;
436 auto shard_leaf_idx = shardId / td->nShards;
438 const auto& row_indices_of_shard = row_indices_of_shards[shardId];
441 cat, insert_chunks, stard_table_idx, row_indices_of_shard);
443 session_info, shard_leaf_idx, shard_insert_chunks);
446 std::vector<std::future<void>> worker_threads;
// Shards that received no rows are skipped.
447 for (
size_t shard_id = 0; shard_id < row_indices_of_shards.size(); shard_id++) {
448 if (row_indices_of_shards[shard_id].size() > 0) {
449 worker_threads.push_back(
// Two separate passes are made over the futures.
453 for (
auto& child : worker_threads) {
456 for (
auto& child : worker_threads) {
// Splits the incoming InsertData by shard and creates one future per
// non-empty shard.
// NOTE(review): several statements are missing from this listing, including
// the body of the nShards == 0 branch and the calls made on each future.
465 const auto* td =
cat.getMetadataForTable(insert_data.
tableId);
// Tables without shards take a separate path.
468 if (td->nShards == 0) {
472 auto rowIndicesOfShards =
475 auto insertShardData =
476 [
this, &session_info, &insert_data, &
cat, &td, &rowIndicesOfShards](
478 const auto shard_tables = cat.getPhysicalTablesDescriptors(td);
// shardId is a flattened index: the remainder selects the shard table and
// the quotient selects the leaf.
// NOTE(review): stardTableIdx looks like a typo for shardTableIdx.
479 auto stardTableIdx = shardId % td->nShards;
480 auto shardLeafIdx = shardId / td->nShards;
482 const auto& rowIndicesOfShard = rowIndicesOfShards[shardId];
// shardDataOwner is passed in to back the copied column data.
486 cat, shardDataOwner, insert_data, stardTableIdx, rowIndicesOfShard);
491 std::vector<std::future<void>> worker_threads;
// Shards that received no rows are skipped.
492 for (
size_t shardId = 0; shardId < rowIndicesOfShards.size(); shardId++) {
493 if (rowIndicesOfShards[shardId].size() > 0) {
494 worker_threads.push_back(
// Two separate passes are made over the futures.
498 for (
auto& child : worker_threads) {
501 for (
auto& child : worker_threads) {
// Hands the chunks to the fragmenter of the target table without a
// checkpoint. Only leaf index 0 is accepted, enforced by CHECK.
// NOTE(review): how created_td is obtained is not visible in this listing.
509 const size_t leaf_idx,
511 CHECK(leaf_idx == 0);
514 created_td->
fragmenter->insertChunksNoCheckpoint(insert_chunks);
// Hands the insert data to the fragmenter of the target table without a
// checkpoint. Only leaf index 0 is accepted, enforced by CHECK.
// NOTE(review): how created_td is obtained is not visible in this listing.
518 const size_t leaf_idx,
520 CHECK(leaf_idx == 0);
523 created_td->
fragmenter->insertDataNoCheckpoint(insert_data);
536 auto table_epochs = catalog.getTableEpochs(db_id, table_id);
537 catalog.setTableEpochs(db_id, table_epochs);
HOST DEVICE int get_size() const
void insertChunks(const Catalog_Namespace::SessionInfo &session_info, const InsertChunks &insert_chunks)
class for a per-database catalog. also includes metadata for the current database and the current use...
std::vector< std::string > * stringsPtr
std::vector< ArrayDatum > * arraysPtr
std::pair< std::list< std::unique_ptr< foreign_storage::ForeignStorageBuffer > >, InsertChunks > copy_data_of_shard(const Catalog_Namespace::Catalog &cat, const InsertChunks &insert_chunks, int shardTableIndex, const std::vector< size_t > &rowIndices)
std::shared_mutex current_leaf_index_mutex_
std::vector< std::vector< size_t > > computeRowIndicesOfShards(const Catalog_Namespace::Catalog &cat, size_t leafCount, InsertData &insert_data)
std::vector< bool > is_default
virtual size_t leafCount()=0
HOST DEVICE SQLTypes get_type() const
virtual void insertDataToLeaf(const Catalog_Namespace::SessionInfo &parent_session_info, const size_t leaf_idx, Fragmenter_Namespace::InsertData &insert_data)=0
const ColumnDescriptor * getShardColumnMetadataForTable(const TableDescriptor *td) const
bool isStringVectorData(const ColumnDescriptor *cd)
void insertData(const Catalog_Namespace::SessionInfo &session_info, InsertData &insert_data)
int tableId
identifies the database into which the data is being inserted
size_t numRows
a vector of column ids for the row(s) being inserted
void checkpoint(const Catalog_Namespace::SessionInfo &parent_session_info, int tableId) override
size_t size_of_raw_column(const Catalog_Namespace::Catalog &cat, const ColumnDescriptor *cd, const bool get_logical_size=true)
future< Result > async(Fn &&fn, Args &&...args)
size_t current_leaf_index_
int get_logical_size() const
void insertDataToLeaf(const Catalog_Namespace::SessionInfo &parent_session_info, const size_t leaf_idx, Fragmenter_Namespace::InsertData &insert_data) override
size_t getNumElems() const
std::unique_lock< T > unique_lock
std::vector< std::vector< uint8_t > > rawData
int getDatabaseId() const
specifies the content in-memory of a row in the column metadata table
std::vector< std::vector< size_t > > compute_row_indices_of_shards(size_t shard_count, size_t leaf_count, size_t row_count, SRC *src, bool duplicated_key_value)
void checkpointWithAutoRollback(const int logical_table_id) const
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logical_table_desc, bool populate_fragmenter=true) const
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
size_t indexOf(std::vector< T > &vec, T val)
HOST DEVICE EncodingType get_compression() const
std::vector< DataBlockPtr > data
the number of rows being inserted
AbstractBuffer * getBuffer() const
Catalog & getCatalog() const
std::map< int, std::shared_ptr< Chunk_NS::Chunk > > chunks
void insertChunksToLeaf(const Catalog_Namespace::SessionInfo &parent_session_info, const size_t leaf_idx, const Fragmenter_Namespace::InsertChunks &insert_chunks) override
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
void copyColumnDataOfShard(const std::vector< size_t > &rowIndices, T *src, T *dst)
std::vector< std::vector< ArrayDatum > > arrayData
std::string get_type_name() const
void rollback(const Catalog_Namespace::SessionInfo &parent_session_info, int tableId) override
std::vector< size_t > valid_row_indices
Encoder * getEncoder() const
bool isDatumVectorData(const ColumnDescriptor *cd)
std::vector< std::vector< std::string > > stringData
The data to be inserted using the fragment manager.
InsertData copyDataOfShard(const Catalog_Namespace::Catalog &cat, ShardDataOwner &dataOwner, InsertData &insert_data, int shardTableIndex, const std::vector< size_t > &rowIndices)
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
std::vector< int > columnIds
identifies the table into which the data is being inserted
virtual void insertChunksToLeaf(const Catalog_Namespace::SessionInfo &parent_session_info, const size_t leaf_idx, const Fragmenter_Namespace::InsertChunks &insert_chunks)=0
#define SHARD_FOR_KEY(key, num_shards)
InsertConnector & connector_