@@ -81,6 +81,10 @@ struct TShardRangesWithShardId {
81
81
const TShardKeyRanges* Ranges;
82
82
};
83
83
84
+ struct TStageScheduleInfo {
85
+ double StageCost = 0.0 ;
86
+ ui32 TaskCount = 0 ;
87
+ };
84
88
85
89
// Declaration only; the definition is not part of this view.
// Takes a ui64 amount `ru`, the database name, the user token and an
// NKikimrKqp::TRlPath, and returns a TActorId.
// NOTE(review): presumably `ru` is a number of request units reported to a
// rate limiter and the result identifies the reporting actor - confirm
// against the definition.
TActorId ReportToRl (ui64 ru, const TString& database, const TString& userToken,
86
90
const NKikimrKqp::TRlPath& path);
@@ -807,6 +811,40 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
807
811
}
808
812
}
809
813
814
+ std::map<ui32, TStageScheduleInfo> ScheduleByCost (const IKqpGateway::TPhysicalTxData& tx, const TVector<NKikimrKqp::TKqpNodeResources>& resourceSnapshot) {
815
+ std::map<ui32, TStageScheduleInfo> result;
816
+ if (!resourceSnapshot.empty ()) // can't schedule w/o node count
817
+ {
818
+ // collect costs and schedule stages with external sources only
819
+ double totalCost = 0.0 ;
820
+ for (ui32 stageIdx = 0 ; stageIdx < tx.Body ->StagesSize (); ++stageIdx) {
821
+ auto & stage = tx.Body ->GetStages (stageIdx);
822
+ if (stage.SourcesSize () > 0 && stage.GetSources (0 ).GetTypeCase () == NKqpProto::TKqpSource::kExternalSource ) {
823
+ if (stage.GetStageCost () > 0.0 && stage.GetTaskCount () == 0 ) {
824
+ totalCost += stage.GetStageCost ();
825
+ result.emplace (stageIdx, TStageScheduleInfo{.StageCost = stage.GetStageCost ()});
826
+ }
827
+ }
828
+ }
829
+ // assign task counts
830
+ if (!result.empty ()) {
831
+ // allow use 2/3 of threads in single stage
832
+ ui32 maxStageTaskCount = (TStagePredictor::GetUsableThreads () * 2 + 2 ) / 3 ;
833
+ // total limit per mode is x2
834
+ ui32 maxTotalTaskCount = maxStageTaskCount * 2 ;
835
+ for (auto & [_, stageInfo] : result) {
836
+ // schedule tasks evenly between nodes
837
+ stageInfo.TaskCount =
838
+ std::max<ui32>(
839
+ std::min (static_cast <ui32>(maxTotalTaskCount * stageInfo.StageCost / totalCost), maxStageTaskCount)
840
+ , 1
841
+ ) * resourceSnapshot.size ();
842
+ }
843
+ }
844
+ }
845
+ return result;
846
+ }
847
+
810
848
void BuildSysViewScanTasks (TStageInfo& stageInfo) {
811
849
Y_DEBUG_ABORT_UNLESS (stageInfo.Meta .IsSysView ());
812
850
@@ -912,7 +950,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
912
950
}
913
951
}
914
952
915
- void BuildReadTasksFromSource (TStageInfo& stageInfo, const TVector<NKikimrKqp::TKqpNodeResources>& resourceSnapshot) {
953
+ void BuildReadTasksFromSource (TStageInfo& stageInfo, const TVector<NKikimrKqp::TKqpNodeResources>& resourceSnapshot, ui32 scheduledTaskCount ) {
916
954
const auto & stage = stageInfo.Meta .GetStage (stageInfo.Id );
917
955
918
956
YQL_ENSURE (stage.GetSources (0 ).HasExternalSource ());
@@ -923,7 +961,16 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
923
961
924
962
ui32 taskCount = externalSource.GetPartitionedTaskParams ().size ();
925
963
926
- if (!resourceSnapshot.empty ()) {
964
+ auto taskCountHint = stage.GetTaskCount ();
965
+ if (taskCountHint == 0 ) {
966
+ taskCountHint = scheduledTaskCount;
967
+ }
968
+
969
+ if (taskCountHint) {
970
+ if (taskCount > taskCountHint) {
971
+ taskCount = taskCountHint;
972
+ }
973
+ } else if (!resourceSnapshot.empty ()) {
927
974
ui32 maxTaskcount = resourceSnapshot.size () * 2 ;
928
975
if (taskCount > maxTaskcount) {
929
976
taskCount = maxTaskcount;
@@ -1302,7 +1349,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1302
1349
}
1303
1350
1304
1351
if (isShuffle) {
1305
- partitionsCount = std::max (partitionsCount, GetMaxTasksAggregation (stageInfo, inputTasks, nodesCount));
1352
+ if (stage.GetTaskCount ()) {
1353
+ partitionsCount = stage.GetTaskCount ();
1354
+ } else {
1355
+ partitionsCount = std::max (partitionsCount, GetMaxTasksAggregation (stageInfo, inputTasks, nodesCount));
1356
+ }
1306
1357
}
1307
1358
1308
1359
for (ui32 i = 0 ; i < partitionsCount; ++i) {
0 commit comments