diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index 8c5fd6bc8..6c2640a8e 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -99,7 +99,9 @@ class Transaction { virtual Status RollbackToSavePoint() = 0; // This function is similar to DB::Get() except it will also read pending - // changes in this transaction. + // changes in this transaction. Currently, this function will return + // Status::MergeInProgress if the most recent write to the queried key in + // this batch is a Merge. // // If read_options.snapshot is not set, the current version of the key will // be read. Calling SetSnapshot() does not affect the version of the data @@ -131,6 +133,9 @@ class Transaction { // snapshot is set in this transaction). The transaction behavior is the // same regardless of whether the key exists or not. // + // Note: Currently, this function will return Status::MergeInProgress + // if the most recent write to the queried key in this batch is a Merge. + // // The values returned by this function are similar to Transaction::Get(). // If value==nullptr, then this function will not read any data, but will // still ensure that this key cannot be written to by outside of this @@ -146,6 +151,7 @@ class Transaction { // Status::TimedOut() if a lock could not be acquired, // Status::TryAgain() if the memtable history size is not large enough // (See max_write_buffer_number_to_maintain) + // Status::MergeInProgress() if merge operations cannot be resolved. // or other errors if this key could not be read. virtual Status GetForUpdate(const ReadOptions& options, ColumnFamilyHandle* column_family, diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index ea096efa1..1e41e7869 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -135,6 +135,9 @@ class WriteBatchWithIndex : public WriteBatchBase { // Will create a new Iterator that will use WBWIIterator as a delta and // base_iterator as base. // + // This function is only supported if the WriteBatchWithIndex was + // constructed with overwrite_key=true. + // // The returned iterator should be deleted by the caller. // The base_iterator is now 'owned' by the returned iterator. Deleting the // returned iterator will also delete the base_iterator. @@ -152,7 +155,7 @@ class WriteBatchWithIndex : public WriteBatchBase { // Similar to previous function but does not require a column_family. // Note: An InvalidArgument status will be returned if there are any Merge - // operators for this key. + // operators for this key. Use previous method instead. Status GetFromBatch(const DBOptions& options, const Slice& key, std::string* value) { return GetFromBatch(nullptr, options, key, value); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 6705443a4..dedc94c2e 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -12,6 +12,8 @@ #include "rocksdb/utilities/transaction_db.h" #include "util/logging.h" #include "util/testharness.h" +#include "utilities/merge_operators.h" +#include "utilities/merge_operators/string_append/stringappend.h" using std::string; @@ -28,6 +30,7 @@ class TransactionTest : public testing::Test { TransactionTest() { options.create_if_missing = true; options.max_write_buffer_number = 2; + options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); dbname = test::TmpDir() + "/transaction_testdb"; DestroyDB(dbname, options); @@ -1824,6 +1827,62 @@ TEST_F(TransactionTest, SingleDeleteTest) { ASSERT_TRUE(s.IsNotFound()); } +TEST_F(TransactionTest, MergeTest) { + WriteOptions write_options; + ReadOptions read_options; + string value; + Status s; + + Transaction* txn = db->BeginTransaction(write_options, TransactionOptions()); + ASSERT_TRUE(txn); + + s = db->Put(write_options, "A", "a0"); + ASSERT_OK(s); + + s = txn->Merge("A", "1"); + ASSERT_OK(s); + + s = txn->Merge("A", "2"); + ASSERT_OK(s); + + s = txn->Get(read_options, "A", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + s = txn->Put("A", "a"); + ASSERT_OK(s); + + s = txn->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("a", value); + + s = txn->Merge("A", "3"); + ASSERT_OK(s); + + s = txn->Get(read_options, "A", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + TransactionOptions txn_options; + txn_options.lock_timeout = 1; // 1 ms + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn2); + + // verify that txn has "A" locked + s = txn2->Merge("A", "4"); + ASSERT_TRUE(s.IsTimedOut()); + + s = txn2->Commit(); + ASSERT_OK(s); + delete txn2; + + s = txn->Commit(); + ASSERT_OK(s); + delete txn; + + s = db->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("a,3", value); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 4b531c3a2..ba90ec18a 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -636,9 +636,9 @@ Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family, MergeContext merge_context; WriteBatchWithIndexInternal::Result result = - WriteBatchWithIndexInternal::GetFromBatch(options, this, column_family, - key, &merge_context, - &rep->comparator, value, &s); + WriteBatchWithIndexInternal::GetFromBatch( + options, this, column_family, key, &merge_context, &rep->comparator, + value, rep->overwrite_key, &s); switch (result) { case WriteBatchWithIndexInternal::Result::kFound: @@ -679,8 +679,8 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, std::string batch_value; WriteBatchWithIndexInternal::Result result = WriteBatchWithIndexInternal::GetFromBatch( - options, this, column_family, key, &merge_context, - &rep->comparator, &batch_value, &s); + options, this, column_family, key, &merge_context, &rep->comparator, + &batch_value, rep->overwrite_key, &s); if (result == WriteBatchWithIndexInternal::Result::kFound) { value->assign(batch_value.data(), batch_value.size()); @@ -692,6 +692,14 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, if (result == WriteBatchWithIndexInternal::Result::kError) { return s; } + if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress && + rep->overwrite_key == true) { + // Since we've overwritten keys, we do not know what other operations are + // in this batch for this key, so we cannot do a Merge to compute the + // result. Instead, we will simply return MergeInProgress. + return Status::MergeInProgress(); + } + assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress || result == WriteBatchWithIndexInternal::Result::kNotFound); diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc index d336dddea..ba88e67d4 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -137,7 +137,7 @@ WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch( const DBOptions& options, WriteBatchWithIndex* batch, ColumnFamilyHandle* column_family, const Slice& key, MergeContext* merge_context, WriteBatchEntryComparator* cmp, - std::string* value, Status* s) { + std::string* value, bool overwrite_key, Status* s) { uint32_t cf_id = GetColumnFamilyID(column_family); *s = Status::OK(); WriteBatchWithIndexInternal::Result result = @@ -211,6 +211,13 @@ WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch( // We can stop iterating once we find a PUT or DELETE break; } + if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress && + overwrite_key == true) { + // Since we've overwritten keys, we do not know what other operations are + // in this batch for this key, so we cannot do a Merge to compute the + // result. Instead, we will simply return MergeInProgress. + break; + } iter->Prev(); } diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.h b/utilities/write_batch_with_index/write_batch_with_index_internal.h index b9d7e0fb7..b88cd768e 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.h +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.h @@ -92,7 +92,7 @@ class WriteBatchWithIndexInternal { const DBOptions& options, WriteBatchWithIndex* batch, ColumnFamilyHandle* column_family, const Slice& key, MergeContext* merge_context, WriteBatchEntryComparator* cmp, - std::string* value, Status* s); + std::string* value, bool overwrite_key, Status* s); }; } // namespace rocksdb diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc index 96f1c4b47..da695c4ca 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_test.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_test.cc @@ -971,7 +971,7 @@ TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge) { DestroyDB(dbname, options); Status s = DB::Open(options, dbname, &db); - assert(s.ok()); + ASSERT_OK(s); ColumnFamilyHandle* column_family = db->DefaultColumnFamily(); WriteBatchWithIndex batch; @@ -1009,6 +1009,66 @@ TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge) { DestroyDB(dbname, options); } +TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge2) { + DB* db; + Options options; + options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); + options.create_if_missing = true; + + std::string dbname = test::TmpDir() + "/write_batch_with_index_test"; + + DestroyDB(dbname, options); + Status s = DB::Open(options, dbname, &db); + ASSERT_OK(s); + + ColumnFamilyHandle* column_family = db->DefaultColumnFamily(); + + // Test batch with overwrite_key=true + WriteBatchWithIndex batch(BytewiseComparator(), 0, true); + std::string value; + + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_TRUE(s.IsNotFound()); + + batch.Put(column_family, "X", "x"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_OK(s); + ASSERT_EQ("x", value); + + batch.Put(column_family, "X", "x2"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_OK(s); + ASSERT_EQ("x2", value); + + batch.Merge(column_family, "X", "aaa"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + batch.Merge(column_family, "X", "bbb"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + batch.Put(column_family, "X", "x3"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_OK(s); + ASSERT_EQ("x3", value); + + batch.Merge(column_family, "X", "ccc"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + batch.Delete(column_family, "X"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_TRUE(s.IsNotFound()); + + batch.Merge(column_family, "X", "ddd"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + delete db; + DestroyDB(dbname, options); +} + TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDB) { DB* db; Options options; @@ -1017,7 +1077,7 @@ TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDB) { DestroyDB(dbname, options); Status s = DB::Open(options, dbname, &db); - assert(s.ok()); + ASSERT_OK(s); WriteBatchWithIndex batch; ReadOptions read_options; @@ -1185,6 +1245,54 @@ TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge) { DestroyDB(dbname, options); } +TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge2) { + DB* db; + Options options; + + options.create_if_missing = true; + std::string dbname = test::TmpDir() + "/write_batch_with_index_test"; + + options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); + + DestroyDB(dbname, options); + Status s = DB::Open(options, dbname, &db); + assert(s.ok()); + + // Test batch with overwrite_key=true + WriteBatchWithIndex batch(BytewiseComparator(), 0, true); + + ReadOptions read_options; + WriteOptions write_options; + std::string value; + + s = batch.GetFromBatchAndDB(db, read_options, "A", &value); + ASSERT_TRUE(s.IsNotFound()); + + batch.Merge("A", "xxx"); + + s = batch.GetFromBatchAndDB(db, read_options, "A", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + batch.Merge("A", "yyy"); + + s = batch.GetFromBatchAndDB(db, read_options, "A", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + s = db->Put(write_options, "A", "a0"); + ASSERT_OK(s); + + s = batch.GetFromBatchAndDB(db, read_options, "A", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + batch.Delete("A"); + + s = batch.GetFromBatchAndDB(db, read_options, "A", &value); + ASSERT_TRUE(s.IsNotFound()); + + delete db; + DestroyDB(dbname, options); +} + void AssertKey(std::string key, WBWIIterator* iter) { ASSERT_TRUE(iter->Valid()); ASSERT_EQ(key, iter->Entry().key.ToString());