mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-01-15 22:16:27 +03:00
WIP improvement: much better state storage
This commit is contained in:
parent
f29a6d7945
commit
0b073c6534
3 changed files with 345 additions and 10 deletions
338
src/database.rs
338
src/database.rs
|
@ -24,13 +24,14 @@ use rocket::{
|
|||
request::{FromRequest, Request},
|
||||
Shutdown, State,
|
||||
};
|
||||
use ruma::{DeviceId, RoomId, ServerName, UserId};
|
||||
use ruma::{DeviceId, EventId, RoomId, ServerName, UserId};
|
||||
use serde::{de::IgnoredAny, Deserialize};
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap},
|
||||
collections::{BTreeMap, HashMap, HashSet},
|
||||
convert::TryFrom,
|
||||
fs::{self, remove_dir_all},
|
||||
io::Write,
|
||||
mem::size_of,
|
||||
ops::Deref,
|
||||
path::Path,
|
||||
sync::{Arc, Mutex, RwLock},
|
||||
|
@ -261,7 +262,12 @@ impl Database {
|
|||
userroomid_highlightcount: builder.open_tree("userroomid_highlightcount")?,
|
||||
|
||||
statekey_shortstatekey: builder.open_tree("statekey_shortstatekey")?,
|
||||
|
||||
shortroomid_roomid: builder.open_tree("shortroomid_roomid")?,
|
||||
roomid_shortroomid: builder.open_tree("roomid_shortroomid")?,
|
||||
|
||||
stateid_shorteventid: builder.open_tree("stateid_shorteventid")?,
|
||||
shortstatehash_statediff: builder.open_tree("shortstatehash_statediff")?,
|
||||
eventid_shorteventid: builder.open_tree("eventid_shorteventid")?,
|
||||
shorteventid_eventid: builder.open_tree("shorteventid_eventid")?,
|
||||
shorteventid_shortstatehash: builder.open_tree("shorteventid_shortstatehash")?,
|
||||
|
@ -438,6 +444,334 @@ impl Database {
|
|||
|
||||
println!("Migration: 5 -> 6 finished");
|
||||
}
|
||||
|
||||
fn load_shortstatehash_info(
|
||||
shortstatehash: &[u8],
|
||||
db: &Database,
|
||||
lru: &mut LruCache<
|
||||
Vec<u8>,
|
||||
Vec<(
|
||||
Vec<u8>,
|
||||
HashSet<Vec<u8>>,
|
||||
HashSet<Vec<u8>>,
|
||||
HashSet<Vec<u8>>,
|
||||
)>,
|
||||
>,
|
||||
) -> Result<
|
||||
Vec<(
|
||||
Vec<u8>, // sstatehash
|
||||
HashSet<Vec<u8>>, // full state
|
||||
HashSet<Vec<u8>>, // added
|
||||
HashSet<Vec<u8>>, // removed
|
||||
)>,
|
||||
> {
|
||||
if let Some(result) = lru.get_mut(shortstatehash) {
|
||||
return Ok(result.clone());
|
||||
}
|
||||
|
||||
let value = db
|
||||
.rooms
|
||||
.shortstatehash_statediff
|
||||
.get(shortstatehash)?
|
||||
.ok_or_else(|| Error::bad_database("State hash does not exist"))?;
|
||||
let parent = value[0..size_of::<u64>()].to_vec();
|
||||
|
||||
let mut add_mode = true;
|
||||
let mut added = HashSet::new();
|
||||
let mut removed = HashSet::new();
|
||||
|
||||
let mut i = size_of::<u64>();
|
||||
while let Some(v) = value.get(i..i + 2 * size_of::<u64>()) {
|
||||
if add_mode && v.starts_with(&0_u64.to_be_bytes()) {
|
||||
add_mode = false;
|
||||
i += size_of::<u64>();
|
||||
continue;
|
||||
}
|
||||
if add_mode {
|
||||
added.insert(v.to_vec());
|
||||
} else {
|
||||
removed.insert(v.to_vec());
|
||||
}
|
||||
i += 2 * size_of::<u64>();
|
||||
}
|
||||
|
||||
if parent != 0_u64.to_be_bytes() {
|
||||
let mut response = load_shortstatehash_info(&parent, db, lru)?;
|
||||
let mut state = response.last().unwrap().1.clone();
|
||||
state.extend(added.iter().cloned());
|
||||
for r in &removed {
|
||||
state.remove(r);
|
||||
}
|
||||
|
||||
response.push((shortstatehash.to_vec(), state, added, removed));
|
||||
|
||||
lru.insert(shortstatehash.to_vec(), response.clone());
|
||||
Ok(response)
|
||||
} else {
|
||||
let mut response = Vec::new();
|
||||
response.push((shortstatehash.to_vec(), added.clone(), added, removed));
|
||||
lru.insert(shortstatehash.to_vec(), response.clone());
|
||||
Ok(response)
|
||||
}
|
||||
}
|
||||
|
||||
fn update_shortstatehash_level(
|
||||
current_shortstatehash: &[u8],
|
||||
statediffnew: HashSet<Vec<u8>>,
|
||||
statediffremoved: HashSet<Vec<u8>>,
|
||||
diff_to_sibling: usize,
|
||||
mut parent_states: Vec<(
|
||||
Vec<u8>, // sstatehash
|
||||
HashSet<Vec<u8>>, // full state
|
||||
HashSet<Vec<u8>>, // added
|
||||
HashSet<Vec<u8>>, // removed
|
||||
)>,
|
||||
db: &Database,
|
||||
) -> Result<()> {
|
||||
let diffsum = statediffnew.len() + statediffremoved.len();
|
||||
|
||||
if parent_states.len() > 3 {
|
||||
// Number of layers
|
||||
// To many layers, we have to go deeper
|
||||
let parent = parent_states.pop().unwrap();
|
||||
|
||||
let mut parent_new = parent.2;
|
||||
let mut parent_removed = parent.3;
|
||||
|
||||
for removed in statediffremoved {
|
||||
if !parent_new.remove(&removed) {
|
||||
parent_removed.insert(removed);
|
||||
}
|
||||
}
|
||||
parent_new.extend(statediffnew);
|
||||
|
||||
update_shortstatehash_level(
|
||||
current_shortstatehash,
|
||||
parent_new,
|
||||
parent_removed,
|
||||
diffsum,
|
||||
parent_states,
|
||||
db,
|
||||
)?;
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if parent_states.len() == 0 {
|
||||
// There is no parent layer, create a new state
|
||||
let mut value = 0_u64.to_be_bytes().to_vec(); // 0 means no parent
|
||||
for new in &statediffnew {
|
||||
value.extend_from_slice(&new);
|
||||
}
|
||||
|
||||
if !statediffremoved.is_empty() {
|
||||
warn!("Tried to create new state with removals");
|
||||
}
|
||||
|
||||
db.rooms
|
||||
.shortstatehash_statediff
|
||||
.insert(¤t_shortstatehash, &value)?;
|
||||
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// Else we have two options.
|
||||
// 1. We add the current diff on top of the parent layer.
|
||||
// 2. We replace a layer above
|
||||
|
||||
let parent = parent_states.pop().unwrap();
|
||||
let parent_diff = parent.2.len() + parent.3.len();
|
||||
|
||||
if diffsum * diffsum >= 2 * diff_to_sibling * parent_diff {
|
||||
// Diff too big, we replace above layer(s)
|
||||
let mut parent_new = parent.2;
|
||||
let mut parent_removed = parent.3;
|
||||
|
||||
for removed in statediffremoved {
|
||||
if !parent_new.remove(&removed) {
|
||||
parent_removed.insert(removed);
|
||||
}
|
||||
}
|
||||
|
||||
parent_new.extend(statediffnew);
|
||||
update_shortstatehash_level(
|
||||
current_shortstatehash,
|
||||
parent_new,
|
||||
parent_removed,
|
||||
diffsum,
|
||||
parent_states,
|
||||
db,
|
||||
)?;
|
||||
} else {
|
||||
// Diff small enough, we add diff as layer on top of parent
|
||||
let mut value = parent.0.clone();
|
||||
for new in &statediffnew {
|
||||
value.extend_from_slice(&new);
|
||||
}
|
||||
|
||||
if !statediffremoved.is_empty() {
|
||||
value.extend_from_slice(&0_u64.to_be_bytes());
|
||||
for removed in &statediffremoved {
|
||||
value.extend_from_slice(&removed);
|
||||
}
|
||||
}
|
||||
|
||||
db.rooms
|
||||
.shortstatehash_statediff
|
||||
.insert(¤t_shortstatehash, &value)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
if db.globals.database_version()? < 7 {
|
||||
// Upgrade state store
|
||||
let mut lru = LruCache::new(1000);
|
||||
let mut last_roomstates: HashMap<RoomId, Vec<u8>> = HashMap::new();
|
||||
let mut current_sstatehash: Vec<u8> = Vec::new();
|
||||
let mut current_room = None;
|
||||
let mut current_state = HashSet::new();
|
||||
let mut counter = 0;
|
||||
for (k, seventid) in db._db.open_tree("stateid_shorteventid")?.iter() {
|
||||
let sstatehash = k[0..size_of::<u64>()].to_vec();
|
||||
let sstatekey = k[size_of::<u64>()..].to_vec();
|
||||
if sstatehash != current_sstatehash {
|
||||
if !current_sstatehash.is_empty() {
|
||||
counter += 1;
|
||||
println!("counter: {}", counter);
|
||||
let current_room = current_room.as_ref().unwrap();
|
||||
let last_roomsstatehash = last_roomstates.get(¤t_room);
|
||||
|
||||
let states_parents = last_roomsstatehash.map_or_else(
|
||||
|| Ok(Vec::new()),
|
||||
|last_roomsstatehash| {
|
||||
load_shortstatehash_info(&last_roomsstatehash, &db, &mut lru)
|
||||
},
|
||||
)?;
|
||||
|
||||
let (statediffnew, statediffremoved) =
|
||||
if let Some(parent_stateinfo) = states_parents.last() {
|
||||
let statediffnew = current_state
|
||||
.difference(&parent_stateinfo.1)
|
||||
.cloned()
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
let statediffremoved = parent_stateinfo
|
||||
.1
|
||||
.difference(¤t_state)
|
||||
.cloned()
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
(statediffnew, statediffremoved)
|
||||
} else {
|
||||
(current_state, HashSet::new())
|
||||
};
|
||||
|
||||
update_shortstatehash_level(
|
||||
¤t_sstatehash,
|
||||
statediffnew,
|
||||
statediffremoved,
|
||||
2, // every state change is 2 event changes on average
|
||||
states_parents,
|
||||
&db,
|
||||
)?;
|
||||
|
||||
/*
|
||||
let mut tmp = load_shortstatehash_info(¤t_sstatehash, &db)?;
|
||||
let state = tmp.pop().unwrap();
|
||||
println!(
|
||||
"{}\t{}{:?}: {:?} + {:?} - {:?}",
|
||||
current_room,
|
||||
" ".repeat(tmp.len()),
|
||||
utils::u64_from_bytes(¤t_sstatehash).unwrap(),
|
||||
tmp.last().map(|b| utils::u64_from_bytes(&b.0).unwrap()),
|
||||
state
|
||||
.2
|
||||
.iter()
|
||||
.map(|b| utils::u64_from_bytes(&b[size_of::<u64>()..]).unwrap())
|
||||
.collect::<Vec<_>>(),
|
||||
state
|
||||
.3
|
||||
.iter()
|
||||
.map(|b| utils::u64_from_bytes(&b[size_of::<u64>()..]).unwrap())
|
||||
.collect::<Vec<_>>()
|
||||
);
|
||||
*/
|
||||
|
||||
last_roomstates.insert(current_room.clone(), current_sstatehash);
|
||||
}
|
||||
current_state = HashSet::new();
|
||||
current_sstatehash = sstatehash;
|
||||
|
||||
let event_id = db
|
||||
.rooms
|
||||
.shorteventid_eventid
|
||||
.get(&seventid)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let event_id =
|
||||
EventId::try_from(utils::string_from_bytes(&event_id).unwrap())
|
||||
.unwrap();
|
||||
let pdu = db.rooms.get_pdu(&event_id).unwrap().unwrap();
|
||||
|
||||
if Some(&pdu.room_id) != current_room.as_ref() {
|
||||
current_room = Some(pdu.room_id.clone());
|
||||
}
|
||||
}
|
||||
|
||||
let mut val = sstatekey;
|
||||
val.extend_from_slice(&seventid);
|
||||
current_state.insert(val);
|
||||
}
|
||||
|
||||
db.globals.bump_database_version(7)?;
|
||||
|
||||
println!("Migration: 6 -> 7 finished");
|
||||
}
|
||||
|
||||
if db.globals.database_version()? < 8 {
|
||||
// Generate short room ids for all rooms
|
||||
for (room_id, _) in db.rooms.roomid_shortstatehash.iter() {
|
||||
let shortroomid = db.globals.next_count()?.to_be_bytes();
|
||||
db.rooms.roomid_shortroomid.insert(&room_id, &shortroomid)?;
|
||||
db.rooms.shortroomid_roomid.insert(&shortroomid, &room_id)?;
|
||||
}
|
||||
// Update pduids db layout
|
||||
for (key, v) in db.rooms.pduid_pdu.iter() {
|
||||
let mut parts = key.splitn(2, |&b| b == 0xff);
|
||||
let room_id = parts.next().unwrap();
|
||||
let count = parts.next().unwrap();
|
||||
|
||||
let short_room_id = db.rooms.roomid_shortroomid.get(&room_id)?.unwrap();
|
||||
|
||||
let mut new_key = short_room_id;
|
||||
new_key.extend_from_slice(count);
|
||||
|
||||
println!("{:?}", new_key);
|
||||
}
|
||||
|
||||
// Update tokenids db layout
|
||||
for (key, _) in db.rooms.tokenids.iter() {
|
||||
let mut parts = key.splitn(4, |&b| b == 0xff);
|
||||
let room_id = parts.next().unwrap();
|
||||
let word = parts.next().unwrap();
|
||||
let _pdu_id_room = parts.next().unwrap();
|
||||
let pdu_id_count = parts.next().unwrap();
|
||||
|
||||
let short_room_id = db.rooms.roomid_shortroomid.get(&room_id)?.unwrap();
|
||||
let mut new_key = short_room_id;
|
||||
new_key.extend_from_slice(word);
|
||||
new_key.push(0xff);
|
||||
new_key.extend_from_slice(pdu_id_count);
|
||||
println!("{:?}", new_key);
|
||||
}
|
||||
|
||||
db.globals.bump_database_version(8)?;
|
||||
|
||||
println!("Migration: 7 -> 8 finished");
|
||||
}
|
||||
|
||||
panic!();
|
||||
}
|
||||
|
||||
let guard = db.read().await;
|
||||
|
|
|
@ -223,10 +223,7 @@ impl Tree for SqliteTable {
|
|||
|
||||
let statement = Box::leak(Box::new(
|
||||
guard
|
||||
.prepare(&format!(
|
||||
"SELECT key, value FROM {} ORDER BY key ASC",
|
||||
&self.name
|
||||
))
|
||||
.prepare(&format!("SELECT key, value FROM {} ORDER BY key ASC", &self.name))
|
||||
.unwrap(),
|
||||
));
|
||||
|
||||
|
|
|
@ -47,7 +47,7 @@ pub struct Rooms {
|
|||
pub(super) aliasid_alias: Arc<dyn Tree>, // AliasId = RoomId + Count
|
||||
pub(super) publicroomids: Arc<dyn Tree>,
|
||||
|
||||
pub(super) tokenids: Arc<dyn Tree>, // TokenId = RoomId + Token + PduId
|
||||
pub(super) tokenids: Arc<dyn Tree>, // TokenId = ShortRoomId + Token + PduIdCount
|
||||
|
||||
/// Participating servers in a room.
|
||||
pub(super) roomserverids: Arc<dyn Tree>, // RoomServerId = RoomId + ServerName
|
||||
|
@ -71,14 +71,18 @@ pub struct Rooms {
|
|||
pub(super) shorteventid_shortstatehash: Arc<dyn Tree>,
|
||||
/// StateKey = EventType + StateKey, ShortStateKey = Count
|
||||
pub(super) statekey_shortstatekey: Arc<dyn Tree>,
|
||||
|
||||
pub(super) shortroomid_roomid: Arc<dyn Tree>,
|
||||
pub(super) roomid_shortroomid: Arc<dyn Tree>,
|
||||
|
||||
pub(super) shorteventid_eventid: Arc<dyn Tree>,
|
||||
/// ShortEventId = Count
|
||||
pub(super) eventid_shorteventid: Arc<dyn Tree>,
|
||||
/// ShortEventId = Count
|
||||
|
||||
pub(super) statehash_shortstatehash: Arc<dyn Tree>,
|
||||
/// ShortStateHash = Count
|
||||
/// StateId = ShortStateHash + ShortStateKey
|
||||
/// StateId = ShortStateHash
|
||||
pub(super) stateid_shorteventid: Arc<dyn Tree>,
|
||||
pub(super) shortstatehash_statediff: Arc<dyn Tree>, // StateDiff = parent (or 0) + (shortstatekey+shorteventid++) + 0_u64 + (shortstatekey+shorteventid--)
|
||||
|
||||
/// RoomId + EventId -> outlier PDU.
|
||||
/// Any pdu that has passed the steps 1-8 in the incoming event /federation/send/txn.
|
||||
|
|
Loading…
Reference in a new issue