Spicy
filter.h
1 // Copyright (c) 2020-2021 by the Zeek Project. See LICENSE for details.
2 
3 #pragma once
4 
5 #include <utility>
6 
7 #include <hilti/rt/extension-points.h>
8 #include <hilti/rt/types/reference.h>
9 #include <hilti/rt/types/stream.h>
10 
11 #include <spicy/rt/debug.h>
12 #include <spicy/rt/typedefs.h>
13 
14 namespace spicy::rt::filter {
15 namespace detail {
16 
/**
 * Compile-time trait reporting whether a type `T` has a member named
 * `__forward`. This primary template is the fallback for types without one.
 */
template<typename T, typename = int>
struct is_filter : std::false_type {};

/**
 * Specialization picked whenever the expression `T::__forward` is
 * well-formed; the comma expression only serves to yield type `int`.
 */
template<typename T>
struct is_filter<T, decltype(static_cast<void>(T::__forward), 0)> : std::true_type {};
23 
24 struct OneFilter {
25  using Parse1Function = std::function<hilti::rt::Resumable(hilti::rt::ValueReference<hilti::rt::Stream>&,
26  const std::optional<hilti::rt::stream::View>&)>;
27 
28  OneFilter() = default;
29  OneFilter(OneFilter&&) = default;
30  OneFilter(const OneFilter&) = delete;
31  OneFilter(Parse1Function _parse, hilti::rt::ValueReference<hilti::rt::Stream> _input,
32  hilti::rt::Resumable _resumable)
33  : parse(std::move(_parse)), input(std::move(_input)), resumable(std::move(_resumable)) {}
34 
35  Parse1Function parse;
37  hilti::rt::Resumable resumable;
38 };
39 
45 
51 
52 } // namespace detail
53 
69 template<const char* debug_type_name>
70 struct State {
73 
76 
78  operator bool() const { return __filters && (*__filters).size(); }
79 
81  using _ParserDummy = struct {
82  const char* name;
83  };
84 
86  inline static _ParserDummy __parser = _ParserDummy{.name = debug_type_name};
87 };
88 
89 template<const char* debug_type_name>
90 inline std::ostream& operator<<(std::ostream& out, State<debug_type_name>& s) {
91  out << s.__parser.name;
92  return out;
93 }
94 
103 template<typename S>
104 void disconnect(S& state) {
105  if ( state.__filters ) {
106  for ( auto& f : *state.__filters ) {
107  SPICY_RT_DEBUG_VERBOSE(
108  hilti::rt::fmt("- disconnecting existing filter unit from unit %s [%p]", S::__parser.name, &state));
109  f.resumable.abort();
110  }
111 
112  (*state.__filters).clear(); // Will invalidate the targets' output
113  }
114 
115  if constexpr ( detail::is_filter<S>::value ) {
116  if ( state.__forward ) {
117  SPICY_RT_DEBUG_VERBOSE(hilti::rt::fmt("- sending EOD from filter unit %s [%p] to stream %p on disconnect",
118  S::__parser.name, &state, state.__forward.get()));
119  (*state.__forward).freeze();
120  }
121  }
122 }
123 
124 template<typename U>
125 void disconnect(UnitType<U>& unit) {
126  return disconnect(*unit);
127 }
128 
140 template<typename S, typename F>
141 void connect(S& state, UnitRef<F> filter_unit) {
142  SPICY_RT_DEBUG_VERBOSE(hilti::rt::fmt("- connecting filter unit %s [%p] to unit %s [%p]", F::__parser.name,
143  &*filter_unit, S::__parser.name, &state));
144 
145  if ( ! state.__filters )
146  state.__filters = hilti::rt::reference::make_strong<::spicy::rt::filter::detail::Filters>();
147 
148  auto filter = detail::OneFilter{[filter_unit](hilti::rt::ValueReference<hilti::rt::Stream>& data,
149  const std::optional<hilti::rt::stream::View>& cur) mutable
151  auto lhs_filter_unit = filter_unit.derefAsValue();
152  auto parse2 = hilti::rt::any_cast<Parse2Function<F>>(F::__parser.parse2);
153  SPICY_RT_DEBUG_VERBOSE(
154  hilti::rt::fmt(" + parsing from stream %p, forwarding to stream %p",
155  data.get(), lhs_filter_unit->__forward.get()));
156  return (*parse2)(lhs_filter_unit, data, cur, {});
157  },
159  {}};
160 
161  (*state.__filters).push_back(std::move(filter));
162  filter_unit->__forward = (*state.__filters).back().input;
163 }
164 
165 template<typename U, typename F>
166 void connect(UnitType<U>& unit, UnitRef<F> filter_unit) {
167  return connect(*unit, filter_unit);
168 }
169 
176 template<typename S>
178  S& state, // NOLINT(google-runtime-references)
179  hilti::rt::ValueReference<hilti::rt::Stream>& data, // NOLINT(google-runtime-references)
180  const hilti::rt::stream::View& cur) {
181  if ( ! (state.__filters && (*state.__filters).size()) )
182  return {};
183 
184  detail::OneFilter* previous = nullptr;
185 
186  for ( auto& f : *state.__filters ) {
187  SPICY_RT_DEBUG_VERBOSE(
188  hilti::rt::fmt("- beginning to filter input for unit %s [%p]", S::__parser.name, &state));
189 
190  if ( ! previous )
191  f.resumable = f.parse(data, cur);
192  else
193  f.resumable = f.parse(previous->input, previous->input->view());
194 
195  previous = &f;
196  }
197 
198  return hilti::rt::StrongReference<hilti::rt::Stream>((*state.__filters).back().input);
199 }
200 
201 template<typename U>
203  UnitType<U>& unit, // NOLINT(google-runtime-references)
204  hilti::rt::ValueReference<hilti::rt::Stream>& data, // NOLINT(google-runtime-references)
205  const hilti::rt::stream::View& cur) {
206  return init(*unit, data, cur);
207 }
208 
215 template<typename S>
216 inline void forward(S& state, const hilti::rt::Bytes& data) {
217  if ( ! state.__forward ) {
218  SPICY_RT_DEBUG_VERBOSE(
219  hilti::rt::fmt("- filter unit %s [%p] is forwarding \"%s\", but not connected to any unit",
220  S::__parser.name, &state, data));
221  return;
222  }
223 
224  SPICY_RT_DEBUG_VERBOSE(hilti::rt::fmt("- filter unit %s [%p] is forwarding \"%s\" to stream %p", S::__parser.name,
225  &state, data, state.__forward.get()));
226  state.__forward->append(data);
227 }
228 
229 template<typename U>
230 inline void forward(UnitType<U>& unit, const hilti::rt::Bytes& data) {
231  return forward(*unit, data);
232 }
233 
240 template<typename S>
241 inline void forward_eod(S& state) {
242  if ( ! state.__forward ) {
243  SPICY_RT_DEBUG_VERBOSE(hilti::rt::fmt("- filter unit %s [%p] is forwarding EOD, but not connected to any unit",
244  S::__parser.name, &state));
245  return;
246  }
247 
248  SPICY_RT_DEBUG_VERBOSE(hilti::rt::fmt("- filter unit %s [%p] is forwarding EOD to stream %p", S::__parser.name,
249  &state, state.__forward.get()));
250  state.__forward->freeze();
251 }
252 
253 template<typename U>
254 inline void forward_eod(UnitType<U>& unit) {
255  return forward_eod(*unit);
256 }
257 
264  for ( auto& f : (*filters) )
265  f.resumable.resume();
266 }
267 
/**
 * Resumes all filters connected to a unit's state.
 *
 * @param state filter state whose `__filters` list is flushed
 */
template<typename S>
inline void flush(S& state) {
    auto& connected = state.__filters;
    flush(connected);
}
278 
279 template<typename U>
280 inline void flush(UnitType<U>& unit) {
281  flush(*unit);
282 }
283 
284 } // namespace spicy::rt::filter
hilti::rt::StrongReference<::spicy::rt::filter::detail::Filters > __filters
Definition: filter.h:72
Definition: reference.h:497
Definition: bytes.h:157
Definition: stream.h:1001
const T * get() const
Definition: reference.h:106
Definition: reference.h:345
hilti::rt::WeakReference<::spicy::rt::filter::detail::Forward > __forward
Definition: filter.h:75
Definition: stream.h:1407
Definition: vector.h:251
Definition: fiber.h:295
Definition: filter.h:14
ValueReference< T > derefAsValue() const
Definition: reference.h:387
Definition: filter.h:70
std::string fmt(const char *fmt, const Args &... args)
Definition: fmt.h:13
View view(bool expanding=true) const
Definition: stream.h:1577