@@ -24,6 +24,7 @@ using TEvExecuteQueryRequest = TGrpcRequestNoOperationCall<Ydb::Query::ExecuteQu
24
24
struct TProducerState {
25
25
TMaybe<ui64> LastSeqNo;
26
26
ui64 AckedFreeSpaceBytes = 0 ;
27
+ TActorId ActorId;
27
28
};
28
29
29
30
class TRpcFlowControlState {
@@ -244,8 +245,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
244
245
const auto traceId = Request_->GetTraceId ();
245
246
246
247
NYql::TIssues issues;
247
- NKikimrKqp::EQueryAction queryAction;
248
- if (!ParseQueryAction (*req, queryAction, issues)) {
248
+ if (!ParseQueryAction (*req, QueryAction, issues)) {
249
249
return ReplyFinishStream (Ydb::StatusIds::BAD_REQUEST, std::move (issues));
250
250
}
251
251
@@ -274,7 +274,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
274
274
cachePolicy->set_keep_in_cache (true );
275
275
276
276
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
277
- queryAction ,
277
+ QueryAction ,
278
278
queryType,
279
279
SelfId (),
280
280
Request_,
@@ -288,7 +288,8 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
288
288
nullptr , // operationParams
289
289
false , // keepSession
290
290
false , // useCancelAfter
291
- syntax);
291
+ syntax,
292
+ true );
292
293
293
294
if (!ctx.Send (NKqp::MakeKqpProxyID (ctx.SelfID .NodeId ()), ev.Release ())) {
294
295
NYql::TIssues issues;
@@ -322,23 +323,24 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
322
323
323
324
ui64 freeSpaceBytes = FlowControl_.FreeSpaceBytes ();
324
325
325
- for (auto & pair : StreamProducers_ ) {
326
- const auto & producerId = pair.first ;
327
- auto & producer = pair.second ;
326
+ for (auto & pair : StreamChannels_ ) {
327
+ const auto & channelId = pair.first ;
328
+ auto & channel = pair.second ;
328
329
329
- if (freeSpaceBytes > 0 && producer .LastSeqNo && producer .AckedFreeSpaceBytes == 0 ) {
330
+ if (freeSpaceBytes > 0 && channel .LastSeqNo && channel .AckedFreeSpaceBytes == 0 ) {
330
331
LOG_DEBUG_S (ctx, NKikimrServices::RPC_REQUEST, this ->SelfId () << " Resume execution, "
331
- << " , producer : " << producerId
332
- << " , seqNo: " << producer .LastSeqNo
332
+ << " , channel : " << channelId
333
+ << " , seqNo: " << channel .LastSeqNo
333
334
<< " , freeSpace: " << freeSpaceBytes);
334
335
335
336
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
336
- resp->Record .SetSeqNo (*producer .LastSeqNo );
337
+ resp->Record .SetSeqNo (*channel .LastSeqNo );
337
338
resp->Record .SetFreeSpace (freeSpaceBytes);
339
+ resp->Record .SetChannelId (channelId);
338
340
339
- ctx.Send (producerId , resp.Release ());
341
+ ctx.Send (channel. ActorId , resp.Release ());
340
342
341
- producer .AckedFreeSpaceBytes = freeSpaceBytes;
343
+ channel .AckedFreeSpaceBytes = freeSpaceBytes;
342
344
}
343
345
}
344
346
@@ -358,9 +360,10 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
358
360
359
361
Request_->SendSerializedResult (std::move (out), Ydb::StatusIds::SUCCESS);
360
362
361
- auto & producer = StreamProducers_[ev->Sender ];
362
- producer.LastSeqNo = ev->Get ()->Record .GetSeqNo ();
363
- producer.AckedFreeSpaceBytes = freeSpaceBytes;
363
+ auto & channel = StreamChannels_[ev->Get ()->Record .GetChannelId ()];
364
+ channel.ActorId = ev->Sender ;
365
+ channel.LastSeqNo = ev->Get ()->Record .GetSeqNo ();
366
+ channel.AckedFreeSpaceBytes = freeSpaceBytes;
364
367
365
368
LOG_DEBUG_S (ctx, NKikimrServices::RPC_REQUEST, this ->SelfId () << " Send stream data ack"
366
369
<< " , seqNo: " << ev->Get ()->Record .GetSeqNo ()
@@ -371,8 +374,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
371
374
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
372
375
resp->Record .SetSeqNo (ev->Get ()->Record .GetSeqNo ());
373
376
resp->Record .SetFreeSpace (freeSpaceBytes);
377
+ resp->Record .SetChannelId (ev->Get ()->Record .GetChannelId ());
374
378
375
- ctx.Send (ev-> Sender , resp.Release ());
379
+ ctx.Send (channel. ActorId , resp.Release ());
376
380
}
377
381
378
382
void Handle (NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) {
@@ -381,14 +385,30 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
381
385
const auto & issueMessage = record.GetResponse ().GetQueryIssues ();
382
386
383
387
bool hasTrailingMessage = false ;
384
-
388
+
389
+ auto & kqpResponse = record.GetResponse ();
390
+ if (kqpResponse.GetYdbResults ().size () > 1 ) {
391
+ auto issue = MakeIssue (NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
392
+ " Unexpected trailing message with multiple result sets." );
393
+ ReplyFinishStream (Ydb::StatusIds::INTERNAL_ERROR, issue);
394
+ return ;
395
+ }
396
+
385
397
if (record.GetYdbStatus () == Ydb::StatusIds::SUCCESS) {
386
398
Request_->SetRuHeader (record.GetConsumedRu ());
387
399
388
400
auto & kqpResponse = record.GetResponse ();
389
401
390
402
Ydb::Query::ExecuteQueryResponsePart response;
391
403
404
+ if (QueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE) {
405
+ for (int i = 0 ; i < kqpResponse.GetYdbResults ().size (); i++) {
406
+ hasTrailingMessage = true ;
407
+ response.set_result_set_index (i);
408
+ response.mutable_result_set ()->Swap (record.MutableResponse ()->MutableYdbResults (i));
409
+ }
410
+ }
411
+
392
412
AuditContextAppend (Request_.get (), *Request_->GetProtoRequest (), response);
393
413
394
414
if (kqpResponse.HasTxMeta ()) {
@@ -492,8 +512,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
492
512
private:
493
513
std::shared_ptr<TEvExecuteQueryRequest> Request_;
494
514
515
+ NKikimrKqp::EQueryAction QueryAction;
495
516
TRpcFlowControlState FlowControl_;
496
- TMap<TActorId , TProducerState> StreamProducers_ ;
517
+ TMap<ui64 , TProducerState> StreamChannels_ ;
497
518
};
498
519
499
520
} // namespace
0 commit comments