Smarter purging during flush

Summary:
Currently, we only purge duplicate keys and deletions during flush if `earliest_seqno_in_memtable > newest_snapshot`, i.e., only when the newest snapshot was taken before we first created the memtable. This is almost never true for MyRocks and MongoRocks.

This patch makes purging during flush snapshot-aware. The main logic is copied from compaction_job.cc, although the logic over there is much more complicated and extensive. However, we should try to merge the common functionality at some point.

I need this patch to implement no_overwrite_i_promise functionality for flush. We'll also need this to support SingleDelete() during Flush(). @yoshinorim requested the feature.

Test Plan:
make check
I had to adjust some unit tests to account for this new behavior.

Reviewers: yhchiang, yoshinorim, anthony, sdong, noetzli

Reviewed By: noetzli

Subscribers: yoshinorim, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D42087
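Editor's note: before reading the diff, here is a minimal, self-contained sketch of the drop rule the new flush path applies. This is an illustration only, not code from the patch; all names are hypothetical. An entry can be dropped iff the earliest snapshot that needs to see it is the same one already served by a newer entry for the same user key.

#include <cassert>
#include <cstdint>
#include <vector>

using SequenceNumber = uint64_t;
static const SequenceNumber kMaxSequenceNumber = UINT64_MAX;

// Earliest snapshot that still needs to see an entry with sequence
// number `seq` (snapshots sorted ascending); kMaxSequenceNumber means
// the entry is visible only at the tip of the database.
static SequenceNumber EarliestSnapshotFor(
    SequenceNumber seq, const std::vector<SequenceNumber>& snapshots) {
  for (SequenceNumber snap : snapshots) {
    if (snap >= seq) return snap;
  }
  return kMaxSequenceNumber;
}

int main() {
  std::vector<SequenceNumber> snapshots = {5, 10};
  // Four versions of one user key, newest first, exactly as a memtable
  // iterator would yield them during flush.
  SequenceNumber versions[] = {12, 7, 6, 3};
  SequenceNumber covered = 0;  // snapshot already served by a newer version
  for (SequenceNumber seq : versions) {
    SequenceNumber needed = EarliestSnapshotFor(seq, snapshots);
    // Drop iff the snapshot this version would serve is already served
    // by a newer version of the same key.
    bool drop = (needed == covered);
    covered = needed;
    // seq 12 -> needs kMaxSequenceNumber (tip): keep
    // seq  7 -> needs snapshot 10:              keep
    // seq  6 -> needs snapshot 10 again:        drop
    // seq  3 -> needs snapshot 5:               keep
    assert(drop == (seq == 6));
  }
  return 0;
}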
parent 4c81ac0c59
commit 4ab26c5ad1

db/builder.cc (181 changed lines), plus db/builder.h, db/db_impl.cc, db/db_test.cc, db/flush_job.cc, db/flush_job.h, db/flush_job_test.cc, and db/repair.cc below
@@ -9,6 +9,7 @@
 #include "db/builder.h"
 
+#include <algorithm>
 #include <deque>
 #include <vector>
 
@@ -31,6 +32,28 @@
 
 namespace rocksdb {
 
+namespace {
+inline SequenceNumber EarliestVisibleSnapshot(
+    SequenceNumber in, const std::vector<SequenceNumber>& snapshots,
+    SequenceNumber* prev_snapshot) {
+  if (snapshots.empty()) {
+    *prev_snapshot = 0;  // 0 means no previous snapshot
+    return kMaxSequenceNumber;
+  }
+  SequenceNumber prev = 0;
+  for (const auto cur : snapshots) {
+    assert(prev <= cur);
+    if (cur >= in) {
+      *prev_snapshot = prev;
+      return cur;
+    }
+    prev = cur;  // assignment
+  }
+  *prev_snapshot = prev;
+  return kMaxSequenceNumber;
+}
+}  // namespace
+
 class TableFactory;
 
 TableBuilder* NewTableBuilder(
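Editor's note, to make the helper's contract concrete: with snapshots = {5, 10}, EarliestVisibleSnapshot(7, snapshots, &prev) returns 10 and sets prev to 5 (snapshot 10 is the earliest that can see sequence number 7; snapshot 5 is the latest that cannot), while EarliestVisibleSnapshot(12, snapshots, &prev) returns kMaxSequenceNumber and sets prev to 10, meaning an entry at sequence number 12 is visible only at the tip.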
@@ -53,9 +76,7 @@ Status BuildTable(
     FileMetaData* meta, const InternalKeyComparator& internal_comparator,
     const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
         int_tbl_prop_collector_factories,
-    const SequenceNumber newest_snapshot,
-    const SequenceNumber earliest_seqno_in_memtable,
-    const CompressionType compression,
+    std::vector<SequenceNumber> snapshots, const CompressionType compression,
     const CompressionOptions& compression_opts, bool paranoid_file_checks,
     InternalStats* internal_stats, const Env::IOPriority io_priority,
     TableProperties* table_properties) {
@@ -66,14 +87,6 @@ Status BuildTable(
   meta->smallest_seqno = meta->largest_seqno = 0;
   iter->SeekToFirst();
 
-  // If the sequence number of the smallest entry in the memtable is
-  // smaller than the most recent snapshot, then we do not trigger
-  // removal of duplicate/deleted keys as part of this builder.
-  bool purge = true;
-  if (earliest_seqno_in_memtable <= newest_snapshot) {
-    purge = false;
-  }
-
   std::string fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(),
                                     meta->fd.GetPathId());
   if (iter->Valid()) {
@@ -107,40 +120,62 @@ Status BuildTable(
         ioptions.min_partial_merge_operands,
         true /* internal key corruption is not ok */);
 
-    if (purge) {
-      // Ugly walkaround to avoid compiler error for release build
-      bool ok __attribute__((unused)) = true;
-
-      // Will write to builder if current key != prev key
-      ParsedInternalKey prev_ikey;
-      std::string prev_key;
-      bool is_first_key = true;  // Also write if this is the very first key
+    IterKey current_user_key;
+    bool has_current_user_key = false;
+    // If has_current_user_key == true, this variable remembers the earliest
+    // snapshot in which the current user key already exists. If two internal
+    // keys have the same user key AND the earlier one should be visible in
+    // the snapshot in which we already have a user key, we can drop the
+    // earlier user key.
+    SequenceNumber current_user_key_exists_in_snapshot = kMaxSequenceNumber;
 
-      while (iter->Valid()) {
-        bool iterator_at_next = false;
-
-        // Get current key
-        ParsedInternalKey this_ikey;
-        Slice key = iter->key();
-        Slice value = iter->value();
+    while (iter->Valid()) {
+      // Get current key
+      ParsedInternalKey ikey;
+      Slice key = iter->key();
+      Slice value = iter->value();
 
-        // In-memory key corruption is not ok;
-        // TODO: find a clean way to treat in memory key corruption
-        ok = ParseInternalKey(key, &this_ikey);
-        assert(ok);
-        assert(this_ikey.sequence >= earliest_seqno_in_memtable);
+      // In-memory key corruption is not ok;
+      // TODO: find a clean way to treat in-memory key corruption
+      // Ugly workaround to avoid a compiler error in release builds
+      bool ok __attribute__((unused)) = true;
+      ok = ParseInternalKey(key, &ikey);
+      assert(ok);
 
-        // If the key is the same as the previous key (and it is not the
-        // first key), then we skip it, since it is an older version.
-        // Otherwise we output the key and mark it as the "new" previous key.
-        if (!is_first_key && !internal_comparator.user_comparator()->Compare(
-                                  prev_ikey.user_key, this_ikey.user_key)) {
-          // seqno within the same key are in decreasing order
-          assert(this_ikey.sequence < prev_ikey.sequence);
-        } else {
-          is_first_key = false;
+      meta->smallest_seqno = std::min(meta->smallest_seqno, ikey.sequence);
+      meta->largest_seqno = std::max(meta->largest_seqno, ikey.sequence);
 
-          if (this_ikey.type == kTypeMerge) {
+      if (!has_current_user_key ||
+          internal_comparator.user_comparator()->Compare(
+              ikey.user_key, current_user_key.GetKey()) != 0) {
+        // First occurrence of this user key
+        current_user_key.SetKey(ikey.user_key);
+        has_current_user_key = true;
+        current_user_key_exists_in_snapshot = 0;
+      }
+
+      // If there are no snapshots, then this kv affects visibility at the
+      // tip. Otherwise, search through all existing snapshots to find the
+      // earliest snapshot that is affected by this kv.
+      SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
+      SequenceNumber key_needs_to_exist_in_snapshot =
+          EarliestVisibleSnapshot(ikey.sequence, snapshots, &prev_snapshot);
+
+      if (current_user_key_exists_in_snapshot ==
+          key_needs_to_exist_in_snapshot) {
+        // If this user key already exists in the snapshot in which it needs
+        // to exist, we can drop it. In other words, if the earliest snapshot
+        // in which this key is visible is the same as the visibility of a
+        // previous instance of the same key, then this kv is not visible in
+        // any snapshot: it is hidden by a newer entry for the same user key.
+        iter->Next();
+      } else if (ikey.type == kTypeMerge) {
+        meta->largest.DecodeFrom(key);
+
         // TODO(tbd): Add a check here to prevent RocksDB from crash when
         // reopening a DB w/o properly specifying the merge operator. But
         // currently we observed a memory leak on failing in RocksDB
@@ -150,71 +185,49 @@ Status BuildTable(
 
         // Handle merge-type keys using the MergeHelper
         // TODO: pass statistics to MergeUntil
-        merge.MergeUntil(iter, 0 /* don't worry about snapshot */);
-        iterator_at_next = true;
+        merge.MergeUntil(iter, prev_snapshot, false, nullptr, env);
+        // IMPORTANT: Slice key doesn't point to a valid value anymore!!
 
-        // Write them out one-by-one. (Proceed back() to front())
         // If the merge successfully merged the input into
         // a kTypeValue, the list contains a single element.
-        const std::deque<std::string>& keys = merge.keys();
-        const std::deque<std::string>& values = merge.values();
-        assert(keys.size() == values.size() && keys.size() >= 1);
-        std::deque<std::string>::const_reverse_iterator key_iter;
-        std::deque<std::string>::const_reverse_iterator value_iter;
-        for (key_iter = keys.rbegin(), value_iter = values.rbegin();
-             key_iter != keys.rend() && value_iter != values.rend();
-             ++key_iter, ++value_iter) {
-          builder->Add(Slice(*key_iter), Slice(*value_iter));
-        }
-
-        // Sanity check. Both iterators should end at the same time
-        assert(key_iter == keys.rend() && value_iter == values.rend());
-
-        prev_key.assign(keys.front());
-        ok = ParseInternalKey(Slice(prev_key), &prev_ikey);
-        assert(ok);
-      } else {
-        // Handle Put/Delete-type keys by simply writing them
-        builder->Add(key, value);
-        prev_key.assign(key.data(), key.size());
-        ok = ParseInternalKey(Slice(prev_key), &prev_ikey);
-        assert(ok);
-      }
+        const auto& keys = merge.keys();
+        const auto& values = merge.values();
+        assert(!keys.empty());
+        assert(keys.size() == values.size());
+
+        // The largest possible sequence number in a merge queue is already
+        // stored in ikey.sequence. We additionally have to consider the
+        // front of the merge queue, which might have the smallest sequence
+        // number (out of all the merges with the same key).
+        meta->smallest_seqno =
+            std::min(meta->smallest_seqno, GetInternalKeySeqno(keys.front()));
+
+        // We have a list of keys to write, write all keys in the list.
+        for (auto key_iter = keys.rbegin(), value_iter = values.rbegin();
+             key_iter != keys.rend(); key_iter++, value_iter++) {
+          key = Slice(*key_iter);
+          value = Slice(*value_iter);
+          bool valid_key __attribute__((__unused__)) =
+              ParseInternalKey(key, &ikey);
+          // MergeUntil stops when it encounters a corrupt key and does not
+          // include it in the result, so we expect the keys here to be valid.
+          assert(valid_key);
+          builder->Add(key, value);
+        }
+      } else {  // just write out the key-value
+        builder->Add(key, value);
+        meta->largest.DecodeFrom(key);
+        iter->Next();
+      }
+
+      current_user_key_exists_in_snapshot = key_needs_to_exist_in_snapshot;
 
       if (io_priority == Env::IO_HIGH &&
           IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
         ThreadStatusUtil::IncreaseThreadOperationProperty(
-            ThreadStatus::FLUSH_BYTES_WRITTEN,
-            IOSTATS(bytes_written));
+            ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
         IOSTATS_RESET(bytes_written);
       }
-      if (!iterator_at_next) iter->Next();
     }
-
-      // The last key is the largest key
-      meta->largest.DecodeFrom(Slice(prev_key));
-      SequenceNumber seqno = GetInternalKeySeqno(Slice(prev_key));
-      meta->smallest_seqno = std::min(meta->smallest_seqno, seqno);
-      meta->largest_seqno = std::max(meta->largest_seqno, seqno);
-
-    } else {
-      for (; iter->Valid(); iter->Next()) {
-        Slice key = iter->key();
-        builder->Add(key, iter->value());
-        SequenceNumber seqno = GetInternalKeySeqno(key);
-        meta->smallest_seqno = std::min(meta->smallest_seqno, seqno);
-        meta->largest_seqno = std::max(meta->largest_seqno, seqno);
-      }
-    }
 
     // Finish and check for builder errors
     if (s.ok()) {
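Editor's note on the merge branch above: MergeUntil() now receives prev_snapshot rather than 0, so — as I read the MergeHelper contract — merging stops before consuming operands that an earlier snapshot can still see; the write-out loop then emits those operands individually instead of folding them into a single merged value. This is what keeps merge results correct per snapshot.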
db/builder.h

@@ -52,9 +52,7 @@ extern Status BuildTable(
     FileMetaData* meta, const InternalKeyComparator& internal_comparator,
     const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
         int_tbl_prop_collector_factories,
-    const SequenceNumber newest_snapshot,
-    const SequenceNumber earliest_seqno_in_memtable,
-    const CompressionType compression,
+    std::vector<SequenceNumber> snapshots, const CompressionType compression,
     const CompressionOptions& compression_opts, bool paranoid_file_checks,
     InternalStats* internal_stats,
     const Env::IOPriority io_priority = Env::IO_HIGH,
db/db_impl.cc

@@ -1274,9 +1274,6 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
   TableProperties table_properties;
   {
     ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
-    const SequenceNumber newest_snapshot = snapshots_.GetNewest();
-    const SequenceNumber earliest_seqno_in_memtable =
-        mem->GetFirstSequenceNumber();
     Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
         "[%s] [WriteLevel0TableForRecovery]"
         " Level-0 table #%" PRIu64 ": started",
@@ -1290,8 +1287,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
       s = BuildTable(
           dbname_, env_, *cfd->ioptions(), env_options_, cfd->table_cache(),
           iter.get(), &meta, cfd->internal_comparator(),
-          cfd->int_tbl_prop_collector_factories(), newest_snapshot,
-          earliest_seqno_in_memtable, GetCompressionFlush(*cfd->ioptions()),
+          cfd->int_tbl_prop_collector_factories(), snapshots_.GetAll(),
+          GetCompressionFlush(*cfd->ioptions()),
          cfd->ioptions()->compression_opts, paranoid_file_checks,
          cfd->internal_stats(), Env::IO_HIGH, &info.table_properties);
      LogFlush(db_options_.info_log);
@@ -1348,7 +1345,7 @@ Status DBImpl::FlushMemTableToOutputFile(
 
   FlushJob flush_job(dbname_, cfd, db_options_, mutable_cf_options,
                      env_options_, versions_.get(), &mutex_, &shutting_down_,
-                     snapshots_.GetNewest(), job_context, log_buffer,
+                     snapshots_.GetAll(), job_context, log_buffer,
                      directories_.GetDbDir(), directories_.GetDataDir(0U),
                      GetCompressionFlush(*cfd->ioptions()), stats_,
                      &event_logger_);
db/db_test.cc

@@ -3280,22 +3280,16 @@ TEST_F(DBTest, CompactBetweenSnapshots) {
     Put(1, "foo", "sixth");
 
     // All entries (including duplicates) exist
-    // before any compaction is triggered.
-    ASSERT_OK(Flush(1));
-    ASSERT_EQ("sixth", Get(1, "foo"));
-    ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
-    ASSERT_EQ("first", Get(1, "foo", snapshot1));
+    // before any compaction or flush is triggered.
     ASSERT_EQ(AllEntriesFor("foo", 1),
               "[ sixth, fifth, fourth, third, second, first ]");
 
-    // After a compaction, "second", "third" and "fifth" should
-    // be removed
-    FillLevels("a", "z", 1);
-    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
-                           nullptr);
-    ASSERT_EQ("sixth", Get(1, "foo"));
-    ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
-    ASSERT_EQ("first", Get(1, "foo", snapshot1));
+    // After a flush, "second", "third" and "fifth" should
+    // be removed
+    ASSERT_OK(Flush(1));
+    ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth, first ]");
 
     // after we release the snapshot1, only two values left
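Editor's note: the rewritten assertions capture the new behavior directly. With snapshot1 taken after "first" and snapshot2 after "fourth", a flush alone now thins the entries down to the newest version plus the newest version visible to each snapshot — hence [ sixth, fourth, first ] — where previously only a compaction would achieve this.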
db/flush_job.cc

@@ -60,9 +60,9 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
                    const EnvOptions& env_options, VersionSet* versions,
                    InstrumentedMutex* db_mutex,
                    std::atomic<bool>* shutting_down,
-                   SequenceNumber newest_snapshot, JobContext* job_context,
-                   LogBuffer* log_buffer, Directory* db_directory,
-                   Directory* output_file_directory,
+                   std::vector<SequenceNumber> existing_snapshots,
+                   JobContext* job_context, LogBuffer* log_buffer,
+                   Directory* db_directory, Directory* output_file_directory,
                    CompressionType output_compression, Statistics* stats,
                    EventLogger* event_logger)
     : dbname_(dbname),
@@ -73,7 +73,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
       versions_(versions),
       db_mutex_(db_mutex),
       shutting_down_(shutting_down),
-      newest_snapshot_(newest_snapshot),
+      existing_snapshots_(std::move(existing_snapshots)),
       job_context_(job_context),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
@@ -188,8 +188,6 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
   // path 0 for level 0 file.
   meta->fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
 
-  const SequenceNumber earliest_seqno_in_memtable =
-      mems[0]->GetFirstSequenceNumber();
   Version* base = cfd_->current();
   base->Ref();  // it is likely that we do not need this reference
   Status s;
@@ -234,9 +232,8 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
     s = BuildTable(
         dbname_, db_options_.env, *cfd_->ioptions(), env_options_,
         cfd_->table_cache(), iter.get(), meta, cfd_->internal_comparator(),
-        cfd_->int_tbl_prop_collector_factories(), newest_snapshot_,
-        earliest_seqno_in_memtable, output_compression_,
-        cfd_->ioptions()->compression_opts,
+        cfd_->int_tbl_prop_collector_factories(), existing_snapshots_,
+        output_compression_, cfd_->ioptions()->compression_opts,
         mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
         Env::IO_HIGH, &info.table_properties);
     LogFlush(db_options_.info_log);
db/flush_job.h

@@ -57,10 +57,11 @@ class FlushJob {
            const MutableCFOptions& mutable_cf_options,
            const EnvOptions& env_options, VersionSet* versions,
            InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
-           SequenceNumber newest_snapshot, JobContext* job_context,
-           LogBuffer* log_buffer, Directory* db_directory,
-           Directory* output_file_directory,
-           CompressionType output_compression, Statistics* stats,
-           EventLogger* event_logger);
+           std::vector<SequenceNumber> existing_snapshots,
+           JobContext* job_context, LogBuffer* log_buffer,
+           Directory* db_directory, Directory* output_file_directory,
+           CompressionType output_compression, Statistics* stats,
+           EventLogger* event_logger);
 
   ~FlushJob();
 
@@ -80,7 +81,7 @@ class FlushJob {
   VersionSet* versions_;
   InstrumentedMutex* db_mutex_;
   std::atomic<bool>* shutting_down_;
-  SequenceNumber newest_snapshot_;
+  std::vector<SequenceNumber> existing_snapshots_;
   JobContext* job_context_;
   LogBuffer* log_buffer_;
   Directory* db_directory_;
db/flush_job_test.cc

@@ -3,6 +3,7 @@
 // LICENSE file in the root directory of this source tree. An additional grant
 // of patent rights can be found in the PATENTS file in the same directory.
 
+#include <algorithm>
 #include <map>
 #include <string>
 
@@ -91,7 +92,7 @@ TEST_F(FlushJobTest, Empty) {
   FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
                      db_options_, *cfd->GetLatestMutableCFOptions(),
                      env_options_, versions_.get(), &mutex_, &shutting_down_,
-                     SequenceNumber(), &job_context, nullptr, nullptr, nullptr,
+                     {}, &job_context, nullptr, nullptr, nullptr,
                      kNoCompression, nullptr, &event_logger);
   ASSERT_OK(flush_job.Run());
   job_context.Clean();
@@ -104,9 +105,17 @@ TEST_F(FlushJobTest, NonEmpty) {
                                            kMaxSequenceNumber);
   new_mem->Ref();
   mock::MockFileContents inserted_keys;
+  // Test data:
+  //   seqno [    1,    2 ... 8998, 8999, 9000, 9001, 9002 ... 9999 ]
+  //   key   [ 1001, 1002 ... 9998, 9999,    0,    1,    2 ...  999 ]
+  // Expected:
+  //   smallest_key   = "0"
+  //   largest_key    = "9999"
+  //   smallest_seqno = 1
+  //   largest_seqno  = 9999
   for (int i = 1; i < 10000; ++i) {
-    std::string key(ToString(i));
-    std::string value("value" + ToString(i));
+    std::string key(ToString((i + 1000) % 10000));
+    std::string value("value" + key);
     new_mem->Add(SequenceNumber(i), kTypeValue, key, value);
     InternalKey internal_key(key, SequenceNumber(i), kTypeValue);
     inserted_keys.insert({internal_key.Encode().ToString(), value});
@@ -122,7 +131,71 @@ TEST_F(FlushJobTest, NonEmpty) {
   FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
                      db_options_, *cfd->GetLatestMutableCFOptions(),
                      env_options_, versions_.get(), &mutex_, &shutting_down_,
-                     SequenceNumber(), &job_context, nullptr, nullptr, nullptr,
+                     {}, &job_context, nullptr, nullptr, nullptr,
                      kNoCompression, nullptr, &event_logger);
+  FileMetaData fd;
   mutex_.Lock();
-  ASSERT_OK(flush_job.Run());
+  ASSERT_OK(flush_job.Run(&fd));
   mutex_.Unlock();
+  ASSERT_EQ(ToString(0), fd.smallest.user_key().ToString());
+  ASSERT_EQ(ToString(9999), fd.largest.user_key().ToString());
+  ASSERT_EQ(1, fd.smallest_seqno);
+  ASSERT_EQ(9999, fd.largest_seqno);
   mock_table_factory_->AssertSingleFile(inserted_keys);
   job_context.Clean();
 }
 
+TEST_F(FlushJobTest, Snapshots) {
+  JobContext job_context(0);
+  auto cfd = versions_->GetColumnFamilySet()->GetDefault();
+  auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
+                                           kMaxSequenceNumber);
+
+  std::vector<SequenceNumber> snapshots;
+  std::set<SequenceNumber> snapshots_set;
+  int keys = 10000;
+  int max_inserts_per_keys = 8;
+
+  Random rnd(301);
+  for (int i = 0; i < keys / 2; ++i) {
+    snapshots.push_back(rnd.Uniform(keys * (max_inserts_per_keys / 2)) + 1);
+    snapshots_set.insert(snapshots.back());
+  }
+  std::sort(snapshots.begin(), snapshots.end());
+
+  new_mem->Ref();
+  SequenceNumber current_seqno = 0;
+  mock::MockFileContents inserted_keys;
+  for (int i = 1; i < keys; ++i) {
+    std::string key(ToString(i));
+    int insertions = rnd.Uniform(max_inserts_per_keys);
+    for (int j = 0; j < insertions; ++j) {
+      std::string value(test::RandomHumanReadableString(&rnd, 10));
+      auto seqno = ++current_seqno;
+      new_mem->Add(SequenceNumber(seqno), kTypeValue, key, value);
+      // a key is visible only if:
+      // 1. it's the last one written (j == insertions - 1)
+      // 2. there's a snapshot pointing at it
+      bool visible = (j == insertions - 1) ||
+                     (snapshots_set.find(seqno) != snapshots_set.end());
+      if (visible) {
+        InternalKey internal_key(key, seqno, kTypeValue);
+        inserted_keys.insert({internal_key.Encode().ToString(), value});
+      }
+    }
+  }
+
+  autovector<MemTable*> to_delete;
+  cfd->imm()->Add(new_mem, &to_delete);
+  for (auto& m : to_delete) {
+    delete m;
+  }
+
+  EventLogger event_logger(db_options_.info_log.get());
+  FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
+                     db_options_, *cfd->GetLatestMutableCFOptions(),
+                     env_options_, versions_.get(), &mutex_, &shutting_down_,
+                     snapshots, &job_context, nullptr, nullptr, nullptr,
+                     kNoCompression, nullptr, &event_logger);
+  mutex_.Lock();
+  ASSERT_OK(flush_job.Run());
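Editor's note: the new test's expected file contents rely on a simple predicate. Because each key's versions receive consecutive sequence numbers, a version survives the flush iff it is the key's last write or a snapshot sequence number equals its own (no snapshot can fall strictly between two versions of the same key). A minimal sketch of that predicate — an illustration only, not code from the patch:

#include <cstdint>
#include <iostream>
#include <set>
#include <vector>

using SequenceNumber = uint64_t;

int main() {
  // Snapshots taken at sequence numbers 3 and 6.
  const std::set<SequenceNumber> snapshots = {3, 6};
  // Five versions of one key written back-to-back, so their sequence
  // numbers are consecutive; this is what makes the exact-match check
  // below sufficient.
  const std::vector<SequenceNumber> versions = {2, 3, 4, 5, 6};
  for (size_t j = 0; j < versions.size(); ++j) {
    bool is_last_write = (j == versions.size() - 1);
    bool snapshot_points_at_it = snapshots.count(versions[j]) > 0;
    std::cout << "seqno " << versions[j] << ": "
              << ((is_last_write || snapshot_points_at_it) ? "kept"
                                                           : "dropped")
              << "\n";
  }
  // Prints: seqnos 3 and 6 kept; 2, 4, and 5 dropped.
  return 0;
}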
db/repair.cc

@@ -292,7 +292,7 @@ class Repairer {
     ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
     status = BuildTable(dbname_, env_, ioptions_, env_options_, table_cache_,
                         iter.get(), &meta, icmp_,
-                        &int_tbl_prop_collector_factories_, 0, 0,
+                        &int_tbl_prop_collector_factories_, {},
                         kNoCompression, CompressionOptions(), false, nullptr);
   }
   delete mem->Unref();