Refactor State, centralizing db connection and cpu pool handling
parent 59e1ca8fb4
commit 7b485f87b9
2 changed files with 253 additions and 225 deletions
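The shape of the change, before the diff: State keeps the r2d2 connection pool and the futures_cpupool::CpuPool, a single private execute helper checks a connection out on the thread pool, and all blocking diesel work moves into a SyncState that borrows that connection. Below is a minimal sketch of that pattern, assuming futures-cpupool 0.1; the String stand-in for the pooled SqliteConnection and the type Error alias are invented here so the sketch stays self-contained, while the struct and method names mirror the diff.

// Sketch of the pattern this commit introduces; not the project's actual code.
extern crate futures_cpupool;

use futures_cpupool::{CpuFuture, CpuPool};

type Error = String; // stand-in error type for the sketch

// All blocking work lives here, against a borrowed "connection".
struct SyncState<'a> {
    db_connection: &'a String, // stands in for &SqliteConnection
}

impl<'a> SyncState<'a> {
    fn get_article_slug(&self, article_id: i32) -> Result<Option<String>, Error> {
        Ok(Some(format!("article-{}@{}", article_id, self.db_connection)))
    }
}

struct State {
    connection_pool: String, // stands in for Pool<ConnectionManager<SqliteConnection>>
    cpu_pool: CpuPool,
}

impl State {
    // The one place that hops onto the CPU pool and checks out a connection.
    fn execute<F, T>(&self, f: F) -> CpuFuture<T, Error>
    where
        F: 'static + Send + Sync,
        for<'a> F: FnOnce(SyncState<'a>) -> Result<T, Error>,
        T: 'static + Send,
    {
        let connection_pool = self.connection_pool.clone();
        self.cpu_pool.spawn_fn(move || f(SyncState { db_connection: &connection_pool }))
    }

    // Public methods shrink to one-line delegations.
    pub fn get_article_slug(&self, article_id: i32) -> CpuFuture<Option<String>, Error> {
        self.execute(move |state| state.get_article_slug(article_id))
    }
}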
@@ -18,7 +18,7 @@ impl CustomizeConnection<SqliteConnection, r2d2_diesel::Error> for SqliteInitializer
     }
 }

-pub fn create_pool(connection_string: String) -> Result<Pool<ConnectionManager<SqliteConnection>>, Box<::std::error::Error>> {
+pub fn create_pool<S: Into<String>>(connection_string: S) -> Result<Pool<ConnectionManager<SqliteConnection>>, Box<::std::error::Error>> {
     let config = Config::builder()
         .connection_customizer(Box::new(SqliteInitializer {}))
         .build();
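This hunk generalizes create_pool to accept any S: Into<String>, so callers can pass a &str or a String without converting first. Hypothetical call sites (the file names are made up) that compile against the new signature:

// Hypothetical call sites; both now satisfy S: Into<String>.
let pool = create_pool("wiki.db")?;                   // &'static str
let pool = create_pool(format!("{}/wiki.db", dir))?;  // String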
src/state.rs (476 additions and deletions)
@@ -81,6 +81,240 @@ fn decide_slug(conn: &SqliteConnection, article_id: i32, prev_title: &str, title
     }
 }

+struct SyncState<'a> {
+    db_connection: &'a diesel::SqliteConnection,
+}
+
+impl<'a> SyncState<'a> {
+    fn new(db_connection: &diesel::SqliteConnection) -> SyncState {
+        SyncState { db_connection }
+    }
+
+    pub fn get_article_slug(&self, article_id: i32) -> Result<Option<String>, Error> {
+        use schema::article_revisions;
+
+        Ok(article_revisions::table
+            .filter(article_revisions::article_id.eq(article_id))
+            .filter(article_revisions::latest.eq(true))
+            .select((article_revisions::slug))
+            .first::<String>(self.db_connection)
+            .optional()?)
+    }
+
+    pub fn get_article_revision(&self, article_id: i32, revision: i32) -> Result<Option<models::ArticleRevision>, Error> {
+        use schema::article_revisions;
+
+        Ok(article_revisions::table
+            .filter(article_revisions::article_id.eq(article_id))
+            .filter(article_revisions::revision.eq(revision))
+            .first::<models::ArticleRevision>(self.db_connection)
+            .optional()?)
+    }
+
+    pub fn query_article_revision_stubs<F>(&self, f: F) -> Result<Vec<models::ArticleRevisionStub>, Error>
+    where
+        F: 'static + Send + Sync,
+        for <'x> F:
+            FnOnce(article_revisions::BoxedQuery<'x, diesel::sqlite::Sqlite>) ->
+                article_revisions::BoxedQuery<'x, diesel::sqlite::Sqlite>,
+    {
+        use schema::article_revisions::dsl::*;
+
+        Ok(f(article_revisions.into_boxed())
+            .select((
+                sequence_number,
+                article_id,
+                revision,
+                created,
+                slug,
+                title,
+                latest,
+                author,
+            ))
+            .load(self.db_connection)?
+        )
+    }
+
+    pub fn lookup_slug(&self, slug: String) -> Result<SlugLookup, Error> {
+        #[derive(Queryable)]
+        struct ArticleRevisionStub {
+            article_id: i32,
+            revision: i32,
+            latest: bool,
+        }
+
+        self.db_connection.transaction(|| {
+            use schema::article_revisions;
+
+            Ok(match article_revisions::table
+                .filter(article_revisions::slug.eq(slug))
+                .order(article_revisions::sequence_number.desc())
+                .select((
+                    article_revisions::article_id,
+                    article_revisions::revision,
+                    article_revisions::latest,
+                ))
+                .first::<ArticleRevisionStub>(self.db_connection)
+                .optional()?
+            {
+                None => SlugLookup::Miss,
+                Some(ref stub) if stub.latest => SlugLookup::Hit {
+                    article_id: stub.article_id,
+                    revision: stub.revision,
+                },
+                Some(stub) => SlugLookup::Redirect(
+                    article_revisions::table
+                        .filter(article_revisions::latest.eq(true))
+                        .filter(article_revisions::article_id.eq(stub.article_id))
+                        .select(article_revisions::slug)
+                        .first::<String>(self.db_connection)?
+                )
+            })
+        })
+    }
+
+    pub fn update_article(&self, article_id: i32, base_revision: i32, title: String, body: String, author: Option<String>)
+        -> Result<models::ArticleRevision, Error>
+    {
+        if title.is_empty() {
+            Err("title cannot be empty")?;
+        }
+
+        self.db_connection.transaction(|| {
+            use schema::article_revisions;
+
+            let (latest_revision, prev_title, prev_slug) = article_revisions::table
+                .filter(article_revisions::article_id.eq(article_id))
+                .order(article_revisions::revision.desc())
+                .select((
+                    article_revisions::revision,
+                    article_revisions::title,
+                    article_revisions::slug,
+                ))
+                .first::<(i32, String, String)>(self.db_connection)?;
+
+            if latest_revision != base_revision {
+                // TODO: If it is the same edit repeated, just respond OK
+                // TODO: If there is a conflict, transform the edit to work seamlessly
+                unimplemented!("TODO Missing handling of revision conflicts");
+            }
+            let new_revision = base_revision + 1;
+
+            let slug = decide_slug(self.db_connection, article_id, &prev_title, &title, Some(&prev_slug))?;
+
+            diesel::update(
+                article_revisions::table
+                    .filter(article_revisions::article_id.eq(article_id))
+                    .filter(article_revisions::revision.eq(base_revision))
+            )
+                .set(article_revisions::latest.eq(false))
+                .execute(self.db_connection)?;
+
+            diesel::insert(&NewRevision {
+                article_id,
+                revision: new_revision,
+                slug: &slug,
+                title: &title,
+                body: &body,
+                author: author.as_ref().map(|x| &**x),
+                latest: true,
+            })
+                .into(article_revisions::table)
+                .execute(self.db_connection)?;

+            Ok(article_revisions::table
+                .filter(article_revisions::article_id.eq(article_id))
+                .filter(article_revisions::revision.eq(new_revision))
+                .first::<models::ArticleRevision>(self.db_connection)?
+            )
+        })
+    }
+
+    pub fn create_article(&self, target_slug: Option<String>, title: String, body: String, author: Option<String>)
+        -> Result<models::ArticleRevision, Error>
+    {
+        if title.is_empty() {
+            Err("title cannot be empty")?;
+        }
+
+        self.db_connection.transaction(|| {
+            #[derive(Insertable)]
+            #[table_name="articles"]
+            struct NewArticle {
+                id: Option<i32>
+            }
+
+            let article_id = {
+                use diesel::expression::sql_literal::sql;
+                // Diesel and SQLite are a bit in disagreement for how this should look:
+                sql::<(diesel::types::Integer)>("INSERT INTO articles VALUES (null)")
+                    .execute(self.db_connection)?;
+                sql::<(diesel::types::Integer)>("SELECT LAST_INSERT_ROWID()")
+                    .load::<i32>(self.db_connection)?
+                    .pop().expect("Statement must evaluate to an integer")
+            };
+
+            let slug = decide_slug(self.db_connection, article_id, "", &title, target_slug.as_ref().map(|x| &**x))?;
+
+            let new_revision = 1;
+
+            diesel::insert(&NewRevision {
+                article_id,
+                revision: new_revision,
+                slug: &slug,
+                title: &title,
+                body: &body,
+                author: author.as_ref().map(|x| &**x),
+                latest: true,
+            })
+                .into(article_revisions::table)
+                .execute(self.db_connection)?;
+
+            Ok(article_revisions::table
+                .filter(article_revisions::article_id.eq(article_id))
+                .filter(article_revisions::revision.eq(new_revision))
+                .first::<models::ArticleRevision>(self.db_connection)?
+            )
+        })
+    }
+
+    pub fn search_query(&self, query_string: String, limit: i32, offset: i32, snippet_size: i32) -> Result<Vec<models::SearchResult>, Error> {
+        use diesel::expression::sql_literal::sql;
+        use diesel::types::{Integer, Text};
+
+        fn fts_quote(src: &str) -> String {
+            format!("\"{}\"", src.replace('\"', "\"\""))
+        }
+
+        let words = query_string
+            .split_whitespace()
+            .map(fts_quote)
+            .collect::<Vec<_>>();
+
+        let query = if words.len() > 1 {
+            format!("NEAR({})", words.join(" "))
+        } else if words.len() == 1 {
+            format!("{}*", words[0])
+        } else {
+            "\"\"".to_owned()
+        };
+
+        Ok(
+            sql::<(Text, Text, Text)>(
+                "SELECT title, snippet(article_search, 1, '', '', '\u{2026}', ?), slug \
+                FROM article_search \
+                WHERE article_search MATCH ? \
+                ORDER BY rank \
+                LIMIT ? OFFSET ?"
+            )
+            .bind::<Integer, _>(snippet_size)
+            .bind::<Text, _>(query)
+            .bind::<Integer, _>(limit)
+            .bind::<Integer, _>(offset)
+            .load(self.db_connection)?)
+    }
+}
+
 impl State {
     pub fn new(connection_pool: Pool<ConnectionManager<SqliteConnection>>, cpu_pool: futures_cpupool::CpuPool) -> State {
         State {
@@ -89,33 +323,27 @@ impl State {
         }
     }

-    pub fn get_article_slug(&self, article_id: i32) -> CpuFuture<Option<String>, Error> {
+    fn execute<F, T>(&self, f: F) -> CpuFuture<T, Error>
+    where
+        F: 'static + Sync + Send,
+        for <'a> F: FnOnce(SyncState<'a>) -> Result<T, Error>,
+        T: 'static + Send,
+    {
         let connection_pool = self.connection_pool.clone();

         self.cpu_pool.spawn_fn(move || {
-            use schema::article_revisions;
+            let db_connection = connection_pool.get()?;

-            Ok(article_revisions::table
-                .filter(article_revisions::article_id.eq(article_id))
-                .filter(article_revisions::latest.eq(true))
-                .select((article_revisions::slug))
-                .first::<String>(&*connection_pool.get()?)
-                .optional()?)
+            f(SyncState::new(&*db_connection))
         })
     }

+    pub fn get_article_slug(&self, article_id: i32) -> CpuFuture<Option<String>, Error> {
+        self.execute(move |state| state.get_article_slug(article_id))
+    }
+
     pub fn get_article_revision(&self, article_id: i32, revision: i32) -> CpuFuture<Option<models::ArticleRevision>, Error> {
-        let connection_pool = self.connection_pool.clone();
-
-        self.cpu_pool.spawn_fn(move || {
-            use schema::article_revisions;
-
-            Ok(article_revisions::table
-                .filter(article_revisions::article_id.eq(article_id))
-                .filter(article_revisions::revision.eq(revision))
-                .first::<models::ArticleRevision>(&*connection_pool.get()?)
-                .optional()?)
-        })
+        self.execute(move |state| state.get_article_revision(article_id, revision))
     }

     pub fn query_article_revision_stubs<F>(&self, f: F) -> CpuFuture<Vec<models::ArticleRevisionStub>, Error>
@@ -125,25 +353,7 @@ impl State {
             FnOnce(article_revisions::BoxedQuery<'a, diesel::sqlite::Sqlite>) ->
                 article_revisions::BoxedQuery<'a, diesel::sqlite::Sqlite>,
     {
-        let connection_pool = self.connection_pool.clone();
-
-        self.cpu_pool.spawn_fn(move || {
-            use schema::article_revisions::dsl::*;
-
-            Ok(f(article_revisions.into_boxed())
-                .select((
-                    sequence_number,
-                    article_id,
-                    revision,
-                    created,
-                    slug,
-                    title,
-                    latest,
-                    author,
-                ))
-                .load(&*connection_pool.get()?)?
-            )
-        })
+        self.execute(move |state| state.query_article_revision_stubs(f))
     }

     pub fn get_latest_article_revision_stubs(&self) -> CpuFuture<Vec<models::ArticleRevisionStub>, Error> {
@@ -155,204 +365,22 @@ impl State {
     }

     pub fn lookup_slug(&self, slug: String) -> CpuFuture<SlugLookup, Error> {
-        #[derive(Queryable)]
-        struct ArticleRevisionStub {
-            article_id: i32,
-            revision: i32,
-            latest: bool,
-        }
-
-        let connection_pool = self.connection_pool.clone();
-
-        self.cpu_pool.spawn_fn(move || {
-            let conn = connection_pool.get()?;
-
-            conn.transaction(|| {
-                use schema::article_revisions;
-
-                Ok(match article_revisions::table
-                    .filter(article_revisions::slug.eq(slug))
-                    .order(article_revisions::sequence_number.desc())
-                    .select((
-                        article_revisions::article_id,
-                        article_revisions::revision,
-                        article_revisions::latest,
-                    ))
-                    .first::<ArticleRevisionStub>(&*conn)
-                    .optional()?
-                {
-                    None => SlugLookup::Miss,
-                    Some(ref stub) if stub.latest => SlugLookup::Hit {
-                        article_id: stub.article_id,
-                        revision: stub.revision,
-                    },
-                    Some(stub) => SlugLookup::Redirect(
-                        article_revisions::table
-                            .filter(article_revisions::latest.eq(true))
-                            .filter(article_revisions::article_id.eq(stub.article_id))
-                            .select(article_revisions::slug)
-                            .first::<String>(&*conn)?
-                    )
-                })
-            })
-        })
+        self.execute(move |state| state.lookup_slug(slug))
     }

     pub fn update_article(&self, article_id: i32, base_revision: i32, title: String, body: String, author: Option<String>)
         -> CpuFuture<models::ArticleRevision, Error>
     {
-        let connection_pool = self.connection_pool.clone();
-
-        self.cpu_pool.spawn_fn(move || {
-            if title.is_empty() {
-                Err("title cannot be empty")?;
-            }
-
-            let conn = connection_pool.get()?;
-
-            conn.transaction(|| {
-                use schema::article_revisions;
-
-                let (latest_revision, prev_title, prev_slug) = article_revisions::table
-                    .filter(article_revisions::article_id.eq(article_id))
-                    .order(article_revisions::revision.desc())
-                    .select((
-                        article_revisions::revision,
-                        article_revisions::title,
-                        article_revisions::slug,
-                    ))
-                    .first::<(i32, String, String)>(&*conn)?;
-
-                if latest_revision != base_revision {
-                    // TODO: If it is the same edit repeated, just respond OK
-                    // TODO: If there is a conflict, transform the edit to work seamlessly
-                    unimplemented!("TODO Missing handling of revision conflicts");
-                }
-                let new_revision = base_revision + 1;
-
-                let slug = decide_slug(&*conn, article_id, &prev_title, &title, Some(&prev_slug))?;
-
-                diesel::update(
-                    article_revisions::table
-                        .filter(article_revisions::article_id.eq(article_id))
-                        .filter(article_revisions::revision.eq(base_revision))
-                )
-                    .set(article_revisions::latest.eq(false))
-                    .execute(&*conn)?;
-
-                diesel::insert(&NewRevision {
-                    article_id,
-                    revision: new_revision,
-                    slug: &slug,
-                    title: &title,
-                    body: &body,
-                    author: author.as_ref().map(|x| &**x),
-                    latest: true,
-                })
-                    .into(article_revisions::table)
-                    .execute(&*conn)?;
-
-                Ok(article_revisions::table
-                    .filter(article_revisions::article_id.eq(article_id))
-                    .filter(article_revisions::revision.eq(new_revision))
-                    .first::<models::ArticleRevision>(&*conn)?
-                )
-            })
-        })
+        self.execute(move |state| state.update_article(article_id, base_revision, title, body, author))
     }

     pub fn create_article(&self, target_slug: Option<String>, title: String, body: String, author: Option<String>)
         -> CpuFuture<models::ArticleRevision, Error>
     {
-        let connection_pool = self.connection_pool.clone();
-
-        self.cpu_pool.spawn_fn(move || {
-            if title.is_empty() {
-                Err("title cannot be empty")?;
-            }
-
-            let conn = connection_pool.get()?;
-
-            conn.transaction(|| {
-                #[derive(Insertable)]
-                #[table_name="articles"]
-                struct NewArticle {
-                    id: Option<i32>
-                }
-
-                let article_id = {
-                    use diesel::expression::sql_literal::sql;
-                    // Diesel and SQLite are a bit in disagreement for how this should look:
-                    sql::<(diesel::types::Integer)>("INSERT INTO articles VALUES (null)")
-                        .execute(&*conn)?;
-                    sql::<(diesel::types::Integer)>("SELECT LAST_INSERT_ROWID()")
-                        .load::<i32>(&*conn)?
-                        .pop().expect("Statement must evaluate to an integer")
-                };
-
-                let slug = decide_slug(&*conn, article_id, "", &title, target_slug.as_ref().map(|x| &**x))?;
-
-                let new_revision = 1;
-
-                diesel::insert(&NewRevision {
-                    article_id,
-                    revision: new_revision,
-                    slug: &slug,
-                    title: &title,
-                    body: &body,
-                    author: author.as_ref().map(|x| &**x),
-                    latest: true,
-                })
-                    .into(article_revisions::table)
-                    .execute(&*conn)?;
-
-                Ok(article_revisions::table
-                    .filter(article_revisions::article_id.eq(article_id))
-                    .filter(article_revisions::revision.eq(new_revision))
-                    .first::<models::ArticleRevision>(&*conn)?
-                )
-            })
-        })
+        self.execute(move |state| state.create_article(target_slug, title, body, author))
     }

     pub fn search_query(&self, query_string: String, limit: i32, offset: i32, snippet_size: i32) -> CpuFuture<Vec<models::SearchResult>, Error> {
-        let connection_pool = self.connection_pool.clone();
-
-        self.cpu_pool.spawn_fn(move || {
-            use diesel::expression::sql_literal::sql;
-            use diesel::types::{Integer, Text};
-
-            fn fts_quote(src: &str) -> String {
-                format!("\"{}\"", src.replace('\"', "\"\""))
-            }
-
-            let words = query_string
-                .split_whitespace()
-                .map(fts_quote)
-                .collect::<Vec<_>>();
-
-            let query = if words.len() > 1 {
-                format!("NEAR({})", words.join(" "))
-            } else if words.len() == 1 {
-                format!("{}*", words[0])
-            } else {
-                "\"\"".to_owned()
-            };
-
-            Ok(
-                sql::<(Text, Text, Text)>(
-                    "SELECT title, snippet(article_search, 1, '', '', '\u{2026}', ?), slug \
-                    FROM article_search \
-                    WHERE article_search MATCH ? \
-                    ORDER BY rank \
-                    LIMIT ? OFFSET ?"
-                )
-                .bind::<Integer, _>(snippet_size)
-                .bind::<Text, _>(query)
-                .bind::<Integer, _>(limit)
-                .bind::<Integer, _>(offset)
-                .load(&*connection_pool.get()?)?)
-        })
+        self.execute(move |state| state.search_query(query_string, limit, offset, snippet_size))
     }
-
 }
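The public API is unchanged by the refactor: every method still returns a CpuFuture, which a caller drives like any futures 0.1 future. A hypothetical caller, blocking on the result with Future::wait; the function and its arguments are invented for illustration:

extern crate futures;

use futures::Future; // futures 0.1: brings .wait() into scope for CpuFuture

// Illustrative only; `state` is a State built from a real connection pool and CpuPool.
fn print_slug(state: &State, article_id: i32) -> Result<(), Error> {
    // The query runs on the CpuPool; .wait() blocks this thread until it finishes.
    let slug = state.get_article_slug(article_id).wait()?;
    println!("article {} -> {:?}", article_id, slug);
    Ok(())
}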