Spicy
sink.h
1 // Copyright (c) 2020-2021 by the Zeek Project. See LICENSE for details.
2 
3 #pragma once
4 
5 #include <list>
6 #include <string>
7 #include <type_traits>
8 #include <utility>
9 #include <vector>
10 
11 #include <hilti/rt/exception.h>
12 #include <hilti/rt/extension-points.h>
13 #include <hilti/rt/types/bytes.h>
14 #include <hilti/rt/types/integer.h>
15 #include <hilti/rt/types/reference.h>
16 #include <hilti/rt/types/stream.h>
17 
18 #include <spicy/rt/debug.h>
19 #include <spicy/rt/filter.h>
20 #include <spicy/rt/mime.h>
21 #include <spicy/rt/parser-fwd.h>
22 #include <spicy/rt/typedefs.h>
23 
24 namespace spicy::rt {
25 
29 HILTI_EXCEPTION(SinkError, UsageError)
30 
namespace sink {
/**
 * Reassembly policy of a sink, selected through `Sink::set_policy()`.
 * `First` is the only policy defined.
 *
 * NOTE(review): presumably `First` means that the data seen first wins when
 * chunks overlap — confirm against the reassembler implementation.
 */
enum class ReassemblerPolicy { First };
} // namespace sink
34 
35 namespace sink::detail {
36 
/**
 * Type trait that is true if `T` has a member named `__sink`.
 *
 * This primary template is the fallback; it is chosen whenever the
 * expression `T::__sink` is not well-formed.
 */
template<typename T, typename = int>
struct supports_sinks : std::false_type {};

/**
 * Specialization chosen when `T::__sink` is a valid expression. The comma
 * expression inside `decltype` always has type `int`, which matches the
 * default argument of the primary template and thereby selects this
 * specialization.
 */
template<typename T>
struct supports_sinks<T, decltype(static_cast<void>(T::__sink), 0)> : std::true_type {};
43 
45 struct State {
48 
51 
54 
56  bool skip_delivery = false;
57 };
58 
63 template<typename U>
64 auto _connectUnit(UnitRef<U>& unit) {
65  auto parse2 = hilti::rt::any_cast<spicy::rt::Parse2Function<U>>(U::__parser.parse2);
66 
67  auto self = hilti::rt::ValueReference<U>::self(&*unit);
68 
69  auto& state = unit->__sink;
70  state = new sink::detail::State(); // NOLINT
71  state->resumable = (*parse2)(self, state->data, {}, {}); // Kick-off parsing with empty data.
72  state->parser = &U::__parser;
73  return state;
74 }
75 
81 template<typename U>
82 ::spicy::rt::detail::ParseSinkFunction parseFunction() {
83  return []() {
84  auto unit = UnitRef<U>(U());
85  return std::make_pair(std::move(unit), _connectUnit(unit));
86  };
87 }
88 
98 template<typename Unit, auto Hook, typename... Args>
99 auto hookFunction() {
100  return [](const hilti::rt::StrongReferenceGeneric& u, Args&&... args) -> void {
101  auto unit = u.as<Unit>();
102  ((*unit).*Hook)(std::forward<Args>(args)...);
103  };
104 }
105 
/** Name used as the template argument of the sink's filter state (`filter::State<sink_name>`). */
inline const char sink_name[] = "__sink__";
108 } // namespace sink::detail
109 
117 class Sink {
118 public:
119  Sink() { _init(); } // NOLINT(hicpp-member-init)
120  ~Sink() {
121  try {
122  _close(true);
123  } catch ( ... ) {
124  // Ignore errors.
125  }
126  }
127 
128  Sink(const Sink&) = delete;
129  Sink(Sink&&) = default;
130  Sink& operator=(const Sink&) = delete;
131  Sink& operator=(Sink&&) = default;
132 
139  template<typename T>
141  SPICY_RT_DEBUG_VERBOSE(hilti::rt::fmt("connecting parser %s [%p] to sink %p", T::__parser.name, &*unit, this));
142  auto state = spicy::rt::sink::detail::_connectUnit(unit);
143  _units.emplace_back(std::move(unit));
144  _states.emplace_back(std::move(state));
145  }
146 
156  template<typename T>
158  if ( _size )
159  throw SinkError("cannot connect filter after data has been forwarded already");
160 
161  SPICY_RT_DEBUG_VERBOSE(
162  hilti::rt::fmt("connecting filter unit %s [%p] to sink %p", T::__parser.name, &*unit, this));
163  spicy::rt::filter::connect(_filter, unit);
164  }
165 
170  void close() { _close(true); }
171 
181  void connect_mime_type(const MIMEType& mt, const std::string& scope);
182 
193  void connect_mime_type(const std::string& mt, const std::string& scope) { connect_mime_type(MIMEType(mt), scope); }
194 
205  void connect_mime_type(const hilti::rt::Bytes& mt, const std::string& scope) {
206  connect_mime_type(MIMEType(mt.str()), scope);
207  }
208 
215  void gap(uint64_t seq, uint64_t len);
216 
220  uint64_t sequence_number() const { return _initial_seq + _cur_rseq; }
221 
227  void set_auto_trim(bool enable) { _auto_trim = enable; }
228 
234  void set_initial_sequence_number(uint64_t seq) {
235  if ( _haveInput() ) {
236  _close(false);
237  throw SinkError("sink cannot update initial sequence number after activity has already been seen");
238  }
239 
240  _initial_seq = seq;
241  }
242 
244  void set_policy(sink::ReassemblerPolicy policy) { _policy = policy; }
245 
249  hilti::rt::integer::safe<uint64_t> size() const { return _size; }
250 
256  void skip(uint64_t seq);
257 
263  void trim(uint64_t seq);
264 
272  void write(hilti::rt::Bytes data, std::optional<uint64_t> seq = {}, std::optional<uint64_t> len = {});
273 
282 
283 private:
284  struct Chunk {
285  std::optional<hilti::rt::Bytes> data; // Data at +1; unset for gap
286  uint64_t rseq; // Sequence number of first byte.
287  uint64_t rupper; // Sequence number of last byte + 1.
288 
289  Chunk(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper)
290  : data(std::move(data)), rseq(rseq), rupper(rupper) {}
291  };
292 
293  using ChunkList = std::list<Chunk>;
294 
295  // Returns true if any input has been passed in already (including gaps).
296  bool _haveInput() { return _cur_rseq || _chunks.size(); }
297 
298  // Backend for disconnecting the sink. If orderly, connected units get a
299  // chance to parse any remaining input; otherwise we abort directly.
300  void _close(bool orderly);
301 
302  // Turns an absolute sequence number into a relative one.
303  uint64_t _rseq(uint64_t seq) const {
304  // I believe this does the right thing for wrap-around ...
305  return seq - _initial_seq;
306  }
307 
308  // Turns a relative sequence number into an absolute one.
309  uint64_t _aseq(uint64_t rseq) const {
310  // I believe this does the right thing for wrap-around ...
311  return _initial_seq + rseq;
312  }
313 
314  // (Re-)initialize instance.
315  void _init();
316 
317  // Add new data to buffer, beginning search for insert position at given start *c*.
318  ChunkList::iterator _addAndCheck(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper,
319  ChunkList::iterator c);
320 
321  // Deliver data to connected parsers. Returns false if the data is empty (i.e., a gap).
322  bool _deliver(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper);
323 
324  // Entry point for all new data. If not bytes instance is given, that signals a gap.
325  void _newData(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t len);
326 
327  // Skip up to sequence number.
328  void _skip(uint64_t rseq);
329 
330  // Trim up to sequence number.
331  void _trim(uint64_t rseq);
332 
333  // Deliver as much as possible starting at given buffer position.
334  void _tryDeliver(ChunkList::iterator c);
335 
336  // Trigger various hooks.
337  void _reportGap(uint64_t rseq, uint64_t len) const;
338  void _reportOverlap(uint64_t rseq, const hilti::rt::Bytes& old, const hilti::rt::Bytes& new_) const;
339  void _reportSkipped(uint64_t rseq) const;
340  void _reportUndelivered(uint64_t rseq, const hilti::rt::Bytes& data) const;
341  void _reportUndeliveredUpTo(uint64_t rupper) const;
342 
343  // Output reassembler state for debugging.
344  void _debugReassembler(const std::string& msg, const std::optional<hilti::rt::Bytes>& data, uint64_t seq,
345  uint64_t len) const;
346  void _debugReassemblerBuffer(const std::string& msg) const;
347  void _debugDeliver(const hilti::rt::Bytes& data) const;
348 
349  // States for connected units.
350  std::vector<sink::detail::State*> _states;
351 
352  // Must come after `_state` as it's keeping the states around.
353  std::vector<hilti::rt::StrongReferenceGeneric> _units;
354 
355  // Filter input and output.
356  struct FilterData {
359  hilti::rt::stream::View output_cur;
360  };
361 
362  std::optional<FilterData> _filter_data;
363 
364  // Reassembly state.
365  sink::ReassemblerPolicy _policy; // Current policy
366  bool _auto_trim{}; // True if automatic trimming is enabled.
367  uint64_t _size{};
368  uint64_t _initial_seq{}; // Initial sequence number.
369  uint64_t _cur_rseq{}; // Sequence of last delivered byte + 1 (i.e., seq of next)
370  uint64_t _last_reassem_rseq{}; // Sequence of last byte reassembled and delivered + 1.
371  uint64_t _trim_rseq{}; // Sequence of last byte trimmed so far + 1.
372  ChunkList _chunks; // Buffered data not yet delivered or trimmed
373 };
374 
375 } // namespace spicy::rt
376 
377 namespace hilti::rt::detail::adl {
378 std::string to_string(const spicy::rt::Sink& /* x */, adl::tag /*unused*/);
379 std::string to_string(const spicy::rt::sink::ReassemblerPolicy& x, adl::tag /*unused*/);
380 } // namespace hilti::rt::detail::adl
hilti::rt::integer::safe< uint64_t > size() const
Definition: sink.h:249
void set_auto_trim(bool enable)
Definition: sink.h:227
hilti::rt::Resumable resumable
Definition: sink.h:50
void set_initial_sequence_number(uint64_t seq)
Definition: sink.h:234
uint64_t sequence_number() const
Definition: sink.h:220
Definition: reference.h:663
void connect(spicy::rt::UnitRef< T > unit)
Definition: sink.h:140
void close()
Definition: sink.h:170
Definition: bytes.h:157
Definition: hook.h:29
void connect_mime_type(const std::string &mt, const std::string &scope)
Definition: sink.h:193
Definition: stream.h:984
hilti::rt::ValueReference< hilti::rt::Stream > data
Definition: sink.h:47
Definition: mime.h:29
T * as() const
Definition: reference.h:677
void connect_filter(spicy::rt::UnitRef< T > unit)
Definition: sink.h:157
Definition: sink.h:45
Definition: reference.h:345
void connect_mime_type(const hilti::rt::Bytes &mt, const std::string &scope)
Definition: sink.h:205
static ValueReference self(T *t)
Definition: reference.h:279
const std::string & str() const &
Definition: bytes.h:223
Definition: deferred-expression.h:41
Definition: fiber.h:295
Definition: parser.h:140
Parser * parser
Definition: sink.h:53
void set_policy(sink::ReassemblerPolicy policy)
Definition: sink.h:244
std::string fmt(const char *fmt, const Args &... args)
Definition: fmt.h:13
filter::State< sink::detail::sink_name > _filter
Definition: sink.h:281
Definition: sink.h:117