1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-04-22 14:10:16 +03:00

feat: add S3 client for media storage

This commit is contained in:
AndSDev 2024-10-22 16:11:58 +03:00
parent 12ada1c86a
commit ae05ca1d46
9 changed files with 271 additions and 34 deletions

39
Cargo.lock generated
View file

@ -523,6 +523,7 @@ dependencies = [
"rusqlite",
"rust-argon2",
"rust-rocksdb",
"rusty-s3",
"sd-notify",
"serde",
"serde_html_form",
@ -1602,6 +1603,15 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "md-5"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca"
dependencies = [
"digest",
]
[[package]]
name = "memchr"
version = "2.7.2"
@ -2059,6 +2069,16 @@ version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quick-xml"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956"
dependencies = [
"memchr",
"serde",
]
[[package]]
name = "quote"
version = "1.0.36"
@ -2601,6 +2621,25 @@ version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6"
[[package]]
name = "rusty-s3"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31aa883f1b986a5249641e574ca0e11ac4fb9970b009c6fbb96fedaf4fa78db8"
dependencies = [
"base64 0.21.7",
"hmac",
"md-5",
"percent-encoding",
"quick-xml",
"serde",
"serde_json",
"sha2",
"time",
"url",
"zeroize",
]
[[package]]
name = "ryu"
version = "1.0.18"

View file

@ -146,6 +146,7 @@ tikv-jemallocator = { version = "0.5.0", features = [
], optional = true }
sd-notify = { version = "0.4.1", optional = true }
rusty-s3 = "0.5.0"
# Used for matrix spec type definitions and helpers
[dependencies.ruma]

View file

@ -11,4 +11,5 @@
- [NixOS](deploying/nixos.md)
- [TURN](turn.md)
- [Appservices](appservices.md)
- [S3 media storage](s3.md)
- [FAQ](faq.md)

33
docs/s3.md Normal file
View file

@ -0,0 +1,33 @@
# Setting up S3 for media storage
## Edit/Add a few settings to your existing conduit.toml
Setup S3 parameters from your S3 provider.
```
[global.media_storage]
type = "s3"
endpoint = "http://minio:9000"
bucket = "test"
region = "minio"
key = "omBkn6OM4vvy1Fyb"
secret = "Sn1MvqDgNOR1P0muhaH2aKOEph4Tj2qp"
bucket_use_path = true
```
Or you can use environment variables
```
CONDUIT_MEDIA_STORAGE_TYPE=s3
CONDUIT_MEDIA_STORAGE_ENDPOINT=http://minio:9000
CONDUIT_MEDIA_STORAGE_BUCKET=test
CONDUIT_MEDIA_STORAGE_REGION=minio
CONDUIT_MEDIA_STORAGE_KEY=omBkn6OM4vvy1Fyb
CONDUIT_MEDIA_STORAGE_SECRET=Sn1MvqDgNOR1P0muhaH2aKOEph4Tj2qp
CONDUIT_MEDIA_STORAGE_DURATION=15
CONDUIT_MEDIA_STORAGE_BUCKET_USE_PATH=true
```
## Migration
For migration, you can manually copy all files from the media folder to S3 storage (or all files from S3 storage to the media folder).

View file

@ -85,6 +85,9 @@ pub struct Config {
pub emergency_password: Option<String>,
#[serde(default)]
pub media_storage: MediaStorage,
#[serde(flatten)]
pub catchall: BTreeMap<String, IgnoredAny>,
}
@ -101,6 +104,25 @@ pub struct WellKnownConfig {
pub server: Option<OwnedServerName>,
}
#[derive(Clone, Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[derive(Default)]
pub enum MediaStorage {
#[default]
File,
S3 {
endpoint: Url,
bucket: String,
region: String,
key: String,
secret: String,
#[serde(default = "default_s3_duration")]
duration: u64,
#[serde(default = "false_fn")]
bucket_use_path: bool,
},
}
const DEPRECATED_KEYS: &[&str] = &["cache_capacity"];
impl Config {
@ -232,6 +254,21 @@ impl fmt::Display for Config {
}),
("Well-known server name", well_known_server.as_str()),
("Well-known client URL", &self.well_known_client()),
(
"Media storage",
&match &self.media_storage {
MediaStorage::File => "file storage".to_owned(),
MediaStorage::S3 {
endpoint,
bucket: _,
region,
key: _,
secret: _,
duration: _,
bucket_use_path: _,
} => format!("S3 storage {} [{}]", endpoint, region),
},
),
];
let mut msg: String = "Active config values:\n\n".to_owned();
@ -312,3 +349,7 @@ fn default_openid_token_ttl() -> u64 {
pub fn default_default_room_version() -> RoomVersionId {
RoomVersionId::V10
}
fn default_s3_duration() -> u64 {
30
}

View file

@ -24,7 +24,6 @@ use serde::Deserialize;
use std::{
collections::{BTreeMap, HashMap, HashSet},
fs::{self, remove_dir_all},
io::Write,
mem::size_of,
path::Path,
sync::{Arc, Mutex, RwLock},
@ -478,9 +477,7 @@ impl KeyValueDatabase {
continue;
}
let path = services().globals.get_media_file(&key);
let mut file = fs::File::create(path)?;
file.write_all(&content)?;
services().globals.media_storage.put(&key, &content).await?;
db.mediaid_file.insert(&key, &[])?;
}

View file

@ -45,7 +45,7 @@ use tikv_jemallocator::Jemalloc;
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;
static SUB_TABLES: [&str; 2] = ["well_known", "tls"]; // Not doing `proxy` cause setting that with env vars would be a pain
static SUB_TABLES: [&str; 3] = ["well_known", "tls", "media_storage"]; // Not doing `proxy` cause setting that with env vars would be a pain
#[tokio::main]
async fn main() {

View file

@ -4,6 +4,10 @@ use ruma::{
serde::Base64, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedEventId, OwnedRoomAliasId,
OwnedRoomId, OwnedServerName, OwnedUserId, RoomAliasId,
};
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncWriteExt, BufReader},
};
use crate::api::server_server::DestinationResponse;
@ -16,6 +20,7 @@ use ruma::{
api::{client::sync::sync_events, federation::discovery::ServerSigningKeys},
DeviceId, RoomVersionId, ServerName, UserId,
};
use rusty_s3::S3Action;
use std::{
collections::{BTreeMap, HashMap},
error::Error as StdError,
@ -33,9 +38,9 @@ use std::{
};
use tokio::sync::{broadcast, watch::Receiver, Mutex, RwLock, Semaphore};
use tower_service::Service as TowerService;
use tracing::{error, info};
use tracing::{error, info, warn};
use base64::{engine::general_purpose, Engine as _};
use base64::{engine::general_purpose, prelude, Engine as _};
type WellKnownMap = HashMap<OwnedServerName, DestinationResponse>;
type TlsNameMap = HashMap<String, (Vec<IpAddr>, u16)>;
@ -74,6 +79,7 @@ pub struct Service {
pub rotate: RotationHandler,
pub shutdown: AtomicBool,
pub media_storage: MediaStorage,
}
/// Handles "rotation" of long-polling requests. "Rotation" in this context is similar to "rotation" of log files and the like.
@ -190,6 +196,8 @@ impl Service {
// Experimental, partially supported room versions
let unstable_room_versions = vec![RoomVersionId::V3, RoomVersionId::V4, RoomVersionId::V5];
let media_storage = media_storage_builder(&config)?;
let mut s = Self {
allow_registration: RwLock::new(config.allow_registration),
admin_alias: RoomAliasId::parse(format!("#admins:{}", &config.server_name))
@ -225,10 +233,9 @@ impl Service {
sync_receivers: RwLock::new(HashMap::new()),
rotate: RotationHandler::new(),
shutdown: AtomicBool::new(false),
media_storage,
};
fs::create_dir_all(s.get_media_folder())?;
if !s
.supported_room_versions()
.contains(&s.config.default_room_version)
@ -525,3 +532,135 @@ fn reqwest_client_builder(config: &Config) -> Result<reqwest::ClientBuilder> {
Ok(reqwest_client_builder)
}
fn media_storage_builder(config: &Config) -> Result<MediaStorage> {
match &config.media_storage {
crate::config::MediaStorage::File => {
let mut path = PathBuf::new();
path.push(config.database_path.clone());
path.push("media");
fs::create_dir_all(&path)?;
Ok(MediaStorage::File { path })
}
crate::config::MediaStorage::S3 {
endpoint,
bucket,
region,
key,
secret,
duration,
bucket_use_path,
} => {
let path_style = if *bucket_use_path {
rusty_s3::UrlStyle::Path
} else {
rusty_s3::UrlStyle::VirtualHost
};
let bucket =
rusty_s3::Bucket::new(endpoint.clone(), path_style, bucket.clone(), region.clone())
.map_err(|_| Error::bad_config("Invalid S3 config."))?;
let credentials = rusty_s3::Credentials::new(key, secret);
Ok(MediaStorage::S3 {
bucket,
credentials,
duration: Duration::from_secs(*duration),
http_client: reqwest_client_builder(config)?.build()?,
})
}
}
}
pub enum MediaStorage {
File {
path: PathBuf,
},
S3 {
bucket: rusty_s3::Bucket,
credentials: rusty_s3::Credentials,
duration: Duration,
http_client: reqwest::Client,
},
}
impl MediaStorage {
pub async fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
match self {
MediaStorage::File { path } => {
let mut fname = PathBuf::new();
fname.push(path);
fname.push(general_purpose::URL_SAFE_NO_PAD.encode(key));
let mut file = Vec::new();
BufReader::new(File::open(fname).await?)
.read_to_end(&mut file)
.await?;
Ok(file)
}
MediaStorage::S3 {
bucket,
credentials,
duration,
http_client,
} => {
let url = bucket
.get_object(
Some(credentials),
&prelude::BASE64_URL_SAFE_NO_PAD.encode(key),
)
.sign(*duration);
let resp = http_client.get(url).send().await?;
if !resp.status().is_success() {
warn!("S3 request error:\n{}", resp.text().await?);
return Err(Error::bad_database("Cannot read media file"));
}
Ok(resp.bytes().await?.to_vec())
}
}
}
pub async fn put(&self, key: &[u8], data: &[u8]) -> Result<()> {
match self {
MediaStorage::File { path } => {
let mut fname = PathBuf::new();
fname.push(path);
fname.push(general_purpose::URL_SAFE_NO_PAD.encode(key));
let mut f = File::create(fname).await?;
f.write_all(data).await?;
Ok(())
}
MediaStorage::S3 {
bucket,
credentials,
duration,
http_client,
} => {
let url = bucket
.put_object(
Some(credentials),
&prelude::BASE64_URL_SAFE_NO_PAD.encode(key),
)
.sign(*duration);
let resp = http_client.put(url).body(data.to_vec()).send().await?;
if !resp.status().is_success() {
warn!("S3 request error:\n{}", resp.text().await?);
return Err(Error::bad_database("Cannot create media file"));
}
Ok(())
}
}
}
}

View file

@ -7,11 +7,6 @@ use ruma::http_headers::{ContentDisposition, ContentDispositionType};
use crate::{services, Result};
use image::imageops::FilterType;
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncWriteExt, BufReader},
};
pub struct FileMeta {
pub content_disposition: ContentDisposition,
pub content_type: Option<String>,
@ -39,9 +34,8 @@ impl Service {
.db
.create_file_metadata(mxc, 0, 0, &content_disposition, content_type)?;
let path = services().globals.get_media_file(&key);
let mut f = File::create(path).await?;
f.write_all(file).await?;
services().globals.media_storage.put(&key, file).await?;
Ok(())
}
@ -63,9 +57,7 @@ impl Service {
content_type,
)?;
let path = services().globals.get_media_file(&key);
let mut f = File::create(path).await?;
f.write_all(file).await?;
services().globals.media_storage.put(&key, file).await?;
Ok(())
}
@ -75,11 +67,7 @@ impl Service {
if let Ok((content_disposition, content_type, key)) =
self.db.search_file_metadata(mxc, 0, 0)
{
let path = services().globals.get_media_file(&key);
let mut file = Vec::new();
BufReader::new(File::open(path).await?)
.read_to_end(&mut file)
.await?;
let file = services().globals.media_storage.get(&key).await?;
Ok(Some(FileMeta {
content_disposition,
@ -128,9 +116,7 @@ impl Service {
self.db.search_file_metadata(mxc.clone(), width, height)
{
// Using saved thumbnail
let path = services().globals.get_media_file(&key);
let mut file = Vec::new();
File::open(path).await?.read_to_end(&mut file).await?;
let file = services().globals.media_storage.get(&key).await?;
Ok(Some(FileMeta {
content_disposition,
@ -141,9 +127,7 @@ impl Service {
self.db.search_file_metadata(mxc.clone(), 0, 0)
{
// Generate a thumbnail
let path = services().globals.get_media_file(&key);
let mut file = Vec::new();
File::open(path).await?.read_to_end(&mut file).await?;
let file = services().globals.media_storage.get(&key).await?;
if let Ok(image) = image::load_from_memory(&file) {
let original_width = image.width();
@ -209,9 +193,11 @@ impl Service {
content_type.as_deref(),
)?;
let path = services().globals.get_media_file(&thumbnail_key);
let mut f = File::create(path).await?;
f.write_all(&thumbnail_bytes).await?;
services()
.globals
.media_storage
.put(&thumbnail_key, &thumbnail_bytes)
.await?;
Ok(Some(FileMeta {
content_disposition,