19 #include <boost/noncopyable.hpp>
25 #include <unordered_map>
47 namespace Catalog_Namespace {
63 allocators_.emplace_back(std::make_unique<CpuMgrArenaAllocator>());
73 CHECK_GT(num_kernels, static_cast<size_t>(0));
76 auto const required_num_kernels = num_kernels + 1;
84 auto const required_num_allocators = required_num_kernels -
allocators_.size();
85 VLOG(1) <<
"Prepare " << required_num_allocators
87 <<
", # existing allocator(s): " <<
allocators_.size()
88 <<
", # requested allocator(s): " << required_num_kernels <<
")";
89 for (
size_t i = 0; i < required_num_allocators; i++) {
91 allocators_.emplace_back(std::make_unique<CpuMgrArenaAllocator>());
103 int8_t*
allocate(
const size_t num_bytes)
override {
104 constexpr
size_t thread_idx = 0u;
105 return allocate(num_bytes, thread_idx);
109 int8_t*
allocate(
const size_t num_bytes,
const size_t thread_idx) {
117 VLOG(2) <<
"Count distinct buffer allocator initialized with buffer_size: "
118 << buffer_size <<
", thread_idx: " << thread_idx;
121 VLOG(2) <<
"Replacing count_distinct_buffer_allocators_[" << thread_idx <<
"].";
124 std::make_unique<CountDistinctBufferAllocator>(
129 const size_t thread_idx) {
139 VLOG(1) <<
"Try to allocate CPU memory: " << num_bytes <<
" bytes (THREAD-"
140 << thread_idx <<
")";
142 int64_t* group_by_buffer =
reinterpret_cast<int64_t*
>(allocator->allocate(num_bytes));
143 CHECK(group_by_buffer);
146 return std::make_pair(group_by_buffer,
false);
150 const size_t thread_idx = 0) {
154 std::memset(buffer, 0, num_bytes);
161 const bool physical_buffer) {
199 std::vector<int64_t>*
addArray(
const std::vector<int64_t>& arr) {
207 const int64_t generation) {
211 CHECK_EQ(it->second->getDictionary(), str_dict.get());
212 it->second->updateGeneration(generation);
213 return it->second.get();
218 std::make_shared<StringDictionaryProxy>(str_dict, dict_key, generation))
220 return it->second.get();
225 const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
226 std::ostringstream oss;
227 oss <<
"{source_dict_key: " << source_proxy_dict_key
228 <<
" StringOps: " << string_op_infos <<
"}";
235 const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
236 std::ostringstream oss;
237 oss <<
"{source_dict_key: " << source_proxy_dict_key
238 <<
", dest_dict_key: " << dest_proxy_dict_key <<
" StringOps: " << string_op_infos
246 const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
257 dest_proxy, string_op_infos))
265 const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
269 if (it->first != map_key) {
279 const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
290 dest_proxy, string_op_infos))
297 const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
304 std::make_shared<StringOps_Namespace::StringOps>(string_op_infos))
307 return it->second.get();
314 return it->second.get();
318 const bool with_generation);
321 std::shared_ptr<StringDictionaryProxy> lit_str_dict_proxy) {
334 const bool with_generation,
336 const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos);
341 const bool with_generation,
342 const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos);
350 std::ostringstream oss;
351 oss <<
"Destruct RowSetMemoryOwner attached to Executor-" <<
executor_id_ <<
"{\t";
352 int allocator_id = 0;
354 auto const usedBytes = allocator->bytesUsed();
356 oss <<
"allocator-" << allocator_id <<
", byteUsed: " << usedBytes <<
"/"
357 << allocator->totalBytes() <<
"\t";
363 VLOG(1) << oss.str();
365 delete count_distinct_set;
371 CHECK(varlen_input_buffer);
372 varlen_input_buffer->unPin();
402 const uint8_t* raw_data,
403 const size_t num_bytes,
406 std::memcpy(metadata_value.first.data(), raw_data, num_bytes);
412 const uint8_t*& raw_data,
418 throw std::runtime_error(
"Failed to find Table Function Metadata with key '" +
419 std::string(key) +
"'");
421 raw_data = itr->second.first.data();
422 num_bytes = itr->second.first.size();
423 value_type = itr->second.second;
434 VLOG(1) <<
"Try to allocate CPU memory: " << num_bytes <<
" bytes (THREAD-"
435 << thread_idx <<
")";
438 return reinterpret_cast<int8_t*
>(allocator->allocate(num_bytes));
453 std::unordered_map<shared::StringDictKey, std::shared_ptr<StringDictionaryProxy>>
455 std::map<std::string, StringDictionaryProxy::IdMap>
457 std::map<std::string, StringDictionaryProxy::IdMap>
459 std::map<std::string, StringDictionaryProxy::TranslationMap<Datum>>
470 std::map<std::string, std::shared_ptr<StringOps_Namespace::StringOps>>
478 std::vector<std::unique_ptr<CountDistinctBufferAllocator>>
std::shared_ptr< RowSetMemoryOwner > cloneStrDictDataOnly()
robin_hood::unordered_set< int64_t > CountDistinctSet
std::vector< std::unique_ptr< Arena > > allocators_
int8_t * allocateCountDistinctBuffer(const size_t num_bytes, const size_t thread_idx=0)
std::list< std::vector< int64_t > > arrays_
void addVarlenInputBuffer(Data_Namespace::AbstractBuffer *buffer)
const shared::StringDictKey & getDictKey() const noexcept
const StringDictionaryProxy::IdMap * getOrAddStringProxyTranslationMap(const shared::StringDictKey &source_dict_id_in, const shared::StringDictKey &dest_dict_id_in, const bool with_generation, const StringTranslationType translation_map_type, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
const bool physical_buffer
void addLiteralStringDictProxy(std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy)
std::map< std::string, StringDictionaryProxy::TranslationMap< Datum > > str_proxy_numeric_translation_maps_owned_
const StringDictionaryProxy::TranslationMap< Datum > * addStringProxyNumericTranslationMap(const StringDictionaryProxy *source_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
std::list< std::string > strings_
Calculate approximate median and general quantiles, based on "Computing Extremely Accurate Quantiles ...
void addCountDistinctBuffer(int8_t *count_distinct_buffer, const size_t bytes, const bool physical_buffer)
virtual MemoryLevel getType() const =0
std::vector< int64_t > * addArray(const std::vector< int64_t > &arr)
std::vector< CountDistinctSet * > count_distinct_sets_
StringDictionary * getDictionary() const noexcept
std::pair< std::vector< uint8_t >, TableFunctionMetadataType > MetadataValue
int8_t * allocateUnlocked(const size_t num_bytes, const size_t thread_idx)
StringDictionaryGenerations & getStringDictionaryGenerations()
StringDictionaryProxy * getStringDictProxy(const shared::StringDictKey &dict_key) const
StringDictionaryProxy * getLiteralStringDictProxy() const
bool g_use_cpu_mem_pool_for_output_buffers
std::mutex table_function_metadata_store_mutex_
std::pair< int64_t *, bool > allocateCachedGroupByBuffer(const size_t num_bytes, const size_t thread_idx)
TranslationMap< Datum > buildNumericTranslationMap(const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
Builds a vectorized string_id translation map from this proxy to dest_proxy.
int8_t * allocate(const size_t num_bytes) override
void setKernelMemoryAllocator(const size_t num_kernels)
RowSetMemoryOwner(const size_t arena_block_size, const size_t executor_id)
quantile::TDigest * initTDigest(size_t thread_idx, ApproxQuantileDescriptor, double q)
std::map< std::string, MetadataValue > table_function_metadata_store_
std::list< AggMode > mode_maps_
StringDictionaryProxy * addStringDict(std::shared_ptr< StringDictionary > str_dict, const shared::StringDictKey &dict_key, const int64_t generation)
std::map< std::string, StringDictionaryProxy::IdMap > str_proxy_intersection_translation_maps_owned_
void setDictionaryGenerations(StringDictionaryGenerations generations)
const StringDictionaryProxy::TranslationMap< Datum > * getOrAddStringProxyNumericTranslationMap(const shared::StringDictKey &source_dict_id_in, const bool with_generation, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
std::vector< CountDistinctBitmapBuffer > count_distinct_bitmaps_
std::vector< void * > col_buffers_
std::map< std::string, StringDictionaryProxy::IdMap > str_proxy_union_translation_maps_owned_
std::map< std::string, std::shared_ptr< StringOps_Namespace::StringOps > > string_ops_owned_
std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy_
Calculate statistical mode as an aggregate function.
std::vector< void * > varlen_buffers_
std::string generate_translation_map_key(const shared::StringDictKey &source_proxy_dict_key, const shared::StringDictKey &dest_proxy_dict_key, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
std::unordered_map< shared::StringDictKey, std::shared_ptr< StringDictionaryProxy > > str_dict_proxy_owned_
std::vector< Data_Namespace::AbstractBuffer * > varlen_input_buffers_
An AbstractBuffer is a unit of data management for a data manager.
Quickly allocate many memory pieces by reserving them ahead of time. Calls to allocate() are thread-s...
StringDictionaryGenerations string_dictionary_generations_
IdMap buildUnionTranslationMapToOtherProxy(StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_types) const
void initCountDistinctBufferAllocator(size_t buffer_size, size_t thread_idx)
const StringOps_Namespace::StringOps * getStringOps(const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
int8_t * allocate(const size_t num_bytes, const size_t thread_idx)
Allocate CPU memory using CpuBuffers via DataMgr.
void setTableFunctionMetadata(const char *key, const uint8_t *raw_data, const size_t num_bytes, const TableFunctionMetadataType value_type)
void addVarlenBuffer(void *varlen_buffer)
std::vector< std::unique_ptr< quantile::TDigest > > t_digests_
Functions used to work with (approximate) count distinct sets.
void addCountDistinctSet(CountDistinctSet *count_distinct_set)
std::vector< std::unique_ptr< CountDistinctBufferAllocator > > count_distinct_buffer_allocators_
void reserveTDigestMemory(size_t thread_idx, size_t capacity)
void clearNonOwnedGroupByBuffers()
void getTableFunctionMetadata(const char *key, const uint8_t *&raw_data, size_t &num_bytes, TableFunctionMetadataType &value_type) const
std::vector< int64_t * > non_owned_group_by_buffers_
std::deque< TDigestAllocator > t_digest_allocators_
const StringDictionaryProxy::IdMap * addStringProxyUnionTranslationMap(const StringDictionaryProxy *source_proxy, StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
IdMap buildIntersectionTranslationMapToOtherProxy(const StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
bool g_allow_memory_status_log
std::string * addString(const std::string &str)
const StringDictionaryProxy::IdMap * addStringProxyIntersectionTranslationMap(const StringDictionaryProxy *source_proxy, const StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
void addColBuffer(const void *col_buffer)
std::string generate_translation_map_key(const shared::StringDictKey &source_proxy_dict_key, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
ResultSet(const std::vector< TargetInfo > &targets, const ExecutorDeviceType device_type, const QueryMemoryDescriptor &query_mem_desc, const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const unsigned block_size, const unsigned grid_size)
StringDictionaryProxy * getOrAddStringDictProxy(const shared::StringDictKey &dict_key, const bool with_generation)