OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
QueryMemoryDescriptor.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #ifndef QUERYENGINE_QUERYMEMORYDESCRIPTOR_H
24 #define QUERYENGINE_QUERYMEMORYDESCRIPTOR_H
25 
27 #include "ColSlotContext.h"
28 #include "Logger/Logger.h"
31 #include "QueryEngine/enums.h"
32 #include "Shared/SqlTypesLayout.h"
33 #include "Shared/TargetInfo.h"
34 
35 #include <boost/optional.hpp>
36 
37 #include <algorithm>
38 #include <cstddef>
39 #include <cstdint>
40 #include <memory>
41 #include <numeric>
42 #include <unordered_map>
43 #include <vector>
44 
45 extern bool g_cluster;
46 
47 class Executor;
49 class RenderInfo;
50 class RowSetMemoryOwner;
51 struct InputTableInfo;
52 struct RelAlgExecutionUnit;
53 class TResultSetBufferDescriptor;
55 struct ColRangeInfo;
56 struct KeylessInfo;
57 
58 using heavyai::QueryDescriptionType;
59 
// Error raised when streaming top-N cannot be used because the heap it would
// require is larger than the maximum slab size (see the message below).
class StreamingTopNOOM : public std::runtime_error {
 public:
  // heap_size_bytes: required heap size, reported verbatim in the error message.
  // explicit: a byte count must never silently convert into an exception object.
  explicit StreamingTopNOOM(const size_t heap_size_bytes)
      : std::runtime_error("Unable to use streaming top N due to required heap size of " +
                           std::to_string(heap_size_bytes) +
                           " bytes exceeding maximum slab size.") {}
};
67 
69  public:
71 
72  // constructor for init call
73  QueryMemoryDescriptor(const Executor* executor,
74  const RelAlgExecutionUnit& ra_exe_unit,
75  const std::vector<InputTableInfo>& query_infos,
76  const bool allow_multifrag,
77  const bool keyless_hash,
78  const bool interleaved_bins_on_gpu,
79  const int32_t idx_target_as_key,
80  const ColRangeInfo& col_range_info,
81  const ColSlotContext& col_slot_context,
82  const std::vector<int8_t>& group_col_widths,
83  const int8_t group_col_compact_width,
84  const std::vector<int64_t>& target_groupby_indices,
85  const size_t entry_count,
88  const bool sort_on_gpu_hint,
89  const bool output_columnar,
90  const bool render_output,
91  const bool must_use_baseline_sort,
92  const bool use_streaming_top_n,
93  const bool threads_can_reuse_group_by_buffers);
94 
95  QueryMemoryDescriptor(const Executor* executor,
96  const size_t entry_count,
97  const QueryDescriptionType query_desc_type);
98 
99  QueryMemoryDescriptor(const QueryDescriptionType query_desc_type,
100  const int64_t min_val,
101  const int64_t max_val,
102  const bool has_nulls,
103  const std::vector<int8_t>& group_col_widths);
104 
105  // Serialization
106  QueryMemoryDescriptor(const TResultSetBufferDescriptor& thrift_query_memory_descriptor);
107  static TResultSetBufferDescriptor toThrift(const QueryMemoryDescriptor&);
108 
109  bool operator==(const QueryMemoryDescriptor& other) const;
110 
111  static std::unique_ptr<QueryMemoryDescriptor> init(
112  const Executor* executor,
113  const RelAlgExecutionUnit& ra_exe_unit,
114  const std::vector<InputTableInfo>& query_infos,
115  const ColRangeInfo& col_range_info,
116  const KeylessInfo& keyless_info,
117  const bool allow_multifrag,
118  const ExecutorDeviceType device_type,
119  const int8_t crt_min_byte_width,
120  const bool sort_on_gpu_hint,
121  const size_t shard_count,
122  const size_t max_groups_buffer_entry_count,
123  RenderInfo* render_info,
126  const bool must_use_baseline_sort,
127  const bool output_columnar_hint,
128  const bool streaming_top_n_hint,
129  const bool threads_can_reuse_group_by_buffers);
130 
131  std::unique_ptr<QueryExecutionContext> getQueryExecutionContext(
132  const RelAlgExecutionUnit&,
133  const Executor* executor,
134  const ExecutorDeviceType device_type,
135  const ExecutorDispatchMode dispatch_mode,
136  const int device_id,
137  const shared::TableKey& outer_table_key,
138  const int64_t num_rows,
139  const std::vector<std::vector<const int8_t*>>& col_buffers,
140  const std::vector<std::vector<uint64_t>>& frag_offsets,
141  std::shared_ptr<RowSetMemoryOwner>,
142  const bool output_columnar,
143  const bool sort_on_gpu,
144  const size_t thread_idx,
145  RenderInfo*) const;
146 
// Returns true when the value range [min_val, max_val] spans more than
// 10000 * bucket values (bucket is clamped to at least 1).
//
// The range is computed in uint64_t after checking the ordering, so extreme
// ranges such as INT64_MIN..INT64_MAX no longer hit signed-overflow UB (which in
// practice wrapped negative and wrongly reported "not many entries").
static bool many_entries(const int64_t max_val,
                         const int64_t min_val,
                         const int64_t bucket) {
  // An empty or inverted range can never exceed a positive threshold.
  if (max_val <= min_val) {
    return false;
  }
  // max_val > min_val, so the true width lies in [1, 2^64 - 1] and modular
  // unsigned subtraction yields it exactly.
  const uint64_t range =
      static_cast<uint64_t>(max_val) - static_cast<uint64_t>(min_val);
  const uint64_t bucket_size = static_cast<uint64_t>(std::max(bucket, int64_t(1)));
  constexpr uint64_t kEntriesPerBucket = 10000;
  // A threshold too large to represent exceeds every possible range.
  if (bucket_size > UINT64_MAX / kEntriesPerBucket) {
    return false;
  }
  return range > kEntriesPerBucket * bucket_size;
}
152 
154  const CountDistinctDescriptors& count_distinct_descriptors) {
155  return std::all_of(count_distinct_descriptors.begin(),
156  count_distinct_descriptors.end(),
157  [](const CountDistinctDescriptor& desc) {
158  return desc.impl_type_ == CountDistinctImplType::Invalid;
159  });
160  }
161 
164  }
165 
166  static int8_t pick_target_compact_width(const RelAlgExecutionUnit& ra_exe_unit,
167  const std::vector<InputTableInfo>& query_infos,
168  const int8_t crt_min_byte_width);
169 
170  // Getters and Setters
171  const Executor* getExecutor() const { return executor_; }
172 
173  QueryDescriptionType getQueryDescriptionType() const { return query_desc_type_; }
174  void setQueryDescriptionType(const QueryDescriptionType val) { query_desc_type_ = val; }
177  getGroupbyColCount() == 1;
178  }
179 
180  bool hasKeylessHash() const { return keyless_hash_; }
181  void setHasKeylessHash(const bool val) { keyless_hash_ = val; }
182 
184  void setHasInterleavedBinsOnGpu(const bool val) { interleaved_bins_on_gpu_ = val; }
185 
186  int32_t getTargetIdxForKey() const { return idx_target_as_key_; }
187  void setTargetIdxForKey(const int32_t val) { idx_target_as_key_ = val; }
188 
189  int8_t groupColWidth(const size_t key_idx) const {
190  CHECK_LT(key_idx, group_col_widths_.size());
191  return group_col_widths_[key_idx];
192  }
193  size_t getPrependedGroupColOffInBytes(const size_t group_idx) const;
194  size_t getPrependedGroupBufferSizeInBytes() const;
195 
196  const auto groupColWidthsBegin() const { return group_col_widths_.begin(); }
197  const auto groupColWidthsEnd() const { return group_col_widths_.end(); }
199 
200  bool isGroupBy() const { return !group_col_widths_.empty(); }
201 
202  void setGroupColCompactWidth(const int8_t val) { group_col_compact_width_ = val; }
203 
204  size_t getColCount() const;
205  size_t getSlotCount() const;
206 
207  const int8_t getPaddedSlotWidthBytes(const size_t slot_idx) const;
208  const int8_t getLogicalSlotWidthBytes(const size_t slot_idx) const;
209 
210  void setPaddedSlotWidthBytes(const size_t slot_idx, const int8_t bytes);
211 
212  const int8_t getSlotIndexForSingleSlotCol(const size_t col_idx) const;
213 
214  size_t getPaddedColWidthForRange(const size_t offset, const size_t range) const {
215  size_t ret = 0;
216  for (size_t i = offset; i < offset + range; i++) {
217  ret += static_cast<size_t>(getPaddedSlotWidthBytes(i));
218  }
219  return ret;
220  }
221 
222  void useConsistentSlotWidthSize(const int8_t slot_width_size);
223  size_t getRowWidth() const;
224 
225  int8_t updateActualMinByteWidth(const int8_t actual_min_byte_width) const;
226 
227  void addColSlotInfo(const std::vector<std::tuple<int8_t, int8_t>>& slots_for_col);
228 
229  // FlatBuffer support:
230  void addColSlotInfoFlatBuffer(const int64_t flatbuffer_size);
231  int64_t getFlatBufferSize(const size_t slot_idx) const {
232  return col_slot_context_.getFlatBufferSize(slot_idx);
233  }
234  bool checkSlotUsesFlatBufferFormat(const size_t slot_idx) const {
236  }
237  int64_t getPaddedSlotBufferSize(const size_t slot_idx) const;
238 
239  void clearSlotInfo();
240 
241  void alignPaddedSlots();
242 
243  int64_t getTargetGroupbyIndex(const size_t target_idx) const {
244  CHECK_LT(target_idx, target_groupby_indices_.size());
245  return target_groupby_indices_[target_idx];
246  }
247 
248  void setAllTargetGroupbyIndices(std::vector<int64_t> group_by_indices) {
249  target_groupby_indices_ = group_by_indices;
250  }
251 
252  size_t targetGroupbyIndicesSize() const { return target_groupby_indices_.size(); }
254  return std::count_if(
255  target_groupby_indices_.begin(),
257  [](const int64_t& target_group_by_index) { return target_group_by_index < 0; });
258  }
260 
261  size_t getEntryCount() const { return entry_count_; }
262  void setEntryCount(const size_t val) { entry_count_ = val; }
263 
264  int64_t getMinVal() const { return min_val_; }
265  int64_t getMaxVal() const { return max_val_; }
266  int64_t getBucket() const { return bucket_; }
267 
268  bool hasNulls() const { return has_nulls_; }
269 
272  }
273 
274  const CountDistinctDescriptor& getCountDistinctDescriptor(const size_t idx) const {
276  return count_distinct_descriptors_[idx];
277  }
279  return count_distinct_descriptors_.size();
280  }
281 
282  bool sortOnGpu() const { return sort_on_gpu_; }
283 
284  bool canOutputColumnar() const;
285  bool didOutputColumnar() const { return output_columnar_; }
286  void setOutputColumnar(const bool val);
287 
288  bool useStreamingTopN() const { return use_streaming_top_n_; }
289 
290  bool isLogicalSizedColumnsAllowed() const;
291 
293 
296  }
297 
298  void setThreadsCanReuseGroupByBuffers(const bool val) {
300  }
301 
302  // TODO(adb): remove and store this info more naturally in another
303  // member
304  bool forceFourByteFloat() const { return force_4byte_float_; }
305  void setForceFourByteFloat(const bool val) { force_4byte_float_ = val; }
306 
307  // Getters derived from state
308  size_t getGroupbyColCount() const { return group_col_widths_.size(); }
309  size_t getKeyCount() const { return keyless_hash_ ? 0 : getGroupbyColCount(); }
310  size_t getBufferColSlotCount() const;
311 
312  size_t getBufferSizeBytes(const RelAlgExecutionUnit& ra_exe_unit,
313  const unsigned thread_count,
314  const ExecutorDeviceType device_type) const;
315  size_t getBufferSizeBytes(const ExecutorDeviceType device_type) const;
316  size_t getBufferSizeBytes(const ExecutorDeviceType device_type,
317  const size_t override_entry_count) const;
318 
320 
321  // TODO(alex): remove
322  bool usesGetGroupValueFast() const;
323 
324  bool blocksShareMemory() const;
325  bool threadsShareMemory() const;
326 
327  bool lazyInitGroups(const ExecutorDeviceType) const;
328 
329  bool interleavedBins(const ExecutorDeviceType) const;
330 
331  size_t getColOffInBytes(const size_t col_idx) const;
332  size_t getColOffInBytesInNextBin(const size_t col_idx) const;
333  size_t getNextColOffInBytes(const int8_t* col_ptr,
334  const size_t bin,
335  const size_t col_idx) const;
336 
337  // returns the ptr offset of the next column, 64-bit aligned
338  size_t getNextColOffInBytesRowOnly(const int8_t* col_ptr, const size_t col_idx) const;
339  // returns the ptr offset of the current column, 64-bit aligned
340  size_t getColOnlyOffInBytes(const size_t col_idx) const;
341  size_t getRowSize() const;
342  size_t getColsSize() const;
343  size_t getWarpCount() const;
344 
345  size_t getCompactByteWidth() const;
346 
347  inline size_t getEffectiveKeyWidth() const {
348  return group_col_compact_width_ ? group_col_compact_width_ : sizeof(int64_t);
349  }
350 
351  bool isWarpSyncRequired(const ExecutorDeviceType) const;
352 
353  std::string queryDescTypeToString() const;
354  std::string toString() const;
355 
356  std::string reductionKey() const;
357 
359 
360  // returns a value if the buffer can be a fixed size; otherwise, we will need to use the
361  // bump allocator
362  std::optional<size_t> varlenOutputBufferElemSize() const;
363 
364  // returns the number of bytes needed for all slots preceeding slot_idx. Used to compute
365  // the offset into the varlen buffer for each projected target in a given row.
366  size_t varlenOutputRowSizeToSlot(const size_t slot_idx) const;
367 
368  bool slotIsVarlenOutput(const size_t slot_idx) const {
369  return col_slot_context_.slotIsVarlen(slot_idx);
370  }
371 
373 
374  void setAvailableCpuThreads(size_t num_available_threads) const {
375  num_available_threads_ = num_available_threads;
376  }
377 
378  std::optional<size_t> getMaxPerDeviceCardinality(
379  const RelAlgExecutionUnit& ra_exe_unit) const;
380 
381  bool canUsePerDeviceCardinality(const RelAlgExecutionUnit& ra_exe_unit) const;
382 
383  protected:
384  void resetGroupColWidths(const std::vector<int8_t>& new_group_col_widths) {
385  group_col_widths_ = new_group_col_widths;
386  }
387 
388  private:
391  QueryDescriptionType query_desc_type_;
394  int32_t idx_target_as_key_; // If keyless_hash_ enabled, then represents what target
395  // expression should be used to identify the key (e.g., in
396  // locating empty entries). Currently only valid with
397  // keyless_hash_ and single-column GroupByPerfectHash
398  std::vector<int8_t> group_col_widths_;
399  int8_t group_col_compact_width_; // compact width for all group
400  // cols if able to be consistent
401  // otherwise 0
402  std::vector<int64_t> target_groupby_indices_;
403  size_t entry_count_; // the number of entries in the main buffer
404  int64_t min_val_; // meaningful for OneColKnownRange,
405  // MultiColPerfectHash only
406  int64_t max_val_;
407  int64_t bucket_;
418 
420 
421  // # available CPU threads can be used for this query kernel, i.e., to parallelize rest
422  // of query initialization step its default value is one which means we do not
423  // parallelize for the query kernel, and it will be updated to a proper value before
424  // performing the query initialization
425  mutable size_t num_available_threads_{1};
426 
427  size_t getTotalBytesOfColumnarBuffers() const;
428  size_t getTotalBytesOfColumnarBuffers(const size_t num_entries_per_column) const;
429  size_t getTotalBytesOfColumnarProjections(const size_t projection_count) const;
430 
431  friend class ResultSet;
432  friend class QueryExecutionContext;
433 };
434 
435 inline void set_notnull(TargetInfo& target, const bool not_null) {
436  target.skip_null_val = !not_null;
437  auto new_type = get_compact_type(target);
438  new_type.set_notnull(not_null);
439  set_compact_type(target, new_type);
440 }
441 
442 std::vector<TargetInfo> target_exprs_to_infos(
443  const std::vector<Analyzer::Expr*>& targets,
445 
446 #endif // QUERYENGINE_QUERYMEMORYDESCRIPTOR_H
size_t varlenOutputRowSizeToSlot(const size_t slot_idx) const
GroupByPerfectHash
Definition: enums.h:58
void set_compact_type(TargetInfo &target, const SQLTypeInfo &new_type)
static bool many_entries(const int64_t max_val, const int64_t min_val, const int64_t bucket)
void addColSlotInfoFlatBuffer(const int64_t flatbuffer_size)
bool canUsePerDeviceCardinality(const RelAlgExecutionUnit &ra_exe_unit) const
size_t getBufferSizeBytes(const RelAlgExecutionUnit &ra_exe_unit, const unsigned thread_count, const ExecutorDeviceType device_type) const
bool slotIsVarlenOutput(const size_t slot_idx) const
bool countDistinctDescriptorsLogicallyEmpty() const
bool slotIsVarlen(const size_t slot_idx) const
size_t getTotalBytesOfColumnarProjections(const size_t projection_count) const
void setEntryCount(const size_t val)
size_t getAvailableCpuThreads() const
int64_t getTargetGroupbyIndex(const size_t target_idx) const
void sort_on_gpu(int64_t *val_buff, int32_t *idx_buff, const uint64_t entry_count, const bool desc, const uint32_t chosen_bytes, ThrustAllocator &alloc, const int device_id)
std::string toString() const
bool isLogicalSizedColumnsAllowed() const
void setHasKeylessHash(const bool val)
void setGroupColCompactWidth(const int8_t val)
void setThreadsCanReuseGroupByBuffers(const bool val)
static std::unique_ptr< QueryMemoryDescriptor > init(const Executor *executor, const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &query_infos, const ColRangeInfo &col_range_info, const KeylessInfo &keyless_info, const bool allow_multifrag, const ExecutorDeviceType device_type, const int8_t crt_min_byte_width, const bool sort_on_gpu_hint, const size_t shard_count, const size_t max_groups_buffer_entry_count, RenderInfo *render_info, const ApproxQuantileDescriptors &, const CountDistinctDescriptors, const bool must_use_baseline_sort, const bool output_columnar_hint, const bool streaming_top_n_hint, const bool threads_can_reuse_group_by_buffers)
void setOutputColumnar(const bool val)
const ApproxQuantileDescriptors & getApproxQuantileDescriptors() const
size_t getNextColOffInBytes(const int8_t *col_ptr, const size_t bin, const size_t col_idx) const
size_t getEffectiveKeyWidth() const
bool use_streaming_top_n(const RelAlgExecutionUnit &ra_exe_unit, const bool output_columnar)
std::vector< ApproxQuantileDescriptor > ApproxQuantileDescriptors
bool hasVarlenOutput() const
bool skip_null_val
Definition: TargetInfo.h:54
void setQueryDescriptionType(const QueryDescriptionType val)
size_t targetGroupbyNegativeIndicesSize() const
bool hasInterleavedBinsOnGpu() const
ExecutorDeviceType
std::string to_string(char const *&&v)
const auto groupColWidthsBegin() const
std::optional< size_t > getMaxPerDeviceCardinality(const RelAlgExecutionUnit &ra_exe_unit) const
void useConsistentSlotWidthSize(const int8_t slot_width_size)
size_t getColOnlyOffInBytes(const size_t col_idx) const
ExecutorDispatchMode
const SQLTypeInfo get_compact_type(const TargetInfo &target)
const auto groupColWidthsEnd() const
int8_t groupColWidth(const size_t key_idx) const
void setTargetIdxForKey(const int32_t val)
std::vector< CountDistinctDescriptor > CountDistinctDescriptors
Definition: CountDistinct.h:34
Provides column info and slot info for the output buffer and some metadata helpers.
size_t getGroupbyColCount() const
bool lazyInitGroups(const ExecutorDeviceType) const
size_t targetGroupbyIndicesSize() const
bool threadsCanReuseGroupByBuffers() const
size_t getPrependedGroupBufferSizeInBytes() const
size_t getTotalBytesOfColumnarBuffers() const
std::vector< int64_t > target_groupby_indices_
static int8_t pick_target_compact_width(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &query_infos, const int8_t crt_min_byte_width)
CountDistinctDescriptors count_distinct_descriptors_
size_t getPaddedColWidthForRange(const size_t offset, const size_t range) const
const int8_t getPaddedSlotWidthBytes(const size_t slot_idx) const
int64_t getPaddedSlotBufferSize(const size_t slot_idx) const
static TResultSetBufferDescriptor toThrift(const QueryMemoryDescriptor &)
size_t getCountDistinctDescriptorsSize() const
QueryDescriptionType getQueryDescriptionType() const
const CountDistinctDescriptor & getCountDistinctDescriptor(const size_t idx) const
std::optional< size_t > varlenOutputBufferElemSize() const
#define CHECK_LT(x, y)
Definition: Logger.h:303
bool isSingleColumnGroupByWithPerfectHash() const
size_t getNextColOffInBytesRowOnly(const int8_t *col_ptr, const size_t col_idx) const
bool checkSlotUsesFlatBufferFormat(const size_t slot_idx) const
StreamingTopNOOM(const size_t heap_size_bytes)
QueryDescriptionType query_desc_type_
void setAvailableCpuThreads(size_t num_available_threads) const
Functions used to work with (approximate) count distinct sets.
int8_t updateActualMinByteWidth(const int8_t actual_min_byte_width) const
QueryEngine enum classes with minimal #include files.
bool operator==(const QueryMemoryDescriptor &other) const
void setForceFourByteFloat(const bool val)
bool isWarpSyncRequired(const ExecutorDeviceType) const
bool interleavedBins(const ExecutorDeviceType) const
const ColSlotContext & getColSlotContext() const
std::vector< int8_t > group_col_widths_
void setAllTargetGroupbyIndices(std::vector< int64_t > group_by_indices)
bool g_cluster
void setPaddedSlotWidthBytes(const size_t slot_idx, const int8_t bytes)
void resetGroupColWidths(const std::vector< int8_t > &new_group_col_widths)
std::vector< TargetInfo > target_exprs_to_infos(const std::vector< Analyzer::Expr * > &targets, const QueryMemoryDescriptor &query_mem_desc)
std::string queryDescTypeToString() const
void addColSlotInfo(const std::vector< std::tuple< int8_t, int8_t >> &slots_for_col)
void setHasInterleavedBinsOnGpu(const bool val)
static bool countDescriptorsLogicallyEmpty(const CountDistinctDescriptors &count_distinct_descriptors)
int64_t getFlatBufferSize(const size_t slot_idx) const
const int8_t getSlotIndexForSingleSlotCol(const size_t col_idx) const
bool checkSlotUsesFlatBufferFormat(const size_t slot_idx) const
const int8_t getLogicalSlotWidthBytes(const size_t slot_idx) const
size_t getColOffInBytes(const size_t col_idx) const
ApproxQuantileDescriptors approx_quantile_descriptors_
int64_t getFlatBufferSize(const size_t slot_idx) const
size_t getColOffInBytesInNextBin(const size_t col_idx) const
std::unique_ptr< QueryExecutionContext > getQueryExecutionContext(const RelAlgExecutionUnit &, const Executor *executor, const ExecutorDeviceType device_type, const ExecutorDispatchMode dispatch_mode, const int device_id, const shared::TableKey &outer_table_key, const int64_t num_rows, const std::vector< std::vector< const int8_t * >> &col_buffers, const std::vector< std::vector< uint64_t >> &frag_offsets, std::shared_ptr< RowSetMemoryOwner >, const bool output_columnar, const bool sort_on_gpu, const size_t thread_idx, RenderInfo *) const
std::string reductionKey() const
const Executor * getExecutor() const
void set_notnull(TargetInfo &target, const bool not_null)
int32_t getTargetIdxForKey() const
size_t getPrependedGroupColOffInBytes(const size_t group_idx) const