
msglist: Throttle fetchOlder retries #1050

Merged
merged 3 commits on Dec 11, 2024
23 changes: 21 additions & 2 deletions lib/api/backoff.dart
@@ -36,6 +36,25 @@ class BackoffMachine {
/// not the (random) previous wait duration itself.
static const double base = 2;

/// In debug mode, overrides the duration of the backoff wait.
///
/// Outside of debug mode, this is always `null` and the setter has no effect.
static Duration? get debugDuration {
Duration? result;
assert(() {
result = _debugDuration;
return true;
}());
return result;
}
static Duration? _debugDuration;
static set debugDuration(Duration? newValue) {
assert(() {
_debugDuration = newValue;
return true;
}());
}

/// A future that resolves after an appropriate backoff time,
/// with jitter applied to capped exponential growth.
///
@@ -66,8 +85,8 @@ class BackoffMachine {
Future<void> wait() async {
final bound = _minDuration(maxBound,
firstBound * pow(base, _waitsCompleted));
final duration = _maxDuration(const Duration(microseconds: 1),
bound * Random().nextDouble());
final duration = debugDuration ?? _maxDuration(const Duration(microseconds: 1),
bound * Random().nextDouble());
Comment on lines +88 to +89 (Member):

nit: line too long (and in particular the value of microseconds is past 80 columns, just)
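
One possible wrapping that would satisfy the nit is sketched below; this is an illustration only, not necessarily the revision the author pushed:

    final duration = debugDuration
      ?? _maxDuration(const Duration(microseconds: 1),
           bound * Random().nextDouble());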

await Future<void>.delayed(duration);
_waitsCompleted++;
}
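
To make "capped exponential growth with jitter" concrete, here is a standalone sketch of the value wait() computes. The firstBound and maxBound constants below are assumptions chosen for illustration; the real values live elsewhere in backoff.dart and are not part of this hunk.

    import 'dart:math';

    /// Sketch of the duration computed inside BackoffMachine.wait():
    /// an exponentially growing bound, capped at a maximum, then scaled
    /// by a uniformly random factor (the jitter).
    Duration jitteredBackoff(int waitsCompleted, Random random) {
      const firstBound = Duration(milliseconds: 100); // assumed for illustration
      const maxBound = Duration(seconds: 10);         // assumed for illustration
      const double base = 2;

      // Capped exponential growth: 100ms, 200ms, 400ms, ..., capped at 10s.
      final grown = firstBound * pow(base, waitsCompleted);
      final bound = grown < maxBound ? grown : maxBound;

      // Jitter: scale by a factor in [0, 1), but never let the wait hit zero.
      final jittered = bound * random.nextDouble();
      const floor = Duration(microseconds: 1);
      return jittered > floor ? jittered : floor;
    }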
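The new debugDuration hook is what lets tests pin that wait to a known value, as the message-list tests later in this PR do. A minimal sketch of that usage follows; awaitFakeAsync is the repo's existing fake-async test helper, and its import path here is an assumption.

    import 'dart:async';

    import 'package:checks/checks.dart';
    import 'package:test/scaffolding.dart';
    import 'package:zulip/api/backoff.dart';

    import '../fake_async.dart'; // assumed location of awaitFakeAsync

    void main() {
      test('wait resolves after the overridden debug duration',
          () => awaitFakeAsync((async) async {
        addTearDown(() => BackoffMachine.debugDuration = null);
        BackoffMachine.debugDuration = const Duration(seconds: 1);

        var completed = false;
        unawaited(BackoffMachine().wait().then((_) => completed = true));
        async.elapse(const Duration(seconds: 1)); // exactly the overridden wait
        check(completed).isTrue();
      }));
    }
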
74 changes: 63 additions & 11 deletions lib/model/message_list.dart
@@ -1,6 +1,9 @@
import 'dart:async';

import 'package:collection/collection.dart';
import 'package:flutter/foundation.dart';

import '../api/backoff.dart';
import '../api/model/events.dart';
import '../api/model/model.dart';
import '../api/route/messages.dart';
@@ -89,9 +92,32 @@ mixin _MessageSequence {
bool _haveOldest = false;

/// Whether we are currently fetching the next batch of older messages.
///
/// When this is true, [fetchOlder] is a no-op.
/// That method is called frequently by Flutter's scrolling logic,
/// and this field helps us avoid spamming the same request just to get
/// the same response each time.
///
/// See also [fetchOlderCoolingDown].
bool get fetchingOlder => _fetchingOlder;
bool _fetchingOlder = false;

/// Whether [fetchOlder] had a request error recently.
///
/// When this is true, [fetchOlder] is a no-op.
/// That method is called frequently by Flutter's scrolling logic,
/// and this field mitigates spamming the same request and getting
/// the same error each time.
///
/// "Recently" is decided by a [BackoffMachine] that resets
/// when a [fetchOlder] request succeeds.
///
/// See also [fetchingOlder].
bool get fetchOlderCoolingDown => _fetchOlderCoolingDown;
bool _fetchOlderCoolingDown = false;

BackoffMachine? _fetchOlderCooldownBackoffMachine;

/// The parsed message contents, as a list parallel to [messages].
///
/// The i'th element is the result of parsing the i'th element of [messages].
@@ -107,7 +133,7 @@ mixin _MessageSequence {
/// before, between, or after the messages.
///
/// This information is completely derived from [messages] and
/// the flags [haveOldest] and [fetchingOlder].
/// the flags [haveOldest], [fetchingOlder] and [fetchOlderCoolingDown].
/// It exists as an optimization, to memoize that computation.
final QueueList<MessageListItem> items = QueueList();
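
Taken together, the fetchingOlder and fetchOlderCoolingDown flags documented above implement a small throttling pattern: a busy flag for a request in flight, plus a cooling-down flag that is set on failure and cleared when a BackoffMachine wait elapses, with the machine reset whenever a request succeeds. A reduced standalone sketch of that pattern follows; the class and member names are invented for illustration, and only BackoffMachine comes from this codebase.

    import 'dart:async';

    import 'package:zulip/api/backoff.dart';

    class ThrottledFetcher {
      bool busy = false;        // plays the role of fetchingOlder
      bool coolingDown = false; // plays the role of fetchOlderCoolingDown
      BackoffMachine? _backoff;

      Future<void> fetch(Future<void> Function() doRequest) async {
        if (busy || coolingDown) return; // frequent callers get a cheap no-op
        busy = true;
        try {
          await doRequest();
          _backoff = null; // success: a later failure backs off from scratch
        } catch (_) {
          coolingDown = true; // failure: refuse further requests for a while
          unawaited((_backoff ??= BackoffMachine()).wait()
            .then((_) => coolingDown = false));
          rethrow;
        } finally {
          busy = false;
        }
      }
    }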

@@ -241,6 +267,8 @@ mixin _MessageSequence {
_fetched = false;
_haveOldest = false;
_fetchingOlder = false;
_fetchOlderCoolingDown = false;
_fetchOlderCooldownBackoffMachine = null;
contents.clear();
items.clear();
}
@@ -288,8 +316,11 @@ mixin _MessageSequence {

/// Update [items] to include markers at start and end as appropriate.
void _updateEndMarkers() {
assert(!(haveOldest && fetchingOlder));
final startMarker = switch ((fetchingOlder, haveOldest)) {
assert(fetched);
assert(!(fetchingOlder && fetchOlderCoolingDown));
final effectiveFetchingOlder = fetchingOlder || fetchOlderCoolingDown;
assert(!(effectiveFetchingOlder && haveOldest));
final startMarker = switch ((effectiveFetchingOlder, haveOldest)) {
(true, _) => const MessageListLoadingItem(MessageListDirection.older),
(_, true) => const MessageListHistoryStartItem(),
(_, _) => null,
@@ -469,7 +500,7 @@ class MessageListView with ChangeNotifier, _MessageSequence {
Future<void> fetchInitial() async {
// TODO(#80): fetch from anchor firstUnread, instead of newest
// TODO(#82): fetch from a given message ID as anchor
assert(!fetched && !haveOldest && !fetchingOlder);
assert(!fetched && !haveOldest && !fetchingOlder && !fetchOlderCoolingDown);
assert(messages.isEmpty && contents.isEmpty);
// TODO schedule all this in another isolate
final generation = this.generation;
@@ -497,20 +528,28 @@ class MessageListView with ChangeNotifier, _MessageSequence {
Future<void> fetchOlder() async {
if (haveOldest) return;
if (fetchingOlder) return;
if (fetchOlderCoolingDown) return;
assert(fetched);
assert(messages.isNotEmpty);
_fetchingOlder = true;
_updateEndMarkers();
notifyListeners();
final generation = this.generation;
bool hasFetchError = false;
try {
final result = await getMessages(store.connection,
narrow: narrow.apiEncode(),
anchor: NumericAnchor(messages[0].id),
includeAnchor: false,
numBefore: kMessageListFetchBatchSize,
numAfter: 0,
);
final GetMessagesResult result;
try {
result = await getMessages(store.connection,
narrow: narrow.apiEncode(),
anchor: NumericAnchor(messages[0].id),
includeAnchor: false,
numBefore: kMessageListFetchBatchSize,
numAfter: 0,
);
} catch (e) {
hasFetchError = true;
rethrow;
}
if (this.generation > generation) return;

if (result.messages.isNotEmpty
Expand All @@ -531,6 +570,19 @@ class MessageListView with ChangeNotifier, _MessageSequence {
} finally {
if (this.generation == generation) {
_fetchingOlder = false;
if (hasFetchError) {
assert(!fetchOlderCoolingDown);
_fetchOlderCoolingDown = true;
unawaited((_fetchOlderCooldownBackoffMachine ??= BackoffMachine())
.wait().then((_) {
if (this.generation != generation) return;
Comment (Member) on the line above:

nit:

Suggested change
-              if (this.generation != generation) return;
+              if (this.generation > generation) return;

(matching the similar checks elsewhere)

_fetchOlderCoolingDown = false;
Comment on lines 572 to +579 (Member):

Or perhaps in fact we can avoid introducing more flags in the first place. How about we fuse the new flag with the old one — or put another way, instead of adding a new flag, we adjust the semantics of the existing fetchingOlder? Like this:

Suggested change
-        _fetchingOlder = false;
-        if (hasFetchError) {
-          assert(!fetchOlderCoolingDown);
-          _fetchOlderCoolingDown = true;
-          unawaited((_fetchOlderCooldownBackoffMachine ??= BackoffMachine())
-            .wait().then((_) {
-              if (this.generation != generation) return;
-              _fetchOlderCoolingDown = false;
+        if (!hasFetchError) {
+          _fetchingOlder = false;
+        } else {
+          unawaited((_fetchOlderCooldownBackoffMachine ??= BackoffMachine())
+            .wait().then((_) {
+              if (this.generation != generation) return;
+              _fetchingOlder = false;

_updateEndMarkers();
notifyListeners();
}));
} else {
_fetchOlderCooldownBackoffMachine = null;
}
_updateEndMarkers();
notifyListeners();
}
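
The generation checks in this method (including the one the reviewer's nit is about) follow the staleness-check pattern used throughout this model: capture the generation when the async work starts, and bail out afterwards if the view has been reset in the meantime. A reduced sketch, with everything except the generation field invented for illustration:

    class ExampleModel {
      int generation = 0;

      void reset() {
        generation++; // invalidates all in-flight async work
      }

      Future<void> refresh(Future<String> Function() load) async {
        final generation = this.generation; // capture before awaiting
        final data = await load();
        if (this.generation > generation) return; // reset() ran; discard result
        _apply(data); // still the same generation, safe to update state
      }

      void _apply(String data) { /* update fields, notify listeners, ... */ }
    }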
110 changes: 108 additions & 2 deletions test/model/message_list_test.dart
@@ -3,6 +3,8 @@ import 'dart:convert';
import 'package:checks/checks.dart';
import 'package:http/http.dart' as http;
import 'package:test/scaffolding.dart';
import 'package:zulip/api/backoff.dart';
import 'package:zulip/api/exception.dart';
import 'package:zulip/api/model/events.dart';
import 'package:zulip/api/model/model.dart';
import 'package:zulip/api/model/narrow.dart';
@@ -238,6 +240,40 @@ void main() {
..messages.length.equals(30);
});

test('fetchOlder nop during backoff', () => awaitFakeAsync((async) async {
final olderMessages = List.generate(5, (i) => eg.streamMessage());
final initialMessages = List.generate(5, (i) => eg.streamMessage());
await prepare(narrow: const CombinedFeedNarrow());
await prepareMessages(foundOldest: false, messages: initialMessages);
check(connection.takeRequests()).single;

connection.prepare(httpStatus: 400, json: {
'result': 'error', 'code': 'BAD_REQUEST', 'msg': 'Bad request'});
check(async.pendingTimers).isEmpty();
await check(model.fetchOlder()).throws<ZulipApiException>();
checkNotified(count: 2);
check(model).fetchOlderCoolingDown.isTrue();
check(connection.takeRequests()).single;

await model.fetchOlder();
checkNotNotified();
check(model).fetchOlderCoolingDown.isTrue();
check(model).fetchingOlder.isFalse();
check(connection.lastRequest).isNull();

// Wait long enough that a first backoff is sure to finish.
async.elapse(const Duration(seconds: 1));
check(model).fetchOlderCoolingDown.isFalse();
checkNotifiedOnce();
check(connection.lastRequest).isNull();

connection.prepare(json: olderResult(
anchor: 1000, foundOldest: false, messages: olderMessages).toJson());
await model.fetchOlder();
checkNotified(count: 2);
check(connection.takeRequests()).single;
}));
Comment on lines +272 to +275 (Member)

test('fetchOlder handles servers not understanding includeAnchor', () async {
const narrow = CombinedFeedNarrow();
await prepare(narrow: narrow);
@@ -1020,6 +1056,70 @@ void main() {
checkNotNotified();
}));

test('fetchOlder backoff A starts, _reset, move fetch finishes,'
' fetchOlder backoff B starts, fetchOlder backoff A ends', () => awaitFakeAsync((async) async {
addTearDown(() => BackoffMachine.debugDuration = null);
await prepareNarrow(narrow, initialMessages);

connection.prepare(httpStatus: 400, json: {
'result': 'error', 'code': 'BAD_REQUEST', 'msg': 'Bad request'});
BackoffMachine.debugDuration = const Duration(seconds: 1);
await check(model.fetchOlder()).throws<ZulipApiException>();
final backoffTimerA = async.pendingTimers.single;
check(model).fetchOlderCoolingDown.isTrue();
check(model).fetched.isTrue();
checkHasMessages(initialMessages);
checkNotified(count: 2);

connection.prepare(json: newestResult(
foundOldest: false,
messages: initialMessages + movedMessages,
).toJson());
await store.handleEvent(eg.updateMessageEventMoveTo(
origTopic: movedMessages[0].topic,
origStreamId: otherStream.streamId,
newMessages: movedMessages,
));
// Check that _reset was called.
check(model).fetched.isFalse();
checkHasMessages([]);
checkNotifiedOnce();
check(model).fetchOlderCoolingDown.isFalse();
check(backoffTimerA.isActive).isTrue();

async.elapse(Duration.zero);
check(model).fetched.isTrue();
checkHasMessages(initialMessages + movedMessages);
checkNotifiedOnce();
check(model).fetchOlderCoolingDown.isFalse();
check(backoffTimerA.isActive).isTrue();

connection.prepare(httpStatus: 400, json: {
'result': 'error', 'code': 'BAD_REQUEST', 'msg': 'Bad request'});
BackoffMachine.debugDuration = const Duration(seconds: 2);
await check(model.fetchOlder()).throws<ZulipApiException>();
final backoffTimerB = async.pendingTimers.last;
check(model).fetchOlderCoolingDown.isTrue();
check(backoffTimerA.isActive).isTrue();
check(backoffTimerB.isActive).isTrue();
checkNotified(count: 2);

// When `backoffTimerA` ends, `fetchOlderCoolingDown` remains `true`
// because the backoff was from a previous generation.
async.elapse(const Duration(seconds: 1));
check(model).fetchOlderCoolingDown.isTrue();
check(backoffTimerA.isActive).isFalse();
check(backoffTimerB.isActive).isTrue();
checkNotNotified();

// When `backoffTimerB` ends, `fetchOlderCoolingDown` gets reset.
async.elapse(const Duration(seconds: 1));
check(model).fetchOlderCoolingDown.isFalse();
check(backoffTimerA.isActive).isFalse();
check(backoffTimerB.isActive).isFalse();
checkNotifiedOnce();
}));
Comment on lines +1107 to +1121 (Member):

This new test looks good!


test('fetchInitial, _reset, initial fetch finishes, move fetch finishes', () => awaitFakeAsync((async) async {
await prepareNarrow(narrow, null);

@@ -1750,10 +1850,15 @@ void checkInvariants(MessageListView model) {
check(model)
..messages.isEmpty()
..haveOldest.isFalse()
..fetchingOlder.isFalse();
..fetchingOlder.isFalse()
..fetchOlderCoolingDown.isFalse();
}
if (model.haveOldest) {
check(model).fetchingOlder.isFalse();
check(model).fetchOlderCoolingDown.isFalse();
}
if (model.fetchingOlder) {
check(model).fetchOlderCoolingDown.isFalse();
}

for (final message in model.messages) {
@@ -1793,7 +1898,7 @@ void checkInvariants(MessageListView model) {
if (model.haveOldest) {
Comment (Member) on the line above:

Apropos of my previous comment #1050 (comment): there's a new invariant, so let's have this checkInvariants function check that invariant.

check(model.items[i++]).isA<MessageListHistoryStartItem>();
}
if (model.fetchingOlder) {
if (model.fetchingOlder || model.fetchOlderCoolingDown) {
check(model.items[i++]).isA<MessageListLoadingItem>();
}
for (int j = 0; j < model.messages.length; j++) {
@@ -1849,4 +1954,5 @@ extension MessageListViewChecks on Subject<MessageListView> {
Subject<bool> get fetched => has((x) => x.fetched, 'fetched');
Subject<bool> get haveOldest => has((x) => x.haveOldest, 'haveOldest');
Subject<bool> get fetchingOlder => has((x) => x.fetchingOlder, 'fetchingOlder');
Subject<bool> get fetchOlderCoolingDown => has((x) => x.fetchOlderCoolingDown, 'fetchOlderCoolingDown');
}
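
For reference, the new fetchOlderCoolingDown subject composes with the existing ones in the usual package:checks cascade style, mirroring the assertions the new tests above make:

    check(model)
      ..fetchingOlder.isFalse()
      ..fetchOlderCoolingDown.isTrue();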