1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-01-20 08:05:41 +03:00

Merge pull request 'Federation, Admin room and many fixes' () from federation into master

Reviewed-on: https://git.koesters.xyz/timo/conduit/pulls/221
This commit is contained in:
Timo Kösters 2020-10-13 19:04:42 +02:00
commit 0e817f6951
46 changed files with 3326 additions and 1920 deletions

775
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -12,27 +12,52 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Used to handle requests
# TODO: This can become optional as soon as proper configs are supported
#rocket = { git = "https://github.com/SergioBenitez/Rocket.git", rev = "8d779caa22c63b15a6c3ceb75d8f6d4971b2eb67", features = ["tls"] } # Used to handle requests
rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_parameters", features = ["tls"] }
#rocket = { git = "https://github.com/SergioBenitez/Rocket.git", rev = "8d779caa22c63b15a6c3ceb75d8f6d4971b2eb67", default-features = false, features = ["tls"] } # Used to handle requests
rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_parameters", default-features = false, features = ["tls"] }
#ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"], rev = "987d48666cf166cf12100b5dbc61b5e3385c4014" } # Used for matrix spec type definitions and helpers
ruma = { git = "https://github.com/timokoesters/ruma", features = ["rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"], branch = "timo-old-fixes" } # Used for matrix spec type definitions and helpers
#ruma = { path = "../ruma/ruma", features = ["rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"] }
tokio = "0.2.22" # Used for long polling
sled = "0.32.0" # Used for storing data permanently
log = "0.4.8" # Used for emitting log entries
http = "0.2.1" # Used for rocket<->ruma conversions
directories = "2.0.2" # Used to find data directory for default db path
js_int = "0.1.5" # Used for number types for ruma
serde_json = { version = "1.0.53", features = ["raw_value"] } # Used for ruma wrapper
serde = "1.0.111" # Used for pdu definition
rand = "0.7.3" # Used for secure identifiers
rust-argon2 = "0.8.2" # Used to hash passwords
reqwest = "0.10.6" # Used to send requests
thiserror = "1.0.19" # Used for conduit::Error type
image = { version = "0.23.4", default-features = false, features = ["jpeg", "png", "gif"] } # Used to generate thumbnails for images
base64 = "0.12.3" # Used to encode server public key
# Used for matrix spec type definitions and helpers
#ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"], rev = "aff914050eb297bd82b8aafb12158c88a9e480e1" }
ruma = { git = "https://github.com/timokoesters/ruma", features = ["rand", "client-api", "federation-api", "unstable-exhaustive-types", "unstable-pre-spec", "unstable-synapse-quirks"], branch = "timo-fed-fixes" }
#ruma = { path = "../ruma/ruma", features = ["unstable-exhaustive-types", "rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"] }
# Used when doing state resolution
state-res = { git = "https://github.com/timokoesters/state-res", branch = "spec-comp", features = ["unstable-pre-spec"] }
#state-res = { path = "../state-res", features = ["unstable-pre-spec"] }
# Used for long polling
tokio = "0.2.22"
# Used for storing data permanently
sled = "0.34.4"
# Used for emitting log entries
log = "0.4.11"
# Used for rocket<->ruma conversions
http = "0.2.1"
# Used to find data directory for default db path
directories = "3.0.1"
# Used for number types for ruma
js_int = "0.1.9"
# Used for ruma wrapper
serde_json = { version = "1.0.57", features = ["raw_value"] }
# Used for pdu definition
serde = "1.0.116"
# Used for secure identifiers
rand = "0.7.3"
# Used to hash passwords
rust-argon2 = "0.8.2"
# Used to send requests
reqwest = "0.10.8"
# Used for conduit::Error type
thiserror = "1.0.20"
# Used to generate thumbnails for images
image = { version = "0.23.9", default-features = false, features = ["jpeg", "png", "gif"] }
# Used to encode server public key
base64 = "0.12.3"
# Used when hashing the state
ring = "0.16.15"
# Used when querying the SRV record of other servers
trust-dns-resolver = "0.19.5"
[features]
default = ["conduit_bin"]

View file

@ -27,7 +27,10 @@ Environment="ROCKET_SERVER_NAME=YOURSERVERNAME.HERE" # EDIT THIS
Environment="ROCKET_PORT=14004" # Reverse proxy port
#Environment="ROCKET_MAX_REQUEST_SIZE=20000000" # in bytes
#Environment="ROCKET_REGISTRATION_DISABLED=true"
#Environment="ROCKET_ENCRYPTION_DISABLED=true"
#Environment="ROCKET_FEDERATION_ENABLED=true"
#Environment="ROCKET_LOG=normal" # Detailed logging
Environment="ROCKET_ENV=production"

View file

@ -16,6 +16,8 @@ port = 14004
# Note: existing rooms will continue to work
#encryption_disabled = true
#federation_enabled = true
# Default path is in this user's data
#database_path = "/home/timo/MyConduitServer"

View file

@ -31,6 +31,7 @@ services:
# ROCKET_PORT: 8000
# ROCKET_REGISTRATION_DISABLED: 'true'
# ROCKET_ENCRYPTION_DISABLED: 'true'
# ROCKET_FEDERATION_ENABLED: 'true'
# ROCKET_DATABASE_PATH: /srv/conduit/.local/share/conduit
# ROCKET_WORKERS: 10
# ROCKET_MAX_REQUEST_SIZE: 20_000_000 # in bytes, ~20 MB

View file

@ -1,3 +1,5 @@
use std::{collections::BTreeMap, convert::TryInto};
use super::{State, DEVICE_ID_LENGTH, SESSION_ID_LENGTH, TOKEN_LENGTH};
use crate::{pdu::PduBuilder, utils, ConduitResult, Database, Error, Ruma};
use ruma::{
@ -11,8 +13,11 @@ use ruma::{
uiaa::{AuthFlow, UiaaInfo},
},
},
events::{room::member, EventType},
UserId,
events::{
room::canonical_alias, room::guest_access, room::history_visibility, room::join_rules,
room::member, room::name, room::topic, EventType,
},
RoomAliasId, RoomId, RoomVersionId, UserId,
};
use register::RegistrationKind;
@ -33,7 +38,7 @@ const GUEST_NAME_LENGTH: usize = 10;
)]
pub fn get_register_available_route(
db: State<'_, Database>,
body: Ruma<get_username_availability::Request>,
body: Ruma<get_username_availability::Request<'_>>,
) -> ConduitResult<get_username_availability::Response> {
// Validate user id
let user_id = UserId::parse_with_server_name(body.username.clone(), db.globals.server_name())
@ -73,9 +78,9 @@ pub fn get_register_available_route(
feature = "conduit_bin",
post("/_matrix/client/r0/register", data = "<body>")
)]
pub fn register_route(
pub async fn register_route(
db: State<'_, Database>,
body: Ruma<register::Request>,
body: Ruma<register::Request<'_>>,
) -> ConduitResult<register::Response> {
if db.globals.registration_disabled() {
return Err(Error::BadRequest(
@ -84,7 +89,7 @@ pub fn register_route(
));
}
let is_guest = matches!(body.kind, Some(RegistrationKind::Guest));
let is_guest = body.kind == RegistrationKind::Guest;
let mut missing_username = false;
@ -202,6 +207,265 @@ pub fn register_route(
body.initial_device_display_name.clone(),
)?;
// If this is the first user on this server, create the admins room
if db.users.count() == 1 {
// Create a user for the server
let conduit_user = UserId::parse_with_server_name("conduit", db.globals.server_name())
.expect("@conduit:server_name is valid");
db.users.create(&conduit_user, "")?;
let room_id = RoomId::new(db.globals.server_name());
let mut content = ruma::events::room::create::CreateEventContent::new(conduit_user.clone());
content.federate = true;
content.predecessor = None;
content.room_version = RoomVersionId::Version6;
// 1. The room create event
db.rooms.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomCreate,
content: serde_json::to_value(content).expect("event is valid, we just created it"),
unsigned: None,
state_key: Some("".to_owned()),
redacts: None,
},
&conduit_user,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
// 2. Make conduit bot join
db.rooms.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomMember,
content: serde_json::to_value(member::MemberEventContent {
membership: member::MembershipState::Join,
displayname: None,
avatar_url: None,
is_direct: None,
third_party_invite: None,
})
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some(conduit_user.to_string()),
redacts: None,
},
&conduit_user,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
// 3. Power levels
let mut users = BTreeMap::new();
users.insert(conduit_user.clone(), 100.into());
users.insert(user_id.clone(), 100.into());
db.rooms.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomPowerLevels,
content: serde_json::to_value(
ruma::events::room::power_levels::PowerLevelsEventContent {
ban: 50.into(),
events: BTreeMap::new(),
events_default: 0.into(),
invite: 50.into(),
kick: 50.into(),
redact: 50.into(),
state_default: 50.into(),
users,
users_default: 0.into(),
notifications: ruma::events::room::power_levels::NotificationPowerLevels {
room: 50.into(),
},
},
)
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some("".to_owned()),
redacts: None,
},
&conduit_user,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
// 4.1 Join Rules
db.rooms.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomJoinRules,
content: serde_json::to_value(join_rules::JoinRulesEventContent::new(
join_rules::JoinRule::Invite,
))
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some("".to_owned()),
redacts: None,
},
&conduit_user,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
// 4.2 History Visibility
db.rooms.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomHistoryVisibility,
content: serde_json::to_value(
history_visibility::HistoryVisibilityEventContent::new(
history_visibility::HistoryVisibility::Shared,
),
)
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some("".to_owned()),
redacts: None,
},
&conduit_user,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
// 4.3 Guest Access
db.rooms.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomGuestAccess,
content: serde_json::to_value(guest_access::GuestAccessEventContent::new(
guest_access::GuestAccess::Forbidden,
))
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some("".to_owned()),
redacts: None,
},
&conduit_user,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
// 6. Events implied by name and topic
db.rooms.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomName,
content: serde_json::to_value(
name::NameEventContent::new("Admin Room".to_owned()).map_err(|_| {
Error::BadRequest(ErrorKind::InvalidParam, "Name is invalid.")
})?,
)
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some("".to_owned()),
redacts: None,
},
&conduit_user,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
db.rooms.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomTopic,
content: serde_json::to_value(topic::TopicEventContent {
topic: format!("Manage {}", db.globals.server_name()),
})
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some("".to_owned()),
redacts: None,
},
&conduit_user,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
// Room alias
let alias: RoomAliasId = format!("#admins:{}", db.globals.server_name())
.try_into()
.expect("#admins:server_name is a valid alias name");
db.rooms.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomCanonicalAlias,
content: serde_json::to_value(canonical_alias::CanonicalAliasEventContent {
alias: Some(alias.clone()),
alt_aliases: Vec::new(),
})
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some("".to_owned()),
redacts: None,
},
&conduit_user,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
db.rooms.set_alias(&alias, Some(&room_id), &db.globals)?;
// Invite and join the real user
db.rooms.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomMember,
content: serde_json::to_value(member::MemberEventContent {
membership: member::MembershipState::Invite,
displayname: None,
avatar_url: None,
is_direct: None,
third_party_invite: None,
})
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some(user_id.to_string()),
redacts: None,
},
&conduit_user,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
db.rooms.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomMember,
content: serde_json::to_value(member::MemberEventContent {
membership: member::MembershipState::Join,
displayname: None,
avatar_url: None,
is_direct: None,
third_party_invite: None,
})
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some(user_id.to_string()),
redacts: None,
},
&user_id,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
}
Ok(register::Response {
access_token: Some(token),
user_id,
@ -223,7 +487,7 @@ pub fn register_route(
)]
pub fn change_password_route(
db: State<'_, Database>,
body: Ruma<change_password::Request>,
body: Ruma<change_password::Request<'_>>,
) -> ConduitResult<change_password::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
let device_id = body.device_id.as_ref().expect("user is authenticated");
@ -303,9 +567,9 @@ pub fn whoami_route(body: Ruma<whoami::Request>) -> ConduitResult<whoami::Respon
feature = "conduit_bin",
post("/_matrix/client/r0/account/deactivate", data = "<body>")
)]
pub fn deactivate_route(
pub async fn deactivate_route(
db: State<'_, Database>,
body: Ruma<deactivate::Request>,
body: Ruma<deactivate::Request<'_>>,
) -> ConduitResult<deactivate::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
let device_id = body.device_id.as_ref().expect("user is authenticated");
@ -354,17 +618,18 @@ pub fn deactivate_route(
third_party_invite: None,
};
db.rooms.append_pdu(
db.rooms.build_and_append_pdu(
PduBuilder {
room_id: room_id.clone(),
sender: sender_id.clone(),
event_type: EventType::RoomMember,
content: serde_json::to_value(event).expect("event is valid, we just created it"),
unsigned: None,
state_key: Some(sender_id.to_string()),
redacts: None,
},
&sender_id,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
}

View file

@ -1,11 +1,14 @@
use super::State;
use crate::{server_server, ConduitResult, Database, Error, Ruma};
use ruma::api::{
client::{
error::ErrorKind,
r0::alias::{create_alias, delete_alias, get_alias},
use ruma::{
api::{
client::{
error::ErrorKind,
r0::alias::{create_alias, delete_alias, get_alias},
},
federation,
},
federation,
RoomAliasId,
};
#[cfg(feature = "conduit_bin")]
@ -17,7 +20,7 @@ use rocket::{delete, get, put};
)]
pub fn create_alias_route(
db: State<'_, Database>,
body: Ruma<create_alias::IncomingRequest>,
body: Ruma<create_alias::Request<'_>>,
) -> ConduitResult<create_alias::Response> {
if db.rooms.id_from_alias(&body.room_alias)?.is_some() {
return Err(Error::Conflict("Alias already exists."));
@ -26,7 +29,7 @@ pub fn create_alias_route(
db.rooms
.set_alias(&body.room_alias, Some(&body.room_id), &db.globals)?;
Ok(create_alias::Response.into())
Ok(create_alias::Response::new().into())
}
#[cfg_attr(
@ -35,11 +38,11 @@ pub fn create_alias_route(
)]
pub fn delete_alias_route(
db: State<'_, Database>,
body: Ruma<delete_alias::IncomingRequest>,
body: Ruma<delete_alias::Request<'_>>,
) -> ConduitResult<delete_alias::Response> {
db.rooms.set_alias(&body.room_alias, None, &db.globals)?;
Ok(delete_alias::Response.into())
Ok(delete_alias::Response::new().into())
}
#[cfg_attr(
@ -48,36 +51,33 @@ pub fn delete_alias_route(
)]
pub async fn get_alias_route(
db: State<'_, Database>,
body: Ruma<get_alias::IncomingRequest>,
body: Ruma<get_alias::Request<'_>>,
) -> ConduitResult<get_alias::Response> {
if body.room_alias.server_name() != db.globals.server_name() {
get_alias_helper(&db, &body.room_alias).await
}
pub async fn get_alias_helper(
db: &Database,
room_alias: &RoomAliasId,
) -> ConduitResult<get_alias::Response> {
if room_alias.server_name() != db.globals.server_name() {
let response = server_server::send_request(
&db,
body.room_alias.server_name().to_string(),
federation::query::get_room_information::v1::Request {
room_alias: body.room_alias.to_string(),
},
&db.globals,
room_alias.server_name().to_owned(),
federation::query::get_room_information::v1::Request { room_alias },
)
.await?;
return Ok(get_alias::Response {
room_id: response.room_id,
servers: response.servers,
}
.into());
return Ok(get_alias::Response::new(response.room_id, response.servers).into());
}
let room_id = db
.rooms
.id_from_alias(&body.room_alias)?
.id_from_alias(&room_alias)?
.ok_or(Error::BadRequest(
ErrorKind::NotFound,
"Room with alias not found.",
))?;
Ok(get_alias::Response {
room_id,
servers: vec![db.globals.server_name().to_string()],
}
.into())
Ok(get_alias::Response::new(room_id, vec![db.globals.server_name().to_owned()]).into())
}

View file

@ -35,7 +35,7 @@ pub fn create_backup_route(
)]
pub fn update_backup_route(
db: State<'_, Database>,
body: Ruma<update_backup::Request>,
body: Ruma<update_backup::Request<'_>>,
) -> ConduitResult<update_backup::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
db.key_backups
@ -77,7 +77,7 @@ pub fn get_latest_backup_route(
)]
pub fn get_backup_route(
db: State<'_, Database>,
body: Ruma<get_backup::Request>,
body: Ruma<get_backup::Request<'_>>,
) -> ConduitResult<get_backup::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
let algorithm = db
@ -92,7 +92,7 @@ pub fn get_backup_route(
algorithm,
count: (db.key_backups.count_keys(sender_id, &body.version)? as u32).into(),
etag: db.key_backups.get_etag(sender_id, &body.version)?,
version: body.version.clone(),
version: body.version.to_owned(),
}
.into())
}
@ -119,7 +119,7 @@ pub fn delete_backup_route(
)]
pub fn add_backup_keys_route(
db: State<'_, Database>,
body: Ruma<add_backup_keys::Request>,
body: Ruma<add_backup_keys::Request<'_>>,
) -> ConduitResult<add_backup_keys::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
@ -205,7 +205,7 @@ pub fn add_backup_key_session_route(
)]
pub fn get_backup_keys_route(
db: State<'_, Database>,
body: Ruma<get_backup_keys::Request>,
body: Ruma<get_backup_keys::Request<'_>>,
) -> ConduitResult<get_backup_keys::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");

View file

@ -5,10 +5,9 @@ use ruma::{
error::ErrorKind,
r0::config::{get_global_account_data, set_global_account_data},
},
events::{custom::CustomEventContent, BasicEvent, EventType},
events::{custom::CustomEventContent, BasicEvent},
Raw,
};
use std::convert::TryFrom;
#[cfg(feature = "conduit_bin")]
use rocket::{get, put};
@ -19,7 +18,7 @@ use rocket::{get, put};
)]
pub fn set_global_account_data_route(
db: State<'_, Database>,
body: Ruma<set_global_account_data::Request>,
body: Ruma<set_global_account_data::Request<'_>>,
) -> ConduitResult<set_global_account_data::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
@ -50,17 +49,13 @@ pub fn set_global_account_data_route(
)]
pub fn get_global_account_data_route(
db: State<'_, Database>,
body: Ruma<get_global_account_data::Request>,
body: Ruma<get_global_account_data::Request<'_>>,
) -> ConduitResult<get_global_account_data::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
let data = db
.account_data
.get::<Raw<ruma::events::AnyBasicEvent>>(
None,
sender_id,
EventType::try_from(&body.event_type).expect("EventType::try_from can never fail"),
)?
.get::<Raw<ruma::events::AnyBasicEvent>>(None, sender_id, body.event_type.clone().into())?
.ok_or(Error::BadRequest(ErrorKind::NotFound, "Data not found."))?;
Ok(get_global_account_data::Response { account_data: data }.into())

View file

@ -12,7 +12,7 @@ use rocket::get;
)]
pub fn get_context_route(
db: State<'_, Database>,
body: Ruma<get_context::Request>,
body: Ruma<get_context::Request<'_>>,
) -> ConduitResult<get_context::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
@ -49,7 +49,10 @@ pub fn get_context_route(
.filter_map(|r| r.ok()) // Remove buggy events
.collect::<Vec<_>>();
let start_token = events_before.last().map(|(count, _)| count.to_string());
let start_token = events_before
.last()
.and_then(|(pdu_id, _)| db.rooms.pdu_count(pdu_id).ok())
.map(|count| count.to_string());
let events_before = events_before
.into_iter()
@ -68,25 +71,28 @@ pub fn get_context_route(
.filter_map(|r| r.ok()) // Remove buggy events
.collect::<Vec<_>>();
let end_token = events_after.last().map(|(count, _)| count.to_string());
let end_token = events_after
.last()
.and_then(|(pdu_id, _)| db.rooms.pdu_count(pdu_id).ok())
.map(|count| count.to_string());
let events_after = events_after
.into_iter()
.map(|(_, pdu)| pdu.to_room_event())
.collect::<Vec<_>>();
Ok(get_context::Response {
start: start_token,
end: end_token,
events_before,
event: Some(base_event),
events_after,
state: db // TODO: State at event
.rooms
.room_state_full(&body.room_id)?
.values()
.map(|pdu| pdu.to_state_event())
.collect(),
}
.into())
let mut resp = get_context::Response::new();
resp.start = start_token;
resp.end = end_token;
resp.events_before = events_before;
resp.event = Some(base_event);
resp.events_after = events_after;
resp.state = db // TODO: State at event
.rooms
.room_state_full(&body.room_id)?
.values()
.map(|pdu| pdu.to_state_event())
.collect();
Ok(resp.into())
}

View file

@ -37,7 +37,7 @@ pub fn get_devices_route(
)]
pub fn get_device_route(
db: State<'_, Database>,
body: Ruma<get_device::Request>,
body: Ruma<get_device::Request<'_>>,
_device_id: String,
) -> ConduitResult<get_device::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
@ -56,7 +56,7 @@ pub fn get_device_route(
)]
pub fn update_device_route(
db: State<'_, Database>,
body: Ruma<update_device::Request>,
body: Ruma<update_device::Request<'_>>,
_device_id: String,
) -> ConduitResult<update_device::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
@ -80,7 +80,7 @@ pub fn update_device_route(
)]
pub fn delete_device_route(
db: State<'_, Database>,
body: Ruma<delete_device::Request>,
body: Ruma<delete_device::Request<'_>>,
_device_id: String,
) -> ConduitResult<delete_device::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
@ -127,7 +127,7 @@ pub fn delete_device_route(
)]
pub fn delete_devices_route(
db: State<'_, Database>,
body: Ruma<delete_devices::Request>,
body: Ruma<delete_devices::Request<'_>>,
) -> ConduitResult<delete_devices::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
let device_id = body.device_id.as_ref().expect("user is authenticated");

View file

@ -6,7 +6,7 @@ use ruma::{
error::ErrorKind,
r0::{
directory::{
self, get_public_rooms, get_public_rooms_filtered, get_room_visibility,
get_public_rooms, get_public_rooms_filtered, get_room_visibility,
set_room_visibility,
},
room,
@ -14,11 +14,14 @@ use ruma::{
},
federation,
},
directory::Filter,
directory::RoomNetwork,
directory::{IncomingFilter, IncomingRoomNetwork, PublicRoomsChunk},
events::{
room::{avatar, canonical_alias, guest_access, history_visibility, name, topic},
EventType,
},
Raw,
Raw, ServerName,
};
#[cfg(feature = "conduit_bin")]
@ -30,20 +33,103 @@ use rocket::{get, post, put};
)]
pub async fn get_public_rooms_filtered_route(
db: State<'_, Database>,
body: Ruma<get_public_rooms_filtered::IncomingRequest>,
body: Ruma<get_public_rooms_filtered::Request<'_>>,
) -> ConduitResult<get_public_rooms_filtered::Response> {
if let Some(other_server) = body
.server
get_public_rooms_filtered_helper(
&db,
body.server.as_deref(),
body.limit,
body.since.as_deref(),
&body.filter,
&body.room_network,
)
.await
}
#[cfg_attr(
feature = "conduit_bin",
get("/_matrix/client/r0/publicRooms", data = "<body>")
)]
pub async fn get_public_rooms_route(
db: State<'_, Database>,
body: Ruma<get_public_rooms::Request<'_>>,
) -> ConduitResult<get_public_rooms::Response> {
let response = get_public_rooms_filtered_helper(
&db,
body.server.as_deref(),
body.limit,
body.since.as_deref(),
&IncomingFilter::default(),
&IncomingRoomNetwork::Matrix,
)
.await?
.0;
Ok(get_public_rooms::Response {
chunk: response.chunk,
prev_batch: response.prev_batch,
next_batch: response.next_batch,
total_room_count_estimate: response.total_room_count_estimate,
}
.into())
}
#[cfg_attr(
feature = "conduit_bin",
put("/_matrix/client/r0/directory/list/room/<_>", data = "<body>")
)]
pub async fn set_room_visibility_route(
db: State<'_, Database>,
body: Ruma<set_room_visibility::Request<'_>>,
) -> ConduitResult<set_room_visibility::Response> {
match body.visibility {
room::Visibility::Public => db.rooms.set_public(&body.room_id, true)?,
room::Visibility::Private => db.rooms.set_public(&body.room_id, false)?,
}
Ok(set_room_visibility::Response.into())
}
#[cfg_attr(
feature = "conduit_bin",
get("/_matrix/client/r0/directory/list/room/<_>", data = "<body>")
)]
pub async fn get_room_visibility_route(
db: State<'_, Database>,
body: Ruma<get_room_visibility::Request<'_>>,
) -> ConduitResult<get_room_visibility::Response> {
Ok(get_room_visibility::Response {
visibility: if db.rooms.is_public_room(&body.room_id)? {
room::Visibility::Public
} else {
room::Visibility::Private
},
}
.into())
}
pub async fn get_public_rooms_filtered_helper(
db: &Database,
server: Option<&ServerName>,
limit: Option<js_int::UInt>,
since: Option<&str>,
filter: &IncomingFilter,
_network: &IncomingRoomNetwork,
) -> ConduitResult<get_public_rooms_filtered::Response> {
if let Some(other_server) = server
.clone()
.filter(|server| server != &db.globals.server_name().as_str())
.filter(|server| *server != db.globals.server_name().as_str())
{
let response = server_server::send_request(
&db,
other_server,
federation::directory::get_public_rooms::v1::Request {
limit: body.limit,
since: body.since.clone(),
room_network: federation::directory::get_public_rooms::v1::RoomNetwork::Matrix,
&db.globals,
other_server.to_owned(),
federation::directory::get_public_rooms_filtered::v1::Request {
limit,
since: since.as_deref(),
filter: Filter {
generic_search_term: filter.generic_search_term.as_deref(),
},
room_network: RoomNetwork::Matrix,
},
)
.await?;
@ -72,10 +158,10 @@ pub async fn get_public_rooms_filtered_route(
.into());
}
let limit = body.limit.map_or(10, u64::from);
let mut since = 0_u64;
let limit = limit.map_or(10, u64::from);
let mut num_since = 0_u64;
if let Some(s) = &body.since {
if let Some(s) = &since {
let mut characters = s.chars();
let backwards = match characters.next() {
Some('n') => false,
@ -88,13 +174,13 @@ pub async fn get_public_rooms_filtered_route(
}
};
since = characters
num_since = characters
.collect::<String>()
.parse()
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid `since` token."))?;
if backwards {
since = since.saturating_sub(limit);
num_since = num_since.saturating_sub(limit);
}
}
@ -107,7 +193,7 @@ pub async fn get_public_rooms_filtered_route(
// TODO: Do not load full state?
let state = db.rooms.room_state_full(&room_id)?;
let chunk = directory::PublicRoomsChunk {
let chunk = PublicRoomsChunk {
aliases: Vec::new(),
canonical_alias: state
.get(&(EventType::RoomCanonicalAlias, "".to_owned()))
@ -216,20 +302,20 @@ pub async fn get_public_rooms_filtered_route(
let chunk = all_rooms
.into_iter()
.skip(since as usize)
.skip(num_since as usize)
.take(limit as usize)
.collect::<Vec<_>>();
let prev_batch = if since == 0 {
let prev_batch = if num_since == 0 {
None
} else {
Some(format!("p{}", since))
Some(format!("p{}", num_since))
};
let next_batch = if chunk.len() < limit as usize {
None
} else {
Some(format!("n{}", since + limit))
Some(format!("n{}", num_since + limit))
};
Ok(get_public_rooms_filtered::Response {
@ -240,89 +326,3 @@ pub async fn get_public_rooms_filtered_route(
}
.into())
}
#[cfg_attr(
feature = "conduit_bin",
get("/_matrix/client/r0/publicRooms", data = "<body>")
)]
pub async fn get_public_rooms_route(
db: State<'_, Database>,
body: Ruma<get_public_rooms::IncomingRequest>,
) -> ConduitResult<get_public_rooms::Response> {
let Ruma {
body:
get_public_rooms::IncomingRequest {
limit,
server,
since,
},
sender_id,
device_id,
json_body,
} = body;
let get_public_rooms_filtered::Response {
chunk,
prev_batch,
next_batch,
total_room_count_estimate,
} = get_public_rooms_filtered_route(
db,
Ruma {
body: get_public_rooms_filtered::IncomingRequest {
filter: None,
limit,
room_network: get_public_rooms_filtered::RoomNetwork::Matrix,
server,
since,
},
sender_id,
device_id,
json_body,
},
)
.await?
.0;
Ok(get_public_rooms::Response {
chunk,
prev_batch,
next_batch,
total_room_count_estimate,
}
.into())
}
#[cfg_attr(
feature = "conduit_bin",
put("/_matrix/client/r0/directory/list/room/<_>", data = "<body>")
)]
pub async fn set_room_visibility_route(
db: State<'_, Database>,
body: Ruma<set_room_visibility::Request>,
) -> ConduitResult<set_room_visibility::Response> {
match body.visibility {
room::Visibility::Public => db.rooms.set_public(&body.room_id, true)?,
room::Visibility::Private => db.rooms.set_public(&body.room_id, false)?,
}
Ok(set_room_visibility::Response.into())
}
#[cfg_attr(
feature = "conduit_bin",
get("/_matrix/client/r0/directory/list/room/<_>", data = "<body>")
)]
pub async fn get_room_visibility_route(
db: State<'_, Database>,
body: Ruma<get_room_visibility::Request>,
) -> ConduitResult<get_room_visibility::Response> {
Ok(get_room_visibility::Response {
visibility: if db.rooms.is_public_room(&body.room_id)? {
room::Visibility::Public
} else {
room::Visibility::Private
},
}
.into())
}

View file

@ -7,23 +7,18 @@ use rocket::{get, post};
#[cfg_attr(feature = "conduit_bin", get("/_matrix/client/r0/user/<_>/filter/<_>"))]
pub fn get_filter_route() -> ConduitResult<get_filter::Response> {
// TODO
Ok(get_filter::Response {
filter: filter::FilterDefinition {
event_fields: None,
event_format: None,
account_data: None,
room: None,
presence: None,
},
}
Ok(get_filter::Response::new(filter::IncomingFilterDefinition {
event_fields: None,
event_format: None,
account_data: None,
room: None,
presence: None,
})
.into())
}
#[cfg_attr(feature = "conduit_bin", post("/_matrix/client/r0/user/<_>/filter"))]
pub fn create_filter_route() -> ConduitResult<create_filter::Response> {
// TODO
Ok(create_filter::Response {
filter_id: utils::random_string(10),
}
.into())
Ok(create_filter::Response::new(utils::random_string(10)).into())
}

View file

@ -11,7 +11,7 @@ use ruma::{
uiaa::{AuthFlow, UiaaInfo},
},
},
encryption::UnsignedDeviceInfo,
encryption::IncomingUnsignedDeviceInfo,
};
use std::collections::{BTreeMap, HashSet};
@ -24,7 +24,7 @@ use rocket::{get, post};
)]
pub fn upload_keys_route(
db: State<'_, Database>,
body: Ruma<upload_keys::Request>,
body: Ruma<upload_keys::Request<'_>>,
) -> ConduitResult<upload_keys::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
let device_id = body.device_id.as_ref().expect("user is authenticated");
@ -56,7 +56,7 @@ pub fn upload_keys_route(
)]
pub fn get_keys_route(
db: State<'_, Database>,
body: Ruma<get_keys::IncomingRequest>,
body: Ruma<get_keys::Request<'_>>,
) -> ConduitResult<get_keys::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
@ -78,9 +78,9 @@ pub fn get_keys_route(
Error::bad_database("all_device_keys contained nonexistent device.")
})?;
keys.unsigned = Some(UnsignedDeviceInfo {
keys.unsigned = IncomingUnsignedDeviceInfo {
device_display_name: metadata.display_name,
});
};
container.insert(device_id, keys);
}
@ -97,9 +97,9 @@ pub fn get_keys_route(
),
)?;
keys.unsigned = Some(UnsignedDeviceInfo {
keys.unsigned = IncomingUnsignedDeviceInfo {
device_display_name: metadata.display_name,
});
};
container.insert(device_id.clone(), keys);
}
@ -167,7 +167,7 @@ pub fn claim_keys_route(
)]
pub fn upload_signing_keys_route(
db: State<'_, Database>,
body: Ruma<upload_signing_keys::Request>,
body: Ruma<upload_signing_keys::Request<'_>>,
) -> ConduitResult<upload_signing_keys::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
let device_id = body.device_id.as_ref().expect("user is authenticated");
@ -280,7 +280,7 @@ pub fn upload_signatures_route(
)]
pub fn get_key_changes_route(
db: State<'_, Database>,
body: Ruma<get_key_changes::IncomingRequest>,
body: Ruma<get_key_changes::Request<'_>>,
) -> ConduitResult<get_key_changes::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");

View file

@ -1,5 +1,7 @@
use super::State;
use crate::{database::media::FileMeta, utils, ConduitResult, Database, Error, Ruma};
use crate::{
database::media::FileMeta, server_server, utils, ConduitResult, Database, Error, Ruma,
};
use ruma::api::client::{
error::ErrorKind,
r0::media::{create_content, get_content, get_content_thumbnail, get_media_config},
@ -27,7 +29,7 @@ pub fn get_media_config_route(
)]
pub fn create_content_route(
db: State<'_, Database>,
body: Ruma<create_content::Request>,
body: Ruma<create_content::Request<'_>>,
) -> ConduitResult<create_content::Response> {
let mxc = format!(
"mxc://{}/{}",
@ -36,7 +38,7 @@ pub fn create_content_route(
);
db.media.create(
mxc.clone(),
body.filename.as_ref(),
&body.filename.as_deref(),
&body.content_type,
&body.file,
)?;
@ -46,24 +48,19 @@ pub fn create_content_route(
#[cfg_attr(
feature = "conduit_bin",
get(
"/_matrix/media/r0/download/<_server_name>/<_media_id>",
data = "<body>"
)
get("/_matrix/media/r0/download/<_>/<_>", data = "<body>")
)]
pub fn get_content_route(
pub async fn get_content_route(
db: State<'_, Database>,
body: Ruma<get_content::Request>,
_server_name: String,
_media_id: String,
body: Ruma<get_content::Request<'_>>,
) -> ConduitResult<get_content::Response> {
let mxc = format!("mxc://{}/{}", body.server_name, body.media_id);
if let Some(FileMeta {
filename,
content_type,
file,
}) = db
.media
.get(format!("mxc://{}/{}", body.server_name, body.media_id))?
}) = db.media.get(&mxc)?
{
Ok(get_content::Response {
file,
@ -71,6 +68,26 @@ pub fn get_content_route(
content_disposition: filename.unwrap_or_default(), // TODO: Spec says this should be optional
}
.into())
} else if &*body.server_name != db.globals.server_name() && body.allow_remote {
let get_content_response = server_server::send_request(
&db.globals,
body.server_name.clone(),
get_content::Request {
allow_remote: false,
server_name: &body.server_name,
media_id: &body.media_id,
},
)
.await?;
db.media.create(
mxc,
&Some(&get_content_response.content_disposition),
&get_content_response.content_type,
&get_content_response.file,
)?;
Ok(get_content_response.into())
} else {
Err(Error::BadRequest(ErrorKind::NotFound, "Media not found."))
}
@ -78,21 +95,18 @@ pub fn get_content_route(
#[cfg_attr(
feature = "conduit_bin",
get(
"/_matrix/media/r0/thumbnail/<_server_name>/<_media_id>",
data = "<body>"
)
get("/_matrix/media/r0/thumbnail/<_>/<_>", data = "<body>")
)]
pub fn get_content_thumbnail_route(
pub async fn get_content_thumbnail_route(
db: State<'_, Database>,
body: Ruma<get_content_thumbnail::Request>,
_server_name: String,
_media_id: String,
body: Ruma<get_content_thumbnail::Request<'_>>,
) -> ConduitResult<get_content_thumbnail::Response> {
let mxc = format!("mxc://{}/{}", body.server_name, body.media_id);
if let Some(FileMeta {
content_type, file, ..
}) = db.media.get_thumbnail(
format!("mxc://{}/{}", body.server_name, body.media_id),
mxc.clone(),
body.width
.try_into()
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Width is invalid."))?,
@ -101,6 +115,31 @@ pub fn get_content_thumbnail_route(
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Width is invalid."))?,
)? {
Ok(get_content_thumbnail::Response { file, content_type }.into())
} else if &*body.server_name != db.globals.server_name() && body.allow_remote {
let get_thumbnail_response = server_server::send_request(
&db.globals,
body.server_name.clone(),
get_content_thumbnail::Request {
allow_remote: false,
height: body.height,
width: body.width,
method: body.method,
server_name: &body.server_name,
media_id: &body.media_id,
},
)
.await?;
db.media.upload_thumbnail(
mxc,
&None,
&get_thumbnail_response.content_type,
body.width.try_into().expect("all UInts are valid u32s"),
body.height.try_into().expect("all UInts are valid u32s"),
&get_thumbnail_response.file,
)?;
Ok(get_thumbnail_response.into())
} else {
Err(Error::BadRequest(ErrorKind::NotFound, "Media not found."))
}

File diff suppressed because it is too large Load diff

View file

@ -5,6 +5,7 @@ use ruma::{
error::ErrorKind,
r0::message::{get_message_events, send_message_event},
},
events::EventContent,
EventId,
};
use std::convert::{TryFrom, TryInto};
@ -16,9 +17,9 @@ use rocket::{get, put};
feature = "conduit_bin",
put("/_matrix/client/r0/rooms/<_>/send/<_>/<_>", data = "<body>")
)]
pub fn send_message_event_route(
pub async fn send_message_event_route(
db: State<'_, Database>,
body: Ruma<send_message_event::IncomingRequest>,
body: Ruma<send_message_event::Request<'_>>,
) -> ConduitResult<send_message_event::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
let device_id = body.device_id.as_ref().expect("user is authenticated");
@ -48,11 +49,9 @@ pub fn send_message_event_route(
let mut unsigned = serde_json::Map::new();
unsigned.insert("transaction_id".to_owned(), body.txn_id.clone().into());
let event_id = db.rooms.append_pdu(
let event_id = db.rooms.build_and_append_pdu(
PduBuilder {
room_id: body.room_id.clone(),
sender: sender_id.clone(),
event_type: body.event_type.clone(),
event_type: body.content.event_type().into(),
content: serde_json::from_str(
body.json_body
.as_ref()
@ -64,13 +63,17 @@ pub fn send_message_event_route(
state_key: None,
redacts: None,
},
&sender_id,
&body.room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
db.transaction_ids
.add_txnid(sender_id, device_id, &body.txn_id, event_id.as_bytes())?;
Ok(send_message_event::Response { event_id }.into())
Ok(send_message_event::Response::new(event_id).into())
}
#[cfg_attr(
@ -79,7 +82,7 @@ pub fn send_message_event_route(
)]
pub fn get_message_events_route(
db: State<'_, Database>,
body: Ruma<get_message_events::Request>,
body: Ruma<get_message_events::Request<'_>>,
) -> ConduitResult<get_message_events::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
@ -111,6 +114,12 @@ pub fn get_message_events_route(
.pdus_after(&sender_id, &body.room_id, from)
.take(limit)
.filter_map(|r| r.ok()) // Filter out buggy events
.filter_map(|(pdu_id, pdu)| {
db.rooms
.pdu_count(&pdu_id)
.map(|pdu_count| (pdu_count, pdu))
.ok()
})
.take_while(|&(k, _)| Some(Ok(k)) != to) // Stop at `to`
.collect::<Vec<_>>();
@ -121,13 +130,13 @@ pub fn get_message_events_route(
.map(|(_, pdu)| pdu.to_room_event())
.collect::<Vec<_>>();
Ok(get_message_events::Response {
start: Some(body.from.clone()),
end: end_token,
chunk: events_after,
state: Vec::new(),
}
.into())
let mut resp = get_message_events::Response::new();
resp.start = Some(body.from.to_owned());
resp.end = end_token;
resp.chunk = events_after;
resp.state = Vec::new();
Ok(resp.into())
}
get_message_events::Direction::Backward => {
let events_before = db
@ -135,6 +144,12 @@ pub fn get_message_events_route(
.pdus_until(&sender_id, &body.room_id, from)
.take(limit)
.filter_map(|r| r.ok()) // Filter out buggy events
.filter_map(|(pdu_id, pdu)| {
db.rooms
.pdu_count(&pdu_id)
.map(|pdu_count| (pdu_count, pdu))
.ok()
})
.take_while(|&(k, _)| Some(Ok(k)) != to) // Stop at `to`
.collect::<Vec<_>>();
@ -145,13 +160,13 @@ pub fn get_message_events_route(
.map(|(_, pdu)| pdu.to_room_event())
.collect::<Vec<_>>();
Ok(get_message_events::Response {
start: Some(body.from.clone()),
end: start_token,
chunk: events_before,
state: Vec::new(),
}
.into())
let mut resp = get_message_events::Response::new();
resp.start = Some(body.from.to_owned());
resp.end = start_token;
resp.chunk = events_before;
resp.state = Vec::new();
Ok(resp.into())
}
}
}

View file

@ -12,7 +12,7 @@ use rocket::put;
)]
pub fn set_presence_route(
db: State<'_, Database>,
body: Ruma<set_presence::Request>,
body: Ruma<set_presence::Request<'_>>,
) -> ConduitResult<set_presence::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");

View file

@ -19,9 +19,9 @@ use std::convert::TryInto;
feature = "conduit_bin",
put("/_matrix/client/r0/profile/<_>/displayname", data = "<body>")
)]
pub fn set_displayname_route(
pub async fn set_displayname_route(
db: State<'_, Database>,
body: Ruma<set_display_name::Request>,
body: Ruma<set_display_name::Request<'_>>,
) -> ConduitResult<set_display_name::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
@ -31,10 +31,8 @@ pub fn set_displayname_route(
// Send a new membership event and presence update into all joined rooms
for room_id in db.rooms.rooms_joined(&sender_id) {
let room_id = room_id?;
db.rooms.append_pdu(
db.rooms.build_and_append_pdu(
PduBuilder {
room_id: room_id.clone(),
sender: sender_id.clone(),
event_type: EventType::RoomMember,
content: serde_json::to_value(ruma::events::room::member::MemberEventContent {
displayname: body.displayname.clone(),
@ -62,7 +60,10 @@ pub fn set_displayname_route(
state_key: Some(sender_id.to_string()),
redacts: None,
},
&sender_id,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
@ -98,7 +99,7 @@ pub fn set_displayname_route(
)]
pub fn get_displayname_route(
db: State<'_, Database>,
body: Ruma<get_display_name::Request>,
body: Ruma<get_display_name::Request<'_>>,
) -> ConduitResult<get_display_name::Response> {
Ok(get_display_name::Response {
displayname: db.users.displayname(&body.user_id)?,
@ -110,34 +111,20 @@ pub fn get_displayname_route(
feature = "conduit_bin",
put("/_matrix/client/r0/profile/<_>/avatar_url", data = "<body>")
)]
pub fn set_avatar_url_route(
pub async fn set_avatar_url_route(
db: State<'_, Database>,
body: Ruma<set_avatar_url::Request>,
body: Ruma<set_avatar_url::Request<'_>>,
) -> ConduitResult<set_avatar_url::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
if let Some(avatar_url) = &body.avatar_url {
if !avatar_url.starts_with("mxc://") {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"avatar_url has to start with mxc://.",
));
}
// TODO in the future when we can handle media uploads make sure that this url is our own server
// TODO also make sure this is valid mxc:// format (not only starting with it)
}
db.users
.set_avatar_url(&sender_id, body.avatar_url.clone())?;
// Send a new membership event and presence update into all joined rooms
for room_id in db.rooms.rooms_joined(&sender_id) {
let room_id = room_id?;
db.rooms.append_pdu(
db.rooms.build_and_append_pdu(
PduBuilder {
room_id: room_id.clone(),
sender: sender_id.clone(),
event_type: EventType::RoomMember,
content: serde_json::to_value(ruma::events::room::member::MemberEventContent {
avatar_url: body.avatar_url.clone(),
@ -165,7 +152,10 @@ pub fn set_avatar_url_route(
state_key: Some(sender_id.to_string()),
redacts: None,
},
&sender_id,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
@ -201,7 +191,7 @@ pub fn set_avatar_url_route(
)]
pub fn get_avatar_url_route(
db: State<'_, Database>,
body: Ruma<get_avatar_url::Request>,
body: Ruma<get_avatar_url::Request<'_>>,
) -> ConduitResult<get_avatar_url::Response> {
Ok(get_avatar_url::Response {
avatar_url: db.users.avatar_url(&body.user_id)?,
@ -215,7 +205,7 @@ pub fn get_avatar_url_route(
)]
pub fn get_profile_route(
db: State<'_, Database>,
body: Ruma<get_profile::Request>,
body: Ruma<get_profile::Request<'_>>,
) -> ConduitResult<get_profile::Response> {
if !db.users.exists(&body.user_id)? {
// Return 404 if this user doesn't exist

View file

@ -15,7 +15,7 @@ use std::{collections::BTreeMap, time::SystemTime};
)]
pub fn set_read_marker_route(
db: State<'_, Database>,
body: Ruma<set_read_marker::Request>,
body: Ruma<set_read_marker::Request<'_>>,
) -> ConduitResult<set_read_marker::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
@ -53,7 +53,7 @@ pub fn set_read_marker_route(
);
let mut receipt_content = BTreeMap::new();
receipt_content.insert(
event.clone(),
event.to_owned(),
ruma::events::receipt::Receipts {
read: Some(user_receipts),
},

View file

@ -12,16 +12,14 @@ use rocket::put;
feature = "conduit_bin",
put("/_matrix/client/r0/rooms/<_>/redact/<_>/<_>", data = "<body>")
)]
pub fn redact_event_route(
pub async fn redact_event_route(
db: State<'_, Database>,
body: Ruma<redact_event::Request>,
body: Ruma<redact_event::Request<'_>>,
) -> ConduitResult<redact_event::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
let event_id = db.rooms.append_pdu(
let event_id = db.rooms.build_and_append_pdu(
PduBuilder {
room_id: body.room_id.clone(),
sender: sender_id.clone(),
event_type: EventType::RoomRedaction,
content: serde_json::to_value(redaction::RedactionEventContent {
reason: body.reason.clone(),
@ -31,7 +29,10 @@ pub fn redact_event_route(
state_key: None,
redacts: Some(body.event_id.clone()),
},
&sender_id,
&body.room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;

View file

@ -20,9 +20,9 @@ use rocket::{get, post};
feature = "conduit_bin",
post("/_matrix/client/r0/createRoom", data = "<body>")
)]
pub fn create_room_route(
pub async fn create_room_route(
db: State<'_, Database>,
body: Ruma<create_room::Request>,
body: Ruma<create_room::Request<'_>>,
) -> ConduitResult<create_room::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
@ -48,39 +48,35 @@ pub fn create_room_route(
})?;
let mut content = ruma::events::room::create::CreateEventContent::new(sender_id.clone());
content.federate = body.creation_content.as_ref().map_or(true, |c| c.federate);
content.predecessor = body
.creation_content
.as_ref()
.and_then(|c| c.predecessor.clone());
content.federate = body.creation_content.federate;
content.predecessor = body.creation_content.predecessor.clone();
content.room_version = RoomVersionId::Version6;
// 1. The room create event
db.rooms.append_pdu(
db.rooms.build_and_append_pdu(
PduBuilder {
room_id: room_id.clone(),
sender: sender_id.clone(),
event_type: EventType::RoomCreate,
content: serde_json::to_value(content).expect("event is valid, we just created it"),
unsigned: None,
state_key: Some("".to_owned()),
redacts: None,
},
&sender_id,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
// 2. Let the room creator join
db.rooms.append_pdu(
db.rooms.build_and_append_pdu(
PduBuilder {
room_id: room_id.clone(),
sender: sender_id.clone(),
event_type: EventType::RoomMember,
content: serde_json::to_value(member::MemberEventContent {
membership: member::MembershipState::Join,
displayname: db.users.displayname(&sender_id)?,
avatar_url: db.users.avatar_url(&sender_id)?,
is_direct: body.is_direct,
is_direct: Some(body.is_direct),
third_party_invite: None,
})
.expect("event is valid, we just created it"),
@ -88,7 +84,10 @@ pub fn create_room_route(
state_key: Some(sender_id.to_string()),
redacts: None,
},
&sender_id,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
@ -120,34 +119,32 @@ pub fn create_room_route(
})
.expect("event is valid, we just created it")
};
db.rooms.append_pdu(
db.rooms.build_and_append_pdu(
PduBuilder {
room_id: room_id.clone(),
sender: sender_id.clone(),
event_type: EventType::RoomPowerLevels,
content: power_levels_content,
unsigned: None,
state_key: Some("".to_owned()),
redacts: None,
},
&sender_id,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
// 4. Events set by preset
// Figure out preset. We need it for preset specific events
let visibility = body.visibility.unwrap_or(room::Visibility::Private);
let preset = body.preset.unwrap_or_else(|| match visibility {
let preset = body.preset.unwrap_or_else(|| match body.visibility {
room::Visibility::Private => create_room::RoomPreset::PrivateChat,
room::Visibility::Public => create_room::RoomPreset::PublicChat,
});
// 4.1 Join Rules
db.rooms.append_pdu(
db.rooms.build_and_append_pdu(
PduBuilder {
room_id: room_id.clone(),
sender: sender_id.clone(),
event_type: EventType::RoomJoinRules,
content: match preset {
create_room::RoomPreset::PublicChat => serde_json::to_value(
@ -164,15 +161,16 @@ pub fn create_room_route(
state_key: Some("".to_owned()),
redacts: None,
},
&sender_id,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
// 4.2 History Visibility
db.rooms.append_pdu(
db.rooms.build_and_append_pdu(
PduBuilder {
room_id: room_id.clone(),
sender: sender_id.clone(),
event_type: EventType::RoomHistoryVisibility,
content: serde_json::to_value(history_visibility::HistoryVisibilityEventContent::new(
history_visibility::HistoryVisibility::Shared,
@ -182,15 +180,16 @@ pub fn create_room_route(
state_key: Some("".to_owned()),
redacts: None,
},
&sender_id,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
// 4.3 Guest Access
db.rooms.append_pdu(
db.rooms.build_and_append_pdu(
PduBuilder {
room_id: room_id.clone(),
sender: sender_id.clone(),
event_type: EventType::RoomGuestAccess,
content: match preset {
create_room::RoomPreset::PublicChat => {
@ -208,45 +207,39 @@ pub fn create_room_route(
state_key: Some("".to_owned()),
redacts: None,
},
&sender_id,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
// 5. Events listed in initial_state
for create_room::InitialStateEvent {
event_type,
state_key,
content,
} in &body.initial_state
{
for event in &body.initial_state {
let pdu_builder = serde_json::from_str::<PduBuilder>(
&serde_json::to_string(&event).expect("AnyInitialStateEvent::to_string always works"),
)
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid initial state event."))?;
// Silently skip encryption events if they are not allowed
if event_type == &EventType::RoomEncryption && db.globals.encryption_disabled() {
if pdu_builder.event_type == EventType::RoomEncryption && db.globals.encryption_disabled() {
continue;
}
db.rooms.append_pdu(
PduBuilder {
room_id: room_id.clone(),
sender: sender_id.clone(),
event_type: event_type.clone(),
content: serde_json::from_str(content.get()).map_err(|_| {
Error::BadRequest(ErrorKind::BadJson, "Invalid initial_state content.")
})?,
unsigned: None,
state_key: state_key.clone(),
redacts: None,
},
db.rooms.build_and_append_pdu(
pdu_builder,
&sender_id,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
}
// 6. Events implied by name and topic
if let Some(name) = &body.name {
db.rooms.append_pdu(
db.rooms.build_and_append_pdu(
PduBuilder {
room_id: room_id.clone(),
sender: sender_id.clone(),
event_type: EventType::RoomName,
content: serde_json::to_value(
name::NameEventContent::new(name.clone()).map_err(|_| {
@ -258,16 +251,17 @@ pub fn create_room_route(
state_key: Some("".to_owned()),
redacts: None,
},
&sender_id,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
}
if let Some(topic) = &body.topic {
db.rooms.append_pdu(
db.rooms.build_and_append_pdu(
PduBuilder {
room_id: room_id.clone(),
sender: sender_id.clone(),
event_type: EventType::RoomTopic,
content: serde_json::to_value(topic::TopicEventContent {
topic: topic.clone(),
@ -277,23 +271,24 @@ pub fn create_room_route(
state_key: Some("".to_owned()),
redacts: None,
},
&sender_id,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
}
// 7. Events implied by invite (and TODO: invite_3pid)
for user in &body.invite {
db.rooms.append_pdu(
db.rooms.build_and_append_pdu(
PduBuilder {
room_id: room_id.clone(),
sender: sender_id.clone(),
event_type: EventType::RoomMember,
content: serde_json::to_value(member::MemberEventContent {
membership: member::MembershipState::Invite,
displayname: db.users.displayname(&user)?,
avatar_url: db.users.avatar_url(&user)?,
is_direct: body.is_direct,
is_direct: Some(body.is_direct),
third_party_invite: None,
})
.expect("event is valid, we just created it"),
@ -301,7 +296,10 @@ pub fn create_room_route(
state_key: Some(user.to_string()),
redacts: None,
},
&sender_id,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
}
@ -311,11 +309,11 @@ pub fn create_room_route(
db.rooms.set_alias(&alias, Some(&room_id), &db.globals)?;
}
if let Some(room::Visibility::Public) = body.visibility {
if body.visibility == room::Visibility::Public {
db.rooms.set_public(&room_id, true)?;
}
Ok(create_room::Response { room_id }.into())
Ok(create_room::Response::new(room_id).into())
}
#[cfg_attr(
@ -324,7 +322,7 @@ pub fn create_room_route(
)]
pub fn get_room_event_route(
db: State<'_, Database>,
body: Ruma<get_room_event::Request>,
body: Ruma<get_room_event::Request<'_>>,
) -> ConduitResult<get_room_event::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
@ -349,19 +347,15 @@ pub fn get_room_event_route(
feature = "conduit_bin",
post("/_matrix/client/r0/rooms/<_room_id>/upgrade", data = "<body>")
)]
pub fn upgrade_room_route(
pub async fn upgrade_room_route(
db: State<'_, Database>,
body: Ruma<upgrade_room::Request>,
body: Ruma<upgrade_room::Request<'_>>,
_room_id: String,
) -> ConduitResult<upgrade_room::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
// Validate the room version requested
let new_version =
RoomVersionId::try_from(body.new_version.clone()).expect("invalid room version id");
if !matches!(
new_version,
body.new_version,
RoomVersionId::Version5 | RoomVersionId::Version6
) {
return Err(Error::BadRequest(
@ -375,10 +369,8 @@ pub fn upgrade_room_route(
// Send a m.room.tombstone event to the old room to indicate that it is not intended to be used any further
// Fail if the sender does not have the required permissions
let tombstone_event_id = db.rooms.append_pdu(
let tombstone_event_id = db.rooms.build_and_append_pdu(
PduBuilder {
room_id: body.room_id.clone(),
sender: sender_id.clone(),
event_type: EventType::RoomTombstone,
content: serde_json::to_value(ruma::events::room::tombstone::TombstoneEventContent {
body: "This room has been replaced".to_string(),
@ -389,7 +381,10 @@ pub fn upgrade_room_route(
state_key: Some("".to_owned()),
redacts: None,
},
sender_id,
&body.room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
@ -415,13 +410,11 @@ pub fn upgrade_room_route(
let mut create_event_content =
ruma::events::room::create::CreateEventContent::new(sender_id.clone());
create_event_content.federate = federate;
create_event_content.room_version = new_version;
create_event_content.room_version = body.new_version.clone();
create_event_content.predecessor = predecessor;
db.rooms.append_pdu(
db.rooms.build_and_append_pdu(
PduBuilder {
room_id: replacement_room.clone(),
sender: sender_id.clone(),
event_type: EventType::RoomCreate,
content: serde_json::to_value(create_event_content)
.expect("event is valid, we just created it"),
@ -429,15 +422,16 @@ pub fn upgrade_room_route(
state_key: Some("".to_owned()),
redacts: None,
},
sender_id,
&replacement_room,
&db.globals,
&db.sending,
&db.account_data,
)?;
// Join the new room
db.rooms.append_pdu(
db.rooms.build_and_append_pdu(
PduBuilder {
room_id: replacement_room.clone(),
sender: sender_id.clone(),
event_type: EventType::RoomMember,
content: serde_json::to_value(member::MemberEventContent {
membership: member::MembershipState::Join,
@ -451,7 +445,10 @@ pub fn upgrade_room_route(
state_key: Some(sender_id.to_string()),
redacts: None,
},
sender_id,
&replacement_room,
&db.globals,
&db.sending,
&db.account_data,
)?;
@ -475,17 +472,18 @@ pub fn upgrade_room_route(
None => continue, // Skipping missing events.
};
db.rooms.append_pdu(
db.rooms.build_and_append_pdu(
PduBuilder {
room_id: replacement_room.clone(),
sender: sender_id.clone(),
event_type,
content: event_content,
unsigned: None,
state_key: Some("".to_owned()),
redacts: None,
},
sender_id,
&replacement_room,
&db.globals,
&db.sending,
&db.account_data,
)?;
}
@ -517,22 +515,21 @@ pub fn upgrade_room_route(
power_levels_event_content.invite = new_level;
// Modify the power levels in the old room to prevent sending of events and inviting new users
db.rooms
.append_pdu(
PduBuilder {
room_id: body.room_id.clone(),
sender: sender_id.clone(),
event_type: EventType::RoomPowerLevels,
content: serde_json::to_value(power_levels_event_content)
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some("".to_owned()),
redacts: None,
},
&db.globals,
&db.account_data,
)
.ok();
let _ = db.rooms.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomPowerLevels,
content: serde_json::to_value(power_levels_event_content)
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some("".to_owned()),
redacts: None,
},
sender_id,
&body.room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
// Return the replacement room id
Ok(upgrade_room::Response { replacement_room }.into())

View file

@ -1,11 +1,10 @@
use super::State;
use crate::{ConduitResult, Database, Error, Ruma};
use js_int::uint;
use ruma::api::client::{error::ErrorKind, r0::search::search_events};
#[cfg(feature = "conduit_bin")]
use rocket::post;
use search_events::{ResultCategories, ResultRoomEvents, SearchResult};
use search_events::{EventContextResult, ResultCategories, ResultRoomEvents, SearchResult};
use std::collections::BTreeMap;
#[cfg_attr(
@ -14,7 +13,7 @@ use std::collections::BTreeMap;
)]
pub fn search_events_route(
db: State<'_, Database>,
body: Ruma<search_events::Request>,
body: Ruma<search_events::Request<'_>>,
) -> ConduitResult<search_events::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
@ -51,7 +50,13 @@ pub fn search_events_route(
.0
.map(|result| {
Ok::<_, Error>(SearchResult {
context: None,
context: EventContextResult {
end: None,
events_after: Vec::new(),
events_before: Vec::new(),
profile_info: BTreeMap::new(),
start: None,
},
rank: None,
result: db
.rooms
@ -70,17 +75,15 @@ pub fn search_events_route(
Some((skip + limit).to_string())
};
Ok(search_events::Response {
search_categories: ResultCategories {
room_events: Some(ResultRoomEvents {
count: uint!(0), // TODO
groups: BTreeMap::new(), // TODO
next_batch,
results,
state: BTreeMap::new(), // TODO
highlights: search.1,
}),
Ok(search_events::Response::new(ResultCategories {
room_events: ResultRoomEvents {
count: None, // TODO? maybe not
groups: BTreeMap::new(), // TODO
next_batch,
results,
state: BTreeMap::new(), // TODO
highlights: search.1,
},
}
})
.into())
}

View file

@ -1,5 +1,4 @@
use super::State;
use super::{DEVICE_ID_LENGTH, TOKEN_LENGTH};
use super::{State, DEVICE_ID_LENGTH, TOKEN_LENGTH};
use crate::{utils, ConduitResult, Database, Error, Ruma};
use ruma::{
api::client::{
@ -18,10 +17,7 @@ use rocket::{get, post};
/// when logging in.
#[cfg_attr(feature = "conduit_bin", get("/_matrix/client/r0/login"))]
pub fn get_login_types_route() -> ConduitResult<get_login_types::Response> {
Ok(get_login_types::Response {
flows: vec![get_login_types::LoginType::Password],
}
.into())
Ok(get_login_types::Response::new(vec![get_login_types::LoginType::Password]).into())
}
/// # `POST /_matrix/client/r0/login`
@ -40,15 +36,15 @@ pub fn get_login_types_route() -> ConduitResult<get_login_types::Response> {
)]
pub fn login_route(
db: State<'_, Database>,
body: Ruma<login::Request>,
body: Ruma<login::Request<'_>>,
) -> ConduitResult<login::Response> {
// Validate login method
let user_id =
// TODO: Other login methods
if let (login::UserInfo::MatrixId(username), login::LoginInfo::Password { password }) =
(body.user.clone(), body.login_info.clone())
if let (login::IncomingUserInfo::MatrixId(username), login::IncomingLoginInfo::Password { password }) =
(&body.user, &body.login_info)
{
let user_id = UserId::parse_with_server_name(username, db.globals.server_name())
let user_id = UserId::parse_with_server_name(username.to_string(), db.globals.server_name())
.map_err(|_| Error::BadRequest(
ErrorKind::InvalidUsername,
"Username is invalid."
@ -126,7 +122,7 @@ pub fn logout_route(
db.users.remove_device(&sender_id, device_id)?;
Ok(logout::Response.into())
Ok(logout::Response::new().into())
}
/// # `POST /_matrix/client/r0/logout/all`
@ -154,5 +150,5 @@ pub fn logout_all_route(
}
}
Ok(logout_all::Response.into())
Ok(logout_all::Response::new().into())
}

View file

@ -1,5 +1,5 @@
use super::State;
use crate::{pdu::PduBuilder, ConduitResult, Database, Error, Ruma};
use crate::{pdu::PduBuilder, ConduitResult, Database, Error, Result, Ruma};
use ruma::{
api::client::{
error::ErrorKind,
@ -8,8 +8,8 @@ use ruma::{
send_state_event_for_empty_key, send_state_event_for_key,
},
},
events::{room::canonical_alias, EventType},
Raw,
events::{AnyStateEventContent, EventContent},
EventId, RoomId, UserId,
};
#[cfg(feature = "conduit_bin")]
@ -19,9 +19,9 @@ use rocket::{get, put};
feature = "conduit_bin",
put("/_matrix/client/r0/rooms/<_>/state/<_>/<_>", data = "<body>")
)]
pub fn send_state_event_for_key_route(
pub async fn send_state_event_for_key_route(
db: State<'_, Database>,
body: Ruma<send_state_event_for_key::IncomingRequest>,
body: Ruma<send_state_event_for_key::Request<'_>>,
) -> ConduitResult<send_state_event_for_key::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
@ -33,93 +33,57 @@ pub fn send_state_event_for_key_route(
)
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid JSON body."))?;
if body.event_type == EventType::RoomCanonicalAlias {
let canonical_alias = serde_json::from_value::<
Raw<canonical_alias::CanonicalAliasEventContent>,
>(content.clone())
.expect("from_value::<Raw<..>> can never fail")
.deserialize()
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid canonical alias."))?;
let mut aliases = canonical_alias.alt_aliases;
if let Some(alias) = canonical_alias.alias {
aliases.push(alias);
}
for alias in aliases {
if alias.server_name() != db.globals.server_name()
|| db
.rooms
.id_from_alias(&alias)?
.filter(|room| room == &body.room_id) // Make sure it's the right room
.is_none()
{
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"You are only allowed to send canonical_alias \
events when it's aliases already exists",
));
}
}
}
let event_id = db.rooms.append_pdu(
PduBuilder {
room_id: body.room_id.clone(),
sender: sender_id.clone(),
event_type: body.event_type.clone(),
Ok(send_state_event_for_key::Response::new(
send_state_event_for_key_helper(
&db,
sender_id,
&body.content,
content,
unsigned: None,
state_key: Some(body.state_key.clone()),
redacts: None,
},
&db.globals,
&db.account_data,
)?;
Ok(send_state_event_for_key::Response { event_id }.into())
&body.room_id,
Some(body.state_key.to_owned()),
)
.await?,
)
.into())
}
#[cfg_attr(
feature = "conduit_bin",
put("/_matrix/client/r0/rooms/<_>/state/<_>", data = "<body>")
)]
pub fn send_state_event_for_empty_key_route(
pub async fn send_state_event_for_empty_key_route(
db: State<'_, Database>,
body: Ruma<send_state_event_for_empty_key::IncomingRequest>,
body: Ruma<send_state_event_for_empty_key::Request<'_>>,
) -> ConduitResult<send_state_event_for_empty_key::Response> {
// This just calls send_state_event_for_key_route
let Ruma {
body:
send_state_event_for_empty_key::IncomingRequest {
room_id,
event_type,
data,
},
body,
sender_id,
device_id,
device_id: _,
json_body,
} = body;
Ok(send_state_event_for_empty_key::Response {
event_id: send_state_event_for_key_route(
db,
Ruma {
body: send_state_event_for_key::IncomingRequest {
room_id,
event_type,
data,
state_key: "".to_owned(),
},
sender_id,
device_id,
json_body,
},
)?
.0
.event_id,
}
let json = serde_json::from_str::<serde_json::Value>(
json_body
.as_ref()
.ok_or(Error::BadRequest(ErrorKind::BadJson, "Invalid JSON body."))?
.get(),
)
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid JSON body."))?;
Ok(send_state_event_for_empty_key::Response::new(
send_state_event_for_key_helper(
&db,
sender_id
.as_ref()
.expect("no user for send state empty key rout"),
&body.content,
json,
&body.room_id,
Some("".into()),
)
.await?,
)
.into())
}
@ -214,3 +178,55 @@ pub fn get_state_events_for_empty_key_route(
}
.into())
}
pub async fn send_state_event_for_key_helper(
db: &Database,
sender: &UserId,
content: &AnyStateEventContent,
json: serde_json::Value,
room_id: &RoomId,
state_key: Option<String>,
) -> Result<EventId> {
let sender_id = sender;
if let AnyStateEventContent::RoomCanonicalAlias(canonical_alias) = content {
let mut aliases = canonical_alias.alt_aliases.clone();
if let Some(alias) = canonical_alias.alias.clone() {
aliases.push(alias);
}
for alias in aliases {
if alias.server_name() != db.globals.server_name()
|| db
.rooms
.id_from_alias(&alias)?
.filter(|room| room == room_id) // Make sure it's the right room
.is_none()
{
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"You are only allowed to send canonical_alias \
events when it's aliases already exists",
));
}
}
}
let event_id = db.rooms.build_and_append_pdu(
PduBuilder {
event_type: content.event_type().into(),
content: json,
unsigned: None,
state_key,
redacts: None,
},
&sender_id,
&room_id,
&db.globals,
&db.sending,
&db.account_data,
)?;
Ok(event_id)
}

View file

@ -31,7 +31,7 @@ use std::{
)]
pub async fn sync_events_route(
db: State<'_, Database>,
body: Ruma<sync_events::Request>,
body: Ruma<sync_events::Request<'_>>,
) -> ConduitResult<sync_events::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
let device_id = body.device_id.as_ref().expect("user is authenticated");
@ -93,76 +93,134 @@ pub async fn sync_events_route(
let mut limited = false;
let mut state_pdus = Vec::new();
for pdu in non_timeline_pdus {
for (_, pdu) in non_timeline_pdus {
if pdu.state_key.is_some() {
state_pdus.push(pdu);
}
limited = true;
}
// Database queries:
let encrypted_room = db
.rooms
.room_state_get(&room_id, &EventType::RoomEncryption, "")?
.is_some();
// TODO: optimize this?
let mut send_member_count = false;
let mut joined_since_last_sync = false;
let mut new_encrypted_room = false;
for (state_key, pdu) in db
// These type is Option<Option<_>>. The outer Option is None when there is no event between
// since and the current room state, meaning there should be no updates.
// The inner Option is None when there is an event, but there is no state hash associated
// with it. This can happen for the RoomCreate event, so all updates should arrive.
let since_state_hash = db
.rooms
.pdus_since(&sender_id, &room_id, since)?
.filter_map(|r| r.ok())
.filter_map(|pdu| Some((pdu.state_key.clone()?, pdu)))
{
if pdu.kind == EventType::RoomMember {
send_member_count = true;
.pdus_after(sender_id, &room_id, since) // - 1 So we can get the event at since
.next()
.map(|pdu| db.rooms.pdu_state_hash(&pdu.ok()?.0).ok()?);
let content = serde_json::from_value::<
let since_members = since_state_hash.as_ref().map(|state_hash| {
state_hash.as_ref().and_then(|state_hash| {
db.rooms
.state_type(&state_hash, &EventType::RoomMember)
.ok()
})
});
let since_encryption = since_state_hash.as_ref().map(|state_hash| {
state_hash.as_ref().and_then(|state_hash| {
db.rooms
.state_get(&state_hash, &EventType::RoomEncryption, "")
.ok()
})
});
let current_members = db.rooms.room_state_type(&room_id, &EventType::RoomMember)?;
// Calculations:
let new_encrypted_room =
encrypted_room && since_encryption.map_or(false, |encryption| encryption.is_none());
let send_member_count = since_members.as_ref().map_or(false, |since_members| {
since_members.as_ref().map_or(true, |since_members| {
current_members.len() != since_members.len()
})
});
let since_sender_member = since_members.as_ref().map(|since_members| {
since_members.as_ref().and_then(|members| {
members.get(sender_id.as_str()).and_then(|pdu| {
serde_json::from_value::<Raw<ruma::events::room::member::MemberEventContent>>(
pdu.content.clone(),
)
.expect("Raw::from_value always works")
.deserialize()
.map_err(|_| Error::bad_database("Invalid PDU in database."))
.ok()
})
})
});
if encrypted_room {
for (user_id, current_member) in current_members {
let current_membership = serde_json::from_value::<
Raw<ruma::events::room::member::MemberEventContent>,
>(pdu.content.clone())
>(current_member.content.clone())
.expect("Raw::from_value always works")
.deserialize()
.map_err(|_| Error::bad_database("Invalid PDU in database."))?;
.map_err(|_| Error::bad_database("Invalid PDU in database."))?
.membership;
if pdu.state_key == Some(sender_id.to_string())
&& content.membership == MembershipState::Join
{
joined_since_last_sync = true;
} else if encrypted_room && content.membership == MembershipState::Join {
// A new user joined an encrypted room
let user_id = UserId::try_from(state_key)
.map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
// Add encryption update if we didn't share an encrypted room already
if !share_encrypted_room(&db, &sender_id, &user_id, &room_id) {
device_list_updates.insert(user_id);
let since_membership =
since_members
.as_ref()
.map_or(MembershipState::Join, |members| {
members
.as_ref()
.and_then(|members| {
members.get(&user_id).and_then(|since_member| {
serde_json::from_value::<
Raw<ruma::events::room::member::MemberEventContent>,
>(
since_member.content.clone()
)
.expect("Raw::from_value always works")
.deserialize()
.map_err(|_| {
Error::bad_database("Invalid PDU in database.")
})
.ok()
})
})
.map_or(MembershipState::Leave, |member| member.membership)
});
let user_id = UserId::try_from(user_id)
.map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
match (since_membership, current_membership) {
(MembershipState::Leave, MembershipState::Join) => {
// A new user joined an encrypted room
if !share_encrypted_room(&db, &sender_id, &user_id, &room_id) {
device_list_updates.insert(user_id);
}
}
} else if encrypted_room && content.membership == MembershipState::Leave {
// Write down users that have left encrypted rooms we are in
left_encrypted_users.insert(
UserId::try_from(state_key)
.map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?,
);
(MembershipState::Join, MembershipState::Leave) => {
// Write down users that have left encrypted rooms we are in
left_encrypted_users.insert(user_id);
}
_ => {}
}
} else if pdu.kind == EventType::RoomEncryption {
new_encrypted_room = true;
}
}
let joined_since_last_sync = since_sender_member.map_or(false, |member| {
member.map_or(true, |member| member.membership != MembershipState::Join)
});
if joined_since_last_sync && encrypted_room || new_encrypted_room {
// If the user is in a new encrypted room, give them all joined users
device_list_updates.extend(
db.rooms
.room_members(&room_id)
.filter_map(|user_id| {
Some(
UserId::try_from(user_id.ok()?.clone())
.map_err(|_| {
Error::bad_database("Invalid member event state key in db.")
})
.ok()?,
)
})
.filter_map(|user_id| Some(user_id.ok()?))
.filter(|user_id| {
// Don't send key updates from the sender to the sender
sender_id != user_id
@ -196,8 +254,8 @@ pub async fn sync_events_route(
.rooms
.all_pdus(&sender_id, &room_id)?
.filter_map(|pdu| pdu.ok()) // Ignore all broken pdus
.filter(|pdu| pdu.kind == EventType::RoomMember)
.map(|pdu| {
.filter(|(_, pdu)| pdu.kind == EventType::RoomMember)
.map(|(_, pdu)| {
let content = serde_json::from_value::<
Raw<ruma::events::room::member::MemberEventContent>,
>(pdu.content.clone())
@ -252,7 +310,7 @@ pub async fn sync_events_route(
(db.rooms
.pdus_since(&sender_id, &room_id, last_read)?
.filter_map(|pdu| pdu.ok()) // Filter out buggy events
.filter(|pdu| {
.filter(|(_, pdu)| {
matches!(
pdu.kind.clone(),
EventType::RoomMessage | EventType::RoomEncrypted
@ -268,18 +326,15 @@ pub async fn sync_events_route(
None
};
let prev_batch = timeline_pdus.first().map_or(Ok::<_, Error>(None), |e| {
Ok(Some(
db.rooms
.get_pdu_count(&e.event_id)?
.ok_or_else(|| Error::bad_database("Can't find count from event in db."))?
.to_string(),
))
})?;
let prev_batch = timeline_pdus
.first()
.map_or(Ok::<_, Error>(None), |(pdu_id, _)| {
Ok(Some(db.rooms.pdu_count(pdu_id)?.to_string()))
})?;
let room_events = timeline_pdus
.into_iter()
.map(|pdu| pdu.to_sync_room_event())
.map(|(_, pdu)| pdu.to_sync_room_event())
.collect::<Vec<_>>();
let mut edus = db
@ -388,7 +443,7 @@ pub async fn sync_events_route(
let pdus = db.rooms.pdus_since(&sender_id, &room_id, since)?;
let room_events = pdus
.filter_map(|pdu| pdu.ok()) // Filter out buggy events
.map(|pdu| pdu.to_sync_room_event())
.map(|(_, pdu)| pdu.to_sync_room_event())
.collect();
let left_room = sync_events::LeftRoom {
@ -401,37 +456,43 @@ pub async fn sync_events_route(
state: sync_events::State { events: Vec::new() },
};
let mut left_since_last_sync = false;
for pdu in db.rooms.pdus_since(&sender_id, &room_id, since)? {
let pdu = pdu?;
if pdu.kind == EventType::RoomMember && pdu.state_key == Some(sender_id.to_string()) {
let content = serde_json::from_value::<
Raw<ruma::events::room::member::MemberEventContent>,
>(pdu.content.clone())
let since_member = db
.rooms
.pdus_after(sender_id, &room_id, since)
.next()
.and_then(|pdu| pdu.ok())
.and_then(|pdu| {
db.rooms
.pdu_state_hash(&pdu.0)
.ok()?
.ok_or_else(|| Error::bad_database("Pdu in db doesn't have a state hash."))
.ok()
})
.and_then(|state_hash| {
db.rooms
.state_get(&state_hash, &EventType::RoomMember, sender_id.as_str())
.ok()?
.ok_or_else(|| Error::bad_database("State hash in db doesn't have a state."))
.ok()
})
.and_then(|pdu| {
serde_json::from_value::<Raw<ruma::events::room::member::MemberEventContent>>(
pdu.content,
)
.expect("Raw::from_value always works")
.deserialize()
.map_err(|_| Error::bad_database("Invalid PDU in database."))?;
.map_err(|_| Error::bad_database("Invalid PDU in database."))
.ok()
});
if content.membership == MembershipState::Leave {
left_since_last_sync = true;
break;
}
}
}
let left_since_last_sync =
since_member.map_or(false, |member| member.membership == MembershipState::Join);
if left_since_last_sync {
device_list_left.extend(
db.rooms
.room_members(&room_id)
.filter_map(|user_id| {
Some(
UserId::try_from(user_id.ok()?.clone())
.map_err(|_| {
Error::bad_database("Invalid member event state key in db.")
})
.ok()?,
)
})
.filter_map(|user_id| Some(user_id.ok()?))
.filter(|user_id| {
// Don't send key updates from the sender to the sender
sender_id != user_id
@ -454,7 +515,7 @@ pub async fn sync_events_route(
let room_id = room_id?;
let mut invited_since_last_sync = false;
for pdu in db.rooms.pdus_since(&sender_id, &room_id, since)? {
let pdu = pdu?;
let (_, pdu) = pdu?;
if pdu.kind == EventType::RoomMember && pdu.state_key == Some(sender_id.to_string()) {
let content = serde_json::from_value::<
Raw<ruma::events::room::member::MemberEventContent>,
@ -491,9 +552,7 @@ pub async fn sync_events_route(
}
for user_id in left_encrypted_users {
// If the user doesn't share an encrypted room with the target anymore, we need to tell
// them
if db
let still_share_encrypted_room = db
.rooms
.get_shared_rooms(vec![sender_id.clone(), user_id.clone()])
.filter_map(|r| r.ok())
@ -505,8 +564,10 @@ pub async fn sync_events_route(
.is_some(),
)
})
.all(|encrypted| !encrypted)
{
.all(|encrypted| !encrypted);
// If the user doesn't share an encrypted room with the target anymore, we need to tell
// them
if still_share_encrypted_room {
device_list_left.insert(user_id);
}
}
@ -544,7 +605,9 @@ pub async fn sync_events_route(
changed: device_list_updates.into_iter().collect(),
left: device_list_left.into_iter().collect(),
},
device_one_time_keys_count: if db.users.last_one_time_keys_update(sender_id)? > since {
device_one_time_keys_count: if db.users.last_one_time_keys_update(sender_id)? > since
|| since == 0
{
db.users.count_one_time_keys(sender_id, device_id)?
} else {
BTreeMap::new()

View file

@ -15,7 +15,7 @@ use rocket::{delete, get, put};
)]
pub fn update_tag_route(
db: State<'_, Database>,
body: Ruma<create_tag::Request>,
body: Ruma<create_tag::Request<'_>>,
) -> ConduitResult<create_tag::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
@ -49,7 +49,7 @@ pub fn update_tag_route(
)]
pub fn delete_tag_route(
db: State<'_, Database>,
body: Ruma<delete_tag::Request>,
body: Ruma<delete_tag::Request<'_>>,
) -> ConduitResult<delete_tag::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
@ -80,7 +80,7 @@ pub fn delete_tag_route(
)]
pub fn get_tags_route(
db: State<'_, Database>,
body: Ruma<get_tags::Request>,
body: Ruma<get_tags::Request<'_>>,
) -> ConduitResult<get_tags::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");

View file

@ -14,7 +14,7 @@ use rocket::put;
)]
pub fn send_event_to_device_route(
db: State<'_, Database>,
body: Ruma<send_event_to_device::IncomingRequest>,
body: Ruma<send_event_to_device::Request<'_>>,
) -> ConduitResult<send_event_to_device::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
let device_id = body.device_id.as_ref().expect("user is authenticated");

View file

@ -1,5 +1,6 @@
use super::State;
use crate::{utils, ConduitResult, Database, Ruma};
use create_typing_event::Typing;
use ruma::api::client::r0::typing::create_typing_event;
#[cfg(feature = "conduit_bin")]
@ -11,16 +12,15 @@ use rocket::put;
)]
pub fn create_typing_event_route(
db: State<'_, Database>,
body: Ruma<create_typing_event::Request>,
body: Ruma<create_typing_event::Request<'_>>,
) -> ConduitResult<create_typing_event::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
if body.typing {
if let Typing::Yes(duration) = body.state {
db.rooms.edus.typing_add(
&sender_id,
&body.room_id,
body.timeout.map(|d| d.as_millis() as u64).unwrap_or(30000)
+ utils::millis_since_unix_epoch(),
duration.as_millis() as u64 + utils::millis_since_unix_epoch(),
&db.globals,
)?;
} else {

View file

@ -1,6 +1,5 @@
use crate::ConduitResult;
use ruma::api::client::unversioned::get_supported_versions;
use std::collections::BTreeMap;
#[cfg(feature = "conduit_bin")]
use rocket::get;
@ -17,13 +16,11 @@ use rocket::get;
/// unstable features in their stable releases
#[cfg_attr(feature = "conduit_bin", get("/_matrix/client/versions"))]
pub fn get_supported_versions_route() -> ConduitResult<get_supported_versions::Response> {
let mut unstable_features = BTreeMap::new();
let mut resp =
get_supported_versions::Response::new(vec!["r0.5.0".to_owned(), "r0.6.0".to_owned()]);
unstable_features.insert("org.matrix.e2e_cross_signing".to_owned(), true);
resp.unstable_features
.insert("org.matrix.e2e_cross_signing".to_owned(), true);
Ok(get_supported_versions::Response {
versions: vec!["r0.5.0".to_owned(), "r0.6.0".to_owned()],
unstable_features,
}
.into())
Ok(resp.into())
}

View file

@ -11,7 +11,7 @@ use rocket::post;
)]
pub fn search_users_route(
db: State<'_, Database>,
body: Ruma<search_users::IncomingRequest>,
body: Ruma<search_users::Request<'_>>,
) -> ConduitResult<search_users::Response> {
let limit = u64::from(body.limit) as usize;

View file

@ -3,6 +3,7 @@ pub mod globals;
pub mod key_backups;
pub mod media;
pub mod rooms;
pub mod sending;
pub mod transaction_ids;
pub mod uiaa;
pub mod users;
@ -25,6 +26,7 @@ pub struct Database {
pub media: media::Media,
pub key_backups: key_backups::KeyBackups,
pub transaction_ids: transaction_ids::TransactionIds,
pub sending: sending::Sending,
pub _db: sled::Db,
}
@ -102,7 +104,6 @@ impl Database {
pduid_pdu: db.open_tree("pduid_pdu")?,
eventid_pduid: db.open_tree("eventid_pduid")?,
roomid_pduleaves: db.open_tree("roomid_pduleaves")?,
roomstateid_pdu: db.open_tree("roomstateid_pdu")?,
alias_roomid: db.open_tree("alias_roomid")?,
aliasid_alias: db.open_tree("alias_roomid")?,
@ -110,12 +111,17 @@ impl Database {
tokenids: db.open_tree("tokenids")?,
roomserverids: db.open_tree("roomserverids")?,
userroomid_joined: db.open_tree("userroomid_joined")?,
roomuserid_joined: db.open_tree("roomuserid_joined")?,
roomuseroncejoinedids: db.open_tree("roomuseroncejoinedids")?,
userroomid_invited: db.open_tree("userroomid_invited")?,
roomuserid_invited: db.open_tree("roomuserid_invited")?,
userroomid_left: db.open_tree("userroomid_left")?,
stateid_pduid: db.open_tree("stateid_pduid")?,
pduid_statehash: db.open_tree("pduid_statehash")?,
roomid_statehash: db.open_tree("roomid_statehash")?,
},
account_data: account_data::AccountData {
roomuserdataid_accountdata: db.open_tree("roomuserdataid_accountdata")?,
@ -131,6 +137,9 @@ impl Database {
transaction_ids: transaction_ids::TransactionIds {
userdevicetxnid_response: db.open_tree("userdevicetxnid_response")?,
},
sending: sending::Sending {
serverpduids: db.open_tree("serverpduids")?,
},
_db: db,
})
}

View file

@ -1,32 +1,61 @@
use crate::{utils, Error, Result};
use log::error;
use ruma::ServerName;
use std::convert::TryInto;
use std::{convert::TryInto, sync::Arc};
pub const COUNTER: &str = "c";
#[derive(Clone)]
pub struct Globals {
pub(super) globals: sled::Tree,
keypair: ruma::signatures::Ed25519KeyPair,
keypair: Arc<ruma::signatures::Ed25519KeyPair>,
reqwest_client: reqwest::Client,
server_name: Box<ServerName>,
max_request_size: u32,
registration_disabled: bool,
encryption_disabled: bool,
federation_enabled: bool,
}
impl Globals {
pub fn load(globals: sled::Tree, config: &rocket::Config) -> Result<Self> {
let keypair = ruma::signatures::Ed25519KeyPair::new(
&*globals
.update_and_fetch("keypair", utils::generate_keypair)?
.expect("utils::generate_keypair always returns Some"),
"key1".to_owned(),
let bytes = &*globals
.update_and_fetch("keypair", utils::generate_keypair)?
.expect("utils::generate_keypair always returns Some");
let mut parts = bytes.splitn(2, |&b| b == 0xff);
let keypair = utils::string_from_bytes(
// 1. version
parts
.next()
.expect("splitn always returns at least one element"),
)
.map_err(|_| Error::bad_database("Private or public keys are invalid."))?;
.map_err(|_| Error::bad_database("Invalid version bytes in keypair."))
.and_then(|version| {
// 2. key
parts
.next()
.ok_or_else(|| Error::bad_database("Invalid keypair format in database."))
.map(|key| (version, key))
})
.and_then(|(version, key)| {
ruma::signatures::Ed25519KeyPair::new(&key, version)
.map_err(|_| Error::bad_database("Private or public keys are invalid."))
});
let keypair = match keypair {
Ok(k) => k,
Err(e) => {
error!("Keypair invalid. Deleting...");
globals.remove("keypair")?;
return Err(e);
}
};
Ok(Self {
globals,
keypair,
keypair: Arc::new(keypair),
reqwest_client: reqwest::Client::new(),
server_name: config
.get_str("server_name")
@ -41,6 +70,7 @@ impl Globals {
.map_err(|_| Error::BadConfig("Invalid max_request_size."))?,
registration_disabled: config.get_bool("registration_disabled").unwrap_or(false),
encryption_disabled: config.get_bool("encryption_disabled").unwrap_or(false),
federation_enabled: config.get_bool("federation_enabled").unwrap_or(false),
})
}
@ -86,4 +116,8 @@ impl Globals {
pub fn encryption_disabled(&self) -> bool {
self.encryption_disabled
}
pub fn federation_enabled(&self) -> bool {
self.federation_enabled
}
}

View file

@ -16,7 +16,7 @@ impl Media {
pub fn create(
&self,
mxc: String,
filename: Option<&String>,
filename: &Option<&str>,
content_type: &str,
file: &[u8],
) -> Result<()> {
@ -25,7 +25,31 @@ impl Media {
key.extend_from_slice(&0_u32.to_be_bytes()); // Width = 0 if it's not a thumbnail
key.extend_from_slice(&0_u32.to_be_bytes()); // Height = 0 if it's not a thumbnail
key.push(0xff);
key.extend_from_slice(filename.map(|f| f.as_bytes()).unwrap_or_default());
key.extend_from_slice(filename.as_ref().map(|f| f.as_bytes()).unwrap_or_default());
key.push(0xff);
key.extend_from_slice(content_type.as_bytes());
self.mediaid_file.insert(key, file)?;
Ok(())
}
/// Uploads or replaces a file thumbnail.
pub fn upload_thumbnail(
&self,
mxc: String,
filename: &Option<String>,
content_type: &str,
width: u32,
height: u32,
file: &[u8],
) -> Result<()> {
let mut key = mxc.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(&width.to_be_bytes());
key.extend_from_slice(&height.to_be_bytes());
key.push(0xff);
key.extend_from_slice(filename.as_ref().map(|f| f.as_bytes()).unwrap_or_default());
key.push(0xff);
key.extend_from_slice(content_type.as_bytes());
@ -35,7 +59,7 @@ impl Media {
}
/// Downloads a file.
pub fn get(&self, mxc: String) -> Result<Option<FileMeta>> {
pub fn get(&self, mxc: &str) -> Result<Option<FileMeta>> {
let mut prefix = mxc.as_bytes().to_vec();
prefix.push(0xff);
prefix.extend_from_slice(&0_u32.to_be_bytes()); // Width = 0 if it's not a thumbnail

File diff suppressed because it is too large Load diff

View file

@ -11,8 +11,10 @@ use ruma::{
use std::{
collections::HashMap,
convert::{TryFrom, TryInto},
mem,
};
#[derive(Clone)]
pub struct RoomEdus {
pub(in super::super) readreceiptid_readreceipt: sled::Tree, // ReadReceiptId = RoomId + Count + UserId
pub(in super::super) roomuserid_privateread: sled::Tree, // RoomUserId = Room + User, PrivateRead = Count
@ -227,9 +229,11 @@ impl RoomEdus {
let key = key?;
Ok::<_, Error>((
key.clone(),
utils::u64_from_bytes(key.split(|&b| b == 0xff).nth(1).ok_or_else(|| {
Error::bad_database("RoomTyping has invalid timestamp or delimiters.")
})?)
utils::u64_from_bytes(
&key.splitn(2, |&b| b == 0xff).nth(1).ok_or_else(|| {
Error::bad_database("RoomTyping has invalid timestamp or delimiters.")
})?[0..mem::size_of::<u64>()],
)
.map_err(|_| Error::bad_database("RoomTyping has invalid timestamp bytes."))?,
))
})

114
src/database/sending.rs Normal file
View file

@ -0,0 +1,114 @@
use std::{collections::HashSet, convert::TryFrom, time::SystemTime};
use crate::{server_server, utils, Error, PduEvent, Result};
use federation::transactions::send_transaction_message;
use log::warn;
use rocket::futures::stream::{FuturesUnordered, StreamExt};
use ruma::{api::federation, ServerName};
use sled::IVec;
use tokio::select;
pub struct Sending {
/// The state for a given state hash.
pub(super) serverpduids: sled::Tree, // ServerPduId = ServerName + PduId
}
impl Sending {
pub fn start_handler(&self, globals: &super::globals::Globals, rooms: &super::rooms::Rooms) {
let serverpduids = self.serverpduids.clone();
let rooms = rooms.clone();
let globals = globals.clone();
tokio::spawn(async move {
let mut futures = FuturesUnordered::new();
let mut waiting_servers = HashSet::new();
let mut subscriber = serverpduids.watch_prefix(b"");
loop {
select! {
Some(server) = futures.next() => {
warn!("response: {:?}", &server);
warn!("futures left: {}", &futures.len());
match server {
Ok((server, _response)) => {
waiting_servers.remove(&server)
}
Err((server, _e)) => {
waiting_servers.remove(&server)
}
};
},
Some(event) = &mut subscriber => {
if let sled::Event::Insert { key, .. } = event {
let serverpduid = key.clone();
let mut parts = serverpduid.splitn(2, |&b| b == 0xff);
if let Some((server, pdu_id)) = utils::string_from_bytes(
parts
.next()
.expect("splitn will always return 1 or more elements"),
)
.map_err(|_| Error::bad_database("ServerName in serverpduid bytes are invalid."))
.and_then(|server_str|Box::<ServerName>::try_from(server_str)
.map_err(|_| Error::bad_database("ServerName in serverpduid is invalid.")))
.ok()
.filter(|server| waiting_servers.insert(server.clone()))
.and_then(|server| parts
.next()
.ok_or_else(|| Error::bad_database("Invalid serverpduid in db.")).ok().map(|pdu_id| (server, pdu_id)))
{
futures.push(Self::handle_event(server, pdu_id.into(), &globals, &rooms));
}
}
}
}
}
});
}
pub fn send_pdu(&self, server: Box<ServerName>, pdu_id: &[u8]) -> Result<()> {
let mut key = server.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(pdu_id);
self.serverpduids.insert(key, b"")?;
Ok(())
}
async fn handle_event(
server: Box<ServerName>,
pdu_id: IVec,
globals: &super::globals::Globals,
rooms: &super::rooms::Rooms,
) -> std::result::Result<
(Box<ServerName>, send_transaction_message::v1::Response),
(Box<ServerName>, Error),
> {
let pdu_json = PduEvent::convert_to_outgoing_federation_event(
rooms
.get_pdu_json_from_id(&pdu_id)
.map_err(|e| (server.clone(), e))?
.ok_or_else(|| {
(
server.clone(),
Error::bad_database("Event in serverpduids not found in db."),
)
})?,
);
server_server::send_request(
&globals,
server.clone(),
send_transaction_message::v1::Request {
origin: globals.server_name(),
pdus: &[pdu_json],
edus: &[],
origin_server_ts: SystemTime::now(),
transaction_id: &utils::random_string(16),
},
)
.await
.map(|response| (server.clone(), response))
.map_err(|e| (server, e))
}
}

View file

@ -2,7 +2,7 @@ use crate::{Error, Result};
use ruma::{
api::client::{
error::ErrorKind,
r0::uiaa::{AuthData, UiaaInfo},
r0::uiaa::{IncomingAuthData, UiaaInfo},
},
DeviceId, UserId,
};
@ -26,12 +26,12 @@ impl Uiaa {
&self,
user_id: &UserId,
device_id: &DeviceId,
auth: &AuthData,
auth: &IncomingAuthData,
uiaainfo: &UiaaInfo,
users: &super::users::Users,
globals: &super::globals::Globals,
) -> Result<(bool, UiaaInfo)> {
if let AuthData::DirectRequest {
if let IncomingAuthData::DirectRequest {
kind,
session,
auth_parameters,

View file

@ -8,7 +8,7 @@ use ruma::{
keys::{CrossSigningKey, OneTimeKey},
},
},
encryption::DeviceKeys,
encryption::IncomingDeviceKeys,
events::{AnyToDeviceEvent, EventType},
DeviceId, DeviceKeyAlgorithm, DeviceKeyId, Raw, UserId,
};
@ -57,6 +57,11 @@ impl Users {
Ok(())
}
/// Returns the number of users registered on this server.
pub fn count(&self) -> usize {
self.userid_password.iter().count()
}
/// Find out which user an access token belongs to.
pub fn find_from_token(&self, token: &str) -> Result<Option<(UserId, String)>> {
self.token_userdeviceid
@ -395,7 +400,7 @@ impl Users {
&self,
user_id: &UserId,
device_id: &DeviceId,
device_keys: &DeviceKeys,
device_keys: &IncomingDeviceKeys,
rooms: &super::rooms::Rooms,
globals: &super::globals::Globals,
) -> Result<()> {
@ -603,7 +608,7 @@ impl Users {
.room_state_get(&room_id, &EventType::RoomEncryption, "")?
.is_none()
{
return Ok(());
continue;
}
let mut key = room_id.to_string().as_bytes().to_vec();
@ -625,7 +630,7 @@ impl Users {
&self,
user_id: &UserId,
device_id: &DeviceId,
) -> Result<Option<DeviceKeys>> {
) -> Result<Option<IncomingDeviceKeys>> {
let mut key = user_id.to_string().as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(device_id.as_bytes());

View file

@ -70,14 +70,14 @@ where
use ErrorKind::*;
let (kind, status_code) = match self {
Self::BadRequest(kind, _) => (
kind,
kind.clone(),
match kind {
Forbidden | GuestAccessForbidden | ThreepidAuthFailed | ThreepidDenied => {
StatusCode::FORBIDDEN
}
Unauthorized | UnknownToken | MissingToken => StatusCode::UNAUTHORIZED,
Unauthorized | UnknownToken { .. } | MissingToken => StatusCode::UNAUTHORIZED,
NotFound => StatusCode::NOT_FOUND,
LimitExceeded => StatusCode::TOO_MANY_REQUESTS,
LimitExceeded { .. } => StatusCode::TOO_MANY_REQUESTS,
UserDeactivated => StatusCode::FORBIDDEN,
TooLarge => StatusCode::PAYLOAD_TOO_LARGE,
_ => StatusCode::BAD_REQUEST,

View file

@ -119,17 +119,21 @@ fn setup_rocket() -> rocket::Rocket {
client_server::get_pushers_route,
client_server::set_pushers_route,
client_server::upgrade_room_route,
server_server::well_known_server,
server_server::get_server_version,
server_server::get_server_keys,
server_server::get_server_keys_deprecated,
server_server::get_public_rooms_route,
server_server::get_public_rooms_filtered_route,
server_server::send_transaction_message_route,
server_server::get_missing_events_route,
server_server::get_profile_information_route,
],
)
.attach(AdHoc::on_attach("Config", |mut rocket| async {
let data = Database::load_or_create(rocket.config().await).expect("valid config");
data.sending.start_handler(&data.globals, &data.rooms);
Ok(rocket.manage(data))
}))
}

View file

@ -1,22 +1,21 @@
use crate::{Error, Result};
use crate::Error;
use js_int::UInt;
use ruma::{
events::{
pdu::EventHash, room::member::MemberEventContent, AnyRoomEvent, AnyStateEvent,
pdu::EventHash, room::member::MemberEventContent, AnyEvent, AnyRoomEvent, AnyStateEvent,
AnyStrippedStateEvent, AnySyncRoomEvent, AnySyncStateEvent, EventType, StateEvent,
},
EventId, Raw, RoomId, ServerName, UserId,
EventId, Raw, RoomId, ServerKeyId, ServerName, UserId,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::{collections::BTreeMap, convert::TryInto, sync::Arc, time::UNIX_EPOCH};
#[derive(Deserialize, Serialize)]
#[derive(Deserialize, Serialize, Debug)]
pub struct PduEvent {
pub event_id: EventId,
pub room_id: RoomId,
pub sender: UserId,
pub origin: Box<ServerName>,
pub origin_server_ts: UInt,
#[serde(rename = "type")]
pub kind: EventType,
@ -31,11 +30,11 @@ pub struct PduEvent {
#[serde(default, skip_serializing_if = "serde_json::Map::is_empty")]
pub unsigned: serde_json::Map<String, serde_json::Value>,
pub hashes: EventHash,
pub signatures: HashMap<String, HashMap<String, String>>,
pub signatures: BTreeMap<Box<ServerName>, BTreeMap<ServerKeyId, String>>,
}
impl PduEvent {
pub fn redact(&mut self, reason: &PduEvent) -> Result<()> {
pub fn redact(&mut self, reason: &PduEvent) -> crate::Result<()> {
self.unsigned.clear();
let allowed: &[&str] = match self.kind {
@ -101,6 +100,28 @@ impl PduEvent {
serde_json::from_value(json).expect("Raw::from_value always works")
}
/// This only works for events that are also AnyRoomEvents.
pub fn to_any_event(&self) -> Raw<AnyEvent> {
let mut json = json!({
"content": self.content,
"type": self.kind,
"event_id": self.event_id,
"sender": self.sender,
"origin_server_ts": self.origin_server_ts,
"unsigned": self.unsigned,
"room_id": self.room_id,
});
if let Some(state_key) = &self.state_key {
json["state_key"] = json!(state_key);
}
if let Some(redacts) = &self.redacts {
json["redacts"] = json!(redacts);
}
serde_json::from_value(json).expect("Raw::from_value always works")
}
pub fn to_room_event(&self) -> Raw<AnyRoomEvent> {
let mut json = json!({
"content": self.content,
@ -177,13 +198,93 @@ impl PduEvent {
serde_json::from_value(json).expect("Raw::from_value always works")
}
pub fn convert_to_outgoing_federation_event(
mut pdu_json: serde_json::Value,
) -> Raw<ruma::events::pdu::PduStub> {
if let Some(unsigned) = pdu_json
.as_object_mut()
.expect("json is object")
.get_mut("unsigned")
{
unsigned
.as_object_mut()
.expect("unsigned is object")
.remove("transaction_id");
}
pdu_json
.as_object_mut()
.expect("json is object")
.remove("event_id");
serde_json::from_value::<Raw<_>>(pdu_json).expect("Raw::from_value always works")
}
}
impl From<&state_res::StateEvent> for PduEvent {
fn from(pdu: &state_res::StateEvent) -> Self {
Self {
event_id: pdu.event_id().clone(),
room_id: pdu.room_id().unwrap().clone(),
sender: pdu.sender().clone(),
origin_server_ts: (pdu
.origin_server_ts()
.duration_since(UNIX_EPOCH)
.expect("time is valid")
.as_millis() as u64)
.try_into()
.expect("time is valid"),
kind: pdu.kind(),
content: pdu.content().clone(),
state_key: Some(pdu.state_key()),
prev_events: pdu.prev_event_ids(),
depth: *pdu.depth(),
auth_events: pdu.auth_events(),
redacts: pdu.redacts().cloned(),
unsigned: pdu.unsigned().clone().into_iter().collect(),
hashes: pdu.hashes().clone(),
signatures: pdu.signatures(),
}
}
}
impl PduEvent {
pub fn convert_for_state_res(&self) -> Arc<state_res::StateEvent> {
Arc::new(
serde_json::from_value(json!({
"event_id": self.event_id,
"room_id": self.room_id,
"sender": self.sender,
"origin_server_ts": self.origin_server_ts,
"type": self.kind,
"content": self.content,
"state_key": self.state_key,
"prev_events": self.prev_events
.iter()
// TODO How do we create one of these
.map(|id| (id, EventHash { sha256: "hello".into() }))
.collect::<Vec<_>>(),
"depth": self.depth,
"auth_events": self.auth_events
.iter()
// TODO How do we create one of these
.map(|id| (id, EventHash { sha256: "hello".into() }))
.collect::<Vec<_>>(),
"redacts": self.redacts,
"unsigned": self.unsigned,
"hashes": self.hashes,
"signatures": self.signatures,
}))
.expect("all conduit PDUs are state events"),
)
}
}
/// Build the start of a PDU in order to add it to the `Database`.
#[derive(Debug)]
#[derive(Debug, Deserialize)]
pub struct PduBuilder {
pub room_id: RoomId,
pub sender: UserId,
#[serde(rename = "type")]
pub event_type: EventType,
pub content: serde_json::Value,
pub unsigned: Option<serde_json::Map<String, serde_json::Value>>,

View file

@ -1,6 +1,9 @@
use crate::Error;
use ruma::identifiers::{DeviceId, UserId};
use std::{convert::TryInto, ops::Deref};
use ruma::{
api::{Outgoing, OutgoingRequest},
identifiers::{DeviceId, UserId},
};
use std::{convert::TryFrom, convert::TryInto, ops::Deref};
#[cfg(feature = "conduit_bin")]
use {
@ -16,21 +19,26 @@ use {
tokio::io::AsyncReadExt,
Request, State,
},
ruma::api::IncomingRequest,
std::io::Cursor,
};
/// This struct converts rocket requests into ruma structs by converting them into http requests
/// first.
pub struct Ruma<T> {
pub body: T,
pub struct Ruma<T: Outgoing> {
pub body: T::Incoming,
pub sender_id: Option<UserId>,
pub device_id: Option<Box<DeviceId>>,
pub json_body: Option<Box<serde_json::value::RawValue>>, // This is None when body is not a valid string
}
#[cfg(feature = "conduit_bin")]
impl<'a, T: IncomingRequest> FromTransformedData<'a> for Ruma<T> {
impl<'a, T: Outgoing + OutgoingRequest> FromTransformedData<'a> for Ruma<T>
where
<T as Outgoing>::Incoming: TryFrom<http::request::Request<std::vec::Vec<u8>>> + std::fmt::Debug,
<<T as Outgoing>::Incoming as std::convert::TryFrom<
http::request::Request<std::vec::Vec<u8>>,
>>::Error: std::fmt::Debug,
{
type Error = (); // TODO: Better error handling
type Owned = Data;
type Borrowed = Self::Owned;
@ -91,7 +99,7 @@ impl<'a, T: IncomingRequest> FromTransformedData<'a> for Ruma<T> {
let http_request = http_request.body(body.clone()).unwrap();
log::info!("{:?}", http_request);
match T::try_from(http_request) {
match <T as Outgoing>::Incoming::try_from(http_request) {
Ok(t) => Success(Ruma {
body: t,
sender_id: user_id,
@ -110,8 +118,8 @@ impl<'a, T: IncomingRequest> FromTransformedData<'a> for Ruma<T> {
}
}
impl<T> Deref for Ruma<T> {
type Target = T;
impl<T: Outgoing> Deref for Ruma<T> {
type Target = T::Incoming;
fn deref(&self) -> &Self::Target {
&self.body

View file

@ -1,25 +1,38 @@
use crate::{client_server, ConduitResult, Database, Error, Result, Ruma};
use http::header::{HeaderValue, AUTHORIZATION};
use crate::{client_server, ConduitResult, Database, Error, PduEvent, Result, Ruma};
use get_profile_information::v1::ProfileField;
use http::header::{HeaderValue, AUTHORIZATION, HOST};
use log::warn;
use rocket::{get, post, put, response::content::Json, State};
use ruma::api::federation::{
directory::get_public_rooms,
discovery::{
get_server_keys, get_server_version::v1 as get_server_version, ServerKey, VerifyKey,
use ruma::{
api::{
federation::{
directory::{get_public_rooms, get_public_rooms_filtered},
discovery::{
get_server_keys, get_server_version::v1 as get_server_version, ServerKey, VerifyKey,
},
event::get_missing_events,
query::get_profile_information,
transactions::send_transaction_message,
},
OutgoingRequest,
},
transactions::send_transaction_message,
directory::{IncomingFilter, IncomingRoomNetwork},
EventId, ServerName,
};
use ruma::api::{client, OutgoingRequest};
use serde_json::json;
use std::{
collections::BTreeMap,
convert::TryFrom,
fmt::Debug,
time::{Duration, SystemTime},
};
use trust_dns_resolver::AsyncResolver;
pub async fn request_well_known(db: &crate::Database, destination: &str) -> Option<String> {
pub async fn request_well_known(
globals: &crate::database::globals::Globals,
destination: &str,
) -> Option<String> {
let body: serde_json::Value = serde_json::from_str(
&db.globals
&globals
.reqwest_client()
.get(&format!(
"https://{}/.well-known/matrix/server",
@ -37,28 +50,62 @@ pub async fn request_well_known(db: &crate::Database, destination: &str) -> Opti
}
pub async fn send_request<T: OutgoingRequest>(
db: &crate::Database,
destination: String,
globals: &crate::database::globals::Globals,
destination: Box<ServerName>,
request: T,
) -> Result<T::IncomingResponse>
where
T: Debug,
{
if !globals.federation_enabled() {
return Err(Error::BadConfig("Federation is disabled."));
}
let resolver = AsyncResolver::tokio_from_system_conf()
.await
.map_err(|_| Error::BadConfig("Failed to set up trust dns resolver with system config."))?;
let mut host = None;
let actual_destination = "https://".to_owned()
+ &request_well_known(db, &destination)
.await
.unwrap_or(destination.clone() + ":8448");
+ &if let Some(mut delegated_hostname) =
request_well_known(globals, &destination.as_str()).await
{
if let Ok(Some(srv)) = resolver
.srv_lookup(format!("_matrix._tcp.{}", delegated_hostname))
.await
.map(|srv| srv.iter().next().map(|result| result.target().to_string()))
{
host = Some(delegated_hostname);
srv.trim_end_matches('.').to_owned()
} else {
if delegated_hostname.find(':').is_none() {
delegated_hostname += ":8448";
}
delegated_hostname
}
} else {
let mut destination = destination.as_str().to_owned();
if destination.find(':').is_none() {
destination += ":8448";
}
destination
};
let mut http_request = request
.try_into_http_request(&actual_destination, Some(""))
.unwrap();
.map_err(|e| {
warn!("{}: {}", actual_destination, e);
Error::BadServerResponse("Invalid destination")
})?;
let mut request_map = serde_json::Map::new();
if !http_request.body().is_empty() {
request_map.insert(
"content".to_owned(),
serde_json::from_slice(http_request.body()).unwrap(),
serde_json::from_slice(http_request.body())
.expect("body is valid json, we just created it"),
);
};
@ -72,19 +119,16 @@ where
.to_string()
.into(),
);
request_map.insert(
"origin".to_owned(),
db.globals.server_name().as_str().into(),
);
request_map.insert("destination".to_owned(), destination.into());
request_map.insert("origin".to_owned(), globals.server_name().as_str().into());
request_map.insert("destination".to_owned(), destination.as_str().into());
let mut request_json = request_map.into();
ruma::signatures::sign_json(
db.globals.server_name().as_str(),
db.globals.keypair(),
globals.server_name().as_str(),
globals.keypair(),
&mut request_json,
)
.unwrap();
.expect("our request json is what ruma expects");
let signatures = request_json["signatures"]
.as_object()
@ -103,7 +147,7 @@ where
AUTHORIZATION,
HeaderValue::from_str(&format!(
"X-Matrix origin={},key=\"{}\",sig=\"{}\"",
db.globals.server_name(),
globals.server_name(),
s.0,
s.1
))
@ -112,10 +156,19 @@ where
}
}
let reqwest_request = reqwest::Request::try_from(http_request)
if let Some(host) = host {
http_request
.headers_mut()
.insert(HOST, HeaderValue::from_str(&host).unwrap());
}
let mut reqwest_request = reqwest::Request::try_from(http_request)
.expect("all http requests are valid reqwest requests");
let reqwest_response = db.globals.reqwest_client().execute(reqwest_request).await;
*reqwest_request.timeout_mut() = Some(Duration::from_secs(30));
let url = reqwest_request.url().clone();
let reqwest_response = globals.reqwest_client().execute(reqwest_request).await;
// Because reqwest::Response -> http::Response is complicated:
match reqwest_response {
@ -136,22 +189,30 @@ where
.unwrap()
.into_iter()
.collect();
Ok(
T::IncomingResponse::try_from(http_response.body(body).unwrap())
.expect("TODO: error handle other server errors"),
)
let response = T::IncomingResponse::try_from(
http_response
.body(body)
.expect("reqwest body is valid http body"),
);
response.map_err(|e| {
warn!(
"Server returned bad response {} ({}): {:?}",
destination, url, e
);
Error::BadServerResponse("Server returned bad response.")
})
}
Err(e) => Err(e.into()),
}
}
#[cfg_attr(feature = "conduit_bin", get("/.well-known/matrix/server"))]
pub fn well_known_server() -> Json<String> {
rocket::response::content::Json(json!({ "m.server": "pc.koesters.xyz:59003"}).to_string())
}
#[cfg_attr(feature = "conduit_bin", get("/_matrix/federation/v1/version"))]
pub fn get_server_version() -> ConduitResult<get_server_version::Response> {
pub fn get_server_version(db: State<'_, Database>) -> ConduitResult<get_server_version::Response> {
if !db.globals.federation_enabled() {
return Err(Error::BadConfig("Federation is disabled."));
}
Ok(get_server_version::Response {
server: Some(get_server_version::Server {
name: Some("Conduit".to_owned()),
@ -163,6 +224,11 @@ pub fn get_server_version() -> ConduitResult<get_server_version::Response> {
#[cfg_attr(feature = "conduit_bin", get("/_matrix/key/v2/server"))]
pub fn get_server_keys(db: State<'_, Database>) -> Json<String> {
if !db.globals.federation_enabled() {
// TODO: Use proper types
return Json("Federation is disabled.".to_owned());
}
let mut verify_keys = BTreeMap::new();
verify_keys.insert(
format!("ed25519:{}", db.globals.keypair().version()),
@ -202,47 +268,28 @@ pub fn get_server_keys_deprecated(db: State<'_, Database>) -> Json<String> {
feature = "conduit_bin",
post("/_matrix/federation/v1/publicRooms", data = "<body>")
)]
pub async fn get_public_rooms_route(
pub async fn get_public_rooms_filtered_route(
db: State<'_, Database>,
body: Ruma<get_public_rooms::v1::Request>,
) -> ConduitResult<get_public_rooms::v1::Response> {
let Ruma {
body:
get_public_rooms::v1::Request {
room_network: _room_network, // TODO
limit,
since,
},
sender_id,
device_id,
json_body,
} = body;
body: Ruma<get_public_rooms_filtered::v1::Request<'_>>,
) -> ConduitResult<get_public_rooms_filtered::v1::Response> {
if !db.globals.federation_enabled() {
return Err(Error::BadConfig("Federation is disabled."));
}
let client::r0::directory::get_public_rooms_filtered::Response {
chunk,
prev_batch,
next_batch,
total_room_count_estimate,
} = client_server::get_public_rooms_filtered_route(
db,
Ruma {
body: client::r0::directory::get_public_rooms_filtered::IncomingRequest {
filter: None,
limit,
room_network: client::r0::directory::get_public_rooms_filtered::RoomNetwork::Matrix,
server: None,
since,
},
sender_id,
device_id,
json_body,
},
let response = client_server::get_public_rooms_filtered_helper(
&db,
None,
body.limit,
body.since.as_deref(),
&body.filter,
&body.room_network,
)
.await?
.0;
Ok(get_public_rooms::v1::Response {
chunk: chunk
Ok(get_public_rooms_filtered::v1::Response {
chunk: response
.chunk
.into_iter()
.map(|c| {
// Convert ruma::api::federation::directory::get_public_rooms::v1::PublicRoomsChunk
@ -257,9 +304,56 @@ pub async fn get_public_rooms_route(
})
.filter_map(|r| r.ok())
.collect(),
prev_batch,
next_batch,
total_room_count_estimate,
prev_batch: response.prev_batch,
next_batch: response.next_batch,
total_room_count_estimate: response.total_room_count_estimate,
}
.into())
}
#[cfg_attr(
feature = "conduit_bin",
get("/_matrix/federation/v1/publicRooms", data = "<body>")
)]
pub async fn get_public_rooms_route(
db: State<'_, Database>,
body: Ruma<get_public_rooms::v1::Request<'_>>,
) -> ConduitResult<get_public_rooms::v1::Response> {
if !db.globals.federation_enabled() {
return Err(Error::BadConfig("Federation is disabled."));
}
let response = client_server::get_public_rooms_filtered_helper(
&db,
None,
body.limit,
body.since.as_deref(),
&IncomingFilter::default(),
&IncomingRoomNetwork::Matrix,
)
.await?
.0;
Ok(get_public_rooms::v1::Response {
chunk: response
.chunk
.into_iter()
.map(|c| {
// Convert ruma::api::federation::directory::get_public_rooms::v1::PublicRoomsChunk
// to ruma::api::client::r0::directory::PublicRoomsChunk
Ok::<_, Error>(
serde_json::from_str(
&serde_json::to_string(&c)
.expect("PublicRoomsChunk::to_string always works"),
)
.expect("federation and client-server PublicRoomsChunk are the same type"),
)
})
.filter_map(|r| r.ok())
.collect(),
prev_batch: response.prev_batch,
next_batch: response.next_batch,
total_room_count_estimate: response.total_room_count_estimate,
}
.into())
}
@ -268,13 +362,152 @@ pub async fn get_public_rooms_route(
feature = "conduit_bin",
put("/_matrix/federation/v1/send/<_>", data = "<body>")
)]
pub fn send_transaction_message_route(
db: State<'_, Database>,
body: Ruma<send_transaction_message::v1::Request>,
pub fn send_transaction_message_route<'a>(
db: State<'a, Database>,
body: Ruma<send_transaction_message::v1::Request<'_>>,
) -> ConduitResult<send_transaction_message::v1::Response> {
dbg!(&*body);
if !db.globals.federation_enabled() {
return Err(Error::BadConfig("Federation is disabled."));
}
//dbg!(&*body);
for pdu in &body.pdus {
let mut value = serde_json::from_str(pdu.json().get())
.expect("converting raw jsons to values always works");
let event_id = EventId::try_from(&*format!(
"${}",
ruma::signatures::reference_hash(&value).expect("ruma can calculate reference hashes")
))
.expect("ruma's reference hashes are valid event ids");
value
.as_object_mut()
.expect("ruma pdus are json objects")
.insert("event_id".to_owned(), event_id.to_string().into());
let pdu = serde_json::from_value::<PduEvent>(value.clone())
.expect("all ruma pdus are conduit pdus");
if db.rooms.exists(&pdu.room_id)? {
let pdu_id =
db.rooms
.append_pdu(&pdu, &value, &db.globals, &db.account_data, &db.sending)?;
db.rooms.append_to_state(&pdu_id, &pdu)?;
}
}
Ok(send_transaction_message::v1::Response {
pdus: BTreeMap::new(),
}
.into())
}
#[cfg_attr(
feature = "conduit_bin",
post("/_matrix/federation/v1/get_missing_events/<_>", data = "<body>")
)]
pub fn get_missing_events_route<'a>(
db: State<'a, Database>,
body: Ruma<get_missing_events::v1::Request<'_>>,
) -> ConduitResult<get_missing_events::v1::Response> {
if !db.globals.federation_enabled() {
return Err(Error::BadConfig("Federation is disabled."));
}
let mut queued_events = body.latest_events.clone();
let mut events = Vec::new();
let mut i = 0;
while i < queued_events.len() && events.len() < u64::from(body.limit) as usize {
if let Some(pdu) = db.rooms.get_pdu_json(&queued_events[i])? {
if body.earliest_events.contains(
&serde_json::from_value(
pdu.get("event_id")
.cloned()
.ok_or_else(|| Error::bad_database("Event in db has no event_id field."))?,
)
.map_err(|_| Error::bad_database("Invalid event_id field in pdu in db."))?,
) {
i += 1;
continue;
}
queued_events.extend_from_slice(
&serde_json::from_value::<Vec<EventId>>(
pdu.get("prev_events").cloned().ok_or_else(|| {
Error::bad_database("Invalid prev_events field of pdu in db.")
})?,
)
.map_err(|_| Error::bad_database("Invalid prev_events content in pdu in db."))?,
);
events.push(PduEvent::convert_to_outgoing_federation_event(pdu));
}
i += 1;
}
dbg!(&events);
Ok(get_missing_events::v1::Response { events }.into())
}
#[cfg_attr(
feature = "conduit_bin",
get("/_matrix/federation/v1/query/profile", data = "<body>")
)]
pub fn get_profile_information_route<'a>(
db: State<'a, Database>,
body: Ruma<get_profile_information::v1::Request<'_>>,
) -> ConduitResult<get_profile_information::v1::Response> {
if !db.globals.federation_enabled() {
return Err(Error::BadConfig("Federation is disabled."));
}
let mut displayname = None;
let mut avatar_url = None;
match body.field {
Some(ProfileField::DisplayName) => displayname = db.users.displayname(&body.user_id)?,
Some(ProfileField::AvatarUrl) => avatar_url = db.users.avatar_url(&body.user_id)?,
None => {
displayname = db.users.displayname(&body.user_id)?;
avatar_url = db.users.avatar_url(&body.user_id)?;
}
}
Ok(get_profile_information::v1::Response {
displayname,
avatar_url,
}
.into())
}
/*
#[cfg_attr(
feature = "conduit_bin",
get("/_matrix/federation/v2/invite/<_>/<_>", data = "<body>")
)]
pub fn get_user_devices_route<'a>(
db: State<'a, Database>,
body: Ruma<membership::v1::Request<'_>>,
) -> ConduitResult<get_profile_information::v1::Response> {
if !db.globals.federation_enabled() {
return Err(Error::BadConfig("Federation is disabled."));
}
let mut displayname = None;
let mut avatar_url = None;
match body.field {
Some(ProfileField::DisplayName) => displayname = db.users.displayname(&body.user_id)?,
Some(ProfileField::AvatarUrl) => avatar_url = db.users.avatar_url(&body.user_id)?,
None => {
displayname = db.users.displayname(&body.user_id)?;
avatar_url = db.users.avatar_url(&body.user_id)?;
}
}
Ok(get_profile_information::v1::Response {
displayname,
avatar_url,
}
.into())
}
*/

View file

@ -1,59 +0,0 @@
use std::collections::HashMap;
fn stateres(state_a: HashMap<StateTuple, PduEvent>, state_b: HashMap<StateTuple, PduEvent>) {
let mut unconflicted = todo!("state at fork event");
let mut conflicted: HashMap<StateTuple, PduEvent> = state_a
.iter()
.filter(|(key_a, value_a)| match state_b.remove(key_a) {
Some(value_b) if value_a == value_b => unconflicted.insert(key_a, value_a),
_ => false,
})
.collect();
// We removed unconflicted from state_b, now we can easily insert all events that are only in fork b
conflicted.extend(state_b);
let partial_state = unconflicted.clone();
let full_conflicted = conflicted.clone(); // TODO: auth events
let output_rev = Vec::new();
let event_map = HashMap::new();
let incoming_edges = HashMap::new();
for event in full_conflicted {
event_map.insert(event.event_id, event);
incoming_edges.insert(event.event_id, 0);
}
for e in conflicted_control_events {
for a in e.auth_events {
incoming_edges[a.event_id] += 1;
}
}
while incoming_edges.len() > 0 {
let mut count_0 = incoming_edges
.iter()
.filter(|(_, c)| c == 0)
.collect::<Vec<_>>();
count_0.sort_by(|(x, _), (y, _)| {
x.power_level
.cmp(&a.power_level)
.then_with(|| x.origin_server.ts.cmp(&y.origin_server_ts))
.then_with(|| x.event_id.cmp(&y.event_id))
});
for (id, count) in count_0 {
output_rev.push(event_map[id]);
for auth_event in event_map[id].auth_events {
incoming_edges[auth_event.event_id] -= 1;
}
incoming_edges.remove(id);
}
}
}

View file

@ -29,8 +29,13 @@ pub fn increment(old: Option<&[u8]>) -> Option<Vec<u8>> {
pub fn generate_keypair(old: Option<&[u8]>) -> Option<Vec<u8>> {
Some(old.map(|s| s.to_vec()).unwrap_or_else(|| {
ruma::signatures::Ed25519KeyPair::generate()
.expect("Ed25519KeyPair generation always works (?)")
let mut value = random_string(8).as_bytes().to_vec();
value.push(0xff);
value.extend_from_slice(
&ruma::signatures::Ed25519KeyPair::generate()
.expect("Ed25519KeyPair generation always works (?)"),
);
value
}))
}