14
14
package io .dapr .actors .runtime ;
15
15
16
16
import com .fasterxml .jackson .databind .ObjectMapper ;
17
- import com .google .common .util .concurrent .ListenableFuture ;
18
17
import com .google .protobuf .Any ;
19
18
import com .google .protobuf .ByteString ;
20
19
import com .google .protobuf .Empty ;
21
20
import io .dapr .config .Properties ;
21
+ import io .dapr .exceptions .DaprException ;
22
22
import io .dapr .utils .DurationUtils ;
23
23
import io .dapr .v1 .DaprGrpc ;
24
24
import io .dapr .v1 .DaprProtos ;
25
25
import io .grpc .ManagedChannel ;
26
+ import io .grpc .stub .StreamObserver ;
26
27
import reactor .core .publisher .Mono ;
28
+ import reactor .core .publisher .MonoSink ;
27
29
28
30
import java .io .IOException ;
29
31
import java .nio .charset .Charset ;
30
32
import java .util .ArrayList ;
31
33
import java .util .List ;
34
+ import java .util .concurrent .ExecutionException ;
32
35
33
36
/**
34
37
* A DaprClient over HTTP for Actor's runtime.
@@ -48,44 +51,41 @@ class DaprGrpcClient implements DaprClient {
48
51
/**
49
52
* The GRPC client to be used.
50
53
*
51
- * @see io.dapr.v1.DaprGrpc.DaprFutureStub
54
+ * @see io.dapr.v1.DaprGrpc.DaprStub
52
55
*/
53
- private DaprGrpc .DaprFutureStub client ;
56
+ private DaprGrpc .DaprStub client ;
54
57
55
58
/**
56
59
* Internal constructor.
57
60
*
58
61
* @param channel channel (client needs to close channel after use).
59
62
*/
60
63
DaprGrpcClient (ManagedChannel channel ) {
61
- this (DaprGrpc .newFutureStub (channel ));
64
+ this (DaprGrpc .newStub (channel ));
62
65
}
63
66
64
67
/**
65
68
* Internal constructor.
66
69
*
67
- * @param grpcClient Dapr's GRPC client.
70
+ * @param daprStubClient Dapr's GRPC client.
68
71
*/
69
- DaprGrpcClient (DaprGrpc .DaprFutureStub grpcClient ) {
70
- this .client = grpcClient ;
72
+ DaprGrpcClient (DaprGrpc .DaprStub daprStubClient ) {
73
+ this .client = daprStubClient ;
71
74
}
72
75
73
76
/**
74
77
* {@inheritDoc}
75
78
*/
76
79
@ Override
77
80
public Mono <byte []> getState (String actorType , String actorId , String keyName ) {
78
- return Mono .fromCallable (() -> {
79
81
DaprProtos .GetActorStateRequest req =
80
- DaprProtos .GetActorStateRequest .newBuilder ()
81
- .setActorType (actorType )
82
- .setActorId (actorId )
83
- .setKey (keyName )
84
- .build ();
82
+ DaprProtos .GetActorStateRequest .newBuilder ()
83
+ .setActorType (actorType )
84
+ .setActorId (actorId )
85
+ .setKey (keyName )
86
+ .build ();
85
87
86
- ListenableFuture <DaprProtos .GetActorStateResponse > futureResponse = client .getActorState (req );
87
- return futureResponse .get ();
88
- }).map (r -> r .getData ().toByteArray ());
88
+ return Mono .<DaprProtos .GetActorStateResponse >create (it -> client .getActorState (req , createStreamObserver (it ))).map (r -> r .getData ().toByteArray ());
89
89
}
90
90
91
91
/**
@@ -132,10 +132,7 @@ public Mono<Void> saveStateTransactionally(
132
132
.addAllOperations (grpcOps )
133
133
.build ();
134
134
135
- return Mono .fromCallable (() -> {
136
- ListenableFuture <Empty > futureResponse = client .executeActorStateTransaction (req );
137
- return futureResponse .get ();
138
- }).then ();
135
+ return Mono .<Empty >create (it -> client .executeActorStateTransaction (req , createStreamObserver (it ))).then ();
139
136
}
140
137
141
138
/**
@@ -147,40 +144,31 @@ public Mono<Void> registerReminder(
147
144
String actorId ,
148
145
String reminderName ,
149
146
ActorReminderParams reminderParams ) {
150
- return Mono .fromCallable (() -> {
151
- DaprProtos .RegisterActorReminderRequest req =
152
- DaprProtos .RegisterActorReminderRequest .newBuilder ()
153
- .setActorType (actorType )
154
- .setActorId (actorId )
155
- .setName (reminderName )
156
- .setData (ByteString .copyFrom (reminderParams .getData ()))
157
- .setDueTime (DurationUtils .convertDurationToDaprFormat (reminderParams .getDueTime ()))
158
- .setPeriod (DurationUtils .convertDurationToDaprFormat (reminderParams .getPeriod ()))
159
- .build ();
160
-
161
- ListenableFuture <Empty > futureResponse = client .registerActorReminder (req );
162
- futureResponse .get ();
163
- return null ;
164
- });
147
+ DaprProtos .RegisterActorReminderRequest req =
148
+ DaprProtos .RegisterActorReminderRequest .newBuilder ()
149
+ .setActorType (actorType )
150
+ .setActorId (actorId )
151
+ .setName (reminderName )
152
+ .setData (ByteString .copyFrom (reminderParams .getData ()))
153
+ .setDueTime (DurationUtils .convertDurationToDaprFormat (reminderParams .getDueTime ()))
154
+ .setPeriod (DurationUtils .convertDurationToDaprFormat (reminderParams .getPeriod ()))
155
+ .build ();
156
+ return Mono .<Empty >create (it -> client .registerActorReminder (req , createStreamObserver (it ))).then ().then ();
165
157
}
166
158
167
159
/**
168
160
* {@inheritDoc}
169
161
*/
170
162
@ Override
171
163
public Mono <Void > unregisterReminder (String actorType , String actorId , String reminderName ) {
172
- return Mono .fromCallable (() -> {
173
164
DaprProtos .UnregisterActorReminderRequest req =
174
165
DaprProtos .UnregisterActorReminderRequest .newBuilder ()
175
166
.setActorType (actorType )
176
167
.setActorId (actorId )
177
168
.setName (reminderName )
178
169
.build ();
179
170
180
- ListenableFuture <Empty > futureResponse = client .unregisterActorReminder (req );
181
- futureResponse .get ();
182
- return null ;
183
- });
171
+ return Mono .<Empty >create (it -> client .unregisterActorReminder (req , createStreamObserver (it ))).then ().then ();
184
172
}
185
173
186
174
/**
@@ -192,7 +180,6 @@ public Mono<Void> registerTimer(
192
180
String actorId ,
193
181
String timerName ,
194
182
ActorTimerParams timerParams ) {
195
- return Mono .fromCallable (() -> {
196
183
DaprProtos .RegisterActorTimerRequest req =
197
184
DaprProtos .RegisterActorTimerRequest .newBuilder ()
198
185
.setActorType (actorType )
@@ -204,29 +191,41 @@ public Mono<Void> registerTimer(
204
191
.setPeriod (DurationUtils .convertDurationToDaprFormat (timerParams .getPeriod ()))
205
192
.build ();
206
193
207
- ListenableFuture <Empty > futureResponse = client .registerActorTimer (req );
208
- futureResponse .get ();
209
- return null ;
210
- });
194
+ return Mono .<Empty >create (it -> client .registerActorTimer (req , createStreamObserver (it ))).then ().then ();
211
195
}
212
196
213
197
/**
214
198
* {@inheritDoc}
215
199
*/
216
200
@ Override
217
201
public Mono <Void > unregisterTimer (String actorType , String actorId , String timerName ) {
218
- return Mono .fromCallable (() -> {
219
202
DaprProtos .UnregisterActorTimerRequest req =
220
203
DaprProtos .UnregisterActorTimerRequest .newBuilder ()
221
204
.setActorType (actorType )
222
205
.setActorId (actorId )
223
206
.setName (timerName )
224
207
.build ();
225
208
226
- ListenableFuture <Empty > futureResponse = client .unregisterActorTimer (req );
227
- futureResponse .get ();
228
- return null ;
229
- });
209
+ return Mono .<Empty >create (it -> client .unregisterActorTimer (req , createStreamObserver (it ))).then ().then ();
210
+ }
211
+
212
+ private <T > StreamObserver <T > createStreamObserver (MonoSink <T > sink ) {
213
+ return new StreamObserver <T >() {
214
+ @ Override
215
+ public void onNext (T value ) {
216
+ sink .success (value );
217
+ }
218
+
219
+ @ Override
220
+ public void onError (Throwable t ) {
221
+ sink .error (DaprException .propagate (new ExecutionException (t )));
222
+ }
223
+
224
+ @ Override
225
+ public void onCompleted () {
226
+ sink .success ();
227
+ }
228
+ };
230
229
}
231
230
232
231
}
0 commit comments