Skip to content

Commit 6fe3b58

Browse files
authored
DataShard: prioritize cancellation processing for reads (#15398)
1 parent 9187047 commit 6fe3b58

File tree

5 files changed

+53
-4
lines changed

5 files changed

+53
-4
lines changed

ydb/core/tablet_flat/flat_executor.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,10 +224,10 @@ void TExecutor::RecreatePageCollectionsCache() noexcept
224224

225225
if (TransactionWaitPads) {
226226
for (auto &xpair : TransactionWaitPads) {
227+
xpair.second->WaitingSpan.EndOk();
227228
TSeat* seat = xpair.second->Seat;
228229
Y_ABORT_UNLESS(seat->State == ESeatState::Waiting);
229230
seat->State = ESeatState::None;
230-
xpair.second->WaitingSpan.EndOk();
231231

232232
if (seat->Cancelled) {
233233
FinishCancellation(seat, false);
@@ -627,11 +627,11 @@ void TExecutor::ActivateWaitingTransactions(TPrivatePageCache::TPage::TWaitQueue
627627
bool cancelled = false;
628628
while (TPrivatePageCacheWaitPad *waitPad = waitPadsQueue->Pop()) {
629629
if (auto it = TransactionWaitPads.find(waitPad); it != TransactionWaitPads.end()) {
630+
it->second->WaitingSpan.EndOk();
630631
TSeat* seat = it->second->Seat;
631632
Y_ABORT_UNLESS(seat->State == ESeatState::Waiting);
632633
seat->State = ESeatState::None;
633-
seat->WaitingSpan.EndOk();
634-
TransactionWaitPads.erase(waitPad);
634+
TransactionWaitPads.erase(it);
635635
if (seat->Cancelled) {
636636
FinishCancellation(seat, false);
637637
cancelled = true;

ydb/core/tx/datashard/datashard__read_iterator.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2399,6 +2399,7 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB
23992399
return true;
24002400
}
24012401
auto& state = *readIt->second;
2402+
state.EnqueuedLocalTxId = 0;
24022403
ReplyError(
24032404
Ydb::StatusIds::INTERNAL_ERROR,
24042405
TStringBuilder() << "Failed to sync follower: " << errMessage
@@ -2417,6 +2418,7 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB
24172418
const auto& record = request->Record;
24182419

24192420
Y_ABORT_UNLESS(state.State == TReadIteratorState::EState::Init);
2421+
state.EnqueuedLocalTxId = 0;
24202422

24212423
bool setUsingSnapshotFlag = false;
24222424

@@ -3221,7 +3223,9 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
32213223

32223224
SetCounter(COUNTER_READ_ITERATORS_COUNT, ReadIterators.size());
32233225

3224-
Executor()->Execute(new TTxReadViaPipeline(this, localReadId, request->ReadSpan.GetTraceId()), ctx);
3226+
// Note: we may have a read cancellation already in the mailbox, so we
3227+
// enqueue a low-priority transaction.
3228+
state.EnqueuedLocalTxId = Executor()->EnqueueLowPriority(new TTxReadViaPipeline(this, localReadId, request->ReadSpan.GetTraceId()));
32253229
}
32263230

32273231
void TDataShard::Handle(TEvDataShard::TEvReadContinue::TPtr& ev, const TActorContext& ctx) {
@@ -3454,6 +3458,9 @@ void TDataShard::DeleteReadIterator(TReadIteratorsMap::iterator it) {
34543458
if (state.IsExhausted()) {
34553459
DecCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT);
34563460
}
3461+
if (state.EnqueuedLocalTxId) {
3462+
Executor()->CancelTransaction(state.EnqueuedLocalTxId);
3463+
}
34573464
ReadIteratorsByLocalReadId.erase(state.LocalReadId);
34583465
ReadIterators.erase(it);
34593466
SetCounter(COUNTER_READ_ITERATORS_COUNT, ReadIterators.size());

ydb/core/tx/datashard/datashard_ut_read_iterator.cpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4969,4 +4969,41 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorBatchMode) {
49694969

49704970
}
49714971

4972+
Y_UNIT_TEST_SUITE(DataShardReadIteratorFastCancel) {
4973+
4974+
Y_UNIT_TEST(ShouldProcessFastCancel) {
4975+
TTestHelper helper;
4976+
4977+
auto request1 = helper.GetBaseReadRequest("table-1", 1);
4978+
AddRangeQuery<ui32>(*request1, { 1 }, true, { 1 }, true);
4979+
auto readResult1 = helper.SendRead("table-1", request1.release());
4980+
CheckResult(helper.Tables.at("table-1").UserTable, *readResult1, {
4981+
{1, 1, 1, 100},
4982+
});
4983+
UNIT_ASSERT(readResult1->Record.GetFinished());
4984+
4985+
auto& runtime = *helper.Server->GetRuntime();
4986+
auto sender = runtime.AllocateEdgeActor();
4987+
auto shardActor = ResolveTablet(runtime, helper.Tables.at("table-1").TabletId);
4988+
4989+
auto request2 = helper.GetBaseReadRequest("table-1", 2);
4990+
AddRangeQuery<ui32>(*request2, { 1 }, true, { 1 }, true);
4991+
auto request3 = helper.GetBaseReadRequest("table-1", 3);
4992+
AddRangeQuery<ui32>(*request3, { 1 }, true, { 1 }, true);
4993+
auto cancel2 = std::make_unique<TEvDataShard::TEvReadCancel>();
4994+
cancel2->Record.SetReadId(2);
4995+
auto cancel3 = std::make_unique<TEvDataShard::TEvReadCancel>();
4996+
cancel3->Record.SetReadId(3);
4997+
4998+
runtime.Send(new IEventHandle(shardActor, sender, request2.release()), 0, true);
4999+
runtime.Send(new IEventHandle(shardActor, sender, request3.release()), 0, true);
5000+
runtime.Send(new IEventHandle(shardActor, sender, cancel2.release()), 0, true);
5001+
runtime.Send(new IEventHandle(shardActor, sender, cancel3.release()), 0, true);
5002+
5003+
auto ev = runtime.GrabEdgeEvent<TEvDataShard::TEvReadResult>(sender, TDuration::Seconds(1));
5004+
UNIT_ASSERT_C(!ev, "Unexpected response received (should have been cancelled)");
5005+
}
5006+
5007+
}
5008+
49725009
} // namespace NKikimr

ydb/core/tx/datashard/datashard_ut_trace.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
359359
Repeat(
360360
ExpectedSpan("Datashard.Read",
361361
ExpectedSpan("Tablet.Transaction",
362+
ExpectedSpan("Tablet.Transaction.Enqueued"),
362363
ExpectedSpan("Tablet.Transaction.Execute",
363364
Repeat("Datashard.Unit", 3)),
364365
// No extra page fault with btree index (root is in meta)
@@ -443,6 +444,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
443444
Repeat(
444445
ExpectedSpan("Datashard.Read",
445446
ExpectedSpan("Tablet.Transaction",
447+
ExpectedSpan("Tablet.Transaction.Enqueued"),
446448
ExpectedSpan("Tablet.Transaction.Execute",
447449
Repeat("Datashard.Unit", 4)),
448450
"Tablet.Transaction.Complete"),

ydb/core/tx/datashard/read_iterator.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,9 @@ struct TReadIteratorState {
218218
TActorId ScanActorId;
219219
// temporary storage for forwarded events until scan has started
220220
std::vector<std::unique_ptr<IEventHandle>> ScanPendingEvents;
221+
222+
// May be used to cancel enqueued transactions
223+
ui64 EnqueuedLocalTxId = 0;
221224
};
222225

223226
using TReadIteratorsMap = THashMap<TReadIteratorId, TReadIteratorState, TReadIteratorId::THash>;

0 commit comments

Comments
 (0)