Simplify NativeFd.
GitOrigin-RevId: 07a33f1ad18b426ef339da76467c667607c1a3b9
This commit is contained in:
parent
db228c09c4
commit
10118d0449
@ -219,7 +219,7 @@ Result<size_t> FileFd::write(Slice slice) {
|
||||
}
|
||||
return std::move(error);
|
||||
#elif TD_PORT_WINDOWS
|
||||
auto native_fd = get_native_fd().io_handle();
|
||||
auto native_fd = get_native_fd().fd();
|
||||
DWORD bytes_written = 0;
|
||||
auto res = WriteFile(native_fd, slice.data(), narrow_cast<DWORD>(slice.size()), &bytes_written, nullptr);
|
||||
if (res) {
|
||||
@ -252,7 +252,7 @@ Result<size_t> FileFd::read(MutableSlice slice) {
|
||||
}
|
||||
return std::move(error);
|
||||
#elif TD_PORT_WINDOWS
|
||||
auto native_fd = get_native_fd().io_handle();
|
||||
auto native_fd = get_native_fd().fd();
|
||||
DWORD bytes_read = 0;
|
||||
auto res = ReadFile(native_fd, slice.data(), narrow_cast<DWORD>(slice.size()), &bytes_read, nullptr);
|
||||
if (res) {
|
||||
@ -289,7 +289,7 @@ Result<size_t> FileFd::pwrite(Slice slice, int64 offset) {
|
||||
}
|
||||
return std::move(error);
|
||||
#elif TD_PORT_WINDOWS
|
||||
auto native_fd = get_native_fd().io_handle();
|
||||
auto native_fd = get_native_fd().fd();
|
||||
DWORD bytes_written = 0;
|
||||
OVERLAPPED overlapped;
|
||||
std::memset(&overlapped, 0, sizeof(overlapped));
|
||||
@ -327,7 +327,7 @@ Result<size_t> FileFd::pread(MutableSlice slice, int64 offset) {
|
||||
}
|
||||
return std::move(error);
|
||||
#elif TD_PORT_WINDOWS
|
||||
auto native_fd = get_native_fd().io_handle();
|
||||
auto native_fd = get_native_fd().fd();
|
||||
DWORD bytes_read = 0;
|
||||
OVERLAPPED overlapped;
|
||||
std::memset(&overlapped, 0, sizeof(overlapped));
|
||||
@ -348,7 +348,7 @@ Status FileFd::lock(FileFd::LockFlags flags, int32 max_tries) {
|
||||
#if TD_PORT_POSIX
|
||||
auto native_fd = get_native_fd().fd();
|
||||
#elif TD_PORT_WINDOWS
|
||||
auto native_fd = get_native_fd().io_handle();
|
||||
auto native_fd = get_native_fd().fd();
|
||||
#endif
|
||||
while (true) {
|
||||
#if TD_PORT_POSIX
|
||||
@ -442,8 +442,7 @@ Stat FileFd::stat() {
|
||||
Stat res;
|
||||
|
||||
FILE_BASIC_INFO basic_info;
|
||||
auto status =
|
||||
GetFileInformationByHandleEx(get_native_fd().io_handle(), FileBasicInfo, &basic_info, sizeof(basic_info));
|
||||
auto status = GetFileInformationByHandleEx(get_native_fd().fd(), FileBasicInfo, &basic_info, sizeof(basic_info));
|
||||
if (!status) {
|
||||
auto error = OS_ERROR("Stat failed");
|
||||
LOG(FATAL) << error;
|
||||
@ -454,8 +453,7 @@ Stat FileFd::stat() {
|
||||
res.is_reg_ = true;
|
||||
|
||||
FILE_STANDARD_INFO standard_info;
|
||||
status = GetFileInformationByHandleEx(get_native_fd().io_handle(), FileStandardInfo, &standard_info,
|
||||
sizeof(standard_info));
|
||||
status = GetFileInformationByHandleEx(get_native_fd().fd(), FileStandardInfo, &standard_info, sizeof(standard_info));
|
||||
if (!status) {
|
||||
auto error = OS_ERROR("Stat failed");
|
||||
LOG(FATAL) << error;
|
||||
@ -471,7 +469,7 @@ Status FileFd::sync() {
|
||||
#if TD_PORT_POSIX
|
||||
if (fsync(get_native_fd().fd()) != 0) {
|
||||
#elif TD_PORT_WINDOWS
|
||||
if (FlushFileBuffers(get_native_fd().io_handle()) == 0) {
|
||||
if (FlushFileBuffers(get_native_fd().fd()) == 0) {
|
||||
#endif
|
||||
return OS_ERROR("Sync failed");
|
||||
}
|
||||
@ -486,7 +484,7 @@ Status FileFd::seek(int64 position) {
|
||||
#elif TD_PORT_WINDOWS
|
||||
LARGE_INTEGER offset;
|
||||
offset.QuadPart = position;
|
||||
if (SetFilePointerEx(get_native_fd().io_handle(), offset, nullptr, FILE_BEGIN) == 0) {
|
||||
if (SetFilePointerEx(get_native_fd().fd(), offset, nullptr, FILE_BEGIN) == 0) {
|
||||
#endif
|
||||
return OS_ERROR("Seek failed");
|
||||
}
|
||||
@ -499,7 +497,7 @@ Status FileFd::truncate_to_current_position(int64 current_position) {
|
||||
TRY_RESULT(current_position_off_t, narrow_cast_safe<off_t>(current_position));
|
||||
if (detail::skip_eintr([&] { return ::ftruncate(get_native_fd().fd(), current_position_off_t); }) < 0) {
|
||||
#elif TD_PORT_WINDOWS
|
||||
if (SetEndOfFile(get_native_fd().io_handle()) == 0) {
|
||||
if (SetEndOfFile(get_native_fd().fd()) == 0) {
|
||||
#endif
|
||||
return OS_ERROR("Truncate failed");
|
||||
}
|
||||
|
@ -383,9 +383,9 @@ Status IPAddress::init_sockaddr(sockaddr *addr, socklen_t len) {
|
||||
|
||||
Status IPAddress::init_socket_address(const SocketFd &socket_fd) {
|
||||
is_valid_ = false;
|
||||
auto fd = socket_fd.get_native_fd().socket();
|
||||
auto socket = socket_fd.get_native_fd().socket();
|
||||
socklen_t len = storage_size();
|
||||
int ret = getsockname(fd, &sockaddr_, &len);
|
||||
int ret = getsockname(socket, &sockaddr_, &len);
|
||||
if (ret != 0) {
|
||||
return OS_SOCKET_ERROR("Failed to get socket address");
|
||||
}
|
||||
@ -395,9 +395,9 @@ Status IPAddress::init_socket_address(const SocketFd &socket_fd) {
|
||||
|
||||
Status IPAddress::init_peer_address(const SocketFd &socket_fd) {
|
||||
is_valid_ = false;
|
||||
auto fd = socket_fd.get_native_fd().socket();
|
||||
auto socket = socket_fd.get_native_fd().socket();
|
||||
socklen_t len = storage_size();
|
||||
int ret = getpeername(fd, &sockaddr_, &len);
|
||||
int ret = getpeername(socket, &sockaddr_, &len);
|
||||
if (ret != 0) {
|
||||
return OS_SOCKET_ERROR("Failed to get peer socket address");
|
||||
}
|
||||
|
@ -213,7 +213,7 @@ class ServerSocketFdImpl {
|
||||
Result<SocketFd> accept() {
|
||||
sockaddr_storage addr;
|
||||
socklen_t addr_len = sizeof(addr);
|
||||
int native_fd = get_native_fd().fd();
|
||||
int native_fd = get_native_fd().socket();
|
||||
int r_fd = detail::skip_eintr([&] { return ::accept(native_fd, reinterpret_cast<sockaddr *>(&addr), &addr_len); });
|
||||
auto accept_errno = errno;
|
||||
if (r_fd >= 0) {
|
||||
|
@ -341,7 +341,7 @@ class SocketFdImpl {
|
||||
return info.native_fd();
|
||||
}
|
||||
Result<size_t> write(Slice slice) {
|
||||
int native_fd = get_native_fd().fd();
|
||||
int native_fd = get_native_fd().socket();
|
||||
auto write_res = detail::skip_eintr([&] { return ::write(native_fd, slice.begin(), slice.size()); });
|
||||
auto write_errno = errno;
|
||||
if (write_res >= 0) {
|
||||
@ -385,7 +385,7 @@ class SocketFdImpl {
|
||||
if (get_poll_info().get_flags().has_pending_error()) {
|
||||
TRY_STATUS(get_pending_error());
|
||||
}
|
||||
int native_fd = get_native_fd().fd();
|
||||
int native_fd = get_native_fd().socket();
|
||||
CHECK(slice.size() > 0);
|
||||
auto read_res = detail::skip_eintr([&] { return ::read(native_fd, slice.begin(), slice.size()); });
|
||||
auto read_errno = errno;
|
||||
@ -498,7 +498,7 @@ Result<SocketFd> SocketFd::open(const IPAddress &address) {
|
||||
TRY_STATUS(detail::init_socket_options(native_fd));
|
||||
|
||||
#if TD_PORT_POSIX
|
||||
int e_connect = connect(native_fd.fd(), address.get_sockaddr(), narrow_cast<socklen_t>(address.get_sockaddr_len()));
|
||||
int e_connect = connect(native_fd.socket(), address.get_sockaddr(), narrow_cast<socklen_t>(address.get_sockaddr_len()));
|
||||
if (e_connect == -1) {
|
||||
auto connect_errno = errno;
|
||||
if (connect_errno != EINPROGRESS) {
|
||||
|
@ -52,8 +52,8 @@ namespace detail {
|
||||
class BufferedStdinImpl {
|
||||
public:
|
||||
BufferedStdinImpl() {
|
||||
file_fd_ = FileFd::from_native_fd(NativeFd(Stdin().get_native_fd().raw()));
|
||||
copy_file_fd_ = FileFd::from_native_fd(NativeFd(Stdin().get_native_fd().raw()));
|
||||
file_fd_ = FileFd::from_native_fd(NativeFd(Stdin().get_native_fd().fd()));
|
||||
copy_file_fd_ = FileFd::from_native_fd(NativeFd(Stdin().get_native_fd().fd()));
|
||||
read_thread_ = td::thread([this] { this->read_loop(); });
|
||||
}
|
||||
BufferedStdinImpl(const BufferedStdinImpl &) = delete;
|
||||
@ -114,7 +114,7 @@ namespace detail {
|
||||
class BufferedStdinImpl {
|
||||
public:
|
||||
BufferedStdinImpl() {
|
||||
file_fd_ = FileFd::from_native_fd(NativeFd(Stdin().get_native_fd().raw()));
|
||||
file_fd_ = FileFd::from_native_fd(NativeFd(Stdin().get_native_fd().fd()));
|
||||
file_fd_.get_native_fd().set_is_blocking(false);
|
||||
}
|
||||
BufferedStdinImpl(const BufferedStdinImpl &) = delete;
|
||||
|
@ -501,7 +501,7 @@ class UdpSocketFdImpl {
|
||||
detail::UdpSocketReceiveHelper helper;
|
||||
helper.to_native(message, message_header);
|
||||
|
||||
auto native_fd = get_native_fd().fd();
|
||||
auto native_fd = get_native_fd().socket();
|
||||
auto recvmsg_res = detail::skip_eintr([&] { return recvmsg(native_fd, &message_header, flags); });
|
||||
auto recvmsg_errno = errno;
|
||||
if (recvmsg_res >= 0) {
|
||||
@ -556,7 +556,7 @@ class UdpSocketFdImpl {
|
||||
detail::UdpSocketSendHelper helper;
|
||||
helper.to_native(message, message_header);
|
||||
|
||||
auto native_fd = get_native_fd().fd();
|
||||
auto native_fd = get_native_fd().socket();
|
||||
auto sendmsg_res = detail::skip_eintr([&] { return sendmsg(native_fd, &message_header, 0); });
|
||||
auto sendmsg_errno = errno;
|
||||
if (sendmsg_res >= 0) {
|
||||
@ -665,7 +665,7 @@ class UdpSocketFdImpl {
|
||||
headers[i].msg_len = 0;
|
||||
}
|
||||
|
||||
auto native_fd = get_native_fd().fd();
|
||||
auto native_fd = get_native_fd().socket();
|
||||
auto sendmmsg_res =
|
||||
detail::skip_eintr([&] { return sendmmsg(native_fd, headers.data(), narrow_cast<unsigned int>(to_send), 0); });
|
||||
auto sendmmsg_errno = errno;
|
||||
@ -716,7 +716,7 @@ class UdpSocketFdImpl {
|
||||
headers[i].msg_len = 0;
|
||||
}
|
||||
|
||||
auto native_fd = get_native_fd().fd();
|
||||
auto native_fd = get_native_fd().socket();
|
||||
auto recvmmsg_res = detail::skip_eintr(
|
||||
[&] { return recvmmsg(native_fd, headers.data(), narrow_cast<unsigned int>(to_receive), flags, nullptr); });
|
||||
auto recvmmsg_errno = errno;
|
||||
|
@ -36,16 +36,16 @@ PollableFdInfo &EventFdWindows::get_poll_info() {
|
||||
}
|
||||
|
||||
void EventFdWindows::release() {
|
||||
SetEvent(event_.io_handle());
|
||||
SetEvent(event_.fd());
|
||||
}
|
||||
|
||||
void EventFdWindows::acquire() {
|
||||
ResetEvent(event_.io_handle());
|
||||
ResetEvent(event_.fd());
|
||||
}
|
||||
|
||||
void EventFdWindows::wait(int timeout_ms) {
|
||||
WaitForSingleObject(event_.io_handle(), timeout_ms);
|
||||
ResetEvent(event_.io_handle());
|
||||
WaitForSingleObject(event_.fd(), timeout_ms);
|
||||
ResetEvent(event_.fd());
|
||||
}
|
||||
|
||||
} // namespace detail
|
||||
|
@ -11,21 +11,21 @@
|
||||
#include "td/utils/Status.h"
|
||||
|
||||
#if TD_PORT_POSIX
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
#include <unistd.h>
|
||||
#endif
|
||||
|
||||
namespace td {
|
||||
|
||||
NativeFd::NativeFd(Raw raw) : fd_(raw) {
|
||||
NativeFd::NativeFd(Fd fd) : fd_(fd) {
|
||||
VLOG(fd) << *this << " create";
|
||||
}
|
||||
|
||||
NativeFd::NativeFd(Raw raw, bool nolog) : fd_(raw) {
|
||||
NativeFd::NativeFd(Fd fd, bool nolog) : fd_(fd) {
|
||||
}
|
||||
|
||||
#if TD_PORT_WINDOWS
|
||||
NativeFd::NativeFd(SOCKET raw) : fd_(reinterpret_cast<HANDLE>(raw)), is_socket_(true) {
|
||||
NativeFd::NativeFd(Socket socket) : fd_(reinterpret_cast<Fd>(socket)), is_socket_(true) {
|
||||
VLOG(fd) << *this << " create";
|
||||
}
|
||||
#endif
|
||||
@ -35,10 +35,10 @@ NativeFd::~NativeFd() {
|
||||
}
|
||||
|
||||
NativeFd::operator bool() const {
|
||||
return fd_.get() != empty_raw();
|
||||
return fd_.get() != empty_fd();
|
||||
}
|
||||
|
||||
NativeFd::Raw NativeFd::empty_raw() {
|
||||
NativeFd::Fd NativeFd::empty_fd() {
|
||||
#if TD_PORT_POSIX
|
||||
return -1;
|
||||
#elif TD_PORT_WINDOWS
|
||||
@ -46,27 +46,18 @@ NativeFd::Raw NativeFd::empty_raw() {
|
||||
#endif
|
||||
}
|
||||
|
||||
NativeFd::Raw NativeFd::raw() const {
|
||||
NativeFd::Fd NativeFd::fd() const {
|
||||
return fd_.get();
|
||||
}
|
||||
|
||||
NativeFd::Raw NativeFd::fd() const {
|
||||
return raw();
|
||||
}
|
||||
|
||||
#if TD_PORT_WINDOWS
|
||||
NativeFd::Raw NativeFd::io_handle() const {
|
||||
return raw();
|
||||
}
|
||||
SOCKET NativeFd::socket() const {
|
||||
NativeFd::Socket NativeFd::socket() const {
|
||||
#if TD_PORT_POSIX
|
||||
return fd();
|
||||
#elif TD_PORT_WINDOWS
|
||||
CHECK(is_socket_);
|
||||
return reinterpret_cast<SOCKET>(fd_.get());
|
||||
}
|
||||
#elif TD_PORT_POSIX
|
||||
NativeFd::Raw NativeFd::socket() const {
|
||||
return raw();
|
||||
}
|
||||
return reinterpret_cast<Socket>(fd_.get());
|
||||
#endif
|
||||
}
|
||||
|
||||
Status NativeFd::set_is_blocking(bool is_blocking) const {
|
||||
#if TD_PORT_POSIX
|
||||
@ -116,7 +107,7 @@ void NativeFd::close() {
|
||||
}
|
||||
VLOG(fd) << *this << " close";
|
||||
#if TD_PORT_WINDOWS
|
||||
if (is_socket_ ? closesocket(socket()) : !CloseHandle(io_handle())) {
|
||||
if (is_socket_ ? closesocket(socket()) : !CloseHandle(fd())) {
|
||||
#elif TD_PORT_POSIX
|
||||
if (::close(fd()) < 0) {
|
||||
#endif
|
||||
@ -126,7 +117,7 @@ void NativeFd::close() {
|
||||
fd_ = {};
|
||||
}
|
||||
|
||||
NativeFd::Raw NativeFd::release() {
|
||||
NativeFd::Fd NativeFd::release() {
|
||||
VLOG(fd) << *this << " release";
|
||||
auto res = fd_.get();
|
||||
fd_ = {};
|
||||
@ -134,7 +125,7 @@ NativeFd::Raw NativeFd::release() {
|
||||
}
|
||||
|
||||
StringBuilder &operator<<(StringBuilder &sb, const NativeFd &fd) {
|
||||
return sb << tag("fd", fd.raw());
|
||||
return sb << tag("fd", fd.fd());
|
||||
}
|
||||
|
||||
} // namespace td
|
||||
|
@ -18,33 +18,30 @@ namespace td {
|
||||
class NativeFd {
|
||||
public:
|
||||
#if TD_PORT_POSIX
|
||||
using Raw = int;
|
||||
using Fd = int;
|
||||
using Socket = int;
|
||||
#elif TD_PORT_WINDOWS
|
||||
using Raw = HANDLE;
|
||||
using Fd = HANDLE;
|
||||
using Socket = SOCKET;
|
||||
#endif
|
||||
NativeFd() = default;
|
||||
NativeFd(NativeFd &&) = default;
|
||||
NativeFd &operator=(NativeFd &&) = default;
|
||||
explicit NativeFd(Raw raw);
|
||||
NativeFd &operator=(const NativeFd &) = delete;
|
||||
NativeFd(Raw raw, bool nolog);
|
||||
explicit NativeFd(Fd fd);
|
||||
NativeFd(Fd fd, bool nolog);
|
||||
#if TD_PORT_WINDOWS
|
||||
explicit NativeFd(SOCKET raw);
|
||||
explicit NativeFd(Socket socket);
|
||||
#endif
|
||||
NativeFd(const NativeFd &) = delete;
|
||||
NativeFd &operator=(const NativeFd &) = delete;
|
||||
~NativeFd();
|
||||
|
||||
explicit operator bool() const;
|
||||
|
||||
static Raw empty_raw();
|
||||
static Fd empty_fd();
|
||||
|
||||
Raw raw() const;
|
||||
Raw fd() const;
|
||||
#if TD_PORT_WINDOWS
|
||||
Raw io_handle() const;
|
||||
SOCKET socket() const;
|
||||
#elif TD_PORT_POSIX
|
||||
Raw socket() const;
|
||||
#endif
|
||||
Fd fd() const;
|
||||
Socket socket() const;
|
||||
|
||||
Status set_is_blocking(bool is_blocking) const;
|
||||
|
||||
@ -53,13 +50,13 @@ class NativeFd {
|
||||
Status duplicate(const NativeFd &to) const;
|
||||
|
||||
void close();
|
||||
Raw release();
|
||||
Fd release();
|
||||
|
||||
private:
|
||||
#if TD_PORT_POSIX
|
||||
MovableValue<Raw, -1> fd_;
|
||||
MovableValue<Fd, -1> fd_;
|
||||
#elif TD_PORT_WINDOWS
|
||||
MovableValue<Raw, INVALID_HANDLE_VALUE> fd_;
|
||||
MovableValue<Fd, INVALID_HANDLE_VALUE> fd_;
|
||||
bool is_socket_{false};
|
||||
#endif
|
||||
};
|
||||
|
@ -29,10 +29,10 @@ void IOCP::loop() {
|
||||
DWORD bytes = 0;
|
||||
ULONG_PTR key = 0;
|
||||
WSAOVERLAPPED *overlapped = nullptr;
|
||||
BOOL ok = GetQueuedCompletionStatus(iocp_handle_.io_handle(), &bytes, &key,
|
||||
reinterpret_cast<OVERLAPPED **>(&overlapped), 1000);
|
||||
BOOL ok =
|
||||
GetQueuedCompletionStatus(iocp_handle_.fd(), &bytes, &key, reinterpret_cast<OVERLAPPED **>(&overlapped), 1000);
|
||||
if (bytes || key || overlapped) {
|
||||
// LOG(ERROR) << "Got iocp " << bytes << " " << key << " " << overlapped;
|
||||
// LOG(ERROR) << "Got IOCP " << bytes << " " << key << " " << overlapped;
|
||||
}
|
||||
if (ok) {
|
||||
auto callback = reinterpret_cast<IOCP::Callback *>(key);
|
||||
@ -43,7 +43,7 @@ void IOCP::loop() {
|
||||
callback->on_iocp(bytes, overlapped);
|
||||
} else {
|
||||
if (overlapped != nullptr) {
|
||||
auto error = OS_ERROR("from iocp");
|
||||
auto error = OS_ERROR("Received from IOCP");
|
||||
auto callback = reinterpret_cast<IOCP::Callback *>(key);
|
||||
CHECK(callback != nullptr);
|
||||
callback->on_iocp(std::move(error), overlapped);
|
||||
@ -60,7 +60,7 @@ void IOCP::init() {
|
||||
CHECK(!iocp_handle_);
|
||||
auto res = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0);
|
||||
if (res == nullptr) {
|
||||
auto error = OS_ERROR("Iocp creation failed");
|
||||
auto error = OS_ERROR("IOCP creation failed");
|
||||
LOG(FATAL) << error;
|
||||
}
|
||||
iocp_handle_ = NativeFd(res);
|
||||
@ -73,16 +73,16 @@ void IOCP::clear() {
|
||||
void IOCP::subscribe(const NativeFd &native_fd, Callback *callback) {
|
||||
CHECK(iocp_handle_);
|
||||
auto iocp_handle =
|
||||
CreateIoCompletionPort(native_fd.io_handle(), iocp_handle_.io_handle(), reinterpret_cast<ULONG_PTR>(callback), 0);
|
||||
CreateIoCompletionPort(native_fd.fd(), iocp_handle_.fd(), reinterpret_cast<ULONG_PTR>(callback), 0);
|
||||
if (iocp_handle == INVALID_HANDLE_VALUE) {
|
||||
auto error = OS_ERROR("CreateIoCompletionPort");
|
||||
LOG(FATAL) << error;
|
||||
}
|
||||
CHECK(iocp_handle == iocp_handle_.io_handle()) << iocp_handle << " " << iocp_handle_.io_handle();
|
||||
CHECK(iocp_handle == iocp_handle_.fd()) << iocp_handle << " " << iocp_handle_.fd();
|
||||
}
|
||||
|
||||
void IOCP::post(size_t size, Callback *callback, WSAOVERLAPPED *overlapped) {
|
||||
PostQueuedCompletionStatus(iocp_handle_.io_handle(), DWORD(size), reinterpret_cast<ULONG_PTR>(callback),
|
||||
PostQueuedCompletionStatus(iocp_handle_.fd(), DWORD(size), reinterpret_cast<ULONG_PTR>(callback),
|
||||
reinterpret_cast<OVERLAPPED *>(overlapped));
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user