mirror of
https://gitlab.com/famedly/conduit.git
synced 2024-12-27 21:13:47 +03:00
Convert uses of serde_json::Value to CanonicalJsonObject
This commit is contained in:
parent
b6d721374f
commit
27e686f9ff
5 changed files with 277 additions and 256 deletions
431
Cargo.lock
generated
431
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -18,12 +18,13 @@ use ruma::{
|
||||||
federation,
|
federation,
|
||||||
},
|
},
|
||||||
events::{pdu::Pdu, room::member, EventType},
|
events::{pdu::Pdu, room::member, EventType},
|
||||||
|
serde::{to_canonical_value, CanonicalJsonObject},
|
||||||
EventId, Raw, RoomId, RoomVersionId, ServerName, UserId,
|
EventId, Raw, RoomId, RoomVersionId, ServerName, UserId,
|
||||||
};
|
};
|
||||||
use state_res::StateEvent;
|
use state_res::StateEvent;
|
||||||
use std::{
|
use std::{
|
||||||
collections::{BTreeMap, HashMap, HashSet},
|
collections::{BTreeMap, HashMap, HashSet},
|
||||||
convert::{TryFrom, TryInto},
|
convert::TryFrom,
|
||||||
iter,
|
iter,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
};
|
};
|
||||||
|
@ -477,30 +478,25 @@ async fn join_room_by_id_helper(
|
||||||
|
|
||||||
let (make_join_response, remote_server) = make_join_response_and_server?;
|
let (make_join_response, remote_server) = make_join_response_and_server?;
|
||||||
|
|
||||||
let mut join_event_stub_value =
|
let mut join_event_stub =
|
||||||
serde_json::from_str::<serde_json::Value>(make_join_response.event.json().get())
|
serde_json::from_str::<CanonicalJsonObject>(make_join_response.event.json().get())
|
||||||
.map_err(|_| {
|
.map_err(|_| {
|
||||||
Error::BadServerResponse("Invalid make_join event json received from server.")
|
Error::BadServerResponse("Invalid make_join event json received from server.")
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let join_event_stub =
|
|
||||||
join_event_stub_value
|
|
||||||
.as_object_mut()
|
|
||||||
.ok_or(Error::BadServerResponse(
|
|
||||||
"Invalid make join event object received from server.",
|
|
||||||
))?;
|
|
||||||
|
|
||||||
join_event_stub.insert(
|
join_event_stub.insert(
|
||||||
"origin".to_owned(),
|
"origin".to_owned(),
|
||||||
db.globals.server_name().to_owned().to_string().into(),
|
to_canonical_value(db.globals.server_name())
|
||||||
|
.map_err(|_| Error::bad_database("Invalid server name found"))?,
|
||||||
);
|
);
|
||||||
join_event_stub.insert(
|
join_event_stub.insert(
|
||||||
"origin_server_ts".to_owned(),
|
"origin_server_ts".to_owned(),
|
||||||
utils::millis_since_unix_epoch().into(),
|
to_canonical_value(utils::millis_since_unix_epoch())
|
||||||
|
.expect("Timestamp is valid js_int value"),
|
||||||
);
|
);
|
||||||
join_event_stub.insert(
|
join_event_stub.insert(
|
||||||
"content".to_owned(),
|
"content".to_owned(),
|
||||||
serde_json::to_value(member::MemberEventContent {
|
to_canonical_value(member::MemberEventContent {
|
||||||
membership: member::MembershipState::Join,
|
membership: member::MembershipState::Join,
|
||||||
displayname: db.users.displayname(&sender_user)?,
|
displayname: db.users.displayname(&sender_user)?,
|
||||||
avatar_url: db.users.avatar_url(&sender_user)?,
|
avatar_url: db.users.avatar_url(&sender_user)?,
|
||||||
|
@ -510,18 +506,14 @@ async fn join_room_by_id_helper(
|
||||||
.expect("event is valid, we just created it"),
|
.expect("event is valid, we just created it"),
|
||||||
);
|
);
|
||||||
|
|
||||||
// Convert `serde_json;:Value` to `CanonicalJsonObj` for hashing/signing
|
|
||||||
let mut canon_json_stub: BTreeMap<_, ruma::signatures::CanonicalJsonValue> =
|
|
||||||
serde_json::from_value(join_event_stub_value).expect("json Value is canonical JSON");
|
|
||||||
|
|
||||||
// We don't leave the event id in the pdu because that's only allowed in v1 or v2 rooms
|
// We don't leave the event id in the pdu because that's only allowed in v1 or v2 rooms
|
||||||
canon_json_stub.remove("event_id");
|
join_event_stub.remove("event_id");
|
||||||
|
|
||||||
// In order to create a compatible ref hash (EventID) the `hashes` field needs to be present
|
// In order to create a compatible ref hash (EventID) the `hashes` field needs to be present
|
||||||
ruma::signatures::hash_and_sign_event(
|
ruma::signatures::hash_and_sign_event(
|
||||||
db.globals.server_name().as_str(),
|
db.globals.server_name().as_str(),
|
||||||
db.globals.keypair(),
|
db.globals.keypair(),
|
||||||
&mut canon_json_stub,
|
&mut join_event_stub,
|
||||||
&RoomVersionId::Version6,
|
&RoomVersionId::Version6,
|
||||||
)
|
)
|
||||||
.expect("event is valid, we just created it");
|
.expect("event is valid, we just created it");
|
||||||
|
@ -529,21 +521,19 @@ async fn join_room_by_id_helper(
|
||||||
// Generate event id
|
// Generate event id
|
||||||
let event_id = EventId::try_from(&*format!(
|
let event_id = EventId::try_from(&*format!(
|
||||||
"${}",
|
"${}",
|
||||||
ruma::signatures::reference_hash(&canon_json_stub, &RoomVersionId::Version6)
|
ruma::signatures::reference_hash(&join_event_stub, &RoomVersionId::Version6)
|
||||||
.expect("ruma can calculate reference hashes")
|
.expect("ruma can calculate reference hashes")
|
||||||
))
|
))
|
||||||
.expect("ruma's reference hashes are valid event ids");
|
.expect("ruma's reference hashes are valid event ids");
|
||||||
|
|
||||||
// Add event_id back
|
// Add event_id back
|
||||||
canon_json_stub.insert(
|
join_event_stub.insert(
|
||||||
"event_id".to_owned(),
|
"event_id".to_owned(),
|
||||||
serde_json::json!(event_id)
|
to_canonical_value(&event_id).expect("EventId is a valid CanonicalJsonValue"),
|
||||||
.try_into()
|
|
||||||
.expect("EventId is a valid CanonicalJsonValue"),
|
|
||||||
);
|
);
|
||||||
|
|
||||||
// It has enough fields to be called a proper event now
|
// It has enough fields to be called a proper event now
|
||||||
let join_event = canon_json_stub;
|
let join_event = join_event_stub;
|
||||||
|
|
||||||
let send_join_response = server_server::send_request(
|
let send_join_response = server_server::send_request(
|
||||||
&db.globals,
|
&db.globals,
|
||||||
|
@ -559,7 +549,7 @@ async fn join_room_by_id_helper(
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let add_event_id = |pdu: &Raw<Pdu>| -> Result<(EventId, serde_json::Value)> {
|
let add_event_id = |pdu: &Raw<Pdu>| -> Result<(EventId, CanonicalJsonObject)> {
|
||||||
let mut value = serde_json::from_str(pdu.json().get())
|
let mut value = serde_json::from_str(pdu.json().get())
|
||||||
.expect("converting raw jsons to values always works");
|
.expect("converting raw jsons to values always works");
|
||||||
let event_id = EventId::try_from(&*format!(
|
let event_id = EventId::try_from(&*format!(
|
||||||
|
@ -571,18 +561,18 @@ async fn join_room_by_id_helper(
|
||||||
|
|
||||||
value.insert(
|
value.insert(
|
||||||
"event_id".to_owned(),
|
"event_id".to_owned(),
|
||||||
serde_json::from_value(serde_json::json!(event_id))
|
to_canonical_value(&event_id)
|
||||||
.expect("a valid EventId can be converted to CanonicalJsonValue"),
|
.expect("a valid EventId can be converted to CanonicalJsonValue"),
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok((event_id, serde_json::json!(value)))
|
Ok((event_id, value))
|
||||||
};
|
};
|
||||||
|
|
||||||
let room_state = send_join_response.room_state.state.iter().map(add_event_id);
|
let room_state = send_join_response.room_state.state.iter().map(add_event_id);
|
||||||
|
|
||||||
let state_events = room_state
|
let state_events = room_state
|
||||||
.clone()
|
.clone()
|
||||||
.map(|pdu: Result<(EventId, serde_json::Value)>| Ok(pdu?.0))
|
.map(|pdu: Result<(EventId, CanonicalJsonObject)>| Ok(pdu?.0))
|
||||||
.chain(iter::once(Ok(event_id.clone()))) // Add join event we just created
|
.chain(iter::once(Ok(event_id.clone()))) // Add join event we just created
|
||||||
.collect::<Result<HashSet<EventId>>>()?;
|
.collect::<Result<HashSet<EventId>>>()?;
|
||||||
|
|
||||||
|
@ -594,16 +584,13 @@ async fn join_room_by_id_helper(
|
||||||
|
|
||||||
let mut event_map = room_state
|
let mut event_map = room_state
|
||||||
.chain(auth_chain)
|
.chain(auth_chain)
|
||||||
.chain(iter::once(Ok((
|
.chain(iter::once(Ok((event_id, join_event)))) // Add join event we just created
|
||||||
event_id,
|
|
||||||
serde_json::to_value(join_event).unwrap(),
|
|
||||||
)))) // Add join event we just created
|
|
||||||
.map(|r| {
|
.map(|r| {
|
||||||
let (event_id, value) = r?;
|
let (event_id, value) = r?;
|
||||||
state_res::StateEvent::from_id_value(event_id.clone(), value.clone())
|
state_res::StateEvent::from_id_canon_obj(event_id.clone(), value.clone())
|
||||||
.map(|ev| (event_id, Arc::new(ev)))
|
.map(|ev| (event_id, Arc::new(ev)))
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
warn!("{}: {}", value, e);
|
warn!("{:?}: {}", value, e);
|
||||||
Error::BadServerResponse("Invalid PDU in send_join response.")
|
Error::BadServerResponse("Invalid PDU in send_join response.")
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -692,7 +679,7 @@ async fn join_room_by_id_helper(
|
||||||
pdu_id.extend_from_slice(&count.to_be_bytes());
|
pdu_id.extend_from_slice(&count.to_be_bytes());
|
||||||
db.rooms.append_pdu(
|
db.rooms.append_pdu(
|
||||||
&PduEvent::from(&**pdu),
|
&PduEvent::from(&**pdu),
|
||||||
&serde_json::to_value(&**pdu).expect("PDU is valid value"),
|
&utils::to_canonical_object(&**pdu).expect("Pdu is valid canonical object"),
|
||||||
count,
|
count,
|
||||||
pdu_id.clone().into(),
|
pdu_id.clone().into(),
|
||||||
&db.globals,
|
&db.globals,
|
||||||
|
|
|
@ -15,6 +15,7 @@ use ruma::{
|
||||||
},
|
},
|
||||||
EventType,
|
EventType,
|
||||||
},
|
},
|
||||||
|
serde::{to_canonical_value, CanonicalJsonObject},
|
||||||
EventId, Raw, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId,
|
EventId, Raw, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId,
|
||||||
};
|
};
|
||||||
use sled::IVec;
|
use sled::IVec;
|
||||||
|
@ -506,7 +507,7 @@ impl Rooms {
|
||||||
pub fn append_pdu(
|
pub fn append_pdu(
|
||||||
&self,
|
&self,
|
||||||
pdu: &PduEvent,
|
pdu: &PduEvent,
|
||||||
pdu_json: &serde_json::Value,
|
pdu_json: &CanonicalJsonObject,
|
||||||
count: u64,
|
count: u64,
|
||||||
pdu_id: IVec,
|
pdu_id: IVec,
|
||||||
globals: &super::globals::Globals,
|
globals: &super::globals::Globals,
|
||||||
|
@ -520,7 +521,11 @@ impl Rooms {
|
||||||
self.edus
|
self.edus
|
||||||
.private_read_set(&pdu.room_id, &pdu.sender, count, &globals)?;
|
.private_read_set(&pdu.room_id, &pdu.sender, count, &globals)?;
|
||||||
|
|
||||||
self.pduid_pdu.insert(&pdu_id, &*pdu_json.to_string())?;
|
self.pduid_pdu.insert(
|
||||||
|
&pdu_id,
|
||||||
|
&*serde_json::to_string(pdu_json)
|
||||||
|
.expect("CanonicalJsonObject is always a valid String"),
|
||||||
|
)?;
|
||||||
|
|
||||||
self.eventid_pduid
|
self.eventid_pduid
|
||||||
.insert(pdu.event_id.as_bytes(), &*pdu_id)?;
|
.insert(pdu.event_id.as_bytes(), &*pdu_id)?;
|
||||||
|
@ -863,8 +868,7 @@ impl Rooms {
|
||||||
// Add origin because synapse likes that (and it's required in the spec)
|
// Add origin because synapse likes that (and it's required in the spec)
|
||||||
pdu_json.insert(
|
pdu_json.insert(
|
||||||
"origin".to_owned(),
|
"origin".to_owned(),
|
||||||
serde_json::json!(globals.server_name())
|
to_canonical_value(globals.server_name())
|
||||||
.try_into()
|
|
||||||
.expect("server name is a valid CanonicalJsonValue"),
|
.expect("server name is a valid CanonicalJsonValue"),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -886,9 +890,7 @@ impl Rooms {
|
||||||
|
|
||||||
pdu_json.insert(
|
pdu_json.insert(
|
||||||
"event_id".to_owned(),
|
"event_id".to_owned(),
|
||||||
serde_json::json!(pdu.event_id)
|
to_canonical_value(&pdu.event_id).expect("EventId is a valid CanonicalJsonValue"),
|
||||||
.try_into()
|
|
||||||
.expect("EventId is a valid CanonicalJsonValue"),
|
|
||||||
);
|
);
|
||||||
|
|
||||||
// Increment the last index and use that
|
// Increment the last index and use that
|
||||||
|
@ -904,7 +906,7 @@ impl Rooms {
|
||||||
|
|
||||||
self.append_pdu(
|
self.append_pdu(
|
||||||
&pdu,
|
&pdu,
|
||||||
&serde_json::json!(pdu_json), // TODO fixup CanonicalJsonValue
|
&pdu_json,
|
||||||
count,
|
count,
|
||||||
pdu_id.clone().into(),
|
pdu_id.clone().into(),
|
||||||
globals,
|
globals,
|
||||||
|
|
|
@ -185,9 +185,7 @@ impl Sending {
|
||||||
.iter()
|
.iter()
|
||||||
.map(|pdu_id| {
|
.map(|pdu_id| {
|
||||||
Ok::<_, (Box<ServerName>, Error)>(
|
Ok::<_, (Box<ServerName>, Error)>(
|
||||||
// TODO: this was a PduStub
|
// TODO: check room version and remove event_id if needed
|
||||||
// In order for sending to work these actually do have to be
|
|
||||||
// PduStub but, since they are Raw<..> we can fake it.
|
|
||||||
serde_json::from_str(
|
serde_json::from_str(
|
||||||
PduEvent::convert_to_outgoing_federation_event(
|
PduEvent::convert_to_outgoing_federation_event(
|
||||||
rooms
|
rooms
|
||||||
|
|
|
@ -20,6 +20,7 @@ use ruma::{
|
||||||
OutgoingRequest,
|
OutgoingRequest,
|
||||||
},
|
},
|
||||||
directory::{IncomingFilter, IncomingRoomNetwork},
|
directory::{IncomingFilter, IncomingRoomNetwork},
|
||||||
|
serde::{to_canonical_value, CanonicalJsonObject},
|
||||||
EventId, RoomId, RoomVersionId, ServerName, UserId,
|
EventId, RoomId, RoomVersionId, ServerName, UserId,
|
||||||
};
|
};
|
||||||
use std::{
|
use std::{
|
||||||
|
@ -431,12 +432,13 @@ pub async fn send_transaction_message_route<'a>(
|
||||||
let mut resolved_map = BTreeMap::new();
|
let mut resolved_map = BTreeMap::new();
|
||||||
for pdu in &body.pdus {
|
for pdu in &body.pdus {
|
||||||
let (event_id, value) = process_incoming_pdu(pdu);
|
let (event_id, value) = process_incoming_pdu(pdu);
|
||||||
let pdu = serde_json::from_value::<PduEvent>(value.clone())
|
// TODO: this is an unfortunate conversion dance...
|
||||||
|
let pdu = serde_json::from_value::<PduEvent>(serde_json::to_value(&value).expect("msg"))
|
||||||
.expect("all ruma pdus are conduit pdus");
|
.expect("all ruma pdus are conduit pdus");
|
||||||
let room_id = &pdu.room_id;
|
let room_id = &pdu.room_id;
|
||||||
|
|
||||||
// If we have no idea about this room skip the PDU
|
// If we have no idea about this room skip the PDU
|
||||||
if !db.rooms.exists(&pdu.room_id)? {
|
if !db.rooms.exists(room_id)? {
|
||||||
error!("Room does not exist on this server.");
|
error!("Room does not exist on this server.");
|
||||||
resolved_map.insert(event_id, Err("Room is unknown to this server".into()));
|
resolved_map.insert(event_id, Err("Room is unknown to this server".into()));
|
||||||
continue;
|
continue;
|
||||||
|
@ -477,7 +479,7 @@ pub async fn send_transaction_message_route<'a>(
|
||||||
// When creating a StateEvent the event_id arg will be used
|
// When creating a StateEvent the event_id arg will be used
|
||||||
// over any found in the json and it will not use ruma::reference_hash
|
// over any found in the json and it will not use ruma::reference_hash
|
||||||
// to generate one
|
// to generate one
|
||||||
state_res::StateEvent::from_id_value(event_id, json)
|
state_res::StateEvent::from_id_canon_obj(event_id, json)
|
||||||
.expect("valid pdu json"),
|
.expect("valid pdu json"),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
@ -485,7 +487,7 @@ pub async fn send_transaction_message_route<'a>(
|
||||||
.collect::<BTreeMap<_, _>>();
|
.collect::<BTreeMap<_, _>>();
|
||||||
|
|
||||||
if value.get("state_key").is_none() {
|
if value.get("state_key").is_none() {
|
||||||
if !db.rooms.is_joined(&pdu.sender, &pdu.room_id)? {
|
if !db.rooms.is_joined(&pdu.sender, room_id)? {
|
||||||
error!("Sender is not joined {}", pdu.kind);
|
error!("Sender is not joined {}", pdu.kind);
|
||||||
resolved_map.insert(event_id, Err("User is not in this room".into()));
|
resolved_map.insert(event_id, Err("User is not in this room".into()));
|
||||||
continue;
|
continue;
|
||||||
|
@ -750,7 +752,7 @@ pub fn get_user_devices_route<'a>(
|
||||||
/// Generates a correct eventId for the incoming pdu.
|
/// Generates a correct eventId for the incoming pdu.
|
||||||
///
|
///
|
||||||
/// Returns a tuple of the new `EventId` and the PDU with the eventId inserted as a `serde_json::Value`.
|
/// Returns a tuple of the new `EventId` and the PDU with the eventId inserted as a `serde_json::Value`.
|
||||||
fn process_incoming_pdu(pdu: &ruma::Raw<ruma::events::pdu::Pdu>) -> (EventId, serde_json::Value) {
|
fn process_incoming_pdu(pdu: &ruma::Raw<ruma::events::pdu::Pdu>) -> (EventId, CanonicalJsonObject) {
|
||||||
let mut value =
|
let mut value =
|
||||||
serde_json::from_str(pdu.json().get()).expect("A Raw<...> is always valid JSON");
|
serde_json::from_str(pdu.json().get()).expect("A Raw<...> is always valid JSON");
|
||||||
|
|
||||||
|
@ -763,13 +765,8 @@ fn process_incoming_pdu(pdu: &ruma::Raw<ruma::events::pdu::Pdu>) -> (EventId, se
|
||||||
|
|
||||||
value.insert(
|
value.insert(
|
||||||
"event_id".to_owned(),
|
"event_id".to_owned(),
|
||||||
serde_json::json!(event_id)
|
to_canonical_value(&event_id).expect("EventId is a valid CanonicalJsonValue"),
|
||||||
.try_into()
|
|
||||||
.expect("EventId is a valid CanonicalJsonValue"),
|
|
||||||
);
|
);
|
||||||
|
|
||||||
(
|
(event_id, value)
|
||||||
event_id,
|
|
||||||
serde_json::to_value(value).expect("JSON Value is a CanonicalJsonValue"),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue