Skip to content

Commit cd8c306

Browse files
authored
Better handling for mediator time jumps in datashard (#2342)
1 parent 2712745 commit cd8c306

File tree

2 files changed

+64
-55
lines changed

2 files changed

+64
-55
lines changed

ydb/core/tx/datashard/datashard.cpp

Lines changed: 60 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -2050,68 +2050,74 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const
20502050
<< " ImmediateWriteEdge# " << SnapshotManager.GetImmediateWriteEdge()
20512051
<< " ImmediateWriteEdgeReplied# " << SnapshotManager.GetImmediateWriteEdgeReplied());
20522052

2053-
TRowVersion edge;
2054-
TRowVersion readEdge = Max(
2055-
SnapshotManager.GetCompleteEdge(),
2056-
SnapshotManager.GetUnprotectedReadEdge());
2057-
TRowVersion writeEdge = Max(readEdge, SnapshotManager.GetIncompleteEdge());
2058-
switch (mode) {
2059-
case EMvccTxMode::ReadOnly:
2060-
// With read-only transactions we don't need reads to include
2061-
// changes made at the incomplete edge, as that is a point where
2062-
// distributed transactions performed some reads, not writes.
2063-
// Since incomplete transactions are still inflight, the actual
2064-
// version will stick to the first incomplete transaction is queue,
2065-
// effectively reading non-repeatable state before that transaction.
2066-
edge = readEdge;
2067-
break;
2068-
case EMvccTxMode::ReadWrite:
2069-
// With read-write transactions we must choose a point that is
2070-
// greater than both complete and incomplete edges. The reason
2071-
// is that incomplete transactions performed some reads at that
2072-
// point and these snapshot points must be repeatable.
2073-
// Note that as soon as the first write past the IncompleteEdge
2074-
// happens it cements all distributed transactions up to that point
2075-
// as complete, so all future reads and writes are guaranteed to
2076-
// include that point as well.
2077-
edge = writeEdge;
2078-
break;
2079-
}
2053+
TRowVersion version = [&]() {
2054+
TRowVersion edge;
2055+
TRowVersion readEdge = Max(
2056+
SnapshotManager.GetCompleteEdge(),
2057+
SnapshotManager.GetUnprotectedReadEdge());
2058+
TRowVersion writeEdge = Max(readEdge, SnapshotManager.GetIncompleteEdge());
2059+
switch (mode) {
2060+
case EMvccTxMode::ReadOnly:
2061+
// With read-only transactions we don't need reads to include
2062+
// changes made at the incomplete edge, as that is a point where
2063+
// distributed transactions performed some reads, not writes.
2064+
// Since incomplete transactions are still inflight, the actual
2065+
// version will stick to the first incomplete transaction in queue,
2066+
// effectively reading non-repeatable state before that transaction.
2067+
edge = readEdge;
2068+
break;
2069+
case EMvccTxMode::ReadWrite:
2070+
// With read-write transactions we must choose a point that is
2071+
// greater than both complete and incomplete edges. The reason
2072+
// is that incomplete transactions performed some reads at that
2073+
// point and these snapshot points must be repeatable.
2074+
// Note that as soon as the first write past the IncompleteEdge
2075+
// happens it cements all distributed transactions up to that point
2076+
// as complete, so all future reads and writes are guaranteed to
2077+
// include that point as well.
2078+
edge = writeEdge;
2079+
break;
2080+
}
20802081

2081-
// If there's any planned operation that is above our edge, it would be a
2082-
// suitable version for a new immediate operation. We effectively try to
2083-
// execute "before" that point if possible.
2084-
if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId))
2085-
return TRowVersion(nextOp->GetStep(), nextOp->GetTxId());
2082+
// If there's any planned operation that is above our edge, it would be a
2083+
// suitable version for a new immediate operation. We effectively try to
2084+
// execute "before" that point if possible.
2085+
if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId))
2086+
return TRowVersion(nextOp->GetStep(), nextOp->GetTxId());
2087+
2088+
// Normally we stick transactions to the end of the last known mediator step
2089+
// Note these calculations only happen when we don't have distributed
2090+
// transactions left in queue, and we won't have any more transactions
2091+
// up to the current mediator time. The mediator time itself may be stale,
2092+
// in which case we may have evidence of its higher value via complete and
2093+
// incomplete edges above.
2094+
const ui64 mediatorStep = Max(MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0, writeEdge.Step);
2095+
TRowVersion mediatorEdge(mediatorStep, ::Max<ui64>());
2096+
2097+
switch (mode) {
2098+
case EMvccTxMode::ReadOnly: {
2099+
// We read at the end of the current step
2100+
return mediatorEdge;
2101+
}
2102+
2103+
case EMvccTxMode::ReadWrite: {
2104+
// We write at the end of the current step, or the start of the next step when that's protected
2105+
return Max(mediatorEdge, writeEdge.Next());
2106+
}
2107+
}
20862108

2087-
// Normally we stick transactions to the end of the last known mediator step
2088-
// Note this calculations only happen when we don't have distributed
2089-
// transactions left in queue, and we won't have any more transactions
2090-
// up to the current mediator time. The mediator time itself may be stale,
2091-
// in which case we may have evidence of its higher value via complete and
2092-
// incomplete edges above.
2093-
const ui64 mediatorStep = Max(MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0, writeEdge.Step);
2094-
TRowVersion mediatorEdge(mediatorStep, ::Max<ui64>());
2109+
Y_ABORT("unreachable");
2110+
}();
20952111

20962112
switch (mode) {
20972113
case EMvccTxMode::ReadOnly: {
2098-
// We want to include everything that was potentially confirmed to
2099-
// users, but we don't want to include anything that is not replied
2100-
// at the start of this read.
2101-
// Note it's only possible to have ImmediateWriteEdge > mediatorEdge
2102-
// when ImmediateWriteEdge == mediatorEdge + 1
2103-
return Max(mediatorEdge, SnapshotManager.GetImmediateWriteEdgeReplied());
2114+
// We must read all writes we have replied to already
2115+
return Max(version, SnapshotManager.GetImmediateWriteEdgeReplied());
21042116
}
21052117

21062118
case EMvccTxMode::ReadWrite: {
2107-
// We must use at least a previously used immediate write edge
2108-
// But we must also avoid trumpling over any unprotected mvcc
2109-
// snapshot reads that have occurred.
2110-
// Note it's only possible to go past the last known mediator step
2111-
// when we had an unprotected read, which itself happens at the
2112-
// last mediator step. So we may only ever have a +1 step, never
2113-
// anything more.
2114-
return Max(mediatorEdge, writeEdge.Next(), SnapshotManager.GetImmediateWriteEdge());
2119+
// We must never go backwards in our single-shard writes
2120+
return Max(version, SnapshotManager.GetImmediateWriteEdge());
21152121
}
21162122
}
21172123

ydb/core/tx/time_cast/time_cast.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ void TMediatorTimecastEntry::Update(ui64 step, ui64 *exemption, ui64 exsz) {
3030
Y_UNUSED(exemption);
3131
Y_UNUSED(exsz);
3232

33-
AtomicSet(Step, step);
33+
// Mediator time shouldn't go back while shards are running
34+
if (Get(0) < step) {
35+
AtomicSet(Step, step);
36+
}
3437
}
3538

3639
class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {

0 commit comments

Comments
 (0)