From 8e386d98d0582c2ad6bce8edd994dedd408f7a11 Mon Sep 17 00:00:00 2001 From: hole-thu Date: Mon, 23 Mar 2026 22:14:01 +0800 Subject: [PATCH] cache in memory --- Cargo.toml | 1 + README.md | 2 +- src/api/attention.rs | 6 +- src/api/comment.rs | 15 +- src/api/mod.rs | 2 +- src/api/operation.rs | 22 +- src/api/post.rs | 36 ++-- src/api/reaction.rs | 4 +- src/api/search.rs | 1 - src/api/systemlog.rs | 4 +- src/cache.rs | 498 +++++++++++++++++++------------------------ src/main.rs | 7 +- src/models.rs | 112 +++++----- 13 files changed, 318 insertions(+), 392 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f3725bd..6070832 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,3 +29,4 @@ futures-util = "0.3.24" lru = "0.11" reqwest = { version = "0.11.10", features = ["json"], optional = true } +moka = { version = "0.12.15", features = ["future"] } diff --git a/README.md b/README.md index d038841..dfd11c1 100644 --- a/README.md +++ b/README.md @@ -78,7 +78,7 @@ clone 代码 (略) 安装postgresql (略) -安装redis (略) +安装redis/valkey (略) #### 准备数据库 diff --git a/src/api/attention.rs b/src/api/attention.rs index 2dbbc92..c6b56db 100644 --- a/src/api/attention.rs +++ b/src/api/attention.rs @@ -31,7 +31,7 @@ pub async fn attention_post( // 临时用户不允许手动关注 user.id.ok_or(YouAreTmp)?; - let mut p = Post::get(&db, &rconn, ai.pid).await?; + let mut p = Post::get(&db, ai.pid).await?; p.check_permission(&user, "r")?; let mut att = Attention::init(&user.namehash, &rconn); let switch_to = ai.switch == 1; @@ -59,7 +59,7 @@ pub async fn attention_post( if switch_to && user.is_admin { update!(p, posts, &db, { is_reported, to false }); } - p.refresh_cache(&rconn, false).await; + p.refresh_cache(false).await; } Ok(json!({ @@ -75,7 +75,7 @@ pub async fn attention_post( pub async fn get_attention(user: CurrentUser, db: Db, rconn: RdsConn) -> JsonApi { let mut ids = Attention::init(&user.namehash, &rconn).all().await?; ids.sort_by_key(|x| -x); - let ps: Vec = Post::get_multi(&db, &rconn, &ids) + let ps: Vec = Post::get_multi(&db, &ids) .await? .into_iter() .filter(|post| { diff --git a/src/api/comment.rs b/src/api/comment.rs index d26ba36..2d0d66e 100644 --- a/src/api/comment.rs +++ b/src/api/comment.rs @@ -88,14 +88,14 @@ pub async fn c2output<'r>( #[get("/getcomment?")] pub async fn get_comment(pid: i32, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonApi { - let p = Post::get(&db, &rconn, pid).await?; + let p = Post::get(&db, pid).await?; if p.is_deleted { return Err(ApiError::Pc(IsDeleted)); } - let cs = p.get_comments(&db, &rconn).await?; + let cs = p.get_comments(&db).await?; let hash_list = cs.iter().map(|c| &c.author_hash).collect::>(); - let cached_block_dict = BlockDictCache::init(&user.namehash, p.id, &rconn) - .get_or_create(&user, &hash_list) + let cached_block_dict = BlockDictCache::init(&user.namehash, p.id) + .get_or_create(&user, &hash_list, &rconn) .await?; let data = c2output(&p, &cs, &user, &cached_block_dict).await; @@ -117,7 +117,7 @@ pub async fn add_comment( db: Db, rconn: RdsConn, ) -> JsonApi { - let mut p = Post::get(&db, &rconn, pid).await?; + let mut p = Post::get(&db, pid).await?; if p.author_hash != user.namehash { user.id.ok_or(YouAreTmp)?; } @@ -160,10 +160,7 @@ pub async fn add_comment( { hot_score, add hs_delta } ); - join!( - p.refresh_cache(&rconn, false), - p.clear_comments_cache(&rconn), - ); + join!(p.refresh_cache(false), p.clear_comments_cache(),); Ok(json!({ "code": 0 diff --git a/src/api/mod.rs b/src/api/mod.rs index 9834e49..5f0e2c3 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -101,7 +101,7 @@ impl<'r> FromRequest<'r> for CurrentUser { Some(CurrentUser::from_hash(&rconn, rh.hash_with_salt(sp[1])).await) } else { let db = try_outcome!(request.guard::().await); - if let Some(u) = User::get_by_token(&db, &rconn, token).await { + if let Some(u) = User::get_by_token(&db, token).await { let namehash = rh.hash_with_salt(&u.name); let user_base = CurrentUser::from_hash(&rconn, namehash).await; Some(CurrentUser { diff --git a/src/api/operation.rs b/src/api/operation.rs index 7ff956f..46dbb8b 100644 --- a/src/api/operation.rs +++ b/src/api/operation.rs @@ -22,7 +22,7 @@ pub async fn delete(di: Form, user: CurrentUser, db: Db, rconn: Rds "cid" => { let mut c = Comment::get(&db, di.id).await?; c.soft_delete(&user, &db).await?; - let mut p = Post::get(&db, &rconn, c.post_id).await?; + let mut p = Post::get(&db, c.post_id).await?; update!( p, posts, @@ -31,13 +31,13 @@ pub async fn delete(di: Form, user: CurrentUser, db: Db, rconn: Rds { hot_score, add -1 } ); - p.refresh_cache(&rconn, false).await; - p.clear_comments_cache(&rconn).await; + p.refresh_cache(false).await; + p.clear_comments_cache().await; (c.author_hash.clone(), p) } "pid" => { - let mut p = Post::get(&db, &rconn, di.id).await?; + let mut p = Post::get(&db, di.id).await?; // 有评论:清空主楼而非删除 if p.author_hash == user.namehash && p.n_comments > 0 { @@ -52,7 +52,7 @@ pub async fn delete(di: Form, user: CurrentUser, db: Db, rconn: Rds } // 如果是删除,需要也从0号缓存队列中去掉 - p.refresh_cache(&rconn, true).await; + p.refresh_cache(true).await; (p.author_hash.clone(), p) } @@ -110,10 +110,10 @@ pub async fn report(ri: Form, user: CurrentUser, db: Db, rconn: Rds (!ri.reason.is_empty()).then_some(()).ok_or(NoReason)?; - let mut p = Post::get(&db, &rconn, ri.pid).await?; + let mut p = Post::get(&db, ri.pid).await?; if ri.should_hide.is_some() { update!(p, posts, &db, { is_reported, to true }); - p.refresh_cache(&rconn, false).await; + p.refresh_cache(false).await; } Systemlog { @@ -142,7 +142,7 @@ pub async fn report(ri: Form, user: CurrentUser, db: Db, rconn: Rds ) .await?; Attention::init(&user.namehash, &rconn).add(p.id).await?; - p.refresh_cache(&rconn, true).await; + p.refresh_cache(true).await; code0!() } @@ -163,7 +163,7 @@ pub async fn block(bi: Form, user: CurrentUser, db: Db, rconn: RdsCo let pid; let nh_to_block = match bi.content_type.as_str() { "post" => { - let p = Post::get(&db, &rconn, bi.id).await?; + let p = Post::get(&db, bi.id).await?; pid = p.id; p.author_hash } @@ -187,9 +187,7 @@ pub async fn block(bi: Form, user: CurrentUser, db: Db, rconn: RdsCo .unwrap_or_default() }; - BlockDictCache::init(&user.namehash, pid, &rconn) - .clear() - .await?; + BlockDictCache::init(&user.namehash, pid).clear().await; Ok(json!({ "code": 0, diff --git a/src/api/post.rs b/src/api/post.rs index 8ba3bff..70d2c6d 100644 --- a/src/api/post.rs +++ b/src/api/post.rs @@ -67,7 +67,7 @@ pub struct CwInput { async fn p2output(p: &Post, user: &CurrentUser, db: &Db, rconn: &RdsConn) -> Api { let comments: Option> = if p.n_comments < 5 { - Some(p.get_comments(db, rconn).await?) + Some(p.get_comments(db).await?) } else { None }; @@ -78,8 +78,8 @@ async fn p2output(p: &Post, user: &CurrentUser, db: &Db, rconn: &RdsConn) -> Api .chain(std::iter::once(&p.author_hash)) .collect::>(); //dbg!(&hash_list); - let cached_block_dict = BlockDictCache::init(&user.namehash, p.id, rconn) - .get_or_create(user, &hash_list) + let cached_block_dict = BlockDictCache::init(&user.namehash, p.id) + .get_or_create(user, &hash_list, rconn) .await?; let is_blocked = cached_block_dict[&p.author_hash]; let can_view = @@ -144,7 +144,7 @@ pub async fn ps2outputs( #[get("/getone?")] pub async fn get_one(pid: i32, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonApi { user.id.ok_or(YouAreTmp)?; - let p = Post::get(&db, &rconn, pid).await?; + let p = Post::get(&db, pid).await?; p.check_permission(&user, "ro")?; Ok(json!({ "data": p2output(&p, &user,&db, &rconn).await?, @@ -165,18 +165,12 @@ pub async fn get_list( let page = p.unwrap_or(1); let page_size = 25; let start = (page - 1) * page_size; - let ps: Vec = Post::gets_by_page( - &db, - &rconn, - room_id, - order_mode, - start.into(), - page_size.into(), - ) - .await? - .into_iter() - .filter(|post| page < 40 || !post.get_is_private()) - .collect(); + let ps: Vec = + Post::gets_by_page(&db, room_id, order_mode, start as usize, page_size as usize) + .await? + .into_iter() + .filter(|post| page < 40 || !post.get_is_private()) + .collect(); let ps_data = ps2outputs(&ps, &user, &db, &rconn).await?; @@ -227,7 +221,7 @@ pub async fn publish_post( ) .await?; Attention::init(&user.namehash, &rconn).add(p.id).await?; - p.refresh_cache(&rconn, true).await; + p.refresh_cache(true).await; if !poi.poll_options.is_empty() { PollOption::init(p.id, &rconn) @@ -238,18 +232,18 @@ pub async fn publish_post( } #[post("/editcw", data = "")] -pub async fn edit_cw(cwi: Form, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonApi { - let mut p = Post::get(&db, &rconn, cwi.pid).await?; +pub async fn edit_cw(cwi: Form, user: CurrentUser, db: Db) -> JsonApi { + let mut p = Post::get(&db, cwi.pid).await?; p.check_permission(&user, "w")?; update!(p, posts, &db, { cw, to cwi.cw.to_string() }); - p.refresh_cache(&rconn, false).await; + p.refresh_cache(false).await; code0!() } #[get("/getmulti?")] pub async fn get_multi(pids: Vec, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonApi { user.id.ok_or(YouAreTmp)?; - let ps: Vec = Post::get_multi(&db, &rconn, &pids) + let ps: Vec = Post::get_multi(&db, &pids) .await? .into_iter() .filter(|post| { diff --git a/src/api/reaction.rs b/src/api/reaction.rs index 325ef2f..32f27b3 100644 --- a/src/api/reaction.rs +++ b/src/api/reaction.rs @@ -22,7 +22,7 @@ pub async fn reaction( ) -> JsonApi { user.id.ok_or(YouAreTmp)?; - let mut p = Post::get(&db, &rconn, pid).await?; + let mut p = Post::get(&db, pid).await?; p.check_permission(&user, "r")?; let mut r_up = Reaction::init(pid, 1, &rconn); let mut r_down = Reaction::init(pid, -1, &rconn); @@ -51,7 +51,7 @@ pub async fn reaction( { down_votes, add delta_down } ); - p.refresh_cache(&rconn, false).await; + p.refresh_cache(false).await; } Ok(json!({ diff --git a/src/api/search.rs b/src/api/search.rs index 151e469..11c9913 100644 --- a/src/api/search.rs +++ b/src/api/search.rs @@ -25,7 +25,6 @@ pub async fn search( } else { Post::search( &db, - &rconn, room_id, search_mode, keywords.to_string(), diff --git a/src/api/systemlog.rs b/src/api/systemlog.rs index b6dee4c..446c48c 100644 --- a/src/api/systemlog.rs +++ b/src/api/systemlog.rs @@ -12,7 +12,7 @@ pub async fn get_systemlog( user: CurrentUser, rh: &State, db: Db, - mut rconn: RdsConn, + rconn: RdsConn, ) -> JsonApi { let logs = Systemlog::get_list(&rconn, 50).await?; @@ -20,7 +20,7 @@ pub async fn get_systemlog( "tmp_token": rh.get_tmp_token(), "salt": look!(rh.salt), "start_time": rh.start_time.timestamp(), - "user_count": cached_user_count(&db, &mut rconn).await?, + "user_count": cached_user_count(&db).await?, "custom_title": user.custom_title, "admin_list": get_admin_list(&rconn).await?, "candidate_list": get_candidate_list(&rconn).await?, diff --git a/src/cache.rs b/src/cache.rs index fe37018..36d5982 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -2,213 +2,163 @@ use crate::api::{Api, CurrentUser}; use crate::db_conn::Db; use crate::models::{Comment, Post, User}; use crate::rds_conn::RdsConn; -use crate::rds_models::{clear_all, init, BlockedUsers}; +use crate::rds_models::BlockedUsers; +use diesel::result::{Error as DieselError, QueryResult}; +use moka::future::Cache; use rand::Rng; -use redis::{AsyncCommands, RedisError, RedisResult}; -use rocket::serde::json::serde_json; -// can use rocket::serde::json::to_string in master version -use futures_util::stream::StreamExt; +use redis::RedisResult; use rocket::futures::future; +use rocket::tokio::sync::RwLock; use std::collections::HashMap; +use std::future::Future; +use std::io; +use std::sync::Arc; +use std::sync::OnceLock; +use std::time::Duration; -const KEY_USER_COUNT: &str = "hole_v2:cache:user_count"; -const USER_COUNT_EXPIRE_TIME: u64 = 5 * 60; - +const USER_COUNT_EXPIRE_TIME: u64 = 60; const INSTANCE_EXPIRE_TIME: u64 = 60 * 60; -const MIN_LENGTH: isize = 200; -const MAX_LENGTH: isize = 900; -const CUT_LENGTH: isize = 100; - -macro_rules! post_cache_key { - ($id: expr) => { - format!("hole_v2:cache:post:{}:v2", $id) - }; +// Global cache getters using OnceLock +fn post_cache() -> &'static Cache { + static CACHE: OnceLock> = OnceLock::new(); + CACHE.get_or_init(|| Cache::builder().max_capacity(10_000).build()) } -pub struct PostCache { - rconn: RdsConn, +fn post_comment_cache() -> &'static Cache> { + static CACHE: OnceLock>> = OnceLock::new(); + CACHE.get_or_init(|| { + Cache::builder() + .time_to_idle(Duration::from_secs(INSTANCE_EXPIRE_TIME)) + .build() + }) } +// Each list in post_list_cache, keyed by room_id and mode, is a sorted list. The element is a pair of numbers. The first one is the weight used to sort, the second one is the post id. +fn post_list_cache() -> &'static Cache>>> { + static CACHE: OnceLock>>>> = OnceLock::new(); + CACHE.get_or_init(|| Cache::builder().build()) +} + +fn user_cache() -> &'static Cache { + static CACHE: OnceLock> = OnceLock::new(); + CACHE.get_or_init(|| { + Cache::builder() + .time_to_idle(Duration::from_secs(INSTANCE_EXPIRE_TIME)) + .build() + }) +} + +fn block_dict_cache() -> &'static Cache>>> { + static CACHE: OnceLock>>>> = OnceLock::new(); + CACHE.get_or_init(|| { + Cache::builder() + .time_to_idle(Duration::from_secs(INSTANCE_EXPIRE_TIME)) + .build() + }) +} + +fn user_count_cache() -> &'static Cache { + static CACHE: OnceLock> = OnceLock::new(); + CACHE.get_or_init(|| { + Cache::builder() + .time_to_live(Duration::from_secs(USER_COUNT_EXPIRE_TIME)) + .build() + }) +} + +fn map_shared_diesel_error(err: Arc) -> DieselError { + match err.as_ref() { + DieselError::NotFound => DieselError::NotFound, + DieselError::RollbackTransaction => DieselError::RollbackTransaction, + DieselError::AlreadyInTransaction => DieselError::AlreadyInTransaction, + DieselError::NotInTransaction => DieselError::NotInTransaction, + DieselError::BrokenTransactionManager => DieselError::BrokenTransactionManager, + _ => DieselError::QueryBuilderError(Box::new(io::Error::other(err.to_string()))), + } +} + +pub struct PostCache; + impl PostCache { - init!(); - - clear_all!("hole_v2:cache::post:*:v2"); - - pub async fn sets(&mut self, ps: &[&Post]) { + pub async fn sets(ps: &[&Post]) { if ps.is_empty() { return; } - let kvs: Vec<(String, String)> = ps - .iter() - .map(|p| (post_cache_key!(p.id), serde_json::to_string(p).unwrap())) - .collect(); - self.rconn.mset(&kvs).await.unwrap_or_else(|e| { - warn!("set post cache failed: {}", e); - dbg!(&kvs); - }); - } - - pub async fn get(&mut self, pid: &i32) -> Option { - let key = post_cache_key!(pid); - let rds_result: Option = self - .rconn - .get::>(key) - .await - .unwrap_or_else(|e| { - warn!("try to get post cache, connect rds failed, {}", e); - None - }); - - rds_result.and_then(|s| { - serde_json::from_str(&s).unwrap_or_else(|e| { - warn!("get post cache, decode failed {}, {}", e, s); - None - }) - }) - } - - pub async fn gets(&mut self, pids: &[i32]) -> Vec> { - // 长度为1时会走GET而非MGET,返回值格式不兼容。愚蠢的设计。 - match pids.len() { - 0 => vec![], - 1 => vec![self.get(&pids[0]).await], - _ => { - let ks: Vec = pids.iter().map(|pid| post_cache_key!(pid)).collect(); - // dbg!(&ks); - // Vec is single arg, while &Vec is not. Seems a bug. - let rds_result: Vec> = self - .rconn - .get::, Vec>>(ks) - .await - .unwrap_or_else(|e| { - warn!("try to get posts cache, connect rds failed, {}", e); - vec![None; pids.len()] - }); - // dbg!(&rds_result); - - // 定期热度衰减的时候会清空缓存,这里设不设置过期时间影响不大 - - rds_result - .into_iter() - .map(|x| { - // dbg!(&x); - x.and_then(|s| { - serde_json::from_str(&s).unwrap_or_else(|e| { - warn!("get post cache, decode failed {}, {}", e, s); - None - }) - }) - }) - .collect() - } + for p in ps { + post_cache().insert(p.id, (*p).clone()).await; } } + + pub async fn get(pid: &i32) -> Option { + post_cache().get(pid).await + } + + pub async fn get_with(pid: i32, init: F) -> QueryResult + where + F: Future>, + { + post_cache() + .try_get_with(pid, init) + .await + .map_err(map_shared_diesel_error) + } + + pub async fn gets(pids: &[i32]) -> Vec> { + future::join_all(pids.iter().map(Self::get)).await + } + + pub async fn clear_all() { + post_cache().invalidate_all(); + } } pub struct PostCommentCache { key: String, - rconn: RdsConn, } impl PostCommentCache { - init!(i32, "hole_v2:cache:post_comments:{}"); - - pub async fn set(&mut self, cs: &[Comment]) { - self.rconn - .set_ex( - &self.key, - serde_json::to_string(cs).unwrap(), - INSTANCE_EXPIRE_TIME, - ) - .await - .unwrap_or_else(|e| { - warn!("set comments cache failed: {}", e); - dbg!(cs); - }) - } - - pub async fn get(&mut self) -> Option> { - let rds_result = self.rconn.get::<&String, String>(&self.key).await; - // dbg!(&rds_result); - if let Ok(s) = rds_result { - self.rconn - .expire::<&String, bool>(&self.key, INSTANCE_EXPIRE_TIME as i64) - .await - .unwrap_or_else(|e| { - warn!( - "get comments cache, set new expire failed: {}, {}, {} ", - e, &self.key, &s - ); - false - }); - serde_json::from_str(&s).unwrap_or_else(|e| { - warn!("get comments cache, decode failed {}, {}", e, s); - None - }) - } else { - None + pub fn init(post_id: i32) -> Self { + Self { + key: format!("hole_v2:cache:post_comments:{}", post_id), } } + pub async fn get_with(&self, init: F) -> QueryResult> + where + F: Future>>, + { + post_comment_cache() + .try_get_with(self.key.clone(), init) + .await + .map_err(map_shared_diesel_error) + } + pub async fn clear(&mut self) { - self.rconn.del(&self.key).await.unwrap_or_else(|e| { - warn!("clear commenrs cache fail, {}", e); - }); + post_comment_cache().invalidate(&self.key).await; } } pub struct PostListCache { key: String, mode: u8, - rconn: RdsConn, - length: isize, } impl PostListCache { - pub fn init(room_id: Option, mode: u8, rconn: &RdsConn) -> Self { + pub const MAX_LENGTH: usize = 900; + // pub const MIN_LENGTH: usize = 200; + pub const CUT_LENGTH: usize = 100; + pub fn init(room_id: Option, mode: u8) -> Self { Self { key: format!( "hole_v2:cache:post_list:{}:{}", - match room_id { - Some(i) => i.to_string(), - None => "".to_owned(), - }, + room_id.map_or_else(String::new, |i| i.to_string()), &mode ), mode, - rconn: rconn.clone(), - length: 0, } } - async fn set_and_check_length(&mut self) { - let mut l = self.rconn.zcard(&self.key).await.unwrap(); - if l > MAX_LENGTH { - self.rconn - .zremrangebyrank::<&String, ()>(&self.key, MAX_LENGTH - CUT_LENGTH, -1) - .await - .unwrap_or_else(|e| { - warn!("cut list cache failed, {}, {}", e, &self.key); - }); - l = MIN_LENGTH; - } - self.length = l; - } - - pub async fn need_fill(&mut self) -> bool { - self.set_and_check_length().await; - self.length < MIN_LENGTH - } - - pub fn i64_len(&self) -> i64 { - self.length.try_into().unwrap() - } - - pub fn i64_minlen(&self) -> i64 { - MIN_LENGTH.try_into().unwrap() - } - fn p2pair(&self, p: &Post) -> (i64, i32) { ( match self.mode { @@ -223,163 +173,155 @@ impl PostListCache { ) } - pub async fn fill(&mut self, ps: &[Post]) { - let items: Vec<(i64, i32)> = ps.iter().map(|p| self.p2pair(p)).collect(); - self.rconn - .zadd_multiple(&self.key, &items) + pub async fn fill_with(&mut self, query_posts: F) -> QueryResult + where + F: Future>>, + { + let list_ref = post_list_cache() + .try_get_with(self.key.clone(), async { + let mut items: Vec<(i64, i32)> = + query_posts.await?.iter().map(|p| self.p2pair(&p)).collect(); + items.sort_by(|a, b| a.0.cmp(&b.0)); + Ok(Arc::new(RwLock::new(items))) + }) .await - .unwrap_or_else(|e| { - warn!("fill list cache failed, {} {}", e, &self.key); - }); + .map_err(map_shared_diesel_error)?; + let list = list_ref.read().await; - self.set_and_check_length().await; - } + // Double-Checked Locking + if list.len() <= Self::MAX_LENGTH { + return Ok(list.len()); + } + drop(list); + let mut list = list_ref.write().await; - pub async fn put(&mut self, p: &Post) { - // 其他都是加到最前面的,但热榜不是。可能导致MIN_LENGTH到MAX_LENGTH之间的数据不可靠 - // 影响不大,先不管了 - if p.is_deleted || (self.mode > 0 && p.is_reported) { - self.rconn.zrem(&self.key, p.id).await.unwrap_or_else(|e| { - warn!( - "remove from list cache failed, {} {} {}", - e, &self.key, p.id - ); - }); + if list.len() <= Self::MAX_LENGTH { + Ok(list.len()) } else { - let (s, m) = self.p2pair(p); - self.rconn.zadd(&self.key, m, s).await.unwrap_or_else(|e| { - warn!( - "put into list cache failed, {} {} {} {}", - e, &self.key, m, s - ); - }); + list.truncate(Self::MAX_LENGTH - Self::CUT_LENGTH); + Ok(list.len()) } } - pub async fn get_pids(&mut self, start: i64, limit: i64) -> Vec { - self.rconn - .zrange( - &self.key, - start.try_into().unwrap(), - (start + limit - 1).try_into().unwrap(), - ) - .await - .unwrap() + pub async fn put(&mut self, p: &Post) { + // Don't put is there is no cache. Let fill_with handle it. + if let Some(list_ref) = post_list_cache().get(&self.key).await { + let mut list = list_ref.write().await; + // Remove any existing entry for this post_id + if let Some(pos) = list.iter().position(|(_, pid)| *pid == p.id) { + list.remove(pos); + } + if p.is_deleted || (self.mode > 0 && p.is_reported) { + return; + } + list.push(self.p2pair(p)); + list.sort_by(|a, b| a.0.cmp(&b.0)); + } + } + + pub async fn get_pids(&mut self, start: usize, limit: usize) -> Vec { + if let Some(list_ref) = post_list_cache().get(&self.key).await { + let list = list_ref.read().await; + list.iter() + .skip(start) + .take(limit) + .map(|(_, pid)| *pid) + .collect() + } else { + vec![] + } } pub async fn clear(&mut self) { - self.rconn.del(&self.key).await.unwrap_or_else(|e| { - warn!("clear post list cache failed, {}", e); - }); + post_list_cache().invalidate(&self.key).await; } } pub struct UserCache { key: String, - rconn: RdsConn, } impl UserCache { - init!(&str, "hole_v2:cache:user:{}"); - - clear_all!("hole_v2:cache:user:*"); - - pub async fn set(&mut self, u: &User) { - self.rconn - .set_ex( - &self.key, - serde_json::to_string(u).unwrap(), - INSTANCE_EXPIRE_TIME, - ) - .await - .unwrap_or_else(|e| { - warn!("set user cache failed: {}", e); - dbg!(u); - }) + pub fn init(user_id: &str) -> Self { + Self { + key: format!("hole_v2:cache:user:{}", user_id), + } } - pub async fn get(&mut self) -> Option { - let rds_result = self.rconn.get::<&String, String>(&self.key).await; - if let Ok(s) = rds_result { - self.rconn - .expire::<&String, bool>(&self.key, INSTANCE_EXPIRE_TIME as i64) - .await - .unwrap_or_else(|e| { - warn!( - "get user cache, set new expire failed: {}, {}, {} ", - e, &self.key, &s - ); - false - }); - serde_json::from_str(&s).unwrap_or_else(|e| { - warn!("get user cache, decode failed {}, {}", e, s); - None - }) - } else { - None - } + // No need to use get_with for User. Just check and set separately. + pub async fn set(&self, u: &User) { + user_cache().insert(self.key.clone(), u.clone()).await; + } + + pub async fn get(&self) -> Option { + user_cache().get(&self.key).await + } + + pub async fn clear_all() { + user_cache().invalidate_all(); } } pub struct BlockDictCache { key: String, - rconn: RdsConn, } impl BlockDictCache { - // namehash, pid - init!(&str, i32, "hole_v2:cache:block_dict:{}:{}"); + pub fn init(namehash: &str, post_id: i32) -> Self { + Self { + key: format!("hole_v2:cache:block_dict:{}:{}", namehash, post_id), + } + } pub async fn get_or_create( &mut self, user: &CurrentUser, hash_list: &[&String], + rconn: &RdsConn, ) -> RedisResult> { - let mut block_dict = self - .rconn - .hgetall::<&String, HashMap>(&self.key) - .await?; + let dict_ref = block_dict_cache() + .get_with(self.key.clone(), async move { + Arc::new(RwLock::new(HashMap::new())) + }) + .await; - //dbg!(&self.key, &block_dict); - - let missing: Vec<(String, bool)> = - future::try_join_all(hash_list.iter().filter_map(|hash| { - (!block_dict.contains_key(&hash.to_string())).then_some(async { - Ok::<(String, bool), RedisError>(( - hash.to_string(), - BlockedUsers::check_if_block(&self.rconn, user, hash).await?, - )) - }) - })) - .await?; - - if !missing.is_empty() { - self.rconn.hset_multiple(&self.key, &missing).await?; - self.rconn - .expire(&self.key, INSTANCE_EXPIRE_TIME as i64) - .await?; - block_dict.extend(missing.into_iter()); + // Find missing hashes + let mut missing_keys: Vec = Vec::new(); + { + let block_dict = dict_ref.read().await; + for hash in hash_list { + if !block_dict.contains_key(hash.as_str()) { + missing_keys.push((*hash).clone()); + } + } } - //dbg!(&block_dict); + if !missing_keys.is_empty() { + let mut missing: Vec<(String, bool)> = Vec::with_capacity(missing_keys.len()); + for hash in missing_keys { + let is_blocked = BlockedUsers::check_if_block(rconn, user, &hash).await?; + missing.push((hash, is_blocked)); + } - Ok(block_dict) + let mut block_dict = dict_ref.write().await; + for (hash, is_blocked) in missing { + block_dict.entry(hash).or_insert(is_blocked); + } + } + + let out = dict_ref.read().await.clone(); + Ok(out) } - pub async fn clear(&mut self) -> RedisResult<()> { - self.rconn.del(&self.key).await + pub async fn clear(&mut self) { + block_dict_cache().invalidate(&self.key).await; } } -pub async fn cached_user_count(db: &Db, rconn: &mut RdsConn) -> Api { - let cnt: Option = rconn.get(KEY_USER_COUNT).await?; - if let Some(x) = cnt { - Ok(x) - } else { - let x = User::get_count(db).await?; - rconn - .set_ex(KEY_USER_COUNT, x, USER_COUNT_EXPIRE_TIME) - .await?; - Ok(x) - } +pub async fn cached_user_count(db: &Db) -> Api { + let key = "hole_v2:cache:user_count"; + Ok(user_count_cache() + .try_get_with(key.to_string(), async { User::get_count(db).await }) + .await + .map_err(map_shared_diesel_error)?) } diff --git a/src/main.rs b/src/main.rs index 3965997..d53592c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -49,20 +49,19 @@ async fn main() { let rmc = init_rds_client().await; let mut rconn = RdsConn(rmc.clone()); let mut c_start = establish_connection(); - models::User::clear_non_admin_users(&mut c_start, &mut rconn).await; + models::User::clear_non_admin_users(&mut c_start).await; clear_outdate_redis_data(&mut rconn).await; tokio::spawn(async move { loop { sleep(Duration::from_secs(3 * 60 * 60)).await; - models::Post::annealing(&mut c_start, &mut rconn).await; + models::Post::annealing(&mut c_start).await; } }); - let rconn = RdsConn(rmc.clone()); tokio::spawn(async move { loop { for room_id in (0..5).map(Some).chain([None, Some(42)]) { - cache::PostListCache::init(room_id, 3, &rconn).clear().await; + cache::PostListCache::init(room_id, 3).clear().await; } sleep(Duration::from_secs(5 * 60)).await; } diff --git a/src/models.rs b/src/models.rs index 2a406a0..05da0af 100644 --- a/src/models.rs +++ b/src/models.rs @@ -3,7 +3,6 @@ use crate::cache::*; use crate::db_conn::{Conn, Db}; use crate::random_hasher::random_string; -use crate::rds_conn::RdsConn; use crate::schema::*; use chrono::{offset::Utc, DateTime}; use diesel::sql_types::*; @@ -93,7 +92,7 @@ macro_rules! with_log { }; } -#[derive(Queryable, Insertable, Serialize, Deserialize, Debug)] +#[derive(Queryable, Insertable, Serialize, Deserialize, Debug, Clone)] #[serde(crate = "rocket::serde")] pub struct Comment { pub id: i32, @@ -107,7 +106,7 @@ pub struct Comment { pub post_id: i32, } -#[derive(Queryable, Insertable, Serialize, Deserialize, Debug)] +#[derive(Queryable, Insertable, Serialize, Deserialize, Debug, Clone)] #[serde(crate = "rocket::serde")] pub struct Post { pub id: i32, @@ -129,7 +128,7 @@ pub struct Post { pub down_votes: i32, } -#[derive(Queryable, Insertable, Serialize, Deserialize, Debug)] +#[derive(Queryable, Insertable, Serialize, Deserialize, Debug, Clone)] #[serde(crate = "rocket::serde")] pub struct User { pub id: i32, @@ -156,9 +155,8 @@ impl Post { _get_multi!(posts); - pub async fn get_multi(db: &Db, rconn: &RdsConn, ids: &[i32]) -> QueryResult> { - let mut cacher = PostCache::init(rconn); - let mut cached_posts = cacher.gets(ids).await; + pub async fn get_multi(db: &Db, ids: &[i32]) -> QueryResult> { + let mut cached_posts = PostCache::gets(ids).await; let mut id2po = HashMap::>::new(); // dbg!(&cached_posts); @@ -180,7 +178,7 @@ impl Post { let missing_ps = Self::_get_multi(db, missing_ids).await?; // dbg!(&missing_ps); - cacher.sets(&missing_ps.iter().collect::>()).await; + PostCache::sets(&missing_ps.iter().collect::>()).await; for p in missing_ps.into_iter() { if let Some(op) = id2po.get_mut(&p.id) { @@ -194,55 +192,52 @@ impl Post { .collect()) } - pub async fn get(db: &Db, rconn: &RdsConn, id: i32) -> QueryResult { + pub async fn get(db: &Db, id: i32) -> QueryResult { // 注意即使is_deleted也应该缓存和返回 - let mut cacher = PostCache::init(rconn); - if let Some(p) = cacher.get(&id).await { - Ok(p) - } else { - let p = Self::_get(db, id).await?; - cacher.sets(&[&p]).await; - Ok(p) - } + PostCache::get_with(id, async move { Self::_get(db, id).await }).await } - pub async fn get_comments(&self, db: &Db, rconn: &RdsConn) -> QueryResult> { - let mut cacher = PostCommentCache::init(self.id, rconn); - if let Some(cs) = cacher.get().await { - Ok(cs) - } else { - let cs = Comment::gets_by_post_id(db, self.id).await?; - cacher.set(&cs).await; - Ok(cs) - } + pub async fn get_comments(&self, db: &Db) -> QueryResult> { + let cacher = PostCommentCache::init(self.id); + cacher + .get_with(async move { Comment::gets_by_post_id(db, self.id).await }) + .await } - pub async fn clear_comments_cache(&self, rconn: &RdsConn) { - PostCommentCache::init(self.id, rconn).clear().await; + pub async fn clear_comments_cache(&self) { + PostCommentCache::init(self.id).clear().await; } pub async fn gets_by_page( db: &Db, - rconn: &RdsConn, room_id: Option, order_mode: u8, - start: i64, - limit: i64, + start: usize, + limit: usize, ) -> QueryResult> { - let mut cacher = PostListCache::init(room_id, order_mode, rconn); - if cacher.need_fill().await { - let pids = - Self::_get_ids_by_page(db, room_id, order_mode, 0, cacher.i64_minlen()).await?; - let ps = Self::get_multi(db, rconn, &pids).await?; - cacher.fill(&ps).await; - } - let pids = if start + limit > cacher.i64_len() { - Self::_get_ids_by_page(db, room_id, order_mode, start, limit).await? + let mut cacher = PostListCache::init(room_id, order_mode); + + let current_len = cacher + .fill_with(async move { + let pids = Self::_get_ids_by_page( + db, + room_id, + order_mode, + 0, + PostListCache::MAX_LENGTH as i64, + ) + .await?; + Self::get_multi(db, &pids).await + }) + .await?; + + let pids = if start + limit > current_len { + Self::_get_ids_by_page(db, room_id, order_mode, start as i64, limit as i64).await? } else { cacher.get_pids(start, limit).await }; - Self::get_multi(db, rconn, &pids).await + Self::get_multi(db, &pids).await } async fn _get_ids_by_page( db: &Db, @@ -281,7 +276,6 @@ impl Post { pub async fn search( db: &Db, - rconn: &RdsConn, room_id: Option, search_mode: u8, search_text: String, @@ -339,7 +333,7 @@ impl Post { .load(with_log!(c)) }) .await?; - Self::get_multi(db, rconn, &pids).await + Self::get_multi(db, &pids).await } pub async fn create(db: &Db, new_post: NewPost) -> QueryResult { @@ -351,18 +345,16 @@ impl Post { .await } - pub async fn set_instance_cache(&self, rconn: &RdsConn) { - PostCache::init(rconn).sets(&[self]).await; + pub async fn set_instance_cache(&self) { + PostCache::sets(&[self]).await; } - pub async fn refresh_cache(&self, rconn: &RdsConn, is_new: bool) { + pub async fn refresh_cache(&self, is_new: bool) { join!( - self.set_instance_cache(rconn), + self.set_instance_cache(), future::join_all((if is_new { [0, 2, 3, 4] } else { [1, 2, 3, 4] }).map( |mode| async move { - PostListCache::init(None, mode, &rconn.clone()) - .put(self) - .await; - PostListCache::init(Some(self.room_id), mode, &rconn.clone()) + PostListCache::init(None, mode).put(self).await; + PostListCache::init(Some(self.room_id), mode) .put(self) .await; } @@ -370,16 +362,16 @@ impl Post { ); } - pub async fn annealing(c: &mut Conn, rconn: &mut RdsConn) { + pub async fn annealing(c: &mut Conn) { info!("Time for annealing!"); diesel::update(posts::table.filter(posts::hot_score.gt(10))) .set(posts::hot_score.eq(floor(float4(posts::hot_score) * 0.9))) .execute(with_log!(c)) .unwrap(); - PostCache::clear_all(rconn).await; + PostCache::clear_all().await; for room_id in (0..5).map(Some).chain([None, Some(42)]) { - PostListCache::init(room_id, 2, rconn).clear().await; + PostListCache::init(room_id, 2).clear().await; } } } @@ -396,7 +388,11 @@ impl User { .ok() } - pub async fn get_by_token(db: &Db, rconn: &RdsConn, token: &str) -> Option { + pub async fn get_by_token(db: &Db, token: &str) -> Option { + let mut cacher = UserCache::init(token); + if let Some(u) = cacher.get().await { + return Some(u); + } let real_token; let token = match &token.split(':').collect::>()[..] { @@ -410,7 +406,7 @@ impl User { _ => token, }; // dbg!(token); - let mut cacher = UserCache::init(token, rconn); + let mut cacher = UserCache::init(token); if let Some(u) = cacher.get().await { Some(u) } else { @@ -452,11 +448,11 @@ impl User { .await } - pub async fn clear_non_admin_users(c: &mut Conn, rconn: &mut RdsConn) { + pub async fn clear_non_admin_users(c: &mut Conn) { diesel::delete(users::table.filter(users::is_admin.eq(false))) .execute(c) .unwrap(); - UserCache::clear_all(rconn).await; + UserCache::clear_all().await; } pub async fn get_count(db: &Db) -> QueryResult {