Allow concurrent writes to blob db
Summary: I'm going with a brute-force solution: just have Put() and Write() hold a mutex before writing. Concurrent writes may be improved later with finer-grained locking.

Closes https://github.com/facebook/rocksdb/pull/2682

Differential Revision: D5552690

Pulled By: yiwu-arbug

fbshipit-source-id: 039abd675b5d274a7af6428198d1733cafecef4c
parent: 2c45ada4c4
commit: 0b814ba92d
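The change is deliberately coarse-grained: as the summary says, Put() and Write() simply hold one mutex for the duration of a write, so concurrent writers are serialized rather than coordinated per key or per blob file. A minimal sketch of that shape, with std::mutex and std::lock_guard standing in for RocksDB's port::Mutex and MutexLock, and an illustrative BlobStore class that is not the BlobDB API:

#include <map>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Illustrative stand-in for a store whose write paths share one mutex.
class BlobStore {
 public:
  // Single-key write: serialized against all other writers.
  void Put(const std::string& key, const std::string& value) {
    std::lock_guard<std::mutex> guard(write_mutex_);
    data_[key] = value;
  }

  // Batch write: the lock is held across the whole batch, so the batch
  // is applied without interleaving with other Put()/Write() calls.
  void Write(const std::vector<std::pair<std::string, std::string>>& batch) {
    std::lock_guard<std::mutex> guard(write_mutex_);
    for (const auto& kv : batch) {
      data_[kv.first] = kv.second;
    }
  }

 private:
  std::mutex write_mutex_;  // analogous to write_mutex_ in the diff below
  std::map<std::string, std::string> data_;
};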
@@ -28,6 +28,7 @@
 #include "util/file_reader_writer.h"
 #include "util/filename.h"
 #include "util/logging.h"
+#include "util/mutexlock.h"
 #include "util/random.h"
 #include "util/timer_queue.h"
 #include "utilities/transactions/optimistic_transaction_db_impl.h"
@@ -878,6 +879,8 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
     }
   };

+  MutexLock l(&write_mutex_);
+
   SequenceNumber sequence = db_impl_->GetLatestSequenceNumber() + 1;
   BlobInserter blob_inserter(this, sequence);
   updates->Iterate(&blob_inserter);
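MutexLock comes from util/mutexlock.h (the include added above): it locks the mutex in its constructor and unlocks it in its destructor, so write_mutex_ is held for the rest of Write() and released on every return path, including early returns. A minimal analogue of that RAII guard, using std::mutex and hypothetical names (ScopedLock, SerializedWrite):

#include <mutex>

// Minimal analogue of a scoped mutex guard: lock on construction,
// unlock on destruction, non-copyable.
class ScopedLock {
 public:
  explicit ScopedLock(std::mutex* mu) : mu_(mu) { mu_->lock(); }
  ~ScopedLock() { mu_->unlock(); }
  ScopedLock(const ScopedLock&) = delete;
  ScopedLock& operator=(const ScopedLock&) = delete;

 private:
  std::mutex* mu_;
};

std::mutex write_mutex;

int SerializedWrite(bool fail_early) {
  ScopedLock l(&write_mutex);  // held until SerializedWrite returns
  if (fail_early) {
    return -1;                 // unlocked here via ~ScopedLock
  }
  // ... apply the batch while no other writer can enter ...
  return 0;                    // unlocked here as well
}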
@@ -953,6 +956,7 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
 Status BlobDBImpl::PutUntil(const WriteOptions& options,
                             ColumnFamilyHandle* column_family, const Slice& key,
                             const Slice& value_unc, int32_t expiration) {
+  MutexLock l(&write_mutex_);
   UpdateWriteOptions(options);

   std::shared_ptr<BlobFile> bfile =
@@ -447,6 +447,9 @@ class BlobDBImpl : public BlobDB {
   // HEAVILY TRAFFICKED
   port::RWMutex mutex_;

+  // Writers has to hold write_mutex_ before writing.
+  mutable port::Mutex write_mutex_;
+
   // counter for blob file number
   std::atomic<uint64_t> next_file_number_;

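Declaring the mutex mutable means it can be locked even inside const member functions, because acquiring the lock mutates only the mutex, not the logical state of the object. A small illustration of what mutable allows, using a hypothetical Counter class rather than BlobDBImpl:

#include <mutex>
#include <string>

class Counter {
 public:
  void Add(const std::string& line) {
    std::lock_guard<std::mutex> guard(mu_);
    total_ += line.size();
  }

  // A const method can still take the lock because mu_ is mutable.
  size_t Total() const {
    std::lock_guard<std::mutex> guard(mu_);
    return total_;
  }

 private:
  mutable std::mutex mu_;  // mutable: lockable from const methods
  size_t total_ = 0;
};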
@@ -511,18 +511,35 @@ TEST_F(BlobDBTest, Compression) {
 }
 #endif

-TEST_F(BlobDBTest, DISABLED_MultipleWriters) {
-  Open();
+TEST_F(BlobDBTest, MultipleWriters) {
+  Open(BlobDBOptions());

   std::vector<port::Thread> workers;
-  for (size_t ii = 0; ii < 10; ii++)
-    workers.push_back(port::Thread(&BlobDBTest::InsertBlobs, this));
-
-  for (auto& t : workers) {
-    if (t.joinable()) {
-      t.join();
+  std::vector<std::map<std::string, std::string>> data_set(10);
+  for (uint32_t i = 0; i < 10; i++)
+    workers.push_back(port::Thread(
+        [&](uint32_t id) {
+          Random rnd(301 + id);
+          for (int j = 0; j < 100; j++) {
+            std::string key = "key" + ToString(id) + "_" + ToString(j);
+            if (id < 5) {
+              PutRandom(key, &rnd, &data_set[id]);
+            } else {
+              WriteBatch batch;
+              PutRandomToWriteBatch(key, &rnd, &batch, &data_set[id]);
+              blob_db_->Write(WriteOptions(), &batch);
+            }
+          }
+        },
+        i));
+  std::map<std::string, std::string> data;
+  for (size_t i = 0; i < 10; i++) {
+    if (workers[i].joinable()) {
+      workers[i].join();
     }
+    data.insert(data_set[i].begin(), data_set[i].end());
   }
+  VerifyDB(data);
 }

 // Test sequence number store in blob file is correct.
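The rewritten test starts ten writers, half writing through the Put-style helper and half through WriteBatch plus Write(), keeps each thread's expected key/value pairs in its own map, merges the maps after joining, and verifies the whole DB once. A self-contained sketch of the same pattern against a plain mutex-protected map, with std::thread instead of port::Thread and no gtest harness; all names here are illustrative:

#include <cassert>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

int main() {
  std::mutex mu;
  std::map<std::string, std::string> db;  // stand-in for the blob DB

  const int kWriters = 10;
  const int kKeysPerWriter = 100;
  std::vector<std::map<std::string, std::string>> expected(kWriters);
  std::vector<std::thread> workers;

  for (int id = 0; id < kWriters; id++) {
    workers.emplace_back([&, id]() {
      for (int j = 0; j < kKeysPerWriter; j++) {
        std::string key = "key" + std::to_string(id) + "_" + std::to_string(j);
        std::string value = "value" + std::to_string(id * kKeysPerWriter + j);
        expected[id][key] = value;        // each thread owns its own slot
        std::lock_guard<std::mutex> guard(mu);
        db[key] = value;                  // serialized write, mirroring the coarse lock
      }
    });
  }

  std::map<std::string, std::string> all;
  for (int id = 0; id < kWriters; id++) {
    workers[id].join();
    all.insert(expected[id].begin(), expected[id].end());
  }
  assert(db == all);  // every writer's data is present exactly once
  return 0;
}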