Skip to content

Kafka Queue Handler #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
12 changes: 8 additions & 4 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@
"license": "MIT",
"require": {
"php": ">=7.1",
"illuminate/queue": "^6.0|^7.0|^8.0",
"illuminate/queue": "^9.0",
"queue-interop/amqp-interop": "^0.8",
"queue-interop/queue-interop": "^0.7|^0.8",
"enqueue/enqueue": "^0.10",
"enqueue/dsn": "^0.10"
"enqueue/enqueue-dev": "dev-master"
},
"require-dev": {
"phpunit/phpunit": "~5.5",
"enqueue/enqueue": "^0.10",
"enqueue/null": "^0.10@dev",
"enqueue/test": "^0.10@dev",
"enqueue/simple-client": "^0.10@dev"
Expand All @@ -25,6 +23,12 @@
"/Tests/"
]
},
"repositories": [
{
"type": "vcs",
"url": "https://github.com/weconnectdata/enqueue-dev"
}
],
"suggest": {
"enqueue/simple-client": "If you want to use enqueue client and cli commands"
},
Expand Down
39 changes: 38 additions & 1 deletion src/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Interop\Queue\Context;
use Interop\Queue\Exception\DeliveryDelayNotSupportedException;
use Interop\Queue\Message;
use ReflectionClass;

class Job extends BaseJob implements JobContract
{
Expand Down Expand Up @@ -51,6 +52,42 @@ public function delete()
$this->consumer->acknowledge($this->message);
}

/**
* {@inheritdoc}
*/
public function fire()
{
$handlerClass = config('queue.connections.' . $this->getConnectionName() . '.handler');
$timeout = config('queue.connections.' . $this->getConnectionName() . '.timeout');

if (! empty($handlerClass)) {
return (new $handlerClass($this->consumer->receive($timeout)))->handle();
} else {
return parent::fire();
}
}

/**
* In case the payload is not a serialised PHP message, provide a default fall back
*
* @return array
* @throws \ReflectionException
*/
public function payload()
{
if (empty(parent::payload())) {
$handlerClass = config('queue.connections.' . $this->getConnectionName() . '.handler');

if (! empty($handlerClass)) {
return [
'job' => $handlerClass
];
}
}

return parent::payload();
}

/**
* {@inheritdoc}
*/
Expand All @@ -60,7 +97,7 @@ public function release($delay = 0)

$requeueMessage = clone $this->message;
$requeueMessage->setProperty('x-attempts', $this->attempts() + 1);

$producer = $this->context->createProducer();

try {
Expand Down
4 changes: 2 additions & 2 deletions src/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,15 @@ public function onPostMessageReceived(PostMessageReceived $context): void
}
}

public function stop($status = 0)
public function stop($status = 0, $options = null)
{
if ($this->interop) {
$this->stopped = true;

return;
}

parent::stop($status);
parent::stop($status, $options);
}
}