diff --git a/benchmark/bench_queue.cpp b/benchmark/bench_queue.cpp index 475ffa41..a0ca18a9 100644 --- a/benchmark/bench_queue.cpp +++ b/benchmark/bench_queue.cpp @@ -9,6 +9,8 @@ #include "td/utils/logging.h" #include "td/utils/MpscPollableQueue.h" #include "td/utils/queue.h" +#include "td/utils/Random.h" +#include "td/utils/port/sleep.h" // TODO: check system calls // TODO: all return values must be checked @@ -899,8 +901,36 @@ class RingBenchmark : public td::Benchmark { } }; +void test_queue() { + std::vector threads; + constexpr size_t threads_n = 100; + std::vector> queues(threads_n); + for (auto &q : queues) { + q.init(); + } + for (size_t i = 0; i < threads_n; i++) { + threads.emplace_back([&q = queues[i]] { + while (true) { + auto got = q.reader_wait_nonblock(); + while (got-- > 0) { + q.reader_get_unsafe(); + } + q.reader_get_event_fd().wait(1000); + } + }); + } + + while (true) { + td::usleep_for(100); + for (int i = 0; i < 5; i++) { + queues[td::Random::fast(0, threads_n - 1)].writer_put(1); + } + } +} + int main() { SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); + //test_queue(); #define BENCH_Q2(Q, N) \ std::fprintf(stderr, "!%s %d:\t", #Q, N); \ td::bench(QueueBenchmark2(N)); diff --git a/tdutils/td/utils/MpscPollableQueue.h b/tdutils/td/utils/MpscPollableQueue.h index 9167cbc7..a639b3ee 100644 --- a/tdutils/td/utils/MpscPollableQueue.h +++ b/tdutils/td/utils/MpscPollableQueue.h @@ -29,9 +29,9 @@ class MpscPollableQueue { return narrow_cast(ready); } + event_fd_.acquire(); auto guard = lock_.lock(); if (writer_vector_.empty()) { - event_fd_.acquire(); wait_event_fd_ = true; return 0; } else { @@ -52,6 +52,7 @@ class MpscPollableQueue { writer_vector_.push_back(std::move(value)); if (wait_event_fd_) { wait_event_fd_ = false; + guard.reset(); event_fd_.release(); } }