From 7b485f87b9c088cc325d602efc9d05e0caa17af5 Mon Sep 17 00:00:00 2001 From: Magnus Hoff Date: Wed, 15 Nov 2017 15:48:45 +0100 Subject: [PATCH] Refactor State, centralizing db connection and cpu pool handling --- src/db.rs | 2 +- src/state.rs | 476 +++++++++++++++++++++++++++------------------------ 2 files changed, 253 insertions(+), 225 deletions(-) diff --git a/src/db.rs b/src/db.rs index c7e697a..33f4909 100644 --- a/src/db.rs +++ b/src/db.rs @@ -18,7 +18,7 @@ impl CustomizeConnection for SqliteInitial } } -pub fn create_pool(connection_string: String) -> Result>, Box<::std::error::Error>> { +pub fn create_pool>(connection_string: S) -> Result>, Box<::std::error::Error>> { let config = Config::builder() .connection_customizer(Box::new(SqliteInitializer {})) .build(); diff --git a/src/state.rs b/src/state.rs index 641c002..b6bf468 100644 --- a/src/state.rs +++ b/src/state.rs @@ -81,6 +81,240 @@ fn decide_slug(conn: &SqliteConnection, article_id: i32, prev_title: &str, title } } +struct SyncState<'a> { + db_connection: &'a diesel::SqliteConnection, +} + +impl<'a> SyncState<'a> { + fn new(db_connection: &diesel::SqliteConnection) -> SyncState { + SyncState { db_connection } + } + + pub fn get_article_slug(&self, article_id: i32) -> Result, Error> { + use schema::article_revisions; + + Ok(article_revisions::table + .filter(article_revisions::article_id.eq(article_id)) + .filter(article_revisions::latest.eq(true)) + .select((article_revisions::slug)) + .first::(self.db_connection) + .optional()?) + } + + pub fn get_article_revision(&self, article_id: i32, revision: i32) -> Result, Error> { + use schema::article_revisions; + + Ok(article_revisions::table + .filter(article_revisions::article_id.eq(article_id)) + .filter(article_revisions::revision.eq(revision)) + .first::(self.db_connection) + .optional()?) + } + + pub fn query_article_revision_stubs(&self, f: F) -> Result, Error> + where + F: 'static + Send + Sync, + for <'x> F: + FnOnce(article_revisions::BoxedQuery<'x, diesel::sqlite::Sqlite>) -> + article_revisions::BoxedQuery<'x, diesel::sqlite::Sqlite>, + { + use schema::article_revisions::dsl::*; + + Ok(f(article_revisions.into_boxed()) + .select(( + sequence_number, + article_id, + revision, + created, + slug, + title, + latest, + author, + )) + .load(self.db_connection)? + ) + } + + pub fn lookup_slug(&self, slug: String) -> Result { + #[derive(Queryable)] + struct ArticleRevisionStub { + article_id: i32, + revision: i32, + latest: bool, + } + + self.db_connection.transaction(|| { + use schema::article_revisions; + + Ok(match article_revisions::table + .filter(article_revisions::slug.eq(slug)) + .order(article_revisions::sequence_number.desc()) + .select(( + article_revisions::article_id, + article_revisions::revision, + article_revisions::latest, + )) + .first::(self.db_connection) + .optional()? + { + None => SlugLookup::Miss, + Some(ref stub) if stub.latest => SlugLookup::Hit { + article_id: stub.article_id, + revision: stub.revision, + }, + Some(stub) => SlugLookup::Redirect( + article_revisions::table + .filter(article_revisions::latest.eq(true)) + .filter(article_revisions::article_id.eq(stub.article_id)) + .select(article_revisions::slug) + .first::(self.db_connection)? + ) + }) + }) + } + + pub fn update_article(&self, article_id: i32, base_revision: i32, title: String, body: String, author: Option) + -> Result + { + if title.is_empty() { + Err("title cannot be empty")?; + } + + self.db_connection.transaction(|| { + use schema::article_revisions; + + let (latest_revision, prev_title, prev_slug) = article_revisions::table + .filter(article_revisions::article_id.eq(article_id)) + .order(article_revisions::revision.desc()) + .select(( + article_revisions::revision, + article_revisions::title, + article_revisions::slug, + )) + .first::<(i32, String, String)>(self.db_connection)?; + + if latest_revision != base_revision { + // TODO: If it is the same edit repeated, just respond OK + // TODO: If there is a conflict, transform the edit to work seamlessly + unimplemented!("TODO Missing handling of revision conflicts"); + } + let new_revision = base_revision + 1; + + let slug = decide_slug(self.db_connection, article_id, &prev_title, &title, Some(&prev_slug))?; + + diesel::update( + article_revisions::table + .filter(article_revisions::article_id.eq(article_id)) + .filter(article_revisions::revision.eq(base_revision)) + ) + .set(article_revisions::latest.eq(false)) + .execute(self.db_connection)?; + + diesel::insert(&NewRevision { + article_id, + revision: new_revision, + slug: &slug, + title: &title, + body: &body, + author: author.as_ref().map(|x| &**x), + latest: true, + }) + .into(article_revisions::table) + .execute(self.db_connection)?; + + Ok(article_revisions::table + .filter(article_revisions::article_id.eq(article_id)) + .filter(article_revisions::revision.eq(new_revision)) + .first::(self.db_connection)? + ) + }) + } + + pub fn create_article(&self, target_slug: Option, title: String, body: String, author: Option) + -> Result + { + if title.is_empty() { + Err("title cannot be empty")?; + } + + self.db_connection.transaction(|| { + #[derive(Insertable)] + #[table_name="articles"] + struct NewArticle { + id: Option + } + + let article_id = { + use diesel::expression::sql_literal::sql; + // Diesel and SQLite are a bit in disagreement for how this should look: + sql::<(diesel::types::Integer)>("INSERT INTO articles VALUES (null)") + .execute(self.db_connection)?; + sql::<(diesel::types::Integer)>("SELECT LAST_INSERT_ROWID()") + .load::(self.db_connection)? + .pop().expect("Statement must evaluate to an integer") + }; + + let slug = decide_slug(self.db_connection, article_id, "", &title, target_slug.as_ref().map(|x| &**x))?; + + let new_revision = 1; + + diesel::insert(&NewRevision { + article_id, + revision: new_revision, + slug: &slug, + title: &title, + body: &body, + author: author.as_ref().map(|x| &**x), + latest: true, + }) + .into(article_revisions::table) + .execute(self.db_connection)?; + + Ok(article_revisions::table + .filter(article_revisions::article_id.eq(article_id)) + .filter(article_revisions::revision.eq(new_revision)) + .first::(self.db_connection)? + ) + }) + } + + pub fn search_query(&self, query_string: String, limit: i32, offset: i32, snippet_size: i32) -> Result, Error> { + use diesel::expression::sql_literal::sql; + use diesel::types::{Integer, Text}; + + fn fts_quote(src: &str) -> String { + format!("\"{}\"", src.replace('\"', "\"\"")) + } + + let words = query_string + .split_whitespace() + .map(fts_quote) + .collect::>(); + + let query = if words.len() > 1 { + format!("NEAR({})", words.join(" ")) + } else if words.len() == 1 { + format!("{}*", words[0]) + } else { + "\"\"".to_owned() + }; + + Ok( + sql::<(Text, Text, Text)>( + "SELECT title, snippet(article_search, 1, '', '', '\u{2026}', ?), slug \ + FROM article_search \ + WHERE article_search MATCH ? \ + ORDER BY rank \ + LIMIT ? OFFSET ?" + ) + .bind::(snippet_size) + .bind::(query) + .bind::(limit) + .bind::(offset) + .load(self.db_connection)?) + } +} + impl State { pub fn new(connection_pool: Pool>, cpu_pool: futures_cpupool::CpuPool) -> State { State { @@ -89,33 +323,27 @@ impl State { } } - pub fn get_article_slug(&self, article_id: i32) -> CpuFuture, Error> { + fn execute(&self, f: F) -> CpuFuture + where + F: 'static + Sync + Send, + for <'a> F: FnOnce(SyncState<'a>) -> Result, + T: 'static + Send, + { let connection_pool = self.connection_pool.clone(); self.cpu_pool.spawn_fn(move || { - use schema::article_revisions; + let db_connection = connection_pool.get()?; - Ok(article_revisions::table - .filter(article_revisions::article_id.eq(article_id)) - .filter(article_revisions::latest.eq(true)) - .select((article_revisions::slug)) - .first::(&*connection_pool.get()?) - .optional()?) + f(SyncState::new(&*db_connection)) }) } + pub fn get_article_slug(&self, article_id: i32) -> CpuFuture, Error> { + self.execute(move |state| state.get_article_slug(article_id)) + } + pub fn get_article_revision(&self, article_id: i32, revision: i32) -> CpuFuture, Error> { - let connection_pool = self.connection_pool.clone(); - - self.cpu_pool.spawn_fn(move || { - use schema::article_revisions; - - Ok(article_revisions::table - .filter(article_revisions::article_id.eq(article_id)) - .filter(article_revisions::revision.eq(revision)) - .first::(&*connection_pool.get()?) - .optional()?) - }) + self.execute(move |state| state.get_article_revision(article_id, revision)) } pub fn query_article_revision_stubs(&self, f: F) -> CpuFuture, Error> @@ -125,25 +353,7 @@ impl State { FnOnce(article_revisions::BoxedQuery<'a, diesel::sqlite::Sqlite>) -> article_revisions::BoxedQuery<'a, diesel::sqlite::Sqlite>, { - let connection_pool = self.connection_pool.clone(); - - self.cpu_pool.spawn_fn(move || { - use schema::article_revisions::dsl::*; - - Ok(f(article_revisions.into_boxed()) - .select(( - sequence_number, - article_id, - revision, - created, - slug, - title, - latest, - author, - )) - .load(&*connection_pool.get()?)? - ) - }) + self.execute(move |state| state.query_article_revision_stubs(f)) } pub fn get_latest_article_revision_stubs(&self) -> CpuFuture, Error> { @@ -155,204 +365,22 @@ impl State { } pub fn lookup_slug(&self, slug: String) -> CpuFuture { - #[derive(Queryable)] - struct ArticleRevisionStub { - article_id: i32, - revision: i32, - latest: bool, - } - - let connection_pool = self.connection_pool.clone(); - - self.cpu_pool.spawn_fn(move || { - let conn = connection_pool.get()?; - - conn.transaction(|| { - use schema::article_revisions; - - Ok(match article_revisions::table - .filter(article_revisions::slug.eq(slug)) - .order(article_revisions::sequence_number.desc()) - .select(( - article_revisions::article_id, - article_revisions::revision, - article_revisions::latest, - )) - .first::(&*conn) - .optional()? - { - None => SlugLookup::Miss, - Some(ref stub) if stub.latest => SlugLookup::Hit { - article_id: stub.article_id, - revision: stub.revision, - }, - Some(stub) => SlugLookup::Redirect( - article_revisions::table - .filter(article_revisions::latest.eq(true)) - .filter(article_revisions::article_id.eq(stub.article_id)) - .select(article_revisions::slug) - .first::(&*conn)? - ) - }) - }) - }) + self.execute(move |state| state.lookup_slug(slug)) } pub fn update_article(&self, article_id: i32, base_revision: i32, title: String, body: String, author: Option) -> CpuFuture { - let connection_pool = self.connection_pool.clone(); - - self.cpu_pool.spawn_fn(move || { - if title.is_empty() { - Err("title cannot be empty")?; - } - - let conn = connection_pool.get()?; - - conn.transaction(|| { - use schema::article_revisions; - - let (latest_revision, prev_title, prev_slug) = article_revisions::table - .filter(article_revisions::article_id.eq(article_id)) - .order(article_revisions::revision.desc()) - .select(( - article_revisions::revision, - article_revisions::title, - article_revisions::slug, - )) - .first::<(i32, String, String)>(&*conn)?; - - if latest_revision != base_revision { - // TODO: If it is the same edit repeated, just respond OK - // TODO: If there is a conflict, transform the edit to work seamlessly - unimplemented!("TODO Missing handling of revision conflicts"); - } - let new_revision = base_revision + 1; - - let slug = decide_slug(&*conn, article_id, &prev_title, &title, Some(&prev_slug))?; - - diesel::update( - article_revisions::table - .filter(article_revisions::article_id.eq(article_id)) - .filter(article_revisions::revision.eq(base_revision)) - ) - .set(article_revisions::latest.eq(false)) - .execute(&*conn)?; - - diesel::insert(&NewRevision { - article_id, - revision: new_revision, - slug: &slug, - title: &title, - body: &body, - author: author.as_ref().map(|x| &**x), - latest: true, - }) - .into(article_revisions::table) - .execute(&*conn)?; - - Ok(article_revisions::table - .filter(article_revisions::article_id.eq(article_id)) - .filter(article_revisions::revision.eq(new_revision)) - .first::(&*conn)? - ) - }) - }) + self.execute(move |state| state.update_article(article_id, base_revision, title, body, author)) } pub fn create_article(&self, target_slug: Option, title: String, body: String, author: Option) -> CpuFuture { - let connection_pool = self.connection_pool.clone(); - - self.cpu_pool.spawn_fn(move || { - if title.is_empty() { - Err("title cannot be empty")?; - } - - let conn = connection_pool.get()?; - - conn.transaction(|| { - #[derive(Insertable)] - #[table_name="articles"] - struct NewArticle { - id: Option - } - - let article_id = { - use diesel::expression::sql_literal::sql; - // Diesel and SQLite are a bit in disagreement for how this should look: - sql::<(diesel::types::Integer)>("INSERT INTO articles VALUES (null)") - .execute(&*conn)?; - sql::<(diesel::types::Integer)>("SELECT LAST_INSERT_ROWID()") - .load::(&*conn)? - .pop().expect("Statement must evaluate to an integer") - }; - - let slug = decide_slug(&*conn, article_id, "", &title, target_slug.as_ref().map(|x| &**x))?; - - let new_revision = 1; - - diesel::insert(&NewRevision { - article_id, - revision: new_revision, - slug: &slug, - title: &title, - body: &body, - author: author.as_ref().map(|x| &**x), - latest: true, - }) - .into(article_revisions::table) - .execute(&*conn)?; - - Ok(article_revisions::table - .filter(article_revisions::article_id.eq(article_id)) - .filter(article_revisions::revision.eq(new_revision)) - .first::(&*conn)? - ) - }) - }) + self.execute(move |state| state.create_article(target_slug, title, body, author)) } pub fn search_query(&self, query_string: String, limit: i32, offset: i32, snippet_size: i32) -> CpuFuture, Error> { - let connection_pool = self.connection_pool.clone(); - - self.cpu_pool.spawn_fn(move || { - use diesel::expression::sql_literal::sql; - use diesel::types::{Integer, Text}; - - fn fts_quote(src: &str) -> String { - format!("\"{}\"", src.replace('\"', "\"\"")) - } - - let words = query_string - .split_whitespace() - .map(fts_quote) - .collect::>(); - - let query = if words.len() > 1 { - format!("NEAR({})", words.join(" ")) - } else if words.len() == 1 { - format!("{}*", words[0]) - } else { - "\"\"".to_owned() - }; - - Ok( - sql::<(Text, Text, Text)>( - "SELECT title, snippet(article_search, 1, '', '', '\u{2026}', ?), slug \ - FROM article_search \ - WHERE article_search MATCH ? \ - ORDER BY rank \ - LIMIT ? OFFSET ?" - ) - .bind::(snippet_size) - .bind::(query) - .bind::(limit) - .bind::(offset) - .load(&*connection_pool.get()?)?) - }) + self.execute(move |state| state.search_query(query_string, limit, offset, snippet_size)) } - }