mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-04-22 14:10:16 +03:00
feat: knocking
You may notice that we do no database migration for populating the state cache for knocking. This is because that in all the places where we use the state cache, it doesn't make a difference: - For local users, the clients wouldn't have been able to knock on rooms, as the `/knock` endpoint wasn't implemented yet, and I am not aware of any client which tries to knock over `/state`, as it would fail if the server is not currently in the room - It is not used for remote users
This commit is contained in:
parent
f4d90e9989
commit
21af83ea72
15 changed files with 1362 additions and 867 deletions
src
|
@ -1,15 +1,7 @@
|
|||
use crate::{services, Error, Result, Ruma};
|
||||
use rand::seq::SliceRandom;
|
||||
use ruma::{
|
||||
api::{
|
||||
appservice,
|
||||
client::{
|
||||
alias::{create_alias, delete_alias, get_alias},
|
||||
error::ErrorKind,
|
||||
},
|
||||
federation,
|
||||
},
|
||||
OwnedRoomAliasId,
|
||||
use ruma::api::client::{
|
||||
alias::{create_alias, delete_alias, get_alias},
|
||||
error::ErrorKind,
|
||||
};
|
||||
|
||||
/// # `PUT /_matrix/client/r0/directory/room/{roomAlias}`
|
||||
|
@ -115,75 +107,9 @@ pub async fn delete_alias_route(
|
|||
pub async fn get_alias_route(
|
||||
body: Ruma<get_alias::v3::Request>,
|
||||
) -> Result<get_alias::v3::Response> {
|
||||
get_alias_helper(body.body.room_alias).await
|
||||
}
|
||||
|
||||
pub(crate) async fn get_alias_helper(
|
||||
room_alias: OwnedRoomAliasId,
|
||||
) -> Result<get_alias::v3::Response> {
|
||||
if room_alias.server_name() != services().globals.server_name() {
|
||||
let response = services()
|
||||
.sending
|
||||
.send_federation_request(
|
||||
room_alias.server_name(),
|
||||
federation::query::get_room_information::v1::Request {
|
||||
room_alias: room_alias.to_owned(),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut servers = response.servers;
|
||||
servers.shuffle(&mut rand::thread_rng());
|
||||
|
||||
return Ok(get_alias::v3::Response::new(response.room_id, servers));
|
||||
}
|
||||
|
||||
let mut room_id = None;
|
||||
match services().rooms.alias.resolve_local_alias(&room_alias)? {
|
||||
Some(r) => room_id = Some(r),
|
||||
None => {
|
||||
for appservice in services().appservice.read().await.values() {
|
||||
if appservice.aliases.is_match(room_alias.as_str())
|
||||
&& matches!(
|
||||
services()
|
||||
.sending
|
||||
.send_appservice_request(
|
||||
appservice.registration.clone(),
|
||||
appservice::query::query_room_alias::v1::Request {
|
||||
room_alias: room_alias.clone(),
|
||||
},
|
||||
)
|
||||
.await,
|
||||
Ok(Some(_opt_result))
|
||||
)
|
||||
{
|
||||
room_id = Some(
|
||||
services()
|
||||
.rooms
|
||||
.alias
|
||||
.resolve_local_alias(&room_alias)?
|
||||
.ok_or_else(|| {
|
||||
Error::bad_config("Appservice lied to us. Room does not exist.")
|
||||
})?,
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let room_id = match room_id {
|
||||
Some(room_id) => room_id,
|
||||
None => {
|
||||
return Err(Error::BadRequest(
|
||||
ErrorKind::NotFound,
|
||||
"Room with alias not found.",
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
Ok(get_alias::v3::Response::new(
|
||||
room_id,
|
||||
vec![services().globals.server_name().to_owned()],
|
||||
))
|
||||
services()
|
||||
.rooms
|
||||
.alias
|
||||
.get_alias_helper(body.body.room_alias)
|
||||
.await
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -10,7 +10,8 @@ use ruma::{
|
|||
self,
|
||||
v3::{
|
||||
Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom,
|
||||
LeftRoom, Presence, RoomAccountData, RoomSummary, Rooms, State, Timeline, ToDevice,
|
||||
KnockState, KnockedRoom, LeftRoom, Presence, RoomAccountData, RoomSummary, Rooms,
|
||||
State, Timeline, ToDevice,
|
||||
},
|
||||
v4::{SlidingOp, SlidingSyncRoomHero},
|
||||
DeviceLists, UnreadNotificationsCount,
|
||||
|
@ -503,6 +504,50 @@ async fn sync_helper(
|
|||
);
|
||||
}
|
||||
|
||||
let mut knocked_rooms = BTreeMap::new();
|
||||
let all_knocked_rooms: Vec<_> = services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.rooms_knocked(&sender_user)
|
||||
.collect();
|
||||
for result in all_knocked_rooms {
|
||||
let (room_id, knock_state_events) = result?;
|
||||
|
||||
{
|
||||
// Get and drop the lock to wait for remaining operations to finish
|
||||
let mutex_insert = Arc::clone(
|
||||
services()
|
||||
.globals
|
||||
.roomid_mutex_insert
|
||||
.write()
|
||||
.await
|
||||
.entry(room_id.clone())
|
||||
.or_default(),
|
||||
);
|
||||
let insert_lock = mutex_insert.lock().await;
|
||||
drop(insert_lock);
|
||||
}
|
||||
|
||||
let knock_count = services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.get_knock_count(&room_id, &sender_user)?;
|
||||
|
||||
// knock before last sync
|
||||
if Some(since) >= knock_count {
|
||||
continue;
|
||||
}
|
||||
|
||||
knocked_rooms.insert(
|
||||
room_id.clone(),
|
||||
KnockedRoom {
|
||||
knock_state: KnockState {
|
||||
events: knock_state_events,
|
||||
},
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
for user_id in left_encrypted_users {
|
||||
let dont_share_encrypted_room = services()
|
||||
.rooms
|
||||
|
@ -538,7 +583,7 @@ async fn sync_helper(
|
|||
leave: left_rooms,
|
||||
join: joined_rooms,
|
||||
invite: invited_rooms,
|
||||
knock: BTreeMap::new(), // TODO
|
||||
knock: knocked_rooms,
|
||||
},
|
||||
presence: Presence {
|
||||
events: presence_updates
|
||||
|
|
|
@ -31,6 +31,7 @@ use ruma::{
|
|||
},
|
||||
event::{get_event, get_missing_events, get_room_state, get_room_state_ids},
|
||||
keys::{claim_keys, get_keys},
|
||||
knock::{create_knock_event_template, send_knock},
|
||||
membership::{
|
||||
create_invite, create_join_event, create_leave_event, prepare_join_event,
|
||||
prepare_leave_event,
|
||||
|
@ -1497,6 +1498,28 @@ pub async fn get_room_state_ids_route(
|
|||
})
|
||||
}
|
||||
|
||||
/// # `GET /_matrix/federation/v1/make_knock/{roomId}/{userId}`
|
||||
///
|
||||
/// Creates a knock template.
|
||||
pub async fn create_knock_event_template_route(
|
||||
body: Ruma<create_knock_event_template::v1::Request>,
|
||||
) -> Result<create_knock_event_template::v1::Response> {
|
||||
let (mutex_state, room_version_id) =
|
||||
member_shake_preamble(&body.sender_servername, &body.room_id).await?;
|
||||
let state_lock = mutex_state.lock().await;
|
||||
|
||||
Ok(create_knock_event_template::v1::Response {
|
||||
room_version: room_version_id,
|
||||
event: create_membership_template(
|
||||
&body.user_id,
|
||||
&body.room_id,
|
||||
None,
|
||||
MembershipState::Knock,
|
||||
state_lock,
|
||||
)?,
|
||||
})
|
||||
}
|
||||
|
||||
/// # `GET /_matrix/federation/v1/make_leave/{roomId}/{userId}`
|
||||
///
|
||||
/// Creates a leave template.
|
||||
|
@ -1933,6 +1956,31 @@ pub async fn create_leave_event_route(
|
|||
Ok(create_leave_event::v2::Response {})
|
||||
}
|
||||
|
||||
/// # `PUT /_matrix/federation/v1/send_knock/{roomId}/{eventId}`
|
||||
///
|
||||
/// Submits a signed knock event.
|
||||
pub async fn create_knock_event_route(
|
||||
body: Ruma<send_knock::v1::Request>,
|
||||
) -> Result<send_knock::v1::Response> {
|
||||
let sender_servername = body
|
||||
.sender_servername
|
||||
.as_ref()
|
||||
.expect("server is authenticated");
|
||||
room_and_acl_check(&body.room_id, sender_servername)?;
|
||||
|
||||
append_member_pdu(
|
||||
MembershipState::Knock,
|
||||
sender_servername,
|
||||
&body.room_id,
|
||||
&body.pdu,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(send_knock::v1::Response {
|
||||
knock_room_state: services().rooms.state.stripped_state(&body.room_id)?,
|
||||
})
|
||||
}
|
||||
|
||||
/// Checks whether the given user can join the given room via a restricted join.
|
||||
/// This doesn't check the current user's membership. This should be done externally,
|
||||
/// either by using the state cache or attempting to authorize the event.
|
||||
|
@ -2012,44 +2060,54 @@ fn user_can_perform_restricted_join(
|
|||
pub async fn create_invite_route(
|
||||
body: Ruma<create_invite::v2::Request>,
|
||||
) -> Result<create_invite::v2::Response> {
|
||||
let sender_servername = body
|
||||
.sender_servername
|
||||
.as_ref()
|
||||
.expect("server is authenticated");
|
||||
let Ruma::<create_invite::v2::Request> {
|
||||
body,
|
||||
sender_servername,
|
||||
..
|
||||
} = body;
|
||||
|
||||
let create_invite::v2::Request {
|
||||
room_id,
|
||||
room_version,
|
||||
event,
|
||||
invite_room_state,
|
||||
..
|
||||
} = body;
|
||||
|
||||
let sender_servername = sender_servername.expect("server is authenticated");
|
||||
|
||||
services()
|
||||
.rooms
|
||||
.event_handler
|
||||
.acl_check(sender_servername, &body.room_id)?;
|
||||
|
||||
.acl_check(&sender_servername, &room_id)?;
|
||||
if !services()
|
||||
.globals
|
||||
.supported_room_versions()
|
||||
.contains(&body.room_version)
|
||||
.contains(&room_version)
|
||||
{
|
||||
return Err(Error::BadRequest(
|
||||
ErrorKind::IncompatibleRoomVersion {
|
||||
room_version: body.room_version.clone(),
|
||||
room_version: room_version.clone(),
|
||||
},
|
||||
"Server does not support this room version.",
|
||||
));
|
||||
}
|
||||
|
||||
let mut signed_event = utils::to_canonical_object(&body.event)
|
||||
let mut signed_event = utils::to_canonical_object(&event)
|
||||
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invite event is invalid."))?;
|
||||
|
||||
ruma::signatures::hash_and_sign_event(
|
||||
services().globals.server_name().as_str(),
|
||||
services().globals.keypair(),
|
||||
&mut signed_event,
|
||||
&body.room_version,
|
||||
&room_version,
|
||||
)
|
||||
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Failed to sign event."))?;
|
||||
|
||||
// Generate event id
|
||||
let event_id = EventId::parse(format!(
|
||||
"${}",
|
||||
ruma::signatures::reference_hash(&signed_event, &body.room_version)
|
||||
ruma::signatures::reference_hash(&signed_event, &room_version)
|
||||
.expect("Event format validated when event was hashed")
|
||||
))
|
||||
.expect("ruma's reference hashes are valid event ids");
|
||||
|
@ -2084,9 +2142,9 @@ pub async fn create_invite_route(
|
|||
)
|
||||
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "state_key is not a user id."))?;
|
||||
|
||||
let mut invite_state = body.invite_room_state.clone();
|
||||
let mut invite_state = invite_room_state.clone();
|
||||
|
||||
let mut event: JsonObject = serde_json::from_str(body.event.get())
|
||||
let mut event: JsonObject = serde_json::from_str(event.get())
|
||||
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid invite event bytes."))?;
|
||||
|
||||
event.insert("event_id".to_owned(), "$dummy".into());
|
||||
|
@ -2102,16 +2160,55 @@ pub async fn create_invite_route(
|
|||
if !services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.server_in_room(services().globals.server_name(), &body.room_id)?
|
||||
.server_in_room(services().globals.server_name(), &room_id)?
|
||||
{
|
||||
services().rooms.state_cache.update_membership(
|
||||
&body.room_id,
|
||||
&invited_user,
|
||||
MembershipState::Invite,
|
||||
&sender,
|
||||
Some(invite_state),
|
||||
true,
|
||||
)?;
|
||||
// If the user has already knocked on the room, we take that as the user wanting to join
|
||||
// the room as soon as their knock is accepted, as recommended by the spec.
|
||||
//
|
||||
// https://spec.matrix.org/v1.13/client-server-api/#knocking-on-rooms
|
||||
if services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.is_knocked(&invited_user, &room_id)
|
||||
.unwrap_or_default()
|
||||
{
|
||||
// We want to try join automatically first, before notifying clients that they were invited.
|
||||
// We also shouldn't block giving the calling server the response on attempting to join the
|
||||
// room, since it's not relevant for the caller.
|
||||
tokio::spawn(async move {
|
||||
if services().rooms.helpers.join_room_by_id(&invited_user, &room_id, None, &invite_state.iter() .filter_map(|event| event.deserialize().ok())
|
||||
.map(|event| event.sender().server_name().to_owned())
|
||||
.collect::<Vec<_>>()
|
||||
, None)
|
||||
.await
|
||||
.is_err() &&
|
||||
// Checking whether the state has changed since we started this join handshake
|
||||
services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.is_knocked(&invited_user, &room_id)
|
||||
.unwrap_or_default()
|
||||
{
|
||||
let _ = services().rooms.state_cache.update_membership(
|
||||
&room_id,
|
||||
&invited_user,
|
||||
MembershipState::Invite,
|
||||
&sender,
|
||||
Some(invite_state),
|
||||
true,
|
||||
);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
services().rooms.state_cache.update_membership(
|
||||
&room_id,
|
||||
&invited_user,
|
||||
MembershipState::Invite,
|
||||
&sender,
|
||||
Some(invite_state),
|
||||
true,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(create_invite::v2::Response {
|
||||
|
|
|
@ -27,6 +27,8 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
|
|||
self.roomuserid_joined.insert(&roomuser_id, &[])?;
|
||||
self.userroomid_invitestate.remove(&userroom_id)?;
|
||||
self.roomuserid_invitecount.remove(&roomuser_id)?;
|
||||
self.userroomid_knockstate.remove(&userroom_id)?;
|
||||
self.roomuserid_knockcount.remove(&roomuser_id)?;
|
||||
self.userroomid_leftstate.remove(&userroom_id)?;
|
||||
self.roomuserid_leftcount.remove(&roomuser_id)?;
|
||||
|
||||
|
@ -52,12 +54,40 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
|
|||
)?;
|
||||
self.userroomid_joined.remove(&userroom_id)?;
|
||||
self.roomuserid_joined.remove(&roomuser_id)?;
|
||||
self.userroomid_knockstate.remove(&userroom_id)?;
|
||||
self.roomuserid_knockcount.remove(&roomuser_id)?;
|
||||
self.userroomid_leftstate.remove(&userroom_id)?;
|
||||
self.roomuserid_leftcount.remove(&roomuser_id)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn mark_as_knocked(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
room_id: &RoomId,
|
||||
last_state: Option<Vec<Raw<AnyStrippedStateEvent>>>,
|
||||
) -> Result<()> {
|
||||
let (roomuser_id, userroom_id) = get_room_and_user_byte_ids(room_id, user_id);
|
||||
|
||||
self.userroomid_knockstate.insert(
|
||||
&userroom_id,
|
||||
&serde_json::to_vec(&last_state.unwrap_or_default())
|
||||
.expect("state to bytes always works"),
|
||||
)?;
|
||||
self.roomuserid_knockcount.insert(
|
||||
&roomuser_id,
|
||||
&services().globals.next_count()?.to_be_bytes(),
|
||||
)?;
|
||||
self.userroomid_joined.remove(&userroom_id)?;
|
||||
self.roomuserid_joined.remove(&roomuser_id)?;
|
||||
self.userroomid_invitestate.remove(&userroom_id)?;
|
||||
self.roomuserid_invitecount.remove(&roomuser_id)?;
|
||||
self.userroomid_leftstate.remove(&userroom_id)?;
|
||||
self.roomuserid_leftcount.remove(&roomuser_id)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> {
|
||||
let (roomuser_id, userroom_id) = get_room_and_user_byte_ids(room_id, user_id);
|
||||
|
@ -74,6 +104,8 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
|
|||
self.roomuserid_joined.remove(&roomuser_id)?;
|
||||
self.userroomid_invitestate.remove(&userroom_id)?;
|
||||
self.roomuserid_invitecount.remove(&roomuser_id)?;
|
||||
self.userroomid_knockstate.remove(&userroom_id)?;
|
||||
self.roomuserid_knockcount.remove(&roomuser_id)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -390,6 +422,21 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
|
|||
})
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn get_knock_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> {
|
||||
let mut key = room_id.as_bytes().to_vec();
|
||||
key.push(0xff);
|
||||
key.extend_from_slice(user_id.as_bytes());
|
||||
|
||||
self.roomuserid_knockcount
|
||||
.get(&key)?
|
||||
.map_or(Ok(None), |bytes| {
|
||||
Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| {
|
||||
Error::bad_database("Invalid knockcount in db.")
|
||||
})?))
|
||||
})
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn get_left_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> {
|
||||
let mut key = room_id.as_bytes().to_vec();
|
||||
|
@ -440,6 +487,16 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
|
|||
scan_userroom_id_memberstate_tree(user_id, &self.userroomid_invitestate)
|
||||
}
|
||||
|
||||
/// Returns an iterator over all rooms a user has knocked on.
|
||||
#[allow(clippy::type_complexity)]
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn rooms_knocked<'a>(
|
||||
&'a self,
|
||||
user_id: &UserId,
|
||||
) -> Box<dyn Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnyStrippedStateEvent>>)>> + 'a> {
|
||||
scan_userroom_id_memberstate_tree(user_id, &self.userroomid_knockstate)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn invite_state(
|
||||
&self,
|
||||
|
@ -461,6 +518,27 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
|
|||
.transpose()
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn knock_state(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
room_id: &RoomId,
|
||||
) -> Result<Option<Vec<Raw<AnyStrippedStateEvent>>>> {
|
||||
let mut key = user_id.as_bytes().to_vec();
|
||||
key.push(0xff);
|
||||
key.extend_from_slice(room_id.as_bytes());
|
||||
|
||||
self.userroomid_knockstate
|
||||
.get(&key)?
|
||||
.map(|state| {
|
||||
let state = serde_json::from_slice(&state)
|
||||
.map_err(|_| Error::bad_database("Invalid state in userroomid_knockstate."))?;
|
||||
|
||||
Ok(state)
|
||||
})
|
||||
.transpose()
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn left_state(
|
||||
&self,
|
||||
|
|
|
@ -99,6 +99,8 @@ pub struct KeyValueDatabase {
|
|||
pub(super) roomuseroncejoinedids: Arc<dyn KvTree>,
|
||||
pub(super) userroomid_invitestate: Arc<dyn KvTree>, // InviteState = Vec<Raw<AnyStrippedStateEvent>>
|
||||
pub(super) roomuserid_invitecount: Arc<dyn KvTree>, // InviteCount = Count
|
||||
pub(super) userroomid_knockstate: Arc<dyn KvTree>, // KnockState = Vec<Raw<AnyStrippedStateEvent>>
|
||||
pub(super) roomuserid_knockcount: Arc<dyn KvTree>, // KnockCount = Count
|
||||
pub(super) userroomid_leftstate: Arc<dyn KvTree>,
|
||||
pub(super) roomuserid_leftcount: Arc<dyn KvTree>,
|
||||
|
||||
|
@ -313,6 +315,8 @@ impl KeyValueDatabase {
|
|||
roomuseroncejoinedids: builder.open_tree("roomuseroncejoinedids")?,
|
||||
userroomid_invitestate: builder.open_tree("userroomid_invitestate")?,
|
||||
roomuserid_invitecount: builder.open_tree("roomuserid_invitecount")?,
|
||||
userroomid_knockstate: builder.open_tree("userroomid_knockstate")?,
|
||||
roomuserid_knockcount: builder.open_tree("roomuserid_knockcount")?,
|
||||
userroomid_leftstate: builder.open_tree("userroomid_leftstate")?,
|
||||
roomuserid_leftcount: builder.open_tree("roomuserid_leftcount")?,
|
||||
|
||||
|
|
|
@ -345,6 +345,7 @@ fn routes(config: &Config) -> Router {
|
|||
.ruma_route(client_server::get_alias_route)
|
||||
.ruma_route(client_server::join_room_by_id_route)
|
||||
.ruma_route(client_server::join_room_by_id_or_alias_route)
|
||||
.ruma_route(client_server::knock_room_route)
|
||||
.ruma_route(client_server::joined_members_route)
|
||||
.ruma_route(client_server::leave_room_route)
|
||||
.ruma_route(client_server::forget_room_route)
|
||||
|
@ -460,6 +461,8 @@ fn routes(config: &Config) -> Router {
|
|||
.ruma_route(server_server::create_join_event_v2_route)
|
||||
.ruma_route(server_server::create_leave_event_template_route)
|
||||
.ruma_route(server_server::create_leave_event_route)
|
||||
.ruma_route(server_server::create_knock_event_template_route)
|
||||
.ruma_route(server_server::create_knock_event_route)
|
||||
.ruma_route(server_server::create_invite_route)
|
||||
.ruma_route(server_server::get_devices_route)
|
||||
.ruma_route(server_server::get_content_route)
|
||||
|
|
|
@ -73,6 +73,7 @@ impl Services {
|
|||
},
|
||||
},
|
||||
event_handler: rooms::event_handler::Service,
|
||||
helpers: rooms::helpers::Service,
|
||||
lazy_loading: rooms::lazy_loading::Service {
|
||||
db,
|
||||
lazy_load_waiting: Mutex::new(HashMap::new()),
|
||||
|
|
|
@ -1,11 +1,16 @@
|
|||
mod data;
|
||||
|
||||
pub use data::Data;
|
||||
use rand::seq::SliceRandom;
|
||||
use tracing::error;
|
||||
|
||||
use crate::{services, Error, Result};
|
||||
use ruma::{
|
||||
api::client::error::ErrorKind,
|
||||
api::{
|
||||
appservice,
|
||||
client::{alias::get_alias, error::ErrorKind},
|
||||
federation,
|
||||
},
|
||||
events::{
|
||||
room::power_levels::{RoomPowerLevels, RoomPowerLevelsEventContent},
|
||||
StateEventType,
|
||||
|
@ -98,4 +103,72 @@ impl Service {
|
|||
) -> Box<dyn Iterator<Item = Result<OwnedRoomAliasId>> + 'a> {
|
||||
self.db.local_aliases_for_room(room_id)
|
||||
}
|
||||
|
||||
/// Resolves an alias to a room id, and a set of servers to join or knock via, either locally or over federation
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub async fn get_alias_helper(
|
||||
&self,
|
||||
room_alias: OwnedRoomAliasId,
|
||||
) -> Result<get_alias::v3::Response> {
|
||||
if room_alias.server_name() != services().globals.server_name() {
|
||||
let response = services()
|
||||
.sending
|
||||
.send_federation_request(
|
||||
room_alias.server_name(),
|
||||
federation::query::get_room_information::v1::Request {
|
||||
room_alias: room_alias.to_owned(),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut servers = response.servers;
|
||||
servers.shuffle(&mut rand::thread_rng());
|
||||
|
||||
return Ok(get_alias::v3::Response::new(response.room_id, servers));
|
||||
}
|
||||
|
||||
let mut room_id = None;
|
||||
match services().rooms.alias.resolve_local_alias(&room_alias)? {
|
||||
Some(r) => room_id = Some(r),
|
||||
None => {
|
||||
for appservice in services().appservice.read().await.values() {
|
||||
if appservice.aliases.is_match(room_alias.as_str())
|
||||
&& matches!(
|
||||
services()
|
||||
.sending
|
||||
.send_appservice_request(
|
||||
appservice.registration.clone(),
|
||||
appservice::query::query_room_alias::v1::Request {
|
||||
room_alias: room_alias.clone(),
|
||||
},
|
||||
)
|
||||
.await,
|
||||
Ok(Some(_opt_result))
|
||||
)
|
||||
{
|
||||
room_id =
|
||||
Some(self.resolve_local_alias(&room_alias)?.ok_or_else(|| {
|
||||
Error::bad_config("Appservice lied to us. Room does not exist.")
|
||||
})?);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let room_id = match room_id {
|
||||
Some(room_id) => room_id,
|
||||
None => {
|
||||
return Err(Error::BadRequest(
|
||||
ErrorKind::NotFound,
|
||||
"Room with alias not found.",
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
Ok(get_alias::v3::Response::new(
|
||||
room_id,
|
||||
vec![services().globals.server_name().to_owned()],
|
||||
))
|
||||
}
|
||||
}
|
||||
|
|
699
src/service/rooms/helpers/mod.rs
Normal file
699
src/service/rooms/helpers/mod.rs
Normal file
|
@ -0,0 +1,699 @@
|
|||
use std::{
|
||||
collections::{hash_map::Entry, BTreeMap, HashMap},
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use ruma::{
|
||||
api::{
|
||||
client::{
|
||||
error::ErrorKind,
|
||||
membership::{join_room_by_id, ThirdPartySigned},
|
||||
},
|
||||
federation,
|
||||
},
|
||||
canonical_json::to_canonical_value,
|
||||
events::{
|
||||
room::{
|
||||
join_rules::{AllowRule, JoinRule, RoomJoinRulesEventContent},
|
||||
member::{MembershipState, RoomMemberEventContent},
|
||||
},
|
||||
TimelineEventType,
|
||||
},
|
||||
state_res, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch,
|
||||
OwnedEventId, OwnedServerName, OwnedUserId, RoomId, RoomVersionId, UserId,
|
||||
};
|
||||
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::{
|
||||
service::{
|
||||
globals::SigningKeys,
|
||||
pdu::{gen_event_id_canonical_json, PduBuilder},
|
||||
},
|
||||
services, utils, Error, PduEvent, Result,
|
||||
};
|
||||
|
||||
pub struct Service;
|
||||
|
||||
impl Service {
|
||||
/// Attempts to join a room.
|
||||
/// If the room cannot be joined locally, it attempts to join over federation, soley using the
|
||||
/// specified servers
|
||||
#[tracing::instrument(skip(self, reason, servers, _third_party_signed))]
|
||||
pub async fn join_room_by_id(
|
||||
&self,
|
||||
sender_user: &UserId,
|
||||
room_id: &RoomId,
|
||||
reason: Option<String>,
|
||||
servers: &[OwnedServerName],
|
||||
_third_party_signed: Option<&ThirdPartySigned>,
|
||||
) -> Result<join_room_by_id::v3::Response, Error> {
|
||||
if let Ok(true) = services().rooms.state_cache.is_joined(sender_user, room_id) {
|
||||
return Ok(join_room_by_id::v3::Response {
|
||||
room_id: room_id.into(),
|
||||
});
|
||||
}
|
||||
|
||||
let mutex_state = Arc::clone(
|
||||
services()
|
||||
.globals
|
||||
.roomid_mutex_state
|
||||
.write()
|
||||
.await
|
||||
.entry(room_id.to_owned())
|
||||
.or_default(),
|
||||
);
|
||||
let state_lock = mutex_state.lock().await;
|
||||
|
||||
// Ask a remote server if we are not participating in this room
|
||||
if !services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.server_in_room(services().globals.server_name(), room_id)?
|
||||
{
|
||||
info!("Joining {room_id} over federation.");
|
||||
|
||||
let (make_join_response, remote_server) =
|
||||
make_join_request(sender_user, room_id, servers).await?;
|
||||
|
||||
info!("make_join finished");
|
||||
|
||||
let room_version_id = match make_join_response.room_version {
|
||||
Some(room_version)
|
||||
if services()
|
||||
.globals
|
||||
.supported_room_versions()
|
||||
.contains(&room_version) =>
|
||||
{
|
||||
room_version
|
||||
}
|
||||
_ => return Err(Error::BadServerResponse("Room version is not supported")),
|
||||
};
|
||||
|
||||
let (event_id, mut join_event, _) = self.populate_membership_template(
|
||||
&make_join_response.event,
|
||||
sender_user,
|
||||
reason,
|
||||
&room_version_id,
|
||||
MembershipState::Join,
|
||||
)?;
|
||||
|
||||
info!("Asking {remote_server} for send_join");
|
||||
let send_join_response = services()
|
||||
.sending
|
||||
.send_federation_request(
|
||||
&remote_server,
|
||||
federation::membership::create_join_event::v2::Request {
|
||||
room_id: room_id.to_owned(),
|
||||
event_id: event_id.to_owned(),
|
||||
pdu: PduEvent::convert_to_outgoing_federation_event(join_event.clone()),
|
||||
omit_members: false,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
info!("send_join finished");
|
||||
|
||||
if let Some(signed_raw) = &send_join_response.room_state.event {
|
||||
info!("There is a signed event. This room is probably using restricted joins. Adding signature to our event");
|
||||
let (signed_event_id, signed_value) =
|
||||
match gen_event_id_canonical_json(signed_raw, &room_version_id) {
|
||||
Ok(t) => t,
|
||||
Err(_) => {
|
||||
// Event could not be converted to canonical json
|
||||
return Err(Error::BadRequest(
|
||||
ErrorKind::InvalidParam,
|
||||
"Could not convert event to canonical json.",
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
if signed_event_id != event_id {
|
||||
return Err(Error::BadRequest(
|
||||
ErrorKind::InvalidParam,
|
||||
"Server sent event with wrong event id",
|
||||
));
|
||||
}
|
||||
|
||||
match signed_value["signatures"]
|
||||
.as_object()
|
||||
.ok_or(Error::BadRequest(
|
||||
ErrorKind::InvalidParam,
|
||||
"Server sent invalid signatures type",
|
||||
))
|
||||
.and_then(|e| {
|
||||
e.get(remote_server.as_str()).ok_or(Error::BadRequest(
|
||||
ErrorKind::InvalidParam,
|
||||
"Server did not send its signature",
|
||||
))
|
||||
}) {
|
||||
Ok(signature) => {
|
||||
join_event
|
||||
.get_mut("signatures")
|
||||
.expect("we created a valid pdu")
|
||||
.as_object_mut()
|
||||
.expect("we created a valid pdu")
|
||||
.insert(remote_server.to_string(), signature.clone());
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Server {remote_server} sent invalid signature in sendjoin signatures for event {signed_value:?}: {e:?}",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
services().rooms.short.get_or_create_shortroomid(room_id)?;
|
||||
|
||||
info!("Parsing join event");
|
||||
let parsed_join_pdu = PduEvent::from_id_val(&event_id, join_event.clone())
|
||||
.map_err(|_| Error::BadServerResponse("Invalid join event PDU."))?;
|
||||
|
||||
let mut state = HashMap::new();
|
||||
let pub_key_map = RwLock::new(BTreeMap::new());
|
||||
|
||||
info!("Fetching join signing keys");
|
||||
services()
|
||||
.rooms
|
||||
.event_handler
|
||||
.fetch_join_signing_keys(&send_join_response, &room_version_id, &pub_key_map)
|
||||
.await?;
|
||||
|
||||
info!("Going through send_join response room_state");
|
||||
for result in send_join_response
|
||||
.room_state
|
||||
.state
|
||||
.iter()
|
||||
.map(|pdu| validate_and_add_event_id(pdu, &room_version_id, &pub_key_map))
|
||||
{
|
||||
let (event_id, value) = match result.await {
|
||||
Ok(t) => t,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
let pdu = PduEvent::from_id_val(&event_id, value.clone()).map_err(|e| {
|
||||
warn!("Invalid PDU in send_join response: {} {:?}", e, value);
|
||||
Error::BadServerResponse("Invalid PDU in send_join response.")
|
||||
})?;
|
||||
|
||||
services()
|
||||
.rooms
|
||||
.outlier
|
||||
.add_pdu_outlier(&event_id, &value)?;
|
||||
if let Some(state_key) = &pdu.state_key {
|
||||
let shortstatekey = services()
|
||||
.rooms
|
||||
.short
|
||||
.get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key)?;
|
||||
state.insert(shortstatekey, pdu.event_id.clone());
|
||||
}
|
||||
}
|
||||
|
||||
info!("Going through send_join response auth_chain");
|
||||
for result in send_join_response
|
||||
.room_state
|
||||
.auth_chain
|
||||
.iter()
|
||||
.map(|pdu| validate_and_add_event_id(pdu, &room_version_id, &pub_key_map))
|
||||
{
|
||||
let (event_id, value) = match result.await {
|
||||
Ok(t) => t,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
services()
|
||||
.rooms
|
||||
.outlier
|
||||
.add_pdu_outlier(&event_id, &value)?;
|
||||
}
|
||||
|
||||
info!("Running send_join auth check");
|
||||
let authenticated = state_res::event_auth::auth_check(
|
||||
&state_res::RoomVersion::new(&room_version_id).expect("room version is supported"),
|
||||
&parsed_join_pdu,
|
||||
None::<PduEvent>, // TODO: third party invite
|
||||
|k, s| {
|
||||
services()
|
||||
.rooms
|
||||
.timeline
|
||||
.get_pdu(
|
||||
state.get(
|
||||
&services()
|
||||
.rooms
|
||||
.short
|
||||
.get_or_create_shortstatekey(&k.to_string().into(), s)
|
||||
.ok()?,
|
||||
)?,
|
||||
)
|
||||
.ok()?
|
||||
},
|
||||
)
|
||||
.map_err(|e| {
|
||||
warn!("Auth check failed: {e}");
|
||||
Error::BadRequest(ErrorKind::InvalidParam, "Auth check failed")
|
||||
})?;
|
||||
|
||||
if !authenticated {
|
||||
return Err(Error::BadRequest(
|
||||
ErrorKind::InvalidParam,
|
||||
"Auth check failed",
|
||||
));
|
||||
}
|
||||
|
||||
info!("Saving state from send_join");
|
||||
let (statehash_before_join, new, removed) =
|
||||
services().rooms.state_compressor.save_state(
|
||||
room_id,
|
||||
Arc::new(
|
||||
state
|
||||
.into_iter()
|
||||
.map(|(k, id)| {
|
||||
services()
|
||||
.rooms
|
||||
.state_compressor
|
||||
.compress_state_event(k, &id)
|
||||
})
|
||||
.collect::<Result<_>>()?,
|
||||
),
|
||||
)?;
|
||||
|
||||
services()
|
||||
.rooms
|
||||
.state
|
||||
.force_state(room_id, statehash_before_join, new, removed, &state_lock)
|
||||
.await?;
|
||||
|
||||
info!("Updating joined counts for new room");
|
||||
services().rooms.state_cache.update_joined_count(room_id)?;
|
||||
|
||||
// We append to state before appending the pdu, so we don't have a moment in time with the
|
||||
// pdu without it's state. This is okay because append_pdu can't fail.
|
||||
let statehash_after_join = services().rooms.state.append_to_state(&parsed_join_pdu)?;
|
||||
|
||||
info!("Appending new room join event");
|
||||
services()
|
||||
.rooms
|
||||
.timeline
|
||||
.append_pdu(
|
||||
&parsed_join_pdu,
|
||||
join_event,
|
||||
vec![(*parsed_join_pdu.event_id).to_owned()],
|
||||
&state_lock,
|
||||
)
|
||||
.await?;
|
||||
|
||||
info!("Setting final room state for new room");
|
||||
// We set the room state after inserting the pdu, so that we never have a moment in time
|
||||
// where events in the current room state do not exist
|
||||
services()
|
||||
.rooms
|
||||
.state
|
||||
.set_room_state(room_id, statehash_after_join, &state_lock)?;
|
||||
} else {
|
||||
info!("We can join locally");
|
||||
|
||||
let join_rules_event_content =
|
||||
services().rooms.state_accessor.get_join_rules(room_id)?;
|
||||
|
||||
let restriction_rooms = match join_rules_event_content {
|
||||
Some(RoomJoinRulesEventContent {
|
||||
join_rule: JoinRule::Restricted(restricted),
|
||||
})
|
||||
| Some(RoomJoinRulesEventContent {
|
||||
join_rule: JoinRule::KnockRestricted(restricted),
|
||||
}) => restricted
|
||||
.allow
|
||||
.into_iter()
|
||||
.filter_map(|a| match a {
|
||||
AllowRule::RoomMembership(r) => Some(r.room_id),
|
||||
_ => None,
|
||||
})
|
||||
.collect(),
|
||||
_ => Vec::new(),
|
||||
};
|
||||
|
||||
let authorized_user = if restriction_rooms.iter().any(|restriction_room_id| {
|
||||
services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.is_joined(sender_user, restriction_room_id)
|
||||
.unwrap_or(false)
|
||||
}) {
|
||||
let mut auth_user = None;
|
||||
for user in services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.room_members(room_id)
|
||||
.filter_map(Result::ok)
|
||||
.collect::<Vec<_>>()
|
||||
{
|
||||
if user.server_name() == services().globals.server_name()
|
||||
&& services()
|
||||
.rooms
|
||||
.state_accessor
|
||||
.user_can_invite(room_id, &user, sender_user, &state_lock)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
auth_user = Some(user);
|
||||
break;
|
||||
}
|
||||
}
|
||||
auth_user
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let event = RoomMemberEventContent {
|
||||
membership: MembershipState::Join,
|
||||
displayname: services().users.displayname(sender_user)?,
|
||||
avatar_url: services().users.avatar_url(sender_user)?,
|
||||
is_direct: None,
|
||||
third_party_invite: None,
|
||||
blurhash: services().users.blurhash(sender_user)?,
|
||||
reason: reason.clone(),
|
||||
join_authorized_via_users_server: authorized_user,
|
||||
};
|
||||
|
||||
// Try normal join first
|
||||
let Err(error) = services()
|
||||
.rooms
|
||||
.timeline
|
||||
.build_and_append_pdu(
|
||||
PduBuilder {
|
||||
event_type: TimelineEventType::RoomMember,
|
||||
content: to_raw_value(&event).expect("event is valid, we just created it"),
|
||||
unsigned: None,
|
||||
state_key: Some(sender_user.to_string()),
|
||||
redacts: None,
|
||||
timestamp: None,
|
||||
},
|
||||
sender_user,
|
||||
room_id,
|
||||
&state_lock,
|
||||
)
|
||||
.await
|
||||
else {
|
||||
return Ok(join_room_by_id::v3::Response::new(room_id.to_owned()));
|
||||
};
|
||||
|
||||
if !restriction_rooms.is_empty()
|
||||
&& servers
|
||||
.iter()
|
||||
.any(|s| *s != services().globals.server_name())
|
||||
{
|
||||
info!(
|
||||
"We couldn't do the join locally, maybe federation can help to satisfy the restricted join requirements"
|
||||
);
|
||||
let (make_join_response, remote_server) =
|
||||
make_join_request(sender_user, room_id, servers).await?;
|
||||
|
||||
let room_version_id = match make_join_response.room_version {
|
||||
Some(room_version_id)
|
||||
if services()
|
||||
.globals
|
||||
.supported_room_versions()
|
||||
.contains(&room_version_id) =>
|
||||
{
|
||||
room_version_id
|
||||
}
|
||||
_ => return Err(Error::BadServerResponse("Room version is not supported")),
|
||||
};
|
||||
|
||||
let (event_id, join_event, restricted_join) = self.populate_membership_template(
|
||||
&make_join_response.event,
|
||||
sender_user,
|
||||
reason,
|
||||
&room_version_id,
|
||||
MembershipState::Join,
|
||||
)?;
|
||||
|
||||
let send_join_response = services()
|
||||
.sending
|
||||
.send_federation_request(
|
||||
&remote_server,
|
||||
federation::membership::create_join_event::v2::Request {
|
||||
room_id: room_id.to_owned(),
|
||||
event_id: event_id.to_owned(),
|
||||
pdu: PduEvent::convert_to_outgoing_federation_event(join_event.clone()),
|
||||
omit_members: false,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let pdu = if let Some(signed_raw) = send_join_response.room_state.event {
|
||||
let (signed_event_id, signed_pdu) =
|
||||
gen_event_id_canonical_json(&signed_raw, &room_version_id)?;
|
||||
|
||||
if signed_event_id != event_id {
|
||||
return Err(Error::BadServerResponse(
|
||||
"Server sent event with wrong event id",
|
||||
));
|
||||
}
|
||||
|
||||
signed_pdu
|
||||
} else if restricted_join {
|
||||
return Err(Error::BadServerResponse(
|
||||
"No signed event was returned, despite just performing a restricted join",
|
||||
));
|
||||
} else {
|
||||
join_event
|
||||
};
|
||||
|
||||
drop(state_lock);
|
||||
let pub_key_map = RwLock::new(BTreeMap::new());
|
||||
services()
|
||||
.rooms
|
||||
.event_handler
|
||||
.handle_incoming_pdu(
|
||||
&remote_server,
|
||||
&event_id,
|
||||
room_id,
|
||||
pdu,
|
||||
true,
|
||||
&pub_key_map,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
return Err(error);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(join_room_by_id::v3::Response::new(room_id.to_owned()))
|
||||
}
|
||||
|
||||
/// Takes a membership template, as returned from the `/federation/*/make_*` endpoints, and
|
||||
/// populates them to the point as to where they are a full pdu, ready to be appended to the timeline
|
||||
///
|
||||
/// Returns the event id, the pdu, and whether this event is a restricted join
|
||||
pub fn populate_membership_template(
|
||||
&self,
|
||||
member_template: &RawJsonValue,
|
||||
sender_user: &UserId,
|
||||
reason: Option<String>,
|
||||
room_version_id: &RoomVersionId,
|
||||
membership: MembershipState,
|
||||
) -> Result<(OwnedEventId, BTreeMap<String, CanonicalJsonValue>, bool), Error> {
|
||||
let mut member_event_stub: CanonicalJsonObject =
|
||||
serde_json::from_str(member_template.get()).map_err(|_| {
|
||||
Error::BadServerResponse("Invalid make_knock event json received from server.")
|
||||
})?;
|
||||
|
||||
let join_authorized_via_users_server = member_event_stub
|
||||
.get("content")
|
||||
.map(|s| {
|
||||
s.as_object()?
|
||||
.get("join_authorised_via_users_server")?
|
||||
.as_str()
|
||||
})
|
||||
.and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok());
|
||||
|
||||
let restricted_join = join_authorized_via_users_server.is_some();
|
||||
|
||||
member_event_stub.insert(
|
||||
"origin".to_owned(),
|
||||
CanonicalJsonValue::String(services().globals.server_name().as_str().to_owned()),
|
||||
);
|
||||
|
||||
member_event_stub.insert(
|
||||
"origin_server_ts".to_owned(),
|
||||
CanonicalJsonValue::Integer(
|
||||
utils::millis_since_unix_epoch()
|
||||
.try_into()
|
||||
.expect("Timestamp is valid js_int value"),
|
||||
),
|
||||
);
|
||||
|
||||
member_event_stub.insert(
|
||||
"content".to_owned(),
|
||||
to_canonical_value(RoomMemberEventContent {
|
||||
membership,
|
||||
displayname: services().users.displayname(sender_user)?,
|
||||
avatar_url: services().users.avatar_url(sender_user)?,
|
||||
is_direct: None,
|
||||
third_party_invite: None,
|
||||
blurhash: services().users.blurhash(sender_user)?,
|
||||
reason: reason.clone(),
|
||||
join_authorized_via_users_server,
|
||||
})
|
||||
.expect("event is valid, we just created it"),
|
||||
);
|
||||
|
||||
member_event_stub.remove("event_id");
|
||||
|
||||
ruma::signatures::hash_and_sign_event(
|
||||
services().globals.server_name().as_str(),
|
||||
services().globals.keypair(),
|
||||
&mut member_event_stub,
|
||||
room_version_id,
|
||||
)
|
||||
.expect("event is valid, we just created it");
|
||||
|
||||
let event_id = format!(
|
||||
"${}",
|
||||
ruma::signatures::reference_hash(&member_event_stub, room_version_id)
|
||||
.expect("Event format validated when event was hashed")
|
||||
);
|
||||
|
||||
let event_id = <OwnedEventId>::try_from(event_id)
|
||||
.expect("ruma's reference hashes are valid event ids");
|
||||
|
||||
member_event_stub.insert(
|
||||
"event_id".to_owned(),
|
||||
CanonicalJsonValue::String(event_id.as_str().to_owned()),
|
||||
);
|
||||
|
||||
Ok((event_id, member_event_stub, restricted_join))
|
||||
}
|
||||
}
|
||||
|
||||
async fn make_join_request(
|
||||
sender_user: &UserId,
|
||||
room_id: &RoomId,
|
||||
servers: &[OwnedServerName],
|
||||
) -> Result<(
|
||||
federation::membership::prepare_join_event::v1::Response,
|
||||
OwnedServerName,
|
||||
)> {
|
||||
let mut make_join_response_and_server = Err(Error::BadServerResponse(
|
||||
"No server available to assist in joining.",
|
||||
));
|
||||
|
||||
for remote_server in servers {
|
||||
if remote_server == services().globals.server_name() {
|
||||
continue;
|
||||
}
|
||||
info!("Asking {remote_server} for make_join");
|
||||
let make_join_response = services()
|
||||
.sending
|
||||
.send_federation_request(
|
||||
remote_server,
|
||||
federation::membership::prepare_join_event::v1::Request {
|
||||
room_id: room_id.to_owned(),
|
||||
user_id: sender_user.to_owned(),
|
||||
ver: services().globals.supported_room_versions(),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
make_join_response_and_server = make_join_response.map(|r| (r, remote_server.clone()));
|
||||
|
||||
if make_join_response_and_server.is_ok() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
make_join_response_and_server
|
||||
}
|
||||
async fn validate_and_add_event_id(
|
||||
pdu: &RawJsonValue,
|
||||
room_version: &RoomVersionId,
|
||||
pub_key_map: &RwLock<BTreeMap<String, SigningKeys>>,
|
||||
) -> Result<(OwnedEventId, CanonicalJsonObject)> {
|
||||
let mut value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
|
||||
error!("Invalid PDU in server response: {:?}: {:?}", pdu, e);
|
||||
Error::BadServerResponse("Invalid PDU in server response")
|
||||
})?;
|
||||
let event_id = EventId::parse(format!(
|
||||
"${}",
|
||||
ruma::signatures::reference_hash(&value, room_version)
|
||||
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid PDU format"))?
|
||||
))
|
||||
.expect("ruma's reference hashes are valid event ids");
|
||||
|
||||
let back_off = |id| async {
|
||||
match services()
|
||||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.write()
|
||||
.await
|
||||
.entry(id)
|
||||
{
|
||||
Entry::Vacant(e) => {
|
||||
e.insert((Instant::now(), 1));
|
||||
}
|
||||
Entry::Occupied(mut e) => *e.get_mut() = (Instant::now(), e.get().1 + 1),
|
||||
}
|
||||
};
|
||||
|
||||
if let Some((time, tries)) = services()
|
||||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.read()
|
||||
.await
|
||||
.get(&event_id)
|
||||
{
|
||||
// Exponential backoff
|
||||
let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries);
|
||||
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
|
||||
min_elapsed_duration = Duration::from_secs(60 * 60 * 24);
|
||||
}
|
||||
|
||||
if time.elapsed() < min_elapsed_duration {
|
||||
debug!("Backing off from {}", event_id);
|
||||
return Err(Error::BadServerResponse("bad event, still backing off"));
|
||||
}
|
||||
}
|
||||
|
||||
let origin_server_ts = value.get("origin_server_ts").ok_or_else(|| {
|
||||
error!("Invalid PDU, no origin_server_ts field");
|
||||
Error::BadRequest(
|
||||
ErrorKind::MissingParam,
|
||||
"Invalid PDU, no origin_server_ts field",
|
||||
)
|
||||
})?;
|
||||
|
||||
let origin_server_ts: MilliSecondsSinceUnixEpoch = {
|
||||
let ts = origin_server_ts.as_integer().ok_or_else(|| {
|
||||
Error::BadRequest(
|
||||
ErrorKind::InvalidParam,
|
||||
"origin_server_ts must be an integer",
|
||||
)
|
||||
})?;
|
||||
|
||||
MilliSecondsSinceUnixEpoch(i64::from(ts).try_into().map_err(|_| {
|
||||
Error::BadRequest(ErrorKind::InvalidParam, "Time must be after the unix epoch")
|
||||
})?)
|
||||
};
|
||||
|
||||
let unfiltered_keys = (*pub_key_map.read().await).clone();
|
||||
|
||||
let keys =
|
||||
services()
|
||||
.globals
|
||||
.filter_keys_server_map(unfiltered_keys, origin_server_ts, room_version);
|
||||
|
||||
if let Err(e) = ruma::signatures::verify_event(&keys, &value, room_version) {
|
||||
warn!("Event {} failed verification {:?} {}", event_id, pdu, e);
|
||||
back_off(event_id).await;
|
||||
return Err(Error::BadServerResponse("Event failed verification."));
|
||||
}
|
||||
|
||||
value.insert(
|
||||
"event_id".to_owned(),
|
||||
CanonicalJsonValue::String(event_id.as_str().to_owned()),
|
||||
);
|
||||
|
||||
Ok((event_id, value))
|
||||
}
|
|
@ -3,6 +3,7 @@ pub mod auth_chain;
|
|||
pub mod directory;
|
||||
pub mod edus;
|
||||
pub mod event_handler;
|
||||
pub mod helpers;
|
||||
pub mod lazy_loading;
|
||||
pub mod metadata;
|
||||
pub mod outlier;
|
||||
|
@ -45,6 +46,7 @@ pub struct Service {
|
|||
pub directory: directory::Service,
|
||||
pub edus: edus::Service,
|
||||
pub event_handler: event_handler::Service,
|
||||
pub helpers: helpers::Service,
|
||||
pub lazy_loading: lazy_loading::Service,
|
||||
pub metadata: metadata::Service,
|
||||
pub outlier: outlier::Service,
|
||||
|
|
|
@ -426,8 +426,8 @@ impl Service {
|
|||
})
|
||||
}
|
||||
|
||||
/// Returns the join rule for a given room
|
||||
pub fn get_join_rule(
|
||||
/// Returns the space-room join rule for a given room
|
||||
pub fn get_space_room_join_rule(
|
||||
&self,
|
||||
current_room: &RoomId,
|
||||
) -> Result<(SpaceRoomJoinRule, Vec<OwnedRoomId>), Error> {
|
||||
|
@ -450,6 +450,26 @@ impl Service {
|
|||
.unwrap_or((SpaceRoomJoinRule::Invite, vec![])))
|
||||
}
|
||||
|
||||
/// Returns the join rules event content of a room, if there are any and we are aware of it locally
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub fn get_join_rules(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
) -> Result<Option<RoomJoinRulesEventContent>, Error> {
|
||||
let join_rules_event = self.room_state_get(room_id, &StateEventType::RoomJoinRules, "")?;
|
||||
|
||||
join_rules_event
|
||||
.as_ref()
|
||||
.map(|join_rules_event| {
|
||||
serde_json::from_str::<RoomJoinRulesEventContent>(join_rules_event.content.get())
|
||||
.map_err(|e| {
|
||||
warn!("Invalid join rules event: {}", e);
|
||||
Error::bad_database("Invalid join rules event in db.")
|
||||
})
|
||||
})
|
||||
.transpose()
|
||||
}
|
||||
|
||||
/// Returns an empty vec if not a restricted room
|
||||
pub fn allowed_room_ids(&self, join_rule: JoinRule) -> Vec<OwnedRoomId> {
|
||||
let mut room_ids = vec![];
|
||||
|
|
|
@ -16,6 +16,12 @@ pub trait Data: Send + Sync {
|
|||
room_id: &RoomId,
|
||||
last_state: Option<Vec<Raw<AnyStrippedStateEvent>>>,
|
||||
) -> Result<()>;
|
||||
fn mark_as_knocked(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
room_id: &RoomId,
|
||||
last_state: Option<Vec<Raw<AnyStrippedStateEvent>>>,
|
||||
) -> Result<()>;
|
||||
fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) -> Result<()>;
|
||||
|
||||
fn update_joined_count(&self, room_id: &RoomId) -> Result<()>;
|
||||
|
@ -65,6 +71,8 @@ pub trait Data: Send + Sync {
|
|||
|
||||
fn get_invite_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>>;
|
||||
|
||||
fn get_knock_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>>;
|
||||
|
||||
fn get_left_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>>;
|
||||
|
||||
/// Returns an iterator over all rooms this user joined.
|
||||
|
@ -80,12 +88,25 @@ pub trait Data: Send + Sync {
|
|||
user_id: &UserId,
|
||||
) -> Box<dyn Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnyStrippedStateEvent>>)>> + 'a>;
|
||||
|
||||
/// Returns an iterator over all rooms a user has knocked on.
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn rooms_knocked<'a>(
|
||||
&'a self,
|
||||
user_id: &UserId,
|
||||
) -> Box<dyn Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnyStrippedStateEvent>>)>> + 'a>;
|
||||
|
||||
fn invite_state(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
room_id: &RoomId,
|
||||
) -> Result<Option<Vec<Raw<AnyStrippedStateEvent>>>>;
|
||||
|
||||
fn knock_state(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
room_id: &RoomId,
|
||||
) -> Result<Option<Vec<Raw<AnyStrippedStateEvent>>>>;
|
||||
|
||||
fn left_state(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
|
@ -105,5 +126,7 @@ pub trait Data: Send + Sync {
|
|||
|
||||
fn is_invited(&self, user_id: &UserId, room_id: &RoomId) -> Result<bool>;
|
||||
|
||||
fn is_knocked(&self, user_id: &UserId, room_id: &RoomId) -> Result<bool>;
|
||||
|
||||
fn is_left(&self, user_id: &UserId, room_id: &RoomId) -> Result<bool>;
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ use ruma::{
|
|||
RoomAccountDataEventType, StateEventType,
|
||||
},
|
||||
serde::Raw,
|
||||
OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId,
|
||||
OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId,
|
||||
};
|
||||
use tracing::warn;
|
||||
|
||||
|
@ -185,6 +185,9 @@ impl Service {
|
|||
|
||||
self.db.mark_as_invited(user_id, room_id, last_state)?;
|
||||
}
|
||||
MembershipState::Knock => {
|
||||
self.db.mark_as_knocked(user_id, room_id, last_state)?;
|
||||
}
|
||||
MembershipState::Leave | MembershipState::Ban => {
|
||||
self.db.mark_as_left(user_id, room_id)?;
|
||||
}
|
||||
|
@ -290,6 +293,11 @@ impl Service {
|
|||
self.db.get_invite_count(room_id, user_id)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub fn get_knock_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> {
|
||||
self.db.get_knock_count(room_id, user_id)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub fn get_left_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> {
|
||||
self.db.get_left_count(room_id, user_id)
|
||||
|
@ -313,6 +321,15 @@ impl Service {
|
|||
self.db.rooms_invited(user_id)
|
||||
}
|
||||
|
||||
/// Returns an iterator over all rooms a user has knocked on.
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub fn rooms_knocked<'a>(
|
||||
&'a self,
|
||||
user_id: &UserId,
|
||||
) -> impl Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnyStrippedStateEvent>>)>> + 'a {
|
||||
self.db.rooms_knocked(user_id)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub fn invite_state(
|
||||
&self,
|
||||
|
@ -322,6 +339,15 @@ impl Service {
|
|||
self.db.invite_state(user_id, room_id)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub fn knock_state(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
room_id: &RoomId,
|
||||
) -> Result<Option<Vec<Raw<AnyStrippedStateEvent>>>> {
|
||||
self.db.knock_state(user_id, room_id)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub fn left_state(
|
||||
&self,
|
||||
|
@ -355,8 +381,60 @@ impl Service {
|
|||
self.db.is_invited(user_id, room_id)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub fn is_knocked(&self, user_id: &UserId, room_id: &RoomId) -> Result<bool> {
|
||||
self.db.is_knocked(user_id, room_id)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub fn is_left(&self, user_id: &UserId, room_id: &RoomId) -> Result<bool> {
|
||||
self.db.is_left(user_id, room_id)
|
||||
}
|
||||
|
||||
/// Function to assist performing a membership event that may require help from a remote server
|
||||
///
|
||||
/// If a room id is provided, the servers returned will consist of:
|
||||
/// - the `via` argument, provided by the client
|
||||
/// - servers of the senders of the stripped state events we are given
|
||||
/// - the server in the room id
|
||||
///
|
||||
/// Otherwise, the servers returned will come from the response when resolving the alias.
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub async fn get_room_id_and_via_servers(
|
||||
&self,
|
||||
sender_user: &UserId,
|
||||
room_id_or_alias: OwnedRoomOrAliasId,
|
||||
via: Vec<OwnedServerName>,
|
||||
) -> Result<(Vec<OwnedServerName>, OwnedRoomId), Error> {
|
||||
let (servers, room_id) = match OwnedRoomId::try_from(room_id_or_alias) {
|
||||
Ok(room_id) => {
|
||||
let mut servers = via;
|
||||
servers.extend(
|
||||
self.invite_state(sender_user, &room_id)
|
||||
.transpose()
|
||||
.or_else(|| self.knock_state(sender_user, &room_id).transpose())
|
||||
.transpose()?
|
||||
.unwrap_or_default()
|
||||
.iter()
|
||||
.filter_map(|event| event.deserialize().ok())
|
||||
.map(|event| event.sender().server_name().to_owned()),
|
||||
);
|
||||
|
||||
servers.push(
|
||||
room_id
|
||||
.server_name()
|
||||
.expect("Room IDs should always have a server name")
|
||||
.into(),
|
||||
);
|
||||
|
||||
(servers, room_id)
|
||||
}
|
||||
Err(room_alias) => {
|
||||
let response = services().rooms.alias.get_alias_helper(room_alias).await?;
|
||||
|
||||
(response.servers, response.room_id)
|
||||
}
|
||||
};
|
||||
Ok((servers, room_id))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -451,17 +451,22 @@ impl Service {
|
|||
.map_err(|_| Error::bad_database("Invalid content in pdu."))?;
|
||||
|
||||
let stripped_state = match content.membership {
|
||||
MembershipState::Invite => {
|
||||
MembershipState::Invite | MembershipState::Knock => {
|
||||
let mut state = services().rooms.state.stripped_state(&pdu.room_id)?;
|
||||
// So that clients can get info about who invitied them, the reason, when, etc.
|
||||
// So that clients can get info about who invitied them (not relevant for knocking), the reason, when, etc.
|
||||
state.push(pdu.to_stripped_state_event());
|
||||
Some(state)
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
|
||||
// Update our membership info, we do this here incase a user is invited
|
||||
// and immediately leaves we need the DB to record the invite event for auth
|
||||
// Here we don't attempt to join if the previous membership was knock and the
|
||||
// new one is join, like we do for `/federation/*/invite`, as not only are there
|
||||
// implementation difficulties due to callers not implementing `Send`, but
|
||||
// invites we recieve which aren't over `/invite` must have been due to a
|
||||
// database reset or switching server implementations, which means we probably
|
||||
// shouldn't be joining automatically anyways, since it may surprise users to
|
||||
// suddenly join rooms which clients didn't even show as being knocked on before.
|
||||
services().rooms.state_cache.update_membership(
|
||||
&pdu.room_id,
|
||||
&target_user_id,
|
||||
|
|
Loading…
Reference in a new issue