Compare commits

20 Commits
main...6.2.fb

Author SHA1 Message Date
Vijay Nadimpalli
3513d4e93f Making platform 007 (gcc 7) default in build_detect_platform.sh (#5947)
Summary:
Making platform 007 (gcc 7) default in build_detect_platform.sh.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5947

Differential Revision: D18038837

Pulled By: vjnadimpalli

fbshipit-source-id: 9ac2ddaa93bf328a416faec028970e039886378e
2019-10-30 10:29:05 -07:00
myabandeh
76a56d89a7 Bump version to 6.2.4 2019-09-18 14:42:43 -07:00
Maysam Yabandeh
dd757ffde0 Disable snapshot refresh feature when snap_refresh_nanos is 0 (#5724)
Summary:
The comments for snap_refresh_nanos advertise that the snapshot refresh feature will be disabled when the option is set to 0. This contract, however, is not honored in the code: https://github.com/facebook/rocksdb/pull/5278
The patch fixes that and also adds an assert to ensure that the feature is not used when the option is zero.
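A minimal sketch of the now-honored contract, assuming a build that carries this option (the db path and error handling are illustrative only):
```cpp
#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Per this patch, 0 really disables snapshot refresh; the new assert in
  // TimeToRefresh() (see the snapshot header hunk below) guards this.
  options.snap_refresh_nanos = 0;
  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/snap_refresh_demo", &db);
  assert(s.ok());
  // The option is dynamically changeable, per the options.h comment below.
  s = db->SetOptions({{"snap_refresh_nanos", "0"}});
  delete db;
  return s.ok() ? 0 : 1;
}
```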
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5724

Differential Revision: D16918185

Pulled By: maysamyabandeh

fbshipit-source-id: fec167287df7d85093e087fc39c0eb243e3bbd7e
2019-09-18 14:39:45 -07:00
Maysam Yabandeh
fdc56e2b73 Declare snapshot refresh incompatible with delete range (#5625)
Summary:
The ::snap_refresh_nanos option is incompatible with the DeleteRange feature. Currently the code relies on range_del_agg.IsEmpty() to disable it if there are range delete tombstones. However, ::IsEmpty does not guarantee that there are no RangeDelete tombstones in the SST files. The patch declares the two features incompatible in inline comments until we later figure out how to properly detect the presence of RangeDelete tombstones in compaction inputs.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5625

Differential Revision: D16468218

Pulled By: maysamyabandeh

fbshipit-source-id: bd7beca278bc7e1db75e7ee4522d05a3a6ca86f4
2019-09-18 14:37:22 -07:00
Maysam Yabandeh
cfd6eb3e68 Disable refresh snapshot feature by default (#5606)
Summary:
There are concerns about the correctness of this patch. Disabling by default until the concerns are resolved.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5606

Differential Revision: D16428064

Pulled By: maysamyabandeh

fbshipit-source-id: a89280f0ea85796c9c9dfbfd9a8e91dad9b000b3
2019-09-18 14:37:16 -07:00
Maysam Yabandeh
9623a66b24 Refresh snapshot list during long compactions (2nd attempt) (#5278)
Summary:
Part of compaction CPU time goes to processing the snapshot list; the larger the list, the bigger the overhead. Although the lifetime of most snapshots is much shorter than the lifetime of compactions, a compaction conservatively operates on the list of snapshots that it initially obtained. This patch allows the snapshot list to be updated via a callback if the compaction is taking long, which should let the compaction continue more efficiently with a much smaller snapshot list.
For simplicity, the feature is disabled in two cases: i) when more than one sub-compaction shares the same snapshot list, and ii) when Range Delete is used, in which case the range delete aggregator keeps its own copy of the snapshot list.
This fixes the range-delete issue that caused https://github.com/facebook/rocksdb/pull/5099 to be reverted.
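A hedged sketch of what a fetch callback does, with toy stand-ins for Env and the base class so it compiles in isolation; the real interface appears in the snapshot header hunk below, and the constructor argument order is inferred from the CompactionJobTest::SnapshotRefresh hunk:
```cpp
#include <cstdint>
#include <vector>

using SequenceNumber = uint64_t;
struct Env {};  // toy stand-in for rocksdb::Env

// Toy copy of the base class shown in the snapshot header hunk below.
class SnapshotListFetchCallback {
 public:
  SnapshotListFetchCallback(Env* /*env*/, uint64_t /*snap_refresh_nanos*/,
                            uint64_t /*every_nth_key*/) {}
  virtual ~SnapshotListFetchCallback() = default;
  virtual void Refresh(std::vector<SequenceNumber>* snapshots,
                       SequenceNumber max) = 0;
};

// Hypothetical owner of the live snapshot list (DBImpl in the real patch).
struct SnapshotSource {
  std::vector<SequenceNumber> LiveSnapshots() const { return {10, 42, 99}; }
};

class RefreshFromDb : public SnapshotListFetchCallback {
 public:
  RefreshFromDb(Env* env, SnapshotSource* src)
      : SnapshotListFetchCallback(env, /*snap_refresh_nanos=*/100000000,
                                  /*every_nth_key=*/1024),
        src_(src) {}
  void Refresh(std::vector<SequenceNumber>* snapshots,
               SequenceNumber max) override {
    snapshots->clear();
    for (SequenceNumber seq : src_->LiveSnapshots()) {
      // Cap at `max`: the compaction must never see a snapshot newer than
      // the ones it started with.
      if (seq <= max) snapshots->push_back(seq);
    }
  }

 private:
  SnapshotSource* src_;
};
```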
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5278

Differential Revision: D15203291

Pulled By: maysamyabandeh

fbshipit-source-id: fa645611e606aa222c7ce53176dc5bb6f259c258
2019-09-18 14:35:50 -07:00
Maysam Yabandeh
47f0d5fdd0 Revert snap_refresh_nanos feature (#5269)
Summary:
Our daily stress tests are failing after this feature. Reverting temporarily until we figure out the reason for the test failures.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5269

Differential Revision: D15151285

Pulled By: maysamyabandeh

fbshipit-source-id: e4002b99690a97df30d4b4b58bf0f61e9591bc6e
2019-09-18 14:30:55 -07:00
Yanqin Jin
3542f7ef00 Bump patch version 2019-09-03 13:41:22 -07:00
Yanqin Jin
5c12a474f3 Fix a bug in file ingestion (#5760)
Summary:
Before this PR, when the number of column families involved in a file ingestion exceeds 2, a bug in the looping logic prevents the correct file numbers from being assigned to each ingestion job.
Also skip deleting non-existing hard links during cleanup-after-failure.
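A standalone illustration of the looping bug (the numbers are mine, not from the PR): with three column families ingesting two files each and next_file_number == 100, the old per-iteration computation `next_file_number + args[i - 1].external_files.size()` hands both the second and third jobs start number 102, while the cumulative form in the DBImpl::IngestExternalFiles hunk below keeps the ranges disjoint:
```cpp
#include <cassert>
#include <cstdint>
#include <vector>

int main() {
  const uint64_t next_file_number = 100;
  const std::vector<size_t> files_per_job = {2, 2, 2};  // 3 column families

  // Fixed logic, mirroring the hunk below: accumulate across iterations.
  // i == 1 -> 102, i == 2 -> 104 (the buggy form gave 102 again at i == 2).
  uint64_t start_file_number = next_file_number;
  for (size_t i = 1; i != files_per_job.size(); ++i) {
    start_file_number += files_per_job[i - 1];
  }
  assert(start_file_number == 104);
  return 0;
}
```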

Test plan (devserver)
```
$ COMPILE_WITH_ASAN=1 make all
$ ./external_sst_file_test --gtest_filter=ExternalSSTFileTest/ExternalSSTFileTest.IngestFilesIntoMultipleColumnFamilies_*/*
$ make check
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5760

Differential Revision: D17142982

Pulled By: riversand963

fbshipit-source-id: 06c1847a4e7a402647bcf28d124e70f2a0f9daf6
2019-09-03 13:29:55 -07:00
Yanqin Jin
6d113fc066 Update HISTORY and bump version 2019-06-07 16:23:07 -07:00
Yanqin Jin
11afcbe7ba Disable dynamic extension support by default for CMake (#5419)
Summary:
We have users reporting linking errors while building RocksDB using CMake, and the dynamic extension feature should not be enabled for them. The fix is to add `-DROCKSDB_NO_DYNAMIC_EXTENSION` to CMake by default.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5419

Differential Revision: D15676792

Pulled By: riversand963

fbshipit-source-id: d45aaacfc64ea61646fd7329c352cd760145baf3
2019-06-05 14:09:27 -07:00
Levi Tamasi
b6e554e698 Bump version to 6.2.1 2019-06-04 16:38:51 -07:00
Andrew Kryczka
a1f08cc953 Fix merging range tombstone covering put during flush/compaction (#5406)
Summary:
Flush/compaction use `MergeUntil`, which has a special code path to
handle a merge ending with a non-`Merge` point key. In particular, if
that key is a `Put`, we forgot to check whether it is covered by a range
tombstone. If it is covered, then we must not include it in the following
call to `TimedFullMerge`.

Fixes #5392.
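A minimal repro sketch of the failing sequence, assuming `db` was opened with `options.merge_operator` set (the regression test added below, `TEST_F(DBRangeDelTest, PutDeleteRangeMergeFlush)`, uses a uint64-add operator):
```cpp
#include <string>

#include "rocksdb/db.h"

// Sketch only; keys and values are illustrative.
void ReproCoveredPutThenMerge(rocksdb::DB* db, const std::string& v) {
  rocksdb::WriteOptions wo;
  db->Put(wo, "key", v);  // (1) point value
  // (2) range tombstone covering the Put
  db->DeleteRange(wo, db->DefaultColumnFamily(), "key", "key_");
  db->Merge(wo, "key", v);             // (3) merge operand on top
  db->Flush(rocksdb::FlushOptions());  // (4) MergeUntil runs here
  std::string out;
  db->Get(rocksdb::ReadOptions(), "key", &out);
  // Expected: `out` reflects only the merge operand. Before this fix, the
  // covered Put leaked into TimedFullMerge and reappeared in the result.
}
```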
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5406

Differential Revision: D15611144

Pulled By: sagar0

fbshipit-source-id: ba6a7863ca2d043f591de78fd0c4f4561f0c500e
2019-06-04 13:44:37 -07:00
anand76
0d9bfa6e4d Fix a bug in db_stress and an incorrect assertion in FilePickerMultiGet (#5301)
Summary:
This PR has two fixes for crash test failures -
1. Fix a bug in TestMultiGet() in db_stress that was passing the list of keys to MultiGet() in the wrong order, thus ensuring that actual values don't match expected values
2. Remove an incorrect assertion in FilePickerMultiGet::GetNextFileInLevelWithKeys() that checks that files in a level are in sorted order. This is not true with MultiGet(), especially if there are duplicate keys and we may have to go back one file for the next key. Furthermore, this assertion makes more sense when a new version is created, rather than at lookup time

Test -
asan_crash and ubsan_crash tests
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5301

Differential Revision: D15337383

Pulled By: anand1976

fbshipit-source-id: 35092cb15bbc1700e5e823cbe07bfa62f1e9e6c6
2019-05-24 13:22:04 -07:00
anand76
9feb730c6e Fix bugs in FilePickerMultiGet (#5292)
Summary:
This PR fixes a couple of bugs in FilePickerMultiGet that were causing db_stress test failures. The failures were caused by -
1. Improper handling of a key that matches the user key portion of an L0 file's largest key. In this case, the curr_index_in_curr_level file index in L0 for that key was getting incremented, but batch_iter_ was not advanced. By design, all keys in a batch are supposed to be checked against an L0 file before advancing to the next L0 file. Not advancing to the next key in the batch was causing a double increment of curr_index_in_curr_level due to the same key being processed again
2. Improper handling of a key that matches the user key portion of the largest key in the last file of L1 and higher. This was resulting in a premature end to the processing of the batch for that level when the next key in the batch is a duplicate. Typically, the keys in MultiGet will not be duplicates, but it's good to handle that case correctly

Test -
asan_crash
make check
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5292

Differential Revision: D15282530

Pulled By: anand1976

fbshipit-source-id: d1a6a86e0af273169c3632db22a44d79c66a581f
2019-05-24 13:19:55 -07:00
Zhongyi Xie
70dca18c96 multiget: fix memory issues due to vector auto resizing (#5279)
Summary:
This PR fixes three memory issues found by ASAN
* in db_stress, the key vector for MultiGet is created using `emplace_back`, which can invalidate references into the underlying storage (vector<string>) when the vector resizes. Fixed by calling reserve in advance; see the sketch after this list.
* A similar issue in the construction of the GetContext autovector in version_set.cc.
* In multiget_context.h, use the T[] specialization of unique_ptr for the member that holds a char array, so the array form of delete is used.
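A standalone illustration of the first bullet's hazard (toy code, not from the PR):
```cpp
#include <string>
#include <vector>

int main() {
  std::vector<std::string> keys;
  std::vector<const std::string*> refs;  // stand-in for the Slice vector
  keys.reserve(3);  // the fix: capacity pinned up front, no reallocation
  for (int i = 0; i < 3; ++i) {
    keys.emplace_back(16, static_cast<char>('a' + i));
    refs.push_back(&keys.back());  // valid only because of reserve() above
  }
  // Without reserve(), growth (capacity 1 -> 2 -> 4) moves the strings and
  // leaves the earlier pointers dangling -- exactly what ASAN flagged.
  return refs.size() == keys.size() ? 0 : 1;
}
```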
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5279

Differential Revision: D15202893

Pulled By: miasantreble

fbshipit-source-id: 14cc2cda0ed64d29f2a1e264a6bfdaa4294ee75d
2019-05-24 13:19:06 -07:00
anand76
5f703af1ea Add option to use MultiGet in db_stress (#5264)
Summary:
The new option will pick a batch size randomly in the range 1-64. It will then space the keys in the batch by random intervals.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5264

Differential Revision: D15175522

Pulled By: anand1976

fbshipit-source-id: c16baa69d0f1ff4cf53c55c813ddd82c8aeb58fc
2019-05-24 13:18:02 -07:00
Yanqin Jin
8baa66acf6 Update HISTORY.md 2019-05-15 11:26:03 -07:00
Yanqin Jin
570d490a3d Fix a race condition caused by unlocking db mutex (#5294)
Summary:
Previous code may call `~ColumnFamilyData` in `DBImpl::AtomicFlushMemTablesToOutputFiles` if the column family is dropped or `cfd->IsFlushPending() == false`. In `~ColumnFamilyData`, the db mutex is released briefly and re-acquired. This can cause a correctness issue. The reason is as follows.

Assume there is more than one bg flush thread. After bg_flush_thr1 releases the db mutex, bg_flush_thr2 can grab it and pop an element from the flush queue. This will cause bg_flush_thr2 to accidentally pick some memtables which should have been picked by bg_flush_thr1. To make matters worse, bg_flush_thr2 can clear the `flush_requested_` flag for the memtable list, causing a subsequent call to `MemTableList::IsFlushPending()` by bg_flush_thr1 to return false, which is wrong.

The fix is to delay `ColumnFamilyData::Unref` and `~ColumnFamilyData` for column families not selected for flush until `AtomicFlushMemTablesToOutputFiles` returns. Furthermore, a bg flush thread should not clear `MemTableList::flush_requested_` in `MemTableList::PickMemtablesToFlush` unless atomic flush is not used **or** the memtable list does not have unpicked memtables.
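A hedged sketch of the fix's shape, with ColumnFamilyData reduced to a toy struct and the refcount semantics assumed; the real change is in the DBImpl::BackgroundFlush hunks below:
```cpp
#include <vector>

// Toy stand-in for ColumnFamilyData.
struct CfdLike {
  int refs = 1;
  bool flush_pending = false;
  bool Unref() { return --refs == 0; }  // true when the last ref is dropped
};

void DrainFlushQueue(std::vector<CfdLike*>& queue) {
  std::vector<CfdLike*> not_flushed;
  for (CfdLike* cfd : queue) {
    if (!cfd->flush_pending) {
      // Before the fix this Unref()'d (and possibly deleted) cfd inline;
      // ~ColumnFamilyData briefly releases the db mutex, letting another
      // bg thread pop the queue and clear flush_requested_ underneath us.
      not_flushed.push_back(cfd);
      continue;
    }
    // ... pick memtables and set up the flush job for cfd ...
  }
  // Safe point: queue draining is done, so the mutex release inside
  // ~ColumnFamilyData can no longer interleave with it.
  for (CfdLike* cfd : not_flushed) {
    if (cfd->Unref()) delete cfd;
  }
}
```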
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5294

Differential Revision: D15295297

Pulled By: riversand963

fbshipit-source-id: 03b101205ca22c242647cbf488bcf0ed80b2ecbd
2019-05-15 11:23:47 -07:00
Fosco Marotto
55320dedab Update history and version for future 6.2.0 2019-04-30 13:07:04 -07:00
20 changed files with 375 additions and 84 deletions

View File

@@ -317,6 +317,10 @@ if(DISABLE_STALL_NOTIF)
add_definitions(-DROCKSDB_DISABLE_STALL_NOTIFICATION)
endif()
option(WITH_DYNAMIC_EXTENSION "build with dynamic extension support" OFF)
if(NOT WITH_DYNAMIC_EXTENSION)
add_definitions(-DROCKSDB_NO_DYNAMIC_EXTENSION)
endif()
if(DEFINED USE_RTTI)
if(USE_RTTI)

View File

@@ -1,5 +1,21 @@
# Rocksdb Change Log
## Unreleased
## 6.2.4 (9/18/2019)
### Bug Fixes
* Disable snap_refresh_nanos by default. The feature is to be deprecated in the next release.
## 6.2.3 (9/3/2019)
### Bug Fixes
* Fix a bug in file ingestion caused by incorrect file number allocation when the number of column families involved in the ingestion exceeds 2.
## 6.2.2 (6/7/2019)
### Bug Fixes
* Disable dynamic extension support by default for CMake.
## 6.2.1 (6/4/2019)
### Bug Fixes
* Fix flush's/compaction's merge processing logic which allowed `Put`s covered by range tombstones to reappear. Note `Put`s may exist even if the user only ever called `Merge()` due to an internal conversion during compaction to the bottommost level.
## 6.2.0 (4/30/2019)
### New Features
* Add an option `strict_bytes_per_sync` that causes a file-writing thread to block rather than exceed the limit on bytes pending writeback specified by `bytes_per_sync` or `wal_bytes_per_sync`.
* Improve range scan performance by avoiding per-key upper bound check in BlockBasedTableIterator.
@@ -19,6 +35,7 @@
* Fix a race condition between WritePrepared::Get and ::Put with duplicate keys.
* Fix crash when memtable prefix bloom is enabled and read/write a key out of domain of prefix extractor.
* Close a WAL file before another thread deletes it.
* Fix an assertion failure `IsFlushPending() == true` caused by one bg thread releasing the db mutex in ~ColumnFamilyData and another thread clearing `flush_requested_` flag.
## 6.1.1 (4/9/2019)
### New Features

View File

@@ -56,10 +56,10 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
if [ -n "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
# we need this to build with MySQL. Don't use for other purposes.
source "$PWD/build_tools/fbcode_config4.8.1.sh"
elif [ -n "$ROCKSDB_FBCODE_BUILD_WITH_PLATFORM007" ]; then
source "$PWD/build_tools/fbcode_config_platform007.sh"
else
elif [ -n "$ROCKSDB_FBCODE_BUILD_WITH_5xx" ]; then
source "$PWD/build_tools/fbcode_config.sh"
else
source "$PWD/build_tools/fbcode_config_platform007.sh"
fi
fi

View File

@@ -39,6 +39,7 @@ class SnapshotListFetchCallback {
virtual void Refresh(std::vector<SequenceNumber>* snapshots,
SequenceNumber max) = 0;
inline bool TimeToRefresh(const size_t key_index) {
assert(snap_refresh_nanos_ != 0);
// skip the key if key_index % every_nth_key (which is a power of 2) is not 0.
if ((key_index & every_nth_key_minus_one_) != 0) {
return false;

View File

@@ -893,7 +893,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
&existing_snapshots_, earliest_write_conflict_snapshot_,
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false,
&range_del_agg, sub_compact->compaction, compaction_filter,
shutting_down_, preserve_deletes_seqnum_, snap_list_callback_));
shutting_down_, preserve_deletes_seqnum_,
// Currently range_del_agg is incompatible with snapshot refresh feature.
range_del_agg.IsEmpty() ? snap_list_callback_ : nullptr));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {

View File

@@ -966,7 +966,7 @@ TEST_F(CompactionJobTest, SnapshotRefresh) {
public:
SnapshotListFetchCallbackTest(Env* env, Random64& rand,
std::vector<SequenceNumber>* snapshots)
: SnapshotListFetchCallback(env, 0 /*no time delay*/,
: SnapshotListFetchCallback(env, 1 /*short time delay*/,
1 /*fetch after each key*/),
rand_(rand),
snapshots_(snapshots) {}

View File

@@ -514,6 +514,37 @@ TEST_P(DBAtomicFlushTest, TriggerFlushAndClose) {
ASSERT_EQ("value", Get(0, "key"));
}
TEST_P(DBAtomicFlushTest, PickMemtablesRaceWithBackgroundFlush) {
bool atomic_flush = GetParam();
Options options = CurrentOptions();
options.create_if_missing = true;
options.atomic_flush = atomic_flush;
options.max_write_buffer_number = 4;
// Set min_write_buffer_number_to_merge to be greater than 1, so that
// a column family with one memtable in the imm will not cause IsFlushPending
// to return true when flush_requested_ is false.
options.min_write_buffer_number_to_merge = 2;
CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_EQ(2, handles_.size());
ASSERT_OK(dbfull()->PauseBackgroundWork());
ASSERT_OK(Put(0, "key00", "value00"));
ASSERT_OK(Put(1, "key10", "value10"));
FlushOptions flush_opts;
flush_opts.wait = false;
ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
ASSERT_OK(Put(0, "key01", "value01"));
// Since max_write_buffer_number is 4, the following flush won't cause write
// stall.
ASSERT_OK(dbfull()->Flush(flush_opts));
ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));
handles_[1] = nullptr;
ASSERT_OK(dbfull()->ContinueBackgroundWork());
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
delete handles_[0];
handles_.clear();
}
INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
testing::Bool());

View File

@@ -3538,9 +3538,9 @@ Status DBImpl::IngestExternalFiles(
exec_results.emplace_back(false, Status::OK());
}
// TODO(yanqin) maybe make jobs run in parallel
uint64_t start_file_number = next_file_number;
for (size_t i = 1; i != num_cfs; ++i) {
uint64_t start_file_number =
next_file_number + args[i - 1].external_files.size();
start_file_number += args[i - 1].external_files.size();
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);

View File

@@ -798,6 +798,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
return s;
}
namespace {
class SnapshotListFetchCallbackImpl : public SnapshotListFetchCallback {
public:
SnapshotListFetchCallbackImpl(DBImpl* db_impl, Env* env,
@@ -820,6 +821,7 @@ class SnapshotListFetchCallbackImpl : public SnapshotListFetchCallback {
DBImpl* db_impl_;
Logger* info_log_;
};
} // namespace
Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
ColumnFamilyHandle* column_family,
@@ -1005,8 +1007,10 @@ Status DBImpl::CompactFilesImpl(
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, Env::Priority::USER,
immutable_db_options_.max_subcompactions <= 1 ? &fetch_callback
: nullptr);
immutable_db_options_.max_subcompactions <= 1 &&
c->mutable_cf_options()->snap_refresh_nanos > 0
? &fetch_callback
: nullptr);
// Creating a compaction influences the compaction score because the score
// takes running compactions into account (by skipping files that are already
@@ -2080,6 +2084,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
autovector<BGFlushArg> bg_flush_args;
std::vector<SuperVersionContext>& superversion_contexts =
job_context->superversion_contexts;
autovector<ColumnFamilyData*> column_families_not_to_flush;
while (!flush_queue_.empty()) {
// This cfd is already referenced
const FlushRequest& flush_req = PopFirstFromFlushQueue();
@@ -2090,9 +2095,7 @@
ColumnFamilyData* cfd = iter.first;
if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
// can't flush this CF, try next one
if (cfd->Unref()) {
delete cfd;
}
column_families_not_to_flush.push_back(cfd);
continue;
}
superversion_contexts.emplace_back(SuperVersionContext(true));
@@ -2131,6 +2134,11 @@
}
}
}
for (auto cfd : column_families_not_to_flush) {
if (cfd->Unref()) {
delete cfd;
}
}
return status;
}
@@ -2663,8 +2671,10 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
&event_logger_, c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, thread_pri,
immutable_db_options_.max_subcompactions <= 1 ? &fetch_callback
: nullptr);
immutable_db_options_.max_subcompactions <= 1 &&
c->mutable_cf_options()->snap_refresh_nanos > 0
? &fetch_callback
: nullptr);
compaction_job.Prepare();
NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,

View File

@@ -491,6 +491,30 @@ TEST_F(DBRangeDelTest, CompactionRemovesCoveredMergeOperands) {
ASSERT_EQ(expected, actual);
}
TEST_F(DBRangeDelTest, PutDeleteRangeMergeFlush) {
// Test the sequence of operations: (1) Put, (2) DeleteRange, (3) Merge, (4)
// Flush. The `CompactionIterator` previously had a bug where we forgot to
// check for covering range tombstones when processing the (1) Put, causing
// it to reappear after the flush.
Options opts = CurrentOptions();
opts.merge_operator = MergeOperators::CreateUInt64AddOperator();
Reopen(opts);
std::string val;
PutFixed64(&val, 1);
ASSERT_OK(db_->Put(WriteOptions(), "key", val));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
"key", "key_"));
ASSERT_OK(db_->Merge(WriteOptions(), "key", val));
ASSERT_OK(db_->Flush(FlushOptions()));
ReadOptions read_opts;
std::string expected, actual;
ASSERT_OK(db_->Get(read_opts, "key", &actual));
PutFixed64(&expected, 1);
ASSERT_EQ(expected, actual);
}
// NumTableFilesAtLevel() is not supported in ROCKSDB_LITE
#ifndef ROCKSDB_LITE
TEST_F(DBRangeDelTest, ObsoleteTombstoneCleanup) {

View File

@@ -124,7 +124,7 @@ Status ExternalSstFileIngestionJob::Prepare(
// We failed, remove all files that we copied into the db
for (IngestedFileInfo& f : files_to_ingest_) {
if (f.internal_file_path.empty()) {
break;
continue;
}
Status s = env_->DeleteFile(f.internal_file_path);
if (!s.ok()) {
@@ -255,6 +255,9 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
// We failed to add the files to the database
// remove all the files we copied
for (IngestedFileInfo& f : files_to_ingest_) {
if (f.internal_file_path.empty()) {
continue;
}
Status s = env_->DeleteFile(f.internal_file_path);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,

View File

@@ -2300,10 +2300,11 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
new FaultInjectionTestEnv(env_));
Options options = CurrentOptions();
options.env = fault_injection_env.get();
CreateAndReopenWithCF({"pikachu"}, options);
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(handles_[0]);
column_families.push_back(handles_[1]);
column_families.push_back(handles_[2]);
std::vector<IngestExternalFileOptions> ifos(column_families.size());
for (auto& ifo : ifos) {
ifo.allow_global_seqno = true; // Always allow global_seqno
@@ -2317,6 +2318,9 @@
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
data.push_back(
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
data.push_back(
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
// Resize the true_data vector upon construction to avoid re-alloc
std::vector<std::map<std::string, std::string>> true_data(
column_families.size());
@@ -2324,8 +2328,9 @@
-1, true, true_data);
ASSERT_OK(s);
Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
ASSERT_EQ(2, handles_.size());
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
options);
ASSERT_EQ(3, handles_.size());
int cf = 0;
for (const auto& verify_map : true_data) {
for (const auto& elem : verify_map) {
@@ -2357,10 +2362,11 @@ TEST_P(ExternalSSTFileTest,
Options options = CurrentOptions();
options.env = fault_injection_env.get();
CreateAndReopenWithCF({"pikachu"}, options);
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
const std::vector<std::map<std::string, std::string>> data_before_ingestion =
{{{"foo1", "fv1_0"}, {"foo2", "fv2_0"}, {"foo3", "fv3_0"}},
{{"bar1", "bv1_0"}, {"bar2", "bv2_0"}, {"bar3", "bv3_0"}}};
{{"bar1", "bv1_0"}, {"bar2", "bv2_0"}, {"bar3", "bv3_0"}},
{{"bar4", "bv4_0"}, {"bar5", "bv5_0"}, {"bar6", "bv6_0"}}};
for (size_t i = 0; i != handles_.size(); ++i) {
int cf = static_cast<int>(i);
const auto& orig_data = data_before_ingestion[i];
@@ -2373,6 +2379,7 @@
std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(handles_[0]);
column_families.push_back(handles_[1]);
column_families.push_back(handles_[2]);
std::vector<IngestExternalFileOptions> ifos(column_families.size());
for (auto& ifo : ifos) {
ifo.allow_global_seqno = true; // Always allow global_seqno
@@ -2386,6 +2393,8 @@
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
data.push_back(
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
data.push_back(
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
// Resize the true_data vector upon construction to avoid re-alloc
std::vector<std::map<std::string, std::string>> true_data(
column_families.size());
@@ -2439,10 +2448,11 @@
dbfull()->ReleaseSnapshot(read_opts.snapshot);
Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
options);
// Should see consistent state after ingestion for all column families even
// without snapshot.
ASSERT_EQ(2, handles_.size());
ASSERT_EQ(3, handles_.size());
int cf = 0;
for (const auto& verify_map : true_data) {
for (const auto& elem : verify_map) {
@@ -2472,10 +2482,11 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
"DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1"},
});
SyncPoint::GetInstance()->EnableProcessing();
CreateAndReopenWithCF({"pikachu"}, options);
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(handles_[0]);
column_families.push_back(handles_[1]);
column_families.push_back(handles_[2]);
std::vector<IngestExternalFileOptions> ifos(column_families.size());
for (auto& ifo : ifos) {
ifo.allow_global_seqno = true; // Always allow global_seqno
@@ -2489,6 +2500,9 @@
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
data.push_back(
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
data.push_back(
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
// Resize the true_data vector upon construction to avoid re-alloc
std::vector<std::map<std::string, std::string>> true_data(
column_families.size());
@@ -2508,8 +2522,9 @@
fault_injection_env->SetFilesystemActive(true);
Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
ASSERT_EQ(2, handles_.size());
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
options);
ASSERT_EQ(3, handles_.size());
int cf = 0;
for (const auto& verify_map : true_data) {
for (const auto& elem : verify_map) {
@@ -2538,10 +2553,11 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
"DBImpl::IngestExternalFiles:BeforeJobsRun:1"},
});
SyncPoint::GetInstance()->EnableProcessing();
CreateAndReopenWithCF({"pikachu"}, options);
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(handles_[0]);
column_families.push_back(handles_[1]);
column_families.push_back(handles_[2]);
std::vector<IngestExternalFileOptions> ifos(column_families.size());
for (auto& ifo : ifos) {
ifo.allow_global_seqno = true; // Always allow global_seqno
@@ -2555,6 +2571,8 @@
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
data.push_back(
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
data.push_back(
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
// Resize the true_data vector upon construction to avoid re-alloc
std::vector<std::map<std::string, std::string>> true_data(
column_families.size());
@@ -2574,8 +2592,9 @@
fault_injection_env->SetFilesystemActive(true);
Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
ASSERT_EQ(2, handles_.size());
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
options);
ASSERT_EQ(3, handles_.size());
int cf = 0;
for (const auto& verify_map : true_data) {
for (const auto& elem : verify_map) {
@@ -2595,7 +2614,7 @@ TEST_P(ExternalSSTFileTest,
Options options = CurrentOptions();
options.env = fault_injection_env.get();
CreateAndReopenWithCF({"pikachu"}, options);
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
SyncPoint::GetInstance()->ClearTrace();
SyncPoint::GetInstance()->DisableProcessing();
@@ -2613,6 +2632,7 @@
std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(handles_[0]);
column_families.push_back(handles_[1]);
column_families.push_back(handles_[2]);
std::vector<IngestExternalFileOptions> ifos(column_families.size());
for (auto& ifo : ifos) {
ifo.allow_global_seqno = true; // Always allow global_seqno
@@ -2626,6 +2646,8 @@
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
data.push_back(
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
data.push_back(
{std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
// Resize the true_data vector upon construction to avoid re-alloc
std::vector<std::map<std::string, std::string>> true_data(
column_families.size());
@@ -2646,8 +2668,9 @@
fault_injection_env->DropUnsyncedFileData();
fault_injection_env->SetFilesystemActive(true);
Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
ASSERT_EQ(2, handles_.size());
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
options);
ASSERT_EQ(3, handles_.size());
int cf = 0;
for (const auto& verify_map : true_data) {
for (const auto& elem : verify_map) {

View File

@@ -277,8 +277,12 @@ void MemTableList::PickMemtablesToFlush(const uint64_t* max_memtable_id,
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
const auto& memlist = current_->memlist_;
bool atomic_flush = false;
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it;
if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) {
atomic_flush = true;
}
if (max_memtable_id != nullptr && m->GetID() > *max_memtable_id) {
break;
}
@@ -292,7 +296,9 @@
ret->push_back(m);
}
}
flush_requested_ = false; // start-flush request is complete
if (!atomic_flush || num_flush_not_started_ == 0) {
flush_requested_ = false; // start-flush request is complete
}
}
void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,

View File

@@ -201,7 +201,15 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
// want. Also if we're in compaction and it's a put, it would be nice to
// run compaction filter on it.
const Slice val = iter->value();
const Slice* val_ptr = (kTypeValue == ikey.type) ? &val : nullptr;
const Slice* val_ptr;
if (kTypeValue == ikey.type &&
(range_del_agg == nullptr ||
!range_del_agg->ShouldDelete(
ikey, RangeDelPositioningMode::kForwardTraversal))) {
val_ptr = &val;
} else {
val_ptr = nullptr;
}
std::string merge_result;
s = TimedFullMerge(user_merge_operator_, ikey.user_key, val_ptr,
merge_context_.GetOperands(), &merge_result, logger_,

View File

@@ -353,7 +353,7 @@ class FilePickerMultiGet {
struct FilePickerContext;
public:
FilePickerMultiGet(std::vector<FileMetaData*>* files, MultiGetRange* range,
FilePickerMultiGet(MultiGetRange* range,
autovector<LevelFilesBrief>* file_levels,
unsigned int num_levels, FileIndexer* file_indexer,
const Comparator* user_comparator,
@@ -368,18 +368,12 @@
maybe_repeat_key_(false),
current_level_range_(*range, range->begin(), range->end()),
current_file_range_(*range, range->begin(), range->end()),
#ifndef NDEBUG
files_(files),
#endif
level_files_brief_(file_levels),
is_hit_file_last_in_level_(false),
curr_file_level_(nullptr),
file_indexer_(file_indexer),
user_comparator_(user_comparator),
internal_comparator_(internal_comparator) {
#ifdef NDEBUG
(void)files;
#endif
for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
fp_ctx_array_[iter.index()] =
FilePickerContext(0, FileIndexer::kLevelMaxIndex);
@@ -416,6 +410,18 @@
bool file_hit = false;
int cmp_largest = -1;
if (curr_file_index >= curr_file_level_->num_files) {
// In the unlikely case the next key is a duplicate of the current key,
// and the current key is the last in the level and the internal key
// was not found, we need to skip lookup for the remaining keys and
// reset the search bounds
if (batch_iter_ != current_level_range_.end()) {
++batch_iter_;
for (; batch_iter_ != current_level_range_.end(); ++batch_iter_) {
struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
fp_ctx.search_left_bound = 0;
fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
}
}
return false;
}
// Loops over keys in the MultiGet batch until it finds a file with
@@ -473,25 +479,6 @@
} else {
file_hit = true;
}
#ifndef NDEBUG
// Sanity check to make sure that the files are correctly sorted
if (f != prev_file_) {
if (prev_file_) {
if (curr_level_ != 0) {
int comp_sign = internal_comparator_->Compare(
prev_file_->largest_key, f->smallest_key);
assert(comp_sign < 0);
} else if (fp_ctx.curr_index_in_curr_level > 0) {
// level == 0, the current file cannot be newer than the previous
// one. Use compressed data structure, has no attribute seqNo
assert(!NewestFirstBySeqNo(
files_[0][fp_ctx.curr_index_in_curr_level],
files_[0][fp_ctx.curr_index_in_curr_level - 1]));
}
}
prev_file_ = f;
}
#endif
if (cmp_largest == 0) {
// cmp_largest is 0, which means the next key will not be in this
// file, so stop looking further. Also don't increment megt_iter_
@@ -533,7 +520,10 @@
// any further for that key, so advance batch_iter_. Else, keep
// batch_iter_ positioned on that key so we look it up again in
// the next file
if (current_level_range_.CheckKeyDone(batch_iter_)) {
// For L0, always advance the key because we will look in the next
// file regardless for all keys not found yet
if (current_level_range_.CheckKeyDone(batch_iter_) ||
curr_level_ == 0) {
++batch_iter_;
}
}
@@ -601,7 +591,8 @@
unsigned int start_index_in_curr_level;
FilePickerContext(int32_t left, int32_t right)
: search_left_bound(left), search_right_bound(right) {}
: search_left_bound(left), search_right_bound(right),
curr_index_in_curr_level(0), start_index_in_curr_level(0) {}
FilePickerContext() = default;
};
@@ -619,9 +610,6 @@
bool maybe_repeat_key_;
MultiGetRange current_level_range_;
MultiGetRange current_file_range_;
#ifndef NDEBUG
std::vector<FileMetaData*>* files_;
#endif
autovector<LevelFilesBrief>* level_files_brief_;
bool search_ended_;
bool is_hit_file_last_in_level_;
@@ -629,9 +617,6 @@
FileIndexer* file_indexer_;
const Comparator* user_comparator_;
const InternalKeyComparator* internal_comparator_;
#ifndef NDEBUG
FdWithKeyRange* prev_file_;
#endif
// Setup local variables to search next level.
// Returns false if there are no more levels to search.
@@ -640,9 +625,6 @@
MultiGetRange::Iterator mget_iter = current_level_range_.begin();
if (fp_ctx_array_[mget_iter.index()].curr_index_in_curr_level <
curr_file_level_->num_files) {
#ifndef NDEBUG
prev_file_ = nullptr;
#endif
batch_iter_prev_ = current_level_range_.begin();
batch_iter_ = current_level_range_.begin();
return true;
@@ -738,9 +720,6 @@
fp_ctx.curr_index_in_curr_level = start_index;
}
if (level_contains_keys) {
#ifndef NDEBUG
prev_file_ = nullptr;
#endif
batch_iter_prev_ = current_level_range_.begin();
batch_iter_ = current_level_range_.begin();
return true;
@@ -1758,12 +1737,16 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
iter->value, nullptr, &(iter->merge_context),
&iter->max_covering_tombstone_seq, this->env_, &iter->seq,
merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob);
iter->get_context = &get_ctx.back();
}
int get_ctx_index = 0;
for (auto iter = range->begin(); iter != range->end();
++iter, get_ctx_index++) {
iter->get_context = &(get_ctx[get_ctx_index]);
}
MultiGetRange file_picker_range(*range, range->begin(), range->end());
FilePickerMultiGet fp(
storage_info_.files_, &file_picker_range,
&file_picker_range,
&storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
&storage_info_.file_indexer_, user_comparator(), internal_comparator());
FdWithKeyRange* f = fp.GetNextFile();

View File

@@ -275,10 +275,12 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
// this option helps reduce the CPU usage of long-running compactions. The
// feature is disabled when max_subcompactions is greater than one.
//
// Default: 0.5s
// NOTE: This feature is currently incompatible with RangeDeletes.
//
// Default: 0
//
// Dynamically changeable through SetOptions() API
uint64_t snap_refresh_nanos = 500 * 1000 * 1000; // 0.5s
uint64_t snap_refresh_nanos = 0;
// Disable automatic compactions. Manual compactions can still
// be issued on this column family

View File

@@ -5,8 +5,8 @@
#pragma once
#define ROCKSDB_MAJOR 6
#define ROCKSDB_MINOR 1
#define ROCKSDB_PATCH 1
#define ROCKSDB_MINOR 2
#define ROCKSDB_PATCH 4
// Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these

View File

@@ -123,7 +123,7 @@ class MultiGetContext {
KeyContext** sorted_keys_;
size_t num_keys_;
uint64_t value_mask_;
std::unique_ptr<char> lookup_key_heap_buf;
std::unique_ptr<char[]> lookup_key_heap_buf;
LookupKey* lookup_key_ptr_;
public:

View File

@@ -65,6 +65,7 @@ default_params = {
"writepercent": 35,
"format_version": lambda: random.randint(2, 4),
"index_block_restart_interval": lambda: random.choice(range(1, 16)),
"use_multiget" : lambda: random.randint(0, 1),
}
_TEST_DIR_ENV_VAR = 'TEST_TMPDIR'
@@ -342,8 +343,9 @@ def whitebox_crash_main(args, unknown_args):
if additional_opts['kill_random_test'] is None and (retncode == 0):
# we expect zero retncode if no kill option
expected = True
elif additional_opts['kill_random_test'] is not None and retncode < 0:
# we expect negative retncode if kill option was given
elif additional_opts['kill_random_test'] is not None and retncode <= 0:
# When kill option is given, the test MIGHT kill itself.
# If it does, negative retncode is expected. Otherwise 0.
expected = True
if not expected:

View File

@@ -455,6 +455,9 @@ DEFINE_uint64(snapshot_hold_ops, 0,
"If non-zero, then releases snapshots N operations after they're "
"acquired.");
DEFINE_bool(use_multiget, false,
"If set, use the batched MultiGet API for reads");
static bool ValidateInt32Percent(const char* flagname, int32_t value) {
if (value < 0 || value>100) {
fprintf(stderr, "Invalid value for --%s: %d, 0<= pct <=100 \n",
@@ -1725,6 +1728,27 @@ class StressTest {
return base_key + thread->rand.Next() % FLAGS_active_width;
}
static std::vector<int64_t> GenerateNKeys(
ThreadState* thread,
int num_keys,
uint64_t iteration) {
const double completed_ratio =
static_cast<double>(iteration) / FLAGS_ops_per_thread;
const int64_t base_key = static_cast<int64_t>(
completed_ratio * (FLAGS_max_key - FLAGS_active_width));
std::vector<int64_t> keys;
keys.reserve(num_keys);
int64_t next_key = base_key + thread->rand.Next() % FLAGS_active_width;
keys.push_back(next_key);
for (int i = 1; i < num_keys; ++i) {
// This may result in some duplicate keys
next_key = next_key + thread->rand.Next() %
(FLAGS_active_width - (next_key - base_key));
keys.push_back(next_key);
}
return keys;
}
static size_t GenerateValue(uint32_t rand, char *v, size_t max_sz) {
size_t value_sz =
((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult;
@@ -2162,7 +2186,14 @@
int prob_op = thread->rand.Uniform(100);
if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) {
// OPERATION read
TestGet(thread, read_opts, rand_column_families, rand_keys);
if (FLAGS_use_multiget) {
int num_keys = thread->rand.Uniform(64);
rand_keys = GenerateNKeys(thread, num_keys, i);
TestMultiGet(thread, read_opts, rand_column_families, rand_keys);
i += num_keys - 1;
} else {
TestGet(thread, read_opts, rand_column_families, rand_keys);
}
} else if ((int)FLAGS_readpercent <= prob_op && prob_op < prefixBound) {
// OPERATION prefix scan
// keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
@@ -2211,6 +2242,11 @@
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) = 0;
virtual std::vector<Status> TestMultiGet(ThreadState* thread,
const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) = 0;
virtual Status TestPrefixScan(ThreadState* thread,
const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
@@ -2546,6 +2582,8 @@
fprintf(stdout, "Checksum type : %s\n", checksum.c_str());
fprintf(stdout, "Max subcompactions : %" PRIu64 "\n",
FLAGS_subcompactions);
fprintf(stdout, "Use MultiGet : %s\n",
FLAGS_use_multiget ? "true" : "false");
const char* memtablerep = "";
switch (FLAGS_rep_factory) {
@@ -3012,6 +3050,40 @@ class NonBatchedOpsStressTest : public StressTest {
return s;
}
virtual std::vector<Status> TestMultiGet(ThreadState* thread,
const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) {
size_t num_keys = rand_keys.size();
std::vector<std::string> key_str;
std::vector<Slice> keys;
key_str.reserve(num_keys);
keys.reserve(num_keys);
std::vector<PinnableSlice> values(num_keys);
std::vector<Status> statuses(num_keys);
ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
for (size_t i = 0; i < num_keys; ++i) {
key_str.emplace_back(Key(rand_keys[i]));
keys.emplace_back(key_str.back());
}
db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
statuses.data());
for (const auto& s : statuses) {
if (s.ok()) {
// found case
thread->stats.AddGets(1, 1);
} else if (s.IsNotFound()) {
// not found case
thread->stats.AddGets(1, 0);
} else {
// errors case
thread->stats.AddErrors(1);
}
}
return statuses;
}
virtual Status TestPrefixScan(ThreadState* thread,
const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
@@ -3532,6 +3604,76 @@ class BatchedOpsStressTest : public StressTest {
return s;
}
virtual std::vector<Status> TestMultiGet(ThreadState* thread,
const ReadOptions& readoptions,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) {
size_t num_keys = rand_keys.size();
std::vector<Status> ret_status(num_keys);
std::array<std::string, 10> keys = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
size_t num_prefixes = keys.size();
for (size_t rand_key = 0; rand_key < num_keys; ++rand_key) {
std::vector<Slice> key_slices;
std::vector<PinnableSlice> values(num_prefixes);
std::vector<Status> statuses(num_prefixes);
ReadOptions readoptionscopy = readoptions;
readoptionscopy.snapshot = db_->GetSnapshot();
std::vector<std::string> key_str;
key_str.reserve(num_prefixes);
key_slices.reserve(num_prefixes);
std::string from_db;
ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
for (size_t key = 0; key < num_prefixes; ++key) {
key_str.emplace_back(keys[key] + Key(rand_keys[rand_key]));
key_slices.emplace_back(key_str.back());
}
db_->MultiGet(readoptionscopy, cfh, num_prefixes, key_slices.data(),
values.data(), statuses.data());
for (size_t i = 0; i < num_prefixes; i++) {
Status s = statuses[i];
if (!s.ok() && !s.IsNotFound()) {
fprintf(stderr, "get error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
ret_status[rand_key] = s;
// we continue after error rather than exiting so that we can
// find more errors if any
} else if (s.IsNotFound()) {
thread->stats.AddGets(1, 0);
ret_status[rand_key] = s;
} else {
char expected_prefix = (keys[i])[0];
char actual_prefix = (values[i])[0];
if (actual_prefix != expected_prefix) {
fprintf(stderr, "error expected prefix = %c actual = %c\n",
expected_prefix, actual_prefix);
}
std::string str;
str.assign(values[i].data(), values[i].size());
values[i].Reset();
str[0] = ' '; // blank out the differing character
values[i].PinSelf(str);
thread->stats.AddGets(1, 1);
}
}
db_->ReleaseSnapshot(readoptionscopy.snapshot);
// Now that we retrieved all values, check that they all match
for (size_t i = 1; i < num_prefixes; i++) {
if (values[i] != values[0]) {
fprintf(stderr, "error : inconsistent values for key %s: %s, %s\n",
key_str[i].c_str(),
StringToHex(values[0].ToString()).c_str(),
StringToHex(values[i].ToString()).c_str());
// we continue after error rather than exiting so that we can
// find more errors if any
}
}
}
return ret_status;
}
// Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P
// in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes
// of the key. Each of these 10 scans returns a series of values;
@@ -3747,6 +3889,39 @@ class AtomicFlushStressTest : public StressTest {
return s;
}
virtual std::vector<Status> TestMultiGet(ThreadState* thread,
const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) {
int num_keys = rand_keys.size();
std::vector<std::string> key_str;
std::vector<Slice> keys;
keys.reserve(num_keys);
key_str.reserve(num_keys);
std::vector<PinnableSlice> values(num_keys);
std::vector<Status> statuses(num_keys);
ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
for (int i = 0; i < num_keys; ++i) {
key_str.emplace_back(Key(rand_keys[i]));
keys.emplace_back(key_str.back());
}
db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(), statuses.data());
for (auto s : statuses) {
if (s.ok()) {
// found case
thread->stats.AddGets(1, 1);
} else if (s.IsNotFound()) {
// not found case
thread->stats.AddGets(1, 0);
} else {
// errors case
thread->stats.AddErrors(1);
}
}
return statuses;
}
virtual Status TestPrefixScan(ThreadState* thread,
const ReadOptions& readoptions,
const std::vector<int>& rand_column_families,