Skip to content

Commit b12cda0

Browse files
authored
24-3: Fix unexpected read iterator stream reset (#7710)
1 parent c85f845 commit b12cda0

File tree

4 files changed

+209
-13
lines changed

4 files changed

+209
-13
lines changed

ydb/core/tx/datashard/datashard__read_iterator.cpp

+70-13
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "datashard_locks_db.h"
66
#include "probes.h"
77

8+
#include <ydb/core/base/counters.h>
89
#include <ydb/core/formats/arrow/arrow_batch_builder.h>
910

1011
#include <ydb/library/actors/core/monotonic_provider.h>
@@ -315,6 +316,8 @@ class TReader {
315316
, Self(self)
316317
, TableId(state.PathId.OwnerId, state.PathId.LocalPathId, state.SchemaVersion)
317318
, FirstUnprocessedQuery(State.FirstUnprocessedQuery)
319+
, LastProcessedKey(State.LastProcessedKey)
320+
, LastProcessedKeyErased(State.LastProcessedKeyErased)
318321
{
319322
GetTimeFast(&StartTime);
320323
EndTime = StartTime;
@@ -329,10 +332,10 @@ class TReader {
329332
bool toInclusive;
330333
TSerializedCellVec keyFromCells;
331334
TSerializedCellVec keyToCells;
332-
if (Y_UNLIKELY(FirstUnprocessedQuery == State.FirstUnprocessedQuery && State.LastProcessedKey)) {
335+
if (LastProcessedKey) {
333336
if (!State.Reverse) {
334-
keyFromCells = TSerializedCellVec(State.LastProcessedKey);
335-
fromInclusive = State.LastProcessedKeyErased;
337+
keyFromCells = TSerializedCellVec(LastProcessedKey);
338+
fromInclusive = LastProcessedKeyErased;
336339

337340
keyToCells = range.To;
338341
toInclusive = range.ToInclusive;
@@ -341,8 +344,8 @@ class TReader {
341344
keyFromCells = range.From;
342345
fromInclusive = range.FromInclusive;
343346

344-
keyToCells = TSerializedCellVec(State.LastProcessedKey);
345-
toInclusive = State.LastProcessedKeyErased;
347+
keyToCells = TSerializedCellVec(LastProcessedKey);
348+
toInclusive = LastProcessedKeyErased;
346349
}
347350
} else {
348351
keyFromCells = range.From;
@@ -500,6 +503,7 @@ class TReader {
500503
while (FirstUnprocessedQuery < State.Request->Ranges.size()) {
501504
if (ReachedTotalRowsLimit()) {
502505
FirstUnprocessedQuery = -1;
506+
LastProcessedKey.clear();
503507
return true;
504508
}
505509

@@ -526,6 +530,7 @@ class TReader {
526530
FirstUnprocessedQuery++;
527531
else
528532
FirstUnprocessedQuery--;
533+
LastProcessedKey.clear();
529534
}
530535

531536
return true;
@@ -537,6 +542,7 @@ class TReader {
537542
while (FirstUnprocessedQuery < State.Request->Keys.size()) {
538543
if (ReachedTotalRowsLimit()) {
539544
FirstUnprocessedQuery = -1;
545+
LastProcessedKey.clear();
540546
return true;
541547
}
542548

@@ -562,6 +568,7 @@ class TReader {
562568
FirstUnprocessedQuery++;
563569
else
564570
FirstUnprocessedQuery--;
571+
LastProcessedKey.clear();
565572
}
566573

567574
return true;
@@ -727,6 +734,28 @@ class TReader {
727734
}
728735

729736
void UpdateState(TReadIteratorState& state, bool sentResult) {
737+
if (state.FirstUnprocessedQuery == FirstUnprocessedQuery &&
738+
state.LastProcessedKey && !LastProcessedKey)
739+
{
740+
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
741+
"DataShard " << Self->TabletID() << " detected unexpected reset of LastProcessedKey:"
742+
<< " ReadId# " << State.ReadId
743+
<< " LastSeqNo# " << State.SeqNo
744+
<< " LastQuery# " << State.FirstUnprocessedQuery
745+
<< " RowsRead# " << RowsRead
746+
<< " RowsProcessed# " << RowsProcessed
747+
<< " RowsSinceLastCheck# " << RowsSinceLastCheck
748+
<< " BytesInResult# " << BytesInResult
749+
<< " DeletedRowSkips# " << DeletedRowSkips
750+
<< " InvisibleRowSkips# " << InvisibleRowSkips
751+
<< " Quota.Rows# " << State.Quota.Rows
752+
<< " Quota.Bytes# " << State.Quota.Bytes
753+
<< " State.TotalRows# " << State.TotalRows
754+
<< " State.TotalRowsLimit# " << State.TotalRowsLimit
755+
<< " State.MaxRowsInResult# " << State.MaxRowsInResult);
756+
Self->IncCounterReadIteratorLastKeyReset();
757+
}
758+
730759
state.TotalRows += RowsRead;
731760
state.FirstUnprocessedQuery = FirstUnprocessedQuery;
732761
state.LastProcessedKey = LastProcessedKey;
@@ -1632,6 +1661,7 @@ class TDataShard::TReadOperation : public TOperation, public IReadOperation {
16321661
if (Reader->HasUnreadQueries()) {
16331662
Reader->UpdateState(state, ResultSent);
16341663
if (!state.IsExhausted()) {
1664+
state.ReadContinuePending = true;
16351665
ctx.Send(
16361666
Self->SelfId(),
16371667
new TEvDataShard::TEvReadContinue(ReadId.Sender, ReadId.ReadId));
@@ -2282,6 +2312,15 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
22822312
Y_ASSERT(it->second);
22832313
auto& state = *it->second;
22842314

2315+
if (state.IsExhausted()) {
2316+
// iterator quota reduced and exhausted while ReadContinue was inflight
2317+
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue for iterator# " << ReadId
2318+
<< ", quota exhausted while rescheduling");
2319+
state.ReadContinuePending = false;
2320+
Result.reset();
2321+
return true;
2322+
}
2323+
22852324
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue for iterator# " << ReadId
22862325
<< ", firstUnprocessedQuery# " << state.FirstUnprocessedQuery);
22872326

@@ -2394,6 +2433,7 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
23942433
if (Reader->Read(txc, ctx)) {
23952434
// Retry later when dependencies are resolved
23962435
if (!Reader->GetVolatileReadDependencies().empty()) {
2436+
state.ReadContinuePending = true;
23972437
Self->WaitVolatileDependenciesThenSend(
23982438
Reader->GetVolatileReadDependencies(),
23992439
Self->SelfId(),
@@ -2480,6 +2520,8 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
24802520
Y_ABORT_UNLESS(it->second);
24812521
auto& state = *it->second;
24822522

2523+
state.ReadContinuePending = false;
2524+
24832525
if (!Result) {
24842526
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << ReadId
24852527
<< " TTxReadContinue::Execute() finished without Result, aborting");
@@ -2527,14 +2569,14 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
25272569
}
25282570

25292571
if (Reader->HasUnreadQueries()) {
2530-
Y_ASSERT(it->second);
2531-
auto& state = *it->second;
2572+
bool wasExhausted = state.IsExhausted();
25322573
Reader->UpdateState(state, useful);
25332574
if (!state.IsExhausted()) {
2575+
state.ReadContinuePending = true;
25342576
ctx.Send(
25352577
Self->SelfId(),
25362578
new TEvDataShard::TEvReadContinue(ReadId.Sender, ReadId.ReadId));
2537-
} else {
2579+
} else if (!wasExhausted) {
25382580
Self->IncCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT);
25392581
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID()
25402582
<< " read iterator# " << ReadId << " exhausted");
@@ -2807,14 +2849,19 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext&
28072849
bool wasExhausted = state.IsExhausted();
28082850
state.UpQuota(
28092851
record.GetSeqNo(),
2810-
record.GetMaxRows(),
2811-
record.GetMaxBytes());
2852+
record.HasMaxRows() ? record.GetMaxRows() : Max<ui64>(),
2853+
record.HasMaxBytes() ? record.GetMaxBytes() : Max<ui64>());
28122854

28132855
if (wasExhausted && !state.IsExhausted()) {
28142856
DecCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT);
2815-
ctx.Send(
2816-
SelfId(),
2817-
new TEvDataShard::TEvReadContinue(ev->Sender, record.GetReadId()));
2857+
if (!state.ReadContinuePending) {
2858+
state.ReadContinuePending = true;
2859+
ctx.Send(
2860+
SelfId(),
2861+
new TEvDataShard::TEvReadContinue(ev->Sender, record.GetReadId()));
2862+
}
2863+
} else if (!wasExhausted && state.IsExhausted()) {
2864+
IncCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT);
28182865
}
28192866

28202867
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadAck for read iterator# " << readId
@@ -2943,6 +2990,16 @@ void TDataShard::UnsubscribeReadIteratorSessions(const TActorContext& ctx) {
29432990
ReadIteratorSessions.clear();
29442991
}
29452992

2993+
void TDataShard::IncCounterReadIteratorLastKeyReset() {
2994+
if (!CounterReadIteratorLastKeyReset) {
2995+
CounterReadIteratorLastKeyReset = GetServiceCounters(AppData()->Counters, "tablets")
2996+
->GetSubgroup("type", "DataShard")
2997+
->GetSubgroup("category", "app")
2998+
->GetCounter("DataShard/ReadIteratorLastKeyReset", true);
2999+
}
3000+
++*CounterReadIteratorLastKeyReset;
3001+
}
3002+
29463003
} // NKikimr::NDataShard
29473004

29483005
template<>

ydb/core/tx/datashard/datashard_impl.h

+4
Original file line numberDiff line numberDiff line change
@@ -3319,6 +3319,10 @@ class TDataShard
33193319
bool AllowCancelROwithReadsets() const;
33203320

33213321
void ResolveTablePath(const TActorContext &ctx);
3322+
3323+
public:
3324+
NMonitoring::TDynamicCounters::TCounterPtr CounterReadIteratorLastKeyReset;
3325+
void IncCounterReadIteratorLastKeyReset();
33223326
};
33233327

33243328
NKikimrTxDataShard::TError::EKind ConvertErrCode(NMiniKQL::IEngineFlat::EResult code);

ydb/core/tx/datashard/datashard_ut_read_iterator.cpp

+134
Original file line numberDiff line numberDiff line change
@@ -4670,6 +4670,140 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorConsistency) {
46704670
"result2: " << result2);
46714671
}
46724672

4673+
template<class TEvType>
4674+
class TBlockEvents : public std::deque<typename TEvType::TPtr> {
4675+
public:
4676+
TBlockEvents(TTestActorRuntime& runtime, std::function<bool(typename TEvType::TPtr&)> condition = {})
4677+
: Runtime(runtime)
4678+
, Condition(std::move(condition))
4679+
, Holder(Runtime.AddObserver<TEvType>(
4680+
[this](typename TEvType::TPtr& ev) {
4681+
this->Process(ev);
4682+
}))
4683+
{}
4684+
4685+
TBlockEvents& Unblock(size_t count = -1) {
4686+
while (!this->empty() && count > 0) {
4687+
auto& ev = this->front();
4688+
IEventHandle* ptr = ev.Get();
4689+
UnblockedOnce.insert(ptr);
4690+
Runtime.Send(ev.Release(), 0, /* viaActorSystem */ true);
4691+
this->pop_front();
4692+
--count;
4693+
}
4694+
return *this;
4695+
}
4696+
4697+
void Stop() {
4698+
UnblockedOnce.clear();
4699+
Holder.Remove();
4700+
}
4701+
4702+
private:
4703+
void Process(typename TEvType::TPtr& ev) {
4704+
IEventHandle* ptr = ev.Get();
4705+
auto it = UnblockedOnce.find(ptr);
4706+
if (it != UnblockedOnce.end()) {
4707+
UnblockedOnce.erase(it);
4708+
return;
4709+
}
4710+
4711+
if (Condition && !Condition(ev)) {
4712+
return;
4713+
}
4714+
4715+
this->emplace_back(std::move(ev));
4716+
}
4717+
4718+
private:
4719+
TTestActorRuntime& Runtime;
4720+
std::function<bool(typename TEvType::TPtr&)> Condition;
4721+
TTestActorRuntime::TEventObserverHolder Holder;
4722+
THashSet<IEventHandle*> UnblockedOnce;
4723+
};
4724+
4725+
Y_UNIT_TEST(Bug_7674_IteratorDuplicateRows) {
4726+
TPortManager pm;
4727+
TServerSettings serverSettings(pm.GetPort(2134));
4728+
serverSettings.SetDomainName("Root")
4729+
.SetUseRealThreads(false);
4730+
TServer::TPtr server = new TServer(serverSettings);
4731+
4732+
auto& runtime = *server->GetRuntime();
4733+
auto sender = runtime.AllocateEdgeActor();
4734+
4735+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
4736+
4737+
InitRoot(server, sender);
4738+
4739+
TDisableDataShardLogBatching disableDataShardLogBatching;
4740+
4741+
CreateShardedTable(server, sender, "/Root", "table-1", 1);
4742+
4743+
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10), (2, 20), (3, 30), (4, 40), (5, 50);");
4744+
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (6, 60), (7, 70), (8, 80), (9, 90), (10, 100);");
4745+
runtime.SimulateSleep(TDuration::Seconds(1));
4746+
4747+
auto forceSmallChunks = runtime.AddObserver<TEvDataShard::TEvRead>(
4748+
[&](TEvDataShard::TEvRead::TPtr& ev) {
4749+
auto* msg = ev->Get();
4750+
// Force chunks of at most 3 rows
4751+
msg->Record.SetMaxRowsInResult(3);
4752+
});
4753+
4754+
TBlockEvents<TEvDataShard::TEvReadAck> blockedAcks(runtime);
4755+
TBlockEvents<TEvDataShard::TEvReadResult> blockedResults(runtime);
4756+
TBlockEvents<TEvDataShard::TEvReadContinue> blockedContinue(runtime);
4757+
4758+
auto waitFor = [&](const TString& description, const auto& condition, size_t count = 1) {
4759+
while (!condition()) {
4760+
UNIT_ASSERT_C(count > 0, "... failed to wait for " << description);
4761+
Cerr << "... waiting for " << description << Endl;
4762+
TDispatchOptions options;
4763+
options.CustomFinalCondition = [&]() {
4764+
return condition();
4765+
};
4766+
runtime.DispatchEvents(options);
4767+
--count;
4768+
}
4769+
};
4770+
4771+
auto readFuture = KqpSimpleSend(runtime, "SELECT key, value FROM `/Root/table-1` ORDER BY key LIMIT 7");
4772+
waitFor("first TEvReadContinue", [&]{ return blockedContinue.size() >= 1; });
4773+
waitFor("first TEvReadResult", [&]{ return blockedResults.size() >= 1; });
4774+
4775+
blockedContinue.Unblock(1);
4776+
waitFor("second TEvReadContinue", [&]{ return blockedContinue.size() >= 1; });
4777+
waitFor("second TEvReadResult", [&]{ return blockedResults.size() >= 2; });
4778+
4779+
// We need both results to arrive without pauses
4780+
blockedResults.Unblock();
4781+
4782+
waitFor("both TEvReadAcks", [&]{ return blockedAcks.size() >= 2; });
4783+
4784+
// Unblock the first TEvReadAck and then pending TEvReadContinue
4785+
blockedAcks.Unblock(1);
4786+
blockedContinue.Unblock(1);
4787+
4788+
// Give it some time to trigger the bug
4789+
runtime.SimulateSleep(TDuration::MilliSeconds(1));
4790+
4791+
// Stop blocking everything
4792+
blockedAcks.Unblock().Stop();
4793+
blockedResults.Unblock().Stop();
4794+
blockedContinue.Unblock().Stop();
4795+
4796+
UNIT_ASSERT_VALUES_EQUAL(
4797+
FormatResult(AwaitResponse(runtime, std::move(readFuture))),
4798+
"{ items { uint32_value: 1 } items { uint32_value: 10 } }, "
4799+
"{ items { uint32_value: 2 } items { uint32_value: 20 } }, "
4800+
"{ items { uint32_value: 3 } items { uint32_value: 30 } }, "
4801+
"{ items { uint32_value: 4 } items { uint32_value: 40 } }, "
4802+
"{ items { uint32_value: 5 } items { uint32_value: 50 } }, "
4803+
"{ items { uint32_value: 6 } items { uint32_value: 60 } }, "
4804+
"{ items { uint32_value: 7 } items { uint32_value: 70 } }");
4805+
}
4806+
46734807
}
46744808

46754809
} // namespace NKikimr

ydb/core/tx/datashard/read_iterator.h

+1
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ struct TReadIteratorState {
205205
TActorId SessionId;
206206
TMonotonic StartTs;
207207
bool IsFinished = false;
208+
bool ReadContinuePending = false;
208209

209210
// note that we send SeqNo's starting from 1
210211
ui64 SeqNo = 0;

0 commit comments

Comments
 (0)