Skip to content

Commit b3bee3a

Browse files
authored
EvWrite Sink buffer for DataShard (#3966)
1 parent 0e176bd commit b3bee3a

File tree

10 files changed

+285
-60
lines changed

10 files changed

+285
-60
lines changed

ydb/core/kqp/common/kqp_tx.cpp

+11
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
140140

141141
size_t readPhases = 0;
142142
bool hasEffects = false;
143+
bool hasSourceRead = false;
144+
bool hasSinkWrite = false;
143145

144146
for (const auto &tx : physicalQuery.GetTransactions()) {
145147
switch (tx.GetType()) {
@@ -155,13 +157,22 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
155157
if (tx.GetHasEffects()) {
156158
hasEffects = true;
157159
}
160+
161+
for (const auto &stage : tx.GetStages()) {
162+
hasSourceRead |= !stage.GetSources().empty();
163+
hasSinkWrite |= !stage.GetSinks().empty();
164+
}
158165
}
159166

160167
if (txCtx.HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
161168
YQL_ENSURE(txCtx.EnableImmediateEffects);
162169
return true;
163170
}
164171

172+
if (hasSourceRead && hasSinkWrite) {
173+
return true;
174+
}
175+
165176
// We don't want snapshot when there are effects at the moment,
166177
// because it hurts performance when there are multiple single-shard
167178
// reads and a single distributed commit. Taking snapshot costs

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

+5-4
Original file line numberDiff line numberDiff line change
@@ -1605,6 +1605,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
16051605

16061606
void ExecuteDatashardTransaction(ui64 shardId, NKikimrTxDataShard::TKqpTransaction& kqpTx, const bool isOlap)
16071607
{
1608+
YQL_ENSURE(!UseEvWrite);
16081609
TShardState shardState;
16091610
shardState.State = ImmediateTx ? TShardState::EState::Executing : TShardState::EState::Preparing;
16101611
shardState.DatashardState.ConstructInPlace();
@@ -1652,7 +1653,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
16521653

16531654
std::unique_ptr<IEventBase> ev;
16541655
if (isOlap) {
1655-
YQL_ENSURE(!UseEvWrite);
16561656
const ui32 flags =
16571657
(ImmediateTx ? NKikimrTxColumnShard::ETransactionFlag::TX_FLAG_IMMEDIATE: 0);
16581658
ev.reset(new TEvColumnShard::TEvProposeTransaction(
@@ -1984,10 +1984,11 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
19841984
BuildDatashardTxs(datashardTasks, datashardTxs, evWriteTxs, topicTxs);
19851985
YQL_ENSURE(evWriteTxs.empty() || datashardTxs.empty());
19861986

1987-
// Single-shard transactions are always immediate
1988-
ImmediateTx = (datashardTxs.size() + evWriteTxs.size() + Request.TopicOperations.GetSize() + sourceScanPartitionsCount) <= 1
1987+
// Single-shard datashard transactions are always immediate
1988+
ImmediateTx = (datashardTxs.size() + evWriteTxs.size() + Request.TopicOperations.GetSize() + sourceScanPartitionsCount) == 1
19891989
&& !UnknownAffectedShardCount
1990-
&& evWriteTxs.empty();
1990+
&& evWriteTxs.empty()
1991+
&& !HasOlapTable;
19911992

19921993
switch (Request.IsolationLevel) {
19931994
// OnlineRO with AllowInconsistentReads = true

ydb/core/kqp/runtime/kqp_read_actor.cpp

+5-7
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,11 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
568568

569569
auto keyDesc = std::move(request->ResultSet[0].KeyDescription);
570570

571-
if (keyDesc->GetPartitions().size() == 1) {
571+
if (keyDesc->GetPartitions().empty()) {
572+
TString error = TStringBuilder() << "No partitions to read from '" << Settings->GetTable().GetTablePath() << "'";
573+
CA_LOG_E(error);
574+
return RuntimeError(error, NDqProto::StatusIds::SCHEME_ERROR);
575+
} else if (keyDesc->GetPartitions().size() == 1) {
572576
auto& partition = keyDesc->GetPartitions()[0];
573577
if (partition.ShardId == state->TabletId) {
574578
// we re-resolved the same shard
@@ -589,12 +593,6 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
589593
return RuntimeError("Inconsistent reads after shards split", NDqProto::StatusIds::UNAVAILABLE);
590594
}
591595

592-
if (keyDesc->GetPartitions().empty()) {
593-
TString error = TStringBuilder() << "No partitions to read from '" << Settings->GetTable().GetTablePath() << "'";
594-
CA_LOG_E(error);
595-
return RuntimeError(error, NDqProto::StatusIds::SCHEME_ERROR);
596-
}
597-
598596
const auto& tr = *AppData()->TypeRegistry;
599597

600598
TVector<THolder<TShardState>> newShards;

ydb/core/kqp/runtime/kqp_write_actor.cpp

+73-23
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ namespace {
5959
public:
6060
struct TInFlightBatch {
6161
TString Data;
62-
ui32 SendAttempts = 0;
6362
ui64 Cookie = 0;
63+
ui32 SendAttempts = 0;
6464
};
6565

6666
size_t Size() const {
@@ -112,8 +112,8 @@ namespace {
112112
YQL_ENSURE(!IsClosed());
113113
Batches.push_back(TInFlightBatch{
114114
.Data = std::move(data),
115-
.SendAttempts = 0,
116115
.Cookie = ++NextCookie,
116+
.SendAttempts = 0,
117117
});
118118
Memory += Batches.back().Data.size();
119119
}
@@ -199,7 +199,7 @@ namespace {
199199
i64 Memory = 0;
200200
};
201201

202-
constexpr i64 kInFlightMemoryLimitPerActor = 100_MB;
202+
constexpr i64 kInFlightMemoryLimitPerActor = 64_MB;
203203
}
204204

205205

@@ -293,10 +293,6 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
293293
const i64 result = Serializer
294294
? MemoryLimit - Serializer->GetMemory() - ShardsInfo.GetMemory()
295295
: std::numeric_limits<i64>::min(); // Can't use zero here because compute can use overcommit!
296-
297-
if (result <= 0) {
298-
CA_LOG_D("No free space left. FreeSpace=" << result << " bytes.");
299-
}
300296
return result;
301297
}
302298

@@ -329,19 +325,22 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
329325
NYql::NDqProto::StatusIds::INTERNAL_ERROR);
330326
}
331327

332-
TResumeNotificationManager resumeNotificator(*this);
333-
for (auto& [shardId, batches] : Serializer->FlushBatches()) {
334-
for (auto& batch : batches) {
335-
ShardsInfo.GetShard(shardId).PushBatch(std::move(batch));
328+
if (Finished || GetFreeSpace() <= 0 || SchemeEntry->Kind == NSchemeCache::TSchemeCacheNavigate::KindColumnTable) {
329+
TResumeNotificationManager resumeNotificator(*this);
330+
for (auto& [shardId, batches] : Serializer->FlushBatchesForce()) {
331+
for (auto& batch : batches) {
332+
ShardsInfo.GetShard(shardId).PushBatch(std::move(batch));
333+
}
336334
}
335+
resumeNotificator.CheckMemory();
337336
}
338-
resumeNotificator.CheckMemory();
339-
YQL_ENSURE(!Finished || Serializer->IsFinished());
340337

341338
if (Finished) {
342339
for (auto& [shardId, shardInfo] : ShardsInfo.GetShards()) {
343340
shardInfo.Close();
344341
}
342+
343+
YQL_ENSURE(Serializer->IsFinished());
345344
}
346345

347346
ProcessBatches();
@@ -458,6 +457,10 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
458457

459458
switch (ev->Get()->GetStatus()) {
460459
case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: {
460+
CA_LOG_E("Got UNSPECIFIED for table `"
461+
<< SchemeEntry->TableId.PathId.ToString() << "`."
462+
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
463+
<< " Sink=" << this->SelfId() << ".");
461464
RuntimeError(
462465
TStringBuilder() << "Got UNSPECIFIED for table `"
463466
<< SchemeEntry->TableId.PathId.ToString() << "`.",
@@ -473,6 +476,10 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
473476
return;
474477
}
475478
case NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED: {
479+
CA_LOG_E("Got ABORTED for table `"
480+
<< SchemeEntry->TableId.PathId.ToString() << "`."
481+
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
482+
<< " Sink=" << this->SelfId() << ".");
476483
RuntimeError(
477484
TStringBuilder() << "Got ABORTED for table `"
478485
<< SchemeEntry->TableId.PathId.ToString() << "`.",
@@ -481,6 +488,10 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
481488
return;
482489
}
483490
case NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR: {
491+
CA_LOG_E("Got INTERNAL ERROR for table `"
492+
<< SchemeEntry->TableId.PathId.ToString() << "`."
493+
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
494+
<< " Sink=" << this->SelfId() << ".");
484495
RuntimeError(
485496
TStringBuilder() << "Got INTERNAL ERROR for table `"
486497
<< SchemeEntry->TableId.PathId.ToString() << "`.",
@@ -489,12 +500,18 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
489500
return;
490501
}
491502
case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED: {
492-
CA_LOG_D("Got OVERLOADED for table `"
493-
<< SchemeEntry->TableId.PathId.ToString() << "`. "
494-
<< "Ignored this error.");
503+
CA_LOG_W("Got OVERLOADED for table `"
504+
<< SchemeEntry->TableId.PathId.ToString() << "`."
505+
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
506+
<< " Sink=" << this->SelfId() << "."
507+
<< " Ignored this error.");
495508
return;
496509
}
497510
case NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED: {
511+
CA_LOG_E("Got CANCELLED for table `"
512+
<< SchemeEntry->TableId.PathId.ToString() << "`."
513+
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
514+
<< " Sink=" << this->SelfId() << ".");
498515
RuntimeError(
499516
TStringBuilder() << "Got CANCELLED for table `"
500517
<< SchemeEntry->TableId.PathId.ToString() << "`.",
@@ -503,6 +520,10 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
503520
return;
504521
}
505522
case NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST: {
523+
CA_LOG_E("Got BAD REQUEST for table `"
524+
<< SchemeEntry->TableId.PathId.ToString() << "`."
525+
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
526+
<< " Sink=" << this->SelfId() << ".");
506527
RuntimeError(
507528
TStringBuilder() << "Got BAD REQUEST for table `"
508529
<< SchemeEntry->TableId.PathId.ToString() << "`.",
@@ -511,6 +532,10 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
511532
return;
512533
}
513534
case NKikimrDataEvents::TEvWriteResult::STATUS_SCHEME_CHANGED: {
535+
CA_LOG_E("Got SCHEME CHANGED for table `"
536+
<< SchemeEntry->TableId.PathId.ToString() << "`."
537+
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
538+
<< " Sink=" << this->SelfId() << ".");
514539
RuntimeError(
515540
TStringBuilder() << "Got SCHEME CHANGED for table `"
516541
<< SchemeEntry->TableId.PathId.ToString() << "`.",
@@ -519,6 +544,10 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
519544
return;
520545
}
521546
case NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN: {
547+
CA_LOG_E("Got LOCKS BROKEN for table `"
548+
<< SchemeEntry->TableId.PathId.ToString() << "`."
549+
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
550+
<< " Sink=" << this->SelfId() << ".");
522551
RuntimeError(
523552
TStringBuilder() << "Got LOCKS BROKEN for table `"
524553
<< SchemeEntry->TableId.PathId.ToString() << "`.",
@@ -557,15 +586,28 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
557586
}
558587

559588
void ProcessBatches() {
589+
MakeNewBatches();
560590
SendBatchesToShards();
561-
if (ShardsInfo.IsFinished()) {
591+
if (Finished && Serializer->IsFinished() && ShardsInfo.IsFinished()) {
562592
CA_LOG_D("Write actor finished");
563593
Callbacks->OnAsyncOutputFinished(GetOutputIndex());
564594
}
565595
}
566596

597+
void MakeNewBatches() {
598+
for (const size_t shardId : Serializer->GetShardIds()) {
599+
auto& shard = ShardsInfo.GetShard(shardId);
600+
if (shard.IsEmpty()) {
601+
auto batch = Serializer->FlushBatch(shardId);
602+
if (!batch.empty()) {
603+
shard.PushBatch(std::move(batch));
604+
}
605+
}
606+
}
607+
}
608+
567609
void SendBatchesToShards() {
568-
for (size_t shardId : ShardsInfo.GetPendingShards()) {
610+
for (const size_t shardId : ShardsInfo.GetPendingShards()) {
569611
auto& shard = ShardsInfo.GetShard(shardId);
570612
YQL_ENSURE(!shard.IsEmpty());
571613
SendDataToShard(shardId);
@@ -577,14 +619,20 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
577619
YQL_ENSURE(!shard.IsEmpty());
578620
auto& inFlightBatch = shard.CurrentBatch();
579621
if (inFlightBatch.SendAttempts >= BackoffSettings()->MaxWriteAttempts) {
622+
CA_LOG_E("ShardId=" << shardId
623+
<< " for table '" << Settings.GetTable().GetPath()
624+
<< "': retry limit exceeded."
625+
<< " Sink=" << this->SelfId() << ".");
580626
RuntimeError(
581-
TStringBuilder() << "ShardId=" << shardId << " for table '" << Settings.GetTable().GetPath() << "': retry limit exceeded",
627+
TStringBuilder()
628+
<< "ShardId=" << shardId
629+
<< " for table '" << Settings.GetTable().GetPath()
630+
<< "': retry limit exceeded.",
582631
NYql::NDqProto::StatusIds::UNAVAILABLE);
583632
return;
584633
}
585634

586635
auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(
587-
std::get<ui64>(TxId),
588636
NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
589637
YQL_ENSURE(!inFlightBatch.Data.empty());
590638

@@ -608,7 +656,8 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
608656
CA_LOG_D("Send EvWrite to ShardID=" << shardId << ", TxId=" << std::get<ui64>(TxId)
609657
<< ", LockTxId=" << Settings.GetLockTxId() << ", LockNodeId=" << Settings.GetLockNodeId()
610658
<< ", Size=" << inFlightBatch.Data.size() << ", Cookie=" << inFlightBatch.Cookie
611-
<< "; ShardBatchesLeft=" << shard.Size() << ", ShardClosed=" << shard.IsClosed());
659+
<< "; ShardBatchesLeft=" << shard.Size() << ", ShardClosed=" << shard.IsClosed()
660+
<< "; Attempts=" << inFlightBatch.SendAttempts);
612661
Send(
613662
PipeCacheId,
614663
new TEvPipeCache::TEvForward(evWrite.release(), shardId, true),
@@ -630,16 +679,17 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
630679
return;
631680
}
632681

633-
CA_LOG_D("Retry ShardID=" << shardId);
682+
CA_LOG_T("Retry ShardID=" << shardId << " with Cookie=" << ifCookieEqual.value_or(0));
634683
SendDataToShard(shardId);
635684
}
636685

637686
void Handle(TEvPrivate::TEvShardRequestTimeout::TPtr& ev) {
687+
CA_LOG_W("Timeout shardID=" << ev->Get()->ShardId);
638688
RetryShard(ev->Get()->ShardId, ev->Cookie);
639689
}
640690

641691
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
642-
CA_LOG_D("TEvDeliveryProblem was received from tablet: " << ev->Get()->TabletId);
692+
CA_LOG_W("TEvDeliveryProblem was received from tablet: " << ev->Get()->TabletId);
643693
RetryShard(ev->Get()->TabletId, std::nullopt);
644694
}
645695

0 commit comments

Comments
 (0)