11 #include <hilti/rt/exception.h> 12 #include <hilti/rt/extension-points.h> 13 #include <hilti/rt/types/bytes.h> 14 #include <hilti/rt/types/reference.h> 15 #include <hilti/rt/types/stream.h> 17 #include <spicy/rt/debug.h> 18 #include <spicy/rt/filter.h> 19 #include <spicy/rt/mime.h> 20 #include <spicy/rt/parser-fwd.h> 21 #include <spicy/rt/typedefs.h> 28 HILTI_EXCEPTION(SinkError, UserException)
31 enum class ReassemblerPolicy { First };
34 namespace sink::detail {
37 template<
typename T,
typename =
int>
60 auto parse2 = hilti::rt::any_cast<spicy::rt::Parse2Function<U>>(U::__parser.parse2);
64 auto& state = unit->__sink;
66 state->resumable = (*parse2)(
self, state->data, {}, {});
67 state->parser = &U::__parser;
77 ::spicy::rt::detail::ParseSinkFunction parseFunction() {
80 return std::make_pair(std::move(unit), _connectUnit(unit));
93 template<
typename Unit,
auto Hook,
typename... Args>
96 auto unit = u.
as<Unit>();
97 ((*unit).*Hook)(std::forward<Args>(args)...);
102 inline const char sink_name[] =
"__sink__";
115 ~
Sink() { _close(
true); }
119 Sink& operator=(
const Sink&) =
delete;
130 SPICY_RT_DEBUG_VERBOSE(
hilti::rt::fmt(
"connecting parser %s [%p] to sink %p", T::__parser.name, &*unit,
this));
131 auto state = spicy::rt::sink::detail::_connectUnit(unit);
132 _units.emplace_back(std::move(unit));
133 _states.emplace_back(std::move(state));
148 throw SinkError(
"cannot connect filter after data has been forwarded already");
150 SPICY_RT_DEBUG_VERBOSE(
151 hilti::rt::fmt(
"connecting filter unit %s [%p] to sink %p", T::__parser.name, &*unit,
this));
152 spicy::rt::filter::connect(_filter, unit);
168 void connect_mime_type(
const MIMEType& mt);
195 void gap(uint64_t seq, uint64_t len);
215 if ( _haveInput() ) {
217 throw SinkError(
"sink cannot update initial sequence number after activity has already been seen");
224 void set_policy(sink::ReassemblerPolicy policy) { _policy = policy; }
229 uint64_t
size()
const {
return _size; }
236 void skip(uint64_t seq);
243 void trim(uint64_t seq);
252 void write(
hilti::rt::Bytes data, std::optional<uint64_t> seq = {}, std::optional<uint64_t> len = {});
265 std::optional<hilti::rt::Bytes> data;
269 Chunk(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper)
270 : data(std::move(data)), rseq(rseq), rupper(rupper) {}
273 using ChunkList = std::list<Chunk>;
276 bool _haveInput() {
return _cur_rseq || _chunks.size(); }
280 void _close(
bool orderly);
283 uint64_t _rseq(uint64_t seq)
const {
285 return seq - _initial_seq;
289 uint64_t _aseq(uint64_t rseq)
const {
291 return _initial_seq + rseq;
298 ChunkList::iterator _addAndCheck(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper,
299 ChunkList::iterator c);
302 bool _deliver(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t rupper);
305 void _newData(std::optional<hilti::rt::Bytes> data, uint64_t rseq, uint64_t len);
308 void _skip(uint64_t rseq);
311 void _trim(uint64_t rseq);
314 void _tryDeliver(ChunkList::iterator c);
317 void _reportGap(uint64_t rseq, uint64_t len)
const;
319 void _reportSkipped(uint64_t rseq)
const;
321 void _reportUndeliveredUpTo(uint64_t rupper)
const;
324 void _debugReassembler(
const std::string& msg,
const std::optional<hilti::rt::Bytes>& data, uint64_t seq,
326 void _debugReassemblerBuffer(
const std::string& msg)
const;
330 std::vector<sink::detail::State*> _states;
333 std::vector<hilti::rt::StrongReferenceGeneric> _units;
342 std::optional<FilterData> _filter_data;
345 sink::ReassemblerPolicy _policy;
348 uint64_t _initial_seq{};
349 uint64_t _cur_rseq{};
350 uint64_t _last_reassem_rseq{};
351 uint64_t _trim_rseq{};
359 std::string to_string(
const spicy::rt::sink::ReassemblerPolicy& x, adl::tag );
void set_auto_trim(bool enable)
Definition: sink.h:207
hilti::rt::Resumable resumable
Definition: sink.h:49
uint64_t size() const
Definition: sink.h:229
void set_initial_sequence_number(uint64_t seq)
Definition: sink.h:214
uint64_t sequence_number() const
Definition: sink.h:200
Definition: reference.h:633
void connect(spicy::rt::UnitRef< T > unit)
Definition: sink.h:129
void close()
Definition: sink.h:159
void connect_mime_type(const std::string &mt)
Definition: sink.h:178
hilti::rt::ValueReference< hilti::rt::Stream > data
Definition: sink.h:46
T * as() const
Definition: reference.h:647
void connect_filter(spicy::rt::UnitRef< T > unit)
Definition: sink.h:146
Definition: reference.h:321
static ValueReference self(T *t)
Definition: reference.h:255
const std::string & str() const &
Definition: bytes.h:214
Definition: deferred-expression.h:41
void connect_mime_type(const hilti::rt::Bytes &mt)
Definition: sink.h:187
void set_policy(sink::ReassemblerPolicy policy)
Definition: sink.h:224
std::string fmt(const char *fmt, const Args &... args)
Definition: fmt.h:13
filter::State< sink::detail::sink_name > _filter
Definition: sink.h:261