Skip to content

Commit 0bf5166

Browse files
authored
[Stable-24-3] Fixes for sinks (#10655)
1 parent e742895 commit 0bf5166

File tree

13 files changed

+167
-42
lines changed

13 files changed

+167
-42
lines changed

ydb/core/kqp/counters/kqp_counters.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -814,6 +814,18 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
814814
DataShardIteratorMessages = KqpGroup->GetCounter("IteratorReads/DatashardMessages", true);
815815
IteratorDeliveryProblems = KqpGroup->GetCounter("IteratorReads/DeliveryProblems", true);
816816

817+
/* sink writes */
818+
WriteActorsShardResolve = KqpGroup->GetCounter("SinkWrites/WriteActorShardResolve", true);
819+
WriteActorsCount = KqpGroup->GetCounter("SinkWrites/WriteActorsCount", false);
820+
WriteActorImmediateWrites = KqpGroup->GetCounter("SinkWrites/WriteActorImmediateWrites", true);
821+
WriteActorImmediateWritesRetries = KqpGroup->GetCounter("SinkWrites/WriteActorImmediateWritesRetries", true);
822+
WriteActorWritesSizeHistogram =
823+
KqpGroup->GetHistogram("SinkWrites/WriteActorWritesSize", NMonitoring::ExponentialHistogram(28, 2, 1));
824+
WriteActorWritesOperationsHistogram =
825+
KqpGroup->GetHistogram("SinkWrites/WriteActorWritesOperations", NMonitoring::ExponentialHistogram(20, 2, 1));
826+
WriteActorWritesLatencyHistogram =
827+
KqpGroup->GetHistogram("SinkWrites/WriteActorWritesLatencyMs", NMonitoring::ExponentialHistogram(20, 2, 1));
828+
817829
/* sequencers */
818830

819831
SequencerActorsCount = KqpGroup->GetCounter("Sequencer/ActorCount", false);

ydb/core/kqp/counters/kqp_counters.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,15 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
409409
::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorMessages;
410410
::NMonitoring::TDynamicCounters::TCounterPtr IteratorDeliveryProblems;
411411

412+
// Sink write counters
413+
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorsShardResolve;
414+
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorsCount;
415+
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorImmediateWrites;
416+
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorImmediateWritesRetries;
417+
NMonitoring::THistogramPtr WriteActorWritesSizeHistogram;
418+
NMonitoring::THistogramPtr WriteActorWritesOperationsHistogram;
419+
NMonitoring::THistogramPtr WriteActorWritesLatencyHistogram;
420+
412421
// Scheduler signals
413422
::NMonitoring::TDynamicCounters::TCounterPtr SchedulerThrottled;
414423
::NMonitoring::TDynamicCounters::TCounterPtr SchedulerCapacity;

ydb/core/kqp/expr_nodes/kqp_expr_nodes.json

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -546,10 +546,9 @@
546546
"Match": {"Type": "Callable", "Name": "KqpTableSinkSettings"},
547547
"Children": [
548548
{"Index": 0, "Name": "Table", "Type": "TKqpTable"},
549-
{"Index": 1, "Name": "Columns", "Type": "TCoAtomList"},
550-
{"Index": 2, "Name": "InconsistentWrite", "Type": "TCoAtom"},
551-
{"Index": 3, "Name": "Mode", "Type": "TCoAtom"},
552-
{"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
549+
{"Index": 1, "Name": "InconsistentWrite", "Type": "TCoAtom"},
550+
{"Index": 2, "Name": "Mode", "Type": "TCoAtom"},
551+
{"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
553552
]
554553
},
555554
{

ydb/core/kqp/opt/kqp_opt_effects.cpp

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ TCoAtomList BuildKeyColumnsList(const TKikimrTableDescription& table, TPositionH
231231
.Done();
232232
}
233233

234-
TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table, const TCoAtomList& columns,
234+
TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table,
235235
const bool allowInconsistentWrites, const TStringBuf mode, TExprContext& ctx) {
236236
Y_DEBUG_ABORT_UNLESS(IsDqPureExpr(expr));
237237

@@ -253,7 +253,6 @@ TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table, const
253253
.Index().Value("0").Build()
254254
.Settings<TKqpTableSinkSettings>()
255255
.Table(table)
256-
.Columns(columns)
257256
.InconsistentWrite(allowInconsistentWrites
258257
? ctx.NewAtom(expr.Pos(), "true")
259258
: ctx.NewAtom(expr.Pos(), "false"))
@@ -311,7 +310,7 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const
311310
if (IsDqPureExpr(node.Input())) {
312311
if (sinkEffect) {
313312
stageInput = RebuildPureStageWithSink(
314-
node.Input(), node.Table(), node.Columns(),
313+
node.Input(), node.Table(),
315314
settings.AllowInconsistentWrites, settings.Mode, ctx);
316315
effect = Build<TKqpSinkEffect>(ctx, node.Pos())
317316
.Stage(stageInput.Cast().Ptr())
@@ -349,7 +348,6 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const
349348
.Index().Value("0").Build()
350349
.Settings<TKqpTableSinkSettings>()
351350
.Table(node.Table())
352-
.Columns(node.Columns())
353351
.InconsistentWrite(settings.AllowInconsistentWrites
354352
? ctx.NewAtom(node.Pos(), "true")
355353
: ctx.NewAtom(node.Pos(), "false"))
@@ -459,7 +457,7 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const
459457
if (IsDqPureExpr(node.Input())) {
460458
if (sinkEffect) {
461459
const auto keyColumns = BuildKeyColumnsList(table, node.Pos(), ctx);
462-
stageInput = RebuildPureStageWithSink(node.Input(), node.Table(), keyColumns, false, "delete", ctx);
460+
stageInput = RebuildPureStageWithSink(node.Input(), node.Table(), false, "delete", ctx);
463461
effect = Build<TKqpSinkEffect>(ctx, node.Pos())
464462
.Stage(stageInput.Cast().Ptr())
465463
.SinkIndex().Build("0")
@@ -486,7 +484,6 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const
486484
auto input = dqUnion.Output().Stage().Program().Body();
487485

488486
if (sinkEffect) {
489-
const auto keyColumns = BuildKeyColumnsList(table, node.Pos(), ctx);
490487
auto sink = Build<TDqSink>(ctx, node.Pos())
491488
.DataSink<TKqpTableSink>()
492489
.Category(ctx.NewAtom(node.Pos(), NYql::KqpTableSinkName))
@@ -495,7 +492,6 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const
495492
.Index().Value("0").Build()
496493
.Settings<TKqpTableSinkSettings>()
497494
.Table(node.Table())
498-
.Columns(keyColumns)
499495
.InconsistentWrite(ctx.NewAtom(node.Pos(), "false"))
500496
.Mode(ctx.NewAtom(node.Pos(), "delete"))
501497
.Settings()

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,15 @@ void FillTablesMap(const TKqpTable& table, const TCoAtomList& columns,
141141
THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap)
142142
{
143143
FillTablesMap(table, tablesMap);
144+
for (const auto& column : columns) {
145+
tablesMap[table.Path()].emplace(column);
146+
}
147+
}
148+
149+
void FillTablesMap(const TKqpTable& table, const TVector<TStringBuf>& columns,
150+
THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap)
151+
{
152+
FillTablesMap(table, tablesMap);
144153

145154
for (const auto& column : columns) {
146155
tablesMap[table.Path()].emplace(column);
@@ -763,7 +772,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
763772
YQL_ENSURE(maybeSinkNode);
764773
auto sinkNode = maybeSinkNode.Cast();
765774
auto* sinkProto = stageProto.AddSinks();
766-
FillSink(sinkNode, sinkProto, tablesMap, ctx);
775+
FillSink(sinkNode, sinkProto, tablesMap, stage, ctx);
767776
sinkProto->SetOutputIndex(FromString(TStringBuf(sinkNode.Index())));
768777

769778
if (IsTableSink(sinkNode.DataSink().Cast<TCoDataSink>().Category())) {
@@ -1029,23 +1038,38 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
10291038
}
10301039
}
10311040

1032-
void FillKqpSink(const TDqSink& sink, NKqpProto::TKqpSink* protoSink, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap) {
1041+
void FillKqpSink(const TDqSink& sink, NKqpProto::TKqpSink* protoSink, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap, const TDqPhyStage& stage) {
10331042
if (auto settings = sink.Settings().Maybe<TKqpTableSinkSettings>()) {
10341043
NKqpProto::TKqpInternalSink& internalSinkProto = *protoSink->MutableInternalSink();
10351044
internalSinkProto.SetType(TString(NYql::KqpTableSinkName));
10361045
NKikimrKqp::TKqpTableSinkSettings settingsProto;
1037-
FillTablesMap(settings.Table().Cast(), settings.Columns().Cast(), tablesMap);
1046+
1047+
const auto& tupleType = stage.Ref().GetTypeAnn()->Cast<TTupleExprType>();
1048+
YQL_ENSURE(tupleType);
1049+
YQL_ENSURE(tupleType->GetSize() == 1);
1050+
const auto& listType = tupleType->GetItems()[0]->Cast<TListExprType>();
1051+
YQL_ENSURE(listType);
1052+
const auto& structType = listType->GetItemType()->Cast<TStructExprType>();
1053+
YQL_ENSURE(structType);
1054+
1055+
TVector<TStringBuf> columns;
1056+
columns.reserve(structType->GetSize());
1057+
for (const auto& item : structType->GetItems()) {
1058+
columns.emplace_back(item->GetName());
1059+
}
1060+
1061+
FillTablesMap(settings.Table().Cast(), columns, tablesMap);
10381062
FillTableId(settings.Table().Cast(), *settingsProto.MutableTable());
10391063

10401064
const auto tableMeta = TablesData->ExistingTable(Cluster, settings.Table().Cast().Path()).Metadata;
10411065

10421066
for (const auto& columnName : tableMeta->KeyColumnNames) {
10431067
const auto columnMeta = tableMeta->Columns.FindPtr(columnName);
1044-
YQL_ENSURE(columnMeta != nullptr, "Unknown column in sink: \"" + columnName + "\"");
1068+
YQL_ENSURE(columnMeta != nullptr, "Unknown column in sink: \"" + TString(columnName) + "\"");
10451069

10461070
auto keyColumnProto = settingsProto.AddKeyColumns();
10471071
keyColumnProto->SetId(columnMeta->Id);
1048-
keyColumnProto->SetName(columnName);
1072+
keyColumnProto->SetName(TString(columnName));
10491073
keyColumnProto->SetTypeId(columnMeta->TypeInfo.GetTypeId());
10501074

10511075
if (columnMeta->TypeInfo.GetTypeId() == NScheme::NTypeIds::Pg) {
@@ -1055,14 +1079,13 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
10551079
}
10561080
}
10571081

1058-
for (const auto& column : settings.Columns().Cast()) {
1059-
const auto columnName = column.StringValue();
1082+
for (const auto& columnName : columns) {
10601083
const auto columnMeta = tableMeta->Columns.FindPtr(columnName);
1061-
YQL_ENSURE(columnMeta != nullptr, "Unknown column in sink: \"" + columnName + "\"");
1084+
YQL_ENSURE(columnMeta != nullptr, "Unknown column in sink: \"" + TString(columnName) + "\"");
10621085

10631086
auto columnProto = settingsProto.AddColumns();
10641087
columnProto->SetId(columnMeta->Id);
1065-
columnProto->SetName(columnName);
1088+
columnProto->SetName(TString(columnName));
10661089
columnProto->SetTypeId(columnMeta->TypeInfo.GetTypeId());
10671090

10681091
if (columnMeta->TypeInfo.GetTypeId() == NScheme::NTypeIds::Pg) {
@@ -1102,11 +1125,11 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
11021125
|| dataSinkCategory == NYql::KqpTableSinkName;
11031126
}
11041127

1105-
void FillSink(const TDqSink& sink, NKqpProto::TKqpSink* protoSink, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap, TExprContext& ctx) {
1128+
void FillSink(const TDqSink& sink, NKqpProto::TKqpSink* protoSink, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap, const TDqPhyStage& stage, TExprContext& ctx) {
11061129
Y_UNUSED(ctx);
11071130
const TStringBuf dataSinkCategory = sink.DataSink().Cast<TCoDataSink>().Category();
11081131
if (IsTableSink(dataSinkCategory)) {
1109-
FillKqpSink(sink, protoSink, tablesMap);
1132+
FillKqpSink(sink, protoSink, tablesMap, stage);
11101133
} else {
11111134
// Delegate sink filling to dq integration of specific provider
11121135
const auto provider = TypesCtx.DataSinkMap.find(dataSinkCategory);

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <ydb/core/tx/tx.h>
2020
#include <ydb/library/actors/core/actorsystem.h>
2121
#include <ydb/library/actors/core/interconnect.h>
22+
#include <ydb/library/wilson_ids/wilson.h>
2223
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h>
2324
#include <ydb/library/yql/public/issue/yql_issue_message.h>
2425

@@ -134,10 +135,13 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
134135
, InconsistentTx(
135136
Settings.GetInconsistentTx())
136137
, MemoryLimit(MessageSettings.InFlightMemoryLimitPerActorBytes)
138+
, WriteActorSpan(TWilsonKqp::WriteActor, NWilson::TTraceId(args.TraceId), "WriteActor")
137139
{
138140
YQL_ENSURE(std::holds_alternative<ui64>(TxId));
139141
YQL_ENSURE(!ImmediateTx);
140142
EgressStats.Level = args.StatsLevel;
143+
144+
Counters->WriteActorsCount->Inc();
141145
}
142146

143147
void Bootstrap() {
@@ -244,6 +248,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
244248
}
245249

246250
void ResolveTable() {
251+
Counters->WriteActorsShardResolve->Inc();
247252
SchemeEntry.reset();
248253
SchemeRequest.reset();
249254

@@ -267,8 +272,11 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
267272
entry.ShowPrivatePath = true;
268273
request->ResultSet.emplace_back(entry);
269274

270-
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}));
271-
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request));
275+
WriteActorStateSpan = NWilson::TSpan(TWilsonKqp::WriteActorTableNavigate, WriteActorSpan.GetTraceId(),
276+
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);
277+
278+
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}), 0, 0, WriteActorSpan.GetTraceId());
279+
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request), 0, 0, WriteActorSpan.GetTraceId());
272280
}
273281

274282
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
@@ -327,7 +335,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
327335
request->ResultSet.emplace_back(std::move(keyRange));
328336

329337
TAutoPtr<TEvTxProxySchemeCache::TEvResolveKeySet> resolveReq(new TEvTxProxySchemeCache::TEvResolveKeySet(request));
330-
Send(MakeSchemeCacheID(), resolveReq.Release(), 0, 0);
338+
Send(MakeSchemeCacheID(), resolveReq.Release(), 0, 0, WriteActorSpan.GetTraceId());
331339
}
332340

333341
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
@@ -368,6 +376,8 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
368376
}()
369377
<< ", Cookie=" << ev->Cookie);
370378

379+
380+
371381
switch (ev->Get()->GetStatus()) {
372382
case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: {
373383
CA_LOG_E("Got UNSPECIFIED for table `"
@@ -542,6 +552,11 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
542552
EgressStats.Chunks++;
543553
EgressStats.Splits++;
544554
EgressStats.Resume();
555+
556+
if (auto it = SendTime.find(shardId); it != std::end(SendTime)) {
557+
Counters->WriteActorWritesLatencyHistogram->Collect((TInstant::Now() - it->second).MilliSeconds());
558+
SendTime.erase(it);
559+
}
545560
}
546561
resumeNotificator.CheckMemory();
547562
}
@@ -579,7 +594,6 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
579594
NYql::NDqProto::StatusIds::UNAVAILABLE);
580595
return;
581596
}
582-
583597
auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(
584598
NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
585599

@@ -613,6 +627,16 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
613627
ShardedWriteController->GetDataFormat());
614628
}
615629

630+
if (metadata->SendAttempts == 0) {
631+
Counters->WriteActorImmediateWrites->Inc();
632+
Counters->WriteActorWritesSizeHistogram->Collect(serializationResult.TotalDataSize);
633+
Counters->WriteActorWritesOperationsHistogram->Collect(metadata->OperationsCount);
634+
635+
SendTime[shardId] = TInstant::Now();
636+
} else {
637+
Counters->WriteActorImmediateWritesRetries->Inc();
638+
}
639+
616640
CA_LOG_D("Send EvWrite to ShardID=" << shardId << ", TxId=" << evWrite->Record.GetTxId()
617641
<< ", TxMode=" << evWrite->Record.GetTxMode()
618642
<< ", LockTxId=" << evWrite->Record.GetLockTxId() << ", LockNodeId=" << evWrite->Record.GetLockNodeId()
@@ -708,6 +732,13 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
708732
NYql::TIssues issues;
709733
issues.AddIssue(std::move(issue));
710734

735+
if (WriteActorStateSpan) {
736+
WriteActorStateSpan.EndError(issues.ToOneLineString());
737+
}
738+
if (WriteActorSpan) {
739+
WriteActorSpan.EndError(issues.ToOneLineString());
740+
}
741+
711742
Callbacks->OnAsyncOutputError(OutputIndex, std::move(issues), statusCode);
712743
}
713744

@@ -717,6 +748,8 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
717748
}
718749

719750
void Prepare() {
751+
WriteActorStateSpan.EndOk();
752+
720753
YQL_ENSURE(SchemeEntry);
721754
ResolveAttempts = 0;
722755

@@ -788,12 +821,16 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
788821
std::optional<NSchemeCache::TSchemeCacheRequest::TEntry> SchemeRequest;
789822
ui64 ResolveAttempts = 0;
790823

824+
THashMap<ui64, TInstant> SendTime;
791825
THashMap<ui64, TLockInfo> LocksInfo;
792826
bool Finished = false;
793827

794828
const i64 MemoryLimit;
795829

796830
IShardedWriteControllerPtr ShardedWriteController = nullptr;
831+
832+
NWilson::TSpan WriteActorSpan;
833+
NWilson::TSpan WriteActorStateSpan;
797834
};
798835

799836
void RegisterKqpWriteActor(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr<TKqpCounters> counters) {

ydb/core/kqp/runtime/kqp_write_table.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -893,14 +893,17 @@ class TShardsInfo {
893893

894894
void MakeNextBatches(i64 maxDataSize, ui64 maxCount) {
895895
YQL_ENSURE(BatchesInFlight == 0);
896+
YQL_ENSURE(!IsEmpty());
896897
i64 dataSize = 0;
898+
// For columnshard batch can be slightly larger than the limit.
897899
while (BatchesInFlight < maxCount
898900
&& BatchesInFlight < Batches.size()
899-
&& dataSize + GetBatch(BatchesInFlight)->GetMemory() <= maxDataSize) {
901+
&& (dataSize + GetBatch(BatchesInFlight)->GetMemory() <= maxDataSize || BatchesInFlight == 0)) {
900902
dataSize += GetBatch(BatchesInFlight)->GetMemory();
901903
++BatchesInFlight;
902904
}
903-
YQL_ENSURE(BatchesInFlight == Batches.size() || GetBatch(BatchesInFlight)->GetMemory() <= maxDataSize);
905+
YQL_ENSURE(BatchesInFlight != 0);
906+
YQL_ENSURE(BatchesInFlight == maxCount || BatchesInFlight == Batches.size() || dataSize + GetBatch(BatchesInFlight)->GetMemory() >= maxDataSize);
904907
}
905908

906909
const IPayloadSerializer::IBatchPtr& GetBatch(size_t index) const {
@@ -1204,7 +1207,9 @@ class TShardedWriteController : public IShardedWriteController {
12041207
if (force) {
12051208
for (auto& [shardId, batches] : Serializer->FlushBatchesForce()) {
12061209
for (auto& batch : batches) {
1207-
ShardsInfo.GetShard(shardId).PushBatch(std::move(batch));
1210+
if (batch && !batch->IsEmpty()) {
1211+
ShardsInfo.GetShard(shardId).PushBatch(std::move(batch));
1212+
}
12081213
}
12091214
}
12101215
} else {

0 commit comments

Comments
 (0)