tdlight/tdutils/td/utils/buffer.h

766 lines
19 KiB
C
Raw Normal View History

//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/utils/common.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/port/thread_local.h"
#include "td/utils/Slice.h"
#include <atomic>
#include <cstring>
#include <limits>
#include <memory>
namespace td {
struct BufferRaw {
size_t data_size_;
// Constant after first reader is created.
// May be change by writer before it.
// So writer may do prepends till there is no reader created.
size_t begin_;
// Write by writer.
// Read by reader.
std::atomic<size_t> end_;
mutable std::atomic<int32> ref_cnt_;
std::atomic<bool> has_writer_;
bool was_reader_;
alignas(4) unsigned char data_[1];
};
class BufferAllocator {
public:
class DeleteWriterPtr {
public:
void operator()(BufferRaw *ptr) {
ptr->has_writer_.store(false, std::memory_order_release);
dec_ref_cnt(ptr);
}
};
class DeleteReaderPtr {
public:
void operator()(BufferRaw *ptr) {
dec_ref_cnt(ptr);
}
};
using WriterPtr = std::unique_ptr<BufferRaw, DeleteWriterPtr>;
using ReaderPtr = std::unique_ptr<BufferRaw, DeleteReaderPtr>;
static WriterPtr create_writer(size_t size);
static WriterPtr create_writer(size_t size, size_t prepend, size_t append);
static ReaderPtr create_reader(size_t size);
static ReaderPtr create_reader(const WriterPtr &raw);
static ReaderPtr create_reader(const ReaderPtr &raw);
static size_t get_buffer_mem();
static void clear_thread_local();
private:
static ReaderPtr create_reader_fast(size_t size);
static WriterPtr create_writer_exact(size_t size);
struct BufferRawDeleter {
void operator()(BufferRaw *ptr) {
dec_ref_cnt(ptr);
}
};
struct BufferRawTls {
std::unique_ptr<BufferRaw, BufferRawDeleter> buffer_raw;
};
static TD_THREAD_LOCAL BufferRawTls *buffer_raw_tls;
static void dec_ref_cnt(BufferRaw *ptr);
static BufferRaw *create_buffer_raw(size_t size);
static std::atomic<size_t> buffer_mem;
};
using BufferWriterPtr = BufferAllocator::WriterPtr;
using BufferReaderPtr = BufferAllocator::ReaderPtr;
class BufferSlice {
public:
BufferSlice() = default;
explicit BufferSlice(BufferReaderPtr buffer_ptr) : buffer_(std::move(buffer_ptr)) {
if (is_null()) {
return;
}
begin_ = buffer_->begin_;
sync_with_writer();
}
BufferSlice(BufferReaderPtr buffer_ptr, size_t begin, size_t end)
: buffer_(std::move(buffer_ptr)), begin_(begin), end_(end) {
}
explicit BufferSlice(size_t size) : buffer_(BufferAllocator::create_reader(size)) {
end_ = buffer_->end_.load(std::memory_order_relaxed);
begin_ = end_ - ((size + 7) & -8);
end_ = begin_ + size;
}
explicit BufferSlice(Slice slice) : BufferSlice(slice.size()) {
std::memcpy(as_slice().begin(), slice.begin(), slice.size());
}
BufferSlice(const char *ptr, size_t size) : BufferSlice(Slice(ptr, size)) {
}
BufferSlice clone() const {
if (is_null()) {
return BufferSlice(BufferReaderPtr(), begin_, end_);
}
return BufferSlice(BufferAllocator::create_reader(buffer_), begin_, end_);
}
BufferSlice copy() const {
if (is_null()) {
return BufferSlice(BufferReaderPtr(), begin_, end_);
}
return BufferSlice(as_slice());
}
Slice as_slice() const {
if (is_null()) {
return Slice();
}
return Slice(buffer_->data_ + begin_, size());
}
operator Slice() const {
return as_slice();
}
MutableSlice as_slice() {
if (is_null()) {
return MutableSlice();
}
return MutableSlice(buffer_->data_ + begin_, size());
}
Slice prepare_read() const {
return as_slice();
}
Slice after(size_t offset) const {
auto full = as_slice();
full.remove_prefix(offset);
return full;
}
bool confirm_read(size_t size) {
begin_ += size;
CHECK(begin_ <= end_);
return begin_ == end_;
}
void truncate(size_t limit) {
if (size() > limit) {
end_ = begin_ + limit;
}
}
BufferSlice from_slice(Slice slice) const {
auto res = BufferSlice(BufferAllocator::create_reader(buffer_));
res.begin_ = static_cast<size_t>(slice.ubegin() - buffer_->data_);
res.end_ = static_cast<size_t>(slice.uend() - buffer_->data_);
CHECK(buffer_->begin_ <= res.begin_);
CHECK(res.begin_ <= res.end_);
CHECK(res.end_ <= buffer_->end_.load(std::memory_order_relaxed));
return res;
}
// like in std::string
char *data() {
return as_slice().data();
}
const char *data() const {
return as_slice().data();
}
char operator[](size_t at) const {
return as_slice()[at];
}
bool empty() const {
return size() == 0;
}
bool is_null() const {
return !buffer_;
}
size_t size() const {
if (is_null()) {
return 0;
}
return end_ - begin_;
}
// like in std::string
size_t length() const {
return size();
}
// set end_ into writer's end_
size_t sync_with_writer() {
CHECK(!is_null());
auto old_end = end_;
end_ = buffer_->end_.load(std::memory_order_acquire);
return end_ - old_end;
}
bool is_writer_alive() const {
CHECK(!is_null());
return buffer_->has_writer_.load(std::memory_order_acquire);
}
void clear() {
begin_ = 0;
end_ = 0;
buffer_ = nullptr;
}
private:
BufferReaderPtr buffer_;
size_t begin_ = 0;
size_t end_ = 0;
};
template <class StorerT>
void store(const BufferSlice &buffer_slice, StorerT &storer) {
storer.store_string(buffer_slice);
}
template <class ParserT>
void parse(BufferSlice &buffer_slice, ParserT &parser) {
buffer_slice = parser.template fetch_string<BufferSlice>();
}
class BufferWriter {
public:
BufferWriter() = default;
explicit BufferWriter(size_t size) : BufferWriter(BufferAllocator::create_writer(size)) {
}
BufferWriter(size_t size, size_t prepend, size_t append)
: BufferWriter(BufferAllocator::create_writer(size, prepend, append)) {
}
BufferWriter(Slice slice, size_t prepend, size_t append)
: BufferWriter(BufferAllocator::create_writer(slice.size(), prepend, append)) {
as_slice().copy_from(slice);
}
explicit BufferWriter(BufferWriterPtr buffer_ptr) : buffer_(std::move(buffer_ptr)) {
}
BufferSlice as_buffer_slice() const {
return BufferSlice(BufferAllocator::create_reader(buffer_));
}
bool is_null() const {
return !buffer_;
}
bool empty() const {
return size() == 0;
}
size_t size() const {
if (is_null()) {
return 0;
}
return buffer_->end_.load(std::memory_order_relaxed) - buffer_->begin_;
}
MutableSlice as_slice() {
auto end = buffer_->end_.load(std::memory_order_relaxed);
return MutableSlice(buffer_->data_ + buffer_->begin_, buffer_->data_ + end);
}
MutableSlice prepare_prepend() {
if (is_null()) {
return MutableSlice();
}
CHECK(!buffer_->was_reader_);
return MutableSlice(buffer_->data_, buffer_->begin_);
}
MutableSlice prepare_append() {
if (is_null()) {
return MutableSlice();
}
auto end = buffer_->end_.load(std::memory_order_relaxed);
return MutableSlice(buffer_->data_ + end, buffer_->data_size_ - end);
}
void confirm_append(size_t size) {
if (is_null()) {
CHECK(size == 0);
return;
}
auto new_end = buffer_->end_.load(std::memory_order_relaxed) + size;
CHECK(new_end <= buffer_->data_size_);
buffer_->end_.store(new_end, std::memory_order_release);
}
void confirm_prepend(size_t size) {
if (is_null()) {
CHECK(size == 0);
return;
}
CHECK(buffer_->begin_ >= size);
buffer_->begin_ -= size;
}
private:
BufferWriterPtr buffer_;
};
struct ChainBufferNode {
friend struct DeleteWriterPtr;
struct DeleteWriterPtr {
void operator()(ChainBufferNode *ptr) {
ptr->has_writer_.store(false, std::memory_order_release);
dec_ref_cnt(ptr);
}
};
friend struct DeleteReaderPtr;
struct DeleteReaderPtr {
void operator()(ChainBufferNode *ptr) {
dec_ref_cnt(ptr);
}
};
using WriterPtr = std::unique_ptr<ChainBufferNode, DeleteWriterPtr>;
using ReaderPtr = std::unique_ptr<ChainBufferNode, DeleteReaderPtr>;
static WriterPtr make_writer_ptr(ChainBufferNode *ptr) {
ptr->ref_cnt_.store(1, std::memory_order_relaxed);
ptr->has_writer_.store(true, std::memory_order_relaxed);
return WriterPtr(ptr);
}
static ReaderPtr make_reader_ptr(ChainBufferNode *ptr) {
ptr->ref_cnt_.fetch_add(1, std::memory_order_acq_rel);
return ReaderPtr(ptr);
}
bool has_writer() {
return has_writer_.load(std::memory_order_acquire);
}
bool unique() {
return ref_cnt_.load(std::memory_order_acquire) == 1;
}
ChainBufferNode(BufferSlice slice, bool sync_flag) : slice_(std::move(slice)), sync_flag_(sync_flag) {
}
// reader
// There are two options
// 1. Fixed slice of Buffer
// 2. Slice with non-fixed right end
// In each case slice_ is const. Reader should read it and use sync_with_writer on its own copy.
const BufferSlice slice_;
const bool sync_flag_{false}; // should we call slice_.sync_with_writer or not.
// writer
ReaderPtr next_{nullptr};
private:
std::atomic<int> ref_cnt_{0};
std::atomic<bool> has_writer_{false};
static void clear_nonrecursive(ReaderPtr ptr) {
while (ptr && ptr->unique()) {
ptr = std::move(ptr->next_);
}
}
static void dec_ref_cnt(ChainBufferNode *ptr) {
int left = --ptr->ref_cnt_;
if (left == 0) {
clear_nonrecursive(std::move(ptr->next_));
// TODO(refact): move memory management into allocator (?)
delete ptr;
}
}
};
using ChainBufferNodeWriterPtr = ChainBufferNode::WriterPtr;
using ChainBufferNodeReaderPtr = ChainBufferNode::ReaderPtr;
class ChainBufferNodeAllocator {
public:
static ChainBufferNodeWriterPtr create(BufferSlice slice, bool sync_flag) {
auto *ptr = new ChainBufferNode(std::move(slice), sync_flag);
return ChainBufferNode::make_writer_ptr(ptr);
}
static ChainBufferNodeReaderPtr clone(const ChainBufferNodeReaderPtr &ptr) {
if (!ptr) {
return ChainBufferNodeReaderPtr();
}
return ChainBufferNode::make_reader_ptr(ptr.get());
}
static ChainBufferNodeReaderPtr clone(ChainBufferNodeWriterPtr &ptr) {
if (!ptr) {
return ChainBufferNodeReaderPtr();
}
return ChainBufferNode::make_reader_ptr(ptr.get());
}
};
class ChainBufferIterator {
public:
ChainBufferIterator() = default;
explicit ChainBufferIterator(ChainBufferNodeReaderPtr head) : head_(std::move(head)) {
load_head();
}
ChainBufferIterator clone() const {
return ChainBufferIterator(ChainBufferNodeAllocator::clone(head_), reader_.clone(), need_sync_, offset_);
}
size_t offset() const {
return offset_;
}
void clear() {
*this = ChainBufferIterator();
}
Slice prepare_read() {
if (!head_) {
return Slice();
}
while (true) {
auto res = reader_.prepare_read();
if (!res.empty()) {
return res;
}
auto has_writer = head_->has_writer();
if (need_sync_) {
reader_.sync_with_writer();
res = reader_.prepare_read();
if (!res.empty()) {
return res;
}
}
if (has_writer) {
return Slice();
}
head_ = ChainBufferNodeAllocator::clone(head_->next_);
if (!head_) {
return Slice();
}
load_head();
}
}
// returns only head
BufferSlice read_as_buffer_slice(size_t limit) {
prepare_read();
auto res = reader_.clone();
res.truncate(limit);
confirm_read(res.size());
return res;
}
const BufferSlice &head() const {
return reader_;
}
void confirm_read(size_t size) {
offset_ += size;
reader_.confirm_read(size);
}
void advance_till_end() {
while (true) {
auto ready = prepare_read();
if (ready.empty()) {
break;
}
confirm_read(ready.size());
}
}
size_t advance(size_t offset, MutableSlice dest = MutableSlice()) {
size_t skipped = 0;
while (offset != 0) {
auto ready = prepare_read();
if (ready.empty()) {
break;
}
// read no more than offset
ready.truncate(offset);
offset -= ready.size();
skipped += ready.size();
// copy to dest if possible
auto to_dest_size = min(ready.size(), dest.size());
if (to_dest_size != 0) {
std::memcpy(dest.data(), ready.data(), to_dest_size);
dest.remove_prefix(to_dest_size);
}
confirm_read(ready.size());
}
return skipped;
}
private:
ChainBufferNodeReaderPtr head_;
BufferSlice reader_; // copy of head_->slice_
bool need_sync_ = false; // copy of head_->sync_flag_
size_t offset_ = 0; // position in the union of all nodes
ChainBufferIterator(ChainBufferNodeReaderPtr head, BufferSlice reader, bool need_sync, size_t offset)
: head_(std::move(head)), reader_(std::move(reader)), need_sync_(need_sync), offset_(offset) {
}
void load_head() {
reader_ = head_->slice_.clone();
need_sync_ = head_->sync_flag_;
}
};
class ChainBufferReader {
public:
ChainBufferReader() = default;
explicit ChainBufferReader(ChainBufferNodeReaderPtr head)
: begin_(ChainBufferNodeAllocator::clone(head)), end_(std::move(head)) {
end_.advance_till_end();
}
ChainBufferReader(ChainBufferIterator begin, ChainBufferIterator end, bool sync_flag)
: begin_(std::move(begin)), end_(std::move(end)), sync_flag_(sync_flag) {
}
ChainBufferReader(ChainBufferNodeReaderPtr head, size_t size)
: begin_(ChainBufferNodeAllocator::clone(head)), end_(std::move(head)) {
auto advanced = end_.advance(size);
CHECK(advanced == size);
}
ChainBufferReader(ChainBufferReader &&) = default;
ChainBufferReader &operator=(ChainBufferReader &&) = default;
ChainBufferReader(const ChainBufferReader &) = delete;
ChainBufferReader &operator=(const ChainBufferReader &) = delete;
~ChainBufferReader() = default;
ChainBufferReader clone() {
return ChainBufferReader(begin_.clone(), end_.clone(), sync_flag_);
}
Slice prepare_read() {
auto res = begin_.prepare_read();
res.truncate(size());
return res;
}
void confirm_read(size_t size) {
CHECK(size <= this->size());
begin_.confirm_read(size);
}
size_t advance(size_t offset, MutableSlice dest = MutableSlice()) {
CHECK(offset <= size());
return begin_.advance(offset, dest);
}
size_t size() const {
return end_.offset() - begin_.offset();
}
bool empty() const {
return size() == 0;
}
void sync_with_writer() {
if (sync_flag_) {
end_.advance_till_end();
}
}
void advance_end(size_t size) {
end_.advance(size);
}
const ChainBufferIterator &begin() {
return begin_;
}
const ChainBufferIterator &end() {
return end_;
}
// Return [begin_, tail.begin_)
// *this = tail
ChainBufferReader cut_head(ChainBufferIterator pos) {
auto tmp = begin_.clone();
begin_ = pos.clone();
return ChainBufferReader(std::move(tmp), std::move(pos), false);
}
ChainBufferReader cut_head(size_t offset) {
CHECK(offset <= size()) << offset << " " << size();
auto it = begin_.clone();
it.advance(offset);
return cut_head(std::move(it));
}
BufferSlice move_as_buffer_slice() {
BufferSlice res;
if (begin_.head().size() >= size()) {
res = begin_.read_as_buffer_slice(size());
} else {
auto save_size = size();
res = BufferSlice{save_size};
advance(save_size, res.as_slice());
}
*this = ChainBufferReader();
return res;
}
BufferSlice read_as_buffer_slice(size_t limit = std::numeric_limits<size_t>::max()) {
return begin_.read_as_buffer_slice(min(limit, size()));
}
private:
ChainBufferIterator begin_; // use it for prepare_read. Fix result with size()
ChainBufferIterator end_; // keep end as far as we can. use it for size()
bool sync_flag_ = true; // auto sync of end_
// 1. We have fixed size. Than end_ is useless.
// 2. No fixed size. One has to sync end_ with end_.advance_till_end() in order to calculate size.
};
class ChainBufferWriter {
public:
ChainBufferWriter() {
init();
}
void init(size_t size = 0) {
writer_ = BufferWriter(size);
tail_ = ChainBufferNodeAllocator::create(writer_.as_buffer_slice(), true);
head_ = ChainBufferNodeAllocator::clone(tail_);
}
MutableSlice prepare_append(size_t hint = 0) {
CHECK(!empty());
auto res = prepare_append_inplace();
if (res.empty()) {
return prepare_append_alloc(hint);
}
return res;
}
MutableSlice prepare_append_inplace() {
CHECK(!empty());
return writer_.prepare_append();
}
MutableSlice prepare_append_alloc(size_t hint = 0) {
CHECK(!empty());
if (hint < (1 << 10)) {
hint = 1 << 12;
}
BufferWriter new_writer(hint);
auto new_tail = ChainBufferNodeAllocator::create(new_writer.as_buffer_slice(), true);
tail_->next_ = ChainBufferNodeAllocator::clone(new_tail);
writer_ = std::move(new_writer);
tail_ = std::move(new_tail); // release tail_
return writer_.prepare_append();
}
void confirm_append(size_t size) {
CHECK(!empty());
writer_.confirm_append(size);
}
void append(Slice slice) {
while (!slice.empty()) {
auto ready = prepare_append(slice.size());
auto shift = min(ready.size(), slice.size());
std::memcpy(ready.data(), slice.data(), shift);
confirm_append(shift);
slice.remove_prefix(shift);
}
}
void append(BufferSlice slice) {
auto ready = prepare_append_inplace();
// TODO(perf): we have to store some stats in ChainBufferWriter
// for better append logic
if (slice.size() < (1 << 8) || ready.size() >= slice.size()) {
return append(slice.as_slice());
}
auto new_tail = ChainBufferNodeAllocator::create(std::move(slice), false);
tail_->next_ = ChainBufferNodeAllocator::clone(new_tail);
writer_ = BufferWriter();
tail_ = std::move(new_tail); // release tail_
}
void append(ChainBufferReader &&reader) {
while (!reader.empty()) {
append(reader.read_as_buffer_slice());
}
}
void append(ChainBufferReader &reader) {
while (!reader.empty()) {
append(reader.read_as_buffer_slice());
}
}
ChainBufferReader extract_reader() {
CHECK(head_);
return ChainBufferReader(std::move(head_));
}
private:
bool empty() const {
return !tail_;
}
ChainBufferNodeReaderPtr head_;
ChainBufferNodeWriterPtr tail_;
BufferWriter writer_;
};
class BufferBuilder {
public:
BufferBuilder() = default;
BufferBuilder(Slice slice, size_t prepend_size, size_t append_size)
: buffer_writer_(slice, prepend_size, append_size) {
}
void append(BufferSlice slice);
void append(Slice slice);
void prepend(BufferSlice slice);
void prepend(Slice slice);
template <class F>
void for_each(F &&f) {
for (auto &slice : reversed(to_prepend_)) {
f(slice);
}
if (!buffer_writer_.empty()) {
f(buffer_writer_.as_buffer_slice());
}
for (auto &slice : to_append_) {
f(slice);
}
}
BufferSlice extract();
private:
BufferWriter buffer_writer_;
std::vector<BufferSlice> to_append_;
std::vector<BufferSlice> to_prepend_;
bool append_inplace(Slice slice);
void append_slow(BufferSlice slice);
bool prepend_inplace(Slice slice);
void prepend_slow(BufferSlice slice);
};
} // namespace td