OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ExecutorResourcePool.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <iostream> // For debug print methods
18 #include <sstream>
19 
20 #include "ExecutorResourcePool.h"
21 
22 namespace ExecutorResourceMgr_Namespace {
23 
// Compile-time switch for verbose pool tracing; flip to true to activate the
// debug_print() calls sprinkled through this translation unit.
const bool ENABLE_DEBUG_PRINTING{false};

// Serializes debug output produced by concurrently executing requests.
static std::mutex debug_print_mutex_;

// Streams every argument to stdout as one uninterrupted line (newline + flush
// appended), holding debug_print_mutex_ so output from concurrent callers
// cannot interleave.
template <typename... Ts>
void debug_print(Ts&&... args_to_print) {
  std::lock_guard<std::mutex> output_guard(debug_print_mutex_);
  (std::cout << ... << args_to_print);
  std::cout << std::endl;
}
33 
34 void ResourceGrant::print() const {
35  std::cout << std::endl << "Request Grant Info" << std::endl;
36  std::cout << "Grant CPU slots: " << cpu_slots << std::endl;
37  std::cout << "Grant GPU slots: " << gpu_slots << std::endl;
38  std::cout << "Grant CPU result mem: " << format_num_bytes(cpu_result_mem) << std::endl;
39 }
40 
41 std::string ResourceGrant::to_string() const {
42  std::ostringstream oss;
43  oss << "Granted CPU Slots: " << cpu_slots << " GPU Slots: " << gpu_slots
44  << " CPU result mem: " << format_num_bytes(cpu_result_mem);
45  return oss.str();
46 }
47 
49  const std::vector<std::pair<ResourceType, size_t>>& total_resources,
50  const std::vector<ConcurrentResourceGrantPolicy>& concurrent_resource_grant_policies,
51  const std::vector<ResourceGrantPolicy>& max_per_request_resource_grant_policies) {
52  init(total_resources,
53  concurrent_resource_grant_policies,
54  max_per_request_resource_grant_policies);
56 }
57 
59  const std::vector<std::pair<ResourceType, size_t>>& total_resources,
60  const std::vector<ConcurrentResourceGrantPolicy>& concurrent_resource_grant_policies,
61  const std::vector<ResourceGrantPolicy>& max_resource_grants_per_request_policies) {
62  for (const auto& total_resource : total_resources) {
63  if (total_resource.first == ResourceType::INVALID_TYPE) {
64  continue;
65  }
66  total_resources_[static_cast<size_t>(total_resource.first)] = total_resource.second;
67  resource_type_validity_[static_cast<size_t>(total_resource.first)] = true;
68  }
69 
70  for (const auto& concurrent_resource_grant_policy :
71  concurrent_resource_grant_policies) {
72  const ResourceType resource_type = concurrent_resource_grant_policy.resource_type;
73  if (resource_type == ResourceType::INVALID_TYPE) {
74  continue;
75  }
76  concurrent_resource_grant_policies_[static_cast<size_t>(resource_type)] =
77  concurrent_resource_grant_policy;
78  }
79 
80  for (const auto& max_resource_grant_per_request_policy :
81  max_resource_grants_per_request_policies) {
82  const ResourceSubtype resource_subtype =
83  max_resource_grant_per_request_policy.resource_subtype;
84  if (resource_subtype == ResourceSubtype::INVALID_SUBTYPE) {
85  continue;
86  }
87  max_resource_grants_per_request_policies_[static_cast<size_t>(resource_subtype)] =
88  max_resource_grant_per_request_policy;
89  }
90 
93 }
94 
96  size_t resource_type_idx = 0;
97  for (auto& concurrent_resource_grant_policy : concurrent_resource_grant_policies_) {
98  const auto resource_type = static_cast<ResourceType>(resource_type_idx);
99  const auto concurrency_policy_resource_type =
100  concurrent_resource_grant_policy.resource_type;
101  CHECK(resource_type == concurrency_policy_resource_type ||
102  concurrency_policy_resource_type == ResourceType::INVALID_TYPE);
103  if (is_resource_valid(resource_type)) {
104  if (concurrency_policy_resource_type == ResourceType::INVALID_TYPE) {
105  concurrent_resource_grant_policy.resource_type = resource_type;
106  concurrent_resource_grant_policy.concurrency_policy =
108  concurrent_resource_grant_policy.oversubscription_concurrency_policy =
110  }
111  } else {
112  concurrent_resource_grant_policy.resource_type = ResourceType::INVALID_TYPE;
113  }
114  resource_type_idx++;
115  }
116 }
117 
119  size_t resource_subtype_idx = 0;
120  for (auto& max_resource_grant_per_request_policy :
122  const auto resource_subtype = static_cast<ResourceSubtype>(resource_subtype_idx);
123  const auto resource_type = map_resource_subtype_to_resource_type(resource_subtype);
124  const auto policy_resource_subtype =
125  max_resource_grant_per_request_policy.resource_subtype;
126  CHECK(resource_subtype == policy_resource_subtype ||
127  policy_resource_subtype == ResourceSubtype::INVALID_SUBTYPE);
128  if (is_resource_valid(resource_type)) {
129  if (policy_resource_subtype == ResourceSubtype::INVALID_SUBTYPE) {
130  max_resource_grant_per_request_policy.resource_subtype = resource_subtype;
131  max_resource_grant_per_request_policy.policy_size_type =
133  }
134  max_resource_grants_per_request_[static_cast<size_t>(
135  max_resource_grant_per_request_policy.resource_subtype)] =
136  max_resource_grant_per_request_policy.get_grant_quantity(
137  get_total_resource(resource_type),
139  .oversubscription_concurrency_policy ==
141  } else {
142  max_resource_grant_per_request_policy.resource_subtype =
144  }
145  resource_subtype_idx++;
146  }
147 }
148 
150  for (size_t resource_idx = 0; resource_idx < ResourceTypeSize; ++resource_idx) {
151  const ResourceType resource_type = static_cast<ResourceType>(resource_idx);
152  if (!is_resource_valid(resource_type)) {
153  continue;
154  }
155  const auto total_resource = get_total_resource(resource_type);
156  const auto resource_type_str = resource_type_to_string(resource_type);
157  LOG(EXECUTOR) << "Resource: " << resource_type_str << ": " << total_resource;
158  LOG(EXECUTOR) << "Concurrency Policy for " << resource_type_str << ": "
160  LOG(EXECUTOR) << "Max per-request resource grants for sub-types:";
161  const auto resource_subtypes = map_resource_type_to_resource_subtypes(resource_type);
162  for (const auto& resource_subtype : resource_subtypes) {
163  LOG(EXECUTOR)
165  }
166  }
167 }
168 
// NOTE(review): the doxygen extraction this file was recovered from dropped
// original source line 169, which carried the opening of this signature. Per
// the call in get_resource_info below, this is
// ExecutorResourcePool::get_allocated_resource_of_type — confirm against the
// original source before editing.
// Sums the currently-allocated quantities of every subtype mapped to the
// given resource type and returns the total. Presumably expects the caller to
// hold resource_mutex_ (the public get_resource_info wrapper takes a shared
// lock before calling this) — TODO confirm.
170  const ResourceType resource_type) const {
171  const auto resource_subtypes = map_resource_type_to_resource_subtypes(resource_type);
172  size_t resource_type_allocation_sum{0};
  // Accumulate per-subtype allocations; the subtype list comes from the
  // project-level type-to-subtypes mapping helper.
173  for (const auto& resource_subtype : resource_subtypes) {
174  resource_type_allocation_sum += get_allocated_resource_of_subtype(resource_subtype);
175  }
176  return resource_type_allocation_sum;
177 }
178 
179 std::pair<size_t, size_t> ExecutorResourcePool::get_resource_info(
180  const ResourceType resource_type) const {
181  std::shared_lock<std::shared_mutex> resource_read_lock(resource_mutex_);
182  return std::make_pair(get_allocated_resource_of_type(resource_type),
183  get_total_resource(resource_type));
184 }
185 
187  std::shared_lock<std::shared_mutex> resource_read_lock(resource_mutex_);
188  return ResourcePoolInfo(
210 }
211 
213  const size_t resource_quantity) {
214  CHECK(resource_type != ResourceType::INVALID_TYPE);
216  throw std::runtime_error(
217  "Executor Pool must be clear of requests to change resources available.");
218  }
219  const std::vector<std::pair<ResourceType, size_t>> total_resources_vec = {
220  std::make_pair(resource_type, resource_quantity)};
221  init(total_resources_vec, {}, {});
222 }
223 
225  const ConcurrentResourceGrantPolicy& concurrent_resource_grant_policy) {
226  CHECK(concurrent_resource_grant_policy.resource_type != ResourceType::INVALID_TYPE);
228  throw std::runtime_error(
229  "Executor Pool must be clear of requests to change resource concurrent resource "
230  "grant policies.");
231  }
232  init({}, {concurrent_resource_grant_policy}, {});
233 }
234 
// NOTE(review): original source line 235 (the opening of this signature) was
// lost in extraction. Per the call sites later in this file (e.g.
// "max_resource_grant.cpu_slots = calc_max_resource_grant_for_request(...)")
// this is ExecutorResourcePool::calc_max_resource_grant_for_request,
// returning size_t — restore from the original source.
// Clamps a single-resource request against the per-request grant cap:
//   1. grant the full requested quantity when it fits under the cap;
//   2. otherwise grant the cap itself when the stated minimum still fits;
//   3. otherwise grant 0, signalling the request cannot be satisfied at all.
236  const size_t requested_resource_quantity,
237  const size_t min_requested_resource_quantity,
238  const size_t max_grantable_resource_quantity) const {
239  if (requested_resource_quantity <= max_grantable_resource_quantity) {
240  return requested_resource_quantity;
241  }
242  if (min_requested_resource_quantity <= max_grantable_resource_quantity) {
243  return max_grantable_resource_quantity;
244  }
245  return static_cast<size_t>(0);
246 }
247 
// NOTE(review): original source line 249 (which named this method) was lost
// in extraction; a later comment in this file refers to it as
// calc_min_dependent_resource_grant_for_request — confirm against the
// original source.
// Given a minimum request for a dependent resource (e.g. buffer bytes), a
// minimum for the independent resource it scales with (e.g. CPU slots), and
// the fixed dependent:independent ratio, computes the smallest consistent
// {dependent, independent} grant pair:
//   - the independent minimum is raised to ceil(dependent_min / ratio) if
//     needed, so the dependent minimum is actually reachable;
//   - the dependent minimum is then re-derived as independent_min * ratio.
// Returned pair is {adjusted dependent min, adjusted independent min}.
248 std::pair<size_t, size_t>
250  const size_t min_requested_dependent_resource_quantity,
251  const size_t min_requested_independent_resource_quantity,
252  const size_t dependent_to_independent_resource_ratio) const {
253  const size_t adjusted_min_independent_resource_quantity =
254  std::max(static_cast<size_t>(
255  ceil(static_cast<double>(min_requested_dependent_resource_quantity) /
256  dependent_to_independent_resource_ratio)),
257  min_requested_independent_resource_quantity);
258  const size_t adjusted_min_dependent_resource_quantity =
259  adjusted_min_independent_resource_quantity *
260  dependent_to_independent_resource_ratio;
261  return std::make_pair(adjusted_min_dependent_resource_quantity,
262  adjusted_min_independent_resource_quantity);
263 }
264 
// NOTE(review): original source lines 266 (the signature's opening / method
// name) and 288 (the callee name of the "adjusted min" computation — almost
// certainly calc_min_dependent_resource_grant_for_request, given the argument
// list matches it exactly) were lost in extraction; restore both from the
// original source before editing.
// Computes the maximum consistent {dependent, independent} grant pair for a
// request where the dependent resource scales linearly with the independent
// one at a fixed ratio:
//   - if the full dependent request fits under its cap, grant it along with
//     the full independent cap;
//   - otherwise, if even the adjusted minimum pair exceeds the dependent cap,
//     grant {0, 0} (request unsatisfiable);
//   - otherwise, scale the independent grant down so that
//     independent * ratio fits under the dependent cap, and grant that pair.
265 std::pair<size_t, size_t>
267  const size_t requested_dependent_resource_quantity,
268  const size_t min_requested_dependent_resource_quantity,
269  const size_t max_grantable_dependent_resource_quantity,
270  const size_t min_requested_independent_resource_quantity,
271  const size_t max_grantable_independent_resource_quantity,
272  const size_t dependent_to_independent_resource_ratio) const {
273  CHECK_LE(min_requested_dependent_resource_quantity,
274  requested_dependent_resource_quantity);
275  CHECK_LE(min_requested_independent_resource_quantity,
276  max_grantable_independent_resource_quantity);
277 
278  if (requested_dependent_resource_quantity <=
279  max_grantable_dependent_resource_quantity) {
280  // Dependent resource request falls under max grantable limit, grant requested
281  // resource
282  return std::make_pair(requested_dependent_resource_quantity,
283  max_grantable_independent_resource_quantity);
284  }
285  // First member of pair returned is min resource grant, second is min dependent
286  // resource grant
287  const auto adjusted_min_dependent_and_independent_resource_grant =
289  min_requested_dependent_resource_quantity,
290  min_requested_independent_resource_quantity,
291  dependent_to_independent_resource_ratio);
292 
293  if (adjusted_min_dependent_and_independent_resource_grant.first >
294  max_grantable_dependent_resource_quantity) {
295  // If here the min grantable dependent resource is greater than what was to provided
296  // to the function as grantable of the dependent resource
297  return std::make_pair(static_cast<size_t>(0), static_cast<size_t>(0));
298  }
299 
  // Integer division intentionally rounds the independent quantity down so
  // the derived dependent grant stays under its cap.
300  const size_t adjusted_max_independent_resource_quantity = std::min(
301  max_grantable_dependent_resource_quantity / dependent_to_independent_resource_ratio,
302  max_grantable_independent_resource_quantity);
303 
304  CHECK_GE(adjusted_max_independent_resource_quantity,
305  adjusted_min_dependent_and_independent_resource_grant.second);
306 
307  const size_t granted_dependent_resource_quantity =
308  dependent_to_independent_resource_ratio *
309  adjusted_max_independent_resource_quantity;
310  return std::make_pair(granted_dependent_resource_quantity,
311  adjusted_max_independent_resource_quantity);
312 }
313 
315  const ResourceSubtype resource_subtype,
316  const size_t min_resource_requested) const {
317  const size_t max_resource_grant_per_request =
318  get_max_resource_grant_per_request(resource_subtype);
319 
320  switch (resource_subtype) {
322  throw QueryNeedsTooManyCpuSlots(max_resource_grant_per_request,
323  min_resource_requested);
325  throw QueryNeedsTooManyGpuSlots(max_resource_grant_per_request,
326  min_resource_requested);
328  throw QueryNeedsTooMuchCpuResultMem(max_resource_grant_per_request,
329  min_resource_requested);
330  default:
331  throw std::runtime_error(
332  "Insufficient resources for request"); // todo: just placeholder
333  }
334 }
335 
336 std::vector<ResourceRequestGrant>
338  const std::vector<ResourceRequest>& resource_requests) const {
339  std::vector<ResourceRequestGrant> resource_request_grants;
340 
341  std::array<ResourceRequestGrant, ResourceSubtypeSize> all_resource_grants;
342  for (const auto& resource_request : resource_requests) {
343  CHECK(resource_request.resource_subtype != ResourceSubtype::INVALID_SUBTYPE);
344  CHECK_LE(resource_request.min_quantity, resource_request.max_quantity);
345 
346  ResourceRequestGrant resource_grant;
347  resource_grant.resource_subtype = resource_request.resource_subtype;
349  resource_request.max_quantity,
350  resource_request.min_quantity,
351  get_max_resource_grant_per_request(resource_request.resource_subtype));
352  if (resource_grant.max_quantity < resource_request.min_quantity) {
353  // Current implementation should always return 0 if it cannot grant requested amount
354  CHECK_EQ(resource_grant.max_quantity, size_t(0));
355  throw_insufficient_resource_error(resource_request.resource_subtype,
356  resource_request.min_quantity);
357  }
358  all_resource_grants[static_cast<size_t>(resource_grant.resource_subtype)] =
359  resource_grant;
360  }
361  return resource_request_grants;
362 }
363 
364 std::pair<ResourceGrant, ResourceGrant>
366  const RequestInfo& request_info) const {
367  ResourceGrant min_resource_grant, max_resource_grant;
368 
369  CHECK_LE(request_info.min_cpu_slots, request_info.cpu_slots);
370  CHECK_LE(request_info.min_gpu_slots, request_info.gpu_slots);
371  CHECK_LE(request_info.min_cpu_result_mem, request_info.cpu_result_mem);
372 
373  max_resource_grant.cpu_slots = calc_max_resource_grant_for_request(
374  request_info.cpu_slots,
375  request_info.min_cpu_slots,
377  if (max_resource_grant.cpu_slots == 0 && request_info.min_cpu_slots > 0) {
380  request_info.min_cpu_slots);
381  }
382 
383  max_resource_grant.gpu_slots = calc_max_resource_grant_for_request(
384  request_info.gpu_slots,
385  request_info.min_gpu_slots,
387  if (max_resource_grant.gpu_slots == 0 && request_info.min_gpu_slots > 0) {
390  request_info.min_gpu_slots);
391  }
392 
393  // Todo (todd): Modulate number of CPU threads launched to ensure that
394  // query can fit in max grantable CPU result memory (if possible)
396  request_info.cpu_result_mem,
397  request_info.min_cpu_result_mem,
399  if (max_resource_grant.cpu_result_mem == 0 && request_info.min_cpu_result_mem > 0) {
402  request_info.min_cpu_result_mem);
403  }
404 
405  const auto& chunk_request_info = request_info.chunk_request_info;
406 
407  const size_t max_pinned_buffer_pool_grant_for_memory_level =
409  chunk_request_info.device_memory_pool_type == ExecutorDeviceType::CPU
412 
413  if (chunk_request_info.total_bytes > max_pinned_buffer_pool_grant_for_memory_level) {
414  if (!chunk_request_info.bytes_scales_per_kernel) {
415  throw QueryNeedsTooMuchBufferPoolMem(max_pinned_buffer_pool_grant_for_memory_level,
416  chunk_request_info.total_bytes,
417  chunk_request_info.device_memory_pool_type);
418  }
419  // If here we have bytes_needed_scales_per_kernel
420  // For now, this can only be for a CPU request, but that may be relaxed down the
421  // road
422  const size_t max_pageable_buffer_pool_grant_for_memory_level =
424  chunk_request_info.device_memory_pool_type == ExecutorDeviceType::CPU
427  CHECK(chunk_request_info.device_memory_pool_type == ExecutorDeviceType::CPU);
428  const auto max_chunk_memory_and_cpu_slots_grant =
430  chunk_request_info.total_bytes, // requested_dependent_resource_quantity
431  chunk_request_info
432  .max_bytes_per_kernel, // min_requested_dependent_resource_quantity
433  max_pageable_buffer_pool_grant_for_memory_level, // max_grantable_dependent_resource_quantity
434  request_info.min_cpu_slots, // min_requested_independent_resource_quantity
435  max_resource_grant.cpu_slots, // max_grantable_indepndent_resource_quantity
436  chunk_request_info
437  .max_bytes_per_kernel); // dependent_to_independent_resource_ratio
438 
439  CHECK_LE(max_chunk_memory_and_cpu_slots_grant.second, max_resource_grant.cpu_slots);
440  if (max_chunk_memory_and_cpu_slots_grant.first == size_t(0)) {
441  // Make sure cpu_slots is 0 as well
442  CHECK_EQ(max_chunk_memory_and_cpu_slots_grant.second, size_t(0));
443  // Get what min grant would have been if it was grantable so that we can present a
444  // meaningful error message
445  const auto adjusted_min_chunk_memory_and_cpu_slots_grant =
447  chunk_request_info
448  .max_bytes_per_kernel, // min_requested_dependent_resource_quantity
449  request_info.min_cpu_slots, // min_requested_independent_resource_quantity
450  chunk_request_info
451  .max_bytes_per_kernel); // dependent_to_independent_resource_ratio
452  // Ensure we would not have been able to satisfy this grant
453  CHECK_GT(adjusted_min_chunk_memory_and_cpu_slots_grant.first,
454  max_pageable_buffer_pool_grant_for_memory_level);
455  // The logic for calc_min_dependent_resource_grant_for_request is constrained to
456  // at least return at least the min dependent resource quantity requested, here
457  // CPU slots
458  CHECK_GE(adjusted_min_chunk_memory_and_cpu_slots_grant.second,
459  request_info.min_cpu_slots);
460 
461  // May need additional error message as we could fail even though bytes per kernel
462  // < total buffer pool bytes, if cpu slots < min requested cpu slots
464  max_pageable_buffer_pool_grant_for_memory_level,
465  adjusted_min_chunk_memory_and_cpu_slots_grant
466  .first, // min chunk memory grant (without chunk grant constraints)
467  chunk_request_info.device_memory_pool_type);
468  }
469  // If here query is allowed but cpu slots are gated to gate number of chunks
470  // simultaneously pinned We should have been gated to a minimum of our request's
471  // min_cpu_slots
472  CHECK_GE(max_chunk_memory_and_cpu_slots_grant.second, request_info.min_cpu_slots);
473  max_resource_grant.cpu_slots = max_chunk_memory_and_cpu_slots_grant.second;
474  max_resource_grant.buffer_mem_gated_per_slot = true;
475  min_resource_grant.buffer_mem_gated_per_slot = true;
476  max_resource_grant.buffer_mem_per_slot = chunk_request_info.max_bytes_per_kernel;
477  min_resource_grant.buffer_mem_per_slot = chunk_request_info.max_bytes_per_kernel;
478  max_resource_grant.buffer_mem_for_given_slots =
479  chunk_request_info.max_bytes_per_kernel * max_resource_grant.cpu_slots;
480  min_resource_grant.buffer_mem_for_given_slots =
481  chunk_request_info.max_bytes_per_kernel * request_info.min_cpu_slots;
482  }
483 
484  min_resource_grant.cpu_slots = request_info.min_cpu_slots;
485  min_resource_grant.gpu_slots = request_info.min_gpu_slots;
486  min_resource_grant.cpu_result_mem = request_info.cpu_result_mem;
487 
488  return std::make_pair(min_resource_grant, max_resource_grant);
489 }
490 
492  const size_t resource_total,
493  const size_t resource_allocated,
494  const ConcurrentResourceGrantPolicy& concurrent_resource_grant_policy) const {
495  if (concurrent_resource_grant_policy.concurrency_policy ==
497  resource_allocated > 0) {
498  return false;
499  }
500  if (concurrent_resource_grant_policy.oversubscription_concurrency_policy ==
502  resource_allocated > resource_total) {
503  return false;
504  }
505  return true;
506 }
507 
509  const size_t min_resource_request,
510  const size_t resource_total,
511  const size_t resource_allocated,
512  const size_t global_outstanding_requests,
513  const ConcurrentResourceGrantPolicy& concurrent_resource_grant_policy) const {
514  auto test_request_against_policy =
515  [min_resource_request, resource_allocated, global_outstanding_requests](
516  const ResourceConcurrencyPolicy& resource_concurrency_policy) {
517  switch (resource_concurrency_policy) {
519  // DISALLOW_REQUESTS for undersubscription policy doesn't make much sense as
520  // a resource pool-wide policy (unless we are using it as a sanity check for
521  // something like CPU mode), but planning to implement per-query or priority
522  // level policies so will leave for now
523  return min_resource_request == 0;
524  }
526  // redundant with check_request_against_global_policy,
527  // so considered CHECKing instead that the following cannot
528  // be true, but didn't want to couple the two functions
529  return global_outstanding_requests == 0;
530  }
532  return min_resource_request == 0 || resource_allocated == 0;
533  }
535  return true;
536  }
537  default:
538  UNREACHABLE();
539  }
540  return false;
541  };
542 
543  if (!test_request_against_policy(concurrent_resource_grant_policy.concurrency_policy)) {
544  return false;
545  }
546  if (min_resource_request + resource_allocated <= resource_total) {
547  return true;
548  }
549  return test_request_against_policy(
550  concurrent_resource_grant_policy.oversubscription_concurrency_policy);
551 }
552 
553 /* Unlocked internal version */
554 
556  const ResourceGrant& min_resource_grant,
557  const ChunkRequestInfo& chunk_request_info) const {
558  // Currently expects to be protected by mutex from ExecutorResourceMgr
559 
560  // Arguably exceptions below shouldn't happen as resource_grant,
561  // if generated by ExecutorResourcePool per design, should be within
562  // per query max limits. But since this is an external class api call and
563  // the input could be anything provided by the caller, and we may want
564  // to allow for dynamic per query limits, throwing instead of CHECKing
565  // for now, but may re-evaluate.
566 
567  if (min_resource_grant.cpu_slots >
571  min_resource_grant.cpu_slots);
572  }
573  if (min_resource_grant.gpu_slots >
577  min_resource_grant.gpu_slots);
578  }
579  if (min_resource_grant.cpu_result_mem >
583  min_resource_grant.cpu_result_mem);
584  }
585 
586  // First check if request is in violation of any global
587  // ALLOW_SINGLE_GLOBAL_REQUEST policies
588 
593  return false;
594  }
599  return false;
600  }
605  return false;
606  }
607 
608  const bool can_satisfy_cpu_slots_request = check_request_against_policy(
609  min_resource_grant.cpu_slots,
614 
615  const bool can_satisfy_gpu_slots_request = check_request_against_policy(
616  min_resource_grant.gpu_slots,
621 
622  const bool can_satisfy_cpu_result_mem_request = check_request_against_policy(
623  min_resource_grant.cpu_result_mem,
628 
629  // Short circuit before heavier chunk check operation
630  if (!(can_satisfy_cpu_slots_request && can_satisfy_gpu_slots_request &&
631  can_satisfy_cpu_result_mem_request)) {
632  return false;
633  }
634 
635  return can_currently_satisfy_chunk_request(min_resource_grant, chunk_request_info);
636 }
637 
639  const ChunkRequestInfo& chunk_request_info) const {
640  const BufferPoolChunkMap& chunk_map_for_memory_level =
644  ChunkRequestInfo missing_chunk_info;
645  missing_chunk_info.device_memory_pool_type = chunk_request_info.device_memory_pool_type;
646  std::vector<std::pair<ChunkKey, size_t>> missing_chunks_with_byte_sizes;
647  for (const auto& requested_chunk : chunk_request_info.chunks_with_byte_sizes) {
648  if (chunk_map_for_memory_level.find(requested_chunk.first) ==
649  chunk_map_for_memory_level.end()) {
650  missing_chunk_info.chunks_with_byte_sizes.emplace_back(requested_chunk);
651  missing_chunk_info.total_bytes += requested_chunk.second;
652  }
653  }
654  missing_chunk_info.num_chunks = missing_chunk_info.chunks_with_byte_sizes.size();
655  return missing_chunk_info;
656 }
657 
659  const ChunkRequestInfo& chunk_request_info) const {
660  const BufferPoolChunkMap& chunk_map_for_memory_level =
664  size_t chunk_bytes_not_in_pool{0};
665  for (const auto& requested_chunk : chunk_request_info.chunks_with_byte_sizes) {
666  const auto chunk_itr = chunk_map_for_memory_level.find(requested_chunk.first);
667  if (chunk_itr == chunk_map_for_memory_level.end()) {
668  chunk_bytes_not_in_pool += requested_chunk.second;
669  } else if (requested_chunk.second > chunk_itr->second.second) {
670  chunk_bytes_not_in_pool += requested_chunk.second - chunk_itr->second.second;
671  }
672  }
673  return chunk_bytes_not_in_pool;
674 }
675 
677  const ResourceGrant& min_resource_grant,
678  const ChunkRequestInfo& chunk_request_info) const {
679  // Expects lock on resource_mutex_ already taken
680 
681  const size_t total_buffer_mem_for_memory_level =
685  const size_t allocated_buffer_mem_for_memory_level =
687  chunk_request_info.device_memory_pool_type);
688 
689  if (min_resource_grant.buffer_mem_gated_per_slot) {
690  CHECK_GT(min_resource_grant.buffer_mem_per_slot, size_t(0));
691  // We only allow scaling back slots to cap buffer pool memory required on CPU
692  // currently
694  const size_t min_buffer_pool_mem_required =
695  min_resource_grant.cpu_slots * min_resource_grant.buffer_mem_per_slot;
696  // Below is a sanity check... we'll never be able to run the query if minimum pool
697  // memory required is not <= the total buffer pool memory
698  CHECK_LE(min_buffer_pool_mem_required, total_buffer_mem_for_memory_level);
699  return allocated_buffer_mem_for_memory_level + min_buffer_pool_mem_required <=
700  total_buffer_mem_for_memory_level;
701  }
702 
703  // CHECK and not exception as parent should have checked this, can re-evaluate whether
704  // should be exception
705  CHECK_LE(chunk_request_info.total_bytes, total_buffer_mem_for_memory_level);
706  const size_t chunk_bytes_not_in_pool = get_chunk_bytes_not_in_pool(chunk_request_info);
707  if (ENABLE_DEBUG_PRINTING) {
708  debug_print("Chunk bytes not in pool: ", format_num_bytes(chunk_bytes_not_in_pool));
709  }
710  return chunk_bytes_not_in_pool + allocated_buffer_mem_for_memory_level <=
711  total_buffer_mem_for_memory_level;
712 }
713 
715  const ResourceGrant& resource_grant,
716  const ChunkRequestInfo& chunk_request_info) {
717  // Expects lock on resource_mutex_ already taken
718 
719  if (resource_grant.buffer_mem_gated_per_slot) {
722  chunk_request_info.device_memory_pool_type) +
723  resource_grant.buffer_mem_for_given_slots,
725  allocated_resources_[static_cast<size_t>(
727  resource_grant.buffer_mem_for_given_slots;
728 
729  const std::string& pool_level_string =
730  chunk_request_info.device_memory_pool_type == ExecutorDeviceType::CPU ? "CPU"
731  : "GPU";
732  LOG(EXECUTOR) << "ExecutorResourePool " << pool_level_string
733  << " allocated_temp chunk addition: "
734  << format_num_bytes(resource_grant.buffer_mem_for_given_slots);
735  LOG(EXECUTOR) << "ExecutorResourePool " << pool_level_string
736  << " pool state: Transient Allocations: "
739  << " Total Allocations: "
741  chunk_request_info.device_memory_pool_type));
742  return;
743  }
744 
745  BufferPoolChunkMap& chunk_map_for_memory_level =
749  size_t& pinned_buffer_mem_for_memory_level =
751  ? allocated_resources_[static_cast<size_t>(
753  : allocated_resources_[static_cast<size_t>(
755  const size_t total_buffer_mem_for_memory_level =
759 
760  // Following variables are for logging
761  const size_t pre_pinned_chunks_for_memory_level = chunk_map_for_memory_level.size();
762  const size_t pre_pinned_buffer_mem_for_memory_level =
763  pinned_buffer_mem_for_memory_level;
764 
765  for (const auto& requested_chunk : chunk_request_info.chunks_with_byte_sizes) {
766  auto chunk_itr = chunk_map_for_memory_level.find(requested_chunk.first);
767  if (chunk_itr == chunk_map_for_memory_level.end()) {
768  pinned_buffer_mem_for_memory_level += requested_chunk.second;
769  chunk_map_for_memory_level.insert(
770  std::make_pair(requested_chunk.first,
771  std::make_pair(size_t(1) /* initial reference count */,
772  requested_chunk.second)));
773  } else {
774  if (requested_chunk.second > chunk_itr->second.second) {
775  pinned_buffer_mem_for_memory_level +=
776  requested_chunk.second - chunk_itr->second.second;
777  chunk_itr->second.second = requested_chunk.second;
778  }
779  chunk_itr->second.first += 1; // Add reference count
780  }
781  }
782  const size_t post_pinned_chunks_for_memory_level = chunk_map_for_memory_level.size();
783  const size_t net_new_allocated_chunks =
784  post_pinned_chunks_for_memory_level - pre_pinned_chunks_for_memory_level;
785  const size_t net_new_allocated_memory =
786  pinned_buffer_mem_for_memory_level - pre_pinned_buffer_mem_for_memory_level;
787 
788  const std::string& pool_level_string =
789  chunk_request_info.device_memory_pool_type == ExecutorDeviceType::CPU ? "CPU"
790  : "GPU";
791  LOG(EXECUTOR) << "ExecutorResourePool " << pool_level_string
792  << " chunk allocation: " << chunk_request_info.num_chunks << " chunks | "
793  << format_num_bytes(chunk_request_info.total_bytes);
794  LOG(EXECUTOR) << "ExecutorResourePool " << pool_level_string
795  << " pool delta: " << net_new_allocated_chunks << " chunks added | "
796  << format_num_bytes(net_new_allocated_memory);
797  LOG(EXECUTOR) << "ExecutorResourePool " << pool_level_string
798  << " pool state: " << post_pinned_chunks_for_memory_level << " chunks | "
800  chunk_request_info.device_memory_pool_type));
801 
802  if (ENABLE_DEBUG_PRINTING) {
803  debug_print("After chunk allocation: ",
804  format_num_bytes(pinned_buffer_mem_for_memory_level),
805  " of ",
806  format_num_bytes(total_buffer_mem_for_memory_level),
807  ", with ",
808  chunk_map_for_memory_level.size(),
809  " chunks.");
810  }
811  CHECK_LE(pinned_buffer_mem_for_memory_level, total_buffer_mem_for_memory_level);
812 }
813 
815  const ResourceGrant& resource_grant,
816  const ChunkRequestInfo& chunk_request_info) {
817  // Expects lock on resource_mutex_ already taken
818 
819  if (resource_grant.buffer_mem_gated_per_slot) {
821  CHECK_GE(
823  resource_grant.buffer_mem_for_given_slots);
824  CHECK_GE(
826  resource_grant.buffer_mem_for_given_slots);
827  allocated_resources_[static_cast<size_t>(
829  resource_grant.buffer_mem_for_given_slots;
830  const std::string& pool_level_string =
831  chunk_request_info.device_memory_pool_type == ExecutorDeviceType::CPU ? "CPU"
832  : "GPU";
833  LOG(EXECUTOR) << "ExecutorResourePool " << pool_level_string
834  << " allocated_temp chunk removal: "
835  << format_num_bytes(resource_grant.buffer_mem_for_given_slots);
836  LOG(EXECUTOR) << "ExecutorResourePool " << pool_level_string
837  << " pool state: Transient Allocations: "
840  << " Total Allocations: "
843  return;
844  }
845 
846  BufferPoolChunkMap& chunk_map_for_memory_level =
850  size_t& pinned_buffer_mem_for_memory_level =
852  ? allocated_resources_[static_cast<size_t>(
854  : allocated_resources_[static_cast<size_t>(
856 
857  // Following variables are for logging
858  const size_t pre_remove_allocated_chunks_for_memory_level =
859  chunk_map_for_memory_level.size();
860  const size_t pre_remove_allocated_buffer_mem_for_memory_level =
861  pinned_buffer_mem_for_memory_level;
862 
863  for (const auto& requested_chunk : chunk_request_info.chunks_with_byte_sizes) {
864  auto chunk_itr = chunk_map_for_memory_level.find(requested_chunk.first);
865  // Chunk must exist in pool
866  CHECK(chunk_itr != chunk_map_for_memory_level.end());
867  chunk_itr->second.first -= 1;
868  if (chunk_itr->second.first == size_t(0)) {
869  pinned_buffer_mem_for_memory_level -= chunk_itr->second.second;
870  chunk_map_for_memory_level.erase(chunk_itr);
871  }
872  }
873  const size_t total_buffer_mem_for_memory_level =
877 
878  const size_t post_remove_allocated_chunks_for_memory_level =
879  chunk_map_for_memory_level.size();
880  const size_t net_removed_allocated_chunks =
881  pre_remove_allocated_chunks_for_memory_level -
882  post_remove_allocated_chunks_for_memory_level;
883  const size_t net_removed_allocated_memory =
884  pre_remove_allocated_buffer_mem_for_memory_level -
885  pinned_buffer_mem_for_memory_level;
886 
887  const std::string& pool_level_string =
888  chunk_request_info.device_memory_pool_type == ExecutorDeviceType::CPU ? "CPU"
889  : "GPU";
890  LOG(EXECUTOR) << "ExecutorResourePool " << pool_level_string
891  << " chunk removal: " << chunk_request_info.num_chunks << " chunks | "
892  << format_num_bytes(chunk_request_info.total_bytes);
893  LOG(EXECUTOR) << "ExecutorResourePool " << pool_level_string
894  << " pool delta: " << net_removed_allocated_chunks << " chunks removed | "
895  << format_num_bytes(net_removed_allocated_memory);
896  LOG(EXECUTOR) << "ExecutorResourePool " << pool_level_string
897  << " pool state: " << post_remove_allocated_chunks_for_memory_level
898  << " chunks | " << format_num_bytes(pinned_buffer_mem_for_memory_level);
899 
900  if (ENABLE_DEBUG_PRINTING) {
901  debug_print("After chunk removal: ",
902  format_num_bytes(pinned_buffer_mem_for_memory_level) + " of ",
903  format_num_bytes(total_buffer_mem_for_memory_level),
904  ", with ",
905  chunk_map_for_memory_level.size(),
906  " chunks.");
907  }
908 }
909 
// Thread-safe public feasibility check: acquires a shared (read) lock on
// resource_mutex_ and defers to can_currently_satisfy_request_impl, which
// does the actual evaluation. A shared lock suffices because this is a
// read-only query of pool state.
// NOTE(review): the signature line (910) is not visible in this rendering —
// per the reference index it is bool can_currently_satisfy_chunk_request(...) const.
 911  const ResourceGrant& min_resource_grant,
 912  const ChunkRequestInfo& chunk_request_info) const {
 913  std::shared_lock<std::shared_mutex> resource_read_lock(resource_mutex_);
 914  return can_currently_satisfy_request_impl(min_resource_grant, chunk_request_info);
 915 }
916 
// Computes a dynamic grant for one resource type. If even the minimum request
// plus what is already allocated would meet/exceed the pool total, the minimum
// is granted (the caller is expected to have already verified overall
// feasibility). Otherwise the grant is max_request_backoff_ratio of the
// remaining (unallocated) resource, clamped to the inclusive range
// [min_resource_requested, max_resource_requested].
// NOTE(review): the signature line (917) is not visible in this rendering.
 918  const size_t min_resource_requested,
 919  const size_t max_resource_requested,
 920  const size_t resource_allocated,
 921  const size_t total_resource,
 922  const double max_request_backoff_ratio) const {
 923  CHECK_LE(min_resource_requested, max_resource_requested);
 924  if (min_resource_requested + resource_allocated >= total_resource) {
 925  return min_resource_requested;
 926  }
 927  // The below is safe in unsigned math as we know that resource_allocated <
 928  // total_resource from the above conditional
 929  const size_t resource_remaining = total_resource - resource_allocated;
 930  return std::max(min_resource_requested,
 931  std::min(max_resource_requested,
 932  static_cast<size_t>(
 933  round(max_request_backoff_ratio * resource_remaining))));
 934 }
935 
// Determines the actual resource grant to give a request, between the
// statically computed min and max grants, based on the pool's current
// allocations. Holds an exclusive lock for the duration. Returns
// {false, empty grant} if even the minimum grant cannot currently be
// satisfied; otherwise {true, computed grant}.
// NOTE(review): this rendering is missing several source lines (952-953,
// 958-959, 963, 966-967, 970, 978-984, 997) — in particular the
// allocated/total-resource arguments passed to the per-resource grant helpers
// and the callee of the dependent-resource calculation are not visible here.
 937  const ResourceGrant& min_resource_grant,
 938  const ResourceGrant& max_resource_grant,
 939  const ChunkRequestInfo& chunk_request_info,
 940  const double max_request_backoff_ratio) const {
 941  std::unique_lock<std::shared_mutex> resource_write_lock(resource_mutex_);
 942  CHECK_LE(max_request_backoff_ratio, 1.0);
 943  const bool can_satisfy_request =
 944  can_currently_satisfy_request_impl(min_resource_grant, chunk_request_info);
 945  ResourceGrant actual_resource_grant;
 946  if (!can_satisfy_request) {
 947  return std::make_pair(false, actual_resource_grant);
 948  }
// Grant each independent resource (CPU slots, GPU slots, CPU result mem)
// separately via the single-resource backoff helper.
 949  actual_resource_grant.cpu_slots = determine_dynamic_single_resource_grant(
 950  min_resource_grant.cpu_slots,
 951  max_resource_grant.cpu_slots,
 954  max_request_backoff_ratio);
 955  actual_resource_grant.gpu_slots = determine_dynamic_single_resource_grant(
 956  min_resource_grant.gpu_slots,
 957  max_resource_grant.gpu_slots,
 960  max_request_backoff_ratio);
 961  // Todo (todd): Modulate number of CPU threads launched to ensure that
 962  // query can fit in currently available CPU result memory
 964  min_resource_grant.cpu_result_mem,
 965  max_resource_grant.cpu_result_mem,
 968  max_request_backoff_ratio);
// When buffer memory is gated per kernel slot, the buffer-mem grant and the
// CPU-slot grant are coupled via max_bytes_per_kernel, so recompute both
// together below.
 969  if (min_resource_grant.buffer_mem_gated_per_slot) {
 971  // Below is quite redundant, but can revisit
 972  CHECK_EQ(chunk_request_info.max_bytes_per_kernel,
 973  min_resource_grant.buffer_mem_per_slot);
 974  CHECK_EQ(chunk_request_info.max_bytes_per_kernel,
 975  max_resource_grant.buffer_mem_per_slot);
 976 
 977  const size_t allocated_buffer_mem_for_memory_level =
 981  const size_t total_buffer_mem_for_memory_level =
 985 
 986  CHECK_LE(allocated_buffer_mem_for_memory_level, total_buffer_mem_for_memory_level);
 987 
 988  const size_t remaining_buffer_mem_for_memory_level =
 989  total_buffer_mem_for_memory_level - allocated_buffer_mem_for_memory_level;
 990 
 991  CHECK_LE(min_resource_grant.buffer_mem_for_given_slots,
 992  remaining_buffer_mem_for_memory_level);
 993  const size_t max_grantable_mem =
 994  std::min(remaining_buffer_mem_for_memory_level,
 995  max_resource_grant.buffer_mem_for_given_slots);
 996  const auto granted_buffer_mem_and_cpu_slots =
 998  chunk_request_info.total_bytes, // requested_dependent_resource_quantity
 999  min_resource_grant
 1000  .buffer_mem_for_given_slots, // min_requested_dependent_resource_quantity
 1001  max_grantable_mem, // max_grantable_dependent_resource_quantity
 1002  min_resource_grant.cpu_slots, // min_requested_independent_resource_quantity
 1003  max_resource_grant.cpu_slots, // max_grantable_independent_resource_quantity
 1004  chunk_request_info
 1005  .max_bytes_per_kernel); // dependent_to_independent_resource_ratio
 1006  const size_t granted_buffer_mem = granted_buffer_mem_and_cpu_slots.first;
 1007  const size_t granted_cpu_slots = granted_buffer_mem_and_cpu_slots.second;
// Invariant: granted buffer mem must be an exact multiple of the per-kernel
// byte budget times the granted slot count, and slots stay within [min, max].
 1008  CHECK_EQ(granted_buffer_mem,
 1009  granted_cpu_slots * chunk_request_info.max_bytes_per_kernel);
 1010  CHECK_GE(granted_cpu_slots, min_resource_grant.cpu_slots);
 1011  CHECK_LE(granted_cpu_slots, max_resource_grant.cpu_slots);
 1012  actual_resource_grant.buffer_mem_gated_per_slot = true;
 1013  actual_resource_grant.buffer_mem_per_slot = chunk_request_info.max_bytes_per_kernel;
 1014  actual_resource_grant.buffer_mem_for_given_slots = granted_buffer_mem;
 1015  actual_resource_grant.cpu_slots =
 1016  granted_cpu_slots; // Override cpu slots with restricted dependent resource
 1017  // calc
 1018  }
 1019  return std::make_pair(true, actual_resource_grant);
 1020 }
1021 
// Reserves (allocates) a previously-computed resource grant in the pool under
// an exclusive lock, so concurrent requests cannot use those resources until
// deallocate_resources returns them. Increments the per-subtype allocation
// counters, bumps per-resource outstanding-request counters for each resource
// actually used, logs the new pool state, and finally records the request's
// chunks via add_chunk_requests_to_allocated_pool.
// NOTE(review): this rendering is missing lines 1041-1042, 1044-1045,
// 1048-1049, 1052-1053, 1058-1059, 1064-1065 (the bodies of the conditional
// counter increments) and fragments of the LOG statements (1071, 1073, 1076,
// 1078, 1081, 1083) — signature line 1022 is also absent.
 1023  const ResourceGrant& resource_grant,
 1024  const ChunkRequestInfo& chunk_request_info) {
 1025  std::unique_lock<std::shared_mutex> resource_write_lock(resource_mutex_);
 1026 
 1027  // Caller (ExecutorResourceMgr) should never request resource allocation for a request
 1028  // it knows cannot be granted, however use below as a sanity check Use unlocked
 1029  // internal method as we already hold lock above
 1030  const bool can_satisfy_request =
 1031  can_currently_satisfy_request_impl(resource_grant, chunk_request_info);
 1032  CHECK(can_satisfy_request);
 1033 
// Record the granted quantities against the pool's per-subtype totals.
 1034  allocated_resources_[static_cast<size_t>(ResourceSubtype::CPU_SLOTS)] +=
 1035  resource_grant.cpu_slots;
 1036  allocated_resources_[static_cast<size_t>(ResourceSubtype::GPU_SLOTS)] +=
 1037  resource_grant.gpu_slots;
 1038  allocated_resources_[static_cast<size_t>(ResourceSubtype::CPU_RESULT_MEM)] +=
 1039  resource_grant.cpu_result_mem;
 1040 
// Only resources with a non-zero grant (or a non-empty chunk request for the
// matching device memory pool) count toward per-resource request tracking.
 1043  if (resource_grant.cpu_slots > 0) {
 1046  }
 1047  if (resource_grant.gpu_slots > 0) {
 1050  }
 1051  if (resource_grant.cpu_result_mem > 0) {
 1054  }
 1055  if (chunk_request_info.device_memory_pool_type == ExecutorDeviceType::CPU) {
 1056  if (resource_grant.buffer_mem_gated_per_slot ||
 1057  (chunk_request_info.num_chunks > 0 && chunk_request_info.total_bytes > 0)) {
 1060  }
 1061  } else if (chunk_request_info.device_memory_pool_type == ExecutorDeviceType::GPU) {
 1062  if (resource_grant.buffer_mem_gated_per_slot ||
 1063  (chunk_request_info.num_chunks > 0 && chunk_request_info.total_bytes > 0)) {
 1066  }
 1067  }
 1068 
 1069  LOG(EXECUTOR) << "ExecutorResourcePool allocation: " << outstanding_num_requests_
 1070  << " requests ("
 1072  << " CPU | "
 1074  << " GPU)";
 1075  LOG(EXECUTOR) << "ExecutorResourcePool state: CPU slots: "
 1077  << get_total_resource(ResourceType::CPU_SLOTS) << " | GPU slots: "
 1079  << get_total_resource(ResourceType::GPU_SLOTS) << " CPU result mem: "
 1080  << format_num_bytes(
 1082  << " of "
// Chunk bookkeeping happens last, while the exclusive lock is still held.
 1084  add_chunk_requests_to_allocated_pool(resource_grant, chunk_request_info);
 1085 }
1086 
// Returns a previously allocated resource grant to the pool under an
// exclusive lock, making the resources available to other requests. Mirrors
// allocate_resources: sanity-checks the grant does not exceed current
// allocations, decrements the per-subtype allocation counters and the
// per-resource outstanding-request counters, logs the new pool state, and
// removes the request's chunks via remove_chunk_requests_from_allocated_pool.
// NOTE(review): this rendering is missing the right-hand sides of the three
// CHECK_LE sanity checks (lines 1096, 1098, 1100) — presumably the currently
// allocated amount of each subtype; verify against the repository. Signature
// line 1087, the conditional-decrement bodies (1109-1128 fragments), LOG
// fragments, and lines 1148-1149 are also absent.
 1088  const ResourceGrant& resource_grant,
 1089  const ChunkRequestInfo& chunk_request_info) {
 1090  std::unique_lock<std::shared_mutex> resource_write_lock(resource_mutex_);
 1091 
 1092  // Caller (ExecutorResourceMgr) should never request resource allocation for a request
 1093  // it knows cannot be granted, however use below as a sanity check
 1094 
 1095  CHECK_LE(resource_grant.cpu_slots,
 1097  CHECK_LE(resource_grant.gpu_slots,
 1099  CHECK_LE(resource_grant.cpu_result_mem,
 1101 
// Release the granted quantities from the pool's per-subtype totals.
 1102  allocated_resources_[static_cast<size_t>(ResourceSubtype::CPU_SLOTS)] -=
 1103  resource_grant.cpu_slots;
 1104  allocated_resources_[static_cast<size_t>(ResourceSubtype::GPU_SLOTS)] -=
 1105  resource_grant.gpu_slots;
 1106  allocated_resources_[static_cast<size_t>(ResourceSubtype::CPU_RESULT_MEM)] -=
 1107  resource_grant.cpu_result_mem;
 1108 
// Decrement per-resource request tracking, symmetric to allocate_resources.
 1110  if (resource_grant.cpu_slots > 0) {
 1112  }
 1113  if (resource_grant.gpu_slots > 0) {
 1115  }
 1116  if (resource_grant.cpu_result_mem > 0) {
 1118  }
 1119  if (chunk_request_info.device_memory_pool_type == ExecutorDeviceType::CPU) {
 1120  if (resource_grant.buffer_mem_gated_per_slot ||
 1121  (chunk_request_info.num_chunks > 0 && chunk_request_info.total_bytes > 0)) {
 1123  }
 1124  } else if (chunk_request_info.device_memory_pool_type == ExecutorDeviceType::GPU) {
 1125  if (resource_grant.buffer_mem_gated_per_slot ||
 1126  (chunk_request_info.num_chunks > 0 && chunk_request_info.total_bytes > 0)) {
 1128  }
 1129  }
 1130 
 1131  LOG(EXECUTOR) << "ExecutorResourcePool de-allocation: " << outstanding_num_requests_
 1132  << " requests ("
 1134  << " CPU | "
 1136  << " GPU)";
 1137  LOG(EXECUTOR) << "ExecutorResourcePool state: CPU slots: "
 1139  << get_total_resource(ResourceType::CPU_SLOTS) << " | GPU slots: "
 1141  << get_total_resource(ResourceType::GPU_SLOTS) << " CPU result mem: "
 1143  << format_num_bytes(
 1144  << " of "
 1146  remove_chunk_requests_from_allocated_pool(resource_grant, chunk_request_info);
 1147 
 1150  }
 1151 }
1152 
// NOTE(review): this block is heavily truncated in this rendering — its
// signature (line 1153) and most interior lines (1155-1157, 1159, 1165-1179
// fragments, 1181-1186, 1189-1194) are missing, so only the skeleton of the
// checks is visible. From what remains, it appears to cross-check the global
// outstanding_num_requests_ counter against the sum of per-resource
// outstanding request counts, and to assert that when no requests are
// outstanding all tracked counters are zero. Confirm details against the
// repository before relying on this description.
 1154  const size_t sum_resource_requests =
 1158 
// The global request count can never exceed the sum of per-resource counts,
// and "any per-resource requests outstanding" must agree with "any global
// requests outstanding".
 1160  CHECK_LE(outstanding_num_requests_, sum_resource_requests);
 1161  const bool has_outstanding_resource_requests = sum_resource_requests > 0;
 1162  const bool has_outstanding_num_requests_globally = outstanding_num_requests_ > 0;
 1163  CHECK_EQ(has_outstanding_resource_requests, has_outstanding_num_requests_globally);
 1164 
 1169 
 1174 
 1179 
 1180  CHECK_EQ(
 1183 
 1184  CHECK_EQ(
 1187 
// With zero outstanding requests, the visible checks assert zero-valued
// counters (operands truncated in this rendering).
 1188  if (outstanding_num_requests_ == static_cast<size_t>(0)) {
 1190  size_t(0));
 1192  size_t(0));
 1195  }
 1196 }
1197 
1198 } // namespace ExecutorResourceMgr_Namespace
size_t get_total_resource(const ResourceType resource_type) const
A container for various stats about the current state of the ExecutorResourcePool. Note that ExecutorResourcePool does not persist a struct of this type, but rather builds one on the fly when ExecutorResourcePool::get_resource_info() is called.
A container to store requested and minimum necessary resource requests across all resource types cur...
#define CHECK_EQ(x, y)
Definition: Logger.h:301
size_t get_chunk_bytes_not_in_pool(const ChunkRequestInfo &chunk_request_info) const
ResourcePoolInfo get_resource_info() const
Returns a struct detailing the allocated and total available resources of each type tracked in Execut...
std::array< size_t, ResourceSubtypeSize > allocated_resources_
ResourceConcurrencyPolicy
Specifies whether grants for a specified resource can be made concurrently (ALLOW_CONCURRENT_REQEUSTS...
bool can_currently_satisfy_request(const ResourceGrant &min_resource_grant, const ChunkRequestInfo &chunk_request_info) const
std::array< bool, ResourceTypeSize > resource_type_validity_
#define LOG(tag)
Definition: Logger.h:285
size_t get_allocated_resource_of_subtype(const ResourceSubtype resource_subtype) const
std::array< size_t, ResourceTypeSize > total_resources_
ChunkRequestInfo get_requested_chunks_not_in_pool(const ChunkRequestInfo &chunk_request_info) const
ResourceType resource_type
The type of a resource this concurrent resource grant policy pertains to.
void set_resource(const ResourceType resource_type, const size_t resource_quantity)
Sets the quantity of resource_type to resource_quantity. If pool has outstanding requests, will throw. Responsibility of allowing the pool to empty and preventing concurrent requests while this operation is running is left to the caller (in particular, ExecutorResourceMgr::set_resource pauses the process queue, which waits until all executing requests are finished before yielding to the caller, before calling this method).
static constexpr size_t ResourceTypeSize
const ResourceGrantPolicy & get_max_resource_grant_per_request_policy(const ResourceSubtype resource_subtype) const
void deallocate_resources(const ResourceGrant &resource_grant, const ChunkRequestInfo &chunk_request_info)
Deallocates resources granted to a requestor such that they can be used for other requests...
#define UNREACHABLE()
Definition: Logger.h:338
Specifies the minimum and maximum quantity either requested or granted for a request of resource_subty...
#define CHECK_GE(x, y)
Definition: Logger.h:306
ResourceType
Stores the resource type for a ExecutorResourcePool request.
std::vector< ResourceSubtype > map_resource_type_to_resource_subtypes(const ResourceType resource_type)
Returns the 1-or-more ResourceSubtypes associated with a given ResourceType.
bool is_resource_valid(const ResourceType resource_type) const
bool can_currently_satisfy_request_impl(const ResourceGrant &min_resource_grant, const ChunkRequestInfo &chunk_request_info) const
size_t calc_max_resource_grant_for_request(const size_t requested_resource_quantity, const size_t min_requested_resource_quantity, const size_t max_grantable_resource_quantity) const
std::map< ChunkKey, std::pair< size_t, size_t >> BufferPoolChunkMap
std::vector< std::pair< ChunkKey, size_t > > chunks_with_byte_sizes
Specifies the policies for resource grants in the presence of other requests, both under situations o...
void add_chunk_requests_to_allocated_pool(const ResourceGrant &resource_grant, const ChunkRequestInfo &chunk_request_info)
size_t determine_dynamic_single_resource_grant(const size_t min_resource_requested, const size_t max_resource_requested, const size_t resource_allocated, const size_t total_resource, const double max_request_backoff_ratio) const
#define CHECK_GT(x, y)
Definition: Logger.h:305
ResourceSubtype
Stores the resource sub-type for a ExecutorResourcePool request.
bool can_currently_satisfy_chunk_request(const ResourceGrant &min_resource_grant, const ChunkRequestInfo &chunk_request_info) const
size_t get_total_allocated_buffer_pool_mem_for_level(const ExecutorDeviceType memory_pool_type) const
size_t increment_outstanding_per_resource_num_requests(const ResourceType resource_type)
ResourceConcurrencyPolicy concurrency_policy
The grant policy in effect when there are concurrent requests for the resource specified by resource_...
std::pair< ResourceGrant, ResourceGrant > calc_min_max_resource_grants_for_request(const RequestInfo &resource_request) const
Given the provided resource_request, statically calculate the minimum and maximum grantable resources...
std::vector< ResourceRequestGrant > calc_static_resource_grant_ranges_for_request(const std::vector< ResourceRequest > &resource_requests) const
void allocate_resources(const ResourceGrant &resource_grant, const ChunkRequestInfo &chunk_request_info)
Given a resource grant (assumed to be computed in determine_dynamic_resource_grant), actually allocate (reserve) the resources in the pool so other requestors (queries) cannot use those resources until returned to the pool.
Specifies the resources of each type for a given resource grant.
void set_concurrent_resource_grant_policy(const ConcurrentResourceGrantPolicy &concurrent_resource_grant_policy)
Resets the concurrent resource grant policy object, which specifies a ResourceType as well as normal ...
size_t increment_total_per_resource_num_requests(const ResourceType resource_type)
std::pair< size_t, size_t > calc_max_dependent_resource_grant_for_request(const size_t requested_dependent_resource_quantity, const size_t min_requested_dependent_resource_quantity, const size_t max_grantable_dependent_resource_quantity, const size_t min_requested_independent_resource_quantity, const size_t max_grantable_independent_resource_quantity, const size_t dependent_to_independent_resource_ratio) const
bool check_request_against_global_policy(const size_t resource_total, const size_t resource_allocated, const ConcurrentResourceGrantPolicy &concurrent_resource_grant_policy) const
ExecutorResourcePool(const std::vector< std::pair< ResourceType, size_t >> &total_resources, const std::vector< ConcurrentResourceGrantPolicy > &concurrent_resource_grant_policies, const std::vector< ResourceGrantPolicy > &max_per_request_resource_grant_policies)
ResourceConcurrencyPolicy oversubscription_concurrency_policy
The grant policy in effect when there are concurrent requests for the resource specified by resource_...
std::pair< bool, ResourceGrant > determine_dynamic_resource_grant(const ResourceGrant &min_resource_grant, const ResourceGrant &max_resource_grant, const ChunkRequestInfo &chunk_request_info, const double max_request_backoff_ratio) const
Determines the actual resource grant to give a query (which will be somewhere between the provided mi...
std::string format_num_bytes(const size_t bytes)
#define CHECK_LE(x, y)
Definition: Logger.h:304
std::string resource_type_to_string(const ResourceType resource_type)
std::pair< size_t, size_t > calc_min_dependent_resource_grant_for_request(const size_t min_requested_dependent_resource_quantity, const size_t min_requested_independent_resource_quantity, const size_t dependent_to_independent_resource_ratio) const
std::array< ConcurrentResourceGrantPolicy, ResourceTypeSize > concurrent_resource_grant_policies_
ResourceType map_resource_subtype_to_resource_type(const ResourceSubtype resource_subtype)
Returns the ResourceType associated with a given ResourceSubtype
void remove_chunk_requests_from_allocated_pool(const ResourceGrant &resource_grant, const ChunkRequestInfo &chunk_request_info)
void init(const std::vector< std::pair< ResourceType, size_t >> &total_resources, const std::vector< ConcurrentResourceGrantPolicy > &concurrent_resource_grant_policies, const std::vector< ResourceGrantPolicy > &max_per_request_resource_grant_policies)
size_t get_outstanding_per_resource_num_requests(const ResourceType resource_type) const
#define CHECK(condition)
Definition: Logger.h:291
size_t get_max_resource_grant_per_request(const ResourceSubtype resource_subtype) const
bool check_request_against_policy(const size_t resource_request, const size_t resource_total, const size_t resource_allocated, const size_t global_outstanding_requests, const ConcurrentResourceGrantPolicy &concurrent_resource_grant_policy) const
Specifies all DataMgr chunks needed for a query step/request, along with their sizes in bytes...
std::array< size_t, ResourceSubtypeSize > max_resource_grants_per_request_
size_t decrement_outstanding_per_resource_num_requests(const ResourceType resource_type)
void throw_insufficient_resource_error(const ResourceSubtype resource_subtype, const size_t min_resource_requested) const
size_t get_allocated_resource_of_type(const ResourceType resource_type) const
std::array< ResourceGrantPolicy, ResourceSubtypeSize > max_resource_grants_per_request_policies_
void debug_print(Ts &&...print_args)
ConcurrentResourceGrantPolicy get_concurrent_resource_grant_policy(const ResourceType resource_type) const