Allow iterate refresh for secondary instance (#8700)

Summary:
Test plan
make check

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8700

Reviewed By: zhichao-cao

Differential Revision: D30523907

Pulled By: riversand963

fbshipit-source-id: 68928ab4dafb64ce80ab7bc69d83727a4713ab91
This commit is contained in:
Yanqin Jin 2021-08-24 15:39:31 -07:00 committed by Facebook GitHub Bot
parent 74cfe7db60
commit 229350ef48
4 changed files with 59 additions and 4 deletions

View File

@ -1,5 +1,8 @@
# Rocksdb Change Log
## Unreleased
### Bug Fixes
* Allow secondary instance to refresh iterator. Assign read seq after referencing SuperVersion.
### New Features
* RemoteCompaction's interface now includes `db_name`, `db_id`, `session_id`, which could help the user uniquely identify compaction job between db instances and sessions.

View File

@ -415,7 +415,7 @@ Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options,
return NewErrorIterator(
Status::NotSupported("snapshot not supported in secondary mode"));
} else {
auto snapshot = versions_->LastSequence();
SequenceNumber snapshot(kMaxSequenceNumber);
result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
}
return result;
@ -423,14 +423,19 @@ Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options,
ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
const ReadOptions& read_options, ColumnFamilyData* cfd,
SequenceNumber snapshot, ReadCallback* read_callback) {
SequenceNumber snapshot, ReadCallback* read_callback,
bool expose_blob_index, bool allow_refresh) {
assert(nullptr != cfd);
SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
assert(snapshot == kMaxSequenceNumber);
snapshot = versions_->LastSequence();
assert(snapshot != kMaxSequenceNumber);
auto db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
super_version->current, snapshot,
super_version->mutable_cf_options.max_sequential_skip_in_iterations,
super_version->version_number, read_callback);
super_version->version_number, read_callback, this, cfd,
expose_blob_index, read_options.snapshot ? false : allow_refresh);
auto internal_iter = NewInternalIterator(
db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), snapshot,

View File

@ -97,7 +97,9 @@ class DBImplSecondary : public DBImpl {
ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& read_options,
ColumnFamilyData* cfd,
SequenceNumber snapshot,
ReadCallback* read_callback);
ReadCallback* read_callback,
bool expose_blob_index = false,
bool allow_refresh = true);
Status NewIterators(const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families,

View File

@ -560,6 +560,51 @@ TEST_F(DBSecondaryTest, OpenAsSecondaryWALTailing) {
verify_db_func("new_foo_value_1", "new_bar_value");
}
TEST_F(DBSecondaryTest, RefreshIterator) {
Options options;
options.env = env_;
Reopen(options);
Options options1;
options1.env = env_;
options1.max_open_files = -1;
OpenSecondary(options1);
std::unique_ptr<Iterator> it(db_secondary_->NewIterator(ReadOptions()));
for (int i = 0; i < 3; ++i) {
ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
if (0 == i) {
it->Seek("foo");
ASSERT_FALSE(it->Valid());
ASSERT_OK(it->status());
ASSERT_OK(it->Refresh());
it->Seek("foo");
ASSERT_OK(it->status());
ASSERT_TRUE(it->Valid());
ASSERT_EQ("foo", it->key());
ASSERT_EQ("foo_value0", it->value());
} else {
it->Seek("foo");
ASSERT_TRUE(it->Valid());
ASSERT_EQ("foo", it->key());
ASSERT_EQ("foo_value" + std::to_string(i - 1), it->value());
ASSERT_OK(it->status());
ASSERT_OK(it->Refresh());
it->Seek("foo");
ASSERT_OK(it->status());
ASSERT_TRUE(it->Valid());
ASSERT_EQ("foo", it->key());
ASSERT_EQ("foo_value" + std::to_string(i), it->value());
}
}
}
TEST_F(DBSecondaryTest, OpenWithNonExistColumnFamily) {
Options options;
options.env = env_;