11 #include <hilti/rt/exception.h> 12 #include <hilti/rt/extension-points.h> 13 #include <hilti/rt/types/bytes.h> 14 #include <hilti/rt/types/integer.h> 15 #include <hilti/rt/types/reference.h> 16 #include <hilti/rt/types/stream.h> 18 #include <spicy/rt/debug.h> 19 #include <spicy/rt/filter.h> 20 #include <spicy/rt/mime.h> 21 #include <spicy/rt/parser-fwd.h> 22 #include <spicy/rt/typedefs.h> 29 HILTI_EXCEPTION(SinkError, UserException)
/**
 * Policy selector for a sink's reassembler. A value of this type is stored
 * through `Sink::set_policy()` into the sink's `_policy` member.
 *
 * `First` is the only enumerator.
 * NOTE(review): presumably "first-seen data wins" on overlapping input --
 * confirm against the reassembler implementation, which is not visible here.
 */
32 enum class ReassemblerPolicy { First };
35 namespace sink::detail {
38 template<
typename T,
typename =
int>
56 bool skip_delivery =
false;
65 auto parse2 = hilti::rt::any_cast<spicy::rt::Parse2Function<U>>(U::__parser.parse2);
69 auto& state = unit->__sink;
71 state->resumable = (*parse2)(
self, state->data, {}, {});
72 state->parser = &U::__parser;
82 ::spicy::rt::detail::ParseSinkFunction parseFunction() {
85 return std::make_pair(std::move(unit), _connectUnit(unit));
98 template<
typename Unit,
auto Hook,
typename... Args>
101 auto unit = u.
as<Unit>();
102 ((*unit).*Hook)(std::forward<Args>(args)...);
/**
 * Fixed name string "__sink__". It is used as the template argument of the
 * sink's filter state (`filter::State<sink::detail::sink_name> _filter`).
 * Declared `inline` so that this header-level definition is a single entity
 * across translation units.
 */
107 inline const char sink_name[] =
"__sink__";
/**
 * Destructor. Forwards to `_close(true)`; the argument corresponds to
 * `_close`'s `orderly` parameter.
 * NOTE(review): what an "orderly" close does is defined in `_close()`, whose
 * body is not visible here.
 */
120 ~
Sink() { _close(
true); }
/** Copy assignment is deleted: a `Sink` cannot be copy-assigned. */
124 Sink& operator=(
const Sink&) =
delete;
135 SPICY_RT_DEBUG_VERBOSE(
hilti::rt::fmt(
"connecting parser %s [%p] to sink %p", T::__parser.name, &*unit,
this));
136 auto state = spicy::rt::sink::detail::_connectUnit(unit);
137 _units.emplace_back(std::move(unit));
138 _states.emplace_back(std::move(state));
153 throw SinkError(
"cannot connect filter after data has been forwarded already");
155 SPICY_RT_DEBUG_VERBOSE(
156 hilti::rt::fmt(
"connecting filter unit %s [%p] to sink %p", T::__parser.name, &*unit,
this));
157 spicy::rt::filter::connect(_filter, unit);
175 void connect_mime_type(
const MIMEType& mt,
const std::string& scope);
209 void gap(uint64_t seq, uint64_t len);
229 if ( _haveInput() ) {
231 throw SinkError(
"sink cannot update initial sequence number after activity has already been seen");
/**
 * Sets the reassembler policy by storing `policy` in `_policy`. No other
 * state is touched by this function.
 *
 * @param policy new policy value
 */
238 void set_policy(sink::ReassemblerPolicy policy) { _policy = policy; }
/**
 * Returns the current value of the `_size` member as a
 * `hilti::rt::integer::safe<uint64_t>`. Does not modify the sink.
 * NOTE(review): `_size` is maintained elsewhere; what exactly it counts is
 * not visible here -- confirm before relying on it.
 */
243 hilti::rt::integer::safe<uint64_t>
size()
const {
return _size; }
250 void skip(uint64_t seq);
257 void trim(uint64_t seq);
266 void write(
hilti::rt::Bytes data, std::optional<uint64_t> seq = {}, std::optional<uint64_t> len = {});
279 std::optional<hilti::rt::Bytes> data;
/**
 * Constructs a chunk from its three components.
 *
 * @param data optional payload; taken by value and moved into the `data`
 *             member (the parameter shadows the member, so inside the
 *             initializer `std::move(data)` refers to the parameter)
 * @param rseq copied into the `rseq` member
 * @param rupper copied into the `rupper` member
 *
 * NOTE(review): `rseq`/`rupper` presumably are the chunk's relative start and
 * upper sequence bounds -- confirm whether `rupper` is inclusive or exclusive.
 */
283 Chunk(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper)
284 : data(std::move(data)), rseq(rseq), rupper(rupper) {}
287 using ChunkList = std::list<Chunk>;
/**
 * Reports whether the sink has seen any input: true if `_cur_rseq` is
 * non-zero or at least one chunk is stored in `_chunks`.
 *
 * This is a pure query, so it is `const`; existing non-const callers are
 * unaffected. Emptiness is tested with `empty()` rather than by converting
 * `size()` to bool, and the integer test is spelled out explicitly.
 */
290 bool _haveInput() const {
return _cur_rseq != 0 || ! _chunks.empty(); }
294 void _close(
bool orderly);
/**
 * Converts `seq` into a value relative to `_initial_seq` by subtracting the
 * latter. The arithmetic is on `uint64_t`, so a `seq` smaller than
 * `_initial_seq` wraps around modulo 2^64 rather than going negative.
 *
 * @param seq sequence number to convert
 * @return `seq - _initial_seq`
 */
297 uint64_t _rseq(uint64_t seq)
const {
299 return seq - _initial_seq;
/**
 * Inverse of `_rseq()`: converts a value relative to `_initial_seq` back by
 * adding `_initial_seq`. The arithmetic is on `uint64_t` and wraps modulo
 * 2^64 on overflow.
 *
 * @param rseq relative sequence number to convert
 * @return `_initial_seq + rseq`
 */
303 uint64_t _aseq(uint64_t rseq)
const {
305 return _initial_seq + rseq;
312 ChunkList::iterator _addAndCheck(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper,
313 ChunkList::iterator c);
316 bool _deliver(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper);
319 void _newData(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t len);
322 void _skip(uint64_t rseq);
325 void _trim(uint64_t rseq);
328 void _tryDeliver(ChunkList::iterator c);
331 void _reportGap(uint64_t rseq, uint64_t len)
const;
333 void _reportSkipped(uint64_t rseq)
const;
335 void _reportUndeliveredUpTo(uint64_t rupper)
const;
338 void _debugReassembler(
const std::string& msg,
const std::optional<hilti::rt::Bytes>& data, uint64_t seq,
340 void _debugReassemblerBuffer(
const std::string& msg)
const;
344 std::vector<sink::detail::State*> _states;
347 std::vector<hilti::rt::StrongReferenceGeneric> _units;
356 std::optional<FilterData> _filter_data;
// Reassembler policy, changed through `set_policy()`. Given an explicit
// default so the member never holds an indeterminate value, matching the
// value-initialized sequence-number members declared next to it. Any
// constructor or init routine that also assigns `_policy` (not visible here)
// simply overrides this default.
359 sink::ReassemblerPolicy _policy{sink::ReassemblerPolicy::First};
362 uint64_t _initial_seq{};
363 uint64_t _cur_rseq{};
364 uint64_t _last_reassem_rseq{};
365 uint64_t _trim_rseq{};
373 std::string to_string(
const spicy::rt::sink::ReassemblerPolicy& x, adl::tag );
hilti::rt::integer::safe< uint64_t > size() const
Definition: sink.h:243
void set_auto_trim(bool enable)
Definition: sink.h:221
hilti::rt::Resumable resumable
Definition: sink.h:50
void set_initial_sequence_number(uint64_t seq)
Definition: sink.h:228
uint64_t sequence_number() const
Definition: sink.h:214
Definition: reference.h:652
void connect(spicy::rt::UnitRef< T > unit)
Definition: sink.h:134
void close()
Definition: sink.h:164
void connect_mime_type(const std::string &mt, const std::string &scope)
Definition: sink.h:187
Definition: stream.h:1001
hilti::rt::ValueReference< hilti::rt::Stream > data
Definition: sink.h:47
T * as() const
Definition: reference.h:666
void connect_filter(spicy::rt::UnitRef< T > unit)
Definition: sink.h:151
Definition: reference.h:340
void connect_mime_type(const hilti::rt::Bytes &mt, const std::string &scope)
Definition: sink.h:199
static ValueReference self(T *t)
Definition: reference.h:274
const std::string & str() const &
Definition: bytes.h:217
Definition: deferred-expression.h:41
Parser * parser
Definition: sink.h:53
void set_policy(sink::ReassemblerPolicy policy)
Definition: sink.h:238
std::string fmt(const char *fmt, const Args &... args)
Definition: fmt.h:13
filter::State< sink::detail::sink_name > _filter
Definition: sink.h:275