11 #ifndef HIPIPE_CORE_STREAM_BUFFER_HPP
12 #define HIPIPE_CORE_STREAM_BUFFER_HPP
14 #include <hipipe/core/thread.hpp>
16 #include <range/v3/core.hpp>
17 #include <range/v3/view/all.hpp>
18 #include <range/v3/view/view.hpp>
25 namespace hipipe::stream {
27 namespace rgv = ranges::views;
// View type parameterized by an underlying range `Rng`, implemented on top of
// range-v3's `view_facade` (CRTP: the facade is instantiated with this class).
// NOTE(review): the members `rng_` and `n_` that later code reads through
// `rng_->rng_`, `rng.rng_` and `n_` are declared on lines not present in this excerpt.
29 template<
typename Rng>
30 struct buffer_view : ranges::view_facade<buffer_view<Rng>> {
// Befriends range-v3's `range_access` — presumably so the facade machinery can
// reach the non-public cursor hooks of this class; confirm against range-v3 docs.
33 friend ranges::range_access;
// Pointer back to the view this cursor iterates; null when default-initialized.
41 buffer_view<Rng>* rng_ =
nullptr;
// Iterator into the underlying range (value-initialized by default). The fill
// loop copies it into each task it creates.
42 ranges::iterator_t<Rng> it_ = {};
// Element type of the underlying range and an lvalue reference to it.
43 using value_type = ranges::range_value_t<Rng>;
44 using reference_type = value_type&;
// Queue of element computations. Each entry is a shared_future that yields a
// shared_ptr to one element. shared_future is needed (rather than std::future)
// because the const `read()` calls `get()` on the front entry, and only
// shared_future offers a const `get()`.
52 std::deque<std::shared_future<std::shared_ptr<value_type>>> buffer_;
// Guard: proceed only when at least one entry is buffered.
// NOTE(review): the enclosing function header and the guarded statement are on
// lines not present in this excerpt — presumably this drops the front entry; confirm.
56 if (!buffer_.empty()) {
// Keep going while the underlying range is not exhausted and fewer than `n_`
// entries are buffered.
// NOTE(review): the enclosing function header and the statements that submit
// the task, append its future to `buffer_`, and advance `it_` are on lines not
// present in this excerpt.
63 while (it_ != ranges::end(rng_->rng_) && buffer_.size() < n_) {
// The task owns a copy of the current iterator; when run, it dereferences that
// copy and constructs a heap-allocated `value_type` from the result.
64 auto task = [it = it_]() {
return std::make_shared<value_type>(*it); };
// Cursor trait set to true: declares this cursor as single-pass. This matches
// `read()`, which hands out the buffered element as an rvalue to be moved from.
71 using single_pass = std::true_type;
// Constructs a cursor for the view `rng`, positioned at the beginning of the
// view's underlying range. `explicit` prevents implicit view-to-cursor conversion.
// NOTE(review): the other member initializers and the constructor body are on
// lines not present in this excerpt.
75 explicit cursor(buffer_view<Rng>& rng)
77 , it_{ranges::begin(rng.rng_)}
// Returns the element held by the front entry of `buffer_` as an rvalue
// reference, so the caller can move it out. `get()` blocks until the front
// future is ready. The referenced object is owned by the shared_ptr stored in
// that future, so the reference is only usable while the entry stays buffered.
// Precondition (not checked here): `buffer_` is non-empty.
83 value_type&& read()
const
85 return std::move(*buffer_.front().get());
// End-of-range test against the default sentinel: true only when nothing is
// left in `buffer_` AND the underlying iterator has reached the end of the
// underlying range. Both are required because buffered entries may still be
// pending after the underlying range is exhausted.
88 bool equal(ranges::default_sentinel_t)
const
90 return buffer_.empty() && it_ == ranges::end(rng_->rng_);
// Cursor-to-cursor comparison. Both cursors are expected to belong to the same
// view; this is checked with `assert` only (no effect when NDEBUG is defined).
// Cursors compare equal when they have the same buffer limit `n_` and the same
// underlying iterator position; the contents of `buffer_` are not compared.
93 bool equal(
const cursor& that)
const
95 assert(rng_ == that.rng_);
96 return n_ == that.n_ && it_ == that.it_;
// Block until every still-valid future in `buffer_` has finished.
// NOTE(review): the enclosing function header is on a line not present in this
// excerpt — presumably the cursor destructor, so that outstanding tasks do not
// outlive the cursor; confirm.
107 for (
auto& future : buffer_) {
// `valid()` is checked first because waiting on a future with no shared state
// is undefined behavior.
108 if (future.valid()) future.wait();
// Cursor hook: creates a new cursor bound to this view. The cursor constructor
// takes the view by non-const reference.
113 cursor begin_cursor()
115 return cursor{*
this};
// Compiler-generated default constructor.
119 buffer_view() =
default;
// Constructs the view from an underlying range `rng` (taken by value) and a
// count `n`.
// NOTE(review): the member initializers and body are on lines not present in
// this excerpt — presumably they store `rng` and `n` as `rng_` and `n_`, with
// `n` acting as the buffer limit compared against in the fill loop; confirm.
121 buffer_view(Rng rng, std::size_t n)
// Size of the view for const access: forwards to the size of the underlying
// range. Participates in overload resolution only when `const Rng` is a sized
// range.
// NOTE(review): `int dummy = 0` appears to exist only to turn this member into
// a template so that the `requires` clause can be attached via CPP_template;
// confirm against the range-v3 concepts macros.
127 CPP_template(
int dummy = 0)(requires ranges::sized_range<const Rng>)
128 constexpr ranges::range_size_type_t<Rng> size()
const
130 return ranges::size(rng_);
133 CPP_template(
int dummy = 0)(requires ranges::sized_range<const Rng>)
134 constexpr ranges::range_size_type_t<Rng> size()
136 return ranges::size(rng_);
// Befriends range-v3's `view_access` — presumably so the `view<>` adaptor can
// call the `bind` hook below even if it is not public; confirm.
143 friend rgv::view_access;
// Partial-application hook: given the adaptor object `buffer` and a count `n`
// (default: the largest std::size_t), returns a pipeable object that calls
// `buffer(range, n)` on the range it is later given.
// NOTE(review): presumably this is what enables the `rng | buffer(n)` syntax;
// confirm against range-v3's view adaptor documentation.
146 static auto bind(buffer_fn
buffer, std::size_t n = std::numeric_limits<std::size_t>::max())
148 return ranges::make_pipeable(std::bind(
buffer, std::placeholders::_1, n));
// Main overload, enabled for ranges satisfying `forward_range`: wraps `rng` via
// `rgv::all` and returns a `buffer_view` over it with the count `n`.
// `n` defaults to the largest std::size_t value.
152 CPP_template(
typename Rng)(requires ranges::forward_range<Rng>)
153 buffer_view<rgv::all_t<Rng>>
154 operator()(Rng&& rng, std::size_t n = std::numeric_limits<std::size_t>::max())
const
156 return {rgv::all(std::forward<Rng>(rng)), n};
// Diagnostic overload, selected only for ranges that do NOT satisfy
// `forward_range`. It returns nothing; its body is a concept assertion carrying
// a readable message about the unmet requirement.
// NOTE(review): CONCEPT_ASSERT_MSG presumably expands to a static_assert that
// fires when this overload is instantiated; confirm that the macro and the
// call-style `forward_range<Rng>()` spelling match the range-v3 version in use.
160 CPP_template(
typename Rng)(requires !ranges::forward_range<Rng>)
161 void operator()(Rng&&, std::size_t n = 0)
const
163 CONCEPT_ASSERT_MSG(ranges::forward_range<Rng>(),
164 "stream::buffer only works on ranges satisfying the forward_range concept.");
// The `buffer` adaptor object: a `buffer_fn` wrapped in range-v3's `view<>`
// adaptor. Declared `inline` so this header-defined variable has a single
// definition across all translation units that include it.
189 inline rgv::view<buffer_fn>
buffer{};