From 46d8a46e1f79420ab14fd7c84c431671216039bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Thu, 19 Aug 2021 11:01:18 +0200 Subject: [PATCH] improvement: faster incoming transaction handling --- Cargo.lock | 114 ++++++++----- Cargo.toml | 8 +- src/client_server/account.rs | 4 + src/client_server/membership.rs | 5 + src/client_server/room.rs | 2 + src/client_server/session.rs | 7 +- src/database.rs | 2 +- src/database/rooms.rs | 39 +++++ src/database/uiaa.rs | 166 ++++++++----------- src/main.rs | 11 +- src/pdu.rs | 4 +- src/server_server.rs | 283 ++++++++++++++++++-------------- 12 files changed, 365 insertions(+), 280 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6ed4ee7b..83e21a3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -248,7 +248,7 @@ dependencies = [ "jsonwebtoken", "lru-cache", "num_cpus", - "opentelemetry", + "opentelemetry 0.16.0", "opentelemetry-jaeger", "parking_lot", "pretty_env_logger", @@ -1466,16 +1466,46 @@ dependencies = [ ] [[package]] -name = "opentelemetry-jaeger" -version = "0.14.0" +name = "opentelemetry" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09a9fc8192722e7daa0c56e59e2336b797122fb8598383dcb11c8852733b435c" +checksum = "e1cf9b1c4e9a6c4de793c632496fa490bdc0e1eea73f0c91394f7b6990935d22" +dependencies = [ + "async-trait", + "crossbeam-channel", + "futures", + "js-sys", + "lazy_static", + "percent-encoding", + "pin-project", + "rand 0.8.4", + "thiserror", + "tokio", + "tokio-stream", +] + +[[package]] +name = "opentelemetry-jaeger" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db22f492873ea037bc267b35a0e8e4fb846340058cb7c864efe3d0bf23684593" dependencies = [ "async-trait", "lazy_static", - "opentelemetry", + "opentelemetry 0.16.0", + "opentelemetry-semantic-conventions", "thiserror", "thrift", + "tokio", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffeac823339e8b0f27b961f4385057bf9f97f2863bc745bd015fd6091f2270e9" +dependencies = [ + "opentelemetry 0.16.0", ] [[package]] @@ -2014,8 +2044,8 @@ dependencies = [ [[package]] name = "ruma" -version = "0.2.0" -source = "git+https://github.com/timokoesters/ruma?rev=a2d93500e1dbc87e7032a3c74f3b2479a7f84e93#a2d93500e1dbc87e7032a3c74f3b2479a7f84e93" +version = "0.3.0" +source = "git+https://github.com/ruma/ruma?rev=f5ab038e22421ed338396ece977b6b2844772ced#f5ab038e22421ed338396ece977b6b2844772ced" dependencies = [ "assign", "js_int", @@ -2035,8 +2065,8 @@ dependencies = [ [[package]] name = "ruma-api" -version = "0.17.1" -source = "git+https://github.com/timokoesters/ruma?rev=a2d93500e1dbc87e7032a3c74f3b2479a7f84e93#a2d93500e1dbc87e7032a3c74f3b2479a7f84e93" +version = "0.18.3" +source = "git+https://github.com/ruma/ruma?rev=f5ab038e22421ed338396ece977b6b2844772ced#f5ab038e22421ed338396ece977b6b2844772ced" dependencies = [ "bytes", "http", @@ -2051,8 +2081,8 @@ dependencies = [ [[package]] name = "ruma-api-macros" -version = "0.17.1" -source = "git+https://github.com/timokoesters/ruma?rev=a2d93500e1dbc87e7032a3c74f3b2479a7f84e93#a2d93500e1dbc87e7032a3c74f3b2479a7f84e93" +version = "0.18.3" +source = "git+https://github.com/ruma/ruma?rev=f5ab038e22421ed338396ece977b6b2844772ced#f5ab038e22421ed338396ece977b6b2844772ced" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -2062,8 +2092,8 @@ dependencies = [ [[package]] name = "ruma-appservice-api" -version = "0.3.0" -source = "git+https://github.com/timokoesters/ruma?rev=a2d93500e1dbc87e7032a3c74f3b2479a7f84e93#a2d93500e1dbc87e7032a3c74f3b2479a7f84e93" +version = "0.4.0" +source = "git+https://github.com/ruma/ruma?rev=f5ab038e22421ed338396ece977b6b2844772ced#f5ab038e22421ed338396ece977b6b2844772ced" dependencies = [ "ruma-api", "ruma-common", @@ -2076,8 +2106,8 @@ dependencies = [ [[package]] name = "ruma-client-api" -version = "0.11.0" -source = "git+https://github.com/timokoesters/ruma?rev=a2d93500e1dbc87e7032a3c74f3b2479a7f84e93#a2d93500e1dbc87e7032a3c74f3b2479a7f84e93" +version = "0.12.2" +source = "git+https://github.com/ruma/ruma?rev=f5ab038e22421ed338396ece977b6b2844772ced#f5ab038e22421ed338396ece977b6b2844772ced" dependencies = [ "assign", "bytes", @@ -2096,8 +2126,8 @@ dependencies = [ [[package]] name = "ruma-common" -version = "0.5.4" -source = "git+https://github.com/timokoesters/ruma?rev=a2d93500e1dbc87e7032a3c74f3b2479a7f84e93#a2d93500e1dbc87e7032a3c74f3b2479a7f84e93" +version = "0.6.0" +source = "git+https://github.com/ruma/ruma?rev=f5ab038e22421ed338396ece977b6b2844772ced#f5ab038e22421ed338396ece977b6b2844772ced" dependencies = [ "indexmap", "js_int", @@ -2111,8 +2141,8 @@ dependencies = [ [[package]] name = "ruma-events" -version = "0.23.2" -source = "git+https://github.com/timokoesters/ruma?rev=a2d93500e1dbc87e7032a3c74f3b2479a7f84e93#a2d93500e1dbc87e7032a3c74f3b2479a7f84e93" +version = "0.24.4" +source = "git+https://github.com/ruma/ruma?rev=f5ab038e22421ed338396ece977b6b2844772ced#f5ab038e22421ed338396ece977b6b2844772ced" dependencies = [ "indoc", "js_int", @@ -2127,8 +2157,8 @@ dependencies = [ [[package]] name = "ruma-events-macros" -version = "0.23.2" -source = "git+https://github.com/timokoesters/ruma?rev=a2d93500e1dbc87e7032a3c74f3b2479a7f84e93#a2d93500e1dbc87e7032a3c74f3b2479a7f84e93" +version = "0.24.4" +source = "git+https://github.com/ruma/ruma?rev=f5ab038e22421ed338396ece977b6b2844772ced#f5ab038e22421ed338396ece977b6b2844772ced" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -2138,8 +2168,8 @@ dependencies = [ [[package]] name = "ruma-federation-api" -version = "0.2.0" -source = "git+https://github.com/timokoesters/ruma?rev=a2d93500e1dbc87e7032a3c74f3b2479a7f84e93#a2d93500e1dbc87e7032a3c74f3b2479a7f84e93" +version = "0.3.0" +source = "git+https://github.com/ruma/ruma?rev=f5ab038e22421ed338396ece977b6b2844772ced#f5ab038e22421ed338396ece977b6b2844772ced" dependencies = [ "js_int", "ruma-api", @@ -2153,8 +2183,8 @@ dependencies = [ [[package]] name = "ruma-identifiers" -version = "0.19.4" -source = "git+https://github.com/timokoesters/ruma?rev=a2d93500e1dbc87e7032a3c74f3b2479a7f84e93#a2d93500e1dbc87e7032a3c74f3b2479a7f84e93" +version = "0.20.0" +source = "git+https://github.com/ruma/ruma?rev=f5ab038e22421ed338396ece977b6b2844772ced#f5ab038e22421ed338396ece977b6b2844772ced" dependencies = [ "paste", "rand 0.8.4", @@ -2167,8 +2197,8 @@ dependencies = [ [[package]] name = "ruma-identifiers-macros" -version = "0.19.4" -source = "git+https://github.com/timokoesters/ruma?rev=a2d93500e1dbc87e7032a3c74f3b2479a7f84e93#a2d93500e1dbc87e7032a3c74f3b2479a7f84e93" +version = "0.20.0" +source = "git+https://github.com/ruma/ruma?rev=f5ab038e22421ed338396ece977b6b2844772ced#f5ab038e22421ed338396ece977b6b2844772ced" dependencies = [ "quote", "ruma-identifiers-validation", @@ -2177,13 +2207,13 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" -version = "0.4.0" -source = "git+https://github.com/timokoesters/ruma?rev=a2d93500e1dbc87e7032a3c74f3b2479a7f84e93#a2d93500e1dbc87e7032a3c74f3b2479a7f84e93" +version = "0.5.0" +source = "git+https://github.com/ruma/ruma?rev=f5ab038e22421ed338396ece977b6b2844772ced#f5ab038e22421ed338396ece977b6b2844772ced" [[package]] name = "ruma-identity-service-api" -version = "0.2.0" -source = "git+https://github.com/timokoesters/ruma?rev=a2d93500e1dbc87e7032a3c74f3b2479a7f84e93#a2d93500e1dbc87e7032a3c74f3b2479a7f84e93" +version = "0.3.0" +source = "git+https://github.com/ruma/ruma?rev=f5ab038e22421ed338396ece977b6b2844772ced#f5ab038e22421ed338396ece977b6b2844772ced" dependencies = [ "js_int", "ruma-api", @@ -2195,8 +2225,8 @@ dependencies = [ [[package]] name = "ruma-push-gateway-api" -version = "0.2.0" -source = "git+https://github.com/timokoesters/ruma?rev=a2d93500e1dbc87e7032a3c74f3b2479a7f84e93#a2d93500e1dbc87e7032a3c74f3b2479a7f84e93" +version = "0.3.0" +source = "git+https://github.com/ruma/ruma?rev=f5ab038e22421ed338396ece977b6b2844772ced#f5ab038e22421ed338396ece977b6b2844772ced" dependencies = [ "js_int", "ruma-api", @@ -2210,8 +2240,8 @@ dependencies = [ [[package]] name = "ruma-serde" -version = "0.4.1" -source = "git+https://github.com/timokoesters/ruma?rev=a2d93500e1dbc87e7032a3c74f3b2479a7f84e93#a2d93500e1dbc87e7032a3c74f3b2479a7f84e93" +version = "0.5.0" +source = "git+https://github.com/ruma/ruma?rev=f5ab038e22421ed338396ece977b6b2844772ced#f5ab038e22421ed338396ece977b6b2844772ced" dependencies = [ "bytes", "form_urlencoded", @@ -2224,8 +2254,8 @@ dependencies = [ [[package]] name = "ruma-serde-macros" -version = "0.4.1" -source = "git+https://github.com/timokoesters/ruma?rev=a2d93500e1dbc87e7032a3c74f3b2479a7f84e93#a2d93500e1dbc87e7032a3c74f3b2479a7f84e93" +version = "0.5.0" +source = "git+https://github.com/ruma/ruma?rev=f5ab038e22421ed338396ece977b6b2844772ced#f5ab038e22421ed338396ece977b6b2844772ced" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -2235,8 +2265,8 @@ dependencies = [ [[package]] name = "ruma-signatures" -version = "0.8.0" -source = "git+https://github.com/timokoesters/ruma?rev=a2d93500e1dbc87e7032a3c74f3b2479a7f84e93#a2d93500e1dbc87e7032a3c74f3b2479a7f84e93" +version = "0.9.0" +source = "git+https://github.com/ruma/ruma?rev=f5ab038e22421ed338396ece977b6b2844772ced#f5ab038e22421ed338396ece977b6b2844772ced" dependencies = [ "base64 0.13.0", "ed25519-dalek", @@ -2252,8 +2282,8 @@ dependencies = [ [[package]] name = "ruma-state-res" -version = "0.2.0" -source = "git+https://github.com/timokoesters/ruma?rev=a2d93500e1dbc87e7032a3c74f3b2479a7f84e93#a2d93500e1dbc87e7032a3c74f3b2479a7f84e93" +version = "0.3.0" +source = "git+https://github.com/ruma/ruma?rev=f5ab038e22421ed338396ece977b6b2844772ced#f5ab038e22421ed338396ece977b6b2844772ced" dependencies = [ "itertools 0.10.1", "js_int", @@ -3022,7 +3052,7 @@ version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c47440f2979c4cd3138922840eec122e3c0ba2148bc290f756bd7fd60fc97fff" dependencies = [ - "opentelemetry", + "opentelemetry 0.15.0", "tracing", "tracing-core", "tracing-log", diff --git a/Cargo.toml b/Cargo.toml index 3d18bfb7..69b54c86 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,8 +18,8 @@ edition = "2018" rocket = { version = "0.5.0-rc.1", features = ["tls"] } # Used to handle requests # Used for matrix spec type definitions and helpers -#ruma = { git = "https://github.com/ruma/ruma", rev = "eb19b0e08a901b87d11b3be0890ec788cc760492", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } -ruma = { git = "https://github.com/timokoesters/ruma", rev = "a2d93500e1dbc87e7032a3c74f3b2479a7f84e93", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } +ruma = { git = "https://github.com/ruma/ruma", rev = "f5ab038e22421ed338396ece977b6b2844772ced", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } +#ruma = { git = "https://github.com/timokoesters/ruma", rev = "995ccea20f5f6d4a8fb22041749ed4de22fa1b6a", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } #ruma = { path = "../ruma/crates/ruma", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } # Used for long polling and federation sender, should be the same as rocket::tokio @@ -66,11 +66,11 @@ regex = "1.5.4" jsonwebtoken = "7.2.0" # Performance measurements tracing = { version = "0.1.26", features = ["release_max_level_warn"] } -opentelemetry = "0.15.0" tracing-subscriber = "0.2.19" tracing-opentelemetry = "0.14.0" tracing-flame = "0.1.0" -opentelemetry-jaeger = "0.14.0" +opentelemetry = { version = "0.16.0", features = ["rt-tokio"] } +opentelemetry-jaeger = { version = "0.15.0", features = ["rt-tokio"] } pretty_env_logger = "0.4.0" lru-cache = "0.1.2" rusqlite = { version = "0.25.3", optional = true, features = ["bundled"] } diff --git a/src/client_server/account.rs b/src/client_server/account.rs index b00882a6..e68c957f 100644 --- a/src/client_server/account.rs +++ b/src/client_server/account.rs @@ -292,6 +292,7 @@ pub async fn register_route( is_direct: None, third_party_invite: None, blurhash: None, + reason: None, }) .expect("event is valid, we just created it"), unsigned: None, @@ -457,6 +458,7 @@ pub async fn register_route( is_direct: None, third_party_invite: None, blurhash: None, + reason: None, }) .expect("event is valid, we just created it"), unsigned: None, @@ -478,6 +480,7 @@ pub async fn register_route( is_direct: None, third_party_invite: None, blurhash: None, + reason: None, }) .expect("event is valid, we just created it"), unsigned: None, @@ -683,6 +686,7 @@ pub async fn deactivate_route( is_direct: None, third_party_invite: None, blurhash: None, + reason: None, }; let mutex_state = Arc::clone( diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index de6fa5a1..222d204f 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -262,6 +262,7 @@ pub async fn ban_user_route( is_direct: None, third_party_invite: None, blurhash: db.users.blurhash(&body.user_id)?, + reason: None, }), |event| { let mut event = serde_json::from_value::>( @@ -563,6 +564,7 @@ async fn join_room_by_id_helper( is_direct: None, third_party_invite: None, blurhash: db.users.blurhash(&sender_user)?, + reason: None, }) .expect("event is valid, we just created it"), ); @@ -695,6 +697,7 @@ async fn join_room_by_id_helper( is_direct: None, third_party_invite: None, blurhash: db.users.blurhash(&sender_user)?, + reason: None, }; db.rooms.build_and_append_pdu( @@ -846,6 +849,7 @@ pub async fn invite_helper<'a>( membership: MembershipState::Invite, third_party_invite: None, blurhash: None, + reason: None, }) .expect("member event is valid value"); @@ -1040,6 +1044,7 @@ pub async fn invite_helper<'a>( is_direct: Some(is_direct), third_party_invite: None, blurhash: db.users.blurhash(&user_id)?, + reason: None, }) .expect("event is valid, we just created it"), unsigned: None, diff --git a/src/client_server/room.rs b/src/client_server/room.rs index c323be4a..25412788 100644 --- a/src/client_server/room.rs +++ b/src/client_server/room.rs @@ -107,6 +107,7 @@ pub async fn create_room_route( is_direct: Some(body.is_direct), third_party_invite: None, blurhash: db.users.blurhash(&sender_user)?, + reason: None, }) .expect("event is valid, we just created it"), unsigned: None, @@ -517,6 +518,7 @@ pub async fn upgrade_room_route( is_direct: None, third_party_invite: None, blurhash: db.users.blurhash(&sender_user)?, + reason: None, }) .expect("event is valid, we just created it"), unsigned: None, diff --git a/src/client_server/session.rs b/src/client_server/session.rs index d4d3c033..dada2d50 100644 --- a/src/client_server/session.rs +++ b/src/client_server/session.rs @@ -3,7 +3,10 @@ use crate::{database::DatabaseGuard, utils, ConduitResult, Error, Ruma}; use ruma::{ api::client::{ error::ErrorKind, - r0::session::{get_login_types, login, logout, logout_all}, + r0::{ + session::{get_login_types, login, logout, logout_all}, + uiaa::IncomingUserIdentifier, + }, }, UserId, }; @@ -60,7 +63,7 @@ pub async fn login_route( identifier, password, } => { - let username = if let login::IncomingUserIdentifier::MatrixId(matrix_id) = identifier { + let username = if let IncomingUserIdentifier::MatrixId(matrix_id) = identifier { matrix_id } else { return Err(Error::BadRequest(ErrorKind::Forbidden, "Bad login type.")); diff --git a/src/database.rs b/src/database.rs index 5ad2add8..bfc33f26 100644 --- a/src/database.rs +++ b/src/database.rs @@ -280,7 +280,7 @@ impl Database { shorteventid_cache: Mutex::new(LruCache::new(1_000_000)), eventidshort_cache: Mutex::new(LruCache::new(1_000_000)), statekeyshort_cache: Mutex::new(LruCache::new(1_000_000)), - stateinfo_cache: Mutex::new(LruCache::new(1000)), + stateinfo_cache: Mutex::new(LruCache::new(50)), }, account_data: account_data::AccountData { roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?, diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 600566c1..adb748d0 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -110,6 +110,7 @@ pub struct Rooms { impl Rooms { /// Builds a StateMap by iterating over all keys that start /// with state_hash, this gives the full state for the given state_hash. + #[tracing::instrument(skip(self))] pub fn state_full_ids(&self, shortstatehash: u64) -> Result> { let full_state = self .load_shortstatehash_info(shortstatehash)? @@ -122,6 +123,7 @@ impl Rooms { .collect() } + #[tracing::instrument(skip(self))] pub fn state_full( &self, shortstatehash: u64, @@ -220,6 +222,7 @@ impl Rooms { } /// This fetches auth events from the current state. + #[tracing::instrument(skip(self))] pub fn get_auth_events( &self, room_id: &RoomId, @@ -261,6 +264,7 @@ impl Rooms { } /// Checks if a room exists. + #[tracing::instrument(skip(self))] pub fn exists(&self, room_id: &RoomId) -> Result { let prefix = match self.get_shortroomid(room_id)? { Some(b) => b.to_be_bytes().to_vec(), @@ -277,6 +281,7 @@ impl Rooms { } /// Checks if a room exists. + #[tracing::instrument(skip(self))] pub fn first_pdu_in_room(&self, room_id: &RoomId) -> Result>> { let prefix = self .get_shortroomid(room_id)? @@ -300,6 +305,7 @@ impl Rooms { /// Force the creation of a new StateHash and insert it into the db. /// /// Whatever `state` is supplied to `force_state` becomes the new current room state snapshot. + #[tracing::instrument(skip(self, new_state, db))] pub fn force_state( &self, room_id: &RoomId, @@ -412,6 +418,7 @@ impl Rooms { } /// Returns a stack with info on shortstatehash, full state, added diff and removed diff for the selected shortstatehash and each parent layer. + #[tracing::instrument(skip(self))] pub fn load_shortstatehash_info( &self, shortstatehash: u64, @@ -480,6 +487,7 @@ impl Rooms { } } + #[tracing::instrument(skip(self, globals))] pub fn compress_state_event( &self, shortstatekey: u64, @@ -495,6 +503,7 @@ impl Rooms { Ok(v.try_into().expect("we checked the size above")) } + #[tracing::instrument(skip(self, compressed_event))] pub fn parse_compressed_state_event( &self, compressed_event: CompressedStateEvent, @@ -518,6 +527,13 @@ impl Rooms { /// * `statediffremoved` - Removed from base. Each vec is shortstatekey+shorteventid /// * `diff_to_sibling` - Approximately how much the diff grows each time for this layer /// * `parent_states` - A stack with info on shortstatehash, full state, added diff and removed diff for each parent layer + #[tracing::instrument(skip( + self, + statediffnew, + statediffremoved, + diff_to_sibling, + parent_states + ))] pub fn save_state_from_diff( &self, shortstatehash: u64, @@ -642,6 +658,7 @@ impl Rooms { } /// Returns (shortstatehash, already_existed) + #[tracing::instrument(skip(self, globals))] fn get_or_create_shortstatehash( &self, state_hash: &StateHashId, @@ -662,6 +679,7 @@ impl Rooms { }) } + #[tracing::instrument(skip(self, globals))] pub fn get_or_create_shorteventid( &self, event_id: &EventId, @@ -692,6 +710,7 @@ impl Rooms { Ok(short) } + #[tracing::instrument(skip(self))] pub fn get_shortroomid(&self, room_id: &RoomId) -> Result> { self.roomid_shortroomid .get(&room_id.as_bytes())? @@ -702,6 +721,7 @@ impl Rooms { .transpose() } + #[tracing::instrument(skip(self))] pub fn get_shortstatekey( &self, event_type: &EventType, @@ -739,6 +759,7 @@ impl Rooms { Ok(short) } + #[tracing::instrument(skip(self, globals))] pub fn get_or_create_shortroomid( &self, room_id: &RoomId, @@ -756,6 +777,7 @@ impl Rooms { }) } + #[tracing::instrument(skip(self, globals))] pub fn get_or_create_shortstatekey( &self, event_type: &EventType, @@ -794,6 +816,7 @@ impl Rooms { Ok(short) } + #[tracing::instrument(skip(self))] pub fn get_eventid_from_short(&self, shorteventid: u64) -> Result { if let Some(id) = self .shorteventid_cache @@ -876,12 +899,14 @@ impl Rooms { } /// Returns the `count` of this pdu's id. + #[tracing::instrument(skip(self))] pub fn get_pdu_count(&self, event_id: &EventId) -> Result> { self.eventid_pduid .get(event_id.as_bytes())? .map_or(Ok(None), |pdu_id| self.pdu_count(&pdu_id).map(Some)) } + #[tracing::instrument(skip(self))] pub fn latest_pdu_count(&self, room_id: &RoomId) -> Result { let prefix = self .get_shortroomid(room_id)? @@ -902,6 +927,7 @@ impl Rooms { } /// Returns the json of a pdu. + #[tracing::instrument(skip(self))] pub fn get_pdu_json(&self, event_id: &EventId) -> Result> { self.eventid_pduid .get(event_id.as_bytes())? @@ -920,6 +946,7 @@ impl Rooms { } /// Returns the json of a pdu. + #[tracing::instrument(skip(self))] pub fn get_outlier_pdu_json(&self, event_id: &EventId) -> Result> { self.eventid_outlierpdu .get(event_id.as_bytes())? @@ -930,6 +957,7 @@ impl Rooms { } /// Returns the json of a pdu. + #[tracing::instrument(skip(self))] pub fn get_non_outlier_pdu_json( &self, event_id: &EventId, @@ -951,6 +979,7 @@ impl Rooms { } /// Returns the pdu's id. + #[tracing::instrument(skip(self))] pub fn get_pdu_id(&self, event_id: &EventId) -> Result>> { self.eventid_pduid .get(event_id.as_bytes())? @@ -960,6 +989,7 @@ impl Rooms { /// Returns the pdu. /// /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. + #[tracing::instrument(skip(self))] pub fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result> { self.eventid_pduid .get(event_id.as_bytes())? @@ -980,6 +1010,7 @@ impl Rooms { /// Returns the pdu. /// /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. + #[tracing::instrument(skip(self))] pub fn get_pdu(&self, event_id: &EventId) -> Result>> { if let Some(p) = self.pdu_cache.lock().unwrap().get_mut(&event_id) { return Ok(Some(Arc::clone(p))); @@ -1019,6 +1050,7 @@ impl Rooms { /// Returns the pdu. /// /// This does __NOT__ check the outliers `Tree`. + #[tracing::instrument(skip(self))] pub fn get_pdu_from_id(&self, pdu_id: &[u8]) -> Result> { self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| { Ok(Some( @@ -1029,6 +1061,7 @@ impl Rooms { } /// Returns the pdu as a `BTreeMap`. + #[tracing::instrument(skip(self))] pub fn get_pdu_json_from_id(&self, pdu_id: &[u8]) -> Result> { self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| { Ok(Some( @@ -1039,6 +1072,7 @@ impl Rooms { } /// Removes a pdu and creates a new one with the same id. + #[tracing::instrument(skip(self))] fn replace_pdu(&self, pdu_id: &[u8], pdu: &PduEvent) -> Result<()> { if self.pduid_pdu.get(&pdu_id)?.is_some() { self.pduid_pdu.insert( @@ -2298,6 +2332,7 @@ impl Rooms { Ok(()) } + #[tracing::instrument(skip(self))] pub fn update_joined_count(&self, room_id: &RoomId) -> Result<()> { let mut joinedcount = 0_u64; let mut joined_servers = HashSet::new(); @@ -2347,6 +2382,7 @@ impl Rooms { Ok(()) } + #[tracing::instrument(skip(self, db))] pub async fn leave_room( &self, user_id: &UserId, @@ -2419,6 +2455,7 @@ impl Rooms { Ok(()) } + #[tracing::instrument(skip(self, db))] async fn remote_leave_room( &self, user_id: &UserId, @@ -2650,6 +2687,7 @@ impl Rooms { }) } + #[tracing::instrument(skip(self))] pub fn search_pdus<'a>( &'a self, room_id: &RoomId, @@ -2809,6 +2847,7 @@ impl Rooms { }) } + #[tracing::instrument(skip(self))] pub fn room_joined_count(&self, room_id: &RoomId) -> Result> { Ok(self .roomid_joinedcount diff --git a/src/database/uiaa.rs b/src/database/uiaa.rs index 1372fef5..8a3fe4f0 100644 --- a/src/database/uiaa.rs +++ b/src/database/uiaa.rs @@ -4,11 +4,14 @@ use crate::{client_server::SESSION_ID_LENGTH, utils, Error, Result}; use ruma::{ api::client::{ error::ErrorKind, - r0::uiaa::{IncomingAuthData, UiaaInfo}, + r0::uiaa::{ + IncomingAuthData, IncomingPassword, IncomingUserIdentifier::MatrixId, UiaaInfo, + }, }, signatures::CanonicalJsonValue, DeviceId, UserId, }; +use tracing::error; use super::abstraction::Tree; @@ -49,126 +52,91 @@ impl Uiaa { users: &super::users::Users, globals: &super::globals::Globals, ) -> Result<(bool, UiaaInfo)> { - if let IncomingAuthData::DirectRequest { - kind, - session, - auth_parameters, - } = &auth - { - let mut uiaainfo = session - .as_ref() - .map(|session| self.get_uiaa_session(&user_id, &device_id, session)) - .unwrap_or_else(|| Ok(uiaainfo.clone()))?; + let mut uiaainfo = auth + .session() + .map(|session| self.get_uiaa_session(&user_id, &device_id, session)) + .unwrap_or_else(|| Ok(uiaainfo.clone()))?; - if uiaainfo.session.is_none() { - uiaainfo.session = Some(utils::random_string(SESSION_ID_LENGTH)); - } + if uiaainfo.session.is_none() { + uiaainfo.session = Some(utils::random_string(SESSION_ID_LENGTH)); + } + match auth { // Find out what the user completed - match &**kind { - "m.login.password" => { - let identifier = auth_parameters.get("identifier").ok_or(Error::BadRequest( - ErrorKind::MissingParam, - "m.login.password needs identifier.", - ))?; - - let identifier_type = identifier.get("type").ok_or(Error::BadRequest( - ErrorKind::MissingParam, - "Identifier needs a type.", - ))?; - - if identifier_type != "m.id.user" { + IncomingAuthData::Password(IncomingPassword { + identifier, + password, + .. + }) => { + let username = match identifier { + MatrixId(username) => username, + _ => { return Err(Error::BadRequest( ErrorKind::Unrecognized, "Identifier type not recognized.", - )); + )) } + }; - let username = identifier - .get("user") - .ok_or(Error::BadRequest( - ErrorKind::MissingParam, - "Identifier needs user field.", - ))? - .as_str() - .ok_or(Error::BadRequest( - ErrorKind::BadJson, - "User is not a string.", - ))?; - - let user_id = UserId::parse_with_server_name(username, globals.server_name()) + let user_id = + UserId::parse_with_server_name(username.clone(), globals.server_name()) .map_err(|_| { - Error::BadRequest(ErrorKind::InvalidParam, "User ID is invalid.") - })?; + Error::BadRequest(ErrorKind::InvalidParam, "User ID is invalid.") + })?; - let password = auth_parameters - .get("password") - .ok_or(Error::BadRequest( - ErrorKind::MissingParam, - "Password is missing.", - ))? - .as_str() - .ok_or(Error::BadRequest( - ErrorKind::BadJson, - "Password is not a string.", - ))?; + // Check if password is correct + if let Some(hash) = users.password_hash(&user_id)? { + let hash_matches = + argon2::verify_encoded(&hash, password.as_bytes()).unwrap_or(false); - // Check if password is correct - if let Some(hash) = users.password_hash(&user_id)? { - let hash_matches = - argon2::verify_encoded(&hash, password.as_bytes()).unwrap_or(false); - - if !hash_matches { - uiaainfo.auth_error = Some(ruma::api::client::error::ErrorBody { - kind: ErrorKind::Forbidden, - message: "Invalid username or password.".to_owned(), - }); - return Ok((false, uiaainfo)); - } - } - - // Password was correct! Let's add it to `completed` - uiaainfo.completed.push("m.login.password".to_owned()); - } - "m.login.dummy" => { - uiaainfo.completed.push("m.login.dummy".to_owned()); - } - k => panic!("type not supported: {}", k), - } - - // Check if a flow now succeeds - let mut completed = false; - 'flows: for flow in &mut uiaainfo.flows { - for stage in &flow.stages { - if !uiaainfo.completed.contains(stage) { - continue 'flows; + if !hash_matches { + uiaainfo.auth_error = Some(ruma::api::client::error::ErrorBody { + kind: ErrorKind::Forbidden, + message: "Invalid username or password.".to_owned(), + }); + return Ok((false, uiaainfo)); } } - // We didn't break, so this flow succeeded! - completed = true; - } - if !completed { - self.update_uiaa_session( - user_id, - device_id, - uiaainfo.session.as_ref().expect("session is always set"), - Some(&uiaainfo), - )?; - return Ok((false, uiaainfo)); + // Password was correct! Let's add it to `completed` + uiaainfo.completed.push("m.login.password".to_owned()); } + IncomingAuthData::Dummy(_) => { + uiaainfo.completed.push("m.login.dummy".to_owned()); + } + k => error!("type not supported: {:?}", k), + } - // UIAA was successful! Remove this session and return true + // Check if a flow now succeeds + let mut completed = false; + 'flows: for flow in &mut uiaainfo.flows { + for stage in &flow.stages { + if !uiaainfo.completed.contains(stage) { + continue 'flows; + } + } + // We didn't break, so this flow succeeded! + completed = true; + } + + if !completed { self.update_uiaa_session( user_id, device_id, uiaainfo.session.as_ref().expect("session is always set"), - None, + Some(&uiaainfo), )?; - Ok((true, uiaainfo)) - } else { - panic!("FallbackAcknowledgement is not supported yet"); + return Ok((false, uiaainfo)); } + + // UIAA was successful! Remove this session and return true + self.update_uiaa_session( + user_id, + device_id, + uiaainfo.session.as_ref().expect("session is always set"), + None, + )?; + Ok((true, uiaainfo)) } fn set_uiaa_request( diff --git a/src/main.rs b/src/main.rs index 5a6f8c76..72f753fe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use database::Config; pub use database::Database; pub use error::{Error, Result}; -use opentelemetry::trace::Tracer; +use opentelemetry::trace::{FutureExt, Tracer}; pub use pdu::PduEvent; pub use rocket::State; use ruma::api::client::error::ErrorKind; @@ -220,14 +220,17 @@ async fn main() { }; if config.allow_jaeger { + opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); let tracer = opentelemetry_jaeger::new_pipeline() - .with_service_name("conduit") - .install_simple() + .install_batch(opentelemetry::runtime::Tokio) .unwrap(); let span = tracer.start("conduit"); - start.await; + start.with_current_context().await; drop(span); + + println!("exporting"); + opentelemetry::global::shutdown_tracer_provider(); } else { std::env::set_var("RUST_LOG", &config.log); diff --git a/src/pdu.rs b/src/pdu.rs index 00eda5b1..1016fe66 100644 --- a/src/pdu.rs +++ b/src/pdu.rs @@ -12,7 +12,7 @@ use ruma::{ use serde::{Deserialize, Serialize}; use serde_json::json; use std::{cmp::Ordering, collections::BTreeMap, convert::TryFrom}; -use tracing::error; +use tracing::warn; #[derive(Clone, Deserialize, Serialize, Debug)] pub struct PduEvent { @@ -322,7 +322,7 @@ pub(crate) fn gen_event_id_canonical_json( pdu: &Raw, ) -> crate::Result<(EventId, CanonicalJsonObject)> { let value = serde_json::from_str(pdu.json().get()).map_err(|e| { - error!("{:?}: {:?}", pdu, e); + warn!("Error parsing incoming event {:?}: {:?}", pdu, e); Error::BadServerResponse("Invalid PDU in server response") })?; diff --git a/src/server_server.rs b/src/server_server.rs index 49f225f1..56b28f27 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -111,7 +111,7 @@ impl FedDest { } } -#[tracing::instrument(skip(globals))] +#[tracing::instrument(skip(globals, request))] pub async fn send_request( globals: &crate::database::globals::Globals, destination: &ServerName, @@ -501,7 +501,7 @@ pub fn get_server_keys_route(db: DatabaseGuard) -> Json { ) .unwrap(); - Json(ruma::serde::to_canonical_json_string(&response).expect("JSON is canonical")) + Json(serde_json::to_string(&response).expect("JSON is canonical")) } #[cfg_attr(feature = "conduit_bin", get("/_matrix/key/v2/server/<_>"))] @@ -927,12 +927,17 @@ pub async fn handle_incoming_pdu<'a>( ); eventid_info.insert(prev_event_id.clone(), (pdu, json)); } else { + // Time based check failed graph.insert(prev_event_id.clone(), HashSet::new()); eventid_info.insert(prev_event_id.clone(), (pdu, json)); } } else { + // Get json failed graph.insert(prev_event_id.clone(), HashSet::new()); } + } else { + // Fetch and handle failed + graph.insert(prev_event_id.clone(), HashSet::new()); } } @@ -956,7 +961,9 @@ pub async fn handle_incoming_pdu<'a>( for prev_id in dbg!(sorted) { if let Some((pdu, json)) = eventid_info.remove(&prev_id) { - upgrade_outlier_to_timeline_pdu( + let start_time = Instant::now(); + let event_id = pdu.event_id.clone(); + if let Err(e) = upgrade_outlier_to_timeline_pdu( pdu, json, &create_event, @@ -965,7 +972,17 @@ pub async fn handle_incoming_pdu<'a>( room_id, pub_key_map, ) - .await?; + .await + { + warn!("Prev event {} failed: {}", event_id, e); + } + let elapsed = start_time.elapsed(); + warn!( + "Handling prev event {} took {}m{}s", + event_id, + elapsed.as_secs() / 60, + elapsed.as_secs() % 60 + ); } } @@ -981,6 +998,7 @@ pub async fn handle_incoming_pdu<'a>( .await } +#[tracing::instrument(skip(origin, create_event, event_id, room_id, value, db, pub_key_map))] fn handle_outlier_pdu<'a>( origin: &'a ServerName, create_event: &'a PduEvent, @@ -1141,6 +1159,7 @@ fn handle_outlier_pdu<'a>( }) } +#[tracing::instrument(skip(incoming_pdu, val, create_event, origin, db, room_id, pub_key_map))] async fn upgrade_outlier_to_timeline_pdu( incoming_pdu: Arc, val: BTreeMap, @@ -1352,41 +1371,6 @@ async fn upgrade_outlier_to_timeline_pdu( // Only keep those extremities were not referenced yet extremities.retain(|id| !matches!(db.rooms.is_event_referenced(&room_id, id), Ok(true))); - let mut extremity_statehashes = Vec::new(); - - for id in &extremities { - match db - .rooms - .get_pdu(&id) - .map_err(|_| "Failed to ask db for pdu.".to_owned())? - { - Some(leaf_pdu) => { - extremity_statehashes.push(( - db.rooms - .pdu_shortstatehash(&leaf_pdu.event_id) - .map_err(|_| "Failed to ask db for pdu state hash.".to_owned())? - .ok_or_else(|| { - error!( - "Found extremity pdu with no statehash in db: {:?}", - leaf_pdu - ); - "Found pdu with no statehash in db.".to_owned() - })?, - Some(leaf_pdu), - )); - } - _ => { - error!("Missing state snapshot for {:?}", id); - return Err("Missing state snapshot.".to_owned()); - } - } - } - - // 12. Ensure that the state is derived from the previous current state (i.e. we calculated - // by doing state res where one of the inputs was a previously trusted set of state, - // don't just trust a set of state we got from a remote). - - // We do this by adding the current state to the list of fork states let current_statehash = db .rooms .current_shortstatehash(&room_id) @@ -1398,91 +1382,138 @@ async fn upgrade_outlier_to_timeline_pdu( .state_full(current_statehash) .map_err(|_| "Failed to load room state.")?; - extremity_statehashes.push((current_statehash.clone(), None)); + if incoming_pdu.state_key.is_some() { + let mut extremity_statehashes = Vec::new(); - let mut fork_states = Vec::new(); - for (statehash, leaf_pdu) in extremity_statehashes { - let mut leaf_state = db - .rooms - .state_full(statehash) - .map_err(|_| "Failed to ask db for room state.".to_owned())?; - - if let Some(leaf_pdu) = leaf_pdu { - if let Some(state_key) = &leaf_pdu.state_key { - // Now it's the state after - let key = (leaf_pdu.kind.clone(), state_key.clone()); - leaf_state.insert(key, leaf_pdu); + for id in &extremities { + match db + .rooms + .get_pdu(&id) + .map_err(|_| "Failed to ask db for pdu.".to_owned())? + { + Some(leaf_pdu) => { + extremity_statehashes.push(( + db.rooms + .pdu_shortstatehash(&leaf_pdu.event_id) + .map_err(|_| "Failed to ask db for pdu state hash.".to_owned())? + .ok_or_else(|| { + error!( + "Found extremity pdu with no statehash in db: {:?}", + leaf_pdu + ); + "Found pdu with no statehash in db.".to_owned() + })?, + Some(leaf_pdu), + )); + } + _ => { + error!("Missing state snapshot for {:?}", id); + return Err("Missing state snapshot.".to_owned()); + } } } - fork_states.push(leaf_state); - } + // 12. Ensure that the state is derived from the previous current state (i.e. we calculated + // by doing state res where one of the inputs was a previously trusted set of state, + // don't just trust a set of state we got from a remote). - // We also add state after incoming event to the fork states - extremities.insert(incoming_pdu.event_id.clone()); - let mut state_after = state_at_incoming_event.clone(); - if let Some(state_key) = &incoming_pdu.state_key { - state_after.insert( - (incoming_pdu.kind.clone(), state_key.clone()), - incoming_pdu.clone(), - ); - } - fork_states.push(state_after.clone()); + // We do this by adding the current state to the list of fork states - let mut update_state = false; - // 14. Use state resolution to find new room state - let new_room_state = if fork_states.is_empty() { - return Err("State is empty.".to_owned()); - } else if fork_states.iter().skip(1).all(|f| &fork_states[0] == f) { - // There was only one state, so it has to be the room's current state (because that is - // always included) - fork_states[0] - .iter() - .map(|(k, pdu)| (k.clone(), pdu.event_id.clone())) - .collect() - } else { - // We do need to force an update to this room's state - update_state = true; + extremity_statehashes.push((current_statehash.clone(), None)); - let fork_states = &fork_states - .into_iter() - .map(|map| { - map.into_iter() - .map(|(k, v)| (k, v.event_id.clone())) - .collect::>() - }) - .collect::>(); + let mut fork_states = Vec::new(); + for (statehash, leaf_pdu) in extremity_statehashes { + let mut leaf_state = db + .rooms + .state_full(statehash) + .map_err(|_| "Failed to ask db for room state.".to_owned())?; - let mut auth_chain_sets = Vec::new(); - for state in fork_states { - auth_chain_sets.push( - get_auth_chain(state.iter().map(|(_, id)| id.clone()).collect(), db) - .map_err(|_| "Failed to load auth chain.".to_owned())? - .collect(), + if let Some(leaf_pdu) = leaf_pdu { + if let Some(state_key) = &leaf_pdu.state_key { + // Now it's the state after + let key = (leaf_pdu.kind.clone(), state_key.clone()); + leaf_state.insert(key, leaf_pdu); + } + } + + fork_states.push(leaf_state); + } + + // We also add state after incoming event to the fork states + let mut state_after = state_at_incoming_event.clone(); + if let Some(state_key) = &incoming_pdu.state_key { + state_after.insert( + (incoming_pdu.kind.clone(), state_key.clone()), + incoming_pdu.clone(), ); } + fork_states.push(state_after.clone()); - let state = match state_res::StateResolution::resolve( - &room_id, - room_version_id, - fork_states, - auth_chain_sets, - |id| { - let res = db.rooms.get_pdu(id); - if let Err(e) = &res { - error!("LOOK AT ME Failed to fetch event: {}", e); - } - res.ok().flatten() - }, - ) { - Ok(new_state) => new_state, - Err(_) => { - return Err("State resolution failed, either an event could not be found or deserialization".into()); + let mut update_state = false; + // 14. Use state resolution to find new room state + let new_room_state = if fork_states.is_empty() { + return Err("State is empty.".to_owned()); + } else if fork_states.iter().skip(1).all(|f| &fork_states[0] == f) { + // There was only one state, so it has to be the room's current state (because that is + // always included) + fork_states[0] + .iter() + .map(|(k, pdu)| (k.clone(), pdu.event_id.clone())) + .collect() + } else { + // We do need to force an update to this room's state + update_state = true; + + let fork_states = &fork_states + .into_iter() + .map(|map| { + map.into_iter() + .map(|(k, v)| (k, v.event_id.clone())) + .collect::>() + }) + .collect::>(); + + let mut auth_chain_sets = Vec::new(); + for state in fork_states { + auth_chain_sets.push( + get_auth_chain(state.iter().map(|(_, id)| id.clone()).collect(), db) + .map_err(|_| "Failed to load auth chain.".to_owned())? + .collect(), + ); } + + let state = match state_res::StateResolution::resolve( + &room_id, + room_version_id, + fork_states, + auth_chain_sets, + |id| { + let res = db.rooms.get_pdu(id); + if let Err(e) = &res { + error!("LOOK AT ME Failed to fetch event: {}", e); + } + res.ok().flatten() + }, + ) { + Ok(new_state) => new_state, + Err(_) => { + return Err("State resolution failed, either an event could not be found or deserialization".into()); + } + }; + + state }; - state - }; + // Set the new room state to the resolved state + if update_state { + db.rooms + .force_state(&room_id, new_room_state, &db) + .map_err(|_| "Failed to set new room state.".to_owned())?; + } + debug!("Updated resolved state"); + } + + extremities.insert(incoming_pdu.event_id.clone()); debug!("starting soft fail auth check"); // 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it @@ -1516,14 +1547,6 @@ async fn upgrade_outlier_to_timeline_pdu( warn!("Event was soft failed: {:?}", incoming_pdu); } - // Set the new room state to the resolved state - if update_state { - db.rooms - .force_state(&room_id, new_room_state, &db) - .map_err(|_| "Failed to set new room state.".to_owned())?; - } - debug!("Updated resolved state"); - if soft_fail { // Soft fail, we leave the event as an outlier but don't add it to the timeline return Err("Event has been soft failed".into()); @@ -1543,7 +1566,7 @@ async fn upgrade_outlier_to_timeline_pdu( /// b. Look at outlier pdu tree /// c. Ask origin server over federation /// d. TODO: Ask other servers over federation? -//#[tracing::instrument(skip(db, key_map, auth_cache))] +#[tracing::instrument(skip(db, origin, events, create_event, room_id, pub_key_map))] pub(crate) fn fetch_and_handle_outliers<'a>( db: &'a Database, origin: &'a ServerName, @@ -1562,15 +1585,16 @@ pub(crate) fn fetch_and_handle_outliers<'a>( let mut pdus = vec![]; for id in events { + info!("loading {}", id); if let Some((time, tries)) = db.globals.bad_event_ratelimiter.read().unwrap().get(&id) { // Exponential backoff - let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); + let mut min_elapsed_duration = Duration::from_secs(5 * 60) * (*tries) * (*tries); if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { min_elapsed_duration = Duration::from_secs(60 * 60 * 24); } if time.elapsed() < min_elapsed_duration { - debug!("Backing off from {}", id); + info!("Backing off from {}", id); continue; } } @@ -1586,7 +1610,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>( } Ok(None) => { // c. Ask origin server over federation - debug!("Fetching {} over federation.", id); + info!("Fetching {} over federation.", id); match db .sending .send_federation_request( @@ -1597,11 +1621,14 @@ pub(crate) fn fetch_and_handle_outliers<'a>( .await { Ok(res) => { - debug!("Got {} over federation", id); + info!("Got {} over federation", id); let (event_id, value) = match crate::pdu::gen_event_id_canonical_json(&res.pdu) { Ok(t) => t, - Err(_) => continue, + Err(_) => { + back_off(id.clone()); + continue; + } }; // This will also fetch the auth chain @@ -1632,7 +1659,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>( } } Err(e) => { - debug!("Error loading {}: {}", id, e); + warn!("Error loading {}: {}", id, e); continue; } }; @@ -1644,7 +1671,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>( /// Search the DB for the signing keys of the given server, if we don't have them /// fetch them from the server and save to our DB. -#[tracing::instrument(skip(db))] +#[tracing::instrument(skip(db, origin, signature_ids))] pub(crate) async fn fetch_signing_keys( db: &Database, origin: &ServerName, @@ -1885,6 +1912,7 @@ fn append_incoming_pdu( Ok(pdu_id) } +#[tracing::instrument(skip(starting_events, db))] fn get_auth_chain( starting_events: Vec, db: &Database, @@ -1921,6 +1949,7 @@ fn get_auth_chain( .filter_map(move |sid| db.rooms.get_eventid_from_short(sid).ok())) } +#[tracing::instrument(skip(event_id, db))] fn get_auth_chain_inner(event_id: &EventId, db: &Database) -> Result> { let mut todo = vec![event_id.clone()]; let mut found = HashSet::new(); @@ -2204,6 +2233,7 @@ pub fn create_join_event_template_route( is_direct: None, membership: MembershipState::Join, third_party_invite: None, + reason: None, }) .expect("member event is valid value"); @@ -2680,6 +2710,7 @@ pub async fn claim_keys_route( .into()) } +#[tracing::instrument(skip(event, pub_key_map, db))] pub async fn fetch_required_signing_keys( event: &BTreeMap, pub_key_map: &RwLock>>,