pessimistic_transaction.cc 23.8 KB
Newer Older
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
Siying Dong's avatar
Siying Dong committed
2
3
4
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
agiardullo's avatar
agiardullo committed
5
6
7

#ifndef ROCKSDB_LITE

8
#include "utilities/transactions/pessimistic_transaction.h"
agiardullo's avatar
agiardullo committed
9
10
11
12
13
14
15

#include <map>
#include <set>
#include <string>
#include <vector>

#include "db/column_family.h"
16
#include "db/db_impl/db_impl.h"
agiardullo's avatar
agiardullo committed
17
18
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
19
#include "rocksdb/snapshot.h"
agiardullo's avatar
agiardullo committed
20
21
#include "rocksdb/status.h"
#include "rocksdb/utilities/transaction_db.h"
22
#include "test_util/sync_point.h"
Siying Dong's avatar
Siying Dong committed
23
#include "util/cast_util.h"
agiardullo's avatar
agiardullo committed
24
#include "util/string_util.h"
Maysam Yabandeh's avatar
Maysam Yabandeh committed
25
#include "utilities/transactions/pessimistic_transaction_db.h"
agiardullo's avatar
agiardullo committed
26
27
#include "utilities/transactions/transaction_util.h"

28
namespace ROCKSDB_NAMESPACE {
agiardullo's avatar
agiardullo committed
29
30
31

struct WriteOptions;

32
// Counter shared by all PessimisticTransaction objects from which
// GenTxnID() hands out transaction IDs. It starts at 1 and is advanced
// atomically.
std::atomic<TransactionID> PessimisticTransaction::txn_id_counter_(1);
agiardullo's avatar
agiardullo committed
33

34
// Hands out the next transaction ID: returns the counter's current value and
// atomically bumps the shared counter by one.
TransactionID PessimisticTransaction::GenTxnID() {
  const TransactionID next_id = txn_id_counter_.fetch_add(1);
  return next_id;
}

38
39
// Constructs a pessimistic transaction bound to `txn_db`.
//
// All scalar members start from zero/false/null defaults. The real
// per-transaction configuration is applied by Initialize(), which is only
// invoked when `init` is true.
PessimisticTransaction::PessimisticTransaction(
    TransactionDB* txn_db, const WriteOptions& write_options,
    const TransactionOptions& txn_options, const bool init)
    : TransactionBaseImpl(txn_db->GetRootDB(), write_options),
      txn_db_impl_(nullptr),
      expiration_time_(0),
      txn_id_(0),
      waiting_cf_id_(0),
      waiting_key_(nullptr),
      lock_timeout_(0),
      deadlock_detect_(false),
      deadlock_detect_depth_(0),
      skip_concurrency_control_(false) {
  // Resolve the concrete implementation types behind the public handles.
  db_impl_ = static_cast_with_check<DBImpl>(db_);
  txn_db_impl_ = static_cast_with_check<PessimisticTransactionDB>(txn_db);

  if (!init) {
    return;
  }
  Initialize(txn_options);
}

58
void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {
59
60
  txn_id_ = GenTxnID();

61
  txn_state_ = STARTED;
62

Manuel Ung's avatar
Manuel Ung committed
63
64
  deadlock_detect_ = txn_options.deadlock_detect;
  deadlock_detect_depth_ = txn_options.deadlock_detect_depth;
65
  write_batch_.SetMaxBytes(txn_options.max_write_batch_size);
66
  skip_concurrency_control_ = txn_options.skip_concurrency_control;
Manuel Ung's avatar
Manuel Ung committed
67

68
  lock_timeout_ = txn_options.lock_timeout * 1000;
agiardullo's avatar
agiardullo committed
69
70
  if (lock_timeout_ < 0) {
    // Lock timeout not set, use default
agiardullo's avatar
agiardullo committed
71
72
    lock_timeout_ =
        txn_db_impl_->GetTxnDBOptions().transaction_lock_timeout * 1000;
agiardullo's avatar
agiardullo committed
73
74
  }

75
76
77
78
79
80
  if (txn_options.expiration >= 0) {
    expiration_time_ = start_time_ + txn_options.expiration * 1000;
  } else {
    expiration_time_ = 0;
  }

agiardullo's avatar
agiardullo committed
81
82
83
  if (txn_options.set_snapshot) {
    SetSnapshot();
  }
84

85
86
87
  if (expiration_time_ > 0) {
    txn_db_impl_->InsertExpirableTransaction(txn_id_, this);
  }
88
89
  use_only_the_last_commit_time_batch_for_recovery_ =
      txn_options.use_only_the_last_commit_time_batch_for_recovery;
90
  skip_prepare_ = txn_options.skip_prepare;
agiardullo's avatar
agiardullo committed
91
92
}

93
// Releases everything this transaction still holds in the transaction DB:
// its tracked key locks, its expirable-transaction entry (if it had a
// deadline), and its name registration (if named and not committed).
PessimisticTransaction::~PessimisticTransaction() {
  const auto& tracked = GetTrackedKeys();
  txn_db_impl_->UnLock(this, &tracked);

  if (expiration_time_ > 0) {
    txn_db_impl_->RemoveExpirableTransaction(txn_id_);
  }

  // Nothing to unregister for unnamed or already committed transactions.
  if (name_.empty() || txn_state_ == COMMITTED) {
    return;
  }
  txn_db_impl_->UnregisterTransaction(this);
}

103
void PessimisticTransaction::Clear() {
104
  txn_db_impl_->UnLock(this, &GetTrackedKeys());
agiardullo's avatar
agiardullo committed
105
  TransactionBaseImpl::Clear();
agiardullo's avatar
agiardullo committed
106
107
}

108
109
110
// Recycles this object for a new transaction: drops the old name
// registration (if named and not committed), reinitializes the base class
// against the root DB, and re-applies `txn_options` via Initialize().
void PessimisticTransaction::Reinitialize(
    TransactionDB* txn_db, const WriteOptions& write_options,
    const TransactionOptions& txn_options) {
  const bool is_named = !name_.empty();
  if (is_named && txn_state_ != COMMITTED) {
    txn_db_impl_->UnregisterTransaction(this);
  }

  TransactionBaseImpl::Reinitialize(txn_db->GetRootDB(), write_options);
  Initialize(txn_options);
}

118
bool PessimisticTransaction::IsExpired() const {
agiardullo's avatar
agiardullo committed
119
  if (expiration_time_ > 0) {
agiardullo's avatar
agiardullo committed
120
    if (db_->GetEnv()->NowMicros() >= expiration_time_) {
agiardullo's avatar
agiardullo committed
121
122
123
124
125
126
127
128
      // Transaction is expired.
      return true;
    }
  }

  return false;
}

129
130
131
// Forwards all arguments to the PessimisticTransaction constructor; this
// class adds no construction-time state of its own here.
WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db,
                                     const WriteOptions& write_options,
                                     const TransactionOptions& txn_options)
    : PessimisticTransaction(txn_db, write_options, txn_options) {}
Maysam Yabandeh's avatar
Maysam Yabandeh committed
133

134
// Locks every key in `batch`, writes the batch through
// CommitBatchInternal(), and releases the locks taken here.
//
// Returns:
//   - the LockBatch() error if the keys could not be locked,
//   - Status::Expired() if the transaction has expired or had its locks
//     stolen,
//   - Status::InvalidArgument() if the transaction is not in STARTED state,
//   - otherwise the status of the write.
Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {
  TransactionKeyMap keys_to_unlock;
  Status s = LockBatch(batch, &keys_to_unlock);

  if (!s.ok()) {
    // LockBatch() has already released whatever it managed to lock.
    return s;
  }

  bool can_commit = false;

  if (IsExpired()) {
    s = Status::Expired();
  } else if (expiration_time_ > 0) {
    // Another thread may change our state (lock stealing), so the transition
    // out of STARTED has to be atomic.
    TransactionState expected = STARTED;
    can_commit = std::atomic_compare_exchange_strong(&txn_state_, &expected,
                                                     AWAITING_COMMIT);
  } else if (txn_state_ == STARTED) {
    // lock stealing is not a concern
    can_commit = true;
  }

  if (can_commit) {
    txn_state_.store(AWAITING_COMMIT);
    s = CommitBatchInternal(batch);
    if (s.ok()) {
      txn_state_.store(COMMITTED);
    }
  } else if (s.ok()) {
    // Only classify the failure when none was recorded above; otherwise the
    // Expired status set for an expired transaction would be overwritten
    // with InvalidArgument.
    if (txn_state_ == LOCKS_STOLEN) {
      s = Status::Expired();
    } else {
      s = Status::InvalidArgument("Transaction is not in state for commit.");
    }
  }

  txn_db_impl_->UnLock(this, &keys_to_unlock);

  return s;
}

172
// Moves a named transaction from STARTED to PREPARED by running
// PrepareInternal().
//
// Returns:
//   - Status::InvalidArgument() if the transaction has no name or is in a
//     state from which it cannot be prepared,
//   - Status::Expired() if the transaction has expired or had its locks
//     stolen,
//   - otherwise the status of PrepareInternal().
Status PessimisticTransaction::Prepare() {
  Status s;

  if (name_.empty()) {
    return Status::InvalidArgument(
        "Cannot prepare a transaction that has not been named.");
  }

  if (IsExpired()) {
    return Status::Expired();
  }

  bool can_prepare = false;

  if (expiration_time_ > 0) {
    // must concern ourselves with expiration and/or lock stealing
    // need to compare/exchange bc locks could be stolen under us here
    TransactionState expected = STARTED;
    can_prepare = std::atomic_compare_exchange_strong(&txn_state_, &expected,
                                                      AWAITING_PREPARE);
  } else if (txn_state_ == STARTED) {
    // expiration and lock stealing is not possible
    txn_state_.store(AWAITING_PREPARE);
    can_prepare = true;
  }

  if (can_prepare) {
    // transaction can't expire after preparation
    expiration_time_ = 0;
    assert(log_number_ == 0 ||
           txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED);

    s = PrepareInternal();
    if (s.ok()) {
      txn_state_.store(PREPARED);
    }
  } else if (txn_state_ == LOCKS_STOLEN) {
    s = Status::Expired();
  } else if (txn_state_ == PREPARED) {
    s = Status::InvalidArgument("Transaction has already been prepared.");
  } else if (txn_state_ == COMMITTED) {
    s = Status::InvalidArgument("Transaction has already been committed.");
  } else if (txn_state_ == ROLLEDBACK) {
    s = Status::InvalidArgument("Transaction has already been rolledback.");
  } else {
    // This is the prepare path, so the message names prepare, not commit.
    s = Status::InvalidArgument("Transaction is not in state for prepare.");
  }

  return s;
}

223
224
225
226
// Writes this transaction's batch as a prepare section.
//
// The batch is terminated with an EndPrepare marker carrying the transaction
// name and written with the memtable disabled. The WAL is forced on for this
// write regardless of the transaction's own write options. The log number
// reported by the write is stored in log_number_, and a pre-release callback
// marks that log as containing a prepare section.
Status WriteCommittedTxn::PrepareInternal() {
  WriteOptions prepare_write_options = write_options_;
  prepare_write_options.disableWAL = false;

  WriteBatch* const prepare_batch = GetWriteBatch()->GetWriteBatch();
  WriteBatchInternal::MarkEndPrepare(prepare_batch, name_);

  // Invoked by the write path before the write is released; records which
  // log file now holds this transaction's prepare section.
  class MarkLogCallback : public PreReleaseCallback {
   public:
    MarkLogCallback(DBImpl* db, bool two_write_queues)
        : db_(db), two_write_queues_(two_write_queues) {
      (void)two_write_queues_;  // to silence unused private field warning
    }
    virtual Status Callback(SequenceNumber, bool is_mem_disabled,
                            uint64_t log_number, size_t /*index*/,
                            size_t /*total*/) override {
#ifdef NDEBUG
      (void)is_mem_disabled;
#endif
      assert(log_number != 0);
      assert(!two_write_queues_ || is_mem_disabled);  // implies the 2nd queue
      db_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(log_number);
      return Status::OK();
    }

   private:
    DBImpl* db_;
    bool two_write_queues_;
  };
  MarkLogCallback mark_log_callback(
      db_impl_, db_impl_->immutable_db_options().two_write_queues);

  // Named stand-ins for the positional WriteImpl() arguments.
  WriteCallback* const kNoCallback = nullptr;
  const uint64_t kNoLogRef = 0;
  const bool kSkipMemtable = true;
  SequenceNumber* const kNoSeqUsed = nullptr;
  const size_t kZeroBatchCount = 0;

  return db_impl_->WriteImpl(prepare_write_options, prepare_batch, kNoCallback,
                             &log_number_, kNoLogRef, kSkipMemtable,
                             kNoSeqUsed, kZeroBatchCount, &mark_log_callback);
}

// Commits the transaction, either directly from STARTED (without a prepare
// phase) or from PREPARED.
//
// Returns:
//   - Status::Expired() if the transaction has expired or had its locks
//     stolen,
//   - Status::TxnNotPrepared() if it is STARTED, has no expiration, and
//     skip_prepare_ is false,
//   - Status::InvalidArgument() if the commit-time batch is non-empty on a
//     commit without prepare, or if the state does not allow a commit,
//   - otherwise the status of the underlying write.
Status PessimisticTransaction::Commit() {
  if (IsExpired()) {
    return Status::Expired();
  }

  bool commit_directly = false;       // commit without a prepare phase
  bool commit_after_prepare = false;  // commit a PREPARED transaction

  if (expiration_time_ > 0) {
    // With an expiration set, another thread may change our state underneath
    // us if we expire and our locks get stolen, so the transition must be an
    // atomic compare-and-exchange. STARTED is the only valid source state
    // here because a PREPARED transaction has its expiration_time_ cleared.
    TransactionState expected = STARTED;
    commit_directly =
        txn_state_.compare_exchange_strong(expected, AWAITING_COMMIT);
    TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1");
  } else if (txn_state_ == PREPARED) {
    // No expiration, hence no lock stealing to worry about.
    commit_after_prepare = true;
  } else if (txn_state_ == STARTED) {
    // No expiration, hence no lock stealing to worry about.
    if (!skip_prepare_) {
      return Status::TxnNotPrepared();
    }
    commit_directly = true;
  }

  if (commit_directly) {
    assert(!commit_after_prepare);
    if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) {
      return Status::InvalidArgument(
          "Commit-time batch contains values that will not be committed.");
    }

    txn_state_.store(AWAITING_COMMIT);
    if (log_number_ > 0) {
      dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
          log_number_);
    }

    Status write_status = CommitWithoutPrepareInternal();
    if (!name_.empty()) {
      txn_db_impl_->UnregisterTransaction(this);
    }
    // Locks and tracked state are released whether or not the write worked.
    Clear();
    if (write_status.ok()) {
      txn_state_.store(COMMITTED);
    }
    return write_status;
  }

  if (commit_after_prepare) {
    txn_state_.store(AWAITING_COMMIT);

    Status write_status = CommitInternal();
    if (!write_status.ok()) {
      ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                     "Commit write failed");
      return write_status;
    }

    // From here on FindObsoleteFiles must consult the memtables, not the
    // prep section heap, to decide which prep logs have to be kept.
    assert(log_number_ > 0);
    dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
        log_number_);
    txn_db_impl_->UnregisterTransaction(this);

    Clear();
    txn_state_.store(COMMITTED);
    return write_status;
  }

  // No commit was attempted; report why based on the current state.
  if (txn_state_ == LOCKS_STOLEN) {
    return Status::Expired();
  }
  if (txn_state_ == COMMITTED) {
    return Status::InvalidArgument("Transaction has already been committed.");
  }
  if (txn_state_ == ROLLEDBACK) {
    return Status::InvalidArgument("Transaction has already been rolledback.");
  }
  return Status::InvalidArgument("Transaction is not in state for commit.");
}

348
// Writes the transaction's batch directly to the DB (memtable enabled, no
// callback, no log reference). On success, the sequence number reported by
// the write becomes this transaction's ID.
Status WriteCommittedTxn::CommitWithoutPrepareInternal() {
  uint64_t commit_seq = kMaxSequenceNumber;
  WriteBatch* const txn_batch = GetWriteBatch()->GetWriteBatch();

  Status write_status = db_impl_->WriteImpl(
      write_options_, txn_batch, /*callback*/ nullptr, /*log_used*/ nullptr,
      /*log_ref*/ 0, /*disable_memtable*/ false, &commit_seq);
  if (!write_status.ok()) {
    return write_status;
  }

  // A successful write must have reported the sequence number it used.
  assert(commit_seq != kMaxSequenceNumber);
  SetId(commit_seq);
  return write_status;
}

361
// Writes the caller-supplied `batch` directly to the DB (memtable enabled, no
// callback, no log reference). On success, the sequence number reported by
// the write becomes this transaction's ID. The second parameter is unused.
Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch, size_t) {
  uint64_t commit_seq = kMaxSequenceNumber;

  Status write_status = db_impl_->WriteImpl(
      write_options_, batch, /*callback*/ nullptr, /*log_used*/ nullptr,
      /*log_ref*/ 0, /*disable_memtable*/ false, &commit_seq);
  if (!write_status.ok()) {
    return write_status;
  }

  // A successful write must have reported the sequence number it used.
  assert(commit_seq != kMaxSequenceNumber);
  SetId(commit_seq);
  return write_status;
}

373
374
375
376
377
378
379
380
381
382
383
384
385
386
// Commits a previously prepared transaction.
//
// The commit-time batch gets a Commit marker and a WAL termination point,
// then the prepared batch is appended after that point so it is inserted into
// the memtable without being written to the WAL again. On success, the
// sequence number reported by the write becomes this transaction's ID.
Status WriteCommittedTxn::CommitInternal() {
  // Start from the commit-time batch and add the Commit marker; the memtable
  // ignores that marker outside of recovery.
  WriteBatch* const commit_batch = GetCommitTimeWriteBatch();
  WriteBatchInternal::MarkCommit(commit_batch, name_);

  // Anything appended past this point is left out of the WAL.
  commit_batch->MarkWalTerminationPoint();

  // The prepared data goes into the memtable only. BeginPrepare/EndPrepare
  // markers are ignored outside of recovery, so just the values are inserted.
  WriteBatchInternal::Append(commit_batch, GetWriteBatch()->GetWriteBatch());

  uint64_t commit_seq = kMaxSequenceNumber;
  Status write_status = db_impl_->WriteImpl(
      write_options_, commit_batch, /*callback*/ nullptr,
      /*log_used*/ nullptr, /*log_ref*/ log_number_,
      /*disable_memtable*/ false, &commit_seq);
  if (!write_status.ok()) {
    return write_status;
  }

  // A successful write must have reported the sequence number it used.
  assert(commit_seq != kMaxSequenceNumber);
  SetId(commit_seq);
  return write_status;
}

Maysam Yabandeh's avatar
Maysam Yabandeh committed
399
// Rolls the transaction back from the PREPARED or STARTED state.
//
// Returns Status::InvalidArgument() for any other state, otherwise the
// status of RollbackInternal() (OK when no rollback write was needed).
Status PessimisticTransaction::Rollback() {
  if (txn_state_ == PREPARED) {
    txn_state_.store(AWAITING_ROLLBACK);

    Status rollback_status = RollbackInternal();
    if (rollback_status.ok()) {
      // The prepared section no longer needs to be kept around.
      assert(log_number_ > 0);
      dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
          log_number_);
      Clear();
      txn_state_.store(ROLLEDBACK);
    }
    return rollback_status;
  }

  if (txn_state_ == STARTED) {
    Status rollback_status;
    if (log_number_ > 0) {
      // A STARTED transaction that already has a log number is only expected
      // under the WRITE_UNPREPARED policy.
      assert(txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED);
      assert(GetId() > 0);
      rollback_status = RollbackInternal();

      if (rollback_status.ok()) {
        dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
            log_number_);
      }
    }
    // Prepare could not have happened; just drop locks and tracked state.
    Clear();
    return rollback_status;
  }

  if (txn_state_ == COMMITTED) {
    return Status::InvalidArgument(
        "This transaction has already been committed.");
  }
  return Status::InvalidArgument(
      "Two phase transaction is not in state for rollback.");
}
Maysam Yabandeh's avatar
Maysam Yabandeh committed
436
437
438
439
440
441
442

// Writes a batch containing only a Rollback marker for this transaction's
// name and returns the status of that write.
Status WriteCommittedTxn::RollbackInternal() {
  WriteBatch marker_batch;
  WriteBatchInternal::MarkRollback(&marker_batch, name_);
  return db_impl_->WriteImpl(write_options_, &marker_batch);
}
agiardullo's avatar
agiardullo committed
443

444
// Rolls back to the most recent savepoint. Only allowed while the
// transaction is still STARTED; otherwise returns InvalidArgument.
Status PessimisticTransaction::RollbackToSavePoint() {
  if (txn_state_ != STARTED) {
    return Status::InvalidArgument("Transaction is beyond state for rollback.");
  }

  // Release the locks on keys tracked since the savepoint, if any, before
  // the base class rolls back.
  const auto& keys_since_savepoint = GetTrackedKeysSinceSavePoint();
  if (keys_since_savepoint != nullptr) {
    txn_db_impl_->UnLock(this, keys_since_savepoint.get());
  }

  return TransactionBaseImpl::RollbackToSavePoint();
}

agiardullo's avatar
agiardullo committed
460
461
// Lock all keys in this batch.
// On success, caller should unlock keys_to_unlock
462
Status PessimisticTransaction::LockBatch(WriteBatch* batch,
463
                                         TransactionKeyMap* keys_to_unlock) {
agiardullo's avatar
agiardullo committed
464
465
466
467
468
469
470
471
472
473
474
475
476
  class Handler : public WriteBatch::Handler {
   public:
    // Sorted map of column_family_id to sorted set of keys.
    // Since LockBatch() always locks keys in sorted order, it cannot deadlock
    // with itself.  We're not using a comparator here since it doesn't matter
    // what the sorting is as long as it's consistent.
    std::map<uint32_t, std::set<std::string>> keys_;

    Handler() {}

    void RecordKey(uint32_t column_family_id, const Slice& key) {
      std::string key_str = key.ToString();

477
478
479
      auto& cfh_keys = keys_[column_family_id];
      auto iter = cfh_keys.find(key_str);
      if (iter == cfh_keys.end()) {
agiardullo's avatar
agiardullo committed
480
        // key not yet seen, store it.
481
        cfh_keys.insert({std::move(key_str)});
agiardullo's avatar
agiardullo committed
482
483
484
      }
    }

485
486
    Status PutCF(uint32_t column_family_id, const Slice& key,
                 const Slice& /* unused */) override {
agiardullo's avatar
agiardullo committed
487
488
489
      RecordKey(column_family_id, key);
      return Status::OK();
    }
490
491
    Status MergeCF(uint32_t column_family_id, const Slice& key,
                   const Slice& /* unused */) override {
agiardullo's avatar
agiardullo committed
492
493
494
      RecordKey(column_family_id, key);
      return Status::OK();
    }
495
    Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
agiardullo's avatar
agiardullo committed
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
      RecordKey(column_family_id, key);
      return Status::OK();
    }
  };

  // Iterating on this handler will add all keys in this batch into keys
  Handler handler;
  batch->Iterate(&handler);

  Status s;

  // Attempt to lock all keys
  for (const auto& cf_iter : handler.keys_) {
    uint32_t cfh_id = cf_iter.first;
    auto& cfh_keys = cf_iter.second;

    for (const auto& key_iter : cfh_keys) {
      const std::string& key = key_iter;

Manuel Ung's avatar
Manuel Ung committed
515
      s = txn_db_impl_->TryLock(this, cfh_id, key, true /* exclusive */);
agiardullo's avatar
agiardullo committed
516
517
518
      if (!s.ok()) {
        break;
      }
agiardullo's avatar
agiardullo committed
519
      TrackKey(keys_to_unlock, cfh_id, std::move(key), kMaxSequenceNumber,
Manuel Ung's avatar
Manuel Ung committed
520
               false, true /* exclusive */);
agiardullo's avatar
agiardullo committed
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
    }

    if (!s.ok()) {
      break;
    }
  }

  if (!s.ok()) {
    txn_db_impl_->UnLock(this, keys_to_unlock);
  }

  return s;
}

// Attempt to lock this key.
// Returns OK if the key has been successfully locked.  Non-ok, otherwise.
// If check_shapshot is true and this transaction has a snapshot set,
// this key will only be locked if there have been no writes to this key since
// the snapshot time.
540
Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
541
                                       const Slice& key, bool read_only,
542
543
544
                                       bool exclusive, const bool do_validate,
                                       const bool assume_tracked) {
  assert(!assume_tracked || !do_validate);
545
546
547
548
  Status s;
  if (UNLIKELY(skip_concurrency_control_)) {
    return s;
  }
agiardullo's avatar
agiardullo committed
549
550
551
  uint32_t cfh_id = GetColumnFamilyID(column_family);
  std::string key_str = key.ToString();
  bool previously_locked;
Manuel Ung's avatar
Manuel Ung committed
552
  bool lock_upgrade = false;
agiardullo's avatar
agiardullo committed
553

554
  // lock this key if this transactions hasn't already locked it
555
  SequenceNumber tracked_at_seq = kMaxSequenceNumber;
556
557
558
559

  const auto& tracked_keys = GetTrackedKeys();
  const auto tracked_keys_cf = tracked_keys.find(cfh_id);
  if (tracked_keys_cf == tracked_keys.end()) {
agiardullo's avatar
agiardullo committed
560
    previously_locked = false;
561
562
563
564
565
  } else {
    auto iter = tracked_keys_cf->second.find(key_str);
    if (iter == tracked_keys_cf->second.end()) {
      previously_locked = false;
    } else {
Manuel Ung's avatar
Manuel Ung committed
566
567
568
      if (!iter->second.exclusive && exclusive) {
        lock_upgrade = true;
      }
569
      previously_locked = true;
570
      tracked_at_seq = iter->second.seq;
571
572
    }
  }
agiardullo's avatar
agiardullo committed
573

Manuel Ung's avatar
Manuel Ung committed
574
575
576
  // Lock this key if this transactions hasn't already locked it or we require
  // an upgrade.
  if (!previously_locked || lock_upgrade) {
Manuel Ung's avatar
Manuel Ung committed
577
    s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive);
agiardullo's avatar
agiardullo committed
578
579
  }

580
581
582
583
584
585
586
587
  SetSnapshotIfNeeded();

  // Even though we do not care about doing conflict checking for this write,
  // we still need to take a lock to make sure we do not cause a conflict with
  // some other write.  However, we do not need to check if there have been
  // any writes since this transaction's snapshot.
  // TODO(agiardullo): could optimize by supporting shared txn locks in the
  // future
588
589
590
591
592
  if (!do_validate || snapshot_ == nullptr) {
    if (assume_tracked && !previously_locked) {
      s = Status::InvalidArgument(
          "assume_tracked is set but it is not tracked yet");
    }
593
594
595
596
    // Need to remember the earliest sequence number that we know that this
    // key has not been modified after.  This is useful if this same
    // transaction
    // later tries to lock this key again.
597
    if (tracked_at_seq == kMaxSequenceNumber) {
598
599
      // Since we haven't checked a snapshot, we only know this key has not
      // been modified since after we locked it.
600
      // Note: when last_seq_same_as_publish_seq_==false this is less than the
601
602
603
604
605
      // latest allocated seq but it is ok since i) this is just a heuristic
      // used only as a hint to avoid actual check for conflicts, ii) this would
      // cause a false positive only if the snapthot is taken right after the
      // lock, which would be an unusual sequence.
      tracked_at_seq = db_->GetLatestSequenceNumber();
606
607
    }
  } else {
agiardullo's avatar
agiardullo committed
608
609
    // If a snapshot is set, we need to make sure the key hasn't been modified
    // since the snapshot.  This must be done after we locked the key.
610
611
    // If we already have validated an earilier snapshot it must has been
    // reflected in tracked_at_seq and ValidateSnapshot will return OK.
612
    if (s.ok()) {
613
      s = ValidateSnapshot(column_family, key, &tracked_at_seq);
614
615
616
617
618

      if (!s.ok()) {
        // Failed to validate key
        if (!previously_locked) {
          // Unlock key we just locked
Manuel Ung's avatar
Manuel Ung committed
619
620
621
622
623
624
625
          if (lock_upgrade) {
            s = txn_db_impl_->TryLock(this, cfh_id, key_str,
                                      false /* exclusive */);
            assert(s.ok());
          } else {
            txn_db_impl_->UnLock(this, cfh_id, key.ToString());
          }
agiardullo's avatar
agiardullo committed
626
627
628
629
630
        }
      }
    }
  }

631
  if (s.ok()) {
632
633
    // We must track all the locked keys so that we can unlock them later. If
    // the key is already locked, this func will update some stats on the
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
    // tracked key. It could also update the tracked_at_seq if it is lower
    // than the existing tracked key seq. These stats are necessary for
    // RollbackToSavePoint to determine whether a key can be safely removed
    // from tracked_keys_. Removal can only be done if a key was only locked
    // during the current savepoint.
    //
    // Recall that if assume_tracked is true, we assume that TrackKey has been
    // called previously since the last savepoint, with the same exclusive
    // setting, and at a lower sequence number, so skipping here should be
    // safe.
    if (!assume_tracked) {
      TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive);
    } else {
#ifndef NDEBUG
      assert(tracked_keys_cf->second.count(key_str) > 0);
      const auto& info = tracked_keys_cf->second.find(key_str)->second;
      assert(info.seq <= tracked_at_seq);
      assert(info.exclusive == exclusive);
#endif
    }
654
655
  }

agiardullo's avatar
agiardullo committed
656
657
658
659
660
  return s;
}

// Return OK() if this key has not been modified more recently than the
// transaction snapshot_.
661
662
// tracked_at_seq is the global seq at which we either locked the key or already
// have done ValidateSnapshot.
663
664
Status PessimisticTransaction::ValidateSnapshot(
    ColumnFamilyHandle* column_family, const Slice& key,
665
    SequenceNumber* tracked_at_seq) {
666
667
  assert(snapshot_);

668
669
670
671
672
  SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
  if (*tracked_at_seq <= snap_seq) {
    // If the key has been previous validated (or locked) at a sequence number
    // earlier than the current snapshot's sequence number, we already know it
    // has not been modified aftter snap_seq either.
673
674
    return Status::OK();
  }
675
676
677
  // Otherwise we have either
  // 1: tracked_at_seq == kMaxSequenceNumber, i.e., first time tracking the key
  // 2: snap_seq < tracked_at_seq: last time we lock the key was via
678
  // do_validate=false which means we had skipped ValidateSnapshot. In both
679
  // cases we should do ValidateSnapshot now.
agiardullo's avatar
agiardullo committed
680

681
  *tracked_at_seq = snap_seq;
agiardullo's avatar
agiardullo committed
682

683
  ColumnFamilyHandle* cfh =
Maysam Yabandeh's avatar
Maysam Yabandeh committed
684
      column_family ? column_family : db_impl_->DefaultColumnFamily();
agiardullo's avatar
agiardullo committed
685

686
687
  return TransactionUtil::CheckKeyForConflicts(
      db_impl_, cfh, key.ToString(), snap_seq, false /* cache_only */);
agiardullo's avatar
agiardullo committed
688
689
}

690
bool PessimisticTransaction::TryStealingLocks() {
691
  assert(IsExpired());
692
693
  TransactionState expected = STARTED;
  return std::atomic_compare_exchange_strong(&txn_state_, &expected,
694
695
696
                                             LOCKS_STOLEN);
}

697
698
// Releases this transaction's lock on `key` in the given column family.
void PessimisticTransaction::UnlockGetForUpdate(
    ColumnFamilyHandle* column_family, const Slice& key) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  txn_db_impl_->UnLock(this, cf_id, key.ToString());
}

702
// Assigns `name` to this transaction and registers it with the transaction
// DB.
//
// Returns InvalidArgument if the transaction is no longer STARTED, already
// has a name, the name is already in use by another transaction, or the name
// length is outside [1, 512].
Status PessimisticTransaction::SetName(const TransactionName& name) {
  if (txn_state_ != STARTED) {
    return Status::InvalidArgument("Transaction is beyond state for naming.");
  }
  if (!name_.empty()) {
    return Status::InvalidArgument("Transaction has already been named.");
  }
  if (txn_db_impl_->GetTransactionByName(name) != nullptr) {
    return Status::InvalidArgument("Transaction name must be unique.");
  }
  if (name.length() < 1 || name.length() > 512) {
    return Status::InvalidArgument(
        "Transaction name length must be between 1 and 512 chars.");
  }

  name_ = name;
  txn_db_impl_->RegisterTransaction(this);
  return Status::OK();
}

722
}  // namespace ROCKSDB_NAMESPACE
agiardullo's avatar
agiardullo committed
723
724

#endif  // ROCKSDB_LITE