From cd5a83d4e2f59eac2fbad42cfa2b3c290cc173d3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jakub=20Kub=C3=ADk?= <jakub.kubik.it@protonmail.com>
Date: Thu, 17 Nov 2022 19:18:28 +0100
Subject: [PATCH] feat(presence): start presence timeout implementation

---
 src/database/key_value/rooms/edus/presence.rs | 37 +++++++++++++------
 src/service/mod.rs                            |  2 +-
 src/service/rooms/edus/presence/data.rs       |  4 ++
 src/service/rooms/edus/presence/mod.rs        | 31 +++++++++++++++-
 4 files changed, 61 insertions(+), 13 deletions(-)

diff --git a/src/database/key_value/rooms/edus/presence.rs b/src/database/key_value/rooms/edus/presence.rs
index 904b1c44..a72f1136 100644
--- a/src/database/key_value/rooms/edus/presence.rs
+++ b/src/database/key_value/rooms/edus/presence.rs
@@ -1,8 +1,10 @@
-use std::collections::HashMap;
+use futures_util::{stream::FuturesUnordered, StreamExt};
+use std::{collections::HashMap, time::Duration};
 
 use ruma::{
     events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId,
 };
+use tokio::{sync::mpsc, time::sleep};
 
 use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
 
@@ -109,24 +111,37 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
         Ok(hashmap)
     }
 
-    /*
-    fn presence_maintain(&self, db: Arc<TokioRwLock<Database>>) {
-        // TODO @M0dEx: move this to a timed tasks module
+    fn presence_maintain(
+        &self,
+        mut timer_receiver: mpsc::UnboundedReceiver<Box<UserId>>,
+    ) -> Result<()> {
+        let mut timers = FuturesUnordered::new();
+
         tokio::spawn(async move {
             loop {
-                select! {
-                    Some(user_id) = self.presence_timers.next() {
-                        // TODO @M0dEx: would it be better to acquire the lock outside the loop?
-                        let guard = db.read().await;
+                tokio::select! {
+                    Some(_user_id) = timers.next() => {
+                        // TODO: Handle presence timeouts
+                    }
+                    Some(user_id) = timer_receiver.recv() => {
+                        // Idle timeout
+                        timers.push(create_presence_timer(Duration::from_secs(60), user_id.clone()));
 
-                        // TODO @M0dEx: add self.presence_timers
-                        // TODO @M0dEx: maintain presence
+                        // Offline timeout
+                        timers.push(create_presence_timer(Duration::from_secs(60*15) , user_id));
                     }
                 }
             }
         });
+
+        Ok(())
     }
-    */
+}
+
+async fn create_presence_timer(duration: Duration, user_id: Box<UserId>) -> Box<UserId> {
+    sleep(duration).await;
+
+    user_id
 }
 
 fn parse_presence_event(bytes: &[u8]) -> Result<PresenceEvent> {
diff --git a/src/service/mod.rs b/src/service/mod.rs
index 385dcc69..6858ce1e 100644
--- a/src/service/mod.rs
+++ b/src/service/mod.rs
@@ -62,7 +62,7 @@ impl Services {
                 auth_chain: rooms::auth_chain::Service { db },
                 directory: rooms::directory::Service { db },
                 edus: rooms::edus::Service {
-                    presence: rooms::edus::presence::Service { db },
+                    presence: rooms::edus::presence::Service::build(db)?,
                     read_receipt: rooms::edus::read_receipt::Service { db },
                     typing: rooms::edus::typing::Service { db },
                 },
diff --git a/src/service/rooms/edus/presence/data.rs b/src/service/rooms/edus/presence/data.rs
index 53329e08..9c016705 100644
--- a/src/service/rooms/edus/presence/data.rs
+++ b/src/service/rooms/edus/presence/data.rs
@@ -2,6 +2,7 @@ use std::collections::HashMap;
 
 use crate::Result;
 use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId};
+use tokio::sync::mpsc;
 
 pub trait Data: Send + Sync {
     /// Adds a presence event which will be saved until a new event replaces it.
@@ -35,4 +36,7 @@ pub trait Data: Send + Sync {
         room_id: &RoomId,
         since: u64,
     ) -> Result<HashMap<OwnedUserId, PresenceEvent>>;
+
+    fn presence_maintain(&self, timer_receiver: mpsc::UnboundedReceiver<Box<UserId>>)
+        -> Result<()>;
 }
diff --git a/src/service/rooms/edus/presence/mod.rs b/src/service/rooms/edus/presence/mod.rs
index 860aea18..23194dd1 100644
--- a/src/service/rooms/edus/presence/mod.rs
+++ b/src/service/rooms/edus/presence/mod.rs
@@ -3,14 +3,30 @@ use std::collections::HashMap;
 
 pub use data::Data;
 use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId};
+use tokio::sync::mpsc;
 
-use crate::Result;
+use crate::{Error, Result};
 
 pub struct Service {
     pub db: &'static dyn Data,
+
+    // Presence timers
+    timer_sender: mpsc::UnboundedSender<Box<UserId>>,
 }
 
 impl Service {
+    pub fn build(db: &'static dyn Data) -> Result<Self> {
+        let (sender, receiver) = mpsc::unbounded_channel();
+        let service = Self {
+            db,
+            timer_sender: sender,
+        };
+
+        service.presence_maintain(receiver)?;
+
+        Ok(service)
+    }
+
     /// Adds a presence event which will be saved until a new event replaces it.
     ///
     /// Note: This method takes a RoomId because presence updates are always bound to rooms to
@@ -21,11 +37,17 @@ impl Service {
         room_id: &RoomId,
         presence: PresenceEvent,
     ) -> Result<()> {
+        self.timer_sender
+            .send(user_id.into())
+            .map_err(|_| Error::bad_database("Sender errored out"))?;
         self.db.update_presence(user_id, room_id, presence)
     }
 
     /// Resets the presence timeout, so the user will stay in their current presence state.
     pub fn ping_presence(&self, user_id: &UserId) -> Result<()> {
+        self.timer_sender
+            .send(user_id.into())
+            .map_err(|_| Error::bad_database("Sender errored out"))?;
         self.db.ping_presence(user_id)
     }
 
@@ -42,6 +64,13 @@ impl Service {
         self.db.get_presence_event(room_id, user_id, last_update)
     }
 
+    pub fn presence_maintain(
+        &self,
+        timer_receiver: mpsc::UnboundedReceiver<Box<UserId>>,
+    ) -> Result<()> {
+        self.db.presence_maintain(timer_receiver)
+    }
+
     /* TODO
     /// Sets all users to offline who have been quiet for too long.
     fn _presence_maintain(