11 #include <hilti/rt/exception.h> 12 #include <hilti/rt/extension-points.h> 13 #include <hilti/rt/types/bytes.h> 14 #include <hilti/rt/types/integer.h> 15 #include <hilti/rt/types/reference.h> 16 #include <hilti/rt/types/stream.h> 18 #include <spicy/rt/debug.h> 19 #include <spicy/rt/filter.h> 20 #include <spicy/rt/mime.h> 21 #include <spicy/rt/parser-fwd.h> 22 #include <spicy/rt/typedefs.h> 29 HILTI_EXCEPTION(SinkError, UserException)
32 enum class ReassemblerPolicy { First };
35 namespace sink::detail {
38 template<
typename T,
typename =
int>
61 auto parse2 = hilti::rt::any_cast<spicy::rt::Parse2Function<U>>(U::__parser.parse2);
65 auto& state = unit->__sink;
67 state->resumable = (*parse2)(
self, state->data, {}, {});
68 state->parser = &U::__parser;
78 ::spicy::rt::detail::ParseSinkFunction parseFunction() {
81 return std::make_pair(std::move(unit), _connectUnit(unit));
94 template<
typename Unit,
auto Hook,
typename... Args>
97 auto unit = u.
as<Unit>();
98 ((*unit).*Hook)(std::forward<Args>(args)...);
103 inline const char sink_name[] =
"__sink__";
116 ~
Sink() { _close(
true); }
120 Sink& operator=(
const Sink&) =
delete;
131 SPICY_RT_DEBUG_VERBOSE(
hilti::rt::fmt(
"connecting parser %s [%p] to sink %p", T::__parser.name, &*unit,
this));
132 auto state = spicy::rt::sink::detail::_connectUnit(unit);
133 _units.emplace_back(std::move(unit));
134 _states.emplace_back(std::move(state));
149 throw SinkError(
"cannot connect filter after data has been forwarded already");
151 SPICY_RT_DEBUG_VERBOSE(
152 hilti::rt::fmt(
"connecting filter unit %s [%p] to sink %p", T::__parser.name, &*unit,
this));
153 spicy::rt::filter::connect(_filter, unit);
169 void connect_mime_type(
const MIMEType& mt);
196 void gap(uint64_t seq, uint64_t len);
216 if ( _haveInput() ) {
218 throw SinkError(
"sink cannot update initial sequence number after activity has already been seen");
225 void set_policy(sink::ReassemblerPolicy policy) { _policy = policy; }
230 hilti::rt::integer::safe<uint64_t>
size()
const {
return _size; }
237 void skip(uint64_t seq);
244 void trim(uint64_t seq);
253 void write(
hilti::rt::Bytes data, std::optional<uint64_t> seq = {}, std::optional<uint64_t> len = {});
266 std::optional<hilti::rt::Bytes> data;
270 Chunk(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper)
271 : data(std::move(data)), rseq(rseq), rupper(rupper) {}
274 using ChunkList = std::list<Chunk>;
277 bool _haveInput() {
return _cur_rseq || _chunks.size(); }
281 void _close(
bool orderly);
284 uint64_t _rseq(uint64_t seq)
const {
286 return seq - _initial_seq;
290 uint64_t _aseq(uint64_t rseq)
const {
292 return _initial_seq + rseq;
299 ChunkList::iterator _addAndCheck(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper,
300 ChunkList::iterator c);
303 bool _deliver(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper);
306 void _newData(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t len);
309 void _skip(uint64_t rseq);
312 void _trim(uint64_t rseq);
315 void _tryDeliver(ChunkList::iterator c);
318 void _reportGap(uint64_t rseq, uint64_t len)
const;
320 void _reportSkipped(uint64_t rseq)
const;
322 void _reportUndeliveredUpTo(uint64_t rupper)
const;
325 void _debugReassembler(
const std::string& msg,
const std::optional<hilti::rt::Bytes>& data, uint64_t seq,
327 void _debugReassemblerBuffer(
const std::string& msg)
const;
331 std::vector<sink::detail::State*> _states;
334 std::vector<hilti::rt::StrongReferenceGeneric> _units;
343 std::optional<FilterData> _filter_data;
346 sink::ReassemblerPolicy _policy;
349 uint64_t _initial_seq{};
350 uint64_t _cur_rseq{};
351 uint64_t _last_reassem_rseq{};
352 uint64_t _trim_rseq{};
360 std::string to_string(
const spicy::rt::sink::ReassemblerPolicy& x, adl::tag );
hilti::rt::integer::safe< uint64_t > size() const
Definition: sink.h:230
void set_auto_trim(bool enable)
Definition: sink.h:208
hilti::rt::Resumable resumable
Definition: sink.h:50
void set_initial_sequence_number(uint64_t seq)
Definition: sink.h:215
uint64_t sequence_number() const
Definition: sink.h:201
Definition: reference.h:640
void connect(spicy::rt::UnitRef< T > unit)
Definition: sink.h:130
void close()
Definition: sink.h:160
void connect_mime_type(const std::string &mt)
Definition: sink.h:179
hilti::rt::ValueReference< hilti::rt::Stream > data
Definition: sink.h:47
T * as() const
Definition: reference.h:654
void connect_filter(spicy::rt::UnitRef< T > unit)
Definition: sink.h:147
Definition: reference.h:328
static ValueReference self(T *t)
Definition: reference.h:262
const std::string & str() const &
Definition: bytes.h:216
Definition: deferred-expression.h:41
void connect_mime_type(const hilti::rt::Bytes &mt)
Definition: sink.h:188
void set_policy(sink::ReassemblerPolicy policy)
Definition: sink.h:225
std::string fmt(const char *fmt, const Args &... args)
Definition: fmt.h:13
filter::State< sink::detail::sink_name > _filter
Definition: sink.h:262