// // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #include "td/utils/port/StdStreams.h" #include "td/utils/logging.h" #include "td/utils/misc.h" #include "td/utils/port/detail/NativeFd.h" #include "td/utils/port/PollFlags.h" #include "td/utils/port/thread.h" #include "td/utils/Slice.h" #include namespace td { #if TD_PORT_POSIX template static FileFd &get_file_fd() { static FileFd result = FileFd::from_native_fd(NativeFd(id, true)); return result; } FileFd &Stdin() { return get_file_fd<0>(); } FileFd &Stdout() { return get_file_fd<1>(); } FileFd &Stderr() { return get_file_fd<2>(); } #elif TD_PORT_WINDOWS template static FileFd &get_file_fd() { #if WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP | WINAPI_PARTITION_SYSTEM) static auto handle = GetStdHandle(id); LOG_IF(FATAL, handle == INVALID_HANDLE_VALUE) << "Failed to GetStdHandle " << id; static FileFd result = FileFd::from_native_fd(NativeFd(handle, true)); #else static FileFd result; #endif return result; } FileFd &Stdin() { return get_file_fd(); } FileFd &Stdout() { return get_file_fd(); } FileFd &Stderr() { return get_file_fd(); } #endif #if TD_PORT_WINDOWS namespace detail { class BufferedStdinImpl { public: #if WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP | WINAPI_PARTITION_SYSTEM) BufferedStdinImpl() : info_(NativeFd(GetStdHandle(STD_INPUT_HANDLE), true)) { read_thread_ = td::thread([this] { this->read_loop(); }); } #else BufferedStdinImpl() { close(); } #endif BufferedStdinImpl(const BufferedStdinImpl &) = delete; BufferedStdinImpl &operator=(const BufferedStdinImpl &) = delete; BufferedStdinImpl(BufferedStdinImpl &&) = delete; BufferedStdinImpl &operator=(BufferedStdinImpl &&) = delete; ~BufferedStdinImpl() { info_.move_as_native_fd().release(); } void close() { close_flag_ = true; } ChainBufferReader &input_buffer() { return reader_; } PollableFdInfo &get_poll_info() { return info_; } const PollableFdInfo &get_poll_info() const { return info_; } Result flush_read(size_t max_read = std::numeric_limits::max()) TD_WARN_UNUSED_RESULT { info_.get_flags(); info_.clear_flags(PollFlags::Read()); reader_.sync_with_writer(); return reader_.size(); } private: PollableFdInfo info_; ChainBufferWriter writer_; ChainBufferReader reader_ = writer_.extract_reader(); td::thread read_thread_; std::atomic close_flag_{false}; void read_loop() { while (!close_flag_) { auto slice = writer_.prepare_append(); auto r_size = read(slice); if (r_size.is_error()) { LOG(ERROR) << "Stop read stdin loop: " << r_size.error(); break; } writer_.confirm_append(r_size.ok()); info_.add_flags_from_poll(td::PollFlags::Read()); } //TODO delete } Result read(MutableSlice slice) { auto native_fd = info_.native_fd().fd(); DWORD bytes_read = 0; auto res = ReadFile(native_fd, slice.data(), narrow_cast(slice.size()), &bytes_read, nullptr); if (res) { return static_cast(bytes_read); } return OS_ERROR(PSLICE() << "Read from [fd = " << native_fd << "] has failed"); } }; void BufferedStdinImplDeleter::operator()(BufferedStdinImpl *impl) { impl->close(); } } // namespace detail #elif TD_PORT_POSIX namespace detail { class BufferedStdinImpl { public: BufferedStdinImpl() { file_fd_ = FileFd::from_native_fd(NativeFd(Stdin().get_native_fd().fd())); file_fd_.get_native_fd().set_is_blocking(false); } BufferedStdinImpl(const BufferedStdinImpl &) = delete; BufferedStdinImpl &operator=(const BufferedStdinImpl &) = delete; BufferedStdinImpl(BufferedStdinImpl &&) = delete; BufferedStdinImpl &operator=(BufferedStdinImpl &&) = delete; ~BufferedStdinImpl() { file_fd_.move_as_native_fd().release(); } ChainBufferReader &input_buffer() { return reader_; } PollableFdInfo &get_poll_info() { return file_fd_.get_poll_info(); } const PollableFdInfo &get_poll_info() const { return file_fd_.get_poll_info(); } Result flush_read(size_t max_read = std::numeric_limits::max()) TD_WARN_UNUSED_RESULT { size_t result = 0; while (::td::can_read(*this) && max_read) { MutableSlice slice = writer_.prepare_append().truncate(max_read); TRY_RESULT(x, file_fd_.read(slice)); slice.truncate(x); writer_.confirm_append(x); result += x; max_read -= x; } if (result) { reader_.sync_with_writer(); } return result; } private: FileFd file_fd_; ChainBufferWriter writer_; ChainBufferReader reader_ = writer_.extract_reader(); }; void BufferedStdinImplDeleter::operator()(BufferedStdinImpl *impl) { delete impl; } } // namespace detail #endif BufferedStdin::BufferedStdin() : impl_(make_unique().release()) { } BufferedStdin::BufferedStdin(BufferedStdin &&) = default; BufferedStdin &BufferedStdin::operator=(BufferedStdin &&) = default; BufferedStdin::~BufferedStdin() = default; ChainBufferReader &BufferedStdin::input_buffer() { return impl_->input_buffer(); } PollableFdInfo &BufferedStdin::get_poll_info() { return impl_->get_poll_info(); } const PollableFdInfo &BufferedStdin::get_poll_info() const { return impl_->get_poll_info(); } Result BufferedStdin::flush_read(size_t max_read) { return impl_->flush_read(max_read); } } // namespace td