mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-01-13 21:16:27 +03:00
feat: Implement private read receipts, partial notification clearing
This commit is contained in:
parent
c86313d4fa
commit
e8d435c541
10 changed files with 262 additions and 59 deletions
|
@ -34,29 +34,33 @@ pub async fn set_read_marker_route(
|
|||
)?;
|
||||
}
|
||||
|
||||
if body.private_read_receipt.is_some() || body.read_receipt.is_some() {
|
||||
services()
|
||||
.rooms
|
||||
.user
|
||||
.reset_notification_counts(sender_user, &body.room_id)?;
|
||||
}
|
||||
|
||||
if let Some(event) = &body.private_read_receipt {
|
||||
services().rooms.edus.read_receipt.private_read_set(
|
||||
&body.room_id,
|
||||
sender_user,
|
||||
services()
|
||||
let _pdu = services()
|
||||
.rooms
|
||||
.timeline
|
||||
.get_pdu_count(event)?
|
||||
.get_pdu(event)?
|
||||
.ok_or(Error::BadRequest(
|
||||
ErrorKind::InvalidParam,
|
||||
"Event does not exist.",
|
||||
))?,
|
||||
))?;
|
||||
|
||||
services().rooms.edus.read_receipt.private_read_set(
|
||||
&body.room_id,
|
||||
sender_user,
|
||||
services().rooms.short.get_or_create_shorteventid(event)?,
|
||||
)?;
|
||||
}
|
||||
|
||||
if let Some(event) = &body.read_receipt {
|
||||
let _pdu = services()
|
||||
.rooms
|
||||
.timeline
|
||||
.get_pdu(event)?
|
||||
.ok_or(Error::BadRequest(
|
||||
ErrorKind::InvalidParam,
|
||||
"Event does not exist.",
|
||||
))?;
|
||||
|
||||
let mut user_receipts = BTreeMap::new();
|
||||
user_receipts.insert(
|
||||
sender_user.clone(),
|
||||
|
@ -80,6 +84,12 @@ pub async fn set_read_marker_route(
|
|||
room_id: body.room_id.clone(),
|
||||
},
|
||||
)?;
|
||||
|
||||
services().rooms.edus.read_receipt.private_read_set(
|
||||
&body.room_id,
|
||||
sender_user,
|
||||
services().rooms.short.get_or_create_shorteventid(event)?,
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(set_read_marker::v3::Response {})
|
||||
|
@ -93,16 +103,6 @@ pub async fn create_receipt_route(
|
|||
) -> Result<create_receipt::v3::Response> {
|
||||
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
||||
|
||||
if matches!(
|
||||
&body.receipt_type,
|
||||
create_receipt::v3::ReceiptType::Read | create_receipt::v3::ReceiptType::ReadPrivate
|
||||
) {
|
||||
services()
|
||||
.rooms
|
||||
.user
|
||||
.reset_notification_counts(sender_user, &body.room_id)?;
|
||||
}
|
||||
|
||||
match body.receipt_type {
|
||||
create_receipt::v3::ReceiptType::FullyRead => {
|
||||
let fully_read_event = ruma::events::fully_read::FullyReadEvent {
|
||||
|
@ -118,6 +118,16 @@ pub async fn create_receipt_route(
|
|||
)?;
|
||||
}
|
||||
create_receipt::v3::ReceiptType::Read => {
|
||||
let _pdu =
|
||||
services()
|
||||
.rooms
|
||||
.timeline
|
||||
.get_pdu(&body.event_id)?
|
||||
.ok_or(Error::BadRequest(
|
||||
ErrorKind::InvalidParam,
|
||||
"Event does not exist.",
|
||||
))?;
|
||||
|
||||
let mut user_receipts = BTreeMap::new();
|
||||
user_receipts.insert(
|
||||
sender_user.clone(),
|
||||
|
@ -140,19 +150,34 @@ pub async fn create_receipt_route(
|
|||
room_id: body.room_id.clone(),
|
||||
},
|
||||
)?;
|
||||
}
|
||||
create_receipt::v3::ReceiptType::ReadPrivate => {
|
||||
|
||||
services().rooms.edus.read_receipt.private_read_set(
|
||||
&body.room_id,
|
||||
sender_user,
|
||||
services()
|
||||
.rooms
|
||||
.short
|
||||
.get_or_create_shorteventid(&body.event_id)?,
|
||||
)?;
|
||||
}
|
||||
create_receipt::v3::ReceiptType::ReadPrivate => {
|
||||
let _pdu =
|
||||
services()
|
||||
.rooms
|
||||
.timeline
|
||||
.get_pdu_count(&body.event_id)?
|
||||
.get_pdu(&body.event_id)?
|
||||
.ok_or(Error::BadRequest(
|
||||
ErrorKind::InvalidParam,
|
||||
"Event does not exist.",
|
||||
))?,
|
||||
))?;
|
||||
|
||||
services().rooms.edus.read_receipt.private_read_set(
|
||||
&body.room_id,
|
||||
sender_user,
|
||||
services()
|
||||
.rooms
|
||||
.short
|
||||
.get_or_create_shorteventid(&body.event_id)?,
|
||||
)?;
|
||||
}
|
||||
_ => return Err(Error::bad_database("Unsupported receipt type")),
|
||||
|
|
|
@ -6,6 +6,7 @@ use ruma::{
|
|||
uiaa::UiaaResponse,
|
||||
},
|
||||
events::{
|
||||
receipt::{ReceiptThread, ReceiptType},
|
||||
room::member::{MembershipState, RoomMemberEventContent},
|
||||
RoomEventType, StateEventType,
|
||||
},
|
||||
|
@ -731,6 +732,50 @@ async fn sync_helper(
|
|||
.map(|(_, _, v)| v)
|
||||
.collect();
|
||||
|
||||
if services()
|
||||
.rooms
|
||||
.edus
|
||||
.read_receipt
|
||||
.last_privateread_update(&sender_user, &room_id)
|
||||
.unwrap_or(0)
|
||||
> since
|
||||
{
|
||||
if let Ok(event_id) = services().rooms.short.get_eventid_from_short(
|
||||
services()
|
||||
.rooms
|
||||
.edus
|
||||
.read_receipt
|
||||
.private_read_get(&room_id, &sender_user)
|
||||
.expect("User did not have a valid private read receipt?")
|
||||
.expect("User had a last read private receipt update but no receipt?"),
|
||||
) {
|
||||
let mut user_receipts = BTreeMap::new();
|
||||
user_receipts.insert(
|
||||
sender_user.clone(),
|
||||
ruma::events::receipt::Receipt {
|
||||
ts: None,
|
||||
thread: ReceiptThread::Unthreaded,
|
||||
},
|
||||
);
|
||||
|
||||
let mut receipts = BTreeMap::new();
|
||||
receipts.insert(ReceiptType::ReadPrivate, user_receipts);
|
||||
|
||||
let mut receipt_content = BTreeMap::new();
|
||||
receipt_content.insert((*event_id).to_owned(), receipts);
|
||||
|
||||
edus.push(
|
||||
serde_json::from_str(
|
||||
&serde_json::to_string(&ruma::events::SyncEphemeralRoomEvent {
|
||||
content: ruma::events::receipt::ReceiptEventContent(receipt_content),
|
||||
})
|
||||
.expect("Did not get valid JSON?"),
|
||||
)
|
||||
.expect("JSON was somehow invalid despite just being created"),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
if services().rooms.edus.typing.last_typing_update(&room_id)? > since {
|
||||
edus.push(
|
||||
serde_json::from_str(
|
||||
|
|
|
@ -24,7 +24,10 @@ pub async fn get_supported_versions_route(
|
|||
"v1.1".to_owned(),
|
||||
"v1.2".to_owned(),
|
||||
],
|
||||
unstable_features: BTreeMap::from_iter([("org.matrix.e2e_cross_signing".to_owned(), true)]),
|
||||
unstable_features: BTreeMap::from_iter([
|
||||
("org.matrix.e2e_cross_signing".to_owned(), true),
|
||||
("org.matrix.msc2285.stable".to_owned(), true),
|
||||
]),
|
||||
};
|
||||
|
||||
Ok(resp)
|
||||
|
|
|
@ -105,16 +105,25 @@ impl service::rooms::edus::read_receipt::Data for KeyValueDatabase {
|
|||
)
|
||||
}
|
||||
|
||||
fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> {
|
||||
fn private_read_set(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
user_id: &UserId,
|
||||
shorteventid: u64,
|
||||
) -> Result<()> {
|
||||
let mut key = room_id.as_bytes().to_vec();
|
||||
key.push(0xff);
|
||||
key.extend_from_slice(user_id.as_bytes());
|
||||
|
||||
if self.private_read_get(room_id, user_id)?.unwrap_or(0) < shorteventid {
|
||||
self.roomuserid_privateread
|
||||
.insert(&key, &count.to_be_bytes())?;
|
||||
.insert(&key, &shorteventid.to_be_bytes())?;
|
||||
|
||||
self.roomuserid_lastprivatereadupdate
|
||||
.insert(&key, &services().globals.next_count()?.to_be_bytes())
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> {
|
||||
|
|
|
@ -3,7 +3,13 @@ use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId};
|
|||
use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
|
||||
|
||||
impl service::rooms::user::Data for KeyValueDatabase {
|
||||
fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> {
|
||||
fn update_notification_counts(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
room_id: &RoomId,
|
||||
notification_count: u64,
|
||||
highlight_count: u64,
|
||||
) -> Result<()> {
|
||||
let mut userroom_id = user_id.as_bytes().to_vec();
|
||||
userroom_id.push(0xff);
|
||||
userroom_id.extend_from_slice(room_id.as_bytes());
|
||||
|
@ -12,9 +18,9 @@ impl service::rooms::user::Data for KeyValueDatabase {
|
|||
roomuser_id.extend_from_slice(user_id.as_bytes());
|
||||
|
||||
self.userroomid_notificationcount
|
||||
.insert(&userroom_id, &0_u64.to_be_bytes())?;
|
||||
.insert(&userroom_id, ¬ification_count.to_be_bytes())?;
|
||||
self.userroomid_highlightcount
|
||||
.insert(&userroom_id, &0_u64.to_be_bytes())?;
|
||||
.insert(&userroom_id, &highlight_count.to_be_bytes())?;
|
||||
|
||||
self.roomuserid_lastnotificationread.insert(
|
||||
&roomuser_id,
|
||||
|
|
|
@ -25,8 +25,9 @@ pub trait Data: Send + Sync {
|
|||
> + 'a,
|
||||
>;
|
||||
|
||||
/// Sets a private read marker at `count`.
|
||||
fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()>;
|
||||
/// Sets a private read marker at `shorteventid`.
|
||||
fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, shorteventid: u64)
|
||||
-> Result<()>;
|
||||
|
||||
/// Returns the private read marker.
|
||||
fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>>;
|
||||
|
|
|
@ -2,7 +2,7 @@ mod data;
|
|||
|
||||
pub use data::Data;
|
||||
|
||||
use crate::Result;
|
||||
use crate::{services, Result};
|
||||
use ruma::{events::receipt::ReceiptEvent, serde::Raw, OwnedUserId, RoomId, UserId};
|
||||
|
||||
pub struct Service {
|
||||
|
@ -36,10 +36,19 @@ impl Service {
|
|||
self.db.readreceipts_since(room_id, since)
|
||||
}
|
||||
|
||||
/// Sets a private read marker at `count`.
|
||||
/// Sets a private read marker at `shorteventid`.
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> {
|
||||
self.db.private_read_set(room_id, user_id, count)
|
||||
pub fn private_read_set(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
user_id: &UserId,
|
||||
shorteventid: u64,
|
||||
) -> Result<()> {
|
||||
self.db.private_read_set(room_id, user_id, shorteventid)?;
|
||||
services()
|
||||
.rooms
|
||||
.user
|
||||
.update_notification_counts(user_id, room_id)
|
||||
}
|
||||
|
||||
/// Returns the private read marker.
|
||||
|
|
|
@ -213,18 +213,17 @@ impl Service {
|
|||
);
|
||||
let insert_lock = mutex_insert.lock().unwrap();
|
||||
|
||||
let count1 = services().globals.next_count()?;
|
||||
let _count1 = services().globals.next_count()?;
|
||||
// Mark as read first so the sending client doesn't get a notification even if appending
|
||||
// fails
|
||||
services().rooms.edus.read_receipt.private_read_set(
|
||||
&pdu.room_id,
|
||||
&pdu.sender,
|
||||
services()
|
||||
.rooms
|
||||
.edus
|
||||
.read_receipt
|
||||
.private_read_set(&pdu.room_id, &pdu.sender, count1)?;
|
||||
services()
|
||||
.rooms
|
||||
.user
|
||||
.reset_notification_counts(&pdu.sender, &pdu.room_id)?;
|
||||
.short
|
||||
.get_or_create_shorteventid(&pdu.event_id)?,
|
||||
)?;
|
||||
|
||||
let count2 = services().globals.next_count()?;
|
||||
let mut pdu_id = shortroomid.to_be_bytes().to_vec();
|
||||
|
|
|
@ -2,7 +2,13 @@ use crate::Result;
|
|||
use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId};
|
||||
|
||||
pub trait Data: Send + Sync {
|
||||
fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()>;
|
||||
fn update_notification_counts(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
room_id: &RoomId,
|
||||
notification_count: u64,
|
||||
highlight_count: u64,
|
||||
) -> Result<()>;
|
||||
|
||||
fn notification_count(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64>;
|
||||
|
||||
|
|
|
@ -1,17 +1,117 @@
|
|||
mod data;
|
||||
|
||||
pub use data::Data;
|
||||
use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId};
|
||||
use ruma::{
|
||||
events::{
|
||||
push_rules::PushRulesEvent, room::power_levels::RoomPowerLevelsEventContent,
|
||||
GlobalAccountDataEventType, StateEventType,
|
||||
},
|
||||
push::{Action, Ruleset, Tweak},
|
||||
OwnedRoomId, OwnedUserId, RoomId, UserId,
|
||||
};
|
||||
|
||||
use crate::Result;
|
||||
use crate::{services, Error, Result};
|
||||
|
||||
pub struct Service {
|
||||
pub db: &'static dyn Data,
|
||||
}
|
||||
|
||||
impl Service {
|
||||
pub fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> {
|
||||
self.db.reset_notification_counts(user_id, room_id)
|
||||
pub fn update_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> {
|
||||
let power_levels: RoomPowerLevelsEventContent = services()
|
||||
.rooms
|
||||
.state_accessor
|
||||
.room_state_get(room_id, &StateEventType::RoomPowerLevels, "")?
|
||||
.map(|ev| {
|
||||
serde_json::from_str(ev.content.get())
|
||||
.map_err(|_| Error::bad_database("invalid m.room.power_levels event"))
|
||||
})
|
||||
.transpose()?
|
||||
.unwrap_or_default();
|
||||
|
||||
let read_event = services()
|
||||
.rooms
|
||||
.edus
|
||||
.read_receipt
|
||||
.private_read_get(room_id, user_id)
|
||||
.unwrap_or(None)
|
||||
.unwrap_or(0u64);
|
||||
let mut notification_count = 0u64;
|
||||
let mut highlight_count = 0u64;
|
||||
|
||||
services()
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_since(user_id, room_id, read_event)?
|
||||
.filter_map(|pdu| pdu.ok())
|
||||
.map(|(_, pdu)| pdu)
|
||||
.filter(|pdu| {
|
||||
// Don't include user's own messages in notification counts
|
||||
user_id != &pdu.sender
|
||||
&& services()
|
||||
.rooms
|
||||
.short
|
||||
.get_or_create_shorteventid(&pdu.event_id)
|
||||
.unwrap_or(0)
|
||||
!= read_event
|
||||
})
|
||||
.filter_map(|pdu| {
|
||||
let rules_for_user = services()
|
||||
.account_data
|
||||
.get(
|
||||
None,
|
||||
user_id,
|
||||
GlobalAccountDataEventType::PushRules.to_string().into(),
|
||||
)
|
||||
.ok()?
|
||||
.map(|event| {
|
||||
serde_json::from_str::<PushRulesEvent>(event.get())
|
||||
.map_err(|_| Error::bad_database("Invalid push rules event in db."))
|
||||
})
|
||||
.transpose()
|
||||
.ok()?
|
||||
.map(|ev: PushRulesEvent| ev.content.global)
|
||||
.unwrap_or_else(|| Ruleset::server_default(user_id));
|
||||
|
||||
let mut highlight = false;
|
||||
let mut notify = false;
|
||||
|
||||
for action in services()
|
||||
.pusher
|
||||
.get_actions(
|
||||
user_id,
|
||||
&rules_for_user,
|
||||
&power_levels,
|
||||
&pdu.to_sync_room_event(),
|
||||
&pdu.room_id,
|
||||
)
|
||||
.ok()?
|
||||
{
|
||||
match action {
|
||||
Action::DontNotify => notify = false,
|
||||
// TODO: Implement proper support for coalesce
|
||||
Action::Notify | Action::Coalesce => notify = true,
|
||||
Action::SetTweak(Tweak::Highlight(true)) => {
|
||||
highlight = true;
|
||||
}
|
||||
_ => {}
|
||||
};
|
||||
}
|
||||
|
||||
if notify {
|
||||
notification_count += 1;
|
||||
};
|
||||
|
||||
if highlight {
|
||||
highlight_count += 1;
|
||||
};
|
||||
|
||||
Some(())
|
||||
})
|
||||
.for_each(|_| {});
|
||||
|
||||
self.db
|
||||
.update_notification_counts(user_id, room_id, notification_count, highlight_count)
|
||||
}
|
||||
|
||||
pub fn notification_count(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
|
||||
|
|
Loading…
Reference in a new issue