OmniSciDB  a5dc49c757
ExecutorResourceMgr_Namespace::ExecutorResourceMgr Class Reference

ExecutorResourceMgr is the central manager for resources available to all executors in the system. It manages an ExecutorResourcePool to keep track of available and allocated resources (currently CPU slots/threads, GPUs, CPU result memory, and CPU and GPU buffer pool memory). It also manages a thread queue that keeps requesting threads (from Executor::launchKernelsViaResourceMgr) waiting until it can schedule them. At that point, it gives the calling executor thread an ExecutorResourceHandle detailing the resources granted to the query; when the handle goes out of scope, the granted resources are returned to the ExecutorResourcePool.

#include <ExecutorResourceMgr.h>


Public Member Functions

 ExecutorResourceMgr (const std::vector< std::pair< ResourceType, size_t >> &total_resources, const std::vector< ConcurrentResourceGrantPolicy > &concurrent_resource_grant_policies, const std::vector< ResourceGrantPolicy > &max_per_request_resource_grant_policies, const double max_available_resource_use_ratio)
 The constructor instantiates an ExecutorResourcePool with the provided parameters, and starts the process queue by launching a thread to invoke process_queue_loop.
 
 ~ExecutorResourceMgr ()
 The destructor ensures that the process queue thread (process_queue_thread_) is stopped and that any threads waiting for resources are joined. Currently only called on database shutdown.
 
std::unique_ptr< ExecutorResourceHandle > request_resources_with_timeout (const RequestInfo &request_info, const size_t timeout_in_ms)
 Requests resources from ExecutorResourceMgr; throws if the request takes longer than the time specified by timeout_in_ms.
 
std::unique_ptr< ExecutorResourceHandle > request_resources (const RequestInfo &request_info)
 Requests resources from ExecutorResourceMgr with no timeout (unlike request_resources_with_timeout).
 
void release_resources (const RequestId request_id, const ResourceGrant &resource_grant)
 Instructs ExecutorResourceMgr that the resources held by the requestor with the given request_id can be freed/returned to the ExecutorResourcePool.
 
ExecutorStats get_executor_stats () const
 Returns a copy of the ExecutorStats struct held by ExecutorResourceMgr. Used for testing currently.
 
void print_executor_stats () const
 Prints the ExecutorStats struct. Used for debugging.
 
std::pair< size_t, size_t > get_resource_info (const ResourceType resource_type) const
 Returns the allocated amount and the total amount of the specified resource.
 
ResourcePoolInfo get_resource_info () const
 Returns a struct containing the total and allocated amounts of all resources tracked by ExecutorResourceMgr/ExecutorResourcePool, as well as the number of outstanding requests (in total and by resource type).
 
void set_resource (const ResourceType resource_type, const size_t resoure_quantity)
 Used to change the total amount available of a specified resource after construction of ExecutorResourceMgr.
 
ConcurrentResourceGrantPolicy get_concurrent_resource_grant_policy (const ResourceType resource_type) const
 Get the concurrent resource grant policy for a given resource type.
 
void set_concurrent_resource_grant_policy (const ConcurrentResourceGrantPolicy &concurrent_resource_grant_policy)
 Set the concurrent resource grant policy for a given resource type (stored in ConcurrentResourceGrantPolicy).
 
void pause_process_queue ()
 Pauses the process queue in a thread-safe manner, waiting for all queries in the executing stage to finish before yielding to the caller (ensuring that the ExecutorResourcePool has no outstanding allocations). If the process queue is already paused, the call is a no-op. This method is used to live-change parameters associated with ExecutorResourcePool.
 
void resume_process_queue ()
 Resumes the process queue in a thread-safe manner. If the process queue is not paused, the call is a no-op.
 

Private Member Functions

void process_queue_loop ()
 Internal method: A thread is assigned to run this function in the constructor of ExecutorResourceMgr, where it loops continuously waiting for changes to the process queue (i.e. the introduction of a new resource request or the finish of an existing request).
 
RequestStats get_request_for_id (const RequestId request_id) const
 Internal method: Returns the RequestStats for a request specified by request_id.
 
void mark_request_error (const RequestId request_id, std::string error_msg)
 
RequestId choose_next_request ()
 Internal method: Invoked from process_queue_loop, chooses the next resource request to grant.
 
RequestId enqueue_request (const RequestInfo &request_info, const size_t timeout_in_ms, const ResourceGrant &min_resource_grant, const ResourceGrant &max_resource_grant)
 Internal method: Invoked from request_resources/request_resources_with_timeout, places the request in the request queue, where it will wait to be granted the resources requested.
 
void mark_request_dequed (const RequestId request_id)
 Internal method: Moves the request from the QUEUED stage to EXECUTING stage and performs other bookkeeping.
 
void mark_request_timed_out (const RequestId request_id)
 Internal method: Called if the request times out (i.e. request was made via request_resources_with_timeout), moves request out of QUEUED stage and does other bookkeeping on the request's RequestStats and on ExecutorStats.
 
void mark_request_finished (const RequestId request_id)
 Internal method: Invoked on successful completion of a query step from release_resources method, removes request from EXECUTING stage and performs various bookkeeping, including recording execution and final times in request_stats.
 
void set_process_queue_flag ()
 Internal method: Set the should_process_queue_ flag to true, signifying that the queue should be processed. Protected by a lock on processor_queue_mutex_.
 
void stop_process_queue_thread ()
 Internal method: Invoked from ExecutorResourceMgr destructor, sets stop_process_queue_thread_ to true (behind a lock on processor_queue_mutex_) and then attempts to join all threads left in the request queue on server shutdown.
 
std::vector< RequestId > get_requests_for_stage (const ExecutionRequestStage request_status) const
 Internal method: Get the request ids for a given stage (QUEUED or EXECUTING).
 
void add_request_to_stage (const RequestId request_id, const ExecutionRequestStage request_status)
 Internal method: Adds the request specified by the provided request_id to the specified stage.
 
void remove_request_from_stage (const RequestId request_id, const ExecutionRequestStage request_status)
 Internal method: Removes the request specified by the provided request_id from the specified stage.
 
ChunkRequestInfo get_chunk_request_info (const RequestId request_id)
 Get the DataMgr chunk ids and associated sizes pertaining to the input data needed by a request.
 

Private Attributes

ExecutorResourcePool executor_resource_pool_
 Keeps track of available resources for execution.
 
std::atomic< size_t > requests_count_ {0}
 An atomic that is incremented with each incoming request, and used to assign RequestIds to incoming requests.
 
ExecutorStats executor_stats_
 Holds a single ExecutorStats struct that pertains to cumulative stats for ExecutorResourceMgr, i.e. number of requests, queue length, total execution time, etc.
 
const size_t ACTUALLY_QUEUED_MIN_MS {2}
 
std::thread process_queue_thread_
 The thread started in the ExecutorResourceMgr constructor that continuously loops inside of process_queue_loop to determine the next resource request that should be granted.
 
std::mutex processor_queue_mutex_
 Mutex that protects access to stop_process_queue_thread_ and pause_process_queue_.
 
std::mutex pause_processor_queue_mutex_
 
std::mutex print_mutex_
 
std::shared_mutex queue_stats_mutex_
 RW mutex that protects access to executor_stats_ and requests_stats_.
 
std::shared_mutex queued_set_mutex_
 RW mutex that protects access to queued_requests_.
 
std::shared_mutex executing_set_mutex_
 RW mutex that protects access to executing_requests_.
 
std::condition_variable processor_queue_condition_
 
std::condition_variable pause_processor_queue_condition_
 
bool should_process_queue_ {false}
 
bool stop_process_queue_thread_ {false}
 
bool pause_process_queue_ {false}
 
bool process_queue_is_paused_ {false}
 
size_t process_queue_counter_ {0}
 
OutstandingQueueRequests outstanding_queue_requests_
 Stores and manages a map of request ids to BinarySemaphore objects, so that threads waiting for resources can be selectively queued/blocked and then woken when they are chosen for resource grants/execution.
 
std::set< RequestId > queued_requests_
 Set of all request ids that are currently queued. Protected by queued_set_mutex_.
 
std::set< RequestId > executing_requests_
 Set of all request ids that are currently executing (i.e. post-granting of resources). Protected by executing_set_mutex_.
 
std::vector< RequestStats > requests_stats_
 Stores a vector of all requests that have been seen by ExecutorResourceMgr, with each incoming request appending a RequestStats struct to this vector. Protected by queue_stats_mutex_.
 
const bool enable_stats_printing_ {false}
 
const bool enable_debug_printing_ {false}
 
const RequestId INVALID_REQUEST_ID {std::numeric_limits<size_t>::max()}
 
const double max_available_resource_use_ratio_
 

Detailed Description

ExecutorResourceMgr is the central manager for resources available to all executors in the system. It manages an ExecutorResourcePool to keep track of available and allocated resources (currently CPU slots/threads, GPUs, CPU result memory, and CPU and GPU buffer pool memory). It also manages a thread queue that keeps requesting threads (from Executor::launchKernelsViaResourceMgr) waiting until it can schedule them. At that point, it gives the calling executor thread an ExecutorResourceHandle detailing the resources granted to the query; when the handle goes out of scope, the granted resources are returned to the ExecutorResourcePool.

Definition at line 137 of file ExecutorResourceMgr.h.
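
The scope-based return of resources described above is the usual RAII idiom. The following is a minimal sketch of that idea only, not the OmniSciDB implementation: ToyPool, ToyHandle and toy_request are hypothetical names, the pool tracks a single resource type, and an unsatisfiable request returns nullptr here rather than waiting in a queue as the real manager does.

```cpp
#include <cstddef>
#include <memory>
#include <mutex>

// Hypothetical stand-in for ExecutorResourcePool: tracks one resource type only.
class ToyPool {
 public:
  explicit ToyPool(std::size_t total) : total_(total) {}

  // Reserves `amount` and returns true if enough of the resource is free.
  bool try_allocate(std::size_t amount) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (allocated_ + amount > total_) {
      return false;
    }
    allocated_ += amount;
    return true;
  }

  void release(std::size_t amount) {
    std::lock_guard<std::mutex> lock(mutex_);
    allocated_ -= amount;
  }

  std::size_t allocated() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return allocated_;
  }

 private:
  mutable std::mutex mutex_;
  const std::size_t total_;
  std::size_t allocated_{0};
};

// Hypothetical stand-in for ExecutorResourceHandle: its destructor gives the
// grant back to the pool, so callers cannot forget to release it.
class ToyHandle {
 public:
  ToyHandle(ToyPool& pool, std::size_t amount) : pool_(pool), amount_(amount) {}
  ~ToyHandle() { pool_.release(amount_); }
  ToyHandle(const ToyHandle&) = delete;
  ToyHandle& operator=(const ToyHandle&) = delete;

 private:
  ToyPool& pool_;
  const std::size_t amount_;
};

// Returns a handle on success, or nullptr if the pool cannot satisfy the request.
std::unique_ptr<ToyHandle> toy_request(ToyPool& pool, std::size_t amount) {
  if (!pool.try_allocate(amount)) {
    return nullptr;
  }
  return std::make_unique<ToyHandle>(pool, amount);
}
```

Holding the handle in a std::unique_ptr, as the request_resources signatures do, ties the lifetime of the grant to a single owner: when that owner leaves scope, the destructor runs and the pool's allocated count drops again.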

Constructor & Destructor Documentation

ExecutorResourceMgr_Namespace::ExecutorResourceMgr::ExecutorResourceMgr ( const std::vector< std::pair< ResourceType, size_t >> &  total_resources,
const std::vector< ConcurrentResourceGrantPolicy > &  concurrent_resource_grant_policies,
const std::vector< ResourceGrantPolicy > &  max_per_request_resource_grant_policies,
const double  max_available_resource_use_ratio 
)

The constructor instantiates an ExecutorResourcePool with the provided parameters, and starts the process queue by launching a thread to invoke process_queue_loop.

Definition at line 25 of file ExecutorResourceMgr.cpp.

References CHECK_GT, CHECK_LE, logger::EXECUTOR, LOG, max_available_resource_use_ratio_, process_queue_loop(), and process_queue_thread_.

30  : executor_resource_pool_(total_resources,
31  concurrent_resource_grant_policies,
32  max_per_request_resource_grant_policies)
33  , max_available_resource_use_ratio_(max_available_resource_use_ratio) {
37  LOG(EXECUTOR) << "Executor Resource Manager queue proccessing thread started";
38 }

ExecutorResourceMgr_Namespace::ExecutorResourceMgr::~ExecutorResourceMgr ( )

The destructor ensures that the process queue thread (process_queue_thread_) is stopped and that any threads waiting for resources are joined. Currently only called on database shutdown.

Definition at line 40 of file ExecutorResourceMgr.cpp.

References stop_process_queue_thread().

40  {
42 }
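
The constructor/destructor pairing documented here (start a worker thread, later flag it to stop and join it) can be sketched as follows. This is an illustration under stated assumptions rather than the actual code: ToyQueueOwner and all of its members are hypothetical, and the loop body only counts wake-ups where the real process_queue_loop would choose and grant requests.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>

// Hypothetical miniature of the start/stop pattern; not the OmniSciDB class.
class ToyQueueOwner {
 public:
  // The thread is started in the constructor body, after all other members
  // (mutex, condition variable, flags) have been initialized.
  ToyQueueOwner() { worker_ = std::thread(&ToyQueueOwner::loop, this); }

  // Sets the stop flag under the mutex, wakes the worker, and joins it.
  ~ToyQueueOwner() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      stop_ = true;
    }
    cv_.notify_one();
    worker_.join();
  }

  // Signals the worker that there is something to process.
  void notify_work() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      should_process_ = true;
    }
    cv_.notify_one();
  }

  std::size_t iterations() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return iterations_;
  }

 private:
  void loop() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (true) {
      // The predicate form of wait() guards against spurious wake-ups.
      cv_.wait(lock, [this] { return should_process_ || stop_; });
      if (stop_) {
        return;
      }
      should_process_ = false;
      ++iterations_;  // placeholder for real queue processing
    }
  }

  mutable std::mutex mutex_;
  std::condition_variable cv_;
  bool should_process_{false};
  bool stop_{false};
  std::size_t iterations_{0};
  std::thread worker_;
};
```

Setting the flags while holding the mutex and notifying afterwards means the worker cannot miss a wake-up between evaluating its predicate and going to sleep.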

Member Function Documentation

void ExecutorResourceMgr_Namespace::ExecutorResourceMgr::add_request_to_stage ( const RequestId  request_id,
const ExecutionRequestStage  request_status 
)
private

Internal method: Adds the request specified by the provided request_id to the specified stage.

Parameters
request_id - Request id to add to the specified stage
request_status - Stage (QUEUED or EXECUTING) to add the specified request to

Definition at line 576 of file ExecutorResourceMgr.cpp.

References CHECK, executing_requests_, executing_set_mutex_, ExecutorResourceMgr_Namespace::QUEUED, queued_requests_, and queued_set_mutex_.

Referenced by enqueue_request(), and mark_request_dequed().

578  {
579  auto& chosen_set = request_stage == ExecutionRequestStage::QUEUED ? queued_requests_
581  auto& chosen_mutex = request_stage == ExecutionRequestStage::QUEUED
584  std::unique_lock<std::shared_mutex> set_write_lock(chosen_mutex);
585 
586  CHECK(chosen_set.insert(request_id)
587  .second); // Should return true as element should not exist in set
588 }
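
The listing relies on std::set::insert returning a pair whose second member is false when the element already exists. A minimal sketch of the same per-stage set/mutex selection follows; ToyStageTracker, ToyStage and ToyRequestId are hypothetical names, and the sketch returns the boolean to the caller where the listing wraps it in CHECK.

```cpp
#include <cstddef>
#include <mutex>
#include <set>
#include <shared_mutex>

using ToyRequestId = std::size_t;  // hypothetical alias
enum class ToyStage { QUEUED, EXECUTING };

class ToyStageTracker {
 public:
  // Returns false if the id was already present in the chosen stage.
  bool add(ToyRequestId id, ToyStage stage) {
    auto& chosen_set = stage == ToyStage::QUEUED ? queued_ : executing_;
    auto& chosen_mutex =
        stage == ToyStage::QUEUED ? queued_mutex_ : executing_mutex_;
    std::unique_lock<std::shared_mutex> write_lock(chosen_mutex);
    return chosen_set.insert(id).second;
  }

  // Returns false if the id was not present in the chosen stage.
  bool remove(ToyRequestId id, ToyStage stage) {
    auto& chosen_set = stage == ToyStage::QUEUED ? queued_ : executing_;
    auto& chosen_mutex =
        stage == ToyStage::QUEUED ? queued_mutex_ : executing_mutex_;
    std::unique_lock<std::shared_mutex> write_lock(chosen_mutex);
    return chosen_set.erase(id) == 1;
  }

 private:
  std::set<ToyRequestId> queued_;
  std::set<ToyRequestId> executing_;
  std::shared_mutex queued_mutex_;
  std::shared_mutex executing_mutex_;
};
```

Because each stage has its own mutex, a writer updating the queued set does not block readers of the executing set.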

RequestId ExecutorResourceMgr_Namespace::ExecutorResourceMgr::choose_next_request ( )
private

Internal method: Invoked from process_queue_loop, chooses the next resource request to grant.

Currently based on FIFO logic, choosing the oldest request in the queue that we have enough resources in ExecutorResourcePool to fulfill. Future variants could add more sophisticated logic for more optimal query scheduling (i.e. non-FIFO).

Returns
RequestId - The id of the request we will grant resources for.

Definition at line 133 of file ExecutorResourceMgr.cpp.

References ExecutorResourceMgr_Namespace::ExecutorResourcePool::determine_dynamic_resource_grant(), enable_debug_printing_, logger::EXECUTOR, executor_resource_pool_, get_requests_for_stage(), INVALID_REQUEST_ID, LOG, max_available_resource_use_ratio_, print_mutex_, process_queue_counter_, queue_stats_mutex_, ExecutorResourceMgr_Namespace::QUEUED, logger::request_id(), and requests_stats_.

Referenced by process_queue_loop().

133  {
134  const auto request_ids = get_requests_for_stage(ExecutionRequestStage::QUEUED);
135  LOG(EXECUTOR) << "ExecutorResourceMgr Queue Itr: " << process_queue_counter_ - 1
136  << " Queued requests: " << request_ids.size();
137  std::unique_lock<std::shared_mutex> queue_stats_lock(queue_stats_mutex_);
138  for (const auto request_id : request_ids) {
139  auto& request_stats = requests_stats_[request_id];
140  try {
141  const auto actual_resource_grant =
143  request_stats.min_resource_grant,
144  request_stats.max_resource_grant,
145  request_stats.request_info.chunk_request_info,
147  // boolean sentinel first member of returned pair says whether
148  // a resource grant was able to be made at all
149  if (actual_resource_grant.first) {
150  request_stats.actual_resource_grant = actual_resource_grant.second;
151  LOG(EXECUTOR) << "ExecutorResourceMgr Queue chosen request ID: " << request_id
152  << " from " << request_ids.size() << " queued requests.";
153  LOG(EXECUTOR) << "Request grant: " << actual_resource_grant.second.to_string();
155  std::unique_lock<std::mutex> print_lock(print_mutex_);
156  std::cout << std::endl << "Actual grant";
157  actual_resource_grant.second.print();
158  }
159  return request_id;
160  }
161  } catch (std::runtime_error const& e) {
162  throw ExecutorResourceMgrError(request_id, e.what());
163  }
164  }
165  return INVALID_REQUEST_ID;
166 }
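
The FIFO rule described above (take the oldest queued request that the pool can currently satisfy, otherwise report that nothing can be granted) can be sketched in isolation. This is a simplified illustration: ToyQueuedRequest, toy_choose_next and kToyInvalidRequestId are hypothetical, and a single slot count stands in for the multi-resource decision made by determine_dynamic_resource_grant.

```cpp
#include <cstddef>
#include <limits>
#include <vector>

using ToyRequestId = std::size_t;  // hypothetical alias
constexpr ToyRequestId kToyInvalidRequestId =
    std::numeric_limits<std::size_t>::max();

struct ToyQueuedRequest {
  ToyRequestId id;
  std::size_t min_slots;  // smallest grant the request can run with
};

// `queued` is assumed to be ordered oldest-first. Returns the id of the first
// request whose minimum fits into `free_slots`, or the invalid-id sentinel.
ToyRequestId toy_choose_next(const std::vector<ToyQueuedRequest>& queued,
                             std::size_t free_slots) {
  for (const auto& request : queued) {
    if (request.min_slots <= free_slots) {
      return request.id;
    }
  }
  return kToyInvalidRequestId;
}
```

Note that under this rule an older request that does not fit is skipped in favor of a newer one that does, so strict arrival order is only preserved among requests that can actually be served.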

RequestId ExecutorResourceMgr_Namespace::ExecutorResourceMgr::enqueue_request ( const RequestInfo request_info,
const size_t  timeout_in_ms,
const ResourceGrant min_resource_grant,
const ResourceGrant max_resource_grant 
)
private

Internal method: Invoked from request_resources/request_resources_with_timeout, places the request in the request queue, where it will wait to be granted the resources requested.

Note that this method assigns a RequestId to the request, adds the request to the RequestStats, and adds the assigned request_id to the QUEUED stage, but does not actually notify the process queue or queue the request in the OutstandingQueueRequests object. Those tasks are done after this method returns, in the calling method (request_resources_with_timeout).

Parameters
request_info - info for the current resource request
timeout_in_ms - request timeout
min_resource_grant - min allowable resource request, calculated in caller request_resources_with_timeout method from request_info
max_resource_grant - max (ideal) resource request, calculated in caller request_resources_with_timeout method from request_info
Returns
RequestId - generated request id for this request

Definition at line 391 of file ExecutorResourceMgr.cpp.

References add_request_to_stage(), CPU, ExecutorResourceMgr_Namespace::ExecutorStats::cpu_queue_length, ExecutorResourceMgr_Namespace::ExecutorStats::cpu_requests, executor_stats_, GPU, ExecutorResourceMgr_Namespace::ExecutorStats::gpu_queue_length, ExecutorResourceMgr_Namespace::ExecutorStats::gpu_requests, ExecutorResourceMgr_Namespace::ExecutorStats::queue_length, queue_stats_mutex_, ExecutorResourceMgr_Namespace::QUEUED, ExecutorResourceMgr_Namespace::RequestInfo::request_device_type, logger::request_id(), ExecutorResourceMgr_Namespace::ExecutorStats::requests, requests_count_, requests_stats_, ExecutorResourceMgr_Namespace::ExecutorStats::requests_with_timeouts, ExecutorResourceMgr_Namespace::ExecutorStats::sum_cpu_queue_size_at_entry, ExecutorResourceMgr_Namespace::ExecutorStats::sum_gpu_queue_size_at_entry, ExecutorResourceMgr_Namespace::ExecutorStats::sum_queue_size_at_entry, and UNREACHABLE.

Referenced by request_resources_with_timeout().

394  {
395  const std::chrono::steady_clock::time_point enqueue_time =
396  std::chrono::steady_clock::now();
397  std::unique_lock<std::shared_mutex> queue_stats_write_lock(queue_stats_mutex_);
398  const RequestId request_id = requests_count_.fetch_add(1, std::memory_order_relaxed);
400  if (timeout_in_ms > 0) {
402  }
403  const size_t queue_length_at_entry = executor_stats_.queue_length++;
404  executor_stats_.sum_queue_size_at_entry += queue_length_at_entry;
405  size_t device_type_queue_length_at_entry{0};
406  switch (request_info.request_device_type) {
409  device_type_queue_length_at_entry = executor_stats_.cpu_queue_length++;
410  executor_stats_.sum_cpu_queue_size_at_entry += device_type_queue_length_at_entry;
411  break;
412  }
415  device_type_queue_length_at_entry = executor_stats_.gpu_queue_length++;
416  executor_stats_.sum_gpu_queue_size_at_entry += device_type_queue_length_at_entry;
417  break;
418  }
419  default:
420  UNREACHABLE();
421  }
422 
423  requests_stats_.emplace_back(RequestStats(request_id,
424  request_info,
425  min_resource_grant,
426  max_resource_grant,
427  enqueue_time,
428  queue_length_at_entry,
429  device_type_queue_length_at_entry,
430  timeout_in_ms));
432  return request_id;
433 }
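
Two details of the listing are worth isolating: fetch_add returns the value held before the increment, so ids start at 0 and line up with indices into the stats vector, and the post-increment on the queue length records the length seen on entry. The sketch below illustrates both under simplifying assumptions; ToyRequestLog and ToyRequestStats are hypothetical and omit the per-device counters and timestamps.

```cpp
#include <atomic>
#include <cstddef>
#include <mutex>
#include <shared_mutex>
#include <vector>

struct ToyRequestStats {  // hypothetical, heavily reduced
  std::size_t request_id;
  std::size_t queue_length_at_entry;
};

class ToyRequestLog {
 public:
  // Assigns the next id and appends a stats entry at the matching index.
  std::size_t enqueue() {
    std::unique_lock<std::shared_mutex> write_lock(mutex_);
    // fetch_add returns the previous value: the first caller gets 0.
    const std::size_t request_id =
        next_id_.fetch_add(1, std::memory_order_relaxed);
    // Post-increment yields the queue length before this request joined.
    const std::size_t queue_length_at_entry = queue_length_++;
    stats_.push_back(ToyRequestStats{request_id, queue_length_at_entry});
    return request_id;
  }

  // Returns a copy of the stats entry for the given id.
  ToyRequestStats stats_for(std::size_t request_id) const {
    std::shared_lock<std::shared_mutex> read_lock(mutex_);
    return stats_.at(request_id);
  }

 private:
  mutable std::shared_mutex mutex_;
  std::atomic<std::size_t> next_id_{0};
  std::size_t queue_length_{0};
  std::vector<ToyRequestStats> stats_;
};
```

Taking the write lock before drawing the id keeps id assignment and the vector append in the same order, which is what makes indexing the vector by id valid.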

ChunkRequestInfo ExecutorResourceMgr_Namespace::ExecutorResourceMgr::get_chunk_request_info ( const RequestId  request_id)
private

Get the DataMgr chunk ids and associated sizes pertaining to the input data needed by a request.

Parameters
request_id - Request id to fetch the needed chunks for
Returns
ChunkRequestInfo - struct containing vector of ChunkKey and byte sizes as well as device memory space (CPU or GPU), total bytes, etc

Definition at line 604 of file ExecutorResourceMgr.cpp.

References queue_stats_mutex_, logger::request_id(), and requests_stats_.

Referenced by release_resources().

604  {
605  std::shared_lock<std::shared_mutex> queue_stats_read_lock(queue_stats_mutex_);
606  return requests_stats_[request_id].request_info.chunk_request_info;
607 }

ConcurrentResourceGrantPolicy ExecutorResourceMgr_Namespace::ExecutorResourceMgr::get_concurrent_resource_grant_policy ( const ResourceType  resource_type) const

Get the concurrent resource grant policy for a given resource type.

Queries the ExecutorResourcePool for the current concurrency policies (including normal and oversubscribed) for a resource type

Parameters
resource_type - Type of resource to get the concurrency policy for
Returns
ConcurrentResourceGrantPolicy - Specifies the chosen concurrency policy for normal operation and when the specified resource is oversubscribed

Definition at line 293 of file ExecutorResourceMgr.cpp.

References executor_resource_pool_, and ExecutorResourceMgr_Namespace::ExecutorResourcePool::get_concurrent_resource_grant_policy().

294  {
296 }

ExecutorStats ExecutorResourceMgr_Namespace::ExecutorResourceMgr::get_executor_stats ( ) const

Returns a copy of the ExecutorStats struct held by ExecutorResourceMgr. Used for testing currently.

Definition at line 168 of file ExecutorResourceMgr.cpp.

References executor_stats_, and queue_stats_mutex_.

Referenced by print_executor_stats().

168  {
169  std::shared_lock<std::shared_mutex> queue_stats_read_lock(queue_stats_mutex_);
170  return executor_stats_; // Will make copy
171 }

RequestStats ExecutorResourceMgr_Namespace::ExecutorResourceMgr::get_request_for_id ( const RequestId  request_id) const
private

Internal method: Returns the RequestStats for a request specified by request_id.

Takes a read lock on queue_stats_mutex_ for thread safety. Note this method should not be used in internal methods where a lock is already taken on queue_stats_mutex_.

Parameters
request_id - The RequestId for the request we want to retrieve stats for
Returns
RequestStats - The stats for the specified request, including resources granted

Definition at line 120 of file ExecutorResourceMgr.cpp.

References CHECK_LT, queue_stats_mutex_, logger::request_id(), and requests_stats_.

Referenced by process_queue_loop().

120  {
121  std::shared_lock<std::shared_mutex> queue_stats_read_lock(queue_stats_mutex_);
123  return requests_stats_[request_id];
124 }
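
The warning about not calling this method while already holding queue_stats_mutex_ follows from std::shared_mutex being non-recursive: locking it again from a thread that already owns it is undefined behavior. One common way to structure code around that constraint is to pair a locking public accessor with a non-locking private helper. The sketch below shows that pattern only; ToyStatsStore is hypothetical and the source does not state that ExecutorResourceMgr is organized this way.

```cpp
#include <cstddef>
#include <mutex>
#include <shared_mutex>
#include <vector>

class ToyStatsStore {  // hypothetical
 public:
  void add(int value) {
    std::unique_lock<std::shared_mutex> write_lock(mutex_);
    values_.push_back(value);
  }

  // Locking accessor: for callers that hold no lock on mutex_.
  int get(std::size_t index) const {
    std::shared_lock<std::shared_mutex> read_lock(mutex_);
    return get_unlocked(index);
  }

  // Takes the write lock once and uses the non-locking helper inside it.
  // Calling get() here instead would re-lock a mutex this thread already owns.
  int increment(std::size_t index) {
    std::unique_lock<std::shared_mutex> write_lock(mutex_);
    values_.at(index) = get_unlocked(index) + 1;
    return values_.at(index);
  }

 private:
  // Precondition: the caller holds mutex_ in shared or exclusive mode.
  int get_unlocked(std::size_t index) const { return values_.at(index); }

  mutable std::shared_mutex mutex_;
  std::vector<int> values_;
};
```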

std::vector< RequestId > ExecutorResourceMgr_Namespace::ExecutorResourceMgr::get_requests_for_stage ( const ExecutionRequestStage  request_status) const
private

Internal method: Get the request ids for a given stage (QUEUED or EXECUTING)

Invoked from choose_next_request to get all outstanding queued requests

Parameters
request_status - request stage type to fetch request ids for
Returns
std::vector<RequestId> - vector of request ids in the requested stage

Definition at line 562 of file ExecutorResourceMgr.cpp.

References executing_requests_, executing_set_mutex_, ExecutorResourceMgr_Namespace::QUEUED, queued_requests_, and queued_set_mutex_.

Referenced by choose_next_request().

563  {
564  auto& chosen_set = request_stage == ExecutionRequestStage::QUEUED ? queued_requests_
566  auto& chosen_mutex = request_stage == ExecutionRequestStage::QUEUED
569  std::shared_lock<std::shared_mutex> set_read_lock(chosen_mutex);
570 
571  const std::vector<RequestId> request_ids_for_stage(chosen_set.begin(),
572  chosen_set.end());
573  return request_ids_for_stage;
574 }
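
The listing copies the set into a vector while holding a shared lock, so the caller iterates over a snapshot without keeping the set locked. A minimal sketch of that behavior, using a hypothetical ToyQueuedSet, is shown below. Because std::set iterates in ascending key order and request ids are handed out in increasing order, the snapshot comes out oldest-first.

```cpp
#include <cstddef>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <vector>

class ToyQueuedSet {  // hypothetical
 public:
  void add(std::size_t id) {
    std::unique_lock<std::shared_mutex> write_lock(mutex_);
    ids_.insert(id);
  }

  // Copies the current ids under a shared (read) lock; later changes to the
  // set do not affect the returned vector.
  std::vector<std::size_t> snapshot() const {
    std::shared_lock<std::shared_mutex> read_lock(mutex_);
    return std::vector<std::size_t>(ids_.begin(), ids_.end());
  }

 private:
  mutable std::shared_mutex mutex_;
  std::set<std::size_t> ids_;
};
```

The trade-off is that the snapshot can be stale by the time it is used, so any decision based on it still has to tolerate a request having left the stage in the meantime.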

std::pair<size_t, size_t> ExecutorResourceMgr_Namespace::ExecutorResourceMgr::get_resource_info ( const ResourceType  resource_type) const
inline

Returns the allocated amount and the total amount of the specified resource.

Internally queries ExecutorResourcePool

Returns
std::pair<size_t, size_t> - First member is the allocated amount of the resource, the second member is the total amount of the resource (including allocated and available)

Definition at line 224 of file ExecutorResourceMgr.h.

References executor_resource_pool_, and ExecutorResourceMgr_Namespace::ExecutorResourcePool::get_resource_info().

224  {
225  return executor_resource_pool_.get_resource_info(resource_type);
226  }
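
Since the returned pair is ordered (allocated, total), the amount still free has to be derived by the caller. A small sketch of doing so follows; toy_available is a hypothetical helper, not part of ExecutorResourceMgr, and clamping at zero is an assumption made here in case the allocated amount can exceed the total under an oversubscription policy.

```cpp
#include <cstddef>
#include <utility>

// Hypothetical helper: computes the free amount from an (allocated, total) pair.
std::size_t toy_available(
    const std::pair<std::size_t, std::size_t>& resource_info) {
  const auto [allocated, total] = resource_info;
  // Clamp at zero so the unsigned subtraction can never wrap around.
  return total >= allocated ? total - allocated : 0;
}
```

With a real manager instance the argument would be the result of get_resource_info(resource_type); naming the two members through a structured binding avoids mixing up first and second.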

ResourcePoolInfo ExecutorResourceMgr_Namespace::ExecutorResourceMgr::get_resource_info ( ) const
inline

Returns a struct containing the total and allocated amounts of all resources tracked by ExecutorResourceMgr/ExecutorResourcePool, as well as the number of outstanding requests (in total and by resource type)

Returns
ResourcePoolInfo - Defined in ExecutorResourcePool.h, contains total and allocated amounts for all resources, and total requests outstanding and per type of resource

Definition at line 237 of file ExecutorResourceMgr.h.

References executor_resource_pool_, and ExecutorResourceMgr_Namespace::ExecutorResourcePool::get_resource_info().

Referenced by set_resource().

237  {
239  }

void ExecutorResourceMgr_Namespace::ExecutorResourceMgr::mark_request_dequed ( const RequestId  request_id)
private

Internal method: Moves the request from the QUEUED stage to EXECUTING stage and performs other bookkeeping.

Invoked by process_queue_loop after determining the next resource request to serve (via choose_next_request).

Parameters
request_id - RequestId of the request being moved out of the request queue (to be executed)

Definition at line 435 of file ExecutorResourceMgr.cpp.

References ACTUALLY_QUEUED_MIN_MS, add_request_to_stage(), CHECK_LT, CPU, ExecutorResourceMgr_Namespace::ExecutorStats::cpu_queue_length, ExecutorResourceMgr_Namespace::ExecutorStats::cpu_requests_actually_queued, ExecutorResourceMgr_Namespace::ExecutorStats::cpu_requests_executing, ExecutorResourceMgr_Namespace::RequestStats::deque_time, ExecutorResourceMgr_Namespace::RequestStats::enqueue_time, ExecutorResourceMgr_Namespace::EXECUTING, executor_stats_, ExecutorResourceMgr_Namespace::RequestStats::finished_queueing, GPU, ExecutorResourceMgr_Namespace::ExecutorStats::gpu_queue_length, ExecutorResourceMgr_Namespace::ExecutorStats::gpu_requests_actually_queued, ExecutorResourceMgr_Namespace::ExecutorStats::gpu_requests_executing, ExecutorResourceMgr_Namespace::ExecutorStats::queue_length, queue_stats_mutex_, ExecutorResourceMgr_Namespace::RequestStats::queue_time_ms, ExecutorResourceMgr_Namespace::QUEUED, remove_request_from_stage(), ExecutorResourceMgr_Namespace::RequestInfo::request_device_type, logger::request_id(), ExecutorResourceMgr_Namespace::RequestStats::request_info, ExecutorResourceMgr_Namespace::ExecutorStats::requests_actually_queued, requests_count_, ExecutorResourceMgr_Namespace::ExecutorStats::requests_executing, requests_stats_, ExecutorResourceMgr_Namespace::ExecutorStats::total_cpu_queue_time_ms, ExecutorResourceMgr_Namespace::ExecutorStats::total_gpu_queue_time_ms, ExecutorResourceMgr_Namespace::ExecutorStats::total_queue_time_ms, and UNREACHABLE.

Referenced by process_queue_loop().

435  {
436  const std::chrono::steady_clock::time_point deque_time =
437  std::chrono::steady_clock::now();
438  // Below is only to CHECK our request_id against high water mark... should be
439  // relatively inexpensive though
440  const size_t current_request_count = requests_count_.load(std::memory_order_relaxed);
441  CHECK_LT(request_id, current_request_count);
442  {
443  std::unique_lock<std::shared_mutex> queue_stats_write_lock(queue_stats_mutex_);
444  RequestStats& request_stats = requests_stats_[request_id];
445  request_stats.deque_time = deque_time;
446  request_stats.finished_queueing = true;
447  request_stats.queue_time_ms =
448  std::chrono::duration_cast<std::chrono::milliseconds>(request_stats.deque_time -
449  request_stats.enqueue_time)
450  .count();
451  }
454 
455  std::shared_lock<std::shared_mutex> queue_stats_read_lock(queue_stats_mutex_);
456  const RequestStats& request_stats = requests_stats_[request_id];
459  if (request_stats.queue_time_ms <= ACTUALLY_QUEUED_MIN_MS) {
460  executor_stats_.total_queue_time_ms += request_stats.queue_time_ms;
462  }
463  switch (request_stats.request_info.request_device_type) {
467  if (request_stats.queue_time_ms <= ACTUALLY_QUEUED_MIN_MS) {
468  executor_stats_.total_cpu_queue_time_ms += request_stats.queue_time_ms;
470  }
471  break;
475  if (request_stats.queue_time_ms <= ACTUALLY_QUEUED_MIN_MS) {
476  executor_stats_.total_gpu_queue_time_ms += request_stats.queue_time_ms;
478  }
479  break;
480  default:
481  UNREACHABLE();
482  }
483 }
void remove_request_from_stage(const RequestId request_id, const ExecutionRequestStage request_status)
Internal method: Removes the request specified by the provided request_id from the specified stage...
void add_request_to_stage(const RequestId request_id, const ExecutionRequestStage request_status)
Internal method: Adds the request specified by the provided request_id to the specified stage...
ExecutorStats executor_stats_
Holds a single ExecutorStats struct that pertains to cumulative stats for ExecutorResourceMgr, i.e. number of requests, queue length, total execution time, etc.
#define UNREACHABLE()
Definition: Logger.h:338
std::shared_mutex queue_stats_mutex_
RW mutex that protects access to executor_stats_ and requests_stats_
std::atomic< size_t > requests_count_
An atomic that is incremented with each incoming request, and used to assign RequestIds to incoming r...
#define CHECK_LT(x, y)
Definition: Logger.h:303
RequestId request_id()
Definition: Logger.cpp:876
std::vector< RequestStats > requests_stats_
Stores a vector of all requests that have been seen by ExecutorResourceMgr, with each incoming reques...


void ExecutorResourceMgr_Namespace::ExecutorResourceMgr::mark_request_error ( const RequestId  request_id,
std::string  error_msg 
)
private

Definition at line 126 of file ExecutorResourceMgr.cpp.

References CHECK_LT, queue_stats_mutex_, logger::request_id(), and requests_stats_.

Referenced by process_queue_loop().

127  {
128  std::unique_lock<std::shared_mutex> queue_stats_write_lock(queue_stats_mutex_);
130  requests_stats_[request_id].error = std::move(error_msg);
131 }


void ExecutorResourceMgr_Namespace::ExecutorResourceMgr::mark_request_finished ( const RequestId  request_id)
private

Internal method: Invoked from release_resources on successful completion of a query step. Removes the request from the EXECUTING stage and performs bookkeeping, including recording the execution time and total time in the request's RequestStats.

Parameters
request_id- RequestId for the resource request that has finished executing
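
The per-device roll-up performed here can be sketched as follows. This is a simplified illustration under stated assumptions: `DeviceSketch`, `TotalsSketch`, `elapsed_ms`, and `accumulate_execution_time` are hypothetical names, only the execution-time totals are modelled, and locking is omitted.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <stdexcept>

enum class DeviceSketch { CPU, GPU };

// Hypothetical subset of the cumulative counters kept in ExecutorStats.
struct TotalsSketch {
  std::size_t total_execution_time_ms{0};
  std::size_t total_cpu_execution_time_ms{0};
  std::size_t total_gpu_execution_time_ms{0};
};

using TimePoint = std::chrono::steady_clock::time_point;

// Whole milliseconds elapsed between two monotonic time points.
inline std::size_t elapsed_ms(const TimePoint start, const TimePoint end) {
  return static_cast<std::size_t>(
      std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count());
}

// Adds one request's execution time (dequeue to finish) to the overall total
// and to the total for the device type the request ran on.
inline void accumulate_execution_time(TotalsSketch& totals,
                                      const DeviceSketch device,
                                      const TimePoint deque_time,
                                      const TimePoint finished_time) {
  const std::size_t execution_time_ms = elapsed_ms(deque_time, finished_time);
  totals.total_execution_time_ms += execution_time_ms;
  switch (device) {
    case DeviceSketch::CPU:
      totals.total_cpu_execution_time_ms += execution_time_ms;
      break;
    case DeviceSketch::GPU:
      totals.total_gpu_execution_time_ms += execution_time_ms;
      break;
    default:
      throw std::logic_error("unreachable device type");
  }
}
```

The total time recorded by the real method is measured the same way, but from the enqueue time rather than the dequeue time.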

Definition at line 517 of file ExecutorResourceMgr.cpp.

References CHECK_LT, CPU, ExecutorResourceMgr_Namespace::ExecutorStats::cpu_requests_executed, ExecutorResourceMgr_Namespace::ExecutorStats::cpu_requests_executing, ExecutorResourceMgr_Namespace::RequestStats::deque_time, ExecutorResourceMgr_Namespace::RequestStats::enqueue_time, ExecutorResourceMgr_Namespace::EXECUTING, ExecutorResourceMgr_Namespace::RequestStats::execution_finished_time, ExecutorResourceMgr_Namespace::RequestStats::execution_time_ms, executor_stats_, ExecutorResourceMgr_Namespace::RequestStats::finished_executing, GPU, ExecutorResourceMgr_Namespace::ExecutorStats::gpu_requests_executed, ExecutorResourceMgr_Namespace::ExecutorStats::gpu_requests_executing, queue_stats_mutex_, remove_request_from_stage(), ExecutorResourceMgr_Namespace::RequestInfo::request_device_type, logger::request_id(), ExecutorResourceMgr_Namespace::RequestStats::request_info, requests_count_, ExecutorResourceMgr_Namespace::ExecutorStats::requests_executed, ExecutorResourceMgr_Namespace::ExecutorStats::requests_executing, requests_stats_, ExecutorResourceMgr_Namespace::ExecutorStats::total_cpu_execution_time_ms, ExecutorResourceMgr_Namespace::ExecutorStats::total_cpu_time_ms, ExecutorResourceMgr_Namespace::ExecutorStats::total_execution_time_ms, ExecutorResourceMgr_Namespace::ExecutorStats::total_gpu_execution_time_ms, ExecutorResourceMgr_Namespace::ExecutorStats::total_gpu_time_ms, ExecutorResourceMgr_Namespace::ExecutorStats::total_time_ms, ExecutorResourceMgr_Namespace::RequestStats::total_time_ms, and UNREACHABLE.

Referenced by release_resources().

517  {
518  const std::chrono::steady_clock::time_point execution_finished_time =
519  std::chrono::steady_clock::now();
520  // Below is only to CHECK our request_id against high water mark... should be
521  // relatively inexpensive though
522  const size_t current_request_count = requests_count_.load(std::memory_order_relaxed);
523  CHECK_LT(request_id, current_request_count);
524  std::unique_lock<std::shared_mutex> queue_stats_write_lock(queue_stats_mutex_);
525  RequestStats& request_stats = requests_stats_[request_id];
526  request_stats.execution_finished_time = execution_finished_time;
527  request_stats.finished_executing = true;
528  request_stats.execution_time_ms =
529  std::chrono::duration_cast<std::chrono::milliseconds>(
530  request_stats.execution_finished_time - request_stats.deque_time)
531  .count();
532  request_stats.total_time_ms =
533  std::chrono::duration_cast<std::chrono::milliseconds>(
534  request_stats.execution_finished_time - request_stats.enqueue_time)
535  .count();
537 
540  executor_stats_.total_execution_time_ms += request_stats.execution_time_ms;
541  executor_stats_.total_time_ms += request_stats.total_time_ms;
542  switch (request_stats.request_info.request_device_type) {
546  executor_stats_.total_cpu_execution_time_ms += request_stats.execution_time_ms;
547  executor_stats_.total_cpu_time_ms += request_stats.total_time_ms;
548  break;
549  }
553  executor_stats_.total_gpu_execution_time_ms += request_stats.execution_time_ms;
554  executor_stats_.total_gpu_time_ms += request_stats.total_time_ms;
555  break;
556  }
557  default:
558  UNREACHABLE();
559  }
560 }


void ExecutorResourceMgr_Namespace::ExecutorResourceMgr::mark_request_timed_out ( const RequestId  request_id)
private

Internal method: Called when a request made via request_resources_with_timeout times out. Moves the request out of the QUEUED stage and updates the request's RequestStats and the manager's ExecutorStats accordingly.

Invoked from request_resources_with_timeout if a QueryTimedOutWaitingInQueue exception is thrown.

Parameters
request_id- RequestId for the resource request that has timed out

Definition at line 485 of file ExecutorResourceMgr.cpp.

References CHECK, CHECK_GT, CHECK_LT, CPU, ExecutorResourceMgr_Namespace::ExecutorStats::cpu_queue_length, executor_stats_, ExecutorResourceMgr_Namespace::RequestStats::finished_queueing, GPU, ExecutorResourceMgr_Namespace::ExecutorStats::gpu_queue_length, ExecutorResourceMgr_Namespace::ExecutorStats::queue_length, queue_stats_mutex_, ExecutorResourceMgr_Namespace::QUEUED, remove_request_from_stage(), ExecutorResourceMgr_Namespace::RequestInfo::request_device_type, logger::request_id(), ExecutorResourceMgr_Namespace::RequestStats::request_info, requests_count_, requests_stats_, ExecutorResourceMgr_Namespace::ExecutorStats::requests_timed_out, ExecutorResourceMgr_Namespace::RequestStats::timed_out, ExecutorResourceMgr_Namespace::RequestStats::timeout_in_ms, and UNREACHABLE.

Referenced by request_resources_with_timeout().

485  {
486  const size_t current_request_count = requests_count_.load(std::memory_order_relaxed);
487  CHECK_LT(request_id, current_request_count);
488  {
489  std::unique_lock<std::shared_mutex> queue_stats_write_lock(queue_stats_mutex_);
490  RequestStats& request_stats = requests_stats_[request_id];
491  CHECK(!request_stats.finished_queueing);
492  CHECK_GT(request_stats.timeout_in_ms, size_t(0));
493  request_stats.timed_out = true;
494  }
496  std::shared_lock<std::shared_mutex> queue_stats_read_lock(queue_stats_mutex_);
497  const RequestStats& request_stats = requests_stats_[request_id];
501  switch (request_stats.request_info.request_device_type) {
505  break;
506  }
510  break;
511  }
512  default:
513  UNREACHABLE();
514  }
515 }
void remove_request_from_stage(const RequestId request_id, const ExecutionRequestStage request_status)
Internal method: Removes the request specified by the provided request_id from the specified stage...
ExecutorStats executor_stats_
Holds a single ExecutorStats struct that pertains to cumulative stats for ExecutorResourceMgr, i.e. number of requests, queue length, total execution time, etc.
#define UNREACHABLE()
Definition: Logger.h:338
std::shared_mutex queue_stats_mutex_
RW mutex that protects access to executor_stats_ and requests_stats_
#define CHECK_GT(x, y)
Definition: Logger.h:305
std::atomic< size_t > requests_count_
An atomic that is incremented with each incoming request, and used to assign RequestIds to incoming r...
#define CHECK_LT(x, y)
Definition: Logger.h:303
RequestId request_id()
Definition: Logger.cpp:876
#define CHECK(condition)
Definition: Logger.h:291
std::vector< RequestStats > requests_stats_
Stores a vector of all requests that have been seen by ExecutorResourceMgr, with each incoming reques...


void ExecutorResourceMgr_Namespace::ExecutorResourceMgr::pause_process_queue ( )

Pauses the process queue in a thread-safe manner, waiting for all queries in the executing stage to finish before yielding to the caller (ensuring that the ExecutorResourcePool has no outstanding allocations). If the process queue is already paused, the call is a no-op. This method is used to live-change parameters associated with ExecutorResourcePool.

Note that when the queue is fully paused, there will be no executing requests but there can be one or more queued requests.
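
The pause handshake can be sketched with a worker thread and two condition variables. This is a simplified illustration, not the OmniSciDB implementation: `PausableLoopSketch` and its members are hypothetical, a single mutex guards all flags, and the sketch does not wait for executing requests to finish as the real method does.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

class PausableLoopSketch {
 public:
  PausableLoopSketch() : worker_([this] { loop(); }) {}

  ~PausableLoopSketch() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      stop_ = true;
    }
    wake_worker_.notify_one();
    worker_.join();
  }

  // Requests a pause and blocks until the worker acknowledges it.
  // Returns false (no-op) if a pause is already requested or in effect.
  bool pause() {
    std::unique_lock<std::mutex> lock(mutex_);
    if (pause_requested_) {
      return false;
    }
    pause_requested_ = true;
    wake_worker_.notify_one();
    paused_ack_.wait(lock, [this] { return is_paused_; });
    return true;
  }

  // Clears the pause. Returns false (no-op) if the loop is not fully paused.
  bool resume() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!is_paused_) {
      return false;
    }
    pause_requested_ = false;
    is_paused_ = false;
    wake_worker_.notify_one();
    return true;
  }

  bool is_paused() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return is_paused_;
  }

 private:
  void loop() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (true) {
      // Sleep until asked to stop or until there is a pause to acknowledge.
      wake_worker_.wait(
          lock, [this] { return stop_ || (pause_requested_ && !is_paused_); });
      if (stop_) {
        return;
      }
      is_paused_ = true;  // Acknowledge the pause to the waiting caller.
      paused_ack_.notify_all();
    }
  }

  mutable std::mutex mutex_;
  std::condition_variable wake_worker_;
  std::condition_variable paused_ack_;
  bool stop_{false};
  bool pause_requested_{false};
  bool is_paused_{false};
  std::thread worker_;  // Declared last so the flags exist before it starts.
};
```

Having the caller block until the worker acknowledges the pause is what lets code such as a live parameter change assume the loop is quiescent once `pause()` returns.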

Definition at line 245 of file ExecutorResourceMgr.cpp.

References CHECK_EQ, executor_stats_, logger::INFO, LOG, pause_process_queue_, pause_processor_queue_condition_, pause_processor_queue_mutex_, process_queue_is_paused_, processor_queue_condition_, processor_queue_mutex_, and ExecutorResourceMgr_Namespace::ExecutorStats::requests_executing.

Referenced by set_concurrent_resource_grant_policy(), and set_resource().

245  {
246  {
247  std::unique_lock<std::mutex> queue_lock(processor_queue_mutex_);
248  if (pause_process_queue_ || process_queue_is_paused_) { // Was already true, abort
249  LOG(INFO)
250  << "Pause of ExecutorResourceMgr queue was called, but was already paused. "
251  "Taking no action.";
252  return;
253  }
254  pause_process_queue_ = true;
255  }
256  processor_queue_condition_.notify_one();
257 
258  std::unique_lock<std::mutex> pause_queue_lock(pause_processor_queue_mutex_);
259  pause_processor_queue_condition_.wait(pause_queue_lock,
260  [=] { return process_queue_is_paused_; });
261 
263 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
#define LOG(tag)
Definition: Logger.h:285
ExecutorStats executor_stats_
Holds a single ExecutorStats struct that pertains to cumulative stats for ExecutorResourceMgr, i.e. number of requests, queue length, total execution time, etc.
std::mutex processor_queue_mutex_
Mutex that protects access to stop_process_queue_thread_ and pause_process_queue_ ...


void ExecutorResourceMgr_Namespace::ExecutorResourceMgr::print_executor_stats ( ) const

Prints the ExecutorStats struct. Use for debugging.

Definition at line 173 of file ExecutorResourceMgr.cpp.

References get_executor_stats(), print_mutex_, and process_queue_counter_.

Referenced by process_queue_loop().

173  {
174  // Get atomic copy of executor_stats_ first
175  const auto executor_stats = get_executor_stats();
176  std::unique_lock<std::mutex> print_lock(print_mutex_);
177  std::cout << std::endl << "Executor Stats" << std::endl;
178  std::cout << "Requests: " << executor_stats.requests << std::endl;
179  std::cout << "CPU Requests: " << executor_stats.cpu_requests << std::endl;
180  std::cout << "GPU Requests: " << executor_stats.gpu_requests << std::endl;
181  std::cout << "Queue Length: " << executor_stats.queue_length << std::endl;
182  std::cout << "CPU Queue Length: " << executor_stats.cpu_queue_length << std::endl;
183  std::cout << "GPU Queue Length: " << executor_stats.gpu_queue_length << std::endl;
184  std::cout << "Total Queue Time(ms): " << executor_stats.total_queue_time_ms
185  << std::endl;
186  std::cout << "Total CPU Queue Time(ms): " << executor_stats.total_cpu_queue_time_ms
187  << std::endl;
188  std::cout << "Total GPU Queue Time(ms): " << executor_stats.total_gpu_queue_time_ms
189  << std::endl;
190  std::cout << "Requests Actually Queued: " << executor_stats.requests_actually_queued
191  << std::endl;
192  std::cout << "Requests Executing: " << executor_stats.requests_executing << std::endl;
193  std::cout << "Requests Executed: " << executor_stats.requests_executed << std::endl;
194  std::cout << "Total Execution Time(ms): " << executor_stats.total_execution_time_ms
195  << std::endl;
196  std::cout << "Total CPU Execution Time(ms): "
197  << executor_stats.total_cpu_execution_time_ms << std::endl;
198  std::cout << "Total GPU Execution Time(ms): "
199  << executor_stats.total_gpu_execution_time_ms << std::endl;
200  std::cout << "Total Time(ms): " << executor_stats.total_time_ms << std::endl;
201  std::cout << "Total CPU Time(ms): " << executor_stats.total_cpu_time_ms << std::endl;
202  std::cout << "Total GPU Time(ms): " << executor_stats.total_gpu_time_ms << std::endl;
203 
204  // Below technically not thread safe, but called from process_queue_loop for now so ok
205 
206  const double avg_execution_time_ms =
207  executor_stats.total_execution_time_ms /
208  std::max(executor_stats.requests_executed, size_t(1));
209  const double avg_cpu_execution_time_ms =
210  executor_stats.total_cpu_execution_time_ms /
211  std::max(executor_stats.cpu_requests_executed, size_t(1));
212  const double avg_gpu_execution_time_ms =
213  executor_stats.total_gpu_execution_time_ms /
214  std::max(executor_stats.gpu_requests_executed, size_t(1));
215  const double avg_total_time_ms = executor_stats.total_time_ms /
216  std::max(executor_stats.requests_executed, size_t(1));
217  const double avg_cpu_total_time_ms =
218  executor_stats.total_cpu_time_ms /
219  std::max(executor_stats.cpu_requests_executed, size_t(1));
220  const double avg_gpu_total_time_ms =
221  executor_stats.total_gpu_time_ms /
222  std::max(executor_stats.gpu_requests_executed, size_t(1));
223 
224  std::cout << "Avg Execution Time(ms): " << avg_execution_time_ms << std::endl;
225  std::cout << "Avg CPU Execution Time(ms): " << avg_cpu_execution_time_ms << std::endl;
226  std::cout << "Avg GPU Execution Time(ms): " << avg_gpu_execution_time_ms << std::endl;
227 
228  std::cout << "Avg Total Time(ms): " << avg_total_time_ms << std::endl;
229  std::cout << "Avg CPU Total Time(ms): " << avg_cpu_total_time_ms << std::endl;
230  std::cout << "Avg GPU Total Time(ms): " << avg_gpu_total_time_ms << std::endl;
231 
232  std::cout << "Process queue loop counter: " << process_queue_counter_ << std::endl
233  << std::endl;
234 }
ExecutorStats get_executor_stats() const
Returns a copy of the ExecutorStats struct held by ExecutorResourceMgr. Used for testing currently...
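
The averages in the listing above divide each total by `std::max(count, size_t(1))` so that a zero count cannot cause a division by zero. A minimal sketch of that guard follows; `safe_average_ms` is a hypothetical helper, not part of OmniSciDB. It additionally converts to `double` before dividing, on the assumption (not confirmed on this page) that the totals are integral, in which case dividing first would truncate the result.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Average of an integral total over a count. A zero count is treated as 1,
// so an empty total yields 0.0 rather than a division by zero.
inline double safe_average_ms(const std::size_t total_ms, const std::size_t count) {
  return static_cast<double>(total_ms) /
         static_cast<double>(std::max(count, std::size_t(1)));
}
```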


void ExecutorResourceMgr_Namespace::ExecutorResourceMgr::process_queue_loop ( )
private

Internal method: The constructor of ExecutorResourceMgr assigns a thread to run this function, which loops continuously, waiting for changes to the process queue (i.e. the arrival of a new resource request or the completion of an existing request).
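
The general shape of such a loop can be sketched as a worker thread that sleeps on a condition variable until it is told there may be work or that it should stop. This is a simplified illustration with hypothetical names (`QueueLoopSketch`, `submit`, `wait_for_served`); it serves requests in arrival order and omits resource allocation, pausing, error handling, and stats printing.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

class QueueLoopSketch {
 public:
  QueueLoopSketch() : worker_([this] { loop(); }) {}

  ~QueueLoopSketch() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      stop_ = true;  // Flag checked by the loop so the thread is not left dangling.
    }
    wake_.notify_one();
    worker_.join();
  }

  // Enqueues a request id and signals the loop that the queue changed.
  void submit(const std::size_t request_id) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      pending_.push_back(request_id);
      should_process_ = true;
    }
    wake_.notify_one();
  }

  // Blocks until at least `expected` ids were served; returns them in order.
  std::vector<std::size_t> wait_for_served(const std::size_t expected) {
    std::unique_lock<std::mutex> lock(mutex_);
    served_cv_.wait(lock, [&] { return served_.size() >= expected; });
    return served_;
  }

 private:
  void loop() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (true) {
      wake_.wait(lock, [this] { return should_process_ || stop_; });
      if (stop_) {
        return;
      }
      if (pending_.empty()) {
        should_process_ = false;  // Nothing runnable: sleep until the next change.
        continue;
      }
      served_.push_back(pending_.front());
      pending_.pop_front();
      served_cv_.notify_all();
      // should_process_ stays true so the next iteration tries another request.
    }
  }

  std::mutex mutex_;
  std::condition_variable wake_;
  std::condition_variable served_cv_;
  std::deque<std::size_t> pending_;
  std::vector<std::size_t> served_;
  bool should_process_{false};
  bool stop_{false};
  std::thread worker_;  // Declared last so all state exists before it starts.
};
```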

Definition at line 313 of file ExecutorResourceMgr.cpp.

References ExecutorResourceMgr_Namespace::ExecutorResourcePool::allocate_resources(), choose_next_request(), enable_debug_printing_, enable_stats_printing_, executor_resource_pool_, executor_stats_, get_request_for_id(), ExecutorResourceMgr_Namespace::ExecutorResourceMgrError::getErrorMsg(), ExecutorResourceMgr_Namespace::ExecutorResourceMgrError::getRequestId(), INVALID_REQUEST_ID, mark_request_dequed(), mark_request_error(), outstanding_queue_requests_, pause_process_queue_, pause_processor_queue_condition_, pause_processor_queue_mutex_, print_executor_stats(), print_mutex_, process_queue_counter_, process_queue_is_paused_, processor_queue_condition_, processor_queue_mutex_, ExecutorResourceMgr_Namespace::ExecutorStats::requests_executing, should_process_queue_, stop_process_queue_thread_, and ExecutorResourceMgr_Namespace::OutstandingQueueRequests::wake_request_by_id().

Referenced by ExecutorResourceMgr().

313  {
314  const size_t min_ms_between_print_stats{5000}; // 5 sec
317  }
318  std::chrono::steady_clock::time_point last_print_time =
319  std::chrono::steady_clock::now();
320  while (true) {
321  std::unique_lock<std::mutex> queue_lock(processor_queue_mutex_);
322  processor_queue_condition_.wait(queue_lock, [=] {
324  });
325  // Use the following flag to know when to exit
326  // (to prevent leaving this thread dangling at server shutdown)
329  false; // not strictly necessary, but would be if we add threads
330  return;
331  }
332 
333  if (pause_process_queue_) {
334  should_process_queue_ = false;
336  {
337  std::unique_lock<std::mutex> pause_queue_lock(pause_processor_queue_mutex_);
339  }
341  }
342  continue;
343  }
344 
346  RequestId chosen_request_id;
347  try {
348  chosen_request_id = choose_next_request();
349  } catch (ExecutorResourceMgrError const& e) {
350  chosen_request_id = e.getRequestId();
351  mark_request_error(chosen_request_id, e.getErrorMsg());
352  }
354  std::unique_lock<std::mutex> print_lock(print_mutex_);
355  std::cout << "Process loop iteration: " << process_queue_counter_ - 1 << std::endl;
356  std::cout << "Process loop chosen request_id: " << chosen_request_id << std::endl;
357  }
358  if (chosen_request_id == INVALID_REQUEST_ID) {
359  // Means no query was found that could be currently run
360  // Below is safe as we hold an exclusive lock on processor_queue_mutex_
361  should_process_queue_ = false;
362  continue;
363  }
364  // If here we have a valid request id
365  mark_request_dequed(chosen_request_id);
366  const auto request_stats = get_request_for_id(chosen_request_id);
367  if (!request_stats.error) {
369  request_stats.actual_resource_grant,
370  request_stats.request_info.chunk_request_info);
371  }
373 
375  std::chrono::steady_clock::time_point current_time =
376  std::chrono::steady_clock::now();
377  const size_t ms_since_last_print_stats =
378  std::chrono::duration_cast<std::chrono::milliseconds>(current_time -
379  last_print_time)
380  .count();
381  if (ms_since_last_print_stats >= min_ms_between_print_stats) {
383  last_print_time = current_time;
384  }
385  }
386  // Leave should_process_queue_ as true to see if we can allocate resources for another
387  // request
388  }
389 }
ExecutorStats executor_stats_
Holds a single ExecutorStats struct that pertains to cumulative stats for ExecutorResourceMgr, i.e. number of requests, queue length, total execution time, etc.
void allocate_resources(const ResourceGrant &resource_grant, const ChunkRequestInfo &chunk_request_info)
Given a resource grant (assumed to be computed in determine_dynamic_resource_grant), actually allocate (reserve) the resources in the pool so other requestors (queries) cannot use those resources until returned to the pool.
void mark_request_error(const RequestId request_id, std::string error_msg)
std::mutex processor_queue_mutex_
Mutex that protects access to stop_process_queue_thread_ and pause_process_queue_ ...
ExecutorResourcePool executor_resource_pool_
Keeps track of available resources for execution.
void mark_request_dequed(const RequestId request_id)
Internal method: Moves the request from the QUEUED stage to EXECUTING stage and performs other bookke...
RequestId choose_next_request()
Internal method: Invoked from process_queue_loop, chooses the next resource request to grant...
OutstandingQueueRequests outstanding_queue_requests_
Stores and manages a map of request ids to BinarySemaphore objects to allow threads waiting for resou...
void wake_request_by_id(const RequestId request_id)
Wakes a waiting thread in the queue. Invoked by ExecutorResourceMgr::process_queue_loop() ...
void print_executor_stats() const
Prints the ExecutorStats struct. Use for debugging.
RequestStats get_request_for_id(const RequestId request_id) const
Internal method: Returns the RequestStats for a request specified by request_id.


void ExecutorResourceMgr_Namespace::ExecutorResourceMgr::release_resources ( const RequestId  request_id,
const ResourceGrant resource_grant 
)

Instructs ExecutorResourceMgr that the resources held by the requestor with the given request_id can be freed/returned to the ExecutorResourcePool.

This method is called only from the destructor of ExecutorResourceHandle, i.e. automatically when the handle goes out of scope along with the parent executor thread.

Parameters
request_id- RequestId for the query step that requested the resources
resource_grant- The resources that were granted from ExecutorResourcePool and that now will be freed/returned to the pool
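
The handle-based release pattern can be sketched as follows. This is a minimal illustration with hypothetical names (`PoolSketch`, `Handle`, `acquire`); it tracks a single slot count, is not thread-safe, and returns `nullptr` instead of queueing when the pool is exhausted.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>

class PoolSketch : public std::enable_shared_from_this<PoolSketch> {
 public:
  // RAII handle: returns its slots to the pool when it is destroyed.
  class Handle {
   public:
    Handle(std::shared_ptr<PoolSketch> pool, const std::size_t slots)
        : pool_(std::move(pool)), slots_(slots) {}
    ~Handle() { pool_->release(slots_); }
    Handle(const Handle&) = delete;
    Handle& operator=(const Handle&) = delete;

   private:
    std::shared_ptr<PoolSketch> pool_;  // Keeps the pool alive while held.
    std::size_t slots_;
  };

  explicit PoolSketch(const std::size_t total_slots) : available_(total_slots) {}

  // Grants `slots` if available; the pool must be owned by a std::shared_ptr.
  std::unique_ptr<Handle> acquire(const std::size_t slots) {
    if (slots > available_) {
      return nullptr;
    }
    available_ -= slots;
    return std::make_unique<Handle>(shared_from_this(), slots);
  }

  std::size_t available() const { return available_; }

 private:
  void release(const std::size_t slots) { available_ += slots; }

  std::size_t available_;
};
```

Tying the release to the handle's destructor means the resources are returned on every exit path, including when an exception unwinds the executor thread's stack.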

Definition at line 108 of file ExecutorResourceMgr.cpp.

References ExecutorResourceMgr_Namespace::ExecutorResourcePool::deallocate_resources(), executor_resource_pool_, get_chunk_request_info(), ExecutorResourceMgr_Namespace::ResourceGrant::is_empty(), mark_request_finished(), processor_queue_condition_, and set_process_queue_flag().

109  {
110  if (!resource_grant.is_empty()) { // Should only be empty if request times out, should
111  // we CHECK for this
112  const auto chunk_request_info = get_chunk_request_info(request_id);
113  executor_resource_pool_.deallocate_resources(resource_grant, chunk_request_info);
114  }
117  processor_queue_condition_.notify_one();
118 }
ChunkRequestInfo get_chunk_request_info(const RequestId request_id)
Get the DataMgr chunk ids and associated sizes pertaining to the input data needed by a request...
void deallocate_resources(const ResourceGrant &resource_grant, const ChunkRequestInfo &chunk_request_info)
Deallocates resources granted to a requestor such that they can be used for other requests...
ExecutorResourcePool executor_resource_pool_
Keeps track of available resources for execution.
RequestId request_id()
Definition: Logger.cpp:876
void mark_request_finished(const RequestId request_id)
Internal method: Invoked on successful completion of a query step from release_resources method...
void set_process_queue_flag()
Internal method: Set the should_process_queue_ flag to true, signifying that the queue should be proc...


void ExecutorResourceMgr_Namespace::ExecutorResourceMgr::remove_request_from_stage ( const RequestId  request_id,
const ExecutionRequestStage  request_status 
)
private

Internal method: Removes the request specified by the provided request_id from the specified stage.

Parameters
request_id- Request id to remove from the specified stage
request_status- Stage (QUEUED or EXECUTING) to remove the specified request from
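
A sketch of stage tracking with one set and one reader-writer mutex per stage is shown below. The names (`StageSketch`, `StageSetsSketch`) are hypothetical, and a thrown `std::logic_error` stands in for the `CHECK_EQ` used in the listing below. It requires C++17 for `std::shared_mutex`.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <stdexcept>

enum class StageSketch { QUEUED, EXECUTING };

class StageSetsSketch {
 public:
  void add(const std::size_t request_id, const StageSketch stage) {
    auto& chosen_set = stage == StageSketch::QUEUED ? queued_ : executing_;
    auto& chosen_mutex = stage == StageSketch::QUEUED ? queued_mutex_ : executing_mutex_;
    std::unique_lock<std::shared_mutex> write_lock(chosen_mutex);
    if (!chosen_set.insert(request_id).second) {
      throw std::logic_error("request already in stage");
    }
  }

  void remove(const std::size_t request_id, const StageSketch stage) {
    auto& chosen_set = stage == StageSketch::QUEUED ? queued_ : executing_;
    auto& chosen_mutex = stage == StageSketch::QUEUED ? queued_mutex_ : executing_mutex_;
    std::unique_lock<std::shared_mutex> write_lock(chosen_mutex);
    // erase returns the number of elements removed; it must be exactly 1.
    if (chosen_set.erase(request_id) != 1) {
      throw std::logic_error("request not in stage");
    }
  }

  bool contains(const std::size_t request_id, const StageSketch stage) const {
    const auto& chosen_set = stage == StageSketch::QUEUED ? queued_ : executing_;
    auto& chosen_mutex = stage == StageSketch::QUEUED ? queued_mutex_ : executing_mutex_;
    std::shared_lock<std::shared_mutex> read_lock(chosen_mutex);
    return chosen_set.count(request_id) == 1;
  }

 private:
  std::set<std::size_t> queued_;
  std::set<std::size_t> executing_;
  mutable std::shared_mutex queued_mutex_;
  mutable std::shared_mutex executing_mutex_;
};
```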

Definition at line 590 of file ExecutorResourceMgr.cpp.

References CHECK_EQ, executing_requests_, executing_set_mutex_, ExecutorResourceMgr_Namespace::QUEUED, queued_requests_, and queued_set_mutex_.

Referenced by mark_request_dequed(), mark_request_finished(), and mark_request_timed_out().

592  {
593  auto& chosen_set = request_stage == ExecutionRequestStage::QUEUED ? queued_requests_
595  auto& chosen_mutex = request_stage == ExecutionRequestStage::QUEUED
598  std::unique_lock<std::shared_mutex> set_write_lock(chosen_mutex);
599 
600  CHECK_EQ(chosen_set.erase(request_id),
601  size_t(1)); // Should return 1 as element must be in set
602 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::set< RequestId > executing_requests_
Set of all request ids that are currently executing (i.e. post-granting of resources). Protected by executing_set_mutex_.
std::shared_mutex queued_set_mutex_
RW mutex that protects access to queued_requests_
RequestId request_id()
Definition: Logger.cpp:876
std::shared_mutex executing_set_mutex_
RW mutex that protects access to executing_requests_
std::set< RequestId > queued_requests_
Set of all request ids that are currently queued. Protected by queued_set_mutex_. ...


std::unique_ptr< ExecutorResourceHandle > ExecutorResourceMgr_Namespace::ExecutorResourceMgr::request_resources ( const RequestInfo request_info)

Requests resources from ExecutorResourceMgr with no timeout (unlike request_resources_with_timeout).

Internally calls request_resources_with_timeout with a timeout of 0, which signifies no timeout.

Parameters
request_info- Details the resources requested
Returns
std::unique_ptr<ExecutorResourceHandle> - The actual resource grant, which when it goes out of scope (and the destructor is called), will return the allocated resources to the ExecutorResourcePool

Definition at line 101 of file ExecutorResourceMgr.cpp.

References request_resources_with_timeout().

102  {
104  request_info,
105  static_cast<size_t>(0)); // 0 signifies no timeout
106 }
std::unique_ptr< ExecutorResourceHandle > request_resources_with_timeout(const RequestInfo &request_info, const size_t timeout_in_ms)
Requests resources from ExecutorResourceMgr, will throw if request takes longer than time specified b...


std::unique_ptr< ExecutorResourceHandle > ExecutorResourceMgr_Namespace::ExecutorResourceMgr::request_resources_with_timeout ( const RequestInfo request_info,
const size_t  timeout_in_ms 
)

Requests resources from ExecutorResourceMgr; throws if the request waits longer than the time specified by timeout_in_ms.

Parameters
request_info- Details the resources requested
timeout_in_ms- Specifies the maximum time in ms the requesting thread should wait for resources before a QueryTimedOutWaitingInQueue exception is thrown; a value of 0 signifies no timeout
Returns
std::unique_ptr<ExecutorResourceHandle> - The actual resource grant, which when it goes out of scope (and the destructor is called), will return the allocated resources to the ExecutorResourcePool
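
The blocking wait with an optional timeout can be sketched with a condition variable. This is an illustration under stated assumptions, not the BinarySemaphore used by OutstandingQueueRequests: `WaitSlotSketch` and `TimedOutSketch` are hypothetical names, with `TimedOutSketch` standing in for QueryTimedOutWaitingInQueue.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <stdexcept>

// Hypothetical stand-in for the timeout exception.
struct TimedOutSketch : std::runtime_error {
  TimedOutSketch() : std::runtime_error("timed out waiting in queue") {}
};

class WaitSlotSketch {
 public:
  // Called by the scheduling side once resources have been granted.
  void wake() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      ready_ = true;
    }
    cv_.notify_one();
  }

  // Blocks until wake() has been called. A timeout_in_ms of 0 means wait
  // indefinitely; otherwise throws TimedOutSketch once the timeout elapses.
  void wait(const std::size_t timeout_in_ms) {
    std::unique_lock<std::mutex> lock(mutex_);
    if (timeout_in_ms == 0) {
      cv_.wait(lock, [this] { return ready_; });
      return;
    }
    const bool granted = cv_.wait_for(
        lock, std::chrono::milliseconds(timeout_in_ms), [this] { return ready_; });
    if (!granted) {
      throw TimedOutSketch();
    }
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  bool ready_{false};
};
```

In the listing below, the caller catches the timeout exception only to record the timeout via mark_request_timed_out and then rethrows it, so the exception still reaches the requesting code.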

Definition at line 45 of file ExecutorResourceMgr.cpp.

References ExecutorResourceMgr_Namespace::RequestStats::actual_resource_grant, ExecutorResourceMgr_Namespace::ExecutorResourcePool::calc_min_max_resource_grants_for_request(), CHECK_GE, ExecutorResourceMgr_Namespace::ResourceGrant::cpu_result_mem, ExecutorResourceMgr_Namespace::ResourceGrant::cpu_slots, enable_debug_printing_, enqueue_request(), ExecutorResourceMgr_Namespace::RequestStats::error, executor_resource_pool_, ExecutorResourceMgr_Namespace::ResourceGrant::gpu_slots, mark_request_timed_out(), outstanding_queue_requests_, print_mutex_, processor_queue_condition_, ExecutorResourceMgr_Namespace::OutstandingQueueRequests::queue_request_and_wait(), ExecutorResourceMgr_Namespace::OutstandingQueueRequests::queue_request_and_wait_with_timeout(), queue_stats_mutex_, logger::request_id(), requests_stats_, and set_process_queue_flag().

Referenced by request_resources().

46  {
47  std::pair<ResourceGrant, ResourceGrant> min_max_resource_grants;
48 
49  // Following can throw
50  // Should we put in stats to track errors?
51  min_max_resource_grants =
53 
54  const auto request_id = enqueue_request(request_info,
55  timeout_in_ms,
56  min_max_resource_grants.first,
57  min_max_resource_grants.second);
58 
60  std::unique_lock<std::mutex> print_lock(print_mutex_);
61  std::cout << std::endl << "Min resource grant";
62  min_max_resource_grants.first.print();
63  std::cout << std::endl << "Max resource grant";
64  min_max_resource_grants.second.print();
65  }
66 
68  processor_queue_condition_.notify_one();
69 
70  // Following queue_request methods will block until ExecutorResourceMgr lets them
71  // execute
72  if (timeout_in_ms > 0) {
73  try {
75  timeout_in_ms);
76  } catch (QueryTimedOutWaitingInQueue& timeout_exception) {
77  // Need to annotate request and executor stats accordingly
79  throw;
80  }
81  } else {
83  }
84 
85  auto this_ptr = shared_from_this();
86  std::shared_lock<std::shared_mutex> queue_stats_read_lock(queue_stats_mutex_);
87  RequestStats const& request_stats = requests_stats_[request_id];
88  if (request_stats.error) {
89  throw std::runtime_error("RequestStats error: " + *request_stats.error);
90  }
91  const ResourceGrant& actual_resource_grant = request_stats.actual_resource_grant;
92  // Ensure each resource granted was at least the minimum requested
93  CHECK_GE(actual_resource_grant.cpu_slots, min_max_resource_grants.first.cpu_slots);
94  CHECK_GE(actual_resource_grant.gpu_slots, min_max_resource_grants.first.gpu_slots);
95  CHECK_GE(actual_resource_grant.cpu_result_mem,
96  min_max_resource_grants.first.cpu_result_mem);
97  return std::make_unique<ExecutorResourceHandle>(
98  this_ptr, request_id, actual_resource_grant);
99 }
void queue_request_and_wait(const RequestId request_id)
Submits a request with id request_id into the queue, waiting on a BinarySemaphore until ExecutorResou...
#define CHECK_GE(x, y)
Definition: Logger.h:306
std::shared_mutex queue_stats_mutex_
RW mutex that protects access to executor_stats_ and requests_stats_
std::pair< ResourceGrant, ResourceGrant > calc_min_max_resource_grants_for_request(const RequestInfo &resource_request) const
Given the provided resource_request, statically calculate the minimum and maximum grantable resources...
ExecutorResourcePool executor_resource_pool_
Keeps track of available resources for execution.
OutstandingQueueRequests outstanding_queue_requests_
Stores and manages a map of request ids to BinarySemaphore objects to allow threads waiting for resou...
RequestId request_id()
Definition: Logger.cpp:876
void queue_request_and_wait_with_timeout(const RequestId request_id, const size_t max_wait_in_ms)
Submits a request with id request_id into the queue, waiting on a BinarySemaphore until ExecutorResou...
void mark_request_timed_out(const RequestId request_id)
Internal method: Called if the request times out (i.e. request was made via request_resources_with_ti...
void set_process_queue_flag()
Internal method: Set the should_process_queue_ flag to true, signifying that the queue should be proc...
std::vector< RequestStats > requests_stats_
Stores a vector of all requests that have been seen by ExecutorResourceMgr, with each incoming reques...
RequestId enqueue_request(const RequestInfo &request_info, const size_t timeout_in_ms, const ResourceGrant &min_resource_grant, const ResourceGrant &max_resource_grant)
Internal method: Invoked from request_resource/request_resource_with_timeout, places request in the r...

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ExecutorResourceMgr_Namespace::ExecutorResourceMgr::resume_process_queue ( )

Resumes the process queue in a thread-safe manner. If the process queue is not currently paused, the call is a no-op.

Definition at line 265 of file ExecutorResourceMgr.cpp.

References CHECK_EQ, executor_stats_, logger::INFO, LOG, pause_process_queue_, process_queue_is_paused_, processor_queue_condition_, processor_queue_mutex_, ExecutorResourceMgr_Namespace::ExecutorStats::requests_executing, and should_process_queue_.

Referenced by set_concurrent_resource_grant_policy(), and set_resource().

265  {
266  {
267  std::unique_lock<std::mutex> queue_lock(processor_queue_mutex_);
269  LOG(INFO)
270  << "Resume of ExecutorResourceMgr queue was called, but was not paused. Taking "
271  "no action.";
272  return;
273  }
275  process_queue_is_paused_ = false;
276  pause_process_queue_ = false;
277  should_process_queue_ = true;
278  }
279  processor_queue_condition_.notify_one();
280 }
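
The listing above follows a common pattern: change the state flags while holding the mutex, return early if there is nothing to resume, and notify the condition variable only after the lock is released. A minimal sketch of that pattern, assuming a toy class that models only the paused/resumed state (ToyQueueControl and its members are hypothetical names, and any waiting that the real pause performs is omitted):

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical toy, not the OmniSciDB class: models only the pause state.
class ToyQueueControl {
 public:
  // Marks the queue as paused. The toy does not wait for anything to drain.
  void pause() {
    std::unique_lock<std::mutex> lock(mutex_);
    paused_ = true;
  }

  // Returns false, changing nothing, when the queue was not paused.
  bool resume() {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      if (!paused_) {
        return false;
      }
      paused_ = false;
      should_process_ = true;
    }
    // Notify after the lock is released so a woken thread can acquire it.
    condition_.notify_one();
    return true;
  }

  bool should_process() const {
    std::unique_lock<std::mutex> lock(mutex_);
    return should_process_;
  }

 private:
  mutable std::mutex mutex_;
  std::condition_variable condition_;
  bool paused_{false};
  bool should_process_{false};
};
```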

void ExecutorResourceMgr_Namespace::ExecutorResourceMgr::set_concurrent_resource_grant_policy ( const ConcurrentResourceGrantPolicy concurrent_resource_grant_policy)

Set the concurrent resource grant policy for a given resource type (stored in ConcurrentResourceGrantPolicy)

Parameters
concurrent_resource_grant_policy - Object containing the resource type and the concurrency policies that apply when the resource is undersubscribed and when it is oversubscribed

Definition at line 298 of file ExecutorResourceMgr.cpp.

References CHECK, ExecutorResourceMgr_Namespace::ConcurrentResourceGrantPolicy::concurrency_policy, executor_resource_pool_, ExecutorResourceMgr_Namespace::ExecutorResourcePool::get_concurrent_resource_grant_policy(), ExecutorResourceMgr_Namespace::ConcurrentResourceGrantPolicy::oversubscription_concurrency_policy, pause_process_queue(), ExecutorResourceMgr_Namespace::ConcurrentResourceGrantPolicy::resource_type, resume_process_queue(), and ExecutorResourceMgr_Namespace::ExecutorResourcePool::set_concurrent_resource_grant_policy().

299  {
302  concurrent_resource_grant_policy);
303  const auto applied_concurrent_resource_grant_policy =
305  concurrent_resource_grant_policy.resource_type);
306  CHECK(concurrent_resource_grant_policy.concurrency_policy ==
307  applied_concurrent_resource_grant_policy.concurrency_policy);
308  CHECK(concurrent_resource_grant_policy.oversubscription_concurrency_policy ==
309  applied_concurrent_resource_grant_policy.oversubscription_concurrency_policy);
311 }

void ExecutorResourceMgr_Namespace::ExecutorResourceMgr::set_process_queue_flag ( )
inlineprivate

Internal method: Set the should_process_queue_ flag to true, signifying that the queue should be processed. Protected by a lock on processor_queue_mutex_.

Invoked in two places: 1) from release_resources, as the resources have been returned to the pool, potentially permitting a grant to another request; 2) after enqueue_request, as there is a new request to evaluate for whether it can be served.

Definition at line 401 of file ExecutorResourceMgr.h.

References processor_queue_mutex_, and should_process_queue_.

Referenced by release_resources(), and request_resources_with_timeout().

401  {
402  std::unique_lock<std::mutex> queue_lock(processor_queue_mutex_);
403  should_process_queue_ = true;
404  }
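
Setting a flag under a mutex is normally paired with a condition-variable wait on the consumer side. The sketch below shows one way a processing loop could consume such a flag; it is an illustration under assumptions, not the actual process_queue_loop. ToyQueueProcessor, set_flag_and_notify, wait_for_passes, and the pass counter are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>

// Hypothetical toy: a worker thread that runs one "pass" each time the flag is set.
class ToyQueueProcessor {
 public:
  ToyQueueProcessor() : worker_([this] { loop(); }) {}
  ~ToyQueueProcessor() { stop(); }

  // Sets the flag under the lock, then wakes the worker.
  void set_flag_and_notify() {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      should_process_ = true;
    }
    wake_.notify_one();
  }

  // Blocks until the worker has completed at least n passes.
  void wait_for_passes(std::size_t n) {
    std::unique_lock<std::mutex> lock(mutex_);
    pass_done_.wait(lock, [this, n] { return passes_ >= n; });
  }

  std::size_t passes() const {
    std::unique_lock<std::mutex> lock(mutex_);
    return passes_;
  }

  // Sets the stop flag under the lock, wakes the worker, and joins it.
  void stop() {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      stop_ = true;
    }
    wake_.notify_one();
    if (worker_.joinable()) {
      worker_.join();
    }
  }

 private:
  void loop() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (true) {
      // The predicate guards against spurious wake-ups.
      wake_.wait(lock, [this] { return should_process_ || stop_; });
      if (stop_) {
        return;
      }
      should_process_ = false;  // consume the flag
      ++passes_;
      pass_done_.notify_all();
    }
  }

  mutable std::mutex mutex_;
  std::condition_variable wake_;
  std::condition_variable pass_done_;
  bool should_process_{false};
  bool stop_{false};
  std::size_t passes_{0};
  std::thread worker_;  // declared last so the other members exist before it starts
};
```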

void ExecutorResourceMgr_Namespace::ExecutorResourceMgr::set_resource ( const ResourceType  resource_type,
const size_t  resource_quantity 
)

Used to change the total amount available of a specified resource after construction of ExecutorResourceMgr

Currently only used for testing, but could also be used to provide administrator DDL commands to change resource availability in a running server.

Parameters
resource_type - Type of resource to alter the quantity of
resource_quantity - The new quantity available of the resource

Definition at line 282 of file ExecutorResourceMgr.cpp.

References CHECK_EQ, executor_resource_pool_, get_resource_info(), pause_process_queue(), resume_process_queue(), and ExecutorResourceMgr_Namespace::ExecutorResourcePool::set_resource().

283  {
285  CHECK_EQ(get_resource_info(resource_type).first, size_t(0));
286  executor_resource_pool_.set_resource(resource_type, resource_quantity);
287  const auto resource_info = get_resource_info(resource_type);
288  CHECK_EQ(resource_info.first, size_t(0));
289  CHECK_EQ(resource_info.second, resource_quantity);
291 }
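
The listing above checks that nothing is currently allocated, sets the new total, and then verifies the result, reading the pair returned by get_resource_info as (allocated, total). A single-threaded sketch of that check-set-verify sequence follows. ToyPool and set_resource_checked are hypothetical names, the pause and resume of the process queue are left out, and the (allocated, total) ordering is inferred from the listing rather than confirmed.

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <stdexcept>
#include <utility>

// Hypothetical toy pool holding a single resource type.
class ToyPool {
 public:
  explicit ToyPool(std::size_t total) : total_(total) {}

  void allocate(std::size_t quantity) {
    if (allocated_ + quantity > total_) {
      throw std::runtime_error("not enough resource available");
    }
    allocated_ += quantity;
  }

  // Refuses to resize while any of the resource is still allocated.
  void set_resource(std::size_t new_total) {
    if (allocated_ != 0) {
      throw std::runtime_error("cannot resize while resource is allocated");
    }
    total_ = new_total;
  }

  // Returns (allocated, total); the ordering is an assumption from the listing.
  std::pair<std::size_t, std::size_t> get_resource_info() const {
    return {allocated_, total_};
  }

 private:
  std::size_t total_;
  std::size_t allocated_{0};
};

// Check that nothing is allocated, apply the change, then verify it took effect.
void set_resource_checked(ToyPool& pool, std::size_t new_total) {
  if (pool.get_resource_info().first != 0) {
    throw std::logic_error("pool must be drained before resizing");
  }
  pool.set_resource(new_total);
  const auto info = pool.get_resource_info();
  assert(info.first == 0);
  assert(info.second == new_total);
}
```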
void set_resource(const ResourceType resource_type, const size_t resource_quantity)
Sets the quantity of resource_type to resource_quantity. If pool has outstanding requests, will throw. Responsibility of allowing the pool to empty and preventing concurrent requests while this operation is running is left to the caller (in particular, ExecutorResourceMgr::set_resource pauses the process queue, which waits until all executing requests are finished before yielding to the caller, before calling this method).

void ExecutorResourceMgr_Namespace::ExecutorResourceMgr::stop_process_queue_thread ( )
private

Internal method: Invoked from ExecutorResourceMgr destructor, sets stop_process_queue_thread_ to true (behind a lock on processor_queue_mutex_) and then attempts to join all threads left in the request queue on server shutdown.

Definition at line 236 of file ExecutorResourceMgr.cpp.

References process_queue_thread_, processor_queue_condition_, processor_queue_mutex_, and stop_process_queue_thread_.

Referenced by ~ExecutorResourceMgr().

236  {
237  {
238  std::unique_lock<std::mutex> queue_lock(processor_queue_mutex_);
240  }
241  processor_queue_condition_.notify_one();
242  process_queue_thread_.join();
243 }

Member Data Documentation

const size_t ExecutorResourceMgr_Namespace::ExecutorResourceMgr::ACTUALLY_QUEUED_MIN_MS {2}
private

Definition at line 477 of file ExecutorResourceMgr.h.

Referenced by mark_request_dequed().

const bool ExecutorResourceMgr_Namespace::ExecutorResourceMgr::enable_debug_printing_ {false}
private
const bool ExecutorResourceMgr_Namespace::ExecutorResourceMgr::enable_stats_printing_ {false}
private

Definition at line 557 of file ExecutorResourceMgr.h.

Referenced by process_queue_loop().

std::set<RequestId> ExecutorResourceMgr_Namespace::ExecutorResourceMgr::executing_requests_
private

Set of all request ids that are currently executing (i.e. post-granting of resources). Protected by executing_set_mutex_.

Definition at line 540 of file ExecutorResourceMgr.h.

Referenced by add_request_to_stage(), get_requests_for_stage(), and remove_request_from_stage().

std::shared_mutex ExecutorResourceMgr_Namespace::ExecutorResourceMgr::executing_set_mutex_
mutableprivate

RW mutex that protects access to executing_requests_

Definition at line 512 of file ExecutorResourceMgr.h.

Referenced by add_request_to_stage(), get_requests_for_stage(), and remove_request_from_stage().
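
A set guarded by a std::shared_mutex lets many readers inspect it concurrently while writers get exclusive access. A minimal sketch of that arrangement, assuming C++17; ToyStageSet and its method names are hypothetical and only loosely mirror add_request_to_stage, remove_request_from_stage, and get_requests_for_stage:

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <set>
#include <shared_mutex>

// Hypothetical toy: a set of request ids behind a reader/writer lock.
class ToyStageSet {
 public:
  void add(std::size_t request_id) {
    std::unique_lock<std::shared_mutex> write_lock(mutex_);  // exclusive
    ids_.insert(request_id);
  }

  void remove(std::size_t request_id) {
    std::unique_lock<std::shared_mutex> write_lock(mutex_);  // exclusive
    ids_.erase(request_id);
  }

  // Returns a copy so callers never touch the set without holding the lock.
  std::set<std::size_t> snapshot() const {
    std::shared_lock<std::shared_mutex> read_lock(mutex_);  // shared
    return ids_;
  }

 private:
  mutable std::shared_mutex mutex_;
  std::set<std::size_t> ids_;
};
```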

ExecutorResourcePool ExecutorResourceMgr_Namespace::ExecutorResourceMgr::executor_resource_pool_
private
ExecutorStats ExecutorResourceMgr_Namespace::ExecutorResourceMgr::executor_stats_
private

Holds a single ExecutorStats struct that pertains to cumulative stats for ExecutorResourceMgr, i.e. number of requests, queue length, total execution time, etc.

Definition at line 475 of file ExecutorResourceMgr.h.

Referenced by enqueue_request(), get_executor_stats(), mark_request_dequed(), mark_request_finished(), mark_request_timed_out(), pause_process_queue(), process_queue_loop(), and resume_process_queue().

const RequestId ExecutorResourceMgr_Namespace::ExecutorResourceMgr::INVALID_REQUEST_ID {std::numeric_limits<size_t>::max()}
private

Definition at line 560 of file ExecutorResourceMgr.h.

Referenced by choose_next_request(), and process_queue_loop().

const double ExecutorResourceMgr_Namespace::ExecutorResourceMgr::max_available_resource_use_ratio_
private

Definition at line 562 of file ExecutorResourceMgr.h.

Referenced by choose_next_request(), and ExecutorResourceMgr().

OutstandingQueueRequests ExecutorResourceMgr_Namespace::ExecutorResourceMgr::outstanding_queue_requests_
private

Stores and manages a map of request ids to BinarySemaphore objects, allowing threads waiting for resources to be selectively queued/blocked and then woken when they are chosen for resource grants/execution.

Definition at line 528 of file ExecutorResourceMgr.h.

Referenced by process_queue_loop(), and request_resources_with_timeout().
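
The idea of one semaphore per waiting request can be sketched as follows. This is an illustration under assumptions, not the OutstandingQueueRequests or BinarySemaphore implementation: ToyBinarySemaphore, ToyOutstandingRequests, and all method names are hypothetical, and the semaphore is built from a mutex and a condition variable.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <map>
#include <memory>
#include <mutex>

// Hypothetical binary semaphore built on a mutex and condition variable.
class ToyBinarySemaphore {
 public:
  void set_ready() {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      ready_ = true;
    }
    condition_.notify_one();
  }

  // Returns true if the semaphore became ready, false if the wait timed out.
  bool try_wait_for(std::chrono::milliseconds max_wait) {
    std::unique_lock<std::mutex> lock(mutex_);
    return condition_.wait_for(lock, max_wait, [this] { return ready_; });
  }

 private:
  std::mutex mutex_;
  std::condition_variable condition_;
  bool ready_{false};
};

// Hypothetical registry mapping request ids to their semaphores.
class ToyOutstandingRequests {
 public:
  void add(std::size_t request_id) {
    std::unique_lock<std::mutex> lock(mutex_);
    semaphores_[request_id] = std::make_shared<ToyBinarySemaphore>();
  }

  // Wakes only the thread waiting on this particular request id.
  void wake(std::size_t request_id) {
    auto semaphore = find(request_id);
    if (semaphore) {
      semaphore->set_ready();
    }
  }

  // Returns false for unknown ids and for waits that time out.
  bool wait_with_timeout(std::size_t request_id, std::chrono::milliseconds max_wait) {
    auto semaphore = find(request_id);
    return semaphore && semaphore->try_wait_for(max_wait);
  }

 private:
  std::shared_ptr<ToyBinarySemaphore> find(std::size_t request_id) const {
    std::unique_lock<std::mutex> lock(mutex_);
    auto it = semaphores_.find(request_id);
    if (it == semaphores_.end()) {
      return nullptr;
    }
    return it->second;
  }

  mutable std::mutex mutex_;
  std::map<std::size_t, std::shared_ptr<ToyBinarySemaphore>> semaphores_;
};
```

The registry lock is released before waiting on an individual semaphore, so one blocked request does not prevent others from being added or woken.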

bool ExecutorResourceMgr_Namespace::ExecutorResourceMgr::pause_process_queue_ {false}
private
std::condition_variable ExecutorResourceMgr_Namespace::ExecutorResourceMgr::pause_processor_queue_condition_
private

Definition at line 515 of file ExecutorResourceMgr.h.

Referenced by pause_process_queue(), and process_queue_loop().

std::mutex ExecutorResourceMgr_Namespace::ExecutorResourceMgr::pause_processor_queue_mutex_
mutableprivate

Definition at line 495 of file ExecutorResourceMgr.h.

Referenced by pause_process_queue(), and process_queue_loop().

std::mutex ExecutorResourceMgr_Namespace::ExecutorResourceMgr::print_mutex_
mutableprivate
size_t ExecutorResourceMgr_Namespace::ExecutorResourceMgr::process_queue_counter_ {0}
private
bool ExecutorResourceMgr_Namespace::ExecutorResourceMgr::process_queue_is_paused_ {false}
private
std::thread ExecutorResourceMgr_Namespace::ExecutorResourceMgr::process_queue_thread_
private

The thread started in the ExecutorResourceMgr constructor that continuously loops inside of process_queue_loop to determine the next resource request that should be granted.

When the database is stopped/shut down, join will be called on this thread to clean up outstanding threads waiting on resource requests

Definition at line 487 of file ExecutorResourceMgr.h.

Referenced by ExecutorResourceMgr(), and stop_process_queue_thread().

std::condition_variable ExecutorResourceMgr_Namespace::ExecutorResourceMgr::processor_queue_condition_
private
std::mutex ExecutorResourceMgr_Namespace::ExecutorResourceMgr::processor_queue_mutex_
mutableprivate

Mutex that protects access to stop_process_queue_thread_ and pause_process_queue_

Definition at line 493 of file ExecutorResourceMgr.h.

Referenced by pause_process_queue(), process_queue_loop(), resume_process_queue(), set_process_queue_flag(), and stop_process_queue_thread().

std::shared_mutex ExecutorResourceMgr_Namespace::ExecutorResourceMgr::queue_stats_mutex_
mutableprivate
std::set<RequestId> ExecutorResourceMgr_Namespace::ExecutorResourceMgr::queued_requests_
private

Set of all request ids that are currently queued. Protected by queued_set_mutex_.

Definition at line 534 of file ExecutorResourceMgr.h.

Referenced by add_request_to_stage(), get_requests_for_stage(), and remove_request_from_stage().

std::shared_mutex ExecutorResourceMgr_Namespace::ExecutorResourceMgr::queued_set_mutex_
mutableprivate

RW mutex that protects access to queued_requests_

Definition at line 507 of file ExecutorResourceMgr.h.

Referenced by add_request_to_stage(), get_requests_for_stage(), and remove_request_from_stage().

std::atomic<size_t> ExecutorResourceMgr_Namespace::ExecutorResourceMgr::requests_count_ {0}
private

An atomic that is incremented with each incoming request, and used to assign RequestIds to incoming requests.

Definition at line 468 of file ExecutorResourceMgr.h.

Referenced by enqueue_request(), mark_request_dequed(), mark_request_finished(), and mark_request_timed_out().
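
One common way to hand out unique ids from an atomic counter is fetch_add, which returns the previous value and increments in a single atomic step. The sketch below assumes that approach; ToyIdAllocator is a hypothetical name, and whether enqueue_request assigns ids exactly this way is not confirmed here.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical toy: hands out consecutive ids starting at zero.
class ToyIdAllocator {
 public:
  // fetch_add returns the value held before the increment, so ids start at 0
  // and no two callers can receive the same id, even from different threads.
  std::size_t next_id() { return count_.fetch_add(1); }

  // Number of ids handed out so far.
  std::size_t issued() const { return count_.load(); }

 private:
  std::atomic<std::size_t> count_{0};
};
```

Ids produced this way are dense and zero-based, so they can double as indexes into a vector that grows by one entry per request.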

std::vector<RequestStats> ExecutorResourceMgr_Namespace::ExecutorResourceMgr::requests_stats_
private

Stores a vector of all requests that have been seen by ExecutorResourceMgr, with each incoming request appending a RequestStats struct to this vector. Protected by queue_stats_mutex_.

With a long-running server this vector could become quite long, but the RequestStats objects are light enough that the total memory needed should still be negligible compared to everything else stored in the server (even 100K requests would only total a handful of MB). The longer-term goal for this state is for ExecutorResourceMgr to use it as historical data to optimize query/request scheduling based on usage/request patterns.

Definition at line 555 of file ExecutorResourceMgr.h.

Referenced by choose_next_request(), enqueue_request(), get_chunk_request_info(), get_request_for_id(), mark_request_dequed(), mark_request_error(), mark_request_finished(), mark_request_timed_out(), and request_resources_with_timeout().

bool ExecutorResourceMgr_Namespace::ExecutorResourceMgr::should_process_queue_ {false}
private
bool ExecutorResourceMgr_Namespace::ExecutorResourceMgr::stop_process_queue_thread_ {false}
private

Definition at line 518 of file ExecutorResourceMgr.h.

Referenced by process_queue_loop(), and stop_process_queue_thread().


The documentation for this class was generated from the following files: