HiPipe  0.7.0
C++17 data pipeline with Python bindings.
rebatch.hpp
1 /****************************************************************************
2  * hipipe library
3  * Copyright (c) 2017, Cognexa Solutions s.r.o.
4  * Copyright (c) 2018, Iterait a.s.
5  * Author(s) Filip Matzner
6  *
7  * This file is distributed under the MIT License.
8  * See the accompanying file LICENSE.txt for the complete license agreement.
9  ****************************************************************************/
10 
11 #pragma once
12 
#include <hipipe/core/stream/stream_t.hpp>

#include <range/v3/core.hpp>
#include <range/v3/view/all.hpp>
#include <range/v3/view/view.hpp>

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
21 namespace hipipe::stream {
22 
23 namespace rgv = ranges::views;
24 
25 template <typename Rng>
26 struct rebatch_view : ranges::view_facade<rebatch_view<Rng>> {
27 private:
29  friend ranges::range_access;
31  Rng rng_;
32  std::size_t n_;
33 
34  struct cursor {
35  private:
36  rebatch_view<Rng>* rng_ = nullptr;
37  ranges::iterator_t<Rng> it_ = {};
38 
39  // the batch into which we accumulate the data
40  // the batch will be a pointer to allow moving from it in const functions
41  std::shared_ptr<batch_t> batch_;
42 
43  // the subbatch of the original range
44  std::shared_ptr<batch_t> subbatch_;
45 
46  // whether the underlying range is at the end of iteration
47  bool done_ = false;
48 
49  // find the first non-empty subbatch and return if successful
50  bool find_next()
51  {
52  while (subbatch_->batch_size() == 0) {
53  if (it_ == ranges::end(rng_->rng_) || ++it_ == ranges::end(rng_->rng_)) {
54  return false;
55  }
56  subbatch_ = std::make_shared<batch_t>(*it_);
57  }
58  return true;
59  }
60 
61  // fill the batch_ with the elements from the current subbatch_
62  void fill_batch()
63  {
64  do {
65  assert(batch_->batch_size() < rng_->n_);
66  std::size_t to_take =
67  std::min(rng_->n_ - batch_->batch_size(), subbatch_->batch_size());
68  batch_->push_back(subbatch_->take(to_take));
69  } while (batch_->batch_size() < rng_->n_ && find_next());
70  }
71 
72  public:
73  using single_pass = std::true_type;
74 
75  cursor() = default;
76 
77  explicit cursor(rebatch_view<Rng>& rng)
78  : rng_{&rng}
79  , it_{ranges::begin(rng_->rng_)}
80  {
81  // do nothing if the subrange is empty
82  if (it_ == ranges::end(rng_->rng_)) {
83  done_ = true;
84  } else {
85  subbatch_ = std::make_shared<batch_t>(*it_);
86  next();
87  }
88  }
89 
90  batch_t&& read() const
91  {
92  return std::move(*batch_);
93  }
94 
95  bool equal(ranges::default_sentinel_t) const
96  {
97  return done_;
98  }
99 
100  bool equal(const cursor& that) const
101  {
102  assert(rng_ == that.rng_);
103  return it_ == that.it_ && subbatch_->batch_size() == that.subbatch_->batch_size();
104  }
105 
106  void next()
107  {
108  batch_ = std::make_shared<batch_t>();
109  if (find_next()) fill_batch();
110  else done_ = true;
111  }
112  }; // struct cursor
113 
114  cursor begin_cursor() { return cursor{*this}; }
115 
116 public:
117  rebatch_view() = default;
118  rebatch_view(Rng rng, std::size_t n)
119  : rng_{rng}
120  , n_{n}
121  {
122  if (n_ <= 0) {
123  throw std::invalid_argument{"hipipe::stream::rebatch:"
124  " The new batch size " + std::to_string(n_) + " is not strictly positive."};
125  }
126  }
127 }; // class rebatch_view
128 
129 class rebatch_fn {
130 private:
132  friend rgv::view_access;
134 
135  static auto bind(rebatch_fn rebatch, std::size_t n)
136  {
137  return ranges::make_pipeable(std::bind(rebatch, std::placeholders::_1, n));
138  }
139 
140 public:
141  CPP_template(class Rng)(requires ranges::input_range<Rng>)
142  rebatch_view<rgv::all_t<Rng>> operator()(Rng&& rng, std::size_t n) const
143  {
144  return {rgv::all(std::forward<Rng>(rng)), n};
145  }
146 }; // class rebatch_fn
147 
148 
169 inline rgv::view<rebatch_fn> rebatch{};
170 
171 } // namespace hipipe::stream
hipipe::utility::to_string
std::string to_string(const T &value)
Convert the given type to std::string.
Definition: string.hpp:90
hipipe::utility::CPP_template
CPP_template(class Rng)(requires ranges
Unzips a range of tuples to a tuple of std::vectors.
Definition: tuple.hpp:255
hipipe::stream::rebatch
rgv::view< rebatch_fn > rebatch
Accumulate the stream and yield batches of a different size.
Definition: rebatch.hpp:175