210 lines
5.9 KiB
C++
Raw Normal View History

// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "utilities/blob_db/blob_db.h"
#ifndef ROCKSDB_LITE
#include "db/filename.h"
#include "db/write_batch_internal.h"
#include "rocksdb/convenience.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/utilities/stackable_db.h"
#include "table/block.h"
#include "table/block_based_table_builder.h"
#include "table/block_builder.h"
#include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/instrumented_mutex.h"
namespace rocksdb {
namespace {
int kBlockBasedTableVersionFormat = 2;
} // namespace
class BlobDB : public StackableDB {
public:
using rocksdb::StackableDB::Put;
Status Put(const WriteOptions& options, const Slice& key,
const Slice& value) override;
using rocksdb::StackableDB::Get;
Status Get(const ReadOptions& options, const Slice& key,
std::string* value) override;
Status Open();
explicit BlobDB(DB* db);
private:
std::string dbname_;
ImmutableCFOptions ioptions_;
InstrumentedMutex mutex_;
std::unique_ptr<RandomAccessFileReader> file_reader_;
std::unique_ptr<WritableFileWriter> file_writer_;
size_t writer_offset_;
size_t next_sync_offset_;
static const std::string kFileName;
static const size_t kBlockHeaderSize;
static const size_t kBytesPerSync;
};
Status NewBlobDB(Options options, std::string dbname, DB** blob_db) {
DB* db;
Status s = DB::Open(options, dbname, &db);
if (!s.ok()) {
return s;
}
BlobDB* bdb = new BlobDB(db);
s = bdb->Open();
if (!s.ok()) {
delete bdb;
}
*blob_db = bdb;
return s;
}
const std::string BlobDB::kFileName = "blob_log";
const size_t BlobDB::kBlockHeaderSize = 8;
const size_t BlobDB::kBytesPerSync = 1024 * 1024 * 128;
BlobDB::BlobDB(DB* db)
: StackableDB(db),
ioptions_(db->GetOptions()),
writer_offset_(0),
next_sync_offset_(kBytesPerSync) {}
Status BlobDB::Open() {
unique_ptr<WritableFile> wfile;
EnvOptions env_options(db_->GetOptions());
Status s = ioptions_.env->NewWritableFile(db_->GetName() + "/" + kFileName,
&wfile, env_options);
if (!s.ok()) {
return s;
}
file_writer_.reset(new WritableFileWriter(std::move(wfile), env_options));
// Write version
std::string version;
PutFixed64(&version, 0);
s = file_writer_->Append(Slice(version));
if (!s.ok()) {
return s;
}
writer_offset_ += version.size();
std::unique_ptr<RandomAccessFile> rfile;
s = ioptions_.env->NewRandomAccessFile(db_->GetName() + "/" + kFileName,
&rfile, env_options);
if (!s.ok()) {
return s;
}
file_reader_.reset(new RandomAccessFileReader(std::move(rfile)));
return s;
}
Status BlobDB::Put(const WriteOptions& options, const Slice& key,
const Slice& value) {
BlockBuilder block_builder(1, false);
block_builder.Add(key, value);
CompressionType compression = CompressionType::kLZ4Compression;
CompressionOptions compression_opts;
Slice block_contents;
std::string compression_output;
block_contents = CompressBlock(block_builder.Finish(), compression_opts,
&compression, kBlockBasedTableVersionFormat,
Slice() /* dictionary */, &compression_output);
char header[kBlockHeaderSize];
char trailer[kBlockTrailerSize];
trailer[0] = compression;
auto crc = crc32c::Value(block_contents.data(), block_contents.size());
crc = crc32c::Extend(crc, trailer, 1); // Extend to cover block type
EncodeFixed32(trailer + 1, crc32c::Mask(crc));
BlockHandle handle;
std::string index_entry;
Status s;
{
InstrumentedMutexLock l(&mutex_);
auto raw_block_size = block_contents.size();
EncodeFixed64(header, raw_block_size);
s = file_writer_->Append(Slice(header, kBlockHeaderSize));
writer_offset_ += kBlockHeaderSize;
if (s.ok()) {
handle.set_offset(writer_offset_);
handle.set_size(raw_block_size);
s = file_writer_->Append(block_contents);
}
if (s.ok()) {
s = file_writer_->Append(Slice(trailer, kBlockTrailerSize));
}
if (s.ok()) {
s = file_writer_->Flush();
}
if (s.ok() && writer_offset_ > next_sync_offset_) {
// Sync every kBytesPerSync. This is a hacky way to limit unsynced data.
next_sync_offset_ += kBytesPerSync;
s = file_writer_->Sync(db_->GetOptions().use_fsync);
}
if (s.ok()) {
writer_offset_ += block_contents.size() + kBlockTrailerSize;
// Put file number
PutVarint64(&index_entry, 0);
handle.EncodeTo(&index_entry);
s = db_->Put(options, key, index_entry);
}
}
return s;
}
Status BlobDB::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
std::string index_entry;
s = db_->Get(options, key, &index_entry);
if (!s.ok()) {
return s;
}
BlockHandle handle;
Slice index_entry_slice(index_entry);
uint64_t file_number;
if (!GetVarint64(&index_entry_slice, &file_number)) {
return Status::Corruption();
}
assert(file_number == 0);
s = handle.DecodeFrom(&index_entry_slice);
if (!s.ok()) {
return s;
}
Footer footer(0, kBlockBasedTableVersionFormat);
BlockContents contents;
s = ReadBlockContents(file_reader_.get(), footer, options, handle, &contents,
ioptions_);
if (!s.ok()) {
return s;
}
Block block(std::move(contents));
BlockIter bit;
InternalIterator* it = block.NewIterator(nullptr, &bit);
it->SeekToFirst();
if (!it->status().ok()) {
return it->status();
}
*value = it->value().ToString();
return s;
}
} // namespace rocksdb
#else
namespace rocksdb {
Status NewBlobDB(Options options, std::string dbname, DB** blob_db) {
return Status::NotSupported();
}
} // namespace rocksdb
#endif // ROCKSDB_LITE