17 #ifndef NONE_ENCODER_H
18 #define NONE_ENCODER_H
26 #include <tbb/parallel_for.h>
27 #include <tbb/parallel_reduce.h>
32 return std::is_integral<T>::value ? inline_int_null_value<T>()
33 : inline_fp_null_value<T>();
44 const std::vector<size_t>& selected_idx,
45 const size_t byte_limit)
override {
47 <<
"getNumElemsForBytesEncodedDataAtIndices unexpectedly called for non varlen"
55 const std::vector<size_t>& selected_idx)
override {
56 std::shared_ptr<ChunkMetadata> chunk_metadata;
61 selected_idx, [&](
const size_t start_pos,
const size_t end_pos) {
62 size_t elem_count = end_pos - start_pos;
63 auto data_ptr = data +
sizeof(
T) * selected_idx[start_pos];
67 return chunk_metadata;
72 const size_t start_idx,
73 const size_t num_elements)
override {
74 auto current_data = data +
sizeof(
T) * start_idx;
78 std::shared_ptr<ChunkMetadata>
appendData(int8_t*& src_data,
79 const size_t num_elems_to_append,
81 const bool replicating =
false,
82 const int64_t offset = -1)
override {
84 src_data, num_elems_to_append, replicating, offset,
false);
87 void getMetadata(
const std::shared_ptr<ChunkMetadata>& chunkMetadata)
override {
94 auto chunk_metadata = std::make_shared<ChunkMetadata>(ti, 0, 0,
ChunkStats{});
96 return chunk_metadata;
104 const auto data =
static_cast<T>(val);
115 const auto data =
static_cast<T>(val);
121 void updateStats(
const int8_t*
const src_data,
const size_t num_elements)
override {
126 const size_t num_elements)
override {
127 const T* data =
reinterpret_cast<const T*
>(dst_data);
130 tbb::blocked_range(
size_t(0), num_elements),
132 [&](
const auto& range,
auto init) {
133 auto [min, max, nulls] =
init;
134 for (
size_t i = range.begin(); i < range.end(); i++) {
135 if (data[i] != none_encoded_null_value<T>()) {
137 min = std::min(min, data[i]);
138 max = std::max(max, data[i]);
143 return std::tuple(min, max, nulls);
145 [&](
auto lhs,
auto rhs) {
146 const auto [lhs_min, lhs_max, lhs_nulls] = lhs;
147 const auto [rhs_min, rhs_max, rhs_nulls] = rhs;
148 return std::tuple(std::min(lhs_min, rhs_min),
149 std::max(lhs_max, rhs_max),
150 lhs_nulls || rhs_nulls);
155 const size_t start_idx,
156 const size_t num_elements)
override {
161 const size_t start_idx,
162 const size_t num_elements)
override {
168 const auto that_typed =
static_cast<const NoneEncoder&
>(that);
169 if (that_typed.has_nulls) {
178 fwrite((int8_t*)&
num_elems_,
sizeof(
size_t), 1, f);
179 fwrite((int8_t*)&
dataMin,
sizeof(
T), 1, f);
180 fwrite((int8_t*)&
dataMax,
sizeof(
T), 1, f);
181 fwrite((int8_t*)&
has_nulls,
sizeof(
bool), 1, f);
186 fread((int8_t*)&
num_elems_,
sizeof(
size_t), 1, f);
187 fread((int8_t*)&
dataMin,
sizeof(
T), 1, f);
188 fread((int8_t*)&
dataMax,
sizeof(
T), 1, f);
189 fread((int8_t*)&
has_nulls,
sizeof(
bool), 1, f);
193 const auto new_min = DatumFetcher::getDatumVal<T>(stats.
min);
194 const auto new_max = DatumFetcher::getDatumVal<T>(stats.
max);
208 auto castedEncoder =
reinterpret_cast<const NoneEncoder<T>*
>(copyFromEncoder);
209 dataMin = castedEncoder->dataMin;
210 dataMax = castedEncoder->dataMax;
215 dataMin = std::numeric_limits<T>::max();
216 dataMax = std::numeric_limits<T>::lowest();
227 const size_t num_elems_to_append,
228 const bool replicating,
229 const int64_t offset,
230 const bool is_validated_data) {
231 if (offset == 0 && num_elems_to_append >=
num_elems_) {
234 T* unencodedData =
reinterpret_cast<T*
>(src_data);
235 std::vector<T> encoded_data;
237 if (num_elems_to_append > 0) {
238 encoded_data.resize(num_elems_to_append);
240 std::fill(encoded_data.begin(), encoded_data.end(), data);
243 updateStats(src_data, num_elems_to_append, is_validated_data);
246 auto append_data_size = num_elems_to_append *
sizeof(
T);
250 replicating ? reinterpret_cast<int8_t*>(encoded_data.data()) : src_data,
253 src_data += num_elems_to_append *
sizeof(
T);
260 src_data, num_elems_to_append *
sizeof(
T), static_cast<size_t>(offset));
262 auto chunk_metadata = std::make_shared<ChunkMetadata>();
264 return chunk_metadata;
268 const bool is_validated_data =
false) {
269 if (unencoded_data == none_encoded_null_value<T>()) {
272 if (!is_validated_data) {
278 return unencoded_data;
282 const size_t num_elements,
283 const bool is_validated_data) {
284 const T* unencoded_data =
reinterpret_cast<const T*
>(src_data);
285 for (
size_t i = 0; i < num_elements; ++i) {
291 #endif // NONE_ENCODER_H
void updateStats(const int8_t *const src_data, const size_t num_elements) override
void writeMetadata(FILE *f) override
DecimalOverflowValidator decimal_overflow_validator_
void updateStats(const int8_t *const src_data, const size_t num_elements, const bool is_validated_data)
void updateStats(const int64_t val, const bool is_null) override
virtual void getMetadata(const std::shared_ptr< ChunkMetadata > &chunkMetadata)
void resetChunkStats() override
void execute_over_contiguous_indices(const std::vector< size_t > &indices, std::function< void(const size_t, const size_t)> to_execute)
std::shared_ptr< ChunkMetadata > appendEncodedDataAtIndices(const int8_t *, int8_t *data, const std::vector< size_t > &selected_idx) override
void updateStats(const std::vector< std::string > *const src_data, const size_t start_idx, const size_t num_elements) override
DEVICE void fill(ARGS &&...args)
CONSTEXPR DEVICE bool is_null(const T &value)
Data_Namespace::AbstractBuffer * buffer_
std::shared_ptr< ChunkMetadata > appendData(int8_t *&src_data, const size_t num_elems_to_append, const SQLTypeInfo &, const bool replicating=false, const int64_t offset=-1) override
void init(LogOptions const &log_opts)
std::shared_ptr< ChunkMetadata > appendValidatedOrNonValidatedData(int8_t *&src_data, const size_t num_elems_to_append, const bool replicating, const int64_t offset, const bool is_validated_data)
size_t getNumElems() const
An AbstractBuffer is a unit of data management for a data manager.
void getMetadata(const std::shared_ptr< ChunkMetadata > &chunkMetadata) override
virtual void write(int8_t *src, const size_t num_bytes, const size_t offset=0, const MemoryLevel src_buffer_type=CPU_LEVEL, const int src_device_id=-1)=0
Value parallel_reduce(const blocked_range< Int > &range, const Value &identity, const RealBody &real_body, const Reduction &reduction, const Partitioner &p=Partitioner())
Parallel iteration with reduction.
T none_encoded_null_value()
bool resetChunkStats(const ChunkStats &stats) override
: Reset chunk level stats (min, max, nulls) using new values from the argument.
size_t getNumElemsForBytesEncodedDataAtIndices(const int8_t *index_data, const std::vector< size_t > &selected_idx, const size_t byte_limit) override
T validateDataAndUpdateStats(const T &unencoded_data, const bool is_validated_data=false)
void updateStats(const double val, const bool is_null) override
void updateStats(const std::vector< ArrayDatum > *const src_data, const size_t start_idx, const size_t num_elements) override
std::shared_ptr< ChunkMetadata > getMetadata(const SQLTypeInfo &ti) override
torch::Tensor f(torch::Tensor x, torch::Tensor W_target, torch::Tensor b_target)
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
std::shared_ptr< ChunkMetadata > appendEncodedData(const int8_t *, int8_t *data, const size_t start_idx, const size_t num_elements) override
NoneEncoder(Data_Namespace::AbstractBuffer *buffer)
void updateStatsEncoded(const int8_t *const dst_data, const size_t num_elements) override
void reduceStats(const Encoder &that) override
void copyMetadata(const Encoder *copyFromEncoder) override
void validate(T value) const
virtual void reserve(size_t num_bytes)=0
void readMetadata(FILE *f) override