Compare commits
7 Commits
main
...
6.3.fb.myr
Author | SHA1 | Date | |
---|---|---|---|
|
9c34cded08 | ||
|
f633994fef | ||
|
4525e4bec2 | ||
|
a68aa71e3b | ||
|
c4774a12b7 | ||
|
1c6eb2b1e4 | ||
|
0849844411 |
@ -1,5 +1,9 @@
|
|||||||
# Rocksdb Change Log
|
# Rocksdb Change Log
|
||||||
## Unreleased
|
## Unreleased
|
||||||
|
### Bug Fixes
|
||||||
|
* Fix a bug where the compaction snapshot refresh feature is not disabled as advertised when `snap_refresh_nanos` is set to 0..
|
||||||
|
* Revert "DBIter::Next() can skip user key checking if previous entry's seqnum is 0", which is not a bug, but can make the previous bug's sympthom more severe.
|
||||||
|
|
||||||
### Default Option Change
|
### Default Option Change
|
||||||
* LRUCacheOptions.high_pri_pool_ratio is set to 0.5 (previously 0.0) by default, which means that by default midpoint insertion is enabled. The same change is made for the default value of high_pri_pool_ratio argument in NewLRUCache(). When block cache is not explictly created, the small block cache created by BlockBasedTable will still has this option to be 0.0.
|
* LRUCacheOptions.high_pri_pool_ratio is set to 0.5 (previously 0.0) by default, which means that by default midpoint insertion is enabled. The same change is made for the default value of high_pri_pool_ratio argument in NewLRUCache(). When block cache is not explictly created, the small block cache created by BlockBasedTable will still has this option to be 0.0.
|
||||||
* Change BlockBasedTableOptions.cache_index_and_filter_blocks_with_high_priority's default value from false to true.
|
* Change BlockBasedTableOptions.cache_index_and_filter_blocks_with_high_priority's default value from false to true.
|
||||||
@ -20,7 +24,7 @@
|
|||||||
* Added new APIs ExportColumnFamily() and CreateColumnFamilyWithImport() to support export and import of a Column Family. https://github.com/facebook/rocksdb/issues/3469
|
* Added new APIs ExportColumnFamily() and CreateColumnFamilyWithImport() to support export and import of a Column Family. https://github.com/facebook/rocksdb/issues/3469
|
||||||
|
|
||||||
### New Features
|
### New Features
|
||||||
* Add an option `snap_refresh_nanos` (default to 0.1s) to periodically refresh the snapshot list in compaction jobs. Assign to 0 to disable the feature.
|
* Add an option `snap_refresh_nanos` (default to 0) to periodically refresh the snapshot list in compaction jobs. Assign to 0 to disable the feature.
|
||||||
* Add an option `unordered_write` which trades snapshot guarantees with higher write throughput. When used with WRITE_PREPARED transactions with two_write_queues=true, it offers higher throughput with however no compromise on guarantees.
|
* Add an option `unordered_write` which trades snapshot guarantees with higher write throughput. When used with WRITE_PREPARED transactions with two_write_queues=true, it offers higher throughput with however no compromise on guarantees.
|
||||||
* Allow DBImplSecondary to remove memtables with obsolete data after replaying MANIFEST and WAL.
|
* Allow DBImplSecondary to remove memtables with obsolete data after replaying MANIFEST and WAL.
|
||||||
* Add an option `failed_move_fall_back_to_copy` (default is true) for external SST ingestion. When `move_files` is true and hard link fails, ingestion falls back to copy if `failed_move_fall_back_to_copy` is true. Otherwise, ingestion reports an error.
|
* Add an option `failed_move_fall_back_to_copy` (default is true) for external SST ingestion. When `move_files` is true and hard link fails, ingestion falls back to copy if `failed_move_fall_back_to_copy` is true. Otherwise, ingestion reports an error.
|
||||||
|
@ -39,6 +39,7 @@ class SnapshotListFetchCallback {
|
|||||||
virtual void Refresh(std::vector<SequenceNumber>* snapshots,
|
virtual void Refresh(std::vector<SequenceNumber>* snapshots,
|
||||||
SequenceNumber max) = 0;
|
SequenceNumber max) = 0;
|
||||||
inline bool TimeToRefresh(const size_t key_index) {
|
inline bool TimeToRefresh(const size_t key_index) {
|
||||||
|
assert(snap_refresh_nanos_ != 0);
|
||||||
// skip the key if key_index % every_nth_key (which is of power 2) is not 0.
|
// skip the key if key_index % every_nth_key (which is of power 2) is not 0.
|
||||||
if ((key_index & every_nth_key_minus_one_) != 0) {
|
if ((key_index & every_nth_key_minus_one_) != 0) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -964,7 +964,7 @@ TEST_F(CompactionJobTest, SnapshotRefresh) {
|
|||||||
public:
|
public:
|
||||||
SnapshotListFetchCallbackTest(Env* env, Random64& rand,
|
SnapshotListFetchCallbackTest(Env* env, Random64& rand,
|
||||||
std::vector<SequenceNumber>* snapshots)
|
std::vector<SequenceNumber>* snapshots)
|
||||||
: SnapshotListFetchCallback(env, 0 /*no time delay*/,
|
: SnapshotListFetchCallback(env, 1 /*short time delay*/,
|
||||||
1 /*fetch after each key*/),
|
1 /*fetch after each key*/),
|
||||||
rand_(rand),
|
rand_(rand),
|
||||||
snapshots_(snapshots) {}
|
snapshots_(snapshots) {}
|
||||||
|
@ -3669,9 +3669,9 @@ Status DBImpl::IngestExternalFiles(
|
|||||||
exec_results.emplace_back(false, Status::OK());
|
exec_results.emplace_back(false, Status::OK());
|
||||||
}
|
}
|
||||||
// TODO(yanqin) maybe make jobs run in parallel
|
// TODO(yanqin) maybe make jobs run in parallel
|
||||||
|
uint64_t start_file_number = next_file_number;
|
||||||
for (size_t i = 1; i != num_cfs; ++i) {
|
for (size_t i = 1; i != num_cfs; ++i) {
|
||||||
uint64_t start_file_number =
|
start_file_number += args[i - 1].external_files.size();
|
||||||
next_file_number + args[i - 1].external_files.size();
|
|
||||||
auto* cfd =
|
auto* cfd =
|
||||||
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
|
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
|
||||||
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
|
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
|
||||||
|
@ -1008,7 +1008,9 @@ Status DBImpl::CompactFilesImpl(
|
|||||||
c->mutable_cf_options()->paranoid_file_checks,
|
c->mutable_cf_options()->paranoid_file_checks,
|
||||||
c->mutable_cf_options()->report_bg_io_stats, dbname_,
|
c->mutable_cf_options()->report_bg_io_stats, dbname_,
|
||||||
&compaction_job_stats, Env::Priority::USER,
|
&compaction_job_stats, Env::Priority::USER,
|
||||||
immutable_db_options_.max_subcompactions <= 1 ? &fetch_callback
|
immutable_db_options_.max_subcompactions <= 1 &&
|
||||||
|
c->mutable_cf_options()->snap_refresh_nanos > 0
|
||||||
|
? &fetch_callback
|
||||||
: nullptr);
|
: nullptr);
|
||||||
|
|
||||||
// Creating a compaction influences the compaction score because the score
|
// Creating a compaction influences the compaction score because the score
|
||||||
@ -2737,7 +2739,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
|
|||||||
&event_logger_, c->mutable_cf_options()->paranoid_file_checks,
|
&event_logger_, c->mutable_cf_options()->paranoid_file_checks,
|
||||||
c->mutable_cf_options()->report_bg_io_stats, dbname_,
|
c->mutable_cf_options()->report_bg_io_stats, dbname_,
|
||||||
&compaction_job_stats, thread_pri,
|
&compaction_job_stats, thread_pri,
|
||||||
immutable_db_options_.max_subcompactions <= 1 ? &fetch_callback
|
immutable_db_options_.max_subcompactions <= 1 &&
|
||||||
|
c->mutable_cf_options()->snap_refresh_nanos > 0
|
||||||
|
? &fetch_callback
|
||||||
: nullptr);
|
: nullptr);
|
||||||
compaction_job.Prepare();
|
compaction_job.Prepare();
|
||||||
|
|
||||||
|
@ -133,7 +133,6 @@ class DBIter final: public Iterator {
|
|||||||
direction_(kForward),
|
direction_(kForward),
|
||||||
valid_(false),
|
valid_(false),
|
||||||
current_entry_is_merged_(false),
|
current_entry_is_merged_(false),
|
||||||
is_key_seqnum_zero_(false),
|
|
||||||
prefix_same_as_start_(read_options.prefix_same_as_start),
|
prefix_same_as_start_(read_options.prefix_same_as_start),
|
||||||
pin_thru_lifetime_(read_options.pin_data),
|
pin_thru_lifetime_(read_options.pin_data),
|
||||||
total_order_seek_(read_options.total_order_seek),
|
total_order_seek_(read_options.total_order_seek),
|
||||||
@ -334,10 +333,6 @@ class DBIter final: public Iterator {
|
|||||||
Direction direction_;
|
Direction direction_;
|
||||||
bool valid_;
|
bool valid_;
|
||||||
bool current_entry_is_merged_;
|
bool current_entry_is_merged_;
|
||||||
// True if we know that the current entry's seqnum is 0.
|
|
||||||
// This information is used as that the next entry will be for another
|
|
||||||
// user key.
|
|
||||||
bool is_key_seqnum_zero_;
|
|
||||||
const bool prefix_same_as_start_;
|
const bool prefix_same_as_start_;
|
||||||
// Means that we will pin all data blocks we read as long the Iterator
|
// Means that we will pin all data blocks we read as long the Iterator
|
||||||
// is not deleted, will be true if ReadOptions::pin_data is true
|
// is not deleted, will be true if ReadOptions::pin_data is true
|
||||||
@ -386,7 +381,6 @@ void DBIter::Next() {
|
|||||||
num_internal_keys_skipped_ = 0;
|
num_internal_keys_skipped_ = 0;
|
||||||
bool ok = true;
|
bool ok = true;
|
||||||
if (direction_ == kReverse) {
|
if (direction_ == kReverse) {
|
||||||
is_key_seqnum_zero_ = false;
|
|
||||||
if (!ReverseToForward()) {
|
if (!ReverseToForward()) {
|
||||||
ok = false;
|
ok = false;
|
||||||
}
|
}
|
||||||
@ -406,7 +400,6 @@ void DBIter::Next() {
|
|||||||
FindNextUserEntry(true /* skipping the current user key */,
|
FindNextUserEntry(true /* skipping the current user key */,
|
||||||
prefix_same_as_start_);
|
prefix_same_as_start_);
|
||||||
} else {
|
} else {
|
||||||
is_key_seqnum_zero_ = false;
|
|
||||||
valid_ = false;
|
valid_ = false;
|
||||||
}
|
}
|
||||||
if (statistics_ != nullptr && valid_) {
|
if (statistics_ != nullptr && valid_) {
|
||||||
@ -457,16 +450,10 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check)
|
|||||||
is_blob_ = false;
|
is_blob_ = false;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
// Will update is_key_seqnum_zero_ as soon as we parsed the current key
|
|
||||||
// but we need to save the previous value to be used in the loop.
|
|
||||||
bool is_prev_key_seqnum_zero = is_key_seqnum_zero_;
|
|
||||||
if (!ParseKey(&ikey_)) {
|
if (!ParseKey(&ikey_)) {
|
||||||
is_key_seqnum_zero_ = false;
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
is_key_seqnum_zero_ = (ikey_.sequence == 0);
|
|
||||||
|
|
||||||
assert(iterate_upper_bound_ == nullptr || iter_.MayBeOutOfUpperBound() ||
|
assert(iterate_upper_bound_ == nullptr || iter_.MayBeOutOfUpperBound() ||
|
||||||
user_comparator_.Compare(ikey_.user_key, *iterate_upper_bound_) < 0);
|
user_comparator_.Compare(ikey_.user_key, *iterate_upper_bound_) < 0);
|
||||||
if (iterate_upper_bound_ != nullptr && iter_.MayBeOutOfUpperBound() &&
|
if (iterate_upper_bound_ != nullptr && iter_.MayBeOutOfUpperBound() &&
|
||||||
@ -485,18 +472,11 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (IsVisible(ikey_.sequence)) {
|
if (IsVisible(ikey_.sequence)) {
|
||||||
// If the previous entry is of seqnum 0, the current entry will not
|
if (skipping && user_comparator_.Compare(ikey_.user_key,
|
||||||
// possibly be skipped. This condition can potentially be relaxed to
|
saved_key_.GetUserKey()) <= 0) {
|
||||||
// prev_key.seq <= ikey_.sequence. We are cautious because it will be more
|
|
||||||
// prone to bugs causing the same user key with the same sequence number.
|
|
||||||
if (!is_prev_key_seqnum_zero && skipping &&
|
|
||||||
user_comparator_.Compare(ikey_.user_key, saved_key_.GetUserKey()) <=
|
|
||||||
0) {
|
|
||||||
num_skipped++; // skip this entry
|
num_skipped++; // skip this entry
|
||||||
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
|
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
|
||||||
} else {
|
} else {
|
||||||
assert(!skipping || user_comparator_.Compare(
|
|
||||||
ikey_.user_key, saved_key_.GetUserKey()) > 0);
|
|
||||||
num_skipped = 0;
|
num_skipped = 0;
|
||||||
switch (ikey_.type) {
|
switch (ikey_.type) {
|
||||||
case kTypeDeletion:
|
case kTypeDeletion:
|
||||||
@ -617,7 +597,6 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check)
|
|||||||
// If we have sequentially iterated via numerous equal keys, then it's
|
// If we have sequentially iterated via numerous equal keys, then it's
|
||||||
// better to seek so that we can avoid too many key comparisons.
|
// better to seek so that we can avoid too many key comparisons.
|
||||||
if (num_skipped > max_skip_ && CanReseekToSkip()) {
|
if (num_skipped > max_skip_ && CanReseekToSkip()) {
|
||||||
is_key_seqnum_zero_ = false;
|
|
||||||
num_skipped = 0;
|
num_skipped = 0;
|
||||||
std::string last_key;
|
std::string last_key;
|
||||||
if (skipping) {
|
if (skipping) {
|
||||||
@ -1291,7 +1270,6 @@ void DBIter::Seek(const Slice& target) {
|
|||||||
status_ = Status::OK();
|
status_ = Status::OK();
|
||||||
ReleaseTempPinnedData();
|
ReleaseTempPinnedData();
|
||||||
ResetInternalKeysSkippedCounter();
|
ResetInternalKeysSkippedCounter();
|
||||||
is_key_seqnum_zero_ = false;
|
|
||||||
|
|
||||||
SequenceNumber seq = sequence_;
|
SequenceNumber seq = sequence_;
|
||||||
saved_key_.Clear();
|
saved_key_.Clear();
|
||||||
@ -1350,7 +1328,6 @@ void DBIter::SeekForPrev(const Slice& target) {
|
|||||||
status_ = Status::OK();
|
status_ = Status::OK();
|
||||||
ReleaseTempPinnedData();
|
ReleaseTempPinnedData();
|
||||||
ResetInternalKeysSkippedCounter();
|
ResetInternalKeysSkippedCounter();
|
||||||
is_key_seqnum_zero_ = false;
|
|
||||||
saved_key_.Clear();
|
saved_key_.Clear();
|
||||||
// now saved_key is used to store internal key.
|
// now saved_key is used to store internal key.
|
||||||
saved_key_.SetInternalKey(target, 0 /* sequence_number */,
|
saved_key_.SetInternalKey(target, 0 /* sequence_number */,
|
||||||
@ -1418,7 +1395,6 @@ void DBIter::SeekToFirst() {
|
|||||||
ReleaseTempPinnedData();
|
ReleaseTempPinnedData();
|
||||||
ResetInternalKeysSkippedCounter();
|
ResetInternalKeysSkippedCounter();
|
||||||
ClearSavedValue();
|
ClearSavedValue();
|
||||||
is_key_seqnum_zero_ = false;
|
|
||||||
|
|
||||||
{
|
{
|
||||||
PERF_TIMER_GUARD(seek_internal_seek_time);
|
PERF_TIMER_GUARD(seek_internal_seek_time);
|
||||||
@ -1471,7 +1447,6 @@ void DBIter::SeekToLast() {
|
|||||||
ReleaseTempPinnedData();
|
ReleaseTempPinnedData();
|
||||||
ResetInternalKeysSkippedCounter();
|
ResetInternalKeysSkippedCounter();
|
||||||
ClearSavedValue();
|
ClearSavedValue();
|
||||||
is_key_seqnum_zero_ = false;
|
|
||||||
|
|
||||||
{
|
{
|
||||||
PERF_TIMER_GUARD(seek_internal_seek_time);
|
PERF_TIMER_GUARD(seek_internal_seek_time);
|
||||||
|
@ -182,6 +182,33 @@ TEST_P(DBIteratorTest, IterSeekBeforePrev) {
|
|||||||
delete iter;
|
delete iter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_P(DBIteratorTest, IterReseekNewUpperBound) {
|
||||||
|
Random rnd(301);
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
BlockBasedTableOptions table_options;
|
||||||
|
table_options.block_size = 1024;
|
||||||
|
table_options.block_size_deviation = 50;
|
||||||
|
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||||
|
options.compression = kNoCompression;
|
||||||
|
Reopen(options);
|
||||||
|
|
||||||
|
ASSERT_OK(Put("a", RandomString(&rnd, 400)));
|
||||||
|
ASSERT_OK(Put("aabb", RandomString(&rnd, 400)));
|
||||||
|
ASSERT_OK(Put("aaef", RandomString(&rnd, 400)));
|
||||||
|
ASSERT_OK(Put("b", RandomString(&rnd, 400)));
|
||||||
|
dbfull()->Flush(FlushOptions());
|
||||||
|
ReadOptions opts;
|
||||||
|
Slice ub = Slice("aa");
|
||||||
|
opts.iterate_upper_bound = &ub;
|
||||||
|
auto iter = NewIterator(opts);
|
||||||
|
iter->Seek(Slice("a"));
|
||||||
|
ub = Slice("b");
|
||||||
|
iter->Seek(Slice("aabc"));
|
||||||
|
ASSERT_TRUE(iter->Valid());
|
||||||
|
ASSERT_EQ(iter->key().ToString(), "aaef");
|
||||||
|
delete iter;
|
||||||
|
}
|
||||||
|
|
||||||
TEST_P(DBIteratorTest, IterSeekForPrevBeforeNext) {
|
TEST_P(DBIteratorTest, IterSeekForPrevBeforeNext) {
|
||||||
ASSERT_OK(Put("a", "b"));
|
ASSERT_OK(Put("a", "b"));
|
||||||
ASSERT_OK(Put("c", "d"));
|
ASSERT_OK(Put("c", "d"));
|
||||||
|
@ -3771,6 +3771,46 @@ TEST_F(DBTest2, CloseWithUnreleasedSnapshot) {
|
|||||||
delete db_;
|
delete db_;
|
||||||
db_ = nullptr;
|
db_ = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(DBTest2, PrefixBloomReseek) {
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
options.create_if_missing = true;
|
||||||
|
options.prefix_extractor.reset(NewCappedPrefixTransform(3));
|
||||||
|
BlockBasedTableOptions bbto;
|
||||||
|
bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
|
||||||
|
bbto.whole_key_filtering = false;
|
||||||
|
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
|
||||||
|
// Construct two L1 files with keys:
|
||||||
|
// f1:[aaa1 ccc1] f2:[ddd0]
|
||||||
|
ASSERT_OK(Put("aaa1", ""));
|
||||||
|
ASSERT_OK(Put("ccc1", ""));
|
||||||
|
ASSERT_OK(Flush());
|
||||||
|
ASSERT_OK(Put("ddd0", ""));
|
||||||
|
ASSERT_OK(Flush());
|
||||||
|
CompactRangeOptions cro;
|
||||||
|
cro.bottommost_level_compaction = BottommostLevelCompaction::kSkip;
|
||||||
|
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
|
||||||
|
|
||||||
|
ASSERT_OK(Put("bbb1", ""));
|
||||||
|
|
||||||
|
Iterator* iter = db_->NewIterator(ReadOptions());
|
||||||
|
|
||||||
|
// Seeking into f1, the iterator will check bloom filter which returns the
|
||||||
|
// file iterator ot be invalidate, and the cursor will put into f2, with
|
||||||
|
// the next key to be "ddd0".
|
||||||
|
iter->Seek("bbb1");
|
||||||
|
ASSERT_TRUE(iter->Valid());
|
||||||
|
ASSERT_EQ("bbb1", iter->key().ToString());
|
||||||
|
|
||||||
|
// Reseek ccc1, the L1 iterator needs to go back to f1 and reseek.
|
||||||
|
iter->Seek("ccc1");
|
||||||
|
ASSERT_TRUE(iter->Valid());
|
||||||
|
ASSERT_EQ("ccc1", iter->key().ToString());
|
||||||
|
|
||||||
|
delete iter;
|
||||||
|
}
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
@ -160,7 +160,7 @@ Status ExternalSstFileIngestionJob::Prepare(
|
|||||||
// We failed, remove all files that we copied into the db
|
// We failed, remove all files that we copied into the db
|
||||||
for (IngestedFileInfo& f : files_to_ingest_) {
|
for (IngestedFileInfo& f : files_to_ingest_) {
|
||||||
if (f.internal_file_path.empty()) {
|
if (f.internal_file_path.empty()) {
|
||||||
break;
|
continue;
|
||||||
}
|
}
|
||||||
Status s = env_->DeleteFile(f.internal_file_path);
|
Status s = env_->DeleteFile(f.internal_file_path);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
@ -291,6 +291,9 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
|
|||||||
// We failed to add the files to the database
|
// We failed to add the files to the database
|
||||||
// remove all the files we copied
|
// remove all the files we copied
|
||||||
for (IngestedFileInfo& f : files_to_ingest_) {
|
for (IngestedFileInfo& f : files_to_ingest_) {
|
||||||
|
if (f.internal_file_path.empty()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
Status s = env_->DeleteFile(f.internal_file_path);
|
Status s = env_->DeleteFile(f.internal_file_path);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
ROCKS_LOG_WARN(db_options_.info_log,
|
ROCKS_LOG_WARN(db_options_.info_log,
|
||||||
|
@ -2369,10 +2369,11 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
|
|||||||
new FaultInjectionTestEnv(env_));
|
new FaultInjectionTestEnv(env_));
|
||||||
Options options = CurrentOptions();
|
Options options = CurrentOptions();
|
||||||
options.env = fault_injection_env.get();
|
options.env = fault_injection_env.get();
|
||||||
CreateAndReopenWithCF({"pikachu"}, options);
|
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
|
||||||
std::vector<ColumnFamilyHandle*> column_families;
|
std::vector<ColumnFamilyHandle*> column_families;
|
||||||
column_families.push_back(handles_[0]);
|
column_families.push_back(handles_[0]);
|
||||||
column_families.push_back(handles_[1]);
|
column_families.push_back(handles_[1]);
|
||||||
|
column_families.push_back(handles_[2]);
|
||||||
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
||||||
for (auto& ifo : ifos) {
|
for (auto& ifo : ifos) {
|
||||||
ifo.allow_global_seqno = true; // Always allow global_seqno
|
ifo.allow_global_seqno = true; // Always allow global_seqno
|
||||||
@ -2386,6 +2387,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
|
|||||||
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
||||||
data.push_back(
|
data.push_back(
|
||||||
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
||||||
|
data.push_back(
|
||||||
|
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
|
||||||
|
|
||||||
// Resize the true_data vector upon construction to avoid re-alloc
|
// Resize the true_data vector upon construction to avoid re-alloc
|
||||||
std::vector<std::map<std::string, std::string>> true_data(
|
std::vector<std::map<std::string, std::string>> true_data(
|
||||||
column_families.size());
|
column_families.size());
|
||||||
@ -2393,8 +2397,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
|
|||||||
-1, true, true_data);
|
-1, true, true_data);
|
||||||
ASSERT_OK(s);
|
ASSERT_OK(s);
|
||||||
Close();
|
Close();
|
||||||
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
|
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
|
||||||
ASSERT_EQ(2, handles_.size());
|
options);
|
||||||
|
ASSERT_EQ(3, handles_.size());
|
||||||
int cf = 0;
|
int cf = 0;
|
||||||
for (const auto& verify_map : true_data) {
|
for (const auto& verify_map : true_data) {
|
||||||
for (const auto& elem : verify_map) {
|
for (const auto& elem : verify_map) {
|
||||||
@ -2426,10 +2431,11 @@ TEST_P(ExternalSSTFileTest,
|
|||||||
|
|
||||||
Options options = CurrentOptions();
|
Options options = CurrentOptions();
|
||||||
options.env = fault_injection_env.get();
|
options.env = fault_injection_env.get();
|
||||||
CreateAndReopenWithCF({"pikachu"}, options);
|
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
|
||||||
const std::vector<std::map<std::string, std::string>> data_before_ingestion =
|
const std::vector<std::map<std::string, std::string>> data_before_ingestion =
|
||||||
{{{"foo1", "fv1_0"}, {"foo2", "fv2_0"}, {"foo3", "fv3_0"}},
|
{{{"foo1", "fv1_0"}, {"foo2", "fv2_0"}, {"foo3", "fv3_0"}},
|
||||||
{{"bar1", "bv1_0"}, {"bar2", "bv2_0"}, {"bar3", "bv3_0"}}};
|
{{"bar1", "bv1_0"}, {"bar2", "bv2_0"}, {"bar3", "bv3_0"}},
|
||||||
|
{{"bar4", "bv4_0"}, {"bar5", "bv5_0"}, {"bar6", "bv6_0"}}};
|
||||||
for (size_t i = 0; i != handles_.size(); ++i) {
|
for (size_t i = 0; i != handles_.size(); ++i) {
|
||||||
int cf = static_cast<int>(i);
|
int cf = static_cast<int>(i);
|
||||||
const auto& orig_data = data_before_ingestion[i];
|
const auto& orig_data = data_before_ingestion[i];
|
||||||
@ -2442,6 +2448,7 @@ TEST_P(ExternalSSTFileTest,
|
|||||||
std::vector<ColumnFamilyHandle*> column_families;
|
std::vector<ColumnFamilyHandle*> column_families;
|
||||||
column_families.push_back(handles_[0]);
|
column_families.push_back(handles_[0]);
|
||||||
column_families.push_back(handles_[1]);
|
column_families.push_back(handles_[1]);
|
||||||
|
column_families.push_back(handles_[2]);
|
||||||
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
||||||
for (auto& ifo : ifos) {
|
for (auto& ifo : ifos) {
|
||||||
ifo.allow_global_seqno = true; // Always allow global_seqno
|
ifo.allow_global_seqno = true; // Always allow global_seqno
|
||||||
@ -2455,6 +2462,8 @@ TEST_P(ExternalSSTFileTest,
|
|||||||
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
||||||
data.push_back(
|
data.push_back(
|
||||||
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
||||||
|
data.push_back(
|
||||||
|
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
|
||||||
// Resize the true_data vector upon construction to avoid re-alloc
|
// Resize the true_data vector upon construction to avoid re-alloc
|
||||||
std::vector<std::map<std::string, std::string>> true_data(
|
std::vector<std::map<std::string, std::string>> true_data(
|
||||||
column_families.size());
|
column_families.size());
|
||||||
@ -2508,10 +2517,11 @@ TEST_P(ExternalSSTFileTest,
|
|||||||
dbfull()->ReleaseSnapshot(read_opts.snapshot);
|
dbfull()->ReleaseSnapshot(read_opts.snapshot);
|
||||||
|
|
||||||
Close();
|
Close();
|
||||||
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
|
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
|
||||||
|
options);
|
||||||
// Should see consistent state after ingestion for all column families even
|
// Should see consistent state after ingestion for all column families even
|
||||||
// without snapshot.
|
// without snapshot.
|
||||||
ASSERT_EQ(2, handles_.size());
|
ASSERT_EQ(3, handles_.size());
|
||||||
int cf = 0;
|
int cf = 0;
|
||||||
for (const auto& verify_map : true_data) {
|
for (const auto& verify_map : true_data) {
|
||||||
for (const auto& elem : verify_map) {
|
for (const auto& elem : verify_map) {
|
||||||
@ -2541,10 +2551,11 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
|
|||||||
"DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1"},
|
"DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1"},
|
||||||
});
|
});
|
||||||
SyncPoint::GetInstance()->EnableProcessing();
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
CreateAndReopenWithCF({"pikachu"}, options);
|
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
|
||||||
std::vector<ColumnFamilyHandle*> column_families;
|
std::vector<ColumnFamilyHandle*> column_families;
|
||||||
column_families.push_back(handles_[0]);
|
column_families.push_back(handles_[0]);
|
||||||
column_families.push_back(handles_[1]);
|
column_families.push_back(handles_[1]);
|
||||||
|
column_families.push_back(handles_[2]);
|
||||||
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
||||||
for (auto& ifo : ifos) {
|
for (auto& ifo : ifos) {
|
||||||
ifo.allow_global_seqno = true; // Always allow global_seqno
|
ifo.allow_global_seqno = true; // Always allow global_seqno
|
||||||
@ -2558,6 +2569,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
|
|||||||
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
||||||
data.push_back(
|
data.push_back(
|
||||||
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
||||||
|
data.push_back(
|
||||||
|
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
|
||||||
|
|
||||||
// Resize the true_data vector upon construction to avoid re-alloc
|
// Resize the true_data vector upon construction to avoid re-alloc
|
||||||
std::vector<std::map<std::string, std::string>> true_data(
|
std::vector<std::map<std::string, std::string>> true_data(
|
||||||
column_families.size());
|
column_families.size());
|
||||||
@ -2577,8 +2591,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
|
|||||||
|
|
||||||
fault_injection_env->SetFilesystemActive(true);
|
fault_injection_env->SetFilesystemActive(true);
|
||||||
Close();
|
Close();
|
||||||
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
|
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
|
||||||
ASSERT_EQ(2, handles_.size());
|
options);
|
||||||
|
ASSERT_EQ(3, handles_.size());
|
||||||
int cf = 0;
|
int cf = 0;
|
||||||
for (const auto& verify_map : true_data) {
|
for (const auto& verify_map : true_data) {
|
||||||
for (const auto& elem : verify_map) {
|
for (const auto& elem : verify_map) {
|
||||||
@ -2607,10 +2622,11 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
|
|||||||
"DBImpl::IngestExternalFiles:BeforeJobsRun:1"},
|
"DBImpl::IngestExternalFiles:BeforeJobsRun:1"},
|
||||||
});
|
});
|
||||||
SyncPoint::GetInstance()->EnableProcessing();
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
CreateAndReopenWithCF({"pikachu"}, options);
|
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
|
||||||
std::vector<ColumnFamilyHandle*> column_families;
|
std::vector<ColumnFamilyHandle*> column_families;
|
||||||
column_families.push_back(handles_[0]);
|
column_families.push_back(handles_[0]);
|
||||||
column_families.push_back(handles_[1]);
|
column_families.push_back(handles_[1]);
|
||||||
|
column_families.push_back(handles_[2]);
|
||||||
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
||||||
for (auto& ifo : ifos) {
|
for (auto& ifo : ifos) {
|
||||||
ifo.allow_global_seqno = true; // Always allow global_seqno
|
ifo.allow_global_seqno = true; // Always allow global_seqno
|
||||||
@ -2624,6 +2640,8 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
|
|||||||
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
||||||
data.push_back(
|
data.push_back(
|
||||||
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
||||||
|
data.push_back(
|
||||||
|
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
|
||||||
// Resize the true_data vector upon construction to avoid re-alloc
|
// Resize the true_data vector upon construction to avoid re-alloc
|
||||||
std::vector<std::map<std::string, std::string>> true_data(
|
std::vector<std::map<std::string, std::string>> true_data(
|
||||||
column_families.size());
|
column_families.size());
|
||||||
@ -2643,8 +2661,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
|
|||||||
|
|
||||||
fault_injection_env->SetFilesystemActive(true);
|
fault_injection_env->SetFilesystemActive(true);
|
||||||
Close();
|
Close();
|
||||||
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
|
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
|
||||||
ASSERT_EQ(2, handles_.size());
|
options);
|
||||||
|
ASSERT_EQ(3, handles_.size());
|
||||||
int cf = 0;
|
int cf = 0;
|
||||||
for (const auto& verify_map : true_data) {
|
for (const auto& verify_map : true_data) {
|
||||||
for (const auto& elem : verify_map) {
|
for (const auto& elem : verify_map) {
|
||||||
@ -2664,7 +2683,7 @@ TEST_P(ExternalSSTFileTest,
|
|||||||
Options options = CurrentOptions();
|
Options options = CurrentOptions();
|
||||||
options.env = fault_injection_env.get();
|
options.env = fault_injection_env.get();
|
||||||
|
|
||||||
CreateAndReopenWithCF({"pikachu"}, options);
|
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
|
||||||
|
|
||||||
SyncPoint::GetInstance()->ClearTrace();
|
SyncPoint::GetInstance()->ClearTrace();
|
||||||
SyncPoint::GetInstance()->DisableProcessing();
|
SyncPoint::GetInstance()->DisableProcessing();
|
||||||
@ -2682,6 +2701,7 @@ TEST_P(ExternalSSTFileTest,
|
|||||||
std::vector<ColumnFamilyHandle*> column_families;
|
std::vector<ColumnFamilyHandle*> column_families;
|
||||||
column_families.push_back(handles_[0]);
|
column_families.push_back(handles_[0]);
|
||||||
column_families.push_back(handles_[1]);
|
column_families.push_back(handles_[1]);
|
||||||
|
column_families.push_back(handles_[2]);
|
||||||
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
std::vector<IngestExternalFileOptions> ifos(column_families.size());
|
||||||
for (auto& ifo : ifos) {
|
for (auto& ifo : ifos) {
|
||||||
ifo.allow_global_seqno = true; // Always allow global_seqno
|
ifo.allow_global_seqno = true; // Always allow global_seqno
|
||||||
@ -2695,6 +2715,8 @@ TEST_P(ExternalSSTFileTest,
|
|||||||
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
|
||||||
data.push_back(
|
data.push_back(
|
||||||
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
|
||||||
|
data.push_back(
|
||||||
|
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
|
||||||
// Resize the true_data vector upon construction to avoid re-alloc
|
// Resize the true_data vector upon construction to avoid re-alloc
|
||||||
std::vector<std::map<std::string, std::string>> true_data(
|
std::vector<std::map<std::string, std::string>> true_data(
|
||||||
column_families.size());
|
column_families.size());
|
||||||
@ -2715,8 +2737,9 @@ TEST_P(ExternalSSTFileTest,
|
|||||||
fault_injection_env->DropUnsyncedFileData();
|
fault_injection_env->DropUnsyncedFileData();
|
||||||
fault_injection_env->SetFilesystemActive(true);
|
fault_injection_env->SetFilesystemActive(true);
|
||||||
Close();
|
Close();
|
||||||
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
|
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
|
||||||
ASSERT_EQ(2, handles_.size());
|
options);
|
||||||
|
ASSERT_EQ(3, handles_.size());
|
||||||
int cf = 0;
|
int cf = 0;
|
||||||
for (const auto& verify_map : true_data) {
|
for (const auto& verify_map : true_data) {
|
||||||
for (const auto& elem : verify_map) {
|
for (const auto& elem : verify_map) {
|
||||||
|
@ -275,10 +275,10 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
|
|||||||
// this option helps reducing the cpu usage of long-running compactions. The
|
// this option helps reducing the cpu usage of long-running compactions. The
|
||||||
// feature is disabled when max_subcompactions is greater than one.
|
// feature is disabled when max_subcompactions is greater than one.
|
||||||
//
|
//
|
||||||
// Default: 0.1s
|
// Default: 0
|
||||||
//
|
//
|
||||||
// Dynamically changeable through SetOptions() API
|
// Dynamically changeable through SetOptions() API
|
||||||
uint64_t snap_refresh_nanos = 100 * 1000 * 1000; // 0.1s
|
uint64_t snap_refresh_nanos = 0;
|
||||||
|
|
||||||
// Disable automatic compactions. Manual compactions can still
|
// Disable automatic compactions. Manual compactions can still
|
||||||
// be issued on this column family
|
// be issued on this column family
|
||||||
|
@ -5,7 +5,7 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#define ROCKSDB_MAJOR 6
|
#define ROCKSDB_MAJOR 6
|
||||||
#define ROCKSDB_MINOR 2
|
#define ROCKSDB_MINOR 3
|
||||||
#define ROCKSDB_PATCH 0
|
#define ROCKSDB_PATCH 0
|
||||||
|
|
||||||
// Do not use these. We made the mistake of declaring macros starting with
|
// Do not use these. We made the mistake of declaring macros starting with
|
||||||
|
@ -2825,11 +2825,21 @@ void BlockBasedTableIterator<TBlockIter, TValue>::SeekImpl(
|
|||||||
// Index contains the first key of the block, and it's >= target.
|
// Index contains the first key of the block, and it's >= target.
|
||||||
// We can defer reading the block.
|
// We can defer reading the block.
|
||||||
is_at_first_key_from_index_ = true;
|
is_at_first_key_from_index_ = true;
|
||||||
|
// ResetDataIter() will invalidate block_iter_. Thus, there is no need to
|
||||||
|
// call CheckDataBlockWithinUpperBound() to check for iterate_upper_bound
|
||||||
|
// as that will be done later when the data block is actually read.
|
||||||
ResetDataIter();
|
ResetDataIter();
|
||||||
} else {
|
} else {
|
||||||
// Need to use the data block.
|
// Need to use the data block.
|
||||||
if (!same_block) {
|
if (!same_block) {
|
||||||
InitDataBlock();
|
InitDataBlock();
|
||||||
|
} else {
|
||||||
|
// When the user does a reseek, the iterate_upper_bound might have
|
||||||
|
// changed. CheckDataBlockWithinUpperBound() needs to be called
|
||||||
|
// explicitly if the reseek ends up in the same data block.
|
||||||
|
// If the reseek ends up in a different block, InitDataBlock() will do
|
||||||
|
// the iterator upper bound check.
|
||||||
|
CheckDataBlockWithinUpperBound();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (target) {
|
if (target) {
|
||||||
@ -2840,7 +2850,6 @@ void BlockBasedTableIterator<TBlockIter, TValue>::SeekImpl(
|
|||||||
FindKeyForward();
|
FindKeyForward();
|
||||||
}
|
}
|
||||||
|
|
||||||
CheckDataBlockWithinUpperBound();
|
|
||||||
CheckOutOfBound();
|
CheckOutOfBound();
|
||||||
|
|
||||||
if (target) {
|
if (target) {
|
||||||
|
@ -129,7 +129,7 @@ class MergingIterator : public InternalIterator {
|
|||||||
void Seek(const Slice& target) override {
|
void Seek(const Slice& target) override {
|
||||||
bool is_increasing_reseek = false;
|
bool is_increasing_reseek = false;
|
||||||
if (current_ != nullptr && direction_ == kForward && status_.ok() &&
|
if (current_ != nullptr && direction_ == kForward && status_.ok() &&
|
||||||
comparator_->Compare(target, key()) >= 0) {
|
!prefix_seek_mode_ && comparator_->Compare(target, key()) >= 0) {
|
||||||
is_increasing_reseek = true;
|
is_increasing_reseek = true;
|
||||||
}
|
}
|
||||||
ClearHeaps();
|
ClearHeaps();
|
||||||
|
Loading…
Reference in New Issue
Block a user