提交 e515ea70 编辑于 作者: rui xia's avatar rui xia
浏览文件

support ingesting plain sst files

上级 52b3dfba
......@@ -1034,6 +1034,7 @@ if(WITH_TESTS)
db/obsolete_files_test.cc
db/external_sst_file_basic_test.cc
db/external_sst_file_test.cc
db/external_plain_sst_file_test.cc
db/fault_injection_test.cc
db/file_indexer_test.cc
db/filename_test.cc
......
此差异已折叠。
......@@ -33,6 +33,13 @@ Status ExternalSstFileIngestionJob::Prepare(
uint64_t next_file_number, SuperVersion* sv) {
Status status;
if (ingestion_options_.is_plain_file) {
assert(ingestion_options_.smallest_seqnos.size() ==
ingestion_options_.largest_seqnos.size());
assert(ingestion_options_.smallest_seqnos.size() ==
external_files_paths.size());
}
// Read the information of files we are ingesting
for (const std::string& file_path : external_files_paths) {
IngestedFileInfo file_to_ingest;
......@@ -348,6 +355,8 @@ Status ExternalSstFileIngestionJob::Run() {
edit_.SetColumnFamily(cfd_->GetID());
// The levels that the files will be ingested into
size_t file_idx = 0;
SequenceNumber cur_latest_seqno = last_seqno;
for (IngestedFileInfo& f : files_to_ingest_) {
SequenceNumber assigned_seqno = 0;
if (ingestion_options_.ingest_behind) {
......@@ -387,11 +396,35 @@ Status ExternalSstFileIngestionJob::Run() {
static_cast<uint64_t>(temp_current_time);
}
edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(),
if (ingestion_options_.is_plain_file) {
SequenceNumber smallest_seqno;
if (f.picked_level == 0) {
// if the plain file is going to ingest to L0, we need to carefully
// organize its seqno; otherwise, its seqno can keeped as original
smallest_seqno = std::max(cur_latest_seqno + 1, ingestion_options_.smallest_seqnos[file_idx]);
} else {
smallest_seqno = ingestion_options_.smallest_seqnos[file_idx];
}
SequenceNumber largest_seqno = smallest_seqno +
(ingestion_options_.largest_seqnos[file_idx] - ingestion_options_.smallest_seqnos[file_idx]);
file_idx++;
edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(),
f.fd.GetFileSize(), f.smallest_internal_key,
f.largest_internal_key, smallest_seqno, largest_seqno,
false, kInvalidBlobFileNumber, oldest_ancester_time,
current_time, f.file_checksum, f.file_checksum_func_name);
cur_latest_seqno = std::max(largest_seqno, cur_latest_seqno);
} else {
edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(),
f.fd.GetFileSize(), f.smallest_internal_key,
f.largest_internal_key, f.assigned_seqno, f.assigned_seqno,
false, kInvalidBlobFileNumber, oldest_ancester_time,
current_time, f.file_checksum, f.file_checksum_func_name);
}
}
if (ingestion_options_.is_plain_file) {
consumed_seqno_count_ = cur_latest_seqno - last_seqno;
}
return status;
}
......@@ -541,9 +574,15 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
// Get table version
auto version_iter = uprops.find(ExternalSstFilePropertyNames::kVersion);
if (version_iter == uprops.end()) {
return Status::Corruption("External file version not found");
if (!ingestion_options_.is_plain_file) {
return Status::Corruption("External file version not found");
} else {
// version 3 => plain sst file, no use anyway
file_to_ingest->version = (uint32_t)(3);
}
} else {
file_to_ingest->version = DecodeFixed32(version_iter->second.c_str());
}
file_to_ingest->version = DecodeFixed32(version_iter->second.c_str());
auto seqno_iter = uprops.find(ExternalSstFilePropertyNames::kGlobalSeqno);
if (file_to_ingest->version == 2) {
......@@ -572,6 +611,8 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
return Status::InvalidArgument(
"External SST file V1 does not support global seqno");
}
} else if (file_to_ingest->version == 3) {
// ingesting plain sst files
} else {
return Status::InvalidArgument("External file version is not supported");
}
......@@ -604,7 +645,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
if (!ParseInternalKey(iter->key(), &key)) {
return Status::Corruption("external file have corrupted keys");
}
if (key.sequence != 0) {
if (key.sequence != 0 && !ingestion_options_.is_plain_file) {
return Status::Corruption("external file have non zero sequence number");
}
file_to_ingest->smallest_internal_key.SetFrom(key);
......@@ -613,7 +654,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
if (!ParseInternalKey(iter->key(), &key)) {
return Status::Corruption("external file have corrupted keys");
}
if (key.sequence != 0) {
if (key.sequence != 0 && !ingestion_options_.is_plain_file) {
return Status::Corruption("external file have non zero sequence number");
}
file_to_ingest->largest_internal_key.SetFrom(key);
......@@ -659,6 +700,14 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style,
SequenceNumber last_seqno, IngestedFileInfo* file_to_ingest,
SequenceNumber* assigned_seqno) {
if (ingestion_options_.is_plain_file && ingestion_options_.target_level_for_plain_files != -1) {
if (files_overlap_) {
ROCKS_LOG_WARN(db_options_.info_log, "Plain files to ingest is overlapped");
}
file_to_ingest->picked_level = ingestion_options_.target_level_for_plain_files;
return Status::OK();
}
Status status;
*assigned_seqno = 0;
if (force_global_seqno) {
......@@ -771,6 +820,11 @@ Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile(
Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile(
IngestedFileInfo* file_to_ingest, SequenceNumber seqno) {
if (file_to_ingest->version == 3) {
// ingest plain SST file, no need to worry about seqno.
return Status::OK();
}
if (file_to_ingest->original_seqno == seqno) {
// This file already have the correct global seqno
return Status::OK();
......
......@@ -1591,6 +1591,14 @@ struct IngestExternalFileOptions {
// ingestion. However, if no checksum information is provided with the
// ingested files, DB will generate the checksum and store in the Manifest.
bool verify_file_checksum = true;
// Set to TRUE when ingesting plain SST files from other db instances.
bool is_plain_file = false;
// smallest seqno of all ingested plain files.
std::vector<SequenceNumber> smallest_seqnos;
// largest seqno of all ingested plain files.
std::vector<SequenceNumber> largest_seqnos;
// target level if ingesting plain files.
int target_level_for_plain_files = -1;
};
enum TraceFilterType : uint64_t {
......
......@@ -194,3 +194,89 @@ void Java_org_rocksdb_IngestExternalFileOptions_disposeInternal(
reinterpret_cast<ROCKSDB_NAMESPACE::IngestExternalFileOptions*>(jhandle);
delete options;
}
/*
* Class: org_rocksdb_IngestExternalFileOptions
* Method: isPlainFile
* Signature: (J)Z
*/
JNIEXPORT jboolean JNICALL Java_org_rocksdb_IngestExternalFileOptions_isPlainFile
(JNIEnv *, jobject, jlong jhandle) {
auto* options =
reinterpret_cast<ROCKSDB_NAMESPACE::IngestExternalFileOptions*>(jhandle);
return options->is_plain_file = JNI_TRUE;
}
/*
* Class: org_rocksdb_IngestExternalFileOptions
* Method: setIsPlainFile
* Signature: (JZ)V
*/
JNIEXPORT void JNICALL Java_org_rocksdb_IngestExternalFileOptions_setIsPlainFile
(JNIEnv *, jobject, jlong jhandle, jboolean jis_plain_file) {
auto* options =
reinterpret_cast<ROCKSDB_NAMESPACE::IngestExternalFileOptions*>(jhandle);
options->is_plain_file = jis_plain_file == JNI_TRUE;
}
/*
* Class: org_rocksdb_IngestExternalFileOptions
* Method: targetLevel
* Signature: (J)I
*/
JNIEXPORT jint JNICALL Java_org_rocksdb_IngestExternalFileOptions_targetLevel
(JNIEnv *, jobject, jlong jhandle) {
auto* options =
reinterpret_cast<ROCKSDB_NAMESPACE::IngestExternalFileOptions*>(jhandle);
return reinterpret_cast<jint>(options->target_level_for_plain_files);
}
/*
* Class: org_rocksdb_IngestExternalFileOptions
* Method: setTargetLevel
* Signature: (JI)V
*/
JNIEXPORT void JNICALL Java_org_rocksdb_IngestExternalFileOptions_setTargetLevel
(JNIEnv *, jobject, jlong jhandle, jint jtarget_level) {
auto* options =
reinterpret_cast<ROCKSDB_NAMESPACE::IngestExternalFileOptions*>(jhandle);
options->target_level_for_plain_files = static_cast<int>(jtarget_level);
}
/*
* Class: org_rocksdb_IngestExternalFileOptions
* Method: setSmallestSeqnos
* Signature: (J[JI)V
*/
JNIEXPORT void JNICALL Java_org_rocksdb_IngestExternalFileOptions_setSmallestSeqnos
(JNIEnv * env, jobject, jlong jhandle,
jlongArray jsmallest_seqnos, jint jsmallest_seqnos_len) {
auto* options =
reinterpret_cast<ROCKSDB_NAMESPACE::IngestExternalFileOptions*>(jhandle);
std::vector<ROCKSDB_NAMESPACE::SequenceNumber>& smallest_seqnos_list = options->smallest_seqnos;
int smallest_seqnos_len = static_cast<int>(jsmallest_seqnos_len);
jlong* smallest_seqnos = env->GetLongArrayElements(jsmallest_seqnos, nullptr);
for (int i = 0; i < smallest_seqnos_len; i++) {
smallest_seqnos_list.push_back(static_cast<uint64_t>(smallest_seqnos[i]));
}
}
/*
* Class: org_rocksdb_IngestExternalFileOptions
* Method: setLargestSeqnos
* Signature: (J[JI)V
*/
JNIEXPORT void JNICALL Java_org_rocksdb_IngestExternalFileOptions_setLargestSeqnos
(JNIEnv * env, jobject, jlong jhandle,
jlongArray jlargest_seqnos, jint jlargest_seqnos_len) {
auto* options =
reinterpret_cast<ROCKSDB_NAMESPACE::IngestExternalFileOptions*>(jhandle);
std::vector<ROCKSDB_NAMESPACE::SequenceNumber>& largest_seqnos_list = options->largest_seqnos;
int largest_seqnos_len = static_cast<int>(jlargest_seqnos_len);
jlong* largest_seqnos = env->GetLongArrayElements(jlargest_seqnos, nullptr);
for (int i = 0; i < largest_seqnos_len; i++) {
largest_seqnos_list.push_back(static_cast<uint64_t>(largest_seqnos[i]));
}
}
\ No newline at end of file
......@@ -201,6 +201,46 @@ public class IngestExternalFileOptions extends RocksObject {
return this;
}
/**
* Whether the ingested files are plain files.
*/
public boolean isPlainFile() {
return isPlainFile(nativeHandle_);
}
public IngestExternalFileOptions setIsPlainFile(final boolean isPlainFile) {
setIsPlainFile(nativeHandle_, isPlainFile);
return this;
}
/**
* The target level for ingested files.
*/
public int targetLevel() {
return targetLevel(nativeHandle_);
}
public IngestExternalFileOptions setTargetLevel(final int targetLevel) {
setTargetLevel(nativeHandle_, targetLevel);
return this;
}
/**
* Smallest seqnos for each ingested file.
*/
public IngestExternalFileOptions setSmallestSeqnos(final long[] smallestSeqnos) {
setSmallestSeqnos(nativeHandle_, smallestSeqnos, smallestSeqnos.length);
return this;
}
/**
* Largest seqnos for each ingested file.
*/
public IngestExternalFileOptions setLargestSeqnos(final long[] largestSeqnos) {
setLargestSeqnos(nativeHandle_, largestSeqnos, largestSeqnos.length);
return this;
}
private native static long newIngestExternalFileOptions();
private native static long newIngestExternalFileOptions(
final boolean moveFiles, final boolean snapshotConsistency,
......@@ -224,4 +264,14 @@ public class IngestExternalFileOptions extends RocksObject {
private native boolean writeGlobalSeqno(final long handle);
private native void setWriteGlobalSeqno(final long handle,
final boolean writeGlobalSeqNo);
private native boolean isPlainFile(final long handle);
private native void setIsPlainFile(final long handle,
final boolean isPlainFile);
private native int targetLevel(final long handle);
private native void setTargetLevel(final long handle,
final int targetLevel);
private native void setSmallestSeqnos(final long handle,
final long[] smallestSeqnos, final int smallestSeqnosLen);
private native void setLargestSeqnos(final long handle,
final long[] largestSeqnos, final int largestSeqnosLen);
}
Supports Markdown
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册