Append state event that pass resolution to DB, update to tokio 1.1

This commit is contained in:
Devin Ragotzy 2021-01-29 11:20:33 -05:00
parent b8b40ce38b
commit cd0c5c0566
6 changed files with 322 additions and 434 deletions

607
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -14,28 +14,28 @@ edition = "2018"
[dependencies] [dependencies]
# Used to handle requests # Used to handle requests
# TODO: This can become optional as soon as proper configs are supported # TODO: This can become optional as soon as proper configs are supported
rocket = { git = "https://github.com/SergioBenitez/Rocket.git", rev = "031948c1daaa146128d8a435be116476f2adde00", features = ["tls"] } # Used to handle requests rocket = { git = "https://github.com/SergioBenitez/Rocket.git", rev = "c24f15c18f02319be83af4f3c1951dc220b52c5e", features = ["tls"] } # Used to handle requests
#rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_parameters", default-features = false, features = ["tls"] } #rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_parameters", default-features = false, features = ["tls"] }
# Used for matrix spec type definitions and helpers # Used for matrix spec type definitions and helpers
ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "appservice-api", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "0635b407290abf5f34d726e1e690c92c07c738e5" } ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "appservice-api", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "bba442580d6cd7ed990b2b63387eed2238cbadc8" }
# ruma = { git = "https://github.com/DevinR528/ruma", features = ["rand", "client-api", "federation-api", "unstable-exhaustive-types", "unstable-pre-spec", "unstable-synapse-quirks"], branch = "verified-export" } # ruma = { git = "https://github.com/DevinR528/ruma", features = ["rand", "client-api", "federation-api", "unstable-exhaustive-types", "unstable-pre-spec", "unstable-synapse-quirks"], branch = "verified-export" }
# ruma = { path = "../ruma/ruma", features = ["unstable-exhaustive-types", "rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"] } # ruma = { path = "../ruma/ruma", features = ["unstable-exhaustive-types", "rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"] }
# Used when doing state resolution # Used when doing state resolution
# state-res = { git = "https://github.com/timokoesters/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec"] } # state-res = { git = "https://github.com/timokoesters/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec"] }
# TODO: remove the gen-eventid feature # TODO: remove the gen-eventid feature
state-res = { git = "https://github.com/ruma/state-res", branch = "no-db", features = ["unstable-pre-spec", "gen-eventid"] } state-res = { git = "https://github.com/ruma/state-res", rev = "791c66d73cf064d09db0cdf767d5fef43a343425", features = ["unstable-pre-spec", "gen-eventid"] }
# state-res = { path = "../../state-res", features = ["unstable-pre-spec", "gen-eventid"] } # state-res = { path = "../../state-res", features = ["unstable-pre-spec", "gen-eventid"] }
# Used for long polling and federation sender, should be the same as rocket::tokio # Used for long polling and federation sender, should be the same as rocket::tokio
tokio = { version = "0.2.24" } tokio = { version = "1.1.0", features = ["macros", "time", "sync"] }
# Used for storing data permanently # Used for storing data permanently
sled = { version = "0.34.6", default-features = false } sled = { version = "0.34.6", default-features = false }
# Used for emitting log entries # Used for emitting log entries
log = "0.4.11" log = "0.4.11"
# Used for rocket<->ruma conversions # Used for rocket<->ruma conversions
http = "0.2.1" http = "0.2.3"
# Used to find data directory for default db path # Used to find data directory for default db path
directories = "3.0.1" directories = "3.0.1"
@ -50,7 +50,7 @@ rand = "0.7.3"
# Used to hash passwords # Used to hash passwords
rust-argon2 = "0.8.3" rust-argon2 = "0.8.3"
# Used to send requests # Used to send requests
reqwest = "0.10.9" reqwest = "0.11.0"
# Used for conduit::Error type # Used for conduit::Error type
thiserror = "1.0.22" thiserror = "1.0.22"
# Used to generate thumbnails for images # Used to generate thumbnails for images
@ -60,7 +60,7 @@ base64 = "0.13.0"
# Used when hashing the state # Used when hashing the state
ring = "0.16.19" ring = "0.16.19"
# Used when querying the SRV record of other servers # Used when querying the SRV record of other servers
trust-dns-resolver = "0.19.6" trust-dns-resolver = "0.20.0"
# Used to find matching events for appservices # Used to find matching events for appservices
regex = "1.4.2" regex = "1.4.2"

View file

@ -674,9 +674,10 @@ pub async fn sync_events_route(
if duration.as_secs() > 30 { if duration.as_secs() > 30 {
duration = Duration::from_secs(30); duration = Duration::from_secs(30);
} }
let mut delay = tokio::time::delay_for(duration); let delay = tokio::time::sleep(duration);
tokio::pin!(delay);
tokio::select! { tokio::select! {
_ = &mut delay => {} _ = &mut delay, if delay.is_elapsed() => {}
_ = watcher => {} _ = watcher => {}
} }
} }

View file

@ -106,8 +106,7 @@ impl Database {
db.open_tree("global")?, db.open_tree("global")?,
db.open_tree("servertimeout_signingkey")?, db.open_tree("servertimeout_signingkey")?,
config, config,
) )?,
.await?,
users: users::Users { users: users::Users {
userid_password: db.open_tree("userid_password")?, userid_password: db.open_tree("userid_password")?,
userid_displayname: db.open_tree("userid_displayname")?, userid_displayname: db.open_tree("userid_displayname")?,

View file

@ -26,11 +26,7 @@ pub struct Globals {
} }
impl Globals { impl Globals {
pub async fn load( pub fn load(globals: sled::Tree, server_keys: sled::Tree, config: Config) -> Result<Self> {
globals: sled::Tree,
server_keys: sled::Tree,
config: Config,
) -> Result<Self> {
let bytes = &*globals let bytes = &*globals
.update_and_fetch("keypair", utils::generate_keypair)? .update_and_fetch("keypair", utils::generate_keypair)?
.expect("utils::generate_keypair always returns Some"); .expect("utils::generate_keypair always returns Some");
@ -77,11 +73,9 @@ impl Globals {
config, config,
keypair: Arc::new(keypair), keypair: Arc::new(keypair),
reqwest_client, reqwest_client,
dns_resolver: TokioAsyncResolver::tokio_from_system_conf() dns_resolver: TokioAsyncResolver::tokio_from_system_conf().map_err(|_| {
.await Error::bad_config("Failed to set up trust dns resolver with system config.")
.map_err(|_| { })?,
Error::bad_config("Failed to set up trust dns resolver with system config.")
})?,
actual_destination_cache: Arc::new(RwLock::new(HashMap::new())), actual_destination_cache: Arc::new(RwLock::new(HashMap::new())),
servertimeout_signingkey: server_keys, servertimeout_signingkey: server_keys,
}) })

View file

@ -25,7 +25,7 @@ use ruma::{
}; };
use state_res::{Event, EventMap, StateMap}; use state_res::{Event, EventMap, StateMap};
use std::{ use std::{
collections::{BTreeMap, BTreeSet}, collections::{BTreeMap, BTreeSet, HashMap},
convert::TryFrom, convert::TryFrom,
fmt::Debug, fmt::Debug,
future::Future, future::Future,
@ -839,7 +839,7 @@ pub async fn send_transaction_message_route<'a>(
.map(|(_, pdu)| (pdu.event_id().clone(), pdu)), .map(|(_, pdu)| (pdu.event_id().clone(), pdu)),
); );
match state_res::StateResolution::resolve( let res = match state_res::StateResolution::resolve(
&pdu.room_id, &pdu.room_id,
&RoomVersionId::Version6, &RoomVersionId::Version6,
&fork_states &fork_states
@ -856,10 +856,7 @@ pub async fn send_transaction_message_route<'a>(
.collect(), .collect(),
&mut auth_cache, &mut auth_cache,
) { ) {
Ok(res) => res Ok(res) => res,
.into_iter()
.map(|(k, v)| (k, Arc::new(db.rooms.get_pdu(&v).unwrap().unwrap())))
.collect(),
Err(_) => { Err(_) => {
resolved_map.insert( resolved_map.insert(
pdu.event_id().clone(), pdu.event_id().clone(),
@ -867,7 +864,29 @@ pub async fn send_transaction_message_route<'a>(
); );
continue 'main_pdu_loop; continue 'main_pdu_loop;
} }
};
let mut resolved = BTreeMap::new();
for (k, id) in res {
// We should know of the event but just incase
let pdu = match auth_cache.get(&id) {
Some(pdu) => pdu.clone(),
None => {
match fetch_events(&db, server_name, &pub_key_map, &[id], &mut auth_cache)
.await
.map(|mut vec| vec.pop())
{
Ok(Some(aev)) => aev,
_ => {
resolved_map
.insert(event_id.clone(), Err("Failed to fetch event".into()));
continue 'main_pdu_loop;
}
}
}
};
resolved.insert(k, pdu);
} }
resolved
}; };
// Add the event to the DB and update the forward extremities (via roomid_pduleaves). // Add the event to the DB and update the forward extremities (via roomid_pduleaves).
@ -1199,37 +1218,67 @@ fn append_incoming_pdu(
new_room_leaves: &[EventId], new_room_leaves: &[EventId],
state: Option<StateMap<Arc<PduEvent>>>, state: Option<StateMap<Arc<PduEvent>>>,
) -> Result<()> { ) -> Result<()> {
// Update the state of the room if needed
// We can tell if we need to do this based on wether state resolution took place or not
if let Some(state) = state {
let mut new_state = HashMap::new();
for ((ev_type, state_k), pdu) in state {
match db.rooms.get_pdu_id(pdu.event_id())? {
Some(pduid) => {
new_state.insert(
(
ev_type,
state_k.ok_or_else(|| {
Error::Conflict("State contained non state event")
})?,
),
pduid.to_vec(),
);
}
None => {
let count = db.globals.next_count()?;
let mut pdu_id = pdu.room_id.as_bytes().to_vec();
pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes());
// TODO: can we use are current state if we just add this event to the end of our
// pduid_pdu tree??
let statehashid = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?;
db.rooms.append_pdu(
&*pdu,
utils::to_canonical_object(&*pdu).expect("Pdu is valid canonical object"),
count,
pdu_id.clone().into(),
&new_room_leaves,
&db,
)?;
// TODO: is this ok...
db.rooms.set_room_state(&pdu.room_id, &statehashid)?;
new_state.insert(
(
ev_type,
state_k.ok_or_else(|| {
Error::Conflict("State contained non state event")
})?,
),
pdu_id.to_vec(),
);
}
}
}
info!("Force update of state for {:?}", pdu);
db.rooms
.force_state(pdu.room_id(), new_state, &db.globals)?;
}
let count = db.globals.next_count()?; let count = db.globals.next_count()?;
let mut pdu_id = pdu.room_id.as_bytes().to_vec(); let mut pdu_id = pdu.room_id.as_bytes().to_vec();
pdu_id.push(0xff); pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes()); pdu_id.extend_from_slice(&count.to_be_bytes());
// Update the state of the room if needed
// We can tell if we need to do this based on wether state resolution took place or not
if let Some(state) = state {
let new = state
.into_iter()
.map(|((ev, k), pdu)| {
Ok((
(
ev,
k.ok_or_else(|| Error::Conflict("State contained non state event"))?,
),
db.rooms
.get_pdu_id(pdu.event_id())
.ok()
.flatten()
.ok_or_else(|| Error::Conflict("Resolved state contained unknown event"))?
.to_vec(),
))
})
.collect::<Result<_>>()?;
info!("Force update of state for {:?}", pdu);
db.rooms.force_state(pdu.room_id(), new, &db.globals)?;
}
// We append to state before appending the pdu, so we don't have a moment in time with the // We append to state before appending the pdu, so we don't have a moment in time with the
// pdu without it's state. This is okay because append_pdu can't fail. // pdu without it's state. This is okay because append_pdu can't fail.
let statehashid = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; let statehashid = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?;