diff --git a/src/api/client_server/membership.rs b/src/api/client_server/membership.rs index f07f2adb..c930ce49 100644 --- a/src/api/client_server/membership.rs +++ b/src/api/client_server/membership.rs @@ -649,7 +649,8 @@ async fn join_room_by_id_helper( services().rooms.timeline.append_pdu( &parsed_pdu, join_event, - iter::once(&*parsed_pdu.event_id), + vec![(*parsed_pdu.event_id).to_owned()], + &state_lock )?; // We set the room state after inserting the pdu, so that we never have a moment in time diff --git a/src/database/key_value/rooms/search.rs b/src/database/key_value/rooms/search.rs index dfbdbc64..41df5441 100644 --- a/src/database/key_value/rooms/search.rs +++ b/src/database/key_value/rooms/search.rs @@ -54,19 +54,20 @@ impl service::rooms::search::Data for KeyValueDatabase { .map(|(key, _)| key[key.len() - size_of::()..].to_vec()) }); - Ok(utils::common_elements(iterators, |a, b| { + let common_elements = match utils::common_elements(iterators, |a, b| { // We compare b with a because we reversed the iterator earlier b.cmp(a) - }) - .map(|iter| { - ( - Box::new(iter.map(move |id| { + }) { + Some(it) => it, + None => return Ok(None), + }; + + let mapped = common_elements.map(move |id| { let mut pduid = prefix_clone.clone(); pduid.extend_from_slice(&id); pduid - })), - words, - ) - })) + }); + + Ok(Some((Box::new(mapped), words))) } } diff --git a/src/database/key_value/rooms/state.rs b/src/database/key_value/rooms/state.rs index b2822b32..90ac0d55 100644 --- a/src/database/key_value/rooms/state.rs +++ b/src/database/key_value/rooms/state.rs @@ -49,7 +49,7 @@ impl service::rooms::state::Data for KeyValueDatabase { fn set_forward_extremities<'a>( &self, room_id: &RoomId, - event_ids: &mut dyn Iterator, + event_ids: Vec>, _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex ) -> Result<()> { let mut prefix = room_id.as_bytes().to_vec(); diff --git a/src/database/key_value/rooms/state_cache.rs b/src/database/key_value/rooms/state_cache.rs index 5f054858..4043bc40 100644 --- a/src/database/key_value/rooms/state_cache.rs +++ b/src/database/key_value/rooms/state_cache.rs @@ -1,6 +1,9 @@ -use ruma::{UserId, RoomId, events::{AnyStrippedStateEvent, AnySyncStateEvent}, serde::Raw}; +use std::{collections::HashSet, sync::Arc}; -use crate::{service, database::KeyValueDatabase, services, Result}; +use regex::Regex; +use ruma::{UserId, RoomId, events::{AnyStrippedStateEvent, AnySyncStateEvent}, serde::Raw, ServerName}; + +use crate::{service, database::KeyValueDatabase, services, Result, Error, utils}; impl service::rooms::state_cache::Data for KeyValueDatabase { fn mark_as_once_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { @@ -75,4 +78,485 @@ impl service::rooms::state_cache::Data for KeyValueDatabase { Ok(()) } + + fn update_joined_count(&self, room_id: &RoomId) -> Result<()> { + let mut joinedcount = 0_u64; + let mut invitedcount = 0_u64; + let mut joined_servers = HashSet::new(); + let mut real_users = HashSet::new(); + + for joined in self.room_members(room_id).filter_map(|r| r.ok()) { + joined_servers.insert(joined.server_name().to_owned()); + if joined.server_name() == services().globals.server_name() + && !services().users.is_deactivated(&joined).unwrap_or(true) + { + real_users.insert(joined); + } + joinedcount += 1; + } + + for invited in self.room_members_invited(room_id).filter_map(|r| r.ok()) { + joined_servers.insert(invited.server_name().to_owned()); + invitedcount += 1; + } + + self.roomid_joinedcount + .insert(room_id.as_bytes(), &joinedcount.to_be_bytes())?; + + self.roomid_invitedcount + .insert(room_id.as_bytes(), &invitedcount.to_be_bytes())?; + + self.our_real_users_cache + .write() + .unwrap() + .insert(room_id.to_owned(), Arc::new(real_users)); + + self.appservice_in_room_cache + .write() + .unwrap() + .remove(room_id); + + Ok(()) + } + + #[tracing::instrument(skip(self, room_id))] + fn get_our_real_users(&self, room_id: &RoomId) -> Result>>> { + let maybe = self + .our_real_users_cache + .read() + .unwrap() + .get(room_id) + .cloned(); + if let Some(users) = maybe { + Ok(users) + } else { + self.update_joined_count(room_id)?; + Ok(Arc::clone( + self.our_real_users_cache + .read() + .unwrap() + .get(room_id) + .unwrap(), + )) + } + } + + #[tracing::instrument(skip(self, room_id, appservice))] + fn appservice_in_room( + &self, + room_id: &RoomId, + appservice: &(String, serde_yaml::Value), + ) -> Result { + let maybe = self + .appservice_in_room_cache + .read() + .unwrap() + .get(room_id) + .and_then(|map| map.get(&appservice.0)) + .copied(); + + if let Some(b) = maybe { + Ok(b) + } else if let Some(namespaces) = appservice.1.get("namespaces") { + let users = namespaces + .get("users") + .and_then(|users| users.as_sequence()) + .map_or_else(Vec::new, |users| { + users + .iter() + .filter_map(|users| Regex::new(users.get("regex")?.as_str()?).ok()) + .collect::>() + }); + + let bridge_user_id = appservice + .1 + .get("sender_localpart") + .and_then(|string| string.as_str()) + .and_then(|string| { + UserId::parse_with_server_name(string, services().globals.server_name()).ok() + }); + + let in_room = bridge_user_id + .map_or(false, |id| self.is_joined(&id, room_id).unwrap_or(false)) + || self.room_members(room_id).any(|userid| { + userid.map_or(false, |userid| { + users.iter().any(|r| r.is_match(userid.as_str())) + }) + }); + + self.appservice_in_room_cache + .write() + .unwrap() + .entry(room_id.to_owned()) + .or_default() + .insert(appservice.0.clone(), in_room); + + Ok(in_room) + } else { + Ok(false) + } + } + + /// Makes a user forget a room. + #[tracing::instrument(skip(self))] + fn forget(&self, room_id: &RoomId, user_id: &UserId) -> Result<()> { + let mut userroom_id = user_id.as_bytes().to_vec(); + userroom_id.push(0xff); + userroom_id.extend_from_slice(room_id.as_bytes()); + + let mut roomuser_id = room_id.as_bytes().to_vec(); + roomuser_id.push(0xff); + roomuser_id.extend_from_slice(user_id.as_bytes()); + + self.userroomid_leftstate.remove(&userroom_id)?; + self.roomuserid_leftcount.remove(&roomuser_id)?; + + Ok(()) + } + + /// Returns an iterator of all servers participating in this room. + #[tracing::instrument(skip(self))] + fn room_servers<'a>( + &'a self, + room_id: &RoomId, + ) -> Box>> + 'a> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + Box::new(self.roomserverids.scan_prefix(prefix).map(|(key, _)| { + ServerName::parse( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| { + Error::bad_database("Server name in roomserverids is invalid unicode.") + })?, + ) + .map_err(|_| Error::bad_database("Server name in roomserverids is invalid.")) + })) + } + + #[tracing::instrument(skip(self))] + fn server_in_room<'a>(&'a self, server: &ServerName, room_id: &RoomId) -> Result { + let mut key = server.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(room_id.as_bytes()); + + self.serverroomids.get(&key).map(|o| o.is_some()) + } + + /// Returns an iterator of all rooms a server participates in (as far as we know). + #[tracing::instrument(skip(self))] + fn server_rooms<'a>( + &'a self, + server: &ServerName, + ) -> Box>> + 'a> { + let mut prefix = server.as_bytes().to_vec(); + prefix.push(0xff); + + Box::new(self.serverroomids.scan_prefix(prefix).map(|(key, _)| { + RoomId::parse( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| Error::bad_database("RoomId in serverroomids is invalid unicode."))?, + ) + .map_err(|_| Error::bad_database("RoomId in serverroomids is invalid.")) + })) + } + + /// Returns an iterator over all joined members of a room. + #[tracing::instrument(skip(self))] + fn room_members<'a>( + &'a self, + room_id: &RoomId, + ) -> Box>> + 'a> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + Box::new(self.roomuserid_joined.scan_prefix(prefix).map(|(key, _)| { + UserId::parse( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| { + Error::bad_database("User ID in roomuserid_joined is invalid unicode.") + })?, + ) + .map_err(|_| Error::bad_database("User ID in roomuserid_joined is invalid.")) + })) + } + + #[tracing::instrument(skip(self))] + fn room_joined_count(&self, room_id: &RoomId) -> Result> { + self.roomid_joinedcount + .get(room_id.as_bytes())? + .map(|b| { + utils::u64_from_bytes(&b) + .map_err(|_| Error::bad_database("Invalid joinedcount in db.")) + }) + .transpose() + } + + #[tracing::instrument(skip(self))] + fn room_invited_count(&self, room_id: &RoomId) -> Result> { + self.roomid_invitedcount + .get(room_id.as_bytes())? + .map(|b| { + utils::u64_from_bytes(&b) + .map_err(|_| Error::bad_database("Invalid joinedcount in db.")) + }) + .transpose() + } + + /// Returns an iterator over all User IDs who ever joined a room. + #[tracing::instrument(skip(self))] + fn room_useroncejoined<'a>( + &'a self, + room_id: &RoomId, + ) -> Box>> + 'a> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + Box::new(self.roomuseroncejoinedids + .scan_prefix(prefix) + .map(|(key, _)| { + UserId::parse( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| { + Error::bad_database("User ID in room_useroncejoined is invalid unicode.") + })?, + ) + .map_err(|_| Error::bad_database("User ID in room_useroncejoined is invalid.")) + })) + } + + /// Returns an iterator over all invited members of a room. + #[tracing::instrument(skip(self))] + fn room_members_invited<'a>( + &'a self, + room_id: &RoomId, + ) -> Box>> + 'a> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + Box::new(self.roomuserid_invitecount + .scan_prefix(prefix) + .map(|(key, _)| { + UserId::parse( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| { + Error::bad_database("User ID in roomuserid_invited is invalid unicode.") + })?, + ) + .map_err(|_| Error::bad_database("User ID in roomuserid_invited is invalid.")) + })) + } + + #[tracing::instrument(skip(self))] + fn get_invite_count(&self, room_id: &RoomId, user_id: &UserId) -> Result> { + let mut key = room_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(user_id.as_bytes()); + + self.roomuserid_invitecount + .get(&key)? + .map_or(Ok(None), |bytes| { + Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| { + Error::bad_database("Invalid invitecount in db.") + })?)) + }) + } + + #[tracing::instrument(skip(self))] + fn get_left_count(&self, room_id: &RoomId, user_id: &UserId) -> Result> { + let mut key = room_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(user_id.as_bytes()); + + self.roomuserid_leftcount + .get(&key)? + .map(|bytes| { + utils::u64_from_bytes(&bytes) + .map_err(|_| Error::bad_database("Invalid leftcount in db.")) + }) + .transpose() + } + + /// Returns an iterator over all rooms this user joined. + #[tracing::instrument(skip(self))] + fn rooms_joined<'a>( + &'a self, + user_id: &UserId, + ) -> Box>> + 'a> { + Box::new(self.userroomid_joined + .scan_prefix(user_id.as_bytes().to_vec()) + .map(|(key, _)| { + RoomId::parse( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| { + Error::bad_database("Room ID in userroomid_joined is invalid unicode.") + })?, + ) + .map_err(|_| Error::bad_database("Room ID in userroomid_joined is invalid.")) + })) + } + + /// Returns an iterator over all rooms a user was invited to. + #[tracing::instrument(skip(self))] + fn rooms_invited<'a>( + &'a self, + user_id: &UserId, + ) -> Box, Vec>)>> + 'a> { + let mut prefix = user_id.as_bytes().to_vec(); + prefix.push(0xff); + + Box::new(self.userroomid_invitestate + .scan_prefix(prefix) + .map(|(key, state)| { + let room_id = RoomId::parse( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| { + Error::bad_database("Room ID in userroomid_invited is invalid unicode.") + })?, + ) + .map_err(|_| Error::bad_database("Room ID in userroomid_invited is invalid."))?; + + let state = serde_json::from_slice(&state) + .map_err(|_| Error::bad_database("Invalid state in userroomid_invitestate."))?; + + Ok((room_id, state)) + })) + } + + #[tracing::instrument(skip(self))] + fn invite_state( + &self, + user_id: &UserId, + room_id: &RoomId, + ) -> Result>>> { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(room_id.as_bytes()); + + self.userroomid_invitestate + .get(&key)? + .map(|state| { + let state = serde_json::from_slice(&state) + .map_err(|_| Error::bad_database("Invalid state in userroomid_invitestate."))?; + + Ok(state) + }) + .transpose() + } + + #[tracing::instrument(skip(self))] + fn left_state( + &self, + user_id: &UserId, + room_id: &RoomId, + ) -> Result>>> { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(room_id.as_bytes()); + + self.userroomid_leftstate + .get(&key)? + .map(|state| { + let state = serde_json::from_slice(&state) + .map_err(|_| Error::bad_database("Invalid state in userroomid_leftstate."))?; + + Ok(state) + }) + .transpose() + } + + /// Returns an iterator over all rooms a user left. + #[tracing::instrument(skip(self))] + fn rooms_left<'a>( + &'a self, + user_id: &UserId, + ) -> Box, Vec>)>> + 'a> { + let mut prefix = user_id.as_bytes().to_vec(); + prefix.push(0xff); + + Box::new(self.userroomid_leftstate + .scan_prefix(prefix) + .map(|(key, state)| { + let room_id = RoomId::parse( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| { + Error::bad_database("Room ID in userroomid_invited is invalid unicode.") + })?, + ) + .map_err(|_| Error::bad_database("Room ID in userroomid_invited is invalid."))?; + + let state = serde_json::from_slice(&state) + .map_err(|_| Error::bad_database("Invalid state in userroomid_leftstate."))?; + + Ok((room_id, state)) + })) + } + + #[tracing::instrument(skip(self))] + fn once_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result { + let mut userroom_id = user_id.as_bytes().to_vec(); + userroom_id.push(0xff); + userroom_id.extend_from_slice(room_id.as_bytes()); + + Ok(self.roomuseroncejoinedids.get(&userroom_id)?.is_some()) + } + + #[tracing::instrument(skip(self))] + fn is_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result { + let mut userroom_id = user_id.as_bytes().to_vec(); + userroom_id.push(0xff); + userroom_id.extend_from_slice(room_id.as_bytes()); + + Ok(self.userroomid_joined.get(&userroom_id)?.is_some()) + } + + #[tracing::instrument(skip(self))] + fn is_invited(&self, user_id: &UserId, room_id: &RoomId) -> Result { + let mut userroom_id = user_id.as_bytes().to_vec(); + userroom_id.push(0xff); + userroom_id.extend_from_slice(room_id.as_bytes()); + + Ok(self.userroomid_invitestate.get(&userroom_id)?.is_some()) + } + + #[tracing::instrument(skip(self))] + fn is_left(&self, user_id: &UserId, room_id: &RoomId) -> Result { + let mut userroom_id = user_id.as_bytes().to_vec(); + userroom_id.push(0xff); + userroom_id.extend_from_slice(room_id.as_bytes()); + + Ok(self.userroomid_leftstate.get(&userroom_id)?.is_some()) + } } diff --git a/src/database/key_value/rooms/timeline.rs b/src/database/key_value/rooms/timeline.rs index 0b7286b2..17231867 100644 --- a/src/database/key_value/rooms/timeline.rs +++ b/src/database/key_value/rooms/timeline.rs @@ -187,13 +187,29 @@ impl service::rooms::timeline::Data for KeyValueDatabase { .map_err(|_| Error::bad_database("PDU has invalid count bytes.")) } + fn append_pdu(&self, pdu_id: &[u8], pdu: &PduEvent, json: &CanonicalJsonObject, count: u64) -> Result<()> { + self.pduid_pdu.insert( + pdu_id, + &serde_json::to_vec(json).expect("CanonicalJsonObject is always a valid"))?; + + self.lasttimelinecount_cache + .lock() + .unwrap() + .insert(pdu.room_id.clone(), count); + + self.eventid_pduid + .insert(pdu.event_id.as_bytes(), &pdu_id)?; + self.eventid_outlierpdu.remove(pdu.event_id.as_bytes())?; + + Ok(()) + } + /// Removes a pdu and creates a new one with the same id. fn replace_pdu(&self, pdu_id: &[u8], pdu: &PduEvent) -> Result<()> { if self.pduid_pdu.get(pdu_id)?.is_some() { self.pduid_pdu.insert( pdu_id, - &serde_json::to_vec(pdu).expect("PduEvent::to_vec always works"), - )?; + &serde_json::to_vec(pdu).expect("CanonicalJsonObject is always a valid"))?; Ok(()) } else { Err(Error::BadRequest( @@ -306,4 +322,27 @@ impl service::rooms::timeline::Data for KeyValueDatabase { Ok((pdu_id, pdu)) }))) } + + fn increment_notification_counts(&self, room_id: &RoomId, notifies: Vec>, highlights: Vec>) -> Result<()> { + let notifies_batch = Vec::new(); + let highlights_batch = Vec::new(); + for user in notifies { + let mut userroom_id = user.as_bytes().to_vec(); + userroom_id.push(0xff); + userroom_id.extend_from_slice(room_id.as_bytes()); + notifies_batch.push(userroom_id); + } + for user in highlights { + let mut userroom_id = user.as_bytes().to_vec(); + userroom_id.push(0xff); + userroom_id.extend_from_slice(room_id.as_bytes()); + highlights_batch.push(userroom_id); + } + + self.userroomid_notificationcount + .increment_batch(&mut notifies_batch.into_iter())?; + self.userroomid_highlightcount + .increment_batch(&mut highlights_batch.into_iter())?; + Ok(()) + } } diff --git a/src/service/mod.rs b/src/service/mod.rs index a772c1db..daf43293 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -3,6 +3,8 @@ use std::{ sync::{Arc, Mutex}, }; +use lru_cache::LruCache; + use crate::{Result, Config}; pub mod account_data; @@ -74,8 +76,8 @@ impl Services { state: rooms::state::Service { db: db.clone() }, state_accessor: rooms::state_accessor::Service { db: db.clone() }, state_cache: rooms::state_cache::Service { db: db.clone() }, - state_compressor: rooms::state_compressor::Service { db: db.clone() }, - timeline: rooms::timeline::Service { db: db.clone() }, + state_compressor: rooms::state_compressor::Service { db: db.clone(), stateinfo_cache: Mutex::new(LruCache::new((100.0 * config.conduit_cache_capacity_modifier) as usize,)) }, + timeline: rooms::timeline::Service { db: db.clone(), lasttimelinecount_cache: Mutex::new(HashMap::new()) }, user: rooms::user::Service { db: db.clone() }, }, transaction_ids: transaction_ids::Service { diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index 79f93b50..d6ec8e95 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -755,7 +755,7 @@ impl Service { services().rooms.timeline.append_incoming_pdu( &incoming_pdu, val, - extremities.iter().map(std::ops::Deref::deref), + extremities.iter().map(|e| (**e).to_owned()).collect(), state_ids_compressed, soft_fail, &state_lock, @@ -936,7 +936,7 @@ impl Service { // Set the new room state to the resolved state if update_state { info!("Forcing new room state"); - let (sstatehash, _, _) = services().rooms.state_compressor.save_state(room_id, new_room_state)?; + let sstatehash = services().rooms.state_compressor.save_state(room_id, new_room_state)?; services() .rooms .state @@ -955,7 +955,7 @@ impl Service { .append_incoming_pdu( &incoming_pdu, val, - extremities.iter().map(std::ops::Deref::deref), + extremities.iter().map(|e| (**e).to_owned()).collect(), state_ids_compressed, soft_fail, &state_lock, diff --git a/src/service/rooms/state/data.rs b/src/service/rooms/state/data.rs index 20c177a2..8eca21d1 100644 --- a/src/service/rooms/state/data.rs +++ b/src/service/rooms/state/data.rs @@ -22,7 +22,7 @@ pub trait Data: Send + Sync { /// Replace the forward extremities of the room. fn set_forward_extremities<'a>(&self, room_id: &RoomId, - event_ids: &mut dyn Iterator, + event_ids: Vec>, _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex ) -> Result<()>; } diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index 79807c55..57a0e773 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -294,6 +294,14 @@ impl Service { self.db.get_forward_extremities(room_id) } + pub fn set_forward_extremities<'a>(&self, + room_id: &RoomId, + event_ids: Vec>, + state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex + ) -> Result<()> { + self.db.set_forward_extremities(room_id, event_ids, state_lock) + } + /// This fetches auth events from the current state. #[tracing::instrument(skip(self))] pub fn get_auth_events( diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index fd299489..a0f5523b 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -13,17 +13,15 @@ pub struct Service { impl Service { /// Builds a StateMap by iterating over all keys that start /// with state_hash, this gives the full state for the given state_hash. - #[tracing::instrument(skip(self))] pub async fn state_full_ids(&self, shortstatehash: u64) -> Result>> { - self.db.state_full_ids(shortstatehash) + self.db.state_full_ids(shortstatehash).await } - #[tracing::instrument(skip(self))] pub async fn state_full( &self, shortstatehash: u64, ) -> Result>> { - self.db.state_full(shortstatehash) + self.db.state_full(shortstatehash).await } /// Returns a single PDU from `room_id` with key (`event_type`, `state_key`). @@ -59,7 +57,7 @@ impl Service { &self, room_id: &RoomId, ) -> Result>> { - self.db.room_state_full(room_id) + self.db.room_state_full(room_id).await } /// Returns a single PDU from `room_id` with key (`event_type`, `state_key`). diff --git a/src/service/rooms/state_cache/data.rs b/src/service/rooms/state_cache/data.rs index b9db7217..950143ff 100644 --- a/src/service/rooms/state_cache/data.rs +++ b/src/service/rooms/state_cache/data.rs @@ -1,4 +1,6 @@ -use ruma::{UserId, RoomId, serde::Raw, events::AnyStrippedStateEvent}; +use std::{collections::HashSet, sync::Arc}; + +use ruma::{UserId, RoomId, serde::Raw, events::{AnyStrippedStateEvent, AnySyncStateEvent}, ServerName}; use crate::Result; pub trait Data: Send + Sync { @@ -6,4 +8,95 @@ pub trait Data: Send + Sync { fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result<()>; fn mark_as_invited(&self, user_id: &UserId, room_id: &RoomId, last_state: Option>>) -> Result<()>; fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) -> Result<()>; + + fn update_joined_count(&self, room_id: &RoomId) -> Result<()>; + + fn get_our_real_users(&self, room_id: &RoomId) -> Result>>>; + + fn appservice_in_room( + &self, + room_id: &RoomId, + appservice: &(String, serde_yaml::Value), + ) -> Result; + + /// Makes a user forget a room. + fn forget(&self, room_id: &RoomId, user_id: &UserId) -> Result<()>; + + /// Returns an iterator of all servers participating in this room. + fn room_servers<'a>( + &'a self, + room_id: &RoomId, + ) -> Box>> + 'a>; + + fn server_in_room<'a>(&'a self, server: &ServerName, room_id: &RoomId) -> Result; + + /// Returns an iterator of all rooms a server participates in (as far as we know). + fn server_rooms<'a>( + &'a self, + server: &ServerName, + ) -> Box>> + 'a>; + + /// Returns an iterator over all joined members of a room. + fn room_members<'a>( + &'a self, + room_id: &RoomId, + ) -> Box>> + 'a>; + + fn room_joined_count(&self, room_id: &RoomId) -> Result>; + + fn room_invited_count(&self, room_id: &RoomId) -> Result>; + + /// Returns an iterator over all User IDs who ever joined a room. + fn room_useroncejoined<'a>( + &'a self, + room_id: &RoomId, + ) -> Box>> + 'a>; + + /// Returns an iterator over all invited members of a room. + fn room_members_invited<'a>( + &'a self, + room_id: &RoomId, + ) -> Box>> + 'a>; + + fn get_invite_count(&self, room_id: &RoomId, user_id: &UserId) -> Result>; + + fn get_left_count(&self, room_id: &RoomId, user_id: &UserId) -> Result>; + + /// Returns an iterator over all rooms this user joined. + fn rooms_joined<'a>( + &'a self, + user_id: &UserId, + ) -> Box>> + 'a>; + + /// Returns an iterator over all rooms a user was invited to. + fn rooms_invited<'a>( + &'a self, + user_id: &UserId, + ) -> Box, Vec>)>> + 'a>; + + fn invite_state( + &self, + user_id: &UserId, + room_id: &RoomId, + ) -> Result>>>; + + fn left_state( + &self, + user_id: &UserId, + room_id: &RoomId, + ) -> Result>>>; + + /// Returns an iterator over all rooms a user left. + fn rooms_left<'a>( + &'a self, + user_id: &UserId, + ) -> Box, Vec>)>> + 'a>; + + fn once_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result; + + fn is_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result; + + fn is_invited(&self, user_id: &UserId, room_id: &RoomId) -> Result; + + fn is_left(&self, user_id: &UserId, room_id: &RoomId) -> Result; } diff --git a/src/service/rooms/state_cache/mod.rs b/src/service/rooms/state_cache/mod.rs index ab6a0d6c..69bd8328 100644 --- a/src/service/rooms/state_cache/mod.rs +++ b/src/service/rooms/state_cache/mod.rs @@ -191,65 +191,12 @@ impl Service { #[tracing::instrument(skip(self, room_id))] pub fn update_joined_count(&self, room_id: &RoomId) -> Result<()> { - let mut joinedcount = 0_u64; - let mut invitedcount = 0_u64; - let mut joined_servers = HashSet::new(); - let mut real_users = HashSet::new(); - - for joined in self.room_members(room_id).filter_map(|r| r.ok()) { - joined_servers.insert(joined.server_name().to_owned()); - if joined.server_name() == services().globals.server_name() - && !services().users.is_deactivated(&joined).unwrap_or(true) - { - real_users.insert(joined); - } - joinedcount += 1; - } - - for invited in self.room_members_invited(room_id).filter_map(|r| r.ok()) { - joined_servers.insert(invited.server_name().to_owned()); - invitedcount += 1; - } - - self.roomid_joinedcount - .insert(room_id.as_bytes(), &joinedcount.to_be_bytes())?; - - self.roomid_invitedcount - .insert(room_id.as_bytes(), &invitedcount.to_be_bytes())?; - - self.our_real_users_cache - .write() - .unwrap() - .insert(room_id.to_owned(), Arc::new(real_users)); - - self.appservice_in_room_cache - .write() - .unwrap() - .remove(room_id); - - Ok(()) + self.db.update_joined_count(room_id) } #[tracing::instrument(skip(self, room_id))] pub fn get_our_real_users(&self, room_id: &RoomId) -> Result>>> { - let maybe = self - .our_real_users_cache - .read() - .unwrap() - .get(room_id) - .cloned(); - if let Some(users) = maybe { - Ok(users) - } else { - self.update_joined_count(room_id)?; - Ok(Arc::clone( - self.our_real_users_cache - .read() - .unwrap() - .get(room_id) - .unwrap(), - )) - } + self.db.get_our_real_users(room_id) } #[tracing::instrument(skip(self, room_id, appservice))] @@ -258,71 +205,13 @@ impl Service { room_id: &RoomId, appservice: &(String, serde_yaml::Value), ) -> Result { - let maybe = self - .appservice_in_room_cache - .read() - .unwrap() - .get(room_id) - .and_then(|map| map.get(&appservice.0)) - .copied(); - - if let Some(b) = maybe { - Ok(b) - } else if let Some(namespaces) = appservice.1.get("namespaces") { - let users = namespaces - .get("users") - .and_then(|users| users.as_sequence()) - .map_or_else(Vec::new, |users| { - users - .iter() - .filter_map(|users| Regex::new(users.get("regex")?.as_str()?).ok()) - .collect::>() - }); - - let bridge_user_id = appservice - .1 - .get("sender_localpart") - .and_then(|string| string.as_str()) - .and_then(|string| { - UserId::parse_with_server_name(string, services().globals.server_name()).ok() - }); - - let in_room = bridge_user_id - .map_or(false, |id| self.is_joined(&id, room_id).unwrap_or(false)) - || self.room_members(room_id).any(|userid| { - userid.map_or(false, |userid| { - users.iter().any(|r| r.is_match(userid.as_str())) - }) - }); - - self.appservice_in_room_cache - .write() - .unwrap() - .entry(room_id.to_owned()) - .or_default() - .insert(appservice.0.clone(), in_room); - - Ok(in_room) - } else { - Ok(false) - } + self.db.appservice_in_room(room_id, appservice) } /// Makes a user forget a room. #[tracing::instrument(skip(self))] pub fn forget(&self, room_id: &RoomId, user_id: &UserId) -> Result<()> { - let mut userroom_id = user_id.as_bytes().to_vec(); - userroom_id.push(0xff); - userroom_id.extend_from_slice(room_id.as_bytes()); - - let mut roomuser_id = room_id.as_bytes().to_vec(); - roomuser_id.push(0xff); - roomuser_id.extend_from_slice(user_id.as_bytes()); - - self.userroomid_leftstate.remove(&userroom_id)?; - self.roomuserid_leftcount.remove(&roomuser_id)?; - - Ok(()) + self.db.forget(room_id, user_id) } /// Returns an iterator of all servers participating in this room. @@ -331,31 +220,12 @@ impl Service { &'a self, room_id: &RoomId, ) -> impl Iterator>> + 'a { - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xff); - - self.roomserverids.scan_prefix(prefix).map(|(key, _)| { - ServerName::parse( - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), - ) - .map_err(|_| { - Error::bad_database("Server name in roomserverids is invalid unicode.") - })?, - ) - .map_err(|_| Error::bad_database("Server name in roomserverids is invalid.")) - }) + self.db.room_servers(room_id) } #[tracing::instrument(skip(self))] pub fn server_in_room<'a>(&'a self, server: &ServerName, room_id: &RoomId) -> Result { - let mut key = server.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(room_id.as_bytes()); - - self.serverroomids.get(&key).map(|o| o.is_some()) + self.db.server_in_room(server, room_id) } /// Returns an iterator of all rooms a server participates in (as far as we know). @@ -364,20 +234,7 @@ impl Service { &'a self, server: &ServerName, ) -> impl Iterator>> + 'a { - let mut prefix = server.as_bytes().to_vec(); - prefix.push(0xff); - - self.serverroomids.scan_prefix(prefix).map(|(key, _)| { - RoomId::parse( - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), - ) - .map_err(|_| Error::bad_database("RoomId in serverroomids is invalid unicode."))?, - ) - .map_err(|_| Error::bad_database("RoomId in serverroomids is invalid.")) - }) + self.db.server_rooms(server) } /// Returns an iterator over all joined members of a room. @@ -386,44 +243,17 @@ impl Service { &'a self, room_id: &RoomId, ) -> impl Iterator>> + 'a { - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xff); - - self.roomuserid_joined.scan_prefix(prefix).map(|(key, _)| { - UserId::parse( - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), - ) - .map_err(|_| { - Error::bad_database("User ID in roomuserid_joined is invalid unicode.") - })?, - ) - .map_err(|_| Error::bad_database("User ID in roomuserid_joined is invalid.")) - }) + self.db.room_members(room_id) } #[tracing::instrument(skip(self))] pub fn room_joined_count(&self, room_id: &RoomId) -> Result> { - self.roomid_joinedcount - .get(room_id.as_bytes())? - .map(|b| { - utils::u64_from_bytes(&b) - .map_err(|_| Error::bad_database("Invalid joinedcount in db.")) - }) - .transpose() + self.db.room_joined_count(room_id) } #[tracing::instrument(skip(self))] pub fn room_invited_count(&self, room_id: &RoomId) -> Result> { - self.roomid_invitedcount - .get(room_id.as_bytes())? - .map(|b| { - utils::u64_from_bytes(&b) - .map_err(|_| Error::bad_database("Invalid joinedcount in db.")) - }) - .transpose() + self.db.room_invited_count(room_id) } /// Returns an iterator over all User IDs who ever joined a room. @@ -432,24 +262,7 @@ impl Service { &'a self, room_id: &RoomId, ) -> impl Iterator>> + 'a { - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xff); - - self.roomuseroncejoinedids - .scan_prefix(prefix) - .map(|(key, _)| { - UserId::parse( - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), - ) - .map_err(|_| { - Error::bad_database("User ID in room_useroncejoined is invalid unicode.") - })?, - ) - .map_err(|_| Error::bad_database("User ID in room_useroncejoined is invalid.")) - }) + self.db.room_useroncejoined(room_id) } /// Returns an iterator over all invited members of a room. @@ -458,54 +271,17 @@ impl Service { &'a self, room_id: &RoomId, ) -> impl Iterator>> + 'a { - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xff); - - self.roomuserid_invitecount - .scan_prefix(prefix) - .map(|(key, _)| { - UserId::parse( - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), - ) - .map_err(|_| { - Error::bad_database("User ID in roomuserid_invited is invalid unicode.") - })?, - ) - .map_err(|_| Error::bad_database("User ID in roomuserid_invited is invalid.")) - }) + self.db.room_members_invited(room_id) } #[tracing::instrument(skip(self))] pub fn get_invite_count(&self, room_id: &RoomId, user_id: &UserId) -> Result> { - let mut key = room_id.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(user_id.as_bytes()); - - self.roomuserid_invitecount - .get(&key)? - .map_or(Ok(None), |bytes| { - Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| { - Error::bad_database("Invalid invitecount in db.") - })?)) - }) + self.db.get_invite_count(room_id, user_id) } #[tracing::instrument(skip(self))] pub fn get_left_count(&self, room_id: &RoomId, user_id: &UserId) -> Result> { - let mut key = room_id.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(user_id.as_bytes()); - - self.roomuserid_leftcount - .get(&key)? - .map(|bytes| { - utils::u64_from_bytes(&bytes) - .map_err(|_| Error::bad_database("Invalid leftcount in db.")) - }) - .transpose() + self.db.get_left_count(room_id, user_id) } /// Returns an iterator over all rooms this user joined. @@ -514,21 +290,7 @@ impl Service { &'a self, user_id: &UserId, ) -> impl Iterator>> + 'a { - self.userroomid_joined - .scan_prefix(user_id.as_bytes().to_vec()) - .map(|(key, _)| { - RoomId::parse( - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), - ) - .map_err(|_| { - Error::bad_database("Room ID in userroomid_joined is invalid unicode.") - })?, - ) - .map_err(|_| Error::bad_database("Room ID in userroomid_joined is invalid.")) - }) + self.db.rooms_joined(user_id) } /// Returns an iterator over all rooms a user was invited to. @@ -537,29 +299,7 @@ impl Service { &'a self, user_id: &UserId, ) -> impl Iterator, Vec>)>> + 'a { - let mut prefix = user_id.as_bytes().to_vec(); - prefix.push(0xff); - - self.userroomid_invitestate - .scan_prefix(prefix) - .map(|(key, state)| { - let room_id = RoomId::parse( - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), - ) - .map_err(|_| { - Error::bad_database("Room ID in userroomid_invited is invalid unicode.") - })?, - ) - .map_err(|_| Error::bad_database("Room ID in userroomid_invited is invalid."))?; - - let state = serde_json::from_slice(&state) - .map_err(|_| Error::bad_database("Invalid state in userroomid_invitestate."))?; - - Ok((room_id, state)) - }) + self.db.rooms_invited(user_id) } #[tracing::instrument(skip(self))] @@ -568,19 +308,7 @@ impl Service { user_id: &UserId, room_id: &RoomId, ) -> Result>>> { - let mut key = user_id.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(room_id.as_bytes()); - - self.userroomid_invitestate - .get(&key)? - .map(|state| { - let state = serde_json::from_slice(&state) - .map_err(|_| Error::bad_database("Invalid state in userroomid_invitestate."))?; - - Ok(state) - }) - .transpose() + self.db.invite_state(user_id, room_id) } #[tracing::instrument(skip(self))] @@ -589,19 +317,7 @@ impl Service { user_id: &UserId, room_id: &RoomId, ) -> Result>>> { - let mut key = user_id.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(room_id.as_bytes()); - - self.userroomid_leftstate - .get(&key)? - .map(|state| { - let state = serde_json::from_slice(&state) - .map_err(|_| Error::bad_database("Invalid state in userroomid_leftstate."))?; - - Ok(state) - }) - .transpose() + self.db.left_state(user_id, room_id) } /// Returns an iterator over all rooms a user left. @@ -610,64 +326,26 @@ impl Service { &'a self, user_id: &UserId, ) -> impl Iterator, Vec>)>> + 'a { - let mut prefix = user_id.as_bytes().to_vec(); - prefix.push(0xff); - - self.userroomid_leftstate - .scan_prefix(prefix) - .map(|(key, state)| { - let room_id = RoomId::parse( - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), - ) - .map_err(|_| { - Error::bad_database("Room ID in userroomid_invited is invalid unicode.") - })?, - ) - .map_err(|_| Error::bad_database("Room ID in userroomid_invited is invalid."))?; - - let state = serde_json::from_slice(&state) - .map_err(|_| Error::bad_database("Invalid state in userroomid_leftstate."))?; - - Ok((room_id, state)) - }) + self.db.rooms_left(user_id) } #[tracing::instrument(skip(self))] pub fn once_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result { - let mut userroom_id = user_id.as_bytes().to_vec(); - userroom_id.push(0xff); - userroom_id.extend_from_slice(room_id.as_bytes()); - - Ok(self.roomuseroncejoinedids.get(&userroom_id)?.is_some()) + self.db.once_joined(user_id, room_id) } #[tracing::instrument(skip(self))] pub fn is_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result { - let mut userroom_id = user_id.as_bytes().to_vec(); - userroom_id.push(0xff); - userroom_id.extend_from_slice(room_id.as_bytes()); - - Ok(self.userroomid_joined.get(&userroom_id)?.is_some()) + self.db.is_joined(user_id, room_id) } #[tracing::instrument(skip(self))] pub fn is_invited(&self, user_id: &UserId, room_id: &RoomId) -> Result { - let mut userroom_id = user_id.as_bytes().to_vec(); - userroom_id.push(0xff); - userroom_id.extend_from_slice(room_id.as_bytes()); - - Ok(self.userroomid_invitestate.get(&userroom_id)?.is_some()) + self.db.is_invited(user_id, room_id) } #[tracing::instrument(skip(self))] pub fn is_left(&self, user_id: &UserId, room_id: &RoomId) -> Result { - let mut userroom_id = user_id.as_bytes().to_vec(); - userroom_id.push(0xff); - userroom_id.extend_from_slice(room_id.as_bytes()); - - Ok(self.userroomid_leftstate.get(&userroom_id)?.is_some()) + self.db.is_left(user_id, room_id) } } diff --git a/src/service/rooms/state_compressor/mod.rs b/src/service/rooms/state_compressor/mod.rs index 0c32c4bd..5f2cf02d 100644 --- a/src/service/rooms/state_compressor/mod.rs +++ b/src/service/rooms/state_compressor/mod.rs @@ -1,7 +1,8 @@ pub mod data; -use std::{mem::size_of, sync::Arc, collections::HashSet}; +use std::{mem::size_of, sync::{Arc, Mutex}, collections::HashSet}; pub use data::Data; +use lru_cache::LruCache; use ruma::{EventId, RoomId}; use crate::{Result, utils, services}; @@ -10,6 +11,19 @@ use self::data::StateDiff; pub struct Service { db: Arc, + + pub stateinfo_cache: Mutex< + LruCache< + u64, + Vec<( + u64, // sstatehash + HashSet, // full state + HashSet, // added + HashSet, // removed + )>, + >, + >, + } pub type CompressedStateEvent = [u8; 2 * size_of::()]; @@ -82,7 +96,7 @@ impl Service { Ok(( utils::u64_from_bytes(&compressed_event[0..size_of::()]) .expect("bytes have right length"), - self.get_eventid_from_short( + services().rooms.short.get_eventid_from_short( utils::u64_from_bytes(&compressed_event[size_of::()..]) .expect("bytes have right length"), )?, @@ -214,9 +228,7 @@ impl Service { &self, room_id: &RoomId, new_state_ids_compressed: HashSet, - ) -> Result<(u64, - HashSet, // added - HashSet)> // removed + ) -> Result { let previous_shortstatehash = services().rooms.state.get_room_shortstatehash(room_id)?; @@ -231,7 +243,7 @@ impl Service { services().rooms.short.get_or_create_shortstatehash(&state_hash)?; if Some(new_shortstatehash) == previous_shortstatehash { - return Ok(()); + return Ok(new_shortstatehash); } let states_parents = previous_shortstatehash @@ -265,6 +277,6 @@ impl Service { )?; }; - Ok((new_shortstatehash, statediffnew, statediffremoved)) + Ok(new_shortstatehash) } } diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index 2220b5f2..20eae7f1 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -44,6 +44,9 @@ pub trait Data: Send + Sync { /// Returns the `count` of this pdu's id. fn pdu_count(&self, pdu_id: &[u8]) -> Result; + /// Adds a new pdu to the timeline + fn append_pdu(&self, pdu_id: &[u8], pdu: &PduEvent, json: &CanonicalJsonObject, count: u64) -> Result<()>; + /// Removes a pdu and creates a new one with the same id. fn replace_pdu(&self, pdu_id: &[u8], pdu: &PduEvent) -> Result<()>; @@ -71,4 +74,6 @@ pub trait Data: Send + Sync { room_id: &RoomId, from: u64, ) -> Result, PduEvent)>>>>; + + fn increment_notification_counts(&self, room_id: &RoomId, notifies: Vec>, highlights: Vec>) -> Result<()>; } diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 78172255..f25550d5 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -1,6 +1,7 @@ mod data; use std::borrow::Cow; -use std::sync::Arc; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; use std::{iter, collections::HashSet}; use std::fmt::Debug; @@ -22,6 +23,8 @@ use super::state_compressor::CompressedStateEvent; pub struct Service { db: Arc, + + pub(super) lasttimelinecount_cache: Mutex, u64>>, } impl Service { @@ -73,7 +76,7 @@ impl Service { &self, event_id: &EventId, ) -> Result> { - self.db.get_non_outlier_pdu(event_id) + self.db.get_non_outlier_pdu_json(event_id) } /// Returns the pdu's id. @@ -129,9 +132,10 @@ impl Service { &self, pdu: &PduEvent, mut pdu_json: CanonicalJsonObject, - leaves: impl IntoIterator + Debug, + leaves: Vec>, + state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex ) -> Result> { - let shortroomid = self.get_shortroomid(&pdu.room_id)?.expect("room exists"); + let shortroomid = services().rooms.short.get_shortroomid(&pdu.room_id)?.expect("room exists"); // Make unsigned fields correct. This is not properly documented in the spec, but state // events need to have previous content in the unsigned field, so clients can easily @@ -141,8 +145,8 @@ impl Service { .entry("unsigned".to_owned()) .or_insert_with(|| CanonicalJsonValue::Object(Default::default())) { - if let Some(shortstatehash) = self.pdu_shortstatehash(&pdu.event_id).unwrap() { - if let Some(prev_state) = self + if let Some(shortstatehash) = services().rooms.state_accessor.pdu_shortstatehash(&pdu.event_id).unwrap() { + if let Some(prev_state) = services().rooms.state_accessor .state_get(shortstatehash, &pdu.kind.to_string().into(), state_key) .unwrap() { @@ -161,8 +165,8 @@ impl Service { } // We must keep track of all events that have been referenced. - self.mark_as_referenced(&pdu.room_id, &pdu.prev_events)?; - self.replace_pdu_leaves(&pdu.room_id, leaves)?; + services().rooms.pdu_metadata.mark_as_referenced(&pdu.room_id, &pdu.prev_events)?; + services().rooms.state.set_forward_extremities(&pdu.room_id, leaves, state_lock)?; let mutex_insert = Arc::clone( services().globals @@ -177,37 +181,23 @@ impl Service { let count1 = services().globals.next_count()?; // Mark as read first so the sending client doesn't get a notification even if appending // fails - self.edus + services().rooms.edus.read_receipt .private_read_set(&pdu.room_id, &pdu.sender, count1)?; - self.reset_notification_counts(&pdu.sender, &pdu.room_id)?; + services().rooms.user.reset_notification_counts(&pdu.sender, &pdu.room_id)?; let count2 = services().globals.next_count()?; let mut pdu_id = shortroomid.to_be_bytes().to_vec(); pdu_id.extend_from_slice(&count2.to_be_bytes()); - // There's a brief moment of time here where the count is updated but the pdu does not - // exist. This could theoretically lead to dropped pdus, but it's extremely rare - // - // Update: We fixed this using insert_lock - - self.pduid_pdu.insert( - &pdu_id, - &serde_json::to_vec(&pdu_json).expect("CanonicalJsonObject is always a valid"), - )?; - self.lasttimelinecount_cache - .lock() - .unwrap() - .insert(pdu.room_id.clone(), count2); - - self.eventid_pduid - .insert(pdu.event_id.as_bytes(), &pdu_id)?; - self.eventid_outlierpdu.remove(pdu.event_id.as_bytes())?; + // Insert pdu + self.db.append_pdu(&pdu_id, pdu, &pdu_json, count2)?; drop(insert_lock); // See if the event matches any known pushers let power_levels: RoomPowerLevelsEventContent = services() .rooms + .state_accessor .room_state_get(&pdu.room_id, &StateEventType::RoomPowerLevels, "")? .map(|ev| { serde_json::from_str(ev.content.get()) @@ -221,9 +211,9 @@ impl Service { let mut notifies = Vec::new(); let mut highlights = Vec::new(); - for user in self.get_our_real_users(&pdu.room_id)?.iter() { + for user in services().rooms.state_cache.get_our_real_users(&pdu.room_id)?.into_iter() { // Don't notify the user of their own events - if user == &pdu.sender { + if &user == &pdu.sender { continue; } @@ -231,17 +221,19 @@ impl Service { .account_data .get( None, - user, + &user, GlobalAccountDataEventType::PushRules.to_string().into(), )? + .map(|event| serde_json::from_str::(event.get()) + .map_err(|_| Error::bad_database("Invalid push rules event in db."))).transpose()? .map(|ev: PushRulesEvent| ev.content.global) - .unwrap_or_else(|| Ruleset::server_default(user)); + .unwrap_or_else(|| Ruleset::server_default(&user)); let mut highlight = false; let mut notify = false; for action in services().pusher.get_actions( - user, + &user, &rules_for_user, &power_levels, &sync_pdu, @@ -258,27 +250,20 @@ impl Service { }; } - let mut userroom_id = user.as_bytes().to_vec(); - userroom_id.push(0xff); - userroom_id.extend_from_slice(pdu.room_id.as_bytes()); - if notify { - notifies.push(userroom_id.clone()); + notifies.push(user); } if highlight { - highlights.push(userroom_id); + highlights.push(user); } - for senderkey in services().pusher.get_pusher_senderkeys(user) { + for senderkey in services().pusher.get_pusher_senderkeys(&user) { services().sending.send_push_pdu(&*pdu_id, senderkey)?; } } - self.userroomid_notificationcount - .increment_batch(&mut notifies.into_iter())?; - self.userroomid_highlightcount - .increment_batch(&mut highlights.into_iter())?; + self.db.increment_notification_counts(&pdu.room_id, notifies, highlights); match pdu.kind { RoomEventType::RoomRedaction => { @@ -302,7 +287,7 @@ impl Service { let invite_state = match content.membership { MembershipState::Invite => { - let state = self.calculate_invite_state(pdu)?; + let state = services().rooms.state.calculate_invite_state(pdu)?; Some(state) } _ => None, @@ -310,7 +295,7 @@ impl Service { // Update our membership info, we do this here incase a user is invited // and immediately leaves we need the DB to record the invite event for auth - self.update_membership( + services().rooms.state_cache.update_membership( &pdu.room_id, &target_user_id, content.membership, @@ -322,18 +307,17 @@ impl Service { } RoomEventType::RoomMessage => { #[derive(Deserialize)] - struct ExtractBody<'a> { - #[serde(borrow)] - body: Option>, + struct ExtractBody { + body: Option, } - let content = serde_json::from_str::>(pdu.content.get()) + let content = serde_json::from_str::(pdu.content.get()) .map_err(|_| Error::bad_database("Invalid content in pdu."))?; if let Some(body) = content.body { - services().rooms.search.index_pdu(shortroomid, pdu_id, body)?; + services().rooms.search.index_pdu(shortroomid, &pdu_id, body)?; - let admin_room = self.alias.resolve_local_alias( + let admin_room = services().rooms.alias.resolve_local_alias( <&RoomAliasId>::try_from( format!("#admins:{}", services().globals.server_name()).as_str(), ) @@ -357,7 +341,7 @@ impl Service { } for appservice in services().appservice.all()? { - if self.appservice_in_room(&pdu.room_id, &appservice)? { + if services().rooms.state_cache.appservice_in_room(&pdu.room_id, &appservice)? { services().sending.send_pdu_appservice(&appservice.0, &pdu_id)?; continue; } @@ -418,7 +402,7 @@ impl Service { .map_or(false, |state_key| users.is_match(state_key)) }; let matching_aliases = |aliases: &Regex| { - self.room_aliases(&pdu.room_id) + services().rooms.alias.local_aliases_for_room(&pdu.room_id) .filter_map(|r| r.ok()) .any(|room_alias| aliases.is_match(room_alias.as_str())) }; @@ -461,6 +445,7 @@ impl Service { let create_event = services() .rooms + .state_accessor .room_state_get(room_id, &StateEventType::RoomCreate, "")?; let create_event_content: Option = create_event @@ -483,12 +468,12 @@ impl Service { RoomVersion::new(&room_version_id).expect("room version is supported"); let auth_events = - self.get_auth_events(room_id, &event_type, sender, state_key.as_deref(), &content)?; + services().rooms.state.get_auth_events(room_id, &event_type, sender, state_key.as_deref(), &content)?; // Our depth is the maximum depth of prev_events + 1 let depth = prev_events .iter() - .filter_map(|event_id| Some(services().rooms.get_pdu(event_id).ok()??.depth)) + .filter_map(|event_id| Some(services().rooms.timeline.get_pdu(event_id).ok()??.depth)) .max() .unwrap_or_else(|| uint!(0)) + uint!(1); @@ -497,7 +482,7 @@ impl Service { if let Some(state_key) = &state_key { if let Some(prev_pdu) = - self.room_state_get(room_id, &event_type.to_string().into(), state_key)? + services().rooms.state_accessor.room_state_get(room_id, &event_type.to_string().into(), state_key)? { unsigned.insert( "prev_content".to_owned(), @@ -604,7 +589,7 @@ impl Service { ); // Generate short event id - let _shorteventid = self.get_or_create_shorteventid(&pdu.event_id)?; + let _shorteventid = services().rooms.short.get_or_create_shorteventid(&pdu.event_id)?; Ok((pdu, pdu_json)) } @@ -623,22 +608,23 @@ impl Service { // We append to state before appending the pdu, so we don't have a moment in time with the // pdu without it's state. This is okay because append_pdu can't fail. - let statehashid = self.append_to_state(&pdu)?; + let statehashid = services().rooms.state.append_to_state(&pdu)?; let pdu_id = self.append_pdu( &pdu, pdu_json, // Since this PDU references all pdu_leaves we can update the leaves // of the room - iter::once(&*pdu.event_id), + vec![(*pdu.event_id).to_owned()], + state_lock, )?; // We set the room state after inserting the pdu, so that we never have a moment in time // where events in the current room state do not exist - self.set_room_state(room_id, statehashid)?; + services().rooms.state.set_room_state(room_id, statehashid, state_lock)?; let mut servers: HashSet> = - self.room_servers(room_id).filter_map(|r| r.ok()).collect(); + services().rooms.state_cache.room_servers(room_id).filter_map(|r| r.ok()).collect(); // In case we are kicking or banning a user, we need to inform their server of the change if pdu.kind == RoomEventType::RoomMember { @@ -666,27 +652,27 @@ impl Service { &self, pdu: &PduEvent, pdu_json: CanonicalJsonObject, - new_room_leaves: impl IntoIterator + Clone + Debug, + new_room_leaves: Vec>, state_ids_compressed: HashSet, soft_fail: bool, - _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex + state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex ) -> Result>> { // We append to state before appending the pdu, so we don't have a moment in time with the // pdu without it's state. This is okay because append_pdu can't fail. - services().rooms.set_event_state( + services().rooms.state.set_event_state( &pdu.event_id, &pdu.room_id, state_ids_compressed, )?; if soft_fail { - services().rooms + services().rooms.pdu_metadata .mark_as_referenced(&pdu.room_id, &pdu.prev_events)?; - services().rooms.replace_pdu_leaves(&pdu.room_id, new_room_leaves)?; + services().rooms.state.set_forward_extremities(&pdu.room_id, new_room_leaves, state_lock)?; return Ok(None); } - let pdu_id = services().rooms.append_pdu(pdu, pdu_json, new_room_leaves)?; + let pdu_id = services().rooms.timeline.append_pdu(pdu, pdu_json, new_room_leaves, state_lock)?; Ok(Some(pdu_id)) } diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index fde251b7..b3350959 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -6,7 +6,7 @@ use std::{ }; use crate::{ - utils, Error, PduEvent, Result, services, api::{server_server, appservice_server}, + utils::{self, calculate_hash}, Error, PduEvent, Result, services, api::{server_server, appservice_server}, }; use federation::transactions::send_transaction_message; use futures_util::{stream::FuturesUnordered, StreamExt}; @@ -677,7 +677,7 @@ impl Service { edus: &edu_jsons, origin_server_ts: MilliSecondsSinceUnixEpoch::now(), transaction_id: (&*base64::encode_config( - Self::calculate_hash( + calculate_hash( &events .iter() .map(|e| match e { diff --git a/src/service/transaction_ids/mod.rs b/src/service/transaction_ids/mod.rs index d7066e24..8d5fd0af 100644 --- a/src/service/transaction_ids/mod.rs +++ b/src/service/transaction_ids/mod.rs @@ -18,15 +18,7 @@ impl Service { txn_id: &TransactionId, data: &[u8], ) -> Result<()> { - let mut key = user_id.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(device_id.map(|d| d.as_bytes()).unwrap_or_default()); - key.push(0xff); - key.extend_from_slice(txn_id.as_bytes()); - - self.userdevicetxnid_response.insert(&key, data)?; - - Ok(()) + self.db.add_txnid(user_id, device_id, txn_id, data) } pub fn existing_txnid( @@ -35,13 +27,6 @@ impl Service { device_id: Option<&DeviceId>, txn_id: &TransactionId, ) -> Result>> { - let mut key = user_id.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(device_id.map(|d| d.as_bytes()).unwrap_or_default()); - key.push(0xff); - key.extend_from_slice(txn_id.as_bytes()); - - // If there's no entry, this is a new transaction - self.userdevicetxnid_response.get(&key) + self.db.existing_txnid(user_id, device_id, txn_id) } } diff --git a/src/service/uiaa/mod.rs b/src/service/uiaa/mod.rs index 73b2273d..5444118f 100644 --- a/src/service/uiaa/mod.rs +++ b/src/service/uiaa/mod.rs @@ -21,13 +21,13 @@ impl Service { uiaainfo: &UiaaInfo, json_body: &CanonicalJsonValue, ) -> Result<()> { - self.set_uiaa_request( + self.db.set_uiaa_request( user_id, device_id, uiaainfo.session.as_ref().expect("session should be set"), // TODO: better session error handling (why is it optional in ruma?) json_body, )?; - self.update_uiaa_session( + self.db.update_uiaa_session( user_id, device_id, uiaainfo.session.as_ref().expect("session should be set"), @@ -44,7 +44,7 @@ impl Service { ) -> Result<(bool, UiaaInfo)> { let mut uiaainfo = auth .session() - .map(|session| self.get_uiaa_session(user_id, device_id, session)) + .map(|session| self.db.get_uiaa_session(user_id, device_id, session)) .unwrap_or_else(|| Ok(uiaainfo.clone()))?; if uiaainfo.session.is_none() { @@ -110,7 +110,7 @@ impl Service { } if !completed { - self.update_uiaa_session( + self.db.update_uiaa_session( user_id, device_id, uiaainfo.session.as_ref().expect("session is always set"), @@ -120,7 +120,7 @@ impl Service { } // UIAA was successful! Remove this session and return true - self.update_uiaa_session( + self.db.update_uiaa_session( user_id, device_id, uiaainfo.session.as_ref().expect("session is always set"), @@ -137,14 +137,4 @@ impl Service { ) -> Option { self.db.get_uiaa_request(user_id, device_id, session) } - - fn update_uiaa_session( - &self, - user_id: &UserId, - device_id: &DeviceId, - session: &str, - uiaainfo: Option<&UiaaInfo>, - ) -> Result<()> { - self.db.update_uiaa_session(user_id, device_id, session, uiaainfo) - } } diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs index 2cf18765..826e0494 100644 --- a/src/service/users/mod.rs +++ b/src/service/users/mod.rs @@ -290,7 +290,7 @@ impl Service { } pub fn get_devicelist_version(&self, user_id: &UserId) -> Result> { - self.db.devicelist_version(user_id) + self.db.get_devicelist_version(user_id) } pub fn all_devices_metadata<'a>( @@ -310,7 +310,7 @@ impl Service { // Set the password to "" to indicate a deactivated account. Hashes will never result in an // empty string, so the user will not be able to log in again. Systems like changing the // password without logging in should check if the account is deactivated. - self.userid_password.insert(user_id.as_bytes(), &[])?; + self.db.set_password(user_id, None)?; // TODO: Unhook 3PID Ok(())