Skip to content

Commit 4fa5115

Browse files
committed
Handle HTTP binding error. (#1024)
Signed-off-by: Artur Souza <[email protected]> Update binding http IT Signed-off-by: Artur Souza <[email protected]>
1 parent 9ee2633 commit 4fa5115

File tree

9 files changed

+432
-44
lines changed

9 files changed

+432
-44
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
apiVersion: dapr.io/v1alpha1
2+
kind: Component
3+
metadata:
4+
name: github-http-binding-404
5+
spec:
6+
type: bindings.http
7+
version: v1
8+
metadata:
9+
- name: url
10+
value: https://api.github.com/unknown_path
11+
scopes:
12+
- bindingit-httpoutputbinding-exception
13+
---
14+
apiVersion: dapr.io/v1alpha1
15+
kind: Component
16+
metadata:
17+
name: github-http-binding-404-success
18+
spec:
19+
type: bindings.http
20+
version: v1
21+
metadata:
22+
- name: url
23+
value: https://api.github.com/unknown_path
24+
- name: errorIfNot2XX
25+
value: "false"
26+
scopes:
27+
- bindingit-httpoutputbinding-ignore-error

sdk-tests/deploy/local-test.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@ services:
66
ZOOKEEPER_CLIENT_PORT: 2181
77
ZOOKEEPER_TICK_TIME: 2000
88
ports:
9-
- 22181:2181
9+
- 2181:2181
1010

1111
kafka:
1212
image: confluentinc/cp-kafka:7.4.4
1313
depends_on:
1414
- zookeeper
1515
ports:
16-
- 9092:9092
16+
- "9092:9092"
1717
environment:
1818
KAFKA_BROKER_ID: 1
1919
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 The Dapr Authors
2+
* Copyright 2024 The Dapr Authors
33
* Licensed under the Apache License, Version 2.0 (the "License");
44
* you may not use this file except in compliance with the License.
55
* You may obtain a copy of the License at
@@ -17,6 +17,7 @@
1717
import io.dapr.client.DaprClient;
1818
import io.dapr.client.DaprClientBuilder;
1919
import io.dapr.client.domain.HttpExtension;
20+
import io.dapr.exceptions.DaprException;
2021
import io.dapr.it.BaseIT;
2122
import io.dapr.it.DaprRun;
2223
import org.junit.jupiter.api.Test;
@@ -26,22 +27,58 @@
2627

2728
import static io.dapr.it.Retry.callWithRetry;
2829
import static org.junit.jupiter.api.Assertions.assertEquals;
30+
import static org.junit.jupiter.api.Assertions.assertTrue;
2931
import static org.junit.jupiter.api.Assertions.fail;
3032

3133
/**
3234
* Service for input and output binding example.
3335
*/
3436
public class BindingIT extends BaseIT {
3537

36-
private static final String BINDING_NAME = "sample123";
37-
38-
private static final String BINDING_OPERATION = "create";
39-
40-
public static class MyClass {
41-
public MyClass() {
38+
@Test
39+
public void httpOutputBindingError() throws Exception {
40+
startDaprApp(
41+
this.getClass().getSimpleName() + "-httpoutputbinding-exception",
42+
60000);
43+
try(DaprClient client = new DaprClientBuilder().build()) {
44+
// Validate error message
45+
callWithRetry(() -> {
46+
System.out.println("Checking exception handling for output binding ...");
47+
try {
48+
client.invokeBinding("github-http-binding-404", "get", "").block();
49+
fail("Should throw an exception");
50+
} catch (DaprException e) {
51+
assertEquals(404, e.getHttpStatusCode());
52+
// This HTTP binding did not set `errorIfNot2XX` to false in component metadata, so the error payload is not
53+
// consistent between HTTP and gRPC.
54+
assertTrue(new String(e.getPayload()).contains(
55+
"error invoking output binding github-http-binding-404: received status code 404"));
56+
}
57+
}, 10000);
4258
}
59+
}
4360

44-
public String message;
61+
@Test
62+
public void httpOutputBindingErrorIgnoredByComponent() throws Exception {
63+
startDaprApp(
64+
this.getClass().getSimpleName() + "-httpoutputbinding-ignore-error",
65+
60000);
66+
try(DaprClient client = new DaprClientBuilder().build()) {
67+
// Validate error message
68+
callWithRetry(() -> {
69+
System.out.println("Checking exception handling for output binding ...");
70+
try {
71+
client.invokeBinding("github-http-binding-404-success", "get", "").block();
72+
fail("Should throw an exception");
73+
} catch (DaprException e) {
74+
assertEquals(404, e.getHttpStatusCode());
75+
// The HTTP binding must set `errorIfNot2XX` to false in component metadata for the error payload to be
76+
// consistent between HTTP and gRPC.
77+
assertTrue(new String(e.getPayload()).contains("\"message\":\"Not Found\""));
78+
assertTrue(new String(e.getPayload()).contains("\"documentation_url\":\"https://docs.github.com/rest\""));
79+
}
80+
}, 10000);
81+
}
4582
}
4683

4784
@Test
@@ -53,11 +90,13 @@ public void inputOutputBinding() throws Exception {
5390
true,
5491
60000);
5592

93+
var bidingName = "sample123";
94+
5695
try(DaprClient client = new DaprClientBuilder().build()) {
5796
callWithRetry(() -> {
5897
System.out.println("Checking if input binding is up before publishing events ...");
5998
client.invokeBinding(
60-
BINDING_NAME, BINDING_OPERATION, "ping").block();
99+
bidingName, "create", "ping").block();
61100

62101
try {
63102
Thread.sleep(1000);
@@ -76,14 +115,14 @@ public void inputOutputBinding() throws Exception {
76115

77116
System.out.println("sending first message");
78117
client.invokeBinding(
79-
BINDING_NAME, BINDING_OPERATION, myClass, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();
118+
bidingName, "create", myClass, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();
80119

81120
// This is an example of sending a plain string. The input binding will receive
82121
// cat
83122
final String m = "cat";
84123
System.out.println("sending " + m);
85124
client.invokeBinding(
86-
BINDING_NAME, BINDING_OPERATION, m, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();
125+
bidingName, "create", m, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();
87126

88127
// Metadata is not used by Kafka component, so it is not possible to validate.
89128
callWithRetry(() -> {
@@ -115,4 +154,11 @@ public void inputOutputBinding() throws Exception {
115154
}, 8000);
116155
}
117156
}
157+
158+
public static class MyClass {
159+
public MyClass() {
160+
}
161+
162+
public String message;
163+
}
118164
}

sdk/src/main/java/io/dapr/client/DaprClientImpl.java

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
5757
import io.dapr.client.resiliency.ResiliencyOptions;
5858
import io.dapr.exceptions.DaprException;
59+
import io.dapr.internal.exceptions.DaprHttpException;
5960
import io.dapr.internal.grpc.DaprClientGrpcInterceptors;
6061
import io.dapr.internal.resiliency.RetryPolicy;
6162
import io.dapr.internal.resiliency.TimeoutPolicy;
@@ -76,6 +77,7 @@
7677
import io.dapr.v1.DaprProtos.RegisteredComponents;
7778
import io.grpc.CallOptions;
7879
import io.grpc.Channel;
80+
import io.grpc.Metadata;
7981
import io.grpc.stub.AbstractStub;
8082
import io.grpc.stub.StreamObserver;
8183
import reactor.core.publisher.Flux;
@@ -99,6 +101,10 @@
99101
import java.util.function.Function;
100102
import java.util.stream.Collectors;
101103

104+
import static io.dapr.internal.exceptions.DaprHttpException.isSuccessfulHttpStatusCode;
105+
import static io.dapr.internal.exceptions.DaprHttpException.isValidHttpStatusCode;
106+
import static io.dapr.internal.exceptions.DaprHttpException.parseHttpStatusCode;
107+
102108
/**
103109
* Implementation of the Dapr client combining gRPC and HTTP (when applicable).
104110
*
@@ -486,12 +492,22 @@ public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type)
486492
}
487493
DaprProtos.InvokeBindingRequest envelope = builder.build();
488494

495+
Metadata responseMetadata = new Metadata();
489496
return Mono.deferContextual(
490497
context -> this.<DaprProtos.InvokeBindingResponse>createMono(
491-
it -> intercept(context, asyncStub).invokeBinding(envelope, it)
498+
responseMetadata,
499+
it -> intercept(context, asyncStub, m -> responseMetadata.merge(m)).invokeBinding(envelope, it)
492500
)
493501
).flatMap(
494502
it -> {
503+
int httpStatusCode =
504+
parseHttpStatusCode(it.getMetadataMap().getOrDefault("statusCode", ""));
505+
if (isValidHttpStatusCode(httpStatusCode) && !isSuccessfulHttpStatusCode(httpStatusCode)) {
506+
// Exception condition in a successful request.
507+
// This is useful to send an exception due to an error from the HTTP binding component.
508+
throw DaprException.propagate(new DaprHttpException(httpStatusCode, it.getData().toByteArray()));
509+
}
510+
495511
try {
496512
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type));
497513
} catch (IOException e) {
@@ -1201,17 +1217,39 @@ private DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub clien
12011217
return DaprClientGrpcInterceptors.intercept(client, this.timeoutPolicy, context);
12021218
}
12031219

1220+
/**
1221+
* Populates GRPC client with interceptors for telemetry.
1222+
*
1223+
* @param context Reactor's context.
1224+
* @param client GRPC client for Dapr.
1225+
* @param metadataConsumer Consumer of gRPC metadata.
1226+
* @return Client after adding interceptors.
1227+
*/
1228+
private DaprGrpc.DaprStub intercept(
1229+
ContextView context, DaprGrpc.DaprStub client, Consumer<Metadata> metadataConsumer) {
1230+
return DaprClientGrpcInterceptors.intercept(client, this.timeoutPolicy, context, metadataConsumer);
1231+
}
1232+
12041233
private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {
1234+
return this.createMono(null, consumer);
1235+
}
1236+
1237+
private <T> Mono<T> createMono(Metadata metadata, Consumer<StreamObserver<T>> consumer) {
12051238
return retryPolicy.apply(
1206-
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
1239+
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(
1240+
createStreamObserver(sink, metadata))).run()));
12071241
}
12081242

12091243
private <T> Flux<T> createFlux(Consumer<StreamObserver<T>> consumer) {
1244+
return this.createFlux(null, consumer);
1245+
}
1246+
1247+
private <T> Flux<T> createFlux(Metadata metadata, Consumer<StreamObserver<T>> consumer) {
12101248
return retryPolicy.apply(
1211-
Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
1249+
Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink, metadata))).run()));
12121250
}
12131251

1214-
private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink) {
1252+
private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink, Metadata grpcMetadata) {
12151253
return new StreamObserver<T>() {
12161254
@Override
12171255
public void onNext(T value) {
@@ -1220,7 +1258,7 @@ public void onNext(T value) {
12201258

12211259
@Override
12221260
public void onError(Throwable t) {
1223-
sink.error(DaprException.propagate(new ExecutionException(t)));
1261+
sink.error(DaprException.propagate(DaprHttpException.fromGrpcExecutionException(grpcMetadata, t)));
12241262
}
12251263

12261264
@Override
@@ -1230,7 +1268,7 @@ public void onCompleted() {
12301268
};
12311269
}
12321270

1233-
private <T> StreamObserver<T> createStreamObserver(FluxSink<T> sink) {
1271+
private <T> StreamObserver<T> createStreamObserver(FluxSink<T> sink, final Metadata grpcMetadata) {
12341272
return new StreamObserver<T>() {
12351273
@Override
12361274
public void onNext(T value) {
@@ -1239,7 +1277,7 @@ public void onNext(T value) {
12391277

12401278
@Override
12411279
public void onError(Throwable t) {
1242-
sink.error(DaprException.propagate(new ExecutionException(t)));
1280+
sink.error(DaprException.propagate(DaprHttpException.fromGrpcExecutionException(grpcMetadata, t)));
12431281
}
12441282

12451283
@Override

sdk/src/main/java/io/dapr/client/DaprHttp.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.dapr.config.Properties;
1919
import io.dapr.exceptions.DaprError;
2020
import io.dapr.exceptions.DaprException;
21+
import io.dapr.internal.exceptions.DaprHttpException;
2122
import io.dapr.utils.Version;
2223
import okhttp3.Call;
2324
import okhttp3.Callback;
@@ -381,17 +382,19 @@ public void onFailure(Call call, IOException e) {
381382

382383
@Override
383384
public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) throws IOException {
384-
if (!response.isSuccessful()) {
385+
int httpStatusCode = parseHttpStatusCode(response.header("Metadata.statuscode"), response.code());
386+
if (!DaprHttpException.isSuccessfulHttpStatusCode(httpStatusCode)) {
385387
try {
386388
byte[] payload = getBodyBytesOrEmptyArray(response);
387389
DaprError error = parseDaprError(payload);
390+
388391
if (error != null) {
389-
future.completeExceptionally(new DaprException(error, payload, response.code()));
392+
future.completeExceptionally(new DaprException(error, payload, httpStatusCode));
390393
return;
391394
}
392395

393396
future.completeExceptionally(
394-
new DaprException("UNKNOWN", "", payload, response.code()));
397+
new DaprException("UNKNOWN", "", payload, httpStatusCode));
395398
return;
396399
} catch (DaprException e) {
397400
future.completeExceptionally(e);
@@ -404,8 +407,24 @@ public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) t
404407
response.headers().forEach(pair -> {
405408
mapHeaders.put(pair.getFirst(), pair.getSecond());
406409
});
407-
future.complete(new Response(result, mapHeaders, response.code()));
410+
future.complete(new Response(result, mapHeaders, httpStatusCode));
408411
}
409412
}
410413

414+
private static int parseHttpStatusCode(String headerValue, int defaultStatusCode) {
415+
if ((headerValue == null) || headerValue.isEmpty()) {
416+
return defaultStatusCode;
417+
}
418+
419+
// Metadata used to override status code with code received from HTTP binding.
420+
try {
421+
int httpStatusCode = Integer.parseInt(headerValue);
422+
if (DaprHttpException.isValidHttpStatusCode(httpStatusCode)) {
423+
return httpStatusCode;
424+
}
425+
return defaultStatusCode;
426+
} catch (NumberFormatException nfe) {
427+
return defaultStatusCode;
428+
}
429+
}
411430
}

0 commit comments

Comments
 (0)