Skip to content

Commit e326e41

Browse files
authored
refactor: tracing using OpenTelemetry. (#2085)
* feat: tracing using OpenTelemetry. * Fix typos: traceProvider -> tracerProvider. * address feedback. * Address feedback. * Address feedback. * Add the missing caret (`^`) for new deps.
1 parent f73e28b commit e326e41

18 files changed

+1665
-319
lines changed

.idea/runConfigurations/System_Test.xml

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dev/src/bulk-writer.ts

Lines changed: 59 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ import {StatusCode} from './status-code';
4949
// eslint-disable-next-line no-undef
5050
import GrpcStatus = FirebaseFirestore.GrpcStatus;
5151
import api = google.firestore.v1;
52+
import {
53+
ATTRIBUTE_KEY_DOC_COUNT,
54+
SPAN_NAME_BULK_WRITER_COMMIT,
55+
} from './telemetry/trace-util';
5256

5357
/*!
5458
* The maximum number of writes that can be in a single batch.
@@ -243,55 +247,63 @@ class BulkCommitBatch extends WriteBatch {
243247
}
244248

245249
async bulkCommit(options: {requestTag?: string} = {}): Promise<void> {
246-
const tag = options?.requestTag ?? requestTag();
247-
248-
// Capture the error stack to preserve stack tracing across async calls.
249-
const stack = Error().stack!;
250-
251-
let response: api.IBatchWriteResponse;
252-
try {
253-
logger(
254-
'BulkCommitBatch.bulkCommit',
255-
tag,
256-
`Sending next batch with ${this._opCount} writes`
257-
);
258-
const retryCodes = getRetryCodes('batchWrite');
259-
response = await this._commit<
260-
api.BatchWriteRequest,
261-
api.BatchWriteResponse
262-
>({retryCodes, methodName: 'batchWrite', requestTag: tag});
263-
} catch (err) {
264-
// Map the failure to each individual write's result.
265-
const ops = Array.from({length: this.pendingOps.length});
266-
response = {
267-
writeResults: ops.map(() => {
268-
return {};
269-
}),
270-
status: ops.map(() => err),
271-
};
272-
}
273-
274-
for (let i = 0; i < (response.writeResults || []).length; ++i) {
275-
// Since delete operations currently do not have write times, use a
276-
// sentinel Timestamp value.
277-
// TODO(b/158502664): Use actual delete timestamp.
278-
const DELETE_TIMESTAMP_SENTINEL = Timestamp.fromMillis(0);
279-
280-
const status = (response.status || [])[i];
281-
if (status.code === StatusCode.OK) {
282-
const updateTime = Timestamp.fromProto(
283-
response.writeResults![i].updateTime || DELETE_TIMESTAMP_SENTINEL
284-
);
285-
this.pendingOps[i].onSuccess(new WriteResult(updateTime));
286-
} else {
287-
const error =
288-
new (require('google-gax/build/src/fallback').GoogleError)(
289-
status.message || undefined
250+
return this._firestore._traceUtil.startActiveSpan(
251+
SPAN_NAME_BULK_WRITER_COMMIT,
252+
async () => {
253+
const tag = options?.requestTag ?? requestTag();
254+
255+
// Capture the error stack to preserve stack tracing across async calls.
256+
const stack = Error().stack!;
257+
258+
let response: api.IBatchWriteResponse;
259+
try {
260+
logger(
261+
'BulkCommitBatch.bulkCommit',
262+
tag,
263+
`Sending next batch with ${this._opCount} writes`
290264
);
291-
error.code = status.code as number;
292-
this.pendingOps[i].onError(wrapError(error, stack));
265+
const retryCodes = getRetryCodes('batchWrite');
266+
response = await this._commit<
267+
api.BatchWriteRequest,
268+
api.BatchWriteResponse
269+
>({retryCodes, methodName: 'batchWrite', requestTag: tag});
270+
} catch (err) {
271+
// Map the failure to each individual write's result.
272+
const ops = Array.from({length: this.pendingOps.length});
273+
response = {
274+
writeResults: ops.map(() => {
275+
return {};
276+
}),
277+
status: ops.map(() => err),
278+
};
279+
}
280+
281+
for (let i = 0; i < (response.writeResults || []).length; ++i) {
282+
// Since delete operations currently do not have write times, use a
283+
// sentinel Timestamp value.
284+
// TODO(b/158502664): Use actual delete timestamp.
285+
const DELETE_TIMESTAMP_SENTINEL = Timestamp.fromMillis(0);
286+
287+
const status = (response.status || [])[i];
288+
if (status.code === StatusCode.OK) {
289+
const updateTime = Timestamp.fromProto(
290+
response.writeResults![i].updateTime || DELETE_TIMESTAMP_SENTINEL
291+
);
292+
this.pendingOps[i].onSuccess(new WriteResult(updateTime));
293+
} else {
294+
const error =
295+
new (require('google-gax/build/src/fallback').GoogleError)(
296+
status.message || undefined
297+
);
298+
error.code = status.code as number;
299+
this.pendingOps[i].onError(wrapError(error, stack));
300+
}
301+
}
302+
},
303+
{
304+
[ATTRIBUTE_KEY_DOC_COUNT]: this._opCount,
293305
}
294-
}
306+
);
295307
}
296308

297309
/**

dev/src/collection-group.ts

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import {validateInteger} from './validate';
2929
import api = protos.google.firestore.v1;
3030
import {defaultConverter} from './types';
3131
import {compareArrays} from './order';
32+
import {SPAN_NAME_PARTITION_QUERY} from './telemetry/trace-util';
3233

3334
/**
3435
* A `CollectionGroup` refers to all documents that are contained in a
@@ -81,48 +82,53 @@ export class CollectionGroup<
8182
async *getPartitions(
8283
desiredPartitionCount: number
8384
): AsyncIterable<QueryPartition<AppModelType, DbModelType>> {
84-
validateInteger('desiredPartitionCount', desiredPartitionCount, {
85-
minValue: 1,
86-
});
87-
88-
const tag = requestTag();
89-
await this.firestore.initializeIfNeeded(tag);
90-
9185
const partitions: Array<api.IValue>[] = [];
9286

93-
if (desiredPartitionCount > 1) {
94-
// Partition queries require explicit ordering by __name__.
95-
const queryWithDefaultOrder = this.orderBy(FieldPath.documentId());
96-
const request: api.IPartitionQueryRequest =
97-
queryWithDefaultOrder.toProto();
98-
99-
// Since we are always returning an extra partition (with an empty endBefore
100-
// cursor), we reduce the desired partition count by one.
101-
request.partitionCount = desiredPartitionCount - 1;
102-
103-
const stream = await this.firestore.requestStream(
104-
'partitionQueryStream',
105-
/* bidirectional= */ false,
106-
request,
107-
tag
108-
);
109-
stream.resume();
110-
111-
for await (const currentCursor of stream) {
112-
partitions.push(currentCursor.values ?? []);
87+
await this._firestore._traceUtil.startActiveSpan(
88+
SPAN_NAME_PARTITION_QUERY,
89+
async () => {
90+
validateInteger('desiredPartitionCount', desiredPartitionCount, {
91+
minValue: 1,
92+
});
93+
94+
const tag = requestTag();
95+
await this.firestore.initializeIfNeeded(tag);
96+
97+
if (desiredPartitionCount > 1) {
98+
// Partition queries require explicit ordering by __name__.
99+
const queryWithDefaultOrder = this.orderBy(FieldPath.documentId());
100+
const request: api.IPartitionQueryRequest =
101+
queryWithDefaultOrder.toProto();
102+
103+
// Since we are always returning an extra partition (with an empty endBefore
104+
// cursor), we reduce the desired partition count by one.
105+
request.partitionCount = desiredPartitionCount - 1;
106+
107+
const stream = await this.firestore.requestStream(
108+
'partitionQueryStream',
109+
/* bidirectional= */ false,
110+
request,
111+
tag
112+
);
113+
stream.resume();
114+
115+
for await (const currentCursor of stream) {
116+
partitions.push(currentCursor.values ?? []);
117+
}
118+
}
119+
120+
logger(
121+
'Firestore.getPartitions',
122+
tag,
123+
'Received %d partitions',
124+
partitions.length
125+
);
126+
127+
// Sort the partitions as they may not be ordered if responses are paged.
128+
partitions.sort((l, r) => compareArrays(l, r));
113129
}
114-
}
115-
116-
logger(
117-
'Firestore.getPartitions',
118-
tag,
119-
'Received %d partitions',
120-
partitions.length
121130
);
122131

123-
// Sort the partitions as they may not be ordered if responses are paged.
124-
partitions.sort((l, r) => compareArrays(l, r));
125-
126132
for (let i = 0; i < partitions.length; ++i) {
127133
yield new QueryPartition(
128134
this._firestore,

0 commit comments

Comments
 (0)