Merge branch 'master' into columnfamilies
This commit is contained in:
commit
e86d7dffd7
@ -2827,6 +2827,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
|
|||||||
compact->CleanupBatchBuffer();
|
compact->CleanupBatchBuffer();
|
||||||
compact->CleanupMergedBuffer();
|
compact->CleanupMergedBuffer();
|
||||||
compact->cur_prefix_ = kNullString;
|
compact->cur_prefix_ = kNullString;
|
||||||
|
bool prefix_initialized = false;
|
||||||
|
|
||||||
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
|
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
|
||||||
ColumnFamilyData* cfd = compact->compaction->column_family_data();
|
ColumnFamilyData* cfd = compact->compaction->column_family_data();
|
||||||
@ -2903,8 +2904,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
|
|||||||
const SliceTransform* transformer =
|
const SliceTransform* transformer =
|
||||||
cfd->options()->compaction_filter_factory_v2->GetPrefixExtractor();
|
cfd->options()->compaction_filter_factory_v2->GetPrefixExtractor();
|
||||||
std::string key_prefix = transformer->Transform(key).ToString();
|
std::string key_prefix = transformer->Transform(key).ToString();
|
||||||
if (compact->cur_prefix_ == kNullString) {
|
if (!prefix_initialized) {
|
||||||
compact->cur_prefix_ = key_prefix;
|
compact->cur_prefix_ = key_prefix;
|
||||||
|
prefix_initialized = true;
|
||||||
}
|
}
|
||||||
if (!ParseInternalKey(key, &ikey)) {
|
if (!ParseInternalKey(key, &ikey)) {
|
||||||
// log error
|
// log error
|
||||||
|
@ -3969,6 +3969,56 @@ TEST(DBTest, CompactionFilterV2WithValueChange) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(DBTest, CompactionFilterV2NULLPrefix) {
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
options.num_levels = 3;
|
||||||
|
options.max_mem_compaction_level = 0;
|
||||||
|
auto prefix_extractor = NewFixedPrefixTransform(8);
|
||||||
|
options.compaction_filter_factory_v2 =
|
||||||
|
std::make_shared<ChangeFilterFactoryV2>(prefix_extractor);
|
||||||
|
// In a testing environment, we can only flush the application
|
||||||
|
// compaction filter buffer using universal compaction
|
||||||
|
option_config_ = kUniversalCompaction;
|
||||||
|
options.compaction_style = (rocksdb::CompactionStyle)1;
|
||||||
|
Reopen(&options);
|
||||||
|
|
||||||
|
// Write 100K+1 keys, these are written to a few files
|
||||||
|
// in L0. We do this so that the current snapshot points
|
||||||
|
// to the 100001 key.The compaction filter is not invoked
|
||||||
|
// on keys that are visible via a snapshot because we
|
||||||
|
// anyways cannot delete it.
|
||||||
|
const std::string value(10, 'x');
|
||||||
|
char first_key[100];
|
||||||
|
snprintf(first_key, sizeof(first_key), "%s0000%010d", "NULL", 1);
|
||||||
|
Put(first_key, value);
|
||||||
|
for (int i = 1; i < 100000; i++) {
|
||||||
|
char key[100];
|
||||||
|
snprintf(key, sizeof(key), "%08d%010d", i, i);
|
||||||
|
Put(key, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
char last_key[100];
|
||||||
|
snprintf(last_key, sizeof(last_key), "%s0000%010d", "NULL", 2);
|
||||||
|
Put(last_key, value);
|
||||||
|
|
||||||
|
// push all files to lower levels
|
||||||
|
dbfull()->TEST_FlushMemTable();
|
||||||
|
dbfull()->TEST_CompactRange(0, nullptr, nullptr);
|
||||||
|
|
||||||
|
// verify that all keys now have the new value that
|
||||||
|
// was set by the compaction process.
|
||||||
|
std::string newvalue = Get(first_key);
|
||||||
|
ASSERT_EQ(newvalue.compare(NEW_VALUE), 0);
|
||||||
|
newvalue = Get(last_key);
|
||||||
|
ASSERT_EQ(newvalue.compare(NEW_VALUE), 0);
|
||||||
|
for (int i = 1; i < 100000; i++) {
|
||||||
|
char key[100];
|
||||||
|
snprintf(key, sizeof(key), "%08d%010d", i, i);
|
||||||
|
std::string newvalue = Get(key);
|
||||||
|
ASSERT_EQ(newvalue.compare(NEW_VALUE), 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
TEST(DBTest, SparseMerge) {
|
TEST(DBTest, SparseMerge) {
|
||||||
do {
|
do {
|
||||||
Options options = CurrentOptions();
|
Options options = CurrentOptions();
|
||||||
|
@ -18,10 +18,13 @@ bool MergeOperator::PartialMergeMulti(const Slice& key,
|
|||||||
const std::deque<Slice>& operand_list,
|
const std::deque<Slice>& operand_list,
|
||||||
std::string* new_value,
|
std::string* new_value,
|
||||||
Logger* logger) const {
|
Logger* logger) const {
|
||||||
|
assert(operand_list.size() >= 2);
|
||||||
// Simply loop through the operands
|
// Simply loop through the operands
|
||||||
std::string temp_value;
|
std::string temp_value;
|
||||||
Slice temp_slice;
|
Slice temp_slice(operand_list[0]);
|
||||||
for (const auto& operand : operand_list) {
|
|
||||||
|
for (int i = 1; i < operand_list.size(); ++i) {
|
||||||
|
auto& operand = operand_list[i];
|
||||||
if (!PartialMerge(key, temp_slice, operand, &temp_value, logger)) {
|
if (!PartialMerge(key, temp_slice, operand, &temp_value, logger)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -1363,7 +1363,7 @@ class PosixEnv : public Env {
|
|||||||
EnvOptions OptimizeForLogWrite(const EnvOptions& env_options) const {
|
EnvOptions OptimizeForLogWrite(const EnvOptions& env_options) const {
|
||||||
EnvOptions optimized = env_options;
|
EnvOptions optimized = env_options;
|
||||||
optimized.use_mmap_writes = false;
|
optimized.use_mmap_writes = false;
|
||||||
optimized.fallocate_with_keep_size = true;
|
optimized.fallocate_with_keep_size = false;
|
||||||
return optimized;
|
return optimized;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user