HiPipe 0.6.0
C++17 data pipeline with Python bindings.
rebatch.hpp
1 /****************************************************************************
2  * hipipe library
3  * Copyright (c) 2017, Cognexa Solutions s.r.o.
4  * Copyright (c) 2018, Iterait a.s.
5  * Author(s) Filip Matzner
6  *
7  * This file is distributed under the MIT License.
8  * See the accompanying file LICENSE.txt for the complete license agreement.
9  ****************************************************************************/
10 
11 #pragma once
12 
#include <hipipe/core/stream/stream_t.hpp>

#include <range/v3/core.hpp>
#include <range/v3/view/all.hpp>
#include <range/v3/view/view.hpp>

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <utility>
20 
21 namespace hipipe::stream {
22 
23 
24 template <typename Rng>
25 struct rebatch_view : ranges::view_facade<rebatch_view<Rng>> {
26 private:
28  friend ranges::range_access;
30  Rng rng_;
31  std::size_t n_;
32 
33  struct cursor {
34  private:
35  rebatch_view<Rng>* rng_ = nullptr;
36  ranges::iterator_t<Rng> it_ = {};
37 
38  // the batch into which we accumulate the data
39  // the batch will be a pointer to allow moving from it in const functions
40  std::shared_ptr<batch_t> batch_;
41 
42  // the subbatch of the original range
43  std::shared_ptr<batch_t> subbatch_;
44 
45  // whether the underlying range is at the end of iteration
46  bool done_ = false;
47 
48  // find the first non-empty subbatch and return if successful
49  bool find_next()
50  {
51  while (subbatch_->batch_size() == 0) {
52  if (it_ == ranges::end(rng_->rng_) || ++it_ == ranges::end(rng_->rng_)) {
53  return false;
54  }
55  subbatch_ = std::make_shared<batch_t>(*it_);
56  }
57  return true;
58  }
59 
60  // fill the batch_ with the elements from the current subbatch_
61  void fill_batch()
62  {
63  do {
64  assert(batch_->batch_size() < rng_->n_);
65  std::size_t to_take =
66  std::min(rng_->n_ - batch_->batch_size(), subbatch_->batch_size());
67  batch_->push_back(subbatch_->take(to_take));
68  } while (batch_->batch_size() < rng_->n_ && find_next());
69  }
70 
71  public:
72  using single_pass = std::true_type;
73 
74  cursor() = default;
75 
76  explicit cursor(rebatch_view<Rng>& rng)
77  : rng_{&rng}
78  , it_{ranges::begin(rng_->rng_)}
79  {
80  // do nothing if the subrange is empty
81  if (it_ == ranges::end(rng_->rng_)) {
82  done_ = true;
83  } else {
84  subbatch_ = std::make_shared<batch_t>(*it_);
85  next();
86  }
87  }
88 
89  batch_t&& read() const
90  {
91  return std::move(*batch_);
92  }
93 
94  bool equal(ranges::default_sentinel) const
95  {
96  return done_;
97  }
98 
99  bool equal(const cursor& that) const
100  {
101  assert(rng_ == that.rng_);
102  return it_ == that.it_ && subbatch_->batch_size() == that.subbatch_->batch_size();
103  }
104 
105  void next()
106  {
107  batch_ = std::make_shared<batch_t>();
108  if (find_next()) fill_batch();
109  else done_ = true;
110  }
111  }; // struct cursor
112 
113  cursor begin_cursor() { return cursor{*this}; }
114 
115 public:
116  rebatch_view() = default;
117  rebatch_view(Rng rng, std::size_t n)
118  : rng_{rng}
119  , n_{n}
120  {
121  if (n_ <= 0) {
122  throw std::invalid_argument{"hipipe::stream::rebatch:"
123  " The new batch size " + std::to_string(n_) + " is not strictly positive."};
124  }
125  }
126 }; // class rebatch_view
127 
128 class rebatch_fn {
129 private:
131  friend ranges::view::view_access;
133 
134  static auto bind(rebatch_fn rebatch, std::size_t n)
135  {
136  return ranges::make_pipeable(std::bind(rebatch, std::placeholders::_1, n));
137  }
138 
139 public:
140  template <typename Rng, CONCEPT_REQUIRES_(ranges::InputRange<Rng>())>
141  rebatch_view<ranges::view::all_t<Rng>> operator()(Rng&& rng, std::size_t n) const
142  {
143  return {ranges::view::all(std::forward<Rng>(rng)), n};
144  }
145 }; // class rebatch_fn
146 
147 
168 inline ranges::view::view<rebatch_fn> rebatch{};
169 
170 } // namespace hipipe::stream
ranges::view::view< rebatch_fn > rebatch
Accumulate the stream and yield batches of a different size.
Definition: rebatch.hpp:168
std::string to_string(const T &value)
Convert the given type to std::string.
Definition: string.hpp:91