Skip to content

Commit 69f163b

Browse files
authored
Merge 678e3ad into 327e2be
2 parents 327e2be + 678e3ad commit 69f163b

9 files changed

+575
-112
lines changed

ydb/core/tx/datashard/datashard.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -2174,6 +2174,12 @@ TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdge
21742174
<< " promoting UnprotectedReadEdge to " << version);
21752175
SnapshotManager.PromoteUnprotectedReadEdge(version);
21762176

2177+
// Make sure pending distributed transactions are marked incomplete,
2178+
// since we just protected up to and including version from writes,
2179+
// we need to make sure new immediate conflicting writes are blocked
2180+
// and don't perform writes with out-of-order versions.
2181+
res.HadWrites |= Pipeline.MarkPlannedLogicallyIncompleteUpTo(version, txc);
2182+
21772183
// We want to promote the complete edge when protected reads are
21782184
// used or when we're already writing something anyway.
21792185
if (res.HadWrites) {

ydb/core/tx/datashard/datashard__read_iterator.cpp

+116-106
Large diffs are not rendered by default.

ydb/core/tx/datashard/datashard_dep_tracker.cpp

+5-3
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ namespace {
4646
return a.GetStep() < b.Step || (a.GetStep() == b.Step && a.GetTxId() < b.TxId);
4747
}
4848

49-
bool IsLessEqual(const TOperation& a, const TRowVersion& b) {
50-
return a.GetStep() < b.Step || (a.GetStep() == b.Step && a.GetTxId() <= b.TxId);
49+
bool IsEqual(const TOperation& a, const TRowVersion& b) {
50+
return a.GetStep() == b.Step && a.GetTxId() == b.TxId;
5151
}
5252
}
5353

@@ -799,8 +799,10 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera
799799
Y_ABORT_UNLESS(!conflict.IsImmediate());
800800
if (snapshot.IsMax()) {
801801
conflict.AddImmediateConflict(op);
802-
} else if (snapshotRepeatable ? IsLessEqual(conflict, snapshot) : IsLess(conflict, snapshot)) {
802+
} else if (IsLess(conflict, snapshot)) {
803803
op->AddDependency(&conflict);
804+
} else if (IsEqual(conflict, snapshot)) {
805+
op->AddRepeatableReadConflict(&conflict);
804806
}
805807
};
806808

ydb/core/tx/datashard/datashard_pipeline.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ TPipeline::~TPipeline()
3939
pr.second->ClearSpecialDependencies();
4040
pr.second->ClearPlannedConflicts();
4141
pr.second->ClearImmediateConflicts();
42+
pr.second->ClearRepeatableReadConflicts();
4243
}
4344
}
4445

@@ -487,6 +488,7 @@ void TPipeline::UnblockNormalDependencies(const TOperation::TPtr &op)
487488
op->ClearDependencies();
488489
op->ClearPlannedConflicts();
489490
op->ClearImmediateConflicts();
491+
op->ClearRepeatableReadConflicts();
490492
DepTracker.RemoveOperation(op);
491493
}
492494

ydb/core/tx/datashard/datashard_ut_common_kqp.h

+6-2
Original file line numberDiff line numberDiff line change
@@ -208,9 +208,13 @@ namespace NKqpHelpers {
208208
return FormatResult(result);
209209
}
210210

211-
inline TString KqpSimpleCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) {
211+
inline auto KqpSimpleSendCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) {
212212
Y_ABORT_UNLESS(!txId.empty(), "commit on empty transaction");
213-
auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */)));
213+
return SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */));
214+
}
215+
216+
inline TString KqpSimpleCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) {
217+
auto response = AwaitResponse(runtime, KqpSimpleSendCommit(runtime, sessionId, txId, query));
214218
if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
215219
return TStringBuilder() << "ERROR: " << response.operation().status();
216220
}

0 commit comments

Comments
 (0)