Skip to content

[client] Improve client extension. #517

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 30, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .php_cs.dist → .php_cs.php
Original file line number Diff line number Diff line change
@@ -21,9 +21,10 @@
'psr4' => true,
'strict_param' => true,
))
->setCacheFile(getenv('TRAVIS') ? getenv('HOME') . '/.php-cs-fixer' : __DIR__.'/var/.php_cs.cache')
->setFinder(
PhpCsFixer\Finder::create()
->name('/\.php$/')
->in(__DIR__)
)
;
;
7 changes: 4 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@ matrix:
cache:
directories:
- $HOME/.composer/cache
- $HOME/.php-cs-fixer

before_install:
- echo "extension = mongodb.so" >> $HOME/.phpenv/versions/$(phpenv version-name)/etc/php.ini
@@ -52,9 +53,9 @@ install:
- if [ "$PREPARE_CONTAINER" = true ]; then bin/dev -b; fi

script:
- IFS=$'\n'; COMMIT_SCA_FILES=($(git diff --name-only --diff-filter=ACMRTUXB "${TRAVIS_COMMIT_RANGE}")); unset IFS
- if [ "$PHP_CS_FIXER" = true ]; then ./bin/php-cs-fixer --no-interaction --dry-run --diff -v --path-mode=intersection -- "${COMMIT_SCA_FILES[@]}" fix; fi
- if [ "$PHPSTAN" = true ]; then docker run --workdir="/mqdev" -v "`pwd`:/mqdev" --rm enqueue/dev:latest php -d memory_limit=1024M bin/phpstan analyse -l 1 -c phpstan.neon -- "${COMMIT_SCA_FILES[@]}" ; fi
- PKG_PHP_CHANGED_FILES=`./bin/git-find-changed-php-files.sh "${TRAVIS_COMMIT_RANGE}"`
- if [ "$PHP_CS_FIXER" = true ] && [ ! -z "${PKG_PHP_CHANGED_FILES}" ]; then ./bin/php-cs-fixer fix --config=.php_cs.php --no-interaction --dry-run --diff -v --path-mode=intersection -- ${PKG_PHP_CHANGED_FILES[@]} ; fi
- if [ "$PHPSTAN" = true ] && [ ! -z "${PKG_PHP_CHANGED_FILES}" ]; then docker run --workdir="/mqdev" -v "`pwd`:/mqdev" --rm enqueue/dev:latest php -d memory_limit=1024M bin/phpstan analyse -l 1 -c phpstan.neon -- ${PKG_PHP_CHANGED_FILES[@]} ; fi
- if [ "$UNIT_TESTS" = true ]; then bin/phpunit --exclude-group=functional; fi
- if [ "$FUNCTIONAL_TESTS" = true ]; then bin/test.sh --exclude-group=rdkafka; fi
- if [ "RDKAFKA_TESTS" = true ]; then bin/test.sh --group=rdkafka; fi
15 changes: 15 additions & 0 deletions bin/git-find-changed-php-files.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/usr/bin/env bash

if (( "$#" != 1 ))
then
echo "Git range must be provided"
exit 1
fi


IFS='
'
ALL_CHANGED_FILES=$(git diff --name-only --diff-filter=ACMRTUXB "$1");
PKG_PHP_CHANGED_FILES=$(echo "$ALL_CHANGED_FILES" | grep -E "^pkg\/" | grep -E ".*?\.php$");

echo "$PKG_PHP_CHANGED_FILES";
6 changes: 5 additions & 1 deletion bin/pre-commit
Original file line number Diff line number Diff line change
@@ -68,6 +68,10 @@ function getFilesToFix()
return (bool) preg_match('/\.(php|twig|translations\/*.yml)$/', $file);
});

$stagedFiles = array_filter($stagedFiles, function ($file) {
return (bool) preg_match('/^pkg\//', $file);
});

return $stagedFiles;
}

@@ -104,7 +108,7 @@ function runPhpCsFixer()
$returnCode = null;

exec(sprintf(
'%s %s fix %s --dry-run',
'%s %s fix %s --dry-run --config=.php_cs.php',
$phpBin,
$phpCsFixerBin,
$projectRootDir.'/'.$file
34 changes: 22 additions & 12 deletions pkg/enqueue/Client/ChainExtension.php
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@

namespace Enqueue\Client;

class ChainExtension implements ExtensionInterface
final class ChainExtension implements ExtensionInterface
{
/**
* @var ExtensionInterface[]
@@ -14,26 +14,36 @@ class ChainExtension implements ExtensionInterface
*/
public function __construct(array $extensions)
{
$this->extensions = $extensions;
array_walk($extensions, function (ExtensionInterface $extension) {
$this->extensions[] = $extension;
});
}

/**
* {@inheritdoc}
*/
public function onPreSend($topic, Message $message)
public function onPreSendEvent(PreSend $event): void
{
foreach ($this->extensions as $extension) {
$extension->onPreSend($topic, $message);
$extension->onPreSendEvent($event);
}
}

/**
* {@inheritdoc}
*/
public function onPostSend($topic, Message $message)
public function onPreSendCommand(PreSend $event): void
{
foreach ($this->extensions as $extension) {
$extension->onPreSendCommand($event);
}
}

public function onDriverPreSend(DriverPreSend $context): void
{
foreach ($this->extensions as $extension) {
$extension->onDriverPreSend($context);
}
}

public function onPostSend(PostSend $event): void
{
foreach ($this->extensions as $extension) {
$extension->onPostSend($topic, $message);
$extension->onPostSend($event);
}
}
}
Original file line number Diff line number Diff line change
@@ -3,15 +3,16 @@
namespace Enqueue\Client\ConsumptionExtension;

use Enqueue\Client\Config;
use Enqueue\Client\EmptyExtensionTrait as ClientEmptyExtensionTrait;
use Enqueue\Client\ExtensionInterface as ClientExtensionInterface;
use Enqueue\Client\Message;
use Enqueue\Client\PreSend;
use Enqueue\Consumption\Context;
use Enqueue\Consumption\EmptyExtensionTrait;
use Enqueue\Consumption\EmptyExtensionTrait as ConsumptionEmptyExtensionTrait;
use Enqueue\Consumption\ExtensionInterface as ConsumptionExtensionInterface;

class ExclusiveCommandExtension implements ConsumptionExtensionInterface, ClientExtensionInterface
final class ExclusiveCommandExtension implements ConsumptionExtensionInterface, ClientExtensionInterface
{
use EmptyExtensionTrait;
use ConsumptionEmptyExtensionTrait, ClientEmptyExtensionTrait;

/**
* @var string[]
@@ -60,26 +61,14 @@ public function onPreReceived(Context $context)
}
}

/**
* {@inheritdoc}
*/
public function onPreSend($topic, Message $message)
public function onPreSendCommand(PreSend $context): void
{
if (Config::COMMAND_TOPIC != $topic) {
return;
}
$message = $context->getMessage();
$command = $context->getCommand();

$commandName = $message->getProperty(Config::PARAMETER_COMMAND_NAME);
if (array_key_exists($commandName, $this->processorNameToQueueNameMap)) {
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $commandName);
$message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->processorNameToQueueNameMap[$commandName]);
if (array_key_exists($command, $this->processorNameToQueueNameMap)) {
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $command);
$message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->processorNameToQueueNameMap[$command]);
}
}

/**
* {@inheritdoc}
*/
public function onPostSend($topic, Message $message)
{
}
}
49 changes: 49 additions & 0 deletions pkg/enqueue/Client/DriverPreSend.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

namespace Enqueue\Client;

final class DriverPreSend
{
private $message;

private $producer;

private $driver;

public function __construct(Message $message, ProducerInterface $producer, DriverInterface $driver)
{
$this->message = $message;
$this->producer = $producer;
$this->driver = $driver;
}

public function getMessage(): Message
{
return $this->message;
}

public function getProducer(): ProducerInterface
{
return $this->producer;
}

public function getDriver(): DriverInterface
{
return $this->driver;
}

public function isEvent(): bool
{
return Config::COMMAND_TOPIC !== $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
}

public function getCommand(): string
{
return $this->message->getProperty(Config::PARAMETER_COMMAND_NAME);
}

public function getTopic(): string
{
return $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
}
}
22 changes: 22 additions & 0 deletions pkg/enqueue/Client/EmptyExtensionTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

namespace Enqueue\Client;

trait EmptyExtensionTrait
{
public function onPreSendEvent(PreSend $context): void
{
}

public function onPreSendCommand(PreSend $context): void
{
}

public function onDriverPreSend(DriverPreSend $context): void
{
}

public function onPostSend(PostSend $context): void
{
}
}
59 changes: 59 additions & 0 deletions pkg/enqueue/Client/Extension/PrepareBodyExtension.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?php

namespace Enqueue\Client\Extension;

use Enqueue\Client\EmptyExtensionTrait;
use Enqueue\Client\ExtensionInterface;
use Enqueue\Client\Message;
use Enqueue\Client\PreSend;
use Enqueue\Util\JSON;

class PrepareBodyExtension implements ExtensionInterface
{
use EmptyExtensionTrait;

public function onPreSendEvent(PreSend $context): void
{
$this->prepareBody($context->getMessage());
}

public function onPreSendCommand(PreSend $context): void
{
$this->prepareBody($context->getMessage());
}

private function prepareBody(Message $message): void
{
$body = $message->getBody();
$contentType = $message->getContentType();

if (is_scalar($body) || null === $body) {
$contentType = $contentType ?: 'text/plain';
$body = (string) $body;
} elseif (is_array($body)) {
// only array of scalars is allowed.
array_walk_recursive($body, function ($value) {
if (!is_scalar($value) && null !== $value) {
throw new \LogicException(sprintf(
'The message\'s body must be an array of scalars. Found not scalar in the array: %s',
is_object($value) ? get_class($value) : gettype($value)
));
}
});

$contentType = $contentType ?: 'application/json';
$body = JSON::encode($body);
} elseif ($body instanceof \JsonSerializable) {
$contentType = $contentType ?: 'application/json';
$body = JSON::encode($body);
} else {
throw new \InvalidArgumentException(sprintf(
'The message\'s body must be either null, scalar, array or object (implements \JsonSerializable). Got: %s',
is_object($body) ? get_class($body) : gettype($body)
));
}

$message->setContentType($contentType);
$message->setBody($body);
}
}
30 changes: 16 additions & 14 deletions pkg/enqueue/Client/ExtensionInterface.php
Original file line number Diff line number Diff line change
@@ -4,19 +4,21 @@

interface ExtensionInterface
{
/**
* @param string $topic
* @param Message $message
*
* @return
*/
public function onPreSend($topic, Message $message);
public function onPreSendEvent(PreSend $context): void;

/**
* @param string $topic
* @param Message $message
*
* @return
*/
public function onPostSend($topic, Message $message);
public function onPreSendCommand(PreSend $context): void;

public function onDriverPreSend(DriverPreSend $context): void;

public function onPostSend(PostSend $context): void;

// /**
// * @deprecated
// */
// public function onPreSend($topic, Message $message);
//
// /**
// * @deprecated
// */
// public function onPostSend($topic, Message $message);
}
49 changes: 49 additions & 0 deletions pkg/enqueue/Client/PostSend.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

namespace Enqueue\Client;

final class PostSend
{
private $message;

private $producer;

private $driver;

public function __construct(Message $message, ProducerInterface $producer, DriverInterface $driver)
{
$this->message = $message;
$this->producer = $producer;
$this->driver = $driver;
}

public function getMessage(): Message
{
return $this->message;
}

public function getProducer(): ProducerInterface
{
return $this->producer;
}

public function getDriver(): DriverInterface
{
return $this->driver;
}

public function isEvent(): bool
{
return Config::COMMAND_TOPIC !== $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
}

public function getCommand(): string
{
return $this->message->getProperty(Config::PARAMETER_COMMAND_NAME);
}

public function getTopic(): string
{
return $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
}
}
Loading