MpscPollableQueue: move system calls out of spin lock
GitOrigin-RevId: 6e826bcca1006aeadab4af84ad86ce2e402c10e4
This commit is contained in:
parent
948da791ad
commit
d4cb9d2d52
|
@ -9,6 +9,8 @@
|
||||||
#include "td/utils/logging.h"
|
#include "td/utils/logging.h"
|
||||||
#include "td/utils/MpscPollableQueue.h"
|
#include "td/utils/MpscPollableQueue.h"
|
||||||
#include "td/utils/queue.h"
|
#include "td/utils/queue.h"
|
||||||
|
#include "td/utils/Random.h"
|
||||||
|
#include "td/utils/port/sleep.h"
|
||||||
|
|
||||||
// TODO: check system calls
|
// TODO: check system calls
|
||||||
// TODO: all return values must be checked
|
// TODO: all return values must be checked
|
||||||
|
@ -899,8 +901,36 @@ class RingBenchmark : public td::Benchmark {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
void test_queue() {
|
||||||
|
std::vector<td::thread> threads;
|
||||||
|
constexpr size_t threads_n = 100;
|
||||||
|
std::vector<td::MpscPollableQueue<int>> queues(threads_n);
|
||||||
|
for (auto &q : queues) {
|
||||||
|
q.init();
|
||||||
|
}
|
||||||
|
for (size_t i = 0; i < threads_n; i++) {
|
||||||
|
threads.emplace_back([&q = queues[i]] {
|
||||||
|
while (true) {
|
||||||
|
auto got = q.reader_wait_nonblock();
|
||||||
|
while (got-- > 0) {
|
||||||
|
q.reader_get_unsafe();
|
||||||
|
}
|
||||||
|
q.reader_get_event_fd().wait(1000);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
td::usleep_for(100);
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
queues[td::Random::fast(0, threads_n - 1)].writer_put(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int main() {
|
int main() {
|
||||||
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
|
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
|
||||||
|
//test_queue();
|
||||||
#define BENCH_Q2(Q, N) \
|
#define BENCH_Q2(Q, N) \
|
||||||
std::fprintf(stderr, "!%s %d:\t", #Q, N); \
|
std::fprintf(stderr, "!%s %d:\t", #Q, N); \
|
||||||
td::bench(QueueBenchmark2<Q>(N));
|
td::bench(QueueBenchmark2<Q>(N));
|
||||||
|
|
|
@ -29,9 +29,9 @@ class MpscPollableQueue {
|
||||||
return narrow_cast<int>(ready);
|
return narrow_cast<int>(ready);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
event_fd_.acquire();
|
||||||
auto guard = lock_.lock();
|
auto guard = lock_.lock();
|
||||||
if (writer_vector_.empty()) {
|
if (writer_vector_.empty()) {
|
||||||
event_fd_.acquire();
|
|
||||||
wait_event_fd_ = true;
|
wait_event_fd_ = true;
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
|
@ -52,6 +52,7 @@ class MpscPollableQueue {
|
||||||
writer_vector_.push_back(std::move(value));
|
writer_vector_.push_back(std::move(value));
|
||||||
if (wait_event_fd_) {
|
if (wait_event_fd_) {
|
||||||
wait_event_fd_ = false;
|
wait_event_fd_ = false;
|
||||||
|
guard.reset();
|
||||||
event_fd_.release();
|
event_fd_.release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Reference in New Issue
Block a user