@@ -130,10 +130,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
130
130
const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
131
131
const bool enableOlapSink)
132
132
: TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation,
133
- maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, " DataExecuter"
133
+ maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, " DataExecuter" , streamResult
134
134
)
135
135
, AsyncIoFactory(std::move(asyncIoFactory))
136
- , StreamResult(streamResult)
137
136
, EnableOlapSink(enableOlapSink)
138
137
{
139
138
Target = creator;
@@ -347,7 +346,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
347
346
hFunc (TEvPersQueue::TEvProposeTransactionResult, HandlePrepare);
348
347
hFunc (TEvPrivate::TEvReattachToShard, HandleExecute);
349
348
hFunc (TEvDqCompute::TEvState, HandlePrepare); // from CA
350
- hFunc (TEvDqCompute::TEvChannelData, HandleExecute); // from CA
349
+ hFunc (TEvDqCompute::TEvChannelData, HandleChannelData); // from CA
350
+ hFunc (TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
351
351
hFunc (TEvPipeCache::TEvDeliveryProblem, HandlePrepare);
352
352
hFunc (TEvKqp::TEvAbortExecution, HandlePrepare);
353
353
hFunc (TEvents::TEvUndelivered, HandleUndelivered);
@@ -935,7 +935,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
935
935
hFunc (TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
936
936
hFunc (TEvTxProxy::TEvProposeTransactionStatus, HandleExecute);
937
937
hFunc (TEvDqCompute::TEvState, HandleComputeStats);
938
- hFunc (TEvDqCompute::TEvChannelData, HandleExecute);
938
+ hFunc (NYql::NDq::TEvDqCompute::TEvChannelData, HandleChannelData);
939
+ hFunc (TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
939
940
hFunc (TEvKqp::TEvAbortExecution, HandleExecute);
940
941
IgnoreFunc (TEvInterconnect::TEvNodeConnected);
941
942
default :
@@ -1286,41 +1287,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
1286
1287
}
1287
1288
}
1288
1289
1289
- void HandleExecute (TEvDqCompute::TEvChannelData::TPtr& ev) {
1290
- auto & record = ev->Get ()->Record ;
1291
- auto & channelData = record.GetChannelData ();
1292
-
1293
- TDqSerializedBatch batch;
1294
- batch.Proto = std::move (*record.MutableChannelData ()->MutableData ());
1295
- if (batch.Proto .HasPayloadId ()) {
1296
- batch.Payload = ev->Get ()->GetPayload (batch.Proto .GetPayloadId ());
1297
- }
1298
-
1299
- auto & channel = TasksGraph.GetChannel (channelData.GetChannelId ());
1300
- YQL_ENSURE (channel.DstTask == 0 );
1301
- auto shardId = TasksGraph.GetTask (channel.SrcTask ).Meta .ShardId ;
1302
-
1303
- if (Stats) {
1304
- Stats->ResultBytes += batch.Size ();
1305
- Stats->ResultRows += batch.RowCount ();
1306
- }
1307
-
1308
- LOG_T (" Got result, channelId: " << channel.Id << " , shardId: " << shardId
1309
- << " , inputIndex: " << channel.DstInputIndex << " , from: " << ev->Sender
1310
- << " , finished: " << channelData.GetFinished ());
1311
-
1312
- ResponseEv->TakeResult (channel.DstInputIndex , std::move (batch));
1313
- {
1314
- LOG_T (" Send ack to channelId: " << channel.Id << " , seqNo: " << record.GetSeqNo () << " , to: " << ev->Sender );
1315
-
1316
- auto ackEv = MakeHolder<TEvDqCompute::TEvChannelDataAck>();
1317
- ackEv->Record .SetSeqNo (record.GetSeqNo ());
1318
- ackEv->Record .SetChannelId (channel.Id );
1319
- ackEv->Record .SetFreeSpace (50_MB);
1320
- Send (ev->Sender , ackEv.Release (), /* TODO: undelivery */ 0 , /* cookie */ channel.Id );
1321
- }
1322
- }
1323
-
1324
1290
private:
1325
1291
bool IsReadOnlyTx () const {
1326
1292
if (Request.TopicOperations .HasOperations ()) {
@@ -2417,7 +2383,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
2417
2383
2418
2384
private:
2419
2385
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
2420
- bool StreamResult = false ;
2421
2386
bool EnableOlapSink = false ;
2422
2387
2423
2388
bool HasExternalSources = false ;
0 commit comments