diff --git a/src/api/post.rs b/src/api/post.rs index d2eb2a5..a7e09c3 100644 --- a/src/api/post.rs +++ b/src/api/post.rs @@ -36,6 +36,7 @@ pub struct PostOutput { comments: Option>, can_del: bool, attention: bool, + hot_score: Option, // for old version frontend timestamp: i64, likenum: i32, @@ -95,6 +96,7 @@ async fn p2output( .has(p.id) .await .unwrap_or_default(), + hot_score: if user.is_admin { Some(p.hot_score) } else { None }, // for old version frontend timestamp: p.create_time.timestamp(), likenum: p.n_attentions, @@ -185,7 +187,7 @@ pub async fn edit_cw(cwi: Form, user: CurrentUser, db: Db) -> JsonAPI { #[get("/getmulti?")] pub async fn get_multi(pids: Vec, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonAPI { - let ps = Post::get_multi(&db, pids).await?; + let ps = Post::get_multi_with_cache(&db, &rconn, &pids).await?; let ps_data = ps2outputs(&ps, &user, &db, &rconn).await; Ok(json!({ diff --git a/src/cache.rs b/src/cache.rs index 2290dfb..08bfcc5 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -5,53 +5,97 @@ use rocket::serde::json::serde_json; // can use rocket::serde::json::to_string in master version const INSTANCE_EXPIRE_TIME: usize = 60 * 60; +macro_rules! post_cache_key { + ($id: expr) => { + format!("hole_v2:cache:post:{}", $id) + }; +} pub struct PostCache { - key: String, rconn: RdsConn, } impl PostCache { - pub fn init(pid: &i32, rconn: &RdsConn) -> Self { + pub fn init(rconn: &RdsConn) -> Self { PostCache { - key: format!("hole_v2:cache:post:{}", pid), rconn: rconn.clone(), } } - pub async fn set(&mut self, p: &Post) { - self.rconn - .set_ex( - &self.key, + pub async fn sets(&mut self, ps: &Vec<&Post>) { + if ps.is_empty() { + return; + } + let kvs: Vec<(String, String)> = ps + .iter() + .map(|p| ( + post_cache_key!(p.id), serde_json::to_string(p).unwrap(), - INSTANCE_EXPIRE_TIME, - ) + ) ).collect(); + dbg!(&kvs); + let ret = self.rconn + .set_multiple(&kvs) .await .unwrap_or_else(|e| { - warn!("set post cache failed: {}, {}", e, p.id); - }) + warn!("set post cache failed: {}", e); + "x".to_string() + }); + dbg!(ret); } - pub async fn get(&mut self) -> Option { - let rds_result = self.rconn.get::<&String, String>(&self.key).await; - if let Ok(s) = rds_result { - debug!("hint user cache"); - self.rconn - .expire::<&String, bool>(&self.key, INSTANCE_EXPIRE_TIME) - .await - .unwrap_or_else(|e| { - warn!( - "get post cache, set new expire failed: {}, {}, {} ", - e, &self.key, &s - ); - false - }); + pub async fn get(&mut self, pid: &i32) -> Option { + let key = post_cache_key!(pid); + let rds_result: Option = self + .rconn + .get::>(key) + .await + .unwrap_or_else(|e| { + warn!("try to get post cache, connect rds fail, {}", e); + None + }); + + rds_result.and_then(|s| { serde_json::from_str(&s).unwrap_or_else(|e| { warn!("get post cache, decode failed {}, {}", e, s); None }) - } else { - None + }) + } + + pub async fn gets(&mut self, pids: &Vec) -> Vec> { + // 长度为1时会走GET而非MGET,返回值格式不兼容。愚蠢的设计。 + match pids.len() { + 0 => vec![], + 1 => vec![self.get(&pids[0]).await], + _ => { + let ks: Vec = pids.iter().map(|pid| post_cache_key!(pid)).collect(); + // dbg!(&ks); + // Vec is single arg, while &Vec is not. Seems a bug. + let rds_result: Vec> = self + .rconn + .get::, Vec>>(ks) + .await + .unwrap_or_else(|e| { + warn!("try to get posts cache, connect rds fail, {}", e); + vec![None; pids.len()] + }); + // dbg!(&rds_result); + + // 定期热度衰减的时候会清空缓存,这里设不设置过期时间影响不大 + + rds_result + .into_iter() + .map(|x| { + // dbg!(&x); + x.and_then(|s| { + serde_json::from_str(&s).unwrap_or_else(|e| { + warn!("get post cache, decode failed {}, {}", e, s); + None + }) + }) + }) + .collect() + } } } } @@ -85,19 +129,19 @@ impl UserCache { pub async fn get(&mut self) -> Option { let rds_result = self.rconn.get::<&String, String>(&self.key).await; if let Ok(s) = rds_result { - debug!("hint post cache"); + debug!("hint user cache"); self.rconn .expire::<&String, bool>(&self.key, INSTANCE_EXPIRE_TIME) .await .unwrap_or_else(|e| { warn!( - "get post cache, set new expire failed: {}, {}, {} ", + "get user cache, set new expire failed: {}, {}, {} ", e, &self.key, &s ); false }); serde_json::from_str(&s).unwrap_or_else(|e| { - warn!("get post cache, decode failed {}, {}", e, s); + warn!("get user cache, decode failed {}, {}", e, s); None }) } else { diff --git a/src/models.rs b/src/models.rs index 89fac30..215b169 100644 --- a/src/models.rs +++ b/src/models.rs @@ -1,17 +1,19 @@ #![allow(clippy::all)] +use crate::cache::{PostCache, UserCache}; use crate::db_conn::Db; use crate::libs::diesel_logger::LoggingConnection; use crate::rds_conn::RdsConn; -use crate::cache::{PostCache, UserCache}; use crate::schema::*; use chrono::{offset::Utc, DateTime}; +use diesel::dsl::any; use diesel::{ insert_into, BoolExpressionMethods, ExpressionMethods, QueryDsl, QueryResult, RunQueryDsl, TextExpressionMethods, }; -use diesel::dsl::any; use rocket::serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::convert::identity; no_arg_sql_function!(RANDOM, (), "Represents the sql RANDOM() function"); @@ -28,12 +30,14 @@ macro_rules! get { macro_rules! get_multi { ($table:ident) => { pub async fn get_multi(db: &Db, ids: Vec) -> QueryResult> { + if ids.is_empty() { + return Ok(vec![]); + } // eq(any()) is only for postgres db.run(move |c| { $table::table .filter($table::id.eq(any(ids))) .filter($table::is_deleted.eq(false)) - .order($table::id.desc()) .load(with_log!(c)) }) .await @@ -83,7 +87,7 @@ pub struct Comment { pub post_id: i32, } -#[derive(Queryable, Insertable, Serialize, Deserialize)] +#[derive(Queryable, Insertable, Serialize, Deserialize, Debug)] #[serde(crate = "rocket::serde")] pub struct Post { pub id: i32, @@ -131,15 +135,52 @@ impl Post { set_deleted!(posts); - pub async fn get_with_cache(db: &Db, rconn: &RdsConn, id: i32) -> QueryResult { - let mut cacher = PostCache::init(&id, &rconn); - if let Some(p) = cacher.get().await { - Ok(p) - } else { - let p = Self::get(db, id).await?; - cacher.set(&p).await; - Ok(p) + pub async fn get_multi_with_cache( + db: &Db, + rconn: &RdsConn, + ids: &Vec, + ) -> QueryResult> { + let mut cacher = PostCache::init(&rconn); + let mut cached_posts = cacher.gets(ids).await; + let mut id2po = HashMap::>::new(); + + // dbg!(&cached_posts); + + let missing_ids = ids + .iter() + .zip(cached_posts.iter_mut()) + .filter_map(|(pid, p)| match p { + None => { + id2po.insert(pid.clone(), p); + Some(pid) + }, + _ => None, + }) + .copied() + .collect(); + + dbg!(&missing_ids); + let missing_ps = Self::get_multi(db, missing_ids).await?; + // dbg!(&missing_ps); + + cacher.sets(&missing_ps.iter().map(identity).collect()).await; + + for p in missing_ps.into_iter() { + if let Some(op) = id2po.get_mut(&p.id) { + **op = Some(p); + } } + // dbg!(&cached_posts); + Ok( + cached_posts.into_iter().filter_map(identity).collect() + ) + } + + pub async fn get_with_cache(db: &Db, rconn: &RdsConn, id: i32) -> QueryResult { + Self::get_multi_with_cache(db, rconn, &vec![id]) + .await? + .pop() + .ok_or(diesel::result::Error::NotFound) } pub async fn gets_by_page( @@ -177,9 +218,7 @@ impl Post { let search_text2 = search_text.replace("%", "\\%"); db.run(move |c| { let pat; - let mut query = base_query!(posts) - .distinct() - .left_join(comments::table); + let mut query = base_query!(posts).distinct().left_join(comments::table); // 先用搜索+缓存,性能有问题了再真的做tag表 query = match search_mode { 0 => { @@ -188,7 +227,11 @@ impl Post { .filter(posts::cw.eq(&search_text)) .or_filter(posts::cw.eq(format!("#{}", &search_text))) .or_filter(posts::content.like(&pat)) - .or_filter(comments::content.like(&pat).and(comments::is_deleted.eq(false))) + .or_filter( + comments::content + .like(&pat) + .and(comments::is_deleted.eq(false)), + ) } 1 => { pat = format!("%{}%", search_text2.replace(" ", "%")); @@ -262,7 +305,7 @@ impl Post { } pub async fn set_instance_cache(&self, rconn: &RdsConn) { - PostCache::init(&self.id, rconn).set(self).await; + PostCache::init(rconn).sets(&vec![self]).await; } pub async fn refresh_cache(&self, rconn: &RdsConn, is_new: bool) { self.set_instance_cache(rconn).await;