From 4ce02e8ff69b71f94c51451535a2c3e6a062eaf2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Sat, 28 Aug 2021 11:39:33 +0200 Subject: [PATCH] fix: don't retry soft failed events --- src/client_server/sync.rs | 86 +++++++++++++----------------------- src/database.rs | 6 ++- src/database/rooms.rs | 91 ++++++++++++++++++++++++++++++++------- src/server_server.rs | 12 ++++++ 4 files changed, 124 insertions(+), 71 deletions(-) diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index 21a9ef2b..d6e32ea2 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -256,8 +256,8 @@ async fn sync_helper( // Calculates joined_member_count, invited_member_count and heroes let calculate_counts = || { - let joined_member_count = db.rooms.room_members(&room_id).count(); - let invited_member_count = db.rooms.room_members_invited(&room_id).count(); + let joined_member_count = db.rooms.room_joined_count(&room_id)?.unwrap_or(0); + let invited_member_count = db.rooms.room_invited_count(&room_id)?.unwrap_or(0); // Recalculate heroes (first 5 members) let mut heroes = Vec::new(); @@ -407,64 +407,40 @@ async fn sync_helper( }); if encrypted_room { - for (user_id, current_member) in db - .rooms - .room_members(&room_id) - .filter_map(|r| r.ok()) - .filter_map(|user_id| { - db.rooms - .state_get( - current_shortstatehash, - &EventType::RoomMember, - user_id.as_str(), - ) - .ok() - .flatten() - .map(|current_member| (user_id, current_member)) - }) - { - let current_membership = serde_json::from_value::< - Raw, - >(current_member.content.clone()) - .expect("Raw::from_value always works") - .deserialize() - .map_err(|_| Error::bad_database("Invalid PDU in database."))? - .membership; + for state_event in &state_events { + if state_event.kind != EventType::RoomMember { + continue; + } - let since_membership = db - .rooms - .state_get( - since_shortstatehash, - &EventType::RoomMember, - user_id.as_str(), - )? - .and_then(|since_member| { - serde_json::from_value::< - Raw, - >(since_member.content.clone()) - .expect("Raw::from_value always works") - .deserialize() - .map_err(|_| Error::bad_database("Invalid PDU in database.")) - .ok() - }) - .map_or(MembershipState::Leave, |member| member.membership); + if let Some(state_key) = &state_event.state_key { + let user_id = UserId::try_from(state_key.clone()) + .map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?; - let user_id = UserId::try_from(user_id.clone()) - .map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?; + if user_id == sender_user { + continue; + } - match (since_membership, current_membership) { - (MembershipState::Leave, MembershipState::Join) => { - // A new user joined an encrypted room - if !share_encrypted_room(&db, &sender_user, &user_id, &room_id)? { - device_list_updates.insert(user_id); + let new_membership = serde_json::from_value::< + Raw, + >(state_event.content.clone()) + .expect("Raw::from_value always works") + .deserialize() + .map_err(|_| Error::bad_database("Invalid PDU in database."))? + .membership; + + match new_membership { + MembershipState::Join => { + // A new user joined an encrypted room + if !share_encrypted_room(&db, &sender_user, &user_id, &room_id)? { + device_list_updates.insert(user_id); + } } + MembershipState::Leave => { + // Write down users that have left encrypted rooms we are in + left_encrypted_users.insert(user_id); + } + _ => {} } - // TODO: Remove, this should never happen here, right? - (MembershipState::Join, MembershipState::Leave) => { - // Write down users that have left encrypted rooms we are in - left_encrypted_users.insert(user_id); - } - _ => {} } } } diff --git a/src/database.rs b/src/database.rs index 8fd745bc..ca3d2f0c 100644 --- a/src/database.rs +++ b/src/database.rs @@ -252,6 +252,7 @@ impl Database { userroomid_joined: builder.open_tree("userroomid_joined")?, roomuserid_joined: builder.open_tree("roomuserid_joined")?, roomid_joinedcount: builder.open_tree("roomid_joinedcount")?, + roomid_invitedcount: builder.open_tree("roomid_invitedcount")?, roomuseroncejoinedids: builder.open_tree("roomuseroncejoinedids")?, userroomid_invitestate: builder.open_tree("userroomid_invitestate")?, roomuserid_invitecount: builder.open_tree("roomuserid_invitecount")?, @@ -277,6 +278,8 @@ impl Database { statehash_shortstatehash: builder.open_tree("statehash_shortstatehash")?, eventid_outlierpdu: builder.open_tree("eventid_outlierpdu")?, + softfailedeventids: builder.open_tree("softfailedeventids")?, + referencedevents: builder.open_tree("referencedevents")?, pdu_cache: Mutex::new(LruCache::new(100_000)), auth_chain_cache: Mutex::new(LruCache::new(1_000_000)), @@ -285,6 +288,7 @@ impl Database { shortstatekey_cache: Mutex::new(LruCache::new(1_000_000)), statekeyshort_cache: Mutex::new(LruCache::new(1_000_000)), stateinfo_cache: Mutex::new(LruCache::new(1000)), + our_real_users_cache: RwLock::new(HashMap::new()), }, account_data: account_data::AccountData { roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?, @@ -442,7 +446,7 @@ impl Database { let room_id = RoomId::try_from(utils::string_from_bytes(&roomid).unwrap()).unwrap(); - db.rooms.update_joined_count(&room_id)?; + db.rooms.update_joined_count(&room_id, &db)?; } db.globals.bump_database_version(6)?; diff --git a/src/database/rooms.rs b/src/database/rooms.rs index b829a1bf..729c8f30 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -26,7 +26,7 @@ use std::{ collections::{BTreeMap, HashMap, HashSet}, convert::{TryFrom, TryInto}, mem::size_of, - sync::{Arc, Mutex}, + sync::{Arc, Mutex, RwLock}, time::Instant, }; use tokio::sync::MutexGuard; @@ -59,6 +59,7 @@ pub struct Rooms { pub(super) userroomid_joined: Arc, pub(super) roomuserid_joined: Arc, pub(super) roomid_joinedcount: Arc, + pub(super) roomid_invitedcount: Arc, pub(super) roomuseroncejoinedids: Arc, pub(super) userroomid_invitestate: Arc, // InviteState = Vec> pub(super) roomuserid_invitecount: Arc, // InviteCount = Count @@ -90,6 +91,7 @@ pub struct Rooms { /// RoomId + EventId -> outlier PDU. /// Any pdu that has passed the steps 1-8 in the incoming event /federation/send/txn. pub(super) eventid_outlierpdu: Arc, + pub(super) softfailedeventids: Arc, /// RoomId + EventId -> Parent PDU EventId. pub(super) referencedevents: Arc, @@ -100,6 +102,7 @@ pub struct Rooms { pub(super) eventidshort_cache: Mutex>, pub(super) statekeyshort_cache: Mutex>, pub(super) shortstatekey_cache: Mutex>, + pub(super) our_real_users_cache: RwLock>>>, pub(super) stateinfo_cache: Mutex< LruCache< u64, @@ -425,7 +428,7 @@ impl Rooms { } } - self.update_joined_count(room_id)?; + self.update_joined_count(room_id, &db)?; self.roomid_shortstatehash .insert(room_id.as_bytes(), &new_shortstatehash.to_be_bytes())?; @@ -1229,9 +1232,19 @@ impl Rooms { self.eventid_outlierpdu.insert( &event_id.as_bytes(), &serde_json::to_vec(&pdu).expect("CanonicalJsonObject is valid"), - )?; + ) + } - Ok(()) + #[tracing::instrument(skip(self))] + pub fn mark_event_soft_failed(&self, event_id: &EventId) -> Result<()> { + self.softfailedeventids.insert(&event_id.as_bytes(), &[]) + } + + #[tracing::instrument(skip(self))] + pub fn is_event_soft_failed(&self, event_id: &EventId) -> Result { + self.softfailedeventids + .get(&event_id.as_bytes()) + .map(|o| o.is_some()) } /// Creates a new persisted data unit and adds it to a room. @@ -1334,15 +1347,9 @@ impl Rooms { let mut notifies = Vec::new(); let mut highlights = Vec::new(); - for user in db - .rooms - .room_members(&pdu.room_id) - .filter_map(|r| r.ok()) - .filter(|user_id| user_id.server_name() == db.globals.server_name()) - .filter(|user_id| !db.users.is_deactivated(user_id).unwrap_or(true)) - { + for user in self.get_our_real_users(&pdu.room_id, db)?.iter() { // Don't notify the user of their own events - if user == pdu.sender { + if user == &pdu.sender { continue; } @@ -2443,29 +2450,45 @@ impl Rooms { } if update_joined_count { - self.update_joined_count(room_id)?; + self.update_joined_count(room_id, db)?; } Ok(()) } - #[tracing::instrument(skip(self))] - pub fn update_joined_count(&self, room_id: &RoomId) -> Result<()> { + #[tracing::instrument(skip(self, room_id, db))] + pub fn update_joined_count(&self, room_id: &RoomId, db: &Database) -> Result<()> { let mut joinedcount = 0_u64; + let mut invitedcount = 0_u64; let mut joined_servers = HashSet::new(); + let mut real_users = HashSet::new(); for joined in self.room_members(&room_id).filter_map(|r| r.ok()) { joined_servers.insert(joined.server_name().to_owned()); + if joined.server_name() == db.globals.server_name() + && !db.users.is_deactivated(&joined).unwrap_or(true) + { + real_users.insert(joined); + } joinedcount += 1; } for invited in self.room_members_invited(&room_id).filter_map(|r| r.ok()) { joined_servers.insert(invited.server_name().to_owned()); + invitedcount += 1; } self.roomid_joinedcount .insert(room_id.as_bytes(), &joinedcount.to_be_bytes())?; + self.roomid_invitedcount + .insert(room_id.as_bytes(), &invitedcount.to_be_bytes())?; + + self.our_real_users_cache + .write() + .unwrap() + .insert(room_id.clone(), Arc::new(real_users)); + for old_joined_server in self.room_servers(room_id).filter_map(|r| r.ok()) { if !joined_servers.remove(&old_joined_server) { // Server not in room anymore @@ -2499,6 +2522,32 @@ impl Rooms { Ok(()) } + #[tracing::instrument(skip(self, room_id, db))] + pub fn get_our_real_users( + &self, + room_id: &RoomId, + db: &Database, + ) -> Result>> { + let maybe = self + .our_real_users_cache + .read() + .unwrap() + .get(room_id) + .cloned(); + if let Some(users) = maybe { + Ok(users) + } else { + self.update_joined_count(room_id, &db)?; + Ok(Arc::clone( + self.our_real_users_cache + .read() + .unwrap() + .get(room_id) + .unwrap(), + )) + } + } + #[tracing::instrument(skip(self, db))] pub async fn leave_room( &self, @@ -2977,6 +3026,18 @@ impl Rooms { .transpose()?) } + #[tracing::instrument(skip(self))] + pub fn room_invited_count(&self, room_id: &RoomId) -> Result> { + Ok(self + .roomid_invitedcount + .get(room_id.as_bytes())? + .map(|b| { + utils::u64_from_bytes(&b) + .map_err(|_| Error::bad_database("Invalid joinedcount in db.")) + }) + .transpose()?) + } + /// Returns an iterator over all User IDs who ever joined a room. #[tracing::instrument(skip(self))] pub fn room_useroncejoined<'a>( diff --git a/src/server_server.rs b/src/server_server.rs index 85a32f8f..331e9567 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1271,6 +1271,15 @@ async fn upgrade_outlier_to_timeline_pdu( if let Ok(Some(pduid)) = db.rooms.get_pdu_id(&incoming_pdu.event_id) { return Ok(Some(pduid)); } + + if db + .rooms + .is_event_soft_failed(&incoming_pdu.event_id) + .map_err(|_| "Failed to ask db for soft fail".to_owned())? + { + return Err("Event has been soft failed".into()); + } + // 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities // doing all the checks in this list starting at 1. These are not timeline events. @@ -1683,6 +1692,9 @@ async fn upgrade_outlier_to_timeline_pdu( if soft_fail { // Soft fail, we keep the event as an outlier but don't add it to the timeline warn!("Event was soft failed: {:?}", incoming_pdu); + db.rooms + .mark_event_soft_failed(&incoming_pdu.event_id) + .map_err(|_| "Failed to set soft failed flag".to_owned())?; return Err("Event has been soft failed".into()); }