HiPipe  0.6.0
C++17 data pipeline with Python bindings.
buffer.hpp
1 /****************************************************************************
2  * hipipe library
3  * Copyright (c) 2017, Cognexa Solutions s.r.o.
4  * Copyright (c) 2018, Iterait a.s.
5  * Author(s) Filip Matzner
6  *
7  * This file is distributed under the MIT License.
8  * See the accompanying file LICENSE.txt for the complete license agreement.
9  ****************************************************************************/
10 
11 #ifndef HIPIPE_CORE_STREAM_BUFFER_HPP
12 #define HIPIPE_CORE_STREAM_BUFFER_HPP
13 
#include <hipipe/core/thread.hpp>

#include <range/v3/core.hpp>
#include <range/v3/view/all.hpp>
#include <range/v3/view/view.hpp>

#include <cassert>
#include <climits>
#include <cstddef>
#include <deque>
#include <functional>
#include <future>
#include <limits>
#include <memory>
#include <type_traits>
#include <utility>
24 
25 namespace hipipe::stream {
26 
27 
28 template<typename Rng>
29 struct buffer_view : ranges::view_facade<buffer_view<Rng>> {
30 private:
32  friend ranges::range_access;
34 
35  Rng rng_;
36  std::size_t n_;
37 
38  struct cursor {
39  private:
40  buffer_view<Rng>* rng_ = nullptr;
41  ranges::iterator_t<Rng> it_ = {};
42  using value_type = ranges::range_value_type_t<Rng>;
43  using reference_type = value_type&;
44 
45  std::size_t n_;
46 
47  // std::shared_future only allows retrieving the shared state via
48  // a const reference. Therefore, we store the computed results
49  // on heap (shared_ptr) and return references to those objects
50  // (non-const references).
51  std::deque<std::shared_future<std::shared_ptr<value_type>>> buffer_;
52 
53  void pop_buffer()
54  {
55  if (!buffer_.empty()) {
56  buffer_.pop_front();
57  }
58  }
59 
60  void fill_buffer()
61  {
62  while (it_ != ranges::end(rng_->rng_) && buffer_.size() < n_) {
63  auto task = [it = it_]() { return std::make_shared<value_type>(*it); };
64  buffer_.emplace_back(global_thread_pool.enqueue(std::move(task)));
65  ++it_;
66  }
67  }
68 
69  public:
70  using single_pass = std::true_type;
71 
72  cursor() = default;
73 
74  explicit cursor(buffer_view<Rng>& rng)
75  : rng_{&rng}
76  , it_{ranges::begin(rng.rng_)}
77  , n_{rng.n_}
78  {
79  fill_buffer();
80  }
81 
82  value_type&& read() const
83  {
84  return std::move(*buffer_.front().get());
85  }
86 
87  bool equal(ranges::default_sentinel) const
88  {
89  return buffer_.empty() && it_ == ranges::end(rng_->rng_);
90  }
91 
92  bool equal(const cursor& that) const
93  {
94  assert(rng_ == that.rng_);
95  return n_ == that.n_ && it_ == that.it_;
96  }
97 
98  void next()
99  {
100  pop_buffer();
101  fill_buffer();
102  }
103  }; // class buffer_view
104 
105  cursor begin_cursor()
106  {
107  return cursor{*this};
108  }
109 
110 public:
111  buffer_view() = default;
112 
113  buffer_view(Rng rng, std::size_t n)
114  : rng_{rng}
115  , n_{n}
116  {
117  }
118 
119  CONCEPT_REQUIRES(ranges::SizedRange<Rng const>())
120  constexpr ranges::range_size_type_t<Rng> size() const
121  {
122  return ranges::size(rng_);
123  }
124 
125  CONCEPT_REQUIRES(ranges::SizedRange<Rng>())
126  constexpr ranges::range_size_type_t<Rng> size()
127  {
128  return ranges::size(rng_);
129  }
130 };
131 
132 class buffer_fn {
133 private:
135  friend ranges::view::view_access;
137 
138  static auto bind(buffer_fn buffer, std::size_t n = std::numeric_limits<std::size_t>::max())
139  {
140  return ranges::make_pipeable(std::bind(buffer, std::placeholders::_1, n));
141  }
142 
143 public:
144  template<typename Rng, CONCEPT_REQUIRES_(ranges::ForwardRange<Rng>())>
145  buffer_view<ranges::view::all_t<Rng>>
146  operator()(Rng&& rng, std::size_t n = std::numeric_limits<std::size_t>::max()) const
147  {
148  return {ranges::view::all(std::forward<Rng>(rng)), n};
149  }
150 
152  template<typename Rng, CONCEPT_REQUIRES_(!ranges::ForwardRange<Rng>())>
153  void operator()(Rng&&, std::size_t n = 0) const
154  {
155  CONCEPT_ASSERT_MSG(ranges::ForwardRange<Rng>(),
156  "stream::buffer only works on ranges satisfying the ForwardRange concept.");
157  }
159 };
160 
181 inline ranges::view::view<buffer_fn> buffer{};
182 
183 } // end namespace hipipe::stream
184 #endif
std::future< std::result_of_t< Fun(Args...)> > enqueue(Fun fun, Args... args)
Definition: thread.hpp:67
ranges::view::view< buffer_fn > buffer
Asynchronously buffers the given range.
Definition: buffer.hpp:181
static thread_pool global_thread_pool
Global thread pool object.
Definition: thread.hpp:98