From 6fb1062376a2f7fd4e9a47ca145890b3726db476 Mon Sep 17 00:00:00 2001 From: Magnus Hoff Date: Fri, 8 Sep 2017 16:21:24 +0200 Subject: [PATCH] Execute State functions in thread pool --- Cargo.lock | 1 + Cargo.toml | 1 + src/main.rs | 1 + src/state.rs | 111 +++++++++++++++++++++++++++------------------------ 4 files changed, 62 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1a79157..0b9c13f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9,6 +9,7 @@ dependencies = [ "diesel 0.15.2 (registry+https://github.com/rust-lang/crates.io-index)", "diesel_codegen 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-cpupool 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.11.2 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", "libsqlite3-sys 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index b6c0d76..94b3e1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ authors = ["Magnus Hoff "] [dependencies] futures = "0.1" +futures-cpupool = "0.1" hyper = "0.11" tokio-io = "0.1" tokio-proto = "0.1" diff --git a/src/main.rs b/src/main.rs index b948e69..0c89d91 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,7 @@ extern crate chrono; extern crate clap; extern crate futures; +extern crate futures_cpupool; extern crate hyper; extern crate pulldown_cmark; extern crate r2d2; diff --git a/src/state.rs b/src/state.rs index 47282c6..18f0ce4 100644 --- a/src/state.rs +++ b/src/state.rs @@ -3,7 +3,7 @@ use std; use diesel; use diesel::sqlite::SqliteConnection; use diesel::prelude::*; -use futures::{Future, IntoFuture, BoxFuture}; +use futures_cpupool::{self, CpuFuture}; use r2d2::Pool; use r2d2_diesel::ConnectionManager; @@ -11,79 +11,86 @@ use models; #[derive(Clone)] pub struct State { - connection_pool: Pool> + connection_pool: Pool>, + cpu_pool: futures_cpupool::CpuPool, } pub type Error = Box; impl State { pub fn new(connection_pool: Pool>) -> State { - State { connection_pool } + State { + connection_pool, + cpu_pool: futures_cpupool::CpuPool::new_num_cpus(), + } } - pub fn get_article_revision_by_id(&self, article_id: i32) -> BoxFuture, Error> { - (|| -> Result<_, _> { + pub fn get_article_revision_by_id(&self, article_id: i32) -> CpuFuture, Error> { + let connection_pool = self.connection_pool.clone(); + + self.cpu_pool.spawn_fn(move || { use schema::article_revisions; Ok(article_revisions::table .filter(article_revisions::article_id.eq(article_id)) .order(article_revisions::revision.desc()) .limit(1) - .load::(&*self.connection_pool.get()?)? + .load::(&*connection_pool.get()?)? .pop()) - }()).into_future().boxed() + }) } - pub fn update_article(&self, article_id: i32, base_revision: i32, body: String) -> BoxFuture { - self.connection_pool.get().into_future() - .map_err(Into::into) - .and_then(move |conn| { - conn.transaction(|| { - use schema::article_revisions; + pub fn update_article(&self, article_id: i32, base_revision: i32, body: String) -> CpuFuture { + let connection_pool = self.connection_pool.clone(); - let (latest_revision, title) = article_revisions::table - .filter(article_revisions::article_id.eq(article_id)) - .order(article_revisions::revision.desc()) - .limit(1) - .select((article_revisions::revision, article_revisions::title)) - .load::<(i32, String)>(&*conn)? - .pop() - .unwrap_or_else(|| unimplemented!("TODO Missing an error type")); + self.cpu_pool.spawn_fn(move || { + let conn = connection_pool.get()?; - if latest_revision != base_revision { - // TODO: If it is the same edit repeated, just respond OK - // TODO: If there is a conflict, transform the edit to work seamlessly - unimplemented!("TODO Missing handling of revision conflicts"); - } - let new_revision = base_revision + 1; + conn.transaction(|| { + use schema::article_revisions; - #[derive(Insertable)] - #[table_name="article_revisions"] - struct NewRevision<'a> { - article_id: i32, - revision: i32, - title: &'a str, - body: &'a str, - } + let (latest_revision, title) = article_revisions::table + .filter(article_revisions::article_id.eq(article_id)) + .order(article_revisions::revision.desc()) + .limit(1) + .select((article_revisions::revision, article_revisions::title)) + .load::<(i32, String)>(&*conn)? + .pop() + .unwrap_or_else(|| unimplemented!("TODO Missing an error type")); - diesel::insert(&NewRevision { - article_id, - revision: new_revision, - title: &title, - body: &body, - }) - .into(article_revisions::table) - .execute(&*conn)?; + if latest_revision != base_revision { + // TODO: If it is the same edit repeated, just respond OK + // TODO: If there is a conflict, transform the edit to work seamlessly + unimplemented!("TODO Missing handling of revision conflicts"); + } + let new_revision = base_revision + 1; - Ok(article_revisions::table - .filter(article_revisions::article_id.eq(article_id)) - .filter(article_revisions::revision.eq(new_revision)) - .load::(&*conn)? - .pop() - .expect("We just inserted this row!") - ) - }) + #[derive(Insertable)] + #[table_name="article_revisions"] + struct NewRevision<'a> { + article_id: i32, + revision: i32, + title: &'a str, + body: &'a str, + } + + diesel::insert(&NewRevision { + article_id, + revision: new_revision, + title: &title, + body: &body, + }) + .into(article_revisions::table) + .execute(&*conn)?; + + Ok(article_revisions::table + .filter(article_revisions::article_id.eq(article_id)) + .filter(article_revisions::revision.eq(new_revision)) + .load::(&*conn)? + .pop() + .expect("We just inserted this row!") + ) }) - .boxed() + }) } }