#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/column_scanner.h>
#include <parquet/exception.h>
#include <parquet/platform.h>
#include <parquet/statistics.h>
#include <parquet/types.h>

namespace foreign_storage {
bool within_range(int64_t lower_bound, int64_t upper_bound, int64_t value) {
  return value >= lower_bound && value <= upper_bound;
}

bool is_valid_parquet_string(const parquet::ColumnDescriptor* parquet_column) {
  return (parquet_column->logical_type()->is_none() &&
          parquet_column->physical_type() == parquet::Type::BYTE_ARRAY) ||
         parquet_column->logical_type()->is_string();
}
/**
 * Detect a valid list parquet column, assumed to use the standard
 * three-level Parquet list layout checked below.
 */
bool is_valid_parquet_list_column(const parquet::ColumnDescriptor* parquet_column) {
  const parquet::schema::Node* node = parquet_column->schema_node().get();
  if ((node->name() != "element" && node->name() != "item") ||
      !(node->is_required() || node->is_optional())) {
    return false;
  }
  node = node->parent();
  if (node->name() != "list" || !node->is_repeated() /* ... */) {
    return false;
  }
  node = node->parent();
  if (!node->logical_type()->is_list() ||
      !(node->is_optional() || node->is_required())) {
    return false;
  }
  node = node->parent();
  // ...
  return true;
}
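// Illustrative sketch (not part of the original source): the three-level
// Parquet list layout the schema walk above accepts, e.g. for an INT32
// element:
//
//   optional group my_col (LIST) {   // logical_type()->is_list(), optional/required
//     repeated group list {          // named "list", repeated
//       optional int32 element;      // named "element" or "item", optional/required
//     }
//   }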
template <typename V, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_decimal_encoder_with_omnisci_type(
    const ColumnDescriptor* column_descriptor,
    const parquet::ColumnDescriptor* parquet_column_descriptor,
    AbstractBuffer* buffer) {
  switch (parquet_column_descriptor->physical_type()) {
    case parquet::Type::INT32:
      return std::make_shared<ParquetDecimalEncoder<V, int32_t, NullType>>(
          buffer, column_descriptor, parquet_column_descriptor);
    case parquet::Type::INT64:
      return std::make_shared<ParquetDecimalEncoder<V, int64_t, NullType>>(
          buffer, column_descriptor, parquet_column_descriptor);
    case parquet::Type::FIXED_LEN_BYTE_ARRAY:
      return std::make_shared<
          ParquetDecimalEncoder<V, parquet::FixedLenByteArray, NullType>>(
          buffer, column_descriptor, parquet_column_descriptor);
    case parquet::Type::BYTE_ARRAY:
      return std::make_shared<ParquetDecimalEncoder<V, parquet::ByteArray, NullType>>(
          buffer, column_descriptor, parquet_column_descriptor);
    default:
      UNREACHABLE();
  }
  return {};
}
std::shared_ptr<ParquetEncoder> create_parquet_decimal_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  if (parquet_column->logical_type()->is_decimal()) {
    if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
      return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int64_t>(
          omnisci_column, parquet_column, buffer);
    }
    CHECK(omnisci_column->columnType.get_compression() == kENCODING_FIXED);
    if (is_metadata_scan_or_for_import) {
      switch (omnisci_column->columnType.get_comp_param()) {
        case 16:
          return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int16_t>(
              omnisci_column, parquet_column, buffer);
        case 32:
          return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        default:
          UNREACHABLE();
      }
    } else {
      switch (omnisci_column->columnType.get_comp_param()) {
        case 16:
          return create_parquet_decimal_encoder_with_omnisci_type<int16_t, int16_t>(
              omnisci_column, parquet_column, buffer);
        case 32:
          return create_parquet_decimal_encoder_with_omnisci_type<int32_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        default:
          UNREACHABLE();
      }
    }
  }
  return {};
}
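// Hedged usage sketch (assumed caller setup, not from the original source):
// for a HeavyDB DECIMAL(18, 2) column with no encoding backed by a Parquet
// INT64 decimal column, the factory above dispatches to
// ParquetDecimalEncoder<int64_t, int64_t, int64_t>:
//
//   auto encoder = create_parquet_decimal_encoder(
//       omnisci_column, parquet_column, buffer,
//       /*is_metadata_scan_or_for_import=*/false);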
/**
 * Create a signed or unsigned integral parquet encoder using types.
 */
template <typename V, typename T, typename U, typename NullType>
std::shared_ptr<ParquetEncoder>
create_parquet_signed_or_unsigned_integral_encoder_with_types(
    AbstractBuffer* buffer,
    const size_t omnisci_data_type_byte_size,
    const size_t parquet_data_type_byte_size,
    const bool is_signed) {
  CHECK(sizeof(NullType) == omnisci_data_type_byte_size);
  if (is_signed) {
    return std::make_shared<ParquetFixedLengthEncoder<V, T, NullType>>(
        buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size);
  } else {
    return std::make_shared<ParquetUnsignedFixedLengthEncoder<V, T, U, NullType>>(
        buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size);
  }
}
/**
 * Create an integral parquet encoder using types.
 */
template <typename V, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_integral_encoder_with_omnisci_type(
    AbstractBuffer* buffer,
    const size_t omnisci_data_type_byte_size,
    const size_t parquet_data_type_byte_size,
    const int bit_width,
    const bool is_signed) {
  switch (bit_width) {
    case 8:
      return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
                                                                           int32_t,
                                                                           uint8_t,
                                                                           NullType>(
          buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
    case 16:
      return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
                                                                           int32_t,
                                                                           uint16_t,
                                                                           NullType>(
          buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
    case 32:
      return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
                                                                           int32_t,
                                                                           uint32_t,
                                                                           NullType>(
          buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
    case 64:
      return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
                                                                           int64_t,
                                                                           uint64_t,
                                                                           NullType>(
          buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
    default:
      UNREACHABLE();
  }
  return {};
}
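// Note on the dispatch above: the third template argument forwarded to
// ParquetUnsignedFixedLengthEncoder is the unsigned intermediate type
// (uint8_t through uint64_t) matching the annotated bit-width, so unsigned
// Parquet values can be widened without sign corruption. A hedged example
// (assumed mapping, not from the original source): an unsigned 16-bit
// Parquet column read into a HeavyDB INT would go through
// create_parquet_integral_encoder_with_omnisci_type<int32_t, int32_t>
// with bit_width = 16 and is_signed = false.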
std::shared_ptr<ParquetEncoder> create_parquet_integral_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  auto physical_type = parquet_column->physical_type();

  int bit_width = -1;
  int is_signed = false;
  // handle the integral case with no Parquet annotation
  if (parquet_column->logical_type()->is_none() && column_type.is_integer()) {
    if (physical_type == parquet::Type::INT32) {
      bit_width = 32;
    } else if (physical_type == parquet::Type::INT64) {
      bit_width = 64;
    } else {
      UNREACHABLE();
    }
    is_signed = true;
  }
  // handle the integral case with a Parquet integer annotation
  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
          parquet_column->logical_type().get())) {
    bit_width = int_logical_column->bit_width();
    is_signed = int_logical_column->is_signed();
  }

  if (bit_width == -1) {  // no candidate Parquet integral type was identified
    return {};
  }

  const size_t omnisci_data_type_byte_size = column_type.get_size();
  const size_t parquet_data_type_byte_size = parquet::GetTypeByteSize(physical_type);

  switch (omnisci_data_type_byte_size) {
    case 8:
      CHECK(column_type.get_compression() == kENCODING_NONE);
      return create_parquet_integral_encoder_with_omnisci_type<int64_t, int64_t>(
          buffer,
          omnisci_data_type_byte_size,
          parquet_data_type_byte_size,
          bit_width,
          is_signed);
    case 4:
      if (is_metadata_scan_or_for_import && column_type.get_type() == kBIGINT) {
        return create_parquet_integral_encoder_with_omnisci_type<int64_t, int32_t>(
            buffer,
            omnisci_data_type_byte_size,
            parquet_data_type_byte_size,
            bit_width,
            is_signed);
      }
      return create_parquet_integral_encoder_with_omnisci_type<int32_t, int32_t>(
          buffer,
          omnisci_data_type_byte_size,
          parquet_data_type_byte_size,
          bit_width,
          is_signed);
    case 2:
      if (is_metadata_scan_or_for_import) {
        switch (column_type.get_type()) {
          case kBIGINT:
            return create_parquet_integral_encoder_with_omnisci_type<int64_t, int16_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kINT:
            return create_parquet_integral_encoder_with_omnisci_type<int32_t, int16_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kSMALLINT:
            break;
          default:
            UNREACHABLE();
        }
      }
      return create_parquet_integral_encoder_with_omnisci_type<int16_t, int16_t>(
          buffer,
          omnisci_data_type_byte_size,
          parquet_data_type_byte_size,
          bit_width,
          is_signed);
    case 1:
      if (is_metadata_scan_or_for_import) {
        switch (column_type.get_type()) {
          case kBIGINT:
            return create_parquet_integral_encoder_with_omnisci_type<int64_t, int8_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kINT:
            return create_parquet_integral_encoder_with_omnisci_type<int32_t, int8_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kSMALLINT:
            return create_parquet_integral_encoder_with_omnisci_type<int16_t, int8_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kTINYINT:
            break;
          default:
            UNREACHABLE();
        }
      }
      return create_parquet_integral_encoder_with_omnisci_type<int8_t, int8_t>(
          buffer,
          omnisci_data_type_byte_size,
          parquet_data_type_byte_size,
          bit_width,
          is_signed);
    default:
      UNREACHABLE();
  }
  return {};
}
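// Hedged sketch of the metadata-scan/import widening above (assumed example,
// not from the original source): a BIGINT column stored with FIXED(32)
// encoding has a 4-byte physical size, but during a metadata scan or import
// decoded values are materialized in the 8-byte logical type. Hence the
// <int64_t, int32_t> instantiation: V = int64_t (in-memory value),
// NullType = int32_t (null sentinel matching the stored size).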
std::shared_ptr<ParquetEncoder> create_parquet_floating_point_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer) {
  auto column_type = omnisci_column->columnType;
  if (!column_type.is_fp()) {
    return {};
  }
  CHECK(column_type.get_compression() == kENCODING_NONE);
  switch (column_type.get_type()) {
    case kFLOAT:
      switch (parquet_column->physical_type()) {
        case parquet::Type::FLOAT:
          return std::make_shared<ParquetFixedLengthEncoder<float, float>>(
              buffer, omnisci_column, parquet_column);
        case parquet::Type::DOUBLE:
          return std::make_shared<ParquetFixedLengthEncoder<float, double>>(
              buffer, omnisci_column, parquet_column);
        default:
          UNREACHABLE();
      }
    case kDOUBLE:
      CHECK(parquet_column->physical_type() == parquet::Type::DOUBLE);
      return std::make_shared<ParquetFixedLengthEncoder<double, double>>(
          buffer, omnisci_column, parquet_column);
    default:
      UNREACHABLE();
  }
  return {};
}
std::shared_ptr<ParquetEncoder> create_parquet_none_type_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer) {
  auto column_type = omnisci_column->columnType;
  if (parquet_column->logical_type()->is_none() &&
      !omnisci_column->columnType.is_string()) {  // boolean
    switch (column_type.get_type()) {
      case kBOOLEAN:
        return std::make_shared<ParquetFixedLengthEncoder<int8_t, bool>>(
            buffer, omnisci_column, parquet_column);
      default:
        UNREACHABLE();
    }
  }
  return {};
}
template <typename V, typename T, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_timestamp_encoder_with_types(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer) {
  if (auto timestamp_logical_type = dynamic_cast<const parquet::TimestampLogicalType*>(
          parquet_column->logical_type().get())) {
    switch (timestamp_logical_type->time_unit()) {
      case parquet::LogicalType::TimeUnit::MILLIS:
        return std::make_shared<ParquetTimestampEncoder<V, T, 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::MICROS:
        return std::make_shared<ParquetTimestampEncoder<V, T, 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::NANOS:
        return std::make_shared<
            ParquetTimestampEncoder<V, T, 1000L * 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      default:
        UNREACHABLE();
    }
  } else {
    UNREACHABLE();
  }
  return {};
}
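// The conversion constants above are the tick counts per second for each
// Parquet time unit: MILLIS -> 1000L, MICROS -> 1000L * 1000L, NANOS ->
// 1000L * 1000L * 1000L. A hedged example (assumed mapping, not from the
// original source): for a HeavyDB TIMESTAMP(0) column fed by a
// MICROS-annotated INT64 column, ParquetTimestampEncoder<V, T,
// 1000L * 1000L, NullType> divides each raw value by 1,000,000 to produce
// seconds since the epoch.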
template <typename V, typename T, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_date_from_timestamp_encoder_with_types(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  if (auto timestamp_logical_type = dynamic_cast<const parquet::TimestampLogicalType*>(
          parquet_column->logical_type().get())) {
    switch (timestamp_logical_type->time_unit()) {
      case parquet::LogicalType::TimeUnit::MILLIS:
        if (is_metadata_scan_or_for_import) {
          return std::make_shared<
              ParquetDateInSecondsFromTimestampEncoder<V, T, 1000L, NullType>>(
              buffer, omnisci_column, parquet_column);
        }
        return std::make_shared<
            ParquetDateInDaysFromTimestampEncoder<V, T, 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::MICROS:
        if (is_metadata_scan_or_for_import) {
          return std::make_shared<
              ParquetDateInSecondsFromTimestampEncoder<V, T, 1000L * 1000L, NullType>>(
              buffer, omnisci_column, parquet_column);
        }
        return std::make_shared<
            ParquetDateInDaysFromTimestampEncoder<V, T, 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::NANOS:
        if (is_metadata_scan_or_for_import) {
          return std::make_shared<
              ParquetDateInSecondsFromTimestampEncoder<V,
                                                       T,
                                                       1000L * 1000L * 1000L,
                                                       NullType>>(
              buffer, omnisci_column, parquet_column);
        }
        return std::make_shared<
            ParquetDateInDaysFromTimestampEncoder<V, T, 1000L * 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      default:
        UNREACHABLE();
    }
  } else {
    UNREACHABLE();
  }
  return {};
}
std::shared_ptr<ParquetEncoder> create_parquet_timestamp_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  if (parquet_column->logical_type()->is_timestamp()) {
    if (column_type.get_compression() == kENCODING_NONE) {
      auto precision = column_type.get_precision();
      if (precision == 0) {
        return create_parquet_timestamp_encoder_with_types<int64_t, int64_t, int64_t>(
            omnisci_column, parquet_column, buffer);
      } else {
        return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int64_t>>(
            buffer, omnisci_column, parquet_column);
      }
    } else {
      CHECK(column_type.get_compression() == kENCODING_FIXED);
      CHECK(column_type.get_comp_param() == 32);
      if (is_metadata_scan_or_for_import) {
        return create_parquet_timestamp_encoder_with_types<int64_t, int64_t, int32_t>(
            omnisci_column, parquet_column, buffer);
      } else {
        return create_parquet_timestamp_encoder_with_types<int32_t, int64_t, int32_t>(
            omnisci_column, parquet_column, buffer);
      }
    }
  } else if (parquet_column->logical_type()->is_none() && column_type.is_timestamp()) {
    if (parquet_column->physical_type() == parquet::Type::INT32) {
      CHECK(column_type.get_compression() == kENCODING_FIXED &&
            column_type.get_comp_param() == 32);
      if (is_metadata_scan_or_for_import) {
        return std::make_shared<ParquetFixedLengthEncoder<int64_t, int32_t, int32_t>>(
            buffer, omnisci_column, parquet_column);
      } else {
        return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t, int32_t>>(
            buffer, omnisci_column, parquet_column);
      }
    } else if (parquet_column->physical_type() == parquet::Type::INT64) {
      if (column_type.get_compression() == kENCODING_NONE) {
        return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int64_t>>(
            buffer, omnisci_column, parquet_column);
      } else {
        CHECK(column_type.get_compression() == kENCODING_FIXED);
        CHECK(column_type.get_comp_param() == 32);
        if (is_metadata_scan_or_for_import) {
          return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        } else {
          return std::make_shared<ParquetFixedLengthEncoder<int32_t, int64_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        }
      }
    }
  }
  return {};
}
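// Hedged usage sketch (assumed setup, not from the original source): a
// TIMESTAMP(0) column with FIXED(32) encoding during a regular data load
// takes the <int32_t, int64_t, int32_t> branch above, i.e. 64-bit Parquet
// values are converted to seconds and narrowed to 32-bit storage:
//
//   auto encoder = create_parquet_timestamp_encoder(
//       omnisci_column, parquet_column, buffer,
//       /*is_metadata_scan_or_for_import=*/false);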
template <typename V, typename T, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_time_encoder_with_types(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer) {
  if (auto time_logical_type = dynamic_cast<const parquet::TimeLogicalType*>(
          parquet_column->logical_type().get())) {
    switch (time_logical_type->time_unit()) {
      case parquet::LogicalType::TimeUnit::MILLIS:
        return std::make_shared<ParquetTimeEncoder<V, T, 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::MICROS:
        return std::make_shared<ParquetTimeEncoder<V, T, 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::NANOS:
        return std::make_shared<
            ParquetTimeEncoder<V, T, 1000L * 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      default:
        UNREACHABLE();
    }
  } else {
    UNREACHABLE();
  }
  return {};
}
std::shared_ptr<ParquetEncoder> create_parquet_time_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  if (auto time_logical_column = dynamic_cast<const parquet::TimeLogicalType*>(
          parquet_column->logical_type().get())) {
    if (column_type.get_compression() == kENCODING_NONE) {
      if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
        return create_parquet_time_encoder_with_types<int64_t, int32_t, int64_t>(
            omnisci_column, parquet_column, buffer);
      } else {
        return create_parquet_time_encoder_with_types<int64_t, int64_t, int64_t>(
            omnisci_column, parquet_column, buffer);
      }
    } else if (column_type.get_compression() == kENCODING_FIXED) {
      if (is_metadata_scan_or_for_import) {
        if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
          CHECK(parquet_column->physical_type() == parquet::Type::INT32);
          return create_parquet_time_encoder_with_types<int64_t, int32_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        } else {
          CHECK(time_logical_column->time_unit() ==
                    parquet::LogicalType::TimeUnit::MICROS ||
                time_logical_column->time_unit() ==
                    parquet::LogicalType::TimeUnit::NANOS);
          CHECK(parquet_column->physical_type() == parquet::Type::INT64);
          return create_parquet_time_encoder_with_types<int64_t, int64_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        }
      } else {
        if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
          CHECK(parquet_column->physical_type() == parquet::Type::INT32);
          return create_parquet_time_encoder_with_types<int32_t, int32_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        } else {
          CHECK(time_logical_column->time_unit() ==
                    parquet::LogicalType::TimeUnit::MICROS ||
                time_logical_column->time_unit() ==
                    parquet::LogicalType::TimeUnit::NANOS);
          CHECK(parquet_column->physical_type() == parquet::Type::INT64);
          return create_parquet_time_encoder_with_types<int32_t, int64_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        }
      }
    }
  }
  return {};
}
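// As with timestamps, the TIME dispatch above pairs the Parquet physical type
// with the annotated unit: MILLIS rides on INT32 and MICROS/NANOS on INT64
// (enforced by the CHECKs), and the ParquetTimeEncoder conversion constant
// reduces each raw value to whole seconds.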
std::shared_ptr<ParquetEncoder> create_parquet_date_from_timestamp_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  if (parquet_column->logical_type()->is_timestamp() && column_type.is_date()) {
    CHECK(column_type.get_compression() == kENCODING_DATE_IN_DAYS);
    if (is_metadata_scan_or_for_import) {
      if (column_type.get_comp_param() ==
          0) {  // DATE ENCODING DAYS (32) uses comp param 0
        return create_parquet_date_from_timestamp_encoder_with_types<int64_t,
                                                                     int64_t,
                                                                     int32_t>(
            omnisci_column, parquet_column, buffer, true);
      } else if (column_type.get_comp_param() == 16) {
        return create_parquet_date_from_timestamp_encoder_with_types<int64_t,
                                                                     int64_t,
                                                                     int16_t>(
            omnisci_column, parquet_column, buffer, true);
      } else {
        UNREACHABLE();
      }
    } else {
      if (column_type.get_comp_param() ==
          0) {  // DATE ENCODING DAYS (32) uses comp param 0
        return create_parquet_date_from_timestamp_encoder_with_types<int32_t,
                                                                     int64_t,
                                                                     int32_t>(
            omnisci_column, parquet_column, buffer, false);
      } else if (column_type.get_comp_param() == 16) {
        return create_parquet_date_from_timestamp_encoder_with_types<int16_t,
                                                                     int64_t,
                                                                     int16_t>(
            omnisci_column, parquet_column, buffer, false);
      } else {
        UNREACHABLE();
      }
    }
  }
  return {};
}
std::shared_ptr<ParquetEncoder> create_parquet_date_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  if (parquet_column->logical_type()->is_date() && column_type.is_date()) {
    if (column_type.get_compression() == kENCODING_DATE_IN_DAYS) {
      if (is_metadata_scan_or_for_import) {
        if (column_type.get_comp_param() ==
            0) {  // DATE ENCODING DAYS (32) uses comp param 0
          return std::make_shared<ParquetDateInSecondsEncoder</*NullType=*/int32_t>>(
              buffer, omnisci_column, parquet_column);
        } else if (column_type.get_comp_param() == 16) {
          return std::make_shared<ParquetDateInSecondsEncoder</*NullType=*/int16_t>>(
              buffer, omnisci_column, parquet_column);
        } else {
          UNREACHABLE();
        }
      } else {
        if (column_type.get_comp_param() ==
            0) {  // DATE ENCODING DAYS (32) uses comp param 0
          return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        } else if (column_type.get_comp_param() == 16) {
          return std::make_shared<ParquetFixedLengthEncoder<int16_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        } else {
          UNREACHABLE();
        }
      }
    } else if (column_type.get_compression() == kENCODING_NONE) {
      return std::make_shared<ParquetDateInSecondsEncoder</*NullType=*/int64_t>>(
          buffer, omnisci_column, parquet_column);
    } else {
      UNREACHABLE();
    }
  }
  return {};
}
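// Note (hedged, inferred from the encoder names above rather than spelled
// out in the original source): regular loads of DATE ENCODING DAYS columns
// pass the Parquet day counts through fixed-length encoders, while metadata
// scans and imports use the date-in-seconds encoders so intermediate values
// stay comparable with other epoch-based statistics.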
std::shared_ptr<ParquetEncoder> create_parquet_string_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    const Chunk_NS::Chunk& chunk,
    StringDictionary* string_dictionary,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    const bool is_for_import,
    const bool is_for_detect) {
  auto column_type = omnisci_column->columnType;
  if (!is_valid_parquet_string(parquet_column) || !column_type.is_string()) {
    return {};
  }
  if (column_type.get_compression() == kENCODING_NONE) {
    if (is_for_import) {
      return std::make_shared<ParquetStringImportEncoder>(chunk.getBuffer());
    } else {
      return std::make_shared<ParquetStringNoneEncoder>(chunk.getBuffer(),
                                                        chunk.getIndexBuf());
    }
  } else if (column_type.get_compression() == kENCODING_DICT) {
    if (!is_for_detect) {  // non-detect use case
      chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
      std::unique_ptr<ChunkMetadata>& logical_chunk_metadata = chunk_metadata.back();
      logical_chunk_metadata->sqlType = omnisci_column->columnType;
      switch (column_type.get_size()) {
        case 1:
          return std::make_shared<ParquetStringEncoder<uint8_t>>(
              chunk.getBuffer(),
              string_dictionary,
              is_for_import ? nullptr : logical_chunk_metadata.get());
        case 2:
          return std::make_shared<ParquetStringEncoder<uint16_t>>(
              chunk.getBuffer(),
              string_dictionary,
              is_for_import ? nullptr : logical_chunk_metadata.get());
        case 4:
          return std::make_shared<ParquetStringEncoder<int32_t>>(
              chunk.getBuffer(),
              string_dictionary,
              is_for_import ? nullptr : logical_chunk_metadata.get());
        default:
          UNREACHABLE();
      }
    } else {  // detect use-case
      return std::make_shared<ParquetDetectStringEncoder>(chunk.getBuffer());
    }
  } else {
    UNREACHABLE();
  }
  return {};
}
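// Hedged usage sketch (assumed setup, not from the original source): a
// dictionary-encoded TEXT column of size 4 takes the
// ParquetStringEncoder<int32_t> branch above; the encoder writes dictionary
// ids into the chunk buffer and, outside of import, records string stats
// into the trailing logical_chunk_metadata entry appended to chunk_metadata.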
std::shared_ptr<ParquetEncoder> create_parquet_geospatial_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    std::list<Chunk_NS::Chunk>& chunks,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    const bool is_metadata_scan,
    const bool is_for_import,
    const bool geo_validate_geometry) {
  auto column_type = omnisci_column->columnType;
  if (!is_valid_parquet_string(parquet_column) || !column_type.is_geometry()) {
    return {};
  }
  if (is_for_import) {
    return std::make_shared<ParquetGeospatialImportEncoder>(chunks,
                                                            geo_validate_geometry);
  }
  if (is_metadata_scan) {
    return std::make_shared<ParquetGeospatialEncoder>(geo_validate_geometry);
  }
  for (auto chunks_iter = chunks.begin(); chunks_iter != chunks.end(); ++chunks_iter) {
    chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
    auto& chunk_metadata_ptr = chunk_metadata.back();
    chunk_metadata_ptr->sqlType = chunks_iter->getColumnDesc()->columnType;
  }
  return std::make_shared<ParquetGeospatialEncoder>(
      parquet_column, chunks, chunk_metadata, geo_validate_geometry);
}
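// Note (hedged, inferred from the loop above): a geo column expands into
// several physical chunks (coords, bounds, etc.), so one ChunkMetadata entry
// is appended per chunk in `chunks` before the single ParquetGeospatialEncoder
// is created over the whole list.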
std::shared_ptr<ParquetEncoder> create_parquet_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    std::list<Chunk_NS::Chunk>& chunks,
    StringDictionary* string_dictionary,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    const bool is_metadata_scan,
    const bool is_for_import,
    const bool is_for_detect,
    const bool geo_validate_geometry);
/**
 * Create a Parquet specific encoder for a Parquet to OmniSci mapping.
 */
std::shared_ptr<ParquetEncoder> create_parquet_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    std::list<Chunk_NS::Chunk>& chunks,
    StringDictionary* string_dictionary,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    const bool is_metadata_scan,
    const bool is_for_import,
    const bool is_for_detect,
    const bool geo_validate_geometry) {
  CHECK(!(is_metadata_scan && is_for_import));
  auto buffer = chunks.empty() ? nullptr : chunks.begin()->getBuffer();
  if (auto encoder = create_parquet_geospatial_encoder(omnisci_column,
                                                       parquet_column,
                                                       chunks,
                                                       chunk_metadata,
                                                       is_metadata_scan,
                                                       is_for_import,
                                                       geo_validate_geometry)) {
    return encoder;
  }
  if (auto encoder = create_parquet_array_encoder(omnisci_column,
                                                  parquet_column,
                                                  chunks,
                                                  string_dictionary,
                                                  chunk_metadata,
                                                  is_metadata_scan,
                                                  is_for_import,
                                                  is_for_detect,
                                                  geo_validate_geometry)) {
    return encoder;
  }
  if (auto encoder = create_parquet_decimal_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_integral_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder =
          create_parquet_floating_point_encoder(omnisci_column, parquet_column, buffer)) {
    return encoder;
  }
  if (auto encoder = create_parquet_timestamp_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder =
          create_parquet_none_type_encoder(omnisci_column, parquet_column, buffer)) {
    return encoder;
  }
  if (auto encoder = create_parquet_time_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_date_from_timestamp_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_date_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_string_encoder(
          omnisci_column,
          parquet_column,
          chunks.empty() ? Chunk_NS::Chunk{} : *chunks.begin(),
          string_dictionary,
          chunk_metadata,
          is_for_import,
          is_for_detect)) {
    return encoder;
  }
  UNREACHABLE();
  return {};
}
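// Hedged sketch of the factory chain above (example assumed, not from the
// original source): each create_parquet_*_encoder returns a null shared_ptr
// when the mapping is not its concern, so for a DECIMAL column backed by a
// Parquet decimal logical type the geospatial and array factories fall
// through and create_parquet_decimal_encoder is the first to return non-null.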
std::shared_ptr<ParquetEncoder> create_parquet_encoder_for_import(
    std::list<Chunk_NS::Chunk>& chunks,
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    StringDictionary* string_dictionary,
    const bool geo_validate_geometry) {
  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
  return create_parquet_encoder(omnisci_column,
                                parquet_column,
                                chunks,
                                string_dictionary,
                                chunk_metadata,
                                false,
                                true,
                                false,
                                geo_validate_geometry);
}

std::shared_ptr<ParquetEncoder> create_parquet_encoder_for_metadata_scan(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    const bool geo_validate_geometry) {
  std::list<Chunk_NS::Chunk> chunks;
  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
  return create_parquet_encoder(omnisci_column,
                                parquet_column,
                                chunks,
                                nullptr,
                                chunk_metadata,
                                true,
                                false,
                                false,
                                geo_validate_geometry);
}
std::shared_ptr<ParquetEncoder> create_parquet_array_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    std::list<Chunk_NS::Chunk>& chunks,
    StringDictionary* string_dictionary,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    const bool is_metadata_scan,
    const bool is_for_import,
    const bool is_for_detect,
    const bool geo_validate_geometry) {
  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column);
  if (!is_valid_parquet_list || !omnisci_column->columnType.is_array()) {
    return {};
  }
  std::unique_ptr<ColumnDescriptor> omnisci_column_sub_type_column =
      get_sub_type_column_descriptor(omnisci_column);
  auto encoder = create_parquet_encoder(omnisci_column_sub_type_column.get(),
                                        parquet_column,
                                        chunks,
                                        string_dictionary,
                                        chunk_metadata,
                                        is_metadata_scan,
                                        is_for_import,
                                        is_for_detect,
                                        geo_validate_geometry);
  CHECK(encoder.get());
  auto scalar_encoder = std::dynamic_pointer_cast<ParquetScalarEncoder>(encoder);
  CHECK(scalar_encoder);
  if (!is_for_import) {
    if (!is_for_detect) {
      if (omnisci_column->columnType.is_fixlen_array()) {
        encoder = std::make_shared<ParquetFixedLengthArrayEncoder>(
            is_metadata_scan ? nullptr : chunks.begin()->getBuffer(),
            scalar_encoder,
            omnisci_column);
      } else {
        encoder = std::make_shared<ParquetVariableLengthArrayEncoder>(
            is_metadata_scan ? nullptr : chunks.begin()->getBuffer(),
            is_metadata_scan ? nullptr : chunks.begin()->getIndexBuf(),
            scalar_encoder,
            omnisci_column);
      }
    } else {  // is_for_detect
      encoder = std::make_shared<ParquetArrayDetectEncoder>(
          chunks.begin()->getBuffer(), scalar_encoder, omnisci_column);
    }
  } else {  // is_for_import
    encoder = std::make_shared<ParquetArrayImportEncoder>(
        chunks.begin()->getBuffer(), scalar_encoder, omnisci_column);
  }
  return encoder;
}
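// Note (hedged, following the code above): array encoders are wrappers. The
// scalar encoder created for the element sub-type does the value conversion,
// while the wrapper consumes repetition/definition levels to delimit arrays
// (fixed-length vs. variable-length, or the import/detect variants).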
void validate_list_column_metadata_statistics(
    const parquet::ParquetFileReader* reader,
    const int row_group_index,
    const int column_index,
    const int16_t* def_levels,
    const int64_t num_levels,
    const parquet::ColumnDescriptor* parquet_column_descriptor) {
  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column_descriptor);
  if (!is_valid_parquet_list) {
    return;
  }
  std::unique_ptr<parquet::RowGroupMetaData> group_metadata =
      reader->metadata()->RowGroup(row_group_index);
  auto column_metadata = group_metadata->ColumnChunk(column_index);
  // a list column chunk with no rows is trivially valid
  if (group_metadata->num_rows() == 0) {
    return;
  }
  auto stats = validate_and_get_column_metadata_statistics(column_metadata.get());
  if (!stats->HasMinMax()) {
    auto find_it = std::find_if(def_levels,
                                def_levels + num_levels,
                                [](const int16_t def_level) { return def_level == 3; });
    if (find_it != def_levels + num_levels) {
      throw std::runtime_error(
          "No minimum and maximum statistic set in list column but non-null & non-empty "
          "array/value detected.");
    }
  }
}
void set_definition_levels_for_zero_max_definition_level_case(
    const parquet::ColumnDescriptor* parquet_column_descriptor,
    std::vector<int16_t>& def_levels) {
  if (parquet_column_descriptor->max_definition_level() == 0) {
    if (!parquet_column_descriptor->schema_node()->is_required()) {
      throw std::runtime_error(
          "Unsupported parquet column detected. Column '" +
          parquet_column_descriptor->path()->ToDotString() +
          "' detected to have max definition level of 0 but is optional.");
    }
    def_levels.assign(def_levels.size(), 1);
  }
}
void validate_max_repetition_and_definition_level(
    const ColumnDescriptor* omnisci_column_descriptor,
    const parquet::ColumnDescriptor* parquet_column_descriptor) {
  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column_descriptor);
  if (is_valid_parquet_list && !omnisci_column_descriptor->columnType.is_array()) {
    throw std::runtime_error(
        "Unsupported mapping detected. Column '" +
        parquet_column_descriptor->path()->ToDotString() +
        "' detected to be a parquet list but HeavyDB mapped column '" +
        omnisci_column_descriptor->columnName + "' is not an array.");
  }
  if (is_valid_parquet_list) {
    if (parquet_column_descriptor->max_repetition_level() != 1 ||
        parquet_column_descriptor->max_definition_level() != 3) {
      throw std::runtime_error(
          "Incorrect schema max repetition level detected in column '" +
          parquet_column_descriptor->path()->ToDotString() +
          "'. Expected a max repetition level of 1 and max definition level of 3 for "
          "list column but column has a max "
          "repetition level of " +
          std::to_string(parquet_column_descriptor->max_repetition_level()) +
          " and a max definition level of " +
          std::to_string(parquet_column_descriptor->max_definition_level()) + ".");
    }
  } else {
    if (parquet_column_descriptor->max_repetition_level() != 0 ||
        !(parquet_column_descriptor->max_definition_level() == 1 ||
          parquet_column_descriptor->max_definition_level() == 0)) {
      throw std::runtime_error(
          "Incorrect schema max repetition level detected in column '" +
          parquet_column_descriptor->path()->ToDotString() +
          "'. Expected a max repetition level of 0 and max definition level of 1 or 0 "
          "for flat column but column has a max "
          "repetition level of " +
          std::to_string(parquet_column_descriptor->max_repetition_level()) +
          " and a max definition level of " +
          std::to_string(parquet_column_descriptor->max_definition_level()) + ".");
    }
  }
}
void resize_values_buffer(const ColumnDescriptor* omnisci_column,
                          const parquet::ColumnDescriptor* parquet_column,
                          std::vector<int8_t>& values) {
  auto max_type_byte_size =
      std::max(omnisci_column->columnType.get_size(),
               parquet::GetTypeByteSize(parquet_column->physical_type()));
  size_t values_size =
      LazyParquetChunkLoader::batch_reader_num_elements * max_type_byte_size;
  values.resize(values_size);
}
bool validate_decimal_mapping(const ColumnDescriptor* omnisci_column,
                              const parquet::ColumnDescriptor* parquet_column) {
  if (auto decimal_logical_column = dynamic_cast<const parquet::DecimalLogicalType*>(
          parquet_column->logical_type().get())) {
    return omnisci_column->columnType.get_precision() ==
               decimal_logical_column->precision() &&
           omnisci_column->columnType.get_scale() == decimal_logical_column->scale();
  }
  return false;
}

SQLTypeInfo suggest_decimal_mapping(const parquet::ColumnDescriptor* parquet_column) {
  if (auto decimal_logical_column = dynamic_cast<const parquet::DecimalLogicalType*>(
          parquet_column->logical_type().get())) {
    auto parquet_precision = decimal_logical_column->precision();
    auto parquet_scale = decimal_logical_column->scale();
    if (parquet_precision > sql_constants::kMaxNumericPrecision) {
      throw ForeignStorageException(
          "Parquet column \"" + parquet_column->ToString() +
          "\" has decimal precision of " + std::to_string(parquet_precision) +
          " which is too high to import, maximum precision supported is " +
          std::to_string(sql_constants::kMaxNumericPrecision) + ".");
    }
    SQLTypeInfo type;
    type.set_type(kDECIMAL);
    type.set_compression(kENCODING_NONE);
    type.set_precision(parquet_precision);
    type.set_scale(parquet_scale);
    type.set_fixed_size();
    return type;
  }
  UNREACHABLE()
      << " a Parquet column's decimal logical type failed to be read appropriately";
  return {};
}
bool validate_floating_point_mapping(const ColumnDescriptor* omnisci_column,
                                     const parquet::ColumnDescriptor* parquet_column) {
  if (!omnisci_column->columnType.is_fp()) {
    return false;
  }
  // check if mapping is a valid coerced or non-coerced floating point mapping
  // with no annotation (floating point columns have no annotation in the
  // Parquet specification)
  if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
    return (parquet_column->physical_type() == parquet::Type::DOUBLE) ||
           (parquet_column->physical_type() == parquet::Type::FLOAT &&
            omnisci_column->columnType.get_type() == kFLOAT);
  }
  return false;
}

SQLTypeInfo suggest_floating_point_mapping(
    const parquet::ColumnDescriptor* parquet_column) {
  SQLTypeInfo type;
  if (parquet_column->physical_type() == parquet::Type::FLOAT) {
    type.set_type(kFLOAT);
  } else if (parquet_column->physical_type() == parquet::Type::DOUBLE) {
    type.set_type(kDOUBLE);
  } else {
    UNREACHABLE();
  }
  type.set_compression(kENCODING_NONE);
  type.set_fixed_size();
  return type;
}
bool validate_integral_mapping(const ColumnDescriptor* omnisci_column,
                               const parquet::ColumnDescriptor* parquet_column) {
  if (!omnisci_column->columnType.is_integer()) {
    return false;
  }
  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
          parquet_column->logical_type().get())) {
    CHECK(omnisci_column->columnType.get_compression() == kENCODING_NONE ||
          omnisci_column->columnType.get_compression() == kENCODING_FIXED);
    const int bits_per_byte = 8;
    // unsigned types are permitted to map to a wider integral type in order
    // to avoid precision loss
    const int bit_widening_factor = int_logical_column->is_signed() ? 1 : 2;
    return omnisci_column->columnType.get_size() * bits_per_byte <=
           int_logical_column->bit_width() * bit_widening_factor;
  }
  // check if mapping is a valid coerced or non-coerced integral mapping with
  // no annotation
  if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
    return (parquet_column->physical_type() == parquet::Type::INT64) ||
           (parquet_column->physical_type() == parquet::Type::INT32 &&
            omnisci_column->columnType.get_size() <= 4);
  }
  return false;
}
SQLTypeInfo suggest_integral_mapping(const parquet::ColumnDescriptor* parquet_column) {
  SQLTypeInfo type;
  type.set_compression(kENCODING_NONE);
  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
          parquet_column->logical_type().get())) {
    auto bit_width = int_logical_column->bit_width();
    if (!int_logical_column->is_signed()) {
      if (within_range(33, 64, bit_width)) {
        throw ForeignStorageException(
            "Unsigned integer column \"" + parquet_column->path()->ToDotString() +
            "\" in Parquet file with 64 bit-width has no supported type for ingestion "
            "that will not result in data loss");
      }
      // ... (remaining unsigned bit-widths map to the next wider HeavyDB type)
    } else {
      // ... (signed bit-widths map to the same-size HeavyDB type)
    }
    type.set_fixed_size();
    return type;
  }
  CHECK(parquet_column->logical_type()->is_none());
  if (parquet_column->physical_type() == parquet::Type::INT32) {
    type.set_type(kINT);
  } else {
    CHECK(parquet_column->physical_type() == parquet::Type::INT64);
    type.set_type(kBIGINT);
  }
  type.set_fixed_size();
  return type;
}
bool is_nanosecond_precision(const ColumnDescriptor* omnisci_column) {
  return omnisci_column->columnType.get_dimension() == 9;
}

bool is_nanosecond_precision(
    const parquet::TimestampLogicalType* timestamp_logical_column) {
  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::NANOS;
}

bool is_microsecond_precision(const ColumnDescriptor* omnisci_column) {
  return omnisci_column->columnType.get_dimension() == 6;
}

bool is_microsecond_precision(
    const parquet::TimestampLogicalType* timestamp_logical_column) {
  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MICROS;
}

bool is_millisecond_precision(const ColumnDescriptor* omnisci_column) {
  return omnisci_column->columnType.get_dimension() == 3;
}

bool is_millisecond_precision(
    const parquet::TimestampLogicalType* timestamp_logical_column) {
  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS;
}
bool validate_none_type_mapping(const ColumnDescriptor* omnisci_column,
                                const parquet::ColumnDescriptor* parquet_column) {
  bool is_none_encoded_mapping =
      omnisci_column->columnType.get_compression() == kENCODING_NONE &&
      (parquet_column->physical_type() == parquet::Type::BOOLEAN &&
       omnisci_column->columnType.get_type() == kBOOLEAN);
  return parquet_column->logical_type()->is_none() && is_none_encoded_mapping;
}
SQLTypeInfo suggest_boolean_type_mapping(
    const parquet::ColumnDescriptor* parquet_column) {
  SQLTypeInfo type;
  type.set_type(kBOOLEAN);
  type.set_compression(kENCODING_NONE);
  type.set_fixed_size();
  return type;
}

bool validate_timestamp_mapping(const ColumnDescriptor* omnisci_column,
                                const parquet::ColumnDescriptor* parquet_column) {
  if (!(omnisci_column->columnType.get_type() == kTIMESTAMP &&
        ((omnisci_column->columnType.get_compression() == kENCODING_NONE) ||
         (omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
          omnisci_column->columnType.get_comp_param() == 32)))) {
    return false;
  }
  // check the annotated case
  if (auto timestamp_logical_column = dynamic_cast<const parquet::TimestampLogicalType*>(
          parquet_column->logical_type().get())) {
    // ... (unit/precision checks elided in the listing)
  }
  // check if mapping is a valid INT32 or INT64 timestamp with no annotation
  if (parquet_column->logical_type()->is_none() &&
      ((parquet_column->physical_type() == parquet::Type::INT32 &&
        omnisci_column->columnType.get_compression() == kENCODING_FIXED) ||
       parquet_column->physical_type() == parquet::Type::INT64)) {
    return true;
  }
  return false;
}

SQLTypeInfo suggest_timestamp_mapping(const parquet::ColumnDescriptor* parquet_column) {
  if (auto timestamp_logical_column = dynamic_cast<const parquet::TimestampLogicalType*>(
          parquet_column->logical_type().get())) {
    SQLTypeInfo type;
    type.set_type(kTIMESTAMP);
    // ... (precision derived from the logical type's time unit)
    type.set_compression(kENCODING_NONE);
    type.set_fixed_size();
    return type;
  }
  UNREACHABLE();
  return {};
}
bool validate_time_mapping(const ColumnDescriptor* omnisci_column,
                           const parquet::ColumnDescriptor* parquet_column) {
  if (!(omnisci_column->columnType.get_type() == kTIME &&
        ((omnisci_column->columnType.get_compression() == kENCODING_NONE) ||
         (omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
          omnisci_column->columnType.get_comp_param() == 32)))) {
    return false;
  }
  if (parquet_column->logical_type()->is_time()) {
    return true;
  }
  return false;
}

SQLTypeInfo suggest_time_mapping(const parquet::ColumnDescriptor* parquet_column) {
  CHECK(parquet_column->logical_type()->is_time());
  SQLTypeInfo type;
  type.set_type(kTIME);
  type.set_compression(kENCODING_NONE);
  type.set_fixed_size();
  return type;
}

bool validate_date_mapping(const ColumnDescriptor* omnisci_column,
                           const parquet::ColumnDescriptor* parquet_column) {
  if (!(omnisci_column->columnType.get_type() == kDATE /* ... */)) {
    return false;
  }
  return parquet_column->logical_type()->is_date() ||
         parquet_column->logical_type()
             ->is_timestamp();  // to support TIMESTAMP -> DATE coercion
}

SQLTypeInfo suggest_date_mapping(const parquet::ColumnDescriptor* parquet_column) {
  CHECK(parquet_column->logical_type()->is_date());
  SQLTypeInfo type;
  type.set_type(kDATE);
  type.set_compression(kENCODING_NONE);
  type.set_fixed_size();
  return type;
}
bool validate_string_mapping(const ColumnDescriptor* omnisci_column,
                             const parquet::ColumnDescriptor* parquet_column) {
  return is_valid_parquet_string(parquet_column) &&
         omnisci_column->columnType.is_string() &&
         (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
          omnisci_column->columnType.get_compression() == kENCODING_DICT);
}

// ...

SQLTypeInfo suggest_string_mapping(const parquet::ColumnDescriptor* parquet_column) {
  CHECK(is_valid_parquet_string(parquet_column));
  SQLTypeInfo type;
  type.set_type(kTEXT);
  type.set_compression(kENCODING_DICT);
  type.set_comp_param(0);  // `comp_param` is expected either to be zero or
                           // equal to a string dictionary id
  type.set_fixed_size();
  return type;
}
void validate_equal_schema(const parquet::arrow::FileReader* reference_file_reader,
                           const parquet::arrow::FileReader* new_file_reader,
                           const std::string& reference_file_path,
                           const std::string& new_file_path) {
  const auto reference_num_columns =
      reference_file_reader->parquet_reader()->metadata()->num_columns();
  const auto new_num_columns =
      new_file_reader->parquet_reader()->metadata()->num_columns();
  if (reference_num_columns != new_num_columns) {
    throw std::runtime_error{"Parquet file \"" + new_file_path +
                             "\" has a different schema. Please ensure that all Parquet "
                             "files use the same schema. Reference Parquet file: \"" +
                             reference_file_path + "\" has " +
                             std::to_string(reference_num_columns) +
                             " columns. New Parquet file \"" + new_file_path +
                             "\" has " + std::to_string(new_num_columns) + " columns."};
  }
  for (int i = 0; i < reference_num_columns; i++) {
    validate_equal_column_descriptor(get_column_descriptor(reference_file_reader, i),
                                     get_column_descriptor(new_file_reader, i),
                                     reference_file_path,
                                     new_file_path);
  }
}
void validate_allowed_mapping(const parquet::ColumnDescriptor* parquet_column,
                              const ColumnDescriptor* omnisci_column) {
  bool allowed_type = false;
  if (omnisci_column->columnType.is_array()) {
    if (is_valid_parquet_list_column(parquet_column)) {
      auto omnisci_column_sub_type_column =
          get_sub_type_column_descriptor(omnisci_column);
      allowed_type = LazyParquetChunkLoader::isColumnMappingSupported(
          omnisci_column_sub_type_column.get(), parquet_column);
    }
  } else {
    allowed_type =
        LazyParquetChunkLoader::isColumnMappingSupported(omnisci_column, parquet_column);
  }
  if (!allowed_type) {
    auto logical_type = parquet_column->logical_type();
    if (logical_type->is_timestamp()) {
      auto timestamp_type =
          dynamic_cast<const parquet::TimestampLogicalType*>(logical_type.get());
      CHECK(timestamp_type);

      if (!timestamp_type->is_adjusted_to_utc()) {
        LOG(WARNING) << "Non-UTC timezone specified in Parquet file for column \""
                     << omnisci_column->columnName
                     << "\". Only UTC timezone is currently supported.";
      }
    }
    std::string parquet_type;
    parquet::Type::type physical_type = parquet_column->physical_type();
    if (parquet_column->logical_type()->is_none()) {
      parquet_type = parquet::TypeToString(physical_type);
    } else {
      parquet_type = logical_type->ToString();
    }
    std::string omnisci_type = omnisci_column->columnType.get_type_name();
    throw std::runtime_error{"Conversion from Parquet type \"" + parquet_type +
                             "\" to HeavyDB type \"" + omnisci_type +
                             "\" is not allowed. Please use an appropriate column type."};
  }
}
SQLTypeInfo suggest_column_scalar_type(const parquet::ColumnDescriptor* parquet_column) {
  if (parquet_column->logical_type()->is_decimal()) {
    return suggest_decimal_mapping(parquet_column);
  }
  if (parquet_column->logical_type()->is_none() &&
      (parquet_column->physical_type() == parquet::Type::FLOAT ||
       parquet_column->physical_type() == parquet::Type::DOUBLE)) {
    return suggest_floating_point_mapping(parquet_column);
  }
  if ((parquet_column->logical_type()->is_none() &&
       (parquet_column->physical_type() == parquet::Type::INT32 ||
        parquet_column->physical_type() == parquet::Type::INT64)) ||
      parquet_column->logical_type()->is_int()) {
    return suggest_integral_mapping(parquet_column);
  }
  if (parquet_column->logical_type()->is_none() &&
      parquet_column->physical_type() == parquet::Type::BOOLEAN) {
    return suggest_boolean_type_mapping(parquet_column);
  }
  if (parquet_column->logical_type()->is_timestamp()) {
    return suggest_timestamp_mapping(parquet_column);
  }
  if (parquet_column->logical_type()->is_time()) {
    return suggest_time_mapping(parquet_column);
  }
  if (parquet_column->logical_type()->is_date()) {
    return suggest_date_mapping(parquet_column);
  }
  if (is_valid_parquet_string(parquet_column)) {
    return suggest_string_mapping(parquet_column);
  }
  throw ForeignStorageException("Unsupported data type detected for column: " +
                                parquet_column->ToString());
}
void validate_number_of_columns(
    const std::shared_ptr<parquet::FileMetaData>& file_metadata,
    const std::string& file_path,
    const ForeignTableSchema& schema) {
  if (schema.numLogicalColumns() != file_metadata->num_columns()) {
    throw_number_of_columns_mismatch_error(
        schema.numLogicalColumns(), file_metadata->num_columns(), file_path);
  }
}

void throw_missing_metadata_error(const int row_group_index,
                                  const int column_index,
                                  const std::string& file_path) {
  throw std::runtime_error{
      "Statistics metadata is required for all row groups. Metadata is missing for "
      "row group index: " +
      std::to_string(row_group_index) +
      ", column index: " + std::to_string(column_index) + ", file path: " + file_path};
}
void throw_row_group_larger_than_fragment_size_error(
    const MaxRowGroupSizeStats max_row_group_stats,
    const int fragment_size) {
  auto metadata_scan_exception = MetadataScanInfeasibleFragmentSizeException{
      "Parquet file has a row group size that is larger than the fragment size. "
      "Please set the table fragment size to a number that is larger than the "
      "row group size. Row group index: " +
      std::to_string(max_row_group_stats.max_row_group_index) +
      ", row group size: " + std::to_string(max_row_group_stats.max_row_group_size) +
      ", file path: " + max_row_group_stats.file_path};
  metadata_scan_exception.min_feasible_fragment_size_ =
      max_row_group_stats.max_row_group_size;
  throw metadata_scan_exception;
}
MaxRowGroupSizeStats validate_column_mapping_and_row_group_metadata(
    const std::shared_ptr<parquet::FileMetaData>& file_metadata,
    const std::string& file_path,
    const ForeignTableSchema& schema,
    const bool do_metadata_stats_validation) {
  auto column_it = schema.getLogicalColumns().begin();
  MaxRowGroupSizeStats max_row_group_stats{0, 0};
  for (int i = 0; i < file_metadata->num_columns(); ++i, ++column_it) {
    const parquet::ColumnDescriptor* descr = file_metadata->schema()->Column(i);
    try {
      validate_allowed_mapping(descr, *column_it);
    } catch (std::runtime_error& e) {
      std::stringstream error_message;
      error_message << e.what() << " Parquet column: " << descr->path()->ToDotString()
                    << ", HeavyDB column: " << (*column_it)->columnName
                    << ", Parquet file: " << file_path << ".";
      throw std::runtime_error(error_message.str());
    }

    for (int r = 0; r < file_metadata->num_row_groups(); ++r) {
      auto group_metadata = file_metadata->RowGroup(r);
      auto num_rows = group_metadata->num_rows();
      if (num_rows == 0) {
        continue;  // skip empty row groups
      } else if (num_rows > max_row_group_stats.max_row_group_size) {
        max_row_group_stats.max_row_group_size = num_rows;
        max_row_group_stats.max_row_group_index = r;
        max_row_group_stats.file_path = file_path;
      }
      if (do_metadata_stats_validation) {
        auto column_chunk = group_metadata->ColumnChunk(i);
        bool contains_metadata = column_chunk->is_stats_set();
        if (contains_metadata) {
          auto stats = column_chunk->statistics();
          bool is_all_nulls = stats->null_count() == column_chunk->num_values();
          bool is_list = is_valid_parquet_list_column(file_metadata->schema()->Column(i));
          // Given a list, it is possible it has no min or max if it is
          // comprised of only empty lists and nulls; lists are afforded the
          // benefit of the doubt in this situation.
          if (!(stats->HasMinMax() || is_all_nulls || is_list)) {
            contains_metadata = false;
          }
        }
        if (!contains_metadata) {
          throw_missing_metadata_error(r, i, file_path);
        }
      }
    }
  }
  return max_row_group_stats;
}
MaxRowGroupSizeStats validate_parquet_metadata(
    const std::shared_ptr<parquet::FileMetaData>& file_metadata,
    const std::string& file_path,
    const ForeignTableSchema& schema,
    const bool do_metadata_stats_validation) {
  validate_number_of_columns(file_metadata, file_path, schema);
  return validate_column_mapping_and_row_group_metadata(
      file_metadata, file_path, schema, do_metadata_stats_validation);
}
std::list<RowGroupMetadata> metadata_scan_rowgroup_interval(
    const std::map<int, std::shared_ptr<ParquetEncoder>>& encoder_map,
    const RowGroupInterval& row_group_interval,
    const ReaderPtr& reader,
    const ForeignTableSchema& schema) {
  std::list<RowGroupMetadata> row_group_metadata;
  auto column_interval =
      Interval<ColumnType>{schema.getLogicalAndPhysicalColumns().front()->columnId,
                           schema.getLogicalAndPhysicalColumns().back()->columnId};
  auto file_metadata = reader->parquet_reader()->metadata();
  for (int row_group = row_group_interval.start_index;
       row_group <= row_group_interval.end_index;
       ++row_group) {
    auto& row_group_metadata_item = row_group_metadata.emplace_back();
    row_group_metadata_item.row_group_index = row_group;
    row_group_metadata_item.file_path = row_group_interval.file_path;

    std::unique_ptr<parquet::RowGroupMetaData> group_metadata =
        file_metadata->RowGroup(row_group);

    for (int column_id = column_interval.start; column_id <= column_interval.end;
         column_id++) {
      const auto column_descriptor = schema.getColumnDescriptor(column_id);
      auto parquet_column_index = schema.getParquetColumnIndex(column_id);
      auto encoder_map_iter =
          encoder_map.find(schema.getLogicalColumn(column_id)->columnId);
      CHECK(encoder_map_iter != encoder_map.end());
      try {
        auto metadata = encoder_map_iter->second->getRowGroupMetadata(
            group_metadata.get(), parquet_column_index, column_descriptor->columnType);
        row_group_metadata_item.column_chunk_metadata.emplace_back(metadata);
      } catch (const std::exception& e) {
        std::stringstream error_message;
        error_message << e.what() << " in row group " << row_group << " of Parquet file '"
                      << row_group_interval.file_path << "'.";
        throw std::runtime_error(error_message.str());
      }
    }
  }
  return row_group_metadata;
}
std::map<int, std::shared_ptr<ParquetEncoder>> populate_encoder_map_for_import(
    const std::map<int, Chunk_NS::Chunk> chunks,
    const ForeignTableSchema& schema,
    const ReaderPtr& reader,
    const std::map<int, StringDictionary*> column_dictionaries,
    const int64_t num_rows,
    const bool geo_validate_geometry) {
  std::map<int, std::shared_ptr<ParquetEncoder>> encoder_map;
  auto file_metadata = reader->parquet_reader()->metadata();
  for (auto& [column_id, chunk] : chunks) {
    const auto column_descriptor = schema.getColumnDescriptor(column_id);
    if (column_descriptor->isGeoPhyCol) {  // skip geo physical columns
      continue;
    }
    auto parquet_column_descriptor =
        file_metadata->schema()->Column(schema.getParquetColumnIndex(column_id));
    auto find_it = column_dictionaries.find(column_id);
    StringDictionary* dictionary =
        (find_it == column_dictionaries.end() ? nullptr : find_it->second);
    std::list<Chunk_NS::Chunk> chunks_for_import;
    chunks_for_import.push_back(chunk);
    if (column_descriptor->columnType.is_geometry()) {
      for (int i = 0; i < column_descriptor->columnType.get_physical_cols(); ++i) {
        chunks_for_import.push_back(chunks.at(column_id + i + 1));
      }
    }
    auto encoder = create_parquet_encoder_for_import(chunks_for_import,
                                                     column_descriptor,
                                                     parquet_column_descriptor,
                                                     dictionary,
                                                     geo_validate_geometry);

    // reserve space in the buffer when the number of elements is known ahead
    // of time for types of known size (for example dictionary encoded strings)
    if (auto inplace_encoder = dynamic_cast<ParquetInPlaceEncoder*>(encoder.get())) {
      inplace_encoder->reserve(num_rows);
    }
    encoder_map[column_id] = std::move(encoder);
  }
  return encoder_map;
}
std::map<int, std::shared_ptr<ParquetEncoder>> populate_encoder_map_for_metadata_scan(
    const Interval<ColumnType>& column_interval,
    const ForeignTableSchema& schema,
    const ReaderPtr& reader,
    const bool do_metadata_stats_validation,
    const bool geo_validate_geometry) {
  std::map<int, std::shared_ptr<ParquetEncoder>> encoder_map;
  auto file_metadata = reader->parquet_reader()->metadata();
  for (int column_id = column_interval.start; column_id <= column_interval.end;
       column_id++) {
    const auto column_descriptor = schema.getColumnDescriptor(column_id);
    auto parquet_column_descriptor =
        file_metadata->schema()->Column(schema.getParquetColumnIndex(column_id));
    encoder_map[column_id] = create_parquet_encoder_for_metadata_scan(
        column_descriptor, parquet_column_descriptor, geo_validate_geometry);
    if (!do_metadata_stats_validation) {
      encoder_map[column_id]->disableMetadataStatsValidation();
    }
    column_id += column_descriptor->columnType.get_physical_cols();
  }
  return encoder_map;
}
std::list<std::unique_ptr<ChunkMetadata>> LazyParquetChunkLoader::appendRowGroups(
    const std::vector<RowGroupInterval>& row_group_intervals,
    const int parquet_column_index,
    const ColumnDescriptor* column_descriptor,
    std::list<Chunk_NS::Chunk>& chunks,
    StringDictionary* string_dictionary,
    RejectedRowIndices* rejected_row_indices,
    const bool is_for_detect,
    const std::optional<int64_t> max_rows_to_read) {
  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
  // `def_levels` and `rep_levels` below are used to store the read definition
  // and repetition levels of the Dremel encoding implemented by the Parquet
  // format
  std::vector<int16_t> def_levels(LazyParquetChunkLoader::batch_reader_num_elements);
  std::vector<int16_t> rep_levels(LazyParquetChunkLoader::batch_reader_num_elements);
  std::vector<int8_t> values;

  Timer<> summary_timer;
  Timer<> initialization_timer_ms;
  Timer<> validation_timer_ms;
  Timer<> parquet_read_timer_ms;
  Timer<> encoding_timer_ms;
  size_t total_row_groups_read = 0;

  summary_timer.start();

  initialization_timer_ms.start();
  CHECK(!row_group_intervals.empty());
  const auto& first_file_path = row_group_intervals.front().file_path;

  auto first_file_reader = file_reader_cache_->getOrInsert(first_file_path, file_system_);
  auto first_parquet_column_descriptor =
      get_column_descriptor(first_file_reader, parquet_column_index);
  resize_values_buffer(column_descriptor, first_parquet_column_descriptor, values);

  const bool geo_validate_geometry =
      foreign_table_->getOptionAsBool(ForeignTable::GEO_VALIDATE_GEOMETRY_KEY);
  auto encoder = create_parquet_encoder(column_descriptor,
                                        first_parquet_column_descriptor,
                                        chunks,
                                        string_dictionary,
                                        chunk_metadata,
                                        false,
                                        false,
                                        is_for_detect,
                                        geo_validate_geometry);
  CHECK(encoder.get());

  if (rejected_row_indices) {  // error tracking is enabled
    encoder->initializeErrorTracking();
  }
  encoder->initializeColumnType(column_descriptor->columnType);
  initialization_timer_ms.stop();

  bool early_exit = false;
  int64_t total_rows_read = 0;
  for (const auto& row_group_interval : row_group_intervals) {
    initialization_timer_ms.start();
    const auto& file_path = row_group_interval.file_path;
    auto file_reader = file_reader_cache_->getOrInsert(file_path, file_system_);
    auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
    CHECK(row_group_interval.start_index >= 0 &&
          row_group_interval.end_index < num_row_groups);
    CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);

    parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
    auto parquet_column_descriptor =
        get_column_descriptor(file_reader, parquet_column_index);
    initialization_timer_ms.stop();

    validation_timer_ms.start();
    validate_equal_column_descriptor(first_parquet_column_descriptor,
                                     parquet_column_descriptor,
                                     first_file_path,
                                     file_path);

    validate_max_repetition_and_definition_level(column_descriptor,
                                                 parquet_column_descriptor);
    validation_timer_ms.stop();

    int64_t values_read = 0;
    for (int row_group_index = row_group_interval.start_index;
         row_group_index <= row_group_interval.end_index;
         ++row_group_index) {
      total_row_groups_read++;
      parquet_read_timer_ms.start();
      auto group_reader = parquet_reader->RowGroup(row_group_index);
      std::shared_ptr<parquet::ColumnReader> col_reader =
          group_reader->Column(parquet_column_index);
      parquet_read_timer_ms.stop();

      try {
        while (col_reader->HasNext()) {
          parquet_read_timer_ms.start();
          int64_t levels_read =
              parquet::ScanAllValues(LazyParquetChunkLoader::batch_reader_num_elements,
                                     def_levels.data(),
                                     rep_levels.data(),
                                     reinterpret_cast<uint8_t*>(values.data()),
                                     &values_read,
                                     col_reader.get());
          parquet_read_timer_ms.stop();

          encoding_timer_ms.start();
          if (rejected_row_indices) {  // error tracking is enabled
            encoder->appendDataTrackErrors(def_levels.data(),
                                           rep_levels.data(),
                                           values_read,
                                           levels_read,
                                           values.data());
          } else {
            validate_list_column_metadata_statistics(parquet_reader,
                                                     row_group_index,
                                                     parquet_column_index,
                                                     def_levels.data(),
                                                     levels_read,
                                                     parquet_column_descriptor);
            set_definition_levels_for_zero_max_definition_level_case(
                parquet_column_descriptor, def_levels);
            encoder->appendData(def_levels.data(),
                                rep_levels.data(),
                                values_read,
                                levels_read,
                                values.data());
          }
          encoding_timer_ms.stop();

          if (max_rows_to_read.has_value()) {
            if (column_descriptor->columnType.is_array()) {
              auto array_encoder = dynamic_cast<ParquetArrayEncoder*>(encoder.get());
              CHECK(array_encoder);
              total_rows_read = array_encoder->getArraysCount();
            } else {
              // For scalar types the number of levels read equals the number
              // of rows read
              total_rows_read += levels_read;
            }
            if (total_rows_read >= max_rows_to_read.value()) {
              early_exit = true;
              break;
            }
          }
        }
        encoding_timer_ms.start();
        if (auto array_encoder = dynamic_cast<ParquetArrayEncoder*>(encoder.get())) {
          array_encoder->finalizeRowGroup();
        }
        encoding_timer_ms.stop();
      } catch (const std::exception& error) {
        // check for a specific error to detect a possible unexpected switch of
        // data source in order to respond with an informative error message
        if (boost::regex_search(error.what(),
                                boost::regex{"Deserializing page header failed."})) {
          throw ForeignStorageException(
              "Unable to read from foreign data source, possible cause is an unexpected "
              "change of source. Please use the \"REFRESH FOREIGN TABLES\" command on "
              "the foreign table "
              "if data source has been updated. Foreign table: " +
              foreign_table_->tableName);
        }

        throw ForeignStorageException(
            std::string(error.what()) + " Row group: " + std::to_string(row_group_index) +
            ", Parquet column: '" + col_reader->descr()->path()->ToDotString() +
            "', Parquet file: '" + file_path + "'");
      }
      if (max_rows_to_read.has_value() && early_exit) {
        break;
      }
    }
    if (max_rows_to_read.has_value() && early_exit) {
      break;
    }
  }

  encoding_timer_ms.start();
  if (rejected_row_indices) {  // error tracking is enabled
    *rejected_row_indices = encoder->getRejectedRowIndices();
  }
  encoding_timer_ms.stop();

  summary_timer.stop();

  VLOG(1) << "Appended " << total_row_groups_read
          << " row groups to chunk. Column: " << column_descriptor->columnName
          << ", Column id: " << column_descriptor->columnId << ", Parquet column: "
          << first_parquet_column_descriptor->path()->ToDotString();
  VLOG(1) << "Runtime summary:";
  VLOG(1) << "  Parquet chunk loading total time: " << summary_timer.elapsed() << "ms";
  VLOG(1) << "  Parquet encoder initialization time: "
          << initialization_timer_ms.elapsed() << "ms";
  VLOG(1) << "  Parquet metadata validation time: " << validation_timer_ms.elapsed()
          << "ms";
  VLOG(1) << "  Parquet column read time: " << parquet_read_timer_ms.elapsed() << "ms";
  VLOG(1) << "  Parquet data conversion time: " << encoding_timer_ms.elapsed() << "ms";

  return chunk_metadata;
}
SQLTypeInfo LazyParquetChunkLoader::suggestColumnMapping(
    const parquet::ColumnDescriptor* parquet_column) {
  auto type = suggest_column_scalar_type(parquet_column);
  // array case
  if (is_valid_parquet_list_column(parquet_column)) {
    return type.get_array_type();
  }
  return type;
}

bool LazyParquetChunkLoader::isColumnMappingSupported(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column) {
  CHECK(!omnisci_column->columnType.is_array())
      << "isColumnMappingSupported should not be called on arrays";
  if (validate_geospatial_mapping(omnisci_column, parquet_column)) {
    return true;
  }
  if (parquet_column->logical_type()->is_decimal()) {
    return validate_decimal_mapping(omnisci_column, parquet_column);
  }
  // ... (the remaining validate_*_mapping checks mirror the dispatch in
  // suggest_column_scalar_type above)
  return false;
}
LazyParquetChunkLoader::LazyParquetChunkLoader(
    std::shared_ptr<arrow::fs::FileSystem> file_system,
    FileReaderMap* file_map,
    const ForeignTable* foreign_table)
    : file_system_(file_system)
    , file_reader_cache_(file_map)
    , foreign_table_(foreign_table) {}
std::list<std::unique_ptr<ChunkMetadata>> LazyParquetChunkLoader::loadChunk(
    const std::vector<RowGroupInterval>& row_group_intervals,
    const int parquet_column_index,
    std::list<Chunk_NS::Chunk>& chunks,
    StringDictionary* string_dictionary,
    RejectedRowIndices* rejected_row_indices) {
  CHECK(!chunks.empty());
  auto const& chunk = *chunks.begin();
  auto column_descriptor = chunk.getColumnDesc();
  auto buffer = chunk.getBuffer();
  CHECK(buffer);

  try {
    auto metadata = appendRowGroups(row_group_intervals,
                                    parquet_column_index,
                                    column_descriptor,
                                    chunks,
                                    string_dictionary,
                                    rejected_row_indices);
    return metadata;
  } catch (const std::exception& error) {
    throw ForeignStorageException(error.what());
  }

  return {};
}
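// Hedged usage sketch (paths and setup assumed, not from the original
// source): loading the first two row groups of one file into a prepared
// chunk list.
//
//   std::vector<RowGroupInterval> intervals{{"/data/example.parquet", 0, 1}};
//   auto chunk_metadata = chunk_loader.loadChunk(intervals,
//                                                /*parquet_column_index=*/0,
//                                                chunks,
//                                                /*string_dictionary=*/nullptr);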
ParquetRowGroupReader::ParquetRowGroupReader(
    std::shared_ptr<parquet::ColumnReader> col_reader,
    const ColumnDescriptor* column_descriptor,
    const parquet::ColumnDescriptor* parquet_column_descriptor,
    ParquetEncoder* encoder,
    InvalidRowGroupIndices& invalid_indices,
    const int row_group_index,
    const int parquet_column_index,
    const parquet::ParquetFileReader* parquet_reader)
    : col_reader_(col_reader)
    , parquet_column_descriptor_(parquet_column_descriptor)
    , encoder_(encoder)
    , invalid_indices_(invalid_indices)
    , row_group_index_(row_group_index)
    , parquet_reader_(parquet_reader) {
  // ...
  import_encoder = dynamic_cast<ParquetImportEncoder*>(encoder);
  CHECK(import_encoder);
}

void ParquetRowGroupReader::readAndValidateRowGroup() {
  // ...
  while (col_reader_->HasNext()) {
    // ...
    int64_t levels_read =
        parquet::ScanAllValues(LazyParquetChunkLoader::batch_reader_num_elements,
                               batch_data.def_levels.data(),
                               batch_data.rep_levels.data(),
                               reinterpret_cast<uint8_t*>(batch_data.values.data()),
                               &values_read,
                               col_reader_.get());
    // ...
    import_encoder->validateAndAppendData(batch_data.def_levels.data(),
                                          batch_data.rep_levels.data(),
                                          values_read,
                                          levels_read,
                                          batch_data.values.data(),
                                          column_type,
                                          invalid_indices_);
  }
  if (auto array_encoder = dynamic_cast<ParquetArrayEncoder*>(encoder_)) {
    array_encoder->finalizeRowGroup();
  }
}

// ...
/**
 * Load row groups of data into given chunks.
 */
std::pair<size_t, size_t> LazyParquetChunkLoader::loadRowGroups(
    const RowGroupInterval& row_group_interval,
    const std::map<int, Chunk_NS::Chunk>& chunks,
    const ForeignTableSchema& schema,
    const std::map<int, StringDictionary*>& column_dictionaries,
    const int num_threads) {
  const auto& file_path = row_group_interval.file_path;

  // do not use the cached file reader as the life span of the chunks is not
  // guaranteed to exceed the life span of the file reader
  auto file_reader_owner = open_parquet_table(file_path, file_system_);
  auto file_reader = file_reader_owner.get();
  auto file_metadata = file_reader->parquet_reader()->metadata();

  validate_number_of_columns(file_metadata, file_path, schema);

  for (const auto column_descriptor : schema.getLogicalColumns()) {
    auto parquet_column_index = schema.getParquetColumnIndex(column_descriptor->columnId);
    auto parquet_column = file_metadata->schema()->Column(parquet_column_index);
    try {
      validate_allowed_mapping(parquet_column, column_descriptor);
    } catch (std::runtime_error& e) {
      std::stringstream error_message;
      error_message << e.what()
                    << " Parquet column: " << parquet_column->path()->ToDotString()
                    << ", HeavyDB column: " << column_descriptor->columnName
                    << ", Parquet file: " << file_path << ".";
      throw std::runtime_error(error_message.str());
    }
  }

  auto row_group_index = row_group_interval.start_index;
  std::map<int, ParquetRowGroupReader> row_group_reader_map;

  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
  auto group_reader = parquet_reader->RowGroup(row_group_index);

  std::vector<InvalidRowGroupIndices> invalid_indices_per_thread(num_threads);

  const bool geo_validate_geometry =
      foreign_table_->getOptionAsBool(ForeignTable::GEO_VALIDATE_GEOMETRY_KEY);
  auto encoder_map = populate_encoder_map_for_import(chunks,
                                                     schema,
                                                     file_reader,
                                                     column_dictionaries,
                                                     group_reader->metadata()->num_rows(),
                                                     geo_validate_geometry);

  std::vector<std::set<int>> partitions(num_threads);
  std::map<int, int> column_id_to_thread;
  for (auto& [column_id, encoder] : encoder_map) {
    auto thread_id = column_id % num_threads;
    column_id_to_thread[column_id] = thread_id;
    partitions[thread_id].insert(column_id);
  }

  for (auto& [column_id, encoder] : encoder_map) {
    const auto column_descriptor = schema.getColumnDescriptor(column_id);
    auto parquet_column_index = schema.getParquetColumnIndex(column_id);
    auto parquet_column_descriptor =
        file_metadata->schema()->Column(parquet_column_index);

    auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
    CHECK(row_group_interval.start_index >= 0 &&
          row_group_interval.end_index < num_row_groups);
    CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
    validate_max_repetition_and_definition_level(column_descriptor,
                                                 parquet_column_descriptor);

    std::shared_ptr<parquet::ColumnReader> col_reader =
        group_reader->Column(parquet_column_index);

    row_group_reader_map.insert(
        {column_id,
         ParquetRowGroupReader(col_reader,
                               column_descriptor,
                               parquet_column_descriptor,
                               encoder.get(),
                               invalid_indices_per_thread[shared::get_from_map(
                                   column_id_to_thread, column_id)],
                               row_group_index,
                               parquet_column_index,
                               parquet_reader)});
  }

  std::vector<std::future<void>> futures;
  for (int ithread = 0; ithread < num_threads; ++ithread) {
    auto column_ids_for_thread = partitions[ithread];
    futures.emplace_back(std::async(std::launch::async, [&, column_ids_for_thread] {
      for (const auto column_id : column_ids_for_thread) {
        shared::get_from_map(row_group_reader_map, column_id)
            .readAndValidateRowGroup();
      }
    }));
  }

  for (auto& future : futures) {
    future.wait();
  }
  for (auto& future : futures) {
    future.get();
  }

  // merge/reduce invalid indices
  InvalidRowGroupIndices invalid_indices;
  for (auto& thread_invalid_indices : invalid_indices_per_thread) {
    invalid_indices.merge(thread_invalid_indices);
  }

  for (auto& [_, reader] : row_group_reader_map) {
    reader.eraseInvalidRowGroupData(invalid_indices);
  }

  // update the element count for each encoder
  for (const auto column_descriptor : schema.getLogicalAndPhysicalColumns()) {
    auto column_id = column_descriptor->columnId;
    auto db_encoder = shared::get_from_map(chunks, column_id).getBuffer()->getEncoder();
    CHECK(static_cast<size_t>(group_reader->metadata()->num_rows()) >=
          invalid_indices.size());
    size_t updated_num_elems = db_encoder->getNumElems() +
                               group_reader->metadata()->num_rows() -
                               invalid_indices.size();
    db_encoder->setNumElems(updated_num_elems);
    if (column_descriptor->columnType.is_geometry()) {
      for (int i = 0; i < column_descriptor->columnType.get_physical_cols(); ++i) {
        auto db_encoder =
            shared::get_from_map(chunks, column_id + i + 1).getBuffer()->getEncoder();
        db_encoder->setNumElems(updated_num_elems);
      }
    }
  }

  return {group_reader->metadata()->num_rows() - invalid_indices.size(),
          invalid_indices.size()};
}
DataPreview LazyParquetChunkLoader::previewFiles(const std::vector<std::string>& files,
                                                 const size_t max_num_rows,
                                                 const ForeignTable& table) {
  CHECK(!files.empty());

  auto first_file = *files.begin();
  auto first_file_reader = file_reader_cache_->getOrInsert(first_file, file_system_);

  for (auto current_file_it = ++files.begin(); current_file_it != files.end();
       ++current_file_it) {
    auto file_reader = file_reader_cache_->getOrInsert(*current_file_it, file_system_);
    validate_equal_schema(first_file_reader, file_reader, first_file, *current_file_it);
  }

  auto first_file_metadata = first_file_reader->parquet_reader()->metadata();
  auto num_columns = first_file_metadata->num_columns();

  DataPreview data_preview;

  auto current_file_it = files.begin();
  while (data_preview.sample_rows.size() < max_num_rows &&
         current_file_it != files.end()) {
    size_t total_num_rows = data_preview.sample_rows.size();
    size_t max_num_rows_to_append = max_num_rows - data_preview.sample_rows.size();

    // gather enough rows in row groups to produce the required samples
    std::vector<RowGroupInterval> row_group_intervals;
    for (; current_file_it != files.end(); ++current_file_it) {
      const auto& file_path = *current_file_it;
      auto file_reader = file_reader_cache_->getOrInsert(file_path, file_system_);
      auto file_metadata = file_reader->parquet_reader()->metadata();
      auto num_row_groups = file_metadata->num_row_groups();
      int end_row_group = 0;
      for (int i = 0; i < num_row_groups && total_num_rows < max_num_rows; ++i) {
        const size_t next_num_rows = file_metadata->RowGroup(i)->num_rows();
        total_num_rows += next_num_rows;
        end_row_group = i;
      }
      row_group_intervals.push_back(RowGroupInterval{file_path, 0, end_row_group});
    }

    PreviewContext preview_context;
    for (int i = 0; i < num_columns; ++i) {
      auto col = first_file_metadata->schema()->Column(i);
      ColumnDescriptor& cd = preview_context.column_descriptors.emplace_back();
      auto sql_type = LazyParquetChunkLoader::suggestColumnMapping(col);
      cd.columnType = sql_type;
      cd.columnName =
          sql_type.is_array() ? col->path()->ToDotVector()[0] + "_array" : col->name();
      data_preview.column_names.emplace_back(cd.columnName);
      data_preview.column_types.emplace_back(sql_type);
      preview_context.detect_buffers.emplace_back(
          std::make_unique<TypedParquetDetectBuffer>());
      preview_context.rejected_row_indices_per_column.emplace_back(
          std::make_unique<RejectedRowIndices>());
      auto& detect_buffer = preview_context.detect_buffers.back();
      auto& chunk = preview_context.column_chunks.emplace_back(&cd);
      chunk.setPinnable(false);
      chunk.setBuffer(detect_buffer.get());
    }

    std::function<void(const std::vector<int>&)> append_row_groups_for_column =
        [&](const std::vector<int>& column_indices) {
          for (const auto& column_index : column_indices) {
            auto& chunk = preview_context.column_chunks[column_index];
            auto chunk_list = std::list<Chunk_NS::Chunk>{chunk};
            auto& rejected_row_indices =
                preview_context.rejected_row_indices_per_column[column_index];
            appendRowGroups(row_group_intervals,
                            column_index,
                            chunk.getColumnDesc(),
                            chunk_list,
                            nullptr,
                            rejected_row_indices.get(),
                            true,
                            max_num_rows_to_append);
          }
        };

    auto num_threads = foreign_storage::get_num_threads(table);
    std::vector<int> columns(num_columns);
    std::iota(columns.begin(), columns.end(), 0);
    auto futures =
        create_futures_for_workers(columns, num_threads, append_row_groups_for_column);
    for (auto& future : futures) {
      future.wait();
    }
    for (auto& future : futures) {
      future.get();
    }

    // merge all rejected row indices
    auto rejected_row_indices = std::make_unique<RejectedRowIndices>();
    for (int i = 0; i < num_columns; ++i) {
      rejected_row_indices->insert(
          preview_context.rejected_row_indices_per_column[i]->begin(),
          preview_context.rejected_row_indices_per_column[i]->end());
    }

    size_t num_rows = 0;
    auto buffers_it = preview_context.detect_buffers.begin();
    for (int i = 0; i < num_columns; ++i, ++buffers_it) {
      CHECK(buffers_it != preview_context.detect_buffers.end());
      auto& strings = buffers_it->get()->getStrings();
      if (i == 0) {
        num_rows = strings.size();
      } else {
        CHECK_EQ(num_rows, strings.size());
      }
    }

    size_t num_rejected_rows = rejected_row_indices->size();
    CHECK_GE(num_rows, num_rejected_rows);
    auto row_count = num_rows - num_rejected_rows;

    auto offset_row = data_preview.sample_rows.size();
    data_preview.sample_rows.resize(std::min(offset_row + row_count, max_num_rows));

    for (size_t irow = 0, rows_appended = 0;
         irow < num_rows && offset_row + rows_appended < max_num_rows;
         ++irow) {
      if (rejected_row_indices->find(irow) != rejected_row_indices->end()) {
        continue;
      }
      auto& row_data = data_preview.sample_rows[offset_row + rows_appended];
      row_data.resize(num_columns);
      auto buffers_it = preview_context.detect_buffers.begin();
      for (int i = 0; i < num_columns; ++i, ++buffers_it) {
        CHECK(buffers_it != preview_context.detect_buffers.end());
        auto& strings = buffers_it->get()->getStrings();
        row_data[i] = strings[irow];
      }
      ++rows_appended;
    }
  }

  // attempt to detect geo columns
  for (int i = 0; i < num_columns; ++i) {
    auto type_info = data_preview.column_types[i];
    if (type_info.is_string()) {
      auto tentative_geo_type =
          foreign_storage::detect_geo_type(data_preview.sample_rows, i);
      if (tentative_geo_type.has_value()) {
        data_preview.column_types[i].set_type(tentative_geo_type.value());
        data_preview.column_types[i].set_compression(kENCODING_NONE);
      }
    }
  }

  return data_preview;
}
std::list<RowGroupMetadata> LazyParquetChunkLoader::metadataScan(
    const std::vector<std::string>& file_paths,
    const ForeignTableSchema& schema,
    const bool do_metadata_stats_validation) {
  auto column_interval =
      Interval<ColumnType>{schema.getLogicalAndPhysicalColumns().front()->columnId,
                           schema.getLogicalAndPhysicalColumns().back()->columnId};
  CHECK(!file_paths.empty());

  // The encoder map needs to be populated before row groups can be scanned,
  // so the first file is used as a reference file.
  const auto& first_path = *file_paths.begin();
  auto first_reader = file_reader_cache_->getOrInsert(first_path, file_system_);
  auto max_row_group_stats =
      validate_parquet_metadata(first_reader->parquet_reader()->metadata(),
                                first_path,
                                schema,
                                do_metadata_stats_validation);

  auto table_ptr = schema.getForeignTable();
  CHECK(table_ptr);
  auto num_threads = foreign_storage::get_num_threads(*table_ptr);
  VLOG(1) << "Metadata scan using " << num_threads << " threads";

  const bool geo_validate_geometry =
      foreign_table_->getOptionAsBool(ForeignTable::GEO_VALIDATE_GEOMETRY_KEY);
  auto encoder_map = populate_encoder_map_for_metadata_scan(column_interval,
                                                            schema,
                                                            first_reader,
                                                            do_metadata_stats_validation,
                                                            geo_validate_geometry);
  const auto num_row_groups = get_parquet_table_size(first_reader).first;
  VLOG(1) << "Starting metadata scan of path " << first_path;
  auto row_group_metadata = metadata_scan_rowgroup_interval(
      encoder_map, {first_path, 0, num_row_groups - 1}, first_reader, schema);
  VLOG(1) << "Completed metadata scan of path " << first_path;

  // Initialize the file reader cache entry for every path before
  // multithreading, so that no keys are added in a concurrent environment.
  // The first file has already been scanned and is skipped.
  std::vector<std::string> cache_subset;
  for (auto path_it = ++(file_paths.begin()); path_it != file_paths.end(); ++path_it) {
    file_reader_cache_->initializeIfEmpty(*path_it);
    cache_subset.emplace_back(*path_it);
  }

  // Iterate asynchronously over any paths beyond the first.
  auto paths_per_thread = partition_for_threads(cache_subset, num_threads);
  std::vector<std::future<std::pair<std::list<RowGroupMetadata>, MaxRowGroupSizeStats>>>
      futures;
  for (const auto& path_group : paths_per_thread) {
    futures.emplace_back(std::async(
        std::launch::async,
        [&](const auto& paths, const auto& file_reader_cache)
            -> std::pair<std::list<RowGroupMetadata>, MaxRowGroupSizeStats> {
          Timer<> summary_timer;
          Timer<> get_or_insert_reader_timer_ms;
          Timer<> validation_timer_ms;
          Timer<> metadata_scan_timer;

          summary_timer.start();

          std::list<RowGroupMetadata> reduced_metadata;
          MaxRowGroupSizeStats max_row_group_stats{0, 0};
          for (const auto& path : paths.get()) {
            get_or_insert_reader_timer_ms.start();
            auto reader = file_reader_cache.get().getOrInsert(path, file_system_);
            get_or_insert_reader_timer_ms.stop();

            validation_timer_ms.start();
            validate_equal_schema(first_reader, reader, first_path, path);
            auto local_max_row_group_stats =
                validate_parquet_metadata(reader->parquet_reader()->metadata(),
                                          path,
                                          schema,
                                          do_metadata_stats_validation);
            if (local_max_row_group_stats.max_row_group_size >
                max_row_group_stats.max_row_group_size) {
              max_row_group_stats = local_max_row_group_stats;
            }
            validation_timer_ms.stop();

            VLOG(1) << "Starting metadata scan of path " << path;

            metadata_scan_timer.start();
            const auto num_row_groups = get_parquet_table_size(reader).first;
            const auto interval = RowGroupInterval{path, 0, num_row_groups - 1};
            reduced_metadata.splice(
                reduced_metadata.end(),
                metadata_scan_rowgroup_interval(encoder_map, interval, reader, schema));
            metadata_scan_timer.stop();

            VLOG(1) << "Completed metadata scan of path " << path;
          }

          summary_timer.stop();

          VLOG(1) << "Runtime summary:";
          VLOG(1) << "  Parquet metadata scan total time: " << summary_timer.elapsed()
                  << "ms";
          VLOG(1) << "  Parquet file reader opening time: "
                  << get_or_insert_reader_timer_ms.elapsed() << "ms";
          VLOG(1) << "  Parquet metadata validation time: "
                  << validation_timer_ms.elapsed() << "ms";
          VLOG(1) << "  Parquet metadata processing time: "
                  << metadata_scan_timer.elapsed() << "ms";

          return {reduced_metadata, max_row_group_stats};
        },
        std::ref(path_group),
        std::ref(*file_reader_cache_)));
  }

  // Reduce all the row group results.
  for (auto& future : futures) {
    auto [metadata, local_max_row_group_stats] = future.get();
    row_group_metadata.splice(row_group_metadata.end(), metadata);
    if (local_max_row_group_stats.max_row_group_size >
        max_row_group_stats.max_row_group_size) {
      max_row_group_stats = local_max_row_group_stats;
    }
  }

  if (max_row_group_stats.max_row_group_size > schema.getForeignTable()->maxFragRows) {
    throw_row_group_larger_than_fragment_size_error(
        max_row_group_stats, schema.getForeignTable()->maxFragRows);
  }

  return row_group_metadata;
}

}  // namespace foreign_storage
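// Hedged usage sketch (file list and schema assumed, not from the original
// source): a full-table metadata scan over all Parquet files backing a
// foreign table.
//
//   std::vector<std::string> file_paths{"/data/part-0.parquet",
//                                       "/data/part-1.parquet"};
//   auto row_group_metadata =
//       chunk_loader.metadataScan(file_paths, schema,
//                                 /*do_metadata_stats_validation=*/true);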
DEVICE auto upper_bound(ARGS &&...args)
std::shared_ptr< parquet::ColumnReader > col_reader_
std::list< ColumnDescriptor > column_descriptors
HOST DEVICE SQLTypes get_subtype() const
void set_compression(EncodingType c)
AbstractBuffer * getIndexBuf() const
std::shared_ptr< ParquetEncoder > create_parquet_signed_or_unsigned_integral_encoder_with_types(AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size, const bool is_signed)
Create a signed or unsigned integral parquet encoder using types.
HOST DEVICE int get_size() const
bool validate_time_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::optional< SQLTypes > detect_geo_type(const SampleRows &sample_rows, size_t column_index)
std::vector< Chunk_NS::Chunk > column_chunks
static constexpr int32_t kMaxNumericPrecision
std::vector< std::string > column_names
std::vector< SQLTypeInfo > column_types
const parquet::ParquetFileReader * parquet_reader_
static bool isColumnMappingSupported(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
auto partition_for_threads(const std::set< T > &items, size_t max_threads)
std::shared_ptr< parquet::Statistics > validate_and_get_column_metadata_statistics(const parquet::ColumnChunkMetaData *column_metadata)
std::pair< int, int > get_parquet_table_size(const ReaderPtr &reader)
HOST DEVICE int get_scale() const
size_t get_num_threads(const ForeignTable &table)
std::shared_ptr< ParquetEncoder > create_parquet_geospatial_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const bool is_metadata_scan, const bool is_for_import, const bool geo_validate_geometry)
std::unique_ptr< ColumnDescriptor > get_sub_type_column_descriptor(const ColumnDescriptor *column)
const parquet::ColumnDescriptor * parquet_column_descriptor_
std::shared_ptr< ParquetEncoder > create_parquet_timestamp_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
bool is_nanosecond_precision(const ColumnDescriptor *omnisci_column)
const int row_group_index_
std::shared_ptr< ParquetEncoder > create_parquet_none_type_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
SQLTypeInfo suggest_decimal_mapping(const parquet::ColumnDescriptor *parquet_column)
MaxRowGroupSizeStats validate_column_mapping_and_row_group_metadata(const std::shared_ptr< parquet::FileMetaData > &file_metadata, const std::string &file_path, const ForeignTableSchema &schema, const bool do_metadata_stats_validation)
InvalidRowGroupIndices & invalid_indices_
void validate_equal_schema(const parquet::arrow::FileReader *reference_file_reader, const parquet::arrow::FileReader *new_file_reader, const std::string &reference_file_path, const std::string &new_file_path)
bool is_valid_parquet_list_column(const parquet::ColumnDescriptor *parquet_column)
Detect a valid list parquet column.
ParquetEncoder * encoder_
void validate_equal_column_descriptor(const parquet::ColumnDescriptor *reference_descriptor, const parquet::ColumnDescriptor *new_descriptor, const std::string &reference_file_path, const std::string &new_file_path)
HOST DEVICE SQLTypes get_type() const
bool validate_integral_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
SQLTypeInfo suggest_column_scalar_type(const parquet::ColumnDescriptor *parquet_column)
std::shared_ptr< ParquetEncoder > create_parquet_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const bool is_metadata_scan, const bool is_for_import, const bool is_for_detect, const bool geo_validate_geometry)
Create a Parquet specific encoder for a Parquet to OmniSci mapping.
void validate_list_column_metadata_statistics(const parquet::ParquetFileReader *reader, const int row_group_index, const int column_index, const int16_t *def_levels, const int64_t num_levels, const parquet::ColumnDescriptor *parquet_column_descriptor)
int getParquetColumnIndex(const int column_id) const
std::shared_ptr< ParquetEncoder > create_parquet_array_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const bool is_metadata_scan, const bool is_for_import, const bool is_for_detect, const bool geo_validate_geometry)
UniqueReaderPtr open_parquet_table(const std::string &file_path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
SQLTypeInfo suggest_timestamp_mapping(const parquet::ColumnDescriptor *parquet_column)
std::shared_ptr< ParquetEncoder > create_parquet_time_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
std::shared_ptr< ParquetEncoder > create_parquet_date_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
int64_t max_row_group_size
void throw_missing_metadata_error(const int row_group_index, const int column_index, const std::string &file_path)
bool validate_date_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
void set_definition_levels_for_zero_max_definition_level_case(const parquet::ColumnDescriptor *parquet_column_descriptor, std::vector< int16_t > &def_levels)
const parquet::ColumnDescriptor * get_column_descriptor(const parquet::arrow::FileReader *reader, const int logical_column_index)
ParquetImportEncoder * import_encoder
SQLTypeInfo suggest_string_mapping(const parquet::ColumnDescriptor *parquet_column)
std::list< std::unique_ptr< ChunkMetadata > > loadChunk(const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary=nullptr, RejectedRowIndices *rejected_row_indices=nullptr)
future< Result > async(Fn &&fn, Args &&...args)
bool is_fixlen_array() const
std::set< int64_t > InvalidRowGroupIndices
std::shared_ptr< ParquetEncoder > create_parquet_integral_encoder_with_omnisci_type(AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size, const int bit_width, const bool is_signed)
Create an integral Parquet encoder for the given types.
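Assuming this factory follows the same <V, NullType> template convention as the decimal factory (template parameters are not shown in this index), a call for a 32-bit signed integer column might look like:

// Hedged sketch: encoder for a 32-bit OmniSci integer backed by a
// signed Parquet INT32; `buffer` is supplied by the caller.
auto encoder = create_parquet_integral_encoder_with_omnisci_type<int32_t, int32_t>(
    buffer,
    /*omnisci_data_type_byte_size=*/sizeof(int32_t),
    /*parquet_data_type_byte_size=*/sizeof(int32_t),
    /*bit_width=*/32,
    /*is_signed=*/true);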
void validate_allowed_mapping(const parquet::ColumnDescriptor *parquet_column, const ColumnDescriptor *omnisci_column)
void validate_number_of_columns(const std::shared_ptr< parquet::FileMetaData > &file_metadata, const std::string &file_path, const ForeignTableSchema &schema)
std::vector< std::future< void > > create_futures_for_workers(const Container &items, size_t max_threads, std::function< void(const Container &)> lambda)
SQLTypeInfo suggest_date_mapping(const parquet::ColumnDescriptor *parquet_column)
const ReaderPtr getOrInsert(const std::string &path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
void readAndValidateRowGroup()
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
SQLTypeInfo suggest_floating_point_mapping(const parquet::ColumnDescriptor *parquet_column)
An AbstractBuffer is a unit of data management for a data manager.
bool validate_timestamp_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
Specifies the in-memory content of a row in the column metadata table.
std::pair< size_t, size_t > loadRowGroups(const RowGroupInterval &row_group_interval, const std::map< int, Chunk_NS::Chunk > &chunks, const ForeignTableSchema &schema, const std::map< int, StringDictionary * > &column_dictionaries, const int num_threads=1)
Load row groups of data into the given chunks.
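A hedged usage sketch; the RowGroupInterval field layout and the meaning of the returned pair (rows completed, rows rejected) are assumptions, and `file_system`, `file_reader_cache`, `foreign_table`, `chunks_by_column_id`, `schema`, and `column_dictionaries` stand in for caller state:

// Load row groups 0 through 3 of one file into pre-created chunks.
LazyParquetChunkLoader loader(file_system, file_reader_cache, foreign_table);
RowGroupInterval interval{"/data/part-0.parquet", /*start_index=*/0, /*end_index=*/3};
auto [rows_completed, rows_rejected] = loader.loadRowGroups(
    interval, chunks_by_column_id, schema, column_dictionaries, /*num_threads=*/4);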
parquet::arrow::FileReader * ReaderPtr
const std::list< const ColumnDescriptor * > & getLogicalAndPhysicalColumns() const
std::shared_ptr< ParquetEncoder > create_parquet_string_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, const Chunk_NS::Chunk &chunk, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, bool is_for_import, const bool is_for_detect)
int get_precision() const
int numLogicalColumns() const
bool validate_geospatial_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::shared_ptr< ParquetEncoder > create_parquet_decimal_encoder_with_omnisci_type(const ColumnDescriptor *column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor, AbstractBuffer *buffer)
void set_comp_param(int p)
std::list< RowGroupMetadata > metadataScan(const std::vector< std::string > &file_paths, const ForeignTableSchema &schema, const bool do_metadata_stats_validation=true)
Perform a metadata scan over the specified file paths.
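A hedged usage sketch reusing the hypothetical `loader` and `schema` from the sketch above; do_metadata_stats_validation is left at its default of true:

// Collect row-group metadata for two files in one pass.
std::list<RowGroupMetadata> row_group_metadata = loader.metadataScan(
    {"/data/part-0.parquet", "/data/part-1.parquet"}, schema);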
const ForeignTable * foreign_table_
int64_t max_row_group_index
SQLTypeInfo suggest_integral_mapping(const parquet::ColumnDescriptor *parquet_column)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
std::vector< int8_t > values
static constexpr const char * GEO_VALIDATE_GEOMETRY_KEY
void eraseInvalidRowGroupData(const InvalidRowGroupIndices &invalid_indices)
DEVICE auto lower_bound(ARGS &&...args)
HOST DEVICE EncodingType get_compression() const
AbstractBuffer * getBuffer() const
const std::list< const ColumnDescriptor * > & getLogicalColumns() const
std::shared_ptr< ParquetEncoder > create_parquet_encoder_for_import(std::list< Chunk_NS::Chunk > &chunks, const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, StringDictionary *string_dictionary, const bool geo_validate_geometry)
bool validate_decimal_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
virtual void validateAndAppendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values, const SQLTypeInfo &column_type, InvalidRowGroupIndices &invalid_indices)=0
DEVICE void iota(ARGS &&...args)
static const int batch_reader_num_elements
std::list< RowGroupMetadata > metadata_scan_rowgroup_interval(const std::map< int, std::shared_ptr< ParquetEncoder >> &encoder_map, const RowGroupInterval &row_group_interval, const ReaderPtr &reader, const ForeignTableSchema &schema)
const ReaderPtr insert(const std::string &path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
HOST DEVICE int get_dimension() const
std::shared_ptr< ParquetEncoder > create_parquet_date_from_timestamp_encoder_with_types(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
bool validate_none_type_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::map< int, std::shared_ptr< ParquetEncoder > > populate_encoder_map_for_import(const std::map< int, Chunk_NS::Chunk > chunks, const ForeignTableSchema &schema, const ReaderPtr &reader, const std::map< int, StringDictionary * > column_dictionaries, const int64_t num_rows, const bool geo_validate_geometry)
std::string get_type_name() const
void initializeIfEmpty(const std::string &path)
virtual void eraseInvalidIndicesInBuffer(const InvalidRowGroupIndices &invalid_indices)=0
std::set< int64_t > RejectedRowIndices
DataPreview previewFiles(const std::vector< std::string > &files, const size_t max_num_rows, const ForeignTable &table)
Preview rows of data and column types in a set of files.
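A hedged usage sketch with the same hypothetical `loader` and `foreign_table` pointer as in the earlier sketches:

// Sample up to 100 rows per file to suggest column types.
std::vector<std::string> file_paths{"/data/part-0.parquet", "/data/part-1.parquet"};
DataPreview preview = loader.previewFiles(file_paths, /*max_num_rows=*/100, *foreign_table);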
std::vector< std::unique_ptr< TypedParquetDetectBuffer > > detect_buffers
std::vector< std::unique_ptr< RejectedRowIndices > > rejected_row_indices_per_column
std::shared_ptr< ParquetEncoder > create_parquet_floating_point_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
HOST DEVICE int get_comp_param() const
bool is_millisecond_precision(const ColumnDescriptor *omnisci_column)
ParquetRowGroupReader(std::shared_ptr< parquet::ColumnReader > col_reader, const ColumnDescriptor *column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor, ParquetEncoder *encoder, InvalidRowGroupIndices &invalid_indices, const int row_group_index, const int parquet_column_index, const parquet::ParquetFileReader *parquet_reader)
FileReaderMap * file_reader_cache_
std::shared_ptr< ParquetEncoder > create_parquet_timestamp_encoder_with_types(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
#define DEBUG_TIMER(name)
bool is_valid_parquet_string(const parquet::ColumnDescriptor *parquet_column)
std::list< std::unique_ptr< ChunkMetadata > > appendRowGroups(const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, const ColumnDescriptor *column_descriptor, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, RejectedRowIndices *rejected_row_indices, const bool is_for_detect=false, const std::optional< int64_t > max_levels_read=std::nullopt)
std::shared_ptr< ParquetEncoder > create_parquet_date_from_timestamp_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
std::map< int, std::shared_ptr< ParquetEncoder > > populate_encoder_map_for_metadata_scan(const Interval< ColumnType > &column_interval, const ForeignTableSchema &schema, const ReaderPtr &reader, const bool do_metadata_stats_validation, const bool geo_validate_geometry)
LazyParquetChunkLoader(std::shared_ptr< arrow::fs::FileSystem > file_system, FileReaderMap *file_reader_cache, const ForeignTable *foreign_table)
void validate_max_repetition_and_definition_level(const ColumnDescriptor *omnisci_column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor)
std::shared_ptr< ParquetEncoder > create_parquet_decimal_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
std::shared_ptr< ParquetEncoder > create_parquet_integral_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
const int parquet_column_index_
SQLTypeInfo suggest_boolean_type_mapping(const parquet::ColumnDescriptor *parquet_column)
bool is_microsecond_precision(const ColumnDescriptor *omnisci_column)
std::shared_ptr< arrow::fs::FileSystem > file_system_
void resize_values_buffer(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::vector< int8_t > &values)
bool validate_floating_point_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
const ColumnDescriptor * getLogicalColumn(const int column_id) const
std::vector< int16_t > def_levels
MaxRowGroupSizeStats validate_parquet_metadata(const std::shared_ptr< parquet::FileMetaData > &file_metadata, const std::string &file_path, const ForeignTableSchema &schema, const bool do_metadata_stats_validation)
void throw_row_group_larger_than_fragment_size_error(const MaxRowGroupSizeStats max_row_group_stats, const int fragment_size)
std::vector< int16_t > rep_levels
SQLTypeInfo suggest_time_mapping(const parquet::ColumnDescriptor *parquet_column)
std::shared_ptr< ParquetEncoder > create_parquet_time_encoder_with_types(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
bool getOptionAsBool(const std::string_view &key) const
bool validate_string_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
const ColumnDescriptor * column_descriptor_
const ColumnDescriptor * getColumnDescriptor(const int column_id) const
void set_precision(int d)
std::shared_ptr< ParquetEncoder > create_parquet_encoder_for_metadata_scan(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, const bool geo_validate_geometry)
static SQLTypeInfo suggestColumnMapping(const parquet::ColumnDescriptor *parquet_column)
bool within_range(int64_t lower_bound, int64_t upper_bound, int64_t value)
HOST DEVICE void set_type(SQLTypes t)