提交 52b3dfba 编辑于 作者: jiexray's avatar jiexray
浏览文件

implement FlinkRescalingCompactionFilter for eliminating redundant keys in key groups

上级 bf229ed5
......@@ -87,3 +87,10 @@ buckifier/__pycache__
compile_commands.json
clang-format-diff.py
.py3/
java/jmh/.settings/org.eclipse.m2e.core.prefs
java/jmh/.settings/org.eclipse.jdt.core.prefs
java/jmh/.settings/org.eclipse.jdt.apt.core.prefs
java/jmh/.project
java/jmh/.settings/org.eclipse.core.resources.prefs
java/jmh/.factorypath
java/jmh/.classpath
......@@ -742,6 +742,7 @@ set(SOURCES
utilities/env_mirror.cc
utilities/env_timed.cc
utilities/flink/flink_compaction_filter.cc
utilities/flink/flink_rescaling_compaction_filter.cc
utilities/leveldb_options/leveldb_options.cc
utilities/memory/memory_util.cc
utilities/merge_operators/bytesxor.cc
......@@ -1124,6 +1125,7 @@ if(WITH_TESTS)
utilities/cassandra/cassandra_row_merge_test.cc
utilities/cassandra/cassandra_serialize_test.cc
utilities/flink/flink_compaction_filter_test.cc
utilities/flink/flink_rescaling_compaction_filter_test.cc
utilities/checkpoint/checkpoint_test.cc
utilities/memory/memory_test.cc
utilities/merge_operators/string_append/stringappend_test.cc
......
......@@ -586,6 +586,7 @@ TESTS = \
version_builder_test \
file_indexer_test \
flink_compaction_filter_test \
flink_rescaling_compaction_filter_test \
write_batch_test \
write_batch_with_index_test \
write_controller_test\
......@@ -1411,6 +1412,9 @@ hash_table_test: utilities/persistent_cache/hash_table_test.o $(LIBOBJECTS) $(TE
flink_compaction_filter_test: utilities/flink/flink_compaction_filter_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
flink_rescaling_compaction_filter_test: utilities/flink/flink_rescaling_compaction_filter_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
histogram_test: monitoring/histogram_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
......
......@@ -12,12 +12,14 @@ ifneq ($(USE_RTTI), 1)
CXXFLAGS += -fno-rtti
endif
CXXFLAGS += -g
.PHONY: clean librocksdb
all: simple_example column_families_example compact_files_example c_simple_example optimistic_transaction_example transaction_example compaction_filter_example options_file_example
simple_example: librocksdb simple_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)
simple_example: simple_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb_debug.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)
column_families_example: librocksdb column_families_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)
......@@ -50,4 +52,4 @@ clean:
rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o ./optimistic_transaction_example ./transaction_example ./options_file_example ./multi_processes_example
librocksdb:
cd .. && $(MAKE) static_lib
cd .. && $(MAKE) dbg
......@@ -30,6 +30,7 @@ set(JNI_NATIVE_SOURCES
rocksjni/env.cc
rocksjni/env_options.cc
rocksjni/flink_compactionfilterjni.cc
rocksjni/flink_rescalingcompactionfilterjni.cc
rocksjni/filter.cc
rocksjni/ingest_external_file_options.cc
rocksjni/iterator.cc
......@@ -142,6 +143,7 @@ set(JAVA_MAIN_CLASSES
src/main/java/org/rocksdb/Experimental.java
src/main/java/org/rocksdb/Filter.java
src/main/java/org/rocksdb/FlinkCompactionFilter.java
src/main/java/org/rocksdb/FlinkRescalingCompactionFilter.java
src/main/java/org/rocksdb/FlushOptions.java
src/main/java/org/rocksdb/HashLinkedListMemTableConfig.java
src/main/java/org/rocksdb/HashSkipListMemTableConfig.java
......@@ -419,6 +421,7 @@ if(${CMAKE_VERSION} VERSION_LESS "3.11.4" OR (${Java_VERSION_MINOR} STREQUAL "7"
org.rocksdb.EnvOptions
org.rocksdb.Filter
org.rocksdb.FlinkCompactionFilter
org.rocksdb.FlinkRescalingCompactionFilter
org.rocksdb.FlushOptions
org.rocksdb.HashLinkedListMemTableConfig
org.rocksdb.HashSkipListMemTableConfig
......
......@@ -31,6 +31,7 @@ NATIVE_JAVA_CLASSES = \
org.rocksdb.Env\
org.rocksdb.EnvOptions\
org.rocksdb.FlinkCompactionFilter\
org.rocksdb.FlinkRescalingCompactionFilter\
org.rocksdb.FlushOptions\
org.rocksdb.Filter\
org.rocksdb.IngestExternalFileOptions\
......
#include <climits> // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include <include/rocksdb/env.h>
#include <jni.h>
#include "include/org_rocksdb_FlinkRescalingCompactionFilter.h"
#include "loggerjnicallback.h"
#include "portal.h"
#include "rocksjni/jnicallback.h"
#include "utilities/flink/flink_rescaling_compaction_filter.h"
using namespace ROCKSDB_NAMESPACE::flink;
/*
 * Class:     org_rocksdb_FlinkRescalingCompactionFilter
 * Method:    createNewFlinkRescalingCompactionConfigHolder
 * Signature: ()J
 *
 * Heap-allocates a std::shared_ptr that owns a freshly constructed
 * ConfigHolder and hands the address of that shared_ptr back to Java as an
 * opaque native handle. The handle is released by
 * disposeFlinkRescalingCompactionFilterConfigHolder.
 */
jlong Java_org_rocksdb_FlinkRescalingCompactionFilter_createNewFlinkRescalingCompactionConfigHolder(
    JNIEnv* /* env */, jclass /* jcls */) {
  using ConfigHolder =
      ROCKSDB_NAMESPACE::flink::FlinkRescalingCompactionFilter::ConfigHolder;
  auto* holder_handle = new std::shared_ptr<ConfigHolder>(new ConfigHolder());
  return reinterpret_cast<jlong>(holder_handle);
}
/*
 * Class:     org_rocksdb_FlinkRescalingCompactionFilter
 * Method:    disposeFlinkRescalingCompactionFilterConfigHolder
 * Signature: (J)V
 *
 * Destroys the heap-allocated shared_ptr<ConfigHolder> that `handle` refers
 * to. Only the Java side's reference is dropped; filters that copied the
 * shared_ptr keep the ConfigHolder alive.
 */
void Java_org_rocksdb_FlinkRescalingCompactionFilter_disposeFlinkRescalingCompactionFilterConfigHolder(
    JNIEnv* /* env */, jclass /* jcls */, jlong handle) {
  using HolderPtr = std::shared_ptr<
      ROCKSDB_NAMESPACE::flink::FlinkRescalingCompactionFilter::ConfigHolder>;
  delete reinterpret_cast<HolderPtr*>(handle);
}
/*
 * Class:     org_rocksdb_FlinkRescalingCompactionFilter
 * Method:    createNewFlinkRescalingCompactionFilter0
 * Signature: (JJ)J
 *
 * Builds a native FlinkRescalingCompactionFilter that shares the ConfigHolder
 * behind `config_holder_handle`. `logger_handle` may be 0, in which case the
 * filter is created without a logger. Returns the raw filter pointer as the
 * Java-side native handle.
 */
jlong Java_org_rocksdb_FlinkRescalingCompactionFilter_createNewFlinkRescalingCompactionFilter0(
    JNIEnv* /* env */, jclass /* jcls */, jlong config_holder_handle,
    jlong logger_handle) {
  using Filter = ROCKSDB_NAMESPACE::flink::FlinkRescalingCompactionFilter;
  using HolderPtr = std::shared_ptr<Filter::ConfigHolder>;
  using LoggerPtr = std::shared_ptr<ROCKSDB_NAMESPACE::LoggerJniCallback>;

  // Copy the shared_ptr so the filter co-owns the holder.
  HolderPtr holder = *reinterpret_cast<HolderPtr*>(config_holder_handle);

  // Stays empty when Java passed no logger (handle 0).
  LoggerPtr logger_cb;
  if (logger_handle != 0) {
    logger_cb = *reinterpret_cast<LoggerPtr*>(logger_handle);
  }

  return reinterpret_cast<jlong>(new Filter(holder, logger_cb));
}
/*
 * Class:     org_rocksdb_FlinkRescalingCompactionFilter
 * Method:    configureFlinkRescalingCompactionFilter
 * Signature: (JI[BI[BI)Z
 *
 * Copies the two key byte arrays into native memory, wraps them in a Config
 * together with the rescale round and tries to install that Config into the
 * ConfigHolder behind `handle`.
 *
 * Returns JNI_TRUE when the Config was installed. Returns JNI_FALSE when the
 * holder rejected it (already configured) or when copying a key raised a Java
 * exception (which is left pending for the caller). In every JNI_FALSE case
 * all native memory allocated here is released.
 */
jboolean Java_org_rocksdb_FlinkRescalingCompactionFilter_configureFlinkRescalingCompactionFilter(
    JNIEnv* env, jclass /* jcls */, jlong handle, jint ji_rescale_round,
    jbyteArray j_smallest_key, jint j_smallest_key_len,
    jbyteArray j_largest_key, jint j_largest_key_len) {
  using Filter = ROCKSDB_NAMESPACE::flink::FlinkRescalingCompactionFilter;

  auto rescale_round = static_cast<Filter::RescaleRound>(ji_rescale_round);
  auto config_holder =
      *(reinterpret_cast<std::shared_ptr<Filter::ConfigHolder>*>(handle));

  // unique_ptr guards make every early return below leak-free.
  std::unique_ptr<jbyte[]> smallest_key(new jbyte[j_smallest_key_len]);
  env->GetByteArrayRegion(j_smallest_key, 0, j_smallest_key_len,
                          smallest_key.get());
  if (env->ExceptionCheck()) {
    // exception thrown: ArrayIndexOutOfBoundsException
    return JNI_FALSE;
  }

  std::unique_ptr<jbyte[]> largest_key(new jbyte[j_largest_key_len]);
  env->GetByteArrayRegion(j_largest_key, 0, j_largest_key_len,
                          largest_key.get());
  if (env->ExceptionCheck()) {
    // exception thrown: ArrayIndexOutOfBoundsException
    // (smallest_key is released too; the original code leaked it here)
    return JNI_FALSE;
  }

  ROCKSDB_NAMESPACE::Slice smallest_key_slice(
      reinterpret_cast<char*>(smallest_key.get()), j_smallest_key_len);
  ROCKSDB_NAMESPACE::Slice largest_key_slice(
      reinterpret_cast<char*>(largest_key.get()), j_largest_key_len);
  std::unique_ptr<Filter::Config> config(new Filter::Config{
      rescale_round, smallest_key_slice, largest_key_slice});

  if (!config_holder->Configure(config.get())) {
    // The holder did not take the config: free it and both key buffers.
    return JNI_FALSE;
  }

  // The holder now owns the Config. Its Slice members only reference the key
  // buffers, so the buffers must stay alive as long as the Config does.
  // NOTE(review): ~ConfigHolder deletes the Config but not these buffers, so
  // they are intentionally leaked for the life of the process.
  config.release();
  smallest_key.release();
  largest_key.release();
  return JNI_TRUE;
}
\ No newline at end of file
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
package org.rocksdb;
/**
* Just a Java wrapper around FlinkRescalingCompactionFilter implemented in C++.
*
* Note: this compaction filter is a special implementation, designed for usage only in Apache Flink
* project.
*/
public class FlinkRescalingCompactionFilter extends AbstractCompactionFilter<Slice> {
  /**
   * Rescale round marker. The ordinal of the constant is what gets passed to
   * the native side, so the declaration order must match the native enum.
   */
  public enum RescaleRound {
    Disabled,
    Zero,
    One
  }

  /**
   * Creates a filter without a logger.
   *
   * @param configHolder holder of the shared native configuration
   */
  public FlinkRescalingCompactionFilter(ConfigHolder configHolder) {
    this(configHolder, null);
  }

  /**
   * Creates a filter.
   *
   * @param configHolder holder of the shared native configuration
   * @param logger logger used by the native filter, or {@code null} for none
   */
  public FlinkRescalingCompactionFilter(ConfigHolder configHolder, Logger logger) {
    // A null logger is mapped to handle 0, which the native code treats as
    // "no logger". Dereferencing it unconditionally threw a
    // NullPointerException for the single-argument constructor and for
    // factories created without a logger.
    super(createNewFlinkRescalingCompactionFilter0(
        configHolder.nativeHandle_, logger == null ? 0 : logger.nativeHandle_));
  }

  private native static long createNewFlinkRescalingCompactionFilter0(
      long configHolderHandle, long loggerHandle);
  private native static long createNewFlinkRescalingCompactionConfigHolder();
  private native static void disposeFlinkRescalingCompactionFilterConfigHolder(
      long configHolderHandle);
  private native static boolean configureFlinkRescalingCompactionFilter(long configHolderHandle,
      int rescaleRound, byte[] smallestKey, int smallestKeyLen,
      byte[] largestKey, int largestKeyLen);

  /** Immutable filter configuration: the rescale round plus an inclusive key range. */
  public static class Config {
    final RescaleRound rescaleRound;
    final byte[] smallestKey;
    final byte[] largestKey;

    private Config(RescaleRound rescaleRound, byte[] smallestKey, byte[] largestKey) {
      this.rescaleRound = rescaleRound;
      this.smallestKey = smallestKey;
      this.largestKey = largestKey;
    }

    /** Creates a configuration for {@link RescaleRound#Zero}. */
    @SuppressWarnings("WeakerAccess")
    public static Config createForZero(byte[] smallestKey, byte[] largestKey) {
      return new Config(RescaleRound.Zero, smallestKey, largestKey);
    }

    /** Creates a configuration for {@link RescaleRound#One}. */
    public static Config createForOne(byte[] smallestKey, byte[] largestKey) {
      return new Config(RescaleRound.One, smallestKey, largestKey);
    }
  }

  /** Java owner of the native config holder shared by all filters of one factory. */
  private static class ConfigHolder extends RocksObject {
    ConfigHolder() {
      super(createNewFlinkRescalingCompactionConfigHolder());
    }

    @Override
    protected void disposeInternal(long handle) {
      disposeFlinkRescalingCompactionFilterConfigHolder(handle);
    }
  }

  /**
   * Factory producing {@link FlinkRescalingCompactionFilter} instances that
   * all share one config holder, so a single {@link #configure(Config)} call
   * reaches every filter created by this factory.
   */
  public static class FlinkRescalingCompactionFilterFactory
      extends AbstractCompactionFilterFactory<FlinkRescalingCompactionFilter> {
    private final ConfigHolder configHolder;
    private final Logger logger;

    /** Creates a factory whose filters have no logger. */
    @SuppressWarnings("unused")
    public FlinkRescalingCompactionFilterFactory() {
      this(null);
    }

    /**
     * @param logger logger handed to every created filter, or {@code null} for none
     */
    @SuppressWarnings("WeakerAccess")
    public FlinkRescalingCompactionFilterFactory(Logger logger) {
      this.configHolder = new ConfigHolder();
      this.logger = logger;
    }

    /** Closes the factory, its config holder and, if present, the logger. */
    @Override
    public void close() {
      super.close();
      configHolder.close();
      if (logger != null) {
        logger.close();
      }
    }

    @Override
    public FlinkRescalingCompactionFilter createCompactionFilter(Context context) {
      return new FlinkRescalingCompactionFilter(configHolder, logger);
    }

    @Override
    public String name() {
      return "FlinkRescalingCompactionFilterFactory";
    }

    /**
     * Installs the configuration into the shared native holder.
     *
     * @param config configuration to install
     * @throws IllegalStateException if the native side rejects the configuration,
     *     which it does when a configuration was already installed
     */
    @SuppressWarnings("WeakerAccess")
    public void configure(Config config) {
      boolean alreadyConfigured =
          !configureFlinkRescalingCompactionFilter(configHolder.nativeHandle_,
              config.rescaleRound.ordinal(),
              config.smallestKey, config.smallestKey.length,
              config.largestKey, config.largestKey.length);
      if (alreadyConfigured) {
        throw new IllegalStateException("Compaction filter is already configured");
      }
    }
  }
}
\ No newline at end of file
......@@ -211,6 +211,7 @@ LIB_SOURCES = \
utilities/env_mirror.cc \
utilities/env_timed.cc \
utilities/flink/flink_compaction_filter.cc \
utilities/flink/flink_rescaling_compaction_filter.cc \
utilities/leveldb_options/leveldb_options.cc \
utilities/memory/memory_util.cc \
utilities/merge_operators/max.cc \
......@@ -466,6 +467,7 @@ MAIN_SOURCES = \
utilities/cassandra/cassandra_serialize_test.cc \
utilities/checkpoint/checkpoint_test.cc \
utilities/flink/flink_compaction_filter_test.cc \
utilities/flink/flink_rescaling_compaction_filter_test.cc \
utilities/memory/memory_test.cc \
utilities/merge_operators/string_append/stringappend_test.cc \
utilities/object_registry_test.cc \
......@@ -504,6 +506,7 @@ JNI_NATIVE_SOURCES = \
java/rocksjni/env.cc \
java/rocksjni/env_options.cc \
java/rocksjni/flink_compactionfilterjni.cc \
java/rocksjni/flink_rescalingcompactionfilterjni.cc \
java/rocksjni/ingest_external_file_options.cc \
java/rocksjni/filter.cc \
java/rocksjni/iterator.cc \
......
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "utilities/flink/flink_rescaling_compaction_filter.h"
namespace ROCKSDB_NAMESPACE {
namespace flink {
// Three-way comparison of two slices; simply forwards to Slice::compare and
// returns its result unchanged.
int compare(const Slice& a, const Slice& b) {
  const int order = a.compare(b);
  return order;
}
// A new holder starts out pointing at the shared "disabled" sentinel config.
// The const_cast only satisfies the atomic's pointer type.
FlinkRescalingCompactionFilter::ConfigHolder::ConfigHolder()
    : config_(const_cast<Config*>(&DISABLED_RESCALING_CONFIG)) {}
// Frees the installed config, if any. The "disabled" sentinel is a static
// object and must never be deleted.
FlinkRescalingCompactionFilter::ConfigHolder::~ConfigHolder() {
  Config* const owned = config_.load();
  if (owned == &DISABLED_RESCALING_CONFIG) {
    return;
  }
  delete owned;
}
// Installs `config` if, and only if, the holder still points at the
// "disabled" sentinel. On success the holder takes ownership of `config`
// (it is deleted in the destructor) and true is returned. On failure the
// holder is left untouched, the caller keeps ownership, and false is
// returned. A null `config` is rejected.
//
// The check and the store happen as one atomic compare-and-swap, so when
// several threads race to configure the holder exactly one of them wins.
bool FlinkRescalingCompactionFilter::ConfigHolder::Configure(Config* config) {
  if (config == nullptr) {
    // Installing null would make every later dereference of the config crash.
    return false;
  }
  Config* expected = const_cast<Config*>(&DISABLED_RESCALING_CONFIG);
  return config_.compare_exchange_strong(expected, config);
}
// Returns the config currently held: either the "disabled" sentinel or the
// config installed through Configure().
FlinkRescalingCompactionFilter::Config*
FlinkRescalingCompactionFilter::ConfigHolder::GetConfig() {
  Config* const current = config_.load();
  return current;
}
// Convenience constructor: a filter without a logger. Delegates to the
// two-argument constructor with an empty logger pointer.
FlinkRescalingCompactionFilter::FlinkRescalingCompactionFilter(
    std::shared_ptr<ConfigHolder> config_holder)
    : FlinkRescalingCompactionFilter(std::move(config_holder),
                                     std::shared_ptr<Logger>()) {}
// Main constructor. Takes shared ownership of the config holder and of the
// (possibly empty) logger. The cached config starts at the "disabled"
// sentinel and is refreshed from the holder by InitConfigIfNotYet().
FlinkRescalingCompactionFilter::FlinkRescalingCompactionFilter(
    std::shared_ptr<ConfigHolder> config_holder, std::shared_ptr<Logger> logger)
    : config_holder_(std::move(config_holder)), logger_(std::move(logger)) {
  config_cached_ = const_cast<Config*>(&DISABLED_RESCALING_CONFIG);
}
// Refreshes the cached config from the holder while the cache still points at
// the "disabled" sentinel; once a real config has been cached it is kept.
// The method is const because it is called from FilterV2(), hence the
// const_cast to update the cache.
inline void FlinkRescalingCompactionFilter::InitConfigIfNotYet() const {
  if (config_cached_ != &DISABLED_RESCALING_CONFIG) {
    return;  // a real config is already cached
  }
  auto* self = const_cast<FlinkRescalingCompactionFilter*>(this);
  self->config_cached_ = config_holder_->GetConfig();
}
// Returns the fixed, human-readable name of this compaction filter.
const char* FlinkRescalingCompactionFilter::Name() const {
  static const char kFilterName[] = "FlinkRescalingCompactionFilter";
  return kFilterName;
}
// Decides the fate of one key/value pair during compaction.
//
// The first byte of the value is treated as the rescale round the entry was
// written in, and is compared with the configured round:
//   * filter disabled (not configured yet) or empty value -> kKeep
//   * marker byte equals the configured round             -> kKeep
//   * otherwise, key inside [smallest_key_, largest_key_] -> kChangeValue,
//     with *new_value being a copy of the value whose first byte is rewritten
//     to the configured round
//   * otherwise (key outside the range)                   -> kRemove
//
// `level`, `value_type` and `skip_until` are not used.
CompactionFilter::Decision FlinkRescalingCompactionFilter::FilterV2(
    int /* level */, const Slice& key, ValueType /* value_type */,
    const Slice& existing_value, std::string* new_value,
    std::string* /*skip_until*/) const {
  InitConfigIfNotYet();
  const RescaleRound rescale_round = config_cached_->rescale_round_;

  // A disabled filter must not touch any data. Without this guard the empty
  // key bounds of the disabled config would cause almost every entry to be
  // removed. An empty value carries no marker byte to inspect, so keep it.
  if (rescale_round == RescaleRound::Disabled || existing_value.empty()) {
    return Decision::kKeep;
  }

  const char key_rescale_round = existing_value.data()[0];
  // Argument order matches the format string: marker byte first, then the
  // configured round.
  Debug(logger_.get(),
        "Call FlinkRescalingCompactionFilter::FilterV2 - Key %s, rescale byte %d, "
        "Rescaling round: %d, smallest key: %s, largest key: %s",
        key.ToString().c_str(), static_cast<int>(key_rescale_round),
        static_cast<int>(rescale_round),
        config_cached_->smallest_key_.ToString().c_str(),
        config_cached_->largest_key_.ToString().c_str());

  if (key_rescale_round == rescale_round) {
    return Decision::kKeep;
  }

  // Key outside the inclusive range [smallest, largest]: drop the entry.
  const bool in_range = compare(key, config_cached_->smallest_key_) >= 0 &&
                        compare(key, config_cached_->largest_key_) <= 0;
  if (!in_range) {
    return Decision::kRemove;
  }

  // Key in range: keep the payload but stamp it with the configured round.
  new_value->assign(existing_value.data(), existing_value.size());
  (*new_value)[0] = static_cast<char>(rescale_round);
  Logger* logger = logger_.get();
  if (logger && logger->GetInfoLogLevel() <= InfoLogLevel::DEBUG_LEVEL) {
    Debug(logger, "New value: %s", new_value->c_str());
  }
  return Decision::kChangeValue;
}
} // namespace flink
} // namespace ROCKSDB_NAMESPACE
\ No newline at end of file
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <include/rocksdb/env.h>
#include <atomic>
#include <chrono>
#include <functional>
#include <string>
#include <utility>
#include "rocksdb/compaction_filter.h"
#include "rocksdb/slice.h"
namespace ROCKSDB_NAMESPACE {
namespace flink {
// Size and range constants mirroring Java's primitive limits.
// NOTE(review): none of these is referenced by the code visible in this
// change; confirm whether they are still needed.
static constexpr std::size_t BITS_PER_BYTE = 8;
// NOTE(review): presumably the number of bits used for the rescale marker;
// confirm against the value layout.
static constexpr std::size_t RESCALE_BIT_SIZE = 1;
// Largest / smallest value of a Java `long` (signed 64-bit).
static constexpr int64_t JAVA_MAX_LONG = 0x7fffffffffffffffLL;
static constexpr int64_t JAVA_MIN_LONG = -JAVA_MAX_LONG - 1;
// Largest value of a Java `int`, expressed as a size.
static constexpr std::size_t JAVA_MAX_SIZE = 0x7fffffff;
// Compaction filter used for Flink state rescaling. FilterV2() compares the
// first byte of every value with the configured rescale round and, depending
// on that byte and on whether the key lies inside the configured key range,
// keeps the entry, rewrites its first byte, or removes it.
class FlinkRescalingCompactionFilter : public CompactionFilter {
public:
// Rescale round marker. The enumerator order is significant: the Java
// wrapper passes RescaleRound.ordinal() through JNI and the native side
// static_casts that integer back to this enum.
enum RescaleRound {
Disabled,
Zero,
One
};
// Filter configuration: the active rescale round and the inclusive key range
// [smallest_key_, largest_key_].
// NOTE(review): Slice is presumably a non-owning view, so the memory behind
// both keys has to outlive the Config - confirm against rocksdb/slice.h.
struct Config {
RescaleRound rescale_round_;
Slice smallest_key_;
Slice largest_key_;
};
// Holds the configuration shared by all filters created from one factory.
class ConfigHolder {
public:
// Starts out pointing at DISABLED_RESCALING_CONFIG.
explicit ConfigHolder();
// Deletes the held config unless it is DISABLED_RESCALING_CONFIG.
~ConfigHolder();
// Stores `config` only if the holder still points at
// DISABLED_RESCALING_CONFIG; returns whether it was stored.
bool Configure(Config* config);
// Returns the currently held config pointer.
Config* GetConfig();
private:
std::atomic<Config*> config_;
};
// Creates a filter without a logger.
explicit FlinkRescalingCompactionFilter(std::shared_ptr<ConfigHolder> config_holder);
// Creates a filter that logs through `logger` (which may be empty).
explicit FlinkRescalingCompactionFilter(std::shared_ptr<ConfigHolder> config_holder,
std::shared_ptr<Logger> logger);
// Returns the fixed name "FlinkRescalingCompactionFilter".
const char* Name() const override;
// Per-entry decision; `level`, `value_type` and `skip_until` are unused by
// the implementation.
Decision FilterV2(int level, const Slice& key, ValueType value_type,
const Slice& existing_value, std::string* new_value,
std::string* skip_until) const override;
// Always returns true.
bool IgnoreSnapshots() const override { return true; }
private:
// Refreshes config_cached_ from config_holder_ while the cache still points
// at DISABLED_RESCALING_CONFIG.
inline void InitConfigIfNotYet() const;
std::shared_ptr<ConfigHolder> config_holder_;
std::shared_ptr<Logger> logger_;
// Config last read from config_holder_; initialised to the disabled config.
Config* config_cached_;
};
// Sentinel configuration meaning "not configured": round Disabled with two
// empty key slices. Code identifies it by address, not by value.
// NOTE: being `static` in a header, every translation unit that includes this
// file gets its own copy, so address comparisons are only meaningful within
// one translation unit.
static const FlinkRescalingCompactionFilter::Config DISABLED_RESCALING_CONFIG{
    FlinkRescalingCompactionFilter::RescaleRound::Disabled, Slice(), Slice()};
} // namespace flink
} // namespace ROCKSDB_NAMESPACE
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "utilities/flink/flink_rescaling_compaction_filter.h"
#include <random>
#include "test_util/testharness.h"
namespace ROCKSDB_NAMESPACE {
namespace flink {
// Shorthand for the rescale rounds used throughout the tests.
#define DISABLE FlinkRescalingCompactionFilter::RescaleRound::Disabled
#define ZERO FlinkRescalingCompactionFilter::RescaleRound::Zero
#define ONE FlinkRescalingCompactionFilter::RescaleRound::One
// Shorthand for the compaction-filter value types.
#define KVALUE CompactionFilter::ValueType::kValue
#define KMERGE CompactionFilter::ValueType::kMergeOperand
#define KBLOB CompactionFilter::ValueType::kBlobIndex
// Shorthand for the compaction-filter decisions.
#define KKEEP CompactionFilter::Decision::kKeep
#define KREMOVE CompactionFilter::Decision::kRemove
#define KCHANGE CompactionFilter::Decision::kChangeValue
// Test logger that prints every message to stdout, one message per line,
// with the log level fixed at DEBUG so that all filter output is visible.
class ConsoleLogger : public Logger {
 public:
  ConsoleLogger() : Logger(InfoLogLevel::DEBUG_LEVEL) {}

  // Keep the other Logv overloads of the base class visible.
  using Logger::Logv;

  void Logv(const char* format, va_list ap) override {
    vprintf(format, ap);
    putchar('\n');
  }
};
// Mutable fixture state shared by all tests; populated by Init().
// Key handed to FilterV2() by decide().
Slice key; // NOLINT
// Backing buffer of the value handed to FilterV2(); Init() writes the value's
// round marker into data[0].
char data[24];
// Receives the `new_value` output of FilterV2().
std::string new_list = ""; // NOLINT
// Receives the `skip_until` output of FilterV2().
std::string stub = ""; // NOLINT
// Round and inclusive key range the filter under test is configured with.
FlinkRescalingCompactionFilter::RescaleRound rescale_round;
Slice largest_key;
Slice smallest_key;
// Value type handed to FilterV2().
CompactionFilter::ValueType value_type;
// Filter under test; allocated by Init() and freed by Deinit().
FlinkRescalingCompactionFilter* filter; // NOLINT
// Runs the filter under test on the current fixture state (key, value_type
// and the value stored in `data`) and returns its decision.
CompactionFilter::Decision decide() {
  const Slice value(data);
  return filter->FilterV2(/*level=*/0, key, value_type, value, &new_list,
                          &stub);
}
// Prepares the fixture for one test case.
//   rround - round the filter is configured with
//   vround - round marker written into the first byte of the test value
//   skey / lkey - inclusive key range the filter is configured with
//   vkey - key of the entry under test (defaults to "key")
// Creates a new filter with a console logger, configures it, and checks that
// a second Configure() call on the same holder is rejected.
void Init(
    FlinkRescalingCompactionFilter::RescaleRound rround,
    FlinkRescalingCompactionFilter::RescaleRound vround,
    const Slice& skey,
    const Slice& lkey,
    const Slice& vkey = Slice("key")) {
  using Filter = FlinkRescalingCompactionFilter;

  // Entry under test.
  key = vkey;
  data[0] = vround;
  value_type = CompactionFilter::ValueType::kValue;

  // Filter configuration.
  rescale_round = rround;
  smallest_key = skey;
  largest_key = lkey;

  const auto holder = std::make_shared<Filter::ConfigHolder>();
  filter = new Filter(holder, std::make_shared<ConsoleLogger>());

  auto* cfg = new Filter::Config{rescale_round, smallest_key, largest_key};
  EXPECT_TRUE(holder->Configure(cfg));
  // A holder accepts a configuration only once.
  EXPECT_FALSE(holder->Configure(cfg));
}
// Destroys the filter created by Init(). The global pointer is reset so that
// it never dangles and a repeated Deinit() is a harmless no-op instead of a
// double delete.
void Deinit() {
  delete filter;
  filter = nullptr;
}
// Pins the numeric values of RescaleRound. The Java wrapper passes
// RescaleRound.ordinal() through JNI and the native code casts that integer
// back to the enum, so the order of the enumerators must not change.
TEST(FlinkStateRescaleTest, CheckRescaleRoundEnumOrder) {
EXPECT_EQ(DISABLE, 0);
EXPECT_EQ(ZERO, 1);
EXPECT_EQ(ONE, 2);
}
TEST(FlinkStateRescaleTest, CurrentRound) {