-
Notifications
You must be signed in to change notification settings - Fork 179
/
Copy pathtest_out_rdkafka2.rb
120 lines (102 loc) · 3.21 KB
/
test_out_rdkafka2.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
require 'helper'
require 'fluent/test/helpers'
require 'fluent/test/driver/input'
require 'fluent/test/driver/output'
require 'securerandom'
# Tests for the rdkafka2 output plugin (Fluent::Rdkafka2Output).
#
# Every test is omitted when 'fluent/plugin/out_rdkafka2' cannot be loaded.
# The nested WriteTest cases talk to a Kafka broker at localhost:9092.
class Rdkafka2OutputTest < Test::Unit::TestCase
  include Fluent::Test::Helpers

  # Lazily loads the plugin under test.
  #
  # @return [Boolean] true when the plugin could be required, false when the
  #   require raised LoadError.
  def have_rdkafka
    begin
      require 'fluent/plugin/out_rdkafka2'
      true
    rescue LoadError
      false
    end
  end

  # Omits the test when the plugin is unavailable, then runs Fluent::Test.setup.
  def setup
    omit_unless(have_rdkafka, "rdkafka isn't installed")
    Fluent::Test.setup
  end

  # Minimal configuration: the rdkafka2 output with a JSON <format> section.
  def base_config
    config_element('ROOT', '', {"@type" => "rdkafka2"}, [
      config_element('format', "", {"@type" => "json"})
    ])
  end

  # base_config extended with a default topic and the local broker address.
  #
  # @param default_topic [String] value for the plugin's default_topic option
  def config(default_topic: "kitagawakeiko")
    base_config + config_element('ROOT', '', {"default_topic" => default_topic,
                                              "brokers" => "localhost:9092"}, [])
  end

  # Builds an output test driver for Fluent::Rdkafka2Output configured with +conf+.
  #
  # @param conf the configuration to apply (defaults to #config)
  # @param tag unused by this helper; kept so existing call sites keep working
  def create_driver(conf = config, tag='test')
    Fluent::Test::Driver::Output.new(Fluent::Rdkafka2Output).configure(conf)
  end

  # The plugin must accept the minimal config, the full config, and the full
  # config with a memory buffer, and expose the configured values.
  def test_configure
    assert_nothing_raised(Fluent::ConfigError) do
      create_driver(base_config)
    end

    assert_nothing_raised(Fluent::ConfigError) do
      create_driver(config)
    end

    assert_nothing_raised(Fluent::ConfigError) do
      create_driver(config + config_element('buffer', "", {"@type" => "memory"}))
    end

    d = create_driver
    assert_equal 'kitagawakeiko', d.instance.default_topic
    assert_equal 'localhost:9092', d.instance.brokers
  end

  # NOTE(review): "mutli" is a typo for "multi"; the method name is left
  # unchanged so name-based test filters keep matching.
  def test_mutli_worker_support
    d = create_driver
    assert_equal true, d.instance.multi_workers_ready?
  end

  # Round-trip tests: events fed to the rdkafka2 output are read back through
  # a Fluent::KafkaInput driver subscribed to a per-run topic.
  class WriteTest < self
    # Unique topic per test process, removed again in #teardown.
    TOPIC_NAME = "kafka-output-#{SecureRandom.uuid}"

    # Configuration for the Kafka input driver that consumes TOPIC_NAME.
    # The lines are kept flush-left so the literal's contents are unchanged.
    INPUT_CONFIG = %[
@type kafka
brokers localhost:9092
format json
@label @kafka
topics #{TOPIC_NAME}
]

    # Builds the input test driver used to read back what the output wrote.
    def create_target_driver(conf = INPUT_CONFIG)
      Fluent::Test::Driver::Input.new(Fluent::KafkaInput).configure(conf)
    end

    # Runs the parent setup (omission guard + Fluent::Test.setup) and then
    # opens a Kafka client used for topic cleanup.
    def setup
      # Reset first: if `super` omits the test, teardown must see no client.
      @kafka = nil
      super
      @kafka = Kafka.new(["localhost:9092"], client_id: 'kafka')
    end

    # Deletes the test topic and closes the client, if one was created.
    def teardown
      return unless @kafka

      begin
        @kafka.delete_topic(TOPIC_NAME)
      ensure
        # Close the client even when the topic deletion raises.
        @kafka.close
      end
    end

    # A single fed record must arrive unchanged on the input side.
    def test_write
      target_driver = create_target_driver
      expected_message = {"a" => 2}

      target_driver.run(expect_records: 1, timeout: 5) do
        # presumably gives the consumer time to start before producing — TODO confirm
        sleep 2
        d = create_driver(config(default_topic: TOPIC_NAME))
        d.run do
          d.feed("test", event_time, expected_message)
        end
      end

      # Index 2 of each driver event is compared against the fed record.
      actual_messages = target_driver.events.map { |event| event[2] }
      assert_equal([expected_message], actual_messages)
    end

    # With exclude_fields "$.foo", the 'foo' key must be absent on the input side.
    def test_exclude_fields
      conf = config(default_topic: TOPIC_NAME) +
             config_element('ROOT', '', {"exclude_fields" => "$.foo"}, [])
      target_driver = create_target_driver

      target_driver.run(expect_records: 1, timeout: 5) do
        # presumably gives the consumer time to start before producing — TODO confirm
        sleep 2
        d = create_driver(conf)
        d.run do
          d.feed('test', event_time, {'a' => 'b', 'foo' => 'bar', 'message' => 'test'})
        end
      end

      actual_messages = target_driver.events.map { |event| event[2] }
      assert_equal([{'a' => 'b', 'message' => 'test'}], actual_messages)
    end
  end
end