BufferedStdin: windows

GitOrigin-RevId: ff7393ea2d648de84f691043aa7780260af77f6d
This commit is contained in:
Arseny Smirnov 2018-09-10 19:21:34 +03:00
parent 982369c495
commit 3266984243
2 changed files with 143 additions and 38 deletions

View File

@ -46,4 +46,127 @@ FileFd &Stderr() {
);
return res;
}
#if TD_WINDOWS
namespace detail {
class BufferedStdinImpl {
public:
BufferedStdinImpl() {
file_fd_ = FileFd::from_native_fd(NativeFd(Stdin().get_native_fd().raw()));
file_fd_.get_native_fd().set_is_blocking(false);
read_thread_ = td::thread([this] { this->read_loop(); });
}
~BufferedStdinImpl() {
file_fd_.move_as_native_fd().release();
}
void close() {
close_flag_ = true;
}
ChainBufferReader &input_buffer() {
return reader_;
}
PollableFdInfo &get_poll_info() {
return file_fd_.get_poll_info();
}
const PollableFdInfo &get_poll_info() const {
return file_fd_.get_poll_info();
}
Result<size_t> flush_read(size_t max_read = std::numeric_limits<size_t>::max()) TD_WARN_UNUSED_RESULT {
reader_.sync_with_writer();
return reader_.size();
}
private:
FileFd file_fd_;
ChainBufferWriter writer_;
ChainBufferReader reader_ = writer_.extract_reader();
td::thread read_thread_;
std::atomic<bool> close_flag_{false};
void read_loop() {
while (!close_flag_) {
auto slice = writer_.prepare_append();
auto size = file_fd_.read(slice).move_as_ok();
writer_.confirm_append(size);
file_fd_.get_poll_info().add_flags_from_poll(td::PollFlags::Read());
}
//TODO delete
}
};
void BufferedStdinImplDeleter::operator()(BufferedStdinImpl *impl) {
impl->close();
}
} // namespace detail
#else
namespace detail {
class BufferedStdinImpl {
public:
BufferedStdinImpl() {
file_fd_ = FileFd::from_native_fd(NativeFd(Stdin().get_native_fd().raw()));
file_fd_.get_native_fd().set_is_blocking(false);
}
~BufferedStdinImpl() {
file_fd_.move_as_native_fd().release();
}
ChainBufferReader &input_buffer() {
return reader_;
}
PollableFdInfo &get_poll_info() {
return file_fd_.get_poll_info();
}
const PollableFdInfo &get_poll_info() const {
return file_fd_.get_poll_info();
}
Result<size_t> flush_read(size_t max_read = std::numeric_limits<size_t>::max()) TD_WARN_UNUSED_RESULT {
size_t result = 0;
while (::td::can_read(*this) && max_read) {
MutableSlice slice = writer_.prepare_append().truncate(max_read);
TRY_RESULT(x, file_fd_.read(slice));
slice.truncate(x);
writer_.confirm_append(x);
result += x;
max_read -= x;
}
if (result) {
reader_.sync_with_writer();
}
return result;
}
private:
FileFd file_fd_;
ChainBufferWriter writer_;
ChainBufferReader reader_ = writer_.extract_reader();
};
void BufferedStdinImplDeleter::operator()(BufferedStdinImpl *impl) {
delete impl;
}
} // namespace detail
#endif
BufferedStdin::BufferedStdin() : impl_(std::make_unique<detail::BufferedStdinImpl>().release()) {
}
BufferedStdin::BufferedStdin(BufferedStdin &&) = default;
BufferedStdin &BufferedStdin::operator=(BufferedStdin &&) = default;
BufferedStdin::~BufferedStdin() = default;
ChainBufferReader &BufferedStdin::input_buffer() {
return impl_->input_buffer();
}
PollableFdInfo &BufferedStdin::get_poll_info() {
return impl_->get_poll_info();
}
const PollableFdInfo &BufferedStdin::get_poll_info() const {
return impl_->get_poll_info();
}
Result<size_t> BufferedStdin::flush_read(size_t max_read) {
return impl_->flush_read(max_read);
}
} // namespace td

View File

@ -7,6 +7,7 @@
#pragma once
#include "td/utils/port/FileFd.h"
#include "td/utils/port/thread.h"
#include "td/utils/buffer.h"
namespace td {
@ -15,48 +16,29 @@ FileFd &Stdin();
FileFd &Stdout();
FileFd &Stderr();
namespace detail {
class BufferedStdinImpl;
class BufferedStdinImplDeleter {
public:
void operator()(BufferedStdinImpl *impl);
};
} // namespace detail
class BufferedStdin {
public:
BufferedStdin() {
file_fd_ = FileFd::from_native_fd(NativeFd(Stdin().get_native_fd().raw()));
file_fd_.get_native_fd().set_is_blocking(false);
}
~BufferedStdin() {
file_fd_.move_as_native_fd().release();
}
ChainBufferReader &input_buffer() {
return reader_;
}
PollableFdInfo &get_poll_info() {
return file_fd_.get_poll_info();
}
const PollableFdInfo &get_poll_info() const {
return file_fd_.get_poll_info();
}
Result<size_t> flush_read(size_t max_read = std::numeric_limits<size_t>::max()) TD_WARN_UNUSED_RESULT {
size_t result = 0;
while (::td::can_read(*this) && max_read) {
MutableSlice slice = writer_.prepare_append().truncate(max_read);
TRY_RESULT(x, file_fd_.read(slice));
slice.truncate(x);
writer_.confirm_append(x);
result += x;
max_read -= x;
}
if (result) {
reader_.sync_with_writer();
}
return result;
}
BufferedStdin();
BufferedStdin(const BufferedStdin &) = delete;
BufferedStdin &operator=(const BufferedStdin &) = delete;
BufferedStdin(BufferedStdin &&);
BufferedStdin &operator=(BufferedStdin &&);
~BufferedStdin();
ChainBufferReader &input_buffer();
PollableFdInfo &get_poll_info();
const PollableFdInfo &get_poll_info() const;
Result<size_t> flush_read(size_t max_read = std::numeric_limits<size_t>::max()) TD_WARN_UNUSED_RESULT;
private:
FileFd file_fd_;
ChainBufferWriter writer_;
ChainBufferReader reader_ = writer_.extract_reader();
std::unique_ptr<detail::BufferedStdinImpl, detail::BufferedStdinImplDeleter> impl_;
};
} // namespace td