1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-04-22 14:10:16 +03:00

feat: first implementation of KeyValue data export

This commit is contained in:
Tglman 2023-08-22 01:47:11 +01:00
parent c901ac0bb8
commit 6c8da70122
3 changed files with 55 additions and 2 deletions
src/database

View file

@ -41,6 +41,17 @@ pub trait KeyValueDatabaseEngine: Send + Sync {
fn memory_usage(&self) -> Result<String> {
Ok("Current database engine does not support memory usage reporting.".to_owned())
}
fn clear_caches(&self) {}
fn export(&self, _exporter: &mut Box<dyn KvExport>) -> Result<()> {
unimplemented!()
}
}
pub trait KvExport {
fn start_index(&mut self, name: &str) -> Result<()>;
fn key_value(&mut self, key: &[u8], value: &[u8]) -> Result<()>;
fn end_index(&mut self, name: &str) -> Result<()>;
}
pub trait KvTree: Send + Sync {

View file

@ -1,6 +1,6 @@
use crate::{
database::{
abstraction::{watchers::Watchers, KeyValueDatabaseEngine, KvTree},
abstraction::{watchers::Watchers, KeyValueDatabaseEngine, KvExport, KvTree},
Config,
},
Result,
@ -45,6 +45,22 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
fn flush(&self) -> Result<()> {
Ok(())
}
fn export(&self, exporter: &mut Box<dyn KvExport>) -> Result<()> {
let snapshot = self.persy.snapshot()?;
let indexes = snapshot.list_indexes()?;
for (index, _) in indexes {
exporter.start_index(&index)?;
let data = snapshot.range::<ByteVec, ByteVec, _>(&index, ..)?;
for (key, values) in data {
for value in values {
exporter.key_value(&key, &value)?;
}
}
exporter.end_index(&index)?;
}
Ok(())
}
}
pub struct PersyTree {

View file

@ -1,4 +1,4 @@
use super::{super::Config, watchers::Watchers, KeyValueDatabaseEngine, KvTree};
use super::{super::Config, watchers::Watchers, KeyValueDatabaseEngine, KvExport, KvTree};
use crate::{utils, Result};
use std::{
future::Future,
@ -11,6 +11,7 @@ pub struct Engine {
max_open_files: i32,
cache: rocksdb::Cache,
old_cfs: Vec<String>,
database_path: String,
}
pub struct RocksDbEngineTree<'a> {
@ -85,6 +86,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
max_open_files: config.rocksdb_max_open_files,
cache: rocksdb_cache,
old_cfs: cfs,
database_path: config.database_path.clone(),
}))
}
@ -126,6 +128,30 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
self.cache.get_pinned_usage() as f64 / 1024.0 / 1024.0,
))
}
fn export(&self, exporter: &mut Box<dyn KvExport>) -> Result<()> {
let snapshot = self.rocks.snapshot();
let indexes = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(
&rocksdb::Options::default(),
&self.database_path,
)
.unwrap();
for index in indexes {
if let Some(handle) = self.rocks.cf_handle(&index) {
exporter.start_index(&index)?;
let data = snapshot.iterator_cf(&handle, rocksdb::IteratorMode::Start);
for ele in data {
if let Ok((key, value)) = ele {
exporter.key_value(&key, &value)?;
}
}
exporter.end_index(&index)?;
}
}
Ok(())
}
fn clear_caches(&self) {}
}
impl RocksDbEngineTree<'_> {