mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-03-16 00:22:25 +03:00
add config and optimise
This commit is contained in:
parent
dc5f1f41fd
commit
0c23874194
5 changed files with 109 additions and 53 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -1178,6 +1178,7 @@ version = "0.22.2"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "290b64917f8b0cb885d9de0f9959fe1f775d7fa12f1da2db9001c1c8ab60f89d"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"pkg-config",
|
||||
"vcpkg",
|
||||
]
|
||||
|
|
|
@ -74,7 +74,7 @@ tracing-opentelemetry = "0.11.0"
|
|||
opentelemetry-jaeger = "0.11.0"
|
||||
pretty_env_logger = "0.4.0"
|
||||
lru-cache = "0.1.2"
|
||||
rusqlite = { version = "0.25.3", optional = true }
|
||||
rusqlite = { version = "0.25.3", optional = true, features = ["bundled"] }
|
||||
parking_lot = { version = "0.11.1", optional = true }
|
||||
crossbeam = { version = "0.8.1", optional = true }
|
||||
num_cpus = { version = "1.13.0", optional = true }
|
||||
|
@ -84,7 +84,7 @@ default = ["conduit_bin", "backend_sqlite"]
|
|||
backend_sled = ["sled"]
|
||||
backend_rocksdb = ["rocksdb"]
|
||||
backend_sqlite = ["sqlite"]
|
||||
sqlite = ["rusqlite", "parking_lot", "crossbeam", "num_cpus"]
|
||||
sqlite = ["rusqlite", "parking_lot", "crossbeam", "num_cpus", "tokio/signal"]
|
||||
conduit_bin = [] # TODO: add rocket to this when it is optional
|
||||
|
||||
[[bin]]
|
||||
|
|
|
@ -44,6 +44,16 @@ pub struct Config {
|
|||
database_path: String,
|
||||
#[serde(default = "default_cache_capacity")]
|
||||
cache_capacity: u32,
|
||||
#[serde(default = "default_sqlite_cache_kib")]
|
||||
sqlite_cache_kib: u32,
|
||||
#[serde(default = "default_sqlite_read_pool_size")]
|
||||
sqlite_read_pool_size: usize,
|
||||
#[serde(default = "false_fn")]
|
||||
sqlite_wal_clean_timer: bool,
|
||||
#[serde(default = "default_sqlite_wal_clean_second_interval")]
|
||||
sqlite_wal_clean_second_interval: u32,
|
||||
#[serde(default = "default_sqlite_wal_clean_second_timeout")]
|
||||
sqlite_wal_clean_second_timeout: u32,
|
||||
#[serde(default = "default_max_request_size")]
|
||||
max_request_size: u32,
|
||||
#[serde(default = "default_max_concurrent_requests")]
|
||||
|
@ -77,6 +87,22 @@ fn default_cache_capacity() -> u32 {
|
|||
1024 * 1024 * 1024
|
||||
}
|
||||
|
||||
fn default_sqlite_cache_kib() -> u32 {
|
||||
2000
|
||||
}
|
||||
|
||||
fn default_sqlite_read_pool_size() -> usize {
|
||||
num_cpus::get().max(1)
|
||||
}
|
||||
|
||||
fn default_sqlite_wal_clean_second_interval() -> u32 {
|
||||
60
|
||||
}
|
||||
|
||||
fn default_sqlite_wal_clean_second_timeout() -> u32 {
|
||||
2
|
||||
}
|
||||
|
||||
fn default_max_request_size() -> u32 {
|
||||
20 * 1024 * 1024 // Default to 20 MB
|
||||
}
|
||||
|
@ -451,6 +477,61 @@ impl Database {
|
|||
pub fn flush_wal(&self) -> Result<()> {
|
||||
self._db.flush_wal()
|
||||
}
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
pub async fn start_wal_clean_task(lock: &Arc<TokioRwLock<Self>>, config: &Config) {
|
||||
use tokio::{
|
||||
signal::unix::{signal, SignalKind},
|
||||
time::{interval, timeout},
|
||||
};
|
||||
|
||||
use std::{
|
||||
sync::Weak,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
let weak: Weak<TokioRwLock<Database>> = Arc::downgrade(&lock);
|
||||
|
||||
let lock_timeout = Duration::from_secs(config.sqlite_wal_clean_second_timeout as u64);
|
||||
let timer_interval = Duration::from_secs(config.sqlite_wal_clean_second_interval as u64);
|
||||
let do_timer = config.sqlite_wal_clean_timer;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut i = interval(timer_interval);
|
||||
let mut s = signal(SignalKind::hangup()).unwrap();
|
||||
|
||||
loop {
|
||||
if do_timer {
|
||||
i.tick().await;
|
||||
log::info!(target: "wal-trunc", "Timer ticked")
|
||||
} else {
|
||||
s.recv().await;
|
||||
log::info!(target: "wal-trunc", "Received SIGHUP")
|
||||
}
|
||||
|
||||
if let Some(arc) = Weak::upgrade(&weak) {
|
||||
log::info!(target: "wal-trunc", "Locking...");
|
||||
let guard = {
|
||||
if let Ok(guard) = timeout(lock_timeout, arc.write()).await {
|
||||
guard
|
||||
} else {
|
||||
log::info!(target: "wal-trunc", "Lock failed in timeout, canceled.");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
log::info!(target: "wal-trunc", "Locked, flushing...");
|
||||
let start = Instant::now();
|
||||
if let Err(e) = guard.flush_wal() {
|
||||
log::error!(target: "wal-trunc", "Errored: {}", e);
|
||||
} else {
|
||||
log::info!(target: "wal-trunc", "Flushed in {:?}", start.elapsed());
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ReadGuard(OwnedRwLockReadGuard<Database>);
|
||||
|
|
|
@ -36,6 +36,7 @@ use tokio::sync::oneshot::Sender;
|
|||
struct Pool {
|
||||
writer: Mutex<Connection>,
|
||||
readers: Vec<Mutex<Connection>>,
|
||||
spill_tracker: Arc<()>,
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
|
@ -43,7 +44,7 @@ pub const MILLI: Duration = Duration::from_millis(1);
|
|||
|
||||
enum HoldingConn<'a> {
|
||||
FromGuard(MutexGuard<'a, Connection>),
|
||||
FromOwned(Connection),
|
||||
FromOwned(Connection, Arc<()>),
|
||||
}
|
||||
|
||||
impl<'a> Deref for HoldingConn<'a> {
|
||||
|
@ -52,29 +53,30 @@ impl<'a> Deref for HoldingConn<'a> {
|
|||
fn deref(&self) -> &Self::Target {
|
||||
match self {
|
||||
HoldingConn::FromGuard(guard) => guard.deref(),
|
||||
HoldingConn::FromOwned(conn) => conn,
|
||||
HoldingConn::FromOwned(conn, _) => conn,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Pool {
|
||||
fn new<P: AsRef<Path>>(path: P, num_readers: usize) -> Result<Self> {
|
||||
let writer = Mutex::new(Self::prepare_conn(&path)?);
|
||||
fn new<P: AsRef<Path>>(path: P, num_readers: usize, cache_size: u32) -> Result<Self> {
|
||||
let writer = Mutex::new(Self::prepare_conn(&path, Some(cache_size))?);
|
||||
|
||||
let mut readers = Vec::new();
|
||||
|
||||
for _ in 0..num_readers {
|
||||
readers.push(Mutex::new(Self::prepare_conn(&path)?))
|
||||
readers.push(Mutex::new(Self::prepare_conn(&path, Some(cache_size))?))
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
writer,
|
||||
readers,
|
||||
spill_tracker: Arc::new(()),
|
||||
path: path.as_ref().to_path_buf(),
|
||||
})
|
||||
}
|
||||
|
||||
fn prepare_conn<P: AsRef<Path>>(path: P) -> Result<Connection> {
|
||||
fn prepare_conn<P: AsRef<Path>>(path: P, cache_size: Option<u32>) -> Result<Connection> {
|
||||
let conn = Connection::open(path)?;
|
||||
|
||||
conn.pragma_update(Some(Main), "journal_mode", &"WAL".to_owned())?;
|
||||
|
@ -85,6 +87,10 @@ impl Pool {
|
|||
|
||||
conn.pragma_update(Some(Main), "synchronous", &"OFF".to_owned())?;
|
||||
|
||||
if let Some(cache_kib) = cache_size {
|
||||
conn.pragma_update(Some(Main), "cache_size", &(-Into::<i64>::into(cache_kib)))?;
|
||||
}
|
||||
|
||||
Ok(conn)
|
||||
}
|
||||
|
||||
|
@ -99,11 +105,18 @@ impl Pool {
|
|||
}
|
||||
}
|
||||
|
||||
log::warn!("all readers locked, creating spillover reader...");
|
||||
let spill_arc = self.spill_tracker.clone();
|
||||
let now_count = Arc::strong_count(&spill_arc) - 1 /* because one is held by the pool */;
|
||||
|
||||
let spilled = Self::prepare_conn(&self.path).unwrap();
|
||||
log::warn!("read_lock: all readers locked, creating spillover reader...");
|
||||
|
||||
return HoldingConn::FromOwned(spilled);
|
||||
if now_count > 1 {
|
||||
log::warn!("read_lock: now {} spillover readers exist", now_count);
|
||||
}
|
||||
|
||||
let spilled = Self::prepare_conn(&self.path, None).unwrap();
|
||||
|
||||
return HoldingConn::FromOwned(spilled, spill_arc);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -115,7 +128,8 @@ impl DatabaseEngine for SqliteEngine {
|
|||
fn open(config: &Config) -> Result<Arc<Self>> {
|
||||
let pool = Pool::new(
|
||||
format!("{}/conduit.db", &config.database_path),
|
||||
num_cpus::get(),
|
||||
config.sqlite_read_pool_size,
|
||||
config.sqlite_cache_kib,
|
||||
)?;
|
||||
|
||||
pool.write_lock()
|
||||
|
|
42
src/main.rs
42
src/main.rs
|
@ -203,47 +203,7 @@ async fn main() {
|
|||
.expect("config is valid");
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
{
|
||||
use tokio::time::{interval, timeout};
|
||||
|
||||
use std::{
|
||||
sync::Weak,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
let weak: Weak<RwLock<Database>> = Arc::downgrade(&db);
|
||||
|
||||
tokio::spawn(async {
|
||||
let weak = weak;
|
||||
|
||||
let mut i = interval(Duration::from_secs(60));
|
||||
|
||||
loop {
|
||||
i.tick().await;
|
||||
|
||||
if let Some(arc) = Weak::upgrade(&weak) {
|
||||
log::warn!("wal-trunc: locking...");
|
||||
let guard = {
|
||||
if let Ok(guard) = timeout(Duration::from_secs(5), arc.write()).await {
|
||||
guard
|
||||
} else {
|
||||
log::warn!("wal-trunc: lock failed in timeout, canceled.");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
log::warn!("wal-trunc: locked, flushing...");
|
||||
let start = Instant::now();
|
||||
if let Err(e) = guard.flush_wal() {
|
||||
log::warn!("wal-trunc: errored: {}", e);
|
||||
} else {
|
||||
log::warn!("wal-trunc: flushed in {:?}", start.elapsed());
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
Database::start_wal_clean_task(&db, &config).await;
|
||||
|
||||
if config.allow_jaeger {
|
||||
let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline()
|
||||
|
|
Loading…
Reference in a new issue