incorperate feedback

This commit is contained in:
Jonathan de Jong 2021-07-11 15:41:10 +02:00
parent 318d9c1a35
commit 3a76fda92b
6 changed files with 40 additions and 45 deletions

View file

@ -25,7 +25,6 @@ tokio = "1.2.0"
# Used for storing data permanently # Used for storing data permanently
sled = { version = "0.34.6", features = ["compression", "no_metrics"], optional = true } sled = { version = "0.34.6", features = ["compression", "no_metrics"], optional = true }
rocksdb = { version = "0.16.0", features = ["multi-threaded-cf"], optional = true } rocksdb = { version = "0.16.0", features = ["multi-threaded-cf"], optional = true }
# sqlx = { version = "0.5.5", features = ["sqlite", "runtime-tokio-rustls"], optional = true }
#sled = { git = "https://github.com/spacejam/sled.git", rev = "e4640e0773595229f398438886f19bca6f7326a2", features = ["compression"] } #sled = { git = "https://github.com/spacejam/sled.git", rev = "e4640e0773595229f398438886f19bca6f7326a2", features = ["compression"] }
# Used for the http request / response body type for Ruma endpoints used with reqwest # Used for the http request / response body type for Ruma endpoints used with reqwest

View file

@ -42,10 +42,8 @@ use self::proxy::ProxyConfig;
pub struct Config { pub struct Config {
server_name: Box<ServerName>, server_name: Box<ServerName>,
database_path: String, database_path: String,
#[serde(default = "default_cache_capacity")] #[serde(default = "default_db_cache_capacity")]
cache_capacity: u32, db_cache_capacity: u32,
#[serde(default = "default_sqlite_cache_kib")]
sqlite_cache_kib: u32,
#[serde(default = "default_sqlite_read_pool_size")] #[serde(default = "default_sqlite_read_pool_size")]
sqlite_read_pool_size: usize, sqlite_read_pool_size: usize,
#[serde(default = "false_fn")] #[serde(default = "false_fn")]
@ -83,14 +81,10 @@ fn true_fn() -> bool {
true true
} }
fn default_cache_capacity() -> u32 { fn default_db_cache_capacity() -> u32 {
1024 * 1024 * 1024 1024 * 1024 * 1024
} }
fn default_sqlite_cache_kib() -> u32 {
2000
}
fn default_sqlite_read_pool_size() -> usize { fn default_sqlite_read_pool_size() -> usize {
num_cpus::get().max(1) num_cpus::get().max(1)
} }
@ -116,13 +110,13 @@ fn default_log() -> String {
} }
#[cfg(feature = "sled")] #[cfg(feature = "sled")]
pub type Engine = abstraction::sled::SledEngine; pub type Engine = abstraction::sled::Engine;
#[cfg(feature = "rocksdb")] #[cfg(feature = "rocksdb")]
pub type Engine = abstraction::rocksdb::RocksDbEngine; pub type Engine = abstraction::rocksdb::Engine;
#[cfg(feature = "sqlite")] #[cfg(feature = "sqlite")]
pub type Engine = abstraction::sqlite::SqliteEngine; pub type Engine = abstraction::sqlite::Engine;
pub struct Database { pub struct Database {
_db: Arc<Engine>, _db: Arc<Engine>,
@ -268,7 +262,7 @@ impl Database {
globals: globals::Globals::load( globals: globals::Globals::load(
builder.open_tree("global")?, builder.open_tree("global")?,
builder.open_tree("server_signingkeys")?, builder.open_tree("server_signingkeys")?,
config, config.clone(),
)?, )?,
})); }));
@ -372,6 +366,9 @@ impl Database {
drop(guard); drop(guard);
#[cfg(feature = "sqlite")]
Self::start_wal_clean_task(&db, &config).await;
Ok(db) Ok(db)
} }
@ -481,6 +478,7 @@ impl Database {
#[cfg(feature = "sqlite")] #[cfg(feature = "sqlite")]
pub async fn start_wal_clean_task(lock: &Arc<TokioRwLock<Self>>, config: &Config) { pub async fn start_wal_clean_task(lock: &Arc<TokioRwLock<Self>>, config: &Config) {
use tokio::{ use tokio::{
select,
signal::unix::{signal, SignalKind}, signal::unix::{signal, SignalKind},
time::{interval, timeout}, time::{interval, timeout},
}; };
@ -501,13 +499,14 @@ impl Database {
let mut s = signal(SignalKind::hangup()).unwrap(); let mut s = signal(SignalKind::hangup()).unwrap();
loop { loop {
if do_timer { select! {
i.tick().await; _ = i.tick(), if do_timer => {
log::info!(target: "wal-trunc", "Timer ticked") log::info!(target: "wal-trunc", "Timer ticked")
} else { }
s.recv().await; _ = s.recv() => {
log::info!(target: "wal-trunc", "Received SIGHUP") log::info!(target: "wal-trunc", "Received SIGHUP")
} }
};
if let Some(arc) = Weak::upgrade(&weak) { if let Some(arc) = Weak::upgrade(&weak) {
log::info!(target: "wal-trunc", "Locking..."); log::info!(target: "wal-trunc", "Locking...");

View file

@ -7,15 +7,15 @@ use super::{DatabaseEngine, Tree};
use std::{collections::BTreeMap, sync::RwLock}; use std::{collections::BTreeMap, sync::RwLock};
pub struct RocksDbEngine(rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>); pub struct Engine(rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>);
pub struct RocksDbEngineTree<'a> { pub struct RocksDbEngineTree<'a> {
db: Arc<RocksDbEngine>, db: Arc<Engine>,
name: &'a str, name: &'a str,
watchers: RwLock<BTreeMap<Vec<u8>, Vec<tokio::sync::oneshot::Sender<()>>>>, watchers: RwLock<BTreeMap<Vec<u8>, Vec<tokio::sync::oneshot::Sender<()>>>>,
} }
impl DatabaseEngine for RocksDbEngine { impl DatabaseEngine for Engine {
fn open(config: &Config) -> Result<Arc<Self>> { fn open(config: &Config) -> Result<Arc<Self>> {
let mut db_opts = rocksdb::Options::default(); let mut db_opts = rocksdb::Options::default();
db_opts.create_if_missing(true); db_opts.create_if_missing(true);
@ -45,7 +45,7 @@ impl DatabaseEngine for RocksDbEngine {
.map(|name| rocksdb::ColumnFamilyDescriptor::new(name, options.clone())), .map(|name| rocksdb::ColumnFamilyDescriptor::new(name, options.clone())),
)?; )?;
Ok(Arc::new(RocksDbEngine(db))) Ok(Arc::new(Engine(db)))
} }
fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> { fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> {

View file

@ -5,13 +5,13 @@ use std::{future::Future, pin::Pin, sync::Arc};
use super::{DatabaseEngine, Tree}; use super::{DatabaseEngine, Tree};
pub struct SledEngine(sled::Db); pub struct Engine(sled::Db);
pub struct SledEngineTree(sled::Tree); pub struct SledEngineTree(sled::Tree);
impl DatabaseEngine for SledEngine { impl DatabaseEngine for Engine {
fn open(config: &Config) -> Result<Arc<Self>> { fn open(config: &Config) -> Result<Arc<Self>> {
Ok(Arc::new(SledEngine( Ok(Arc::new(Engine(
sled::Config::default() sled::Config::default()
.path(&config.database_path) .path(&config.database_path)
.cache_capacity(config.cache_capacity as u64) .cache_capacity(config.cache_capacity as u64)

View file

@ -119,22 +119,22 @@ impl Pool {
} }
} }
pub struct SqliteEngine { pub struct Engine {
pool: Pool, pool: Pool,
} }
impl DatabaseEngine for SqliteEngine { impl DatabaseEngine for Engine {
fn open(config: &Config) -> Result<Arc<Self>> { fn open(config: &Config) -> Result<Arc<Self>> {
let pool = Pool::new( let pool = Pool::new(
Path::new(&config.database_path).join("conduit.db"), Path::new(&config.database_path).join("conduit.db"),
config.sqlite_read_pool_size, config.sqlite_read_pool_size,
config.sqlite_cache_kib, config.db_cache_capacity / 1024, // bytes -> kb
)?; )?;
pool.write_lock() pool.write_lock()
.execute("CREATE TABLE IF NOT EXISTS _noop (\"key\" INT)", params![])?; .execute("CREATE TABLE IF NOT EXISTS _noop (\"key\" INT)", params![])?;
let arc = Arc::new(SqliteEngine { pool }); let arc = Arc::new(Engine { pool });
Ok(arc) Ok(arc)
} }
@ -166,7 +166,7 @@ impl DatabaseEngine for SqliteEngine {
} }
} }
impl SqliteEngine { impl Engine {
pub fn flush_wal(self: &Arc<Self>) -> Result<()> { pub fn flush_wal(self: &Arc<Self>) -> Result<()> {
self.pool self.pool
.write_lock() .write_lock()
@ -185,7 +185,7 @@ impl SqliteEngine {
} }
pub struct SqliteTable { pub struct SqliteTable {
engine: Arc<SqliteEngine>, engine: Arc<Engine>,
name: String, name: String,
watchers: RwLock<BTreeMap<Vec<u8>, Vec<Sender<()>>>>, watchers: RwLock<BTreeMap<Vec<u8>, Vec<Sender<()>>>>,
} }
@ -257,19 +257,19 @@ impl Tree for SqliteTable {
} }
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
{ let guard = self.engine.pool.write_lock();
let guard = self.engine.pool.write_lock();
let start = Instant::now(); let start = Instant::now();
self.insert_with_guard(&guard, key, value)?; self.insert_with_guard(&guard, key, value)?;
let elapsed = start.elapsed(); let elapsed = start.elapsed();
if elapsed > MILLI { if elapsed > MILLI {
debug!("insert: took {:012?} : {}", elapsed, &self.name); debug!("insert: took {:012?} : {}", elapsed, &self.name);
}
} }
drop(guard);
let watchers = self.watchers.read(); let watchers = self.watchers.read();
let mut triggered = Vec::new(); let mut triggered = Vec::new();

View file

@ -202,9 +202,6 @@ async fn main() {
.await .await
.expect("config is valid"); .expect("config is valid");
#[cfg(feature = "sqlite")]
Database::start_wal_clean_task(&db, &config).await;
if config.allow_jaeger { if config.allow_jaeger {
let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline() let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline()
.with_service_name("conduit") .with_service_name("conduit")