From e7348621bd9ba1d7a5b623437343bdac899338b2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jakub=20Kub=C3=ADk?= <jakub.kubik.it@protonmail.com>
Date: Fri, 18 Nov 2022 17:18:20 +0100
Subject: [PATCH] feat(presence): implement most features for PoC

---
 src/api/client_server/presence.rs             |   2 +-
 src/api/server_server.rs                      |  29 +++-
 src/config/mod.rs                             |  13 ++
 src/database/key_value/rooms/edus/presence.rs | 164 +++++++++++++-----
 src/database/mod.rs                           |   5 +-
 src/service/globals/mod.rs                    |   8 +
 src/service/rooms/edus/presence/data.rs       |  10 +-
 src/service/rooms/edus/presence/mod.rs        |  81 +--------
 8 files changed, 183 insertions(+), 129 deletions(-)

diff --git a/src/api/client_server/presence.rs b/src/api/client_server/presence.rs
index dfac3dbd..7afda962 100644
--- a/src/api/client_server/presence.rs
+++ b/src/api/client_server/presence.rs
@@ -60,7 +60,7 @@ pub async fn get_presence_route(
             .rooms
             .edus
             .presence
-            .get_last_presence_event(sender_user, &room_id)?
+            .get_presence_event(sender_user, &room_id)?
         {
             presence_event = Some(presence);
             break;
diff --git a/src/api/server_server.rs b/src/api/server_server.rs
index b7f88078..9154b3ef 100644
--- a/src/api/server_server.rs
+++ b/src/api/server_server.rs
@@ -33,6 +33,7 @@ use ruma::{
     },
     directory::{IncomingFilter, IncomingRoomNetwork},
     events::{
+        presence::{PresenceEvent, PresenceEventContent},
         receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
         room::{
             join_rules::{JoinRule, RoomJoinRulesEventContent},
@@ -746,7 +747,33 @@ pub async fn send_transaction_message_route(
         .filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok())
     {
         match edu {
-            Edu::Presence(_) => {}
+            Edu::Presence(presence) => {
+                for presence_update in presence.push {
+                    let user_id = presence_update.user_id;
+                    for room_id in services()
+                        .rooms
+                        .state_cache
+                        .rooms_joined(&user_id)
+                        .filter_map(|room_id| room_id.ok())
+                    {
+                        services().rooms.edus.presence.update_presence(
+                            &user_id,
+                            &room_id,
+                            PresenceEvent {
+                                content: PresenceEventContent {
+                                    avatar_url: services().users.avatar_url(&user_id)?,
+                                    currently_active: Some(presence_update.currently_active),
+                                    displayname: services().users.displayname(&user_id)?,
+                                    last_active_ago: Some(presence_update.last_active_ago),
+                                    presence: presence_update.presence.clone(),
+                                    status_msg: presence_update.status_msg.clone(),
+                                },
+                                sender: user_id.clone(),
+                            },
+                        )?;
+                    }
+                }
+            }
             Edu::Receipt(receipt) => {
                 for (room_id, room_updates) in receipt.receipts {
                     for (user_id, user_updates) in room_updates.read {
diff --git a/src/config/mod.rs b/src/config/mod.rs
index 6b862bb6..78724ab2 100644
--- a/src/config/mod.rs
+++ b/src/config/mod.rs
@@ -76,6 +76,11 @@ pub struct Config {
 
     pub emergency_password: Option<String>,
 
+    #[serde(default = "default_presence_idle_timeout")]
+    pub presence_idle_timeout: u64,
+    #[serde(default = "default_presence_offline_timeout")]
+    pub presence_offline_timeout: u64,
+
     #[serde(flatten)]
     pub catchall: BTreeMap<String, IgnoredAny>,
 }
@@ -257,6 +262,14 @@ fn default_turn_ttl() -> u64 {
     60 * 60 * 24
 }
 
+fn default_presence_idle_timeout() -> u64 {
+    1 * 60 as u64
+}
+
+fn default_presence_offline_timeout() -> u64 {
+    15 * 60 as u64
+}
+
 // I know, it's a great name
 pub fn default_default_room_version() -> RoomVersionId {
     RoomVersionId::V9
diff --git a/src/database/key_value/rooms/edus/presence.rs b/src/database/key_value/rooms/edus/presence.rs
index 763572ec..dae6f759 100644
--- a/src/database/key_value/rooms/edus/presence.rs
+++ b/src/database/key_value/rooms/edus/presence.rs
@@ -1,5 +1,7 @@
 use futures_util::{stream::FuturesUnordered, StreamExt};
+use ruma::user_id;
 use std::{collections::HashMap, time::Duration};
+use tracing::error;
 
 use ruma::{
     events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId,
@@ -7,9 +9,11 @@ use ruma::{
 use tokio::{sync::mpsc, time::sleep};
 
 use crate::{
-    database::KeyValueDatabase, service, services, utils, utils::u64_from_bytes, Error, Result,
+    database::KeyValueDatabase,
+    service, services, utils,
+    utils::{millis_since_unix_epoch, u64_from_bytes},
+    Error, Result,
 };
-use crate::utils::millis_since_unix_epoch;
 
 pub struct PresenceUpdate {
     count: u64,
@@ -17,15 +21,15 @@ pub struct PresenceUpdate {
 }
 
 impl PresenceUpdate {
-    fn to_be_bytes(&self) -> &[u8] {
-        &*([self.count.to_be_bytes(), self.timestamp.to_be_bytes()].concat())
+    fn to_be_bytes(&self) -> Vec<u8> {
+        [self.count.to_be_bytes(), self.timestamp.to_be_bytes()].concat()
     }
 
     fn from_be_bytes(bytes: &[u8]) -> Result<Self> {
         let (count_bytes, timestamp_bytes) = bytes.split_at(bytes.len() / 2);
         Ok(Self {
-            count: u64_from_bytes(count_bytes)?,
-            timestamp: u64_from_bytes(timestamp_bytes)?,
+            count: u64_from_bytes(count_bytes).expect("count bytes from DB are valid"),
+            timestamp: u64_from_bytes(timestamp_bytes).expect("timestamp bytes from DB are valid"),
         })
     }
 }
@@ -37,19 +41,23 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
         room_id: &RoomId,
         presence: PresenceEvent,
     ) -> Result<()> {
-        let mut roomuser_id = [room_id.as_bytes(), 0xff, user_id.as_bytes()].concat();
+        let roomuser_id = [room_id.as_bytes(), &[0xff], user_id.as_bytes()].concat();
 
         self.roomuserid_presenceevent.insert(
             &roomuser_id,
-            &serde_json::to_vec(&presence)?,
+            &serde_json::to_vec(&presence).expect("presence event from DB is valid"),
         )?;
 
         self.userid_presenceupdate.insert(
             user_id.as_bytes(),
-            PresenceUpdate {
+            &*PresenceUpdate {
                 count: services().globals.next_count()?,
-                timestamp: millis_since_unix_epoch(),
-            }.to_be_bytes(),
+                timestamp: match presence.content.last_active_ago {
+                    Some(active_ago) => millis_since_unix_epoch().saturating_sub(active_ago.into()),
+                    None => millis_since_unix_epoch(),
+                },
+            }
+            .to_be_bytes(),
         )?;
 
         Ok(())
@@ -58,10 +66,11 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
     fn ping_presence(&self, user_id: &UserId) -> Result<()> {
         self.userid_presenceupdate.insert(
             user_id.as_bytes(),
-            PresenceUpdate {
+            &*PresenceUpdate {
                 count: services().globals.current_count()?,
                 timestamp: millis_since_unix_epoch(),
-            }.to_be_bytes()
+            }
+            .to_be_bytes(),
         )?;
 
         Ok(())
@@ -70,9 +79,7 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
     fn last_presence_update(&self, user_id: &UserId) -> Result<Option<u64>> {
         self.userid_presenceupdate
             .get(user_id.as_bytes())?
-            .map(|bytes| {
-                PresenceUpdate::from_be_bytes(bytes)?.timestamp
-            })
+            .map(|bytes| PresenceUpdate::from_be_bytes(&bytes).map(|update| update.timestamp))
             .transpose()
     }
 
@@ -80,57 +87,131 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
         &self,
         room_id: &RoomId,
         user_id: &UserId,
-        presence_timestamp: u64
+        presence_timestamp: u64,
     ) -> Result<Option<PresenceEvent>> {
-        let mut roomuser_id = [room_id.as_bytes(), 0xff, user_id.as_bytes()].concat();
+        let roomuser_id = [room_id.as_bytes(), &[0xff], user_id.as_bytes()].concat();
         self.roomuserid_presenceevent
             .get(&roomuser_id)?
             .map(|value| parse_presence_event(&value, presence_timestamp))
             .transpose()
     }
 
-    fn presence_since(
-        &self,
+    fn presence_since<'a>(
+        &'a self,
         room_id: &RoomId,
         since: u64,
-    ) -> Result<Box<dyn Iterator<Item=(&UserId, PresenceEvent)>>> {
+    ) -> Result<Box<dyn Iterator<Item = (OwnedUserId, PresenceEvent)> + 'a>> {
         let services = &services();
-        let mut user_timestamp: HashMap<UserId, u64> = self.userid_presenceupdate
+        let user_timestamp: HashMap<OwnedUserId, u64> = self
+            .userid_presenceupdate
             .iter()
-            .map(|(user_id_bytes, update_bytes)| (UserId::parse(utils::string_from_bytes(user_id_bytes)), PresenceUpdate::from_be_bytes(update_bytes)?))
+            .filter_map(|(user_id_bytes, update_bytes)| {
+                Some((
+                    OwnedUserId::from(
+                        UserId::parse(utils::string_from_bytes(&user_id_bytes).ok()?).ok()?,
+                    ),
+                    PresenceUpdate::from_be_bytes(&update_bytes).ok()?,
+                ))
+            })
             .filter_map(|(user_id, presence_update)| {
-                if presence_update.count <= since || !services.rooms.state_cache.is_joined(user_id, room_id)? {
-                    return None
+                if presence_update.count <= since
+                    || !services
+                        .rooms
+                        .state_cache
+                        .is_joined(&user_id, room_id)
+                        .ok()?
+                {
+                    return None;
                 }
 
                 Some((user_id, presence_update.timestamp))
             })
             .collect();
 
-        Ok(
+        Ok(Box::new(
             self.roomuserid_presenceevent
                 .iter()
-                .filter_map(|user_id_bytes, presence_bytes| (UserId::parse(utils::string_from_bytes(user_id_bytes)), presence_bytes))
-                .filter_map(|user_id, presence_bytes| {
-                    let timestamp = user_timestamp.get(user_id)?;
-
-                    Some((user_id, parse_presence_event(presence_bytes, *timestamp)?))
+                .filter_map(|(user_id_bytes, presence_bytes)| {
+                    Some((
+                        OwnedUserId::from(
+                            UserId::parse(utils::string_from_bytes(&user_id_bytes).ok()?).ok()?,
+                        ),
+                        presence_bytes,
+                    ))
                 })
-                .into_iter()
-        )
+                .filter_map(
+                    move |(user_id, presence_bytes)| -> Option<(OwnedUserId, PresenceEvent)> {
+                        let timestamp = user_timestamp.get(&user_id)?;
+
+                        Some((
+                            user_id,
+                            parse_presence_event(&presence_bytes, *timestamp).ok()?,
+                        ))
+                    },
+                ),
+        ))
     }
 
     fn presence_maintain(
         &self,
-        mut timer_receiver: mpsc::UnboundedReceiver<Box<UserId>>,
+        mut timer_receiver: mpsc::UnboundedReceiver<OwnedUserId>,
     ) -> Result<()> {
         let mut timers = FuturesUnordered::new();
 
+        // TODO: Get rid of this hack
+        timers.push(create_presence_timer(
+            Duration::from_secs(60),
+            user_id!("@test:test.com").to_owned(),
+        ));
+
         tokio::spawn(async move {
             loop {
                 tokio::select! {
-                    Some(_user_id) = timers.next() => {
-                        // TODO: Handle presence timeouts
+                    Some(user_id) = timers.next() => {
+                        let presence_timestamp = match services().rooms.edus.presence.last_presence_update(&user_id) {
+                            Ok(timestamp) => match timestamp {
+                                Some(timestamp) => timestamp,
+                                None => continue,
+                            },
+                            Err(e) => {
+                                error!("{e}");
+                                continue;
+                            }
+                        };
+
+                        let presence_state = determine_presence_state(presence_timestamp);
+
+                        // Continue if there is no change in state
+                        if presence_state != PresenceState::Offline {
+                            continue;
+                        }
+
+                        for room_id in services()
+                                        .rooms
+                                        .state_cache
+                                        .rooms_joined(&user_id)
+                                        .filter_map(|room_id| room_id.ok()) {
+                            let presence_event = match services().rooms.edus.presence.get_presence_event(&user_id, &room_id) {
+                                Ok(event) => match event {
+                                    Some(event) => event,
+                                    None => continue,
+                                },
+                                Err(e) => {
+                                    error!("{e}");
+                                    continue;
+                                }
+                            };
+
+                            match services().rooms.edus.presence.update_presence(&user_id, &room_id, presence_event) {
+                                Ok(()) => (),
+                                Err(e) => {
+                                    error!("{e}");
+                                    continue;
+                                }
+                            }
+
+                            // TODO: Send event over federation
+                        }
                     }
                     Some(user_id) = timer_receiver.recv() => {
                         // Idle timeout
@@ -147,7 +228,7 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
     }
 }
 
-async fn create_presence_timer(duration: Duration, user_id: Box<UserId>) -> Box<UserId> {
+async fn create_presence_timer(duration: Duration, user_id: OwnedUserId) -> OwnedUserId {
     sleep(duration).await;
 
     user_id
@@ -162,9 +243,7 @@ fn parse_presence_event(bytes: &[u8], presence_timestamp: u64) -> Result<Presenc
     Ok(presence)
 }
 
-fn determine_presence_state(
-    last_active_ago: u64,
-) -> PresenceState {
+fn determine_presence_state(last_active_ago: u64) -> PresenceState {
     let globals = &services().globals;
 
     return if last_active_ago < globals.presence_idle_timeout() {
@@ -177,10 +256,7 @@ fn determine_presence_state(
 }
 
 /// Translates the timestamp representing last_active_ago to a diff from now.
-fn translate_active_ago(
-    presence_event: &mut PresenceEvent,
-    last_active_ts: u64,
-) {
+fn translate_active_ago(presence_event: &mut PresenceEvent, last_active_ts: u64) {
     let last_active_ago = millis_since_unix_epoch().saturating_sub(last_active_ts);
 
     presence_event.content.presence = determine_presence_state(last_active_ago);
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 0797a136..7baa512a 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -65,7 +65,7 @@ pub struct KeyValueDatabase {
     pub(super) roomuserid_lastprivatereadupdate: Arc<dyn KvTree>, // LastPrivateReadUpdate = Count
     pub(super) typingid_userid: Arc<dyn KvTree>,        // TypingId = RoomId + TimeoutTime + Count
     pub(super) roomid_lasttypingupdate: Arc<dyn KvTree>, // LastRoomTypingUpdate = Count
-    pub(super) userid_presenceupdate: Arc<dyn KvTree>,    // PresenceUpdate = Count + Timestamp
+    pub(super) userid_presenceupdate: Arc<dyn KvTree>,  // PresenceUpdate = Count + Timestamp
     pub(super) roomuserid_presenceevent: Arc<dyn KvTree>, // PresenceEvent
 
     //pub rooms: rooms::Rooms,
@@ -825,9 +825,6 @@ impl KeyValueDatabase {
             );
         }
 
-        // This data is probably outdated
-        db.presenceid_presence.clear()?;
-
         services().admin.start_handler();
 
         // Set emergency access for the conduit user
diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs
index affc0516..94e3fb97 100644
--- a/src/service/globals/mod.rs
+++ b/src/service/globals/mod.rs
@@ -286,6 +286,14 @@ impl Service {
         &self.config.emergency_password
     }
 
+    pub fn presence_idle_timeout(&self) -> u64 {
+        self.config.presence_idle_timeout
+    }
+
+    pub fn presence_offline_timeout(&self) -> u64 {
+        self.config.presence_offline_timeout
+    }
+
     pub fn supported_room_versions(&self) -> Vec<RoomVersionId> {
         let mut room_versions: Vec<RoomVersionId> = vec![];
         room_versions.extend(self.stable_room_versions.clone());
diff --git a/src/service/rooms/edus/presence/data.rs b/src/service/rooms/edus/presence/data.rs
index 216313fe..5dc4c3cb 100644
--- a/src/service/rooms/edus/presence/data.rs
+++ b/src/service/rooms/edus/presence/data.rs
@@ -1,5 +1,3 @@
-use std::collections::HashMap;
-
 use crate::Result;
 use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId};
 use tokio::sync::mpsc;
@@ -31,12 +29,12 @@ pub trait Data: Send + Sync {
     ) -> Result<Option<PresenceEvent>>;
 
     /// Returns the most recent presence updates that happened after the event with id `since`.
-    fn presence_since(
-        &self,
+    fn presence_since<'a>(
+        &'a self,
         room_id: &RoomId,
         since: u64,
-    ) -> Result<HashMap<OwnedUserId, PresenceEvent>>;
+    ) -> Result<Box<dyn Iterator<Item = (OwnedUserId, PresenceEvent)> + 'a>>;
 
-    fn presence_maintain(&self, timer_receiver: mpsc::UnboundedReceiver<Box<UserId>>)
+    fn presence_maintain(&self, timer_receiver: mpsc::UnboundedReceiver<OwnedUserId>)
         -> Result<()>;
 }
diff --git a/src/service/rooms/edus/presence/mod.rs b/src/service/rooms/edus/presence/mod.rs
index 23194dd1..faac5c76 100644
--- a/src/service/rooms/edus/presence/mod.rs
+++ b/src/service/rooms/edus/presence/mod.rs
@@ -1,5 +1,4 @@
 mod data;
-use std::collections::HashMap;
 
 pub use data::Data;
 use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId};
@@ -11,7 +10,7 @@ pub struct Service {
     pub db: &'static dyn Data,
 
     // Presence timers
-    timer_sender: mpsc::UnboundedSender<Box<UserId>>,
+    timer_sender: mpsc::UnboundedSender<OwnedUserId>,
 }
 
 impl Service {
@@ -51,7 +50,11 @@ impl Service {
         self.db.ping_presence(user_id)
     }
 
-    pub fn get_last_presence_event(
+    pub fn last_presence_update(&self, user_id: &UserId) -> Result<Option<u64>> {
+        self.db.last_presence_update(user_id)
+    }
+
+    pub fn get_presence_event(
         &self,
         user_id: &UserId,
         room_id: &RoomId,
@@ -66,86 +69,18 @@ impl Service {
 
     pub fn presence_maintain(
         &self,
-        timer_receiver: mpsc::UnboundedReceiver<Box<UserId>>,
+        timer_receiver: mpsc::UnboundedReceiver<OwnedUserId>,
     ) -> Result<()> {
         self.db.presence_maintain(timer_receiver)
     }
 
-    /* TODO
-    /// Sets all users to offline who have been quiet for too long.
-    fn _presence_maintain(
-        &self,
-        rooms: &super::Rooms,
-        globals: &super::super::globals::Globals,
-    ) -> Result<()> {
-        let current_timestamp = utils::millis_since_unix_epoch();
-
-        for (user_id_bytes, last_timestamp) in self
-            .userid_lastpresenceupdate
-            .iter()
-            .filter_map(|(k, bytes)| {
-                Some((
-                    k,
-                    utils::u64_from_bytes(&bytes)
-                        .map_err(|_| {
-                            Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.")
-                        })
-                        .ok()?,
-                ))
-            })
-            .take_while(|(_, timestamp)| current_timestamp.saturating_sub(*timestamp) > 5 * 60_000)
-        // 5 Minutes
-        {
-            // Send new presence events to set the user offline
-            let count = globals.next_count()?.to_be_bytes();
-            let user_id: Box<_> = utils::string_from_bytes(&user_id_bytes)
-                .map_err(|_| {
-                    Error::bad_database("Invalid UserId bytes in userid_lastpresenceupdate.")
-                })?
-                .try_into()
-                .map_err(|_| Error::bad_database("Invalid UserId in userid_lastpresenceupdate."))?;
-            for room_id in rooms.rooms_joined(&user_id).filter_map(|r| r.ok()) {
-                let mut presence_id = room_id.as_bytes().to_vec();
-                presence_id.push(0xff);
-                presence_id.extend_from_slice(&count);
-                presence_id.push(0xff);
-                presence_id.extend_from_slice(&user_id_bytes);
-
-                self.presenceid_presence.insert(
-                    &presence_id,
-                    &serde_json::to_vec(&PresenceEvent {
-                        content: PresenceEventContent {
-                            avatar_url: None,
-                            currently_active: None,
-                            displayname: None,
-                            last_active_ago: Some(
-                                last_timestamp.try_into().expect("time is valid"),
-                            ),
-                            presence: PresenceState::Offline,
-                            status_msg: None,
-                        },
-                        sender: user_id.to_owned(),
-                    })
-                    .expect("PresenceEvent can be serialized"),
-                )?;
-            }
-
-            self.userid_lastpresenceupdate.insert(
-                user_id.as_bytes(),
-                &utils::millis_since_unix_epoch().to_be_bytes(),
-            )?;
-        }
-
-        Ok(())
-    }*/
-
     /// Returns the most recent presence updates that happened after the event with id `since`.
     #[tracing::instrument(skip(self, since, room_id))]
     pub fn presence_since(
         &self,
         room_id: &RoomId,
         since: u64,
-    ) -> Result<HashMap<OwnedUserId, PresenceEvent>> {
+    ) -> Result<Box<dyn Iterator<Item = (OwnedUserId, PresenceEvent)>>> {
         self.db.presence_since(room_id, since)
     }
 }