7
7
#include < algorithm>
8
8
#include < limits>
9
9
#include < memory>
10
+ #include < iostream>
10
11
#include < mutex>
11
12
#include < utility>
12
13
#include " opentelemetry/common/spin_lock_mutex.h"
@@ -186,6 +187,41 @@ void Base2ExponentialHistogramAggregation::Downscale(uint32_t by) noexcept
186
187
indexer_ = Base2ExponentialHistogramIndexer (point_data_.scale_ );
187
188
}
188
189
190
+ // Merge A and B into a new circular buffer C.
191
+ // Caller must ensure that A and B are used as buckets at the same scale.
192
+ AdaptingCircularBufferCounter MergeBuckets (size_t max_buckets, const AdaptingCircularBufferCounter &A, const AdaptingCircularBufferCounter &B)
193
+ {
194
+ AdaptingCircularBufferCounter C = AdaptingCircularBufferCounter (max_buckets);
195
+ C.Clear ();
196
+
197
+ if (A.Empty () && B.Empty ())
198
+ {
199
+ return C;
200
+ }
201
+ if (A.Empty ())
202
+ {
203
+ return B;
204
+ }
205
+ if (B.Empty ())
206
+ {
207
+ return A;
208
+ }
209
+
210
+ auto min_index = (std::min)(A.StartIndex (), B.StartIndex ());
211
+ auto max_index = (std::max)(A.EndIndex (), B.EndIndex ());
212
+
213
+ for (int i = min_index; i <= max_index; i++)
214
+ {
215
+ auto count = A.Get (i) + B.Get (i);
216
+ if (count > 0 )
217
+ {
218
+ C.Increment (i, count);
219
+ }
220
+ }
221
+
222
+ return C;
223
+ }
224
+
189
225
std::unique_ptr<Aggregation> Base2ExponentialHistogramAggregation::Merge (
190
226
const Aggregation &delta) const noexcept
191
227
{
@@ -197,44 +233,50 @@ std::unique_ptr<Aggregation> Base2ExponentialHistogramAggregation::Merge(
197
233
auto high_res = left.scale_ < right.scale_ ? right : left;
198
234
auto scale_reduction = high_res.scale_ - low_res.scale_ ;
199
235
200
- if (scale_reduction > 0 )
201
- {
202
- DownscaleBuckets (&high_res.positive_buckets_ , scale_reduction);
203
- DownscaleBuckets (&high_res.negative_buckets_ , scale_reduction);
204
- high_res.scale_ -= scale_reduction;
205
- }
206
-
207
236
Base2ExponentialHistogramPointData result_value;
208
237
result_value.count_ = low_res.count_ + high_res.count_ ;
209
238
result_value.sum_ = low_res.sum_ + high_res.sum_ ;
210
239
result_value.zero_count_ = low_res.zero_count_ + high_res.zero_count_ ;
211
240
result_value.scale_ = (std::min)(low_res.scale_ , high_res.scale_ );
212
- result_value.max_buckets_ = low_res.max_buckets_ ;
241
+ result_value.max_buckets_ = low_res.max_buckets_ >= high_res.max_buckets_
242
+ ? low_res.max_buckets_
243
+ : high_res.max_buckets_ ;
213
244
result_value.record_min_max_ = low_res.record_min_max_ && high_res.record_min_max_ ;
214
245
if (result_value.record_min_max_ )
215
246
{
216
247
result_value.min_ = (std::min)(low_res.min_ , high_res.min_ );
217
248
result_value.max_ = (std::max)(low_res.max_ , high_res.max_ );
218
249
}
219
- if (!high_res.positive_buckets_ .Empty ())
250
+
251
+ if (scale_reduction > 0 )
220
252
{
221
- for (int i = high_res.positive_buckets_ .StartIndex ();
222
- i <= high_res.positive_buckets_ .EndIndex (); i++)
223
- {
224
- low_res.positive_buckets_ .Increment (i, high_res.positive_buckets_ .Get (i));
225
- }
253
+ DownscaleBuckets (&high_res.positive_buckets_ , scale_reduction);
254
+ DownscaleBuckets (&high_res.negative_buckets_ , scale_reduction);
255
+ high_res.scale_ -= scale_reduction;
226
256
}
227
- result_value.positive_buckets_ = std::move (low_res.positive_buckets_ );
228
257
229
- if (!high_res.negative_buckets_ .Empty ())
258
+ auto pos_min_index = (std::min)(low_res.positive_buckets_ .StartIndex (), high_res.positive_buckets_ .StartIndex ());
259
+ auto pos_max_index = (std::max)(low_res.positive_buckets_ .EndIndex (), high_res.positive_buckets_ .EndIndex ());
260
+ auto neg_min_index = (std::min)(low_res.negative_buckets_ .StartIndex (), high_res.negative_buckets_ .StartIndex ());
261
+ auto neg_max_index = (std::max)(low_res.negative_buckets_ .EndIndex (), high_res.negative_buckets_ .EndIndex ());
262
+
263
+ if (pos_max_index > pos_min_index + result_value.max_buckets_ || neg_max_index > neg_min_index + result_value.max_buckets_ )
230
264
{
231
- for (int i = high_res.negative_buckets_ .StartIndex ();
232
- i <= high_res.negative_buckets_ .EndIndex (); i++)
233
- {
234
- low_res.negative_buckets_ .Increment (i, high_res.negative_buckets_ .Get (i));
235
- }
265
+ // We need to downscale the buckets to fit into the new max_buckets_.
266
+ const uint32_t scale_reduction = GetScaleReduction (pos_min_index, pos_max_index, result_value.max_buckets_ );
267
+ DownscaleBuckets (&low_res.positive_buckets_ , scale_reduction);
268
+ DownscaleBuckets (&high_res.positive_buckets_ , scale_reduction);
269
+ DownscaleBuckets (&low_res.negative_buckets_ , scale_reduction);
270
+ DownscaleBuckets (&high_res.negative_buckets_ , scale_reduction);
271
+ low_res.scale_ -= scale_reduction;
272
+ high_res.scale_ -= scale_reduction;
273
+ result_value.scale_ -= scale_reduction;
236
274
}
237
- result_value.negative_buckets_ = std::move (low_res.negative_buckets_ );
275
+
276
+ result_value.positive_buckets_ = MergeBuckets (result_value.max_buckets_ , low_res.positive_buckets_ ,
277
+ high_res.positive_buckets_ );
278
+ result_value.negative_buckets_ = MergeBuckets (result_value.max_buckets_ , low_res.negative_buckets_ ,
279
+ high_res.negative_buckets_ );
238
280
239
281
return std::unique_ptr<Base2ExponentialHistogramAggregation>{
240
282
new Base2ExponentialHistogramAggregation (std::move (result_value))};
@@ -258,58 +300,70 @@ std::unique_ptr<Aggregation> Base2ExponentialHistogramAggregation::Diff(
258
300
high_res.scale_ -= scale_reduction;
259
301
}
260
302
303
+ auto pos_min_index = (std::min)(left.positive_buckets_ .StartIndex (), right.positive_buckets_ .StartIndex ());
304
+ auto pos_max_index = (std::max)(left.positive_buckets_ .EndIndex (), right.positive_buckets_ .EndIndex ());
305
+ auto neg_min_index = (std::min)(left.negative_buckets_ .StartIndex (), right.negative_buckets_ .StartIndex ());
306
+ auto neg_max_index = (std::max)(left.negative_buckets_ .EndIndex (), right.negative_buckets_ .EndIndex ());
307
+
308
+ if (pos_max_index > pos_min_index + low_res.max_buckets_ || neg_max_index > neg_min_index + low_res.max_buckets_ )
309
+ {
310
+ // We need to downscale the buckets to fit into the new max_buckets_.
311
+ const uint32_t scale_reduction = GetScaleReduction (pos_min_index, pos_max_index, low_res.max_buckets_ );
312
+ DownscaleBuckets (&left.positive_buckets_ , scale_reduction);
313
+ DownscaleBuckets (&right.positive_buckets_ , scale_reduction);
314
+ DownscaleBuckets (&left.negative_buckets_ , scale_reduction);
315
+ DownscaleBuckets (&right.negative_buckets_ , scale_reduction);
316
+ left.scale_ -= scale_reduction;
317
+ right.scale_ -= scale_reduction;
318
+ }
319
+
261
320
Base2ExponentialHistogramPointData result_value;
262
321
result_value.scale_ = low_res.scale_ ;
263
322
result_value.max_buckets_ = low_res.max_buckets_ ;
264
323
result_value.record_min_max_ = false ;
265
324
// caution for underflow
266
- result_value.count_ = (left.count_ >= right.count_ ) ? (left.count_ - right.count_ ) : 0 ;
267
- result_value.sum_ = (left.sum_ >= right.sum_ ) ? (left.sum_ - right.sum_ ) : 0.0 ;
325
+ // expect right.{sum, count} >= left.{sum, count} since metric points should be monotonically increasing
326
+ result_value.count_ = (right.count_ >= left.count_ ) ? (right.count_ - left.count_ ) : 0.0 ;
327
+ result_value.sum_ = (right.sum_ >= left.sum_ ) ? (right.sum_ - left.sum_ ) : 0.0 ;
268
328
result_value.zero_count_ =
269
- (left.zero_count_ >= right.zero_count_ ) ? (left.zero_count_ - right.zero_count_ ) : 0 ;
270
- if (!high_res.positive_buckets_ .Empty ())
329
+ (right.zero_count_ >= left.zero_count_ ) ? (right.zero_count_ - left.zero_count_ ) : 0 ;
330
+ result_value.positive_buckets_ = AdaptingCircularBufferCounter{right.max_buckets_ };
331
+ result_value.negative_buckets_ = AdaptingCircularBufferCounter{right.max_buckets_ };
332
+
333
+ if (!left.positive_buckets_ .Empty () && !right.positive_buckets_ .Empty ())
271
334
{
272
- for (int i = high_res.positive_buckets_ .StartIndex ();
273
- i <= high_res.positive_buckets_ .EndIndex (); i++)
335
+ for (auto i = pos_min_index; i <= pos_max_index; i++)
274
336
{
275
- low_res.positive_buckets_ .Increment (i, 0 - high_res.positive_buckets_ .Get (i));
337
+ auto l_cnt = left.positive_buckets_ .Get (i);
338
+ auto r_cnt = right.positive_buckets_ .Get (i);
339
+ // expect right >= left since metric points should be monotonically increasing
340
+ auto delta = (std::max)((uint64_t )(0 ), r_cnt - l_cnt);
341
+ if (l_cnt > 0 )
342
+ {
343
+ result_value.positive_buckets_ .Increment (i, delta);
344
+ }
276
345
}
277
346
}
278
- result_value.positive_buckets_ = std::move (low_res.positive_buckets_ );
279
347
280
- if (!high_res .negative_buckets_ .Empty ())
348
+ if (!left. negative_buckets_ . Empty () && !right .negative_buckets_ .Empty ())
281
349
{
282
- for (int i = high_res.negative_buckets_ .StartIndex ();
283
- i <= high_res.negative_buckets_ .EndIndex (); i++)
350
+ for (auto i = neg_min_index; i <= neg_max_index; i++)
284
351
{
285
- low_res.negative_buckets_ .Increment (i, 0 - high_res.negative_buckets_ .Get (i));
352
+ auto l_cnt = left.negative_buckets_ .Get (i);
353
+ auto r_cnt = right.negative_buckets_ .Get (i);
354
+ // expect right >= left since metric points should be monotonically increasing
355
+ auto delta = (std::max)((uint64_t )(0 ), r_cnt - l_cnt);
356
+ if (delta > 0 )
357
+ {
358
+ result_value.negative_buckets_ .Increment (i, delta);
359
+ }
286
360
}
287
361
}
288
- result_value.negative_buckets_ = std::move (low_res.negative_buckets_ );
289
362
290
363
return std::unique_ptr<Base2ExponentialHistogramAggregation>{
291
364
new Base2ExponentialHistogramAggregation (std::move (result_value))};
292
365
}
293
366
294
- // std::unique_ptr<Aggregation> Base2ExponentialHistogramAggregation::Diff(
295
- // const Aggregation &next) const noexcept
296
- // {
297
- // auto curr_value = nostd::get<Base2ExponentialHistogramPointData>(ToPoint());
298
- // auto next_value = nostd::get<Base2ExponentialHistogramPointData>(
299
- // (static_cast<const Base2ExponentialHistogramAggregation &>(next).ToPoint()));
300
-
301
- // Base2ExponentialHistogramPointData result_value;
302
- // result_value.scale_ = curr_value.scale_;
303
- // result_value.max_buckets_ = curr_value.max_buckets_;
304
- // result_value.record_min_max_ = false;
305
- // result_value.count_ = next_value.count_ - curr_value.count_;
306
- // result_value.sum_ = next_value.sum_ - curr_value.sum_;
307
- // result_value.zero_count_ = next_value.zero_count_ - curr_value.zero_count_;
308
-
309
- // return std::unique_ptr<Base2ExponentialHistogramAggregation>{
310
- // new Base2ExponentialHistogramAggregation(std::move(result_value))};
311
- // }
312
-
313
367
PointType Base2ExponentialHistogramAggregation::ToPoint () const noexcept
314
368
{
315
369
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked (lock_);
0 commit comments