Browse Source

feat: post comments cache & use cache everywhere

master
hole-thu 3 years ago
parent
commit
c8b8ad0787
  1. 8
      src/api/attention.rs
  2. 26
      src/api/comment.rs
  3. 13
      src/api/mod.rs
  4. 25
      src/api/operation.rs
  5. 27
      src/api/post.rs
  6. 6
      src/api/search.rs
  7. 80
      src/cache.rs
  8. 174
      src/models.rs

8
src/api/attention.rs

@ -22,7 +22,7 @@ pub async fn attention_post(
rconn: RdsConn,
) -> API<Value> {
user.id.ok_or_else(|| APIError::PcError(NotAllowed))?;
let mut p = Post::get(&db, ai.pid).await?;
let mut p = Post::get(&db, &rconn, ai.pid).await?;
p.check_permission(&user, "r")?;
let mut att = Attention::init(&user.namehash, &rconn);
let switch_to = ai.switch == 1;
@ -35,8 +35,8 @@ pub async fn attention_post(
att.remove(ai.pid).await?;
delta = -1;
}
p = p.change_n_attentions(&db, delta).await?;
p = p.change_hot_score(&db, delta * 2).await?;
p.change_n_attentions(&db, delta).await?;
p.change_hot_score(&db, delta * 2).await?;
p.refresh_cache(&rconn, false).await;
}
@ -52,7 +52,7 @@ pub async fn attention_post(
#[get("/getattention")]
pub async fn get_attention(user: CurrentUser, db: Db, rconn: RdsConn) -> API<Value> {
let ids = Attention::init(&user.namehash, &rconn).all().await?;
let ps = Post::get_multi(&db, ids).await?;
let ps = Post::get_multi(&db, &rconn, &ids).await?;
let ps_data = ps2outputs(&ps, &user, &db, &rconn).await;
Ok(json!({

26
src/api/comment.rs

@ -5,7 +5,7 @@ use crate::rds_conn::RdsConn;
use crate::rds_models::*;
use chrono::{offset::Utc, DateTime};
use rocket::form::Form;
use rocket::futures::{future::TryFutureExt, try_join};
use rocket::futures::{future::TryFutureExt, join, try_join};
use rocket::serde::{
json::{json, Value},
Serialize,
@ -34,11 +34,7 @@ pub struct CommentOutput {
timestamp: i64,
}
pub fn c2output<'r>(
p: &'r Post,
cs: &Vec<Comment>,
user: &CurrentUser,
) -> Vec<CommentOutput> {
pub fn c2output<'r>(p: &'r Post, cs: &Vec<Comment>, user: &CurrentUser) -> Vec<CommentOutput> {
let mut hash2id = HashMap::<&String, i32>::from([(&p.author_hash, 0)]);
cs.iter()
.filter_map(|c| {
@ -71,12 +67,11 @@ pub fn c2output<'r>(
#[get("/getcomment?<pid>")]
pub async fn get_comment(pid: i32, user: CurrentUser, db: Db, rconn: RdsConn) -> API<Value> {
let p = Post::get(&db, pid).await?;
let p = Post::get(&db, &rconn, pid).await?;
if p.is_deleted {
return Err(APIError::PcError(IsDeleted));
}
let pid = p.id;
let cs = Comment::gets_by_post_id(&db, pid).await?;
let cs = p.get_comments(&db, &rconn).await?;
let data = c2output(&p, &cs, &user);
Ok(json!({
@ -96,7 +91,7 @@ pub async fn add_comment(
db: Db,
rconn: RdsConn,
) -> API<Value> {
let mut p = Post::get(&db, ci.pid).await?;
let mut p = Post::get(&db, &rconn, ci.pid).await?;
Comment::create(
&db,
NewComment {
@ -108,7 +103,7 @@ pub async fn add_comment(
},
)
.await?;
p = p.change_n_comments(&db, 1).await?;
p.change_n_comments(&db, 1).await?;
// auto attention after comment
let mut att = Attention::init(&user.namehash, &rconn);
@ -119,15 +114,18 @@ pub async fn add_comment(
try_join!(
att.add(p.id).err_into::<APIError>(),
async {
p = p.change_n_attentions(&db, 1).await?;
p.change_n_attentions(&db, 1).await?;
Ok::<(), APIError>(())
}
.err_into::<APIError>(),
)?;
}
p = p.change_hot_score(&db, hs_delta).await?;
p.refresh_cache(&rconn, false).await;
p.change_hot_score(&db, hs_delta).await?;
join!(
p.refresh_cache(&rconn, false),
p.clear_comments_cache(&rconn),
);
Ok(json!({
"code": 0

13
src/api/mod.rs

@ -40,7 +40,7 @@ impl<'r> FromRequest<'r> for CurrentUser {
} else {
let db = try_outcome!(request.guard::<Db>().await);
let rconn = try_outcome!(request.guard::<RdsConn>().await);
if let Some(user) = User::get_by_token_with_cache(&db, &rconn, token).await {
if let Some(user) = User::get_by_token(&db, &rconn, token).await {
let namehash = rh.hash_with_salt(&user.name);
cu = Some(CurrentUser {
id: Some(user.id),
@ -74,7 +74,6 @@ pub enum APIError {
impl<'r> Responder<'r, 'static> for APIError {
fn respond_to(self, req: &'r Request<'_>) -> response::Result<'static> {
dbg!(&self);
match self {
APIError::DbError(e) => json!({
"code": -1,
@ -120,7 +119,7 @@ pub trait UGC {
fn get_is_deleted(&self) -> bool;
fn get_is_reported(&self) -> bool;
fn extra_delete_condition(&self) -> bool;
async fn do_set_deleted(&self, db: &Db) -> API<usize>;
async fn do_set_deleted(&mut self, db: &Db) -> API<()>;
fn check_permission(&self, user: &CurrentUser, mode: &str) -> API<()> {
if user.is_admin {
return Ok(());
@ -140,10 +139,10 @@ pub trait UGC {
Ok(())
}
async fn soft_delete(&self, user: &CurrentUser, db: &Db) -> API<()> {
async fn soft_delete(&mut self, user: &CurrentUser, db: &Db) -> API<()> {
self.check_permission(user, "rwd")?;
let _ = self.do_set_deleted(db).await?;
self.do_set_deleted(db).await?;
Ok(())
}
}
@ -162,7 +161,7 @@ impl UGC for Post {
fn extra_delete_condition(&self) -> bool {
self.n_comments == 0
}
async fn do_set_deleted(&self, db: &Db) -> API<usize> {
async fn do_set_deleted(&mut self, db: &Db) -> API<()> {
self.set_deleted(db).await.map_err(From::from)
}
}
@ -181,7 +180,7 @@ impl UGC for Comment {
fn extra_delete_condition(&self) -> bool {
true
}
async fn do_set_deleted(&self, db: &Db) -> API<usize> {
async fn do_set_deleted(&mut self, db: &Db) -> API<()> {
self.set_deleted(db).await.map_err(From::from)
}
}

25
src/api/operation.rs

@ -1,7 +1,7 @@
use crate::api::{APIError, CurrentUser, PolicyError::*, API, UGC};
use crate::db_conn::Db;
use crate::rds_conn::RdsConn;
use crate::models::*;
use crate::rds_conn::RdsConn;
use rocket::form::Form;
use rocket::serde::json::{json, Value};
@ -14,23 +14,32 @@ pub struct DeleteInput {
}
#[post("/delete", data = "<di>")]
pub async fn delete(di: Form<DeleteInput>, user: CurrentUser, db: Db, rconn: RdsConn) -> API<Value> {
pub async fn delete(
di: Form<DeleteInput>,
user: CurrentUser,
db: Db,
rconn: RdsConn,
) -> API<Value> {
let mut p: Post;
match di.id_type.as_str() {
"cid" => {
let c = Comment::get(&db, di.id).await?;
let mut c = Comment::get(&db, di.id).await?;
c.soft_delete(&user, &db).await?;
let mut p = Post::get(&db, c.post_id).await?;
p = p.change_n_comments(&db, -1).await?;
p = p.change_hot_score(&db, -2).await?;
p.refresh_cache(&rconn, false).await;
p = Post::get(&db, &rconn, c.post_id).await?;
p.change_n_comments(&db, -1).await?;
p.change_hot_score(&db, -1).await?;
p.clear_comments_cache(&rconn).await;
}
"pid" => {
let p = Post::get(&db, di.id).await?;
p = Post::get(&db, &rconn, di.id).await?;
p.soft_delete(&user, &db).await?;
}
_ => return Err(APIError::PcError(NotAllowed)),
}
p.refresh_cache(&rconn, false).await;
Ok(json!({
"code": 0
}))

27
src/api/post.rs

@ -50,12 +50,7 @@ pub struct CwInput {
cw: String,
}
async fn p2output(
p: &Post,
user: &CurrentUser,
db: &Db,
rconn: &RdsConn,
) -> PostOutput {
async fn p2output(p: &Post, user: &CurrentUser, db: &Db, rconn: &RdsConn) -> PostOutput {
PostOutput {
pid: p.id,
text: format!("{}{}", if p.is_tmp { "[tmp]\n" } else { "" }, p.content),
@ -84,8 +79,7 @@ async fn p2output(
None
} else {
// 单个洞还有查询评论的接口,这里挂了不用报错
let pid = p.id;
if let Some(cs) = Comment::gets_by_post_id(db, pid).await.ok() {
if let Some(cs) = p.get_comments(db, rconn).await.ok() {
Some(c2output(p, &cs, user))
} else {
None
@ -96,7 +90,11 @@ async fn p2output(
.has(p.id)
.await
.unwrap_or_default(),
hot_score: if user.is_admin { Some(p.hot_score) } else { None },
hot_score: if user.is_admin {
Some(p.hot_score)
} else {
None
},
// for old version frontend
timestamp: p.create_time.timestamp(),
likenum: p.n_attentions,
@ -120,7 +118,7 @@ pub async fn ps2outputs(
#[get("/getone?<pid>")]
pub async fn get_one(pid: i32, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonAPI {
// let p = Post::get(&db, pid).await?;
let p = Post::get_with_cache(&db, &rconn, pid).await?;
let p = Post::get(&db, &rconn, pid).await?;
p.check_permission(&user, "ro")?;
Ok(json!({
"data": p2output(&p, &user,&db, &rconn).await,
@ -175,19 +173,20 @@ pub async fn publish_post(
}
#[post("/editcw", data = "<cwi>")]
pub async fn edit_cw(cwi: Form<CwInput>, user: CurrentUser, db: Db) -> JsonAPI {
let p = Post::get(&db, cwi.pid).await?;
pub async fn edit_cw(cwi: Form<CwInput>, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonAPI {
let mut p = Post::get(&db, &rconn, cwi.pid).await?;
if !(user.is_admin || p.author_hash == user.namehash) {
return Err(APIError::PcError(NotAllowed));
}
p.check_permission(&user, "w")?;
_ = p.update_cw(&db, cwi.cw.to_string()).await?;
p.update_cw(&db, cwi.cw.to_string()).await?;
p.refresh_cache(&rconn, false);
Ok(json!({"code": 0}))
}
#[get("/getmulti?<pids>")]
pub async fn get_multi(pids: Vec<i32>, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonAPI {
let ps = Post::get_multi_with_cache(&db, &rconn, &pids).await?;
let ps = Post::get_multi(&db, &rconn, &pids).await?;
let ps_data = ps2outputs(&ps, &user, &db, &rconn).await;
Ok(json!({

6
src/api/search.rs

@ -17,7 +17,10 @@ pub async fn search(
let page_size = 25;
let start = (page - 1) * page_size;
let kws = keywords.split(" ").filter(|x| !x.is_empty()).collect::<Vec<&str>>();
let kws = keywords
.split(" ")
.filter(|x| !x.is_empty())
.collect::<Vec<&str>>();
let ps = if kws.is_empty() {
vec![]
} else {
@ -30,7 +33,6 @@ pub async fn search(
)
.await?
};
let mark_kws = if search_mode == 1 {kws} else {vec![]};
let ps_data = ps2outputs(&ps, &user, &db, &rconn).await;
Ok(json!({
"data": ps_data,

80
src/cache.rs

@ -28,19 +28,12 @@ impl PostCache {
}
let kvs: Vec<(String, String)> = ps
.iter()
.map(|p| (
post_cache_key!(p.id),
serde_json::to_string(p).unwrap(),
) ).collect();
dbg!(&kvs);
let ret = self.rconn
.set_multiple(&kvs)
.await
.unwrap_or_else(|e| {
warn!("set post cache failed: {}", e);
"x".to_string()
});
dbg!(ret);
.map(|p| (post_cache_key!(p.id), serde_json::to_string(p).unwrap()))
.collect();
self.rconn.set_multiple(&kvs).await.unwrap_or_else(|e| {
warn!("set post cache failed: {}", e);
dbg!(&kvs);
});
}
pub async fn get(&mut self, pid: &i32) -> Option<Post> {
@ -100,6 +93,63 @@ impl PostCache {
}
}
pub struct PostCommentCache {
key: String,
rconn: RdsConn,
}
impl PostCommentCache {
pub fn init(pid: i32, rconn: &RdsConn) -> Self {
PostCommentCache {
key: format!("hole_v2:cache:post_comments:{}", pid),
rconn: rconn.clone(),
}
}
pub async fn set(&mut self, cs: &Vec<Comment>) {
self.rconn
.set_ex(
&self.key,
serde_json::to_string(cs).unwrap(),
INSTANCE_EXPIRE_TIME,
)
.await
.unwrap_or_else(|e| {
warn!("set comments cache failed: {}", e);
dbg!(cs);
})
}
pub async fn get(&mut self) -> Option<Vec<Comment>> {
let rds_result = self.rconn.get::<&String, String>(&self.key).await;
// dbg!(&rds_result);
if let Ok(s) = rds_result {
self.rconn
.expire::<&String, bool>(&self.key, INSTANCE_EXPIRE_TIME)
.await
.unwrap_or_else(|e| {
warn!(
"get comments cache, set new expire failed: {}, {}, {} ",
e, &self.key, &s
);
false
});
serde_json::from_str(&s).unwrap_or_else(|e| {
warn!("get comments cache, decode failed {}, {}", e, s);
None
})
} else {
None
}
}
pub async fn clear(&mut self) {
self.rconn.del(&self.key).await.unwrap_or_else(|e| {
warn!("clear commenrs cache fail, {}", e);
});
}
}
pub struct UserCache {
key: String,
rconn: RdsConn,
@ -122,14 +172,14 @@ impl UserCache {
)
.await
.unwrap_or_else(|e| {
warn!("set user cache failed: {}, {}, {}", e, u.id, u.name);
warn!("set user cache failed: {}", e);
dbg!(u);
})
}
pub async fn get(&mut self) -> Option<User> {
let rds_result = self.rconn.get::<&String, String>(&self.key).await;
if let Ok(s) = rds_result {
debug!("hint user cache");
self.rconn
.expire::<&String, bool>(&self.key, INSTANCE_EXPIRE_TIME)
.await

174
src/models.rs

@ -1,6 +1,6 @@
#![allow(clippy::all)]
use crate::cache::{PostCache, UserCache};
use crate::cache::{PostCache, PostCommentCache, UserCache};
use crate::db_conn::Db;
use crate::libs::diesel_logger::LoggingConnection;
use crate::rds_conn::RdsConn;
@ -17,9 +17,9 @@ use std::convert::identity;
no_arg_sql_function!(RANDOM, (), "Represents the sql RANDOM() function");
macro_rules! get {
macro_rules! _get {
($table:ident) => {
pub async fn get(db: &Db, id: i32) -> QueryResult<Self> {
async fn _get(db: &Db, id: i32) -> QueryResult<Self> {
let pid = id;
db.run(move |c| $table::table.find(pid).first(with_log!((c))))
.await
@ -27,9 +27,9 @@ macro_rules! get {
};
}
macro_rules! get_multi {
macro_rules! _get_multi {
($table:ident) => {
pub async fn get_multi(db: &Db, ids: Vec<i32>) -> QueryResult<Vec<Self>> {
async fn _get_multi(db: &Db, ids: Vec<i32>) -> QueryResult<Vec<Self>> {
if ids.is_empty() {
return Ok(vec![]);
}
@ -47,14 +47,16 @@ macro_rules! get_multi {
macro_rules! set_deleted {
($table:ident) => {
pub async fn set_deleted(&self, db: &Db) -> QueryResult<usize> {
let pid = self.id;
db.run(move |c| {
diesel::update($table::table.find(pid))
.set($table::is_deleted.eq(true))
.execute(with_log!(c))
})
.await
pub async fn set_deleted(&mut self, db: &Db) -> QueryResult<()> {
let id = self.id;
*self = db
.run(move |c| {
diesel::update($table::table.find(id))
.set($table::is_deleted.eq(true))
.get_result(with_log!(c))
})
.await?;
Ok(())
}
};
}
@ -73,7 +75,7 @@ macro_rules! with_log {
};
}
#[derive(Queryable, Insertable, Serialize, Deserialize)]
#[derive(Queryable, Insertable, Serialize, Deserialize, Debug)]
#[serde(crate = "rocket::serde")]
pub struct Comment {
pub id: i32,
@ -106,7 +108,7 @@ pub struct Post {
pub allow_search: bool,
}
#[derive(Queryable, Insertable, Serialize, Deserialize)]
#[derive(Queryable, Insertable, Serialize, Deserialize, Debug)]
#[serde(crate = "rocket::serde")]
pub struct User {
pub id: i32,
@ -129,17 +131,13 @@ pub struct NewPost {
}
impl Post {
get!(posts);
_get!(posts);
get_multi!(posts);
_get_multi!(posts);
set_deleted!(posts);
pub async fn get_multi_with_cache(
db: &Db,
rconn: &RdsConn,
ids: &Vec<i32>,
) -> QueryResult<Vec<Self>> {
pub async fn get_multi(db: &Db, rconn: &RdsConn, ids: &Vec<i32>) -> QueryResult<Vec<Self>> {
let mut cacher = PostCache::init(&rconn);
let mut cached_posts = cacher.gets(ids).await;
let mut id2po = HashMap::<i32, &mut Option<Post>>::new();
@ -153,17 +151,19 @@ impl Post {
None => {
id2po.insert(pid.clone(), p);
Some(pid)
},
}
_ => None,
})
.copied()
.collect();
dbg!(&missing_ids);
let missing_ps = Self::get_multi(db, missing_ids).await?;
// dbg!(&missing_ids);
let missing_ps = Self::_get_multi(db, missing_ids).await?;
// dbg!(&missing_ps);
cacher.sets(&missing_ps.iter().map(identity).collect()).await;
cacher
.sets(&missing_ps.iter().map(identity).collect())
.await;
for p in missing_ps.into_iter() {
if let Some(op) = id2po.get_mut(&p.id) {
@ -171,16 +171,43 @@ impl Post {
}
}
// dbg!(&cached_posts);
Ok(
cached_posts.into_iter().filter_map(identity).collect()
)
Ok(cached_posts
.into_iter()
.filter_map(|p| p.filter(|p| !p.is_deleted))
.collect())
}
pub async fn get(db: &Db, rconn: &RdsConn, id: i32) -> QueryResult<Self> {
let mut cacher = PostCache::init(&rconn);
if let Some(p) = cacher.get(&id).await {
if p.is_deleted {
return Err(diesel::result::Error::NotFound);
}
Ok(p)
} else {
let p = Self::_get(db, id).await?;
cacher.sets(&vec![&p]).await;
Ok(p)
}
}
pub async fn get_comments(
&self,
db: &Db,
rconn: &RdsConn,
) -> QueryResult<Vec<Comment>> {
let mut cacher = PostCommentCache::init(self.id, rconn);
if let Some(cs) = cacher.get().await {
Ok(cs)
} else {
let cs = Comment::gets_by_post_id(db, self.id).await?;
cacher.set(&cs).await;
Ok(cs)
}
}
pub async fn get_with_cache(db: &Db, rconn: &RdsConn, id: i32) -> QueryResult<Self> {
Self::get_multi_with_cache(db, rconn, &vec![id])
.await?
.pop()
.ok_or(diesel::result::Error::NotFound)
pub async fn clear_comments_cache(&self, rconn: &RdsConn) {
PostCommentCache::init(self.id, rconn).clear().await;
}
pub async fn gets_by_page(
@ -264,44 +291,52 @@ impl Post {
.await
}
pub async fn update_cw(&self, db: &Db, new_cw: String) -> QueryResult<usize> {
pub async fn update_cw(&mut self, db: &Db, new_cw: String) -> QueryResult<()> {
let pid = self.id;
db.run(move |c| {
diesel::update(posts::table.find(pid))
.set(posts::cw.eq(new_cw))
.execute(with_log!(c))
})
.await
*self = db
.run(move |c| {
diesel::update(posts::table.find(pid))
.set(posts::cw.eq(new_cw))
.get_result(with_log!(c))
})
.await?;
Ok(())
}
pub async fn change_n_comments(&self, db: &Db, delta: i32) -> QueryResult<Self> {
pub async fn change_n_comments(&mut self, db: &Db, delta: i32) -> QueryResult<()> {
let pid = self.id;
db.run(move |c| {
diesel::update(posts::table.find(pid))
.set(posts::n_comments.eq(posts::n_comments + delta))
.get_result(with_log!(c))
})
.await
*self = db
.run(move |c| {
diesel::update(posts::table.find(pid))
.set(posts::n_comments.eq(posts::n_comments + delta))
.get_result(with_log!(c))
})
.await?;
Ok(())
}
pub async fn change_n_attentions(&self, db: &Db, delta: i32) -> QueryResult<Self> {
pub async fn change_n_attentions(&mut self, db: &Db, delta: i32) -> QueryResult<()> {
let pid = self.id;
db.run(move |c| {
diesel::update(posts::table.find(pid))
.set(posts::n_attentions.eq(posts::n_attentions + delta))
.get_result(with_log!(c))
})
.await
*self = db
.run(move |c| {
diesel::update(posts::table.find(pid))
.set(posts::n_attentions.eq(posts::n_attentions + delta))
.get_result(with_log!(c))
})
.await?;
Ok(())
}
pub async fn change_hot_score(&self, db: &Db, delta: i32) -> QueryResult<Self> {
pub async fn change_hot_score(&mut self, db: &Db, delta: i32) -> QueryResult<()> {
let pid = self.id;
db.run(move |c| {
diesel::update(posts::table.find(pid))
.set(posts::hot_score.eq(posts::hot_score + delta))
.get_result(with_log!(c))
})
.await
*self = db
.run(move |c| {
diesel::update(posts::table.find(pid))
.set(posts::hot_score.eq(posts::hot_score + delta))
.get_result(with_log!(c))
})
.await?;
Ok(())
}
pub async fn set_instance_cache(&self, rconn: &RdsConn) {
@ -313,7 +348,7 @@ impl Post {
}
impl User {
pub async fn get_by_token(db: &Db, token: &str) -> Option<Self> {
async fn _get_by_token(db: &Db, token: &str) -> Option<Self> {
let token = token.to_string();
db.run(move |c| {
users::table
@ -324,12 +359,12 @@ impl User {
.ok()
}
pub async fn get_by_token_with_cache(db: &Db, rconn: &RdsConn, token: &str) -> Option<Self> {
pub async fn get_by_token(db: &Db, rconn: &RdsConn, token: &str) -> Option<Self> {
let mut cacher = UserCache::init(token, &rconn);
if let Some(u) = cacher.get().await {
Some(u)
} else {
let u = Self::get_by_token(db, token).await?;
let u = Self::_get_by_token(db, token).await?;
cacher.set(&u).await;
Some(u)
}
@ -347,10 +382,15 @@ pub struct NewComment {
}
impl Comment {
get!(comments);
_get!(comments);
set_deleted!(comments);
pub async fn get(db: &Db, id: i32) -> QueryResult<Self> {
// no cache for single comment
Self::_get(db, id).await
}
pub async fn create(db: &Db, new_comment: NewComment) -> QueryResult<Self> {
db.run(move |c| {
insert_into(comments::table)

Loading…
Cancel
Save