24 #include <parquet/schema.h>
25 #include <parquet/types.h>
30 namespace foreign_storage {
35 const size_t omnisci_data_type_byte_size,
36 const size_t parquet_data_type_byte_size)
41 virtual void reserve(
const size_t num_elements) = 0;
58 const int16_t* rep_levels,
59 const int64_t values_read,
60 const int64_t levels_read,
61 int8_t* values)
override {
63 for (int64_t i = 0; i < values_read; ++i) {
69 if (values_read < levels_read) {
77 for (int64_t i = levels_read - 1; i >= 0; --i) {
92 const int16_t* def_levels,
93 const int64_t values_read,
94 const int64_t levels_read,
95 const bool do_encoding) {
96 for (int64_t i = levels_read - 1, j = values_read - 1; i >= 0; --i) {
113 template <
typename V,
typename T,
typename NullType = V>
118 const parquet::ColumnDescriptor* parquet_column_descriptor)
122 parquet::GetTypeByteSize(parquet_column_descriptor->physical_type()))
124 if (
auto detect_buffer = dynamic_cast<TypedParquetDetectBuffer*>(
buffer_)) {
130 const size_t omnisci_data_type_byte_size,
131 const size_t parquet_data_type_byte_size)
134 if (
auto detect_buffer = dynamic_cast<TypedParquetDetectBuffer*>(
buffer_)) {
158 auto null_value = get_null_value<NullType>();
159 if (element == null_value) {
169 const auto& element =
reinterpret_cast<const V*
>(bytes)[0];
175 CHECK(detect_buffer);
176 std::function<std::string(const V&)> element_to_string = [
this](
const V& element) {
179 detect_buffer->setConverterType<V>(element_to_string);
183 const int64_t j)
const override {
187 void reserve(
const size_t num_append_elements)
override {
192 const int16_t* rep_levels,
193 const int64_t values_read,
194 const int64_t levels_read,
195 int8_t* values)
override {
198 for (i = 0, j = 0; i < levels_read; ++i) {
201 CHECK(j < values_read);
203 }
catch (
const std::runtime_error& error) {
211 appendData(def_levels, rep_levels, values_read, levels_read, values);
218 const int16_t* rep_levels,
219 const int64_t values_read,
220 const int64_t levels_read,
225 for (i = 0, j = 0; i < levels_read; ++i) {
228 CHECK(j < values_read);
230 }
catch (
const std::runtime_error& error) {
236 appendData(def_levels, rep_levels, values_read, levels_read, values);
241 if (invalid_indices.empty()) {
248 omnisci_data_values, omnisci_data_values + num_elements, [&](
const V& value) {
249 const V* start = omnisci_data_values;
250 auto index = std::distance(start, &value);
251 return invalid_indices.find(index) != invalid_indices.end();
254 CHECK(num_bytes_erased <= buffer_->size());
265 const int16_t* rep_levels,
266 const int64_t values_read,
267 const int64_t levels_read,
268 int8_t* values)
override {
269 if (std::is_same<V, T>::value && values_read == levels_read) {
271 for (int64_t i = 0; i < levels_read; ++i) {
273 values + i * omnisci_data_type_byte_size_);
279 def_levels, rep_levels, values_read, levels_read, values);
284 int8_t* omnisci_data_bytes,
285 const size_t num_elements)
override {
286 auto parquet_data_ptr =
reinterpret_cast<const T*
>(parquet_data_bytes);
287 auto omnisci_data_ptr =
reinterpret_cast<V*
>(omnisci_data_bytes);
288 for (
size_t i = 0; i < num_elements; ++i) {
289 encodeAndCopy(reinterpret_cast<const int8_t*>(&parquet_data_ptr[i]),
290 reinterpret_cast<int8_t*>(&omnisci_data_ptr[i]));
// Overrides the pure-virtual setNull(int8_t*) hook: stores this encoder's null
// value into a single output element.
//
// omnisci_data_bytes: destination storage, treated here as one element of type V.
294 void setNull(int8_t* omnisci_data_bytes)
override {
// View the destination bytes as a mutable V and bind a reference to element 0.
// NOTE(review): the cast assumes the pointer is suitably aligned for V and that
// at least sizeof(V) bytes are writable — confirm against callers.
295 auto& omnisci_data_value =
reinterpret_cast<V*
>(omnisci_data_bytes)[0];
// The stored value is whatever get_null_value<NullType>() returns, converted to V
// on assignment. The template header visible earlier in this file defaults
// NullType to V — presumably so encoders can opt into a sentinel of another type.
296 omnisci_data_value = get_null_value<NullType>();
// Overrides the pure-virtual copy(const int8_t*, int8_t*) hook: copies exactly one
// element of type V from the source bytes to the destination bytes.
//
// omnisci_data_bytes_source:      storage holding the element to read (as const V).
// omnisci_data_bytes_destination: storage that receives the element (as V).
299 void copy(
const int8_t* omnisci_data_bytes_source,
300 int8_t* omnisci_data_bytes_destination)
override {
// Read-only view of the first V at the source address.
// NOTE(review): both casts assume pointers aligned for V — confirm against callers.
301 const auto& omnisci_data_value_source =
302 reinterpret_cast<const V*
>(omnisci_data_bytes_source)[0];
// Mutable view of the first V at the destination address.
303 auto& omnisci_data_value_destination =
304 reinterpret_cast<V*
>(omnisci_data_bytes_destination)[0];
// Element-wise assignment through the typed references (uses V's assignment,
// not a raw byte copy).
305 omnisci_data_value_destination = omnisci_data_value_source;
309 const parquet::RowGroupMetaData* group_metadata,
310 const int parquet_column_index,
313 auto column_metadata = group_metadata->ColumnChunk(parquet_column_index);
316 auto parquet_column_descriptor =
317 group_metadata->schema()->Column(parquet_column_index);
321 if (
stats->HasMinMax()) {
323 if (
auto parquet_scalar_validator =
324 dynamic_cast<ParquetMetadataValidator*>(
this)) {
326 parquet_scalar_validator->validate(
329 }
catch (
const std::exception& e) {
330 std::stringstream error_message;
332 << e.what() <<
" Error validating statistics of Parquet column '"
333 << group_metadata->schema()->Column(parquet_column_index)->name() <<
"'";
334 throw std::runtime_error(error_message.str());
339 auto updated_chunk_stats =
getUpdatedStats(stats_min, stats_max, column_type);
340 metadata->fillChunkStats(updated_chunk_stats.min,
341 updated_chunk_stats.max,
342 metadata->chunkStats.has_nulls);
344 auto null_count =
stats->null_count();
345 validateNullCount(group_metadata->schema()->Column(parquet_column_index)->name(),
348 metadata->chunkStats.has_nulls = null_count > 0;
355 * column_metadata->num_values();
356 metadata->numElements = group_metadata->num_rows();
365 T stats_min =
reinterpret_cast<T*
>(stats->EncodeMin().data())[0];
366 T stats_max =
reinterpret_cast<T*
>(stats->EncodeMax().data())[0];
367 return {stats_min, stats_max};
380 sizeof(V), reinterpret_cast<int8_t*>(&stats_min),
false,
DoNothingDeleter());
382 sizeof(V), reinterpret_cast<int8_t*>(&stats_max),
false,
DoNothingDeleter());
383 std::vector<ArrayDatum> min_max_datums{min_datum, max_datum};
384 encoder->updateStats(&min_max_datums, 0, 1);
386 encoder->updateStats(reinterpret_cast<int8_t*>(&stats_min), 1);
387 encoder->updateStats(reinterpret_cast<int8_t*>(&stats_max), 1);
389 auto updated_chunk_stats_metadata = std::make_shared<ChunkMetadata>();
390 encoder->getMetadata(updated_chunk_stats_metadata);
391 return updated_chunk_stats_metadata->chunkStats;
395 const parquet::ColumnDescriptor* parquet_column_descriptor,
396 std::shared_ptr<parquet::Statistics>
stats) {
397 V stats_min, stats_max;
398 auto min_string = stats->EncodeMin();
399 auto max_string = stats->EncodeMax();
400 if constexpr (std::is_same<T, parquet::FixedLenByteArray>::value) {
401 CHECK_EQ(parquet_column_descriptor->physical_type(),
402 parquet::Type::FIXED_LEN_BYTE_ARRAY);
403 parquet::FixedLenByteArray min_byte_array, max_byte_array;
404 min_byte_array.ptr =
reinterpret_cast<const uint8_t*
>(min_string.data());
405 max_byte_array.ptr =
reinterpret_cast<const uint8_t*
>(max_string.data());
407 reinterpret_cast<int8_t*>(&stats_min));
409 reinterpret_cast<int8_t*>(&stats_max));
410 }
else if constexpr (std::is_same<T, parquet::ByteArray>::value) {
411 CHECK_EQ(parquet_column_descriptor->physical_type(), parquet::Type::BYTE_ARRAY);
412 parquet::ByteArray min_byte_array, max_byte_array;
413 min_byte_array.ptr =
reinterpret_cast<const uint8_t*
>(min_string.data());
414 min_byte_array.len = min_string.length();
415 max_byte_array.ptr =
reinterpret_cast<const uint8_t*
>(max_string.data());
416 max_byte_array.len = max_string.length();
418 reinterpret_cast<int8_t*>(&stats_min));
420 reinterpret_cast<int8_t*>(&stats_max));
423 reinterpret_cast<int8_t*>(&stats_min));
425 reinterpret_cast<int8_t*>(&stats_max));
427 return {stats_min, stats_max};
ParquetInPlaceEncoder(Data_Namespace::AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size)
std::string DatumToString(Datum d, const SQLTypeInfo &ti)
bool is_error_tracking_enabled_
bool is_timestamp() const
RejectedRowIndices invalid_indices_
bool isIntegralType(const SQLTypeInfo &type) const
std::shared_ptr< parquet::Statistics > validate_and_get_column_metadata_statistics(const parquet::ColumnChunkMetaData *column_metadata)
void copy(const int8_t *omnisci_data_bytes_source, int8_t *omnisci_data_bytes_destination) override
TypedParquetInPlaceEncoder(Data_Namespace::AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size)
void reserve(const size_t num_append_elements) override
std::string elementToString(const V &element) const
virtual int8_t * getMemoryPtr()=0
void validate(const int8_t *parquet_data, const int64_t j, const SQLTypeInfo &column_type) const override
void appendDataTrackErrors(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
void setNull(int8_t *omnisci_data_bytes) override
virtual void encodeAndCopy(const int8_t *parquet_data_bytes, int8_t *omnisci_data_bytes)=0
void initEncoder(const SQLTypeInfo &tmp_sql_type)
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
std::string encodedDataToString(const int8_t *bytes) const override
TypedParquetInPlaceEncoder(Data_Namespace::AbstractBuffer *buffer, const ColumnDescriptor *column_desciptor, const parquet::ColumnDescriptor *parquet_column_descriptor)
virtual bool encodingIsIdentityForSameTypes() const
std::set< int64_t > InvalidRowGroupIndices
static void validateNullCount(const std::string &parquet_column_name, int64_t null_count, const SQLTypeInfo &column_type)
std::shared_ptr< ChunkMetadata > getRowGroupMetadata(const parquet::RowGroupMetaData *group_metadata, const int parquet_column_index, const SQLTypeInfo &column_type) override
virtual void setNull(int8_t *omnisci_data_bytes)=0
An AbstractBuffer is a unit of data management for a data manager.
Specifies the in-memory content of a row in the column metadata table.
std::pair< T, T > getUnencodedStats(std::shared_ptr< parquet::Statistics > stats) const
static std::shared_ptr< ChunkMetadata > createMetadata(const SQLTypeInfo &column_type)
virtual void reserve(const size_t num_elements)=0
const size_t parquet_data_type_byte_size_
void decodeNullsAndEncodeData(int8_t *data_ptr, const int16_t *def_levels, const int64_t values_read, const int64_t levels_read, const bool do_encoding)
void validateUsingEncodersColumnType(const int8_t *parquet_data, const int64_t j) const override
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
void setSize(const size_t size)
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
std::pair< V, V > getEncodedStats(const parquet::ColumnDescriptor *parquet_column_descriptor, std::shared_ptr< parquet::Statistics > stats)
Encoder * getEncoder() const
static ChunkStats getUpdatedStats(V &stats_min, V &stats_max, const SQLTypeInfo &column_type)
const size_t omnisci_data_type_byte_size_
void encodeAndCopyContiguous(const int8_t *parquet_data_bytes, int8_t *omnisci_data_bytes, const size_t num_elements) override
void validateAndAppendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values, const SQLTypeInfo &column_type, InvalidRowGroupIndices &invalid_indices) override
HOST DEVICE bool get_notnull() const
bool validate_metadata_stats_
Data_Namespace::AbstractBuffer * buffer_
SQLTypeInfo get_elem_type() const
virtual void reserve(size_t num_bytes)=0
std::string integralTypeToString(const V &element) const
size_t current_chunk_offset_
void eraseInvalidIndicesInBuffer(const InvalidRowGroupIndices &invalid_indices) override
void setDetectBufferConverterType()
int64_t current_batch_offset_
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
virtual void copy(const int8_t *omnisci_data_bytes_source, int8_t *omnisci_data_bytes_destination)=0