Skip to content

Commit a85f8a9

Browse files
committed
use topology sort
1 parent 858ad5e commit a85f8a9

File tree

1 file changed

+51
-10
lines changed
  • dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph

1 file changed

+51
-10
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowGraph.java

+51-10
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,16 @@
1919

2020
import static com.google.common.base.Preconditions.checkNotNull;
2121

22-
import org.apache.dolphinscheduler.common.graph.DAG;
2322
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
2423
import org.apache.dolphinscheduler.dao.entity.WorkflowTaskRelation;
2524

25+
import java.util.ArrayDeque;
2626
import java.util.ArrayList;
2727
import java.util.HashMap;
2828
import java.util.HashSet;
2929
import java.util.List;
3030
import java.util.Map;
31+
import java.util.Queue;
3132
import java.util.Set;
3233
import java.util.function.Function;
3334
import java.util.stream.Collectors;
@@ -60,18 +61,58 @@ public WorkflowGraph(List<WorkflowTaskRelation> workflowTaskRelations, List<Task
6061
}
6162

6263
private void checkIfDAG(List<WorkflowTaskRelation> workflowTaskRelations, List<TaskDefinition> taskDefinitions) {
63-
DAG<Long, TaskDefinition, WorkflowTaskRelation> graph = new DAG<>();
64-
// Fill the vertices
64+
// If topology-sort-result`s size less than taskDefinitions`s size, then not a DAG
65+
Map<Long, List<Long>> preTaskCodeMap = workflowTaskRelations
66+
.stream()
67+
.filter(relation -> relation.getPreTaskCode() != 0)
68+
.collect(Collectors.groupingBy(WorkflowTaskRelation::getPostTaskCode,
69+
Collectors.mapping(WorkflowTaskRelation::getPreTaskCode, Collectors.toList())));
70+
Map<Long, List<Long>> postTaskCodeMap = workflowTaskRelations
71+
.stream()
72+
.collect(Collectors.groupingBy(WorkflowTaskRelation::getPreTaskCode,
73+
Collectors.mapping(WorkflowTaskRelation::getPostTaskCode, Collectors.toList())));
74+
75+
// build in-degree count
76+
Map<Long, Integer> inDegreeCount = new HashMap<>();
6577
for (TaskDefinition taskDefinition : taskDefinitions) {
66-
graph.addNode(taskDefinition.getCode(), taskDefinition);
78+
List<Long> preTasks = preTaskCodeMap.get(taskDefinition.getCode());
79+
if (preTasks == null) {
80+
inDegreeCount.put(taskDefinition.getCode(), 0);
81+
} else {
82+
inDegreeCount.put(taskDefinition.getCode(), preTasks.size());
83+
}
84+
}
85+
86+
// Adds the task with zero-in-degree to the queue
87+
Set<Long> visitTable = new HashSet<>();
88+
Queue<Long> queue = new ArrayDeque<>();
89+
for (Map.Entry<Long, Integer> entry : inDegreeCount.entrySet()) {
90+
if (entry.getValue() == 0 && visitTable.add(entry.getKey())) {
91+
queue.offer(entry.getKey());
92+
}
6793
}
68-
// Fill edge relations
69-
for (WorkflowTaskRelation relation : workflowTaskRelations) {
70-
long preTaskCode = relation.getPreTaskCode();
71-
// When exist a ring cycle, then not a DAG.
72-
if (preTaskCode != 0 && !graph.addEdge(preTaskCode, relation.getPostTaskCode())) {
73-
throw new IllegalArgumentException("The workflow graph is not a DAG");
94+
95+
// topology sort
96+
Set<Long> resultTable = new HashSet<>();
97+
while (!queue.isEmpty()) {
98+
Long taskCode = queue.poll();
99+
resultTable.add(taskCode);
100+
101+
List<Long> postCodes = postTaskCodeMap.get(taskCode);
102+
if (postCodes == null) {
103+
continue;
74104
}
105+
for (Long postCode : postCodes) {
106+
inDegreeCount.put(postCode, inDegreeCount.get(postCode) - 1);
107+
108+
if (inDegreeCount.get(postCode) == 0) {
109+
queue.offer(postCode);
110+
}
111+
}
112+
}
113+
114+
if (resultTable.size() < taskDefinitions.size()) {
115+
throw new IllegalArgumentException("The workflow task relation is not a DAG");
75116
}
76117
}
77118

0 commit comments

Comments
 (0)