forked from php-enqueue/enqueue-dev
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathRdKafkaSendToAndReceiveFromTopicTest.php
58 lines (44 loc) · 1.59 KB
/
RdKafkaSendToAndReceiveFromTopicTest.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
<?php
namespace Enqueue\RdKafka\Tests\Spec;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
use Interop\Queue\Message;
use Interop\Queue\Spec\SendToAndReceiveFromTopicSpec;
/**
* @group rdkafka
* @group functional
* @retry 5
*/
class RdKafkaSendToAndReceiveFromTopicTest extends SendToAndReceiveFromTopicSpec
{
    /**
     * Publishes a message to a freshly named topic and checks that a consumer
     * subscribed to that topic receives a message carrying the same body.
     */
    public function test()
    {
        $kafka = $this->createContext();
        $destination = $this->createTopic($kafka, uniqid('', true));
        $payload = __CLASS__.time();

        // First publish, followed by close(): closing the context is relied on
        // to flush the producer, i.e. wait until the message reached Kafka.
        $kafka->createProducer()->send($destination, $kafka->createMessage($payload));
        $kafka->close();

        // Subscribe, then publish the same body once more through a new producer.
        $subscriber = $kafka->createConsumer($destination);
        $kafka->createProducer()->send($destination, $kafka->createMessage($payload));

        // Wait up to 10000 ms (10 seconds) for a delivery.
        $received = $subscriber->receive(10000);

        $this->assertInstanceOf(Message::class, $received);
        $subscriber->acknowledge($received);
        $this->assertSame($payload, $received->getBody());
    }

    /**
     * Builds the context used by the test.
     *
     * The broker address is assembled from the RDKAFKA_HOST and RDKAFKA_PORT
     * environment variables; the consumer group id is generated per call,
     * auto-commit is switched off and the topic offset reset is 'beginning'.
     */
    protected function createContext()
    {
        $host = getenv('RDKAFKA_HOST');
        $port = getenv('RDKAFKA_PORT');

        $globalSettings = [
            'group.id' => uniqid('', true),
            'metadata.broker.list' => $host.':'.$port,
            'enable.auto.commit' => 'false',
        ];
        $topicSettings = [
            'auto.offset.reset' => 'beginning',
        ];

        $factory = new RdKafkaConnectionFactory([
            'global' => $globalSettings,
            'topic' => $topicSettings,
        ]);

        return $factory->createContext();
    }
}