HiPipe  0.7.0
C++17 data pipeline with Python bindings.
transform.hpp
1 /****************************************************************************
2  * hipipe library
3  * Copyright (c) 2017, Cognexa Solutions s.r.o.
4  * Copyright (c) 2018, Iterait a.s.
5  * Author(s) Filip Matzner
6  *
7  * This file is distributed under the MIT License.
8  * See the accompanying file LICENSE.txt for the complete license agreement.
9  ****************************************************************************/
10 
11 #pragma once
12 
13 #include <hipipe/build_config.hpp>
14 #include <hipipe/core/stream/stream_t.hpp>
15 #include <hipipe/core/stream/template_arguments.hpp>
16 #include <hipipe/core/utility/ndim.hpp>
17 #include <hipipe/core/utility/random.hpp>
18 #include <hipipe/core/utility/tuple.hpp>
19 
20 #include <range/v3/range/conversion.hpp>
21 #include <range/v3/view/any_view.hpp>
22 #include <range/v3/view/transform.hpp>
23 #include <range/v3/view/zip.hpp>
24 
25 #include <functional>
26 #include <utility>
27 
28 namespace hipipe::stream {
29 
30 namespace rgv = ranges::views;
31 
32 // partial transform //
33 
34 namespace detail {
35 
36  // Implementation of partial_transform.
37  template<typename Fun, typename From, typename To>
38  struct partial_transform_impl;
39 
40  template<typename Fun, typename... FromTypes, typename... ToTypes>
41  struct partial_transform_impl<Fun, from_t<FromTypes...>, to_t<ToTypes...>> {
42  Fun fun;
43 
44  batch_t operator()(batch_t source)
45  {
46  // build the view of the selected source columns for the transformer
47  std::tuple<typename FromTypes::data_type&...> slice_view{
48  source.extract<FromTypes>()...
49  };
50  // process the transformer's result and convert it to the requested types
51  static_assert(std::is_invocable_v<Fun&, decltype(slice_view)&&>,
52  "hipipe::stream::partial_transform: "
53  "Cannot apply the given function to the given `from<>` columns.");
54  static_assert(std::is_invocable_r_v<
55  std::tuple<typename ToTypes::data_type...>, Fun&, decltype(slice_view)&&>,
56  "hipipe::stream::partial_transform: "
57  "The function return type does not correspond to the tuple of the "
58  "selected `to<>` columns.");
59  std::tuple<typename ToTypes::data_type...> result =
60  std::invoke(fun, std::move(slice_view));
61  // convert the function results to the corresponding column(s)
62  utility::times_with_index<sizeof...(ToTypes)>([&source, &result](auto i) {
63  using Column = std::tuple_element_t<i, std::tuple<ToTypes...>>;
64  source.insert_or_assign<Column>(std::move(std::get<i>(result)));
65  });
66  return source;
67  }
68  };
69 
70  class partial_transform_fn {
71  private:
72  friend rgv::view_access;
73 
74  template <typename From, typename To, typename Fun>
75  static auto bind(partial_transform_fn transformer, From f, To t, Fun fun)
76  {
77  return ranges::make_pipeable(
78  std::bind(transformer, std::placeholders::_1, f, t, std::move(fun)));
79  }
80 
81  public:
82  template <typename... FromTypes, typename... ToTypes, typename Fun>
83  forward_stream_t operator()(
84  forward_stream_t rng, from_t<FromTypes...>, to_t<ToTypes...>, Fun fun) const
85  {
86  static_assert(sizeof...(ToTypes) > 0,
87  "For non-transforming operations, please use stream::for_each.");
88 
89  detail::partial_transform_impl<Fun, from_t<FromTypes...>, to_t<ToTypes...>>
90  trans_fun{std::move(fun)};
91 
92  return rgv::transform(std::move(rng), std::move(trans_fun));
93  }
94  };
95 
96 } // namespace detail
97 
98 // Transform a subset of columns in each batch.
99 //
100 // This transformer accepts a function that is applied to the chosen
101 // subset of source columns of every batch. The function is invoked with a
102 // single std::tuple of references to the data_type of the chosen source columns
103 // and has to return a value convertible to the tuple of data_type of the target columns.
104 //
105 // This transformer is used internally by stream::transform and should not
106 // be used directly by the end user of the library.
107 inline rgv::view<detail::partial_transform_fn> partial_transform{};
108 
109 // transform //
110 
111 namespace detail {
116  template <typename DestTuple, typename SourceTuple>
117  DestTuple convert_tuple_of_ranges(SourceTuple rngs)
118  {
119  static_assert(std::tuple_size_v<SourceTuple> == std::tuple_size_v<DestTuple>);
120  return utility::tuple_transform_with_index(std::move(rngs), [](auto rng, auto index) {
121  using DestType = std::tuple_element_t<index, DestTuple>;
122  return ranges::to<DestType>(rgv::move(rng));
123  });
124  }
125 
126  // Apply fun to each element in tuple of ranges in the given dimension.
127  template<typename Fun, std::size_t Dim, typename From, typename To>
128  struct wrap_fun_for_dim;
129 
130  template<typename Fun, std::size_t Dim, typename... FromTypes, typename... ToTypes>
131  struct wrap_fun_for_dim<Fun, Dim, from_t<FromTypes...>, to_t<ToTypes...>> {
132  Fun fun;
133  using FunRef = decltype(std::ref(fun));
134 
135  utility::maybe_tuple<ToTypes...>
136  operator()(std::tuple<FromTypes&...> tuple_of_ranges)
137  {
138  assert(utility::same_size(tuple_of_ranges));
139  // build the function to be applied
140  wrap_fun_for_dim<FunRef, Dim-1,
141  from_t<ranges::range_value_t<FromTypes>...>,
142  to_t<ranges::range_value_t<ToTypes>...>>
143  fun_wrapper{std::ref(fun)};
144  // transform
145  auto trans_view_of_tuples =
147  std::apply(rgv::zip, std::move(tuple_of_ranges)),
148  std::move(fun_wrapper));
149  // unzip the result and convert the ranges to the desired types
150  if constexpr (sizeof...(ToTypes) > 1) {
151  auto trans_tuple_of_vectors =
152  utility::unzip(std::move(trans_view_of_tuples));
153  return convert_tuple_of_ranges<std::tuple<ToTypes...>>(
154  std::move(trans_tuple_of_vectors));
155  // result is only one range, no unzipping
156  } else {
157  return ranges::to<ToTypes...>(std::move(trans_view_of_tuples));
158  }
159  }
160  };
161 
162  template<typename Fun, typename... FromTypes, typename... ToTypes>
163  struct wrap_fun_for_dim<Fun, 0, from_t<FromTypes...>, to_t<ToTypes...>> {
164  Fun fun;
165 
166  utility::maybe_tuple<ToTypes...>
167  operator()(std::tuple<FromTypes&...> tuple)
168  {
169  static_assert(std::is_invocable_v<Fun&, FromTypes&...>,
170  "hipipe::stream::transform: "
171  "Cannot call the given function on the selected from<> columns.");
172  if constexpr(sizeof...(ToTypes) == 1) {
173  static_assert(std::is_invocable_r_v<
174  ToTypes..., Fun&, FromTypes&...>,
175  "hipipe::stream::transform: "
176  "The function does not return the selected to<> column.");
177  } else {
178  static_assert(std::is_invocable_r_v<
179  std::tuple<ToTypes...>, Fun&, FromTypes&...>,
180  "hipipe::stream::transform: "
181  "The function does not return the tuple of the selected to<> columns.");
182  }
183  return std::apply(fun, std::move(tuple));
184  }
185  };
186 
187 } // namespace detail
188 
211 template<typename... FromColumns, typename... ToColumns, typename Fun, int Dim = 1>
212 auto transform(
213  from_t<FromColumns...> f,
214  to_t<ToColumns...> t,
215  Fun fun,
216  dim_t<Dim> d = dim_t<1>{})
217 {
218  static_assert(
221  "hipipe::stream::transform: The dimension in which to apply the operation needs"
222  " to be at most the lowest dimension of all the from<> and to<> columns.");
223 
224  // a bit of function type erasure to speed up compilation
225  using FunT = std::function<
226  utility::maybe_tuple<utility::ndim_type_t<typename ToColumns::data_type, Dim>...>
227  (utility::ndim_type_t<typename FromColumns::data_type, Dim>&...)>;
228  // wrap the function to be applied in the appropriate dimension
229  detail::wrap_fun_for_dim<
230  FunT, Dim,
231  from_t<typename FromColumns::data_type...>,
232  to_t<typename ToColumns::data_type...>>
233  fun_wrapper{std::move(fun)};
234 
235  return stream::partial_transform(f, t, std::move(fun_wrapper));
236 }
237 
238 // conditional transform //
239 
240 namespace detail {
241 
242  // wrap the function to be applied only on if the first argument evaluates to true
243  template<typename Fun, typename FromIdxs, typename ToIdxs, typename From, typename To>
244  struct wrap_fun_with_cond;
245 
246  template<typename Fun, std::size_t... FromIdxs, std::size_t... ToIdxs,
247  typename CondCol, typename... Cols, typename... ToTypes>
248  struct wrap_fun_with_cond<Fun,
249  std::index_sequence<FromIdxs...>,
250  std::index_sequence<ToIdxs...>,
251  from_t<CondCol, Cols...>, to_t<ToTypes...>> {
252  Fun fun;
253 
254  utility::maybe_tuple<ToTypes...> operator()(CondCol& cond, Cols&... cols)
255  {
256  // make a tuple of all arguments, except for the condition
257  std::tuple<Cols&...> args_view{cols...};
258  // apply the function if the condition is true
259  if (cond) {
260  // the function is applied only on a subset of the arguments
261  // representing FromColumns
262  static_assert(std::is_invocable_v<Fun&,
263  std::tuple_element_t<FromIdxs, decltype(args_view)>...>,
264  "hipipe::stream::conditional_transform: "
265  "Cannot apply the given function to the given `from<>` columns.");
266  static_assert(std::is_invocable_r_v<
267  std::tuple<ToTypes...>, Fun&,
268  std::tuple_element_t<FromIdxs, decltype(args_view)>...>,
269  "hipipe::stream::conditional_transform: "
270  "The function return type does not correspond to the selected `to<>` columns.");
271  return std::invoke(fun, std::get<FromIdxs>(args_view)...);
272  }
273  // return the original arguments if the condition is false
274  // only a subset of the arguments representing ToColumns is returned
275  // note: We can force std::move in here, because
276  // we are only copying data to themselves.
277  return {std::move(std::get<ToIdxs>(args_view))...};
278  }
279  };
280 
281 } // namespace detail
282 
333 template<
334  typename... FromColumns,
335  typename... ToColumns,
336  typename CondColumn,
337  typename Fun,
338  int Dim = 1>
339 auto transform(
340  from_t<FromColumns...> f,
341  to_t<ToColumns...> t,
342  cond_t<CondColumn> c,
343  Fun fun,
344  dim_t<Dim> d = dim_t<1>{})
345 {
346  // make index sequences for source and target columns when they
347  // are concatenated in a single tuple
348  constexpr std::size_t n_from = sizeof...(FromColumns);
349  constexpr std::size_t n_to = sizeof...(ToColumns);
350  using FromIdxs = std::make_index_sequence<n_from>;
352 
353  static_assert(
357  "hipipe::stream::conditional_transform: The dimension in which to apply the operation needs"
358  " to be at most the lowest dimension of all the from<>, to<> and cond<> columns.");
359 
360  // a bit of function type erasure to speed up compilation
361  using FunT = std::function<
362  utility::maybe_tuple<utility::ndim_type_t<typename ToColumns::data_type, Dim>...>
363  (utility::ndim_type_t<typename FromColumns::data_type, Dim>&...)>;
364  // wrap the function to be applied in the appropriate dimension using the condition column
365  detail::wrap_fun_with_cond<
366  FunT, FromIdxs, ToIdxs,
367  from_t<utility::ndim_type_t<typename CondColumn::data_type, Dim>,
368  utility::ndim_type_t<typename FromColumns::data_type, Dim>...,
369  utility::ndim_type_t<typename ToColumns::data_type, Dim>...>,
370  to_t<utility::ndim_type_t<typename ToColumns::data_type, Dim>...>>
371  cond_fun{std::move(fun)};
372 
373  // transform from both, FromColumns and ToColumns into ToColumns
374  // the wrapper function takes care of extracting the parameters for the original function
375  return stream::transform(from_t<CondColumn, FromColumns..., ToColumns...>{},
376  t, std::move(cond_fun), d);
377 }
378 
379 // probabilistic transform //
380 
381 namespace detail {
382 
383  // wrap the function to be an identity if the dice roll fails
384  template<typename Fun, typename Prng,
385  typename FromIdxs, typename ToIdxs,
386  typename From, typename To>
387  struct wrap_fun_with_prob;
388 
389  template<typename Fun, typename Prng,
390  std::size_t... FromIdxs, std::size_t... ToIdxs,
391  typename... FromTypes, typename... ToTypes>
392  struct wrap_fun_with_prob<Fun, Prng,
393  std::index_sequence<FromIdxs...>,
394  std::index_sequence<ToIdxs...>,
395  from_t<FromTypes...>, to_t<ToTypes...>> {
396  Fun fun;
397  std::reference_wrapper<Prng> prng;
398  const double prob;
399 
400  utility::maybe_tuple<ToTypes...> operator()(FromTypes&... cols)
401  {
402  assert(prob >= 0. && prob <= 1.);
403  std::uniform_real_distribution<> dis{0, 1};
404  // make a tuple of all arguments
405  std::tuple<FromTypes&...> args_view{cols...};
406  // apply the function if the dice roll succeeds
407  if (prob == 1. || (prob > 0. && dis(prng.get()) < prob)) {
408  // the function is applied only on a subset of the arguments
409  // representing FromColumns
410  static_assert(std::is_invocable_v<Fun&,
411  std::tuple_element_t<FromIdxs, decltype(args_view)>...>,
412  "hipipe::stream::probabilistic_transform: "
413  "Cannot apply the given function to the given `from<>` columns.");
414  static_assert(std::is_invocable_r_v<
415  std::tuple<ToTypes...>, Fun&,
416  std::tuple_element_t<FromIdxs, decltype(args_view)>...>,
417  "hipipe::stream::probabilistic_transform: "
418  "The function return type does not correspond to the selected `to<>` columns.");
419  return std::invoke(fun, std::get<FromIdxs>(args_view)...);
420  }
421  // return the original arguments if the dice roll fails
422  // only a subset of the arguments representing ToColumns is returned
423  // note: We can force std::move in here, because
424  // we are only copying data to themselves.
425  return {std::move(std::get<ToIdxs>(args_view))...};
426  }
427  };
428 
429 } // namespace detail
430 
462 template<
463  typename... FromColumns,
464  typename... ToColumns,
465  typename Fun,
466  typename Prng = std::mt19937,
467  int Dim = 1>
468 auto transform(
469  from_t<FromColumns...> f,
470  to_t<ToColumns...> t,
471  double prob,
472  Fun fun,
473  Prng& prng = utility::random_generator,
474  dim_t<Dim> d = dim_t<1>{})
475 {
476  // make index sequences for source and target columns when they
477  // are concatenated in a single tuple
478  constexpr std::size_t n_from = sizeof...(FromColumns);
479  constexpr std::size_t n_to = sizeof...(ToColumns);
480  using FromIdxs = std::make_index_sequence<n_from>;
482 
483  static_assert(
486  "hipipe::stream::probabilistic_transform: The dimension in which to apply the operation "
487  " needs to be at most the lowest dimension of all the from<> and to<> columns.");
488 
489  // a bit of function type erasure to speed up compilation
490  using FunT = std::function<
491  utility::maybe_tuple<utility::ndim_type_t<typename ToColumns::data_type, Dim>...>
492  (utility::ndim_type_t<typename FromColumns::data_type, Dim>&...)>;
493  // wrap the function to be applied in the appropriate dimension with the given probabiliy
494  detail::wrap_fun_with_prob<
495  FunT, Prng, FromIdxs, ToIdxs,
496  from_t<utility::ndim_type_t<typename FromColumns::data_type, Dim>...,
497  utility::ndim_type_t<typename ToColumns::data_type, Dim>...>,
498  to_t<utility::ndim_type_t<typename ToColumns::data_type, Dim>...>>
499  prob_fun{std::move(fun), prng, prob};
500 
501  // transform from both, FromColumns and ToColumns into ToColumns
502  // the wrapper function takes care of extracting the parameters for the original function
503  return stream::transform(from_t<FromColumns..., ToColumns...>{}, t, std::move(prob_fun), d);
504 }
505 
506 } // namespace hipipe::stream
hipipe::utility::random_generator
static thread_local std::mt19937 random_generator
Thread local pseudo-random number generator seeded by std::random_device.
Definition: random.hpp:20
hipipe::utility::times_with_index
constexpr Fun times_with_index(Fun &&fun)
Repeat a function N times in compile time.
Definition: tuple.hpp:402
hipipe::utility::ndims
Gets the number of dimensions of a multidimensional range.
Definition: ndim.hpp:50
same_size
bool same_size(Tuple &&rngs)
Utility function which checks that all the ranges in a tuple have the same size.
Definition: ndim.hpp:735
hipipe::stream::batch::extract
Column::data_type & extract()
Extract a reference to the stored data of the given column.
Definition: batch_t.hpp:104
hipipe::utility::tuple_transform_with_index
constexpr auto tuple_transform_with_index(Tuple &&tuple, Fun &&fun)
Similar to tuple_transform(), but with index available.
Definition: tuple.hpp:457
hipipe::utility::make_offset_index_sequence
decltype(plus< Offset >(std::make_index_sequence< N >{})) make_offset_index_sequence
Make std::index_sequence with the given offset.
Definition: tuple.hpp:90
hipipe::stream::forward_stream_t
ranges::any_view< batch_t, ranges::category::forward > forward_stream_t
The stream itself, i.e., a range of batches.
Definition: stream_t.hpp:29
hipipe::stream::transform
auto transform(from_t< FromColumns... > f, to_t< ToColumns... > t, double prob, Fun fun, Prng &prng=utility::random_generator, dim_t< Dim > d=dim_t< 1 >{})
Probabilistic transform of a subset of hipipe columns.
Definition: transform.hpp:474