14
14
package io .dapr .actors .runtime ;
15
15
16
16
import com .fasterxml .jackson .databind .ObjectMapper ;
17
- import com .google .common .util .concurrent .ListenableFuture ;
18
17
import com .google .protobuf .Any ;
19
18
import com .google .protobuf .ByteString ;
20
19
import com .google .protobuf .Empty ;
21
20
import io .dapr .config .Properties ;
21
+ import io .dapr .exceptions .DaprException ;
22
22
import io .dapr .utils .DurationUtils ;
23
23
import io .dapr .v1 .DaprGrpc ;
24
24
import io .dapr .v1 .DaprProtos ;
25
25
import io .grpc .ManagedChannel ;
26
+ import io .grpc .stub .StreamObserver ;
26
27
import reactor .core .publisher .Mono ;
28
+ import reactor .core .publisher .MonoSink ;
27
29
28
30
import java .io .IOException ;
29
31
import java .nio .charset .Charset ;
30
32
import java .util .ArrayList ;
31
33
import java .util .List ;
34
+ import java .util .concurrent .ExecutionException ;
32
35
33
36
/**
34
37
* A DaprClient over HTTP for Actor's runtime.
@@ -48,44 +51,42 @@ class DaprGrpcClient implements DaprClient {
48
51
/**
49
52
* The GRPC client to be used.
50
53
*
51
- * @see io.dapr.v1.DaprGrpc.DaprFutureStub
54
+ * @see io.dapr.v1.DaprGrpc.DaprStub
52
55
*/
53
- private DaprGrpc .DaprFutureStub client ;
56
+ private DaprGrpc .DaprStub client ;
54
57
55
58
/**
56
59
* Internal constructor.
57
60
*
58
61
* @param channel channel (client needs to close channel after use).
59
62
*/
60
63
DaprGrpcClient (ManagedChannel channel ) {
61
- this (DaprGrpc .newFutureStub (channel ));
64
+ this (DaprGrpc .newStub (channel ));
62
65
}
63
66
64
67
/**
65
68
* Internal constructor.
66
69
*
67
- * @param grpcClient Dapr's GRPC client.
70
+ * @param daprStubClient Dapr's GRPC client.
68
71
*/
69
- DaprGrpcClient (DaprGrpc .DaprFutureStub grpcClient ) {
70
- this .client = grpcClient ;
72
+ DaprGrpcClient (DaprGrpc .DaprStub daprStubClient ) {
73
+ this .client = daprStubClient ;
71
74
}
72
75
73
76
/**
74
77
* {@inheritDoc}
75
78
*/
76
79
@ Override
77
80
public Mono <byte []> getState (String actorType , String actorId , String keyName ) {
78
- return Mono .fromCallable (() -> {
79
- DaprProtos .GetActorStateRequest req =
80
- DaprProtos .GetActorStateRequest .newBuilder ()
81
- .setActorType (actorType )
82
- .setActorId (actorId )
83
- .setKey (keyName )
84
- .build ();
85
-
86
- ListenableFuture <DaprProtos .GetActorStateResponse > futureResponse = client .getActorState (req );
87
- return futureResponse .get ();
88
- }).map (r -> r .getData ().toByteArray ());
81
+ DaprProtos .GetActorStateRequest req =
82
+ DaprProtos .GetActorStateRequest .newBuilder ()
83
+ .setActorType (actorType )
84
+ .setActorId (actorId )
85
+ .setKey (keyName )
86
+ .build ();
87
+
88
+ return Mono .<DaprProtos .GetActorStateResponse >create (it ->
89
+ client .getActorState (req , createStreamObserver (it ))).map (r -> r .getData ().toByteArray ());
89
90
}
90
91
91
92
/**
@@ -132,10 +133,7 @@ public Mono<Void> saveStateTransactionally(
132
133
.addAllOperations (grpcOps )
133
134
.build ();
134
135
135
- return Mono .fromCallable (() -> {
136
- ListenableFuture <Empty > futureResponse = client .executeActorStateTransaction (req );
137
- return futureResponse .get ();
138
- }).then ();
136
+ return Mono .<Empty >create (it -> client .executeActorStateTransaction (req , createStreamObserver (it ))).then ();
139
137
}
140
138
141
139
/**
@@ -147,40 +145,31 @@ public Mono<Void> registerReminder(
147
145
String actorId ,
148
146
String reminderName ,
149
147
ActorReminderParams reminderParams ) {
150
- return Mono .fromCallable (() -> {
151
- DaprProtos .RegisterActorReminderRequest req =
152
- DaprProtos .RegisterActorReminderRequest .newBuilder ()
153
- .setActorType (actorType )
154
- .setActorId (actorId )
155
- .setName (reminderName )
156
- .setData (ByteString .copyFrom (reminderParams .getData ()))
157
- .setDueTime (DurationUtils .convertDurationToDaprFormat (reminderParams .getDueTime ()))
158
- .setPeriod (DurationUtils .convertDurationToDaprFormat (reminderParams .getPeriod ()))
159
- .build ();
160
-
161
- ListenableFuture <Empty > futureResponse = client .registerActorReminder (req );
162
- futureResponse .get ();
163
- return null ;
164
- });
148
+ DaprProtos .RegisterActorReminderRequest req =
149
+ DaprProtos .RegisterActorReminderRequest .newBuilder ()
150
+ .setActorType (actorType )
151
+ .setActorId (actorId )
152
+ .setName (reminderName )
153
+ .setData (ByteString .copyFrom (reminderParams .getData ()))
154
+ .setDueTime (DurationUtils .convertDurationToDaprFormat (reminderParams .getDueTime ()))
155
+ .setPeriod (DurationUtils .convertDurationToDaprFormat (reminderParams .getPeriod ()))
156
+ .build ();
157
+ return Mono .<Empty >create (it -> client .registerActorReminder (req , createStreamObserver (it ))).then ().then ();
165
158
}
166
159
167
160
/**
168
161
* {@inheritDoc}
169
162
*/
170
163
@ Override
171
164
public Mono <Void > unregisterReminder (String actorType , String actorId , String reminderName ) {
172
- return Mono .fromCallable (() -> {
173
- DaprProtos .UnregisterActorReminderRequest req =
174
- DaprProtos .UnregisterActorReminderRequest .newBuilder ()
175
- .setActorType (actorType )
176
- .setActorId (actorId )
177
- .setName (reminderName )
178
- .build ();
179
-
180
- ListenableFuture <Empty > futureResponse = client .unregisterActorReminder (req );
181
- futureResponse .get ();
182
- return null ;
183
- });
165
+ DaprProtos .UnregisterActorReminderRequest req =
166
+ DaprProtos .UnregisterActorReminderRequest .newBuilder ()
167
+ .setActorType (actorType )
168
+ .setActorId (actorId )
169
+ .setName (reminderName )
170
+ .build ();
171
+
172
+ return Mono .<Empty >create (it -> client .unregisterActorReminder (req , createStreamObserver (it ))).then ().then ();
184
173
}
185
174
186
175
/**
@@ -192,41 +181,52 @@ public Mono<Void> registerTimer(
192
181
String actorId ,
193
182
String timerName ,
194
183
ActorTimerParams timerParams ) {
195
- return Mono .fromCallable (() -> {
196
- DaprProtos .RegisterActorTimerRequest req =
197
- DaprProtos .RegisterActorTimerRequest .newBuilder ()
198
- .setActorType (actorType )
199
- .setActorId (actorId )
200
- .setName (timerName )
201
- .setCallback (timerParams .getCallback ())
202
- .setData (ByteString .copyFrom (timerParams .getData ()))
203
- .setDueTime (DurationUtils .convertDurationToDaprFormat (timerParams .getDueTime ()))
204
- .setPeriod (DurationUtils .convertDurationToDaprFormat (timerParams .getPeriod ()))
205
- .build ();
206
-
207
- ListenableFuture <Empty > futureResponse = client .registerActorTimer (req );
208
- futureResponse .get ();
209
- return null ;
210
- });
184
+ DaprProtos .RegisterActorTimerRequest req =
185
+ DaprProtos .RegisterActorTimerRequest .newBuilder ()
186
+ .setActorType (actorType )
187
+ .setActorId (actorId )
188
+ .setName (timerName )
189
+ .setCallback (timerParams .getCallback ())
190
+ .setData (ByteString .copyFrom (timerParams .getData ()))
191
+ .setDueTime (DurationUtils .convertDurationToDaprFormat (timerParams .getDueTime ()))
192
+ .setPeriod (DurationUtils .convertDurationToDaprFormat (timerParams .getPeriod ()))
193
+ .build ();
194
+
195
+ return Mono .<Empty >create (it -> client .registerActorTimer (req , createStreamObserver (it ))).then ().then ();
211
196
}
212
197
213
198
/**
214
199
* {@inheritDoc}
215
200
*/
216
201
@ Override
217
202
public Mono <Void > unregisterTimer (String actorType , String actorId , String timerName ) {
218
- return Mono .fromCallable (() -> {
219
- DaprProtos .UnregisterActorTimerRequest req =
220
- DaprProtos .UnregisterActorTimerRequest .newBuilder ()
221
- .setActorType (actorType )
222
- .setActorId (actorId )
223
- .setName (timerName )
224
- .build ();
225
-
226
- ListenableFuture <Empty > futureResponse = client .unregisterActorTimer (req );
227
- futureResponse .get ();
228
- return null ;
229
- });
203
+ DaprProtos .UnregisterActorTimerRequest req =
204
+ DaprProtos .UnregisterActorTimerRequest .newBuilder ()
205
+ .setActorType (actorType )
206
+ .setActorId (actorId )
207
+ .setName (timerName )
208
+ .build ();
209
+
210
+ return Mono .<Empty >create (it -> client .unregisterActorTimer (req , createStreamObserver (it ))).then ().then ();
211
+ }
212
+
213
+ private <T > StreamObserver <T > createStreamObserver (MonoSink <T > sink ) {
214
+ return new StreamObserver <T >() {
215
+ @ Override
216
+ public void onNext (T value ) {
217
+ sink .success (value );
218
+ }
219
+
220
+ @ Override
221
+ public void onError (Throwable t ) {
222
+ sink .error (DaprException .propagate (new ExecutionException (t )));
223
+ }
224
+
225
+ @ Override
226
+ public void onCompleted () {
227
+ sink .success ();
228
+ }
229
+ };
230
230
}
231
231
232
232
}
0 commit comments