HiPipe  0.7.0
C++17 data pipeline with Python bindings.
batch_t.hpp
1 /****************************************************************************
2  * hipipe library
3  * Copyright (c) 2018, Iterait a.s.
4  * Author(s) Filip Matzner, Adam Blazek
5  *
6  * This file is distributed under the MIT License.
7  * See the accompanying file LICENSE.txt for the complete license agreement.
8  ****************************************************************************/
9 
10 #pragma once
11 
12 #include <hipipe/build_config.hpp>
13 #include <hipipe/core/stream/column_t.hpp>
14 
15 #include <memory>
16 #include <stdexcept>
17 #include <string>
18 #include <type_traits>
19 #include <typeindex>
20 #include <typeinfo>
21 #include <unordered_map>
22 
23 namespace hipipe::stream {
24 
29 class batch {
30 private:
31 
33  std::unordered_map<std::type_index, std::unique_ptr<abstract_column>> columns_;
34 
39  template<typename Column>
40  void throw_check_contains() const
41  {
42  if (!columns_.count(std::type_index{typeid(Column)})) {
43  throw std::runtime_error{
44  std::string{"Trying to retrieve column `"} + Column{}.name()
45  + "`, but the batch contains no such column."};
46  }
47  }
48 
49 public:
50 
51  // constructors //
52 
53  batch() = default;
54  batch(const batch&) = delete;
55  batch(batch&&) = default;
56 
57  // direct access //
58 
71  template<typename Column>
72  std::unique_ptr<abstract_column>& at()
73  {
74  throw_check_contains<Column>();
75  return columns_.at(std::type_index{typeid(Column)});
76  }
77 
78  // value extraction //
79 
92  template<typename Column>
93  typename Column::data_type& extract()
94  {
95  throw_check_contains<Column>();
96  return columns_.at(std::type_index{typeid(Column)})->extract<Column>();
97  }
98 
102  template<typename Column>
103  const typename Column::data_type& extract() const
104  {
105  throw_check_contains<Column>();
106  return columns_.at(std::type_index{typeid(Column)})->extract<Column>();
107  }
108 
109  // column insertion/rewrite //
110 
125  template<typename Column, typename... Args>
126  void insert_or_assign(Args&&... args)
127  {
128  static_assert(std::is_constructible_v<Column, Args&&...>,
129  "Cannot construct the given column from the provided arguments.");
130  columns_.insert_or_assign(
131  std::type_index{typeid(Column)},
132  std::make_unique<Column>(std::forward<Args>(args)...));
133  }
134 
150  template<typename Column>
151  void raw_insert_or_assign(std::unique_ptr<abstract_column> column_ptr)
152  {
153  columns_.insert_or_assign(std::type_index{typeid(Column)}, std::move(column_ptr));
154  }
155 
156  // column check //
157 
159  std::size_t size() const
160  {
161  return columns_.size();
162  }
163 
167  template<typename Column>
168  bool contains() const
169  {
170  return columns_.count(std::type_index{typeid(Column)});
171  }
172 
173  // column removal //
174 
179  template<typename Column>
180  void erase()
181  {
182  throw_check_contains<Column>();
183  columns_.erase(std::type_index{typeid(Column)});
184  }
185 
186  // batching utilities //
187 
193  std::size_t batch_size() const
194  {
195  if (columns_.empty()) return 0;
196  std::size_t batch_size = columns_.begin()->second->size();
197  for (auto it = ++columns_.begin(); it != columns_.end(); ++it) {
198  if (it->second->size() != batch_size) {
199  throw std::runtime_error{"hipipe: Canot deduce a batch size from a batch "
200  "with columns of different size (`" + it->second->name() + "`)."};
201  }
202  batch_size = it->second->size();
203  }
204  return batch_size;
205  }
206 
214  batch take(std::size_t n)
215  {
216  batch new_batch;
217  for (const auto& [key, col] : columns_) {
218  new_batch.columns_.insert_or_assign(key, col->take(n));
219  }
220  return new_batch;
221  }
222 
231  void push_back(batch rhs)
232  {
233  for (auto& [key, col] : rhs.columns_) {
234  if (!columns_.count(key)) {
235  columns_[key] = std::move(col);
236  } else {
237  columns_.at(key)->push_back(std::move(col));
238  }
239  }
240  }
241 
248  #ifdef HIPIPE_BUILD_PYTHON
249  boost::python::dict to_python()
250  {
251  boost::python::dict res;
252  for (auto it = columns_.begin(); it != columns_.end(); ++it) {
253  res[it->second->name()] = it->second->to_python();
254  }
255  columns_.clear();
256  return res;
257  }
258  #endif
259 };
260 
261 
/// Alias: `batch_t` names the same type as `batch`.
263 using batch_t = batch;
264 
265 } // namespace hipipe::stream
hipipe::stream::batch
Container for multiple columns.
Definition: batch_t.hpp:34
hipipe::stream::batch::push_back
void push_back(batch rhs)
Concatenate the columns from two batches.
Definition: batch_t.hpp:242
hipipe::stream::batch::raw_insert_or_assign
void raw_insert_or_assign(std::unique_ptr< abstract_column > column_ptr)
Insert a raw column handle to the batch or rewrite an existing one.
Definition: batch_t.hpp:162
hipipe::stream::batch::size
std::size_t size() const
Get the number of columns in the batch.
Definition: batch_t.hpp:170
hipipe::stream::batch::at
std::unique_ptr< abstract_column > & at()
Retrieve the handle (unique pointer reference) to the given column.
Definition: batch_t.hpp:83
hipipe::stream::batch::extract
Column::data_type & extract()
Extract a reference to the stored data of the given column.
Definition: batch_t.hpp:104
hipipe::stream::batch::contains
bool contains() const
Check whether the given column is present in the batch.
Definition: batch_t.hpp:179
hipipe::stream::batch::batch_size
std::size_t batch_size() const
Calculate the batch size.
Definition: batch_t.hpp:204
hipipe::stream::batch::take
batch take(std::size_t n)
Steal the given number of examples from all the columns and create a new batch of them.
Definition: batch_t.hpp:225
hipipe::stream::batch::erase
void erase()
Remove the given column from the batch.
Definition: batch_t.hpp:191
hipipe::stream::batch::insert_or_assign
void insert_or_assign(Args &&... args)
Insert a new column to the batch or overwrite an existing one.
Definition: batch_t.hpp:137