TD_FD_DEBUG: cmake option to debug EINVAL errors

GitOrigin-RevId: d0bd1be3a16b94a71c45ec4cca5f42e1364a3200
This commit is contained in:
Arseny Smirnov 2019-07-31 12:18:48 +03:00
parent 6ef242ad38
commit 115fba770f
6 changed files with 130 additions and 33 deletions

View File

@ -6,3 +6,4 @@
#cmakedefine01 TD_HAVE_COROUTINES #cmakedefine01 TD_HAVE_COROUTINES
#cmakedefine01 TD_HAVE_ABSL #cmakedefine01 TD_HAVE_ABSL
#cmakedefine01 TD_HAVE_GETOPT #cmakedefine01 TD_HAVE_GETOPT
#cmakedefine01 TD_FD_DEBUG

View File

@ -71,7 +71,7 @@ void Epoll::unsubscribe(PollableFdRef fd_ref) {
int err = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, native_fd, nullptr); int err = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, native_fd, nullptr);
auto epoll_ctl_errno = errno; auto epoll_ctl_errno = errno;
LOG_IF(FATAL, err == -1) << Status::PosixError(epoll_ctl_errno, "epoll_ctl DEL failed") << ", epoll_fd = " << epoll_fd LOG_IF(FATAL, err == -1) << Status::PosixError(epoll_ctl_errno, "epoll_ctl DEL failed") << ", epoll_fd = " << epoll_fd
<< ", fd = " << native_fd; << ", fd = " << native_fd << fd.native_fd().validate();
} }
void Epoll::unsubscribe_before_close(PollableFdRef fd) { void Epoll::unsubscribe_before_close(PollableFdRef fd) {

View File

@ -21,37 +21,33 @@ char disable_linker_warning_about_empty_file_kqueue_cpp TD_UNUSED;
namespace td { namespace td {
namespace detail { namespace detail {
KQueue::KQueue() {
kq = -1;
}
KQueue::~KQueue() { KQueue::~KQueue() {
clear(); clear();
} }
void KQueue::init() { void KQueue::init() {
kq = kqueue(); kq_ = NativeFd(kqueue());
auto kqueue_errno = errno; auto kqueue_errno = errno;
LOG_IF(FATAL, kq == -1) << Status::PosixError(kqueue_errno, "kqueue creation failed"); LOG_IF(FATAL, !kq_) << Status::PosixError(kqueue_errno, "kqueue creation failed");
// TODO: const // TODO: const
events.resize(1000); events_.resize(1000);
changes_n = 0; changes_n_ = 0;
} }
void KQueue::clear() { void KQueue::clear() {
if (kq == -1) { if (!kq_) {
return; return;
} }
events.clear(); events_.clear();
close(kq); kq_.close();
kq = -1; for (auto *list_node = list_root_.next; list_node != &list_root_;) {
for (auto *list_node = list_root.next; list_node != &list_root;) {
auto pollable_fd = PollableFd::from_list_node(list_node); auto pollable_fd = PollableFd::from_list_node(list_node);
list_node = list_node->next; list_node = list_node->next;
} }
} }
int KQueue::update(int nevents, const timespec *timeout, bool may_fail) { int KQueue::update(int nevents, const timespec *timeout, bool may_fail) {
int err = kevent(kq, &events[0], changes_n, &events[0], nevents, timeout); int err = kevent(kq_.fd(), &events_[0], changes_n_, &events_[0], nevents, timeout);
auto kevent_errno = errno; auto kevent_errno = errno;
bool is_fatal_error = [&] { bool is_fatal_error = [&] {
@ -65,7 +61,7 @@ int KQueue::update(int nevents, const timespec *timeout, bool may_fail) {
}(); }();
LOG_IF(FATAL, is_fatal_error) << Status::PosixError(kevent_errno, "kevent failed"); LOG_IF(FATAL, is_fatal_error) << Status::PosixError(kevent_errno, "kevent failed");
changes_n = 0; changes_n_ = 0;
if (err < 0) { if (err < 0) {
return 0; return 0;
} }
@ -73,7 +69,7 @@ int KQueue::update(int nevents, const timespec *timeout, bool may_fail) {
} }
void KQueue::flush_changes(bool may_fail) { void KQueue::flush_changes(bool may_fail) {
if (!changes_n) { if (!changes_n_) {
return; return;
} }
int n = update(0, nullptr, may_fail); int n = update(0, nullptr, may_fail);
@ -82,18 +78,18 @@ void KQueue::flush_changes(bool may_fail) {
void KQueue::add_change(std::uintptr_t ident, int16 filter, uint16 flags, uint32 fflags, std::intptr_t data, void KQueue::add_change(std::uintptr_t ident, int16 filter, uint16 flags, uint32 fflags, std::intptr_t data,
void *udata) { void *udata) {
if (changes_n == static_cast<int>(events.size())) { if (changes_n_ == static_cast<int>(events_.size())) {
flush_changes(); flush_changes();
} }
EV_SET(&events[changes_n], ident, filter, flags, fflags, data, udata); EV_SET(&events_[changes_n_], ident, filter, flags, fflags, data, udata);
VLOG(fd) << "Subscribe [fd:" << ident << "] [filter:" << filter << "] [udata: " << udata << "]"; VLOG(fd) << "Subscribe [fd:" << ident << "] [filter:" << filter << "] [udata: " << udata << "]";
changes_n++; changes_n_++;
} }
void KQueue::subscribe(PollableFd fd, PollFlags flags) { void KQueue::subscribe(PollableFd fd, PollFlags flags) {
auto native_fd = fd.native_fd().fd(); auto native_fd = fd.native_fd().fd();
auto list_node = fd.release_as_list_node(); auto list_node = fd.release_as_list_node();
list_root.put(list_node); list_root_.put(list_node);
if (flags.can_read()) { if (flags.can_read()) {
add_change(native_fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, list_node); add_change(native_fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, list_node);
} }
@ -103,10 +99,10 @@ void KQueue::subscribe(PollableFd fd, PollFlags flags) {
} }
void KQueue::invalidate(int native_fd) { void KQueue::invalidate(int native_fd) {
for (int i = 0; i < changes_n; i++) { for (int i = 0; i < changes_n_; i++) {
if (events[i].ident == static_cast<std::uintptr_t>(native_fd)) { if (events_[i].ident == static_cast<std::uintptr_t>(native_fd)) {
changes_n--; changes_n_--;
std::swap(events[i], events[changes_n]); std::swap(events_[i], events_[changes_n_]);
i--; i--;
} }
} }
@ -129,7 +125,7 @@ void KQueue::unsubscribe_before_close(PollableFdRef fd_ref) {
invalidate(pollable_fd.native_fd().fd()); invalidate(pollable_fd.native_fd().fd());
// just to avoid O(changes_n ^ 2) // just to avoid O(changes_n ^ 2)
if (changes_n != 0) { if (changes_n_ != 0) {
flush_changes(); flush_changes();
} }
} }
@ -145,9 +141,9 @@ void KQueue::run(int timeout_ms) {
timeout_ptr = &timeout_data; timeout_ptr = &timeout_data;
} }
int n = update(static_cast<int>(events.size()), timeout_ptr); int n = update(static_cast<int>(events_.size()), timeout_ptr);
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
struct kevent *event = &events[i]; struct kevent *event = &events_[i];
PollFlags flags; PollFlags flags;
if (event->filter == EVFILT_WRITE) { if (event->filter == EVFILT_WRITE) {
flags.add_flags(PollFlags::Write()); flags.add_flags(PollFlags::Write());

View File

@ -27,7 +27,7 @@ namespace detail {
class KQueue final : public PollBase { class KQueue final : public PollBase {
public: public:
KQueue(); KQueue() = default;
KQueue(const KQueue &) = delete; KQueue(const KQueue &) = delete;
KQueue &operator=(const KQueue &) = delete; KQueue &operator=(const KQueue &) = delete;
KQueue(KQueue &&) = delete; KQueue(KQueue &&) = delete;
@ -51,10 +51,10 @@ class KQueue final : public PollBase {
} }
private: private:
vector<struct kevent> events; vector<struct kevent> events_;
int changes_n; int changes_n_;
int kq; NativeFd kq_;
ListNode list_root; ListNode list_root_;
int update(int nevents, const timespec *timeout, bool may_fail = false); int update(int nevents, const timespec *timeout, bool may_fail = false);

View File

@ -15,13 +15,98 @@
#include <unistd.h> #include <unistd.h>
#endif #endif
#if TD_FD_DEBUG
#include <set>
#include <mutex>
#endif
namespace td { namespace td {
#if TD_FD_DEBUG
class FdSet {
public:
void on_create_fd(NativeFd::Fd fd) {
CHECK(fd >= 0);
if (is_stdio(fd)) {
return;
}
std::unique_lock<std::mutex> guard(mutex_);
if (fds_.count(fd) > 1) {
LOG(FATAL) << "Create duplicated fd: " << fd;
}
fds_.insert(fd);
}
void on_release_fd(NativeFd::Fd fd) {
CHECK(fd >= 0);
if (is_stdio(fd)) {
return;
}
LOG(FATAL) << "Unexpected release of non stdio NativeFd: " << fd;
}
Status validate(NativeFd::Fd fd) {
if (fd < 0) {
return Status::Error(PSLICE() << "Invalid fd: " << fd);
}
if (is_stdio(fd)) {
return Status::OK();
}
std::unique_lock<std::mutex> guard(mutex_);
if (fds_.count(fd) != 1) {
return Status::Error(PSLICE() << "Unknown fd: " << fd);
}
return Status::OK();
}
void on_close_fd(NativeFd::Fd fd) {
CHECK(fd >= 0);
if (is_stdio(fd)) {
return;
}
std::unique_lock<std::mutex> guard(mutex_);
if (fds_.count(fd) != 1) {
LOG(FATAL) << "Close unknown fd: " << fd;
}
fds_.erase(fd);
}
private:
std::mutex mutex_;
std::set<NativeFd::Fd> fds_;
bool is_stdio(NativeFd::Fd fd) {
return fd >= 0 && fd <= 2;
}
};
namespace {
FdSet &get_fd_set() {
static FdSet res;
return res;
}
} // namespace
#endif
Status NativeFd::validate() const {
#if TD_FD_DEBUG
return get_fd_set().validate(fd_.get());
#else
return Status::OK();
#endif
}
NativeFd::NativeFd(Fd fd) : fd_(fd) { NativeFd::NativeFd(Fd fd) : fd_(fd) {
VLOG(fd) << *this << " create"; VLOG(fd) << *this << " create";
#if TD_FD_DEBUG
get_fd_set().on_create_fd(fd);
#endif
} }
NativeFd::NativeFd(Fd fd, bool nolog) : fd_(fd) { NativeFd::NativeFd(Fd fd, bool nolog) : fd_(fd) {
#if TD_FD_DEBUG
get_fd_set().on_create_fd(fd);
#endif
} }
#if TD_PORT_WINDOWS #if TD_PORT_WINDOWS
@ -29,6 +114,14 @@ NativeFd::NativeFd(Socket socket) : fd_(reinterpret_cast<Fd>(socket)), is_socket
VLOG(fd) << *this << " create"; VLOG(fd) << *this << " create";
} }
#endif #endif
NativeFd &NativeFd::operator=(NativeFd &&from) {
close();
fd_ = std::move(from.fd_);
#if TD_PORT_WINDOWS
is_socket_ = from.is_socket_;
#endif
return *this;
}
NativeFd::~NativeFd() { NativeFd::~NativeFd() {
close(); close();
@ -105,6 +198,11 @@ void NativeFd::close() {
if (!*this) { if (!*this) {
return; return;
} }
#if TD_FD_DEBUG
get_fd_set().on_close_fd(fd());
#endif
VLOG(fd) << *this << " close"; VLOG(fd) << *this << " close";
#if TD_PORT_WINDOWS #if TD_PORT_WINDOWS
if (is_socket_ ? closesocket(socket()) : !CloseHandle(fd())) { if (is_socket_ ? closesocket(socket()) : !CloseHandle(fd())) {

View File

@ -26,7 +26,7 @@ class NativeFd {
#endif #endif
NativeFd() = default; NativeFd() = default;
NativeFd(NativeFd &&) = default; NativeFd(NativeFd &&) = default;
NativeFd &operator=(NativeFd &&) = default; NativeFd &operator=(NativeFd &&);
explicit NativeFd(Fd fd); explicit NativeFd(Fd fd);
NativeFd(Fd fd, bool nolog); NativeFd(Fd fd, bool nolog);
#if TD_PORT_WINDOWS #if TD_PORT_WINDOWS
@ -52,6 +52,8 @@ class NativeFd {
void close(); void close();
Fd release(); Fd release();
Status validate() const;
private: private:
#if TD_PORT_POSIX #if TD_PORT_POSIX
MovableValue<Fd, -1> fd_; MovableValue<Fd, -1> fd_;