Google Summer of Code 2014 Boost.Thread Proposal

With the introduction of <thread>, C++ programmers now have a standardized way to create threads. Unfortunately, managing those threads is still largely left to the programmer. Thread pools and similar tools, which are a basic feature of many other languages such as Java and C#, make thread management significantly easier.

This is a proposal for adding executors, basic thread management tools, to Boost.Thread. It is based on the following proposals from open-std.org and a partial implementation that has already been started.

The core idea behind executors is that work units are created as std::function<void()> objects and queued so that they can be executed at some point in the future.

Closures can be created with a lambda or with std::bind. Here are a few examples from cppreference.com:

void print_num(int i) {
    std::cout << i << '\n';
}

// store a lambda
std::function<void()> f_display_42 = []() { print_num(42); };
f_display_42();

// store the result of a call to std::bind
std::function<void()> f_display_31337 = std::bind(print_num, 31337);
f_display_31337();

These work units can thus be stored and executed via the () operator at a later point in time.
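As a small illustration of deferred execution, the sketch below queues a few closures and only runs them later. The names run_all and deferred_demo are hypothetical helpers invented for this example, not part of any proposed interface.

```cpp
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical helper: runs every queued closure in FIFO order.
void run_all(std::queue<std::function<void()>>& pending) {
    while (!pending.empty()) {
        std::function<void()> work = std::move(pending.front());
        pending.pop();
        work(); // execution happens here, not at the point of creation
    }
}

// Queues three closures, runs them afterwards, and returns the recorded order.
std::vector<int> deferred_demo() {
    std::vector<int> log;
    std::queue<std::function<void()>> pending;
    for (int i = 1; i <= 3; ++i) {
        pending.push([&log, i] { log.push_back(i); });
    }
    // Nothing has run yet; log is still empty at this point.
    run_all(pending);
    return log;
}
```

Calling deferred_demo() yields the values in submission order, since a plain std::queue is first in, first out.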

Using closures, a basic fixed-size thread pool can be created in a few lines. The following implementation is not thread safe and is incomplete; it just serves as a quick example.

class thread_pool {
    std::queue<std::function<void()>> tasks;
    std::vector<std::thread> workers;

public:
    thread_pool(int num_threads) {
        for (int i = 0; i < num_threads; i++) {
            workers.emplace_back([this]() {
                while (true) {
                    // Not thread safe: the queue is unsynchronized and
                    // front() is called even when the queue is empty.
                    auto work = tasks.front();
                    tasks.pop();
                    work();
                }
            });
        }
    }

    void submit(std::function<void()> closure) {
        tasks.push(closure);
    }
};

A complete implementation can be found on my GitHub.
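For comparison, a thread-safe variant of the same idea might look like the sketch below. It is not the GitHub implementation mentioned above; the class name safe_thread_pool, the stopping flag, and the drain-on-destruction policy are assumptions made for this example.

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical thread-safe fixed-size pool: a mutex guards the queue and a
// condition variable puts idle workers to sleep.
class safe_thread_pool {
    std::queue<std::function<void()>> tasks;
    std::vector<std::thread> workers;
    std::mutex q_mutex;
    std::condition_variable cv;
    bool stopping = false;

public:
    explicit safe_thread_pool(int num_threads) {
        for (int i = 0; i < num_threads; i++) {
            workers.emplace_back([this] {
                for (;;) {
                    std::function<void()> work;
                    {
                        std::unique_lock<std::mutex> lock(q_mutex);
                        cv.wait(lock, [this] { return stopping || !tasks.empty(); });
                        if (tasks.empty()) return; // stopping and fully drained
                        work = std::move(tasks.front());
                        tasks.pop();
                    }
                    work(); // run outside the lock
                }
            });
        }
    }

    void submit(std::function<void()> closure) {
        {
            std::lock_guard<std::mutex> lock(q_mutex);
            tasks.push(std::move(closure));
        }
        cv.notify_one();
    }

    // Lets the workers drain the remaining tasks, then joins them.
    ~safe_thread_pool() {
        {
            std::lock_guard<std::mutex> lock(q_mutex);
            stopping = true;
        }
        cv.notify_all();
        for (std::thread& w : workers) w.join();
    }
};

// Submits 100 increments and returns the counter after the pool is destroyed.
int pool_demo() {
    std::atomic<int> counter(0);
    {
        safe_thread_pool pool(4);
        for (int i = 0; i < 100; i++) pool.submit([&counter] { ++counter; });
    } // the destructor waits for all queued tasks
    return counter.load();
}
```

Draining the queue in the destructor is only one possible shutdown policy; discarding pending tasks would be equally defensible.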

This proposal will focus on the scheduled_executor interface and a concrete implementation of it, as work has already started on other executors such as thread_pool and serial_executor.

The scheduled_executor interface extends the executor interface and includes the new functions submit_at and submit_after. Both interfaces are shown below.

class executor {
public:
    virtual ~executor() {}
    virtual void submit(std::function<void()> closure) = 0;
    virtual int num_pending() = 0;
};

class scheduled_executor : public executor {
public:
    virtual void submit_at(std::chrono::system_clock::time_point abs_time, std::function<void()> closure) = 0;
    virtual void submit_after(std::chrono::system_clock::duration rel_time, std::function<void()> closure) = 0;
};

The submit_at function accepts a time_point and a closure and guarantees that the submitted closure will not be scheduled to run before the given time point; it runs at or after it.

The submit_after function accepts a duration and a closure and guarantees that the submitted closure will not be scheduled to run until that duration (in nanoseconds, microseconds, milliseconds, seconds, minutes, etc.) has elapsed since submission.
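The relationship between the two functions can be sketched as follows: a relative delay is simply an absolute deadline computed at submission time. The recording_scheduler class is a hypothetical stand-in that only records deadlines and never runs anything; the void return types are also an assumption.

```cpp
#include <chrono>
#include <functional>
#include <utility>
#include <vector>

typedef std::chrono::system_clock sched_clock; // clock assumed for this sketch

// Hypothetical stand-in: stores the deadline of each submission.
class recording_scheduler {
public:
    std::vector<sched_clock::time_point> deadlines;

    void submit_at(sched_clock::time_point abs_time, std::function<void()> closure) {
        (void)closure; // a real executor would queue the closure
        deadlines.push_back(abs_time);
    }

    // A relative delay becomes "now + delay" at the moment of submission.
    void submit_after(sched_clock::duration rel_time, std::function<void()> closure) {
        submit_at(sched_clock::now() + rel_time, std::move(closure));
    }
};

// Checks that the recorded deadline lies 250 ms after the submission window.
bool after_demo() {
    recording_scheduler s;
    sched_clock::time_point before = sched_clock::now();
    s.submit_after(std::chrono::milliseconds(250), [] {});
    sched_clock::time_point after = sched_clock::now();
    return s.deadlines.size() == 1
        && s.deadlines[0] >= before + std::chrono::milliseconds(250)
        && s.deadlines[0] <= after + std::chrono::milliseconds(250);
}
```

Because std::chrono durations convert implicitly from coarser to finer units, callers can pass milliseconds, seconds, or minutes without an explicit cast.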

One possible implementation would be based on a priority queue. Tasks would be submitted to the queue, which would be ordered using time_point's relational operators, so the closure submitted with the earliest time_point would be at the front of the queue. Closures submitted with a duration would be converted to time_points based on the time of submission. Submissions using submit could default to the current time as their time_point.
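One detail worth illustrating is that std::priority_queue keeps its largest element on top, so getting the earliest time_point at the front requires a reversed comparison. The sketch below assumes a hypothetical entry type with a label standing in for the closure, and a comparator named earliest_on_top.

```cpp
#include <chrono>
#include <queue>
#include <string>
#include <vector>

typedef std::chrono::system_clock::time_point time_point;

// Hypothetical entry: a deadline plus a label standing in for the closure.
struct entry {
    time_point when;
    std::string label;
};

// Reversed comparison: an entry with a later deadline counts as "smaller",
// so the earliest deadline ends up at top().
struct earliest_on_top {
    bool operator()(const entry& a, const entry& b) const {
        return a.when > b.when;
    }
};

// Pushes three entries out of order and returns the labels in pop order.
std::vector<std::string> ordering_demo() {
    std::priority_queue<entry, std::vector<entry>, earliest_on_top> q;
    time_point now = std::chrono::system_clock::now();
    q.push(entry{now + std::chrono::seconds(5), "in 5s"});
    q.push(entry{now, "now"});
    q.push(entry{now + std::chrono::seconds(1), "in 1s"});

    std::vector<std::string> order;
    while (!q.empty()) {
        order.push_back(q.top().label);
        q.pop();
    }
    return order;
}
```

The entries come out as "now", "in 1s", "in 5s", regardless of the order in which they were pushed.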

Worker threads would then check the front of the queue to see whether the next task is eligible to be executed. If it is not, the worker thread could calculate the time remaining until the task becomes eligible and then call wait_for or wait_until on a condition variable that is notified on every insertion into the queue. This ensures that threads do not get stuck sleeping on a task scheduled far in the future while new tasks are submitted.
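The waiting strategy can be sketched with a single worker thread. This is a minimal illustration under several assumptions: the class name tiny_scheduler, the use of steady_clock, the stopping flag, and the choice to discard pending tasks on destruction are all invented for the example.

```cpp
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

class tiny_scheduler {
    typedef std::chrono::steady_clock clock_type; // assumption: monotonic clock
    struct item {
        clock_type::time_point when;
        std::function<void()> task;
    };
    struct earliest_on_top {
        bool operator()(const item& a, const item& b) const { return a.when > b.when; }
    };

    std::priority_queue<item, std::vector<item>, earliest_on_top> tasks;
    std::mutex q_mutex;
    std::condition_variable cv;
    bool stopping = false;
    std::thread worker; // declared last so it starts after the other members exist

    void run() {
        std::unique_lock<std::mutex> lock(q_mutex);
        while (!stopping) {
            if (tasks.empty()) {
                cv.wait(lock); // nothing pending: sleep until notified
            } else if (tasks.top().when > clock_type::now()) {
                // Copy the deadline because top() may change while waiting.
                clock_type::time_point deadline = tasks.top().when;
                cv.wait_until(lock, deadline); // any insertion wakes us early
            } else {
                std::function<void()> work = tasks.top().task;
                tasks.pop();
                lock.unlock();
                work(); // run without holding the lock
                lock.lock();
            }
        }
    }

public:
    tiny_scheduler() : worker([this] { run(); }) {}

    void submit_after(clock_type::duration rel_time, std::function<void()> closure) {
        {
            std::lock_guard<std::mutex> lock(q_mutex);
            tasks.push(item{clock_type::now() + rel_time, std::move(closure)});
        }
        cv.notify_one();
    }

    // Stops the worker; tasks that are still pending are discarded.
    ~tiny_scheduler() {
        {
            std::lock_guard<std::mutex> lock(q_mutex);
            stopping = true;
        }
        cv.notify_one();
        worker.join();
    }
};

// Returns true if a task due immediately runs even though a task due in an
// hour was submitted first and the worker may already be sleeping on it.
bool wake_demo() {
    tiny_scheduler s;
    std::promise<void> ran;
    std::future<void> done = ran.get_future();
    s.submit_after(std::chrono::hours(1), [] {});
    s.submit_after(std::chrono::milliseconds(0), [&ran] { ran.set_value(); });
    return done.wait_for(std::chrono::seconds(5)) == std::future_status::ready;
}
```

Re-checking the queue after every wake-up also covers spurious wake-ups, since the loop never assumes that a return from wait or wait_until means a task is due.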

Closures could be paired with time_points using a simple struct like the one below or could be put into a std::pair or std::tuple.

struct timed_closure {
    std::chrono::system_clock::time_point submit_time;
    std::function<void()> task;

    timed_closure(std::chrono::system_clock::time_point abs_time, std::function<void()> closure)
        : submit_time(abs_time), task(closure) {}

    // Reversed on purpose: std::priority_queue keeps its largest element on
    // top, so the closure with the earliest time_point must compare greatest.
    bool operator<(const timed_closure& other) const {
        return submit_time > other.submit_time;
    }
};

The basic outline of a thread_pool implementing scheduled_executor would be similar to the following. I have already implemented a similar prioritized thread_pool on my GitHub; however, its tasks are ordered by ints rather than time_points.

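class scheduled_thread_pool : public scheduled_executor {
    typedef std::chrono::system_clock clock_type;

    // The earliest time_point must end up on top, so timed_closure's ordering
    // has to rank it highest (std::priority_queue keeps its largest on top).
    std::priority_queue<timed_closure> tasks;
    std::vector<std::thread> workers;

    std::mutex q_mutex;
    std::condition_variable cv;

public:
    // Shutdown and joining of the workers are omitted from this outline.
    scheduled_thread_pool(int num_threads) {
        for (int i = 0; i < num_threads; i++) {
            workers.emplace_back([this] {
                while (true) {
                    std::unique_lock<std::mutex> lock(q_mutex); // Get mutex
                    // Wait while the queue is empty or no task is eligible to run.
                    while (tasks.empty() || tasks.top().submit_time > clock_type::now()) {
                        if (tasks.empty()) {
                            cv.wait(lock); // Regular wait: nothing is pending
                        } else {
                            // Wait until the next task can be run. Copy the
                            // time_point because top() may change while waiting.
                            auto wake_time = tasks.top().submit_time;
                            cv.wait_until(lock, wake_time);
                        }
                        // Upon wake up, the loop condition checks again.
                    }
                    // A task is eligible, so take it and run it.
                    auto next_task = tasks.top();
                    tasks.pop();
                    lock.unlock(); // Unlock mutex before running the task
                    next_task.task();
                }
            });
        }
    }

    virtual void submit_at(std::chrono::system_clock::time_point abs_time, std::function<void()> closure) {
        std::lock_guard<std::mutex> lock(q_mutex); // Get scoped q_mutex
        tasks.push(timed_closure(abs_time, closure));
        cv.notify_one();
    }

    virtual void submit_after(std::chrono::system_clock::duration rel_time, std::function<void()> closure) {
        std::lock_guard<std::mutex> lock(q_mutex); // Get scoped q_mutex
        // Calculate the time_point using now + rel_time
        tasks.push(timed_closure(clock_type::now() + rel_time, closure));
        cv.notify_one();
    }

    virtual void submit(std::function<void()> closure) {
        std::lock_guard<std::mutex> lock(q_mutex); // Get scoped q_mutex
        tasks.push(timed_closure(clock_type::now(), closure));
        cv.notify_one();
    }

    // Required by the executor interface.
    virtual int num_pending() {
        std::lock_guard<std::mutex> lock(q_mutex);
        return static_cast<int>(tasks.size());
    }
};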

Additional Work

At the moment there is no way to get futures from the closures that are submitted. Here is a basic implementation that would allow this generically. This could be a static member function or just a regular function in the Boost.Thread namespace.

#include <future>
#include <functional>
#include <memory>
#include <utility>

class executor {
public:
    virtual ~executor() {}
    virtual void add(std::function<void()> f) = 0;
};

template<class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type>
submit(executor& exec, F&& f, Args&&... args) {
    typedef typename std::result_of<F(Args...)>::type return_type;

    // packaged_task is move-only, so it is held in a shared_ptr to keep the
    // wrapping lambda copyable, as std::function requires.
    auto task = std::make_shared<std::packaged_task<return_type()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));

    std::future<return_type> result = task->get_future();

    exec.add([task]() { (*task)(); });

    return result;
}
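A usage sketch for such a helper follows. To keep the example compilable on its own, the helper is restated here with decltype in place of std::result_of (which C++17 deprecates and C++20 removes); inline_executor, add_two, and future_demo are hypothetical names introduced only for this illustration.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <utility>

class executor {
public:
    virtual ~executor() {}
    virtual void add(std::function<void()> f) = 0;
};

// Hypothetical executor that runs each closure immediately on the caller's thread.
class inline_executor : public executor {
public:
    void add(std::function<void()> f) override { f(); }
};

// Same idea as the helper above, with the return type deduced via decltype.
template <class F, class... Args>
auto submit(executor& exec, F&& f, Args&&... args)
    -> std::future<decltype(f(args...))> {
    typedef decltype(f(args...)) return_type;
    auto task = std::make_shared<std::packaged_task<return_type()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::future<return_type> result = task->get_future();
    exec.add([task]() { (*task)(); });
    return result;
}

int add_two(int a, int b) { return a + b; }

// Submits add_two(2, 3) and reads the result back through the future.
int future_demo() {
    inline_executor exec;
    std::future<int> sum = submit(exec, add_two, 2, 3);
    return sum.get();
}
```

With a real thread pool in place of inline_executor, sum.get() would block until a worker thread has run the task.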

Additionally, it would be nice to extend std::async to provide similar behaviour, using a new launch policy that would submit tasks to the global_default_executor described in Revision 3 of the working paper on which this proposal is based.

Another project would be implementing a priority thread pool where tasks can be submitted with a priority.

Finally, I would be willing to finish, or at least work on, any incomplete aspects of the executors library.

Additional Requests from the Boost Community

  1. Provide a static polymorphic interface in addition. The template functions submit_at and submit_after would accept any chrono time_point and duration.

  2. Provide an adaptor that wraps a scheduled executor and makes it a polymorphic one.

  3. Provide an adaptor that wraps an executor, manages the time-based operations using a specific thread, and forwards all the operations to the wrapped executor.
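The first two requests could be approached roughly as sketched below. This is only one possible reading: static_scheduler, polymorphic_adaptor, the void return types, and the way a foreign clock's time_point is translated (by its distance from that clock's now()) are all assumptions made for the example. The third request, a timer thread that forwards to a wrapped executor, is not covered here.

```cpp
#include <chrono>
#include <functional>
#include <utility>
#include <vector>

typedef std::chrono::system_clock sys_clock;

// Polymorphic interface in the style of the proposal (return types assumed).
class scheduled_executor {
public:
    virtual ~scheduled_executor() {}
    virtual void submit_at(sys_clock::time_point abs_time, std::function<void()> closure) = 0;
    virtual void submit_after(sys_clock::duration rel_time, std::function<void()> closure) = 0;
};

// Hypothetical statically polymorphic scheduler: accepts any chrono
// time_point or duration and records the resulting system_clock deadline.
class static_scheduler {
public:
    std::vector<sys_clock::time_point> deadlines;

    template <class Clock, class Duration>
    void submit_at(std::chrono::time_point<Clock, Duration> abs_time, std::function<void()>) {
        // Translate via the time remaining on the caller's own clock.
        auto remaining = abs_time - Clock::now();
        deadlines.push_back(sys_clock::now() +
                            std::chrono::duration_cast<sys_clock::duration>(remaining));
    }

    template <class Rep, class Period>
    void submit_after(std::chrono::duration<Rep, Period> rel_time, std::function<void()> closure) {
        submit_at(sys_clock::now() + std::chrono::duration_cast<sys_clock::duration>(rel_time),
                  std::move(closure));
    }
};

// Adaptor: exposes any type with the template functions through the
// virtual interface.
template <class Scheduler>
class polymorphic_adaptor : public scheduled_executor {
    Scheduler& wrapped;
public:
    explicit polymorphic_adaptor(Scheduler& s) : wrapped(s) {}
    void submit_at(sys_clock::time_point abs_time, std::function<void()> closure) override {
        wrapped.submit_at(abs_time, std::move(closure));
    }
    void submit_after(sys_clock::duration rel_time, std::function<void()> closure) override {
        wrapped.submit_after(rel_time, std::move(closure));
    }
};

// Exercises both the template interface and the adapted virtual interface.
bool adaptor_demo() {
    static_scheduler s;
    s.submit_at(std::chrono::steady_clock::now() + std::chrono::seconds(1), [] {});
    s.submit_after(std::chrono::minutes(2), [] {});

    polymorphic_adaptor<static_scheduler> adapted(s);
    scheduled_executor& base = adapted;
    base.submit_after(std::chrono::milliseconds(5), [] {});
    return s.deadlines.size() == 3;
}
```

Holding the wrapped scheduler by reference keeps the sketch short; an adaptor that owns its scheduler would be a reasonable alternative.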

Issues / Questions

  1. Which concrete classes should be implemented? Almost any executor that uses a queue can be swapped for a priority_queue.
  2. Is there a better way?
  3. Could classes be templated on the type of queue they use?
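To make the third question concrete, here is one way a class could be templated on its queue type. It is a single-threaded sketch under stated assumptions: fifo_policy, priority_policy, basic_executor, and the push/pop/empty policy interface are hypothetical names, and priorities are plain ints where a larger value runs first.

```cpp
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical queue policies sharing one small interface: push, pop, empty.
struct fifo_policy {
    std::queue<std::function<void()>> q;
    void push(int /*priority ignored*/, std::function<void()> f) { q.push(std::move(f)); }
    std::function<void()> pop() {
        std::function<void()> f = std::move(q.front());
        q.pop();
        return f;
    }
    bool empty() const { return q.empty(); }
};

struct priority_policy {
    typedef std::pair<int, std::function<void()>> item;
    struct by_priority {
        bool operator()(const item& a, const item& b) const { return a.first < b.first; }
    };
    std::priority_queue<item, std::vector<item>, by_priority> q;
    void push(int priority, std::function<void()> f) { q.push(item(priority, std::move(f))); }
    std::function<void()> pop() {
        std::function<void()> f = q.top().second;
        q.pop();
        return f;
    }
    bool empty() const { return q.empty(); }
};

// Single-threaded executor templated on its queue policy.
template <class QueuePolicy>
class basic_executor {
    QueuePolicy queue;
public:
    void submit(int priority, std::function<void()> closure) {
        queue.push(priority, std::move(closure));
    }
    void run_all() {
        while (!queue.empty()) queue.pop()();
    }
};

// Submits tasks with priorities 1, 3, 2 and returns the order they ran in.
template <class QueuePolicy>
std::vector<int> queue_demo() {
    std::vector<int> order;
    basic_executor<QueuePolicy> exec;
    const int priorities[] = {1, 3, 2};
    for (int p : priorities) {
        exec.submit(p, [&order, p] { order.push_back(p); });
    }
    exec.run_all();
    return order;
}
```

With fifo_policy the tasks run in submission order (1, 3, 2); with priority_policy they run highest priority first (3, 2, 1), while the executor code itself is unchanged.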

Timeline

Week 1-2 : Familiarize myself with the Boost development tools, environment and Boost.Thread codebase.
Week 3 : Finalize work plan, interfaces, class declarations, skeleton code.
Week 4-6 : Work on a scheduled thread pool class, including testing and documentation. This will be the foundation of the project.
Week 7-8 : Work on Adaptors and utility functions.
Week 9-12 : Work on other classes like priority thread pool, scheduled loop executor, etc.
Week 12-14 : Finalize work. Develop tests and examples to showcase the work. Complete documentation of the project.



View Ian Forbes's profile on LinkedIn
