improvement: faster /syncs

This commit is contained in:
Timo Kösters 2022-02-08 09:25:44 +01:00
parent 0cec421930
commit 2a00c547a1
No known key found for this signature in database
GPG key ID: 356E705610F626D5
3 changed files with 71 additions and 28 deletions

View file

@ -245,6 +245,9 @@ async fn sync_helper(
let insert_lock = mutex_insert.lock().unwrap(); let insert_lock = mutex_insert.lock().unwrap();
drop(insert_lock); drop(insert_lock);
let timeline_pdus;
let limited;
if db.rooms.last_timeline_count(&sender_user, &room_id)? > since {
let mut non_timeline_pdus = db let mut non_timeline_pdus = db
.rooms .rooms
.pdus_until(&sender_user, &room_id, u64::MAX)? .pdus_until(&sender_user, &room_id, u64::MAX)?
@ -262,13 +265,21 @@ async fn sync_helper(
}); });
// Take the last 10 events for the timeline // Take the last 10 events for the timeline
let timeline_pdus: Vec<_> = non_timeline_pdus timeline_pdus = non_timeline_pdus
.by_ref() .by_ref()
.take(10) .take(10)
.collect::<Vec<_>>() .collect::<Vec<_>>()
.into_iter() .into_iter()
.rev() .rev()
.collect(); .collect::<Vec<_>>();
// They /sync response doesn't always return all messages, so we say the output is
// limited unless there are events in non_timeline_pdus
limited = non_timeline_pdus.next().is_some();
} else {
timeline_pdus = Vec::new();
limited = false;
}
let send_notification_counts = !timeline_pdus.is_empty() let send_notification_counts = !timeline_pdus.is_empty()
|| db || db
@ -277,10 +288,6 @@ async fn sync_helper(
.last_privateread_update(&sender_user, &room_id)? .last_privateread_update(&sender_user, &room_id)?
> since; > since;
// They /sync response doesn't always return all messages, so we say the output is
// limited unless there are events in non_timeline_pdus
let limited = non_timeline_pdus.next().is_some();
let mut timeline_users = HashSet::new(); let mut timeline_users = HashSet::new();
for (_, event) in &timeline_pdus { for (_, event) in &timeline_pdus {
timeline_users.insert(event.sender.as_str().to_owned()); timeline_users.insert(event.sender.as_str().to_owned());

View file

@ -263,6 +263,7 @@ impl Database {
stateinfo_cache: Mutex::new(LruCache::new( stateinfo_cache: Mutex::new(LruCache::new(
(100.0 * config.conduit_cache_capacity_modifier) as usize, (100.0 * config.conduit_cache_capacity_modifier) as usize,
)), )),
lasttimelinecount_cache: Mutex::new(HashMap::new()),
}, },
account_data: account_data::AccountData { account_data: account_data::AccountData {
roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?, roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?,

View file

@ -32,7 +32,7 @@ use serde::Deserialize;
use serde_json::value::to_raw_value; use serde_json::value::to_raw_value;
use std::{ use std::{
borrow::Cow, borrow::Cow,
collections::{BTreeMap, HashMap, HashSet}, collections::{hash_map, BTreeMap, HashMap, HashSet},
fmt::Debug, fmt::Debug,
iter, iter,
mem::size_of, mem::size_of,
@ -128,6 +128,7 @@ pub struct Rooms {
)>, )>,
>, >,
>, >,
pub(super) lasttimelinecount_cache: Mutex<HashMap<Box<RoomId>, u64>>,
} }
impl Rooms { impl Rooms {
@ -1331,6 +1332,10 @@ impl Rooms {
&pdu_id, &pdu_id,
&serde_json::to_vec(&pdu_json).expect("CanonicalJsonObject is always a valid"), &serde_json::to_vec(&pdu_json).expect("CanonicalJsonObject is always a valid"),
)?; )?;
self.lasttimelinecount_cache
.lock()
.unwrap()
.insert(pdu.room_id.clone(), count2);
self.eventid_pduid self.eventid_pduid
.insert(pdu.event_id.as_bytes(), &pdu_id)?; .insert(pdu.event_id.as_bytes(), &pdu_id)?;
@ -1498,6 +1503,36 @@ impl Rooms {
Ok(pdu_id) Ok(pdu_id)
} }
#[tracing::instrument(skip(self))]
pub fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result<u64> {
match self
.lasttimelinecount_cache
.lock()
.unwrap()
.entry(room_id.to_owned())
{
hash_map::Entry::Vacant(v) => {
if let Some(last_count) = self
.pdus_until(&sender_user, &room_id, u64::MAX)?
.filter_map(|r| {
// Filter out buggy events
if r.is_err() {
error!("Bad pdu in pdus_since: {:?}", r);
}
r.ok()
})
.map(|(pduid, _)| self.pdu_count(&pduid))
.next()
{
Ok(*v.insert(last_count?))
} else {
Ok(0)
}
}
hash_map::Entry::Occupied(o) => Ok(*o.get()),
}
}
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { pub fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> {
let mut userroom_id = user_id.as_bytes().to_vec(); let mut userroom_id = user_id.as_bytes().to_vec();