Skip to content

Commit 81a0d7d

Browse files
pranav-patilPranav Patil
authored and
Pranav Patil
committed
Adding Kafka message broker and Zipkin stream service for kafka
1 parent 2af5892 commit 81a0d7d

File tree

10 files changed

+274
-7
lines changed

10 files changed

+274
-7
lines changed

analytics-service/build.gradle

+3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ dependencies {
4040
compile('org.springframework.cloud:spring-cloud-starter-netflix-eureka-client')
4141
compile('org.springframework.cloud:spring-cloud-starter-netflix-hystrix')
4242
compile('org.springframework.cloud:spring-cloud-starter-config')
43+
compile('org.springframework.cloud:spring-cloud-starter-zipkin')
44+
compile('org.springframework.cloud:spring-cloud-sleuth-zipkin')
45+
compile('org.springframework.kafka:spring-kafka')
4346
compile('org.springframework.security.oauth:spring-security-oauth2')
4447
compile('com.google.code.gson:gson:2.8.4')
4548
runtime('com.h2database:h2')

config-service/src/main/resources/shared/analytics-service.yml

+13
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
spring:
2+
zipkin:
3+
baseUrl: http://localhost:9411/
4+
sender:
5+
type: kafka
6+
sleuth:
7+
sampler:
8+
probability: 1.0
19

210
server:
311
port: 8309
@@ -41,3 +49,8 @@ management:
4149
web:
4250
exposure:
4351
include: hystrix.stream,info,health
52+
53+
sample:
54+
zipkin:
55+
# When enabled=false, traces log to the console. When enabled=true traces send to zipkin
56+
enabled: true

config-service/src/main/resources/shared/finance-service.yml

+12-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
spring:
2-
cloud:
3-
refresh:
4-
enabled: false
2+
zipkin:
3+
baseUrl: http://localhost:9411/
4+
sender:
5+
type: kafka
6+
sleuth:
7+
sampler:
8+
probability: 1.0
59

610
server:
711
port: 8301
@@ -47,3 +51,8 @@ management:
4751
web:
4852
exposure:
4953
include: hystrix.stream,info,health
54+
55+
sample:
56+
zipkin:
57+
# When enabled=false, traces log to the console. When enabled=true traces send to zipkin
58+
enabled: true

finance-service/build.gradle

+3-1
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,16 @@ ext {
3030

3131
dependencies {
3232
compile('org.springframework.boot:spring-boot-starter-actuator')
33-
compile('org.springframework.boot:spring-boot-starter-data-jpa')
3433
compile('org.springframework.boot:spring-boot-starter-data-rest')
3534
compile('org.springframework.boot:spring-boot-starter-security')
3635
compile('org.springframework.cloud:spring-cloud-starter-config')
3736
compile('org.springframework.cloud:spring-cloud-starter-oauth2')
3837
compile('org.springframework.cloud:spring-cloud-starter-openfeign')
3938
compile('org.springframework.cloud:spring-cloud-starter-netflix-eureka-client')
4039
compile('org.springframework.cloud:spring-cloud-starter-netflix-hystrix')
40+
compile('org.springframework.cloud:spring-cloud-starter-zipkin')
41+
compile('org.springframework.cloud:spring-cloud-sleuth-zipkin')
42+
compile('org.springframework.kafka:spring-kafka')
4143
compile('org.springframework.security.oauth:spring-security-oauth2')
4244
compile('com.google.code.gson:gson:2.8.4')
4345
runtime('com.h2database:h2')

finance-service/src/main/java/com/emprovise/service/api/StockResource.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.emprovise.service.api;
22

3+
import brave.Tracer;
34
import com.emprovise.service.config.SSLUtil;
45
import com.emprovise.service.dto.StockDetailDTO;
56
import com.emprovise.service.mapper.StockDetailDTOMapper;
@@ -23,6 +24,8 @@ public class StockResource {
2324
private StockDetailDTOMapper stockDTOMapper;
2425
@Value("${alphavantage.apikey}")
2526
private String apiKey;
27+
@Autowired
28+
private Tracer tracer;
2629

2730
private static final String ALPHA_VANTAGE_URL = "https://www.alphavantage.co/query";
2831
private static final String TIME_SERIES_INTRADAY = "TIME_SERIES_INTRADAY";
@@ -51,7 +54,7 @@ public StockDetailDTO getStockDetailsFallback(String symbol, String interval) th
5154

5255
@GetMapping("/greeting")
5356
public String greeting() {
57+
tracer.currentSpan().tag("service.greeting", "hello_world");
5458
return "Hello World";
5559
}
56-
5760
}

kafka-broker/README.md

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
Kafka Message Broker
2+
=============
3+
4+
Apache Kafka is a fast and scalable messaging queue, capable of handling heavy read and write operations.
5+
Apache Kafka requires a running Zookeeper instance, which is used for reliable distributed co-ordination.
6+
Apache ZooKeeper is a distributed, open-source coordination service for distributed applications.
7+
ZooKeeper enables kafka to manage multiple clusters of kafka instances.
8+
9+
### Installation and Running of Apache Zookeeper
10+
11+
* Download latest [Apache Zookeeper release](http://zookeeper.apache.org/releases.html#download) and extract the .tar.gz file.
12+
* Copy and Rename **zoo_sample.cfg** to **zoo.cfg** file in ZOOKEEPER_HOME\conf directory, were ZOOKEEPER_HOME is the path of extracted zookeeper directory.
13+
* Find & edit the line in zoo.cfg file from "dataDir=/tmp/zookeeper" to "dataDir=ZOOKEEPER_HOME\data"
14+
* Go to ZOOKEEPER_HOME\bin directory and execute the command "zkserver" to run zookeeper.
15+
* Apache Zookeeper runs on default port 2181.
16+
17+
All the above steps can be executed directly by running below runZooKeeper gradle task for windows.
18+
19+
$ gradle runZooKeeper
20+
21+
### Installation and Running of Apache Kafka message broker
22+
23+
Before running kafka ensure that Apache Zookeeper instance is already running on default port 2181.
24+
25+
* Download latest stable [Apache Kafka release](https://kafka.apache.org/downloads) and extract the .tgz file.
26+
* Go to config directory in Apache Kafka in path KAFKA_HOME\config were KAFKA_HOME is the path of extracted kafka directory.
27+
* Find "log.dirs" in **server.properties** in config directory, and replace line line "log.dirs=/tmp/kafka-logs" to "log.dir= KAFKA_HOME\kafka-logs".
28+
* If Zookeeper is running on different machine or cluster, edit "zookeeper.connect:2181" to the custom IP and port.
29+
* Go to KAFKA_HOME directory and execute the below windows command to run kafka.
30+
* Apache Kafka runs on default port 9092.
31+
32+
33+
.\bin\windows\kafka-server-start.bat .\config\server.properties
34+
35+
All the above steps can be executed directly by running below runKafka gradle task for windows.
36+
37+
$ gradle runKafka
38+
39+
### Creating topics
40+
41+
Kafka topics can be created using the below windows command. Although **zipin** should already exist by default.
42+
43+
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic zipin
44+
45+
Alternatively **zipin** topic can be created using below createZipkinTopic gradle task.
46+
47+
$ gradle createZipkinTopic
48+
49+
### Consuming topics
50+
51+
To produce messages to Kafka topic use the below windows command.
52+
53+
kafka-console-producer.bat --broker-list localhost:9092 --topic zipin
54+
55+
To consume the messages in Kafka topic use the below windows command.
56+
57+
kafka-console-consumer.bat --zookeeper localhost:2181 --topic zipin
58+
59+
Alternatively messages from **zipin** topic can viewed by running consumeZipkinTopic gradle task.
60+
61+
$ gradle consumeZipkinTopic
62+
63+
### Notes
64+
65+
* Microsoft Windows XP or later have the maximum length of command to 8191 characters. If it exceeds we get an error **The input line is too long. The syntax of the command is incorrect.**.
66+
Apache Kafka command has many classpath dependencies hence, running this in a nested directory could cause above error. To avoid this please run Apache Kafka in a directory within c: root seperately or clone this project in c: root itself.
67+
* On running the bat commands for zookeeper and kafka we might get unrelated errors like **\IBM\WebSphere was unexpected at this time.**. Not sure about the reason but this can be avoided by removing corresponding path from the environment variables.

kafka-broker/build.gradle

+95
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
2+
description = "Kafka Message Broker"
3+
group = 'com.emprovise.service'
4+
version = '0.0.1-SNAPSHOT'
5+
6+
buildscript {
7+
ext {
8+
zookeeperVersion = '3.4.12'
9+
scalaVersion = '2.11'
10+
kafkaVersion = '1.1.0'
11+
}
12+
repositories {
13+
jcenter()
14+
}
15+
dependencies {
16+
classpath("de.undercouch:gradle-download-task:3.4.3")
17+
}
18+
}
19+
20+
apply plugin: 'de.undercouch.download'
21+
def pullCommonDir = new File(buildDir, 'repo')
22+
pullCommonDir.mkdirs()
23+
24+
task downloadFiles(type: Download) {
25+
src([
26+
"http://apache.mirrors.ionfish.org/zookeeper/zookeeper-${zookeeperVersion}/zookeeper-${zookeeperVersion}.tar.gz",
27+
"http://apache.mirrors.ionfish.org/kafka/${kafkaVersion}/kafka_${scalaVersion}-${kafkaVersion}.tgz"
28+
])
29+
30+
dest "${buildDir}/repo"
31+
overwrite false
32+
}
33+
34+
task extractArchives(dependsOn: downloadFiles) {
35+
doLast {
36+
copy {
37+
into new File(buildDir, "libs")
38+
pullCommonDir.eachFileRecurse {
39+
if (it.path.endsWith(".tar.gz") || it.path.endsWith(".tgz")) {
40+
from tarTree(it)
41+
}
42+
}
43+
}
44+
}
45+
}
46+
47+
task copyAndRename(dependsOn: extractArchives) {
48+
doLast {
49+
if (!file("${buildDir}/libs/zookeeper-${zookeeperVersion}/conf/zoo.cfg").exists()) {
50+
println "copying zoo.cfg file..."
51+
def buildDirPath = "${buildDir}".replace("\\", "/")
52+
53+
copy {
54+
from "${buildDir}/libs/zookeeper-${zookeeperVersion}/conf/zoo_sample.cfg"
55+
into "${buildDir}/libs/zookeeper-${zookeeperVersion}/conf"
56+
filter { line -> line.replaceAll("dataDir=/tmp/zookeeper", "dataDir=${buildDirPath}/libs/zookeeper-${zookeeperVersion}/data") }
57+
rename { String fileName ->
58+
fileName.replace("zoo_sample.cfg", "zoo.cfg")
59+
}
60+
}
61+
}
62+
63+
def kafkaConfigFile = new File("${buildDir}/libs/kafka_${scalaVersion}-${kafkaVersion}/config/server.properties")
64+
65+
if (kafkaConfigFile.exists()) {
66+
println "modifying kafka config file..."
67+
def fileText = kafkaConfigFile.text
68+
kafkaConfigFile.withWriter { writer ->
69+
writer.print fileText.replace("log.dirs=/tmp/kafka-logs", "log.dirs=${buildDir}\\libs\\kafka_${scalaVersion}-${kafkaVersion}\\kafka-logs")
70+
}
71+
}
72+
}
73+
}
74+
75+
def KAFKA_HOME = "${buildDir}\\libs\\kafka_${scalaVersion}-${kafkaVersion}"
76+
77+
task runZooKeeper(type: Exec, dependsOn: copyAndRename) {
78+
workingDir "${buildDir}/libs/zookeeper-${zookeeperVersion}/bin"
79+
commandLine 'cmd', '/c', 'zkServer'
80+
}
81+
82+
task runKafka(type: Exec, dependsOn: copyAndRename) {
83+
workingDir "${KAFKA_HOME}\\bin\\windows"
84+
commandLine 'cmd', '/c', "kafka-server-start.bat ..\\..\\config\\server.properties"
85+
}
86+
87+
task createZipkinTopic(type: Exec) {
88+
workingDir "${KAFKA_HOME}\\bin\\windows"
89+
commandLine 'cmd', '/c', "kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic zipkin"
90+
}
91+
92+
task consumeZipkinTopic(type: Exec) {
93+
workingDir "${KAFKA_HOME}\\bin\\windows"
94+
commandLine 'cmd', '/c', "kafka-console-consumer.bat --zookeeper localhost:2181 --topic zipkin"
95+
}

settings.gradle

+3-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ include "config-service"
66
include "discovery-service"
77
include "finance-service"
88
include "gateway-service"
9-
include "sqldb-service"
9+
include "kafka-broker"
1010
include "monitor-service"
11-
11+
include "sqldb-service"
12+
include "zipkin-service"

zipkin-service/README.md

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
Zipkin Service
2+
=============
3+
4+
[OpenZipkin](https://github.com/openzipkin/zipkin) is a distributed tracing system which enables to trace requests spanning across multiple micro services.
5+
Distributed tracing is a process of collecting end-to-end transaction graphs in real time.
6+
Zipkin collects the request data in form of traces and spans. A trace represents entire journey of a request, while a span represents single operation call.
7+
The first service called has traceId and spanId, while the subsequent service call have additional parentId including traceId and spanId, which essentially is spanId of the caller service.
8+
9+
[Spring Cloud Sleuth](http://cloud.spring.io/spring-cloud-sleuth/single/spring-cloud-sleuth.html) is a tracer which adds unique trace and span ids to Slf4J MDC.
10+
It has an instrumentation or sampling policy represented by application property spring.sleuth.sampler.probability.
11+
12+
### Running the Zipkin default application (using http for trace requests)
13+
14+
In order to run the default [zipkin-server](https://github.com/openzipkin/zipkin/tree/master/zipkin-server) use the gradle runZipkin task
15+
16+
$ gradle runZipkin
17+
18+
The runZipkin downloads the [zipkin-server-executable jar](https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec) from maven repository.
19+
It then executes the jar using default java command. Instead of using the above gradle task, you can directly execute the below command.
20+
21+
$ java -jar zipkin-server-2.10.1-exec.jar
22+
23+
### Running the Zipkin Kafka application (using kafka message broker for trace messages)
24+
25+
Before running zipkin with kafka message broker, go to kafka-broker service to run kafka message broker.
26+
By default the kafka message broker executes on port 9092. Then use the below runZipkinKafka gradle task to run zipkin stream server configured with default kafka message broker.
27+
28+
$ gradle runZipkinKafka
29+
30+
Instead of using the above gradle task, you can directly execute the below java command with the additional kafka config argument [zipkin.collector.kafka.bootstrap-servers](https://github.com/openzipkin/zipkin/tree/master/zipkin-autoconfigure/collector-kafka).
31+
32+
$ java -jar zipkin-server-2.10.1-exec.jar -Dzipkin.collector.kafka.bootstrap-servers=127.0.0.1:9092
33+
34+
### Accessing Zipin Server
35+
36+
To view the zipin traces for micro services using zipin's brave UI, access the server with default port 9411.
37+
38+
http://localhost:9411/

zipkin-service/build.gradle

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
2+
description = "Zipkin Stream Server"
3+
group = 'com.emprovise.service'
4+
version = '0.0.1-SNAPSHOT'
5+
sourceCompatibility = 1.8
6+
7+
buildscript {
8+
ext {
9+
zipkinVersion = '2.10.1'
10+
}
11+
repositories {
12+
jcenter()
13+
}
14+
dependencies {
15+
classpath("de.undercouch:gradle-download-task:3.4.3")
16+
}
17+
}
18+
19+
apply plugin: 'de.undercouch.download'
20+
21+
task downloadFile(type: Download) {
22+
src "https://search.maven.org/remotecontent?filepath=io/zipkin/java/zipkin-server/$zipkinVersion/zipkin-server-$zipkinVersion-exec.jar"
23+
dest "$buildDir/runtime/libs/zipkin-server-$zipkinVersion-exec.jar"
24+
overwrite false
25+
}
26+
27+
task runZipkin(type: JavaExec, dependsOn: downloadFile) {
28+
main = "-jar"
29+
args = [ "$buildDir/runtime/libs/zipkin-server-$zipkinVersion-exec.jar" ]
30+
}
31+
32+
task runZipkinKafka(type: JavaExec, dependsOn: downloadFile) {
33+
main = "-jar"
34+
jvmArgs = [ "-Dzipkin.collector.kafka.bootstrap-servers=127.0.0.1:9092" ]
35+
args = [ "$buildDir/runtime/libs/zipkin-server-$zipkinVersion-exec.jar" ]
36+
}

0 commit comments

Comments
 (0)