Skip to content

chore: log tweaks, retry cancels, add options.toBuidler #1276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
/**
* FlagdOptions is a builder to build flagd provider options.
*/
@Builder
@Builder(toBuilder = true)
@Getter
@SuppressWarnings("PMD.TooManyStaticImports")
public class FlagdOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void waitForInitialization(long deadline) {
// if wait(0) is called, the thread would wait forever, so we abort when this would happen
if (now >= end) {
throw new GeneralError(String.format(
"Deadline exceeded. Condition did not complete within the %d ms deadline", deadline));
"Initialization timeout exceeded; did not complete within the %d ms deadline.", deadline));
}
long remaining = end - now;
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ public class ChannelBuilder {
"retryableStatusCodes",
Arrays.asList(
/*
* All codes are retryable except OK, CANCELLED and DEADLINE_EXCEEDED since
* All codes are retryable except OK and DEADLINE_EXCEEDED since
* any others not listed here cause a very tight loop of retries.
* CANCELLED is not retryable because it is a client-side termination.
* DEADLINE_EXCEEDED is typically a result of a client specified deadline,
* and definitionally should not result in a tight loop (it's a timeout).
*/
Code.CANCELLED.toString(),
Code.UNKNOWN.toString(),
Code.INVALID_ARGUMENT.toString(),
Code.NOT_FOUND.toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public ChannelConnector(final FlagdOptions options, final Consumer<FlagdProvider
* @throws Exception if the channel does not reach the desired state within the deadline
*/
public void initialize() throws Exception {
log.info("Initializing GRPC connection...");
log.info("Initializing GRPC connection.");
monitorChannelState(ConnectivityState.READY);
}

Expand All @@ -80,7 +80,7 @@ public void initialize() throws Exception {
* @throws InterruptedException if interrupted while waiting for termination
*/
public void shutdown() throws InterruptedException {
log.info("Shutting down GRPC connection...");
log.info("Shutting down GRPC connection.");

if (!channel.isShutdown()) {
channel.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class QueueingStreamObserver<T> implements StreamObserver<T> {

public QueueingStreamObserver(final BlockingQueue<StreamResponseModel<T>> queue) {
blockingQueue = queue;
queue.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private void observeSyncStream() throws InterruptedException {
// create a context which exists to track and cancel the stream
try (CancellableContext context = Context.current().withCancellation()) {

restart(); // start the stream within the context
restart(); // start the stream with the context

// TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584
if (!syncMetadataDisabled) {
Expand All @@ -150,16 +150,18 @@ private void observeSyncStream() throws InterruptedException {
while (!shutdown.get() && !Context.current().isCancelled()) {
final StreamResponseModel<SyncFlagsResponse> taken = incomingQueue.take();
if (taken.isComplete()) {
log.debug("Sync stream completed, will reconnect");
log.debug("Sync stream completed, will restart");
// The stream is complete, we still try to reconnect
break;
}

Throwable streamException = taken.getError();
if (streamException != null) {
log.debug("Exception in GRPC connection, streamException {}, will reconnect", streamException);
log.debug("Exception in stream RPC, streamException {}, will restart", streamException);
if (!outgoingQueue.offer(new QueuePayload(
QueuePayloadType.ERROR, "Error from stream or metadata", metadataResponse))) {
QueuePayloadType.ERROR,
String.format("Error from stream: %s", streamException.getMessage()),
metadataResponse))) {
log.error("Failed to convey ERROR status, queue is full");
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class SyncStreamQueueSourceTest {
@BeforeEach
public void init() throws Exception {
blockingStub = mock(FlagSyncServiceBlockingStub.class);
when(blockingStub.withDeadlineAfter(anyLong(), any())).thenReturn(blockingStub);
when(blockingStub.getMetadata(any())).thenReturn(GetMetadataResponse.getDefaultInstance());

mockConnector = mock(ChannelConnector.class);
Expand Down
Loading