11 #include <hilti/rt/exception.h> 12 #include <hilti/rt/extension-points.h> 13 #include <hilti/rt/types/bytes.h> 14 #include <hilti/rt/types/integer.h> 15 #include <hilti/rt/types/reference.h> 16 #include <hilti/rt/types/stream.h> 18 #include <spicy/rt/debug.h> 19 #include <spicy/rt/filter.h> 20 #include <spicy/rt/mime.h> 21 #include <spicy/rt/parser-fwd.h> 22 #include <spicy/rt/typedefs.h> 29 HILTI_EXCEPTION(SinkError, UsageError)
// Selectable policies for the sink's reassembler. `First` is the only
// enumerator defined at this point.
enum class ReassemblerPolicy {
    First,
};
35 namespace sink::detail {
38 template<
typename T,
typename =
int>
56 bool skip_delivery =
false;
65 auto parse2 = hilti::rt::any_cast<spicy::rt::Parse2Function<U>>(U::__parser.parse2);
69 auto& state = unit->__sink;
71 state->resumable = (*parse2)(
self, state->data, {}, {});
72 state->parser = &U::__parser;
82 ::spicy::rt::detail::ParseSinkFunction parseFunction() {
85 return std::make_pair(std::move(unit), _connectUnit(unit));
98 template<
typename Unit,
auto Hook,
typename... Args>
101 auto unit = u.
as<Unit>();
102 ((*unit).*Hook)(std::forward<Args>(args)...);
// Name tag for sinks. It is passed as the template argument of
// filter::State<sink::detail::sink_name> (the type of `_filter`), which is
// why it is a named `inline` array object rather than a bare string literal.
107 inline const char sink_name[] =
"__sink__";
// Copy assignment is deleted: one Sink cannot be assigned from another.
130 Sink& operator=(
const Sink&) =
delete;
141 SPICY_RT_DEBUG_VERBOSE(
hilti::rt::fmt(
"connecting parser %s [%p] to sink %p", T::__parser.name, &*unit,
this));
142 auto state = spicy::rt::sink::detail::_connectUnit(unit);
143 _units.emplace_back(std::move(unit));
144 _states.emplace_back(std::move(state));
159 throw SinkError(
"cannot connect filter after data has been forwarded already");
161 SPICY_RT_DEBUG_VERBOSE(
162 hilti::rt::fmt(
"connecting filter unit %s [%p] to sink %p", T::__parser.name, &*unit,
this));
163 spicy::rt::filter::connect(_filter, unit);
// Declaration only; the definition is not part of this listing.
// Takes a MIME type and a scope string, both by const reference.
// NOTE(review): presumably connects the parsers registered for `mt` that are
// visible within `scope` -- confirm against the definition.
181 void connect_mime_type(
const MIMEType& mt,
const std::string& scope);
// Declaration only; the definition is not part of this listing.
// NOTE(review): presumably reports a gap of `len` bytes starting at absolute
// sequence number `seq` -- confirm against the definition.
215 void gap(uint64_t seq, uint64_t len);
235 if ( _haveInput() ) {
237 throw SinkError(
"sink cannot update initial sequence number after activity has already been seen");
244 void set_policy(sink::ReassemblerPolicy policy) { _policy = policy; }
249 hilti::rt::integer::safe<uint64_t>
size()
const {
return _size; }
// Declaration only; the definition is not part of this listing.
// NOTE(review): presumably skips input up to absolute sequence number `seq`
// -- confirm against the definition.
256 void skip(uint64_t seq);
// Declaration only; the definition is not part of this listing.
// NOTE(review): presumably discards buffered input below absolute sequence
// number `seq` -- confirm against the definition.
263 void trim(uint64_t seq);
// Declaration only; the definition is not part of this listing.
// `data` is taken by value; `seq` and `len` are optional and default to unset.
// NOTE(review): how an unset `seq` or `len` is interpreted is decided in the
// definition -- confirm there.
272 void write(
hilti::rt::Bytes data, std::optional<uint64_t> seq = {}, std::optional<uint64_t> len = {});
// Payload of the chunk. It is optional, so a chunk can exist without data.
285 std::optional<hilti::rt::Bytes> data;
// Builds a chunk from a payload and two sequence values. The parameters
// shadow the members of the same name, so the member-initializer list is what
// stores them; `data` is moved into the member rather than copied.
// NOTE(review): `rseq`/`rupper` look like the relative lower/upper sequence
// bounds of the chunk -- confirm against the member declarations.
289 Chunk(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper)
290 : data(std::move(data)), rseq(rseq), rupper(rupper) {}
// Linked list of Chunk objects. The reassembler helpers pass positions in it
// around as ChunkList::iterator.
293 using ChunkList = std::list<Chunk>;
296 bool _haveInput() {
return _cur_rseq || _chunks.size(); }
// Declaration only; the definition is not part of this listing.
// NOTE(review): `orderly` presumably distinguishes a regular shutdown from an
// aborted one -- confirm against the definition.
300 void _close(
bool orderly);
// Converts an absolute sequence number into one relative to `_initial_seq`.
// The subtraction is unsigned: a `seq` below `_initial_seq` wraps around
// instead of producing a negative value.
303 uint64_t _rseq(uint64_t seq)
const {
305 return seq - _initial_seq;
// Converts a relative sequence number back into an absolute one by adding
// `_initial_seq`; the inverse of _rseq(). The addition is unsigned and wraps
// on overflow.
309 uint64_t _aseq(uint64_t rseq)
const {
311 return _initial_seq + rseq;
// Internal reassembler helpers. These are declarations only; the definitions
// are not part of this listing.
// NOTE(review): the parameter names (`rseq`, `rupper`) suggest that all of
// them work on relative sequence numbers as produced by _rseq() -- confirm.

// Takes an optional payload, a sequence range and a list position; returns a
// list position.
318 ChunkList::iterator _addAndCheck(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper,
319 ChunkList::iterator c);
// Takes an optional payload and a sequence range; the meaning of the bool
// result is defined by the implementation.
322 bool _deliver(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper);
// Takes an optional payload, a starting sequence number and a length.
325 void _newData(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t len);
// NOTE(review): presumably the relative-sequence counterparts of the public
// skip()/trim() -- confirm.
328 void _skip(uint64_t rseq);
331 void _trim(uint64_t rseq);
// Takes a position in the chunk list.
334 void _tryDeliver(ChunkList::iterator c);
// Reporting helpers. All three are const member functions; only their
// declarations are part of this listing.
// NOTE(review): presumably they notify the connected units about gaps,
// skipped ranges and undelivered data -- confirm against the definitions.
337 void _reportGap(uint64_t rseq, uint64_t len)
const;
339 void _reportSkipped(uint64_t rseq)
const;
341 void _reportUndeliveredUpTo(uint64_t rupper)
const;
344 void _debugReassembler(
const std::string& msg,
const std::optional<hilti::rt::Bytes>& data, uint64_t seq,
// Declaration only; const member function taking a message string.
// NOTE(review): presumably emits a debug dump of the reassembly buffer
// labelled with `msg` -- confirm against the definition.
346 void _debugReassemblerBuffer(
const std::string& msg)
const;
// State pointers for the connected units; connect() appends one entry per
// connected unit.
350 std::vector<sink::detail::State*> _states;
// Strong references to the connected units. connect() appends to this vector
// and to `_states` together, so the two are presumably index-aligned --
// confirm before relying on it.
353 std::vector<hilti::rt::StrongReferenceGeneric> _units;
// Optional filter data; empty until something assigns it.
362 std::optional<FilterData> _filter_data;
// Reassembler policy, written by set_policy().
// NOTE(review): no in-class initializer is visible here -- confirm it is
// initialized elsewhere before it is first read.
365 sink::ReassemblerPolicy _policy;
// Offset between absolute and relative sequence numbers, used by _rseq() and
// _aseq(). Zero-initialized.
368 uint64_t _initial_seq{};
// Zero-initialized; a non-zero value makes _haveInput() report input.
369 uint64_t _cur_rseq{};
// Zero-initialized reassembly bookkeeping; their use is not visible in this
// listing.
370 uint64_t _last_reassem_rseq{};
371 uint64_t _trim_rseq{};
// Declaration of the to_string() overload for ReassemblerPolicy values. The
// unnamed `adl::tag` parameter is unused as a value.
// NOTE(review): the tag presumably exists only to select this overload via
// argument-dependent lookup -- confirm against the HILTI runtime conventions.
379 std::string to_string(
const spicy::rt::sink::ReassemblerPolicy& x, adl::tag );
hilti::rt::integer::safe< uint64_t > size() const
Definition: sink.h:249
void set_auto_trim(bool enable)
Definition: sink.h:227
hilti::rt::Resumable resumable
Definition: sink.h:50
void set_initial_sequence_number(uint64_t seq)
Definition: sink.h:234
uint64_t sequence_number() const
Definition: sink.h:220
Definition: reference.h:663
void connect(spicy::rt::UnitRef< T > unit)
Definition: sink.h:140
void close()
Definition: sink.h:170
void connect_mime_type(const std::string &mt, const std::string &scope)
Definition: sink.h:193
hilti::rt::ValueReference< hilti::rt::Stream > data
Definition: sink.h:47
T * as() const
Definition: reference.h:677
void connect_filter(spicy::rt::UnitRef< T > unit)
Definition: sink.h:157
Definition: reference.h:345
void connect_mime_type(const hilti::rt::Bytes &mt, const std::string &scope)
Definition: sink.h:205
static ValueReference self(T *t)
Definition: reference.h:279
const std::string & str() const &
Definition: bytes.h:223
Definition: deferred-expression.h:41
Parser * parser
Definition: sink.h:53
void set_policy(sink::ReassemblerPolicy policy)
Definition: sink.h:244
std::string fmt(const char *fmt, const Args &... args)
Definition: fmt.h:13
filter::State< sink::detail::sink_name > _filter
Definition: sink.h:281