Browse Source

feat: cache for get multi posts

master
hole-thu 3 years ago
parent
commit
cd940d59ba
  1. 4
      src/api/post.rs
  2. 104
      src/cache.rs
  3. 77
      src/models.rs

4
src/api/post.rs

@ -36,6 +36,7 @@ pub struct PostOutput {
comments: Option<Vec<CommentOutput>>, comments: Option<Vec<CommentOutput>>,
can_del: bool, can_del: bool,
attention: bool, attention: bool,
hot_score: Option<i32>,
// for old version frontend // for old version frontend
timestamp: i64, timestamp: i64,
likenum: i32, likenum: i32,
@ -95,6 +96,7 @@ async fn p2output(
.has(p.id) .has(p.id)
.await .await
.unwrap_or_default(), .unwrap_or_default(),
hot_score: if user.is_admin { Some(p.hot_score) } else { None },
// for old version frontend // for old version frontend
timestamp: p.create_time.timestamp(), timestamp: p.create_time.timestamp(),
likenum: p.n_attentions, likenum: p.n_attentions,
@ -185,7 +187,7 @@ pub async fn edit_cw(cwi: Form<CwInput>, user: CurrentUser, db: Db) -> JsonAPI {
#[get("/getmulti?<pids>")] #[get("/getmulti?<pids>")]
pub async fn get_multi(pids: Vec<i32>, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonAPI { pub async fn get_multi(pids: Vec<i32>, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonAPI {
let ps = Post::get_multi(&db, pids).await?; let ps = Post::get_multi_with_cache(&db, &rconn, &pids).await?;
let ps_data = ps2outputs(&ps, &user, &db, &rconn).await; let ps_data = ps2outputs(&ps, &user, &db, &rconn).await;
Ok(json!({ Ok(json!({

104
src/cache.rs

@ -5,53 +5,97 @@ use rocket::serde::json::serde_json;
// can use rocket::serde::json::to_string in master version // can use rocket::serde::json::to_string in master version
const INSTANCE_EXPIRE_TIME: usize = 60 * 60; const INSTANCE_EXPIRE_TIME: usize = 60 * 60;
macro_rules! post_cache_key {
($id: expr) => {
format!("hole_v2:cache:post:{}", $id)
};
}
pub struct PostCache { pub struct PostCache {
key: String,
rconn: RdsConn, rconn: RdsConn,
} }
impl PostCache { impl PostCache {
pub fn init(pid: &i32, rconn: &RdsConn) -> Self { pub fn init(rconn: &RdsConn) -> Self {
PostCache { PostCache {
key: format!("hole_v2:cache:post:{}", pid),
rconn: rconn.clone(), rconn: rconn.clone(),
} }
} }
pub async fn set(&mut self, p: &Post) { pub async fn sets(&mut self, ps: &Vec<&Post>) {
self.rconn if ps.is_empty() {
.set_ex( return;
&self.key, }
let kvs: Vec<(String, String)> = ps
.iter()
.map(|p| (
post_cache_key!(p.id),
serde_json::to_string(p).unwrap(), serde_json::to_string(p).unwrap(),
INSTANCE_EXPIRE_TIME, ) ).collect();
) dbg!(&kvs);
let ret = self.rconn
.set_multiple(&kvs)
.await .await
.unwrap_or_else(|e| { .unwrap_or_else(|e| {
warn!("set post cache failed: {}, {}", e, p.id); warn!("set post cache failed: {}", e);
}) "x".to_string()
});
dbg!(ret);
} }
pub async fn get(&mut self) -> Option<Post> { pub async fn get(&mut self, pid: &i32) -> Option<Post> {
let rds_result = self.rconn.get::<&String, String>(&self.key).await; let key = post_cache_key!(pid);
if let Ok(s) = rds_result { let rds_result: Option<String> = self
debug!("hint user cache"); .rconn
self.rconn .get::<String, Option<String>>(key)
.expire::<&String, bool>(&self.key, INSTANCE_EXPIRE_TIME) .await
.await .unwrap_or_else(|e| {
.unwrap_or_else(|e| { warn!("try to get post cache, connect rds fail, {}", e);
warn!( None
"get post cache, set new expire failed: {}, {}, {} ", });
e, &self.key, &s
); rds_result.and_then(|s| {
false
});
serde_json::from_str(&s).unwrap_or_else(|e| { serde_json::from_str(&s).unwrap_or_else(|e| {
warn!("get post cache, decode failed {}, {}", e, s); warn!("get post cache, decode failed {}, {}", e, s);
None None
}) })
} else { })
None }
pub async fn gets(&mut self, pids: &Vec<i32>) -> Vec<Option<Post>> {
// 长度为1时会走GET而非MGET,返回值格式不兼容。愚蠢的设计。
match pids.len() {
0 => vec![],
1 => vec![self.get(&pids[0]).await],
_ => {
let ks: Vec<String> = pids.iter().map(|pid| post_cache_key!(pid)).collect();
// dbg!(&ks);
// Vec is single arg, while &Vec is not. Seems a bug.
let rds_result: Vec<Option<String>> = self
.rconn
.get::<Vec<String>, Vec<Option<String>>>(ks)
.await
.unwrap_or_else(|e| {
warn!("try to get posts cache, connect rds fail, {}", e);
vec![None; pids.len()]
});
// dbg!(&rds_result);
// 定期热度衰减的时候会清空缓存,这里设不设置过期时间影响不大
rds_result
.into_iter()
.map(|x| {
// dbg!(&x);
x.and_then(|s| {
serde_json::from_str(&s).unwrap_or_else(|e| {
warn!("get post cache, decode failed {}, {}", e, s);
None
})
})
})
.collect()
}
} }
} }
} }
@ -85,19 +129,19 @@ impl UserCache {
pub async fn get(&mut self) -> Option<User> { pub async fn get(&mut self) -> Option<User> {
let rds_result = self.rconn.get::<&String, String>(&self.key).await; let rds_result = self.rconn.get::<&String, String>(&self.key).await;
if let Ok(s) = rds_result { if let Ok(s) = rds_result {
debug!("hint post cache"); debug!("hint user cache");
self.rconn self.rconn
.expire::<&String, bool>(&self.key, INSTANCE_EXPIRE_TIME) .expire::<&String, bool>(&self.key, INSTANCE_EXPIRE_TIME)
.await .await
.unwrap_or_else(|e| { .unwrap_or_else(|e| {
warn!( warn!(
"get post cache, set new expire failed: {}, {}, {} ", "get user cache, set new expire failed: {}, {}, {} ",
e, &self.key, &s e, &self.key, &s
); );
false false
}); });
serde_json::from_str(&s).unwrap_or_else(|e| { serde_json::from_str(&s).unwrap_or_else(|e| {
warn!("get post cache, decode failed {}, {}", e, s); warn!("get user cache, decode failed {}, {}", e, s);
None None
}) })
} else { } else {

77
src/models.rs

@ -1,17 +1,19 @@
#![allow(clippy::all)] #![allow(clippy::all)]
use crate::cache::{PostCache, UserCache};
use crate::db_conn::Db; use crate::db_conn::Db;
use crate::libs::diesel_logger::LoggingConnection; use crate::libs::diesel_logger::LoggingConnection;
use crate::rds_conn::RdsConn; use crate::rds_conn::RdsConn;
use crate::cache::{PostCache, UserCache};
use crate::schema::*; use crate::schema::*;
use chrono::{offset::Utc, DateTime}; use chrono::{offset::Utc, DateTime};
use diesel::dsl::any;
use diesel::{ use diesel::{
insert_into, BoolExpressionMethods, ExpressionMethods, QueryDsl, QueryResult, RunQueryDsl, insert_into, BoolExpressionMethods, ExpressionMethods, QueryDsl, QueryResult, RunQueryDsl,
TextExpressionMethods, TextExpressionMethods,
}; };
use diesel::dsl::any;
use rocket::serde::{Deserialize, Serialize}; use rocket::serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::identity;
no_arg_sql_function!(RANDOM, (), "Represents the sql RANDOM() function"); no_arg_sql_function!(RANDOM, (), "Represents the sql RANDOM() function");
@ -28,12 +30,14 @@ macro_rules! get {
macro_rules! get_multi { macro_rules! get_multi {
($table:ident) => { ($table:ident) => {
pub async fn get_multi(db: &Db, ids: Vec<i32>) -> QueryResult<Vec<Self>> { pub async fn get_multi(db: &Db, ids: Vec<i32>) -> QueryResult<Vec<Self>> {
if ids.is_empty() {
return Ok(vec![]);
}
// eq(any()) is only for postgres // eq(any()) is only for postgres
db.run(move |c| { db.run(move |c| {
$table::table $table::table
.filter($table::id.eq(any(ids))) .filter($table::id.eq(any(ids)))
.filter($table::is_deleted.eq(false)) .filter($table::is_deleted.eq(false))
.order($table::id.desc())
.load(with_log!(c)) .load(with_log!(c))
}) })
.await .await
@ -83,7 +87,7 @@ pub struct Comment {
pub post_id: i32, pub post_id: i32,
} }
#[derive(Queryable, Insertable, Serialize, Deserialize)] #[derive(Queryable, Insertable, Serialize, Deserialize, Debug)]
#[serde(crate = "rocket::serde")] #[serde(crate = "rocket::serde")]
pub struct Post { pub struct Post {
pub id: i32, pub id: i32,
@ -131,15 +135,52 @@ impl Post {
set_deleted!(posts); set_deleted!(posts);
pub async fn get_with_cache(db: &Db, rconn: &RdsConn, id: i32) -> QueryResult<Self> { pub async fn get_multi_with_cache(
let mut cacher = PostCache::init(&id, &rconn); db: &Db,
if let Some(p) = cacher.get().await { rconn: &RdsConn,
Ok(p) ids: &Vec<i32>,
} else { ) -> QueryResult<Vec<Self>> {
let p = Self::get(db, id).await?; let mut cacher = PostCache::init(&rconn);
cacher.set(&p).await; let mut cached_posts = cacher.gets(ids).await;
Ok(p) let mut id2po = HashMap::<i32, &mut Option<Post>>::new();
// dbg!(&cached_posts);
let missing_ids = ids
.iter()
.zip(cached_posts.iter_mut())
.filter_map(|(pid, p)| match p {
None => {
id2po.insert(pid.clone(), p);
Some(pid)
},
_ => None,
})
.copied()
.collect();
dbg!(&missing_ids);
let missing_ps = Self::get_multi(db, missing_ids).await?;
// dbg!(&missing_ps);
cacher.sets(&missing_ps.iter().map(identity).collect()).await;
for p in missing_ps.into_iter() {
if let Some(op) = id2po.get_mut(&p.id) {
**op = Some(p);
}
} }
// dbg!(&cached_posts);
Ok(
cached_posts.into_iter().filter_map(identity).collect()
)
}
pub async fn get_with_cache(db: &Db, rconn: &RdsConn, id: i32) -> QueryResult<Self> {
Self::get_multi_with_cache(db, rconn, &vec![id])
.await?
.pop()
.ok_or(diesel::result::Error::NotFound)
} }
pub async fn gets_by_page( pub async fn gets_by_page(
@ -177,9 +218,7 @@ impl Post {
let search_text2 = search_text.replace("%", "\\%"); let search_text2 = search_text.replace("%", "\\%");
db.run(move |c| { db.run(move |c| {
let pat; let pat;
let mut query = base_query!(posts) let mut query = base_query!(posts).distinct().left_join(comments::table);
.distinct()
.left_join(comments::table);
// 先用搜索+缓存,性能有问题了再真的做tag表 // 先用搜索+缓存,性能有问题了再真的做tag表
query = match search_mode { query = match search_mode {
0 => { 0 => {
@ -188,7 +227,11 @@ impl Post {
.filter(posts::cw.eq(&search_text)) .filter(posts::cw.eq(&search_text))
.or_filter(posts::cw.eq(format!("#{}", &search_text))) .or_filter(posts::cw.eq(format!("#{}", &search_text)))
.or_filter(posts::content.like(&pat)) .or_filter(posts::content.like(&pat))
.or_filter(comments::content.like(&pat).and(comments::is_deleted.eq(false))) .or_filter(
comments::content
.like(&pat)
.and(comments::is_deleted.eq(false)),
)
} }
1 => { 1 => {
pat = format!("%{}%", search_text2.replace(" ", "%")); pat = format!("%{}%", search_text2.replace(" ", "%"));
@ -262,7 +305,7 @@ impl Post {
} }
pub async fn set_instance_cache(&self, rconn: &RdsConn) { pub async fn set_instance_cache(&self, rconn: &RdsConn) {
PostCache::init(&self.id, rconn).set(self).await; PostCache::init(rconn).sets(&vec![self]).await;
} }
pub async fn refresh_cache(&self, rconn: &RdsConn, is_new: bool) { pub async fn refresh_cache(&self, rconn: &RdsConn, is_new: bool) {
self.set_instance_cache(rconn).await; self.set_instance_cache(rconn).await;

Loading…
Cancel
Save