2021-06-08 19:10:00 +03:00
pub mod abstraction ;
2020-07-29 22:27:49 +03:00
pub mod account_data ;
2020-11-09 14:21:04 +03:00
pub mod admin ;
2020-12-08 12:33:44 +03:00
pub mod appservice ;
2020-07-29 22:27:49 +03:00
pub mod globals ;
pub mod key_backups ;
pub mod media ;
2021-07-01 22:38:25 +03:00
pub mod proxy ;
2021-01-27 05:54:35 +03:00
pub mod pusher ;
2020-07-29 22:27:49 +03:00
pub mod rooms ;
2020-09-15 17:13:54 +03:00
pub mod sending ;
2020-08-25 14:24:38 +03:00
pub mod transaction_ids ;
2020-07-29 22:27:49 +03:00
pub mod uiaa ;
pub mod users ;
2020-05-03 18:25:31 +03:00
2021-05-30 22:55:43 +03:00
use crate ::{ utils , Error , Result } ;
2021-06-08 19:10:00 +03:00
use abstraction ::DatabaseEngine ;
2020-03-30 14:46:18 +03:00
use directories ::ProjectDirs ;
2021-06-30 10:52:01 +03:00
use lru_cache ::LruCache ;
2021-07-14 10:07:08 +03:00
use rocket ::{
futures ::{ channel ::mpsc , stream ::FuturesUnordered , StreamExt } ,
2021-07-14 12:28:24 +03:00
outcome ::{ try_outcome , IntoOutcome } ,
2021-07-14 10:07:08 +03:00
request ::{ FromRequest , Request } ,
2021-07-14 15:50:07 +03:00
Shutdown , State ,
2021-07-14 10:07:08 +03:00
} ;
2021-08-01 16:14:54 +03:00
use ruma ::{ DeviceId , EventId , RoomId , ServerName , UserId } ;
2021-07-14 10:07:08 +03:00
use serde ::{ de ::IgnoredAny , Deserialize } ;
2021-06-08 19:24:36 +03:00
use std ::{
2021-08-01 16:14:54 +03:00
collections ::{ BTreeMap , HashMap , HashSet } ,
2021-08-13 00:04:00 +03:00
convert ::{ TryFrom , TryInto } ,
2021-06-08 19:24:36 +03:00
fs ::{ self , remove_dir_all } ,
io ::Write ,
2021-08-01 16:14:54 +03:00
mem ::size_of ,
2021-07-14 10:07:08 +03:00
ops ::Deref ,
path ::Path ,
2021-07-18 21:43:39 +03:00
sync ::{ Arc , Mutex , RwLock } ,
2021-06-08 19:24:36 +03:00
} ;
2021-07-14 10:07:08 +03:00
use tokio ::sync ::{ OwnedRwLockReadGuard , RwLock as TokioRwLock , Semaphore } ;
2021-07-29 09:36:01 +03:00
use tracing ::{ debug , error , warn } ;
2020-12-05 23:03:43 +03:00
2021-07-01 22:38:25 +03:00
use self ::proxy ::ProxyConfig ;
2021-01-15 05:32:22 +03:00
#[ derive(Clone, Debug, Deserialize) ]
2020-12-05 23:03:43 +03:00
pub struct Config {
server_name : Box < ServerName > ,
2020-12-31 23:07:05 +03:00
database_path : String ,
2021-07-14 10:07:08 +03:00
#[ serde(default = " default_db_cache_capacity_mb " ) ]
db_cache_capacity_mb : f64 ,
2021-09-01 12:03:12 +03:00
#[ serde(default = " default_pdu_cache_capacity " ) ]
pdu_cache_capacity : u32 ,
2021-07-14 10:07:08 +03:00
#[ serde(default = " default_sqlite_wal_clean_second_interval " ) ]
sqlite_wal_clean_second_interval : u32 ,
2020-12-05 23:03:43 +03:00
#[ serde(default = " default_max_request_size " ) ]
max_request_size : u32 ,
2020-12-19 18:00:11 +03:00
#[ serde(default = " default_max_concurrent_requests " ) ]
max_concurrent_requests : u16 ,
2021-08-31 19:25:35 +03:00
#[ serde(default = " false_fn " ) ]
2021-01-01 15:47:53 +03:00
allow_registration : bool ,
#[ serde(default = " true_fn " ) ]
allow_encryption : bool ,
#[ serde(default = " false_fn " ) ]
allow_federation : bool ,
2021-09-24 10:16:34 +03:00
#[ serde(default = " true_fn " ) ]
allow_room_creation : bool ,
2021-02-28 14:41:03 +03:00
#[ serde(default = " false_fn " ) ]
pub allow_jaeger : bool ,
2021-07-29 09:36:01 +03:00
#[ serde(default = " false_fn " ) ]
pub tracing_flame : bool ,
2021-04-13 21:15:58 +03:00
#[ serde(default) ]
proxy : ProxyConfig ,
2021-02-07 19:38:45 +03:00
jwt_secret : Option < String > ,
2021-03-01 17:17:53 +03:00
#[ serde(default = " Vec::new " ) ]
trusted_servers : Vec < Box < ServerName > > ,
2021-03-24 00:01:14 +03:00
#[ serde(default = " default_log " ) ]
pub log : String ,
2021-10-01 16:53:16 +03:00
#[ serde(default) ]
turn_username : String ,
#[ serde(default) ]
turn_password : String ,
#[ serde(default = " Vec::new " ) ]
turn_uris : Vec < String > ,
2021-10-02 01:37:39 +03:00
#[ serde(default) ]
turn_secret : String ,
2021-10-01 16:53:16 +03:00
#[ serde(default = " default_turn_ttl " ) ]
turn_ttl : u64 ,
2021-07-14 10:07:08 +03:00
#[ serde(flatten) ]
catchall : BTreeMap < String , IgnoredAny > ,
}
const DEPRECATED_KEYS : & [ & str ] = & [ " cache_capacity " ] ;
impl Config {
pub fn warn_deprecated ( & self ) {
let mut was_deprecated = false ;
for key in self
. catchall
. keys ( )
. filter ( | key | DEPRECATED_KEYS . iter ( ) . any ( | s | s = = key ) )
{
2021-07-29 09:36:01 +03:00
warn! ( " Config parameter {} is deprecated " , key ) ;
2021-07-14 10:07:08 +03:00
was_deprecated = true ;
}
if was_deprecated {
2021-07-29 09:36:01 +03:00
warn! ( " Read conduit documentation and check your configuration if any new configuration parameters should be adjusted " ) ;
2021-07-14 10:07:08 +03:00
}
}
2021-01-01 15:47:53 +03:00
}
fn false_fn ( ) -> bool {
false
}
fn true_fn ( ) -> bool {
true
2020-12-05 23:03:43 +03:00
}
2021-07-14 10:07:08 +03:00
fn default_db_cache_capacity_mb ( ) -> f64 {
200.0
}
2021-09-01 12:03:12 +03:00
fn default_pdu_cache_capacity ( ) -> u32 {
100_000
}
2021-07-14 10:07:08 +03:00
fn default_sqlite_wal_clean_second_interval ( ) -> u32 {
2021-08-14 20:07:50 +03:00
1 * 60 // every minute
2020-12-05 23:03:43 +03:00
}
fn default_max_request_size ( ) -> u32 {
20 * 1024 * 1024 // Default to 20 MB
}
2020-05-06 16:36:44 +03:00
2020-12-19 18:00:11 +03:00
fn default_max_concurrent_requests ( ) -> u16 {
2021-05-24 18:59:06 +03:00
100
2020-12-19 18:00:11 +03:00
}
2021-03-24 00:01:14 +03:00
fn default_log ( ) -> String {
2021-03-26 13:10:45 +03:00
" info,state_res=warn,rocket=off,_=off,sled=off " . to_owned ( )
2021-03-24 00:01:14 +03:00
}
2021-10-01 16:53:16 +03:00
fn default_turn_ttl ( ) -> u64 {
60 * 60 * 24
}
2021-06-12 16:04:28 +03:00
#[ cfg(feature = " sled " ) ]
2021-07-14 10:07:08 +03:00
pub type Engine = abstraction ::sled ::Engine ;
2021-06-12 16:04:28 +03:00
2021-07-14 10:07:08 +03:00
#[ cfg(feature = " sqlite " ) ]
pub type Engine = abstraction ::sqlite ::Engine ;
2021-06-08 19:10:00 +03:00
2021-07-29 21:17:47 +03:00
#[ cfg(feature = " heed " ) ]
pub type Engine = abstraction ::heed ::Engine ;
2020-03-30 14:46:18 +03:00
pub struct Database {
2021-07-14 10:07:08 +03:00
_db : Arc < Engine > ,
2020-05-03 18:25:31 +03:00
pub globals : globals ::Globals ,
pub users : users ::Users ,
2020-06-06 19:44:50 +03:00
pub uiaa : uiaa ::Uiaa ,
2020-05-03 18:25:31 +03:00
pub rooms : rooms ::Rooms ,
pub account_data : account_data ::AccountData ,
2020-05-18 18:53:34 +03:00
pub media : media ::Media ,
2020-06-16 13:11:38 +03:00
pub key_backups : key_backups ::KeyBackups ,
2020-08-25 14:24:38 +03:00
pub transaction_ids : transaction_ids ::TransactionIds ,
2020-09-15 17:13:54 +03:00
pub sending : sending ::Sending ,
2020-11-09 14:21:04 +03:00
pub admin : admin ::Admin ,
2020-12-08 12:33:44 +03:00
pub appservice : appservice ::Appservice ,
2021-01-27 05:54:35 +03:00
pub pusher : pusher ::PushData ,
2020-03-30 14:46:18 +03:00
}
impl Database {
2020-04-10 14:36:57 +03:00
/// Tries to remove the old database but ignores all errors.
2020-06-09 16:13:17 +03:00
pub fn try_remove ( server_name : & str ) -> Result < ( ) > {
2020-04-11 21:03:22 +03:00
let mut path = ProjectDirs ::from ( " xyz " , " koesters " , " conduit " )
2020-11-15 14:17:21 +03:00
. ok_or_else ( | | Error ::bad_config ( " The OS didn't return a valid home directory path. " ) ) ?
2020-04-10 14:36:57 +03:00
. data_dir ( )
. to_path_buf ( ) ;
2020-05-06 16:36:44 +03:00
path . push ( server_name ) ;
2020-04-10 14:36:57 +03:00
let _ = remove_dir_all ( path ) ;
2020-06-09 16:13:17 +03:00
Ok ( ( ) )
2020-04-10 14:36:57 +03:00
}
2021-07-14 10:07:08 +03:00
fn check_sled_or_sqlite_db ( config : & Config ) -> Result < ( ) > {
2021-07-30 13:11:06 +03:00
#[ cfg(feature = " backend_sqlite " ) ]
{
let path = Path ::new ( & config . database_path ) ;
let sled_exists = path . join ( " db " ) . exists ( ) ;
let sqlite_exists = path . join ( " conduit.db " ) . exists ( ) ;
if sled_exists {
if sqlite_exists {
// most likely an in-place directory, only warn
warn! ( " Both sled and sqlite databases are detected in database directory " ) ;
warn! ( " Currently running from the sqlite database, but consider removing sled database files to free up space " )
} else {
error! (
" Sled database detected, conduit now uses sqlite for database operations "
) ;
error! ( " This database must be converted to sqlite, go to https://github.com/ShadowJonathan/conduit_toolbox#conduit_sled_to_sqlite " ) ;
return Err ( Error ::bad_config (
" sled database detected, migrate to sqlite " ,
) ) ;
}
2021-07-14 10:07:08 +03:00
}
}
Ok ( ( ) )
}
2020-03-30 14:46:18 +03:00
/// Load an existing database or create a new one.
2021-07-14 15:50:07 +03:00
pub async fn load_or_create ( config : & Config ) -> Result < Arc < TokioRwLock < Self > > > {
2021-09-13 20:45:56 +03:00
Self ::check_sled_or_sqlite_db ( config ) ? ;
2021-07-14 10:07:08 +03:00
2021-09-07 21:41:14 +03:00
if ! Path ::new ( & config . database_path ) . exists ( ) {
std ::fs ::create_dir_all ( & config . database_path )
. map_err ( | _ | Error ::BadConfig ( " Database folder doesn't exists and couldn't be created (e.g. due to missing permissions). Please create the database folder yourself. " ) ) ? ;
}
2021-09-13 20:45:56 +03:00
let builder = Engine ::open ( config ) ? ;
2020-10-21 22:43:59 +03:00
2021-05-22 11:34:19 +03:00
if config . max_request_size < 1024 {
eprintln! ( " ERROR: Max request size is less than 1KB. Please increase it. " ) ;
}
2020-03-30 14:46:18 +03:00
2020-11-09 14:21:04 +03:00
let ( admin_sender , admin_receiver ) = mpsc ::unbounded ( ) ;
2021-06-08 19:10:00 +03:00
let ( sending_sender , sending_receiver ) = mpsc ::unbounded ( ) ;
2020-11-09 14:21:04 +03:00
2021-07-14 10:07:08 +03:00
let db = Arc ::new ( TokioRwLock ::from ( Self {
_db : builder . clone ( ) ,
2020-05-03 18:25:31 +03:00
users : users ::Users {
2021-06-08 19:10:00 +03:00
userid_password : builder . open_tree ( " userid_password " ) ? ,
userid_displayname : builder . open_tree ( " userid_displayname " ) ? ,
userid_avatarurl : builder . open_tree ( " userid_avatarurl " ) ? ,
2021-07-16 00:17:58 +03:00
userid_blurhash : builder . open_tree ( " userid_blurhash " ) ? ,
2021-06-08 19:10:00 +03:00
userdeviceid_token : builder . open_tree ( " userdeviceid_token " ) ? ,
userdeviceid_metadata : builder . open_tree ( " userdeviceid_metadata " ) ? ,
userid_devicelistversion : builder . open_tree ( " userid_devicelistversion " ) ? ,
token_userdeviceid : builder . open_tree ( " token_userdeviceid " ) ? ,
onetimekeyid_onetimekeys : builder . open_tree ( " onetimekeyid_onetimekeys " ) ? ,
userid_lastonetimekeyupdate : builder . open_tree ( " userid_lastonetimekeyupdate " ) ? ,
keychangeid_userid : builder . open_tree ( " keychangeid_userid " ) ? ,
keyid_key : builder . open_tree ( " keyid_key " ) ? ,
userid_masterkeyid : builder . open_tree ( " userid_masterkeyid " ) ? ,
userid_selfsigningkeyid : builder . open_tree ( " userid_selfsigningkeyid " ) ? ,
userid_usersigningkeyid : builder . open_tree ( " userid_usersigningkeyid " ) ? ,
todeviceid_events : builder . open_tree ( " todeviceid_events " ) ? ,
2020-05-03 18:25:31 +03:00
} ,
2020-06-06 19:44:50 +03:00
uiaa : uiaa ::Uiaa {
2021-06-08 19:10:00 +03:00
userdevicesessionid_uiaainfo : builder . open_tree ( " userdevicesessionid_uiaainfo " ) ? ,
userdevicesessionid_uiaarequest : builder
. open_tree ( " userdevicesessionid_uiaarequest " ) ? ,
2020-06-06 19:44:50 +03:00
} ,
2020-05-03 18:25:31 +03:00
rooms : rooms ::Rooms {
edus : rooms ::RoomEdus {
2021-06-08 19:10:00 +03:00
readreceiptid_readreceipt : builder . open_tree ( " readreceiptid_readreceipt " ) ? ,
roomuserid_privateread : builder . open_tree ( " roomuserid_privateread " ) ? , // "Private" read receipt
roomuserid_lastprivatereadupdate : builder
2021-03-23 14:59:27 +03:00
. open_tree ( " roomuserid_lastprivatereadupdate " ) ? ,
2021-06-08 19:10:00 +03:00
typingid_userid : builder . open_tree ( " typingid_userid " ) ? ,
roomid_lasttypingupdate : builder . open_tree ( " roomid_lasttypingupdate " ) ? ,
presenceid_presence : builder . open_tree ( " presenceid_presence " ) ? ,
userid_lastpresenceupdate : builder . open_tree ( " userid_lastpresenceupdate " ) ? ,
2020-05-03 18:25:31 +03:00
} ,
2021-06-08 19:10:00 +03:00
pduid_pdu : builder . open_tree ( " pduid_pdu " ) ? ,
eventid_pduid : builder . open_tree ( " eventid_pduid " ) ? ,
roomid_pduleaves : builder . open_tree ( " roomid_pduleaves " ) ? ,
alias_roomid : builder . open_tree ( " alias_roomid " ) ? ,
aliasid_alias : builder . open_tree ( " aliasid_alias " ) ? ,
publicroomids : builder . open_tree ( " publicroomids " ) ? ,
tokenids : builder . open_tree ( " tokenids " ) ? ,
roomserverids : builder . open_tree ( " roomserverids " ) ? ,
serverroomids : builder . open_tree ( " serverroomids " ) ? ,
userroomid_joined : builder . open_tree ( " userroomid_joined " ) ? ,
roomuserid_joined : builder . open_tree ( " roomuserid_joined " ) ? ,
2021-08-04 22:15:01 +03:00
roomid_joinedcount : builder . open_tree ( " roomid_joinedcount " ) ? ,
2021-08-28 12:39:33 +03:00
roomid_invitedcount : builder . open_tree ( " roomid_invitedcount " ) ? ,
2021-06-08 19:10:00 +03:00
roomuseroncejoinedids : builder . open_tree ( " roomuseroncejoinedids " ) ? ,
userroomid_invitestate : builder . open_tree ( " userroomid_invitestate " ) ? ,
roomuserid_invitecount : builder . open_tree ( " roomuserid_invitecount " ) ? ,
userroomid_leftstate : builder . open_tree ( " userroomid_leftstate " ) ? ,
roomuserid_leftcount : builder . open_tree ( " roomuserid_leftcount " ) ? ,
userroomid_notificationcount : builder . open_tree ( " userroomid_notificationcount " ) ? ,
userroomid_highlightcount : builder . open_tree ( " userroomid_highlightcount " ) ? ,
statekey_shortstatekey : builder . open_tree ( " statekey_shortstatekey " ) ? ,
2021-08-24 20:10:31 +03:00
shortstatekey_statekey : builder . open_tree ( " shortstatekey_statekey " ) ? ,
2021-08-01 16:14:54 +03:00
2021-08-26 15:18:19 +03:00
shorteventid_authchain : builder . open_tree ( " shorteventid_authchain " ) ? ,
2021-08-01 16:14:54 +03:00
roomid_shortroomid : builder . open_tree ( " roomid_shortroomid " ) ? ,
shortstatehash_statediff : builder . open_tree ( " shortstatehash_statediff " ) ? ,
2021-06-08 19:10:00 +03:00
eventid_shorteventid : builder . open_tree ( " eventid_shorteventid " ) ? ,
shorteventid_eventid : builder . open_tree ( " shorteventid_eventid " ) ? ,
shorteventid_shortstatehash : builder . open_tree ( " shorteventid_shortstatehash " ) ? ,
roomid_shortstatehash : builder . open_tree ( " roomid_shortstatehash " ) ? ,
2021-08-25 18:36:10 +03:00
roomsynctoken_shortstatehash : builder . open_tree ( " roomsynctoken_shortstatehash " ) ? ,
2021-06-08 19:10:00 +03:00
statehash_shortstatehash : builder . open_tree ( " statehash_shortstatehash " ) ? ,
eventid_outlierpdu : builder . open_tree ( " eventid_outlierpdu " ) ? ,
2021-08-28 12:39:33 +03:00
softfailedeventids : builder . open_tree ( " softfailedeventids " ) ? ,
2021-07-29 09:36:01 +03:00
referencedevents : builder . open_tree ( " referencedevents " ) ? ,
2021-09-01 12:03:12 +03:00
pdu_cache : Mutex ::new ( LruCache ::new (
config
. pdu_cache_capacity
. try_into ( )
. expect ( " pdu cache capacity fits into usize " ) ,
) ) ,
2021-08-26 15:18:19 +03:00
auth_chain_cache : Mutex ::new ( LruCache ::new ( 1_000_000 ) ) ,
2021-08-12 18:55:16 +03:00
shorteventid_cache : Mutex ::new ( LruCache ::new ( 1_000_000 ) ) ,
2021-08-17 17:06:09 +03:00
eventidshort_cache : Mutex ::new ( LruCache ::new ( 1_000_000 ) ) ,
2021-08-24 20:10:31 +03:00
shortstatekey_cache : Mutex ::new ( LruCache ::new ( 1_000_000 ) ) ,
2021-08-17 17:06:09 +03:00
statekeyshort_cache : Mutex ::new ( LruCache ::new ( 1_000_000 ) ) ,
2021-08-28 12:39:33 +03:00
our_real_users_cache : RwLock ::new ( HashMap ::new ( ) ) ,
2021-08-29 21:00:02 +03:00
appservice_in_room_cache : RwLock ::new ( HashMap ::new ( ) ) ,
stateinfo_cache : Mutex ::new ( LruCache ::new ( 1000 ) ) ,
2020-05-03 18:25:31 +03:00
} ,
account_data : account_data ::AccountData {
2021-06-08 19:10:00 +03:00
roomuserdataid_accountdata : builder . open_tree ( " roomuserdataid_accountdata " ) ? ,
2021-07-30 13:11:06 +03:00
roomusertype_roomuserdataid : builder . open_tree ( " roomusertype_roomuserdataid " ) ? ,
2020-05-03 18:25:31 +03:00
} ,
2020-05-18 18:53:34 +03:00
media : media ::Media {
2021-06-08 19:10:00 +03:00
mediaid_file : builder . open_tree ( " mediaid_file " ) ? ,
2020-05-18 18:53:34 +03:00
} ,
2020-06-16 13:11:38 +03:00
key_backups : key_backups ::KeyBackups {
2021-06-08 19:10:00 +03:00
backupid_algorithm : builder . open_tree ( " backupid_algorithm " ) ? ,
backupid_etag : builder . open_tree ( " backupid_etag " ) ? ,
backupkeyid_backup : builder . open_tree ( " backupkeyid_backup " ) ? ,
2020-06-16 13:11:38 +03:00
} ,
2020-08-25 14:24:38 +03:00
transaction_ids : transaction_ids ::TransactionIds {
2021-06-08 19:10:00 +03:00
userdevicetxnid_response : builder . open_tree ( " userdevicetxnid_response " ) ? ,
2020-08-25 14:24:38 +03:00
} ,
2020-09-15 17:13:54 +03:00
sending : sending ::Sending {
2021-06-08 19:10:00 +03:00
servername_educount : builder . open_tree ( " servername_educount " ) ? ,
2021-07-29 21:17:47 +03:00
servernameevent_data : builder . open_tree ( " servernameevent_data " ) ? ,
servercurrentevent_data : builder . open_tree ( " servercurrentevent_data " ) ? ,
2021-03-18 02:09:57 +03:00
maximum_requests : Arc ::new ( Semaphore ::new ( config . max_concurrent_requests as usize ) ) ,
2021-06-08 19:10:00 +03:00
sender : sending_sender ,
2020-09-15 17:13:54 +03:00
} ,
2020-11-09 14:21:04 +03:00
admin : admin ::Admin {
sender : admin_sender ,
} ,
2020-12-08 12:33:44 +03:00
appservice : appservice ::Appservice {
cached_registrations : Arc ::new ( RwLock ::new ( HashMap ::new ( ) ) ) ,
2021-06-08 19:10:00 +03:00
id_appserviceregistrations : builder . open_tree ( " id_appserviceregistrations " ) ? ,
} ,
pusher : pusher ::PushData {
senderkey_pusher : builder . open_tree ( " senderkey_pusher " ) ? ,
2020-12-08 12:33:44 +03:00
} ,
2021-03-18 02:09:57 +03:00
globals : globals ::Globals ::load (
2021-06-08 19:10:00 +03:00
builder . open_tree ( " global " ) ? ,
builder . open_tree ( " server_signingkeys " ) ? ,
2021-07-14 10:07:08 +03:00
config . clone ( ) ,
2021-03-18 02:09:57 +03:00
) ? ,
2021-07-14 10:07:08 +03:00
} ) ) ;
{
let db = db . read ( ) . await ;
// MIGRATIONS
// TODO: database versions of new dbs should probably not be 0
if db . globals . database_version ( ) ? < 1 {
for ( roomserverid , _ ) in db . rooms . roomserverids . iter ( ) {
let mut parts = roomserverid . split ( | & b | b = = 0xff ) ;
let room_id = parts . next ( ) . expect ( " split always returns one element " ) ;
let servername = match parts . next ( ) {
Some ( s ) = > s ,
None = > {
error! ( " Migration: Invalid roomserverid in db. " ) ;
continue ;
}
} ;
let mut serverroomid = servername . to_vec ( ) ;
serverroomid . push ( 0xff ) ;
serverroomid . extend_from_slice ( room_id ) ;
2020-11-09 14:21:04 +03:00
2021-07-14 10:07:08 +03:00
db . rooms . serverroomids . insert ( & serverroomid , & [ ] ) ? ;
}
db . globals . bump_database_version ( 1 ) ? ;
2021-05-17 11:25:27 +03:00
2021-07-14 10:07:08 +03:00
println! ( " Migration: 0 -> 1 finished " ) ;
2021-05-17 11:25:27 +03:00
}
2021-07-14 10:07:08 +03:00
if db . globals . database_version ( ) ? < 2 {
// We accidentally inserted hashed versions of "" into the db instead of just ""
for ( userid , password ) in db . users . userid_password . iter ( ) {
let password = utils ::string_from_bytes ( & password ) ;
2021-05-17 11:25:27 +03:00
2021-07-14 10:07:08 +03:00
let empty_hashed_password = password . map_or ( false , | password | {
argon2 ::verify_encoded ( & password , b " " ) . unwrap_or ( false )
} ) ;
2021-05-17 11:25:27 +03:00
2021-07-14 10:07:08 +03:00
if empty_hashed_password {
db . users . userid_password . insert ( & userid , b " " ) ? ;
}
}
2021-05-30 22:55:43 +03:00
2021-07-14 10:07:08 +03:00
db . globals . bump_database_version ( 2 ) ? ;
2021-06-08 19:23:24 +03:00
2021-07-14 10:07:08 +03:00
println! ( " Migration: 1 -> 2 finished " ) ;
2021-05-30 22:55:43 +03:00
}
2021-07-14 10:07:08 +03:00
if db . globals . database_version ( ) ? < 3 {
// Move media to filesystem
for ( key , content ) in db . media . mediaid_file . iter ( ) {
2021-07-14 13:31:38 +03:00
if content . is_empty ( ) {
2021-07-14 10:07:08 +03:00
continue ;
}
2021-05-30 22:55:43 +03:00
2021-07-14 10:07:08 +03:00
let path = db . globals . get_media_file ( & key ) ;
let mut file = fs ::File ::create ( path ) ? ;
file . write_all ( & content ) ? ;
db . media . mediaid_file . insert ( & key , & [ ] ) ? ;
2021-06-08 19:23:24 +03:00
}
2021-07-14 10:07:08 +03:00
db . globals . bump_database_version ( 3 ) ? ;
2021-06-08 19:23:24 +03:00
2021-07-14 10:07:08 +03:00
println! ( " Migration: 2 -> 3 finished " ) ;
}
2021-06-12 19:40:33 +03:00
2021-07-14 10:07:08 +03:00
if db . globals . database_version ( ) ? < 4 {
// Add federated users to db as deactivated
for our_user in db . users . iter ( ) {
let our_user = our_user ? ;
if db . users . is_deactivated ( & our_user ) ? {
continue ;
}
for room in db . rooms . rooms_joined ( & our_user ) {
for user in db . rooms . room_members ( & room ? ) {
let user = user ? ;
if user . server_name ( ) ! = db . globals . server_name ( ) {
println! ( " Migration: Creating user {} " , user ) ;
db . users . create ( & user , None ) ? ;
}
2021-06-12 19:40:33 +03:00
}
}
}
2021-07-14 10:07:08 +03:00
db . globals . bump_database_version ( 4 ) ? ;
2021-06-12 19:40:33 +03:00
2021-07-14 10:07:08 +03:00
println! ( " Migration: 3 -> 4 finished " ) ;
}
2021-07-30 13:11:06 +03:00
if db . globals . database_version ( ) ? < 5 {
// Upgrade user data store
for ( roomuserdataid , _ ) in db . account_data . roomuserdataid_accountdata . iter ( ) {
let mut parts = roomuserdataid . split ( | & b | b = = 0xff ) ;
let room_id = parts . next ( ) . unwrap ( ) ;
2021-07-30 19:05:26 +03:00
let user_id = parts . next ( ) . unwrap ( ) ;
2021-07-30 13:11:06 +03:00
let event_type = roomuserdataid . rsplit ( | & b | b = = 0xff ) . next ( ) . unwrap ( ) ;
let mut key = room_id . to_vec ( ) ;
key . push ( 0xff ) ;
key . extend_from_slice ( user_id ) ;
key . push ( 0xff ) ;
key . extend_from_slice ( event_type ) ;
db . account_data
. roomusertype_roomuserdataid
. insert ( & key , & roomuserdataid ) ? ;
}
db . globals . bump_database_version ( 5 ) ? ;
println! ( " Migration: 4 -> 5 finished " ) ;
}
2021-08-04 22:15:01 +03:00
2021-08-04 22:17:40 +03:00
if db . globals . database_version ( ) ? < 6 {
2021-08-04 22:15:01 +03:00
// Set room member count
for ( roomid , _ ) in db . rooms . roomid_shortstatehash . iter ( ) {
let room_id =
2021-11-26 22:36:40 +03:00
Box ::< RoomId > ::try_from ( utils ::string_from_bytes ( & roomid ) . unwrap ( ) )
. unwrap ( ) ;
2021-08-04 22:15:01 +03:00
2021-08-28 12:39:33 +03:00
db . rooms . update_joined_count ( & room_id , & db ) ? ;
2021-08-04 22:15:01 +03:00
}
db . globals . bump_database_version ( 6 ) ? ;
println! ( " Migration: 5 -> 6 finished " ) ;
}
2021-08-01 16:14:54 +03:00
if db . globals . database_version ( ) ? < 7 {
// Upgrade state store
2021-11-26 22:36:40 +03:00
let mut last_roomstates : HashMap < Box < RoomId > , u64 > = HashMap ::new ( ) ;
2021-08-13 00:04:00 +03:00
let mut current_sstatehash : Option < u64 > = None ;
2021-08-01 16:14:54 +03:00
let mut current_room = None ;
let mut current_state = HashSet ::new ( ) ;
let mut counter = 0 ;
2021-08-13 00:04:00 +03:00
let mut handle_state =
| current_sstatehash : u64 ,
current_room : & RoomId ,
current_state : HashSet < _ > ,
last_roomstates : & mut HashMap < _ , _ > | {
counter + = 1 ;
println! ( " counter: {} " , counter ) ;
let last_roomsstatehash = last_roomstates . get ( current_room ) ;
let states_parents = last_roomsstatehash . map_or_else (
| | Ok ( Vec ::new ( ) ) ,
| & last_roomsstatehash | {
db . rooms . load_shortstatehash_info ( dbg! ( last_roomsstatehash ) )
} ,
) ? ;
let ( statediffnew , statediffremoved ) =
if let Some ( parent_stateinfo ) = states_parents . last ( ) {
let statediffnew = current_state
. difference ( & parent_stateinfo . 1 )
2021-10-13 11:24:39 +03:00
. copied ( )
2021-08-13 00:04:00 +03:00
. collect ::< HashSet < _ > > ( ) ;
let statediffremoved = parent_stateinfo
. 1
. difference ( & current_state )
2021-10-13 11:24:39 +03:00
. copied ( )
2021-08-13 00:04:00 +03:00
. collect ::< HashSet < _ > > ( ) ;
( statediffnew , statediffremoved )
} else {
( current_state , HashSet ::new ( ) )
} ;
db . rooms . save_state_from_diff (
dbg! ( current_sstatehash ) ,
statediffnew ,
statediffremoved ,
2 , // every state change is 2 event changes on average
states_parents ,
) ? ;
/*
let mut tmp = db . rooms . load_shortstatehash_info ( & current_sstatehash , & db ) ? ;
let state = tmp . pop ( ) . unwrap ( ) ;
println! (
" {} \t {}{:?}: {:?} + {:?} - {:?} " ,
current_room ,
" " . repeat ( tmp . len ( ) ) ,
utils ::u64_from_bytes ( & current_sstatehash ) . unwrap ( ) ,
tmp . last ( ) . map ( | b | utils ::u64_from_bytes ( & b . 0 ) . unwrap ( ) ) ,
state
. 2
. iter ( )
. map ( | b | utils ::u64_from_bytes ( & b [ size_of ::< u64 > ( ) .. ] ) . unwrap ( ) )
. collect ::< Vec < _ > > ( ) ,
state
. 3
. iter ( )
. map ( | b | utils ::u64_from_bytes ( & b [ size_of ::< u64 > ( ) .. ] ) . unwrap ( ) )
. collect ::< Vec < _ > > ( )
) ;
* /
Ok ::< _ , Error > ( ( ) )
} ;
2021-08-01 16:14:54 +03:00
for ( k , seventid ) in db . _db . open_tree ( " stateid_shorteventid " ) ? . iter ( ) {
2021-08-13 00:04:00 +03:00
let sstatehash = utils ::u64_from_bytes ( & k [ 0 .. size_of ::< u64 > ( ) ] )
. expect ( " number of bytes is correct " ) ;
2021-08-01 16:14:54 +03:00
let sstatekey = k [ size_of ::< u64 > ( ) .. ] . to_vec ( ) ;
2021-08-13 00:04:00 +03:00
if Some ( sstatehash ) ! = current_sstatehash {
if let Some ( current_sstatehash ) = current_sstatehash {
handle_state (
current_sstatehash ,
2021-11-26 22:36:40 +03:00
current_room . as_deref ( ) . unwrap ( ) ,
2021-08-13 00:04:00 +03:00
current_state ,
& mut last_roomstates ,
2021-08-01 16:14:54 +03:00
) ? ;
2021-08-13 00:04:00 +03:00
last_roomstates
. insert ( current_room . clone ( ) . unwrap ( ) , current_sstatehash ) ;
2021-08-01 16:14:54 +03:00
}
current_state = HashSet ::new ( ) ;
2021-08-13 00:04:00 +03:00
current_sstatehash = Some ( sstatehash ) ;
2021-08-01 16:14:54 +03:00
let event_id = db
. rooms
. shorteventid_eventid
. get ( & seventid )
. unwrap ( )
. unwrap ( ) ;
let event_id =
2021-11-26 22:36:40 +03:00
Box ::< EventId > ::try_from ( utils ::string_from_bytes ( & event_id ) . unwrap ( ) )
2021-08-01 16:14:54 +03:00
. unwrap ( ) ;
let pdu = db . rooms . get_pdu ( & event_id ) . unwrap ( ) . unwrap ( ) ;
if Some ( & pdu . room_id ) ! = current_room . as_ref ( ) {
current_room = Some ( pdu . room_id . clone ( ) ) ;
}
}
let mut val = sstatekey ;
val . extend_from_slice ( & seventid ) ;
2021-08-13 00:04:00 +03:00
current_state . insert ( val . try_into ( ) . expect ( " size is correct " ) ) ;
}
if let Some ( current_sstatehash ) = current_sstatehash {
handle_state (
current_sstatehash ,
2021-11-26 22:36:40 +03:00
current_room . as_deref ( ) . unwrap ( ) ,
2021-08-13 00:04:00 +03:00
current_state ,
& mut last_roomstates ,
) ? ;
2021-08-01 16:14:54 +03:00
}
db . globals . bump_database_version ( 7 ) ? ;
println! ( " Migration: 6 -> 7 finished " ) ;
}
if db . globals . database_version ( ) ? < 8 {
// Generate short room ids for all rooms
for ( room_id , _ ) in db . rooms . roomid_shortstatehash . iter ( ) {
let shortroomid = db . globals . next_count ( ) ? . to_be_bytes ( ) ;
db . rooms . roomid_shortroomid . insert ( & room_id , & shortroomid ) ? ;
2021-08-02 23:32:28 +03:00
println! ( " Migration: 8 " ) ;
2021-08-01 16:14:54 +03:00
}
// Update pduids db layout
2021-08-02 23:32:28 +03:00
let mut batch = db . rooms . pduid_pdu . iter ( ) . filter_map ( | ( key , v ) | {
if ! key . starts_with ( b " ! " ) {
return None ;
}
2021-08-01 16:14:54 +03:00
let mut parts = key . splitn ( 2 , | & b | b = = 0xff ) ;
let room_id = parts . next ( ) . unwrap ( ) ;
let count = parts . next ( ) . unwrap ( ) ;
2021-08-02 23:32:28 +03:00
let short_room_id = db
. rooms
. roomid_shortroomid
2021-09-13 20:45:56 +03:00
. get ( room_id )
2021-08-02 23:32:28 +03:00
. unwrap ( )
. expect ( " shortroomid should exist " ) ;
2021-08-01 16:14:54 +03:00
let mut new_key = short_room_id ;
new_key . extend_from_slice ( count ) ;
2021-08-02 23:32:28 +03:00
Some ( ( new_key , v ) )
} ) ;
db . rooms . pduid_pdu . insert_batch ( & mut batch ) ? ;
2021-08-13 00:04:00 +03:00
let mut batch2 = db . rooms . eventid_pduid . iter ( ) . filter_map ( | ( k , value ) | {
if ! value . starts_with ( b " ! " ) {
return None ;
2021-08-02 23:32:28 +03:00
}
2021-08-13 00:04:00 +03:00
let mut parts = value . splitn ( 2 , | & b | b = = 0xff ) ;
let room_id = parts . next ( ) . unwrap ( ) ;
let count = parts . next ( ) . unwrap ( ) ;
let short_room_id = db
. rooms
. roomid_shortroomid
2021-09-13 20:45:56 +03:00
. get ( room_id )
2021-08-13 00:04:00 +03:00
. unwrap ( )
. expect ( " shortroomid should exist " ) ;
let mut new_value = short_room_id ;
new_value . extend_from_slice ( count ) ;
Some ( ( k , new_value ) )
} ) ;
db . rooms . eventid_pduid . insert_batch ( & mut batch2 ) ? ;
2021-08-01 16:14:54 +03:00
2021-08-02 23:32:28 +03:00
db . globals . bump_database_version ( 8 ) ? ;
println! ( " Migration: 7 -> 8 finished " ) ;
}
if db . globals . database_version ( ) ? < 9 {
2021-08-01 16:14:54 +03:00
// Update tokenids db layout
2021-08-31 22:20:03 +03:00
let mut iter = db
2021-08-21 15:24:10 +03:00
. rooms
. tokenids
. iter ( )
. filter_map ( | ( key , _ ) | {
if ! key . starts_with ( b " ! " ) {
return None ;
}
let mut parts = key . splitn ( 4 , | & b | b = = 0xff ) ;
let room_id = parts . next ( ) . unwrap ( ) ;
let word = parts . next ( ) . unwrap ( ) ;
let _pdu_id_room = parts . next ( ) . unwrap ( ) ;
let pdu_id_count = parts . next ( ) . unwrap ( ) ;
2021-08-01 16:14:54 +03:00
2021-08-21 15:24:10 +03:00
let short_room_id = db
. rooms
. roomid_shortroomid
2021-09-13 20:45:56 +03:00
. get ( room_id )
2021-08-21 15:24:10 +03:00
. unwrap ( )
. expect ( " shortroomid should exist " ) ;
let mut new_key = short_room_id ;
new_key . extend_from_slice ( word ) ;
new_key . push ( 0xff ) ;
new_key . extend_from_slice ( pdu_id_count ) ;
println! ( " old {:?} " , key ) ;
println! ( " new {:?} " , new_key ) ;
Some ( ( new_key , Vec ::new ( ) ) )
} )
2021-08-31 22:20:03 +03:00
. peekable ( ) ;
2021-08-02 23:32:28 +03:00
2021-08-21 15:22:21 +03:00
while iter . peek ( ) . is_some ( ) {
2021-08-21 15:24:10 +03:00
db . rooms
. tokenids
. insert_batch ( & mut iter . by_ref ( ) . take ( 1000 ) ) ? ;
2021-08-21 15:22:21 +03:00
println! ( " smaller batch done " ) ;
}
2021-08-02 23:32:28 +03:00
2021-08-21 15:22:21 +03:00
println! ( " Deleting starts " ) ;
2021-10-13 12:51:30 +03:00
let batch2 : Vec < _ > = db
2021-08-21 15:24:10 +03:00
. rooms
. tokenids
. iter ( )
. filter_map ( | ( key , _ ) | {
if key . starts_with ( b " ! " ) {
println! ( " del {:?} " , key ) ;
Some ( key )
} else {
None
}
} )
2021-10-13 12:51:30 +03:00
. collect ( ) ;
2021-08-21 15:22:21 +03:00
for key in batch2 {
println! ( " del " ) ;
db . rooms . tokenids . remove ( & key ) ? ;
2021-08-01 16:14:54 +03:00
}
2021-08-02 23:32:28 +03:00
db . globals . bump_database_version ( 9 ) ? ;
2021-08-01 16:14:54 +03:00
2021-08-02 23:32:28 +03:00
println! ( " Migration: 8 -> 9 finished " ) ;
2021-08-01 16:14:54 +03:00
}
2021-08-24 20:10:31 +03:00
if db . globals . database_version ( ) ? < 10 {
// Add other direction for shortstatekeys
for ( statekey , shortstatekey ) in db . rooms . statekey_shortstatekey . iter ( ) {
db . rooms
. shortstatekey_statekey
. insert ( & shortstatekey , & statekey ) ? ;
}
2021-08-25 18:40:10 +03:00
// Force E2EE device list updates so we can send them over federation
for user_id in db . users . iter ( ) . filter_map ( | r | r . ok ( ) ) {
db . users
. mark_device_key_update ( & user_id , & db . rooms , & db . globals ) ? ;
}
2021-08-24 20:10:31 +03:00
db . globals . bump_database_version ( 10 ) ? ;
println! ( " Migration: 9 -> 10 finished " ) ;
}
2021-06-12 19:40:33 +03:00
}
2021-07-14 10:07:08 +03:00
let guard = db . read ( ) . await ;
2021-05-12 21:04:28 +03:00
// This data is probably outdated
2021-07-14 10:07:08 +03:00
guard . rooms . edus . presenceid_presence . clear ( ) ? ;
guard . admin . start_handler ( Arc ::clone ( & db ) , admin_receiver ) ;
guard
. sending
. start_handler ( Arc ::clone ( & db ) , sending_receiver ) ;
2021-05-12 21:04:28 +03:00
2021-07-14 10:07:08 +03:00
drop ( guard ) ;
#[ cfg(feature = " sqlite " ) ]
2021-07-19 16:56:20 +03:00
{
2021-09-13 20:45:56 +03:00
Self ::start_wal_clean_task ( Arc ::clone ( & db ) , config ) . await ;
2021-07-19 16:56:20 +03:00
}
2020-11-09 14:21:04 +03:00
Ok ( db )
2020-03-30 14:46:18 +03:00
}
2020-07-27 18:36:54 +03:00
2021-07-14 15:50:07 +03:00
#[ cfg(feature = " conduit_bin " ) ]
pub async fn start_on_shutdown_tasks ( db : Arc < TokioRwLock < Self > > , shutdown : Shutdown ) {
2021-07-29 09:36:01 +03:00
use tracing ::info ;
2021-07-14 15:50:07 +03:00
tokio ::spawn ( async move {
shutdown . await ;
2021-07-29 09:36:01 +03:00
info! ( target : " shutdown-sync " , " Received shutdown notification, notifying sync helpers... " ) ;
2021-07-14 15:50:07 +03:00
db . read ( ) . await . globals . rotate . fire ( ) ;
} ) ;
}
2020-07-28 16:00:23 +03:00
pub async fn watch ( & self , user_id : & UserId , device_id : & DeviceId ) {
2021-04-05 22:44:21 +03:00
let userid_bytes = user_id . as_bytes ( ) . to_vec ( ) ;
2020-07-30 15:05:08 +03:00
let mut userid_prefix = userid_bytes . clone ( ) ;
2020-07-27 18:36:54 +03:00
userid_prefix . push ( 0xff ) ;
2020-07-30 15:05:08 +03:00
2020-07-27 18:36:54 +03:00
let mut userdeviceid_prefix = userid_prefix . clone ( ) ;
userdeviceid_prefix . extend_from_slice ( device_id . as_bytes ( ) ) ;
userdeviceid_prefix . push ( 0xff ) ;
2021-06-08 19:10:00 +03:00
let mut futures = FuturesUnordered ::new ( ) ;
2020-07-27 18:36:54 +03:00
// Return when *any* user changed his key
// TODO: only send for user they share a room with
futures . push (
self . users
. todeviceid_events
. watch_prefix ( & userdeviceid_prefix ) ,
) ;
futures . push ( self . rooms . userroomid_joined . watch_prefix ( & userid_prefix ) ) ;
2021-04-11 22:01:27 +03:00
futures . push (
self . rooms
. userroomid_invitestate
. watch_prefix ( & userid_prefix ) ,
) ;
2021-04-13 16:00:45 +03:00
futures . push ( self . rooms . userroomid_leftstate . watch_prefix ( & userid_prefix ) ) ;
2021-08-03 12:24:21 +03:00
futures . push (
self . rooms
. userroomid_notificationcount
. watch_prefix ( & userid_prefix ) ,
) ;
futures . push (
self . rooms
. userroomid_highlightcount
. watch_prefix ( & userid_prefix ) ,
) ;
2020-07-27 18:36:54 +03:00
// Events for rooms we are in
for room_id in self . rooms . rooms_joined ( user_id ) . filter_map ( | r | r . ok ( ) ) {
2021-09-08 15:50:44 +03:00
let short_roomid = self
. rooms
. get_shortroomid ( & room_id )
. ok ( )
. flatten ( )
. expect ( " room exists " )
. to_be_bytes ( )
. to_vec ( ) ;
2021-04-05 22:44:21 +03:00
let roomid_bytes = room_id . as_bytes ( ) . to_vec ( ) ;
2020-08-22 23:02:32 +03:00
let mut roomid_prefix = roomid_bytes . clone ( ) ;
2020-07-27 18:36:54 +03:00
roomid_prefix . push ( 0xff ) ;
// PDUs
2021-09-08 15:50:44 +03:00
futures . push ( self . rooms . pduid_pdu . watch_prefix ( & short_roomid ) ) ;
2020-07-27 18:36:54 +03:00
// EDUs
futures . push (
self . rooms
. edus
2020-08-23 18:29:39 +03:00
. roomid_lasttypingupdate
2020-08-22 23:02:32 +03:00
. watch_prefix ( & roomid_bytes ) ,
2020-07-27 18:36:54 +03:00
) ;
futures . push (
self . rooms
. edus
2020-08-23 18:29:39 +03:00
. readreceiptid_readreceipt
2020-07-27 18:36:54 +03:00
. watch_prefix ( & roomid_prefix ) ,
) ;
2020-07-30 15:05:08 +03:00
// Key changes
futures . push ( self . users . keychangeid_userid . watch_prefix ( & roomid_prefix ) ) ;
2020-07-27 18:36:54 +03:00
// Room account data
let mut roomuser_prefix = roomid_prefix . clone ( ) ;
roomuser_prefix . extend_from_slice ( & userid_prefix ) ;
futures . push (
self . account_data
2021-07-30 13:11:06 +03:00
. roomusertype_roomuserdataid
2020-07-27 18:36:54 +03:00
. watch_prefix ( & roomuser_prefix ) ,
) ;
}
let mut globaluserdata_prefix = vec! [ 0xff ] ;
globaluserdata_prefix . extend_from_slice ( & userid_prefix ) ;
futures . push (
self . account_data
2021-07-30 13:11:06 +03:00
. roomusertype_roomuserdataid
2020-07-27 18:36:54 +03:00
. watch_prefix ( & globaluserdata_prefix ) ,
) ;
2020-07-30 15:05:08 +03:00
// More key changes (used when user is not joined to any rooms)
futures . push ( self . users . keychangeid_userid . watch_prefix ( & userid_prefix ) ) ;
// One time keys
futures . push (
self . users
. userid_lastonetimekeyupdate
. watch_prefix ( & userid_bytes ) ,
) ;
2021-07-14 10:07:08 +03:00
futures . push ( Box ::pin ( self . globals . rotate . watch ( ) ) ) ;
2020-07-27 18:36:54 +03:00
// Wait until one of them finds something
futures . next ( ) . await ;
}
2020-10-21 22:28:02 +03:00
2021-07-29 09:36:01 +03:00
#[ tracing::instrument(skip(self)) ]
2021-08-02 11:13:34 +03:00
pub fn flush ( & self ) -> Result < ( ) > {
2021-07-14 10:07:08 +03:00
let start = std ::time ::Instant ::now ( ) ;
let res = self . _db . flush ( ) ;
2021-07-29 09:36:01 +03:00
debug! ( " flush: took {:?} " , start . elapsed ( ) ) ;
2021-07-14 10:07:08 +03:00
res
}
#[ cfg(feature = " sqlite " ) ]
2021-07-29 09:36:01 +03:00
#[ tracing::instrument(skip(self)) ]
2021-07-14 10:07:08 +03:00
pub fn flush_wal ( & self ) -> Result < ( ) > {
self . _db . flush_wal ( )
}
#[ cfg(feature = " sqlite " ) ]
2021-08-01 17:59:52 +03:00
#[ tracing::instrument(skip(db, config)) ]
pub async fn start_wal_clean_task ( db : Arc < TokioRwLock < Self > > , config : & Config ) {
use tokio ::time ::interval ;
2021-07-15 19:09:10 +03:00
#[ cfg(unix) ]
use tokio ::signal ::unix ::{ signal , SignalKind } ;
2021-07-29 09:36:01 +03:00
use tracing ::info ;
2021-07-14 10:07:08 +03:00
2021-08-01 17:59:52 +03:00
use std ::time ::{ Duration , Instant } ;
2021-07-14 10:07:08 +03:00
let timer_interval = Duration ::from_secs ( config . sqlite_wal_clean_second_interval as u64 ) ;
tokio ::spawn ( async move {
let mut i = interval ( timer_interval ) ;
2021-07-15 19:09:10 +03:00
#[ cfg(unix) ]
2021-07-14 10:07:08 +03:00
let mut s = signal ( SignalKind ::hangup ( ) ) . unwrap ( ) ;
loop {
2021-07-15 19:09:10 +03:00
#[ cfg(unix) ]
tokio ::select! {
2021-08-01 17:59:52 +03:00
_ = i . tick ( ) = > {
info! ( " wal-trunc: Timer ticked " ) ;
2021-07-14 10:07:08 +03:00
}
_ = s . recv ( ) = > {
2021-08-01 17:59:52 +03:00
info! ( " wal-trunc: Received SIGHUP " ) ;
2021-07-14 10:07:08 +03:00
}
} ;
2021-07-15 19:09:10 +03:00
#[ cfg(not(unix)) ]
2021-08-01 17:59:52 +03:00
{
2021-07-15 19:09:10 +03:00
i . tick ( ) . await ;
2021-08-01 17:59:52 +03:00
info! ( " wal-trunc: Timer ticked " )
2021-07-15 19:09:10 +03:00
}
2021-08-01 17:59:52 +03:00
let start = Instant ::now ( ) ;
if let Err ( e ) = db . read ( ) . await . flush_wal ( ) {
error! ( " wal-trunc: Errored: {} " , e ) ;
2021-07-14 10:07:08 +03:00
} else {
2021-08-01 17:59:52 +03:00
info! ( " wal-trunc: Flushed in {:?} " , start . elapsed ( ) ) ;
2021-07-14 10:07:08 +03:00
}
}
} ) ;
}
}
pub struct DatabaseGuard ( OwnedRwLockReadGuard < Database > ) ;
impl Deref for DatabaseGuard {
type Target = OwnedRwLockReadGuard < Database > ;
fn deref ( & self ) -> & Self ::Target {
& self . 0
}
}
#[ rocket::async_trait ]
impl < ' r > FromRequest < ' r > for DatabaseGuard {
type Error = ( ) ;
async fn from_request ( req : & ' r Request < '_ > ) -> rocket ::request ::Outcome < Self , ( ) > {
2021-07-14 12:28:24 +03:00
let db = try_outcome! ( req . guard ::< & State < Arc < TokioRwLock < Database > > > > ( ) . await ) ;
2021-07-14 10:07:08 +03:00
2021-09-13 20:45:56 +03:00
Ok ( DatabaseGuard ( Arc ::clone ( db ) . read_owned ( ) . await ) ) . or_forward ( ( ) )
2021-07-14 10:07:08 +03:00
}
}
2021-07-14 13:31:38 +03:00
impl From < OwnedRwLockReadGuard < Database > > for DatabaseGuard {
fn from ( val : OwnedRwLockReadGuard < Database > ) -> Self {
Self ( val )
2020-10-21 22:28:02 +03:00
}
2020-03-30 14:46:18 +03:00
}