Spicy
sink.h
1 // Copyright (c) 2020-now by the Zeek Project. See LICENSE for details.
2 
3 #pragma once
4 
5 #include <list>
6 #include <memory>
7 #include <string>
8 #include <string_view>
9 #include <type_traits>
10 #include <utility>
11 #include <vector>
12 
13 #include <hilti/rt/exception.h>
14 #include <hilti/rt/extension-points.h>
15 #include <hilti/rt/types/bytes.h>
16 #include <hilti/rt/types/integer.h>
17 #include <hilti/rt/types/reference.h>
18 #include <hilti/rt/types/stream.h>
19 
20 #include <spicy/rt/debug.h>
21 #include <spicy/rt/filter.h>
22 #include <spicy/rt/mime.h>
23 #include <spicy/rt/parser-fwd.h>
24 #include <spicy/rt/typedefs.h>
25 
26 namespace spicy::rt {
27 
// Exception raised on misuse of a Sink. The Sink throws it, for example, when
// a filter is connected after data has already been forwarded, or when the
// initial sequence number is changed after input has been seen. It is declared
// via HILTI_EXCEPTION with `UsageError` as the second argument (presumably its
// base class).
31 HILTI_EXCEPTION(SinkError, UsageError)
32 
// Policy selecting how a Sink's reassembler behaves. It is set through
// `Sink::set_policy()` and stored in `Sink::_policy`. Only `First` is defined.
// NOTE(review): the semantics of `First` live in sink.cc, which is not visible
// here. Presumably the first data seen for a sequence range wins on overlap;
// confirm.
33 namespace sink {
34 enum class ReassemblerPolicy { First };
35 } // namespace sink
36 
37 namespace sink::detail {
38
// Type trait: `supports_sinks<T>::value` is true iff the expression
// `T::HILTI_INTERNAL(sink)` is well-formed. The partial specialization below
// only matches when that expression compiles; otherwise the primary template
// (false) is chosen.
40 template<typename T, typename = int>
41 struct supports_sinks : std::false_type {};
42 
43 template<typename T>
44 struct supports_sinks<T, decltype((void)T::HILTI_INTERNAL(sink), 0)> : std::true_type {};
45 
// Per-unit state that a Sink keeps for each connected parser unit. It is
// created by `connectUnit()` below.
// NOTE(review): this listing omits original lines 48-57. Per the symbol index,
// the missing members are:
//   hilti::rt::ValueReference<hilti::rt::Stream> data;  (sink.h:49)
//   hilti::rt::Resumable resumable;                      (sink.h:52)
//   Parser* parser;                                      (sink.h:55)
47 struct State {
50 
53 
56 
// When set, data is not delivered to this unit (the default is false).
58  bool skip_delivery = false;
59 };
60 
// Connects a unit instance to a freshly allocated sink State and returns it.
// The steps are:
//   1. Fetch the unit type's `parse2` function from `U::HILTI_INTERNAL(parser)`.
//   2. Replace any previous state in the unit's `HILTI_INTERNAL(sink)` member
//      with a new `State`.
//   3. Start parsing against the state's (still empty) `data` stream and keep
//      the returned Resumable so parsing can continue as data arrives.
//   4. Record the unit's parser.
// NOTE(review): `hilti::rt::any_cast` presumably throws if `parse2` does not
// hold a `Parse2Function<U>`; confirm.
// NOTE(review): `state->parser` is only assigned after the kick-off parse call
// returns. Confirm nothing reads it during that first call.
65 template<typename U>
66 auto connectUnit(UnitRef<U>& unit) {
67  auto parse2 = hilti::rt::any_cast<spicy::rt::Parse2Function<U>>(U::HILTI_INTERNAL(parser).parse2);
68 
69  auto self = hilti::rt::ValueReference<U>::self(&*unit);
70 
71  auto& state = unit->HILTI_INTERNAL(sink);
72  state = std::make_shared<sink::detail::State>();
73  state->resumable = (*parse2)(self, state->data, {}, {}); // Kick-off parsing with empty data.
74  state->parser = &U::HILTI_INTERNAL(parser);
75  return state;
76 }
77 
78 // Name used as template parameter for the sink's filter state (`filter::State<sink_name>`).
79 inline const char sink_name[] = HILTI_INTERNAL_ID("sink");
80 } // namespace sink::detail
81 
// A Sink accepts data, possibly out of order and with gaps. It reassembles the
// data by sequence number, optionally passes it through connected filter units,
// and delivers it to every connected parser unit.
// NOTE(review): original lines 82-88 (the class documentation) are omitted from
// this listing.
89 class Sink {
90 public:
91  Sink() { _init(); } // NOLINT(hicpp-member-init)
    // Closes the sink in an orderly way. Any exception is swallowed on purpose,
    // because destructors must not throw.
92  ~Sink() {
93  try {
94  _close(true);
95  } catch ( ... ) {
96  // Ignore errors.
97  }
98  }
99 
    // Copying is disabled; moving is defaulted.
    // NOTE(review): the defaulted move assignment replaces this sink's `_units`
    // and `_states` without first calling `_close()` on them. Units connected to
    // the assigned-to sink therefore never get the orderly close the destructor
    // gives them. Also, a moved-from sink's destructor still runs `_close(true)`
    // on its moved-from (valid but unspecified) members. Confirm both are intended.
100  Sink(const Sink&) = delete;
101  Sink(Sink&&) = default;
102  Sink& operator=(const Sink&) = delete;
103  Sink& operator=(Sink&&) = default;
104 
    // Connects a parser unit to this sink. It creates a fresh per-unit State via
    // `sink::detail::connectUnit()` and keeps both the unit reference and that
    // state for later delivery.
    // NOTE(review): original lines 105-110 and 112 are omitted here. Per the symbol
    // index, line 112 is `void connect(spicy::rt::UnitRef<T> unit) {`.
111  template<typename T>
113  SPICY_RT_DEBUG_VERBOSE(
114  hilti::rt::fmt("connecting parser %s [%p] to sink %p", T::HILTI_INTERNAL(parser).name, &*unit, this));
115  auto state = spicy::rt::sink::detail::connectUnit(unit);
116  _units.emplace_back(std::move(unit));
117  _states.emplace_back(std::move(state));
118  }
119 
    // Connects a filter unit to this sink's filter state (`_filter`). It throws
    // SinkError if data has already been forwarded (`_size` is non-zero).
    // NOTE(review): original lines 120-128 and 130 are omitted here. Per the symbol
    // index, line 130 is `void connect_filter(spicy::rt::UnitRef<T> filter_unit) {`.
129  template<typename T>
131  if ( _size )
132  throw SinkError("cannot connect filter after data has been forwarded already");
133 
134  SPICY_RT_DEBUG_VERBOSE(hilti::rt::fmt("connecting filter unit %s [%p] to sink %p",
135  T::HILTI_INTERNAL(parser).name,
136  &*filter_unit,
137  this));
138  spicy::rt::filter::detail::connect(_filter, filter_unit);
139  }
140 
    // Closes the sink in an orderly way (see `_close`).
145  void close() { _close(true); }
146 
    // Connects units registered for MIME type `mt` within `scope`; defined out of
    // line (sink.cc per the symbol index). The String and Bytes overloads below
    // convert to MIMEType and delegate here. The Bytes overload decodes using
    // ASCII.
156  void connect_mime_type(const MIMEType& mt, uint64_t scope);
157 
168  void connect_mime_type(const hilti::rt::String& mt, uint64_t scope) { connect_mime_type(MIMEType(mt), scope); }
169 
180  void connect_mime_type(const hilti::rt::Bytes& mt, uint64_t scope) {
181  connect_mime_type(MIMEType(mt.decode(hilti::rt::unicode::Charset::ASCII)), scope);
182  }
183 
    // Reports a gap of `len` bytes at absolute sequence number `seq`; defined in sink.cc.
190  void gap(uint64_t seq, uint64_t len);
191 
    // Absolute sequence number of the next byte to be delivered (initial
    // sequence number plus `_cur_rseq`); same value as `_aseq(_cur_rseq)`.
195  uint64_t sequence_number() const { return _initial_seq + _cur_rseq; }
196 
    // Enables or disables automatic trimming (stored in `_auto_trim`).
202  void set_auto_trim(bool enable) { _auto_trim = enable; }
203 
    // Sets the absolute sequence number from which relative offsets are measured.
    // This is only allowed before any input (data or gap) has been seen. Otherwise
    // the sink is closed non-orderly (`_close(false)`) and SinkError is thrown.
209  void set_initial_sequence_number(uint64_t seq) {
210  if ( _haveInput() ) {
211  _close(false);
212  throw SinkError("sink cannot update initial sequence number after activity has already been seen");
213  }
214 
215  _initial_seq = seq;
216  }
217 
    // Sets the reassembly policy.
219  void set_policy(sink::ReassemblerPolicy policy) { _policy = policy; }
220 
    // Returns `_size` as a `hilti::rt::integer::safe<uint64_t>`.
224  hilti::rt::integer::safe<uint64_t> size() const { return _size; }
225 
    // Skips / trims up to absolute sequence number `seq`; both are defined in sink.cc.
231  void skip(uint64_t seq);
232 
238  void trim(uint64_t seq);
239 
    // NOTE(review): original lines 240-257 are mostly omitted from this listing.
    // Per the symbol index they include the declaration
    //   void write(hilti::rt::Bytes data, hilti::rt::Optional<uint64_t> seq = {},
    //              hilti::rt::Optional<uint64_t> len = {});   (defined in sink.cc)
    // and the member
    //   filter::State<sink::detail::sink_name> _filter;       (sink.h:256)
248 
257 
258 private:
    // One buffered reassembly chunk covering relative range [rseq, rupper).
259  struct Chunk {
260  hilti::rt::Optional<hilti::rt::Bytes> data; // Data at +1; unset for gap
261  uint64_t rseq; // Sequence number of first byte.
262  uint64_t rupper; // Sequence number of last byte + 1.
263 
264  Chunk(hilti::rt::Optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper)
265  : data(std::move(data)), rseq(rseq), rupper(rupper) {}
266  };
267 
268  using ChunkList = std::list<Chunk>;
269 
270  // Returns true if any input has been passed in already (including gaps).
    // NOTE(review): this member does not modify state and could be declared `const`.
271  bool _haveInput() { return _cur_rseq || _chunks.size(); }
272 
273  // Backend for disconnecting the sink. If orderly, connected units get a
274  // chance to parse any remaining input; otherwise we abort directly.
275  void _close(bool orderly);
276 
277  // Turns an absolute sequence number into a relative one.
278  uint64_t _rseq(uint64_t seq) const {
279  // Unsigned 64-bit arithmetic wraps modulo 2^64, so this stays consistent
        // across wrap-around of 64-bit values. NOTE(review): narrower sequence
        // spaces (e.g. 32-bit) are not wrapped here; confirm callers pass 64-bit values.
280  return seq - _initial_seq;
281  }
282 
283  // Turns a relative sequence number into an absolute one.
284  uint64_t _aseq(uint64_t rseq) const {
285  // Inverse of `_rseq()`; unsigned 64-bit addition wraps modulo 2^64.
286  return _initial_seq + rseq;
287  }
288 
289  // (Re-)initialize instance.
290  void _init();
291 
292  // Add new data to buffer, beginning search for insert position at given start *c*.
293  ChunkList::iterator _addAndCheck(hilti::rt::Optional<hilti::rt::Bytes> data,
294  uint64_t rseq,
295  uint64_t rupper,
296  ChunkList::iterator c);
297 
298  // Deliver data to connected parsers. Returns false if the data is empty (i.e., a gap).
299  bool _deliver(hilti::rt::Optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper);
300 
301  // Entry point for all new data. If no bytes instance is given, that signals a gap.
302  void _newData(hilti::rt::Optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t len);
303 
304  // Skip up to sequence number.
305  void _skip(uint64_t rseq);
306 
307  // Trim up to sequence number.
308  void _trim(uint64_t rseq);
309 
310  // Deliver as much as possible starting at given buffer position.
311  void _tryDeliver(ChunkList::iterator c);
312 
313  // Trigger various hooks.
314  void _reportGap(uint64_t rseq, uint64_t len) const;
315  void _reportOverlap(uint64_t rseq, const hilti::rt::Bytes& old, const hilti::rt::Bytes& new_) const;
316  void _reportSkipped(uint64_t rseq) const;
317  void _reportUndelivered(uint64_t rseq, const hilti::rt::Bytes& data) const;
318  void _reportUndeliveredUpTo(uint64_t rupper) const;
319 
320  // Output reassembler state for debugging.
    // NOTE(review): original line 322 (a parameter between `msg` and `seq`) is
    // omitted from this listing.
321  void _debugReassembler(std::string_view msg,
323  uint64_t seq,
324  uint64_t len) const;
325  void _debugReassemblerBuffer(std::string_view msg) const;
326  void _debugDeliver(const hilti::rt::Bytes& data) const;
327 
328  // States for connected units.
329  std::vector<std::shared_ptr<sink::detail::State>> _states;
330 
331  // Must come after `_states` as it's keeping the states around.
    // (Each unit holds its State through the shared_ptr that connectUnit() stores
    // in its HILTI_INTERNAL(sink) member. Members are destroyed in reverse
    // declaration order, so `_units` is released before `_states`.)
332  std::vector<hilti::rt::StrongReferenceGeneric> _units;
333 
334  // Filter input and output.
    // NOTE(review): original lines 336-337 (further FilterData members) are
    // omitted from this listing.
335  struct FilterData {
338  hilti::rt::stream::View output_cur;
339  };
340 
341  hilti::rt::Optional<FilterData> _filter_data;
342 
343  // Reassembly state.
    // NOTE(review): `_policy` has no default member initializer, unlike its
    // neighbors. It is presumably set in `_init()`, which the constructor calls
    // (hence the NOLINT there); confirm in sink.cc.
344  sink::ReassemblerPolicy _policy; // Current policy
345  bool _auto_trim{}; // True if automatic trimming is enabled.
    // Presumably the number of bytes forwarded so far; connect_filter() refuses
    // to attach a filter once it is non-zero. Confirm in sink.cc.
346  uint64_t _size{};
347  uint64_t _initial_seq{}; // Initial sequence number.
348  uint64_t _cur_rseq{}; // Sequence of last delivered byte + 1 (i.e., seq of next)
349  uint64_t _last_reassem_rseq{}; // Sequence of last byte reassembled and delivered + 1.
350  uint64_t _trim_rseq{}; // Sequence of last byte trimmed so far + 1.
351  ChunkList _chunks; // Buffered data not yet delivered or trimmed
352 };
353 
354 } // namespace spicy::rt
355 
// String-conversion overloads for Sink and ReassemblerPolicy. They take the
// `adl::tag` used by the HILTI runtime's `to_string` extension point
// (extension-points.h) and are defined out of line.
// The Sink overload leaves its parameter unnamed (`/* x */`), so it presumably
// renders a fixed representation that does not depend on the instance.
356 namespace hilti::rt::detail::adl {
357 std::string to_string(const spicy::rt::Sink& /* x */, adl::tag /*unused*/);
358 std::string to_string(const spicy::rt::sink::ReassemblerPolicy& x, adl::tag /*unused*/);
359 } // namespace hilti::rt::detail::adl
Definition: bytes.h:235
String decode(unicode::Charset cs, unicode::DecodeErrorStrategy errors=unicode::DecodeErrorStrategy::REPLACE) const
Definition: fiber.h:320
Definition: string.h:31
Definition: reference.h:399
static ValueReference self(T *t)
Definition: reference.h:324
Definition: stream.h:1172
Definition: mime.h:32
Definition: sink.h:89
void skip(uint64_t seq)
Definition: sink.cc:463
void trim(uint64_t seq)
Definition: sink.cc:468
hilti::rt::integer::safe< uint64_t > size() const
Definition: sink.h:224
uint64_t sequence_number() const
Definition: sink.h:195
void connect_mime_type(const MIMEType &mt, uint64_t scope)
Definition: sink.cc:396
void write(hilti::rt::Bytes data, hilti::rt::Optional< uint64_t > seq={}, hilti::rt::Optional< uint64_t > len={})
Definition: sink.cc:473
void connect(spicy::rt::UnitRef< T > unit)
Definition: sink.h:112
filter::State< sink::detail::sink_name > _filter
Definition: sink.h:256
void connect_filter(spicy::rt::UnitRef< T > filter_unit)
Definition: sink.h:130
void gap(uint64_t seq, uint64_t len)
Definition: sink.cc:461
void set_auto_trim(bool enable)
Definition: sink.h:202
void set_initial_sequence_number(uint64_t seq)
Definition: sink.h:209
void connect_mime_type(const hilti::rt::Bytes &mt, uint64_t scope)
Definition: sink.h:180
void connect_mime_type(const hilti::rt::String &mt, uint64_t scope)
Definition: sink.h:168
void set_policy(sink::ReassemblerPolicy policy)
Definition: sink.h:219
void close()
Definition: sink.h:145
std::string fmt(const char *fmt, const Args &... args)
Definition: fmt.h:17
std::string to_string(T &&x)
Definition: extension-points.h:26
Definition: parser.h:149
Definition: sink.h:47
Parser * parser
Definition: sink.h:55
hilti::rt::ValueReference< hilti::rt::Stream > data
Definition: sink.h:49
bool skip_delivery
Definition: sink.h:58
hilti::rt::Resumable resumable
Definition: sink.h:52