From 38bacc1ee04a0cbc9c1bef5155b402669158ec54 Mon Sep 17 00:00:00 2001 From: hole-thu Date: Fri, 25 Mar 2022 03:00:38 +0800 Subject: [PATCH] feat: cache for posts list --- src/api/comment.rs | 3 +- src/api/operation.rs | 3 +- src/api/post.rs | 8 ++- src/api/search.rs | 1 + src/api/systemlog.rs | 2 +- src/cache.rs | 108 +++++++++++++++++++++++++++++++++++ src/models.rs | 133 +++++++++++++++++++++++++++++-------------- 7 files changed, 209 insertions(+), 49 deletions(-) diff --git a/src/api/comment.rs b/src/api/comment.rs index bc6bb91..bfca314 100644 --- a/src/api/comment.rs +++ b/src/api/comment.rs @@ -92,7 +92,7 @@ pub async fn add_comment( rconn: RdsConn, ) -> API { let mut p = Post::get(&db, &rconn, ci.pid).await?; - Comment::create( + let c = Comment::create( &db, NewComment { content: ci.text.to_string(), @@ -104,6 +104,7 @@ pub async fn add_comment( ) .await?; p.change_n_comments(&db, 1).await?; + p.update_comment_time(&db, c.create_time).await?; // auto attention after comment let mut att = Attention::init(&user.namehash, &rconn); diff --git a/src/api/operation.rs b/src/api/operation.rs index 16056e6..9d0aacf 100644 --- a/src/api/operation.rs +++ b/src/api/operation.rs @@ -38,7 +38,8 @@ pub async fn delete( _ => return Err(APIError::PcError(NotAllowed)), } - p.refresh_cache(&rconn, false).await; + // 如果是删除,需要也从0号缓存队列中去掉 + p.refresh_cache(&rconn, true).await; Ok(json!({ "code": 0 diff --git a/src/api/post.rs b/src/api/post.rs index e0d91b4..eaff116 100644 --- a/src/api/post.rs +++ b/src/api/post.rs @@ -67,7 +67,10 @@ async fn p2output(p: &Post, user: &CurrentUser, db: &Db, rconn: &RdsConn) -> Pos None } else { // 单个洞还有查询评论的接口,这里挂了不用报错 - p.get_comments(db, rconn).await.ok().map(|cs| c2output(p, &cs, user)) + p.get_comments(db, rconn) + .await + .ok() + .map(|cs| c2output(p, &cs, user)) }, can_del: p.check_permission(user, "wd").is_ok(), attention: Attention::init(&user.namehash, &rconn) @@ -117,7 +120,7 @@ pub async fn get_list( let page = p.unwrap_or(1); let page_size = 25; let start = (page - 1) * page_size; - let ps = Post::gets_by_page(&db, order_mode, start.into(), page_size.into()).await?; + let ps = Post::gets_by_page(&db, &rconn, order_mode, start.into(), page_size.into()).await?; let ps_data = ps2outputs(&ps, &user, &db, &rconn).await; Ok(json!({ "data": ps_data, @@ -147,6 +150,7 @@ pub async fn publish_post( ) .await?; Attention::init(&user.namehash, &rconn).add(p.id).await?; + p.refresh_cache(&rconn, true).await; Ok(json!({ "code": 0 })) diff --git a/src/api/search.rs b/src/api/search.rs index 1641dd5..e5bf73b 100644 --- a/src/api/search.rs +++ b/src/api/search.rs @@ -26,6 +26,7 @@ pub async fn search( } else { Post::search( &db, + &rconn, search_mode, keywords.to_string(), start.into(), diff --git a/src/api/systemlog.rs b/src/api/systemlog.rs index a909a89..fd9df47 100644 --- a/src/api/systemlog.rs +++ b/src/api/systemlog.rs @@ -1,8 +1,8 @@ use crate::api::{CurrentUser, API}; +use crate::db_conn::Db; use crate::random_hasher::RandomHasher; use rocket::serde::json::{json, Value}; use rocket::State; -use crate::db_conn::Db; #[get("/systemlog")] pub async fn get_systemlog(user: CurrentUser, rh: &State, db: Db) -> API { diff --git a/src/cache.rs b/src/cache.rs index 1845699..87dd76c 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -1,10 +1,15 @@ use crate::models::{Comment, Post, User}; use crate::rds_conn::RdsConn; +use rand::Rng; use redis::AsyncCommands; use rocket::serde::json::serde_json; // can use rocket::serde::json::to_string in master version const INSTANCE_EXPIRE_TIME: usize = 60 * 60; + +const MIN_LENGTH: isize = 200; +const MAX_LENGTH: isize = 900; + macro_rules! post_cache_key { ($id: expr) => { format!("hole_v2:cache:post:{}", $id) @@ -150,6 +155,109 @@ impl PostCommentCache { } } +pub struct PostListCommentCache { + key: String, + mode: u8, + rconn: RdsConn, + length: isize, +} + +impl PostListCommentCache { + pub async fn init(mode: u8, rconn: &RdsConn) -> Self { + let mut cacher = PostListCommentCache { + key: format!("hole_v2:cache:post_list:{}", &mode), + mode: mode, + rconn: rconn.clone(), + length: 0, + }; + cacher.set_and_check_length().await; + cacher + } + + async fn set_and_check_length(&mut self) { + let mut l = self.rconn.zcard(&self.key).await.unwrap(); + if l > MAX_LENGTH { + self.rconn + .zremrangebyrank::<&String, ()>(&self.key, MIN_LENGTH, -1) + .await + .unwrap_or_else(|e| { + warn!("cut list cache failed, {}, {}", e, &self.key); + }); + l = MIN_LENGTH; + } + self.length = l; + } + + pub fn need_fill(&self) -> bool { + self.length < MIN_LENGTH + } + + pub fn i64_len(&self) -> i64 { + self.length.try_into().unwrap() + } + + pub fn i64_minlen(&self) -> i64 { + MIN_LENGTH.try_into().unwrap() + } + + fn p2pair(&self, p: &Post) -> (i64, i32) { + ( + match self.mode { + 0 => (-p.id).into(), + 1 => -p.last_comment_time.timestamp(), + 2 => (-p.hot_score).into(), + 3 => rand::thread_rng().gen_range(0..i64::MAX), + _ => panic!("wrong mode"), + }, + p.id, + ) + } + + pub async fn fill(&mut self, ps: &Vec) { + let items: Vec<(i64, i32)> = ps.iter().map(|p| self.p2pair(p)).collect(); + self.rconn + .zadd_multiple(&self.key, &items) + .await + .unwrap_or_else(|e| { + warn!("fill list cache failed, {} {}", e, &self.key); + }); + + self.set_and_check_length().await; + } + + pub async fn put(&mut self, p: &Post) { + // 其他都是加到最前面的,但热榜不是。可能导致MIN_LENGTH到MAX_LENGTH之间的数据不可靠 + // 影响不大,先不管了 + if p.is_deleted { + self.rconn.zrem(&self.key, p.id).await.unwrap_or_else(|e| { + warn!( + "remove from list cache failed, {} {} {}", + e, &self.key, p.id + ); + }); + } else { + let (s, m) = self.p2pair(p); + self.rconn.zadd(&self.key, m, s).await.unwrap_or_else(|e| { + warn!( + "put into list cache failed, {} {} {} {}", + e, &self.key, m, s + ); + }); + } + } + + pub async fn get_pids(&mut self, start: i64, limit: i64) -> Vec { + self.rconn + .zrange( + &self.key, + start.try_into().unwrap(), + (start + limit - 1).try_into().unwrap(), + ) + .await + .unwrap() + } +} + pub struct UserCache { key: String, rconn: RdsConn, diff --git a/src/models.rs b/src/models.rs index 3e6f226..f5e5c18 100644 --- a/src/models.rs +++ b/src/models.rs @@ -1,6 +1,6 @@ #![allow(clippy::all)] -use crate::cache::{PostCache, PostCommentCache, UserCache}; +use crate::cache::*; use crate::db_conn::Db; use crate::libs::diesel_logger::LoggingConnection; use crate::rds_conn::RdsConn; @@ -11,6 +11,7 @@ use diesel::{ insert_into, BoolExpressionMethods, ExpressionMethods, QueryDsl, QueryResult, RunQueryDsl, TextExpressionMethods, }; +use rocket::futures::{future, join}; use rocket::serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::convert::identity; @@ -191,11 +192,7 @@ impl Post { } } - pub async fn get_comments( - &self, - db: &Db, - rconn: &RdsConn, - ) -> QueryResult> { + pub async fn get_comments(&self, db: &Db, rconn: &RdsConn) -> QueryResult> { let mut cacher = PostCommentCache::init(self.id, rconn); if let Some(cs) = cacher.get().await { Ok(cs) @@ -212,12 +209,34 @@ impl Post { pub async fn gets_by_page( db: &Db, + rconn: &RdsConn, order_mode: u8, start: i64, limit: i64, ) -> QueryResult> { + let mut cacher = PostListCommentCache::init(order_mode, &rconn).await; + if cacher.need_fill() { + let pids = + Self::_get_ids_by_page(db, order_mode.clone(), 0, cacher.i64_minlen()).await?; + let ps = Self::get_multi(db, rconn, &pids).await?; + cacher.fill(&ps).await; + } + let pids = if start + limit > cacher.i64_len() { + Self::_get_ids_by_page(db, order_mode, start, limit).await? + } else { + cacher.get_pids(start, limit).await + }; + + Self::get_multi(db, rconn, &pids).await + } + async fn _get_ids_by_page( + db: &Db, + order_mode: u8, + start: i64, + limit: i64, + ) -> QueryResult> { db.run(move |c| { - let mut query = base_query!(posts); + let mut query = base_query!(posts).select(posts::id); if order_mode > 0 { query = query.filter(posts::is_reported.eq(false)) } @@ -237,48 +256,54 @@ impl Post { pub async fn search( db: &Db, + rconn: &RdsConn, search_mode: u8, search_text: String, start: i64, limit: i64, ) -> QueryResult> { let search_text2 = search_text.replace("%", "\\%"); - db.run(move |c| { - let pat; - let mut query = base_query!(posts).distinct().left_join(comments::table); - // 先用搜索+缓存,性能有问题了再真的做tag表 - query = match search_mode { - 0 => { - pat = format!("%#{}%", &search_text2); - query - .filter(posts::cw.eq(&search_text)) - .or_filter(posts::cw.eq(format!("#{}", &search_text))) - .or_filter(posts::content.like(&pat)) - .or_filter( - comments::content - .like(&pat) - .and(comments::is_deleted.eq(false)), - ) - } - 1 => { - pat = format!("%{}%", search_text2.replace(" ", "%")); - query - .filter(posts::content.like(&pat).or(comments::content.like(&pat))) - .filter(posts::allow_search.eq(true)) - } - 2 => query - .filter(posts::author_title.eq(&search_text)) - .or_filter(comments::author_title.eq(&search_text)), - _ => panic!("Wrong search mode!"), - }; - - query - .order(posts::id.desc()) - .offset(start) - .limit(limit) - .load(with_log!(c)) - }) - .await + let pids = db + .run(move |c| { + let pat; + let mut query = base_query!(posts) + .select(posts::id) + .distinct() + .left_join(comments::table); + // 先用搜索+缓存,性能有问题了再真的做tag表 + query = match search_mode { + 0 => { + pat = format!("%#{}%", &search_text2); + query + .filter(posts::cw.eq(&search_text)) + .or_filter(posts::cw.eq(format!("#{}", &search_text))) + .or_filter(posts::content.like(&pat)) + .or_filter( + comments::content + .like(&pat) + .and(comments::is_deleted.eq(false)), + ) + } + 1 => { + pat = format!("%{}%", search_text2.replace(" ", "%")); + query + .filter(posts::content.like(&pat).or(comments::content.like(&pat))) + .filter(posts::allow_search.eq(true)) + } + 2 => query + .filter(posts::author_title.eq(&search_text)) + .or_filter(comments::author_title.eq(&search_text)), + _ => panic!("Wrong search mode!"), + }; + + query + .order(posts::id.desc()) + .offset(start) + .limit(limit) + .load(with_log!(c)) + }) + .await?; + Self::get_multi(db, rconn, &pids).await } pub async fn create(db: &Db, new_post: NewPost) -> QueryResult { @@ -303,6 +328,18 @@ impl Post { Ok(()) } + pub async fn update_comment_time(&mut self, db: &Db, t: DateTime) -> QueryResult<()> { + let pid = self.id; + *self = db + .run(move |c| { + diesel::update(posts::table.find(pid)) + .set(posts::last_comment_time.eq(t)) + .get_result(with_log!(c)) + }) + .await?; + Ok(()) + } + pub async fn change_n_comments(&mut self, db: &Db, delta: i32) -> QueryResult<()> { let pid = self.id; *self = db @@ -343,7 +380,15 @@ impl Post { PostCache::init(rconn).sets(&vec![self]).await; } pub async fn refresh_cache(&self, rconn: &RdsConn, is_new: bool) { - self.set_instance_cache(rconn).await; + join!( + self.set_instance_cache(rconn), + future::join_all((if is_new { 0..4 } else { 1..4 }).map(|mode| async move { + PostListCommentCache::init(mode, &rconn.clone()) + .await + .put(self) + .await + })), + ); } }