mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-01-16 22:46:27 +03:00
fix: handle bad events in db better
This commit is contained in:
parent
1cf9da26bf
commit
d2f406e0e8
2 changed files with 41 additions and 37 deletions
|
@ -270,8 +270,8 @@ impl Database {
|
||||||
|
|
||||||
eventid_outlierpdu: builder.open_tree("eventid_outlierpdu")?,
|
eventid_outlierpdu: builder.open_tree("eventid_outlierpdu")?,
|
||||||
referencedevents: builder.open_tree("referencedevents")?,
|
referencedevents: builder.open_tree("referencedevents")?,
|
||||||
pdu_cache: Mutex::new(LruCache::new(0)),
|
pdu_cache: Mutex::new(LruCache::new(1_000_000)),
|
||||||
auth_chain_cache: Mutex::new(LruCache::new(0)),
|
auth_chain_cache: Mutex::new(LruCache::new(1_000_000)),
|
||||||
},
|
},
|
||||||
account_data: account_data::AccountData {
|
account_data: account_data::AccountData {
|
||||||
roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?,
|
roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?,
|
||||||
|
|
|
@ -666,14 +666,12 @@ pub async fn send_transaction_message_route(
|
||||||
drop(mutex_lock);
|
drop(mutex_lock);
|
||||||
|
|
||||||
let elapsed = start_time.elapsed();
|
let elapsed = start_time.elapsed();
|
||||||
if elapsed > Duration::from_secs(1) {
|
warn!(
|
||||||
warn!(
|
"Handling event {} took {}m{}s",
|
||||||
"Handling event {} took {}m{}s",
|
event_id,
|
||||||
event_id,
|
elapsed.as_secs() / 60,
|
||||||
elapsed.as_secs() / 60,
|
elapsed.as_secs() % 60
|
||||||
elapsed.as_secs() % 60
|
);
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for pdu in &resolved_map {
|
for pdu in &resolved_map {
|
||||||
|
@ -1271,7 +1269,6 @@ pub fn handle_incoming_pdu<'a>(
|
||||||
} else if fork_states.iter().skip(1).all(|f| &fork_states[0] == f) {
|
} else if fork_states.iter().skip(1).all(|f| &fork_states[0] == f) {
|
||||||
// There was only one state, so it has to be the room's current state (because that is
|
// There was only one state, so it has to be the room's current state (because that is
|
||||||
// always included)
|
// always included)
|
||||||
warn!("Skipping stateres because there is no new state.");
|
|
||||||
fork_states[0]
|
fork_states[0]
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(k, pdu)| (k.clone(), pdu.event_id.clone()))
|
.map(|(k, pdu)| (k.clone(), pdu.event_id.clone()))
|
||||||
|
@ -1411,12 +1408,12 @@ pub(crate) fn fetch_and_handle_events<'a>(
|
||||||
// a. Look in the main timeline (pduid_pdu tree)
|
// a. Look in the main timeline (pduid_pdu tree)
|
||||||
// b. Look at outlier pdu tree
|
// b. Look at outlier pdu tree
|
||||||
// (get_pdu checks both)
|
// (get_pdu checks both)
|
||||||
let pdu = match db.rooms.get_pdu(&id)? {
|
let pdu = match db.rooms.get_pdu(&id) {
|
||||||
Some(pdu) => {
|
Ok(Some(pdu)) => {
|
||||||
trace!("Found {} in db", id);
|
trace!("Found {} in db", id);
|
||||||
pdu
|
pdu
|
||||||
}
|
}
|
||||||
None => {
|
Ok(None) => {
|
||||||
// c. Ask origin server over federation
|
// c. Ask origin server over federation
|
||||||
debug!("Fetching {} over federation.", id);
|
debug!("Fetching {} over federation.", id);
|
||||||
match db
|
match db
|
||||||
|
@ -1431,7 +1428,11 @@ pub(crate) fn fetch_and_handle_events<'a>(
|
||||||
Ok(res) => {
|
Ok(res) => {
|
||||||
debug!("Got {} over federation", id);
|
debug!("Got {} over federation", id);
|
||||||
let (event_id, mut value) =
|
let (event_id, mut value) =
|
||||||
crate::pdu::gen_event_id_canonical_json(&res.pdu)?;
|
match crate::pdu::gen_event_id_canonical_json(&res.pdu) {
|
||||||
|
Ok(t) => t,
|
||||||
|
Err(_) => continue,
|
||||||
|
};
|
||||||
|
|
||||||
// This will also fetch the auth chain
|
// This will also fetch the auth chain
|
||||||
match handle_incoming_pdu(
|
match handle_incoming_pdu(
|
||||||
origin,
|
origin,
|
||||||
|
@ -1474,6 +1475,10 @@ pub(crate) fn fetch_and_handle_events<'a>(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Err(e) => {
|
||||||
|
debug!("Error loading {}: {}", id, e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
pdus.push(pdu);
|
pdus.push(pdu);
|
||||||
}
|
}
|
||||||
|
@ -1728,44 +1733,47 @@ fn get_auth_chain(starting_events: Vec<EventId>, db: &Database) -> Result<HashSe
|
||||||
let mut full_auth_chain = HashSet::new();
|
let mut full_auth_chain = HashSet::new();
|
||||||
|
|
||||||
let mut cache = db.rooms.auth_chain_cache();
|
let mut cache = db.rooms.auth_chain_cache();
|
||||||
if let Some(cached) = cache.get_mut(&starting_events) {
|
|
||||||
return Ok(cached.clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
for event_id in &starting_events {
|
for event_id in &starting_events {
|
||||||
if let Some(cached) = cache.get_mut(&[event_id.clone()][..]) {
|
if let Some(cached) = cache.get_mut(&[event_id.clone()][..]) {
|
||||||
full_auth_chain.extend(cached.iter().cloned());
|
full_auth_chain.extend(cached.iter().cloned());
|
||||||
} else {
|
} else {
|
||||||
drop(cache);
|
drop(cache);
|
||||||
let auth_chain = get_auth_chain_recursive(&event_id, HashSet::new(), db)?;
|
let mut auth_chain = HashSet::new();
|
||||||
|
get_auth_chain_recursive(&event_id, &mut auth_chain, db)?;
|
||||||
cache = db.rooms.auth_chain_cache();
|
cache = db.rooms.auth_chain_cache();
|
||||||
cache.insert(vec![event_id.clone()], auth_chain.clone());
|
cache.insert(vec![event_id.clone()], auth_chain.clone());
|
||||||
full_auth_chain.extend(auth_chain);
|
full_auth_chain.extend(auth_chain);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
cache.insert(starting_events, full_auth_chain.clone());
|
|
||||||
|
|
||||||
Ok(full_auth_chain)
|
Ok(full_auth_chain)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_auth_chain_recursive(
|
fn get_auth_chain_recursive(
|
||||||
event_id: &EventId,
|
event_id: &EventId,
|
||||||
mut found: HashSet<EventId>,
|
found: &mut HashSet<EventId>,
|
||||||
db: &Database,
|
db: &Database,
|
||||||
) -> Result<HashSet<EventId>> {
|
) -> Result<()> {
|
||||||
if let Some(pdu) = db.rooms.get_pdu(&event_id)? {
|
let r = db.rooms.get_pdu(&event_id);
|
||||||
for auth_event in &pdu.auth_events {
|
match r {
|
||||||
if !found.contains(auth_event) {
|
Ok(Some(pdu)) => {
|
||||||
found.insert(auth_event.clone());
|
for auth_event in &pdu.auth_events {
|
||||||
found = get_auth_chain_recursive(&auth_event, found, db)?;
|
if !found.contains(auth_event) {
|
||||||
|
found.insert(auth_event.clone());
|
||||||
|
get_auth_chain_recursive(&auth_event, found, db)?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
Ok(None) => {
|
||||||
warn!("Could not find pdu mentioned in auth events.");
|
warn!("Could not find pdu mentioned in auth events.");
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Could not load event in auth chain: {}", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(found)
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg_attr(
|
#[cfg_attr(
|
||||||
|
@ -1860,12 +1868,8 @@ pub fn get_event_authorization_route(
|
||||||
Ok(get_event_authorization::v1::Response {
|
Ok(get_event_authorization::v1::Response {
|
||||||
auth_chain: auth_chain_ids
|
auth_chain: auth_chain_ids
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|id| {
|
.filter_map(|id| Some(db.rooms.get_pdu_json(&id).ok()??))
|
||||||
Ok::<_, Error>(PduEvent::convert_to_outgoing_federation_event(
|
.map(|event| PduEvent::convert_to_outgoing_federation_event(event))
|
||||||
db.rooms.get_pdu_json(&id)?.unwrap(),
|
|
||||||
))
|
|
||||||
})
|
|
||||||
.filter_map(|r| r.ok())
|
|
||||||
.collect(),
|
.collect(),
|
||||||
}
|
}
|
||||||
.into())
|
.into())
|
||||||
|
|
Loading…
Reference in a new issue