From 8b8381bcc030c844557a758690ec6e1cbc0a9ac4 Mon Sep 17 00:00:00 2001
From: timokoesters <timo@koesters.xyz>
Date: Sat, 4 Apr 2020 11:53:37 +0200
Subject: [PATCH] New PduEvent struct

---
 Cargo.lock  |  1 +
 Cargo.toml  |  1 +
 src/data.rs | 55 +++++++++++++++++++++++--------------------
 src/main.rs | 68 ++++++++++++++++++++++++++++++-----------------------
 src/pdu.rs  | 42 +++++++++++++++++++++++++++++++++
 5 files changed, 112 insertions(+), 55 deletions(-)
 create mode 100644 src/pdu.rs

diff --git a/Cargo.lock b/Cargo.lock
index c4c6419a..ced79c6c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -505,6 +505,7 @@ dependencies = [
  "ruma-federation-api",
  "ruma-identifiers",
  "ruma-signatures",
+ "serde",
  "serde_json",
  "sled",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index b76a9c94..d2597071 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,3 +21,4 @@ js_int = "0.1.4"
 serde_json = "1.0.50"
 ruma-signatures = { git = "https://github.com/ruma/ruma-signatures.git" }
 ruma-federation-api = "0.0.1"
+serde = "1.0.105"
diff --git a/src/data.rs b/src/data.rs
index 28b8d05f..f0917ff4 100644
--- a/src/data.rs
+++ b/src/data.rs
@@ -1,9 +1,12 @@
-use crate::{utils, Database};
+use crate::{utils, Database, PduEvent};
 use log::debug;
-use ruma_events::collections::all::Event;
+use ruma_events::{room::message::MessageEvent, EventType};
 use ruma_federation_api::RoomV3Pdu;
 use ruma_identifiers::{EventId, RoomId, UserId};
-use std::convert::{TryFrom, TryInto};
+use std::{
+    collections::HashMap,
+    convert::{TryFrom, TryInto},
+};
 
 pub struct Data {
     hostname: String,
@@ -145,7 +148,7 @@ impl Data {
     }
 
     /// Add a persisted data unit from this homeserver
-    pub fn pdu_append(&self, event_id: &EventId, room_id: &RoomId, event: Event) {
+    pub fn pdu_append_message(&self, event_id: &EventId, room_id: &RoomId, event: MessageEvent) {
         // prev_events are the leaves of the current graph. This method removes all leaves from the
         // room and replaces them with our event
         let prev_events = self.pdu_leaves_replace(room_id, event_id);
@@ -163,25 +166,25 @@ impl Data {
             .unwrap_or(0_u64)
             + 1;
 
-        let mut pdu_value = serde_json::to_value(&event).expect("message event can be serialized");
-        let pdu = pdu_value.as_object_mut().unwrap();
-
-        pdu.insert(
-            "prev_events".to_owned(),
-            prev_events
-                .iter()
-                .map(|id| id.to_string())
-                .collect::<Vec<_>>()
-                .into(),
-        );
-        pdu.insert("origin".to_owned(), self.hostname().into());
-        pdu.insert("depth".to_owned(), depth.into());
-        pdu.insert("auth_events".to_owned(), vec!["$auth_eventid"].into()); // TODO
-        pdu.insert(
-            "hashes".to_owned(),
-            "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".into(),
-        ); // TODO
-        pdu.insert("signatures".to_owned(), "signature".into()); // TODO
+        let pdu = PduEvent {
+            event_id: event_id.clone(),
+            room_id: room_id.clone(),
+            sender: event.sender,
+            origin: self.hostname.clone(),
+            origin_server_ts: event.origin_server_ts,
+            kind: EventType::RoomMessage,
+            content: serde_json::to_value(event.content).unwrap(),
+            state_key: None,
+            prev_events,
+            depth: depth.try_into().unwrap(),
+            auth_events: Vec::new(),
+            redacts: None,
+            unsigned: Default::default(),
+            hashes: ruma_federation_api::EventHash {
+                sha256: "aaa".to_owned(),
+            },
+            signatures: HashMap::new(),
+        };
 
         // The new value will need a new index. We store the last used index in 'n' + id
         let mut count_key: Vec<u8> = vec![b'n'];
@@ -205,7 +208,7 @@ impl Data {
 
         self.db
             .pduid_pdus
-            .insert(&pdu_id, dbg!(&*serde_json::to_string(&pdu).unwrap()))
+            .insert(&pdu_id, &*serde_json::to_string(&pdu).unwrap())
             .unwrap();
 
         self.db
@@ -215,7 +218,7 @@ impl Data {
     }
 
     /// Returns a vector of all PDUs.
-    pub fn pdus_all(&self) -> Vec<RoomV3Pdu> {
+    pub fn pdus_all(&self) -> Vec<PduEvent> {
         self.pdus_since(
             self.db
                 .eventid_pduid
@@ -229,7 +232,7 @@ impl Data {
     }
 
     /// Returns a vector of all events that happened after the event with id `since`.
-    pub fn pdus_since(&self, since: String) -> Vec<RoomV3Pdu> {
+    pub fn pdus_since(&self, since: String) -> Vec<PduEvent> {
         let mut pdus = Vec::new();
 
         if let Some(room_id) = since.rsplitn(2, '#').nth(1) {
diff --git a/src/main.rs b/src/main.rs
index 8af476d0..44ff413b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,13 +1,15 @@
 #![feature(proc_macro_hygiene, decl_macro)]
 mod data;
 mod database;
+mod pdu;
 mod ruma_wrapper;
 mod utils;
 
 pub use data::Data;
 pub use database::Database;
+pub use pdu::PduEvent;
 
-use log::debug;
+use log::{debug, error};
 use rocket::{get, options, post, put, routes, State};
 use ruma_client_api::{
     error::{Error, ErrorKind},
@@ -17,7 +19,7 @@ use ruma_client_api::{
     },
     unversioned::get_supported_versions,
 };
-use ruma_events::{collections::all::Event, room::message::MessageEvent};
+use ruma_events::{collections::all::RoomEvent, room::message::MessageEvent, EventResult};
 use ruma_identifiers::{EventId, UserId};
 use ruma_wrapper::{MatrixResult, Ruma};
 use serde_json::map::Map;
@@ -212,7 +214,7 @@ fn create_message_event_route(
     body: Ruma<create_message_event::Request>,
 ) -> MatrixResult<create_message_event::Response> {
     // Construct event
-    let mut event = Event::RoomMessage(MessageEvent {
+    let mut event = RoomEvent::RoomMessage(MessageEvent {
         content: body.data.clone().into_result().unwrap(),
         event_id: EventId::try_from("$thiswillbefilledinlater").unwrap(),
         origin_server_ts: utils::millis_since_unix_epoch(),
@@ -230,13 +232,13 @@ fn create_message_event_route(
     .expect("ruma's reference hashes are correct");
 
     // Insert event id
-    if let Event::RoomMessage(message) = &mut event {
+    if let RoomEvent::RoomMessage(message) = &mut event {
         message.event_id = event_id.clone();
+        data.pdu_append_message(&event_id, &body.room_id, message.clone());
+    } else {
+        error!("only roommessages are handled currently");
     }
 
-    // Add PDU to the graph
-    data.pdu_append(&event_id, &body.room_id, event);
-
     MatrixResult(Ok(create_message_event::Response { event_id }))
 }
 
@@ -245,30 +247,38 @@ fn sync_route(
     data: State<Data>,
     body: Ruma<sync_events::Request>,
 ) -> MatrixResult<sync_events::Response> {
-    let pdus = data.pdus_all();
     let mut joined_rooms = HashMap::new();
-    joined_rooms.insert(
-        "!roomid:localhost".try_into().unwrap(),
-        sync_events::JoinedRoom {
-            account_data: sync_events::AccountData { events: Vec::new() },
-            summary: sync_events::RoomSummary {
-                heroes: Vec::new(),
-                joined_member_count: None,
-                invited_member_count: None,
+    {
+        let pdus = data.pdus_all();
+        let mut room_events = Vec::new();
+
+        for pdu in pdus {
+            room_events.push(pdu.to_room_event());
+        }
+
+        joined_rooms.insert(
+            "!roomid:localhost".try_into().unwrap(),
+            sync_events::JoinedRoom {
+                account_data: sync_events::AccountData { events: Vec::new() },
+                summary: sync_events::RoomSummary {
+                    heroes: Vec::new(),
+                    joined_member_count: None,
+                    invited_member_count: None,
+                },
+                unread_notifications: sync_events::UnreadNotificationsCount {
+                    highlight_count: None,
+                    notification_count: None,
+                },
+                timeline: sync_events::Timeline {
+                    limited: None,
+                    prev_batch: None,
+                    events: room_events,
+                },
+                state: sync_events::State { events: Vec::new() },
+                ephemeral: sync_events::Ephemeral { events: Vec::new() },
             },
-            unread_notifications: sync_events::UnreadNotificationsCount {
-                highlight_count: None,
-                notification_count: None,
-            },
-            timeline: sync_events::Timeline {
-                limited: None,
-                prev_batch: None,
-                events: todo!(),
-            },
-            state: sync_events::State { events: Vec::new() },
-            ephemeral: sync_events::Ephemeral { events: Vec::new() },
-        },
-    );
+        );
+    }
 
     MatrixResult(Ok(sync_events::Response {
         next_batch: String::new(),
diff --git a/src/pdu.rs b/src/pdu.rs
new file mode 100644
index 00000000..588242b7
--- /dev/null
+++ b/src/pdu.rs
@@ -0,0 +1,42 @@
+use js_int::UInt;
+use ruma_events::{collections::all::RoomEvent, EventResult, EventType};
+use ruma_federation_api::EventHash;
+use ruma_identifiers::{EventId, RoomId, UserId};
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+
+#[derive(Deserialize, Serialize)]
+pub struct PduEvent {
+    pub event_id: EventId,
+    pub room_id: RoomId,
+    pub sender: UserId,
+    pub origin: String,
+    pub origin_server_ts: UInt,
+    #[serde(rename = "type")]
+    pub kind: EventType,
+    pub content: serde_json::Value,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub state_key: Option<String>,
+    pub prev_events: Vec<EventId>,
+    pub depth: UInt,
+    pub auth_events: Vec<EventId>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub redacts: Option<EventId>,
+    #[serde(default, skip_serializing_if = "serde_json::Map::is_empty")]
+    pub unsigned: serde_json::Map<String, serde_json::Value>,
+    pub hashes: EventHash,
+    pub signatures: HashMap<String, HashMap<String, String>>,
+}
+
+impl PduEvent {
+    pub fn to_room_event(&self) -> RoomEvent {
+        // Can only fail in rare circumstances that won't ever happen here, see
+        // https://docs.rs/serde_json/1.0.50/serde_json/fn.to_string.html
+        let json = serde_json::to_string(&self).unwrap();
+        // EventResult's deserialize implementation always returns `Ok(...)`
+        serde_json::from_str::<EventResult<RoomEvent>>(&json)
+            .unwrap()
+            .into_result()
+            .unwrap()
+    }
+}