13 #include <hipipe/core/stream/stream_t.hpp>
14 #include <hipipe/core/utility/tuple.hpp>
16 #include <range/v3/view/transform.hpp>
17 #include <range/v3/view/chunk.hpp>
20 #include <unordered_map>
22 namespace hipipe::stream {
24 namespace rgv = ranges::views;
// Callable, parameterised on a pack of column types, whose operator() takes a
// source range and returns a batch_t.
// NOTE(review): this listing omits several original lines (the type header,
// braces, the declaration of `batch`, the loop/visitor that introduces
// `column`, and the return statement). The comments below describe only the
// statements that are visible.
28 template<
typename... Columns>
// Template parameter Source: type of the data range handed to operator().
31 template<
typename Source>
32 batch_t operator()(Source&& source)
const
// Case 1: no column types were given. The intent is a compile-time error.
// NOTE(review): the static_assert below receives only a string literal, so
// the literal itself is used as the condition rather than as the message.
// A string literal converts to `true`, so this assertion appears never to
// fire — confirm, and consider a template-dependent false condition with the
// text moved to the message argument.
36 if constexpr(
sizeof...(Columns) == 0) {
37 static_assert(
"hipipe::stream::create: At least one column has to be provided.");
38 }
// Case 2: exactly one column type. The column must be constructible directly
// from the source range as a whole.
else if constexpr(
sizeof...(Columns) == 1) {
39 static_assert(std::is_constructible_v<Columns..., Source&&>,
40 "hipipe::stream::create: "
41 "Cannot convert the given data range to the selected column type.");
// The source is passed through ranges::to_vector and stored in the batch
// under the single column type.
42 batch.insert_or_assign<Columns...>(ranges::to_vector(std::forward<Source>(source)));
// Case 3 (presumably the final `else` branch; the line that opens it is not
// visible here): several column types. Each element of the source must be
// convertible to a tuple holding one example_type per column.
44 using SourceValue = ranges::range_value_t<Source>;
45 static_assert(std::is_constructible_v<
46 std::tuple<typename Columns::example_type...>, SourceValue&&>,
47 "hipipe::stream::create: "
48 "Cannot convert the given data range to the selected column types.");
// utility::unzip turns the source into a tuple with one entry per column type.
49 std::tuple<Columns...> data = utility::unzip(std::forward<Source>(source));
// `column` is introduced on a line not shown here (presumably while visiting
// each element of `data` — confirm). It is moved into the batch, keyed by its
// own decayed type.
51 batch.insert_or_assign<std::decay_t<decltype(column)>>(std::move(column));
// Start of the function-object type used for the `create` view, parameterised
// on the column types. It is referred to as create_fn<Columns...> by bind()
// below; the class header line itself is not visible in this listing.
59 template<
typename... Columns>
// Friend declaration giving rgv::view_access access to this type's non-public
// members (presumably so it can reach bind() — confirm).
62 friend rgv::view_access;
// Builds the pipeable form of the function object.
//   fun        - the create_fn instance to invoke.
//   batch_size - value fixed at bind time as the second argument; defaults to 1.
// Returns the result of ranges::make_pipeable applied to a std::bind
// expression in which the first call argument (std::placeholders::_1) is
// passed on to `fun`, followed by the stored batch_size.
64 static auto bind(create_fn<Columns...> fun, std::size_t batch_size = 1)
66 return ranges::make_pipeable(std::bind(fun, std::placeholders::_1, batch_size));
// Overload selected when Rng satisfies ranges::forward_range.
// NOTE(review): the function signature and the beginning of the return
// expression are on lines not shown in this listing. The two visible lines
// are the arguments of that outer call: the input range passed through
// rgv::chunk with batch_size, and a default-constructed
// create_impl<Columns...>.
69 CPP_template(
class Rng)(requires ranges::forward_range<Rng>)
73 rgv::chunk(std::forward<Rng>(rng), batch_size),
74 create_impl<Columns...>{});
// Overload selected when Rng does NOT satisfy ranges::forward_range. It
// returns void and its visible body consists solely of a concept assertion
// on forward_range carrying a descriptive message; since the requires clause
// guarantees the concept is not satisfied, this overload presumably exists
// only to produce that diagnostic — confirm.
// NOTE(review): forward_range<Rng> is followed by `()` inside the assertion,
// unlike its use in the requires clauses — confirm this matches the range-v3
// version in use.
78 CPP_template(
class Rng)(requires !ranges::forward_range<Rng>)
79 void operator()(Rng&&, std::size_t batch_size = 1)
const
81 CONCEPT_ASSERT_MSG(ranges::forward_range<Rng>(),
82 "stream::create only works on ranges satisfying the forward_range concept.");
// Variable template `create`: for a given pack of column types it is a
// value-initialised rgv::view wrapping detail::create_fn<Columns...>.
// NOTE(review): the `detail` namespace qualifier implies create_fn lives in a
// nested namespace whose opening line is not visible in this listing.
112 template<
typename... Columns>
113 rgv::view<detail::create_fn<Columns...>>
create{};