Spicy
filter.h
1 // Copyright (c) 2020-2021 by the Zeek Project. See LICENSE for details.
2 
3 #pragma once
4 
5 #include <utility>
6 
7 #include <hilti/rt/extension-points.h>
8 #include <hilti/rt/types/reference.h>
9 #include <hilti/rt/types/stream.h>
10 
11 #include <spicy/rt/debug.h>
12 #include <spicy/rt/typedefs.h>
13 
14 namespace spicy::rt::filter {
15 namespace detail {
16 
/**
 * Compile-time check for whether a type `T` has a member named `__forward`.
 *
 * This primary template is the fallback that is picked when the
 * specialization below is not viable; it reports `false`.
 */
template<class T, class = int>
struct is_filter : std::false_type {};

/**
 * Specialization that takes part in overload resolution only if the
 * expression `T::__forward` is well-formed; it reports `true`. The comma
 * expression makes the `decltype` yield `int`, which matches the default
 * argument of the primary template so that `is_filter<T>` selects this
 * specialization.
 */
template<class T>
struct is_filter<T, decltype(static_cast<void>(T::__forward), 0)> : std::true_type {};
23 
24 struct OneFilter {
25  using Parse1Function = std::function<hilti::rt::Resumable(hilti::rt::ValueReference<hilti::rt::Stream>&,
26  const std::optional<hilti::rt::stream::View>&)>;
27 
28  Parse1Function parse;
30  hilti::rt::Resumable resumable;
31 };
32 
38 
44 
45 } // namespace detail
46 
62 template<const char* debug_type_name>
63 struct State {
66 
69 
71  operator bool() const { return __filters && (*__filters).size(); }
72 
74  using _ParserDummy = struct { const char* name; };
75 
77  inline static _ParserDummy __parser = _ParserDummy{.name = debug_type_name};
78 };
79 
80 template<const char* debug_type_name>
81 inline std::ostream& operator<<(std::ostream& out, State<debug_type_name>& s) {
82  out << s.__parser.name;
83  return out;
84 }
85 
94 template<typename S>
95 void disconnect(S& state) {
96  if ( state.__filters ) {
97  for ( auto& f : *state.__filters ) {
98  SPICY_RT_DEBUG_VERBOSE(
99  hilti::rt::fmt("- disconnecting existing filter unit from unit %s [%p]", S::__parser.name, &state));
100  f.resumable.abort();
101  }
102 
103  (*state.__filters).clear(); // Will invalidate the targets' output
104  }
105 
106  if constexpr ( detail::is_filter<S>::value ) {
107  if ( state.__forward ) {
108  SPICY_RT_DEBUG_VERBOSE(hilti::rt::fmt("- sending EOD from filter unit %s [%p] to stream %p on disconnect",
109  S::__parser.name, &state, state.__forward.get()));
110  (*state.__forward).freeze();
111  }
112  }
113 }
114 
115 template<typename U>
116 void disconnect(UnitType<U>& unit) {
117  return disconnect(*unit);
118 }
119 
131 template<typename S, typename F>
132 void connect(S& state, UnitRef<F> filter_unit) {
133  SPICY_RT_DEBUG_VERBOSE(hilti::rt::fmt("- connecting filter unit %s [%p] to unit %s [%p]", F::__parser.name,
134  &*filter_unit, S::__parser.name, &state));
135 
136  if ( ! state.__filters )
137  state.__filters = hilti::rt::reference::make_strong<::spicy::rt::filter::detail::Filters>();
138 
139  auto filter = detail::OneFilter{.parse = [filter_unit](hilti::rt::ValueReference<hilti::rt::Stream>& data,
140  const std::optional<hilti::rt::stream::View>& cur) mutable
142  auto lhs_filter_unit = filter_unit.derefAsValue();
143  auto parse2 = hilti::rt::any_cast<Parse2Function<F>>(F::__parser.parse2);
144  SPICY_RT_DEBUG_VERBOSE(
145  hilti::rt::fmt(" + parsing from stream %p, forwarding to stream %p",
146  data.get(), lhs_filter_unit->__forward.get()));
147  return (*parse2)(lhs_filter_unit, data, cur, {});
148  },
149  .input = hilti::rt::Stream()};
150 
151  (*state.__filters).push_back(std::move(filter));
152  filter_unit->__forward = (*state.__filters).back().input;
153 }
154 
155 template<typename U, typename F>
156 void connect(UnitType<U>& unit, UnitRef<F> filter_unit) {
157  return connect(*unit, filter_unit);
158 }
159 
166 template<typename S>
168  S& state, // NOLINT(google-runtime-references)
169  hilti::rt::ValueReference<hilti::rt::Stream>& data, // NOLINT(google-runtime-references)
170  const hilti::rt::stream::View& cur) {
171  if ( ! (state.__filters && (*state.__filters).size()) )
172  return {};
173 
174  detail::OneFilter* previous = nullptr;
175 
176  for ( auto& f : *state.__filters ) {
177  SPICY_RT_DEBUG_VERBOSE(
178  hilti::rt::fmt("- beginning to filter input for unit %s [%p]", S::__parser.name, &state));
179 
180  if ( ! previous )
181  f.resumable = f.parse(data, cur);
182  else
183  f.resumable = f.parse(previous->input, previous->input->view());
184 
185  previous = &f;
186  }
187 
188  return hilti::rt::StrongReference<hilti::rt::Stream>((*state.__filters).back().input);
189 }
190 
191 template<typename U>
193  UnitType<U>& unit, // NOLINT(google-runtime-references)
194  hilti::rt::ValueReference<hilti::rt::Stream>& data, // NOLINT(google-runtime-references)
195  const hilti::rt::stream::View& cur) {
196  return init(*unit, data, cur);
197 }
198 
205 template<typename S>
206 inline void forward(S& state, const hilti::rt::Bytes& data) {
207  if ( ! state.__forward ) {
208  SPICY_RT_DEBUG_VERBOSE(
209  hilti::rt::fmt("- filter unit %s [%p] is forwarding \"%s\", but not connected to any unit",
210  S::__parser.name, &state, data));
211  return;
212  }
213 
214  SPICY_RT_DEBUG_VERBOSE(hilti::rt::fmt("- filter unit %s [%p] is forwarding \"%s\" to stream %p", S::__parser.name,
215  &state, data, state.__forward.get()));
216  state.__forward->append(data);
217 }
218 
219 template<typename U>
220 inline void forward(UnitType<U>& unit, const hilti::rt::Bytes& data) {
221  return forward(*unit, data);
222 }
223 
230 template<typename S>
231 inline void forward_eod(S& state) {
232  if ( ! state.__forward ) {
233  SPICY_RT_DEBUG_VERBOSE(hilti::rt::fmt("- filter unit %s [%p] is forwarding EOD, but not connected to any unit",
234  S::__parser.name, &state));
235  return;
236  }
237 
238  SPICY_RT_DEBUG_VERBOSE(hilti::rt::fmt("- filter unit %s [%p] is forwarding EOD to stream %p", S::__parser.name,
239  &state, state.__forward.get()));
240  state.__forward->freeze();
241 }
242 
243 template<typename U>
244 inline void forward_eod(UnitType<U>& unit) {
245  return forward_eod(*unit);
246 }
247 
254  for ( auto& f : (*filters) )
255  f.resumable.resume();
256 }
257 
/**
 * Resumes the pending parse of all filters connected to a unit by
 * delegating to the overload taking the unit's list of filters.
 *
 * @param state state of the unit whose filters are to be resumed
 */
template<typename S>
inline void flush(S& state) {
    // Hand the unit's filter list to the list-based overload.
    flush(state.__filters);
}
268 
269 template<typename U>
270 inline void flush(UnitType<U>& unit) {
271  flush(*unit);
272 }
273 
274 } // namespace spicy::rt::filter
hilti::rt::StrongReference<::spicy::rt::filter::detail::Filters > __filters
Definition: filter.h:65
Definition: reference.h:477
Definition: bytes.h:154
Definition: stream.h:978
const T * get() const
Definition: reference.h:106
Definition: reference.h:328
hilti::rt::WeakReference<::spicy::rt::filter::detail::Forward > __forward
Definition: filter.h:68
Definition: stream.h:1379
Definition: vector.h:251
Definition: fiber.h:274
Definition: filter.h:14
ValueReference< T > derefAsValue() const
Definition: reference.h:370
Definition: filter.h:63
std::string fmt(const char *fmt, const Args &... args)
Definition: fmt.h:13
View view(bool expanding=true) const
Definition: stream.h:1547