22 namespace ExecutorResourceMgr_Namespace {
27 template <
typename... Ts>
30 (std::cout << ... << print_args);
31 std::cout << std::endl;
35 std::cout << std::endl <<
"Request Grant Info" << std::endl;
36 std::cout <<
"Grant CPU slots: " <<
cpu_slots << std::endl;
37 std::cout <<
"Grant GPU slots: " <<
gpu_slots << std::endl;
42 std::ostringstream oss;
49 const std::vector<std::pair<ResourceType, size_t>>& total_resources,
50 const std::vector<ConcurrentResourceGrantPolicy>& concurrent_resource_grant_policies,
51 const std::vector<ResourceGrantPolicy>& max_per_request_resource_grant_policies) {
53 concurrent_resource_grant_policies,
54 max_per_request_resource_grant_policies);
59 const std::vector<std::pair<ResourceType, size_t>>& total_resources,
60 const std::vector<ConcurrentResourceGrantPolicy>& concurrent_resource_grant_policies,
61 const std::vector<ResourceGrantPolicy>& max_resource_grants_per_request_policies) {
62 for (
const auto& total_resource : total_resources) {
66 total_resources_[
static_cast<size_t>(total_resource.first)] = total_resource.second;
70 for (
const auto& concurrent_resource_grant_policy :
71 concurrent_resource_grant_policies) {
72 const ResourceType resource_type = concurrent_resource_grant_policy.resource_type;
77 concurrent_resource_grant_policy;
80 for (
const auto& max_resource_grant_per_request_policy :
81 max_resource_grants_per_request_policies) {
83 max_resource_grant_per_request_policy.resource_subtype;
88 max_resource_grant_per_request_policy;
96 size_t resource_type_idx = 0;
98 const auto resource_type =
static_cast<ResourceType>(resource_type_idx);
99 const auto concurrency_policy_resource_type =
100 concurrent_resource_grant_policy.resource_type;
101 CHECK(resource_type == concurrency_policy_resource_type ||
105 concurrent_resource_grant_policy.resource_type = resource_type;
106 concurrent_resource_grant_policy.concurrency_policy =
108 concurrent_resource_grant_policy.oversubscription_concurrency_policy =
119 size_t resource_subtype_idx = 0;
120 for (
auto& max_resource_grant_per_request_policy :
122 const auto resource_subtype =
static_cast<ResourceSubtype>(resource_subtype_idx);
124 const auto policy_resource_subtype =
125 max_resource_grant_per_request_policy.resource_subtype;
126 CHECK(resource_subtype == policy_resource_subtype ||
130 max_resource_grant_per_request_policy.resource_subtype = resource_subtype;
131 max_resource_grant_per_request_policy.policy_size_type =
135 max_resource_grant_per_request_policy.resource_subtype)] =
136 max_resource_grant_per_request_policy.get_grant_quantity(
139 .oversubscription_concurrency_policy ==
142 max_resource_grant_per_request_policy.resource_subtype =
145 resource_subtype_idx++;
150 for (
size_t resource_idx = 0; resource_idx <
ResourceTypeSize; ++resource_idx) {
157 LOG(
EXECUTOR) <<
"Resource: " << resource_type_str <<
": " << total_resource;
158 LOG(
EXECUTOR) <<
"Concurrency Policy for " << resource_type_str <<
": "
160 LOG(
EXECUTOR) <<
"Max per-request resource grants for sub-types:";
162 for (
const auto& resource_subtype : resource_subtypes) {
172 size_t resource_type_allocation_sum{0};
173 for (
const auto& resource_subtype : resource_subtypes) {
176 return resource_type_allocation_sum;
181 std::shared_lock<std::shared_mutex> resource_read_lock(
resource_mutex_);
187 std::shared_lock<std::shared_mutex> resource_read_lock(
resource_mutex_);
213 const size_t resource_quantity) {
216 throw std::runtime_error(
217 "Executor Pool must be clear of requests to change resources available.");
219 const std::vector<std::pair<ResourceType, size_t>> total_resources_vec = {
220 std::make_pair(resource_type, resource_quantity)};
221 init(total_resources_vec, {}, {});
228 throw std::runtime_error(
229 "Executor Pool must be clear of requests to change resource concurrent resource "
232 init({}, {concurrent_resource_grant_policy}, {});
236 const size_t requested_resource_quantity,
237 const size_t min_requested_resource_quantity,
238 const size_t max_grantable_resource_quantity)
const {
239 if (requested_resource_quantity <= max_grantable_resource_quantity) {
240 return requested_resource_quantity;
242 if (min_requested_resource_quantity <= max_grantable_resource_quantity) {
243 return max_grantable_resource_quantity;
245 return static_cast<size_t>(0);
248 std::pair<size_t, size_t>
250 const size_t min_requested_dependent_resource_quantity,
251 const size_t min_requested_independent_resource_quantity,
252 const size_t dependent_to_independent_resource_ratio)
const {
253 const size_t adjusted_min_independent_resource_quantity =
254 std::max(static_cast<size_t>(
255 ceil(static_cast<double>(min_requested_dependent_resource_quantity) /
256 dependent_to_independent_resource_ratio)),
257 min_requested_independent_resource_quantity);
258 const size_t adjusted_min_dependent_resource_quantity =
259 adjusted_min_independent_resource_quantity *
260 dependent_to_independent_resource_ratio;
261 return std::make_pair(adjusted_min_dependent_resource_quantity,
262 adjusted_min_independent_resource_quantity);
265 std::pair<size_t, size_t>
267 const size_t requested_dependent_resource_quantity,
268 const size_t min_requested_dependent_resource_quantity,
269 const size_t max_grantable_dependent_resource_quantity,
270 const size_t min_requested_independent_resource_quantity,
271 const size_t max_grantable_independent_resource_quantity,
272 const size_t dependent_to_independent_resource_ratio)
const {
273 CHECK_LE(min_requested_dependent_resource_quantity,
274 requested_dependent_resource_quantity);
275 CHECK_LE(min_requested_independent_resource_quantity,
276 max_grantable_independent_resource_quantity);
278 if (requested_dependent_resource_quantity <=
279 max_grantable_dependent_resource_quantity) {
282 return std::make_pair(requested_dependent_resource_quantity,
283 max_grantable_independent_resource_quantity);
287 const auto adjusted_min_dependent_and_independent_resource_grant =
289 min_requested_dependent_resource_quantity,
290 min_requested_independent_resource_quantity,
291 dependent_to_independent_resource_ratio);
293 if (adjusted_min_dependent_and_independent_resource_grant.first >
294 max_grantable_dependent_resource_quantity) {
297 return std::make_pair(static_cast<size_t>(0), static_cast<size_t>(0));
300 const size_t adjusted_max_independent_resource_quantity = std::min(
301 max_grantable_dependent_resource_quantity / dependent_to_independent_resource_ratio,
302 max_grantable_independent_resource_quantity);
304 CHECK_GE(adjusted_max_independent_resource_quantity,
305 adjusted_min_dependent_and_independent_resource_grant.second);
307 const size_t granted_dependent_resource_quantity =
308 dependent_to_independent_resource_ratio *
309 adjusted_max_independent_resource_quantity;
310 return std::make_pair(granted_dependent_resource_quantity,
311 adjusted_max_independent_resource_quantity);
316 const size_t min_resource_requested)
const {
317 const size_t max_resource_grant_per_request =
320 switch (resource_subtype) {
323 min_resource_requested);
326 min_resource_requested);
329 min_resource_requested);
331 throw std::runtime_error(
332 "Insufficient resources for request");
336 std::vector<ResourceRequestGrant>
338 const std::vector<ResourceRequest>& resource_requests)
const {
339 std::vector<ResourceRequestGrant> resource_request_grants;
341 std::array<ResourceRequestGrant, ResourceSubtypeSize> all_resource_grants;
342 for (
const auto& resource_request : resource_requests) {
344 CHECK_LE(resource_request.min_quantity, resource_request.max_quantity);
349 resource_request.max_quantity,
350 resource_request.min_quantity,
352 if (resource_grant.
max_quantity < resource_request.min_quantity) {
356 resource_request.min_quantity);
361 return resource_request_grants;
364 std::pair<ResourceGrant, ResourceGrant>
407 const size_t max_pinned_buffer_pool_grant_for_memory_level =
413 if (chunk_request_info.total_bytes > max_pinned_buffer_pool_grant_for_memory_level) {
414 if (!chunk_request_info.bytes_scales_per_kernel) {
416 chunk_request_info.total_bytes,
417 chunk_request_info.device_memory_pool_type);
422 const size_t max_pageable_buffer_pool_grant_for_memory_level =
428 const auto max_chunk_memory_and_cpu_slots_grant =
430 chunk_request_info.total_bytes,
432 .max_bytes_per_kernel,
433 max_pageable_buffer_pool_grant_for_memory_level,
437 .max_bytes_per_kernel);
439 CHECK_LE(max_chunk_memory_and_cpu_slots_grant.second, max_resource_grant.
cpu_slots);
440 if (max_chunk_memory_and_cpu_slots_grant.first ==
size_t(0)) {
442 CHECK_EQ(max_chunk_memory_and_cpu_slots_grant.second,
size_t(0));
445 const auto adjusted_min_chunk_memory_and_cpu_slots_grant =
448 .max_bytes_per_kernel,
451 .max_bytes_per_kernel);
453 CHECK_GT(adjusted_min_chunk_memory_and_cpu_slots_grant.first,
454 max_pageable_buffer_pool_grant_for_memory_level);
458 CHECK_GE(adjusted_min_chunk_memory_and_cpu_slots_grant.second,
464 max_pageable_buffer_pool_grant_for_memory_level,
465 adjusted_min_chunk_memory_and_cpu_slots_grant
467 chunk_request_info.device_memory_pool_type);
473 max_resource_grant.
cpu_slots = max_chunk_memory_and_cpu_slots_grant.second;
479 chunk_request_info.max_bytes_per_kernel * max_resource_grant.
cpu_slots;
481 chunk_request_info.max_bytes_per_kernel * request_info.
min_cpu_slots;
488 return std::make_pair(min_resource_grant, max_resource_grant);
492 const size_t resource_total,
493 const size_t resource_allocated,
497 resource_allocated > 0) {
502 resource_allocated > resource_total) {
509 const size_t min_resource_request,
510 const size_t resource_total,
511 const size_t resource_allocated,
512 const size_t global_outstanding_requests,
514 auto test_request_against_policy =
515 [min_resource_request, resource_allocated, global_outstanding_requests](
517 switch (resource_concurrency_policy) {
523 return min_resource_request == 0;
529 return global_outstanding_requests == 0;
532 return min_resource_request == 0 || resource_allocated == 0;
543 if (!test_request_against_policy(concurrent_resource_grant_policy.
concurrency_policy)) {
546 if (min_resource_request + resource_allocated <= resource_total) {
549 return test_request_against_policy(
630 if (!(can_satisfy_cpu_slots_request && can_satisfy_gpu_slots_request &&
631 can_satisfy_cpu_result_mem_request)) {
646 std::vector<std::pair<ChunkKey, size_t>> missing_chunks_with_byte_sizes;
648 if (chunk_map_for_memory_level.find(requested_chunk.first) ==
649 chunk_map_for_memory_level.end()) {
651 missing_chunk_info.
total_bytes += requested_chunk.second;
655 return missing_chunk_info;
664 size_t chunk_bytes_not_in_pool{0};
666 const auto chunk_itr = chunk_map_for_memory_level.find(requested_chunk.first);
667 if (chunk_itr == chunk_map_for_memory_level.end()) {
668 chunk_bytes_not_in_pool += requested_chunk.second;
669 }
else if (requested_chunk.second > chunk_itr->second.second) {
670 chunk_bytes_not_in_pool += requested_chunk.second - chunk_itr->second.second;
673 return chunk_bytes_not_in_pool;
681 const size_t total_buffer_mem_for_memory_level =
685 const size_t allocated_buffer_mem_for_memory_level =
694 const size_t min_buffer_pool_mem_required =
698 CHECK_LE(min_buffer_pool_mem_required, total_buffer_mem_for_memory_level);
699 return allocated_buffer_mem_for_memory_level + min_buffer_pool_mem_required <=
700 total_buffer_mem_for_memory_level;
710 return chunk_bytes_not_in_pool + allocated_buffer_mem_for_memory_level <=
711 total_buffer_mem_for_memory_level;
729 const std::string& pool_level_string =
732 LOG(
EXECUTOR) <<
"ExecutorResourePool " << pool_level_string
733 <<
" allocated_temp chunk addition: "
735 LOG(
EXECUTOR) <<
"ExecutorResourePool " << pool_level_string
736 <<
" pool state: Transient Allocations: "
739 <<
" Total Allocations: "
749 size_t& pinned_buffer_mem_for_memory_level =
755 const size_t total_buffer_mem_for_memory_level =
761 const size_t pre_pinned_chunks_for_memory_level = chunk_map_for_memory_level.size();
762 const size_t pre_pinned_buffer_mem_for_memory_level =
763 pinned_buffer_mem_for_memory_level;
766 auto chunk_itr = chunk_map_for_memory_level.find(requested_chunk.first);
767 if (chunk_itr == chunk_map_for_memory_level.end()) {
768 pinned_buffer_mem_for_memory_level += requested_chunk.second;
769 chunk_map_for_memory_level.insert(
770 std::make_pair(requested_chunk.first,
771 std::make_pair(
size_t(1) ,
772 requested_chunk.second)));
774 if (requested_chunk.second > chunk_itr->second.second) {
775 pinned_buffer_mem_for_memory_level +=
776 requested_chunk.second - chunk_itr->second.second;
777 chunk_itr->second.second = requested_chunk.second;
779 chunk_itr->second.first += 1;
782 const size_t post_pinned_chunks_for_memory_level = chunk_map_for_memory_level.size();
783 const size_t net_new_allocated_chunks =
784 post_pinned_chunks_for_memory_level - pre_pinned_chunks_for_memory_level;
785 const size_t net_new_allocated_memory =
786 pinned_buffer_mem_for_memory_level - pre_pinned_buffer_mem_for_memory_level;
788 const std::string& pool_level_string =
791 LOG(
EXECUTOR) <<
"ExecutorResourePool " << pool_level_string
792 <<
" chunk allocation: " << chunk_request_info.
num_chunks <<
" chunks | "
794 LOG(
EXECUTOR) <<
"ExecutorResourePool " << pool_level_string
795 <<
" pool delta: " << net_new_allocated_chunks <<
" chunks added | "
797 LOG(
EXECUTOR) <<
"ExecutorResourePool " << pool_level_string
798 <<
" pool state: " << post_pinned_chunks_for_memory_level <<
" chunks | "
808 chunk_map_for_memory_level.size(),
811 CHECK_LE(pinned_buffer_mem_for_memory_level, total_buffer_mem_for_memory_level);
830 const std::string& pool_level_string =
833 LOG(
EXECUTOR) <<
"ExecutorResourePool " << pool_level_string
834 <<
" allocated_temp chunk removal: "
836 LOG(
EXECUTOR) <<
"ExecutorResourePool " << pool_level_string
837 <<
" pool state: Transient Allocations: "
840 <<
" Total Allocations: "
850 size_t& pinned_buffer_mem_for_memory_level =
858 const size_t pre_remove_allocated_chunks_for_memory_level =
859 chunk_map_for_memory_level.size();
860 const size_t pre_remove_allocated_buffer_mem_for_memory_level =
861 pinned_buffer_mem_for_memory_level;
864 auto chunk_itr = chunk_map_for_memory_level.find(requested_chunk.first);
866 CHECK(chunk_itr != chunk_map_for_memory_level.end());
867 chunk_itr->second.first -= 1;
868 if (chunk_itr->second.first ==
size_t(0)) {
869 pinned_buffer_mem_for_memory_level -= chunk_itr->second.second;
870 chunk_map_for_memory_level.erase(chunk_itr);
873 const size_t total_buffer_mem_for_memory_level =
878 const size_t post_remove_allocated_chunks_for_memory_level =
879 chunk_map_for_memory_level.size();
880 const size_t net_removed_allocated_chunks =
881 pre_remove_allocated_chunks_for_memory_level -
882 post_remove_allocated_chunks_for_memory_level;
883 const size_t net_removed_allocated_memory =
884 pre_remove_allocated_buffer_mem_for_memory_level -
885 pinned_buffer_mem_for_memory_level;
887 const std::string& pool_level_string =
890 LOG(
EXECUTOR) <<
"ExecutorResourePool " << pool_level_string
891 <<
" chunk removal: " << chunk_request_info.
num_chunks <<
" chunks | "
893 LOG(
EXECUTOR) <<
"ExecutorResourePool " << pool_level_string
894 <<
" pool delta: " << net_removed_allocated_chunks <<
" chunks removed | "
896 LOG(
EXECUTOR) <<
"ExecutorResourePool " << pool_level_string
897 <<
" pool state: " << post_remove_allocated_chunks_for_memory_level
905 chunk_map_for_memory_level.size(),
913 std::shared_lock<std::shared_mutex> resource_read_lock(
resource_mutex_);
918 const size_t min_resource_requested,
919 const size_t max_resource_requested,
920 const size_t resource_allocated,
921 const size_t total_resource,
922 const double max_request_backoff_ratio)
const {
923 CHECK_LE(min_resource_requested, max_resource_requested);
924 if (min_resource_requested + resource_allocated >= total_resource) {
925 return min_resource_requested;
929 const size_t resource_remaining = total_resource - resource_allocated;
930 return std::max(min_resource_requested,
931 std::min(max_resource_requested,
933 round(max_request_backoff_ratio * resource_remaining))));
940 const double max_request_backoff_ratio)
const {
941 std::unique_lock<std::shared_mutex> resource_write_lock(
resource_mutex_);
942 CHECK_LE(max_request_backoff_ratio, 1.0);
943 const bool can_satisfy_request =
946 if (!can_satisfy_request) {
947 return std::make_pair(
false, actual_resource_grant);
954 max_request_backoff_ratio);
960 max_request_backoff_ratio);
968 max_request_backoff_ratio);
977 const size_t allocated_buffer_mem_for_memory_level =
981 const size_t total_buffer_mem_for_memory_level =
986 CHECK_LE(allocated_buffer_mem_for_memory_level, total_buffer_mem_for_memory_level);
988 const size_t remaining_buffer_mem_for_memory_level =
989 total_buffer_mem_for_memory_level - allocated_buffer_mem_for_memory_level;
992 remaining_buffer_mem_for_memory_level);
993 const size_t max_grantable_mem =
994 std::min(remaining_buffer_mem_for_memory_level,
996 const auto granted_buffer_mem_and_cpu_slots =
1006 const size_t granted_buffer_mem = granted_buffer_mem_and_cpu_slots.first;
1007 const size_t granted_cpu_slots = granted_buffer_mem_and_cpu_slots.second;
1019 return std::make_pair(
true, actual_resource_grant);
1025 std::unique_lock<std::shared_mutex> resource_write_lock(
resource_mutex_);
1030 const bool can_satisfy_request =
1032 CHECK(can_satisfy_request);
1075 LOG(
EXECUTOR) <<
"ExecutorResourcePool state: CPU slots: "
1090 std::unique_lock<std::shared_mutex> resource_write_lock(
resource_mutex_);
1137 LOG(
EXECUTOR) <<
"ExecutorResourcePool state: CPU slots: "
1154 const size_t sum_resource_requests =
1161 const bool has_outstanding_resource_requests = sum_resource_requests > 0;
1163 CHECK_EQ(has_outstanding_resource_requests, has_outstanding_num_requests_globally);
size_t get_total_resource(const ResourceType resource_type) const
A container for various stats about the current state of the ExecutorResourcePool. Note that ExecutorResourcePool does not persist a struct of this type, but rather builds one on the fly when ExecutorResourcePool::get_resource_info() is called.
A container to store requested and minimum neccessary resource requests across all resource types cur...
size_t get_chunk_bytes_not_in_pool(const ChunkRequestInfo &chunk_request_info) const
ResourcePoolInfo get_resource_info() const
Returns a struct detailing the allocated and total available resources of each type tracked in Execut...
std::array< size_t, ResourceSubtypeSize > allocated_resources_
void init_max_resource_grants_per_requests()
ResourceConcurrencyPolicy
Specifies whether grants for a specified resource can be made concurrently (ALLOW_CONCURRENT_REQEUSTS...
std::shared_mutex resource_mutex_
size_t buffer_mem_per_slot
bool can_currently_satisfy_request(const ResourceGrant &min_resource_grant, const ChunkRequestInfo &chunk_request_info) const
std::array< bool, ResourceTypeSize > resource_type_validity_
std::string to_string() const
size_t get_allocated_resource_of_subtype(const ResourceSubtype resource_subtype) const
std::array< size_t, ResourceTypeSize > total_resources_
ChunkRequestInfo get_requested_chunks_not_in_pool(const ChunkRequestInfo &chunk_request_info) const
ResourceType resource_type
The type of a resource this concurrent resource grant policy pertains to.
void set_resource(const ResourceType resource_type, const size_t resource_quantity)
Sets the quantity of resource_type to resource_quantity. If pool has outstanding requests, will throw. Responsibility of allowing the pool to empty and preventing concurrent requests while this operation is running is left to the caller (in particular, ExecutorResourceMgr::set_resource pauses the process queue, which waits until all executing requests are finished before yielding to the caller, before calling this method).
static constexpr size_t ResourceTypeSize
const ResourceGrantPolicy & get_max_resource_grant_per_request_policy(const ResourceSubtype resource_subtype) const
void deallocate_resources(const ResourceGrant &resource_grant, const ChunkRequestInfo &chunk_request_info)
Deallocates resources granted to a requestor such that they can be used for other requests...
Specifies the minimum and maximum quanity either requested or granted for a request of resource_subty...
ResourceType
Stores the resource type for a ExecutorResourcePool request.
std::vector< ResourceSubtype > map_resource_type_to_resource_subtypes(const ResourceType resource_type)
Returns the 1-or-more ResourceSubtypes associated with a given ResourceType.
bool is_resource_valid(const ResourceType resource_type) const
void init_concurrency_policies()
bool can_currently_satisfy_request_impl(const ResourceGrant &min_resource_grant, const ChunkRequestInfo &chunk_request_info) const
size_t calc_max_resource_grant_for_request(const size_t requested_resource_quantity, const size_t min_requested_resource_quantity, const size_t max_grantable_resource_quantity) const
std::map< ChunkKey, std::pair< size_t, size_t >> BufferPoolChunkMap
std::vector< std::pair< ChunkKey, size_t > > chunks_with_byte_sizes
Specifies the policies for resource grants in the presence of other requests, both under situations o...
void add_chunk_requests_to_allocated_pool(const ResourceGrant &resource_grant, const ChunkRequestInfo &chunk_request_info)
size_t determine_dynamic_single_resource_grant(const size_t min_resource_requested, const size_t max_resource_requested, const size_t resource_allocated, const size_t total_resource, const double max_request_backoff_ratio) const
ResourceSubtype
Stores the resource sub-type for a ExecutorResourcePool request.
bool can_currently_satisfy_chunk_request(const ResourceGrant &min_resource_grant, const ChunkRequestInfo &chunk_request_info) const
size_t get_total_allocated_buffer_pool_mem_for_level(const ExecutorDeviceType memory_pool_type) const
size_t increment_outstanding_per_resource_num_requests(const ResourceType resource_type)
BufferPoolChunkMap allocated_cpu_buffer_pool_chunks_
ResourceConcurrencyPolicy concurrency_policy
The grant policy in effect when there are concurrent requests for the resource specified by resource_...
static std::mutex debug_print_mutex_
std::pair< ResourceGrant, ResourceGrant > calc_min_max_resource_grants_for_request(const RequestInfo &resource_request) const
Given the provided resource_request, statically calculate the minimum and maximum grantable resources...
std::vector< ResourceRequestGrant > calc_static_resource_grant_ranges_for_request(const std::vector< ResourceRequest > &resource_requests) const
void allocate_resources(const ResourceGrant &resource_grant, const ChunkRequestInfo &chunk_request_info)
Given a resource grant (assumed to be computed in determine_dynamic_resource_grant), actually allocate (reserve) the resources in the pool so other requestors (queries) cannot use those resources until returned to the pool.
Specifies the resources of each type for a given resource grant.
void set_concurrent_resource_grant_policy(const ConcurrentResourceGrantPolicy &concurrent_resource_grant_policy)
Resets the concurrent resource grant policy object, which specifies a ResourceType as well as normal ...
size_t increment_total_per_resource_num_requests(const ResourceType resource_type)
size_t buffer_mem_for_given_slots
size_t min_cpu_result_mem
std::string to_string() const
std::pair< size_t, size_t > calc_max_dependent_resource_grant_for_request(const size_t requested_dependent_resource_quantity, const size_t min_requested_dependent_resource_quantity, const size_t max_grantable_dependent_resource_quantity, const size_t min_requested_independent_resource_quantity, const size_t max_grantable_independent_resource_quantity, const size_t dependent_to_independent_resource_ratio) const
bool check_request_against_global_policy(const size_t resource_total, const size_t resource_allocated, const ConcurrentResourceGrantPolicy &concurrent_resource_grant_policy) const
ExecutorResourcePool(const std::vector< std::pair< ResourceType, size_t >> &total_resources, const std::vector< ConcurrentResourceGrantPolicy > &concurrent_resource_grant_policies, const std::vector< ResourceGrantPolicy > &max_per_request_resource_grant_policies)
ResourceConcurrencyPolicy oversubscription_concurrency_policy
The grant policy in effect when there are concurrent requests for the resource specified by resource_...
std::string to_string() const
void sanity_check_requests_against_allocations() const
std::pair< bool, ResourceGrant > determine_dynamic_resource_grant(const ResourceGrant &min_resource_grant, const ResourceGrant &max_resource_grant, const ChunkRequestInfo &chunk_request_info, const double max_request_backoff_ratio) const
Determines the actual resource grant to give a query (which will be somewhere between the provided mi...
std::string resource_type_to_string(const ResourceType resource_type)
std::pair< size_t, size_t > calc_min_dependent_resource_grant_for_request(const size_t min_requested_dependent_resource_quantity, const size_t min_requested_independent_resource_quantity, const size_t dependent_to_independent_resource_ratio) const
BufferPoolChunkMap allocated_gpu_buffer_pool_chunks_
ExecutorDeviceType device_memory_pool_type
size_t outstanding_num_requests_
std::array< ConcurrentResourceGrantPolicy, ResourceTypeSize > concurrent_resource_grant_policies_
ResourceType map_resource_subtype_to_resource_type(const ResourceSubtype resource_subtype)
Returns the ResourceType associated with a given ResourceSubtype
void remove_chunk_requests_from_allocated_pool(const ResourceGrant &resource_grant, const ChunkRequestInfo &chunk_request_info)
void init(const std::vector< std::pair< ResourceType, size_t >> &total_resources, const std::vector< ConcurrentResourceGrantPolicy > &concurrent_resource_grant_policies, const std::vector< ResourceGrantPolicy > &max_per_request_resource_grant_policies)
void log_parameters() const
size_t get_outstanding_per_resource_num_requests(const ResourceType resource_type) const
size_t total_num_requests_
const bool sanity_check_pool_state_on_deallocations_
size_t get_max_resource_grant_per_request(const ResourceSubtype resource_subtype) const
ResourceSubtype resource_subtype
bool check_request_against_policy(const size_t resource_request, const size_t resource_total, const size_t resource_allocated, const size_t global_outstanding_requests, const ConcurrentResourceGrantPolicy &concurrent_resource_grant_policy) const
Specifies all DataMgr chunks needed for a query step/request, along with their sizes in bytes...
bool buffer_mem_gated_per_slot
size_t max_bytes_per_kernel
std::array< size_t, ResourceSubtypeSize > max_resource_grants_per_request_
size_t decrement_outstanding_per_resource_num_requests(const ResourceType resource_type)
void throw_insufficient_resource_error(const ResourceSubtype resource_subtype, const size_t min_resource_requested) const
size_t get_allocated_resource_of_type(const ResourceType resource_type) const
const bool ENABLE_DEBUG_PRINTING
ChunkRequestInfo chunk_request_info
std::array< ResourceGrantPolicy, ResourceSubtypeSize > max_resource_grants_per_request_policies_
void debug_print(Ts &&...print_args)
ConcurrentResourceGrantPolicy get_concurrent_resource_grant_policy(const ResourceType resource_type) const