提交 18203969 编辑于 作者: rui xia's avatar rui xia
浏览文件

Extract key group to filter keys

上级 e515ea70
...@@ -67,32 +67,18 @@ jlong Java_org_rocksdb_FlinkRescalingCompactionFilter_createNewFlinkRescalingCom ...@@ -67,32 +67,18 @@ jlong Java_org_rocksdb_FlinkRescalingCompactionFilter_createNewFlinkRescalingCom
* Signature: ? * Signature: ?
*/ */
jboolean Java_org_rocksdb_FlinkRescalingCompactionFilter_configureFlinkRescalingCompactionFilter( jboolean Java_org_rocksdb_FlinkRescalingCompactionFilter_configureFlinkRescalingCompactionFilter(
JNIEnv* env, jclass /* jcls */, jlong handle, jint ji_rescale_round, JNIEnv* /* env */, jclass /* jcls */, jlong handle, jint ji_rescale_round,
jbyteArray j_smallest_key, jint j_smallest_key_len, jint j_start_key_range, jint j_end_key_range, jint j_key_group_prefix_bytes) {
jbyteArray j_largest_key, jint j_largest_key_len) {
auto rescale_round = static_cast<FlinkRescalingCompactionFilter::RescaleRound>(ji_rescale_round); auto rescale_round = static_cast<FlinkRescalingCompactionFilter::RescaleRound>(ji_rescale_round);
auto config_holder = auto config_holder =
*(reinterpret_cast<std::shared_ptr<FlinkRescalingCompactionFilter::ConfigHolder>*>( *(reinterpret_cast<std::shared_ptr<FlinkRescalingCompactionFilter::ConfigHolder>*>(
handle)); handle));
jbyte* smallest_key = new jbyte[j_smallest_key_len];
env->GetByteArrayRegion(j_smallest_key, 0, j_smallest_key_len, smallest_key);
if (env->ExceptionCheck()) {
// exception thrown: ArrayIndexOutOfBoundsException
delete[] smallest_key;
return false;
}
jbyte* largest_key = new jbyte[j_largest_key_len]; auto start_key_group = static_cast<uint32_t>(j_start_key_range);
env->GetByteArrayRegion(j_largest_key, 0, j_largest_key_len, largest_key); auto end_key_group = static_cast<uint32_t>(j_end_key_range);
if (env->ExceptionCheck()) { auto key_group_prefix_bytes = static_cast<uint32_t>(j_key_group_prefix_bytes);
// exception thrown: ArrayIndexOutOfBoundsException
delete[] largest_key; auto config = new FlinkRescalingCompactionFilter::Config{
return false; rescale_round, start_key_group, end_key_group, key_group_prefix_bytes};
}
ROCKSDB_NAMESPACE::Slice smallest_key_slice(reinterpret_cast<char*>(smallest_key), j_smallest_key_len);
ROCKSDB_NAMESPACE::Slice largest_key_slice(reinterpret_cast<char*>(largest_key), j_largest_key_len);
auto config = new FlinkRescalingCompactionFilter::Config{rescale_round, smallest_key_slice, largest_key_slice};
return static_cast<jboolean>(config_holder->Configure(config)); return static_cast<jboolean>(config_holder->Configure(config));
} }
\ No newline at end of file
...@@ -31,27 +31,28 @@ public class FlinkRescalingCompactionFilter extends AbstractCompactionFilter<Sli ...@@ -31,27 +31,28 @@ public class FlinkRescalingCompactionFilter extends AbstractCompactionFilter<Sli
private native static long createNewFlinkRescalingCompactionConfigHolder(); private native static long createNewFlinkRescalingCompactionConfigHolder();
private native static void disposeFlinkRescalingCompactionFilterConfigHolder(long configHolderHandle); private native static void disposeFlinkRescalingCompactionFilterConfigHolder(long configHolderHandle);
private native static boolean configureFlinkRescalingCompactionFilter(long configHolderHandle, private native static boolean configureFlinkRescalingCompactionFilter(long configHolderHandle,
int rescaleRound, byte[] smallestKey, int smallestKeyLen, int rescaleRound, int startKeyGroup, int endKeyGroup, int keyGroupPrefixBytes);
byte[] largestKey, int largestKeyLen);
public static class Config { public static class Config {
final RescaleRound rescaleRound; final RescaleRound rescaleRound;
final byte[] smallestKey; final int startKeyGroup;
final byte[] largestKey; final int endKeyGroup;
final int keyGroupPrefixBytes;
private Config(RescaleRound rescaleRound, byte[] smallestKey, byte[] largestKey) { private Config(RescaleRound rescaleRound, int startKeyGroup, int endKeyGroup, int keyGroupPrefixBytes) {
this.rescaleRound = rescaleRound; this.rescaleRound = rescaleRound;
this.smallestKey = smallestKey; this.startKeyGroup = startKeyGroup;
this.largestKey = largestKey; this.endKeyGroup = endKeyGroup;
this.keyGroupPrefixBytes = keyGroupPrefixBytes;
} }
@SuppressWarnings("WeakerAccess") @SuppressWarnings("WeakerAccess")
public static Config createForZero(byte[] smallestKey, byte[] largestKey) { public static Config createForZero(int startKeyGroup, int endKeyGroup, int keyGroupPrefixBytes) {
return new Config(RescaleRound.Zero, smallestKey, largestKey); return new Config(RescaleRound.Zero, startKeyGroup, endKeyGroup, keyGroupPrefixBytes);
} }
public static Config createForOne(byte[] smallestKey, byte[] largestKey) { public static Config createForOne(int startKeyGroup, int endKeyGroup, int keyGroupPrefixBytes) {
return new Config(RescaleRound.One, smallestKey, largestKey); return new Config(RescaleRound.One, startKeyGroup, endKeyGroup, keyGroupPrefixBytes);
} }
} }
...@@ -105,8 +106,7 @@ public class FlinkRescalingCompactionFilter extends AbstractCompactionFilter<Sli ...@@ -105,8 +106,7 @@ public class FlinkRescalingCompactionFilter extends AbstractCompactionFilter<Sli
public void configure(Config config) { public void configure(Config config) {
boolean already_configured = boolean already_configured =
!configureFlinkRescalingCompactionFilter(configHolder.nativeHandle_, config.rescaleRound.ordinal(), !configureFlinkRescalingCompactionFilter(configHolder.nativeHandle_, config.rescaleRound.ordinal(),
config.smallestKey, config.smallestKey.length, config.startKeyGroup, config.endKeyGroup, config.keyGroupPrefixBytes);
config.largestKey, config.largestKey.length);
if (already_configured) { if (already_configured) {
throw new IllegalStateException("Compaction filter is already configured"); throw new IllegalStateException("Compaction filter is already configured");
} }
......
...@@ -8,8 +8,15 @@ ...@@ -8,8 +8,15 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
namespace flink { namespace flink {
int compare(const Slice& a, const Slice& b) { uint32_t get_key_group(const Slice& key, int key_group_prefix_bytes) {
return a.compare(b); // de-serialize key-group in the prefix of the key
uint32_t key_group = 0;
for (int i = 0; i < key_group_prefix_bytes; ++i) {
key_group <<= 8;
key_group |= (key[i] & 0xFF);
}
return key_group;
} }
FlinkRescalingCompactionFilter::ConfigHolder::ConfigHolder() FlinkRescalingCompactionFilter::ConfigHolder::ConfigHolder()
...@@ -57,37 +64,23 @@ const char* FlinkRescalingCompactionFilter::Name() const { ...@@ -57,37 +64,23 @@ const char* FlinkRescalingCompactionFilter::Name() const {
CompactionFilter::Decision FlinkRescalingCompactionFilter::FilterV2( CompactionFilter::Decision FlinkRescalingCompactionFilter::FilterV2(
int /* level */, const Slice& key, ValueType /* value_type */, int /* level */, const Slice& key, ValueType /* value_type */,
const Slice& existing_value, std::string* new_value, const Slice& /* existing_value */, std::string* /* new_value */,
std::string* /*skip_until*/) const { std::string* /*skip_until*/) const {
InitConfigIfNotYet(); InitConfigIfNotYet();
const char key_rescale_round = existing_value.data()[0]; const uint32_t key_group = get_key_group(key, config_cached_->key_group_prefix_bytes_);
Debug(logger_.get(), Debug(logger_.get(),
"Call FlinkRescalingCompactionFilter::FilterV2 - Key %s, rescale byte %d, " "Call FlinkRescalingCompactionFilter::FilterV2 - Key %s (key group %d), "
"Rescaling round: %d, smallest key: %s, largest key: %s", "smallest key group: %d, largest key group: %d",
key.ToString().c_str(), config_cached_->rescale_round_, key_rescale_round, key.ToString().c_str(), key_group,
config_cached_->smallest_key_.ToString().c_str(), config_cached_->start_key_group_,
config_cached_->largest_key_.ToString().c_str()); config_cached_->end_key_group_);
const RescaleRound rescale_round = config_cached_->rescale_round_;
if (key_rescale_round == rescale_round) {
return Decision::kKeep;
}
// if key is in range [smallest, largest] // if key is in range [smallest, largest]
if (compare(key, config_cached_->smallest_key_) >= 0 && if (key_group >= config_cached_->start_key_group_ && key_group <= config_cached_->end_key_group_) {
compare(key, config_cached_->largest_key_) <= 0) { return Decision::kKeep;
new_value->clear();
new_value->assign(existing_value.data(), existing_value.size());
(*new_value)[0] = (char) rescale_round;
Logger* logger = logger_.get();
if (logger && logger->GetInfoLogLevel() <= InfoLogLevel::DEBUG_LEVEL) {
Debug(logger, "New value: %s", new_value->c_str());
}
return Decision::kChangeValue;
} else { } else {
return Decision::kRemove; return Decision::kRemove;
} }
......
...@@ -31,10 +31,11 @@ class FlinkRescalingCompactionFilter : public CompactionFilter { ...@@ -31,10 +31,11 @@ class FlinkRescalingCompactionFilter : public CompactionFilter {
One One
}; };
struct Config { struct Config {
RescaleRound rescale_round_; RescaleRound rescale_round_;
Slice smallest_key_; uint32_t start_key_group_;
Slice largest_key_; uint32_t end_key_group_;
uint32_t key_group_prefix_bytes_;
}; };
class ConfigHolder { class ConfigHolder {
......
...@@ -41,8 +41,8 @@ std::string new_list = ""; // NOLINT ...@@ -41,8 +41,8 @@ std::string new_list = ""; // NOLINT
std::string stub = ""; // NOLINT std::string stub = ""; // NOLINT
FlinkRescalingCompactionFilter::RescaleRound rescale_round; FlinkRescalingCompactionFilter::RescaleRound rescale_round;
Slice largest_key; uint32_t end_key_group;
Slice smallest_key; uint32_t start_key_group;
CompactionFilter::ValueType value_type; CompactionFilter::ValueType value_type;
FlinkRescalingCompactionFilter* filter; // NOLINT FlinkRescalingCompactionFilter* filter; // NOLINT
...@@ -53,12 +53,12 @@ CompactionFilter::Decision decide() { ...@@ -53,12 +53,12 @@ CompactionFilter::Decision decide() {
void Init( void Init(
FlinkRescalingCompactionFilter::RescaleRound rround, FlinkRescalingCompactionFilter::RescaleRound rround,
FlinkRescalingCompactionFilter::RescaleRound vround, FlinkRescalingCompactionFilter::RescaleRound vround,
const Slice& skey, const uint32_t skey_group,
const Slice& lkey, const uint32_t ekey_group,
const Slice& vkey = Slice("key")) { const Slice& vkey = Slice(std::string(new char[5]{10, '0', '0', '0', 0}, 5))) {
rescale_round = rround; rescale_round = rround;
smallest_key = skey; start_key_group = skey_group;
largest_key = lkey; end_key_group = ekey_group;
value_type = CompactionFilter::ValueType::kValue; value_type = CompactionFilter::ValueType::kValue;
data[0] = vround; data[0] = vround;
key = vkey; key = vkey;
...@@ -67,7 +67,7 @@ void Init( ...@@ -67,7 +67,7 @@ void Init(
auto logger = std::make_shared<ConsoleLogger>(); auto logger = std::make_shared<ConsoleLogger>();
filter = new FlinkRescalingCompactionFilter(config_holder, logger); filter = new FlinkRescalingCompactionFilter(config_holder, logger);
auto config = new FlinkRescalingCompactionFilter::Config{rescale_round, smallest_key, largest_key}; auto config = new FlinkRescalingCompactionFilter::Config{rescale_round, start_key_group, end_key_group, 1};
EXPECT_TRUE(config_holder->Configure(config)); EXPECT_TRUE(config_holder->Configure(config));
EXPECT_FALSE(config_holder->Configure(config)); EXPECT_FALSE(config_holder->Configure(config));
} }
...@@ -83,7 +83,7 @@ TEST(FlinkStateRescaleTest, CheckRescaleRoundEnumOrder) { ...@@ -83,7 +83,7 @@ TEST(FlinkStateRescaleTest, CheckRescaleRoundEnumOrder) {
TEST(FlinkStateRescaleTest, CurrentRound) { TEST(FlinkStateRescaleTest, CurrentRound) {
Init(FlinkRescalingCompactionFilter::RescaleRound::One, Init(FlinkRescalingCompactionFilter::RescaleRound::One,
FlinkRescalingCompactionFilter::RescaleRound::One, FlinkRescalingCompactionFilter::RescaleRound::One,
Slice("000"), Slice("111")); 0, 127);
EXPECT_EQ(decide(), KKEEP); EXPECT_EQ(decide(), KKEEP);
Deinit(); Deinit();
} }
...@@ -91,7 +91,7 @@ TEST(FlinkStateRescaleTest, CurrentRound) { ...@@ -91,7 +91,7 @@ TEST(FlinkStateRescaleTest, CurrentRound) {
TEST(FlinkStateRescaleTest, CurrentRound2) { TEST(FlinkStateRescaleTest, CurrentRound2) {
Init(FlinkRescalingCompactionFilter::RescaleRound::Zero, Init(FlinkRescalingCompactionFilter::RescaleRound::Zero,
FlinkRescalingCompactionFilter::RescaleRound::Zero, FlinkRescalingCompactionFilter::RescaleRound::Zero,
Slice("000"), Slice("111")); 0, 127);
EXPECT_EQ(decide(), KKEEP); EXPECT_EQ(decide(), KKEEP);
Deinit(); Deinit();
} }
...@@ -99,29 +99,27 @@ TEST(FlinkStateRescaleTest, CurrentRound2) { ...@@ -99,29 +99,27 @@ TEST(FlinkStateRescaleTest, CurrentRound2) {
TEST(FlinkStateRescaleTest, NotCurrentRoundInRange) { TEST(FlinkStateRescaleTest, NotCurrentRoundInRange) {
Init(FlinkRescalingCompactionFilter::RescaleRound::Zero, Init(FlinkRescalingCompactionFilter::RescaleRound::Zero,
FlinkRescalingCompactionFilter::RescaleRound::One, FlinkRescalingCompactionFilter::RescaleRound::One,
Slice("000"), Slice("111"), Slice("000")); 0, 127, Slice(std::string(new char[5]{0, '0', '0', '0', 0}, 5)));
EXPECT_EQ(decide(), KCHANGE); EXPECT_EQ(decide(), KKEEP);
EXPECT_EQ(new_list.data()[0], FlinkRescalingCompactionFilter::RescaleRound::Zero);
Deinit(); Deinit();
Init(FlinkRescalingCompactionFilter::RescaleRound::Zero, Init(FlinkRescalingCompactionFilter::RescaleRound::Zero,
FlinkRescalingCompactionFilter::RescaleRound::One, FlinkRescalingCompactionFilter::RescaleRound::One,
Slice("000"), Slice("111"), Slice("111")); 0, 127, Slice(std::string(new char[5]{127, '0', '0', '0', 0}, 5)));
EXPECT_EQ(decide(), KCHANGE); EXPECT_EQ(decide(), KKEEP);
EXPECT_EQ(new_list.data()[0], FlinkRescalingCompactionFilter::RescaleRound::Zero);
Deinit(); Deinit();
} }
TEST(FlinkStateRescaleTest, NotCurrentRoundNotInRange) { TEST(FlinkStateRescaleTest, NotCurrentRoundNotInRange) {
Init(FlinkRescalingCompactionFilter::RescaleRound::Zero, Init(FlinkRescalingCompactionFilter::RescaleRound::Zero,
FlinkRescalingCompactionFilter::RescaleRound::One, FlinkRescalingCompactionFilter::RescaleRound::One,
Slice("100"), Slice("111"), Slice("000")); 0, 0, Slice(std::string(new char[5]{1, '0', '0', '0', 0}, 5)));
EXPECT_EQ(decide(), KREMOVE); EXPECT_EQ(decide(), KREMOVE);
Deinit(); Deinit();
Init(FlinkRescalingCompactionFilter::RescaleRound::Zero, Init(FlinkRescalingCompactionFilter::RescaleRound::Zero,
FlinkRescalingCompactionFilter::RescaleRound::One, FlinkRescalingCompactionFilter::RescaleRound::One,
Slice("000"), Slice("110"), Slice("111")); 0, 126, Slice(std::string(new char[5]{127, '0', '0', '0', 0}, 5)));
EXPECT_EQ(decide(), KREMOVE); EXPECT_EQ(decide(), KREMOVE);
Deinit(); Deinit();
} }
......
支持 Markdown
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册