Skip to content

Commit ded3818

Browse files
authored
Merge bb1736b into 27e9095
2 parents 27e9095 + bb1736b commit ded3818

File tree

10 files changed

+205
-86
lines changed

10 files changed

+205
-86
lines changed

ydb/core/grpc_services/query/rpc_execute_query.cpp

+22-4
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ using TEvExecuteQueryRequest = TGrpcRequestNoOperationCall<Ydb::Query::ExecuteQu
2424
struct TProducerState {
2525
TMaybe<ui64> LastSeqNo;
2626
ui64 AckedFreeSpaceBytes = 0;
27+
TActorId ActorId;
2728
};
2829

2930
class TRpcFlowControlState {
@@ -335,8 +336,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
335336
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
336337
resp->Record.SetSeqNo(*producer.LastSeqNo);
337338
resp->Record.SetFreeSpace(freeSpaceBytes);
339+
resp->Record.SetQueryResultIndex(producerId);
338340

339-
ctx.Send(producerId, resp.Release());
341+
ctx.Send(producer.ActorId, resp.Release());
340342

341343
producer.AckedFreeSpaceBytes = freeSpaceBytes;
342344
}
@@ -347,6 +349,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
347349
void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) {
348350
Ydb::Query::ExecuteQueryResponsePart response;
349351
response.set_status(Ydb::StatusIds::SUCCESS);
352+
ui64 queryResultIndex = ev->Get()->Record.GetQueryResultIndex();
350353
response.set_result_set_index(ev->Get()->Record.GetQueryResultIndex());
351354
response.mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet());
352355

@@ -358,7 +361,8 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
358361

359362
Request_->SendSerializedResult(std::move(out), Ydb::StatusIds::SUCCESS);
360363

361-
auto& producer = StreamProducers_[ev->Sender];
364+
auto& producer = StreamProducers_[queryResultIndex];
365+
producer.ActorId = ev->Sender;
362366
producer.LastSeqNo = ev->Get()->Record.GetSeqNo();
363367
producer.AckedFreeSpaceBytes = freeSpaceBytes;
364368

@@ -371,6 +375,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
371375
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
372376
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
373377
resp->Record.SetFreeSpace(freeSpaceBytes);
378+
resp->Record.SetQueryResultIndex(queryResultIndex);
374379

375380
ctx.Send(ev->Sender, resp.Release());
376381
}
@@ -381,14 +386,27 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
381386
const auto& issueMessage = record.GetResponse().GetQueryIssues();
382387

383388
bool hasTrailingMessage = false;
384-
389+
390+
auto& kqpResponse = record.GetResponse();
391+
if (kqpResponse.GetYdbResults().size() > 1) {
392+
auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
393+
"Unexpected trailing message with multiple result sets.");
394+
ReplyFinishStream(Ydb::StatusIds::INTERNAL_ERROR, issue);
395+
return;
396+
}
397+
385398
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
386399
Request_->SetRuHeader(record.GetConsumedRu());
387400

388401
auto& kqpResponse = record.GetResponse();
389402

390403
Ydb::Query::ExecuteQueryResponsePart response;
391404

405+
for(int i = 0; i < kqpResponse.GetYdbResults().size(); i++) {
406+
response.set_result_set_index(i);
407+
response.mutable_result_set()->Swap(record.MutableResponse()->MutableYdbResults(i));
408+
}
409+
392410
AuditContextAppend(Request_.get(), *Request_->GetProtoRequest(), response);
393411

394412
if (kqpResponse.HasTxMeta()) {
@@ -493,7 +511,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
493511
std::shared_ptr<TEvExecuteQueryRequest> Request_;
494512

495513
TRpcFlowControlState FlowControl_;
496-
TMap<TActorId, TProducerState> StreamProducers_;
514+
TMap<ui64, TProducerState> StreamProducers_;
497515
};
498516

499517
} // namespace

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

+5-43
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
130130
const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
131131
const bool enableOlapSink)
132132
: TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation,
133-
maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter"
133+
maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter", streamResult
134134
)
135135
, AsyncIoFactory(std::move(asyncIoFactory))
136-
, StreamResult(streamResult)
137136
, EnableOlapSink(enableOlapSink)
138137
{
139138
Target = creator;
@@ -347,7 +346,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
347346
hFunc(TEvPersQueue::TEvProposeTransactionResult, HandlePrepare);
348347
hFunc(TEvPrivate::TEvReattachToShard, HandleExecute);
349348
hFunc(TEvDqCompute::TEvState, HandlePrepare); // from CA
350-
hFunc(TEvDqCompute::TEvChannelData, HandleExecute); // from CA
349+
hFunc(TEvDqCompute::TEvChannelData, HandleChannelData); // from CA
350+
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
351351
hFunc(TEvPipeCache::TEvDeliveryProblem, HandlePrepare);
352352
hFunc(TEvKqp::TEvAbortExecution, HandlePrepare);
353353
hFunc(TEvents::TEvUndelivered, HandleUndelivered);
@@ -935,7 +935,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
935935
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
936936
hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandleExecute);
937937
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
938-
hFunc(TEvDqCompute::TEvChannelData, HandleExecute);
938+
hFunc(NYql::NDq::TEvDqCompute::TEvChannelData, HandleChannelData);
939+
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
939940
hFunc(TEvKqp::TEvAbortExecution, HandleExecute);
940941
IgnoreFunc(TEvInterconnect::TEvNodeConnected);
941942
default:
@@ -1286,41 +1287,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
12861287
}
12871288
}
12881289

1289-
void HandleExecute(TEvDqCompute::TEvChannelData::TPtr& ev) {
1290-
auto& record = ev->Get()->Record;
1291-
auto& channelData = record.GetChannelData();
1292-
1293-
TDqSerializedBatch batch;
1294-
batch.Proto = std::move(*record.MutableChannelData()->MutableData());
1295-
if (batch.Proto.HasPayloadId()) {
1296-
batch.Payload = ev->Get()->GetPayload(batch.Proto.GetPayloadId());
1297-
}
1298-
1299-
auto& channel = TasksGraph.GetChannel(channelData.GetChannelId());
1300-
YQL_ENSURE(channel.DstTask == 0);
1301-
auto shardId = TasksGraph.GetTask(channel.SrcTask).Meta.ShardId;
1302-
1303-
if (Stats) {
1304-
Stats->ResultBytes += batch.Size();
1305-
Stats->ResultRows += batch.RowCount();
1306-
}
1307-
1308-
LOG_T("Got result, channelId: " << channel.Id << ", shardId: " << shardId
1309-
<< ", inputIndex: " << channel.DstInputIndex << ", from: " << ev->Sender
1310-
<< ", finished: " << channelData.GetFinished());
1311-
1312-
ResponseEv->TakeResult(channel.DstInputIndex, std::move(batch));
1313-
{
1314-
LOG_T("Send ack to channelId: " << channel.Id << ", seqNo: " << record.GetSeqNo() << ", to: " << ev->Sender);
1315-
1316-
auto ackEv = MakeHolder<TEvDqCompute::TEvChannelDataAck>();
1317-
ackEv->Record.SetSeqNo(record.GetSeqNo());
1318-
ackEv->Record.SetChannelId(channel.Id);
1319-
ackEv->Record.SetFreeSpace(50_MB);
1320-
Send(ev->Sender, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channel.Id);
1321-
}
1322-
}
1323-
13241290
private:
13251291
bool IsReadOnlyTx() const {
13261292
if (Request.TopicOperations.HasOperations()) {
@@ -1770,10 +1736,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
17701736
THashMap<ui64, TVector<ui64>> remoteComputeTasks; // shardId -> [task]
17711737
TVector<ui64> computeTasks;
17721738

1773-
if (StreamResult) {
1774-
InitializeChannelProxies();
1775-
}
1776-
17771739
for (auto& task : TasksGraph.GetTasks()) {
17781740
auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
17791741
if (task.Meta.ShardId && (task.Meta.Reads || task.Meta.Writes)) {

ydb/core/kqp/executer_actor/kqp_executer_impl.h

+130-11
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
#include <ydb/core/kqp/common/kqp_ru_calc.h>
1313
#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
14+
#include <ydb/core/kqp/runtime/kqp_transport.h>
1415

1516
#include <ydb/core/actorlib_impl/long_timer.h>
1617
#include <ydb/core/base/appdata.h>
@@ -34,6 +35,7 @@
3435

3536
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
3637
#include <ydb/library/yql/dq/runtime/dq_transport.h>
38+
#include <ydb/library/yql/dq/common/dq_serialized_batch.h>
3739
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
3840
#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
3941
#include <ydb/library/yql/public/issue/yql_issue.h>
@@ -45,6 +47,7 @@
4547
#include <ydb/library/actors/core/hfunc.h>
4648
#include <ydb/library/actors/core/log.h>
4749

50+
4851
#include <util/generic/size_literals.h>
4952

5053

@@ -119,7 +122,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
119122
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
120123
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
121124
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
122-
ui64 spanVerbosity = 0, TString spanName = "KqpExecuterBase")
125+
ui64 spanVerbosity = 0, TString spanName = "KqpExecuterBase", bool streamResult = false)
123126
: Request(std::move(request))
124127
, Database(database)
125128
, UserToken(userToken)
@@ -130,6 +133,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
130133
, MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime)
131134
, AggregationSettings(aggregation)
132135
, HasOlapTable(false)
136+
, StreamResult(streamResult)
133137
{
134138
TasksGraph.GetMeta().Snapshot = IKqpGateway::TKqpSnapshot(Request.Snapshot.Step, Request.Snapshot.TxId);
135139
TasksGraph.GetMeta().Arena = MakeIntrusive<NActors::TProtoArenaHolder>();
@@ -234,6 +238,128 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
234238
return true;
235239
}
236240

241+
struct TEvComputeChannelDataOOB {
242+
NYql::NDqProto::TEvComputeChannelData Proto;
243+
TRope Payload;
244+
245+
size_t Size() const {
246+
return Proto.GetChannelData().GetData().GetRaw().size() + Payload.size();
247+
}
248+
249+
ui32 RowCount() const {
250+
return Proto.GetChannelData().GetData().GetRows();
251+
}
252+
};
253+
254+
void HandleChannelData(NYql::NDq::TEvDqCompute::TEvChannelData::TPtr& ev) {
255+
auto& record = ev->Get()->Record;
256+
auto& channelData = record.GetChannelData();
257+
auto& channel = TasksGraph.GetChannel(channelData.GetChannelId());
258+
auto& task = TasksGraph.GetTask(channel.SrcTask);
259+
const TActorId channelComputeActorId = ev->Sender;
260+
auto [it, _] = ResultChannelToComputeActor.emplace(channel.DstInputIndex, std::make_pair(ev->Sender, channel.Id));
261+
262+
YQL_ENSURE(it->second.first == channelComputeActorId);
263+
264+
auto& txResult = ResponseEv->TxResults[channel.DstInputIndex];
265+
if (StreamResult && txResult.IsStream && txResult.QueryResultIndex.Defined()) {
266+
267+
TEvComputeChannelDataOOB computeData;
268+
computeData.Proto = std::move(ev->Get()->Record);
269+
if (computeData.Proto.GetChannelData().GetData().HasPayloadId()) {
270+
computeData.Payload = ev->Get()->GetPayload(computeData.Proto.GetChannelData().GetData().GetPayloadId());
271+
}
272+
273+
const bool trailingResults = (
274+
computeData.Proto.GetChannelData().GetFinished() &&
275+
(ResponseEv->TxResults.size() == 1) &&
276+
Request.IsTrailingResultsAllowed());
277+
278+
TVector<NYql::NDq::TDqSerializedBatch> batches(1);
279+
auto& batch = batches.front();
280+
281+
batch.Proto = std::move(*computeData.Proto.MutableChannelData()->MutableData());
282+
batch.Payload = std::move(computeData.Payload);
283+
284+
TKqpProtoBuilder protoBuilder{*AppData()->FunctionRegistry};
285+
auto resultSet = protoBuilder.BuildYdbResultSet(std::move(batches), txResult.MkqlItemType, txResult.ColumnOrder);
286+
287+
if (!trailingResults) {
288+
auto streamEv = MakeHolder<TEvKqpExecuter::TEvStreamData>();
289+
streamEv->Record.SetSeqNo(computeData.Proto.GetSeqNo());
290+
streamEv->Record.SetQueryResultIndex(*txResult.QueryResultIndex);
291+
streamEv->Record.MutableResultSet()->Swap(&resultSet);
292+
293+
LOG_E("Send TEvStreamData to " << Target << ", seqNo: " << streamEv->Record.GetSeqNo()
294+
<< ", nRows: " << streamEv->Record.GetResultSet().rows().size());
295+
296+
this->Send(Target, streamEv.Release());
297+
298+
} else {
299+
auto ackEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelDataAck>();
300+
ackEv->Record.SetSeqNo(computeData.Proto.GetSeqNo());
301+
ackEv->Record.SetChannelId(channel.Id);
302+
ackEv->Record.SetFreeSpace(50_MB);
303+
this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channel.Id);
304+
txResult.TrailingResult.Swap(&resultSet);
305+
LOG_E("staging TEvStreamData to " << Target << ", seqNo: " << computeData.Proto.GetSeqNo()
306+
<< ", nRows: " << txResult.TrailingResult.rows().size());
307+
}
308+
309+
return;
310+
}
311+
312+
NYql::NDq::TDqSerializedBatch batch;
313+
batch.Proto = std::move(*record.MutableChannelData()->MutableData());
314+
if (batch.Proto.HasPayloadId()) {
315+
batch.Payload = ev->Get()->GetPayload(batch.Proto.GetPayloadId());
316+
}
317+
318+
YQL_ENSURE(channel.DstTask == 0);
319+
320+
if (Stats) {
321+
Stats->ResultBytes += batch.Size();
322+
Stats->ResultRows += batch.RowCount();
323+
}
324+
325+
LOG_T("Got result, channelId: " << channel.Id << ", shardId: " << task.Meta.ShardId
326+
<< ", inputIndex: " << channel.DstInputIndex << ", from: " << ev->Sender
327+
<< ", finished: " << channelData.GetFinished());
328+
329+
ResponseEv->TakeResult(channel.DstInputIndex, std::move(batch));
330+
LOG_T("Send ack to channelId: " << channel.Id << ", seqNo: " << record.GetSeqNo() << ", to: " << ev->Sender);
331+
332+
auto ackEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelDataAck>();
333+
ackEv->Record.SetSeqNo(record.GetSeqNo());
334+
ackEv->Record.SetChannelId(channel.Id);
335+
ackEv->Record.SetFreeSpace(50_MB);
336+
this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channel.Id);
337+
}
338+
339+
void HandleStreamAck(TEvKqpExecuter::TEvStreamDataAck::TPtr& ev) {
340+
ui64 queryResultIndex = ev->Get()->Record.GetQueryResultIndex();
341+
auto it = ResultChannelToComputeActor.find(queryResultIndex);
342+
YQL_ENSURE(it != ResultChannelToComputeActor.end());
343+
const auto [channelComputeActorId, channelId] = it->second;
344+
345+
ui64 seqNo = ev->Get()->Record.GetSeqNo();
346+
i64 freeSpace = ev->Get()->Record.GetFreeSpace();
347+
348+
LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId
349+
<< ", send ack to channelId: " << channelId
350+
<< ", seqNo: " << seqNo
351+
<< ", enough: " << ev->Get()->Record.GetEnough()
352+
<< ", freeSpace: " << freeSpace
353+
<< ", to: " << channelComputeActorId);
354+
355+
auto ackEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelDataAck>();
356+
ackEv->Record.SetSeqNo(seqNo);
357+
ackEv->Record.SetChannelId(channelId);
358+
ackEv->Record.SetFreeSpace(freeSpace);
359+
ackEv->Record.SetFinish(ev->Get()->Record.GetEnough());
360+
this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channelId);
361+
}
362+
237363
void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
238364
TActorId computeActor = ev->Sender;
239365
auto& state = ev->Get()->Record;
@@ -1594,16 +1720,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
15941720
return true;
15951721
}
15961722

1597-
void InitializeChannelProxies() {
1598-
for(const auto& channel: TasksGraph.GetChannels()) {
1599-
if (channel.DstTask) {
1600-
continue;
1601-
}
1602-
1603-
CreateChannelProxy(channel);
1604-
}
1605-
}
1606-
16071723
const IKqpGateway::TKqpSnapshot& GetSnapshot() const {
16081724
return TasksGraph.GetMeta().Snapshot;
16091725
}
@@ -1753,8 +1869,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17531869
const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings;
17541870
TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
17551871
bool HasOlapTable = false;
1872+
bool StreamResult = false;
17561873
bool HasDatashardSourceScan = false;
17571874
bool UnknownAffectedShardCount = false;
1875+
1876+
THashMap<ui64, std::pair<TActorId, ui64>> ResultChannelToComputeActor;
17581877
THashMap<NYql::NDq::TStageId, THashMap<ui64, TShardInfo>> SourceScanStageIdToParititions;
17591878

17601879
private:

0 commit comments

Comments
 (0)