Skip to content

Commit ab89e4b

Browse files
committed
support trailing generic query responses
1 parent 8ce7edb commit ab89e4b

File tree

12 files changed

+261
-78
lines changed

12 files changed

+261
-78
lines changed

ydb/core/grpc_services/query/rpc_execute_query.cpp

+31-7
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ using TEvExecuteQueryRequest = TGrpcRequestNoOperationCall<Ydb::Query::ExecuteQu
2424
struct TProducerState {
2525
TMaybe<ui64> LastSeqNo;
2626
ui64 AckedFreeSpaceBytes = 0;
27+
TActorId ActorId;
28+
ui64 QueryResultIndex = 0;
2729
};
2830

2931
class TRpcFlowControlState {
@@ -244,8 +246,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
244246
const auto traceId = Request_->GetTraceId();
245247

246248
NYql::TIssues issues;
247-
NKikimrKqp::EQueryAction queryAction;
248-
if (!ParseQueryAction(*req, queryAction, issues)) {
249+
if (!ParseQueryAction(*req, QueryAction, issues)) {
249250
return ReplyFinishStream(Ydb::StatusIds::BAD_REQUEST, std::move(issues));
250251
}
251252

@@ -274,7 +275,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
274275
cachePolicy->set_keep_in_cache(true);
275276

276277
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
277-
queryAction,
278+
QueryAction,
278279
queryType,
279280
SelfId(),
280281
Request_,
@@ -335,8 +336,10 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
335336
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
336337
resp->Record.SetSeqNo(*producer.LastSeqNo);
337338
resp->Record.SetFreeSpace(freeSpaceBytes);
339+
resp->Record.SetQueryResultIndex(producer.QueryResultIndex);
340+
resp->Record.SetChannelId(producerId);
338341

339-
ctx.Send(producerId, resp.Release());
342+
ctx.Send(producer.ActorId, resp.Release());
340343

341344
producer.AckedFreeSpaceBytes = freeSpaceBytes;
342345
}
@@ -358,9 +361,11 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
358361

359362
Request_->SendSerializedResult(std::move(out), Ydb::StatusIds::SUCCESS);
360363

361-
auto& producer = StreamProducers_[ev->Sender];
364+
auto& producer = StreamProducers_[ev->Get()->Record.GetChannelId()];
365+
producer.ActorId = ev->Sender;
362366
producer.LastSeqNo = ev->Get()->Record.GetSeqNo();
363367
producer.AckedFreeSpaceBytes = freeSpaceBytes;
368+
producer.QueryResultIndex = ev->Get()->Record.GetQueryResultIndex();
364369

365370
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Send stream data ack"
366371
<< ", seqNo: " << ev->Get()->Record.GetSeqNo()
@@ -371,6 +376,8 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
371376
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
372377
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
373378
resp->Record.SetFreeSpace(freeSpaceBytes);
379+
resp->Record.SetQueryResultIndex(ev->Get()->Record.GetQueryResultIndex());
380+
resp->Record.SetChannelId(ev->Get()->Record.GetChannelId());
374381

375382
ctx.Send(ev->Sender, resp.Release());
376383
}
@@ -381,14 +388,30 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
381388
const auto& issueMessage = record.GetResponse().GetQueryIssues();
382389

383390
bool hasTrailingMessage = false;
384-
391+
392+
auto& kqpResponse = record.GetResponse();
393+
if (kqpResponse.GetYdbResults().size() > 1) {
394+
auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
395+
"Unexpected trailing message with multiple result sets.");
396+
ReplyFinishStream(Ydb::StatusIds::INTERNAL_ERROR, issue);
397+
return;
398+
}
399+
385400
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
386401
Request_->SetRuHeader(record.GetConsumedRu());
387402

388403
auto& kqpResponse = record.GetResponse();
389404

390405
Ydb::Query::ExecuteQueryResponsePart response;
391406

407+
if (QueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE) {
408+
for(int i = 0; i < kqpResponse.GetYdbResults().size(); i++) {
409+
hasTrailingMessage = true;
410+
response.set_result_set_index(i);
411+
response.mutable_result_set()->Swap(record.MutableResponse()->MutableYdbResults(i));
412+
}
413+
}
414+
392415
AuditContextAppend(Request_.get(), *Request_->GetProtoRequest(), response);
393416

394417
if (kqpResponse.HasTxMeta()) {
@@ -492,8 +515,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
492515
private:
493516
std::shared_ptr<TEvExecuteQueryRequest> Request_;
494517

518+
NKikimrKqp::EQueryAction QueryAction;
495519
TRpcFlowControlState FlowControl_;
496-
TMap<TActorId, TProducerState> StreamProducers_;
520+
TMap<ui64, TProducerState> StreamProducers_;
497521
};
498522

499523
} // namespace

ydb/core/kqp/common/events/query.h

+7-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
3434
const ::Ydb::Operations::OperationParams* operationParams,
3535
bool keepSession = false,
3636
bool useCancelAfter = true,
37-
const ::Ydb::Query::Syntax syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED);
37+
const ::Ydb::Query::Syntax syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED,
38+
bool supportsStreamTrailingResult = false);
3839

3940
TEvQueryRequest() = default;
4041

@@ -285,6 +286,10 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
285286
ProgressStatsPeriod = progressStatsPeriod;
286287
}
287288

289+
bool GetSupportsStreamTrailingResult() const {
290+
return SupportsStreamTrailingResult;
291+
}
292+
288293
TDuration GetProgressStatsPeriod() const {
289294
return ProgressStatsPeriod;
290295
}
@@ -317,6 +322,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
317322
const ::Ydb::Query::Syntax Syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED;
318323
TIntrusivePtr<TUserRequestContext> UserRequestContext;
319324
TDuration ProgressStatsPeriod;
325+
bool SupportsStreamTrailingResult = false;
320326
};
321327

322328
struct TEvDataQueryStreamPart: public TEventPB<TEvDataQueryStreamPart,

ydb/core/kqp/common/kqp_event_impl.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest(
2020
const ::Ydb::Operations::OperationParams* operationParams,
2121
bool keepSession,
2222
bool useCancelAfter,
23-
const ::Ydb::Query::Syntax syntax)
23+
const ::Ydb::Query::Syntax syntax,
24+
bool supportsStreamTrailingResult)
2425
: RequestCtx(ctx)
2526
, RequestActorId(requestActorId)
2627
, Database(CanonizePath(ctx->GetDatabaseName().GetOrElse("")))
@@ -36,6 +37,7 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest(
3637
, HasOperationParams(operationParams)
3738
, KeepSession(keepSession)
3839
, Syntax(syntax)
40+
, SupportsStreamTrailingResult(supportsStreamTrailingResult)
3941
{
4042
if (HasOperationParams) {
4143
OperationTimeout = GetDuration(operationParams->operation_timeout());

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

+5-40
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
130130
const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
131131
const bool enableOlapSink)
132132
: TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation,
133-
maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter"
133+
maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter", streamResult
134134
)
135135
, AsyncIoFactory(std::move(asyncIoFactory))
136-
, StreamResult(streamResult)
137136
, EnableOlapSink(enableOlapSink)
138137
{
139138
Target = creator;
@@ -347,7 +346,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
347346
hFunc(TEvPersQueue::TEvProposeTransactionResult, HandlePrepare);
348347
hFunc(TEvPrivate::TEvReattachToShard, HandleExecute);
349348
hFunc(TEvDqCompute::TEvState, HandlePrepare); // from CA
350-
hFunc(TEvDqCompute::TEvChannelData, HandleExecute); // from CA
349+
hFunc(TEvDqCompute::TEvChannelData, HandleChannelData); // from CA
350+
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
351351
hFunc(TEvPipeCache::TEvDeliveryProblem, HandlePrepare);
352352
hFunc(TEvKqp::TEvAbortExecution, HandlePrepare);
353353
hFunc(TEvents::TEvUndelivered, HandleUndelivered);
@@ -935,7 +935,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
935935
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
936936
hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandleExecute);
937937
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
938-
hFunc(TEvDqCompute::TEvChannelData, HandleExecute);
938+
hFunc(NYql::NDq::TEvDqCompute::TEvChannelData, HandleChannelData);
939+
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
939940
hFunc(TEvKqp::TEvAbortExecution, HandleExecute);
940941
IgnoreFunc(TEvInterconnect::TEvNodeConnected);
941942
default:
@@ -1286,41 +1287,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
12861287
}
12871288
}
12881289

1289-
void HandleExecute(TEvDqCompute::TEvChannelData::TPtr& ev) {
1290-
auto& record = ev->Get()->Record;
1291-
auto& channelData = record.GetChannelData();
1292-
1293-
TDqSerializedBatch batch;
1294-
batch.Proto = std::move(*record.MutableChannelData()->MutableData());
1295-
if (batch.Proto.HasPayloadId()) {
1296-
batch.Payload = ev->Get()->GetPayload(batch.Proto.GetPayloadId());
1297-
}
1298-
1299-
auto& channel = TasksGraph.GetChannel(channelData.GetChannelId());
1300-
YQL_ENSURE(channel.DstTask == 0);
1301-
auto shardId = TasksGraph.GetTask(channel.SrcTask).Meta.ShardId;
1302-
1303-
if (Stats) {
1304-
Stats->ResultBytes += batch.Size();
1305-
Stats->ResultRows += batch.RowCount();
1306-
}
1307-
1308-
LOG_T("Got result, channelId: " << channel.Id << ", shardId: " << shardId
1309-
<< ", inputIndex: " << channel.DstInputIndex << ", from: " << ev->Sender
1310-
<< ", finished: " << channelData.GetFinished());
1311-
1312-
ResponseEv->TakeResult(channel.DstInputIndex, std::move(batch));
1313-
{
1314-
LOG_T("Send ack to channelId: " << channel.Id << ", seqNo: " << record.GetSeqNo() << ", to: " << ev->Sender);
1315-
1316-
auto ackEv = MakeHolder<TEvDqCompute::TEvChannelDataAck>();
1317-
ackEv->Record.SetSeqNo(record.GetSeqNo());
1318-
ackEv->Record.SetChannelId(channel.Id);
1319-
ackEv->Record.SetFreeSpace(50_MB);
1320-
Send(ev->Sender, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channel.Id);
1321-
}
1322-
}
1323-
13241290
private:
13251291
bool IsReadOnlyTx() const {
13261292
if (Request.TopicOperations.HasOperations()) {
@@ -2417,7 +2383,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
24172383

24182384
private:
24192385
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
2420-
bool StreamResult = false;
24212386
bool EnableOlapSink = false;
24222387

24232388
bool HasExternalSources = false;

0 commit comments

Comments
 (0)