-
Notifications
You must be signed in to change notification settings - Fork 339
/
Copy pathbroker_spec.rb
97 lines (77 loc) · 2.4 KB
/
broker_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
require "kafka/protocol/message"
describe Kafka::Broker do
# Shared collaborators for the examples in this group.
# LOGGER is not defined in this file -- presumably supplied by the spec helper.
let(:logger) { LOGGER }

# Every example talks to a FakeConnection unless it builds its own double.
let(:connection) { FakeConnection.new }

# The broker under test, wired to the connection and logger above.
let(:broker) do
  described_class.new(connection: connection, logger: logger)
end
# Stand-in for a broker connection used by these specs. It performs no I/O:
# #send_request simply hands back whatever was registered via #mock_response.
class FakeConnection
  def initialize
    # No canned response until an example registers one.
    @mocked_response = nil
  end

  # Registers the object that subsequent #send_request calls will return.
  # Returns the registered response.
  def mock_response(response)
    @mocked_response = response
  end

  # The request itself is ignored; the canned response (or nil when none
  # has been registered) is returned as-is.
  def send_request(_request)
    @mocked_response
  end
end
describe "#address_match?" do
  it "delegates to @connection" do
    host = "test_host"
    port = 333

    # A verifying double: fails if Kafka::Connection does not implement
    # #address_match? with a compatible signature.
    delegate_connection = instance_double(Kafka::Connection)

    # A message expectation (rather than a plain `allow` stub) makes the
    # example fail when the broker never forwards the call, which is the
    # behavior the example's description promises.
    expect(delegate_connection).to receive(:address_match?).with(host, port).and_return(true)

    # Built locally instead of using the shared `broker` helper because this
    # example needs the verifying double, not the FakeConnection.
    delegating_broker = Kafka::Broker.new(connection: delegate_connection, logger: logger)

    expect(delegating_broker.address_match?(host, port)).to be(true)
  end
end
# Named after the method actually exercised (Broker#fetch_metadata), matching
# how the other example groups in this file are labelled.
describe "#fetch_metadata" do
  it "fetches cluster metadata" do
    # The fake connection returns this object for any request it receives.
    response = Kafka::Protocol::MetadataResponse.new(brokers: [], controller_id: nil, topics: [])
    connection.mock_response(response)

    metadata = broker.fetch_metadata(topics: [])

    expect(metadata).to eq response
  end
end
describe "#produce" do
  let(:message) { Kafka::Protocol::Message.new(key: "yo", value: "lo") }

  # Both examples send the same payload: one message for partition 3 of
  # the "yolos" topic.
  let(:messages_for_topics) do
    { "yolos" => { 3 => [message] } }
  end

  it "waits for a response if acknowledgements are required" do
    expected_response = Kafka::Protocol::ProduceResponse.new
    connection.mock_response(expected_response)

    produced = broker.produce(
      required_acks: -1, # -1 means all replicas must ack
      timeout: 1,
      messages_for_topics: messages_for_topics
    )

    expect(produced).to eq expected_response
  end

  it "doesn't wait for a response if zero acknowledgements are required" do
    # No response is mocked in this example, so FakeConnection#send_request
    # returns nil.
    produced = broker.produce(
      required_acks: 0, # 0 means the server doesn't respond or ack at all
      timeout: 1,
      messages_for_topics: messages_for_topics
    )

    expect(produced).to be_nil
  end
end
describe "#fetch_messages" do
  it "fetches messages from the specified topic/partition" do
    # Mock the response type that belongs to a fetch request. The example
    # previously mocked a ProduceResponse, which only worked because that
    # class also exposes #topics.
    response = Kafka::Protocol::FetchResponse.new
    connection.mock_response(response)

    actual_response = broker.fetch_messages(
      max_wait_time: 0,
      min_bytes: 0,
      max_bytes: 10 * 1024,
      topics: {}
    )

    # A freshly built response is expected to carry no topics.
    expect(actual_response.topics).to eq []
  end
end
end