Skip to content
This repository was archived by the owner on May 14, 2025. It is now read-only.

Add support for MySQL 5/8 via MariaDB driver #5071

Merged
merged 1 commit into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions spring-cloud-dataflow-server-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,16 @@
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mariadb</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,46 @@
import javax.sql.DataSource;

import org.flywaydb.core.api.configuration.FluentConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.boot.autoconfigure.flyway.FlywayConfigurationCustomizer;
import org.springframework.boot.jdbc.DatabaseDriver;
import org.springframework.cloud.dataflow.common.flyway.DatabaseDriverUtils;
import org.springframework.cloud.dataflow.server.db.migration.db2.Db2BeforeBaseline;
import org.springframework.cloud.dataflow.server.db.migration.mariadb.MariadbBeforeBaseline;
import org.springframework.cloud.dataflow.server.db.migration.mysql.MysqlBeforeBaseline;
import org.springframework.cloud.dataflow.server.db.migration.oracle.OracleBeforeBaseline;
import org.springframework.cloud.dataflow.server.db.migration.postgresql.PostgresBeforeBaseline;
import org.springframework.cloud.dataflow.server.db.migration.sqlserver.MsSqlBeforeBaseline;
import org.springframework.jdbc.support.JdbcUtils;
import org.springframework.jdbc.support.MetaDataAccessException;

/**
* Flyway {@link FlywayConfigurationCustomizer} bean customizing callbacks per
* active db vendor.
*
* @author Janne Valkealahti
*
* @author Chris Bono
*/
public class DataFlowFlywayConfigurationCustomizer implements FlywayConfigurationCustomizer {

private static final Logger LOG = LoggerFactory.getLogger(DataFlowFlywayConfigurationCustomizer.class);

@Override
public void customize(FluentConfiguration configuration) {
// boot's flyway auto-config doesn't allow to define callbacks per
// Boot's flyway auto-config doesn't allow to define callbacks per
// vendor id, so essentially customizing those here.
DataSource dataSource = configuration.getDataSource();
DatabaseDriver databaseDriver = getDatabaseDriver(dataSource);
DatabaseDriver databaseDriver = DatabaseDriverUtils.getDatabaseDriver(dataSource);
LOG.info("Adding vendor specific Flyway callback for {}", databaseDriver.name());
if (databaseDriver == DatabaseDriver.POSTGRESQL) {
configuration.callbacks(new PostgresBeforeBaseline());
}
else if (databaseDriver == DatabaseDriver.MARIADB) {
configuration.callbacks(new MariadbBeforeBaseline());
}
else if (databaseDriver == DatabaseDriver.MYSQL) {
configuration.callbacks(new MysqlBeforeBaseline());
}
else if (databaseDriver == DatabaseDriver.SQLSERVER) {
configuration.callbacks(new MsSqlBeforeBaseline());
}
Expand All @@ -60,15 +68,4 @@ else if (databaseDriver == DatabaseDriver.DB2) {
configuration.callbacks(new Db2BeforeBaseline());
}
}

private DatabaseDriver getDatabaseDriver(DataSource dataSource) {
// copied from boot's flyway auto-config to get matching db vendor id
try {
String url = JdbcUtils.extractDatabaseMetaData(dataSource, "getURL");
return DatabaseDriver.fromJdbcUrl(url);
}
catch (MetaDataAccessException ex) {
throw new IllegalStateException(ex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
* Copyright 2019-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.dataflow.server.db.migration.mysql;

import java.util.Arrays;
import java.util.List;

import org.springframework.cloud.dataflow.common.flyway.SqlCommand;
import org.springframework.cloud.dataflow.server.db.migration.AbstractBaselineCallback;

/**
* Baselining schema setup for {@code postgres}.
*
* @author Janne Valkealahti
* @author Chris Bono
*/
public class MysqlBeforeBaseline extends AbstractBaselineCallback {

public final static String DROP_AUDIT_RECORDS_AUDIT_ACTION_IDX_INDEX =
"drop index AUDIT_RECORDS_AUDIT_ACTION_IDX on AUDIT_RECORDS";

public final static String DROP_AUDIT_RECORDS_AUDIT_OPERATION_IDX_INDEX =
"drop index AUDIT_RECORDS_AUDIT_OPERATION_IDX on AUDIT_RECORDS";

public final static String DROP_AUDIT_RECORDS_CORRELATION_ID_IDX_INDEX =
"drop index AUDIT_RECORDS_CORRELATION_ID_IDX on AUDIT_RECORDS";

public final static String DROP_AUDIT_RECORDS_CREATED_ON_IDX_INDEX =
"drop index AUDIT_RECORDS_CREATED_ON_IDX on AUDIT_RECORDS";

public final static String CREATE_APP_REGISTRATION_TMP_TABLE =
V1__Initial_Setup.CREATE_APP_REGISTRATION_TABLE.replaceFirst("app_registration", "app_registration_tmp");

public final static String INSERT_APP_REGISTRATION_DATA =
"insert into\n" +
" app_registration_tmp (id, object_version, default_version, metadata_uri, name, type, uri, version) \n" +
" select id, object_Version, default_Version, metadata_Uri, name, type, uri, version\n" +
" from APP_REGISTRATION";

public final static String DROP_APP_REGISTRATION_TABLE =
"drop table APP_REGISTRATION";

public final static String RENAME_APP_REGISTRATION_TMP_TABLE =
"alter table app_registration_tmp rename to app_registration";

public final static String CREATE_STREAM_DEFINITIONS_TMP_TABLE =
V1__Initial_Setup.CREATE_STREAM_DEFINITIONS_TABLE.replaceFirst("stream_definitions", "stream_definitions_tmp");

public final static String INSERT_STREAM_DEFINITIONS_DATA =
"insert into\n" +
" stream_definitions_tmp (definition_name, definition) \n" +
" select DEFINITION_NAME, DEFINITION\n" +
" from STREAM_DEFINITIONS";

public final static String DROP_STREAM_DEFINITIONS_TABLE =
"drop table STREAM_DEFINITIONS";

public final static String RENAME_STREAM_DEFINITIONS_TMP_TABLE =
"alter table stream_definitions_tmp rename to stream_definitions";

public final static String CREATE_TASK_DEFINITIONS_TMP_TABLE =
V1__Initial_Setup.CREATE_TASK_DEFINITIONS_TABLE.replaceFirst("task_definitions", "task_definitions_tmp");

public final static String INSERT_TASK_DEFINITIONS_DATA =
"insert into\n" +
" task_definitions_tmp (definition_name, definition) \n" +
" select DEFINITION_NAME, DEFINITION\n" +
" from TASK_DEFINITIONS";

public final static String DROP_TASK_DEFINITIONS_TABLE =
"drop table TASK_DEFINITIONS";

public final static String RENAME_TASK_DEFINITIONS_TMP_TABLE =
"alter table task_definitions_tmp rename to task_definitions";

public final static String CREATE_AUDIT_RECORDS_TMP_TABLE =
V1__Initial_Setup.CREATE_AUDIT_RECORDS_TABLE.replaceFirst("audit_records", "audit_records_tmp");

public final static String INSERT_AUDIT_RECORDS_DATA =
"insert into\n" +
" audit_records_tmp (id, audit_action, audit_data, audit_operation, correlation_id, created_by, created_on)\n" +
" select id, audit_Action, audit_data, audit_Operation, correlation_id, created_by, created_On\n" +
" from AUDIT_RECORDS";

public final static String DROP_AUDIT_RECORDS_TABLE =
"drop table AUDIT_RECORDS";

public final static String RENAME_AUDIT_RECORDS_TMP_TABLE =
"alter table audit_records_tmp rename to audit_records";

public final static String CREATE_AUDIT_RECORDS_AUDIT_ACTION_IDX_INDEX =
"create index audit_records_audit_action_idx on audit_records (audit_action)";

public final static String CREATE_AUDIT_RECORDS_AUDIT_OPERATION_IDX_INDEX =
"create index audit_records_audit_operation_idx on audit_records (audit_operation)";

public final static String CREATE_AUDIT_RECORDS_CORRELATION_ID_IDX_INDEX =
"create index audit_records_correlation_id_idx on audit_records (correlation_id)";

public final static String CREATE_AUDIT_RECORDS_CREATED_ON_IDX_INDEX =
"create index audit_records_created_on_idx on audit_records (created_on)";

/**
* Instantiates a new postgres before baseline.
*/
public MysqlBeforeBaseline() {
super(new V1__Initial_Setup());
}

@Override
public List<SqlCommand> dropIndexes() {
return Arrays.asList(
SqlCommand.from(DROP_AUDIT_RECORDS_AUDIT_ACTION_IDX_INDEX),
SqlCommand.from(DROP_AUDIT_RECORDS_AUDIT_OPERATION_IDX_INDEX),
SqlCommand.from(DROP_AUDIT_RECORDS_CORRELATION_ID_IDX_INDEX),
SqlCommand.from(DROP_AUDIT_RECORDS_CREATED_ON_IDX_INDEX));
}

@Override
public List<SqlCommand> changeAppRegistrationTable() {
return Arrays.asList(
SqlCommand.from(CREATE_APP_REGISTRATION_TMP_TABLE),
SqlCommand.from(INSERT_APP_REGISTRATION_DATA),
SqlCommand.from(DROP_APP_REGISTRATION_TABLE),
SqlCommand.from(RENAME_APP_REGISTRATION_TMP_TABLE));
}

@Override
public List<SqlCommand> changeUriRegistryTable() {
return Arrays.asList(
new MysqlMigrateUriRegistrySqlCommand());
}

@Override
public List<SqlCommand> changeStreamDefinitionsTable() {
return Arrays.asList(
SqlCommand.from(CREATE_STREAM_DEFINITIONS_TMP_TABLE),
SqlCommand.from(INSERT_STREAM_DEFINITIONS_DATA),
SqlCommand.from(DROP_STREAM_DEFINITIONS_TABLE),
SqlCommand.from(RENAME_STREAM_DEFINITIONS_TMP_TABLE));
}

@Override
public List<SqlCommand> changeTaskDefinitionsTable() {
return Arrays.asList(
SqlCommand.from(CREATE_TASK_DEFINITIONS_TMP_TABLE),
SqlCommand.from(INSERT_TASK_DEFINITIONS_DATA),
SqlCommand.from(DROP_TASK_DEFINITIONS_TABLE),
SqlCommand.from(RENAME_TASK_DEFINITIONS_TMP_TABLE));
}

@Override
public List<SqlCommand> changeAuditRecordsTable() {
return Arrays.asList(
SqlCommand.from(CREATE_AUDIT_RECORDS_TMP_TABLE),
SqlCommand.from(INSERT_AUDIT_RECORDS_DATA),
SqlCommand.from(DROP_AUDIT_RECORDS_TABLE),
SqlCommand.from(RENAME_AUDIT_RECORDS_TMP_TABLE));
}

@Override
public List<SqlCommand> createIndexes() {
return Arrays.asList(
SqlCommand.from(CREATE_AUDIT_RECORDS_AUDIT_ACTION_IDX_INDEX),
SqlCommand.from(CREATE_AUDIT_RECORDS_AUDIT_OPERATION_IDX_INDEX),
SqlCommand.from(CREATE_AUDIT_RECORDS_CORRELATION_ID_IDX_INDEX),
SqlCommand.from(CREATE_AUDIT_RECORDS_CREATED_ON_IDX_INDEX));
}

@Override
public List<SqlCommand> createTaskLockTable() {
return Arrays.asList(
SqlCommand.from(V1__Initial_Setup.CREATE_TASK_LOCK_TABLE));
}

@Override
public List<SqlCommand> createTaskDeploymentTable() {
return Arrays.asList(SqlCommand.from(
V1__Initial_Setup.CREATE_TASK_DEPLOYMENT_TABLE));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2019-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.dataflow.server.db.migration.mysql;

import java.util.List;

import org.postgresql.core.SqlCommand;

import org.springframework.cloud.dataflow.server.db.migration.AbstractMigrateUriRegistrySqlCommand;
import org.springframework.jdbc.core.JdbcTemplate;

/**
* {@code mysql} related {@link SqlCommand} for migrating data from
* {@code URI_REGISTRY} into {@code app_registration}.
*
* @author Janne Valkealahti
* @author Chris Bono
*/
public class MysqlMigrateUriRegistrySqlCommand extends AbstractMigrateUriRegistrySqlCommand {

@Override
protected void updateAppRegistration(JdbcTemplate jdbcTemplate, List<AppRegistrationMigrationData> data) {
// get value from hibernate sequence table and update it later
// depending on how many updates we did
long nextVal = jdbcTemplate.queryForObject("select next_val as id_val from hibernate_sequence", Long.class);
long nextValUpdates = nextVal;
for (AppRegistrationMigrationData d : data) {
jdbcTemplate.update(
"insert into app_registration (id, object_version, default_version, metadata_uri, name, type, uri, version) values (?,?,?,?,?,?,?,?)",
nextValUpdates++, 0, d.isDefaultVersion() ? 1 : 0, d.getMetadataUri(), d.getName(), d.getType(),
d.getUri(), d.getVersion());
}
jdbcTemplate.update("update hibernate_sequence set next_val= ? where next_val=?", nextValUpdates, nextVal);
}
}
Loading