diff --git a/tdutils/td/utils/config.h.in b/tdutils/td/utils/config.h.in index ce6edb16..85f7bba7 100644 --- a/tdutils/td/utils/config.h.in +++ b/tdutils/td/utils/config.h.in @@ -6,3 +6,4 @@ #cmakedefine01 TD_HAVE_COROUTINES #cmakedefine01 TD_HAVE_ABSL #cmakedefine01 TD_HAVE_GETOPT +#cmakedefine01 TD_FD_DEBUG diff --git a/tdutils/td/utils/port/detail/Epoll.cpp b/tdutils/td/utils/port/detail/Epoll.cpp index 0fa6eb53..5c371c52 100644 --- a/tdutils/td/utils/port/detail/Epoll.cpp +++ b/tdutils/td/utils/port/detail/Epoll.cpp @@ -71,7 +71,7 @@ void Epoll::unsubscribe(PollableFdRef fd_ref) { int err = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, native_fd, nullptr); auto epoll_ctl_errno = errno; LOG_IF(FATAL, err == -1) << Status::PosixError(epoll_ctl_errno, "epoll_ctl DEL failed") << ", epoll_fd = " << epoll_fd - << ", fd = " << native_fd; + << ", fd = " << native_fd << fd.native_fd().validate(); } void Epoll::unsubscribe_before_close(PollableFdRef fd) { diff --git a/tdutils/td/utils/port/detail/KQueue.cpp b/tdutils/td/utils/port/detail/KQueue.cpp index 3c474442..3e2c4509 100644 --- a/tdutils/td/utils/port/detail/KQueue.cpp +++ b/tdutils/td/utils/port/detail/KQueue.cpp @@ -21,37 +21,33 @@ char disable_linker_warning_about_empty_file_kqueue_cpp TD_UNUSED; namespace td { namespace detail { -KQueue::KQueue() { - kq = -1; -} KQueue::~KQueue() { clear(); } void KQueue::init() { - kq = kqueue(); + kq_ = NativeFd(kqueue()); auto kqueue_errno = errno; - LOG_IF(FATAL, kq == -1) << Status::PosixError(kqueue_errno, "kqueue creation failed"); + LOG_IF(FATAL, !kq_) << Status::PosixError(kqueue_errno, "kqueue creation failed"); // TODO: const - events.resize(1000); - changes_n = 0; + events_.resize(1000); + changes_n_ = 0; } void KQueue::clear() { - if (kq == -1) { + if (!kq_) { return; } - events.clear(); - close(kq); - kq = -1; - for (auto *list_node = list_root.next; list_node != &list_root;) { + events_.clear(); + kq_.close(); + for (auto *list_node = list_root_.next; list_node != &list_root_;) { auto pollable_fd = PollableFd::from_list_node(list_node); list_node = list_node->next; } } int KQueue::update(int nevents, const timespec *timeout, bool may_fail) { - int err = kevent(kq, &events[0], changes_n, &events[0], nevents, timeout); + int err = kevent(kq_.fd(), &events_[0], changes_n_, &events_[0], nevents, timeout); auto kevent_errno = errno; bool is_fatal_error = [&] { @@ -65,7 +61,7 @@ int KQueue::update(int nevents, const timespec *timeout, bool may_fail) { }(); LOG_IF(FATAL, is_fatal_error) << Status::PosixError(kevent_errno, "kevent failed"); - changes_n = 0; + changes_n_ = 0; if (err < 0) { return 0; } @@ -73,7 +69,7 @@ int KQueue::update(int nevents, const timespec *timeout, bool may_fail) { } void KQueue::flush_changes(bool may_fail) { - if (!changes_n) { + if (!changes_n_) { return; } int n = update(0, nullptr, may_fail); @@ -82,18 +78,18 @@ void KQueue::flush_changes(bool may_fail) { void KQueue::add_change(std::uintptr_t ident, int16 filter, uint16 flags, uint32 fflags, std::intptr_t data, void *udata) { - if (changes_n == static_cast(events.size())) { + if (changes_n_ == static_cast(events_.size())) { flush_changes(); } - EV_SET(&events[changes_n], ident, filter, flags, fflags, data, udata); + EV_SET(&events_[changes_n_], ident, filter, flags, fflags, data, udata); VLOG(fd) << "Subscribe [fd:" << ident << "] [filter:" << filter << "] [udata: " << udata << "]"; - changes_n++; + changes_n_++; } void KQueue::subscribe(PollableFd fd, PollFlags flags) { auto native_fd = fd.native_fd().fd(); auto list_node = fd.release_as_list_node(); - list_root.put(list_node); + list_root_.put(list_node); if (flags.can_read()) { add_change(native_fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, list_node); } @@ -103,10 +99,10 @@ void KQueue::subscribe(PollableFd fd, PollFlags flags) { } void KQueue::invalidate(int native_fd) { - for (int i = 0; i < changes_n; i++) { - if (events[i].ident == static_cast(native_fd)) { - changes_n--; - std::swap(events[i], events[changes_n]); + for (int i = 0; i < changes_n_; i++) { + if (events_[i].ident == static_cast(native_fd)) { + changes_n_--; + std::swap(events_[i], events_[changes_n_]); i--; } } @@ -129,7 +125,7 @@ void KQueue::unsubscribe_before_close(PollableFdRef fd_ref) { invalidate(pollable_fd.native_fd().fd()); // just to avoid O(changes_n ^ 2) - if (changes_n != 0) { + if (changes_n_ != 0) { flush_changes(); } } @@ -145,9 +141,9 @@ void KQueue::run(int timeout_ms) { timeout_ptr = &timeout_data; } - int n = update(static_cast(events.size()), timeout_ptr); + int n = update(static_cast(events_.size()), timeout_ptr); for (int i = 0; i < n; i++) { - struct kevent *event = &events[i]; + struct kevent *event = &events_[i]; PollFlags flags; if (event->filter == EVFILT_WRITE) { flags.add_flags(PollFlags::Write()); diff --git a/tdutils/td/utils/port/detail/KQueue.h b/tdutils/td/utils/port/detail/KQueue.h index a000def6..d054ed36 100644 --- a/tdutils/td/utils/port/detail/KQueue.h +++ b/tdutils/td/utils/port/detail/KQueue.h @@ -27,7 +27,7 @@ namespace detail { class KQueue final : public PollBase { public: - KQueue(); + KQueue() = default; KQueue(const KQueue &) = delete; KQueue &operator=(const KQueue &) = delete; KQueue(KQueue &&) = delete; @@ -51,10 +51,10 @@ class KQueue final : public PollBase { } private: - vector events; - int changes_n; - int kq; - ListNode list_root; + vector events_; + int changes_n_; + NativeFd kq_; + ListNode list_root_; int update(int nevents, const timespec *timeout, bool may_fail = false); diff --git a/tdutils/td/utils/port/detail/NativeFd.cpp b/tdutils/td/utils/port/detail/NativeFd.cpp index c4773cee..b36cb55d 100644 --- a/tdutils/td/utils/port/detail/NativeFd.cpp +++ b/tdutils/td/utils/port/detail/NativeFd.cpp @@ -15,13 +15,98 @@ #include #endif +#if TD_FD_DEBUG +#include +#include +#endif + namespace td { +#if TD_FD_DEBUG +class FdSet { + public: + void on_create_fd(NativeFd::Fd fd) { + CHECK(fd >= 0); + if (is_stdio(fd)) { + return; + } + std::unique_lock guard(mutex_); + if (fds_.count(fd) > 1) { + LOG(FATAL) << "Create duplicated fd: " << fd; + } + fds_.insert(fd); + } + + void on_release_fd(NativeFd::Fd fd) { + CHECK(fd >= 0); + if (is_stdio(fd)) { + return; + } + LOG(FATAL) << "Unexpected release of non stdio NativeFd: " << fd; + } + + Status validate(NativeFd::Fd fd) { + if (fd < 0) { + return Status::Error(PSLICE() << "Invalid fd: " << fd); + } + if (is_stdio(fd)) { + return Status::OK(); + } + std::unique_lock guard(mutex_); + if (fds_.count(fd) != 1) { + return Status::Error(PSLICE() << "Unknown fd: " << fd); + } + return Status::OK(); + } + + void on_close_fd(NativeFd::Fd fd) { + CHECK(fd >= 0); + if (is_stdio(fd)) { + return; + } + std::unique_lock guard(mutex_); + if (fds_.count(fd) != 1) { + LOG(FATAL) << "Close unknown fd: " << fd; + } + fds_.erase(fd); + } + + private: + std::mutex mutex_; + std::set fds_; + + bool is_stdio(NativeFd::Fd fd) { + return fd >= 0 && fd <= 2; + } +}; + +namespace { +FdSet &get_fd_set() { + static FdSet res; + return res; +} +} // namespace + +#endif +Status NativeFd::validate() const { +#if TD_FD_DEBUG + return get_fd_set().validate(fd_.get()); +#else + return Status::OK(); +#endif +} + NativeFd::NativeFd(Fd fd) : fd_(fd) { VLOG(fd) << *this << " create"; +#if TD_FD_DEBUG + get_fd_set().on_create_fd(fd); +#endif } NativeFd::NativeFd(Fd fd, bool nolog) : fd_(fd) { +#if TD_FD_DEBUG + get_fd_set().on_create_fd(fd); +#endif } #if TD_PORT_WINDOWS @@ -29,6 +114,14 @@ NativeFd::NativeFd(Socket socket) : fd_(reinterpret_cast(socket)), is_socket VLOG(fd) << *this << " create"; } #endif +NativeFd &NativeFd::operator=(NativeFd &&from) { + close(); + fd_ = std::move(from.fd_); +#if TD_PORT_WINDOWS + is_socket_ = from.is_socket_; +#endif + return *this; +} NativeFd::~NativeFd() { close(); @@ -105,6 +198,11 @@ void NativeFd::close() { if (!*this) { return; } + +#if TD_FD_DEBUG + get_fd_set().on_close_fd(fd()); +#endif + VLOG(fd) << *this << " close"; #if TD_PORT_WINDOWS if (is_socket_ ? closesocket(socket()) : !CloseHandle(fd())) { diff --git a/tdutils/td/utils/port/detail/NativeFd.h b/tdutils/td/utils/port/detail/NativeFd.h index 38201f5b..2a2085ac 100644 --- a/tdutils/td/utils/port/detail/NativeFd.h +++ b/tdutils/td/utils/port/detail/NativeFd.h @@ -26,7 +26,7 @@ class NativeFd { #endif NativeFd() = default; NativeFd(NativeFd &&) = default; - NativeFd &operator=(NativeFd &&) = default; + NativeFd &operator=(NativeFd &&); explicit NativeFd(Fd fd); NativeFd(Fd fd, bool nolog); #if TD_PORT_WINDOWS @@ -52,6 +52,8 @@ class NativeFd { void close(); Fd release(); + Status validate() const; + private: #if TD_PORT_POSIX MovableValue fd_;