Merge pull request 'fix: send device list updates when user is in no rooms' (#170) from fix-set-up-encryption into master

Reviewed-on: https://git.koesters.xyz/timo/conduit/pulls/170
This commit is contained in:
Timo Kösters 2020-07-30 14:46:27 +02:00
commit 18dcf44aa4
3 changed files with 80 additions and 19 deletions

View file

@ -672,7 +672,11 @@ pub fn set_displayname_route(
displayname: body.displayname.clone(), displayname: body.displayname.clone(),
..serde_json::from_value::<Raw<_>>( ..serde_json::from_value::<Raw<_>>(
db.rooms db.rooms
.room_state_get(&room_id, &EventType::RoomMember, &sender_id.to_string())? .room_state_get(
&room_id,
&EventType::RoomMember,
&sender_id.to_string(),
)?
.ok_or_else(|| { .ok_or_else(|| {
Error::bad_database( Error::bad_database(
"Tried to send displayname update for user not in the room.", "Tried to send displayname update for user not in the room.",
@ -770,7 +774,11 @@ pub fn set_avatar_url_route(
avatar_url: body.avatar_url.clone(), avatar_url: body.avatar_url.clone(),
..serde_json::from_value::<Raw<_>>( ..serde_json::from_value::<Raw<_>>(
db.rooms db.rooms
.room_state_get(&room_id, &EventType::RoomMember, &sender_id.to_string())? .room_state_get(
&room_id,
&EventType::RoomMember,
&sender_id.to_string(),
)?
.ok_or_else(|| { .ok_or_else(|| {
Error::bad_database( Error::bad_database(
"Tried to send avatar url update for user not in the room.", "Tried to send avatar url update for user not in the room.",
@ -1884,12 +1892,11 @@ pub fn ban_user_route(
third_party_invite: None, third_party_invite: None,
}), }),
|event| { |event| {
let mut event = serde_json::from_value::<Raw<member::MemberEventContent>>( let mut event =
event.content, serde_json::from_value::<Raw<member::MemberEventContent>>(event.content)
) .expect("Raw::from_value always works")
.expect("Raw::from_value always works") .deserialize()
.deserialize() .map_err(|_| Error::bad_database("Invalid member event in database."))?;
.map_err(|_| Error::bad_database("Invalid member event in database."))?;
event.membership = ruma::events::room::member::MembershipState::Ban; event.membership = ruma::events::room::member::MembershipState::Ban;
Ok(event) Ok(event)
}, },
@ -2211,6 +2218,7 @@ pub async fn get_public_rooms_filtered_route(
Ok::<_, Error>(chunk) Ok::<_, Error>(chunk)
}) })
.filter_map(|r| r.ok()) // Filter out buggy rooms .filter_map(|r| r.ok()) // Filter out buggy rooms
// We need to collect all, so we can sort by member count
.collect::<Vec<_>>(); .collect::<Vec<_>>();
chunk.sort_by(|l, r| r.num_joined_members.cmp(&l.num_joined_members)); chunk.sort_by(|l, r| r.num_joined_members.cmp(&l.num_joined_members));
@ -2618,6 +2626,13 @@ pub async fn sync_events_route(
let mut presence_updates = HashMap::new(); let mut presence_updates = HashMap::new();
let mut device_list_updates = HashSet::new(); let mut device_list_updates = HashSet::new();
// Look for device list updates of this account
device_list_updates.extend(
db.users
.keys_changed(&sender_id.to_string(), since, None)
.filter_map(|r| r.ok()),
);
for room_id in db.rooms.rooms_joined(&sender_id) { for room_id in db.rooms.rooms_joined(&sender_id) {
let room_id = room_id?; let room_id = room_id?;
@ -2869,7 +2884,7 @@ pub async fn sync_events_route(
// Look for device list updates in this room // Look for device list updates in this room
device_list_updates.extend( device_list_updates.extend(
db.users db.users
.keys_changed(&room_id, since, None) .keys_changed(&room_id.to_string(), since, None)
.filter_map(|r| r.ok()), .filter_map(|r| r.ok()),
); );
@ -3652,11 +3667,28 @@ pub fn get_key_changes_route(
let sender_id = body.sender_id.as_ref().expect("user is authenticated"); let sender_id = body.sender_id.as_ref().expect("user is authenticated");
let mut device_list_updates = HashSet::new(); let mut device_list_updates = HashSet::new();
device_list_updates.extend(
db.users
.keys_changed(
&sender_id.to_string(),
body.from
.parse()
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid `from`."))?,
Some(
body.to
.parse()
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid `to`."))?,
),
)
.filter_map(|r| r.ok()),
);
for room_id in db.rooms.rooms_joined(sender_id).filter_map(|r| r.ok()) { for room_id in db.rooms.rooms_joined(sender_id).filter_map(|r| r.ok()) {
device_list_updates.extend( device_list_updates.extend(
db.users db.users
.keys_changed( .keys_changed(
&room_id, &room_id.to_string(),
body.from.parse().map_err(|_| { body.from.parse().map_err(|_| {
Error::BadRequest(ErrorKind::InvalidParam, "Invalid `from`.") Error::BadRequest(ErrorKind::InvalidParam, "Invalid `from`.")
})?, })?,

View file

@ -126,16 +126,17 @@ impl Database {
} }
pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) { pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) {
let mut userid_prefix = user_id.to_string().as_bytes().to_vec(); let userid_bytes = user_id.to_string().as_bytes().to_vec();
let mut userid_prefix = userid_bytes.clone();
userid_prefix.push(0xff); userid_prefix.push(0xff);
let mut userdeviceid_prefix = userid_prefix.clone(); let mut userdeviceid_prefix = userid_prefix.clone();
userdeviceid_prefix.extend_from_slice(device_id.as_bytes()); userdeviceid_prefix.extend_from_slice(device_id.as_bytes());
userdeviceid_prefix.push(0xff); userdeviceid_prefix.push(0xff);
let mut futures = futures::stream::FuturesUnordered::new(); let mut futures = futures::stream::FuturesUnordered::new();
futures.push(self.users.keychangeid_userid.watch_prefix(b""));
// Return when *any* user changed his key // Return when *any* user changed his key
// TODO: only send for user they share a room with // TODO: only send for user they share a room with
futures.push( futures.push(
@ -171,6 +172,9 @@ impl Database {
.watch_prefix(&roomid_prefix), .watch_prefix(&roomid_prefix),
); );
// Key changes
futures.push(self.users.keychangeid_userid.watch_prefix(&roomid_prefix));
// Room account data // Room account data
let mut roomuser_prefix = roomid_prefix.clone(); let mut roomuser_prefix = roomid_prefix.clone();
roomuser_prefix.extend_from_slice(&userid_prefix); roomuser_prefix.extend_from_slice(&userid_prefix);
@ -191,6 +195,16 @@ impl Database {
.watch_prefix(&globaluserdata_prefix), .watch_prefix(&globaluserdata_prefix),
); );
// More key changes (used when user is not joined to any rooms)
futures.push(self.users.keychangeid_userid.watch_prefix(&userid_prefix));
// One time keys
futures.push(
self.users
.userid_lastonetimekeyupdate
.watch_prefix(&userid_bytes),
);
// Wait until one of them finds something // Wait until one of them finds something
futures.next().await; futures.next().await;
} }

View file

@ -9,7 +9,7 @@ use ruma::{
}, },
}, },
events::{AnyToDeviceEvent, EventType}, events::{AnyToDeviceEvent, EventType},
DeviceId, Raw, RoomId, UserId, DeviceId, Raw, UserId,
}; };
use std::{collections::BTreeMap, convert::TryFrom, mem, time::SystemTime}; use std::{collections::BTreeMap, convert::TryFrom, mem, time::SystemTime};
@ -23,7 +23,7 @@ pub struct Users {
pub(super) onetimekeyid_onetimekeys: sled::Tree, // OneTimeKeyId = UserId + AlgorithmAndDeviceId pub(super) onetimekeyid_onetimekeys: sled::Tree, // OneTimeKeyId = UserId + AlgorithmAndDeviceId
pub(super) userid_lastonetimekeyupdate: sled::Tree, // LastOneTimeKeyUpdate = Count pub(super) userid_lastonetimekeyupdate: sled::Tree, // LastOneTimeKeyUpdate = Count
pub(super) keychangeid_userid: sled::Tree, // KeyChangeId = RoomId + Count pub(super) keychangeid_userid: sled::Tree, // KeyChangeId = UserId/RoomId + Count
pub(super) keyid_key: sled::Tree, // KeyId = UserId + KeyId (depends on key type) pub(super) keyid_key: sled::Tree, // KeyId = UserId + KeyId (depends on key type)
pub(super) userid_masterkeyid: sled::Tree, pub(super) userid_masterkeyid: sled::Tree,
pub(super) userid_selfsigningkeyid: sled::Tree, pub(super) userid_selfsigningkeyid: sled::Tree,
@ -305,8 +305,7 @@ impl Users {
} }
pub fn last_one_time_keys_update(&self, user_id: &UserId) -> Result<u64> { pub fn last_one_time_keys_update(&self, user_id: &UserId) -> Result<u64> {
self self.userid_lastonetimekeyupdate
.userid_lastonetimekeyupdate
.get(&user_id.to_string().as_bytes())? .get(&user_id.to_string().as_bytes())?
.map(|bytes| { .map(|bytes| {
utils::u64_from_bytes(&bytes).map_err(|_| { utils::u64_from_bytes(&bytes).map_err(|_| {
@ -417,6 +416,11 @@ impl Users {
self.keychangeid_userid.insert(key, &*user_id.to_string())?; self.keychangeid_userid.insert(key, &*user_id.to_string())?;
} }
let mut key = user_id.to_string().as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(&count);
self.keychangeid_userid.insert(key, &*user_id.to_string())?;
Ok(()) Ok(())
} }
@ -524,6 +528,11 @@ impl Users {
self.keychangeid_userid.insert(key, &*user_id.to_string())?; self.keychangeid_userid.insert(key, &*user_id.to_string())?;
} }
let mut key = user_id.to_string().as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(&count);
self.keychangeid_userid.insert(key, &*user_id.to_string())?;
Ok(()) Ok(())
} }
@ -576,16 +585,22 @@ impl Users {
.insert(key, &*target_id.to_string())?; .insert(key, &*target_id.to_string())?;
} }
let mut key = target_id.to_string().as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(&count);
self.keychangeid_userid
.insert(key, &*target_id.to_string())?;
Ok(()) Ok(())
} }
pub fn keys_changed( pub fn keys_changed(
&self, &self,
room_id: &RoomId, user_or_room_id: &str,
from: u64, from: u64,
to: Option<u64>, to: Option<u64>,
) -> impl Iterator<Item = Result<UserId>> { ) -> impl Iterator<Item = Result<UserId>> {
let mut prefix = room_id.to_string().as_bytes().to_vec(); let mut prefix = user_or_room_id.as_bytes().to_vec();
prefix.push(0xff); prefix.push(0xff);
let mut start = prefix.clone(); let mut start = prefix.clone();