diff --git a/Cargo.lock b/Cargo.lock index bccfa8fd522..c93da76361a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3080,6 +3080,7 @@ dependencies = [ "async-trait", "bitflags 2.4.1", "eyeball", + "eyeball-im", "futures-executor", "futures-util", "http", @@ -3288,6 +3289,7 @@ dependencies = [ "matrix-sdk-ui", "once_cell", "rand 0.8.5", + "stream_assert", "tempfile", "tokio", "tracing", diff --git a/bindings/matrix-sdk-ffi/src/room_info.rs b/bindings/matrix-sdk-ffi/src/room_info.rs index cbcb5a6a812..580db3554a5 100644 --- a/bindings/matrix-sdk-ffi/src/room_info.rs +++ b/bindings/matrix-sdk-ffi/src/room_info.rs @@ -31,6 +31,15 @@ pub struct RoomInfo { user_defined_notification_mode: Option, has_room_call: bool, active_room_call_participants: Vec, + /// "Interesting" messages received in that room, independently of the + /// notification settings. + num_unread_messages: u64, + /// Events that will notify the user, according to their + /// notification settings. + num_unread_notifications: u64, + /// Events causing mentions/highlights for the user, according to their + /// notification settings. + num_unread_mentions: u64, } impl RoomInfo { @@ -75,6 +84,9 @@ impl RoomInfo { .iter() .map(|u| u.to_string()) .collect(), + num_unread_messages: room.num_unread_messages(), + num_unread_notifications: room.num_unread_notifications(), + num_unread_mentions: room.num_unread_mentions(), }) } } diff --git a/crates/matrix-sdk-base/Cargo.toml b/crates/matrix-sdk-base/Cargo.toml index 01592d5951f..fc6092e6a66 100644 --- a/crates/matrix-sdk-base/Cargo.toml +++ b/crates/matrix-sdk-base/Cargo.toml @@ -40,6 +40,7 @@ assert_matches2 = { workspace = true, optional = true } async-trait = { workspace = true } bitflags = "2.1.0" eyeball = { workspace = true } +eyeball-im = { workspace = true } futures-util = { workspace = true } http = { workspace = true, optional = true } matrix-sdk-common = { version = "0.6.0", path = "../matrix-sdk-common" } diff --git a/crates/matrix-sdk-base/src/lib.rs b/crates/matrix-sdk-base/src/lib.rs index 74718894d62..6f40ea2a141 100644 --- a/crates/matrix-sdk-base/src/lib.rs +++ b/crates/matrix-sdk-base/src/lib.rs @@ -30,8 +30,14 @@ mod error; pub mod latest_event; pub mod media; mod rooms; + +#[cfg(feature = "experimental-sliding-sync")] +mod read_receipts; +#[cfg(feature = "experimental-sliding-sync")] +pub use read_receipts::PreviousEventsProvider; #[cfg(feature = "experimental-sliding-sync")] mod sliding_sync; + pub mod store; pub mod sync; mod utils; diff --git a/crates/matrix-sdk-base/src/read_receipts.rs b/crates/matrix-sdk-base/src/read_receipts.rs new file mode 100644 index 00000000000..6faeabd9e2c --- /dev/null +++ b/crates/matrix-sdk-base/src/read_receipts.rs @@ -0,0 +1,439 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! # Client-side read receipts computation +//! +//! While Matrix servers have the ability to provide basic information about the +//! unread status of rooms, via [`matrix_sdk::ruma::UnreadNotificationCounts`], +//! it's not reliable for encrypted rooms. Indeed, the server doesn't have +//! access to the content of encrypted events, so it can only makes guesses when +//! estimating unread and highlight counts. +//! +//! Instead, this module provides facilities to compute the number of unread +//! messages, unread notifications and unread highlights in a room. +//! +//! Counting unread messages is performed by looking at the latest receipt of +//! the current user, and inferring which events are following it, according to +//! the sync ordering. +//! +//! For notifications and highlights to be precisely accounted for, we also need +//! to pay attention to the user's notification settings. Fortunately, this is +//! also something we need to for notifications, so we can reuse this code. +//! +//! Of course, not all events are created equal, and some are less interesting +//! than others, and shouldn't cause a room to be marked unread. This module's +//! `marks_as_unread` function shows the opiniated set of rules that will filter +//! out uninterested events. +//! +//! The only public method in that module is [`compute_notifications`], which +//! updates the `RoomInfo` in place according to the new counts. + +use eyeball_im::Vector; +use matrix_sdk_common::deserialized_responses::SyncTimelineEvent; +use ruma::{ + events::{ + poll::{start::PollStartEventContent, unstable_start::UnstablePollStartEventContent}, + receipt::{ReceiptThread, ReceiptType}, + room::message::Relation, + AnySyncMessageLikeEvent, AnySyncTimelineEvent, OriginalSyncMessageLikeEvent, + SyncMessageLikeEvent, + }, + serde::Raw, + EventId, OwnedEventId, RoomId, UserId, +}; +use tracing::{instrument, trace}; + +use super::BaseClient; +use crate::{error::Result, store::StateChanges, RoomInfo}; + +/// Provider for timeline events prior to the current sync. +pub trait PreviousEventsProvider: Send + Sync { + /// Returns the list of known timeline events, in sync order, for the given + /// room. + fn for_room(&self, room_id: &RoomId) -> Vector; +} + +impl PreviousEventsProvider for () { + fn for_room(&self, _: &RoomId) -> Vector { + Vector::new() + } +} + +/// Given a set of events coming from sync, for a room, update the +/// [`RoomInfo`]'s counts of unread messages, notifications and highlights' in +/// place. +/// +/// A provider of previous events may be required to reconcile a read receipt +/// that has been just received for an event that came in a previous sync. +/// +/// See this module's documentation for more information. +#[instrument(skip_all, fields(room_id = %room_info.room_id))] +pub(crate) async fn compute_notifications( + client: &BaseClient, + changes: &StateChanges, + previous_events_provider: &PEP, + new_events: &[SyncTimelineEvent], + room_info: &mut RoomInfo, +) -> Result<()> { + let user_id = &client.session_meta().unwrap().user_id; + let prev_latest_receipt_event_id = room_info.read_receipts.latest_read_receipt_event_id.clone(); + + if let Some(receipt_event) = changes.receipts.get(room_info.room_id()) { + trace!("Got a new receipt event!"); + + // Find a private or public read receipt for the current user. + let mut receipt_event_id = None; + if let Some((event_id, receipt)) = receipt_event + .user_receipt(user_id, ReceiptType::Read) + .or_else(|| receipt_event.user_receipt(user_id, ReceiptType::ReadPrivate)) + { + if receipt.thread == ReceiptThread::Unthreaded || receipt.thread == ReceiptThread::Main + { + receipt_event_id = Some(event_id.to_owned()); + } + } + + if let Some(receipt_event_id) = receipt_event_id { + // We've found the id of an event to which the receipt attaches. The associated + // event may either come from the new batch of events associated to + // this sync, or it may live in the past timeline events we know + // about. + + // First, save the event id as the latest one that has a read receipt. + room_info.read_receipts.latest_read_receipt_event_id = Some(receipt_event_id.clone()); + + // Try to find if the read receipt refers to an event from the current sync, to + // avoid searching the cached timeline events. + trace!("We got a new event with a read receipt: {receipt_event_id}. Search in new events..."); + if find_and_count_events(&receipt_event_id, user_id, new_events, room_info) { + // It did, so our work here is done. + return Ok(()); + } + + // We didn't find the event attached to the receipt in the new batches of + // events. It's possible it's referring to an event we've already + // seen. In that case, try to find it. + let previous_events = previous_events_provider.for_room(&room_info.room_id); + + trace!("Couldn't find the event attached to the receipt in the new events; looking in past events too now..."); + if find_and_count_events( + &receipt_event_id, + user_id, + previous_events.iter().chain(new_events.iter()), + room_info, + ) { + // It did refer to an old event, so our work here is done. + return Ok(()); + } + } + } + + if let Some(receipt_event_id) = prev_latest_receipt_event_id { + // There's no new read-receipt here. We assume the cached events have been + // properly processed, and we only need to process the new events based + // on the previous receipt. + trace!("No new receipts, or couldn't find attached event; looking if the past latest known receipt refers to a new event..."); + if find_and_count_events(&receipt_event_id, user_id, new_events, room_info) { + // We found the event to which the previous receipt attached to, so our work is + // done here. + return Ok(()); + } + } + + // If we haven't returned at this point, it means that either we had no previous + // read receipt, or the previous read receipt was not attached to any new + // event. + // + // In that case, accumulate all events as part of the current batch, and wait + // for the next receipt. + trace!("Default path: including all new events for the receipts count."); + for event in new_events { + count_unread_and_mentions(event, user_id, room_info); + } + + Ok(()) +} + +#[inline(always)] +fn count_unread_and_mentions( + event: &SyncTimelineEvent, + user_id: &UserId, + room_info: &mut RoomInfo, +) { + if marks_as_unread(&event.event, user_id) { + room_info.read_receipts.num_unread += 1; + } + for action in &event.push_actions { + if action.should_notify() { + room_info.read_receipts.num_notifications += 1; + } + if action.is_highlight() { + room_info.read_receipts.num_mentions += 1; + } + } +} + +/// Try to find the event to which the receipt attaches to, and if found, will +/// update the notification count in the room. +/// +/// Returns a boolean indicating if it's found the event and updated the count. +fn find_and_count_events<'a>( + receipt_event_id: &EventId, + user_id: &UserId, + events: impl IntoIterator, + room_info: &mut RoomInfo, +) -> bool { + let mut counting_receipts = false; + for event in events { + if counting_receipts { + count_unread_and_mentions(event, user_id, room_info); + } else if let Ok(Some(event_id)) = event.event.get_field::("event_id") { + if event_id == receipt_event_id { + // Bingo! Switch over to the counting state, after resetting the + // previous counts. + trace!("Found the event the receipt was referring to! Starting to count."); + room_info.read_receipts.num_unread = 0; + room_info.read_receipts.num_notifications = 0; + room_info.read_receipts.num_mentions = 0; + counting_receipts = true; + } + } + } + counting_receipts +} + +/// Is the event worth marking a room as unread? +fn marks_as_unread(event: &Raw, user_id: &UserId) -> bool { + let event = match event.deserialize() { + Ok(event) => event, + Err(err) => { + tracing::debug!( + "couldn't deserialize event {:?}: {err}", + event.get_field::("event_id").ok().flatten() + ); + return false; + } + }; + + if event.sender() == user_id { + // Not interested in one's own events. + return false; + } + + match event { + ruma::events::AnySyncTimelineEvent::MessageLike(event) => { + // Filter out redactions. + let Some(content) = event.original_content() else { + tracing::trace!("not interesting because redacted"); + return false; + }; + + // Filter out edits. + if matches!( + content.relation(), + Some(ruma::events::room::encrypted::Relation::Replacement(..)) + ) { + tracing::trace!("not interesting because edited"); + return false; + } + + match event { + AnySyncMessageLikeEvent::CallAnswer(_) + | AnySyncMessageLikeEvent::CallInvite(_) + | AnySyncMessageLikeEvent::CallHangup(_) + | AnySyncMessageLikeEvent::CallCandidates(_) + | AnySyncMessageLikeEvent::CallNegotiate(_) + | AnySyncMessageLikeEvent::CallReject(_) + | AnySyncMessageLikeEvent::CallSelectAnswer(_) + | AnySyncMessageLikeEvent::PollResponse(_) + | AnySyncMessageLikeEvent::UnstablePollResponse(_) + | AnySyncMessageLikeEvent::Reaction(_) + | AnySyncMessageLikeEvent::RoomRedaction(_) + | AnySyncMessageLikeEvent::KeyVerificationStart(_) + | AnySyncMessageLikeEvent::KeyVerificationReady(_) + | AnySyncMessageLikeEvent::KeyVerificationCancel(_) + | AnySyncMessageLikeEvent::KeyVerificationAccept(_) + | AnySyncMessageLikeEvent::KeyVerificationDone(_) + | AnySyncMessageLikeEvent::KeyVerificationMac(_) + | AnySyncMessageLikeEvent::KeyVerificationKey(_) => false, + + // For some reason, Ruma doesn't handle these two in `content.relation()` above. + AnySyncMessageLikeEvent::PollStart(SyncMessageLikeEvent::Original( + OriginalSyncMessageLikeEvent { + content: + PollStartEventContent { relates_to: Some(Relation::Replacement(_)), .. }, + .. + }, + )) + | AnySyncMessageLikeEvent::UnstablePollStart(SyncMessageLikeEvent::Original( + OriginalSyncMessageLikeEvent { + content: UnstablePollStartEventContent::Replacement(_), + .. + }, + )) => false, + + AnySyncMessageLikeEvent::Message(_) + | AnySyncMessageLikeEvent::PollStart(_) + | AnySyncMessageLikeEvent::UnstablePollStart(_) + | AnySyncMessageLikeEvent::PollEnd(_) + | AnySyncMessageLikeEvent::UnstablePollEnd(_) + | AnySyncMessageLikeEvent::RoomEncrypted(_) + | AnySyncMessageLikeEvent::RoomMessage(_) + | AnySyncMessageLikeEvent::Sticker(_) => true, + + _ => { + // What I don't know about, I don't care about. + tracing::debug!("unhandled timeline event type: {}", event.event_type()); + false + } + } + } + + ruma::events::AnySyncTimelineEvent::State(_) => false, + } +} + +#[cfg(test)] +mod tests { + use std::ops::Not as _; + + use matrix_sdk_test::sync_timeline_event; + use ruma::{event_id, user_id}; + + use crate::read_receipts::marks_as_unread; + + #[test] + fn test_room_message_marks_as_unread() { + let user_id = user_id!("@alice:example.org"); + let other_user_id = user_id!("@bob:example.org"); + + // A message from somebody else marks the room as unread... + let ev = sync_timeline_event!({ + "sender": other_user_id, + "type": "m.room.message", + "event_id": "$ida", + "origin_server_ts": 12344446, + "content": { "body":"A", "msgtype": "m.text" }, + }); + assert!(marks_as_unread(&ev, user_id)); + + // ... but a message from ourselves doesn't. + let ev = sync_timeline_event!({ + "sender": user_id, + "type": "m.room.message", + "event_id": "$ida", + "origin_server_ts": 12344446, + "content": { "body":"A", "msgtype": "m.text" }, + }); + assert!(marks_as_unread(&ev, user_id).not()); + } + + #[test] + fn test_room_edit_doesnt_mark_as_unread() { + let user_id = user_id!("@alice:example.org"); + let other_user_id = user_id!("@bob:example.org"); + + // An edit to a message from somebody else doesn't mark the room as unread. + let ev = sync_timeline_event!({ + "sender": other_user_id, + "type": "m.room.message", + "event_id": "$ida", + "origin_server_ts": 12344446, + "content": { + "body": " * edited message", + "m.new_content": { + "body": "edited message", + "msgtype": "m.text" + }, + "m.relates_to": { + "event_id": "$someeventid:localhost", + "rel_type": "m.replace" + }, + "msgtype": "m.text" + }, + }); + assert!(marks_as_unread(&ev, user_id).not()); + } + + #[test] + fn test_redaction_doesnt_mark_room_as_unread() { + let user_id = user_id!("@alice:example.org"); + let other_user_id = user_id!("@bob:example.org"); + + // A redact of a message from somebody else doesn't mark the room as unread. + let ev = sync_timeline_event!({ + "content": { + "reason": "🛑" + }, + "event_id": "$151957878228ssqrJ:localhost", + "origin_server_ts": 151957878000000_u64, + "sender": other_user_id, + "type": "m.room.redaction", + "redacts": "$151957878228ssqrj:localhost", + "unsigned": { + "age": 85 + } + }); + + assert!(marks_as_unread(&ev, user_id).not()); + } + + #[test] + fn test_reaction_doesnt_mark_room_as_unread() { + let user_id = user_id!("@alice:example.org"); + let other_user_id = user_id!("@bob:example.org"); + + // A reaction from somebody else to a message doesn't mark the room as unread. + let ev = sync_timeline_event!({ + "content": { + "m.relates_to": { + "event_id": "$15275047031IXQRi:localhost", + "key": "👍", + "rel_type": "m.annotation" + } + }, + "event_id": "$15275047031IXQRi:localhost", + "origin_server_ts": 159027581000000_u64, + "sender": other_user_id, + "type": "m.reaction", + "unsigned": { + "age": 85 + } + }); + + assert!(marks_as_unread(&ev, user_id).not()); + } + + #[test] + fn test_state_event_doesnt_mark_as_unread() { + let user_id = user_id!("@alice:example.org"); + let event_id = event_id!("$1"); + let ev = sync_timeline_event!({ + "content": { + "displayname": "Alice", + "membership": "join", + }, + "event_id": event_id, + "origin_server_ts": 1432135524678u64, + "sender": user_id, + "state_key": user_id, + "type": "m.room.member", + }); + + assert!(marks_as_unread(&ev, user_id).not()); + + let other_user_id = user_id!("@bob:example.org"); + assert!(marks_as_unread(&ev, other_user_id).not()); + } +} diff --git a/crates/matrix-sdk-base/src/rooms/mod.rs b/crates/matrix-sdk-base/src/rooms/mod.rs index 0c6b5b8b7b2..a27ee98c691 100644 --- a/crates/matrix-sdk-base/src/rooms/mod.rs +++ b/crates/matrix-sdk-base/src/rooms/mod.rs @@ -100,7 +100,7 @@ pub struct BaseRoomInfo { pub(crate) tombstone: Option>, /// The topic of this room. pub(crate) topic: Option>, - /// All Minimal state events that containing one or more running matrixRTC + /// All minimal state events that containing one or more running matrixRTC /// memberships. #[serde(skip_serializing_if = "BTreeMap::is_empty", default)] pub(crate) rtc_member: BTreeMap>, diff --git a/crates/matrix-sdk-base/src/rooms/normal.rs b/crates/matrix-sdk-base/src/rooms/normal.rs index 715e339b38e..81cab793133 100644 --- a/crates/matrix-sdk-base/src/rooms/normal.rs +++ b/crates/matrix-sdk-base/src/rooms/normal.rs @@ -187,6 +187,31 @@ impl Room { self.inner.read().notification_counts } + /// Get the number of unread messages (computed client-side). + /// + /// This might be more precise than [`Self::unread_notification_counts`] for + /// encrypted rooms. + pub fn num_unread_messages(&self) -> u64 { + self.inner.read().read_receipts.num_unread + } + + /// Get the number of unread notifications (computed client-side). + /// + /// This might be more precise than [`Self::unread_notification_counts`] for + /// encrypted rooms. + pub fn num_unread_notifications(&self) -> u64 { + self.inner.read().read_receipts.num_notifications + } + + /// Get the number of unread mentions (computed client-side), that is, + /// messages causing a highlight in a room. + /// + /// This might be more precise than [`Self::unread_notification_counts`] for + /// encrypted rooms. + pub fn num_unread_mentions(&self) -> u64 { + self.inner.read().read_receipts.num_mentions + } + /// Check if the room has its members fully synced. /// /// Members might be missing if lazy member loading was enabled for the @@ -710,6 +735,25 @@ impl Room { } } +/// Information about read receipts collected during processing of that room. +#[derive(Clone, Debug, Serialize, Deserialize, Default)] +pub struct RoomReadReceipts { + /// Does the room have unread messages? + pub(crate) num_unread: u64, + + /// Does the room have unread events that should notify? + pub(crate) num_notifications: u64, + + /// Does the room have messages causing highlights for the users? (aka + /// mentions) + pub(crate) num_mentions: u64, + + /// The id of the event the last unthreaded (or main-threaded, for better + /// compatibility with clients that have thread support) read receipt is + /// attached to. + pub(crate) latest_read_receipt_event_id: Option, +} + /// The underlying pure data structure for joined and left rooms. /// /// Holds all the info needed to persist a room into the state store. @@ -717,23 +761,39 @@ impl Room { pub struct RoomInfo { /// The unique room id of the room. pub(crate) room_id: OwnedRoomId, + /// The state of the room. pub(crate) room_state: RoomState, - /// The unread notifications counts. + + /// The unread notifications counts, as returned by the server. + /// + /// These might be incorrect for encrypted rooms, since the server doesn't + /// have access to the content of the encrypted events. pub(crate) notification_counts: UnreadNotificationsCount, + /// The summary of this room. pub(crate) summary: RoomSummary, + /// Flag remembering if the room members are synced. pub(crate) members_synced: bool, + /// The prev batch of this room we received during the last sync. pub(crate) last_prev_batch: Option, + /// How much we know about this room. pub(crate) sync_info: SyncInfo, + /// Whether or not the encryption info was been synced. pub(crate) encryption_state_synced: bool, + /// The last event send by sliding sync #[cfg(feature = "experimental-sliding-sync")] pub(crate) latest_event: Option>, + + /// Information about read receipts for this room. + #[serde(default)] + pub(crate) read_receipts: RoomReadReceipts, + /// Base room info which holds some basic event contents important for the /// room state. pub(crate) base_info: Box, @@ -770,6 +830,7 @@ impl RoomInfo { encryption_state_synced: false, #[cfg(feature = "experimental-sliding-sync")] latest_event: None, + read_receipts: Default::default(), base_info: Box::new(BaseRoomInfo::new()), } } @@ -1251,6 +1312,7 @@ mod tests { Raw::from_json_string(json!({"sender": "@u:i.uk"}).to_string()).unwrap().into(), ))), base_info: Box::new(BaseRoomInfo::new()), + read_receipts: Default::default(), }; let info_json = json!({ @@ -1290,6 +1352,12 @@ mod tests { "name": null, "tombstone": null, "topic": null, + }, + "read_receipts": { + "num_unread": 0, + "num_mentions": 0, + "num_notifications": 0, + "latest_read_receipt_event_id": null, } }); diff --git a/crates/matrix-sdk-base/src/sliding_sync.rs b/crates/matrix-sdk-base/src/sliding_sync.rs index f2c9af4cf26..985659eb7d6 100644 --- a/crates/matrix-sdk-base/src/sliding_sync.rs +++ b/crates/matrix-sdk-base/src/sliding_sync.rs @@ -16,7 +16,6 @@ use std::collections::BTreeMap; #[cfg(feature = "e2e-encryption")] use std::ops::Deref; -#[cfg(feature = "e2e-encryption")] use matrix_sdk_common::deserialized_responses::SyncTimelineEvent; #[cfg(feature = "e2e-encryption")] use ruma::events::AnyToDeviceEvent; @@ -42,6 +41,7 @@ use crate::RoomMemberships; use crate::{ deserialized_responses::AmbiguityChanges, error::Result, + read_receipts::{compute_notifications, PreviousEventsProvider}, rooms::RoomState, store::{ambiguity_map::AmbiguityCache, StateChanges, Store}, sync::{JoinedRoom, LeftRoom, Rooms, SyncResponse}, @@ -113,7 +113,11 @@ impl BaseClient { /// * `response` - The response that we received after a successful sliding /// sync. #[instrument(skip_all, level = "trace")] - pub async fn process_sliding_sync(&self, response: &v4::Response) -> Result { + pub async fn process_sliding_sync( + &self, + response: &v4::Response, + previous_events_provider: &PEP, + ) -> Result { let v4::Response { // FIXME not yet supported by sliding sync. see // https://github.com/matrix-org/matrix-rust-sdk/issues/1014 @@ -152,11 +156,11 @@ impl BaseClient { let mut new_rooms = Rooms::default(); let mut notifications = Default::default(); - for (room_id, room_data) in rooms { - let (room_to_store, joined_room, left_room, invited_room) = self + for (room_id, response_room_data) in rooms { + let (room_info, joined_room, left_room, invited_room) = self .process_sliding_sync_room( room_id, - room_data, + response_room_data, account_data, &store, &mut changes, @@ -165,7 +169,7 @@ impl BaseClient { ) .await?; - changes.add_room(room_to_store); + changes.add_room(room_info); if let Some(joined_room) = joined_room { new_rooms.join.insert(room_id.clone(), joined_room); @@ -180,7 +184,10 @@ impl BaseClient { } } - // Process receipts now we have rooms. + // Handle read receipts and typing notifications independently of the rooms: + // these both live in a different subsection of the server's response, + // so they may exist without any update for the associated room. + for (room_id, raw) in &extensions.receipts.rooms { match raw.deserialize() { Ok(event) => { @@ -191,24 +198,51 @@ impl BaseClient { #[rustfmt::skip] warn!( ?room_id, event_id, - "Failed to deserialize ephemeral room event: {e}" + "Failed to deserialize read receipt room event: {e}" ); } } - // Also include the receipts in the room update, so the timeline is aware of - // those. We assume that those happen only in joined rooms. - let room_update = - new_rooms.join.entry(room_id.clone()).or_insert_with(JoinedRoom::default); - room_update.ephemeral.push(raw.clone().cast()); + // We assume this can only happen in joined rooms, or something's very wrong. + new_rooms + .join + .entry(room_id.to_owned()) + .or_insert_with(JoinedRoom::default) + .ephemeral + .push(raw.clone().cast()); } for (room_id, raw) in &extensions.typing.rooms { - // Include the typing notifications in the room update, so the timeline is aware - // of those. We assume that those happen only in joined rooms. - let room_update = - new_rooms.join.entry(room_id.clone()).or_insert_with(JoinedRoom::default); - room_update.ephemeral.push(raw.clone().cast()); + // We assume this can only happen in joined rooms, or something's very wrong. + new_rooms + .join + .entry(room_id.to_owned()) + .or_insert_with(JoinedRoom::default) + .ephemeral + .push(raw.clone().cast()); + } + + // Rooms in `new_rooms.join` either have a timeline update, or a new read + // receipt. Update the read receipt accordingly. + for (room_id, joined_room_update) in &mut new_rooms.join { + if let Some(mut room_info) = changes + .room_infos + .get(room_id) + .cloned() + .or_else(|| self.get_room(room_id).map(|r| r.clone_info())) + { + // TODO only add the room if there was an update + compute_notifications( + self, + &changes, + previous_events_provider, + &joined_room_update.timeline.events, + &mut room_info, + ) + .await?; + + changes.add_room(room_info); + } } // TODO remove this, we're processing account data events here again @@ -337,23 +371,27 @@ impl BaseClient { } } - let notification_count = room_data.unread_notifications.clone().into(); - room_info.update_notification_count(notification_count); - match room_info.state() { - RoomState::Joined => Ok(( - room_info, - Some(JoinedRoom::new( - timeline, - raw_state_events, - room_account_data.unwrap_or_default(), - Vec::new(), /* ephemeral events are handled later in - * `Self::process_sliding_sync`. */ - notification_count, - )), - None, - None, - )), + RoomState::Joined => { + // Ephemeral events are added separately, because we might not + // have a room subsection in the response, yet we may have receipts for + // that room. + let ephemeral = Vec::new(); + + let notification_counts = room_info.notification_counts; + Ok(( + room_info, + Some(JoinedRoom::new( + timeline, + raw_state_events, + room_account_data.unwrap_or_default(), + ephemeral, + notification_counts, + )), + None, + None, + )) + } RoomState::Left => Ok(( room_info, @@ -656,7 +694,7 @@ mod tests { async fn can_process_empty_sliding_sync_response() { let client = logged_in_client().await; let empty_response = v4::Response::new("5".to_owned()); - client.process_sliding_sync(&empty_response).await.expect("Failed to process sync"); + client.process_sliding_sync(&empty_response, &()).await.expect("Failed to process sync"); } #[async_test] @@ -671,7 +709,7 @@ mod tests { room.joined_count = Some(uint!(41)); let response = response_with_room(room_id, room).await; let sync_resp = - client.process_sliding_sync(&response).await.expect("Failed to process sync"); + client.process_sliding_sync(&response, &()).await.expect("Failed to process sync"); // Then the room appears in the client (with the same joined count) let client_room = client.get_room(room_id).expect("No room found"); @@ -696,7 +734,7 @@ mod tests { room.name = Some("little room".to_owned()); let response = response_with_room(room_id, room).await; let sync_resp = - client.process_sliding_sync(&response).await.expect("Failed to process sync"); + client.process_sliding_sync(&response, &()).await.expect("Failed to process sync"); // Then the room appears in the client with the expected name let client_room = client.get_room(room_id).expect("No room found"); @@ -722,7 +760,7 @@ mod tests { room.name = Some("little room".to_owned()); let response = response_with_room(room_id, room).await; let sync_resp = - client.process_sliding_sync(&response).await.expect("Failed to process sync"); + client.process_sliding_sync(&response, &()).await.expect("Failed to process sync"); // Then the room appears in the client with the expected name let client_room = client.get_room(room_id).expect("No room found"); @@ -746,7 +784,7 @@ mod tests { let mut room = v4::SlidingSyncRoom::new(); set_room_joined(&mut room, user_id); let response = response_with_room(room_id, room).await; - client.process_sliding_sync(&response).await.expect("Failed to process sync"); + client.process_sliding_sync(&response, &()).await.expect("Failed to process sync"); assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Joined); // And then leave with a `required_state` state event… @@ -754,7 +792,7 @@ mod tests { set_room_left(&mut room, user_id); let response = response_with_room(room_id, room).await; let sync_resp = - client.process_sliding_sync(&response).await.expect("Failed to process sync"); + client.process_sliding_sync(&response, &()).await.expect("Failed to process sync"); // The room is left. assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left); @@ -776,14 +814,14 @@ mod tests { let mut room = v4::SlidingSyncRoom::new(); set_room_joined(&mut room, user_id); let response = response_with_room(room_id, room).await; - client.process_sliding_sync(&response).await.expect("Failed to process sync"); + client.process_sliding_sync(&response, &()).await.expect("Failed to process sync"); assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Joined); // And then leave with a `timeline` state event… let mut room = v4::SlidingSyncRoom::new(); set_room_left_as_timeline_event(&mut room, user_id); let response = response_with_room(room_id, room).await; - client.process_sliding_sync(&response).await.expect("Failed to process sync"); + client.process_sliding_sync(&response, &()).await.expect("Failed to process sync"); // The room is left. assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left); @@ -802,7 +840,7 @@ mod tests { let mut room = v4::SlidingSyncRoom::new(); set_room_joined(&mut room, user_id); let response = response_with_room(room_id, room).await; - client.process_sliding_sync(&response).await.expect("Failed to process sync"); + client.process_sliding_sync(&response, &()).await.expect("Failed to process sync"); // (sanity: state is join) assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Joined); @@ -810,7 +848,7 @@ mod tests { let mut room = v4::SlidingSyncRoom::new(); set_room_left(&mut room, user_id); let response = response_with_room(room_id, room).await; - client.process_sliding_sync(&response).await.expect("Failed to process sync"); + client.process_sliding_sync(&response, &()).await.expect("Failed to process sync"); // (sanity: state is left) assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left); @@ -818,7 +856,7 @@ mod tests { let mut room = v4::SlidingSyncRoom::new(); set_room_invited(&mut room, user_id); let response = response_with_room(room_id, room).await; - client.process_sliding_sync(&response).await.expect("Failed to process sync"); + client.process_sliding_sync(&response, &()).await.expect("Failed to process sync"); // Then the room is in the invite state assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited); @@ -931,7 +969,7 @@ mod tests { // When I send sliding sync response containing a room with an avatar let room = room_with_avatar(mxc_uri!("mxc://e.uk/med1"), user_id); let response = response_with_room(room_id, room).await; - client.process_sliding_sync(&response).await.expect("Failed to process sync"); + client.process_sliding_sync(&response, &()).await.expect("Failed to process sync"); // Then the room in the client has the avatar let client_room = client.get_room(room_id).expect("No room found"); @@ -953,7 +991,7 @@ mod tests { set_room_invited(&mut room, user_id); let response = response_with_room(room_id, room).await; let sync_resp = - client.process_sliding_sync(&response).await.expect("Failed to process sync"); + client.process_sliding_sync(&response, &()).await.expect("Failed to process sync"); // Then the room is added to the client let client_room = client.get_room(room_id).expect("No room found"); @@ -976,7 +1014,7 @@ mod tests { let mut room = room_with_avatar(mxc_uri!("mxc://e.uk/med1"), user_id); set_room_invited(&mut room, user_id); let response = response_with_room(room_id, room).await; - client.process_sliding_sync(&response).await.expect("Failed to process sync"); + client.process_sliding_sync(&response, &()).await.expect("Failed to process sync"); // Then the room in the client has the avatar let client_room = client.get_room(room_id).expect("No room found"); @@ -998,7 +1036,7 @@ mod tests { let mut room = room_with_canonical_alias(room_alias_id, user_id); set_room_invited(&mut room, user_id); let response = response_with_room(room_id, room).await; - client.process_sliding_sync(&response).await.expect("Failed to process sync"); + client.process_sliding_sync(&response, &()).await.expect("Failed to process sync"); // Then the room in the client has the avatar let client_room = client.get_room(room_id).expect("No room found"); @@ -1018,7 +1056,7 @@ mod tests { let mut room = room_with_canonical_alias(room_alias_id, user_id); room.name = Some("This came from the server".to_owned()); let response = response_with_room(room_id, room).await; - client.process_sliding_sync(&response).await.expect("Failed to process sync"); + client.process_sliding_sync(&response, &()).await.expect("Failed to process sync"); // Then the room's name is just exactly what the server supplied let client_room = client.get_room(room_id).expect("No room found"); @@ -1052,7 +1090,7 @@ mod tests { let events = &[event_a, event_b.clone()]; let room = room_with_timeline(events); let response = response_with_room(room_id, room).await; - client.process_sliding_sync(&response).await.expect("Failed to process sync"); + client.process_sliding_sync(&response, &()).await.expect("Failed to process sync"); // Then the room holds the latest event let client_room = client.get_room(room_id).expect("No room found"); @@ -1078,7 +1116,7 @@ mod tests { // When the sliding sync response contains a timeline let room = room_with_timeline(&[event_a]); let response = response_with_room(room_id, room).await; - client.process_sliding_sync(&response).await.expect("Failed to process sync"); + client.process_sliding_sync(&response, &()).await.expect("Failed to process sync"); // Then the room holds the latest event let client_room = client.get_room(room_id).expect("No room found"); @@ -1099,7 +1137,7 @@ mod tests { // When a redaction for that event is received let room = room_with_timeline(&[redaction]); let response = response_with_room(room_id, room).await; - client.process_sliding_sync(&response).await.expect("Failed to process sync"); + client.process_sliding_sync(&response, &()).await.expect("Failed to process sync"); // Then the room still holds the latest event let client_room = client.get_room(room_id).expect("No room found"); @@ -1486,7 +1524,7 @@ mod tests { let mut response = response_with_room(room_id, room).await; set_direct_with(&mut response, their_id.to_owned(), vec![room_id.to_owned()]); - client.process_sliding_sync(&response).await.expect("Failed to process sync"); + client.process_sliding_sync(&response, &()).await.expect("Failed to process sync"); } /// Set this user's membership within this room to new_state @@ -1499,7 +1537,7 @@ mod tests { let mut room = v4::SlidingSyncRoom::new(); room.required_state.push(make_membership_event(user_id, new_state)); let response = response_with_room(room_id, room).await; - client.process_sliding_sync(&response).await.expect("Failed to process sync"); + client.process_sliding_sync(&response, &()).await.expect("Failed to process sync"); } fn set_direct_with( diff --git a/crates/matrix-sdk-base/src/store/migration_helpers.rs b/crates/matrix-sdk-base/src/store/migration_helpers.rs index 58134876c7e..6db9e2ebbe7 100644 --- a/crates/matrix-sdk-base/src/store/migration_helpers.rs +++ b/crates/matrix-sdk-base/src/store/migration_helpers.rs @@ -118,6 +118,7 @@ impl RoomInfoV1 { encryption_state_synced, #[cfg(feature = "experimental-sliding-sync")] latest_event: latest_event.map(|ev| Box::new(LatestEvent::new(ev))), + read_receipts: Default::default(), base_info: base_info.migrate(create), } } diff --git a/crates/matrix-sdk-base/src/sync.rs b/crates/matrix-sdk-base/src/sync.rs index ac29a12ff07..760be905126 100644 --- a/crates/matrix-sdk-base/src/sync.rs +++ b/crates/matrix-sdk-base/src/sync.rs @@ -140,7 +140,7 @@ impl JoinedRoom { } /// Counts of unread notifications for a room. -#[derive(Copy, Clone, Debug, Default, Deserialize, Serialize)] +#[derive(Copy, Clone, Debug, Default, Deserialize, Serialize, PartialEq)] pub struct UnreadNotificationsCount { /// The number of unread notifications for this room with the highlight flag /// set. diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs index 1d5fa797a2c..054c33feb72 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs @@ -560,7 +560,8 @@ mod tests { } #[async_test] - async fn latest_message_event_can_be_wrapped_as_a_timeline_item_with_sender_from_the_storage() { + async fn test_latest_message_event_can_be_wrapped_as_a_timeline_item_with_sender_from_the_storage( + ) { // Given a sync event that is suitable to be used as a latest_event, and a room // with a member event for the sender @@ -574,7 +575,7 @@ mod tests { // And the room is stored in the client so it can be extracted when needed let response = response_with_room(room_id, room).await; - client.process_sliding_sync(&response).await.unwrap(); + client.process_sliding_sync_test_helper(&response).await.unwrap(); // When we construct a timeline event from it let timeline_item = @@ -595,7 +596,8 @@ mod tests { } #[async_test] - async fn latest_message_event_can_be_wrapped_as_a_timeline_item_with_sender_from_the_cache() { + async fn test_latest_message_event_can_be_wrapped_as_a_timeline_item_with_sender_from_the_cache( + ) { // Given a sync event that is suitable to be used as a latest_event, a room, and // a member event for the sender (which isn't part of the room yet). @@ -617,7 +619,7 @@ mod tests { // And the room is stored in the client so it can be extracted when needed let response = response_with_room(room_id, room).await; - client.process_sliding_sync(&response).await.unwrap(); + client.process_sliding_sync_test_helper(&response).await.unwrap(); // When we construct a timeline event from it let timeline_item = EventTimelineItem::from_latest_event( diff --git a/crates/matrix-sdk-ui/src/timeline/sliding_sync_ext.rs b/crates/matrix-sdk-ui/src/timeline/sliding_sync_ext.rs index 7d1bc599500..296182248cd 100644 --- a/crates/matrix-sdk-ui/src/timeline/sliding_sync_ext.rs +++ b/crates/matrix-sdk-ui/src/timeline/sliding_sync_ext.rs @@ -87,13 +87,13 @@ mod tests { } #[async_test] - async fn latest_message_event_is_wrapped_as_a_timeline_item() { + async fn test_latest_message_event_is_wrapped_as_a_timeline_item() { // Given a room exists, and an event came in through a sync let room_id = room_id!("!r:x.uk"); let user_id = user_id!("@s:o.uk"); let client = logged_in_client(None).await; let event = message_event(room_id, user_id, "**My msg**", "My msg", 122343); - process_event_via_sync(room_id, event, &client).await; + process_event_via_sync_test_helper(room_id, event, &client).await; // When we ask for the latest event in the room let room = SlidingSyncRoom::new( @@ -118,11 +118,15 @@ mod tests { } } - async fn process_event_via_sync(room_id: &RoomId, event: SyncTimelineEvent, client: &Client) { + async fn process_event_via_sync_test_helper( + room_id: &RoomId, + event: SyncTimelineEvent, + client: &Client, + ) { let mut room = v4::SlidingSyncRoom::new(); room.timeline.push(event.event); let response = response_with_room(room_id, room).await; - client.process_sliding_sync(&response).await.unwrap(); + client.process_sliding_sync_test_helper(&response).await.unwrap(); } fn message_event( diff --git a/crates/matrix-sdk/src/sliding_sync/client.rs b/crates/matrix-sdk/src/sliding_sync/client.rs index 8215fc622a3..123676f1209 100644 --- a/crates/matrix-sdk/src/sliding_sync/client.rs +++ b/crates/matrix-sdk/src/sliding_sync/client.rs @@ -1,9 +1,11 @@ -use matrix_sdk_base::sync::SyncResponse; -use ruma::{api::client::sync::sync_events::v4, events::AnyToDeviceEvent, serde::Raw}; -use tracing::{debug, instrument}; +use std::collections::BTreeMap; + +use imbl::Vector; +use matrix_sdk_base::{sync::SyncResponse, PreviousEventsProvider}; +use ruma::{api::client::sync::sync_events::v4, events::AnyToDeviceEvent, serde::Raw, OwnedRoomId}; use super::{SlidingSync, SlidingSyncBuilder}; -use crate::{Client, Result}; +use crate::{Client, Result, SlidingSyncRoom}; impl Client { /// Create a [`SlidingSyncBuilder`] tied to this client, with the given @@ -19,32 +21,48 @@ impl Client { /// /// If you need to handle encryption too, use the internal /// `SlidingSyncResponseProcessor` instead. - #[instrument(skip(self, response))] - pub async fn process_sliding_sync(&self, response: &v4::Response) -> Result { - let response = self.base_client().process_sliding_sync(response).await?; - - debug!("done processing on base_client"); + #[cfg(any(test, feature = "testing"))] + #[tracing::instrument(skip(self, response))] + pub async fn process_sliding_sync_test_helper( + &self, + response: &v4::Response, + ) -> Result { + let response = self.base_client().process_sliding_sync(response, &()).await?; + + tracing::debug!("done processing on base_client"); self.handle_sync_response(&response).await?; Ok(response) } } +struct SlidingSyncPreviousEventsProvider<'a>(&'a BTreeMap); + +impl<'a> PreviousEventsProvider for SlidingSyncPreviousEventsProvider<'a> { + fn for_room( + &self, + room_id: &ruma::RoomId, + ) -> Vector { + self.0.get(room_id).map(|room| room.timeline_queue()).unwrap_or_default() + } +} + /// Small helper to handle a `SlidingSync` response's sub parts. /// /// This will properly handle the encryption and the room response /// independently, if needs be, making sure that both are properly processed by /// event handlers. #[must_use] -pub(crate) struct SlidingSyncResponseProcessor { +pub(crate) struct SlidingSyncResponseProcessor<'a> { client: Client, to_device_events: Vec>, response: Option, + rooms: &'a BTreeMap, } -impl SlidingSyncResponseProcessor { - pub fn new(client: Client) -> Self { - Self { client, to_device_events: Vec::new(), response: None } +impl<'a> SlidingSyncResponseProcessor<'a> { + pub fn new(client: Client, rooms: &'a BTreeMap) -> Self { + Self { client, to_device_events: Vec::new(), response: None, rooms } } #[cfg(feature = "e2e-encryption")] @@ -63,7 +81,12 @@ impl SlidingSyncResponseProcessor { } pub async fn handle_room_response(&mut self, response: &v4::Response) -> Result<()> { - self.response = Some(self.client.base_client().process_sliding_sync(response).await?); + self.response = Some( + self.client + .base_client() + .process_sliding_sync(response, &SlidingSyncPreviousEventsProvider(self.rooms)) + .await?, + ); Ok(()) } diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index cfb6fc166a4..edf4dd734e9 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -325,24 +325,28 @@ impl SlidingSync { // `sliding_sync_response` is vital, so it must be done somewhere; for now it // happens here. - let mut response_processor = SlidingSyncResponseProcessor::new(self.inner.client.clone()); + let mut sync_response = { + let rooms = &*self.inner.rooms.read().await; + let mut response_processor = + SlidingSyncResponseProcessor::new(self.inner.client.clone(), rooms); - #[cfg(feature = "e2e-encryption")] - if self.is_e2ee_enabled() { - response_processor.handle_encryption(&sliding_sync_response.extensions).await? - } + #[cfg(feature = "e2e-encryption")] + if self.is_e2ee_enabled() { + response_processor.handle_encryption(&sliding_sync_response.extensions).await? + } - // Only handle the room's subsection of the response, if this sliding sync was - // configured to do so. That's because even when not requesting it, - // sometimes the current (2023-07-20) proxy will forward room events - // unrelated to the current connection's parameters. - // - // NOTE: SS proxy workaround. - if must_process_rooms_response { - response_processor.handle_room_response(&sliding_sync_response).await?; - } + // Only handle the room's subsection of the response, if this sliding sync was + // configured to do so. That's because even when not requesting it, + // sometimes the current (2023-07-20) proxy will forward room events + // unrelated to the current connection's parameters. + // + // NOTE: SS proxy workaround. + if must_process_rooms_response { + response_processor.handle_room_response(&sliding_sync_response).await?; + } - let mut sync_response = response_processor.process_and_take_response().await?; + response_processor.process_and_take_response().await? + }; debug!(?sync_response, "Sliding Sync response has been handled by the client"); diff --git a/crates/matrix-sdk/src/sliding_sync/room.rs b/crates/matrix-sdk/src/sliding_sync/room.rs index 11d07f4a80e..26381efd8f7 100644 --- a/crates/matrix-sdk/src/sliding_sync/room.rs +++ b/crates/matrix-sdk/src/sliding_sync/room.rs @@ -271,6 +271,13 @@ struct SlidingSyncRoomInner { /// A queue of received events, used to build a /// [`Timeline`][crate::Timeline]. + /// + /// Given a room, its size is theoretically unbounded: we'll accumulate + /// events in this list, until we reach a limited sync, in which case + /// we'll clear it. + /// + /// When persisting the room, this queue is truncated to keep only the last + /// N events. timeline_queue: RwLock>, } diff --git a/testing/matrix-sdk-integration-testing/Cargo.toml b/testing/matrix-sdk-integration-testing/Cargo.toml index 1914c3b9dfb..8aad9ffbd92 100644 --- a/testing/matrix-sdk-integration-testing/Cargo.toml +++ b/testing/matrix-sdk-integration-testing/Cargo.toml @@ -18,6 +18,7 @@ matrix-sdk-ui = { path = "../../crates/matrix-sdk-ui" } matrix-sdk-test = { path = "../matrix-sdk-test" } once_cell = { workspace = true } rand = { workspace = true } +stream_assert = "0.1.1" tempfile = "3.3.0" tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros"] } tracing = { workspace = true } diff --git a/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs b/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs index 561c3e76fc7..846077090e6 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs @@ -1,16 +1,29 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use anyhow::Result; -use futures_util::{pin_mut, StreamExt}; +use assert_matches2::assert_let; +use futures_util::{pin_mut, StreamExt as _}; use matrix_sdk::{ config::SyncSettings, ruma::{ - api::client::room::create_room::v3::Request as CreateRoomRequest, assign, - events::room::message::RoomMessageEventContent, mxc_uri, + api::client::{ + receipt::create_receipt::v3::ReceiptType, + room::create_room::v3::Request as CreateRoomRequest, + sync::sync_events::v4::{ + AccountDataConfig, E2EEConfig, ReceiptsConfig, ToDeviceConfig, + }, + }, + assign, + events::{ + receipt::ReceiptThread, room::message::RoomMessageEventContent, + AnySyncMessageLikeEvent, Mentions, + }, + mxc_uri, }, RoomListEntry, RoomState, SlidingSyncList, SlidingSyncMode, }; -use tokio::time::sleep; +use stream_assert::assert_pending; +use tokio::{sync::Mutex, time::sleep}; use tracing::{error, warn}; use crate::helpers::TestClientBuilder; @@ -198,3 +211,258 @@ async fn test_room_avatar_group_conversation() -> Result<()> { Ok(()) } + +#[ignore = "times out or fails assertions in code coverage builds (#2963)"] +#[tokio::test] +async fn test_room_notification_count() -> Result<()> { + let bob = + TestClientBuilder::new("bob".to_owned()).randomize_username().use_sqlite().build().await?; + + // Spawn sync for bob. + let b = bob.clone(); + tokio::task::spawn(async move { + let bob = b; + loop { + if let Err(err) = bob.sync(Default::default()).await { + tracing::error!("bob sync error: {err}"); + } + } + }); + + // Set up sliding sync for alice. + let alice = TestClientBuilder::new("alice".to_owned()) + .randomize_username() + .use_sqlite() + .build() + .await?; + + tokio::task::spawn({ + let sync = alice + .sliding_sync("main")? + .with_receipt_extension(assign!(ReceiptsConfig::default(), { enabled: Some(true) })) + .with_account_data_extension( + assign!(AccountDataConfig::default(), { enabled: Some(true) }), + ) + .add_list( + SlidingSyncList::builder("all") + .sync_mode(SlidingSyncMode::new_selective().add_range(0..=20)), + ) + .build() + .await?; + + async move { + let stream = sync.sync(); + pin_mut!(stream); + while let Some(up) = stream.next().await { + warn!("received update: {up:?}"); + } + } + }); + + tokio::task::spawn({ + let sync = alice + .sliding_sync("e2ee")? + .with_e2ee_extension(assign!(E2EEConfig::default(), { enabled: Some(true) })) + .with_to_device_extension(assign!(ToDeviceConfig::default(), { enabled: Some(true) })) + .build() + .await?; + + async move { + let stream = sync.sync(); + pin_mut!(stream); + while let Some(up) = stream.next().await { + warn!("received update: {up:?}"); + } + } + }); + + let latest_event = Arc::new(Mutex::new(None)); + let l = latest_event.clone(); + alice.add_event_handler(|ev: AnySyncMessageLikeEvent| async move { + let mut latest_event = l.lock().await; + *latest_event = Some(ev); + }); + + // alice creates a room and invites bob. + let room_id = alice + .create_room(assign!(CreateRoomRequest::new(), { + invite: vec![bob.user_id().unwrap().to_owned()], + is_direct: true, + })) + .await? + .room_id() + .to_owned(); + + let mut alice_room = None; + for i in 1..=4 { + sleep(Duration::from_millis(30 * i)).await; + alice_room = alice.get_room(&room_id); + if alice_room.is_some() { + break; + } + } + + let alice_room = alice_room.unwrap(); + assert_eq!(alice_room.state(), RoomState::Joined); + + alice_room.enable_encryption().await?; + + let mut info_updates = alice_room.subscribe_info(); + + // At first, nothing has happened, so we shouldn't have any notifications. + assert_eq!(alice_room.num_unread_messages(), 0); + assert_eq!(alice_room.num_unread_mentions(), 0); + assert_eq!(alice_room.num_unread_notifications(), 0); + + assert_pending!(info_updates); + + // Bob joins, nothing happens. + bob.join_room_by_id(&room_id).await?; + + assert!(info_updates.next().await.is_some()); + + assert_eq!(alice_room.num_unread_messages(), 0); + assert_eq!(alice_room.num_unread_mentions(), 0); + assert_eq!(alice_room.num_unread_notifications(), 0); + assert!(alice_room.latest_event().is_none()); + + assert_pending!(info_updates); + + // Bob sends a non-mention message. + let bob_room = bob.get_room(&room_id).expect("bob knows about alice's room"); + + bob_room.send(RoomMessageEventContent::text_plain("hello world")).await?; + + assert!(info_updates.next().await.is_some()); + + assert_eq!(alice_room.num_unread_messages(), 1); + assert_eq!(alice_room.num_unread_notifications(), 1); + assert_eq!(alice_room.num_unread_mentions(), 0); + + assert_pending!(info_updates); + + // Bob sends a mention message. + bob_room + .send( + RoomMessageEventContent::text_plain("Hello my dear friend Alice!") + .set_mentions(Mentions::with_user_ids([alice.user_id().unwrap().to_owned()])), + ) + .await?; + + loop { + assert!(info_updates.next().await.is_some()); + + // FIXME we receive multiple spurious room info updates. + if alice_room.num_unread_messages() == 1 && alice_room.num_unread_mentions() == 0 { + tracing::warn!("ignoring"); + continue; + } + + // The highlight also counts as a notification. + assert_eq!(alice_room.num_unread_messages(), 2); + assert_eq!(alice_room.num_unread_notifications(), 2); + assert_eq!(alice_room.num_unread_mentions(), 1); + break; + } + + assert_pending!(info_updates); + + // Alice marks the room as read. + let event_id = latest_event.lock().await.take().unwrap().event_id().to_owned(); + alice_room.send_single_receipt(ReceiptType::Read, ReceiptThread::Unthreaded, event_id).await?; + + // Remote echo of marking the room as read. + assert_let!(Some(_room_info) = info_updates.next().await); + + loop { + assert!(info_updates.next().await.is_some()); + + if alice_room.num_unread_messages() == 2 && alice_room.num_unread_mentions() == 1 { + // Sometimes we get notified for changes to unrelated, other fields of + // `info_updates`. + tracing::warn!("ignoring"); + continue; + } + + assert_eq!(alice_room.num_unread_messages(), 0); + assert_eq!(alice_room.num_unread_notifications(), 0); + assert_eq!(alice_room.num_unread_mentions(), 0); + break; + } + + assert_pending!(info_updates); + + // Alice sends a message. + alice_room.send(RoomMessageEventContent::text_plain("hello bob")).await?; + + // Local echo for our own message. + assert!(info_updates.next().await.is_some()); + + assert_eq!(alice_room.num_unread_messages(), 0); + assert_eq!(alice_room.num_unread_notifications(), 0); + assert_eq!(alice_room.num_unread_mentions(), 0); + + // Remote echo for our own message. + assert!(info_updates.next().await.is_some()); + + assert_eq!(alice_room.num_unread_messages(), 0); + assert_eq!(alice_room.num_unread_notifications(), 0); + assert_eq!(alice_room.num_unread_mentions(), 0); + + assert_pending!(info_updates); + + // Now Alice is only interesting in mentions of their name. + let settings = alice.notification_settings().await; + + tracing::warn!("Updating room notification mode to mentions and keywords only..."); + settings + .set_room_notification_mode( + alice_room.room_id(), + matrix_sdk::notification_settings::RoomNotificationMode::MentionsAndKeywordsOnly, + ) + .await?; + tracing::warn!("Done!"); + + // Wait for remote echo. + let _ = settings.subscribe_to_changes().recv().await; + + bob_room.send(RoomMessageEventContent::text_plain("I said hello!")).await?; + + assert!(info_updates.next().await.is_some()); + + // The message doesn't contain a mention, so it doesn't notify Alice. But it + // exists. + assert_eq!(alice_room.num_unread_messages(), 1); + assert_eq!(alice_room.num_unread_notifications(), 0); + assert_eq!(alice_room.num_unread_mentions(), 0); + + assert_pending!(info_updates); + + // Bob sends a mention message. + bob_room + .send( + RoomMessageEventContent::text_plain("Why, hello there Alice!") + .set_mentions(Mentions::with_user_ids([alice.user_id().unwrap().to_owned()])), + ) + .await?; + + loop { + assert!(info_updates.next().await.is_some()); + + // FIXME we receive multiple spurious room info updates. + if alice_room.num_unread_messages() == 1 && alice_room.num_unread_mentions() == 0 { + tracing::warn!("ignoring"); + continue; + } + + // The highlight also counts as a notification. + assert_eq!(alice_room.num_unread_messages(), 2); + assert_eq!(alice_room.num_unread_notifications(), 1); + assert_eq!(alice_room.num_unread_mentions(), 1); + break; + } + + assert_pending!(info_updates); + + Ok(()) +}