tdlight/tdutils/td/utils/queue.h

485 lines
9.5 KiB
C
Raw Normal View History

//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/utils/port/EventFd.h"
#include "td/utils/port/thread.h"
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
#if !TD_WINDOWS
#include <poll.h> // for pollfd, poll, POLLIN
#include <sched.h>
#endif
#include <atomic>
#include <type_traits>
#include <utility>
namespace td {
namespace detail {
class Backoff {
int cnt = 0;
public:
bool next() {
// TODO: find out better strategy
// TODO: try adaptive backoff
// TODO: different strategy one core cpu
// return false;
cnt++;
if (cnt < 1) { // 50
return true;
} else {
td::this_thread::yield();
return cnt < 3; // 500
}
}
};
class InfBackoff {
int cnt = 0;
public:
bool next() {
cnt++;
if (cnt < 50) {
return true;
} else {
td::this_thread::yield();
return true;
}
}
};
} // namespace detail
template <class T, int P = 10>
class SPSCBlockQueue {
public:
using ValueType = T;
private:
static constexpr int buffer_size() {
static_assert(P >= 1 && P <= 20, "Bad size of BlockQueue");
return 1 << P;
}
struct Position {
std::atomic<uint32> i;
char pad[64 - sizeof(std::atomic<uint32>)];
uint32 local_writer_i;
char pad2[64 - sizeof(uint32)];
uint32 local_reader_i;
char pad3[64 - sizeof(uint32)];
void init() {
i = 0;
local_reader_i = 0;
local_writer_i = 0;
}
};
typename std::aligned_storage<sizeof(ValueType)>::type data_[buffer_size()];
Position writer_;
Position reader_;
static int fix_i(int i) {
return i & (buffer_size() - 1);
}
ValueType *at_ptr(int i) {
return reinterpret_cast<ValueType *>(&data_[fix_i(i)]);
}
ValueType &at(int i) {
return *at_ptr(i);
}
public:
void init() {
writer_.init();
reader_.init();
}
void destroy() {
}
int writer_size() {
return static_cast<int>(writer_.local_reader_i + buffer_size() - writer_.local_writer_i);
}
bool writer_empty() {
return writer_.local_reader_i + buffer_size() == writer_.local_writer_i;
}
template <class PutValueType>
void writer_put_unsafe(PutValueType &&value) {
at(writer_.local_writer_i++) = std::forward<PutValueType>(value);
}
int writer_update() {
writer_.local_reader_i = reader_.i.load(std::memory_order_acquire);
return writer_size();
}
void writer_flush() {
writer_.i.store(writer_.local_writer_i, std::memory_order_release);
}
int reader_size() {
return static_cast<int>(reader_.local_writer_i - reader_.local_reader_i);
}
int reader_empty() {
return reader_.local_writer_i == reader_.local_reader_i;
}
ValueType reader_get_unsafe() {
return std::move(at(reader_.local_reader_i++));
}
int reader_update() {
reader_.local_writer_i = writer_.i.load(std::memory_order_acquire);
return reader_size();
}
void reader_flush() {
reader_.i.store(reader_.local_reader_i, std::memory_order_release);
}
};
template <class T, class BlockQueueT = SPSCBlockQueue<T> >
class SPSCChainQueue {
public:
using ValueType = T;
void init() {
head_ = tail_ = create_node();
}
SPSCChainQueue() = default;
SPSCChainQueue(const SPSCChainQueue &) = delete;
SPSCChainQueue &operator=(const SPSCChainQueue &) = delete;
SPSCChainQueue(SPSCChainQueue &&) = delete;
SPSCChainQueue &operator=(SPSCChainQueue &&) = delete;
~SPSCChainQueue() {
destroy();
}
void destroy() {
while (head_ != nullptr) {
Node *to_delete = head_;
head_ = head_->next_;
delete_node(to_delete);
}
tail_ = nullptr;
}
int writer_size() {
return tail_->q_.writer_size();
}
bool writer_empty() {
return tail_->q_.writer_empty();
}
template <class PutValueType>
void writer_put_unsafe(PutValueType &&value) {
tail_->q_.writer_put_unsafe(std::forward<PutValueType>(value));
}
int writer_update() {
int res = tail_->q_.writer_update();
if (res != 0) {
return res;
}
writer_flush();
Node *new_tail = create_node();
tail_->next_ = new_tail;
tail_->is_closed_.store(true, std::memory_order_release);
tail_ = new_tail;
return tail_->q_.writer_update();
}
void writer_flush() {
tail_->q_.writer_flush();
}
int reader_size() {
return head_->q_.reader_size();
}
int reader_empty() {
return head_->q_.reader_empty();
}
ValueType reader_get_unsafe() {
return std::move(head_->q_.reader_get_unsafe());
}
int reader_update() {
int res = head_->q_.reader_update();
if (res != 0) {
return res;
}
if (!head_->is_closed_.load(std::memory_order_acquire)) {
return 0;
}
res = head_->q_.reader_update();
if (res != 0) {
return res;
}
// reader_flush();
Node *old_head = head_;
head_ = head_->next_;
delete_node(old_head);
return head_->q_.reader_update();
}
void reader_flush() {
head_->q_.reader_flush();
}
private:
struct Node {
BlockQueueT q_;
std::atomic<bool> is_closed_;
Node *next_;
void init() {
q_.init();
is_closed_ = false;
next_ = nullptr;
}
void destroy() {
q_.destroy();
next_ = nullptr;
}
};
Node *head_;
char pad[64 - sizeof(Node *)];
Node *tail_;
char pad2[64 - sizeof(Node *)];
Node *create_node() {
Node *res = new Node();
res->init();
return res;
}
void delete_node(Node *node) {
node->destroy();
delete node;
}
};
template <class T, class QueueT = SPSCChainQueue<T>, class BackoffT = detail::Backoff>
class BackoffQueue : public QueueT {
public:
using ValueType = T;
template <class PutValueType>
void writer_put(PutValueType &&value) {
if (this->writer_empty()) {
int sz = this->writer_update();
CHECK(sz != 0);
}
this->writer_put_unsafe(std::forward<PutValueType>(value));
}
int reader_wait() {
BackoffT backoff;
int res = 0;
do {
res = this->reader_update();
} while (res == 0 && backoff.next());
return res;
}
};
template <class T, class QueueT = SPSCChainQueue<T> >
using InfBackoffQueue = BackoffQueue<T, QueueT, detail::InfBackoff>;
template <class T, class QueueT = BackoffQueue<T> >
class PollQueue : public QueueT {
public:
using ValueType = T;
using QueueType = QueueT;
void init() {
QueueType::init();
event_fd_.init();
wait_state_ = 0;
writer_wait_state_ = 0;
}
PollQueue() = default;
PollQueue(const PollQueue &) = delete;
PollQueue &operator=(const PollQueue &) = delete;
PollQueue(PollQueue &&) = delete;
PollQueue &operator=(PollQueue &&) = delete;
~PollQueue() {
destroy_impl();
}
void destroy() {
destroy_impl();
QueueType::destroy();
}
void writer_flush() {
int old_wait_state = get_wait_state();
std::atomic_thread_fence(std::memory_order_seq_cst);
QueueType::writer_flush();
std::atomic_thread_fence(std::memory_order_seq_cst);
int wait_state = get_wait_state();
if ((wait_state & 1) && wait_state != writer_wait_state_) {
event_fd_.release();
writer_wait_state_ = old_wait_state;
}
}
EventFd &reader_get_event_fd() {
return event_fd_;
}
// if 0 is returned than it is useless to rerun it before fd is
// ready to read.
int reader_wait_nonblock() {
int res;
if ((get_wait_state() & 1) == 0) {
res = this->QueueType::reader_wait();
if (res != 0) {
return res;
}
inc_wait_state();
std::atomic_thread_fence(std::memory_order_seq_cst);
res = this->reader_update();
if (res != 0) {
inc_wait_state();
return res;
}
}
event_fd_.acquire();
std::atomic_thread_fence(std::memory_order_seq_cst);
res = this->reader_update();
if (res != 0) {
inc_wait_state();
}
return res;
}
// Just example of usage
#if !TD_WINDOWS
int reader_wait() {
int res;
while ((res = reader_wait_nonblock()) == 0) {
// TODO: reader_flush?
pollfd fd;
fd.fd = reader_get_event_fd().get_fd().get_native_fd();
fd.events = POLLIN;
poll(&fd, 1, -1);
}
return res;
}
#endif
private:
EventFd event_fd_;
std::atomic<int> wait_state_;
int writer_wait_state_;
int get_wait_state() {
return wait_state_.load(std::memory_order_relaxed);
}
void inc_wait_state() {
wait_state_.store(get_wait_state() + 1, std::memory_order_relaxed);
}
void destroy_impl() {
if (!event_fd_.empty()) {
event_fd_.close();
}
}
};
} // namespace td
#else
#include "td/utils/logging.h"
namespace td {
// dummy implementation which shouldn't be used
template <class T>
class PollQueue {
public:
using ValueType = T;
void init() {
UNREACHABLE();
}
template <class PutValueType>
void writer_put(PutValueType &&value) {
UNREACHABLE();
}
void writer_flush() {
UNREACHABLE();
}
int reader_wait_nonblock() {
UNREACHABLE();
return 0;
}
ValueType reader_get_unsafe() {
UNREACHABLE();
return ValueType();
}
void reader_flush() {
UNREACHABLE();
}
PollQueue() = default;
PollQueue(const PollQueue &) = delete;
PollQueue &operator=(const PollQueue &) = delete;
PollQueue(PollQueue &&) = delete;
PollQueue &operator=(PollQueue &&) = delete;
~PollQueue() = default;
};
} // namespace td
#endif