Skip to content

Add maxQueueSize to limit the number of unawaited events sent to Sentry #1868

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Features

- Use `recordHttpBreadcrumbs` to set iOS `enableNetworkBreadcrumbs` ([#1884](https://github.com/getsentry/sentry-dart/pull/1884))
- Add `maxQueueSize` to limit the number of unawaited events sent to Sentry ([#1868]((https://github.com/getsentry/sentry-dart/pull/1868))

## 7.16.1

Expand Down
10 changes: 9 additions & 1 deletion dart/lib/src/sentry_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import 'sentry_stack_trace_factory.dart';
import 'transport/http_transport.dart';
import 'transport/noop_transport.dart';
import 'transport/spotlight_http_transport.dart';
import 'transport/task_queue.dart';
import 'utils/isolate_utils.dart';
import 'version.dart';
import 'sentry_envelope.dart';
Expand All @@ -32,6 +33,10 @@ const _defaultIpAddress = '{{auto}}';
/// Logs crash reports and events to the Sentry.io service.
class SentryClient {
final SentryOptions _options;
late final _taskQueue = TaskQueue<SentryId?>(
_options.maxQueueSize,
_options.logger,
);

final Random? _random;

Expand Down Expand Up @@ -514,6 +519,9 @@ class SentryClient {
Future<SentryId?> _attachClientReportsAndSend(SentryEnvelope envelope) {
final clientReport = _options.recorder.flush();
envelope.addClientReport(clientReport);
return _options.transport.send(envelope);
return _taskQueue.enqueue(
() => _options.transport.send(envelope),
SentryId.empty(),
);
}
}
14 changes: 14 additions & 0 deletions dart/lib/src/sentry_options.dart
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,20 @@ class SentryOptions {
_maxSpans = maxSpans;
}

int _maxQueueSize = 30;

/// Returns the max number of events Sentry will send when calling capture
/// methods in a tight loop. Default is 30.
int get maxQueueSize => _maxQueueSize;

/// Sets how many unawaited events can be sent by Sentry. (e.g. capturing
/// events in a tight loop) at once. If you need to send more, please use the
/// await keyword.
set maxQueueSize(int count) {
assert(count > 0);
_maxQueueSize = count;
}

/// Configures up to which size request bodies should be included in events.
/// This does not change whether an event is captured.
MaxRequestBodySize maxRequestBodySize = MaxRequestBodySize.never;
Expand Down
29 changes: 29 additions & 0 deletions dart/lib/src/transport/task_queue.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import 'dart:async';

import '../../sentry.dart';

typedef Task<T> = Future<T> Function();

class TaskQueue<T> {
TaskQueue(this._maxQueueSize, this._logger);

final int _maxQueueSize;
final SentryLogger _logger;

int _queueCount = 0;

Future<T> enqueue(Task<T> task, T fallbackResult) async {
if (_queueCount >= _maxQueueSize) {
_logger(SentryLevel.warning,
'Task dropped due to backpressure. Avoid capturing in a tight loop.');
return fallbackResult;
} else {
_queueCount++;
try {
return await task();
} finally {
_queueCount--;
}
}
}
}
118 changes: 118 additions & 0 deletions dart/test/transport/tesk_queue_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import 'dart:async';

import 'package:sentry/sentry.dart';
import 'package:sentry/src/transport/task_queue.dart';
import 'package:test/test.dart';

import '../mocks.dart';

void main() {
group("called sync", () {
late Fixture fixture;

setUp(() {
fixture = Fixture();
});

test("enqueue only executed `maxQueueSize` times when not awaiting",
() async {
final sut = fixture.getSut(maxQueueSize: 5);

var completedTasks = 0;

for (int i = 0; i < 10; i++) {
unawaited(sut.enqueue(() async {
print('Task $i');
await Future.delayed(Duration(milliseconds: 1));
completedTasks += 1;
return 1 + 1;
}, -1));
}

// This will always await the other futures, even if they are running longer, as it was scheduled after them.
print('Started waiting for first 5 tasks');
await Future.delayed(Duration(milliseconds: 1));
print('Stopped waiting for first 5 tasks');

expect(completedTasks, 5);
});

test("enqueue picks up tasks again after await in-between", () async {
final sut = fixture.getSut(maxQueueSize: 5);

var completedTasks = 0;

for (int i = 1; i <= 10; i++) {
unawaited(sut.enqueue(() async {
print('Started task $i');
await Future.delayed(Duration(milliseconds: 1));
print('Completed task $i');
completedTasks += 1;
return 1 + 1;
}, -1));
}

print('Started waiting for first 5 tasks');
await Future.delayed(Duration(milliseconds: 1));
print('Stopped waiting for first 5 tasks');

for (int i = 6; i <= 15; i++) {
unawaited(sut.enqueue(() async {
print('Started task $i');
await Future.delayed(Duration(milliseconds: 1));
print('Completed task $i');
completedTasks += 1;
return 1 + 1;
}, -1));
}

print('Started waiting for second 5 tasks');
await Future.delayed(Duration(milliseconds: 5));
print('Stopped waiting for second 5 tasks');

expect(completedTasks, 10); // 10 were dropped
});

test("enqueue executes all tasks when awaiting", () async {
final sut = fixture.getSut(maxQueueSize: 5);

var completedTasks = 0;

for (int i = 0; i < 10; i++) {
await sut.enqueue(() async {
print('Task $i');
await Future.delayed(Duration(milliseconds: 1));
completedTasks += 1;
return 1 + 1;
}, -1);
}
expect(completedTasks, 10);
});

test("throwing tasks still execute as expected", () async {
final sut = fixture.getSut(maxQueueSize: 5);

var completedTasks = 0;

for (int i = 0; i < 10; i++) {
try {
await sut.enqueue(() async {
completedTasks += 1;
throw Error();
}, -1);
} catch (_) {
// Ignore
}
}
expect(completedTasks, 10);
});
});
}

class Fixture {
final options = SentryOptions(dsn: fakeDsn);

TaskQueue<int> getSut({required int maxQueueSize}) {
return TaskQueue(maxQueueSize, options.logger);
}
}