From 1e0cdafbcfb6b7823591f7aee9cbc83d23ea4bd9 Mon Sep 17 00:00:00 2001 From: sigoden Date: Sat, 2 Jul 2022 22:55:22 +0800 Subject: [PATCH] fix: unexpect stack overflow when searching a lot (#87) --- Cargo.lock | 117 +-------------------------------------------- Cargo.toml | 2 +- src/main.rs | 12 +++-- src/server.rs | 130 +++++++++++++++++++++++++++++--------------------- 4 files changed, 87 insertions(+), 174 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 05d9aa1..0299df2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -54,17 +54,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "async-channel" -version = "1.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2114d64672151c0c5eaa5e131ec84a74f06e1e559830dabba01ca30605d66319" -dependencies = [ - "concurrent-queue", - "event-listener", - "futures-core", -] - [[package]] name = "async-compression" version = "0.3.14" @@ -78,26 +67,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "async-fs" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b3ca4f8ff117c37c278a2f7415ce9be55560b846b5bc4412aaa5d29c1c3dae2" -dependencies = [ - "async-lock", - "blocking", - "futures-lite", -] - -[[package]] -name = "async-lock" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e97a171d191782fba31bb902b14ad94e24a68145032b7eedf871ab0bc0d077b6" -dependencies = [ - "event-listener", -] - [[package]] name = "async-stream" version = "0.3.3" @@ -119,12 +88,6 @@ dependencies = [ "syn", ] -[[package]] -name = "async-task" -version = "4.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30696a84d817107fc028e049980e09d5e140e8da8f1caeb17e8e950658a3cea9" - [[package]] name = "async-trait" version = "0.1.56" @@ -136,16 +99,6 @@ dependencies = [ "syn", ] -[[package]] -name = "async-walkdir" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "826d88d73e87e7504b635b6e427561faa6a65f4a2f59e75efcbfa51a0876bb90" -dependencies = [ - "async-fs", - "futures-lite", -] - [[package]] name = "async_io_utilities" version = "0.1.3" @@ -169,12 +122,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "atomic-waker" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a" - [[package]] name = "autocfg" version = "1.1.0" @@ -226,20 +173,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "blocking" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6ccb65d468978a086b69884437ded69a90faab3bbe6e67f242173ea728acccc" -dependencies = [ - "async-channel", - "async-task", - "atomic-waker", - "fastrand", - "futures-lite", - "once_cell", -] - [[package]] name = "bstr" version = "0.2.17" @@ -263,12 +196,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" -[[package]] -name = "cache-padded" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c" - [[package]] name = "cc" version = "1.0.73" @@ -316,15 +243,6 @@ dependencies = [ "os_str_bytes", ] -[[package]] -name = "concurrent-queue" -version = "1.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" -dependencies = [ - "cache-padded", -] - [[package]] name = "core-foundation" version = "0.9.3" @@ -457,7 +375,6 @@ dependencies = [ "assert_cmd", "assert_fs", "async-stream", - "async-walkdir", "async_zip", "base64", "chrono", @@ -490,6 +407,7 @@ dependencies = [ "url", "urlencoding", "uuid", + "walkdir", "xml-rs", ] @@ -508,12 +426,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "event-listener" -version = "2.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71" - [[package]] name = "fastrand" version = "1.7.0" @@ -631,21 +543,6 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" -[[package]] -name = "futures-lite" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" -dependencies = [ - "fastrand", - "futures-core", - "futures-io", - "memchr", - "parking", - "pin-project-lite", - "waker-fn", -] - [[package]] name = "futures-macro" version = "0.3.21" @@ -1260,12 +1157,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "parking" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" - [[package]] name = "parking_lot" version = "0.12.1" @@ -2210,12 +2101,6 @@ dependencies = [ "libc", ] -[[package]] -name = "waker-fn" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" - [[package]] name = "walkdir" version = "2.3.2" diff --git a/Cargo.toml b/Cargo.toml index 5f1da9d..2f7000d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,6 @@ serde_json = "1" futures = "0.3" base64 = "0.13" async_zip = { version = "0.0.8", default-features = false, features = ["deflate"] } -async-walkdir = "0.2" headers = "0.3" mime_guess = "2.0" if-addrs = "0.7" @@ -37,6 +36,7 @@ xml-rs = "0.8" log = "0.4" socket2 = "0.4" async-stream = "0.3" +walkdir = "2.3" [features] default = ["tls"] diff --git a/src/main.rs b/src/main.rs index 44f8cbb..9a2ed24 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,6 +16,7 @@ use crate::server::{Request, Server}; use crate::tls::{TlsAcceptor, TlsStream}; use std::net::{IpAddr, SocketAddr, TcpListener as StdTcpListener}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use futures::future::join_all; @@ -38,7 +39,8 @@ async fn run() -> BoxResult<()> { logger::init().map_err(|e| format!("Failed to init logger, {}", e))?; let args = Args::parse(matches())?; let args = Arc::new(args); - let handles = serve(args.clone())?; + let running = Arc::new(AtomicBool::new(true)); + let handles = serve(args.clone(), running.clone())?; print_listening(args)?; tokio::select! { @@ -51,13 +53,17 @@ async fn run() -> BoxResult<()> { Ok(()) }, _ = shutdown_signal() => { + running.store(false, Ordering::SeqCst); Ok(()) }, } } -fn serve(args: Arc) -> BoxResult>>> { - let inner = Arc::new(Server::new(args.clone())); +fn serve( + args: Arc, + running: Arc, +) -> BoxResult>>> { + let inner = Arc::new(Server::new(args.clone(), running)); let mut handles = vec![]; let port = args.port; for ip in args.addrs.iter() { diff --git a/src/server.rs b/src/server.rs index c5c5e69..356c6ef 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,13 +1,12 @@ use crate::streamer::Streamer; use crate::utils::{decode_uri, encode_uri, get_file_name, try_get_file_name}; use crate::{Args, BoxResult}; -use async_walkdir::{Filtering, WalkDir}; +use walkdir::WalkDir; use xml::escape::escape_str_pcdata; use async_zip::write::{EntryOptions, ZipFileWriter}; use async_zip::Compression; use chrono::{TimeZone, Utc}; -use futures::stream::StreamExt; use futures::TryStreamExt; use headers::{ AcceptRanges, AccessControlAllowCredentials, AccessControlAllowHeaders, @@ -24,6 +23,7 @@ use std::fs::Metadata; use std::io::SeekFrom; use std::net::SocketAddr; use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::SystemTime; use tokio::fs::File; @@ -45,13 +45,15 @@ const BUF_SIZE: usize = 65536; pub struct Server { args: Arc, assets_prefix: String, + running: Arc, } impl Server { - pub fn new(args: Arc) -> Self { + pub fn new(args: Arc, running: Arc) -> Self { let assets_prefix = format!("{}__dufs_v{}_", args.uri_prefix, env!("CARGO_PKG_VERSION")); Self { args, + running, assets_prefix, } } @@ -331,34 +333,40 @@ impl Server { res: &mut Response, ) -> BoxResult<()> { let mut paths: Vec = vec![]; + let path_buf = path.to_path_buf(); let hidden = self.args.hidden.to_string(); - let search = search.to_string(); - let mut walkdir = WalkDir::new(path).filter(move |entry| { - let hidden_cloned = hidden.clone(); - let search_cloned = search.clone(); - async move { + let running = self.running.clone(); + let search = search.to_lowercase(); + let search_paths = tokio::task::spawn_blocking(move || { + let mut it = WalkDir::new(&path_buf).into_iter(); + let mut paths: Vec = vec![]; + while let Some(Ok(entry)) = it.next() { + if !running.load(Ordering::SeqCst) { + break; + } let entry_path = entry.path(); - let base_name = get_file_name(&entry_path); - if is_hidden(&hidden_cloned, base_name) { - return Filtering::IgnoreDir; + let base_name = get_file_name(entry_path); + let file_type = entry.file_type(); + if is_hidden(&hidden, base_name) { + if file_type.is_dir() { + it.skip_current_dir(); + } + continue; } - if !base_name - .to_lowercase() - .contains(&search_cloned.to_lowercase()) - { - return Filtering::Ignore; + if !base_name.to_lowercase().contains(&search) { + continue; } - if fs::symlink_metadata(entry.path()).await.is_err() { - return Filtering::Ignore; + if entry.path().symlink_metadata().is_err() { + continue; } - Filtering::Continue + paths.push(entry_path.to_path_buf()); } - }); - while let Some(entry) = walkdir.next().await { - if let Ok(entry) = entry { - if let Ok(Some(item)) = self.to_pathitem(entry.path(), path.to_path_buf()).await { - paths.push(item); - } + paths + }) + .await?; + for search_path in search_paths.into_iter() { + if let Ok(Some(item)) = self.to_pathitem(search_path, path.to_path_buf()).await { + paths.push(item); } } self.send_index(path, paths, true, head_only, res) @@ -387,8 +395,9 @@ impl Server { } let path = path.to_owned(); let hidden = self.args.hidden.clone(); + let running = self.running.clone(); tokio::spawn(async move { - if let Err(e) = zip_dir(&mut writer, &path, &hidden).await { + if let Err(e) = zip_dir(&mut writer, &path, &hidden, running).await { error!("Failed to zip {}, {}", path.display(), e); } }); @@ -992,41 +1001,54 @@ fn res_multistatus(res: &mut Response, content: &str) { )); } -async fn zip_dir(writer: &mut W, dir: &Path, hidden: &str) -> BoxResult<()> { +async fn zip_dir( + writer: &mut W, + dir: &Path, + hidden: &str, + running: Arc, +) -> BoxResult<()> { let mut writer = ZipFileWriter::new(writer); + let hidden = Arc::new(hidden.to_string()); let hidden = hidden.to_string(); - let mut walkdir = WalkDir::new(dir).filter(move |entry| { - let hidden = hidden.clone(); - async move { + let dir_path_buf = dir.to_path_buf(); + let zip_paths = tokio::task::spawn_blocking(move || { + let mut it = WalkDir::new(&dir_path_buf).into_iter(); + let mut paths: Vec = vec![]; + while let Some(Ok(entry)) = it.next() { + if !running.load(Ordering::SeqCst) { + break; + } let entry_path = entry.path(); - let base_name = get_file_name(&entry_path); + let base_name = get_file_name(entry_path); + let file_type = entry.file_type(); if is_hidden(&hidden, base_name) { - return Filtering::IgnoreDir; + if file_type.is_dir() { + it.skip_current_dir(); + } + continue; } - let meta = match fs::symlink_metadata(entry.path()).await { - Ok(meta) => meta, - Err(_) => return Filtering::Ignore, - }; - if !meta.is_file() { - return Filtering::Ignore; + if entry.path().symlink_metadata().is_err() { + continue; } - Filtering::Continue - } - }); - while let Some(entry) = walkdir.next().await { - if let Ok(entry) = entry { - let entry_path = entry.path(); - let filename = match entry_path.strip_prefix(dir).ok().and_then(|v| v.to_str()) { - Some(v) => v, - None => continue, - }; - let entry_options = EntryOptions::new(filename.to_owned(), Compression::Deflate) - .unix_permissions(0o644); - let mut file = File::open(&entry_path).await?; - let mut file_writer = writer.write_entry_stream(entry_options).await?; - io::copy(&mut file, &mut file_writer).await?; - file_writer.close().await?; + if !file_type.is_file() { + continue; + } + paths.push(entry_path.to_path_buf()); } + paths + }) + .await?; + for zip_path in zip_paths.into_iter() { + let filename = match zip_path.strip_prefix(dir).ok().and_then(|v| v.to_str()) { + Some(v) => v, + None => continue, + }; + let entry_options = + EntryOptions::new(filename.to_owned(), Compression::Deflate).unix_permissions(0o644); + let mut file = File::open(&zip_path).await?; + let mut file_writer = writer.write_entry_stream(entry_options).await?; + io::copy(&mut file, &mut file_writer).await?; + file_writer.close().await?; } writer.close().await?; Ok(())