UpdelStorage.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <algorithm>
18 #include <mutex>
19 #include <string>
20 #include <vector>
21 
22 #include <boost/variant.hpp>
23 #include <boost/variant/get.hpp>
24 
25 #include "Catalog/Catalog.h"
29 #include "LockMgr/LockMgr.h"
30 #include "QueryEngine/Execute.h"
31 #include "Shared/DateConverters.h"
33 #include "Shared/thread_count.h"
35 
36 extern bool g_enable_string_functions;
37 
39 
40 namespace Fragmenter_Namespace {
41 
42 inline void wait_cleanup_threads(std::vector<std::future<void>>& threads) {
43  for (auto& t : threads) {
44  t.get();
45  }
46  threads.clear();
47 }
48 
49 inline bool is_integral(const SQLTypeInfo& t) {
50  return t.is_integer() || t.is_boolean() || t.is_time() || t.is_timeinterval();
51 }
52 
54 
55 void InsertOrderFragmenter::updateColumn(const Catalog_Namespace::Catalog* catalog,
56  const TableDescriptor* td,
57  const ColumnDescriptor* cd,
58  const int fragment_id,
59  const std::vector<uint64_t>& frag_offsets,
60  const ScalarTargetValue& rhs_value,
61  const SQLTypeInfo& rhs_type,
62  const Data_Namespace::MemoryLevel memory_level,
63  UpdelRoll& updel_roll) {
64  updateColumn(catalog,
65  td,
66  cd,
67  fragment_id,
68  frag_offsets,
69  std::vector<ScalarTargetValue>(1, rhs_value),
70  rhs_type,
71  memory_level,
72  updel_roll);
73 }
74 
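// Collects the physical chunks of every non-virtual column of the fragment at the
// requested memory level. Column ids may be sparse, so candidate ids are probed until
// td->nColumns descriptors have been seen; returns the number of chunks gathered.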
75 static int get_chunks(const Catalog_Namespace::Catalog* catalog,
76  const TableDescriptor* td,
77  const FragmentInfo& fragment,
78  const Data_Namespace::MemoryLevel memory_level,
79  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks) {
80  for (int cid = 1, nc = 0; nc < td->nColumns; ++cid) {
81  if (const auto cd = catalog->getMetadataForColumn(td->tableId, cid)) {
82  ++nc;
83  if (!cd->isVirtualCol) {
84  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cid);
85  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
86  ChunkKey chunk_key{
87  catalog->getCurrentDB().dbId, td->tableId, cid, fragment.fragmentId};
88  auto chunk = Chunk_NS::Chunk::getChunk(cd,
89  &catalog->getDataMgr(),
90  chunk_key,
91  memory_level,
92  0,
93  chunk_meta_it->second->numBytes,
94  chunk_meta_it->second->numElements);
95  chunks.push_back(chunk);
96  }
97  }
98  }
99  return chunks.size();
100 }
101 
102 class ChunkToInsertDataConverter {
103  public:
104  virtual ~ChunkToInsertDataConverter() {}
105 
106  virtual void convertToColumnarFormat(size_t row, size_t indexInFragment) = 0;
107 
108  virtual void addDataBlocksToInsertData(
109  Fragmenter_Namespace::InsertData& insertData) = 0;
110 };
111 
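// Copies a fixed-width scalar chunk into columnar insert data, widening each stored
// BUFFER_DATA_TYPE value to the INSERT_DATA_TYPE expected by the insert path.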
112 template <typename BUFFER_DATA_TYPE, typename INSERT_DATA_TYPE>
113 struct ScalarChunkConverter : public ChunkToInsertDataConverter {
114  using ColumnDataPtr =
115  std::unique_ptr<INSERT_DATA_TYPE, CheckedMallocDeleter<INSERT_DATA_TYPE>>;
116 
117  const Chunk_NS::Chunk* chunk_;
118  const ColumnDescriptor* column_descriptor_;
119  ColumnDataPtr column_data_;
120  const BUFFER_DATA_TYPE* data_buffer_addr_;
121 
122  ScalarChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
123  : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
124  column_data_ = ColumnDataPtr(reinterpret_cast<INSERT_DATA_TYPE*>(
125  checked_malloc(num_rows * sizeof(INSERT_DATA_TYPE))));
126  data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->getBuffer()->getMemoryPtr();
127  }
128 
129  ~ScalarChunkConverter() override {}
130 
131  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
132  auto buffer_value = data_buffer_addr_[indexInFragment];
133  auto insert_value = static_cast<INSERT_DATA_TYPE>(buffer_value);
134  column_data_.get()[row] = insert_value;
135  }
136 
137  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
138  DataBlockPtr dataBlock;
139  dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
140  insertData.data.push_back(dataBlock);
141  insertData.columnIds.push_back(column_descriptor_->columnId);
142  }
143 };
144 
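// Converts a fixed-length array chunk: every row occupies fixed_array_length_ bytes in
// the data buffer, and null arrays are detected in place in convertToColumnarFormat.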
145 struct FixedLenArrayChunkConverter : public ChunkToInsertDataConverter {
146  const Chunk_NS::Chunk* chunk_;
147  const ColumnDescriptor* column_descriptor_;
148 
149  std::unique_ptr<std::vector<ArrayDatum>> column_data_;
150  const int8_t* data_buffer_addr_;
151  size_t fixed_array_length_;
152 
153  FixedLenArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
154  : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
155  column_data_ = std::make_unique<std::vector<ArrayDatum>>(num_rows);
156  data_buffer_addr_ = (chunk->getBuffer() ? chunk->getBuffer()->getMemoryPtr() : nullptr);
157  fixed_array_length_ = chunk->getColumnDesc()->columnType.get_size();
158  }
159 
160  ~FixedLenArrayChunkConverter() override {}
161 
162  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
163  auto src_value_ptr = data_buffer_addr_ + (indexInFragment * fixed_array_length_);
164 
165  bool is_null = FixedLengthArrayNoneEncoder::is_null(column_descriptor_->columnType,
166  src_value_ptr);
167 
168  (*column_data_)[row] = ArrayDatum(
169  fixed_array_length_, (int8_t*)src_value_ptr, is_null, DoNothingDeleter());
170  }
171 
172  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
173  DataBlockPtr dataBlock;
174  dataBlock.arraysPtr = column_data_.get();
175  insertData.data.push_back(dataBlock);
176  insertData.columnIds.push_back(column_descriptor_->columnId);
177  }
178 };
179 
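// Variable-length arrays additionally carry an index (offset) buffer; a negative end
// offset marks a NULL array, hence the std::abs() when computing the element size below.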
180 struct ArrayChunkConverter : public FixedLenArrayChunkConverter {
181  const StringOffsetT* index_buffer_addr_;
182 
183  ArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
184  : FixedLenArrayChunkConverter(num_rows, chunk) {
185  index_buffer_addr_ =
186  (StringOffsetT*)(chunk->getIndexBuf() ? chunk->getIndexBuf()->getMemoryPtr()
187  : nullptr);
188  }
189 
190  ~ArrayChunkConverter() override {}
191 
192  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
193  auto startIndex = index_buffer_addr_[indexInFragment];
194  auto endIndex = index_buffer_addr_[indexInFragment + 1];
195  size_t src_value_size = std::abs(endIndex) - std::abs(startIndex);
196  auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
197  (*column_data_)[row] = ArrayDatum(
198  src_value_size, (int8_t*)src_value_ptr, endIndex < 0, DoNothingDeleter());
199  }
200 };
201 
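// None-encoded string chunks are materialized into std::string values using the
// offsets recorded in the chunk's index buffer.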
202 struct StringChunkConverter : public ChunkToInsertDataConverter {
203  const Chunk_NS::Chunk* chunk_;
204  const ColumnDescriptor* column_descriptor_;
205 
206  std::unique_ptr<std::vector<std::string>> column_data_;
207  const int8_t* data_buffer_addr_;
208  const StringOffsetT* index_buffer_addr_;
209 
210  StringChunkConverter(size_t num_rows, const Chunk_NS::Chunk* chunk)
211  : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
212  column_data_ = std::make_unique<std::vector<std::string>>(num_rows);
213  data_buffer_addr_ = (chunk->getBuffer() ? chunk->getBuffer()->getMemoryPtr() : nullptr);
214  index_buffer_addr_ =
215  (StringOffsetT*)(chunk->getIndexBuf() ? chunk->getIndexBuf()->getMemoryPtr()
216  : nullptr);
217  }
218 
219  ~StringChunkConverter() override {}
220 
221  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
222  size_t src_value_size =
223  index_buffer_addr_[indexInFragment + 1] - index_buffer_addr_[indexInFragment];
224  auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
225  (*column_data_)[row] = std::string((const char*)src_value_ptr, src_value_size);
226  }
227 
228  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
229  DataBlockPtr dataBlock;
230  dataBlock.stringsPtr = column_data_.get();
231  insertData.data.push_back(dataBlock);
232  insertData.columnIds.push_back(column_descriptor_->columnId);
233  }
234 };
235 
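// DATE columns stored with days encoding are widened to int64 and converted to epoch
// seconds here, so that the insert path's DateDaysEncoder can re-encode them back into
// days (see the note in updateColumns below).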
236 template <typename BUFFER_DATA_TYPE>
237 struct DateChunkConverter : public ChunkToInsertDataConverter {
238  using ColumnDataPtr = std::unique_ptr<int64_t, CheckedMallocDeleter<int64_t>>;
239 
240  const Chunk_NS::Chunk* chunk_;
241  const ColumnDescriptor* column_descriptor_;
242  ColumnDataPtr column_data_;
243  const BUFFER_DATA_TYPE* data_buffer_addr_;
244 
245  DateChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
246  : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
247  column_data_ = ColumnDataPtr(
248  reinterpret_cast<int64_t*>(checked_malloc(num_rows * sizeof(int64_t))));
249  data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->getBuffer()->getMemoryPtr();
250  }
251 
252  ~DateChunkConverter() override {}
253 
254  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
255  auto buffer_value = data_buffer_addr_[indexInFragment];
256  auto buffer_value = data_buffer_addr_[indexInFragment];
256  auto insert_value = static_cast<int64_t>(buffer_value);
257  column_data_.get()[row] = DateConverters::get_epoch_seconds_from_days(insert_value);
258  }
259 
260  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
261  DataBlockPtr dataBlock;
262  dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
263  insertData.data.push_back(dataBlock);
264  insertData.columnIds.push_back(column_descriptor_->columnId);
265  }
266 };
267 
268 void InsertOrderFragmenter::updateColumns(
269  const Catalog_Namespace::Catalog* catalog,
270  const TableDescriptor* td,
271  const int fragmentId,
272  const std::vector<TargetMetaInfo> sourceMetaInfo,
273  const std::vector<const ColumnDescriptor*> columnDescriptors,
274  const RowDataProvider& sourceDataProvider,
275  const size_t indexOffFragmentOffsetColumn,
276  const Data_Namespace::MemoryLevel memoryLevel,
277  UpdelRoll& updelRoll,
278  Executor* executor) {
279  updelRoll.is_varlen_update = true;
280  updelRoll.catalog = catalog;
281  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
282  updelRoll.memoryLevel = memoryLevel;
283 
284  size_t num_entries = sourceDataProvider.getEntryCount();
285  size_t num_rows = sourceDataProvider.getRowCount();
286 
287  if (0 == num_rows) {
288  // bail out early
289  return;
290  }
291 
292  TargetValueConverterFactory factory;
293 
294  auto fragment_ptr = getFragmentInfo(fragmentId);
295  auto& fragment = *fragment_ptr;
296  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
297  get_chunks(catalog, td, fragment, memoryLevel, chunks);
298  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
299  columnDescriptors.size());
300  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
301  size_t indexOfDeletedColumn{0};
302  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
303  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
304  auto chunk = chunks[indexOfChunk];
305  const auto chunk_cd = chunk->getColumnDesc();
306 
307  if (chunk_cd->isDeletedCol) {
308  indexOfDeletedColumn = chunk_cd->columnId;
309  deletedChunk = chunk;
310  continue;
311  }
312 
313  auto targetColumnIt = std::find_if(columnDescriptors.begin(),
314  columnDescriptors.end(),
315  [=](const ColumnDescriptor* cd) -> bool {
316  return cd->columnId == chunk_cd->columnId;
317  });
318 
319  if (targetColumnIt != columnDescriptors.end()) {
320  auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);
321 
322  auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
323  auto targetDescriptor = columnDescriptors[indexOfTargetColumn];
324 
325  ConverterCreateParameter param{
326  num_rows,
327  sourceDataMetaInfo,
328  targetDescriptor,
329  *catalog,
330  targetDescriptor->columnType,
331  !targetDescriptor->columnType.get_notnull(),
332  sourceDataProvider.getLiteralDictionary(),
334  sourceDataMetaInfo.get_type_info().is_dict_encoded_string()
335  ? executor->getStringDictionaryProxy(
336  sourceDataMetaInfo.get_type_info().getStringDictKey(),
337  executor->getRowSetMemoryOwner(),
338  true)
339  : nullptr};
340  auto converter = factory.create(param);
341  sourceDataConverters[indexOfTargetColumn] = std::move(converter);
342 
343  if (targetDescriptor->columnType.is_geometry()) {
344  // geometry columns are composites
345  // need to skip chunks, depending on geo type
346  switch (targetDescriptor->columnType.get_type()) {
347  case kMULTIPOLYGON:
348  indexOfChunk += 5;
349  break;
350  case kPOLYGON:
351  indexOfChunk += 4;
352  break;
353  case kMULTILINESTRING:
354  indexOfChunk += 3;
355  break;
356  case kLINESTRING:
357  case kMULTIPOINT:
358  indexOfChunk += 2;
359  break;
360  case kPOINT:
361  indexOfChunk += 1;
362  break;
363  default:
364  CHECK(false); // not supported
365  }
366  }
367  } else {
368  if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
369  std::unique_ptr<ChunkToInsertDataConverter> converter;
370 
371  if (chunk_cd->columnType.is_fixlen_array()) {
372  converter =
373  std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
374  } else if (chunk_cd->columnType.is_string()) {
375  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
376  } else if (chunk_cd->columnType.is_geometry()) {
377  // the logical geo column is a string column
378  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
379  } else {
380  converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
381  }
382 
383  chunkConverters.push_back(std::move(converter));
384 
385  } else if (chunk_cd->columnType.is_date_in_days()) {
386  /* Q: Why do we need this?
387  A: In variable length updates path we move the chunk content of column
388  without decoding. Since it again passes through DateDaysEncoder
389  the expected value should be in seconds, but here it will be in days.
390  Therefore, using DateChunkConverter chunk values are being scaled to
391  seconds which then ultimately encoded in days in DateDaysEncoder.
392  */
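  // For example, a stored value of 19000 (days) becomes 19000 * 86400 =
  // 1641600000 (seconds) here, and DateDaysEncoder later divides it back down to days.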
393  std::unique_ptr<ChunkToInsertDataConverter> converter;
394  const size_t physical_size = chunk_cd->columnType.get_size();
395  if (physical_size == 2) {
396  converter =
397  std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
398  } else if (physical_size == 4) {
399  converter =
400  std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
401  } else {
402  CHECK(false);
403  }
404  chunkConverters.push_back(std::move(converter));
405  } else {
406  std::unique_ptr<ChunkToInsertDataConverter> converter;
407  SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
408  int logical_size = logical_type.get_size();
409  int physical_size = chunk_cd->columnType.get_size();
410 
411  if (logical_type.is_string()) {
412  // for dicts -> logical = physical
413  logical_size = physical_size;
414  }
415 
416  if (8 == physical_size) {
417  converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
418  num_rows, chunk.get());
419  } else if (4 == physical_size) {
420  if (8 == logical_size) {
421  converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
422  num_rows, chunk.get());
423  } else {
424  converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
425  num_rows, chunk.get());
426  }
427  } else if (2 == chunk_cd->columnType.get_size()) {
428  if (8 == logical_size) {
429  converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
430  num_rows, chunk.get());
431  } else if (4 == logical_size) {
432  converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
433  num_rows, chunk.get());
434  } else {
435  converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
436  num_rows, chunk.get());
437  }
438  } else if (1 == chunk_cd->columnType.get_size()) {
439  if (8 == logical_size) {
440  converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
441  num_rows, chunk.get());
442  } else if (4 == logical_size) {
443  converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
444  num_rows, chunk.get());
445  } else if (2 == logical_size) {
446  converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
447  num_rows, chunk.get());
448  } else {
449  converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
450  num_rows, chunk.get());
451  }
452  } else {
453  CHECK(false); // unknown
454  }
455 
456  chunkConverters.push_back(std::move(converter));
457  }
458  }
459  }
460 
461  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
462  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;
463 
464  updelRoll.addDirtyChunk(deletedChunk, fragment.fragmentId);
465  bool* deletedChunkBuffer =
466  reinterpret_cast<bool*>(deletedChunk->getBuffer()->getMemoryPtr());
467 
468  std::atomic<size_t> row_idx{0};
469 
470  auto row_converter = [&sourceDataProvider,
471  &sourceDataConverters,
472  &indexOffFragmentOffsetColumn,
473  &chunkConverters,
474  &deletedChunkBuffer,
475  &row_idx](size_t indexOfEntry) -> void {
476  // convert the source data
477  const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
478  if (row.empty()) {
479  return;
480  }
481 
482  size_t indexOfRow = row_idx.fetch_add(1);
483 
484  for (size_t col = 0; col < sourceDataConverters.size(); col++) {
485  if (sourceDataConverters[col]) {
486  const auto& mapd_variant = row[col];
487  sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
488  }
489  }
490 
491  auto scalar = checked_get(
492  indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
493  auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);
494 
495  // convert the remaining chunks
496  for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
497  chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
498  }
499 
500  // now mark the row as deleted
501  deletedChunkBuffer[indexInChunkBuffer] = true;
502  };
503 
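  // Heuristic cutoff: fan the row conversion out across threads only when the update
  // touches enough rows to amortize the thread startup cost.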
504  bool can_go_parallel = num_rows > 20000;
505 
506  if (can_go_parallel) {
507  const size_t num_worker_threads = cpu_threads();
508  std::vector<std::future<void>> worker_threads;
509  for (size_t i = 0,
510  start_entry = 0,
511  stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
512  i < num_worker_threads && start_entry < num_entries;
513  ++i, start_entry += stride) {
514  const auto end_entry = std::min(start_entry + stride, num_rows);
515  worker_threads.push_back(std::async(
516  std::launch::async,
517  [&row_converter](const size_t start, const size_t end) {
518  for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
519  row_converter(indexOfRow);
520  }
521  },
522  start_entry,
523  end_entry));
524  }
525 
526  for (auto& child : worker_threads) {
527  child.wait();
528  }
529 
530  } else {
531  for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
532  row_converter(entryIdx);
533  }
534  }
535 
536  Fragmenter_Namespace::InsertData insert_data;
537  insert_data.databaseId = catalog->getCurrentDB().dbId;
538  insert_data.tableId = td->tableId;
539 
540  for (size_t i = 0; i < chunkConverters.size(); i++) {
541  chunkConverters[i]->addDataBlocksToInsertData(insert_data);
542  continue;
543  }
544 
545  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
546  if (sourceDataConverters[i]) {
547  sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
548  }
549  continue;
550  }
551 
552  insert_data.numRows = num_rows;
553  insert_data.is_default.resize(insert_data.columnIds.size(), false);
554  insertDataNoCheckpoint(insert_data);
555 
556  // update metadata for the deleted chunk as we are doing special handling
557  auto chunkMetadata =
558  updelRoll.getChunkMetadata({td, &fragment}, indexOfDeletedColumn, fragment);
559  chunkMetadata->chunkStats.max.boolval = 1;
560 
561  // I'm not completely sure that we need to do this both in the fragmenter and on the
562  // buffer, but leaving this alone for now
563  if (!deletedChunk->getBuffer()->hasEncoder()) {
564  deletedChunk->initEncoder();
565  }
566  deletedChunk->getBuffer()->getEncoder()->updateStats(static_cast<int64_t>(true), false);
567 
568  if (fragment.shadowNumTuples > deletedChunk->getBuffer()->getEncoder()->getNumElems()) {
569  // An append to the same fragment will increase shadowNumTuples.
570  // Update NumElems in this case. Otherwise, use existing NumElems.
571  deletedChunk->getBuffer()->getEncoder()->setNumElems(fragment.shadowNumTuples);
572  }
573  deletedChunk->getBuffer()->setUpdated();
574 }
575 
576 namespace {
577 inline void update_metadata(SQLTypeInfo const& ti,
578  ChunkUpdateStats& update_stats,
579  int64_t const updated_val,
580  int64_t const old_val,
581  NullSentinelSupplier s = NullSentinelSupplier()) {
582  if (ti.get_notnull()) {
583  set_minmax(update_stats.new_values_stats.min_int64t,
584  update_stats.new_values_stats.max_int64t,
585  updated_val);
586  set_minmax(update_stats.old_values_stats.min_int64t,
587  update_stats.old_values_stats.max_int64t,
588  old_val);
589  } else {
590  set_minmax(update_stats.new_values_stats.min_int64t,
591  update_stats.new_values_stats.max_int64t,
592  update_stats.new_values_stats.has_null,
593  updated_val,
594  s(ti, updated_val));
595  set_minmax(update_stats.old_values_stats.min_int64t,
596  update_stats.old_values_stats.max_int64t,
597  update_stats.old_values_stats.has_null,
598  old_val,
599  s(ti, old_val));
600  }
601 }
602 
603 inline void update_metadata(SQLTypeInfo const& ti,
604  ChunkUpdateStats& update_stats,
605  double const updated_val,
606  double const old_val,
607  NullSentinelSupplier s = NullSentinelSupplier()) {
608  if (ti.get_notnull()) {
609  set_minmax(update_stats.new_values_stats.min_double,
610  update_stats.new_values_stats.max_double,
611  updated_val);
612  set_minmax(update_stats.old_values_stats.min_double,
613  update_stats.old_values_stats.max_double,
614  old_val);
615  } else {
616  set_minmax(update_stats.new_values_stats.min_double,
617  update_stats.new_values_stats.max_double,
618  update_stats.new_values_stats.has_null,
619  updated_val,
620  s(ti, updated_val));
621  set_minmax(update_stats.old_values_stats.min_double,
622  update_stats.old_values_stats.max_double,
623  update_stats.old_values_stats.has_null,
624  old_val,
625  s(ti, old_val));
626  }
627 }
628 
629 inline void update_metadata(UpdateValuesStats& agg_stats,
630  const UpdateValuesStats& new_stats) {
631  agg_stats.has_null = agg_stats.has_null || new_stats.has_null;
632  agg_stats.max_double = std::max<double>(agg_stats.max_double, new_stats.max_double);
633  agg_stats.min_double = std::min<double>(agg_stats.min_double, new_stats.min_double);
634  agg_stats.max_int64t = std::max<int64_t>(agg_stats.max_int64t, new_stats.max_int64t);
635  agg_stats.min_int64t = std::min<int64_t>(agg_stats.min_int64t, new_stats.min_int64t);
636 }
637 } // namespace
638 
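// updateColumn applies a scalar UPDATE to one column of one fragment. The affected
// rows are split into cpu_threads() segments; each worker tracks min/max/null stats in
// update_stats_per_thread, and the per-thread stats are merged via update_metadata()
// once all workers have finished.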
639 std::optional<ChunkUpdateStats> InsertOrderFragmenter::updateColumn(
640  const Catalog_Namespace::Catalog* catalog,
641  const TableDescriptor* td,
642  const ColumnDescriptor* cd,
643  const int fragment_id,
644  const std::vector<uint64_t>& frag_offsets,
645  const std::vector<ScalarTargetValue>& rhs_values,
646  const SQLTypeInfo& rhs_type,
647  const Data_Namespace::MemoryLevel memory_level,
648  UpdelRoll& updel_roll) {
649  updel_roll.catalog = catalog;
650  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
651  updel_roll.memoryLevel = memory_level;
652 
653  const size_t ncore = cpu_threads();
654  const auto nrow = frag_offsets.size();
655  const auto n_rhs_values = rhs_values.size();
656  if (0 == nrow) {
657  return {};
658  }
659  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);
660 
661  auto fragment_ptr = getFragmentInfo(fragment_id);
662  auto& fragment = *fragment_ptr;
663  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
664  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
665  ChunkKey chunk_key{
666  catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
667  auto chunk = Chunk_NS::Chunk::getChunk(cd,
668  &catalog->getDataMgr(),
669  chunk_key,
670  memory_level,
671  0,
672  chunk_meta_it->second->numBytes,
673  chunk_meta_it->second->numElements);
674 
675  std::vector<ChunkUpdateStats> update_stats_per_thread(ncore);
676 
677  // parallel update elements
678  std::vector<std::future<void>> threads;
679 
680  const auto segsz = (nrow + ncore - 1) / ncore;
681  auto dbuf = chunk->getBuffer();
682  auto dbuf_addr = dbuf->getMemoryPtr();
683  dbuf->setUpdated();
684  updel_roll.addDirtyChunk(chunk, fragment.fragmentId);
685  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
686  threads.emplace_back(std::async(
687  std::launch::async, [=, &update_stats_per_thread, &frag_offsets, &rhs_values] {
688  SQLTypeInfo lhs_type = cd->columnType;
689 
690  // !! Not sure if this is an undocumented convention or a bug, but for a sharded
691  // table the dictionary id of a dict-encoded string column is not specified by
692  // comp_param in the physical table but, somehow, in the logical table; comp_param
693  // in the physical table is always 0, so we need to adapt accordingly...
694  auto cdl = (shard_ < 0)
695  ? cd
696  : catalog->getMetadataForColumn(
697  catalog->getLogicalTableId(td->tableId), cd->columnId);
698  CHECK(cdl);
699  DecimalOverflowValidator decimalOverflowValidator(lhs_type);
700  NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
701  lhs_type, &decimalOverflowValidator);
702  DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
703  NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
704  lhs_type, &dateDaysOverflowValidator);
705 
706  StringDictionary* stringDict{nullptr};
707  if (lhs_type.is_string()) {
708  CHECK(kENCODING_DICT == lhs_type.get_compression());
709  auto dictDesc = const_cast<DictDescriptor*>(
710  catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
711  CHECK(dictDesc);
712  stringDict = dictDesc->stringDict.get();
713  CHECK(stringDict);
714  }
715 
716  for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
717  const auto roffs = frag_offsets[r];
718  auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
719  auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
720  ScalarTargetValue sv2;
721 
722  // A subtlety here concerns the two cases of string-to-string assignment, where
723  // upstream passes the RHS string as a string index instead of the preferred
724  // "real string".
725  // Case #1: for "SET str_col = str_literal", a temporary string index is hard
726  // to resolve in this layer, so if upstream passes a string index here, an
727  // exception is thrown.
728  // Case #2: for "SET str_col1 = str_col2", the RHS string index is converted
729  // to the LHS string index.
730 
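  // In case #2 the index is resolved against the RHS column's dictionary
  // (getString) here, and further below it is re-encoded against the LHS
  // dictionary (getOrAdd) under temp_mutex_.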
731  if (rhs_type.is_string()) {
732  if (const auto vp = boost::get<int64_t>(sv)) {
733  auto dictDesc = const_cast<DictDescriptor*>(
734  catalog->getMetadataForDict(rhs_type.get_comp_param()));
735  if (nullptr == dictDesc) {
736  throw std::runtime_error(
737  "UPDATE does not support cast from string literal to string "
738  "column.");
739  }
740  auto stringDict = dictDesc->stringDict.get();
741  CHECK(stringDict);
742  sv2 = NullableString(stringDict->getString(*vp));
743  sv = &sv2;
744  }
745  }
746 
747  if (const auto vp = boost::get<int64_t>(sv)) {
748  auto v = *vp;
749  if (lhs_type.is_string()) {
750  throw std::runtime_error("UPDATE does not support cast to string.");
751  }
752  int64_t old_val;
753  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
754  // Handle special case where date column with date in days encoding stores
755  // metadata in epoch seconds.
756  if (lhs_type.is_date_in_days()) {
757  old_val = DateConverters::get_epoch_seconds_from_days(old_val);
758  }
759  put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
760  if (lhs_type.is_decimal()) {
761  nullAwareDecimalOverflowValidator.validate<int64_t>(v);
762  int64_t decimal_val;
763  get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
764  int64_t target_value = (v == inline_int_null_value<int64_t>() &&
765  lhs_type.get_notnull() == false)
766  ? v
767  : decimal_val;
768  update_metadata(
769  lhs_type, update_stats_per_thread[c], target_value, old_val);
770  auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
771  auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
772  if (positive_v_and_negative_d || negative_v_and_positive_d) {
773  throw std::runtime_error(
774  "Data conversion overflow on " + std::to_string(v) +
775  " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) + ", " +
776  std::to_string(rhs_type.get_scale()) + ") to (" +
777  std::to_string(lhs_type.get_dimension()) + ", " +
778  std::to_string(lhs_type.get_scale()) + ")");
779  }
780  } else if (is_integral(lhs_type)) {
781  if (lhs_type.is_date_in_days()) {
782  // Store meta values in seconds
783  if (lhs_type.get_size() == 2) {
784  nullAwareDateOverflowValidator.validate<int16_t>(v);
785  } else {
786  nullAwareDateOverflowValidator.validate<int32_t>(v);
787  }
788  int64_t days;
789  get_scalar<int64_t>(data_ptr, lhs_type, days);
790  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
791  int64_t target_value = (v == inline_int_null_value<int64_t>() &&
792  lhs_type.get_notnull() == false)
793  ? NullSentinelSupplier()(lhs_type, v)
794  : seconds;
795  update_metadata(
796  lhs_type, update_stats_per_thread[c], target_value, old_val);
797  } else {
798  int64_t target_value;
799  if (rhs_type.is_decimal()) {
800  target_value = round(decimal_to_double(rhs_type, v));
801  } else {
802  target_value = v;
803  }
804  update_metadata(
805  lhs_type, update_stats_per_thread[c], target_value, old_val);
806  }
807  } else {
808  if (rhs_type.is_decimal()) {
809  update_metadata(lhs_type,
810  update_stats_per_thread[c],
811  decimal_to_double(rhs_type, v),
812  double(old_val));
813  } else {
814  update_metadata(lhs_type, update_stats_per_thread[c], v, old_val);
815  }
816  }
817  } else if (const auto vp = boost::get<double>(sv)) {
818  auto v = *vp;
819  if (lhs_type.is_string()) {
820  throw std::runtime_error("UPDATE does not support cast to string.");
821  }
822  double old_val;
823  get_scalar<double>(data_ptr, lhs_type, old_val);
824  put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
825  if (lhs_type.is_integer()) {
826  update_metadata(
827  lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
828  } else if (lhs_type.is_fp()) {
829  update_metadata(
830  lhs_type, update_stats_per_thread[c], double(v), double(old_val));
831  } else {
832  UNREACHABLE() << "Unexpected combination of a non-floating or integer "
833  "LHS with a floating RHS.";
834  }
835  } else if (const auto vp = boost::get<float>(sv)) {
836  auto v = *vp;
837  if (lhs_type.is_string()) {
838  throw std::runtime_error("UPDATE does not support cast to string.");
839  }
840  float old_val;
841  get_scalar<float>(data_ptr, lhs_type, old_val);
842  put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
843  if (lhs_type.is_integer()) {
844  update_metadata(
845  lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
846  } else {
847  update_metadata(lhs_type, update_stats_per_thread[c], double(v), old_val);
848  }
849  } else if (const auto vp = boost::get<NullableString>(sv)) {
850  const auto s = boost::get<std::string>(vp);
851  const auto sval = s ? *s : std::string("");
852  if (lhs_type.is_string()) {
853  decltype(stringDict->getOrAdd(sval)) sidx;
854  {
855  std::unique_lock<std::mutex> lock(temp_mutex_);
856  sidx = stringDict->getOrAdd(sval);
857  }
858  int64_t old_val;
859  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
860  put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
861  update_metadata(
862  lhs_type, update_stats_per_thread[c], int64_t(sidx), old_val);
863  } else if (sval.size() > 0) {
864  auto dval = std::atof(sval.data());
865  if (lhs_type.is_boolean()) {
866  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
867  } else if (lhs_type.is_time()) {
868  throw std::runtime_error(
869  "Date/Time/Timestamp update not supported through translated "
870  "string path.");
871  }
872  if (lhs_type.is_fp() || lhs_type.is_decimal()) {
873  double old_val;
874  get_scalar<double>(data_ptr, lhs_type, old_val);
875  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
876  update_metadata(
877  lhs_type, update_stats_per_thread[c], double(dval), old_val);
878  } else {
879  int64_t old_val;
880  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
881  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
882  update_metadata(
883  lhs_type, update_stats_per_thread[c], int64_t(dval), old_val);
884  }
885  } else {
886  put_null(data_ptr, lhs_type, cd->columnName);
887  update_stats_per_thread[c].new_values_stats.has_null = true;
888  }
889  } else {
890  CHECK(false);
891  }
892  }
893  }));
894  if (threads.size() >= (size_t)cpu_threads()) {
895  wait_cleanup_threads(threads);
896  }
897  }
898  wait_cleanup_threads(threads);
899 
900  // for unit test
901  if (Fragmenter_Namespace::FragmentInfo::unconditionalVacuum_) {
902  if (cd->isDeletedCol) {
903  const auto deleted_offsets = getVacuumOffsets(chunk);
904  if (deleted_offsets.size() > 0) {
905  compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
906  return {};
907  }
908  }
909  }
910  ChunkUpdateStats update_stats;
911  for (size_t c = 0; c < ncore; ++c) {
912  update_metadata(update_stats.new_values_stats,
913  update_stats_per_thread[c].new_values_stats);
914  update_metadata(update_stats.old_values_stats,
915  update_stats_per_thread[c].old_values_stats);
916  }
917 
918  CHECK_GT(fragment.shadowNumTuples, size_t(0));
919  updateColumnMetadata(
920  cd, fragment, chunk, update_stats.new_values_stats, cd->columnType, updel_roll);
921  update_stats.updated_rows_count = nrow;
922  update_stats.fragment_rows_count = fragment.shadowNumTuples;
923  update_stats.chunk = chunk;
924  return update_stats;
925 }
926 
927 void InsertOrderFragmenter::updateColumnMetadata(
928  const ColumnDescriptor* cd,
929  FragmentInfo& fragment,
930  std::shared_ptr<Chunk_NS::Chunk> chunk,
931  const UpdateValuesStats& new_values_stats,
932  const SQLTypeInfo& rhs_type,
933  UpdelRoll& updel_roll) {
935  auto buffer = chunk->getBuffer();
936  const auto& lhs_type = cd->columnType;
937 
938  auto encoder = buffer->getEncoder();
939  auto update_stats = [&encoder](auto min, auto max, auto has_null) {
940  static_assert(std::is_same<decltype(min), decltype(max)>::value,
941  "Type mismatch on min/max");
942  if (has_null) {
943  encoder->updateStats(decltype(min)(), true);
944  }
945  if (max < min) {
946  return;
947  }
948  encoder->updateStats(min, false);
949  encoder->updateStats(max, false);
950  };
951 
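  // Push the new-value stats into the encoder in the representation it tracks:
  // integer stats for integral columns and decimal-to-decimal updates, double stats
  // for floating point, and doubles rescaled by 10^scale when a decimal column was
  // updated from a non-decimal source.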
952  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
953  update_stats(new_values_stats.min_int64t,
954  new_values_stats.max_int64t,
955  new_values_stats.has_null);
956  } else if (lhs_type.is_fp()) {
957  update_stats(new_values_stats.min_double,
958  new_values_stats.max_double,
959  new_values_stats.has_null);
960  } else if (lhs_type.is_decimal()) {
961  update_stats((int64_t)(new_values_stats.min_double * pow(10, lhs_type.get_scale())),
962  (int64_t)(new_values_stats.max_double * pow(10, lhs_type.get_scale())),
963  new_values_stats.has_null);
964  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
965  !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
966  update_stats(new_values_stats.min_int64t,
967  new_values_stats.max_int64t,
968  new_values_stats.has_null);
969  }
970  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
971  auto chunk_metadata =
972  updel_roll.getChunkMetadata({td, &fragment}, cd->columnId, fragment);
973  buffer->getEncoder()->getMetadata(chunk_metadata);
974 }
975 
976 void InsertOrderFragmenter::updateMetadata(const Catalog_Namespace::Catalog* catalog,
977  const MetaDataKey& key,
978  UpdelRoll& updel_roll) {
980  const auto chunk_metadata_map = updel_roll.getChunkMetadataMap(key);
981  auto& fragmentInfo = *key.second;
982  fragmentInfo.setChunkMetadataMap(chunk_metadata_map);
983  fragmentInfo.shadowChunkMetadataMap = fragmentInfo.getChunkMetadataMapPhysicalCopy();
984  fragmentInfo.shadowNumTuples = updel_roll.getNumTuple(key);
985  fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
986 }
987 
988 std::vector<std::shared_ptr<Chunk_NS::Chunk>> InsertOrderFragmenter::getChunksForAllColumns(
989  const TableDescriptor* td,
990  const FragmentInfo& fragment,
991  const Data_Namespace::MemoryLevel memory_level) {
992  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
993  // coming from updateColumn (on '$delete$' column) we don't have chunks for all columns
994  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
995  if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
996  ++ncol;
997  if (!cd->isVirtualCol) {
998  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
999  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
1000  ChunkKey chunk_key{
1001  catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
1002  auto chunk = Chunk_NS::Chunk::getChunk(cd,
1003  &catalog_->getDataMgr(),
1004  chunk_key,
1005  memory_level,
1006  0,
1007  chunk_meta_it->second->numBytes,
1008  chunk_meta_it->second->numElements);
1009  chunks.push_back(chunk);
1010  }
1011  }
1012  }
1013  return chunks;
1014 }
1015 
1016 // get a sorted vector of offsets of rows to vacuum
1017 const std::vector<uint64_t> InsertOrderFragmenter::getVacuumOffsets(
1018  const std::shared_ptr<Chunk_NS::Chunk>& chunk) {
1019  const auto data_buffer = chunk->getBuffer();
1020  const auto data_addr = data_buffer->getMemoryPtr();
1021  const size_t nrows_in_chunk = data_buffer->size();
1022  const size_t ncore = cpu_threads();
1023  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
1024  std::vector<std::vector<uint64_t>> deleted_offsets;
1025  deleted_offsets.resize(ncore);
1026  std::vector<std::future<void>> threads;
1027  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
1028  threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
1029  const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
1030  const auto ithread = rbegin / segsz;
1031  CHECK(ithread < deleted_offsets.size());
1032  deleted_offsets[ithread].reserve(segsz);
1033  for (size_t r = rbegin; r < rend; ++r) {
1034  if (data_addr[r]) {
1035  deleted_offsets[ithread].push_back(r);
1036  }
1037  }
1038  }));
1039  }
1040  wait_cleanup_threads(threads);
1041  std::vector<uint64_t> all_deleted_offsets;
1042  for (size_t i = 0; i < ncore; ++i) {
1043  all_deleted_offsets.insert(
1044  all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
1045  }
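  // Each worker scans a contiguous, ascending range of rows and the per-thread vectors
  // are concatenated in thread order, so the combined offsets come out sorted, as the
  // comment above this function promises.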
1046  return all_deleted_offsets;
1047 }
1048 
1049 template <typename T>
1050 static void set_chunk_stats(const SQLTypeInfo& col_type,
1051  int8_t* data_addr,
1052  bool& has_null,
1053  T& min,
1054  T& max) {
1055  T v;
1056  const auto can_be_null = !col_type.get_notnull();
1057  const auto is_null = get_scalar<T>(data_addr, col_type, v);
1058  if (is_null) {
1059  has_null = has_null || (can_be_null && is_null);
1060  } else {
1061  set_minmax(min, max, v);
1062  }
1063 }
1064 
1065 static void set_chunk_metadata(const Catalog_Namespace::Catalog* catalog,
1066  FragmentInfo& fragment,
1067  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
1068  const size_t nrows_to_keep,
1069  UpdelRoll& updel_roll) {
1070  auto cd = chunk->getColumnDesc();
1071  auto td = catalog->getMetadataForTable(cd->tableId);
1072  auto data_buffer = chunk->getBuffer();
1073  auto chunkMetadata =
1074  updel_roll.getChunkMetadata({td, &fragment}, cd->columnId, fragment);
1075  chunkMetadata->numElements = nrows_to_keep;
1076  chunkMetadata->numBytes = data_buffer->size();
1077  updel_roll.addDirtyChunk(chunk, fragment.fragmentId);
1078 }
1079 
1080 auto vacuum_fixlen_rows(
1081  const FragmentInfo& fragment,
1082  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
1083  const std::vector<uint64_t>& frag_offsets) {
1084  const auto cd = chunk->getColumnDesc();
1085  const auto& col_type = cd->columnType;
1086  auto data_buffer = chunk->getBuffer();
1087  auto data_addr = data_buffer->getMemoryPtr();
1088  auto element_size =
1089  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1090  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1091  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1092  size_t nbytes_fix_data_to_keep = 0;
1093  auto nrows_to_vacuum = frag_offsets.size();
1094  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1095  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1096  auto is_last_one = irow == nrows_to_vacuum;
1097  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1098  auto maddr_to_vacuum = data_addr;
1099  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1100  if (nrows_to_keep > 0) {
1101  auto nbytes_to_keep = nrows_to_keep * element_size;
1102  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1103  // move curr fixlen row block toward front
1104  memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
1105  maddr_to_vacuum + irow_of_blk_to_keep * element_size,
1106  nbytes_to_keep);
1107  }
1108  irow_of_blk_to_fill += nrows_to_keep;
1109  nbytes_fix_data_to_keep += nbytes_to_keep;
1110  }
1111  irow_of_blk_to_keep = irow_to_vacuum + 1;
1112  }
1113  return nbytes_fix_data_to_keep;
1114 }
1115 
1116 // Gets the initial padding required for the chunk buffer. For variable length array
1117 // columns, if the first element after vacuuming is going to be a null array, a
1118 // padding value greater than 0 is expected.
1119 size_t get_null_padding(bool is_varlen_array,
1120  const std::vector<uint64_t>& frag_offsets,
1121  const StringOffsetT* index_array,
1122  size_t fragment_row_count) {
1123  if (is_varlen_array) {
1124  size_t first_non_deleted_row_index{0};
1125  for (auto deleted_offset : frag_offsets) {
1126  if (first_non_deleted_row_index < deleted_offset) {
1127  break;
1128  } else {
1129  first_non_deleted_row_index++;
1130  }
1131  }
1132  CHECK_LT(first_non_deleted_row_index, fragment_row_count);
1133  if (first_non_deleted_row_index == 0) {
1134  // If the first row in the fragment is not deleted, then the first offset in the
1135  // index buffer/array already contains expected padding.
1136  return index_array[0];
1137  } else {
1138  // If the first non-deleted element is a null array (identified by a negative
1139  // offset), get a padding value for the chunk buffer.
1140  if (index_array[first_non_deleted_row_index + 1] < 0) {
1141  size_t first_non_zero_offset{0};
1142  for (size_t i = 0; i <= first_non_deleted_row_index; i++) {
1143  if (index_array[i] != 0) {
1144  first_non_zero_offset = index_array[i];
1145  break;
1146  }
1147  }
1148  CHECK_GT(first_non_zero_offset, static_cast<size_t>(0));
1150  first_non_zero_offset);
1151  } else {
1152  return 0;
1153  }
1154  }
1155  } else {
1156  return 0;
1157  }
1158 }
1159 
1160 // Gets the indexes of variable length null arrays in the chunk after vacuuming.
1161 std::set<size_t> get_var_len_null_array_indexes(const SQLTypeInfo sql_type_info,
1162  const std::vector<uint64_t>& frag_offsets,
1163  const StringOffsetT* index_array,
1164  size_t fragment_row_count) {
1165  std::set<size_t> null_array_indexes;
1166  if (sql_type_info.is_varlen_array() && !sql_type_info.get_notnull()) {
1167  size_t frag_offset_index{0};
1168  size_t vacuum_offset{0};
1169  for (size_t i = 0; i < fragment_row_count; i++) {
1170  if (frag_offset_index < frag_offsets.size() &&
1171  i == frag_offsets[frag_offset_index]) {
1172  frag_offset_index++;
1173  vacuum_offset++;
1174  } else if (index_array[i + 1] < 0) {
1175  null_array_indexes.emplace(i - vacuum_offset);
1176  }
1177  }
1178  }
1179  return null_array_indexes;
1180 }
1181 
1182 StringOffsetT get_buffer_offset(bool is_varlen_array,
1183  const StringOffsetT* index_array,
1184  size_t index) {
1185  auto offset = index_array[index];
1186  if (offset < 0) {
1187  // Variable length arrays encode null arrays as negative offsets
1188  CHECK(is_varlen_array);
1189  offset = -offset;
1190  }
1191  return offset;
1192 }
1193 
1194 auto vacuum_varlen_rows(
1195  const FragmentInfo& fragment,
1196  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
1197  const std::vector<uint64_t>& frag_offsets) {
1198  auto is_varlen_array = chunk->getColumnDesc()->columnType.is_varlen_array();
1199  auto data_buffer = chunk->getBuffer();
1200  CHECK(data_buffer);
1201  auto index_buffer = chunk->getIndexBuf();
1202  CHECK(index_buffer);
1203  auto data_addr = data_buffer->getMemoryPtr();
1204  auto indices_addr = index_buffer->getMemoryPtr();
1205  CHECK(indices_addr);
1206  auto index_array = (StringOffsetT*)indices_addr;
1207  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1208  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1209  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1210  size_t null_padding =
1211  get_null_padding(is_varlen_array, frag_offsets, index_array, nrows_in_fragment);
1212  size_t nbytes_var_data_to_keep = null_padding;
1213  auto null_array_indexes = get_var_len_null_array_indexes(
1214  chunk->getColumnDesc()->columnType, frag_offsets, index_array, nrows_in_fragment);
1215  auto nrows_to_vacuum = frag_offsets.size();
1216  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1217  auto is_last_one = irow == nrows_to_vacuum;
1218  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1219  auto maddr_to_vacuum = data_addr;
1220  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1221  if (nrows_to_keep > 0) {
1222  auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
1223  auto deleted_row_start_offset =
1224  get_buffer_offset(is_varlen_array, index_array, irow_to_vacuum);
1225  auto kept_row_start_offset =
1226  get_buffer_offset(is_varlen_array, index_array, irow_of_blk_to_keep);
1227  auto nbytes_to_keep =
1228  (is_last_one ? data_buffer->size() : deleted_row_start_offset) -
1229  kept_row_start_offset;
1230  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1231  if (nbytes_to_keep > 0) {
1232  CHECK(data_addr);
1233  // move curr varlen row block toward front
1234  memmove(data_addr + ibyte_var_data_to_keep,
1235  data_addr + kept_row_start_offset,
1236  nbytes_to_keep);
1237  }
1238 
1239  const auto base_offset = kept_row_start_offset;
1240  for (int64_t i = 0; i < nrows_to_keep; ++i) {
1241  auto update_index = irow_of_blk_to_keep + i;
1242  auto offset = get_buffer_offset(is_varlen_array, index_array, update_index);
1243  index_array[update_index] = ibyte_var_data_to_keep + (offset - base_offset);
1244  }
1245  }
1246  nbytes_var_data_to_keep += nbytes_to_keep;
1247  maddr_to_vacuum = indices_addr;
1248 
1249  constexpr static auto index_element_size = sizeof(StringOffsetT);
1250  nbytes_to_keep = nrows_to_keep * index_element_size;
1251  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1252  // move curr fixlen row block toward front
1253  memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
1254  maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
1255  nbytes_to_keep);
1256  }
1257  irow_of_blk_to_fill += nrows_to_keep;
1258  }
1259  irow_of_blk_to_keep = irow_to_vacuum + 1;
1260  }
1261 
1262  // Set expected null padding, last offset, and negative values for null array offsets.
1263  index_array[0] = null_padding;
1264  auto post_vacuum_row_count = nrows_in_fragment - nrows_to_vacuum;
1265  index_array[post_vacuum_row_count] = nbytes_var_data_to_keep;
1266  if (!is_varlen_array) {
1267  CHECK(null_array_indexes.empty());
1268  }
1269  for (auto index : null_array_indexes) {
1270  index_array[index + 1] = -1 * std::abs(index_array[index + 1]);
1271  }
1272  return nbytes_var_data_to_keep;
1273 }
1274 
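// compactRows vacuums the rows marked deleted out of every column of a fragment:
// fixed-width chunks go through vacuum_fixlen_rows and variable-length chunks through
// vacuum_varlen_rows, one async task per column, after which per-column chunk
// metadata is rebuilt.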
1275 void InsertOrderFragmenter::compactRows(const Catalog_Namespace::Catalog* catalog,
1276  const TableDescriptor* td,
1277  const int fragment_id,
1278  const std::vector<uint64_t>& frag_offsets,
1279  const Data_Namespace::MemoryLevel memory_level,
1280  UpdelRoll& updel_roll) {
1281  auto fragment_ptr = getFragmentInfo(fragment_id);
1282  auto& fragment = *fragment_ptr;
1283  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
1284  const auto ncol = chunks.size();
1285 
1286  std::vector<ChunkUpdateStats> update_stats_per_thread(ncol);
1287 
1288  // parallel delete columns
1289  std::vector<std::future<void>> threads;
1290  auto nrows_to_vacuum = frag_offsets.size();
1291  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1292  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;
1293 
1294  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1295  auto chunk = chunks[ci];
1296  const auto cd = chunk->getColumnDesc();
1297  const auto& col_type = cd->columnType;
1298  auto data_buffer = chunk->getBuffer();
1299  auto index_buffer = chunk->getIndexBuf();
1300  auto data_addr = data_buffer->getMemoryPtr();
1301  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1302  auto index_array = (StringOffsetT*)indices_addr;
1303  bool is_varlen = col_type.is_varlen_indeed();
1304 
1305  auto fixlen_vacuum =
1306  [=, &update_stats_per_thread, &updel_roll, &frag_offsets, &fragment] {
1307  size_t nbytes_fix_data_to_keep;
1308  if (nrows_to_keep == 0) {
1309  nbytes_fix_data_to_keep = 0;
1310  } else {
1311  nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);
1312  }
1313 
1314  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1315  data_buffer->setSize(nbytes_fix_data_to_keep);
1316  data_buffer->setUpdated();
1317 
1318  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1319 
1320  auto daddr = data_addr;
1321  auto element_size = col_type.is_fixlen_array() ? col_type.get_size()
1322  : get_element_size(col_type);
1323  data_buffer->getEncoder()->resetChunkStats();
1324  for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
1325  if (col_type.is_fixlen_array()) {
1326  auto encoder =
1327  dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->getEncoder());
1328  CHECK(encoder);
1329  encoder->updateMetadata((int8_t*)daddr);
1330  } else if (col_type.is_fp()) {
1331  set_chunk_stats(col_type,
1332  daddr,
1333  update_stats_per_thread[ci].new_values_stats.has_null,
1334  update_stats_per_thread[ci].new_values_stats.min_double,
1335  update_stats_per_thread[ci].new_values_stats.max_double);
1336  } else {
1337  set_chunk_stats(col_type,
1338  daddr,
1339  update_stats_per_thread[ci].new_values_stats.has_null,
1340  update_stats_per_thread[ci].new_values_stats.min_int64t,
1341  update_stats_per_thread[ci].new_values_stats.max_int64t);
1342  }
1343  }
1344  };
1345 
1346  auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
1347  size_t nbytes_var_data_to_keep;
1348  if (nrows_to_keep == 0) {
1349  nbytes_var_data_to_keep = 0;
1350  } else {
1351  nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);
1352  }
1353 
1354  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1355  data_buffer->setSize(nbytes_var_data_to_keep);
1356  data_buffer->setUpdated();
1357 
1358  index_buffer->setSize(sizeof(*index_array) *
1359  (nrows_to_keep ? 1 + nrows_to_keep : 0));
1360  index_buffer->setUpdated();
1361 
1362  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1363  };
1364 
1365  if (is_varlen) {
1366  threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
1367  } else {
1368  threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
1369  }
1370  if (threads.size() >= (size_t)cpu_threads()) {
1371  wait_cleanup_threads(threads);
1372  }
1373  }
1374 
1375  wait_cleanup_threads(threads);
1376 
1377  updel_roll.setNumTuple({td, &fragment}, nrows_to_keep);
1378  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1379  auto chunk = chunks[ci];
1380  auto cd = chunk->getColumnDesc();
1381  if (!cd->columnType.is_fixlen_array()) {
1382  // For DATE_IN_DAYS encoded columns, data is stored in days but the metadata is
1383  // stored in seconds. Do the metadata conversion here before updating the chunk
1384  // stats.
1385  if (cd->columnType.is_date_in_days()) {
1386  auto& stats = update_stats_per_thread[ci].new_values_stats;
1387  stats.min_int64t = DateConverters::get_epoch_seconds_from_days(stats.min_int64t);
1388  stats.max_int64t = DateConverters::get_epoch_seconds_from_days(stats.max_int64t);
1389  }
1390  updateColumnMetadata(cd,
1391  fragment,
1392  chunk,
1393  update_stats_per_thread[ci].new_values_stats,
1394  cd->columnType,
1395  updel_roll);
1396  }
1397  }
1398 }
1399 
1400 } // namespace Fragmenter_Namespace
1401 
1402 bool UpdelRoll::commitUpdate() {
1403  if (nullptr == catalog) {
1404  return false;
1405  }
1406  const auto td = catalog->getMetadataForTable(logicalTableId);
1407  CHECK(td);
1408  ChunkKey chunk_key{catalog->getDatabaseId(), td->tableId};
1409  const auto table_lock = lockmgr::TableDataLockMgr::getWriteLockForTable(chunk_key);
1410 
1411  // Checkpoint all shards. Otherwise, epochs can go out of sync.
1412  if (td->persistenceLevel == Data_Namespace::MemoryLevel::DISK_LEVEL) {
1413  auto table_epochs = catalog->getTableEpochs(catalog->getDatabaseId(), logicalTableId);
1414  try {
1415  // `checkpointWithAutoRollback` is not called here because, if a failure occurs,
1416  // `dirtyChunks` has to be cleared before resetting epochs
1417  catalog->checkpoint(logicalTableId);
1418  } catch (...) {
1419  dirty_chunks.clear();
1420  catalog->setTableEpochsLogExceptions(catalog->getDatabaseId(), table_epochs);
1421  throw;
1422  }
1423  }
1424  updateFragmenterAndCleanupChunks();
1425  return true;
1426 }
1427 
1428 void UpdelRoll::stageUpdate() {
1429  CHECK(catalog);
1430  auto db_id = catalog->getDatabaseId();
1431  CHECK(table_descriptor);
1432  auto table_id = table_descriptor->tableId;
1434  CHECK_EQ(table_descriptor->persistenceLevel, Data_Namespace::MemoryLevel::DISK_LEVEL);
1435  try {
1436  catalog->getDataMgr().checkpoint(db_id, table_id, memoryLevel);
1437  } catch (...) {
1438  dirty_chunks.clear();
1439  throw;
1440  }
1441  updateFragmenterAndCleanupChunks();
1442 }
1443 
1444 void UpdelRoll::updateFragmenterAndCleanupChunks() {
1445  // for each dirty fragment
1446  for (auto& cm : chunk_metadata_map_per_fragment) {
1447  cm.first.first->fragmenter->updateMetadata(catalog, cm.first, *this);
1448  }
1449 
1450  // flush gpu dirty chunks if update was not on gpu
1451  if (memoryLevel != Data_Namespace::MemoryLevel::GPU_LEVEL) {
1452  for (const auto& [chunk_key, chunk] : dirty_chunks) {
1455  }
1456  }
1457  dirty_chunks.clear();
1458 }
1459 
1460 void UpdelRoll::cancelUpdate() {
1461  if (nullptr == catalog) {
1462  return;
1463  }
1464 
1465  // TODO: needed?
1466  ChunkKey chunk_key{catalog->getDatabaseId(), logicalTableId};
1467  const auto table_lock = lockmgr::TableDataLockMgr::getWriteLockForTable(chunk_key);
1468  if (is_varlen_update) {
1469  int databaseId = catalog->getDatabaseId();
1470  auto table_epochs = catalog->getTableEpochs(databaseId, logicalTableId);
1471 
1472  dirty_chunks.clear();
1473  catalog->setTableEpochs(databaseId, table_epochs);
1474  } else {
1475  const auto td = catalog->getMetadataForTable(logicalTableId);
1476  CHECK(td);
1477  if (td->persistenceLevel != memoryLevel) {
1478  for (const auto& [chunk_key, chunk] : dirty_chunks) {
1479  catalog->getDataMgr().free(chunk->getBuffer());
1480  chunk->setBuffer(nullptr);
1481  }
1482  }
1483  }
1484 }
1485 
1486 void UpdelRoll::addDirtyChunk(std::shared_ptr<Chunk_NS::Chunk> chunk,
1487  int32_t fragment_id) {
1488  heavyai::unique_lock<heavyai::shared_mutex> lock(chunk_update_tracker_mutex);
1489  CHECK(catalog);
1490  ChunkKey chunk_key{catalog->getDatabaseId(),
1491  chunk->getColumnDesc()->tableId,
1492  chunk->getColumnDesc()->columnId,
1493  fragment_id};
1494  dirty_chunks[chunk_key] = chunk;
1495 }
1496 
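// UpdelRoll captures a copy of a fragment's chunk-metadata map and tuple count the
// first time that fragment is touched; getChunkMetadata()/getNumTuple() then read and
// mutate that staged copy, which InsertOrderFragmenter::updateMetadata() later
// installs on the fragment when the update is committed.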
1497 void UpdelRoll::initializeUnsetMetadata(
1498  const TableDescriptor* td,
1499  Fragmenter_Namespace::FragmentInfo& fragment_info) {
1500  heavyai::unique_lock<heavyai::shared_mutex> lock(chunk_update_tracker_mutex);
1501  MetaDataKey key{td, &fragment_info};
1502  if (chunk_metadata_map_per_fragment.count(key) == 0) {
1503  chunk_metadata_map_per_fragment[key] =
1504  fragment_info.getChunkMetadataMapPhysicalCopy();
1505  }
1506  if (num_tuples.count(key) == 0) {
1507  num_tuples[key] = fragment_info.shadowNumTuples;
1508  }
1509 }
1510 
1511 std::shared_ptr<ChunkMetadata> UpdelRoll::getChunkMetadata(
1512  const MetaDataKey& key,
1513  int32_t column_id,
1514  Fragmenter_Namespace::FragmentInfo& fragment_info) {
1515  initializeUnsetMetadata(key.first, fragment_info);
1516  heavyai::shared_lock<heavyai::shared_mutex> lock(chunk_update_tracker_mutex);
1517  auto metadata_map_it = chunk_metadata_map_per_fragment.find(key);
1518  CHECK(metadata_map_it != chunk_metadata_map_per_fragment.end());
1519  auto chunk_metadata_it = metadata_map_it->second.find(column_id);
1520  CHECK(chunk_metadata_it != metadata_map_it->second.end());
1521  return chunk_metadata_it->second;
1522 }
1523 
1524 ChunkMetadataMap UpdelRoll::getChunkMetadataMap(const MetaDataKey& key) const {
1525  heavyai::shared_lock<heavyai::shared_mutex> lock(chunk_update_tracker_mutex);
1526  auto metadata_map_it = chunk_metadata_map_per_fragment.find(key);
1527  CHECK(metadata_map_it != chunk_metadata_map_per_fragment.end());
1528  return metadata_map_it->second;
1529 }
1530 
1531 size_t UpdelRoll::getNumTuple(const MetaDataKey& key) const {
1532  heavyai::shared_lock<heavyai::shared_mutex> lock(chunk_update_tracker_mutex);
1533  auto it = num_tuples.find(key);
1534  CHECK(it != num_tuples.end());
1535  return it->second;
1536 }
1537 
1538 void UpdelRoll::setNumTuple(const MetaDataKey& key, size_t num_tuple) {
1539  heavyai::unique_lock<heavyai::shared_mutex> lock(chunk_update_tracker_mutex);
1540  num_tuples[key] = num_tuple;
1541 }
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:265
bool is_integer() const
Definition: sqltypes.h:567
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
Definition: Fragmenter.h:86
static void set_chunk_stats(const SQLTypeInfo &col_type, int8_t *data_addr, bool &has_null, T &min, T &max)
ChunkMetadataMap getChunkMetadataMap(const MetaDataKey &key) const
bool is_timeinterval() const
Definition: sqltypes.h:594
std::unique_ptr< std::vector< ArrayDatum > > column_data_
void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData) override
virtual void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData)=0
virtual size_t const getEntryCount() const =0
std::unique_lock< T > unique_lock
DateChunkConverter(const size_t num_rows, const Chunk_NS::Chunk *chunk)
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
int getDatabaseId() const
Definition: Catalog.h:326
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:5018
void initializeUnsetMetadata(const TableDescriptor *td, Fragmenter_Namespace::FragmentInfo &fragment_info)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1907
specifies the content in-memory of a row in the column metadata table
void validate(T value)
Definition: Encoder.h:98
const RETURN_TYPE * checked_get(size_t row, const SOURCE_TYPE *boost_variant, boost_variant_accessor< RETURN_TYPE > &accessor)
bool is_boolean() const
Definition: sqltypes.h:582
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
void checkpoint(const int logicalTableId) const
Definition: Catalog.cpp:5032
#define CHECK_LT(x, y)
Definition: Logger.h:303
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:572
static void set_chunk_metadata(const Catalog_Namespace::Catalog *catalog, FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const size_t nrows_to_keep, UpdelRoll &updel_roll)
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:399
void compactRows(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
std::vector< DataBlockPtr > data
the number of rows being inserted
Definition: Fragmenter.h:73
static WriteLock getWriteLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
AbstractBuffer * getBuffer() const
Definition: Chunk.h:146
void update_metadata(SQLTypeInfo const &ti, ChunkUpdateStats &update_stats, int64_t const updated_val, int64_t const old_val, NullSentinelSupplier s=NullSentinelSupplier())
void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData) override
static constexpr size_t DEFAULT_NULL_PADDING_SIZE
int32_t ArrayOffsetT
Definition: sqltypes.h:1496
void convertToColumnarFormat(size_t row, size_t indexInFragment) override
virtual void convertToColumnarFormat(size_t row, size_t indexInFragment)=0
Data_Namespace::MemoryLevel persistenceLevel
auto getChunksForAllColumns(const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:393
virtual StringDictionaryProxy * getLiteralDictionary() const =0
std::unique_ptr< std::vector< std::string > > column_data_
boost::variant< std::string, void * > NullableString
Definition: TargetValue.h:179
int logicalTableId
Definition: UpdelRoll.h:54
void convertToColumnarFormat(size_t row, size_t indexInFragment) override
void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData) override
unencoded fixed length array encoder
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:402
std::shared_ptr< ChunkMetadata > getChunkMetadata(const MetaDataKey &key, int32_t column_id, Fragmenter_Namespace::FragmentInfo &fragment_info)
size_t getNumTuple(const MetaDataKey &key) const
void convertToColumnarFormat(size_t row, size_t indexInFragment) override
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3859
#define CHECK(condition)
Definition: Logger.h:291
void updateColumnMetadata(const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const UpdateValuesStats &update_values_stats, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
const ColumnDescriptor * column_descriptor_
Descriptor for a dictionary for a string columne.
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:68
void insertDataNoCheckpoint(InsertData &insert_data_struct) override
Given data wrapped in an InsertData struct, inserts it into the correct partitions No locks and check...
const BUFFER_DATA_TYPE * data_buffer_addr_
void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData) override
static bool is_null(const SQLTypeInfo &type, int8_t *array)
void setTableEpochsLogExceptions(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3895
SQLTypeInfo columnType
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
bool is_string() const
Definition: sqltypes.h:561
virtual std::vector< TargetValue > getEntryAt(const size_t index) const =0
void free(AbstractBuffer *buffer)
Definition: DataMgr.cpp:614
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:398
int8_t * numbersPtr
Definition: sqltypes.h:233
unencoded array encoder
void set_minmax(T &min, T &max, T const val)
StringOffsetT get_buffer_offset(bool is_varlen_array, const StringOffsetT *index_array, size_t index)
int cpu_threads()
Definition: thread_count.h:25
bool is_decimal() const
Definition: sqltypes.h:570
size_t get_null_padding(bool is_varlen_array, const std::vector< uint64_t > &frag_offsets, const StringOffsetT *index_array, size_t fragment_row_count)
std::vector< int > columnIds
identifies the table into which the data is being inserted
Definition: Fragmenter.h:71
std::string columnName
auto vacuum_varlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
void convertToColumnarFormat(size_t row, size_t indexInFragment) override
bool is_integral(const SQLTypeInfo &t)
std::unique_ptr< INSERT_DATA_TYPE, CheckedMallocDeleter< INSERT_DATA_TYPE >> ColumnDataPtr
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:3831
std::unique_ptr< TargetValueConverter > create(ConverterCreateParameter param)
virtual size_t const getRowCount() const =0
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:180
ScalarChunkConverter(const size_t num_rows, const Chunk_NS::Chunk *chunk)