HiPipe  0.6.0
C++17 data pipeline with Python bindings.
transform.hpp
1 /****************************************************************************
2  * hipipe library
3  * Copyright (c) 2017, Cognexa Solutions s.r.o.
4  * Copyright (c) 2018, Iterait a.s.
5  * Author(s) Filip Matzner
6  *
7  * This file is distributed under the MIT License.
8  * See the accompanying file LICENSE.txt for the complete license agreement.
9  ****************************************************************************/
10 
11 #pragma once
12 
13 #include <hipipe/build_config.hpp>
14 #include <hipipe/core/stream/stream_t.hpp>
15 #include <hipipe/core/stream/template_arguments.hpp>
16 #include <hipipe/core/utility/ndim.hpp>
17 #include <hipipe/core/utility/random.hpp>
18 #include <hipipe/core/utility/tuple.hpp>
19 
20 #include <range/v3/view/any_view.hpp>
21 #include <range/v3/view/transform.hpp>
22 #include <range/v3/view/zip.hpp>
23 
24 #include <functional>
25 #include <utility>
26 
27 namespace hipipe::stream {
28 
29 // partial transform //
30 
31 namespace detail {
32 
33  // Implementation of partial_transform.
34  template<typename Fun, typename From, typename To>
35  struct partial_transform_impl;
36 
37  template<typename Fun, typename... FromTypes, typename... ToTypes>
38  struct partial_transform_impl<Fun, from_t<FromTypes...>, to_t<ToTypes...>> {
39  Fun fun;
40 
41  batch_t operator()(batch_t source)
42  {
43  // build the view of the selected source columns for the transformer
44  std::tuple<typename FromTypes::data_type&...> slice_view{
45  source.extract<FromTypes>()...
46  };
47  // process the transformer's result and convert it to the requested types
48  static_assert(std::is_invocable_v<Fun&, decltype(slice_view)&&>,
49  "hipipe::stream::partial_transform: "
50  "Cannot apply the given function to the given `from<>` columns.");
51  static_assert(std::is_invocable_r_v<
52  std::tuple<typename ToTypes::data_type...>, Fun&, decltype(slice_view)&&>,
53  "hipipe::stream::partial_transform: "
54  "The function return type does not correspond to the tuple of the "
55  "selected `to<>` columns.");
56  std::tuple<typename ToTypes::data_type...> result =
57  std::invoke(fun, std::move(slice_view));
58  // convert the function results to the corresponding column(s)
59  utility::times_with_index<sizeof...(ToTypes)>([&source, &result](auto i) {
60  using Column = std::tuple_element_t<i, std::tuple<ToTypes...>>;
61  source.insert_or_assign<Column>(std::move(std::get<i>(result)));
62  });
63  return source;
64  }
65  };
66 
67  class partial_transform_fn {
68  private:
69  friend ranges::view::view_access;
70 
71  template <typename From, typename To, typename Fun>
72  static auto bind(partial_transform_fn transformer, From f, To t, Fun fun)
73  {
74  return ranges::make_pipeable(
75  std::bind(transformer, std::placeholders::_1, f, t, std::move(fun)));
76  }
77 
78  public:
79  template <typename... FromTypes, typename... ToTypes, typename Fun>
80  forward_stream_t operator()(
81  forward_stream_t rng, from_t<FromTypes...>, to_t<ToTypes...>, Fun fun) const
82  {
83  static_assert(sizeof...(ToTypes) > 0,
84  "For non-transforming operations, please use stream::for_each.");
85 
86  detail::partial_transform_impl<Fun, from_t<FromTypes...>, to_t<ToTypes...>>
87  trans_fun{std::move(fun)};
88 
89  return ranges::view::transform(std::move(rng), std::move(trans_fun));
90  }
91  };
92 
93 } // namespace detail
94 
95 // Transform a subset of columns in each batch.
96 //
97 // This transformer accepts a function that is applied on the chosen
98 // subset of source columns from the batch. The function is invoked with a
99 // single tuple of references to the data_type of the chosen source columns
100 // and has to return a tuple of data_type of the chosen target columns.
101 //
102 // This transformer is used internally by stream::transform and should not
103 // be used directly by the end user of the library.
104 inline ranges::view::view<detail::partial_transform_fn> partial_transform{};
105 
106 // transform //
107 
108 namespace detail {
109 
110  // Apply fun to each element in tuple of ranges in the given dimension.
111  template<typename Fun, std::size_t Dim, typename From, typename To>
112  struct wrap_fun_for_dim;
113 
114  template<typename Fun, std::size_t Dim, typename... FromTypes, typename... ToTypes>
115  struct wrap_fun_for_dim<Fun, Dim, from_t<FromTypes...>, to_t<ToTypes...>> {
116  Fun fun;
117  using FunRef = decltype(std::ref(fun));
118 
119  utility::maybe_tuple<ToTypes...>
120  operator()(std::tuple<FromTypes&...> tuple_of_ranges)
121  {
122  assert(utility::same_size(tuple_of_ranges));
123  // build the function to be applied
124  wrap_fun_for_dim<FunRef, Dim-1,
125  from_t<ranges::range_value_type_t<FromTypes>...>,
126  to_t<ranges::range_value_type_t<ToTypes>...>>
127  fun_wrapper{std::ref(fun)};
128  // transform
129  auto range_of_tuples =
131  std::apply(ranges::view::zip, std::move(tuple_of_ranges)),
132  std::move(fun_wrapper));
133  return utility::unzip_if<(sizeof...(ToTypes) > 1)>(std::move(range_of_tuples));
134  }
135  };
136 
137  template<typename Fun, typename... FromTypes, typename... ToTypes>
138  struct wrap_fun_for_dim<Fun, 0, from_t<FromTypes...>, to_t<ToTypes...>> {
139  Fun fun;
140 
141  utility::maybe_tuple<ToTypes...>
142  operator()(std::tuple<FromTypes&...> tuple)
143  {
144  static_assert(std::is_invocable_v<Fun&, FromTypes&...>,
145  "hipipe::stream::transform: "
146  "Cannot call the given function on the selected from<> columns.");
147  if constexpr(sizeof...(ToTypes) == 1) {
148  static_assert(std::is_invocable_r_v<
149  ToTypes..., Fun&, FromTypes&...>,
150  "hipipe::stream::transform: "
151  "The function does not return the selected to<> column.");
152  } else {
153  static_assert(std::is_invocable_r_v<
154  std::tuple<ToTypes...>, Fun&, FromTypes&...>,
155  "hipipe::stream::transform: "
156  "The function does not return the tuple of the selected to<> columns.");
157  }
158  return std::apply(fun, std::move(tuple));
159  }
160  };
161 
162 } // namespace detail
163 
// Transform a subset of stream columns, applying the function in dimension Dim.
//
// Parameters, as used by the visible body:
//   f   - from_t<FromColumns...> tag, forwarded to stream::partial_transform.
//   t   - to_t<ToColumns...> tag, forwarded to stream::partial_transform.
//   fun - the user function; it is converted to the type-erased FunT below.
//   d   - dim_t<Dim> tag; never read in the body, it only carries Dim.
// Returns the result of stream::partial_transform(f, t, wrapper).
//
// NOTE(review): the embedded listing numbers jump 186 -> 188 and 193 -> 196,
// i.e., the declarator line (presumably `auto transform(`) and the condition
// of the static_assert are missing from this copy. Restore them from the
// upstream header before compiling.
186 template<typename... FromColumns, typename... ToColumns, typename Fun, int Dim = 1>
188  from_t<FromColumns...> f,
189  to_t<ToColumns...> t,
190  Fun fun,
191  dim_t<Dim> d = dim_t<1>{})
192 {
193  static_assert(
     // NOTE(review): the asserted condition (listing lines 194-195) is missing here.
196  "hipipe::stream::transform: The dimension in which to apply the operation needs"
197  " to be at most the lowest dimension of all the from<> and to<> columns.");
198 
199  // a bit of function type erasure to speed up compilation
     // FunT takes references to ndim_type_t<data_type, Dim> of every source
     // column and returns maybe_tuple of the same for every target column.
200  using FunT = std::function<
201  utility::maybe_tuple<utility::ndim_type_t<typename ToColumns::data_type, Dim>...>
202  (utility::ndim_type_t<typename FromColumns::data_type, Dim>&...)>;
203  // wrap the function to be applied in the appropriate dimension
     // the wrapper itself operates on the full column data types
204  detail::wrap_fun_for_dim<
205  FunT, Dim,
206  from_t<typename FromColumns::data_type...>,
207  to_t<typename ToColumns::data_type...>>
208  fun_wrapper{std::move(fun)};
209 
     // the per-batch column extraction and assignment is done by partial_transform
210  return stream::partial_transform(f, t, std::move(fun_wrapper));
211 }
212 
213 // conditional transform //
214 
215 namespace detail {
216 
217  // wrap the function to be applied only on if the first argument evaluates to true
218  template<typename Fun, typename FromIdxs, typename ToIdxs, typename From, typename To>
219  struct wrap_fun_with_cond;
220 
221  template<typename Fun, std::size_t... FromIdxs, std::size_t... ToIdxs,
222  typename CondCol, typename... Cols, typename... ToTypes>
223  struct wrap_fun_with_cond<Fun,
224  std::index_sequence<FromIdxs...>,
225  std::index_sequence<ToIdxs...>,
226  from_t<CondCol, Cols...>, to_t<ToTypes...>> {
227  Fun fun;
228 
229  utility::maybe_tuple<ToTypes...> operator()(CondCol& cond, Cols&... cols)
230  {
231  // make a tuple of all arguments, except for the condition
232  std::tuple<Cols&...> args_view{cols...};
233  // apply the function if the condition is true
234  if (cond) {
235  // the function is applied only on a subset of the arguments
236  // representing FromColumns
237  static_assert(std::is_invocable_v<Fun&,
238  std::tuple_element_t<FromIdxs, decltype(args_view)>...>,
239  "hipipe::stream::conditional_transform: "
240  "Cannot apply the given function to the given `from<>` columns.");
241  static_assert(std::is_invocable_r_v<
242  std::tuple<ToTypes...>, Fun&,
243  std::tuple_element_t<FromIdxs, decltype(args_view)>...>,
244  "hipipe::stream::conditional_transform: "
245  "The function return type does not correspond to the selected `to<>` columns.");
246  return std::invoke(fun, std::get<FromIdxs>(args_view)...);
247  }
248  // return the original arguments if the condition is false
249  // only a subset of the arguments representing ToColumns is returned
250  // note: We can force std::move in here, because
251  // we are only copying data to themselves.
252  return {std::move(std::get<ToIdxs>(args_view))...};
253  }
254  };
255 
256 } // namespace detail
257 
// Conditional transform of a subset of stream columns.
//
// The user function is wrapped in detail::wrap_fun_with_cond, which calls it
// only where the condition value is true and otherwise returns the current
// values of the target columns. The wrapper is then passed to the
// dimension-aware stream::transform together with the column list
// <CondColumn, FromColumns..., ToColumns...>.
//
// Parameters, as used by the visible body:
//   f, c - from_t<>/cond_t<> tags; not read in the visible lines, they only
//          carry the column types.
//   t    - to_t<ToColumns...> tag, forwarded to stream::transform.
//   fun  - the user function; it is converted to the type-erased FunT below.
//   d    - dim_t<Dim> tag, forwarded to stream::transform.
//
// NOTE(review): the embedded listing numbers skip 314, 326 and 329-331, i.e.,
// the declarator line (presumably `auto transform(`), the `ToIdxs` alias used
// below and the condition of the static_assert are missing from this copy.
// Restore them from the upstream header before compiling.
308 template<
309  typename... FromColumns,
310  typename... ToColumns,
311  typename CondColumn,
312  typename Fun,
313  int Dim = 1>
315  from_t<FromColumns...> f,
316  to_t<ToColumns...> t,
317  cond_t<CondColumn> c,
318  Fun fun,
319  dim_t<Dim> d = dim_t<1>{})
320 {
321  // make index sequences for source and target columns when they
322  // are concatenated in a single tuple
323  constexpr std::size_t n_from = sizeof...(FromColumns);
324  constexpr std::size_t n_to = sizeof...(ToColumns);
325  using FromIdxs = std::make_index_sequence<n_from>;
     // NOTE(review): `ToIdxs` (listing line 326) is not defined in this copy;
     // presumably an index sequence of n_to indices offset by n_from - TODO confirm.
327 
328  static_assert(
     // NOTE(review): the asserted condition (listing lines 329-331) is missing here.
332  "hipipe::stream::conditional_transform: The dimension in which to apply the operation needs"
333  " to be at most the lowest dimension of all the from<>, to<> and cond<> columns.");
334 
335  // a bit of function type erasure to speed up compilation
336  using FunT = std::function<
337  utility::maybe_tuple<utility::ndim_type_t<typename ToColumns::data_type, Dim>...>
338  (utility::ndim_type_t<typename FromColumns::data_type, Dim>&...)>;
339  // wrap the function to be applied in the appropriate dimension using the condition column
     // the wrapper's argument list is: condition, source columns, target columns
340  detail::wrap_fun_with_cond<
341  FunT, FromIdxs, ToIdxs,
342  from_t<utility::ndim_type_t<typename CondColumn::data_type, Dim>,
343  utility::ndim_type_t<typename FromColumns::data_type, Dim>...,
344  utility::ndim_type_t<typename ToColumns::data_type, Dim>...>,
345  to_t<utility::ndim_type_t<typename ToColumns::data_type, Dim>...>>
346  cond_fun{std::move(fun)};
347 
348  // transform from both, FromColumns and ToColumns into ToColumns
349  // the wrapper function takes care of extracting the parameters for the original function
350  return stream::transform(from_t<CondColumn, FromColumns..., ToColumns...>{},
351  t, std::move(cond_fun), d);
352 }
353 
354 // probabilistic transform //
355 
356 namespace detail {
357 
358  // wrap the function to be an identity if the dice roll fails
359  template<typename Fun, typename Prng,
360  typename FromIdxs, typename ToIdxs,
361  typename From, typename To>
362  struct wrap_fun_with_prob;
363 
364  template<typename Fun, typename Prng,
365  std::size_t... FromIdxs, std::size_t... ToIdxs,
366  typename... FromTypes, typename... ToTypes>
367  struct wrap_fun_with_prob<Fun, Prng,
368  std::index_sequence<FromIdxs...>,
369  std::index_sequence<ToIdxs...>,
370  from_t<FromTypes...>, to_t<ToTypes...>> {
371  Fun fun;
372  std::reference_wrapper<Prng> prng;
373  const double prob;
374 
375  utility::maybe_tuple<ToTypes...> operator()(FromTypes&... cols)
376  {
377  assert(prob >= 0. && prob <= 1.);
378  std::uniform_real_distribution<> dis{0, 1};
379  // make a tuple of all arguments
380  std::tuple<FromTypes&...> args_view{cols...};
381  // apply the function if the dice roll succeeds
382  if (prob == 1. || (prob > 0. && dis(prng.get()) < prob)) {
383  // the function is applied only on a subset of the arguments
384  // representing FromColumns
385  static_assert(std::is_invocable_v<Fun&,
386  std::tuple_element_t<FromIdxs, decltype(args_view)>...>,
387  "hipipe::stream::probabilistic_transform: "
388  "Cannot apply the given function to the given `from<>` columns.");
389  static_assert(std::is_invocable_r_v<
390  std::tuple<ToTypes...>, Fun&,
391  std::tuple_element_t<FromIdxs, decltype(args_view)>...>,
392  "hipipe::stream::probabilistic_transform: "
393  "The function return type does not correspond to the selected `to<>` columns.");
394  return std::invoke(fun, std::get<FromIdxs>(args_view)...);
395  }
396  // return the original arguments if the dice roll fails
397  // only a subset of the arguments representing ToColumns is returned
398  // note: We can force std::move in here, because
399  // we are only copying data to themselves.
400  return {std::move(std::get<ToIdxs>(args_view))...};
401  }
402  };
403 
404 } // namespace detail
405 
// Probabilistic transform of a subset of stream columns.
//
// The user function is wrapped in detail::wrap_fun_with_prob, which calls it
// with probability `prob` and otherwise returns the current values of the
// target columns. The wrapper is then passed to the dimension-aware
// stream::transform together with the column list <FromColumns..., ToColumns...>.
//
// Parameters, as used by the visible body:
//   f    - from_t<> tag; not read in the visible lines, it only carries the
//          column types.
//   t    - to_t<ToColumns...> tag, forwarded to stream::transform.
//   prob - probability of applying `fun`; the wrapper asserts 0 <= prob <= 1.
//   fun  - the user function; it is converted to the type-erased FunT below.
//   prng - random generator; the wrapper stores only a reference to it, so it
//          has to outlive the returned stream.
//   d    - dim_t<Dim> tag, forwarded to stream::transform.
//
// NOTE(review): the embedded listing numbers skip 443, 456 and 459-460, i.e.,
// the declarator line (the cross-reference index at the end of this listing
// gives `auto transform(` for transform.hpp:443), the `ToIdxs` alias used
// below and the condition of the static_assert are missing from this copy.
// Restore them from the upstream header before compiling.
437 template<
438  typename... FromColumns,
439  typename... ToColumns,
440  typename Fun,
441  typename Prng = std::mt19937,
442  int Dim = 1>
444  from_t<FromColumns...> f,
445  to_t<ToColumns...> t,
446  double prob,
447  Fun fun,
448  Prng& prng = utility::random_generator,
449  dim_t<Dim> d = dim_t<1>{})
450 {
451  // make index sequences for source and target columns when they
452  // are concatenated in a single tuple
453  constexpr std::size_t n_from = sizeof...(FromColumns);
454  constexpr std::size_t n_to = sizeof...(ToColumns);
455  using FromIdxs = std::make_index_sequence<n_from>;
     // NOTE(review): `ToIdxs` (listing line 456) is not defined in this copy;
     // presumably an index sequence of n_to indices offset by n_from - TODO confirm.
457 
458  static_assert(
     // NOTE(review): the asserted condition (listing lines 459-460) is missing here.
461  "hipipe::stream::probabilistic_transform: The dimension in which to apply the operation "
462  " needs to be at most the lowest dimension of all the from<> and to<> columns.");
463 
464  // a bit of function type erasure to speed up compilation
465  using FunT = std::function<
466  utility::maybe_tuple<utility::ndim_type_t<typename ToColumns::data_type, Dim>...>
467  (utility::ndim_type_t<typename FromColumns::data_type, Dim>&...)>;
468  // wrap the function to be applied in the appropriate dimension with the given probability
     // the wrapper's argument list is: source columns, target columns
469  detail::wrap_fun_with_prob<
470  FunT, Prng, FromIdxs, ToIdxs,
471  from_t<utility::ndim_type_t<typename FromColumns::data_type, Dim>...,
472  utility::ndim_type_t<typename ToColumns::data_type, Dim>...>,
473  to_t<utility::ndim_type_t<typename ToColumns::data_type, Dim>...>>
474  prob_fun{std::move(fun), prng, prob};
475 
476  // transform from both, FromColumns and ToColumns into ToColumns
477  // the wrapper function takes care of extracting the parameters for the original function
478  return stream::transform(from_t<FromColumns..., ToColumns...>{}, t, std::move(prob_fun), d);
479 }
480 
481 } // namespace hipipe::stream
auto transform(from_t< FromColumns... > f, to_t< ToColumns... > t, double prob, Fun fun, Prng &prng=utility::random_generator, dim_t< Dim > d=dim_t< 1 >{})
Probabilistic transform of a subset of hipipe columns.
Definition: transform.hpp:443
ranges::any_view< batch_t, ranges::category::forward > forward_stream_t
The stream itself, i.e., a range of batches.
Definition: stream_t.hpp:24
decltype(auto) unzip_if(RangeT &&range)
Unzips a range of tuples to a tuple of ranges if a constexpr condition holds.
Definition: tuple.hpp:317
STL namespace.
Gets the number of dimensions of a multidimensional range.
Definition: ndim.hpp:47
decltype(plus< Offset >(std::make_index_sequence< N >{})) make_offset_index_sequence
Make std::index_sequence with the given offset.
Definition: tuple.hpp:91
Definition: ndim.hpp:144
constexpr Fun times_with_index(Fun &&fun)
Repeat a function N times in compile time.
Definition: tuple.hpp:403
static thread_local std::mt19937 random_generator
Thread local pseudo-random number generator seeded by std::random_device.
Definition: random.hpp:21
Column::data_type & extract()
Extract a reference to the stored data of the given column.
Definition: batch_t.hpp:93
bool same_size(Tuple &&rngs)
Utility function which checks that all the ranges in a tuple have the same size.
Definition: ndim.hpp:732