@@ -467,7 +467,7 @@ public function testShouldCallOnBeforeReceiveExtensionMethod()
467
467
$ queueConsumer ->consume ();
468
468
}
469
469
470
- public function testShouldCallOnPreReceivedAndPostReceivedExtensionMethods ()
470
+ public function testShouldCallOnPreReceivedExtensionMethodWithExpectedContext ()
471
471
{
472
472
$ expectedMessage = $ this ->createMessageMock ();
473
473
$ messageConsumerStub = $ this ->createMessageConsumerStub ($ expectedMessage );
@@ -497,6 +497,62 @@ public function testShouldCallOnPreReceivedAndPostReceivedExtensionMethods()
497
497
$ this ->assertFalse ($ context ->isExecutionInterrupted ());
498
498
})
499
499
;
500
+
501
+ $ chainExtensions = new ChainExtension ([$ extension , new BreakCycleExtension (1 )]);
502
+ $ queueConsumer = new QueueConsumer ($ contextStub , $ chainExtensions , 0 );
503
+ $ queueConsumer ->bind (new NullQueue ('aQueueName ' ), $ processorMock );
504
+
505
+ $ queueConsumer ->consume ();
506
+ }
507
+
508
+ public function testShouldCallOnResultExtensionMethodWithExpectedContext ()
509
+ {
510
+ $ expectedMessage = $ this ->createMessageMock ();
511
+ $ messageConsumerStub = $ this ->createMessageConsumerStub ($ expectedMessage );
512
+
513
+ $ contextStub = $ this ->createPsrContextStub ($ messageConsumerStub );
514
+
515
+ $ processorMock = $ this ->createProcessorStub ();
516
+
517
+ $ extension = $ this ->createExtension ();
518
+ $ extension
519
+ ->expects ($ this ->once ())
520
+ ->method ('onResult ' )
521
+ ->with ($ this ->isInstanceOf (Context::class))
522
+ ->willReturnCallback (function (Context $ context ) use (
523
+ $ contextStub ,
524
+ $ messageConsumerStub ,
525
+ $ processorMock ,
526
+ $ expectedMessage
527
+ ) {
528
+ $ this ->assertSame ($ contextStub , $ context ->getPsrContext ());
529
+ $ this ->assertSame ($ messageConsumerStub , $ context ->getPsrConsumer ());
530
+ $ this ->assertSame ($ processorMock , $ context ->getPsrProcessor ());
531
+ $ this ->assertSame ($ expectedMessage , $ context ->getPsrMessage ());
532
+ $ this ->assertInstanceOf (NullLogger::class, $ context ->getLogger ());
533
+ $ this ->assertNull ($ context ->getException ());
534
+ $ this ->assertSame (Result::ACK , $ context ->getResult ());
535
+ $ this ->assertFalse ($ context ->isExecutionInterrupted ());
536
+ })
537
+ ;
538
+
539
+ $ chainExtensions = new ChainExtension ([$ extension , new BreakCycleExtension (1 )]);
540
+ $ queueConsumer = new QueueConsumer ($ contextStub , $ chainExtensions , 0 );
541
+ $ queueConsumer ->bind (new NullQueue ('aQueueName ' ), $ processorMock );
542
+
543
+ $ queueConsumer ->consume ();
544
+ }
545
+
546
+ public function testShouldCallOnPostReceivedExtensionMethodWithExpectedContext ()
547
+ {
548
+ $ expectedMessage = $ this ->createMessageMock ();
549
+ $ messageConsumerStub = $ this ->createMessageConsumerStub ($ expectedMessage );
550
+
551
+ $ contextStub = $ this ->createPsrContextStub ($ messageConsumerStub );
552
+
553
+ $ processorMock = $ this ->createProcessorStub ();
554
+
555
+ $ extension = $ this ->createExtension ();
500
556
$ extension
501
557
->expects ($ this ->once ())
502
558
->method ('onPostReceived ' )
@@ -722,6 +778,57 @@ public function testShouldAllowInterruptConsumingOnPreReceiveButProcessCurrentMe
722
778
$ queueConsumer ->consume ();
723
779
}
724
780
781
+ public function testShouldAllowInterruptConsumingOnResult ()
782
+ {
783
+ $ expectedMessage = $ this ->createMessageMock ();
784
+ $ messageConsumerStub = $ this ->createMessageConsumerStub ($ expectedMessage );
785
+
786
+ $ contextStub = $ this ->createPsrContextStub ($ messageConsumerStub );
787
+
788
+ $ processorMock = $ this ->createProcessorMock ();
789
+ $ processorMock
790
+ ->expects ($ this ->once ())
791
+ ->method ('process ' )
792
+ ->willReturn (Result::ACK )
793
+ ;
794
+
795
+ $ extension = $ this ->createExtension ();
796
+ $ extension
797
+ ->expects ($ this ->once ())
798
+ ->method ('onResult ' )
799
+ ->with ($ this ->isInstanceOf (Context::class))
800
+ ->willReturnCallback (function (Context $ context ) {
801
+ $ context ->setExecutionInterrupted (true );
802
+ })
803
+ ;
804
+ $ extension
805
+ ->expects ($ this ->atLeastOnce ())
806
+ ->method ('onInterrupted ' )
807
+ ->with ($ this ->isInstanceOf (Context::class))
808
+ ->willReturnCallback (function (Context $ context ) use (
809
+ $ contextStub ,
810
+ $ messageConsumerStub ,
811
+ $ processorMock ,
812
+ $ expectedMessage
813
+ ) {
814
+ $ this ->assertSame ($ contextStub , $ context ->getPsrContext ());
815
+ $ this ->assertSame ($ messageConsumerStub , $ context ->getPsrConsumer ());
816
+ $ this ->assertSame ($ processorMock , $ context ->getPsrProcessor ());
817
+ $ this ->assertSame ($ expectedMessage , $ context ->getPsrMessage ());
818
+ $ this ->assertInstanceOf (NullLogger::class, $ context ->getLogger ());
819
+ $ this ->assertNull ($ context ->getException ());
820
+ $ this ->assertSame (Result::ACK , $ context ->getResult ());
821
+ $ this ->assertTrue ($ context ->isExecutionInterrupted ());
822
+ })
823
+ ;
824
+
825
+ $ chainExtensions = new ChainExtension ([$ extension , new BreakCycleExtension (1 )]);
826
+ $ queueConsumer = new QueueConsumer ($ contextStub , $ chainExtensions , 0 );
827
+ $ queueConsumer ->bind (new NullQueue ('aQueueName ' ), $ processorMock );
828
+
829
+ $ queueConsumer ->consume ();
830
+ }
831
+
725
832
public function testShouldAllowInterruptConsumingOnPostReceive ()
726
833
{
727
834
$ expectedMessage = $ this ->createMessageMock ();
@@ -850,6 +957,11 @@ public function testShouldCallExtensionPassedOnRuntime()
850
957
->method ('onPreReceived ' )
851
958
->with ($ this ->isInstanceOf (Context::class))
852
959
;
960
+ $ runtimeExtension
961
+ ->expects ($ this ->once ())
962
+ ->method ('onResult ' )
963
+ ->with ($ this ->isInstanceOf (Context::class))
964
+ ;
853
965
$ runtimeExtension
854
966
->expects ($ this ->once ())
855
967
->method ('onPostReceived ' )
@@ -936,7 +1048,7 @@ public function testShouldCallEachQueueOneByOne()
936
1048
})
937
1049
;
938
1050
$ extension
939
- ->expects ($ this ->at (4 ))
1051
+ ->expects ($ this ->at (5 ))
940
1052
->method ('onBeforeReceive ' )
941
1053
->with ($ this ->isInstanceOf (Context::class))
942
1054
->willReturnCallback (function (Context $ context ) use ($ anotherProcessorMock , $ queue2 ) {
0 commit comments