Execute State functions in thread pool

This commit is contained in:
Magnus Hoff 2017-09-08 16:21:24 +02:00
parent b3e7552c16
commit 6fb1062376
4 changed files with 62 additions and 52 deletions

1
Cargo.lock generated
View file

@ -9,6 +9,7 @@ dependencies = [
"diesel 0.15.2 (registry+https://github.com/rust-lang/crates.io-index)", "diesel 0.15.2 (registry+https://github.com/rust-lang/crates.io-index)",
"diesel_codegen 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", "diesel_codegen 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-cpupool 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.11.2 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.11.2 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
"libsqlite3-sys 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "libsqlite3-sys 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",

View file

@ -5,6 +5,7 @@ authors = ["Magnus Hoff <maghoff@gmail.com>"]
[dependencies] [dependencies]
futures = "0.1" futures = "0.1"
futures-cpupool = "0.1"
hyper = "0.11" hyper = "0.11"
tokio-io = "0.1" tokio-io = "0.1"
tokio-proto = "0.1" tokio-proto = "0.1"

View file

@ -7,6 +7,7 @@
extern crate chrono; extern crate chrono;
extern crate clap; extern crate clap;
extern crate futures; extern crate futures;
extern crate futures_cpupool;
extern crate hyper; extern crate hyper;
extern crate pulldown_cmark; extern crate pulldown_cmark;
extern crate r2d2; extern crate r2d2;

View file

@ -3,7 +3,7 @@ use std;
use diesel; use diesel;
use diesel::sqlite::SqliteConnection; use diesel::sqlite::SqliteConnection;
use diesel::prelude::*; use diesel::prelude::*;
use futures::{Future, IntoFuture, BoxFuture}; use futures_cpupool::{self, CpuFuture};
use r2d2::Pool; use r2d2::Pool;
use r2d2_diesel::ConnectionManager; use r2d2_diesel::ConnectionManager;
@ -11,79 +11,86 @@ use models;
#[derive(Clone)] #[derive(Clone)]
pub struct State { pub struct State {
connection_pool: Pool<ConnectionManager<SqliteConnection>> connection_pool: Pool<ConnectionManager<SqliteConnection>>,
cpu_pool: futures_cpupool::CpuPool,
} }
pub type Error = Box<std::error::Error + Send + Sync>; pub type Error = Box<std::error::Error + Send + Sync>;
impl State { impl State {
pub fn new(connection_pool: Pool<ConnectionManager<SqliteConnection>>) -> State { pub fn new(connection_pool: Pool<ConnectionManager<SqliteConnection>>) -> State {
State { connection_pool } State {
connection_pool,
cpu_pool: futures_cpupool::CpuPool::new_num_cpus(),
}
} }
pub fn get_article_revision_by_id(&self, article_id: i32) -> BoxFuture<Option<models::ArticleRevision>, Error> { pub fn get_article_revision_by_id(&self, article_id: i32) -> CpuFuture<Option<models::ArticleRevision>, Error> {
(|| -> Result<_, _> { let connection_pool = self.connection_pool.clone();
self.cpu_pool.spawn_fn(move || {
use schema::article_revisions; use schema::article_revisions;
Ok(article_revisions::table Ok(article_revisions::table
.filter(article_revisions::article_id.eq(article_id)) .filter(article_revisions::article_id.eq(article_id))
.order(article_revisions::revision.desc()) .order(article_revisions::revision.desc())
.limit(1) .limit(1)
.load::<models::ArticleRevision>(&*self.connection_pool.get()?)? .load::<models::ArticleRevision>(&*connection_pool.get()?)?
.pop()) .pop())
}()).into_future().boxed() })
} }
pub fn update_article(&self, article_id: i32, base_revision: i32, body: String) -> BoxFuture<models::ArticleRevision, Error> { pub fn update_article(&self, article_id: i32, base_revision: i32, body: String) -> CpuFuture<models::ArticleRevision, Error> {
self.connection_pool.get().into_future() let connection_pool = self.connection_pool.clone();
.map_err(Into::into)
.and_then(move |conn| {
conn.transaction(|| {
use schema::article_revisions;
let (latest_revision, title) = article_revisions::table self.cpu_pool.spawn_fn(move || {
.filter(article_revisions::article_id.eq(article_id)) let conn = connection_pool.get()?;
.order(article_revisions::revision.desc())
.limit(1)
.select((article_revisions::revision, article_revisions::title))
.load::<(i32, String)>(&*conn)?
.pop()
.unwrap_or_else(|| unimplemented!("TODO Missing an error type"));
if latest_revision != base_revision { conn.transaction(|| {
// TODO: If it is the same edit repeated, just respond OK use schema::article_revisions;
// TODO: If there is a conflict, transform the edit to work seamlessly
unimplemented!("TODO Missing handling of revision conflicts");
}
let new_revision = base_revision + 1;
#[derive(Insertable)] let (latest_revision, title) = article_revisions::table
#[table_name="article_revisions"] .filter(article_revisions::article_id.eq(article_id))
struct NewRevision<'a> { .order(article_revisions::revision.desc())
article_id: i32, .limit(1)
revision: i32, .select((article_revisions::revision, article_revisions::title))
title: &'a str, .load::<(i32, String)>(&*conn)?
body: &'a str, .pop()
} .unwrap_or_else(|| unimplemented!("TODO Missing an error type"));
diesel::insert(&NewRevision { if latest_revision != base_revision {
article_id, // TODO: If it is the same edit repeated, just respond OK
revision: new_revision, // TODO: If there is a conflict, transform the edit to work seamlessly
title: &title, unimplemented!("TODO Missing handling of revision conflicts");
body: &body, }
}) let new_revision = base_revision + 1;
.into(article_revisions::table)
.execute(&*conn)?;
Ok(article_revisions::table #[derive(Insertable)]
.filter(article_revisions::article_id.eq(article_id)) #[table_name="article_revisions"]
.filter(article_revisions::revision.eq(new_revision)) struct NewRevision<'a> {
.load::<models::ArticleRevision>(&*conn)? article_id: i32,
.pop() revision: i32,
.expect("We just inserted this row!") title: &'a str,
) body: &'a str,
}) }
diesel::insert(&NewRevision {
article_id,
revision: new_revision,
title: &title,
body: &body,
})
.into(article_revisions::table)
.execute(&*conn)?;
Ok(article_revisions::table
.filter(article_revisions::article_id.eq(article_id))
.filter(article_revisions::revision.eq(new_revision))
.load::<models::ArticleRevision>(&*conn)?
.pop()
.expect("We just inserted this row!")
)
}) })
.boxed() })
} }
} }