// // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #pragma once #include "td/utils/buffer.h" #include "td/utils/common.h" #include "td/utils/format.h" #include "td/utils/logging.h" #include "td/utils/port/detail/PollableFd.h" #include "td/utils/port/IoSlice.h" #include "td/utils/Slice.h" #include "td/utils/Span.h" #include "td/utils/Status.h" #include namespace td { // just reads from given reader and writes to given writer template class BufferedFdBase : public FdT { public: BufferedFdBase() = default; explicit BufferedFdBase(FdT &&fd); // TODO: make move constructor and move assignment safer Result flush_read(size_t max_read = std::numeric_limits::max()) TD_WARN_UNUSED_RESULT; Result flush_write() TD_WARN_UNUSED_RESULT; bool need_flush_write(size_t at_least = 0) { return ready_for_flush_write() > at_least; } size_t ready_for_flush_write() { CHECK(write_); write_->sync_with_writer(); return write_->size(); } void sync_with_poll() { ::td::sync_with_poll(*this); } void set_input_writer(ChainBufferWriter *read) { read_ = read; } void set_output_reader(ChainBufferReader *write) { write_ = write; } private: ChainBufferWriter *read_ = nullptr; ChainBufferReader *write_ = nullptr; }; template class BufferedFd final : public BufferedFdBase { using Parent = BufferedFdBase; ChainBufferWriter input_writer_; ChainBufferReader input_reader_; ChainBufferWriter output_writer_; ChainBufferReader output_reader_; void init(); void init_ptr(); public: BufferedFd(); explicit BufferedFd(FdT &&fd); BufferedFd(BufferedFd &&) noexcept; BufferedFd &operator=(BufferedFd &&) noexcept; BufferedFd(const BufferedFd &) = delete; BufferedFd &operator=(const BufferedFd &) = delete; ~BufferedFd(); void close(); size_t left_unread() const { return input_reader_.size(); } size_t left_unwritten() const { return output_reader_.size(); } Result flush_read(size_t max_read = std::numeric_limits::max()) TD_WARN_UNUSED_RESULT; Result flush_write() TD_WARN_UNUSED_RESULT; // Yep, direct access to buffers. It is IO interface too. ChainBufferReader &input_buffer(); ChainBufferWriter &output_buffer(); }; // IMPLEMENTATION /*** BufferedFd ***/ template BufferedFdBase::BufferedFdBase(FdT &&fd) : FdT(std::move(fd)) { } template Result BufferedFdBase::flush_read(size_t max_read) { CHECK(read_); size_t result = 0; while (::td::can_read_local(*this) && max_read) { MutableSlice slice = read_->prepare_append(); slice.truncate(max_read); TRY_RESULT(x, FdT::read(slice)); slice.truncate(x); read_->confirm_append(x); result += x; max_read -= x; } return result; } template Result BufferedFdBase::flush_write() { // TODO: sync on demand write_->sync_with_writer(); size_t result = 0; while (!write_->empty() && ::td::can_write_local(*this)) { constexpr size_t BUF_SIZE = 20; IoSlice buf[BUF_SIZE]; auto it = write_->clone(); size_t buf_i; for (buf_i = 0; buf_i < BUF_SIZE; buf_i++) { Slice slice = it.prepare_read(); if (slice.empty()) { break; } buf[buf_i] = as_io_slice(slice); it.confirm_read(slice.size()); } TRY_RESULT(x, FdT::writev(Span(buf, buf_i))); write_->advance(x); result += x; } if (result == 0) { if (write_->empty()) { LOG(DEBUG) << "Nothing to write to " << FdT::get_poll_info().native_fd(); } else { LOG(DEBUG) << "Can't flush write to " << FdT::get_poll_info().native_fd() << " with flags = " << FdT::get_poll_info().get_flags_local(); } } return result; } /*** BufferedFd ***/ template void BufferedFd::init() { input_reader_ = input_writer_.extract_reader(); output_reader_ = output_writer_.extract_reader(); init_ptr(); } template void BufferedFd::init_ptr() { this->set_input_writer(&input_writer_); this->set_output_reader(&output_reader_); } template BufferedFd::BufferedFd() { init(); } template BufferedFd::BufferedFd(FdT &&fd) : Parent(std::move(fd)) { init(); } template BufferedFd::BufferedFd(BufferedFd &&from) noexcept { *this = std::move(from); } template BufferedFd &BufferedFd::operator=(BufferedFd &&from) noexcept { FdT::operator=(std::move(static_cast(from))); input_reader_ = std::move(from.input_reader_); input_writer_ = std::move(from.input_writer_); output_reader_ = std::move(from.output_reader_); output_writer_ = std::move(from.output_writer_); init_ptr(); return *this; } template BufferedFd::~BufferedFd() { close(); } template void BufferedFd::close() { FdT::close(); // TODO: clear buffers } template Result BufferedFd::flush_read(size_t max_read) { TRY_RESULT(result, Parent::flush_read(max_read)); if (result) { // TODO: faster sync is possible if you owns writer. input_reader_.sync_with_writer(); LOG(DEBUG) << "Flush read: +" << format::as_size(result) << tag("total", format::as_size(input_reader_.size())); } return result; } template Result BufferedFd::flush_write() { TRY_RESULT(result, Parent::flush_write()); if (result) { LOG(DEBUG) << "Flush write: +" << format::as_size(result) << tag("left", format::as_size(output_reader_.size())); } return result; } // Yep, direct access to buffers. It is IO interface too. template ChainBufferReader &BufferedFd::input_buffer() { return input_reader_; } template ChainBufferWriter &BufferedFd::output_buffer() { return output_writer_; } } // namespace td