How to use C ++ 11 <thread> designing a system that extracts data from sources

This question comes from: C ++ 11 thread does not work with virtual member function

As stated in the comment, my question in the previous post may be wrong, so here is the original question:

I want to create a capture system that will query multiple sources at a constant / dynamic frequency (it depends on the sources, say, 10 times / sec) and extract data in each queue. while sources are not fixed, they can add / remove at runtime.

and there is a monitor that draws out of the queues with a constant frequency and displays the data.

So what is the best design pattern or structure for this problem.

I'm trying to create a list for all source pullers, and each puller has a thread and a specified pull function (somehow the pull function can interact with the puller, say, if the source is depleted, it will ask you to stop the pull process on this thread.)

+3
source share
1 answer

If the operation in which you request the source is blocking (or you have a lot of them), you do not need to use threads for this. We could start with Producerone that will work with synchronous or asynchronous (streaming) sending:

template <typename OutputType>
class Producer
{
    std::list<OutputType> output;

protected:
    int poll_interval; // seconds? milliseconds?
    virtual OutputType query() = 0;

public:
    virtual ~Producer();

    int next_poll_interval() const { return poll_interval; }
    void poll() { output.push_back(this->query()); }

    std::size_t size() { return output.size(); }
    // whatever accessors you need for the queue here:
    // pop_front, swap entire list, etc.
};

Producer query . poll_interval query. , .

template <typename OutputType>
class ThreadDispatcher
{
    Producer<OutputType> *producer;
    bool shutdown;
    std::thread thread;

    static void loop(ThreadDispatcher *self)
    {
        Producer<OutputType> *producer = self->producer;

        while (!self->shutdown)
        {
            producer->poll();
            // some mechanism to pass the produced values back to the owner
            auto delay = // assume millis for sake of argument
                std::chrono::milliseconds(producer->next_poll_interval());
            std::this_thread::sleep_for(delay);
        }
    }

public:
    explicit ThreadDispatcher(Producer<OutputType> *p)
      : producer(p), shutdown(false), thread(loop, this)
    {
    }

    ~ThreadDispatcher()
    {
        shutdown = true;
        thread.join();
    }

    // again, the accessors you need for reading produced values go here
    // Producer::output isn't synchronised, so you can't expose it directly
    // to the calling thread
};

, , , . , , , .

, - , , .

, , select/poll - Boost.Asio .

+2

All Articles