19 #include <arrow/api.h>
20 #include <arrow/csv/reader.h>
21 #include <arrow/io/file.h>
22 #include <arrow/util/decimal.h>
23 #include <tbb/parallel_for.h>
24 #include <tbb/task_group.h>
47 std::vector<std::shared_ptr<arrow::ArrayData>>
chunks;
52 void append(
const std::vector<ForeignStorageColumnBuffer>& column_buffers)
override;
57 const size_t numBytes)
override;
61 const size_t numBytes)
override;
63 void dropTable(
const int db_id,
const int table_id)
override;
66 std::pair<int, int> table_key,
67 const std::string&
type,
69 const std::list<ColumnDescriptor>& cols,
70 Data_Namespace::AbstractBufferMgr* mgr,
71 const arrow::Table& table);
76 std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array);
81 std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array);
83 template <
typename T,
typename ChunkType>
86 std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array);
90 std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array);
94 std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array);
97 const std::shared_ptr<arrow::Array>& chunk,
104 const std::vector<std::shared_ptr<arrow::Array>>& chunks,
107 std::map<std::array<int, 3>, std::vector<ArrowFragment>>
m_columns;
112 std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array) {
113 const size_t typeSize = columnType.
get_size();
117 return replaceNullValuesImpl<int8_t>(arr_col_chunked_array);
119 return replaceNullValuesImpl<int16_t>(arr_col_chunked_array);
121 return replaceNullValuesImpl<int32_t>(arr_col_chunked_array);
123 return replaceNullValuesImpl<int64_t>(arr_col_chunked_array);
128 }
else if (columnType.
is_fp()) {
131 return replaceNullValuesImpl<float>(arr_col_chunked_array);
133 return replaceNullValuesImpl<double>(arr_col_chunked_array);
136 return replaceNullValuesImpl<bool>(arr_col_chunked_array);
// convertBoolBitmapBufferWithNulls (visible portion): expands the bit-packed
// boolean buffer `src` (bit k of byte b holds value b*8+k) into one int8 per
// value in `dst`; where the validity `bitmap` bit is cleared, null_value is
// written instead of the data bit.
144 const uint8_t* bitmap,
// Full bitmap bytes: 8 values per outer iteration.
147 for (int64_t bitmap_idx = 0; bitmap_idx < length / 8; ++bitmap_idx) {
148 auto source = src[bitmap_idx];
149 auto dest = dst + bitmap_idx * 8;
// Inverting the validity byte turns "valid" bits into "is null" bits.
150 auto inversed_bitmap = ~bitmap[bitmap_idx];
151 for (int8_t bitmap_offset = 0; bitmap_offset < 8; ++bitmap_offset) {
152 auto is_null = (inversed_bitmap >> bitmap_offset) & 1;
153 auto val = (source >> bitmap_offset) & 1;
// Tail: the remaining length % 8 values live in the last, partially used byte.
158 for (int64_t j = (length / 8) * 8; j < length; ++j) {
159 auto is_null = (~bitmap[length / 8] >> (j % 8)) & 1;
160 auto val = (src[length / 8] >> (j % 8)) & 1;
161 dst[j] =
is_null ? null_value : val;
// convertBoolBitmapBufferWithoutNulls (visible portion): expands the
// bit-packed boolean buffer `src` into one int8 (0 or 1) per value in `dst`.
// No validity bitmap is consulted, so every bit is copied as-is.
168 for (int64_t bitmap_idx = 0; bitmap_idx < length / 8; ++bitmap_idx) {
169 auto source = src[bitmap_idx];
170 auto dest = dst + bitmap_idx * 8;
171 for (int8_t bitmap_offset = 0; bitmap_offset < 8; ++bitmap_offset) {
172 dest[bitmap_offset] = (source >> bitmap_offset) & 1;
// Tail: the remaining length % 8 values live in the last, partially used byte.
176 for (int64_t j = (length / 8) * 8; j < length; ++j) {
177 dst[j] = (src[length / 8] >> (j % 8)) & 1;
183 std::enable_if_t<!std::is_same_v<V, bool> && std::is_integral<V>::value,
int> = 0>
185 return inline_int_null_value<V>();
188 template <
typename V, std::enable_if_t<std::is_same_v<V,
bool>,
int> = 0>
190 return inline_int_null_value<int8_t>();
193 template <typename V, std::enable_if_t<std::is_floating_point<V>::value,
int> = 0>
195 return inline_fp_null_value<V>();
// replaceNullValuesImpl<T> (visible portion): copies every chunk of
// arr_col_chunked_array into one contiguous buffer of T, writing
// get_null_value<T>() wherever a value is null, and returns a single-chunk
// ChunkedArray over that buffer. Non-bool columns with no nulls are returned
// unchanged (bool always goes through the bitmap expansion).
198 template <
typename T>
200 std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array) {
201 if ((!std::is_same_v<T, bool>)&&(arr_col_chunked_array->null_count() == 0)) {
203 return arr_col_chunked_array;
206 auto null_value = get_null_value<T>();
// One output slot of sizeof(T) per row across all chunks.
209 arrow::AllocateBuffer(
sizeof(
T) * arr_col_chunked_array->length()).ValueOrDie();
210 auto resultData =
reinterpret_cast<T*
>(resultBuf->mutable_data());
// Chunks are processed in parallel; each chunk writes only its own
// [offset, offset + chunk->length()) range of resultData.
213 tbb::blocked_range<size_t>(0, arr_col_chunked_array->num_chunks()),
214 [&](
const tbb::blocked_range<size_t>& r) {
215 for (
size_t c = r.begin(); c != r.end(); ++c) {
// Output position of chunk c = sum of the lengths of chunks [0, c).
// NOTE(review): recomputed for every chunk, so the total cost is
// O(num_chunks^2); a prefix-offset vector computed once up front would avoid it.
217 for (
size_t i = 0; i < c; i++) {
218 offset += arr_col_chunked_array->chunk(i)->length();
220 auto resWithOffset = resultData + offset;
222 auto chunk = arr_col_chunked_array->chunk(c);
// All-null chunk: fill with the sentinel without reading the data buffer.
224 if (chunk->null_count() == chunk->length()) {
225 std::fill(resWithOffset, resWithOffset + chunk->length(), null_value);
// NOTE(review): chunkData and bitmap_data come straight from the raw
// buffers and no visible line adds chunk->offset(), so this appears to assume
// unsliced chunks (offset 0) -- confirm. read() adds array_data->offset.
229 auto chunkData =
reinterpret_cast<const T*
>(chunk->data()->buffers[1]->data());
231 const uint8_t* bitmap_data = chunk->null_bitmap_data();
232 const int64_t length = chunk->length();
// No nulls: bools are expanded from bit-packed form, other types are copied.
234 if (chunk->null_count() == 0) {
235 if constexpr (std::is_same_v<T, bool>) {
237 reinterpret_cast<int8_t*>(resWithOffset),
238 reinterpret_cast<const uint8_t*>(chunkData),
241 std::copy(chunkData, chunkData + chunk->length(), resWithOffset);
246 if constexpr (std::is_same_v<T, bool>) {
248 reinterpret_cast<const uint8_t*>(chunkData),
// Nullable non-bool path: 8 values per validity byte; a cleared validity bit
// selects null_value, otherwise the source value is copied.
253 for (int64_t bitmap_idx = 0; bitmap_idx < length / 8; ++bitmap_idx) {
254 auto source = chunkData + bitmap_idx * 8;
255 auto dest = resWithOffset + bitmap_idx * 8;
256 auto inversed_bitmap = ~bitmap_data[bitmap_idx];
257 for (int8_t bitmap_offset = 0; bitmap_offset < 8; ++bitmap_offset) {
258 auto is_null = (inversed_bitmap >> bitmap_offset) & 1;
259 auto val =
is_null ? null_value : source[bitmap_offset];
260 dest[bitmap_offset] = val;
// Tail: the remaining length % 8 values covered by the last validity byte.
264 for (int64_t j = length / 8 * 8; j < length; ++j) {
265 auto is_null = (~bitmap_data[length / 8] >> (j % 8)) & 1;
266 auto val =
is_null ? null_value : chunkData[j];
267 resWithOffset[j] = val;
// Wrap the filled buffer in an Arrow array type matching the C type T.
273 using ArrowType =
typename arrow::CTypeTraits<T>::ArrowType;
274 using ArrayType =
typename arrow::TypeTraits<ArrowType>::ArrayType;
277 std::make_shared<ArrayType>(arr_col_chunked_array->length(), std::move(resultBuf));
278 return std::make_shared<arrow::ChunkedArray>(array);
282 const std::shared_ptr<arrow::Array>& chunk,
// makeFragment (visible portion): accumulates, per chunk slice, the row
// offset/size into arrowFrag and, for variable-length columns, the byte length
// of the string data covered by that slice.
293 const std::vector<std::shared_ptr<arrow::Array>>& chunks,
300 arrowFrag.
offset += offset;
301 arrowFrag.
sz += size;
303 auto& buffers = chunks[i]->data()->buffers;
// Varlen arrays need validity + offsets + data buffers; <= 2 buffers means the
// Arrow array is fixed-width and cannot back a variable-length column.
305 if (buffers.size() <= 2) {
306 throw std::runtime_error(
307 "Importing fixed length arrow array as variable length column");
// String bytes in [offset, offset + size) = difference of 32-bit offsets.
// NOTE(review): ArrayData::offset is not added to the index here (read()
// does add it), so sliced chunks may be measured at the wrong position.
309 auto offsets_buffer =
reinterpret_cast<const uint32_t*
>(buffers[1]->data());
310 varlen += offsets_buffer[offset + size] - offsets_buffer[offset];
311 }
else if (buffers.size() != 2) {
// NOTE(review): "varialbe" in the message below is a typo for "variable";
// left as-is because it is a runtime string.
312 throw std::runtime_error(
313 "Importing varialbe length arrow array as fixed length column");
// calculateFragmentsOffsets (visible portion): walks the chunks of `array`
// and splits its rows into Frag descriptors of at most maxFragRows rows each;
// a fragment may start mid-chunk (offset) and span several chunks.
321 size_t maxFragRows) {
322 std::vector<Frag> fragments;
325 fragments.push_back({0, 0, 0, 0});
326 size_t num_chunks = (size_t)array.num_chunks();
327 for (
size_t i = 0; i < num_chunks;) {
328 auto& chunk = *array.chunk(i);
// `frag` refers into `fragments`; it is only used before the push_back below.
329 auto& frag = *fragments.rbegin();
// Remaining rows of this chunk fit into the current fragment.
330 if (maxFragRows - sz > chunk.length() - offset) {
331 sz += chunk.length() - offset;
332 if (i == num_chunks - 1) {
333 fragments.rbegin()->last_chunk = num_chunks - 1;
334 fragments.rbegin()->last_chunk_size =
335 array.chunk((
int)num_chunks - 1)->length() - offset;
// Otherwise close the current fragment inside this chunk and start a new one.
341 frag.last_chunk_size = maxFragRows - sz;
342 offset += maxFragRows - sz;
344 fragments.push_back({i, offset, 0, 0});
// NOTE(review): `first_chunk == first_chunk` compares a field with itself and
// is always true, so only last_chunk_size == 0 decides the pop_back. The
// intended test was probably first_chunk == last_chunk -- confirm before changing.
347 if (fragments.rbegin()->first_chunk == fragments.rbegin()->first_chunk &&
348 fragments.rbegin()->last_chunk_size == 0) {
// Drop a trailing empty fragment.
350 fragments.pop_back();
356 std::pair<int, int> table_key,
357 const std::string&
type,
359 const std::list<ColumnDescriptor>& cols,
360 Data_Namespace::AbstractBufferMgr* mgr,
361 const arrow::Table& table) {
363 for (
auto& c : cols) {
364 std::array<int, 3> col_key{table_key.first, table_key.second, c.columnId};
368 if (c.columnType.is_dict_encoded_string()) {
370 dictionaries[col_key] = dictDesc->
stringDict.get();
377 tbb::blocked_range(0, (
int)cols.size()),
378 [
this, &tg, &table_key, &td, mgr, &table, &cols, &dictionaries](
auto range) {
379 auto columnIter = std::next(cols.begin(), range.begin());
380 for (
auto col_idx = range.begin(); col_idx != range.end(); col_idx++) {
381 auto& c = *(columnIter++);
388 ChunkKey key{table_key.first, table_key.second, c.columnId, 0};
389 std::array<int, 3> col_key{table_key.first, table_key.second, c.columnId};
391 if (col_idx >= table.num_columns()) {
392 LOG(
ERROR) <<
"Number of columns read from Arrow (" << table.num_columns()
393 <<
") mismatch CREATE TABLE request: " << cols.size();
397 auto arr_col_chunked_array = table.column(col_idx);
398 auto column_type = c.columnType.get_type();
401 !c.columnType.is_string()) {
405 if (c.columnType.is_dict_encoded_string()) {
408 switch (arr_col_chunked_array->type()->id()) {
410 arr_col_chunked_array =
413 case arrow::Type::DICTIONARY:
414 arr_col_chunked_array =
421 switch (c.columnType.get_size()) {
423 arr_col_chunked_array = createDecimalColumn<int16_t, arrow::Int16Array>(
424 c, arr_col_chunked_array);
427 arr_col_chunked_array = createDecimalColumn<int32_t, arrow::Int32Array>(
428 c, arr_col_chunked_array);
431 arr_col_chunked_array = createDecimalColumn<int64_t, arrow::Int64Array>(
432 c, arr_col_chunked_array);
444 auto ctype = c.columnType.get_type();
446 col.resize(fragments.size());
448 for (
size_t f = 0;
f < fragments.size();
f++) {
451 bool is_varlen = ctype ==
kTEXT && !c.columnType.is_dict_encoded_string();
453 fragments[
f], frag, arr_col_chunked_array->chunks(), is_varlen);
456 if (ctype ==
kTEXT && !c.columnType.is_dict_encoded_string()) {
460 auto b = mgr->createBuffer(k);
462 b->initEncoder(c.columnType);
466 auto b = mgr->createBuffer(k);
468 b->setSize(frag.sz * b->getSqlType().get_size());
471 auto b = mgr->createBuffer(key);
472 b->setSize(frag.sz * c.columnType.get_size());
473 b->initEncoder(c.columnType);
474 size_t type_size = c.columnType.get_size();
475 tg.run([b, fr = &frag, type_size]() {
477 for (
size_t i = 0; i < fr->chunks.size(); i++) {
478 auto& chunk = fr->chunks[i];
479 int offset = (i == 0) ? fr->offset : 0;
480 size_t size = (i == fr->chunks.size() - 1) ? (fr->sz - sz)
481 : (chunk->length - offset);
483 auto data = chunk->buffers[1]->data();
484 b->getEncoder()->updateStatsEncoded(
485 (
const int8_t*)data + offset * type_size, size);
488 b->getEncoder()->setNumElems(frag.sz);
499 const std::vector<ForeignStorageColumnBuffer>& column_buffers) {
506 const size_t numBytes) {
507 std::array<int, 3> col_key{chunk_key[0], chunk_key[1], chunk_key[2]};
508 auto& frag =
m_columns.at(col_key).at(chunk_key[3]);
510 CHECK(!frag.chunks.empty() || !chunk_key[3]);
511 int64_t sz = 0, copied = 0;
512 int varlen_offset = 0;
513 size_t read_size = 0;
514 for (
size_t i = 0; i < frag.chunks.size(); i++) {
515 auto& array_data = frag.chunks[i];
516 int offset = (i == 0) ? frag.offset : 0;
517 size_t size = (i == frag.chunks.size() - 1) ? (frag.sz - read_size)
518 : (array_data->length - offset);
520 arrow::Buffer* bp =
nullptr;
523 bp = array_data->buffers[1].get();
525 CHECK_GE(array_data->buffers.size(), 3UL);
527 bp = array_data->buffers[2].get();
528 }
else if (array_data->null_count != array_data->length) {
530 CHECK_GE(array_data->buffers.size(), 2UL);
531 bp = array_data->buffers[1].get();
535 if (chunk_key.size() == 5 && chunk_key[4] == 2) {
536 auto data =
reinterpret_cast<const uint32_t*
>(bp->data()) + offset;
537 auto dest_ui32 =
reinterpret_cast<uint32_t*
>(
dest);
540 sz = (size + 1) *
sizeof(uint32_t);
548 sz -=
sizeof(uint32_t);
553 varlen_offset -= data[0];
558 data + (sz /
sizeof(uint32_t)),
560 [varlen_offset](uint32_t val) {
return val + varlen_offset; });
561 varlen_offset += data[(sz /
sizeof(uint32_t)) - 1];
564 auto fixed_type =
dynamic_cast<arrow::FixedWidthType*
>(array_data->type.get());
568 bp->data() + (array_data->offset + offset) * (fixed_type->bit_width() / 8),
569 sz = size * (fixed_type->bit_width() / 8));
571 auto offsets_buffer =
572 reinterpret_cast<const uint32_t*
>(array_data->buffers[1]->data());
573 auto string_buffer_offset = offsets_buffer[offset + array_data->offset];
574 auto string_buffer_size =
575 offsets_buffer[offset + array_data->offset + size] - string_buffer_offset;
576 std::memcpy(dest, bp->data() + string_buffer_offset, sz = string_buffer_size);
587 const size_t numBytes) {
588 std::array<int, 3> col_key{chunk_key[0], chunk_key[1], chunk_key[2]};
589 auto& frag =
m_columns.at(col_key).at(chunk_key[3]);
592 if (frag.chunks.size() != 1) {
596 auto& array_data = frag.chunks[0];
597 int offset = frag.offset;
599 arrow::Buffer* bp =
nullptr;
602 bp = array_data->buffers[1].get();
604 CHECK_GE(array_data->buffers.size(), 3UL);
606 bp = array_data->buffers[2].get();
607 }
else if (array_data->null_count != array_data->length) {
609 CHECK_GE(array_data->buffers.size(), 2UL);
610 bp = array_data->buffers[1].get();
618 auto data =
reinterpret_cast<int8_t*
>(
const_cast<uint8_t*
>(bp->data()));
621 if (chunk_key.size() == 5 && chunk_key[4] == 2) {
630 auto fixed_type =
dynamic_cast<arrow::FixedWidthType*
>(array_data->type.get());
632 return data + (array_data->offset + offset) * (fixed_type->bit_width() / 8);
636 auto offsets_buffer =
reinterpret_cast<const uint32_t*
>(array_data->buffers[1]->data());
637 auto string_buffer_offset = offsets_buffer[offset + array_data->offset];
638 return data + string_buffer_offset;
// dropTable (visible portion): iterates the m_columns entries whose key starts
// with {db_id, table_id}; keys are ordered, so lower_bound finds the first one.
642 auto it =
m_columns.lower_bound({db_id, table_id, 0});
// NOTE(review): `it` is dereferenced without checking it != m_columns.end().
// If no key >= {db_id, table_id, 0} exists (e.g. this is the last table, or
// the map is empty), it->first reads end() -- undefined behavior. The loop
// condition should test `it != m_columns.end() &&` first.
643 while (it->first[0] == db_id && it->first[1] == table_id) {
648 std::shared_ptr<arrow::ChunkedArray>
652 std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array) {
654 size_t bulk_size = 0;
655 std::vector<int> offsets(arr_col_chunked_array->num_chunks());
656 for (
int i = 0; i < arr_col_chunked_array->num_chunks(); i++) {
657 offsets[i] = bulk_size;
658 bulk_size += arr_col_chunked_array->chunk(i)->length();
661 std::vector<std::string_view> bulk(bulk_size);
664 tbb::blocked_range<int>(0, arr_col_chunked_array->num_chunks()),
665 [&bulk, &arr_col_chunked_array, &offsets](
const tbb::blocked_range<int>& r) {
666 for (
int i = r.begin(); i < r.end(); i++) {
667 auto chunk = std::static_pointer_cast<arrow::StringArray>(
668 arr_col_chunked_array->chunk(i));
669 auto offset = offsets[i];
670 for (
int j = 0; j < chunk->length(); j++) {
671 auto view = chunk->GetView(j);
672 bulk[offset + j] = std::string_view(view.data(), view.length());
677 std::shared_ptr<arrow::Buffer> indices_buf;
678 auto res = arrow::AllocateBuffer(bulk_size *
sizeof(int32_t));
680 indices_buf = std::move(
res).ValueOrDie();
681 auto raw_data =
reinterpret_cast<int*
>(indices_buf->mutable_data());
683 auto array = std::make_shared<arrow::Int32Array>(bulk_size, indices_buf);
684 return std::make_shared<arrow::ChunkedArray>(array);
// convertArrowDictionary (visible portion): for each Arrow DictionaryArray
// chunk, views the chunk's string dictionary, builds a per-chunk mapping from
// Arrow dictionary position to an output id (indices_mapping; filled on lines
// not shown, presumably via the OmniSci string dictionary -- confirm), and
// rewrites the chunk's indices through that mapping into a new Int32Array.
690 std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array) {
693 std::vector<std::shared_ptr<arrow::Array>> converted_chunks;
694 for (
auto& chunk : arr_col_chunked_array->chunks()) {
695 auto dict_array = std::static_pointer_cast<arrow::DictionaryArray>(chunk);
696 auto values = std::static_pointer_cast<arrow::StringArray>(dict_array->dictionary());
// Non-owning views into the dictionary's string data.
697 std::vector<std::string_view> strings(values->length());
698 for (
int i = 0; i < values->length(); i++) {
699 auto view = values->GetView(i);
700 strings[i] = std::string_view(view.data(), view.length());
// NOTE(review): unchecked cast -- assumes the dictionary index type is int32.
703 std::static_pointer_cast<arrow::Int32Array>(dict_array->indices());
704 std::vector<int> indices_mapping(values->length());
708 std::shared_ptr<arrow::Buffer> dict_indices_buf;
709 auto res = arrow::AllocateBuffer(arrow_indices->length() *
sizeof(int32_t));
711 dict_indices_buf = std::move(
res).ValueOrDie();
712 auto raw_data =
reinterpret_cast<int32_t*
>(dict_indices_buf->mutable_data());
// NOTE(review): no IsNull check is visible here; a null index slot's Value(i)
// is used directly as an index into indices_mapping, which could be out of
// range. The resulting Int32Array is also built without a validity bitmap.
714 for (
int i = 0; i < arrow_indices->length(); i++) {
715 raw_data[i] = indices_mapping[arrow_indices->Value(i)];
718 converted_chunks.push_back(
719 std::make_shared<arrow::Int32Array>(arrow_indices->length(), dict_indices_buf));
721 return std::make_shared<arrow::ChunkedArray>(converted_chunks);
// createDecimalColumn<T, ChunkType> (visible portion): converts every
// Decimal128 chunk into one contiguous buffer of integer type T (null rows get
// inline_int_null_value<T>()) and returns it as a single-chunk ChunkedArray.
724 template <
typename T,
typename ChunkType>
727 std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array) {
// Prefix offsets: offsets[i] is the output position of chunk i.
728 size_t column_size = 0;
729 std::vector<int> offsets(arr_col_chunked_array->num_chunks());
730 for (
int i = 0; i < arr_col_chunked_array->num_chunks(); i++) {
731 offsets[i] = column_size;
732 column_size += arr_col_chunked_array->chunk(i)->length();
735 std::shared_ptr<arrow::Buffer> result_buffer;
738 result_buffer = std::move(
res).ValueOrDie();
740 T* buffer_data =
reinterpret_cast<T*
>(result_buffer->mutable_data());
// Chunks are converted in parallel; each writes its own disjoint output range.
742 tbb::blocked_range(0, arr_col_chunked_array->num_chunks()),
743 [buffer_data, &offsets, arr_col_chunked_array](
auto& range) {
744 for (
int chunk_idx = range.begin(); chunk_idx < range.end(); chunk_idx++) {
745 auto offset = offsets[chunk_idx];
746 T* chunk_buffer = buffer_data + offset;
748 auto decimalArray = std::static_pointer_cast<arrow::Decimal128Array>(
749 arr_col_chunked_array->chunk(chunk_idx));
// NOTE(review): this "whole column is null" test depends only on the chunked
// array, yet it is recomputed for every chunk; it could be hoisted out.
751 arr_col_chunked_array->null_count() == arr_col_chunked_array->length();
752 for (
int i = 0; i < decimalArray->length(); i++) {
753 if (empty || decimalArray->null_count() == decimalArray->length() ||
754 decimalArray->IsNull(i)) {
755 chunk_buffer[i] = inline_int_null_value<T>();
// NOTE(review): the unscaled decimal is cast to int64_t and stored into T;
// values outside T's range are not range-checked on any visible line.
757 arrow::Decimal128 val(decimalArray->GetValue(i));
759 static_cast<int64_t
>(val);
764 auto array = std::make_shared<ChunkType>(column_size, result_buffer);
765 return std::make_shared<arrow::ChunkedArray>(array);
773 const std::string&
type,
775 std::list<ColumnDescriptor>& cols)
override;
777 std::pair<int, int> table_key,
778 const std::string&
type,
780 const std::list<ColumnDescriptor>& cols,
781 Data_Namespace::AbstractBufferMgr* mgr)
override;
783 std::string
getType()
const override;
787 static std::map<std::string, std::shared_ptr<arrow::Table>>
tables;
791 std::map<std::string, std::shared_ptr<arrow::Table>>();
794 using namespace arrow;
830 type.set_comp_param(
sizeof(uint32_t) * 8);
833 case Type::DECIMAL: {
834 const auto& decimal_type =
static_cast<const arrow::DecimalType&
>(
type);
839 case Type::TIMESTAMP:
840 switch (static_cast<const arrow::TimestampType&>(type).unit()) {
841 case TimeUnit::SECOND:
843 case TimeUnit::MILLI:
845 case TimeUnit::MICRO:
851 throw std::runtime_error(type.ToString() +
" is not yet supported.");
856 const std::string&
name,
858 std::list<ColumnDescriptor>& cols) {
862 for (
auto&
field : table->schema()->fields()) {
871 std::pair<int, int> table_key,
872 const std::string& info,
874 const std::list<ColumnDescriptor>& cols,
875 Data_Namespace::AbstractBufferMgr* mgr) {
880 LOG(
INFO) <<
"CSV backed temporary tables has been activated. Create table `with "
881 "(storage_type='CSV:path/to/file.csv');`\n";
894 fsi->registerPersistentStorageInterface(std::make_unique<ArrowForeignStorage>());
902 const std::string&
type,
904 std::list<ColumnDescriptor>& cols)
override;
906 std::pair<int, int> table_key,
907 const std::string&
type,
909 const std::list<ColumnDescriptor>& cols,
910 Data_Namespace::AbstractBufferMgr* mgr)
override;
912 std::string
getType()
const override;
916 const std::string&
type,
918 std::list<ColumnDescriptor>& cols) {
925 using namespace arrow;
943 return arrow::boolean();
956 return time32(TimeUnit::SECOND);
959 return arrow::date64();
961 return arrow::date32();
966 return timestamp(TimeUnit::SECOND);
968 return timestamp(TimeUnit::MILLI);
970 return timestamp(TimeUnit::MICRO);
972 return timestamp(TimeUnit::NANO);
974 throw std::runtime_error(
"Unsupported timestamp precision for Arrow: " +
981 throw std::runtime_error(type.
get_type_name() +
" is not supported in Arrow.");
987 std::pair<int, int> table_key,
988 const std::string& info,
990 const std::list<ColumnDescriptor>& cols,
991 Data_Namespace::AbstractBufferMgr* mgr) {
994 bool isDataframe = df_td ?
true :
false;
995 std::unique_ptr<DataframeTableDescriptor> df_td_owned;
997 df_td_owned = std::make_unique<DataframeTableDescriptor>(td);
999 df_td = df_td_owned.get();
1002 #if defined(ENABLE_ARROW_4) || defined(_WIN32)
1003 auto io_context = arrow::io::default_io_context();
1005 auto io_context = arrow::default_memory_pool();
1007 auto arrow_parse_options = arrow::csv::ParseOptions::Defaults();
1008 arrow_parse_options.quoting =
false;
1009 arrow_parse_options.escaping =
false;
1010 arrow_parse_options.newlines_in_values =
false;
1011 arrow_parse_options.delimiter = *df_td->
delimiter.c_str();
1012 auto arrow_read_options = arrow::csv::ReadOptions::Defaults();
1013 arrow_read_options.use_threads =
true;
1015 arrow_read_options.block_size = 20 * 1024 * 1024;
1016 arrow_read_options.autogenerate_column_names =
false;
1017 arrow_read_options.skip_rows =
1020 auto arrow_convert_options = arrow::csv::ConvertOptions::Defaults();
1021 arrow_convert_options.check_utf8 =
false;
1022 arrow_convert_options.include_columns = arrow_read_options.column_names;
1023 arrow_convert_options.strings_can_be_null =
true;
1025 for (
auto& c : cols) {
1026 if (c.isSystemCol) {
1029 arrow_convert_options.column_types.emplace(c.columnName,
1031 arrow_read_options.column_names.push_back(c.columnName);
1034 std::shared_ptr<arrow::io::ReadableFile> inp;
1035 auto file_result = arrow::io::ReadableFile::Open(info.c_str());
1037 inp = file_result.ValueOrDie();
1039 auto table_reader_result = arrow::csv::TableReader::Make(
1040 io_context, inp, arrow_read_options, arrow_parse_options, arrow_convert_options);
1042 auto table_reader = table_reader_result.ValueOrDie();
1044 std::shared_ptr<arrow::Table> arrowTable;
1046 auto arrow_table_result = table_reader->Read();
1048 arrowTable = arrow_table_result.ValueOrDie();
1051 VLOG(1) <<
"Read Arrow CSV file " << info <<
" in " << time <<
"ms";
1053 arrow::Table& table = *arrowTable.get();
1058 LOG(
INFO) <<
"CSV backed temporary tables has been activated. Create table `with "
1059 "(storage_type='CSV:path/to/file.csv');`\n";
1064 fsi->registerPersistentStorageInterface(std::make_unique<ArrowCsvForeignStorage>());
std::shared_ptr< arrow::ChunkedArray > replaceNullValues(const SQLTypeInfo &columnType, std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
std::vector< int > ChunkKey
static SQLTypeInfo getOmnisciType(const arrow::DataType &type)
HOST DEVICE int get_size() const
void prepareTable(const int db_id, const std::string &type, TableDescriptor &td, std::list< ColumnDescriptor > &cols) override
#define ARROW_THROW_NOT_OK(s)
class for a per-database catalog. also includes metadata for the current database and the current use...
void convertBoolBitmapBufferWithoutNulls(int8_t *dst, const uint8_t *src, int64_t length)
static TimeT::rep execution(F func, Args &&...args)
void registerArrowForeignStorage(std::shared_ptr< ForeignStorageInterface > fsi)
void dropTable(const int db_id, const int table_id) override
HOST DEVICE int get_scale() const
std::string getType() const override
void read(const ChunkKey &chunk_key, const SQLTypeInfo &sql_type, int8_t *dest, const size_t numBytes) override
size_t first_chunk_offset
void setArrowTable(std::string name, std::shared_ptr< arrow::Table > table)
void append(const std::vector< ForeignStorageColumnBuffer > &column_buffers) override
void registerArrowCsvForeignStorage(std::shared_ptr< ForeignStorageInterface > fsi)
HOST DEVICE SQLTypes get_type() const
int8_t * tryZeroCopy(const ChunkKey &chunk_key, const SQLTypeInfo &sql_type, const size_t numBytes) override
std::shared_ptr< arrow::ChunkedArray > convertArrowDictionary(StringDictionary *dict, const ColumnDescriptor &c, std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
void releaseArrowTable(std::string name)
std::shared_ptr< StringDictionary > stringDict
static std::map< std::string, std::shared_ptr< arrow::Table > > tables
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
void prepareTable(const int db_id, const std::string &type, TableDescriptor &td, std::list< ColumnDescriptor > &cols) override
std::string getType() const override
DEVICE void fill(ARGS &&...args)
CONSTEXPR DEVICE bool is_null(const T &value)
std::map< std::array< int, 3 >, std::vector< ArrowFragment > > m_columns
DEVICE auto copy(ARGS &&...args)
void parseArrowTable(Catalog_Namespace::Catalog *catalog, std::pair< int, int > table_key, const std::string &type, const TableDescriptor &td, const std::list< ColumnDescriptor > &cols, Data_Namespace::AbstractBufferMgr *mgr, const arrow::Table &table)
std::vector< std::shared_ptr< arrow::ArrayData > > chunks
void getSizeAndOffset(const Frag &frag, const std::shared_ptr< arrow::Array > &chunk, size_t i, int &size, int &offset)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
specifies the content in-memory of a row in the column metadata table
OUTPUT transform(INPUT const &input, FUNC const &func)
static std::shared_ptr< arrow::DataType > getArrowImportType(const SQLTypeInfo type)
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
int get_precision() const
void convertBoolBitmapBufferWithNulls(int8_t *dst, const uint8_t *src, const uint8_t *bitmap, int64_t length, int8_t null_value)
std::string get_type_name() const
torch::Tensor f(torch::Tensor x, torch::Tensor W_target, torch::Tensor b_target)
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
std::shared_ptr< arrow::ChunkedArray > createDictionaryEncodedColumn(StringDictionary *dict, const ColumnDescriptor &c, std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
void registerTable(Catalog_Namespace::Catalog *catalog, std::pair< int, int > table_key, const std::string &type, const TableDescriptor &td, const std::list< ColumnDescriptor > &cols, Data_Namespace::AbstractBufferMgr *mgr) override
std::vector< Frag > calculateFragmentsOffsets(const arrow::ChunkedArray &array, size_t maxFragRows)
std::shared_ptr< arrow::ChunkedArray > replaceNullValuesImpl(std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
std::shared_ptr< arrow::ChunkedArray > createDecimalColumn(const ColumnDescriptor &c, std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
bool is_dict_encoded_string() const
int64_t makeFragment(const Frag &frag, ArrowFragment &arrowFrag, const std::vector< std::shared_ptr< arrow::Array >> &chunks, bool is_varlen)
constexpr auto is_datetime(SQLTypes type)
void registerTable(Catalog_Namespace::Catalog *catalog, std::pair< int, int > table_key, const std::string &type, const TableDescriptor &td, const std::list< ColumnDescriptor > &cols, Data_Namespace::AbstractBufferMgr *mgr) override
specifies the content in-memory of a row in the table metadata table