fix: e2ee over federation

to device events were not being sent
This commit is contained in:
Timo Kösters 2021-08-25 14:42:46 +02:00 committed by Jonas Zohren
parent ee4b08c185
commit 72e2b643bb
2 changed files with 19 additions and 13 deletions

View file

@ -56,6 +56,7 @@ pub async fn send_event_to_device_route(
}, },
)) ))
.expect("DirectToDevice EDU can be serialized"), .expect("DirectToDevice EDU can be serialized"),
db.globals.next_count()?,
)?; )?;
continue; continue;

View file

@ -84,8 +84,8 @@ pub enum SendingEventType {
pub struct Sending { pub struct Sending {
/// The state for a given state hash. /// The state for a given state hash.
pub(super) servername_educount: Arc<dyn Tree>, // EduCount: Count of last EDU sync pub(super) servername_educount: Arc<dyn Tree>, // EduCount: Count of last EDU sync
pub(super) servernameevent_data: Arc<dyn Tree>, // ServernamEvent = (+ / $)SenderKey / ServerName / UserId + PduId / * (for edus), Data = EDU content pub(super) servernameevent_data: Arc<dyn Tree>, // ServernamEvent = (+ / $)SenderKey / ServerName / UserId + PduId / Id (for edus), Data = EDU content
pub(super) servercurrentevent_data: Arc<dyn Tree>, // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / * (for edus), Data = EDU content pub(super) servercurrentevent_data: Arc<dyn Tree>, // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / Id (for edus), Data = EDU content
pub(super) maximum_requests: Arc<Semaphore>, pub(super) maximum_requests: Arc<Semaphore>,
pub sender: mpsc::UnboundedSender<(Vec<u8>, Vec<u8>)>, pub sender: mpsc::UnboundedSender<(Vec<u8>, Vec<u8>)>,
} }
@ -435,10 +435,15 @@ impl Sending {
} }
#[tracing::instrument(skip(self, server, serialized))] #[tracing::instrument(skip(self, server, serialized))]
pub fn send_reliable_edu(&self, server: &ServerName, serialized: Vec<u8>) -> Result<()> { pub fn send_reliable_edu(
&self,
server: &ServerName,
serialized: Vec<u8>,
id: u64,
) -> Result<()> {
let mut key = server.as_bytes().to_vec(); let mut key = server.as_bytes().to_vec();
key.push(0xff); key.push(0xff);
key.push(b'*'); key.extend_from_slice(&id.to_be_bytes());
self.servernameevent_data.insert(&key, &serialized)?; self.servernameevent_data.insert(&key, &serialized)?;
self.sender.unbounded_send((key, serialized)).unwrap(); self.sender.unbounded_send((key, serialized)).unwrap();
@ -714,10 +719,10 @@ impl Sending {
OutgoingKind::Appservice(Box::<ServerName>::try_from(server).map_err(|_| { OutgoingKind::Appservice(Box::<ServerName>::try_from(server).map_err(|_| {
Error::bad_database("Invalid server string in server_currenttransaction") Error::bad_database("Invalid server string in server_currenttransaction")
})?), })?),
if event.starts_with(b"*") { if value.is_empty() {
SendingEventType::Edu(value)
} else {
SendingEventType::Pdu(event.to_vec()) SendingEventType::Pdu(event.to_vec())
} else {
SendingEventType::Edu(value)
}, },
) )
} else if key.starts_with(b"$") { } else if key.starts_with(b"$") {
@ -732,10 +737,10 @@ impl Sending {
.ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?;
( (
OutgoingKind::Push(user.to_vec(), pushkey.to_vec()), OutgoingKind::Push(user.to_vec(), pushkey.to_vec()),
if event.starts_with(b"*") { if value.is_empty() {
SendingEventType::Edu(value)
} else {
SendingEventType::Pdu(event.to_vec()) SendingEventType::Pdu(event.to_vec())
} else {
SendingEventType::Edu(value)
}, },
) )
} else { } else {
@ -753,10 +758,10 @@ impl Sending {
OutgoingKind::Normal(Box::<ServerName>::try_from(server).map_err(|_| { OutgoingKind::Normal(Box::<ServerName>::try_from(server).map_err(|_| {
Error::bad_database("Invalid server string in server_currenttransaction") Error::bad_database("Invalid server string in server_currenttransaction")
})?), })?),
if event.starts_with(b"*") { if value.is_empty() {
SendingEventType::Edu(event[1..].to_vec())
} else {
SendingEventType::Pdu(event.to_vec()) SendingEventType::Pdu(event.to_vec())
} else {
SendingEventType::Edu(value)
}, },
) )
}) })