45
45
@ NotThreadSafe
46
46
public class MqttIncomingPublishFlows {
47
47
48
- private final @ NotNull MqttSubscriptionFlows subscriptionFlows ;
48
+ private final @ NotNull MqttSubscribedPublishFlows subscribedFlows ;
49
49
private final @ Nullable HandleList <MqttGlobalIncomingPublishFlow > @ NotNull [] globalFlows ;
50
50
51
51
@ Inject
52
- MqttIncomingPublishFlows (final @ NotNull MqttSubscriptionFlows subscriptionFlows ) {
53
- this .subscriptionFlows = subscriptionFlows ;
52
+ MqttIncomingPublishFlows (final @ NotNull MqttSubscribedPublishFlows subscribedFlows ) {
53
+ this .subscribedFlows = subscribedFlows ;
54
54
//noinspection unchecked
55
55
globalFlows = new HandleList [MqttGlobalPublishFilter .values ().length ];
56
56
}
@@ -62,7 +62,7 @@ public void subscribe(
62
62
final ImmutableList <MqttSubscription > subscriptions = subscribe .getSubscriptions ();
63
63
//noinspection ForLoopReplaceableByForEach
64
64
for (int i = 0 ; i < subscriptions .size (); i ++) {
65
- subscriptionFlows .subscribe (subscriptions .get (i ), subscriptionIdentifier , flow );
65
+ subscribedFlows .subscribe (subscriptions .get (i ), subscriptionIdentifier , flow );
66
66
}
67
67
}
68
68
@@ -73,7 +73,7 @@ public void subAck(
73
73
final ImmutableList <MqttSubscription > subscriptions = subscribe .getSubscriptions ();
74
74
final boolean countNotMatching = subscriptions .size () > reasonCodes .size ();
75
75
for (int i = 0 ; i < subscriptions .size (); i ++) {
76
- subscriptionFlows .suback (subscriptions .get (i ).getTopicFilter (), subscriptionIdentifier ,
76
+ subscribedFlows .suback (subscriptions .get (i ).getTopicFilter (), subscriptionIdentifier ,
77
77
countNotMatching || reasonCodes .get (i ).isError ());
78
78
}
79
79
}
@@ -86,13 +86,13 @@ public void unsubscribe(
86
86
final boolean allSuccess = reasonCodes == Mqtt3UnsubAckView .REASON_CODES_ALL_SUCCESS ;
87
87
for (int i = 0 ; i < topicFilters .size (); i ++) {
88
88
if (allSuccess || !reasonCodes .get (i ).isError ()) {
89
- subscriptionFlows .unsubscribe (topicFilters .get (i ));
89
+ subscribedFlows .unsubscribe (topicFilters .get (i ));
90
90
}
91
91
}
92
92
}
93
93
94
94
void cancel (final @ NotNull MqttSubscribedPublishFlow flow ) {
95
- subscriptionFlows .cancel (flow );
95
+ subscribedFlows .cancel (flow );
96
96
}
97
97
98
98
public void subscribeGlobal (final @ NotNull MqttGlobalIncomingPublishFlow flow ) {
@@ -126,7 +126,7 @@ void cancelGlobal(final @NotNull MqttGlobalIncomingPublishFlow flow) {
126
126
void findMatching (
127
127
final @ NotNull MqttStatefulPublish publish , final @ NotNull MqttMatchingPublishFlows matchingFlows ) {
128
128
129
- subscriptionFlows .findMatching (publish .stateless ().getTopic (), matchingFlows );
129
+ subscribedFlows .findMatching (publish .stateless ().getTopic (), matchingFlows );
130
130
if (matchingFlows .subscriptionFound ) {
131
131
add (matchingFlows , globalFlows [MqttGlobalPublishFilter .SUBSCRIBED .ordinal ()]);
132
132
} else {
@@ -150,7 +150,7 @@ private static void add(
150
150
}
151
151
152
152
public void clear (final @ NotNull Throwable cause ) {
153
- subscriptionFlows .clear (cause );
153
+ subscribedFlows .clear (cause );
154
154
for (int i = 0 ; i < globalFlows .length ; i ++) {
155
155
final HandleList <MqttGlobalIncomingPublishFlow > globalFlow = globalFlows [i ];
156
156
if (globalFlow != null ) {
@@ -163,6 +163,6 @@ public void clear(final @NotNull Throwable cause) {
163
163
}
164
164
165
165
public @ NotNull Map <@ NotNull Integer , @ NotNull List <@ NotNull MqttSubscription >> getSubscriptions () {
166
- return subscriptionFlows .getSubscriptions ();
166
+ return subscribedFlows .getSubscriptions ();
167
167
}
168
168
}
0 commit comments