LazyParquetChunkLoader.cpp

/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "LazyParquetChunkLoader.h"

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/column_scanner.h>
#include <parquet/exception.h>
#include <parquet/platform.h>
#include <parquet/statistics.h>
#include <parquet/types.h>

#include "FsiChunkUtils.h"
#include "ParquetDecimalEncoder.h"
#include "ParquetStringEncoder.h"
#include "ParquetTimeEncoder.h"
#include "Shared/measure.h"
#include "Shared/misc.h"

namespace foreign_storage {

namespace {

bool within_range(int64_t lower_bound, int64_t upper_bound, int64_t value) {
  return value >= lower_bound && value <= upper_bound;
}

bool is_valid_parquet_string(const parquet::ColumnDescriptor* parquet_column) {
  return (parquet_column->logical_type()->is_none() &&
          parquet_column->physical_type() == parquet::Type::BYTE_ARRAY) ||
         parquet_column->logical_type()->is_string();
}

bool is_valid_parquet_list_column(const parquet::ColumnDescriptor* parquet_column) {
  const parquet::schema::Node* node = parquet_column->schema_node().get();
  // Ensure that the innermost node is named "element" (as required by the
  // Parquet specification) or "item" (which testing shows pyarrow generates
  // instead), and that the field is either required or optional.
  if ((node->name() != "element" && node->name() != "item") ||
      !(node->is_required() || node->is_optional())) {
    return false;
  }
  node = node->parent();
  if (!node) {  // required nested structure
    return false;
  }
  // Ensure that the second innermost node is a repeated group named "list",
  // as required by the Parquet specification.
  if (node->name() != "list" || !node->is_repeated() || !node->is_group()) {
    return false;
  }
  node = node->parent();
  if (!node) {  // required nested structure
    return false;
  }
  // Ensure that the third outermost node has the logical type LIST and is
  // either optional or required, as required by the Parquet specification.
  if (!node->logical_type()->is_list() ||
      !(node->is_optional() || node->is_required())) {
    return false;
  }
  // This must now be the root node of the schema, which is required by FSI
  // (lists cannot be embedded in a deeper nested structure).
  node = node->parent();
  if (!node) {  // required nested structure
    return false;
  }
  node = node->parent();
  if (node) {  // implies the previous node was not the root node
    return false;
  }
  return true;
}
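
// For reference, a list column accepted by is_valid_parquet_list_column above
// follows the three-level LIST structure from the Parquet specification
// (illustrative schema; pyarrow may name the innermost field "item" instead
// of "element"):
//
//   optional group ints (LIST) {
//     repeated group list {
//       optional int32 element;
//     }
//   }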

template <typename V, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_decimal_encoder_with_omnisci_type(
    const ColumnDescriptor* column_descriptor,
    const parquet::ColumnDescriptor* parquet_column_descriptor,
    AbstractBuffer* buffer) {
  switch (parquet_column_descriptor->physical_type()) {
    case parquet::Type::INT32:
      return std::make_shared<ParquetDecimalEncoder<V, int32_t, NullType>>(
          buffer, column_descriptor, parquet_column_descriptor);
    case parquet::Type::INT64:
      return std::make_shared<ParquetDecimalEncoder<V, int64_t, NullType>>(
          buffer, column_descriptor, parquet_column_descriptor);
    case parquet::Type::FIXED_LEN_BYTE_ARRAY:
      return std::make_shared<
          ParquetDecimalEncoder<V, parquet::FixedLenByteArray, NullType>>(
          buffer, column_descriptor, parquet_column_descriptor);
    case parquet::Type::BYTE_ARRAY:
      return std::make_shared<ParquetDecimalEncoder<V, parquet::ByteArray, NullType>>(
          buffer, column_descriptor, parquet_column_descriptor);
    default:
      UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_decimal_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  if (parquet_column->logical_type()->is_decimal()) {
    if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
      return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int64_t>(
          omnisci_column, parquet_column, buffer);
    }
    CHECK(omnisci_column->columnType.get_compression() == kENCODING_FIXED);
    if (is_metadata_scan_or_for_import) {
      switch (omnisci_column->columnType.get_comp_param()) {
        case 16:
          return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int16_t>(
              omnisci_column, parquet_column, buffer);
        case 32:
          return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        default:
          UNREACHABLE();
      }
    } else {
      switch (omnisci_column->columnType.get_comp_param()) {
        case 16:
          return create_parquet_decimal_encoder_with_omnisci_type<int16_t, int16_t>(
              omnisci_column, parquet_column, buffer);
        case 32:
          return create_parquet_decimal_encoder_with_omnisci_type<int32_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        default:
          UNREACHABLE();
      }
    }
  }
  return {};
}
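
// Illustrative example of the decimal dispatch above: a Parquet column
// declared as `optional int64 price (DECIMAL(12, 2))` mapped to a HeavyDB
// DECIMAL(12, 2) column with no encoding resolves to a
// ParquetDecimalEncoder<int64_t, int64_t, int64_t>, while the same column
// with FIXED(32) encoding (outside of a metadata scan or import) resolves to
// a ParquetDecimalEncoder<int32_t, int64_t, int32_t>.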

template <typename V, typename T, typename U, typename NullType>
std::shared_ptr<ParquetEncoder>
create_parquet_signed_or_unsigned_integral_encoder_with_types(
    AbstractBuffer* buffer,
    const size_t omnisci_data_type_byte_size,
    const size_t parquet_data_type_byte_size,
    const bool is_signed) {
  CHECK(sizeof(NullType) == omnisci_data_type_byte_size);
  if (is_signed) {
    return std::make_shared<ParquetFixedLengthEncoder<V, T, NullType>>(
        buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size);
  } else {
    return std::make_shared<ParquetUnsignedFixedLengthEncoder<V, T, U, NullType>>(
        buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size);
  }
}
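
// A note on the unsigned path above: Parquet transports unsigned values in a
// signed physical type, so the intermediate unsigned type U is what allows,
// for example, an INT(8, false) value of 255 to be interpreted as 255 rather
// than -1 before it is widened into the HeavyDB type V.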

template <typename V, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_integral_encoder_with_omnisci_type(
    AbstractBuffer* buffer,
    const size_t omnisci_data_type_byte_size,
    const size_t parquet_data_type_byte_size,
    const int bit_width,
    const bool is_signed) {
  switch (bit_width) {
    case 8:
      return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
                                                                           int32_t,
                                                                           uint8_t,
                                                                           NullType>(
          buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
    case 16:
      return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
                                                                           int32_t,
                                                                           uint16_t,
                                                                           NullType>(
          buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
    case 32:
      return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
                                                                           int32_t,
                                                                           uint32_t,
                                                                           NullType>(
          buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
    case 64:
      return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
                                                                           int64_t,
                                                                           uint64_t,
                                                                           NullType>(
          buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
    default:
      UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_integral_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  auto physical_type = parquet_column->physical_type();

  int bit_width = -1;
  bool is_signed = false;
  // handle the integral case with no Parquet annotation
  if (parquet_column->logical_type()->is_none() && column_type.is_integer()) {
    if (physical_type == parquet::Type::INT32) {
      bit_width = 32;
    } else if (physical_type == parquet::Type::INT64) {
      bit_width = 64;
    } else {
      UNREACHABLE();
    }
    is_signed = true;
  }
  // handle the integral case with a Parquet annotation
  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
          parquet_column->logical_type().get())) {
    bit_width = int_logical_column->bit_width();
    is_signed = int_logical_column->is_signed();
  }

  if (bit_width == -1) {  // no valid logical type (with or without annotation) found
    return {};
  }

  const size_t omnisci_data_type_byte_size = column_type.get_size();
  const size_t parquet_data_type_byte_size = parquet::GetTypeByteSize(physical_type);

  switch (omnisci_data_type_byte_size) {
    case 8:
      CHECK(column_type.get_compression() == kENCODING_NONE);
      return create_parquet_integral_encoder_with_omnisci_type<int64_t, int64_t>(
          buffer,
          omnisci_data_type_byte_size,
          parquet_data_type_byte_size,
          bit_width,
          is_signed);
    case 4:
      if (is_metadata_scan_or_for_import && column_type.get_type() == kBIGINT) {
        return create_parquet_integral_encoder_with_omnisci_type<int64_t, int32_t>(
            buffer,
            omnisci_data_type_byte_size,
            parquet_data_type_byte_size,
            bit_width,
            is_signed);
      }
      return create_parquet_integral_encoder_with_omnisci_type<int32_t, int32_t>(
          buffer,
          omnisci_data_type_byte_size,
          parquet_data_type_byte_size,
          bit_width,
          is_signed);
    case 2:
      if (is_metadata_scan_or_for_import) {
        switch (column_type.get_type()) {
          case kBIGINT:
            return create_parquet_integral_encoder_with_omnisci_type<int64_t, int16_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kINT:
            return create_parquet_integral_encoder_with_omnisci_type<int32_t, int16_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kSMALLINT:
            break;
          default:
            UNREACHABLE();
        }
      }
      return create_parquet_integral_encoder_with_omnisci_type<int16_t, int16_t>(
          buffer,
          omnisci_data_type_byte_size,
          parquet_data_type_byte_size,
          bit_width,
          is_signed);
    case 1:
      if (is_metadata_scan_or_for_import) {
        switch (column_type.get_type()) {
          case kBIGINT:
            return create_parquet_integral_encoder_with_omnisci_type<int64_t, int8_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kINT:
            return create_parquet_integral_encoder_with_omnisci_type<int32_t, int8_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kSMALLINT:
            return create_parquet_integral_encoder_with_omnisci_type<int16_t, int8_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kTINYINT:
            break;
          default:
            UNREACHABLE();
        }
      }
      return create_parquet_integral_encoder_with_omnisci_type<int8_t, int8_t>(
          buffer,
          omnisci_data_type_byte_size,
          parquet_data_type_byte_size,
          bit_width,
          is_signed);
    default:
      UNREACHABLE();
  }
  return {};
}
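
// Illustrative examples of the integral dispatch above: an unannotated
// Parquet INT64 column mapped to a HeavyDB BIGINT resolves bit_width = 64 and
// is_signed = true, yielding a ParquetFixedLengthEncoder<int64_t, int64_t,
// int64_t>; an annotated INT(8, false) column mapped to a HeavyDB SMALLINT
// takes the 2-byte case with bit_width = 8, yielding an unsigned encoder with
// uint8_t as the intermediate type.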

std::shared_ptr<ParquetEncoder> create_parquet_floating_point_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer) {
  auto column_type = omnisci_column->columnType;
  if (!column_type.is_fp()) {
    return {};
  }
  CHECK_EQ(column_type.get_compression(), kENCODING_NONE);
  switch (column_type.get_type()) {
    case kFLOAT:
      switch (parquet_column->physical_type()) {
        case parquet::Type::FLOAT:
          return std::make_shared<ParquetFixedLengthEncoder<float, float>>(
              buffer, omnisci_column, parquet_column);
        case parquet::Type::DOUBLE:
          return std::make_shared<ParquetFixedLengthEncoder<float, double>>(
              buffer, omnisci_column, parquet_column);
        default:
          UNREACHABLE();
      }
    case kDOUBLE:
      CHECK(parquet_column->physical_type() == parquet::Type::DOUBLE);
      return std::make_shared<ParquetFixedLengthEncoder<double, double>>(
          buffer, omnisci_column, parquet_column);
    default:
      UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_none_type_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer) {
  auto column_type = omnisci_column->columnType;
  if (parquet_column->logical_type()->is_none() &&
      !omnisci_column->columnType.is_string()) {  // boolean
    if (column_type.get_compression() == kENCODING_NONE) {
      switch (column_type.get_type()) {
        case kBOOLEAN:
          return std::make_shared<ParquetFixedLengthEncoder<int8_t, bool>>(
              buffer, omnisci_column, parquet_column);
        default:
          UNREACHABLE();
      }
    } else {
      UNREACHABLE();
    }
  }
  return {};
}

template <typename V, typename T, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_timestamp_encoder_with_types(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer) {
  if (auto timestamp_logical_type = dynamic_cast<const parquet::TimestampLogicalType*>(
          parquet_column->logical_type().get())) {
    switch (timestamp_logical_type->time_unit()) {
      case parquet::LogicalType::TimeUnit::MILLIS:
        return std::make_shared<ParquetTimestampEncoder<V, T, 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::MICROS:
        return std::make_shared<ParquetTimestampEncoder<V, T, 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::NANOS:
        return std::make_shared<
            ParquetTimestampEncoder<V, T, 1000L * 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      default:
        UNREACHABLE();
    }
  } else {
    UNREACHABLE();
  }
  return {};
}
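
// The integral template parameter above is the conversion divisor applied to
// each raw Parquet value, reducing MILLIS / MICROS / NANOS timestamps to the
// seconds resolution stored by a TIMESTAMP(0) column; for example, the MILLIS
// value 1700000000123 is stored as 1700000000.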

template <typename V, typename T, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_date_from_timestamp_encoder_with_types(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  if (auto timestamp_logical_type = dynamic_cast<const parquet::TimestampLogicalType*>(
          parquet_column->logical_type().get())) {
    switch (timestamp_logical_type->time_unit()) {
      case parquet::LogicalType::TimeUnit::MILLIS:
        if (is_metadata_scan_or_for_import) {
          return std::make_shared<
              ParquetDateInSecondsFromTimestampEncoder<V, T, 1000L, NullType>>(
              buffer, omnisci_column, parquet_column);
        }
        return std::make_shared<
            ParquetDateInDaysFromTimestampEncoder<V, T, 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::MICROS:
        if (is_metadata_scan_or_for_import) {
          return std::make_shared<
              ParquetDateInSecondsFromTimestampEncoder<V, T, 1000L * 1000L, NullType>>(
              buffer, omnisci_column, parquet_column);
        }
        return std::make_shared<
            ParquetDateInDaysFromTimestampEncoder<V, T, 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::NANOS:
        if (is_metadata_scan_or_for_import) {
          return std::make_shared<
              ParquetDateInSecondsFromTimestampEncoder<V,
                                                       T,
                                                       1000L * 1000L * 1000L,
                                                       NullType>>(
              buffer, omnisci_column, parquet_column);
        }
        return std::make_shared<
            ParquetDateInDaysFromTimestampEncoder<V, T, 1000L * 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      default:
        UNREACHABLE();
    }
  } else {
    UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_timestamp_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  auto precision = column_type.get_precision();
  if (parquet_column->logical_type()->is_timestamp()) {
    if (column_type.get_compression() == kENCODING_NONE) {
      if (precision == 0) {
        return create_parquet_timestamp_encoder_with_types<int64_t, int64_t, int64_t>(
            omnisci_column, parquet_column, buffer);
      } else {
        return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int64_t>>(
            buffer, omnisci_column, parquet_column);
      }
    } else if (column_type.get_compression() == kENCODING_FIXED) {
      CHECK(column_type.get_comp_param() == 32);
      if (is_metadata_scan_or_for_import) {
        return create_parquet_timestamp_encoder_with_types<int64_t, int64_t, int32_t>(
            omnisci_column, parquet_column, buffer);
      } else {
        return create_parquet_timestamp_encoder_with_types<int32_t, int64_t, int32_t>(
            omnisci_column, parquet_column, buffer);
      }
    }
  } else if (parquet_column->logical_type()->is_none() && column_type.is_timestamp()) {
    if (parquet_column->physical_type() == parquet::Type::INT32) {
      CHECK(column_type.get_compression() == kENCODING_FIXED &&
            column_type.get_comp_param() == 32);
      if (is_metadata_scan_or_for_import) {
        return std::make_shared<ParquetFixedLengthEncoder<int64_t, int32_t, int32_t>>(
            buffer, omnisci_column, parquet_column);
      } else {
        return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t, int32_t>>(
            buffer, omnisci_column, parquet_column);
      }
    } else if (parquet_column->physical_type() == parquet::Type::INT64) {
      if (column_type.get_compression() == kENCODING_NONE) {
        return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int64_t>>(
            buffer, omnisci_column, parquet_column);
      } else if (column_type.get_compression() == kENCODING_FIXED) {
        CHECK(column_type.get_comp_param() == 32);
        if (is_metadata_scan_or_for_import) {
          return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        } else {
          return std::make_shared<ParquetFixedLengthEncoder<int32_t, int64_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        }
      }
    } else {
      UNREACHABLE();
    }
  }
  return {};
}
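
// Illustrative example: a TIMESTAMP(0) ENCODING FIXED(32) column fed from a
// Parquet TIMESTAMP(MILLIS) column decodes into int64_t with an int32_t null
// type during a metadata scan or import, and narrows to int32_t on a regular
// chunk load; in both cases the raw millisecond values are divided by 1000.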

template <typename V, typename T, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_time_encoder_with_types(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer) {
  if (auto time_logical_type = dynamic_cast<const parquet::TimeLogicalType*>(
          parquet_column->logical_type().get())) {
    switch (time_logical_type->time_unit()) {
      case parquet::LogicalType::TimeUnit::MILLIS:
        return std::make_shared<ParquetTimeEncoder<V, T, 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::MICROS:
        return std::make_shared<ParquetTimeEncoder<V, T, 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::NANOS:
        return std::make_shared<
            ParquetTimeEncoder<V, T, 1000L * 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      default:
        UNREACHABLE();
    }
  } else {
    UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_time_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  if (auto time_logical_column = dynamic_cast<const parquet::TimeLogicalType*>(
          parquet_column->logical_type().get())) {
    if (column_type.get_compression() == kENCODING_NONE) {
      if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
        return create_parquet_time_encoder_with_types<int64_t, int32_t, int64_t>(
            omnisci_column, parquet_column, buffer);
      } else {
        return create_parquet_time_encoder_with_types<int64_t, int64_t, int64_t>(
            omnisci_column, parquet_column, buffer);
      }
    } else if (column_type.get_compression() == kENCODING_FIXED) {
      if (is_metadata_scan_or_for_import) {
        if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
          CHECK(parquet_column->physical_type() == parquet::Type::INT32);
          return create_parquet_time_encoder_with_types<int64_t, int32_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        } else {
          CHECK(time_logical_column->time_unit() ==
                    parquet::LogicalType::TimeUnit::MICROS ||
                time_logical_column->time_unit() ==
                    parquet::LogicalType::TimeUnit::NANOS);
          CHECK(parquet_column->physical_type() == parquet::Type::INT64);
          return create_parquet_time_encoder_with_types<int64_t, int64_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        }
      } else {
        if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
          CHECK(parquet_column->physical_type() == parquet::Type::INT32);
          return create_parquet_time_encoder_with_types<int32_t, int32_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        } else {
          CHECK(time_logical_column->time_unit() ==
                    parquet::LogicalType::TimeUnit::MICROS ||
                time_logical_column->time_unit() ==
                    parquet::LogicalType::TimeUnit::NANOS);
          CHECK(parquet_column->physical_type() == parquet::Type::INT64);
          return create_parquet_time_encoder_with_types<int32_t, int64_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        }
      }
    } else {
      UNREACHABLE();
    }
  }
  return {};
}
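
// Note the physical-type pairing asserted by the CHECKs above: TIME(MILLIS)
// columns use the INT32 physical type while TIME(MICROS) and TIME(NANOS) use
// INT64, so the Parquet-side template parameter is int32_t only in the MILLIS
// case.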

std::shared_ptr<ParquetEncoder> create_parquet_date_from_timestamp_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  if (parquet_column->logical_type()->is_timestamp() && column_type.is_date()) {
    CHECK(column_type.get_compression() == kENCODING_DATE_IN_DAYS);
    if (is_metadata_scan_or_for_import) {
      if (column_type.get_comp_param() ==
          0) {  // DATE ENCODING FIXED (32) uses comp param 0
        return create_parquet_date_from_timestamp_encoder_with_types<int64_t,
                                                                     int64_t,
                                                                     int32_t>(
            omnisci_column, parquet_column, buffer, true);
      } else if (column_type.get_comp_param() == 16) {
        return create_parquet_date_from_timestamp_encoder_with_types<int64_t,
                                                                     int64_t,
                                                                     int16_t>(
            omnisci_column, parquet_column, buffer, true);
      } else {
        UNREACHABLE();
      }
    } else {
      if (column_type.get_comp_param() ==
          0) {  // DATE ENCODING FIXED (32) uses comp param 0
        return create_parquet_date_from_timestamp_encoder_with_types<int32_t,
                                                                     int64_t,
                                                                     int32_t>(
            omnisci_column, parquet_column, buffer, false);
      } else if (column_type.get_comp_param() == 16) {
        return create_parquet_date_from_timestamp_encoder_with_types<int16_t,
                                                                     int64_t,
                                                                     int16_t>(
            omnisci_column, parquet_column, buffer, false);
      } else {
        UNREACHABLE();
      }
    }
  }
  return {};
}
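
// Illustrative arithmetic for the TIMESTAMP -> DATE coercion above (assuming
// the usual 86400 seconds per day): a Parquet TIMESTAMP(MILLIS) value of
// 1700000000123 is first reduced to 1700000000 seconds and then, for a
// day-encoded chunk load, stored as 1700000000 / 86400 = 19675 days since the
// epoch.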

std::shared_ptr<ParquetEncoder> create_parquet_date_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  if (parquet_column->logical_type()->is_date() && column_type.is_date()) {
    if (column_type.get_compression() == kENCODING_DATE_IN_DAYS) {
      if (is_metadata_scan_or_for_import) {
        if (column_type.get_comp_param() ==
            0) {  // DATE ENCODING FIXED (32) uses comp param 0
          return std::make_shared<ParquetDateInSecondsEncoder</*NullType=*/int32_t>>(
              buffer);
        } else if (column_type.get_comp_param() == 16) {
          return std::make_shared<ParquetDateInSecondsEncoder</*NullType=*/int16_t>>(
              buffer);
        } else {
          UNREACHABLE();
        }
      } else {
        if (column_type.get_comp_param() ==
            0) {  // DATE ENCODING FIXED (32) uses comp param 0
          return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        } else if (column_type.get_comp_param() == 16) {
          return std::make_shared<ParquetFixedLengthEncoder<int16_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        } else {
          UNREACHABLE();
        }
      }
    } else if (column_type.get_compression() == kENCODING_NONE) {  // for array types
      return std::make_shared<ParquetDateInSecondsEncoder</*NullType=*/int64_t>>(
          buffer, omnisci_column, parquet_column);
    } else {
      UNREACHABLE();
    }
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_string_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    const Chunk_NS::Chunk& chunk,
    StringDictionary* string_dictionary,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    bool is_for_import,
    const bool is_for_detect) {
  auto column_type = omnisci_column->columnType;
  if (!is_valid_parquet_string(parquet_column) ||
      !omnisci_column->columnType.is_string()) {
    return {};
  }
  if (column_type.get_compression() == kENCODING_NONE) {
    if (is_for_import) {
      return std::make_shared<ParquetStringImportEncoder>(chunk.getBuffer());
    } else {
      return std::make_shared<ParquetStringNoneEncoder>(chunk.getBuffer(),
                                                        chunk.getIndexBuf());
    }
  } else if (column_type.get_compression() == kENCODING_DICT) {
    if (!is_for_detect) {  // non-detect use case
      chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
      std::unique_ptr<ChunkMetadata>& logical_chunk_metadata = chunk_metadata.back();
      logical_chunk_metadata->sqlType = omnisci_column->columnType;
      switch (column_type.get_size()) {
        case 1:
          return std::make_shared<ParquetStringEncoder<uint8_t>>(
              chunk.getBuffer(),
              string_dictionary,
              is_for_import ? nullptr : logical_chunk_metadata.get());
        case 2:
          return std::make_shared<ParquetStringEncoder<uint16_t>>(
              chunk.getBuffer(),
              string_dictionary,
              is_for_import ? nullptr : logical_chunk_metadata.get());
        case 4:
          return std::make_shared<ParquetStringEncoder<int32_t>>(
              chunk.getBuffer(),
              string_dictionary,
              is_for_import ? nullptr : logical_chunk_metadata.get());
        default:
          UNREACHABLE();
      }
    } else {  // detect use case
      return std::make_shared<ParquetDetectStringEncoder>(chunk.getBuffer());
    }
  } else {
    UNREACHABLE();
  }
  return {};
}
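
// Illustrative example of the string dispatch above: a TEXT ENCODING DICT(8)
// column has a 1-byte dictionary id size, so (outside of the detect path)
// each string is translated through the string dictionary by a
// ParquetStringEncoder<uint8_t>; with ENCODING NONE the raw strings are
// appended instead, with an index buffer tracking offsets.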

std::shared_ptr<ParquetEncoder> create_parquet_geospatial_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    std::list<Chunk_NS::Chunk>& chunks,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    const bool is_metadata_scan,
    const bool is_for_import,
    const bool geo_validate_geometry) {
  auto column_type = omnisci_column->columnType;
  if (!is_valid_parquet_string(parquet_column) || !column_type.is_geometry()) {
    return {};
  }
  if (is_for_import) {
    return std::make_shared<ParquetGeospatialImportEncoder>(chunks,
                                                            geo_validate_geometry);
  }
  if (is_metadata_scan) {
    return std::make_shared<ParquetGeospatialEncoder>(geo_validate_geometry);
  }
  for (auto chunks_iter = chunks.begin(); chunks_iter != chunks.end(); ++chunks_iter) {
    chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
    auto& chunk_metadata_ptr = chunk_metadata.back();
    chunk_metadata_ptr->sqlType = chunks_iter->getColumnDesc()->columnType;
  }
  return std::make_shared<ParquetGeospatialEncoder>(
      parquet_column, chunks, chunk_metadata, geo_validate_geometry);
}

// Forward declare `create_parquet_array_encoder`: `create_parquet_encoder` and
// `create_parquet_array_encoder` are mutually recursive, so one of the two
// functions requires a forward declaration.
std::shared_ptr<ParquetEncoder> create_parquet_array_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    std::list<Chunk_NS::Chunk>& chunks,
    StringDictionary* string_dictionary,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    const bool is_metadata_scan,
    const bool is_for_import,
    const bool is_for_detect,
    const bool geo_validate_geometry);

std::shared_ptr<ParquetEncoder> create_parquet_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    std::list<Chunk_NS::Chunk>& chunks,
    StringDictionary* string_dictionary,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    const bool is_metadata_scan,
    const bool is_for_import,
    const bool is_for_detect,
    const bool geo_validate_geometry) {
  CHECK(!(is_metadata_scan && is_for_import));
  auto buffer = chunks.empty() ? nullptr : chunks.begin()->getBuffer();
  if (auto encoder = create_parquet_geospatial_encoder(omnisci_column,
                                                       parquet_column,
                                                       chunks,
                                                       chunk_metadata,
                                                       is_metadata_scan,
                                                       is_for_import,
                                                       geo_validate_geometry)) {
    return encoder;
  }
  if (auto encoder = create_parquet_array_encoder(omnisci_column,
                                                  parquet_column,
                                                  chunks,
                                                  string_dictionary,
                                                  chunk_metadata,
                                                  is_metadata_scan,
                                                  is_for_import,
                                                  is_for_detect,
                                                  geo_validate_geometry)) {
    return encoder;
  }
  if (auto encoder = create_parquet_decimal_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_integral_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder =
          create_parquet_floating_point_encoder(omnisci_column, parquet_column, buffer)) {
    return encoder;
  }
  if (auto encoder = create_parquet_timestamp_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder =
          create_parquet_none_type_encoder(omnisci_column, parquet_column, buffer)) {
    return encoder;
  }
  if (auto encoder = create_parquet_time_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_date_from_timestamp_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_date_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_string_encoder(
          omnisci_column,
          parquet_column,
          chunks.empty() ? Chunk_NS::Chunk{} : *chunks.begin(),
          string_dictionary,
          chunk_metadata,
          is_for_import,
          is_for_detect)) {
    return encoder;
  }
  UNREACHABLE();
  return {};
}
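
// The cascade above tries each encoder family in turn and returns the first
// match; falling through every case means no supported mapping exists, which
// is treated as a precondition violation (mappings are validated separately
// via validate_allowed_mapping), hence the trailing UNREACHABLE().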

std::shared_ptr<ParquetEncoder> create_parquet_encoder_for_import(
    std::list<Chunk_NS::Chunk>& chunks,
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    StringDictionary* string_dictionary,
    const bool geo_validate_geometry) {
  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
  return create_parquet_encoder(omnisci_column,
                                parquet_column,
                                chunks,
                                string_dictionary,
                                chunk_metadata,
                                /*is_metadata_scan=*/false,
                                /*is_for_import=*/true,
                                /*is_for_detect=*/false,
                                geo_validate_geometry);
}

std::shared_ptr<ParquetEncoder> create_parquet_encoder_for_metadata_scan(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    const bool geo_validate_geometry) {
  std::list<Chunk_NS::Chunk> chunks;
  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
  return create_parquet_encoder(omnisci_column,
                                parquet_column,
                                chunks,
                                /*string_dictionary=*/nullptr,
                                chunk_metadata,
                                /*is_metadata_scan=*/true,
                                /*is_for_import=*/false,
                                /*is_for_detect=*/false,
                                geo_validate_geometry);
}

std::shared_ptr<ParquetEncoder> create_parquet_array_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    std::list<Chunk_NS::Chunk>& chunks,
    StringDictionary* string_dictionary,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    const bool is_metadata_scan,
    const bool is_for_import,
    const bool is_for_detect,
    const bool geo_validate_geometry) {
  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column);
  if (!is_valid_parquet_list || !omnisci_column->columnType.is_array()) {
    return {};
  }
  std::unique_ptr<ColumnDescriptor> omnisci_column_sub_type_column =
      get_sub_type_column_descriptor(omnisci_column);
  auto encoder = create_parquet_encoder(omnisci_column_sub_type_column.get(),
                                        parquet_column,
                                        chunks,
                                        string_dictionary,
                                        chunk_metadata,
                                        is_metadata_scan,
                                        is_for_import,
                                        is_for_detect,
                                        geo_validate_geometry);
  CHECK(encoder.get());
  auto scalar_encoder = std::dynamic_pointer_cast<ParquetScalarEncoder>(encoder);
  CHECK(scalar_encoder);
  if (!is_for_import) {
    if (!is_for_detect) {
      if (omnisci_column->columnType.is_fixlen_array()) {
        encoder = std::make_shared<ParquetFixedLengthArrayEncoder>(
            is_metadata_scan ? nullptr : chunks.begin()->getBuffer(),
            scalar_encoder,
            omnisci_column);
      } else {
        encoder = std::make_shared<ParquetVariableLengthArrayEncoder>(
            is_metadata_scan ? nullptr : chunks.begin()->getBuffer(),
            is_metadata_scan ? nullptr : chunks.begin()->getIndexBuf(),
            scalar_encoder,
            omnisci_column);
      }
    } else {  // is_for_detect
      encoder = std::make_shared<ParquetArrayDetectEncoder>(
          chunks.begin()->getBuffer(), scalar_encoder, omnisci_column);
    }
  } else {  // is_for_import
    encoder = std::make_shared<ParquetArrayImportEncoder>(
        chunks.begin()->getBuffer(), scalar_encoder, omnisci_column);
  }
  return encoder;
}
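
// Note that the array path reuses the scalar machinery: create_parquet_encoder
// is invoked on a column descriptor synthesized for the element sub-type, and
// the resulting scalar encoder is wrapped by the array encoder appropriate to
// the fixed-length, variable-length, detect, or import case.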

void validate_list_column_metadata_statistics(
    const parquet::ParquetFileReader* reader,
    const int row_group_index,
    const int column_index,
    const int16_t* def_levels,
    const int64_t num_levels,
    const parquet::ColumnDescriptor* parquet_column_descriptor) {
  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column_descriptor);
  if (!is_valid_parquet_list) {
    return;
  }
  std::unique_ptr<parquet::RowGroupMetaData> group_metadata =
      reader->metadata()->RowGroup(row_group_index);
  auto column_metadata = group_metadata->ColumnChunk(column_index);
  // In the case of an empty row group, do not validate.
  if (group_metadata->num_rows() == 0) {
    return;
  }
  auto stats = validate_and_get_column_metadata_statistics(column_metadata.get());
  if (!stats->HasMinMax()) {
    auto find_it = std::find_if(def_levels,
                                def_levels + num_levels,
                                [](const int16_t def_level) { return def_level == 3; });
    if (find_it != def_levels + num_levels) {
      throw std::runtime_error(
          "No minimum and maximum statistic set in list column but non-null & non-empty "
          "array/value detected.");
    }
  }
}

void set_definition_levels_for_zero_max_definition_level_case(
    const parquet::ColumnDescriptor* parquet_column_descriptor,
    std::vector<int16_t>& def_levels) {
  if (!is_valid_parquet_list_column(parquet_column_descriptor) &&
      parquet_column_descriptor->max_definition_level() == 0) {
    if (!parquet_column_descriptor->schema_node()->is_required()) {
      throw std::runtime_error(
          "Unsupported parquet column detected. Column '" +
          parquet_column_descriptor->path()->ToDotString() +
          "' detected to have max definition level of 0 but is optional.");
    }
    def_levels.assign(def_levels.size(), 1);
  }
}

void validate_max_repetition_and_definition_level(
    const ColumnDescriptor* omnisci_column_descriptor,
    const parquet::ColumnDescriptor* parquet_column_descriptor) {
  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column_descriptor);
  if (is_valid_parquet_list && !omnisci_column_descriptor->columnType.is_array()) {
    throw std::runtime_error(
        "Unsupported mapping detected. Column '" +
        parquet_column_descriptor->path()->ToDotString() +
        "' detected to be a parquet list but HeavyDB mapped column '" +
        omnisci_column_descriptor->columnName + "' is not an array.");
  }
  if (is_valid_parquet_list) {
    if (parquet_column_descriptor->max_repetition_level() != 1 ||
        parquet_column_descriptor->max_definition_level() != 3) {
      throw std::runtime_error(
          "Incorrect schema max repetition level detected in column '" +
          parquet_column_descriptor->path()->ToDotString() +
          "'. Expected a max repetition level of 1 and max definition level of 3 for "
          "list column but column has a max repetition level of " +
          std::to_string(parquet_column_descriptor->max_repetition_level()) +
          " and a max definition level of " +
          std::to_string(parquet_column_descriptor->max_definition_level()) + ".");
    }
  } else {
    if (parquet_column_descriptor->max_repetition_level() != 0 ||
        !(parquet_column_descriptor->max_definition_level() == 1 ||
          parquet_column_descriptor->max_definition_level() == 0)) {
      throw std::runtime_error(
          "Incorrect schema max repetition level detected in column '" +
          parquet_column_descriptor->path()->ToDotString() +
          "'. Expected a max repetition level of 0 and max definition level of 1 or 0 "
          "for flat column but column has a max repetition level of " +
          std::to_string(parquet_column_descriptor->max_repetition_level()) +
          " and a max definition level of " +
          std::to_string(parquet_column_descriptor->max_definition_level()) + ".");
    }
  }
}
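
// For reference, with the three-level LIST structure accepted by
// is_valid_parquet_list_column, definition levels 0/1/2/3 distinguish a null
// list, an empty list, a null element, and a present element respectively,
// and the single repeated node yields a max repetition level of 1; a flat
// optional column has a max definition level of 1 and a flat required column
// 0, matching the checks above.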

void resize_values_buffer(const ColumnDescriptor* omnisci_column,
                          const parquet::ColumnDescriptor* parquet_column,
                          std::vector<int8_t>& values) {
  auto max_type_byte_size =
      std::max(omnisci_column->columnType.get_size(),
               parquet::GetTypeByteSize(parquet_column->physical_type()));
  size_t values_size =
      LazyParquetChunkLoader::batch_reader_num_elements * max_type_byte_size;
  values.resize(values_size);
}

bool validate_decimal_mapping(const ColumnDescriptor* omnisci_column,
                              const parquet::ColumnDescriptor* parquet_column) {
  if (auto decimal_logical_column = dynamic_cast<const parquet::DecimalLogicalType*>(
          parquet_column->logical_type().get())) {
    return omnisci_column->columnType.get_precision() ==
               decimal_logical_column->precision() &&
           omnisci_column->columnType.get_scale() == decimal_logical_column->scale() &&
           omnisci_column->columnType.is_decimal() &&
           (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
            omnisci_column->columnType.get_compression() == kENCODING_FIXED);
  }
  return false;
}

SQLTypeInfo suggest_decimal_mapping(const parquet::ColumnDescriptor* parquet_column) {
  if (auto decimal_logical_column = dynamic_cast<const parquet::DecimalLogicalType*>(
          parquet_column->logical_type().get())) {
    auto parquet_precision = decimal_logical_column->precision();
    auto parquet_scale = decimal_logical_column->scale();
    if (parquet_precision > sql_constants::kMaxNumericPrecision) {
      throw ForeignStorageException(
          "Parquet column \"" + parquet_column->ToString() +
          "\" has decimal precision of " + std::to_string(parquet_precision) +
          " which is too high to import, maximum precision supported is " +
          std::to_string(sql_constants::kMaxNumericPrecision));
    }
    SQLTypeInfo type;
    type.set_type(kDECIMAL);
    type.set_compression(kENCODING_NONE);
    type.set_precision(parquet_precision);
    type.set_scale(parquet_scale);
    type.set_fixed_size();
    return type;
  }
  UNREACHABLE()
      << " a Parquet column's decimal logical type failed to be read appropriately";
  return {};
}

bool validate_floating_point_mapping(const ColumnDescriptor* omnisci_column,
                                     const parquet::ColumnDescriptor* parquet_column) {
  if (!omnisci_column->columnType.is_fp()) {
    return false;
  }
  // check if mapping is a valid coerced or non-coerced floating point mapping
  // with no annotation (floating point columns have no annotation in the
  // Parquet specification)
  if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
    return (parquet_column->physical_type() == parquet::Type::DOUBLE) ||
           (parquet_column->physical_type() == parquet::Type::FLOAT &&
            omnisci_column->columnType.get_type() == kFLOAT);
  }
  return false;
}

SQLTypeInfo suggest_floating_point_mapping(
    const parquet::ColumnDescriptor* parquet_column) {
  SQLTypeInfo type;
  if (parquet_column->physical_type() == parquet::Type::FLOAT) {
    type.set_type(kFLOAT);
  } else if (parquet_column->physical_type() == parquet::Type::DOUBLE) {
    type.set_type(kDOUBLE);
  } else {
    UNREACHABLE();
  }
  type.set_compression(kENCODING_NONE);
  type.set_fixed_size();
  return type;
}

bool validate_integral_mapping(const ColumnDescriptor* omnisci_column,
                               const parquet::ColumnDescriptor* parquet_column) {
  if (!omnisci_column->columnType.is_integer()) {
    return false;
  }
  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
          parquet_column->logical_type().get())) {
    CHECK(omnisci_column->columnType.get_compression() == kENCODING_NONE ||
          omnisci_column->columnType.get_compression() == kENCODING_FIXED);
    const int bits_per_byte = 8;
    // unsigned types are permitted to map to a wider integral type in order to
    // avoid precision loss
    const int bit_widening_factor = int_logical_column->is_signed() ? 1 : 2;
    return omnisci_column->columnType.get_size() * bits_per_byte <=
           int_logical_column->bit_width() * bit_widening_factor;
  }
  // check if mapping is a valid coerced or non-coerced integral mapping with no
  // annotation
  if ((omnisci_column->columnType.get_compression() == kENCODING_NONE ||
       omnisci_column->columnType.get_compression() == kENCODING_FIXED)) {
    return (parquet_column->physical_type() == parquet::Type::INT64) ||
           (parquet_column->physical_type() == parquet::Type::INT32 &&
            omnisci_column->columnType.get_size() <= 4);
  }
  return false;
}

SQLTypeInfo suggest_integral_mapping(const parquet::ColumnDescriptor* parquet_column) {
  SQLTypeInfo type;
  type.set_compression(kENCODING_NONE);
  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
          parquet_column->logical_type().get())) {
    auto bit_width = int_logical_column->bit_width();
    if (!int_logical_column->is_signed()) {
      if (within_range(33, 64, bit_width)) {
        throw ForeignStorageException(
            "Unsigned integer column \"" + parquet_column->path()->ToDotString() +
            "\" in Parquet file with 64 bit-width has no supported type for ingestion "
            "that will not result in data loss");
      } else if (within_range(17, 32, bit_width)) {
        type.set_type(kBIGINT);
      } else if (within_range(9, 16, bit_width)) {
        type.set_type(kINT);
      } else if (within_range(0, 8, bit_width)) {
        type.set_type(kSMALLINT);
      }
    } else {
      if (within_range(33, 64, bit_width)) {
        type.set_type(kBIGINT);
      } else if (within_range(17, 32, bit_width)) {
        type.set_type(kINT);
      } else if (within_range(9, 16, bit_width)) {
        type.set_type(kSMALLINT);
      } else if (within_range(0, 8, bit_width)) {
        type.set_type(kTINYINT);
      }
    }
    type.set_fixed_size();
    return type;
  }

  CHECK(parquet_column->logical_type()->is_none());
  if (parquet_column->physical_type() == parquet::Type::INT32) {
    type.set_type(kINT);
  } else {
    CHECK(parquet_column->physical_type() == parquet::Type::INT64);
    type.set_type(kBIGINT);
  }
  type.set_fixed_size();
  return type;
}
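
// Illustrative summary of the suggestions above: signed bit-widths map to the
// same-size HeavyDB type (e.g. INT(32, true) -> INT), unsigned bit-widths map
// one size up to preserve range (e.g. INT(32, false) -> BIGINT), and unsigned
// 64-bit columns are rejected outright since no lossless target type exists.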

bool is_nanosecond_precision(const ColumnDescriptor* omnisci_column) {
  return omnisci_column->columnType.get_dimension() == 9;
}

bool is_nanosecond_precision(
    const parquet::TimestampLogicalType* timestamp_logical_column) {
  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::NANOS;
}

bool is_microsecond_precision(const ColumnDescriptor* omnisci_column) {
  return omnisci_column->columnType.get_dimension() == 6;
}

bool is_microsecond_precision(
    const parquet::TimestampLogicalType* timestamp_logical_column) {
  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MICROS;
}

bool is_millisecond_precision(const ColumnDescriptor* omnisci_column) {
  return omnisci_column->columnType.get_dimension() == 3;
}

bool is_millisecond_precision(
    const parquet::TimestampLogicalType* timestamp_logical_column) {
  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS;
}

bool validate_none_type_mapping(const ColumnDescriptor* omnisci_column,
                                const parquet::ColumnDescriptor* parquet_column) {
  bool is_none_encoded_mapping =
      omnisci_column->columnType.get_compression() == kENCODING_NONE &&
      (parquet_column->physical_type() == parquet::Type::BOOLEAN &&
       omnisci_column->columnType.get_type() == kBOOLEAN);
  return parquet_column->logical_type()->is_none() && is_none_encoded_mapping;
}

SQLTypeInfo suggest_boolean_type_mapping(
    const parquet::ColumnDescriptor* parquet_column) {
  SQLTypeInfo type;
  type.set_compression(kENCODING_NONE);
  type.set_type(kBOOLEAN);
  type.set_fixed_size();
  return type;
}

bool validate_timestamp_mapping(const ColumnDescriptor* omnisci_column,
                                const parquet::ColumnDescriptor* parquet_column) {
  if (!(omnisci_column->columnType.get_type() == kTIMESTAMP &&
        ((omnisci_column->columnType.get_compression() == kENCODING_NONE) ||
         (omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
          omnisci_column->columnType.get_comp_param() == 32)))) {
    return false;
  }
  // check the annotated case
  if (auto timestamp_logical_column = dynamic_cast<const parquet::TimestampLogicalType*>(
          parquet_column->logical_type().get())) {
    if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
      return omnisci_column->columnType.get_dimension() == 0 ||
             ((is_nanosecond_precision(omnisci_column) &&
               is_nanosecond_precision(timestamp_logical_column)) ||
              (is_microsecond_precision(omnisci_column) &&
               is_microsecond_precision(timestamp_logical_column)) ||
              (is_millisecond_precision(omnisci_column) &&
               is_millisecond_precision(timestamp_logical_column)));
    }
    if (omnisci_column->columnType.get_compression() == kENCODING_FIXED) {
      return omnisci_column->columnType.get_dimension() == 0;
    }
  }
  // check the unannotated case
  if (parquet_column->logical_type()->is_none() &&
      ((parquet_column->physical_type() == parquet::Type::INT32 &&
        omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
        omnisci_column->columnType.get_comp_param() == 32) ||
       parquet_column->physical_type() == parquet::Type::INT64)) {
    return true;
  }
  return false;
}

SQLTypeInfo suggest_timestamp_mapping(const parquet::ColumnDescriptor* parquet_column) {
  if (auto timestamp_logical_column = dynamic_cast<const parquet::TimestampLogicalType*>(
          parquet_column->logical_type().get())) {
    SQLTypeInfo type;
    type.set_type(kTIMESTAMP);
    type.set_compression(kENCODING_NONE);
    if (is_nanosecond_precision(timestamp_logical_column)) {
      type.set_precision(9);
    } else if (is_microsecond_precision(timestamp_logical_column)) {
      type.set_precision(6);
    } else if (is_millisecond_precision(timestamp_logical_column)) {
      type.set_precision(3);
    }
    type.set_fixed_size();
    return type;
  }
  UNREACHABLE();
  return {};
}

bool validate_time_mapping(const ColumnDescriptor* omnisci_column,
                           const parquet::ColumnDescriptor* parquet_column) {
  if (!(omnisci_column->columnType.get_type() == kTIME &&
        (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
         (omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
          omnisci_column->columnType.get_comp_param() == 32)))) {
    return false;
  }
  return parquet_column->logical_type()->is_time();
}

SQLTypeInfo suggest_time_mapping(const parquet::ColumnDescriptor* parquet_column) {
  CHECK(parquet_column->logical_type()->is_time());
  SQLTypeInfo type;
  type.set_type(kTIME);
  type.set_compression(kENCODING_NONE);
  type.set_fixed_size();
  return type;
}

bool validate_date_mapping(const ColumnDescriptor* omnisci_column,
                           const parquet::ColumnDescriptor* parquet_column) {
  if (!(omnisci_column->columnType.get_type() == kDATE &&
        ((omnisci_column->columnType.get_compression() == kENCODING_DATE_IN_DAYS &&
          (omnisci_column->columnType.get_comp_param() ==
               0  // DATE ENCODING DAYS (32) specifies comp_param of 0
           || omnisci_column->columnType.get_comp_param() == 16)) ||
         omnisci_column->columnType.get_compression() ==
             kENCODING_NONE  // for array types
         ))) {
    return false;
  }
  return parquet_column->logical_type()->is_date() ||
         parquet_column->logical_type()
             ->is_timestamp();  // to support TIMESTAMP -> DATE coercion
}

SQLTypeInfo suggest_date_mapping(const parquet::ColumnDescriptor* parquet_column) {
  CHECK(parquet_column->logical_type()->is_date());
  SQLTypeInfo type;
  type.set_type(kDATE);
  type.set_compression(kENCODING_NONE);
  type.set_fixed_size();
  return type;
}

bool validate_string_mapping(const ColumnDescriptor* omnisci_column,
                             const parquet::ColumnDescriptor* parquet_column) {
  return is_valid_parquet_string(parquet_column) &&
         omnisci_column->columnType.is_string() &&
         (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
          omnisci_column->columnType.get_compression() == kENCODING_DICT);
}

SQLTypeInfo suggest_string_mapping(const parquet::ColumnDescriptor* parquet_column) {
  CHECK(is_valid_parquet_string(parquet_column));
  SQLTypeInfo type;
  type.set_type(kTEXT);
  type.set_compression(kENCODING_DICT);
  type.set_comp_param(0);  // `comp_param` is expected either to be zero or
                           // equal to a string dictionary id in some code
                           // paths; since we don't have a string dictionary,
                           // we set this to zero
  type.set_fixed_size();
  return type;
}

bool validate_geospatial_mapping(const ColumnDescriptor* omnisci_column,
                                 const parquet::ColumnDescriptor* parquet_column) {
  return is_valid_parquet_string(parquet_column) &&
         omnisci_column->columnType.is_geometry();
}

void validate_equal_schema(const parquet::arrow::FileReader* reference_file_reader,
                           const parquet::arrow::FileReader* new_file_reader,
                           const std::string& reference_file_path,
                           const std::string& new_file_path) {
  const auto reference_num_columns =
      reference_file_reader->parquet_reader()->metadata()->num_columns();
  const auto new_num_columns =
      new_file_reader->parquet_reader()->metadata()->num_columns();
  if (reference_num_columns != new_num_columns) {
    throw std::runtime_error{"Parquet file \"" + new_file_path +
                             "\" has a different schema. Please ensure that all Parquet "
                             "files use the same schema. Reference Parquet file: \"" +
                             reference_file_path + "\" has " +
                             std::to_string(reference_num_columns) +
                             " columns. New Parquet file \"" + new_file_path +
                             "\" has " + std::to_string(new_num_columns) + " columns."};
  }

  for (int i = 0; i < reference_num_columns; i++) {
    validate_equal_column_descriptor(get_column_descriptor(reference_file_reader, i),
                                     get_column_descriptor(new_file_reader, i),
                                     reference_file_path,
                                     new_file_path);
  }
}

void validate_allowed_mapping(const parquet::ColumnDescriptor* parquet_column,
                              const ColumnDescriptor* omnisci_column) {
  validate_max_repetition_and_definition_level(omnisci_column, parquet_column);
  bool allowed_type = false;
  if (omnisci_column->columnType.is_array()) {
    if (is_valid_parquet_list_column(parquet_column)) {
      auto omnisci_column_sub_type_column =
          get_sub_type_column_descriptor(omnisci_column);
      allowed_type = LazyParquetChunkLoader::isColumnMappingSupported(
          omnisci_column_sub_type_column.get(), parquet_column);
    }
  } else {
    allowed_type =
        LazyParquetChunkLoader::isColumnMappingSupported(omnisci_column, parquet_column);
  }
  if (!allowed_type) {
    auto logical_type = parquet_column->logical_type();
    if (logical_type->is_timestamp()) {
      auto timestamp_type =
          dynamic_cast<const parquet::TimestampLogicalType*>(logical_type.get());
      CHECK(timestamp_type);

      if (!timestamp_type->is_adjusted_to_utc()) {
        LOG(WARNING) << "Non-UTC timezone specified in Parquet file for column \""
                     << omnisci_column->columnName
                     << "\". Only UTC timezone is currently supported.";
      }
    }
    std::string parquet_type;
    parquet::Type::type physical_type = parquet_column->physical_type();
    if (parquet_column->logical_type()->is_none()) {
      parquet_type = parquet::TypeToString(physical_type);
    } else {
      parquet_type = logical_type->ToString();
    }
    std::string omnisci_type = omnisci_column->columnType.get_type_name();
    throw std::runtime_error{"Conversion from Parquet type \"" + parquet_type +
                             "\" to HeavyDB type \"" + omnisci_type +
                             "\" is not allowed. Please use an appropriate column type."};
  }
}

SQLTypeInfo suggest_column_scalar_type(const parquet::ColumnDescriptor* parquet_column) {
  // decimal case
  if (parquet_column->logical_type()->is_decimal()) {
    return suggest_decimal_mapping(parquet_column);
  }
  // float case
  if (parquet_column->logical_type()->is_none() &&
      (parquet_column->physical_type() == parquet::Type::FLOAT ||
       parquet_column->physical_type() == parquet::Type::DOUBLE)) {
    return suggest_floating_point_mapping(parquet_column);
  }
  // integral case
  if ((parquet_column->logical_type()->is_none() &&
       (parquet_column->physical_type() == parquet::Type::INT32 ||
        parquet_column->physical_type() == parquet::Type::INT64)) ||
      parquet_column->logical_type()->is_int()) {
    return suggest_integral_mapping(parquet_column);
  }
  // boolean case
  if (parquet_column->logical_type()->is_none() &&
      parquet_column->physical_type() == parquet::Type::BOOLEAN) {
    return suggest_boolean_type_mapping(parquet_column);
  }
  // timestamp case
  if (parquet_column->logical_type()->is_timestamp()) {
    return suggest_timestamp_mapping(parquet_column);
  }
  // time case
  if (parquet_column->logical_type()->is_time()) {
    return suggest_time_mapping(parquet_column);
  }
  // date case
  if (parquet_column->logical_type()->is_date()) {
    return suggest_date_mapping(parquet_column);
  }
  // string case
  if (is_valid_parquet_string(parquet_column)) {
    return suggest_string_mapping(parquet_column);
  }

  throw ForeignStorageException("Unsupported data type detected for column: " +
                                parquet_column->ToString());
}

void validate_number_of_columns(
    const std::shared_ptr<parquet::FileMetaData>& file_metadata,
    const std::string& file_path,
    const ForeignTableSchema& schema) {
  if (schema.numLogicalColumns() != file_metadata->num_columns()) {
    throw_number_of_columns_mismatch_error(
        schema.numLogicalColumns(), file_metadata->num_columns(), file_path);
  }
}

void throw_missing_metadata_error(const int row_group_index,
                                  const int column_index,
                                  const std::string& file_path) {
  throw std::runtime_error{
      "Statistics metadata is required for all row groups. Metadata is missing for "
      "row group index: " +
      std::to_string(row_group_index) +
      ", column index: " + std::to_string(column_index) + ", file path: " + file_path};
}

struct MaxRowGroupSizeStats {
  int64_t max_row_group_size;
  int max_row_group_index;
  std::string file_path;
};
1639  const MaxRowGroupSizeStats max_row_group_stats,
1640  const int fragment_size) {
1641  auto metadata_scan_exception = MetadataScanInfeasibleFragmentSizeException{
1642  "Parquet file has a row group size that is larger than the fragment size. "
1643  "Please set the table fragment size to a number that is larger than the "
1644  "row group size. Row group index: " +
1645  std::to_string(max_row_group_stats.max_row_group_index) +
1646  ", row group size: " + std::to_string(max_row_group_stats.max_row_group_size) +
1647  ", fragment size: " + std::to_string(fragment_size) +
1648  ", file path: " + max_row_group_stats.file_path};
1649  metadata_scan_exception.min_feasible_fragment_size_ =
1650  max_row_group_stats.max_row_group_size;
1651  throw metadata_scan_exception;
1652 }
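
The exception above deliberately carries the offending row group size, so a caller can surface the smallest fragment size that could succeed. A minimal sketch of that pattern, assuming a hypothetical wrapper function and standard includes:

void run_scan_with_feasibility_hint(const std::function<void()>& metadata_scan) {
  try {
    metadata_scan();  // may call throw_row_group_larger_than_fragment_size_error
  } catch (const MetadataScanInfeasibleFragmentSizeException& e) {
    // min_feasible_fragment_size_ was set to the largest row group observed,
    // i.e. the smallest fragment size for which the scan can succeed.
    LOG(ERROR) << "Retry with fragment_size >= " << e.min_feasible_fragment_size_;
    throw;
  }
}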
1653 
1654 MaxRowGroupSizeStats validate_column_mapping_and_row_group_metadata(
1655     const std::shared_ptr<parquet::FileMetaData>& file_metadata,
1656  const std::string& file_path,
1657  const ForeignTableSchema& schema,
1658  const bool do_metadata_stats_validation) {
1659  auto column_it = schema.getLogicalColumns().begin();
1660  MaxRowGroupSizeStats max_row_group_stats{0, 0};
1661  for (int i = 0; i < file_metadata->num_columns(); ++i, ++column_it) {
1662  const parquet::ColumnDescriptor* descr = file_metadata->schema()->Column(i);
1663  try {
1664  validate_allowed_mapping(descr, *column_it);
1665  } catch (std::runtime_error& e) {
1666  std::stringstream error_message;
1667  error_message << e.what() << " Parquet column: " << descr->path()->ToDotString()
1668  << ", HeavyDB column: " << (*column_it)->columnName
1669  << ", Parquet file: " << file_path << ".";
1670  throw std::runtime_error(error_message.str());
1671  }
1672 
1673  for (int r = 0; r < file_metadata->num_row_groups(); ++r) {
1674  auto group_metadata = file_metadata->RowGroup(r);
1675  auto num_rows = group_metadata->num_rows();
1676  if (num_rows == 0) {
1677  continue;
1678  } else if (num_rows > max_row_group_stats.max_row_group_size) {
1679  max_row_group_stats.max_row_group_size = num_rows;
1680  max_row_group_stats.max_row_group_index = r;
1681  max_row_group_stats.file_path = file_path;
1682  }
1683 
1684  if (do_metadata_stats_validation) {
1685  auto column_chunk = group_metadata->ColumnChunk(i);
1686  bool contains_metadata = column_chunk->is_stats_set();
1687  if (contains_metadata) {
1688  auto stats = column_chunk->statistics();
1689  bool is_all_nulls = stats->null_count() == column_chunk->num_values();
1690  bool is_list = is_valid_parquet_list_column(file_metadata->schema()->Column(i));
1691  // Given a list, it is possible it has no min or max if it is composed
1692  // only of empty lists & nulls. This cannot be detected by comparing
1693  // the null count; therefore we give list types the benefit of the
1694  // doubt in this situation.
1695  if (!(stats->HasMinMax() || is_all_nulls || is_list)) {
1696  contains_metadata = false;
1697  }
1698  }
1699 
1700  if (!contains_metadata) {
1701  throw_missing_metadata_error(r, i, file_path);
1702  }
1703  }
1704  }
1705  }
1706  return max_row_group_stats;
1707 }
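
Restated as a standalone predicate, the statistics-acceptance rule applied in the loop above looks like the following sketch; has_usable_stats is illustrative and is not a function in this file.

bool has_usable_stats(const parquet::ColumnChunkMetaData& column_chunk,
                      const bool is_list) {
  if (!column_chunk.is_stats_set()) {
    return false;  // no statistics were written for this chunk
  }
  auto stats = column_chunk.statistics();
  const bool is_all_nulls = stats->null_count() == column_chunk.num_values();
  // A list made up solely of empty lists and nulls legitimately lacks a
  // min/max, so list columns get the benefit of the doubt.
  return stats->HasMinMax() || is_all_nulls || is_list;
}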
1708 
1709 MaxRowGroupSizeStats validate_parquet_metadata(
1710     const std::shared_ptr<parquet::FileMetaData>& file_metadata,
1711  const std::string& file_path,
1712  const ForeignTableSchema& schema,
1713  const bool do_metadata_stats_validation) {
1714  validate_number_of_columns(file_metadata, file_path, schema);
1715   return validate_column_mapping_and_row_group_metadata(
1716       file_metadata, file_path, schema, do_metadata_stats_validation);
1717 }
1718 
1719 std::list<RowGroupMetadata> metadata_scan_rowgroup_interval(
1720  const std::map<int, std::shared_ptr<ParquetEncoder>>& encoder_map,
1721  const RowGroupInterval& row_group_interval,
1722  const ReaderPtr& reader,
1723  const ForeignTableSchema& schema) {
1724  std::list<RowGroupMetadata> row_group_metadata;
1725  auto column_interval =
1726  Interval<ColumnType>{schema.getLogicalAndPhysicalColumns().front()->columnId,
1727  schema.getLogicalAndPhysicalColumns().back()->columnId};
1728 
1729  auto file_metadata = reader->parquet_reader()->metadata();
1730  for (int row_group = row_group_interval.start_index;
1731  row_group <= row_group_interval.end_index;
1732  ++row_group) {
1733  auto& row_group_metadata_item = row_group_metadata.emplace_back();
1734  row_group_metadata_item.row_group_index = row_group;
1735  row_group_metadata_item.file_path = row_group_interval.file_path;
1736 
1737  std::unique_ptr<parquet::RowGroupMetaData> group_metadata =
1738  file_metadata->RowGroup(row_group);
1739 
1740  for (int column_id = column_interval.start; column_id <= column_interval.end;
1741  column_id++) {
1742  const auto column_descriptor = schema.getColumnDescriptor(column_id);
1743  auto parquet_column_index = schema.getParquetColumnIndex(column_id);
1744  auto encoder_map_iter =
1745  encoder_map.find(schema.getLogicalColumn(column_id)->columnId);
1746  CHECK(encoder_map_iter != encoder_map.end());
1747  try {
1748  auto metadata = encoder_map_iter->second->getRowGroupMetadata(
1749  group_metadata.get(), parquet_column_index, column_descriptor->columnType);
1750  row_group_metadata_item.column_chunk_metadata.emplace_back(metadata);
1751  } catch (const std::exception& e) {
1752  std::stringstream error_message;
1753  error_message << e.what() << " in row group " << row_group << " of Parquet file '"
1754  << row_group_interval.file_path << "'.";
1755  throw std::runtime_error(error_message.str());
1756  }
1757  }
1758  }
1759  return row_group_metadata;
1760 }
1761 
1762 std::map<int, std::shared_ptr<ParquetEncoder>> populate_encoder_map_for_import(
1763  const std::map<int, Chunk_NS::Chunk> chunks,
1764  const ForeignTableSchema& schema,
1765  const ReaderPtr& reader,
1766  const std::map<int, StringDictionary*> column_dictionaries,
1767  const int64_t num_rows,
1768  const bool geo_validate_geometry) {
1769  std::map<int, std::shared_ptr<ParquetEncoder>> encoder_map;
1770  auto file_metadata = reader->parquet_reader()->metadata();
1771  for (auto& [column_id, chunk] : chunks) {
1772  const auto column_descriptor = schema.getColumnDescriptor(column_id);
1773  if (column_descriptor->isGeoPhyCol) { // skip physical columns
1774  continue;
1775  }
1776  auto parquet_column_descriptor =
1777  file_metadata->schema()->Column(schema.getParquetColumnIndex(column_id));
1778  auto find_it = column_dictionaries.find(column_id);
1779  StringDictionary* dictionary =
1780  (find_it == column_dictionaries.end() ? nullptr : find_it->second);
1781  std::list<Chunk_NS::Chunk> chunks_for_import;
1782  chunks_for_import.push_back(chunk);
1783  if (column_descriptor->columnType.is_geometry()) {
1784  for (int i = 0; i < column_descriptor->columnType.get_physical_cols(); ++i) {
1785  chunks_for_import.push_back(chunks.at(column_id + i + 1));
1786  }
1787  }
1788  encoder_map[column_id] = create_parquet_encoder_for_import(chunks_for_import,
1789  column_descriptor,
1790  parquet_column_descriptor,
1791  dictionary,
1792  geo_validate_geometry);
1793 
1794  // Reserve space in the buffer when the number of elements is known ahead
1795  // of time, for types of known size (for example, dictionary-encoded strings).
1796  auto encoder = shared::get_from_map(encoder_map, column_id);
1797  if (auto inplace_encoder = dynamic_cast<ParquetInPlaceEncoder*>(encoder.get())) {
1798  inplace_encoder->reserve(num_rows);
1799  }
1800  }
1801  return encoder_map;
1802 }
1803 
1804 std::map<int, std::shared_ptr<ParquetEncoder>> populate_encoder_map_for_metadata_scan(
1805  const Interval<ColumnType>& column_interval,
1806  const ForeignTableSchema& schema,
1807  const ReaderPtr& reader,
1808  const bool do_metadata_stats_validation,
1809  const bool geo_validate_geometry) {
1810  std::map<int, std::shared_ptr<ParquetEncoder>> encoder_map;
1811  auto file_metadata = reader->parquet_reader()->metadata();
1812  for (int column_id = column_interval.start; column_id <= column_interval.end;
1813  column_id++) {
1814  const auto column_descriptor = schema.getColumnDescriptor(column_id);
1815  auto parquet_column_descriptor =
1816  file_metadata->schema()->Column(schema.getParquetColumnIndex(column_id));
1817  encoder_map[column_id] = create_parquet_encoder_for_metadata_scan(
1818  column_descriptor, parquet_column_descriptor, geo_validate_geometry);
1819  if (!do_metadata_stats_validation) {
1820  shared::get_from_map(encoder_map, column_id)->disableMetadataStatsValidation();
1821  }
1822  column_id += column_descriptor->columnType.get_physical_cols();
1823  }
1824  return encoder_map;
1825 }
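
Note the different treatment of geo physical columns in the two populate functions above: the import path bundles them into chunks_for_import so a single encoder writes all of them, while the metadata-scan path advances past them. A small trace of the metadata-scan loop, with invented column ids:

void example_skip_physical_columns() {
  // Suppose column id 5 is a POINT with two physical companion columns
  // (ids 6 and 7). The loop below then creates encoders for ids 1-5 only.
  for (int column_id = 1; column_id <= 7; ++column_id) {
    // ... an encoder would be created for column_id here ...
    const int physical_cols = (column_id == 5) ? 2 : 0;
    column_id += physical_cols;  // jumps over ids 6 and 7
  }
}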
1826 } // namespace
1827 
1828 std::list<std::unique_ptr<ChunkMetadata>> LazyParquetChunkLoader::appendRowGroups(
1829  const std::vector<RowGroupInterval>& row_group_intervals,
1830  const int parquet_column_index,
1831  const ColumnDescriptor* column_descriptor,
1832  std::list<Chunk_NS::Chunk>& chunks,
1833  StringDictionary* string_dictionary,
1834  RejectedRowIndices* rejected_row_indices,
1835  const bool is_for_detect,
1836  const std::optional<int64_t> max_rows_to_read) {
1837  auto timer = DEBUG_TIMER(__func__);
1838  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
1839  // `def_levels` and `rep_levels` below are used to store the read definition
1840  // and repetition levels of the Dremel encoding implemented by the Parquet
1841  // format
1842  std::vector<int16_t> def_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1843  std::vector<int16_t> rep_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1844  std::vector<int8_t> values;
1845 
1846  // Timing information used in logging
1847  Timer<> summary_timer;
1848  Timer<> initialization_timer_ms;
1849  Timer<> validation_timer_ms;
1850  Timer<> parquet_read_timer_ms;
1851  Timer<> encoding_timer_ms;
1852  size_t total_row_groups_read = 0;
1853 
1854  summary_timer.start();
1855 
1856  initialization_timer_ms.start();
1857  CHECK(!row_group_intervals.empty());
1858  const auto& first_file_path = row_group_intervals.front().file_path;
1859 
1860  auto first_file_reader = file_reader_cache_->getOrInsert(first_file_path, file_system_);
1861  auto first_parquet_column_descriptor =
1862  get_column_descriptor(first_file_reader, parquet_column_index);
1863  resize_values_buffer(column_descriptor, first_parquet_column_descriptor, values);
1864 
1865  const bool geo_validate_geometry =
1866       foreign_table_->getOptionAsBool(ForeignTable::GEO_VALIDATE_GEOMETRY_KEY);
1867   auto encoder = create_parquet_encoder(column_descriptor,
1868  first_parquet_column_descriptor,
1869  chunks,
1870  string_dictionary,
1871  chunk_metadata,
1872  false,
1873  false,
1874  is_for_detect,
1875  geo_validate_geometry);
1876  CHECK(encoder.get());
1877 
1878  if (rejected_row_indices) { // error tracking is enabled
1879  encoder->initializeErrorTracking();
1880  }
1881  encoder->initializeColumnType(column_descriptor->columnType);
1882  initialization_timer_ms.stop();
1883 
1884  bool early_exit = false;
1885  int64_t total_rows_read = 0;
1886  for (const auto& row_group_interval : row_group_intervals) {
1887  initialization_timer_ms.start();
1888  const auto& file_path = row_group_interval.file_path;
1889  auto file_reader = file_reader_cache_->getOrInsert(file_path, file_system_);
1890 
1891  auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
1892  CHECK(row_group_interval.start_index >= 0 &&
1893  row_group_interval.end_index < num_row_groups);
1894  CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
1895 
1896  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
1897  auto parquet_column_descriptor =
1898  get_column_descriptor(file_reader, parquet_column_index);
1899 
1900  initialization_timer_ms.stop();
1901 
1902  validation_timer_ms.start();
1903  validate_equal_column_descriptor(first_parquet_column_descriptor,
1904  parquet_column_descriptor,
1905  first_file_path,
1906  file_path);
1907 
1907 
1908   validate_max_repetition_and_definition_level(column_descriptor,
1909                                                parquet_column_descriptor);
1910   set_definition_levels_for_zero_max_definition_level_case(parquet_column_descriptor,
1911                                                            def_levels);
1912  validation_timer_ms.stop();
1913 
1914  int64_t values_read = 0;
1915  for (int row_group_index = row_group_interval.start_index;
1916  row_group_index <= row_group_interval.end_index;
1917  ++row_group_index) {
1918  total_row_groups_read++;
1919  parquet_read_timer_ms.start();
1920  auto group_reader = parquet_reader->RowGroup(row_group_index);
1921  std::shared_ptr<parquet::ColumnReader> col_reader =
1922  group_reader->Column(parquet_column_index);
1923  parquet_read_timer_ms.stop();
1924 
1925  try {
1926  while (col_reader->HasNext()) {
1927  parquet_read_timer_ms.start();
1928  int64_t levels_read =
1929               parquet::ScanAllValues(LazyParquetChunkLoader::batch_reader_num_elements,
1930                                      def_levels.data(),
1931  rep_levels.data(),
1932  reinterpret_cast<uint8_t*>(values.data()),
1933  &values_read,
1934  col_reader.get());
1935  parquet_read_timer_ms.stop();
1936 
1937  encoding_timer_ms.start();
1938  if (rejected_row_indices) { // error tracking is enabled
1939  encoder->appendDataTrackErrors(def_levels.data(),
1940  rep_levels.data(),
1941  values_read,
1942  levels_read,
1943  values.data());
1944  } else { // no error tracking enabled
1945           validate_list_column_metadata_statistics(
1946               parquet_reader,  // this validation only in effect for foreign tables
1947  row_group_index,
1948  parquet_column_index,
1949  def_levels.data(),
1950  levels_read,
1951  parquet_column_descriptor);
1952 
1953  encoder->appendData(def_levels.data(),
1954  rep_levels.data(),
1955  values_read,
1956  levels_read,
1957  values.data());
1958  }
1959  encoding_timer_ms.stop();
1960 
1961  if (max_rows_to_read.has_value()) {
1962  if (column_descriptor->columnType.is_array()) {
1963  auto array_encoder =
1964  dynamic_cast<ParquetArrayDetectEncoder*>(encoder.get());
1965  CHECK(array_encoder);
1966  total_rows_read = array_encoder->getArraysCount();
1967  } else {
1968  // For scalar types it is safe to assume the number of levels read is equal
1969  // to the number of rows read
1970  total_rows_read += levels_read;
1971  }
1972 
1973  if (total_rows_read >= max_rows_to_read.value()) {
1974  early_exit = true;
1975  break;
1976  }
1977  }
1978  }
1979  encoding_timer_ms.start();
1980  if (auto array_encoder = dynamic_cast<ParquetArrayEncoder*>(encoder.get())) {
1981  array_encoder->finalizeRowGroup();
1982  }
1983  encoding_timer_ms.stop();
1984  } catch (const std::exception& error) {
1985  // check for a specific error to detect a possible unexpected switch of data
1986  // source in order to respond with informative error message
1987  if (boost::regex_search(error.what(),
1988  boost::regex{"Deserializing page header failed."})) {
1989           throw ForeignStorageException(
1990               "Unable to read from foreign data source, possible cause is an unexpected "
1991               "change of source. Please use the \"REFRESH FOREIGN TABLES\" command on "
1992               "the "
1993               "foreign table "
1994               "if data source has been updated. Foreign table: " +
1995               foreign_table_->tableName);
1996         }
1997 
1998         throw ForeignStorageException(
1999             std::string(error.what()) + " Row group: " + std::to_string(row_group_index) +
2000  ", Parquet column: '" + col_reader->descr()->path()->ToDotString() +
2001  "', Parquet file: '" + file_path + "'");
2002  }
2003  if (max_rows_to_read.has_value() && early_exit) {
2004  break;
2005  }
2006  }
2007  if (max_rows_to_read.has_value() && early_exit) {
2008  break;
2009  }
2010  }
2011 
2012  encoding_timer_ms.start();
2013  if (rejected_row_indices) { // error tracking is enabled
2014  *rejected_row_indices = encoder->getRejectedRowIndices();
2015  }
2016  encoding_timer_ms.stop();
2017 
2018  summary_timer.stop();
2019 
2020  VLOG(1) << "Appended " << total_row_groups_read
2021  << " row groups to chunk. Column: " << column_descriptor->columnName
2022  << ", Column id: " << column_descriptor->columnId << ", Parquet column: "
2023  << first_parquet_column_descriptor->path()->ToDotString();
2024  VLOG(1) << "Runtime summary:";
2025  VLOG(1) << " Parquet chunk loading total time: " << summary_timer.elapsed() << "ms";
2026  VLOG(1) << " Parquet encoder initialization time: " << initialization_timer_ms.elapsed()
2027  << "ms";
2028  VLOG(1) << " Parquet metadata validation time: " << validation_timer_ms.elapsed()
2029  << "ms";
2030  VLOG(1) << " Parquet column read time: " << parquet_read_timer_ms.elapsed() << "ms";
2031  VLOG(1) << " Parquet data conversion time: " << encoding_timer_ms.elapsed() << "ms";
2032 
2033  return chunk_metadata;
2034 }
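
As background for the def_levels/rep_levels buffers consumed in appendRowGroups above, here is a worked Dremel example for an optional list<int32> column; the levels follow the Parquet LIST specification (max_definition_level = 3, max_repetition_level = 1) and the data is invented.

// rows: [[7, 8], NULL, []]
const std::vector<int32_t> example_values = {7, 8};
const std::vector<int16_t> example_def_levels = {3, 3, 0, 1};  // 3 = present leaf,
                                                               // 0 = NULL list,
                                                               // 1 = empty list
const std::vector<int16_t> example_rep_levels = {0, 1, 0, 0};  // 0 = new row,
                                                               // 1 = same list
// The array encoders rebuild nesting and NULLs from exactly these levels,
// which is also why an all-empty-list column can legitimately lack min/max
// statistics.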
2035 
2036 SQLTypeInfo LazyParquetChunkLoader::suggestColumnMapping(
2037     const parquet::ColumnDescriptor* parquet_column) {
2038  auto type = suggest_column_scalar_type(parquet_column);
2039 
2040  // array case
2041  if (is_valid_parquet_list_column(parquet_column)) {
2042  return type.get_array_type();
2043  }
2044 
2045  return type;
2046 }
2047 
2048 bool LazyParquetChunkLoader::isColumnMappingSupported(
2049     const ColumnDescriptor* omnisci_column,
2050  const parquet::ColumnDescriptor* parquet_column) {
2051  CHECK(!omnisci_column->columnType.is_array())
2052  << "isColumnMappingSupported should not be called on arrays";
2053  if (validate_geospatial_mapping(omnisci_column, parquet_column)) {
2054  return true;
2055  }
2056  if (validate_decimal_mapping(omnisci_column, parquet_column)) {
2057  return true;
2058  }
2059  if (validate_floating_point_mapping(omnisci_column, parquet_column)) {
2060  return true;
2061  }
2062  if (validate_integral_mapping(omnisci_column, parquet_column)) {
2063  return true;
2064  }
2065  if (validate_none_type_mapping(omnisci_column, parquet_column)) {
2066  return true;
2067  }
2068  if (validate_timestamp_mapping(omnisci_column, parquet_column)) {
2069  return true;
2070  }
2071  if (validate_time_mapping(omnisci_column, parquet_column)) {
2072  return true;
2073  }
2074  if (validate_date_mapping(omnisci_column, parquet_column)) {
2075  return true;
2076  }
2077  if (validate_string_mapping(omnisci_column, parquet_column)) {
2078  return true;
2079  }
2080  return false;
2081 }
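
A hedged usage sketch: because isColumnMappingSupported rejects arrays outright, a caller checking an array column would first derive the element type, for example via get_sub_type_column_descriptor. The wrapper below is illustrative and is not an API of this class.

bool is_mapping_supported_including_arrays(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column) {
  if (omnisci_column->columnType.is_array()) {
    // arrays are validated through their element type
    auto element_descriptor = get_sub_type_column_descriptor(omnisci_column);
    return LazyParquetChunkLoader::isColumnMappingSupported(
        element_descriptor.get(), parquet_column);
  }
  return LazyParquetChunkLoader::isColumnMappingSupported(omnisci_column,
                                                          parquet_column);
}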
2082 
2083 LazyParquetChunkLoader::LazyParquetChunkLoader(
2084     std::shared_ptr<arrow::fs::FileSystem> file_system,
2085  FileReaderMap* file_map,
2086  const ForeignTable* foreign_table)
2087  : file_system_(file_system)
2088  , file_reader_cache_(file_map)
2089  , foreign_table_(foreign_table) {
2090  CHECK(foreign_table_) << "LazyParquetChunkLoader: null Foreign Table ptr";
2091 }
2092 
2093 std::list<std::unique_ptr<ChunkMetadata>> LazyParquetChunkLoader::loadChunk(
2094  const std::vector<RowGroupInterval>& row_group_intervals,
2095  const int parquet_column_index,
2096  std::list<Chunk_NS::Chunk>& chunks,
2097  StringDictionary* string_dictionary,
2098  RejectedRowIndices* rejected_row_indices) {
2099  CHECK(!chunks.empty());
2100  auto const& chunk = *chunks.begin();
2101  auto column_descriptor = chunk.getColumnDesc();
2102  auto buffer = chunk.getBuffer();
2103  CHECK(buffer);
2104 
2105  try {
2106  auto metadata = appendRowGroups(row_group_intervals,
2107  parquet_column_index,
2108  column_descriptor,
2109  chunks,
2110  string_dictionary,
2111  rejected_row_indices);
2112  return metadata;
2113  } catch (const std::exception& error) {
2114  throw ForeignStorageException(error.what());
2115  }
2116 
2117  return {};
2118 }
2119 
2120 struct ParquetBatchData {
2121   ParquetBatchData()
2122       : def_levels(LazyParquetChunkLoader::batch_reader_num_elements)
2123  , rep_levels(LazyParquetChunkLoader::batch_reader_num_elements) {}
2124  std::vector<int16_t> def_levels;
2125  std::vector<int16_t> rep_levels;
2126  std::vector<int8_t> values;
2127  int64_t values_read;
2128  int64_t levels_read;
2129 };
2130 
2131 class ParquetRowGroupReader {
2132  public:
2133  ParquetRowGroupReader(std::shared_ptr<parquet::ColumnReader> col_reader,
2134  const ColumnDescriptor* column_descriptor,
2135  const parquet::ColumnDescriptor* parquet_column_descriptor,
2136  ParquetEncoder* encoder,
2137  InvalidRowGroupIndices& invalid_indices,
2138  const int row_group_index,
2139  const int parquet_column_index,
2140  const parquet::ParquetFileReader* parquet_reader)
2141  : col_reader_(col_reader)
2142  , column_descriptor_(column_descriptor)
2143  , parquet_column_descriptor_(parquet_column_descriptor)
2144  , encoder_(encoder)
2145  , invalid_indices_(invalid_indices)
2146  , row_group_index_(row_group_index)
2147  , parquet_column_index_(parquet_column_index)
2148  , parquet_reader_(parquet_reader) {
2149  import_encoder = dynamic_cast<ParquetImportEncoder*>(encoder);
2150     CHECK(import_encoder);
2151   }
2152 
2153   void readAndValidateRowGroup() {
2154     while (col_reader_->HasNext()) {
2155       ParquetBatchData batch_data;
2156       resize_values_buffer(
2157           column_descriptor_, parquet_column_descriptor_, batch_data.values);
2158       batch_data.levels_read =
2159           parquet::ScanAllValues(LazyParquetChunkLoader::batch_reader_num_elements,
2160                                  batch_data.def_levels.data(),
2161                                  batch_data.rep_levels.data(),
2162                                  reinterpret_cast<uint8_t*>(batch_data.values.data()),
2163                                  &batch_data.values_read,
2164                                  col_reader_.get());
2165 
2166       const auto& column_type = column_descriptor_->columnType;
2167       validate_list_column_metadata_statistics(
2168           parquet_reader_,
2169           row_group_index_,
2170           parquet_column_index_,
2171           batch_data.def_levels.data(),
2172           batch_data.levels_read,
2173           parquet_column_descriptor_);
2174       import_encoder->validateAndAppendData(batch_data.def_levels.data(),
2175                                             batch_data.rep_levels.data(),
2176                                             batch_data.values_read,
2177                                             batch_data.levels_read,
2178                                             batch_data.values.data(),
2179                                             column_type,
2180                                             invalid_indices_);
2181     }
2182  if (auto array_encoder = dynamic_cast<ParquetArrayEncoder*>(encoder_)) {
2183  array_encoder->finalizeRowGroup();
2184  }
2185  }
2186 
2187  void eraseInvalidRowGroupData(const InvalidRowGroupIndices& invalid_indices) {
2188  import_encoder->eraseInvalidIndicesInBuffer(invalid_indices);
2189  }
2190 
2191  private:
2192   std::shared_ptr<parquet::ColumnReader> col_reader_;
2193   const ColumnDescriptor* column_descriptor_;
2194   const parquet::ColumnDescriptor* parquet_column_descriptor_;
2195   ParquetEncoder* encoder_;
2196   ParquetImportEncoder* import_encoder;
2197   InvalidRowGroupIndices& invalid_indices_;
2198   const int row_group_index_;
2199   const int parquet_column_index_;
2200   const parquet::ParquetFileReader* parquet_reader_;
2201 };
2202 
2203 std::pair<size_t, size_t> LazyParquetChunkLoader::loadRowGroups(
2204  const RowGroupInterval& row_group_interval,
2205  const std::map<int, Chunk_NS::Chunk>& chunks,
2206  const ForeignTableSchema& schema,
2207  const std::map<int, StringDictionary*>& column_dictionaries,
2208  const int num_threads) {
2209  auto timer = DEBUG_TIMER(__func__);
2210 
2211  const auto& file_path = row_group_interval.file_path;
2212 
2213  // do not use caching with file-readers, open a new one for every request
2214  auto file_reader_owner = open_parquet_table(file_path, file_system_);
2215  auto file_reader = file_reader_owner.get();
2216  auto file_metadata = file_reader->parquet_reader()->metadata();
2217 
2218  validate_number_of_columns(file_metadata, file_path, schema);
2219 
2220  // check for fixed length encoded columns and indicate to the user
2221  // they should not be used
2222  for (const auto column_descriptor : schema.getLogicalColumns()) {
2223  auto parquet_column_index = schema.getParquetColumnIndex(column_descriptor->columnId);
2224  auto parquet_column = file_metadata->schema()->Column(parquet_column_index);
2225  try {
2226  validate_allowed_mapping(parquet_column, column_descriptor);
2227  } catch (std::runtime_error& e) {
2228  std::stringstream error_message;
2229  error_message << e.what()
2230  << " Parquet column: " << parquet_column->path()->ToDotString()
2231  << ", HeavyDB column: " << column_descriptor->columnName
2232  << ", Parquet file: " << file_path << ".";
2233  throw std::runtime_error(error_message.str());
2234  }
2235  }
2236 
2237  CHECK(row_group_interval.start_index == row_group_interval.end_index);
2238  auto row_group_index = row_group_interval.start_index;
2239  std::map<int, ParquetRowGroupReader> row_group_reader_map;
2240 
2241  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
2242  auto group_reader = parquet_reader->RowGroup(row_group_index);
2243 
2244  std::vector<InvalidRowGroupIndices> invalid_indices_per_thread(num_threads);
2245 
2246  const bool geo_validate_geometry =
2247       foreign_table_->getOptionAsBool(ForeignTable::GEO_VALIDATE_GEOMETRY_KEY);
2248   auto encoder_map = populate_encoder_map_for_import(chunks,
2249  schema,
2250  file_reader,
2251  column_dictionaries,
2252  group_reader->metadata()->num_rows(),
2253  geo_validate_geometry);
2254 
2255  std::vector<std::set<int>> partitions(num_threads);
2256  std::map<int, int> column_id_to_thread;
2257  for (auto& [column_id, encoder] : encoder_map) {
2258  auto thread_id = column_id % num_threads;
2259  column_id_to_thread[column_id] = thread_id;
2260  partitions[thread_id].insert(column_id);
2261  }
2262 
2263  for (auto& [column_id, encoder] : encoder_map) {
2264  const auto& column_descriptor = schema.getColumnDescriptor(column_id);
2265  const auto parquet_column_index = schema.getParquetColumnIndex(column_id);
2266  auto parquet_column_descriptor =
2267  file_metadata->schema()->Column(parquet_column_index);
2268 
2269  // validate
2270  auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
2271  CHECK(row_group_interval.start_index >= 0 &&
2272  row_group_interval.end_index < num_row_groups);
2273  CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
2274   validate_max_repetition_and_definition_level(column_descriptor,
2275                                                parquet_column_descriptor);
2276 
2277  std::shared_ptr<parquet::ColumnReader> col_reader =
2278  group_reader->Column(parquet_column_index);
2279 
2280  row_group_reader_map.insert(
2281  {column_id,
2282  ParquetRowGroupReader(col_reader,
2283  column_descriptor,
2284  parquet_column_descriptor,
2285  shared::get_from_map(encoder_map, column_id).get(),
2286  invalid_indices_per_thread[shared::get_from_map(
2287  column_id_to_thread, column_id)],
2288  row_group_index,
2289  parquet_column_index,
2290  parquet_reader)});
2291  }
2292 
2293  std::vector<std::future<void>> futures;
2294  for (int ithread = 0; ithread < num_threads; ++ithread) {
2295  auto column_ids_for_thread = partitions[ithread];
2296  futures.emplace_back(
2297  std::async(std::launch::async, [&row_group_reader_map, column_ids_for_thread] {
2298  for (const auto column_id : column_ids_for_thread) {
2299  shared::get_from_map(row_group_reader_map, column_id)
2300                 .readAndValidateRowGroup();  // reads and validates the entire
2301                                              // row group for each column
2302  }
2303  }));
2304  }
2305 
2306  for (auto& future : futures) {
2307  future.wait();
2308  }
2309 
2310  for (auto& future : futures) {
2311  future.get();
2312  }
2313 
2314  // merge/reduce invalid indices
2315  InvalidRowGroupIndices invalid_indices;
2316  for (auto& thread_invalid_indices : invalid_indices_per_thread) {
2317  invalid_indices.merge(thread_invalid_indices);
2318  }
2319 
2320  for (auto& [_, reader] : row_group_reader_map) {
2321  reader.eraseInvalidRowGroupData(
2322  invalid_indices); // removes invalid encoded data in buffers
2323  }
2324 
2325  // update the element count for each encoder
2326  for (const auto column_descriptor : schema.getLogicalColumns()) {
2327  auto column_id = column_descriptor->columnId;
2328  auto db_encoder = shared::get_from_map(chunks, column_id).getBuffer()->getEncoder();
2329  CHECK(static_cast<size_t>(group_reader->metadata()->num_rows()) >=
2330  invalid_indices.size());
2331  size_t updated_num_elems = db_encoder->getNumElems() +
2332  group_reader->metadata()->num_rows() -
2333  invalid_indices.size();
2334  db_encoder->setNumElems(updated_num_elems);
2335  if (column_descriptor->columnType.is_geometry()) {
2336  for (int i = 0; i < column_descriptor->columnType.get_physical_cols(); ++i) {
2337  auto db_encoder =
2338  shared::get_from_map(chunks, column_id + i + 1).getBuffer()->getEncoder();
2339  db_encoder->setNumElems(updated_num_elems);
2340  }
2341  }
2342  }
2343 
2344  return {group_reader->metadata()->num_rows() - invalid_indices.size(),
2345  invalid_indices.size()};
2346 }
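
The column-to-thread assignment in loadRowGroups above is a plain modulus partition; a self-contained trace with invented ids shows that it balances column counts, not per-column cost.

void example_partition_columns() {
  const int num_threads = 3;
  std::vector<std::set<int>> partitions(num_threads);
  for (const int column_id : {1, 2, 3, 5, 8}) {
    partitions[column_id % num_threads].insert(column_id);
  }
  // partitions == { {3}, {1}, {2, 5, 8} }: thread 2 reads three columns while
  // thread 0 reads one; each std::async worker then calls
  // readAndValidateRowGroup() serially for its assigned columns.
}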
2347 
2348 struct PreviewContext {
2349   std::vector<std::unique_ptr<TypedParquetDetectBuffer>> detect_buffers;
2350  std::vector<Chunk_NS::Chunk> column_chunks;
2351  std::vector<std::unique_ptr<RejectedRowIndices>> rejected_row_indices_per_column;
2352  std::list<ColumnDescriptor> column_descriptors;
2353 };
2354 
2355 DataPreview LazyParquetChunkLoader::previewFiles(const std::vector<std::string>& files,
2356  const size_t max_num_rows,
2357  const ForeignTable& foreign_table) {
2358  CHECK(!files.empty());
2359 
2360  auto first_file = *files.begin();
2361  auto first_file_reader = file_reader_cache_->getOrInsert(*files.begin(), file_system_);
2362 
2363  for (auto current_file_it = ++files.begin(); current_file_it != files.end();
2364  ++current_file_it) {
2365  auto file_reader = file_reader_cache_->getOrInsert(*current_file_it, file_system_);
2366  validate_equal_schema(first_file_reader, file_reader, first_file, *current_file_it);
2367  }
2368 
2369  auto first_file_metadata = first_file_reader->parquet_reader()->metadata();
2370  auto num_columns = first_file_metadata->num_columns();
2371 
2372  DataPreview data_preview;
2373  data_preview.num_rejected_rows = 0;
2374 
2375  auto current_file_it = files.begin();
2376  while (data_preview.sample_rows.size() < max_num_rows &&
2377  current_file_it != files.end()) {
2378  size_t total_num_rows = data_preview.sample_rows.size();
2379  size_t max_num_rows_to_append = max_num_rows - data_preview.sample_rows.size();
2380 
2381  // gather enough rows in row groups to produce required samples
2382  std::vector<RowGroupInterval> row_group_intervals;
2383  for (; current_file_it != files.end(); ++current_file_it) {
2384  const auto& file_path = *current_file_it;
2385  auto file_reader = file_reader_cache_->getOrInsert(file_path, file_system_);
2386  auto file_metadata = file_reader->parquet_reader()->metadata();
2387  auto num_row_groups = file_metadata->num_row_groups();
2388  int end_row_group = 0;
2389  for (int i = 0; i < num_row_groups && total_num_rows < max_num_rows; ++i) {
2390  const size_t next_num_rows = file_metadata->RowGroup(i)->num_rows();
2391  total_num_rows += next_num_rows;
2392  end_row_group = i;
2393  }
2394  row_group_intervals.push_back(RowGroupInterval{file_path, 0, end_row_group});
2395  }
2396 
2397  PreviewContext preview_context;
2398  for (int i = 0; i < num_columns; ++i) {
2399  auto col = first_file_metadata->schema()->Column(i);
2400  ColumnDescriptor& cd = preview_context.column_descriptors.emplace_back();
2401  auto sql_type = LazyParquetChunkLoader::suggestColumnMapping(col);
2402  cd.columnType = sql_type;
2403  cd.columnName =
2404  sql_type.is_array() ? col->path()->ToDotVector()[0] + "_array" : col->name();
2405  cd.isSystemCol = false;
2406  cd.isVirtualCol = false;
2407  cd.tableId = -1;
2408  cd.columnId = i + 1;
2409  data_preview.column_names.emplace_back(cd.columnName);
2410  data_preview.column_types.emplace_back(sql_type);
2411  preview_context.detect_buffers.push_back(
2412  std::make_unique<TypedParquetDetectBuffer>());
2413  preview_context.rejected_row_indices_per_column.push_back(
2414  std::make_unique<RejectedRowIndices>());
2415  auto& detect_buffer = preview_context.detect_buffers.back();
2416  auto& chunk = preview_context.column_chunks.emplace_back(&cd);
2417  chunk.setPinnable(false);
2418  chunk.setBuffer(detect_buffer.get());
2419  }
2420 
2421  std::function<void(const std::vector<int>&)> append_row_groups_for_column =
2422  [&](const std::vector<int>& column_indices) {
2423  for (const auto& column_index : column_indices) {
2424  auto& chunk = preview_context.column_chunks[column_index];
2425  auto chunk_list = std::list<Chunk_NS::Chunk>{chunk};
2426  auto& rejected_row_indices =
2427  preview_context.rejected_row_indices_per_column[column_index];
2428  appendRowGroups(row_group_intervals,
2429  column_index,
2430  chunk.getColumnDesc(),
2431  chunk_list,
2432  nullptr,
2433  rejected_row_indices.get(),
2434  true,
2435  max_num_rows_to_append);
2436  }
2437  };
2438 
2439  auto num_threads = foreign_storage::get_num_threads(foreign_table);
2440 
2441  std::vector<int> columns(num_columns);
2442  std::iota(columns.begin(), columns.end(), 0);
2443  auto futures =
2444  create_futures_for_workers(columns, num_threads, append_row_groups_for_column);
2445  for (auto& future : futures) {
2446  future.wait();
2447  }
2448  for (auto& future : futures) {
2449  future.get();
2450  }
2451 
2452  // merge all `rejected_row_indices_per_column`
2453  auto rejected_row_indices = std::make_unique<RejectedRowIndices>();
2454  for (int i = 0; i < num_columns; ++i) {
2455  rejected_row_indices->insert(
2456  preview_context.rejected_row_indices_per_column[i]->begin(),
2457  preview_context.rejected_row_indices_per_column[i]->end());
2458  }
2459 
2460  size_t num_rows = 0;
2461  auto buffers_it = preview_context.detect_buffers.begin();
2462  for (int i = 0; i < num_columns; ++i, ++buffers_it) {
2463  CHECK(buffers_it != preview_context.detect_buffers.end());
2464  auto& strings = buffers_it->get()->getStrings();
2465  if (i == 0) {
2466  num_rows = strings.size();
2467  } else {
2468  CHECK_EQ(num_rows, strings.size());
2469  }
2470  }
2471 
2472  size_t num_rejected_rows = rejected_row_indices->size();
2473  data_preview.num_rejected_rows += num_rejected_rows;
2474  CHECK_GE(num_rows, num_rejected_rows);
2475  auto row_count = num_rows - num_rejected_rows;
2476 
2477  auto offset_row = data_preview.sample_rows.size();
2478  data_preview.sample_rows.resize(std::min(offset_row + row_count, max_num_rows));
2479 
2480  for (size_t irow = 0, rows_appended = 0;
2481  irow < num_rows && offset_row + rows_appended < max_num_rows;
2482  ++irow) {
2483  if (rejected_row_indices->find(irow) != rejected_row_indices->end()) {
2484  continue;
2485  }
2486  auto& row_data = data_preview.sample_rows[offset_row + rows_appended];
2487  row_data.resize(num_columns);
2488  auto buffers_it = preview_context.detect_buffers.begin();
2489  for (int i = 0; i < num_columns; ++i, ++buffers_it) {
2490  CHECK(buffers_it != preview_context.detect_buffers.end());
2491  auto& strings = buffers_it->get()->getStrings();
2492  row_data[i] = strings[irow];
2493  }
2494  ++rows_appended;
2495  }
2496  }
2497 
2498  // attempt to detect geo columns
2499  for (int i = 0; i < num_columns; ++i) {
2500  auto type_info = data_preview.column_types[i];
2501  if (type_info.is_string()) {
2502  auto tentative_geo_type =
2503         detect_geo_type(data_preview.sample_rows, i);
2504     if (tentative_geo_type.has_value()) {
2505  data_preview.column_types[i].set_type(tentative_geo_type.value());
2506  data_preview.column_types[i].set_compression(kENCODING_NONE);
2507  }
2508  }
2509  }
2510 
2511  return data_preview;
2512 }
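
A hedged usage sketch of the preview path (illustrative, not part of this file); the setup variables are assumed to be created elsewhere, and the row cap of 100 is arbitrary.

DataPreview preview_first_rows(std::shared_ptr<arrow::fs::FileSystem> file_system,
                               FileReaderMap* reader_cache,
                               const ForeignTable* foreign_table,
                               const std::vector<std::string>& files) {
  LazyParquetChunkLoader loader(file_system, reader_cache, foreign_table);
  return loader.previewFiles(files, /*max_num_rows=*/100, *foreign_table);
}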
2513 
2514 std::list<RowGroupMetadata> LazyParquetChunkLoader::metadataScan(
2515  const std::vector<std::string>& file_paths,
2516  const ForeignTableSchema& schema,
2517  const bool do_metadata_stats_validation) {
2518  auto timer = DEBUG_TIMER(__func__);
2519  auto column_interval =
2520  Interval<ColumnType>{schema.getLogicalAndPhysicalColumns().front()->columnId,
2521  schema.getLogicalAndPhysicalColumns().back()->columnId};
2522  CHECK(!file_paths.empty());
2523 
2524  // The encoder map needs to be populated before we can start scanning rowgroups, so we
2525  // peel the first file_path out of the async loop below to perform population.
2526  const auto& first_path = *file_paths.begin();
2527  auto first_reader = file_reader_cache_->insert(first_path, file_system_);
2528  auto max_row_group_stats =
2529  validate_parquet_metadata(first_reader->parquet_reader()->metadata(),
2530  first_path,
2531  schema,
2532  do_metadata_stats_validation);
2533 
2534  // Iterate asynchronously over any paths beyond the first.
2535  auto table_ptr = schema.getForeignTable();
2536  CHECK(table_ptr);
2537  auto num_threads = foreign_storage::get_num_threads(*table_ptr);
2538  VLOG(1) << "Metadata scan using " << num_threads << " threads";
2539 
2540  const bool geo_validate_geometry =
2541       foreign_table_->getOptionAsBool(ForeignTable::GEO_VALIDATE_GEOMETRY_KEY);
2542   auto encoder_map = populate_encoder_map_for_metadata_scan(column_interval,
2543  schema,
2544  first_reader,
2545  do_metadata_stats_validation,
2546  geo_validate_geometry);
2547  const auto num_row_groups = get_parquet_table_size(first_reader).first;
2548  VLOG(1) << "Starting metadata scan of path " << first_path;
2549  auto row_group_metadata = metadata_scan_rowgroup_interval(
2550  encoder_map, {first_path, 0, num_row_groups - 1}, first_reader, schema);
2551  VLOG(1) << "Completed metadata scan of path " << first_path;
2552 
2553  // We want each (filepath->FileReader) pair in the cache to be initialized before we
2554  // multithread so that we are not adding keys in a concurrent environment, so we add
2555  // cache entries for each path and initialize to an empty unique_ptr if the file has not
2556  // yet been opened.
2557  // Since we have already performed the first iteration, we skip it in the thread groups
2558  // so as not to process it twice.
2559  std::vector<std::string> cache_subset;
2560  for (auto path_it = ++(file_paths.begin()); path_it != file_paths.end(); ++path_it) {
2561     file_reader_cache_->initializeIfEmpty(*path_it);
2562     cache_subset.emplace_back(*path_it);
2563  }
2564 
2565  auto paths_per_thread = partition_for_threads(cache_subset, num_threads);
2566  std::vector<std::future<std::pair<std::list<RowGroupMetadata>, MaxRowGroupSizeStats>>>
2567  futures;
2568  for (const auto& path_group : paths_per_thread) {
2569  futures.emplace_back(std::async(
2570         std::launch::async,
2571         [&](const auto& paths, const auto& file_reader_cache)
2572  -> std::pair<std::list<RowGroupMetadata>, MaxRowGroupSizeStats> {
2573  Timer<> summary_timer;
2574  Timer<> get_or_insert_reader_timer_ms;
2575  Timer<> validation_timer_ms;
2576  Timer<> metadata_scan_timer;
2577 
2578  summary_timer.start();
2579 
2580  std::list<RowGroupMetadata> reduced_metadata;
2581  MaxRowGroupSizeStats max_row_group_stats{0, 0};
2582  for (const auto& path : paths.get()) {
2583  get_or_insert_reader_timer_ms.start();
2584  auto reader = file_reader_cache.get().getOrInsert(path, file_system_);
2585  get_or_insert_reader_timer_ms.stop();
2586 
2587  validation_timer_ms.start();
2588  validate_equal_schema(first_reader, reader, first_path, path);
2589  auto local_max_row_group_stats =
2590  validate_parquet_metadata(reader->parquet_reader()->metadata(),
2591  path,
2592  schema,
2593  do_metadata_stats_validation);
2594  if (local_max_row_group_stats.max_row_group_size >
2595  max_row_group_stats.max_row_group_size) {
2596  max_row_group_stats = local_max_row_group_stats;
2597  }
2598  validation_timer_ms.stop();
2599 
2600  VLOG(1) << "Starting metadata scan of path " << path;
2601 
2602  metadata_scan_timer.start();
2603  const auto num_row_groups = get_parquet_table_size(reader).first;
2604  const auto interval = RowGroupInterval{path, 0, num_row_groups - 1};
2605  reduced_metadata.splice(
2606  reduced_metadata.end(),
2607  metadata_scan_rowgroup_interval(encoder_map, interval, reader, schema));
2608  metadata_scan_timer.stop();
2609 
2610  VLOG(1) << "Completed metadata scan of path " << path;
2611  }
2612 
2613  summary_timer.stop();
2614 
2615  VLOG(1) << "Runtime summary:";
2616  VLOG(1) << " Parquet metadata scan total time: " << summary_timer.elapsed()
2617  << "ms";
2618  VLOG(1) << " Parquet file reader opening time: "
2619  << get_or_insert_reader_timer_ms.elapsed() << "ms";
2620  VLOG(1) << " Parquet metadata validation time: "
2621  << validation_timer_ms.elapsed() << "ms";
2622  VLOG(1) << " Parquet metadata processing time: "
2623           << metadata_scan_timer.elapsed() << "ms";
2624 
2625  return {reduced_metadata, max_row_group_stats};
2626  },
2627  std::ref(path_group),
2628  std::ref(*file_reader_cache_)));
2629  }
2630 
2631  // Reduce all the row_group results.
2632  for (auto& future : futures) {
2633  auto [metadata, local_max_row_group_stats] = future.get();
2634  row_group_metadata.splice(row_group_metadata.end(), metadata);
2635  if (local_max_row_group_stats.max_row_group_size >
2636  max_row_group_stats.max_row_group_size) {
2637  max_row_group_stats = local_max_row_group_stats;
2638  }
2639  }
2640 
2641  if (max_row_group_stats.max_row_group_size > schema.getForeignTable()->maxFragRows) {
2642     throw_row_group_larger_than_fragment_size_error(
2643         max_row_group_stats, schema.getForeignTable()->maxFragRows);
2644  }
2645 
2646  return row_group_metadata;
2647 }
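
For reference, a small sketch of the path partitioning used in metadataScan above; partition_for_threads comes from FsiChunkUtils.h, and the seven paths are invented.

void example_partition_paths() {
  const std::vector<std::string> paths{"a", "b", "c", "d", "e", "f", "g"};
  auto groups = partition_for_threads(paths, /*max_threads=*/3);
  // At most three groups are produced; each group is scanned by one
  // std::async worker, while the first file (scanned up front) seeds the
  // shared encoder_map before any worker starts.
}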
2648 
2649 } // namespace foreign_storage