diff --git a/db/c.cc b/db/c.cc index c775fc919..db88c7a83 100644 --- a/db/c.cc +++ b/db/c.cc @@ -502,6 +502,13 @@ void leveldb_options_set_compression(leveldb_options_t* opt, int t) { opt->rep.compression = static_cast(t); } +void leveldb_options_set_compression_options( + leveldb_options_t* opt, int w_bits, int level, int strategy) { + opt->rep.compression_opts.window_bits = w_bits; + opt->rep.compression_opts.level = level; + opt->rep.compression_opts.strategy = strategy; +} + void leveldb_options_set_disable_data_sync( leveldb_options_t* opt, bool disable_data_sync) { opt->rep.disableDataSync = disable_data_sync; diff --git a/db/c_test.c b/db/c_test.c index 979244715..a1e82b2f1 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -187,6 +187,7 @@ int main(int argc, char** argv) { leveldb_options_set_block_size(options, 1024); leveldb_options_set_block_restart_interval(options, 8); leveldb_options_set_compression(options, leveldb_no_compression); + leveldb_options_set_compression_options(options, -14, -1, 0); roptions = leveldb_readoptions_create(); leveldb_readoptions_set_verify_checksums(roptions, 1); diff --git a/db/db_bench.cc b/db/db_bench.cc index 86547bbbc..8e614d71e 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -179,8 +179,8 @@ static uint64_t FLAGS_delete_obsolete_files_period_micros = 0; static enum leveldb::CompressionType FLAGS_compression_type = leveldb::kSnappyCompression; -// Allows compression for levels 0 and 1 to be disabled when -// other levels are compressed +// Allows compression for levels 0 and 1 to be disabled when +// other levels are compressed static int FLAGS_min_level_to_compress = -1; static int FLAGS_table_cache_numshardbits = 4; @@ -509,15 +509,18 @@ class Benchmark { switch (FLAGS_compression_type) { case kSnappyCompression: - result = port::Snappy_Compress(text, strlen(text), &compressed); + result = port::Snappy_Compress(Options().compression_opts, text, + strlen(text), &compressed); name = "Snappy"; break; case kZlibCompression: - result = port::Zlib_Compress(text, strlen(text), &compressed); + result = port::Zlib_Compress(Options().compression_opts, text, + strlen(text), &compressed); name = "Zlib"; break; case kBZip2Compression: - result = port::BZip2_Compress(text, strlen(text), &compressed); + result = port::BZip2_Compress(Options().compression_opts, text, + strlen(text), &compressed); name = "BZip2"; break; } @@ -855,7 +858,8 @@ class Benchmark { bool ok = true; std::string compressed; while (ok && bytes < 1024 * 1048576) { // Compress 1G - ok = port::Snappy_Compress(input.data(), input.size(), &compressed); + ok = port::Snappy_Compress(Options().compression_opts, input.data(), + input.size(), &compressed); produced += compressed.size(); bytes += input.size(); thread->stats.FinishedSingleOp(NULL); @@ -876,7 +880,8 @@ class Benchmark { RandomGenerator gen; Slice input = gen.Generate(Options().block_size); std::string compressed; - bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed); + bool ok = port::Snappy_Compress(Options().compression_opts, input.data(), + input.size(), &compressed); int64_t bytes = 0; char* uncompressed = new char[input.size()]; while (ok && bytes < 1024 * 1048576) { // Compress 1G @@ -928,7 +933,7 @@ class Benchmark { for (unsigned int i = 0; i < FLAGS_min_level_to_compress; i++) { options.compression_per_level[i] = kNoCompression; } - for (unsigned int i = FLAGS_min_level_to_compress; + for (unsigned int i = FLAGS_min_level_to_compress; i < FLAGS_num_levels; i++) { options.compression_per_level[i] = FLAGS_compression_type; } @@ -1352,8 +1357,8 @@ int main(int argc, char** argv) { else { fprintf(stdout, "Cannot parse %s\n", argv[i]); } - } else if (sscanf(argv[i], "--min_level_to_compress=%d%c", &n, &junk) == 1 - && n >= 0) { + } else if (sscanf(argv[i], "--min_level_to_compress=%d%c", &n, &junk) == 1 + && n >= 0) { FLAGS_min_level_to_compress = n; } else if (sscanf(argv[i], "--disable_seek_compaction=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { diff --git a/db/db_test.cc b/db/db_test.cc index e3d61ab74..d81a88369 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -20,23 +20,23 @@ namespace leveldb { -static bool SnappyCompressionSupported() { - std::string out; - Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - return port::Snappy_Compress(in.data(), in.size(), &out); -} - -static bool ZlibCompressionSupported() { - std::string out; - Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - return port::Zlib_Compress(in.data(), in.size(), &out); -} - -static bool BZip2CompressionSupported() { - std::string out; - Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - return port::BZip2_Compress(in.data(), in.size(), &out); -} +static bool SnappyCompressionSupported(const CompressionOptions& options) { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::Snappy_Compress(options, in.data(), in.size(), &out); +} + +static bool ZlibCompressionSupported(const CompressionOptions& options) { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::Zlib_Compress(options, in.data(), in.size(), &out); +} + +static bool BZip2CompressionSupported(const CompressionOptions& options) { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::BZip2_Compress(options, in.data(), in.size(), &out); +} static std::string RandomString(Random* rnd, int len) { std::string r; @@ -1076,57 +1076,60 @@ TEST(DBTest, CompactionTrigger) { ASSERT_EQ(NumTableFilesAtLevel(1), 1); } -void MinLevelHelper(DBTest* self, Options& options) { - Random rnd(301); - - for (int num = 0; - num < options.level0_file_num_compaction_trigger - 1; - num++) - { - std::vector values; - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { - values.push_back(RandomString(&rnd, 10000)); - ASSERT_OK(self->Put(Key(i), values[i])); - } - self->dbfull()->TEST_WaitForCompactMemTable(); - ASSERT_EQ(self->NumTableFilesAtLevel(0), num + 1); - } - - //generate one more file in level-0, and should trigger level-0 compaction - std::vector values; - for (int i = 0; i < 12; i++) { - values.push_back(RandomString(&rnd, 10000)); - ASSERT_OK(self->Put(Key(i), values[i])); - } - self->dbfull()->TEST_WaitForCompact(); - - ASSERT_EQ(self->NumTableFilesAtLevel(0), 0); - ASSERT_EQ(self->NumTableFilesAtLevel(1), 1); -} - -TEST(DBTest, MinLevelToCompress) { - Options options = CurrentOptions(); - options.write_buffer_size = 100<<10; //100KB - options.num_levels = 3; - options.max_mem_compaction_level = 0; - options.level0_file_num_compaction_trigger = 3; +void MinLevelHelper(DBTest* self, Options& options) { + Random rnd(301); + + for (int num = 0; + num < options.level0_file_num_compaction_trigger - 1; + num++) + { + std::vector values; + // Write 120KB (12 values, each 10K) + for (int i = 0; i < 12; i++) { + values.push_back(RandomString(&rnd, 10000)); + ASSERT_OK(self->Put(Key(i), values[i])); + } + self->dbfull()->TEST_WaitForCompactMemTable(); + ASSERT_EQ(self->NumTableFilesAtLevel(0), num + 1); + } + + //generate one more file in level-0, and should trigger level-0 compaction + std::vector values; + for (int i = 0; i < 12; i++) { + values.push_back(RandomString(&rnd, 10000)); + ASSERT_OK(self->Put(Key(i), values[i])); + } + self->dbfull()->TEST_WaitForCompact(); + + ASSERT_EQ(self->NumTableFilesAtLevel(0), 0); + ASSERT_EQ(self->NumTableFilesAtLevel(1), 1); +} + +void MinLevelToCompress(CompressionType& type, Options& options, int wbits, + int lev, int strategy) { + fprintf(stderr, "Test with compression options : window_bits = %d, level = %d + , strategy = %d}\n", wbits, lev, strategy); + options.write_buffer_size = 100<<10; //100KB + options.num_levels = 3; + options.max_mem_compaction_level = 0; + options.level0_file_num_compaction_trigger = 3; options.create_if_missing = true; - CompressionType type; - - if (SnappyCompressionSupported()) { - type = kSnappyCompression; - fprintf(stderr, "using snappy\n"); - } else if (ZlibCompressionSupported()) { - type = kZlibCompression; - fprintf(stderr, "using zlib\n"); - } else if (BZip2CompressionSupported()) { - type = kBZip2Compression; - fprintf(stderr, "using bzip2\n"); - } else { - fprintf(stderr, "skipping test, compression disabled\n"); - return; - } + + if (SnappyCompressionSupported(CompressionOptions(wbits, lev, strategy))) { + type = kSnappyCompression; + fprintf(stderr, "using snappy\n"); + } else if (ZlibCompressionSupported( + CompressionOptions(wbits, lev, strategy))) { + type = kZlibCompression; + fprintf(stderr, "using zlib\n"); + } else if (BZip2CompressionSupported( + CompressionOptions(wbits, lev, strategy))) { + type = kBZip2Compression; + fprintf(stderr, "using bzip2\n"); + } else { + fprintf(stderr, "skipping test, compression disabled\n"); + return; + } options.compression_per_level = new CompressionType[options.num_levels]; // do not compress L0 @@ -1136,9 +1139,14 @@ TEST(DBTest, MinLevelToCompress) { for (int i = 1; i < options.num_levels; i++) { options.compression_per_level[i] = type; } - Reopen(&options); - MinLevelHelper(this, options); - +} +TEST(DBTest, MinLevelToCompress1) { + Options options = CurrentOptions(); + CompressionType type; + MinLevelToCompress(type, options, -14, -1, 0); + Reopen(&options); + MinLevelHelper(this, options); + // do not compress L0 and L1 for (int i = 0; i < 2; i++) { options.compression_per_level[i] = kNoCompression; @@ -1146,9 +1154,27 @@ TEST(DBTest, MinLevelToCompress) { for (int i = 2; i < options.num_levels; i++) { options.compression_per_level[i] = type; } - DestroyAndReopen(&options); - MinLevelHelper(this, options); -} + DestroyAndReopen(&options); + MinLevelHelper(this, options); +} + +TEST(DBTest, MinLevelToCompress2) { + Options options = CurrentOptions(); + CompressionType type; + MinLevelToCompress(type, options, 15, -1, 0); + Reopen(&options); + MinLevelHelper(this, options); + + // do not compress L0 and L1 + for (int i = 0; i < 2; i++) { + options.compression_per_level[i] = kNoCompression; + } + for (int i = 2; i < options.num_levels; i++) { + options.compression_per_level[i] = type; + } + DestroyAndReopen(&options); + MinLevelHelper(this, options); +} TEST(DBTest, RepeatedWritesToSameKey) { Options options = CurrentOptions(); @@ -1851,7 +1877,7 @@ TEST(DBTest, SnapshotFiles) { uint64_t size; ASSERT_OK(env_->GetFileSize(src, &size)); - // record the number and the size of the + // record the number and the size of the // latest manifest file if (ParseFileName(files[i].substr(1), &number, &type)) { if (type == kDescriptorFile) { @@ -1866,7 +1892,7 @@ TEST(DBTest, SnapshotFiles) { ASSERT_OK(env_->NewSequentialFile(src, &srcfile)); WritableFile* destfile; ASSERT_OK(env_->NewWritableFile(dest, &destfile)); - + char buffer[4096]; Slice slice; while (size > 0) { @@ -1889,7 +1915,7 @@ TEST(DBTest, SnapshotFiles) { extras.push_back(RandomString(&rnd, 100000)); ASSERT_OK(Put(Key(i), extras[i])); } - + // verify that data in the snapshot are correct Options opts; DB* snapdb; @@ -1905,7 +1931,7 @@ TEST(DBTest, SnapshotFiles) { } delete snapdb; - // look at the new live files after we added an 'extra' key + // look at the new live files after we added an 'extra' key // and after we took the first snapshot. uint64_t new_manifest_number = 0; uint64_t new_manifest_size = 0; @@ -1919,7 +1945,7 @@ TEST(DBTest, SnapshotFiles) { // previous shapshot. for (unsigned int i = 0; i < newfiles.size(); i++) { std::string src = dbname_ + "/" + newfiles[i]; - // record the lognumber and the size of the + // record the lognumber and the size of the // latest manifest file if (ParseFileName(newfiles[i].substr(1), &number, &type)) { if (type == kDescriptorFile) { @@ -1934,7 +1960,7 @@ TEST(DBTest, SnapshotFiles) { } ASSERT_EQ(manifest_number, new_manifest_number); ASSERT_GT(new_manifest_size, manifest_size); - + // release file snapshot dbfull()->DisableFileDeletions(); } @@ -1998,7 +2024,7 @@ TEST(DBTest, ReadCompaction) { // in some level, indicating that there was a compaction ASSERT_TRUE(NumTableFilesAtLevel(0) < l1 || NumTableFilesAtLevel(1) < l2 || - NumTableFilesAtLevel(2) < l3); + NumTableFilesAtLevel(2) < l3); delete options.block_cache; } } diff --git a/include/leveldb/options.h b/include/leveldb/options.h index cc50ba1f8..530d8d6d3 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -32,6 +32,19 @@ enum CompressionType { kBZip2Compression = 0x3 }; +// Compression options for different compression algorithms like Zlib +struct CompressionOptions { + int window_bits; + int level; + int strategy; + CompressionOptions():window_bits(-14), + level(-1), + strategy(0){} + CompressionOptions(int wbits, int lev, int strategy):window_bits(wbits), + level(lev), + strategy(strategy){} +}; + // Options to control the behavior of a database (passed to DB::Open) struct Options { // ------------------- @@ -144,10 +157,13 @@ struct Options { // reponsible for allocating memory and initializing the values in it // before invoking Open(). The caller is responsible for freeing this // array and it could be freed anytime after the return from Open(). - // This could have been a std::vector but that makes the equivalent + // This could have been a std::vector but that makes the equivalent // java/C api hard to construct. CompressionType* compression_per_level; + //different options for compression algorithms + CompressionOptions compression_opts; + // If non-NULL, use the specified filter policy to reduce disk reads. // Many applications will benefit from passing the result of // NewBloomFilterPolicy() here. diff --git a/port/port_posix.h b/port/port_posix.h index db4e0b8ca..e2540a426 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -44,6 +44,7 @@ #include #include #include +#include "leveldb/options.h" #include "port/atomic_pointer.h" #ifndef PLATFORM_IS_LITTLE_ENDIAN @@ -131,8 +132,8 @@ typedef pthread_once_t OnceType; #define LEVELDB_ONCE_INIT PTHREAD_ONCE_INIT extern void InitOnce(OnceType* once, void (*initializer)()); -inline bool Snappy_Compress(const char* input, size_t length, - ::std::string* output) { +inline bool Snappy_Compress(const CompressionOptions& opts, const char* input, + size_t length, ::std::string* output) { #ifdef SNAPPY output->resize(snappy::MaxCompressedLength(length)); size_t outlen; @@ -162,9 +163,8 @@ inline bool Snappy_Uncompress(const char* input, size_t length, #endif } -inline bool Zlib_Compress(const char* input, size_t length, - ::std::string* output, int windowBits = -14, int level = -1, - int strategy = 0) { +inline bool Zlib_Compress(const CompressionOptions& opts, const char* input, + size_t length, ::std::string* output) { #ifdef ZLIB // The memLevel parameter specifies how much memory should be allocated for // the internal compression state. @@ -174,8 +174,8 @@ inline bool Zlib_Compress(const char* input, size_t length, static const int memLevel = 8; z_stream _stream; memset(&_stream, 0, sizeof(z_stream)); - int st = deflateInit2(&_stream, level, Z_DEFLATED, windowBits, - memLevel, strategy); + int st = deflateInit2(&_stream, opts.level, Z_DEFLATED, opts.window_bits, + memLevel, opts.strategy); if (st != Z_OK) { return false; } @@ -284,8 +284,8 @@ inline char* Zlib_Uncompress(const char* input_data, size_t input_length, return NULL; } -inline bool BZip2_Compress(const char* input, size_t length, - ::std::string* output) { +inline bool BZip2_Compress(const CompressionOptions& opts, const char* input, + size_t length, ::std::string* output) { #ifdef BZIP2 bz_stream _stream; memset(&_stream, 0, sizeof(bz_stream)); diff --git a/table/table_builder.cc b/table/table_builder.cc index 5306b2976..d867a1ca9 100644 --- a/table/table_builder.cc +++ b/table/table_builder.cc @@ -175,7 +175,8 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { case kSnappyCompression: { std::string* compressed = &r->compressed_output; - if (port::Snappy_Compress(raw.data(), raw.size(), compressed) && + if (port::Snappy_Compress(r->options.compression_opts, raw.data(), + raw.size(), compressed) && GoodCompressionRatio(compressed->size(), raw.size())) { block_contents = *compressed; } else { @@ -187,7 +188,8 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { break; } case kZlibCompression: - if (port::Zlib_Compress(raw.data(), raw.size(), compressed) && + if (port::Zlib_Compress(r->options.compression_opts, raw.data(), + raw.size(), compressed) && GoodCompressionRatio(compressed->size(), raw.size())) { block_contents = *compressed; } else { @@ -198,7 +200,8 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { } break; case kBZip2Compression: - if (port::BZip2_Compress(raw.data(), raw.size(), compressed) && + if (port::BZip2_Compress(r->options.compression_opts, raw.data(), + raw.size(), compressed) && GoodCompressionRatio(compressed->size(), raw.size())) { block_contents = *compressed; } else { diff --git a/table/table_test.cc b/table/table_test.cc index dd0ba6d5f..c78c4689d 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -247,6 +247,7 @@ class TableConstructor: public Constructor { source_ = new StringSource(sink.contents()); Options table_options; table_options.comparator = options.comparator; + table_options.compression_opts = options.compression_opts; return Table::Open(table_options, source_, sink.contents().size(), &table_); } @@ -399,19 +400,22 @@ class DBConstructor: public Constructor { static bool SnappyCompressionSupported() { std::string out; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - return port::Snappy_Compress(in.data(), in.size(), &out); + return port::Snappy_Compress(Options().compression_opts, in.data(), in.size(), + &out); } static bool ZlibCompressionSupported() { std::string out; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - return port::Zlib_Compress(in.data(), in.size(), &out); + return port::Zlib_Compress(Options().compression_opts, in.data(), in.size(), + &out); } static bool BZip2CompressionSupported() { std::string out; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - return port::BZip2_Compress(in.data(), in.size(), &out); + return port::BZip2_Compress(Options().compression_opts, in.data(), in.size(), + &out); } enum TestType { diff --git a/util/options.cc b/util/options.cc index 4a2d53456..765b72e5e 100644 --- a/util/options.cc +++ b/util/options.cc @@ -71,7 +71,7 @@ Options::Dump( Log(log,"Options.block_restart_interval: %d", block_restart_interval); if (compression_per_level != NULL) { for (unsigned int i = 0; i < num_levels; i++){ - Log(log," Options.compression[%d]: %d", + Log(log," Options.compression[%d]: %d", i, compression_per_level[i]); } } else { @@ -85,6 +85,12 @@ Options::Dump( Log(log," Options.max_log_file_size: %d", max_log_file_size); Log(log," Options.db_stats_log_interval: %d", db_stats_log_interval); + Log(log," Options.compression_opts.window_bits: %d", + compression_opts.window_bits); + Log(log," Options.compression_opts.level: %d", + compression_opts.level); + Log(log," Options.compression_opts.strategy: %d", + compression_opts.strategy); Log(log," Options.level0_file_num_compaction_trigger: %d", level0_file_num_compaction_trigger); Log(log," Options.level0_slowdown_writes_trigger: %d",