HiPipe  0.7.0
C++17 data pipeline with Python bindings.
buffer.hpp
1 /****************************************************************************
2  * hipipe library
3  * Copyright (c) 2017, Cognexa Solutions s.r.o.
4  * Copyright (c) 2018, Iterait a.s.
5  * Author(s) Filip Matzner
6  *
7  * This file is distributed under the MIT License.
8  * See the accompanying file LICENSE.txt for the complete license agreement.
9  ****************************************************************************/
10 
11 #ifndef HIPIPE_CORE_STREAM_BUFFER_HPP
12 #define HIPIPE_CORE_STREAM_BUFFER_HPP
13 
#include <hipipe/core/thread.hpp>

#include <range/v3/core.hpp>
#include <range/v3/view/all.hpp>
#include <range/v3/view/view.hpp>

#include <cassert>
#include <climits>
#include <cstddef>
#include <deque>
#include <functional>
#include <future>
#include <limits>
#include <memory>
#include <type_traits>
#include <utility>
24 
25 namespace hipipe::stream {
26 
27 namespace rgv = ranges::views;
28 
29 template<typename Rng>
30 struct buffer_view : ranges::view_facade<buffer_view<Rng>> {
31 private:
33  friend ranges::range_access;
35 
36  Rng rng_;
37  std::size_t n_;
38 
39  struct cursor {
40  private:
41  buffer_view<Rng>* rng_ = nullptr;
42  ranges::iterator_t<Rng> it_ = {};
43  using value_type = ranges::range_value_t<Rng>;
44  using reference_type = value_type&;
45 
46  std::size_t n_;
47 
48  // std::shared_future only allows retrieving the shared state via
49  // a const reference. Therefore, we store the computed results
50  // on heap (shared_ptr) and return references to those objects
51  // (non-const references).
52  std::deque<std::shared_future<std::shared_ptr<value_type>>> buffer_;
53 
54  void pop_buffer()
55  {
56  if (!buffer_.empty()) {
57  buffer_.pop_front();
58  }
59  }
60 
61  void fill_buffer()
62  {
63  while (it_ != ranges::end(rng_->rng_) && buffer_.size() < n_) {
64  auto task = [it = it_]() { return std::make_shared<value_type>(*it); };
65  buffer_.emplace_back(global_thread_pool.enqueue(std::move(task)));
66  ++it_;
67  }
68  }
69 
70  public:
71  using single_pass = std::true_type;
72 
73  cursor() = default;
74 
75  explicit cursor(buffer_view<Rng>& rng)
76  : rng_{&rng}
77  , it_{ranges::begin(rng.rng_)}
78  , n_{rng.n_}
79  {
80  fill_buffer();
81  }
82 
83  value_type&& read() const
84  {
85  return std::move(*buffer_.front().get());
86  }
87 
88  bool equal(ranges::default_sentinel_t) const
89  {
90  return buffer_.empty() && it_ == ranges::end(rng_->rng_);
91  }
92 
93  bool equal(const cursor& that) const
94  {
95  assert(rng_ == that.rng_);
96  return n_ == that.n_ && it_ == that.it_;
97  }
98 
99  void next()
100  {
101  pop_buffer();
102  fill_buffer();
103  }
104 
105  ~cursor()
106  {
107  for (auto& future : buffer_) {
108  if (future.valid()) future.wait();
109  }
110  }
111  }; // class buffer_view
112 
113  cursor begin_cursor()
114  {
115  return cursor{*this};
116  }
117 
118 public:
119  buffer_view() = default;
120 
121  buffer_view(Rng rng, std::size_t n)
122  : rng_{rng}
123  , n_{n}
124  {
125  }
126 
127  CPP_template(int dummy = 0)(requires ranges::sized_range<const Rng>)
128  constexpr ranges::range_size_type_t<Rng> size() const
129  {
130  return ranges::size(rng_);
131  }
132 
133  CPP_template(int dummy = 0)(requires ranges::sized_range<const Rng>)
134  constexpr ranges::range_size_type_t<Rng> size()
135  {
136  return ranges::size(rng_);
137  }
138 };
139 
140 class buffer_fn {
141 private:
143  friend rgv::view_access;
145 
146  static auto bind(buffer_fn buffer, std::size_t n = std::numeric_limits<std::size_t>::max())
147  {
148  return ranges::make_pipeable(std::bind(buffer, std::placeholders::_1, n));
149  }
150 
151 public:
152  CPP_template(typename Rng)(requires ranges::forward_range<Rng>)
153  buffer_view<rgv::all_t<Rng>>
154  operator()(Rng&& rng, std::size_t n = std::numeric_limits<std::size_t>::max()) const
155  {
156  return {rgv::all(std::forward<Rng>(rng)), n};
157  }
158 
160  CPP_template(typename Rng)(requires !ranges::forward_range<Rng>)
161  void operator()(Rng&&, std::size_t n = 0) const
162  {
163  CONCEPT_ASSERT_MSG(ranges::forward_range<Rng>(),
164  "stream::buffer only works on ranges satisfying the forward_range concept.");
165  }
167 };
168 
189 inline rgv::view<buffer_fn> buffer{};
190 
191 } // end namespace hipipe::stream
192 #endif
hipipe::global_thread_pool
static thread_pool global_thread_pool
Global thread pool object.
Definition: thread.hpp:97
hipipe::utility::CPP_template
CPP_template(class Rng)(requires ranges
Unzips a range of tuples to a tuple of std::vectors.
Definition: tuple.hpp:255
hipipe::stream::buffer
rgv::view< buffer_fn > buffer
Asynchronously buffers the given range.
Definition: buffer.hpp:195
hipipe::thread_pool::enqueue
std::future< std::result_of_t< Fun(Args...)> > enqueue(Fun fun, Args... args)
Definition: thread.hpp:66