From 9410d3ef9cc2a19096149ac69ba3422231c760b0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Timo=20K=C3=B6sters?= <timo@koesters.xyz>
Date: Thu, 12 Aug 2021 17:55:16 +0200
Subject: [PATCH] fix: long prev event fetch times for huge rooms

---
 src/database.rs       |  2 +-
 src/database/rooms.rs | 49 ++++++++++++++++++-------------------------
 src/server_server.rs  | 36 ++++++++++++++++++++++++-------
 3 files changed, 49 insertions(+), 38 deletions(-)

diff --git a/src/database.rs b/src/database.rs
index 7996057d..7a17e53e 100644
--- a/src/database.rs
+++ b/src/database.rs
@@ -272,7 +272,7 @@ impl Database {
                 referencedevents: builder.open_tree("referencedevents")?,
                 pdu_cache: Mutex::new(LruCache::new(100_000)),
                 auth_chain_cache: Mutex::new(LruCache::new(100_000)),
-                shorteventid_cache: Mutex::new(LruCache::new(100_000)),
+                shorteventid_cache: Mutex::new(LruCache::new(1_000_000)),
             },
             account_data: account_data::AccountData {
                 roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?,
diff --git a/src/database/rooms.rs b/src/database/rooms.rs
index 246aa0ba..88878e9d 100644
--- a/src/database/rooms.rs
+++ b/src/database/rooms.rs
@@ -99,15 +99,11 @@ impl Rooms {
         Ok(self
             .stateid_shorteventid
             .scan_prefix(shortstatehash.to_be_bytes().to_vec())
-            .map(|(_, bytes)| self.shorteventid_eventid.get(&bytes).ok().flatten())
-            .flatten()
-            .map(|bytes| {
-                EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| {
-                    Error::bad_database("EventID in stateid_shorteventid is invalid unicode.")
-                })?)
-                .map_err(|_| Error::bad_database("EventId in stateid_shorteventid is invalid."))
+            .map(|(_, bytes)| {
+                self.get_eventid_from_short(utils::u64_from_bytes(&bytes).unwrap())
+                    .ok()
             })
-            .filter_map(|r| r.ok())
+            .flatten()
             .collect())
     }
 
@@ -118,15 +114,11 @@ impl Rooms {
         let state = self
             .stateid_shorteventid
             .scan_prefix(shortstatehash.to_be_bytes().to_vec())
-            .map(|(_, bytes)| self.shorteventid_eventid.get(&bytes).ok().flatten())
-            .flatten()
-            .map(|bytes| {
-                EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| {
-                    Error::bad_database("EventID in stateid_shorteventid is invalid unicode.")
-                })?)
-                .map_err(|_| Error::bad_database("EventId in stateid_shorteventid is invalid."))
+            .map(|(_, bytes)| {
+                self.get_eventid_from_short(utils::u64_from_bytes(&bytes).unwrap())
+                    .ok()
             })
-            .filter_map(|r| r.ok())
+            .flatten()
             .map(|eventid| self.get_pdu(&eventid))
             .filter_map(|r| r.ok().flatten())
             .map(|pdu| {
@@ -168,15 +160,10 @@ impl Rooms {
             Ok(self
                 .stateid_shorteventid
                 .get(&stateid)?
-                .map(|bytes| self.shorteventid_eventid.get(&bytes).ok().flatten())
-                .flatten()
                 .map(|bytes| {
-                    EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| {
-                        Error::bad_database("EventID in stateid_shorteventid is invalid unicode.")
-                    })?)
-                    .map_err(|_| Error::bad_database("EventId in stateid_shorteventid is invalid."))
+                    self.get_eventid_from_short(utils::u64_from_bytes(&bytes).unwrap())
+                        .ok()
                 })
-                .map(|r| r.ok())
                 .flatten())
         } else {
             Ok(None)
@@ -448,7 +435,12 @@ impl Rooms {
     }
 
     pub fn get_eventid_from_short(&self, shorteventid: u64) -> Result<EventId> {
-        if let Some(id) = self.shorteventid_cache.lock().unwrap().get_mut(&shorteventid) {
+        if let Some(id) = self
+            .shorteventid_cache
+            .lock()
+            .unwrap()
+            .get_mut(&shorteventid)
+        {
             return Ok(id.clone());
         }
 
@@ -457,12 +449,11 @@ impl Rooms {
             .get(&shorteventid.to_be_bytes())?
             .ok_or_else(|| Error::bad_database("Shorteventid does not exist"))?;
 
-        let event_id = EventId::try_from(
-            utils::string_from_bytes(&bytes).map_err(|_| {
+        let event_id =
+            EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| {
                 Error::bad_database("EventID in roomid_pduleaves is invalid unicode.")
-            })?,
-        )
-        .map_err(|_| Error::bad_database("EventId in roomid_pduleaves is invalid."))?;
+            })?)
+            .map_err(|_| Error::bad_database("EventId in roomid_pduleaves is invalid."))?;
 
         self.shorteventid_cache
             .lock()
diff --git a/src/server_server.rs b/src/server_server.rs
index a4c90a71..b3f0353a 100644
--- a/src/server_server.rs
+++ b/src/server_server.rs
@@ -668,7 +668,7 @@ pub async fn send_transaction_message_route(
 
         let elapsed = start_time.elapsed();
         warn!(
-            "Handling event {} took {}m{}s",
+            "Handling transaction of event {} took {}m{}s",
             event_id,
             elapsed.as_secs() / 60,
             elapsed.as_secs() % 60
@@ -850,6 +850,8 @@ pub fn handle_incoming_pdu<'a>(
     pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>,
 ) -> AsyncRecursiveType<'a, StdResult<Option<Vec<u8>>, String>> {
     Box::pin(async move {
+        let start_time = Instant::now();
+
         // TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
         match db.rooms.exists(&room_id) {
             Ok(true) => {}
@@ -1014,12 +1016,18 @@ pub fn handle_incoming_pdu<'a>(
         // 8. if not timeline event: stop
         if !is_timeline_event
             || incoming_pdu.origin_server_ts
-                < db.rooms
-                    .first_pdu_in_room(&room_id)
-                    .map_err(|_| "Error loading first room event.".to_owned())?
-                    .expect("Room exists")
-                    .origin_server_ts
+                < (utils::millis_since_unix_epoch() - 1000 * 60 * 20)
+                    .try_into()
+                    .expect("time is valid")
+        // Not older than 20 mins
         {
+            let elapsed = start_time.elapsed();
+            warn!(
+                "Handling outlier event {} took {}m{}s",
+                event_id,
+                elapsed.as_secs() / 60,
+                elapsed.as_secs() % 60
+            );
             return Ok(None);
         }
 
@@ -1312,7 +1320,8 @@ pub fn handle_incoming_pdu<'a>(
             for state in fork_states {
                 auth_chain_sets.push(
                     get_auth_chain(state.iter().map(|(_, id)| id.clone()).collect(), db)
-                        .map_err(|_| "Failed to load auth chain.".to_owned())?.collect(),
+                        .map_err(|_| "Failed to load auth chain.".to_owned())?
+                        .collect(),
                 );
             }
 
@@ -1385,6 +1394,14 @@ pub fn handle_incoming_pdu<'a>(
 
         // Event has passed all auth/stateres checks
         drop(state_lock);
+
+        let elapsed = start_time.elapsed();
+        warn!(
+            "Handling timeline event {} took {}m{}s",
+            event_id,
+            elapsed.as_secs() / 60,
+            elapsed.as_secs() % 60
+        );
         Ok(pdu_id)
     })
 }
@@ -1757,7 +1774,10 @@ fn append_incoming_pdu(
     Ok(pdu_id)
 }
 
-fn get_auth_chain(starting_events: Vec<EventId>, db: &Database) -> Result<impl Iterator<Item = EventId> + '_> {
+fn get_auth_chain(
+    starting_events: Vec<EventId>,
+    db: &Database,
+) -> Result<impl Iterator<Item = EventId> + '_> {
     let mut full_auth_chain = HashSet::new();
 
     let starting_events = starting_events