12 #include <hilti/rt/exception.h>
13 #include <hilti/rt/extension-points.h>
14 #include <hilti/rt/types/bytes.h>
15 #include <hilti/rt/types/integer.h>
16 #include <hilti/rt/types/reference.h>
17 #include <hilti/rt/types/stream.h>
19 #include <spicy/rt/debug.h>
20 #include <spicy/rt/filter.h>
21 #include <spicy/rt/mime.h>
22 #include <spicy/rt/parser-fwd.h>
23 #include <spicy/rt/typedefs.h>
30 HILTI_EXCEPTION(SinkError, UsageError)
/**
 * Reassembly policy that can be handed to a sink through `set_policy()`.
 * `First` is the only policy defined at present.
 * NOTE(review): presumably "first-seen data wins when ranges overlap" --
 * confirm against the reassembler implementation.
 */
enum class ReassemblerPolicy {
    First
};
36 namespace sink::detail {
39 template<
typename T,
typename =
int>
66 auto parse2 = hilti::rt::any_cast<spicy::rt::Parse2Function<U>>(U::__parser.parse2);
70 auto& state = unit->__sink;
72 state->resumable = (*parse2)(
self, state->data, {}, {});
73 state->parser = &U::__parser;
/**
 * Name constant for sinks. It is used as the template argument of the
 * sink's filter state (`filter::State<sink::detail::sink_name>`).
 */
inline const char sink_name[] = "__sink__";
101 Sink& operator=(
const Sink&) =
delete;
112 SPICY_RT_DEBUG_VERBOSE(
hilti::rt::fmt(
"connecting parser %s [%p] to sink %p", T::__parser.name, &*unit,
this));
113 auto state = spicy::rt::sink::detail::connectUnit(unit);
114 _units.emplace_back(std::move(unit));
115 _states.emplace_back(std::move(state));
130 throw SinkError(
"cannot connect filter after data has been forwarded already");
132 SPICY_RT_DEBUG_VERBOSE(
133 hilti::rt::fmt(
"connecting filter unit %s [%p] to sink %p", T::__parser.name, &*filter_unit,
this));
134 spicy::rt::filter::detail::connect(
_filter, filter_unit);
/**
 * Reports a gap in the sink's input. Defined out-of-line.
 * NOTE(review): semantics inferred from name and signature only -- confirm
 * against the definition.
 *
 * @param seq sequence number at which the gap starts (presumably absolute,
 *            i.e. before translation through `_rseq()` -- confirm)
 * @param len length of the gap
 */
void gap(uint64_t seq, uint64_t len);
204 if ( _haveInput() ) {
206 throw SinkError(
"sink cannot update initial sequence number after activity has already been seen");
213 void set_policy(sink::ReassemblerPolicy policy) { _policy = policy; }
218 hilti::rt::integer::safe<uint64_t>
size()
const {
return _size; }
225 void skip(uint64_t seq);
232 void trim(uint64_t seq);
241 void write(
hilti::rt::Bytes data, std::optional<uint64_t> seq = {}, std::optional<uint64_t> len = {});
254 std::optional<hilti::rt::Bytes> data;
258 Chunk(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper)
259 : data(std::move(data)), rseq(rseq), rupper(rupper) {}
262 using ChunkList = std::list<Chunk>;
265 bool _haveInput() {
return _cur_rseq || _chunks.size(); }
/**
 * Internal close routine, defined out-of-line.
 *
 * @param orderly NOTE(review): presumably distinguishes a regular shutdown
 *                from an abort/error path -- confirm against the definition
 */
void _close(bool orderly);
272 uint64_t _rseq(uint64_t seq)
const {
274 return seq - _initial_seq;
278 uint64_t _aseq(uint64_t rseq)
const {
280 return _initial_seq + rseq;
287 ChunkList::iterator _addAndCheck(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper,
288 ChunkList::iterator c);
291 bool _deliver(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper);
294 void _newData(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t len);
297 void _skip(uint64_t rseq);
300 void _trim(uint64_t rseq);
303 void _tryDeliver(ChunkList::iterator c);
306 void _reportGap(uint64_t rseq, uint64_t len)
const;
308 void _reportSkipped(uint64_t rseq)
const;
310 void _reportUndeliveredUpTo(uint64_t rupper)
const;
313 void _debugReassembler(std::string_view msg,
const std::optional<hilti::rt::Bytes>& data, uint64_t seq,
315 void _debugReassemblerBuffer(std::string_view msg)
const;
319 std::vector<sink::detail::State*> _states;
322 std::vector<hilti::rt::StrongReferenceGeneric> _units;
331 std::optional<FilterData> _filter_data;
334 sink::ReassemblerPolicy _policy;
337 uint64_t _initial_seq{};
338 uint64_t _cur_rseq{};
339 uint64_t _last_reassem_rseq{};
340 uint64_t _trim_rseq{};
346 namespace hilti::rt::detail::adl {
348 std::string
to_string(
const spicy::rt::sink::ReassemblerPolicy& x, adl::tag );
const std::string & str() const &
Definition: bytes.h:296
Definition: reference.h:376
static ValueReference self(T *t)
Definition: reference.h:299
Definition: stream.h:1167
void write(hilti::rt::Bytes data, std::optional< uint64_t > seq={}, std::optional< uint64_t > len={})
Definition: sink.cc:459
void skip(uint64_t seq)
Definition: sink.cc:449
void trim(uint64_t seq)
Definition: sink.cc:454
void connect_mime_type(const std::string &mt, uint64_t scope)
Definition: sink.h:164
hilti::rt::integer::safe< uint64_t > size() const
Definition: sink.h:218
uint64_t sequence_number() const
Definition: sink.h:189
void connect_mime_type(const MIMEType &mt, uint64_t scope)
Definition: sink.cc:383
void connect(spicy::rt::UnitRef< T > unit)
Definition: sink.h:111
filter::State< sink::detail::sink_name > _filter
Definition: sink.h:250
void connect_filter(spicy::rt::UnitRef< T > filter_unit)
Definition: sink.h:128
void gap(uint64_t seq, uint64_t len)
Definition: sink.cc:447
void set_auto_trim(bool enable)
Definition: sink.h:196
void set_initial_sequence_number(uint64_t seq)
Definition: sink.h:203
void connect_mime_type(const hilti::rt::Bytes &mt, uint64_t scope)
Definition: sink.h:176
void set_policy(sink::ReassemblerPolicy policy)
Definition: sink.h:213
void close()
Definition: sink.h:141
std::string fmt(const char *fmt, const Args &... args)
Definition: fmt.h:13
std::string to_string(T &&x)
Definition: extension-points.h:26
Parser * parser
Definition: sink.h:54
hilti::rt::ValueReference< hilti::rt::Stream > data
Definition: sink.h:48
bool skip_delivery
Definition: sink.h:57
hilti::rt::Resumable resumable
Definition: sink.h:51