Google Summer of Code 2014 Boost.Thread
Proposal
With the introduction of <thread>, C++ programmers now have a standardized method for thread creation.
Unfortunately, the management of those threads is still left largely to the programmer.
Common tools such as thread pools are a basic feature of many other languages, such as Java and C#, and make thread management significantly easier.
This proposal is for adding executors, basic thread management tools, to Boost.Thread.
It is based on the following proposals from open-std.org and a partial implementation that has already been started.
The core idea behind executors is that work units are created as std::function<void()> objects and queued in some manner such that they can be executed at some point in the future.
Closures can be created using a lambda or std::bind. Here are a few examples adapted from cppreference.com:
#include <functional>
#include <iostream>

void print_num(int i) {
    std::cout << i << '\n';
}

int main() {
    // store a lambda
    std::function<void()> f_display_42 = []() { print_num(42); };
    f_display_42();

    // store the result of a call to std::bind
    std::function<void()> f_display_31337 = std::bind(print_num, 31337);
    f_display_31337();
}
These work units can thus be stored and executed via operator() at a later point in time.
Using closures, a basic fixed-size thread pool can be created in a few lines. The following implementation is not thread safe and is incomplete; it serves only as a quick example.
class thread_pool {
    std::queue<std::function<void()>> tasks;
    std::vector<std::thread> workers;
public:
    thread_pool(int num_threads) {
        for (int i = 0; i < num_threads; i++) {
            workers.emplace_back([this] {
                while (true) {
                    // Not thread safe: front()/pop() on a shared (possibly
                    // empty) queue needs a mutex and a condition variable.
                    auto work = tasks.front();
                    tasks.pop();
                    work();
                }
            });
        }
    }
    void submit(std::function<void()> closure) {
        tasks.push(closure);
    }
};
A complete implementation can be found on my GitHub.
This proposal will focus on the scheduled_executor interface and its concrete implementation, as work has already started on other executors such as thread_pool and serial_executor.
The scheduled_executor interface extends the executor interface and includes the new functions submit_at and submit_after.
Both interfaces are shown below.
class executor {
public:
    virtual ~executor() {}
    virtual void submit(std::function<void()> closure) = 0;
    virtual int num_pending() = 0;
};

class scheduled_executor : public executor {
public:
    virtual void submit_at(chrono::system_clock::time_point abs_time, std::function<void()> closure) = 0;
    virtual void submit_after(chrono::system_clock::duration rel_time, std::function<void()> closure) = 0;
};
The submit_at function accepts a time_point and a closure, and guarantees that the submitted closure will not be scheduled to run before the given time point.
The submit_after function accepts a duration and a closure, and guarantees that the submitted closure will not be scheduled to run until at least that duration (nanoseconds, microseconds, milliseconds, seconds, minutes, etc.) has elapsed from when it is submitted.
One possible implementation would be based on a priority queue. Tasks would be submitted to the queue, which would be ordered using time_point's relational operators, so that the closure submitted with the earliest time_point is at the front of the queue. Closures submitted with a duration would be converted to time_points based on the time of submission. Submissions using submit could default to the current time as their time_point.
Worker thread(s) would then query the front of the queue to see if the next task is eligible to be executed. If not, a worker thread could calculate the amount of time until the task becomes eligible and then use wait_for or wait_until on a condition variable that is notified on every insertion to the queue. This ensures that threads do not get stuck sleeping on a task scheduled far in the future while new tasks are being submitted.
Closures could be paired with time_points using a simple struct like the one below, or could be put into a std::pair or std::tuple.
struct timed_closure {
    std::chrono::steady_clock::time_point submit_time;
    std::function<void()> task;
    timed_closure(std::chrono::steady_clock::time_point abs_time, std::function<void()> closure)
        : submit_time(abs_time), task(closure) {}
    // Reversed comparison: std::priority_queue is a max-heap, so "less than"
    // must mean "runs later" for the earliest submit_time to be at the top.
    bool operator<(const timed_closure& other) const {
        return submit_time > other.submit_time;
    }
};
The basic outline of a thread_pool implementing scheduled_executor would be similar to the following.
I have already implemented a similar prioritized thread_pool on my GitHub; however, its tasks are ordered by ints rather than time_points.
class scheduled_thread_pool : public scheduled_executor {
    std::priority_queue<timed_closure> tasks;
    std::vector<std::thread> workers;
    std::mutex q_mutex;
    std::condition_variable cv;
public:
    scheduled_thread_pool(int num_threads) {
        for (int i = 0; i < num_threads; i++) {
            workers.emplace_back([this] {
                std::unique_lock<std::mutex> lock(q_mutex, std::defer_lock);
                while (true) {
                    lock.lock(); // Get mutex
                    // Check if the queue is empty or no task is eligible to run.
                    // Calculate the time until the next task can be run.
                    // Use wait_until on the condition variable if there is a pending task;
                    // otherwise use a regular wait if the queue is empty.
                    // Upon wake-up, check again.
                    // Once a task is eligible, continue:
                    auto next_task = tasks.top();
                    tasks.pop();
                    lock.unlock(); // Unlock mutex before running user code
                    next_task.task();
                }
            });
        }
    }
    virtual void submit_at(std::chrono::steady_clock::time_point abs_time, std::function<void()> closure) {
        std::lock_guard<std::mutex> lock(q_mutex); // Scoped q_mutex
        tasks.push(timed_closure(abs_time, closure));
        cv.notify_one();
    }
    virtual void submit_after(std::chrono::steady_clock::duration rel_time, std::function<void()> closure) {
        std::lock_guard<std::mutex> lock(q_mutex); // Scoped q_mutex
        // Calculate the time_point as now + rel_time.
        tasks.push(timed_closure(std::chrono::steady_clock::now() + rel_time, closure));
        cv.notify_one();
    }
    virtual void submit(std::function<void()> closure) {
        std::lock_guard<std::mutex> lock(q_mutex); // Scoped q_mutex
        // Default to "now" so plain submissions run as soon as possible.
        tasks.push(timed_closure(std::chrono::steady_clock::now(), closure));
        cv.notify_one();
    }
};
Additional Work
At the moment there is no method for getting futures from the closures that are submitted. Here is a basic implementation that would allow this generically. It could be a static member function or just a regular function in the Boost.Thread namespace.
#include <future>
#include <functional>

class executor {
public:
    virtual ~executor() {}
    virtual void add(std::function<void()> f) = 0;
};

template<class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type>
submit(executor& exec, F&& f, Args&&... args) {
    typedef typename std::result_of<F(Args...)>::type return_type;
    auto task = std::make_shared<std::packaged_task<return_type()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::future<return_type> result = task->get_future();
    exec.add([task]() { (*task)(); });
    return result;
}
Additionally, it would be nice to extend std::async to provide similar behaviour to the above, using a new launch policy that submits tasks to the global_default_executor as described in Revision 3 of the working paper on which this proposal is based.
Another project would be implementing a priority thread pool, where tasks can be submitted with a priority.
Finally, I would be willing to finish, or at least work on, any incomplete aspects of the executors library.
Additional Requests from the Boost Community
- Provide, in addition, a static polymorphic interface: the template functions submit_at and submit_after would accept any chrono time_point and duration.
- Provide an adaptor that wraps a scheduled executor and makes it a polymorphic one.
- Provide an adaptor that wraps an executor, manages the time-based operations using a specific thread, and forwards all operations to the wrapped executor.
Issues / Questions
- Which concrete classes should be implemented? In almost any executor that uses a queue, the queue can be swapped for a priority_queue. Is there a better way?
- Could classes be templated on the type of queue they use?
Timeline
Week 1-2 : Familiarize with Boost development tools, environment, and the Boost.Thread codebase.
Week 3 : Finalize work plan, interfaces, class declarations, and skeleton code.
Week 4-6 : Work on a scheduled thread pool class, including testing and documentation. This will be the foundation of the project.
Week 7-8 : Work on adaptors and utility functions.
Week 9-12 : Work on other classes like priority thread pool, scheduled loop executor, etc.
Week 12-14 : Finalize work. Develop tests and examples to showcase the work. Complete documentation of the project.