improvement: better sqlite

This commit is contained in:
Timo Kösters 2021-08-15 13:17:42 +02:00 committed by Jonas Zohren
parent 6f58af0b99
commit 8f1a41dcd2
3 changed files with 47 additions and 18 deletions

View file

@ -35,6 +35,7 @@ pub trait Tree: Send + Sync {
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;
fn increment(&self, key: &[u8]) -> Result<Vec<u8>>;
fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()>;
fn scan_prefix<'a>(
&'a self,

View file

@ -49,11 +49,11 @@ impl Engine {
fn prepare_conn(path: &Path, cache_size_kb: u32) -> Result<Connection> {
let conn = Connection::open(&path)?;
conn.pragma_update(Some(Main), "page_size", &32768)?;
conn.pragma_update(Some(Main), "page_size", &1024)?;
conn.pragma_update(Some(Main), "journal_mode", &"WAL")?;
conn.pragma_update(Some(Main), "synchronous", &"NORMAL")?;
conn.pragma_update(Some(Main), "cache_size", &(-i64::from(cache_size_kb)))?;
conn.pragma_update(Some(Main), "wal_autocheckpoint", &0)?;
conn.pragma_update(Some(Main), "wal_autocheckpoint", &8000)?;
Ok(conn)
}
@ -93,8 +93,9 @@ impl Engine {
}
pub fn flush_wal(self: &Arc<Self>) -> Result<()> {
self.write_lock()
.pragma_update(Some(Main), "wal_checkpoint", &"TRUNCATE")?;
// We use autocheckpoints
//self.write_lock()
//.pragma_update(Some(Main), "wal_checkpoint", &"TRUNCATE")?;
Ok(())
}
}
@ -248,6 +249,24 @@ impl Tree for SqliteTable {
Ok(())
}
#[tracing::instrument(skip(self, iter))]
fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
let guard = self.engine.write_lock();
guard.execute("BEGIN", [])?;
for key in iter {
let old = self.get_with_guard(&guard, &key)?;
let new = crate::utils::increment(old.as_deref())
.expect("utils::increment always returns Some");
self.insert_with_guard(&guard, &key, &new)?;
}
guard.execute("COMMIT", [])?;
drop(guard);
Ok(())
}
#[tracing::instrument(skip(self, key))]
fn remove(&self, key: &[u8]) -> Result<()> {
let guard = self.engine.write_lock();

View file

@ -92,13 +92,17 @@ pub struct Rooms {
pub(super) pdu_cache: Mutex<LruCache<EventId, Arc<PduEvent>>>,
pub(super) auth_chain_cache: Mutex<LruCache<u64, HashSet<u64>>>,
pub(super) shorteventid_cache: Mutex<LruCache<u64, EventId>>,
pub(super) stateinfo_cache: Mutex<LruCache<u64,
Vec<(
u64, // sstatehash
HashSet<CompressedStateEvent>, // full state
HashSet<CompressedStateEvent>, // added
HashSet<CompressedStateEvent>, // removed
)>>>,
pub(super) stateinfo_cache: Mutex<
LruCache<
u64,
Vec<(
u64, // sstatehash
HashSet<CompressedStateEvent>, // full state
HashSet<CompressedStateEvent>, // added
HashSet<CompressedStateEvent>, // removed
)>,
>,
>,
}
impl Rooms {
@ -414,7 +418,8 @@ impl Rooms {
HashSet<CompressedStateEvent>, // removed
)>,
> {
if let Some(r) = self.stateinfo_cache
if let Some(r) = self
.stateinfo_cache
.lock()
.unwrap()
.get_mut(&shortstatehash)
@ -458,10 +463,6 @@ impl Rooms {
response.push((shortstatehash, state, added, removed));
self.stateinfo_cache
.lock()
.unwrap()
.insert(shortstatehash, response.clone());
Ok(response)
} else {
let mut response = Vec::new();
@ -1173,6 +1174,9 @@ impl Rooms {
let sync_pdu = pdu.to_sync_room_event();
let mut notifies = Vec::new();
let mut highlights = Vec::new();
for user in db
.rooms
.room_members(&pdu.room_id)
@ -1218,11 +1222,11 @@ impl Rooms {
userroom_id.extend_from_slice(pdu.room_id.as_bytes());
if notify {
self.userroomid_notificationcount.increment(&userroom_id)?;
notifies.push(userroom_id.clone());
}
if highlight {
self.userroomid_highlightcount.increment(&userroom_id)?;
highlights.push(userroom_id);
}
for senderkey in db.pusher.get_pusher_senderkeys(&user) {
@ -1230,6 +1234,11 @@ impl Rooms {
}
}
self.userroomid_notificationcount
.increment_batch(&mut notifies.into_iter())?;
self.userroomid_highlightcount
.increment_batch(&mut highlights.into_iter())?;
match pdu.kind {
EventType::RoomRedaction => {
if let Some(redact_id) = &pdu.redacts {