13 #include <hilti/rt/exception.h>
14 #include <hilti/rt/extension-points.h>
15 #include <hilti/rt/types/bytes.h>
16 #include <hilti/rt/types/integer.h>
17 #include <hilti/rt/types/reference.h>
18 #include <hilti/rt/types/stream.h>
20 #include <spicy/rt/debug.h>
21 #include <spicy/rt/filter.h>
22 #include <spicy/rt/mime.h>
23 #include <spicy/rt/parser-fwd.h>
24 #include <spicy/rt/typedefs.h>
31 HILTI_EXCEPTION(SinkError, UsageError)
/// Reassembly policies that can be handed to `Sink::set_policy()`.
///
/// NOTE(review): `First` is the only enumerator declared; its exact
/// semantics are not visible in this header excerpt — confirm against the
/// reassembler implementation.
enum class ReassemblerPolicy {
    First,
};
37 namespace sink::detail {
40 template<
typename T,
typename =
int>
44 struct supports_sinks<T, decltype((void)T::HILTI_INTERNAL(sink), 0)> : std::true_type {};
67 auto parse2 = hilti::rt::any_cast<spicy::rt::Parse2Function<U>>(U::HILTI_INTERNAL(parser).parse2);
71 auto& state = unit->HILTI_INTERNAL(sink);
72 state = std::make_shared<sink::detail::State>();
73 state->resumable = (*parse2)(
self, state->data, {}, {});
74 state->parser = &U::HILTI_INTERNAL(parser);
/// Character-array constant built from `HILTI_INTERNAL_ID("sink")`. It is
/// used as the template argument of the sink's `filter::State` member.
79 inline const char sink_name[] = HILTI_INTERNAL_ID(
"sink");
/// Copy assignment is explicitly disabled; a `Sink` cannot be copy-assigned.
102 Sink& operator=(
const Sink&) =
delete;
113 SPICY_RT_DEBUG_VERBOSE(
114 hilti::rt::fmt(
"connecting parser %s [%p] to sink %p", T::HILTI_INTERNAL(parser).name, &*unit,
this));
115 auto state = spicy::rt::sink::detail::connectUnit(unit);
116 _units.emplace_back(std::move(unit));
117 _states.emplace_back(std::move(state));
132 throw SinkError(
"cannot connect filter after data has been forwarded already");
134 SPICY_RT_DEBUG_VERBOSE(
hilti::rt::fmt(
"connecting filter unit %s [%p] to sink %p",
135 T::HILTI_INTERNAL(parser).name,
138 spicy::rt::filter::detail::connect(
_filter, filter_unit);
/// Declared here only; the definition lives out of line.
///
/// NOTE(review): presumably reports a gap of `len` bytes starting at
/// absolute sequence number `seq` — confirm against the definition.
190 void gap(uint64_t seq, uint64_t len);
210 if ( _haveInput() ) {
212 throw SinkError(
"sink cannot update initial sequence number after activity has already been seen");
219 void set_policy(sink::ReassemblerPolicy policy) { _policy = policy; }
224 hilti::rt::integer::safe<uint64_t>
size()
const {
return _size; }
/// Declared here only; defined out of line.
/// NOTE(review): `seq` is presumably an absolute sequence number (the private
/// `_skip()` counterpart takes a relative one) — confirm.
231 void skip(uint64_t seq);
/// Declared here only; defined out of line.
/// NOTE(review): `seq` is presumably an absolute sequence number (the private
/// `_trim()` counterpart takes a relative one) — confirm.
238 void trim(uint64_t seq);
265 : data(std::move(data)), rseq(rseq), rupper(rupper) {}
/// Linked list of `Chunk` objects; its iterators are passed to private
/// helpers such as `_tryDeliver()`.
268 using ChunkList = std::list<Chunk>;
271 bool _haveInput() {
return _cur_rseq || _chunks.size(); }
/// Declared here only; defined out of line.
/// NOTE(review): `orderly` presumably distinguishes a regular close from an
/// abnormal one (e.g. during error handling) — confirm against the definition.
275 void _close(
bool orderly);
278 uint64_t _rseq(uint64_t seq)
const {
280 return seq - _initial_seq;
284 uint64_t _aseq(uint64_t rseq)
const {
286 return _initial_seq + rseq;
296 ChunkList::iterator c);
// Private reassembler helpers; all are declared here only and defined out of
// line. Parameters named `rseq` / `rupper` are presumably relative sequence
// numbers, i.e. offsets from `_initial_seq` as computed by `_rseq()` — confirm.

/// NOTE(review): presumably the relative-sequence counterpart of `skip()`.
305 void _skip(uint64_t rseq);
/// NOTE(review): presumably the relative-sequence counterpart of `trim()`.
308 void _trim(uint64_t rseq);
/// Takes an iterator into a `ChunkList`.
/// NOTE(review): presumably attempts in-order delivery starting at chunk `c`.
311 void _tryDeliver(ChunkList::iterator c);
/// Const reporting helper; takes a relative sequence number and a length.
314 void _reportGap(uint64_t rseq, uint64_t len)
const;
/// Const reporting helper; takes a relative sequence number.
316 void _reportSkipped(uint64_t rseq)
const;
/// Const reporting helper; takes a relative upper bound.
318 void _reportUndeliveredUpTo(uint64_t rupper)
const;
321 void _debugReassembler(std::string_view msg,
/// Const debugging helper taking a message; declared here only.
/// NOTE(review): presumably logs the buffered chunks together with `msg` —
/// confirm against the definition.
325 void _debugReassemblerBuffer(std::string_view msg)
const;
// Per-unit parsing state; `connect()` appends one entry for each connected unit.
329 std::vector<std::shared_ptr<sink::detail::State>> _states;
// Strong references to the connected units; `connect()` appends to this
// vector right before appending the matching entry to `_states`.
332 std::vector<hilti::rt::StrongReferenceGeneric> _units;
// Reassembly policy as stored by `set_policy()`.
// NOTE(review): no in-class initializer; presumably set during
// initialization elsewhere — confirm.
344 sink::ReassemblerPolicy _policy;
// Offset between absolute and relative sequence numbers: `_rseq()` subtracts
// it and `_aseq()` adds it.
347 uint64_t _initial_seq{};
// A non-zero value makes `_haveInput()` report true.
// NOTE(review): presumably the current relative sequence position — confirm.
348 uint64_t _cur_rseq{};
// NOTE(review): presumably the relative sequence number up to which data has
// been reassembled — confirm.
349 uint64_t _last_reassem_rseq{};
// NOTE(review): presumably the relative sequence number up to which data has
// been trimmed — confirm.
350 uint64_t _trim_rseq{};
356 namespace hilti::rt::detail::adl {
/// Overload of the `to_string` extension point for
/// `spicy::rt::sink::ReassemblerPolicy`; the trailing `adl::tag` parameter is
/// unnamed. Declared here only.
358 std::string
to_string(
const spicy::rt::sink::ReassemblerPolicy& x, adl::tag );
String decode(unicode::Charset cs, unicode::DecodeErrorStrategy errors=unicode::DecodeErrorStrategy::REPLACE) const
Definition: reference.h:399
static ValueReference self(T *t)
Definition: reference.h:324
Definition: stream.h:1172
void skip(uint64_t seq)
Definition: sink.cc:463
void trim(uint64_t seq)
Definition: sink.cc:468
hilti::rt::integer::safe< uint64_t > size() const
Definition: sink.h:224
uint64_t sequence_number() const
Definition: sink.h:195
void connect_mime_type(const MIMEType &mt, uint64_t scope)
Definition: sink.cc:396
void write(hilti::rt::Bytes data, hilti::rt::Optional< uint64_t > seq={}, hilti::rt::Optional< uint64_t > len={})
Definition: sink.cc:473
void connect(spicy::rt::UnitRef< T > unit)
Definition: sink.h:112
filter::State< sink::detail::sink_name > _filter
Definition: sink.h:256
void connect_filter(spicy::rt::UnitRef< T > filter_unit)
Definition: sink.h:130
void gap(uint64_t seq, uint64_t len)
Definition: sink.cc:461
void set_auto_trim(bool enable)
Definition: sink.h:202
void set_initial_sequence_number(uint64_t seq)
Definition: sink.h:209
void connect_mime_type(const hilti::rt::Bytes &mt, uint64_t scope)
Definition: sink.h:180
void connect_mime_type(const hilti::rt::String &mt, uint64_t scope)
Definition: sink.h:168
void set_policy(sink::ReassemblerPolicy policy)
Definition: sink.h:219
void close()
Definition: sink.h:145
std::string fmt(const char *fmt, const Args &... args)
Definition: fmt.h:17
std::string to_string(T &&x)
Definition: extension-points.h:26
Parser * parser
Definition: sink.h:55
hilti::rt::ValueReference< hilti::rt::Stream > data
Definition: sink.h:49
bool skip_delivery
Definition: sink.h:58
hilti::rt::Resumable resumable
Definition: sink.h:52