feat: WIP relationships and threads

This commit is contained in:
Timo Kösters 2023-06-25 19:31:40 +02:00
parent def079267d
commit c7e0ea525a
No known key found for this signature in database
GPG key ID: 0B25E636FBA7E4CB
28 changed files with 766 additions and 340 deletions

597
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -19,14 +19,14 @@ rust-version = "1.64.0"
[dependencies] [dependencies]
# Web framework # Web framework
axum = { version = "0.5.17", default-features = false, features = ["form", "headers", "http1", "http2", "json", "matched-path"], optional = true } axum = { version = "0.5.16", default-features = false, features = ["form", "headers", "http1", "http2", "json", "matched-path"], optional = true }
axum-server = { version = "0.4.7", features = ["tls-rustls"] } axum-server = { version = "0.5.1", features = ["tls-rustls"] }
tower = { version = "0.4.13", features = ["util"] } tower = { version = "0.4.13", features = ["util"] }
tower-http = { version = "0.3.5", features = ["add-extension", "cors", "compression-full", "sensitive-headers", "trace", "util"] } tower-http = { version = "0.4.1", features = ["add-extension", "cors", "compression-zstd", "sensitive-headers", "trace", "util"] }
# Used for matrix spec type definitions and helpers # Used for matrix spec type definitions and helpers
#ruma = { version = "0.4.0", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } #ruma = { version = "0.4.0", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] }
ruma = { git = "https://github.com/ruma/ruma", rev = "8eea3e05490fa9a318f9ed66c3a75272e6ef0ee5", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-msc2448", "unstable-exhaustive-types", "ring-compat", "unstable-unspecified" ] } ruma = { git = "https://github.com/ruma/ruma", rev = "761771a317460f30590da170115d007892381e85", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-msc2448", "unstable-exhaustive-types", "ring-compat", "unstable-unspecified" ] }
#ruma = { git = "https://github.com/timokoesters/ruma", rev = "50c1db7e0a3a21fc794b0cce3b64285a4c750c71", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } #ruma = { git = "https://github.com/timokoesters/ruma", rev = "50c1db7e0a3a21fc794b0cce3b64285a4c750c71", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] }
#ruma = { path = "../ruma/crates/ruma", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } #ruma = { path = "../ruma/crates/ruma", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] }

View file

@ -1,23 +0,0 @@
[build.env]
# CI uses an S3 endpoint to store sccache artifacts, so their config needs to
# be available in the cross container as well
passthrough = [
"RUSTC_WRAPPER",
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",
"SCCACHE_BUCKET",
"SCCACHE_ENDPOINT",
"SCCACHE_S3_USE_SSL",
]
[target.aarch64-unknown-linux-musl]
image = "registry.gitlab.com/jfowl/conduit-containers/rust-cross-aarch64-unknown-linux-musl:latest"
[target.arm-unknown-linux-musleabihf]
image = "registry.gitlab.com/jfowl/conduit-containers/rust-cross-arm-unknown-linux-musleabihf:latest"
[target.armv7-unknown-linux-musleabihf]
image = "registry.gitlab.com/jfowl/conduit-containers/rust-cross-armv7-unknown-linux-musleabihf:latest"
[target.x86_64-unknown-linux-musl]
image = "registry.gitlab.com/jfowl/conduit-containers/rust-cross-x86_64-unknown-linux-musl@sha256:b6d689e42f0236c8a38b961bca2a12086018b85ed20e0826310421daf182e2bb"

View file

@ -69,18 +69,18 @@ pub async fn get_context_route(
lazy_loaded.insert(base_event.sender.as_str().to_owned()); lazy_loaded.insert(base_event.sender.as_str().to_owned());
} }
// Use limit with maximum 100
let limit = usize::try_from(body.limit)
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Limit value is invalid."))?
.min(100);
let base_event = base_event.to_room_event(); let base_event = base_event.to_room_event();
let events_before: Vec<_> = services() let events_before: Vec<_> = services()
.rooms .rooms
.timeline .timeline
.pdus_until(sender_user, &room_id, base_token)? .pdus_until(sender_user, &room_id, base_token)?
.take( .take(limit / 2)
u32::try_from(body.limit).map_err(|_| {
Error::BadRequest(ErrorKind::InvalidParam, "Limit value is invalid.")
})? as usize
/ 2,
)
.filter_map(|r| r.ok()) // Remove buggy events .filter_map(|r| r.ok()) // Remove buggy events
.filter(|(_, pdu)| { .filter(|(_, pdu)| {
services() services()
@ -114,12 +114,7 @@ pub async fn get_context_route(
.rooms .rooms
.timeline .timeline
.pdus_after(sender_user, &room_id, base_token)? .pdus_after(sender_user, &room_id, base_token)?
.take( .take(limit / 2)
u32::try_from(body.limit).map_err(|_| {
Error::BadRequest(ErrorKind::InvalidParam, "Limit value is invalid.")
})? as usize
/ 2,
)
.filter_map(|r| r.ok()) // Remove buggy events .filter_map(|r| r.ok()) // Remove buggy events
.filter(|(_, pdu)| { .filter(|(_, pdu)| {
services() services()

View file

@ -1,3 +1,5 @@
use std::time::Duration;
use crate::{service::media::FileMeta, services, utils, Error, Result, Ruma}; use crate::{service::media::FileMeta, services, utils, Error, Result, Ruma};
use ruma::api::client::{ use ruma::api::client::{
error::ErrorKind, error::ErrorKind,
@ -67,6 +69,8 @@ pub async fn get_remote_content(
allow_remote: false, allow_remote: false,
server_name: server_name.to_owned(), server_name: server_name.to_owned(),
media_id, media_id,
timeout_ms: Duration::from_secs(20),
allow_redirect: false,
}, },
) )
.await?; .await?;
@ -194,6 +198,8 @@ pub async fn get_content_thumbnail_route(
method: body.method.clone(), method: body.method.clone(),
server_name: body.server_name.clone(), server_name: body.server_name.clone(),
media_id: body.media_id.clone(), media_id: body.media_id.clone(),
timeout_ms: Duration::from_secs(20),
allow_redirect: false,
}, },
) )
.await?; .await?;

View file

@ -133,8 +133,12 @@ pub async fn get_message_events_route(
from, from,
)?; )?;
// Use limit or else 10 // Use limit or else 10, with maximum 100
let limit = body.limit.try_into().map_or(10_usize, |l: u32| l as usize); let limit = body
.limit
.try_into()
.map_or(10_usize, |l: u32| l as usize)
.min(100);
let next_token; let next_token;

View file

@ -24,6 +24,7 @@ mod state;
mod sync; mod sync;
mod tag; mod tag;
mod thirdparty; mod thirdparty;
mod threads;
mod to_device; mod to_device;
mod typing; mod typing;
mod unversioned; mod unversioned;
@ -56,6 +57,7 @@ pub use state::*;
pub use sync::*; pub use sync::*;
pub use tag::*; pub use tag::*;
pub use thirdparty::*; pub use thirdparty::*;
pub use threads::*;
pub use to_device::*; pub use to_device::*;
pub use typing::*; pub use typing::*;
pub use unversioned::*; pub use unversioned::*;

View file

@ -0,0 +1,10 @@
use crate::{services, Result, Ruma};
use std::time::{Duration, SystemTime};
/// # `GET /_matrix/client/r0/todo`
pub async fn get_relating_events_route(
body: Ruma<get_turn_server_info::v3::Request>,
) -> Result<get_turn_server_info::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
todo!();
}

View file

@ -0,0 +1,49 @@
use ruma::api::client::{error::ErrorKind, threads::get_threads};
use crate::{services, Error, Result, Ruma};
/// # `GET /_matrix/client/r0/rooms/{roomId}/threads`
pub async fn get_threads_route(
body: Ruma<get_threads::v1::Request>,
) -> Result<get_threads::v1::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
// Use limit or else 10, with maximum 100
let limit = body
.limit
.and_then(|l| l.try_into().ok())
.unwrap_or(10)
.min(100);
let from = if let Some(from) = &body.from {
from.parse()
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, ""))?
} else {
u64::MAX
};
let threads = services()
.rooms
.threads
.threads_until(sender_user, &body.room_id, from, &body.include)?
.take(limit)
.filter_map(|r| r.ok())
.filter(|(_, pdu)| {
services()
.rooms
.state_accessor
.user_can_see_event(sender_user, &body.room_id, &pdu.event_id)
.unwrap_or(false)
})
.collect::<Vec<_>>();
let next_batch = threads.last().map(|(count, _)| count.to_string());
Ok(get_threads::v1::Response {
chunk: threads
.into_iter()
.map(|(_, pdu)| pdu.to_room_event())
.collect(),
next_batch,
})
}

View file

@ -23,6 +23,8 @@ pub async fn get_supported_versions_route(
"r0.6.0".to_owned(), "r0.6.0".to_owned(),
"v1.1".to_owned(), "v1.1".to_owned(),
"v1.2".to_owned(), "v1.2".to_owned(),
"v1.3".to_owned(),
"v1.4".to_owned(),
], ],
unstable_features: BTreeMap::from_iter([("org.matrix.e2e_cross_signing".to_owned(), true)]), unstable_features: BTreeMap::from_iter([("org.matrix.e2e_cross_signing".to_owned(), true)]),
}; };

View file

@ -1,3 +1,4 @@
#[allow(deprecated)]
use crate::{ use crate::{
api::client_server::{self, claim_keys_helper, get_keys_helper}, api::client_server::{self, claim_keys_helper, get_keys_helper},
service::pdu::{gen_event_id_canonical_json, PduBuilder}, service::pdu::{gen_event_id_canonical_json, PduBuilder},
@ -497,6 +498,9 @@ async fn request_well_known(destination: &str) -> Option<String> {
.send() .send()
.await; .await;
debug!("Got well known response"); debug!("Got well known response");
if let Err(e) = &response {
error!("Well known error: {e:?}");
}
let text = response.ok()?.text().await; let text = response.ok()?.text().await;
debug!("Got well known response text"); debug!("Got well known response text");
let body: serde_json::Value = serde_json::from_str(&text.ok()?).ok()?; let body: serde_json::Value = serde_json::from_str(&text.ok()?).ok()?;

View file

@ -12,6 +12,7 @@ mod state;
mod state_accessor; mod state_accessor;
mod state_cache; mod state_cache;
mod state_compressor; mod state_compressor;
mod threads;
mod timeline; mod timeline;
mod user; mod user;

View file

@ -5,6 +5,13 @@ use ruma::{EventId, RoomId};
use crate::{database::KeyValueDatabase, service, Result}; use crate::{database::KeyValueDatabase, service, Result};
impl service::rooms::pdu_metadata::Data for KeyValueDatabase { impl service::rooms::pdu_metadata::Data for KeyValueDatabase {
fn add_relation(&self, from: u64, to: u64) -> Result<()> {
let mut key = from.to_be_bytes().to_vec();
key.extend_from_slice(&to.to_be_bytes());
self.fromto_relation.insert(&key, &[])?;
Ok(())
}
fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) -> Result<()> { fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) -> Result<()> {
for prev in event_ids { for prev in event_ids {
let mut key = room_id.as_bytes().to_vec(); let mut key = room_id.as_bytes().to_vec();

View file

@ -0,0 +1,78 @@
use std::mem;
use ruma::{api::client::threads::get_threads::v1::IncludeThreads, OwnedUserId, RoomId, UserId};
use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result};
impl service::rooms::threads::Data for KeyValueDatabase {
fn threads_until<'a>(
&'a self,
user_id: &'a UserId,
room_id: &'a RoomId,
until: u64,
include: &'a IncludeThreads,
) -> Result<Box<dyn Iterator<Item = Result<(u64, PduEvent)>> + 'a>> {
let prefix = services()
.rooms
.short
.get_shortroomid(room_id)?
.expect("room exists")
.to_be_bytes()
.to_vec();
let mut current = prefix.clone();
current.extend_from_slice(&(until - 1).to_be_bytes());
Ok(Box::new(
self.threadid_userids
.iter_from(&current, true)
.take_while(move |(k, _)| k.starts_with(&prefix))
.map(move |(pduid, users)| {
let count = utils::u64_from_bytes(&pduid[(mem::size_of::<u64>())..])
.map_err(|_| Error::bad_database("Invalid pduid in threadid_userids."))?;
let mut pdu = services()
.rooms
.timeline
.get_pdu_from_id(&pduid)?
.ok_or_else(|| {
Error::bad_database("Invalid pduid reference in threadid_userids")
})?;
if pdu.sender != user_id {
pdu.remove_transaction_id()?;
}
Ok((count, pdu))
}),
))
}
fn update_participants(&self, root_id: &[u8], participants: &[OwnedUserId]) -> Result<()> {
let users = participants
.iter()
.map(|user| user.as_bytes())
.collect::<Vec<_>>()
.join(&[0xff][..]);
self.threadid_userids.insert(&root_id, &users)?;
Ok(())
}
fn get_participants(&self, root_id: &[u8]) -> Result<Option<Vec<OwnedUserId>>> {
if let Some(users) = self.threadid_userids.get(&root_id)? {
Ok(Some(
users
.split(|b| *b == 0xff)
.map(|bytes| {
UserId::parse(utils::string_from_bytes(bytes).map_err(|_| {
Error::bad_database("Invalid UserId bytes in threadid_userids.")
})?)
.map_err(|_| Error::bad_database("Invalid UserId in threadid_userids."))
})
.filter_map(|r| r.ok())
.collect(),
))
} else {
Ok(None)
}
}
}

View file

@ -198,19 +198,30 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
} }
/// Removes a pdu and creates a new one with the same id. /// Removes a pdu and creates a new one with the same id.
fn replace_pdu(&self, pdu_id: &[u8], pdu: &PduEvent) -> Result<()> { fn replace_pdu(
&self,
pdu_id: &[u8],
pdu_json: &CanonicalJsonObject,
pdu: &PduEvent,
) -> Result<()> {
if self.pduid_pdu.get(pdu_id)?.is_some() { if self.pduid_pdu.get(pdu_id)?.is_some() {
self.pduid_pdu.insert( self.pduid_pdu.insert(
pdu_id, pdu_id,
&serde_json::to_vec(pdu).expect("CanonicalJsonObject is always a valid"), &serde_json::to_vec(pdu_json).expect("CanonicalJsonObject is always a valid"),
)?; )?;
Ok(())
} else { } else {
Err(Error::BadRequest( return Err(Error::BadRequest(
ErrorKind::NotFound, ErrorKind::NotFound,
"PDU does not exist.", "PDU does not exist.",
)) ));
} }
self.pdu_cache
.lock()
.unwrap()
.remove(&(*pdu.event_id).to_owned());
Ok(())
} }
/// Returns an iterator over all events and their tokens in a room that happened before the /// Returns an iterator over all events and their tokens in a room that happened before the

View file

@ -80,6 +80,8 @@ pub struct KeyValueDatabase {
pub(super) aliasid_alias: Arc<dyn KvTree>, // AliasId = RoomId + Count pub(super) aliasid_alias: Arc<dyn KvTree>, // AliasId = RoomId + Count
pub(super) publicroomids: Arc<dyn KvTree>, pub(super) publicroomids: Arc<dyn KvTree>,
pub(super) threadid_userids: Arc<dyn KvTree>, // ThreadId = RoomId + Count
pub(super) tokenids: Arc<dyn KvTree>, // TokenId = ShortRoomId + Token + PduIdCount pub(super) tokenids: Arc<dyn KvTree>, // TokenId = ShortRoomId + Token + PduIdCount
/// Participating servers in a room. /// Participating servers in a room.
@ -128,6 +130,8 @@ pub struct KeyValueDatabase {
pub(super) eventid_outlierpdu: Arc<dyn KvTree>, pub(super) eventid_outlierpdu: Arc<dyn KvTree>,
pub(super) softfailedeventids: Arc<dyn KvTree>, pub(super) softfailedeventids: Arc<dyn KvTree>,
/// ShortEventId + ShortEventId -> ().
pub(super) fromto_relation: Arc<dyn KvTree>,
/// RoomId + EventId -> Parent PDU EventId. /// RoomId + EventId -> Parent PDU EventId.
pub(super) referencedevents: Arc<dyn KvTree>, pub(super) referencedevents: Arc<dyn KvTree>,
@ -302,6 +306,8 @@ impl KeyValueDatabase {
aliasid_alias: builder.open_tree("aliasid_alias")?, aliasid_alias: builder.open_tree("aliasid_alias")?,
publicroomids: builder.open_tree("publicroomids")?, publicroomids: builder.open_tree("publicroomids")?,
threadid_userids: builder.open_tree("threadid_userids")?,
tokenids: builder.open_tree("tokenids")?, tokenids: builder.open_tree("tokenids")?,
roomserverids: builder.open_tree("roomserverids")?, roomserverids: builder.open_tree("roomserverids")?,
@ -342,6 +348,7 @@ impl KeyValueDatabase {
eventid_outlierpdu: builder.open_tree("eventid_outlierpdu")?, eventid_outlierpdu: builder.open_tree("eventid_outlierpdu")?,
softfailedeventids: builder.open_tree("softfailedeventids")?, softfailedeventids: builder.open_tree("softfailedeventids")?,
fromto_relation: builder.open_tree("fromto_relation")?,
referencedevents: builder.open_tree("referencedevents")?, referencedevents: builder.open_tree("referencedevents")?,
roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?, roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?,
roomusertype_roomuserdataid: builder.open_tree("roomusertype_roomuserdataid")?, roomusertype_roomuserdataid: builder.open_tree("roomusertype_roomuserdataid")?,

View file

@ -383,6 +383,7 @@ fn routes() -> Router {
.ruma_route(client_server::set_pushers_route) .ruma_route(client_server::set_pushers_route)
// .ruma_route(client_server::third_party_route) // .ruma_route(client_server::third_party_route)
.ruma_route(client_server::upgrade_room_route) .ruma_route(client_server::upgrade_room_route)
.ruma_route(client_server::get_threads_route)
.ruma_route(server_server::get_server_version_route) .ruma_route(server_server::get_server_version_route)
.route( .route(
"/_matrix/key/v2/server", "/_matrix/key/v2/server",

View file

@ -8,7 +8,7 @@ use image::imageops::FilterType;
use tokio::{ use tokio::{
fs::File, fs::File,
io::{AsyncReadExt, AsyncWriteExt}, io::{AsyncReadExt, AsyncWriteExt, BufReader},
}; };
pub struct FileMeta { pub struct FileMeta {
@ -70,7 +70,9 @@ impl Service {
{ {
let path = services().globals.get_media_file(&key); let path = services().globals.get_media_file(&key);
let mut file = Vec::new(); let mut file = Vec::new();
File::open(path).await?.read_to_end(&mut file).await?; BufReader::new(File::open(path).await?)
.read_to_end(&mut file)
.await?;
Ok(Some(FileMeta { Ok(Some(FileMeta {
content_disposition, content_disposition,

View file

@ -97,6 +97,7 @@ impl Services {
db, db,
lasttimelinecount_cache: Mutex::new(HashMap::new()), lasttimelinecount_cache: Mutex::new(HashMap::new()),
}, },
threads: rooms::threads::Service { db },
user: rooms::user::Service { db }, user: rooms::user::Service { db },
}, },
transaction_ids: transaction_ids::Service { db }, transaction_ids: transaction_ids::Service { db },

View file

@ -1,9 +1,9 @@
use crate::Error; use crate::Error;
use ruma::{ use ruma::{
events::{ events::{
room::member::RoomMemberEventContent, AnyEphemeralRoomEvent, AnyStateEvent, room::member::RoomMemberEventContent, AnyEphemeralRoomEvent, AnyMessageLikeEvent,
AnyStrippedStateEvent, AnySyncStateEvent, AnySyncTimelineEvent, AnyTimelineEvent, AnyStateEvent, AnyStrippedStateEvent, AnySyncStateEvent, AnySyncTimelineEvent,
StateEvent, TimelineEventType, AnyTimelineEvent, StateEvent, TimelineEventType,
}, },
serde::Raw, serde::Raw,
state_res, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, state_res, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch,
@ -175,6 +175,30 @@ impl PduEvent {
serde_json::from_value(json).expect("Raw::from_value always works") serde_json::from_value(json).expect("Raw::from_value always works")
} }
#[tracing::instrument(skip(self))]
pub fn to_message_like_event(&self) -> Raw<AnyMessageLikeEvent> {
let mut json = json!({
"content": self.content,
"type": self.kind,
"event_id": self.event_id,
"sender": self.sender,
"origin_server_ts": self.origin_server_ts,
"room_id": self.room_id,
});
if let Some(unsigned) = &self.unsigned {
json["unsigned"] = json!(unsigned);
}
if let Some(state_key) = &self.state_key {
json["state_key"] = json!(state_key);
}
if let Some(redacts) = &self.redacts {
json["redacts"] = json!(redacts);
}
serde_json::from_value(json).expect("Raw::from_value always works")
}
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn to_state_event(&self) -> Raw<AnyStateEvent> { pub fn to_state_event(&self) -> Raw<AnyStateEvent> {
let mut json = json!({ let mut json = json!({

View file

@ -162,9 +162,7 @@ impl Service {
&pdu.room_id, &pdu.room_id,
)? { )? {
let n = match action { let n = match action {
Action::DontNotify => false, Action::Notify => true,
// TODO: Implement proper support for coalesce
Action::Notify | Action::Coalesce => true,
Action::SetTweak(tweak) => { Action::SetTweak(tweak) => {
tweaks.push(tweak.clone()); tweaks.push(tweak.clone());
continue; continue;

View file

@ -13,6 +13,7 @@ pub mod state;
pub mod state_accessor; pub mod state_accessor;
pub mod state_cache; pub mod state_cache;
pub mod state_compressor; pub mod state_compressor;
pub mod threads;
pub mod timeline; pub mod timeline;
pub mod user; pub mod user;
@ -32,6 +33,7 @@ pub trait Data:
+ state_cache::Data + state_cache::Data
+ state_compressor::Data + state_compressor::Data
+ timeline::Data + timeline::Data
+ threads::Data
+ user::Data + user::Data
{ {
} }
@ -53,5 +55,6 @@ pub struct Service {
pub state_cache: state_cache::Service, pub state_cache: state_cache::Service,
pub state_compressor: state_compressor::Service, pub state_compressor: state_compressor::Service,
pub timeline: timeline::Service, pub timeline: timeline::Service,
pub threads: threads::Service,
pub user: user::Service, pub user: user::Service,
} }

View file

@ -4,6 +4,7 @@ use crate::Result;
use ruma::{EventId, RoomId}; use ruma::{EventId, RoomId};
pub trait Data: Send + Sync { pub trait Data: Send + Sync {
fn add_relation(&self, from: u64, to: u64) -> Result<()>;
fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) -> Result<()>; fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) -> Result<()>;
fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> Result<bool>; fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> Result<bool>;
fn mark_event_soft_failed(&self, event_id: &EventId) -> Result<()>; fn mark_event_soft_failed(&self, event_id: &EventId) -> Result<()>;

View file

@ -4,13 +4,20 @@ use std::sync::Arc;
pub use data::Data; pub use data::Data;
use ruma::{EventId, RoomId}; use ruma::{EventId, RoomId};
use crate::Result; use crate::{services, Result};
pub struct Service { pub struct Service {
pub db: &'static dyn Data, pub db: &'static dyn Data,
} }
impl Service { impl Service {
#[tracing::instrument(skip(self, from, to))]
pub fn add_relation(&self, from: &EventId, to: &EventId) -> Result<()> {
let from = services().rooms.short.get_or_create_shorteventid(from)?;
let to = services().rooms.short.get_or_create_shorteventid(to)?;
self.db.add_relation(from, to)
}
#[tracing::instrument(skip(self, room_id, event_ids))] #[tracing::instrument(skip(self, room_id, event_ids))]
pub fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) -> Result<()> { pub fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) -> Result<()> {
self.db.mark_as_referenced(room_id, event_ids) self.db.mark_as_referenced(room_id, event_ids)

View file

@ -0,0 +1,15 @@
use crate::{PduEvent, Result};
use ruma::{api::client::threads::get_threads::v1::IncludeThreads, OwnedUserId, RoomId, UserId};
pub trait Data: Send + Sync {
fn threads_until<'a>(
&'a self,
user_id: &'a UserId,
room_id: &'a RoomId,
until: u64,
include: &'a IncludeThreads,
) -> Result<Box<dyn Iterator<Item = Result<(u64, PduEvent)>> + 'a>>;
fn update_participants(&self, root_id: &[u8], participants: &[OwnedUserId]) -> Result<()>;
fn get_participants(&self, root_id: &[u8]) -> Result<Option<Vec<OwnedUserId>>>;
}

View file

@ -0,0 +1,119 @@
mod data;
use std::sync::Arc;
pub use data::Data;
use ruma::{
api::client::{error::ErrorKind, threads::get_threads::v1::IncludeThreads},
events::{relation::BundledThread, StateEventType},
uint, CanonicalJsonValue, EventId, OwnedUserId, RoomId, UserId,
};
use serde::Deserialize;
use serde_json::json;
use crate::{services, utils, Error, PduEvent, Result};
use super::timeline::PduCount;
pub struct Service {
pub db: &'static dyn Data,
}
impl Service {
pub fn threads_until<'a>(
&'a self,
user_id: &'a UserId,
room_id: &'a RoomId,
until: u64,
include: &'a IncludeThreads,
) -> Result<impl Iterator<Item = Result<(u64, PduEvent)>> + 'a> {
self.db.threads_until(user_id, room_id, until, include)
}
pub fn add_to_thread<'a>(&'a self, root_event_id: &EventId, pdu: &PduEvent) -> Result<()> {
let root_id = &services()
.rooms
.timeline
.get_pdu_id(root_event_id)?
.ok_or_else(|| {
Error::BadRequest(
ErrorKind::InvalidParam,
"Invalid event id in thread message",
)
})?;
let root_pdu = services()
.rooms
.timeline
.get_pdu_from_id(root_id)?
.ok_or_else(|| {
Error::BadRequest(ErrorKind::InvalidParam, "Thread root pdu not found")
})?;
let mut root_pdu_json = services()
.rooms
.timeline
.get_pdu_json_from_id(root_id)?
.ok_or_else(|| {
Error::BadRequest(ErrorKind::InvalidParam, "Thread root pdu not found")
})?;
if let CanonicalJsonValue::Object(unsigned) = root_pdu_json
.entry("unsigned".to_owned())
.or_insert_with(|| CanonicalJsonValue::Object(Default::default()))
{
if let Some(mut relations) = unsigned
.get("m.relations")
.and_then(|r| r.as_object())
.and_then(|r| r.get("m.thread"))
.and_then(|relations| {
serde_json::from_value::<BundledThread>(relations.clone().into()).ok()
})
{
// Thread already existed
relations.count += uint!(1);
relations.latest_event = pdu.to_message_like_event();
let content = serde_json::to_value(relations).expect("to_value always works");
unsigned.insert(
"m.relations".to_owned(),
json!({ "m.thread": content })
.try_into()
.expect("thread is valid json"),
);
} else {
// New thread
let relations = BundledThread {
latest_event: pdu.to_message_like_event(),
count: uint!(1),
current_user_participated: true,
};
let content = serde_json::to_value(relations).expect("to_value always works");
unsigned.insert(
"m.relations".to_owned(),
json!({ "m.thread": content })
.try_into()
.expect("thread is valid json"),
);
}
services()
.rooms
.timeline
.replace_pdu(root_id, &root_pdu_json, &root_pdu)?;
}
let mut users = Vec::new();
if let Some(userids) = self.db.get_participants(&root_id)? {
users.extend_from_slice(&userids);
users.push(pdu.sender.clone());
} else {
users.push(root_pdu.sender);
users.push(pdu.sender.clone());
}
self.db.update_participants(root_id, &users)
}
}

View file

@ -57,7 +57,12 @@ pub trait Data: Send + Sync {
) -> Result<()>; ) -> Result<()>;
/// Removes a pdu and creates a new one with the same id. /// Removes a pdu and creates a new one with the same id.
fn replace_pdu(&self, pdu_id: &[u8], pdu: &PduEvent) -> Result<()>; fn replace_pdu(
&self,
pdu_id: &[u8],
pdu_json: &CanonicalJsonObject,
pdu: &PduEvent,
) -> Result<()>;
/// Returns an iterator over all events and their tokens in a room that happened before the /// Returns an iterator over all events and their tokens in a room that happened before the
/// event with id `until` in reverse-chronological order. /// event with id `until` in reverse-chronological order.

View file

@ -18,13 +18,13 @@ use ruma::{
events::{ events::{
push_rules::PushRulesEvent, push_rules::PushRulesEvent,
room::{ room::{
create::RoomCreateEventContent, member::MembershipState, create::RoomCreateEventContent, encrypted::Relation, member::MembershipState,
power_levels::RoomPowerLevelsEventContent, power_levels::RoomPowerLevelsEventContent,
}, },
GlobalAccountDataEventType, StateEventType, TimelineEventType, GlobalAccountDataEventType, StateEventType, TimelineEventType,
}, },
push::{Action, Ruleset, Tweak}, push::{Action, Ruleset, Tweak},
serde::Base64, serde::{Base64, JsonObject},
state_res, state_res,
state_res::{Event, RoomVersion}, state_res::{Event, RoomVersion},
uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId,
@ -197,8 +197,13 @@ impl Service {
/// Removes a pdu and creates a new one with the same id. /// Removes a pdu and creates a new one with the same id.
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
fn replace_pdu(&self, pdu_id: &[u8], pdu: &PduEvent) -> Result<()> { pub fn replace_pdu(
self.db.replace_pdu(pdu_id, pdu) &self,
pdu_id: &[u8],
pdu_json: &CanonicalJsonObject,
pdu: &PduEvent,
) -> Result<()> {
self.db.replace_pdu(pdu_id, pdu_json, pdu)
} }
/// Creates a new persisted data unit and adds it to a room. /// Creates a new persisted data unit and adds it to a room.
@ -352,9 +357,7 @@ impl Service {
&pdu.room_id, &pdu.room_id,
)? { )? {
match action { match action {
Action::DontNotify => notify = false, Action::Notify => notify = true,
// TODO: Implement proper support for coalesce
Action::Notify | Action::Coalesce => notify = true,
Action::SetTweak(Tweak::Highlight(true)) => { Action::SetTweak(Tweak::Highlight(true)) => {
highlight = true; highlight = true;
} }
@ -457,6 +460,50 @@ impl Service {
_ => {} _ => {}
} }
// Update Relationships
#[derive(Deserialize)]
struct ExtractRelatesTo {
#[serde(rename = "m.relates_to")]
relates_to: Relation,
}
#[derive(Clone, Debug, Deserialize)]
struct ExtractEventId {
event_id: OwnedEventId,
}
#[derive(Clone, Debug, Deserialize)]
struct ExtractRelatesToEventId {
#[serde(rename = "m.relates_to")]
relates_to: ExtractEventId,
}
if let Ok(content) = serde_json::from_str::<ExtractRelatesToEventId>(pdu.content.get()) {
services()
.rooms
.pdu_metadata
.add_relation(&pdu.event_id, &content.relates_to.event_id)?;
}
if let Ok(content) = serde_json::from_str::<ExtractRelatesTo>(pdu.content.get()) {
match content.relates_to {
Relation::Reply { in_reply_to } => {
// We need to do it again here, because replies don't have
// event_id as a top level field
services()
.rooms
.pdu_metadata
.add_relation(&pdu.event_id, &in_reply_to.event_id)?;
}
Relation::Thread(thread) => {
services()
.rooms
.threads
.add_to_thread(&thread.event_id, pdu)?;
}
_ => {} // TODO: Aggregate other types
}
}
for appservice in services().appservice.all()? { for appservice in services().appservice.all()? {
if services() if services()
.rooms .rooms
@ -957,12 +1004,17 @@ impl Service {
/// Replace a PDU with the redacted form. /// Replace a PDU with the redacted form.
#[tracing::instrument(skip(self, reason))] #[tracing::instrument(skip(self, reason))]
pub fn redact_pdu(&self, event_id: &EventId, reason: &PduEvent) -> Result<()> { pub fn redact_pdu(&self, event_id: &EventId, reason: &PduEvent) -> Result<()> {
// TODO: Don't reserialize, keep original json
if let Some(pdu_id) = self.get_pdu_id(event_id)? { if let Some(pdu_id) = self.get_pdu_id(event_id)? {
let mut pdu = self let mut pdu = self
.get_pdu_from_id(&pdu_id)? .get_pdu_from_id(&pdu_id)?
.ok_or_else(|| Error::bad_database("PDU ID points to invalid PDU."))?; .ok_or_else(|| Error::bad_database("PDU ID points to invalid PDU."))?;
pdu.redact(reason)?; pdu.redact(reason)?;
self.replace_pdu(&pdu_id, &pdu)?; self.replace_pdu(
&pdu_id,
&utils::to_canonical_object(&pdu).expect("PDU is an object"),
&pdu,
)?;
} }
// If event does not exist, just noop // If event does not exist, just noop
Ok(()) Ok(())