You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: src/reference/asciidoc/aggregator.adoc
+10-10
Original file line number
Diff line number
Diff line change
@@ -127,7 +127,7 @@ See <<./splitter.adoc#splitter,Splitter>> for more information.
127
127
128
128
[[agg-message-collection]]
129
129
IMPORTANT: The `SimpleMessageGroup.getMessages()` method returns an `unmodifiableCollection`.
130
-
Therefore, if your aggregating POJO method has a `Collection<Message>` parameter, the argument passed in is exactly that `Collection` instance and, when you use a `SimpleMessageStore` for the aggregator, that original `Collection<Message>` is cleared after releasing the group.
130
+
Therefore, if an aggregating POJO method has a `Collection<Message>` parameter, the argument passed in is exactly that `Collection` instance and, when you use a `SimpleMessageStore` for the aggregator, that original `Collection<Message>` is cleared after releasing the group.
131
131
Consequently, the `Collection<Message>` variable in the POJO is cleared too, if it is passed out of the aggregator.
132
132
If you wish to simply release that collection as-is for further processing, you must build a new `Collection` (for example, `new ArrayList<Message>(messages)`).
133
133
Starting with version 4.3, the framework no longer copies the messages to a new collection, to avoid undesired extra object creation.
@@ -195,7 +195,7 @@ public class MyReleaseStrategy {
195
195
196
196
Based on the signatures in the preceding two examples, the POJO-based release strategy is passed a `Collection` of not-yet-released messages (if you need access to the whole `Message`) or a `Collection` of payload objects (if the type parameter is anything other than `Message`).
197
197
This satisfies the majority of use cases.
198
-
However if, for some reason, you need to access the full `MessageGroup`, you should provide an implementation of the `ReleaseStrategy` interface.
198
+
However, if, for some reason, you need to access the full `MessageGroup`, you should provide an implementation of the `ReleaseStrategy` interface.
199
199
200
200
[WARNING]
201
201
=====
@@ -218,7 +218,7 @@ You can release partial sequences by using a `MessageGroupStoreReaper` together
218
218
IMPORTANT: To facilitate discarding of late-arriving messages, the aggregator must maintain state about the group after it has been released.
219
219
This can eventually cause out-of-memory conditions.
220
220
To avoid such situations, you should consider configuring a `MessageGroupStoreReaper` to remove the group metadata.
221
-
The expiry parameters should be set to expire groups once a point has been reach after after which late messages are not expected to arrive.
221
+
The expiry parameters should be set to expire groups once a point has been reach after which late messages are not expected to arrive.
222
222
For information about configuring a reaper, see <<reaper>>.
223
223
224
224
Spring Integration provides an implementation for `ReleaseStrategy`: `SimpleSequenceSizeReleaseStrategy`.
@@ -422,7 +422,7 @@ See <<./message-store.adoc#message-store,Message Store>> for more information.
422
422
Optional.
423
423
<8> Indicates that expired messages should be aggregated and sent to the 'output-channel' or 'replyChannel' once their containing `MessageGroup` is expired (see https://docs.spring.io/spring-integration/api/org/springframework/integration/store/MessageGroupStore.html#expireMessageGroups-long[`MessageGroupStore.expireMessageGroups(long)`]).
424
424
One way of expiring a `MessageGroup` is by configuring a `MessageGroupStoreReaper`.
425
-
However you can alternatively expire `MessageGroup` by calling `MessageGroupStore.expireMessageGroups(timeout)`.
425
+
However, you can alternatively expire `MessageGroup` by calling `MessageGroupStore.expireMessageGroups(timeout)`.
426
426
You can accomplish that through a Control Bus operation or, if you have a reference to the `MessageGroupStore` instance, by invoking `expireMessageGroups(timeout)`.
427
427
Otherwise, by itself, this attribute does nothing.
428
428
It serves only as an indicator of whether to discard or send to the output or reply channel any messages that are still in the `MessageGroup` that is about to be expired.
@@ -433,7 +433,7 @@ Defaults to `-1`, which results in blocking indefinitely.
433
433
It is applied only if the output channel has some 'sending' limitations, such as a `QueueChannel` with a fixed 'capacity'.
434
434
In this case, a `MessageDeliveryException` is thrown.
435
435
For `AbstractSubscribableChannel` implementations, the `send-timeout` is ignored .
436
-
For `group-timeout(-expression)`, the `MessageDeliveryException` from the scheduled expire task leads this task to be rescheduled.
436
+
For `group-timeout(-expression)`, the `MessageDeliveryException` from the scheduled expiring task leads this task to be rescheduled.
437
437
Optional.
438
438
<10> A reference to a bean that implements the message correlation (grouping) algorithm.
439
439
The bean can be an implementation of the `CorrelationStrategy` interface or a POJO.
@@ -549,7 +549,7 @@ Such a periodic purge functionality is useful when a message store is needed to
549
549
In most cases this happens after an application restart, when using a persistent message group store.
550
550
The functionality is similar to the `MessageGroupStoreReaper` with a scheduled task, but provides a convenient way to deal with old groups within specific components, when using group timeout instead of a reaper.
551
551
The `MessageGroupStore` must be provided exclusively for the current correlation endpoint.
552
-
Otherwise one aggregator may purge groups from another.
552
+
Otherwise, one aggregator may purge groups from another.
553
553
With the aggregator, groups expired using this technique will either be discarded or released as a partial group, depending on the `expireGroupsUponCompletion` property.
554
554
=====
555
555
@@ -920,11 +920,11 @@ In version 5.2, the `FluxAggregatorMessageHandler` component has been introduced
920
920
It is based on the Project Reactor `Flux.groupBy()` and `Flux.window()` operators.
921
921
The incoming messages are emitted into the `FluxSink` initiated by the `Flux.create()` in the constructor of this component.
922
922
If the `outputChannel` is not provided or it is not an instance of `ReactiveStreamsSubscribableChannel`, the subscription to the main `Flux` is done from the `Lifecycle.start()` implementation.
923
-
Otherwise it is postponed to the subscription done by the `ReactiveStreamsSubscribableChannel` implementation.
923
+
Otherwise, it is postponed to the subscription done by the `ReactiveStreamsSubscribableChannel` implementation.
924
924
The messages are grouped by the `Flux.groupBy()` using a `CorrelationStrategy` for the group key.
925
925
By default, the `IntegrationMessageHeaderAccessor.CORRELATION_ID` header of the message is consulted.
926
926
927
-
By default every closed window is released as a `Flux` in payload of a message to produce.
927
+
By default, every closed window is released as a `Flux` in payload of a message to produce.
928
928
This message contains all the headers from the first message in the window.
929
929
This `Flux` in the output message payload must be subscribed and processed downstream.
930
930
Such a logic can be customized (or superseded) by the `setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)` configuration option of the `FluxAggregatorMessageHandler`.
@@ -948,8 +948,8 @@ There are several options in the `FluxAggregatorMessageHandler` to select an app
948
948
See its JavaDocs for more information.
949
949
Has a precedence over all other window options.
950
950
* `setWindowSize(int)` and `setWindowSizeFunction(Function<Message<?>, Integer>)` - is propagated to the `Flux.window(int)` or `windowTimeout(int, Duration)`.
951
-
By default a window size is calculated from the first message in group and its `IntegrationMessageHeaderAccessor.SEQUENCE_SIZE` header.
952
-
* `setWindowTimespan(Duration)` - is propagated to the `Flux.window(Duration)` or `windowTimeout(int, Duration)` depending in the window size configuration.
951
+
By default, a window size is calculated from the first message in group and its `IntegrationMessageHeaderAccessor.SEQUENCE_SIZE` header.
952
+
* `setWindowTimespan(Duration)` - is propagated to the `Flux.window(Duration)` or `windowTimeout(int, Duration)` depending on the window size configuration.
953
953
* `setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)` - a function to apply a transformation into the grouped fluxes for any custom window operation not covered by the exposed options.
954
954
955
955
Since this component is a `MessageHandler` implementation it can simply be used as a `@Bean` definition together with a `@ServiceActivator` messaging annotation.
Copy file name to clipboardExpand all lines: src/reference/asciidoc/amqp.adoc
+4-5
Original file line number
Diff line number
Diff line change
@@ -599,7 +599,7 @@ Default none (nacks will not be generated).
599
599
This requires a `RabbitTemplate` configured for confirms as well as a `confirm-correlation-expression`.
600
600
The thread will block for up to `confirm-timeout` (or 5 seconds by default).
601
601
If a timeout occurs, a `MessageTimeoutException` will be thrown.
602
-
If returns are enabled and a message is returned, or any other exception occurs while awaiting the confirm, a `MessageHandlingException` will be thrown, with an appropriate message.
602
+
If returns are enabled and a message is returned, or any other exception occurs while awaiting the confirmation, a `MessageHandlingException` will be thrown, with an appropriate message.
603
603
<15> The channel to which returned messages are sent.
604
604
When provided, the underlying AMQP template is configured to return undeliverable messages to the adapter.
605
605
When there is no `ErrorMessageStrategy` configured, the message is constructed from the data received from AMQP, with the following additional headers: `amqp_returnReplyCode`, `amqp_returnReplyText`, `amqp_returnExchange`, `amqp_returnRoutingKey`.
@@ -969,7 +969,7 @@ See also <<./service-activator.adoc#async-service-activator,Asynchronous Service
969
969
.RabbitTemplate
970
970
====
971
971
When you use confirmations and returns, we recommend that the `RabbitTemplate` wired into the `AsyncRabbitTemplate` be dedicated.
972
-
Otherwise, unexpected side-effects may be encountered.
972
+
Otherwise, unexpected sideeffects may be encountered.
973
973
====
974
974
975
975
[[alternative-confirms-returns]]
@@ -1373,7 +1373,7 @@ public IntegrationFlow flow(RabbitTemplate template) {
1373
1373
Suppose we send messages `A`, `B` and `C` to the gateway.
1374
1374
While it is likely that messages `A`, `B`, `C` are sent in order, there is no guarantee.
1375
1375
This is because the template "`borrows`" a channel from the cache for each send operation, and there is no guarantee that the same channel is used for each message.
1376
-
One solution is to start a transaction before the splitter, but transactions are expensive in RabbitMQ and can reduce performance several hundredfold.
1376
+
One solution is to start a transaction before the splitter, but transactions are expensive in RabbitMQ and can reduce performance several hundred-fold.
1377
1377
1378
1378
To solve this problem in a more efficient manner, starting with version 5.1, Spring Integration provides the `BoundRabbitChannelAdvice` which is a `HandleMessageAdvice`.
1379
1379
See <<./handler-advice.adoc#handle-message-advice,Handling Message Advice>>.
@@ -1417,5 +1417,4 @@ In return, that message is retrieved by Spring Integration and printed to the co
1417
1417
1418
1418
The following image illustrates the basic set of Spring Integration components used in this sample.
Copy file name to clipboardExpand all lines: src/reference/asciidoc/bridge.adoc
+1-1
Original file line number
Diff line number
Diff line change
@@ -6,7 +6,7 @@ For example, you may want to connect a `PollableChannel` to a `SubscribableChann
6
6
Instead, the messaging bridge provides the polling configuration.
7
7
8
8
By providing an intermediary poller between two channels, you can use a messaging bridge to throttle inbound messages.
9
-
The poller's trigger determines the rate at which messages arrive on the second channel, and the poller's `maxMessagesPerPoll` property enforces a limit on the throughput.
9
+
The poller's trigger determines the rate at which messages arrive at the second channel, and the poller's `maxMessagesPerPoll` property enforces a limit on the throughput.
10
10
11
11
Another valid use for a messaging bridge is to connect two different systems.
12
12
In such a scenario, Spring Integration's role is limited to making the connection between these systems and managing a poller, if necessary.
Copy file name to clipboardExpand all lines: src/reference/asciidoc/chain.adoc
+1-1
Original file line number
Diff line number
Diff line change
@@ -123,7 +123,7 @@ Its `componentName` is based on its position in the `<chain>`.
123
123
In this case, it is 'somethingChain$child#1'.
124
124
(The final element of the name is the order within the chain, beginning with '#0').
125
125
Note, this transformer is not registered as a bean within the application context, so it does not get a `beanName`.
126
-
However its `componentName` has a value that is useful for logging and other purposes.
126
+
However, its `componentName` has a value that is useful for logging and other purposes.
127
127
128
128
The `id` attribute for `<chain>` elements lets them be eligible for <<./jmx.adoc#jmx-mbean-exporter,JMX export>>, and they are trackable in the <<./message-history.adoc#message-history,message history>>.
129
129
You can access them from the `BeanFactory` by using the appropriate bean name, as discussed earlier.
0 commit comments