|
1 | 1 | # frozen_string_literal: true
|
2 | 2 |
|
3 | 3 | require "fake_broker"
|
| 4 | +require "fake_producer_interceptor" |
4 | 5 | require "timecop"
|
| 6 | +require "kafka/interceptors" |
5 | 7 |
|
6 | 8 | describe Kafka::Producer do
|
7 | 9 | let(:logger) { LOGGER }
|
|
20 | 22 |
|
21 | 23 | allow(cluster).to receive(:get_leader).with("greetings", 0) { broker1 }
|
22 | 24 | allow(cluster).to receive(:get_leader).with("greetings", 1) { broker2 }
|
| 25 | + allow(cluster).to receive(:disconnect).and_return(nil) |
23 | 26 |
|
24 | 27 | allow(compressor).to receive(:compress) {|message_set| message_set }
|
25 | 28 |
|
|
30 | 33 | allow(transaction_manager).to receive(:producer_epoch).and_return(0)
|
31 | 34 | allow(transaction_manager).to receive(:transactional_id).and_return(nil)
|
32 | 35 | allow(transaction_manager).to receive(:send_offsets_to_txn).and_return(nil)
|
| 36 | + allow(transaction_manager).to receive(:close).and_return(nil) |
33 | 37 | end
|
34 | 38 |
|
35 | 39 | describe "#produce" do
|
|
360 | 364 | end
|
361 | 365 | end
|
362 | 366 |
|
| 367 | + describe '#interceptor' do |
| 368 | + let(:now) { Time.now } |
| 369 | + let(:pending_message) { |
| 370 | + Kafka::PendingMessage.new( |
| 371 | + value: "hello1", |
| 372 | + key: nil, |
| 373 | + headers: { |
| 374 | + hello: 'World' |
| 375 | + }, |
| 376 | + topic: "greetings", |
| 377 | + partition: 0, |
| 378 | + partition_key: nil, |
| 379 | + create_time: now |
| 380 | + ) |
| 381 | + } |
| 382 | + |
| 383 | + it "creates and shuts down a producer with interceptor" do |
| 384 | + interceptor = FakeProducerInterceptor.new |
| 385 | + producer = initialize_producer( |
| 386 | + interceptors: [interceptor], |
| 387 | + cluster: cluster, |
| 388 | + transaction_manager: transaction_manager |
| 389 | + ) |
| 390 | + |
| 391 | + producer.shutdown |
| 392 | + end |
| 393 | + |
| 394 | + it "chains call" do |
| 395 | + interceptor1 = FakeProducerInterceptor.new(append_s: 'hello2') |
| 396 | + interceptor2 = FakeProducerInterceptor.new(append_s: 'hello3') |
| 397 | + interceptors = Kafka::Interceptors.new(interceptors: [interceptor1, interceptor2], logger: logger) |
| 398 | + intercepted_message = interceptors.call(pending_message) |
| 399 | + |
| 400 | + expect(intercepted_message).to eq Kafka::PendingMessage.new( |
| 401 | + value: "hello1hello2hello3", |
| 402 | + key: nil, |
| 403 | + headers: { |
| 404 | + hello: 'World' |
| 405 | + }, |
| 406 | + topic: "greetings", |
| 407 | + partition: 0, |
| 408 | + partition_key: nil, |
| 409 | + create_time: now |
| 410 | + ) |
| 411 | + end |
| 412 | + |
| 413 | + it "does not break the call chain" do |
| 414 | + interceptor1 = FakeProducerInterceptor.new(append_s: 'hello2', on_call_error: true) |
| 415 | + interceptor2 = FakeProducerInterceptor.new(append_s: 'hello3') |
| 416 | + interceptors = Kafka::Interceptors.new(interceptors: [interceptor1, interceptor2], logger: logger) |
| 417 | + intercepted_message = interceptors.call(pending_message) |
| 418 | + |
| 419 | + expect(intercepted_message).to eq Kafka::PendingMessage.new( |
| 420 | + value: "hello1hello3", |
| 421 | + key: nil, |
| 422 | + headers: { |
| 423 | + hello: 'World' |
| 424 | + }, |
| 425 | + topic: "greetings", |
| 426 | + partition: 0, |
| 427 | + partition_key: nil, |
| 428 | + create_time: now |
| 429 | + ) |
| 430 | + end |
| 431 | + |
| 432 | + it "returns original message when all interceptors fail" do |
| 433 | + interceptor1 = FakeProducerInterceptor.new(append_s: 'hello2', on_call_error: true) |
| 434 | + interceptor2 = FakeProducerInterceptor.new(append_s: 'hello3', on_call_error: true) |
| 435 | + interceptors = Kafka::Interceptors.new(interceptors: [interceptor1, interceptor2], logger: logger) |
| 436 | + intercepted_message = interceptors.call(pending_message) |
| 437 | + |
| 438 | + expect(intercepted_message).to eq pending_message |
| 439 | + end |
| 440 | + end |
| 441 | + |
363 | 442 | def initialize_producer(**options)
|
364 | 443 | default_options = {
|
365 | 444 | cluster: cluster,
|
|
0 commit comments