Skip to content

Commit e3630b0

Browse files
committed
Fix read iterator local snapshot consistency. Fixes ydb-platform#2885. (ydb-platform#3037)
1 parent fe6d594 commit e3630b0

9 files changed

+575
-112
lines changed

ydb/core/tx/datashard/datashard.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -2107,6 +2107,12 @@ TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdge
21072107
<< " promoting UnprotectedReadEdge to " << version);
21082108
SnapshotManager.PromoteUnprotectedReadEdge(version);
21092109

2110+
// Make sure pending distributed transactions are marked incomplete,
2111+
// since we just protected up to and including version from writes,
2112+
// we need to make sure new immediate conflicting writes are blocked
2113+
// and don't perform writes with out-of-order versions.
2114+
res.HadWrites |= Pipeline.MarkPlannedLogicallyIncompleteUpTo(version, txc);
2115+
21102116
// We want to promote the complete edge when protected reads are
21112117
// used or when we're already writing something anyway.
21122118
if (res.HadWrites) {

ydb/core/tx/datashard/datashard__read_iterator.cpp

+116-106
Large diffs are not rendered by default.

ydb/core/tx/datashard/datashard_dep_tracker.cpp

+5-3
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ namespace {
4646
return a.GetStep() < b.Step || (a.GetStep() == b.Step && a.GetTxId() < b.TxId);
4747
}
4848

49-
bool IsLessEqual(const TOperation& a, const TRowVersion& b) {
50-
return a.GetStep() < b.Step || (a.GetStep() == b.Step && a.GetTxId() <= b.TxId);
49+
bool IsEqual(const TOperation& a, const TRowVersion& b) {
50+
return a.GetStep() == b.Step && a.GetTxId() == b.TxId;
5151
}
5252
}
5353

@@ -799,8 +799,10 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera
799799
Y_ABORT_UNLESS(!conflict.IsImmediate());
800800
if (snapshot.IsMax()) {
801801
conflict.AddImmediateConflict(op);
802-
} else if (snapshotRepeatable ? IsLessEqual(conflict, snapshot) : IsLess(conflict, snapshot)) {
802+
} else if (IsLess(conflict, snapshot)) {
803803
op->AddDependency(&conflict);
804+
} else if (IsEqual(conflict, snapshot)) {
805+
op->AddRepeatableReadConflict(&conflict);
804806
}
805807
};
806808

ydb/core/tx/datashard/datashard_pipeline.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ TPipeline::~TPipeline()
3838
pr.second->ClearSpecialDependencies();
3939
pr.second->ClearPlannedConflicts();
4040
pr.second->ClearImmediateConflicts();
41+
pr.second->ClearRepeatableReadConflicts();
4142
}
4243
}
4344

@@ -487,6 +488,7 @@ void TPipeline::UnblockNormalDependencies(const TOperation::TPtr &op)
487488
op->ClearDependencies();
488489
op->ClearPlannedConflicts();
489490
op->ClearImmediateConflicts();
491+
op->ClearRepeatableReadConflicts();
490492
DepTracker.RemoveOperation(op);
491493
}
492494

ydb/core/tx/datashard/datashard_ut_common_kqp.h

+6-2
Original file line numberDiff line numberDiff line change
@@ -208,9 +208,13 @@ namespace NKqpHelpers {
208208
return FormatResult(result);
209209
}
210210

211-
inline TString KqpSimpleCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) {
211+
inline auto KqpSimpleSendCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) {
212212
Y_ABORT_UNLESS(!txId.empty(), "commit on empty transaction");
213-
auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */)));
213+
return SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */));
214+
}
215+
216+
inline TString KqpSimpleCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) {
217+
auto response = AwaitResponse(runtime, KqpSimpleSendCommit(runtime, sessionId, txId, query));
214218
if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
215219
return TStringBuilder() << "ERROR: " << response.operation().status();
216220
}

0 commit comments

Comments
 (0)