20
20
import java .util .Arrays ;
21
21
import java .util .Collections ;
22
22
import java .util .Date ;
23
+ import java .util .HashMap ;
23
24
import java .util .List ;
25
+ import java .util .Map ;
24
26
25
27
import org .apache .commons .logging .Log ;
26
28
import org .apache .commons .logging .LogFactory ;
29
31
import org .springframework .batch .core .Job ;
30
32
import org .springframework .batch .core .JobExecution ;
31
33
import org .springframework .batch .core .JobExecutionException ;
34
+ import org .springframework .batch .core .JobParameter ;
32
35
import org .springframework .batch .core .JobParameters ;
33
36
import org .springframework .batch .core .JobParametersBuilder ;
37
+ import org .springframework .batch .core .JobParametersIncrementer ;
34
38
import org .springframework .batch .core .JobParametersInvalidException ;
35
39
import org .springframework .batch .core .explore .JobExplorer ;
36
40
import org .springframework .batch .core .launch .JobLauncher ;
37
41
import org .springframework .batch .core .repository .JobExecutionAlreadyRunningException ;
38
42
import org .springframework .batch .core .repository .JobInstanceAlreadyCompleteException ;
43
+ import org .springframework .batch .core .repository .JobRepository ;
39
44
import org .springframework .batch .core .repository .JobRestartException ;
40
45
import org .springframework .batch .repeat .RepeatCallback ;
41
46
import org .springframework .batch .repeat .RepeatContext ;
@@ -71,6 +76,8 @@ public class TaskJobLauncherCommandLineRunner extends JobLauncherCommandLineRunn
71
76
72
77
private JobExplorer taskJobExplorer ;
73
78
79
+ private JobRepository taskJobRepository ;
80
+
74
81
private static final Log logger = LogFactory
75
82
.getLog (TaskJobLauncherCommandLineRunner .class );
76
83
@@ -80,11 +87,20 @@ public class TaskJobLauncherCommandLineRunner extends JobLauncherCommandLineRunn
80
87
81
88
private TaskBatchProperties taskBatchProperties ;
82
89
83
- public TaskJobLauncherCommandLineRunner (JobLauncher jobLauncher ,
84
- JobExplorer jobExplorer , TaskBatchProperties taskBatchProperties ) {
85
- super (jobLauncher , jobExplorer );
90
+ /**
91
+ * Create a new {@link TaskJobLauncherCommandLineRunner}.
92
+ * @param jobLauncher to launch jobs
93
+ * @param jobExplorer to check the job repository for previous executions
94
+ * @param jobRepository to check if a job instance exists with the given parameters
95
+ * when running a job
96
+ * @param taskBatchProperties the properties used to configure the taskBatchProperties.
97
+ */
98
+ public TaskJobLauncherCommandLineRunner (JobLauncher jobLauncher , JobExplorer jobExplorer ,
99
+ JobRepository jobRepository , TaskBatchProperties taskBatchProperties ) {
100
+ super (jobLauncher , jobExplorer , jobRepository );
86
101
this .taskJobLauncher = jobLauncher ;
87
102
this .taskJobExplorer = jobExplorer ;
103
+ this .taskJobRepository = jobRepository ;
88
104
this .taskBatchProperties = taskBatchProperties ;
89
105
}
90
106
@@ -103,9 +119,39 @@ public void run(String... args) throws JobExecutionException {
103
119
protected void execute (Job job , JobParameters jobParameters )
104
120
throws JobExecutionAlreadyRunningException , JobRestartException ,
105
121
JobInstanceAlreadyCompleteException , JobParametersInvalidException {
106
- JobParameters nextParameters = new JobParametersBuilder (jobParameters ,
107
- this .taskJobExplorer ).getNextJobParameters (job ).toJobParameters ();
108
- JobExecution execution = this .taskJobLauncher .run (job , nextParameters );
122
+ String jobName = job .getName ();
123
+ JobParameters parameters = jobParameters ;
124
+ boolean jobInstanceExists = this .taskJobRepository .isJobInstanceExists (jobName ,
125
+ parameters );
126
+ if (jobInstanceExists ) {
127
+ JobExecution lastJobExecution = this .taskJobRepository
128
+ .getLastJobExecution (jobName , jobParameters );
129
+ if (lastJobExecution != null && isStoppedOrFailed (lastJobExecution )
130
+ && job .isRestartable ()) {
131
+ // Retry a failed or stopped execution with previous parameters
132
+ JobParameters previousParameters = lastJobExecution .getJobParameters ();
133
+ /*
134
+ * remove Non-identifying parameters from the previous execution's
135
+ * parameters since there is no way to remove them programmatically. If
136
+ * they are required (or need to be modified) on a restart, they need to
137
+ * be (re)specified.
138
+ */
139
+ JobParameters previousIdentifyingParameters = removeNonIdentifying (
140
+ previousParameters );
141
+ // merge additional parameters with previous ones (overriding those with
142
+ // the same key)
143
+ parameters = merge (previousIdentifyingParameters , jobParameters );
144
+ }
145
+ }
146
+ else {
147
+ JobParametersIncrementer incrementer = job .getJobParametersIncrementer ();
148
+ if (incrementer != null ) {
149
+ JobParameters nextParameters = new JobParametersBuilder (jobParameters ,
150
+ this .taskJobExplorer ).getNextJobParameters (job ).toJobParameters ();
151
+ parameters = merge (nextParameters , jobParameters );
152
+ }
153
+ }
154
+ JobExecution execution = this .taskJobLauncher .run (job , parameters );
109
155
if (this .taskApplicationEventPublisher != null ) {
110
156
this .taskApplicationEventPublisher .publishEvent (new JobExecutionEvent (execution ));
111
157
}
@@ -164,5 +210,24 @@ public void throwJobFailedException(List<JobExecution> failedJobExecutions) {
164
210
throw new TaskException (message );
165
211
166
212
}
167
-
213
+ private JobParameters removeNonIdentifying (JobParameters parameters ) {
214
+ Map <String , JobParameter > parameterMap = parameters .getParameters ();
215
+ HashMap <String , JobParameter > copy = new HashMap <>(parameterMap );
216
+ for (Map .Entry <String , JobParameter > parameter : copy .entrySet ()) {
217
+ if (!parameter .getValue ().isIdentifying ()) {
218
+ parameterMap .remove (parameter .getKey ());
219
+ }
220
+ }
221
+ return new JobParameters (parameterMap );
222
+ }
223
+ private boolean isStoppedOrFailed (JobExecution execution ) {
224
+ BatchStatus status = execution .getStatus ();
225
+ return (status == BatchStatus .STOPPED || status == BatchStatus .FAILED );
226
+ }
227
+ private JobParameters merge (JobParameters parameters , JobParameters additionals ) {
228
+ Map <String , JobParameter > merged = new HashMap <>();
229
+ merged .putAll (parameters .getParameters ());
230
+ merged .putAll (additionals .getParameters ());
231
+ return new JobParameters (merged );
232
+ }
168
233
}
0 commit comments