From 054a5657f868aa057a940bc1051523de88316704 Mon Sep 17 00:00:00 2001 From: heyongqiang Date: Wed, 27 Jun 2012 23:41:33 -0700 Subject: [PATCH] add zlib compression Summary: add zlib compression Test Plan: Will add more testcases Reviewers: dhruba Reviewed By: dhruba Differential Revision: https://reviews.facebook.net/D3873 --- build_detect_platform | 12 +++- include/leveldb/options.h | 3 +- port/port_android.h | 12 ++++ port/port_posix.h | 126 ++++++++++++++++++++++++++++++++++ table/format.cc | 15 ++++- table/table_builder.cc | 22 +++++- table/table_test.cc | 138 +++++++++++++++++++++++++------------- 7 files changed, 276 insertions(+), 52 deletions(-) diff --git a/build_detect_platform b/build_detect_platform index 4b5de4a2a..1b9b9cbeb 100755 --- a/build_detect_platform +++ b/build_detect_platform @@ -51,7 +51,7 @@ case "$TARGET_OS" in ;; Linux) PLATFORM=OS_LINUX - COMMON_FLAGS="-fno-builtin-memcmp -pthread -DOS_LINUX -fPIC" + COMMON_FLAGS="-I/usr/include -fno-builtin-memcmp -pthread -DOS_LINUX -fPIC" PLATFORM_LDFLAGS="-pthread" PORT_FILE=port/port_posix.cc ;; @@ -139,6 +139,16 @@ EOF PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -lsnappy" fi + # Test whether zlib library is installed + $CXX $CFLAGS $COMMON_FLAGS -x c++ - -o /dev/null 2>/dev/null < + int main() {} +EOF + if [ "$?" = 0 ]; then + COMMON_FLAGS="$COMMON_FLAGS -DZLIB" + PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -lz" + fi + # Test whether tcmalloc is available $CXX $CFLAGS -x c++ - -o /dev/null -ltcmalloc 2>/dev/null < #endif +#ifdef ZLIB +#include +#endif #include #include +#include #include "port/atomic_pointer.h" #ifdef LITTLE_ENDIAN @@ -119,6 +123,128 @@ inline bool Snappy_Uncompress(const char* input, size_t length, #endif } +inline bool Zlib_Compress(const char* input, size_t length, + ::std::string* output, int windowBits = 15, int level = -1, + int strategy = 0) { +#ifdef ZLIB + // The memLevel parameter specifies how much memory should be allocated for + // the internal compression state. + // memLevel=1 uses minimum memory but is slow and reduces compression ratio. + // memLevel=9 uses maximum memory for optimal speed. + // The default value is 8. See zconf.h for more details. + static const int memLevel = 8; + z_stream _stream; + memset(&_stream, 0, sizeof(z_stream)); + int st = deflateInit2(&_stream, level, Z_DEFLATED, windowBits, + memLevel, strategy); + if (st != Z_OK) { + return false; + } + + // Resize output to be the plain data length. + // This may not be big enough if the compression actually expands data. + output->resize(length); + + // Compress the input, and put compressed data in output. + _stream.next_in = (Bytef *)input; + _stream.avail_in = length; + + // Initialize the output size. + _stream.avail_out = length; + _stream.next_out = (Bytef *)&(*output)[0]; + + int old_sz =0, new_sz =0; + while(_stream.next_in != NULL && _stream.avail_in != 0) { + int st = deflate(&_stream, Z_FINISH); + switch (st) { + case Z_STREAM_END: + break; + case Z_OK: + // No output space. Increase the output space by 20%. + // (Should we fail the compression since it expands the size?) + old_sz = output->size(); + new_sz = output->size() * 1.2; + output->resize(new_sz); + // Set more output. + _stream.next_out = (Bytef *)&(*output)[old_sz]; + _stream.avail_out = new_sz - old_sz; + break; + case Z_BUF_ERROR: + default: + deflateEnd(&_stream); + return false; + } + } + + output->resize(output->size() - _stream.avail_out); + deflateEnd(&_stream); + return true; +#endif + return false; +} + +inline char* Zlib_Uncompress(const char* input_data, size_t input_length, + int* decompress_size, int windowBits = 15) { +#ifdef ZLIB + z_stream _stream; + memset(&_stream, 0, sizeof(z_stream)); + + // For raw inflate, the windowBits should be Ð8..Ð15. + // If windowBits is bigger than zero, it will use either zlib + // header or gzip header. Adding 32 to it will do automatic detection. + int st = inflateInit2(&_stream, + windowBits > 0 ? windowBits + 32 : windowBits); + if (st != Z_OK) { + return NULL; + } + + _stream.next_in = (Bytef *)input_data; + _stream.avail_in = input_length; + + // Assume the decompressed data size will 5x of compressed size. + int output_len = input_length * 5; + char* output = new char[output_len]; + int old_sz = output_len; + + _stream.next_out = (Bytef *)output; + _stream.avail_out = output_len; + + char* tmp = NULL; + + while(_stream.next_in != NULL && _stream.avail_in != 0) { + int st = inflate(&_stream, Z_SYNC_FLUSH); + switch (st) { + case Z_STREAM_END: + break; + case Z_OK: + // No output space. Increase the output space by 20%. + old_sz = output_len; + output_len = output_len * 1.2; + tmp = new char[output_len]; + memcpy(tmp, output, old_sz); + delete[] output; + output = tmp; + + // Set more output. + _stream.next_out = (Bytef *)(output + old_sz); + _stream.avail_out = output_len - old_sz; + break; + case Z_BUF_ERROR: + default: + delete[] output; + inflateEnd(&_stream); + return NULL; + } + } + + *decompress_size = output_len - _stream.avail_out; + inflateEnd(&_stream); + return output; +#endif + + return false; +} + inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) { return false; } diff --git a/table/format.cc b/table/format.cc index cda1decdf..728a8431d 100644 --- a/table/format.cc +++ b/table/format.cc @@ -98,6 +98,7 @@ Status ReadBlock(RandomAccessFile* file, } } + char* ubuf = NULL; switch (data[n]) { case kNoCompression: if (data != buf) { @@ -122,7 +123,7 @@ Status ReadBlock(RandomAccessFile* file, delete[] buf; return Status::Corruption("corrupted compressed block contents"); } - char* ubuf = new char[ulength]; + ubuf = new char[ulength]; if (!port::Snappy_Uncompress(data, n, ubuf)) { delete[] buf; delete[] ubuf; @@ -134,6 +135,18 @@ Status ReadBlock(RandomAccessFile* file, result->cachable = true; break; } + case kZlibCompression: + int decompress_size; + ubuf = port::Zlib_Uncompress(data, n, &decompress_size); + if (!ubuf) { + delete[] buf; + return Status::Corruption("corrupted compressed block contents"); + } + delete[] buf; + result->data = Slice(ubuf, decompress_size); + result->heap_allocated = true; + result->cachable = true; + break; default: delete[] buf; return Status::Corruption("bad block type"); diff --git a/table/table_builder.cc b/table/table_builder.cc index 62002c84f..4a8d767db 100644 --- a/table/table_builder.cc +++ b/table/table_builder.cc @@ -136,6 +136,11 @@ void TableBuilder::Flush() { } } +static bool GoodCompressionRatio(int compressed_size, int raw_size) { + // Check to see if compressed less than 12.5% + return compressed_size < raw_size - (raw_size / 8u); +} + void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { // File format contains a sequence of blocks where each block has: // block_data: uint8[n] @@ -147,7 +152,6 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { Slice block_contents; CompressionType type = r->options.compression; - // TODO(postrelease): Support more compression options: zlib? switch (type) { case kNoCompression: block_contents = raw; @@ -156,16 +160,28 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { case kSnappyCompression: { std::string* compressed = &r->compressed_output; if (port::Snappy_Compress(raw.data(), raw.size(), compressed) && - compressed->size() < raw.size() - (raw.size() / 8u)) { + GoodCompressionRatio(compressed->size(), raw.size())) { block_contents = *compressed; } else { - // Snappy not supported, or compressed less than 12.5%, so just + // Snappy not supported, or not good compression ratio, so just // store uncompressed form block_contents = raw; type = kNoCompression; } break; } + case kZlibCompression: + std::string* compressed = &r->compressed_output; + if (port::Zlib_Compress(raw.data(), raw.size(), compressed) && + GoodCompressionRatio(compressed->size(), raw.size())) { + block_contents = *compressed; + } else { + // Zlib not supported, or not good compression ratio, so just + // store uncompressed form + block_contents = raw; + type = kNoCompression; + } + break; } WriteRawBlock(block_contents, type, handle); r->compressed_output.clear(); diff --git a/table/table_test.cc b/table/table_test.cc index 4792e542c..fe7e96b44 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -396,6 +396,18 @@ class DBConstructor: public Constructor { DB* db_; }; +static bool SnappyCompressionSupported() { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::Snappy_Compress(in.data(), in.size(), &out); +} + +static bool ZlibCompressionSupported() { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::Zlib_Compress(in.data(), in.size(), &out); +} + enum TestType { TABLE_TEST, BLOCK_TEST, @@ -407,32 +419,47 @@ struct TestArgs { TestType type; bool reverse_compare; int restart_interval; + CompressionType compression; }; -static const TestArgs kTestArgList[] = { - { TABLE_TEST, false, 16 }, - { TABLE_TEST, false, 1 }, - { TABLE_TEST, false, 1024 }, - { TABLE_TEST, true, 16 }, - { TABLE_TEST, true, 1 }, - { TABLE_TEST, true, 1024 }, - { BLOCK_TEST, false, 16 }, - { BLOCK_TEST, false, 1 }, - { BLOCK_TEST, false, 1024 }, - { BLOCK_TEST, true, 16 }, - { BLOCK_TEST, true, 1 }, - { BLOCK_TEST, true, 1024 }, +static std::vector Generate_Arg_List() +{ + std::vector ret; + TestType test_type[4] = {TABLE_TEST, BLOCK_TEST, MEMTABLE_TEST, DB_TEST}; + int test_type_len = 4; + bool reverse_compare[2] = {false, true}; + int reverse_compare_len = 2; + int restart_interval[3] = {16, 1, 1024}; + int restart_interval_len = 3; - // Restart interval does not matter for memtables - { MEMTABLE_TEST, false, 16 }, - { MEMTABLE_TEST, true, 16 }, + // Only add compression if it is supported + std::vector compression_types; + compression_types.push_back(kNoCompression); +#ifdef SNAPPY + if (SnappyCompressionSupported()) + compression_types.push_back(kSnappyCompression); +#endif - // Do not bother with restart interval variations for DB - { DB_TEST, false, 16 }, - { DB_TEST, true, 16 }, -}; -static const int kNumTestArgs = sizeof(kTestArgList) / sizeof(kTestArgList[0]); +#ifdef ZLIB + if (ZlibCompressionSupported()) + compression_types.push_back(kZlibCompression); +#endif + + for(int i =0; i < test_type_len; i++) + for (int j =0; j < reverse_compare_len; j++) + for (int k =0; k < restart_interval_len; k++) + for (int n =0; n < compression_types.size(); n++) { + TestArgs one_arg; + one_arg.type = test_type[i]; + one_arg.reverse_compare = reverse_compare[j]; + one_arg.restart_interval = restart_interval[k]; + one_arg.compression = compression_types[n]; + ret.push_back(one_arg); + } + + return ret; +} class Harness { public: @@ -444,6 +471,7 @@ class Harness { options_ = Options(); options_.block_restart_interval = args.restart_interval; + options_.compression = args.compression; // Use shorter block size for tests to exercise block boundary // conditions more. options_.block_size = 256; @@ -646,8 +674,9 @@ class Harness { // Test the empty key TEST(Harness, SimpleEmptyKey) { - for (int i = 0; i < kNumTestArgs; i++) { - Init(kTestArgList[i]); + std::vector args = Generate_Arg_List(); + for (int i = 0; i < args.size(); i++) { + Init(args[i]); Random rnd(test::RandomSeed() + 1); Add("", "v"); Test(&rnd); @@ -655,8 +684,9 @@ TEST(Harness, SimpleEmptyKey) { } TEST(Harness, SimpleSingle) { - for (int i = 0; i < kNumTestArgs; i++) { - Init(kTestArgList[i]); + std::vector args = Generate_Arg_List(); + for (int i = 0; i < args.size(); i++) { + Init(args[i]); Random rnd(test::RandomSeed() + 2); Add("abc", "v"); Test(&rnd); @@ -664,8 +694,9 @@ TEST(Harness, SimpleSingle) { } TEST(Harness, SimpleMulti) { - for (int i = 0; i < kNumTestArgs; i++) { - Init(kTestArgList[i]); + std::vector args = Generate_Arg_List(); + for (int i = 0; i < args.size(); i++) { + Init(args[i]); Random rnd(test::RandomSeed() + 3); Add("abc", "v"); Add("abcd", "v"); @@ -675,8 +706,9 @@ TEST(Harness, SimpleMulti) { } TEST(Harness, SimpleSpecialKey) { - for (int i = 0; i < kNumTestArgs; i++) { - Init(kTestArgList[i]); + std::vector args = Generate_Arg_List(); + for (int i = 0; i < args.size(); i++) { + Init(args[i]); Random rnd(test::RandomSeed() + 4); Add("\xff\xff", "v3"); Test(&rnd); @@ -684,14 +716,15 @@ TEST(Harness, SimpleSpecialKey) { } TEST(Harness, Randomized) { - for (int i = 0; i < kNumTestArgs; i++) { - Init(kTestArgList[i]); + std::vector args = Generate_Arg_List(); + for (int i = 0; i < args.size(); i++) { + Init(args[i]); Random rnd(test::RandomSeed() + 5); for (int num_entries = 0; num_entries < 2000; num_entries += (num_entries < 50 ? 1 : 200)) { if ((num_entries % 10) == 0) { fprintf(stderr, "case %d of %d: num_entries = %d\n", - (i + 1), int(kNumTestArgs), num_entries); + (i + 1), int(args.size()), num_entries); } for (int e = 0; e < num_entries; e++) { std::string v; @@ -705,7 +738,7 @@ TEST(Harness, Randomized) { TEST(Harness, RandomizedLongDB) { Random rnd(test::RandomSeed()); - TestArgs args = { DB_TEST, false, 16 }; + TestArgs args = { DB_TEST, false, 16, kNoCompression }; Init(args); int num_entries = 100000; for (int e = 0; e < num_entries; e++) { @@ -797,18 +830,7 @@ TEST(TableTest, ApproximateOffsetOfPlain) { } -static bool SnappyCompressionSupported() { - std::string out; - Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - return port::Snappy_Compress(in.data(), in.size(), &out); -} - -TEST(TableTest, ApproximateOffsetOfCompressed) { - if (!SnappyCompressionSupported()) { - fprintf(stderr, "skipping compression tests\n"); - return; - } - +static void Do_Compression_Test(CompressionType comp) { Random rnd(301); TableConstructor c(BytewiseComparator()); std::string tmp; @@ -820,7 +842,7 @@ TEST(TableTest, ApproximateOffsetOfCompressed) { KVMap kvmap; Options options; options.block_size = 1024; - options.compression = kSnappyCompression; + options.compression = comp; c.Finish(options, &keys, &kvmap); ASSERT_TRUE(Between(c.ApproximateOffsetOf("abc"), 0, 0)); @@ -831,6 +853,30 @@ TEST(TableTest, ApproximateOffsetOfCompressed) { ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 4000, 6000)); } +TEST(TableTest, ApproximateOffsetOfCompressed) { + CompressionType compression_state[2]; + int valid = 0; + if (!SnappyCompressionSupported()) { + fprintf(stderr, "skipping snappy compression tests\n"); + } else { + compression_state[valid] = kSnappyCompression; + valid++; + } + + if (!ZlibCompressionSupported()) { + fprintf(stderr, "skipping zlib compression tests\n"); + } else { + compression_state[valid] = kZlibCompression; + valid++; + } + + for(int i =0; i < valid; i++) + { + Do_Compression_Test(compression_state[i]); + } + +} + } // namespace leveldb int main(int argc, char** argv) {