mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-01-15 22:16:27 +03:00
feat: Add federation backfill and event visibility
Co-authored-by: Nyaaori <+@nyaaori.cat>
This commit is contained in:
parent
249960b111
commit
d51f8a6c55
5 changed files with 150 additions and 11 deletions
|
@ -12,6 +12,7 @@ use ruma::{
|
||||||
client::error::{Error as RumaError, ErrorKind},
|
client::error::{Error as RumaError, ErrorKind},
|
||||||
federation::{
|
federation::{
|
||||||
authorization::get_event_authorization,
|
authorization::get_event_authorization,
|
||||||
|
backfill::get_backfill,
|
||||||
device::get_devices::{self, v1::UserDevice},
|
device::get_devices::{self, v1::UserDevice},
|
||||||
directory::{get_public_rooms, get_public_rooms_filtered},
|
directory::{get_public_rooms, get_public_rooms_filtered},
|
||||||
discovery::{get_server_keys, get_server_version, ServerSigningKeys, VerifyKey},
|
discovery::{get_server_keys, get_server_version, ServerSigningKeys, VerifyKey},
|
||||||
|
@ -43,11 +44,11 @@ use ruma::{
|
||||||
serde::{Base64, JsonObject, Raw},
|
serde::{Base64, JsonObject, Raw},
|
||||||
to_device::DeviceIdOrAllDevices,
|
to_device::DeviceIdOrAllDevices,
|
||||||
CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
|
CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
|
||||||
OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName,
|
OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName, UInt,
|
||||||
};
|
};
|
||||||
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
|
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
|
||||||
use std::{
|
use std::{
|
||||||
collections::BTreeMap,
|
collections::{BTreeMap, HashSet},
|
||||||
fmt::Debug,
|
fmt::Debug,
|
||||||
mem,
|
mem,
|
||||||
net::{IpAddr, SocketAddr},
|
net::{IpAddr, SocketAddr},
|
||||||
|
@ -952,6 +953,53 @@ pub async fn get_event_route(
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// # `GET /_matrix/federation/v1/backfill/<room_id>`
|
||||||
|
///
|
||||||
|
/// Retrieves events from before the sender joined the room, if the room's
|
||||||
|
/// history visibility allows.
|
||||||
|
pub async fn get_backfill_route(
|
||||||
|
body: Ruma<get_backfill::v1::IncomingRequest>,
|
||||||
|
) -> Result<get_backfill::v1::Response> {
|
||||||
|
if !services().globals.allow_federation() {
|
||||||
|
return Err(Error::bad_config("Federation is disabled."));
|
||||||
|
}
|
||||||
|
|
||||||
|
let sender_servername = body
|
||||||
|
.sender_servername
|
||||||
|
.as_ref()
|
||||||
|
.expect("server is authenticated");
|
||||||
|
|
||||||
|
info!("Got backfill request from: {}", sender_servername);
|
||||||
|
|
||||||
|
if !services()
|
||||||
|
.rooms
|
||||||
|
.state_cache
|
||||||
|
.server_in_room(sender_servername, &body.room_id)?
|
||||||
|
{
|
||||||
|
return Err(Error::BadRequest(
|
||||||
|
ErrorKind::Forbidden,
|
||||||
|
"Server is not in room.",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let origin = services().globals.server_name().to_owned();
|
||||||
|
let earliest_events = &[];
|
||||||
|
|
||||||
|
let events = get_missing_events(
|
||||||
|
sender_servername,
|
||||||
|
&body.room_id,
|
||||||
|
earliest_events,
|
||||||
|
&body.v,
|
||||||
|
body.limit,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(get_backfill::v1::Response {
|
||||||
|
origin,
|
||||||
|
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
|
||||||
|
pdus: events,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// # `POST /_matrix/federation/v1/get_missing_events/{roomId}`
|
/// # `POST /_matrix/federation/v1/get_missing_events/{roomId}`
|
||||||
///
|
///
|
||||||
/// Retrieves events that the sender is missing.
|
/// Retrieves events that the sender is missing.
|
||||||
|
@ -983,11 +1031,43 @@ pub async fn get_missing_events_route(
|
||||||
.event_handler
|
.event_handler
|
||||||
.acl_check(sender_servername, &body.room_id)?;
|
.acl_check(sender_servername, &body.room_id)?;
|
||||||
|
|
||||||
let mut queued_events = body.latest_events.clone();
|
let events = get_missing_events(
|
||||||
|
sender_servername,
|
||||||
|
&body.room_id,
|
||||||
|
&body.earliest_events,
|
||||||
|
&body.latest_events,
|
||||||
|
body.limit,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(get_missing_events::v1::Response { events })
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recursively fetch events starting from `latest_events`, going backwards
|
||||||
|
// through each event's `prev_events` until reaching the `earliest_events`.
|
||||||
|
//
|
||||||
|
// Used by the federation /backfill and /get_missing_events routes.
|
||||||
|
fn get_missing_events(
|
||||||
|
sender_servername: &ServerName,
|
||||||
|
room_id: &RoomId,
|
||||||
|
earliest_events: &[OwnedEventId],
|
||||||
|
latest_events: &Vec<OwnedEventId>,
|
||||||
|
limit: UInt,
|
||||||
|
) -> Result<Vec<Box<RawJsonValue>>> {
|
||||||
|
let limit = u64::from(limit) as usize;
|
||||||
|
|
||||||
|
let mut queued_events = latest_events.clone();
|
||||||
let mut events = Vec::new();
|
let mut events = Vec::new();
|
||||||
|
|
||||||
|
let mut stop_at_events = HashSet::with_capacity(limit);
|
||||||
|
stop_at_events.extend(earliest_events.iter().cloned());
|
||||||
|
|
||||||
let mut i = 0;
|
let mut i = 0;
|
||||||
while i < queued_events.len() && events.len() < u64::from(body.limit) as usize {
|
while i < queued_events.len() && events.len() < limit {
|
||||||
|
if stop_at_events.contains(&queued_events[i]) {
|
||||||
|
i += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(pdu) = services().rooms.timeline.get_pdu_json(&queued_events[i])? {
|
if let Some(pdu) = services().rooms.timeline.get_pdu_json(&queued_events[i])? {
|
||||||
let room_id_str = pdu
|
let room_id_str = pdu
|
||||||
.get("room_id")
|
.get("room_id")
|
||||||
|
@ -997,10 +1077,10 @@ pub async fn get_missing_events_route(
|
||||||
let event_room_id = <&RoomId>::try_from(room_id_str)
|
let event_room_id = <&RoomId>::try_from(room_id_str)
|
||||||
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?;
|
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?;
|
||||||
|
|
||||||
if event_room_id != body.room_id {
|
if event_room_id != room_id {
|
||||||
warn!(
|
warn!(
|
||||||
"Evil event detected: Event {} found while searching in room {}",
|
"Evil event detected: Event {} found while searching in room {}",
|
||||||
queued_events[i], body.room_id
|
queued_events[i], room_id
|
||||||
);
|
);
|
||||||
return Err(Error::BadRequest(
|
return Err(Error::BadRequest(
|
||||||
ErrorKind::InvalidParam,
|
ErrorKind::InvalidParam,
|
||||||
|
@ -1008,10 +1088,20 @@ pub async fn get_missing_events_route(
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
if body.earliest_events.contains(&queued_events[i]) {
|
let event_is_visible = services()
|
||||||
|
.rooms
|
||||||
|
.state_accessor
|
||||||
|
.server_can_see_event(sender_servername, &queued_events[i])?;
|
||||||
|
|
||||||
|
if !event_is_visible {
|
||||||
i += 1;
|
i += 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Don't send this event again if it comes through some other
|
||||||
|
// event's prev_events.
|
||||||
|
stop_at_events.insert(queued_events[i].clone());
|
||||||
|
|
||||||
queued_events.extend_from_slice(
|
queued_events.extend_from_slice(
|
||||||
&serde_json::from_value::<Vec<OwnedEventId>>(
|
&serde_json::from_value::<Vec<OwnedEventId>>(
|
||||||
serde_json::to_value(pdu.get("prev_events").cloned().ok_or_else(|| {
|
serde_json::to_value(pdu.get("prev_events").cloned().ok_or_else(|| {
|
||||||
|
@ -1026,7 +1116,7 @@ pub async fn get_missing_events_route(
|
||||||
i += 1;
|
i += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(get_missing_events::v1::Response { events })
|
Ok(events)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// # `GET /_matrix/federation/v1/event_auth/{roomId}/{eventId}`
|
/// # `GET /_matrix/federation/v1/event_auth/{roomId}/{eventId}`
|
||||||
|
|
|
@ -5,7 +5,13 @@ use std::{
|
||||||
|
|
||||||
use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result};
|
use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use ruma::{events::StateEventType, EventId, RoomId};
|
use ruma::{
|
||||||
|
events::{
|
||||||
|
room::history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent},
|
||||||
|
StateEventType,
|
||||||
|
},
|
||||||
|
EventId, RoomId, ServerName,
|
||||||
|
};
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl service::rooms::state_accessor::Data for KeyValueDatabase {
|
impl service::rooms::state_accessor::Data for KeyValueDatabase {
|
||||||
|
@ -141,6 +147,35 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Whether a server is allowed to see an event through federation, based on
|
||||||
|
/// the room's history_visibility at that event's state.
|
||||||
|
///
|
||||||
|
/// Note: Joined/Invited history visibility not yet implemented.
|
||||||
|
#[tracing::instrument(skip(self))]
|
||||||
|
fn server_can_see_event(&self, _server_name: &ServerName, event_id: &EventId) -> Result<bool> {
|
||||||
|
let shortstatehash = match self.pdu_shortstatehash(event_id) {
|
||||||
|
Ok(Some(shortstatehash)) => shortstatehash,
|
||||||
|
_ => return Ok(false),
|
||||||
|
};
|
||||||
|
|
||||||
|
let history_visibility = self
|
||||||
|
.state_get(shortstatehash, &StateEventType::RoomHistoryVisibility, "")?
|
||||||
|
.map(|event| serde_json::from_str(event.content.get()))
|
||||||
|
.transpose()
|
||||||
|
.map_err(|_| Error::bad_database("Invalid room history visibility event in database."))?
|
||||||
|
.map(|content: RoomHistoryVisibilityEventContent| content.history_visibility);
|
||||||
|
|
||||||
|
Ok(match history_visibility {
|
||||||
|
Some(HistoryVisibility::WorldReadable) => true,
|
||||||
|
Some(HistoryVisibility::Shared) => true,
|
||||||
|
// TODO: Check if any of the server's users were invited
|
||||||
|
// at this point in time.
|
||||||
|
Some(HistoryVisibility::Joined) => false,
|
||||||
|
Some(HistoryVisibility::Invited) => false,
|
||||||
|
_ => false,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the full room state.
|
/// Returns the full room state.
|
||||||
async fn room_state_full(
|
async fn room_state_full(
|
||||||
&self,
|
&self,
|
||||||
|
|
|
@ -368,6 +368,7 @@ fn routes() -> Router {
|
||||||
.ruma_route(server_server::send_transaction_message_route)
|
.ruma_route(server_server::send_transaction_message_route)
|
||||||
.ruma_route(server_server::get_event_route)
|
.ruma_route(server_server::get_event_route)
|
||||||
.ruma_route(server_server::get_missing_events_route)
|
.ruma_route(server_server::get_missing_events_route)
|
||||||
|
.ruma_route(server_server::get_backfill_route)
|
||||||
.ruma_route(server_server::get_event_authorization_route)
|
.ruma_route(server_server::get_event_authorization_route)
|
||||||
.ruma_route(server_server::get_room_state_route)
|
.ruma_route(server_server::get_room_state_route)
|
||||||
.ruma_route(server_server::get_room_state_ids_route)
|
.ruma_route(server_server::get_room_state_ids_route)
|
||||||
|
|
|
@ -4,7 +4,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use ruma::{events::StateEventType, EventId, RoomId};
|
use ruma::{events::StateEventType, EventId, RoomId, ServerName};
|
||||||
|
|
||||||
use crate::{PduEvent, Result};
|
use crate::{PduEvent, Result};
|
||||||
|
|
||||||
|
@ -38,6 +38,9 @@ pub trait Data: Send + Sync {
|
||||||
/// Returns the state hash for this pdu.
|
/// Returns the state hash for this pdu.
|
||||||
fn pdu_shortstatehash(&self, event_id: &EventId) -> Result<Option<u64>>;
|
fn pdu_shortstatehash(&self, event_id: &EventId) -> Result<Option<u64>>;
|
||||||
|
|
||||||
|
/// Returns true if a server has permission to see an event
|
||||||
|
fn server_can_see_event(&self, sever_name: &ServerName, event_id: &EventId) -> Result<bool>;
|
||||||
|
|
||||||
/// Returns the full room state.
|
/// Returns the full room state.
|
||||||
async fn room_state_full(
|
async fn room_state_full(
|
||||||
&self,
|
&self,
|
||||||
|
|
|
@ -5,7 +5,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
pub use data::Data;
|
pub use data::Data;
|
||||||
use ruma::{events::StateEventType, EventId, RoomId};
|
use ruma::{events::StateEventType, EventId, RoomId, ServerName};
|
||||||
|
|
||||||
use crate::{PduEvent, Result};
|
use crate::{PduEvent, Result};
|
||||||
|
|
||||||
|
@ -54,6 +54,16 @@ impl Service {
|
||||||
self.db.pdu_shortstatehash(event_id)
|
self.db.pdu_shortstatehash(event_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns true if a server has permission to see an event
|
||||||
|
#[tracing::instrument(skip(self))]
|
||||||
|
pub fn server_can_see_event<'a>(
|
||||||
|
&'a self,
|
||||||
|
sever_name: &ServerName,
|
||||||
|
event_id: &EventId,
|
||||||
|
) -> Result<bool> {
|
||||||
|
self.db.server_can_see_event(sever_name, event_id)
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the full room state.
|
/// Returns the full room state.
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub async fn room_state_full(
|
pub async fn room_state_full(
|
||||||
|
|
Loading…
Reference in a new issue