From 2a00c547a1baca5e2ca57966ef5ce5c7f063f367 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Timo=20K=C3=B6sters?= <timo@koesters.xyz>
Date: Tue, 8 Feb 2022 09:25:44 +0100
Subject: [PATCH] improvement: faster /syncs

---
 src/client_server/sync.rs | 61 ++++++++++++++++++++++-----------------
 src/database.rs           |  1 +
 src/database/rooms.rs     | 37 +++++++++++++++++++++++-
 3 files changed, 71 insertions(+), 28 deletions(-)

diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs
index 7cfea5af..1ccf7982 100644
--- a/src/client_server/sync.rs
+++ b/src/client_server/sync.rs
@@ -245,30 +245,41 @@ async fn sync_helper(
         let insert_lock = mutex_insert.lock().unwrap();
         drop(insert_lock);
 
-        let mut non_timeline_pdus = db
-            .rooms
-            .pdus_until(&sender_user, &room_id, u64::MAX)?
-            .filter_map(|r| {
-                // Filter out buggy events
-                if r.is_err() {
-                    error!("Bad pdu in pdus_since: {:?}", r);
-                }
-                r.ok()
-            })
-            .take_while(|(pduid, _)| {
-                db.rooms
-                    .pdu_count(pduid)
-                    .map_or(false, |count| count > since)
-            });
+        let timeline_pdus;
+        let limited;
+        if db.rooms.last_timeline_count(&sender_user, &room_id)? > since {
+            let mut non_timeline_pdus = db
+                .rooms
+                .pdus_until(&sender_user, &room_id, u64::MAX)?
+                .filter_map(|r| {
+                    // Filter out buggy events
+                    if r.is_err() {
+                        error!("Bad pdu in pdus_since: {:?}", r);
+                    }
+                    r.ok()
+                })
+                .take_while(|(pduid, _)| {
+                    db.rooms
+                        .pdu_count(pduid)
+                        .map_or(false, |count| count > since)
+                });
 
-        // Take the last 10 events for the timeline
-        let timeline_pdus: Vec<_> = non_timeline_pdus
-            .by_ref()
-            .take(10)
-            .collect::<Vec<_>>()
-            .into_iter()
-            .rev()
-            .collect();
+            // Take the last 10 events for the timeline
+            timeline_pdus = non_timeline_pdus
+                .by_ref()
+                .take(10)
+                .collect::<Vec<_>>()
+                .into_iter()
+                .rev()
+                .collect::<Vec<_>>();
+
+            // They /sync response doesn't always return all messages, so we say the output is
+            // limited unless there are events in non_timeline_pdus
+            limited = non_timeline_pdus.next().is_some();
+        } else {
+            timeline_pdus = Vec::new();
+            limited = false;
+        }
 
         let send_notification_counts = !timeline_pdus.is_empty()
             || db
@@ -277,10 +288,6 @@ async fn sync_helper(
                 .last_privateread_update(&sender_user, &room_id)?
                 > since;
 
-        // They /sync response doesn't always return all messages, so we say the output is
-        // limited unless there are events in non_timeline_pdus
-        let limited = non_timeline_pdus.next().is_some();
-
         let mut timeline_users = HashSet::new();
         for (_, event) in &timeline_pdus {
             timeline_users.insert(event.sender.as_str().to_owned());
diff --git a/src/database.rs b/src/database.rs
index 2b1671cd..8e95b1ef 100644
--- a/src/database.rs
+++ b/src/database.rs
@@ -263,6 +263,7 @@ impl Database {
                 stateinfo_cache: Mutex::new(LruCache::new(
                     (100.0 * config.conduit_cache_capacity_modifier) as usize,
                 )),
+                lasttimelinecount_cache: Mutex::new(HashMap::new()),
             },
             account_data: account_data::AccountData {
                 roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?,
diff --git a/src/database/rooms.rs b/src/database/rooms.rs
index 0abd2e79..17c9b743 100644
--- a/src/database/rooms.rs
+++ b/src/database/rooms.rs
@@ -32,7 +32,7 @@ use serde::Deserialize;
 use serde_json::value::to_raw_value;
 use std::{
     borrow::Cow,
-    collections::{BTreeMap, HashMap, HashSet},
+    collections::{hash_map, BTreeMap, HashMap, HashSet},
     fmt::Debug,
     iter,
     mem::size_of,
@@ -128,6 +128,7 @@ pub struct Rooms {
             )>,
         >,
     >,
+    pub(super) lasttimelinecount_cache: Mutex<HashMap<Box<RoomId>, u64>>,
 }
 
 impl Rooms {
@@ -1331,6 +1332,10 @@ impl Rooms {
             &pdu_id,
             &serde_json::to_vec(&pdu_json).expect("CanonicalJsonObject is always a valid"),
         )?;
+        self.lasttimelinecount_cache
+            .lock()
+            .unwrap()
+            .insert(pdu.room_id.clone(), count2);
 
         self.eventid_pduid
             .insert(pdu.event_id.as_bytes(), &pdu_id)?;
@@ -1498,6 +1503,36 @@ impl Rooms {
         Ok(pdu_id)
     }
 
+    #[tracing::instrument(skip(self))]
+    pub fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result<u64> {
+        match self
+            .lasttimelinecount_cache
+            .lock()
+            .unwrap()
+            .entry(room_id.to_owned())
+        {
+            hash_map::Entry::Vacant(v) => {
+                if let Some(last_count) = self
+                    .pdus_until(&sender_user, &room_id, u64::MAX)?
+                    .filter_map(|r| {
+                        // Filter out buggy events
+                        if r.is_err() {
+                            error!("Bad pdu in pdus_since: {:?}", r);
+                        }
+                        r.ok()
+                    })
+                    .map(|(pduid, _)| self.pdu_count(&pduid))
+                    .next()
+                {
+                    Ok(*v.insert(last_count?))
+                } else {
+                    Ok(0)
+                }
+            }
+            hash_map::Entry::Occupied(o) => Ok(*o.get()),
+        }
+    }
+
     #[tracing::instrument(skip(self))]
     pub fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> {
         let mut userroom_id = user_id.as_bytes().to_vec();