|
117 | 117 | topic_a = generate_topic_name
|
118 | 118 | topic_b = generate_topic_name
|
119 | 119 |
|
120 |
| - messages_a = (1..500).to_a |
121 |
| - messages_b = (501..1000).to_a |
| 120 | + messages_a = (1..5).to_a |
| 121 | + messages_b = (6..10).to_a |
122 | 122 | messages = messages_a + messages_b
|
123 | 123 |
|
124 |
| - begin |
125 |
| - kafka = Kafka.new(kafka_brokers, client_id: "test") |
126 |
| - producer = kafka.producer |
| 124 | + producer = Kafka.new(kafka_brokers, client_id: "test").producer |
127 | 125 |
|
128 |
| - messages_a.each do |i| |
129 |
| - producer.produce(i.to_s, topic: topic_a) |
130 |
| - end |
131 |
| - |
132 |
| - producer.deliver_messages |
133 |
| - end |
| 126 | + messages_a.each { |i| producer.produce(i.to_s, topic: topic_a) } |
| 127 | + producer.deliver_messages |
134 | 128 |
|
135 | 129 | group_id = "test#{rand(1000)}"
|
136 | 130 |
|
137 |
| - mutex = Mutex.new |
138 | 131 | received_messages = []
|
139 | 132 |
|
140 |
| - consumers = 2.times.map do |
141 |
| - kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger) |
142 |
| - consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time) |
143 |
| - consumer.subscribe(/#{topic_a}|#{topic_b}/, refresh_topic_interval: 2) |
144 |
| - consumer |
145 |
| - end |
| 133 | + kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger) |
| 134 | + consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time) |
| 135 | + consumer.subscribe(/#{topic_a}|#{topic_b}/, refresh_topic_interval: 2) |
146 | 136 |
|
147 |
| - threads = consumers.map do |consumer| |
148 |
| - t = Thread.new do |
149 |
| - consumer.each_message do |message| |
150 |
| - mutex.synchronize do |
151 |
| - received_messages << message |
| 137 | + thread = Thread.new do |
| 138 | + consumer.each_message do |message| |
| 139 | + received_messages << message |
152 | 140 |
|
153 |
| - if received_messages.count == messages.count |
154 |
| - consumers.each(&:stop) |
155 |
| - end |
156 |
| - end |
| 141 | + if received_messages.count == messages.count |
| 142 | + consumer.stop |
157 | 143 | end
|
158 | 144 | end
|
159 |
| - |
160 |
| - t.abort_on_exception = true |
161 |
| - |
162 |
| - t |
163 | 145 | end
|
| 146 | + thread.abort_on_exception = true |
164 | 147 |
|
165 |
| - begin |
166 |
| - kafka = Kafka.new(kafka_brokers, client_id: "test") |
167 |
| - producer = kafka.producer |
168 |
| - |
169 |
| - messages_b.each do |i| |
170 |
| - producer.produce(i.to_s, topic: topic_b) |
171 |
| - end |
172 |
| - |
173 |
| - producer.deliver_messages |
174 |
| - end |
| 148 | + messages_b.each { |i| producer.produce(i.to_s, topic: topic_b) } |
| 149 | + producer.deliver_messages |
175 | 150 |
|
176 |
| - threads.each(&:join) |
| 151 | + thread.join |
177 | 152 |
|
178 | 153 | expect(received_messages.map(&:value).map(&:to_i).sort).to match_array messages
|
179 | 154 | end
|
|
0 commit comments