Developing a C ++ concurrency library with futures or a similar paradigm

I am working on a C ++ project that needs to run many jobs in threadpool. Jobs are error prone, which means I need to know how each job completes after it is completed. Being a Java programmer for the most part, I like the idea of ​​using "futures" or a similar paradigm, akin to the various classes in the Java util.concurrent package.

I have two questions: firstly, something like this already exists for C ++ (I did not find anything in Boost, but maybe I don't look complicated enough); and secondly, is that even a reasonable idea for C ++?

I found a brief example of what I'm trying to do here:

http://www.boostcookbook.com/Recipe:/1234841

Does this approach make sense?

+3
source share
3 answers

Futures are present in the upcoming standard (C ++ 0x) and inside boost. Note that although the main name is the futuresame, you will need to read through the documentation to find other types and understand the semantics. I don’t know Java futures, so I can’t tell you where they differ if they do.

The boost library was written by Anthony Williams , who I believe was also involved in defining this part of the standard. He also wrote C ++ Concurrency in action , which includes a good description of futures, tasks, promises, and related objects. His company also sells full and pre-implementation C ++ 0x thread libraries if you are interested.

+5

Boost .

, get() boost::unique_future, , .

- :

#pragma once

#include <tbb/concurrent_queue.h>

#include <boost/thread.hpp>
#include <boost/noncopyable.hpp>

#include <functional>

namespace internal
{
    template<typename T>
    struct move_on_copy
    {
        move_on_copy(const move_on_copy<T>& other) : value(std::move(other.value)){}
        move_on_copy(T&& value) : value(std::move(value)){}
        mutable T value;
    };

    template<typename T>
    move_on_copy<T> make_move_on_copy(T&& value)
    {
        return move_on_copy<T>(std::move(value));
    }
}

class executor : boost::noncopyable
{
    boost::thread thread_;

    tbb::concurrent_bounded_queue<std::function<void()>> execution_queue_;

    template<typename Func>
    auto create_task(Func&& func) -> boost::packaged_task<decltype(func())> // noexcept
    {   
        typedef boost::packaged_task<decltype(func())> task_type;

        auto task = task_type(std::forward<Func>(func));            

        task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.
        {
            try
            {
                if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.
                    my_task();
            }
            catch(boost::task_already_started&){}
        }));

        return std::move(task);
    }

public:

    explicit executor() // noexcept
    {
        thread_ = boost::thread([this]{run();});
    }

    ~executor() // noexcept
    {   
        execution_queue_.push(nullptr); // Wake the execution thread.
        thread_.join();
    }

    template<typename Func>
    auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())> // noexcept
    {   
        // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics.
        auto task_adaptor = internal::make_move_on_copy(create_task(func));

        auto future = task_adaptor.value.get_future();

        execution_queue_.push([=]
        {
            try{task_adaptor.value();}
            catch(boost::task_already_started&){}
        });

        return std::move(future);       
    }

    template<typename Func>
    auto invoke(Func&& func) -> decltype(func()) // noexcept
    {
        if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.
            return func();

        return begin_invoke(std::forward<Func>(func), prioriy).get();
    }

private:

    void run() // noexcept
    {
        while(true)
        {
            std::function<void()> func;
            execution_queue_.pop(func); 
                    if(!func)
                       break;
            func();
        }
    }   
};
+5

C ++ templates are less restrictive than Java Generics, so "The future can be easily ported with them and thread synchronization primitives. As for existing libraries that support this mechanism, I hope someone else knows about it.

-1
source

All Articles