Skip to content

Commit 38fc237

Browse files
committed
Add tests for out_rdkafka2
Signed-off-by: Takuro Ashie <[email protected]>
1 parent 57eed8c commit 38fc237

File tree

2 files changed

+108
-0
lines changed

2 files changed

+108
-0
lines changed

fluent-plugin-kafka.gemspec

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ Gem::Specification.new do |gem|
1919
gem.add_dependency "fluentd", [">= 0.10.58", "< 2"]
2020
gem.add_dependency 'ltsv'
2121
gem.add_dependency 'ruby-kafka', '>= 1.4.0', '< 2'
22+
gem.add_development_dependency "rdkafka", ">= 0.10.0"
2223
gem.add_development_dependency "rake", ">= 0.9.2"
2324
gem.add_development_dependency "test-unit", ">= 3.0.8"
2425
gem.add_development_dependency "test-unit-rr", "~> 1.0"

test/plugin/test_out_rdkafka2.rb

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
require 'helper'
2+
require 'fluent/test/helpers'
3+
require 'fluent/test/driver/input'
4+
require 'fluent/test/driver/output'
5+
require 'securerandom'
6+
require 'fluent/plugin/out_rdkafka2'
7+
8+
class Rdkafka2OutputTest < Test::Unit::TestCase
9+
include Fluent::Test::Helpers
10+
11+
def setup
12+
Fluent::Test.setup
13+
end
14+
15+
def base_config
16+
config_element('ROOT', '', {"@type" => "rdkafka2"}, [
17+
config_element('format', "", {"@type" => "json"})
18+
])
19+
end
20+
21+
def config(default_topic: "kitagawakeiko")
22+
base_config + config_element('ROOT', '', {"default_topic" => default_topic,
23+
"brokers" => "localhost:9092"}, [
24+
])
25+
end
26+
27+
def create_driver(conf = config, tag='test')
28+
Fluent::Test::Driver::Output.new(Fluent::Rdkafka2Output).configure(conf)
29+
end
30+
31+
def test_configure
32+
assert_nothing_raised(Fluent::ConfigError) {
33+
create_driver(base_config)
34+
}
35+
36+
assert_nothing_raised(Fluent::ConfigError) {
37+
create_driver(config)
38+
}
39+
40+
assert_nothing_raised(Fluent::ConfigError) {
41+
create_driver(config + config_element('buffer', "", {"@type" => "memory"}))
42+
}
43+
44+
d = create_driver
45+
assert_equal 'kitagawakeiko', d.instance.default_topic
46+
assert_equal 'localhost:9092', d.instance.brokers
47+
end
48+
49+
def test_mutli_worker_support
50+
d = create_driver
51+
assert_equal true, d.instance.multi_workers_ready?
52+
end
53+
54+
class WriteTest < self
55+
TOPIC_NAME = "kafka-output-#{SecureRandom.uuid}"
56+
57+
INPUT_CONFIG = %[
58+
@type kafka
59+
brokers localhost:9092
60+
format json
61+
@label @kafka
62+
topics #{TOPIC_NAME}
63+
]
64+
65+
def create_target_driver(conf = INPUT_CONFIG)
66+
Fluent::Test::Driver::Input.new(Fluent::KafkaInput).configure(conf)
67+
end
68+
69+
def setup
70+
@kafka = Kafka.new(["localhost:9092"], client_id: 'kafka')
71+
end
72+
73+
def teardown
74+
@kafka.delete_topic(TOPIC_NAME)
75+
@kafka.close
76+
end
77+
78+
def test_write
79+
target_driver = create_target_driver
80+
expected_message = {"a" => 2}
81+
target_driver.run(expect_records: 1, timeout: 5) do
82+
sleep 2
83+
d = create_driver(config(default_topic: TOPIC_NAME))
84+
d.run do
85+
d.feed("test", event_time, expected_message)
86+
end
87+
end
88+
actual_messages = target_driver.events.collect { |event| event[2] }
89+
assert_equal([expected_message], actual_messages)
90+
end
91+
92+
def test_exclude_fields
93+
conf = config(default_topic: TOPIC_NAME) +
94+
config_element('ROOT', '', {"exclude_fields" => "$.foo"}, [])
95+
target_driver = create_target_driver
96+
target_driver.run(expect_records: 1, timeout: 5) do
97+
sleep 2
98+
d = create_driver(conf)
99+
d.run do
100+
d.feed('test', event_time, {'a' => 'b', 'foo' => 'bar', 'message' => 'test'})
101+
end
102+
end
103+
actual_messages = target_driver.events.collect { |event| event[2] }
104+
assert_equal([{'a' => 'b', 'message' => 'test'}], actual_messages)
105+
end
106+
end
107+
end

0 commit comments

Comments
 (0)