Skip to content

Commit 6318ff4

Browse files
committed
Improve PR #3819
- Move filtering into separate step of the pipeline - Change method name to 'retain' to make purpose clearer - Add logging
1 parent 895a8e4 commit 6318ff4

File tree

5 files changed

+110
-31
lines changed

5 files changed

+110
-31
lines changed

modules/flowable-event-registry-api/src/main/java/org/flowable/eventregistry/api/InboundEventFilter.java

+4-7
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,13 @@
2525
public interface InboundEventFilter<T> {
2626

2727
/**
28-
* Returns true, if the event should be further processed or false, if the event can be ignored and will not be processed
29-
* any further and the pipeline will stop afterwards.
28+
* Returns true, if the event should be further processed
29+
* or false if the event should be ignored and will not be processed any further.
3030
*
31-
* @param payload the payload of the event
31+
* @param event the inbound event information
3232
* @return true, if the event should continue to be processed, false, if the pipeline will ignore the event and stop any
3333
* further processing
3434
*/
35-
boolean filter(T payload);
35+
boolean retain(FlowableEventInfo<T> event);
3636

37-
default boolean filter(FlowableEventInfo<T> event) {
38-
return filter(event.getPayload());
39-
}
4037
}

modules/flowable-event-registry-api/src/main/java/org/flowable/eventregistry/api/model/InboundChannelModelBuilder.java

+43-7
Original file line numberDiff line numberDiff line change
@@ -207,28 +207,64 @@ interface InboundEventProcessingPipelineBuilder {
207207
/**
208208
* Deserializes the event to JSON.
209209
*/
210-
InboundEventKeyJsonDetectorBuilder jsonDeserializer();
210+
InboundEventFilterJsonBuilder jsonDeserializer();
211211

212212
/**
213213
* Deserializes the event to XML.
214214
*/
215-
InboundEventKeyXmlDetectorBuilder xmlDeserializer();
215+
InboundEventFilterXmlBuilder xmlDeserializer();
216+
217+
/**
218+
* Uses a delegate expression to deserialize the event.
219+
*/
220+
InboundEventFilterBuilder delegateExpressionDeserializer(String delegateExpression);
221+
222+
/**
223+
* Uses a delegate expression to determine the custom {@link InboundEventProcessingPipeline} instance.
224+
*/
225+
InboundChannelModelBuilder eventProcessingPipeline(String delegateExpression);
226+
227+
}
228+
229+
/**
230+
* Builder for the filtering out inbound JSON events.
231+
* If events are filtered out, the pipeline processing will stop there and no subsequent steps will be executed.
232+
*/
233+
interface InboundEventFilterJsonBuilder extends InboundEventKeyJsonDetectorBuilder { // Extends because using filtering is optional
216234

217235
/**
218236
* Uses a delegate expression to filter the events before further processing.
219237
* The expression needs to resolve to a bean implementing the {@link org.flowable.eventregistry.api.InboundEventFilter} interface.
220238
*/
221-
InboundEventProcessingPipelineBuilder delegateExpressionEventFilter(String delegateExpression);
239+
InboundEventKeyJsonDetectorBuilder delegateExpressionEventFilter(String delegateExpression);
240+
241+
}
242+
243+
/**
244+
* Builder for the filtering out inbound XML events.
245+
* If events are filtered out, the pipeline processing will stop there and no subsequent steps will be executed.
246+
*/
247+
interface InboundEventFilterXmlBuilder extends InboundEventKeyXmlDetectorBuilder { // Extends because using filtering is optional
222248

223249
/**
224-
* Uses a delegate expression to deserialize the event.
250+
* Uses a delegate expression to filter the events before further processing.
251+
* The expression needs to resolve to a bean implementing the {@link org.flowable.eventregistry.api.InboundEventFilter} interface.
225252
*/
226-
InboundEventKeyDetectorBuilder delegateExpressionDeserializer(String delegateExpression);
253+
InboundEventKeyXmlDetectorBuilder delegateExpressionEventFilter(String delegateExpression);
254+
255+
}
256+
257+
/**
258+
* Builder for the filtering out inbound events.
259+
* If events are filtered out, the pipeline processing will stop there and no subsequent steps will be executed.
260+
*/
261+
interface InboundEventFilterBuilder extends InboundEventKeyDetectorBuilder { // Extends because using filtering is optional
227262

228263
/**
229-
* Uses a delegate expression to determine the custom {@link InboundEventProcessingPipeline} instance.
264+
* Uses a delegate expression to filter the events before further processing.
265+
* The expression needs to resolve to a bean implementing the {@link org.flowable.eventregistry.api.InboundEventFilter} interface.
230266
*/
231-
InboundChannelModelBuilder eventProcessingPipeline(String delegateExpression);
267+
InboundEventKeyDetectorBuilder delegateExpressionEventFilter(String delegateExpression);
232268

233269
}
234270

modules/flowable-event-registry/src/main/java/org/flowable/eventregistry/impl/model/InboundChannelDefinitionBuilderImpl.java

+54-12
Original file line numberDiff line numberDiff line change
@@ -320,47 +320,89 @@ public InboundEventProcessingPipelineBuilderImpl(InboundChannelModel channelMode
320320
}
321321

322322
@Override
323-
public InboundEventKeyJsonDetectorBuilder jsonDeserializer() {
323+
public InboundEventFilterJsonBuilder jsonDeserializer() {
324324
channelModel.setDeserializerType("json");
325325

326326
InboundEventProcessingPipelineBuilderImpl<JsonNode> jsonPipelineBuilder
327327
= new InboundEventProcessingPipelineBuilderImpl<>(channelModel, eventRepository, channelDefinitionBuilder);
328328
this.channelDefinitionBuilder.inboundEventProcessingPipelineBuilder = jsonPipelineBuilder;
329329

330-
return new InboundEventKeyJsonDetectorBuilderImpl(jsonPipelineBuilder);
330+
return new InboundEventFilterJsonBuilderImpl(jsonPipelineBuilder);
331331
}
332332

333333
@Override
334-
public InboundEventKeyXmlDetectorBuilder xmlDeserializer() {
334+
public InboundEventFilterXmlBuilder xmlDeserializer() {
335335
channelModel.setDeserializerType("xml");
336336
InboundEventProcessingPipelineBuilderImpl<Document> xmlPipelineBuilder
337337
= new InboundEventProcessingPipelineBuilderImpl<>(channelModel, eventRepository, channelDefinitionBuilder);
338338
this.channelDefinitionBuilder.inboundEventProcessingPipelineBuilder = xmlPipelineBuilder;
339339

340-
return new InboundEventKeyXmlDetectorBuilderImpl(xmlPipelineBuilder);
340+
return new InboundEventFilterXmlBuilderImpl(xmlPipelineBuilder);
341341
}
342342

343343
@Override
344-
public InboundEventProcessingPipelineBuilder delegateExpressionEventFilter(String delegateExpression) {
345-
channelModel.setEventFilterDelegateExpression(delegateExpression);
346-
return this;
347-
}
348-
349-
@Override
350-
public InboundEventKeyDetectorBuilder delegateExpressionDeserializer(String delegateExpression) {
344+
public InboundEventFilterBuilder delegateExpressionDeserializer(String delegateExpression) {
351345
channelModel.setDeserializerType("expression");
352346
channelModel.setDeserializerDelegateExpression(delegateExpression);
353347
InboundEventProcessingPipelineBuilderImpl customPipelineBuilder = new InboundEventProcessingPipelineBuilderImpl<>(channelModel,
354348
eventRepository, channelDefinitionBuilder);
355349
this.channelDefinitionBuilder.inboundEventProcessingPipelineBuilder = customPipelineBuilder;
356-
return new InboundEventDefinitionKeyDetectorBuilderImpl(customPipelineBuilder);
350+
return new InboundEventFilterBuilderImpl(customPipelineBuilder);
357351
}
358352

359353
@Override
360354
public InboundChannelModelBuilder eventProcessingPipeline(String delegateExpression) {
361355
this.channelModel.setPipelineDelegateExpression(delegateExpression);
362356
return channelDefinitionBuilder;
363357
}
358+
}
359+
360+
public static class InboundEventFilterJsonBuilderImpl extends InboundEventKeyJsonDetectorBuilderImpl implements InboundEventFilterJsonBuilder {
361+
362+
public InboundEventFilterJsonBuilderImpl(InboundEventProcessingPipelineBuilderImpl<JsonNode> inboundEventProcessingPipelineBuilder) {
363+
super(inboundEventProcessingPipelineBuilder);
364+
}
365+
366+
@Override
367+
public InboundEventKeyJsonDetectorBuilder delegateExpressionEventFilter(String delegateExpression) {
368+
inboundEventProcessingPipelineBuilder.channelModel.setEventFilterDelegateExpression(delegateExpression);
369+
return new InboundEventKeyJsonDetectorBuilderImpl(inboundEventProcessingPipelineBuilder);
370+
}
371+
372+
}
373+
374+
public static class InboundEventFilterXmlBuilderImpl extends InboundEventKeyXmlDetectorBuilderImpl implements InboundEventFilterXmlBuilder {
375+
376+
public InboundEventFilterXmlBuilderImpl(InboundEventProcessingPipelineBuilderImpl<Document> inboundEventProcessingPipelineBuilder) {
377+
super(inboundEventProcessingPipelineBuilder);
378+
}
379+
380+
@Override
381+
public InboundEventKeyXmlDetectorBuilder delegateExpressionEventFilter(String delegateExpression) {
382+
inboundEventProcessingPipelineBuilder.channelModel.setEventFilterDelegateExpression(delegateExpression);
383+
return new InboundEventKeyXmlDetectorBuilderImpl(inboundEventProcessingPipelineBuilder);
384+
}
385+
386+
}
387+
388+
public static class InboundEventFilterBuilderImpl implements InboundEventFilterBuilder {
389+
390+
protected InboundEventProcessingPipelineBuilderImpl inboundEventProcessingPipelineBuilder;
391+
392+
public InboundEventFilterBuilderImpl(InboundEventProcessingPipelineBuilderImpl inboundEventProcessingPipelineBuilder) {
393+
this.inboundEventProcessingPipelineBuilder = inboundEventProcessingPipelineBuilder;
394+
}
395+
396+
@Override
397+
public InboundEventKeyDetectorBuilder delegateExpressionEventFilter(String delegateExpression) {
398+
inboundEventProcessingPipelineBuilder.channelModel.setEventFilterDelegateExpression(delegateExpression);
399+
return new InboundEventDefinitionKeyDetectorBuilderImpl(inboundEventProcessingPipelineBuilder);
400+
}
401+
402+
@Override
403+
public InboundEventTenantDetectorBuilder delegateExpressionKeyDetector(String delegateExpression) {
404+
return null;
405+
}
364406

365407
}
366408

modules/flowable-event-registry/src/main/java/org/flowable/eventregistry/impl/pipeline/DefaultInboundEventProcessingPipeline.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,12 @@ public Collection<EventRegistryEvent> run(InboundChannelModel inboundChannel, In
8181

8282
FlowableEventInfo<T> event = new FlowableEventInfoImpl<>(inboundEvent, deserializedBody, inboundChannel);
8383

84-
// if there is a custom filter in place, invoke it to filter the event for further processing or to abort the pipeline
84+
// if there is a custom filter in place, invoke it to retain only the events that are wanted or to abort the pipeline
8585
if (inboundEventFilter != null) {
86-
if (!inboundEventFilter.filter(event)) {
86+
if (!inboundEventFilter.retain(event)) {
87+
if (debugLoggingEnabled) {
88+
logger.debug("Inbound event {} on inbound {} channel {} was filtered out.", inboundEvent, inboundChannel.getChannelType(), inboundChannel.getKey());
89+
}
8790
return Collections.emptyList();
8891
}
8992
}

modules/flowable-event-registry/src/test/java/org/flowable/eventregistry/test/CustomEventProcessingPipelineTest.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import org.flowable.eventregistry.api.EventRegistry;
2525
import org.flowable.eventregistry.api.EventRegistryEvent;
26+
import org.flowable.eventregistry.api.FlowableEventInfo;
2627
import org.flowable.eventregistry.api.InboundEvent;
2728
import org.flowable.eventregistry.api.InboundEventChannelAdapter;
2829
import org.flowable.eventregistry.api.InboundEventDeserializer;
@@ -94,8 +95,8 @@ public void testCustomInboundPipelineComponentsInvoked() {
9495
.key("customTestChannel")
9596
.resourceName("customTest.channel")
9697
.channelAdapter("${testInboundChannelAdapter}")
97-
.delegateExpressionEventFilter("${testInboundEventFilter}")
9898
.delegateExpressionDeserializer("${testInboundEventDeserializer}")
99+
.delegateExpressionEventFilter("${testInboundEventFilter}")
99100
.delegateExpressionKeyDetector("${testInboundEventKeyDetector}")
100101
.delegateExpressionTenantDetector("${testInboundEventTenantDetector}")
101102
.payloadExtractor("${testInboundEventPayloadExtractor}")
@@ -143,8 +144,8 @@ public void testCustomInboundEventDroppingFilter() {
143144
.key("customTestChannel")
144145
.resourceName("customTest.channel")
145146
.channelAdapter("${testInboundChannelAdapter}")
146-
.delegateExpressionEventFilter("${testInboundEventFilter}")
147147
.delegateExpressionDeserializer("${testInboundEventDeserializer}")
148+
.delegateExpressionEventFilter("${testInboundEventFilter}")
148149
.delegateExpressionKeyDetector("${testInboundEventKeyDetector}")
149150
.delegateExpressionTenantDetector("${testInboundEventTenantDetector}")
150151
.payloadExtractor("${testInboundEventPayloadExtractor}")
@@ -408,7 +409,7 @@ private TestInboundEventFilter(boolean filter) {
408409
}
409410

410411
@Override
411-
public boolean filter(String payload) {
412+
public boolean retain(FlowableEventInfo<String> event) {
412413
counter.incrementAndGet();
413414
return filter;
414415
}

0 commit comments

Comments
 (0)