From 270cc0cba20adde0eca09a5ab963c86d781c1b25 Mon Sep 17 00:00:00 2001 From: sigoden Date: Thu, 21 Dec 2023 14:24:20 +0800 Subject: [PATCH] feat: upgrade to hyper 1.0 (#321) --- Cargo.lock | 389 +++++++++++++++++++++++++++++++--------------- Cargo.toml | 32 ++-- src/http_utils.rs | 105 +++++++++++++ src/main.rs | 169 ++++++++++++-------- src/server.rs | 103 +++++++----- src/streamer.rs | 68 -------- src/tls.rs | 161 ------------------- src/unix.rs | 31 ---- src/utils.rs | 42 +++++ tests/range.rs | 2 +- tests/utils.rs | 2 +- 11 files changed, 595 insertions(+), 509 deletions(-) create mode 100644 src/http_utils.rs delete mode 100644 src/streamer.rs delete mode 100644 src/tls.rs delete mode 100644 src/unix.rs diff --git a/Cargo.lock b/Cargo.lock index 4b77a44..5b86462 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -49,9 +49,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.6.4" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44" +checksum = "d664a92ecae85fd0a7392615844904654d1d5f5514837f471ddef4a057aba1b6" dependencies = [ "anstyle", "anstyle-parse", @@ -69,37 +69,37 @@ checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" [[package]] name = "anstyle-parse" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "317b9a89c1868f5ea6ff1d9539a69f45dffc21ce321ac1fd1160dfa48c8e2140" +checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" dependencies = [ "utf8parse", ] [[package]] name = "anstyle-query" -version = "1.0.0" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" +checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] name = "anstyle-wincon" -version = "3.0.1" +version = "3.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628" +checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" dependencies = [ "anstyle", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] name = "anyhow" -version = "1.0.75" +version = "1.0.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" +checksum = "59d2a3357dde987206219e78ecfbbb6e8dad06cbb65292758d3270e6254f7355" [[package]] name = "assert_cmd" @@ -170,9 +170,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.74" +version = "0.1.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" +checksum = "fdf6721fb0140e4f897002dd086c06f6c27775df19cfe1fccb21181a48fd2c98" dependencies = [ "proc-macro2", "quote", @@ -334,18 +334,18 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.10" +version = "4.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41fffed7514f420abec6d183b1d3acfd9099c79c3a10a06ade4f8203f1411272" +checksum = "bfaff671f6b22ca62406885ece523383b9b64022e341e53e009a62ebc47a45f2" dependencies = [ "clap_builder", ] [[package]] name = "clap_builder" -version = "4.4.9" +version = "4.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63361bae7eef3771745f02d8d892bec2fee5f6e34af316ba556e7f97a7069ff1" +checksum = "a216b506622bb1d316cd51328dce24e07bdff4a6128a47c7e7fad11878d5adbb" dependencies = [ "anstream", "anstyle", @@ -386,9 +386,9 @@ dependencies = [ [[package]] name = "core-foundation" -version = "0.9.3" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" dependencies = [ "core-foundation-sys", "libc", @@ -396,9 +396,9 @@ dependencies = [ [[package]] name = "core-foundation-sys" -version = "0.8.4" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" +checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" [[package]] name = "cpufeatures" @@ -420,9 +420,9 @@ dependencies = [ [[package]] name = "crossbeam-deque" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef" +checksum = "fca89a0e215bab21874660c67903c5f143333cab1da83d041c7ded6053774751" dependencies = [ "cfg-if", "crossbeam-epoch", @@ -431,22 +431,21 @@ dependencies = [ [[package]] name = "crossbeam-epoch" -version = "0.9.15" +version = "0.9.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae211234986c545741a7dc064309f67ee1e5ad243d0e48335adc0484d960bcc7" +checksum = "2d2fe95351b870527a5d09bf563ed3c97c0cffb87cf1c78a591bf48bb218d9aa" dependencies = [ "autocfg", "cfg-if", "crossbeam-utils", "memoffset", - "scopeguard", ] [[package]] name = "crossbeam-utils" -version = "0.8.16" +version = "0.8.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +checksum = "c06d96137f14f244c37f989d9fff8f95e6c18b918e71f36638f8c49112e4c78f" dependencies = [ "cfg-if", ] @@ -492,9 +491,9 @@ dependencies = [ [[package]] name = "diqwest" -version = "1.2.1" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "950c7bfe1f2b1558dd016a87a4d86fd12bf5c806429c045dcb2aa80e152ee626" +checksum = "cad41e37bc7d22a32ada3a0dad12b12b2347e88e8511683b111967b776f750eb" dependencies = [ "async-trait", "digest_auth", @@ -519,6 +518,7 @@ dependencies = [ "async-stream", "async_zip", "base64", + "bytes", "chardetng", "chrono", "clap", @@ -526,10 +526,12 @@ dependencies = [ "content_inspector", "diqwest", "form_urlencoded", - "futures", + "futures-util", "glob", "headers", - "hyper", + "http-body-util", + "hyper 1.1.0", + "hyper-util", "if-addrs", "indexmap", "lazy_static", @@ -537,21 +539,22 @@ dependencies = [ "md5", "mime_guess", "percent-encoding", + "pin-project-lite", "port_check", "predicates", "regex", "reqwest", "rstest", - "rustls", - "rustls-pemfile", + "rustls-pemfile 2.0.0", + "rustls-pki-types", "serde", "serde_json", "serde_yaml", "sha-crypt", "smart-default", - "socket2 0.5.5", + "socket2", "tokio", - "tokio-rustls", + "tokio-rustls 0.25.0", "tokio-util", "url", "urlencoding", @@ -794,7 +797,26 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.11", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d308f63daf4181410c242d34c11f928dcb3aa105852019e043c9d1f4e4368a" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 1.0.0", "indexmap", "slab", "tokio", @@ -810,14 +832,14 @@ checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" [[package]] name = "headers" -version = "0.3.9" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" +checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9" dependencies = [ "base64", "bytes", "headers-core", - "http", + "http 1.0.0", "httpdate", "mime", "sha1", @@ -825,11 +847,11 @@ dependencies = [ [[package]] name = "headers-core" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" dependencies = [ - "http", + "http 1.0.0", ] [[package]] @@ -856,13 +878,47 @@ dependencies = [ ] [[package]] -name = "http-body" -version = "0.4.5" +name = "http" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" dependencies = [ "bytes", - "http", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.11", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.0.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" +dependencies = [ + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -880,28 +936,47 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "0.14.27" +version = "0.14.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" +checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" dependencies = [ "bytes", "futures-channel", "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.3.22", + "http 0.2.11", + "http-body 0.4.6", "httparse", "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2", "tokio", "tower-service", "tracing", "want", ] +[[package]] +name = "hyper" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.0", + "http 1.0.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "tokio", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -909,11 +984,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", - "http", - "hyper", - "rustls", + "http 0.2.11", + "hyper 0.14.28", + "rustls 0.21.10", "tokio", - "tokio-rustls", + "tokio-rustls 0.24.1", +] + +[[package]] +name = "hyper-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdea9aac0dbe5a9240d68cfd9501e2db94222c6dc06843e06640b9e07f0fdc67" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "hyper 1.1.0", + "pin-project-lite", + "socket2", + "tokio", + "tracing", ] [[package]] @@ -951,12 +1044,12 @@ dependencies = [ [[package]] name = "if-addrs" -version = "0.10.2" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cabb0019d51a643781ff15c9c8a3e5dedc365c47211270f4e8f82812fedd8f0a" +checksum = "5ad1fe622fcc3ccd2bc6d08f7485577535a15af46be880abb7535e5f3a4c322d" dependencies = [ "libc", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -1002,9 +1095,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" +checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" [[package]] name = "js-sys" @@ -1023,15 +1116,15 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.150" +version = "0.2.151" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" +checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" [[package]] name = "linux-raw-sys" -version = "0.4.11" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "969488b55f8ac402214f3f5fd243ebb7206cf82de60d3172994707a4bcc2b829" +checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" [[package]] name = "log" @@ -1108,9 +1201,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.9" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" +checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" dependencies = [ "libc", "wasi", @@ -1153,9 +1246,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "percent-encoding" @@ -1197,9 +1290,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkg-config" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" [[package]] name = "port_check" @@ -1338,19 +1431,19 @@ checksum = "c707298afce11da2efef2f600116fa93ffa7a032b5d7b628aa17711ec81383ca" [[package]] name = "reqwest" -version = "0.11.22" +version = "0.11.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" +checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" dependencies = [ "base64", "bytes", "encoding_rs", "futures-core", "futures-util", - "h2", - "http", - "http-body", - "hyper", + "h2 0.3.22", + "http 0.2.11", + "http-body 0.4.6", + "hyper 0.14.28", "hyper-rustls", "ipnet", "js-sys", @@ -1360,14 +1453,14 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls", - "rustls-pemfile", + "rustls 0.21.10", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", "system-configuration", "tokio", - "tokio-rustls", + "tokio-rustls 0.24.1", "tower-service", "url", "wasm-bindgen", @@ -1379,9 +1472,9 @@ dependencies = [ [[package]] name = "ring" -version = "0.17.6" +version = "0.17.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "684d5e6e18f669ccebf64a92236bb7db9a34f07be010e3627368182027180866" +checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74" dependencies = [ "cc", "getrandom", @@ -1437,29 +1530,43 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.25" +version = "0.38.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc99bc2d4f1fed22595588a013687477aedf3cdcfb26558c559edb67b4d9b22e" +checksum = "72e572a5e8ca657d7366229cdde4bd14c4eb5499a9573d4d366fe1b599daa316" dependencies = [ "bitflags 2.4.1", "errno", "libc", "linux-raw-sys", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] name = "rustls" -version = "0.21.9" +version = "0.21.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "629648aced5775d558af50b2b4c7b02983a04b312126d45eeead26e7caa498b9" +checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" dependencies = [ "log", "ring", - "rustls-webpki", + "rustls-webpki 0.101.7", "sct", ] +[[package]] +name = "rustls" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe6b63262c9fcac8659abfaa96cac103d28166d3ff3eaf8f412e19f3ae9e5a48" +dependencies = [ + "log", + "ring", + "rustls-pki-types", + "rustls-webpki 0.102.0", + "subtle", + "zeroize", +] + [[package]] name = "rustls-pemfile" version = "1.0.4" @@ -1469,6 +1576,22 @@ dependencies = [ "base64", ] +[[package]] +name = "rustls-pemfile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35e4980fa29e4c4b212ffb3db068a564cbf560e51d3944b7c88bd8bf5bec64f4" +dependencies = [ + "base64", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7673e0aa20ee4937c6aacfc12bb8341cfbf054cdd21df6bec5fd0629fe9339b" + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -1480,10 +1603,21 @@ dependencies = [ ] [[package]] -name = "ryu" -version = "1.0.15" +name = "rustls-webpki" +version = "0.102.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" +checksum = "de2635c8bc2b88d367767c5de8ea1d8db9af3f6219eba28442242d9ab81d1b89" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + +[[package]] +name = "ryu" +version = "1.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" [[package]] name = "same-file" @@ -1494,12 +1628,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "scopeguard" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" - [[package]] name = "sct" version = "0.7.1" @@ -1561,9 +1689,9 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.9.27" +version = "0.9.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cc7a1570e38322cfe4154732e5110f887ea57e22b76f4bfd32b5bdd3368666c" +checksum = "9269cfafc7e0257ee4a42f3f68a307f458c63d9e7c8ba4b58c5d15f1b7d7e8d3" dependencies = [ "indexmap", "itoa", @@ -1635,16 +1763,6 @@ dependencies = [ "syn", ] -[[package]] -name = "socket2" -version = "0.4.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "socket2" version = "0.5.5" @@ -1675,9 +1793,9 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" [[package]] name = "syn" -version = "2.0.39" +version = "2.0.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a" +checksum = "5b7d0a2c048d661a1a59fcd7355baa232f7ed34e0ee4df2eef3c1c1c0d3852d8" dependencies = [ "proc-macro2", "quote", @@ -1736,18 +1854,18 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" [[package]] name = "thiserror" -version = "1.0.50" +version = "1.0.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2" +checksum = "f11c217e1416d6f036b870f14e0413d480dbf28edbee1f877abaf0206af43bb7" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.50" +version = "1.0.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" +checksum = "01742297787513b79cf8e29d1056ede1313e2420b7b3b15d0a768b4921f549df" dependencies = [ "proc-macro2", "quote", @@ -1771,9 +1889,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.34.0" +version = "1.35.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9" +checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" dependencies = [ "backtrace", "bytes", @@ -1782,7 +1900,7 @@ dependencies = [ "num_cpus", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.5", + "socket2", "tokio-macros", "windows-sys 0.48.0", ] @@ -1804,7 +1922,18 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls", + "rustls 0.21.10", + "tokio", +] + +[[package]] +name = "tokio-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +dependencies = [ + "rustls 0.22.1", + "rustls-pki-types", "tokio", ] @@ -1850,9 +1979,9 @@ dependencies = [ [[package]] name = "try-lock" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "typenum" @@ -1871,9 +2000,9 @@ dependencies = [ [[package]] name = "unicode-bidi" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" +checksum = "6f2528f27a9eb2b21e69c95319b30bd0efd85d09c379741b0f78ea1d86be2416" [[package]] name = "unicode-ident" @@ -1892,9 +2021,9 @@ dependencies = [ [[package]] name = "unsafe-libyaml" -version = "0.2.9" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f28467d3e1d3c6586d8f25fa243f544f5800fec42d97032474e17222c2b75cfa" +checksum = "ab4c90930b95a82d00dc9e9ac071b4991924390d46cbd0dfe566148667605e4b" [[package]] name = "untrusted" @@ -2253,3 +2382,9 @@ checksum = "388c44dc09d76f1536602ead6d325eb532f5c122f17782bd57fb47baeeb767e2" dependencies = [ "lzma-sys", ] + +[[package]] +name = "zeroize" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d" diff --git a/Cargo.toml b/Cargo.toml index 93e7547..7a4a21d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,18 +16,17 @@ clap_complete = "4" chrono = { version = "0.4", default-features = false, features = ["clock"] } tokio = { version = "1", features = ["rt-multi-thread", "macros", "fs", "io-util", "signal"]} tokio-util = { version = "0.7", features = ["io-util", "compat"] } -hyper = { version = "0.14", features = ["http1", "server", "tcp", "stream"] } +hyper = { version = "1.0", features = ["http1", "server"] } percent-encoding = "2.3" serde = { version = "1", features = ["derive"] } serde_json = "1" -futures = "0.3" +futures-util = { version = "0.3", default-features = false, features = ["alloc"] } async_zip = { version = "0.0.15", default-features = false, features = ["deflate", "bzip2", "xz", "chrono", "tokio"] } -headers = "0.3" +headers = "0.4" mime_guess = "2.0" -if-addrs = "0.10.1" -rustls = { version = "0.21", default-features = false, features = ["tls12"], optional = true } -rustls-pemfile = { version = "1", optional = true } -tokio-rustls = { version = "0.24", optional = true } +if-addrs = "0.11" +rustls-pemfile = { version = "2.0", optional = true } +tokio-rustls = { version = "0.25", optional = true } md5 = "0.7" lazy_static = "1.4" uuid = { version = "1.4", features = ["v4", "fast-rng"] } @@ -42,16 +41,21 @@ alphanumeric-sort = "1.4" content_inspector = "0.2" anyhow = "1.0" chardetng = "0.1" -glob = "0.3.1" +glob = "0.3" indexmap = "2.0" -serde_yaml = "0.9.27" -sha-crypt = "0.5.0" -base64 = "0.21.5" -smart-default = "0.7.1" +serde_yaml = "0.9" +sha-crypt = "0.5" +base64 = "0.21" +smart-default = "0.7" +rustls-pki-types = "1.0" +hyper-util = { version = "0.1", features = ["server-auto", "tokio"] } +http-body-util = "0.1" +bytes = "1.5" +pin-project-lite = "0.2" [features] default = ["tls"] -tls = ["rustls", "rustls-pemfile", "tokio-rustls"] +tls = ["rustls-pemfile", "tokio-rustls"] [dev-dependencies] assert_cmd = "2" @@ -61,7 +65,7 @@ port_check = "0.1" rstest = "0.18" regex = "1" url = "2" -diqwest = { version = "1", features = ["blocking", "rustls-tls"], default-features = false } +diqwest = { version = "2.0", features = ["blocking"], default-features = false } predicates = "3" [profile.release] diff --git a/src/http_utils.rs b/src/http_utils.rs new file mode 100644 index 0000000..c18f0b0 --- /dev/null +++ b/src/http_utils.rs @@ -0,0 +1,105 @@ +use bytes::{Bytes, BytesMut}; +use futures_util::Stream; +use http_body_util::{combinators::BoxBody, BodyExt, Full}; +use hyper::body::{Body, Incoming}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; +use tokio::io::AsyncRead; +use tokio_util::io::poll_read_buf; + +#[derive(Debug)] +pub struct IncomingStream { + inner: Incoming, +} + +impl IncomingStream { + pub fn new(inner: Incoming) -> Self { + Self { inner } + } +} + +impl Stream for IncomingStream { + type Item = Result; + + #[inline] + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + match futures_util::ready!(Pin::new(&mut self.inner).poll_frame(cx)?) { + Some(frame) => match frame.into_data() { + Ok(data) => return Poll::Ready(Some(Ok(data))), + Err(_frame) => {} + }, + None => return Poll::Ready(None), + } + } + } +} + +pin_project_lite::pin_project! { + pub struct LengthLimitedStream { + #[pin] + reader: Option, + remaining: usize, + buf: BytesMut, + capacity: usize, + } +} + +impl LengthLimitedStream { + pub fn new(reader: R, limit: usize) -> Self { + Self { + reader: Some(reader), + remaining: limit, + buf: BytesMut::new(), + capacity: 4096, + } + } +} + +impl Stream for LengthLimitedStream { + type Item = std::io::Result; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.as_mut().project(); + + if *this.remaining == 0 { + self.project().reader.set(None); + return Poll::Ready(None); + } + + let reader = match this.reader.as_pin_mut() { + Some(r) => r, + None => return Poll::Ready(None), + }; + + if this.buf.capacity() == 0 { + this.buf.reserve(*this.capacity); + } + + match poll_read_buf(reader, cx, &mut this.buf) { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(err)) => { + self.project().reader.set(None); + Poll::Ready(Some(Err(err))) + } + Poll::Ready(Ok(0)) => { + self.project().reader.set(None); + Poll::Ready(None) + } + Poll::Ready(Ok(_)) => { + let mut chunk = this.buf.split(); + let chunk_size = (*this.remaining).min(chunk.len()); + chunk.truncate(chunk_size); + *this.remaining -= chunk_size; + Poll::Ready(Some(Ok(chunk.freeze()))) + } + } + } +} + +pub fn body_full(content: impl Into) -> BoxBody { + Full::new(content.into()) + .map_err(anyhow::Error::new) + .boxed() +} diff --git a/src/main.rs b/src/main.rs index a3ac7d7..6933eb6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,38 +1,37 @@ mod args; mod auth; mod http_logger; +mod http_utils; mod logger; mod server; -mod streamer; -#[cfg(feature = "tls")] -mod tls; -#[cfg(unix)] -mod unix; mod utils; #[macro_use] extern crate log; use crate::args::{build_cli, print_completions, Args}; -use crate::server::{Request, Server}; +use crate::server::Server; #[cfg(feature = "tls")] -use crate::tls::{load_certs, load_private_key, TlsAcceptor, TlsStream}; +use crate::utils::{load_certs, load_private_key}; use anyhow::{anyhow, Context, Result}; -use std::net::{IpAddr, SocketAddr, TcpListener as StdTcpListener}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; - use args::BindAddr; use clap_complete::Shell; -use futures::future::join_all; -use tokio::net::TcpListener; -use tokio::task::JoinHandle; +use futures_util::future::join_all; -use hyper::server::conn::{AddrIncoming, AddrStream}; -use hyper::service::{make_service_fn, service_fn}; +use hyper::{body::Incoming, service::service_fn, Request}; +use hyper_util::{ + rt::{TokioExecutor, TokioIo}, + server::conn::auto::Builder, +}; +use std::net::{IpAddr, SocketAddr, TcpListener as StdTcpListener}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; +use tokio::{net::TcpListener, task::JoinHandle}; #[cfg(feature = "tls")] -use rustls::ServerConfig; +use tokio_rustls::{rustls::ServerConfig, TlsAcceptor}; #[tokio::main] async fn main() -> Result<()> { @@ -45,10 +44,10 @@ async fn main() -> Result<()> { return Ok(()); } let args = Args::parse(matches)?; - let args = Arc::new(args); let running = Arc::new(AtomicBool::new(true)); - let handles = serve(args.clone(), running.clone())?; - print_listening(args)?; + let listening = print_listening(&args)?; + let handles = serve(args, running.clone())?; + println!("{listening}"); tokio::select! { ret = join_all(handles) => { @@ -66,56 +65,65 @@ async fn main() -> Result<()> { } } -fn serve( - args: Arc, - running: Arc, -) -> Result>>> { - let inner = Arc::new(Server::init(args.clone(), running)?); - let mut handles = vec![]; +fn serve(args: Args, running: Arc) -> Result>> { + let addrs = args.addrs.clone(); let port = args.port; - for bind_addr in args.addrs.iter() { - let inner = inner.clone(); - let serve_func = move |remote_addr: Option| { - let inner = inner.clone(); - async move { - Ok::<_, hyper::Error>(service_fn(move |req: Request| { - let inner = inner.clone(); - inner.call(req, remote_addr) - })) - } - }; + let tls_config = (args.tls_cert.clone(), args.tls_key.clone()); + let server_handle = Arc::new(Server::init(args, running)?); + let mut handles = vec![]; + for bind_addr in addrs.iter() { + let server_handle = server_handle.clone(); match bind_addr { BindAddr::Address(ip) => { - let incoming = create_addr_incoming(SocketAddr::new(*ip, port)) + let listener = create_listener(SocketAddr::new(*ip, port)) .with_context(|| format!("Failed to bind `{ip}:{port}`"))?; - match (&args.tls_cert, &args.tls_key) { + match &tls_config { #[cfg(feature = "tls")] (Some(cert_file), Some(key_file)) => { let certs = load_certs(cert_file)?; let key = load_private_key(key_file)?; - let config = ServerConfig::builder() - .with_safe_defaults() + let mut config = ServerConfig::builder() .with_no_client_auth() - .with_single_cert(certs.clone(), key.clone())?; + .with_single_cert(certs, key)?; + config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; let config = Arc::new(config); - let accepter = TlsAcceptor::new(config.clone(), incoming); - let new_service = make_service_fn(move |socket: &TlsStream| { - let remote_addr = socket.remote_addr(); - serve_func(Some(remote_addr)) + let tls_accepter = TlsAcceptor::from(config); + + let handle = tokio::spawn(async move { + loop { + let (cnx, addr) = listener.accept().await.unwrap(); + let Ok(stream) = tls_accepter.accept(cnx).await else { + eprintln!( + "Warning during tls handshake connection from {}", + addr + ); + continue; + }; + let stream = TokioIo::new(stream); + tokio::spawn(handle_stream( + server_handle.clone(), + stream, + Some(addr), + )); + } }); - let server = - tokio::spawn(hyper::Server::builder(accepter).serve(new_service)); - handles.push(server); + + handles.push(handle); } (None, None) => { - let new_service = make_service_fn(move |socket: &AddrStream| { - let remote_addr = socket.remote_addr(); - serve_func(Some(remote_addr)) + let handle = tokio::spawn(async move { + loop { + let (cnx, addr) = listener.accept().await.unwrap(); + let stream = TokioIo::new(cnx); + tokio::spawn(handle_stream( + server_handle.clone(), + stream, + Some(addr), + )); + } }); - let server = - tokio::spawn(hyper::Server::builder(incoming).serve(new_service)); - handles.push(server); + handles.push(handle); } _ => { unreachable!() @@ -130,10 +138,15 @@ fn serve( { let listener = tokio::net::UnixListener::bind(path) .with_context(|| format!("Failed to bind `{}`", path.display()))?; - let acceptor = unix::UnixAcceptor::from_listener(listener); - let new_service = make_service_fn(move |_| serve_func(None)); - let server = tokio::spawn(hyper::Server::builder(acceptor).serve(new_service)); - handles.push(server); + let handle = tokio::spawn(async move { + loop { + let (cnx, _) = listener.accept().await.unwrap(); + let stream = TokioIo::new(cnx); + tokio::spawn(handle_stream(server_handle.clone(), stream, None)); + } + }); + + handles.push(handle); } } } @@ -141,7 +154,30 @@ fn serve( Ok(handles) } -fn create_addr_incoming(addr: SocketAddr) -> Result { +async fn handle_stream(handle: Arc, stream: TokioIo, addr: Option) +where + T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static, +{ + let hyper_service = + service_fn(move |request: Request| handle.clone().call(request, addr)); + + let ret = Builder::new(TokioExecutor::new()) + .serve_connection_with_upgrades(stream, hyper_service) + .await; + + if let Err(err) = ret { + let scope = match addr { + Some(addr) => format!(" from {}", addr), + None => String::new(), + }; + match err.downcast_ref::() { + Some(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {} + _ => eprintln!("Warning serving connection{}: {}", scope, err), + } + } +} + +fn create_listener(addr: SocketAddr) -> Result { use socket2::{Domain, Protocol, Socket, Type}; let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))?; if addr.is_ipv6() { @@ -152,11 +188,12 @@ fn create_addr_incoming(addr: SocketAddr) -> Result { socket.listen(1024 /* Default backlog */)?; let std_listener = StdTcpListener::from(socket); std_listener.set_nonblocking(true)?; - let incoming = AddrIncoming::from_listener(TcpListener::from_std(std_listener)?)?; - Ok(incoming) + let listener = TcpListener::from_std(std_listener)?; + Ok(listener) } -fn print_listening(args: Arc) -> Result<()> { +fn print_listening(args: &Args) -> Result { + let mut output = String::new(); let mut bind_addrs = vec![]; let (mut ipv4, mut ipv6) = (false, false); for bind_addr in args.addrs.iter() { @@ -209,17 +246,17 @@ fn print_listening(args: Arc) -> Result<()> { .collect::>(); if urls.len() == 1 { - println!("Listening on {}", urls[0]); + output.push_str(&format!("Listening on {}", urls[0])) } else { let info = urls .iter() .map(|v| format!(" {v}")) .collect::>() .join("\n"); - println!("Listening on:\n{info}\n"); + output.push_str(&format!("Listening on:\n{info}\n")) } - Ok(()) + Ok(output) } async fn shutdown_signal() { diff --git a/src/server.rs b/src/server.rs index 306e22d..5dd4add 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,29 +1,32 @@ #![allow(clippy::too_many_arguments)] use crate::auth::{www_authenticate, AccessPaths, AccessPerm}; -use crate::streamer::Streamer; +use crate::http_utils::{body_full, IncomingStream, LengthLimitedStream}; use crate::utils::{ decode_uri, encode_uri, get_file_mtime_and_mode, get_file_name, glob, try_get_file_name, }; use crate::Args; -use anyhow::{anyhow, Result}; -use walkdir::WalkDir; -use xml::escape::escape_str_pcdata; -use async_zip::tokio::write::ZipFileWriter; -use async_zip::{Compression, ZipDateTime, ZipEntryBuilder}; +use anyhow::{anyhow, Result}; +use async_zip::{tokio::write::ZipFileWriter, Compression, ZipDateTime, ZipEntryBuilder}; +use bytes::Bytes; use chrono::{LocalResult, TimeZone, Utc}; -use futures::TryStreamExt; +use futures_util::{pin_mut, TryStreamExt}; use headers::{ AcceptRanges, AccessControlAllowCredentials, AccessControlAllowOrigin, CacheControl, ContentLength, ContentType, ETag, HeaderMap, HeaderMapExt, IfModifiedSince, IfNoneMatch, IfRange, LastModified, Range, }; -use hyper::header::{ - HeaderValue, AUTHORIZATION, CONTENT_DISPOSITION, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, - RANGE, +use http_body_util::{combinators::BoxBody, BodyExt, StreamBody}; +use hyper::body::Frame; +use hyper::{ + body::Incoming, + header::{ + HeaderValue, AUTHORIZATION, CONTENT_DISPOSITION, CONTENT_LENGTH, CONTENT_RANGE, + CONTENT_TYPE, RANGE, + }, + Method, StatusCode, Uri, }; -use hyper::{Body, Method, StatusCode, Uri}; use serde::Serialize; use std::borrow::Cow; use std::cmp::Ordering; @@ -39,11 +42,13 @@ use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWrite}; use tokio::{fs, io}; use tokio_util::compat::FuturesAsyncWriteCompatExt; -use tokio_util::io::StreamReader; +use tokio_util::io::{ReaderStream, StreamReader}; use uuid::Uuid; +use walkdir::WalkDir; +use xml::escape::escape_str_pcdata; -pub type Request = hyper::Request; -pub type Response = hyper::Response; +pub type Request = hyper::Request; +pub type Response = hyper::Response>; const INDEX_HTML: &str = include_str!("../assets/index.html"); const INDEX_CSS: &str = include_str!("../assets/index.css"); @@ -54,7 +59,7 @@ const BUF_SIZE: usize = 65536; const TEXT_MAX_SIZE: u64 = 4194304; // 4M pub struct Server { - args: Arc, + args: Args, assets_prefix: String, html: Cow<'static, str>, single_file_req_paths: Vec, @@ -62,7 +67,7 @@ pub struct Server { } impl Server { - pub fn init(args: Arc, running: Arc) -> Result { + pub fn init(args: Args, running: Arc) -> Result { let assets_prefix = format!("{}__dufs_v{}_", args.uri_prefix, env!("CARGO_PKG_VERSION")); let single_file_req_paths = if args.path_is_file { vec![ @@ -365,7 +370,7 @@ impl Server { status_forbid(&mut res); } else if !is_miss { *res.status_mut() = StatusCode::METHOD_NOT_ALLOWED; - *res.body_mut() = Body::from("Already exists"); + *res.body_mut() = body_full("Already exists"); } else { self.handle_mkcol(path, &mut res).await?; } @@ -411,7 +416,7 @@ impl Server { Ok(res) } - async fn handle_upload(&self, path: &Path, mut req: Request, res: &mut Response) -> Result<()> { + async fn handle_upload(&self, path: &Path, req: Request, res: &mut Response) -> Result<()> { ensure_path_parent(path).await?; let mut file = match fs::File::create(&path).await { @@ -422,13 +427,12 @@ impl Server { } }; - let body_with_io_error = req - .body_mut() - .map_err(|err| io::Error::new(io::ErrorKind::Other, err)); + let stream = IncomingStream::new(req.into_body()); + let body_with_io_error = stream.map_err(|err| io::Error::new(io::ErrorKind::Other, err)); let body_reader = StreamReader::new(body_with_io_error); - futures::pin_mut!(body_reader); + pin_mut!(body_reader); let ret = io::copy(&mut body_reader, &mut file).await; if ret.is_err() { @@ -596,8 +600,14 @@ impl Server { error!("Failed to zip {}, {}", path.display(), e); } }); - let reader = Streamer::new(reader, BUF_SIZE); - *res.body_mut() = Body::wrap_stream(reader.into_stream()); + let reader_stream = ReaderStream::new(reader); + let stream_body = StreamBody::new( + reader_stream + .map_ok(Frame::data) + .map_err(|err| anyhow!("{err}")), + ); + let boxed_body = stream_body.boxed(); + *res.body_mut() = boxed_body; Ok(()) } @@ -660,21 +670,21 @@ impl Server { } None => match name { "index.js" => { - *res.body_mut() = Body::from(INDEX_JS); + *res.body_mut() = body_full(INDEX_JS); res.headers_mut().insert( "content-type", HeaderValue::from_static("application/javascript; charset=UTF-8"), ); } "index.css" => { - *res.body_mut() = Body::from(INDEX_CSS); + *res.body_mut() = body_full(INDEX_CSS); res.headers_mut().insert( "content-type", HeaderValue::from_static("text/css; charset=UTF-8"), ); } "favicon.ico" => { - *res.body_mut() = Body::from(FAVICON_ICO); + *res.body_mut() = body_full(FAVICON_ICO); res.headers_mut() .insert("content-type", HeaderValue::from_static("image/x-icon")); } @@ -761,18 +771,24 @@ impl Server { && file.seek(SeekFrom::Start(range.start)).await.is_ok() { let end = range.end.unwrap_or(size - 1).min(size - 1); - let part_size = end - range.start + 1; - let reader = Streamer::new(file, BUF_SIZE); + let range_size = end - range.start + 1; *res.status_mut() = StatusCode::PARTIAL_CONTENT; let content_range = format!("bytes {}-{}/{}", range.start, end, size); res.headers_mut() .insert(CONTENT_RANGE, content_range.parse()?); res.headers_mut() - .insert(CONTENT_LENGTH, format!("{part_size}").parse()?); + .insert(CONTENT_LENGTH, format!("{range_size}").parse()?); if head_only { return Ok(()); } - *res.body_mut() = Body::wrap_stream(reader.into_stream_sized(part_size)); + + let stream_body = StreamBody::new( + LengthLimitedStream::new(file, range_size as usize) + .map_ok(Frame::data) + .map_err(|err| anyhow!("{err}")), + ); + let boxed_body = stream_body.boxed(); + *res.body_mut() = boxed_body; } else { *res.status_mut() = StatusCode::RANGE_NOT_SATISFIABLE; res.headers_mut() @@ -784,8 +800,15 @@ impl Server { if head_only { return Ok(()); } - let reader = Streamer::new(file, BUF_SIZE); - *res.body_mut() = Body::wrap_stream(reader.into_stream()); + + let reader_stream = ReaderStream::new(file); + let stream_body = StreamBody::new( + reader_stream + .map_ok(Frame::data) + .map_err(|err| anyhow!("{err}")), + ); + let boxed_body = stream_body.boxed(); + *res.body_mut() = boxed_body; } Ok(()) } @@ -828,7 +851,7 @@ impl Server { if head_only { return Ok(()); } - *res.body_mut() = output.into(); + *res.body_mut() = body_full(output); Ok(()) } @@ -943,7 +966,7 @@ impl Server { res.headers_mut() .insert("lock-token", format!("<{token}>").parse()?); - *res.body_mut() = Body::from(format!( + *res.body_mut() = body_full(format!( r#" {token} @@ -1014,7 +1037,7 @@ impl Server { .typed_insert(ContentType::from(mime_guess::mime::TEXT_HTML_UTF_8)); res.headers_mut() .typed_insert(ContentLength(output.as_bytes().len() as u64)); - *res.body_mut() = output.into(); + *res.body_mut() = body_full(output); if head_only { return Ok(()); } @@ -1060,7 +1083,7 @@ impl Server { if head_only { return Ok(()); } - *res.body_mut() = output.into(); + *res.body_mut() = body_full(output); Ok(()) } @@ -1419,7 +1442,7 @@ fn res_multistatus(res: &mut Response, content: &str) { "content-type", HeaderValue::from_static("application/xml; charset=utf-8"), ); - *res.body_mut() = Body::from(format!( + *res.body_mut() = body_full(format!( r#" {content} @@ -1539,12 +1562,12 @@ fn parse_range(headers: &HeaderMap) -> Option { fn status_forbid(res: &mut Response) { *res.status_mut() = StatusCode::FORBIDDEN; - *res.body_mut() = Body::from("Forbidden"); + *res.body_mut() = body_full("Forbidden"); } fn status_not_found(res: &mut Response) { *res.status_mut() = StatusCode::NOT_FOUND; - *res.body_mut() = Body::from("Not Found"); + *res.body_mut() = body_full("Not Found"); } fn status_no_content(res: &mut Response) { diff --git a/src/streamer.rs b/src/streamer.rs deleted file mode 100644 index 163b36f..0000000 --- a/src/streamer.rs +++ /dev/null @@ -1,68 +0,0 @@ -use async_stream::stream; -use futures::{Stream, StreamExt}; -use std::io::Error; -use std::pin::Pin; -use tokio::io::{AsyncRead, AsyncReadExt}; - -pub struct Streamer -where - R: AsyncRead + Unpin + Send + 'static, -{ - reader: R, - buf_size: usize, -} - -impl Streamer -where - R: AsyncRead + Unpin + Send + 'static, -{ - #[inline] - pub fn new(reader: R, buf_size: usize) -> Self { - Self { reader, buf_size } - } - pub fn into_stream( - mut self, - ) -> Pin, Error>> + 'static>> { - let stream = stream! { - loop { - let mut buf = vec![0; self.buf_size]; - let r = self.reader.read(&mut buf).await?; - if r == 0 { - break - } - buf.truncate(r); - yield Ok(buf); - } - }; - stream.boxed() - } - // allow truncation as truncated remaining is always less than buf_size: usize - pub fn into_stream_sized( - mut self, - max_length: u64, - ) -> Pin, Error>> + 'static>> { - let stream = stream! { - let mut remaining = max_length; - loop { - if remaining == 0 { - break; - } - let bs = if remaining >= self.buf_size as u64 { - self.buf_size - } else { - remaining as usize - }; - let mut buf = vec![0; bs]; - let r = self.reader.read(&mut buf).await?; - if r == 0 { - break; - } else { - buf.truncate(r); - yield Ok(buf); - } - remaining -= r as u64; - } - }; - stream.boxed() - } -} diff --git a/src/tls.rs b/src/tls.rs deleted file mode 100644 index d419892..0000000 --- a/src/tls.rs +++ /dev/null @@ -1,161 +0,0 @@ -use anyhow::{anyhow, bail, Context as AnyhowContext, Result}; -use core::task::{Context, Poll}; -use futures::ready; -use hyper::server::accept::Accept; -use hyper::server::conn::{AddrIncoming, AddrStream}; -use rustls::{Certificate, PrivateKey}; -use std::future::Future; -use std::net::SocketAddr; -use std::path::Path; -use std::pin::Pin; -use std::sync::Arc; -use std::{fs, io}; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use tokio_rustls::rustls::ServerConfig; - -enum State { - Handshaking(tokio_rustls::Accept), - Streaming(tokio_rustls::server::TlsStream), -} - -// tokio_rustls::server::TlsStream doesn't expose constructor methods, -// so we have to TlsAcceptor::accept and handshake to have access to it -// TlsStream implements AsyncRead/AsyncWrite handshaking tokio_rustls::Accept first -pub struct TlsStream { - state: State, - remote_addr: SocketAddr, -} - -impl TlsStream { - fn new(stream: AddrStream, config: Arc) -> TlsStream { - let remote_addr = stream.remote_addr(); - let accept = tokio_rustls::TlsAcceptor::from(config).accept(stream); - TlsStream { - state: State::Handshaking(accept), - remote_addr, - } - } - pub fn remote_addr(&self) -> SocketAddr { - self.remote_addr - } -} - -impl AsyncRead for TlsStream { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &mut ReadBuf, - ) -> Poll> { - let pin = self.get_mut(); - match pin.state { - State::Handshaking(ref mut accept) => match ready!(Pin::new(accept).poll(cx)) { - Ok(mut stream) => { - let result = Pin::new(&mut stream).poll_read(cx, buf); - pin.state = State::Streaming(stream); - result - } - Err(err) => Poll::Ready(Err(err)), - }, - State::Streaming(ref mut stream) => Pin::new(stream).poll_read(cx, buf), - } - } -} - -impl AsyncWrite for TlsStream { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - let pin = self.get_mut(); - match pin.state { - State::Handshaking(ref mut accept) => match ready!(Pin::new(accept).poll(cx)) { - Ok(mut stream) => { - let result = Pin::new(&mut stream).poll_write(cx, buf); - pin.state = State::Streaming(stream); - result - } - Err(err) => Poll::Ready(Err(err)), - }, - State::Streaming(ref mut stream) => Pin::new(stream).poll_write(cx, buf), - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.state { - State::Handshaking(_) => Poll::Ready(Ok(())), - State::Streaming(ref mut stream) => Pin::new(stream).poll_flush(cx), - } - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.state { - State::Handshaking(_) => Poll::Ready(Ok(())), - State::Streaming(ref mut stream) => Pin::new(stream).poll_shutdown(cx), - } - } -} - -pub struct TlsAcceptor { - config: Arc, - incoming: AddrIncoming, -} - -impl TlsAcceptor { - pub fn new(config: Arc, incoming: AddrIncoming) -> TlsAcceptor { - TlsAcceptor { config, incoming } - } -} - -impl Accept for TlsAcceptor { - type Conn = TlsStream; - type Error = io::Error; - - fn poll_accept( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - let pin = self.get_mut(); - match ready!(Pin::new(&mut pin.incoming).poll_accept(cx)) { - Some(Ok(sock)) => Poll::Ready(Some(Ok(TlsStream::new(sock, pin.config.clone())))), - Some(Err(e)) => Poll::Ready(Some(Err(e))), - None => Poll::Ready(None), - } - } -} - -// Load public certificate from file. -pub fn load_certs>(filename: T) -> Result> { - // Open certificate file. - let cert_file = fs::File::open(filename.as_ref()) - .with_context(|| format!("Failed to access `{}`", filename.as_ref().display()))?; - let mut reader = io::BufReader::new(cert_file); - - // Load and return certificate. - let certs = rustls_pemfile::certs(&mut reader).with_context(|| "Failed to load certificate")?; - if certs.is_empty() { - bail!("No supported certificate in file"); - } - Ok(certs.into_iter().map(Certificate).collect()) -} - -// Load private key from file. -pub fn load_private_key>(filename: T) -> Result { - let key_file = fs::File::open(filename.as_ref()) - .with_context(|| format!("Failed to access `{}`", filename.as_ref().display()))?; - let mut reader = io::BufReader::new(key_file); - - // Load and return a single private key. - let keys = rustls_pemfile::read_all(&mut reader) - .with_context(|| "There was a problem with reading private key")? - .into_iter() - .find_map(|item| match item { - rustls_pemfile::Item::RSAKey(key) - | rustls_pemfile::Item::PKCS8Key(key) - | rustls_pemfile::Item::ECKey(key) => Some(key), - _ => None, - }) - .ok_or_else(|| anyhow!("No supported private key in file"))?; - - Ok(PrivateKey(keys)) -} diff --git a/src/unix.rs b/src/unix.rs deleted file mode 100644 index b8b1710..0000000 --- a/src/unix.rs +++ /dev/null @@ -1,31 +0,0 @@ -use hyper::server::accept::Accept; -use tokio::net::UnixListener; - -use std::pin::Pin; -use std::task::{Context, Poll}; - -pub struct UnixAcceptor { - inner: UnixListener, -} - -impl UnixAcceptor { - pub fn from_listener(listener: UnixListener) -> Self { - Self { inner: listener } - } -} - -impl Accept for UnixAcceptor { - type Conn = tokio::net::UnixStream; - type Error = std::io::Error; - - fn poll_accept( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - match self.inner.poll_accept(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Ok((socket, _addr))) => Poll::Ready(Some(Ok(socket))), - Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), - } - } -} diff --git a/src/utils.rs b/src/utils.rs index 2948679..a4f8def 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,5 +1,7 @@ use anyhow::{anyhow, Context, Result}; use chrono::{DateTime, Utc}; +#[cfg(feature = "tls")] +use rustls_pki_types::{CertificateDer, PrivateKeyDer}; use std::{ borrow::Cow, path::Path, @@ -58,6 +60,46 @@ pub fn glob(pattern: &str, target: &str) -> bool { pat.matches(target) } +// Load public certificate from file. +#[cfg(feature = "tls")] +pub fn load_certs>(filename: T) -> Result>> { + // Open certificate file. + let cert_file = std::fs::File::open(filename.as_ref()) + .with_context(|| format!("Failed to access `{}`", filename.as_ref().display()))?; + let mut reader = std::io::BufReader::new(cert_file); + + // Load and return certificate. + let mut certs = vec![]; + for cert in rustls_pemfile::certs(&mut reader) { + let cert = cert.with_context(|| "Failed to load certificate")?; + certs.push(cert) + } + if certs.is_empty() { + anyhow::bail!("No supported certificate in file"); + } + Ok(certs) +} + +// Load private key from file. +#[cfg(feature = "tls")] +pub fn load_private_key>(filename: T) -> Result> { + let key_file = std::fs::File::open(filename.as_ref()) + .with_context(|| format!("Failed to access `{}`", filename.as_ref().display()))?; + let mut reader = std::io::BufReader::new(key_file); + + // Load and return a single private key. + for key in rustls_pemfile::read_all(&mut reader) { + let key = key.with_context(|| "There was a problem with reading private key")?; + match key { + rustls_pemfile::Item::Pkcs1Key(key) => return Ok(PrivateKeyDer::Pkcs1(key)), + rustls_pemfile::Item::Pkcs8Key(key) => return Ok(PrivateKeyDer::Pkcs8(key)), + rustls_pemfile::Item::Sec1Key(key) => return Ok(PrivateKeyDer::Sec1(key)), + _ => {} + } + } + anyhow::bail!("No supported private key in file"); +} + #[test] fn test_glob_key() { assert!(glob("", "")); diff --git a/tests/range.rs b/tests/range.rs index a2c9c50..4da721b 100644 --- a/tests/range.rs +++ b/tests/range.rs @@ -2,7 +2,7 @@ mod fixtures; mod utils; use fixtures::{server, Error, TestServer}; -use headers::HeaderValue; +use reqwest::header::HeaderValue; use rstest::rstest; #[rstest] diff --git a/tests/utils.rs b/tests/utils.rs index 90d3f54..c987050 100644 --- a/tests/utils.rs +++ b/tests/utils.rs @@ -20,7 +20,7 @@ macro_rules! assert_resp_paths { #[macro_export] macro_rules! fetch { ($method:literal, $url:expr) => { - reqwest::blocking::Client::new().request(hyper::Method::from_bytes($method)?, $url) + reqwest::blocking::Client::new().request(reqwest::Method::from_bytes($method)?, $url) }; }