diff --git a/src/database.rs b/src/database.rs index 036ec598..e9646dfe 100644 --- a/src/database.rs +++ b/src/database.rs @@ -456,6 +456,8 @@ impl Database { .watch_prefix(&userid_bytes), ); + futures.push(Box::pin(self.globals.rotate.watch())); + // Wait until one of them finds something futures.next().await; } @@ -509,6 +511,11 @@ impl Database { }; if let Some(arc) = Weak::upgrade(&weak) { + log::info!(target: "wal-trunc", "Rotating sync helpers..."); + // This actually creates a very small race condition between firing this and trying to acquire the subsequent write lock. + // Though it is not a huge deal if the write lock doesn't "catch", as it'll harmlessly time out. + arc.read().await.globals.rotate.fire(); + log::info!(target: "wal-trunc", "Locking..."); let guard = { if let Ok(guard) = timeout(lock_timeout, arc.write()).await { diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index fe548133..310e03aa 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -16,7 +16,7 @@ use super::{DatabaseEngine, Tree}; use log::debug; use crossbeam::channel::{bounded, Sender as ChannelSender}; -use parking_lot::{Mutex, MutexGuard, RwLock}; +use parking_lot::{FairMutex, FairMutexGuard, Mutex, MutexGuard, RwLock}; use rusqlite::{params, Connection, DatabaseName::Main, OptionalExtension}; use tokio::sync::oneshot::Sender; @@ -33,7 +33,7 @@ use tokio::sync::oneshot::Sender; // "SELECT key, value FROM {} WHERE key <= ? ORDER BY DESC"; struct Pool { - writer: Mutex<Connection>, + writer: FairMutex<Connection>, readers: Vec<Mutex<Connection>>, spill_tracker: Arc<()>, path: PathBuf, @@ -59,7 +59,7 @@ impl<'a> Deref for HoldingConn<'a> { impl Pool { fn new<P: AsRef<Path>>(path: P, num_readers: usize, cache_size: u32) -> Result<Self> { - let writer = Mutex::new(Self::prepare_conn(&path, Some(cache_size))?); + let writer = FairMutex::new(Self::prepare_conn(&path, Some(cache_size))?); let mut readers = Vec::new(); @@ -93,7 +93,7 @@ impl Pool { Ok(conn) } - fn write_lock(&self) -> MutexGuard<'_, Connection> { + fn write_lock(&self) -> FairMutexGuard<'_, Connection> { self.writer.lock() } diff --git a/src/database/globals.rs b/src/database/globals.rs index eef478a1..7c530729 100644 --- a/src/database/globals.rs +++ b/src/database/globals.rs @@ -11,11 +11,12 @@ use rustls::{ServerCertVerifier, WebPKIVerifier}; use std::{ collections::{BTreeMap, HashMap}, fs, + future::Future, path::PathBuf, sync::{Arc, RwLock}, time::{Duration, Instant}, }; -use tokio::sync::Semaphore; +use tokio::sync::{broadcast, Semaphore}; use trust_dns_resolver::TokioAsyncResolver; use super::abstraction::Tree; @@ -47,6 +48,7 @@ pub struct Globals { ), // since, rx >, >, + pub rotate: RotationHandler, } struct MatrixServerVerifier { @@ -82,6 +84,28 @@ impl ServerCertVerifier for MatrixServerVerifier { } } +pub struct RotationHandler(broadcast::Sender<()>, broadcast::Receiver<()>); + +impl RotationHandler { + pub fn new() -> Self { + let (s, r) = broadcast::channel::<()>(1); + + Self(s, r) + } + + pub fn watch(&self) -> impl Future<Output = ()> { + let mut r = self.0.subscribe(); + + async move { + let _ = r.recv().await; + } + } + + pub fn fire(&self) { + let _ = self.0.send(()); + } +} + impl Globals { pub fn load( globals: Arc<dyn Tree>, @@ -168,6 +192,7 @@ impl Globals { bad_signature_ratelimiter: Arc::new(RwLock::new(BTreeMap::new())), servername_ratelimiter: Arc::new(RwLock::new(BTreeMap::new())), sync_receivers: RwLock::new(BTreeMap::new()), + rotate: RotationHandler::new(), }; fs::create_dir_all(s.get_media_folder())?;