From 5cdc7c2c5343fe3c47ea75207b978288ae8f6a3f Mon Sep 17 00:00:00 2001 From: Arseny Smirnov Date: Tue, 11 Sep 2018 17:48:29 +0300 Subject: [PATCH] BufferedStdin: simplify GitOrigin-RevId: 8674573bea8246cf2e5c21bbdaae98712d8c7cc4 --- tdutils/td/utils/port/StdStreams.cpp | 41 ++++++++++++++++++---------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/tdutils/td/utils/port/StdStreams.cpp b/tdutils/td/utils/port/StdStreams.cpp index c220170e..909dd343 100644 --- a/tdutils/td/utils/port/StdStreams.cpp +++ b/tdutils/td/utils/port/StdStreams.cpp @@ -51,9 +51,7 @@ FileFd &Stderr() { namespace detail { class BufferedStdinImpl { public: - BufferedStdinImpl() { - file_fd_ = FileFd::from_native_fd(NativeFd(Stdin().get_native_fd().fd())); - copy_file_fd_ = FileFd::from_native_fd(NativeFd(Stdin().get_native_fd().fd())); + BufferedStdinImpl() : info_(NativeFd(GetStdHandle(STD_INPUT_HANDLE), true)) { read_thread_ = td::thread([this] { this->read_loop(); }); } BufferedStdinImpl(const BufferedStdinImpl &) = delete; @@ -61,8 +59,7 @@ class BufferedStdinImpl { BufferedStdinImpl(BufferedStdinImpl &&) = delete; BufferedStdinImpl &operator=(BufferedStdinImpl &&) = delete; ~BufferedStdinImpl() { - file_fd_.move_as_native_fd().release(); - copy_file_fd_.move_as_native_fd().release(); + info_.move_as_native_fd().release(); } void close() { close_flag_ = true; @@ -73,37 +70,51 @@ class BufferedStdinImpl { } PollableFdInfo &get_poll_info() { - return file_fd_.get_poll_info(); + return info_; } const PollableFdInfo &get_poll_info() const { - return file_fd_.get_poll_info(); + return info_; } Result flush_read(size_t max_read = std::numeric_limits::max()) TD_WARN_UNUSED_RESULT { - file_fd_.get_poll_info().get_flags(); - file_fd_.get_poll_info().clear_flags(PollFlags::Read()); + info_.get_flags(); + info_.clear_flags(PollFlags::Read()); reader_.sync_with_writer(); return reader_.size(); } private: - FileFd file_fd_; - FileFd copy_file_fd_; + PollableFdInfo info_; ChainBufferWriter writer_; ChainBufferReader reader_ = writer_.extract_reader(); td::thread read_thread_; - std::atomic close_flag_{false}; + void read_loop() { while (!close_flag_) { auto slice = writer_.prepare_append(); - auto size = copy_file_fd_.read(slice).move_as_ok(); - writer_.confirm_append(size); - file_fd_.get_poll_info().add_flags_from_poll(td::PollFlags::Read()); + auto r_size = read(slice); + if (r_size.is_error()) { + LOG(ERROR) << "Stop read stdin loop: " << r_size.error(); + break; + } + writer_.confirm_append(r_size.ok()); + info_.add_flags_from_poll(td::PollFlags::Read()); } //TODO delete } + + Result read(MutableSlice slice) { + auto native_fd = info_.native_fd().io_handle(); + DWORD bytes_read = 0; + auto res = ReadFile(native_fd, slice.data(), narrow_cast(slice.size()), &bytes_read, nullptr); + if (res) { + return static_cast(bytes_read); + } + return OS_ERROR(PSLICE() << "Read from [fd = " << native_fd << "] has failed"); + } + }; void BufferedStdinImplDeleter::operator()(BufferedStdinImpl *impl) { impl->close();