Spicy
sink.h
1 // Copyright (c) 2020-2021 by the Zeek Project. See LICENSE for details.
2 
3 #pragma once
4 
5 #include <list>
6 #include <string>
7 #include <type_traits>
8 #include <utility>
9 #include <vector>
10 
11 #include <hilti/rt/exception.h>
12 #include <hilti/rt/extension-points.h>
13 #include <hilti/rt/types/bytes.h>
14 #include <hilti/rt/types/integer.h>
15 #include <hilti/rt/types/reference.h>
16 #include <hilti/rt/types/stream.h>
17 
18 #include <spicy/rt/debug.h>
19 #include <spicy/rt/filter.h>
20 #include <spicy/rt/mime.h>
21 #include <spicy/rt/parser-fwd.h>
22 #include <spicy/rt/typedefs.h>
23 
24 namespace spicy::rt {
25 
// Exception type for sink-related errors; in this file it is thrown by
// `Sink::connect_filter()` and `Sink::set_initial_sequence_number()`.
// NOTE(review): presumably the macro derives `SinkError` from `UserException` — confirm in hilti/rt/exception.h.
29 HILTI_EXCEPTION(SinkError, UserException)
30 
31 namespace sink {
// Reassembly policy that can be selected through `Sink::set_policy()`.
// `First` is the only enumerator declared here.
// NOTE(review): `First` presumably means first-seen data wins on overlap — confirm in the reassembler implementation.
32 enum class ReassemblerPolicy { First };
33 } // namespace sink
34 
35 namespace sink::detail {
36 
// Compile-time trait: `supports_sinks<T>::value` is true exactly when the
// expression `T::__sink` is well-formed, i.e. when `T` has a member named
// `__sink`. The primary template is the fallback for every other type.
template<typename T, typename = int>
struct supports_sinks : std::integral_constant<bool, false> {};

// Selected through SFINAE: the second argument only evaluates to `int`
// (matching the primary template's default) if `T::__sink` can be named.
template<typename T>
struct supports_sinks<T, decltype(static_cast<void>(T::__sink), int{})> : std::integral_constant<bool, true> {};
43 
// State kept for a unit connected to a sink; `_connectUnit()` allocates one
// and stores it in the unit's `__sink` member.
// NOTE(review): `_connectUnit()` also uses `data` and `resumable` members. Their
// declarations (original lines 47 and 50) are not part of this listing; its
// cross-reference index gives them as
// `hilti::rt::ValueReference<hilti::rt::Stream> data` and `hilti::rt::Resumable resumable`.
45 struct State {
48 
51 
// Parser of the connected unit; `_connectUnit()` points it at `U::__parser`.
52  Parser* parser;
53 };
54 
59 template<typename U>
60 auto _connectUnit(UnitRef<U>& unit) {
61  auto parse2 = hilti::rt::any_cast<spicy::rt::Parse2Function<U>>(U::__parser.parse2);
62 
63  auto self = hilti::rt::ValueReference<U>::self(&*unit);
64 
65  auto& state = unit->__sink;
66  state = new sink::detail::State(); // NOLINT
67  state->resumable = (*parse2)(self, state->data, {}, {}); // Kick-off parsing with empty data.
68  state->parser = &U::__parser;
69  return state;
70 }
71 
77 template<typename U>
78 ::spicy::rt::detail::ParseSinkFunction parseFunction() {
79  return []() {
80  auto unit = UnitRef<U>(U());
81  return std::make_pair(std::move(unit), _connectUnit(unit));
82  };
83 }
84 
94 template<typename Unit, auto Hook, typename... Args>
95 auto hookFunction() {
96  return [](const hilti::rt::StrongReferenceGeneric& u, Args&&... args) -> void {
97  auto unit = u.as<Unit>();
98  ((*unit).*Hook)(std::forward<Args>(args)...);
99  };
100 }
101 
102 // Name used as template parameter for sink's filter state.
103 inline const char sink_name[] = "__sink__";
104 } // namespace sink::detail
105 
113 class Sink {
114 public:
// Default constructor; all set-up is delegated to `_init()`.
115  Sink() { _init(); } // NOLINT(hicpp-member-init)
// Destructor; closes the sink in orderly mode, just like `close()`.
116  ~Sink() { _close(true); }
117 
// Sinks are move-only: copying is disabled, moving uses the compiler defaults.
118  Sink(const Sink&) = delete;
119  Sink(Sink&&) = default;
120  Sink& operator=(const Sink&) = delete;
121  Sink& operator=(Sink&&) = default;
122 
// Connects a parsing unit to the sink: logs the connection, starts the unit's
// parser through `_connectUnit()`, and records the unit and its sink state.
// NOTE(review): the declaration line (original line 130) is missing from this
// listing; its cross-reference index gives it as `void connect(spicy::rt::UnitRef<T> unit)`.
129  template<typename T>
131  SPICY_RT_DEBUG_VERBOSE(hilti::rt::fmt("connecting parser %s [%p] to sink %p", T::__parser.name, &*unit, this));
132  auto state = spicy::rt::sink::detail::_connectUnit(unit);
// The unit reference and its state are appended to `_units` and `_states` in the same order.
133  _units.emplace_back(std::move(unit));
134  _states.emplace_back(std::move(state));
135  }
136 
// Connects a filter unit to the sink by handing it to `spicy::rt::filter::connect()`.
// Throws `SinkError` if `_size` is non-zero, i.e. data has been forwarded already.
// NOTE(review): the declaration line (original line 147) is missing from this
// listing; its cross-reference index gives it as `void connect_filter(spicy::rt::UnitRef<T> unit)`.
146  template<typename T>
148  if ( _size )
149  throw SinkError("cannot connect filter after data has been forwarded already");
150 
151  SPICY_RT_DEBUG_VERBOSE(
152  hilti::rt::fmt("connecting filter unit %s [%p] to sink %p", T::__parser.name, &*unit, this));
153  spicy::rt::filter::connect(_filter, unit);
154  }
155 
// Closes the sink in orderly mode (`_close(true)`), which gives connected
// units a chance to parse any remaining input.
160  void close() { _close(true); }
161 
// Declared only; defined outside this header.
// NOTE(review): presumably connects parsers registered for MIME type *mt* — confirm in the implementation.
169  void connect_mime_type(const MIMEType& mt);
170 
// Convenience overload: wraps *mt* in a `MIMEType` and forwards to the overload above.
179  void connect_mime_type(const std::string& mt) { connect_mime_type(MIMEType(mt)); }
180 
189 
// Declared only; defined outside this header.
// NOTE(review): presumably reports a gap of *len* bytes at absolute sequence number *seq* — confirm.
196  void gap(uint64_t seq, uint64_t len);
197 
// Returns the initial sequence number plus `_cur_rseq`, i.e. the absolute
// sequence number of the next byte to be delivered.
201  uint64_t sequence_number() const { return _initial_seq + _cur_rseq; }
202 
// Sets the `_auto_trim` flag (automatic trimming on/off).
208  void set_auto_trim(bool enable) { _auto_trim = enable; }
209 
215  void set_initial_sequence_number(uint64_t seq) {
216  if ( _haveInput() ) {
217  _close(false);
218  throw SinkError("sink cannot update initial sequence number after activity has already been seen");
219  }
220 
221  _initial_seq = seq;
222  }
223 
// Sets the reassembly policy stored in `_policy`.
225  void set_policy(sink::ReassemblerPolicy policy) { _policy = policy; }
226 
// Returns the current value of `_size` as an overflow-checked integer.
// NOTE(review): `_size` is presumably the number of bytes forwarded so far — confirm in the implementation.
230  hilti::rt::integer::safe<uint64_t> size() const { return _size; }
231 
// Declared only. NOTE(review): presumably skips ahead to absolute sequence number *seq* (see `_skip()`) — confirm.
237  void skip(uint64_t seq);
238 
// Declared only. NOTE(review): presumably trims buffered data up to absolute sequence number *seq* (see `_trim()`) — confirm.
244  void trim(uint64_t seq);
245 
// Declared only. Passes *data* into the sink; *seq* and *len* are optional and default to unset.
// NOTE(review): how unset *seq*/*len* are interpreted is decided in the implementation — confirm there.
253  void write(hilti::rt::Bytes data, std::optional<uint64_t> seq = {}, std::optional<uint64_t> len = {});
254 
263 
264 private:
265  struct Chunk {
266  std::optional<hilti::rt::Bytes> data; // Data at +1; unset for gap
267  uint64_t rseq; // Sequence number of first byte.
268  uint64_t rupper; // Sequence number of last byte + 1.
269 
270  Chunk(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper)
271  : data(std::move(data)), rseq(rseq), rupper(rupper) {}
272  };
273 
274  using ChunkList = std::list<Chunk>;
275 
276  // Returns true if any input has been passed in already (including gaps).
277  bool _haveInput() { return _cur_rseq || _chunks.size(); }
278 
279  // Backend for disconnecting the sink. If orderly, connected units get a
280  // chance to parse any remaining input; otherwise we abort directly.
// `close()` and the destructor pass true; `set_initial_sequence_number()`
// passes false before throwing its error.
281  void _close(bool orderly);
282 
283  // Turns an absolute sequence number into a relative one.
284  uint64_t _rseq(uint64_t seq) const {
285  // I believe this does the right thing for wrap-around ...
286  return seq - _initial_seq;
287  }
288 
289  // Turns a relative sequence number into an absolute one.
290  uint64_t _aseq(uint64_t rseq) const {
291  // I believe this does the right thing for wrap-around ...
292  return _initial_seq + rseq;
293  }
294 
// The helpers below are declared only; their definitions live outside this header.
295  // (Re-)initialize instance. Called from the default constructor.
296  void _init();
297 
298  // Add new data to buffer, beginning search for insert position at given start *c*.
299  ChunkList::iterator _addAndCheck(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper,
300  ChunkList::iterator c);
301 
302  // Deliver data to connected parsers. Returns false if the data is empty (i.e., a gap).
303  bool _deliver(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper);
304 
305  // Entry point for all new data. If no bytes instance is given, that signals a gap.
306  void _newData(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t len);
307 
308  // Skip up to (relative) sequence number.
309  void _skip(uint64_t rseq);
310 
311  // Trim up to (relative) sequence number.
312  void _trim(uint64_t rseq);
313 
314  // Deliver as much as possible starting at given buffer position.
315  void _tryDeliver(ChunkList::iterator c);
316 
317  // Trigger various hooks.
318  void _reportGap(uint64_t rseq, uint64_t len) const;
319  void _reportOverlap(uint64_t rseq, const hilti::rt::Bytes& old, const hilti::rt::Bytes& new_) const;
320  void _reportSkipped(uint64_t rseq) const;
321  void _reportUndelivered(uint64_t rseq, const hilti::rt::Bytes& data) const;
322  void _reportUndeliveredUpTo(uint64_t rupper) const;
323 
324  // Output reassembler state for debugging.
325  void _debugReassembler(const std::string& msg, const std::optional<hilti::rt::Bytes>& data, uint64_t seq,
326  uint64_t len) const;
327  void _debugReassemblerBuffer(const std::string& msg) const;
328  void _debugDeliver(const hilti::rt::Bytes& data) const;
329 
330  // States for connected units.
331  std::vector<sink::detail::State*> _states;
332 
333  // Must come after `_states` as it's keeping the states around.
334  std::vector<hilti::rt::StrongReferenceGeneric> _units;
335 
336  // Filter input and output.
// NOTE(review): original lines 338-339 of this struct are missing from this listing; only `output_cur` is visible.
337  struct FilterData {
340  hilti::rt::stream::View output_cur;
341  };
342 
343  std::optional<FilterData> _filter_data;
344 
345  // Reassembly state.
// NOTE(review): `_policy` has no default member initializer; presumably `_init()` sets it — confirm.
346  sink::ReassemblerPolicy _policy; // Current policy
347  bool _auto_trim{}; // True if automatic trimming is enabled.
348  uint64_t _size{}; // Value returned by `size()`; `connect_filter()` refuses to connect once non-zero.
349  uint64_t _initial_seq{}; // Initial sequence number.
350  uint64_t _cur_rseq{}; // Sequence of last delivered byte + 1 (i.e., seq of next)
351  uint64_t _last_reassem_rseq{}; // Sequence of last byte reassembled and delivered + 1.
352  uint64_t _trim_rseq{}; // Sequence of last byte trimmed so far + 1.
353  ChunkList _chunks; // Buffered data not yet delivered or trimmed
354 };
355 
356 } // namespace spicy::rt
357 
358 namespace hilti::rt::detail::adl {
// String-rendering overloads for `Sink` and `ReassemblerPolicy`; declared here
// only, defined elsewhere.
// NOTE(review): presumably selected by HILTI's generic `to_string` through the `adl::tag` parameter — confirm.
359 std::string to_string(const spicy::rt::Sink& /* x */, adl::tag /*unused*/);
360 std::string to_string(const spicy::rt::sink::ReassemblerPolicy& x, adl::tag /*unused*/);
361 } // namespace hilti::rt::detail::adl
hilti::rt::integer::safe< uint64_t > size() const
Definition: sink.h:230
void set_auto_trim(bool enable)
Definition: sink.h:208
hilti::rt::Resumable resumable
Definition: sink.h:50
void set_initial_sequence_number(uint64_t seq)
Definition: sink.h:215
uint64_t sequence_number() const
Definition: sink.h:201
Definition: reference.h:640
void connect(spicy::rt::UnitRef< T > unit)
Definition: sink.h:130
void close()
Definition: sink.h:160
Definition: bytes.h:154
void connect_mime_type(const std::string &mt)
Definition: sink.h:179
Definition: hook.h:29
Definition: stream.h:978
hilti::rt::ValueReference< hilti::rt::Stream > data
Definition: sink.h:47
Definition: mime.h:29
T * as() const
Definition: reference.h:654
void connect_filter(spicy::rt::UnitRef< T > unit)
Definition: sink.h:147
Definition: sink.h:45
Definition: reference.h:328
static ValueReference self(T *t)
Definition: reference.h:262
const std::string & str() const &
Definition: bytes.h:216
Definition: deferred-expression.h:41
Definition: fiber.h:274
void connect_mime_type(const hilti::rt::Bytes &mt)
Definition: sink.h:188
Definition: parser.h:137
void set_policy(sink::ReassemblerPolicy policy)
Definition: sink.h:225
std::string fmt(const char *fmt, const Args &... args)
Definition: fmt.h:13
filter::State< sink::detail::sink_name > _filter
Definition: sink.h:262
Definition: sink.h:113