Improve bench_queue.

This commit is contained in:
levlam 2021-10-27 00:23:30 +03:00
parent 7226c09d31
commit a68d8e77ef

View File

@ -17,15 +17,16 @@
// TODO: all return values must be checked // TODO: all return values must be checked
#include <atomic> #include <atomic>
#include <cstdio>
#include <cstdlib> #include <cstdlib>
#include <vector> #include <vector>
#if TD_PORT_POSIX
#include <pthread.h> #include <pthread.h>
#include <sched.h> #include <sched.h>
#include <semaphore.h> #include <semaphore.h>
#include <sys/syscall.h> #include <sys/syscall.h>
#include <unistd.h> #include <unistd.h>
#endif
#if TD_LINUX #if TD_LINUX
#include <sys/eventfd.h> #include <sys/eventfd.h>
@ -43,6 +44,22 @@
using qvalue_t = int; using qvalue_t = int;
class Backoff {
int cnt = 0;
public:
bool next() {
cnt++;
if (cnt < 50) {
return true;
} else {
td::this_thread::yield();
return cnt < 500;
}
}
};
#if TD_PORT_POSIX
// Just for testing, not production // Just for testing, not production
class PipeQueue { class PipeQueue {
int input; int input;
@ -75,24 +92,6 @@ class PipeQueue {
} }
}; };
class Backoff {
int cnt;
public:
Backoff() : cnt(0) {
}
bool next() {
cnt++;
if (cnt < 50) {
return true;
} else {
sched_yield();
return cnt < 500;
}
}
};
class VarQueue { class VarQueue {
std::atomic<qvalue_t> data{0}; std::atomic<qvalue_t> data{0};
@ -176,6 +175,7 @@ class SemQueue {
return get(); return get();
} }
}; };
#endif
#if TD_LINUX #if TD_LINUX
class EventfdQueue { class EventfdQueue {
@ -318,8 +318,7 @@ class BufferQueue {
return; return;
} }
if (!update_writer()) { if (!update_writer()) {
std::fprintf(stderr, "put strong failed\n"); LOG(FATAL) << "Put strong failed";
std::exit(0);
} }
put_unsafe(val); put_unsafe(val);
} }
@ -472,10 +471,9 @@ class FdQueue {
td::int64 x; td::int64 x;
wait_flag.store(1, MODE); wait_flag.store(1, MODE);
__sync_synchronize(); __sync_synchronize();
// std::fprintf(stderr, "!\n");
// while (res == -1 && read(fd, &x, sizeof(x)) == sizeof(x)) { // while (res == -1 && read(fd, &x, sizeof(x)) == sizeof(x)) {
// res = q.try_get(); // res = q.try_get();
//} // }
do { do {
__sync_synchronize(); __sync_synchronize();
res = q.try_get(); res = q.try_get();
@ -491,6 +489,7 @@ class FdQueue {
}; };
#endif #endif
#if TD_PORT_POSIX
class SemBackoffQueue { class SemBackoffQueue {
sem_t sem; sem_t sem;
VarQueue q; VarQueue q;
@ -566,15 +565,17 @@ class QueueBenchmark2 final : public td::Benchmark {
int server_active_connections; int server_active_connections;
int client_active_connections; int client_active_connections;
std::vector<td::int64> server_conn; td::vector<td::int64> server_conn;
std::vector<td::int64> client_conn; td::vector<td::int64> client_conn;
td::string name;
public: public:
explicit QueueBenchmark2(int connections_n = 1) : connections_n(connections_n) { QueueBenchmark2(int connections_n, td::string name) : connections_n(connections_n), name(std::move(name)) {
} }
std::string get_description() const final { td::string get_description() const final {
return "QueueBenchmark2"; return name;
} }
void start_up() final { void start_up() final {
@ -590,16 +591,8 @@ class QueueBenchmark2 final : public td::Benchmark {
void server_process(qvalue_t value) { void server_process(qvalue_t value) {
int no = value & 0x00FFFFFF; int no = value & 0x00FFFFFF;
auto co = static_cast<int>(static_cast<td::uint32>(value) >> 24); auto co = static_cast<int>(static_cast<td::uint32>(value) >> 24);
// std::fprintf(stderr, "-->%d %d\n", co, no); CHECK(co >= 0 && co < connections_n);
if (co < 0 || co >= connections_n || no != server_conn[co]++) { CHECK(no == server_conn[co]++);
std::fprintf(stderr, "%d %d\n", co, no);
std::fprintf(stderr, "expected %d %lld\n", co, static_cast<long long>(server_conn[co] - 1));
std::fprintf(stderr, "Server BUG\n");
while (true) {
}
}
// std::fprintf(stderr, "no = %d/%d\n", no, queries_n);
// std::fprintf(stderr, "answer: %d %d\n", no, co);
client.writer_put(value); client.writer_put(value);
client.writer_flush(); client.writer_flush();
@ -609,15 +602,12 @@ class QueueBenchmark2 final : public td::Benchmark {
} }
void *server_run(void *) { void *server_run(void *) {
server_conn = std::vector<td::int64>(connections_n); server_conn = td::vector<td::int64>(connections_n);
server_active_connections = connections_n; server_active_connections = connections_n;
while (server_active_connections > 0) { while (server_active_connections > 0) {
int cnt = server.reader_wait(); int cnt = server.reader_wait();
if (cnt == 0) { CHECK(cnt != 0);
std::fprintf(stderr, "ERROR!\n");
std::exit(0);
}
while (cnt-- > 0) { while (cnt-- > 0) {
server_process(server.reader_get_unsafe()); server_process(server.reader_get_unsafe());
server.reader_flush(); server.reader_flush();
@ -631,17 +621,9 @@ class QueueBenchmark2 final : public td::Benchmark {
void client_process(qvalue_t value) { void client_process(qvalue_t value) {
int no = value & 0x00FFFFFF; int no = value & 0x00FFFFFF;
auto co = static_cast<int>(static_cast<td::uint32>(value) >> 24); auto co = static_cast<int>(static_cast<td::uint32>(value) >> 24);
// std::fprintf(stderr, "<--%d %d\n", co, no); CHECK(co >= 0 && co < connections_n);
if (co < 0 || co >= connections_n || no != client_conn[co]++) { CHECK(no == client_conn[co]++);
std::fprintf(stderr, "%d %d\n", co, no);
std::fprintf(stderr, "expected %d %lld\n", co, static_cast<long long>(client_conn[co] - 1));
std::fprintf(stderr, "BUG\n");
while (true) {
}
std::exit(0);
}
if (no + 1 < queries_n) { if (no + 1 < queries_n) {
// std::fprintf(stderr, "query: %d %d\n", no + 1, co);
server.writer_put(value + 1); server.writer_put(value + 1);
server.writer_flush(); server.writer_flush();
} else { } else {
@ -650,12 +632,9 @@ class QueueBenchmark2 final : public td::Benchmark {
} }
void *client_run(void *) { void *client_run(void *) {
client_conn = std::vector<td::int64>(connections_n); client_conn = td::vector<td::int64>(connections_n);
client_active_connections = connections_n; client_active_connections = connections_n;
if (queries_n >= (1 << 24)) { CHECK(queries_n < (1 << 24));
std::fprintf(stderr, "Too big queries_n\n");
std::exit(0);
}
for (int i = 0; i < connections_n; i++) { for (int i = 0; i < connections_n; i++) {
server.writer_put(static_cast<qvalue_t>(i) << 24); server.writer_put(static_cast<qvalue_t>(i) << 24);
@ -664,10 +643,7 @@ class QueueBenchmark2 final : public td::Benchmark {
while (client_active_connections > 0) { while (client_active_connections > 0) {
int cnt = client.reader_wait(); int cnt = client.reader_wait();
if (cnt == 0) { CHECK(cnt != 0);
std::fprintf(stderr, "ERROR!\n");
std::exit(0);
}
while (cnt-- > 0) { while (cnt-- > 0) {
client_process(client.reader_get_unsafe()); client_process(client.reader_get_unsafe());
client.reader_flush(); client.reader_flush();
@ -707,12 +683,14 @@ class QueueBenchmark final : public td::Benchmark {
const int connections_n; const int connections_n;
int queries_n; int queries_n;
td::string name;
public: public:
explicit QueueBenchmark(int connections_n = 1) : connections_n(connections_n) { QueueBenchmark(int connections_n, td::string name) : connections_n(connections_n), name(std::move(name)) {
} }
std::string get_description() const final { td::string get_description() const final {
return "QueueBenchmark"; return name;
} }
void start_up() final { void start_up() final {
@ -726,21 +704,14 @@ class QueueBenchmark final : public td::Benchmark {
} }
void *server_run(void *) { void *server_run(void *) {
std::vector<td::int64> conn(connections_n); td::vector<td::int64> conn(connections_n);
int active_connections = connections_n; int active_connections = connections_n;
while (active_connections > 0) { while (active_connections > 0) {
qvalue_t value = server.get(); qvalue_t value = server.get();
int no = value & 0x00FFFFFF; int no = value & 0x00FFFFFF;
auto co = static_cast<int>(value >> 24); auto co = static_cast<int>(value >> 24);
// std::fprintf(stderr, "-->%d %d\n", co, no); CHECK(co >= 0 && co < connections_n);
if (co < 0 || co >= connections_n || no != conn[co]++) { CHECK(no == conn[co]++);
std::fprintf(stderr, "%d %d\n", co, no);
std::fprintf(stderr, "expected %d %lld\n", co, static_cast<long long>(conn[co] - 1));
std::fprintf(stderr, "Server BUG\n");
while (true) {
}
}
// std::fprintf(stderr, "no = %d/%d\n", no, queries_n);
client.put(value); client.put(value);
if (no + 1 >= queries_n) { if (no + 1 >= queries_n) {
active_connections--; active_connections--;
@ -750,11 +721,8 @@ class QueueBenchmark final : public td::Benchmark {
} }
void *client_run(void *) { void *client_run(void *) {
std::vector<td::int64> conn(connections_n); td::vector<td::int64> conn(connections_n);
if (queries_n >= (1 << 24)) { CHECK(queries_n < (1 << 24));
std::fprintf(stderr, "Too big queries_n\n");
std::exit(0);
}
for (int i = 0; i < connections_n; i++) { for (int i = 0; i < connections_n; i++) {
server.put(static_cast<qvalue_t>(i) << 24); server.put(static_cast<qvalue_t>(i) << 24);
} }
@ -763,15 +731,8 @@ class QueueBenchmark final : public td::Benchmark {
qvalue_t value = client.get(); qvalue_t value = client.get();
int no = value & 0x00FFFFFF; int no = value & 0x00FFFFFF;
auto co = static_cast<int>(value >> 24); auto co = static_cast<int>(value >> 24);
// std::fprintf(stderr, "<--%d %d\n", co, no); CHECK(co >= 0 && co < connections_n);
if (co < 0 || co >= connections_n || no != conn[co]++) { CHECK(no == conn[co]++);
std::fprintf(stderr, "%d %d\n", co, no);
std::fprintf(stderr, "expected %d %lld\n", co, static_cast<long long>(conn[co] - 1));
std::fprintf(stderr, "BUG\n");
while (true) {
}
std::exit(0);
}
if (no + 1 < queries_n) { if (no + 1 < queries_n) {
server.put(value + 1); server.put(value + 1);
} else { } else {
@ -783,11 +744,8 @@ class QueueBenchmark final : public td::Benchmark {
} }
void *client_run2(void *) { void *client_run2(void *) {
std::vector<td::int64> conn(connections_n); td::vector<td::int64> conn(connections_n);
if (queries_n >= (1 << 24)) { CHECK(queries_n < (1 << 24));
std::fprintf(stderr, "Too big queries_n\n");
std::exit(0);
}
for (int query = 0; query < queries_n; query++) { for (int query = 0; query < queries_n; query++) {
for (int i = 0; i < connections_n; i++) { for (int i = 0; i < connections_n; i++) {
server.put((static_cast<td::int64>(i) << 24) + query); server.put((static_cast<td::int64>(i) << 24) + query);
@ -796,15 +754,8 @@ class QueueBenchmark final : public td::Benchmark {
qvalue_t value = client.get(); qvalue_t value = client.get();
int no = value & 0x00FFFFFF; int no = value & 0x00FFFFFF;
auto co = static_cast<int>(value >> 24); auto co = static_cast<int>(value >> 24);
// std::fprintf(stderr, "<--%d %d\n", co, no); CHECK(co >= 0 && co < connections_n);
if (co < 0 || co >= connections_n || no != conn[co]++) { CHECK(no == conn[co]++);
std::fprintf(stderr, "%d %d\n", co, no);
std::fprintf(stderr, "expected %d %lld\n", co, static_cast<long long>(conn[co] - 1));
std::fprintf(stderr, "BUG\n");
while (true) {
}
std::exit(0);
}
} }
} }
// system("cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq"); // system("cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq");
@ -846,7 +797,6 @@ class RingBenchmark final : public td::Benchmark {
void *run() { void *run() {
qvalue_t value; qvalue_t value;
// std::fprintf(stderr, "start %d\n", int_id);
do { do {
int cnt = queue.reader_wait(); int cnt = queue.reader_wait();
CHECK(cnt == 1); CHECK(cnt == 1);
@ -886,7 +836,6 @@ class RingBenchmark final : public td::Benchmark {
pthread_create(&q[i].id, nullptr, run_gateway, &q[i]); pthread_create(&q[i].id, nullptr, run_gateway, &q[i]);
} }
std::fprintf(stderr, "run %d\n", n);
if (n < 1000) { if (n < 1000) {
n = 1000; n = 1000;
} }
@ -898,12 +847,14 @@ class RingBenchmark final : public td::Benchmark {
} }
} }
}; };
#endif
/*
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
static void test_queue() { static void test_queue() {
std::vector<td::thread> threads; td::vector<td::thread> threads;
static constexpr size_t THREAD_COUNT = 100; static constexpr size_t THREAD_COUNT = 100;
std::vector<td::MpscPollableQueue<int>> queues(THREAD_COUNT); td::vector<td::MpscPollableQueue<int>> queues(THREAD_COUNT);
for (auto &q : queues) { for (auto &q : queues) {
q.init(); q.init();
} }
@ -919,66 +870,66 @@ static void test_queue() {
}); });
} }
while (true) { for (size_t iter = 0; iter < THREAD_COUNT; iter++) {
td::usleep_for(100); td::usleep_for(100);
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
queues[td::Random::fast(0, THREAD_COUNT - 1)].writer_put(1); queues[td::Random::fast(0, THREAD_COUNT - 1)].writer_put(1);
} }
} }
for (size_t i = 0; i < THREAD_COUNT; i++) {
threads[i].join();
}
} }
#endif #endif
*/
int main() { int main() {
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
// test_queue(); // test_queue();
#endif #endif
#define BENCH_Q2(Q, N) \
std::fprintf(stderr, "!%s %d:\t", #Q, N); \
td::bench(QueueBenchmark2<Q>(N));
#define BENCH_Q(Q, N) \
std::fprintf(stderr, "%s %d:\t", #Q, N); \
td::bench(QueueBenchmark<Q>(N));
#define BENCH_R(Q) \ #if TD_PORT_POSIX
std::fprintf(stderr, "%s:\t", #Q); \
td::bench(RingBenchmark<Q>());
// TODO: yield makes it extremely slow. Yet some backoff may be necessary. // TODO: yield makes it extremely slow. Yet some backoff may be necessary.
// BENCH_R(SemQueue); // td::bench(RingBenchmark<SemQueue>());
// BENCH_R(td::PollQueue<qvalue_t>); // td::bench(RingBenchmark<td::PollQueue<qvalue_t>>());
#define BENCH_Q2(Q, N) td::bench(QueueBenchmark2<Q<qvalue_t>>(N, #Q "(" #N ")"))
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
BENCH_Q2(td::PollQueue<qvalue_t>, 1); BENCH_Q2(td::InfBackoffQueue, 1);
BENCH_Q2(td::MpscPollableQueue<qvalue_t>, 1); BENCH_Q2(td::MpscPollableQueue, 1);
BENCH_Q2(td::PollQueue<qvalue_t>, 100); BENCH_Q2(td::PollQueue, 1);
BENCH_Q2(td::MpscPollableQueue<qvalue_t>, 100);
BENCH_Q2(td::PollQueue<qvalue_t>, 10); BENCH_Q2(td::InfBackoffQueue, 10);
BENCH_Q2(td::MpscPollableQueue<qvalue_t>, 10); BENCH_Q2(td::MpscPollableQueue, 10);
BENCH_Q2(td::PollQueue, 10);
BENCH_Q2(td::InfBackoffQueue, 100);
BENCH_Q2(td::MpscPollableQueue, 100);
BENCH_Q2(td::PollQueue, 100);
BENCH_Q2(td::PollQueue, 4);
BENCH_Q2(td::PollQueue, 10);
BENCH_Q2(td::PollQueue, 100);
#endif #endif
BENCH_Q(VarQueue, 1); #define BENCH_Q(Q, N) td::bench(QueueBenchmark<Q>(N, #Q "(" #N ")"))
// BENCH_Q(FdQueue, 1);
// BENCH_Q(BufferedFdQueue, 1); #if TD_LINUX
BENCH_Q(BufferQueue, 1);
BENCH_Q(BufferedFdQueue, 1);
BENCH_Q(FdQueue, 1);
#endif
BENCH_Q(PipeQueue, 1); BENCH_Q(PipeQueue, 1);
BENCH_Q(SemCheatQueue, 1); BENCH_Q(SemCheatQueue, 1);
BENCH_Q(SemQueue, 1); BENCH_Q(SemQueue, 1);
BENCH_Q(VarQueue, 1);
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED #if TD_LINUX
// BENCH_Q2(td::PollQueue<qvalue_t>, 100); BENCH_Q(BufferQueue, 4);
// BENCH_Q2(td::PollQueue<qvalue_t>, 10); BENCH_Q(BufferQueue, 10);
// BENCH_Q2(td::PollQueue<qvalue_t>, 4); BENCH_Q(BufferQueue, 100);
// BENCH_Q2(td::InfBackoffQueue<qvalue_t>, 100); #endif
// BENCH_Q2(td::InfBackoffQueue<qvalue_t>, 1);
#endif #endif
// BENCH_Q(SemCheatQueue, 1);
// BENCH_Q(BufferedFdQueue, 100);
// BENCH_Q(BufferedFdQueue, 10);
// BENCH_Q(BufferQueue, 4);
// BENCH_Q(BufferQueue, 100);
// BENCH_Q(BufferQueue, 10);
// BENCH_Q(BufferQueue, 1);
} }