diff --git a/tdutils/td/utils/port/Fd.cpp b/tdutils/td/utils/port/Fd.cpp index 9c60c6d9..89e29a9f 100644 --- a/tdutils/td/utils/port/Fd.cpp +++ b/tdutils/td/utils/port/Fd.cpp @@ -4,167 +4,12 @@ // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // -#include "td/utils/port/Fd.h" #if 0 -#include "td/utils/common.h" -#include "td/utils/format.h" -#include "td/utils/logging.h" -#include "td/utils/misc.h" -#include "td/utils/Observer.h" - -#if TD_PORT_POSIX - -#include - -#include -#include -#include -#include - -#endif - -#if TD_PORT_WINDOWS - -#include "td/utils/buffer.h" -#include "td/utils/misc.h" - -#include - -#endif - namespace td { #if TD_PORT_POSIX -Fd::InfoSet::InfoSet() { - get_info(0).refcnt = 1; - get_info(1).refcnt = 1; - get_info(2).refcnt = 1; -} -Fd::Info &Fd::InfoSet::get_info(int32 id) { - CHECK(0 <= id && id < InfoSet::MAX_FD) << tag("fd", id); - return fd_array_[id]; -} -Fd::InfoSet Fd::fd_info_set_; - -bool Fd::FlagsSet::write_flags(Flags flags) { - if (!flags) { - return false; - } - auto old_flags = to_write_.fetch_or(flags, std::memory_order_relaxed); - return (flags & ~old_flags) != 0; -} -bool Fd::FlagsSet::write_flags_local(Flags flags) { - auto old_flags = flags_; - flags_ |= flags; - return flags_ != old_flags; -} -bool Fd::FlagsSet::flush() const { - if (to_write_.load(std::memory_order_relaxed) == 0) { - return false; - } - Flags to_write = to_write_.exchange(0, std::memory_order_relaxed); - auto old_flags = flags_; - flags_ |= to_write; - if (flags_ & Close) { - flags_ &= ~Write; - } - return flags_ != old_flags; -} -Fd::Flags Fd::FlagsSet::read_flags() const { - flush(); - return flags_; -} -Fd::Flags Fd::FlagsSet::read_flags_local() const { - return flags_; -} -void Fd::FlagsSet::clear_flags(Flags flags) { - flags_ &= ~flags; -} -void Fd::FlagsSet::clear() { - to_write_ = 0; - flags_ = 0; -} - -// TODO(bug) if constuctor call tries to output something to the LOG it will fail, because log is not initialized -Fd Fd::stderr_(2, Mode::Reference); -Fd Fd::stdout_(1, Mode::Reference); -Fd Fd::stdin_(0, Mode::Reference); - -Fd::Fd() = default; - -Fd::Fd(int fd, Mode mode) : mode_(mode), fd_(fd) { - auto *info = get_info(); - int old_ref_cnt = info->refcnt.load(std::memory_order_relaxed); - if (old_ref_cnt == 0) { - old_ref_cnt = info->refcnt.load(std::memory_order_acquire); - CHECK(old_ref_cnt == 0); - CHECK(mode_ == Mode::Owner) << tag("fd", fd_); - VLOG(fd) << "FD created [fd:" << fd_ << "]"; - - auto fcntl_res = fcntl(fd_, F_GETFD); - auto fcntl_errno = errno; - LOG_IF(FATAL, fcntl_res == -1) << Status::PosixError(fcntl_errno, "fcntl F_GET_FD failed"); - - info->refcnt.store(1, std::memory_order_relaxed); - CHECK(mode_ != Mode::Reference); - CHECK(info->observer == nullptr); - info->flags.clear(); - info->observer = nullptr; - } else { - CHECK(mode_ == Mode::Reference) << tag("fd", fd_); - auto fcntl_res = fcntl(fd_, F_GETFD); - auto fcntl_errno = errno; - LOG_IF(FATAL, fcntl_res == -1) << Status::PosixError(fcntl_errno, "fcntl F_GET_FD failed"); - - CHECK(mode_ == Mode::Reference); - info->refcnt.fetch_add(1, std::memory_order_relaxed); - } -} - -int Fd::move_as_native_fd() { - clear_info(); - auto res = fd_; - fd_ = -1; - return res; -} - -Fd::~Fd() { - close(); -} - -Fd::Fd(Fd &&other) { - fd_ = other.fd_; - mode_ = other.mode_; - other.fd_ = -1; -} - -Fd &Fd::operator=(Fd &&other) { - if (this != &other) { - close(); - - fd_ = other.fd_; - mode_ = other.mode_; - other.fd_ = -1; - } - return *this; -} - -Fd Fd::clone() const { - return Fd(fd_, Mode::Reference); -} - -Fd &Fd::Stderr() { - return stderr_; -} -Fd &Fd::Stdout() { - return stdout_; -} -Fd &Fd::Stdin() { - return stdin_; -} - Status Fd::duplicate(const Fd &from, Fd &to) { CHECK(!from.empty()); CHECK(!to.empty()); @@ -174,246 +19,6 @@ Status Fd::duplicate(const Fd &from, Fd &to) { return Status::OK(); } -bool Fd::empty() const { - return fd_ == -1; -} - -const Fd &Fd::get_fd() const { - return *this; -} - -Fd &Fd::get_fd() { - return *this; -} - -int Fd::get_native_fd() const { - CHECK(!empty()); - return fd_; -} - -void Fd::set_observer(ObserverBase *observer) { - auto *info = get_info(); - CHECK(observer == nullptr || info->observer == nullptr); - info->observer = observer; -} - -ObserverBase *Fd::get_observer() const { - auto *info = get_info(); - return info->observer; -} - -void Fd::close_ref() { - CHECK(mode_ == Mode::Reference); - auto *info = get_info(); - - int old_ref_cnt = info->refcnt.fetch_sub(1, std::memory_order_relaxed); - CHECK(old_ref_cnt > 1) << tag("fd", fd_); - fd_ = -1; -} - -void Fd::close_own() { - CHECK(mode_ == Mode::Owner); - VLOG(fd) << "FD closed [fd:" << fd_ << "]"; - - clear_info(); - ::close(fd_); - fd_ = -1; -} - -void Fd::close() { - if (!empty()) { - switch (mode_) { - case Mode::Reference: - close_ref(); - break; - case Mode::Owner: - close_own(); - break; - } - } -} - -Fd::Info *Fd::get_info() { - CHECK(!empty()); - return &fd_info_set_.get_info(fd_); -} - -const Fd::Info *Fd::get_info() const { - CHECK(!empty()); - return &fd_info_set_.get_info(fd_); -} - -void Fd::clear_info() { - CHECK(!empty()); - CHECK(mode_ != Mode::Reference); - - auto *info = get_info(); - int old_ref_cnt = info->refcnt.load(std::memory_order_relaxed); - CHECK(old_ref_cnt == 1) << old_ref_cnt; - info->flags.clear(); - info->observer = nullptr; - info->refcnt.store(0, std::memory_order_release); -} - -void Fd::after_notify() { - get_info()->flags.flush(); -} - -void Fd::update_flags_notify(Flags new_flags) { - auto *info = get_info(); - auto &flags = info->flags; - if (!flags.write_flags(new_flags)) { - return; - } - VLOG(fd) << "Add flags " << tag("fd", fd_) << tag("to", format::as_binary(new_flags)); - auto observer = info->observer; - if (observer == nullptr) { - return; - } - - observer->notify(); -} - -void Fd::update_flags(Flags flags) { - get_info()->flags.write_flags_local(flags); -} - -Fd::Flags Fd::get_flags() const { - return get_info()->flags.read_flags(); -} -Fd::Flags Fd::get_flags_local() const { - return get_info()->flags.read_flags_local(); -} - -void Fd::clear_flags(Flags flags) { - get_info()->flags.clear_flags(flags); -} - -bool Fd::has_pending_error() const { - return (get_flags() & Fd::Flag::Error) != 0; -} -bool Fd::has_pending_error_local() const { - return (get_flags_local() & Fd::Flag::Error) != 0; -} - -Status Fd::get_pending_error() { - if (!has_pending_error()) { - return Status::OK(); - } - int error = 0; - socklen_t errlen = sizeof(error); - if (getsockopt(fd_, SOL_SOCKET, SO_ERROR, static_cast(&error), &errlen) == 0) { - if (error == 0) { - clear_flags(Fd::Error); - return Status::OK(); - } - return Status::PosixError(error, PSLICE() << "Error on socket [fd_ = " << fd_ << "]"); - } - auto status = OS_SOCKET_ERROR(PSLICE() << "Can't load error on socket [fd_ = " << fd_ << "]"); - LOG(INFO) << "Can't load pending socket error: " << status; - return status; -} - -Result Fd::write_unsafe(Slice slice) { - int native_fd = get_native_fd(); - auto write_res = detail::skip_eintr([&] { return ::write(native_fd, slice.begin(), slice.size()); }); - auto write_errno = errno; - if (write_res >= 0) { - return narrow_cast(write_res); - } - return Status::PosixError(write_errno, PSLICE() << "Write to fd " << native_fd << " has failed"); -} - -Result Fd::write(Slice slice) { - int native_fd = get_native_fd(); - auto write_res = detail::skip_eintr([&] { return ::write(native_fd, slice.begin(), slice.size()); }); - auto write_errno = errno; - if (write_res >= 0) { - return narrow_cast(write_res); - } - - if (write_errno == EAGAIN -#if EAGAIN != EWOULDBLOCK - || write_errno == EWOULDBLOCK -#endif - ) { - clear_flags(Write); - return 0; - } - - auto error = Status::PosixError(write_errno, PSLICE() << "Write to fd " << native_fd << " has failed"); - switch (write_errno) { - case EBADF: - case ENXIO: - case EFAULT: - case EINVAL: - LOG(FATAL) << error; - UNREACHABLE(); - default: - LOG(WARNING) << error; - // fallthrough - case ECONNRESET: - case EDQUOT: - case EFBIG: - case EIO: - case ENETDOWN: - case ENETUNREACH: - case ENOSPC: - case EPIPE: - clear_flags(Write); - update_flags(Close); - return std::move(error); - } -} - -Result Fd::read(MutableSlice slice) { - if (has_pending_error()) { - return get_pending_error(); - } - int native_fd = get_native_fd(); - CHECK(slice.size() > 0); - auto read_res = detail::skip_eintr([&] { return ::read(native_fd, slice.begin(), slice.size()); }); - auto read_errno = errno; - if (read_res >= 0) { - if (read_res == 0) { - errno = 0; - clear_flags(Read); - update_flags(Close); - } - return narrow_cast(read_res); - } - if (read_errno == EAGAIN -#if EAGAIN != EWOULDBLOCK - || read_errno == EWOULDBLOCK -#endif - ) { - clear_flags(Read); - return 0; - } - auto error = Status::PosixError(read_errno, PSLICE() << "Read from fd " << native_fd << " has failed"); - switch (read_errno) { - case EISDIR: - case EBADF: - case ENXIO: - case EFAULT: - case EINVAL: - LOG(FATAL) << error; - UNREACHABLE(); - default: - LOG(WARNING) << error; - // fallthrough - case ENOTCONN: - case EIO: - case ENOBUFS: - case ENOMEM: - case ECONNRESET: - case ETIMEDOUT: - clear_flags(Read); - update_flags(Close); - return std::move(error); - } -} - Status Fd::set_is_blocking(bool is_blocking) { auto old_flags = fcntl(fd_, F_GETFL); if (old_flags == -1) { @@ -431,612 +36,6 @@ Status Fd::set_is_blocking(bool is_blocking) { #if TD_PORT_WINDOWS -class Fd::FdImpl { - public: - FdImpl(Fd::Type type, HANDLE handle) - : type_(type), handle_(handle), async_mode_(type_ == Fd::Type::EventFd || type_ == Fd::Type::StdinFileFd) { - init(); - } - FdImpl(Fd::Type type, SOCKET socket, int socket_family) - : type_(type), socket_(socket), socket_family_(socket_family), async_mode_(true) { - init(); - } - - FdImpl(const FdImpl &) = delete; - FdImpl &operator=(const FdImpl &) = delete; - FdImpl(FdImpl &&) = delete; - FdImpl &operator=(FdImpl &&) = delete; - - ~FdImpl() { - close(); - } - void set_observer(ObserverBase *observer) { - observer_ = observer; - } - ObserverBase *get_observer() const { - return observer_; - } - - void update_flags_notify(Fd::Flags flags) { - update_flags_inner(flags, true); - } - - void update_flags(Fd::Flags flags) { - update_flags_inner(flags, false); - } - - void update_flags_inner(int32 new_flags, bool notify_flag) { - if (!flags_.write_flags_local(new_flags)) { - return; - } - VLOG(fd) << "Add flags " << tag("fd", get_io_handle()) << tag("to", format::as_binary(new_flags)); - auto observer = observer_; - if (!notify_flag || observer == nullptr) { - return; - } - observer->notify(); - } - - int32 get_flags() const { - return flags_.read_flags_local(); - } - - void clear_flags(Fd::Flags mask) { - flags_.clear_flags(mask); - } - - Status get_pending_error() { - if (!has_pending_error()) { - return Status::OK(); - } - clear_flags(Fd::Error); - return std::move(pending_error_); - } - bool has_pending_error() const { - return (get_flags() & Fd::Flag::Error) != 0; - } - - HANDLE get_read_event() { - if (type() == Fd::Type::StdinFileFd) { - return get_io_handle(); - } - return read_event_; - } - void on_read_event() { - if (type_ == Fd::Type::StdinFileFd) { - return try_read_stdin(); - } - ResetEvent(read_event_); - if (type_ == Fd::Type::EventFd) { - return update_flags_notify(Fd::Flag::Read); - } - if (type_ == Fd::Type::SocketFd && !connected_) { - on_connect_ready(); - } else { - if (!async_read_flag_) { - return; - } - - if (type_ == Fd::Type::ServerSocketFd) { - on_accept_ready(); - } else { - on_read_ready(); - } - } - loop(); - } - HANDLE get_write_event() { - return write_event_; - } - void on_write_event() { - CHECK(async_write_flag_); - ResetEvent(write_event_); - on_write_ready(); - loop(); - } - - SOCKET get_native_socket() const { - return socket_; - } - - HANDLE get_io_handle() const { - CHECK(!empty()); - if (type() == Fd::Type::FileFd || type() == Fd::Type::StdinFileFd) { - return handle_; - } - return reinterpret_cast(socket_); - } - - Result write(Slice slice) TD_WARN_UNUSED_RESULT { - if (async_mode_) { - return write_async(slice); - } else { - return write_sync(slice); - } - } - - Result read(MutableSlice slice) TD_WARN_UNUSED_RESULT { - if (async_mode_) { - return read_async(slice); - } else { - return read_sync(slice); - } - } - - Result write_async(Slice slice) TD_WARN_UNUSED_RESULT { - CHECK(async_mode_); - output_writer_.append(slice); - output_reader_.sync_with_writer(); - loop(); - return slice.size(); - } - Result write_sync(Slice slice) TD_WARN_UNUSED_RESULT { - CHECK(!async_mode_); - DWORD bytes_written = 0; - auto res = WriteFile(get_io_handle(), slice.data(), narrow_cast(slice.size()), &bytes_written, nullptr); - if (!res) { - return OS_ERROR("Failed to write_sync"); - } - return bytes_written; - } - Result read_async(MutableSlice slice) TD_WARN_UNUSED_RESULT { - CHECK(async_mode_); - auto res = input_reader_.advance(min(slice.size(), input_reader_.size()), slice); - if (res == 0) { - clear_flags(Fd::Flag::Read); - } - return res; - } - Result read_sync(MutableSlice slice) TD_WARN_UNUSED_RESULT { - CHECK(!async_mode_); - DWORD bytes_read = 0; - auto res = ReadFile(get_io_handle(), slice.data(), narrow_cast(slice.size()), &bytes_read, nullptr); - if (!res) { - return OS_ERROR("Failed to read_sync"); - } - if (bytes_read == 0) { - clear_flags(Fd::Flag::Read); - } - return bytes_read; - } - - // for ServerSocket - Result accept() { - if (accepted_.empty()) { - clear_flags(Fd::Flag::Read); - return Status::Error(-1, "Operation would block"); - } - auto res = std::move(accepted_.back()); - accepted_.pop_back(); - return std::move(res); - } - - void connect(const IPAddress &addr) { - CHECK(!connected_); - CHECK(type_ == Fd::Type::SocketFd); - DWORD bytes_read; - std::memset(&read_overlapped_, 0, sizeof(read_overlapped_)); - read_overlapped_.hEvent = read_event_; - LPFN_CONNECTEX ConnectExPtr = nullptr; - GUID guid = WSAID_CONNECTEX; - DWORD numBytes; - int error = ::WSAIoctl(socket_, SIO_GET_EXTENSION_FUNCTION_POINTER, static_cast(&guid), sizeof(guid), - static_cast(&ConnectExPtr), sizeof(ConnectExPtr), &numBytes, nullptr, nullptr); - if (error) { - return on_error(OS_SOCKET_ERROR("WSAIoctl failed"), Fd::Flag::Read); - } - auto status = ConnectExPtr(socket_, addr.get_sockaddr(), narrow_cast(addr.get_sockaddr_len()), nullptr, 0, - &bytes_read, &read_overlapped_); - if (status != 0) { - ResetEvent(read_event_); - connected_ = true; - update_flags_notify(Fd::Flag::Read); - return; - } - - auto last_error = GetLastError(); - if (last_error == ERROR_IO_PENDING) { - return; - } - on_error(OS_SOCKET_ERROR("Failed to connect"), Fd::Flag::Read); - } - - // for EventFd - void release() { - CHECK(type_ == Fd::Type::EventFd); - SetEvent(read_event_); - } - - void acquire() { - CHECK(type_ == Fd::Type::EventFd); - ResetEvent(read_event_); - clear_flags(Fd::Flag::Read); - } - - // TODO: interface for BufferedFd optimization. - - bool empty() const { - return type() == Fd::Type::Empty; - } - void close() { - if (empty()) { - return; - } - switch (type()) { - case Fd::Type::StdinFileFd: - case Fd::Type::FileFd: { - if (!CloseHandle(handle_)) { - auto error = OS_ERROR("Failed to close file"); - LOG(ERROR) << error; - } - handle_ = INVALID_HANDLE_VALUE; - break; - } - case Fd::Type::ServerSocketFd: - case Fd::Type::SocketFd: { - if (closesocket(socket_) != 0) { - auto error = OS_SOCKET_ERROR("Failed to close socket"); - LOG(ERROR) << error; - } - socket_ = INVALID_SOCKET; - break; - } - case Fd::Type::EventFd: - break; - default: - UNREACHABLE(); - } - - if (read_event_ != INVALID_HANDLE_VALUE) { - if (!CloseHandle(read_event_)) { - auto error = OS_ERROR("Failed to close event"); - LOG(ERROR) << error; - } - read_event_ = INVALID_HANDLE_VALUE; - } - if (write_event_ != INVALID_HANDLE_VALUE) { - if (!CloseHandle(write_event_)) { - auto error = OS_ERROR("Failed to close event"); - LOG(ERROR) << error; - } - write_event_ = INVALID_HANDLE_VALUE; - } - - type_ = Fd::Type::Empty; - } - - private: - Fd::Type type_; - HANDLE handle_ = INVALID_HANDLE_VALUE; - SOCKET socket_ = INVALID_SOCKET; - - int socket_family_ = 0; - - bool async_mode_ = false; - - ObserverBase *observer_ = nullptr; - Fd::FlagsSet flags_; - Status pending_error_; - Fd::Flags internal_flags_ = Fd::Flag::Write | Fd::Flag::Read; - - HANDLE read_event_ = INVALID_HANDLE_VALUE; // used by WineventPoll - bool async_read_flag_ = false; // do we have pending read? - OVERLAPPED read_overlapped_; - ChainBufferWriter input_writer_; - ChainBufferReader input_reader_ = input_writer_.extract_reader(); - - bool connected_ = false; - - std::vector accepted_; - SOCKET accept_socket_ = INVALID_SOCKET; - static constexpr size_t MAX_ADDR_SIZE = sizeof(sockaddr_in6) + 16; - char addr_buf_[MAX_ADDR_SIZE * 2]; - - HANDLE write_event_ = INVALID_HANDLE_VALUE; // used by WineventPoll - bool async_write_flag_ = false; // do we have pending write? - OVERLAPPED write_overlapped_; - ChainBufferWriter output_writer_; - ChainBufferReader output_reader_ = output_writer_.extract_reader(); - - void init() { - flags_.write_flags_local(Fd::Write); - if (async_mode_) { - if (type_ != Fd::Type::EventFd) { - write_event_ = CreateEventW(nullptr, true, false, nullptr); - } - read_event_ = CreateEventW(nullptr, true, false, nullptr); - loop(); - } - } - - Fd::Type type() const { - return type_; - } - - void on_error(Status error, Fd::Flag flag) { - VLOG(fd) << tag("fd", get_io_handle()) << error; - pending_error_ = std::move(error); - internal_flags_ &= ~flag; - update_flags_notify(Fd::Flag::Error); - } - void on_eof() { - internal_flags_ &= ~Fd::Flag::Read; - update_flags_notify(Fd::Flag::Close); - } - - void on_read_ready() { - async_read_flag_ = false; - DWORD bytes_read; - auto status = GetOverlappedResult(get_io_handle(), &read_overlapped_, &bytes_read, false); - if (status == 0) { - return on_error(OS_ERROR("Failed to read from file"), Fd::Flag::Read); - } - - VLOG(fd) << "Read " << tag("fd", get_io_handle()) << tag("size", bytes_read); - if (bytes_read == 0) { // eof - return on_eof(); - } - input_writer_.confirm_append(bytes_read); - input_reader_.sync_with_writer(); - update_flags_notify(Fd::Flag::Read); - } - void on_write_ready() { - async_write_flag_ = false; - DWORD bytes_written; - auto status = GetOverlappedResult(get_io_handle(), &write_overlapped_, &bytes_written, false); - if (status == 0) { - return on_error(OS_ERROR("Failed to write to file"), Fd::Flag::Write); - } - if (bytes_written != 0) { - VLOG(fd) << "Write " << tag("fd", get_io_handle()) << tag("size", bytes_written); - output_reader_.confirm_read(bytes_written); - update_flags_notify(Fd::Flag::Write); - } - } - - void on_accept_ready() { - async_read_flag_ = false; - DWORD bytes_read; - auto status = GetOverlappedResult(get_io_handle(), &read_overlapped_, &bytes_read, false); - if (status == 0) { - return on_error(OS_ERROR("Failed to accept connection"), Fd::Flag::Write); - } - accepted_.push_back(Fd::create_socket_fd(accept_socket_)); - accept_socket_ = INVALID_SOCKET; - update_flags_notify(Fd::Flag::Read); - } - - void on_connect_ready() { - async_read_flag_ = false; - DWORD bytes_read; - VLOG(fd) << "on_connect_ready"; - auto status = GetOverlappedResult(get_io_handle(), &read_overlapped_, &bytes_read, false); - if (status == 0) { - return on_error(OS_ERROR("Failed to connect"), Fd::Flag::Write); - } - connected_ = true; - VLOG(fd) << "connected = true"; - } - - void try_read_stdin() { - } - void try_start_read() { - auto dest = input_writer_.prepare_append(); - DWORD bytes_read; - std::memset(&read_overlapped_, 0, sizeof(read_overlapped_)); - read_overlapped_.hEvent = read_event_; - VLOG(fd) << "try_read.."; - auto status = - ReadFile(get_io_handle(), dest.data(), narrow_cast(dest.size()), &bytes_read, &read_overlapped_); - if (status != 0) { // ok - ResetEvent(read_event_); - VLOG(fd) << "Read " << tag("fd", get_io_handle()) << tag("size", bytes_read); - if (bytes_read == 0) { // eof - return on_eof(); - } - input_writer_.confirm_append(bytes_read); - input_reader_.sync_with_writer(); - update_flags_notify(Fd::Flag::Read); - return; - } - auto last_error = GetLastError(); - if (last_error == ERROR_IO_PENDING) { - async_read_flag_ = true; - return; - } - on_error(OS_ERROR("Failed to read from file"), Fd::Flag::Read); - } - - void try_start_write() { - auto dest = output_reader_.prepare_read(); - DWORD bytes_written; - std::memset(&write_overlapped_, 0, sizeof(write_overlapped_)); - write_overlapped_.hEvent = write_event_; - VLOG(fd) << "try_start_write"; - auto status = - WriteFile(get_io_handle(), dest.data(), narrow_cast(dest.size()), &bytes_written, &write_overlapped_); - if (status != 0) { // ok - VLOG(fd) << "Write " << tag("fd", get_io_handle()) << tag("size", bytes_written); - ResetEvent(write_event_); - output_reader_.confirm_read(bytes_written); - update_flags_notify(Fd::Flag::Write); - return; - } - auto last_error = GetLastError(); - if (last_error == ERROR_IO_PENDING) { - VLOG(fd) << "try_start_write: ERROR_IO_PENDING"; - async_write_flag_ = true; - return; - } - CHECK(WaitForSingleObject(write_event_, 0) != WAIT_OBJECT_0); - on_error(OS_ERROR("Failed to write to file"), Fd::Flag::Write); - } - - void try_start_accept() { - if (async_read_flag_ == true) { - return; - } - accept_socket_ = socket(socket_family_, SOCK_STREAM, 0); - DWORD bytes_read; - std::memset(&read_overlapped_, 0, sizeof(read_overlapped_)); - read_overlapped_.hEvent = read_event_; - auto status = - AcceptEx(socket_, accept_socket_, addr_buf_, 0, MAX_ADDR_SIZE, MAX_ADDR_SIZE, &bytes_read, &read_overlapped_); - if (status != 0) { - ResetEvent(read_event_); - accepted_.push_back(Fd::create_socket_fd(accept_socket_)); - accept_socket_ = INVALID_SOCKET; - update_flags_notify(Fd::Flag::Read); - return; - } - - auto last_error = GetLastError(); - if (last_error == ERROR_IO_PENDING) { - async_read_flag_ = true; - return; - } - on_error(OS_SOCKET_ERROR("Failed to accept connection"), Fd::Flag::Read); - } - - void loop() { - CHECK(async_mode_); - - if (type_ == Fd::Type::EventFd) { - return; - } - if (type_ == Fd::Type::ServerSocketFd) { - while (async_read_flag_ == false && (internal_flags_ & Fd::Flag::Read) != 0) { - // read always - try_start_accept(); - } - return; - } - if (!connected_) { - return; - } - while (async_read_flag_ == false && (internal_flags_ & Fd::Flag::Read) != 0) { - // read always - try_start_read(); - } - VLOG(fd) << (async_write_flag_ == false) << " " << output_reader_.size() << " " - << ((internal_flags_ & Fd::Flag::Write) != 0); - while (async_write_flag_ == false && output_reader_.size() && (internal_flags_ & Fd::Flag::Write) != 0) { - // write if we have data to write - try_start_write(); - } - } -}; - -Fd::Fd() = default; - -Fd::Fd(Fd &&other) = default; - -Fd &Fd::operator=(Fd &&other) = default; - -Fd::~Fd() = default; - -Fd Fd::create_file_fd(HANDLE handle) { - return Fd(Fd::Type::FileFd, Fd::Mode::Owner, handle); -} - -Fd Fd::create_socket_fd(SOCKET sock) { - return Fd(Fd::Type::SocketFd, Fd::Mode::Owner, sock, AF_UNSPEC); -} - -Fd Fd::create_server_socket_fd(SOCKET sock, int socket_family) { - return Fd(Fd::Type::ServerSocketFd, Fd::Mode::Owner, sock, socket_family); -} - -Fd Fd::create_event_fd() { - return Fd(Fd::Type::EventFd, Fd::Mode::Owner, INVALID_HANDLE_VALUE); -} - -const Fd &Fd::get_fd() const { - return *this; -} - -Fd &Fd::get_fd() { - return *this; -} - -Result Fd::read(MutableSlice slice) { - if (has_pending_error()) { - return get_pending_error(); - } - return impl_->read(slice); -} - -Result Fd::write(Slice slice) { - CHECK(!empty()); - return impl_->write(slice); -} - -bool Fd::empty() const { - return !impl_; -} - -void Fd::close() { - impl_.reset(); -} - -Result Fd::accept() { - return impl_->accept(); -} -void Fd::connect(const IPAddress &addr) { - return impl_->connect(addr); -} - -Fd Fd::clone() const { - return Fd(impl_); -} - -uint64 Fd::get_key() const { - return reinterpret_cast(impl_.get()); -} - -void Fd::set_observer(ObserverBase *observer) { - return impl_->set_observer(observer); -} -ObserverBase *Fd::get_observer() const { - return impl_->get_observer(); -} - -Fd::Flags Fd::get_flags() const { - return impl_->get_flags(); -} -void Fd::update_flags(Flags flags) { - impl_->update_flags(flags); -} - -void Fd::on_read_event() { - impl_->on_read_event(); -} -void Fd::on_write_event() { - impl_->on_write_event(); -} - -bool Fd::has_pending_error() const { - return impl_->has_pending_error(); -} -Status Fd::get_pending_error() { - return impl_->get_pending_error(); -} - -HANDLE Fd::get_read_event() { - return impl_->get_read_event(); -} -HANDLE Fd::get_write_event() { - return impl_->get_write_event(); -} - -SOCKET Fd::get_native_socket() const { - return impl_->get_native_socket(); -} - -HANDLE Fd::get_io_handle() const { - return impl_->get_io_handle(); -} - #if WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP | WINAPI_PARTITION_SYSTEM) Fd &Fd::Stderr() { static auto handle = GetStdHandle(STD_ERROR_HANDLE); @@ -1078,39 +77,6 @@ Status Fd::duplicate(const Fd &from, Fd &to) { return Status::Error("Not supported"); } -Fd::Fd(Type type, Mode mode, HANDLE handle) : mode_(mode), impl_(std::make_shared(type, handle)) { -} - -Fd::Fd(Type type, Mode mode, SOCKET sock, int socket_family) - : mode_(mode), impl_(std::make_shared(type, sock, socket_family)) { -} - -Fd::Fd(std::shared_ptr impl) : mode_(Mode::Reference), impl_(std::move(impl)) { -} - -void Fd::acquire() { - return impl_->acquire(); -} - -void Fd::release() { - return impl_->release(); -} - -class InitWSA { - public: - InitWSA() { - /* Use the MAKEWORD(lowbyte, highbyte) macro declared in Windef.h */ - WORD wVersionRequested = MAKEWORD(2, 2); - WSADATA wsaData; - if (WSAStartup(wVersionRequested, &wsaData) != 0) { - auto error = OS_SOCKET_ERROR("Failed to init WSA"); - LOG(FATAL) << error; - } - } -}; - -static InitWSA init_wsa; - #endif } // namespace td diff --git a/tdutils/td/utils/port/Fd.h b/tdutils/td/utils/port/Fd.h index c1e6486c..6700fa09 100644 --- a/tdutils/td/utils/port/Fd.h +++ b/tdutils/td/utils/port/Fd.h @@ -10,204 +10,14 @@ #include "td/utils/port/config.h" #include "td/utils/common.h" -#include "td/utils/Slice.h" #include "td/utils/Status.h" -#include "td/utils/port/IPAddress.h" - -#if TD_PORT_WINDOWS -#include -#endif - -#if TD_PORT_POSIX -#include - -#include -#endif - namespace td { -class ObserverBase; - -#if TD_PORT_WINDOWS -namespace detail { -class EventFdWindows; -} // namespace detail -#endif - class Fd { public: - // TODO: Close may be not enough - // Sometimes descriptor is half-closed. - - enum Flag : int32 { - Write = 0x001, - Read = 0x002, - Close = 0x004, - Error = 0x008, - All = Write | Read | Close | Error, - None = 0 - }; - using Flags = int32; - - class FlagsSet { - public: - bool write_flags(Flags flags); - bool write_flags_local(Flags flags); - bool flush() const; - Flags read_flags() const; - Flags read_flags_local() const; - void clear_flags(Flags flags); - void clear(); - - private: - mutable std::atomic to_write_; - mutable Flags flags_; - }; - enum class Mode { Reference, Owner }; - - Fd(); - Fd(const Fd &) = delete; - Fd &operator=(const Fd &) = delete; - Fd(Fd &&other); - Fd &operator=(Fd &&other); - ~Fd(); - -#if TD_PORT_POSIX - Fd(int fd, Mode mode); -#endif -#if TD_PORT_WINDOWS - static Fd create_file_fd(HANDLE handle); - - static Fd create_socket_fd(SOCKET sock); - - static Fd create_server_socket_fd(SOCKET sock, int socket_family); - - static Fd create_event_fd(); -#endif - - Fd clone() const; - - static Fd &Stderr(); - static Fd &Stdout(); - static Fd &Stdin(); - static Status duplicate(const Fd &from, Fd &to); - - bool empty() const; - - const Fd &get_fd() const; - Fd &get_fd(); - - void set_observer(ObserverBase *observer); - ObserverBase *get_observer() const; - - void close(); - - void update_flags(Flags flags); - - void after_notify(); - - Flags get_flags() const; - Flags get_flags_local() const; - - bool has_pending_error() const; - bool has_pending_error_local() const; - Status get_pending_error() TD_WARN_UNUSED_RESULT; - - Result write(Slice slice) TD_WARN_UNUSED_RESULT; - Result read(MutableSlice slice) TD_WARN_UNUSED_RESULT; - -#if TD_PORT_POSIX - void update_flags_notify(Flags flags); - void clear_flags(Flags flags); - - Result write_unsafe(Slice slice) TD_WARN_UNUSED_RESULT; - - int get_native_fd() const; - int move_as_native_fd(); -#endif - -#if TD_PORT_WINDOWS - Result accept() TD_WARN_UNUSED_RESULT; - void connect(const IPAddress &addr); - - uint64 get_key() const; - - HANDLE get_read_event(); - HANDLE get_write_event(); - void on_read_event(); - void on_write_event(); - - SOCKET get_native_socket() const; - HANDLE get_io_handle() const; -#endif - - private: - Mode mode_ = Mode::Owner; - -#if TD_PORT_POSIX - struct Info { - std::atomic refcnt; - FlagsSet flags; - ObserverBase *observer; - }; - struct InfoSet { - InfoSet(); - Info &get_info(int32 id); - - private: - static constexpr int MAX_FD = 1 << 18; - Info fd_array_[MAX_FD]; - }; - static InfoSet fd_info_set_; - - static Fd stderr_; - static Fd stdout_; - static Fd stdin_; - - void update_flags_inner(int32 new_flags, bool notify_flag); - Info *get_info(); - const Info *get_info() const; - void clear_info(); - - void close_ref(); - void close_own(); - - int fd_ = -1; -#endif -#if TD_PORT_WINDOWS - class FdImpl; - - enum class Type { Empty, EventFd, FileFd, StdinFileFd, SocketFd, ServerSocketFd }; - - Fd(Type type, Mode mode, HANDLE handle); - Fd(Type type, Mode mode, SOCKET sock, int socket_family); - explicit Fd(std::shared_ptr impl); - - friend class detail::EventFdWindows; // for release and acquire - - void acquire(); - void release(); - - std::shared_ptr impl_; -#endif }; -template -bool can_read(const FdT &fd) { - return (fd.get_flags() & (Fd::Read | Fd::Error)) != 0; -} - -template -bool can_write(const FdT &fd) { - return (fd.get_flags() & Fd::Write) != 0; -} - -template -bool can_close(const FdT &fd) { - return (fd.get_flags() & Fd::Close) != 0; -} - } // namespace td #endif diff --git a/tdutils/td/utils/port/FileFd.cpp b/tdutils/td/utils/port/FileFd.cpp index e9fbdc97..486d600d 100644 --- a/tdutils/td/utils/port/FileFd.cpp +++ b/tdutils/td/utils/port/FileFd.cpp @@ -222,10 +222,10 @@ Result FileFd::write(Slice slice) { auto native_fd = get_native_fd().io_handle(); DWORD bytes_written = 0; auto res = WriteFile(native_fd, slice.data(), narrow_cast(slice.size()), &bytes_written, nullptr); - if (!res) { - return OS_ERROR("Failed to write_sync"); + if (res) { + return bytes_written; } - return bytes_written; + return OS_ERROR(PSLICE() << "Write to [fd = " << native_fd << "] has failed"); #endif } @@ -255,13 +255,13 @@ Result FileFd::read(MutableSlice slice) { auto native_fd = get_native_fd().io_handle(); DWORD bytes_read = 0; auto res = ReadFile(native_fd, slice.data(), narrow_cast(slice.size()), &bytes_read, nullptr); - if (!res) { - return OS_ERROR("Failed to read_sync"); + if (res) { + if (bytes_read == 0) { + get_poll_info().clear_flags(PollFlags::Read()); + } + return static_cast(bytes_read); } - if (bytes_read == 0) { - get_poll_info().clear_flags(PollFlags::Read()); - } - return bytes_read; + return OS_ERROR(PSLICE() << "Read from [fd = " << native_fd << "] has failed"); #endif } @@ -296,10 +296,10 @@ Result FileFd::pwrite(Slice slice, int64 offset) { overlapped.Offset = static_cast(offset); overlapped.OffsetHigh = static_cast(offset >> 32); auto res = WriteFile(native_fd, slice.data(), narrow_cast(slice.size()), &bytes_written, &overlapped); - if (!res) { - return OS_ERROR("Failed to pwrite"); + if (res) { + return bytes_written; } - return bytes_written; + return OS_ERROR(PSLICE() << "Pwrite to [fd = " << native_fd << "] at [offset = " << offset << "] has failed"); #endif } @@ -334,10 +334,10 @@ Result FileFd::pread(MutableSlice slice, int64 offset) { overlapped.Offset = static_cast(offset); overlapped.OffsetHigh = static_cast(offset >> 32); auto res = ReadFile(native_fd, slice.data(), narrow_cast(slice.size()), &bytes_read, &overlapped); - if (!res) { - return OS_ERROR("Failed to pread"); + if (res) { + return bytes_read; } - return bytes_read; + return OS_ERROR(PSLICE() << "Pread from [fd = " << native_fd << "] at [offset = " << offset << "] has failed"); #endif } diff --git a/tdutils/td/utils/port/ServerSocketFd.cpp b/tdutils/td/utils/port/ServerSocketFd.cpp index 1aed2576..5d102375 100644 --- a/tdutils/td/utils/port/ServerSocketFd.cpp +++ b/tdutils/td/utils/port/ServerSocketFd.cpp @@ -128,7 +128,7 @@ class ServerSocketFdImpl : private IOCP::Callback { VLOG(fd) << get_native_fd().io_handle() << " start accept"; auto status = AcceptEx(get_native_fd().socket(), accept_socket_.socket(), addr_buf_, 0, MAX_ADDR_SIZE, MAX_ADDR_SIZE, nullptr, &read_overlapped_); - if (check_status(status, "accent")) { + if (check_status(status, "Failed to accept connection")) { inc_refcnt(); is_read_active_ = true; } diff --git a/tdutils/td/utils/port/SocketFd.cpp b/tdutils/td/utils/port/SocketFd.cpp index c9471c04..bea99e21 100644 --- a/tdutils/td/utils/port/SocketFd.cpp +++ b/tdutils/td/utils/port/SocketFd.cpp @@ -42,6 +42,7 @@ class SocketFdImpl : private IOCP::Callback { is_read_active_ = true; notify_iocp_connected(); } + SocketFdImpl(NativeFd native_fd, const IPAddress &addr) : info(std::move(native_fd)) { VLOG(fd) << get_native_fd().io_handle() << " create from native_fd and connect"; get_poll_info().add_flags(PollFlags::Write()); @@ -62,14 +63,16 @@ class SocketFdImpl : private IOCP::Callback { auto status = ConnectExPtr(get_native_fd().socket(), addr.get_sockaddr(), narrow_cast(addr.get_sockaddr_len()), nullptr, 0, nullptr, &read_overlapped_); - if (!check_status(status, "connect")) { + if (!check_status(status, "Failed to connect")) { is_read_active_ = false; dec_refcnt(); } } + void close() { notify_iocp_close(); } + PollableFdInfo &get_poll_info() { return info; } @@ -80,6 +83,7 @@ class SocketFdImpl : private IOCP::Callback { const NativeFd &get_native_fd() const { return info.native_fd(); } + Result write(Slice data) { output_writer_.append(data); if (is_write_waiting_) { @@ -89,6 +93,7 @@ class SocketFdImpl : private IOCP::Callback { } return data.size(); } + Result read(MutableSlice slice) { if (get_poll_info().get_flags().has_pending_error()) { TRY_STATUS(get_pending_error()); @@ -100,6 +105,7 @@ class SocketFdImpl : private IOCP::Callback { } return res; } + Status get_pending_error() { Status res; { @@ -158,7 +164,7 @@ class SocketFdImpl : private IOCP::Callback { auto dest = input_writer_.prepare_append(); auto status = ReadFile(get_native_fd().io_handle(), dest.data(), narrow_cast(dest.size()), nullptr, &read_overlapped_); - if (check_status(status, "read")) { + if (check_status(status, "Failed to read from connection")) { inc_refcnt(); is_read_active_ = true; } @@ -185,7 +191,7 @@ class SocketFdImpl : private IOCP::Callback { std::memset(&write_overlapped_, 0, sizeof(write_overlapped_)); auto status = WriteFile(get_native_fd().io_handle(), dest.data(), narrow_cast(dest.size()), nullptr, &write_overlapped_); - if (check_status(status, "write")) { + if (check_status(status, "Failed to write to connection")) { inc_refcnt(); is_write_active_ = true; } diff --git a/tdutils/td/utils/port/SocketFd.h b/tdutils/td/utils/port/SocketFd.h index 2a5a4b2f..e49d476f 100644 --- a/tdutils/td/utils/port/SocketFd.h +++ b/tdutils/td/utils/port/SocketFd.h @@ -44,9 +44,6 @@ class SocketFd { Result write(Slice slice) TD_WARN_UNUSED_RESULT; Result read(MutableSlice slice) TD_WARN_UNUSED_RESULT; - Result write_message(Slice slice) TD_WARN_UNUSED_RESULT; - Result read_message(MutableSlice slice) TD_WARN_UNUSED_RESULT; - const NativeFd &get_native_fd() const; static Result from_native_fd(NativeFd fd);