Skip to content

Commit e1c8485

Browse files
authored
Migrate CTR to Boot 3.x and Batch 5.x (#5839)
* Migrate CTR to Boot 3.3 and Batch 5 * Remove the Batch Configurer and replace with a Configuration * Update Tests so that they will work with Boot3 * Removed EnableBatchAutoConfiguration no longer needed if using BatchAutoConfiguration * Removed schema requirements * Re-enable CTR Module build in main pom.xml * Add ability for composed task runner to use the proper JobRepository and TaskExecutor * BeanPostProcessor has been added so that CTR can use its jobRepository vs. the one provided by BatchAutoConfiguration
1 parent ff19e23 commit e1c8485

19 files changed

+219
-377
lines changed

pom.xml

+1-2
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,7 @@
7676
<module>spring-cloud-dataflow-server</module>
7777
<module>spring-cloud-dataflow-tasklauncher</module>
7878
<module>spring-cloud-dataflow-single-step-batch-job</module>
79-
<!-- TODO: Boot3x followup -->
80-
<!-- <module>spring-cloud-dataflow-composed-task-runner</module>-->
79+
<module>spring-cloud-dataflow-composed-task-runner</module>
8180
<module>spring-cloud-dataflow-test</module>
8281
<module>spring-cloud-dataflow-dependencies</module>
8382
<module>spring-cloud-dataflow-classic-docs</module>

spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedBatchConfigurer.java

-88
This file was deleted.

spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedRunnerJobFactory.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,11 +27,12 @@
2727
import org.springframework.batch.core.JobParametersBuilder;
2828
import org.springframework.batch.core.JobParametersIncrementer;
2929
import org.springframework.batch.core.Step;
30-
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
3130
import org.springframework.batch.core.job.builder.FlowBuilder;
3231
import org.springframework.batch.core.job.builder.FlowJobBuilder;
32+
import org.springframework.batch.core.job.builder.JobBuilder;
3333
import org.springframework.batch.core.job.flow.Flow;
3434
import org.springframework.batch.core.launch.support.RunIdIncrementer;
35+
import org.springframework.batch.core.repository.JobRepository;
3536
import org.springframework.beans.factory.FactoryBean;
3637
import org.springframework.beans.factory.annotation.Autowired;
3738
import org.springframework.cloud.dataflow.composedtaskrunner.properties.ComposedTaskProperties;
@@ -65,7 +66,7 @@ public class ComposedRunnerJobFactory implements FactoryBean<Job> {
6566
private TaskExecutor taskExecutor;
6667

6768
@Autowired
68-
private JobBuilderFactory jobBuilderFactory;
69+
private JobRepository jobRepository;
6970

7071
@Autowired
7172
private TaskNameResolver taskNameResolver;
@@ -105,9 +106,8 @@ public Job getObject() throws Exception {
105106
taskParser.parse().accept(composedRunnerVisitor);
106107

107108
this.visitorDeque = composedRunnerVisitor.getFlow();
108-
109-
FlowJobBuilder builder = this.jobBuilderFactory
110-
.get(this.taskNameResolver.getTaskName())
109+
JobBuilder jobBuilder = new JobBuilder(this.taskNameResolver.getTaskName(), jobRepository);
110+
FlowJobBuilder builder = jobBuilder
111111
.start(this.flowBuilder
112112
.start(createFlow())
113113
.end())
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2023 the original author or authors.
2+
* Copyright 2017-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,103 +17,42 @@
1717
package org.springframework.cloud.dataflow.composedtaskrunner;
1818

1919
import javax.sql.DataSource;
20-
import java.util.HashMap;
21-
import java.util.Map;
22-
import java.util.Set;
23-
24-
import org.slf4j.Logger;
25-
import org.slf4j.LoggerFactory;
2620

2721
import org.springframework.batch.core.StepExecutionListener;
28-
import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
29-
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
30-
import org.springframework.boot.autoconfigure.batch.BatchProperties;
31-
import org.springframework.boot.autoconfigure.transaction.TransactionManagerCustomizers;
22+
import org.springframework.batch.core.repository.JobRepository;
23+
import org.springframework.beans.factory.config.BeanPostProcessor;
3224
import org.springframework.boot.context.properties.EnableConfigurationProperties;
3325
import org.springframework.cloud.dataflow.composedtaskrunner.properties.ComposedTaskProperties;
34-
import org.springframework.cloud.dataflow.core.database.support.MultiSchemaTaskExecutionDaoFactoryBean;
35-
import org.springframework.cloud.dataflow.core.dsl.TaskParser;
3626
import org.springframework.cloud.task.configuration.EnableTask;
3727
import org.springframework.cloud.task.listener.TaskExecutionListener;
3828
import org.springframework.cloud.task.repository.TaskExplorer;
39-
import org.springframework.cloud.task.repository.support.SimpleTaskExplorer;
40-
import org.springframework.cloud.task.repository.support.TaskExecutionDaoFactoryBean;
4129
import org.springframework.context.annotation.Bean;
4230
import org.springframework.context.annotation.Configuration;
4331
import org.springframework.context.annotation.Import;
44-
import org.springframework.core.env.Environment;
4532
import org.springframework.core.task.TaskExecutor;
4633
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
47-
import org.springframework.util.StringUtils;
34+
import org.springframework.transaction.PlatformTransactionManager;
4835

4936
/**
5037
* Configures the Job that will execute the Composed Task Execution.
5138
*
5239
* @author Glenn Renfro
5340
* @author Corneil du Plessis
5441
*/
55-
@EnableBatchProcessing
5642
@EnableTask
5743
@EnableConfigurationProperties(ComposedTaskProperties.class)
5844
@Configuration
5945
@Import(org.springframework.cloud.dataflow.composedtaskrunner.StepBeanDefinitionRegistrar.class)
6046
public class ComposedTaskRunnerConfiguration {
61-
private final static Logger logger = LoggerFactory.getLogger(ComposedTaskRunnerConfiguration.class);
6247

6348
@Bean
6449
public TaskExecutionListener taskExecutionListener() {
6550
return new ComposedTaskRunnerTaskListener();
6651
}
6752

6853
@Bean
69-
public StepExecutionListener composedTaskStepExecutionListener(TaskExplorerContainer taskExplorerContainer) {
70-
return new org.springframework.cloud.dataflow.composedtaskrunner.ComposedTaskStepExecutionListener(taskExplorerContainer);
71-
}
72-
73-
@Bean
74-
TaskExplorerContainer taskExplorerContainer(TaskExplorer taskExplorer, DataSource dataSource, ComposedTaskProperties properties, Environment env) {
75-
Map<String, TaskExplorer> explorers = new HashMap<>();
76-
String ctrName = env.getProperty("spring.cloud.task.name");
77-
if (!StringUtils.hasText(ctrName)) {
78-
throw new IllegalStateException("spring.cloud.task.name property must have a value.");
79-
}
80-
TaskParser parser = new TaskParser("ctr", properties.getGraph(), false, true);
81-
StepBeanDefinitionRegistrar.TaskAppsMapCollector collector = new StepBeanDefinitionRegistrar.TaskAppsMapCollector();
82-
parser.parse().accept(collector);
83-
Set<String> taskNames = collector.getTaskApps().keySet();
84-
logger.debug("taskExplorerContainer:taskNames:{}", taskNames);
85-
for (String taskName : taskNames) {
86-
addTaskExplorer(dataSource, properties, env, explorers, taskName);
87-
String appName = taskName.replace(ctrName + "-", "");
88-
addTaskExplorer(dataSource, properties, env, explorers, appName);
89-
if(taskName.length() > ctrName.length()) {
90-
String shortTaskName = taskName.substring(ctrName.length() + 1);
91-
addTaskExplorer(dataSource, properties, env, explorers, shortTaskName);
92-
}
93-
}
94-
return new TaskExplorerContainer(explorers, taskExplorer);
95-
}
96-
97-
private static void addTaskExplorer(
98-
DataSource dataSource,
99-
ComposedTaskProperties properties,
100-
Environment env,
101-
Map<String, TaskExplorer> explorers,
102-
String taskName
103-
) {
104-
logger.debug("addTaskExplorer:{}", taskName);
105-
String propertyName = String.format("app.%s.spring.cloud.task.tablePrefix", taskName);
106-
String prefix = properties.getComposedTaskAppProperties().get(propertyName);
107-
if (prefix == null) {
108-
prefix = env.getProperty(propertyName);
109-
}
110-
if (prefix != null) {
111-
TaskExecutionDaoFactoryBean factoryBean = new MultiSchemaTaskExecutionDaoFactoryBean(dataSource, prefix);
112-
logger.debug("taskExplorerContainer:adding:{}:{}", taskName, prefix);
113-
explorers.put(taskName, new SimpleTaskExplorer(factoryBean));
114-
} else {
115-
logger.warn("Cannot find {} in {} ", propertyName, properties.getComposedTaskAppProperties());
116-
}
54+
public StepExecutionListener composedTaskStepExecutionListener(TaskExplorer taskExplorer) {
55+
return new org.springframework.cloud.dataflow.composedtaskrunner.ComposedTaskStepExecutionListener(taskExplorer);
11756
}
11857

11958
@Bean
@@ -128,25 +67,21 @@ public TaskExecutor taskExecutor(ComposedTaskProperties properties) {
12867
taskExecutor.setMaxPoolSize(properties.getSplitThreadMaxPoolSize());
12968
taskExecutor.setKeepAliveSeconds(properties.getSplitThreadKeepAliveSeconds());
13069
taskExecutor.setAllowCoreThreadTimeOut(
131-
properties.isSplitThreadAllowCoreThreadTimeout());
70+
properties.isSplitThreadAllowCoreThreadTimeout());
13271
taskExecutor.setQueueCapacity(properties.getSplitThreadQueueCapacity());
13372
taskExecutor.setWaitForTasksToCompleteOnShutdown(
134-
properties.isSplitThreadWaitForTasksToCompleteOnShutdown());
73+
properties.isSplitThreadWaitForTasksToCompleteOnShutdown());
13574
return taskExecutor;
13675
}
13776

77+
/**
78+
* Provides the {@link JobRepository} that is configured to be used by the composed task runner.
79+
*/
13880
@Bean
139-
public BatchConfigurer getComposedBatchConfigurer(
140-
BatchProperties properties,
141-
DataSource dataSource,
142-
TransactionManagerCustomizers transactionManagerCustomizers,
143-
ComposedTaskProperties composedTaskProperties
144-
) {
145-
return new ComposedBatchConfigurer(
146-
properties,
147-
dataSource,
148-
transactionManagerCustomizers,
149-
composedTaskProperties
150-
);
81+
public BeanPostProcessor jobRepositoryBeanPostProcessor(PlatformTransactionManager transactionManager,
82+
DataSource incrementerDataSource,
83+
ComposedTaskProperties composedTaskProperties) {
84+
return new JobRepositoryBeanPostProcessor(transactionManager, incrementerDataSource, composedTaskProperties);
15185
}
86+
15287
}

spring-cloud-dataflow-composed-task-runner/src/main/java/org/springframework/cloud/dataflow/composedtaskrunner/ComposedTaskRunnerStepFactory.java

+14-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,18 +31,21 @@
3131

3232
import org.springframework.batch.core.Step;
3333
import org.springframework.batch.core.StepExecutionListener;
34-
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
34+
import org.springframework.batch.core.repository.JobRepository;
35+
import org.springframework.batch.core.step.builder.StepBuilder;
3536
import org.springframework.beans.factory.FactoryBean;
3637
import org.springframework.beans.factory.annotation.Autowired;
3738
import org.springframework.cloud.dataflow.composedtaskrunner.properties.ComposedTaskProperties;
3839
import org.springframework.cloud.dataflow.core.Base64Utils;
3940
import org.springframework.cloud.dataflow.rest.support.jackson.Jackson2DataflowModule;
4041
import org.springframework.cloud.task.configuration.TaskProperties;
42+
import org.springframework.cloud.task.repository.TaskExplorer;
4143
import org.springframework.core.env.Environment;
4244
import org.springframework.hateoas.mediatype.hal.Jackson2HalModule;
4345
import org.springframework.security.oauth2.client.endpoint.OAuth2AccessTokenResponseClient;
4446
import org.springframework.security.oauth2.client.endpoint.OAuth2ClientCredentialsGrantRequest;
4547
import org.springframework.security.oauth2.client.registration.ClientRegistrationRepository;
48+
import org.springframework.transaction.PlatformTransactionManager;
4649
import org.springframework.transaction.annotation.Isolation;
4750
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
4851
import org.springframework.transaction.interceptor.TransactionAttribute;
@@ -74,13 +77,16 @@ public class ComposedTaskRunnerStepFactory implements FactoryBean<Step> {
7477
private List<String> arguments = new ArrayList<>();
7578

7679
@Autowired
77-
private StepBuilderFactory steps;
80+
private JobRepository jobRepository;
81+
82+
@Autowired
83+
private PlatformTransactionManager transactionManager;
7884

7985
@Autowired
8086
private StepExecutionListener composedTaskStepExecutionListener;
8187

8288
@Autowired
83-
private TaskExplorerContainer taskExplorerContainer;
89+
private TaskExplorer taskExplorer;
8490

8591
@Autowired
8692
private TaskProperties taskProperties;
@@ -133,7 +139,7 @@ public Step getObject() {
133139
TaskLauncherTasklet taskLauncherTasklet = new TaskLauncherTasklet(
134140
this.clientRegistrations,
135141
this.clientCredentialsTokenResponseClient,
136-
this.taskExplorerContainer.get(this.taskNameId),
142+
this.taskExplorer,
137143
this.composedTaskPropertiesFromEnv,
138144
this.taskName,
139145
taskProperties,
@@ -168,9 +174,9 @@ public Step getObject() {
168174

169175
taskLauncherTasklet.setProperties(propertiesToUse);
170176
logger.debug("Properties to use {}", propertiesToUse);
171-
172-
return this.steps.get(this.taskName)
173-
.tasklet(taskLauncherTasklet)
177+
StepBuilder stepBuilder = new StepBuilder(this.taskName, this.jobRepository);
178+
return stepBuilder
179+
.tasklet(taskLauncherTasklet, this.transactionManager)
174180
.transactionAttribute(getTransactionAttribute())
175181
.listener(this.composedTaskStepExecutionListener)
176182
.build();

0 commit comments

Comments
 (0)