HiPipe  0.7.0
C++17 data pipeline with Python bindings.
thread.hpp
1 /****************************************************************************
2  * hipipe library
3  * Copyright (c) 2017, Cognexa Solutions s.r.o.
4  * Copyright (c) 2018, Iterait a.s.
5  * Author(s) Filip Matzner
6  *
7  * This file is distributed under the MIT License.
8  * See the accompanying file LICENSE.txt for the complete license agreement.
9  ****************************************************************************/
11 
12 #ifndef HIPIPE_CORE_THREAD_HPP
13 #define HIPIPE_CORE_THREAD_HPP
14 
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>

#include <algorithm>
#include <cmath>
#include <functional>
#include <future>
#include <memory>
#include <optional>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>

// Legacy Library Fundamentals TS header; some standard libraries no longer ship it.
#if __has_include(<experimental/optional>)
#include <experimental/optional>
#endif
23 
24 namespace hipipe {
25 
31 class thread_pool {
32 private:
33  boost::asio::io_service service_;
34  std::experimental::optional<boost::asio::io_service::work> work_{service_};
35  boost::thread_group threads_;
36 
37 public:
38 
42  thread_pool(unsigned n_threads = std::thread::hardware_concurrency())
43  {
44  // always use at least a single thread
45  n_threads = std::max(1u, n_threads);
46  // populate the thread pool
47  for (unsigned i = 0; i < n_threads; ++i) {
48  threads_.create_thread([this]() { return this->service_.run(); });
49  }
50  }
51 
66  template<typename Fun, typename... Args>
67  std::future<std::result_of_t<Fun(Args...)>> enqueue(Fun fun, Args... args)
68  {
69  using Ret = std::result_of_t<Fun(Args...)>;
70  std::packaged_task<Ret(Args...)> task{fun};
71  std::future<Ret> future = task.get_future();
72  // Packaged task is non-copyable, so ASIO's post() method cannot handle it.
73  // So we build a shared_ptr of the task and post a lambda
74  // dereferencing and running the task stored in the pointer.
75  auto shared_task = std::make_shared<std::packaged_task<Ret(Args...)>>(std::move(task));
76  auto shared_args = std::make_shared<std::tuple<Args...>>(std::move(args)...);
77  auto asio_task = [task = std::move(shared_task), args = std::move(shared_args)]() {
78  return std::apply(std::move(*task), std::move(*args));
79  };
80  service_.post(std::move(asio_task));
81  return future;
82  }
83 
87  ~thread_pool()
88  {
89  work_ = std::experimental::nullopt;
90  threads_.join_all();
91  }
92 };
93 
98 static thread_pool global_thread_pool;
99 
100 } // namespace hipipe
101 #endif
hipipe::thread_pool::thread_pool
thread_pool(unsigned n_threads=std::thread::hardware_concurrency())
Definition: thread.hpp:41
hipipe::global_thread_pool
static thread_pool global_thread_pool
Global thread pool object.
Definition: thread.hpp:97
hipipe::thread_pool::~thread_pool
~thread_pool()
Definition: thread.hpp:86
hipipe::thread_pool::enqueue
std::future< std::result_of_t< Fun(Args...)> > enqueue(Fun fun, Args... args)
Definition: thread.hpp:66