Spicy
sink.h
1 // Copyright (c) 2020-2021 by the Zeek Project. See LICENSE for details.
2 
3 #pragma once
4 
5 #include <list>
6 #include <string>
7 #include <type_traits>
8 #include <utility>
9 #include <vector>
10 
11 #include <hilti/rt/exception.h>
12 #include <hilti/rt/extension-points.h>
13 #include <hilti/rt/types/bytes.h>
14 #include <hilti/rt/types/reference.h>
15 #include <hilti/rt/types/stream.h>
16 
17 #include <spicy/rt/debug.h>
18 #include <spicy/rt/filter.h>
19 #include <spicy/rt/mime.h>
20 #include <spicy/rt/parser-fwd.h>
21 #include <spicy/rt/typedefs.h>
22 
23 namespace spicy::rt {
24 
// Exception type thrown by sink operations that are used at an invalid time,
// e.g. connecting a filter after data has been forwarded, or changing the
// initial sequence number after input has been seen.
// NOTE(review): presumably derives from `UserException` through the macro -- confirm.
28 HILTI_EXCEPTION(SinkError, UserException)
29 
namespace sink {
/** Reassembly policies that can be selected through `Sink::set_policy()`. */
enum class ReassemblerPolicy {
    First ///< The only policy currently defined.
};
} // namespace sink
33 
34 namespace sink::detail {
35 
/**
 * Compile-time check for whether a type `T` has a member named `__sink`.
 *
 * This primary template is the fallback; it is used whenever `T::__sink`
 * is not a valid expression.
 */
template<typename T, typename = int>
struct supports_sinks : std::bool_constant<false> {};

/**
 * Specialization selected when `T::__sink` is well-formed: the comma
 * expression then has type `int`, which matches the defaulted second
 * template argument of the primary template.
 */
template<typename T>
struct supports_sinks<T, decltype(static_cast<void>(T::__sink), 0)> : std::bool_constant<true> {};
42 
44 struct State {
47 
50 
51  Parser* parser;
52 };
53 
58 template<typename U>
59 auto _connectUnit(UnitRef<U>& unit) {
60  auto parse2 = hilti::rt::any_cast<spicy::rt::Parse2Function<U>>(U::__parser.parse2);
61 
62  auto self = hilti::rt::ValueReference<U>::self(&*unit);
63 
64  auto& state = unit->__sink;
65  state = new sink::detail::State(); // NOLINT
66  state->resumable = (*parse2)(self, state->data, {}, {}); // Kick-off parsing with empty data.
67  state->parser = &U::__parser;
68  return state;
69 }
70 
76 template<typename U>
77 ::spicy::rt::detail::ParseSinkFunction parseFunction() {
78  return []() {
79  auto unit = UnitRef<U>(U());
80  return std::make_pair(std::move(unit), _connectUnit(unit));
81  };
82 }
83 
93 template<typename Unit, auto Hook, typename... Args>
94 auto hookFunction() {
95  return [](const hilti::rt::StrongReferenceGeneric& u, Args&&... args) -> void {
96  auto unit = u.as<Unit>();
97  ((*unit).*Hook)(std::forward<Args>(args)...);
98  };
99 }
100 
101 // Name used as template parameter for sink's filter state.
102 inline const char sink_name[] = "__sink__";
103 } // namespace sink::detail
104 
112 class Sink {
113 public:
// Default constructor; all set-up is delegated to `_init()`.
114  Sink() { _init(); } // NOLINT(hicpp-member-init)
// Destructor; performs an orderly close through `_close(true)`.
115  ~Sink() { _close(true); }
116 
// Sinks are move-only: copying is disabled, moving is member-wise.
// NOTE(review): the defaulted move assignment replaces this sink's members
// without calling `_close()` on what it held before -- confirm this is intended.
117  Sink(const Sink&) = delete;
118  Sink(Sink&&) = default;
119  Sink& operator=(const Sink&) = delete;
120  Sink& operator=(Sink&&) = default;
121 
128  template<typename T>
130  SPICY_RT_DEBUG_VERBOSE(hilti::rt::fmt("connecting parser %s [%p] to sink %p", T::__parser.name, &*unit, this));
131  auto state = spicy::rt::sink::detail::_connectUnit(unit);
132  _units.emplace_back(std::move(unit));
133  _states.emplace_back(std::move(state));
134  }
135 
145  template<typename T>
147  if ( _size )
148  throw SinkError("cannot connect filter after data has been forwarded already");
149 
150  SPICY_RT_DEBUG_VERBOSE(
151  hilti::rt::fmt("connecting filter unit %s [%p] to sink %p", T::__parser.name, &*unit, this));
152  spicy::rt::filter::connect(_filter, unit);
153  }
154 
159  void close() { _close(true); }
160 
168  void connect_mime_type(const MIMEType& mt);
169 
178  void connect_mime_type(const std::string& mt) { connect_mime_type(MIMEType(mt)); }
179 
188 
195  void gap(uint64_t seq, uint64_t len);
196 
200  uint64_t sequence_number() const { return _initial_seq + _cur_rseq; }
201 
207  void set_auto_trim(bool enable) { _auto_trim = enable; }
208 
214  void set_initial_sequence_number(uint64_t seq) {
215  if ( _haveInput() ) {
216  _close(false);
217  throw SinkError("sink cannot update initial sequence number after activity has already been seen");
218  }
219 
220  _initial_seq = seq;
221  }
222 
224  void set_policy(sink::ReassemblerPolicy policy) { _policy = policy; }
225 
229  uint64_t size() const { return _size; }
230 
236  void skip(uint64_t seq);
237 
243  void trim(uint64_t seq);
244 
252  void write(hilti::rt::Bytes data, std::optional<uint64_t> seq = {}, std::optional<uint64_t> len = {});
253 
262 
263 private:
264  struct Chunk {
265  std::optional<hilti::rt::Bytes> data; // Data at +1; unset for gap
266  uint64_t rseq; // Sequence number of first byte.
267  uint64_t rupper; // Sequence number of last byte + 1.
268 
269  Chunk(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper)
270  : data(std::move(data)), rseq(rseq), rupper(rupper) {}
271  };
272 
273  using ChunkList = std::list<Chunk>;
274 
275  // Returns true if any input has been passed in already (including gaps).
276  bool _haveInput() { return _cur_rseq || _chunks.size(); }
277 
278  // Backend for disconnecting the sink. If orderly, connected units get a
279  // chance to parse any remaining input; otherwise we abort directly.
280  void _close(bool orderly);
281 
282  // Turns an absolute sequence number into a relative one.
283  uint64_t _rseq(uint64_t seq) const {
284  // I believe this does the right thing for wrap-around ...
285  return seq - _initial_seq;
286  }
287 
288  // Turns a relative sequence number into an absolute one.
289  uint64_t _aseq(uint64_t rseq) const {
290  // I believe this does the right thing for wrap-around ...
291  return _initial_seq + rseq;
292  }
293 
294  // (Re-)initialize instance.
295  void _init();
296 
297  // Add new data to buffer, beginning search for insert position at given start *c*.
298  ChunkList::iterator _addAndCheck(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper,
299  ChunkList::iterator c);
300 
301  // Deliver data to connected parsers. Returns false if the data is empty (i.e., a gap).
302  bool _deliver(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper);
303 
304  // Entry point for all new data. If no bytes instance is given, that signals a gap.
305  void _newData(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t len);
306 
307  // Skip up to sequence number.
308  void _skip(uint64_t rseq);
309 
310  // Trim up to sequence number.
311  void _trim(uint64_t rseq);
312 
313  // Deliver as much as possible starting at given buffer position.
314  void _tryDeliver(ChunkList::iterator c);
315 
316  // Trigger various hooks.
317  void _reportGap(uint64_t rseq, uint64_t len) const;
318  void _reportOverlap(uint64_t rseq, const hilti::rt::Bytes& old, const hilti::rt::Bytes& new_) const;
319  void _reportSkipped(uint64_t rseq) const;
320  void _reportUndelivered(uint64_t rseq, const hilti::rt::Bytes& data) const;
321  void _reportUndeliveredUpTo(uint64_t rupper) const;
322 
323  // Output reassembler state for debugging.
324  void _debugReassembler(const std::string& msg, const std::optional<hilti::rt::Bytes>& data, uint64_t seq,
325  uint64_t len) const;
326  void _debugReassemblerBuffer(const std::string& msg) const;
327  void _debugDeliver(const hilti::rt::Bytes& data) const;
328 
329  // States for connected units.
330  std::vector<sink::detail::State*> _states;
331 
332  // Must come after `_states` as it's keeping the states around.
333  std::vector<hilti::rt::StrongReferenceGeneric> _units;
334 
335  // Filter input and output.
336  struct FilterData {
339  hilti::rt::stream::View output_cur;
340  };
341 
342  std::optional<FilterData> _filter_data;
343 
344  // Reassembly state.
345  sink::ReassemblerPolicy _policy; // Current policy
346  bool _auto_trim{}; // True if automatic trimming is enabled.
347  uint64_t _size{};
348  uint64_t _initial_seq{}; // Initial sequence number.
349  uint64_t _cur_rseq{}; // Sequence of last delivered byte + 1 (i.e., seq of next)
350  uint64_t _last_reassem_rseq{}; // Sequence of last byte reassembled and delivered + 1.
351  uint64_t _trim_rseq{}; // Sequence of last byte trimmed so far + 1.
352  ChunkList _chunks; // Buffered data not yet delivered or trimmed
353 };
354 
355 } // namespace spicy::rt
356 
357 namespace hilti::rt::detail::adl {
358 std::string to_string(const spicy::rt::Sink& /* x */, adl::tag /*unused*/);
359 std::string to_string(const spicy::rt::sink::ReassemblerPolicy& x, adl::tag /*unused*/);
360 } // namespace hilti::rt::detail::adl
void set_auto_trim(bool enable)
Definition: sink.h:207
hilti::rt::Resumable resumable
Definition: sink.h:49
uint64_t size() const
Definition: sink.h:229
void set_initial_sequence_number(uint64_t seq)
Definition: sink.h:214
uint64_t sequence_number() const
Definition: sink.h:200
Definition: reference.h:633
void connect(spicy::rt::UnitRef< T > unit)
Definition: sink.h:129
void close()
Definition: sink.h:159
Definition: bytes.h:153
void connect_mime_type(const std::string &mt)
Definition: sink.h:178
Definition: hook.h:17
Definition: stream.h:978
hilti::rt::ValueReference< hilti::rt::Stream > data
Definition: sink.h:46
Definition: mime.h:29
T * as() const
Definition: reference.h:647
void connect_filter(spicy::rt::UnitRef< T > unit)
Definition: sink.h:146
Definition: sink.h:44
Definition: reference.h:321
static ValueReference self(T *t)
Definition: reference.h:255
const std::string & str() const &
Definition: bytes.h:214
Definition: deferred-expression.h:41
Definition: fiber.h:274
void connect_mime_type(const hilti::rt::Bytes &mt)
Definition: sink.h:187
Definition: parser.h:137
void set_policy(sink::ReassemblerPolicy policy)
Definition: sink.h:224
std::string fmt(const char *fmt, const Args &... args)
Definition: fmt.h:13
filter::State< sink::detail::sink_name > _filter
Definition: sink.h:261
Definition: sink.h:112