25
25
import com .google .cloud .ServiceOptions ;
26
26
import com .google .cloud .datastore .execution .AggregationQueryExecutor ;
27
27
import com .google .cloud .datastore .spi .v1 .DatastoreRpc ;
28
- import com .google .cloud .datastore .telemetry .TraceUtil .Context ;
29
28
import com .google .common .base .MoreObjects ;
30
29
import com .google .common .base .Preconditions ;
30
+ import com .google .common .base .Throwables ;
31
31
import com .google .common .collect .AbstractIterator ;
32
32
import com .google .common .collect .ImmutableList ;
33
33
import com .google .common .collect .ImmutableMap ;
42
42
import io .opencensus .common .Scope ;
43
43
import io .opencensus .trace .Span ;
44
44
import io .opencensus .trace .Status ;
45
+ import io .opentelemetry .api .common .Attributes ;
46
+ import io .opentelemetry .api .trace .SpanBuilder ;
47
+ import io .opentelemetry .api .trace .SpanKind ;
48
+ import io .opentelemetry .api .trace .StatusCode ;
49
+ import io .opentelemetry .context .Context ;
45
50
import java .util .ArrayList ;
46
51
import java .util .Arrays ;
47
52
import java .util .Collections ;
53
58
import java .util .Optional ;
54
59
import java .util .Set ;
55
60
import java .util .concurrent .Callable ;
56
- import javax .annotation .Nonnull ;
61
+ import javax .annotation .Nullable ;
57
62
58
63
final class DatastoreImpl extends BaseService <DatastoreOptions > implements Datastore {
59
64
@@ -106,15 +111,18 @@ static class ReadWriteTransactionCallable<T> implements Callable<T> {
106
111
private volatile TransactionOptions options ;
107
112
private volatile Transaction transaction ;
108
113
114
+ private final com .google .cloud .datastore .telemetry .TraceUtil .SpanContext parentSpanContext ;
115
+
109
116
ReadWriteTransactionCallable (
110
117
Datastore datastore ,
111
118
TransactionCallable <T > callable ,
112
119
TransactionOptions options ,
113
- @ Nonnull Context parentTraceContext ) {
120
+ @ Nullable com . google . cloud . datastore . telemetry . TraceUtil . SpanContext parentSpanContext ) {
114
121
this .datastore = datastore ;
115
122
this .callable = callable ;
116
123
this .options = options ;
117
124
this .transaction = null ;
125
+ this .parentSpanContext = parentSpanContext ;
118
126
}
119
127
120
128
Datastore getDatastore () {
@@ -135,26 +143,57 @@ void setPrevTransactionId(ByteString transactionId) {
135
143
options = options .toBuilder ().setReadWrite (readWrite ).build ();
136
144
}
137
145
146
+ private io .opentelemetry .api .trace .Span startSpanWithParentContext (
147
+ String spanName ,
148
+ com .google .cloud .datastore .telemetry .TraceUtil .SpanContext parentSpanContext ) {
149
+ com .google .cloud .datastore .telemetry .TraceUtil otelTraceUtil =
150
+ datastore .getOptions ().getTraceUtil ();
151
+ SpanBuilder spanBuilder =
152
+ otelTraceUtil
153
+ .getTracer ()
154
+ .spanBuilder (com .google .cloud .datastore .telemetry .TraceUtil .SPAN_NAME_TRANSACTION_RUN )
155
+ .setSpanKind (SpanKind .PRODUCER )
156
+ .setParent (
157
+ Context .current ()
158
+ .with (
159
+ io .opentelemetry .api .trace .Span .wrap (
160
+ parentSpanContext .getSpanContext ())));
161
+ return spanBuilder .startSpan ();
162
+ }
163
+
138
164
@ Override
139
165
public T call () throws DatastoreException {
140
- com .google .cloud .datastore .telemetry .TraceUtil traceUtil =
141
- datastore .getOptions ().getTraceUtil ();
142
- com .google .cloud .datastore .telemetry .TraceUtil .Span span =
143
- traceUtil .startSpan (
166
+ // TODO Instead of using OTel Spans directly, TraceUtil.Span should be used here. However,
167
+ // the same code in startSpanInternal doesn't work when EnabledTraceUtil.StartSpan is called
168
+ // probably because of some thread-local caching that is getting lost. This needs more
169
+ // debugging. The code below works and is idiomatic but could be prettier and more consistent
170
+ // with the use of TraceUtil-provided framework.
171
+ io .opentelemetry .api .trace .Span span =
172
+ startSpanWithParentContext (
144
173
com .google .cloud .datastore .telemetry .TraceUtil .SPAN_NAME_TRANSACTION_RUN ,
145
- datastore . getOptions (). getTraceUtil (). getCurrentContext () );
146
- try (com . google . cloud . datastore . telemetry . TraceUtil .Scope ignored = span .makeCurrent ()) {
174
+ parentSpanContext );
175
+ try (io . opentelemetry . context .Scope ignored = span .makeCurrent ()) {
147
176
transaction = datastore .newTransaction (options );
148
177
T value = callable .run (transaction );
149
178
transaction .commit ();
150
179
return value ;
151
180
} catch (Exception ex ) {
152
181
transaction .rollback ();
182
+ span .setStatus (StatusCode .ERROR , ex .getMessage ());
183
+ span .recordException (
184
+ ex ,
185
+ Attributes .builder ()
186
+ .put ("exception.message" , ex .getMessage ())
187
+ .put ("exception.type" , ex .getClass ().getName ())
188
+ .put ("exception.stacktrace" , Throwables .getStackTraceAsString (ex ))
189
+ .build ());
190
+ span .end ();
153
191
throw DatastoreException .propagateUserException (ex );
154
192
} finally {
155
193
if (transaction .isActive ()) {
156
194
transaction .rollback ();
157
195
}
196
+ span .end ();
158
197
if (options != null
159
198
&& options .getModeCase ().equals (TransactionOptions .ModeCase .READ_WRITE )) {
160
199
setPrevTransactionId (transaction .getTransactionId ());
@@ -165,42 +204,30 @@ public T call() throws DatastoreException {
165
204
166
205
@ Override
167
206
public <T > T runInTransaction (final TransactionCallable <T > callable ) {
168
- com .google .cloud .datastore .telemetry .TraceUtil .Span span =
169
- otelTraceUtil .startSpan (
170
- com .google .cloud .datastore .telemetry .TraceUtil .SPAN_NAME_TRANSACTION_RUN );
171
- try (com .google .cloud .datastore .telemetry .TraceUtil .Scope ignored = span .makeCurrent ()) {
207
+ try {
172
208
return RetryHelper .runWithRetries (
173
209
new ReadWriteTransactionCallable <T >(
174
- this , callable , null , otelTraceUtil .getCurrentContext ()),
210
+ this , callable , null , otelTraceUtil .getCurrentSpanContext ()),
175
211
retrySettings ,
176
212
TRANSACTION_EXCEPTION_HANDLER ,
177
213
getOptions ().getClock ());
178
214
} catch (RetryHelperException e ) {
179
- span .end (e );
180
215
throw DatastoreException .translateAndThrow (e );
181
- } finally {
182
- span .end ();
183
216
}
184
217
}
185
218
186
219
@ Override
187
220
public <T > T runInTransaction (
188
221
final TransactionCallable <T > callable , TransactionOptions transactionOptions ) {
189
- com .google .cloud .datastore .telemetry .TraceUtil .Span span =
190
- otelTraceUtil .startSpan (
191
- com .google .cloud .datastore .telemetry .TraceUtil .SPAN_NAME_TRANSACTION_RUN );
192
- try (com .google .cloud .datastore .telemetry .TraceUtil .Scope ignored = span .makeCurrent ()) {
222
+ try {
193
223
return RetryHelper .runWithRetries (
194
224
new ReadWriteTransactionCallable <T >(
195
- this , callable , transactionOptions , otelTraceUtil .getCurrentContext ()),
225
+ this , callable , transactionOptions , otelTraceUtil .getCurrentSpanContext ()),
196
226
retrySettings ,
197
227
TRANSACTION_EXCEPTION_HANDLER ,
198
228
getOptions ().getClock ());
199
229
} catch (RetryHelperException e ) {
200
- span .end (e );
201
230
throw DatastoreException .translateAndThrow (e );
202
- } finally {
203
- span .end ();
204
231
}
205
232
}
206
233
@@ -258,11 +285,14 @@ public AggregationResults runAggregation(
258
285
259
286
com .google .datastore .v1 .RunQueryResponse runQuery (
260
287
final com .google .datastore .v1 .RunQueryRequest requestPb ) {
261
- com .google .cloud .datastore .telemetry .TraceUtil .Span span =
262
- otelTraceUtil .startSpan (com .google .cloud .datastore .telemetry .TraceUtil .SPAN_NAME_RUN_QUERY );
263
288
ReadOptions readOptions = requestPb .getReadOptions ();
264
- span .setAttribute (
265
- "isTransactional" , readOptions .hasTransaction () || readOptions .hasNewTransaction ());
289
+ boolean isTransactional = readOptions .hasTransaction () || readOptions .hasNewTransaction ();
290
+ String spanName =
291
+ (isTransactional
292
+ ? com .google .cloud .datastore .telemetry .TraceUtil .SPAN_NAME_TRANSACTION_RUN_QUERY
293
+ : com .google .cloud .datastore .telemetry .TraceUtil .SPAN_NAME_RUN_QUERY );
294
+ com .google .cloud .datastore .telemetry .TraceUtil .Span span = otelTraceUtil .startSpan (spanName );
295
+ span .setAttribute ("isTransactional" , isTransactional );
266
296
span .setAttribute ("readConsistency" , readOptions .getReadConsistency ().toString ());
267
297
268
298
try (com .google .cloud .datastore .telemetry .TraceUtil .Scope ignored = span .makeCurrent ()) {
@@ -275,7 +305,7 @@ com.google.datastore.v1.RunQueryResponse runQuery(
275
305
: TRANSACTION_OPERATION_EXCEPTION_HANDLER ,
276
306
getOptions ().getClock ());
277
307
span .addEvent (
278
- com . google . cloud . datastore . telemetry . TraceUtil . SPAN_NAME_RUN_QUERY + ": Completed" ,
308
+ spanName + ": Completed" ,
279
309
new ImmutableMap .Builder <String , Object >()
280
310
.put ("Received" , response .getBatch ().getEntityResultsCount ())
281
311
.put ("More results" , response .getBatch ().getMoreResults ().toString ())
@@ -689,7 +719,7 @@ com.google.datastore.v1.BeginTransactionResponse beginTransaction(
689
719
com .google .cloud .datastore .telemetry .TraceUtil .Span span =
690
720
otelTraceUtil .startSpan (
691
721
com .google .cloud .datastore .telemetry .TraceUtil .SPAN_NAME_BEGIN_TRANSACTION ,
692
- otelTraceUtil .getCurrentContext ());
722
+ otelTraceUtil .getCurrentSpanContext ());
693
723
try (com .google .cloud .datastore .telemetry .TraceUtil .Scope scope = span .makeCurrent ()) {
694
724
return RetryHelper .runWithRetries (
695
725
() -> datastoreRpc .beginTransaction (requestPb ),
0 commit comments