mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-01-15 22:16:27 +03:00
improvement: use simpler rocksdb config
This commit is contained in:
parent
81bc1fc4e3
commit
879a8b969d
5 changed files with 289 additions and 256 deletions
436
Cargo.lock
generated
436
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -121,6 +121,7 @@ optional = true
|
||||||
features = [
|
features = [
|
||||||
"multi-threaded-cf",
|
"multi-threaded-cf",
|
||||||
"zstd",
|
"zstd",
|
||||||
|
"lz4",
|
||||||
]
|
]
|
||||||
|
|
||||||
[target.'cfg(unix)'.dependencies]
|
[target.'cfg(unix)'.dependencies]
|
||||||
|
|
|
@ -29,7 +29,7 @@ use std::{
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
use tokio::sync::watch::Sender;
|
use tokio::sync::watch::Sender;
|
||||||
use tracing::error;
|
use tracing::{error, info};
|
||||||
|
|
||||||
/// # `GET /_matrix/client/r0/sync`
|
/// # `GET /_matrix/client/r0/sync`
|
||||||
///
|
///
|
||||||
|
@ -99,6 +99,8 @@ pub async fn sync_events_route(
|
||||||
|
|
||||||
o.insert((body.since.clone(), rx.clone()));
|
o.insert((body.since.clone(), rx.clone()));
|
||||||
|
|
||||||
|
info!("Sync started for {sender_user}");
|
||||||
|
|
||||||
tokio::spawn(sync_helper_wrapper(
|
tokio::spawn(sync_helper_wrapper(
|
||||||
sender_user.clone(),
|
sender_user.clone(),
|
||||||
sender_device.clone(),
|
sender_device.clone(),
|
||||||
|
|
|
@ -48,6 +48,9 @@ pub async fn search_users_route(
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// It's a matching user, but is the sender allowed to see them?
|
||||||
|
let mut user_visible = false;
|
||||||
|
|
||||||
let user_is_in_public_rooms = services()
|
let user_is_in_public_rooms = services()
|
||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
|
@ -69,9 +72,8 @@ pub async fn search_users_route(
|
||||||
});
|
});
|
||||||
|
|
||||||
if user_is_in_public_rooms {
|
if user_is_in_public_rooms {
|
||||||
return Some(user);
|
user_visible = true;
|
||||||
}
|
} else {
|
||||||
|
|
||||||
let user_is_in_shared_rooms = services()
|
let user_is_in_shared_rooms = services()
|
||||||
.rooms
|
.rooms
|
||||||
.user
|
.user
|
||||||
|
@ -81,10 +83,15 @@ pub async fn search_users_route(
|
||||||
.is_some();
|
.is_some();
|
||||||
|
|
||||||
if user_is_in_shared_rooms {
|
if user_is_in_shared_rooms {
|
||||||
return Some(user);
|
user_visible = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
None
|
if !user_visible {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
Some(user)
|
||||||
});
|
});
|
||||||
|
|
||||||
let results = users.by_ref().take(limit).collect();
|
let results = users.by_ref().take(limit).collect();
|
||||||
|
|
|
@ -23,29 +23,23 @@ pub struct RocksDbEngineTree<'a> {
|
||||||
fn db_options(max_open_files: i32, rocksdb_cache: &rocksdb::Cache) -> rocksdb::Options {
|
fn db_options(max_open_files: i32, rocksdb_cache: &rocksdb::Cache) -> rocksdb::Options {
|
||||||
let mut block_based_options = rocksdb::BlockBasedOptions::default();
|
let mut block_based_options = rocksdb::BlockBasedOptions::default();
|
||||||
block_based_options.set_block_cache(rocksdb_cache);
|
block_based_options.set_block_cache(rocksdb_cache);
|
||||||
|
block_based_options.set_bloom_filter(10.0, false);
|
||||||
// "Difference of spinning disk"
|
|
||||||
// https://zhangyuchi.gitbooks.io/rocksdbbook/content/RocksDB-Tuning-Guide.html
|
|
||||||
block_based_options.set_block_size(4 * 1024);
|
block_based_options.set_block_size(4 * 1024);
|
||||||
block_based_options.set_cache_index_and_filter_blocks(true);
|
block_based_options.set_cache_index_and_filter_blocks(true);
|
||||||
|
block_based_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
|
||||||
|
block_based_options.set_optimize_filters_for_memory(true);
|
||||||
|
|
||||||
let mut db_opts = rocksdb::Options::default();
|
let mut db_opts = rocksdb::Options::default();
|
||||||
db_opts.set_block_based_table_factory(&block_based_options);
|
db_opts.set_block_based_table_factory(&block_based_options);
|
||||||
db_opts.set_optimize_filters_for_hits(true);
|
|
||||||
db_opts.set_skip_stats_update_on_db_open(true);
|
|
||||||
db_opts.set_level_compaction_dynamic_level_bytes(true);
|
|
||||||
db_opts.set_target_file_size_base(256 * 1024 * 1024);
|
|
||||||
//db_opts.set_compaction_readahead_size(2 * 1024 * 1024);
|
|
||||||
//db_opts.set_use_direct_reads(true);
|
|
||||||
//db_opts.set_use_direct_io_for_flush_and_compaction(true);
|
|
||||||
db_opts.create_if_missing(true);
|
db_opts.create_if_missing(true);
|
||||||
db_opts.increase_parallelism(num_cpus::get() as i32);
|
db_opts.increase_parallelism(num_cpus::get() as i32);
|
||||||
db_opts.set_max_open_files(max_open_files);
|
db_opts.set_max_open_files(max_open_files);
|
||||||
db_opts.set_compression_type(rocksdb::DBCompressionType::Zstd);
|
db_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
|
||||||
|
db_opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
|
||||||
db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
|
db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
|
||||||
db_opts.optimize_level_style_compaction(10 * 1024 * 1024);
|
|
||||||
|
|
||||||
// https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
|
// https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
|
||||||
|
db_opts.set_level_compaction_dynamic_level_bytes(true);
|
||||||
db_opts.set_max_background_jobs(6);
|
db_opts.set_max_background_jobs(6);
|
||||||
db_opts.set_bytes_per_sync(1048576);
|
db_opts.set_bytes_per_sync(1048576);
|
||||||
|
|
||||||
|
@ -59,9 +53,6 @@ fn db_options(max_open_files: i32, rocksdb_cache: &rocksdb::Cache) -> rocksdb::O
|
||||||
// restored via federation.
|
// restored via federation.
|
||||||
db_opts.set_wal_recovery_mode(rocksdb::DBRecoveryMode::TolerateCorruptedTailRecords);
|
db_opts.set_wal_recovery_mode(rocksdb::DBRecoveryMode::TolerateCorruptedTailRecords);
|
||||||
|
|
||||||
let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1);
|
|
||||||
db_opts.set_prefix_extractor(prefix_extractor);
|
|
||||||
|
|
||||||
db_opts
|
db_opts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -147,12 +138,17 @@ impl RocksDbEngineTree<'_> {
|
||||||
|
|
||||||
impl KvTree for RocksDbEngineTree<'_> {
|
impl KvTree for RocksDbEngineTree<'_> {
|
||||||
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
|
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
|
||||||
Ok(self.db.rocks.get_cf(&self.cf(), key)?)
|
let readoptions = rocksdb::ReadOptions::default();
|
||||||
|
|
||||||
|
Ok(self.db.rocks.get_cf_opt(&self.cf(), key, &readoptions)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
|
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
|
||||||
|
let writeoptions = rocksdb::WriteOptions::default();
|
||||||
let lock = self.write_lock.read().unwrap();
|
let lock = self.write_lock.read().unwrap();
|
||||||
self.db.rocks.put_cf(&self.cf(), key, value)?;
|
self.db
|
||||||
|
.rocks
|
||||||
|
.put_cf_opt(&self.cf(), key, value, &writeoptions)?;
|
||||||
drop(lock);
|
drop(lock);
|
||||||
|
|
||||||
self.watchers.wake(key);
|
self.watchers.wake(key);
|
||||||
|
@ -161,22 +157,31 @@ impl KvTree for RocksDbEngineTree<'_> {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
|
fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
|
||||||
|
let writeoptions = rocksdb::WriteOptions::default();
|
||||||
for (key, value) in iter {
|
for (key, value) in iter {
|
||||||
self.db.rocks.put_cf(&self.cf(), key, value)?;
|
self.db
|
||||||
|
.rocks
|
||||||
|
.put_cf_opt(&self.cf(), key, value, &writeoptions)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn remove(&self, key: &[u8]) -> Result<()> {
|
fn remove(&self, key: &[u8]) -> Result<()> {
|
||||||
Ok(self.db.rocks.delete_cf(&self.cf(), key)?)
|
let writeoptions = rocksdb::WriteOptions::default();
|
||||||
|
Ok(self
|
||||||
|
.db
|
||||||
|
.rocks
|
||||||
|
.delete_cf_opt(&self.cf(), key, &writeoptions)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
||||||
|
let readoptions = rocksdb::ReadOptions::default();
|
||||||
|
|
||||||
Box::new(
|
Box::new(
|
||||||
self.db
|
self.db
|
||||||
.rocks
|
.rocks
|
||||||
.iterator_cf(&self.cf(), rocksdb::IteratorMode::Start)
|
.iterator_cf_opt(&self.cf(), readoptions, rocksdb::IteratorMode::Start)
|
||||||
.map(|r| r.unwrap())
|
.map(|r| r.unwrap())
|
||||||
.map(|(k, v)| (Vec::from(k), Vec::from(v))),
|
.map(|(k, v)| (Vec::from(k), Vec::from(v))),
|
||||||
)
|
)
|
||||||
|
@ -187,11 +192,14 @@ impl KvTree for RocksDbEngineTree<'_> {
|
||||||
from: &[u8],
|
from: &[u8],
|
||||||
backwards: bool,
|
backwards: bool,
|
||||||
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
||||||
|
let readoptions = rocksdb::ReadOptions::default();
|
||||||
|
|
||||||
Box::new(
|
Box::new(
|
||||||
self.db
|
self.db
|
||||||
.rocks
|
.rocks
|
||||||
.iterator_cf(
|
.iterator_cf_opt(
|
||||||
&self.cf(),
|
&self.cf(),
|
||||||
|
readoptions,
|
||||||
rocksdb::IteratorMode::From(
|
rocksdb::IteratorMode::From(
|
||||||
from,
|
from,
|
||||||
if backwards {
|
if backwards {
|
||||||
|
@ -207,23 +215,33 @@ impl KvTree for RocksDbEngineTree<'_> {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
|
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
|
||||||
|
let readoptions = rocksdb::ReadOptions::default();
|
||||||
|
let writeoptions = rocksdb::WriteOptions::default();
|
||||||
|
|
||||||
let lock = self.write_lock.write().unwrap();
|
let lock = self.write_lock.write().unwrap();
|
||||||
|
|
||||||
let old = self.db.rocks.get_cf(&self.cf(), key)?;
|
let old = self.db.rocks.get_cf_opt(&self.cf(), key, &readoptions)?;
|
||||||
let new = utils::increment(old.as_deref()).unwrap();
|
let new = utils::increment(old.as_deref()).unwrap();
|
||||||
self.db.rocks.put_cf(&self.cf(), key, &new)?;
|
self.db
|
||||||
|
.rocks
|
||||||
|
.put_cf_opt(&self.cf(), key, &new, &writeoptions)?;
|
||||||
|
|
||||||
drop(lock);
|
drop(lock);
|
||||||
Ok(new)
|
Ok(new)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
|
fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
|
||||||
|
let readoptions = rocksdb::ReadOptions::default();
|
||||||
|
let writeoptions = rocksdb::WriteOptions::default();
|
||||||
|
|
||||||
let lock = self.write_lock.write().unwrap();
|
let lock = self.write_lock.write().unwrap();
|
||||||
|
|
||||||
for key in iter {
|
for key in iter {
|
||||||
let old = self.db.rocks.get_cf(&self.cf(), &key)?;
|
let old = self.db.rocks.get_cf_opt(&self.cf(), &key, &readoptions)?;
|
||||||
let new = utils::increment(old.as_deref()).unwrap();
|
let new = utils::increment(old.as_deref()).unwrap();
|
||||||
self.db.rocks.put_cf(&self.cf(), key, new)?;
|
self.db
|
||||||
|
.rocks
|
||||||
|
.put_cf_opt(&self.cf(), key, new, &writeoptions)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
drop(lock);
|
drop(lock);
|
||||||
|
@ -235,11 +253,14 @@ impl KvTree for RocksDbEngineTree<'_> {
|
||||||
&'a self,
|
&'a self,
|
||||||
prefix: Vec<u8>,
|
prefix: Vec<u8>,
|
||||||
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
||||||
|
let readoptions = rocksdb::ReadOptions::default();
|
||||||
|
|
||||||
Box::new(
|
Box::new(
|
||||||
self.db
|
self.db
|
||||||
.rocks
|
.rocks
|
||||||
.iterator_cf(
|
.iterator_cf_opt(
|
||||||
&self.cf(),
|
&self.cf(),
|
||||||
|
readoptions,
|
||||||
rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
|
rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
|
||||||
)
|
)
|
||||||
.map(|r| r.unwrap())
|
.map(|r| r.unwrap())
|
||||||
|
|
Loading…
Reference in a new issue