17 #ifndef FIXED_LENGTH_ENCODER_H
18 #define FIXED_LENGTH_ENCODER_H
28 #include <tbb/parallel_for.h>
29 #include <tbb/parallel_reduce.h>
33 template <
typename T,
typename V>
41 const std::vector<size_t>& selected_idx,
42 const size_t byte_limit)
override {
44 <<
"getNumElemsForBytesEncodedDataAtIndices unexpectedly called for non varlen"
52 const std::vector<size_t>& selected_idx)
override {
53 std::shared_ptr<ChunkMetadata> chunk_metadata;
58 selected_idx, [&](
const size_t start_pos,
const size_t end_pos) {
59 size_t elem_count = end_pos - start_pos;
63 return chunk_metadata;
68 const size_t start_idx,
69 const size_t num_elements)
override {
70 auto current_data = data + start_idx *
sizeof(V);
72 current_data, num_elements,
SQLTypeInfo{},
false, -1,
true);
75 std::shared_ptr<ChunkMetadata>
appendData(int8_t*& src_data,
76 const size_t num_elems_to_append,
78 const bool replicating =
false,
79 const int64_t offset = -1)
override {
81 src_data, num_elems_to_append, ti, replicating, offset,
false);
84 void getMetadata(
const std::shared_ptr<ChunkMetadata>& chunkMetadata)
override {
91 auto chunk_metadata = std::make_shared<ChunkMetadata>(ti, 0, 0,
ChunkStats{});
93 return chunk_metadata;
101 const auto data =
static_cast<T>(val);
112 const auto data =
static_cast<T>(val);
118 void updateStats(
const int8_t*
const src_data,
const size_t num_elements)
override {
119 const T* unencoded_data =
reinterpret_cast<const T*
>(src_data);
120 for (
size_t i = 0; i < num_elements; ++i) {
126 const size_t num_elements)
override {
127 const V* data =
reinterpret_cast<const V*
>(dst_data);
130 tbb::blocked_range(
size_t(0), num_elements),
132 [&](
const auto& range,
auto init) {
133 auto [min, max, nulls] =
init;
134 for (
size_t i = range.begin(); i < range.end(); i++) {
135 if (data[i] != std::numeric_limits<V>::min()) {
137 min = std::min(min, data[i]);
138 max = std::max(max, data[i]);
143 return std::tuple(min, max, nulls);
145 [&](
auto lhs,
auto rhs) {
146 const auto [lhs_min, lhs_max, lhs_nulls] = lhs;
147 const auto [rhs_min, rhs_max, rhs_nulls] = rhs;
148 return std::tuple(std::min(lhs_min, rhs_min),
149 std::max(lhs_max, rhs_max),
150 lhs_nulls || rhs_nulls);
155 const size_t start_idx,
156 const size_t num_elements)
override {
161 const size_t start_idx,
162 const size_t num_elements)
override {
169 if (that_typed.has_nulls) {
180 dataMin = castedEncoder->dataMin;
181 dataMax = castedEncoder->dataMax;
// Dumps the encoder's bookkeeping fields to the stream `f` as raw bytes, in
// this fixed order: num_elems_ (size_t), dataMin (T), dataMax (T),
// has_nulls (bool). Every call writes exactly one item of the field's size.
// The matching fread sequence must consume the fields in the same order.
// NOTE(review): presumably the body of writeMetadata(FILE* f) — the enclosing
// signature is not visible in this excerpt.
// NOTE(review): the fwrite return values are discarded, so a short or failed
// write is not detected here — confirm the caller checks the stream.
187 fwrite((int8_t*)&
num_elems_,
sizeof(
size_t), 1, f);
188 fwrite((int8_t*)&
dataMin,
sizeof(
T), 1, f);
189 fwrite((int8_t*)&
dataMax,
sizeof(
T), 1, f);
190 fwrite((int8_t*)&
has_nulls,
sizeof(
bool), 1, f);
// Restores the encoder's bookkeeping fields from the stream `f`, reading raw
// bytes in the order num_elems_ (size_t), dataMin (T), dataMax (T),
// has_nulls (bool).
// NOTE(review): presumably the body of readMetadata(FILE* f) — the enclosing
// signature is not visible in this excerpt.
// NOTE(review): the first call passes (size = sizeof(size_t), count = 1) while
// the remaining three pass (size = 1, count = sizeof(...)). The number of bytes
// requested is the same either way, but the value fread would return differs
// (items vs. bytes); all four return values are discarded, so a truncated
// file leaves the remaining fields unmodified without any error being raised.
195 fread((int8_t*)&
num_elems_,
sizeof(
size_t), 1, f);
196 fread((int8_t*)&
dataMin, 1,
sizeof(
T), f);
197 fread((int8_t*)&
dataMax, 1,
sizeof(
T), f);
198 fread((int8_t*)&
has_nulls, 1,
sizeof(
bool), f);
202 const auto new_min = DatumFetcher::getDatumVal<T>(stats.
min);
203 const auto new_max = DatumFetcher::getDatumVal<T>(stats.
max);
// Puts the tracked range into an inverted state: dataMin is set to the
// largest value representable in T and dataMax to the lowest, so
// dataMin > dataMax until real values are folded in.
// `lowest()` rather than `min()` is used for the maximum: for floating-point
// T, `min()` is the smallest positive value, not the most negative one.
// NOTE(review): presumably part of resetChunkStats(); any reset of has_nulls
// is not visible in this excerpt.
216 dataMin = std::numeric_limits<T>::max();
217 dataMax = std::numeric_limits<T>::lowest();
228 const size_t num_elems_to_append,
230 const bool replicating,
231 const int64_t offset,
232 const bool is_encoded) {
234 num_elems_to_append >=
239 CHECK(!is_encoded || !replicating);
241 T* unencoded_data =
reinterpret_cast<T*
>(src_data);
242 std::vector<V> encoded_data;
243 V* data_to_write =
nullptr;
245 encoded_data.resize(num_elems_to_append);
246 data_to_write = encoded_data.data();
247 for (
size_t i = 0; i < num_elems_to_append; ++i) {
248 size_t ri = replicating ? 0 : i;
252 data_to_write =
reinterpret_cast<V*
>(src_data);
253 for (
size_t i = 0; i < num_elems_to_append; ++i) {
260 auto append_data_size = num_elems_to_append *
sizeof(V);
263 buffer_->
append(reinterpret_cast<int8_t*>(data_to_write), append_data_size);
265 src_data += num_elems_to_append *
sizeof(
T);
271 buffer_->
write(reinterpret_cast<int8_t*>(data_to_write),
272 num_elems_to_append *
sizeof(V),
273 static_cast<size_t>(offset));
275 auto chunk_metadata = std::make_shared<ChunkMetadata>();
277 return chunk_metadata;
281 if (encoded_data == std::numeric_limits<V>::min()) {
290 V encoded_data =
static_cast<V
>(unencoded_data);
291 if (unencoded_data != encoded_data) {
293 LOG(
ERROR) <<
"Fixed encoding failed, Unencoded: " +
297 T data = unencoded_data;
298 if (data == std::numeric_limits<V>::min()) {
310 #endif // FIXED_LENGTH_ENCODER_H
void updateStats(const int8_t *const src_data, const size_t num_elements) override
std::shared_ptr< ChunkMetadata > appendData(int8_t *&src_data, const size_t num_elems_to_append, const SQLTypeInfo &ti, const bool replicating=false, const int64_t offset=-1) override
DecimalOverflowValidator decimal_overflow_validator_
std::shared_ptr< ChunkMetadata > appendEncodedDataAtIndices(const int8_t *, int8_t *data, const std::vector< size_t > &selected_idx) override
std::shared_ptr< ChunkMetadata > appendEncodedData(const int8_t *, int8_t *data, const size_t start_idx, const size_t num_elements) override
void resetChunkStats() override
void updateStats(const int64_t val, const bool is_null) override
void updateStats(const std::vector< std::string > *const src_data, const size_t start_idx, const size_t num_elements) override
std::shared_ptr< ChunkMetadata > appendEncodedOrUnencodedData(int8_t *&src_data, const size_t num_elems_to_append, const SQLTypeInfo &ti, const bool replicating, const int64_t offset, const bool is_encoded)
virtual void getMetadata(const std::shared_ptr< ChunkMetadata > &chunkMetadata)
void execute_over_contiguous_indices(const std::vector< size_t > &indices, std::function< void(const size_t, const size_t)> to_execute)
CONSTEXPR DEVICE bool is_null(const T &value)
Data_Namespace::AbstractBuffer * buffer_
void init(LogOptions const &log_opts)
void copyMetadata(const Encoder *copyFromEncoder) override
void readMetadata(FILE *f) override
size_t getNumElems() const
void updateStats(const double val, const bool is_null) override
V encodeDataAndUpdateStats(const T &unencoded_data)
An AbstractBuffer is a unit of data management for a data manager.
void getMetadata(const std::shared_ptr< ChunkMetadata > &chunkMetadata) override
virtual void write(int8_t *src, const size_t num_bytes, const size_t offset=0, const MemoryLevel src_buffer_type=CPU_LEVEL, const int src_device_id=-1)=0
Value parallel_reduce(const blocked_range< Int > &range, const Value &identity, const RealBody &real_body, const Reduction &reduction, const Partitioner &p=Partitioner())
Parallel iteration with reduction.
void updateStats(const std::vector< ArrayDatum > *const src_data, const size_t start_idx, const size_t num_elements) override
bool resetChunkStats(const ChunkStats &stats) override
Reset chunk-level stats (min, max, nulls) using the new values from the argument.
void updateStatsEncoded(const int8_t *const dst_data, const size_t num_elements) override
FixedLengthEncoder(Data_Namespace::AbstractBuffer *buffer)
torch::Tensor f(torch::Tensor x, torch::Tensor W_target, torch::Tensor b_target)
void writeMetadata(FILE *f) override
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
size_t getNumElemsForBytesEncodedDataAtIndices(const int8_t *index_data, const std::vector< size_t > &selected_idx, const size_t byte_limit) override
void validate(T value) const
virtual void reserve(size_t num_bytes)=0
void updateStatsWithAlreadyEncoded(const V &encoded_data)
void reduceStats(const Encoder &that) override
std::shared_ptr< ChunkMetadata > getMetadata(const SQLTypeInfo &ti) override