mirror of
https://gitlab.com/famedly/conduit.git
synced 2024-12-29 05:53:48 +03:00
fix: long prev event fetch times for huge rooms
This commit is contained in:
parent
665aee11c0
commit
9410d3ef9c
3 changed files with 49 additions and 38 deletions
|
@ -272,7 +272,7 @@ impl Database {
|
||||||
referencedevents: builder.open_tree("referencedevents")?,
|
referencedevents: builder.open_tree("referencedevents")?,
|
||||||
pdu_cache: Mutex::new(LruCache::new(100_000)),
|
pdu_cache: Mutex::new(LruCache::new(100_000)),
|
||||||
auth_chain_cache: Mutex::new(LruCache::new(100_000)),
|
auth_chain_cache: Mutex::new(LruCache::new(100_000)),
|
||||||
shorteventid_cache: Mutex::new(LruCache::new(100_000)),
|
shorteventid_cache: Mutex::new(LruCache::new(1_000_000)),
|
||||||
},
|
},
|
||||||
account_data: account_data::AccountData {
|
account_data: account_data::AccountData {
|
||||||
roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?,
|
roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?,
|
||||||
|
|
|
@ -99,15 +99,11 @@ impl Rooms {
|
||||||
Ok(self
|
Ok(self
|
||||||
.stateid_shorteventid
|
.stateid_shorteventid
|
||||||
.scan_prefix(shortstatehash.to_be_bytes().to_vec())
|
.scan_prefix(shortstatehash.to_be_bytes().to_vec())
|
||||||
.map(|(_, bytes)| self.shorteventid_eventid.get(&bytes).ok().flatten())
|
.map(|(_, bytes)| {
|
||||||
.flatten()
|
self.get_eventid_from_short(utils::u64_from_bytes(&bytes).unwrap())
|
||||||
.map(|bytes| {
|
.ok()
|
||||||
EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| {
|
|
||||||
Error::bad_database("EventID in stateid_shorteventid is invalid unicode.")
|
|
||||||
})?)
|
|
||||||
.map_err(|_| Error::bad_database("EventId in stateid_shorteventid is invalid."))
|
|
||||||
})
|
})
|
||||||
.filter_map(|r| r.ok())
|
.flatten()
|
||||||
.collect())
|
.collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,15 +114,11 @@ impl Rooms {
|
||||||
let state = self
|
let state = self
|
||||||
.stateid_shorteventid
|
.stateid_shorteventid
|
||||||
.scan_prefix(shortstatehash.to_be_bytes().to_vec())
|
.scan_prefix(shortstatehash.to_be_bytes().to_vec())
|
||||||
.map(|(_, bytes)| self.shorteventid_eventid.get(&bytes).ok().flatten())
|
.map(|(_, bytes)| {
|
||||||
.flatten()
|
self.get_eventid_from_short(utils::u64_from_bytes(&bytes).unwrap())
|
||||||
.map(|bytes| {
|
.ok()
|
||||||
EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| {
|
|
||||||
Error::bad_database("EventID in stateid_shorteventid is invalid unicode.")
|
|
||||||
})?)
|
|
||||||
.map_err(|_| Error::bad_database("EventId in stateid_shorteventid is invalid."))
|
|
||||||
})
|
})
|
||||||
.filter_map(|r| r.ok())
|
.flatten()
|
||||||
.map(|eventid| self.get_pdu(&eventid))
|
.map(|eventid| self.get_pdu(&eventid))
|
||||||
.filter_map(|r| r.ok().flatten())
|
.filter_map(|r| r.ok().flatten())
|
||||||
.map(|pdu| {
|
.map(|pdu| {
|
||||||
|
@ -168,15 +160,10 @@ impl Rooms {
|
||||||
Ok(self
|
Ok(self
|
||||||
.stateid_shorteventid
|
.stateid_shorteventid
|
||||||
.get(&stateid)?
|
.get(&stateid)?
|
||||||
.map(|bytes| self.shorteventid_eventid.get(&bytes).ok().flatten())
|
|
||||||
.flatten()
|
|
||||||
.map(|bytes| {
|
.map(|bytes| {
|
||||||
EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| {
|
self.get_eventid_from_short(utils::u64_from_bytes(&bytes).unwrap())
|
||||||
Error::bad_database("EventID in stateid_shorteventid is invalid unicode.")
|
.ok()
|
||||||
})?)
|
|
||||||
.map_err(|_| Error::bad_database("EventId in stateid_shorteventid is invalid."))
|
|
||||||
})
|
})
|
||||||
.map(|r| r.ok())
|
|
||||||
.flatten())
|
.flatten())
|
||||||
} else {
|
} else {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
|
@ -448,7 +435,12 @@ impl Rooms {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_eventid_from_short(&self, shorteventid: u64) -> Result<EventId> {
|
pub fn get_eventid_from_short(&self, shorteventid: u64) -> Result<EventId> {
|
||||||
if let Some(id) = self.shorteventid_cache.lock().unwrap().get_mut(&shorteventid) {
|
if let Some(id) = self
|
||||||
|
.shorteventid_cache
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.get_mut(&shorteventid)
|
||||||
|
{
|
||||||
return Ok(id.clone());
|
return Ok(id.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -457,12 +449,11 @@ impl Rooms {
|
||||||
.get(&shorteventid.to_be_bytes())?
|
.get(&shorteventid.to_be_bytes())?
|
||||||
.ok_or_else(|| Error::bad_database("Shorteventid does not exist"))?;
|
.ok_or_else(|| Error::bad_database("Shorteventid does not exist"))?;
|
||||||
|
|
||||||
let event_id = EventId::try_from(
|
let event_id =
|
||||||
utils::string_from_bytes(&bytes).map_err(|_| {
|
EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| {
|
||||||
Error::bad_database("EventID in roomid_pduleaves is invalid unicode.")
|
Error::bad_database("EventID in roomid_pduleaves is invalid unicode.")
|
||||||
})?,
|
})?)
|
||||||
)
|
.map_err(|_| Error::bad_database("EventId in roomid_pduleaves is invalid."))?;
|
||||||
.map_err(|_| Error::bad_database("EventId in roomid_pduleaves is invalid."))?;
|
|
||||||
|
|
||||||
self.shorteventid_cache
|
self.shorteventid_cache
|
||||||
.lock()
|
.lock()
|
||||||
|
|
|
@ -668,7 +668,7 @@ pub async fn send_transaction_message_route(
|
||||||
|
|
||||||
let elapsed = start_time.elapsed();
|
let elapsed = start_time.elapsed();
|
||||||
warn!(
|
warn!(
|
||||||
"Handling event {} took {}m{}s",
|
"Handling transaction of event {} took {}m{}s",
|
||||||
event_id,
|
event_id,
|
||||||
elapsed.as_secs() / 60,
|
elapsed.as_secs() / 60,
|
||||||
elapsed.as_secs() % 60
|
elapsed.as_secs() % 60
|
||||||
|
@ -850,6 +850,8 @@ pub fn handle_incoming_pdu<'a>(
|
||||||
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>,
|
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>,
|
||||||
) -> AsyncRecursiveType<'a, StdResult<Option<Vec<u8>>, String>> {
|
) -> AsyncRecursiveType<'a, StdResult<Option<Vec<u8>>, String>> {
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
let start_time = Instant::now();
|
||||||
|
|
||||||
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
|
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
|
||||||
match db.rooms.exists(&room_id) {
|
match db.rooms.exists(&room_id) {
|
||||||
Ok(true) => {}
|
Ok(true) => {}
|
||||||
|
@ -1014,12 +1016,18 @@ pub fn handle_incoming_pdu<'a>(
|
||||||
// 8. if not timeline event: stop
|
// 8. if not timeline event: stop
|
||||||
if !is_timeline_event
|
if !is_timeline_event
|
||||||
|| incoming_pdu.origin_server_ts
|
|| incoming_pdu.origin_server_ts
|
||||||
< db.rooms
|
< (utils::millis_since_unix_epoch() - 1000 * 60 * 20)
|
||||||
.first_pdu_in_room(&room_id)
|
.try_into()
|
||||||
.map_err(|_| "Error loading first room event.".to_owned())?
|
.expect("time is valid")
|
||||||
.expect("Room exists")
|
// Not older than 20 mins
|
||||||
.origin_server_ts
|
|
||||||
{
|
{
|
||||||
|
let elapsed = start_time.elapsed();
|
||||||
|
warn!(
|
||||||
|
"Handling outlier event {} took {}m{}s",
|
||||||
|
event_id,
|
||||||
|
elapsed.as_secs() / 60,
|
||||||
|
elapsed.as_secs() % 60
|
||||||
|
);
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1312,7 +1320,8 @@ pub fn handle_incoming_pdu<'a>(
|
||||||
for state in fork_states {
|
for state in fork_states {
|
||||||
auth_chain_sets.push(
|
auth_chain_sets.push(
|
||||||
get_auth_chain(state.iter().map(|(_, id)| id.clone()).collect(), db)
|
get_auth_chain(state.iter().map(|(_, id)| id.clone()).collect(), db)
|
||||||
.map_err(|_| "Failed to load auth chain.".to_owned())?.collect(),
|
.map_err(|_| "Failed to load auth chain.".to_owned())?
|
||||||
|
.collect(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1385,6 +1394,14 @@ pub fn handle_incoming_pdu<'a>(
|
||||||
|
|
||||||
// Event has passed all auth/stateres checks
|
// Event has passed all auth/stateres checks
|
||||||
drop(state_lock);
|
drop(state_lock);
|
||||||
|
|
||||||
|
let elapsed = start_time.elapsed();
|
||||||
|
warn!(
|
||||||
|
"Handling timeline event {} took {}m{}s",
|
||||||
|
event_id,
|
||||||
|
elapsed.as_secs() / 60,
|
||||||
|
elapsed.as_secs() % 60
|
||||||
|
);
|
||||||
Ok(pdu_id)
|
Ok(pdu_id)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -1757,7 +1774,10 @@ fn append_incoming_pdu(
|
||||||
Ok(pdu_id)
|
Ok(pdu_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_auth_chain(starting_events: Vec<EventId>, db: &Database) -> Result<impl Iterator<Item = EventId> + '_> {
|
fn get_auth_chain(
|
||||||
|
starting_events: Vec<EventId>,
|
||||||
|
db: &Database,
|
||||||
|
) -> Result<impl Iterator<Item = EventId> + '_> {
|
||||||
let mut full_auth_chain = HashSet::new();
|
let mut full_auth_chain = HashSet::new();
|
||||||
|
|
||||||
let starting_events = starting_events
|
let starting_events = starting_events
|
||||||
|
|
Loading…
Reference in a new issue