Dynamic thread pool example in boost::asio

I am going to implement a boost::asio server with a thread pool using a single io_service (as in the HTTP Server 3 example). The io_service will be bound to a Unix domain socket and will pass requests arriving on connections to this socket to different threads. To reduce resource consumption, I want to make the thread pool dynamic.

Here is the concept. First, a single thread is created. When a request arrives and the server sees that there is no idle thread in the pool, it creates a new thread and passes the request to it. The server can create threads up to a configured maximum. Ideally, it should also be able to suspend threads that have been idle for some time.

Has anyone done something like this? Or does someone have a relevant example?

As for me, I suppose I have to somehow override io_service.dispatch in order to achieve this.

1 answer

There may be several problems with the initial approach:

  • boost::asio::io_service is not intended to be derived from or overridden. Note the lack of virtual functions.
  • If your thread library does not provide a way to query a thread's state, then the state information has to be tracked separately.
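
To illustrate the second point, here is a minimal sketch of tracking thread state separately, assuming C++11. The names `busy_counter`, `scope`, `thread_added` and `should_grow` are hypothetical helpers invented for this illustration; they are not part of Boost.Asio or Boost.Thread. The idea is that each handler marks its thread as busy for the duration of the call, so the server can tell whether any pool thread is idle.

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical helper (not a Boost class): counts pool threads that are
// currently executing a handler, because io_service does not expose this.
class busy_counter
{
public:
  explicit busy_counter( std::size_t pool_size )
    : pool_size_( pool_size ), busy_( 0 ) {}

  // RAII guard: create one at the top of each handler.  The thread is
  // counted as busy until the guard is destroyed, even on an exception.
  class scope
  {
  public:
    explicit scope( busy_counter& counter ) : counter_( counter )
    {
      ++counter_.busy_;
    }
    ~scope() { --counter_.busy_; }
    scope( const scope& ) = delete;
    scope& operator=( const scope& ) = delete;
  private:
    busy_counter& counter_;
  };

  // Call after a new thread has been added to the pool.
  void thread_added() { ++pool_size_; }

  std::size_t busy() const { return busy_.load(); }
  std::size_t pool_size() const { return pool_size_.load(); }

  // True when every pool thread is busy and the pool may still grow.
  bool should_grow( std::size_t max_threads ) const
  {
    return busy() >= pool_size() && pool_size() < max_threads;
  }

private:
  std::atomic< std::size_t > pool_size_;
  std::atomic< std::size_t > busy_;
};
```

A handler would start with `busy_counter::scope guard( counter );`, and the code that accepts requests could consult `counter.should_grow( max_threads )` before creating a thread. The two counters are read separately, so the answer is only a snapshot; that is usually acceptable for a growth heuristic but is an assumption of this sketch.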

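An alternative is to post a job into the io_service and then check how long it sat in the io_service queue. If the delay between the moment the job became ready to run and the moment it actually ran exceeds a threshold, there are more jobs in the queue than threads servicing it. A benefit of this approach is that the pool-growth logic stays decoupled from the rest of the server.

Here is a way to do this with a deadline_timer.

  • Set the deadline_timer to expire 3 seconds from now.
  • Asynchronously wait on the deadline_timer. The handler becomes ready to run 3 seconds after the deadline_timer was set.
  • In the handler, compare the current time with the time the timer was set to expire. If the handler ran more than 2 seconds late, the io_service queue is backing up, so add a thread to the pool.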
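
The core of the timer-based check is plain time arithmetic: how late did the handler run relative to the timer's expiry, and does that exceed the threshold? The sketch below isolates that arithmetic with std::chrono (C++11) so it can be read without Boost. The function names `queue_delay` and `exceeds_threshold` are hypothetical and introduced only for illustration; the full program in this answer performs the equivalent check inside `on_check` using `deadline_timer::expires_from_now()`.

```cpp
#include <chrono>

// Hypothetical helper: how long a handler waited past its scheduled
// expiry.  Returns zero if the handler ran on time (or early).
inline std::chrono::seconds queue_delay(
  std::chrono::steady_clock::time_point expiry,
  std::chrono::steady_clock::time_point now )
{
  if ( now <= expiry ) return std::chrono::seconds( 0 );
  return std::chrono::duration_cast< std::chrono::seconds >( now - expiry );
}

// Hypothetical helper: the pool should grow only when the delay is
// strictly greater than the threshold, mirroring the comparison
// threshold_seconds_ < wait_in_seconds used in on_check.
inline bool exceeds_threshold( std::chrono::seconds delay,
                               std::chrono::seconds threshold )
{
  return threshold < delay;
}
```

For instance, a handler that runs 7 seconds after its expiry with a 2 second threshold reports a delay of 7 seconds and triggers growth, while a handler that runs on time reports 0 seconds and does not.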

Example:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>

class thread_pool_checker
  : private boost::noncopyable
{
public:

  thread_pool_checker( boost::asio::io_service& io_service,
                       boost::thread_group& threads,
                       unsigned int max_threads,
                       long threshold_seconds,
                       long periodic_seconds )
    : io_service_( io_service ),
      timer_( io_service ),
      threads_( threads ),
      max_threads_( max_threads ),
      threshold_seconds_( threshold_seconds ),
      periodic_seconds_( periodic_seconds )
    {
      schedule_check();
    }

private:

  void schedule_check();
  void on_check( const boost::system::error_code& error );

private:

  boost::asio::io_service&    io_service_;
  boost::asio::deadline_timer timer_;
  boost::thread_group&        threads_;
  unsigned int                max_threads_;
  long                        threshold_seconds_;
  long                        periodic_seconds_;
};

void thread_pool_checker::schedule_check()
{
  // Thread pool is already at max size.
  if ( max_threads_ <= threads_.size() )
  {
    std::cout << "Thread pool has reached its max.  Example will shutdown."
              << std::endl;
    io_service_.stop();
    return;
  }

  // Schedule check to see if pool needs to increase.
  std::cout << "Will check if pool needs to increase in " 
            << periodic_seconds_ << " seconds." << std::endl;
  timer_.expires_from_now( boost::posix_time::seconds( periodic_seconds_ ) );
  timer_.async_wait( 
    boost::bind( &thread_pool_checker::on_check, this,
                 boost::asio::placeholders::error ) );
}

void thread_pool_checker::on_check( const boost::system::error_code& error )
{
  // On error, return early.
  if ( error ) return;

  // Check how long this job was waiting in the service queue.  This
  // returns the expiration time relative to now.  Thus, if it expired
  // 7 seconds ago, then the delta time is -7 seconds.
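  boost::posix_time::time_duration delta = timer_.expires_from_now();
  // total_seconds() covers the whole duration; seconds() would return
  // only the seconds field (0-59) of a longer delay.
  long wait_in_seconds = -static_cast< long >( delta.total_seconds() );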

  // If the time delta is greater than the threshold, then the job
  // remained in the service queue for too long, so increase the
  // thread pool.
  std::cout << "Job job sat in queue for " 
            << wait_in_seconds << " seconds." << std::endl;
  if ( threshold_seconds_ < wait_in_seconds )
  {
    std::cout << "Increasing thread pool." << std::endl;
    threads_.create_thread(
      boost::bind( &boost::asio::io_service::run,
                   &io_service_ ) );
  }

  // Schedule another pool check (this stops the example once the pool
  // has reached its maximum size).
  schedule_check();
}

// Busy work functions.
void busy_work( boost::asio::io_service&,
                unsigned int );

void add_busy_work( boost::asio::io_service& io_service,
                    unsigned int count )
{
  io_service.post(
    boost::bind( busy_work,
                 boost::ref( io_service ),
                 count ) );
}

void busy_work( boost::asio::io_service& io_service,
                unsigned int count )
{
  boost::this_thread::sleep( boost::posix_time::seconds( 5 ) );

  count += 1;

  // When the count is 3, spawn additional busy work.
  if ( 3 == count )
  {
    add_busy_work( io_service, 0 );
  }
  add_busy_work( io_service, count );
}

int main()
{
  // Create io service.
  boost::asio::io_service io_service;

  // Add some busy work to the service.
  add_busy_work( io_service, 0 );

  // Create thread group and thread_pool_checker.
  boost::thread_group threads;
  thread_pool_checker checker( io_service, threads,
                               3,   // Max pool size.
                               2,   // Create thread if job waits for 2 sec.
                               3 ); // Check if pool needs to grow every 3 sec.

  // Start running the io service.
  io_service.run();

  threads.join_all();

  return 0;
}

Output:

Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 7 seconds.
Increasing thread pool.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 0 seconds.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 4 seconds.
Increasing thread pool.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 0 seconds.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 0 seconds.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 0 seconds.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 3 seconds.
Increasing thread pool.
Thread pool has reached its max.  Example will shutdown.