Skip high levels with no key falling in the range in CompactRange (#6482)

Summary:
In CompactRange, if no key in the memtable falls within the specified range, the flush is skipped.
This PR extends the same skipping logic to the SST file levels: compaction starts from the first level (scanning down from L0) that has files with keys falling in the specified range, instead of always starting from L0.
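To make the effect concrete, here is a minimal usage sketch against the public API (this is not code from the PR; the DB path and keys are made up): after this change, a manual CompactRange whose range overlaps no file at any level becomes a no-op instead of a full L0-to-bottom compaction.

#include <cassert>
#include "rocksdb/db.h"

int main() {
  rocksdb::DB* db;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/compact_range_demo", &db);
  assert(s.ok());

  // Create one L0 file whose only key is outside the compaction range below.
  s = db->Put(rocksdb::WriteOptions(), "4", "");
  assert(s.ok());
  s = db->Flush(rocksdb::FlushOptions());
  assert(s.ok());

  // No file on any level has keys in ["5", "7"], so with this PR every
  // level is skipped and no compaction is scheduled.
  rocksdb::Slice begin("5");
  rocksdb::Slice end("7");
  s = db->CompactRange(rocksdb::CompactRangeOptions(), &begin, &end);
  assert(s.ok());

  delete db;
  return 0;
}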
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6482

Test Plan:
A new test ManualCompactionTest::SkipLevel is added.

Also updated a test in db_test2 related to statistics of index block cache hits: the index cache hit count goes up by 1 with this PR because, when checking overlap for the key range in L0, OverlapWithLevelIterator does a seek through the table cache iterator, which reads from the cached index block.
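For reference, that ticker can be read through the public statistics API; a small hedged sketch follows (the helper name is ours, and it assumes options.statistics was set via rocksdb::CreateDBStatistics() before DB::Open):

#include <cstdint>
#include "rocksdb/options.h"
#include "rocksdb/statistics.h"

// Hypothetical helper: reads the index-block cache-hit ticker that the
// updated test in db_test2 asserts on.
uint64_t IndexBlockCacheHits(const rocksdb::Options& options) {
  return options.statistics->getTickerCount(rocksdb::BLOCK_CACHE_INDEX_HIT);
}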

Also updated db_compaction_test and db_test to use the correct range for full compaction.

Differential Revision: D20251149

Pulled By: cheng-chang

fbshipit-source-id: f822157cf4796972bd5035d9d7178d8dfb7af08b
Cheng Chang 2020-03-04 20:12:23 -08:00 committed by Facebook Github Bot
parent e62fe50634
commit afb97094ae
6 changed files with 253 additions and 92 deletions

HISTORY.md

@@ -4,6 +4,9 @@
 * Fix a bug where range tombstone blocks in ingested files were cached incorrectly during ingestion. If range tombstones were read from those incorrectly cached blocks, the keys they covered would be exposed.
 * Fix a data race that might cause crash when calling DB::GetCreationTimeOfOldestFile() by a small chance. The bug was introduced in 6.6 Release.
 
+### Performance Improvements
+* In CompactRange, for levels starting from 0, if the level does not have any file with any key falling in the specified range, the level is skipped. So instead of always compacting from level 0, the compaction starts from the first level with keys in the specified range until the last such level.
+
 ## 6.8.0 (02/24/2020)
 ### Java API Changes
 * Major breaking changes to Java comparators, toward standardizing on ByteBuffer for performant, locale-neutral operations on keys (#6252).

db/db_compaction_test.cc

@@ -2459,7 +2459,7 @@ TEST_P(DBCompactionTestWithParam, ManualCompaction) {
   ASSERT_EQ("1,1,1", FilesPerLevel(1));
 
   // Compaction range overlaps files
-  Compact(1, "p1", "p9");
+  Compact(1, "p", "q");
   ASSERT_EQ("0,0,1", FilesPerLevel(1));
 
   // Populate a different range
@@ -2526,7 +2526,7 @@ TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) {
   ASSERT_EQ("3", FilesPerLevel(1));
 
   // Compaction range overlaps files
-  Compact(1, "p1", "p9", 1);
+  Compact(1, "p", "q", 1);
   ASSERT_OK(dbfull()->TEST_WaitForCompact());
   ASSERT_EQ("0,1", FilesPerLevel(1));
   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
@@ -4655,7 +4655,7 @@ TEST_P(DBCompactionDirectIOTest, DirectIO) {
   CreateAndReopenWithCF({"pikachu"}, options);
   MakeTables(3, "p", "q", 1);
   ASSERT_EQ("1,1,1", FilesPerLevel(1));
-  Compact(1, "p1", "p9");
+  Compact(1, "p", "q");
   ASSERT_EQ(readahead, options.use_direct_reads);
   ASSERT_EQ("0,0,1", FilesPerLevel(1));
   Destroy(options);

db/db_impl/db_impl_compaction_flush.cc

@@ -652,8 +652,6 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
     return Status::InvalidArgument("Invalid target path ID");
   }
 
-  bool exclusive = options.exclusive_manual_compaction;
-
   bool flush_needed = true;
   if (begin != nullptr && end != nullptr) {
     // TODO(ajkr): We could also optimize away the flush in certain cases where
@@ -686,25 +684,9 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
     }
   }
 
-  int max_level_with_files = 0;
-  // max_file_num_to_ignore can be used to filter out newly created SST files,
-  // useful for bottom level compaction in a manual compaction
-  uint64_t max_file_num_to_ignore = port::kMaxUint64;
-  uint64_t next_file_number = port::kMaxUint64;
-  {
-    InstrumentedMutexLock l(&mutex_);
-    Version* base = cfd->current();
-    for (int level = 1; level < base->storage_info()->num_non_empty_levels();
-         level++) {
-      if (base->storage_info()->OverlapInLevel(level, begin, end)) {
-        max_level_with_files = level;
-      }
-    }
-    next_file_number = versions_->current_next_file_number();
-  }
-
-  int final_output_level = 0;
-
+  constexpr int kInvalidLevel = -1;
+  int final_output_level = kInvalidLevel;
+  bool exclusive = options.exclusive_manual_compaction;
   if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
       cfd->NumberLevels() > 1) {
     // Always compact all files together.
@@ -715,10 +697,49 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
     }
     s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
                             final_output_level, options, begin, end, exclusive,
-                            false, max_file_num_to_ignore);
+                            false, port::kMaxUint64);
   } else {
-    for (int level = 0; level <= max_level_with_files; level++) {
+    int first_overlapped_level = kInvalidLevel;
+    int max_overlapped_level = kInvalidLevel;
+    {
+      SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
+      Version* current_version = super_version->current;
+      ReadOptions ro;
+      ro.total_order_seek = true;
+      bool overlap;
+      for (int level = 0;
+           level < current_version->storage_info()->num_non_empty_levels();
+           level++) {
+        overlap = true;
+        if (begin != nullptr && end != nullptr) {
+          Status status = current_version->OverlapWithLevelIterator(
+              ro, file_options_, *begin, *end, level, &overlap);
+          if (!status.ok()) {
+            overlap = current_version->storage_info()->OverlapInLevel(
+                level, begin, end);
+          }
+        } else {
+          overlap = current_version->storage_info()->OverlapInLevel(level,
+                                                                    begin, end);
+        }
+        if (overlap) {
+          if (first_overlapped_level == kInvalidLevel) {
+            first_overlapped_level = level;
+          }
+          max_overlapped_level = level;
+        }
+      }
+      CleanupSuperVersion(super_version);
+    }
+    if (s.ok() && first_overlapped_level != kInvalidLevel) {
+      // max_file_num_to_ignore can be used to filter out newly created SST
+      // files, useful for bottom level compaction in a manual compaction
+      uint64_t max_file_num_to_ignore = port::kMaxUint64;
+      uint64_t next_file_number = versions_->current_next_file_number();
+      final_output_level = max_overlapped_level;
       int output_level;
+      for (int level = first_overlapped_level; level <= max_overlapped_level;
+           level++) {
       // in case the compaction is universal or if we're compacting the
       // bottom-most level, the output level will be the same as input one.
       // level 0 can never be the bottommost level (i.e. if all files are in
@@ -726,7 +747,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
       if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
           cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
         output_level = level;
-      } else if (level == max_level_with_files && level > 0) {
+      } else if (level == max_overlapped_level && level > 0) {
         if (options.bottommost_level_compaction ==
             BottommostLevelCompaction::kSkip) {
           // Skip bottommost level compaction
@@ -741,8 +762,8 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
         }
         output_level = level;
         // update max_file_num_to_ignore only for bottom level compaction
-        // because data in newly compacted files in middle levels may still need
-        // to be pushed down
+        // because data in newly compacted files in middle levels may still
+        // need to be pushed down
         max_file_num_to_ignore = next_file_number;
       } else {
         output_level = level + 1;
@@ -766,7 +787,8 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
       TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
     }
   }
-  if (!s.ok()) {
+  }
+  if (!s.ok() || final_output_level == kInvalidLevel) {
     LogFlush(immutable_db_options_.info_log);
     return s;
   }

db/db_test.cc

@@ -4297,7 +4297,7 @@ TEST_P(DBTestWithParam, PreShutdownManualCompaction) {
   ASSERT_EQ("1,1,1", FilesPerLevel(1));
 
   // Compaction range overlaps files
-  Compact(1, "p1", "p9");
+  Compact(1, "p", "q");
   ASSERT_EQ("0,0,1", FilesPerLevel(1));
 
   // Populate a different range

db/db_test2.cc

@@ -1678,12 +1678,12 @@ TEST_P(PinL0IndexAndFilterBlocksTest, DisablePrefetchingNonL0IndexAndFilter) {
     ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
     ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
     ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
-    ASSERT_EQ(ih + 2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
+    ASSERT_EQ(ih + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
   } else {
     ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
     ASSERT_EQ(fh + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
     ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
-    ASSERT_EQ(ih + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
+    ASSERT_EQ(ih + 4, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
   }
 
   // Bloom and index hit will happen when a Get() happens.
@@ -1692,12 +1692,12 @@ TEST_P(PinL0IndexAndFilterBlocksTest, DisablePrefetchingNonL0IndexAndFilter) {
     ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
     ASSERT_EQ(fh + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
     ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
-    ASSERT_EQ(ih + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
+    ASSERT_EQ(ih + 4, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
   } else {
     ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
     ASSERT_EQ(fh + 2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
     ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
-    ASSERT_EQ(ih + 4, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
+    ASSERT_EQ(ih + 5, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
   }
 }

db/manual_compaction_test.cc

@@ -4,8 +4,6 @@
 // (found in the LICENSE.Apache file in the root directory).
 //
 // Test for issue 178: a manual compaction causes deleted data to reappear.
-#include <iostream>
-#include <sstream>
 #include <cstdlib>
 
 #include "port/port.h"
@@ -40,8 +38,8 @@ class ManualCompactionTest : public testing::Test {
  public:
   ManualCompactionTest() {
     // Get rid of any state from an old run.
-    dbname_ = ROCKSDB_NAMESPACE::test::PerThreadDBPath("rocksdb_cbug_test");
-    DestroyDB(dbname_, ROCKSDB_NAMESPACE::Options());
+    dbname_ = test::PerThreadDBPath("rocksdb_manual_compaction_test");
+    DestroyDB(dbname_, Options());
   }
 
   std::string dbname_;
@@ -60,6 +58,33 @@ class DestroyAllCompactionFilter : public CompactionFilter {
   const char* Name() const override { return "DestroyAllCompactionFilter"; }
 };
 
+class LogCompactionFilter : public CompactionFilter {
+ public:
+  const char* Name() const override { return "LogCompactionFilter"; }
+
+  bool Filter(int level, const Slice& key, const Slice& /*existing_value*/,
+              std::string* /*new_value*/,
+              bool* /*value_changed*/) const override {
+    key_level_[key.ToString()] = level;
+    return false;
+  }
+
+  void Reset() { key_level_.clear(); }
+  size_t NumKeys() const { return key_level_.size(); }
+
+  int KeyLevel(const Slice& key) {
+    auto it = key_level_.find(key.ToString());
+    if (it == key_level_.end()) {
+      return -1;
+    }
+    return it->second;
+  }
+
+ private:
+  mutable std::map<std::string, int> key_level_;
+};
+
 TEST_F(ManualCompactionTest, CompactTouchesAllKeys) {
   for (int iter = 0; iter < 2; ++iter) {
     DB* db;
@@ -71,7 +96,7 @@ TEST_F(ManualCompactionTest, CompactTouchesAllKeys) {
       options.compaction_style = kCompactionStyleUniversal;
     }
     options.create_if_missing = true;
-    options.compression = ROCKSDB_NAMESPACE::kNoCompression;
+    options.compression = kNoCompression;
     options.compaction_filter = new DestroyAllCompactionFilter();
 
     ASSERT_OK(DB::Open(options, dbname_, &db));
@@ -100,46 +125,45 @@ TEST_F(ManualCompactionTest, Test) {
   // Open database. Disable compression since it affects the creation
   // of layers and the code below is trying to test against a very
   // specific scenario.
-  ROCKSDB_NAMESPACE::DB* db;
-  ROCKSDB_NAMESPACE::Options db_options;
+  DB* db;
+  Options db_options;
   db_options.write_buffer_size = 1024;
   db_options.create_if_missing = true;
-  db_options.compression = ROCKSDB_NAMESPACE::kNoCompression;
-  ASSERT_OK(ROCKSDB_NAMESPACE::DB::Open(db_options, dbname_, &db));
+  db_options.compression = kNoCompression;
+  ASSERT_OK(DB::Open(db_options, dbname_, &db));
 
   // create first key range
-  ROCKSDB_NAMESPACE::WriteBatch batch;
+  WriteBatch batch;
   for (int i = 0; i < kNumKeys; i++) {
     batch.Put(Key1(i), "value for range 1 key");
   }
-  ASSERT_OK(db->Write(ROCKSDB_NAMESPACE::WriteOptions(), &batch));
+  ASSERT_OK(db->Write(WriteOptions(), &batch));
 
   // create second key range
   batch.Clear();
   for (int i = 0; i < kNumKeys; i++) {
     batch.Put(Key2(i), "value for range 2 key");
   }
-  ASSERT_OK(db->Write(ROCKSDB_NAMESPACE::WriteOptions(), &batch));
+  ASSERT_OK(db->Write(WriteOptions(), &batch));
 
   // delete second key range
   batch.Clear();
   for (int i = 0; i < kNumKeys; i++) {
     batch.Delete(Key2(i));
   }
-  ASSERT_OK(db->Write(ROCKSDB_NAMESPACE::WriteOptions(), &batch));
+  ASSERT_OK(db->Write(WriteOptions(), &batch));
 
   // compact database
   std::string start_key = Key1(0);
   std::string end_key = Key1(kNumKeys - 1);
-  ROCKSDB_NAMESPACE::Slice least(start_key.data(), start_key.size());
-  ROCKSDB_NAMESPACE::Slice greatest(end_key.data(), end_key.size());
+  Slice least(start_key.data(), start_key.size());
+  Slice greatest(end_key.data(), end_key.size());
 
   // commenting out the line below causes the example to work correctly
   db->CompactRange(CompactRangeOptions(), &least, &greatest);
 
   // count the keys
-  ROCKSDB_NAMESPACE::Iterator* iter =
-      db->NewIterator(ROCKSDB_NAMESPACE::ReadOptions());
+  Iterator* iter = db->NewIterator(ReadOptions());
   int num_keys = 0;
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
     num_keys++;
@@ -149,7 +173,119 @@ TEST_F(ManualCompactionTest, Test) {
   // close database
   delete db;
-  DestroyDB(dbname_, ROCKSDB_NAMESPACE::Options());
+  DestroyDB(dbname_, Options());
+}
+
+TEST_F(ManualCompactionTest, SkipLevel) {
+  DB* db;
+  Options options;
+  options.num_levels = 3;
+  // Initially, flushed L0 files won't exceed 100.
+  options.level0_file_num_compaction_trigger = 100;
+  options.compaction_style = kCompactionStyleLevel;
+  options.create_if_missing = true;
+  options.compression = kNoCompression;
+  LogCompactionFilter* filter = new LogCompactionFilter();
+  options.compaction_filter = filter;
+  ASSERT_OK(DB::Open(options, dbname_, &db));
+
+  WriteOptions wo;
+  FlushOptions fo;
+  ASSERT_OK(db->Put(wo, "1", ""));
+  ASSERT_OK(db->Flush(fo));
+  ASSERT_OK(db->Put(wo, "2", ""));
+  ASSERT_OK(db->Flush(fo));
+  ASSERT_OK(db->Put(wo, "4", ""));
+  ASSERT_OK(db->Put(wo, "8", ""));
+  ASSERT_OK(db->Flush(fo));
+
+  {
+    // L0: 1, 2, [4, 8]
+    // no file has keys in range [5, 7]
+    Slice start("5");
+    Slice end("7");
+    filter->Reset();
+    db->CompactRange(CompactRangeOptions(), &start, &end);
+    ASSERT_EQ(0, filter->NumKeys());
+  }
+
+  {
+    // L0: 1, 2, [4, 8]
+    // [3, 7] overlaps with 4 in L0
+    Slice start("3");
+    Slice end("7");
+    filter->Reset();
+    db->CompactRange(CompactRangeOptions(), &start, &end);
+    ASSERT_EQ(2, filter->NumKeys());
+    ASSERT_EQ(0, filter->KeyLevel("4"));
+    ASSERT_EQ(0, filter->KeyLevel("8"));
+  }
+
+  {
+    // L0: 1, 2
+    // L1: [4, 8]
+    // no file has keys in range (-inf, 0]
+    Slice end("0");
+    filter->Reset();
+    db->CompactRange(CompactRangeOptions(), nullptr, &end);
+    ASSERT_EQ(0, filter->NumKeys());
+  }
+
+  {
+    // L0: 1, 2
+    // L1: [4, 8]
+    // no file has keys in range [9, inf)
+    Slice start("9");
+    filter->Reset();
+    db->CompactRange(CompactRangeOptions(), &start, nullptr);
+    ASSERT_EQ(0, filter->NumKeys());
+  }
+
+  {
+    // L0: 1, 2
+    // L1: [4, 8]
+    // [2, 2] overlaps with 2 in L0
+    Slice start("2");
+    Slice end("2");
+    filter->Reset();
+    db->CompactRange(CompactRangeOptions(), &start, &end);
+    ASSERT_EQ(1, filter->NumKeys());
+    ASSERT_EQ(0, filter->KeyLevel("2"));
+  }
+
+  {
+    // L0: 1
+    // L1: 2, [4, 8]
+    // [2, 5] overlaps with 2 and [4, 8) in L1, skip L0
+    Slice start("2");
+    Slice end("5");
+    filter->Reset();
+    db->CompactRange(CompactRangeOptions(), &start, &end);
+    ASSERT_EQ(3, filter->NumKeys());
+    ASSERT_EQ(1, filter->KeyLevel("2"));
+    ASSERT_EQ(1, filter->KeyLevel("4"));
+    ASSERT_EQ(1, filter->KeyLevel("8"));
+  }
+
+  {
+    // L0: 1
+    // L1: [2, 4, 8]
+    // [0, inf) overlaps all files
+    Slice start("0");
+    filter->Reset();
+    db->CompactRange(CompactRangeOptions(), &start, nullptr);
+    ASSERT_EQ(4, filter->NumKeys());
+    // 1 is first compacted to L1 and then further compacted into [2, 4, 8],
+    // so finally the logged level for 1 is L1.
+    ASSERT_EQ(1, filter->KeyLevel("1"));
+    ASSERT_EQ(1, filter->KeyLevel("2"));
+    ASSERT_EQ(1, filter->KeyLevel("4"));
+    ASSERT_EQ(1, filter->KeyLevel("8"));
+  }
+
+  delete filter;
+  delete db;
+  DestroyDB(dbname_, options);
 }
 
 } // anonymous namespace