提交 a64e31a3 编辑于 作者: azagrebin's avatar azagrebin 提交者: Yun Tang
浏览文件

[FLINK-10471] Add Apache Flink specific compaction filter to evict expired...

[FLINK-10471] Add Apache Flink specific compaction filter to evict expired state which has time-to-live
上级 878b3a41
...@@ -741,6 +741,7 @@ set(SOURCES ...@@ -741,6 +741,7 @@ set(SOURCES
utilities/debug.cc utilities/debug.cc
utilities/env_mirror.cc utilities/env_mirror.cc
utilities/env_timed.cc utilities/env_timed.cc
utilities/flink/flink_compaction_filter.cc
utilities/leveldb_options/leveldb_options.cc utilities/leveldb_options/leveldb_options.cc
utilities/memory/memory_util.cc utilities/memory/memory_util.cc
utilities/merge_operators/bytesxor.cc utilities/merge_operators/bytesxor.cc
...@@ -1122,6 +1123,7 @@ if(WITH_TESTS) ...@@ -1122,6 +1123,7 @@ if(WITH_TESTS)
utilities/cassandra/cassandra_format_test.cc utilities/cassandra/cassandra_format_test.cc
utilities/cassandra/cassandra_row_merge_test.cc utilities/cassandra/cassandra_row_merge_test.cc
utilities/cassandra/cassandra_serialize_test.cc utilities/cassandra/cassandra_serialize_test.cc
utilities/flink/flink_compaction_filter_test.cc
utilities/checkpoint/checkpoint_test.cc utilities/checkpoint/checkpoint_test.cc
utilities/memory/memory_test.cc utilities/memory/memory_test.cc
utilities/merge_operators/string_append/stringappend_test.cc utilities/merge_operators/string_append/stringappend_test.cc
......
...@@ -585,6 +585,7 @@ TESTS = \ ...@@ -585,6 +585,7 @@ TESTS = \
compaction_picker_test \ compaction_picker_test \
version_builder_test \ version_builder_test \
file_indexer_test \ file_indexer_test \
flink_compaction_filter_test \
write_batch_test \ write_batch_test \
write_batch_with_index_test \ write_batch_with_index_test \
write_controller_test\ write_controller_test\
...@@ -1407,6 +1408,9 @@ cassandra_serialize_test: utilities/cassandra/cassandra_serialize_test.o $(LIBOB ...@@ -1407,6 +1408,9 @@ cassandra_serialize_test: utilities/cassandra/cassandra_serialize_test.o $(LIBOB
hash_table_test: utilities/persistent_cache/hash_table_test.o $(LIBOBJECTS) $(TESTHARNESS) hash_table_test: utilities/persistent_cache/hash_table_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK) $(AM_LINK)
flink_compaction_filter_test: utilities/flink/flink_compaction_filter_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
histogram_test: monitoring/histogram_test.o $(LIBOBJECTS) $(TESTHARNESS) histogram_test: monitoring/histogram_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK) $(AM_LINK)
......
...@@ -330,6 +330,7 @@ cpp_library( ...@@ -330,6 +330,7 @@ cpp_library(
"utilities/debug.cc", "utilities/debug.cc",
"utilities/env_mirror.cc", "utilities/env_mirror.cc",
"utilities/env_timed.cc", "utilities/env_timed.cc",
"utilities/flink/flink_compaction_filter.cc",
"utilities/leveldb_options/leveldb_options.cc", "utilities/leveldb_options/leveldb_options.cc",
"utilities/memory/memory_util.cc", "utilities/memory/memory_util.cc",
"utilities/merge_operators/bytesxor.cc", "utilities/merge_operators/bytesxor.cc",
...@@ -1106,6 +1107,13 @@ ROCKS_TESTS = [ ...@@ -1106,6 +1107,13 @@ ROCKS_TESTS = [
[], [],
[], [],
], ],
[
"flink_compaction_filter_test",
"utilities/flink/flink_compaction_filter_test.cc",
"serial",
[],
[],
],
[ [
"flush_job_test", "flush_job_test",
"db/flush_job_test.cc", "db/flush_job_test.cc",
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
Vagrant.configure("2") do |config| Vagrant.configure("2") do |config|
config.vm.provider "virtualbox" do |v| config.vm.provider "virtualbox" do |v|
v.memory = 4096 v.memory = 6096
v.cpus = 2 v.cpus = 2
end end
......
...@@ -29,6 +29,7 @@ set(JNI_NATIVE_SOURCES ...@@ -29,6 +29,7 @@ set(JNI_NATIVE_SOURCES
rocksjni/config_options.cc rocksjni/config_options.cc
rocksjni/env.cc rocksjni/env.cc
rocksjni/env_options.cc rocksjni/env_options.cc
rocksjni/flink_compactionfilterjni.cc
rocksjni/filter.cc rocksjni/filter.cc
rocksjni/ingest_external_file_options.cc rocksjni/ingest_external_file_options.cc
rocksjni/iterator.cc rocksjni/iterator.cc
...@@ -140,6 +141,7 @@ set(JAVA_MAIN_CLASSES ...@@ -140,6 +141,7 @@ set(JAVA_MAIN_CLASSES
src/main/java/org/rocksdb/EnvOptions.java src/main/java/org/rocksdb/EnvOptions.java
src/main/java/org/rocksdb/Experimental.java src/main/java/org/rocksdb/Experimental.java
src/main/java/org/rocksdb/Filter.java src/main/java/org/rocksdb/Filter.java
src/main/java/org/rocksdb/FlinkCompactionFilter.java
src/main/java/org/rocksdb/FlushOptions.java src/main/java/org/rocksdb/FlushOptions.java
src/main/java/org/rocksdb/HashLinkedListMemTableConfig.java src/main/java/org/rocksdb/HashLinkedListMemTableConfig.java
src/main/java/org/rocksdb/HashSkipListMemTableConfig.java src/main/java/org/rocksdb/HashSkipListMemTableConfig.java
...@@ -415,6 +417,7 @@ if(${CMAKE_VERSION} VERSION_LESS "3.11.4" OR (${Java_VERSION_MINOR} STREQUAL "7" ...@@ -415,6 +417,7 @@ if(${CMAKE_VERSION} VERSION_LESS "3.11.4" OR (${Java_VERSION_MINOR} STREQUAL "7"
org.rocksdb.Env org.rocksdb.Env
org.rocksdb.EnvOptions org.rocksdb.EnvOptions
org.rocksdb.Filter org.rocksdb.Filter
org.rocksdb.FlinkCompactionFilter
org.rocksdb.FlushOptions org.rocksdb.FlushOptions
org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashLinkedListMemTableConfig
org.rocksdb.HashSkipListMemTableConfig org.rocksdb.HashSkipListMemTableConfig
......
...@@ -30,6 +30,7 @@ NATIVE_JAVA_CLASSES = \ ...@@ -30,6 +30,7 @@ NATIVE_JAVA_CLASSES = \
org.rocksdb.DirectSlice\ org.rocksdb.DirectSlice\
org.rocksdb.Env\ org.rocksdb.Env\
org.rocksdb.EnvOptions\ org.rocksdb.EnvOptions\
org.rocksdb.FlinkCompactionFilter\
org.rocksdb.FlushOptions\ org.rocksdb.FlushOptions\
org.rocksdb.Filter\ org.rocksdb.Filter\
org.rocksdb.IngestExternalFileOptions\ org.rocksdb.IngestExternalFileOptions\
......
...@@ -33,7 +33,7 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| ...@@ -33,7 +33,7 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
end end
config.vm.provider "virtualbox" do |v| config.vm.provider "virtualbox" do |v|
v.memory = 2048 v.memory = 6048
v.cpus = 4 v.cpus = 4
v.customize ["modifyvm", :id, "--nictype1", "virtio" ] v.customize ["modifyvm", :id, "--nictype1", "virtio" ]
end end
......
#include <climits> // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include <include/rocksdb/env.h>
#include <jni.h>
#include "include/org_rocksdb_FlinkCompactionFilter.h"
#include "loggerjnicallback.h"
#include "portal.h"
#include "rocksjni/jnicallback.h"
#include "utilities/flink/flink_compaction_filter.h"
using namespace ROCKSDB_NAMESPACE::flink;
// Common base for the JNI callback wrappers in this file. It forwards the
// Java callback object to ROCKSDB_NAMESPACE::JniCallback and adds a helper
// for surfacing Java exceptions raised during an up-call.
class JniCallbackBase : public ROCKSDB_NAMESPACE::JniCallback {
 public:
  JniCallbackBase(JNIEnv* env, jobject jcallback_obj)
      : JniCallback(env, jcallback_obj) {}

 protected:
  // If a Java exception is pending on `env`, prints it to the error channel
  // and re-raises it so that it is still pending when this helper returns.
  inline void CheckAndRethrowException(JNIEnv* env) const {
    if (env->ExceptionCheck()) {
      // Capture the throwable BEFORE describing it: ExceptionDescribe clears
      // the pending exception as a side effect, so calling ExceptionOccurred
      // afterwards would yield null and Throw(null) would be invalid.
      jthrowable pending = env->ExceptionOccurred();
      env->ExceptionDescribe();
      if (pending != nullptr) {
        env->Throw(pending);
        // Throw does not consume the local reference; release it explicitly.
        env->DeleteLocalRef(pending);
      }
    }
  }
};
// This list element filter operates on list state for which byte length of
// elements is unknown (variable), the list element serializer has to be used in
// this case to compute the offset of the next element. The filter wraps java
// object implemented in Flink. The java object holds element serializer and
// performs filtering.
class JavaListElementFilter
    : public ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::ListElementFilter,
      JniCallbackBase {
 public:
  // Wraps the Java `FlinkCompactionFilter.ListElementFilter` object and looks
  // up its `nextUnexpiredOffset(byte[], long, long)` method.
  JavaListElementFilter(JNIEnv* env, jobject jlist_filter)
      : JniCallbackBase(env, jlist_filter),
        // Initialised up front so the member is never indeterminate when the
        // class lookup below fails and the constructor returns early.
        m_jnext_unexpired_offset_methodid(nullptr) {
    jclass jclazz = ROCKSDB_NAMESPACE::JavaClass::getJClass(
        env, "org/rocksdb/FlinkCompactionFilter$ListElementFilter");
    if (jclazz == nullptr) {
      // exception occurred accessing class
      return;
    }
    m_jnext_unexpired_offset_methodid =
        env->GetMethodID(jclazz, "nextUnexpiredOffset", "([BJJ)I");
    assert(m_jnext_unexpired_offset_methodid != nullptr);
  }

  // Copies `list` into a Java byte array and asks the Java filter for the
  // offset of the first unexpired element.
  //
  // Returns the offset reported by Java, or static_cast<std::size_t>(-1) when
  // the list could not be copied into a Java array.
  std::size_t NextUnexpiredOffset(const ROCKSDB_NAMESPACE::Slice& list,
                                  int64_t ttl,
                                  int64_t current_timestamp) const override {
    jboolean attached_thread = JNI_FALSE;
    JNIEnv* env = getJniEnv(&attached_thread);
    jbyteArray jlist = ROCKSDB_NAMESPACE::JniUtil::copyBytes(env, list);
    CheckAndRethrowException(env);
    if (jlist == nullptr) {
      // Pair every getJniEnv with releaseJniEnv, on the failure path too.
      releaseJniEnv(attached_thread);
      return static_cast<std::size_t>(-1);
    }
    auto jl_ttl = static_cast<jlong>(ttl);
    auto jl_current_timestamp = static_cast<jlong>(current_timestamp);
    jint next_offset =
        env->CallIntMethod(m_jcallback_obj, m_jnext_unexpired_offset_methodid,
                           jlist, jl_ttl, jl_current_timestamp);
    CheckAndRethrowException(env);
    env->DeleteLocalRef(jlist);
    releaseJniEnv(attached_thread);
    return static_cast<std::size_t>(next_offset);
  }

 private:
  // Method ID of ListElementFilter#nextUnexpiredOffset; nullptr if the lookup
  // in the constructor did not complete.
  jmethodID m_jnext_unexpired_offset_methodid;
};
// Native factory that delegates creation of list element filters to a Java
// `FlinkCompactionFilter.ListElementFilterFactory` object.
class JavaListElemenFilterFactory
    : public ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::
          ListElementFilterFactory,
      JniCallbackBase {
 public:
  // Wraps the Java factory object and looks up its
  // `createListElementFilter()` method.
  JavaListElemenFilterFactory(JNIEnv* env, jobject jlist_filter_factory)
      : JniCallbackBase(env, jlist_filter_factory),
        // Initialised up front so the member is never indeterminate when the
        // class lookup below fails and the constructor returns early.
        m_jcreate_filter_methodid(nullptr) {
    jclass jclazz = ROCKSDB_NAMESPACE::JavaClass::getJClass(
        env, "org/rocksdb/FlinkCompactionFilter$ListElementFilterFactory");
    if (jclazz == nullptr) {
      // exception occurred accessing class
      return;
    }
    m_jcreate_filter_methodid = env->GetMethodID(
        jclazz, "createListElementFilter",
        "()Lorg/rocksdb/FlinkCompactionFilter$ListElementFilter;");
    assert(m_jcreate_filter_methodid != nullptr);
  }

  // Calls the Java factory and wraps the returned Java filter in a
  // JavaListElementFilter.
  //
  // Returns a newly allocated filter, or nullptr when the Java call threw or
  // returned null.
  // NOTE(review): callers are not visible here — confirm they tolerate a
  // nullptr result.
  FlinkCompactionFilter::ListElementFilter* CreateListElementFilter(
      std::shared_ptr<ROCKSDB_NAMESPACE::Logger> /*logger*/) const override {
    jboolean attached_thread = JNI_FALSE;
    JNIEnv* env = getJniEnv(&attached_thread);
    jobject jlist_filter =
        env->CallObjectMethod(m_jcallback_obj, m_jcreate_filter_methodid);
    // Inspect the outcome of the up-call before issuing any further JNI calls:
    // the wrapper's constructor performs class/method lookups, which must not
    // run while an exception is pending.
    const bool failed =
        env->ExceptionCheck() == JNI_TRUE || jlist_filter == nullptr;
    CheckAndRethrowException(env);
    FlinkCompactionFilter::ListElementFilter* list_filter = nullptr;
    if (!failed) {
      list_filter = new JavaListElementFilter(env, jlist_filter);
    }
    releaseJniEnv(attached_thread);
    return list_filter;
  }

 private:
  // Method ID of ListElementFilterFactory#createListElementFilter; nullptr if
  // the lookup in the constructor did not complete.
  jmethodID m_jcreate_filter_methodid;
};
// Native time provider that obtains the current timestamp from a Java
// `FlinkCompactionFilter.TimeProvider` object.
class JavaTimeProvider
    : public ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::TimeProvider,
      JniCallbackBase {
 public:
  // Wraps the Java time provider and looks up its `currentTimestamp()` method.
  JavaTimeProvider(JNIEnv* env, jobject jtime_provider)
      : JniCallbackBase(env, jtime_provider),
        // Initialised up front so the member is never indeterminate when the
        // class lookup below fails and the constructor returns early.
        m_jcurrent_timestamp_methodid(nullptr) {
    jclass jclazz = ROCKSDB_NAMESPACE::JavaClass::getJClass(
        env, "org/rocksdb/FlinkCompactionFilter$TimeProvider");
    if (jclazz == nullptr) {
      // exception occurred accessing class
      return;
    }
    m_jcurrent_timestamp_methodid =
        env->GetMethodID(jclazz, "currentTimestamp", "()J");
    assert(m_jcurrent_timestamp_methodid != nullptr);
  }

  // Calls TimeProvider#currentTimestamp on the wrapped Java object and
  // returns its result.
  int64_t CurrentTimestamp() const override {
    jboolean attached_thread = JNI_FALSE;
    JNIEnv* env = getJniEnv(&attached_thread);
    auto jtimestamp =
        env->CallLongMethod(m_jcallback_obj, m_jcurrent_timestamp_methodid);
    CheckAndRethrowException(env);
    releaseJniEnv(attached_thread);
    return static_cast<int64_t>(jtimestamp);
  }

 private:
  // Method ID of TimeProvider#currentTimestamp; nullptr if the lookup in the
  // constructor did not complete.
  jmethodID m_jcurrent_timestamp_methodid;
};
// Chooses the list element filter factory for a configuration:
//  - a positive `ji_list_elem_len` selects the fixed-length factory;
//  - otherwise a non-null `jlist_filter_factory` is wrapped in the Java-backed
//    factory;
//  - otherwise nullptr is returned.
// The returned object is heap-allocated with `new`.
static FlinkCompactionFilter::ListElementFilterFactory*
createListElementFilterFactory(JNIEnv* env, jint ji_list_elem_len,
                               jobject jlist_filter_factory) {
  if (ji_list_elem_len > 0) {
    const auto element_size = static_cast<std::size_t>(ji_list_elem_len);
    return new FlinkCompactionFilter::FixedListElementFilterFactory(
        element_size, static_cast<std::size_t>(0));
  }
  if (jlist_filter_factory != nullptr) {
    return new JavaListElemenFilterFactory(env, jlist_filter_factory);
  }
  return nullptr;
}
/*
* Class: org_rocksdb_FlinkCompactionFilter
* Method: createNewFlinkCompactionFilterConfigHolder
* Signature: ()J
*/
// Allocates a heap-held shared_ptr to a fresh ConfigHolder and returns the
// address of that shared_ptr as an opaque handle for the Java side.
jlong Java_org_rocksdb_FlinkCompactionFilter_createNewFlinkCompactionFilterConfigHolder(
    JNIEnv* /* env */, jclass /* jcls */) {
  using ConfigHolder =
      ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::ConfigHolder;
  auto* holder_handle = new std::shared_ptr<ConfigHolder>(new ConfigHolder());
  return reinterpret_cast<jlong>(holder_handle);
}
/*
* Class: org_rocksdb_FlinkCompactionFilter
* Method: disposeFlinkCompactionFilterConfigHolder
* Signature: (J)V
*/
// Deletes the heap-held shared_ptr<ConfigHolder> that `handle` points to.
void Java_org_rocksdb_FlinkCompactionFilter_disposeFlinkCompactionFilterConfigHolder(
    JNIEnv* /* env */, jclass /* jcls */, jlong handle) {
  using ConfigHolderPtr = std::shared_ptr<
      ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::ConfigHolder>;
  delete reinterpret_cast<ConfigHolderPtr*>(handle);
}
/*
* Class: org_rocksdb_FlinkCompactionFilter
* Method: createNewFlinkCompactionFilter0
* Signature: (JLorg/rocksdb/FlinkCompactionFilter$TimeProvider;J)J
*/
// Builds a native FlinkCompactionFilter from the shared config holder, a
// Java-backed time provider and an optional logger, and returns its address
// as an opaque handle. A `logger_handle` of 0 means "no logger".
jlong Java_org_rocksdb_FlinkCompactionFilter_createNewFlinkCompactionFilter0(
    JNIEnv* env, jclass /* jcls */, jlong config_holder_handle,
    jobject jtime_provider, jlong logger_handle) {
  using namespace ROCKSDB_NAMESPACE::flink;
  using ConfigHolderPtr = std::shared_ptr<FlinkCompactionFilter::ConfigHolder>;
  using LoggerPtr = std::shared_ptr<ROCKSDB_NAMESPACE::LoggerJniCallback>;

  // Copy the shared_ptr so the new filter shares ownership of the holder.
  ConfigHolderPtr holder =
      *reinterpret_cast<ConfigHolderPtr*>(config_holder_handle);

  FlinkCompactionFilter::TimeProvider* clock =
      new JavaTimeProvider(env, jtime_provider);

  // Stays empty unless a logger handle was supplied.
  LoggerPtr log;
  if (logger_handle != 0) {
    log = *reinterpret_cast<LoggerPtr*>(logger_handle);
  }

  auto* filter = new FlinkCompactionFilter(
      holder, std::unique_ptr<FlinkCompactionFilter::TimeProvider>(clock),
      log);
  return reinterpret_cast<jlong>(filter);
}
/*
* Class: org_rocksdb_FlinkCompactionFilter
* Method: configureFlinkCompactionFilter
* Signature: (JIIJJILorg/rocksdb/FlinkCompactionFilter$ListElementFilterFactory;)Z
*/
// Translates the Java-side configuration values into a native
// FlinkCompactionFilter::Config and hands it to the ConfigHolder behind
// `handle`. Returns whatever ConfigHolder::Configure reports, cast to
// jboolean.
jboolean Java_org_rocksdb_FlinkCompactionFilter_configureFlinkCompactionFilter(
    JNIEnv* env, jclass /* jcls */, jlong handle, jint ji_state_type,
    jint ji_timestamp_offset, jlong jl_ttl_milli,
    jlong jquery_time_after_num_entries, jint ji_list_elem_len,
    jobject jlist_filter_factory) {
  using ConfigHolderPtr = std::shared_ptr<FlinkCompactionFilter::ConfigHolder>;

  // Copy the shared_ptr stored behind the handle.
  ConfigHolderPtr holder = *reinterpret_cast<ConfigHolderPtr*>(handle);

  // May be nullptr when neither a fixed length nor a Java factory was given.
  FlinkCompactionFilter::ListElementFilterFactory* element_filter_factory =
      createListElementFilterFactory(env, ji_list_elem_len,
                                     jlist_filter_factory);

  // NOTE(review): ownership of `new_config` after Configure() is not visible
  // here — confirm it is not leaked when Configure() returns false.
  auto* new_config = new FlinkCompactionFilter::Config{
      static_cast<FlinkCompactionFilter::StateType>(ji_state_type),
      static_cast<size_t>(ji_timestamp_offset),
      static_cast<int64_t>(jl_ttl_milli),
      static_cast<int64_t>(jquery_time_after_num_entries),
      std::unique_ptr<FlinkCompactionFilter::ListElementFilterFactory>(
          element_filter_factory)};

  const bool configured = holder->Configure(new_config);
  return static_cast<jboolean>(configured);
}
\ No newline at end of file
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
package org.rocksdb;
/**
* Just a Java wrapper around FlinkCompactionFilter implemented in C++.
*
* Note: this compaction filter is a special implementation, designed for usage only in Apache Flink
* project.
*/
/**
 * Java-side handle for the FlinkCompactionFilter that is implemented in C++.
 *
 * <p>Note: this compaction filter is a special implementation, designed for usage only in the
 * Apache Flink project.
 */
public class FlinkCompactionFilter extends AbstractCompactionFilter<Slice> {
  /** Kind of state the filter is configured for. */
  public enum StateType {
    // WARNING!!! The ordinal of each entry is handed to native code, so the order of the entries
    // must not change.
    Disabled,
    Value,
    List
  }

  /**
   * Creates a filter without a logger.
   *
   * @param configHolder holder of the native configuration
   * @param timeProvider source of the current timestamp
   */
  public FlinkCompactionFilter(ConfigHolder configHolder, TimeProvider timeProvider) {
    this(configHolder, timeProvider, null);
  }

  /**
   * Creates a filter.
   *
   * @param configHolder holder of the native configuration
   * @param timeProvider source of the current timestamp
   * @param logger optional logger; {@code null} passes a zero handle to native code
   */
  public FlinkCompactionFilter(
      ConfigHolder configHolder, TimeProvider timeProvider, Logger logger) {
    super(createNewFlinkCompactionFilter0(
        configHolder.nativeHandle_, timeProvider, nativeHandleOrZero(logger)));
  }

  /** Returns the native handle of {@code logger}, or 0 when no logger is given. */
  private static long nativeHandleOrZero(Logger logger) {
    if (logger == null) {
      return 0;
    }
    return logger.nativeHandle_;
  }

  private static native long createNewFlinkCompactionFilter0(
      long configHolderHandle, TimeProvider timeProvider, long loggerHandle);

  private static native long createNewFlinkCompactionFilterConfigHolder();

  private static native void disposeFlinkCompactionFilterConfigHolder(long configHolderHandle);

  private static native boolean configureFlinkCompactionFilter(long configHolderHandle,
      int stateType, int timestampOffset, long ttl, long queryTimeAfterNumEntries,
      int fixedElementLength, ListElementFilterFactory listElementFilterFactory);

  /** Filter for list state whose elements have no fixed byte length. */
  public interface ListElementFilter {
    /**
     * Gets offset of the first unexpired element in the list.
     *
     * <p>Native code wraps this java object and calls it for list state
     * for which element byte length is unknown and Flink custom type serializer has to be used
     * to compute offset of the next element in serialized form.
     *
     * @param list serialised list of elements with timestamp
     * @param ttl time-to-live of the list elements
     * @param currentTimestamp current timestamp to check expiration against
     * @return offset of the first unexpired element in the list
     */
    @SuppressWarnings("unused")
    int nextUnexpiredOffset(byte[] list, long ttl, long currentTimestamp);
  }

  /** Creates {@link ListElementFilter} instances; invoked from native code. */
  public interface ListElementFilterFactory {
    @SuppressWarnings("unused") ListElementFilter createListElementFilter();
  }

  /** Immutable set of values passed to native code by {@code configure}. */
  public static class Config {
    /** Marker used when the state has no fixed element length. */
    private static final int NO_FIXED_ELEMENT_LENGTH = -1;

    final StateType stateType;
    final int timestampOffset;
    final long ttl;
    /**
     * Number of state entries to process by compaction filter before updating current timestamp.
     */
    final long queryTimeAfterNumEntries;
    final int fixedElementLength;
    final ListElementFilterFactory listElementFilterFactory;

    private Config(StateType stateType, int timestampOffset, long ttl,
        long queryTimeAfterNumEntries, int fixedElementLength,
        ListElementFilterFactory listElementFilterFactory) {
      this.stateType = stateType;
      this.timestampOffset = timestampOffset;
      this.ttl = ttl;
      this.queryTimeAfterNumEntries = queryTimeAfterNumEntries;
      this.fixedElementLength = fixedElementLength;
      this.listElementFilterFactory = listElementFilterFactory;
    }

    /** Config for non-list state: no fixed element length and no list element filter factory. */
    @SuppressWarnings("WeakerAccess")
    public static Config createNotList(
        StateType stateType, int timestampOffset, long ttl, long queryTimeAfterNumEntries) {
      return new Config(stateType, timestampOffset, ttl, queryTimeAfterNumEntries,
          NO_FIXED_ELEMENT_LENGTH, null);
    }

    /** Config of type {@link StateType#Value} with timestamp offset 0. */
    @SuppressWarnings("unused")
    public static Config createForValue(long ttl, long queryTimeAfterNumEntries) {
      final int timestampOffset = 0;
      return createNotList(StateType.Value, timestampOffset, ttl, queryTimeAfterNumEntries);
    }

    /** Config of type {@link StateType#Value} with timestamp offset 1. */
    @SuppressWarnings("unused")
    public static Config createForMap(long ttl, long queryTimeAfterNumEntries) {
      final int timestampOffset = 1;
      return createNotList(StateType.Value, timestampOffset, ttl, queryTimeAfterNumEntries);
    }

    /** Config of type {@link StateType#List} whose elements all have the given byte length. */
    @SuppressWarnings("WeakerAccess")
    public static Config createForFixedElementList(
        long ttl, long queryTimeAfterNumEntries, int fixedElementLength) {
      return new Config(StateType.List, 0, ttl, queryTimeAfterNumEntries, fixedElementLength, null);
    }

    /** Config of type {@link StateType#List} that uses the given element filter factory. */
    @SuppressWarnings("WeakerAccess")
    public static Config createForList(long ttl, long queryTimeAfterNumEntries,
        ListElementFilterFactory listElementFilterFactory) {
      return new Config(StateType.List, 0, ttl, queryTimeAfterNumEntries,
          NO_FIXED_ELEMENT_LENGTH, listElementFilterFactory);
    }
  }

  /** Owns the native config holder and disposes of it through the native dispose call. */
  private static class ConfigHolder extends RocksObject {
    ConfigHolder() {
      super(createNewFlinkCompactionFilterConfigHolder());
    }

    @Override
    protected void disposeInternal(long handle) {
      disposeFlinkCompactionFilterConfigHolder(handle);
    }
  }

  /** Provides current timestamp to check expiration, it must be thread safe. */
  public interface TimeProvider { long currentTimestamp(); }

  /**
   * Factory that creates {@link FlinkCompactionFilter} instances which all share one
   * {@code ConfigHolder}, time provider and (optional) logger.
   */
  public static class FlinkCompactionFilterFactory
      extends AbstractCompactionFilterFactory<FlinkCompactionFilter> {
    private final ConfigHolder configHolder;
    private final TimeProvider timeProvider;
    private final Logger logger;

    @SuppressWarnings("unused")
    public FlinkCompactionFilterFactory(TimeProvider timeProvider) {
      this(timeProvider, null);
    }

    @SuppressWarnings("WeakerAccess")
    public FlinkCompactionFilterFactory(TimeProvider timeProvider, Logger logger) {
      this.configHolder = new ConfigHolder();
      this.timeProvider = timeProvider;
      this.logger = logger;
    }

    /** Closes this factory, then its config holder, then the logger if one was supplied. */
    @Override
    public void close() {
      super.close();
      configHolder.close();
      if (logger == null) {
        return;
      }
      logger.close();
    }

    @Override
    public FlinkCompactionFilter createCompactionFilter(Context context) {
      return new FlinkCompactionFilter(configHolder, timeProvider, logger);
    }

    @Override
    public String name() {
      return "FlinkCompactionFilterFactory";
    }

    /**
     * Passes {@code config} to the native config holder.
     *
     * @param config values to apply
     * @throws IllegalStateException if the native call reports that the configuration was not
     *     applied
     */
    @SuppressWarnings("WeakerAccess")
    public void configure(Config config) {
      final boolean applied = configureFlinkCompactionFilter(configHolder.nativeHandle_,
          config.stateType.ordinal(), config.timestampOffset, config.ttl,
          config.queryTimeAfterNumEntries, config.fixedElementLength,
          config.listElementFilterFactory);
      if (!applied) {
        throw new IllegalStateException("Compaction filter is already configured");
      }
    }
  }
}
...@@ -16,7 +16,7 @@ public class FilterTest { ...@@ -16,7 +16,7 @@ public class FilterTest {
@Test @Test
public void filter() { public void filter() {
// new Bloom filter // new Bloom filterFactory
final BlockBasedTableConfig blockConfig = new BlockBasedTableConfig(); final BlockBasedTableConfig blockConfig = new BlockBasedTableConfig();
try(final Options options = new Options()) { try(final Options options = new Options()) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*