mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-04-22 14:10:16 +03:00
Merge branch 'WIP_backup' into 'next'
Draft: Work in progres for backup and restore data from running conduit instance See merge request famedly/conduit!546
This commit is contained in:
commit
4fd465657c
10 changed files with 790 additions and 39 deletions
|
@ -175,6 +175,9 @@ version = "0.25"
|
|||
[target.'cfg(unix)'.dependencies]
|
||||
nix = { version = "0.28", features = ["resource"] }
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3.2"
|
||||
|
||||
[features]
|
||||
default = ["backend_rocksdb", "backend_sqlite", "conduit_bin", "systemd"]
|
||||
#backend_sled = ["sled"]
|
||||
|
|
|
@ -26,11 +26,16 @@ pub mod persy;
|
|||
))]
|
||||
pub mod watchers;
|
||||
|
||||
pub mod json_stream_export;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
pub trait KeyValueDatabaseEngine: Send + Sync {
|
||||
fn open(config: &Config) -> Result<Self>
|
||||
where
|
||||
Self: Sized;
|
||||
fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>>;
|
||||
fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>>;
|
||||
fn flush(&self) -> Result<()>;
|
||||
fn cleanup(&self) -> Result<()> {
|
||||
Ok(())
|
||||
|
@ -38,6 +43,15 @@ pub trait KeyValueDatabaseEngine: Send + Sync {
|
|||
fn memory_usage(&self) -> Result<String> {
|
||||
Ok("Current database engine does not support memory usage reporting.".to_owned())
|
||||
}
|
||||
fn clear_caches(&self) {}
|
||||
|
||||
fn export(&self, exporter: &mut dyn KvExport) -> Result<()>;
|
||||
}
|
||||
|
||||
pub trait KvExport {
|
||||
fn start_tree(&mut self, name: &str) -> Result<()>;
|
||||
fn key_value(&mut self, key: &[u8], value: &[u8]) -> Result<()>;
|
||||
fn end_tree(&mut self, name: &str) -> Result<()>;
|
||||
}
|
||||
|
||||
pub trait KvTree: Send + Sync {
|
||||
|
|
|
@ -9,7 +9,7 @@ use std::{
|
|||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use super::{DatabaseEngine, Tree};
|
||||
use super::{KeyValueDatabaseEngine, KvExport, KvTree};
|
||||
|
||||
type TupleOfBytes = (Vec<u8>, Vec<u8>);
|
||||
|
||||
|
@ -30,8 +30,8 @@ fn convert_error(error: heed::Error) -> Error {
|
|||
}
|
||||
}
|
||||
|
||||
impl DatabaseEngine for Engine {
|
||||
fn open(config: &Config) -> Result<Arc<Self>> {
|
||||
impl KeyValueDatabaseEngine for Arc<Engine> {
|
||||
fn open(config: &Config) -> Result<Self> {
|
||||
let mut env_builder = heed::EnvOpenOptions::new();
|
||||
env_builder.map_size(1024 * 1024 * 1024 * 1024); // 1 Terabyte
|
||||
env_builder.max_readers(126);
|
||||
|
@ -49,10 +49,10 @@ impl DatabaseEngine for Engine {
|
|||
}))
|
||||
}
|
||||
|
||||
fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> {
|
||||
fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
|
||||
// Creates the db if it doesn't exist already
|
||||
Ok(Arc::new(EngineTree {
|
||||
engine: Arc::clone(self),
|
||||
engine: self.clone(),
|
||||
tree: Arc::new(
|
||||
self.env
|
||||
.create_database(Some(name))
|
||||
|
@ -62,10 +62,24 @@ impl DatabaseEngine for Engine {
|
|||
}))
|
||||
}
|
||||
|
||||
fn flush(self: &Arc<Self>) -> Result<()> {
|
||||
fn flush(&self) -> Result<()> {
|
||||
self.env.force_sync().map_err(convert_error)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn export(&self, exporter: &mut dyn KvExport) -> Result<()> {
|
||||
// Heed do not support snapshots
|
||||
let trees: Vec<String> = unimplemented!("heed has no way lo list trees");
|
||||
for tree_name in &trees {
|
||||
exporter.start_tree(tree_name)?;
|
||||
let tree = self.open_tree(tree_name)?;
|
||||
for (key, value) in tree.iter() {
|
||||
exporter.key_value(&key, &value)?;
|
||||
}
|
||||
exporter.end_tree(&tree_name)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl EngineTree {
|
||||
|
@ -78,17 +92,7 @@ impl EngineTree {
|
|||
let (s, r) = bounded::<TupleOfBytes>(100);
|
||||
let engine = Arc::clone(&self.engine);
|
||||
|
||||
let lock = self.engine.iter_pool.lock().await;
|
||||
if lock.active_count() < lock.max_count() {
|
||||
lock.execute(move || {
|
||||
iter_from_thread_work(tree, &engine.env.read_txn().unwrap(), from, backwards, &s);
|
||||
});
|
||||
} else {
|
||||
std::thread::spawn(move || {
|
||||
iter_from_thread_work(tree, &engine.env.read_txn().unwrap(), from, backwards, &s);
|
||||
});
|
||||
}
|
||||
|
||||
let lock = self.engine.iter_pool.lock();
|
||||
Box::new(r.into_iter())
|
||||
}
|
||||
}
|
||||
|
@ -123,7 +127,7 @@ fn iter_from_thread_work(
|
|||
}
|
||||
}
|
||||
|
||||
impl Tree for EngineTree {
|
||||
impl KvTree for EngineTree {
|
||||
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
|
||||
let txn = self.engine.env.read_txn().map_err(convert_error)?;
|
||||
Ok(self
|
||||
|
@ -143,6 +147,36 @@ impl Tree for EngineTree {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
|
||||
let mut txn = self.engine.env.write_txn().map_err(convert_error)?;
|
||||
for (key, value) in iter {
|
||||
self.tree
|
||||
.put(&mut txn, &key.as_slice(), &value.as_slice())
|
||||
.map_err(convert_error)?;
|
||||
self.watchers.wake(&key);
|
||||
}
|
||||
txn.commit().map_err(convert_error)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
|
||||
let mut txn = self.engine.env.write_txn().map_err(convert_error)?;
|
||||
for key in iter {
|
||||
let old = self
|
||||
.tree
|
||||
.get(&txn, &key.as_slice())
|
||||
.map_err(convert_error)?;
|
||||
let new = crate::utils::increment(old.as_deref())
|
||||
.expect("utils::increment always returns Some");
|
||||
|
||||
self.tree
|
||||
.put(&mut txn, &key.as_slice(), &&*new)
|
||||
.map_err(convert_error)?;
|
||||
}
|
||||
txn.commit().map_err(convert_error)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove(&self, key: &[u8]) -> Result<()> {
|
||||
let mut txn = self.engine.env.write_txn().map_err(convert_error)?;
|
||||
self.tree.delete(&mut txn, &key).map_err(convert_error)?;
|
||||
|
@ -150,7 +184,7 @@ impl Tree for EngineTree {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> {
|
||||
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
||||
self.iter_from(&[], false)
|
||||
}
|
||||
|
||||
|
@ -158,7 +192,7 @@ impl Tree for EngineTree {
|
|||
&self,
|
||||
from: &[u8],
|
||||
backwards: bool,
|
||||
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send> {
|
||||
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)>> {
|
||||
self.iter_from_thread(Arc::clone(&self.tree), from.to_vec(), backwards)
|
||||
}
|
||||
|
||||
|
@ -181,7 +215,7 @@ impl Tree for EngineTree {
|
|||
fn scan_prefix<'a>(
|
||||
&'a self,
|
||||
prefix: Vec<u8>,
|
||||
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> {
|
||||
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
||||
Box::new(
|
||||
self.iter_from(&prefix, false)
|
||||
.take_while(move |(key, _)| key.starts_with(&prefix)),
|
||||
|
|
77
src/database/abstraction/json_stream_export.rs
Normal file
77
src/database/abstraction/json_stream_export.rs
Normal file
|
@ -0,0 +1,77 @@
|
|||
use crate::{
|
||||
database::abstraction::{KeyValueDatabaseEngine, KvExport},
|
||||
Result,
|
||||
};
|
||||
use base64::{engine::general_purpose::STANDARD, Engine};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::io::{BufRead, BufReader, Read, Write};
|
||||
|
||||
pub trait KeyValueJsonExporter {
|
||||
fn export_json_stream(&self, output: &mut dyn Write) -> Result<()>;
|
||||
fn import_json_stream(&self, input: &mut dyn Read) -> Result<()>;
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct Entry {
|
||||
tree: String,
|
||||
key: String,
|
||||
value: String,
|
||||
}
|
||||
|
||||
struct JsonExporter<'a> {
|
||||
current_name: String,
|
||||
write: &'a mut dyn Write,
|
||||
}
|
||||
impl<'a> KvExport for JsonExporter<'a> {
|
||||
fn start_tree(&mut self, name: &str) -> Result<()> {
|
||||
self.current_name = name.to_owned();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn key_value(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
|
||||
let entry = Entry {
|
||||
tree: self.current_name.clone(),
|
||||
key: STANDARD.encode(key),
|
||||
value: STANDARD.encode(value),
|
||||
};
|
||||
writeln!(self.write, "{}", serde_json::to_string(&entry).unwrap())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn end_tree(&mut self, _name: &str) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: KeyValueDatabaseEngine> KeyValueJsonExporter for T {
|
||||
fn export_json_stream(&self, output: &mut dyn Write) -> Result<()> {
|
||||
self.export(&mut JsonExporter {
|
||||
current_name: Default::default(),
|
||||
write: output,
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
fn import_json_stream(&self, input: &mut dyn Read) -> Result<()> {
|
||||
let bf = BufReader::new(input);
|
||||
//Just a cache to avoid to reopen the tree all the times
|
||||
let mut cur_tree = None;
|
||||
for line in bf.lines() {
|
||||
if let Ok(entry) = serde_json::from_str::<Entry>(&line?) {
|
||||
if let (Ok(key), Ok(value)) =
|
||||
(STANDARD.decode(&entry.key), STANDARD.decode(&entry.value))
|
||||
{
|
||||
let (tree, tree_name) = match cur_tree {
|
||||
Some((tree, tree_name)) if tree_name == entry.tree => (tree, tree_name),
|
||||
_ => {
|
||||
let tree = self.open_tree(&entry.tree)?;
|
||||
(tree, entry.tree.clone())
|
||||
}
|
||||
};
|
||||
tree.insert(&key, &value)?;
|
||||
cur_tree = Some((tree, tree_name));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
use crate::{
|
||||
database::{
|
||||
abstraction::{watchers::Watchers, KeyValueDatabaseEngine, KvTree},
|
||||
abstraction::{watchers::Watchers, KeyValueDatabaseEngine, KvExport, KvTree},
|
||||
Config,
|
||||
},
|
||||
Result,
|
||||
|
@ -27,7 +27,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
|
|||
Ok(Arc::new(Engine { persy }))
|
||||
}
|
||||
|
||||
fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>> {
|
||||
fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
|
||||
// Create if it doesn't exist
|
||||
if !self.persy.exists_index(name)? {
|
||||
let mut tx = self.persy.begin()?;
|
||||
|
@ -45,6 +45,22 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
|
|||
fn flush(&self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn export(&self, exporter: &mut dyn KvExport) -> Result<()> {
|
||||
let snapshot = self.persy.snapshot()?;
|
||||
let indexes = snapshot.list_indexes()?;
|
||||
for (index, _) in indexes {
|
||||
exporter.start_tree(&index)?;
|
||||
let data = snapshot.range::<ByteVec, ByteVec, _>(&index, ..)?;
|
||||
for (key, values) in data {
|
||||
for value in values {
|
||||
exporter.key_value(&key, &value)?;
|
||||
}
|
||||
}
|
||||
exporter.end_tree(&index)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PersyTree {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use super::{super::Config, watchers::Watchers, KeyValueDatabaseEngine, KvTree};
|
||||
use super::{super::Config, watchers::Watchers, KeyValueDatabaseEngine, KvExport, KvTree};
|
||||
use crate::{utils, Result};
|
||||
use std::{
|
||||
future::Future,
|
||||
|
@ -11,11 +11,12 @@ pub struct Engine {
|
|||
max_open_files: i32,
|
||||
cache: rocksdb::Cache,
|
||||
old_cfs: Vec<String>,
|
||||
database_path: String,
|
||||
}
|
||||
|
||||
pub struct RocksDbEngineTree<'a> {
|
||||
pub struct RocksDbEngineTree {
|
||||
db: Arc<Engine>,
|
||||
name: &'a str,
|
||||
name: String,
|
||||
watchers: Watchers,
|
||||
write_lock: RwLock<()>,
|
||||
}
|
||||
|
@ -85,10 +86,11 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
|
|||
max_open_files: config.rocksdb_max_open_files,
|
||||
cache: rocksdb_cache,
|
||||
old_cfs: cfs,
|
||||
database_path: config.database_path.clone(),
|
||||
}))
|
||||
}
|
||||
|
||||
fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>> {
|
||||
fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
|
||||
if !self.old_cfs.contains(&name.to_owned()) {
|
||||
// Create if it didn't exist
|
||||
let _ = self
|
||||
|
@ -97,7 +99,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
|
|||
}
|
||||
|
||||
Ok(Arc::new(RocksDbEngineTree {
|
||||
name,
|
||||
name: name.to_owned(),
|
||||
db: Arc::clone(self),
|
||||
watchers: Watchers::default(),
|
||||
write_lock: RwLock::new(()),
|
||||
|
@ -126,15 +128,39 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
|
|||
self.cache.get_pinned_usage() as f64 / 1024.0 / 1024.0,
|
||||
))
|
||||
}
|
||||
|
||||
fn export(&self, exporter: &mut dyn KvExport) -> Result<()> {
|
||||
let snapshot = self.rocks.snapshot();
|
||||
let column_familes = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(
|
||||
&rocksdb::Options::default(),
|
||||
&self.database_path,
|
||||
)
|
||||
.unwrap();
|
||||
for column_family in column_familes {
|
||||
if let Some(handle) = self.rocks.cf_handle(&column_family) {
|
||||
exporter.start_tree(&column_family)?;
|
||||
let data = snapshot.iterator_cf(&handle, rocksdb::IteratorMode::Start);
|
||||
for ele in data {
|
||||
if let Ok((key, value)) = ele {
|
||||
exporter.key_value(&key, &value)?;
|
||||
}
|
||||
}
|
||||
exporter.end_tree(&column_family)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn clear_caches(&self) {}
|
||||
}
|
||||
|
||||
impl RocksDbEngineTree<'_> {
|
||||
impl RocksDbEngineTree {
|
||||
fn cf(&self) -> Arc<rocksdb::BoundColumnFamily<'_>> {
|
||||
self.db.rocks.cf_handle(self.name).unwrap()
|
||||
self.db.rocks.cf_handle(&self.name).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl KvTree for RocksDbEngineTree<'_> {
|
||||
impl KvTree for RocksDbEngineTree {
|
||||
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
|
||||
let readoptions = rocksdb::ReadOptions::default();
|
||||
|
||||
|
|
|
@ -27,6 +27,20 @@ impl DatabaseEngine for Engine {
|
|||
fn flush(self: &Arc<Self>) -> Result<()> {
|
||||
Ok(()) // noop
|
||||
}
|
||||
|
||||
fn export(&self, exporter: &mut Box<dyn KvExport>) -> Result<()> {
|
||||
// Sled do not support snapshots
|
||||
let indexes = self.0.tree_names();
|
||||
for index in &indexes {
|
||||
exporter.start_index(index)?;
|
||||
let tree = Arc::new(SledEngineTree(self.0.open_tree(name)?));
|
||||
for (key, value) in tree.iter() {
|
||||
exporter.key_value(&key, &value)?;
|
||||
}
|
||||
exporter.end_index(&index)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Tree for SledEngineTree {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use super::{watchers::Watchers, KeyValueDatabaseEngine, KvTree};
|
||||
use super::{watchers::Watchers, KeyValueDatabaseEngine, KvExport, KvTree};
|
||||
use crate::{database::Config, Result};
|
||||
use parking_lot::{Mutex, MutexGuard};
|
||||
use rusqlite::{Connection, DatabaseName::Main, OptionalExtension};
|
||||
|
@ -78,6 +78,14 @@ impl Engine {
|
|||
.pragma_update(Some(Main), "wal_checkpoint", "RESTART")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn open_tree_impl(self: &Arc<Self>, name: &str) -> Arc<dyn KvTree> {
|
||||
Arc::new(SqliteTable {
|
||||
engine: Arc::clone(self),
|
||||
name: name.to_owned(),
|
||||
watchers: Watchers::default(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl KeyValueDatabaseEngine for Arc<Engine> {
|
||||
|
@ -108,11 +116,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
|
|||
fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
|
||||
self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {name} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )"), [])?;
|
||||
|
||||
Ok(Arc::new(SqliteTable {
|
||||
engine: Arc::clone(self),
|
||||
name: name.to_owned(),
|
||||
watchers: Watchers::default(),
|
||||
}))
|
||||
Ok(self.open_tree_impl(name))
|
||||
}
|
||||
|
||||
fn flush(&self) -> Result<()> {
|
||||
|
@ -123,6 +127,27 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
|
|||
fn cleanup(&self) -> Result<()> {
|
||||
self.flush_wal()
|
||||
}
|
||||
|
||||
fn export(&self, exporter: &mut dyn KvExport) -> Result<()> {
|
||||
// TODO: rusqlite do not support snapshot yet, change this when they are supported
|
||||
let tables: Vec<String> = {
|
||||
let guard = self.read_lock();
|
||||
guard
|
||||
.prepare("SELECT name FROM sqlite_master WHERE type='table'")?
|
||||
.query_map([], |row| row.get(0))?
|
||||
.map(|r| r.unwrap())
|
||||
.collect()
|
||||
};
|
||||
for table in &tables {
|
||||
exporter.start_tree(table)?;
|
||||
let tree = self.open_tree_impl(table);
|
||||
for (key, value) in tree.iter() {
|
||||
exporter.key_value(&key, &value)?;
|
||||
}
|
||||
exporter.end_tree(&table)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SqliteTable {
|
||||
|
|
542
src/database/abstraction/tests.rs
Normal file
542
src/database/abstraction/tests.rs
Normal file
|
@ -0,0 +1,542 @@
|
|||
use crate::database::{
|
||||
abstraction::{
|
||||
json_stream_export::KeyValueJsonExporter, KeyValueDatabaseEngine, KvExport, KvTree,
|
||||
},
|
||||
Config,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use tempfile::{Builder, TempDir};
|
||||
|
||||
fn empty_config(database_path: &str) -> Config {
|
||||
use figment::providers::{Format, Toml};
|
||||
Toml::from_str(&format!(
|
||||
r#"
|
||||
server_name = "test"
|
||||
database_path = "{}"
|
||||
"#,
|
||||
database_path
|
||||
))
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn open_instance<T>(test_name: &str) -> (Arc<T>, TempDir)
|
||||
where
|
||||
Arc<T>: KeyValueDatabaseEngine,
|
||||
{
|
||||
let db_folder = Builder::new().prefix(test_name).tempdir().unwrap();
|
||||
let config = empty_config(db_folder.path().to_str().unwrap());
|
||||
let instance = Arc::<T>::open(&config).unwrap();
|
||||
(instance, db_folder)
|
||||
}
|
||||
/// Make sure to keep the reference of the tree returned values for
|
||||
/// the length of the test, to avoid early cleanups that may create test issues
|
||||
fn open_tree<T>(test_name: &str) -> (Arc<dyn KvTree>, impl KeyValueDatabaseEngine, TempDir)
|
||||
where
|
||||
Arc<T>: KeyValueDatabaseEngine,
|
||||
{
|
||||
let (instance, db_folder) = open_instance(test_name);
|
||||
let tree = instance.open_tree("test").unwrap();
|
||||
(tree, instance, db_folder)
|
||||
}
|
||||
|
||||
fn insert_get<T>(name: &str)
|
||||
where
|
||||
Arc<T>: KeyValueDatabaseEngine,
|
||||
{
|
||||
let (tree, _inst, _temp_dir) = open_tree::<T>(name);
|
||||
let key = "key".as_bytes();
|
||||
let value = "value".as_bytes();
|
||||
tree.insert(key, value).unwrap();
|
||||
let read = tree.get(key).unwrap();
|
||||
assert_eq!(read, Some(value.to_owned()));
|
||||
}
|
||||
|
||||
fn insert_get_replace<T>(name: &str)
|
||||
where
|
||||
Arc<T>: KeyValueDatabaseEngine,
|
||||
{
|
||||
let (tree, _inst, _temp_dir) = open_tree::<T>(name);
|
||||
let key = "key".as_bytes();
|
||||
let value = "value".as_bytes();
|
||||
tree.insert(key, value).unwrap();
|
||||
let read = tree.get(key).unwrap();
|
||||
assert_eq!(read, Some(value.to_owned()));
|
||||
|
||||
let value1 = "value1".as_bytes();
|
||||
tree.insert(key, value1).unwrap();
|
||||
let read = tree.get(key).unwrap();
|
||||
assert_eq!(read, Some(value1.to_owned()));
|
||||
}
|
||||
|
||||
fn insert_get_remove<T>(name: &str)
|
||||
where
|
||||
Arc<T>: KeyValueDatabaseEngine,
|
||||
{
|
||||
let (tree, _inst, _temp_dir) = open_tree::<T>(name);
|
||||
let key = "key".as_bytes();
|
||||
let value = "value".as_bytes();
|
||||
tree.insert(key, value).unwrap();
|
||||
let read = tree.get(key).unwrap();
|
||||
assert_eq!(read, Some(value.to_owned()));
|
||||
tree.remove(key).unwrap();
|
||||
let read = tree.get(key).unwrap();
|
||||
assert_eq!(read, None);
|
||||
// Remove of not existing key should run seamless
|
||||
tree.remove(key).unwrap();
|
||||
}
|
||||
|
||||
fn batch_insert_get<T>(name: &str)
|
||||
where
|
||||
Arc<T>: KeyValueDatabaseEngine,
|
||||
{
|
||||
let (tree, _inst, _temp_dir) = open_tree::<T>(name);
|
||||
let key = "key".as_bytes();
|
||||
let value = "value".as_bytes();
|
||||
let key1 = "key1".as_bytes();
|
||||
let value1 = "value1".as_bytes();
|
||||
let key2 = "key2".as_bytes();
|
||||
let value2 = "value2".as_bytes();
|
||||
tree.insert_batch(
|
||||
&mut vec![
|
||||
(key.to_owned(), value.to_owned()),
|
||||
(key1.to_owned(), value1.to_owned()),
|
||||
(key2.to_owned(), value2.to_owned()),
|
||||
]
|
||||
.into_iter(),
|
||||
)
|
||||
.unwrap();
|
||||
let read = tree.get(key).unwrap();
|
||||
assert_eq!(read, Some(value.to_owned()));
|
||||
let read = tree.get(key1).unwrap();
|
||||
assert_eq!(read, Some(value1.to_owned()));
|
||||
let read = tree.get(key2).unwrap();
|
||||
assert_eq!(read, Some(value2.to_owned()));
|
||||
}
|
||||
|
||||
fn insert_iter<T>(name: &str)
|
||||
where
|
||||
Arc<T>: KeyValueDatabaseEngine,
|
||||
{
|
||||
let (tree, _inst, _temp_dir) = open_tree::<T>(name);
|
||||
let key = "key".as_bytes();
|
||||
let value = "value".as_bytes();
|
||||
tree.insert(key, value).unwrap();
|
||||
let key1 = "key1".as_bytes();
|
||||
let value1 = "value1".as_bytes();
|
||||
tree.insert(key1, value1).unwrap();
|
||||
let key2 = "key2".as_bytes();
|
||||
let value2 = "value2".as_bytes();
|
||||
tree.insert(key2, value2).unwrap();
|
||||
let mut iter = tree.iter();
|
||||
assert_eq!(iter.next(), Some((key.to_owned(), value.to_owned())));
|
||||
assert_eq!(iter.next(), Some((key1.to_owned(), value1.to_owned())));
|
||||
assert_eq!(iter.next(), Some((key2.to_owned(), value2.to_owned())));
|
||||
assert_eq!(iter.next(), None);
|
||||
}
|
||||
|
||||
fn insert_iter_from<T>(name: &str)
|
||||
where
|
||||
Arc<T>: KeyValueDatabaseEngine,
|
||||
{
|
||||
let (tree, _inst, _temp_dir) = open_tree::<T>(name);
|
||||
let key = "key".as_bytes();
|
||||
let value = "value".as_bytes();
|
||||
tree.insert(key, value).unwrap();
|
||||
let key1 = "key1".as_bytes();
|
||||
let value1 = "value1".as_bytes();
|
||||
tree.insert(key1, value1).unwrap();
|
||||
let key2 = "key2".as_bytes();
|
||||
let value2 = "value2".as_bytes();
|
||||
tree.insert(key2, value2).unwrap();
|
||||
let mut iter = tree.iter_from(key1, false);
|
||||
assert_eq!(iter.next(), Some((key1.to_owned(), value1.to_owned())));
|
||||
assert_eq!(iter.next(), Some((key2.to_owned(), value2.to_owned())));
|
||||
assert_eq!(iter.next(), None);
|
||||
let mut iter = tree.iter_from(key1, true);
|
||||
assert_eq!(iter.next(), Some((key1.to_owned(), value1.to_owned())));
|
||||
assert_eq!(iter.next(), Some((key.to_owned(), value.to_owned())));
|
||||
assert_eq!(iter.next(), None);
|
||||
}
|
||||
|
||||
fn insert_iter_prefix<T>(name: &str)
|
||||
where
|
||||
Arc<T>: KeyValueDatabaseEngine,
|
||||
{
|
||||
let (tree, _inst, _temp_dir) = open_tree::<T>(name);
|
||||
let key = "key".as_bytes();
|
||||
let value = "value".as_bytes();
|
||||
tree.insert(key, value).unwrap();
|
||||
let key1 = "key1".as_bytes();
|
||||
let value1 = "value1".as_bytes();
|
||||
tree.insert(key1, value1).unwrap();
|
||||
let key11 = "key11".as_bytes();
|
||||
let value11 = "value11".as_bytes();
|
||||
tree.insert(key11, value11).unwrap();
|
||||
let key2 = "key2".as_bytes();
|
||||
let value2 = "value2".as_bytes();
|
||||
tree.insert(key2, value2).unwrap();
|
||||
let mut iter = tree.scan_prefix(key1.to_owned());
|
||||
assert_eq!(iter.next(), Some((key1.to_owned(), value1.to_owned())));
|
||||
assert_eq!(iter.next(), Some((key11.to_owned(), value11.to_owned())));
|
||||
assert_eq!(iter.next(), None);
|
||||
}
|
||||
|
||||
fn insert_clear<T>(name: &str)
|
||||
where
|
||||
Arc<T>: KeyValueDatabaseEngine,
|
||||
{
|
||||
let (tree, _inst, _temp_dir) = open_tree::<T>(name);
|
||||
let key = "key".as_bytes();
|
||||
let value = "value".as_bytes();
|
||||
tree.insert(key, value).unwrap();
|
||||
let key1 = "key1".as_bytes();
|
||||
let value1 = "value1".as_bytes();
|
||||
tree.insert(key1, value1).unwrap();
|
||||
let key2 = "key2".as_bytes();
|
||||
let value2 = "value2".as_bytes();
|
||||
tree.insert(key2, value2).unwrap();
|
||||
assert_eq!(tree.iter().count(), 3);
|
||||
tree.clear().unwrap();
|
||||
assert_eq!(tree.iter().count(), 0);
|
||||
}
|
||||
|
||||
fn increment<T>(name: &str)
|
||||
where
|
||||
Arc<T>: KeyValueDatabaseEngine,
|
||||
{
|
||||
let (tree, _inst, _temp_dir) = open_tree::<T>(name);
|
||||
let key = "key".as_bytes();
|
||||
tree.increment(key).unwrap();
|
||||
let read = tree.get(key).unwrap();
|
||||
assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 1);
|
||||
tree.increment(key).unwrap();
|
||||
let read = tree.get(key).unwrap();
|
||||
assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 2);
|
||||
}
|
||||
|
||||
fn increment_batch<T>(name: &str)
|
||||
where
|
||||
Arc<T>: KeyValueDatabaseEngine,
|
||||
{
|
||||
let (tree, _inst, _temp_dir) = open_tree::<T>(name);
|
||||
let key = "key".as_bytes();
|
||||
let key1 = "key1".as_bytes();
|
||||
tree.increment_batch(&mut vec![key.to_owned(), key1.to_owned()].into_iter())
|
||||
.unwrap();
|
||||
let read = tree.get(key).unwrap();
|
||||
assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 1);
|
||||
let read = tree.get(key1).unwrap();
|
||||
assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 1);
|
||||
tree.increment_batch(&mut vec![key.to_owned(), key1.to_owned()].into_iter())
|
||||
.unwrap();
|
||||
let read = tree.get(key).unwrap();
|
||||
assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 2);
|
||||
let read = tree.get(key1).unwrap();
|
||||
assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 2);
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct TestBackup {
|
||||
data: Vec<(String, Vec<u8>, Vec<u8>)>,
|
||||
current_tree: String,
|
||||
}
|
||||
impl TestBackup {
|
||||
fn import<T>(&self, store: &Arc<T>) -> crate::Result<()>
|
||||
where
|
||||
Arc<T>: KeyValueDatabaseEngine,
|
||||
{
|
||||
for (tree, k, v) in &self.data {
|
||||
let data = store.open_tree(&tree)?;
|
||||
data.insert(&k, &v)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
impl KvExport for TestBackup {
|
||||
fn start_tree(&mut self, name: &str) -> crate::Result<()> {
|
||||
self.current_tree = name.to_owned();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn key_value(&mut self, key: &[u8], value: &[u8]) -> crate::Result<()> {
|
||||
self.data
|
||||
.push((self.current_tree.clone(), key.to_owned(), value.to_owned()));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn end_tree(&mut self, _name: &str) -> crate::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn insert_data<TT>(instance: &Arc<TT>, data: &str)
|
||||
where
|
||||
Arc<TT>: KeyValueDatabaseEngine,
|
||||
{
|
||||
let tree = instance.open_tree(data).unwrap();
|
||||
let key = format!("{}", data);
|
||||
let value = "value".as_bytes();
|
||||
tree.insert(key.as_bytes(), value).unwrap();
|
||||
let key1 = format!("{}1", data);
|
||||
let value1 = "value1".as_bytes();
|
||||
tree.insert(key1.as_bytes(), value1).unwrap();
|
||||
let key2 = format!("{}2", data);
|
||||
let value2 = "value2".as_bytes();
|
||||
tree.insert(key2.as_bytes(), value2).unwrap();
|
||||
}
|
||||
|
||||
fn check_data<TT>(instance: &Arc<TT>, data: &str)
|
||||
where
|
||||
Arc<TT>: KeyValueDatabaseEngine,
|
||||
{
|
||||
let tree = instance.open_tree(data).unwrap();
|
||||
let key = format!("{}", data);
|
||||
let value = "value".as_bytes();
|
||||
let key1 = format!("{}1", data);
|
||||
let value1 = "value1".as_bytes();
|
||||
let key2 = format!("{}2", data);
|
||||
let value2 = "value2".as_bytes();
|
||||
let mut iter = tree.iter();
|
||||
assert_eq!(
|
||||
iter.next(),
|
||||
Some((key.as_bytes().to_owned(), value.to_owned()))
|
||||
);
|
||||
assert_eq!(
|
||||
iter.next(),
|
||||
Some((key1.as_bytes().to_owned(), value1.to_owned()))
|
||||
);
|
||||
assert_eq!(
|
||||
iter.next(),
|
||||
Some((key2.as_bytes().to_owned(), value2.to_owned()))
|
||||
);
|
||||
assert_eq!(iter.next(), None);
|
||||
}
|
||||
|
||||
fn test_export_import<T>(test_name: &str)
|
||||
where
|
||||
Arc<T>: KeyValueDatabaseEngine,
|
||||
{
|
||||
let (instance, _db_folder) = open_instance(test_name);
|
||||
insert_data(&instance, "one");
|
||||
insert_data(&instance, "two");
|
||||
let mut bk = TestBackup::default();
|
||||
instance.export(&mut bk).unwrap();
|
||||
let (instance_r, _db_folder) = open_instance(&format!("{}_restore", test_name));
|
||||
bk.import(&instance_r).unwrap();
|
||||
check_data(&instance_r, "one");
|
||||
check_data(&instance_r, "two");
|
||||
}
|
||||
|
||||
fn test_export_import_json<T>(test_name: &str)
|
||||
where
|
||||
Arc<T>: KeyValueDatabaseEngine,
|
||||
{
|
||||
let (instance, _db_folder) = open_instance(test_name);
|
||||
insert_data(&instance, "one");
|
||||
insert_data(&instance, "two");
|
||||
let mut buffer = Vec::new();
|
||||
instance.export_json_stream(&mut buffer).unwrap();
|
||||
let (instance_r, _db_folder) = open_instance(&format!("{}_restore", test_name));
|
||||
instance_r
|
||||
.import_json_stream(&mut std::io::Cursor::new(buffer))
|
||||
.unwrap();
|
||||
check_data(&instance_r, "one");
|
||||
check_data(&instance_r, "two");
|
||||
}
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
mod sqlite {
|
||||
|
||||
use super::*;
|
||||
use crate::database::abstraction::sqlite::Engine;
|
||||
|
||||
#[test]
|
||||
fn sqlite_insert_get() {
|
||||
insert_get::<Engine>("sqlite_insert_get")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sqlite_insert_replace_get() {
|
||||
insert_get_replace::<Engine>("sqlite_insert_get_replace")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sqlite_insert_get_remove() {
|
||||
insert_get_remove::<Engine>("sqlite_insert_get_remove")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sqlite_batch_insert_get() {
|
||||
batch_insert_get::<Engine>("sqlite_batch_insert_get")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sqlite_insert_iter() {
|
||||
insert_iter::<Engine>("sqlite_insert_iter")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sqlite_insert_iter_from() {
|
||||
insert_iter_from::<Engine>("sqlite_insert_iter_from")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sqlite_insert_iter_prefix() {
|
||||
insert_iter_prefix::<Engine>("sqlite_insert_iter_prefix")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sqlite_insert_clear() {
|
||||
insert_clear::<Engine>("sqlite_insert_iter_prefix")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sqlite_increment() {
|
||||
increment::<Engine>("sqlite_increment")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sqlite_increment_batch() {
|
||||
increment_batch::<Engine>("sqlite_increment_batch")
|
||||
}
|
||||
#[test]
|
||||
fn sqlite_export_import() {
|
||||
test_export_import::<Engine>("sqlite_export_import")
|
||||
}
|
||||
#[test]
|
||||
fn sqlite_export_import_json() {
|
||||
test_export_import_json::<Engine>("sqlite_export_import_json")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "rocksdb")]
|
||||
mod rocksdb {
|
||||
|
||||
use super::*;
|
||||
use crate::database::abstraction::rocksdb::Engine;
|
||||
|
||||
#[test]
|
||||
fn rocksdb_insert_get() {
|
||||
insert_get::<Engine>("rocksdb_insert_get")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rocksdb_insert_replace_get() {
|
||||
insert_get_replace::<Engine>("rocksdb_insert_get_replace")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rocksdb_insert_get_remove() {
|
||||
insert_get_remove::<Engine>("rocksdb_insert_get_remove")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rocksdb_batch_insert_get() {
|
||||
batch_insert_get::<Engine>("rocksdb_batch_insert_get")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rocksdb_insert_iter() {
|
||||
insert_iter::<Engine>("rocksdb_insert_iter")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rocksdb_insert_iter_from() {
|
||||
insert_iter_from::<Engine>("rocksdb_insert_iter_from")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rocksdb_insert_iter_prefix() {
|
||||
insert_iter_prefix::<Engine>("rocksdb_insert_iter_prefix")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rocksdb_insert_clear() {
|
||||
insert_clear::<Engine>("rocksdb_insert_iter_prefix")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rocksdb_increment() {
|
||||
increment::<Engine>("rocksdb_increment")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rocksdb_increment_batch() {
|
||||
increment_batch::<Engine>("rocksdb_increment_batch")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rocksdb_export_import() {
|
||||
test_export_import::<Engine>("rocksdb_export_import")
|
||||
}
|
||||
#[test]
|
||||
fn rocksdb_export_import_json() {
|
||||
test_export_import_json::<Engine>("rocksdb_export_import_json")
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "persy")]
|
||||
mod persy {
|
||||
|
||||
use super::*;
|
||||
use crate::database::abstraction::persy::Engine;
|
||||
|
||||
#[test]
|
||||
fn persy_insert_get() {
|
||||
insert_get::<Engine>("persy_insert_get")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn persy_insert_replace_get() {
|
||||
insert_get_replace::<Engine>("persy_insert_get_replace")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn persy_insert_get_remove() {
|
||||
insert_get_remove::<Engine>("persy_insert_get_remove")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn persy_batch_insert_get() {
|
||||
batch_insert_get::<Engine>("persy_batch_insert_get")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn persy_insert_iter() {
|
||||
insert_iter::<Engine>("persy_insert_iter")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn persy_insert_iter_from() {
|
||||
insert_iter_from::<Engine>("persy_insert_iter_from")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn persy_insert_iter_prefix() {
|
||||
insert_iter_prefix::<Engine>("persy_insert_iter_prefix")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn persy_insert_clear() {
|
||||
insert_clear::<Engine>("persy_insert_iter_prefix")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn persy_increment() {
|
||||
increment::<Engine>("persy_increment")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn persy_increment_batch() {
|
||||
increment_batch::<Engine>("persy_increment_batch")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn persy_export_import() {
|
||||
test_export_import::<Engine>("persy_export_import")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn persy_export_import_json() {
|
||||
test_export_import_json::<Engine>("persy_export_import_json")
|
||||
}
|
||||
}
|
|
@ -164,7 +164,7 @@ impl Error {
|
|||
#[cfg(feature = "persy")]
|
||||
Self::PersyError { .. } => db_error,
|
||||
#[cfg(feature = "heed")]
|
||||
Self::HeedError => db_error,
|
||||
Self::HeedError { .. } => db_error,
|
||||
#[cfg(feature = "rocksdb")]
|
||||
Self::RocksDbError { .. } => db_error,
|
||||
Self::IoError { .. } => db_error,
|
||||
|
|
Loading…
Reference in a new issue