@@ -24,6 +24,8 @@ using TEvExecuteQueryRequest = TGrpcRequestNoOperationCall<Ydb::Query::ExecuteQu
24
24
struct TProducerState {
25
25
TMaybe<ui64> LastSeqNo;
26
26
ui64 AckedFreeSpaceBytes = 0 ;
27
+ TActorId ActorId;
28
+ ui64 QueryResultIndex = 0 ;
27
29
};
28
30
29
31
class TRpcFlowControlState {
@@ -244,8 +246,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
244
246
const auto traceId = Request_->GetTraceId ();
245
247
246
248
NYql::TIssues issues;
247
- NKikimrKqp::EQueryAction queryAction;
248
- if (!ParseQueryAction (*req, queryAction, issues)) {
249
+ if (!ParseQueryAction (*req, QueryAction, issues)) {
249
250
return ReplyFinishStream (Ydb::StatusIds::BAD_REQUEST, std::move (issues));
250
251
}
251
252
@@ -274,7 +275,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
274
275
cachePolicy->set_keep_in_cache (true );
275
276
276
277
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
277
- queryAction ,
278
+ QueryAction ,
278
279
queryType,
279
280
SelfId (),
280
281
Request_,
@@ -288,7 +289,8 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
288
289
nullptr , // operationParams
289
290
false , // keepSession
290
291
false , // useCancelAfter
291
- syntax);
292
+ syntax,
293
+ true );
292
294
293
295
if (!ctx.Send (NKqp::MakeKqpProxyID (ctx.SelfID .NodeId ()), ev.Release ())) {
294
296
NYql::TIssues issues;
@@ -322,23 +324,24 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
322
324
323
325
ui64 freeSpaceBytes = FlowControl_.FreeSpaceBytes ();
324
326
325
- for (auto & pair : StreamProducers_ ) {
326
- const auto & producerId = pair.first ;
327
- auto & producer = pair.second ;
327
+ for (auto & pair : StreamChannels_ ) {
328
+ const auto & channelId = pair.first ;
329
+ auto & channel = pair.second ;
328
330
329
- if (freeSpaceBytes > 0 && producer .LastSeqNo && producer .AckedFreeSpaceBytes == 0 ) {
331
+ if (freeSpaceBytes > 0 && channel .LastSeqNo && channel .AckedFreeSpaceBytes == 0 ) {
330
332
LOG_DEBUG_S (ctx, NKikimrServices::RPC_REQUEST, this ->SelfId () << " Resume execution, "
331
- << " , producer : " << producerId
332
- << " , seqNo: " << producer .LastSeqNo
333
+ << " , channel : " << channelId
334
+ << " , seqNo: " << channel .LastSeqNo
333
335
<< " , freeSpace: " << freeSpaceBytes);
334
336
335
337
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
336
- resp->Record .SetSeqNo (*producer .LastSeqNo );
338
+ resp->Record .SetSeqNo (*channel .LastSeqNo );
337
339
resp->Record .SetFreeSpace (freeSpaceBytes);
340
+ resp->Record .SetChannelId (channelId);
338
341
339
- ctx.Send (producerId , resp.Release ());
342
+ ctx.Send (channel. ActorId , resp.Release ());
340
343
341
- producer .AckedFreeSpaceBytes = freeSpaceBytes;
344
+ channel .AckedFreeSpaceBytes = freeSpaceBytes;
342
345
}
343
346
}
344
347
@@ -358,9 +361,11 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
358
361
359
362
Request_->SendSerializedResult (std::move (out), Ydb::StatusIds::SUCCESS);
360
363
361
- auto & producer = StreamProducers_[ev->Sender ];
362
- producer.LastSeqNo = ev->Get ()->Record .GetSeqNo ();
363
- producer.AckedFreeSpaceBytes = freeSpaceBytes;
364
+ auto & channel = StreamChannels_[ev->Get ()->Record .GetChannelId ()];
365
+ channel.ActorId = ev->Sender ;
366
+ channel.LastSeqNo = ev->Get ()->Record .GetSeqNo ();
367
+ channel.AckedFreeSpaceBytes = freeSpaceBytes;
368
+ channel.QueryResultIndex = ev->Get ()->Record .GetQueryResultIndex ();
364
369
365
370
LOG_DEBUG_S (ctx, NKikimrServices::RPC_REQUEST, this ->SelfId () << " Send stream data ack"
366
371
<< " , seqNo: " << ev->Get ()->Record .GetSeqNo ()
@@ -371,8 +376,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
371
376
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
372
377
resp->Record .SetSeqNo (ev->Get ()->Record .GetSeqNo ());
373
378
resp->Record .SetFreeSpace (freeSpaceBytes);
379
+ resp->Record .SetChannelId (ev->Get ()->Record .GetChannelId ());
374
380
375
- ctx.Send (ev-> Sender , resp.Release ());
381
+ ctx.Send (channel. ActorId , resp.Release ());
376
382
}
377
383
378
384
void Handle (NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) {
@@ -381,14 +387,30 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
381
387
const auto & issueMessage = record.GetResponse ().GetQueryIssues ();
382
388
383
389
bool hasTrailingMessage = false ;
384
-
390
+
391
+ auto & kqpResponse = record.GetResponse ();
392
+ if (kqpResponse.GetYdbResults ().size () > 1 ) {
393
+ auto issue = MakeIssue (NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
394
+ " Unexpected trailing message with multiple result sets." );
395
+ ReplyFinishStream (Ydb::StatusIds::INTERNAL_ERROR, issue);
396
+ return ;
397
+ }
398
+
385
399
if (record.GetYdbStatus () == Ydb::StatusIds::SUCCESS) {
386
400
Request_->SetRuHeader (record.GetConsumedRu ());
387
401
388
402
auto & kqpResponse = record.GetResponse ();
389
403
390
404
Ydb::Query::ExecuteQueryResponsePart response;
391
405
406
+ if (QueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE) {
407
+ for (int i = 0 ; i < kqpResponse.GetYdbResults ().size (); i++) {
408
+ hasTrailingMessage = true ;
409
+ response.set_result_set_index (i);
410
+ response.mutable_result_set ()->Swap (record.MutableResponse ()->MutableYdbResults (i));
411
+ }
412
+ }
413
+
392
414
AuditContextAppend (Request_.get (), *Request_->GetProtoRequest (), response);
393
415
394
416
if (kqpResponse.HasTxMeta ()) {
@@ -496,8 +518,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
496
518
private:
497
519
std::shared_ptr<TEvExecuteQueryRequest> Request_;
498
520
521
+ NKikimrKqp::EQueryAction QueryAction;
499
522
TRpcFlowControlState FlowControl_;
500
- TMap<TActorId , TProducerState> StreamProducers_ ;
523
+ TMap<ui64 , TProducerState> StreamChannels_ ;
501
524
};
502
525
503
526
} // namespace
0 commit comments