diff --git a/src/api/attention.rs b/src/api/attention.rs index 7e2a1c0..75a99b9 100644 --- a/src/api/attention.rs +++ b/src/api/attention.rs @@ -22,7 +22,7 @@ pub async fn attention_post( rconn: RdsConn, ) -> API { user.id.ok_or_else(|| APIError::PcError(NotAllowed))?; - let mut p = Post::get(&db, ai.pid).await?; + let mut p = Post::get(&db, &rconn, ai.pid).await?; p.check_permission(&user, "r")?; let mut att = Attention::init(&user.namehash, &rconn); let switch_to = ai.switch == 1; @@ -35,8 +35,8 @@ pub async fn attention_post( att.remove(ai.pid).await?; delta = -1; } - p = p.change_n_attentions(&db, delta).await?; - p = p.change_hot_score(&db, delta * 2).await?; + p.change_n_attentions(&db, delta).await?; + p.change_hot_score(&db, delta * 2).await?; p.refresh_cache(&rconn, false).await; } @@ -52,7 +52,7 @@ pub async fn attention_post( #[get("/getattention")] pub async fn get_attention(user: CurrentUser, db: Db, rconn: RdsConn) -> API { let ids = Attention::init(&user.namehash, &rconn).all().await?; - let ps = Post::get_multi(&db, ids).await?; + let ps = Post::get_multi(&db, &rconn, &ids).await?; let ps_data = ps2outputs(&ps, &user, &db, &rconn).await; Ok(json!({ diff --git a/src/api/comment.rs b/src/api/comment.rs index 810f202..bc6bb91 100644 --- a/src/api/comment.rs +++ b/src/api/comment.rs @@ -5,7 +5,7 @@ use crate::rds_conn::RdsConn; use crate::rds_models::*; use chrono::{offset::Utc, DateTime}; use rocket::form::Form; -use rocket::futures::{future::TryFutureExt, try_join}; +use rocket::futures::{future::TryFutureExt, join, try_join}; use rocket::serde::{ json::{json, Value}, Serialize, @@ -34,11 +34,7 @@ pub struct CommentOutput { timestamp: i64, } -pub fn c2output<'r>( - p: &'r Post, - cs: &Vec, - user: &CurrentUser, -) -> Vec { +pub fn c2output<'r>(p: &'r Post, cs: &Vec, user: &CurrentUser) -> Vec { let mut hash2id = HashMap::<&String, i32>::from([(&p.author_hash, 0)]); cs.iter() .filter_map(|c| { @@ -71,12 +67,11 @@ pub fn c2output<'r>( #[get("/getcomment?")] pub async fn get_comment(pid: i32, user: CurrentUser, db: Db, rconn: RdsConn) -> API { - let p = Post::get(&db, pid).await?; + let p = Post::get(&db, &rconn, pid).await?; if p.is_deleted { return Err(APIError::PcError(IsDeleted)); } - let pid = p.id; - let cs = Comment::gets_by_post_id(&db, pid).await?; + let cs = p.get_comments(&db, &rconn).await?; let data = c2output(&p, &cs, &user); Ok(json!({ @@ -96,7 +91,7 @@ pub async fn add_comment( db: Db, rconn: RdsConn, ) -> API { - let mut p = Post::get(&db, ci.pid).await?; + let mut p = Post::get(&db, &rconn, ci.pid).await?; Comment::create( &db, NewComment { @@ -108,7 +103,7 @@ pub async fn add_comment( }, ) .await?; - p = p.change_n_comments(&db, 1).await?; + p.change_n_comments(&db, 1).await?; // auto attention after comment let mut att = Attention::init(&user.namehash, &rconn); @@ -119,15 +114,18 @@ pub async fn add_comment( try_join!( att.add(p.id).err_into::(), async { - p = p.change_n_attentions(&db, 1).await?; + p.change_n_attentions(&db, 1).await?; Ok::<(), APIError>(()) } .err_into::(), )?; } - p = p.change_hot_score(&db, hs_delta).await?; - p.refresh_cache(&rconn, false).await; + p.change_hot_score(&db, hs_delta).await?; + join!( + p.refresh_cache(&rconn, false), + p.clear_comments_cache(&rconn), + ); Ok(json!({ "code": 0 diff --git a/src/api/mod.rs b/src/api/mod.rs index e90bef2..ffb10e6 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -40,7 +40,7 @@ impl<'r> FromRequest<'r> for CurrentUser { } else { let db = try_outcome!(request.guard::().await); let rconn = try_outcome!(request.guard::().await); - if let Some(user) = User::get_by_token_with_cache(&db, &rconn, token).await { + if let Some(user) = User::get_by_token(&db, &rconn, token).await { let namehash = rh.hash_with_salt(&user.name); cu = Some(CurrentUser { id: Some(user.id), @@ -74,7 +74,6 @@ pub enum APIError { impl<'r> Responder<'r, 'static> for APIError { fn respond_to(self, req: &'r Request<'_>) -> response::Result<'static> { - dbg!(&self); match self { APIError::DbError(e) => json!({ "code": -1, @@ -120,7 +119,7 @@ pub trait UGC { fn get_is_deleted(&self) -> bool; fn get_is_reported(&self) -> bool; fn extra_delete_condition(&self) -> bool; - async fn do_set_deleted(&self, db: &Db) -> API; + async fn do_set_deleted(&mut self, db: &Db) -> API<()>; fn check_permission(&self, user: &CurrentUser, mode: &str) -> API<()> { if user.is_admin { return Ok(()); @@ -140,10 +139,10 @@ pub trait UGC { Ok(()) } - async fn soft_delete(&self, user: &CurrentUser, db: &Db) -> API<()> { + async fn soft_delete(&mut self, user: &CurrentUser, db: &Db) -> API<()> { self.check_permission(user, "rwd")?; - let _ = self.do_set_deleted(db).await?; + self.do_set_deleted(db).await?; Ok(()) } } @@ -162,7 +161,7 @@ impl UGC for Post { fn extra_delete_condition(&self) -> bool { self.n_comments == 0 } - async fn do_set_deleted(&self, db: &Db) -> API { + async fn do_set_deleted(&mut self, db: &Db) -> API<()> { self.set_deleted(db).await.map_err(From::from) } } @@ -181,7 +180,7 @@ impl UGC for Comment { fn extra_delete_condition(&self) -> bool { true } - async fn do_set_deleted(&self, db: &Db) -> API { + async fn do_set_deleted(&mut self, db: &Db) -> API<()> { self.set_deleted(db).await.map_err(From::from) } } diff --git a/src/api/operation.rs b/src/api/operation.rs index 6b5258b..16056e6 100644 --- a/src/api/operation.rs +++ b/src/api/operation.rs @@ -1,7 +1,7 @@ use crate::api::{APIError, CurrentUser, PolicyError::*, API, UGC}; use crate::db_conn::Db; -use crate::rds_conn::RdsConn; use crate::models::*; +use crate::rds_conn::RdsConn; use rocket::form::Form; use rocket::serde::json::{json, Value}; @@ -14,23 +14,32 @@ pub struct DeleteInput { } #[post("/delete", data = "")] -pub async fn delete(di: Form, user: CurrentUser, db: Db, rconn: RdsConn) -> API { +pub async fn delete( + di: Form, + user: CurrentUser, + db: Db, + rconn: RdsConn, +) -> API { + let mut p: Post; match di.id_type.as_str() { "cid" => { - let c = Comment::get(&db, di.id).await?; + let mut c = Comment::get(&db, di.id).await?; c.soft_delete(&user, &db).await?; - let mut p = Post::get(&db, c.post_id).await?; - p = p.change_n_comments(&db, -1).await?; - p = p.change_hot_score(&db, -2).await?; - p.refresh_cache(&rconn, false).await; + p = Post::get(&db, &rconn, c.post_id).await?; + p.change_n_comments(&db, -1).await?; + p.change_hot_score(&db, -1).await?; + + p.clear_comments_cache(&rconn).await; } "pid" => { - let p = Post::get(&db, di.id).await?; + p = Post::get(&db, &rconn, di.id).await?; p.soft_delete(&user, &db).await?; } _ => return Err(APIError::PcError(NotAllowed)), } + p.refresh_cache(&rconn, false).await; + Ok(json!({ "code": 0 })) diff --git a/src/api/post.rs b/src/api/post.rs index a7e09c3..d11cf08 100644 --- a/src/api/post.rs +++ b/src/api/post.rs @@ -50,12 +50,7 @@ pub struct CwInput { cw: String, } -async fn p2output( - p: &Post, - user: &CurrentUser, - db: &Db, - rconn: &RdsConn, -) -> PostOutput { +async fn p2output(p: &Post, user: &CurrentUser, db: &Db, rconn: &RdsConn) -> PostOutput { PostOutput { pid: p.id, text: format!("{}{}", if p.is_tmp { "[tmp]\n" } else { "" }, p.content), @@ -84,8 +79,7 @@ async fn p2output( None } else { // 单个洞还有查询评论的接口,这里挂了不用报错 - let pid = p.id; - if let Some(cs) = Comment::gets_by_post_id(db, pid).await.ok() { + if let Some(cs) = p.get_comments(db, rconn).await.ok() { Some(c2output(p, &cs, user)) } else { None @@ -96,7 +90,11 @@ async fn p2output( .has(p.id) .await .unwrap_or_default(), - hot_score: if user.is_admin { Some(p.hot_score) } else { None }, + hot_score: if user.is_admin { + Some(p.hot_score) + } else { + None + }, // for old version frontend timestamp: p.create_time.timestamp(), likenum: p.n_attentions, @@ -120,7 +118,7 @@ pub async fn ps2outputs( #[get("/getone?")] pub async fn get_one(pid: i32, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonAPI { // let p = Post::get(&db, pid).await?; - let p = Post::get_with_cache(&db, &rconn, pid).await?; + let p = Post::get(&db, &rconn, pid).await?; p.check_permission(&user, "ro")?; Ok(json!({ "data": p2output(&p, &user,&db, &rconn).await, @@ -175,19 +173,20 @@ pub async fn publish_post( } #[post("/editcw", data = "")] -pub async fn edit_cw(cwi: Form, user: CurrentUser, db: Db) -> JsonAPI { - let p = Post::get(&db, cwi.pid).await?; +pub async fn edit_cw(cwi: Form, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonAPI { + let mut p = Post::get(&db, &rconn, cwi.pid).await?; if !(user.is_admin || p.author_hash == user.namehash) { return Err(APIError::PcError(NotAllowed)); } p.check_permission(&user, "w")?; - _ = p.update_cw(&db, cwi.cw.to_string()).await?; + p.update_cw(&db, cwi.cw.to_string()).await?; + p.refresh_cache(&rconn, false); Ok(json!({"code": 0})) } #[get("/getmulti?")] pub async fn get_multi(pids: Vec, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonAPI { - let ps = Post::get_multi_with_cache(&db, &rconn, &pids).await?; + let ps = Post::get_multi(&db, &rconn, &pids).await?; let ps_data = ps2outputs(&ps, &user, &db, &rconn).await; Ok(json!({ diff --git a/src/api/search.rs b/src/api/search.rs index 009da81..1641dd5 100644 --- a/src/api/search.rs +++ b/src/api/search.rs @@ -17,7 +17,10 @@ pub async fn search( let page_size = 25; let start = (page - 1) * page_size; - let kws = keywords.split(" ").filter(|x| !x.is_empty()).collect::>(); + let kws = keywords + .split(" ") + .filter(|x| !x.is_empty()) + .collect::>(); let ps = if kws.is_empty() { vec![] } else { @@ -30,7 +33,6 @@ pub async fn search( ) .await? }; - let mark_kws = if search_mode == 1 {kws} else {vec![]}; let ps_data = ps2outputs(&ps, &user, &db, &rconn).await; Ok(json!({ "data": ps_data, diff --git a/src/cache.rs b/src/cache.rs index 08bfcc5..1845699 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -28,19 +28,12 @@ impl PostCache { } let kvs: Vec<(String, String)> = ps .iter() - .map(|p| ( - post_cache_key!(p.id), - serde_json::to_string(p).unwrap(), - ) ).collect(); - dbg!(&kvs); - let ret = self.rconn - .set_multiple(&kvs) - .await - .unwrap_or_else(|e| { - warn!("set post cache failed: {}", e); - "x".to_string() - }); - dbg!(ret); + .map(|p| (post_cache_key!(p.id), serde_json::to_string(p).unwrap())) + .collect(); + self.rconn.set_multiple(&kvs).await.unwrap_or_else(|e| { + warn!("set post cache failed: {}", e); + dbg!(&kvs); + }); } pub async fn get(&mut self, pid: &i32) -> Option { @@ -100,6 +93,63 @@ impl PostCache { } } +pub struct PostCommentCache { + key: String, + rconn: RdsConn, +} + +impl PostCommentCache { + pub fn init(pid: i32, rconn: &RdsConn) -> Self { + PostCommentCache { + key: format!("hole_v2:cache:post_comments:{}", pid), + rconn: rconn.clone(), + } + } + + pub async fn set(&mut self, cs: &Vec) { + self.rconn + .set_ex( + &self.key, + serde_json::to_string(cs).unwrap(), + INSTANCE_EXPIRE_TIME, + ) + .await + .unwrap_or_else(|e| { + warn!("set comments cache failed: {}", e); + dbg!(cs); + }) + } + + pub async fn get(&mut self) -> Option> { + let rds_result = self.rconn.get::<&String, String>(&self.key).await; + // dbg!(&rds_result); + if let Ok(s) = rds_result { + self.rconn + .expire::<&String, bool>(&self.key, INSTANCE_EXPIRE_TIME) + .await + .unwrap_or_else(|e| { + warn!( + "get comments cache, set new expire failed: {}, {}, {} ", + e, &self.key, &s + ); + false + }); + serde_json::from_str(&s).unwrap_or_else(|e| { + warn!("get comments cache, decode failed {}, {}", e, s); + None + }) + } else { + None + } + } + + pub async fn clear(&mut self) { + self.rconn.del(&self.key).await.unwrap_or_else(|e| { + warn!("clear commenrs cache fail, {}", e); + }); + } +} + pub struct UserCache { key: String, rconn: RdsConn, @@ -122,14 +172,14 @@ impl UserCache { ) .await .unwrap_or_else(|e| { - warn!("set user cache failed: {}, {}, {}", e, u.id, u.name); + warn!("set user cache failed: {}", e); + dbg!(u); }) } pub async fn get(&mut self) -> Option { let rds_result = self.rconn.get::<&String, String>(&self.key).await; if let Ok(s) = rds_result { - debug!("hint user cache"); self.rconn .expire::<&String, bool>(&self.key, INSTANCE_EXPIRE_TIME) .await diff --git a/src/models.rs b/src/models.rs index 215b169..3e6f226 100644 --- a/src/models.rs +++ b/src/models.rs @@ -1,6 +1,6 @@ #![allow(clippy::all)] -use crate::cache::{PostCache, UserCache}; +use crate::cache::{PostCache, PostCommentCache, UserCache}; use crate::db_conn::Db; use crate::libs::diesel_logger::LoggingConnection; use crate::rds_conn::RdsConn; @@ -17,9 +17,9 @@ use std::convert::identity; no_arg_sql_function!(RANDOM, (), "Represents the sql RANDOM() function"); -macro_rules! get { +macro_rules! _get { ($table:ident) => { - pub async fn get(db: &Db, id: i32) -> QueryResult { + async fn _get(db: &Db, id: i32) -> QueryResult { let pid = id; db.run(move |c| $table::table.find(pid).first(with_log!((c)))) .await @@ -27,9 +27,9 @@ macro_rules! get { }; } -macro_rules! get_multi { +macro_rules! _get_multi { ($table:ident) => { - pub async fn get_multi(db: &Db, ids: Vec) -> QueryResult> { + async fn _get_multi(db: &Db, ids: Vec) -> QueryResult> { if ids.is_empty() { return Ok(vec![]); } @@ -47,14 +47,16 @@ macro_rules! get_multi { macro_rules! set_deleted { ($table:ident) => { - pub async fn set_deleted(&self, db: &Db) -> QueryResult { - let pid = self.id; - db.run(move |c| { - diesel::update($table::table.find(pid)) - .set($table::is_deleted.eq(true)) - .execute(with_log!(c)) - }) - .await + pub async fn set_deleted(&mut self, db: &Db) -> QueryResult<()> { + let id = self.id; + *self = db + .run(move |c| { + diesel::update($table::table.find(id)) + .set($table::is_deleted.eq(true)) + .get_result(with_log!(c)) + }) + .await?; + Ok(()) } }; } @@ -73,7 +75,7 @@ macro_rules! with_log { }; } -#[derive(Queryable, Insertable, Serialize, Deserialize)] +#[derive(Queryable, Insertable, Serialize, Deserialize, Debug)] #[serde(crate = "rocket::serde")] pub struct Comment { pub id: i32, @@ -106,7 +108,7 @@ pub struct Post { pub allow_search: bool, } -#[derive(Queryable, Insertable, Serialize, Deserialize)] +#[derive(Queryable, Insertable, Serialize, Deserialize, Debug)] #[serde(crate = "rocket::serde")] pub struct User { pub id: i32, @@ -129,17 +131,13 @@ pub struct NewPost { } impl Post { - get!(posts); + _get!(posts); - get_multi!(posts); + _get_multi!(posts); set_deleted!(posts); - pub async fn get_multi_with_cache( - db: &Db, - rconn: &RdsConn, - ids: &Vec, - ) -> QueryResult> { + pub async fn get_multi(db: &Db, rconn: &RdsConn, ids: &Vec) -> QueryResult> { let mut cacher = PostCache::init(&rconn); let mut cached_posts = cacher.gets(ids).await; let mut id2po = HashMap::>::new(); @@ -153,17 +151,19 @@ impl Post { None => { id2po.insert(pid.clone(), p); Some(pid) - }, + } _ => None, }) .copied() .collect(); - dbg!(&missing_ids); - let missing_ps = Self::get_multi(db, missing_ids).await?; + // dbg!(&missing_ids); + let missing_ps = Self::_get_multi(db, missing_ids).await?; // dbg!(&missing_ps); - - cacher.sets(&missing_ps.iter().map(identity).collect()).await; + + cacher + .sets(&missing_ps.iter().map(identity).collect()) + .await; for p in missing_ps.into_iter() { if let Some(op) = id2po.get_mut(&p.id) { @@ -171,16 +171,43 @@ impl Post { } } // dbg!(&cached_posts); - Ok( - cached_posts.into_iter().filter_map(identity).collect() - ) + Ok(cached_posts + .into_iter() + .filter_map(|p| p.filter(|p| !p.is_deleted)) + .collect()) + } + + pub async fn get(db: &Db, rconn: &RdsConn, id: i32) -> QueryResult { + let mut cacher = PostCache::init(&rconn); + if let Some(p) = cacher.get(&id).await { + if p.is_deleted { + return Err(diesel::result::Error::NotFound); + } + Ok(p) + } else { + let p = Self::_get(db, id).await?; + cacher.sets(&vec![&p]).await; + Ok(p) + } + } + + pub async fn get_comments( + &self, + db: &Db, + rconn: &RdsConn, + ) -> QueryResult> { + let mut cacher = PostCommentCache::init(self.id, rconn); + if let Some(cs) = cacher.get().await { + Ok(cs) + } else { + let cs = Comment::gets_by_post_id(db, self.id).await?; + cacher.set(&cs).await; + Ok(cs) + } } - pub async fn get_with_cache(db: &Db, rconn: &RdsConn, id: i32) -> QueryResult { - Self::get_multi_with_cache(db, rconn, &vec![id]) - .await? - .pop() - .ok_or(diesel::result::Error::NotFound) + pub async fn clear_comments_cache(&self, rconn: &RdsConn) { + PostCommentCache::init(self.id, rconn).clear().await; } pub async fn gets_by_page( @@ -264,44 +291,52 @@ impl Post { .await } - pub async fn update_cw(&self, db: &Db, new_cw: String) -> QueryResult { + pub async fn update_cw(&mut self, db: &Db, new_cw: String) -> QueryResult<()> { let pid = self.id; - db.run(move |c| { - diesel::update(posts::table.find(pid)) - .set(posts::cw.eq(new_cw)) - .execute(with_log!(c)) - }) - .await + *self = db + .run(move |c| { + diesel::update(posts::table.find(pid)) + .set(posts::cw.eq(new_cw)) + .get_result(with_log!(c)) + }) + .await?; + Ok(()) } - pub async fn change_n_comments(&self, db: &Db, delta: i32) -> QueryResult { + pub async fn change_n_comments(&mut self, db: &Db, delta: i32) -> QueryResult<()> { let pid = self.id; - db.run(move |c| { - diesel::update(posts::table.find(pid)) - .set(posts::n_comments.eq(posts::n_comments + delta)) - .get_result(with_log!(c)) - }) - .await + *self = db + .run(move |c| { + diesel::update(posts::table.find(pid)) + .set(posts::n_comments.eq(posts::n_comments + delta)) + .get_result(with_log!(c)) + }) + .await?; + Ok(()) } - pub async fn change_n_attentions(&self, db: &Db, delta: i32) -> QueryResult { + pub async fn change_n_attentions(&mut self, db: &Db, delta: i32) -> QueryResult<()> { let pid = self.id; - db.run(move |c| { - diesel::update(posts::table.find(pid)) - .set(posts::n_attentions.eq(posts::n_attentions + delta)) - .get_result(with_log!(c)) - }) - .await + *self = db + .run(move |c| { + diesel::update(posts::table.find(pid)) + .set(posts::n_attentions.eq(posts::n_attentions + delta)) + .get_result(with_log!(c)) + }) + .await?; + Ok(()) } - pub async fn change_hot_score(&self, db: &Db, delta: i32) -> QueryResult { + pub async fn change_hot_score(&mut self, db: &Db, delta: i32) -> QueryResult<()> { let pid = self.id; - db.run(move |c| { - diesel::update(posts::table.find(pid)) - .set(posts::hot_score.eq(posts::hot_score + delta)) - .get_result(with_log!(c)) - }) - .await + *self = db + .run(move |c| { + diesel::update(posts::table.find(pid)) + .set(posts::hot_score.eq(posts::hot_score + delta)) + .get_result(with_log!(c)) + }) + .await?; + Ok(()) } pub async fn set_instance_cache(&self, rconn: &RdsConn) { @@ -313,7 +348,7 @@ impl Post { } impl User { - pub async fn get_by_token(db: &Db, token: &str) -> Option { + async fn _get_by_token(db: &Db, token: &str) -> Option { let token = token.to_string(); db.run(move |c| { users::table @@ -324,12 +359,12 @@ impl User { .ok() } - pub async fn get_by_token_with_cache(db: &Db, rconn: &RdsConn, token: &str) -> Option { + pub async fn get_by_token(db: &Db, rconn: &RdsConn, token: &str) -> Option { let mut cacher = UserCache::init(token, &rconn); if let Some(u) = cacher.get().await { Some(u) } else { - let u = Self::get_by_token(db, token).await?; + let u = Self::_get_by_token(db, token).await?; cacher.set(&u).await; Some(u) } @@ -347,10 +382,15 @@ pub struct NewComment { } impl Comment { - get!(comments); + _get!(comments); set_deleted!(comments); + pub async fn get(db: &Db, id: i32) -> QueryResult { + // no cache for single comment + Self::_get(db, id).await + } + pub async fn create(db: &Db, new_comment: NewComment) -> QueryResult { db.run(move |c| { insert_into(comments::table)