@@ -130,10 +130,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
130130 const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
131131 const bool enableOlapSink)
132132 : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation,
133- maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, " DataExecuter"
133+ maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, " DataExecuter" , streamResult
134134 )
135135 , AsyncIoFactory(std::move(asyncIoFactory))
136- , StreamResult(streamResult)
137136 , EnableOlapSink(enableOlapSink)
138137 {
139138 Target = creator;
@@ -347,7 +346,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
347346 hFunc (TEvPersQueue::TEvProposeTransactionResult, HandlePrepare);
348347 hFunc (TEvPrivate::TEvReattachToShard, HandleExecute);
349348 hFunc (TEvDqCompute::TEvState, HandlePrepare); // from CA
350- hFunc (TEvDqCompute::TEvChannelData, HandleExecute); // from CA
349+ hFunc (TEvDqCompute::TEvChannelData, HandleChannelData); // from CA
350+ hFunc (TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
351351 hFunc (TEvPipeCache::TEvDeliveryProblem, HandlePrepare);
352352 hFunc (TEvKqp::TEvAbortExecution, HandlePrepare);
353353 hFunc (TEvents::TEvUndelivered, HandleUndelivered);
@@ -935,7 +935,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
935935 hFunc (TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
936936 hFunc (TEvTxProxy::TEvProposeTransactionStatus, HandleExecute);
937937 hFunc (TEvDqCompute::TEvState, HandleComputeStats);
938- hFunc (TEvDqCompute::TEvChannelData, HandleExecute);
938+ hFunc (NYql::NDq::TEvDqCompute::TEvChannelData, HandleChannelData);
939+ hFunc (TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
939940 hFunc (TEvKqp::TEvAbortExecution, HandleExecute);
940941 IgnoreFunc (TEvInterconnect::TEvNodeConnected);
941942 default :
@@ -1286,41 +1287,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
12861287 }
12871288 }
12881289
1289- void HandleExecute (TEvDqCompute::TEvChannelData::TPtr& ev) {
1290- auto & record = ev->Get ()->Record ;
1291- auto & channelData = record.GetChannelData ();
1292-
1293- TDqSerializedBatch batch;
1294- batch.Proto = std::move (*record.MutableChannelData ()->MutableData ());
1295- if (batch.Proto .HasPayloadId ()) {
1296- batch.Payload = ev->Get ()->GetPayload (batch.Proto .GetPayloadId ());
1297- }
1298-
1299- auto & channel = TasksGraph.GetChannel (channelData.GetChannelId ());
1300- YQL_ENSURE (channel.DstTask == 0 );
1301- auto shardId = TasksGraph.GetTask (channel.SrcTask ).Meta .ShardId ;
1302-
1303- if (Stats) {
1304- Stats->ResultBytes += batch.Size ();
1305- Stats->ResultRows += batch.RowCount ();
1306- }
1307-
1308- LOG_T (" Got result, channelId: " << channel.Id << " , shardId: " << shardId
1309- << " , inputIndex: " << channel.DstInputIndex << " , from: " << ev->Sender
1310- << " , finished: " << channelData.GetFinished ());
1311-
1312- ResponseEv->TakeResult (channel.DstInputIndex , std::move (batch));
1313- {
1314- LOG_T (" Send ack to channelId: " << channel.Id << " , seqNo: " << record.GetSeqNo () << " , to: " << ev->Sender );
1315-
1316- auto ackEv = MakeHolder<TEvDqCompute::TEvChannelDataAck>();
1317- ackEv->Record .SetSeqNo (record.GetSeqNo ());
1318- ackEv->Record .SetChannelId (channel.Id );
1319- ackEv->Record .SetFreeSpace (50_MB);
1320- Send (ev->Sender , ackEv.Release (), /* TODO: undelivery */ 0 , /* cookie */ channel.Id );
1321- }
1322- }
1323-
13241290private:
13251291 bool IsReadOnlyTx () const {
13261292 if (Request.TopicOperations .HasOperations ()) {
@@ -2417,7 +2383,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
24172383
24182384private:
24192385 NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
2420- bool StreamResult = false ;
24212386 bool EnableOlapSink = false ;
24222387
24232388 bool HasExternalSources = false ;
0 commit comments