fix: panic on launch

Now we start the admin and sending threads at a later time.
This commit is contained in:
Timo Kösters 2022-10-08 13:57:01 +02:00 committed by Nyaaori
parent 50b0eb9929
commit 8b5b7a1f63
No known key found for this signature in database
GPG key ID: E7819C3ED4D1F82E
20 changed files with 46 additions and 53 deletions

View file

@ -29,7 +29,7 @@ use ruma::{
}, },
ServerName, UInt, ServerName, UInt,
}; };
use tracing::{info, warn}; use tracing::{error, info, warn};
/// # `POST /_matrix/client/r0/publicRooms` /// # `POST /_matrix/client/r0/publicRooms`
/// ///
@ -279,15 +279,14 @@ pub(crate) async fn get_public_rooms_filtered_helper(
JoinRule::Knock => Some(PublicRoomJoinRule::Knock), JoinRule::Knock => Some(PublicRoomJoinRule::Knock),
_ => None, _ => None,
}) })
.map_err(|_| { .map_err(|e| {
Error::bad_database("Invalid room join rule event in database.") error!("Invalid room join rule event in database: {}", e);
Error::BadDatabase("Invalid room join rule event in database.")
}) })
}) })
.transpose()? .transpose()?
.flatten() .flatten()
.ok_or(Error::bad_database( .ok_or_else(|| Error::bad_database("Missing room join rule event for room."))?,
"Invalid room join rule event in database.",
))?,
room_id, room_id,
}; };
Ok(chunk) Ok(chunk)

View file

@ -1,10 +1,7 @@
pub mod abstraction; pub mod abstraction;
pub mod key_value; pub mod key_value;
use crate::{ use crate::{services, utils, Config, Error, PduEvent, Result, Services, SERVICES};
services, utils, Config, Error,
PduEvent, Result, Services, SERVICES,
};
use abstraction::KeyValueDatabaseEngine; use abstraction::KeyValueDatabaseEngine;
use abstraction::KvTree; use abstraction::KvTree;
use directories::ProjectDirs; use directories::ProjectDirs;
@ -830,6 +827,8 @@ impl KeyValueDatabase {
// This data is probably outdated // This data is probably outdated
db.presenceid_presence.clear()?; db.presenceid_presence.clear()?;
services().admin.start_handler();
// Set emergency access for the conduit user // Set emergency access for the conduit user
match set_emergency_access() { match set_emergency_access() {
Ok(pwd_set) => { Ok(pwd_set) => {
@ -846,6 +845,8 @@ impl KeyValueDatabase {
} }
}; };
services().sending.start_handler();
Self::start_cleanup_task().await; Self::start_cleanup_task().await;
Ok(()) Ok(())

View file

@ -28,7 +28,7 @@ use http::{
}; };
use opentelemetry::trace::{FutureExt, Tracer}; use opentelemetry::trace::{FutureExt, Tracer};
use ruma::api::{client::error::ErrorKind, IncomingRequest}; use ruma::api::{client::error::ErrorKind, IncomingRequest};
use tokio::{signal}; use tokio::signal;
use tower::ServiceBuilder; use tower::ServiceBuilder;
use tower_http::{ use tower_http::{
cors::{self, CorsLayer}, cors::{self, CorsLayer},

View file

@ -8,7 +8,7 @@ use ruma::{
RoomId, UserId, RoomId, UserId,
}; };
use std::{collections::HashMap}; use std::collections::HashMap;
use crate::Result; use crate::Result;

View file

@ -26,7 +26,7 @@ use ruma::{
EventId, RoomAliasId, RoomId, RoomName, RoomVersionId, ServerName, UserId, EventId, RoomAliasId, RoomId, RoomName, RoomVersionId, ServerName, UserId,
}; };
use serde_json::value::to_raw_value; use serde_json::value::to_raw_value;
use tokio::sync::{mpsc, MutexGuard}; use tokio::sync::{mpsc, Mutex, MutexGuard};
use crate::{ use crate::{
api::client_server::{leave_all_rooms, AUTO_GEN_PASSWORD_LENGTH}, api::client_server::{leave_all_rooms, AUTO_GEN_PASSWORD_LENGTH},
@ -164,25 +164,29 @@ pub enum AdminRoomEvent {
SendMessage(RoomMessageEventContent), SendMessage(RoomMessageEventContent),
} }
#[derive(Clone)]
pub struct Service { pub struct Service {
pub sender: mpsc::UnboundedSender<AdminRoomEvent>, pub sender: mpsc::UnboundedSender<AdminRoomEvent>,
receiver: Mutex<mpsc::UnboundedReceiver<AdminRoomEvent>>,
} }
impl Service { impl Service {
pub fn build() -> Arc<Self> { pub fn build() -> Arc<Self> {
let (sender, receiver) = mpsc::unbounded_channel(); let (sender, receiver) = mpsc::unbounded_channel();
let self1 = Arc::new(Self { sender }); Arc::new(Self {
let self2 = Arc::clone(&self1); sender,
receiver: Mutex::new(receiver),
tokio::spawn(async move { })
self2.start_handler(receiver).await;
});
self1
} }
async fn start_handler(&self, mut receiver: mpsc::UnboundedReceiver<AdminRoomEvent>) { pub fn start_handler(self: &Arc<Self>) {
let self2 = Arc::clone(&self);
tokio::spawn(async move {
self2.handler().await;
});
}
async fn handler(&self) {
let mut receiver = self.receiver.lock().await;
// TODO: Use futures when we have long admin commands // TODO: Use futures when we have long admin commands
//let mut futures = FuturesUnordered::new(); //let mut futures = FuturesUnordered::new();

View file

@ -1,6 +1,5 @@
mod data; mod data;
pub use data::Data; pub use data::Data;
use crate::Result; use crate::Result;

View file

@ -7,7 +7,7 @@ use ruma::{
serde::Raw, serde::Raw,
RoomId, UserId, RoomId, UserId,
}; };
use std::{collections::BTreeMap}; use std::collections::BTreeMap;
pub struct Service { pub struct Service {
pub db: &'static dyn Data, pub db: &'static dyn Data,

View file

@ -1,6 +1,5 @@
mod data; mod data;
pub use data::Data; pub use data::Data;
use crate::Result; use crate::Result;

View file

@ -1,6 +1,5 @@
mod data; mod data;
pub use data::Data; pub use data::Data;
use ruma::RoomId; use ruma::RoomId;

View file

@ -1,5 +1,5 @@
mod data; mod data;
use std::{collections::HashMap}; use std::collections::HashMap;
pub use data::Data; pub use data::Data;
use ruma::{events::presence::PresenceEvent, RoomId, UserId}; use ruma::{events::presence::PresenceEvent, RoomId, UserId};

View file

@ -1,6 +1,5 @@
mod data; mod data;
pub use data::Data; pub use data::Data;
use crate::Result; use crate::Result;

View file

@ -1,6 +1,5 @@
mod data; mod data;
pub use data::Data; pub use data::Data;
use ruma::{events::SyncEphemeralRoomEvent, RoomId, UserId}; use ruma::{events::SyncEphemeralRoomEvent, RoomId, UserId};

View file

@ -1,7 +1,7 @@
mod data; mod data;
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
sync::{Mutex}, sync::Mutex,
}; };
pub use data::Data; pub use data::Data;

View file

@ -1,6 +1,5 @@
mod data; mod data;
pub use data::Data; pub use data::Data;
use ruma::RoomId; use ruma::RoomId;

View file

@ -1,6 +1,5 @@
mod data; mod data;
pub use data::Data; pub use data::Data;
use ruma::{signatures::CanonicalJsonObject, EventId}; use ruma::{signatures::CanonicalJsonObject, EventId};

View file

@ -1,6 +1,5 @@
mod data; mod data;
pub use data::Data; pub use data::Data;
use crate::Result; use crate::Result;

View file

@ -1,6 +1,5 @@
mod data; mod data;
pub use data::Data; pub use data::Data;
use ruma::{RoomId, UserId}; use ruma::{RoomId, UserId};

View file

@ -12,7 +12,7 @@ use std::{
use crate::{ use crate::{
api::{appservice_server, server_server}, api::{appservice_server, server_server},
services, services,
utils::{calculate_hash}, utils::calculate_hash,
Config, Error, PduEvent, Result, Config, Error, PduEvent, Result,
}; };
use federation::transactions::send_transaction_message; use federation::transactions::send_transaction_message;
@ -37,7 +37,7 @@ use ruma::{
}; };
use tokio::{ use tokio::{
select, select,
sync::{mpsc, Semaphore}, sync::{mpsc, Mutex, Semaphore},
}; };
use tracing::{error, warn}; use tracing::{error, warn};
@ -88,6 +88,7 @@ pub struct Service {
/// The state for a given state hash. /// The state for a given state hash.
pub(super) maximum_requests: Arc<Semaphore>, pub(super) maximum_requests: Arc<Semaphore>,
pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec<u8>)>, pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec<u8>)>,
receiver: Mutex<mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec<u8>)>>,
} }
enum TransactionStatus { enum TransactionStatus {
@ -99,25 +100,24 @@ enum TransactionStatus {
impl Service { impl Service {
pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> { pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> {
let (sender, receiver) = mpsc::unbounded_channel(); let (sender, receiver) = mpsc::unbounded_channel();
Arc::new(Self {
let self1 = Arc::new(Self {
db, db,
sender, sender,
receiver: Mutex::new(receiver),
maximum_requests: Arc::new(Semaphore::new(config.max_concurrent_requests as usize)), maximum_requests: Arc::new(Semaphore::new(config.max_concurrent_requests as usize)),
}); })
let self2 = Arc::clone(&self1);
tokio::spawn(async move {
self2.start_handler(receiver).await.unwrap();
});
self1
} }
async fn start_handler( pub fn start_handler(self: &Arc<Self>) {
&self, let self2 = Arc::clone(&self);
mut receiver: mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec<u8>)>, tokio::spawn(async move {
) -> Result<()> { self2.handler().await.unwrap();
});
}
async fn handler(&self) -> Result<()> {
let mut receiver = self.receiver.lock().await;
let mut futures = FuturesUnordered::new(); let mut futures = FuturesUnordered::new();
let mut current_transaction_status = HashMap::<OutgoingKind, TransactionStatus>::new(); let mut current_transaction_status = HashMap::<OutgoingKind, TransactionStatus>::new();

View file

@ -1,6 +1,5 @@
mod data; mod data;
pub use data::Data; pub use data::Data;
use crate::Result; use crate::Result;

View file

@ -1,6 +1,5 @@
mod data; mod data;
pub use data::Data; pub use data::Data;
use ruma::{ use ruma::{