Blob storage pr

Summary:
The final pull request for Blob Storage.
Closes https://github.com/facebook/rocksdb/pull/2269

Differential Revision: D5033189

Pulled By: yiwu-arbug

fbshipit-source-id: 6356b683ccd58cbf38a1dc55e2ea400feecd5d06
This commit is contained in:
Anirban Rahut 2017-05-10 14:54:35 -07:00 committed by Facebook Github Bot
parent 492fc49a86
commit d85ff4953c
29 changed files with 5695 additions and 223 deletions

View File

@ -446,6 +446,10 @@ set(SOURCES
util/xxhash.cc
utilities/backupable/backupable_db.cc
utilities/blob_db/blob_db.cc
utilities/blob_db/blob_db_impl.cc
utilities/blob_db/blob_log_reader.cc
utilities/blob_db/blob_log_writer.cc
utilities/blob_db/blob_log_format.cc
utilities/checkpoint/checkpoint_impl.cc
utilities/col_buf_decoder.cc
utilities/col_buf_encoder.cc
@ -658,6 +662,7 @@ set(TESTS
util/heap_test.cc
util/rate_limiter_test.cc
util/slice_transform_test.cc
util/timer_queue_test.cc
util/thread_list_test.cc
util/thread_local_test.cc
utilities/backupable/backupable_db_test.cc

View File

@ -403,7 +403,6 @@ TESTS = \
ttl_test \
date_tiered_test \
backupable_db_test \
blob_db_test \
document_db_test \
json_document_test \
sim_cache_test \
@ -424,6 +423,7 @@ TESTS = \
options_settable_test \
options_util_test \
event_logger_test \
timer_queue_test \
cuckoo_table_builder_test \
cuckoo_table_reader_test \
cuckoo_table_db_test \
@ -1307,6 +1307,9 @@ db_bench_tool_test: tools/db_bench_tool_test.o $(BENCHTOOLOBJECTS) $(TESTHARNESS
event_logger_test: util/event_logger_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
timer_queue_test: util/timer_queue_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
sst_dump_test: tools/sst_dump_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

View File

@ -196,6 +196,12 @@ cpp_library(
"util/xxhash.cc",
"utilities/backupable/backupable_db.cc",
"utilities/blob_db/blob_db.cc",
"utilities/blob_db/blob_db_impl.cc",
"utilities/blob_db/blob_db_options_impl.cc",
"utilities/blob_db/blob_file.cc",
"utilities/blob_db/blob_log_reader.cc",
"utilities/blob_db/blob_log_writer.cc",
"utilities/blob_db/blob_log_format.cc",
"utilities/checkpoint/checkpoint_impl.cc",
"utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc",
"utilities/convenience/info_log_finder.cc",

View File

@ -93,7 +93,9 @@ CompactionIterator::CompactionIterator(
latest_snapshot_ = snapshots_->back();
}
if (compaction_filter_ != nullptr) {
if (compaction_filter_->IgnoreSnapshots()) ignore_snapshots_ = true;
if (compaction_filter_->IgnoreSnapshots()) {
ignore_snapshots_ = true;
}
} else {
ignore_snapshots_ = false;
}

View File

@ -1577,6 +1577,14 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
delete casted_s;
}
bool DBImpl::HasActiveSnapshotLaterThanSN(SequenceNumber sn) {
InstrumentedMutexLock l(&mutex_);
if (snapshots_.empty()) {
return false;
}
return (snapshots_.newest()->GetSequenceNumber() > sn);
}
#ifndef ROCKSDB_LITE
Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
TablePropertiesCollection* props) {
@ -1821,6 +1829,20 @@ ColumnFamilyHandle* DBImpl::GetColumnFamilyHandle(uint32_t column_family_id) {
return cf_memtables->GetColumnFamilyHandle();
}
// REQUIRED: mutex is NOT held.
ColumnFamilyHandle* DBImpl::GetColumnFamilyHandleUnlocked(
uint32_t column_family_id) {
ColumnFamilyMemTables* cf_memtables = column_family_memtables_.get();
InstrumentedMutexLock l(&mutex_);
if (!cf_memtables->Seek(column_family_id)) {
return nullptr;
}
return cf_memtables->GetColumnFamilyHandle();
}
void DBImpl::GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
const Range& range,
uint64_t* const count,

View File

@ -203,6 +203,8 @@ class DBImpl : public DB {
virtual SequenceNumber GetLatestSequenceNumber() const override;
bool HasActiveSnapshotLaterThanSN(SequenceNumber sn);
#ifndef ROCKSDB_LITE
using DB::ResetStats;
virtual Status ResetStats() override;
@ -465,6 +467,9 @@ class DBImpl : public DB {
// mutex is released.
ColumnFamilyHandle* GetColumnFamilyHandle(uint32_t column_family_id);
// Same as above, should called without mutex held and not on write thread.
ColumnFamilyHandle* GetColumnFamilyHandleUnlocked(uint32_t column_family_id);
// Returns the number of currently running flushes.
// REQUIREMENT: mutex_ must be held when calling this function.
int num_running_flushes() {

21
env/env_posix.cc vendored
View File

@ -253,13 +253,14 @@ class PosixEnv : public Env {
return s;
}
virtual Status NewWritableFile(const std::string& fname,
unique_ptr<WritableFile>* result,
const EnvOptions& options) override {
virtual Status OpenWritableFile(const std::string& fname,
unique_ptr<WritableFile>* result,
const EnvOptions& options,
bool reopen = false) {
result->reset();
Status s;
int fd = -1;
int flags = O_CREAT | O_TRUNC;
int flags = (reopen) ? (O_CREAT | O_APPEND) : (O_CREAT | O_TRUNC);
// Direct IO mode with O_DIRECT flag or F_NOCAHCE (MAC OSX)
if (options.use_direct_writes && !options.use_mmap_writes) {
// Note: we should avoid O_APPEND here due to ta the following bug:
@ -333,6 +334,18 @@ class PosixEnv : public Env {
return s;
}
virtual Status NewWritableFile(const std::string& fname,
unique_ptr<WritableFile>* result,
const EnvOptions& options) override {
return OpenWritableFile(fname, result, options, false);
}
virtual Status ReopenWritableFile(const std::string& fname,
unique_ptr<WritableFile>* result,
const EnvOptions& options) override {
return OpenWritableFile(fname, result, options, true);
}
virtual Status ReuseWritableFile(const std::string& fname,
const std::string& old_fname,
unique_ptr<WritableFile>* result,

View File

@ -468,8 +468,6 @@ class SequentialFile {
// aligned buffer for Direct I/O
virtual size_t GetRequiredBufferAlignment() const { return kDefaultPageSize; }
virtual void Rewind() {}
// Remove any kind of caching of data from the offset to offset+length
// of this file. If the length is 0, then it refers to the end of file.
// If the system is not caching the file contents, then this is a noop.

7
src.mk
View File

@ -150,6 +150,12 @@ LIB_SOURCES = \
util/xxhash.cc \
utilities/backupable/backupable_db.cc \
utilities/blob_db/blob_db.cc \
utilities/blob_db/blob_db_impl.cc \
utilities/blob_db/blob_db_options_impl.cc \
utilities/blob_db/blob_file.cc \
utilities/blob_db/blob_log_reader.cc \
utilities/blob_db/blob_log_writer.cc \
utilities/blob_db/blob_log_format.cc \
utilities/checkpoint/checkpoint_impl.cc \
utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \
utilities/convenience/info_log_finder.cc \
@ -308,6 +314,7 @@ MAIN_SOURCES = \
util/log_write_bench.cc \
util/rate_limiter_test.cc \
util/slice_transform_test.cc \
util/timer_queue_test.cc \
util/thread_list_test.cc \
util/thread_local_test.cc \
utilities/backupable/backupable_db_test.cc \

View File

@ -583,6 +583,10 @@ DEFINE_bool(optimistic_transaction_db, false,
"Open a OptimisticTransactionDB instance. "
"Required for randomtransaction benchmark.");
DEFINE_bool(use_blob_db, false,
"Open a BlobDB instance. "
"Required for largevalue benchmark.");
DEFINE_bool(transaction_db, false,
"Open a TransactionDB instance. "
"Required for randomtransaction benchmark.");
@ -630,8 +634,6 @@ DEFINE_bool(report_bg_io_stats, false,
DEFINE_bool(use_stderr_info_logger, false,
"Write info logs to stderr instead of to LOG file. ");
DEFINE_bool(use_blob_db, false, "Whether to use BlobDB. ");
static enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
assert(ctype);
@ -1128,6 +1130,15 @@ class RandomGenerator {
pos_ += len;
return Slice(data_.data() + pos_ - len, len);
}
Slice GenerateWithTTL(unsigned int len) {
assert(len <= data_.size());
if (pos_ + len > data_.size()) {
pos_ = 0;
}
pos_ += len;
return Slice(data_.data() + pos_ - len, len);
}
};
static void AppendWithSpace(std::string* str, Slice msg) {
@ -3227,9 +3238,14 @@ void VerifyDBFromDB(std::string& truth_db_name) {
if (s.ok()) {
db->db = ptr;
}
#endif // ROCKSDB_LITE
} else if (FLAGS_use_blob_db) {
s = NewBlobDB(options, db_name, &db->db);
blob_db::BlobDBOptions blob_db_options;
blob_db::BlobDB* ptr;
s = blob_db::BlobDB::Open(options, blob_db_options, db_name, &ptr);
if (s.ok()) {
db->db = ptr;
}
#endif // ROCKSDB_LITE
} else {
s = DB::Open(options, db_name, &db->db);
}
@ -3406,8 +3422,12 @@ void VerifyDBFromDB(std::string& truth_db_name) {
int64_t rand_num = key_gens[id]->Next();
GenerateKeyFromInt(rand_num, FLAGS_num, &key);
if (FLAGS_use_blob_db) {
s = db_with_cfh->db->Put(write_options_, key,
gen.Generate(value_size_));
Slice val = gen.Generate(value_size_);
int ttl = rand() % 86400;
blob_db::BlobDB* blobdb =
static_cast<blob_db::BlobDB*>(db_with_cfh->db);
s = blobdb->PutWithTTL(write_options_, key, val, ttl);
} else if (FLAGS_num_column_families <= 1) {
batch.Put(key, gen.Generate(value_size_));
} else {

View File

@ -48,6 +48,8 @@ class SequentialFileReader {
Status Skip(uint64_t n);
void Rewind();
SequentialFile* file() { return file_.get(); }
bool use_direct_io() const { return file_->use_direct_io(); }

158
util/mpsc.h Normal file
View File

@ -0,0 +1,158 @@
// Portions Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Large parts of this file is borrowed from the public domain code below.
// from https://github.com/mstump/queues
// C++ implementation of Dmitry Vyukov's non-intrusive
// lock free unbound MPSC queue
// http://www.1024cores.net/home/
// lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
// License from mstump/queues
// This is free and unencumbered software released into the public domain.
//
// Anyone is free to copy, modify, publish, use, compile, sell, or
// distribute this software, either in source code form or as a compiled
// binary, for any purpose, commercial or non-commercial, and by any
// means.
//
// In jurisdictions that recognize copyright laws, the author or authors
// of this software dedicate any and all copyright interest in the
// software to the public domain. We make this dedication for the benefit
// of the public at large and to the detriment of our heirs and
// successors. We intend this dedication to be an overt act of
// relinquishment in perpetuity of all present and future rights to this
// software under copyright law.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
// IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
// OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
// ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
// OTHER DEALINGS IN THE SOFTWARE.
//
// For more information, please refer to <http://unlicense.org>
// License from http://www.1024cores.net/home/
// lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
// Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
// Redistribution and use in source and binary forms, with or
// without modification, are permitted provided that the following
// conditions are met:
// 1. Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR
// IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
// OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
// EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
// BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
// USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// The views and conclusions contained in the software and documentation
// are those of the authors and should not be interpreted as representing
// official policies, either expressed or implied, of Dmitry Vyukov.
//
#ifndef UTIL_MPSC_H_
#define UTIL_MPSC_H_
#include <atomic>
#include <cassert>
#include <type_traits>
/**
* Multiple Producer Single Consumer Lockless Q
*/
template <typename T>
class mpsc_queue_t {
public:
struct buffer_node_t {
T data;
std::atomic<buffer_node_t*> next;
};
mpsc_queue_t() {
buffer_node_aligned_t* al_st = new buffer_node_aligned_t;
buffer_node_t* node = new (al_st) buffer_node_t();
_head.store(node);
_tail.store(node);
node->next.store(nullptr, std::memory_order_relaxed);
}
~mpsc_queue_t() {
T output;
while (this->dequeue(&output)) {
}
buffer_node_t* front = _head.load(std::memory_order_relaxed);
front->~buffer_node_t();
::operator delete(front);
}
void enqueue(const T& input) {
buffer_node_aligned_t* al_st = new buffer_node_aligned_t;
buffer_node_t* node = new (al_st) buffer_node_t();
node->data = input;
node->next.store(nullptr, std::memory_order_relaxed);
buffer_node_t* prev_head = _head.exchange(node, std::memory_order_acq_rel);
prev_head->next.store(node, std::memory_order_release);
}
bool dequeue(T* output) {
buffer_node_t* tail = _tail.load(std::memory_order_relaxed);
buffer_node_t* next = tail->next.load(std::memory_order_acquire);
if (next == nullptr) {
return false;
}
*output = next->data;
_tail.store(next, std::memory_order_release);
tail->~buffer_node_t();
::operator delete(tail);
return true;
}
// you can only use pop_all if the queue is SPSC
buffer_node_t* pop_all() {
// nobody else can move the tail pointer.
buffer_node_t* tptr = _tail.load(std::memory_order_relaxed);
buffer_node_t* next =
tptr->next.exchange(nullptr, std::memory_order_acquire);
_head.exchange(tptr, std::memory_order_acquire);
// there is a race condition here
return next;
}
private:
typedef typename std::aligned_storage<
sizeof(buffer_node_t), std::alignment_of<buffer_node_t>::value>::type
buffer_node_aligned_t;
std::atomic<buffer_node_t*> _head;
std::atomic<buffer_node_t*> _tail;
mpsc_queue_t(const mpsc_queue_t&) = delete;
mpsc_queue_t& operator=(const mpsc_queue_t&) = delete;
};
#endif // UTIL_MPSC_H_

217
util/timer_queue.h Normal file
View File

@ -0,0 +1,217 @@
// Portions Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Borrowed from
// http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
// Timer Queue
//
// License
//
// The source code in this article is licensed under the CC0 license, so feel
// free to copy, modify, share, do whatever you want with it.
// No attribution is required, but Ill be happy if you do.
// CC0 license
// The person who associated a work with this deed has dedicated the work to the
// public domain by waiving all of his or her rights to the work worldwide
// under copyright law, including all related and neighboring rights, to the
// extent allowed by law. You can copy, modify, distribute and perform the
// work, even for commercial purposes, all without asking permission.
#pragma once
#include <assert.h>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
// Allows execution of handlers at a specified time in the future
// Guarantees:
// - All handlers are executed ONCE, even if cancelled (aborted parameter will
// be set to true)
// - If TimerQueue is destroyed, it will cancel all handlers.
// - Handlers are ALWAYS executed in the Timer Queue worker thread.
// - Handlers execution order is NOT guaranteed
//
////////////////////////////////////////////////////////////////////////////////
// borrowed from
// http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
class TimerQueue {
public:
TimerQueue() : m_th(&TimerQueue::run, this) {}
~TimerQueue() {
cancelAll();
// Abusing the timer queue to trigger the shutdown.
add(0, [this](bool) {
m_finish = true;
return std::make_pair(false, 0);
});
m_th.join();
}
// Adds a new timer
// \return
// Returns the ID of the new timer. You can use this ID to cancel the
// timer
uint64_t add(int64_t milliseconds,
std::function<std::pair<bool, int64_t>(bool)> handler) {
WorkItem item;
Clock::time_point tp = Clock::now();
item.end = tp + std::chrono::milliseconds(milliseconds);
item.period = milliseconds;
item.handler = std::move(handler);
std::unique_lock<std::mutex> lk(m_mtx);
uint64_t id = ++m_idcounter;
item.id = id;
m_items.push(std::move(item));
// Something changed, so wake up timer thread
m_checkWork.notify_one();
return id;
}
// Cancels the specified timer
// \return
// 1 if the timer was cancelled.
// 0 if you were too late to cancel (or the timer ID was never valid to
// start with)
size_t cancel(uint64_t id) {
// Instead of removing the item from the container (thus breaking the
// heap integrity), we set the item as having no handler, and put
// that handler on a new item at the top for immediate execution
// The timer thread will then ignore the original item, since it has no
// handler.
std::unique_lock<std::mutex> lk(m_mtx);
for (auto&& item : m_items.getContainer()) {
if (item.id == id && item.handler) {
WorkItem newItem;
// Zero time, so it stays at the top for immediate execution
newItem.end = Clock::time_point();
newItem.id = 0; // Means it is a canceled item
// Move the handler from item to newitem (thus clearing item)
newItem.handler = std::move(item.handler);
m_items.push(std::move(newItem));
// Something changed, so wake up timer thread
m_checkWork.notify_one();
return 1;
}
}
return 0;
}
// Cancels all timers
// \return
// The number of timers cancelled
size_t cancelAll() {
// Setting all "end" to 0 (for immediate execution) is ok,
// since it maintains the heap integrity
std::unique_lock<std::mutex> lk(m_mtx);
m_cancel = true;
for (auto&& item : m_items.getContainer()) {
if (item.id && item.handler) {
item.end = Clock::time_point();
item.id = 0;
}
}
auto ret = m_items.size();
m_checkWork.notify_one();
return ret;
}
private:
using Clock = std::chrono::steady_clock;
TimerQueue(const TimerQueue&) = delete;
TimerQueue& operator=(const TimerQueue&) = delete;
void run() {
std::unique_lock<std::mutex> lk(m_mtx);
while (!m_finish) {
auto end = calcWaitTime_lock();
if (end.first) {
// Timers found, so wait until it expires (or something else
// changes)
m_checkWork.wait_until(lk, end.second);
} else {
// No timers exist, so wait forever until something changes
m_checkWork.wait(lk);
}
// Check and execute as much work as possible, such as, all expired
// timers
checkWork(&lk);
}
// If we are shutting down, we should not have any items left,
// since the shutdown cancels all items
assert(m_items.size() == 0);
}
std::pair<bool, Clock::time_point> calcWaitTime_lock() {
while (m_items.size()) {
if (m_items.top().handler) {
// Item present, so return the new wait time
return std::make_pair(true, m_items.top().end);
} else {
// Discard empty handlers (they were cancelled)
m_items.pop();
}
}
// No items found, so return no wait time (causes the thread to wait
// indefinitely)
return std::make_pair(false, Clock::time_point());
}
void checkWork(std::unique_lock<std::mutex>* lk) {
while (m_items.size() && m_items.top().end <= Clock::now()) {
WorkItem item(m_items.top());
m_items.pop();
if (item.handler) {
(*lk).unlock();
auto reschedule_pair = item.handler(item.id == 0);
(*lk).lock();
if (!m_cancel && reschedule_pair.first) {
int64_t new_period = (reschedule_pair.second == -1)
? item.period
: reschedule_pair.second;
item.period = new_period;
item.end = Clock::now() + std::chrono::milliseconds(new_period);
m_items.push(std::move(item));
}
}
}
}
bool m_finish = false;
bool m_cancel = false;
uint64_t m_idcounter = 0;
std::condition_variable m_checkWork;
struct WorkItem {
Clock::time_point end;
int64_t period;
uint64_t id; // id==0 means it was cancelled
std::function<std::pair<bool, int64_t>(bool)> handler;
bool operator>(const WorkItem& other) const { return end > other.end; }
};
std::mutex m_mtx;
// Inheriting from priority_queue, so we can access the internal container
class Queue : public std::priority_queue<WorkItem, std::vector<WorkItem>,
std::greater<WorkItem>> {
public:
std::vector<WorkItem>& getContainer() { return this->c; }
} m_items;
std::thread m_th;
};

72
util/timer_queue_test.cc Normal file
View File

@ -0,0 +1,72 @@
// Portions Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// borrowed from
// http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
// Timer Queue
//
// License
//
// The source code in this article is licensed under the CC0 license, so feel
// free
// to copy, modify, share, do whatever you want with it.
// No attribution is required, but Ill be happy if you do.
// CC0 license
// The person who associated a work with this deed has dedicated the work to the
// public domain by waiving all of his or her rights to the work worldwide
// under copyright law, including all related and neighboring rights, to the
// extent allowed by law. You can copy, modify, distribute and perform the
// work, even for
// commercial purposes, all without asking permission. See Other Information
// below.
//
#include "util/timer_queue.h"
#include <future>
namespace Timing {
using Clock = std::chrono::high_resolution_clock;
double now() {
static auto start = Clock::now();
return std::chrono::duration<double, std::milli>(Clock::now() - start)
.count();
}
} // namespace Timing
int main() {
TimerQueue q;
double tnow = Timing::now();
q.add(10000, [tnow](bool aborted) mutable {
printf("T 1: %d, Elapsed %4.2fms\n", aborted, Timing::now() - tnow);
return std::make_pair(false, 0);
});
q.add(10001, [tnow](bool aborted) mutable {
printf("T 2: %d, Elapsed %4.2fms\n", aborted, Timing::now() - tnow);
return std::make_pair(false, 0);
});
q.add(1000, [tnow](bool aborted) mutable {
printf("T 3: %d, Elapsed %4.2fms\n", aborted, Timing::now() - tnow);
return std::make_pair(!aborted, 1000);
});
auto id = q.add(2000, [tnow](bool aborted) mutable {
printf("T 4: %d, Elapsed %4.2fms\n", aborted, Timing::now() - tnow);
return std::make_pair(!aborted, 2000);
});
(void)id;
// auto ret = q.cancel(id);
// assert(ret == 1);
// q.cancelAll();
return 0;
}
//////////////////////////////////////////

View File

@ -1,12 +1,15 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "utilities/blob_db/blob_db.h"
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_db.h"
#include "db/write_batch_internal.h"
#include "monitoring/instrumented_mutex.h"
#include "options/cf_options.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
@ -17,194 +20,152 @@
#include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/filename.h"
#include "utilities/blob_db/blob_db_impl.h"
namespace rocksdb {
namespace {
int kBlockBasedTableVersionFormat = 2;
} // namespace
namespace blob_db {
port::Mutex listener_mutex;
typedef std::shared_ptr<BlobDBFlushBeginListener> FlushBeginListener_t;
typedef std::shared_ptr<BlobReconcileWalFilter> ReconcileWalFilter_t;
typedef std::shared_ptr<EvictAllVersionsCompactionListener>
CompactionListener_t;
class BlobDB : public StackableDB {
public:
using rocksdb::StackableDB::Put;
Status Put(const WriteOptions& options, const Slice& key,
const Slice& value) override;
// to ensure the lifetime of the listeners
std::vector<std::shared_ptr<EventListener>> all_blobdb_listeners;
std::vector<ReconcileWalFilter_t> all_wal_filters;
using rocksdb::StackableDB::Get;
Status Get(const ReadOptions& options, const Slice& key,
std::string* value) override;
Status BlobDB::OpenAndLoad(const Options& options,
const BlobDBOptions& bdb_options,
const std::string& dbname, BlobDB** blob_db,
Options* changed_options) {
*changed_options = options;
*blob_db = nullptr;
Status Open();
FlushBeginListener_t fblistener =
std::make_shared<BlobDBFlushBeginListener>();
ReconcileWalFilter_t rw_filter = std::make_shared<BlobReconcileWalFilter>();
CompactionListener_t ce_listener =
std::make_shared<EvictAllVersionsCompactionListener>();
explicit BlobDB(DB* db);
private:
std::string dbname_;
ImmutableCFOptions ioptions_;
InstrumentedMutex mutex_;
std::unique_ptr<RandomAccessFileReader> file_reader_;
std::unique_ptr<WritableFileWriter> file_writer_;
size_t writer_offset_;
size_t next_sync_offset_;
static const std::string kFileName;
static const size_t kBlockHeaderSize;
static const size_t kBytesPerSync;
};
Status NewBlobDB(Options options, std::string dbname, DB** blob_db) {
DB* db;
Status s = DB::Open(options, dbname, &db);
if (!s.ok()) {
return s;
{
MutexLock l(&listener_mutex);
all_blobdb_listeners.push_back(fblistener);
all_blobdb_listeners.push_back(ce_listener);
all_wal_filters.push_back(rw_filter);
}
BlobDB* bdb = new BlobDB(db);
s = bdb->Open();
changed_options->listeners.emplace_back(fblistener);
changed_options->listeners.emplace_back(ce_listener);
changed_options->wal_filter = rw_filter.get();
DBOptions db_options(*changed_options);
// we need to open blob db first so that recovery can happen
BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, db_options);
fblistener->SetImplPtr(bdb);
ce_listener->SetImplPtr(bdb);
rw_filter->SetImplPtr(bdb);
Status s = bdb->OpenPhase1();
if (!s.ok()) return s;
*blob_db = bdb;
return s;
}
Status BlobDB::Open(const Options& options, const BlobDBOptions& bdb_options,
const std::string& dbname, BlobDB** blob_db) {
*blob_db = nullptr;
DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(
ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
std::vector<ColumnFamilyHandle*> handles;
Status s = BlobDB::Open(db_options, bdb_options, dbname, column_families,
&handles, blob_db);
if (s.ok()) {
assert(handles.size() == 1);
// i can delete the handle since DBImpl is always holding a reference to
// default column family
delete handles[0];
}
return s;
}
Status BlobDB::Open(const DBOptions& db_options,
const BlobDBOptions& bdb_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, BlobDB** blob_db,
bool no_base_db) {
*blob_db = nullptr;
DBOptions my_db_options(db_options);
FlushBeginListener_t fblistener =
std::make_shared<BlobDBFlushBeginListener>();
CompactionListener_t ce_listener =
std::make_shared<EvictAllVersionsCompactionListener>();
ReconcileWalFilter_t rw_filter = std::make_shared<BlobReconcileWalFilter>();
my_db_options.listeners.emplace_back(fblistener);
my_db_options.listeners.emplace_back(ce_listener);
my_db_options.wal_filter = rw_filter.get();
{
MutexLock l(&listener_mutex);
all_blobdb_listeners.push_back(fblistener);
all_blobdb_listeners.push_back(ce_listener);
all_wal_filters.push_back(rw_filter);
}
// we need to open blob db first so that recovery can happen
BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, my_db_options);
fblistener->SetImplPtr(bdb);
ce_listener->SetImplPtr(bdb);
rw_filter->SetImplPtr(bdb);
Status s = bdb->OpenPhase1();
if (!s.ok()) return s;
if (no_base_db) return s;
DB* db = nullptr;
s = DB::Open(my_db_options, dbname, column_families, handles, &db);
if (!s.ok()) return s;
// set the implementation pointer
s = bdb->LinkToBaseDB(db);
if (!s.ok()) {
delete bdb;
bdb = nullptr;
}
*blob_db = bdb;
return s;
}
const std::string BlobDB::kFileName = "blob_log";
const size_t BlobDB::kBlockHeaderSize = 8;
const size_t BlobDB::kBytesPerSync = 1024 * 1024 * 128;
BlobDB::BlobDB(DB* db) : StackableDB(db) {}
BlobDB::BlobDB(DB* db)
: StackableDB(db),
ioptions_(db->GetOptions()),
writer_offset_(0),
next_sync_offset_(kBytesPerSync) {}
////////////////////////////////////////////////////////////////////////////////
//
//
// std::function<int(double)> fnCaller =
// std::bind(&A::fn, &anInstance, std::placeholders::_1);
////////////////////////////////////////////////////////////////////////////////
BlobDBOptions::BlobDBOptions()
: blob_dir("blob_dir"),
path_relative(true),
is_fifo(false),
blob_dir_size(1000ULL * 1024ULL * 1024ULL * 1024ULL),
ttl_range_secs(3600),
min_blob_size(512),
bytes_per_sync(0),
blob_file_size(256 * 1024 * 1024),
num_concurrent_simple_blobs(4),
default_ttl_extractor(false),
compression(kNoCompression) {}
Status BlobDB::Open() {
unique_ptr<WritableFile> wfile;
EnvOptions env_options(db_->GetOptions());
Status s = ioptions_.env->NewWritableFile(db_->GetName() + "/" + kFileName,
&wfile, env_options);
if (!s.ok()) {
return s;
}
file_writer_.reset(new WritableFileWriter(std::move(wfile), env_options));
// Write version
std::string version;
PutFixed64(&version, 0);
s = file_writer_->Append(Slice(version));
if (!s.ok()) {
return s;
}
writer_offset_ += version.size();
std::unique_ptr<RandomAccessFile> rfile;
s = ioptions_.env->NewRandomAccessFile(db_->GetName() + "/" + kFileName,
&rfile, env_options);
if (!s.ok()) {
return s;
}
file_reader_.reset(new RandomAccessFileReader(std::move(rfile)));
return s;
}
Status BlobDB::Put(const WriteOptions& options, const Slice& key,
const Slice& value) {
BlockBuilder block_builder(1, false);
block_builder.Add(key, value);
CompressionType compression = CompressionType::kLZ4Compression;
CompressionOptions compression_opts;
Slice block_contents;
std::string compression_output;
block_contents = CompressBlock(block_builder.Finish(), compression_opts,
&compression, kBlockBasedTableVersionFormat,
Slice() /* dictionary */, &compression_output);
char header[kBlockHeaderSize];
char trailer[kBlockTrailerSize];
trailer[0] = compression;
auto crc = crc32c::Value(block_contents.data(), block_contents.size());
crc = crc32c::Extend(crc, trailer, 1); // Extend to cover block type
EncodeFixed32(trailer + 1, crc32c::Mask(crc));
BlockHandle handle;
std::string index_entry;
Status s;
{
InstrumentedMutexLock l(&mutex_);
auto raw_block_size = block_contents.size();
EncodeFixed64(header, raw_block_size);
s = file_writer_->Append(Slice(header, kBlockHeaderSize));
writer_offset_ += kBlockHeaderSize;
if (s.ok()) {
handle.set_offset(writer_offset_);
handle.set_size(raw_block_size);
s = file_writer_->Append(block_contents);
}
if (s.ok()) {
s = file_writer_->Append(Slice(trailer, kBlockTrailerSize));
}
if (s.ok()) {
s = file_writer_->Flush();
}
if (s.ok() && writer_offset_ > next_sync_offset_) {
// Sync every kBytesPerSync. This is a hacky way to limit unsynced data.
next_sync_offset_ += kBytesPerSync;
s = file_writer_->Sync(db_->GetOptions().use_fsync);
}
if (s.ok()) {
writer_offset_ += block_contents.size() + kBlockTrailerSize;
// Put file number
PutVarint64(&index_entry, 0);
handle.EncodeTo(&index_entry);
s = db_->Put(options, key, index_entry);
}
}
return s;
}
Status BlobDB::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
std::string index_entry;
s = db_->Get(options, key, &index_entry);
if (!s.ok()) {
return s;
}
BlockHandle handle;
Slice index_entry_slice(index_entry);
uint64_t file_number;
if (!GetVarint64(&index_entry_slice, &file_number)) {
return Status::Corruption();
}
assert(file_number == 0);
s = handle.DecodeFrom(&index_entry_slice);
if (!s.ok()) {
return s;
}
Footer footer(0, kBlockBasedTableVersionFormat);
BlockContents contents;
s = ReadBlockContents(file_reader_.get(), footer, options, handle, &contents,
ioptions_);
if (!s.ok()) {
return s;
}
Block block(std::move(contents), kDisableGlobalSequenceNumber);
BlockIter bit;
InternalIterator* it = block.NewIterator(nullptr, &bit);
it->SeekToFirst();
if (!it->status().ok()) {
return it->status();
}
*value = it->value().ToString();
return s;
}
} // namespace blob_db
} // namespace rocksdb
#else
namespace rocksdb {
Status NewBlobDB(Options options, std::string dbname, DB** blob_db) {
return Status::NotSupported();
}
} // namespace rocksdb
#endif // ROCKSDB_LITE
#endif

View File

@ -7,12 +7,19 @@
#pragma once
#ifndef ROCKSDB_LITE
#include <functional>
#include <string>
#include <vector>
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/stackable_db.h"
namespace rocksdb {
// EXPERIMENAL ONLY
namespace blob_db {
// A wrapped database which puts values of KV pairs in a separate log
// and store location to the log in the underlying DB.
// It lacks lots of importatant functionalities, e.g. DB restarts,
@ -20,5 +27,177 @@ namespace rocksdb {
//
// The factory needs to be moved to include/rocksdb/utilities to allow
// users to use blob DB.
extern Status NewBlobDB(Options options, std::string dbname, DB** blob_db);
struct BlobDBOptions {
// name of the directory under main db, where blobs will be stored.
// default is "blob_dir"
std::string blob_dir;
// whether the blob_dir path is relative or absolute.
bool path_relative;
// is the eviction strategy fifo based
bool is_fifo;
// maximum size of the blob dir. Once this gets used, up
// evict the blob file which is oldest (is_fifo )
// 0 means no limits
uint64_t blob_dir_size;
// a new bucket is opened, for ttl_range. So if ttl_range is 600seconds
// (10 minutes), and the first bucket starts at 1471542000
// then the blob buckets will be
// first bucket is 1471542000 - 1471542600
// second bucket is 1471542600 - 1471543200
// and so on
uint32_t ttl_range_secs;
// at what size will the blobs be stored in separate log rather than
// inline
uint64_t min_blob_size;
// at what bytes will the blob files be synced to blob log.
uint64_t bytes_per_sync;
// the target size of each blob file. File will become immutable
// after it exceeds that size
uint64_t blob_file_size;
// how many files to use for simple blobs at one time
uint32_t num_concurrent_simple_blobs;
// this function is to be provided by client if they intend to
// use Put API to provide TTL.
// the first argument is the value in the Put API
// in case you want to do some modifications to the value,
// return a new Slice in the second.
// otherwise just copy the input value into output.
// the ttl should be extracted and returned in last pointer.
// otherwise assign it to -1
std::function<bool(const Slice&, Slice*, int32_t*)> extract_ttl_fn;
// eviction callback.
// this function will be called for every blob that is getting
// evicted.
std::function<void(const ColumnFamilyHandle*, const Slice&, const Slice&)>
gc_evict_cb_fn;
// default ttl extactor
bool default_ttl_extractor;
// what compression to use for Blob's
CompressionType compression;
// default constructor
BlobDBOptions();
BlobDBOptions(const BlobDBOptions& in) = default;
virtual ~BlobDBOptions() = default;
};
class BlobDB : public StackableDB {
public:
// the suffix to a blob value to represent "ttl:TTLVAL"
static const uint64_t kTTLSuffixLength = 8;
public:
using rocksdb::StackableDB::Put;
// This function needs to be called before destroying
// the base DB
static Status DestroyBlobDB(const std::string& dbname, const Options& options,
const BlobDBOptions& bdb_options);
virtual Status Put(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override = 0;
using rocksdb::StackableDB::Delete;
virtual Status Delete(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key) override = 0;
virtual Status PutWithTTL(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value, int32_t ttl) = 0;
virtual Status PutWithTTL(const WriteOptions& options, const Slice& key,
const Slice& value, int32_t ttl) {
return PutWithTTL(options, DefaultColumnFamily(), key, value, ttl);
}
virtual Status PutUntil(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value, int32_t expiration) = 0;
virtual Status PutUntil(const WriteOptions& options, const Slice& key,
const Slice& value, int32_t expiration) {
return PutUntil(options, DefaultColumnFamily(), key, value, expiration);
}
using rocksdb::StackableDB::Get;
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) override = 0;
using rocksdb::StackableDB::MultiGet;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override = 0;
using rocksdb::StackableDB::SingleDelete;
virtual Status SingleDelete(const WriteOptions& wopts,
ColumnFamilyHandle* column_family,
const Slice& key) override = 0;
using rocksdb::StackableDB::Merge;
virtual Status Merge(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override {
return Status::NotSupported("Not supported operation in blob db.");
}
virtual Status Write(const WriteOptions& opts,
WriteBatch* updates) override = 0;
// Starting point for opening a Blob DB.
// changed_options - critical. Blob DB loads and inserts listeners
// into options which are necessary for recovery and atomicity
// Use this pattern if you need control on step 2, i.e. your
// BaseDB is not just a simple rocksdb but a stacked DB
// 1. ::OpenAndLoad
// 2. Open Base DB with the changed_options
// 3. ::LinkToBaseDB
static Status OpenAndLoad(const Options& options,
const BlobDBOptions& bdb_options,
const std::string& dbname, BlobDB** blob_db,
Options* changed_options);
// This is another way to open BLOB DB which do not have other
// Stackable DB's in play
// Steps.
// 1. ::Open
static Status Open(const Options& options, const BlobDBOptions& bdb_options,
const std::string& dbname, BlobDB** blob_db);
static Status Open(const DBOptions& db_options,
const BlobDBOptions& bdb_options,
const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles,
BlobDB** blob_db, bool no_base_db = false);
virtual ~BlobDB() {}
virtual Status LinkToBaseDB(DB* db_base) = 0;
protected:
explicit BlobDB(DB* db);
};
} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,657 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#ifndef ROCKSDB_LITE
#include <atomic>
#include <condition_variable>
#include <ctime>
#include <list>
#include <memory>
#include <set>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/listener.h"
#include "rocksdb/options.h"
#include "rocksdb/wal_filter.h"
#include "util/file_reader_writer.h"
#include "util/mpsc.h"
#include "util/mutexlock.h"
#include "util/timer_queue.h"
#include "utilities/blob_db/blob_db.h"
#include "utilities/blob_db/blob_db_options_impl.h"
#include "utilities/blob_db/blob_log_format.h"
#include "utilities/blob_db/blob_log_reader.h"
#include "utilities/blob_db/blob_log_writer.h"
namespace rocksdb {
class DBImpl;
class ColumnFamilyHandle;
class ColumnFamilyData;
class OptimisticTransactionDBImpl;
struct FlushJobInfo;
namespace blob_db {
class BlobFile;
class BlobDBImpl;
struct GCStats;
class BlobDBFlushBeginListener : public EventListener {
public:
explicit BlobDBFlushBeginListener() : impl_(nullptr) {}
void OnFlushBegin(DB* db, const FlushJobInfo& info) override;
void SetImplPtr(BlobDBImpl* p) { impl_ = p; }
protected:
BlobDBImpl* impl_;
};
// this implements the callback from the WAL which ensures that the
// blob record is present in the blob log. If fsync/fdatasync in not
// happening on every write, there is the probability that keys in the
// blob log can lag the keys in blobs
class BlobReconcileWalFilter : public WalFilter {
public:
virtual WalFilter::WalProcessingOption LogRecordFound(
unsigned long long log_number, const std::string& log_file_name,
const WriteBatch& batch, WriteBatch* new_batch,
bool* batch_changed) override;
virtual const char* Name() const override { return "BlobDBWalReconciler"; }
void SetImplPtr(BlobDBImpl* p) { impl_ = p; }
protected:
BlobDBImpl* impl_;
};
class EvictAllVersionsCompactionListener : public EventListener {
public:
class InternalListener : public CompactionEventListener {
friend class BlobDBImpl;
public:
virtual void OnCompaction(int level, const Slice& key,
CompactionListenerValueType value_type,
const Slice& existing_value,
const SequenceNumber& sn, bool is_new) override;
void SetImplPtr(BlobDBImpl* p) { impl_ = p; }
private:
BlobDBImpl* impl_;
};
explicit EvictAllVersionsCompactionListener()
: internal_listener_(new InternalListener()) {}
virtual CompactionEventListener* GetCompactionEventListener() override {
return internal_listener_.get();
}
void SetImplPtr(BlobDBImpl* p) { internal_listener_->SetImplPtr(p); }
private:
std::unique_ptr<InternalListener> internal_listener_;
};
#if 0
class EvictAllVersionsFilterFactory : public CompactionFilterFactory {
private:
BlobDBImpl* impl_;
public:
EvictAllVersionsFilterFactory() : impl_(nullptr) {}
void SetImplPtr(BlobDBImpl* p) { impl_ = p; }
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override;
virtual const char* Name() const override {
return "EvictAllVersionsFilterFactory";
}
};
#endif
// Comparator to sort "TTL" aware Blob files based on the lower value of
// TTL range.
struct blobf_compare_ttl {
bool operator()(const std::shared_ptr<BlobFile>& lhs,
const std::shared_ptr<BlobFile>& rhs) const;
};
/**
* The implementation class for BlobDB. This manages the value
* part in TTL aware sequentially written files. These files are
* Garbage Collected.
*/
class BlobDBImpl : public BlobDB {
friend class BlobDBFlushBeginListener;
friend class EvictAllVersionsCompactionListener;
friend class BlobDB;
friend class BlobFile;
friend class BlobDBIterator;
public:
using rocksdb::StackableDB::Put;
Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) override;
using rocksdb::StackableDB::Delete;
Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family,
const Slice& key) override;
using rocksdb::StackableDB::SingleDelete;
virtual Status SingleDelete(const WriteOptions& wopts,
ColumnFamilyHandle* column_family,
const Slice& key) override;
using rocksdb::StackableDB::Get;
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) override;
using rocksdb::StackableDB::NewIterator;
virtual Iterator* NewIterator(const ReadOptions& opts,
ColumnFamilyHandle* column_family) override;
using rocksdb::StackableDB::MultiGet;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
using BlobDB::PutWithTTL;
Status PutWithTTL(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value, int32_t ttl) override;
using BlobDB::PutUntil;
Status PutUntil(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value_unc, int32_t expiration) override;
Status LinkToBaseDB(DB* db) override;
BlobDBImpl(DB* db, const BlobDBOptions& bdb_options);
BlobDBImpl(const std::string& dbname, const BlobDBOptions& bdb_options,
const DBOptions& db_options);
~BlobDBImpl();
private:
static bool ExtractTTLFromBlob(const Slice& value, Slice* newval,
int32_t* ttl_val);
Status OpenPhase1();
Status CommonGet(const ColumnFamilyData* cfd, const Slice& key,
const std::string& index_entry, std::string* value);
// Just before flush starts acting on memtable files,
// this handler is called.
void OnFlushBeginHandler(DB* db, const FlushJobInfo& info);
// timer queue callback to close a file by appending a footer
// removes file from open files list
std::pair<bool, int64_t> CloseSeqWrite(std::shared_ptr<BlobFile> bfile,
bool aborted);
// is this file ready for Garbage collection. if the TTL of the file
// has expired or if threshold of the file has been evicted
// tt - current time
// last_id - the id of the non-TTL file to evict
bool ShouldGCFile(std::shared_ptr<BlobFile> bfile, std::time_t tt,
uint64_t last_id, std::string* reason);
// collect all the blob log files from the blob directory
Status GetAllLogFiles(std::set<std::pair<uint64_t, std::string>>* file_nums);
// appends a task into timer queue to close the file
void CloseIf(const std::shared_ptr<BlobFile>& bfile);
Status AppendBlob(const std::shared_ptr<BlobFile>& bfile,
const std::string& headerbuf, const Slice& key,
const Slice& value, std::string* index_entry);
Status AppendSN(const std::shared_ptr<BlobFile>& bfile,
const SequenceNumber& sn);
// find an existing blob log file based on the expiration unix epoch
// if such a file does not exist, return nullptr
std::shared_ptr<BlobFile> SelectBlobFileTTL(uint32_t expiration);
// find an existing blob log file to append the value to
std::shared_ptr<BlobFile> SelectBlobFile();
std::shared_ptr<BlobFile> FindBlobFileLocked(uint32_t expiration) const;
void UpdateWriteOptions(const WriteOptions& options);
void Shutdown();
// periodic sanity check. Bunch of checks
std::pair<bool, int64_t> SanityCheck(bool aborted);
// delete files which have been garbage collected and marked
// obsolete. Check whether any snapshots exist which refer to
// the same
std::pair<bool, int64_t> DeleteObsFiles(bool aborted);
// Major task to garbage collect expired and deleted blobs
std::pair<bool, int64_t> RunGC(bool aborted);
// asynchronous task to fsync/fdatasync the open blob files
std::pair<bool, int64_t> FsyncFiles(bool aborted);
// periodically check if open blob files and their TTL's has expired
// if expired, close the sequential writer and make the file immutable
std::pair<bool, int64_t> CheckSeqFiles(bool aborted);
// if the number of open files, approaches ULIMIT's this
// task will close random readers, which are kept around for
// efficiency
std::pair<bool, int64_t> ReclaimOpenFiles(bool aborted);
// periodically print write amplification statistics
std::pair<bool, int64_t> WaStats(bool aborted);
// background task to do book-keeping of deleted keys
std::pair<bool, int64_t> EvictDeletions(bool aborted);
std::pair<bool, int64_t> EvictCompacted(bool aborted);
bool CallbackEvictsImpl(std::shared_ptr<BlobFile> bfile);
std::pair<bool, int64_t> RemoveTimerQ(TimerQueue* tq, bool aborted);
std::pair<bool, int64_t> CallbackEvicts(TimerQueue* tq,
std::shared_ptr<BlobFile> bfile,
bool aborted);
// Adds the background tasks to the timer queue
void StartBackgroundTasks();
// add a new Blob File
std::shared_ptr<BlobFile> NewBlobFile(const std::string& reason);
Status OpenAllFiles();
// hold write mutex on file and call
// creates a Random Access reader for GET call
std::shared_ptr<RandomAccessFileReader> GetOrOpenRandomAccessReader(
const std::shared_ptr<BlobFile>& bfile, Env* env,
const EnvOptions& env_options);
// hold write mutex on file and call.
// Close the above Random Access reader
void CloseRandomAccessLocked(const std::shared_ptr<BlobFile>& bfile);
// hold write mutex on file and call
// creates a sequential (append) writer for this blobfile
Status CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile);
// returns a Writer object for the file. If writer is not
// already present, creates one. Needs Write Mutex to be held
std::shared_ptr<Writer> CheckOrCreateWriterLocked(
const std::shared_ptr<BlobFile>& bfile);
// Iterate through keys and values on Blob and write into
// separate file the remaining blobs and delete/update pointers
// in LSM atomically
Status GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
GCStats* gcstats);
// checks if there is no snapshot which is referencing the
// blobs
bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr<BlobFile>& bfile);
bool MarkBlobDeleted(const Slice& key, const Slice& lsmValue);
bool FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size,
uint64_t blob_offset, uint64_t blob_size);
void CopyBlobFiles(std::vector<std::shared_ptr<BlobFile>>* bfiles_copy,
uint64_t* last_id);
void FilterSubsetOfFiles(
const std::vector<std::shared_ptr<BlobFile>>& blob_files,
std::vector<std::shared_ptr<BlobFile>>* to_process, uint64_t epoch,
uint64_t last_id, size_t files_to_collect);
private:
// the base DB
DBImpl* db_impl_;
Env* myenv_;
// Optimistic Transaction DB used during Garbage collection
// for atomicity
std::unique_ptr<OptimisticTransactionDBImpl> opt_db_;
// a boolean to capture whether write_options has been set
std::atomic<bool> wo_set_;
WriteOptions write_options_;
// the options that govern the behavior of Blob Storage
BlobDBOptionsImpl bdb_options_;
DBOptions db_options_;
EnvOptions env_options_;
// name of the database directory
std::string dbname_;
// by default this is "blob_dir" under dbname_
// but can be configured
std::string blob_dir_;
// pointer to directory
std::unique_ptr<Directory> dir_ent_;
std::atomic<bool> dir_change_;
// Read Write Mutex, which protects all the data structures
// HEAVILY TRAFFICKED
port::RWMutex mutex_;
// counter for blob file number
std::atomic<uint64_t> next_file_number_;
// entire metadata of all the BLOB files memory
std::unordered_map<uint64_t, std::shared_ptr<BlobFile>> blob_files_;
// epoch or version of the open files.
std::atomic<uint64_t> epoch_of_;
// typically we keep 4 open blob files (simple i.e. no TTL)
std::vector<std::shared_ptr<BlobFile>> open_simple_files_;
// all the blob files which are currently being appended to based
// on variety of incoming TTL's
std::multiset<std::shared_ptr<BlobFile>, blobf_compare_ttl> open_blob_files_;
// packet of information to put in lockess delete(s) queue
struct delete_packet_t {
ColumnFamilyHandle* cfh_;
std::string key_;
SequenceNumber dsn_;
};
struct override_packet_t {
uint64_t file_number_;
uint64_t key_size_;
uint64_t blob_offset_;
uint64_t blob_size_;
SequenceNumber dsn_;
};
// LOCKLESS multiple producer single consumer queue to quickly append
// deletes without taking lock. Can rapidly grow in size!!
// deletes happen in LSM, but minor book-keeping needs to happen on
// BLOB side (for triggering eviction)
mpsc_queue_t<delete_packet_t> delete_keys_q_;
// LOCKLESS multiple producer single consumer queue for values
// that are being compacted
mpsc_queue_t<override_packet_t> override_vals_q_;
// atomic bool to represent shutdown
std::atomic<bool> shutdown_;
// timer based queue to execute tasks
TimerQueue tqueue_;
// timer queues to call eviction callbacks.
std::vector<std::shared_ptr<TimerQueue>> cb_threads_;
// only accessed in GC thread, hence not atomic. The epoch of the
// GC task. Each execution is one epoch. Helps us in allocating
// files to one execution
uint64_t current_epoch_;
// number of files opened for random access/GET
// counter is used to monitor and close excess RA files.
std::atomic<uint32_t> open_file_count_;
// should hold mutex to modify
// STATISTICS for WA of Blob Files due to GC
// collect by default 24 hourly periods
std::list<uint64_t> all_periods_write_;
std::list<uint64_t> all_periods_ampl_;
std::atomic<uint64_t> last_period_write_;
std::atomic<uint64_t> last_period_ampl_;
uint64_t total_periods_write_;
uint64_t total_periods_ampl_;
// total size of all blob files at a given time
std::atomic<uint64_t> total_blob_space_;
std::list<std::shared_ptr<BlobFile>> obsolete_files_;
bool open_p1_done_;
uint32_t debug_level_;
};
class BlobFile {
friend class BlobDBImpl;
friend struct blobf_compare_ttl;
private:
// access to parent
const BlobDBImpl* parent_;
// path to blob directory
std::string path_to_dir_;
// the id of the file.
// the above 2 are created during file creation and never changed
// after that
uint64_t file_number_;
// number of blobs in the file
std::atomic<uint64_t> blob_count_;
// the file will be selected for GC in this future epoch
std::atomic<int64_t> gc_epoch_;
// size of the file
std::atomic<uint64_t> file_size_;
// number of blobs in this particular file which have been evicted
uint64_t deleted_count_;
// size of deleted blobs (used by heuristic to select file for GC)
uint64_t deleted_size_;
BlobLogHeader header_;
// closed_ = true implies the file is no more mutable
// no more blobs will be appended and the footer has been written out
std::atomic<bool> closed_;
// has a pass of garbage collection successfully finished on this file
// can_be_deleted_ still needs to do iterator/snapshot checks
std::atomic<bool> can_be_deleted_;
// should this file been gc'd once to reconcile lost deletes/compactions
std::atomic<bool> gc_once_after_open_;
// et - lt of the blobs
ttlrange_t ttl_range_;
// et - lt of the timestamp of the KV pairs.
tsrange_t time_range_;
// ESN - LSN of the blobs
snrange_t sn_range_;
// Sequential/Append writer for blobs
std::shared_ptr<Writer> log_writer_;
// random access file reader for GET calls
std::shared_ptr<RandomAccessFileReader> ra_file_reader_;
// This Read-Write mutex is per file specific and protects
// all the datastructures
port::RWMutex mutex_;
// time when the random access reader was last created.
std::atomic<std::time_t> last_access_;
// last time file was fsync'd/fdatasyncd
std::atomic<uint64_t> last_fsync_;
bool header_valid_;
public:
BlobFile();
BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum);
~BlobFile();
ColumnFamilyHandle* GetColumnFamily(DB* db);
// Returns log file's pathname relative to the main db dir
// Eg. For a live-log-file = blob_dir/000003.blob
std::string PathName() const;
// Primary identifier for blob file.
// once the file is created, this never changes
uint64_t BlobFileNumber() const { return file_number_; }
// the following functions are atomic, and don't need
// read lock
uint64_t BlobCount() const {
return blob_count_.load(std::memory_order_acquire);
}
std::string DumpState() const;
// if the file has gone through GC and blobs have been relocated
bool Obsolete() const { return can_be_deleted_.load(); }
// if the file is not taking any more appends.
bool Immutable() const { return closed_.load(); }
// we will assume this is atomic
bool NeedsFsync(bool hard, uint64_t bytes_per_sync) const;
uint64_t GetFileSize() const {
return file_size_.load(std::memory_order_acquire);
}
// All Get functions which are not atomic, will need ReadLock on the mutex
tsrange_t GetTimeRange() const {
assert(HasTimestamp());
return time_range_;
}
ttlrange_t GetTTLRange() const { return ttl_range_; }
snrange_t GetSNRange() const { return sn_range_; }
bool HasTTL() const {
assert(header_valid_);
return header_.HasTTL();
}
bool HasTimestamp() const {
assert(header_valid_);
return header_.HasTimestamp();
}
std::shared_ptr<Writer> GetWriter() const { return log_writer_; }
void Fsync();
private:
std::shared_ptr<Reader> OpenSequentialReader(
Env* env, const DBOptions& db_options,
const EnvOptions& env_options) const;
Status ReadFooter(BlobLogFooter* footer);
Status WriteFooterAndCloseLocked();
std::shared_ptr<RandomAccessFileReader> GetOrOpenRandomAccessReader(
Env* env, const EnvOptions& env_options, bool* fresh_open);
void CloseRandomAccessLocked();
// this is used, when you are reading only the footer of a
// previously closed file
Status SetFromFooterLocked(const BlobLogFooter& footer);
void set_time_range(const tsrange_t& tr) { time_range_ = tr; }
void set_ttl_range(const ttlrange_t& ttl) { ttl_range_ = ttl; }
void SetSNRange(const snrange_t& snr) { sn_range_ = snr; }
// The following functions are atomic, and don't need locks
void SetFileSize(uint64_t fs) { file_size_ = fs; }
void SetBlobCount(uint64_t bc) { blob_count_ = bc; }
void SetCanBeDeleted() { can_be_deleted_ = true; }
};
class BlobDBIterator : public Iterator {
public:
explicit BlobDBIterator(Iterator* iter, ColumnFamilyHandle* column_family,
BlobDBImpl* impl)
: iter_(iter), cfh_(column_family), db_impl_(impl) {
assert(iter_);
}
~BlobDBIterator() { delete iter_; }
bool Valid() const override { return iter_->Valid(); }
void SeekToFirst() override { iter_->SeekToFirst(); }
void SeekToLast() override { iter_->SeekToLast(); }
void Seek(const Slice& target) override { iter_->Seek(target); }
void SeekForPrev(const Slice& target) override { iter_->SeekForPrev(target); }
void Next() override { iter_->Next(); }
void Prev() override { iter_->Prev(); }
Slice key() const override { return iter_->key(); }
Slice value() const override;
Status status() const override { return iter_->status(); }
private:
Iterator* iter_;
ColumnFamilyHandle* cfh_;
BlobDBImpl* db_impl_;
mutable std::string vpart_;
};
} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -0,0 +1,66 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_db_options_impl.h"
namespace rocksdb {
namespace blob_db {
BlobDBOptionsImpl::BlobDBOptionsImpl(const BlobDBOptions& in)
: BlobDBOptions(in),
deletion_check_period_millisecs(2 * 1000),
gc_file_pct(20),
gc_check_period_millisecs(60 * 1000),
sanity_check_period_millisecs(20 * 60 * 1000),
open_files_trigger(100),
wa_num_stats_periods(24),
wa_stats_period_millisecs(3600 * 1000),
partial_expiration_gc_range_secs(4 * 3600),
partial_expiration_pct(75),
fsync_files_period_millisecs(10 * 1000),
reclaim_of_period_millisecs(1 * 1000),
delete_obsf_period_millisecs(10 * 1000),
check_seqf_period_millisecs(10 * 1000) {}
BlobDBOptionsImpl::BlobDBOptionsImpl()
: deletion_check_period_millisecs(2 * 1000),
gc_file_pct(20),
gc_check_period_millisecs(60 * 1000),
sanity_check_period_millisecs(20 * 60 * 1000),
open_files_trigger(100),
wa_num_stats_periods(24),
wa_stats_period_millisecs(3600 * 1000),
partial_expiration_gc_range_secs(4 * 3600),
partial_expiration_pct(75),
fsync_files_period_millisecs(10 * 1000),
reclaim_of_period_millisecs(1 * 1000),
delete_obsf_period_millisecs(10 * 1000),
check_seqf_period_millisecs(10 * 1000) {}
BlobDBOptionsImpl& BlobDBOptionsImpl::operator=(const BlobDBOptionsImpl& in) {
BlobDBOptions::operator=(in);
if (this != &in) {
deletion_check_period_millisecs = in.deletion_check_period_millisecs;
gc_file_pct = in.gc_file_pct;
gc_check_period_millisecs = in.gc_check_period_millisecs;
sanity_check_period_millisecs = in.sanity_check_period_millisecs;
open_files_trigger = in.open_files_trigger;
wa_num_stats_periods = in.wa_num_stats_periods;
wa_stats_period_millisecs = in.wa_stats_period_millisecs;
partial_expiration_gc_range_secs = in.partial_expiration_gc_range_secs;
partial_expiration_pct = in.partial_expiration_pct;
fsync_files_period_millisecs = in.fsync_files_period_millisecs;
reclaim_of_period_millisecs = in.reclaim_of_period_millisecs;
delete_obsf_period_millisecs = in.delete_obsf_period_millisecs;
check_seqf_period_millisecs = in.check_seqf_period_millisecs;
}
return *this;
}
} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -0,0 +1,73 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_db.h"
namespace rocksdb {
namespace blob_db {
struct BlobDBOptionsImpl : public BlobDBOptions {
// deletions check period
uint32_t deletion_check_period_millisecs;
// gc percentage each check period
uint32_t gc_file_pct;
// gc period
uint32_t gc_check_period_millisecs;
// sanity check task
uint32_t sanity_check_period_millisecs;
// how many random access open files can we tolerate
uint32_t open_files_trigger;
// how many periods of stats do we keep.
uint32_t wa_num_stats_periods;
// what is the length of any period
uint32_t wa_stats_period_millisecs;
// we will garbage collect blob files in
// which entire files have expired. However if the
// ttl_range of files is very large say a day, we
// would have to wait for the entire day, before we
// recover most of the space.
uint32_t partial_expiration_gc_range_secs;
// this should be based on allowed Write Amplification
// if 50% of the space of a blob file has been deleted/expired,
uint32_t partial_expiration_pct;
// how often should we schedule a job to fsync open files
uint32_t fsync_files_period_millisecs;
// how often to schedule reclaim open files.
uint32_t reclaim_of_period_millisecs;
// how often to schedule delete obs files periods
uint32_t delete_obsf_period_millisecs;
// how often to schedule check seq files period
uint32_t check_seqf_period_millisecs;
// default constructor
BlobDBOptionsImpl();
explicit BlobDBOptionsImpl(const BlobDBOptions& in);
BlobDBOptionsImpl& operator=(const BlobDBOptionsImpl& in);
};
} // namespace blob_db
} // namespace rocksdb
#endif // endif ROCKSDB

View File

@ -1,66 +1,567 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_db.h"
#include <cstdlib>
#include "db/db_test_util.h"
#include "util/random.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "utilities/blob_db/blob_db_options_impl.h"
namespace rocksdb {
class BlobDBTest : public testing::Test {
public:
BlobDBTest() {
dbname_ = test::TmpDir() + "/blob_db_test";
Options options;
options.create_if_missing = true;
EXPECT_TRUE(NewBlobDB(options, dbname_, &db_).ok());
namespace blob_db {
Random s_rnd(301);
void gen_random(char *s, const int len) {
static const char alphanum[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
for (int i = 0; i < len; ++i) {
s[i] = alphanum[s_rnd.Next() % (sizeof(alphanum) - 1)];
}
~BlobDBTest() { delete db_; }
s[len] = 0;
}
DB* db_;
class BlobDBTest : public testing::Test {
public:
BlobDBTest() : blobdb_(nullptr) {
dbname_ = test::TmpDir() + "/blob_db_test";
// Reopen1(BlobDBOptionsImpl());
}
~BlobDBTest() {
if (blobdb_) {
delete blobdb_;
blobdb_ = nullptr;
}
}
void Reopen1(const BlobDBOptionsImpl &bdboptions,
const Options &options = Options()) {
if (blobdb_) {
delete blobdb_;
blobdb_ = nullptr;
}
BlobDBOptionsImpl bblobdb_options = bdboptions;
Options myoptions = options;
BlobDB::DestroyBlobDB(dbname_, myoptions, bblobdb_options);
DestroyDB(dbname_, myoptions);
myoptions.create_if_missing = true;
EXPECT_TRUE(
BlobDB::Open(myoptions, bblobdb_options, dbname_, &blobdb_).ok());
}
void insert_blobs() {
WriteOptions wo;
ReadOptions ro;
std::string value;
ColumnFamilyHandle *dcfh = blobdb_->DefaultColumnFamily();
Random rnd(301);
for (size_t i = 0; i < 100000; i++) {
int len = rnd.Next() % 16384;
if (!len) continue;
char *val = new char[len + 1];
gen_random(val, len);
std::string key("key");
key += std::to_string(i % 500);
Slice keyslice(key);
Slice valslice(val, len + 1);
int ttl = rnd.Next() % 86400;
ASSERT_OK(blobdb_->PutWithTTL(wo, dcfh, keyslice, valslice, ttl));
delete[] val;
}
for (size_t i = 0; i < 10; i++) {
std::string key("key");
key += std::to_string(i % 500);
Slice keyslice(key);
blobdb_->Delete(wo, dcfh, keyslice);
}
}
BlobDB *blobdb_;
std::string dbname_;
}; // class BlobDBTest
TEST_F(BlobDBTest, Basic) {
TEST_F(BlobDBTest, DeleteComplex) {
BlobDBOptionsImpl bdboptions;
bdboptions.partial_expiration_pct = 75;
bdboptions.gc_check_period_millisecs = 20 * 1000;
bdboptions.blob_file_size = 219 * 1024;
Reopen1(bdboptions);
WriteOptions wo;
ReadOptions ro;
std::string value;
ASSERT_OK(db_->Put(wo, "foo", "v1"));
ASSERT_OK(db_->Put(wo, "bar", "v2"));
ColumnFamilyHandle *dcfh = blobdb_->DefaultColumnFamily();
ASSERT_OK(db_->Get(ro, "foo", &value));
ASSERT_EQ("v1", value);
ASSERT_OK(db_->Get(ro, "bar", &value));
ASSERT_EQ("v2", value);
Random rnd(301);
for (size_t i = 0; i < 100; i++) {
int len = rnd.Next() % 16384;
if (!len) continue;
char *val = new char[len + 1];
gen_random(val, len);
std::string key("key");
key += std::to_string(i);
Slice keyslice(key);
Slice valslice(val, len + 1);
ASSERT_OK(blobdb_->Put(wo, dcfh, keyslice, valslice));
delete[] val;
}
for (size_t i = 0; i < 99; i++) {
std::string key("key");
key += std::to_string(i);
Slice keyslice(key);
blobdb_->Delete(wo, dcfh, keyslice);
}
Env::Default()->SleepForMicroseconds(60 * 1000 * 1000);
}
TEST_F(BlobDBTest, OverrideTest) {
BlobDBOptionsImpl bdboptions;
bdboptions.ttl_range_secs = 30;
bdboptions.gc_file_pct = 100;
bdboptions.gc_check_period_millisecs = 20 * 1000;
bdboptions.num_concurrent_simple_blobs = 2;
bdboptions.blob_file_size = 876 * 1024 * 10;
Options options;
options.write_buffer_size = 256 * 1024;
options.info_log_level = INFO_LEVEL;
Reopen1(bdboptions, options);
WriteOptions wo;
ReadOptions ro;
std::string value;
Random rnd(301);
ColumnFamilyHandle *dcfh = blobdb_->DefaultColumnFamily();
for (int i = 0; i < 10000; i++) {
int len = rnd.Next() % 16384;
if (!len) continue;
char *val = new char[len + 1];
gen_random(val, len);
std::string key("key");
char x[10];
std::sprintf(x, "%04d", i);
key += std::string(x);
Slice keyslice(key);
Slice valslice(val, len + 1);
ASSERT_OK(blobdb_->Put(wo, dcfh, keyslice, valslice));
delete[] val;
}
// override all the keys
for (int i = 0; i < 10000; i++) {
int len = rnd.Next() % 16384;
if (!len) continue;
char *val = new char[len + 1];
gen_random(val, len);
std::string key("key");
char x[10];
std::sprintf(x, "%04d", i);
key += std::string(x);
Slice keyslice(key);
Slice valslice(val, len + 1);
ASSERT_OK(blobdb_->Put(wo, dcfh, keyslice, valslice));
delete[] val;
}
blobdb_->Flush(FlushOptions());
#if 1
blobdb_->GetBaseDB()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
reinterpret_cast<DBImpl *>(blobdb_->GetBaseDB())->TEST_WaitForFlushMemTable();
reinterpret_cast<DBImpl *>(blobdb_->GetBaseDB())->TEST_WaitForCompact();
#endif
Env::Default()->SleepForMicroseconds(120 * 1000 * 1000);
}
TEST_F(BlobDBTest, DeleteTest) {
BlobDBOptionsImpl bdboptions;
bdboptions.ttl_range_secs = 30;
bdboptions.gc_file_pct = 100;
bdboptions.partial_expiration_pct = 18;
bdboptions.gc_check_period_millisecs = 20 * 1000;
bdboptions.num_concurrent_simple_blobs = 1;
bdboptions.blob_file_size = 876 * 1024;
Reopen1(bdboptions);
WriteOptions wo;
ReadOptions ro;
std::string value;
Random rnd(301);
ColumnFamilyHandle *dcfh = blobdb_->DefaultColumnFamily();
for (size_t i = 0; i < 100; i++) {
int len = rnd.Next() % 16384;
if (!len) continue;
char *val = new char[len + 1];
gen_random(val, len);
std::string key("key");
key += std::to_string(i);
Slice keyslice(key);
Slice valslice(val, len + 1);
ASSERT_OK(blobdb_->Put(wo, dcfh, keyslice, valslice));
delete[] val;
}
for (size_t i = 0; i < 100; i += 5) {
std::string key("key");
key += std::to_string(i);
Slice keyslice(key);
blobdb_->Delete(wo, dcfh, keyslice);
}
Env::Default()->SleepForMicroseconds(60 * 1000 * 1000);
}
TEST_F(BlobDBTest, GCTestWithWrite) {
BlobDBOptionsImpl bdboptions;
bdboptions.ttl_range_secs = 30;
bdboptions.gc_file_pct = 100;
bdboptions.gc_check_period_millisecs = 20 * 1000;
bdboptions.default_ttl_extractor = true;
Reopen1(bdboptions);
WriteOptions wo;
ReadOptions ro;
std::string value;
ColumnFamilyHandle *dcfh = blobdb_->DefaultColumnFamily();
WriteBatch WB;
Random rnd(301);
for (size_t i = 0; i < 100; i++) {
int len = rnd.Next() % 16384;
if (!len) continue;
int ttl = 30;
char *val = new char[len + BlobDB::kTTLSuffixLength];
gen_random(val, len);
strncpy(val + len, "ttl:", 4);
EncodeFixed32(val + len + 4, ttl);
std::string key("key");
key += std::to_string(i);
Slice keyslice(key);
Slice valslice(val, len + BlobDB::kTTLSuffixLength);
WB.Put(dcfh, keyslice, valslice);
delete[] val;
}
ASSERT_OK(blobdb_->Write(wo, &WB));
Env::Default()->SleepForMicroseconds(120 * 1000 * 1000);
}
void cb_evict(const ColumnFamilyHandle *cfh, const Slice &key,
const Slice &val) {
fprintf(stderr, "key evicted: %s\n", key.ToString().c_str());
}
static const char *LONG_STRING =
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFJFJFJTWFNLLFKFFMFMFMFMFMFMFMFMFMFMFMFMFMMF "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJAJFJFJFJFJTWBFNMFLLWMFMFMFMWKWMFMFMFMFMFMFM "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH "
"AJFJFJFFFFFFFFFFFFFFFFFFFJFHFHFHFHFHFHFHHFHHFHHFH ";
TEST_F(BlobDBTest, GetWithCompression) {
BlobDBOptionsImpl bdboptions;
bdboptions.gc_file_pct = 100;
bdboptions.gc_check_period_millisecs = 20 * 1000;
bdboptions.default_ttl_extractor = true;
bdboptions.gc_evict_cb_fn = &cb_evict;
bdboptions.compression = CompressionType::kLZ4Compression;
Reopen1(bdboptions);
WriteOptions wo;
ReadOptions ro;
std::string value;
Random rnd(301);
ColumnFamilyHandle *dcfh = blobdb_->DefaultColumnFamily();
std::string orig(LONG_STRING);
for (size_t i = 0; i < 10000; i++) {
int len = orig.length();
int ttl = 3000 * (rnd.Next() % 10);
char *val = new char[len + BlobDB::kTTLSuffixLength];
strncpy(val, LONG_STRING, len);
strncpy(val + len, "ttl:", 4);
EncodeFixed32(val + len + 4, ttl);
std::string key("key");
key += std::to_string(i);
Slice keyslice(key);
Slice valslice(val, len + BlobDB::kTTLSuffixLength);
ASSERT_OK(blobdb_->Put(wo, dcfh, keyslice, valslice));
delete[] val;
}
for (size_t i = 0; i < 10000; i++) {
std::string key("key");
key += std::to_string(i);
Slice keyslice(key);
std::string val;
Status s = blobdb_->Get(ro, dcfh, keyslice, &val);
ASSERT_TRUE(orig == val);
}
Env::Default()->SleepForMicroseconds(120 * 1000 * 1000);
}
TEST_F(BlobDBTest, GCTestWithPutAndCompression) {
BlobDBOptionsImpl bdboptions;
bdboptions.ttl_range_secs = 30;
bdboptions.gc_file_pct = 100;
bdboptions.gc_check_period_millisecs = 20 * 1000;
bdboptions.default_ttl_extractor = true;
bdboptions.gc_evict_cb_fn = &cb_evict;
bdboptions.compression = CompressionType::kLZ4Compression;
Reopen1(bdboptions);
WriteOptions wo;
ReadOptions ro;
std::string value;
Random rnd(301);
ColumnFamilyHandle *dcfh = blobdb_->DefaultColumnFamily();
for (size_t i = 0; i < 100; i++) {
int len = rnd.Next() % 16384;
if (!len) continue;
int ttl = 30;
char *val = new char[len + BlobDB::kTTLSuffixLength];
gen_random(val, len);
strncpy(val + len, "ttl:", 4);
EncodeFixed32(val + len + 4, ttl);
std::string key("key");
key += std::to_string(i);
Slice keyslice(key);
Slice valslice(val, len + BlobDB::kTTLSuffixLength);
ASSERT_OK(blobdb_->Put(wo, dcfh, keyslice, valslice));
delete[] val;
}
Env::Default()->SleepForMicroseconds(120 * 1000 * 1000);
}
TEST_F(BlobDBTest, GCTestWithPut) {
BlobDBOptionsImpl bdboptions;
bdboptions.ttl_range_secs = 30;
bdboptions.gc_file_pct = 100;
bdboptions.gc_check_period_millisecs = 20 * 1000;
bdboptions.default_ttl_extractor = true;
bdboptions.gc_evict_cb_fn = &cb_evict;
Reopen1(bdboptions);
WriteOptions wo;
ReadOptions ro;
std::string value;
Random rnd(301);
ColumnFamilyHandle *dcfh = blobdb_->DefaultColumnFamily();
for (size_t i = 0; i < 100; i++) {
int len = rnd.Next() % 16384;
if (!len) continue;
int ttl = 30;
char *val = new char[len + BlobDB::kTTLSuffixLength];
gen_random(val, len);
strncpy(val + len, "ttl:", 4);
EncodeFixed32(val + len + 4, ttl);
std::string key("key");
key += std::to_string(i);
Slice keyslice(key);
Slice valslice(val, len + BlobDB::kTTLSuffixLength);
ASSERT_OK(blobdb_->Put(wo, dcfh, keyslice, valslice));
delete[] val;
}
Env::Default()->SleepForMicroseconds(120 * 1000 * 1000);
}
TEST_F(BlobDBTest, GCTest) {
BlobDBOptionsImpl bdboptions;
bdboptions.ttl_range_secs = 30;
bdboptions.gc_file_pct = 100;
Reopen1(bdboptions);
WriteOptions wo;
ReadOptions ro;
std::string value;
Random rnd(301);
ColumnFamilyHandle *dcfh = blobdb_->DefaultColumnFamily();
for (size_t i = 0; i < 100; i++) {
int len = rnd.Next() % 16384;
if (!len) continue;
char *val = new char[len + 1];
gen_random(val, len);
std::string key("key");
key += std::to_string(i);
Slice keyslice(key);
Slice valslice(val, len + 1);
int ttl = 30;
ASSERT_OK(blobdb_->PutWithTTL(wo, dcfh, keyslice, valslice, ttl));
delete[] val;
}
Env::Default()->SleepForMicroseconds(240 * 1000 * 1000);
}
TEST_F(BlobDBTest, DISABLED_MultipleWriters) {
BlobDBOptionsImpl bdboptions;
Reopen1(bdboptions);
ASSERT_TRUE(blobdb_ != nullptr);
std::vector<std::thread> workers;
for (size_t ii = 0; ii < 10; ii++)
workers.push_back(std::thread(&BlobDBTest::insert_blobs, this));
for (std::thread &t : workers) {
if (t.joinable()) {
t.join();
}
}
Env::Default()->SleepForMicroseconds(180 * 1000 * 1000);
// ASSERT_OK(blobdb_->PutWithTTL(wo, dcfh, "bar", "v2", 60));
// ASSERT_OK(blobdb_->Get(ro, dcfh, "foo", &value));
// ASSERT_EQ("v1", value);
// ASSERT_OK(blobdb_->Get(ro, dcfh, "bar", &value));
// ASSERT_EQ("v2", value);
}
#if 0
TEST_F(BlobDBTest, Large) {
ASSERT_TRUE(blobdb_ != nullptr);
WriteOptions wo;
ReadOptions ro;
std::string value1, value2, value3;
Random rnd(301);
ColumnFamilyHandle* dcfh = blobdb_->DefaultColumnFamily();
value1.assign(8999, '1');
ASSERT_OK(db_->Put(wo, "foo", value1));
ASSERT_OK(blobdb_->PutWithTTL(wo, dcfh, "foo", value1, 3600));
value2.assign(9001, '2');
ASSERT_OK(db_->Put(wo, "bar", value2));
ASSERT_OK(blobdb_->PutWithTTL(wo, dcfh, "bar", value2, 3600));
test::RandomString(&rnd, 13333, &value3);
ASSERT_OK(db_->Put(wo, "barfoo", value3));
ASSERT_OK(blobdb_->PutWithTTL(wo, dcfh, "barfoo", value3, 3600));
std::string value;
ASSERT_OK(db_->Get(ro, "foo", &value));
ASSERT_OK(blobdb_->Get(ro, dcfh, "foo", &value));
ASSERT_EQ(value1, value);
ASSERT_OK(db_->Get(ro, "bar", &value));
ASSERT_OK(blobdb_->Get(ro, dcfh, "bar", &value));
ASSERT_EQ(value2, value);
ASSERT_OK(db_->Get(ro, "barfoo", &value));
ASSERT_OK(blobdb_->Get(ro, dcfh, "barfoo", &value));
ASSERT_EQ(value3, value);
}
#endif
} // namespace blob_db
} // namespace rocksdb
// A black-box test for the ttl wrapper around rocksdb

View File

@ -0,0 +1,225 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_LITE
#include <chrono>
#include <cinttypes>
#include <memory>
#include "utilities/blob_db/blob_db_impl.h"
#include "util/filename.h"
namespace rocksdb {
namespace blob_db {
BlobFile::BlobFile()
: parent_(nullptr),
file_number_(0),
blob_count_(0),
gc_epoch_(-1),
file_size_(0),
deleted_count_(0),
deleted_size_(0),
closed_(false),
can_be_deleted_(false),
gc_once_after_open_(false),
ttl_range_(std::make_pair(0, 0)),
time_range_(std::make_pair(0, 0)),
sn_range_(std::make_pair(0, 0)),
last_access_(-1),
last_fsync_(0),
header_valid_(false) {}
BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn)
: parent_(p),
path_to_dir_(bdir),
file_number_(fn),
blob_count_(0),
gc_epoch_(-1),
file_size_(0),
deleted_count_(0),
deleted_size_(0),
closed_(false),
can_be_deleted_(false),
gc_once_after_open_(false),
ttl_range_(std::make_pair(0, 0)),
time_range_(std::make_pair(0, 0)),
sn_range_(std::make_pair(0, 0)),
last_access_(-1),
last_fsync_(0),
header_valid_(false) {}
BlobFile::~BlobFile() {
if (can_be_deleted_) {
std::string pn(PathName());
Status s = Env::Default()->DeleteFile(PathName());
if (!s.ok()) {
// Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
// "File could not be deleted %s", pn.c_str());
}
}
}
std::string BlobFile::PathName() const {
return BlobFileName(path_to_dir_, file_number_);
}
std::shared_ptr<Reader> BlobFile::OpenSequentialReader(
Env* env, const DBOptions& db_options,
const EnvOptions& env_options) const {
std::unique_ptr<SequentialFile> sfile;
Status s = env->NewSequentialFile(PathName(), &sfile, env_options);
if (!s.ok()) {
// report something here.
return nullptr;
}
std::unique_ptr<SequentialFileReader> sfile_reader;
sfile_reader.reset(new SequentialFileReader(std::move(sfile)));
std::shared_ptr<Reader> log_reader =
std::make_shared<Reader>(db_options.info_log, std::move(sfile_reader));
return log_reader;
}
std::string BlobFile::DumpState() const {
char str[1000];
std::snprintf(str, sizeof(str),
"path: %s fn: %" PRIu64 " blob_count: %" PRIu64
" gc_epoch: %" PRIu64 " file_size: %" PRIu64
" deleted_count: %" PRIu64 " deleted_size: %" PRIu64
" closed: %d can_be_deleted: %d ttl_range: (%d, %d)"
" sn_range: (%" PRIu64 " %" PRIu64 "), writer: %d reader: %d",
path_to_dir_.c_str(), file_number_, blob_count_.load(),
gc_epoch_.load(), file_size_.load(), deleted_count_,
deleted_size_, closed_.load(), can_be_deleted_.load(),
ttl_range_.first, ttl_range_.second, sn_range_.first,
sn_range_.second, (!!log_writer_), (!!ra_file_reader_));
return str;
}
bool BlobFile::NeedsFsync(bool hard, uint64_t bytes_per_sync) const {
assert(last_fsync_ <= file_size_);
return (hard) ? file_size_ > last_fsync_
: (file_size_ - last_fsync_) >= bytes_per_sync;
}
Status BlobFile::WriteFooterAndCloseLocked() {
Log(InfoLogLevel::INFO_LEVEL, parent_->db_options_.info_log,
"File is being closed after footer %s", PathName().c_str());
BlobLogFooter footer;
footer.blob_count_ = blob_count_;
if (HasTTL()) footer.set_ttl_range(ttl_range_);
footer.sn_range_ = sn_range_;
if (HasTimestamp()) footer.set_time_range(time_range_);
// this will close the file and reset the Writable File Pointer.
Status s = log_writer_->AppendFooter(footer);
if (s.ok()) {
closed_ = true;
file_size_ += BlobLogFooter::kFooterSize;
} else {
Log(InfoLogLevel::ERROR_LEVEL, parent_->db_options_.info_log,
"Failure to read Header for blob-file %s", PathName().c_str());
}
// delete the sequential writer
log_writer_.reset();
return s;
}
Status BlobFile::ReadFooter(BlobLogFooter* bf) {
if (file_size_ < (BlobLogHeader::kHeaderSize + BlobLogFooter::kFooterSize)) {
return Status::IOError("File does not have footer", PathName());
}
uint64_t footer_offset = file_size_ - BlobLogFooter::kFooterSize;
// assume that ra_file_reader_ is valid before we enter this
assert(ra_file_reader_);
Slice result;
char scratch[BlobLogFooter::kFooterSize + 10];
Status s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kFooterSize,
&result, scratch);
if (!s.ok()) return s;
if (result.size() != BlobLogFooter::kFooterSize) {
// should not happen
return Status::IOError("EOF reached before footer");
}
s = bf->DecodeFrom(&result);
return s;
}
Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) {
if (footer.HasTTL() != header_.HasTTL()) {
return Status::Corruption("has_ttl mismatch");
}
if (footer.HasTimestamp() != header_.HasTimestamp()) {
return Status::Corruption("has_ts mismatch");
}
// assume that file has been fully fsync'd
last_fsync_.store(file_size_);
blob_count_ = footer.GetBlobCount();
ttl_range_ = footer.GetTTLRange();
time_range_ = footer.GetTimeRange();
sn_range_ = footer.GetSNRange();
closed_ = true;
return Status::OK();
}
void BlobFile::Fsync() {
if (log_writer_.get()) {
log_writer_->Sync();
last_fsync_.store(file_size_.load());
}
}
void BlobFile::CloseRandomAccessLocked() {
ra_file_reader_.reset();
last_access_ = -1;
}
std::shared_ptr<RandomAccessFileReader> BlobFile::GetOrOpenRandomAccessReader(
Env* env, const EnvOptions& env_options, bool* fresh_open) {
*fresh_open = false;
last_access_ =
std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
{
ReadLock lockbfile_r(&mutex_);
if (ra_file_reader_) return ra_file_reader_;
}
WriteLock lockbfile_w(&mutex_);
if (ra_file_reader_) return ra_file_reader_;
std::unique_ptr<RandomAccessFile> rfile;
Status s = env->NewRandomAccessFile(PathName(), &rfile, env_options);
if (!s.ok()) {
Log(InfoLogLevel::ERROR_LEVEL, parent_->db_options_.info_log,
"Failed to open blob file for random-read: %s status: '%s'"
" exists: '%s'",
PathName().c_str(), s.ToString().c_str(),
env->FileExists(PathName()).ToString().c_str());
return nullptr;
}
ra_file_reader_ = std::make_shared<RandomAccessFileReader>(std::move(rfile));
*fresh_open = true;
return ra_file_reader_;
}
ColumnFamilyHandle* BlobFile::GetColumnFamily(DB* db) {
return db->DefaultColumnFamily();
}
} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -0,0 +1,313 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_log_format.h"
#include "util/coding.h"
#include "util/crc32c.h"
namespace rocksdb {
namespace blob_db {
const uint32_t kMagicNumber = 2395959;
const uint32_t kVersion1 = 1;
const size_t kBlockSize = 32768;
BlobLogHeader::BlobLogHeader()
: magic_number_(kMagicNumber), compression_(kNoCompression) {}
BlobLogHeader& BlobLogHeader::operator=(BlobLogHeader&& in) noexcept {
if (this != &in) {
magic_number_ = in.magic_number_;
version_ = in.version_;
ttl_guess_ = std::move(in.ttl_guess_);
ts_guess_ = std::move(in.ts_guess_);
compression_ = in.compression_;
}
return *this;
}
BlobLogFooter::BlobLogFooter() : magic_number_(kMagicNumber), blob_count_(0) {}
Status BlobLogFooter::DecodeFrom(Slice* input) {
uint32_t val;
if (!GetFixed32(input, &val)) {
return Status::Corruption("Invalid Blob Footer: flags");
}
bool has_ttl = false;
bool has_ts = false;
val >>= 8;
RecordSubType st = static_cast<RecordSubType>(val);
switch (st) {
case kRegularType:
break;
case kTTLType:
has_ttl = true;
break;
case kTimestampType:
has_ts = true;
break;
default:
return Status::Corruption("Invalid Blob Footer: flags_val");
}
if (!GetFixed64(input, &blob_count_)) {
return Status::Corruption("Invalid Blob Footer: blob_count");
}
ttlrange_t temp_ttl;
if (!GetFixed32(input, &temp_ttl.first) ||
!GetFixed32(input, &temp_ttl.second)) {
return Status::Corruption("Invalid Blob Footer: ttl_range");
}
if (has_ttl) {
printf("has ttl\n");
ttl_range_.reset(new ttlrange_t(temp_ttl));
}
if (!GetFixed64(input, &sn_range_.first) ||
!GetFixed64(input, &sn_range_.second)) {
return Status::Corruption("Invalid Blob Footer: sn_range");
}
tsrange_t temp_ts;
if (!GetFixed64(input, &temp_ts.first) ||
!GetFixed64(input, &temp_ts.second)) {
return Status::Corruption("Invalid Blob Footer: ts_range");
}
if (has_ts) ts_range_.reset(new tsrange_t(temp_ts));
if (!GetFixed32(input, &magic_number_) || magic_number_ != kMagicNumber) {
return Status::Corruption("Invalid Blob Footer: magic");
}
return Status::OK();
}
void BlobLogFooter::EncodeTo(std::string* dst) const {
dst->reserve(kFooterSize);
RecordType rt = kFullType;
RecordSubType st = kRegularType;
if (HasTTL()) {
st = kTTLType;
} else if (HasTimestamp()) {
st = kTimestampType;
}
uint32_t val = static_cast<uint32_t>(rt) | (static_cast<uint32_t>(st) << 8);
PutFixed32(dst, val);
PutFixed64(dst, blob_count_);
bool has_ttl = HasTTL();
bool has_ts = HasTimestamp();
if (has_ttl) {
PutFixed32(dst, ttl_range_.get()->first);
PutFixed32(dst, ttl_range_.get()->second);
} else {
PutFixed32(dst, 0);
PutFixed32(dst, 0);
}
PutFixed64(dst, sn_range_.first);
PutFixed64(dst, sn_range_.second);
if (has_ts) {
PutFixed64(dst, ts_range_.get()->first);
PutFixed64(dst, ts_range_.get()->second);
} else {
PutFixed64(dst, 0);
PutFixed64(dst, 0);
}
PutFixed32(dst, magic_number_);
}
void BlobLogHeader::EncodeTo(std::string* dst) const {
dst->reserve(kHeaderSize);
PutFixed32(dst, magic_number_);
PutFixed32(dst, version_);
RecordSubType st = kRegularType;
bool has_ttl = HasTTL();
bool has_ts = HasTimestamp();
if (has_ttl) {
st = kTTLType;
} else if (has_ts) {
st = kTimestampType;
}
uint32_t val =
static_cast<uint32_t>(st) | (static_cast<uint32_t>(compression_) << 8);
PutFixed32(dst, val);
if (has_ttl) {
PutFixed32(dst, ttl_guess_.get()->first);
PutFixed32(dst, ttl_guess_.get()->second);
} else {
PutFixed32(dst, 0);
PutFixed32(dst, 0);
}
if (has_ts) {
PutFixed64(dst, ts_guess_.get()->first);
PutFixed64(dst, ts_guess_.get()->second);
} else {
PutFixed64(dst, 0);
PutFixed64(dst, 0);
}
}
Status BlobLogHeader::DecodeFrom(Slice* input) {
if (!GetFixed32(input, &magic_number_) || magic_number_ != kMagicNumber) {
return Status::Corruption("Invalid Blob Log Header: magic");
}
// as of today, we only support 1 version
if (!GetFixed32(input, &version_) || version_ != kVersion1) {
return Status::Corruption("Invalid Blob Log Header: version");
}
uint32_t val;
if (!GetFixed32(input, &val)) {
return Status::Corruption("Invalid Blob Log Header: subtype");
}
bool has_ttl = false;
bool has_ts = false;
RecordSubType st = static_cast<RecordSubType>(val & 0xff);
compression_ = static_cast<CompressionType>((val >> 8) & 0xff);
switch (st) {
case kRegularType:
break;
case kTTLType:
has_ttl = true;
break;
case kTimestampType:
has_ts = true;
break;
default:
return Status::Corruption("Invalid Blob Log Header: subtype_2");
}
ttlrange_t temp_ttl;
if (!GetFixed32(input, &temp_ttl.first) ||
!GetFixed32(input, &temp_ttl.second)) {
return Status::Corruption("Invalid Blob Log Header: ttl");
}
if (has_ttl) set_ttl_guess(temp_ttl);
tsrange_t temp_ts;
if (!GetFixed64(input, &temp_ts.first) ||
!GetFixed64(input, &temp_ts.second)) {
return Status::Corruption("Invalid Blob Log Header: timestamp");
}
if (has_ts) set_ts_guess(temp_ts);
return Status::OK();
}
BlobLogRecord::BlobLogRecord()
: checksum_(0),
header_cksum_(0),
key_size_(0),
blob_size_(0),
time_val_(0),
ttl_val_(0),
sn_(0),
type_(0),
subtype_(0) {}
BlobLogRecord::~BlobLogRecord() {}
void BlobLogRecord::ResizeKeyBuffer(size_t kbs) {
if (kbs > key_buffer_.size()) {
key_buffer_.resize(kbs);
}
}
void BlobLogRecord::ResizeBlobBuffer(size_t bbs) {
if (bbs > blob_buffer_.size()) {
blob_buffer_.resize(bbs);
}
}
void BlobLogRecord::Clear() {
checksum_ = 0;
header_cksum_ = 0;
key_size_ = 0;
blob_size_ = 0;
time_val_ = 0;
ttl_val_ = 0;
sn_ = 0;
type_ = subtype_ = 0;
key_.clear();
blob_.clear();
}
Status BlobLogRecord::DecodeHeaderFrom(const Slice& hdrslice) {
Slice input = hdrslice;
if (input.size() < kHeaderSize) {
return Status::Corruption("Invalid Blob Record Header: size");
}
if (!GetFixed32(&input, &key_size_)) {
return Status::Corruption("Invalid Blob Record Header: key_size");
}
if (!GetFixed64(&input, &blob_size_)) {
return Status::Corruption("Invalid Blob Record Header: blob_size");
}
if (!GetFixed32(&input, &ttl_val_)) {
return Status::Corruption("Invalid Blob Record Header: ttl_val");
}
if (!GetFixed64(&input, &time_val_)) {
return Status::Corruption("Invalid Blob Record Header: time_val");
}
type_ = *(input.data());
input.remove_prefix(1);
subtype_ = *(input.data());
input.remove_prefix(1);
if (!GetFixed32(&input, &header_cksum_)) {
return Status::Corruption("Invalid Blob Record Header: header_cksum");
}
if (!GetFixed32(&input, &checksum_)) {
return Status::Corruption("Invalid Blob Record Header: checksum");
}
return Status::OK();
}
Status BlobLogRecord::DecodeFooterFrom(const Slice& footerslice) {
Slice input = footerslice;
if (input.size() < kFooterSize) {
return Status::Corruption("Invalid Blob Record Footer: size");
}
uint32_t f_crc = crc32c::Extend(0, input.data(), 8);
f_crc = crc32c::Mask(f_crc);
if (!GetFixed64(&input, &sn_)) {
return Status::Corruption("Invalid Blob Record Footer: sn");
}
if (!GetFixed32(&input, &footer_cksum_)) {
return Status::Corruption("Invalid Blob Record Footer: cksum");
}
if (f_crc != footer_cksum_) {
return Status::Corruption("Record Checksum mismatch: footer_cksum");
}
return Status::OK();
}
} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -0,0 +1,226 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Log format information shared by reader and writer.
#pragma once
#ifndef ROCKSDB_LITE
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include "rocksdb/options.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
namespace rocksdb {
namespace blob_db {
class BlobFile;
class BlobDBImpl;
enum RecordType : uint8_t {
// Zero is reserved for preallocated files
kFullType = 0,
// For fragments
kFirstType = 1,
kMiddleType = 2,
kLastType = 3,
kMaxRecordType = kLastType
};
enum RecordSubType : uint8_t {
kRegularType = 0,
kTTLType = 1,
kTimestampType = 2,
};
extern const uint32_t kMagicNumber;
class Reader;
typedef std::pair<uint32_t, uint32_t> ttlrange_t;
typedef std::pair<uint64_t, uint64_t> tsrange_t;
typedef std::pair<rocksdb::SequenceNumber, rocksdb::SequenceNumber> snrange_t;
class BlobLogHeader {
friend class BlobFile;
friend class BlobDBImpl;
private:
uint32_t magic_number_ = 0;
uint32_t version_ = 1;
CompressionType compression_;
std::unique_ptr<ttlrange_t> ttl_guess_;
std::unique_ptr<tsrange_t> ts_guess_;
private:
void set_ttl_guess(const ttlrange_t& ttl) {
ttl_guess_.reset(new ttlrange_t(ttl));
}
void set_version(uint32_t v) { version_ = v; }
void set_ts_guess(const tsrange_t& ts) { ts_guess_.reset(new tsrange_t(ts)); }
public:
// magic number + version + flags + ttl guess + timestamp range
static const size_t kHeaderSize = 4 + 4 + 4 + 4 * 2 + 8 * 2;
// 32
void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* input);
BlobLogHeader();
bool HasTTL() const { return !!ttl_guess_; }
bool HasTimestamp() const { return !!ts_guess_; }
BlobLogHeader& operator=(BlobLogHeader&& in) noexcept;
};
// Footer encapsulates the fixed information stored at the tail
// end of every blob log file.
class BlobLogFooter {
friend class BlobFile;
public:
// Use this constructor when you plan to write out the footer using
// EncodeTo(). Never use this constructor with DecodeFrom().
BlobLogFooter();
uint64_t magic_number() const { return magic_number_; }
void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* input);
// convert this object to a human readable form
std::string ToString() const;
// footer size = 4 byte magic number
// 8 bytes count
// 4, 4 - ttl range
// 8, 8 - sn range
// 8, 8 - ts range
// = 56
static const size_t kFooterSize = 4 + 4 + 8 + (4 * 2) + (8 * 2) + (8 * 2);
bool HasTTL() const { return !!ttl_range_; }
bool HasTimestamp() const { return !!ts_range_; }
uint64_t GetBlobCount() const { return blob_count_; }
ttlrange_t GetTTLRange() const {
if (ttl_range_) {
*ttl_range_;
}
return {0, 0};
}
tsrange_t GetTimeRange() const {
if (ts_range_) {
return *ts_range_;
}
return {0, 0};
}
const snrange_t& GetSNRange() const { return sn_range_; }
private:
uint32_t magic_number_ = 0;
uint64_t blob_count_ = 0;
std::unique_ptr<ttlrange_t> ttl_range_;
std::unique_ptr<tsrange_t> ts_range_;
snrange_t sn_range_;
private:
void set_ttl_range(const ttlrange_t& ttl) {
ttl_range_.reset(new ttlrange_t(ttl));
}
void set_time_range(const tsrange_t& ts) {
ts_range_.reset(new tsrange_t(ts));
}
};
extern const size_t kBlockSize;
class BlobLogRecord {
friend class Reader;
private:
// this might not be set.
uint32_t checksum_;
uint32_t header_cksum_;
uint32_t key_size_;
uint64_t blob_size_;
uint64_t time_val_;
uint32_t ttl_val_;
SequenceNumber sn_;
uint32_t footer_cksum_;
char type_;
char subtype_;
Slice key_;
Slice blob_;
std::string key_buffer_;
std::string blob_buffer_;
private:
void Clear();
char* GetKeyBuffer() { return &(key_buffer_[0]); }
char* GetBlobBuffer() { return &(blob_buffer_[0]); }
void ResizeKeyBuffer(size_t kbs);
void ResizeBlobBuffer(size_t bbs);
public:
// Header is
// Key Length ( 4 bytes ),
// Blob Length ( 8 bytes), timestamp/ttl (8 bytes),
// type (1 byte), subtype (1 byte)
// header checksum (4 bytes), blob checksum (4 bytes),
// = 34
static const size_t kHeaderSize = 4 + 4 + 4 + 8 + 4 + 8 + 1 + 1;
static const size_t kFooterSize = 8 + 4;
public:
BlobLogRecord();
~BlobLogRecord();
const Slice& Key() const { return key_; }
const Slice& Blob() const { return blob_; }
uint32_t GetKeySize() const { return key_size_; }
uint64_t GetBlobSize() const { return blob_size_; }
uint32_t GetTTL() const { return ttl_val_; }
uint64_t GetTimeVal() const { return time_val_; }
SequenceNumber GetSN() const { return sn_; }
Status DecodeHeaderFrom(const Slice& hdrslice);
Status DecodeFooterFrom(const Slice& footerslice);
};
} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -0,0 +1,163 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_log_reader.h"
#include <cstdio>
#include "rocksdb/env.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/file_reader_writer.h"
namespace rocksdb {
namespace blob_db {
Reader::Reader(std::shared_ptr<Logger> info_log,
unique_ptr<SequentialFileReader>&& _file)
: info_log_(info_log), file_(std::move(_file)), buffer_(), next_byte_(0) {
backing_store_.resize(kBlockSize);
}
Reader::~Reader() {}
Status Reader::ReadHeader(BlobLogHeader* header) {
assert(file_.get() != nullptr);
assert(next_byte_ == 0);
Status status =
file_->Read(BlobLogHeader::kHeaderSize, &buffer_, GetReadBuffer());
next_byte_ += buffer_.size();
if (!status.ok()) return status;
if (buffer_.size() != BlobLogHeader::kHeaderSize) {
return Status::IOError("EOF reached before file header");
}
status = header->DecodeFrom(&buffer_);
return status;
}
Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level,
WALRecoveryMode wal_recovery_mode) {
record->Clear();
buffer_.clear();
backing_store_[0] = '\0';
Status status =
file_->Read(BlobLogRecord::kHeaderSize, &buffer_, GetReadBuffer());
next_byte_ += buffer_.size();
if (!status.ok()) return status;
if (buffer_.size() != BlobLogRecord::kHeaderSize) {
return Status::IOError("EOF reached before record header");
}
status = record->DecodeHeaderFrom(buffer_);
if (!status.ok()) return status;
uint32_t header_crc = 0;
uint32_t blob_crc = 0;
size_t crc_data_size = BlobLogRecord::kHeaderSize - 2 * sizeof(uint32_t);
header_crc = crc32c::Extend(header_crc, buffer_.data(), crc_data_size);
uint64_t kb_size = record->GetKeySize() + record->GetBlobSize();
switch (level) {
case kReadHdrFooter:
file_->Skip(kb_size);
next_byte_ += kb_size;
status =
file_->Read(BlobLogRecord::kFooterSize, &buffer_, GetReadBuffer());
next_byte_ += buffer_.size();
if (!status.ok()) return status;
if (buffer_.size() != BlobLogRecord::kFooterSize) {
return Status::IOError("EOF reached before record footer");
}
status = record->DecodeFooterFrom(buffer_);
return status;
case kReadHdrKeyFooter:
record->ResizeKeyBuffer(record->GetKeySize());
status = file_->Read(record->GetKeySize(), &record->key_,
record->GetKeyBuffer());
next_byte_ += record->key_.size();
if (!status.ok()) return status;
if (record->key_.size() != record->GetKeySize()) {
return Status::IOError("EOF reached before key read");
}
header_crc =
crc32c::Extend(header_crc, record->key_.data(), record->GetKeySize());
header_crc = crc32c::Mask(header_crc);
if (header_crc != record->header_cksum_) {
return Status::Corruption("Record Checksum mismatch: header_cksum");
}
file_->Skip(record->GetBlobSize());
next_byte_ += record->GetBlobSize();
status =
file_->Read(BlobLogRecord::kFooterSize, &buffer_, GetReadBuffer());
next_byte_ += buffer_.size();
if (!status.ok()) return status;
if (buffer_.size() != BlobLogRecord::kFooterSize) {
return Status::IOError("EOF reached during footer read");
}
status = record->DecodeFooterFrom(buffer_);
return status;
case kReadHdrKeyBlobFooter:
record->ResizeKeyBuffer(record->GetKeySize());
status = file_->Read(record->GetKeySize(), &record->key_,
record->GetKeyBuffer());
next_byte_ += record->key_.size();
if (!status.ok()) return status;
if (record->key_.size() != record->GetKeySize()) {
return Status::IOError("EOF reached before key read");
}
header_crc =
crc32c::Extend(header_crc, record->key_.data(), record->GetKeySize());
header_crc = crc32c::Mask(header_crc);
if (header_crc != record->header_cksum_) {
return Status::Corruption("Record Checksum mismatch: header_cksum");
}
record->ResizeBlobBuffer(record->GetBlobSize());
status = file_->Read(record->GetBlobSize(), &record->blob_,
record->GetBlobBuffer());
next_byte_ += record->blob_.size();
if (!status.ok()) return status;
if (record->blob_.size() != record->GetBlobSize()) {
return Status::IOError("EOF reached during blob read");
}
blob_crc =
crc32c::Extend(blob_crc, record->blob_.data(), record->blob_.size());
blob_crc = crc32c::Mask(blob_crc);
if (blob_crc != record->checksum_) {
return Status::Corruption("Blob Checksum mismatch");
}
status =
file_->Read(BlobLogRecord::kFooterSize, &buffer_, GetReadBuffer());
next_byte_ += buffer_.size();
if (!status.ok()) return status;
if (buffer_.size() != BlobLogRecord::kFooterSize) {
return Status::IOError("EOF reached during blob footer read");
}
status = record->DecodeFooterFrom(buffer_);
return status;
default:
assert(0);
return status;
}
}
} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -0,0 +1,93 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#pragma once
#ifndef ROCKSDB_LITE
#include <cstdint>
#include <memory>
#include <string>
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "utilities/blob_db/blob_log_format.h"
namespace rocksdb {
class SequentialFileReader;
class Logger;
namespace blob_db {
/**
* Reader is a general purpose log stream reader implementation. The actual job
* of reading from the device is implemented by the SequentialFile interface.
*
* Please see Writer for details on the file and record layout.
*/
class Reader {
public:
enum ReadLevel {
kReadHdrFooter,
kReadHdrKeyFooter,
kReadHdrKeyBlobFooter,
};
// Create a reader that will return log records from "*file".
// "*file" must remain live while this Reader is in use.
//
// If "reporter" is non-nullptr, it is notified whenever some data is
// dropped due to a detected corruption. "*reporter" must remain
// live while this Reader is in use.
//
// If "checksum" is true, verify checksums if available.
//
// The Reader will start reading at the first record located at physical
// position >= initial_offset within the file.
Reader(std::shared_ptr<Logger> info_log,
std::unique_ptr<SequentialFileReader>&& file);
~Reader();
Status ReadHeader(BlobLogHeader* header);
// Read the next record into *record. Returns true if read
// successfully, false if we hit end of the input. May use
// "*scratch" as temporary storage. The contents filled in *record
// will only be valid until the next mutating operation on this
// reader or the next mutation to *scratch.
Status ReadRecord(BlobLogRecord* record, ReadLevel level = kReadHdrFooter,
WALRecoveryMode wal_recovery_mode =
WALRecoveryMode::kTolerateCorruptedTailRecords);
SequentialFileReader* file() { return file_.get(); }
void ResetNextByte() { next_byte_ = 0; }
uint64_t GetNextByte() const { return next_byte_; }
private:
char* GetReadBuffer() { return &(backing_store_[0]); }
private:
std::shared_ptr<Logger> info_log_;
const std::unique_ptr<SequentialFileReader> file_;
std::string backing_store_;
Slice buffer_;
// which byte to read next. For asserting proper usage
uint64_t next_byte_;
// No copying allowed
Reader(const Reader&) = delete;
Reader& operator=(const Reader&) = delete;
};
} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -0,0 +1,172 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_log_writer.h"
#include <cstdint>
#include <limits>
#include <string>
#include "rocksdb/env.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/file_reader_writer.h"
namespace rocksdb {
namespace blob_db {
Writer::Writer(unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
uint64_t bpsync, bool use_fs, uint64_t boffset)
: dest_(std::move(dest)),
log_number_(log_number),
block_offset_(boffset),
bytes_per_sync_(bpsync),
next_sync_offset_(0),
use_fsync_(use_fs),
last_elem_type_(kEtNone) {
for (int i = 0; i <= kMaxRecordType; i++) {
char t = static_cast<char>(i);
type_crc_[i] = crc32c::Value(&t, 1);
}
}
Writer::~Writer() {}
void Writer::Sync() { dest_->Sync(use_fsync_); }
Status Writer::WriteHeader(const BlobLogHeader& header) {
assert(block_offset_ == 0);
assert(last_elem_type_ == kEtNone);
std::string str;
header.EncodeTo(&str);
Status s = dest_->Append(Slice(str));
if (s.ok()) {
block_offset_ += str.size();
s = dest_->Flush();
}
last_elem_type_ = kEtFileHdr;
return s;
}
Status Writer::AppendFooter(const BlobLogFooter& footer) {
assert(block_offset_ != 0);
assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtFooter);
std::string str;
footer.EncodeTo(&str);
Status s = dest_->Append(Slice(str));
if (s.ok()) {
block_offset_ += str.size();
s = dest_->Close();
dest_.reset();
}
last_elem_type_ = kEtFileFooter;
return s;
}
Status Writer::AddRecord(const Slice& key, const Slice& val,
uint64_t* key_offset, uint64_t* blob_offset,
uint32_t ttl) {
assert(block_offset_ != 0);
assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtFooter);
std::string buf;
ConstructBlobHeader(&buf, key, val, ttl, -1);
Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset);
return s;
}
Status Writer::AddRecord(const Slice& key, const Slice& val,
uint64_t* key_offset, uint64_t* blob_offset) {
assert(block_offset_ != 0);
assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtFooter);
std::string buf;
ConstructBlobHeader(&buf, key, val, -1, -1);
Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset);
return s;
}
void Writer::ConstructBlobHeader(std::string* headerbuf, const Slice& key,
const Slice& val, int32_t ttl, int64_t ts) {
headerbuf->reserve(BlobLogRecord::kHeaderSize);
uint32_t key_size = static_cast<uint32_t>(key.size());
PutFixed32(headerbuf, key_size);
PutFixed64(headerbuf, val.size());
uint32_t ttl_write = (ttl != -1) ? static_cast<uint32_t>(ttl)
: std::numeric_limits<uint32_t>::max();
PutFixed32(headerbuf, ttl_write);
uint64_t ts_write = (ts != -1) ? static_cast<uint64_t>(ts)
: std::numeric_limits<uint64_t>::max();
PutFixed64(headerbuf, ts_write);
RecordType t = kFullType;
headerbuf->push_back(static_cast<char>(t));
RecordSubType st = kRegularType;
if (ttl != -1) st = kTTLType;
headerbuf->push_back(static_cast<char>(st));
uint32_t header_crc = 0;
header_crc =
crc32c::Extend(header_crc, headerbuf->c_str(), headerbuf->size());
header_crc = crc32c::Extend(header_crc, key.data(), key.size());
header_crc = crc32c::Mask(header_crc);
PutFixed32(headerbuf, header_crc);
uint32_t crc = 0;
// Compute the crc of the record type and the payload.
crc = crc32c::Extend(crc, val.data(), val.size());
crc = crc32c::Mask(crc); // Adjust for storage
PutFixed32(headerbuf, crc);
}
Status Writer::EmitPhysicalRecord(const std::string& headerbuf,
const Slice& key, const Slice& val,
uint64_t* key_offset, uint64_t* blob_offset) {
Status s = dest_->Append(Slice(headerbuf));
if (s.ok()) {
s = dest_->Append(key);
if (s.ok()) s = dest_->Append(val);
}
*key_offset = block_offset_ + BlobLogRecord::kHeaderSize;
*blob_offset = *key_offset + key.size();
block_offset_ = *blob_offset + val.size();
last_elem_type_ = kEtRecord;
return s;
}
Status Writer::AddRecordFooter(const SequenceNumber& seq) {
assert(last_elem_type_ == kEtRecord);
std::string buf;
PutFixed64(&buf, seq);
uint32_t footer_crc = crc32c::Extend(0, buf.c_str(), buf.size());
footer_crc = crc32c::Mask(footer_crc);
PutFixed32(&buf, footer_crc);
Status s = dest_->Append(Slice(buf));
block_offset_ += BlobLogRecord::kFooterSize;
if (s.ok()) dest_->Flush();
last_elem_type_ = kEtFooter;
return s;
}
} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -0,0 +1,98 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#pragma once
#ifndef ROCKSDB_LITE
#include <cstdint>
#include <memory>
#include <string>
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "utilities/blob_db/blob_log_format.h"
namespace rocksdb {
class WritableFileWriter;
namespace blob_db {
/**
* Writer is the blob log stream writer. It provides an append-only
* abstraction for writing blob data.
*
*
* Look at blob_db_format.h to see the details of the record formats.
*/
class Writer {
public:
// Create a writer that will append data to "*dest".
// "*dest" must be initially empty.
// "*dest" must remain live while this Writer is in use.
explicit Writer(std::unique_ptr<WritableFileWriter>&& dest,
uint64_t log_number, uint64_t bpsync, bool use_fsync,
uint64_t boffset = 0);
~Writer();
static void ConstructBlobHeader(std::string* headerbuf, const Slice& key,
const Slice& val, int32_t ttl, int64_t ts);
Status AddRecord(const Slice& key, const Slice& val, uint64_t* key_offset,
uint64_t* blob_offset);
Status AddRecord(const Slice& key, const Slice& val, uint64_t* key_offset,
uint64_t* blob_offset, uint32_t ttl);
Status EmitPhysicalRecord(const std::string& headerbuf, const Slice& key,
const Slice& val, uint64_t* key_offset,
uint64_t* blob_offset);
Status AddRecordFooter(const SequenceNumber& sn);
Status AppendFooter(const BlobLogFooter& footer);
Status WriteHeader(const BlobLogHeader& header);
WritableFileWriter* file() { return dest_.get(); }
const WritableFileWriter* file() const { return dest_.get(); }
uint64_t get_log_number() const { return log_number_; }
bool ShouldSync() const { return block_offset_ > next_sync_offset_; }
void Sync();
void ResetSyncPointer() { next_sync_offset_ += bytes_per_sync_; }
private:
std::unique_ptr<WritableFileWriter> dest_;
uint64_t log_number_;
uint64_t block_offset_; // Current offset in block
uint64_t bytes_per_sync_;
uint64_t next_sync_offset_;
bool use_fsync_;
// crc32c values for all supported record types. These are
// pre-computed to reduce the overhead of computing the crc of the
// record type stored in the header.
uint32_t type_crc_[kMaxRecordType + 1];
// No copying allowed
Writer(const Writer&) = delete;
Writer& operator=(const Writer&) = delete;
public:
enum ElemType { kEtNone, kEtFileHdr, kEtRecord, kEtFooter, kEtFileFooter };
ElemType last_elem_type_;
};
} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -16,10 +16,14 @@ namespace rocksdb {
class OptimisticTransactionDBImpl : public OptimisticTransactionDB {
public:
explicit OptimisticTransactionDBImpl(DB* db)
: OptimisticTransactionDB(db), db_(db) {}
explicit OptimisticTransactionDBImpl(DB* db, bool take_ownership = true)
: OptimisticTransactionDB(db), db_(db), db_owner_(take_ownership) {}
~OptimisticTransactionDBImpl() {}
~OptimisticTransactionDBImpl() {
if (!db_owner_) {
db_.release();
}
}
Transaction* BeginTransaction(const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options,
@ -29,6 +33,7 @@ class OptimisticTransactionDBImpl : public OptimisticTransactionDB {
private:
std::unique_ptr<DB> db_;
bool db_owner_;
void ReinitializeTransaction(Transaction* txn,
const WriteOptions& write_options,