
feat: cache for posts list

master
hole-thu, 3 years ago
commit 38bacc1ee0
  src/api/comment.rs   |   3
  src/api/operation.rs |   3
  src/api/post.rs      |   8
  src/api/search.rs    |   1
  src/api/systemlog.rs |   2
  src/cache.rs         | 108
  src/models.rs        | 133

src/api/comment.rs | 3

@@ -92,7 +92,7 @@ pub async fn add_comment(
rconn: RdsConn,
) -> API<Value> {
let mut p = Post::get(&db, &rconn, ci.pid).await?;
Comment::create(
let c = Comment::create(
&db,
NewComment {
content: ci.text.to_string(),
@@ -104,6 +104,7 @@ pub async fn add_comment(
)
.await?;
p.change_n_comments(&db, 1).await?;
p.update_comment_time(&db, c.create_time).await?;
// auto attention after comment
let mut att = Attention::init(&user.namehash, &rconn);

src/api/operation.rs | 3

@@ -38,7 +38,8 @@ pub async fn delete(
_ => return Err(APIError::PcError(NotAllowed)),
}
p.refresh_cache(&rconn, false).await;
// when deleting, the post also needs to be removed from cache list 0
p.refresh_cache(&rconn, true).await;
Ok(json!({
"code": 0

src/api/post.rs | 8

@@ -67,7 +67,10 @@ async fn p2output(p: &Post, user: &CurrentUser, db: &Db, rconn: &RdsConn) -> Pos
None
} else {
// a single hole still has its own comment-query endpoint, so a failure here need not raise an error
p.get_comments(db, rconn).await.ok().map(|cs| c2output(p, &cs, user))
p.get_comments(db, rconn)
.await
.ok()
.map(|cs| c2output(p, &cs, user))
},
can_del: p.check_permission(user, "wd").is_ok(),
attention: Attention::init(&user.namehash, &rconn)
@@ -117,7 +120,7 @@ pub async fn get_list(
let page = p.unwrap_or(1);
let page_size = 25;
let start = (page - 1) * page_size;
let ps = Post::gets_by_page(&db, order_mode, start.into(), page_size.into()).await?;
let ps = Post::gets_by_page(&db, &rconn, order_mode, start.into(), page_size.into()).await?;
let ps_data = ps2outputs(&ps, &user, &db, &rconn).await;
Ok(json!({
"data": ps_data,
@@ -147,6 +150,7 @@ pub async fn publish_post(
)
.await?;
Attention::init(&user.namehash, &rconn).add(p.id).await?;
p.refresh_cache(&rconn, true).await;
Ok(json!({
"code": 0
}))

src/api/search.rs | 1

@@ -26,6 +26,7 @@ pub async fn search(
} else {
Post::search(
&db,
&rconn,
search_mode,
keywords.to_string(),
start.into(),

src/api/systemlog.rs | 2

@@ -1,8 +1,8 @@
use crate::api::{CurrentUser, API};
use crate::db_conn::Db;
use crate::random_hasher::RandomHasher;
use rocket::serde::json::{json, Value};
use rocket::State;
use crate::db_conn::Db;
#[get("/systemlog")]
pub async fn get_systemlog(user: CurrentUser, rh: &State<RandomHasher>, db: Db) -> API<Value> {

src/cache.rs | 108

@@ -1,10 +1,15 @@
use crate::models::{Comment, Post, User};
use crate::rds_conn::RdsConn;
use rand::Rng;
use redis::AsyncCommands;
use rocket::serde::json::serde_json;
// can use rocket::serde::json::to_string in master version
const INSTANCE_EXPIRE_TIME: usize = 60 * 60;
const MIN_LENGTH: isize = 200;
const MAX_LENGTH: isize = 900;
macro_rules! post_cache_key {
($id: expr) => {
format!("hole_v2:cache:post:{}", $id)
@@ -150,6 +155,109 @@ impl PostCommentCache {
}
}
pub struct PostListCommentCache {
key: String,
mode: u8,
rconn: RdsConn,
length: isize,
}
impl PostListCommentCache {
pub async fn init(mode: u8, rconn: &RdsConn) -> Self {
let mut cacher = PostListCommentCache {
key: format!("hole_v2:cache:post_list:{}", &mode),
mode: mode,
rconn: rconn.clone(),
length: 0,
};
cacher.set_and_check_length().await;
cacher
}
async fn set_and_check_length(&mut self) {
let mut l = self.rconn.zcard(&self.key).await.unwrap();
if l > MAX_LENGTH {
self.rconn
.zremrangebyrank::<&String, ()>(&self.key, MIN_LENGTH, -1)
.await
.unwrap_or_else(|e| {
warn!("cut list cache failed, {}, {}", e, &self.key);
});
l = MIN_LENGTH;
}
self.length = l;
}
pub fn need_fill(&self) -> bool {
self.length < MIN_LENGTH
}
pub fn i64_len(&self) -> i64 {
self.length.try_into().unwrap()
}
pub fn i64_minlen(&self) -> i64 {
MIN_LENGTH.try_into().unwrap()
}
fn p2pair(&self, p: &Post) -> (i64, i32) {
(
match self.mode {
0 => (-p.id).into(),
1 => -p.last_comment_time.timestamp(),
2 => (-p.hot_score).into(),
3 => rand::thread_rng().gen_range(0..i64::MAX),
_ => panic!("wrong mode"),
},
p.id,
)
}
pub async fn fill(&mut self, ps: &Vec<Post>) {
let items: Vec<(i64, i32)> = ps.iter().map(|p| self.p2pair(p)).collect();
self.rconn
.zadd_multiple(&self.key, &items)
.await
.unwrap_or_else(|e| {
warn!("fill list cache failed, {} {}", e, &self.key);
});
self.set_and_check_length().await;
}
pub async fn put(&mut self, p: &Post) {
// every other mode inserts at the front, but the hot list does not; entries between
// MIN_LENGTH and MAX_LENGTH may therefore be unreliable. The impact is small, so leave it for now.
if p.is_deleted {
self.rconn.zrem(&self.key, p.id).await.unwrap_or_else(|e| {
warn!(
"remove from list cache failed, {} {} {}",
e, &self.key, p.id
);
});
} else {
let (s, m) = self.p2pair(p);
self.rconn.zadd(&self.key, m, s).await.unwrap_or_else(|e| {
warn!(
"put into list cache failed, {} {} {} {}",
e, &self.key, m, s
);
});
}
}
pub async fn get_pids(&mut self, start: i64, limit: i64) -> Vec<i32> {
self.rconn
.zrange(
&self.key,
start.try_into().unwrap(),
(start + limit - 1).try_into().unwrap(),
)
.await
.unwrap()
}
}
pub struct UserCache {
key: String,
rconn: RdsConn,

src/models.rs | 133

@@ -1,6 +1,6 @@
#![allow(clippy::all)]
use crate::cache::{PostCache, PostCommentCache, UserCache};
use crate::cache::*;
use crate::db_conn::Db;
use crate::libs::diesel_logger::LoggingConnection;
use crate::rds_conn::RdsConn;
@@ -11,6 +11,7 @@ use diesel::{
insert_into, BoolExpressionMethods, ExpressionMethods, QueryDsl, QueryResult, RunQueryDsl,
TextExpressionMethods,
};
use rocket::futures::{future, join};
use rocket::serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::identity;
@@ -191,11 +192,7 @@ impl Post {
}
}
pub async fn get_comments(
&self,
db: &Db,
rconn: &RdsConn,
) -> QueryResult<Vec<Comment>> {
pub async fn get_comments(&self, db: &Db, rconn: &RdsConn) -> QueryResult<Vec<Comment>> {
let mut cacher = PostCommentCache::init(self.id, rconn);
if let Some(cs) = cacher.get().await {
Ok(cs)
@@ -212,12 +209,34 @@ impl Post {
pub async fn gets_by_page(
db: &Db,
rconn: &RdsConn,
order_mode: u8,
start: i64,
limit: i64,
) -> QueryResult<Vec<Self>> {
let mut cacher = PostListCommentCache::init(order_mode, &rconn).await;
if cacher.need_fill() {
let pids =
Self::_get_ids_by_page(db, order_mode.clone(), 0, cacher.i64_minlen()).await?;
let ps = Self::get_multi(db, rconn, &pids).await?;
cacher.fill(&ps).await;
}
let pids = if start + limit > cacher.i64_len() {
Self::_get_ids_by_page(db, order_mode, start, limit).await?
} else {
cacher.get_pids(start, limit).await
};
Self::get_multi(db, rconn, &pids).await
}
async fn _get_ids_by_page(
db: &Db,
order_mode: u8,
start: i64,
limit: i64,
) -> QueryResult<Vec<i32>> {
db.run(move |c| {
let mut query = base_query!(posts);
let mut query = base_query!(posts).select(posts::id);
if order_mode > 0 {
query = query.filter(posts::is_reported.eq(false))
}
@@ -237,48 +256,54 @@ impl Post {
pub async fn search(
db: &Db,
rconn: &RdsConn,
search_mode: u8,
search_text: String,
start: i64,
limit: i64,
) -> QueryResult<Vec<Self>> {
let search_text2 = search_text.replace("%", "\\%");
db.run(move |c| {
let pat;
let mut query = base_query!(posts).distinct().left_join(comments::table);
// use search + cache for now; only build a real tag table if performance becomes a problem
query = match search_mode {
0 => {
pat = format!("%#{}%", &search_text2);
query
.filter(posts::cw.eq(&search_text))
.or_filter(posts::cw.eq(format!("#{}", &search_text)))
.or_filter(posts::content.like(&pat))
.or_filter(
comments::content
.like(&pat)
.and(comments::is_deleted.eq(false)),
)
}
1 => {
pat = format!("%{}%", search_text2.replace(" ", "%"));
query
.filter(posts::content.like(&pat).or(comments::content.like(&pat)))
.filter(posts::allow_search.eq(true))
}
2 => query
.filter(posts::author_title.eq(&search_text))
.or_filter(comments::author_title.eq(&search_text)),
_ => panic!("Wrong search mode!"),
};
query
.order(posts::id.desc())
.offset(start)
.limit(limit)
.load(with_log!(c))
})
.await
let pids = db
.run(move |c| {
let pat;
let mut query = base_query!(posts)
.select(posts::id)
.distinct()
.left_join(comments::table);
// use search + cache for now; only build a real tag table if performance becomes a problem
query = match search_mode {
0 => {
pat = format!("%#{}%", &search_text2);
query
.filter(posts::cw.eq(&search_text))
.or_filter(posts::cw.eq(format!("#{}", &search_text)))
.or_filter(posts::content.like(&pat))
.or_filter(
comments::content
.like(&pat)
.and(comments::is_deleted.eq(false)),
)
}
1 => {
pat = format!("%{}%", search_text2.replace(" ", "%"));
query
.filter(posts::content.like(&pat).or(comments::content.like(&pat)))
.filter(posts::allow_search.eq(true))
}
2 => query
.filter(posts::author_title.eq(&search_text))
.or_filter(comments::author_title.eq(&search_text)),
_ => panic!("Wrong search mode!"),
};
query
.order(posts::id.desc())
.offset(start)
.limit(limit)
.load(with_log!(c))
})
.await?;
Self::get_multi(db, rconn, &pids).await
}
pub async fn create(db: &Db, new_post: NewPost) -> QueryResult<Self> {
@@ -303,6 +328,18 @@ impl Post {
Ok(())
}
pub async fn update_comment_time(&mut self, db: &Db, t: DateTime<Utc>) -> QueryResult<()> {
let pid = self.id;
*self = db
.run(move |c| {
diesel::update(posts::table.find(pid))
.set(posts::last_comment_time.eq(t))
.get_result(with_log!(c))
})
.await?;
Ok(())
}
pub async fn change_n_comments(&mut self, db: &Db, delta: i32) -> QueryResult<()> {
let pid = self.id;
*self = db
@@ -343,7 +380,15 @@ impl Post {
PostCache::init(rconn).sets(&vec![self]).await;
}
pub async fn refresh_cache(&self, rconn: &RdsConn, is_new: bool) {
self.set_instance_cache(rconn).await;
join!(
self.set_instance_cache(rconn),
future::join_all((if is_new { 0..4 } else { 1..4 }).map(|mode| async move {
PostListCommentCache::init(mode, &rconn.clone())
.await
.put(self)
.await
})),
);
}
}
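
For intuition on the new gets_by_page fallthrough above: with MIN_LENGTH = 200 cached entries and the 25-post pages used in get_list, the first eight pages come from Redis and deeper pages fall back to the database. A rough sketch of that boundary check (helper name served_from_cache is hypothetical, not from this commit):

// hypothetical illustration of the cache/DB split in Post::gets_by_page
fn served_from_cache(start: i64, limit: i64, cached_len: i64) -> bool {
    // mirrors the `start + limit > cacher.i64_len()` branch, inverted
    start + limit <= cached_len
}

fn main() {
    let cached_len: i64 = 200; // at least MIN_LENGTH after fill()
    let page_size: i64 = 25;
    for page in 1..=9i64 {
        let start = (page - 1) * page_size;
        let source = if served_from_cache(start, page_size, cached_len) {
            "redis"
        } else {
            "database"
        };
        println!("page {page}: {source}"); // pages 1-8 hit redis, page 9 hits the database
    }
}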
