From 3266984243683145647df12f26f4e843849a4cca Mon Sep 17 00:00:00 2001 From: Arseny Smirnov Date: Mon, 10 Sep 2018 19:21:34 +0300 Subject: [PATCH] BufferedStdin: windows GitOrigin-RevId: ff7393ea2d648de84f691043aa7780260af77f6d --- tdutils/td/utils/port/StdStreams.cpp | 123 +++++++++++++++++++++++++++ tdutils/td/utils/port/StdStreams.h | 58 +++++-------- 2 files changed, 143 insertions(+), 38 deletions(-) diff --git a/tdutils/td/utils/port/StdStreams.cpp b/tdutils/td/utils/port/StdStreams.cpp index 800d6470..6b0d648c 100644 --- a/tdutils/td/utils/port/StdStreams.cpp +++ b/tdutils/td/utils/port/StdStreams.cpp @@ -46,4 +46,127 @@ FileFd &Stderr() { ); return res; } + +#if TD_WINDOWS +namespace detail { +class BufferedStdinImpl { + public: + BufferedStdinImpl() { + file_fd_ = FileFd::from_native_fd(NativeFd(Stdin().get_native_fd().raw())); + file_fd_.get_native_fd().set_is_blocking(false); + read_thread_ = td::thread([this] { this->read_loop(); }); + } + ~BufferedStdinImpl() { + file_fd_.move_as_native_fd().release(); + } + void close() { + close_flag_ = true; + } + + ChainBufferReader &input_buffer() { + return reader_; + } + + PollableFdInfo &get_poll_info() { + return file_fd_.get_poll_info(); + } + const PollableFdInfo &get_poll_info() const { + return file_fd_.get_poll_info(); + } + + Result flush_read(size_t max_read = std::numeric_limits::max()) TD_WARN_UNUSED_RESULT { + reader_.sync_with_writer(); + return reader_.size(); + } + + private: + FileFd file_fd_; + ChainBufferWriter writer_; + ChainBufferReader reader_ = writer_.extract_reader(); + td::thread read_thread_; + + std::atomic close_flag_{false}; + + void read_loop() { + while (!close_flag_) { + auto slice = writer_.prepare_append(); + auto size = file_fd_.read(slice).move_as_ok(); + writer_.confirm_append(size); + file_fd_.get_poll_info().add_flags_from_poll(td::PollFlags::Read()); + } + //TODO delete + } +}; +void BufferedStdinImplDeleter::operator()(BufferedStdinImpl *impl) { + impl->close(); +} +} // namespace detail +#else +namespace detail { +class BufferedStdinImpl { + public: + BufferedStdinImpl() { + file_fd_ = FileFd::from_native_fd(NativeFd(Stdin().get_native_fd().raw())); + file_fd_.get_native_fd().set_is_blocking(false); + } + ~BufferedStdinImpl() { + file_fd_.move_as_native_fd().release(); + } + + ChainBufferReader &input_buffer() { + return reader_; + } + + PollableFdInfo &get_poll_info() { + return file_fd_.get_poll_info(); + } + const PollableFdInfo &get_poll_info() const { + return file_fd_.get_poll_info(); + } + + Result flush_read(size_t max_read = std::numeric_limits::max()) TD_WARN_UNUSED_RESULT { + size_t result = 0; + while (::td::can_read(*this) && max_read) { + MutableSlice slice = writer_.prepare_append().truncate(max_read); + TRY_RESULT(x, file_fd_.read(slice)); + slice.truncate(x); + writer_.confirm_append(x); + result += x; + max_read -= x; + } + if (result) { + reader_.sync_with_writer(); + } + return result; + } + + private: + FileFd file_fd_; + ChainBufferWriter writer_; + ChainBufferReader reader_ = writer_.extract_reader(); +}; +void BufferedStdinImplDeleter::operator()(BufferedStdinImpl *impl) { + delete impl; +} +} // namespace detail +#endif +BufferedStdin::BufferedStdin() : impl_(std::make_unique().release()) { +} +BufferedStdin::BufferedStdin(BufferedStdin &&) = default; +BufferedStdin &BufferedStdin::operator=(BufferedStdin &&) = default; +BufferedStdin::~BufferedStdin() = default; + +ChainBufferReader &BufferedStdin::input_buffer() { + return impl_->input_buffer(); +} +PollableFdInfo &BufferedStdin::get_poll_info() { + return impl_->get_poll_info(); +} +const PollableFdInfo &BufferedStdin::get_poll_info() const { + return impl_->get_poll_info(); +} +Result BufferedStdin::flush_read(size_t max_read) { + return impl_->flush_read(max_read); +} + } // namespace td diff --git a/tdutils/td/utils/port/StdStreams.h b/tdutils/td/utils/port/StdStreams.h index 575b1ae2..e22dbc51 100644 --- a/tdutils/td/utils/port/StdStreams.h +++ b/tdutils/td/utils/port/StdStreams.h @@ -7,6 +7,7 @@ #pragma once #include "td/utils/port/FileFd.h" +#include "td/utils/port/thread.h" #include "td/utils/buffer.h" namespace td { @@ -15,48 +16,29 @@ FileFd &Stdin(); FileFd &Stdout(); FileFd &Stderr(); +namespace detail { +class BufferedStdinImpl; +class BufferedStdinImplDeleter { + public: + void operator()(BufferedStdinImpl *impl); +}; +} // namespace detail + class BufferedStdin { public: - BufferedStdin() { - file_fd_ = FileFd::from_native_fd(NativeFd(Stdin().get_native_fd().raw())); - file_fd_.get_native_fd().set_is_blocking(false); - } - - ~BufferedStdin() { - file_fd_.move_as_native_fd().release(); - } - - ChainBufferReader &input_buffer() { - return reader_; - } - - PollableFdInfo &get_poll_info() { - return file_fd_.get_poll_info(); - } - const PollableFdInfo &get_poll_info() const { - return file_fd_.get_poll_info(); - } - - Result flush_read(size_t max_read = std::numeric_limits::max()) TD_WARN_UNUSED_RESULT { - size_t result = 0; - while (::td::can_read(*this) && max_read) { - MutableSlice slice = writer_.prepare_append().truncate(max_read); - TRY_RESULT(x, file_fd_.read(slice)); - slice.truncate(x); - writer_.confirm_append(x); - result += x; - max_read -= x; - } - if (result) { - reader_.sync_with_writer(); - } - return result; - } + BufferedStdin(); + BufferedStdin(const BufferedStdin &) = delete; + BufferedStdin &operator=(const BufferedStdin &) = delete; + BufferedStdin(BufferedStdin &&); + BufferedStdin &operator=(BufferedStdin &&); + ~BufferedStdin(); + ChainBufferReader &input_buffer(); + PollableFdInfo &get_poll_info(); + const PollableFdInfo &get_poll_info() const; + Result flush_read(size_t max_read = std::numeric_limits::max()) TD_WARN_UNUSED_RESULT; private: - FileFd file_fd_; - ChainBufferWriter writer_; - ChainBufferReader reader_ = writer_.extract_reader(); + std::unique_ptr impl_; }; } // namespace td