@@ -84,6 +84,13 @@ namespace NWilson {
84
84
}
85
85
};
86
86
87
+ struct TExportRequestData : TIntrusiveListItem<TExportRequestData> {
88
+ std::unique_ptr<grpc::ClientContext> Context;
89
+ std::unique_ptr<grpc::ClientAsyncResponseReader<NServiceProto::ExportTraceServiceResponse>> Reader;
90
+ grpc::Status Status;
91
+ NServiceProto::ExportTraceServiceResponse Response;
92
+ };
93
+
87
94
class TWilsonUploader
88
95
: public TActorBootstrapped<TWilsonUploader>
89
96
{
@@ -95,6 +102,7 @@ namespace NWilson {
95
102
ui64 MaxBytesInBatch;
96
103
TDuration MaxBatchAccumulation = TDuration::Seconds(1 );
97
104
TDuration MaxSpanTimeInQueue;
105
+ ui64 MaxExportInflight;
98
106
99
107
bool WakeupScheduled = false ;
100
108
@@ -106,10 +114,6 @@ namespace NWilson {
106
114
grpc::CompletionQueue CQ;
107
115
108
116
std::unique_ptr<IGrpcSigner> GrpcSigner;
109
- std::unique_ptr<grpc::ClientContext> Context;
110
- std::unique_ptr<grpc::ClientAsyncResponseReader<NServiceProto::ExportTraceServiceResponse>> Reader;
111
- NServiceProto::ExportTraceServiceResponse Response;
112
- grpc::Status Status;
113
117
114
118
TBatch CurrentBatch;
115
119
std::queue<TBatch::TData> BatchQueue;
@@ -119,13 +123,17 @@ namespace NWilson {
119
123
bool BatchCompletionScheduled = false ;
120
124
TMonotonic NextBatchCompletion;
121
125
126
+ TIntrusiveListWithAutoDelete<TExportRequestData, TDelete> ExportRequests;
127
+ size_t ExportRequestsCount = 0 ;
128
+
122
129
public:
123
130
TWilsonUploader (WilsonUploaderParams params)
124
131
: MaxSpansPerSecond(params.MaxSpansPerSecond)
125
132
, MaxSpansInBatch(params.MaxSpansInBatch)
126
133
, MaxBytesInBatch(params.MaxBytesInBatch)
127
134
, MaxBatchAccumulation(params.MaxBatchAccumulation)
128
135
, MaxSpanTimeInQueue(TDuration::Seconds(params.SpanExportTimeoutSeconds))
136
+ , MaxExportInflight(params.MaxExportRequestsInflight)
129
137
, CollectorUrl(std::move(params.CollectorUrl))
130
138
, ServiceName(std::move(params.ServiceName))
131
139
, GrpcSigner(std::move(params.GrpcSigner))
@@ -149,6 +157,10 @@ namespace NWilson {
149
157
ALOG_WARN (WILSON_SERVICE_ID, " max_spans_in_batch shold be greater than 0, changing to 1" );
150
158
MaxSpansInBatch = 1 ;
151
159
}
160
+ if (MaxExportInflight == 0 ) {
161
+ ALOG_WARN (WILSON_SERVICE_ID, " max_span_export_inflight should be greater than 0, changing to 1" );
162
+ MaxExportInflight = 1 ;
163
+ }
152
164
153
165
TStringBuf scheme;
154
166
TStringBuf host;
@@ -243,7 +255,7 @@ namespace NWilson {
243
255
" dropped " << numSpansDropped << " span(s) due to expiration" );
244
256
}
245
257
246
- if (Context || BatchQueue.empty ()) {
258
+ if (ExportRequestsCount >= MaxExportInflight || BatchQueue.empty ()) {
247
259
return ;
248
260
} else if (now < NextSendTimestamp) {
249
261
ScheduleWakeup (NextSendTimestamp);
@@ -267,29 +279,42 @@ namespace NWilson {
267
279
SpansSizeBytes -= batch.SizeBytes ;
268
280
269
281
ScheduleWakeup (NextSendTimestamp);
270
- Context = std::make_unique<grpc::ClientContext>();
282
+
283
+ auto context = std::make_unique<grpc::ClientContext>();
271
284
if (GrpcSigner) {
272
- GrpcSigner->SignClientContext (*Context );
285
+ GrpcSigner->SignClientContext (*context );
273
286
}
274
- Reader = Stub->AsyncExport (Context.get (), std::move (batch.Request ), &CQ);
275
- Reader->Finish (&Response, &Status, nullptr );
287
+ auto reader = Stub->AsyncExport (context.get (), std::move (batch.Request ), &CQ);
288
+ auto uploadData = std::unique_ptr<TExportRequestData>(new TExportRequestData {
289
+ .Context = std::move (context),
290
+ .Reader = std::move (reader),
291
+ });
292
+ uploadData->Reader ->Finish (&uploadData->Response , &uploadData->Status , uploadData.get ());
293
+ ALOG_TRACE (WILSON_SERVICE_ID, " started export request " << (void *)uploadData.get ());
294
+ ExportRequests.PushBack (uploadData.release ());
295
+ ++ExportRequestsCount;
276
296
}
277
297
278
- void CheckIfDone () {
279
- if (Context) {
280
- void *tag;
281
- bool ok;
282
- if (CQ.AsyncNext (&tag, &ok, std::chrono::system_clock::now ()) == grpc::CompletionQueue::GOT_EVENT) {
283
- if (!Status.ok ()) {
284
- ALOG_ERROR (WILSON_SERVICE_ID,
285
- " failed to commit traces: " << Status.error_message ());
286
- }
287
-
288
- Reader.reset ();
289
- Context.reset ();
290
- } else {
291
- ScheduleWakeup (TDuration::MilliSeconds (100 ));
298
+ void ReapCompletedRequests () {
299
+ if (ExportRequests.Empty ()) {
300
+ return ;
301
+ }
302
+ void * tag;
303
+ bool ok;
304
+ while (CQ.AsyncNext (&tag, &ok, std::chrono::system_clock::now ()) == grpc::CompletionQueue::GOT_EVENT) {
305
+ auto node = std::unique_ptr<TExportRequestData>(static_cast <TExportRequestData*>(tag));
306
+ ALOG_TRACE (WILSON_SERVICE_ID, " finished export request " << (void *)node.get ());
307
+ if (!node->Status .ok ()) {
308
+ ALOG_ERROR (WILSON_SERVICE_ID,
309
+ " failed to commit traces: " << node->Status .error_message ());
292
310
}
311
+
312
+ --ExportRequestsCount;
313
+ node->Unlink ();
314
+ }
315
+
316
+ if (!ExportRequests.Empty ()) {
317
+ ScheduleWakeup (TDuration::MilliSeconds (100 ));
293
318
}
294
319
}
295
320
@@ -322,7 +347,7 @@ namespace NWilson {
322
347
}
323
348
324
349
void TryMakeProgress () {
325
- CheckIfDone ();
350
+ ReapCompletedRequests ();
326
351
TryToSend ();
327
352
}
328
353
0 commit comments