mirror of
https://gitlab.com/famedly/conduit.git
synced 2024-12-26 20:43:48 +03:00
fix: don't fail the entire transaction if any PDU's format is invalid
This commit is contained in:
parent
75322af8c7
commit
ea3e7045b4
1 changed files with 75 additions and 82 deletions
|
@ -2,7 +2,10 @@
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
api::client_server::{self, claim_keys_helper, get_keys_helper},
|
api::client_server::{self, claim_keys_helper, get_keys_helper},
|
||||||
service::pdu::{gen_event_id_canonical_json, PduBuilder},
|
service::{
|
||||||
|
globals::SigningKeys,
|
||||||
|
pdu::{gen_event_id_canonical_json, PduBuilder},
|
||||||
|
},
|
||||||
services, utils, Error, PduEvent, Result, Ruma,
|
services, utils, Error, PduEvent, Result, Ruma,
|
||||||
};
|
};
|
||||||
use axum::{response::IntoResponse, Json};
|
use axum::{response::IntoResponse, Json};
|
||||||
|
@ -800,17 +803,78 @@ pub fn parse_incoming_pdu(
|
||||||
|
|
||||||
let (event_id, value) = match gen_event_id_canonical_json(pdu, &room_version_id) {
|
let (event_id, value) = match gen_event_id_canonical_json(pdu, &room_version_id) {
|
||||||
Ok(t) => t,
|
Ok(t) => t,
|
||||||
Err(_) => {
|
Err(e) => {
|
||||||
// Event could not be converted to canonical json
|
// Event could not be converted to canonical json
|
||||||
return Err(Error::BadRequest(
|
return Err(e);
|
||||||
ErrorKind::InvalidParam,
|
|
||||||
"Could not convert event to canonical json.",
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
Ok((event_id, value, room_id))
|
Ok((event_id, value, room_id))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Attempts to parse and append PDU to timeline.
|
||||||
|
/// If no event ID is returned, then the PDU was failed to be parsed.
|
||||||
|
/// If the Ok(()) is returned, then the PDU was successfully appended to the timeline.
|
||||||
|
async fn handle_pdu_in_transaction(
|
||||||
|
origin: &ServerName,
|
||||||
|
pub_key_map: &RwLock<BTreeMap<String, SigningKeys>>,
|
||||||
|
pdu: &RawJsonValue,
|
||||||
|
) -> (Option<OwnedEventId>, Result<()>) {
|
||||||
|
let (event_id, value, room_id) = match parse_incoming_pdu(pdu) {
|
||||||
|
Ok(t) => t,
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Could not parse PDU: {e}");
|
||||||
|
warn!("Full PDU: {:?}", &pdu);
|
||||||
|
return (None, Err(Error::BadServerResponse("Could not parse PDU")));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Makes use of the m.room.create event. If we cannot fetch this event,
|
||||||
|
// we must have never been in that room.
|
||||||
|
if services().rooms.state.get_room_version(&room_id).is_err() {
|
||||||
|
debug!("Room {room_id} is not known to this server");
|
||||||
|
return (
|
||||||
|
Some(event_id),
|
||||||
|
Err(Error::BadServerResponse("Room is not known to this server")),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// We do not add the event_id field to the pdu here because of signature and hashes checks
|
||||||
|
|
||||||
|
let mutex = Arc::clone(
|
||||||
|
services()
|
||||||
|
.globals
|
||||||
|
.roomid_mutex_federation
|
||||||
|
.write()
|
||||||
|
.await
|
||||||
|
.entry(room_id.to_owned())
|
||||||
|
.or_default(),
|
||||||
|
);
|
||||||
|
let mutex_lock = mutex.lock().await;
|
||||||
|
let start_time = Instant::now();
|
||||||
|
|
||||||
|
if let Err(e) = services()
|
||||||
|
.rooms
|
||||||
|
.event_handler
|
||||||
|
.handle_incoming_pdu(origin, &event_id, &room_id, value, true, pub_key_map)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
warn!("Error appending PDU to timeline: {}: {:?}", e, pdu);
|
||||||
|
return (Some(event_id), Err(e));
|
||||||
|
}
|
||||||
|
|
||||||
|
drop(mutex_lock);
|
||||||
|
|
||||||
|
let elapsed = start_time.elapsed();
|
||||||
|
debug!(
|
||||||
|
"Handling transaction of event {} took {}m{}s",
|
||||||
|
event_id,
|
||||||
|
elapsed.as_secs() / 60,
|
||||||
|
elapsed.as_secs() % 60
|
||||||
|
);
|
||||||
|
|
||||||
|
(Some(event_id), Ok(()))
|
||||||
|
}
|
||||||
|
|
||||||
/// # `PUT /_matrix/federation/v1/send/{txnId}`
|
/// # `PUT /_matrix/federation/v1/send/{txnId}`
|
||||||
///
|
///
|
||||||
/// Push EDUs and PDUs to this server.
|
/// Push EDUs and PDUs to this server.
|
||||||
|
@ -835,77 +899,11 @@ pub async fn send_transaction_message_route(
|
||||||
// let mut auth_cache = EventMap::new();
|
// let mut auth_cache = EventMap::new();
|
||||||
|
|
||||||
for pdu in &body.pdus {
|
for pdu in &body.pdus {
|
||||||
let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
|
let (event_id, result) =
|
||||||
warn!("Error parsing incoming event {:?}: {:?}", pdu, e);
|
handle_pdu_in_transaction(sender_servername, &pub_key_map, pdu).await;
|
||||||
Error::BadServerResponse("Invalid PDU in server response")
|
|
||||||
})?;
|
|
||||||
let room_id: OwnedRoomId = value
|
|
||||||
.get("room_id")
|
|
||||||
.and_then(|id| RoomId::parse(id.as_str()?).ok())
|
|
||||||
.ok_or(Error::BadRequest(
|
|
||||||
ErrorKind::InvalidParam,
|
|
||||||
"Invalid room id in pdu",
|
|
||||||
))?;
|
|
||||||
|
|
||||||
if services().rooms.state.get_room_version(&room_id).is_err() {
|
if let Some(event_id) = event_id {
|
||||||
debug!("Server is not in room {room_id}");
|
resolved_map.insert(event_id.clone(), result.map_err(|e| e.sanitized_error()));
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let r = parse_incoming_pdu(pdu);
|
|
||||||
let (event_id, value, room_id) = match r {
|
|
||||||
Ok(t) => t,
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Could not parse PDU: {e}");
|
|
||||||
warn!("Full PDU: {:?}", &pdu);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
// We do not add the event_id field to the pdu here because of signature and hashes checks
|
|
||||||
|
|
||||||
let mutex = Arc::clone(
|
|
||||||
services()
|
|
||||||
.globals
|
|
||||||
.roomid_mutex_federation
|
|
||||||
.write()
|
|
||||||
.await
|
|
||||||
.entry(room_id.to_owned())
|
|
||||||
.or_default(),
|
|
||||||
);
|
|
||||||
let mutex_lock = mutex.lock().await;
|
|
||||||
let start_time = Instant::now();
|
|
||||||
resolved_map.insert(
|
|
||||||
event_id.clone(),
|
|
||||||
services()
|
|
||||||
.rooms
|
|
||||||
.event_handler
|
|
||||||
.handle_incoming_pdu(
|
|
||||||
sender_servername,
|
|
||||||
&event_id,
|
|
||||||
&room_id,
|
|
||||||
value,
|
|
||||||
true,
|
|
||||||
&pub_key_map,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.map(|_| ()),
|
|
||||||
);
|
|
||||||
drop(mutex_lock);
|
|
||||||
|
|
||||||
let elapsed = start_time.elapsed();
|
|
||||||
debug!(
|
|
||||||
"Handling transaction of event {} took {}m{}s",
|
|
||||||
event_id,
|
|
||||||
elapsed.as_secs() / 60,
|
|
||||||
elapsed.as_secs() % 60
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
for pdu in &resolved_map {
|
|
||||||
if let Err(e) = pdu.1 {
|
|
||||||
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
|
|
||||||
warn!("Incoming PDU failed {:?}", pdu);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1074,12 +1072,7 @@ pub async fn send_transaction_message_route(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(send_transaction_message::v1::Response {
|
Ok(send_transaction_message::v1::Response { pdus: resolved_map })
|
||||||
pdus: resolved_map
|
|
||||||
.into_iter()
|
|
||||||
.map(|(e, r)| (e, r.map_err(|e| e.sanitized_error())))
|
|
||||||
.collect(),
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// # `GET /_matrix/federation/v1/event/{eventId}`
|
/// # `GET /_matrix/federation/v1/event/{eventId}`
|
||||||
|
|
Loading…
Reference in a new issue