Add more tests to ASSERT_STATUS_CHECKED (4) (#7718)

Summary:
Fourth batch of adding more tests to ASSERT_STATUS_CHECKED.

* db_range_del_test
* db_write_test
* random_access_file_reader_test
* merge_test
* external_sst_file_test
* write_buffer_manager_test
* stringappend_test
* deletefile_test

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7718

Reviewed By: pdillinger

Differential Revision: D25671608

fbshipit-source-id: 687a794e98a9e0cd5428ead9898ef05ced987c31
This commit is contained in:
Adam Retter 2020-12-22 15:08:17 -08:00 committed by Facebook GitHub Bot
parent 41ff125a8a
commit 81592d9ffa
14 changed files with 495 additions and 433 deletions

View File

@ -606,9 +606,13 @@ ifdef ASSERT_STATUS_CHECKED
db_wal_test \
db_with_timestamp_basic_test \
db_with_timestamp_compaction_test \
db_write_test \
db_options_test \
db_properties_test \
db_range_del_test \
db_secondary_test \
deletefile_test \
external_sst_file_test \
options_file_test \
defer_test \
filename_test \
@ -631,6 +635,7 @@ ifdef ASSERT_STATUS_CHECKED
iostats_context_test \
ldb_cmd_test \
memkind_kmem_allocator_test \
merge_test \
merger_test \
mock_env_test \
object_registry_test \
@ -643,6 +648,7 @@ ifdef ASSERT_STATUS_CHECKED
options_settable_test \
options_test \
point_lock_manager_test \
random_access_file_reader_test \
random_test \
range_del_aggregator_test \
sst_file_reader_test \
@ -654,6 +660,7 @@ ifdef ASSERT_STATUS_CHECKED
sst_dump_test \
statistics_test \
stats_history_test \
stringappend_test \
thread_local_test \
trace_analyzer_test \
transaction_test \
@ -671,6 +678,7 @@ ifdef ASSERT_STATUS_CHECKED
version_builder_test \
version_edit_test \
work_queue_test \
write_buffer_manager_test \
write_controller_test \
write_prepared_transaction_test \
write_unprepared_transaction_test \

View File

@ -1449,9 +1449,8 @@ Status CompactionJob::FinishCompactionOutputFile(
} else {
sub_compact->builder->Abandon();
}
IOStatus io_s = sub_compact->builder->io_status();
if (s.ok()) {
s = io_s;
s = sub_compact->builder->io_status();
}
const uint64_t current_bytes = sub_compact->builder->FileSize();
if (s.ok()) {
@ -1462,6 +1461,7 @@ Status CompactionJob::FinishCompactionOutputFile(
sub_compact->total_bytes += current_bytes;
// Finish and check for file errors
IOStatus io_s;
if (s.ok()) {
StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
io_s = sub_compact->outfile->Sync(db_options_.use_fsync);
@ -1500,7 +1500,18 @@ Status CompactionJob::FinishCompactionOutputFile(
std::string fname =
TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
meta->fd.GetNumber(), meta->fd.GetPathId());
env_->DeleteFile(fname);
// TODO(AR) it is not clear if there are any larger implications if
// DeleteFile fails here
Status ds = env_->DeleteFile(fname);
if (!ds.ok()) {
ROCKS_LOG_WARN(
db_options_.info_log,
"[%s] [JOB %d] Unable to remove SST file for table #%" PRIu64
" at bottom level%s",
cfd->GetName().c_str(), job_id_, output_number,
meta->marked_for_compaction ? " (need compaction)" : "");
}
// Also need to remove the file from outputs, or it will be added to the
// VersionEdit.

View File

@ -3483,7 +3483,6 @@ Status DBImpl::DeleteFile(std::string name) {
return Status::InvalidArgument("Invalid file name");
}
Status status;
if (type == kWalFile) {
// Only allow deleting archived log files
if (log_type != kArchivedLogFile) {
@ -3492,7 +3491,7 @@ Status DBImpl::DeleteFile(std::string name) {
name.c_str());
return Status::NotSupported("Delete only supported for archived logs");
}
status = wal_manager_.DeleteFile(name, number);
Status status = wal_manager_.DeleteFile(name, number);
if (!status.ok()) {
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
"DeleteFile %s failed -- %s.\n", name.c_str(),
@ -3501,6 +3500,7 @@ Status DBImpl::DeleteFile(std::string name) {
return status;
}
Status status;
int level;
FileMetaData* metadata;
ColumnFamilyData* cfd;
@ -4354,7 +4354,7 @@ Status DBImpl::IngestExternalFiles(
}
}
// Ingest multiple external SST files atomically.
size_t num_cfs = args.size();
const size_t num_cfs = args.size();
for (size_t i = 0; i != num_cfs; ++i) {
if (args[i].external_files.empty()) {
char err_msg[128] = {0};
@ -4395,10 +4395,7 @@ Status DBImpl::IngestExternalFiles(
env_, versions_.get(), cfd, immutable_db_options_, file_options_,
&snapshots_, arg.options, &directories_, &event_logger_, io_tracer_);
}
std::vector<std::pair<bool, Status>> exec_results;
for (size_t i = 0; i != num_cfs; ++i) {
exec_results.emplace_back(false, Status::OK());
}
// TODO(yanqin) maybe make jobs run in parallel
uint64_t start_file_number = next_file_number;
for (size_t i = 1; i != num_cfs; ++i) {
@ -4406,10 +4403,13 @@ Status DBImpl::IngestExternalFiles(
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
exec_results[i].second = ingestion_jobs[i].Prepare(
Status es = ingestion_jobs[i].Prepare(
args[i].external_files, args[i].files_checksums,
args[i].files_checksum_func_names, start_file_number, super_version);
exec_results[i].first = true;
// capture first error only
if (!es.ok() && status.ok()) {
status = es;
}
CleanupSuperVersion(super_version);
}
TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0");
@ -4418,24 +4418,18 @@ Status DBImpl::IngestExternalFiles(
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd();
SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
exec_results[0].second = ingestion_jobs[0].Prepare(
Status es = ingestion_jobs[0].Prepare(
args[0].external_files, args[0].files_checksums,
args[0].files_checksum_func_names, next_file_number, super_version);
exec_results[0].first = true;
if (!es.ok()) {
status = es;
}
CleanupSuperVersion(super_version);
}
for (const auto& exec_result : exec_results) {
if (!exec_result.second.ok()) {
status = exec_result.second;
break;
}
}
if (!status.ok()) {
for (size_t i = 0; i != num_cfs; ++i) {
if (exec_results[i].first) {
ingestion_jobs[i].Cleanup(status);
}
}
InstrumentedMutexLock l(&mutex_);
ReleaseFileNumberFromPendingOutputs(pending_output_elem);
return status;

View File

@ -566,7 +566,12 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);
}
// NOTE: the memtable_write_group is declared before the following
// `if` statement because its lifetime needs to be longer
// that the inner context of the `if` as a reference to it
// may be used further below within the outer _write_thread
WriteThread::WriteGroup memtable_write_group;
if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
PERF_TIMER_GUARD(write_memtable_time);
assert(w.ShouldWriteToMemtable());
@ -583,6 +588,10 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
versions_->SetLastSequence(memtable_write_group.last_sequence);
write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);
}
} else {
// NOTE: the memtable_write_group is never really used,
// so we need to set its status to pass ASSERT_STATUS_CHECKED
memtable_write_group.status.PermitUncheckedError();
}
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {

View File

@ -56,7 +56,7 @@ TEST_F(DBRangeDelTest, EndSameAsStartCoversNothing) {
}
TEST_F(DBRangeDelTest, EndComesBeforeStartInvalidArgument) {
db_->Put(WriteOptions(), "b", "val");
ASSERT_OK(db_->Put(WriteOptions(), "b", "val"));
ASSERT_TRUE(
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "b", "a")
.IsInvalidArgument());
@ -82,13 +82,14 @@ TEST_F(DBRangeDelTest, CompactionOutputHasOnlyRangeTombstone) {
// snapshot protects range tombstone from dropping due to becoming obsolete.
const Snapshot* snapshot = db_->GetSnapshot();
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z");
db_->Flush(FlushOptions());
ASSERT_OK(
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_EQ(1, NumTableFilesAtLevel(0));
ASSERT_EQ(0, NumTableFilesAtLevel(1));
dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
true /* disallow_trivial_move */);
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
true /* disallow_trivial_move */));
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_EQ(1, NumTableFilesAtLevel(1));
ASSERT_EQ(0, TestGetTickerCount(opts, COMPACTION_RANGE_DEL_DROP_OBSOLETE));
@ -118,7 +119,8 @@ TEST_F(DBRangeDelTest, CompactionOutputFilesExactlyFilled) {
// snapshot protects range tombstone from dropping due to becoming obsolete.
const Snapshot* snapshot = db_->GetSnapshot();
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0), Key(1));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0),
Key(1)));
Random rnd(301);
for (int i = 0; i < kNumFiles; ++i) {
@ -128,18 +130,18 @@ TEST_F(DBRangeDelTest, CompactionOutputFilesExactlyFilled) {
values.push_back(rnd.RandomString(3 << 10));
ASSERT_OK(Put(Key(i * kNumPerFile + j), values[j]));
if (j == 0 && i > 0) {
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
}
}
}
// put extra key to trigger final flush
ASSERT_OK(Put("", ""));
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_EQ(kNumFiles, NumTableFilesAtLevel(0));
ASSERT_EQ(0, NumTableFilesAtLevel(1));
dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
true /* disallow_trivial_move */);
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
true /* disallow_trivial_move */));
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_EQ(2, NumTableFilesAtLevel(1));
db_->ReleaseSnapshot(snapshot);
@ -178,12 +180,12 @@ TEST_F(DBRangeDelTest, MaxCompactionBytesCutsOutputFiles) {
}
// extra entry to trigger SpecialSkipListFactory's flush
ASSERT_OK(Put(GetNumericStr(kNumPerFile), ""));
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
}
dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
true /* disallow_trivial_move */);
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
true /* disallow_trivial_move */));
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_GE(NumTableFilesAtLevel(1), 2);
@ -221,10 +223,10 @@ TEST_F(DBRangeDelTest, SentinelsOmittedFromOutputFile) {
}
TEST_F(DBRangeDelTest, FlushRangeDelsSameStartKey) {
db_->Put(WriteOptions(), "b1", "val");
ASSERT_OK(db_->Put(WriteOptions(), "b1", "val"));
ASSERT_OK(
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "c"));
db_->Put(WriteOptions(), "b2", "val");
ASSERT_OK(db_->Put(WriteOptions(), "b2", "val"));
ASSERT_OK(
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "b"));
// first iteration verifies query correctness in memtable, second verifies
@ -241,8 +243,9 @@ TEST_F(DBRangeDelTest, FlushRangeDelsSameStartKey) {
}
TEST_F(DBRangeDelTest, CompactRangeDelsSameStartKey) {
db_->Put(WriteOptions(), "unused", "val"); // prevents empty after compaction
db_->Put(WriteOptions(), "b1", "val");
ASSERT_OK(db_->Put(WriteOptions(), "unused",
"val")); // prevents empty after compaction
ASSERT_OK(db_->Put(WriteOptions(), "b1", "val"));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_OK(
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "c"));
@ -254,8 +257,8 @@ TEST_F(DBRangeDelTest, CompactRangeDelsSameStartKey) {
for (int i = 0; i < 2; ++i) {
if (i > 0) {
dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
true /* disallow_trivial_move */);
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
true /* disallow_trivial_move */));
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_EQ(1, NumTableFilesAtLevel(1));
}
@ -279,12 +282,13 @@ TEST_F(DBRangeDelTest, FlushRemovesCoveredKeys) {
if (i == kNum / 3) {
snapshot = db_->GetSnapshot();
} else if (i == 2 * kNum / 3) {
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
GetNumericStr(kRangeBegin), GetNumericStr(kRangeEnd));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
GetNumericStr(kRangeBegin),
GetNumericStr(kRangeEnd)));
}
db_->Put(WriteOptions(), GetNumericStr(i), "val");
ASSERT_OK(db_->Put(WriteOptions(), GetNumericStr(i), "val"));
}
db_->Flush(FlushOptions());
ASSERT_OK(db_->Flush(FlushOptions()));
for (int i = 0; i < kNum; ++i) {
ReadOptions read_opts;
@ -314,24 +318,27 @@ TEST_F(DBRangeDelTest, CompactionRemovesCoveredKeys) {
for (int i = 0; i < kNumFiles; ++i) {
if (i > 0) {
// range tombstone covers first half of the previous file
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
ASSERT_OK(db_->DeleteRange(
WriteOptions(), db_->DefaultColumnFamily(),
GetNumericStr((i - 1) * kNumPerFile),
GetNumericStr((i - 1) * kNumPerFile + kNumPerFile / 2));
GetNumericStr((i - 1) * kNumPerFile + kNumPerFile / 2)));
}
// Make sure a given key appears in each file so compaction won't be able to
// use trivial move, which would happen if the ranges were non-overlapping.
// Also, we need an extra element since flush is only triggered when the
// number of keys is one greater than SpecialSkipListFactory's limit.
// We choose a key outside the key-range used by the test to avoid conflict.
db_->Put(WriteOptions(), GetNumericStr(kNumPerFile * kNumFiles), "val");
ASSERT_OK(db_->Put(WriteOptions(), GetNumericStr(kNumPerFile * kNumFiles),
"val"));
for (int j = 0; j < kNumPerFile; ++j) {
db_->Put(WriteOptions(), GetNumericStr(i * kNumPerFile + j), "val");
ASSERT_OK(
db_->Put(WriteOptions(), GetNumericStr(i * kNumPerFile + j), "val"));
}
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
}
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_GT(NumTableFilesAtLevel(1), 0);
ASSERT_EQ((kNumFiles - 1) * kNumPerFile / 2,
@ -373,8 +380,8 @@ TEST_F(DBRangeDelTest, ValidLevelSubcompactionBoundaries) {
if (i > 0) {
// delete [95,105) in two files, [295,305) in next two
int mid = (j + (1 - j % 2)) * kNumPerFile;
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(mid - 5), Key(mid + 5));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(mid - 5), Key(mid + 5)));
}
std::vector<std::string> values;
// Write 100KB (100 values, each 1K)
@ -384,7 +391,7 @@ TEST_F(DBRangeDelTest, ValidLevelSubcompactionBoundaries) {
}
// put extra key to trigger flush
ASSERT_OK(Put("", ""));
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
if (j < kNumFiles - 1) {
// background compaction may happen early for kNumFiles'th file
ASSERT_EQ(NumTableFilesAtLevel(0), j + 1);
@ -400,7 +407,7 @@ TEST_F(DBRangeDelTest, ValidLevelSubcompactionBoundaries) {
// oversized L0 (relative to base_level) causes the compaction to run
// earlier.
ASSERT_OK(db_->EnableAutoCompaction({db_->DefaultColumnFamily()}));
dbfull()->TEST_WaitForCompact();
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_OK(db_->SetOptions(db_->DefaultColumnFamily(),
{{"disable_auto_compactions", "true"}}));
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
@ -433,8 +440,8 @@ TEST_F(DBRangeDelTest, ValidUniversalSubcompactionBoundaries) {
// insert range deletions [95,105) in two files, [295,305) in next two
// to prepare L1 for later manual compaction.
int mid = (j + (1 - j % 2)) * kNumPerFile;
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(mid - 5), Key(mid + 5));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(mid - 5), Key(mid + 5)));
}
std::vector<std::string> values;
// Write 100KB (100 values, each 1K)
@ -444,13 +451,13 @@ TEST_F(DBRangeDelTest, ValidUniversalSubcompactionBoundaries) {
}
// put extra key to trigger flush
ASSERT_OK(Put("", ""));
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
if (j < kFilesPerLevel - 1) {
// background compaction may happen early for kFilesPerLevel'th file
ASSERT_EQ(NumTableFilesAtLevel(0), j + 1);
}
}
dbfull()->TEST_WaitForCompact();
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_GT(NumTableFilesAtLevel(kNumLevels - 1 - i), kFilesPerLevel - 1);
}
@ -483,17 +490,17 @@ TEST_F(DBRangeDelTest, CompactionRemovesCoveredMergeOperands) {
for (int i = 0; i <= kNumFiles * kNumPerFile; ++i) {
if (i % kNumPerFile == 0 && i / kNumPerFile == kNumFiles - 1) {
// Delete merge operands from all but the last file
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "key",
"key_");
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
"key", "key_"));
}
std::string val;
PutFixed64(&val, i);
db_->Merge(WriteOptions(), "key", val);
ASSERT_OK(db_->Merge(WriteOptions(), "key", val));
// we need to prevent trivial move using Puts so compaction will actually
// process the merge operands.
db_->Put(WriteOptions(), "prevent_trivial_move", "");
ASSERT_OK(db_->Put(WriteOptions(), "prevent_trivial_move", ""));
if (i > 0 && i % kNumPerFile == 0) {
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
}
}
@ -504,7 +511,7 @@ TEST_F(DBRangeDelTest, CompactionRemovesCoveredMergeOperands) {
PutFixed64(&expected, 45); // 1+2+...+9
ASSERT_EQ(expected, actual);
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
expected.clear();
ASSERT_OK(db_->Get(read_opts, "key", &actual));
@ -550,19 +557,19 @@ TEST_F(DBRangeDelTest, ObsoleteTombstoneCleanup) {
opts.statistics = CreateDBStatistics();
Reopen(opts);
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr1",
"dr10"); // obsolete after compaction
db_->Put(WriteOptions(), "key", "val");
db_->Flush(FlushOptions());
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr1",
"dr10")); // obsolete after compaction
ASSERT_OK(db_->Put(WriteOptions(), "key", "val"));
ASSERT_OK(db_->Flush(FlushOptions()));
const Snapshot* snapshot = db_->GetSnapshot();
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr2",
"dr20"); // protected by snapshot
db_->Put(WriteOptions(), "key", "val");
db_->Flush(FlushOptions());
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr2",
"dr20")); // protected by snapshot
ASSERT_OK(db_->Put(WriteOptions(), "key", "val"));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_EQ(2, NumTableFilesAtLevel(0));
ASSERT_EQ(0, NumTableFilesAtLevel(1));
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_EQ(1, NumTableFilesAtLevel(1));
ASSERT_EQ(1, TestGetTickerCount(opts, COMPACTION_RANGE_DEL_DROP_OBSOLETE));
@ -609,22 +616,24 @@ TEST_F(DBRangeDelTest, TableEvictedDuringScan) {
// to bottommost level (i.e., L1).
const Snapshot* snapshot = db_->GetSnapshot();
for (int i = 0; i < kNum; ++i) {
db_->Put(WriteOptions(), GetNumericStr(i), "val");
ASSERT_OK(db_->Put(WriteOptions(), GetNumericStr(i), "val"));
if (i > 0) {
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
}
if (i >= kNum / 2 && i < kNum / 2 + kNumRanges) {
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
GetNumericStr(kRangeBegin), GetNumericStr(kRangeEnd));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
GetNumericStr(kRangeBegin),
GetNumericStr(kRangeEnd)));
}
}
// Must be > 1 so the first L1 file can be closed before scan finishes
dbfull()->TEST_WaitForCompact();
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_GT(NumTableFilesAtLevel(1), 1);
std::vector<uint64_t> file_numbers = ListTableFiles(env_, dbname_);
ReadOptions read_opts;
auto* iter = db_->NewIterator(read_opts);
ASSERT_OK(iter->status());
int expected = kRangeEnd;
iter->SeekToFirst();
for (auto file_number : file_numbers) {
@ -647,7 +656,7 @@ TEST_F(DBRangeDelTest, TableEvictedDuringScan) {
TEST_F(DBRangeDelTest, GetCoveredKeyFromMutableMemtable) {
do {
DestroyAndReopen(CurrentOptions());
db_->Put(WriteOptions(), "key", "val");
ASSERT_OK(db_->Put(WriteOptions(), "key", "val"));
ASSERT_OK(
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"));
@ -669,10 +678,10 @@ TEST_F(DBRangeDelTest, GetCoveredKeyFromImmutableMemtable) {
opts.memtable_factory.reset(new SpecialSkipListFactory(1));
DestroyAndReopen(opts);
db_->Put(WriteOptions(), "key", "val");
ASSERT_OK(db_->Put(WriteOptions(), "key", "val"));
ASSERT_OK(
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"));
db_->Put(WriteOptions(), "blah", "val");
ASSERT_OK(db_->Put(WriteOptions(), "blah", "val"));
ReadOptions read_opts;
std::string value;
@ -683,7 +692,7 @@ TEST_F(DBRangeDelTest, GetCoveredKeyFromImmutableMemtable) {
TEST_F(DBRangeDelTest, GetCoveredKeyFromSst) {
do {
DestroyAndReopen(CurrentOptions());
db_->Put(WriteOptions(), "key", "val");
ASSERT_OK(db_->Put(WriteOptions(), "key", "val"));
// snapshot prevents key from being deleted during flush
const Snapshot* snapshot = db_->GetSnapshot();
ASSERT_OK(
@ -706,11 +715,11 @@ TEST_F(DBRangeDelTest, GetCoveredMergeOperandFromMemtable) {
for (int i = 0; i < kNumMergeOps; ++i) {
std::string val;
PutFixed64(&val, i);
db_->Merge(WriteOptions(), "key", val);
ASSERT_OK(db_->Merge(WriteOptions(), "key", val));
if (i == kNumMergeOps / 2) {
// deletes [0, 5]
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "key",
"key_");
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
"key", "key_"));
}
}
@ -734,16 +743,16 @@ TEST_F(DBRangeDelTest, GetIgnoresRangeDeletions) {
opts.memtable_factory.reset(new SpecialSkipListFactory(1));
Reopen(opts);
db_->Put(WriteOptions(), "sst_key", "val");
ASSERT_OK(db_->Put(WriteOptions(), "sst_key", "val"));
// snapshot prevents key from being deleted during flush
const Snapshot* snapshot = db_->GetSnapshot();
ASSERT_OK(
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"));
ASSERT_OK(db_->Flush(FlushOptions()));
db_->Put(WriteOptions(), "imm_key", "val");
ASSERT_OK(db_->Put(WriteOptions(), "imm_key", "val"));
ASSERT_OK(
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"));
db_->Put(WriteOptions(), "mem_key", "val");
ASSERT_OK(db_->Put(WriteOptions(), "mem_key", "val"));
ASSERT_OK(
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"));
@ -768,13 +777,15 @@ TEST_F(DBRangeDelTest, IteratorRemovesCoveredKeys) {
// should be deleted.
for (int i = 0; i < kNum; ++i) {
if (i == kNum / 2) {
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
GetNumericStr(kRangeBegin), GetNumericStr(kRangeEnd));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
GetNumericStr(kRangeBegin),
GetNumericStr(kRangeEnd)));
}
db_->Put(WriteOptions(), GetNumericStr(i), "val");
ASSERT_OK(db_->Put(WriteOptions(), GetNumericStr(i), "val"));
}
ReadOptions read_opts;
auto* iter = db_->NewIterator(read_opts);
ASSERT_OK(iter->status());
int expected = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
@ -802,14 +813,16 @@ TEST_F(DBRangeDelTest, IteratorOverUserSnapshot) {
for (int i = 0; i < kNum; ++i) {
if (i == kNum / 2) {
snapshot = db_->GetSnapshot();
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
GetNumericStr(kRangeBegin), GetNumericStr(kRangeEnd));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
GetNumericStr(kRangeBegin),
GetNumericStr(kRangeEnd)));
}
db_->Put(WriteOptions(), GetNumericStr(i), "val");
ASSERT_OK(db_->Put(WriteOptions(), GetNumericStr(i), "val"));
}
ReadOptions read_opts;
read_opts.snapshot = snapshot;
auto* iter = db_->NewIterator(read_opts);
ASSERT_OK(iter->status());
int expected = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
@ -828,22 +841,23 @@ TEST_F(DBRangeDelTest, IteratorIgnoresRangeDeletions) {
opts.memtable_factory.reset(new SpecialSkipListFactory(1));
Reopen(opts);
db_->Put(WriteOptions(), "sst_key", "val");
ASSERT_OK(db_->Put(WriteOptions(), "sst_key", "val"));
// snapshot prevents key from being deleted during flush
const Snapshot* snapshot = db_->GetSnapshot();
ASSERT_OK(
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"));
ASSERT_OK(db_->Flush(FlushOptions()));
db_->Put(WriteOptions(), "imm_key", "val");
ASSERT_OK(db_->Put(WriteOptions(), "imm_key", "val"));
ASSERT_OK(
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"));
db_->Put(WriteOptions(), "mem_key", "val");
ASSERT_OK(db_->Put(WriteOptions(), "mem_key", "val"));
ASSERT_OK(
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"));
ReadOptions read_opts;
read_opts.ignore_range_deletions = true;
auto* iter = db_->NewIterator(read_opts);
ASSERT_OK(iter->status());
int i = 0;
std::string expected[] = {"imm_key", "mem_key", "sst_key"};
for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++i) {
@ -857,7 +871,7 @@ TEST_F(DBRangeDelTest, IteratorIgnoresRangeDeletions) {
#ifndef ROCKSDB_UBSAN_RUN
TEST_F(DBRangeDelTest, TailingIteratorRangeTombstoneUnsupported) {
db_->Put(WriteOptions(), "key", "val");
ASSERT_OK(db_->Put(WriteOptions(), "key", "val"));
// snapshot prevents key from being deleted during flush
const Snapshot* snapshot = db_->GetSnapshot();
ASSERT_OK(
@ -873,6 +887,7 @@ TEST_F(DBRangeDelTest, TailingIteratorRangeTombstoneUnsupported) {
iter->SeekToFirst();
}
ASSERT_TRUE(iter->status().IsNotSupported());
delete iter;
if (i == 0) {
ASSERT_OK(db_->Flush(FlushOptions()));
@ -882,7 +897,6 @@ TEST_F(DBRangeDelTest, TailingIteratorRangeTombstoneUnsupported) {
}
db_->ReleaseSnapshot(snapshot);
}
#endif // !ROCKSDB_UBSAN_RUN
TEST_F(DBRangeDelTest, SubcompactionHasEmptyDedicatedRangeDelFile) {
@ -926,8 +940,8 @@ TEST_F(DBRangeDelTest, SubcompactionHasEmptyDedicatedRangeDelFile) {
ASSERT_EQ(kNumFiles, NumTableFilesAtLevel(0));
ASSERT_EQ(1, NumTableFilesAtLevel(1));
db_->EnableAutoCompaction({db_->DefaultColumnFamily()});
dbfull()->TEST_WaitForCompact();
ASSERT_OK(db_->EnableAutoCompaction({db_->DefaultColumnFamily()}));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
db_->ReleaseSnapshot(snapshot);
}
@ -949,7 +963,7 @@ TEST_F(DBRangeDelTest, MemtableBloomFilter) {
for (int i = 0; i < kNumKeys; ++i) {
ASSERT_OK(Put(Key(i), "val"));
}
Flush();
ASSERT_OK(Flush());
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0),
Key(kNumKeys)));
for (int i = 0; i < kNumKeys; ++i) {
@ -987,8 +1001,8 @@ TEST_F(DBRangeDelTest, CompactionTreatsSplitInputLevelDeletionAtomically) {
// snapshot protects range tombstone from dropping due to becoming obsolete.
const Snapshot* snapshot = db_->GetSnapshot();
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0),
Key(2 * kNumFilesPerLevel));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(0), Key(2 * kNumFilesPerLevel)));
Random rnd(301);
std::string value = rnd.RandomString(kValueBytes);
@ -997,14 +1011,14 @@ TEST_F(DBRangeDelTest, CompactionTreatsSplitInputLevelDeletionAtomically) {
ASSERT_OK(Put(Key(j), value));
ASSERT_OK(Put(Key(2 * kNumFilesPerLevel - 1 - j), value));
if (j > 0) {
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_EQ(j, NumTableFilesAtLevel(0));
}
}
// put extra key to trigger final flush
ASSERT_OK(Put("", ""));
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_EQ(kNumFilesPerLevel, NumTableFilesAtLevel(1));
@ -1022,7 +1036,7 @@ TEST_F(DBRangeDelTest, CompactionTreatsSplitInputLevelDeletionAtomically) {
} else if (i == 2) {
ASSERT_OK(db_->SetOptions(db_->DefaultColumnFamily(),
{{"max_bytes_for_level_base", "10000"}}));
dbfull()->TEST_WaitForCompact();
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(1, NumTableFilesAtLevel(1));
}
ASSERT_GT(NumTableFilesAtLevel(2), 0);
@ -1056,8 +1070,8 @@ TEST_F(DBRangeDelTest, RangeTombstoneEndKeyAsSstableUpperBound) {
// A snapshot protects the range tombstone from dropping due to
// becoming obsolete.
const Snapshot* snapshot = db_->GetSnapshot();
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(0), Key(2 * kNumFilesPerLevel));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0),
Key(2 * kNumFilesPerLevel)));
// Create 2 additional sstables in L0. Note that the first sstable
// contains the range tombstone.
@ -1096,7 +1110,7 @@ TEST_F(DBRangeDelTest, RangeTombstoneEndKeyAsSstableUpperBound) {
ASSERT_EQ(value, Get(Key(2)));
auto begin_str = Key(3);
const ROCKSDB_NAMESPACE::Slice begin = begin_str;
dbfull()->TEST_CompactRange(1, &begin, nullptr);
ASSERT_OK(dbfull()->TEST_CompactRange(1, &begin, nullptr));
ASSERT_EQ(1, NumTableFilesAtLevel(1));
ASSERT_EQ(2, NumTableFilesAtLevel(2));
ASSERT_EQ(value, Get(Key(2)));
@ -1115,7 +1129,7 @@ TEST_F(DBRangeDelTest, RangeTombstoneEndKeyAsSstableUpperBound) {
// [key000002#6,1, key000004#72057594037927935,15]
auto begin_str = Key(0);
const ROCKSDB_NAMESPACE::Slice begin = begin_str;
dbfull()->TEST_CompactRange(1, &begin, &begin);
ASSERT_OK(dbfull()->TEST_CompactRange(1, &begin, &begin));
ASSERT_EQ(0, NumTableFilesAtLevel(1));
ASSERT_EQ(3, NumTableFilesAtLevel(2));
}
@ -1216,9 +1230,9 @@ TEST_F(DBRangeDelTest, KeyAtOverlappingEndpointReappears) {
std::string value;
ASSERT_TRUE(db_->Get(ReadOptions(), "key", &value).IsNotFound());
dbfull()->TEST_CompactRange(0 /* level */, nullptr /* begin */,
nullptr /* end */, nullptr /* column_family */,
true /* disallow_trivial_move */);
ASSERT_OK(dbfull()->TEST_CompactRange(
0 /* level */, nullptr /* begin */, nullptr /* end */,
nullptr /* column_family */, true /* disallow_trivial_move */));
ASSERT_EQ(0, NumTableFilesAtLevel(0));
// Now we have multiple files at L1 all containing a single user key, thus
// guaranteeing overlap in the file endpoints.
@ -1229,9 +1243,9 @@ TEST_F(DBRangeDelTest, KeyAtOverlappingEndpointReappears) {
// Compact and verify again. It's worthwhile because now the files have
// tighter endpoints, so we can verify that doesn't mess anything up.
dbfull()->TEST_CompactRange(1 /* level */, nullptr /* begin */,
nullptr /* end */, nullptr /* column_family */,
true /* disallow_trivial_move */);
ASSERT_OK(dbfull()->TEST_CompactRange(
1 /* level */, nullptr /* begin */, nullptr /* end */,
nullptr /* column_family */, true /* disallow_trivial_move */));
ASSERT_GT(NumTableFilesAtLevel(2), 1);
ASSERT_TRUE(db_->Get(ReadOptions(), "key", &value).IsNotFound());
@ -1307,6 +1321,7 @@ TEST_F(DBRangeDelTest, UntruncatedTombstoneDoesNotDeleteNewerKey) {
auto get_key_count = [this]() -> int {
auto* iter = db_->NewIterator(ReadOptions());
assert(iter->status().ok());
iter->SeekToFirst();
int keys_found = 0;
for (; iter->Valid(); iter->Next()) {
@ -1409,6 +1424,7 @@ TEST_F(DBRangeDelTest, DeletedMergeOperandReappearsIterPrev) {
ASSERT_GT(NumTableFilesAtLevel(1), 1);
auto* iter = db_->NewIterator(ReadOptions());
ASSERT_OK(iter->status());
iter->SeekToLast();
int keys_found = 0;
for (; iter->Valid(); iter->Prev()) {
@ -1435,11 +1451,12 @@ TEST_F(DBRangeDelTest, SnapshotPreventsDroppedKeys) {
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0),
Key(10)));
db_->Flush(FlushOptions());
ASSERT_OK(db_->Flush(FlushOptions()));
ReadOptions read_opts;
read_opts.snapshot = snapshot;
auto* iter = db_->NewIterator(read_opts);
ASSERT_OK(iter->status());
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
@ -1482,6 +1499,7 @@ TEST_F(DBRangeDelTest, SnapshotPreventsDroppedKeysInImmMemTables) {
ReadOptions read_opts;
read_opts.snapshot = snapshot.get();
std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
ASSERT_OK(iter->status());
TEST_SYNC_POINT("SnapshotPreventsDroppedKeysInImmMemTables:AfterNewIterator");
@ -1519,7 +1537,7 @@ TEST_F(DBRangeDelTest, RangeTombstoneWrittenToMinimalSsts) {
std::string value = rnd.RandomString(kValueBytes);
ASSERT_OK(Put(key, value));
}
db_->Flush(FlushOptions());
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(2);
}
ASSERT_EQ(0, NumTableFilesAtLevel(0));
@ -1538,7 +1556,7 @@ TEST_F(DBRangeDelTest, RangeTombstoneWrittenToMinimalSsts) {
// TODO(ajkr): remove this `Put` after file cutting accounts for range
// tombstones (#3977).
ASSERT_OK(Put("c" + Key(1), "value"));
db_->Flush(FlushOptions());
ASSERT_OK(db_->Flush(FlushOptions()));
// Ensure manual L0->L1 compaction cuts the outputs before the range tombstone
// and the range tombstone is only placed in the second SST.
@ -1546,9 +1564,9 @@ TEST_F(DBRangeDelTest, RangeTombstoneWrittenToMinimalSsts) {
Slice begin_key(begin_key_storage);
std::string end_key_storage("d");
Slice end_key(end_key_storage);
dbfull()->TEST_CompactRange(0 /* level */, &begin_key /* begin */,
&end_key /* end */, nullptr /* column_family */,
true /* disallow_trivial_move */);
ASSERT_OK(dbfull()->TEST_CompactRange(
0 /* level */, &begin_key /* begin */, &end_key /* end */,
nullptr /* column_family */, true /* disallow_trivial_move */));
ASSERT_EQ(2, NumTableFilesAtLevel(1));
std::vector<LiveFileMetaData> all_metadata;
@ -1613,15 +1631,15 @@ TEST_F(DBRangeDelTest, OverlappedTombstones) {
ASSERT_EQ(1, NumTableFilesAtLevel(0));
dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
true /* disallow_trivial_move */);
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
true /* disallow_trivial_move */));
// The tombstone range is not broken up into multiple SSTs which may incur a
// large compaction with L2.
ASSERT_EQ(1, NumTableFilesAtLevel(1));
std::vector<std::vector<FileMetaData>> files;
dbfull()->TEST_CompactRange(1, nullptr, nullptr, nullptr,
true /* disallow_trivial_move */);
ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, nullptr,
true /* disallow_trivial_move */));
ASSERT_EQ(1, NumTableFilesAtLevel(2));
ASSERT_EQ(0, NumTableFilesAtLevel(1));
}
@ -1654,13 +1672,13 @@ TEST_F(DBRangeDelTest, OverlappedKeys) {
// The key range is broken up into three SSTs to avoid a future big compaction
// with the grandparent
dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
true /* disallow_trivial_move */);
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
true /* disallow_trivial_move */));
ASSERT_EQ(3, NumTableFilesAtLevel(1));
std::vector<std::vector<FileMetaData>> files;
dbfull()->TEST_CompactRange(1, nullptr, nullptr, nullptr,
true /* disallow_trivial_move */);
ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, nullptr,
true /* disallow_trivial_move */));
ASSERT_EQ(1, NumTableFilesAtLevel(2));
ASSERT_EQ(0, NumTableFilesAtLevel(1));
}

View File

@ -60,14 +60,15 @@ TEST_P(DBWriteTest, WriteStallRemoveNoSlowdownWrite) {
std::string key = "foo" + std::to_string(a);
WriteOptions wo;
wo.no_slowdown = false;
dbfull()->Put(wo, key, "bar");
ASSERT_OK(dbfull()->Put(wo, key, "bar"));
};
std::function<void()> write_no_slowdown_func = [&]() {
int a = thread_num.fetch_add(1);
std::string key = "foo" + std::to_string(a);
WriteOptions wo;
wo.no_slowdown = true;
dbfull()->Put(wo, key, "bar");
Status s = dbfull()->Put(wo, key, "bar");
ASSERT_TRUE(s.ok() || s.IsIncomplete());
};
std::function<void(void*)> unblock_main_thread_func = [&](void*) {
mutex.Lock();
@ -77,13 +78,13 @@ TEST_P(DBWriteTest, WriteStallRemoveNoSlowdownWrite) {
};
// Create 3 L0 files and schedule 4th without waiting
Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
Flush();
Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
Flush();
Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
Flush();
Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
@ -104,7 +105,7 @@ TEST_P(DBWriteTest, WriteStallRemoveNoSlowdownWrite) {
// write_thread
FlushOptions fopt;
fopt.wait = false;
dbfull()->Flush(fopt);
ASSERT_OK(dbfull()->Flush(fopt));
// Create a mix of slowdown/no_slowdown write threads
mutex.Lock();
@ -145,7 +146,7 @@ TEST_P(DBWriteTest, WriteStallRemoveNoSlowdownWrite) {
mutex.Unlock();
TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:1");
dbfull()->TEST_WaitForFlushMemTable(nullptr);
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(nullptr));
// This would have triggered a write stall. Unblock the write group leader
TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:2");
// The leader is going to create missing newer links. When the leader
@ -178,14 +179,15 @@ TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
std::string key = "foo" + std::to_string(a);
WriteOptions wo;
wo.no_slowdown = false;
dbfull()->Put(wo, key, "bar");
ASSERT_OK(dbfull()->Put(wo, key, "bar"));
};
std::function<void()> write_no_slowdown_func = [&]() {
int a = thread_num.fetch_add(1);
std::string key = "foo" + std::to_string(a);
WriteOptions wo;
wo.no_slowdown = true;
dbfull()->Put(wo, key, "bar");
Status s = dbfull()->Put(wo, key, "bar");
ASSERT_TRUE(s.ok() || s.IsIncomplete());
};
std::function<void(void *)> unblock_main_thread_func = [&](void *) {
mutex.Lock();
@ -195,13 +197,13 @@ TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
};
// Create 3 L0 files and schedule 4th without waiting
Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
Flush();
Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
Flush();
Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
Flush();
Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
@ -222,7 +224,7 @@ TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
// write_thread
FlushOptions fopt;
fopt.wait = false;
dbfull()->Flush(fopt);
ASSERT_OK(dbfull()->Flush(fopt));
// Create a mix of slowdown/no_slowdown write threads
mutex.Lock();
@ -243,7 +245,7 @@ TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
mutex.Unlock();
TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");
dbfull()->TEST_WaitForFlushMemTable(nullptr);
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(nullptr));
// This would have triggered a write stall. Unblock the write group leader
TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:2");
// The leader is going to create missing newer links. When the leader finishes,
@ -307,6 +309,11 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
threads[i].join();
}
ASSERT_EQ(1, leader_count);
// The Failed PUT operations can cause a BG error to be set.
// Mark it as Checked for the ASSERT_STATUS_CHECKED
dbfull()->Resume().PermitUncheckedError();
// Close before mock_env destruct.
Close();
}
@ -351,7 +358,9 @@ TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
}
*/
if (!options.manual_wal_flush) {
ASSERT_FALSE(res.ok());
ASSERT_NOK(res);
} else {
ASSERT_OK(res);
}
}
// Close before mock_env destruct.
@ -423,13 +432,14 @@ TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {
ROCKSDB_NAMESPACE::WriteOptions write_option_default;
std::string no_wal_key = no_wal_key_prefix + std::to_string(t) +
"_" + std::to_string(i);
this->Put(no_wal_key, no_wal_value, write_option_disable);
ASSERT_OK(
this->Put(no_wal_key, no_wal_value, write_option_disable));
std::string wal_key =
wal_key_prefix + std::to_string(i) + "_" + std::to_string(i);
this->Put(wal_key, wal_value, write_option_default);
dbfull()->SyncWAL();
ASSERT_OK(this->Put(wal_key, wal_value, write_option_default));
ASSERT_OK(dbfull()->SyncWAL());
}
return 0;
return;
});
}
for (auto& t: threads) {

View File

@ -40,7 +40,7 @@ class DeleteFileTest : public DBTestBase {
wal_dir_(dbname_ + "/wal_files") {}
void SetOptions(Options* options) {
assert(options);
ASSERT_NE(options, nullptr);
options->delete_obsolete_files_period_micros = 0; // always do full purge
options->enable_thread_tracking = true;
options->write_buffer_size = 1024 * 1024 * 1000;
@ -105,7 +105,7 @@ class DeleteFileTest : public DBTestBase {
void CheckFileTypeCounts(const std::string& dir, int required_log,
int required_sst, int required_manifest) {
std::vector<std::string> filenames;
env_->GetChildren(dir, &filenames);
ASSERT_OK(env_->GetChildren(dir, &filenames));
int log_cnt = 0, sst_cnt = 0, manifest_cnt = 0;
for (auto file : filenames) {
@ -180,7 +180,8 @@ TEST_F(DeleteFileTest, AddKeysAndQueryLevels) {
ASSERT_TRUE(status.IsInvalidArgument());
// Lowest level file deletion should succeed.
ASSERT_OK(db_->DeleteFile(level2file));
status = db_->DeleteFile(level2file);
ASSERT_OK(status);
}
TEST_F(DeleteFileTest, PurgeObsoleteFilesTest) {
@ -201,7 +202,7 @@ TEST_F(DeleteFileTest, PurgeObsoleteFilesTest) {
compact_options.change_level = true;
compact_options.target_level = 2;
Slice first_slice(first), last_slice(last);
db_->CompactRange(compact_options, &first_slice, &last_slice);
ASSERT_OK(db_->CompactRange(compact_options, &first_slice, &last_slice));
// 1 sst after compaction
CheckFileTypeCounts(dbname_, 0, 1, 1);
@ -210,7 +211,9 @@ TEST_F(DeleteFileTest, PurgeObsoleteFilesTest) {
Iterator *itr = nullptr;
CreateTwoLevels();
itr = db_->NewIterator(ReadOptions());
db_->CompactRange(compact_options, &first_slice, &last_slice);
ASSERT_OK(itr->status());
ASSERT_OK(db_->CompactRange(compact_options, &first_slice, &last_slice));
ASSERT_OK(itr->status());
// 3 sst after compaction with live iterator
CheckFileTypeCounts(dbname_, 0, 3, 1);
delete itr;
@ -237,7 +240,8 @@ TEST_F(DeleteFileTest, BackgroundPurgeIteratorTest) {
ReadOptions read_options;
read_options.background_purge_on_iterator_cleanup = true;
itr = db_->NewIterator(read_options);
db_->CompactRange(compact_options, &first_slice, &last_slice);
ASSERT_OK(itr->status());
ASSERT_OK(db_->CompactRange(compact_options, &first_slice, &last_slice));
// 3 sst after compaction with live iterator
CheckFileTypeCounts(dbname_, 0, 3, 1);
test::SleepingBackgroundTask sleeping_task_before;
@ -344,11 +348,12 @@ TEST_F(DeleteFileTest, BackgroundPurgeCopyOptions) {
ReadOptions read_options;
read_options.background_purge_on_iterator_cleanup = true;
itr = db_->NewIterator(read_options);
ASSERT_OK(itr->status());
// ReadOptions is deleted, but iterator cleanup function should not be
// affected
}
db_->CompactRange(compact_options, &first_slice, &last_slice);
ASSERT_OK(db_->CompactRange(compact_options, &first_slice, &last_slice));
// 3 sst after compaction with live iterator
CheckFileTypeCounts(dbname_, 0, 3, 1);
delete itr;
@ -382,9 +387,11 @@ TEST_F(DeleteFileTest, BackgroundPurgeTestMultipleJobs) {
ReadOptions read_options;
read_options.background_purge_on_iterator_cleanup = true;
Iterator* itr1 = db_->NewIterator(read_options);
ASSERT_OK(itr1->status());
CreateTwoLevels();
Iterator* itr2 = db_->NewIterator(read_options);
db_->CompactRange(compact_options, &first_slice, &last_slice);
ASSERT_OK(itr2->status());
ASSERT_OK(db_->CompactRange(compact_options, &first_slice, &last_slice));
// 5 sst files after 2 compactions with 2 live iterators
CheckFileTypeCounts(dbname_, 0, 5, 1);
@ -417,6 +424,7 @@ TEST_F(DeleteFileTest, DeleteFileWithIterator) {
CreateTwoLevels();
ReadOptions read_options;
Iterator* it = db_->NewIterator(read_options);
ASSERT_OK(it->status());
std::vector<LiveFileMetaData> metadata;
db_->GetLiveFilesMetaData(&metadata);
@ -432,7 +440,7 @@ TEST_F(DeleteFileTest, DeleteFileWithIterator) {
Status status = db_->DeleteFile(level2file);
fprintf(stdout, "Deletion status %s: %s\n",
level2file.c_str(), status.ToString().c_str());
ASSERT_TRUE(status.ok());
ASSERT_OK(status);
it->SeekToFirst();
int numKeysIterated = 0;
while(it->Valid()) {
@ -452,7 +460,7 @@ TEST_F(DeleteFileTest, DeleteLogFiles) {
AddKeys(10, 0);
VectorLogPtr logfiles;
db_->GetSortedWalFiles(logfiles);
ASSERT_OK(db_->GetSortedWalFiles(logfiles));
ASSERT_GT(logfiles.size(), 0UL);
// Take the last log file which is expected to be alive and try to delete it
// Should not succeed because live logs are not allowed to be deleted
@ -461,7 +469,7 @@ TEST_F(DeleteFileTest, DeleteLogFiles) {
ASSERT_OK(env_->FileExists(wal_dir_ + "/" + alive_log->PathName()));
fprintf(stdout, "Deleting alive log file %s\n",
alive_log->PathName().c_str());
ASSERT_TRUE(!db_->DeleteFile(alive_log->PathName()).ok());
ASSERT_NOK(db_->DeleteFile(alive_log->PathName()));
ASSERT_OK(env_->FileExists(wal_dir_ + "/" + alive_log->PathName()));
logfiles.clear();
@ -469,10 +477,10 @@ TEST_F(DeleteFileTest, DeleteLogFiles) {
// Call Flush again to flush out memtable and move alive log to archived log
// and try to delete the archived log file
FlushOptions fopts;
db_->Flush(fopts);
ASSERT_OK(db_->Flush(fopts));
AddKeys(10, 0);
db_->Flush(fopts);
db_->GetSortedWalFiles(logfiles);
ASSERT_OK(db_->Flush(fopts));
ASSERT_OK(db_->GetSortedWalFiles(logfiles));
ASSERT_GT(logfiles.size(), 0UL);
std::unique_ptr<LogFile> archived_log = std::move(logfiles.front());
ASSERT_EQ(archived_log->Type(), kArchivedLogFile);
@ -480,8 +488,8 @@ TEST_F(DeleteFileTest, DeleteLogFiles) {
fprintf(stdout, "Deleting archived log file %s\n",
archived_log->PathName().c_str());
ASSERT_OK(db_->DeleteFile(archived_log->PathName()));
ASSERT_EQ(Status::NotFound(),
env_->FileExists(wal_dir_ + "/" + archived_log->PathName()));
ASSERT_TRUE(
env_->FileExists(wal_dir_ + "/" + archived_log->PathName()).IsNotFound());
}
TEST_F(DeleteFileTest, DeleteNonDefaultColumnFamily) {
@ -520,6 +528,7 @@ TEST_F(DeleteFileTest, DeleteNonDefaultColumnFamily) {
{
std::unique_ptr<Iterator> itr(db_->NewIterator(ReadOptions(), handles_[1]));
ASSERT_OK(itr->status());
int count = 0;
for (itr->SeekToFirst(); itr->Valid(); itr->Next()) {
ASSERT_OK(itr->status());

View File

@ -47,8 +47,8 @@ class ExternSSTFileLinkFailFallbackTest
: DBTestBase("/external_sst_file_test", /*env_do_fsync=*/true),
test_env_(new ExternalSSTTestEnv(env_, true)) {
sst_files_dir_ = dbname_ + "/sst_files/";
DestroyDir(env_, sst_files_dir_);
env_->CreateDir(sst_files_dir_);
EXPECT_EQ(DestroyDir(env_, sst_files_dir_), Status::OK());
EXPECT_EQ(env_->CreateDir(sst_files_dir_), Status::OK());
options_ = CurrentOptions();
options_.disable_auto_compactions = true;
options_.env = test_env_;
@ -79,8 +79,8 @@ class ExternalSSTFileTest
}
void DestroyAndRecreateExternalSSTFilesDir() {
DestroyDir(env_, sst_files_dir_);
env_->CreateDir(sst_files_dir_);
ASSERT_OK(DestroyDir(env_, sst_files_dir_));
ASSERT_OK(env_->CreateDir(sst_files_dir_));
}
Status GenerateOneExternalFile(
@ -116,7 +116,7 @@ class ExternalSSTFileTest
for (const auto& entry : data) {
s = sst_file_writer.Put(entry.first, entry.second);
if (!s.ok()) {
sst_file_writer.Finish();
sst_file_writer.Finish().PermitUncheckedError();
return s;
}
}
@ -171,7 +171,7 @@ class ExternalSSTFileTest
for (auto& entry : data) {
s = sst_file_writer.Put(entry.first, entry.second);
if (!s.ok()) {
sst_file_writer.Finish();
sst_file_writer.Finish().PermitUncheckedError();
return s;
}
}
@ -213,11 +213,10 @@ class ExternalSSTFileTest
size_t num_cfs = column_families.size();
assert(ifos.size() == num_cfs);
assert(data.size() == num_cfs);
Status s;
std::vector<IngestExternalFileArg> args(num_cfs);
for (size_t i = 0; i != num_cfs; ++i) {
std::string external_file_path;
s = GenerateOneExternalFile(
Status s = GenerateOneExternalFile(
options, column_families[i], data[i], file_id, sort_data,
&external_file_path,
true_data.size() == num_cfs ? &true_data[i] : nullptr);
@ -230,8 +229,7 @@ class ExternalSSTFileTest
args[i].external_files.push_back(external_file_path);
args[i].options = ifos[i];
}
s = db_->IngestExternalFiles(args);
return s;
return db_->IngestExternalFiles(args);
}
Status GenerateAndAddExternalFile(
@ -282,7 +280,9 @@ class ExternalSSTFileTest
return db_->IngestExternalFile(files, opts);
}
~ExternalSSTFileTest() override { DestroyDir(env_, sst_files_dir_); }
~ExternalSSTFileTest() override {
DestroyDir(env_, sst_files_dir_).PermitUncheckedError();
}
protected:
int last_file_id_ = 0;
@ -305,8 +305,7 @@ TEST_F(ExternalSSTFileTest, Basic) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
}
ExternalSstFileInfo file1_info;
Status s = sst_file_writer.Finish(&file1_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(sst_file_writer.Finish(&file1_info));
// Current file size should be non-zero after success write.
ASSERT_GT(sst_file_writer.FileSize(), 0);
@ -319,8 +318,7 @@ TEST_F(ExternalSSTFileTest, Basic) {
ASSERT_EQ(file1_info.smallest_range_del_key, "");
ASSERT_EQ(file1_info.largest_range_del_key, "");
// sst_file_writer already finished, cannot add this value
s = sst_file_writer.Put(Key(100), "bad_val");
ASSERT_FALSE(s.ok()) << s.ToString();
ASSERT_NOK(sst_file_writer.Put(Key(100), "bad_val"));
// file2.sst (100 => 199)
std::string file2 = sst_files_dir_ + "file2.sst";
@ -329,11 +327,9 @@ TEST_F(ExternalSSTFileTest, Basic) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
}
// Cannot add this key because it's not after last added key
s = sst_file_writer.Put(Key(99), "bad_val");
ASSERT_FALSE(s.ok()) << s.ToString();
ASSERT_NOK(sst_file_writer.Put(Key(99), "bad_val"));
ExternalSstFileInfo file2_info;
s = sst_file_writer.Finish(&file2_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(sst_file_writer.Finish(&file2_info));
ASSERT_EQ(file2_info.file_path, file2);
ASSERT_EQ(file2_info.num_entries, 100);
ASSERT_EQ(file2_info.smallest_key, Key(100));
@ -347,9 +343,8 @@ TEST_F(ExternalSSTFileTest, Basic) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
}
ExternalSstFileInfo file3_info;
s = sst_file_writer.Finish(&file3_info);
ASSERT_OK(sst_file_writer.Finish(&file3_info));
ASSERT_TRUE(s.ok()) << s.ToString();
// Current file size should be non-zero after success finish.
ASSERT_GT(sst_file_writer.FileSize(), 0);
ASSERT_EQ(file3_info.file_path, file3);
@ -365,8 +360,7 @@ TEST_F(ExternalSSTFileTest, Basic) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
}
ExternalSstFileInfo file4_info;
s = sst_file_writer.Finish(&file4_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(sst_file_writer.Finish(&file4_info));
ASSERT_EQ(file4_info.file_path, file4);
ASSERT_EQ(file4_info.num_entries, 10);
ASSERT_EQ(file4_info.smallest_key, Key(30));
@ -379,8 +373,7 @@ TEST_F(ExternalSSTFileTest, Basic) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
}
ExternalSstFileInfo file5_info;
s = sst_file_writer.Finish(&file5_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(sst_file_writer.Finish(&file5_info));
ASSERT_EQ(file5_info.file_path, file5);
ASSERT_EQ(file5_info.num_entries, 100);
ASSERT_EQ(file5_info.smallest_key, Key(400));
@ -389,10 +382,9 @@ TEST_F(ExternalSSTFileTest, Basic) {
// file6.sst (delete 400 => 500)
std::string file6 = sst_files_dir_ + "file6.sst";
ASSERT_OK(sst_file_writer.Open(file6));
sst_file_writer.DeleteRange(Key(400), Key(500));
ASSERT_OK(sst_file_writer.DeleteRange(Key(400), Key(500)));
ExternalSstFileInfo file6_info;
s = sst_file_writer.Finish(&file6_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(sst_file_writer.Finish(&file6_info));
ASSERT_EQ(file6_info.file_path, file6);
ASSERT_EQ(file6_info.num_entries, 0);
ASSERT_EQ(file6_info.smallest_key, "");
@ -404,17 +396,16 @@ TEST_F(ExternalSSTFileTest, Basic) {
// file7.sst (delete 500 => 570, put 520 => 599 divisible by 2)
std::string file7 = sst_files_dir_ + "file7.sst";
ASSERT_OK(sst_file_writer.Open(file7));
sst_file_writer.DeleteRange(Key(500), Key(550));
ASSERT_OK(sst_file_writer.DeleteRange(Key(500), Key(550)));
for (int k = 520; k < 560; k += 2) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
}
sst_file_writer.DeleteRange(Key(525), Key(575));
ASSERT_OK(sst_file_writer.DeleteRange(Key(525), Key(575)));
for (int k = 560; k < 600; k += 2) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
}
ExternalSstFileInfo file7_info;
s = sst_file_writer.Finish(&file7_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(sst_file_writer.Finish(&file7_info));
ASSERT_EQ(file7_info.file_path, file7);
ASSERT_EQ(file7_info.num_entries, 40);
ASSERT_EQ(file7_info.smallest_key, Key(520));
@ -426,10 +417,9 @@ TEST_F(ExternalSSTFileTest, Basic) {
// file8.sst (delete 600 => 700)
std::string file8 = sst_files_dir_ + "file8.sst";
ASSERT_OK(sst_file_writer.Open(file8));
sst_file_writer.DeleteRange(Key(600), Key(700));
ASSERT_OK(sst_file_writer.DeleteRange(Key(600), Key(700)));
ExternalSstFileInfo file8_info;
s = sst_file_writer.Finish(&file8_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(sst_file_writer.Finish(&file8_info));
ASSERT_EQ(file8_info.file_path, file8);
ASSERT_EQ(file8_info.num_entries, 0);
ASSERT_EQ(file8_info.smallest_key, "");
@ -441,13 +431,11 @@ TEST_F(ExternalSSTFileTest, Basic) {
// Cannot create an empty sst file
std::string file_empty = sst_files_dir_ + "file_empty.sst";
ExternalSstFileInfo file_empty_info;
s = sst_file_writer.Finish(&file_empty_info);
ASSERT_NOK(s);
ASSERT_NOK(sst_file_writer.Finish(&file_empty_info));
DestroyAndReopen(options);
// Add file using file path
s = DeprecatedAddFile({file1});
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(DeprecatedAddFile({file1}));
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
for (int k = 0; k < 100; k++) {
ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
@ -468,12 +456,10 @@ TEST_F(ExternalSSTFileTest, Basic) {
}
// This file has overlapping values with the existing data
s = DeprecatedAddFile({file3});
ASSERT_FALSE(s.ok()) << s.ToString();
ASSERT_NOK(DeprecatedAddFile({file3}));
// This file has overlapping values with the existing data
s = DeprecatedAddFile({file4});
ASSERT_FALSE(s.ok()) << s.ToString();
ASSERT_NOK(DeprecatedAddFile({file4}));
// Overwrite values of keys divisible by 5
for (int k = 0; k < 200; k += 5) {
@ -485,8 +471,7 @@ TEST_F(ExternalSSTFileTest, Basic) {
ASSERT_OK(DeprecatedAddFile({file5}));
// This file has overlapping values with the existing data
s = DeprecatedAddFile({file6});
ASSERT_FALSE(s.ok()) << s.ToString();
ASSERT_NOK(DeprecatedAddFile({file6}));
// Key range of file7 (500 => 598) don't overlap with any keys in DB
ASSERT_OK(DeprecatedAddFile({file7}));
@ -614,15 +599,13 @@ TEST_F(ExternalSSTFileTest, AddList) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
}
ExternalSstFileInfo file1_info;
Status s = sst_file_writer.Finish(&file1_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(sst_file_writer.Finish(&file1_info));
ASSERT_EQ(file1_info.file_path, file1);
ASSERT_EQ(file1_info.num_entries, 100);
ASSERT_EQ(file1_info.smallest_key, Key(0));
ASSERT_EQ(file1_info.largest_key, Key(99));
// sst_file_writer already finished, cannot add this value
s = sst_file_writer.Put(Key(100), "bad_val");
ASSERT_FALSE(s.ok()) << s.ToString();
ASSERT_NOK(sst_file_writer.Put(Key(100), "bad_val"));
// file2.sst (100 => 199)
std::string file2 = sst_files_dir_ + "file2.sst";
@ -631,11 +614,9 @@ TEST_F(ExternalSSTFileTest, AddList) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
}
// Cannot add this key because it's not after last added key
s = sst_file_writer.Put(Key(99), "bad_val");
ASSERT_FALSE(s.ok()) << s.ToString();
ASSERT_NOK(sst_file_writer.Put(Key(99), "bad_val"));
ExternalSstFileInfo file2_info;
s = sst_file_writer.Finish(&file2_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(sst_file_writer.Finish(&file2_info));
ASSERT_EQ(file2_info.file_path, file2);
ASSERT_EQ(file2_info.num_entries, 100);
ASSERT_EQ(file2_info.smallest_key, Key(100));
@ -649,8 +630,7 @@ TEST_F(ExternalSSTFileTest, AddList) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
}
ExternalSstFileInfo file3_info;
s = sst_file_writer.Finish(&file3_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(sst_file_writer.Finish(&file3_info));
ASSERT_EQ(file3_info.file_path, file3);
ASSERT_EQ(file3_info.num_entries, 5);
ASSERT_EQ(file3_info.smallest_key, Key(195));
@ -664,8 +644,7 @@ TEST_F(ExternalSSTFileTest, AddList) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
}
ExternalSstFileInfo file4_info;
s = sst_file_writer.Finish(&file4_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(sst_file_writer.Finish(&file4_info));
ASSERT_EQ(file4_info.file_path, file4);
ASSERT_EQ(file4_info.num_entries, 10);
ASSERT_EQ(file4_info.smallest_key, Key(30));
@ -678,8 +657,7 @@ TEST_F(ExternalSSTFileTest, AddList) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
}
ExternalSstFileInfo file5_info;
s = sst_file_writer.Finish(&file5_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(sst_file_writer.Finish(&file5_info));
ASSERT_EQ(file5_info.file_path, file5);
ASSERT_EQ(file5_info.num_entries, 100);
ASSERT_EQ(file5_info.smallest_key, Key(200));
@ -691,8 +669,7 @@ TEST_F(ExternalSSTFileTest, AddList) {
ASSERT_OK(sst_file_writer.DeleteRange(Key(0), Key(75)));
ASSERT_OK(sst_file_writer.DeleteRange(Key(25), Key(100)));
ExternalSstFileInfo file6_info;
s = sst_file_writer.Finish(&file6_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(sst_file_writer.Finish(&file6_info));
ASSERT_EQ(file6_info.file_path, file6);
ASSERT_EQ(file6_info.num_entries, 0);
ASSERT_EQ(file6_info.smallest_key, "");
@ -706,8 +683,7 @@ TEST_F(ExternalSSTFileTest, AddList) {
ASSERT_OK(sst_file_writer.Open(file7));
ASSERT_OK(sst_file_writer.DeleteRange(Key(99), Key(201)));
ExternalSstFileInfo file7_info;
s = sst_file_writer.Finish(&file7_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(sst_file_writer.Finish(&file7_info));
ASSERT_EQ(file7_info.file_path, file7);
ASSERT_EQ(file7_info.num_entries, 0);
ASSERT_EQ(file7_info.smallest_key, "");
@ -727,17 +703,13 @@ TEST_F(ExternalSSTFileTest, AddList) {
DestroyAndReopen(options);
// These lists of files have key ranges that overlap with each other
s = DeprecatedAddFile(file_list1);
ASSERT_FALSE(s.ok()) << s.ToString();
ASSERT_NOK(DeprecatedAddFile(file_list1));
// Both of the following overlap on the range deletion tombstone.
s = DeprecatedAddFile(file_list4);
ASSERT_FALSE(s.ok()) << s.ToString();
s = DeprecatedAddFile(file_list5);
ASSERT_FALSE(s.ok()) << s.ToString();
ASSERT_NOK(DeprecatedAddFile(file_list4));
ASSERT_NOK(DeprecatedAddFile(file_list5));
// Add files using file path list
s = DeprecatedAddFile(file_list0);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(DeprecatedAddFile(file_list0));
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
for (int k = 0; k < 200; k++) {
ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
@ -778,8 +750,7 @@ TEST_F(ExternalSSTFileTest, AddList) {
}
// This file list has overlapping values with the existing data
s = DeprecatedAddFile(file_list3);
ASSERT_FALSE(s.ok()) << s.ToString();
ASSERT_NOK(DeprecatedAddFile(file_list3));
// Overwrite values of keys divisible by 5
for (int k = 0; k < 200; k += 5) {
@ -847,16 +818,14 @@ TEST_F(ExternalSSTFileTest, AddListAtomicity) {
for (int k = i * 100; k < (i + 1) * 100; k++) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
}
Status s = sst_file_writer.Finish(&files_info[i]);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(sst_file_writer.Finish(&files_info[i]));
ASSERT_EQ(files_info[i].file_path, files[i]);
ASSERT_EQ(files_info[i].num_entries, 100);
ASSERT_EQ(files_info[i].smallest_key, Key(i * 100));
ASSERT_EQ(files_info[i].largest_key, Key((i + 1) * 100 - 1));
}
files.push_back(sst_files_dir_ + "file" + std::to_string(n) + ".sst");
auto s = DeprecatedAddFile(files);
ASSERT_NOK(s) << s.ToString();
ASSERT_NOK(DeprecatedAddFile(files));
for (int k = 0; k < n * 100; k++) {
ASSERT_EQ("NOT_FOUND", Get(Key(k)));
}
@ -878,17 +847,14 @@ TEST_F(ExternalSSTFileTest, PurgeObsoleteFilesBug) {
// file1.sst (0 => 500)
std::string sst_file_path = sst_files_dir_ + "file1.sst";
Status s = sst_file_writer.Open(sst_file_path);
ASSERT_OK(s);
ASSERT_OK(sst_file_writer.Open(sst_file_path));
for (int i = 0; i < 500; i++) {
std::string k = Key(i);
s = sst_file_writer.Put(k, k + "_val");
ASSERT_OK(s);
ASSERT_OK(sst_file_writer.Put(k, k + "_val"));
}
ExternalSstFileInfo sst_file_info;
s = sst_file_writer.Finish(&sst_file_info);
ASSERT_OK(s);
ASSERT_OK(sst_file_writer.Finish(&sst_file_info));
options.delete_obsolete_files_period_micros = 0;
options.disable_auto_compactions = true;
@ -900,12 +866,11 @@ TEST_F(ExternalSSTFileTest, PurgeObsoleteFilesBug) {
ASSERT_OK(Flush());
ASSERT_OK(Put("aaa", "xxx"));
ASSERT_OK(Flush());
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
s = DeprecatedAddFile({sst_file_path});
ASSERT_OK(s);
ASSERT_OK(DeprecatedAddFile({sst_file_path}));
for (int i = 0; i < 500; i++) {
std::string k = Key(i);
@ -928,8 +893,7 @@ TEST_F(ExternalSSTFileTest, SkipSnapshot) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
}
ExternalSstFileInfo file1_info;
Status s = sst_file_writer.Finish(&file1_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(sst_file_writer.Finish(&file1_info));
ASSERT_EQ(file1_info.file_path, file1);
ASSERT_EQ(file1_info.num_entries, 100);
ASSERT_EQ(file1_info.smallest_key, Key(0));
@ -942,8 +906,7 @@ TEST_F(ExternalSSTFileTest, SkipSnapshot) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
}
ExternalSstFileInfo file2_info;
s = sst_file_writer.Finish(&file2_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(sst_file_writer.Finish(&file2_info));
ASSERT_EQ(file2_info.file_path, file2);
ASSERT_EQ(file2_info.num_entries, 200);
ASSERT_EQ(file2_info.smallest_key, Key(100));
@ -972,8 +935,7 @@ TEST_F(ExternalSSTFileTest, SkipSnapshot) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
}
ExternalSstFileInfo file3_info;
s = sst_file_writer.Finish(&file3_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(sst_file_writer.Finish(&file3_info));
ASSERT_EQ(file3_info.file_path, file3);
ASSERT_EQ(file3_info.num_entries, 100);
ASSERT_EQ(file3_info.smallest_key, Key(300));
@ -1019,8 +981,7 @@ TEST_F(ExternalSSTFileTest, MultiThreaded) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k)));
}
Status s = sst_file_writer.Finish();
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(sst_file_writer.Finish());
};
// Write num_files files in parallel
std::vector<port::Thread> sst_writer_threads;
@ -1082,8 +1043,7 @@ TEST_F(ExternalSSTFileTest, MultiThreaded) {
// Overwrite values of keys divisible by 100
for (int k = 0; k < num_files * keys_per_file; k += 100) {
std::string key = Key(k);
Status s = Put(key, key + "_new");
ASSERT_TRUE(s.ok());
ASSERT_OK(Put(key, key + "_new"));
}
for (int i = 0; i < 2; i++) {
@ -1167,7 +1127,8 @@ TEST_F(ExternalSSTFileTest, OverlappingRanges) {
// Generate the file containing the range
std::string file_name = sst_files_dir_ + env_->GenerateUniqueId();
ASSERT_OK(sst_file_writer.Open(file_name));
s = sst_file_writer.Open(file_name);
ASSERT_OK(s);
for (int k = range_start; k <= range_end; k++) {
s = sst_file_writer.Put(Key(k), range_val);
ASSERT_OK(s);
@ -1212,10 +1173,10 @@ TEST_F(ExternalSSTFileTest, OverlappingRanges) {
// Flush / Compact the DB
if (i && i % 50 == 0) {
Flush();
ASSERT_OK(Flush());
}
if (i && i % 75 == 0) {
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
}
}
@ -1293,7 +1254,7 @@ TEST_P(ExternalSSTFileTest, PickedLevel) {
// Hold compaction from finishing
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevel:2");
dbfull()->TEST_WaitForCompact();
ASSERT_OK(dbfull()->TEST_WaitForCompact());
EXPECT_EQ(FilesPerLevel(), "1,1,1,2");
size_t kcnt = 0;
@ -1380,7 +1341,7 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) {
ASSERT_OK(bg_addfile_status);
ASSERT_OK(bg_compact_status);
dbfull()->TEST_WaitForCompact();
ASSERT_OK(dbfull()->TEST_WaitForCompact());
int total_keys = 0;
Iterator* iter = db_->NewIterator(ReadOptions());
@ -1417,7 +1378,7 @@ TEST_F(ExternalSSTFileTest, IngestNonExistingFile) {
// After full compaction, there should be only 1 file.
std::vector<std::string> files;
env_->GetChildren(dbname_, &files);
ASSERT_OK(env_->GetChildren(dbname_, &files));
int num_sst_files = 0;
for (auto& f : files) {
uint64_t number;
@ -1539,7 +1500,7 @@ TEST_F(ExternalSSTFileTest, PickedLevelDynamic) {
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelDynamic:2");
// Output of the compaction will go to L3
dbfull()->TEST_WaitForCompact();
ASSERT_OK(dbfull()->TEST_WaitForCompact());
EXPECT_EQ(FilesPerLevel(), "1,0,0,2");
Close();
@ -1681,7 +1642,7 @@ TEST_F(ExternalSSTFileTest, AddFileTrivialMoveBug) {
cro.exclusive_manual_compaction = false;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
dbfull()->TEST_WaitForCompact();
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
@ -1733,9 +1694,9 @@ TEST_F(ExternalSSTFileTest, WithUnorderedWrite) {
Options options = CurrentOptions();
options.unordered_write = true;
DestroyAndReopen(options);
Put("foo", "v1");
ASSERT_OK(Put("foo", "v1"));
SyncPoint::GetInstance()->EnableProcessing();
port::Thread writer([&]() { Put("bar", "v2"); });
port::Thread writer([&]() { ASSERT_OK(Put("bar", "v2")); });
TEST_SYNC_POINT("ExternalSSTFileTest::WithUnorderedWrite:WaitWriteWAL");
ASSERT_OK(GenerateAndAddExternalFile(options, {{"bar", "v3"}}, -1,
@ -1784,7 +1745,7 @@ TEST_P(ExternalSSTFileTest, IngestFileWithGlobalSeqnoRandomized) {
}
size_t kcnt = 0;
VerifyDBFromMap(true_data, &kcnt, false);
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
VerifyDBFromMap(true_data, &kcnt, false);
}
}
@ -1868,8 +1829,8 @@ TEST_P(ExternalSSTFileTest, IngestFileWithGlobalSeqnoMemtableFlush) {
ASSERT_OK(Put(Key(k), "memtable"));
true_data[Key(k)] = "memtable";
}
db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
&entries_in_memtable);
ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
&entries_in_memtable));
ASSERT_GE(entries_in_memtable, 1);
bool write_global_seqno = std::get<0>(GetParam());
@ -1878,40 +1839,40 @@ TEST_P(ExternalSSTFileTest, IngestFileWithGlobalSeqnoMemtableFlush) {
ASSERT_OK(GenerateAndAddExternalFile(
options, {90, 100, 110}, -1, true, write_global_seqno,
verify_checksums_before_ingest, false, false, &true_data));
db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
&entries_in_memtable);
ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
&entries_in_memtable));
ASSERT_GE(entries_in_memtable, 1);
// This file will flush the memtable
ASSERT_OK(GenerateAndAddExternalFile(
options, {19, 20, 21}, -1, true, write_global_seqno,
verify_checksums_before_ingest, false, false, &true_data));
db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
&entries_in_memtable);
ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
&entries_in_memtable));
ASSERT_EQ(entries_in_memtable, 0);
for (int k : {200, 201, 205, 206}) {
ASSERT_OK(Put(Key(k), "memtable"));
true_data[Key(k)] = "memtable";
}
db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
&entries_in_memtable);
ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
&entries_in_memtable));
ASSERT_GE(entries_in_memtable, 1);
// No need for flush, this file keys fit between the memtable keys
ASSERT_OK(GenerateAndAddExternalFile(
options, {202, 203, 204}, -1, true, write_global_seqno,
verify_checksums_before_ingest, false, false, &true_data));
db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
&entries_in_memtable);
ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
&entries_in_memtable));
ASSERT_GE(entries_in_memtable, 1);
// This file will flush the memtable
ASSERT_OK(GenerateAndAddExternalFile(
options, {206, 207}, -1, true, write_global_seqno,
verify_checksums_before_ingest, false, false, &true_data));
db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
&entries_in_memtable);
ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
&entries_in_memtable));
ASSERT_EQ(entries_in_memtable, 0);
size_t kcnt = 0;
@ -2309,7 +2270,7 @@ TEST_P(ExternalSSTFileTest, IngestBehind) {
ASSERT_OK(Put(Key(i), "memtable"));
true_data[Key(i)] = "memtable";
}
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// Universal picker should go at second from the bottom level
ASSERT_EQ("0,1", FilesPerLevel());
ASSERT_OK(GenerateAndAddExternalFile(
@ -2323,7 +2284,7 @@ TEST_P(ExternalSSTFileTest, IngestBehind) {
verify_checksums_before_ingest, true /*ingest_behind*/,
false /*sort_data*/, &true_data));
ASSERT_EQ("0,1,1", FilesPerLevel());
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// bottom level should be empty
ASSERT_EQ("0,1", FilesPerLevel());
@ -2471,9 +2432,8 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
// Resize the true_data vector upon construction to avoid re-alloc
std::vector<std::map<std::string, std::string>> true_data(
column_families.size());
Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data,
-1, true, true_data);
ASSERT_OK(s);
ASSERT_OK(GenerateAndAddExternalFiles(options, column_families, ifos, data,
-1, true, true_data));
Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
options);
@ -2654,9 +2614,8 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
std::vector<std::map<std::string, std::string>> true_data(
column_families.size());
port::Thread ingest_thread([&]() {
Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data,
-1, true, true_data);
ASSERT_NOK(s);
ASSERT_NOK(GenerateAndAddExternalFiles(options, column_families, ifos, data,
-1, true, true_data));
});
TEST_SYNC_POINT(
"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_PrepareFail:"
@ -2724,9 +2683,8 @@ TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
std::vector<std::map<std::string, std::string>> true_data(
column_families.size());
port::Thread ingest_thread([&]() {
Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data,
-1, true, true_data);
ASSERT_NOK(s);
ASSERT_NOK(GenerateAndAddExternalFiles(options, column_families, ifos, data,
-1, true, true_data));
});
TEST_SYNC_POINT(
"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:"
@ -2799,9 +2757,8 @@ TEST_P(ExternalSSTFileTest,
std::vector<std::map<std::string, std::string>> true_data(
column_families.size());
port::Thread ingest_thread([&]() {
Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data,
-1, true, true_data);
ASSERT_NOK(s);
ASSERT_NOK(GenerateAndAddExternalFiles(options, column_families, ifos, data,
-1, true, true_data));
});
TEST_SYNC_POINT(
"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_"
@ -2812,7 +2769,7 @@ TEST_P(ExternalSSTFileTest,
"PartialManifestWriteFail:1");
ingest_thread.join();
fault_injection_env->DropUnsyncedFileData();
ASSERT_OK(fault_injection_env->DropUnsyncedFileData());
fault_injection_env->SetFilesystemActive(true);
Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
@ -2847,7 +2804,7 @@ TEST_P(ExternalSSTFileTest, IngestFilesTriggerFlushingWithTwoWriteQueue) {
// sure that it won't enter the 2nd writer queue for the second time.
std::vector<std::pair<std::string, std::string>> data;
data.push_back(std::make_pair("1001", "v2"));
GenerateAndAddExternalFile(options, data);
ASSERT_OK(GenerateAndAddExternalFile(options, data, -1, true));
}
TEST_P(ExternalSSTFileTest, DeltaEncodingWhileGlobalSeqnoPresent) {

View File

@ -244,6 +244,12 @@ ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
if (sv_) {
RebuildIterators(false);
}
// immutable_status_ is a local aggregation of the
// status of the immutable Iterators.
// We have to PermitUncheckedError in case it is never
// used, otherwise it will fail ASSERT_STATUS_CHECKED.
immutable_status_.PermitUncheckedError();
}
ForwardIterator::~ForwardIterator() {

View File

@ -80,8 +80,8 @@ std::shared_ptr<DB> OpenDb(const std::string& dbname, const bool ttl = false,
options.create_if_missing = true;
options.merge_operator = std::make_shared<CountMergeOperator>();
options.max_successive_merges = max_successive_merges;
EXPECT_OK(DestroyDB(dbname, Options()));
Status s;
DestroyDB(dbname, Options());
// DBWithTTL is not supported in ROCKSDB_LITE
#ifndef ROCKSDB_LITE
if (ttl) {
@ -95,10 +95,8 @@ std::shared_ptr<DB> OpenDb(const std::string& dbname, const bool ttl = false,
assert(!ttl);
s = DB::Open(options, dbname, &db);
#endif // !ROCKSDB_LITE
if (!s.ok()) {
std::cerr << s.ToString() << std::endl;
assert(false);
}
EXPECT_OK(s);
assert(s.ok());
return std::shared_ptr<DB>(db);
}
@ -258,21 +256,25 @@ void testCounters(Counters& counters, DB* db, bool test_compaction) {
counters.assert_set("a", 1);
if (test_compaction) db->Flush(o);
if (test_compaction) {
ASSERT_OK(db->Flush(o));
}
assert(counters.assert_get("a") == 1);
ASSERT_EQ(counters.assert_get("a"), 1);
counters.assert_remove("b");
// defaut value is 0 if non-existent
assert(counters.assert_get("b") == 0);
ASSERT_EQ(counters.assert_get("b"), 0);
counters.assert_add("a", 2);
if (test_compaction) db->Flush(o);
if (test_compaction) {
ASSERT_OK(db->Flush(o));
}
// 1+2 = 3
assert(counters.assert_get("a")== 3);
ASSERT_EQ(counters.assert_get("a"), 3);
dumpDb(db);
@ -282,19 +284,19 @@ void testCounters(Counters& counters, DB* db, bool test_compaction) {
counters.assert_add("b", i);
sum += i;
}
assert(counters.assert_get("b") == sum);
ASSERT_EQ(counters.assert_get("b"), sum);
dumpDb(db);
if (test_compaction) {
db->Flush(o);
ASSERT_OK(db->Flush(o));
db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
dumpDb(db);
assert(counters.assert_get("a")== 3);
assert(counters.assert_get("b") == sum);
ASSERT_EQ(counters.assert_get("a"), 3);
ASSERT_EQ(counters.assert_get("b"), sum);
}
}
@ -400,14 +402,14 @@ void testSuccessiveMerge(Counters& counters, size_t max_num_merges,
sum += i;
if (i % (max_num_merges + 1) == 0) {
assert(num_merge_operator_calls == max_num_merges + 1);
ASSERT_EQ(num_merge_operator_calls, max_num_merges + 1);
} else {
assert(num_merge_operator_calls == 0);
ASSERT_EQ(num_merge_operator_calls, 0);
}
resetNumMergeOperatorCalls();
assert(counters.assert_get("z") == sum);
assert(num_merge_operator_calls == i % (max_num_merges + 1));
ASSERT_EQ(counters.assert_get("z"), sum);
ASSERT_EQ(num_merge_operator_calls, i % (max_num_merges + 1));
}
}
@ -424,8 +426,8 @@ void testPartialMerge(Counters* counters, DB* db, size_t max_merge,
counters->assert_add("b", i);
tmp_sum += i;
}
db->Flush(o);
db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_OK(db->Flush(o));
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ(tmp_sum, counters->assert_get("b"));
if (count > max_merge) {
// in this case, FullMerge should be called instead.
@ -438,20 +440,20 @@ void testPartialMerge(Counters* counters, DB* db, size_t max_merge,
// Test case 2: partial merge should not be called when a put is found.
resetNumPartialMergeCalls();
tmp_sum = 0;
db->Put(ROCKSDB_NAMESPACE::WriteOptions(), "c", "10");
ASSERT_OK(db->Put(ROCKSDB_NAMESPACE::WriteOptions(), "c", "10"));
for (size_t i = 1; i <= count; i++) {
counters->assert_add("c", i);
tmp_sum += i;
}
db->Flush(o);
db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_OK(db->Flush(o));
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ(tmp_sum, counters->assert_get("c"));
ASSERT_EQ(num_partial_merge_calls, 0U);
}
void testSingleBatchSuccessiveMerge(DB* db, size_t max_num_merges,
size_t num_merges) {
assert(num_merges > max_num_merges);
ASSERT_GT(num_merges, max_num_merges);
Slice key("BatchSuccessiveMerge");
uint64_t merge_value = 1;
@ -462,15 +464,12 @@ void testSingleBatchSuccessiveMerge(DB* db, size_t max_num_merges,
// Create the batch
WriteBatch batch;
for (size_t i = 0; i < num_merges; ++i) {
batch.Merge(key, merge_value_slice);
ASSERT_OK(batch.Merge(key, merge_value_slice));
}
// Apply to memtable and count the number of merges
resetNumMergeOperatorCalls();
{
Status s = db->Write(WriteOptions(), &batch);
assert(s.ok());
}
ASSERT_OK(db->Write(WriteOptions(), &batch));
ASSERT_EQ(
num_merge_operator_calls,
static_cast<size_t>(num_merges - (num_merges % (max_num_merges + 1))));
@ -478,10 +477,7 @@ void testSingleBatchSuccessiveMerge(DB* db, size_t max_num_merges,
// Get the value
resetNumMergeOperatorCalls();
std::string get_value_str;
{
Status s = db->Get(ReadOptions(), key, &get_value_str);
assert(s.ok());
}
ASSERT_OK(db->Get(ReadOptions(), key, &get_value_str));
assert(get_value_str.size() == sizeof(uint64_t));
uint64_t get_value = DecodeFixed64(&get_value_str[0]);
ASSERT_EQ(get_value, num_merges * merge_value);
@ -505,7 +501,7 @@ void runTest(const std::string& dbname, const bool use_ttl = false) {
}
}
DestroyDB(dbname, Options());
ASSERT_OK(DestroyDB(dbname, Options()));
{
size_t max_merge = 5;
@ -514,7 +510,8 @@ void runTest(const std::string& dbname, const bool use_ttl = false) {
testCounters(counters, db.get(), use_compression);
testSuccessiveMerge(counters, max_merge, max_merge * 2);
testSingleBatchSuccessiveMerge(db.get(), 5, 7);
DestroyDB(dbname, Options());
ASSERT_OK(db->Close());
ASSERT_OK(DestroyDB(dbname, Options()));
}
{
@ -525,14 +522,16 @@ void runTest(const std::string& dbname, const bool use_ttl = false) {
auto db = OpenDb(dbname, use_ttl, max_merge);
MergeBasedCounters counters(db, 0);
testPartialMerge(&counters, db.get(), max_merge, min_merge, count);
DestroyDB(dbname, Options());
ASSERT_OK(db->Close());
ASSERT_OK(DestroyDB(dbname, Options()));
}
{
auto db = OpenDb(dbname, use_ttl, max_merge);
MergeBasedCounters counters(db, 0);
testPartialMerge(&counters, db.get(), max_merge, min_merge,
min_merge * 10);
DestroyDB(dbname, Options());
ASSERT_OK(db->Close());
ASSERT_OK(DestroyDB(dbname, Options()));
}
}
@ -543,15 +542,15 @@ void runTest(const std::string& dbname, const bool use_ttl = false) {
counters.add("test-key", 1);
counters.add("test-key", 1);
counters.add("test-key", 1);
db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
}
DB* reopen_db;
ASSERT_OK(DB::Open(Options(), dbname, &reopen_db));
std::string value;
ASSERT_TRUE(!(reopen_db->Get(ReadOptions(), "test-key", &value).ok()));
ASSERT_NOK(reopen_db->Get(ReadOptions(), "test-key", &value));
delete reopen_db;
DestroyDB(dbname, Options());
ASSERT_OK(DestroyDB(dbname, Options()));
}
/* Temporary remove this test
@ -591,7 +590,7 @@ TEST_F(MergeTest, MergeWithCompactionAndFlush) {
testCountersWithFlushAndCompaction(counters, db.get());
}
}
DestroyDB(dbname, Options());
ASSERT_OK(DestroyDB(dbname, Options()));
}
#endif // !ROCKSDB_LITE

View File

@ -221,11 +221,19 @@ Status RandomAccessFileReader::MultiRead(const IOOptions& opts,
aligned_reqs.reserve(num_reqs);
// Align and merge the read requests.
size_t alignment = file_->GetRequiredBufferAlignment();
aligned_reqs.push_back(Align(read_reqs[0], alignment));
for (size_t i = 1; i < num_reqs; i++) {
for (size_t i = 0; i < num_reqs; i++) {
const auto& r = Align(read_reqs[i], alignment);
if (!TryMerge(&aligned_reqs.back(), r)) {
if (i == 0) {
// head
aligned_reqs.push_back(r);
} else if (!TryMerge(&aligned_reqs.back(), r)) {
// head + n
aligned_reqs.push_back(r);
} else {
// unused
r.status.PermitUncheckedError();
}
}
TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::MultiRead:AlignedReqs",

View File

@ -145,6 +145,7 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
// Reads the first page internally.
ASSERT_EQ(aligned_reqs.size(), 1);
const FSReadRequest& aligned_r = aligned_reqs[0];
ASSERT_OK(aligned_r.status);
ASSERT_EQ(aligned_r.offset, 0);
ASSERT_EQ(aligned_r.len, page_size);
}
@ -189,6 +190,7 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
// Reads the first two pages in one request internally.
ASSERT_EQ(aligned_reqs.size(), 1);
const FSReadRequest& aligned_r = aligned_reqs[0];
ASSERT_OK(aligned_r.status);
ASSERT_EQ(aligned_r.offset, 0);
ASSERT_EQ(aligned_r.len, 2 * page_size);
}
@ -233,6 +235,7 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
// Reads the first 3 pages in one request internally.
ASSERT_EQ(aligned_reqs.size(), 1);
const FSReadRequest& aligned_r = aligned_reqs[0];
ASSERT_OK(aligned_r.status);
ASSERT_EQ(aligned_r.offset, 0);
ASSERT_EQ(aligned_r.len, 3 * page_size);
}
@ -270,8 +273,10 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
ASSERT_EQ(aligned_reqs.size(), 2);
const FSReadRequest& aligned_r0 = aligned_reqs[0];
const FSReadRequest& aligned_r1 = aligned_reqs[1];
ASSERT_OK(aligned_r0.status);
ASSERT_EQ(aligned_r0.offset, 0);
ASSERT_EQ(aligned_r0.len, page_size);
ASSERT_OK(aligned_r1.status);
ASSERT_EQ(aligned_r1.offset, 2 * page_size);
ASSERT_EQ(aligned_r1.len, page_size);
}
@ -287,8 +292,11 @@ TEST(FSReadRequest, Align) {
r.offset = 2000;
r.len = 2000;
r.scratch = nullptr;
ASSERT_OK(r.status);
FSReadRequest aligned_r = Align(r, 1024);
ASSERT_OK(r.status);
ASSERT_OK(aligned_r.status);
ASSERT_EQ(aligned_r.offset, 1024);
ASSERT_EQ(aligned_r.len, 3072);
}
@ -303,14 +311,20 @@ TEST(FSReadRequest, TryMerge) {
dest.offset = 0;
dest.len = 10;
dest.scratch = nullptr;
ASSERT_OK(dest.status);
FSReadRequest src;
src.offset = 15;
src.len = 10;
src.scratch = nullptr;
ASSERT_OK(src.status);
if (reverse) std::swap(dest, src);
if (reverse) {
std::swap(dest, src);
}
ASSERT_FALSE(TryMerge(&dest, src));
ASSERT_OK(dest.status);
ASSERT_OK(src.status);
}
{
@ -320,16 +334,22 @@ TEST(FSReadRequest, TryMerge) {
dest.offset = 0;
dest.len = 10;
dest.scratch = nullptr;
ASSERT_OK(dest.status);
FSReadRequest src;
src.offset = 10;
src.len = 10;
src.scratch = nullptr;
ASSERT_OK(src.status);
if (reverse) std::swap(dest, src);
if (reverse) {
std::swap(dest, src);
}
ASSERT_TRUE(TryMerge(&dest, src));
ASSERT_EQ(dest.offset, 0);
ASSERT_EQ(dest.len, 20);
ASSERT_OK(dest.status);
ASSERT_OK(src.status);
}
{
@ -339,16 +359,22 @@ TEST(FSReadRequest, TryMerge) {
dest.offset = 0;
dest.len = 10;
dest.scratch = nullptr;
ASSERT_OK(dest.status);
FSReadRequest src;
src.offset = 5;
src.len = 10;
src.scratch = nullptr;
ASSERT_OK(src.status);
if (reverse) std::swap(dest, src);
if (reverse) {
std::swap(dest, src);
}
ASSERT_TRUE(TryMerge(&dest, src));
ASSERT_EQ(dest.offset, 0);
ASSERT_EQ(dest.len, 15);
ASSERT_OK(dest.status);
ASSERT_OK(src.status);
}
{
@ -358,16 +384,22 @@ TEST(FSReadRequest, TryMerge) {
dest.offset = 0;
dest.len = 10;
dest.scratch = nullptr;
ASSERT_OK(dest.status);
FSReadRequest src;
src.offset = 5;
src.len = 5;
src.scratch = nullptr;
ASSERT_OK(src.status);
if (reverse) std::swap(dest, src);
if (reverse) {
std::swap(dest, src);
}
ASSERT_TRUE(TryMerge(&dest, src));
ASSERT_EQ(dest.offset, 0);
ASSERT_EQ(dest.len, 10);
ASSERT_OK(dest.status);
ASSERT_OK(src.status);
}
{
@ -377,16 +409,20 @@ TEST(FSReadRequest, TryMerge) {
dest.offset = 0;
dest.len = 10;
dest.scratch = nullptr;
ASSERT_OK(dest.status);
FSReadRequest src;
src.offset = 5;
src.len = 1;
src.scratch = nullptr;
ASSERT_OK(src.status);
if (reverse) std::swap(dest, src);
ASSERT_TRUE(TryMerge(&dest, src));
ASSERT_EQ(dest.offset, 0);
ASSERT_EQ(dest.len, 10);
ASSERT_OK(dest.status);
ASSERT_OK(src.status);
}
{
@ -396,16 +432,20 @@ TEST(FSReadRequest, TryMerge) {
dest.offset = 0;
dest.len = 10;
dest.scratch = nullptr;
ASSERT_OK(dest.status);
FSReadRequest src;
src.offset = 0;
src.len = 10;
src.scratch = nullptr;
ASSERT_OK(src.status);
if (reverse) std::swap(dest, src);
ASSERT_TRUE(TryMerge(&dest, src));
ASSERT_EQ(dest.offset, 0);
ASSERT_EQ(dest.len, 10);
ASSERT_OK(dest.status);
ASSERT_OK(src.status);
}
{
@ -415,16 +455,20 @@ TEST(FSReadRequest, TryMerge) {
dest.offset = 0;
dest.len = 10;
dest.scratch = nullptr;
ASSERT_OK(dest.status);
FSReadRequest src;
src.offset = 0;
src.len = 5;
src.scratch = nullptr;
ASSERT_OK(src.status);
if (reverse) std::swap(dest, src);
ASSERT_TRUE(TryMerge(&dest, src));
ASSERT_EQ(dest.offset, 0);
ASSERT_EQ(dest.len, 10);
ASSERT_OK(dest.status);
ASSERT_OK(src.status);
}
}
}

View File

@ -120,7 +120,8 @@ class StringAppendOperatorTest : public testing::Test,
public ::testing::WithParamInterface<bool> {
public:
StringAppendOperatorTest() {
DestroyDB(kDbName, Options()); // Start each test with a fresh DB
DestroyDB(kDbName, Options())
.PermitUncheckedError(); // Start each test with a fresh DB
}
void SetUp() override {
@ -253,9 +254,7 @@ TEST_P(StringAppendOperatorTest, SimpleTest) {
slists.Append("k1", "v3");
std::string res;
bool status = slists.Get("k1", &res);
ASSERT_TRUE(status);
ASSERT_TRUE(slists.Get("k1", &res));
ASSERT_EQ(res, "v1,v2,v3");
}
@ -268,7 +267,7 @@ TEST_P(StringAppendOperatorTest, SimpleDelimiterTest) {
slists.Append("k1", "v3");
std::string res;
slists.Get("k1", &res);
ASSERT_TRUE(slists.Get("k1", &res));
ASSERT_EQ(res, "v1|v2|v3");
}
@ -279,7 +278,7 @@ TEST_P(StringAppendOperatorTest, OneValueNoDelimiterTest) {
slists.Append("random_key", "single_val");
std::string res;
slists.Get("random_key", &res);
ASSERT_TRUE(slists.Get("random_key", &res));
ASSERT_EQ(res, "single_val");
}
@ -424,9 +423,9 @@ TEST_P(StringAppendOperatorTest, PersistentVariousKeys) {
slists.Append("c", "asdasd");
std::string a, b, c;
slists.Get("a", &a);
slists.Get("b", &b);
slists.Get("c", &c);
ASSERT_TRUE(slists.Get("a", &a));
ASSERT_TRUE(slists.Get("b", &b));
ASSERT_TRUE(slists.Get("c", &c));
ASSERT_EQ(a, "x\nt\nr");
ASSERT_EQ(b, "y\n2");
@ -450,9 +449,9 @@ TEST_P(StringAppendOperatorTest, PersistentVariousKeys) {
// The most recent changes should be in memory (MemTable)
// Hence, this will test both Get() paths.
std::string a, b, c;
slists.Get("a", &a);
slists.Get("b", &b);
slists.Get("c", &c);
ASSERT_TRUE(slists.Get("a", &a));
ASSERT_TRUE(slists.Get("b", &b));
ASSERT_TRUE(slists.Get("c", &c));
ASSERT_EQ(a, "x\nt\nr\nsa\ngh\njk");
ASSERT_EQ(b, "y\n2\ndf\nl;");
@ -466,9 +465,9 @@ TEST_P(StringAppendOperatorTest, PersistentVariousKeys) {
// All changes should be on disk. This will test VersionSet Get()
std::string a, b, c;
slists.Get("a", &a);
slists.Get("b", &b);
slists.Get("c", &c);
ASSERT_TRUE(slists.Get("a", &a));
ASSERT_TRUE(slists.Get("b", &b));
ASSERT_TRUE(slists.Get("c", &c));
ASSERT_EQ(a, "x\nt\nr\nsa\ngh\njk");
ASSERT_EQ(b, "y\n2\ndf\nl;");
@ -482,41 +481,34 @@ TEST_P(StringAppendOperatorTest, PersistentFlushAndCompaction) {
auto db = OpenDb('\n');
StringLists slists(db);
std::string a, b, c;
bool success;
// Append, Flush, Get
slists.Append("c", "asdasd");
db->Flush(ROCKSDB_NAMESPACE::FlushOptions());
success = slists.Get("c", &c);
ASSERT_TRUE(success);
ASSERT_OK(db->Flush(ROCKSDB_NAMESPACE::FlushOptions()));
ASSERT_TRUE(slists.Get("c", &c));
ASSERT_EQ(c, "asdasd");
// Append, Flush, Append, Get
slists.Append("a", "x");
slists.Append("b", "y");
db->Flush(ROCKSDB_NAMESPACE::FlushOptions());
ASSERT_OK(db->Flush(ROCKSDB_NAMESPACE::FlushOptions()));
slists.Append("a", "t");
slists.Append("a", "r");
slists.Append("b", "2");
success = slists.Get("a", &a);
assert(success == true);
ASSERT_TRUE(slists.Get("a", &a));
ASSERT_EQ(a, "x\nt\nr");
success = slists.Get("b", &b);
assert(success == true);
ASSERT_TRUE(slists.Get("b", &b));
ASSERT_EQ(b, "y\n2");
// Append, Get
success = slists.Append("c", "asdasd");
assert(success);
success = slists.Append("b", "monkey");
assert(success);
ASSERT_TRUE(slists.Append("c", "asdasd"));
ASSERT_TRUE(slists.Append("b", "monkey"));
// I omit the "assert(success)" checks here.
slists.Get("a", &a);
slists.Get("b", &b);
slists.Get("c", &c);
ASSERT_TRUE(slists.Get("a", &a));
ASSERT_TRUE(slists.Get("b", &b));
ASSERT_TRUE(slists.Get("c", &c));
ASSERT_EQ(a, "x\nt\nr");
ASSERT_EQ(b, "y\n2\nmonkey");
@ -530,17 +522,17 @@ TEST_P(StringAppendOperatorTest, PersistentFlushAndCompaction) {
std::string a, b, c;
// Get (Quick check for persistence of previous database)
slists.Get("a", &a);
ASSERT_TRUE(slists.Get("a", &a));
ASSERT_EQ(a, "x\nt\nr");
//Append, Compact, Get
slists.Append("c", "bbnagnagsx");
slists.Append("a", "sa");
slists.Append("b", "df");
db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
slists.Get("a", &a);
slists.Get("b", &b);
slists.Get("c", &c);
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_TRUE(slists.Get("a", &a));
ASSERT_TRUE(slists.Get("b", &b));
ASSERT_TRUE(slists.Get("c", &c));
ASSERT_EQ(a, "x\nt\nr\nsa");
ASSERT_EQ(b, "y\n2\nmonkey\ndf");
ASSERT_EQ(c, "asdasd\nasdasd\nbbnagnagsx");
@ -550,24 +542,24 @@ TEST_P(StringAppendOperatorTest, PersistentFlushAndCompaction) {
slists.Append("a", "jk");
slists.Append("b", "l;");
slists.Append("c", "rogosh");
slists.Get("a", &a);
slists.Get("b", &b);
slists.Get("c", &c);
ASSERT_TRUE(slists.Get("a", &a));
ASSERT_TRUE(slists.Get("b", &b));
ASSERT_TRUE(slists.Get("c", &c));
ASSERT_EQ(a, "x\nt\nr\nsa\ngh\njk");
ASSERT_EQ(b, "y\n2\nmonkey\ndf\nl;");
ASSERT_EQ(c, "asdasd\nasdasd\nbbnagnagsx\nrogosh");
// Compact, Get
db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ(a, "x\nt\nr\nsa\ngh\njk");
ASSERT_EQ(b, "y\n2\nmonkey\ndf\nl;");
ASSERT_EQ(c, "asdasd\nasdasd\nbbnagnagsx\nrogosh");
// Append, Flush, Compact, Get
slists.Append("b", "afcg");
db->Flush(ROCKSDB_NAMESPACE::FlushOptions());
db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
slists.Get("b", &b);
ASSERT_OK(db->Flush(ROCKSDB_NAMESPACE::FlushOptions()));
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_TRUE(slists.Get("b", &b));
ASSERT_EQ(b, "y\n2\nmonkey\ndf\nl;\nafcg");
}
}
@ -581,17 +573,16 @@ TEST_P(StringAppendOperatorTest, SimpleTestNullDelimiter) {
slists.Append("k1", "v3");
std::string res;
bool status = slists.Get("k1", &res);
ASSERT_TRUE(status);
ASSERT_TRUE(slists.Get("k1", &res));
// Construct the desired string. Default constructor doesn't like '\0' chars.
std::string checker("v1,v2,v3"); // Verify that the string is right size.
checker[2] = '\0'; // Use null delimiter instead of comma.
checker[5] = '\0';
assert(checker.size() == 8); // Verify it is still the correct size
ASSERT_EQ(checker.size(), 8); // Verify it is still the correct size
// Check that the rocksdb result string matches the desired string
assert(res.size() == checker.size());
ASSERT_EQ(res.size(), checker.size());
ASSERT_EQ(res, checker);
}

View File

@ -38,7 +38,7 @@ DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db), closed_(false) {}
DBWithTTLImpl::~DBWithTTLImpl() {
if (!closed_) {
Close();
Close().PermitUncheckedError();
}
}
@ -185,31 +185,32 @@ bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl, Env* env) {
// Strips the TS from the end of the slice
Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) {
Status st;
if (pinnable_val->size() < kTSLength) {
return Status::Corruption("Bad timestamp in key-value");
}
// Erasing characters which hold the TS
pinnable_val->remove_suffix(kTSLength);
return st;
return Status::OK();
}
// Strips the TS from the end of the string
Status DBWithTTLImpl::StripTS(std::string* str) {
Status st;
if (str->length() < kTSLength) {
return Status::Corruption("Bad timestamp in key-value");
}
// Erasing characters which hold the TS
str->erase(str->length() - kTSLength, kTSLength);
return st;
return Status::OK();
}
Status DBWithTTLImpl::Put(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& val) {
WriteBatch batch;
batch.Put(column_family, key, val);
Status st = batch.Put(column_family, key, val);
if (!st.ok()) {
return st;
}
return Write(options, &batch);
}
@ -262,7 +263,10 @@ Status DBWithTTLImpl::Merge(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) {
WriteBatch batch;
batch.Merge(column_family, key, value);
Status st = batch.Merge(column_family, key, value);
if (!st.ok()) {
return st;
}
return Write(options, &batch);
}
@ -271,34 +275,28 @@ Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
public:
explicit Handler(Env* env) : env_(env) {}
WriteBatch updates_ttl;
Status batch_rewrite_status;
Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
std::string value_with_ts;
Status st = AppendTS(value, &value_with_ts, env_);
if (!st.ok()) {
batch_rewrite_status = st;
} else {
WriteBatchInternal::Put(&updates_ttl, column_family_id, key,
value_with_ts);
return st;
}
return Status::OK();
return WriteBatchInternal::Put(&updates_ttl, column_family_id, key,
value_with_ts);
}
Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
std::string value_with_ts;
Status st = AppendTS(value, &value_with_ts, env_);
if (!st.ok()) {
batch_rewrite_status = st;
} else {
WriteBatchInternal::Merge(&updates_ttl, column_family_id, key,
return st;
}
return WriteBatchInternal::Merge(&updates_ttl, column_family_id, key,
value_with_ts);
}
return Status::OK();
}
Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
return Status::OK();
return WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
}
void LogData(const Slice& blob) override { updates_ttl.PutLogData(blob); }
@ -306,9 +304,9 @@ Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
Env* env_;
};
Handler handler(GetEnv());
updates->Iterate(&handler);
if (!handler.batch_rewrite_status.ok()) {
return handler.batch_rewrite_status;
Status st = updates->Iterate(&handler);
if (!st.ok()) {
return st;
} else {
return db_->Write(opts, &(handler.updates_ttl));
}