12 #include <hipipe/build_config.hpp>
13 #include <hipipe/core/stream/column_t.hpp>
18 #include <type_traits>
21 #include <unordered_map>
23 namespace hipipe::stream {
33 std::unordered_map<std::type_index, std::unique_ptr<abstract_column>> columns_;
39 template<
typename Column>
40 void throw_check_contains()
const
42 if (!columns_.count(std::type_index{typeid(Column)})) {
43 throw std::runtime_error{
44 std::string{
"Trying to retrieve column `"} + Column{}.name()
45 +
"`, but the batch contains no such column."};
71 template<
typename Column>
72 std::unique_ptr<abstract_column>&
at()
74 throw_check_contains<Column>();
75 return columns_.at(std::type_index{
typeid(Column)});
92 template<
typename Column>
93 typename Column::data_type&
extract()
95 throw_check_contains<Column>();
96 return columns_.at(std::type_index{
typeid(Column)})->extract<Column>();
102 template<
typename Column>
103 const typename Column::data_type&
extract()
const
105 throw_check_contains<Column>();
106 return columns_.at(std::type_index{
typeid(Column)})->extract<Column>();
125 template<
typename Column,
typename... Args>
128 static_assert(std::is_constructible_v<Column, Args&&...>,
129 "Cannot construct the given column from the provided arguments.");
130 columns_.insert_or_assign(
131 std::type_index{
typeid(Column)},
132 std::make_unique<Column>(std::forward<Args>(args)...));
150 template<
typename Column>
153 columns_.insert_or_assign(std::type_index{
typeid(Column)}, std::move(column_ptr));
159 std::size_t
size()
const
161 return columns_.size();
167 template<
typename Column>
170 return columns_.count(std::type_index{
typeid(Column)});
179 template<
typename Column>
182 throw_check_contains<Column>();
183 columns_.erase(std::type_index{
typeid(Column)});
195 if (columns_.empty())
return 0;
196 std::size_t
batch_size = columns_.begin()->second->size();
197 for (
auto it = ++columns_.begin(); it != columns_.end(); ++it) {
199 throw std::runtime_error{
"hipipe: Canot deduce a batch size from a batch "
200 "with columns of different size (`" + it->second->name() +
"`)."};
217 for (
const auto& [key, col] : columns_) {
218 new_batch.columns_.insert_or_assign(key, col->take(n));
233 for (
auto& [key, col] : rhs.columns_) {
234 if (!columns_.count(key)) {
235 columns_[key] = std::move(col);
237 columns_.at(key)->push_back(std::move(col));
248 #ifdef HIPIPE_BUILD_PYTHON
249 boost::python::dict to_python()
251 boost::python::dict res;
252 for (
auto it = columns_.begin(); it != columns_.end(); ++it) {
253 res[it->second->name()] = it->second->to_python();
263 using batch_t = batch;