292 lines
6.6 KiB
C
Raw Normal View History

//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2019
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/utils/buffer.h"
#include "td/utils/common.h"
#include "td/utils/Status.h"
namespace td {
class ByteFlowInterface {
public:
virtual void close_input(Status status) = 0;
virtual void wakeup() = 0;
virtual void set_parent(ByteFlowInterface &other) = 0;
virtual void set_input(ChainBufferReader *input) = 0;
virtual size_t get_need_size() = 0;
ByteFlowInterface() = default;
ByteFlowInterface(const ByteFlowInterface &) = delete;
ByteFlowInterface &operator=(const ByteFlowInterface &) = delete;
ByteFlowInterface(ByteFlowInterface &&) = default;
ByteFlowInterface &operator=(ByteFlowInterface &&) = default;
virtual ~ByteFlowInterface() = default;
};
class ByteFlowBaseCommon : public ByteFlowInterface {
public:
ByteFlowBaseCommon() = default;
void close_input(Status status) final {
if (status.is_error()) {
finish(std::move(status));
} else {
is_input_active_ = false;
wakeup();
}
}
void wakeup() final {
if (stop_flag_) {
return;
}
input_->sync_with_writer();
if (waiting_flag_) {
if (!is_input_active_) {
finish(Status::OK());
}
return;
}
if (is_input_active_) {
if (need_size_ != 0 && input_->size() < need_size_) {
return;
}
}
need_size_ = 0;
loop();
}
size_t get_need_size() final {
return need_size_;
}
virtual void loop() = 0;
protected:
bool waiting_flag_ = false;
ChainBufferReader *input_ = nullptr;
bool is_input_active_ = true;
size_t need_size_ = 0;
void finish(Status status) {
stop_flag_ = true;
need_size_ = 0;
if (parent_) {
parent_->close_input(std::move(status));
parent_ = nullptr;
}
}
void set_need_size(size_t need_size) {
need_size_ = need_size;
}
void on_output_updated() {
if (parent_) {
parent_->wakeup();
}
}
void consume_input() {
waiting_flag_ = true;
if (!is_input_active_) {
finish(Status::OK());
}
}
private:
ByteFlowInterface *parent_ = nullptr;
bool stop_flag_ = false;
friend class ByteFlowBase;
friend class ByteFlowInplaceBase;
};
class ByteFlowBase : public ByteFlowBaseCommon {
public:
ByteFlowBase() = default;
void set_input(ChainBufferReader *input) final {
input_ = input;
}
void set_parent(ByteFlowInterface &other) final {
parent_ = &other;
parent_->set_input(&output_reader_);
}
void loop() override = 0;
// ChainBufferWriter &get_output() {
// return output_;
//}
protected:
ChainBufferWriter output_;
ChainBufferReader output_reader_ = output_.extract_reader();
};
class ByteFlowInplaceBase : public ByteFlowBaseCommon {
public:
ByteFlowInplaceBase() = default;
void set_input(ChainBufferReader *input) final {
input_ = input;
output_ = ChainBufferReader(input_->begin().clone(), input_->begin().clone(), false);
}
void set_parent(ByteFlowInterface &other) final {
parent_ = &other;
parent_->set_input(&output_);
}
void loop() override = 0;
ChainBufferReader &get_output() {
return output_;
}
protected:
ChainBufferReader output_;
};
inline ByteFlowInterface &operator>>(ByteFlowInterface &from, ByteFlowInterface &to) {
from.set_parent(to);
return to;
}
class ByteFlowSource : public ByteFlowInterface {
public:
ByteFlowSource() = default;
explicit ByteFlowSource(ChainBufferReader *buffer) : buffer_(buffer) {
}
ByteFlowSource(ByteFlowSource &&other) : buffer_(other.buffer_), parent_(other.parent_) {
other.buffer_ = nullptr;
other.parent_ = nullptr;
}
ByteFlowSource &operator=(ByteFlowSource &&other) {
buffer_ = other.buffer_;
parent_ = other.parent_;
other.buffer_ = nullptr;
other.parent_ = nullptr;
return *this;
}
ByteFlowSource(const ByteFlowSource &) = delete;
ByteFlowSource &operator=(const ByteFlowSource &) = delete;
~ByteFlowSource() override = default;
void set_input(ChainBufferReader *) final {
UNREACHABLE();
}
void set_parent(ByteFlowInterface &parent) final {
CHECK(parent_ == nullptr);
parent_ = &parent;
parent_->set_input(buffer_);
}
void close_input(Status status) final {
CHECK(parent_);
parent_->close_input(std::move(status));
parent_ = nullptr;
}
void wakeup() final {
CHECK(parent_);
parent_->wakeup();
}
size_t get_need_size() final {
if (parent_ == nullptr) {
return 0;
}
return parent_->get_need_size();
}
private:
ChainBufferReader *buffer_ = nullptr;
ByteFlowInterface *parent_ = nullptr;
};
class ByteFlowSink : public ByteFlowInterface {
public:
void set_input(ChainBufferReader *input) final {
CHECK(buffer_ == nullptr);
buffer_ = input;
}
void set_parent(ByteFlowInterface & /*parent*/) final {
UNREACHABLE();
}
void close_input(Status status) final {
CHECK(active_);
active_ = false;
status_ = std::move(status);
buffer_->sync_with_writer();
}
void wakeup() final {
buffer_->sync_with_writer();
}
size_t get_need_size() final {
UNREACHABLE();
return 0;
}
bool is_ready() {
return !active_;
}
Status &status() {
return status_;
}
ChainBufferReader *result() {
CHECK(is_ready() && status().is_ok());
return buffer_;
}
ChainBufferReader *get_output() {
return buffer_;
}
private:
bool active_ = true;
Status status_;
ChainBufferReader *buffer_ = nullptr;
};
class ByteFlowMoveSink : public ByteFlowInterface {
public:
ByteFlowMoveSink() = default;
explicit ByteFlowMoveSink(ChainBufferWriter *output) {
set_output(output);
}
void set_input(ChainBufferReader *input) final {
CHECK(!input_);
input_ = input;
}
void set_parent(ByteFlowInterface & /*parent*/) final {
UNREACHABLE();
}
void close_input(Status status) final {
CHECK(active_);
active_ = false;
status_ = std::move(status);
wakeup();
}
void wakeup() final {
input_->sync_with_writer();
output_->append(*input_);
}
size_t get_need_size() final {
UNREACHABLE();
return 0;
}
void set_output(ChainBufferWriter *output) {
CHECK(!output_);
output_ = output;
}
bool is_ready() {
return !active_;
}
Status &status() {
return status_;
}
private:
bool active_ = true;
Status status_;
ChainBufferReader *input_ = nullptr;
ChainBufferWriter *output_ = nullptr;
};
} // namespace td