plain_table_key_coding.cc 17.5 KB
Newer Older
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
Siying Dong's avatar
Siying Dong committed
2
3
4
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
5
6

#ifndef ROCKSDB_LITE
7
#include "table/plain/plain_table_key_coding.h"
8

9
10
#include <algorithm>
#include <string>
11
#include "db/dbformat.h"
12
#include "file/writable_file_writer.h"
13
#include "table/plain/plain_table_factory.h"
14
#include "table/plain/plain_table_reader.h"
15

16
namespace ROCKSDB_NAMESPACE {
17

sdong's avatar
sdong committed
18
// Kind of entry announced by the two most significant bits of the control
// byte used by prefix encoding (see EncodeSize() and
// PlainTableKeyDecoder::DecodeSize()).
enum PlainTableEntryType : unsigned char {
  // The size that follows is the size of a complete user key.
  kFullKey = 0,
  // The size that follows is the length of the prefix shared with the
  // previous key; a kKeySuffix entry comes right after it.
  kPrefixFromPreviousKey = 1,
  // The size that follows is the size of the user key without its prefix.
  kKeySuffix = 2,
};

24
25
namespace {

26
27
28
29
30
31
32
33
// Control byte:
// First two bits indicate type of entry
// Other bytes are inlined sizes. If all bits are 1 (0x03F), overflow bytes
// are used. key_size-0x3F will be encoded as a variint32 after this bytes.

const unsigned char kSizeInlineLimit = 0x3F;

// Return 0 for error
sdong's avatar
sdong committed
34
35
size_t EncodeSize(PlainTableEntryType type, uint32_t key_size,
                  char* out_buffer) {
36
37
  out_buffer[0] = type << 6;

38
  if (key_size < static_cast<uint32_t>(kSizeInlineLimit)) {
39
40
41
42
43
44
45
46
47
    // size inlined
    out_buffer[0] |= static_cast<char>(key_size);
    return 1;
  } else {
    out_buffer[0] |= kSizeInlineLimit;
    char* ptr = EncodeVarint32(out_buffer + 1, key_size - kSizeInlineLimit);
    return ptr - out_buffer;
  }
}
48
}  // namespace
49

50
51
52
53
54
55
56
57
58
59
// Decodes the control byte found at `start_offset`, plus the varint32 that
// follows it when the size was not stored inline.
//
// On success fills `*entry_type` with the entry type, `*key_size` with the
// decoded size and `*bytes_read` with the number of bytes consumed (>= 1).
// Returns the file reader's status when a read fails, and Corruption when the
// overflow varint32 cannot be decoded.
inline Status PlainTableKeyDecoder::DecodeSize(uint32_t start_offset,
                                               PlainTableEntryType* entry_type,
                                               uint32_t* key_size,
                                               uint32_t* bytes_read) {
  Slice next_byte_slice;
  bool success = file_reader_.Read(start_offset, 1, &next_byte_slice);
  if (!success) {
    return file_reader_.status();
  }
  // Two high bits: entry type. Six low bits: inline size.
  *entry_type = static_cast<PlainTableEntryType>(
      (static_cast<unsigned char>(next_byte_slice[0]) & ~kSizeInlineLimit) >>
      6);
  char inline_key_size = next_byte_slice[0] & kSizeInlineLimit;
  if (inline_key_size < kSizeInlineLimit) {
    *key_size = inline_key_size;
    *bytes_read = 1;
    return Status::OK();
  } else {
    // All six size bits are set: the rest of the size follows as a varint32.
    uint32_t extra_size = 0;
    uint32_t tmp_bytes_read = 0;
    success = file_reader_.ReadVarint32(start_offset + 1, &extra_size,
                                        &tmp_bytes_read);
    if (!success) {
      return file_reader_.status();
    }
    if (tmp_bytes_read == 0) {
      // The varint32 could not be decoded. Report it instead of relying on an
      // assert, which is compiled out of release builds and would let an
      // undefined size through.
      return Status::Corruption("Unexpected EOF when reading size of the key");
    }
    *key_size = kSizeInlineLimit + extra_size;
    *bytes_read = tmp_bytes_read + 1;
    return Status::OK();
  }
}

83
84
85
86
// Appends the encoded form of internal key `key` to `file`.
//
// - kPlain encoding: when the user key length is variable, the user key size
//   is written first as a varint32; the key follows.
// - kPrefix encoding: one or two control entries (see EncodeSize()) are
//   written first; for keys sharing the previous prefix only the part after
//   the prefix is written.
//
// When the key has sequence number 0 and type kTypeValue, the trailing 8
// bytes of the internal key are not written to `file`; instead the one-byte
// marker PlainTableFactory::kValueTypeSeqId0 is stored at
// meta_bytes_buf[*meta_bytes_buf_size] and `*meta_bytes_buf_size` is
// incremented, leaving it to the caller to write that buffer.
//
// `*offset` is advanced by the number of bytes appended to `file`.
// Returns Corruption if `key` is not a valid internal key, or the first
// failing status returned by `file->Append()`.
IOStatus PlainTableKeyEncoder::AppendKey(const Slice& key,
                                         WritableFileWriter* file,
                                         uint64_t* offset, char* meta_bytes_buf,
                                         size_t* meta_bytes_buf_size) {
  ParsedInternalKey parsed_key;
  if (!ParseInternalKey(key, &parsed_key)) {
    return IOStatus::Corruption(Slice());
  }

  Slice key_to_write = key;  // Portion of internal key to write out.

  // The last 8 bytes of an internal key are not part of the user key.
  uint32_t user_key_size = static_cast<uint32_t>(key.size() - 8);
  if (encoding_type_ == kPlain) {
    if (fixed_user_key_len_ == kPlainTableVariableLength) {
      // Write key length
      char key_size_buf[5];  // tmp buffer for key size as varint32
      char* ptr = EncodeVarint32(key_size_buf, user_key_size);
      assert(ptr <= key_size_buf + sizeof(key_size_buf));
      auto len = ptr - key_size_buf;
      IOStatus io_s = file->Append(Slice(key_size_buf, len));
      if (!io_s.ok()) {
        return io_s;
      }
      *offset += len;
    }
  } else {
    assert(encoding_type_ == kPrefix);
    // Room for the two control entries the shared-prefix path may emit.
    char size_bytes[12];
    size_t size_bytes_pos = 0;

    Slice prefix =
        prefix_extractor_->Transform(Slice(key.data(), user_key_size));
    if (key_count_for_prefix_ == 0 || prefix != pre_prefix_.GetUserKey() ||
        key_count_for_prefix_ % index_sparseness_ == 0) {
      // First key overall, first key of a new prefix, or the per-prefix key
      // count reached a multiple of index_sparseness_: write the full key.
      key_count_for_prefix_ = 1;
      pre_prefix_.SetUserKey(prefix);
      size_bytes_pos += EncodeSize(kFullKey, user_key_size, size_bytes);
      IOStatus io_s = file->Append(Slice(size_bytes, size_bytes_pos));
      if (!io_s.ok()) {
        return io_s;
      }
      *offset += size_bytes_pos;
    } else {
      key_count_for_prefix_++;
      if (key_count_for_prefix_ == 2) {
        // For second key within a prefix, need to encode prefix length
        size_bytes_pos +=
            EncodeSize(kPrefixFromPreviousKey,
                       static_cast<uint32_t>(pre_prefix_.GetUserKey().size()),
                       size_bytes + size_bytes_pos);
      }
      uint32_t prefix_len =
          static_cast<uint32_t>(pre_prefix_.GetUserKey().size());
      size_bytes_pos += EncodeSize(kKeySuffix, user_key_size - prefix_len,
                                   size_bytes + size_bytes_pos);
      IOStatus io_s = file->Append(Slice(size_bytes, size_bytes_pos));
      if (!io_s.ok()) {
        return io_s;
      }
      *offset += size_bytes_pos;
      // Only the part of the key after the shared prefix is written.
      key_to_write = Slice(key.data() + prefix_len, key.size() - prefix_len);
    }
  }

  // Encode full key
  // If the row is of value type with seqId 0, the special one-byte flag is
  // placed in meta_bytes_buf rather than appended here, to save one file
  // append call.
  if (parsed_key.sequence == 0 && parsed_key.type == kTypeValue) {
    IOStatus io_s =
        file->Append(Slice(key_to_write.data(), key_to_write.size() - 8));
    if (!io_s.ok()) {
      return io_s;
    }
    *offset += key_to_write.size() - 8;
    meta_bytes_buf[*meta_bytes_buf_size] = PlainTableFactory::kValueTypeSeqId0;
    *meta_bytes_buf_size += 1;
  } else {
    IOStatus io_s = file->Append(key_to_write);
    if (!io_s.ok()) {
      return io_s;
    }
    *offset += key_to_write.size();
  }

  return IOStatus::OK();
}

171
172
173
174
175
// Returns a Slice of `len` bytes referring to file offset `file_offset`
// inside `buffer`. The Slice points into the buffer's memory; nothing is
// copied. The caller is responsible for making sure the requested range lies
// within the buffer; only the end of the data region is asserted here.
Slice PlainTableFileReader::GetFromBuffer(Buffer* buffer, uint32_t file_offset,
                                          uint32_t len) {
  assert(file_offset + len <= file_info_->data_end_offset);
  return Slice(buffer->buf.get() + (file_offset - buffer->buf_start_offset),
               len);
}

178
179
bool PlainTableFileReader::ReadNonMmap(uint32_t file_offset, uint32_t len,
                                       Slice* out) {
180
  const uint32_t kPrefetchSize = 256u;
181
182
183
184
185
186
187
188

  // Try to read from buffers.
  for (uint32_t i = 0; i < num_buf_; i++) {
    Buffer* buffer = buffers_[num_buf_ - 1 - i].get();
    if (file_offset >= buffer->buf_start_offset &&
        file_offset + len <= buffer->buf_start_offset + buffer->buf_len) {
      *out = GetFromBuffer(buffer, file_offset, len);
      return true;
189
190
    }
  }
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212

  Buffer* new_buffer;
  // Data needed is not in any of the buffer. Allocate a new buffer.
  if (num_buf_ < buffers_.size()) {
    // Add a new buffer
    new_buffer = new Buffer();
    buffers_[num_buf_++].reset(new_buffer);
  } else {
    // Now simply replace the last buffer. Can improve the placement policy
    // if needed.
    new_buffer = buffers_[num_buf_ - 1].get();
  }

  assert(file_offset + len <= file_info_->data_end_offset);
  uint32_t size_to_read = std::min(file_info_->data_end_offset - file_offset,
                                   std::max(kPrefetchSize, len));
  if (size_to_read > new_buffer->buf_capacity) {
    new_buffer->buf.reset(new char[size_to_read]);
    new_buffer->buf_capacity = size_to_read;
    new_buffer->buf_len = 0;
  }
  Slice read_result;
213
214
215
  Status s =
      file_info_->file->Read(IOOptions(), file_offset, size_to_read,
                             &read_result, new_buffer->buf.get(), nullptr);
216
217
218
219
220
221
222
  if (!s.ok()) {
    status_ = s;
    return false;
  }
  new_buffer->buf_start_offset = file_offset;
  new_buffer->buf_len = size_to_read;
  *out = GetFromBuffer(new_buffer, file_offset, len);
223
224
225
  return true;
}

226
227
inline bool PlainTableFileReader::ReadVarint32(uint32_t offset, uint32_t* out,
                                               uint32_t* bytes_read) {
228
229
230
231
232
233
234
235
236
237
238
239
240
  if (file_info_->is_mmap_mode) {
    const char* start = file_info_->file_data.data() + offset;
    const char* limit =
        file_info_->file_data.data() + file_info_->data_end_offset;
    const char* key_ptr = GetVarint32Ptr(start, limit, out);
    assert(key_ptr != nullptr);
    *bytes_read = static_cast<uint32_t>(key_ptr - start);
    return true;
  } else {
    return ReadVarint32NonMmap(offset, out, bytes_read);
  }
}

241
242
// Non-mmap implementation of ReadVarint32(): reads up to kMaxVarInt32Size
// bytes at `offset` (fewer when the data region ends earlier) and decodes a
// varint32 from them into `*out`.
//
// `*bytes_read` receives the encoded length, or 0 when the bytes do not hold
// a complete varint32. Returns false only when the underlying read fails.
bool PlainTableFileReader::ReadVarint32NonMmap(uint32_t offset, uint32_t* out,
                                               uint32_t* bytes_read) {
  const uint32_t kMaxVarInt32Size = 6u;
  // Never ask for bytes beyond the end of the data region.
  uint32_t bytes_to_read =
      std::min(file_info_->data_end_offset - offset, kMaxVarInt32Size);
  Slice bytes;
  if (!Read(offset, bytes_to_read, &bytes)) {
    return false;
  }

  const char* start = bytes.data();
  const char* limit = bytes.data() + bytes.size();

  const char* key_ptr = GetVarint32Ptr(start, limit, out);
  *bytes_read =
      (key_ptr != nullptr) ? static_cast<uint32_t>(key_ptr - start) : 0;
  return true;
}

// Reads the key stored at `file_offset`, whose user key part is
// `user_key_size` bytes long, and fills `*parsed_key`.
//
// Two layouts are handled:
// - the user key followed by the one-byte marker
//   PlainTableFactory::kValueTypeSeqId0: the key has sequence number 0 and
//   type kTypeValue. `*internal_key_valid` is set to false and
//   `*internal_key` is left untouched.
// - the user key followed by an 8-byte trailer: `*internal_key` is set to
//   those user_key_size + 8 bytes and `*internal_key_valid` to true.
//
// The number of bytes consumed is ADDED to `*bytes_read`.
// Returns the file reader's status when a read fails, or Corruption when the
// internal key cannot be parsed.
Status PlainTableKeyDecoder::ReadInternalKey(
    uint32_t file_offset, uint32_t user_key_size, ParsedInternalKey* parsed_key,
    uint32_t* bytes_read, bool* internal_key_valid, Slice* internal_key) {
  // Read the user key plus one extra byte to look for the seqID=0 marker.
  Slice tmp_slice;
  bool success = file_reader_.Read(file_offset, user_key_size + 1, &tmp_slice);
  if (!success) {
    return file_reader_.status();
  }
  if (tmp_slice[user_key_size] == PlainTableFactory::kValueTypeSeqId0) {
    // Special encoding for the row with seqID=0
    parsed_key->user_key = Slice(tmp_slice.data(), user_key_size);
    parsed_key->sequence = 0;
    parsed_key->type = kTypeValue;
    *bytes_read += user_key_size + 1;
    *internal_key_valid = false;
  } else {
    // Regular encoding: read again, this time with the whole 8-byte trailer.
    success = file_reader_.Read(file_offset, user_key_size + 8, internal_key);
    if (!success) {
      return file_reader_.status();
    }
    *internal_key_valid = true;
    if (!ParseInternalKey(*internal_key, parsed_key)) {
      return Status::Corruption(
          Slice("Incorrect value type found when reading the next key"));
    }
    *bytes_read += user_key_size + 8;
  }
  return Status::OK();
}

291
292
293
294
// Decodes the key stored at `start_offset` in plain encoding.
//
// With a variable user key length, the key is preceded by its size as a
// varint32; otherwise the size is fixed_user_key_len_. `*parsed_key` is
// filled, and `*internal_key` too when it is not null. `*bytes_read` is
// expected to be 0 on entry (NextKeyNoValue() resets it) and holds the number
// of bytes consumed on return. The seekable flag is not modified.
//
// In non-mmap mode the key is copied into cur_key_ and the returned slices
// refer to that copy.
Status PlainTableKeyDecoder::NextPlainEncodingKey(uint32_t start_offset,
                                                  ParsedInternalKey* parsed_key,
                                                  Slice* internal_key,
                                                  uint32_t* bytes_read,
                                                  bool* /*seekable*/) {
  uint32_t user_key_size = 0;
  Status s;
  if (fixed_user_key_len_ != kPlainTableVariableLength) {
    user_key_size = fixed_user_key_len_;
  } else {
    uint32_t tmp_size = 0;
    uint32_t tmp_read = 0;
    bool success =
        file_reader_.ReadVarint32(start_offset, &tmp_size, &tmp_read);
    if (!success) {
      return file_reader_.status();
    }
    if (tmp_read == 0) {
      // The size could not be decoded. Fail explicitly rather than relying
      // on an assert that is compiled out of release builds.
      return Status::Corruption(
          "Unexpected EOF when reading the next key's size.");
    }
    user_key_size = tmp_size;
    *bytes_read = tmp_read;
  }
  // dummy initial value to avoid compiler complain
  bool decoded_internal_key_valid = true;
  Slice decoded_internal_key;
  s = ReadInternalKey(start_offset + *bytes_read, user_key_size, parsed_key,
                      bytes_read, &decoded_internal_key_valid,
                      &decoded_internal_key);
  if (!s.ok()) {
    return s;
  }
  if (!file_reader_.file_info()->is_mmap_mode) {
    // Non-mmap mode: hand out slices into our own copy of the key.
    cur_key_.SetInternalKey(*parsed_key);
    parsed_key->user_key =
        Slice(cur_key_.GetInternalKey().data(), user_key_size);
    if (internal_key != nullptr) {
      *internal_key = cur_key_.GetInternalKey();
    }
  } else if (internal_key != nullptr) {
    if (decoded_internal_key_valid) {
      *internal_key = decoded_internal_key;
    } else {
      // Need to copy out the internal key
      cur_key_.SetInternalKey(*parsed_key);
      *internal_key = cur_key_.GetInternalKey();
    }
  }
  return Status::OK();
}

// Decodes the key stored at `start_offset` in prefix encoding.
//
// A key is announced by one control entry (kFullKey or kKeySuffix), or by two
// (kPrefixFromPreviousKey followed by kKeySuffix). For suffix entries the
// full key is rebuilt in cur_key_ from the first prefix_len_ bytes of the
// last remembered user key (saved_user_key_) and the suffix read from the
// file.
//
// `*parsed_key` is filled, and `*internal_key` too when it is not null. The
// number of bytes consumed is added to `*bytes_read`. `*seekable`, when not
// null, is set to false for keys that are not stored as a full key.
Status PlainTableKeyDecoder::NextPrefixEncodingKey(
    uint32_t start_offset, ParsedInternalKey* parsed_key, Slice* internal_key,
    uint32_t* bytes_read, bool* seekable) {
  PlainTableEntryType entry_type;

  bool expect_suffix = false;
  Status s;
  do {
    uint32_t size = 0;
    // dummy initial value to avoid compiler complain
    bool decoded_internal_key_valid = true;
    uint32_t my_bytes_read = 0;
    s = DecodeSize(start_offset + *bytes_read, &entry_type, &size,
                   &my_bytes_read);
    if (!s.ok()) {
      return s;
    }
    if (my_bytes_read == 0) {
      return Status::Corruption("Unexpected EOF when reading size of the key");
    }
    *bytes_read += my_bytes_read;

    switch (entry_type) {
      case kFullKey: {
        expect_suffix = false;
        Slice decoded_internal_key;
        s = ReadInternalKey(start_offset + *bytes_read, size, parsed_key,
                            bytes_read, &decoded_internal_key_valid,
                            &decoded_internal_key);
        if (!s.ok()) {
          return s;
        }
        if (!file_reader_.file_info()->is_mmap_mode ||
            (internal_key != nullptr && !decoded_internal_key_valid)) {
          // In non-mmap mode, always need to make a copy of keys returned to
          // users, because after reading value for the key, the key might
          // be invalid.
          cur_key_.SetInternalKey(*parsed_key);
          saved_user_key_ = cur_key_.GetUserKey();
          if (!file_reader_.file_info()->is_mmap_mode) {
            parsed_key->user_key =
                Slice(cur_key_.GetInternalKey().data(), size);
          }
          if (internal_key != nullptr) {
            *internal_key = cur_key_.GetInternalKey();
          }
        } else {
          if (internal_key != nullptr) {
            *internal_key = decoded_internal_key;
          }
          saved_user_key_ = parsed_key->user_key;
        }
        break;
      }
      case kPrefixFromPreviousKey: {
        if (seekable != nullptr) {
          *seekable = false;
        }
        prefix_len_ = size;
        assert(prefix_extractor_ == nullptr ||
               prefix_extractor_->Transform(saved_user_key_).size() ==
                   prefix_len_);
        // Need read another size flag for suffix
        expect_suffix = true;
        break;
      }
      case kKeySuffix: {
        expect_suffix = false;
        if (seekable != nullptr) {
          *seekable = false;
        }

        Slice tmp_slice;
        s = ReadInternalKey(start_offset + *bytes_read, size, parsed_key,
                            bytes_read, &decoded_internal_key_valid,
                            &tmp_slice);
        if (!s.ok()) {
          return s;
        }
        // saved_user_key_ may point into cur_key_: the kFullKey case stores
        // cur_key_.GetUserKey() there in non-mmap mode, and also in mmap mode
        // when a seqID=0 key had to be copied out. Copy the prefix to a
        // separate string before cur_key_ is reserved and rewritten, so the
        // prefix is never read through a slice into the buffer being
        // rebuilt.
        // NOTE(review): presumably Reserve() may replace cur_key_'s storage
        // — confirm against IterKey.
        std::string tmp =
            Slice(saved_user_key_.data(), prefix_len_).ToString();
        cur_key_.Reserve(prefix_len_ + size);
        cur_key_.SetInternalKey(tmp, *parsed_key);
        // Keep saved_user_key_ pointing at live memory; its first
        // prefix_len_ bytes are still the shared prefix.
        saved_user_key_ = cur_key_.GetUserKey();
        parsed_key->user_key = cur_key_.GetUserKey();
        if (internal_key != nullptr) {
          *internal_key = cur_key_.GetInternalKey();
        }
        break;
      }
      default:
        return Status::Corruption("Un-identified size flag.");
    }
  } while (expect_suffix);  // Another round if suffix is expected.
  return Status::OK();
}

451
// Decodes the key stored at `start_offset` together with the value that
// follows it.
//
// The key is decoded by NextKeyNoValue(). The value is stored as a varint32
// size followed by that many bytes; `*value` is set to those bytes.
// `*bytes_read` receives the total number of bytes consumed by the key, the
// value size and the value. `value` must not be null.
//
// Returns the status of NextKeyNoValue() if it fails, the file reader's
// status when a read fails, or Corruption when the value size cannot be
// decoded.
Status PlainTableKeyDecoder::NextKey(uint32_t start_offset,
                                     ParsedInternalKey* parsed_key,
                                     Slice* internal_key, Slice* value,
                                     uint32_t* bytes_read, bool* seekable) {
  assert(value != nullptr);
  Status s = NextKeyNoValue(start_offset, parsed_key, internal_key, bytes_read,
                            seekable);
  if (s.ok()) {
    assert(bytes_read != nullptr);
    uint32_t value_size;
    uint32_t value_size_bytes;
    bool success = file_reader_.ReadVarint32(start_offset + *bytes_read,
                                             &value_size, &value_size_bytes);
    if (!success) {
      return file_reader_.status();
    }
    if (value_size_bytes == 0) {
      return Status::Corruption(
          "Unexpected EOF when reading the next value's size.");
    }
    *bytes_read += value_size_bytes;
    success = file_reader_.Read(start_offset + *bytes_read, value_size, value);
    if (!success) {
      return file_reader_.status();
    }
    *bytes_read += value_size;
  }
  return s;
}

// Decodes the key stored at `start_offset` without reading its value.
//
// Resets `*bytes_read` to 0 and, when `seekable` is not null, `*seekable` to
// true, then dispatches to the decoder matching encoding_type_. On return
// `*bytes_read` holds the number of bytes the key occupies.
Status PlainTableKeyDecoder::NextKeyNoValue(uint32_t start_offset,
                                            ParsedInternalKey* parsed_key,
                                            Slice* internal_key,
                                            uint32_t* bytes_read,
                                            bool* seekable) {
  *bytes_read = 0;
  if (seekable != nullptr) {
    *seekable = true;
  }
  if (encoding_type_ == kPlain) {
    return NextPlainEncodingKey(start_offset, parsed_key, internal_key,
                                bytes_read, seekable);
  } else {
    assert(encoding_type_ == kPrefix);
    return NextPrefixEncodingKey(start_offset, parsed_key, internal_key,
                                 bytes_read, seekable);
  }
}

501
}  // namespace ROCKSDB_NAMESPACE
502
#endif  // ROCKSDB_LITE