HiPipe  0.6.0
C++17 data pipeline with Python bindings.
Stream modifiers and data types.

Classes

class  hipipe::stream::batch
 Container for multiple columns.
 
class  hipipe::stream::abstract_column
 Abstract base class for HiPipe columns.
 
class  hipipe::stream::column_base< ColumnName, ExampleType >
 Implementation stub of a column defined by HIPIPE_DEFINE_COLUMN macro.
 

Macros

#define HIPIPE_DEFINE_COLUMN(column_name_, example_type_)
 Macro for fast column definition.
 

Typedefs

using hipipe::stream::forward_stream_t = ranges::any_view< batch_t, ranges::category::forward >
 The stream itself, i.e., a range of batches.
 
using hipipe::stream::input_stream_t = ranges::any_view< batch_t, ranges::category::input >
 The stream type after special eager operations.
 

Functions

template<typename... FromColumns, typename... ToColumns>
auto hipipe::stream::copy (from_t< FromColumns... > from_cols, to_t< ToColumns... > to_cols)
 Copy the data from FromColumns to the respective ToColumns.
 
template<typename... FromColumns, typename... ByColumns, typename Fun , int Dim = 1>
auto hipipe::stream::filter (from_t< FromColumns... > f, by_t< ByColumns... > b, Fun fun, dim_t< Dim > d=dim_t< 1 >{})
 Filter stream data.
 
template<typename... FromColumns, typename Fun , int Dim = 1>
auto hipipe::stream::for_each (from_t< FromColumns... > f, Fun fun, dim_t< Dim > d=dim_t< 1 >{})
 Apply a function to a subset of stream columns.
 
template<typename FromColumn , typename ToColumn , typename Gen , int Dim = utility::ndims<typename ToColumn::data_type>::value - utility::ndims<std::result_of_t<Gen()>>::value>
auto hipipe::stream::generate (from_t< FromColumn > size_from, to_t< ToColumn > fill_to, Gen gen, long gendims=std::numeric_limits< long >::max(), dim_t< Dim > d=dim_t< Dim >{})
 Fill the selected column using a generator (i.e., a nullary function).
 
template<typename FromColumn , typename MaskColumn , typename ValT = typename utility::ndim_type_t< typename FromColumn::data_type, utility::ndims<typename MaskColumn::data_type>::value>>
auto hipipe::stream::pad (from_t< FromColumn > f, mask_t< MaskColumn > m, ValT value=ValT{})
 Pad the selected column to a rectangular size.
 
template<typename FromColumn , typename ToColumn , typename Prng = std::mt19937, typename Dist = std::uniform_real_distribution<double>, int Dim = utility::ndims<typename ToColumn::data_type>::value - utility::ndims<std::result_of_t<Dist(Prng&)>>::value>
auto hipipe::stream::random_fill (from_t< FromColumn > size_from, to_t< ToColumn > fill_to, long rnddims=std::numeric_limits< long >::max(), Dist dist=Dist{0, 1}, Prng &prng=hipipe::utility::random_generator, dim_t< Dim > d=dim_t< Dim >{})
 Fill the selected column of a stream with random values.
 
template<typename... FromColumns, typename... ToColumns, typename Fun , int Dim = 1>
auto hipipe::stream::transform (from_t< FromColumns... > f, to_t< ToColumns... > t, Fun fun, dim_t< Dim > d=dim_t< 1 >{})
 Transform a subset of hipipe columns to a different subset of hipipe columns.
 
template<typename... FromColumns, typename... ToColumns, typename CondColumn , typename Fun , int Dim = 1>
auto hipipe::stream::transform (from_t< FromColumns... > f, to_t< ToColumns... > t, cond_t< CondColumn > c, Fun fun, dim_t< Dim > d=dim_t< 1 >{})
 Conditional transform of a subset of hipipe columns.
 
template<typename... FromColumns, typename... ToColumns, typename Fun , typename Prng = std::mt19937, int Dim = 1>
auto hipipe::stream::transform (from_t< FromColumns... > f, to_t< ToColumns... > t, double prob, Fun fun, Prng &prng=utility::random_generator, dim_t< Dim > d=dim_t< 1 >{})
 Probabilistic transform of a subset of hipipe columns.
 
template<typename Rng , typename... FromColumns, int Dim = 1>
auto hipipe::stream::unpack (Rng &&rng, from_t< FromColumns... > f, dim_t< Dim > d=dim_t< 1 >{})
 Unpack a stream into a tuple of ranges.
 

Variables

ranges::view::view< buffer_fn > hipipe::stream::buffer {}
 Asynchronously buffers the given range.
 
template<typename... Columns>
ranges::view::view< detail::create_fn< Columns... > > hipipe::stream::create {}
 Converts a data range to a HiPipe stream.
 
template<typename... Columns>
ranges::view::view< detail::drop_fn< Columns... > > hipipe::stream::drop {}
 Drops columns from a stream.
 
template<typename... Columns>
ranges::view::view< detail::keep_fn< Columns... > > hipipe::stream::keep {}
 Keep the specified columns in the stream, drop everything else.
 
ranges::view::view< rebatch_fn > hipipe::stream::rebatch {}
 Accumulate the stream and yield batches of a different size.
 

Detailed Description

Macro Definition Documentation

◆ HIPIPE_DEFINE_COLUMN

#define HIPIPE_DEFINE_COLUMN (   column_name_,
  example_type_ 
)
Value:
struct column_name_ : hipipe::stream::column_base<column_name_, example_type_> { \
std::string name() const override { return #column_name_; } \
};

Macro for fast column definition.

Under the hood, it creates a new type derived from column_base.

Definition at line 251 of file column_t.hpp.
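
To make the expansion concrete, the following standalone sketch reproduces the same pattern with simplified stand-ins. The names sketch::abstract_column, sketch::column_base, and SKETCH_DEFINE_COLUMN are hypothetical and exist only for this illustration; the real HiPipe classes carry considerably more functionality.

```cpp
#include <string>
#include <vector>

namespace sketch {

// Simplified stand-in for hipipe::stream::abstract_column (assumption: only name() is modeled).
struct abstract_column {
    virtual ~abstract_column() = default;
    virtual std::string name() const = 0;
};

// Simplified stand-in for hipipe::stream::column_base: it only stores a batch of examples.
template <typename ColumnName, typename ExampleType>
struct column_base : abstract_column {
    using example_type = ExampleType;
    using data_type = std::vector<ExampleType>;
    data_type data;
};

}  // namespace sketch

// Same shape as HIPIPE_DEFINE_COLUMN: a new type derived from column_base
// whose name() returns the stringified column name.
#define SKETCH_DEFINE_COLUMN(column_name_, example_type_)                      \
    struct column_name_ : sketch::column_base<column_name_, example_type_> {   \
        std::string name() const override { return #column_name_; }            \
    };

SKETCH_DEFINE_COLUMN(id, int)
SKETCH_DEFINE_COLUMN(value, double)
```

With these definitions, id{}.name() yields the string "id", and each column type carries its own example type, which is what lets the stream transformers select columns by type.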

Typedef Documentation

◆ forward_stream_t

using hipipe::stream::forward_stream_t = ranges::any_view<batch_t, ranges::category::forward>

The stream itself, i.e., a range of batches.

Unless specified otherwise, the stream transformers expect this type and return this type. One exception is stream::rebatch.

Definition at line 24 of file stream_t.hpp.

◆ input_stream_t

using hipipe::stream::input_stream_t = ranges::any_view<batch_t, ranges::category::input>

The stream type after special eager operations.

For instance, stream::rebatch reduces the stream to an InputRange and returns this type. A stream of this type cannot be further transformed.

Definition at line 32 of file stream_t.hpp.

Function Documentation

◆ copy()

template<typename... FromColumns, typename... ToColumns>
auto hipipe::stream::copy ( from_t< FromColumns... >  from_cols,
to_t< ToColumns... >  to_cols 
)

Copy the data from FromColumns to the respective ToColumns.

The data from the i-th FromColumn are copied to the i-th ToColumn. Note that the ToColumns examples must be constructible from their FromColumns counterparts.

Example:

// rng is a stream with four identical columns
auto rng = view::iota(0, 10) | create<i>() |
copy(from<i>, to<i2>) | copy(from<i, i2>, to<i3, l>);
Parameters
from_cols: The source columns.
to_cols: The target columns.

Definition at line 38 of file copy.hpp.

◆ filter()

template<typename... FromColumns, typename... ByColumns, typename Fun , int Dim = 1>
auto hipipe::stream::filter ( from_t< FromColumns... >  f,
by_t< ByColumns... >  b,
Fun  fun,
dim_t< Dim >  d = dim_t<1>{} 
)

Filter stream data.

Example:

HIPIPE_DEFINE_COLUMN(id, int)
HIPIPE_DEFINE_COLUMN(value, double)
std::vector<std::tuple<int, double>> data = {{3, 5.}, {1, 2.}};
auto rng = data
| create<id, value>()
| filter(from<id, value>, by<value>, [](double value) { return value > 3.; });
Parameters
f: The columns to be filtered.
b: The columns to be passed to the filtering function. These have to be a subset of f.
fun: The filtering function returning a boolean.
d: The dimension in which the function is applied. Choose 0 to filter whole batches (in which case the f parameter is ignored).

Definition at line 141 of file filter.hpp.
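
The effect of filtering in dimension 1 can be illustrated without HiPipe. The sketch below is a standard-library approximation, not the library's implementation: toy_batch and filter_examples are hypothetical names, and the batch is fixed to two columns for brevity.

```cpp
#include <cstddef>
#include <vector>

// A toy batch with two columns of equal length (hypothetical stand-in for a HiPipe batch).
struct toy_batch {
    std::vector<int> id;
    std::vector<double> value;
};

// Keep only the examples for which pred(value) is true. Both columns are filtered
// together, which corresponds to from<id, value> combined with by<value>.
template <typename Pred>
toy_batch filter_examples(const toy_batch& in, Pred pred) {
    toy_batch out;
    for (std::size_t i = 0; i < in.value.size(); ++i) {
        if (pred(in.value[i])) {
            out.id.push_back(in.id[i]);
            out.value.push_back(in.value[i]);
        }
    }
    return out;
}
```

Applied to the data from the example above with the predicate value > 3., only the example with id 3 and value 5. remains.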

◆ for_each()

template<typename... FromColumns, typename Fun , int Dim = 1>
auto hipipe::stream::for_each ( from_t< FromColumns... >  f,
Fun  fun,
dim_t< Dim >  d = dim_t<1>{} 
)

Apply a function to a subset of stream columns.

The given function is applied to a subset of columns given by FromColumns. The function is applied lazily, i.e., only when the range is iterated.

Example:

HIPIPE_DEFINE_COLUMN(Int, int)
HIPIPE_DEFINE_COLUMN(Double, double)
std::vector<std::tuple<int, double>> data = {{3, 5.}, {1, 2.}};
auto rng = data
| create<Int, Double>()
| for_each(from<Int, Double>, [](int& v, double& d) { std::cout << v + d; });
Parameters
f: The columns to be extracted out of the tuple of columns and passed to fun.
fun: The function to be applied.
d: The dimension in which the function is applied. Choose 0 for the function to be applied to the whole batch.

Definition at line 62 of file for_each.hpp.

◆ generate()

template<typename FromColumn , typename ToColumn , typename Gen , int Dim = utility::ndims<typename ToColumn::data_type>::value - utility::ndims<std::result_of_t<Gen()>>::value>
auto hipipe::stream::generate ( from_t< FromColumn >  size_from,
to_t< ToColumn >  fill_to,
Gen  gen,
long  gendims = std::numeric_limits<long>::max(),
dim_t< Dim >  d = dim_t<Dim>{} 
)

Fill the selected column using a generator (i.e., a nullary function).

This function uses utility::generate(). Furthermore, the column to be filled is first resized so that it has the same size as the selected source column.

Tip: If there is no column the size could be taken from, then just resize the target column manually and use it as both the from column and the to column.

Example:

HIPIPE_DEFINE_COLUMN(id, int)
HIPIPE_DEFINE_COLUMN(value, double)
std::vector<int> data = {3, 1, 2};
auto rng = data
| create<id>()
// assign each id a value from an increasing sequence
| generate(from<id>, to<value>, [i = 0]() mutable { return i++; });
Parameters
size_from: The column whose size will be used to initialize the generated column.
fill_to: The column to be filled using the generator.
gen: The generator to be used.
gendims: The number of generated dimensions. See utility::generate().
d: The dimension in which the generator is applied. E.g., if set to 1, the generator result is considered to be a single example. The default is ndims<ToColumn::data_type> - ndims<gen()>. This value has to be positive.

Definition at line 83 of file generate.hpp.
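
The "resize first, then fill" behavior described above can be sketched for a single one-dimensional column using only the standard library. The helper name generate_like is hypothetical, and the multidimensional handling done by utility::generate() is deliberately left out.

```cpp
#include <algorithm>
#include <vector>

// Resize `target` to the size of `source`, then fill it by calling `gen` once per example.
// Only the one-dimensional case is modeled here.
template <typename T, typename U, typename Gen>
void generate_like(const std::vector<T>& source, std::vector<U>& target, Gen gen) {
    target.resize(source.size());
    std::generate(target.begin(), target.end(), gen);
}
```

Called with the ids {3, 1, 2} and the counting generator from the example above, the target column receives three values from the increasing sequence, one per id.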

◆ pad()

template<typename FromColumn , typename MaskColumn , typename ValT = typename utility::ndim_type_t< typename FromColumn::data_type, utility::ndims<typename MaskColumn::data_type>::value>>
auto hipipe::stream::pad ( from_t< FromColumn >  f,
mask_t< MaskColumn >  m,
ValT  value = ValT{} 
)

Pad the selected column to a rectangular size.

Each batch is padded separately.

The mask of the padded values is created along with the padding. The mask evaluates to true on the positions with the original elements and to false on the positions of the padded elements. The mask column should be a multidimensional vector of type bool/char/int/... The dimensionality of the mask column is used to deduce how many dimensions should be padded in the source column.

This transformer internally uses utility::ndim_pad().

Example:

HIPIPE_DEFINE_COLUMN(sequences, std::vector<int>)
HIPIPE_DEFINE_COLUMN(sequence_masks, std::vector<bool>)
std::vector<std::vector<int>> data = {{1, 2}, {3, 4, 5}, {}, {6, 7}};
auto rng = data
| create<sequences>(2)
| pad(from<sequences>, mask<sequence_masks>, -1);
// sequences_batch_1 == {{1, 2, -1}, {3, 4, 5}}
// sequences_batch_2 == {{-1, -1}, {6, 7}}
// sequence_masks_batch_1 == {{true, true, false}, {true, true, true}}
// sequence_masks_batch_2 == {{false, false}, {true, true}}
Parameters
f: The column to be padded.
m: The column where the mask should be stored and from which the dimension is taken.
value: The value to pad with.

Definition at line 90 of file pad.hpp.
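
The relation between the padded data and the mask can be reproduced with plain vectors. The following is a minimal sketch for one batch of int sequences padded in a single dimension; padded_batch and pad_batch are hypothetical names, and the sketch does not use utility::ndim_pad().

```cpp
#include <algorithm>
#include <cstddef>
#include <utility>
#include <vector>

// Result of padding one batch: the rectangular data and the matching mask.
struct padded_batch {
    std::vector<std::vector<int>> data;
    std::vector<std::vector<bool>> mask;
};

// Pad every sequence in the batch to the length of the longest one.
// The mask is true for original elements and false for padded ones.
padded_batch pad_batch(std::vector<std::vector<int>> batch, int value) {
    std::size_t longest = 0;
    for (const auto& seq : batch) longest = std::max(longest, seq.size());

    padded_batch out;
    for (auto& seq : batch) {
        std::vector<bool> m(seq.size(), true);
        m.resize(longest, false);    // mark the padded tail as false
        seq.resize(longest, value);  // append the padding value
        out.mask.push_back(std::move(m));
        out.data.push_back(std::move(seq));
    }
    return out;
}
```

Because each batch is padded separately, the two batches from the example above end up with different widths (three and two elements per sequence).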

◆ random_fill()

template<typename FromColumn , typename ToColumn , typename Prng = std::mt19937, typename Dist = std::uniform_real_distribution<double>, int Dim = utility::ndims<typename ToColumn::data_type>::value - utility::ndims<std::result_of_t<Dist(Prng&)>>::value>
auto hipipe::stream::random_fill ( from_t< FromColumn >  size_from,
to_t< ToColumn >  fill_to,
long  rnddims = std::numeric_limits<long>::max(),
Dist  dist = Dist{0, 1},
Prng &  prng = hipipe::utility::random_generator,
dim_t< Dim >  d = dim_t<Dim>{} 
)

Fill the selected column of a stream with random values.

This function uses stream::generate() and has similar semantics. That is, the column to be filled is first resized so that it has the same size as the selected source column.

Tip: If there is no column the size could be taken from, then just resize the target column manually and use it as both the from column and the to column.

Example:

HIPIPE_DEFINE_COLUMN(id, int)
HIPIPE_DEFINE_COLUMN(value, double)
std::vector<int> data = {3, 1, 2};
auto rng = data
| create<id>()
| random_fill(from<id>, to<value>)
| transform(from<id, value>, [](...){ ... });
Parameters
size_from: The column whose size will be used to initialize the random column.
fill_to: The column to be filled with random data.
rnddims: The number of random dimensions. See utility::random_fill().
dist: The random distribution to be used. This object is copied on every use to avoid race conditions with stream::buffer().
prng: The random generator to be used.
d: The dimension in which the generator is applied. E.g., if set to 1, the generator result is considered to be a single example. The default is ndims<ToColumn::data_type> - ndims<dist(prng)>. This value has to be positive.

Definition at line 52 of file random_fill.hpp.

◆ transform() [1/3]

template<typename... FromColumns, typename... ToColumns, typename Fun , int Dim = 1>
auto hipipe::stream::transform ( from_t< FromColumns... >  f,
to_t< ToColumns... >  t,
Fun  fun,
dim_t< Dim >  d = dim_t<1>{} 
)

Transform a subset of hipipe columns to a different subset of hipipe columns.

Example:

HIPIPE_DEFINE_COLUMN(id, int)
HIPIPE_DEFINE_COLUMN(value, double)
std::vector<std::tuple<int, double>> data = {{3, 5.}, {1, 2.}};
auto rng = data
| create<id, value>()
| transform(from<id>, to<value>, [](int id) { return id * 5. + 1.; });
Parameters
f: The columns to be extracted out of the tuple of columns and passed to fun.
t: The columns where the result will be saved. If the stream does not contain the selected columns, they are added to the stream. This parameter can overlap with the parameter f.
fun: The function to be applied. The function should return the type represented by the target column in the given dimension. If there are multiple target columns, the function should return a tuple of the corresponding types.
d: The dimension in which the function is applied. Choose 0 for the function to be applied to the whole batch.

Definition at line 187 of file transform.hpp.

◆ transform() [2/3]

template<typename... FromColumns, typename... ToColumns, typename CondColumn , typename Fun , int Dim = 1>
auto hipipe::stream::transform ( from_t< FromColumns... >  f,
to_t< ToColumns... >  t,
cond_t< CondColumn >  c,
Fun  fun,
dim_t< Dim >  d = dim_t<1>{} 
)

Conditional transform of a subset of hipipe columns.

This function behaves the same as the original stream::transform(), but it accepts one extra argument denoting a column of true/false values of the same shape as the columns to be transformed. The transformation will only be applied on true values and it will be an identity on false values.

Note that this can be very useful in combination with stream::random_fill() and std::bernoulli_distribution.

Example:

HIPIPE_DEFINE_COLUMN(dogs, int)
HIPIPE_DEFINE_COLUMN(do_trans, char) // do not use bool here, vector<bool> is
// not a good OutputRange
std::vector<int> data_int = {3, 1, 5, 7};
// hardcoded usage
std::vector<int> data_cond = {true, true, false, false};
auto rng = ranges::view::zip(data_int, data_cond)
| create<dogs, do_trans>()
// this transforms only the first two examples and does nothing for the last two
| transform(from<dogs>, to<dogs>, cond<do_trans>, [](int dog) { return dog + 1; })
// this transformation reverts the previous one
| transform(from<dogs>, to<dogs>, cond<do_trans>, [](int dog) { return dog - 1; });
// random_fill usage
std::bernoulli_distribution dist{0.5};
auto rng2 = data_int
| create<dogs>()
| random_fill(from<dogs>, to<do_trans>, 1, dist, prng)
// the transformation of each example is performed with 50% probability
| transform(from<dogs>, to<dogs>, cond<do_trans>, [](int dog) { return dog + 1; })
// this transformation reverts the previous one
| transform(from<dogs>, to<dogs>, cond<do_trans>, [](int dog) { return dog - 1; });
Parameters
f: The columns to be extracted out of the tuple of columns and passed to fun.
t: The columns where the result will be saved. These have to already exist in the stream.
c: The column of true/false values denoting whether the transformation should be performed or not. For false values, the transformation is an identity on the target columns.
fun: The function to be applied. The function should return the type represented by the selected column in the given dimension. If there are multiple target columns, the function should return a tuple of the corresponding types.
d: The dimension in which the function is applied. Choose 0 for the function to be applied to the whole batch.

Definition at line 314 of file transform.hpp.
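
The "apply on true, identity on false" rule can be shown on a single column with the standard library alone. The helper transform_if below is a hypothetical name and only models the one-dimensional, single-column case.

```cpp
#include <cstddef>
#include <vector>

// Apply fun to data[i] only where cond[i] is nonzero; other examples are left untouched.
// The condition uses char rather than bool, following the advice in the example above.
template <typename T, typename Fun>
std::vector<T> transform_if(std::vector<T> data, const std::vector<char>& cond, Fun fun) {
    for (std::size_t i = 0; i < data.size() && i < cond.size(); ++i) {
        if (cond[i]) data[i] = fun(data[i]);
    }
    return data;
}
```

Applying an increment and then a decrement under the same condition column restores the original data, which mirrors the "reverts the previous one" comments in the example above.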

◆ transform() [3/3]

template<typename... FromColumns, typename... ToColumns, typename Fun , typename Prng = std::mt19937, int Dim = 1>
auto hipipe::stream::transform ( from_t< FromColumns... >  f,
to_t< ToColumns... >  t,
double  prob,
Fun  fun,
Prng &  prng = utility::random_generator,
dim_t< Dim >  d = dim_t<1>{} 
)

Probabilistic transform of a subset of hipipe columns.

This function behaves the same as the original stream::transform(), but it accepts one extra argument denoting the probability of transformation. If this probability is 0.0, the transformer behaves as an identity. If it is 1.0, the transformation function is always applied.

Example:

std::vector<int> data = {3, 1, 5, 7};
auto rng = data
| create<dogs>()
// In 50% of the cases, the number of dogs increases,
// and in the other 50% of the cases, it stays the same.
| transform(from<dogs>, to<dogs>, 0.5, [](int dog) { return dog + 1; });
Parameters
f: The columns to be extracted out of the tuple of columns and passed to fun.
t: The columns where the result will be saved. These have to already exist in the stream.
prob: The probability of transformation. If the dice roll fails, the transformer applies an identity on the target columns.
fun: The function to be applied. The function should return the type represented by the selected column in the given dimension. If there are multiple target columns, the function should return a tuple of the corresponding types.
prng: The random generator to be used. Defaults to a thread_local std::mt19937.
d: The dimension in which the function is applied. Choose 0 for the function to be applied to the whole batch.

Definition at line 443 of file transform.hpp.
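
One plausible way to realize the per-example dice roll is a std::bernoulli_distribution drawn once per example. The sketch below takes that approach as an assumption; it is not taken from the HiPipe sources, and transform_with_prob is a hypothetical name.

```cpp
#include <random>
#include <vector>

// For every example, roll a Bernoulli trial with success probability prob
// and apply fun only when the roll succeeds.
template <typename T, typename Fun, typename Prng>
std::vector<T> transform_with_prob(std::vector<T> data, double prob, Fun fun, Prng& prng) {
    std::bernoulli_distribution roll(prob);
    for (T& x : data) {
        if (roll(prng)) x = fun(x);
    }
    return data;
}
```

The two boundary cases behave as described above: with prob set to 0.0 the data come back unchanged, and with 1.0 the function is applied to every example.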

◆ unpack()

template<typename Rng , typename... FromColumns, int Dim = 1>
auto hipipe::stream::unpack ( Rng &&  rng,
from_t< FromColumns... >  f,
dim_t< Dim >  d = dim_t<1>{} 
)

Unpack a stream into a tuple of ranges.

This operation transforms the stream (i.e., a range of batches) into a tuple of the types represented by the columns. The data can be unpacked in a specific dimension and then the higher dimensions are joined together.

If there is only a single column to be unpacked, the result is an std::vector of the corresponding type. If there are multiple columns to be unpacked, the result is a tuple of std::vectors.

Example:

HIPIPE_DEFINE_COLUMN(id, int)
HIPIPE_DEFINE_COLUMN(values, std::vector<double>)
std::vector<std::tuple<int, std::vector<double>>> data = {{3, {5., 7.}}, {1, {2., 4.}}};
auto rng = data | create<id, values>(4);
// unpack in the first dimension
std::vector<int> unp_ids;
std::vector<std::vector<double>> unp_values;
std::tie(unp_ids, unp_values) = unpack(rng, from<id, values>);
// unp_ids == {3, 1}
// unp_values == {{5., 7.}, {2., 4.}}
// unpack a single column in the second dimension
std::vector<double> unp_values_dim2;
unp_values_dim2 = unpack(rng, from<values>, dim<2>);
// unp_values_dim2 == {5., 7., 2., 4.}

Definition at line 110 of file unpack.hpp.
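
The joining of dimensions can be pictured as repeated flattening. The sketch below models one column of a stream as a vector of batches and shows the first two dimensions; toy_stream, unpack_dim1, and unpack_dim2 are hypothetical names for this illustration only.

```cpp
#include <vector>

// One column of a toy stream: a sequence of batches, each holding several examples.
template <typename Example>
using toy_stream = std::vector<std::vector<Example>>;

// Unpack in dimension 1: concatenate the examples of all batches.
template <typename Example>
std::vector<Example> unpack_dim1(const toy_stream<Example>& stream) {
    std::vector<Example> out;
    for (const auto& batch : stream)
        out.insert(out.end(), batch.begin(), batch.end());
    return out;
}

// Unpack in dimension 2: additionally join the contents of every example.
template <typename T>
std::vector<T> unpack_dim2(const toy_stream<std::vector<T>>& stream) {
    std::vector<T> out;
    for (const auto& example : unpack_dim1(stream))
        out.insert(out.end(), example.begin(), example.end());
    return out;
}
```

For the values column from the example above, unpack_dim1 yields the two inner vectors, while unpack_dim2 yields the four doubles in one flat vector.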

Variable Documentation

◆ buffer

ranges::view::view<buffer_fn> hipipe::stream::buffer {}
inline

Asynchronously buffers the given range.

Asynchronously evaluates the given number of elements in advance. When queried for the next element, it is already prepared. This view works for any range, not only for hipipe streams.

Note that this transformer is not lazy and instead eagerly evaluates the data in asynchronous threads. To avoid recalculation of the entire underlying range whenever, e.g., std::distance is called, this transformer intentionally changes the stream type to InputRange. The downside is that no further transformations can be appended (except for stream::rebatch) and everything has to be prepared before the application of this transformer.

std::vector<int> data = {1, 2, 3, 4, 5};
auto buffered_rng = data
| ranges::view::transform([](int v) { return v + 1; })
| buffer(2);

Definition at line 181 of file buffer.hpp.

◆ create

template<typename... Columns>
ranges::view::view<detail::create_fn<Columns...> > hipipe::stream::create {}

Converts a data range to a HiPipe stream.

The value type of the input range is supposed to be either the type represented by the column to be created, or a tuple of such types if there are more columns to be created.

Example:

// rng is a stream where each batch is a single element from 0..9
auto rng = view::iota(0, 10) | create<id>();
// batched_rng is a stream with a single batch with numbers 0..9
auto batched_rng = view::iota(0, 10) | create<id>(50);
// multiple columns can also be created at once
auto multi_rng = view::zip(view::iota(0, 10), view::iota(30, 50)) | create<id, age>();
Parameters
batch_size: The requested batch size of the new stream.

Definition at line 111 of file create.hpp.
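
The effect of batch_size can be sketched with plain vectors. The helper make_batches is a hypothetical name, it is eager rather than a lazy view, and the default of one example per batch is inferred from the first line of the example above.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Split a flat range of examples into consecutive batches of at most batch_size examples.
template <typename T>
std::vector<std::vector<T>> make_batches(const std::vector<T>& data, std::size_t batch_size = 1) {
    std::vector<std::vector<T>> batches;
    if (batch_size == 0) return batches;
    for (std::size_t i = 0; i < data.size(); i += batch_size) {
        // The last batch may be shorter than batch_size.
        std::size_t n = std::min(batch_size, data.size() - i);
        batches.emplace_back(data.begin() + i, data.begin() + i + n);
    }
    return batches;
}
```

For ten input elements, the default produces ten single-element batches, while a batch size of 50 produces one batch holding all ten elements.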

◆ drop

template<typename... Columns>
ranges::view::view<detail::drop_fn<Columns...> > hipipe::stream::drop {}

Drops columns from a stream.

Example:

HIPIPE_DEFINE_COLUMN(id, int)
HIPIPE_DEFINE_COLUMN(value, double)
std::vector<std::tuple<int, double>> data = {{3, 5.}, {1, 2.}};
auto rng = data | create<id, value>() | drop<id>;

Definition at line 67 of file drop.hpp.

◆ keep

template<typename... Columns>
ranges::view::view<detail::keep_fn<Columns...> > hipipe::stream::keep {}

Keep the specified columns in the stream, drop everything else.

Example:

HIPIPE_DEFINE_COLUMN(id, int)
HIPIPE_DEFINE_COLUMN(value, double)
std::vector<std::tuple<int, double>> data = {{3, 5.}, {1, 2.}};
auto rng = data | create<id, value>() | keep<value>; // now it has only the value column

Definition at line 67 of file keep.hpp.

◆ rebatch

ranges::view::view<rebatch_fn> hipipe::stream::rebatch {}
inline

Accumulate the stream and yield batches of a different size.

The batch size of the accumulated columns is allowed to differ between batches. To make one large batch of all the data, use std::numeric_limits<std::size_t>::max().

Note that this stream transformer is not lazy and instead eagerly evaluates the batches computed by the previous stream pipeline and reorganizes the evaluated data to batches of a different size. To avoid recalculation of the entire stream whenever e.g., std::distance is called, this transformer intentionally changes the stream type to InputRange. The downside is that no further transformations or buffering can be appended and everything has to be prepared before the application of this transformer.

auto rng = view::iota(0, 10)
| create<value>(2) // batches the data by two examples
| rebatch(3); // changes the batch size to three examples

Definition at line 168 of file rebatch.hpp.
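
The accumulation step can be approximated with the standard library as follows. This is a sketch under stated assumptions rather than the library's algorithm: rebatch_all is a hypothetical name, it processes the whole stream at once, and it emits a smaller final batch when the examples do not divide evenly.

```cpp
#include <cstddef>
#include <limits>
#include <utility>
#include <vector>

// Accumulate examples from batches of arbitrary sizes and emit batches of new_size examples.
// The final batch may be smaller when the total is not a multiple of new_size.
template <typename T>
std::vector<std::vector<T>> rebatch_all(const std::vector<std::vector<T>>& batches,
                                        std::size_t new_size) {
    std::vector<std::vector<T>> out;
    if (new_size == 0) return out;
    std::vector<T> current;
    for (const auto& batch : batches) {
        for (const T& example : batch) {
            current.push_back(example);
            if (current.size() == new_size) {
                out.push_back(std::move(current));
                current.clear();  // put the moved-from vector into a known empty state
            }
        }
    }
    if (!current.empty()) out.push_back(std::move(current));
    return out;
}
```

Rebatching five batches of two examples to a size of three gives three full batches and one trailing batch with a single example; passing std::numeric_limits<std::size_t>::max() collects everything into one batch.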