EventLogger

Summary:
Here's my proposal for making our LOGs easier to read by machines.

The idea is to dump all events as JSON objects. JSON is easy to read by humans, but more importantly, it's easy to read by machines. That way, we can parse this, load into SQLite/mongo and then query or visualize.

I started with table_create and table_delete events, but if everybody agrees, I'll continue by adding more events (flush/compaction/etc etc)

Test Plan:
Ran db_bench. Observed:
2015/01/15-14:13:25.788019 1105ef000 EVENT_LOG_v1 {"time_micros": 1421360005788015, "event": "table_file_creation", "file_number": 12, "file_size": 1909699}
2015/01/15-14:13:25.956500 110740000 EVENT_LOG_v1 {"time_micros": 1421360005956498, "event": "table_file_deletion", "file_number": 12}

Reviewers: yhchiang, rven, dhruba, MarkCallaghan, lgalanis, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D31647
This commit is contained in:
Igor Canadi 2015-03-13 10:15:54 -07:00
parent 756532daf5
commit 52d8347a91
10 changed files with 269 additions and 9 deletions

View File

@ -228,6 +228,7 @@ TESTS = \
geodb_test \ geodb_test \
rate_limiter_test \ rate_limiter_test \
options_test \ options_test \
event_logger_test \
cuckoo_table_builder_test \ cuckoo_table_builder_test \
cuckoo_table_reader_test \ cuckoo_table_reader_test \
cuckoo_table_db_test \ cuckoo_table_db_test \
@ -623,6 +624,9 @@ compact_files_test: db/compact_files_test.o $(LIBOBJECTS) $(TESTHARNESS)
options_test: util/options_test.o $(LIBOBJECTS) $(TESTHARNESS) options_test: util/options_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK) $(AM_LINK)
event_logger_test: util/event_logger_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
sst_dump_test: util/sst_dump_test.o $(LIBOBJECTS) $(TESTHARNESS) sst_dump_test: util/sst_dump_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK) $(AM_LINK)

View File

@ -199,9 +199,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
db_options_(SanitizeOptions(dbname, options)), db_options_(SanitizeOptions(dbname, options)),
stats_(db_options_.statistics.get()), stats_(db_options_.statistics.get()),
db_lock_(nullptr), db_lock_(nullptr),
mutex_(stats_, env_, mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS, options.use_adaptive_mutex),
DB_MUTEX_WAIT_MICROS,
options.use_adaptive_mutex),
shutting_down_(false), shutting_down_(false),
bg_cv_(&mutex_), bg_cv_(&mutex_),
logfile_number_(0), logfile_number_(0),
@ -229,6 +227,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
wal_manager_(db_options_, env_options_), wal_manager_(db_options_, env_options_),
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
event_logger_(db_options_.info_log.get()),
bg_work_gate_closed_(false), bg_work_gate_closed_(false),
refitting_level_(false), refitting_level_(false),
opened_successfully_(false), opened_successfully_(false),
@ -652,6 +651,9 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
// evict from cache // evict from cache
TableCache::Evict(table_cache_.get(), number); TableCache::Evict(table_cache_.get(), number);
fname = TableFileName(db_options_.db_paths, number, path_id); fname = TableFileName(db_options_.db_paths, number, path_id);
event_logger_.Log() << "event"
<< "table_file_deletion"
<< "file_number" << number;
} else { } else {
fname = ((type == kLogFile) ? fname = ((type == kLogFile) ?
db_options_.wal_dir : dbname_) + "/" + to_delete; db_options_.wal_dir : dbname_) + "/" + to_delete;
@ -1140,7 +1142,8 @@ Status DBImpl::FlushMemTableToOutputFile(
env_options_, versions_.get(), &mutex_, &shutting_down_, env_options_, versions_.get(), &mutex_, &shutting_down_,
snapshots_.GetNewest(), job_context, log_buffer, snapshots_.GetNewest(), job_context, log_buffer,
directories_.GetDbDir(), directories_.GetDataDir(0U), directories_.GetDbDir(), directories_.GetDataDir(0U),
GetCompressionFlush(*cfd->ioptions()), stats_); GetCompressionFlush(*cfd->ioptions()), stats_,
&event_logger_);
uint64_t file_number; uint64_t file_number;
Status s = flush_job.Run(&file_number); Status s = flush_job.Run(&file_number);

View File

@ -32,6 +32,8 @@
#include "rocksdb/memtablerep.h" #include "rocksdb/memtablerep.h"
#include "rocksdb/transaction_log.h" #include "rocksdb/transaction_log.h"
#include "util/autovector.h" #include "util/autovector.h"
#include "util/event_logger.h"
#include "util/hash.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/thread_local.h" #include "util/thread_local.h"
#include "util/scoped_arena_iterator.h" #include "util/scoped_arena_iterator.h"
@ -599,6 +601,9 @@ class DBImpl : public DB {
WalManager wal_manager_; WalManager wal_manager_;
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
// Unified interface for logging events
EventLogger event_logger_;
// A value of true temporarily disables scheduling of background work // A value of true temporarily disables scheduling of background work
bool bg_work_gate_closed_; bool bg_work_gate_closed_;

View File

@ -40,6 +40,7 @@
#include "table/table_builder.h" #include "table/table_builder.h"
#include "table/two_level_iterator.h" #include "table/two_level_iterator.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/event_logger.h"
#include "util/file_util.h" #include "util/file_util.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/log_buffer.h" #include "util/log_buffer.h"
@ -61,7 +62,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
SequenceNumber newest_snapshot, JobContext* job_context, SequenceNumber newest_snapshot, JobContext* job_context,
LogBuffer* log_buffer, Directory* db_directory, LogBuffer* log_buffer, Directory* db_directory,
Directory* output_file_directory, Directory* output_file_directory,
CompressionType output_compression, Statistics* stats) CompressionType output_compression, Statistics* stats,
EventLogger* event_logger)
: dbname_(dbname), : dbname_(dbname),
cfd_(cfd), cfd_(cfd),
db_options_(db_options), db_options_(db_options),
@ -76,7 +78,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
db_directory_(db_directory), db_directory_(db_directory),
output_file_directory_(output_file_directory), output_file_directory_(output_file_directory),
output_compression_(output_compression), output_compression_(output_compression),
stats_(stats) {} stats_(stats),
event_logger_(event_logger) {}
Status FlushJob::Run(uint64_t* file_number) { Status FlushJob::Run(uint64_t* file_number) {
// Save the contents of the earliest memtable as a new Table // Save the contents of the earliest memtable as a new Table
@ -180,6 +183,10 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s", "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s",
cfd_->GetName().c_str(), job_context_->job_id, meta.fd.GetNumber(), cfd_->GetName().c_str(), job_context_->job_id, meta.fd.GetNumber(),
meta.fd.GetFileSize(), s.ToString().c_str()); meta.fd.GetFileSize(), s.ToString().c_str());
event_logger_->Log() << "event"
<< "table_file_creation"
<< "file_number" << meta.fd.GetNumber() << "file_size"
<< meta.fd.GetFileSize();
if (!db_options_.disableDataSync && output_file_directory_ != nullptr) { if (!db_options_.disableDataSync && output_file_directory_ != nullptr) {
output_file_directory_->Fsync(); output_file_directory_->Fsync();
} }

View File

@ -28,6 +28,7 @@
#include "rocksdb/memtablerep.h" #include "rocksdb/memtablerep.h"
#include "rocksdb/transaction_log.h" #include "rocksdb/transaction_log.h"
#include "util/autovector.h" #include "util/autovector.h"
#include "util/event_logger.h"
#include "util/instrumented_mutex.h" #include "util/instrumented_mutex.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/thread_local.h" #include "util/thread_local.h"
@ -59,7 +60,7 @@ class FlushJob {
SequenceNumber newest_snapshot, JobContext* job_context, SequenceNumber newest_snapshot, JobContext* job_context,
LogBuffer* log_buffer, Directory* db_directory, LogBuffer* log_buffer, Directory* db_directory,
Directory* output_file_directory, CompressionType output_compression, Directory* output_file_directory, CompressionType output_compression,
Statistics* stats); Statistics* stats, EventLogger* event_logger);
~FlushJob() {} ~FlushJob() {}
Status Run(uint64_t* file_number = nullptr); Status Run(uint64_t* file_number = nullptr);
@ -82,6 +83,7 @@ class FlushJob {
Directory* output_file_directory_; Directory* output_file_directory_;
CompressionType output_compression_; CompressionType output_compression_;
Statistics* stats_; Statistics* stats_;
EventLogger* event_logger_;
}; };
} // namespace rocksdb } // namespace rocksdb

View File

@ -83,11 +83,12 @@ class FlushJobTest {
TEST(FlushJobTest, Empty) { TEST(FlushJobTest, Empty) {
JobContext job_context(0); JobContext job_context(0);
auto cfd = versions_->GetColumnFamilySet()->GetDefault(); auto cfd = versions_->GetColumnFamilySet()->GetDefault();
EventLogger event_logger(db_options_.info_log.get());
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(), db_options_, *cfd->GetLatestMutableCFOptions(),
env_options_, versions_.get(), &mutex_, &shutting_down_, env_options_, versions_.get(), &mutex_, &shutting_down_,
SequenceNumber(), &job_context, nullptr, nullptr, nullptr, SequenceNumber(), &job_context, nullptr, nullptr, nullptr,
kNoCompression, nullptr); kNoCompression, nullptr, &event_logger);
ASSERT_OK(flush_job.Run()); ASSERT_OK(flush_job.Run());
job_context.Clean(); job_context.Clean();
} }
@ -107,11 +108,12 @@ TEST(FlushJobTest, NonEmpty) {
} }
cfd->imm()->Add(new_mem); cfd->imm()->Add(new_mem);
EventLogger event_logger(db_options_.info_log.get());
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(), db_options_, *cfd->GetLatestMutableCFOptions(),
env_options_, versions_.get(), &mutex_, &shutting_down_, env_options_, versions_.get(), &mutex_, &shutting_down_,
SequenceNumber(), &job_context, nullptr, nullptr, nullptr, SequenceNumber(), &job_context, nullptr, nullptr, nullptr,
kNoCompression, nullptr); kNoCompression, nullptr, &event_logger);
mutex_.Lock(); mutex_.Lock();
ASSERT_OK(flush_job.Run()); ASSERT_OK(flush_job.Run());
mutex_.Unlock(); mutex_.Unlock();

2
src.mk
View File

@ -105,6 +105,7 @@ LIB_SOURCES = \
utilities/spatialdb/spatial_db.cc \ utilities/spatialdb/spatial_db.cc \
utilities/ttl/db_ttl_impl.cc \ utilities/ttl/db_ttl_impl.cc \
utilities/write_batch_with_index/write_batch_with_index.cc \ utilities/write_batch_with_index/write_batch_with_index.cc \
util/event_logger.cc \
util/ldb_cmd.cc \ util/ldb_cmd.cc \
util/ldb_tool.cc \ util/ldb_tool.cc \
util/log_buffer.cc \ util/log_buffer.cc \
@ -209,6 +210,7 @@ TEST_BENCH_SOURCES = \
util/memenv_test.cc \ util/memenv_test.cc \
util/mock_env_test.cc \ util/mock_env_test.cc \
util/options_test.cc \ util/options_test.cc \
util/event_logger_test.cc \
util/rate_limiter_test.cc \ util/rate_limiter_test.cc \
util/signal_test.cc \ util/signal_test.cc \
util/slice_transform_test.cc \ util/slice_transform_test.cc \

35
util/event_logger.cc Normal file
View File

@ -0,0 +1,35 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include "util/event_logger.h"
#include <inttypes.h>
#include <cassert>
#include <chrono>
#include <sstream>
#include <string>
#include "util/string_util.h"
namespace rocksdb {
const char* kEventLoggerPrefix = "EVENT_LOG_v1";
EventLoggerStream::EventLoggerStream(Logger* logger)
: logger_(logger), json_writter_(nullptr) {}
EventLoggerStream::~EventLoggerStream() {
if (json_writter_) {
json_writter_->EndObject();
Log(logger_, "%s %s", kEventLoggerPrefix, json_writter_->Get().c_str());
delete json_writter_;
}
}
} // namespace rocksdb

158
util/event_logger.h Normal file
View File

@ -0,0 +1,158 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <memory>
#include <sstream>
#include <string>
#include "rocksdb/env.h"
namespace rocksdb {
// JSONWritter doesn't support objects in arrays yet. There wasn't a need for
// that.
class JSONWritter {
public:
JSONWritter() : state_(kExpectKey), first_element_(true) { stream_ << "{"; }
void AddKey(const std::string& key) {
assert(state_ == kExpectKey);
if (!first_element_) {
stream_ << ", ";
}
stream_ << "\"" << key << "\": ";
state_ = kExpectValue;
first_element_ = false;
}
void AddValue(const char* value) {
assert(state_ == kExpectValue || state_ == kInArray);
if (state_ == kInArray && !first_element_) {
stream_ << ", ";
}
stream_ << "\"" << value << "\"";
if (state_ != kInArray) {
state_ = kExpectKey;
}
first_element_ = false;
}
template <typename T>
void AddValue(const T& value) {
assert(state_ == kExpectValue || state_ == kInArray);
if (state_ == kInArray && !first_element_) {
stream_ << ", ";
}
stream_ << value;
if (state_ != kInArray) {
state_ = kExpectKey;
}
first_element_ = false;
}
void StartArray() {
assert(state_ == kExpectKey);
state_ = kInArray;
if (!first_element_) {
stream_ << ", ";
}
stream_ << "[";
first_element_ = true;
}
void EndArray() {
assert(state_ == kInArray);
state_ = kExpectKey;
stream_ << "]";
first_element_ = false;
}
void StartObject() {
assert(state_ == kExpectValue);
stream_ << "{";
first_element_ = true;
}
void EndObject() {
assert(state_ == kExpectKey);
stream_ << "}";
first_element_ = false;
}
std::string Get() const { return stream_.str(); }
JSONWritter& operator<<(const char* val) {
if (state_ == kExpectKey) {
AddKey(val);
} else {
AddValue(val);
}
return *this;
}
JSONWritter& operator<<(const std::string& val) {
return *this << val.c_str();
}
template <typename T>
JSONWritter& operator<<(const T& val) {
assert(state_ != kExpectKey);
AddValue(val);
return *this;
}
private:
enum JSONWritterState {
kExpectKey,
kExpectValue,
kInArray,
};
JSONWritterState state_;
bool first_element_;
std::ostringstream stream_;
};
class EventLoggerStream {
public:
template <typename T>
EventLoggerStream& operator<<(const T& val) {
MakeStream();
*json_writter_ << val;
return *this;
}
~EventLoggerStream();
private:
void MakeStream() {
if (!json_writter_) {
json_writter_ = new JSONWritter();
*this << "time_micros"
<< std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
}
}
friend class EventLogger;
explicit EventLoggerStream(Logger* logger);
Logger* logger_;
// ownership
JSONWritter* json_writter_;
};
// here is an example of the output that will show up in the LOG:
// 2015/01/15-14:13:25.788019 1105ef000 EVENT_LOG_v1 {"time_micros":
// 1421360005788015, "event": "table_file_creation", "file_number": 12,
// "file_size": 1909699}
class EventLogger {
public:
explicit EventLogger(Logger* logger) : logger_(logger) {}
EventLoggerStream Log() { return EventLoggerStream(logger_); }
private:
Logger* logger_;
};
} // namespace rocksdb

42
util/event_logger_test.cc Normal file
View File

@ -0,0 +1,42 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <string>
#include "util/event_logger.h"
#include "util/testharness.h"
namespace rocksdb {
class EventLoggerTest {};
class StringLogger : public Logger {
public:
using Logger::Logv;
virtual void Logv(const char* format, va_list ap) override {
vsnprintf(buffer_, sizeof(buffer_), format, ap);
}
char* buffer() { return buffer_; }
private:
char buffer_[1000];
};
TEST(EventLoggerTest, SimpleTest) {
StringLogger logger;
EventLogger event_logger(&logger);
event_logger.Log() << "id" << 5 << "event"
<< "just_testing";
std::string output(logger.buffer());
ASSERT_TRUE(output.find("\"event\": \"just_testing\"") != std::string::npos);
ASSERT_TRUE(output.find("\"id\": 5") != std::string::npos);
ASSERT_TRUE(output.find("\"time_micros\"") != std::string::npos);
}
} // namespace rocksdb
int main(int argc, char** argv) {
return rocksdb::test::RunAllTests();
}