OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
QueryDispatchQueue Class Reference

#include <QueryDispatchQueue.h>

Public Types

using Task = std::packaged_task< void(size_t)>
 

Public Member Functions

 QueryDispatchQueue (const size_t parallel_executors_max)
 
void submit (std::shared_ptr< Task > task, const bool is_update_delete)
 
bool hasIdleWorker ()
 
 ~QueryDispatchQueue ()
 

Private Member Functions

void worker (const size_t worker_idx)
 

Private Attributes

std::mutex queue_mutex_
 
std::condition_variable cv_
 
std::mutex update_delete_mutex_
 
bool threads_should_exit_ {false}
 
std::queue< std::shared_ptr
< Task > > 
queue_
 
std::vector< std::thread > workers_
 
int num_running_workers_
 
int num_workers_
 

Detailed Description

QueryDispatchQueue maintains a list of pending queries and dispatches those queries as Executors become available

Definition at line 29 of file QueryDispatchQueue.h.

Member Typedef Documentation

using QueryDispatchQueue::Task = std::packaged_task<void(size_t)>

Definition at line 31 of file QueryDispatchQueue.h.

Constructor & Destructor Documentation

QueryDispatchQueue::QueryDispatchQueue ( const size_t  parallel_executors_max)
inline

Definition at line 33 of file QueryDispatchQueue.h.

References num_running_workers_, num_workers_, worker(), and workers_.

33  {
34  workers_.resize(parallel_executors_max);
35  for (size_t i = 0; i < workers_.size(); i++) {
36  // worker IDs are 1-indexed, leaving Executor 0 for non-dispatch queue worker tasks
37  workers_[i] = std::thread(&QueryDispatchQueue::worker, this, i + 1);
38  }
40  num_workers_ = parallel_executors_max;
41  }
void worker(const size_t worker_idx)
std::vector< std::thread > workers_

+ Here is the call graph for this function:

QueryDispatchQueue::~QueryDispatchQueue ( )
inline

Definition at line 74 of file QueryDispatchQueue.h.

References cv_, queue_mutex_, threads_should_exit_, worker(), and workers_.

74  {
75  {
76  std::lock_guard<decltype(queue_mutex_)> lock(queue_mutex_);
77  threads_should_exit_ = true;
78  }
79  cv_.notify_all();
80  for (auto& worker : workers_) {
81  worker.join();
82  }
83  }
std::condition_variable cv_
void worker(const size_t worker_idx)
std::vector< std::thread > workers_

+ Here is the call graph for this function:

Member Function Documentation

bool QueryDispatchQueue::hasIdleWorker ( )
inline

Definition at line 69 of file QueryDispatchQueue.h.

References num_running_workers_, num_workers_, and queue_mutex_.

69  {
70  std::lock_guard<decltype(queue_mutex_)> lock(queue_mutex_);
72  }
void QueryDispatchQueue::submit ( std::shared_ptr< Task task,
const bool  is_update_delete 
)
inline

Submit a new task to the queue. Blocks until the task begins execution. The caller is expected to maintain a copy of the shared_ptr which will be used to access results once the task runs.

Definition at line 48 of file QueryDispatchQueue.h.

References CHECK, cv_, logger::INFO, LOG, queue_, queue_mutex_, update_delete_mutex_, and workers_.

48  {
49  if (workers_.size() == 1 && is_update_delete) {
50  std::lock_guard<decltype(update_delete_mutex_)> update_delete_lock(
52  CHECK(task);
53  // We only have 1 worker. Run this task on the calling thread on a special, second
54  // worker. The task is under the update delete lock, so we don't have to worry about
55  // contention on the special worker. This protects against deadlocks should the
56  // query running (or any pending queries) hold a read lock on something that
57  // requires a write lock during update/delete.
58  (*task)(2);
59  return;
60  }
61  std::unique_lock<decltype(queue_mutex_)> lock(queue_mutex_);
62 
63  LOG(INFO) << "Dispatching query with " << queue_.size() << " queries in the queue.";
64  queue_.push(task);
65  lock.unlock();
66  cv_.notify_all();
67  }
std::condition_variable cv_
std::mutex update_delete_mutex_
#define LOG(tag)
Definition: Logger.h:285
std::queue< std::shared_ptr< Task > > queue_
std::vector< std::thread > workers_
#define CHECK(condition)
Definition: Logger.h:291
void QueryDispatchQueue::worker ( const size_t  worker_idx)
inlineprivate

Definition at line 86 of file QueryDispatchQueue.h.

References CHECK, cv_, logger::INFO, LOG, num_running_workers_, queue_, queue_mutex_, and threads_should_exit_.

Referenced by QueryDispatchQueue(), and ~QueryDispatchQueue().

86  {
87  std::unique_lock<std::mutex> lock(queue_mutex_);
88  while (true) {
89  cv_.wait(lock, [this] { return !queue_.empty() || threads_should_exit_; });
90 
92  return;
93  }
94 
95  if (!queue_.empty()) {
96  auto task = queue_.front();
97  queue_.pop();
99 
100  LOG(INFO) << "Worker " << worker_idx
101  << " running query and returning control. There are now "
102  << num_running_workers_ << " workers are running and " << queue_.size()
103  << " queries in the queue.";
104  // allow other threads to pick up tasks
105  lock.unlock();
106  CHECK(task);
107  (*task)(worker_idx);
108  // wait for signal
109  lock.lock();
111  }
112  }
113  }
std::condition_variable cv_
#define LOG(tag)
Definition: Logger.h:285
std::queue< std::shared_ptr< Task > > queue_
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the caller graph for this function:

Member Data Documentation

std::condition_variable QueryDispatchQueue::cv_
private

Definition at line 116 of file QueryDispatchQueue.h.

Referenced by submit(), worker(), and ~QueryDispatchQueue().

int QueryDispatchQueue::num_running_workers_
private

Definition at line 123 of file QueryDispatchQueue.h.

Referenced by hasIdleWorker(), QueryDispatchQueue(), and worker().

int QueryDispatchQueue::num_workers_
private

Definition at line 124 of file QueryDispatchQueue.h.

Referenced by hasIdleWorker(), and QueryDispatchQueue().

std::queue<std::shared_ptr<Task> > QueryDispatchQueue::queue_
private

Definition at line 121 of file QueryDispatchQueue.h.

Referenced by submit(), and worker().

std::mutex QueryDispatchQueue::queue_mutex_
private

Definition at line 115 of file QueryDispatchQueue.h.

Referenced by hasIdleWorker(), submit(), worker(), and ~QueryDispatchQueue().

bool QueryDispatchQueue::threads_should_exit_ {false}
private

Definition at line 120 of file QueryDispatchQueue.h.

Referenced by worker(), and ~QueryDispatchQueue().

std::mutex QueryDispatchQueue::update_delete_mutex_
private

Definition at line 118 of file QueryDispatchQueue.h.

Referenced by submit().

std::vector<std::thread> QueryDispatchQueue::workers_
private

Definition at line 122 of file QueryDispatchQueue.h.

Referenced by QueryDispatchQueue(), submit(), and ~QueryDispatchQueue().


The documentation for this class was generated from the following file: