50
50
import io .dapr .client .resiliency .ResiliencyOptions ;
51
51
import io .dapr .config .Properties ;
52
52
import io .dapr .exceptions .DaprException ;
53
- import io .dapr .internal .opencensus . GrpcWrapper ;
53
+ import io .dapr .internal .exceptions . DaprHttpException ;
54
54
import io .dapr .internal .resiliency .RetryPolicy ;
55
55
import io .dapr .internal .resiliency .TimeoutPolicy ;
56
56
import io .dapr .serializer .DaprObjectSerializer ;
65
65
import io .grpc .ClientCall ;
66
66
import io .grpc .ClientInterceptor ;
67
67
import io .grpc .ForwardingClientCall ;
68
+ import io .grpc .ForwardingClientCallListener ;
68
69
import io .grpc .Metadata ;
69
70
import io .grpc .MethodDescriptor ;
70
71
import io .grpc .stub .StreamObserver ;
81
82
import java .util .Iterator ;
82
83
import java .util .List ;
83
84
import java .util .Map ;
84
- import java .util .concurrent .ExecutionException ;
85
85
import java .util .function .Consumer ;
86
86
import java .util .stream .Collectors ;
87
87
88
+ import static io .dapr .internal .exceptions .DaprHttpException .isSuccessfulHttpStatusCode ;
89
+ import static io .dapr .internal .exceptions .DaprHttpException .isValidHttpStatusCode ;
90
+ import static io .dapr .internal .exceptions .DaprHttpException .parseHttpStatusCode ;
91
+ import static io .dapr .internal .opencensus .GrpcWrapper .appendTracingToMetadata ;
92
+
88
93
/**
89
94
* An adapter for the GRPC Client.
90
95
*
@@ -351,6 +356,7 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
351
356
*/
352
357
@ Override
353
358
public <T > Mono <T > invokeBinding (InvokeBindingRequest request , TypeRef <T > type ) {
359
+ Metadata responseMetadata = new Metadata ();
354
360
try {
355
361
final String name = request .getName ();
356
362
final String operation = request .getOperation ();
@@ -377,10 +383,19 @@ public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type)
377
383
378
384
return Mono .deferContextual (
379
385
context -> this .<DaprProtos .InvokeBindingResponse >createMono (
380
- it -> intercept (context , asyncStub ).invokeBinding (envelope , it )
386
+ responseMetadata ,
387
+ it -> intercept (context , asyncStub , m -> responseMetadata .merge (m )).invokeBinding (envelope , it )
381
388
)
382
389
).flatMap (
383
390
it -> {
391
+ int httpStatusCode =
392
+ parseHttpStatusCode (it .getMetadataMap ().getOrDefault ("statusCode" , "" ));
393
+ if (isValidHttpStatusCode (httpStatusCode ) && !isSuccessfulHttpStatusCode (httpStatusCode )) {
394
+ // Exception condition in a successful request.
395
+ // This is useful to send an exception due to an error from the HTTP binding component.
396
+ throw DaprException .propagate (new DaprHttpException (httpStatusCode , it .getData ().toByteArray ()));
397
+ }
398
+
384
399
try {
385
400
return Mono .justOrEmpty (objectSerializer .deserialize (it .getData ().toByteArray (), type ));
386
401
} catch (IOException e ) {
@@ -1155,21 +1170,74 @@ public void start(final Listener<RespT> responseListener, final Metadata metadat
1155
1170
* @param client GRPC client for Dapr.
1156
1171
* @return Client after adding interceptors.
1157
1172
*/
1158
- private static DaprGrpc .DaprStub intercept (ContextView context , DaprGrpc .DaprStub client ) {
1159
- return GrpcWrapper .intercept (context , client );
1173
+ private static DaprGrpc .DaprStub intercept (
1174
+ ContextView context ,
1175
+ DaprGrpc .DaprStub client ) {
1176
+ return intercept (context , client , null );
1177
+ }
1178
+
1179
+ /**
1180
+ * Populates GRPC client with interceptors for telemetry - internal use only.
1181
+ *
1182
+ * @param context Reactor's context.
1183
+ * @param client GRPC client for Dapr.
1184
+ * @param metadataConsumer Handles metadata result.
1185
+ * @return Client after adding interceptors.
1186
+ */
1187
+ public static DaprGrpc .DaprStub intercept (
1188
+ ContextView context ,
1189
+ DaprGrpc .DaprStub client ,
1190
+ Consumer <Metadata > metadataConsumer ) {
1191
+ ClientInterceptor interceptor = new ClientInterceptor () {
1192
+ @ Override
1193
+ public <ReqT , RespT > ClientCall <ReqT , RespT > interceptCall (
1194
+ MethodDescriptor <ReqT , RespT > methodDescriptor ,
1195
+ CallOptions options ,
1196
+ Channel channel ) {
1197
+ ClientCall <ReqT , RespT > clientCall = channel .newCall (methodDescriptor , options );
1198
+ return new ForwardingClientCall .SimpleForwardingClientCall <>(clientCall ) {
1199
+ @ Override
1200
+ public void start (final Listener <RespT > responseListener , final Metadata metadata ) {
1201
+ appendTracingToMetadata (context , metadata );
1202
+
1203
+ final ClientCall .Listener <RespT > headerListener =
1204
+ new ForwardingClientCallListener .SimpleForwardingClientCallListener <>(responseListener ) {
1205
+ @ Override
1206
+ public void onHeaders (Metadata headers ) {
1207
+ responseListener .onHeaders (headers );
1208
+ if (metadataConsumer != null ) {
1209
+ metadataConsumer .accept (headers );
1210
+ }
1211
+ }
1212
+ };
1213
+ super .start (headerListener , metadata );
1214
+ }
1215
+ };
1216
+ }
1217
+ };
1218
+ return client .withInterceptors (interceptor );
1160
1219
}
1161
1220
1162
1221
private <T > Mono <T > createMono (Consumer <StreamObserver <T >> consumer ) {
1222
+ return this .createMono (null , consumer );
1223
+ }
1224
+
1225
+ private <T > Mono <T > createMono (Metadata metadata , Consumer <StreamObserver <T >> consumer ) {
1163
1226
return retryPolicy .apply (
1164
- Mono .create (sink -> DaprException .wrap (() -> consumer .accept (createStreamObserver (sink ))).run ()));
1227
+ Mono .create (sink -> DaprException .wrap (() -> consumer .accept (
1228
+ createStreamObserver (sink , metadata ))).run ()));
1165
1229
}
1166
1230
1167
1231
private <T > Flux <T > createFlux (Consumer <StreamObserver <T >> consumer ) {
1232
+ return this .createFlux (null , consumer );
1233
+ }
1234
+
1235
+ private <T > Flux <T > createFlux (Metadata metadata , Consumer <StreamObserver <T >> consumer ) {
1168
1236
return retryPolicy .apply (
1169
- Flux .create (sink -> DaprException .wrap (() -> consumer .accept (createStreamObserver (sink ))).run ()));
1237
+ Flux .create (sink -> DaprException .wrap (() -> consumer .accept (createStreamObserver (sink , metadata ))).run ()));
1170
1238
}
1171
1239
1172
- private <T > StreamObserver <T > createStreamObserver (MonoSink <T > sink ) {
1240
+ private <T > StreamObserver <T > createStreamObserver (MonoSink <T > sink , Metadata grpcMetadata ) {
1173
1241
return new StreamObserver <T >() {
1174
1242
@ Override
1175
1243
public void onNext (T value ) {
@@ -1178,7 +1246,7 @@ public void onNext(T value) {
1178
1246
1179
1247
@ Override
1180
1248
public void onError (Throwable t ) {
1181
- sink .error (DaprException .propagate (new ExecutionException ( t )));
1249
+ sink .error (DaprException .propagate (DaprHttpException . fromGrpcExecutionException ( grpcMetadata , t )));
1182
1250
}
1183
1251
1184
1252
@ Override
@@ -1188,7 +1256,7 @@ public void onCompleted() {
1188
1256
};
1189
1257
}
1190
1258
1191
- private <T > StreamObserver <T > createStreamObserver (FluxSink <T > sink ) {
1259
+ private <T > StreamObserver <T > createStreamObserver (FluxSink <T > sink , final Metadata grpcMetadata ) {
1192
1260
return new StreamObserver <T >() {
1193
1261
@ Override
1194
1262
public void onNext (T value ) {
@@ -1197,7 +1265,7 @@ public void onNext(T value) {
1197
1265
1198
1266
@ Override
1199
1267
public void onError (Throwable t ) {
1200
- sink .error (DaprException .propagate (new ExecutionException ( t )));
1268
+ sink .error (DaprException .propagate (DaprHttpException . fromGrpcExecutionException ( grpcMetadata , t )));
1201
1269
}
1202
1270
1203
1271
@ Override
0 commit comments