Move Iocp to separate files.

GitOrigin-RevId: 8b60ea4ab775d264b70987316aac8141719d3a6b
This commit is contained in:
levlam 2018-09-11 17:43:43 +03:00
parent 94f2fca814
commit 4df6f95818
10 changed files with 170 additions and 136 deletions

View File

@ -14,10 +14,6 @@
#include "td/utils/MpscPollableQueue.h" #include "td/utils/MpscPollableQueue.h"
#include "td/utils/port/thread_local.h" #include "td/utils/port/thread_local.h"
#if TD_PORT_WINDOWS
#include "td/utils/port/detail/WineventPoll.h"
#endif
#include <memory> #include <memory>
namespace td { namespace td {
@ -61,7 +57,7 @@ void ConcurrentScheduler::init(int32 threads_n) {
} }
#if TD_PORT_WINDOWS #if TD_PORT_WINDOWS
iocp_ = std::make_unique<detail::IOCP>(); iocp_ = std::make_unique<detail::Iocp>();
iocp_->init(); iocp_->init();
#endif #endif
@ -86,7 +82,7 @@ void ConcurrentScheduler::start() {
threads_.push_back(td::thread([&, tid = i]() { threads_.push_back(td::thread([&, tid = i]() {
set_thread_id(static_cast<int32>(tid)); set_thread_id(static_cast<int32>(tid));
#if TD_PORT_WINDOWS #if TD_PORT_WINDOWS
td::detail::IOCP::Guard iocp_guard(iocp_.get()); td::detail::Iocp::Guard iocp_guard(iocp_.get());
#endif #endif
while (!is_finished()) { while (!is_finished()) {
sched->run(10); sched->run(10);
@ -110,7 +106,7 @@ bool ConcurrentScheduler::run_main(double timeout) {
auto &main_sched = schedulers_[0]; auto &main_sched = schedulers_[0];
if (!is_finished()) { if (!is_finished()) {
#if TD_PORT_WINDOWS #if TD_PORT_WINDOWS
td::detail::IOCP::Guard iocp_guard(iocp_.get()); td::detail::Iocp::Guard iocp_guard(iocp_.get());
#endif #endif
main_sched->run(timeout); main_sched->run(timeout);
} }
@ -126,7 +122,7 @@ void ConcurrentScheduler::finish() {
SCOPE_EXIT { SCOPE_EXIT {
iocp_->clear(); iocp_->clear();
}; };
td::detail::IOCP::Guard iocp_guard(iocp_.get()); td::detail::Iocp::Guard iocp_guard(iocp_.get());
#endif #endif
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED

View File

@ -13,6 +13,10 @@
#include "td/utils/port/thread.h" #include "td/utils/port/thread.h"
#include "td/utils/Slice.h" #include "td/utils/Slice.h"
#if TD_PORT_WINDOWS
#include "td/utils/port/detail/Iocp.h"
#endif
#include <atomic> #include <atomic>
#include <functional> #include <functional>
#include <mutex> #include <mutex>
@ -20,12 +24,6 @@
namespace td { namespace td {
#if TD_PORT_WINDOWS
namespace detail {
class IOCP;
}
#endif
class ConcurrentScheduler : private Scheduler::Callback { class ConcurrentScheduler : private Scheduler::Callback {
public: public:
void init(int32 threads_n); void init(int32 threads_n);
@ -87,7 +85,7 @@ class ConcurrentScheduler : private Scheduler::Callback {
std::vector<thread> threads_; std::vector<thread> threads_;
#endif #endif
#if TD_PORT_WINDOWS #if TD_PORT_WINDOWS
std::unique_ptr<detail::IOCP> iocp_; std::unique_ptr<detail::Iocp> iocp_;
td::thread iocp_thread_; td::thread iocp_thread_;
#endif #endif

View File

@ -52,6 +52,7 @@ set(TDUTILS_SOURCE
td/utils/port/detail/EventFdBsd.cpp td/utils/port/detail/EventFdBsd.cpp
td/utils/port/detail/EventFdLinux.cpp td/utils/port/detail/EventFdLinux.cpp
td/utils/port/detail/EventFdWindows.cpp td/utils/port/detail/EventFdWindows.cpp
td/utils/port/detail/Iocp.cpp
td/utils/port/detail/KQueue.cpp td/utils/port/detail/KQueue.cpp
td/utils/port/detail/NativeFd.cpp td/utils/port/detail/NativeFd.cpp
td/utils/port/detail/Poll.cpp td/utils/port/detail/Poll.cpp
@ -117,6 +118,7 @@ set(TDUTILS_SOURCE
td/utils/port/detail/EventFdBsd.h td/utils/port/detail/EventFdBsd.h
td/utils/port/detail/EventFdLinux.h td/utils/port/detail/EventFdLinux.h
td/utils/port/detail/EventFdWindows.h td/utils/port/detail/EventFdWindows.h
td/utils/port/detail/Iocp.h
td/utils/port/detail/KQueue.h td/utils/port/detail/KQueue.h
td/utils/port/detail/NativeFd.h td/utils/port/detail/NativeFd.h
td/utils/port/detail/Poll.h td/utils/port/detail/Poll.h

View File

@ -25,7 +25,7 @@
#endif #endif
#if TD_PORT_WINDOWS #if TD_PORT_WINDOWS
#include "td/utils/port/detail/WineventPoll.h" #include "td/utils/port/detail/Iocp.h"
#include "td/utils/SpinLock.h" #include "td/utils/SpinLock.h"
#include "td/utils/VectorQueue.h" #include "td/utils/VectorQueue.h"
#endif #endif
@ -37,11 +37,11 @@ namespace td {
namespace detail { namespace detail {
#if TD_PORT_WINDOWS #if TD_PORT_WINDOWS
class ServerSocketFdImpl : private IOCP::Callback { class ServerSocketFdImpl : private Iocp::Callback {
public: public:
ServerSocketFdImpl(NativeFd fd, int socket_family) : info_(std::move(fd)), socket_family_(socket_family) { ServerSocketFdImpl(NativeFd fd, int socket_family) : info_(std::move(fd)), socket_family_(socket_family) {
VLOG(fd) << get_native_fd().socket() << " create ServerSocketFd"; VLOG(fd) << get_native_fd().socket() << " create ServerSocketFd";
IOCP::get()->subscribe(get_native_fd(), this); Iocp::get()->subscribe(get_native_fd(), this);
notify_iocp_read(); notify_iocp_read();
} }
void close() { void close() {
@ -185,11 +185,11 @@ class ServerSocketFdImpl : private IOCP::Callback {
void notify_iocp_read() { void notify_iocp_read() {
VLOG(fd) << get_native_fd().socket() << " notify_read"; VLOG(fd) << get_native_fd().socket() << " notify_read";
inc_refcnt(); inc_refcnt();
IOCP::get()->post(0, this, nullptr); Iocp::get()->post(0, this, nullptr);
} }
void notify_iocp_close() { void notify_iocp_close() {
VLOG(fd) << get_native_fd().socket() << " notify_close"; VLOG(fd) << get_native_fd().socket() << " notify_close";
IOCP::get()->post(0, this, reinterpret_cast<WSAOVERLAPPED *>(&close_overlapped_)); Iocp::get()->post(0, this, reinterpret_cast<WSAOVERLAPPED *>(&close_overlapped_));
} }
}; };
void ServerSocketFdImplDeleter::operator()(ServerSocketFdImpl *impl) { void ServerSocketFdImplDeleter::operator()(ServerSocketFdImpl *impl) {

View File

@ -12,7 +12,7 @@
#if TD_PORT_WINDOWS #if TD_PORT_WINDOWS
#include "td/utils/buffer.h" #include "td/utils/buffer.h"
#include "td/utils/port/detail/WineventPoll.h" #include "td/utils/port/detail/Iocp.h"
#include "td/utils/SpinLock.h" #include "td/utils/SpinLock.h"
#include "td/utils/VectorQueue.h" #include "td/utils/VectorQueue.h"
#endif #endif
@ -33,12 +33,12 @@
namespace td { namespace td {
namespace detail { namespace detail {
#if TD_PORT_WINDOWS #if TD_PORT_WINDOWS
class SocketFdImpl : private IOCP::Callback { class SocketFdImpl : private Iocp::Callback {
public: public:
explicit SocketFdImpl(NativeFd native_fd) : info(std::move(native_fd)) { explicit SocketFdImpl(NativeFd native_fd) : info(std::move(native_fd)) {
VLOG(fd) << get_native_fd().socket() << " create from native_fd"; VLOG(fd) << get_native_fd().socket() << " create from native_fd";
get_poll_info().add_flags(PollFlags::Write()); get_poll_info().add_flags(PollFlags::Write());
IOCP::get()->subscribe(get_native_fd(), this); Iocp::get()->subscribe(get_native_fd(), this);
is_read_active_ = true; is_read_active_ = true;
notify_iocp_connected(); notify_iocp_connected();
} }
@ -46,7 +46,7 @@ class SocketFdImpl : private IOCP::Callback {
SocketFdImpl(NativeFd native_fd, const IPAddress &addr) : info(std::move(native_fd)) { SocketFdImpl(NativeFd native_fd, const IPAddress &addr) : info(std::move(native_fd)) {
VLOG(fd) << get_native_fd().socket() << " create from native_fd and connect"; VLOG(fd) << get_native_fd().socket() << " create from native_fd and connect";
get_poll_info().add_flags(PollFlags::Write()); get_poll_info().add_flags(PollFlags::Write());
IOCP::get()->subscribe(get_native_fd(), this); Iocp::get()->subscribe(get_native_fd(), this);
LPFN_CONNECTEX ConnectExPtr = nullptr; LPFN_CONNECTEX ConnectExPtr = nullptr;
GUID guid = WSAID_CONNECTEX; GUID guid = WSAID_CONNECTEX;
DWORD numBytes; DWORD numBytes;
@ -294,14 +294,14 @@ class SocketFdImpl : private IOCP::Callback {
void notify_iocp_write() { void notify_iocp_write() {
inc_refcnt(); inc_refcnt();
IOCP::get()->post(0, this, nullptr); Iocp::get()->post(0, this, nullptr);
} }
void notify_iocp_close() { void notify_iocp_close() {
IOCP::get()->post(0, this, reinterpret_cast<WSAOVERLAPPED *>(&close_overlapped_)); Iocp::get()->post(0, this, reinterpret_cast<WSAOVERLAPPED *>(&close_overlapped_));
} }
void notify_iocp_connected() { void notify_iocp_connected() {
inc_refcnt(); inc_refcnt();
IOCP::get()->post(0, this, &read_overlapped_); Iocp::get()->post(0, this, &read_overlapped_);
} }
}; };
@ -498,7 +498,8 @@ Result<SocketFd> SocketFd::open(const IPAddress &address) {
TRY_STATUS(detail::init_socket_options(native_fd)); TRY_STATUS(detail::init_socket_options(native_fd));
#if TD_PORT_POSIX #if TD_PORT_POSIX
int e_connect = connect(native_fd.socket(), address.get_sockaddr(), narrow_cast<socklen_t>(address.get_sockaddr_len())); int e_connect =
connect(native_fd.socket(), address.get_sockaddr(), narrow_cast<socklen_t>(address.get_sockaddr_len()));
if (e_connect == -1) { if (e_connect == -1) {
auto connect_errno = errno; auto connect_errno = errno;
if (connect_errno != EINPROGRESS) { if (connect_errno != EINPROGRESS) {

View File

@ -15,7 +15,7 @@
#include "td/utils/VectorQueue.h" #include "td/utils/VectorQueue.h"
#if TD_PORT_WINDOWS #if TD_PORT_WINDOWS
#include "td/utils/port/detail/WineventPoll.h" #include "td/utils/port/detail/Iocp.h"
#include "td/utils/SpinLock.h" #include "td/utils/SpinLock.h"
#endif #endif
@ -98,11 +98,11 @@ class UdpSocketSendHelper {
WSABUF buf_; WSABUF buf_;
}; };
class UdpSocketFdImpl : private IOCP::Callback { class UdpSocketFdImpl : private Iocp::Callback {
public: public:
explicit UdpSocketFdImpl(NativeFd fd) : info_(std::move(fd)) { explicit UdpSocketFdImpl(NativeFd fd) : info_(std::move(fd)) {
get_poll_info().add_flags(PollFlags::Write()); get_poll_info().add_flags(PollFlags::Write());
IOCP::get()->subscribe(get_native_fd(), this); Iocp::get()->subscribe(get_native_fd(), this);
is_receive_active_ = true; is_receive_active_ = true;
notify_iocp_connected(); notify_iocp_connected();
} }
@ -339,14 +339,14 @@ class UdpSocketFdImpl : private IOCP::Callback {
void notify_iocp_send() { void notify_iocp_send() {
inc_refcnt(); inc_refcnt();
IOCP::get()->post(0, this, nullptr); Iocp::get()->post(0, this, nullptr);
} }
void notify_iocp_close() { void notify_iocp_close() {
IOCP::get()->post(0, this, reinterpret_cast<WSAOVERLAPPED *>(&close_overlapped_)); Iocp::get()->post(0, this, reinterpret_cast<WSAOVERLAPPED *>(&close_overlapped_));
} }
void notify_iocp_connected() { void notify_iocp_connected() {
inc_refcnt(); inc_refcnt();
IOCP::get()->post(0, this, reinterpret_cast<WSAOVERLAPPED *>(&receive_overlapped_)); Iocp::get()->post(0, this, reinterpret_cast<WSAOVERLAPPED *>(&receive_overlapped_));
} }
}; };

View File

@ -0,0 +1,87 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/utils/port/detail/Iocp.h"
char disable_linker_warning_about_empty_file_iocp_cpp TD_UNUSED;
#ifdef TD_PORT_WINDOWS
#include "td/utils/logging.h"
namespace td {
namespace detail {
Iocp::~Iocp() {
clear();
}
void Iocp::loop() {
Iocp::Guard guard(this);
while (true) {
DWORD bytes = 0;
ULONG_PTR key = 0;
WSAOVERLAPPED *overlapped = nullptr;
BOOL ok =
GetQueuedCompletionStatus(iocp_handle_.fd(), &bytes, &key, reinterpret_cast<OVERLAPPED **>(&overlapped), 1000);
if (bytes || key || overlapped) {
// LOG(ERROR) << "Got IOCP " << bytes << " " << key << " " << overlapped;
}
if (ok) {
auto callback = reinterpret_cast<Iocp::Callback *>(key);
if (callback == nullptr) {
// LOG(ERROR) << "Interrupt IOCP loop";
return;
}
callback->on_iocp(bytes, overlapped);
} else {
if (overlapped != nullptr) {
auto error = OS_ERROR("Received from IOCP");
auto callback = reinterpret_cast<Iocp::Callback *>(key);
CHECK(callback != nullptr);
callback->on_iocp(std::move(error), overlapped);
}
}
}
}
void Iocp::interrupt_loop() {
post(0, nullptr, nullptr);
}
void Iocp::init() {
CHECK(!iocp_handle_);
auto res = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0);
if (res == nullptr) {
auto error = OS_ERROR("IOCP creation failed");
LOG(FATAL) << error;
}
iocp_handle_ = NativeFd(res);
}
void Iocp::clear() {
iocp_handle_.close();
}
void Iocp::subscribe(const NativeFd &native_fd, Callback *callback) {
CHECK(iocp_handle_);
auto iocp_handle =
CreateIoCompletionPort(native_fd.fd(), iocp_handle_.fd(), reinterpret_cast<ULONG_PTR>(callback), 0);
if (iocp_handle == INVALID_HANDLE_VALUE) {
auto error = OS_ERROR("CreateIoCompletionPort");
LOG(FATAL) << error;
}
CHECK(iocp_handle == iocp_handle_.fd()) << iocp_handle << " " << iocp_handle_.fd();
}
void Iocp::post(size_t size, Callback *callback, WSAOVERLAPPED *overlapped) {
PostQueuedCompletionStatus(iocp_handle_.fd(), DWORD(size), reinterpret_cast<ULONG_PTR>(callback),
reinterpret_cast<OVERLAPPED *>(overlapped));
}
} // namespace detail
} // namespace td
#endif

View File

@ -0,0 +1,52 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/utils/port/config.h"
#ifdef TD_PORT_WINDOWS
#include "td/utils/common.h"
#include "td/utils/Context.h"
#include "td/utils/port/detail/NativeFd.h"
#include "td/utils/port/thread.h"
#include "td/utils/Status.h"
namespace td {
namespace detail {
class Iocp final : public Context<Iocp> {
public:
Iocp() = default;
Iocp(const Iocp &) = delete;
Iocp &operator=(const Iocp &) = delete;
Iocp(Iocp &&) = delete;
Iocp &operator=(Iocp &&) = delete;
~Iocp();
class Callback {
public:
virtual ~Callback() = default;
virtual void on_iocp(Result<size_t> r_size, WSAOVERLAPPED *overlapped) = 0;
};
void init();
void subscribe(const NativeFd &fd, Callback *callback);
void post(size_t size, Callback *callback, WSAOVERLAPPED *overlapped);
void loop();
void interrupt_loop();
void clear();
private:
NativeFd iocp_handle_;
std::vector<td::thread> workers_;
};
} // namespace detail
} // namespace td
#endif

View File

@ -10,81 +10,10 @@ char disable_linker_warning_about_empty_file_wineventpoll_cpp TD_UNUSED;
#ifdef TD_POLL_WINEVENT #ifdef TD_POLL_WINEVENT
#include "td/utils/common.h"
#include "td/utils/logging.h" #include "td/utils/logging.h"
#include "td/utils/port/PollBase.h"
#include "td/utils/Status.h"
#include <utility>
namespace td { namespace td {
namespace detail { namespace detail {
IOCP::~IOCP() {
clear();
}
void IOCP::loop() {
IOCP::Guard guard(this);
while (true) {
DWORD bytes = 0;
ULONG_PTR key = 0;
WSAOVERLAPPED *overlapped = nullptr;
BOOL ok =
GetQueuedCompletionStatus(iocp_handle_.fd(), &bytes, &key, reinterpret_cast<OVERLAPPED **>(&overlapped), 1000);
if (bytes || key || overlapped) {
// LOG(ERROR) << "Got IOCP " << bytes << " " << key << " " << overlapped;
}
if (ok) {
auto callback = reinterpret_cast<IOCP::Callback *>(key);
if (callback == nullptr) {
// LOG(ERROR) << "Interrupt IOCP loop";
return;
}
callback->on_iocp(bytes, overlapped);
} else {
if (overlapped != nullptr) {
auto error = OS_ERROR("Received from IOCP");
auto callback = reinterpret_cast<IOCP::Callback *>(key);
CHECK(callback != nullptr);
callback->on_iocp(std::move(error), overlapped);
}
}
}
}
void IOCP::interrupt_loop() {
post(0, nullptr, nullptr);
}
void IOCP::init() {
CHECK(!iocp_handle_);
auto res = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0);
if (res == nullptr) {
auto error = OS_ERROR("IOCP creation failed");
LOG(FATAL) << error;
}
iocp_handle_ = NativeFd(res);
}
void IOCP::clear() {
iocp_handle_.close();
}
void IOCP::subscribe(const NativeFd &native_fd, Callback *callback) {
CHECK(iocp_handle_);
auto iocp_handle =
CreateIoCompletionPort(native_fd.fd(), iocp_handle_.fd(), reinterpret_cast<ULONG_PTR>(callback), 0);
if (iocp_handle == INVALID_HANDLE_VALUE) {
auto error = OS_ERROR("CreateIoCompletionPort");
LOG(FATAL) << error;
}
CHECK(iocp_handle == iocp_handle_.fd()) << iocp_handle << " " << iocp_handle_.fd();
}
void IOCP::post(size_t size, Callback *callback, WSAOVERLAPPED *overlapped) {
PostQueuedCompletionStatus(iocp_handle_.fd(), DWORD(size), reinterpret_cast<ULONG_PTR>(callback),
reinterpret_cast<OVERLAPPED *>(overlapped));
}
void WineventPoll::init() { void WineventPoll::init() {
} }

View File

@ -11,44 +11,13 @@
#ifdef TD_POLL_WINEVENT #ifdef TD_POLL_WINEVENT
#include "td/utils/common.h" #include "td/utils/common.h"
#include "td/utils/Context.h"
#include "td/utils/port/detail/NativeFd.h"
#include "td/utils/port/detail/PollableFd.h" #include "td/utils/port/detail/PollableFd.h"
#include "td/utils/port/PollBase.h" #include "td/utils/port/PollBase.h"
#include "td/utils/port/PollFlags.h" #include "td/utils/port/PollFlags.h"
#include "td/utils/port/thread.h"
#include "td/utils/Status.h"
namespace td { namespace td {
namespace detail { namespace detail {
class IOCP final : public Context<IOCP> {
public:
IOCP() = default;
IOCP(const IOCP &) = delete;
IOCP &operator=(const IOCP &) = delete;
IOCP(IOCP &&) = delete;
IOCP &operator=(IOCP &&) = delete;
~IOCP();
class Callback {
public:
virtual ~Callback() = default;
virtual void on_iocp(Result<size_t> r_size, WSAOVERLAPPED *overlapped) = 0;
};
void init();
void subscribe(const NativeFd &fd, Callback *callback);
void post(size_t size, Callback *callback, WSAOVERLAPPED *overlapped);
void loop();
void interrupt_loop();
void clear();
private:
NativeFd iocp_handle_;
std::vector<td::thread> workers_;
};
class WineventPoll final : public PollBase { class WineventPoll final : public PollBase {
public: public:
WineventPoll() = default; WineventPoll() = default;