diff --git a/src/api/client_server/keys.rs b/src/api/client_server/keys.rs index 9fd00897..4af8890d 100644 --- a/src/api/client_server/keys.rs +++ b/src/api/client_server/keys.rs @@ -339,17 +339,19 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool>( let mut failures = BTreeMap::new(); - let back_off = |id| match services() - .globals - .bad_query_ratelimiter - .write() - .unwrap() - .entry(id) - { - hash_map::Entry::Vacant(e) => { - e.insert((Instant::now(), 1)); + let back_off = |id| async { + match services() + .globals + .bad_query_ratelimiter + .write() + .await + .entry(id) + { + hash_map::Entry::Vacant(e) => { + e.insert((Instant::now(), 1)); + } + hash_map::Entry::Occupied(mut e) => *e.get_mut() = (Instant::now(), e.get().1 + 1), } - hash_map::Entry::Occupied(mut e) => *e.get_mut() = (Instant::now(), e.get().1 + 1), }; let mut futures: FuturesUnordered<_> = get_over_federation @@ -359,7 +361,7 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool>( .globals .bad_query_ratelimiter .read() - .unwrap() + .await .get(server) { // Exponential backoff @@ -428,7 +430,8 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool>( device_keys.extend(response.device_keys); } _ => { - back_off(server.to_owned()); + back_off(server.to_owned()).await; + failures.insert(server.to_string(), json!({})); } } diff --git a/src/api/client_server/membership.rs b/src/api/client_server/membership.rs index 5c78a1c2..bc84b262 100644 --- a/src/api/client_server/membership.rs +++ b/src/api/client_server/membership.rs @@ -26,9 +26,10 @@ use ruma::{ use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use std::{ collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}, - sync::{Arc, RwLock}, + sync::Arc, time::{Duration, Instant}, }; +use tokio::sync::RwLock; use tracing::{debug, error, info, warn}; use crate::{ @@ -212,24 +213,28 @@ pub async fn kick_user_route( .globals .roomid_mutex_state .write() - .unwrap() + .await .entry(body.room_id.clone()) .or_default(), ); let state_lock = mutex_state.lock().await; - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomMember, - content: to_raw_value(&event).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(body.user_id.to_string()), - redacts: None, - }, - sender_user, - &body.room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomMember, + content: to_raw_value(&event).expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(body.user_id.to_string()), + redacts: None, + }, + sender_user, + &body.room_id, + &state_lock, + ) + .await?; drop(state_lock); @@ -276,24 +281,28 @@ pub async fn ban_user_route(body: Ruma<ban_user::v3::Request>) -> Result<ban_use .globals .roomid_mutex_state .write() - .unwrap() + .await .entry(body.room_id.clone()) .or_default(), ); let state_lock = mutex_state.lock().await; - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomMember, - content: to_raw_value(&event).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(body.user_id.to_string()), - redacts: None, - }, - sender_user, - &body.room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomMember, + content: to_raw_value(&event).expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(body.user_id.to_string()), + redacts: None, + }, + sender_user, + &body.room_id, + &state_lock, + ) + .await?; drop(state_lock); @@ -334,24 +343,28 @@ pub async fn unban_user_route( .globals .roomid_mutex_state .write() - .unwrap() + .await .entry(body.room_id.clone()) .or_default(), ); let state_lock = mutex_state.lock().await; - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomMember, - content: to_raw_value(&event).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(body.user_id.to_string()), - redacts: None, - }, - sender_user, - &body.room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomMember, + content: to_raw_value(&event).expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(body.user_id.to_string()), + redacts: None, + }, + sender_user, + &body.room_id, + &state_lock, + ) + .await?; drop(state_lock); @@ -489,7 +502,7 @@ async fn join_room_by_id_helper( .globals .roomid_mutex_state .write() - .unwrap() + .await .entry(room_id.to_owned()) .or_default(), ); @@ -680,7 +693,7 @@ async fn join_room_by_id_helper( .iter() .map(|pdu| validate_and_add_event_id(pdu, &room_version_id, &pub_key_map)) { - let (event_id, value) = match result { + let (event_id, value) = match result.await { Ok(t) => t, Err(_) => continue, }; @@ -710,7 +723,7 @@ async fn join_room_by_id_helper( .iter() .map(|pdu| validate_and_add_event_id(pdu, &room_version_id, &pub_key_map)) { - let (event_id, value) = match result { + let (event_id, value) = match result.await { Ok(t) => t, Err(_) => continue, }; @@ -784,12 +797,16 @@ async fn join_room_by_id_helper( let statehash_after_join = services().rooms.state.append_to_state(&parsed_join_pdu)?; info!("Appending new room join event"); - services().rooms.timeline.append_pdu( - &parsed_join_pdu, - join_event, - vec![(*parsed_join_pdu.event_id).to_owned()], - &state_lock, - )?; + services() + .rooms + .timeline + .append_pdu( + &parsed_join_pdu, + join_event, + vec![(*parsed_join_pdu.event_id).to_owned()], + &state_lock, + ) + .await?; info!("Setting final room state for new room"); // We set the room state after inserting the pdu, so that we never have a moment in time @@ -902,18 +919,23 @@ async fn join_room_by_id_helper( }; // Try normal join first - let error = match services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomMember, - content: to_raw_value(&event).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(sender_user.to_string()), - redacts: None, - }, - sender_user, - room_id, - &state_lock, - ) { + let error = match services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomMember, + content: to_raw_value(&event).expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(sender_user.to_string()), + redacts: None, + }, + sender_user, + room_id, + &state_lock, + ) + .await + { Ok(_event_id) => return Ok(join_room_by_id::v3::Response::new(room_id.to_owned())), Err(e) => e, }; @@ -1109,7 +1131,7 @@ async fn make_join_request( make_join_response_and_server } -fn validate_and_add_event_id( +async fn validate_and_add_event_id( pdu: &RawJsonValue, room_version: &RoomVersionId, pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, Base64>>>, @@ -1125,24 +1147,26 @@ fn validate_and_add_event_id( )) .expect("ruma's reference hashes are valid event ids"); - let back_off = |id| match services() - .globals - .bad_event_ratelimiter - .write() - .unwrap() - .entry(id) - { - Entry::Vacant(e) => { - e.insert((Instant::now(), 1)); + let back_off = |id| async { + match services() + .globals + .bad_event_ratelimiter + .write() + .await + .entry(id) + { + Entry::Vacant(e) => { + e.insert((Instant::now(), 1)); + } + Entry::Occupied(mut e) => *e.get_mut() = (Instant::now(), e.get().1 + 1), } - Entry::Occupied(mut e) => *e.get_mut() = (Instant::now(), e.get().1 + 1), }; if let Some((time, tries)) = services() .globals .bad_event_ratelimiter .read() - .unwrap() + .await .get(&event_id) { // Exponential backoff @@ -1157,15 +1181,10 @@ fn validate_and_add_event_id( } } - if let Err(e) = ruma::signatures::verify_event( - &*pub_key_map - .read() - .map_err(|_| Error::bad_database("RwLock is poisoned."))?, - &value, - room_version, - ) { + if let Err(e) = ruma::signatures::verify_event(&*pub_key_map.read().await, &value, room_version) + { warn!("Event {} failed verification {:?} {}", event_id, pdu, e); - back_off(event_id); + back_off(event_id).await; return Err(Error::BadServerResponse("Event failed verification.")); } @@ -1191,7 +1210,7 @@ pub(crate) async fn invite_helper<'a>( .globals .roomid_mutex_state .write() - .unwrap() + .await .entry(room_id.to_owned()) .or_default(), ); @@ -1312,34 +1331,38 @@ pub(crate) async fn invite_helper<'a>( .globals .roomid_mutex_state .write() - .unwrap() + .await .entry(room_id.to_owned()) .or_default(), ); let state_lock = mutex_state.lock().await; - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomMember, - content: to_raw_value(&RoomMemberEventContent { - membership: MembershipState::Invite, - displayname: services().users.displayname(user_id)?, - avatar_url: services().users.avatar_url(user_id)?, - is_direct: Some(is_direct), - third_party_invite: None, - blurhash: services().users.blurhash(user_id)?, - reason, - join_authorized_via_users_server: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(user_id.to_string()), - redacts: None, - }, - sender_user, - room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomMember, + content: to_raw_value(&RoomMemberEventContent { + membership: MembershipState::Invite, + displayname: services().users.displayname(user_id)?, + avatar_url: services().users.avatar_url(user_id)?, + is_direct: Some(is_direct), + third_party_invite: None, + blurhash: services().users.blurhash(user_id)?, + reason, + join_authorized_via_users_server: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(user_id.to_string()), + redacts: None, + }, + sender_user, + room_id, + &state_lock, + ) + .await?; drop(state_lock); @@ -1407,7 +1430,7 @@ pub async fn leave_room(user_id: &UserId, room_id: &RoomId, reason: Option<Strin .globals .roomid_mutex_state .write() - .unwrap() + .await .entry(room_id.to_owned()) .or_default(), ); @@ -1443,18 +1466,22 @@ pub async fn leave_room(user_id: &UserId, room_id: &RoomId, reason: Option<Strin event.membership = MembershipState::Leave; event.reason = reason; - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomMember, - content: to_raw_value(&event).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(user_id.to_string()), - redacts: None, - }, - user_id, - room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomMember, + content: to_raw_value(&event).expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(user_id.to_string()), + redacts: None, + }, + user_id, + room_id, + &state_lock, + ) + .await?; } Ok(()) diff --git a/src/api/client_server/message.rs b/src/api/client_server/message.rs index 0952092b..89f33591 100644 --- a/src/api/client_server/message.rs +++ b/src/api/client_server/message.rs @@ -32,7 +32,7 @@ pub async fn send_message_event_route( .globals .roomid_mutex_state .write() - .unwrap() + .await .entry(body.room_id.clone()) .or_default(), ); @@ -73,19 +73,23 @@ pub async fn send_message_event_route( let mut unsigned = BTreeMap::new(); unsigned.insert("transaction_id".to_owned(), body.txn_id.to_string().into()); - let event_id = services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: body.event_type.to_string().into(), - content: serde_json::from_str(body.body.body.json().get()) - .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid JSON body."))?, - unsigned: Some(unsigned), - state_key: None, - redacts: None, - }, - sender_user, - &body.room_id, - &state_lock, - )?; + let event_id = services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: body.event_type.to_string().into(), + content: serde_json::from_str(body.body.body.json().get()) + .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid JSON body."))?, + unsigned: Some(unsigned), + state_key: None, + redacts: None, + }, + sender_user, + &body.room_id, + &state_lock, + ) + .await?; services().transaction_ids.add_txnid( sender_user, @@ -126,12 +130,11 @@ pub async fn get_message_events_route( .as_ref() .and_then(|t| PduCount::try_from_string(t).ok()); - services().rooms.lazy_loading.lazy_load_confirm_delivery( - sender_user, - sender_device, - &body.room_id, - from, - )?; + services() + .rooms + .lazy_loading + .lazy_load_confirm_delivery(sender_user, sender_device, &body.room_id, from) + .await?; let limit = u64::from(body.limit).min(100) as usize; diff --git a/src/api/client_server/profile.rs b/src/api/client_server/profile.rs index 8fb38b59..9e6b277d 100644 --- a/src/api/client_server/profile.rs +++ b/src/api/client_server/profile.rs @@ -77,18 +77,17 @@ pub async fn set_displayname_route( .globals .roomid_mutex_state .write() - .unwrap() + .await .entry(room_id.clone()) .or_default(), ); let state_lock = mutex_state.lock().await; - let _ = services().rooms.timeline.build_and_append_pdu( - pdu_builder, - sender_user, - &room_id, - &state_lock, - ); + let _ = services() + .rooms + .timeline + .build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock) + .await; // Presence update services().rooms.edus.presence.update_presence( @@ -212,18 +211,17 @@ pub async fn set_avatar_url_route( .globals .roomid_mutex_state .write() - .unwrap() + .await .entry(room_id.clone()) .or_default(), ); let state_lock = mutex_state.lock().await; - let _ = services().rooms.timeline.build_and_append_pdu( - pdu_builder, - sender_user, - &room_id, - &state_lock, - ); + let _ = services() + .rooms + .timeline + .build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock) + .await; // Presence update services().rooms.edus.presence.update_presence( diff --git a/src/api/client_server/redact.rs b/src/api/client_server/redact.rs index 21da2221..f0603f4b 100644 --- a/src/api/client_server/redact.rs +++ b/src/api/client_server/redact.rs @@ -24,28 +24,32 @@ pub async fn redact_event_route( .globals .roomid_mutex_state .write() - .unwrap() + .await .entry(body.room_id.clone()) .or_default(), ); let state_lock = mutex_state.lock().await; - let event_id = services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomRedaction, - content: to_raw_value(&RoomRedactionEventContent { - redacts: Some(body.event_id.clone()), - reason: body.reason.clone(), - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: None, - redacts: Some(body.event_id.into()), - }, - sender_user, - &body.room_id, - &state_lock, - )?; + let event_id = services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomRedaction, + content: to_raw_value(&RoomRedactionEventContent { + redacts: Some(body.event_id.clone()), + reason: body.reason.clone(), + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: None, + redacts: Some(body.event_id.into()), + }, + sender_user, + &body.room_id, + &state_lock, + ) + .await?; drop(state_lock); diff --git a/src/api/client_server/room.rs b/src/api/client_server/room.rs index 20409a28..5d4cbe8a 100644 --- a/src/api/client_server/room.rs +++ b/src/api/client_server/room.rs @@ -61,7 +61,7 @@ pub async fn create_room_route( .globals .roomid_mutex_state .write() - .unwrap() + .await .entry(room_id.clone()) .or_default(), ); @@ -174,42 +174,50 @@ pub async fn create_room_route( } // 1. The room create event - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomCreate, - content: to_raw_value(&content).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomCreate, + content: to_raw_value(&content).expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; // 2. Let the room creator join - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomMember, - content: to_raw_value(&RoomMemberEventContent { - membership: MembershipState::Join, - displayname: services().users.displayname(sender_user)?, - avatar_url: services().users.avatar_url(sender_user)?, - is_direct: Some(body.is_direct), - third_party_invite: None, - blurhash: services().users.blurhash(sender_user)?, - reason: None, - join_authorized_via_users_server: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(sender_user.to_string()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomMember, + content: to_raw_value(&RoomMemberEventContent { + membership: MembershipState::Join, + displayname: services().users.displayname(sender_user)?, + avatar_url: services().users.avatar_url(sender_user)?, + is_direct: Some(body.is_direct), + third_party_invite: None, + blurhash: services().users.blurhash(sender_user)?, + reason: None, + join_authorized_via_users_server: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(sender_user.to_string()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; // 3. Power levels @@ -246,30 +254,14 @@ pub async fn create_room_route( } } - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomPowerLevels, - content: to_raw_value(&power_levels_content) - .expect("to_raw_value always works on serde_json::Value"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; - - // 4. Canonical room alias - if let Some(room_alias_id) = &alias { - services().rooms.timeline.build_and_append_pdu( + services() + .rooms + .timeline + .build_and_append_pdu( PduBuilder { - event_type: TimelineEventType::RoomCanonicalAlias, - content: to_raw_value(&RoomCanonicalAliasEventContent { - alias: Some(room_alias_id.to_owned()), - alt_aliases: vec![], - }) - .expect("We checked that alias earlier, it must be fine"), + event_type: TimelineEventType::RoomPowerLevels, + content: to_raw_value(&power_levels_content) + .expect("to_raw_value always works on serde_json::Value"), unsigned: None, state_key: Some("".to_owned()), redacts: None, @@ -277,64 +269,100 @@ pub async fn create_room_route( sender_user, &room_id, &state_lock, - )?; + ) + .await?; + + // 4. Canonical room alias + if let Some(room_alias_id) = &alias { + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomCanonicalAlias, + content: to_raw_value(&RoomCanonicalAliasEventContent { + alias: Some(room_alias_id.to_owned()), + alt_aliases: vec![], + }) + .expect("We checked that alias earlier, it must be fine"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; } // 5. Events set by preset // 5.1 Join Rules - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomJoinRules, - content: to_raw_value(&RoomJoinRulesEventContent::new(match preset { - RoomPreset::PublicChat => JoinRule::Public, - // according to spec "invite" is the default - _ => JoinRule::Invite, - })) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomJoinRules, + content: to_raw_value(&RoomJoinRulesEventContent::new(match preset { + RoomPreset::PublicChat => JoinRule::Public, + // according to spec "invite" is the default + _ => JoinRule::Invite, + })) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; // 5.2 History Visibility - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomHistoryVisibility, - content: to_raw_value(&RoomHistoryVisibilityEventContent::new( - HistoryVisibility::Shared, - )) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomHistoryVisibility, + content: to_raw_value(&RoomHistoryVisibilityEventContent::new( + HistoryVisibility::Shared, + )) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; // 5.3 Guest Access - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomGuestAccess, - content: to_raw_value(&RoomGuestAccessEventContent::new(match preset { - RoomPreset::PublicChat => GuestAccess::Forbidden, - _ => GuestAccess::CanJoin, - })) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomGuestAccess, + content: to_raw_value(&RoomGuestAccessEventContent::new(match preset { + RoomPreset::PublicChat => GuestAccess::Forbidden, + _ => GuestAccess::CanJoin, + })) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; // 6. Events listed in initial_state for event in &body.initial_state { @@ -353,47 +381,54 @@ pub async fn create_room_route( continue; } - services().rooms.timeline.build_and_append_pdu( - pdu_builder, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock) + .await?; } // 7. Events implied by name and topic if let Some(name) = &body.name { - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomName, - content: to_raw_value(&RoomNameEventContent::new(name.clone())) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomName, + content: to_raw_value(&RoomNameEventContent::new(name.clone())) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; } if let Some(topic) = &body.topic { - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomTopic, - content: to_raw_value(&RoomTopicEventContent { - topic: topic.clone(), - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomTopic, + content: to_raw_value(&RoomTopicEventContent { + topic: topic.clone(), + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; } // 8. Events implied by invite (and TODO: invite_3pid) @@ -523,7 +558,7 @@ pub async fn upgrade_room_route( .globals .roomid_mutex_state .write() - .unwrap() + .await .entry(body.room_id.clone()) .or_default(), ); @@ -531,22 +566,26 @@ pub async fn upgrade_room_route( // Send a m.room.tombstone event to the old room to indicate that it is not intended to be used any further // Fail if the sender does not have the required permissions - let tombstone_event_id = services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomTombstone, - content: to_raw_value(&RoomTombstoneEventContent { - body: "This room has been replaced".to_owned(), - replacement_room: replacement_room.clone(), - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &body.room_id, - &state_lock, - )?; + let tombstone_event_id = services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomTombstone, + content: to_raw_value(&RoomTombstoneEventContent { + body: "This room has been replaced".to_owned(), + replacement_room: replacement_room.clone(), + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &body.room_id, + &state_lock, + ) + .await?; // Change lock to replacement room drop(state_lock); @@ -555,7 +594,7 @@ pub async fn upgrade_room_route( .globals .roomid_mutex_state .write() - .unwrap() + .await .entry(replacement_room.clone()) .or_default(), ); @@ -613,43 +652,51 @@ pub async fn upgrade_room_route( )); } - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomCreate, - content: to_raw_value(&create_event_content) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &replacement_room, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomCreate, + content: to_raw_value(&create_event_content) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &replacement_room, + &state_lock, + ) + .await?; // Join the new room - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomMember, - content: to_raw_value(&RoomMemberEventContent { - membership: MembershipState::Join, - displayname: services().users.displayname(sender_user)?, - avatar_url: services().users.avatar_url(sender_user)?, - is_direct: None, - third_party_invite: None, - blurhash: services().users.blurhash(sender_user)?, - reason: None, - join_authorized_via_users_server: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(sender_user.to_string()), - redacts: None, - }, - sender_user, - &replacement_room, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomMember, + content: to_raw_value(&RoomMemberEventContent { + membership: MembershipState::Join, + displayname: services().users.displayname(sender_user)?, + avatar_url: services().users.avatar_url(sender_user)?, + is_direct: None, + third_party_invite: None, + blurhash: services().users.blurhash(sender_user)?, + reason: None, + join_authorized_via_users_server: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(sender_user.to_string()), + redacts: None, + }, + sender_user, + &replacement_room, + &state_lock, + ) + .await?; // Recommended transferable state events list from the specs let transferable_state_events = vec![ @@ -676,18 +723,22 @@ pub async fn upgrade_room_route( None => continue, // Skipping missing events. }; - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: event_type.to_string().into(), - content: event_content, - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &replacement_room, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: event_type.to_string().into(), + content: event_content, + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &replacement_room, + &state_lock, + ) + .await?; } // Moves any local aliases to the new room @@ -721,19 +772,23 @@ pub async fn upgrade_room_route( power_levels_event_content.invite = new_level; // Modify the power levels in the old room to prevent sending of events and inviting new users - let _ = services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomPowerLevels, - content: to_raw_value(&power_levels_event_content) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &body.room_id, - &state_lock, - )?; + let _ = services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomPowerLevels, + content: to_raw_value(&power_levels_event_content) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &body.room_id, + &state_lock, + ) + .await?; drop(state_lock); diff --git a/src/api/client_server/state.rs b/src/api/client_server/state.rs index 174282a1..e62aa013 100644 --- a/src/api/client_server/state.rs +++ b/src/api/client_server/state.rs @@ -227,24 +227,28 @@ async fn send_state_event_for_key_helper( .globals .roomid_mutex_state .write() - .unwrap() + .await .entry(room_id.to_owned()) .or_default(), ); let state_lock = mutex_state.lock().await; - let event_id = services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: event_type.to_string().into(), - content: serde_json::from_str(json.json().get()).expect("content is valid json"), - unsigned: None, - state_key: Some(state_key), - redacts: None, - }, - sender_user, - room_id, - &state_lock, - )?; + let event_id = services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: event_type.to_string().into(), + content: serde_json::from_str(json.json().get()).expect("content is valid json"), + unsigned: None, + state_key: Some(state_key), + redacts: None, + }, + sender_user, + room_id, + &state_lock, + ) + .await?; Ok(event_id) } diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 76b48d10..c510f5f5 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -1,6 +1,7 @@ use crate::{ service::rooms::timeline::PduCount, services, Error, PduEvent, Result, Ruma, RumaResponse, }; + use ruma::{ api::client::{ filter::{FilterDefinition, LazyLoadOptions}, @@ -75,7 +76,7 @@ pub async fn sync_events_route( .globals .sync_receivers .write() - .unwrap() + .await .entry((sender_user.clone(), sender_device.clone())) { Entry::Vacant(v) => { @@ -147,7 +148,7 @@ async fn sync_helper_wrapper( .globals .sync_receivers .write() - .unwrap() + .await .entry((sender_user, sender_device)) { Entry::Occupied(o) => { @@ -302,11 +303,11 @@ async fn sync_helper( .globals .roomid_mutex_insert .write() - .unwrap() + .await .entry(room_id.clone()) .or_default(), ); - let insert_lock = mutex_insert.lock().unwrap(); + let insert_lock = mutex_insert.lock().await; drop(insert_lock); } @@ -434,11 +435,11 @@ async fn sync_helper( .globals .roomid_mutex_insert .write() - .unwrap() + .await .entry(room_id.clone()) .or_default(), ); - let insert_lock = mutex_insert.lock().unwrap(); + let insert_lock = mutex_insert.lock().await; drop(insert_lock); } @@ -577,11 +578,11 @@ async fn load_joined_room( .globals .roomid_mutex_insert .write() - .unwrap() + .await .entry(room_id.to_owned()) .or_default(), ); - let insert_lock = mutex_insert.lock().unwrap(); + let insert_lock = mutex_insert.lock().await; drop(insert_lock); } @@ -599,12 +600,11 @@ async fn load_joined_room( timeline_users.insert(event.sender.as_str().to_owned()); } - services().rooms.lazy_loading.lazy_load_confirm_delivery( - sender_user, - sender_device, - room_id, - sincecount, - )?; + services() + .rooms + .lazy_loading + .lazy_load_confirm_delivery(sender_user, sender_device, room_id, sincecount) + .await?; // Database queries: @@ -797,13 +797,17 @@ async fn load_joined_room( // The state_events above should contain all timeline_users, let's mark them as lazy // loaded. - services().rooms.lazy_loading.lazy_load_mark_sent( - sender_user, - sender_device, - room_id, - lazy_loaded, - next_batchcount, - ); + services() + .rooms + .lazy_loading + .lazy_load_mark_sent( + sender_user, + sender_device, + room_id, + lazy_loaded, + next_batchcount, + ) + .await; ( heroes, @@ -884,13 +888,17 @@ async fn load_joined_room( } } - services().rooms.lazy_loading.lazy_load_mark_sent( - sender_user, - sender_device, - room_id, - lazy_loaded, - next_batchcount, - ); + services() + .rooms + .lazy_loading + .lazy_load_mark_sent( + sender_user, + sender_device, + room_id, + lazy_loaded, + next_batchcount, + ) + .await; let encrypted_room = services() .rooms diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 1ba2edc0..f946feaf 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -51,9 +51,10 @@ use std::{ fmt::Debug, mem, net::{IpAddr, SocketAddr}, - sync::{Arc, RwLock}, + sync::Arc, time::{Duration, Instant, SystemTime}, }; +use tokio::sync::RwLock; use tracing::{debug, error, warn}; @@ -137,7 +138,7 @@ where .globals .actual_destination_cache .read() - .unwrap() + .await .get(destination) .cloned(); @@ -290,7 +291,7 @@ where .globals .actual_destination_cache .write() - .unwrap() + .await .insert( OwnedServerName::from(destination), (actual_destination, host), @@ -740,7 +741,7 @@ pub async fn send_transaction_message_route( .globals .roomid_mutex_federation .write() - .unwrap() + .await .entry(room_id.to_owned()) .or_default(), ); @@ -1409,7 +1410,7 @@ pub async fn create_join_event_template_route( .globals .roomid_mutex_state .write() - .unwrap() + .await .entry(body.room_id.to_owned()) .or_default(), ); @@ -1579,7 +1580,7 @@ async fn create_join_event( .globals .roomid_mutex_federation .write() - .unwrap() + .await .entry(room_id.to_owned()) .or_default(), ); diff --git a/src/lib.rs b/src/lib.rs index 66d0c57c..70c6f373 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,9 @@ mod database; mod service; mod utils; +// Not async due to services() being used in many closures, and async closures are not stable as of writing +// This is the case for every other occurence of sync Mutex/RwLock, except for database related ones, where +// the current maintainer (Timo) has asked to not modify those use std::sync::RwLock; pub use api::ruma_wrapper::{Ruma, RumaResponse}; diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index f6e52f7a..d99be878 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -1,7 +1,7 @@ use std::{ collections::BTreeMap, convert::{TryFrom, TryInto}, - sync::{Arc, RwLock}, + sync::Arc, time::Instant, }; @@ -26,7 +26,7 @@ use ruma::{ EventId, OwnedRoomAliasId, OwnedRoomId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId, }; use serde_json::value::to_raw_value; -use tokio::sync::{mpsc, Mutex, MutexGuard}; +use tokio::sync::{mpsc, Mutex, RwLock}; use crate::{ api::client_server::{leave_all_rooms, AUTO_GEN_PASSWORD_LENGTH}, @@ -215,27 +215,6 @@ impl Service { .expect("@conduit:server_name is valid"); if let Ok(Some(conduit_room)) = services().admin.get_admin_room() { - let send_message = |message: RoomMessageEventContent, - mutex_lock: &MutexGuard<'_, ()>| { - services() - .rooms - .timeline - .build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomMessage, - content: to_raw_value(&message) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: None, - redacts: None, - }, - &conduit_user, - &conduit_room, - mutex_lock, - ) - .unwrap(); - }; - loop { tokio::select! { Some(event) = receiver.recv() => { @@ -248,16 +227,30 @@ impl Service { services().globals .roomid_mutex_state .write() - .unwrap() + .await .entry(conduit_room.to_owned()) .or_default(), ); let state_lock = mutex_state.lock().await; - send_message(message_content, &state_lock); - - drop(state_lock); + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomMessage, + content: to_raw_value(&message_content) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: None, + redacts: None, + }, + &conduit_user, + &conduit_room, + &state_lock, + ) + .await.unwrap(); } } } @@ -425,11 +418,7 @@ impl Service { Err(e) => RoomMessageEventContent::text_plain(e.to_string()), }, AdminCommand::IncomingFederation => { - let map = services() - .globals - .roomid_federationhandletime - .read() - .unwrap(); + let map = services().globals.roomid_federationhandletime.read().await; let mut msg: String = format!("Handling {} incoming pdus:\n", map.len()); for (r, (e, i)) in map.iter() { @@ -543,7 +532,7 @@ impl Service { } } AdminCommand::MemoryUsage => { - let response1 = services().memory_usage(); + let response1 = services().memory_usage().await; let response2 = services().globals.db.memory_usage(); RoomMessageEventContent::text_plain(format!( @@ -556,7 +545,7 @@ impl Service { RoomMessageEventContent::text_plain("Done.") } AdminCommand::ClearServiceCaches { amount } => { - services().clear_caches(amount); + services().clear_caches(amount).await; RoomMessageEventContent::text_plain("Done.") } @@ -797,7 +786,7 @@ impl Service { .fetch_required_signing_keys(&value, &pub_key_map) .await?; - let pub_key_map = pub_key_map.read().unwrap(); + let pub_key_map = pub_key_map.read().await; match ruma::signatures::verify_json(&pub_key_map, &value) { Ok(_) => RoomMessageEventContent::text_plain("Signature correct"), Err(e) => RoomMessageEventContent::text_plain(format!( @@ -913,7 +902,7 @@ impl Service { .globals .roomid_mutex_state .write() - .unwrap() + .await .entry(room_id.clone()) .or_default(), ); @@ -932,164 +921,202 @@ impl Service { content.room_version = services().globals.default_room_version(); // 1. The room create event - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomCreate, - content: to_raw_value(&content).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomCreate, + content: to_raw_value(&content).expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // 2. Make conduit bot join - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomMember, - content: to_raw_value(&RoomMemberEventContent { - membership: MembershipState::Join, - displayname: None, - avatar_url: None, - is_direct: None, - third_party_invite: None, - blurhash: None, - reason: None, - join_authorized_via_users_server: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(conduit_user.to_string()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomMember, + content: to_raw_value(&RoomMemberEventContent { + membership: MembershipState::Join, + displayname: None, + avatar_url: None, + is_direct: None, + third_party_invite: None, + blurhash: None, + reason: None, + join_authorized_via_users_server: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(conduit_user.to_string()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // 3. Power levels let mut users = BTreeMap::new(); users.insert(conduit_user.clone(), 100.into()); - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomPowerLevels, - content: to_raw_value(&RoomPowerLevelsEventContent { - users, - ..Default::default() - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomPowerLevels, + content: to_raw_value(&RoomPowerLevelsEventContent { + users, + ..Default::default() + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // 4.1 Join Rules - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomJoinRules, - content: to_raw_value(&RoomJoinRulesEventContent::new(JoinRule::Invite)) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomJoinRules, + content: to_raw_value(&RoomJoinRulesEventContent::new(JoinRule::Invite)) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // 4.2 History Visibility - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomHistoryVisibility, - content: to_raw_value(&RoomHistoryVisibilityEventContent::new( - HistoryVisibility::Shared, - )) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomHistoryVisibility, + content: to_raw_value(&RoomHistoryVisibilityEventContent::new( + HistoryVisibility::Shared, + )) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // 4.3 Guest Access - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomGuestAccess, - content: to_raw_value(&RoomGuestAccessEventContent::new(GuestAccess::Forbidden)) + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomGuestAccess, + content: to_raw_value(&RoomGuestAccessEventContent::new( + GuestAccess::Forbidden, + )) .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // 5. Events implied by name and topic let room_name = format!("{} Admin Room", services().globals.server_name()); - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomName, - content: to_raw_value(&RoomNameEventContent::new(room_name)) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomName, + content: to_raw_value(&RoomNameEventContent::new(room_name)) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomTopic, - content: to_raw_value(&RoomTopicEventContent { - topic: format!("Manage {}", services().globals.server_name()), - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomTopic, + content: to_raw_value(&RoomTopicEventContent { + topic: format!("Manage {}", services().globals.server_name()), + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // 6. Room alias let alias: OwnedRoomAliasId = format!("#admins:{}", services().globals.server_name()) .try_into() .expect("#admins:server_name is a valid alias name"); - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomCanonicalAlias, - content: to_raw_value(&RoomCanonicalAliasEventContent { - alias: Some(alias.clone()), - alt_aliases: Vec::new(), - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomCanonicalAlias, + content: to_raw_value(&RoomCanonicalAliasEventContent { + alias: Some(alias.clone()), + alt_aliases: Vec::new(), + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; services().rooms.alias.set_alias(&alias, &room_id)?; @@ -1125,7 +1152,7 @@ impl Service { .globals .roomid_mutex_state .write() - .unwrap() + .await .entry(room_id.clone()) .or_default(), ); @@ -1137,72 +1164,84 @@ impl Service { .expect("@conduit:server_name is valid"); // Invite and join the real user - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomMember, - content: to_raw_value(&RoomMemberEventContent { - membership: MembershipState::Invite, - displayname: None, - avatar_url: None, - is_direct: None, - third_party_invite: None, - blurhash: None, - reason: None, - join_authorized_via_users_server: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(user_id.to_string()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomMember, - content: to_raw_value(&RoomMemberEventContent { - membership: MembershipState::Join, - displayname: Some(displayname), - avatar_url: None, - is_direct: None, - third_party_invite: None, - blurhash: None, - reason: None, - join_authorized_via_users_server: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(user_id.to_string()), - redacts: None, - }, - user_id, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomMember, + content: to_raw_value(&RoomMemberEventContent { + membership: MembershipState::Invite, + displayname: None, + avatar_url: None, + is_direct: None, + third_party_invite: None, + blurhash: None, + reason: None, + join_authorized_via_users_server: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(user_id.to_string()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomMember, + content: to_raw_value(&RoomMemberEventContent { + membership: MembershipState::Join, + displayname: Some(displayname), + avatar_url: None, + is_direct: None, + third_party_invite: None, + blurhash: None, + reason: None, + join_authorized_via_users_server: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(user_id.to_string()), + redacts: None, + }, + user_id, + &room_id, + &state_lock, + ) + .await?; // Set power level let mut users = BTreeMap::new(); users.insert(conduit_user.to_owned(), 100.into()); users.insert(user_id.to_owned(), 100.into()); - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomPowerLevels, - content: to_raw_value(&RoomPowerLevelsEventContent { - users, - ..Default::default() - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomPowerLevels, + content: to_raw_value(&RoomPowerLevelsEventContent { + users, + ..Default::default() + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // Send welcome message services().rooms.timeline.build_and_append_pdu( @@ -1220,7 +1259,7 @@ impl Service { &conduit_user, &room_id, &state_lock, - )?; + ).await?; } Ok(()) } diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index f7822940..22dc6959 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -31,11 +31,11 @@ use std::{ path::PathBuf, sync::{ atomic::{self, AtomicBool}, - Arc, Mutex, RwLock, + Arc, RwLock as StdRwLock, }, time::{Duration, Instant}, }; -use tokio::sync::{broadcast, watch::Receiver, Mutex as TokioMutex, Semaphore}; +use tokio::sync::{broadcast, watch::Receiver, Mutex, RwLock, Semaphore}; use tracing::{error, info}; use trust_dns_resolver::TokioAsyncResolver; @@ -53,7 +53,7 @@ pub struct Service { pub db: &'static dyn Data, pub actual_destination_cache: Arc<RwLock<WellKnownMap>>, // actual_destination, host - pub tls_name_override: Arc<RwLock<TlsNameMap>>, + pub tls_name_override: Arc<StdRwLock<TlsNameMap>>, pub config: Config, keypair: Arc<ruma::signatures::Ed25519KeyPair>, dns_resolver: TokioAsyncResolver, @@ -68,8 +68,8 @@ pub struct Service { pub servername_ratelimiter: Arc<RwLock<HashMap<OwnedServerName, Arc<Semaphore>>>>, pub sync_receivers: RwLock<HashMap<(OwnedUserId, OwnedDeviceId), SyncHandle>>, pub roomid_mutex_insert: RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>, - pub roomid_mutex_state: RwLock<HashMap<OwnedRoomId, Arc<TokioMutex<()>>>>, - pub roomid_mutex_federation: RwLock<HashMap<OwnedRoomId, Arc<TokioMutex<()>>>>, // this lock will be held longer + pub roomid_mutex_state: RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>, + pub roomid_mutex_federation: RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>, // this lock will be held longer pub roomid_federationhandletime: RwLock<HashMap<OwnedRoomId, (OwnedEventId, Instant)>>, pub stateres_mutex: Arc<Mutex<()>>, pub rotate: RotationHandler, @@ -109,11 +109,11 @@ impl Default for RotationHandler { pub struct Resolver { inner: GaiResolver, - overrides: Arc<RwLock<TlsNameMap>>, + overrides: Arc<StdRwLock<TlsNameMap>>, } impl Resolver { - pub fn new(overrides: Arc<RwLock<TlsNameMap>>) -> Self { + pub fn new(overrides: Arc<StdRwLock<TlsNameMap>>) -> Self { Resolver { inner: GaiResolver::new(), overrides, @@ -125,7 +125,7 @@ impl Resolve for Resolver { fn resolve(&self, name: Name) -> Resolving { self.overrides .read() - .expect("lock should not be poisoned") + .unwrap() .get(name.as_str()) .and_then(|(override_name, port)| { override_name.first().map(|first_name| { @@ -159,7 +159,7 @@ impl Service { } }; - let tls_name_override = Arc::new(RwLock::new(TlsNameMap::new())); + let tls_name_override = Arc::new(StdRwLock::new(TlsNameMap::new())); let jwt_decoding_key = config .jwt_secret diff --git a/src/service/mod.rs b/src/service/mod.rs index f85da788..8f9fb0a5 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -1,9 +1,10 @@ use std::{ collections::{BTreeMap, HashMap}, - sync::{Arc, Mutex}, + sync::{Arc, Mutex as StdMutex}, }; use lru_cache::LruCache; +use tokio::sync::Mutex; use crate::{Config, Result}; @@ -79,17 +80,17 @@ impl Services { state: rooms::state::Service { db }, state_accessor: rooms::state_accessor::Service { db, - server_visibility_cache: Mutex::new(LruCache::new( + server_visibility_cache: StdMutex::new(LruCache::new( (100.0 * config.conduit_cache_capacity_modifier) as usize, )), - user_visibility_cache: Mutex::new(LruCache::new( + user_visibility_cache: StdMutex::new(LruCache::new( (100.0 * config.conduit_cache_capacity_modifier) as usize, )), }, state_cache: rooms::state_cache::Service { db }, state_compressor: rooms::state_compressor::Service { db, - stateinfo_cache: Mutex::new(LruCache::new( + stateinfo_cache: StdMutex::new(LruCache::new( (100.0 * config.conduit_cache_capacity_modifier) as usize, )), }, @@ -107,7 +108,7 @@ impl Services { uiaa: uiaa::Service { db }, users: users::Service { db, - connections: Mutex::new(BTreeMap::new()), + connections: StdMutex::new(BTreeMap::new()), }, account_data: account_data::Service { db }, admin: admin::Service::build(), @@ -118,14 +119,8 @@ impl Services { globals: globals::Service::load(db, config)?, }) } - fn memory_usage(&self) -> String { - let lazy_load_waiting = self - .rooms - .lazy_loading - .lazy_load_waiting - .lock() - .unwrap() - .len(); + async fn memory_usage(&self) -> String { + let lazy_load_waiting = self.rooms.lazy_loading.lazy_load_waiting.lock().await.len(); let server_visibility_cache = self .rooms .state_accessor @@ -152,15 +147,9 @@ impl Services { .timeline .lasttimelinecount_cache .lock() - .unwrap() - .len(); - let roomid_spacechunk_cache = self - .rooms - .spaces - .roomid_spacechunk_cache - .lock() - .unwrap() + .await .len(); + let roomid_spacechunk_cache = self.rooms.spaces.roomid_spacechunk_cache.lock().await.len(); format!( "\ @@ -173,13 +162,13 @@ roomid_spacechunk_cache: {roomid_spacechunk_cache}\ " ) } - fn clear_caches(&self, amount: u32) { + async fn clear_caches(&self, amount: u32) { if amount > 0 { self.rooms .lazy_loading .lazy_load_waiting .lock() - .unwrap() + .await .clear(); } if amount > 1 { @@ -211,7 +200,7 @@ roomid_spacechunk_cache: {roomid_spacechunk_cache}\ .timeline .lasttimelinecount_cache .lock() - .unwrap() + .await .clear(); } if amount > 5 { @@ -219,7 +208,7 @@ roomid_spacechunk_cache: {roomid_spacechunk_cache}\ .spaces .roomid_spacechunk_cache .lock() - .unwrap() + .await .clear(); } } diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index 99fc2cb1..1547d406 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -1,25 +1,23 @@ /// An async function that can recursively call itself. type AsyncRecursiveType<'a, T> = Pin<Box<dyn Future<Output = T> + 'a + Send>>; -use ruma::{ - api::federation::discovery::{get_remote_server_keys, get_server_keys}, - CanonicalJsonObject, CanonicalJsonValue, OwnedServerName, OwnedServerSigningKeyId, - RoomVersionId, -}; use std::{ collections::{hash_map, BTreeMap, HashMap, HashSet}, pin::Pin, - sync::{Arc, RwLock, RwLockWriteGuard}, + sync::Arc, time::{Duration, Instant, SystemTime}, }; -use tokio::sync::Semaphore; use futures_util::{stream::FuturesUnordered, Future, StreamExt}; use ruma::{ api::{ client::error::ErrorKind, federation::{ - discovery::get_remote_server_keys_batch::{self, v2::QueryCriteria}, + discovery::{ + get_remote_server_keys, + get_remote_server_keys_batch::{self, v2::QueryCriteria}, + get_server_keys, + }, event::{get_event, get_room_state_ids}, membership::create_join_event, }, @@ -31,9 +29,11 @@ use ruma::{ int, serde::Base64, state_res::{self, RoomVersion, StateMap}, - uint, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName, + uint, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, + OwnedServerName, OwnedServerSigningKeyId, RoomId, RoomVersionId, ServerName, }; use serde_json::value::RawValue as RawJsonValue; +use tokio::sync::{RwLock, RwLockWriteGuard, Semaphore}; use tracing::{debug, error, info, trace, warn}; use crate::{service::*, services, Error, PduEvent, Result}; @@ -168,7 +168,7 @@ impl Service { .globals .bad_event_ratelimiter .read() - .unwrap() + .await .get(&*prev_id) { // Exponential backoff @@ -189,7 +189,7 @@ impl Service { .globals .bad_event_ratelimiter .write() - .unwrap() + .await .entry((*prev_id).to_owned()) { hash_map::Entry::Vacant(e) => { @@ -213,7 +213,7 @@ impl Service { .globals .roomid_federationhandletime .write() - .unwrap() + .await .insert(room_id.to_owned(), ((*prev_id).to_owned(), start_time)); if let Err(e) = self @@ -233,7 +233,7 @@ impl Service { .globals .bad_event_ratelimiter .write() - .unwrap() + .await .entry((*prev_id).to_owned()) { hash_map::Entry::Vacant(e) => { @@ -249,7 +249,7 @@ impl Service { .globals .roomid_federationhandletime .write() - .unwrap() + .await .remove(&room_id.to_owned()); debug!( "Handling prev event {} took {}m{}s", @@ -267,7 +267,7 @@ impl Service { .globals .roomid_federationhandletime .write() - .unwrap() + .await .insert(room_id.to_owned(), (event_id.to_owned(), start_time)); let r = services() .rooms @@ -285,7 +285,7 @@ impl Service { .globals .roomid_federationhandletime .write() - .unwrap() + .await .remove(&room_id.to_owned()); r @@ -326,11 +326,8 @@ impl Service { let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); - let mut val = match ruma::signatures::verify_event( - &pub_key_map.read().expect("RwLock is poisoned."), - &value, - room_version_id, - ) { + let guard = pub_key_map.read().await; + let mut val = match ruma::signatures::verify_event(&guard, &value, room_version_id) { Err(e) => { // Drop warn!("Dropping bad event {}: {}", event_id, e,); @@ -365,6 +362,8 @@ impl Service { Ok(ruma::signatures::Verified::All) => value, }; + drop(guard); + // Now that we have checked the signature and hashes we can add the eventID and convert // to our PduEvent type val.insert( @@ -692,13 +691,15 @@ impl Service { { Ok(res) => { debug!("Fetching state events at event."); + let collect = res + .pdu_ids + .iter() + .map(|x| Arc::from(&**x)) + .collect::<Vec<_>>(); let state_vec = self .fetch_and_handle_outliers( origin, - &res.pdu_ids - .iter() - .map(|x| Arc::from(&**x)) - .collect::<Vec<_>>(), + &collect, create_event, room_id, room_version_id, @@ -805,7 +806,7 @@ impl Service { .globals .roomid_mutex_state .write() - .unwrap() + .await .entry(room_id.to_owned()) .or_default(), ); @@ -884,14 +885,18 @@ impl Service { debug!("Starting soft fail auth check"); if soft_fail { - services().rooms.timeline.append_incoming_pdu( - &incoming_pdu, - val, - extremities.iter().map(|e| (**e).to_owned()).collect(), - state_ids_compressed, - soft_fail, - &state_lock, - )?; + services() + .rooms + .timeline + .append_incoming_pdu( + &incoming_pdu, + val, + extremities.iter().map(|e| (**e).to_owned()).collect(), + state_ids_compressed, + soft_fail, + &state_lock, + ) + .await?; // Soft fail, we keep the event as an outlier but don't add it to the timeline warn!("Event was soft failed: {:?}", incoming_pdu); @@ -912,14 +917,18 @@ impl Service { // We use the `state_at_event` instead of `state_after` so we accurately // represent the state for this event. - let pdu_id = services().rooms.timeline.append_incoming_pdu( - &incoming_pdu, - val, - extremities.iter().map(|e| (**e).to_owned()).collect(), - state_ids_compressed, - soft_fail, - &state_lock, - )?; + let pdu_id = services() + .rooms + .timeline + .append_incoming_pdu( + &incoming_pdu, + val, + extremities.iter().map(|e| (**e).to_owned()).collect(), + state_ids_compressed, + soft_fail, + &state_lock, + ) + .await?; debug!("Appended incoming pdu"); @@ -1045,17 +1054,21 @@ impl Service { ) -> AsyncRecursiveType<'a, Vec<(Arc<PduEvent>, Option<BTreeMap<String, CanonicalJsonValue>>)>> { Box::pin(async move { - let back_off = |id| match services() - .globals - .bad_event_ratelimiter - .write() - .unwrap() - .entry(id) - { - hash_map::Entry::Vacant(e) => { - e.insert((Instant::now(), 1)); + let back_off = |id| async move { + match services() + .globals + .bad_event_ratelimiter + .write() + .await + .entry(id) + { + hash_map::Entry::Vacant(e) => { + e.insert((Instant::now(), 1)); + } + hash_map::Entry::Occupied(mut e) => { + *e.get_mut() = (Instant::now(), e.get().1 + 1) + } } - hash_map::Entry::Occupied(mut e) => *e.get_mut() = (Instant::now(), e.get().1 + 1), }; let mut pdus = vec![]; @@ -1081,7 +1094,7 @@ impl Service { .globals .bad_event_ratelimiter .read() - .unwrap() + .await .get(&*next_id) { // Exponential backoff @@ -1128,7 +1141,7 @@ impl Service { match pdu::gen_event_id_canonical_json(&res.pdu, room_version_id) { Ok(t) => t, Err(_) => { - back_off((*next_id).to_owned()); + back_off((*next_id).to_owned()).await; continue; } }; @@ -1160,7 +1173,7 @@ impl Service { } Err(_) => { warn!("Failed to fetch event: {}", next_id); - back_off((*next_id).to_owned()); + back_off((*next_id).to_owned()).await; } } } @@ -1170,7 +1183,7 @@ impl Service { .globals .bad_event_ratelimiter .read() - .unwrap() + .await .get(&**next_id) { // Exponential backoff @@ -1205,7 +1218,7 @@ impl Service { } Err(e) => { warn!("Authentication of event {} failed: {:?}", next_id, e); - back_off((**next_id).to_owned()); + back_off((**next_id).to_owned()).await; } } } @@ -1360,7 +1373,7 @@ impl Service { pub_key_map .write() - .map_err(|_| Error::bad_database("RwLock is poisoned."))? + .await .insert(signature_server.clone(), keys); } @@ -1369,7 +1382,7 @@ impl Service { // Gets a list of servers for which we don't have the signing key yet. We go over // the PDUs and either cache the key or add it to the list that needs to be retrieved. - fn get_server_keys_from_cache( + async fn get_server_keys_from_cache( &self, pdu: &RawJsonValue, servers: &mut BTreeMap<OwnedServerName, BTreeMap<OwnedServerSigningKeyId, QueryCriteria>>, @@ -1393,7 +1406,7 @@ impl Service { .globals .bad_event_ratelimiter .read() - .unwrap() + .await .get(event_id) { // Exponential backoff @@ -1469,17 +1482,19 @@ impl Service { > = BTreeMap::new(); { - let mut pkm = pub_key_map - .write() - .map_err(|_| Error::bad_database("RwLock is poisoned."))?; + let mut pkm = pub_key_map.write().await; // Try to fetch keys, failure is okay // Servers we couldn't find in the cache will be added to `servers` for pdu in &event.room_state.state { - let _ = self.get_server_keys_from_cache(pdu, &mut servers, room_version, &mut pkm); + let _ = self + .get_server_keys_from_cache(pdu, &mut servers, room_version, &mut pkm) + .await; } for pdu in &event.room_state.auth_chain { - let _ = self.get_server_keys_from_cache(pdu, &mut servers, room_version, &mut pkm); + let _ = self + .get_server_keys_from_cache(pdu, &mut servers, room_version, &mut pkm) + .await; } drop(pkm); @@ -1503,9 +1518,7 @@ impl Service { .await { trace!("Got signing keys: {:?}", keys); - let mut pkm = pub_key_map - .write() - .map_err(|_| Error::bad_database("RwLock is poisoned."))?; + let mut pkm = pub_key_map.write().await; for k in keys.server_keys { let k = match k.deserialize() { Ok(key) => key, @@ -1564,10 +1577,7 @@ impl Service { .into_iter() .map(|(k, v)| (k.to_string(), v.key)) .collect(); - pub_key_map - .write() - .map_err(|_| Error::bad_database("RwLock is poisoned."))? - .insert(origin.to_string(), result); + pub_key_map.write().await.insert(origin.to_string(), result); } } info!("Done handling result"); @@ -1632,14 +1642,14 @@ impl Service { .globals .servername_ratelimiter .read() - .unwrap() + .await .get(origin) .map(|s| Arc::clone(s).acquire_owned()); let permit = match permit { Some(p) => p, None => { - let mut write = services().globals.servername_ratelimiter.write().unwrap(); + let mut write = services().globals.servername_ratelimiter.write().await; let s = Arc::clone( write .entry(origin.to_owned()) @@ -1651,24 +1661,26 @@ impl Service { } .await; - let back_off = |id| match services() - .globals - .bad_signature_ratelimiter - .write() - .unwrap() - .entry(id) - { - hash_map::Entry::Vacant(e) => { - e.insert((Instant::now(), 1)); + let back_off = |id| async { + match services() + .globals + .bad_signature_ratelimiter + .write() + .await + .entry(id) + { + hash_map::Entry::Vacant(e) => { + e.insert((Instant::now(), 1)); + } + hash_map::Entry::Occupied(mut e) => *e.get_mut() = (Instant::now(), e.get().1 + 1), } - hash_map::Entry::Occupied(mut e) => *e.get_mut() = (Instant::now(), e.get().1 + 1), }; if let Some((time, tries)) = services() .globals .bad_signature_ratelimiter .read() - .unwrap() + .await .get(&signature_ids) { // Exponential backoff @@ -1775,7 +1787,7 @@ impl Service { drop(permit); - back_off(signature_ids); + back_off(signature_ids).await; warn!("Failed to find public key for server: {}", origin); Err(Error::BadServerResponse( diff --git a/src/service/rooms/lazy_loading/mod.rs b/src/service/rooms/lazy_loading/mod.rs index c51a57e9..e2594a0a 100644 --- a/src/service/rooms/lazy_loading/mod.rs +++ b/src/service/rooms/lazy_loading/mod.rs @@ -1,11 +1,9 @@ mod data; -use std::{ - collections::{HashMap, HashSet}, - sync::Mutex, -}; +use std::collections::{HashMap, HashSet}; pub use data::Data; use ruma::{DeviceId, OwnedDeviceId, OwnedRoomId, OwnedUserId, RoomId, UserId}; +use tokio::sync::Mutex; use crate::Result; @@ -33,7 +31,7 @@ impl Service { } #[tracing::instrument(skip(self))] - pub fn lazy_load_mark_sent( + pub async fn lazy_load_mark_sent( &self, user_id: &UserId, device_id: &DeviceId, @@ -41,7 +39,7 @@ impl Service { lazy_load: HashSet<OwnedUserId>, count: PduCount, ) { - self.lazy_load_waiting.lock().unwrap().insert( + self.lazy_load_waiting.lock().await.insert( ( user_id.to_owned(), device_id.to_owned(), @@ -53,14 +51,14 @@ impl Service { } #[tracing::instrument(skip(self))] - pub fn lazy_load_confirm_delivery( + pub async fn lazy_load_confirm_delivery( &self, user_id: &UserId, device_id: &DeviceId, room_id: &RoomId, since: PduCount, ) -> Result<()> { - if let Some(user_ids) = self.lazy_load_waiting.lock().unwrap().remove(&( + if let Some(user_ids) = self.lazy_load_waiting.lock().await.remove(&( user_id.to_owned(), device_id.to_owned(), room_id.to_owned(), diff --git a/src/service/rooms/spaces/mod.rs b/src/service/rooms/spaces/mod.rs index b0a9ed2a..981d4a37 100644 --- a/src/service/rooms/spaces/mod.rs +++ b/src/service/rooms/spaces/mod.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use lru_cache::LruCache; use ruma::{ @@ -25,6 +25,7 @@ use ruma::{ space::SpaceRoomJoinRule, OwnedRoomId, RoomId, UserId, }; +use tokio::sync::Mutex; use tracing::{debug, error, warn}; @@ -79,7 +80,7 @@ impl Service { if let Some(cached) = self .roomid_spacechunk_cache .lock() - .unwrap() + .await .get_mut(¤t_room.to_owned()) .as_ref() { @@ -171,7 +172,7 @@ impl Service { .transpose()? .unwrap_or(JoinRule::Invite); - self.roomid_spacechunk_cache.lock().unwrap().insert( + self.roomid_spacechunk_cache.lock().await.insert( current_room.clone(), Some(CachedSpaceChunk { chunk, @@ -265,7 +266,7 @@ impl Service { } } - self.roomid_spacechunk_cache.lock().unwrap().insert( + self.roomid_spacechunk_cache.lock().await.insert( current_room.clone(), Some(CachedSpaceChunk { chunk, @@ -289,7 +290,7 @@ impl Service { } else { self.roomid_spacechunk_cache .lock() - .unwrap() + .await .insert(current_room.clone(), None); } } diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index c209eb5a..f6581bb5 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -95,7 +95,7 @@ impl Service { .spaces .roomid_spacechunk_cache .lock() - .unwrap() + .await .remove(&pdu.room_id); } _ => continue, diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index b66fc645..097cc82f 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -2,12 +2,8 @@ mod data; use std::{ cmp::Ordering, - collections::{BTreeMap, HashMap}, -}; - -use std::{ - collections::HashSet, - sync::{Arc, Mutex, RwLock}, + collections::{BTreeMap, HashMap, HashSet}, + sync::Arc, }; pub use data::Data; @@ -32,7 +28,7 @@ use ruma::{ }; use serde::Deserialize; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; -use tokio::sync::MutexGuard; +use tokio::sync::{Mutex, MutexGuard, RwLock}; use tracing::{error, info, warn}; use crate::{ @@ -201,7 +197,7 @@ impl Service { /// /// Returns pdu id #[tracing::instrument(skip(self, pdu, pdu_json, leaves))] - pub fn append_pdu<'a>( + pub async fn append_pdu<'a>( &self, pdu: &PduEvent, mut pdu_json: CanonicalJsonObject, @@ -263,11 +259,11 @@ impl Service { .globals .roomid_mutex_insert .write() - .unwrap() + .await .entry(pdu.room_id.clone()) .or_default(), ); - let insert_lock = mutex_insert.lock().unwrap(); + let insert_lock = mutex_insert.lock().await; let count1 = services().globals.next_count()?; // Mark as read first so the sending client doesn't get a notification even if appending @@ -395,7 +391,7 @@ impl Service { .spaces .roomid_spacechunk_cache .lock() - .unwrap() + .await .remove(&pdu.room_id); } } @@ -806,7 +802,7 @@ impl Service { /// Creates a new persisted data unit and adds it to a room. This function takes a /// roomid_mutex_state, meaning that only this function is able to mutate the room state. #[tracing::instrument(skip(self, state_lock))] - pub fn build_and_append_pdu( + pub async fn build_and_append_pdu( &self, pdu_builder: PduBuilder, sender: &UserId, @@ -902,14 +898,16 @@ impl Service { // pdu without it's state. This is okay because append_pdu can't fail. let statehashid = services().rooms.state.append_to_state(&pdu)?; - let pdu_id = self.append_pdu( - &pdu, - pdu_json, - // Since this PDU references all pdu_leaves we can update the leaves - // of the room - vec![(*pdu.event_id).to_owned()], - state_lock, - )?; + let pdu_id = self + .append_pdu( + &pdu, + pdu_json, + // Since this PDU references all pdu_leaves we can update the leaves + // of the room + vec![(*pdu.event_id).to_owned()], + state_lock, + ) + .await?; // We set the room state after inserting the pdu, so that we never have a moment in time // where events in the current room state do not exist @@ -947,7 +945,7 @@ impl Service { /// Append the incoming event setting the state snapshot to the state from the /// server that sent the event. #[tracing::instrument(skip_all)] - pub fn append_incoming_pdu<'a>( + pub async fn append_incoming_pdu<'a>( &self, pdu: &PduEvent, pdu_json: CanonicalJsonObject, @@ -977,11 +975,11 @@ impl Service { return Ok(None); } - let pdu_id = - services() - .rooms - .timeline - .append_pdu(pdu, pdu_json, new_room_leaves, state_lock)?; + let pdu_id = services() + .rooms + .timeline + .append_pdu(pdu, pdu_json, new_room_leaves, state_lock) + .await?; Ok(Some(pdu_id)) } @@ -1118,7 +1116,7 @@ impl Service { .globals .roomid_mutex_federation .write() - .unwrap() + .await .entry(room_id.to_owned()) .or_default(), ); @@ -1150,11 +1148,11 @@ impl Service { .globals .roomid_mutex_insert .write() - .unwrap() + .await .entry(room_id.clone()) .or_default(), ); - let insert_lock = mutex_insert.lock().unwrap(); + let insert_lock = mutex_insert.lock().await; let count = services().globals.next_count()?; let mut pdu_id = shortroomid.to_be_bytes().to_vec();