Skip to content

Commit f714ab4

Browse files
authored
fix hanging read actor in case of exhausted free space (#2868)
1 parent 61bf0b9 commit f714ab4

File tree

1 file changed

+26
-11
lines changed

1 file changed

+26
-11
lines changed

ydb/core/kqp/runtime/kqp_read_actor.cpp

+26-11
Original file line numberDiff line numberDiff line change
@@ -1136,7 +1136,7 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
11361136
NMiniKQL::WriteColumnValuesFromArrow(editAccessors, NMiniKQL::TBatchDataAccessor(result->Get()->GetArrowBatch()), columnIndex, resultColumnIndex, column.TypeInfo)
11371137
);
11381138
if (column.NotNull) {
1139-
std::shared_ptr<arrow::Array> columnSharedPtr = result->Get()->GetArrowBatch()->column(columnIndex);
1139+
std::shared_ptr<arrow::Array> columnSharedPtr = result->Get()->GetArrowBatch()->column(columnIndex);
11401140
bool gotNullValue = false;
11411141
for (ui64 rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) {
11421142
if (columnSharedPtr->IsNull(rowIndex)) {
@@ -1181,9 +1181,14 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
11811181
}
11821182

11831183
NMiniKQL::TBytesStatistics PackCells(TResult& handle, i64& freeSpace) {
1184-
auto& [shardId, result, batch, _, packed] = handle;
1184+
auto& [shardId, result, batch, processedRows, packed] = handle;
11851185
NMiniKQL::TBytesStatistics stats;
11861186
batch->reserve(batch->size());
1187+
CA_LOG_D(TStringBuilder() << "enter pack cells method "
1188+
<< " shardId: " << shardId
1189+
<< " processedRows: " << processedRows
1190+
<< " packed rows: " << packed
1191+
<< " freeSpace: " << freeSpace);
11871192

11881193
for (size_t rowIndex = packed; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) {
11891194
const auto& row = result->Get()->GetCells(rowIndex);
@@ -1225,6 +1230,12 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
12251230
break;
12261231
}
12271232
}
1233+
1234+
CA_LOG_D(TStringBuilder() << "exit pack cells method "
1235+
<< " shardId: " << shardId
1236+
<< " processedRows: " << processedRows
1237+
<< " packed rows: " << packed
1238+
<< " freeSpace: " << freeSpace);
12281239
return stats;
12291240
}
12301241

@@ -1246,7 +1257,9 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
12461257

12471258
YQL_ENSURE(!resultBatch.IsWide(), "Wide stream is not supported");
12481259

1249-
CA_LOG_D(TStringBuilder() << " enter getasyncinputdata results size " << Results.size());
1260+
CA_LOG_D(TStringBuilder() << " enter getasyncinputdata results size " << Results.size()
1261+
<< ", freeSpace " << freeSpace);
1262+
12501263
ui64 bytes = 0;
12511264
while (!Results.empty()) {
12521265
auto& result = Results.front();
@@ -1255,14 +1268,15 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
12551268
auto& msg = *result.ReadResult->Get();
12561269
if (!batch.Defined()) {
12571270
batch.ConstructInPlace();
1258-
switch (msg.Record.GetResultFormat()) {
1259-
case NKikimrDataEvents::FORMAT_ARROW:
1260-
BytesStats.AddStatistics(PackArrow(result, freeSpace));
1261-
break;
1262-
case NKikimrDataEvents::FORMAT_UNSPECIFIED:
1263-
case NKikimrDataEvents::FORMAT_CELLVEC:
1264-
BytesStats.AddStatistics(PackCells(result, freeSpace));
1265-
}
1271+
}
1272+
1273+
switch (msg.Record.GetResultFormat()) {
1274+
case NKikimrDataEvents::FORMAT_ARROW:
1275+
BytesStats.AddStatistics(PackArrow(result, freeSpace));
1276+
break;
1277+
case NKikimrDataEvents::FORMAT_UNSPECIFIED:
1278+
case NKikimrDataEvents::FORMAT_CELLVEC:
1279+
BytesStats.AddStatistics(PackCells(result, freeSpace));
12661280
}
12671281

12681282
auto id = result.ReadResult->Get()->Record.GetReadId();
@@ -1334,6 +1348,7 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
13341348

13351349
CA_LOG_D(TStringBuilder() << "returned async data"
13361350
<< " processed rows " << ProcessedRowCount
1351+
<< " left freeSpace " << freeSpace
13371352
<< " received rows " << ReceivedRowCount
13381353
<< " running reads " << RunningReads()
13391354
<< " pending shards " << PendingShards.Size()

0 commit comments

Comments
 (0)