mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-04-22 14:10:16 +03:00
feat: add to media storage migration support
This commit is contained in:
parent
ae05ca1d46
commit
767db4e4a1
2 changed files with 147 additions and 29 deletions
|
@ -943,11 +943,8 @@ impl KeyValueDatabase {
|
|||
// Reconstruct all media using the filesystem
|
||||
db.mediaid_file.clear().unwrap();
|
||||
|
||||
for file in fs::read_dir(services().globals.get_media_folder()).unwrap() {
|
||||
let file = file.unwrap();
|
||||
let mediaid = general_purpose::URL_SAFE_NO_PAD
|
||||
.decode(file.file_name().into_string().unwrap())
|
||||
.unwrap();
|
||||
for file in services().globals.media_storage.list().await? {
|
||||
let mediaid = general_purpose::URL_SAFE_NO_PAD.decode(file).unwrap();
|
||||
|
||||
let mut parts = mediaid.rsplit(|&b| b == 0xff);
|
||||
|
||||
|
@ -992,19 +989,22 @@ impl KeyValueDatabase {
|
|||
new_key.extend_from_slice(content_type_bytes);
|
||||
|
||||
// Some file names are too long. Ignore those.
|
||||
match fs::rename(
|
||||
services().globals.get_media_file(&mediaid),
|
||||
services().globals.get_media_file(&new_key),
|
||||
) {
|
||||
match services()
|
||||
.globals
|
||||
.media_storage
|
||||
.rename(&mediaid, &new_key)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
db.mediaid_file.insert(&new_key, &[])?;
|
||||
}
|
||||
Err(_) => {
|
||||
fs::rename(
|
||||
services().globals.get_media_file(&mediaid),
|
||||
services().globals.get_media_file(&shorter_key),
|
||||
)
|
||||
.unwrap();
|
||||
services()
|
||||
.globals
|
||||
.media_storage
|
||||
.rename(&mediaid, &shorter_key)
|
||||
.await
|
||||
.unwrap();
|
||||
db.mediaid_file.insert(&shorter_key, &[])?;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@ use ruma::{
|
|||
OwnedRoomId, OwnedServerName, OwnedUserId, RoomAliasId,
|
||||
};
|
||||
use tokio::{
|
||||
fs::File,
|
||||
fs::{self, File},
|
||||
io::{AsyncReadExt, AsyncWriteExt, BufReader},
|
||||
};
|
||||
|
||||
|
@ -20,11 +20,10 @@ use ruma::{
|
|||
api::{client::sync::sync_events, federation::discovery::ServerSigningKeys},
|
||||
DeviceId, RoomVersionId, ServerName, UserId,
|
||||
};
|
||||
use rusty_s3::S3Action;
|
||||
use rusty_s3::{actions::ListObjectsV2, S3Action};
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap},
|
||||
error::Error as StdError,
|
||||
fs,
|
||||
future::{self, Future},
|
||||
iter,
|
||||
net::{IpAddr, SocketAddr},
|
||||
|
@ -40,7 +39,7 @@ use tokio::sync::{broadcast, watch::Receiver, Mutex, RwLock, Semaphore};
|
|||
use tower_service::Service as TowerService;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use base64::{engine::general_purpose, prelude, Engine as _};
|
||||
use base64::{engine::general_purpose, Engine as _};
|
||||
|
||||
type WellKnownMap = HashMap<OwnedServerName, DestinationResponse>;
|
||||
type TlsNameMap = HashMap<String, (Vec<IpAddr>, u16)>;
|
||||
|
@ -540,7 +539,7 @@ fn media_storage_builder(config: &Config) -> Result<MediaStorage> {
|
|||
path.push(config.database_path.clone());
|
||||
path.push("media");
|
||||
|
||||
fs::create_dir_all(&path)?;
|
||||
std::fs::create_dir_all(&path)?;
|
||||
|
||||
Ok(MediaStorage::File { path })
|
||||
}
|
||||
|
@ -588,12 +587,18 @@ pub enum MediaStorage {
|
|||
}
|
||||
|
||||
impl MediaStorage {
|
||||
fn get_file_name(key: &[u8]) -> String {
|
||||
general_purpose::URL_SAFE_NO_PAD.encode(key)
|
||||
}
|
||||
|
||||
pub async fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
|
||||
let key = Self::get_file_name(key);
|
||||
|
||||
match self {
|
||||
MediaStorage::File { path } => {
|
||||
let mut fname = PathBuf::new();
|
||||
fname.push(path);
|
||||
fname.push(general_purpose::URL_SAFE_NO_PAD.encode(key));
|
||||
fname.push(key);
|
||||
|
||||
let mut file = Vec::new();
|
||||
BufReader::new(File::open(fname).await?)
|
||||
|
@ -609,10 +614,7 @@ impl MediaStorage {
|
|||
http_client,
|
||||
} => {
|
||||
let url = bucket
|
||||
.get_object(
|
||||
Some(credentials),
|
||||
&prelude::BASE64_URL_SAFE_NO_PAD.encode(key),
|
||||
)
|
||||
.get_object(Some(credentials), &key)
|
||||
.sign(*duration);
|
||||
|
||||
let resp = http_client.get(url).send().await?;
|
||||
|
@ -628,11 +630,13 @@ impl MediaStorage {
|
|||
}
|
||||
|
||||
pub async fn put(&self, key: &[u8], data: &[u8]) -> Result<()> {
|
||||
let key = Self::get_file_name(key);
|
||||
|
||||
match self {
|
||||
MediaStorage::File { path } => {
|
||||
let mut fname = PathBuf::new();
|
||||
fname.push(path);
|
||||
fname.push(general_purpose::URL_SAFE_NO_PAD.encode(key));
|
||||
fname.push(key);
|
||||
|
||||
let mut f = File::create(fname).await?;
|
||||
f.write_all(data).await?;
|
||||
|
@ -646,10 +650,7 @@ impl MediaStorage {
|
|||
http_client,
|
||||
} => {
|
||||
let url = bucket
|
||||
.put_object(
|
||||
Some(credentials),
|
||||
&prelude::BASE64_URL_SAFE_NO_PAD.encode(key),
|
||||
)
|
||||
.put_object(Some(credentials), &key)
|
||||
.sign(*duration);
|
||||
|
||||
let resp = http_client.put(url).body(data.to_vec()).send().await?;
|
||||
|
@ -663,4 +664,121 @@ impl MediaStorage {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn list(&self) -> Result<Vec<String>> {
|
||||
match self {
|
||||
MediaStorage::File { path } => {
|
||||
let os_entries = std::fs::read_dir(path)?
|
||||
.map(|res| res.map(|e| e.file_name()))
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
let entries = os_entries
|
||||
.iter()
|
||||
.map(|res| res.clone().into_string())
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map_err(|_| Error::BadDatabase("failed to read media dir file name"))?;
|
||||
|
||||
Ok(entries)
|
||||
}
|
||||
MediaStorage::S3 {
|
||||
bucket,
|
||||
credentials,
|
||||
duration,
|
||||
http_client,
|
||||
} => {
|
||||
let mut result = vec![];
|
||||
|
||||
let mut req = bucket.list_objects_v2(Some(credentials));
|
||||
loop {
|
||||
let resp = http_client.get(req.sign(*duration)).send().await?;
|
||||
if !resp.status().is_success() {
|
||||
warn!("S3 request error:\n{}", resp.text().await?);
|
||||
return Err(Error::bad_database("Cannot list media files"));
|
||||
}
|
||||
|
||||
let resp = resp.text().await?;
|
||||
|
||||
let resp = ListObjectsV2::parse_response(&resp).map_err(|e| {
|
||||
warn!("Incorrect S3 response: {:?}", e);
|
||||
Error::BadDatabase("Incorrect S3 response")
|
||||
})?;
|
||||
|
||||
let mut t = resp.contents.iter().map(|v| v.key.clone()).collect();
|
||||
result.append(&mut t);
|
||||
|
||||
if let Some(token) = resp.next_continuation_token {
|
||||
req.with_continuation_token(token);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete(&self, key: &[u8]) -> Result<()> {
|
||||
let key = Self::get_file_name(key);
|
||||
|
||||
match self {
|
||||
MediaStorage::File { path } => {
|
||||
let mut fname = PathBuf::new();
|
||||
fname.push(path);
|
||||
fname.push(key);
|
||||
|
||||
fs::remove_file(fname).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
MediaStorage::S3 {
|
||||
bucket,
|
||||
credentials,
|
||||
duration,
|
||||
http_client,
|
||||
} => {
|
||||
let url = bucket
|
||||
.delete_object(Some(credentials), &key)
|
||||
.sign(*duration);
|
||||
|
||||
let resp = http_client.delete(url).send().await?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
warn!("S3 request error:\n{}", resp.text().await?);
|
||||
return Err(Error::bad_database("Cannot delete media file"));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn rename(&self, key_old: &[u8], key_new: &[u8]) -> Result<()> {
|
||||
if key_old == key_new {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match self {
|
||||
MediaStorage::File { path } => {
|
||||
let mut fname_old = PathBuf::new();
|
||||
fname_old.push(path);
|
||||
fname_old.push(Self::get_file_name(key_old));
|
||||
|
||||
let mut fname_new = PathBuf::new();
|
||||
fname_new.push(path);
|
||||
fname_new.push(Self::get_file_name(key_new));
|
||||
|
||||
fs::rename(fname_old, fname_new).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
// Common way if raw-renaming not supported
|
||||
_ => {
|
||||
let data = self.get(key_old).await?;
|
||||
self.put(key_new, &data).await?;
|
||||
self.delete(key_old).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue