diff --git a/tddb/td/db/binlog/Binlog.cpp b/tddb/td/db/binlog/Binlog.cpp index 5d4c526be..88ac4408a 100644 --- a/tddb/td/db/binlog/Binlog.cpp +++ b/tddb/td/db/binlog/Binlog.cpp @@ -667,9 +667,9 @@ void Binlog::do_reindex() { << fd_size_ << ' ' << detail::file_size(path_) << ' ' << fd_events_ << ' ' << path_; double ratio = static_cast(start_size) / static_cast(finish_size + 1); - LOG(INFO) << "Regenerate index " << tag("name", path_) << tag("time", format::as_time(finish_time - start_time)) - << tag("before_size", format::as_size(start_size)) << tag("after_size", format::as_size(finish_size)) - << tag("ratio", ratio) << tag("before_events", start_events) << tag("after_events", finish_events); + LOG(ERROR) << "Regenerate index " << tag("name", path_) << tag("time", format::as_time(finish_time - start_time)) + << tag("before_size", format::as_size(start_size)) << tag("after_size", format::as_size(finish_size)) + << tag("ratio", ratio) << tag("before_events", start_events) << tag("after_events", finish_events); buffer_writer_ = ChainBufferWriter(); buffer_reader_ = buffer_writer_.extract_reader(); diff --git a/tdutils/td/utils/ThreadSafeCounter.h b/tdutils/td/utils/ThreadSafeCounter.h index b3d94a99e..d5d1ab628 100644 --- a/tdutils/td/utils/ThreadSafeCounter.h +++ b/tdutils/td/utils/ThreadSafeCounter.h @@ -28,7 +28,7 @@ class ThreadSafeMultiCounter { int64 sum(size_t index) const { CHECK(index < N); int64 res = 0; - tls_.for_each([&res](auto &value) { res += value[index].load(std::memory_order_relaxed); }); + tls_.for_each([&res, &index](auto &value) { res += value[index].load(std::memory_order_relaxed); }); return res; } void clear() { diff --git a/tdutils/td/utils/buffer.cpp b/tdutils/td/utils/buffer.cpp index 77605ab01..cba6fbddf 100644 --- a/tdutils/td/utils/buffer.cpp +++ b/tdutils/td/utils/buffer.cpp @@ -8,6 +8,8 @@ #include "td/utils/port/thread_local.h" +#include "td/utils/ThreadSafeCounter.h" + #include #include @@ -24,6 +26,16 @@ TD_THREAD_LOCAL BufferAllocator::BufferRawTls *BufferAllocator::buffer_raw_tls; std::atomic BufferAllocator::buffer_mem; +static td::ThreadSafeCounter buffer_slice_size_; + +int64 BufferAllocator::get_buffer_slice_size() { + return buffer_slice_size_.sum(); +} + +void BufferAllocator::track_buffer_slice(int64 size) { + buffer_slice_size_.add(size); +} + size_t BufferAllocator::get_buffer_mem() { return buffer_mem; } diff --git a/tdutils/td/utils/buffer.h b/tdutils/td/utils/buffer.h index d970b1719..8d1eb2e24 100644 --- a/tdutils/td/utils/buffer.h +++ b/tdutils/td/utils/buffer.h @@ -67,10 +67,15 @@ class BufferAllocator { static ReaderPtr create_reader(const ReaderPtr &raw); static size_t get_buffer_mem(); + static int64 get_buffer_slice_size(); static void clear_thread_local(); private: + friend class BufferSlice; + + static void track_buffer_slice(int64 size); + static ReaderPtr create_reader_fast(size_t size); static WriterPtr create_writer_exact(size_t size); @@ -104,16 +109,29 @@ class BufferSlice { return; } begin_ = buffer_->begin_; + end_ = begin_; sync_with_writer(); } BufferSlice(BufferReaderPtr buffer_ptr, size_t begin, size_t end) : buffer_(std::move(buffer_ptr)), begin_(begin), end_(end) { + debug_track(); + } + BufferSlice(BufferSlice &&other) : BufferSlice(std::move(other.buffer_), other.begin_, other.end_) { + debug_untrack(); // yes, debug_untrack + } + BufferSlice &operator=(BufferSlice &&other) { + debug_untrack(); + buffer_ = std::move(other.buffer_); + begin_ = other.begin_; + end_ = other.end_; + return *this; } explicit BufferSlice(size_t size) : buffer_(BufferAllocator::create_reader(size)) { end_ = buffer_->end_.load(std::memory_order_relaxed); begin_ = end_ - ((size + 7) & -8); end_ = begin_ + size; + debug_track(); } explicit BufferSlice(Slice slice) : BufferSlice(slice.size()) { @@ -123,6 +141,17 @@ class BufferSlice { BufferSlice(const char *ptr, size_t size) : BufferSlice(Slice(ptr, size)) { } + ~BufferSlice() { + debug_untrack(); + } + + void debug_track() { + BufferAllocator::track_buffer_slice(static_cast(size())); + } + void debug_untrack() { + BufferAllocator::track_buffer_slice(-static_cast(size())); + } + BufferSlice clone() const { if (is_null()) { return BufferSlice(BufferReaderPtr(), begin_, end_); @@ -166,14 +195,18 @@ class BufferSlice { } bool confirm_read(size_t size) { + debug_untrack(); begin_ += size; CHECK(begin_ <= end_); + debug_track(); return begin_ == end_; } void truncate(size_t limit) { if (size() > limit) { + debug_untrack(); end_ = begin_ + limit; + debug_track(); } } @@ -181,6 +214,7 @@ class BufferSlice { auto res = BufferSlice(BufferAllocator::create_reader(buffer_)); res.begin_ = static_cast(slice.ubegin() - buffer_->data_); res.end_ = static_cast(slice.uend() - buffer_->data_); + res.debug_track(); CHECK(buffer_->begin_ <= res.begin_); CHECK(res.begin_ <= res.end_); CHECK(res.end_ <= buffer_->end_.load(std::memory_order_relaxed)); @@ -220,9 +254,11 @@ class BufferSlice { // set end_ into writer's end_ size_t sync_with_writer() { + debug_untrack(); CHECK(!is_null()); auto old_end = end_; end_ = buffer_->end_.load(std::memory_order_acquire); + debug_track(); return end_ - old_end; } bool is_writer_alive() const { @@ -230,6 +266,7 @@ class BufferSlice { return buffer_->has_writer_.load(std::memory_order_acquire); } void clear() { + debug_untrack(); begin_ = 0; end_ = 0; buffer_ = nullptr; diff --git a/test/http.cpp b/test/http.cpp index 55476e02d..1982c79f7 100644 --- a/test/http.cpp +++ b/test/http.cpp @@ -133,6 +133,7 @@ TEST(Http, reader) { clear_thread_locals(); SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); auto start_mem = BufferAllocator::get_buffer_mem(); + auto start_size = BufferAllocator::get_buffer_slice_size(); for (int i = 0; i < 20; i++) { td::ChainBufferWriter input_writer; auto input = input_writer.extract_reader(); @@ -184,6 +185,7 @@ TEST(Http, reader) { } clear_thread_locals(); ASSERT_EQ(start_mem, BufferAllocator::get_buffer_mem()); + ASSERT_EQ(start_size, BufferAllocator::get_buffer_slice_size()); } TEST(Http, gzip_bomb) { diff --git a/test/tqueue.cpp b/test/tqueue.cpp index 782105dba..812eeb1cc 100644 --- a/test/tqueue.cpp +++ b/test/tqueue.cpp @@ -9,6 +9,8 @@ #include "td/db/binlog/BinlogHelper.h" #include "td/db/TQueue.h" +#include "td/utils/Random.h" +#include "td/utils/buffer.h" #include "td/utils/int_types.h" #include "td/utils/misc.h" #include "td/utils/port/path.h" @@ -192,3 +194,35 @@ TEST(TQueue, random) { steps.step(rnd); } } + +TEST(TQueue, memory_leak) { + //return; + auto tqueue = td::TQueue::create(); + auto tqueue_binlog = td::make_unique>(); + std::string binlog_path = "test_tqueue.binlog"; + td::Binlog::destroy(binlog_path).ensure(); + auto binlog = std::make_shared(); + binlog->init(binlog_path, [&](const td::BinlogEvent &event) { UNREACHABLE(); }).ensure(); + tqueue_binlog->set_binlog(std::move(binlog)); + tqueue->set_callback(std::move(tqueue_binlog)); + + double now = 0; + std::vector ids; + td::Random::Xorshift128plus rnd(123); + int i = 0; + while (true) { + auto id = tqueue->push(1, "a", now + 600000, 0, {}).move_as_ok(); + ids.push_back(id); + if (ids.size() > rnd() % 100000) { + auto it = rnd() % ids.size(); + std::swap(ids.back(), ids[it]); + tqueue->forget(1, ids.back()); + ids.pop_back(); + } + now += 1; + if (i++ % 100000 == 0) { + LOG(ERROR) << td::BufferAllocator::get_buffer_mem() << " " << tqueue->get_size(1) << " " + << td::BufferAllocator::get_buffer_slice_size(); + } + } +}