Spicy
filter.h
1 // Copyright (c) 2020-2021 by the Zeek Project. See LICENSE for details.
2 
3 #pragma once
4 
5 #include <utility>
6 
7 #include <hilti/rt/extension-points.h>
8 #include <hilti/rt/types/reference.h>
9 #include <hilti/rt/types/stream.h>
10 
11 #include <spicy/rt/debug.h>
12 #include <spicy/rt/typedefs.h>
13 
14 namespace spicy::rt::filter {
15 namespace detail {
16 
18 template<typename T, typename = int>
19 struct is_filter : std::false_type {};
20 
21 template<typename T>
22 struct is_filter<T, decltype((void)T::__forward, 0)> : std::true_type {};
23 
24 struct OneFilter {
25  using Parse1Function = std::function<hilti::rt::Resumable(hilti::rt::ValueReference<hilti::rt::Stream>&,
26  const std::optional<hilti::rt::stream::View>&)>;
27 
28  OneFilter() = default;
29  OneFilter(OneFilter&&) = default;
30  OneFilter(const OneFilter&) = delete;
31  OneFilter(Parse1Function _parse, hilti::rt::ValueReference<hilti::rt::Stream> _input,
32  hilti::rt::Resumable _resumable)
33  : parse(std::move(_parse)), input(std::move(_input)), resumable(std::move(_resumable)) {}
34 
35  Parse1Function parse;
37  hilti::rt::Resumable resumable;
38 };
39 
45 
51 
52 } // namespace detail
53 
69 template<const char* debug_type_name>
70 struct State {
73 
76 
78  operator bool() const { return __filters && (*__filters).size(); }
79 
81  using _ParserDummy = struct { const char* name; };
82 
84  inline static _ParserDummy __parser = _ParserDummy{.name = debug_type_name};
85 };
86 
87 template<const char* debug_type_name>
88 inline std::ostream& operator<<(std::ostream& out, State<debug_type_name>& s) {
89  out << s.__parser.name;
90  return out;
91 }
92 
101 template<typename S>
102 void disconnect(S& state) {
103  if ( state.__filters ) {
104  for ( auto& f : *state.__filters ) {
105  SPICY_RT_DEBUG_VERBOSE(
106  hilti::rt::fmt("- disconnecting existing filter unit from unit %s [%p]", S::__parser.name, &state));
107  f.resumable.abort();
108  }
109 
110  (*state.__filters).clear(); // Will invalidate the targets' output
111  }
112 
113  if constexpr ( detail::is_filter<S>::value ) {
114  if ( state.__forward ) {
115  SPICY_RT_DEBUG_VERBOSE(hilti::rt::fmt("- sending EOD from filter unit %s [%p] to stream %p on disconnect",
116  S::__parser.name, &state, state.__forward.get()));
117  (*state.__forward).freeze();
118  }
119  }
120 }
121 
122 template<typename U>
123 void disconnect(UnitType<U>& unit) {
124  return disconnect(*unit);
125 }
126 
138 template<typename S, typename F>
139 void connect(S& state, UnitRef<F> filter_unit) {
140  SPICY_RT_DEBUG_VERBOSE(hilti::rt::fmt("- connecting filter unit %s [%p] to unit %s [%p]", F::__parser.name,
141  &*filter_unit, S::__parser.name, &state));
142 
143  if ( ! state.__filters )
144  state.__filters = hilti::rt::reference::make_strong<::spicy::rt::filter::detail::Filters>();
145 
146  auto filter = detail::OneFilter{[filter_unit](hilti::rt::ValueReference<hilti::rt::Stream>& data,
147  const std::optional<hilti::rt::stream::View>& cur) mutable
149  auto lhs_filter_unit = filter_unit.derefAsValue();
150  auto parse2 = hilti::rt::any_cast<Parse2Function<F>>(F::__parser.parse2);
151  SPICY_RT_DEBUG_VERBOSE(
152  hilti::rt::fmt(" + parsing from stream %p, forwarding to stream %p",
153  data.get(), lhs_filter_unit->__forward.get()));
154  return (*parse2)(lhs_filter_unit, data, cur, {});
155  },
157  {}};
158 
159  (*state.__filters).push_back(std::move(filter));
160  filter_unit->__forward = (*state.__filters).back().input;
161 }
162 
163 template<typename U, typename F>
164 void connect(UnitType<U>& unit, UnitRef<F> filter_unit) {
165  return connect(*unit, filter_unit);
166 }
167 
174 template<typename S>
176  S& state, // NOLINT(google-runtime-references)
177  hilti::rt::ValueReference<hilti::rt::Stream>& data, // NOLINT(google-runtime-references)
178  const hilti::rt::stream::View& cur) {
179  if ( ! (state.__filters && (*state.__filters).size()) )
180  return {};
181 
182  detail::OneFilter* previous = nullptr;
183 
184  for ( auto& f : *state.__filters ) {
185  SPICY_RT_DEBUG_VERBOSE(
186  hilti::rt::fmt("- beginning to filter input for unit %s [%p]", S::__parser.name, &state));
187 
188  if ( ! previous )
189  f.resumable = f.parse(data, cur);
190  else
191  f.resumable = f.parse(previous->input, previous->input->view());
192 
193  previous = &f;
194  }
195 
196  return hilti::rt::StrongReference<hilti::rt::Stream>((*state.__filters).back().input);
197 }
198 
199 template<typename U>
201  UnitType<U>& unit, // NOLINT(google-runtime-references)
202  hilti::rt::ValueReference<hilti::rt::Stream>& data, // NOLINT(google-runtime-references)
203  const hilti::rt::stream::View& cur) {
204  return init(*unit, data, cur);
205 }
206 
213 template<typename S>
214 inline void forward(S& state, const hilti::rt::Bytes& data) {
215  if ( ! state.__forward ) {
216  SPICY_RT_DEBUG_VERBOSE(
217  hilti::rt::fmt("- filter unit %s [%p] is forwarding \"%s\", but not connected to any unit",
218  S::__parser.name, &state, data));
219  return;
220  }
221 
222  SPICY_RT_DEBUG_VERBOSE(hilti::rt::fmt("- filter unit %s [%p] is forwarding \"%s\" to stream %p", S::__parser.name,
223  &state, data, state.__forward.get()));
224  state.__forward->append(data);
225 }
226 
227 template<typename U>
228 inline void forward(UnitType<U>& unit, const hilti::rt::Bytes& data) {
229  return forward(*unit, data);
230 }
231 
238 template<typename S>
239 inline void forward_eod(S& state) {
240  if ( ! state.__forward ) {
241  SPICY_RT_DEBUG_VERBOSE(hilti::rt::fmt("- filter unit %s [%p] is forwarding EOD, but not connected to any unit",
242  S::__parser.name, &state));
243  return;
244  }
245 
246  SPICY_RT_DEBUG_VERBOSE(hilti::rt::fmt("- filter unit %s [%p] is forwarding EOD to stream %p", S::__parser.name,
247  &state, state.__forward.get()));
248  state.__forward->freeze();
249 }
250 
251 template<typename U>
252 inline void forward_eod(UnitType<U>& unit) {
253  return forward_eod(*unit);
254 }
255 
262  for ( auto& f : (*filters) )
263  f.resumable.resume();
264 }
265 
272 template<typename S>
273 inline void flush(S& state) {
274  flush(state.__filters);
275 }
276 
277 template<typename U>
278 inline void flush(UnitType<U>& unit) {
279  flush(*unit);
280 }
281 
282 } // namespace spicy::rt::filter
hilti::rt::StrongReference<::spicy::rt::filter::detail::Filters > __filters
Definition: filter.h:72
Definition: reference.h:497
Definition: bytes.h:158
Definition: stream.h:983
const T * get() const
Definition: reference.h:106
Definition: reference.h:345
hilti::rt::WeakReference<::spicy::rt::filter::detail::Forward > __forward
Definition: filter.h:75
Definition: stream.h:1407
Definition: vector.h:251
Definition: fiber.h:295
Definition: filter.h:14
ValueReference< T > derefAsValue() const
Definition: reference.h:387
Definition: filter.h:70
std::string fmt(const char *fmt, const Args &... args)
Definition: fmt.h:13
View view(bool expanding=true) const
Definition: stream.h:1580