EventFd: wait

GitOrigin-RevId: 1ae5d1c3d1316a7c5a868c4246b406182915b399
This commit is contained in:
Arseny Smirnov 2018-09-13 16:27:09 +03:00
parent 9225edd757
commit 02e83aad78
8 changed files with 23 additions and 22 deletions

View File

@ -225,16 +225,9 @@ class Client::Impl final {
~Impl() { ~Impl() {
input_queue_->writer_put({0, nullptr}); input_queue_->writer_put({0, nullptr});
scheduler_thread_.join(); scheduler_thread_.join();
#if !TD_WINDOWS
auto &event_fd = output_queue_->reader_get_event_fd();
poll_.unsubscribe(event_fd.get_poll_info().get_pollable_fd_ref());
#endif
} }
private: private:
#if !TD_WINDOWS
Poll poll_;
#endif
std::shared_ptr<InputQueue> input_queue_; std::shared_ptr<InputQueue> input_queue_;
std::shared_ptr<OutputQueue> output_queue_; std::shared_ptr<OutputQueue> output_queue_;
std::shared_ptr<ConcurrentScheduler> scheduler_; std::shared_ptr<ConcurrentScheduler> scheduler_;
@ -257,12 +250,6 @@ class Client::Impl final {
} }
scheduler->finish(); scheduler->finish();
}); });
#if !TD_WINDOWS
poll_.init();
auto &event_fd = output_queue_->reader_get_event_fd();
poll_.subscribe(event_fd.get_poll_info().extract_pollable_fd(nullptr), PollFlags::Read());
#endif
} }
Response receive_unlocked(double timeout) { Response receive_unlocked(double timeout) {
@ -274,11 +261,7 @@ class Client::Impl final {
return output_queue_->reader_get_unsafe(); return output_queue_->reader_get_unsafe();
} }
if (timeout != 0) { if (timeout != 0) {
#if TD_WINDOWS
output_queue_->reader_get_event_fd().wait(static_cast<int>(timeout * 1000)); output_queue_->reader_get_event_fd().wait(static_cast<int>(timeout * 1000));
#else
poll_.run(static_cast<int>(timeout * 1000));
#endif
return receive_unlocked(0); return receive_unlocked(0);
} }
return {0, nullptr}; return {0, nullptr};

View File

@ -27,5 +27,6 @@ class EventFdBase {
virtual Status get_pending_error() TD_WARN_UNUSED_RESULT = 0; virtual Status get_pending_error() TD_WARN_UNUSED_RESULT = 0;
virtual void release() = 0; virtual void release() = 0;
virtual void acquire() = 0; virtual void acquire() = 0;
virtual void wait(int timeout_ms) = 0;
}; };
} // namespace td } // namespace td

View File

@ -19,6 +19,7 @@ char disable_linker_warning_about_empty_file_event_fd_bsd_cpp TD_UNUSED;
#include <fcntl.h> #include <fcntl.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/types.h> #include <sys/types.h>
#include <poll.h>
namespace td { namespace td {
namespace detail { namespace detail {
@ -88,6 +89,13 @@ void EventFdBsd::acquire() {
} }
} }
void EventFdBsd::wait(int timeout_ms) {
pollfd fd;
fd.fd = get_poll_info().native_fd().fd();
fd.events = POLLIN;
poll(&fd, 1, timeout_ms);
}
} // namespace detail } // namespace detail
} // namespace td } // namespace td

View File

@ -39,6 +39,8 @@ class EventFdBsd final : public EventFdBase {
void release() override; void release() override;
void acquire() override; void acquire() override;
void wait(int timeout_ms) override;
}; };
} // namespace detail } // namespace detail

View File

@ -18,6 +18,7 @@ char disable_linker_warning_about_empty_file_event_fd_linux_cpp TD_UNUSED;
#include <sys/eventfd.h> #include <sys/eventfd.h>
#include <unistd.h> #include <unistd.h>
#include <poll.h>
namespace td { namespace td {
namespace detail { namespace detail {
@ -106,6 +107,13 @@ void EventFdLinux::acquire() {
} }
} }
void EventFdLinux::wait(int timeout_ms) {
pollfd fd;
fd.fd = get_poll_info().native_fd().fd();
fd.events = POLLIN;
poll(&fd, 1, timeout_ms);
}
} // namespace detail } // namespace detail
} // namespace td } // namespace td

View File

@ -41,6 +41,8 @@ class EventFdLinux final : public EventFdBase {
void release() override; void release() override;
void acquire() override; void acquire() override;
void wait(int timeout_ms) override;
}; };
} // namespace detail } // namespace detail

View File

@ -39,7 +39,7 @@ class EventFdWindows final : public EventFdBase {
void acquire() override; void acquire() override;
void wait(int timeout_ms); void wait(int timeout_ms) override;
}; };
} // namespace detail } // namespace detail

View File

@ -400,10 +400,7 @@ class PollQueue : public QueueT {
while ((res = reader_wait_nonblock()) == 0) { while ((res = reader_wait_nonblock()) == 0) {
// TODO: reader_flush? // TODO: reader_flush?
pollfd fd; reader_get_event_fd().wait(1000);
fd.fd = reader_get_event_fd().get_poll_info().native_fd().fd();
fd.events = POLLIN;
poll(&fd, 1, -1);
} }
return res; return res;
} }