Compare commits
20 Commits
Author | SHA1 | Date
---|---|---
 | 3513d4e93f | 
 | 76a56d89a7 | 
 | dd757ffde0 | 
 | fdc56e2b73 | 
 | cfd6eb3e68 | 
 | 9623a66b24 | 
 | 47f0d5fdd0 | 
 | 3542f7ef00 | 
 | 5c12a474f3 | 
 | 6d113fc066 | 
 | 11afcbe7ba | 
 | b6e554e698 | 
 | a1f08cc953 | 
 | 0d9bfa6e4d | 
 | 9feb730c6e | 
 | 70dca18c96 | 
 | 5f703af1ea | 
 | 8baa66acf6 | 
 | 570d490a3d | 
 | 55320dedab | 
@@ -317,6 +317,10 @@ if(DISABLE_STALL_NOTIF)
   add_definitions(-DROCKSDB_DISABLE_STALL_NOTIFICATION)
 endif()
 
+option(WITH_DYNAMIC_EXTENSION "build with dynamic extension support" OFF)
+if(NOT WITH_DYNAMIC_EXTENSION)
+  add_definitions(-DROCKSDB_NO_DYNAMIC_EXTENSION)
+endif()
 
 if(DEFINED USE_RTTI)
   if(USE_RTTI)
HISTORY.md (19 changes)
@@ -1,5 +1,21 @@
 # Rocksdb Change Log
 ## Unreleased
+## 6.2.4 (9/18/2019)
+### Bug Fixes
+* Disable snap_refresh_nanos by default. The feature is to be deprecated in the next release.
+
+## 6.2.3 (9/3/2019)
+### Bug Fixes
+* Fix a bug in file ingestion caused by incorrect file number allocation when the number of column families involved in the ingestion exceeds 2.
+
+## 6.2.2 (6/7/2019)
+### Bug Fixes
+* Disable dynamic extension support by default for CMake.
+
+## 6.2.1 (6/4/2019)
+### Bug Fixes
+* Fix flush's/compaction's merge processing logic which allowed `Put`s covered by range tombstones to reappear. Note `Put`s may exist even if the user only ever called `Merge()` due to an internal conversion during compaction to the bottommost level.
+
 ## 6.2.0 (4/30/2019)
 ### New Features
 * Add an option `strict_bytes_per_sync` that causes a file-writing thread to block rather than exceed the limit on bytes pending writeback specified by `bytes_per_sync` or `wal_bytes_per_sync`.
 * Improve range scan performance by avoiding per-key upper bound check in BlockBasedTableIterator.
@@ -19,6 +35,7 @@
 * Fix a race condition between WritePrepared::Get and ::Put with duplicate keys.
 * Fix crash when memtable prefix bloom is enabled and read/write a key out of domain of prefix extractor.
 * Close a WAL file before another thread deletes it.
+* Fix an assertion failure `IsFlushPending() == true` caused by one bg thread releasing the db mutex in ~ColumnFamilyData and another thread clearing `flush_requested_` flag.
 
 ## 6.1.1 (4/9/2019)
 ### New Features
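As background for the `strict_bytes_per_sync` entry in the 6.2.0 notes above, here is a minimal sketch of enabling it; the database path and the 1MB limit are illustrative assumptions, not values taken from this change:

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.bytes_per_sync = 1 << 20;      // request writeback every 1MB of file writes
  options.strict_bytes_per_sync = true;  // block the writing thread rather than let
                                         // pending writeback exceed the limit
  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/strict_sync_demo", &db);
  delete db;  // safe even if Open failed and db stayed null
  return s.ok() ? 0 : 1;
}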
@@ -56,10 +56,10 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
   if [ -n "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
     # we need this to build with MySQL. Don't use for other purposes.
     source "$PWD/build_tools/fbcode_config4.8.1.sh"
-  elif [ -n "$ROCKSDB_FBCODE_BUILD_WITH_PLATFORM007" ]; then
-    source "$PWD/build_tools/fbcode_config_platform007.sh"
-  else
+  elif [ -n "$ROCKSDB_FBCODE_BUILD_WITH_5xx" ]; then
     source "$PWD/build_tools/fbcode_config.sh"
+  else
+    source "$PWD/build_tools/fbcode_config_platform007.sh"
   fi
 fi
@@ -39,6 +39,7 @@ class SnapshotListFetchCallback {
   virtual void Refresh(std::vector<SequenceNumber>* snapshots,
                        SequenceNumber max) = 0;
   inline bool TimeToRefresh(const size_t key_index) {
+    assert(snap_refresh_nanos_ != 0);
     // skip the key if key_index % every_nth_key (which is of power 2) is not 0.
     if ((key_index & every_nth_key_minus_one_) != 0) {
       return false;
@@ -893,7 +893,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
       &existing_snapshots_, earliest_write_conflict_snapshot_,
       snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false,
       &range_del_agg, sub_compact->compaction, compaction_filter,
-      shutting_down_, preserve_deletes_seqnum_, snap_list_callback_));
+      shutting_down_, preserve_deletes_seqnum_,
+      // Currently range_del_agg is incompatible with snapshot refresh feature.
+      range_del_agg.IsEmpty() ? snap_list_callback_ : nullptr));
   auto c_iter = sub_compact->c_iter.get();
   c_iter->SeekToFirst();
   if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
@@ -966,7 +966,7 @@ TEST_F(CompactionJobTest, SnapshotRefresh) {
    public:
     SnapshotListFetchCallbackTest(Env* env, Random64& rand,
                                   std::vector<SequenceNumber>* snapshots)
-        : SnapshotListFetchCallback(env, 0 /*no time delay*/,
+        : SnapshotListFetchCallback(env, 1 /*short time delay*/,
                                     1 /*fetch after each key*/),
           rand_(rand),
           snapshots_(snapshots) {}
@@ -514,6 +514,37 @@ TEST_P(DBAtomicFlushTest, TriggerFlushAndClose) {
   ASSERT_EQ("value", Get(0, "key"));
 }
 
+TEST_P(DBAtomicFlushTest, PickMemtablesRaceWithBackgroundFlush) {
+  bool atomic_flush = GetParam();
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  options.atomic_flush = atomic_flush;
+  options.max_write_buffer_number = 4;
+  // Set min_write_buffer_number_to_merge to be greater than 1, so that
+  // a column family with one memtable in the imm will not cause IsFlushPending
+  // to return true when flush_requested_ is false.
+  options.min_write_buffer_number_to_merge = 2;
+  CreateAndReopenWithCF({"pikachu"}, options);
+  ASSERT_EQ(2, handles_.size());
+  ASSERT_OK(dbfull()->PauseBackgroundWork());
+  ASSERT_OK(Put(0, "key00", "value00"));
+  ASSERT_OK(Put(1, "key10", "value10"));
+  FlushOptions flush_opts;
+  flush_opts.wait = false;
+  ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
+  ASSERT_OK(Put(0, "key01", "value01"));
+  // Since max_write_buffer_number is 4, the following flush won't cause write
+  // stall.
+  ASSERT_OK(dbfull()->Flush(flush_opts));
+  ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
+  ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));
+  handles_[1] = nullptr;
+  ASSERT_OK(dbfull()->ContinueBackgroundWork());
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
+  delete handles_[0];
+  handles_.clear();
+}
+
 INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
                         testing::Bool());
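The new test above drives atomic flush through the internal test harness; as a user-level sketch, this is how a DB with `atomic_flush` and a second column family could be opened (the path and CF name are assumptions for illustration):

#include <vector>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

rocksdb::Status OpenWithAtomicFlush(
    rocksdb::DB** db, std::vector<rocksdb::ColumnFamilyHandle*>* handles) {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.create_missing_column_families = true;
  options.atomic_flush = true;  // flush the memtables of all CFs as one unit
  std::vector<rocksdb::ColumnFamilyDescriptor> cfs = {
      {rocksdb::kDefaultColumnFamilyName, rocksdb::ColumnFamilyOptions()},
      {"pikachu", rocksdb::ColumnFamilyOptions()}};
  return rocksdb::DB::Open(options, "/tmp/atomic_flush_demo", cfs, handles, db);
}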
@@ -3538,9 +3538,9 @@ Status DBImpl::IngestExternalFiles(
     exec_results.emplace_back(false, Status::OK());
   }
   // TODO(yanqin) maybe make jobs run in parallel
+  uint64_t start_file_number = next_file_number;
   for (size_t i = 1; i != num_cfs; ++i) {
-    uint64_t start_file_number =
-        next_file_number + args[i - 1].external_files.size();
+    start_file_number += args[i - 1].external_files.size();
     auto* cfd =
         static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
     SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
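To see why the change above matters, here is a small standalone sketch of the corrected allocation, with hypothetical file counts; each ingestion job's starting file number now advances past every file assigned to earlier jobs instead of only the immediately preceding job's files:

#include <cstdint>
#include <vector>

std::vector<uint64_t> StartFileNumbers(uint64_t next_file_number,
                                       const std::vector<size_t>& files_per_cf) {
  std::vector<uint64_t> starts;
  uint64_t start = next_file_number;
  for (size_t n : files_per_cf) {
    starts.push_back(start);
    start += n;  // accumulate: the next CF starts after *all* earlier files
  }
  return starts;
}
// With 3 CFs ingesting {2, 3, 4} files from number 100, this yields
// {100, 102, 105}; the old computation could hand different CFs overlapping
// number ranges once more than 2 column families were involved.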
@@ -798,6 +798,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
   return s;
 }
 
 namespace {
 class SnapshotListFetchCallbackImpl : public SnapshotListFetchCallback {
  public:
  SnapshotListFetchCallbackImpl(DBImpl* db_impl, Env* env,
@@ -820,6 +821,7 @@ class SnapshotListFetchCallbackImpl : public SnapshotListFetchCallback {
   DBImpl* db_impl_;
   Logger* info_log_;
 };
 }  // namespace
 
 Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
                             ColumnFamilyHandle* column_family,
@@ -1005,8 +1007,10 @@ Status DBImpl::CompactFilesImpl(
       c->mutable_cf_options()->paranoid_file_checks,
       c->mutable_cf_options()->report_bg_io_stats, dbname_,
       &compaction_job_stats, Env::Priority::USER,
-      immutable_db_options_.max_subcompactions <= 1 ? &fetch_callback
-                                                    : nullptr);
+      immutable_db_options_.max_subcompactions <= 1 &&
+              c->mutable_cf_options()->snap_refresh_nanos > 0
+          ? &fetch_callback
+          : nullptr);
 
   // Creating a compaction influences the compaction score because the score
   // takes running compactions into account (by skipping files that are already
@@ -2080,6 +2084,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
   autovector<BGFlushArg> bg_flush_args;
   std::vector<SuperVersionContext>& superversion_contexts =
       job_context->superversion_contexts;
+  autovector<ColumnFamilyData*> column_families_not_to_flush;
   while (!flush_queue_.empty()) {
     // This cfd is already referenced
     const FlushRequest& flush_req = PopFirstFromFlushQueue();
@@ -2090,9 +2095,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
       ColumnFamilyData* cfd = iter.first;
       if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
         // can't flush this CF, try next one
-        if (cfd->Unref()) {
-          delete cfd;
-        }
+        column_families_not_to_flush.push_back(cfd);
         continue;
       }
       superversion_contexts.emplace_back(SuperVersionContext(true));
@@ -2131,6 +2134,11 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
       }
     }
   }
+  for (auto cfd : column_families_not_to_flush) {
+    if (cfd->Unref()) {
+      delete cfd;
+    }
+  }
   return status;
 }
 
@@ -2663,8 +2671,10 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
       &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
       c->mutable_cf_options()->report_bg_io_stats, dbname_,
       &compaction_job_stats, thread_pri,
-      immutable_db_options_.max_subcompactions <= 1 ? &fetch_callback
-                                                    : nullptr);
+      immutable_db_options_.max_subcompactions <= 1 &&
+              c->mutable_cf_options()->snap_refresh_nanos > 0
+          ? &fetch_callback
+          : nullptr);
   compaction_job.Prepare();
 
   NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
@@ -491,6 +491,30 @@ TEST_F(DBRangeDelTest, CompactionRemovesCoveredMergeOperands) {
   ASSERT_EQ(expected, actual);
 }
 
+TEST_F(DBRangeDelTest, PutDeleteRangeMergeFlush) {
+  // Test the sequence of operations: (1) Put, (2) DeleteRange, (3) Merge, (4)
+  // Flush. The `CompactionIterator` previously had a bug where we forgot to
+  // check for covering range tombstones when processing the (1) Put, causing
+  // it to reappear after the flush.
+  Options opts = CurrentOptions();
+  opts.merge_operator = MergeOperators::CreateUInt64AddOperator();
+  Reopen(opts);
+
+  std::string val;
+  PutFixed64(&val, 1);
+  ASSERT_OK(db_->Put(WriteOptions(), "key", val));
+  ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+                             "key", "key_"));
+  ASSERT_OK(db_->Merge(WriteOptions(), "key", val));
+  ASSERT_OK(db_->Flush(FlushOptions()));
+
+  ReadOptions read_opts;
+  std::string expected, actual;
+  ASSERT_OK(db_->Get(read_opts, "key", &actual));
+  PutFixed64(&expected, 1);
+  ASSERT_EQ(expected, actual);
+}
+
 // NumTableFilesAtLevel() is not supported in ROCKSDB_LITE
 #ifndef ROCKSDB_LITE
 TEST_F(DBRangeDelTest, ObsoleteTombstoneCleanup) {
@@ -124,7 +124,7 @@ Status ExternalSstFileIngestionJob::Prepare(
     // We failed, remove all files that we copied into the db
     for (IngestedFileInfo& f : files_to_ingest_) {
       if (f.internal_file_path.empty()) {
-        break;
+        continue;
      }
      Status s = env_->DeleteFile(f.internal_file_path);
      if (!s.ok()) {
@@ -255,6 +255,9 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
     // We failed to add the files to the database
     // remove all the files we copied
     for (IngestedFileInfo& f : files_to_ingest_) {
+      if (f.internal_file_path.empty()) {
+        continue;
+      }
       Status s = env_->DeleteFile(f.internal_file_path);
       if (!s.ok()) {
         ROCKS_LOG_WARN(db_options_.info_log,
@@ -2300,10 +2300,11 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
       new FaultInjectionTestEnv(env_));
   Options options = CurrentOptions();
   options.env = fault_injection_env.get();
-  CreateAndReopenWithCF({"pikachu"}, options);
+  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
   std::vector<ColumnFamilyHandle*> column_families;
   column_families.push_back(handles_[0]);
   column_families.push_back(handles_[1]);
+  column_families.push_back(handles_[2]);
   std::vector<IngestExternalFileOptions> ifos(column_families.size());
   for (auto& ifo : ifos) {
     ifo.allow_global_seqno = true;  // Always allow global_seqno
@@ -2317,6 +2318,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
       {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
   data.push_back(
       {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
+  data.push_back(
+      {std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
+
   // Resize the true_data vector upon construction to avoid re-alloc
   std::vector<std::map<std::string, std::string>> true_data(
       column_families.size());
@@ -2324,8 +2328,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
                                               -1, true, true_data);
   ASSERT_OK(s);
   Close();
-  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
-  ASSERT_EQ(2, handles_.size());
+  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
+                           options);
+  ASSERT_EQ(3, handles_.size());
   int cf = 0;
   for (const auto& verify_map : true_data) {
     for (const auto& elem : verify_map) {
@@ -2357,10 +2362,11 @@ TEST_P(ExternalSSTFileTest,
 
   Options options = CurrentOptions();
   options.env = fault_injection_env.get();
-  CreateAndReopenWithCF({"pikachu"}, options);
+  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
   const std::vector<std::map<std::string, std::string>> data_before_ingestion =
       {{{"foo1", "fv1_0"}, {"foo2", "fv2_0"}, {"foo3", "fv3_0"}},
-       {{"bar1", "bv1_0"}, {"bar2", "bv2_0"}, {"bar3", "bv3_0"}}};
+       {{"bar1", "bv1_0"}, {"bar2", "bv2_0"}, {"bar3", "bv3_0"}},
+       {{"bar4", "bv4_0"}, {"bar5", "bv5_0"}, {"bar6", "bv6_0"}}};
   for (size_t i = 0; i != handles_.size(); ++i) {
     int cf = static_cast<int>(i);
     const auto& orig_data = data_before_ingestion[i];
@@ -2373,6 +2379,7 @@ TEST_P(ExternalSSTFileTest,
   std::vector<ColumnFamilyHandle*> column_families;
   column_families.push_back(handles_[0]);
   column_families.push_back(handles_[1]);
+  column_families.push_back(handles_[2]);
   std::vector<IngestExternalFileOptions> ifos(column_families.size());
   for (auto& ifo : ifos) {
     ifo.allow_global_seqno = true;  // Always allow global_seqno
@@ -2386,6 +2393,8 @@ TEST_P(ExternalSSTFileTest,
       {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
   data.push_back(
       {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
+  data.push_back(
+      {std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
   // Resize the true_data vector upon construction to avoid re-alloc
   std::vector<std::map<std::string, std::string>> true_data(
       column_families.size());
@@ -2439,10 +2448,11 @@ TEST_P(ExternalSSTFileTest,
   dbfull()->ReleaseSnapshot(read_opts.snapshot);
 
   Close();
-  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
+  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
+                           options);
   // Should see consistent state after ingestion for all column families even
   // without snapshot.
-  ASSERT_EQ(2, handles_.size());
+  ASSERT_EQ(3, handles_.size());
   int cf = 0;
   for (const auto& verify_map : true_data) {
     for (const auto& elem : verify_map) {
@@ -2472,10 +2482,11 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
        "DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1"},
   });
   SyncPoint::GetInstance()->EnableProcessing();
-  CreateAndReopenWithCF({"pikachu"}, options);
+  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
   std::vector<ColumnFamilyHandle*> column_families;
   column_families.push_back(handles_[0]);
   column_families.push_back(handles_[1]);
+  column_families.push_back(handles_[2]);
   std::vector<IngestExternalFileOptions> ifos(column_families.size());
   for (auto& ifo : ifos) {
     ifo.allow_global_seqno = true;  // Always allow global_seqno
@@ -2489,6 +2500,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
       {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
   data.push_back(
       {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
+  data.push_back(
+      {std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
+
   // Resize the true_data vector upon construction to avoid re-alloc
   std::vector<std::map<std::string, std::string>> true_data(
       column_families.size());
@@ -2508,8 +2522,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
 
   fault_injection_env->SetFilesystemActive(true);
   Close();
-  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
-  ASSERT_EQ(2, handles_.size());
+  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
+                           options);
+  ASSERT_EQ(3, handles_.size());
   int cf = 0;
   for (const auto& verify_map : true_data) {
     for (const auto& elem : verify_map) {
@@ -2538,10 +2553,11 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
        "DBImpl::IngestExternalFiles:BeforeJobsRun:1"},
   });
   SyncPoint::GetInstance()->EnableProcessing();
-  CreateAndReopenWithCF({"pikachu"}, options);
+  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
   std::vector<ColumnFamilyHandle*> column_families;
   column_families.push_back(handles_[0]);
   column_families.push_back(handles_[1]);
+  column_families.push_back(handles_[2]);
   std::vector<IngestExternalFileOptions> ifos(column_families.size());
   for (auto& ifo : ifos) {
     ifo.allow_global_seqno = true;  // Always allow global_seqno
@@ -2555,6 +2571,8 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
       {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
   data.push_back(
       {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
+  data.push_back(
+      {std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
   // Resize the true_data vector upon construction to avoid re-alloc
   std::vector<std::map<std::string, std::string>> true_data(
       column_families.size());
@@ -2574,8 +2592,9 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
 
   fault_injection_env->SetFilesystemActive(true);
   Close();
-  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
-  ASSERT_EQ(2, handles_.size());
+  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
+                           options);
+  ASSERT_EQ(3, handles_.size());
   int cf = 0;
   for (const auto& verify_map : true_data) {
     for (const auto& elem : verify_map) {
@@ -2595,7 +2614,7 @@ TEST_P(ExternalSSTFileTest,
   Options options = CurrentOptions();
   options.env = fault_injection_env.get();
 
-  CreateAndReopenWithCF({"pikachu"}, options);
+  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
 
   SyncPoint::GetInstance()->ClearTrace();
   SyncPoint::GetInstance()->DisableProcessing();
@@ -2613,6 +2632,7 @@ TEST_P(ExternalSSTFileTest,
   std::vector<ColumnFamilyHandle*> column_families;
   column_families.push_back(handles_[0]);
   column_families.push_back(handles_[1]);
+  column_families.push_back(handles_[2]);
   std::vector<IngestExternalFileOptions> ifos(column_families.size());
   for (auto& ifo : ifos) {
     ifo.allow_global_seqno = true;  // Always allow global_seqno
@@ -2626,6 +2646,8 @@ TEST_P(ExternalSSTFileTest,
       {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
   data.push_back(
       {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
+  data.push_back(
+      {std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
   // Resize the true_data vector upon construction to avoid re-alloc
   std::vector<std::map<std::string, std::string>> true_data(
       column_families.size());
@@ -2646,8 +2668,9 @@ TEST_P(ExternalSSTFileTest,
   fault_injection_env->DropUnsyncedFileData();
   fault_injection_env->SetFilesystemActive(true);
   Close();
-  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
-  ASSERT_EQ(2, handles_.size());
+  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
+                           options);
+  ASSERT_EQ(3, handles_.size());
   int cf = 0;
   for (const auto& verify_map : true_data) {
     for (const auto& elem : verify_map) {
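The tests above all funnel through the multi-column-family ingestion API; a hedged sketch of calling it directly (the file paths and handles are assumed to exist):

#include "rocksdb/db.h"

rocksdb::Status IngestIntoTwoColumnFamilies(rocksdb::DB* db,
                                            rocksdb::ColumnFamilyHandle* cf1,
                                            rocksdb::ColumnFamilyHandle* cf2) {
  rocksdb::IngestExternalFileArg arg1, arg2;
  arg1.column_family = cf1;
  arg1.external_files = {"/tmp/cf1.sst"};  // assumed pre-built SST files
  arg1.options.allow_global_seqno = true;  // as in the tests above
  arg2.column_family = cf2;
  arg2.external_files = {"/tmp/cf2.sst"};
  arg2.options.allow_global_seqno = true;
  // All-or-nothing: either every column family sees its files, or none does.
  return db->IngestExternalFiles({arg1, arg2});
}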
@@ -277,8 +277,12 @@ void MemTableList::PickMemtablesToFlush(const uint64_t* max_memtable_id,
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
   const auto& memlist = current_->memlist_;
+  bool atomic_flush = false;
   for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
     MemTable* m = *it;
+    if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) {
+      atomic_flush = true;
+    }
     if (max_memtable_id != nullptr && m->GetID() > *max_memtable_id) {
       break;
     }
@@ -292,7 +296,9 @@ void MemTableList::PickMemtablesToFlush(const uint64_t* max_memtable_id,
       ret->push_back(m);
     }
   }
-  flush_requested_ = false;  // start-flush request is complete
+  if (!atomic_flush || num_flush_not_started_ == 0) {
+    flush_requested_ = false;  // start-flush request is complete
+  }
 }
 
 void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
@@ -201,7 +201,15 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
       // want. Also if we're in compaction and it's a put, it would be nice to
       // run compaction filter on it.
       const Slice val = iter->value();
-      const Slice* val_ptr = (kTypeValue == ikey.type) ? &val : nullptr;
+      const Slice* val_ptr;
+      if (kTypeValue == ikey.type &&
+          (range_del_agg == nullptr ||
+           !range_del_agg->ShouldDelete(
+               ikey, RangeDelPositioningMode::kForwardTraversal))) {
+        val_ptr = &val;
+      } else {
+        val_ptr = nullptr;
+      }
       std::string merge_result;
       s = TimedFullMerge(user_merge_operator_, ikey.user_key, val_ptr,
                          merge_context_.GetOperands(), &merge_result, logger_,
@@ -353,7 +353,7 @@ class FilePickerMultiGet {
   struct FilePickerContext;
 
  public:
-  FilePickerMultiGet(std::vector<FileMetaData*>* files, MultiGetRange* range,
+  FilePickerMultiGet(MultiGetRange* range,
                      autovector<LevelFilesBrief>* file_levels,
                      unsigned int num_levels, FileIndexer* file_indexer,
                      const Comparator* user_comparator,
@@ -368,18 +368,12 @@ class FilePickerMultiGet {
         maybe_repeat_key_(false),
         current_level_range_(*range, range->begin(), range->end()),
         current_file_range_(*range, range->begin(), range->end()),
-#ifndef NDEBUG
-        files_(files),
-#endif
         level_files_brief_(file_levels),
         is_hit_file_last_in_level_(false),
         curr_file_level_(nullptr),
         file_indexer_(file_indexer),
         user_comparator_(user_comparator),
         internal_comparator_(internal_comparator) {
-#ifdef NDEBUG
-    (void)files;
-#endif
     for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
       fp_ctx_array_[iter.index()] =
           FilePickerContext(0, FileIndexer::kLevelMaxIndex);
@@ -416,6 +410,18 @@ class FilePickerMultiGet {
     bool file_hit = false;
     int cmp_largest = -1;
+    if (curr_file_index >= curr_file_level_->num_files) {
+      // In the unlikely case the next key is a duplicate of the current key,
+      // and the current key is the last in the level and the internal key
+      // was not found, we need to skip lookup for the remaining keys and
+      // reset the search bounds
+      if (batch_iter_ != current_level_range_.end()) {
+        ++batch_iter_;
+        for (; batch_iter_ != current_level_range_.end(); ++batch_iter_) {
+          struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
+          fp_ctx.search_left_bound = 0;
+          fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
+        }
+      }
+      return false;
+    }
     // Loops over keys in the MultiGet batch until it finds a file with
@@ -473,25 +479,6 @@ class FilePickerMultiGet {
       } else {
         file_hit = true;
       }
-#ifndef NDEBUG
-      // Sanity check to make sure that the files are correctly sorted
-      if (f != prev_file_) {
-        if (prev_file_) {
-          if (curr_level_ != 0) {
-            int comp_sign = internal_comparator_->Compare(
-                prev_file_->largest_key, f->smallest_key);
-            assert(comp_sign < 0);
-          } else if (fp_ctx.curr_index_in_curr_level > 0) {
-            // level == 0, the current file cannot be newer than the previous
-            // one. Use compressed data structure, has no attribute seqNo
-            assert(!NewestFirstBySeqNo(
-                files_[0][fp_ctx.curr_index_in_curr_level],
-                files_[0][fp_ctx.curr_index_in_curr_level - 1]));
-          }
-        }
-        prev_file_ = f;
-      }
-#endif
       if (cmp_largest == 0) {
         // cmp_largest is 0, which means the next key will not be in this
         // file, so stop looking further. Also don't increment megt_iter_
@@ -533,7 +520,10 @@ class FilePickerMultiGet {
           // any further for that key, so advance batch_iter_. Else, keep
           // batch_iter_ positioned on that key so we look it up again in
           // the next file
-          if (current_level_range_.CheckKeyDone(batch_iter_)) {
+          // For L0, always advance the key because we will look in the next
+          // file regardless for all keys not found yet
+          if (current_level_range_.CheckKeyDone(batch_iter_) ||
+              curr_level_ == 0) {
             ++batch_iter_;
           }
         }
@@ -601,7 +591,8 @@ class FilePickerMultiGet {
     unsigned int start_index_in_curr_level;
 
     FilePickerContext(int32_t left, int32_t right)
-        : search_left_bound(left), search_right_bound(right) {}
+        : search_left_bound(left), search_right_bound(right),
+          curr_index_in_curr_level(0), start_index_in_curr_level(0) {}
 
     FilePickerContext() = default;
   };
@@ -619,9 +610,6 @@ class FilePickerMultiGet {
   bool maybe_repeat_key_;
   MultiGetRange current_level_range_;
   MultiGetRange current_file_range_;
-#ifndef NDEBUG
-  std::vector<FileMetaData*>* files_;
-#endif
   autovector<LevelFilesBrief>* level_files_brief_;
   bool search_ended_;
   bool is_hit_file_last_in_level_;
@@ -629,9 +617,6 @@ class FilePickerMultiGet {
   FileIndexer* file_indexer_;
   const Comparator* user_comparator_;
   const InternalKeyComparator* internal_comparator_;
-#ifndef NDEBUG
-  FdWithKeyRange* prev_file_;
-#endif
 
   // Setup local variables to search next level.
   // Returns false if there are no more levels to search.
@@ -640,9 +625,6 @@ class FilePickerMultiGet {
       MultiGetRange::Iterator mget_iter = current_level_range_.begin();
       if (fp_ctx_array_[mget_iter.index()].curr_index_in_curr_level <
          curr_file_level_->num_files) {
-#ifndef NDEBUG
-        prev_file_ = nullptr;
-#endif
        batch_iter_prev_ = current_level_range_.begin();
        batch_iter_ = current_level_range_.begin();
        return true;
@@ -738,9 +720,6 @@ class FilePickerMultiGet {
       fp_ctx.curr_index_in_curr_level = start_index;
     }
     if (level_contains_keys) {
-#ifndef NDEBUG
-      prev_file_ = nullptr;
-#endif
       batch_iter_prev_ = current_level_range_.begin();
       batch_iter_ = current_level_range_.begin();
       return true;
@@ -1758,12 +1737,16 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
         iter->value, nullptr, &(iter->merge_context),
         &iter->max_covering_tombstone_seq, this->env_, &iter->seq,
         merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob);
-    iter->get_context = &get_ctx.back();
   }
+  int get_ctx_index = 0;
+  for (auto iter = range->begin(); iter != range->end();
+       ++iter, get_ctx_index++) {
+    iter->get_context = &(get_ctx[get_ctx_index]);
+  }
 
   MultiGetRange file_picker_range(*range, range->begin(), range->end());
   FilePickerMultiGet fp(
-      storage_info_.files_, &file_picker_range,
+      &file_picker_range,
      &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
      &storage_info_.file_indexer_, user_comparator(), internal_comparator());
   FdWithKeyRange* f = fp.GetNextFile();
@@ -275,10 +275,12 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
   // this option helps reducing the cpu usage of long-running compactions. The
   // feature is disabled when max_subcompactions is greater than one.
   //
-  // Default: 0.5s
+  // NOTE: This feature is currently incompatible with RangeDeletes.
+  //
+  // Default: 0
   //
   // Dynamically changeable through SetOptions() API
-  uint64_t snap_refresh_nanos = 500 * 1000 * 1000;  // 0.5s
+  uint64_t snap_refresh_nanos = 0;
 
   // Disable automatic compactions. Manual compactions can still
   // be issued on this column family
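Since the comment above notes the option stays dynamically changeable, here is a minimal sketch (assuming an already-open DB) of opting back in after the default changed to 0; the 100ms value is illustrative only:

#include "rocksdb/db.h"

rocksdb::Status ReenableSnapRefresh(rocksdb::DB* db) {
  // 100ms in nanoseconds -- an illustrative value, not a recommendation.
  return db->SetOptions(db->DefaultColumnFamily(),
                        {{"snap_refresh_nanos", "100000000"}});
}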
@@ -5,8 +5,8 @@
 #pragma once
 
 #define ROCKSDB_MAJOR 6
-#define ROCKSDB_MINOR 1
-#define ROCKSDB_PATCH 1
+#define ROCKSDB_MINOR 2
+#define ROCKSDB_PATCH 4
 
 // Do not use these. We made the mistake of declaring macros starting with
 // double underscore. Now we have to live with our choice. We'll deprecate these
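A small sketch of how downstream code can check the bumped version macros at compile time:

#include "rocksdb/version.h"

static_assert(ROCKSDB_MAJOR == 6 && ROCKSDB_MINOR == 2 && ROCKSDB_PATCH == 4,
              "these sources expect the RocksDB 6.2.4 headers");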
@@ -123,7 +123,7 @@ class MultiGetContext {
   KeyContext** sorted_keys_;
   size_t num_keys_;
   uint64_t value_mask_;
-  std::unique_ptr<char> lookup_key_heap_buf;
+  std::unique_ptr<char[]> lookup_key_heap_buf;
   LookupKey* lookup_key_ptr_;
 
  public:
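Why the one-character change above matters: `std::unique_ptr<char>` releases its buffer with `delete`, but the buffer is allocated with `new[]`, which is undefined behavior; `std::unique_ptr<char[]>` uses `delete[]`. A minimal illustration:

#include <memory>

void ArrayDeleterDemo() {
  std::unique_ptr<char[]> ok(new char[128]);   // destroyed with delete[] -- correct
  ok[0] = 'x';
  // std::unique_ptr<char> bad(new char[128]); // would be destroyed with delete -- UB
}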
@@ -65,6 +65,7 @@ default_params = {
     "writepercent": 35,
     "format_version": lambda: random.randint(2, 4),
     "index_block_restart_interval": lambda: random.choice(range(1, 16)),
+    "use_multiget": lambda: random.randint(0, 1),
 }
 
 _TEST_DIR_ENV_VAR = 'TEST_TMPDIR'
@@ -342,8 +343,9 @@ def whitebox_crash_main(args, unknown_args):
             if additional_opts['kill_random_test'] is None and (retncode == 0):
                 # we expect zero retncode if no kill option
                 expected = True
-            elif additional_opts['kill_random_test'] is not None and retncode < 0:
-                # we expect negative retncode if kill option was given
+            elif additional_opts['kill_random_test'] is not None and retncode <= 0:
+                # When kill option is given, the test MIGHT kill itself.
+                # If it does, negative retncode is expected. Otherwise 0.
                 expected = True
 
             if not expected:
@@ -455,6 +455,9 @@ DEFINE_uint64(snapshot_hold_ops, 0,
               "If non-zero, then releases snapshots N operations after they're "
               "acquired.");
 
+DEFINE_bool(use_multiget, false,
+            "If set, use the batched MultiGet API for reads");
+
 static bool ValidateInt32Percent(const char* flagname, int32_t value) {
   if (value < 0 || value > 100) {
     fprintf(stderr, "Invalid value for --%s: %d, 0<= pct <=100 \n",
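The `use_multiget` flag gates the batched MultiGet read path added in the hunks below; as a standalone sketch (the DB and keys are assumptions), this is the call shape the stress test relies on:

#include <vector>
#include "rocksdb/db.h"

void BatchedLookup(rocksdb::DB* db, const std::vector<std::string>& key_strs) {
  std::vector<rocksdb::Slice> keys(key_strs.begin(), key_strs.end());
  std::vector<rocksdb::PinnableSlice> values(keys.size());
  std::vector<rocksdb::Status> statuses(keys.size());
  // One call looks up the whole batch in the default column family.
  db->MultiGet(rocksdb::ReadOptions(), db->DefaultColumnFamily(), keys.size(),
               keys.data(), values.data(), statuses.data());
}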
@@ -1725,6 +1728,27 @@ class StressTest {
     return base_key + thread->rand.Next() % FLAGS_active_width;
   }
 
+  static std::vector<int64_t> GenerateNKeys(
+      ThreadState* thread,
+      int num_keys,
+      uint64_t iteration) {
+    const double completed_ratio =
+        static_cast<double>(iteration) / FLAGS_ops_per_thread;
+    const int64_t base_key = static_cast<int64_t>(
+        completed_ratio * (FLAGS_max_key - FLAGS_active_width));
+    std::vector<int64_t> keys;
+    keys.reserve(num_keys);
+    int64_t next_key = base_key + thread->rand.Next() % FLAGS_active_width;
+    keys.push_back(next_key);
+    for (int i = 1; i < num_keys; ++i) {
+      // This may result in some duplicate keys
+      next_key = next_key + thread->rand.Next() %
+                                (FLAGS_active_width - (next_key - base_key));
+      keys.push_back(next_key);
+    }
+    return keys;
+  }
+
   static size_t GenerateValue(uint32_t rand, char *v, size_t max_sz) {
     size_t value_sz =
         ((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult;
@@ -2162,7 +2186,14 @@ class StressTest {
       int prob_op = thread->rand.Uniform(100);
       if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) {
         // OPERATION read
-        TestGet(thread, read_opts, rand_column_families, rand_keys);
+        if (FLAGS_use_multiget) {
+          int num_keys = thread->rand.Uniform(64);
+          rand_keys = GenerateNKeys(thread, num_keys, i);
+          TestMultiGet(thread, read_opts, rand_column_families, rand_keys);
+          i += num_keys - 1;
+        } else {
+          TestGet(thread, read_opts, rand_column_families, rand_keys);
+        }
       } else if ((int)FLAGS_readpercent <= prob_op && prob_op < prefixBound) {
         // OPERATION prefix scan
         // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
@@ -2211,6 +2242,11 @@ class StressTest {
                          const std::vector<int>& rand_column_families,
                          const std::vector<int64_t>& rand_keys) = 0;
 
+  virtual std::vector<Status> TestMultiGet(
+      ThreadState* thread, const ReadOptions& read_opts,
+      const std::vector<int>& rand_column_families,
+      const std::vector<int64_t>& rand_keys) = 0;
+
   virtual Status TestPrefixScan(ThreadState* thread,
                                 const ReadOptions& read_opts,
                                 const std::vector<int>& rand_column_families,
@@ -2546,6 +2582,8 @@ class StressTest {
     fprintf(stdout, "Checksum type : %s\n", checksum.c_str());
     fprintf(stdout, "Max subcompactions : %" PRIu64 "\n",
             FLAGS_subcompactions);
+    fprintf(stdout, "Use MultiGet : %s\n",
+            FLAGS_use_multiget ? "true" : "false");
 
     const char* memtablerep = "";
     switch (FLAGS_rep_factory) {
@@ -3012,6 +3050,40 @@ class NonBatchedOpsStressTest : public StressTest {
     return s;
   }
 
+  virtual std::vector<Status> TestMultiGet(
+      ThreadState* thread, const ReadOptions& read_opts,
+      const std::vector<int>& rand_column_families,
+      const std::vector<int64_t>& rand_keys) {
+    size_t num_keys = rand_keys.size();
+    std::vector<std::string> key_str;
+    std::vector<Slice> keys;
+    key_str.reserve(num_keys);
+    keys.reserve(num_keys);
+    std::vector<PinnableSlice> values(num_keys);
+    std::vector<Status> statuses(num_keys);
+    ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
+
+    for (size_t i = 0; i < num_keys; ++i) {
+      key_str.emplace_back(Key(rand_keys[i]));
+      keys.emplace_back(key_str.back());
+    }
+    db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
+                  statuses.data());
+    for (const auto& s : statuses) {
+      if (s.ok()) {
+        // found case
+        thread->stats.AddGets(1, 1);
+      } else if (s.IsNotFound()) {
+        // not found case
+        thread->stats.AddGets(1, 0);
+      } else {
+        // errors case
+        thread->stats.AddErrors(1);
+      }
+    }
+    return statuses;
+  }
+
   virtual Status TestPrefixScan(ThreadState* thread,
                                 const ReadOptions& read_opts,
                                 const std::vector<int>& rand_column_families,
@@ -3532,6 +3604,76 @@ class BatchedOpsStressTest : public StressTest {
     return s;
   }
 
+  virtual std::vector<Status> TestMultiGet(
+      ThreadState* thread, const ReadOptions& readoptions,
+      const std::vector<int>& rand_column_families,
+      const std::vector<int64_t>& rand_keys) {
+    size_t num_keys = rand_keys.size();
+    std::vector<Status> ret_status(num_keys);
+    std::array<std::string, 10> keys = {"0", "1", "2", "3", "4",
+                                        "5", "6", "7", "8", "9"};
+    size_t num_prefixes = keys.size();
+    for (size_t rand_key = 0; rand_key < num_keys; ++rand_key) {
+      std::vector<Slice> key_slices;
+      std::vector<PinnableSlice> values(num_prefixes);
+      std::vector<Status> statuses(num_prefixes);
+      ReadOptions readoptionscopy = readoptions;
+      readoptionscopy.snapshot = db_->GetSnapshot();
+      std::vector<std::string> key_str;
+      key_str.reserve(num_prefixes);
+      key_slices.reserve(num_prefixes);
+      std::string from_db;
+      ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
+
+      for (size_t key = 0; key < num_prefixes; ++key) {
+        key_str.emplace_back(keys[key] + Key(rand_keys[rand_key]));
+        key_slices.emplace_back(key_str.back());
+      }
+      db_->MultiGet(readoptionscopy, cfh, num_prefixes, key_slices.data(),
+                    values.data(), statuses.data());
+      for (size_t i = 0; i < num_prefixes; i++) {
+        Status s = statuses[i];
+        if (!s.ok() && !s.IsNotFound()) {
+          fprintf(stderr, "get error: %s\n", s.ToString().c_str());
+          thread->stats.AddErrors(1);
+          ret_status[rand_key] = s;
+          // we continue after error rather than exiting so that we can
+          // find more errors if any
+        } else if (s.IsNotFound()) {
+          thread->stats.AddGets(1, 0);
+          ret_status[rand_key] = s;
+        } else {
+          char expected_prefix = (keys[i])[0];
+          char actual_prefix = (values[i])[0];
+          if (actual_prefix != expected_prefix) {
+            fprintf(stderr, "error expected prefix = %c actual = %c\n",
+                    expected_prefix, actual_prefix);
+          }
+          std::string str;
+          str.assign(values[i].data(), values[i].size());
+          values[i].Reset();
+          str[0] = ' ';  // blank out the differing character
+          values[i].PinSelf(str);
+          thread->stats.AddGets(1, 1);
+        }
+      }
+      db_->ReleaseSnapshot(readoptionscopy.snapshot);
+
+      // Now that we retrieved all values, check that they all match
+      for (size_t i = 1; i < num_prefixes; i++) {
+        if (values[i] != values[0]) {
+          fprintf(stderr, "error : inconsistent values for key %s: %s, %s\n",
+                  key_str[i].c_str(),
+                  StringToHex(values[0].ToString()).c_str(),
+                  StringToHex(values[i].ToString()).c_str());
+          // we continue after error rather than exiting so that we can
+          // find more errors if any
+        }
+      }
+    }
+
+    return ret_status;
+  }
+
   // Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P
   // in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes
   // of the key. Each of these 10 scans returns a series of values;
@@ -3747,6 +3889,39 @@ class AtomicFlushStressTest : public StressTest {
     return s;
   }
 
+  virtual std::vector<Status> TestMultiGet(
+      ThreadState* thread, const ReadOptions& read_opts,
+      const std::vector<int>& rand_column_families,
+      const std::vector<int64_t>& rand_keys) {
+    size_t num_keys = rand_keys.size();
+    std::vector<std::string> key_str;
+    std::vector<Slice> keys;
+    keys.reserve(num_keys);
+    key_str.reserve(num_keys);
+    std::vector<PinnableSlice> values(num_keys);
+    std::vector<Status> statuses(num_keys);
+    ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
+
+    for (size_t i = 0; i < num_keys; ++i) {
+      key_str.emplace_back(Key(rand_keys[i]));
+      keys.emplace_back(key_str.back());
+    }
+    db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
+                  statuses.data());
+    for (const auto& s : statuses) {
+      if (s.ok()) {
+        // found case
+        thread->stats.AddGets(1, 1);
+      } else if (s.IsNotFound()) {
+        // not found case
+        thread->stats.AddGets(1, 0);
+      } else {
+        // errors case
+        thread->stats.AddErrors(1);
+      }
+    }
+    return statuses;
+  }
+
   virtual Status TestPrefixScan(ThreadState* thread,
                                 const ReadOptions& readoptions,
                                 const std::vector<int>& rand_column_families,