Remove unneeded code from Fd.cpp/Fd.h.

GitOrigin-RevId: a387ef08fbd75c0557f6882bef95a572605979c5
This commit is contained in:
levlam 2018-09-10 16:52:27 +03:00
parent 00b4fe3bc3
commit 27b848f5c0
6 changed files with 25 additions and 1246 deletions

File diff suppressed because it is too large Load Diff

View File

@ -10,204 +10,14 @@
#include "td/utils/port/config.h"
#include "td/utils/common.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
#include "td/utils/port/IPAddress.h"
#if TD_PORT_WINDOWS
#include <memory>
#endif
#if TD_PORT_POSIX
#include <errno.h>
#include <atomic>
#endif
namespace td {
class ObserverBase;
#if TD_PORT_WINDOWS
namespace detail {
class EventFdWindows;
} // namespace detail
#endif
class Fd {
public:
// TODO: Close may be not enough
// Sometimes descriptor is half-closed.
enum Flag : int32 {
Write = 0x001,
Read = 0x002,
Close = 0x004,
Error = 0x008,
All = Write | Read | Close | Error,
None = 0
};
using Flags = int32;
class FlagsSet {
public:
bool write_flags(Flags flags);
bool write_flags_local(Flags flags);
bool flush() const;
Flags read_flags() const;
Flags read_flags_local() const;
void clear_flags(Flags flags);
void clear();
private:
mutable std::atomic<Flags> to_write_;
mutable Flags flags_;
};
enum class Mode { Reference, Owner };
Fd();
Fd(const Fd &) = delete;
Fd &operator=(const Fd &) = delete;
Fd(Fd &&other);
Fd &operator=(Fd &&other);
~Fd();
#if TD_PORT_POSIX
Fd(int fd, Mode mode);
#endif
#if TD_PORT_WINDOWS
static Fd create_file_fd(HANDLE handle);
static Fd create_socket_fd(SOCKET sock);
static Fd create_server_socket_fd(SOCKET sock, int socket_family);
static Fd create_event_fd();
#endif
Fd clone() const;
static Fd &Stderr();
static Fd &Stdout();
static Fd &Stdin();
static Status duplicate(const Fd &from, Fd &to);
bool empty() const;
const Fd &get_fd() const;
Fd &get_fd();
void set_observer(ObserverBase *observer);
ObserverBase *get_observer() const;
void close();
void update_flags(Flags flags);
void after_notify();
Flags get_flags() const;
Flags get_flags_local() const;
bool has_pending_error() const;
bool has_pending_error_local() const;
Status get_pending_error() TD_WARN_UNUSED_RESULT;
Result<size_t> write(Slice slice) TD_WARN_UNUSED_RESULT;
Result<size_t> read(MutableSlice slice) TD_WARN_UNUSED_RESULT;
#if TD_PORT_POSIX
void update_flags_notify(Flags flags);
void clear_flags(Flags flags);
Result<size_t> write_unsafe(Slice slice) TD_WARN_UNUSED_RESULT;
int get_native_fd() const;
int move_as_native_fd();
#endif
#if TD_PORT_WINDOWS
Result<Fd> accept() TD_WARN_UNUSED_RESULT;
void connect(const IPAddress &addr);
uint64 get_key() const;
HANDLE get_read_event();
HANDLE get_write_event();
void on_read_event();
void on_write_event();
SOCKET get_native_socket() const;
HANDLE get_io_handle() const;
#endif
private:
Mode mode_ = Mode::Owner;
#if TD_PORT_POSIX
struct Info {
std::atomic<int> refcnt;
FlagsSet flags;
ObserverBase *observer;
};
struct InfoSet {
InfoSet();
Info &get_info(int32 id);
private:
static constexpr int MAX_FD = 1 << 18;
Info fd_array_[MAX_FD];
};
static InfoSet fd_info_set_;
static Fd stderr_;
static Fd stdout_;
static Fd stdin_;
void update_flags_inner(int32 new_flags, bool notify_flag);
Info *get_info();
const Info *get_info() const;
void clear_info();
void close_ref();
void close_own();
int fd_ = -1;
#endif
#if TD_PORT_WINDOWS
class FdImpl;
enum class Type { Empty, EventFd, FileFd, StdinFileFd, SocketFd, ServerSocketFd };
Fd(Type type, Mode mode, HANDLE handle);
Fd(Type type, Mode mode, SOCKET sock, int socket_family);
explicit Fd(std::shared_ptr<FdImpl> impl);
friend class detail::EventFdWindows; // for release and acquire
void acquire();
void release();
std::shared_ptr<FdImpl> impl_;
#endif
};
template <class FdT>
bool can_read(const FdT &fd) {
return (fd.get_flags() & (Fd::Read | Fd::Error)) != 0;
}
template <class FdT>
bool can_write(const FdT &fd) {
return (fd.get_flags() & Fd::Write) != 0;
}
template <class FdT>
bool can_close(const FdT &fd) {
return (fd.get_flags() & Fd::Close) != 0;
}
} // namespace td
#endif

View File

@ -222,10 +222,10 @@ Result<size_t> FileFd::write(Slice slice) {
auto native_fd = get_native_fd().io_handle();
DWORD bytes_written = 0;
auto res = WriteFile(native_fd, slice.data(), narrow_cast<DWORD>(slice.size()), &bytes_written, nullptr);
if (!res) {
return OS_ERROR("Failed to write_sync");
if (res) {
return bytes_written;
}
return bytes_written;
return OS_ERROR(PSLICE() << "Write to [fd = " << native_fd << "] has failed");
#endif
}
@ -255,13 +255,13 @@ Result<size_t> FileFd::read(MutableSlice slice) {
auto native_fd = get_native_fd().io_handle();
DWORD bytes_read = 0;
auto res = ReadFile(native_fd, slice.data(), narrow_cast<DWORD>(slice.size()), &bytes_read, nullptr);
if (!res) {
return OS_ERROR("Failed to read_sync");
if (res) {
if (bytes_read == 0) {
get_poll_info().clear_flags(PollFlags::Read());
}
return static_cast<size_t>(bytes_read);
}
if (bytes_read == 0) {
get_poll_info().clear_flags(PollFlags::Read());
}
return bytes_read;
return OS_ERROR(PSLICE() << "Read from [fd = " << native_fd << "] has failed");
#endif
}
@ -296,10 +296,10 @@ Result<size_t> FileFd::pwrite(Slice slice, int64 offset) {
overlapped.Offset = static_cast<DWORD>(offset);
overlapped.OffsetHigh = static_cast<DWORD>(offset >> 32);
auto res = WriteFile(native_fd, slice.data(), narrow_cast<DWORD>(slice.size()), &bytes_written, &overlapped);
if (!res) {
return OS_ERROR("Failed to pwrite");
if (res) {
return bytes_written;
}
return bytes_written;
return OS_ERROR(PSLICE() << "Pwrite to [fd = " << native_fd << "] at [offset = " << offset << "] has failed");
#endif
}
@ -334,10 +334,10 @@ Result<size_t> FileFd::pread(MutableSlice slice, int64 offset) {
overlapped.Offset = static_cast<DWORD>(offset);
overlapped.OffsetHigh = static_cast<DWORD>(offset >> 32);
auto res = ReadFile(native_fd, slice.data(), narrow_cast<DWORD>(slice.size()), &bytes_read, &overlapped);
if (!res) {
return OS_ERROR("Failed to pread");
if (res) {
return bytes_read;
}
return bytes_read;
return OS_ERROR(PSLICE() << "Pread from [fd = " << native_fd << "] at [offset = " << offset << "] has failed");
#endif
}

View File

@ -128,7 +128,7 @@ class ServerSocketFdImpl : private IOCP::Callback {
VLOG(fd) << get_native_fd().io_handle() << " start accept";
auto status = AcceptEx(get_native_fd().socket(), accept_socket_.socket(), addr_buf_, 0, MAX_ADDR_SIZE,
MAX_ADDR_SIZE, nullptr, &read_overlapped_);
if (check_status(status, "accent")) {
if (check_status(status, "Failed to accept connection")) {
inc_refcnt();
is_read_active_ = true;
}

View File

@ -42,6 +42,7 @@ class SocketFdImpl : private IOCP::Callback {
is_read_active_ = true;
notify_iocp_connected();
}
SocketFdImpl(NativeFd native_fd, const IPAddress &addr) : info(std::move(native_fd)) {
VLOG(fd) << get_native_fd().io_handle() << " create from native_fd and connect";
get_poll_info().add_flags(PollFlags::Write());
@ -62,14 +63,16 @@ class SocketFdImpl : private IOCP::Callback {
auto status = ConnectExPtr(get_native_fd().socket(), addr.get_sockaddr(), narrow_cast<int>(addr.get_sockaddr_len()),
nullptr, 0, nullptr, &read_overlapped_);
if (!check_status(status, "connect")) {
if (!check_status(status, "Failed to connect")) {
is_read_active_ = false;
dec_refcnt();
}
}
void close() {
notify_iocp_close();
}
PollableFdInfo &get_poll_info() {
return info;
}
@ -80,6 +83,7 @@ class SocketFdImpl : private IOCP::Callback {
const NativeFd &get_native_fd() const {
return info.native_fd();
}
Result<size_t> write(Slice data) {
output_writer_.append(data);
if (is_write_waiting_) {
@ -89,6 +93,7 @@ class SocketFdImpl : private IOCP::Callback {
}
return data.size();
}
Result<size_t> read(MutableSlice slice) {
if (get_poll_info().get_flags().has_pending_error()) {
TRY_STATUS(get_pending_error());
@ -100,6 +105,7 @@ class SocketFdImpl : private IOCP::Callback {
}
return res;
}
Status get_pending_error() {
Status res;
{
@ -158,7 +164,7 @@ class SocketFdImpl : private IOCP::Callback {
auto dest = input_writer_.prepare_append();
auto status =
ReadFile(get_native_fd().io_handle(), dest.data(), narrow_cast<DWORD>(dest.size()), nullptr, &read_overlapped_);
if (check_status(status, "read")) {
if (check_status(status, "Failed to read from connection")) {
inc_refcnt();
is_read_active_ = true;
}
@ -185,7 +191,7 @@ class SocketFdImpl : private IOCP::Callback {
std::memset(&write_overlapped_, 0, sizeof(write_overlapped_));
auto status = WriteFile(get_native_fd().io_handle(), dest.data(), narrow_cast<DWORD>(dest.size()), nullptr,
&write_overlapped_);
if (check_status(status, "write")) {
if (check_status(status, "Failed to write to connection")) {
inc_refcnt();
is_write_active_ = true;
}

View File

@ -44,9 +44,6 @@ class SocketFd {
Result<size_t> write(Slice slice) TD_WARN_UNUSED_RESULT;
Result<size_t> read(MutableSlice slice) TD_WARN_UNUSED_RESULT;
Result<size_t> write_message(Slice slice) TD_WARN_UNUSED_RESULT;
Result<size_t> read_message(MutableSlice slice) TD_WARN_UNUSED_RESULT;
const NativeFd &get_native_fd() const;
static Result<SocketFd> from_native_fd(NativeFd fd);