Skip to content

Commit 56bd581

Browse files
committed
Better handling for mediator time jumps in datashard (#2342)
1 parent 26fd3f5 commit 56bd581

File tree

2 files changed

+64
-55
lines changed

2 files changed

+64
-55
lines changed

ydb/core/tx/datashard/datashard.cpp

Lines changed: 60 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1983,68 +1983,74 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const
19831983
<< " ImmediateWriteEdge# " << SnapshotManager.GetImmediateWriteEdge()
19841984
<< " ImmediateWriteEdgeReplied# " << SnapshotManager.GetImmediateWriteEdgeReplied());
19851985

1986-
TRowVersion edge;
1987-
TRowVersion readEdge = Max(
1988-
SnapshotManager.GetCompleteEdge(),
1989-
SnapshotManager.GetUnprotectedReadEdge());
1990-
TRowVersion writeEdge = Max(readEdge, SnapshotManager.GetIncompleteEdge());
1991-
switch (mode) {
1992-
case EMvccTxMode::ReadOnly:
1993-
// With read-only transactions we don't need reads to include
1994-
// changes made at the incomplete edge, as that is a point where
1995-
// distributed transactions performed some reads, not writes.
1996-
// Since incomplete transactions are still inflight, the actual
1997-
// version will stick to the first incomplete transaction in queue,
1998-
// effectively reading non-repeatable state before that transaction.
1999-
edge = readEdge;
2000-
break;
2001-
case EMvccTxMode::ReadWrite:
2002-
// With read-write transactions we must choose a point that is
2003-
// greater than both complete and incomplete edges. The reason
2004-
// is that incomplete transactions performed some reads at that
2005-
// point and these snapshot points must be repeatable.
2006-
// Note that as soon as the first write past the IncompleteEdge
2007-
// happens it cements all distributed transactions up to that point
2008-
// as complete, so all future reads and writes are guaranteed to
2009-
// include that point as well.
2010-
edge = writeEdge;
2011-
break;
2012-
}
1986+
TRowVersion version = [&]() {
1987+
TRowVersion edge;
1988+
TRowVersion readEdge = Max(
1989+
SnapshotManager.GetCompleteEdge(),
1990+
SnapshotManager.GetUnprotectedReadEdge());
1991+
TRowVersion writeEdge = Max(readEdge, SnapshotManager.GetIncompleteEdge());
1992+
switch (mode) {
1993+
case EMvccTxMode::ReadOnly:
1994+
// With read-only transactions we don't need reads to include
1995+
// changes made at the incomplete edge, as that is a point where
1996+
// distributed transactions performed some reads, not writes.
1997+
// Since incomplete transactions are still inflight, the actual
1998+
// version will stick to the first incomplete transaction in queue,
1999+
// effectively reading non-repeatable state before that transaction.
2000+
edge = readEdge;
2001+
break;
2002+
case EMvccTxMode::ReadWrite:
2003+
// With read-write transactions we must choose a point that is
2004+
// greater than both complete and incomplete edges. The reason
2005+
// is that incomplete transactions performed some reads at that
2006+
// point and these snapshot points must be repeatable.
2007+
// Note that as soon as the first write past the IncompleteEdge
2008+
// happens it cements all distributed transactions up to that point
2009+
// as complete, so all future reads and writes are guaranteed to
2010+
// include that point as well.
2011+
edge = writeEdge;
2012+
break;
2013+
}
20132014

2014-
// If there's any planned operation that is above our edge, it would be a
2015-
// suitable version for a new immediate operation. We effectively try to
2016-
// execute "before" that point if possible.
2017-
if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId))
2018-
return TRowVersion(nextOp->GetStep(), nextOp->GetTxId());
2015+
// If there's any planned operation that is above our edge, it would be a
2016+
// suitable version for a new immediate operation. We effectively try to
2017+
// execute "before" that point if possible.
2018+
if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId))
2019+
return TRowVersion(nextOp->GetStep(), nextOp->GetTxId());
2020+
2021+
// Normally we stick transactions to the end of the last known mediator step
2022+
// Note these calculations only happen when we don't have distributed
2023+
// transactions left in queue, and we won't have any more transactions
2024+
// up to the current mediator time. The mediator time itself may be stale,
2025+
// in which case we may have evidence of its higher value via complete and
2026+
// incomplete edges above.
2027+
const ui64 mediatorStep = Max(MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0, writeEdge.Step);
2028+
TRowVersion mediatorEdge(mediatorStep, ::Max<ui64>());
2029+
2030+
switch (mode) {
2031+
case EMvccTxMode::ReadOnly: {
2032+
// We read at the end of the current step
2033+
return mediatorEdge;
2034+
}
2035+
2036+
case EMvccTxMode::ReadWrite: {
2037+
// We write at the end of the current step, or the start of the next step when that's protected
2038+
return Max(mediatorEdge, writeEdge.Next());
2039+
}
2040+
}
20192041

2020-
// Normally we stick transactions to the end of the last known mediator step
2021-
// Note these calculations only happen when we don't have distributed
2022-
// transactions left in queue, and we won't have any more transactions
2023-
// up to the current mediator time. The mediator time itself may be stale,
2024-
// in which case we may have evidence of its higher value via complete and
2025-
// incomplete edges above.
2026-
const ui64 mediatorStep = Max(MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0, writeEdge.Step);
2027-
TRowVersion mediatorEdge(mediatorStep, ::Max<ui64>());
2042+
Y_ABORT("unreachable");
2043+
}();
20282044

20292045
switch (mode) {
20302046
case EMvccTxMode::ReadOnly: {
2031-
// We want to include everything that was potentially confirmed to
2032-
// users, but we don't want to include anything that is not replied
2033-
// at the start of this read.
2034-
// Note it's only possible to have ImmediateWriteEdge > mediatorEdge
2035-
// when ImmediateWriteEdge == mediatorEdge + 1
2036-
return Max(mediatorEdge, SnapshotManager.GetImmediateWriteEdgeReplied());
2047+
// We must read all writes we have replied to already
2048+
return Max(version, SnapshotManager.GetImmediateWriteEdgeReplied());
20372049
}
20382050

20392051
case EMvccTxMode::ReadWrite: {
2040-
// We must use at least a previously used immediate write edge
2041-
// But we must also avoid trampling over any unprotected mvcc
2042-
// snapshot reads that have occurred.
2043-
// Note it's only possible to go past the last known mediator step
2044-
// when we had an unprotected read, which itself happens at the
2045-
// last mediator step. So we may only ever have a +1 step, never
2046-
// anything more.
2047-
return Max(mediatorEdge, writeEdge.Next(), SnapshotManager.GetImmediateWriteEdge());
2052+
// We must never go backwards in our single-shard writes
2053+
return Max(version, SnapshotManager.GetImmediateWriteEdge());
20482054
}
20492055
}
20502056

ydb/core/tx/time_cast/time_cast.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ void TMediatorTimecastEntry::Update(ui64 step, ui64 *exemption, ui64 exsz) {
3030
Y_UNUSED(exemption);
3131
Y_UNUSED(exsz);
3232

33-
AtomicSet(Step, step);
33+
// Mediator time shouldn't go back while shards are running
34+
if (Get(0) < step) {
35+
AtomicSet(Step, step);
36+
}
3437
}
3538

3639
class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {

0 commit comments

Comments
 (0)