Gzip: more tests (with watermark and memory limit)

GitOrigin-RevId: b8eacf5a27d646b9265d2fe43b847888ad58f5af
This commit is contained in:
Arseny Smirnov 2020-07-23 16:39:13 +03:00
parent 93e0a10ffb
commit c4921adcad
4 changed files with 132 additions and 9 deletions

View File

@ -3735,9 +3735,7 @@ void Td::start_up() {
LOG_IF(FATAL, symbol != c) << "TDLib requires little-endian platform";
}
{
TsList<NetQueryDebug>::lock(); // initialize mutex before any NetQuery
}
TsList<NetQueryDebug>::lock().unlock(); // initialize mutex before any NetQuery
VLOG(td_init) << "Create Global";
set_context(std::make_shared<Global>());

View File

@ -250,7 +250,9 @@ class ByteFlowSource : public ByteFlowInterface {
parent_ = nullptr;
}
// Forwards a wakeup request to the parent flow, if one is attached.
//
// parent_ can be nullptr (the method right above this one resets it), and a
// wakeup may still be requested afterwards. Such a call is treated as a no-op:
// asserting on parent_ before the null check would make the guard below
// unreachable, so no assertion is performed here.
void wakeup() final {
  if (!parent_) {
    return;
  }
  parent_->wakeup();
}
size_t get_need_size() final {

View File

@ -15,14 +15,13 @@ char disable_linker_warning_about_empty_file_gzipbyteflow_cpp TD_UNUSED;
namespace td {
bool GzipByteFlow::loop() {
bool result = false;
if (gzip_.need_input()) {
auto slice = input_->prepare_read();
if (slice.empty()) {
if (!is_input_active_) {
gzip_.close_input();
} else {
return result;
return false;
}
} else {
gzip_.set_input(input_->prepare_read());
@ -40,10 +39,9 @@ bool GzipByteFlow::loop() {
total_output_size_ += output_size;
if (total_output_size_ > max_output_size_) {
finish(Status::Error("Max output size limit exceeded"));
return result;
return false;
}
output_.confirm_append(output_size);
result = true;
}
auto input_size = gzip_.flush_input();
@ -59,7 +57,7 @@ bool GzipByteFlow::loop() {
consume_input();
return false;
}
return result;
return true;
}
} // namespace td

View File

@ -14,6 +14,8 @@
#include "td/utils/tests.h"
#include "td/utils/Time.h"
#include <algorithm>
static void encode_decode(td::string s) {
auto r = td::gzencode(s, 2);
ASSERT_TRUE(!r.empty());
@ -119,3 +121,126 @@ TEST(Gzip, encode_decode_flow) {
ASSERT_TRUE(sink.status().is_ok());
ASSERT_EQ(str, sink.result()->move_as_buffer_slice().as_slice().str());
}
// Pushes 200 chunks of 1000000 'a' bytes through an
// encode -> decode -> encode -> decode chain, draining the sink after every
// chunk. Verifies that only 'a' bytes come out, that exactly as many bytes come
// out as went in, that buffer memory stays bounded while streaming, and that
// all buffer memory is returned once the flows are destroyed.
TEST(Gzip, encode_decode_flow_big) {
  td::clear_thread_locals();
  auto start_mem = td::BufferAllocator::get_buffer_mem();
  {
    std::string payload(1000000, 'a');

    td::ChainBufferWriter writer;
    auto reader = writer.extract_reader();
    td::ByteFlowSource source(&reader);
    td::GzipByteFlow first_encoder(td::Gzip::Mode::Encode);
    td::GzipByteFlow first_decoder(td::Gzip::Mode::Decode);
    td::GzipByteFlow second_encoder(td::Gzip::Mode::Encode);
    td::GzipByteFlow second_decoder(td::Gzip::Mode::Decode);
    td::ByteFlowSink sink;
    source >> first_encoder >> first_decoder >> second_encoder >> second_decoder >> sink;

    ASSERT_TRUE(!sink.is_ready());

    size_t chunk_count = 200;
    // Number of bytes that are still expected to appear at the sink.
    size_t remaining = chunk_count * payload.size();

    auto is_filler = [](auto c) { return c == 'a'; };
    // Accounts for a piece of output and checks that it contains only 'a'.
    auto consume = [&](td::Slice part) {
      CHECK(part.size() <= remaining);
      remaining -= part.size();
      ASSERT_TRUE(std::all_of(part.begin(), part.end(), is_filler));
    };

    for (size_t round = 0; round != chunk_count; ++round) {
      writer.append(payload);
      source.wakeup();

      auto used_mem = td::BufferAllocator::get_buffer_mem() - start_mem;
      // The exact bound is arbitrary; it only has to be far below the ~200 MB
      // that pass through the chain in total.
      CHECK(used_mem < (10 << 20));

      // Drain whatever has reached the sink so far.
      auto ready_size = sink.get_output()->size();
      consume(sink.get_output()->cut_head(ready_size).move_as_buffer_slice().as_slice());
    }

    // The sink becomes ready only after the input has been closed.
    ASSERT_TRUE(!sink.is_ready());
    source.close_input(td::Status::OK());
    ASSERT_TRUE(sink.is_ready());

    LOG_IF(ERROR, sink.status().is_error()) << sink.status();
    ASSERT_TRUE(sink.status().is_ok());

    consume(sink.result()->move_as_buffer_slice().as_slice());
    ASSERT_EQ(0u, remaining);
  }
  td::clear_thread_locals();
  ASSERT_EQ(start_mem, td::BufferAllocator::get_buffer_mem());
}
// Builds a gzip stream from 200 blocks of (1 << 20) 'a' bytes, then feeds that
// stream through a decode -> encode -> decode -> encode -> decode chain whose
// flows are configured with 2 MiB / 4 MiB low/high watermarks. Verifies that
// the output consists solely of 'a', that its total size matches the original
// payload, that buffer memory stays bounded while the chain is pumped, and
// that all buffer memory is returned afterwards.
TEST(Gzip, decode_encode_flow_bomb) {
  std::string bomb;
  size_t block_count = 200;

  // Stage 1: compress the payload to obtain the input for the chain.
  {
    td::ChainBufferWriter writer;
    auto reader = writer.extract_reader();
    td::GzipByteFlow compressor(td::Gzip::Mode::Encode);
    td::ByteFlowSource source(&reader);
    td::ByteFlowSink sink;
    source >> compressor >> sink;

    std::string block(1 << 20, 'a');
    for (size_t round = 0; round != block_count; ++round) {
      writer.append(block);
      source.wakeup();
    }
    source.close_input(td::Status::OK());

    ASSERT_TRUE(sink.is_ready());
    LOG_IF(ERROR, sink.status().is_error()) << sink.status();
    ASSERT_TRUE(sink.status().is_ok());
    bomb = sink.result()->move_as_buffer_slice().as_slice().str();
  }

  td::clear_thread_locals();
  auto start_mem = td::BufferAllocator::get_buffer_mem();

  // Stage 2: run the compressed stream through the watermark-limited chain.
  {
    td::ChainBufferWriter writer;
    auto reader = writer.extract_reader();
    td::ByteFlowSource source(&reader);

    // Decoders get a write watermark, encoders get a read watermark.
    td::GzipByteFlow::Options decoder_options;
    decoder_options.write_watermark.low = 2 << 20;
    decoder_options.write_watermark.high = 4 << 20;
    td::GzipByteFlow::Options encoder_options;
    encoder_options.read_watermark.low = 2 << 20;
    encoder_options.read_watermark.high = 4 << 20;

    td::GzipByteFlow decoder_a(td::Gzip::Mode::Decode);
    decoder_a.set_options(decoder_options);
    td::GzipByteFlow encoder_a(td::Gzip::Mode::Encode);
    encoder_a.set_options(encoder_options);
    td::GzipByteFlow decoder_b(td::Gzip::Mode::Decode);
    decoder_b.set_options(decoder_options);
    td::GzipByteFlow encoder_b(td::Gzip::Mode::Encode);
    encoder_b.set_options(encoder_options);
    td::GzipByteFlow decoder_c(td::Gzip::Mode::Decode);
    decoder_c.set_options(decoder_options);
    td::ByteFlowSink sink;
    source >> decoder_a >> encoder_a >> decoder_b >> encoder_b >> decoder_c >> sink;

    ASSERT_TRUE(!sink.is_ready());

    // Number of decompressed bytes that are still expected at the sink.
    size_t remaining = block_count * (1 << 20);

    auto is_filler = [](auto c) { return c == 'a'; };
    // Accounts for a piece of output and checks that it contains only 'a'.
    auto consume = [&](td::Slice part) {
      CHECK(part.size() <= remaining);
      remaining -= part.size();
      ASSERT_TRUE(std::all_of(part.begin(), part.end(), is_filler));
    };

    // The whole compressed stream is supplied at once and the input is closed
    // before any pumping starts.
    writer.append(bomb);
    source.close_input(td::Status::OK());

    do {
      // Wake the decoders from the sink side back towards the source, then the
      // source itself (whose input is already closed at this point).
      decoder_c.wakeup();
      decoder_b.wakeup();
      decoder_a.wakeup();
      source.wakeup();

      auto used_mem = td::BufferAllocator::get_buffer_mem() - start_mem;
      // The exact bound is arbitrary; it only has to be far below the ~200 MB
      // of decompressed data.
      CHECK(used_mem < (10 << 20));

      // Drain whatever has reached the sink so far.
      auto ready_size = sink.get_output()->size();
      consume(sink.get_output()->cut_head(ready_size).move_as_buffer_slice().as_slice());
    } while (!sink.is_ready());

    ASSERT_EQ(0u, remaining);
  }
  td::clear_thread_locals();
  ASSERT_EQ(start_mem, td::BufferAllocator::get_buffer_mem());
}