mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-01-15 22:16:27 +03:00
Merge branch 'threads' into 'next'
feat: WIP relationships and threads See merge request famedly/conduit!483
This commit is contained in:
commit
5f9ca8e458
28 changed files with 766 additions and 340 deletions
597
Cargo.lock
generated
597
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -19,14 +19,14 @@ rust-version = "1.64.0"
|
|||
|
||||
[dependencies]
|
||||
# Web framework
|
||||
axum = { version = "0.5.17", default-features = false, features = ["form", "headers", "http1", "http2", "json", "matched-path"], optional = true }
|
||||
axum-server = { version = "0.4.7", features = ["tls-rustls"] }
|
||||
axum = { version = "0.5.16", default-features = false, features = ["form", "headers", "http1", "http2", "json", "matched-path"], optional = true }
|
||||
axum-server = { version = "0.5.1", features = ["tls-rustls"] }
|
||||
tower = { version = "0.4.13", features = ["util"] }
|
||||
tower-http = { version = "0.3.5", features = ["add-extension", "cors", "compression-full", "sensitive-headers", "trace", "util"] }
|
||||
tower-http = { version = "0.4.1", features = ["add-extension", "cors", "compression-zstd", "sensitive-headers", "trace", "util"] }
|
||||
|
||||
# Used for matrix spec type definitions and helpers
|
||||
#ruma = { version = "0.4.0", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] }
|
||||
ruma = { git = "https://github.com/ruma/ruma", rev = "8eea3e05490fa9a318f9ed66c3a75272e6ef0ee5", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-msc2448", "unstable-exhaustive-types", "ring-compat", "unstable-unspecified" ] }
|
||||
ruma = { git = "https://github.com/ruma/ruma", rev = "761771a317460f30590da170115d007892381e85", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-msc2448", "unstable-exhaustive-types", "ring-compat", "unstable-unspecified" ] }
|
||||
#ruma = { git = "https://github.com/timokoesters/ruma", rev = "50c1db7e0a3a21fc794b0cce3b64285a4c750c71", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] }
|
||||
#ruma = { path = "../ruma/crates/ruma", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] }
|
||||
|
||||
|
|
23
Cross.toml
23
Cross.toml
|
@ -1,23 +0,0 @@
|
|||
[build.env]
|
||||
# CI uses an S3 endpoint to store sccache artifacts, so their config needs to
|
||||
# be available in the cross container as well
|
||||
passthrough = [
|
||||
"RUSTC_WRAPPER",
|
||||
"AWS_ACCESS_KEY_ID",
|
||||
"AWS_SECRET_ACCESS_KEY",
|
||||
"SCCACHE_BUCKET",
|
||||
"SCCACHE_ENDPOINT",
|
||||
"SCCACHE_S3_USE_SSL",
|
||||
]
|
||||
|
||||
[target.aarch64-unknown-linux-musl]
|
||||
image = "registry.gitlab.com/jfowl/conduit-containers/rust-cross-aarch64-unknown-linux-musl:latest"
|
||||
|
||||
[target.arm-unknown-linux-musleabihf]
|
||||
image = "registry.gitlab.com/jfowl/conduit-containers/rust-cross-arm-unknown-linux-musleabihf:latest"
|
||||
|
||||
[target.armv7-unknown-linux-musleabihf]
|
||||
image = "registry.gitlab.com/jfowl/conduit-containers/rust-cross-armv7-unknown-linux-musleabihf:latest"
|
||||
|
||||
[target.x86_64-unknown-linux-musl]
|
||||
image = "registry.gitlab.com/jfowl/conduit-containers/rust-cross-x86_64-unknown-linux-musl@sha256:b6d689e42f0236c8a38b961bca2a12086018b85ed20e0826310421daf182e2bb"
|
|
@ -69,18 +69,18 @@ pub async fn get_context_route(
|
|||
lazy_loaded.insert(base_event.sender.as_str().to_owned());
|
||||
}
|
||||
|
||||
// Use limit with maximum 100
|
||||
let limit = usize::try_from(body.limit)
|
||||
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Limit value is invalid."))?
|
||||
.min(100);
|
||||
|
||||
let base_event = base_event.to_room_event();
|
||||
|
||||
let events_before: Vec<_> = services()
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_until(sender_user, &room_id, base_token)?
|
||||
.take(
|
||||
u32::try_from(body.limit).map_err(|_| {
|
||||
Error::BadRequest(ErrorKind::InvalidParam, "Limit value is invalid.")
|
||||
})? as usize
|
||||
/ 2,
|
||||
)
|
||||
.take(limit / 2)
|
||||
.filter_map(|r| r.ok()) // Remove buggy events
|
||||
.filter(|(_, pdu)| {
|
||||
services()
|
||||
|
@ -114,12 +114,7 @@ pub async fn get_context_route(
|
|||
.rooms
|
||||
.timeline
|
||||
.pdus_after(sender_user, &room_id, base_token)?
|
||||
.take(
|
||||
u32::try_from(body.limit).map_err(|_| {
|
||||
Error::BadRequest(ErrorKind::InvalidParam, "Limit value is invalid.")
|
||||
})? as usize
|
||||
/ 2,
|
||||
)
|
||||
.take(limit / 2)
|
||||
.filter_map(|r| r.ok()) // Remove buggy events
|
||||
.filter(|(_, pdu)| {
|
||||
services()
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use crate::{service::media::FileMeta, services, utils, Error, Result, Ruma};
|
||||
use ruma::api::client::{
|
||||
error::ErrorKind,
|
||||
|
@ -67,6 +69,8 @@ pub async fn get_remote_content(
|
|||
allow_remote: false,
|
||||
server_name: server_name.to_owned(),
|
||||
media_id,
|
||||
timeout_ms: Duration::from_secs(20),
|
||||
allow_redirect: false,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
@ -194,6 +198,8 @@ pub async fn get_content_thumbnail_route(
|
|||
method: body.method.clone(),
|
||||
server_name: body.server_name.clone(),
|
||||
media_id: body.media_id.clone(),
|
||||
timeout_ms: Duration::from_secs(20),
|
||||
allow_redirect: false,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
|
|
@ -133,8 +133,12 @@ pub async fn get_message_events_route(
|
|||
from,
|
||||
)?;
|
||||
|
||||
// Use limit or else 10
|
||||
let limit = body.limit.try_into().map_or(10_usize, |l: u32| l as usize);
|
||||
// Use limit or else 10, with maximum 100
|
||||
let limit = body
|
||||
.limit
|
||||
.try_into()
|
||||
.map_or(10_usize, |l: u32| l as usize)
|
||||
.min(100);
|
||||
|
||||
let next_token;
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ mod state;
|
|||
mod sync;
|
||||
mod tag;
|
||||
mod thirdparty;
|
||||
mod threads;
|
||||
mod to_device;
|
||||
mod typing;
|
||||
mod unversioned;
|
||||
|
@ -56,6 +57,7 @@ pub use state::*;
|
|||
pub use sync::*;
|
||||
pub use tag::*;
|
||||
pub use thirdparty::*;
|
||||
pub use threads::*;
|
||||
pub use to_device::*;
|
||||
pub use typing::*;
|
||||
pub use unversioned::*;
|
||||
|
|
10
src/api/client_server/relations.rs
Normal file
10
src/api/client_server/relations.rs
Normal file
|
@ -0,0 +1,10 @@
|
|||
use crate::{services, Result, Ruma};
|
||||
use std::time::{Duration, SystemTime};
|
||||
|
||||
/// # `GET /_matrix/client/r0/todo`
|
||||
pub async fn get_relating_events_route(
|
||||
body: Ruma<get_turn_server_info::v3::Request>,
|
||||
) -> Result<get_turn_server_info::v3::Response> {
|
||||
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
||||
todo!();
|
||||
}
|
49
src/api/client_server/threads.rs
Normal file
49
src/api/client_server/threads.rs
Normal file
|
@ -0,0 +1,49 @@
|
|||
use ruma::api::client::{error::ErrorKind, threads::get_threads};
|
||||
|
||||
use crate::{services, Error, Result, Ruma};
|
||||
|
||||
/// # `GET /_matrix/client/r0/rooms/{roomId}/threads`
|
||||
pub async fn get_threads_route(
|
||||
body: Ruma<get_threads::v1::Request>,
|
||||
) -> Result<get_threads::v1::Response> {
|
||||
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
||||
|
||||
// Use limit or else 10, with maximum 100
|
||||
let limit = body
|
||||
.limit
|
||||
.and_then(|l| l.try_into().ok())
|
||||
.unwrap_or(10)
|
||||
.min(100);
|
||||
|
||||
let from = if let Some(from) = &body.from {
|
||||
from.parse()
|
||||
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, ""))?
|
||||
} else {
|
||||
u64::MAX
|
||||
};
|
||||
|
||||
let threads = services()
|
||||
.rooms
|
||||
.threads
|
||||
.threads_until(sender_user, &body.room_id, from, &body.include)?
|
||||
.take(limit)
|
||||
.filter_map(|r| r.ok())
|
||||
.filter(|(_, pdu)| {
|
||||
services()
|
||||
.rooms
|
||||
.state_accessor
|
||||
.user_can_see_event(sender_user, &body.room_id, &pdu.event_id)
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let next_batch = threads.last().map(|(count, _)| count.to_string());
|
||||
|
||||
Ok(get_threads::v1::Response {
|
||||
chunk: threads
|
||||
.into_iter()
|
||||
.map(|(_, pdu)| pdu.to_room_event())
|
||||
.collect(),
|
||||
next_batch,
|
||||
})
|
||||
}
|
|
@ -23,6 +23,8 @@ pub async fn get_supported_versions_route(
|
|||
"r0.6.0".to_owned(),
|
||||
"v1.1".to_owned(),
|
||||
"v1.2".to_owned(),
|
||||
"v1.3".to_owned(),
|
||||
"v1.4".to_owned(),
|
||||
],
|
||||
unstable_features: BTreeMap::from_iter([("org.matrix.e2e_cross_signing".to_owned(), true)]),
|
||||
};
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
#[allow(deprecated)]
|
||||
use crate::{
|
||||
api::client_server::{self, claim_keys_helper, get_keys_helper},
|
||||
service::pdu::{gen_event_id_canonical_json, PduBuilder},
|
||||
|
@ -497,6 +498,9 @@ async fn request_well_known(destination: &str) -> Option<String> {
|
|||
.send()
|
||||
.await;
|
||||
debug!("Got well known response");
|
||||
if let Err(e) = &response {
|
||||
error!("Well known error: {e:?}");
|
||||
}
|
||||
let text = response.ok()?.text().await;
|
||||
debug!("Got well known response text");
|
||||
let body: serde_json::Value = serde_json::from_str(&text.ok()?).ok()?;
|
||||
|
|
|
@ -12,6 +12,7 @@ mod state;
|
|||
mod state_accessor;
|
||||
mod state_cache;
|
||||
mod state_compressor;
|
||||
mod threads;
|
||||
mod timeline;
|
||||
mod user;
|
||||
|
||||
|
|
|
@ -5,6 +5,13 @@ use ruma::{EventId, RoomId};
|
|||
use crate::{database::KeyValueDatabase, service, Result};
|
||||
|
||||
impl service::rooms::pdu_metadata::Data for KeyValueDatabase {
|
||||
fn add_relation(&self, from: u64, to: u64) -> Result<()> {
|
||||
let mut key = from.to_be_bytes().to_vec();
|
||||
key.extend_from_slice(&to.to_be_bytes());
|
||||
self.fromto_relation.insert(&key, &[])?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) -> Result<()> {
|
||||
for prev in event_ids {
|
||||
let mut key = room_id.as_bytes().to_vec();
|
||||
|
|
78
src/database/key_value/rooms/threads.rs
Normal file
78
src/database/key_value/rooms/threads.rs
Normal file
|
@ -0,0 +1,78 @@
|
|||
use std::mem;
|
||||
|
||||
use ruma::{api::client::threads::get_threads::v1::IncludeThreads, OwnedUserId, RoomId, UserId};
|
||||
|
||||
use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result};
|
||||
|
||||
impl service::rooms::threads::Data for KeyValueDatabase {
|
||||
fn threads_until<'a>(
|
||||
&'a self,
|
||||
user_id: &'a UserId,
|
||||
room_id: &'a RoomId,
|
||||
until: u64,
|
||||
include: &'a IncludeThreads,
|
||||
) -> Result<Box<dyn Iterator<Item = Result<(u64, PduEvent)>> + 'a>> {
|
||||
let prefix = services()
|
||||
.rooms
|
||||
.short
|
||||
.get_shortroomid(room_id)?
|
||||
.expect("room exists")
|
||||
.to_be_bytes()
|
||||
.to_vec();
|
||||
|
||||
let mut current = prefix.clone();
|
||||
current.extend_from_slice(&(until - 1).to_be_bytes());
|
||||
|
||||
Ok(Box::new(
|
||||
self.threadid_userids
|
||||
.iter_from(¤t, true)
|
||||
.take_while(move |(k, _)| k.starts_with(&prefix))
|
||||
.map(move |(pduid, users)| {
|
||||
let count = utils::u64_from_bytes(&pduid[(mem::size_of::<u64>())..])
|
||||
.map_err(|_| Error::bad_database("Invalid pduid in threadid_userids."))?;
|
||||
let mut pdu = services()
|
||||
.rooms
|
||||
.timeline
|
||||
.get_pdu_from_id(&pduid)?
|
||||
.ok_or_else(|| {
|
||||
Error::bad_database("Invalid pduid reference in threadid_userids")
|
||||
})?;
|
||||
if pdu.sender != user_id {
|
||||
pdu.remove_transaction_id()?;
|
||||
}
|
||||
Ok((count, pdu))
|
||||
}),
|
||||
))
|
||||
}
|
||||
|
||||
fn update_participants(&self, root_id: &[u8], participants: &[OwnedUserId]) -> Result<()> {
|
||||
let users = participants
|
||||
.iter()
|
||||
.map(|user| user.as_bytes())
|
||||
.collect::<Vec<_>>()
|
||||
.join(&[0xff][..]);
|
||||
|
||||
self.threadid_userids.insert(&root_id, &users)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_participants(&self, root_id: &[u8]) -> Result<Option<Vec<OwnedUserId>>> {
|
||||
if let Some(users) = self.threadid_userids.get(&root_id)? {
|
||||
Ok(Some(
|
||||
users
|
||||
.split(|b| *b == 0xff)
|
||||
.map(|bytes| {
|
||||
UserId::parse(utils::string_from_bytes(bytes).map_err(|_| {
|
||||
Error::bad_database("Invalid UserId bytes in threadid_userids.")
|
||||
})?)
|
||||
.map_err(|_| Error::bad_database("Invalid UserId in threadid_userids."))
|
||||
})
|
||||
.filter_map(|r| r.ok())
|
||||
.collect(),
|
||||
))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -198,19 +198,30 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
|||
}
|
||||
|
||||
/// Removes a pdu and creates a new one with the same id.
|
||||
fn replace_pdu(&self, pdu_id: &[u8], pdu: &PduEvent) -> Result<()> {
|
||||
fn replace_pdu(
|
||||
&self,
|
||||
pdu_id: &[u8],
|
||||
pdu_json: &CanonicalJsonObject,
|
||||
pdu: &PduEvent,
|
||||
) -> Result<()> {
|
||||
if self.pduid_pdu.get(pdu_id)?.is_some() {
|
||||
self.pduid_pdu.insert(
|
||||
pdu_id,
|
||||
&serde_json::to_vec(pdu).expect("CanonicalJsonObject is always a valid"),
|
||||
&serde_json::to_vec(pdu_json).expect("CanonicalJsonObject is always a valid"),
|
||||
)?;
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Error::BadRequest(
|
||||
return Err(Error::BadRequest(
|
||||
ErrorKind::NotFound,
|
||||
"PDU does not exist.",
|
||||
))
|
||||
));
|
||||
}
|
||||
|
||||
self.pdu_cache
|
||||
.lock()
|
||||
.unwrap()
|
||||
.remove(&(*pdu.event_id).to_owned());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns an iterator over all events and their tokens in a room that happened before the
|
||||
|
|
|
@ -80,6 +80,8 @@ pub struct KeyValueDatabase {
|
|||
pub(super) aliasid_alias: Arc<dyn KvTree>, // AliasId = RoomId + Count
|
||||
pub(super) publicroomids: Arc<dyn KvTree>,
|
||||
|
||||
pub(super) threadid_userids: Arc<dyn KvTree>, // ThreadId = RoomId + Count
|
||||
|
||||
pub(super) tokenids: Arc<dyn KvTree>, // TokenId = ShortRoomId + Token + PduIdCount
|
||||
|
||||
/// Participating servers in a room.
|
||||
|
@ -128,6 +130,8 @@ pub struct KeyValueDatabase {
|
|||
pub(super) eventid_outlierpdu: Arc<dyn KvTree>,
|
||||
pub(super) softfailedeventids: Arc<dyn KvTree>,
|
||||
|
||||
/// ShortEventId + ShortEventId -> ().
|
||||
pub(super) fromto_relation: Arc<dyn KvTree>,
|
||||
/// RoomId + EventId -> Parent PDU EventId.
|
||||
pub(super) referencedevents: Arc<dyn KvTree>,
|
||||
|
||||
|
@ -302,6 +306,8 @@ impl KeyValueDatabase {
|
|||
aliasid_alias: builder.open_tree("aliasid_alias")?,
|
||||
publicroomids: builder.open_tree("publicroomids")?,
|
||||
|
||||
threadid_userids: builder.open_tree("threadid_userids")?,
|
||||
|
||||
tokenids: builder.open_tree("tokenids")?,
|
||||
|
||||
roomserverids: builder.open_tree("roomserverids")?,
|
||||
|
@ -342,6 +348,7 @@ impl KeyValueDatabase {
|
|||
eventid_outlierpdu: builder.open_tree("eventid_outlierpdu")?,
|
||||
softfailedeventids: builder.open_tree("softfailedeventids")?,
|
||||
|
||||
fromto_relation: builder.open_tree("fromto_relation")?,
|
||||
referencedevents: builder.open_tree("referencedevents")?,
|
||||
roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?,
|
||||
roomusertype_roomuserdataid: builder.open_tree("roomusertype_roomuserdataid")?,
|
||||
|
|
|
@ -383,6 +383,7 @@ fn routes() -> Router {
|
|||
.ruma_route(client_server::set_pushers_route)
|
||||
// .ruma_route(client_server::third_party_route)
|
||||
.ruma_route(client_server::upgrade_room_route)
|
||||
.ruma_route(client_server::get_threads_route)
|
||||
.ruma_route(server_server::get_server_version_route)
|
||||
.route(
|
||||
"/_matrix/key/v2/server",
|
||||
|
|
|
@ -8,7 +8,7 @@ use image::imageops::FilterType;
|
|||
|
||||
use tokio::{
|
||||
fs::File,
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
io::{AsyncReadExt, AsyncWriteExt, BufReader},
|
||||
};
|
||||
|
||||
pub struct FileMeta {
|
||||
|
@ -70,7 +70,9 @@ impl Service {
|
|||
{
|
||||
let path = services().globals.get_media_file(&key);
|
||||
let mut file = Vec::new();
|
||||
File::open(path).await?.read_to_end(&mut file).await?;
|
||||
BufReader::new(File::open(path).await?)
|
||||
.read_to_end(&mut file)
|
||||
.await?;
|
||||
|
||||
Ok(Some(FileMeta {
|
||||
content_disposition,
|
||||
|
|
|
@ -97,6 +97,7 @@ impl Services {
|
|||
db,
|
||||
lasttimelinecount_cache: Mutex::new(HashMap::new()),
|
||||
},
|
||||
threads: rooms::threads::Service { db },
|
||||
user: rooms::user::Service { db },
|
||||
},
|
||||
transaction_ids: transaction_ids::Service { db },
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
use crate::Error;
|
||||
use ruma::{
|
||||
events::{
|
||||
room::member::RoomMemberEventContent, AnyEphemeralRoomEvent, AnyStateEvent,
|
||||
AnyStrippedStateEvent, AnySyncStateEvent, AnySyncTimelineEvent, AnyTimelineEvent,
|
||||
StateEvent, TimelineEventType,
|
||||
room::member::RoomMemberEventContent, AnyEphemeralRoomEvent, AnyMessageLikeEvent,
|
||||
AnyStateEvent, AnyStrippedStateEvent, AnySyncStateEvent, AnySyncTimelineEvent,
|
||||
AnyTimelineEvent, StateEvent, TimelineEventType,
|
||||
},
|
||||
serde::Raw,
|
||||
state_res, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch,
|
||||
|
@ -175,6 +175,30 @@ impl PduEvent {
|
|||
serde_json::from_value(json).expect("Raw::from_value always works")
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub fn to_message_like_event(&self) -> Raw<AnyMessageLikeEvent> {
|
||||
let mut json = json!({
|
||||
"content": self.content,
|
||||
"type": self.kind,
|
||||
"event_id": self.event_id,
|
||||
"sender": self.sender,
|
||||
"origin_server_ts": self.origin_server_ts,
|
||||
"room_id": self.room_id,
|
||||
});
|
||||
|
||||
if let Some(unsigned) = &self.unsigned {
|
||||
json["unsigned"] = json!(unsigned);
|
||||
}
|
||||
if let Some(state_key) = &self.state_key {
|
||||
json["state_key"] = json!(state_key);
|
||||
}
|
||||
if let Some(redacts) = &self.redacts {
|
||||
json["redacts"] = json!(redacts);
|
||||
}
|
||||
|
||||
serde_json::from_value(json).expect("Raw::from_value always works")
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub fn to_state_event(&self) -> Raw<AnyStateEvent> {
|
||||
let mut json = json!({
|
||||
|
|
|
@ -162,9 +162,7 @@ impl Service {
|
|||
&pdu.room_id,
|
||||
)? {
|
||||
let n = match action {
|
||||
Action::DontNotify => false,
|
||||
// TODO: Implement proper support for coalesce
|
||||
Action::Notify | Action::Coalesce => true,
|
||||
Action::Notify => true,
|
||||
Action::SetTweak(tweak) => {
|
||||
tweaks.push(tweak.clone());
|
||||
continue;
|
||||
|
|
|
@ -13,6 +13,7 @@ pub mod state;
|
|||
pub mod state_accessor;
|
||||
pub mod state_cache;
|
||||
pub mod state_compressor;
|
||||
pub mod threads;
|
||||
pub mod timeline;
|
||||
pub mod user;
|
||||
|
||||
|
@ -32,6 +33,7 @@ pub trait Data:
|
|||
+ state_cache::Data
|
||||
+ state_compressor::Data
|
||||
+ timeline::Data
|
||||
+ threads::Data
|
||||
+ user::Data
|
||||
{
|
||||
}
|
||||
|
@ -53,5 +55,6 @@ pub struct Service {
|
|||
pub state_cache: state_cache::Service,
|
||||
pub state_compressor: state_compressor::Service,
|
||||
pub timeline: timeline::Service,
|
||||
pub threads: threads::Service,
|
||||
pub user: user::Service,
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ use crate::Result;
|
|||
use ruma::{EventId, RoomId};
|
||||
|
||||
pub trait Data: Send + Sync {
|
||||
fn add_relation(&self, from: u64, to: u64) -> Result<()>;
|
||||
fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) -> Result<()>;
|
||||
fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> Result<bool>;
|
||||
fn mark_event_soft_failed(&self, event_id: &EventId) -> Result<()>;
|
||||
|
|
|
@ -4,13 +4,20 @@ use std::sync::Arc;
|
|||
pub use data::Data;
|
||||
use ruma::{EventId, RoomId};
|
||||
|
||||
use crate::Result;
|
||||
use crate::{services, Result};
|
||||
|
||||
pub struct Service {
|
||||
pub db: &'static dyn Data,
|
||||
}
|
||||
|
||||
impl Service {
|
||||
#[tracing::instrument(skip(self, from, to))]
|
||||
pub fn add_relation(&self, from: &EventId, to: &EventId) -> Result<()> {
|
||||
let from = services().rooms.short.get_or_create_shorteventid(from)?;
|
||||
let to = services().rooms.short.get_or_create_shorteventid(to)?;
|
||||
self.db.add_relation(from, to)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self, room_id, event_ids))]
|
||||
pub fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) -> Result<()> {
|
||||
self.db.mark_as_referenced(room_id, event_ids)
|
||||
|
|
15
src/service/rooms/threads/data.rs
Normal file
15
src/service/rooms/threads/data.rs
Normal file
|
@ -0,0 +1,15 @@
|
|||
use crate::{PduEvent, Result};
|
||||
use ruma::{api::client::threads::get_threads::v1::IncludeThreads, OwnedUserId, RoomId, UserId};
|
||||
|
||||
pub trait Data: Send + Sync {
|
||||
fn threads_until<'a>(
|
||||
&'a self,
|
||||
user_id: &'a UserId,
|
||||
room_id: &'a RoomId,
|
||||
until: u64,
|
||||
include: &'a IncludeThreads,
|
||||
) -> Result<Box<dyn Iterator<Item = Result<(u64, PduEvent)>> + 'a>>;
|
||||
|
||||
fn update_participants(&self, root_id: &[u8], participants: &[OwnedUserId]) -> Result<()>;
|
||||
fn get_participants(&self, root_id: &[u8]) -> Result<Option<Vec<OwnedUserId>>>;
|
||||
}
|
119
src/service/rooms/threads/mod.rs
Normal file
119
src/service/rooms/threads/mod.rs
Normal file
|
@ -0,0 +1,119 @@
|
|||
mod data;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub use data::Data;
|
||||
use ruma::{
|
||||
api::client::{error::ErrorKind, threads::get_threads::v1::IncludeThreads},
|
||||
events::{relation::BundledThread, StateEventType},
|
||||
uint, CanonicalJsonValue, EventId, OwnedUserId, RoomId, UserId,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use serde_json::json;
|
||||
|
||||
use crate::{services, utils, Error, PduEvent, Result};
|
||||
|
||||
use super::timeline::PduCount;
|
||||
|
||||
pub struct Service {
|
||||
pub db: &'static dyn Data,
|
||||
}
|
||||
|
||||
impl Service {
|
||||
pub fn threads_until<'a>(
|
||||
&'a self,
|
||||
user_id: &'a UserId,
|
||||
room_id: &'a RoomId,
|
||||
until: u64,
|
||||
include: &'a IncludeThreads,
|
||||
) -> Result<impl Iterator<Item = Result<(u64, PduEvent)>> + 'a> {
|
||||
self.db.threads_until(user_id, room_id, until, include)
|
||||
}
|
||||
|
||||
pub fn add_to_thread<'a>(&'a self, root_event_id: &EventId, pdu: &PduEvent) -> Result<()> {
|
||||
let root_id = &services()
|
||||
.rooms
|
||||
.timeline
|
||||
.get_pdu_id(root_event_id)?
|
||||
.ok_or_else(|| {
|
||||
Error::BadRequest(
|
||||
ErrorKind::InvalidParam,
|
||||
"Invalid event id in thread message",
|
||||
)
|
||||
})?;
|
||||
|
||||
let root_pdu = services()
|
||||
.rooms
|
||||
.timeline
|
||||
.get_pdu_from_id(root_id)?
|
||||
.ok_or_else(|| {
|
||||
Error::BadRequest(ErrorKind::InvalidParam, "Thread root pdu not found")
|
||||
})?;
|
||||
|
||||
let mut root_pdu_json = services()
|
||||
.rooms
|
||||
.timeline
|
||||
.get_pdu_json_from_id(root_id)?
|
||||
.ok_or_else(|| {
|
||||
Error::BadRequest(ErrorKind::InvalidParam, "Thread root pdu not found")
|
||||
})?;
|
||||
|
||||
if let CanonicalJsonValue::Object(unsigned) = root_pdu_json
|
||||
.entry("unsigned".to_owned())
|
||||
.or_insert_with(|| CanonicalJsonValue::Object(Default::default()))
|
||||
{
|
||||
if let Some(mut relations) = unsigned
|
||||
.get("m.relations")
|
||||
.and_then(|r| r.as_object())
|
||||
.and_then(|r| r.get("m.thread"))
|
||||
.and_then(|relations| {
|
||||
serde_json::from_value::<BundledThread>(relations.clone().into()).ok()
|
||||
})
|
||||
{
|
||||
// Thread already existed
|
||||
relations.count += uint!(1);
|
||||
relations.latest_event = pdu.to_message_like_event();
|
||||
|
||||
let content = serde_json::to_value(relations).expect("to_value always works");
|
||||
|
||||
unsigned.insert(
|
||||
"m.relations".to_owned(),
|
||||
json!({ "m.thread": content })
|
||||
.try_into()
|
||||
.expect("thread is valid json"),
|
||||
);
|
||||
} else {
|
||||
// New thread
|
||||
let relations = BundledThread {
|
||||
latest_event: pdu.to_message_like_event(),
|
||||
count: uint!(1),
|
||||
current_user_participated: true,
|
||||
};
|
||||
|
||||
let content = serde_json::to_value(relations).expect("to_value always works");
|
||||
|
||||
unsigned.insert(
|
||||
"m.relations".to_owned(),
|
||||
json!({ "m.thread": content })
|
||||
.try_into()
|
||||
.expect("thread is valid json"),
|
||||
);
|
||||
}
|
||||
|
||||
services()
|
||||
.rooms
|
||||
.timeline
|
||||
.replace_pdu(root_id, &root_pdu_json, &root_pdu)?;
|
||||
}
|
||||
|
||||
let mut users = Vec::new();
|
||||
if let Some(userids) = self.db.get_participants(&root_id)? {
|
||||
users.extend_from_slice(&userids);
|
||||
users.push(pdu.sender.clone());
|
||||
} else {
|
||||
users.push(root_pdu.sender);
|
||||
users.push(pdu.sender.clone());
|
||||
}
|
||||
|
||||
self.db.update_participants(root_id, &users)
|
||||
}
|
||||
}
|
|
@ -57,7 +57,12 @@ pub trait Data: Send + Sync {
|
|||
) -> Result<()>;
|
||||
|
||||
/// Removes a pdu and creates a new one with the same id.
|
||||
fn replace_pdu(&self, pdu_id: &[u8], pdu: &PduEvent) -> Result<()>;
|
||||
fn replace_pdu(
|
||||
&self,
|
||||
pdu_id: &[u8],
|
||||
pdu_json: &CanonicalJsonObject,
|
||||
pdu: &PduEvent,
|
||||
) -> Result<()>;
|
||||
|
||||
/// Returns an iterator over all events and their tokens in a room that happened before the
|
||||
/// event with id `until` in reverse-chronological order.
|
||||
|
|
|
@ -18,13 +18,13 @@ use ruma::{
|
|||
events::{
|
||||
push_rules::PushRulesEvent,
|
||||
room::{
|
||||
create::RoomCreateEventContent, member::MembershipState,
|
||||
create::RoomCreateEventContent, encrypted::Relation, member::MembershipState,
|
||||
power_levels::RoomPowerLevelsEventContent,
|
||||
},
|
||||
GlobalAccountDataEventType, StateEventType, TimelineEventType,
|
||||
},
|
||||
push::{Action, Ruleset, Tweak},
|
||||
serde::Base64,
|
||||
serde::{Base64, JsonObject},
|
||||
state_res,
|
||||
state_res::{Event, RoomVersion},
|
||||
uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId,
|
||||
|
@ -197,8 +197,13 @@ impl Service {
|
|||
|
||||
/// Removes a pdu and creates a new one with the same id.
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn replace_pdu(&self, pdu_id: &[u8], pdu: &PduEvent) -> Result<()> {
|
||||
self.db.replace_pdu(pdu_id, pdu)
|
||||
pub fn replace_pdu(
|
||||
&self,
|
||||
pdu_id: &[u8],
|
||||
pdu_json: &CanonicalJsonObject,
|
||||
pdu: &PduEvent,
|
||||
) -> Result<()> {
|
||||
self.db.replace_pdu(pdu_id, pdu_json, pdu)
|
||||
}
|
||||
|
||||
/// Creates a new persisted data unit and adds it to a room.
|
||||
|
@ -352,9 +357,7 @@ impl Service {
|
|||
&pdu.room_id,
|
||||
)? {
|
||||
match action {
|
||||
Action::DontNotify => notify = false,
|
||||
// TODO: Implement proper support for coalesce
|
||||
Action::Notify | Action::Coalesce => notify = true,
|
||||
Action::Notify => notify = true,
|
||||
Action::SetTweak(Tweak::Highlight(true)) => {
|
||||
highlight = true;
|
||||
}
|
||||
|
@ -457,6 +460,50 @@ impl Service {
|
|||
_ => {}
|
||||
}
|
||||
|
||||
// Update Relationships
|
||||
#[derive(Deserialize)]
|
||||
struct ExtractRelatesTo {
|
||||
#[serde(rename = "m.relates_to")]
|
||||
relates_to: Relation,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
struct ExtractEventId {
|
||||
event_id: OwnedEventId,
|
||||
}
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
struct ExtractRelatesToEventId {
|
||||
#[serde(rename = "m.relates_to")]
|
||||
relates_to: ExtractEventId,
|
||||
}
|
||||
|
||||
if let Ok(content) = serde_json::from_str::<ExtractRelatesToEventId>(pdu.content.get()) {
|
||||
services()
|
||||
.rooms
|
||||
.pdu_metadata
|
||||
.add_relation(&pdu.event_id, &content.relates_to.event_id)?;
|
||||
}
|
||||
|
||||
if let Ok(content) = serde_json::from_str::<ExtractRelatesTo>(pdu.content.get()) {
|
||||
match content.relates_to {
|
||||
Relation::Reply { in_reply_to } => {
|
||||
// We need to do it again here, because replies don't have
|
||||
// event_id as a top level field
|
||||
services()
|
||||
.rooms
|
||||
.pdu_metadata
|
||||
.add_relation(&pdu.event_id, &in_reply_to.event_id)?;
|
||||
}
|
||||
Relation::Thread(thread) => {
|
||||
services()
|
||||
.rooms
|
||||
.threads
|
||||
.add_to_thread(&thread.event_id, pdu)?;
|
||||
}
|
||||
_ => {} // TODO: Aggregate other types
|
||||
}
|
||||
}
|
||||
|
||||
for appservice in services().appservice.all()? {
|
||||
if services()
|
||||
.rooms
|
||||
|
@ -957,12 +1004,17 @@ impl Service {
|
|||
/// Replace a PDU with the redacted form.
|
||||
#[tracing::instrument(skip(self, reason))]
|
||||
pub fn redact_pdu(&self, event_id: &EventId, reason: &PduEvent) -> Result<()> {
|
||||
// TODO: Don't reserialize, keep original json
|
||||
if let Some(pdu_id) = self.get_pdu_id(event_id)? {
|
||||
let mut pdu = self
|
||||
.get_pdu_from_id(&pdu_id)?
|
||||
.ok_or_else(|| Error::bad_database("PDU ID points to invalid PDU."))?;
|
||||
pdu.redact(reason)?;
|
||||
self.replace_pdu(&pdu_id, &pdu)?;
|
||||
self.replace_pdu(
|
||||
&pdu_id,
|
||||
&utils::to_canonical_object(&pdu).expect("PDU is an object"),
|
||||
&pdu,
|
||||
)?;
|
||||
}
|
||||
// If event does not exist, just noop
|
||||
Ok(())
|
||||
|
|
Loading…
Reference in a new issue