Spicy
sink.h
1 // Copyright (c) 2020-now by the Zeek Project. See LICENSE for details.
2 
3 #pragma once
4 
5 #include <list>
6 #include <string>
7 #include <string_view>
8 #include <type_traits>
9 #include <utility>
10 #include <vector>
11 
12 #include <hilti/rt/exception.h>
13 #include <hilti/rt/extension-points.h>
14 #include <hilti/rt/types/bytes.h>
15 #include <hilti/rt/types/integer.h>
16 #include <hilti/rt/types/reference.h>
17 #include <hilti/rt/types/stream.h>
18 
19 #include <spicy/rt/debug.h>
20 #include <spicy/rt/filter.h>
21 #include <spicy/rt/mime.h>
22 #include <spicy/rt/parser-fwd.h>
23 #include <spicy/rt/typedefs.h>
24 
25 namespace spicy::rt {
26 
30 HILTI_EXCEPTION(SinkError, UsageError)
31 
32 namespace sink {
33 enum class ReassemblerPolicy { First };
34 } // namespace sink
35 
36 namespace sink::detail {
37 
/**
 * Type trait telling whether a type `T` declares a `__sink` member (the
 * member that `connectUnit()` stores a unit's sink state in). `value` is
 * true if it does, false otherwise.
 */
template<typename T, typename = int>
struct supports_sinks : std::false_type {};

// Chosen only if naming `T::__sink` is well-formed. The comma expression then
// has type `int`, matching the primary template's defaulted argument. The
// `void` cast keeps any overloaded comma operator out of the picture.
template<typename T>
struct supports_sinks<T, decltype(static_cast<void>(T::__sink), 0)> : std::true_type {};
44 
46 struct State {
49 
52 
55 
57  bool skip_delivery = false;
58 };
59 
64 template<typename U>
65 auto connectUnit(UnitRef<U>& unit) {
66  auto parse2 = hilti::rt::any_cast<spicy::rt::Parse2Function<U>>(U::__parser.parse2);
67 
68  auto self = hilti::rt::ValueReference<U>::self(&*unit);
69 
70  auto& state = unit->__sink;
71  state = new sink::detail::State(); // NOLINT
72  state->resumable = (*parse2)(self, state->data, {}, {}); // Kick-off parsing with empty data.
73  state->parser = &U::__parser;
74  return state;
75 }
76 
// Name used as the template argument for the sink's filter state
// (`filter::State<sink_name>`).
inline const char sink_name[] = "__sink__";
79 } // namespace sink::detail
80 
88 class Sink {
89 public:
90  Sink() { _init(); } // NOLINT(hicpp-member-init)
91  ~Sink() {
92  try {
93  _close(true);
94  } catch ( ... ) {
95  // Ignore errors.
96  }
97  }
98 
99  Sink(const Sink&) = delete;
100  Sink(Sink&&) = default;
101  Sink& operator=(const Sink&) = delete;
102  Sink& operator=(Sink&&) = default;
103 
110  template<typename T>
112  SPICY_RT_DEBUG_VERBOSE(hilti::rt::fmt("connecting parser %s [%p] to sink %p", T::__parser.name, &*unit, this));
113  auto state = spicy::rt::sink::detail::connectUnit(unit);
114  _units.emplace_back(std::move(unit));
115  _states.emplace_back(std::move(state));
116  }
117 
127  template<typename T>
129  if ( _size )
130  throw SinkError("cannot connect filter after data has been forwarded already");
131 
132  SPICY_RT_DEBUG_VERBOSE(
133  hilti::rt::fmt("connecting filter unit %s [%p] to sink %p", T::__parser.name, &*filter_unit, this));
134  spicy::rt::filter::detail::connect(_filter, filter_unit);
135  }
136 
141  void close() { _close(true); }
142 
152  void connect_mime_type(const MIMEType& mt, std::string_view scope);
153 
164  void connect_mime_type(const std::string& mt, std::string_view scope) { connect_mime_type(MIMEType(mt), scope); }
165 
176  void connect_mime_type(const hilti::rt::Bytes& mt, std::string_view scope) {
177  connect_mime_type(MIMEType(mt.str()), scope);
178  }
179 
186  void gap(uint64_t seq, uint64_t len);
187 
191  uint64_t sequence_number() const { return _initial_seq + _cur_rseq; }
192 
198  void set_auto_trim(bool enable) { _auto_trim = enable; }
199 
205  void set_initial_sequence_number(uint64_t seq) {
206  if ( _haveInput() ) {
207  _close(false);
208  throw SinkError("sink cannot update initial sequence number after activity has already been seen");
209  }
210 
211  _initial_seq = seq;
212  }
213 
215  void set_policy(sink::ReassemblerPolicy policy) { _policy = policy; }
216 
220  hilti::rt::integer::safe<uint64_t> size() const { return _size; }
221 
227  void skip(uint64_t seq);
228 
234  void trim(uint64_t seq);
235 
243  void write(hilti::rt::Bytes data, std::optional<uint64_t> seq = {}, std::optional<uint64_t> len = {});
244 
253 
254 private:
255  struct Chunk {
256  std::optional<hilti::rt::Bytes> data; // Data at +1; unset for gap
257  uint64_t rseq; // Sequence number of first byte.
258  uint64_t rupper; // Sequence number of last byte + 1.
259 
260  Chunk(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper)
261  : data(std::move(data)), rseq(rseq), rupper(rupper) {}
262  };
263 
264  using ChunkList = std::list<Chunk>;
265 
266  // Returns true if any input has been passed in already (including gaps).
267  bool _haveInput() { return _cur_rseq || _chunks.size(); }
268 
269  // Backend for disconnecting the sink. If orderly, connected units get a
270  // chance to parse any remaining input; otherwise we abort directly.
271  void _close(bool orderly);
272 
273  // Turns an absolute sequence number into a relative one.
274  uint64_t _rseq(uint64_t seq) const {
275  // I believe this does the right thing for wrap-around ...
276  return seq - _initial_seq;
277  }
278 
279  // Turns a relative sequence number into an absolute one.
280  uint64_t _aseq(uint64_t rseq) const {
281  // I believe this does the right thing for wrap-around ...
282  return _initial_seq + rseq;
283  }
284 
285  // (Re-)initialize instance.
286  void _init();
287 
288  // Add new data to buffer, beginning search for insert position at given start *c*.
289  ChunkList::iterator _addAndCheck(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper,
290  ChunkList::iterator c);
291 
292  // Deliver data to connected parsers. Returns false if the data is empty (i.e., a gap).
293  bool _deliver(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper);
294 
295  // Entry point for all new data. If not bytes instance is given, that signals a gap.
296  void _newData(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t len);
297 
298  // Skip up to sequence number.
299  void _skip(uint64_t rseq);
300 
301  // Trim up to sequence number.
302  void _trim(uint64_t rseq);
303 
304  // Deliver as much as possible starting at given buffer position.
305  void _tryDeliver(ChunkList::iterator c);
306 
307  // Trigger various hooks.
308  void _reportGap(uint64_t rseq, uint64_t len) const;
309  void _reportOverlap(uint64_t rseq, const hilti::rt::Bytes& old, const hilti::rt::Bytes& new_) const;
310  void _reportSkipped(uint64_t rseq) const;
311  void _reportUndelivered(uint64_t rseq, const hilti::rt::Bytes& data) const;
312  void _reportUndeliveredUpTo(uint64_t rupper) const;
313 
314  // Output reassembler state for debugging.
315  void _debugReassembler(std::string_view msg, const std::optional<hilti::rt::Bytes>& data, uint64_t seq,
316  uint64_t len) const;
317  void _debugReassemblerBuffer(std::string_view msg) const;
318  void _debugDeliver(const hilti::rt::Bytes& data) const;
319 
320  // States for connected units.
321  std::vector<sink::detail::State*> _states;
322 
323  // Must come after `_state` as it's keeping the states around.
324  std::vector<hilti::rt::StrongReferenceGeneric> _units;
325 
326  // Filter input and output.
327  struct FilterData {
330  hilti::rt::stream::View output_cur;
331  };
332 
333  std::optional<FilterData> _filter_data;
334 
335  // Reassembly state.
336  sink::ReassemblerPolicy _policy; // Current policy
337  bool _auto_trim{}; // True if automatic trimming is enabled.
338  uint64_t _size{};
339  uint64_t _initial_seq{}; // Initial sequence number.
340  uint64_t _cur_rseq{}; // Sequence of last delivered byte + 1 (i.e., seq of next)
341  uint64_t _last_reassem_rseq{}; // Sequence of last byte reassembled and delivered + 1.
342  uint64_t _trim_rseq{}; // Sequence of last byte trimmed so far + 1.
343  ChunkList _chunks; // Buffered data not yet delivered or trimmed
344 };
345 
346 } // namespace spicy::rt
347 
348 namespace hilti::rt::detail::adl {
349 std::string to_string(const spicy::rt::Sink& /* x */, adl::tag /*unused*/);
350 std::string to_string(const spicy::rt::sink::ReassemblerPolicy& x, adl::tag /*unused*/);
351 } // namespace hilti::rt::detail::adl
Definition: bytes.h:238
const std::string & str() const &
Definition: bytes.h:300
Definition: fiber.h:305
Definition: reference.h:376
static ValueReference self(T *t)
Definition: reference.h:299
Definition: stream.h:1167
Definition: mime.h:29
Definition: sink.h:88
void write(hilti::rt::Bytes data, std::optional< uint64_t > seq={}, std::optional< uint64_t > len={})
Definition: sink.cc:459
void skip(uint64_t seq)
Definition: sink.cc:449
void trim(uint64_t seq)
Definition: sink.cc:454
hilti::rt::integer::safe< uint64_t > size() const
Definition: sink.h:220
uint64_t sequence_number() const
Definition: sink.h:191
void connect_mime_type(const MIMEType &mt, std::string_view scope)
Definition: sink.cc:383
void connect(spicy::rt::UnitRef< T > unit)
Definition: sink.h:111
void connect_mime_type(const std::string &mt, std::string_view scope)
Definition: sink.h:164
filter::State< sink::detail::sink_name > _filter
Definition: sink.h:252
void connect_filter(spicy::rt::UnitRef< T > filter_unit)
Definition: sink.h:128
void gap(uint64_t seq, uint64_t len)
Definition: sink.cc:447
void connect_mime_type(const hilti::rt::Bytes &mt, std::string_view scope)
Definition: sink.h:176
void set_auto_trim(bool enable)
Definition: sink.h:198
void set_initial_sequence_number(uint64_t seq)
Definition: sink.h:205
void set_policy(sink::ReassemblerPolicy policy)
Definition: sink.h:215
void close()
Definition: sink.h:141
std::string fmt(const char *fmt, const Args &... args)
Definition: fmt.h:13
std::string to_string(T &&x)
Definition: extension-points.h:26
Definition: parser.h:142
Definition: sink.h:46
Parser * parser
Definition: sink.h:54
hilti::rt::ValueReference< hilti::rt::Stream > data
Definition: sink.h:48
bool skip_delivery
Definition: sink.h:57
hilti::rt::Resumable resumable
Definition: sink.h:51