From 54697404672b5ad217d2f5bba26cac8b09015478 Mon Sep 17 00:00:00 2001
From: cloudAndMonkey <xieyun.xie@gmail.com>
Date: Fri, 6 Jan 2023 15:56:56 +0800
Subject: [PATCH 1/4] =?UTF-8?q?apijson=E5=A4=9A=E6=95=B0=E6=8D=AE=E6=BA=90?=
 =?UTF-8?q?=E6=94=AF=E6=8C=81kafka?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

apijson5.4支持
---
 .../README.md                                 |  13 ++
 .../APIJSONDemo-MultiDataSource-Kafka/pom.xml | 203 ++++++++++++++++++
 .../java/apijson/demo/DataBaseConfig.java     |  22 ++
 .../main/java/apijson/demo/DataBaseUtil.java  |  54 +++++
 .../java/apijson/demo/DemoApplication.java    | 101 +++++++++
 .../java/apijson/demo/DemoController.java     |  99 +++++++++
 .../java/apijson/demo/DemoFunctionParser.java |  93 ++++++++
 .../java/apijson/demo/DemoObjectParser.java   |  26 +++
 .../main/java/apijson/demo/DemoParser.java    |  35 +++
 .../main/java/apijson/demo/DemoSQLConfig.java | 133 ++++++++++++
 .../java/apijson/demo/DemoSQLExecutor.java    | 128 +++++++++++
 .../java/apijson/demo/DynamicDataSource.java  | 169 +++++++++++++++
 .../apijson/demo/KafkaSimpleProducer.java     |  34 +++
 .../java/apijson/demo/SpringContextUtils.java |  60 ++++++
 .../src/main/resources/application.yml        |  35 +++
 .../apijson/demo/KafkaSimpleConsumer.java     |  42 ++++
 16 files changed, 1247 insertions(+)
 create mode 100644 APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/README.md
 create mode 100644 APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/pom.xml
 create mode 100644 APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DataBaseConfig.java
 create mode 100644 APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DataBaseUtil.java
 create mode 100644 APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoApplication.java
 create mode 100644 APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoController.java
 create mode 100644 APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoFunctionParser.java
 create mode 100644 APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoObjectParser.java
 create mode 100644 APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoParser.java
 create mode 100644 APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoSQLConfig.java
 create mode 100644 APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoSQLExecutor.java
 create mode 100644 APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DynamicDataSource.java
 create mode 100644 APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/KafkaSimpleProducer.java
 create mode 100644 APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/SpringContextUtils.java
 create mode 100644 APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/resources/application.yml
 create mode 100644 APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/test/java/apijson/demo/KafkaSimpleConsumer.java

diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/README.md b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/README.md
new file mode 100644
index 00000000..2214e962
--- /dev/null
+++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/README.md
@@ -0,0 +1,13 @@
+# APIJSONDemo
+
+## 支持多数据源-消息队列
+
+示例:kafka
+
+原理说明:
+
+Access表名 = 消息队列 topic
+
+Access表配置说明:
+
+![Access 表配置示意图](images/access-config.png) <!-- TODO: 原引用为作者本机绝对路径(/Users/xy/...),仓库内无法显示;请将截图提交到本模块 images/ 目录 -->
diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/pom.xml b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/pom.xml
new file mode 100644
index 00000000..8a84134d
--- /dev/null
+++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/pom.xml
@@ -0,0 +1,203 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.springframework.boot</groupId>
+		<artifactId>spring-boot-starter-parent</artifactId>
+		<version>2.5.13</version>
+		<!--        <relativePath>./pom.xml</relativePath>-->
+	</parent>
+	<groupId>apijson.demo</groupId>
+	<artifactId>apijsondemo-multidatasource-kafka</artifactId>
+	<version>5.4.0</version>
+
+	<name>apijsondemo-multidatasource-kafka</name>
+	<description>Demo project for testing APIJSON server based on SpringBoot</description>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+		<commons-lang3.version>3.12.0</commons-lang3.version>
+		<druid.version>1.1.16</druid.version>
+		<mybatisplus.version>3.5.1</mybatisplus.version>
+		<mybatis-plus-support.version>2.3.3</mybatis-plus-support.version>
+		<commons-collections4.version>4.4</commons-collections4.version>
+		<commons.configuration.version>1.10</commons.configuration.version>
+		<guava.version>30.1.1-jre</guava.version>
+		<fastjson.version>1.2.72</fastjson.version>
+		<hutool.version>4.1.1</hutool.version>
+		<lombok.version>1.18.4</lombok.version>
+		<!-- 重复声明已移除:commons-lang3.version 已在上方定义 -->
+		<commons.io.version>2.5</commons.io.version>
+		<commons.codec.version>1.10</commons.codec.version>
+		<!-- 重复声明已移除:commons-collections4.version 已在上方定义 -->
+		<!-- 重复声明已移除:commons.configuration.version 已在上方定义 -->
+		<apijson.version>5.4.0</apijson.version>
+		<mysql.version>8.0.31</mysql.version>
+		<spring-context-support.version>5.3.18</spring-context-support.version>
+		<spring-boot-configuration-processor.version>2.6.6</spring-boot-configuration-processor.version>
+		<dynamic-datasource-spring-boot-starter.version>3.5.2</dynamic-datasource-spring-boot-starter.version>
+		<java.version>1.8</java.version>
+		<kafka.version>3.2.1</kafka.version>
+	</properties>
+
+	<dependencies>
+		<!-- JDK 11+ 需要,否则启动报错 NoClassDefFoundError: javax/activation/UnsupportedDataTypeException -->
+		<dependency>
+			<groupId>javax.activation</groupId>
+			<artifactId>activation</artifactId>
+			<version>1.1.1</version>
+		</dependency>
+
+		<!-- 需要的 APIJSON 相关依赖 -->
+		<dependency>
+            <groupId>com.github.Tencent</groupId>
+            <artifactId>APIJSON</artifactId>
+            <version>${apijson.version}</version>
+        </dependency>
+		<dependency>
+			<groupId>com.github.APIJSON</groupId>
+			<artifactId>apijson-framework</artifactId>
+			<version>${apijson.version}</version>
+		</dependency>
+
+		<!-- 需要用的数据库 JDBC 驱动 -->
+
+		<!-- Oracle, SQLServer 等其它数据库的 JDBC 驱动,可以在这里加上 Maven 依赖或 libs 目录放 Jar 包并依赖 -->
+
+		<!-- 需要用的 SpringBoot 框架,1.4.0 以上 -->
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-web</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.springframework</groupId>
+			<artifactId>spring-context-support</artifactId>
+			<version>${spring-context-support.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-configuration-processor</artifactId>
+			<version>${spring-boot-configuration-processor.version}</version>
+			<optional>true</optional>
+		</dependency>
+		<dependency>
+			<groupId>com.alibaba</groupId>
+			<artifactId>druid-spring-boot-starter</artifactId>
+			<version>${druid.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.baomidou</groupId>
+			<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
+			<version>${dynamic-datasource-spring-boot-starter.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.baomidou</groupId>
+			<artifactId>mybatis-plus-boot-starter</artifactId>
+			<version>${mybatisplus.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>com.baomidou</groupId>
+					<artifactId>mybatis-plus-generator</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>com.baomidou</groupId>
+			<artifactId>mybatis-plus-support</artifactId>
+			<version>${mybatis-plus-support.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-collections4</artifactId>
+			<version>${commons-collections4.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>mysql</groupId>
+			<artifactId>mysql-connector-java</artifactId>
+			<version>${mysql.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.projectlombok</groupId>
+			<artifactId>lombok</artifactId>
+			<version>${lombok.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>commons-io</groupId>
+			<artifactId>commons-io</artifactId>
+			<version>${commons.io.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>commons-codec</groupId>
+			<artifactId>commons-codec</artifactId>
+			<version>${commons.codec.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>commons-configuration</groupId>
+			<artifactId>commons-configuration</artifactId>
+			<version>${commons.configuration.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka-clients</artifactId>
+			<version>${kafka.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.springframework.boot</groupId>
+				<artifactId>spring-boot-maven-plugin</artifactId>
+				<configuration>
+					<fork>true</fork>
+					<mainClass>apijson.demo.DemoApplication</mainClass>
+				</configuration>
+				<executions>
+					<execution>
+						<goals>
+							<goal>repackage</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<configuration>
+					<source>1.8</source>
+					<target>1.8</target>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+	<repositories>
+		<!-- APIJSON 必须用到的托管平台 -->
+		<repository>
+			<id>jitpack.io</id>
+			<url>https://jitpack.io</url>
+			<snapshots>
+				<enabled>true</enabled>
+			</snapshots>
+		</repository>
+
+		<repository>
+			<id>spring-snapshots</id>
+			<url>https://repo.spring.io/snapshot</url>
+			<snapshots>
+				<enabled>true</enabled>
+			</snapshots>
+		</repository>
+		<repository>
+			<id>spring-milestones</id>
+			<url>https://repo.spring.io/milestone</url>
+		</repository>
+	</repositories>
+
+</project>
\ No newline at end of file
diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DataBaseConfig.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DataBaseConfig.java
new file mode 100644
index 00000000..6e261537
--- /dev/null
+++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DataBaseConfig.java
@@ -0,0 +1,30 @@
+package apijson.demo;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+/** Exposes the primary (default) dynamic datasource name configured in application.yml. */
+@Configuration
+public class DataBaseConfig {
+	private String primary;
+
+	@Value("${spring.datasource.dynamic.primary}")
+	public void setPrimary(String primary) {
+		this.primary = primary;
+	}
+
+	public String getPrimary() {
+		return primary;
+	}
+
+	/** Fetches the singleton bean from the Spring context. */
+	public static DataBaseConfig getInstance() {
+		return SpringContextUtils.getBean(DataBaseConfig.class);
+	}
+
+	/** @deprecated misspelled name kept for backward compatibility; use {@link #getInstance()} */
+	@Deprecated
+	public static DataBaseConfig getInstence() {
+		return getInstance();
+	}
+}
diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DataBaseUtil.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DataBaseUtil.java
new file mode 100644
index 00000000..a4c6bbf3
--- /dev/null
+++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DataBaseUtil.java
@@ -0,0 +1,67 @@
+package apijson.demo;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import lombok.extern.log4j.Log4j2;
+
+/** Helpers that resolve JDBC url / schema / account info from the dynamic datasource registry. */
+@Log4j2
+public class DataBaseUtil {
+
+    // Compiled once instead of on every getLibname() call.
+    private static final Pattern DB_NAME_PATTERN = Pattern.compile(
+            "jdbc:(?<db>\\w+):.*((//)|@)(?<host>.+):(?<port>\\d+)(/|(;DatabaseName=)|:)(?<dbName>\\w+)\\??.*");
+
+    /**
+     * 根据url获取库名 (extracts the database name from a JDBC url).
+     * @param url JDBC url, e.g. jdbc:mysql://host:3306/db?...
+     * @return the database name, or null when url is null or does not match
+     */
+    public static String getLibname(String url) {
+        if (url == null) {
+            return null;
+        }
+        Matcher m = DB_NAME_PATTERN.matcher(url);
+        if (m.find()) {
+            return m.group("dbName");
+        }
+        return null;
+    }
+
+    /***
+     * primary: master
+     * strict: false
+     * @param datasource 匹配不成功, 自动匹配默认数据库 (upstream falls back to the default datasource)
+     * @return the resolved DataSource
+     * @throws IllegalArgumentException when the datasource is not configured
+     */
+    public static javax.sql.DataSource getDataSource(String datasource) {
+        try {
+            return DynamicDataSource.getDetail(datasource).getDataSource(); // 数据源
+        } catch (Exception e) {
+            // Chain the cause so misconfiguration is diagnosable from the stack trace.
+            throw new IllegalArgumentException("动态数据源配置错误 " + datasource, e);
+        }
+    }
+
+    /** @return 数据库连接url (the JDBC url configured for the datasource) */
+    public static String getDruidUrl(String datasource) {
+        return DynamicDataSource.getDetail(datasource).getUrl();
+    }
+
+    /** @return 数据库名 (the schema parsed from the datasource's JDBC url) */
+    public static String getDruidSchema(String datasource) {
+        return getLibname(DynamicDataSource.getDetail(datasource).getUrl());
+    }
+
+    /** @return 数据库用户名 (database account) */
+    public static String getDruidDBAccount(String datasource) {
+        return DynamicDataSource.getDetail(datasource).getDbAccount();
+    }
+
+    /** @return 数据库密码 (database password) */
+    public static String getDruidDBPassword(String datasource) {
+        return DynamicDataSource.getDetail(datasource).getDbPassword();
+    }
+}
diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoApplication.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoApplication.java
new file mode 100644
index 00000000..2a5f6897
--- /dev/null
+++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoApplication.java
@@ -0,0 +1,99 @@
+/*Copyright ©2016 TommyLemon(https://github.com/TommyLemon/APIJSON)
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.*/
+
+package apijson.demo;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.boot.web.server.WebServerFactoryCustomizer;
+import org.springframework.boot.web.servlet.server.ConfigurableServletWebServerFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.servlet.config.annotation.CorsRegistry;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+
+import apijson.Log;
+import apijson.framework.APIJSONApplication;
+import apijson.framework.APIJSONCreator;
+import apijson.orm.FunctionParser;
+import apijson.orm.Parser;
+import apijson.orm.SQLConfig;
+import apijson.orm.SQLExecutor;
+
+/**
+ * Demo SpringBoot Application entry class. Right click this class > Run As > Java Application.
+ * See the SpringBoot docs
+ * https://www.springcloud.cc/spring-boot.html#using-boot-locating-the-main-class
+ *
+ * @author Lemon
+ */
+@Configuration
+@SpringBootApplication
+@EnableAutoConfiguration
+@EnableConfigurationProperties
+public class DemoApplication implements WebServerFactoryCustomizer<ConfigurableServletWebServerFactory> {
+	public static final String TAG = "DemoApplication";
+
+	public static void main(String[] args) throws Exception {
+		SpringApplication.run(DemoApplication.class, args);
+		Log.DEBUG = true;
+		APIJSONApplication.init(false); // needed since 4.4.0 so the DEFAULT_APIJSON_CREATOR assignment in the static block below takes effect
+	}
+
+	// SpringBoot 2.x way of customizing the port
+	@Override
+	public void customize(ConfigurableServletWebServerFactory server) {
+		server.setPort(8080);
+	}
+
+	// allow cross-origin requests from APIAuto's JavaScript code
+	@Bean
+	public WebMvcConfigurer corsConfigurer() {
+		return new WebMvcConfigurer() {
+			@Override
+			public void addCorsMappings(CorsRegistry registry) {
+				registry.addMapping("/**").allowedOriginPatterns("*").allowedMethods("*").allowCredentials(true)
+						.maxAge(3600);
+			}
+		};
+	}
+
+	static {
+		// use this project's custom handler classes
+		APIJSONApplication.DEFAULT_APIJSON_CREATOR = new APIJSONCreator<String>() {
+			@Override
+			public Parser<String> createParser() {
+				return new DemoParser();
+			}
+
+			@Override
+			public SQLConfig createSQLConfig() {
+				return new DemoSQLConfig();
+			}
+
+			@Override
+			public FunctionParser createFunctionParser() {
+				return new DemoFunctionParser();
+			}
+
+			@Override
+			public SQLExecutor createSQLExecutor() {
+				return new DemoSQLExecutor();
+			}
+		};
+	}
+
+}
diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoController.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoController.java
new file mode 100644
index 00000000..c91c58f3
--- /dev/null
+++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoController.java
@@ -0,0 +1,99 @@
+/*Copyright ©2016 TommyLemon(https://github.com/TommyLemon/APIJSON)
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.*/
+
+package apijson.demo;
+
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.net.URLDecoder;
+import java.util.Map;
+
+import javax.servlet.http.HttpSession;
+
+import apijson.RequestMethod;
+import apijson.StringUtil;
+import apijson.framework.APIJSONController;
+import apijson.orm.Parser;
+
+
+/**Request routing entry controller: generic CRUD endpoints etc., delegated to APIJSON's Parser.
+ * See the SpringBoot docs
+ * https://www.springcloud.cc/spring-boot.html#boot-features-spring-mvc
+ * and the APIJSON general docs, section 3 Design Spec, 3.1 Operation Methods
+ * https://github.com/Tencent/APIJSON/blob/master/Document.md#3.1
+ * <br > Recommended to send every request via HTTP POST:
+ * <br > 1. Less code - clients need no HTTP GET, PUT etc. request code
+ * <br > 2. Better performance - no URL encode/decode needed
+ * <br > 3. Easier debugging - use APIAuto(http://apijson.cn/api) or Postman
+ * @author Lemon
+ */
+@RestController
+@RequestMapping("")
+public class DemoController extends APIJSONController<Long> {
+
+	@Override
+	public Parser<Long> newParser(HttpSession session, RequestMethod method) {
+		return super.newParser(session, method).setNeedVerify(false);  // TODO verification disabled here for quick testing by beginners; enable it for real production projects
+	}
+
+	/**Unified CRUD endpoint - this single endpoint can replace the 7 generic ones, trading some routing performance for development efficiency
+	 * @param method
+	 * @param request
+	 * @param session
+	 * @return
+	 */
+	@PostMapping(value = "{method}")  // on URL conflicts with other endpoints, add a prefix, e.g. crud/{method} or @RequestMapping("crud") on the controller
+	@Override
+	public String crud(@PathVariable String method, @RequestBody String request, HttpSession session) {
+		return super.crud(method, request, session);
+	}
+
+	/**Unified CRUD-by-tag endpoint - this single endpoint can replace the 7 generic ones, trading some routing performance for development efficiency
+	 * @param method
+	 * @param tag
+	 * @param params
+	 * @param request
+	 * @param session
+	 * @return
+	 */
+	@PostMapping("{method}/{tag}")  // on URL conflicts with other endpoints, add a prefix, e.g. crud/{method}/{tag} or @RequestMapping("crud") on the controller
+	@Override
+	public String crudByTag(@PathVariable String method, @PathVariable String tag, @RequestParam Map<String, String> params, @RequestBody String request, HttpSession session) {
+		return super.crudByTag(method, tag, params, request, session);
+	}
+
+	/**GET
+	 * only kept for HTTP GET compatibility; HTTP POST is recommended, this can be removed
+	 * @param request String only, to avoid values staying encoded after decode
+	 * @param session
+	 * @return
+	 * @see {@link RequestMethod#GET}
+	 */
+	@GetMapping("get/{request}")
+	public String openGet(@PathVariable String request, HttpSession session) {
+		try {
+			request = URLDecoder.decode(request, StringUtil.UTF_8);
+		} catch (Exception e) {
+			// Parser will report the error
+		}
+		return get(request, session);
+	}
+
+}
\ No newline at end of file
diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoFunctionParser.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoFunctionParser.java
new file mode 100644
index 00000000..92e729c7
--- /dev/null
+++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoFunctionParser.java
@@ -0,0 +1,93 @@
+package apijson.demo;
+
+import javax.servlet.http.HttpSession;
+
+import com.alibaba.fastjson.JSONObject;
+
+import apijson.NotNull;
+import apijson.RequestMethod;
+import apijson.StringUtil;
+import apijson.framework.APIJSONFunctionParser;
+import apijson.framework.APIJSONVerifier;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class DemoFunctionParser extends APIJSONFunctionParser {
+	public DemoFunctionParser() {
+		this(null, null, 0, null, null);
+	}
+
+	// shows what can be reached via this inside a remote function
+	public DemoFunctionParser(RequestMethod method, String tag, int version, JSONObject request, HttpSession session) {
+		super(method, tag, version, request, session);
+	}
+
+	/***
+	 * gets the current user id
+	 * 
+	 * @param current
+	 * @return
+	 */
+	public String getCurrentUserId(@NotNull JSONObject current) {
+		if (this.getSession() == null) {
+			return "test"; // automatic self-test at startup
+		}
+		return APIJSONVerifier.getVisitorId(getSession());
+	}
+
+	/**
+	 * simplest remote function example: returns the value prefixed with Hello
+	 * 
+	 * @param current
+	 * @param name
+	 * @return
+	 * @throws Exception
+	 */
+	public String sayHello(@NotNull JSONObject current, @NotNull String name) throws Exception {
+		// note: the name parameter here is a key, not a value
+		Object obj = current.get(name);
+
+		if (this.getSession() == null) {
+			return "test"; // automatic self-test at startup
+		}
+		
+		if (obj == null) {
+			throw new IllegalArgumentException();
+		}
+		if (!(obj instanceof String)) {
+			throw new IllegalArgumentException();
+		}
+		
+		// this.getSession can then be used to get the current HttpSession
+		return "Hello, " + obj.toString();
+	}
+
+	/***
+	 * password encryption
+	 * 
+	 * @param current
+	 * @param id       id generated on insert
+	 * @param password name of the password field
+	 * @return void - mutates current in place
+	 * @throws Exception
+	 */
+	public void pwdEncrypt(@NotNull JSONObject current, @NotNull String id, @NotNull String password)
+			throws Exception {
+		String c_password = current.getString(password);
+		current.put(password, c_password + "_" + System.currentTimeMillis());
+	}
+
+	public void childFunTest(@NotNull JSONObject current, @NotNull String addr) throws Exception {
+		String c_addr = current.getString(addr);
+		current.put(addr, c_addr + "_" + System.currentTimeMillis());
+	}
+
+
+	public void removeKeys(@NotNull JSONObject current, String keys) {
+		String[] ks = StringUtil.split(keys, ";"); // split by semicolon ;
+		// remove the fields listed in ks from current
+		for (int i = 0; i < ks.length; i++) {
+			current.remove(ks[i]);
+		}
+	}
+}
diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoObjectParser.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoObjectParser.java
new file mode 100644
index 00000000..627de64c
--- /dev/null
+++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoObjectParser.java
@@ -0,0 +1,27 @@
+package apijson.demo;
+
+import java.util.List;
+
+import javax.servlet.http.HttpSession;
+
+import com.alibaba.fastjson.JSONObject;
+
+import apijson.NotNull;
+import apijson.RequestMethod;
+import apijson.framework.APIJSONObjectParser;
+import apijson.orm.Join;
+import apijson.orm.SQLConfig;
+
+/** Object parser that routes newSQLConfig to DemoSQLConfig so the demo's multi-datasource logic applies. */
+public class DemoObjectParser extends APIJSONObjectParser {
+	
+    public DemoObjectParser(HttpSession session, @NotNull JSONObject request, String parentPath, SQLConfig arrayConfig
+            , boolean isSubquery, boolean isTable, boolean isArrayMainTable) throws Exception {
+        super(session, request, parentPath, arrayConfig, isSubquery, isTable, isArrayMainTable);
+    }
+
+    @Override
+    public SQLConfig newSQLConfig(RequestMethod method, String table, String alias, JSONObject request, List<Join> joinList, boolean isProcedure) throws Exception {
+        return DemoSQLConfig.newSQLConfig(method, table, alias, request, joinList, isProcedure);
+    }
+}
diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoParser.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoParser.java
new file mode 100644
index 00000000..e12333e6
--- /dev/null
+++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoParser.java
@@ -0,0 +1,35 @@
+package apijson.demo;
+
+import com.alibaba.fastjson.JSONObject;
+
+import apijson.RequestMethod;
+import apijson.framework.APIJSONObjectParser;
+import apijson.framework.APIJSONParser;
+import apijson.orm.SQLConfig;
+
+/** Demo request parser; creates DemoObjectParser so the custom SQLConfig logic is used. */
+public class DemoParser extends APIJSONParser<String> {
+	public DemoParser() {
+		super();
+	}
+
+	public DemoParser(RequestMethod method) {
+		super(method);
+	}
+
+	public DemoParser(RequestMethod method, boolean needVerify) {
+		super(method, needVerify);
+	}
+
+	// can be overridden to set the max query count
+	// @Override
+	// public int getMaxQueryCount() {
+	// return 50;
+	// }
+
+	@Override
+	public APIJSONObjectParser createObjectParser(JSONObject request, String parentPath, SQLConfig arrayConfig, boolean isSubquery, boolean isTable, boolean isArrayMainTable) throws Exception {
+		return new DemoObjectParser(getSession(), request, parentPath, arrayConfig, isSubquery, isTable, isArrayMainTable).setMethod(getMethod()).setParser(this);
+	}
+
+}
\ No newline at end of file
diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoSQLConfig.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoSQLConfig.java
new file mode 100644
index 00000000..6337d426
--- /dev/null
+++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoSQLConfig.java
@@ -0,0 +1,133 @@
+/*Copyright ©2016 TommyLemon(https://github.com/TommyLemon/APIJSON)
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.*/
+
+package apijson.demo;
+
+import java.util.UUID;
+
+import com.alibaba.fastjson.annotation.JSONField;
+
+import apijson.RequestMethod;
+import apijson.framework.APIJSONSQLConfig;
+import apijson.orm.AbstractSQLConfig;
+
+
+/**SQL config; all connection info is resolved from the dynamic datasource registry.
+ * TiDB usage is identical to MySQL.
+ * See the detailed docs, C dev notes, C-1-1 changing the database connection
+ * https://github.com/Tencent/APIJSON/blob/master/%E8%AF%A6%E7%BB%86%E7%9A%84%E8%AF%B4%E6%98%8E%E6%96%87%E6%A1%A3.md#c-1-1%E4%BF%AE%E6%94%B9%E6%95%B0%E6%8D%AE%E5%BA%93%E9%93%BE%E6%8E%A5
+ * @author Lemon
+ */
+public class DemoSQLConfig extends APIJSONSQLConfig {
+
+	public DemoSQLConfig() {
+		super();
+	}
+
+	public DemoSQLConfig(RequestMethod method, String table) {
+		super(method, table);
+	}
+	
+    static {
+//        DEFAULT_DATABASE = DATABASE_ELASTICSEARCH;  // TODO default database type, change to your own
+//        DEFAULT_SCHEMA = "sys";  // TODO default schema/db name; defaults are MySQL: sys, PostgreSQL: public, SQL Server: dbo, Oracle:
+
+        // Table-name mappings (external name != actual DB table name) are only needed with bare APIJSONORM;
+        // not needed when apijson-framework is used and APIJSONApplication.init is called
+        // (it indirectly calls DemoVerifier.init, reading the Access table instead of manual config).
+        // But if the Access table's external name differs from the actual table name, still register here, e.g.
+        //		TABLE_KEY_MAP.put(Access.class.getSimpleName(), "access");
+
+        SIMPLE_CALLBACK = new SimpleCallback<String>() {
+
+			@Override
+			public AbstractSQLConfig getSQLConfig(RequestMethod method, String database, String schema,
+					String datasource, String table) {
+				return new DemoSQLConfig(method, table);
+			}
+
+			// id generation; return null to fall back to database auto-increment id
+			@Override
+			public String newId(RequestMethod method, String database, String schema, String datasource, String table) {
+				if ("Access".equals(table) || "Request".equals(table) || "Function".equals(table)) { // constant-first equals is null-safe for table
+					return null;
+				}
+				return UUID.randomUUID().toString(); // return null to skip id generation (database auto-increment id)
+			}
+		};
+    }
+
+    @JSONField(serialize = false)  // keep account/password and other sensitive info out of logs; required if UnitAuto is used
+	@Override
+	public String getDBVersion() {
+		return DynamicDataSource.getDetail(this.getDatasource()).getDbVersion();
+	}
+
+	@JSONField(serialize = false) // keep account/password and other sensitive info out of logs; required if UnitAuto is used
+	@Override
+	public String getDatabase() {
+		if (super.getDatabase() != null) {
+			return super.getDatabase();
+		}
+		try {
+			return DynamicDataSource.getDetail(this.getDatasource()).getDatabase();
+		} catch (Exception e) {
+			throw new IllegalArgumentException("动态数据源配置错误 " + this.getDatasource(), e);
+		}
+	}
+
+	@JSONField(serialize = false) // keep account/password and other sensitive info out of logs; required if UnitAuto is used
+	@Override
+	public String getSchema() {
+		if (super.getSchema() != null) {
+			return super.getSchema();
+		}
+		try {
+			return DynamicDataSource.getDetail(this.getDatasource()).getSchema();
+		} catch (Exception e) {
+			throw new IllegalArgumentException("动态数据源配置错误 " + this.getDatasource(), e);
+		}
+	}
+
+	@JSONField(serialize = false)  // keep account/password and other sensitive info out of logs; required if UnitAuto is used
+	@Override
+	public String getDBUri() {
+		try {
+			return DynamicDataSource.getDetail(this.getDatasource()).getUrl(); // JDBC connection url
+		} catch (Exception e) {
+			throw new IllegalArgumentException("动态数据源配置错误 " + this.getDatasource(), e);
+		}
+	}
+
+	@JSONField(serialize = false)  // keep account/password and other sensitive info out of logs; required if UnitAuto is used
+	@Override
+	public String getDBAccount() {
+		try {
+			return DynamicDataSource.getDetail(this.getDatasource()).getDbAccount();
+		} catch (Exception e) {
+			throw new IllegalArgumentException("动态数据源配置错误 " + this.getDatasource(), e);
+		}
+	}
+
+	@JSONField(serialize = false)  // keep account/password and other sensitive info out of logs; required if UnitAuto is used
+	@Override
+	public String getDBPassword() {
+		try {
+			return DynamicDataSource.getDetail(this.getDatasource()).getDbPassword();
+		} catch (Exception e) {
+			throw new IllegalArgumentException("动态数据源配置错误 " + this.getDatasource(), e);
+		}
+	}
+
+}
diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoSQLExecutor.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoSQLExecutor.java
new file mode 100644
index 00000000..f4e8cbed
--- /dev/null
+++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoSQLExecutor.java
@@ -0,0 +1,128 @@
+/*Copyright ©2016 TommyLemon(https://github.com/TommyLemon/APIJSON)
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.*/
+
+package apijson.demo;
+
+import java.sql.Connection;
+import java.util.List;
+
+import javax.sql.DataSource;
+
+import apijson.Log;
+import apijson.NotNull;
+import apijson.StringUtil;
+import apijson.framework.APIJSONSQLExecutor;
+import apijson.orm.SQLConfig;
+import lombok.extern.log4j.Log4j2;
+
+/**
+ * SQL 执行器,支持连接池及多数据源 具体见 https://github.com/Tencent/APIJSON/issues/151
+ * 
+ * @author Lemon
+ */
+@Log4j2
+public class DemoSQLExecutor extends APIJSONSQLExecutor {
+	public static final String TAG = "DemoSQLExecutor";
+
+	// 适配连接池,如果这里能拿到连接池的有效 Connection,则 SQLConfig 不需要配置 dbVersion, dbUri, dbAccount,
+	// dbPassword
+	@Override
+	public Connection getConnection(SQLConfig config) throws Exception {
+		String datasource = config.getDatasource();
+		Log.d(TAG, "getConnection  config.getDatasource() = " + datasource);
+
+		String key = datasource + "-" + config.getDatabase();
+		Connection conn = connectionMap.get(key);
+		if (conn == null || conn.isClosed()) {
+			DataSource dataSource = DataBaseUtil.getDataSource(datasource);
+			connectionMap.put(key, dataSource == null ? null : dataSource.getConnection());
+		}
+		return super.getConnection(config);
+	}
+
+	@SuppressWarnings("incomplete-switch")
+	@Override
+	public int executeUpdate(@NotNull SQLConfig config, String sql) throws Exception {
+		if (StringUtil.equals(config.getDatasource(), "kafka")) {
+		// if (config.isMQ() && StringUtil.equals(config.getDatasource(), "kafka")) {
+			switch (config.getMethod()) {
+			case POST:
+				// 消息组装、二次处理
+				String jsonColumn = "message";
+				DynamicDataSource DynamicDataSource = apijson.demo.DynamicDataSource.getDetail(config.getDatasource());
+				for (int i = 0; i < config.getColumn().size(); i++) {
+					String column = config.getColumn().get(i);
+					if (StringUtil.equals(column, jsonColumn)) {
+						for (List<Object> list : config.getValues()) {
+							Object message = list.get(i);
+							return KafkaSimpleProducer.sendMessage(config.getDatasource(), DynamicDataSource.getProps(), config.getTable(), message);
+						}
+					}
+				}
+			}
+			return 0;
+		}
+		return super.executeUpdate(config, sql);
+	}
+	/***
+	 * 查询返回字段值进行二次处理
+	 */
+//	@Override
+//	protected JSONObject onPutColumn(@NotNull SQLConfig config, @NotNull ResultSet rs, @NotNull ResultSetMetaData rsmd
+//			, final int tablePosition, @NotNull JSONObject table, final int columnIndex, Join join, Map<String, JSONObject> childMap) throws Exception {
+//		if (table == null) {  // 对应副表 viceSql 不能生成正常 SQL, 或者是 ! - Outer, ( - ANTI JOIN 的副表这种不需要缓存及返回的数据
+//			Log.i(TAG, "onPutColumn table == null >> return table;");
+//			return table;
+//		}
+//
+//		if (isHideColumn(config, rs, rsmd, tablePosition, table, columnIndex, childMap)) {
+//			Log.i(TAG, "onPutColumn isHideColumn(config, rs, rsmd, tablePosition, table, columnIndex, childMap) >> return table;");
+//			return table;
+//		}
+//
+//		String label = getKey(config, rs, rsmd, tablePosition, table, columnIndex, childMap);
+//		Object value = getValue(config, rs, rsmd, tablePosition, table, columnIndex, label, childMap);
+//		
+//		// TODO
+//		if(StringUtils.equals(config.getTable(), "User") && StringUtils.equals(label, "addr_id")) {
+//			value = "1-1-1";
+//		}
+//		// 主表必须 put 至少一个 null 进去,否则全部字段为 null 都不 put 会导致中断后续正常返回值
+//		if (value != null || (join == null && table.isEmpty())) {
+//			table.put(label, value);
+//		}
+//
+//		return table;
+//	}
+
+	// 取消注释支持 !key 反选字段 和 字段名映射,需要先依赖插件 https://github.com/APIJSON/apijson-column
+	// @Override
+	// protected String getKey(SQLConfig config, ResultSet rs, ResultSetMetaData
+	// rsmd, int tablePosition, JSONObject table,
+	// int columnIndex, Map<String, JSONObject> childMap) throws Exception {
+	// return ColumnUtil.compatOutputKey(super.getKey(config, rs, rsmd,
+	// tablePosition, table, columnIndex, childMap), config.getTable(),
+	// config.getMethod());
+	// }
+
+	// 不需要隐藏字段这个功能时,取消注释来提升性能
+	// @Override
+	// protected boolean isHideColumn(SQLConfig config, ResultSet rs,
+	// ResultSetMetaData rsmd, int tablePosition,
+	// JSONObject table, int columnIndex, Map<String, JSONObject> childMap) throws
+	// SQLException {
+	// return false;
+	// }
+
+}
diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DynamicDataSource.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DynamicDataSource.java
new file mode 100644
index 00000000..7eef7c54
--- /dev/null
+++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DynamicDataSource.java
@@ -0,0 +1,169 @@
+package apijson.demo;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.sql.DataSource;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.baomidou.dynamic.datasource.DynamicRoutingDataSource;
+import com.baomidou.dynamic.datasource.ds.ItemDataSource;
+import com.baomidou.mybatisplus.extension.toolkit.JdbcUtils;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+/***
+ * Registry of datasource details (JDBC pools and MQ endpoints) for APIJSON.
+ * Populated two ways: 1) at application startup from the Spring dynamic
+ * datasource configuration; 2) dynamically at runtime (datasource info stored
+ * in a database, added from an admin page).
+ */
+@Data
+@Order(value = 10)
+@Component
+@Slf4j
+public class DynamicDataSource implements ApplicationRunner {
+	// key: datasource name; value: its connection details.
+	// NOTE(review): a plain HashMap assumes all registration happens before
+	// reads (startup) — confirm before adding datasources from concurrent requests
+	private static Map<String, DynamicDataSource> dataSourceMap = new HashMap<>();
+	private String database; // database type of the table (e.g. MYSQL)
+	private String schema; // database (schema) name the table lives in
+	private String datasourceName; // datasource name
+	private String url; // jdbc url
+	private String dbAccount; // database username
+	private String dbPassword; // database password
+	private String dbVersion; // database version
+	private String clusterName; // cluster name (used by MQ datasources)
+	private Properties props; // extra properties (e.g. kafka producer config)
+
+	@Autowired
+	private DataSource dataSource; // Spring-managed routing datasource
+
+	/** Registers (or replaces) a datasource under its own name. */
+	public static void addDataSource(DynamicDataSource detail) {
+		dataSourceMap.put(detail.getDatasourceName(), detail);
+	}
+
+	/***
+	 * Looks up datasource details by name.
+	 *
+	 * @param datasource datasource name; null falls back to the primary datasource
+	 * @return the registered details, or null so unknown names are left to the framework
+	 */
+	public static DynamicDataSource getDetail(String datasource) {
+		if (datasource == null) {
+			// default datasource
+			datasource = DataBaseConfig.getInstence().getPrimary();
+		}
+		// unknown names return null and are handled by the framework
+		return dataSourceMap.get(datasource);
+	}
+
+	@Override
+	public void run(ApplicationArguments args) throws Exception {
+		initJdbcDataSource(); // register the JDBC pools configured in application.yml
+		// kafka
+		initMQ_kafka();
+	}
+
+	/***
+	 * Registers every JDBC pool managed by the dynamic routing datasource.
+	 */
+	private void initJdbcDataSource() {
+		DynamicRoutingDataSource dataSourceList = (DynamicRoutingDataSource) this.dataSource;
+		for (String datasourceName : dataSourceList.getDataSources().keySet()) {
+			ItemDataSource dataSource = (ItemDataSource) dataSourceList.getDataSources().get(datasourceName);
+			DruidDataSource druid = (DruidDataSource) dataSource.getRealDataSource();
+			String url = druid.getDataSourceStat().getUrl(); // jdbc connection url
+			String schema = DataBaseUtil.getLibname(url); // database name
+			String database = JdbcUtils.getDbType(url).getDb().toUpperCase(); // database type
+			String dbAccount = druid.getUsername(); // database username
+			String dbPassword = druid.getPassword(); // database password
+			String dbVersion = getDBVersion(dataSource);
+
+			DynamicDataSource dynDataSource = new DynamicDataSource();
+			dynDataSource.setDatasourceName(datasourceName);
+			dynDataSource.setDatabase(database);
+			dynDataSource.setDataSource(druid);
+			dynDataSource.setSchema(schema);
+			dynDataSource.setUrl(url);
+			dynDataSource.setDbAccount(dbAccount);
+			dynDataSource.setDbPassword(dbPassword);
+			dynDataSource.setDbVersion(dbVersion);
+			dataSourceMap.put(datasourceName, dynDataSource);
+		}
+	}
+
+	/***
+	 * Registers a hard-coded kafka "datasource". For testing only.
+	 */
+	public void initMQ_kafka() {
+		/* 1. producer configuration */
+		Properties props = new Properties();
+		/* 2. kafka cluster to connect to (broker-list) */
+		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx:9092");
+		/* 3. ack level */
+		props.put(ProducerConfig.ACKS_CONFIG, "all");
+		/* 4. retry count */
+		props.put(ProducerConfig.RETRIES_CONFIG, 3);
+		/* 5. batch size: the producer flushes to the cluster once a batch exceeds 16k */
+		props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
+		/* 6. linger: flush after 1 ms even if the batch is still under 16k */
+		props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
+		/* 7. RecordAccumulator buffer size */
+		props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
+		/* 8. key / value serializer classes */
+		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+		DynamicDataSource dynDataSource = new DynamicDataSource();
+		dynDataSource.setDatasourceName("kafka");
+		dynDataSource.setDatabase("MQ");
+		dynDataSource.setSchema(""); // no database name needed for MQ
+		dynDataSource.setDbVersion("2.8.1"); // TODO make this dynamic
+		dynDataSource.setClusterName("kafka");
+		dynDataSource.setProps(props);
+		dataSourceMap.put(dynDataSource.getDatasourceName(), dynDataSource);
+	}
+
+	/***
+	 * Reads the server version via "select version()".
+	 *
+	 * @return the version string, or null when the query fails
+	 */
+	public String getDBVersion(DataSource dataSource) {
+		// try-with-resources closes ResultSet/Statement/Connection even on failure,
+		// replacing the old manual finally block that silently swallowed close() errors
+		try (Connection connection = dataSource.getConnection();
+				Statement statement = connection.createStatement();
+				ResultSet resultSet = statement.executeQuery("select version() as version")) {
+			if (resultSet.next()) {
+				return resultSet.getString("version");
+			}
+		} catch (Exception e) {
+			// log with the cause attached instead of printStackTrace()
+			log.error("getDBVersion failed", e);
+		}
+		return null;
+	}
+}
diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/KafkaSimpleProducer.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/KafkaSimpleProducer.java
new file mode 100644
index 00000000..71c822fc
--- /dev/null
+++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/KafkaSimpleProducer.java
@@ -0,0 +1,34 @@
+package apijson.demo;
+
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class KafkaSimpleProducer {
+
+	/**
+	 * Synchronously sends one message to the given topic.
+	 *
+	 * @param datasource datasource name, used only for error reporting
+	 * @param props kafka producer configuration
+	 * @param topic target topic
+	 * @param message payload to publish
+	 * @return 1 when the send succeeded
+	 * @throws IllegalArgumentException when the send fails, with the original cause attached
+	 */
+	public static int sendMessage(String datasource, Properties props, String topic, Object message) {
+		// NOTE(review): creating a KafkaProducer per message is expensive;
+		// consider caching one producer per datasource if throughput matters.
+		// KafkaProducer is Closeable, so try-with-resources replaces the old manual finally.
+		try (KafkaProducer<String, Object> producer = new KafkaProducer<>(props)) {
+			Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, message));
+			RecordMetadata rMetadata = future.get(); // block on get() so the send is synchronous
+			log.info("rMetadata: {}", rMetadata.toString());
+			return 1;
+		} catch (InterruptedException e) {
+			Thread.currentThread().interrupt(); // restore the interrupt status
+			throw new IllegalArgumentException("动态数据源配置错误 " + datasource, e);
+		} catch (Exception e) {
+			// log with cause instead of printStackTrace(), and preserve it in the rethrow
+			log.error("kafka send failed, datasource = {}", datasource, e);
+			throw new IllegalArgumentException("动态数据源配置错误 " + datasource, e);
+		}
+	}
+}
diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/SpringContextUtils.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/SpringContextUtils.java
new file mode 100644
index 00000000..7a8e8f4f
--- /dev/null
+++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/SpringContextUtils.java
@@ -0,0 +1,60 @@
+package apijson.demo;
+
+import java.util.Map;
+
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+/**
+ * Static accessor around the Spring {@link ApplicationContext}, letting
+ * non-Spring-managed code look up beans by name or type.
+ *
+ * @author
+ */
+@Component
+public class SpringContextUtils implements ApplicationContextAware {
+
+	// assigned once by Spring while the context is being created
+	public static ApplicationContext applicationContext;
+
+	@Override
+	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+		SpringContextUtils.applicationContext = applicationContext;
+	}
+
+	/** Looks up a bean by its name. */
+	public static Object getBean(String beanName) {
+		return applicationContext.getBean(beanName);
+	}
+
+	/** Looks up a bean by name, checked against the required type. */
+	public static <T> T getBean(String beanName, Class<T> requiredType) {
+		return applicationContext.getBean(beanName, requiredType);
+	}
+
+	/**
+	 * Looks up a bean by its class.
+	 *
+	 * @param beanClass the bean type to resolve
+	 * @param           <T> resolved bean type
+	 * @return the single matching bean
+	 */
+	public static <T> T getBean(Class<T> beanClass) {
+		return applicationContext.getBean(beanClass);
+	}
+
+	/** @return whether a bean with this name is registered */
+	public static boolean containsBean(String beanName) {
+		return applicationContext.containsBean(beanName);
+	}
+
+	/** @return whether the named bean is a singleton */
+	public static boolean isSingleton(String beanName) {
+		return applicationContext.isSingleton(beanName);
+	}
+
+	/** @return the runtime type of the named bean */
+	public static Class<? extends Object> getType(String beanName) {
+		return applicationContext.getType(beanName);
+	}
+
+	/** @return all beans assignable to the given type, keyed by bean name */
+	public static <T> Map<String, T> getBeansOfType(Class<T> beanType) {
+		return applicationContext.getBeansOfType(beanType);
+	}
+
+}
diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/resources/application.yml b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/resources/application.yml
new file mode 100644
index 00000000..7d5a68ee
--- /dev/null
+++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/resources/application.yml
@@ -0,0 +1,35 @@
+spring:
+  datasource:
+    type: com.alibaba.druid.pool.DruidDataSource
+    dynamic:
+      primary: master
+      strict: true
+      druid:
+        initial-size: 5
+        min-idle: 5
+        maxActive: 2000
+        maxWait: 60000
+        timeBetweenEvictionRunsMillis: 60000
+        minEvictableIdleTimeMillis: 300000
+        validationQuery: SELECT 1 FROM DUAL
+        testWhileIdle: true
+        testOnBorrow: false
+        testOnReturn: false
+        poolPreparedStatements: true
+        maxPoolPreparedStatementPerConnectionSize: 20
+        filters: stat,slf4j
+        connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
+      datasource:
+        master:
+          driver-class-name: com.mysql.cj.jdbc.Driver
+          url: jdbc:mysql://localhost:3306/xxxx?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&allowMultiQueries=true&useSSL=false
+          username: 
+          password: 
+      filter:
+        stat:
+          log-slow-sql: true
+          slow-sql-millis: 1000
+          merge-sql: false
+        wall:
+          config:
+            multi-statement-allow: true
diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/test/java/apijson/demo/KafkaSimpleConsumer.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/test/java/apijson/demo/KafkaSimpleConsumer.java
new file mode 100644
index 00000000..c8ebc23d
--- /dev/null
+++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/test/java/apijson/demo/KafkaSimpleConsumer.java
@@ -0,0 +1,42 @@
+package apijson.demo;
+
+import java.util.Properties;
+import java.util.Arrays;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+public class KafkaSimpleConsumer {
+	/**
+	 * Minimal console consumer for manual testing: subscribes to Topic_User
+	 * and prints every record received. Runs until the process is killed.
+	 */
+	public static void main(String[] args) throws Exception {
+
+		// Kafka consumer configuration settings
+		String topicName = "Topic_User";
+		Properties props = new Properties();
+
+		props.put("bootstrap.servers", "xxx:9092");
+		props.put("group.id", "test");
+		props.put("enable.auto.commit", "true");
+		props.put("auto.commit.interval.ms", "1000");
+		props.put("session.timeout.ms", "30000");
+		props.put("key.deserializer", StringDeserializer.class.getName());
+		props.put("value.deserializer", StringDeserializer.class.getName());
+		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
+
+		// Kafka Consumer subscribes list of topics here.
+		consumer.subscribe(Arrays.asList(topicName));
+
+		// print the topic name
+		System.out.println("Subscribed to topic " + topicName);
+
+		while (true) {
+			// poll(long) is deprecated since Kafka 2.0; the Duration overload
+			// keeps the same 10 ms timeout
+			ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(10));
+			for (ConsumerRecord<String, String> record : records) {
+				// print the offset, key and value for each consumer record
+				System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
+			}
+		}
+	}
+}

From 33db3e4606ed7fe93a9efd151ebe9b7ab308122a Mon Sep 17 00:00:00 2001
From: cloudAndMonkey <xieyun.xie@gmail.com>
Date: Fri, 6 Jan 2023 16:05:04 +0800
Subject: [PATCH 2/4] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=8F=8F=E8=BF=B0?=
 =?UTF-8?q?=E4=BF=A1=E6=81=AF?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../README.md                                 | 35 ++++++++++++++++++-
 1 file changed, 34 insertions(+), 1 deletion(-)

diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/README.md b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/README.md
index 2214e962..939e9429 100644
--- a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/README.md
+++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/README.md
@@ -9,5 +9,38 @@
 Access表名 = 消息队列 topic
 
 Access表配置说明:
+![image](https://user-images.githubusercontent.com/12228225/210956299-204115a7-433c-4f18-af27-5120068dab2e.png)
+Request表配置post权限
+![image](https://user-images.githubusercontent.com/12228225/210956378-be095589-0ced-4317-bb46-6b296538f26e.png)
 
-![image-20230106155124881](/Users/xy/Library/Application Support/typora-user-images/image-20230106155124881.png)
+apijson发送mq消息:
+单条
+{
+	"@datasource": "kafka",
+    "Topic_User":{
+        "message":"test-101"
+    },
+    "tag": "Topic_User",
+    "@explain": false
+}
+多条
+{
+    "Topic_User[]": [
+        {
+           "message":"test-100"
+        },
+        {
+            "message":"test-101"
+        }
+    ],
+    "tag": "Topic_User[]",
+    "@datasource": "kafka",
+    "@explain": true
+}
+
+客户端接收消息:
+offset = 47, key = null, value = test-101
+offset = 48, key = null, value = test-100
+offset = 49, key = null, value = test-101
+
+用java代码方式,获取具体数据源,调用即可

From 3b086daa6d660d17fc6793d648b11a15041849aa Mon Sep 17 00:00:00 2001
From: cloudAndMonkey <xieyun.xie@gmail.com>
Date: Fri, 6 Jan 2023 16:05:53 +0800
Subject: [PATCH 3/4] Update README.md

---
 APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/README.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/README.md b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/README.md
index 939e9429..a003e95e 100644
--- a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/README.md
+++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/README.md
@@ -39,8 +39,10 @@ apijson发送mq消息:
 }
 
 客户端接收消息:
+
 offset = 47, key = null, value = test-101
 offset = 48, key = null, value = test-100
 offset = 49, key = null, value = test-101
 
+
 用java代码方式,获取具体数据源,调用即可

From 983ad107a392001b06f272f43f18ba5a7ddb287a Mon Sep 17 00:00:00 2001
From: cloudAndMonkey <xieyun.xie@gmail.com>
Date: Fri, 6 Jan 2023 16:09:08 +0800
Subject: [PATCH 4/4] Update README.md

---
 .../APIJSONDemo-MultiDataSource-Kafka/README.md    | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/README.md b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/README.md
index a003e95e..82218340 100644
--- a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/README.md
+++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/README.md
@@ -14,16 +14,16 @@ Request表配置post权限
 ![image](https://user-images.githubusercontent.com/12228225/210956378-be095589-0ced-4317-bb46-6b296538f26e.png)
 
 apijson发送mq消息:
-单条
+单条<br/>
 {
-	"@datasource": "kafka",
+    "@datasource": "kafka",
     "Topic_User":{
         "message":"test-101"
     },
     "tag": "Topic_User",
     "@explain": false
-}
-多条
+}<br/>
+多条<br/>
 {
     "Topic_User[]": [
         {
@@ -40,9 +40,9 @@ apijson发送mq消息:
 
 客户端接收消息:
 
-offset = 47, key = null, value = test-101
-offset = 48, key = null, value = test-100
-offset = 49, key = null, value = test-101
+offset = 47, key = null, value = test-101<br/>
+offset = 48, key = null, value = test-100<br/>
+offset = 49, key = null, value = test-101<br/>
 
 
 用java代码方式,获取具体数据源,调用即可