OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ArrowForeignStorage.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ArrowForeignStorage.h"
18 
19 #include <arrow/api.h>
20 #include <arrow/csv/reader.h>
21 #include <arrow/io/file.h>
22 #include <arrow/util/decimal.h>
23 #include <tbb/parallel_for.h>
24 #include <tbb/task_group.h>
25 #include <array>
26 #include <future>
27 #include <vector>
28 
32 #include "Logger/Logger.h"
34 #include "Shared/ArrowUtil.h"
35 #include "Shared/measure.h"
36 
// Describes which slice of a chunked Arrow column belongs to one fragment.
// The slice starts inside `first_chunk` and ends inside `last_chunk`; every
// chunk in between belongs to the fragment entirely.
// All members default to zero so that a default-constructed Frag never holds
// indeterminate values; brace initialization such as {0, 0, 0, 0} keeps working.
struct Frag {
  size_t first_chunk{0};         // index of the first chunk assigned to the fragment
  size_t first_chunk_offset{0};  // offset from the beginning of the first chunk
  size_t last_chunk{0};          // index of the last chunk
  size_t last_chunk_size{0};     // number of elements taken from the last chunk
};
43 
44 struct ArrowFragment {
45  int64_t offset{0};
46  int64_t sz{0};
47  std::vector<std::shared_ptr<arrow::ArrayData>> chunks;
48 };
49 
51  public:
52  void append(const std::vector<ForeignStorageColumnBuffer>& column_buffers) override;
53 
54  void read(const ChunkKey& chunk_key,
55  const SQLTypeInfo& sql_type,
56  int8_t* dest,
57  const size_t numBytes) override;
58 
59  int8_t* tryZeroCopy(const ChunkKey& chunk_key,
60  const SQLTypeInfo& sql_type,
61  const size_t numBytes) override;
62 
63  void dropTable(const int db_id, const int table_id) override;
64 
66  std::pair<int, int> table_key,
67  const std::string& type,
68  const TableDescriptor& td,
69  const std::list<ColumnDescriptor>& cols,
70  Data_Namespace::AbstractBufferMgr* mgr,
71  const arrow::Table& table);
72 
73  std::shared_ptr<arrow::ChunkedArray> createDictionaryEncodedColumn(
74  StringDictionary* dict,
75  const ColumnDescriptor& c,
76  std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array);
77 
78  std::shared_ptr<arrow::ChunkedArray> convertArrowDictionary(
79  StringDictionary* dict,
80  const ColumnDescriptor& c,
81  std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array);
82 
83  template <typename T, typename ChunkType>
84  std::shared_ptr<arrow::ChunkedArray> createDecimalColumn(
85  const ColumnDescriptor& c,
86  std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array);
87 
88  std::shared_ptr<arrow::ChunkedArray> replaceNullValues(
89  const SQLTypeInfo& columnType,
90  std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array);
91 
92  template <typename T>
93  std::shared_ptr<arrow::ChunkedArray> replaceNullValuesImpl(
94  std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array);
95 
96  void getSizeAndOffset(const Frag& frag,
97  const std::shared_ptr<arrow::Array>& chunk,
98  size_t i,
99  int& size,
100  int& offset);
101 
102  int64_t makeFragment(const Frag& frag,
103  ArrowFragment& arrowFrag,
104  const std::vector<std::shared_ptr<arrow::Array>>& chunks,
105  bool is_varlen);
106 
107  std::map<std::array<int, 3>, std::vector<ArrowFragment>> m_columns;
108 };
109 
110 std::shared_ptr<arrow::ChunkedArray> ArrowForeignStorageBase::replaceNullValues(
111  const SQLTypeInfo& columnType,
112  std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array) {
113  const size_t typeSize = columnType.get_size();
114  if (columnType.is_integer() || is_datetime(columnType.get_type())) {
115  switch (typeSize) {
116  case 1:
117  return replaceNullValuesImpl<int8_t>(arr_col_chunked_array);
118  case 2:
119  return replaceNullValuesImpl<int16_t>(arr_col_chunked_array);
120  case 4:
121  return replaceNullValuesImpl<int32_t>(arr_col_chunked_array);
122  case 8:
123  return replaceNullValuesImpl<int64_t>(arr_col_chunked_array);
124  default:
125  // TODO: throw unsupported integer type exception
126  CHECK(false);
127  }
128  } else if (columnType.is_fp()) {
129  switch (typeSize) {
130  case 4:
131  return replaceNullValuesImpl<float>(arr_col_chunked_array);
132  case 8:
133  return replaceNullValuesImpl<double>(arr_col_chunked_array);
134  }
135  } else if (columnType.is_boolean()) {
136  return replaceNullValuesImpl<bool>(arr_col_chunked_array);
137  }
138  CHECK(false);
139  return nullptr;
140 }
141 
143  const uint8_t* src,
144  const uint8_t* bitmap,
145  int64_t length,
146  int8_t null_value) {
147  for (int64_t bitmap_idx = 0; bitmap_idx < length / 8; ++bitmap_idx) {
148  auto source = src[bitmap_idx];
149  auto dest = dst + bitmap_idx * 8;
150  auto inversed_bitmap = ~bitmap[bitmap_idx];
151  for (int8_t bitmap_offset = 0; bitmap_offset < 8; ++bitmap_offset) {
152  auto is_null = (inversed_bitmap >> bitmap_offset) & 1;
153  auto val = (source >> bitmap_offset) & 1;
154  dest[bitmap_offset] = is_null ? null_value : val;
155  }
156  }
157 
158  for (int64_t j = (length / 8) * 8; j < length; ++j) {
159  auto is_null = (~bitmap[length / 8] >> (j % 8)) & 1;
160  auto val = (src[length / 8] >> (j % 8)) & 1;
161  dst[j] = is_null ? null_value : val;
162  }
163 }
164 
166  const uint8_t* src,
167  int64_t length) {
168  for (int64_t bitmap_idx = 0; bitmap_idx < length / 8; ++bitmap_idx) {
169  auto source = src[bitmap_idx];
170  auto dest = dst + bitmap_idx * 8;
171  for (int8_t bitmap_offset = 0; bitmap_offset < 8; ++bitmap_offset) {
172  dest[bitmap_offset] = (source >> bitmap_offset) & 1;
173  }
174  }
175 
176  for (int64_t j = (length / 8) * 8; j < length; ++j) {
177  dst[j] = (src[length / 8] >> (j % 8)) & 1;
178  }
179 }
180 
181 template <
182  typename V,
183  std::enable_if_t<!std::is_same_v<V, bool> && std::is_integral<V>::value, int> = 0>
184 inline V get_null_value() {
185  return inline_int_null_value<V>();
186 }
187 
188 template <typename V, std::enable_if_t<std::is_same_v<V, bool>, int> = 0>
189 inline int8_t get_null_value() {
190  return inline_int_null_value<int8_t>();
191 }
192 
193 template <typename V, std::enable_if_t<std::is_floating_point<V>::value, int> = 0>
194 inline V get_null_value() {
195  return inline_fp_null_value<V>();
196 }
197 
198 template <typename T>
199 std::shared_ptr<arrow::ChunkedArray> ArrowForeignStorageBase::replaceNullValuesImpl(
200  std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array) {
201  if ((!std::is_same_v<T, bool>)&&(arr_col_chunked_array->null_count() == 0)) {
202  // for boolean columns we still need to convert bitmaps to array
203  return arr_col_chunked_array;
204  }
205 
206  auto null_value = get_null_value<T>();
207 
208  auto resultBuf =
209  arrow::AllocateBuffer(sizeof(T) * arr_col_chunked_array->length()).ValueOrDie();
210  auto resultData = reinterpret_cast<T*>(resultBuf->mutable_data());
211 
213  tbb::blocked_range<size_t>(0, arr_col_chunked_array->num_chunks()),
214  [&](const tbb::blocked_range<size_t>& r) {
215  for (size_t c = r.begin(); c != r.end(); ++c) {
216  size_t offset = 0;
217  for (size_t i = 0; i < c; i++) {
218  offset += arr_col_chunked_array->chunk(i)->length();
219  }
220  auto resWithOffset = resultData + offset;
221 
222  auto chunk = arr_col_chunked_array->chunk(c);
223 
224  if (chunk->null_count() == chunk->length()) {
225  std::fill(resWithOffset, resWithOffset + chunk->length(), null_value);
226  continue;
227  }
228 
229  auto chunkData = reinterpret_cast<const T*>(chunk->data()->buffers[1]->data());
230 
231  const uint8_t* bitmap_data = chunk->null_bitmap_data();
232  const int64_t length = chunk->length();
233 
234  if (chunk->null_count() == 0) {
235  if constexpr (std::is_same_v<T, bool>) {
237  reinterpret_cast<int8_t*>(resWithOffset),
238  reinterpret_cast<const uint8_t*>(chunkData),
239  length);
240  } else {
241  std::copy(chunkData, chunkData + chunk->length(), resWithOffset);
242  }
243  continue;
244  }
245 
246  if constexpr (std::is_same_v<T, bool>) {
247  convertBoolBitmapBufferWithNulls(reinterpret_cast<int8_t*>(resWithOffset),
248  reinterpret_cast<const uint8_t*>(chunkData),
249  bitmap_data,
250  length,
251  null_value);
252  } else {
253  for (int64_t bitmap_idx = 0; bitmap_idx < length / 8; ++bitmap_idx) {
254  auto source = chunkData + bitmap_idx * 8;
255  auto dest = resWithOffset + bitmap_idx * 8;
256  auto inversed_bitmap = ~bitmap_data[bitmap_idx];
257  for (int8_t bitmap_offset = 0; bitmap_offset < 8; ++bitmap_offset) {
258  auto is_null = (inversed_bitmap >> bitmap_offset) & 1;
259  auto val = is_null ? null_value : source[bitmap_offset];
260  dest[bitmap_offset] = val;
261  }
262  }
263 
264  for (int64_t j = length / 8 * 8; j < length; ++j) {
265  auto is_null = (~bitmap_data[length / 8] >> (j % 8)) & 1;
266  auto val = is_null ? null_value : chunkData[j];
267  resWithOffset[j] = val;
268  }
269  }
270  }
271  });
272 
273  using ArrowType = typename arrow::CTypeTraits<T>::ArrowType;
274  using ArrayType = typename arrow::TypeTraits<ArrowType>::ArrayType;
275 
276  auto array =
277  std::make_shared<ArrayType>(arr_col_chunked_array->length(), std::move(resultBuf));
278  return std::make_shared<arrow::ChunkedArray>(array);
279 }
280 
282  const std::shared_ptr<arrow::Array>& chunk,
283  size_t i,
284  int& size,
285  int& offset) {
286  offset = (i == frag.first_chunk) ? frag.first_chunk_offset : 0;
287  size = (i == frag.last_chunk) ? frag.last_chunk_size : (chunk->length() - offset);
288 }
289 
291  const Frag& frag,
292  ArrowFragment& arrowFrag,
293  const std::vector<std::shared_ptr<arrow::Array>>& chunks,
294  bool is_varlen) {
295  int64_t varlen = 0;
296  arrowFrag.chunks.resize(frag.last_chunk - frag.first_chunk + 1);
297  for (int i = frag.first_chunk, e = frag.last_chunk; i <= e; i++) {
298  int size, offset;
299  getSizeAndOffset(frag, chunks[i], i, size, offset);
300  arrowFrag.offset += offset;
301  arrowFrag.sz += size;
302  arrowFrag.chunks[i - frag.first_chunk] = chunks[i]->data();
303  auto& buffers = chunks[i]->data()->buffers;
304  if (is_varlen) {
305  if (buffers.size() <= 2) {
306  throw std::runtime_error(
307  "Importing fixed length arrow array as variable length column");
308  }
309  auto offsets_buffer = reinterpret_cast<const uint32_t*>(buffers[1]->data());
310  varlen += offsets_buffer[offset + size] - offsets_buffer[offset];
311  } else if (buffers.size() != 2) {
312  throw std::runtime_error(
313  "Importing varialbe length arrow array as fixed length column");
314  }
315  }
316  // return length of string buffer if array is none encoded string
317  return varlen;
318 }
319 
320 std::vector<Frag> calculateFragmentsOffsets(const arrow::ChunkedArray& array,
321  size_t maxFragRows) {
322  std::vector<Frag> fragments;
323  size_t sz = 0;
324  size_t offset = 0;
325  fragments.push_back({0, 0, 0, 0});
326  size_t num_chunks = (size_t)array.num_chunks();
327  for (size_t i = 0; i < num_chunks;) {
328  auto& chunk = *array.chunk(i);
329  auto& frag = *fragments.rbegin();
330  if (maxFragRows - sz > chunk.length() - offset) {
331  sz += chunk.length() - offset;
332  if (i == num_chunks - 1) {
333  fragments.rbegin()->last_chunk = num_chunks - 1;
334  fragments.rbegin()->last_chunk_size =
335  array.chunk((int)num_chunks - 1)->length() - offset;
336  }
337  offset = 0;
338  i++;
339  } else {
340  frag.last_chunk = i;
341  frag.last_chunk_size = maxFragRows - sz;
342  offset += maxFragRows - sz;
343  sz = 0;
344  fragments.push_back({i, offset, 0, 0});
345  }
346  }
347  if (fragments.rbegin()->first_chunk == fragments.rbegin()->first_chunk &&
348  fragments.rbegin()->last_chunk_size == 0) {
349  // remove empty fragment at the end if any
350  fragments.pop_back();
351  }
352  return fragments;
353 }
354 
356  std::pair<int, int> table_key,
357  const std::string& type,
358  const TableDescriptor& td,
359  const std::list<ColumnDescriptor>& cols,
360  Data_Namespace::AbstractBufferMgr* mgr,
361  const arrow::Table& table) {
362  std::map<std::array<int, 3>, StringDictionary*> dictionaries;
363  for (auto& c : cols) {
364  std::array<int, 3> col_key{table_key.first, table_key.second, c.columnId};
365  m_columns[col_key] = {};
366  // fsi registerTable runs under SqliteLock which does not allow invoking
367  // getMetadataForDict in other threads
368  if (c.columnType.is_dict_encoded_string()) {
369  auto dictDesc = catalog->getMetadataForDict(c.columnType.get_comp_param());
370  dictionaries[col_key] = dictDesc->stringDict.get();
371  }
372  }
373 
374  tbb::task_group tg;
375 
377  tbb::blocked_range(0, (int)cols.size()),
378  [this, &tg, &table_key, &td, mgr, &table, &cols, &dictionaries](auto range) {
379  auto columnIter = std::next(cols.begin(), range.begin());
380  for (auto col_idx = range.begin(); col_idx != range.end(); col_idx++) {
381  auto& c = *(columnIter++);
382 
383  if (c.isSystemCol) {
384  continue; // must be processed by base interface implementation
385  }
386 
387  // data comes like this - database_id, table_id, column_id, fragment_id
388  ChunkKey key{table_key.first, table_key.second, c.columnId, 0};
389  std::array<int, 3> col_key{table_key.first, table_key.second, c.columnId};
390 
391  if (col_idx >= table.num_columns()) {
392  LOG(ERROR) << "Number of columns read from Arrow (" << table.num_columns()
393  << ") mismatch CREATE TABLE request: " << cols.size();
394  break;
395  }
396 
397  auto arr_col_chunked_array = table.column(col_idx);
398  auto column_type = c.columnType.get_type();
399 
400  if (column_type != kDECIMAL && column_type != kNUMERIC &&
401  !c.columnType.is_string()) {
402  arr_col_chunked_array = replaceNullValues(column_type, arr_col_chunked_array);
403  }
404 
405  if (c.columnType.is_dict_encoded_string()) {
406  StringDictionary* dict = dictionaries[col_key];
407 
408  switch (arr_col_chunked_array->type()->id()) {
409  case arrow::Type::STRING:
410  arr_col_chunked_array =
411  createDictionaryEncodedColumn(dict, c, arr_col_chunked_array);
412  break;
413  case arrow::Type::DICTIONARY:
414  arr_col_chunked_array =
415  convertArrowDictionary(dict, c, arr_col_chunked_array);
416  break;
417  default:
418  CHECK(false);
419  }
420  } else if (column_type == kDECIMAL || column_type == kNUMERIC) {
421  switch (c.columnType.get_size()) {
422  case 2:
423  arr_col_chunked_array = createDecimalColumn<int16_t, arrow::Int16Array>(
424  c, arr_col_chunked_array);
425  break;
426  case 4:
427  arr_col_chunked_array = createDecimalColumn<int32_t, arrow::Int32Array>(
428  c, arr_col_chunked_array);
429  break;
430  case 8:
431  arr_col_chunked_array = createDecimalColumn<int64_t, arrow::Int64Array>(
432  c, arr_col_chunked_array);
433  break;
434  default:
435  // TODO: throw unsupported decimal type exception
436  CHECK(false);
437  break;
438  }
439  }
440 
441  auto fragments =
442  calculateFragmentsOffsets(*arr_col_chunked_array, td.maxFragRows);
443 
444  auto ctype = c.columnType.get_type();
445  auto& col = m_columns[col_key];
446  col.resize(fragments.size());
447 
448  for (size_t f = 0; f < fragments.size(); f++) {
449  key[3] = f;
450  auto& frag = col[f];
451  bool is_varlen = ctype == kTEXT && !c.columnType.is_dict_encoded_string();
452  size_t varlen = makeFragment(
453  fragments[f], frag, arr_col_chunked_array->chunks(), is_varlen);
454 
455  // create buffer descriptors
456  if (ctype == kTEXT && !c.columnType.is_dict_encoded_string()) {
457  auto k = key;
458  k.push_back(1);
459  {
460  auto b = mgr->createBuffer(k);
461  b->setSize(varlen);
462  b->initEncoder(c.columnType);
463  }
464  k[4] = 2;
465  {
466  auto b = mgr->createBuffer(k);
467  b->setSqlType(SQLTypeInfo(kINT, false));
468  b->setSize(frag.sz * b->getSqlType().get_size());
469  }
470  } else {
471  auto b = mgr->createBuffer(key);
472  b->setSize(frag.sz * c.columnType.get_size());
473  b->initEncoder(c.columnType);
474  size_t type_size = c.columnType.get_size();
475  tg.run([b, fr = &frag, type_size]() {
476  size_t sz = 0;
477  for (size_t i = 0; i < fr->chunks.size(); i++) {
478  auto& chunk = fr->chunks[i];
479  int offset = (i == 0) ? fr->offset : 0;
480  size_t size = (i == fr->chunks.size() - 1) ? (fr->sz - sz)
481  : (chunk->length - offset);
482  sz += size;
483  auto data = chunk->buffers[1]->data();
484  b->getEncoder()->updateStatsEncoded(
485  (const int8_t*)data + offset * type_size, size);
486  }
487  });
488  b->getEncoder()->setNumElems(frag.sz);
489  }
490  }
491  }
492  }); // each col and fragment
493 
494  // wait until all stats have been updated
495  tg.wait();
496 }
497 
499  const std::vector<ForeignStorageColumnBuffer>& column_buffers) {
500  CHECK(false);
501 }
502 
504  const SQLTypeInfo& sql_type,
505  int8_t* dest,
506  const size_t numBytes) {
507  std::array<int, 3> col_key{chunk_key[0], chunk_key[1], chunk_key[2]};
508  auto& frag = m_columns.at(col_key).at(chunk_key[3]);
509 
510  CHECK(!frag.chunks.empty() || !chunk_key[3]);
511  int64_t sz = 0, copied = 0;
512  int varlen_offset = 0;
513  size_t read_size = 0;
514  for (size_t i = 0; i < frag.chunks.size(); i++) {
515  auto& array_data = frag.chunks[i];
516  int offset = (i == 0) ? frag.offset : 0;
517  size_t size = (i == frag.chunks.size() - 1) ? (frag.sz - read_size)
518  : (array_data->length - offset);
519  read_size += size;
520  arrow::Buffer* bp = nullptr;
521  if (sql_type.is_dict_encoded_string()) {
522  // array_data->buffers[1] stores dictionary indexes
523  bp = array_data->buffers[1].get();
524  } else if (sql_type.get_type() == kTEXT) {
525  CHECK_GE(array_data->buffers.size(), 3UL);
526  // array_data->buffers[2] stores string array
527  bp = array_data->buffers[2].get();
528  } else if (array_data->null_count != array_data->length) {
529  // any type except strings (none encoded strings offsets go here as well)
530  CHECK_GE(array_data->buffers.size(), 2UL);
531  bp = array_data->buffers[1].get();
532  }
533  CHECK(bp);
534  // offset buffer for none encoded strings need to be merged
535  if (chunk_key.size() == 5 && chunk_key[4] == 2) {
536  auto data = reinterpret_cast<const uint32_t*>(bp->data()) + offset;
537  auto dest_ui32 = reinterpret_cast<uint32_t*>(dest);
538  // as size contains count of string in chunk slice it would always be one less
539  // then offsets array size
540  sz = (size + 1) * sizeof(uint32_t);
541  if (sz > 0) {
542  if (i != 0) {
543  // We merge arrow chunks with string offsets into a single contigous fragment.
544  // Each string is represented by a pair of offsets, thus size of offset table
545  // is num strings + 1. When merging two chunks, the last number in the first
546  // chunk duplicates the first number in the second chunk, so we skip it.
547  data++;
548  sz -= sizeof(uint32_t);
549  } else {
550  // As we support cases when fragment starts with offset of arrow chunk we need
551  // to substract the first element of the first chunk from all elements in that
552  // fragment
553  varlen_offset -= data[0];
554  }
555  // We also re-calculate offsets in the second chunk as it is a continuation of
556  // the first one.
557  std::transform(data,
558  data + (sz / sizeof(uint32_t)),
559  dest_ui32,
560  [varlen_offset](uint32_t val) { return val + varlen_offset; });
561  varlen_offset += data[(sz / sizeof(uint32_t)) - 1];
562  }
563  } else {
564  auto fixed_type = dynamic_cast<arrow::FixedWidthType*>(array_data->type.get());
565  if (fixed_type) {
566  std::memcpy(
567  dest,
568  bp->data() + (array_data->offset + offset) * (fixed_type->bit_width() / 8),
569  sz = size * (fixed_type->bit_width() / 8));
570  } else {
571  auto offsets_buffer =
572  reinterpret_cast<const uint32_t*>(array_data->buffers[1]->data());
573  auto string_buffer_offset = offsets_buffer[offset + array_data->offset];
574  auto string_buffer_size =
575  offsets_buffer[offset + array_data->offset + size] - string_buffer_offset;
576  std::memcpy(dest, bp->data() + string_buffer_offset, sz = string_buffer_size);
577  }
578  }
579  dest += sz;
580  copied += sz;
581  }
582  CHECK_EQ(numBytes, size_t(copied));
583 }
584 
586  const SQLTypeInfo& sql_type,
587  const size_t numBytes) {
588  std::array<int, 3> col_key{chunk_key[0], chunk_key[1], chunk_key[2]};
589  auto& frag = m_columns.at(col_key).at(chunk_key[3]);
590 
591  // fragment should be contiguous to allow zero copy
592  if (frag.chunks.size() != 1) {
593  return nullptr;
594  }
595 
596  auto& array_data = frag.chunks[0];
597  int offset = frag.offset;
598 
599  arrow::Buffer* bp = nullptr;
600  if (sql_type.is_dict_encoded_string()) {
601  // array_data->buffers[1] stores dictionary indexes
602  bp = array_data->buffers[1].get();
603  } else if (sql_type.get_type() == kTEXT) {
604  CHECK_GE(array_data->buffers.size(), 3UL);
605  // array_data->buffers[2] stores string array
606  bp = array_data->buffers[2].get();
607  } else if (array_data->null_count != array_data->length) {
608  // any type except strings (none encoded strings offsets go here as well)
609  CHECK_GE(array_data->buffers.size(), 2UL);
610  bp = array_data->buffers[1].get();
611  }
612 
613  // arrow buffer is empty, it means we should fill fragment with null's in read function
614  if (!bp) {
615  return nullptr;
616  }
617 
618  auto data = reinterpret_cast<int8_t*>(const_cast<uint8_t*>(bp->data()));
619 
620  // if buffer is null encoded string index buffer
621  if (chunk_key.size() == 5 && chunk_key[4] == 2) {
622  // if offset != 0 we need to recalculate index buffer by adding offset to each index
623  if (offset != 0) {
624  return nullptr;
625  } else {
626  return data;
627  }
628  }
629 
630  auto fixed_type = dynamic_cast<arrow::FixedWidthType*>(array_data->type.get());
631  if (fixed_type) {
632  return data + (array_data->offset + offset) * (fixed_type->bit_width() / 8);
633  }
634  // if buffer is none encoded string data buffer
635  // then we should find it's offset in offset buffer
636  auto offsets_buffer = reinterpret_cast<const uint32_t*>(array_data->buffers[1]->data());
637  auto string_buffer_offset = offsets_buffer[offset + array_data->offset];
638  return data + string_buffer_offset;
639 }
640 
641 void ArrowForeignStorageBase::dropTable(const int db_id, const int table_id) {
642  auto it = m_columns.lower_bound({db_id, table_id, 0});
643  while (it->first[0] == db_id && it->first[1] == table_id) {
644  it = m_columns.erase(it);
645  }
646 }
647 
648 std::shared_ptr<arrow::ChunkedArray>
650  StringDictionary* dict,
651  const ColumnDescriptor& c,
652  std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array) {
653  // calculate offsets for every fragment in bulk
654  size_t bulk_size = 0;
655  std::vector<int> offsets(arr_col_chunked_array->num_chunks());
656  for (int i = 0; i < arr_col_chunked_array->num_chunks(); i++) {
657  offsets[i] = bulk_size;
658  bulk_size += arr_col_chunked_array->chunk(i)->length();
659  }
660 
661  std::vector<std::string_view> bulk(bulk_size);
662 
664  tbb::blocked_range<int>(0, arr_col_chunked_array->num_chunks()),
665  [&bulk, &arr_col_chunked_array, &offsets](const tbb::blocked_range<int>& r) {
666  for (int i = r.begin(); i < r.end(); i++) {
667  auto chunk = std::static_pointer_cast<arrow::StringArray>(
668  arr_col_chunked_array->chunk(i));
669  auto offset = offsets[i];
670  for (int j = 0; j < chunk->length(); j++) {
671  auto view = chunk->GetView(j);
672  bulk[offset + j] = std::string_view(view.data(), view.length());
673  }
674  }
675  });
676 
677  std::shared_ptr<arrow::Buffer> indices_buf;
678  auto res = arrow::AllocateBuffer(bulk_size * sizeof(int32_t));
679  CHECK(res.ok());
680  indices_buf = std::move(res).ValueOrDie();
681  auto raw_data = reinterpret_cast<int*>(indices_buf->mutable_data());
682  dict->getOrAddBulk(bulk, raw_data);
683  auto array = std::make_shared<arrow::Int32Array>(bulk_size, indices_buf);
684  return std::make_shared<arrow::ChunkedArray>(array);
685 }
686 
687 std::shared_ptr<arrow::ChunkedArray> ArrowForeignStorageBase::convertArrowDictionary(
688  StringDictionary* dict,
689  const ColumnDescriptor& c,
690  std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array) {
691  // TODO: allocate one big array and split it by fragments as it is done in
692  // createDictionaryEncodedColumn
693  std::vector<std::shared_ptr<arrow::Array>> converted_chunks;
694  for (auto& chunk : arr_col_chunked_array->chunks()) {
695  auto dict_array = std::static_pointer_cast<arrow::DictionaryArray>(chunk);
696  auto values = std::static_pointer_cast<arrow::StringArray>(dict_array->dictionary());
697  std::vector<std::string_view> strings(values->length());
698  for (int i = 0; i < values->length(); i++) {
699  auto view = values->GetView(i);
700  strings[i] = std::string_view(view.data(), view.length());
701  }
702  auto arrow_indices =
703  std::static_pointer_cast<arrow::Int32Array>(dict_array->indices());
704  std::vector<int> indices_mapping(values->length());
705  dict->getOrAddBulk(strings, indices_mapping.data());
706 
707  // create new arrow chunk with remapped indices
708  std::shared_ptr<arrow::Buffer> dict_indices_buf;
709  auto res = arrow::AllocateBuffer(arrow_indices->length() * sizeof(int32_t));
710  CHECK(res.ok());
711  dict_indices_buf = std::move(res).ValueOrDie();
712  auto raw_data = reinterpret_cast<int32_t*>(dict_indices_buf->mutable_data());
713 
714  for (int i = 0; i < arrow_indices->length(); i++) {
715  raw_data[i] = indices_mapping[arrow_indices->Value(i)];
716  }
717 
718  converted_chunks.push_back(
719  std::make_shared<arrow::Int32Array>(arrow_indices->length(), dict_indices_buf));
720  }
721  return std::make_shared<arrow::ChunkedArray>(converted_chunks);
722 }
723 
724 template <typename T, typename ChunkType>
725 std::shared_ptr<arrow::ChunkedArray> ArrowForeignStorageBase::createDecimalColumn(
726  const ColumnDescriptor& c,
727  std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array) {
728  size_t column_size = 0;
729  std::vector<int> offsets(arr_col_chunked_array->num_chunks());
730  for (int i = 0; i < arr_col_chunked_array->num_chunks(); i++) {
731  offsets[i] = column_size;
732  column_size += arr_col_chunked_array->chunk(i)->length();
733  }
734 
735  std::shared_ptr<arrow::Buffer> result_buffer;
736  auto res = arrow::AllocateBuffer(column_size * c.columnType.get_size());
737  CHECK(res.ok());
738  result_buffer = std::move(res).ValueOrDie();
739 
740  T* buffer_data = reinterpret_cast<T*>(result_buffer->mutable_data());
742  tbb::blocked_range(0, arr_col_chunked_array->num_chunks()),
743  [buffer_data, &offsets, arr_col_chunked_array](auto& range) {
744  for (int chunk_idx = range.begin(); chunk_idx < range.end(); chunk_idx++) {
745  auto offset = offsets[chunk_idx];
746  T* chunk_buffer = buffer_data + offset;
747 
748  auto decimalArray = std::static_pointer_cast<arrow::Decimal128Array>(
749  arr_col_chunked_array->chunk(chunk_idx));
750  auto empty =
751  arr_col_chunked_array->null_count() == arr_col_chunked_array->length();
752  for (int i = 0; i < decimalArray->length(); i++) {
753  if (empty || decimalArray->null_count() == decimalArray->length() ||
754  decimalArray->IsNull(i)) {
755  chunk_buffer[i] = inline_int_null_value<T>();
756  } else {
757  arrow::Decimal128 val(decimalArray->GetValue(i));
758  chunk_buffer[i] =
759  static_cast<int64_t>(val); // arrow can cast only to int64_t
760  }
761  }
762  }
763  });
764  auto array = std::make_shared<ChunkType>(column_size, result_buffer);
765  return std::make_shared<arrow::ChunkedArray>(array);
766 }
767 
769  public:
771 
772  void prepareTable(const int db_id,
773  const std::string& type,
774  TableDescriptor& td,
775  std::list<ColumnDescriptor>& cols) override;
777  std::pair<int, int> table_key,
778  const std::string& type,
779  const TableDescriptor& td,
780  const std::list<ColumnDescriptor>& cols,
781  Data_Namespace::AbstractBufferMgr* mgr) override;
782 
783  std::string getType() const override;
784 
785  std::string name;
786 
787  static std::map<std::string, std::shared_ptr<arrow::Table>> tables;
788 };
789 
790 std::map<std::string, std::shared_ptr<arrow::Table>> ArrowForeignStorage::tables =
791  std::map<std::string, std::shared_ptr<arrow::Table>>();
792 
793 static SQLTypeInfo getOmnisciType(const arrow::DataType& type) {
794  using namespace arrow;
795  switch (type.id()) {
796  case Type::INT8:
797  return SQLTypeInfo(kTINYINT, false);
798  case Type::INT16:
799  return SQLTypeInfo(kSMALLINT, false);
800  case Type::INT32:
801  return SQLTypeInfo(kINT, false);
802  case Type::INT64:
803  return SQLTypeInfo(kBIGINT, false);
804  case Type::BOOL:
805  return SQLTypeInfo(kBOOLEAN, false);
806  case Type::FLOAT:
807  return SQLTypeInfo(kFLOAT, false);
808  case Type::DATE32:
809  case Type::DATE64:
810  return SQLTypeInfo(kDATE, false);
811  case Type::DOUBLE:
812  return SQLTypeInfo(kDOUBLE, false);
813  // uncomment when arrow 2.0 will be released and modin support for dictionary types
814  // in read_csv would be implemented
815 
816  // case Type::DICTIONARY: {
817  // auto type = SQLTypeInfo(kTEXT, false, kENCODING_DICT);
818  // // this is needed because createTable forces type.size to be equal to
819  // // comp_param / 8, no matter what type.size you set here
820  // type.set_comp_param(sizeof(uint32_t) * 8);
821  // return type;
822  // }
823  // case Type::STRING:
824  // return SQLTypeInfo(kTEXT, false, kENCODING_NONE);
825 
826  case Type::STRING: {
827  auto type = SQLTypeInfo(kTEXT, false, kENCODING_DICT);
828  // this is needed because createTable forces type.size to be equal to
829  // comp_param / 8, no matter what type.size you set here
830  type.set_comp_param(sizeof(uint32_t) * 8);
831  return type;
832  }
833  case Type::DECIMAL: {
834  const auto& decimal_type = static_cast<const arrow::DecimalType&>(type);
835  return SQLTypeInfo(kDECIMAL, decimal_type.precision(), decimal_type.scale(), false);
836  }
837  case Type::TIME32:
838  return SQLTypeInfo(kTIME, false);
839  case Type::TIMESTAMP:
840  switch (static_cast<const arrow::TimestampType&>(type).unit()) {
841  case TimeUnit::SECOND:
842  return SQLTypeInfo(kTIMESTAMP, 0, 0);
843  case TimeUnit::MILLI:
844  return SQLTypeInfo(kTIMESTAMP, 3, 0);
845  case TimeUnit::MICRO:
846  return SQLTypeInfo(kTIMESTAMP, 6, 0);
847  case TimeUnit::NANO:
848  return SQLTypeInfo(kTIMESTAMP, 9, 0);
849  }
850  default:
851  throw std::runtime_error(type.ToString() + " is not yet supported.");
852  }
853 }
854 
856  const std::string& name,
857  TableDescriptor& td,
858  std::list<ColumnDescriptor>& cols) {
859  td.hasDeletedCol = false;
860  this->name = name;
861  auto table = tables[name];
862  for (auto& field : table->schema()->fields()) {
863  ColumnDescriptor cd;
864  cd.columnName = field->name();
865  cd.columnType = getOmnisciType(*field->type());
866  cols.push_back(cd);
867  }
868 }
869 
871  std::pair<int, int> table_key,
872  const std::string& info,
873  const TableDescriptor& td,
874  const std::list<ColumnDescriptor>& cols,
875  Data_Namespace::AbstractBufferMgr* mgr) {
876  parseArrowTable(catalog, table_key, info, td, cols, mgr, *(tables[name].get()));
877 }
878 
879 std::string ArrowForeignStorage::getType() const {
880  LOG(INFO) << "CSV backed temporary tables has been activated. Create table `with "
881  "(storage_type='CSV:path/to/file.csv');`\n";
882  return "ARROW";
883 }
884 
885 void setArrowTable(std::string name, std::shared_ptr<arrow::Table> table) {
887 }
888 
889 void releaseArrowTable(std::string name) {
890  ArrowForeignStorage::tables.erase(name);
891 }
892 
893 void registerArrowForeignStorage(std::shared_ptr<ForeignStorageInterface> fsi) {
894  fsi->registerPersistentStorageInterface(std::make_unique<ArrowForeignStorage>());
895 }
896 
// NOTE(review): the line that opens this class and the line following `public:` are
// missing from this listing. The members below are presumably those of
// ArrowCsvForeignStorage, whose out-of-class definitions follow - confirm upstream.
898  public:
900 
  // Table preparation hook. The definition in this file only clears td.hasDeletedCol.
901  void prepareTable(const int db_id,
902  const std::string& type,
903  TableDescriptor& td,
904  std::list<ColumnDescriptor>& cols) override;
  // NOTE(review): the first line of the next declaration (its name and first parameter)
  // is missing from this listing; the remaining parameters match the registerTable
  // definition further down in this file.
906  std::pair<int, int> table_key,
907  const std::string& type,
908  const TableDescriptor& td,
909  const std::list<ColumnDescriptor>& cols,
910  Data_Namespace::AbstractBufferMgr* mgr) override;
911 
  // Storage-type tag of this backend; the definition in this file returns "CSV".
912  std::string getType() const override;
913 };
914 
916  const std::string& type,
917  TableDescriptor& td,
918  std::list<ColumnDescriptor>& cols) {
919  td.hasDeletedCol = false;
920 }
921 
922 // TODO: this overlaps with getArrowType() from ArrowResultSetConverter.cpp but with few
923 // differences in kTEXT and kDATE
924 static std::shared_ptr<arrow::DataType> getArrowImportType(const SQLTypeInfo type) {
925  using namespace arrow;
926  auto ktype = type.get_type();
927  if (IS_INTEGER(ktype)) {
928  switch (type.get_size()) {
929  case 1:
930  return int8();
931  case 2:
932  return int16();
933  case 4:
934  return int32();
935  case 8:
936  return int64();
937  default:
938  CHECK(false);
939  }
940  }
941  switch (ktype) {
942  case kBOOLEAN:
943  return arrow::boolean();
944  case kFLOAT:
945  return float32();
946  case kDOUBLE:
947  return float64();
948  case kCHAR:
949  case kVARCHAR:
950  case kTEXT:
951  return utf8();
952  case kDECIMAL:
953  case kNUMERIC:
954  return decimal(type.get_precision(), type.get_scale());
955  case kTIME:
956  return time32(TimeUnit::SECOND);
957  case kDATE:
958 #ifdef HAVE_CUDA
959  return arrow::date64();
960 #else
961  return arrow::date32();
962 #endif
963  case kTIMESTAMP:
964  switch (type.get_precision()) {
965  case 0:
966  return timestamp(TimeUnit::SECOND);
967  case 3:
968  return timestamp(TimeUnit::MILLI);
969  case 6:
970  return timestamp(TimeUnit::MICRO);
971  case 9:
972  return timestamp(TimeUnit::NANO);
973  default:
974  throw std::runtime_error("Unsupported timestamp precision for Arrow: " +
975  std::to_string(type.get_precision()));
976  }
977  case kARRAY:
978  case kINTERVAL_DAY_TIME:
980  default:
981  throw std::runtime_error(type.get_type_name() + " is not supported in Arrow.");
982  }
983  return nullptr;
984 }
985 
987  std::pair<int, int> table_key,
988  const std::string& info,
989  const TableDescriptor& td,
990  const std::list<ColumnDescriptor>& cols,
991  Data_Namespace::AbstractBufferMgr* mgr) {
992  const DataframeTableDescriptor* df_td =
993  dynamic_cast<const DataframeTableDescriptor*>(&td);
994  bool isDataframe = df_td ? true : false;
995  std::unique_ptr<DataframeTableDescriptor> df_td_owned;
996  if (!isDataframe) {
997  df_td_owned = std::make_unique<DataframeTableDescriptor>(td);
998  CHECK(df_td_owned);
999  df_td = df_td_owned.get();
1000  }
1001 
1002 #if defined(ENABLE_ARROW_4) || defined(_WIN32)
1003  auto io_context = arrow::io::default_io_context();
1004 #else
1005  auto io_context = arrow::default_memory_pool();
1006 #endif
1007  auto arrow_parse_options = arrow::csv::ParseOptions::Defaults();
1008  arrow_parse_options.quoting = false;
1009  arrow_parse_options.escaping = false;
1010  arrow_parse_options.newlines_in_values = false;
1011  arrow_parse_options.delimiter = *df_td->delimiter.c_str();
1012  auto arrow_read_options = arrow::csv::ReadOptions::Defaults();
1013  arrow_read_options.use_threads = true;
1014 
1015  arrow_read_options.block_size = 20 * 1024 * 1024;
1016  arrow_read_options.autogenerate_column_names = false;
1017  arrow_read_options.skip_rows =
1018  df_td->hasHeader ? (df_td->skipRows + 1) : df_td->skipRows;
1019 
1020  auto arrow_convert_options = arrow::csv::ConvertOptions::Defaults();
1021  arrow_convert_options.check_utf8 = false;
1022  arrow_convert_options.include_columns = arrow_read_options.column_names;
1023  arrow_convert_options.strings_can_be_null = true;
1024 
1025  for (auto& c : cols) {
1026  if (c.isSystemCol) {
1027  continue; // must be processed by base interface implementation
1028  }
1029  arrow_convert_options.column_types.emplace(c.columnName,
1030  getArrowImportType(c.columnType));
1031  arrow_read_options.column_names.push_back(c.columnName);
1032  }
1033 
1034  std::shared_ptr<arrow::io::ReadableFile> inp;
1035  auto file_result = arrow::io::ReadableFile::Open(info.c_str());
1036  ARROW_THROW_NOT_OK(file_result.status());
1037  inp = file_result.ValueOrDie();
1038 
1039  auto table_reader_result = arrow::csv::TableReader::Make(
1040  io_context, inp, arrow_read_options, arrow_parse_options, arrow_convert_options);
1041  ARROW_THROW_NOT_OK(table_reader_result.status());
1042  auto table_reader = table_reader_result.ValueOrDie();
1043 
1044  std::shared_ptr<arrow::Table> arrowTable;
1045  auto time = measure<>::execution([&]() {
1046  auto arrow_table_result = table_reader->Read();
1047  ARROW_THROW_NOT_OK(arrow_table_result.status());
1048  arrowTable = arrow_table_result.ValueOrDie();
1049  });
1050 
1051  VLOG(1) << "Read Arrow CSV file " << info << " in " << time << "ms";
1052 
1053  arrow::Table& table = *arrowTable.get();
1054  parseArrowTable(catalog, table_key, info, td, cols, mgr, table);
1055 }
1056 
1057 std::string ArrowCsvForeignStorage::getType() const {
1058  LOG(INFO) << "CSV backed temporary tables has been activated. Create table `with "
1059  "(storage_type='CSV:path/to/file.csv');`\n";
1060  return "CSV";
1061 }
1062 
1063 void registerArrowCsvForeignStorage(std::shared_ptr<ForeignStorageInterface> fsi) {
1064  fsi->registerPersistentStorageInterface(std::make_unique<ArrowCsvForeignStorage>());
1065 }
std::shared_ptr< arrow::ChunkedArray > replaceNullValues(const SQLTypeInfo &columnType, std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< int > ChunkKey
Definition: types.h:36
static SQLTypeInfo getOmnisciType(const arrow::DataType &type)
HOST DEVICE int get_size() const
Definition: sqltypes.h:403
void prepareTable(const int db_id, const std::string &type, TableDescriptor &td, std::list< ColumnDescriptor > &cols) override
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:143
Definition: sqltypes.h:76
void convertBoolBitmapBufferWithoutNulls(int8_t *dst, const uint8_t *src, int64_t length)
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
void registerArrowForeignStorage(std::shared_ptr< ForeignStorageInterface > fsi)
void dropTable(const int db_id, const int table_id) override
#define LOG(tag)
Definition: Logger.h:285
bool is_fp() const
Definition: sqltypes.h:573
HOST DEVICE int get_scale() const
Definition: sqltypes.h:396
std::string getType() const override
void read(const ChunkKey &chunk_key, const SQLTypeInfo &sql_type, int8_t *dest, const size_t numBytes) override
#define CHECK_GE(x, y)
Definition: Logger.h:306
size_t first_chunk_offset
void setArrowTable(std::string name, std::shared_ptr< arrow::Table > table)
void append(const std::vector< ForeignStorageColumnBuffer > &column_buffers) override
void registerArrowCsvForeignStorage(std::shared_ptr< ForeignStorageInterface > fsi)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
int8_t * tryZeroCopy(const ChunkKey &chunk_key, const SQLTypeInfo &sql_type, const size_t numBytes) override
std::shared_ptr< arrow::ChunkedArray > convertArrowDictionary(StringDictionary *dict, const ColumnDescriptor &c, std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
tuple STRING
Definition: dtypes.py:31
void releaseArrowTable(std::string name)
std::string to_string(char const *&&v)
std::shared_ptr< StringDictionary > stringDict
static std::map< std::string, std::shared_ptr< arrow::Table > > tables
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:33
void prepareTable(const int db_id, const std::string &type, TableDescriptor &td, std::list< ColumnDescriptor > &cols) override
std::string getType() const override
DEVICE void fill(ARGS &&...args)
Definition: gpu_enabled.h:60
CONSTEXPR DEVICE bool is_null(const T &value)
bool is_integer() const
Definition: sqltypes.h:567
std::map< std::array< int, 3 >, std::vector< ArrowFragment > > m_columns
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
void parseArrowTable(Catalog_Namespace::Catalog *catalog, std::pair< int, int > table_key, const std::string &type, const TableDescriptor &td, const std::list< ColumnDescriptor > &cols, Data_Namespace::AbstractBufferMgr *mgr, const arrow::Table &table)
std::vector< std::shared_ptr< arrow::ArrayData > > chunks
void getSizeAndOffset(const Frag &frag, const std::shared_ptr< arrow::Array > &chunk, size_t i, int &size, int &offset)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1907
specifies the content in-memory of a row in the column metadata table
OUTPUT transform(INPUT const &input, FUNC const &func)
Definition: misc.h:329
bool is_boolean() const
Definition: sqltypes.h:582
static std::shared_ptr< arrow::DataType > getArrowImportType(const SQLTypeInfo type)
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
int get_precision() const
Definition: sqltypes.h:394
void convertBoolBitmapBufferWithNulls(int8_t *dst, const uint8_t *src, const uint8_t *bitmap, int64_t length, int8_t null_value)
Definition: sqltypes.h:79
Definition: sqltypes.h:80
size_t last_chunk
std::string get_type_name() const
Definition: sqltypes.h:484
#define IS_INTEGER(T)
Definition: sqltypes.h:304
torch::Tensor f(torch::Tensor x, torch::Tensor W_target, torch::Tensor b_target)
Definition: sqltypes.h:68
size_t last_chunk_size
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
std::shared_ptr< arrow::ChunkedArray > createDictionaryEncodedColumn(StringDictionary *dict, const ColumnDescriptor &c, std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
void registerTable(Catalog_Namespace::Catalog *catalog, std::pair< int, int > table_key, const std::string &type, const TableDescriptor &td, const std::list< ColumnDescriptor > &cols, Data_Namespace::AbstractBufferMgr *mgr) override
std::vector< Frag > calculateFragmentsOffsets(const arrow::ChunkedArray &array, size_t maxFragRows)
std::shared_ptr< arrow::ChunkedArray > replaceNullValuesImpl(std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
#define CHECK(condition)
Definition: Logger.h:291
size_t first_chunk
std::shared_ptr< arrow::ChunkedArray > createDecimalColumn(const ColumnDescriptor &c, std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
For unencoded strings.
V get_null_value()
bool is_dict_encoded_string() const
Definition: sqltypes.h:643
Definition: sqltypes.h:72
SQLTypeInfo columnType
string name
Definition: setup.in.py:72
int64_t makeFragment(const Frag &frag, ArrowFragment &arrowFrag, const std::vector< std::shared_ptr< arrow::Array >> &chunks, bool is_varlen)
std::string columnName
#define VLOG(n)
Definition: Logger.h:388
constexpr auto is_datetime(SQLTypes type)
Definition: sqltypes.h:325
void registerTable(Catalog_Namespace::Catalog *catalog, std::pair< int, int > table_key, const std::string &type, const TableDescriptor &td, const std::list< ColumnDescriptor > &cols, Data_Namespace::AbstractBufferMgr *mgr) override
specifies the content in-memory of a row in the table metadata table