Skip to content

Commit 8474482

Browse files
committed
Changed: Change partition recovery hierachy in RdKafkaProducer
1 parent 3880257 commit 8474482

File tree

2 files changed

+115
-1
lines changed

2 files changed

+115
-1
lines changed

pkg/rdkafka/RdKafkaProducer.php

+17-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,22 @@ public function __construct(VendorProducer $producer, Serializer $serializer)
2828
$this->setSerializer($serializer);
2929
}
3030

31+
32+
/**
33+
* @param RdKafkaTopic $destination
34+
* @param RdKafkaMessage $message
35+
*/
36+
private function getPartition(Destination $destination, Message $message): int
37+
{
38+
if (!is_null($message->getPartition())) {
39+
return $message->getPartition();
40+
}
41+
if (!is_null($destination->getPartition())) {
42+
return $destination->getPartition();
43+
}
44+
return RD_KAFKA_PARTITION_UA;
45+
}
46+
3147
/**
3248
* @param RdKafkaTopic $destination
3349
* @param RdKafkaMessage $message
@@ -37,7 +53,7 @@ public function send(Destination $destination, Message $message): void
3753
InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class);
3854
InvalidMessageException::assertMessageInstanceOf($message, RdKafkaMessage::class);
3955

40-
$partition = $message->getPartition() ?: $destination->getPartition() ?: RD_KAFKA_PARTITION_UA;
56+
$partition = $this->getPartition($destination, $message);
4157
$payload = $this->serializer->toString($message);
4258
$key = $message->getKey() ?: $destination->getKey() ?: null;
4359

pkg/rdkafka/Tests/RdKafkaProducerTest.php

+98
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,104 @@ public function testShouldAllowSerializersToSerializeKeys()
225225
$producer->send(new RdKafkaTopic('theQueueName'), $message);
226226
}
227227

228+
public function testShouldGetPartitionFromMessage(): void
229+
{
230+
$partition = 1;
231+
232+
$kafkaTopic = $this->createKafkaTopicMock();
233+
$kafkaTopic
234+
->expects($this->once())
235+
->method('producev')
236+
->with(
237+
$partition,
238+
0,
239+
'theSerializedMessage',
240+
'theSerializedKey'
241+
)
242+
;
243+
244+
$kafkaProducer = $this->createKafkaProducerMock();
245+
$kafkaProducer
246+
->expects($this->once())
247+
->method('newTopic')
248+
->willReturn($kafkaTopic)
249+
;
250+
$kafkaProducer
251+
->expects($this->once())
252+
->method('poll')
253+
->with(0)
254+
;
255+
$messageHeaders = ['bar' => 'barVal'];
256+
$message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], $messageHeaders);
257+
$message->setKey('key');
258+
$message->setPartition($partition);
259+
260+
$serializer = $this->createSerializerMock();
261+
$serializer
262+
->expects($this->once())
263+
->method('toString')
264+
->willReturnCallback(function () use ($message) {
265+
$message->setKey('theSerializedKey');
266+
267+
return 'theSerializedMessage';
268+
})
269+
;
270+
271+
$destination = new RdKafkaTopic('theQueueName');
272+
273+
$producer = new RdKafkaProducer($kafkaProducer, $serializer);
274+
$producer->send($destination, $message);
275+
}
276+
277+
public function testShouldGetPartitionFromDestination(): void
278+
{
279+
$partition = 2;
280+
281+
$kafkaTopic = $this->createKafkaTopicMock();
282+
$kafkaTopic
283+
->expects($this->once())
284+
->method('producev')
285+
->with(
286+
$partition,
287+
0,
288+
'theSerializedMessage',
289+
'theSerializedKey'
290+
)
291+
;
292+
293+
$kafkaProducer = $this->createKafkaProducerMock();
294+
$kafkaProducer
295+
->expects($this->once())
296+
->method('newTopic')
297+
->willReturn($kafkaTopic)
298+
;
299+
$kafkaProducer
300+
->expects($this->once())
301+
->method('poll')
302+
->with(0)
303+
;
304+
$messageHeaders = ['bar' => 'barVal'];
305+
$message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], $messageHeaders);
306+
$message->setKey('key');
307+
308+
$serializer = $this->createSerializerMock();
309+
$serializer
310+
->expects($this->once())
311+
->method('toString')
312+
->willReturnCallback(function () use ($message) {
313+
$message->setKey('theSerializedKey');
314+
315+
return 'theSerializedMessage';
316+
})
317+
;
318+
319+
$destination = new RdKafkaTopic('theQueueName');
320+
$destination->setPartition($partition);
321+
322+
$producer = new RdKafkaProducer($kafkaProducer, $serializer);
323+
$producer->send($destination, $message);
324+
}
325+
228326
/**
229327
* @return \PHPUnit\Framework\MockObject\MockObject|ProducerTopic
230328
*/

0 commit comments

Comments
 (0)