mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-02-05 23:18:27 +03:00
fix: make backfilled events reachable
This commit is contained in:
parent
fcfb06ffa6
commit
17a6431f5f
3 changed files with 79 additions and 64 deletions
src
|
@ -10,28 +10,6 @@ use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEven
|
||||||
use service::rooms::timeline::PduCount;
|
use service::rooms::timeline::PduCount;
|
||||||
|
|
||||||
impl service::rooms::timeline::Data for KeyValueDatabase {
|
impl service::rooms::timeline::Data for KeyValueDatabase {
|
||||||
fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<Option<Arc<PduEvent>>> {
|
|
||||||
let prefix = services()
|
|
||||||
.rooms
|
|
||||||
.short
|
|
||||||
.get_shortroomid(room_id)?
|
|
||||||
.expect("room exists")
|
|
||||||
.to_be_bytes()
|
|
||||||
.to_vec();
|
|
||||||
|
|
||||||
// Look for PDUs in that room.
|
|
||||||
self.pduid_pdu
|
|
||||||
.iter_from(&prefix, false)
|
|
||||||
.filter(|(k, _)| k.starts_with(&prefix))
|
|
||||||
.map(|(_, pdu)| {
|
|
||||||
serde_json::from_slice(&pdu)
|
|
||||||
.map_err(|_| Error::bad_database("Invalid first PDU in db."))
|
|
||||||
.map(Arc::new)
|
|
||||||
})
|
|
||||||
.next()
|
|
||||||
.transpose()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result<PduCount> {
|
fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result<PduCount> {
|
||||||
match self
|
match self
|
||||||
.lasttimelinecount_cache
|
.lasttimelinecount_cache
|
||||||
|
@ -81,20 +59,18 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
||||||
|
|
||||||
/// Returns the json of a pdu.
|
/// Returns the json of a pdu.
|
||||||
fn get_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> {
|
fn get_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> {
|
||||||
self.eventid_pduid
|
self.get_non_outlier_pdu_json(event_id)?.map_or_else(
|
||||||
|
|| {
|
||||||
|
self.eventid_outlierpdu
|
||||||
.get(event_id.as_bytes())?
|
.get(event_id.as_bytes())?
|
||||||
.map_or_else(
|
|
||||||
|| self.eventid_outlierpdu.get(event_id.as_bytes()),
|
|
||||||
|pduid| {
|
|
||||||
Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
|
|
||||||
Error::bad_database("Invalid pduid in eventid_pduid.")
|
|
||||||
})?))
|
|
||||||
},
|
|
||||||
)?
|
|
||||||
.map(|pdu| {
|
.map(|pdu| {
|
||||||
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
|
serde_json::from_slice(&pdu)
|
||||||
|
.map_err(|_| Error::bad_database("Invalid PDU in db."))
|
||||||
})
|
})
|
||||||
.transpose()
|
.transpose()
|
||||||
|
},
|
||||||
|
|x| Ok(Some(x)),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the json of a pdu.
|
/// Returns the json of a pdu.
|
||||||
|
@ -107,6 +83,21 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
||||||
.ok_or_else(|| Error::bad_database("Invalid pduid in eventid_pduid."))
|
.ok_or_else(|| Error::bad_database("Invalid pduid in eventid_pduid."))
|
||||||
})
|
})
|
||||||
.transpose()?
|
.transpose()?
|
||||||
|
.map_or_else(
|
||||||
|
|| {
|
||||||
|
Ok::<_, Error>(
|
||||||
|
self.eventid_backfillpduid
|
||||||
|
.get(event_id.as_bytes())?
|
||||||
|
.map(|pduid| {
|
||||||
|
self.pduid_backfillpdu.get(&pduid)?.ok_or_else(|| {
|
||||||
|
Error::bad_database("Invalid pduid in eventid_pduid.")
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.transpose()?,
|
||||||
|
)
|
||||||
|
},
|
||||||
|
|x| Ok(Some(x)),
|
||||||
|
)?
|
||||||
.map(|pdu| {
|
.map(|pdu| {
|
||||||
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
|
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
|
||||||
})
|
})
|
||||||
|
@ -115,7 +106,10 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
||||||
|
|
||||||
/// Returns the pdu's id.
|
/// Returns the pdu's id.
|
||||||
fn get_pdu_id(&self, event_id: &EventId) -> Result<Option<Vec<u8>>> {
|
fn get_pdu_id(&self, event_id: &EventId) -> Result<Option<Vec<u8>>> {
|
||||||
self.eventid_pduid.get(event_id.as_bytes())
|
Ok(self.eventid_pduid.get(event_id.as_bytes())?.map_or_else(
|
||||||
|
|| self.eventid_backfillpduid.get(event_id.as_bytes()),
|
||||||
|
|x| Ok(Some(x)),
|
||||||
|
)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the pdu.
|
/// Returns the pdu.
|
||||||
|
@ -130,6 +124,21 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
||||||
.ok_or_else(|| Error::bad_database("Invalid pduid in eventid_pduid."))
|
.ok_or_else(|| Error::bad_database("Invalid pduid in eventid_pduid."))
|
||||||
})
|
})
|
||||||
.transpose()?
|
.transpose()?
|
||||||
|
.map_or_else(
|
||||||
|
|| {
|
||||||
|
Ok::<_, Error>(
|
||||||
|
self.eventid_backfillpduid
|
||||||
|
.get(event_id.as_bytes())?
|
||||||
|
.map(|pduid| {
|
||||||
|
self.pduid_backfillpdu.get(&pduid)?.ok_or_else(|| {
|
||||||
|
Error::bad_database("Invalid pduid in eventid_pduid.")
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.transpose()?,
|
||||||
|
)
|
||||||
|
},
|
||||||
|
|x| Ok(Some(x)),
|
||||||
|
)?
|
||||||
.map(|pdu| {
|
.map(|pdu| {
|
||||||
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
|
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
|
||||||
})
|
})
|
||||||
|
@ -145,22 +154,20 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(pdu) = self
|
if let Some(pdu) = self
|
||||||
.eventid_pduid
|
.get_non_outlier_pdu(event_id)?
|
||||||
.get(event_id.as_bytes())?
|
|
||||||
.map_or_else(
|
.map_or_else(
|
||||||
|| self.eventid_outlierpdu.get(event_id.as_bytes()),
|
|| {
|
||||||
|pduid| {
|
self.eventid_outlierpdu
|
||||||
Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
|
.get(event_id.as_bytes())?
|
||||||
Error::bad_database("Invalid pduid in eventid_pduid.")
|
|
||||||
})?))
|
|
||||||
},
|
|
||||||
)?
|
|
||||||
.map(|pdu| {
|
.map(|pdu| {
|
||||||
serde_json::from_slice(&pdu)
|
serde_json::from_slice(&pdu)
|
||||||
.map_err(|_| Error::bad_database("Invalid PDU in db."))
|
.map_err(|_| Error::bad_database("Invalid PDU in db."))
|
||||||
.map(Arc::new)
|
|
||||||
})
|
})
|
||||||
.transpose()?
|
.transpose()
|
||||||
|
},
|
||||||
|
|x| Ok(Some(x)),
|
||||||
|
)?
|
||||||
|
.map(Arc::new)
|
||||||
{
|
{
|
||||||
self.pdu_cache
|
self.pdu_cache
|
||||||
.lock()
|
.lock()
|
||||||
|
@ -176,7 +183,10 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
||||||
///
|
///
|
||||||
/// This does __NOT__ check the outliers `Tree`.
|
/// This does __NOT__ check the outliers `Tree`.
|
||||||
fn get_pdu_from_id(&self, pdu_id: &[u8]) -> Result<Option<PduEvent>> {
|
fn get_pdu_from_id(&self, pdu_id: &[u8]) -> Result<Option<PduEvent>> {
|
||||||
self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| {
|
self.pduid_pdu
|
||||||
|
.get(pdu_id)?
|
||||||
|
.map_or_else(|| self.pduid_backfillpdu.get(pdu_id), |x| Ok(Some(x)))?
|
||||||
|
.map_or(Ok(None), |pdu| {
|
||||||
Ok(Some(
|
Ok(Some(
|
||||||
serde_json::from_slice(&pdu)
|
serde_json::from_slice(&pdu)
|
||||||
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
|
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
|
||||||
|
@ -186,7 +196,10 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
||||||
|
|
||||||
/// Returns the pdu as a `BTreeMap<String, CanonicalJsonValue>`.
|
/// Returns the pdu as a `BTreeMap<String, CanonicalJsonValue>`.
|
||||||
fn get_pdu_json_from_id(&self, pdu_id: &[u8]) -> Result<Option<CanonicalJsonObject>> {
|
fn get_pdu_json_from_id(&self, pdu_id: &[u8]) -> Result<Option<CanonicalJsonObject>> {
|
||||||
self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| {
|
self.pduid_pdu
|
||||||
|
.get(pdu_id)?
|
||||||
|
.map_or_else(|| self.pduid_backfillpdu.get(pdu_id), |x| Ok(Some(x)))?
|
||||||
|
.map_or(Ok(None), |pdu| {
|
||||||
Ok(Some(
|
Ok(Some(
|
||||||
serde_json::from_slice(&pdu)
|
serde_json::from_slice(&pdu)
|
||||||
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
|
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
|
||||||
|
|
|
@ -7,7 +7,6 @@ use crate::{PduEvent, Result};
|
||||||
use super::PduCount;
|
use super::PduCount;
|
||||||
|
|
||||||
pub trait Data: Send + Sync {
|
pub trait Data: Send + Sync {
|
||||||
fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<Option<Arc<PduEvent>>>;
|
|
||||||
fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result<PduCount>;
|
fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result<PduCount>;
|
||||||
|
|
||||||
/// Returns the `count` of this pdu's id.
|
/// Returns the `count` of this pdu's id.
|
||||||
|
|
|
@ -114,7 +114,10 @@ pub struct Service {
|
||||||
impl Service {
|
impl Service {
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<Option<Arc<PduEvent>>> {
|
pub fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<Option<Arc<PduEvent>>> {
|
||||||
self.db.first_pdu_in_room(room_id)
|
self.all_pdus(&user_id!("@doesntmatter:conduit.rs"), &room_id)?
|
||||||
|
.next()
|
||||||
|
.map(|o| o.map(|(_, p)| Arc::new(p)))
|
||||||
|
.transpose()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
|
|
Loading…
Reference in a new issue