20
20
import java .util .concurrent .atomic .AtomicBoolean ;
21
21
22
22
/**
23
- * Implements the {@link Connector} contract and emit flags obtained from flagd sync gRPC contract.
23
+ * Implements the {@link Connector} contract and emit flags obtained from flagd
24
+ * sync gRPC contract.
24
25
*/
25
26
@ Slf4j
26
- @ SuppressFBWarnings (value = {"PREDICTABLE_RANDOM" , "EI_EXPOSE_REP" } ,
27
- justification = "Random is used to generate a variation & flag configurations require exposing" )
27
+ @ SuppressFBWarnings (value = { "PREDICTABLE_RANDOM" ,
28
+ "EI_EXPOSE_REP" }, justification = "Random is used to generate a variation & flag configurations require exposing" )
28
29
public class GrpcStreamConnector implements Connector {
29
30
private static final Random RANDOM = new Random ();
30
31
@@ -111,40 +112,47 @@ public void shutdown() throws InterruptedException {
111
112
* Contains blocking calls, to be used concurrently.
112
113
*/
113
114
static void observeEventStream (final BlockingQueue <StreamPayload > writeTo ,
114
- final AtomicBoolean shutdown ,
115
- final FlagSyncServiceStub serviceStub ,
116
- final SyncFlagsRequest request )
115
+ final AtomicBoolean shutdown ,
116
+ final FlagSyncServiceStub serviceStub ,
117
+ final SyncFlagsRequest request )
117
118
throws InterruptedException {
118
119
119
120
final BlockingQueue <GrpcResponseModel > streamReceiver = new LinkedBlockingQueue <>(QUEUE_SIZE );
120
121
int retryDelay = INIT_BACK_OFF ;
121
122
123
+ log .info ("Initializing sync stream observer" );
124
+
122
125
while (!shutdown .get ()) {
126
+ log .debug ("Initializing sync stream request" );
123
127
serviceStub .syncFlags (request , new GrpcStreamHandler (streamReceiver ));
124
128
125
129
while (!shutdown .get ()) {
126
130
final GrpcResponseModel response = streamReceiver .take ();
127
131
128
132
if (response .isComplete ()) {
129
- // The stream is complete. This is not considered as an error
133
+ log .warn ("Stream completed" );
134
+ // The stream is complete, this isn't really an error but we should try to
135
+ // reconnect
130
136
break ;
131
137
}
132
138
133
139
if (response .getError () != null ) {
134
- log .warn (String .format ("Error from grpc connection, retrying in %dms" , retryDelay ),
140
+ log .error (String .format ("Error from grpc connection, retrying in %dms" , retryDelay ),
135
141
response .getError ());
136
142
137
143
if (!writeTo .offer (
138
144
new StreamPayload (StreamPayloadType .ERROR , "Error from stream connection, retrying" ))) {
139
- log .warn ("Failed to convey ERROR satus, queue is full" );
145
+ log .error ("Failed to convey ERROR satus, queue is full" );
140
146
}
141
147
break ;
142
148
}
143
149
144
150
final SyncFlagsResponse flagsResponse = response .getSyncFlagsResponse ();
151
+ String data = flagsResponse .getFlagConfiguration ();
152
+ log .debug ("Got stream response: " + data );
145
153
if (!writeTo .offer (
146
- new StreamPayload (StreamPayloadType .DATA , flagsResponse . getFlagConfiguration () ))) {
147
- log .warn ("Stream writing failed" );
154
+ new StreamPayload (StreamPayloadType .DATA , data ))) {
155
+ log .error ("Stream writing failed" );
148
156
}
149
157
150
158
// reset retry delay if we succeeded in a retry attempt
@@ -158,6 +166,7 @@ static void observeEventStream(final BlockingQueue<StreamPayload> writeTo,
158
166
}
159
167
160
168
// busy wait till next attempt
169
+ log .warn (String .format ("Stream failed, retrying in %dms" , retryDelay ));
161
170
Thread .sleep (retryDelay + RANDOM .nextInt (INIT_BACK_OFF ));
162
171
163
172
if (retryDelay < MAX_BACK_OFF ) {
0 commit comments