BufferedStdin: simplify

GitOrigin-RevId: 8674573bea8246cf2e5c21bbdaae98712d8c7cc4
This commit is contained in:
Arseny Smirnov 2018-09-11 17:48:29 +03:00
parent 4df6f95818
commit 5cdc7c2c53

View File

@ -51,9 +51,7 @@ FileFd &Stderr() {
namespace detail { namespace detail {
class BufferedStdinImpl { class BufferedStdinImpl {
public: public:
BufferedStdinImpl() { BufferedStdinImpl() : info_(NativeFd(GetStdHandle(STD_INPUT_HANDLE), true)) {
file_fd_ = FileFd::from_native_fd(NativeFd(Stdin().get_native_fd().fd()));
copy_file_fd_ = FileFd::from_native_fd(NativeFd(Stdin().get_native_fd().fd()));
read_thread_ = td::thread([this] { this->read_loop(); }); read_thread_ = td::thread([this] { this->read_loop(); });
} }
BufferedStdinImpl(const BufferedStdinImpl &) = delete; BufferedStdinImpl(const BufferedStdinImpl &) = delete;
@ -61,8 +59,7 @@ class BufferedStdinImpl {
BufferedStdinImpl(BufferedStdinImpl &&) = delete; BufferedStdinImpl(BufferedStdinImpl &&) = delete;
BufferedStdinImpl &operator=(BufferedStdinImpl &&) = delete; BufferedStdinImpl &operator=(BufferedStdinImpl &&) = delete;
~BufferedStdinImpl() { ~BufferedStdinImpl() {
file_fd_.move_as_native_fd().release(); info_.move_as_native_fd().release();
copy_file_fd_.move_as_native_fd().release();
} }
void close() { void close() {
close_flag_ = true; close_flag_ = true;
@ -73,37 +70,51 @@ class BufferedStdinImpl {
} }
PollableFdInfo &get_poll_info() { PollableFdInfo &get_poll_info() {
return file_fd_.get_poll_info(); return info_;
} }
const PollableFdInfo &get_poll_info() const { const PollableFdInfo &get_poll_info() const {
return file_fd_.get_poll_info(); return info_;
} }
Result<size_t> flush_read(size_t max_read = std::numeric_limits<size_t>::max()) TD_WARN_UNUSED_RESULT { Result<size_t> flush_read(size_t max_read = std::numeric_limits<size_t>::max()) TD_WARN_UNUSED_RESULT {
file_fd_.get_poll_info().get_flags(); info_.get_flags();
file_fd_.get_poll_info().clear_flags(PollFlags::Read()); info_.clear_flags(PollFlags::Read());
reader_.sync_with_writer(); reader_.sync_with_writer();
return reader_.size(); return reader_.size();
} }
private: private:
FileFd file_fd_; PollableFdInfo info_;
FileFd copy_file_fd_;
ChainBufferWriter writer_; ChainBufferWriter writer_;
ChainBufferReader reader_ = writer_.extract_reader(); ChainBufferReader reader_ = writer_.extract_reader();
td::thread read_thread_; td::thread read_thread_;
std::atomic<bool> close_flag_{false}; std::atomic<bool> close_flag_{false};
void read_loop() { void read_loop() {
while (!close_flag_) { while (!close_flag_) {
auto slice = writer_.prepare_append(); auto slice = writer_.prepare_append();
auto size = copy_file_fd_.read(slice).move_as_ok(); auto r_size = read(slice);
writer_.confirm_append(size); if (r_size.is_error()) {
file_fd_.get_poll_info().add_flags_from_poll(td::PollFlags::Read()); LOG(ERROR) << "Stop read stdin loop: " << r_size.error();
break;
}
writer_.confirm_append(r_size.ok());
info_.add_flags_from_poll(td::PollFlags::Read());
} }
//TODO delete //TODO delete
} }
Result<size_t> read(MutableSlice slice) {
auto native_fd = info_.native_fd().io_handle();
DWORD bytes_read = 0;
auto res = ReadFile(native_fd, slice.data(), narrow_cast<DWORD>(slice.size()), &bytes_read, nullptr);
if (res) {
return static_cast<size_t>(bytes_read);
}
return OS_ERROR(PSLICE() << "Read from [fd = " << native_fd << "] has failed");
}
}; };
void BufferedStdinImplDeleter::operator()(BufferedStdinImpl *impl) { void BufferedStdinImplDeleter::operator()(BufferedStdinImpl *impl) {
impl->close(); impl->close();