Move HDFS support to separate repo (#9170)

Summary:
This PR moves HDFS support from the RocksDB repo to a separate repo. The new (possibly temporary)
repo referenced in this PR serves as an example until we finalize where, and by whom, HDFS support
will be hosted. For now, people can start from the example repo and fork it.

Java/JNI support is not included yet and will need to be migrated later if necessary.

The goal is to include this commit in RocksDB 7.0 release.
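
For applications that previously called NewHdfsEnv() or passed --hdfs, the replacement path is
RocksDB's object registry: link the plugin in, then resolve the Env with Env::CreateFromUri(),
which is exactly what db_bench and db_stress now do for --env_uri/--fs_uri. A minimal sketch,
assuming the external plugin has registered the hdfs:// URI prefix (the namenode address below is
a placeholder):

  #include <cstdio>
  #include <memory>

  #include "rocksdb/convenience.h"  // ConfigOptions
  #include "rocksdb/env.h"

  int main() {
    rocksdb::Env* raw_env = nullptr;
    std::shared_ptr<rocksdb::Env> env_guard;  // owns raw_env if one is created
    rocksdb::ConfigOptions config_options;
    // Resolve the URI through the object registry. This succeeds only if the
    // HDFS plugin is linked in and has registered the "hdfs://" prefix.
    rocksdb::Status s = rocksdb::Env::CreateFromUri(
        config_options, /*env_uri=*/"hdfs://namenode:9000/", /*fs_uri=*/"",
        &raw_env, &env_guard);
    if (!s.ok()) {
      std::fprintf(stderr, "Env lookup failed: %s\n", s.ToString().c_str());
      return 1;
    }
    // raw_env can now be assigned to Options::env as before.
    return 0;
  }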

Reference:
https://github.com/ajkr/dedupfs by ajkr

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9170

Test Plan:
Follow the instructions in https://github.com/riversand963/rocksdb-hdfs-env/blob/master/README.md. Build and run db_bench and db_stress.

make check

Reviewed By: ajkr

Differential Revision: D33751662

Pulled By: riversand963

fbshipit-source-id: 22b4db7f31762ed417a20239f5a08dcd1696244f
commit 50135c1bf3 (parent 1cecd22de9)
Yanqin Jin, 2022-01-24 19:56:37 -08:00, committed by Facebook GitHub Bot
25 changed files with 29 additions and 1271 deletions

CMakeLists.txt

@@ -719,7 +719,6 @@ set(SOURCES
   env/env.cc
   env/env_chroot.cc
   env/env_encryption.cc
-  env/env_hdfs.cc
   env/file_system.cc
   env/file_system_tracer.cc
   env/fs_remap.cc

HISTORY.md

@@ -1,4 +1,8 @@
 # Rocksdb Change Log
+## Unreleased
+### Public API changes
+* Remove HDFS support from main repo.
+
 ## 6.29.0 (01/21/2022)
 Note: The next release will be major release 7.0. See https://github.com/facebook/rocksdb/issues/9390 for more info.
 ### Public API change

Makefile

@@ -559,7 +559,7 @@ endif
 # dependencies
 ifneq ($(filter check-headers, $(MAKECMDGOALS)),)
 # TODO: add/support JNI headers
-DEV_HEADER_DIRS := $(sort include/ hdfs/ $(dir $(ALL_SOURCES)))
+DEV_HEADER_DIRS := $(sort include/ $(dir $(ALL_SOURCES)))
 # Some headers like in port/ are platform-specific
 DEV_HEADERS := $(shell $(FIND) $(DEV_HEADER_DIRS) -type f -name '*.h' | egrep -v 'port/|plugin/|lua/|range_tree/|tools/rdb/db_wrapper.h|include/rocksdb/utilities/env_librados.h')
 else

PLUGINS.md

@@ -1,4 +1,5 @@
 This is the list of all known third-party plugins for RocksDB. If something is missing, please open a pull request to add it.
 
 * [Dedupfs](https://github.com/ajkr/dedupfs): an example for plugin developers to reference
+* [HDFS](https://github.com/riversand963/rocksdb-hdfs-env): an Env used for interacting with HDFS. Migrated from main RocksDB repo
 * [ZenFS](https://github.com/westerndigitalcorporation/zenfs): a file system for zoned block devices
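
On the plugin side, the external repo hooks into the same registry that Env::CreateFromUri()
consults. Below is a hedged sketch of what such a registration can look like with RocksDB's
ObjectLibrary; the HdfsEnv stand-in class and the hdfs_reg hook name are illustrative assumptions
in the style of the plugin convention, not code shipped by this commit:

  #include <memory>
  #include <string>

  #include "rocksdb/env.h"
  #include "rocksdb/utilities/object_registry.h"

  using ROCKSDB_NAMESPACE::Env;
  using ROCKSDB_NAMESPACE::EnvWrapper;
  using ROCKSDB_NAMESPACE::ObjectLibrary;

  // Stand-in for the plugin's real Env; the actual implementation lives in
  // the external rocksdb-hdfs-env repo.
  class HdfsEnv : public EnvWrapper {
   public:
    explicit HdfsEnv(const std::string& uri)
        : EnvWrapper(Env::Default()), uri_(uri) {}
    static const char* kClassName() { return "HdfsEnv"; }
    const char* Name() const override { return kClassName(); }

   private:
    std::string uri_;  // e.g. "hdfs://host:port/"
  };

  // Hypothetical registration hook: once it runs, "hdfs://..." URIs resolve
  // through Env::CreateFromUri and through the tools' --env_uri flag.
  extern "C" int hdfs_reg(ObjectLibrary& library, const std::string& /*arg*/) {
    library.AddFactory<Env>(
        ObjectLibrary::PatternEntry("hdfs", /*optional=*/false)
            .AddSeparator("://"),
        [](const std::string& uri, std::unique_ptr<Env>* guard,
           std::string* /*errmsg*/) {
          guard->reset(new HdfsEnv(uri));
          return guard->get();
        });
    return 1;  // number of factories registered
  }

With a factory like this registered, the --env_uri lookup shown in the db_bench and db_stress
changes below replaces the removed --hdfs flag.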

TARGETS

@@ -229,7 +229,6 @@ cpp_library(
   "env/env.cc",
   "env/env_chroot.cc",
   "env/env_encryption.cc",
-  "env/env_hdfs.cc",
   "env/env_posix.cc",
   "env/file_system.cc",
   "env/file_system_tracer.cc",
@@ -559,7 +558,6 @@ cpp_library(
   "env/env.cc",
   "env/env_chroot.cc",
   "env/env_encryption.cc",
-  "env/env_hdfs.cc",
   "env/env_posix.cc",
   "env/file_system.cc",
   "env/file_system_tracer.cc",

build_tools/build_detect_platform

@@ -617,22 +617,6 @@ EOF
   fi
 fi
 
-# shall we use HDFS?
-if test "$USE_HDFS"; then
-  if test -z "$JAVA_HOME"; then
-    echo "JAVA_HOME has to be set for HDFS usage." >&2
-    exit 1
-  fi
-  HDFS_CCFLAGS="$HDFS_CCFLAGS -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS -I$HADOOP_HOME/include"
-  HDFS_LDFLAGS="$HDFS_LDFLAGS -lhdfs -L$JAVA_HOME/jre/lib/amd64 -L$HADOOP_HOME/lib/native"
-  HDFS_LDFLAGS="$HDFS_LDFLAGS -L$JAVA_HOME/jre/lib/amd64/server -L$GLIBC_RUNTIME_PATH/lib"
-  HDFS_LDFLAGS="$HDFS_LDFLAGS -ldl -lverify -ljava -ljvm"
-  COMMON_FLAGS="$COMMON_FLAGS $HDFS_CCFLAGS"
-  PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS $HDFS_LDFLAGS"
-  JAVA_LDFLAGS="$JAVA_LDFLAGS $HDFS_LDFLAGS"
-fi
 
 if test "0$PORTABLE" -eq 0; then
   if test -n "`echo $TARGET_ARCHITECTURE | grep ^ppc64`"; then
     # Tune for this POWER processor, treating '+' models as base models

db_stress_tool/db_stress_common.h

@@ -41,7 +41,6 @@
 #include "db_stress_tool/db_stress_listener.h"
 #include "db_stress_tool/db_stress_shared_state.h"
 #include "db_stress_tool/db_stress_test_base.h"
-#include "hdfs/env_hdfs.h"
 #include "logging/logging.h"
 #include "monitoring/histogram.h"
 #include "options/options_helper.h"
@@ -212,7 +211,6 @@ DECLARE_int32(compression_zstd_max_train_bytes);
 DECLARE_int32(compression_parallel_threads);
 DECLARE_uint64(compression_max_dict_buffer_bytes);
 DECLARE_string(checksum_type);
-DECLARE_string(hdfs);
 DECLARE_string(env_uri);
 DECLARE_string(fs_uri);
 DECLARE_uint64(ops_per_thread);
@@ -277,7 +275,7 @@ constexpr long KB = 1024;
 constexpr int kRandomValueMaxFactor = 3;
 constexpr int kValueMaxLen = 100;
-// wrapped posix or hdfs environment
+// wrapped posix environment
 extern ROCKSDB_NAMESPACE::Env* db_stress_env;
 #ifndef NDEBUG
 namespace ROCKSDB_NAMESPACE {

db_stress_tool/db_stress_gflags.cc

@@ -708,17 +708,12 @@ DEFINE_string(bottommost_compression_type, "disable",
 DEFINE_string(checksum_type, "kCRC32c", "Algorithm to use to checksum blocks");
 
-DEFINE_string(hdfs, "",
-              "Name of hdfs environment. Mutually exclusive with"
-              " --env_uri and --fs_uri.");
-
-DEFINE_string(
-    env_uri, "",
-    "URI for env lookup. Mutually exclusive with --hdfs and --fs_uri");
+DEFINE_string(env_uri, "",
+              "URI for env lookup. Mutually exclusive with --fs_uri");
 
 DEFINE_string(fs_uri, "",
               "URI for registry Filesystem lookup. Mutually exclusive"
-              " with --hdfs and --env_uri."
+              " with --env_uri."
               " Creates a default environment with the specified filesystem.");
 
 DEFINE_uint64(ops_per_thread, 1200000, "Number of operations per thread.");

db_stress_tool/db_stress_tool.cc

@@ -64,24 +64,18 @@ int db_stress_tool(int argc, char** argv) {
   Env* raw_env;
 
-  int env_opts =
-      !FLAGS_hdfs.empty() + !FLAGS_env_uri.empty() + !FLAGS_fs_uri.empty();
+  int env_opts = !FLAGS_env_uri.empty() + !FLAGS_fs_uri.empty();
   if (env_opts > 1) {
-    fprintf(stderr,
-            "Error: --hdfs, --env_uri and --fs_uri are mutually exclusive\n");
+    fprintf(stderr, "Error: --env_uri and --fs_uri are mutually exclusive\n");
     exit(1);
   }
-  if (!FLAGS_hdfs.empty()) {
-    raw_env = new ROCKSDB_NAMESPACE::HdfsEnv(FLAGS_hdfs);
-  } else {
-    Status s = Env::CreateFromUri(ConfigOptions(), FLAGS_env_uri, FLAGS_fs_uri,
-                                  &raw_env, &env_guard);
-    if (!s.ok()) {
-      fprintf(stderr, "Error Creating Env URI: %s: %s\n",
-              FLAGS_env_uri.c_str(), s.ToString().c_str());
-      exit(1);
-    }
-  }
+  Status s = Env::CreateFromUri(ConfigOptions(), FLAGS_env_uri, FLAGS_fs_uri,
+                                &raw_env, &env_guard);
+  if (!s.ok()) {
+    fprintf(stderr, "Error Creating Env URI: %s: %s\n", FLAGS_env_uri.c_str(),
+            s.ToString().c_str());
+    exit(1);
+  }
 
 #ifndef NDEBUG
@@ -238,8 +232,7 @@ int db_stress_tool(int argc, char** argv) {
     std::string default_secondaries_path;
     db_stress_env->GetTestDirectory(&default_secondaries_path);
     default_secondaries_path += "/dbstress_secondaries";
-    ROCKSDB_NAMESPACE::Status s =
-        db_stress_env->CreateDirIfMissing(default_secondaries_path);
+    s = db_stress_env->CreateDirIfMissing(default_secondaries_path);
     if (!s.ok()) {
       fprintf(stderr, "Failed to create directory %s: %s\n",
               default_secondaries_path.c_str(), s.ToString().c_str());

env/env_hdfs.cc

@@ -1,650 +0,0 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#include "rocksdb/env.h"
#include "hdfs/env_hdfs.h"
#ifdef USE_HDFS
#ifndef ROCKSDB_HDFS_FILE_C
#define ROCKSDB_HDFS_FILE_C
#include <stdio.h>
#include <time.h>
#include <algorithm>
#include <iostream>
#include <sstream>
#include "logging/logging.h"
#include "rocksdb/status.h"
#include "util/string_util.h"
#define HDFS_EXISTS 0
#define HDFS_DOESNT_EXIST -1
#define HDFS_SUCCESS 0
//
// This file defines an HDFS environment for rocksdb. It uses the libhdfs
// api to access HDFS. All HDFS files created by one instance of rocksdb
// will reside on the same HDFS cluster.
//
namespace ROCKSDB_NAMESPACE {
namespace {
// Log error message
static Status IOError(const std::string& context, int err_number) {
return (err_number == ENOSPC)
? Status::NoSpace(context, errnoStr(err_number).c_str())
: (err_number == ENOENT)
? Status::PathNotFound(context, errnoStr(err_number).c_str())
: Status::IOError(context, errnoStr(err_number).c_str());
}
// assume that there is one global logger for now. It is not thread-safe,
// but need not be because the logger is initialized at db-open time.
static Logger* mylog = nullptr;
// Used for reading a file from HDFS. It implements both sequential-read
// access methods as well as random read access methods.
class HdfsReadableFile : virtual public SequentialFile,
virtual public RandomAccessFile {
private:
hdfsFS fileSys_;
std::string filename_;
hdfsFile hfile_;
public:
HdfsReadableFile(hdfsFS fileSys, const std::string& fname)
: fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile opening file %s\n",
filename_.c_str());
hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0);
ROCKS_LOG_DEBUG(mylog,
"[hdfs] HdfsReadableFile opened file %s hfile_=0x%p\n",
filename_.c_str(), hfile_);
}
virtual ~HdfsReadableFile() {
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closing file %s\n",
filename_.c_str());
hdfsCloseFile(fileSys_, hfile_);
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closed file %s\n",
filename_.c_str());
hfile_ = nullptr;
}
bool isValid() {
return hfile_ != nullptr;
}
// sequential access, read data at current offset in file
virtual Status Read(size_t n, Slice* result, char* scratch) {
Status s;
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile reading %s %ld\n",
filename_.c_str(), n);
char* buffer = scratch;
size_t total_bytes_read = 0;
tSize bytes_read = 0;
tSize remaining_bytes = (tSize)n;
// Read a total of n bytes repeatedly until we hit error or eof
while (remaining_bytes > 0) {
bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes);
if (bytes_read <= 0) {
break;
}
assert(bytes_read <= remaining_bytes);
total_bytes_read += bytes_read;
remaining_bytes -= bytes_read;
buffer += bytes_read;
}
assert(total_bytes_read <= n);
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile read %s\n",
filename_.c_str());
if (bytes_read < 0) {
s = IOError(filename_, errno);
} else {
*result = Slice(scratch, total_bytes_read);
}
return s;
}
// random access, read data from specified offset in file
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const {
Status s;
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile preading %s\n",
filename_.c_str());
tSize bytes_read =
hdfsPread(fileSys_, hfile_, offset, static_cast<void*>(scratch),
static_cast<tSize>(n));
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile pread %s\n",
filename_.c_str());
*result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read);
if (bytes_read < 0) {
// An error: return a non-ok status
s = IOError(filename_, errno);
}
return s;
}
virtual Status Skip(uint64_t n) {
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile skip %s\n",
filename_.c_str());
// get current offset from file
tOffset current = hdfsTell(fileSys_, hfile_);
if (current < 0) {
return IOError(filename_, errno);
}
// seek to new offset in file
tOffset newoffset = current + n;
int val = hdfsSeek(fileSys_, hfile_, newoffset);
if (val < 0) {
return IOError(filename_, errno);
}
return Status::OK();
}
private:
// returns true if we are at the end of file, false otherwise
bool feof() {
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile feof %s\n",
filename_.c_str());
if (hdfsTell(fileSys_, hfile_) == fileSize()) {
return true;
}
return false;
}
// the current size of the file
tOffset fileSize() {
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile fileSize %s\n",
filename_.c_str());
hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str());
tOffset size = 0L;
if (pFileInfo != nullptr) {
size = pFileInfo->mSize;
hdfsFreeFileInfo(pFileInfo, 1);
} else {
throw HdfsFatalException("fileSize on unknown file " + filename_);
}
return size;
}
};
// Appends to an existing file in HDFS.
class HdfsWritableFile: public WritableFile {
private:
hdfsFS fileSys_;
std::string filename_;
hdfsFile hfile_;
public:
HdfsWritableFile(hdfsFS fileSys, const std::string& fname,
const EnvOptions& options)
: WritableFile(options),
fileSys_(fileSys),
filename_(fname),
hfile_(nullptr) {
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opening %s\n",
filename_.c_str());
hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0);
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opened %s\n",
filename_.c_str());
assert(hfile_ != nullptr);
}
virtual ~HdfsWritableFile() {
if (hfile_ != nullptr) {
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
filename_.c_str());
hdfsCloseFile(fileSys_, hfile_);
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
filename_.c_str());
hfile_ = nullptr;
}
}
using WritableFile::Append;
// If the file was successfully created, then this returns true.
// Otherwise returns false.
bool isValid() {
return hfile_ != nullptr;
}
// The name of the file, mostly needed for debug logging.
const std::string& getName() {
return filename_;
}
virtual Status Append(const Slice& data) {
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Append %s\n",
filename_.c_str());
const char* src = data.data();
size_t left = data.size();
size_t ret = hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(left));
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n",
filename_.c_str());
if (ret != left) {
return IOError(filename_, errno);
}
return Status::OK();
}
virtual Status Flush() {
return Status::OK();
}
virtual Status Sync() {
Status s;
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Sync %s\n",
filename_.c_str());
if (hdfsFlush(fileSys_, hfile_) == -1) {
return IOError(filename_, errno);
}
if (hdfsHSync(fileSys_, hfile_) == -1) {
return IOError(filename_, errno);
}
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Synced %s\n",
filename_.c_str());
return Status::OK();
}
// This is used by HdfsLogger to write data to the debug log file
virtual Status Append(const char* src, size_t size) {
if (hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(size)) !=
static_cast<tSize>(size)) {
return IOError(filename_, errno);
}
return Status::OK();
}
virtual Status Close() {
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
filename_.c_str());
if (hdfsCloseFile(fileSys_, hfile_) != 0) {
return IOError(filename_, errno);
}
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
filename_.c_str());
hfile_ = nullptr;
return Status::OK();
}
};
// The object that implements the debug logs to reside in HDFS.
class HdfsLogger : public Logger {
private:
HdfsWritableFile* file_;
uint64_t (*gettid_)(); // Return the thread id for the current thread
Status HdfsCloseHelper() {
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n",
file_->getName().c_str());
if (mylog != nullptr && mylog == this) {
mylog = nullptr;
}
return Status::OK();
}
protected:
virtual Status CloseImpl() override { return HdfsCloseHelper(); }
public:
HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
: file_(f), gettid_(gettid) {
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger opened %s\n",
file_->getName().c_str());
}
~HdfsLogger() override {
if (!closed_) {
closed_ = true;
HdfsCloseHelper();
}
}
using Logger::Logv;
void Logv(const char* format, va_list ap) override {
const uint64_t thread_id = (*gettid_)();
// We try twice: the first time with a fixed-size stack allocated buffer,
// and the second time with a much larger dynamically allocated buffer.
char buffer[500];
for (int iter = 0; iter < 2; iter++) {
char* base;
int bufsize;
if (iter == 0) {
bufsize = sizeof(buffer);
base = buffer;
} else {
bufsize = 30000;
base = new char[bufsize];
}
char* p = base;
char* limit = base + bufsize;
struct timeval now_tv;
gettimeofday(&now_tv, nullptr);
const time_t seconds = now_tv.tv_sec;
struct tm t;
localtime_r(&seconds, &t);
p += snprintf(p, limit - p,
"%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
t.tm_year + 1900,
t.tm_mon + 1,
t.tm_mday,
t.tm_hour,
t.tm_min,
t.tm_sec,
static_cast<int>(now_tv.tv_usec),
static_cast<long long unsigned int>(thread_id));
// Print the message
if (p < limit) {
va_list backup_ap;
va_copy(backup_ap, ap);
p += vsnprintf(p, limit - p, format, backup_ap);
va_end(backup_ap);
}
// Truncate to available space if necessary
if (p >= limit) {
if (iter == 0) {
continue; // Try again with larger buffer
} else {
p = limit - 1;
}
}
// Add newline if necessary
if (p == base || p[-1] != '\n') {
*p++ = '\n';
}
assert(p <= limit);
file_->Append(base, p-base);
file_->Flush();
if (base != buffer) {
delete[] base;
}
break;
}
}
};
} // namespace
// Finally, the hdfs environment
const std::string HdfsEnv::kProto = "hdfs://";
const std::string HdfsEnv::pathsep = "/";
// open a file for sequential reading
Status HdfsEnv::NewSequentialFile(const std::string& fname,
std::unique_ptr<SequentialFile>* result,
const EnvOptions& /*options*/) {
result->reset();
HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
if (f == nullptr || !f->isValid()) {
delete f;
*result = nullptr;
return IOError(fname, errno);
}
result->reset(dynamic_cast<SequentialFile*>(f));
return Status::OK();
}
// open a file for random reading
Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& /*options*/) {
result->reset();
HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
if (f == nullptr || !f->isValid()) {
delete f;
*result = nullptr;
return IOError(fname, errno);
}
result->reset(dynamic_cast<RandomAccessFile*>(f));
return Status::OK();
}
// create a new file for writing
Status HdfsEnv::NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options) {
result->reset();
Status s;
HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options);
if (f == nullptr || !f->isValid()) {
delete f;
*result = nullptr;
return IOError(fname, errno);
}
result->reset(dynamic_cast<WritableFile*>(f));
return Status::OK();
}
class HdfsDirectory : public Directory {
public:
explicit HdfsDirectory(int fd) : fd_(fd) {}
~HdfsDirectory() {}
Status Fsync() override { return Status::OK(); }
int GetFd() const { return fd_; }
private:
int fd_;
};
Status HdfsEnv::NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) {
int value = hdfsExists(fileSys_, name.c_str());
switch (value) {
case HDFS_EXISTS:
result->reset(new HdfsDirectory(0));
return Status::OK();
default: // fail if the directory doesn't exist
ROCKS_LOG_FATAL(mylog, "NewDirectory hdfsExists call failed");
throw HdfsFatalException("hdfsExists call failed with error " +
ToString(value) + " on path " + name +
".\n");
}
}
Status HdfsEnv::FileExists(const std::string& fname) {
int value = hdfsExists(fileSys_, fname.c_str());
switch (value) {
case HDFS_EXISTS:
return Status::OK();
case HDFS_DOESNT_EXIST:
return Status::NotFound();
default: // anything else should be an error
ROCKS_LOG_FATAL(mylog, "FileExists hdfsExists call failed");
return Status::IOError("hdfsExists call failed with error " +
ToString(value) + " on path " + fname + ".\n");
}
}
Status HdfsEnv::GetChildren(const std::string& path,
std::vector<std::string>* result) {
int value = hdfsExists(fileSys_, path.c_str());
switch (value) {
case HDFS_EXISTS: { // directory exists
int numEntries = 0;
hdfsFileInfo* pHdfsFileInfo = 0;
pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
if (numEntries >= 0) {
for(int i = 0; i < numEntries; i++) {
std::string pathname(pHdfsFileInfo[i].mName);
size_t pos = pathname.rfind("/");
if (std::string::npos != pos) {
result->push_back(pathname.substr(pos + 1));
}
}
if (pHdfsFileInfo != nullptr) {
hdfsFreeFileInfo(pHdfsFileInfo, numEntries);
}
} else {
// numEntries < 0 indicates error
ROCKS_LOG_FATAL(mylog, "hdfsListDirectory call failed with error ");
throw HdfsFatalException(
"hdfsListDirectory call failed negative error.\n");
}
break;
}
case HDFS_DOESNT_EXIST: // directory does not exist, exit
return Status::NotFound();
default: // anything else should be an error
ROCKS_LOG_FATAL(mylog, "GetChildren hdfsExists call failed");
throw HdfsFatalException("hdfsExists call failed with error " +
ToString(value) + ".\n");
}
return Status::OK();
}
Status HdfsEnv::DeleteFile(const std::string& fname) {
if (hdfsDelete(fileSys_, fname.c_str(), 1) == 0) {
return Status::OK();
}
return IOError(fname, errno);
};
Status HdfsEnv::CreateDir(const std::string& name) {
if (hdfsCreateDirectory(fileSys_, name.c_str()) == 0) {
return Status::OK();
}
return IOError(name, errno);
};
Status HdfsEnv::CreateDirIfMissing(const std::string& name) {
const int value = hdfsExists(fileSys_, name.c_str());
// Not atomic. state might change b/w hdfsExists and CreateDir.
switch (value) {
case HDFS_EXISTS:
return Status::OK();
case HDFS_DOESNT_EXIST:
return CreateDir(name);
default: // anything else should be an error
ROCKS_LOG_FATAL(mylog, "CreateDirIfMissing hdfsExists call failed");
throw HdfsFatalException("hdfsExists call failed with error " +
ToString(value) + ".\n");
}
};
Status HdfsEnv::DeleteDir(const std::string& name) {
return DeleteFile(name);
};
Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) {
*size = 0L;
hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
if (pFileInfo != nullptr) {
*size = pFileInfo->mSize;
hdfsFreeFileInfo(pFileInfo, 1);
return Status::OK();
}
return IOError(fname, errno);
}
Status HdfsEnv::GetFileModificationTime(const std::string& fname,
uint64_t* time) {
hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
if (pFileInfo != nullptr) {
*time = static_cast<uint64_t>(pFileInfo->mLastMod);
hdfsFreeFileInfo(pFileInfo, 1);
return Status::OK();
}
return IOError(fname, errno);
}
// The rename is not atomic. HDFS does not allow a renaming if the
// target already exists. So, we delete the target before attempting the
// rename.
Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
hdfsDelete(fileSys_, target.c_str(), 1);
if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) {
return Status::OK();
}
return IOError(src, errno);
}
Status HdfsEnv::LockFile(const std::string& /*fname*/, FileLock** lock) {
// there isn's a very good way to atomically check and create
// a file via libhdfs
*lock = nullptr;
return Status::OK();
}
Status HdfsEnv::UnlockFile(FileLock* /*lock*/) { return Status::OK(); }
Status HdfsEnv::NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) {
// EnvOptions is used exclusively for its `strict_bytes_per_sync` value. That
// option is only intended for WAL/flush/compaction writes, so turn it off in
// the logger.
EnvOptions options;
options.strict_bytes_per_sync = false;
HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options);
if (f == nullptr || !f->isValid()) {
delete f;
*result = nullptr;
return IOError(fname, errno);
}
HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid);
result->reset(h);
if (mylog == nullptr) {
// mylog = h; // uncomment this for detailed logging
}
return Status::OK();
}
Status HdfsEnv::IsDirectory(const std::string& path, bool* is_dir) {
hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, path.c_str());
if (pFileInfo != nullptr) {
if (is_dir != nullptr) {
*is_dir = (pFileInfo->mKind == kObjectKindDirectory);
}
hdfsFreeFileInfo(pFileInfo, 1);
return Status::OK();
}
return IOError(path, errno);
}
// The factory method for creating an HDFS Env
Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
*hdfs_env = new HdfsEnv(fsname);
return Status::OK();
}
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_HDFS_FILE_C
#else // USE_HDFS
// dummy placeholders used when HDFS is not available
namespace ROCKSDB_NAMESPACE {
Status HdfsEnv::NewSequentialFile(const std::string& /*fname*/,
std::unique_ptr<SequentialFile>* /*result*/,
const EnvOptions& /*options*/) {
return Status::NotSupported("Not compiled with hdfs support");
}
Status NewHdfsEnv(Env** /*hdfs_env*/, const std::string& /*fsname*/) {
return Status::NotSupported("Not compiled with hdfs support");
}
} // namespace ROCKSDB_NAMESPACE
#endif

hdfs/README

@@ -1,23 +0,0 @@
This directory contains the hdfs extensions needed to make rocksdb store
files in HDFS.
It has been compiled and testing against CDH 4.4 (2.0.0+1475-1.cdh4.4.0.p0.23~precise-cdh4.4.0).
The configuration assumes that packages libhdfs0, libhdfs0-dev are
installed which basically means that hdfs.h is in /usr/include and libhdfs in /usr/lib
The env_hdfs.h file defines the rocksdb objects that are needed to talk to an
underlying filesystem.
If you want to compile rocksdb with hdfs support, please set the following
environment variables appropriately (also defined in setup.sh for convenience)
USE_HDFS=1
JAVA_HOME=/usr/local/jdk-7u79-64
LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/jdk-7u79-64/jre/lib/amd64/server:/usr/local/jdk-7u79-64/jre/lib/amd64/:./snappy/libs
make clean all db_bench
To run dbbench,
set CLASSPATH to include your hadoop distribution
db_bench --hdfs="hdfs://hbaseudbperf001.snc1.facebook.com:9000"

hdfs/env_hdfs.h

@@ -1,394 +0,0 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#pragma once
#include <algorithm>
#include <stdio.h>
#include <time.h>
#include <iostream>
#include "port/sys_time.h"
#include "rocksdb/env.h"
#include "rocksdb/status.h"
#ifdef USE_HDFS
#include <hdfs.h>
namespace ROCKSDB_NAMESPACE {
// Thrown during execution when there is an issue with the supplied
// arguments.
class HdfsUsageException : public std::exception { };
// A simple exception that indicates something went wrong that is not
// recoverable. The intention is for the message to be printed (with
// nothing else) and the process terminate.
class HdfsFatalException : public std::exception {
public:
explicit HdfsFatalException(const std::string& s) : what_(s) { }
virtual ~HdfsFatalException() throw() { }
virtual const char* what() const throw() {
return what_.c_str();
}
private:
const std::string what_;
};
//
// The HDFS environment for rocksdb. This class overrides all the
// file/dir access methods and delegates the thread-mgmt methods to the
// default posix environment.
//
class HdfsEnv : public Env {
public:
explicit HdfsEnv(const std::string& fsname) : fsname_(fsname) {
posixEnv = Env::Default();
fileSys_ = connectToPath(fsname_);
}
static const char* kClassName() { return "HdfsEnv"; }
const char* Name() const override { return kClassName(); }
static const char* kNickName() { return "hdfs"; }
const char* NickName() const override { return kNickName(); }
virtual ~HdfsEnv() {
fprintf(stderr, "Destroying HdfsEnv::Default()\n");
hdfsDisconnect(fileSys_);
}
Status NewSequentialFile(const std::string& fname,
std::unique_ptr<SequentialFile>* result,
const EnvOptions& options) override;
Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& options) override;
Status NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options) override;
Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) override;
Status FileExists(const std::string& fname) override;
Status GetChildren(const std::string& path,
std::vector<std::string>* result) override;
Status DeleteFile(const std::string& fname) override;
Status CreateDir(const std::string& name) override;
Status CreateDirIfMissing(const std::string& name) override;
Status DeleteDir(const std::string& name) override;
Status GetFileSize(const std::string& fname, uint64_t* size) override;
Status GetFileModificationTime(const std::string& fname,
uint64_t* file_mtime) override;
Status RenameFile(const std::string& src, const std::string& target) override;
Status LinkFile(const std::string& /*src*/,
const std::string& /*target*/) override {
return Status::NotSupported(); // not supported
}
Status LockFile(const std::string& fname, FileLock** lock) override;
Status UnlockFile(FileLock* lock) override;
Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) override;
Status IsDirectory(const std::string& path, bool* is_dir) override;
void Schedule(void (*function)(void* arg), void* arg, Priority pri = LOW,
void* tag = nullptr,
void (*unschedFunction)(void* arg) = 0) override {
posixEnv->Schedule(function, arg, pri, tag, unschedFunction);
}
int UnSchedule(void* tag, Priority pri) override {
return posixEnv->UnSchedule(tag, pri);
}
void StartThread(void (*function)(void* arg), void* arg) override {
posixEnv->StartThread(function, arg);
}
void WaitForJoin() override { posixEnv->WaitForJoin(); }
unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override {
return posixEnv->GetThreadPoolQueueLen(pri);
}
Status GetTestDirectory(std::string* path) override {
return posixEnv->GetTestDirectory(path);
}
uint64_t NowMicros() override { return posixEnv->NowMicros(); }
void SleepForMicroseconds(int micros) override {
posixEnv->SleepForMicroseconds(micros);
}
Status GetHostName(char* name, uint64_t len) override {
return posixEnv->GetHostName(name, len);
}
Status GetCurrentTime(int64_t* unix_time) override {
return posixEnv->GetCurrentTime(unix_time);
}
Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) override {
return posixEnv->GetAbsolutePath(db_path, output_path);
}
void SetBackgroundThreads(int number, Priority pri = LOW) override {
posixEnv->SetBackgroundThreads(number, pri);
}
int GetBackgroundThreads(Priority pri = LOW) override {
return posixEnv->GetBackgroundThreads(pri);
}
void IncBackgroundThreadsIfNeeded(int number, Priority pri) override {
posixEnv->IncBackgroundThreadsIfNeeded(number, pri);
}
std::string TimeToString(uint64_t number) override {
return posixEnv->TimeToString(number);
}
static uint64_t gettid() { return Env::Default()->GetThreadID(); }
uint64_t GetThreadID() const override { return HdfsEnv::gettid(); }
private:
std::string fsname_; // string of the form "hdfs://hostname:port/"
hdfsFS fileSys_; // a single FileSystem object for all files
Env* posixEnv; // This object is derived from Env, but not from
// posixEnv. We have posixnv as an encapsulated
// object here so that we can use posix timers,
// posix threads, etc.
static const std::string kProto;
static const std::string pathsep;
/**
* If the URI is specified of the form hdfs://server:port/path,
* then connect to the specified cluster
* else connect to default.
*/
hdfsFS connectToPath(const std::string& uri) {
if (uri.empty()) {
return nullptr;
}
if (uri.find(kProto) != 0) {
// uri doesn't start with hdfs:// -> use default:0, which is special
// to libhdfs.
return hdfsConnectNewInstance("default", 0);
}
const std::string hostport = uri.substr(kProto.length());
std::vector <std::string> parts;
split(hostport, ':', parts);
if (parts.size() != 2) {
throw HdfsFatalException("Bad uri for hdfs " + uri);
}
// parts[0] = hosts, parts[1] = port/xxx/yyy
std::string host(parts[0]);
std::string remaining(parts[1]);
int rem = static_cast<int>(remaining.find(pathsep));
std::string portStr = (rem == 0 ? remaining :
remaining.substr(0, rem));
tPort port = static_cast<tPort>(atoi(portStr.c_str()));
if (port == 0) {
throw HdfsFatalException("Bad host-port for hdfs " + uri);
}
hdfsFS fs = hdfsConnectNewInstance(host.c_str(), port);
return fs;
}
void split(const std::string &s, char delim,
std::vector<std::string> &elems) {
elems.clear();
size_t prev = 0;
size_t pos = s.find(delim);
while (pos != std::string::npos) {
elems.push_back(s.substr(prev, pos));
prev = pos + 1;
pos = s.find(delim, prev);
}
elems.push_back(s.substr(prev, s.size()));
}
};
} // namespace ROCKSDB_NAMESPACE
#else // USE_HDFS
namespace ROCKSDB_NAMESPACE {
class HdfsEnv : public Env {
public:
explicit HdfsEnv(const std::string& /*fsname*/) {
fprintf(stderr, "You have not build rocksdb with HDFS support\n");
fprintf(stderr, "Please see hdfs/README for details\n");
abort();
}
static const char* kClassName() { return "HdfsEnv"; }
const char* Name() const override { return kClassName(); }
static const char* kNickName() { return "hdfs"; }
const char* NickName() const override { return kNickName(); }
virtual ~HdfsEnv() {
}
virtual Status NewSequentialFile(const std::string& fname,
std::unique_ptr<SequentialFile>* result,
const EnvOptions& options) override;
virtual Status NewRandomAccessFile(
const std::string& /*fname*/,
std::unique_ptr<RandomAccessFile>* /*result*/,
const EnvOptions& /*options*/) override {
return Status::NotSupported();
}
virtual Status NewWritableFile(const std::string& /*fname*/,
std::unique_ptr<WritableFile>* /*result*/,
const EnvOptions& /*options*/) override {
return Status::NotSupported();
}
virtual Status NewDirectory(const std::string& /*name*/,
std::unique_ptr<Directory>* /*result*/) override {
return Status::NotSupported();
}
virtual Status FileExists(const std::string& /*fname*/) override {
return Status::NotSupported();
}
virtual Status GetChildren(const std::string& /*path*/,
std::vector<std::string>* /*result*/) override {
return Status::NotSupported();
}
virtual Status DeleteFile(const std::string& /*fname*/) override {
return Status::NotSupported();
}
virtual Status CreateDir(const std::string& /*name*/) override {
return Status::NotSupported();
}
virtual Status CreateDirIfMissing(const std::string& /*name*/) override {
return Status::NotSupported();
}
virtual Status DeleteDir(const std::string& /*name*/) override {
return Status::NotSupported();
}
virtual Status GetFileSize(const std::string& /*fname*/,
uint64_t* /*size*/) override {
return Status::NotSupported();
}
virtual Status GetFileModificationTime(const std::string& /*fname*/,
uint64_t* /*time*/) override {
return Status::NotSupported();
}
virtual Status RenameFile(const std::string& /*src*/,
const std::string& /*target*/) override {
return Status::NotSupported();
}
virtual Status LinkFile(const std::string& /*src*/,
const std::string& /*target*/) override {
return Status::NotSupported();
}
virtual Status LockFile(const std::string& /*fname*/,
FileLock** /*lock*/) override {
return Status::NotSupported();
}
virtual Status UnlockFile(FileLock* /*lock*/) override {
return Status::NotSupported();
}
virtual Status NewLogger(const std::string& /*fname*/,
std::shared_ptr<Logger>* /*result*/) override {
return Status::NotSupported();
}
Status IsDirectory(const std::string& /*path*/, bool* /*is_dir*/) override {
return Status::NotSupported();
}
virtual void Schedule(void (* /*function*/)(void* arg), void* /*arg*/,
Priority /*pri*/ = LOW, void* /*tag*/ = nullptr,
void (* /*unschedFunction*/)(void* arg) = 0) override {}
virtual int UnSchedule(void* /*tag*/, Priority /*pri*/) override { return 0; }
virtual void StartThread(void (* /*function*/)(void* arg),
void* /*arg*/) override {}
virtual void WaitForJoin() override {}
virtual unsigned int GetThreadPoolQueueLen(
Priority /*pri*/ = LOW) const override {
return 0;
}
virtual Status GetTestDirectory(std::string* /*path*/) override {
return Status::NotSupported();
}
virtual uint64_t NowMicros() override { return 0; }
virtual void SleepForMicroseconds(int /*micros*/) override {}
virtual Status GetHostName(char* /*name*/, uint64_t /*len*/) override {
return Status::NotSupported();
}
virtual Status GetCurrentTime(int64_t* /*unix_time*/) override {
return Status::NotSupported();
}
virtual Status GetAbsolutePath(const std::string& /*db_path*/,
std::string* /*outputpath*/) override {
return Status::NotSupported();
}
virtual void SetBackgroundThreads(int /*number*/,
Priority /*pri*/ = LOW) override {}
virtual int GetBackgroundThreads(Priority /*pri*/ = LOW) override {
return 0;
}
virtual void IncBackgroundThreadsIfNeeded(int /*number*/,
Priority /*pri*/) override {}
virtual std::string TimeToString(uint64_t /*number*/) override { return ""; }
virtual uint64_t GetThreadID() const override {
return 0;
}
};
} // namespace ROCKSDB_NAMESPACE
#endif // USE_HDFS

hdfs/setup.sh

@@ -1,9 +0,0 @@
# shellcheck disable=SC2148
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
export USE_HDFS=1
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64:$HADOOP_HOME/lib/native
export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob`
for f in `find /usr/lib/hadoop-hdfs | grep jar`; do export CLASSPATH=$CLASSPATH:$f; done
for f in `find /usr/lib/hadoop | grep jar`; do export CLASSPATH=$CLASSPATH:$f; done
for f in `find /usr/lib/hadoop/client | grep jar`; do export CLASSPATH=$CLASSPATH:$f; done

include/rocksdb/env.h

@@ -1844,10 +1844,6 @@ class LoggerWrapper : public Logger {
 // *base_env must remain live while the result is in use.
 Env* NewMemEnv(Env* base_env);
 
-// Returns a new environment that is used for HDFS environment.
-// This is a factory method for HdfsEnv declared in hdfs/env_hdfs.h
-Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname);
-
 // Returns a new environment that measures function call times for filesystem
 // operations, reporting results to variables in PerfContext.
 // This is a factory method for TimedEnv defined in utilities/env_timed.cc.

include/rocksdb/utilities/backup_engine.h

@@ -38,7 +38,6 @@ struct BackupEngineOptions {
   // Backup Env object. It will be used for backup file I/O. If it's
   // nullptr, backups will be written out using DBs Env. If it's
   // non-nullptr, backup's I/O will be performed using this object.
-  // If you want to have backups on HDFS, use HDFS Env here!
   // Default: nullptr
   Env* backup_env;

java/CMakeLists.txt

@@ -157,7 +157,6 @@ set(JAVA_MAIN_CLASSES
   src/main/java/org/rocksdb/FlushOptions.java
   src/main/java/org/rocksdb/HashLinkedListMemTableConfig.java
   src/main/java/org/rocksdb/HashSkipListMemTableConfig.java
-  src/main/java/org/rocksdb/HdfsEnv.java
   src/main/java/org/rocksdb/HistogramData.java
   src/main/java/org/rocksdb/HistogramType.java
   src/main/java/org/rocksdb/Holder.java
@@ -451,7 +450,6 @@ if(${CMAKE_VERSION} VERSION_LESS "3.11.4" OR (${Java_VERSION_MINOR} STREQUAL "7"
   org.rocksdb.FlushOptions
   org.rocksdb.HashLinkedListMemTableConfig
   org.rocksdb.HashSkipListMemTableConfig
-  org.rocksdb.HdfsEnv
   org.rocksdb.IngestExternalFileOptions
   org.rocksdb.Logger
   org.rocksdb.LRUCache

java/Makefile

@@ -37,7 +37,6 @@ NATIVE_JAVA_CLASSES = \
 	org.rocksdb.IngestExternalFileOptions\
 	org.rocksdb.HashLinkedListMemTableConfig\
 	org.rocksdb.HashSkipListMemTableConfig\
-	org.rocksdb.HdfsEnv\
 	org.rocksdb.ConcurrentTaskLimiter\
 	org.rocksdb.ConcurrentTaskLimiterImpl\
 	org.rocksdb.KeyMayExist\
@@ -134,7 +133,6 @@ JAVA_TESTS = \
 	org.rocksdb.util.EnvironmentTest\
 	org.rocksdb.EnvOptionsTest\
 	org.rocksdb.EventListenerTest\
-	org.rocksdb.HdfsEnvTest\
 	org.rocksdb.IngestExternalFileOptionsTest\
 	org.rocksdb.util.IntComparatorTest\
 	org.rocksdb.util.JNIComparatorTest\

java/benchmark/src/main/java/org/rocksdb/benchmark/DbBenchmark.java

@@ -600,7 +600,6 @@ public class DbBenchmark {
     options.setCompressionType((String)flags_.get(Flag.compression_type));
     options.setCompressionLevel((Integer)flags_.get(Flag.compression_level));
     options.setMinLevelToCompress((Integer)flags_.get(Flag.min_level_to_compress));
-    options.setHdfs((String)flags_.get(Flag.hdfs));  // env
     options.setStatistics((Boolean)flags_.get(Flag.statistics));
     options.setUniversalSizeRatio(
         (Integer)flags_.get(Flag.universal_size_ratio));

java/rocksjni/env.cc

@@ -6,16 +6,17 @@
 // This file implements the "bridge" between Java and C++ and enables
 // calling c++ ROCKSDB_NAMESPACE::Env methods from Java side.
 
-#include "rocksdb/env.h"
 #include <jni.h>
 #include <vector>
 
-#include "portal.h"
+#include "rocksdb/env.h"
 #include "include/org_rocksdb_Env.h"
-#include "include/org_rocksdb_HdfsEnv.h"
 #include "include/org_rocksdb_RocksEnv.h"
 #include "include/org_rocksdb_RocksMemEnv.h"
 #include "include/org_rocksdb_TimedEnv.h"
+#include "portal.h"
 
 /*
  * Class: org_rocksdb_Env
@@ -176,43 +177,6 @@ void Java_org_rocksdb_RocksMemEnv_disposeInternal(
   delete e;
 }
 
-/*
- * Class: org_rocksdb_HdfsEnv
- * Method: createHdfsEnv
- * Signature: (Ljava/lang/String;)J
- */
-jlong Java_org_rocksdb_HdfsEnv_createHdfsEnv(
-    JNIEnv* env, jclass, jstring jfsname) {
-  jboolean has_exception = JNI_FALSE;
-  auto fsname =
-      ROCKSDB_NAMESPACE::JniUtil::copyStdString(env, jfsname, &has_exception);
-  if (has_exception == JNI_TRUE) {
-    // exception occurred
-    return 0;
-  }
-
-  ROCKSDB_NAMESPACE::Env* hdfs_env;
-  ROCKSDB_NAMESPACE::Status s =
-      ROCKSDB_NAMESPACE::NewHdfsEnv(&hdfs_env, fsname);
-  if (!s.ok()) {
-    // error occurred
-    ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, s);
-    return 0;
-  }
-  return reinterpret_cast<jlong>(hdfs_env);
-}
-
-/*
- * Class: org_rocksdb_HdfsEnv
- * Method: disposeInternal
- * Signature: (J)V
- */
-void Java_org_rocksdb_HdfsEnv_disposeInternal(
-    JNIEnv*, jobject, jlong jhandle) {
-  auto* e = reinterpret_cast<ROCKSDB_NAMESPACE::Env*>(jhandle);
-  assert(e != nullptr);
-  delete e;
-}
-
 /*
  * Class: org_rocksdb_TimedEnv
  * Method: createTimedEnv

java/src/main/java/org/rocksdb/BackupEngine.java

@@ -11,7 +11,7 @@ import java.util.List;
  * and restore the database
  *
  * Be aware, that `new BackupEngine` takes time proportional to the amount
- * of backups. So if you have a slow filesystem to backup (like HDFS)
+ * of backups. So if you have a slow filesystem to backup
  * and you have a lot of backups then restoring can take some time.
  * That's why we recommend to limit the number of backups.
 * Also we recommend to keep BackupEngine alive and not to recreate it every

java/src/main/java/org/rocksdb/BackupableDBOptions.java

@@ -59,8 +59,6 @@ public class BackupableDBOptions extends RocksObject {
    * null, backups will be written out using DBs Env. Otherwise
    * backup's I/O will be performed using this object.
    *
-   * If you want to have backups on HDFS, use HDFS Env here!
-   *
    * Default: null
    *
    * @param env The environment to use
@@ -78,8 +76,6 @@ public class BackupableDBOptions extends RocksObject {
    * null, backups will be written out using DBs Env. Otherwise
    * backup's I/O will be performed using this object.
    *
-   * If you want to have backups on HDFS, use HDFS Env here!
-   *
    * Default: null
    *
    * @return The environment in use

java/src/main/java/org/rocksdb/HdfsEnv.java

@@ -1,27 +0,0 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
package org.rocksdb;
/**
* HDFS environment.
*/
public class HdfsEnv extends Env {
/**
<p>Creates a new environment that is used for HDFS environment.</p>
*
* <p>The caller must delete the result when it is
* no longer needed.</p>
*
* @param fsName the HDFS as a string in the form "hdfs://hostname:port/"
*/
public HdfsEnv(final String fsName) {
super(createHdfsEnv(fsName));
}
private static native long createHdfsEnv(final String fsName);
@Override protected final native void disposeInternal(final long handle);
}

java/src/test/java/org/rocksdb/HdfsEnvTest.java

@@ -1,45 +0,0 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
package org.rocksdb;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import static java.nio.charset.StandardCharsets.UTF_8;
public class HdfsEnvTest {
@ClassRule
public static final RocksNativeLibraryResource ROCKS_NATIVE_LIBRARY_RESOURCE =
new RocksNativeLibraryResource();
@Rule
public TemporaryFolder dbFolder = new TemporaryFolder();
// expect org.rocksdb.RocksDBException: Not compiled with hdfs support
@Test(expected = RocksDBException.class)
public void construct() throws RocksDBException {
try (final Env env = new HdfsEnv("hdfs://localhost:5000")) {
// no-op
}
}
// expect org.rocksdb.RocksDBException: Not compiled with hdfs support
@Test(expected = RocksDBException.class)
public void construct_integration() throws RocksDBException {
try (final Env env = new HdfsEnv("hdfs://localhost:5000");
final Options options = new Options()
.setCreateIfMissing(true)
.setEnv(env);
) {
try (final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getPath())) {
db.put("key1".getBytes(UTF_8), "value1".getBytes(UTF_8));
}
}
}
}

src.mk

@@ -88,7 +88,6 @@ LIB_SOURCES = \
   env/env.cc \
   env/env_chroot.cc \
   env/env_encryption.cc \
-  env/env_hdfs.cc \
   env/env_posix.cc \
   env/file_system.cc \
   env/fs_posix.cc \

tools/db_bench_tool.cc

@@ -30,6 +30,7 @@
 #include <cinttypes>
 #include <condition_variable>
 #include <cstddef>
+#include <iostream>
 #include <memory>
 #include <mutex>
 #include <queue>
@@ -39,7 +40,6 @@
 #include "db/db_impl/db_impl.h"
 #include "db/malloc_stats.h"
 #include "db/version_set.h"
-#include "hdfs/env_hdfs.h"
 #include "monitoring/histogram.h"
 #include "monitoring/statistics.h"
 #include "options/cf_options.h"
@@ -1155,15 +1155,12 @@ DEFINE_int32(table_cache_numshardbits, 4, "");
 #ifndef ROCKSDB_LITE
 DEFINE_string(env_uri, "",
               "URI for registry Env lookup. Mutually exclusive"
-              " with --hdfs and --fs_uri");
+              " with --fs_uri");
 DEFINE_string(fs_uri, "",
               "URI for registry Filesystem lookup. Mutually exclusive"
-              " with --hdfs and --env_uri."
+              " with --env_uri."
               " Creates a default environment with the specified filesystem.");
 #endif  // ROCKSDB_LITE
-DEFINE_string(hdfs, "",
-              "Name of hdfs environment. Mutually exclusive with"
-              " --env_uri and --fs_uri");
 DEFINE_string(simulate_hybrid_fs_file, "",
               "File for Store Metadata for Simulate hybrid FS. Empty means "
               "disable the feature. Now, if it is set, "
@@ -3049,12 +3046,6 @@ class Benchmark {
   }
 
   if (report_file_operations_) {
-    if (!FLAGS_hdfs.empty()) {
-      fprintf(stderr,
-              "--hdfs and --report_file_operations cannot be enabled "
-              "at the same time");
-      exit(1);
-    }
     FLAGS_env = new ReportFileOpEnv(FLAGS_env);
   }
@@ -8143,11 +8134,9 @@ int db_bench_tool(int argc, char** argv) {
   FLAGS_blob_db_compression_type_e =
       StringToCompressionType(FLAGS_blob_db_compression_type.c_str());
 
-  int env_opts =
-      !FLAGS_hdfs.empty() + !FLAGS_env_uri.empty() + !FLAGS_fs_uri.empty();
+  int env_opts = !FLAGS_env_uri.empty() + !FLAGS_fs_uri.empty();
   if (env_opts > 1) {
-    fprintf(stderr,
-            "Error: --hdfs, --env_uri and --fs_uri are mutually exclusive\n");
+    fprintf(stderr, "Error: --env_uri and --fs_uri are mutually exclusive\n");
     exit(1);
   }
@@ -8177,10 +8166,6 @@ int db_bench_tool(int argc, char** argv) {
     exit(1);
   }
 
-  if (!FLAGS_hdfs.empty()) {
-    FLAGS_env = new ROCKSDB_NAMESPACE::HdfsEnv(FLAGS_hdfs);
-  }
-
 if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NONE"))
   FLAGS_compaction_fadvice_e = ROCKSDB_NAMESPACE::Options::NONE;
 else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NORMAL"))