mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-01-13 21:16:27 +03:00
refactor federation membership handshake endpoints, reducing duplication
This commit is contained in:
parent
12ada1c86a
commit
d820cd4007
1 changed files with 98 additions and 80 deletions
|
@ -56,7 +56,7 @@ use ruma::{
|
||||||
to_device::DeviceIdOrAllDevices,
|
to_device::DeviceIdOrAllDevices,
|
||||||
uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch,
|
uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch,
|
||||||
OwnedEventId, OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId,
|
OwnedEventId, OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId,
|
||||||
ServerName,
|
ServerName, UserId,
|
||||||
};
|
};
|
||||||
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
|
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
|
||||||
use std::{
|
use std::{
|
||||||
|
@ -67,7 +67,7 @@ use std::{
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::{Duration, Instant, SystemTime},
|
time::{Duration, Instant, SystemTime},
|
||||||
};
|
};
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::{Mutex, RwLock};
|
||||||
|
|
||||||
use tracing::{debug, error, warn};
|
use tracing::{debug, error, warn};
|
||||||
|
|
||||||
|
@ -1500,32 +1500,8 @@ pub async fn get_room_state_ids_route(
|
||||||
pub async fn create_join_event_template_route(
|
pub async fn create_join_event_template_route(
|
||||||
body: Ruma<prepare_join_event::v1::Request>,
|
body: Ruma<prepare_join_event::v1::Request>,
|
||||||
) -> Result<prepare_join_event::v1::Response> {
|
) -> Result<prepare_join_event::v1::Response> {
|
||||||
if !services().rooms.metadata.exists(&body.room_id)? {
|
let (mutex_state, room_version_id) =
|
||||||
return Err(Error::BadRequest(
|
member_shake_preamble(&body.sender_servername, &body.room_id).await?;
|
||||||
ErrorKind::NotFound,
|
|
||||||
"Room is unknown to this server.",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
let sender_servername = body
|
|
||||||
.sender_servername
|
|
||||||
.as_ref()
|
|
||||||
.expect("server is authenticated");
|
|
||||||
|
|
||||||
services()
|
|
||||||
.rooms
|
|
||||||
.event_handler
|
|
||||||
.acl_check(sender_servername, &body.room_id)?;
|
|
||||||
|
|
||||||
let mutex_state = Arc::clone(
|
|
||||||
services()
|
|
||||||
.globals
|
|
||||||
.roomid_mutex_state
|
|
||||||
.write()
|
|
||||||
.await
|
|
||||||
.entry(body.room_id.to_owned())
|
|
||||||
.or_default(),
|
|
||||||
);
|
|
||||||
let state_lock = mutex_state.lock().await;
|
let state_lock = mutex_state.lock().await;
|
||||||
|
|
||||||
// TODO: Conduit does not implement restricted join rules yet, we always reject
|
// TODO: Conduit does not implement restricted join rules yet, we always reject
|
||||||
|
@ -1557,7 +1533,6 @@ pub async fn create_join_event_template_route(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let room_version_id = services().rooms.state.get_room_version(&body.room_id)?;
|
|
||||||
if !body.ver.contains(&room_version_id) {
|
if !body.ver.contains(&room_version_id) {
|
||||||
return Err(Error::BadRequest(
|
return Err(Error::BadRequest(
|
||||||
ErrorKind::IncompatibleRoomVersion {
|
ErrorKind::IncompatibleRoomVersion {
|
||||||
|
@ -1567,12 +1542,29 @@ pub async fn create_join_event_template_route(
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(prepare_join_event::v1::Response {
|
||||||
|
room_version: Some(room_version_id),
|
||||||
|
event: create_membership_template(
|
||||||
|
&body.user_id,
|
||||||
|
&body.room_id,
|
||||||
|
MembershipState::Join,
|
||||||
|
state_lock,
|
||||||
|
)?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_membership_template(
|
||||||
|
user_id: &UserId,
|
||||||
|
room_id: &RoomId,
|
||||||
|
membership: MembershipState,
|
||||||
|
state_lock: tokio::sync::MutexGuard<'_, ()>,
|
||||||
|
) -> Result<Box<RawJsonValue>, Error> {
|
||||||
let content = to_raw_value(&RoomMemberEventContent {
|
let content = to_raw_value(&RoomMemberEventContent {
|
||||||
avatar_url: None,
|
avatar_url: None,
|
||||||
blurhash: None,
|
blurhash: None,
|
||||||
displayname: None,
|
displayname: None,
|
||||||
is_direct: None,
|
is_direct: None,
|
||||||
membership: MembershipState::Join,
|
membership,
|
||||||
third_party_invite: None,
|
third_party_invite: None,
|
||||||
reason: None,
|
reason: None,
|
||||||
join_authorized_via_users_server: None,
|
join_authorized_via_users_server: None,
|
||||||
|
@ -1584,12 +1576,12 @@ pub async fn create_join_event_template_route(
|
||||||
event_type: TimelineEventType::RoomMember,
|
event_type: TimelineEventType::RoomMember,
|
||||||
content,
|
content,
|
||||||
unsigned: None,
|
unsigned: None,
|
||||||
state_key: Some(body.user_id.to_string()),
|
state_key: Some(user_id.to_string()),
|
||||||
redacts: None,
|
redacts: None,
|
||||||
timestamp: None,
|
timestamp: None,
|
||||||
},
|
},
|
||||||
&body.user_id,
|
user_id,
|
||||||
&body.room_id,
|
room_id,
|
||||||
&state_lock,
|
&state_lock,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
@ -1597,28 +1589,59 @@ pub async fn create_join_event_template_route(
|
||||||
|
|
||||||
pdu_json.remove("event_id");
|
pdu_json.remove("event_id");
|
||||||
|
|
||||||
Ok(prepare_join_event::v1::Response {
|
let raw_event = to_raw_value(&pdu_json).expect("CanonicalJson can be serialized to JSON");
|
||||||
room_version: Some(room_version_id),
|
|
||||||
event: to_raw_value(&pdu_json).expect("CanonicalJson can be serialized to JSON"),
|
Ok(raw_event)
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn create_join_event(
|
/// checks whether the given room exists, and checks whether the specified server is allowed to send events according to the ACL
|
||||||
sender_servername: &ServerName,
|
fn room_and_acl_check(
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
pdu: &RawJsonValue,
|
sender_servername: &Option<OwnedServerName>,
|
||||||
) -> Result<create_join_event::v1::RoomState> {
|
) -> Result<(), Error> {
|
||||||
if !services().rooms.metadata.exists(room_id)? {
|
if !services().rooms.metadata.exists(room_id)? {
|
||||||
return Err(Error::BadRequest(
|
return Err(Error::BadRequest(
|
||||||
ErrorKind::NotFound,
|
ErrorKind::NotFound,
|
||||||
"Room is unknown to this server.",
|
"Room is unknown to this server.",
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
let sender_servername = sender_servername.as_ref().expect("server is authenticated");
|
||||||
services()
|
services()
|
||||||
.rooms
|
.rooms
|
||||||
.event_handler
|
.event_handler
|
||||||
.acl_check(sender_servername, room_id)?;
|
.acl_check(sender_servername, room_id)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Takes care of common boilerpalte for room membership handshake endpoints.
|
||||||
|
/// The returned mutex must be locked by the caller.
|
||||||
|
async fn member_shake_preamble(
|
||||||
|
sender_servername: &Option<OwnedServerName>,
|
||||||
|
room_id: &RoomId,
|
||||||
|
) -> Result<(Arc<Mutex<()>>, ruma::RoomVersionId), Error> {
|
||||||
|
room_and_acl_check(room_id, sender_servername)?;
|
||||||
|
|
||||||
|
let mutex_state = Arc::clone(
|
||||||
|
services()
|
||||||
|
.globals
|
||||||
|
.roomid_mutex_state
|
||||||
|
.write()
|
||||||
|
.await
|
||||||
|
.entry(room_id.to_owned())
|
||||||
|
.or_default(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let room_version_id = services().rooms.state.get_room_version(room_id)?;
|
||||||
|
|
||||||
|
Ok((mutex_state, room_version_id))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_join_event(
|
||||||
|
sender_servername: &Option<OwnedServerName>,
|
||||||
|
room_id: &RoomId,
|
||||||
|
pdu: &RawJsonValue,
|
||||||
|
) -> Result<create_join_event::v1::RoomState> {
|
||||||
|
room_and_acl_check(room_id, sender_servername)?;
|
||||||
|
|
||||||
// TODO: Conduit does not implement restricted join rules yet, we always reject
|
// TODO: Conduit does not implement restricted join rules yet, we always reject
|
||||||
let join_rules_event = services().rooms.state_accessor.room_state_get(
|
let join_rules_event = services().rooms.state_accessor.room_state_get(
|
||||||
|
@ -1659,8 +1682,37 @@ async fn create_join_event(
|
||||||
"Pdu state not found.",
|
"Pdu state not found.",
|
||||||
))?;
|
))?;
|
||||||
|
|
||||||
|
append_member_pdu(room_id, pdu).await?;
|
||||||
|
|
||||||
|
let state_ids = services()
|
||||||
|
.rooms
|
||||||
|
.state_accessor
|
||||||
|
.state_full_ids(shortstatehash)
|
||||||
|
.await?;
|
||||||
|
let auth_chain_ids = services()
|
||||||
|
.rooms
|
||||||
|
.auth_chain
|
||||||
|
.get_auth_chain(room_id, state_ids.values().cloned().collect())
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(create_join_event::v1::RoomState {
|
||||||
|
auth_chain: auth_chain_ids
|
||||||
|
.filter_map(|id| services().rooms.timeline.get_pdu_json(&id).ok().flatten())
|
||||||
|
.map(PduEvent::convert_to_outgoing_federation_event)
|
||||||
|
.collect(),
|
||||||
|
state: state_ids
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(_, id)| services().rooms.timeline.get_pdu_json(id).ok().flatten())
|
||||||
|
.map(PduEvent::convert_to_outgoing_federation_event)
|
||||||
|
.collect(),
|
||||||
|
event: None, // TODO: handle restricted joins
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Takes the given membership PDU and attempts to append it to the timeline
|
||||||
|
// TODO: validate that the PDU actually a membership event, and ensure it matches the state specified by the endpoint
|
||||||
|
async fn append_member_pdu(room_id: &RoomId, pdu: &RawJsonValue) -> Result<(), Error> {
|
||||||
let pub_key_map = RwLock::new(BTreeMap::new());
|
let pub_key_map = RwLock::new(BTreeMap::new());
|
||||||
// let mut auth_cache = EventMap::new();
|
|
||||||
|
|
||||||
// We do not add the event_id field to the pdu here because of signature and hashes checks
|
// We do not add the event_id field to the pdu here because of signature and hashes checks
|
||||||
let room_version_id = services().rooms.state.get_room_version(room_id)?;
|
let room_version_id = services().rooms.state.get_room_version(room_id)?;
|
||||||
|
@ -1705,17 +1757,6 @@ async fn create_join_event(
|
||||||
))?;
|
))?;
|
||||||
drop(mutex_lock);
|
drop(mutex_lock);
|
||||||
|
|
||||||
let state_ids = services()
|
|
||||||
.rooms
|
|
||||||
.state_accessor
|
|
||||||
.state_full_ids(shortstatehash)
|
|
||||||
.await?;
|
|
||||||
let auth_chain_ids = services()
|
|
||||||
.rooms
|
|
||||||
.auth_chain
|
|
||||||
.get_auth_chain(room_id, state_ids.values().cloned().collect())
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let servers = services()
|
let servers = services()
|
||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
|
@ -1723,20 +1764,7 @@ async fn create_join_event(
|
||||||
.filter_map(|r| r.ok())
|
.filter_map(|r| r.ok())
|
||||||
.filter(|server| &**server != services().globals.server_name());
|
.filter(|server| &**server != services().globals.server_name());
|
||||||
|
|
||||||
services().sending.send_pdu(servers, &pdu_id)?;
|
services().sending.send_pdu(servers, &pdu_id)
|
||||||
|
|
||||||
Ok(create_join_event::v1::RoomState {
|
|
||||||
auth_chain: auth_chain_ids
|
|
||||||
.filter_map(|id| services().rooms.timeline.get_pdu_json(&id).ok().flatten())
|
|
||||||
.map(PduEvent::convert_to_outgoing_federation_event)
|
|
||||||
.collect(),
|
|
||||||
state: state_ids
|
|
||||||
.iter()
|
|
||||||
.filter_map(|(_, id)| services().rooms.timeline.get_pdu_json(id).ok().flatten())
|
|
||||||
.map(PduEvent::convert_to_outgoing_federation_event)
|
|
||||||
.collect(),
|
|
||||||
event: None, // TODO: handle restricted joins
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// # `PUT /_matrix/federation/v1/send_join/{roomId}/{eventId}`
|
/// # `PUT /_matrix/federation/v1/send_join/{roomId}/{eventId}`
|
||||||
|
@ -1745,12 +1773,7 @@ async fn create_join_event(
|
||||||
pub async fn create_join_event_v1_route(
|
pub async fn create_join_event_v1_route(
|
||||||
body: Ruma<create_join_event::v1::Request>,
|
body: Ruma<create_join_event::v1::Request>,
|
||||||
) -> Result<create_join_event::v1::Response> {
|
) -> Result<create_join_event::v1::Response> {
|
||||||
let sender_servername = body
|
let room_state = create_join_event(&body.sender_servername, &body.room_id, &body.pdu).await?;
|
||||||
.sender_servername
|
|
||||||
.as_ref()
|
|
||||||
.expect("server is authenticated");
|
|
||||||
|
|
||||||
let room_state = create_join_event(sender_servername, &body.room_id, &body.pdu).await?;
|
|
||||||
|
|
||||||
Ok(create_join_event::v1::Response { room_state })
|
Ok(create_join_event::v1::Response { room_state })
|
||||||
}
|
}
|
||||||
|
@ -1761,16 +1784,11 @@ pub async fn create_join_event_v1_route(
|
||||||
pub async fn create_join_event_v2_route(
|
pub async fn create_join_event_v2_route(
|
||||||
body: Ruma<create_join_event::v2::Request>,
|
body: Ruma<create_join_event::v2::Request>,
|
||||||
) -> Result<create_join_event::v2::Response> {
|
) -> Result<create_join_event::v2::Response> {
|
||||||
let sender_servername = body
|
|
||||||
.sender_servername
|
|
||||||
.as_ref()
|
|
||||||
.expect("server is authenticated");
|
|
||||||
|
|
||||||
let create_join_event::v1::RoomState {
|
let create_join_event::v1::RoomState {
|
||||||
auth_chain,
|
auth_chain,
|
||||||
state,
|
state,
|
||||||
event,
|
event,
|
||||||
} = create_join_event(sender_servername, &body.room_id, &body.pdu).await?;
|
} = create_join_event(&body.sender_servername, &body.room_id, &body.pdu).await?;
|
||||||
let room_state = create_join_event::v2::RoomState {
|
let room_state = create_join_event::v2::RoomState {
|
||||||
members_omitted: false,
|
members_omitted: false,
|
||||||
auth_chain,
|
auth_chain,
|
||||||
|
|
Loading…
Reference in a new issue