Skip to content

Commit 3aab8ed

Browse files
authored
YQ-3342 fix script results writing (#5682)
1 parent a7d7c9d commit 3aab8ed

File tree

1 file changed

+97
-61
lines changed

1 file changed

+97
-61
lines changed

ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp

Lines changed: 97 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ namespace NKikimr::NKqp {
3131
namespace {
3232

3333
constexpr ui32 LEASE_UPDATE_FREQUENCY = 2;
34-
constexpr ui32 MAX_SAVE_RESULT_IN_FLIGHT = 1;
34+
35+
constexpr ui64 MIN_SAVE_RESULT_BATCH_SIZE = 5_MB;
36+
constexpr i32 MIN_SAVE_RESULT_BATCH_ROWS = 5000;
37+
constexpr ui64 RUN_SCRIPT_ACTOR_BUFFER_SIZE = 40_MB;
3538

3639
class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
3740
enum class ERunState {
@@ -48,14 +51,20 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
4851
UpdateLeaseEvent,
4952
};
5053

51-
struct TPendingSaveResult {
52-
ui32 ResultSetIndex;
53-
ui64 FirstRow;
54-
ui64 AccumulatedSize;
55-
Ydb::ResultSet ResultSet;
54+
struct TResultSetInfo {
55+
bool Truncated = false;
56+
ui64 RowCount = 0;
57+
ui64 ByteCount = 0;
58+
NJson::TJsonValue* Meta;
59+
60+
ui64 FirstRowId = 0;
61+
ui64 AccumulatedSize = 0;
62+
Ydb::ResultSet PendingResult;
63+
};
5664

65+
struct TPendingAck {
5766
TActorId ReplyActorId;
58-
THolder<TEvKqpExecuter::TEvStreamDataAck> SaveResultResponse;
67+
THolder<TEvKqpExecuter::TEvStreamDataAck> AckEvent;
5968
};
6069

6170
public:
@@ -255,29 +264,55 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
255264
PassAway();
256265
}
257266

258-
void SendStreamDataResponse(TActorId replyActorId, THolder<TEvKqpExecuter::TEvStreamDataAck> saveResultResponse) const {
259-
LOG_D("Send stream data ack"
260-
<< ", seqNo: " << saveResultResponse->Record.GetSeqNo()
261-
<< ", to: " << replyActorId);
267+
void SendStreamDataResponse() {
268+
if (PendingAcks.empty()) {
269+
return;
270+
}
271+
272+
if (PendingResultSetsSize > RUN_SCRIPT_ACTOR_BUFFER_SIZE) {
273+
// Try to save any pending result
274+
SaveResult();
275+
}
262276

263-
Send(replyActorId, saveResultResponse.Release());
277+
if (PendingResultSetsSize <= RUN_SCRIPT_ACTOR_BUFFER_SIZE) {
278+
while (!PendingAcks.empty()) {
279+
auto response = std::move(PendingAcks.front());
280+
PendingAcks.pop();
281+
282+
LOG_D("Send stream data ack"
283+
<< ", seqNo: " << response.AckEvent->Record.GetSeqNo()
284+
<< ", to: " << response.ReplyActorId);
285+
286+
Send(response.ReplyActorId, response.AckEvent.Release());
287+
}
288+
}
264289
}
265290

266-
void SaveResult() {
267-
if (SaveResultInflight >= MAX_SAVE_RESULT_IN_FLIGHT || PendingSaveResults.empty()) {
291+
void SaveResult(size_t resultSetId) {
292+
if (SaveResultInflight) {
268293
return;
269294
}
270295

271296
if (!ExpireAt && ResultsTtl > TDuration::Zero()) {
272297
ExpireAt = TInstant::Now() + ResultsTtl;
273298
}
274299

275-
TPendingSaveResult& result = PendingSaveResults.back();
276-
Register(CreateSaveScriptExecutionResultActor(SelfId(), Database, ExecutionId, result.ResultSetIndex, ExpireAt, result.FirstRow, result.AccumulatedSize, std::move(result.ResultSet)));
277-
SendStreamDataResponse(result.ReplyActorId, std::move(result.SaveResultResponse));
278-
279-
PendingSaveResults.pop_back();
300+
auto& resultSetInfo = ResultSetInfos[resultSetId];
301+
Register(CreateSaveScriptExecutionResultActor(SelfId(), Database, ExecutionId, resultSetId, ExpireAt, resultSetInfo.FirstRowId, resultSetInfo.AccumulatedSize, std::move(resultSetInfo.PendingResult)));
280302
SaveResultInflight++;
303+
PendingResultSetsSize -= resultSetInfo.ByteCount - resultSetInfo.AccumulatedSize;
304+
resultSetInfo.FirstRowId = resultSetInfo.RowCount;
305+
resultSetInfo.AccumulatedSize = resultSetInfo.ByteCount;
306+
resultSetInfo.PendingResult = Ydb::ResultSet();
307+
}
308+
309+
void SaveResult() {
310+
for (size_t resultSetId = 0; resultSetId < ResultSetInfos.size(); ++resultSetId) {
311+
if (ResultSetInfos[resultSetId].PendingResult.rows_size()) {
312+
SaveResult(resultSetId);
313+
break;
314+
}
315+
}
281316
}
282317

283318
void Handle(TEvKqpExecuter::TEvStreamData::TPtr& ev) {
@@ -299,57 +334,52 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
299334

300335
auto resultSetIndex = ev->Get()->Record.GetQueryResultIndex();
301336

302-
if (resultSetIndex >= ResultSetMetaArray.size()) {
337+
if (resultSetIndex >= ResultSetInfos.size()) {
303338
// we don't know result set count, so just accept all of them
304339
// it's possible to have several result sets per script
305340
// they can arrive in any order and may be missed for some indices
306-
ResultSetRowCount.resize(resultSetIndex + 1);
307-
ResultSetByteCount.resize(resultSetIndex + 1);
308-
Truncated.resize(resultSetIndex + 1);
309-
ResultSetMetaArray.resize(resultSetIndex + 1, nullptr);
341+
ResultSetInfos.resize(resultSetIndex + 1);
310342
}
311343

312-
bool saveResultRequired = false;
313-
if (IsExecuting() && !Truncated[resultSetIndex]) {
314-
auto& rowCount = ResultSetRowCount[resultSetIndex];
315-
auto& byteCount = ResultSetByteCount[resultSetIndex];
316-
auto firstRow = rowCount;
317-
auto accumulatedSize = byteCount;
344+
auto& resultSetInfo = ResultSetInfos[resultSetIndex];
345+
if (IsExecuting() && !resultSetInfo.Truncated) {
346+
auto& rowCount = resultSetInfo.RowCount;
347+
auto& byteCount = resultSetInfo.ByteCount;
318348

319-
Ydb::ResultSet resultSet;
320349
for (auto& row : *ev->Get()->Record.MutableResultSet()->mutable_rows()) {
321350
if (QueryServiceConfig.GetScriptResultRowsLimit() && rowCount + 1 > QueryServiceConfig.GetScriptResultRowsLimit()) {
322-
Truncated[resultSetIndex] = true;
351+
resultSetInfo.Truncated = true;
323352
break;
324353
}
325354

326355
auto serializedSize = row.ByteSizeLong();
327356
if (QueryServiceConfig.GetScriptResultSizeLimit() && byteCount + serializedSize > QueryServiceConfig.GetScriptResultSizeLimit()) {
328-
Truncated[resultSetIndex] = true;
357+
resultSetInfo.Truncated = true;
329358
break;
330359
}
331360

332361
rowCount++;
333362
byteCount += serializedSize;
334-
*resultSet.add_rows() = std::move(row);
363+
PendingResultSetsSize += serializedSize;
364+
*resultSetInfo.PendingResult.add_rows() = std::move(row);
335365
}
336366

337-
bool newResultSet = ResultSetMetaArray[resultSetIndex] == nullptr;
338-
if (newResultSet || Truncated[resultSetIndex]) {
367+
bool newResultSet = resultSetInfo.Meta == nullptr;
368+
if (newResultSet || resultSetInfo.Truncated) {
339369
Ydb::Query::Internal::ResultSetMeta meta;
340370
if (newResultSet) {
341371
*meta.mutable_columns() = ev->Get()->Record.GetResultSet().columns();
342372
}
343-
if (Truncated[resultSetIndex]) {
373+
if (resultSetInfo.Truncated) {
344374
meta.set_truncated(true);
345375
}
346376

347377
NJson::TJsonValue* value;
348378
if (newResultSet) {
349379
value = &ResultSetMetas[resultSetIndex];
350-
ResultSetMetaArray[resultSetIndex] = value;
380+
resultSetInfo.Meta = value;
351381
} else {
352-
value = ResultSetMetaArray[resultSetIndex];
382+
value = resultSetInfo.Meta;
353383
}
354384
NProtobufJson::Proto2Json(meta, *value, NProtobufJson::TProto2JsonConfig());
355385

@@ -362,24 +392,13 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
362392
}
363393
}
364394

365-
if (resultSet.rows_size() > 0) {
366-
saveResultRequired = true;
367-
PendingSaveResults.push_back({
368-
resultSetIndex,
369-
firstRow,
370-
accumulatedSize,
371-
std::move(resultSet),
372-
ev->Sender,
373-
std::move(resp)
374-
});
395+
if (ShouldSaveResult(resultSetInfo)) {
396+
SaveResult(resultSetIndex);
375397
}
376398
}
377399

378-
if (saveResultRequired) {
379-
SaveResult();
380-
} else {
381-
SendStreamDataResponse(ev->Sender, std::move(resp));
382-
}
400+
PendingAcks.push({.ReplyActorId = ev->Sender, .AckEvent = std::move(resp)});
401+
SendStreamDataResponse();
383402
}
384403

385404
void SaveResultMeta() {
@@ -504,9 +523,15 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
504523
Status = ev->Get()->Status;
505524
Issues.AddIssues(ev->Get()->Issues);
506525
} else {
507-
SaveResult();
526+
for (size_t resultSetId = 0; resultSetId < ResultSetInfos.size(); ++resultSetId) {
527+
if (ShouldSaveResult(ResultSetInfos[resultSetId])) {
528+
SaveResult(resultSetId);
529+
break;
530+
}
531+
}
508532
}
509533
}
534+
SendStreamDataResponse();
510535
CheckInflight();
511536
}
512537

@@ -533,6 +558,12 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
533558
return;
534559
}
535560

561+
if (PendingResultSetsSize) {
562+
// Complete results saving
563+
SaveResult();
564+
return;
565+
}
566+
536567
if (!LeaseUpdateQueryRunning) {
537568
RunScriptExecutionFinisher();
538569
} else {
@@ -545,7 +576,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
545576
Status = status;
546577

547578
// if query has no results, save empty json array
548-
if (ResultSetMetaArray.empty()) {
579+
if (ResultSetInfos.empty()) {
549580
ResultSetMetas.SetType(NJson::JSON_ARRAY);
550581
SaveResultMeta();
551582
SaveResultMetaInflight++;
@@ -566,6 +597,13 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
566597
&& RunState != ERunState::Cancelling;
567598
}
568599

600+
static bool ShouldSaveResult(TResultSetInfo& resultInfo) {
601+
if (!resultInfo.PendingResult.rows_size()) {
602+
return false;
603+
}
604+
return resultInfo.Truncated || resultInfo.PendingResult.rows_size() >= MIN_SAVE_RESULT_BATCH_ROWS || resultInfo.ByteCount - resultInfo.AccumulatedSize >= MIN_SAVE_RESULT_BATCH_SIZE;
605+
}
606+
569607
private:
570608
const TString ExecutionId;
571609
NKikimrKqp::TEvQueryRequest Request;
@@ -589,16 +627,14 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
589627
Ydb::StatusIds::StatusCode Status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
590628

591629
// Result
592-
std::vector<TPendingSaveResult> PendingSaveResults;
593-
std::vector<ui64> ResultSetRowCount;
594-
std::vector<ui64> ResultSetByteCount;
595-
std::vector<bool> Truncated;
596-
std::vector<NJson::TJsonValue*> ResultSetMetaArray;
630+
std::vector<TResultSetInfo> ResultSetInfos;
631+
std::queue<TPendingAck> PendingAcks;
597632
TMaybe<TInstant> ExpireAt;
598633
NJson::TJsonValue ResultSetMetas;
599634
ui32 SaveResultInflight = 0;
600635
ui32 SaveResultMetaInflight = 0;
601636
bool PendingResultMeta = false;
637+
ui64 PendingResultSetsSize = 0;
602638
std::optional<TString> QueryPlan;
603639
std::optional<TString> QueryAst;
604640
std::optional<NKqpProto::TKqpStatsQuery> QueryStats;

0 commit comments

Comments
 (0)