Spicy
sink.h
1 // Copyright (c) 2020-now by the Zeek Project. See LICENSE for details.
2 
3 #pragma once
4 
5 #include <list>
6 #include <string>
7 #include <string_view>
8 #include <type_traits>
9 #include <utility>
10 #include <vector>
11 
12 #include <hilti/rt/exception.h>
13 #include <hilti/rt/extension-points.h>
14 #include <hilti/rt/types/bytes.h>
15 #include <hilti/rt/types/integer.h>
16 #include <hilti/rt/types/reference.h>
17 #include <hilti/rt/types/stream.h>
18 
19 #include <spicy/rt/debug.h>
20 #include <spicy/rt/filter.h>
21 #include <spicy/rt/mime.h>
22 #include <spicy/rt/parser-fwd.h>
23 #include <spicy/rt/typedefs.h>
24 
25 namespace spicy::rt {
26 
30 HILTI_EXCEPTION(SinkError, UsageError)
31 
namespace sink {
/**
 * Policy selecting how the sink's reassembler treats incoming data.
 * `First` is currently the only option (and the enumerator with value 0).
 */
enum class ReassemblerPolicy {
    First,
};
} // namespace sink
35 
36 namespace sink::detail {
37 
// Type trait: `supports_sinks<T>::value` is true iff `T` declares a member
// named `__sink` (the per-unit sink state slot used by `connectUnit()`).
//
// The defaulted second parameter only exists for detection: the partial
// specialization below is viable (via SFINAE) exactly when `T::__sink`
// is a valid expression, in which case its second argument is `int`.
template<typename T, typename = int>
struct supports_sinks : std::integral_constant<bool, false> {};

template<typename T>
struct supports_sinks<T, decltype(static_cast<void>(T::__sink), int{})> : std::integral_constant<bool, true> {};
44 
46 struct State {
49 
52 
55 
57  bool skip_delivery = false;
58 };
59 
64 template<typename U>
65 auto connectUnit(UnitRef<U>& unit) {
66  auto parse2 = hilti::rt::any_cast<spicy::rt::Parse2Function<U>>(U::__parser.parse2);
67 
68  auto self = hilti::rt::ValueReference<U>::self(&*unit);
69 
70  auto& state = unit->__sink;
71  state = new sink::detail::State(); // NOLINT
72  state->resumable = (*parse2)(self, state->data, {}, {}); // Kick-off parsing with empty data.
73  state->parser = &U::__parser;
74  return state;
75 }
76 
// Name used as the template argument for the sink's filter state
// (`filter::State<sink_name>`).
inline const char sink_name[] = "__sink__";
79 } // namespace sink::detail
80 
88 class Sink {
89 public:
90  Sink() { _init(); } // NOLINT(hicpp-member-init)
91  ~Sink() {
92  try {
93  _close(true);
94  } catch ( ... ) {
95  // Ignore errors.
96  }
97  }
98 
99  Sink(const Sink&) = delete;
100  Sink(Sink&&) = default;
101  Sink& operator=(const Sink&) = delete;
102  Sink& operator=(Sink&&) = default;
103 
110  template<typename T>
112  SPICY_RT_DEBUG_VERBOSE(hilti::rt::fmt("connecting parser %s [%p] to sink %p", T::__parser.name, &*unit, this));
113  auto state = spicy::rt::sink::detail::connectUnit(unit);
114  _units.emplace_back(std::move(unit));
115  _states.emplace_back(std::move(state));
116  }
117 
127  template<typename T>
129  if ( _size )
130  throw SinkError("cannot connect filter after data has been forwarded already");
131 
132  SPICY_RT_DEBUG_VERBOSE(
133  hilti::rt::fmt("connecting filter unit %s [%p] to sink %p", T::__parser.name, &*filter_unit, this));
134  spicy::rt::filter::detail::connect(_filter, filter_unit);
135  }
136 
141  void close() { _close(true); }
142 
152  void connect_mime_type(const MIMEType& mt, uint64_t scope);
153 
164  void connect_mime_type(const std::string& mt, uint64_t scope) { connect_mime_type(MIMEType(mt), scope); }
165 
176  void connect_mime_type(const hilti::rt::Bytes& mt, uint64_t scope) { connect_mime_type(MIMEType(mt.str()), scope); }
177 
184  void gap(uint64_t seq, uint64_t len);
185 
189  uint64_t sequence_number() const { return _initial_seq + _cur_rseq; }
190 
196  void set_auto_trim(bool enable) { _auto_trim = enable; }
197 
203  void set_initial_sequence_number(uint64_t seq) {
204  if ( _haveInput() ) {
205  _close(false);
206  throw SinkError("sink cannot update initial sequence number after activity has already been seen");
207  }
208 
209  _initial_seq = seq;
210  }
211 
213  void set_policy(sink::ReassemblerPolicy policy) { _policy = policy; }
214 
218  hilti::rt::integer::safe<uint64_t> size() const { return _size; }
219 
225  void skip(uint64_t seq);
226 
232  void trim(uint64_t seq);
233 
241  void write(hilti::rt::Bytes data, std::optional<uint64_t> seq = {}, std::optional<uint64_t> len = {});
242 
251 
252 private:
253  struct Chunk {
254  std::optional<hilti::rt::Bytes> data; // Data at +1; unset for gap
255  uint64_t rseq; // Sequence number of first byte.
256  uint64_t rupper; // Sequence number of last byte + 1.
257 
258  Chunk(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper)
259  : data(std::move(data)), rseq(rseq), rupper(rupper) {}
260  };
261 
262  using ChunkList = std::list<Chunk>;
263 
264  // Returns true if any input has been passed in already (including gaps).
265  bool _haveInput() { return _cur_rseq || _chunks.size(); }
266 
267  // Backend for disconnecting the sink. If orderly, connected units get a
268  // chance to parse any remaining input; otherwise we abort directly.
269  void _close(bool orderly);
270 
271  // Turns an absolute sequence number into a relative one.
272  uint64_t _rseq(uint64_t seq) const {
273  // I believe this does the right thing for wrap-around ...
274  return seq - _initial_seq;
275  }
276 
277  // Turns a relative sequence number into an absolute one.
278  uint64_t _aseq(uint64_t rseq) const {
279  // I believe this does the right thing for wrap-around ...
280  return _initial_seq + rseq;
281  }
282 
283  // (Re-)initialize instance.
284  void _init();
285 
286  // Add new data to buffer, beginning search for insert position at given start *c*.
287  ChunkList::iterator _addAndCheck(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper,
288  ChunkList::iterator c);
289 
290  // Deliver data to connected parsers. Returns false if the data is empty (i.e., a gap).
291  bool _deliver(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper);
292 
293  // Entry point for all new data. If not bytes instance is given, that signals a gap.
294  void _newData(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t len);
295 
296  // Skip up to sequence number.
297  void _skip(uint64_t rseq);
298 
299  // Trim up to sequence number.
300  void _trim(uint64_t rseq);
301 
302  // Deliver as much as possible starting at given buffer position.
303  void _tryDeliver(ChunkList::iterator c);
304 
305  // Trigger various hooks.
306  void _reportGap(uint64_t rseq, uint64_t len) const;
307  void _reportOverlap(uint64_t rseq, const hilti::rt::Bytes& old, const hilti::rt::Bytes& new_) const;
308  void _reportSkipped(uint64_t rseq) const;
309  void _reportUndelivered(uint64_t rseq, const hilti::rt::Bytes& data) const;
310  void _reportUndeliveredUpTo(uint64_t rupper) const;
311 
312  // Output reassembler state for debugging.
313  void _debugReassembler(std::string_view msg, const std::optional<hilti::rt::Bytes>& data, uint64_t seq,
314  uint64_t len) const;
315  void _debugReassemblerBuffer(std::string_view msg) const;
316  void _debugDeliver(const hilti::rt::Bytes& data) const;
317 
318  // States for connected units.
319  std::vector<sink::detail::State*> _states;
320 
321  // Must come after `_state` as it's keeping the states around.
322  std::vector<hilti::rt::StrongReferenceGeneric> _units;
323 
324  // Filter input and output.
325  struct FilterData {
328  hilti::rt::stream::View output_cur;
329  };
330 
331  std::optional<FilterData> _filter_data;
332 
333  // Reassembly state.
334  sink::ReassemblerPolicy _policy; // Current policy
335  bool _auto_trim{}; // True if automatic trimming is enabled.
336  uint64_t _size{};
337  uint64_t _initial_seq{}; // Initial sequence number.
338  uint64_t _cur_rseq{}; // Sequence of last delivered byte + 1 (i.e., seq of next)
339  uint64_t _last_reassem_rseq{}; // Sequence of last byte reassembled and delivered + 1.
340  uint64_t _trim_rseq{}; // Sequence of last byte trimmed so far + 1.
341  ChunkList _chunks; // Buffered data not yet delivered or trimmed
342 };
343 
344 } // namespace spicy::rt
345 
346 namespace hilti::rt::detail::adl {
347 std::string to_string(const spicy::rt::Sink& /* x */, adl::tag /*unused*/);
348 std::string to_string(const spicy::rt::sink::ReassemblerPolicy& x, adl::tag /*unused*/);
349 } // namespace hilti::rt::detail::adl
Definition: bytes.h:234
const std::string & str() const &
Definition: bytes.h:296
Definition: fiber.h:305
Definition: reference.h:376
static ValueReference self(T *t)
Definition: reference.h:299
Definition: stream.h:1167
Definition: mime.h:29
Definition: sink.h:88
void write(hilti::rt::Bytes data, std::optional< uint64_t > seq={}, std::optional< uint64_t > len={})
Definition: sink.cc:459
void skip(uint64_t seq)
Definition: sink.cc:449
void trim(uint64_t seq)
Definition: sink.cc:454
void connect_mime_type(const std::string &mt, uint64_t scope)
Definition: sink.h:164
hilti::rt::integer::safe< uint64_t > size() const
Definition: sink.h:218
uint64_t sequence_number() const
Definition: sink.h:189
void connect_mime_type(const MIMEType &mt, uint64_t scope)
Definition: sink.cc:383
void connect(spicy::rt::UnitRef< T > unit)
Definition: sink.h:111
filter::State< sink::detail::sink_name > _filter
Definition: sink.h:250
void connect_filter(spicy::rt::UnitRef< T > filter_unit)
Definition: sink.h:128
void gap(uint64_t seq, uint64_t len)
Definition: sink.cc:447
void set_auto_trim(bool enable)
Definition: sink.h:196
void set_initial_sequence_number(uint64_t seq)
Definition: sink.h:203
void connect_mime_type(const hilti::rt::Bytes &mt, uint64_t scope)
Definition: sink.h:176
void set_policy(sink::ReassemblerPolicy policy)
Definition: sink.h:213
void close()
Definition: sink.h:141
std::string fmt(const char *fmt, const Args &... args)
Definition: fmt.h:13
std::string to_string(T &&x)
Definition: extension-points.h:26
Definition: parser.h:142
Definition: sink.h:46
Parser * parser
Definition: sink.h:54
hilti::rt::ValueReference< hilti::rt::Stream > data
Definition: sink.h:48
bool skip_delivery
Definition: sink.h:57
hilti::rt::Resumable resumable
Definition: sink.h:51