cache in memory

This commit is contained in:
2026-03-23 22:14:01 +08:00
parent fae30ed97a
commit 8e386d98d0
13 changed files with 318 additions and 392 deletions

View File

@@ -29,3 +29,4 @@ futures-util = "0.3.24"
lru = "0.11" lru = "0.11"
reqwest = { version = "0.11.10", features = ["json"], optional = true } reqwest = { version = "0.11.10", features = ["json"], optional = true }
moka = { version = "0.12.15", features = ["future"] }

View File

@@ -78,7 +78,7 @@ clone 代码 (略)
安装postgresql (略) 安装postgresql (略)
安装redis (略) 安装redis/valkey (略)
#### 准备数据库 #### 准备数据库

View File

@@ -31,7 +31,7 @@ pub async fn attention_post(
// 临时用户不允许手动关注 // 临时用户不允许手动关注
user.id.ok_or(YouAreTmp)?; user.id.ok_or(YouAreTmp)?;
let mut p = Post::get(&db, &rconn, ai.pid).await?; let mut p = Post::get(&db, ai.pid).await?;
p.check_permission(&user, "r")?; p.check_permission(&user, "r")?;
let mut att = Attention::init(&user.namehash, &rconn); let mut att = Attention::init(&user.namehash, &rconn);
let switch_to = ai.switch == 1; let switch_to = ai.switch == 1;
@@ -59,7 +59,7 @@ pub async fn attention_post(
if switch_to && user.is_admin { if switch_to && user.is_admin {
update!(p, posts, &db, { is_reported, to false }); update!(p, posts, &db, { is_reported, to false });
} }
p.refresh_cache(&rconn, false).await; p.refresh_cache(false).await;
} }
Ok(json!({ Ok(json!({
@@ -75,7 +75,7 @@ pub async fn attention_post(
pub async fn get_attention(user: CurrentUser, db: Db, rconn: RdsConn) -> JsonApi { pub async fn get_attention(user: CurrentUser, db: Db, rconn: RdsConn) -> JsonApi {
let mut ids = Attention::init(&user.namehash, &rconn).all().await?; let mut ids = Attention::init(&user.namehash, &rconn).all().await?;
ids.sort_by_key(|x| -x); ids.sort_by_key(|x| -x);
let ps: Vec<Post> = Post::get_multi(&db, &rconn, &ids) let ps: Vec<Post> = Post::get_multi(&db, &ids)
.await? .await?
.into_iter() .into_iter()
.filter(|post| { .filter(|post| {

View File

@@ -88,14 +88,14 @@ pub async fn c2output<'r>(
#[get("/getcomment?<pid>")] #[get("/getcomment?<pid>")]
pub async fn get_comment(pid: i32, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonApi { pub async fn get_comment(pid: i32, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonApi {
let p = Post::get(&db, &rconn, pid).await?; let p = Post::get(&db, pid).await?;
if p.is_deleted { if p.is_deleted {
return Err(ApiError::Pc(IsDeleted)); return Err(ApiError::Pc(IsDeleted));
} }
let cs = p.get_comments(&db, &rconn).await?; let cs = p.get_comments(&db).await?;
let hash_list = cs.iter().map(|c| &c.author_hash).collect::<Vec<_>>(); let hash_list = cs.iter().map(|c| &c.author_hash).collect::<Vec<_>>();
let cached_block_dict = BlockDictCache::init(&user.namehash, p.id, &rconn) let cached_block_dict = BlockDictCache::init(&user.namehash, p.id)
.get_or_create(&user, &hash_list) .get_or_create(&user, &hash_list, &rconn)
.await?; .await?;
let data = c2output(&p, &cs, &user, &cached_block_dict).await; let data = c2output(&p, &cs, &user, &cached_block_dict).await;
@@ -117,7 +117,7 @@ pub async fn add_comment(
db: Db, db: Db,
rconn: RdsConn, rconn: RdsConn,
) -> JsonApi { ) -> JsonApi {
let mut p = Post::get(&db, &rconn, pid).await?; let mut p = Post::get(&db, pid).await?;
if p.author_hash != user.namehash { if p.author_hash != user.namehash {
user.id.ok_or(YouAreTmp)?; user.id.ok_or(YouAreTmp)?;
} }
@@ -160,10 +160,7 @@ pub async fn add_comment(
{ hot_score, add hs_delta } { hot_score, add hs_delta }
); );
join!( join!(p.refresh_cache(false), p.clear_comments_cache(),);
p.refresh_cache(&rconn, false),
p.clear_comments_cache(&rconn),
);
Ok(json!({ Ok(json!({
"code": 0 "code": 0

View File

@@ -101,7 +101,7 @@ impl<'r> FromRequest<'r> for CurrentUser {
Some(CurrentUser::from_hash(&rconn, rh.hash_with_salt(sp[1])).await) Some(CurrentUser::from_hash(&rconn, rh.hash_with_salt(sp[1])).await)
} else { } else {
let db = try_outcome!(request.guard::<Db>().await); let db = try_outcome!(request.guard::<Db>().await);
if let Some(u) = User::get_by_token(&db, &rconn, token).await { if let Some(u) = User::get_by_token(&db, token).await {
let namehash = rh.hash_with_salt(&u.name); let namehash = rh.hash_with_salt(&u.name);
let user_base = CurrentUser::from_hash(&rconn, namehash).await; let user_base = CurrentUser::from_hash(&rconn, namehash).await;
Some(CurrentUser { Some(CurrentUser {

View File

@@ -22,7 +22,7 @@ pub async fn delete(di: Form<DeleteInput>, user: CurrentUser, db: Db, rconn: Rds
"cid" => { "cid" => {
let mut c = Comment::get(&db, di.id).await?; let mut c = Comment::get(&db, di.id).await?;
c.soft_delete(&user, &db).await?; c.soft_delete(&user, &db).await?;
let mut p = Post::get(&db, &rconn, c.post_id).await?; let mut p = Post::get(&db, c.post_id).await?;
update!( update!(
p, p,
posts, posts,
@@ -31,13 +31,13 @@ pub async fn delete(di: Form<DeleteInput>, user: CurrentUser, db: Db, rconn: Rds
{ hot_score, add -1 } { hot_score, add -1 }
); );
p.refresh_cache(&rconn, false).await; p.refresh_cache(false).await;
p.clear_comments_cache(&rconn).await; p.clear_comments_cache().await;
(c.author_hash.clone(), p) (c.author_hash.clone(), p)
} }
"pid" => { "pid" => {
let mut p = Post::get(&db, &rconn, di.id).await?; let mut p = Post::get(&db, di.id).await?;
// 有评论:清空主楼而非删除 // 有评论:清空主楼而非删除
if p.author_hash == user.namehash && p.n_comments > 0 { if p.author_hash == user.namehash && p.n_comments > 0 {
@@ -52,7 +52,7 @@ pub async fn delete(di: Form<DeleteInput>, user: CurrentUser, db: Db, rconn: Rds
} }
// 如果是删除需要也从0号缓存队列中去掉 // 如果是删除需要也从0号缓存队列中去掉
p.refresh_cache(&rconn, true).await; p.refresh_cache(true).await;
(p.author_hash.clone(), p) (p.author_hash.clone(), p)
} }
@@ -110,10 +110,10 @@ pub async fn report(ri: Form<ReportInput>, user: CurrentUser, db: Db, rconn: Rds
(!ri.reason.is_empty()).then_some(()).ok_or(NoReason)?; (!ri.reason.is_empty()).then_some(()).ok_or(NoReason)?;
let mut p = Post::get(&db, &rconn, ri.pid).await?; let mut p = Post::get(&db, ri.pid).await?;
if ri.should_hide.is_some() { if ri.should_hide.is_some() {
update!(p, posts, &db, { is_reported, to true }); update!(p, posts, &db, { is_reported, to true });
p.refresh_cache(&rconn, false).await; p.refresh_cache(false).await;
} }
Systemlog { Systemlog {
@@ -142,7 +142,7 @@ pub async fn report(ri: Form<ReportInput>, user: CurrentUser, db: Db, rconn: Rds
) )
.await?; .await?;
Attention::init(&user.namehash, &rconn).add(p.id).await?; Attention::init(&user.namehash, &rconn).add(p.id).await?;
p.refresh_cache(&rconn, true).await; p.refresh_cache(true).await;
code0!() code0!()
} }
@@ -163,7 +163,7 @@ pub async fn block(bi: Form<BlockInput>, user: CurrentUser, db: Db, rconn: RdsCo
let pid; let pid;
let nh_to_block = match bi.content_type.as_str() { let nh_to_block = match bi.content_type.as_str() {
"post" => { "post" => {
let p = Post::get(&db, &rconn, bi.id).await?; let p = Post::get(&db, bi.id).await?;
pid = p.id; pid = p.id;
p.author_hash p.author_hash
} }
@@ -187,9 +187,7 @@ pub async fn block(bi: Form<BlockInput>, user: CurrentUser, db: Db, rconn: RdsCo
.unwrap_or_default() .unwrap_or_default()
}; };
BlockDictCache::init(&user.namehash, pid, &rconn) BlockDictCache::init(&user.namehash, pid).clear().await;
.clear()
.await?;
Ok(json!({ Ok(json!({
"code": 0, "code": 0,

View File

@@ -67,7 +67,7 @@ pub struct CwInput {
async fn p2output(p: &Post, user: &CurrentUser, db: &Db, rconn: &RdsConn) -> Api<PostOutput> { async fn p2output(p: &Post, user: &CurrentUser, db: &Db, rconn: &RdsConn) -> Api<PostOutput> {
let comments: Option<Vec<Comment>> = if p.n_comments < 5 { let comments: Option<Vec<Comment>> = if p.n_comments < 5 {
Some(p.get_comments(db, rconn).await?) Some(p.get_comments(db).await?)
} else { } else {
None None
}; };
@@ -78,8 +78,8 @@ async fn p2output(p: &Post, user: &CurrentUser, db: &Db, rconn: &RdsConn) -> Api
.chain(std::iter::once(&p.author_hash)) .chain(std::iter::once(&p.author_hash))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
//dbg!(&hash_list); //dbg!(&hash_list);
let cached_block_dict = BlockDictCache::init(&user.namehash, p.id, rconn) let cached_block_dict = BlockDictCache::init(&user.namehash, p.id)
.get_or_create(user, &hash_list) .get_or_create(user, &hash_list, rconn)
.await?; .await?;
let is_blocked = cached_block_dict[&p.author_hash]; let is_blocked = cached_block_dict[&p.author_hash];
let can_view = let can_view =
@@ -144,7 +144,7 @@ pub async fn ps2outputs(
#[get("/getone?<pid>")] #[get("/getone?<pid>")]
pub async fn get_one(pid: i32, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonApi { pub async fn get_one(pid: i32, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonApi {
user.id.ok_or(YouAreTmp)?; user.id.ok_or(YouAreTmp)?;
let p = Post::get(&db, &rconn, pid).await?; let p = Post::get(&db, pid).await?;
p.check_permission(&user, "ro")?; p.check_permission(&user, "ro")?;
Ok(json!({ Ok(json!({
"data": p2output(&p, &user,&db, &rconn).await?, "data": p2output(&p, &user,&db, &rconn).await?,
@@ -165,18 +165,12 @@ pub async fn get_list(
let page = p.unwrap_or(1); let page = p.unwrap_or(1);
let page_size = 25; let page_size = 25;
let start = (page - 1) * page_size; let start = (page - 1) * page_size;
let ps: Vec<Post> = Post::gets_by_page( let ps: Vec<Post> =
&db, Post::gets_by_page(&db, room_id, order_mode, start as usize, page_size as usize)
&rconn, .await?
room_id, .into_iter()
order_mode, .filter(|post| page < 40 || !post.get_is_private())
start.into(), .collect();
page_size.into(),
)
.await?
.into_iter()
.filter(|post| page < 40 || !post.get_is_private())
.collect();
let ps_data = ps2outputs(&ps, &user, &db, &rconn).await?; let ps_data = ps2outputs(&ps, &user, &db, &rconn).await?;
@@ -227,7 +221,7 @@ pub async fn publish_post(
) )
.await?; .await?;
Attention::init(&user.namehash, &rconn).add(p.id).await?; Attention::init(&user.namehash, &rconn).add(p.id).await?;
p.refresh_cache(&rconn, true).await; p.refresh_cache(true).await;
if !poi.poll_options.is_empty() { if !poi.poll_options.is_empty() {
PollOption::init(p.id, &rconn) PollOption::init(p.id, &rconn)
@@ -238,18 +232,18 @@ pub async fn publish_post(
} }
#[post("/editcw", data = "<cwi>")] #[post("/editcw", data = "<cwi>")]
pub async fn edit_cw(cwi: Form<CwInput>, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonApi { pub async fn edit_cw(cwi: Form<CwInput>, user: CurrentUser, db: Db) -> JsonApi {
let mut p = Post::get(&db, &rconn, cwi.pid).await?; let mut p = Post::get(&db, cwi.pid).await?;
p.check_permission(&user, "w")?; p.check_permission(&user, "w")?;
update!(p, posts, &db, { cw, to cwi.cw.to_string() }); update!(p, posts, &db, { cw, to cwi.cw.to_string() });
p.refresh_cache(&rconn, false).await; p.refresh_cache(false).await;
code0!() code0!()
} }
#[get("/getmulti?<pids>")] #[get("/getmulti?<pids>")]
pub async fn get_multi(pids: Vec<i32>, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonApi { pub async fn get_multi(pids: Vec<i32>, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonApi {
user.id.ok_or(YouAreTmp)?; user.id.ok_or(YouAreTmp)?;
let ps: Vec<Post> = Post::get_multi(&db, &rconn, &pids) let ps: Vec<Post> = Post::get_multi(&db, &pids)
.await? .await?
.into_iter() .into_iter()
.filter(|post| { .filter(|post| {

View File

@@ -22,7 +22,7 @@ pub async fn reaction(
) -> JsonApi { ) -> JsonApi {
user.id.ok_or(YouAreTmp)?; user.id.ok_or(YouAreTmp)?;
let mut p = Post::get(&db, &rconn, pid).await?; let mut p = Post::get(&db, pid).await?;
p.check_permission(&user, "r")?; p.check_permission(&user, "r")?;
let mut r_up = Reaction::init(pid, 1, &rconn); let mut r_up = Reaction::init(pid, 1, &rconn);
let mut r_down = Reaction::init(pid, -1, &rconn); let mut r_down = Reaction::init(pid, -1, &rconn);
@@ -51,7 +51,7 @@ pub async fn reaction(
{ down_votes, add delta_down } { down_votes, add delta_down }
); );
p.refresh_cache(&rconn, false).await; p.refresh_cache(false).await;
} }
Ok(json!({ Ok(json!({

View File

@@ -25,7 +25,6 @@ pub async fn search(
} else { } else {
Post::search( Post::search(
&db, &db,
&rconn,
room_id, room_id,
search_mode, search_mode,
keywords.to_string(), keywords.to_string(),

View File

@@ -12,7 +12,7 @@ pub async fn get_systemlog(
user: CurrentUser, user: CurrentUser,
rh: &State<RandomHasher>, rh: &State<RandomHasher>,
db: Db, db: Db,
mut rconn: RdsConn, rconn: RdsConn,
) -> JsonApi { ) -> JsonApi {
let logs = Systemlog::get_list(&rconn, 50).await?; let logs = Systemlog::get_list(&rconn, 50).await?;
@@ -20,7 +20,7 @@ pub async fn get_systemlog(
"tmp_token": rh.get_tmp_token(), "tmp_token": rh.get_tmp_token(),
"salt": look!(rh.salt), "salt": look!(rh.salt),
"start_time": rh.start_time.timestamp(), "start_time": rh.start_time.timestamp(),
"user_count": cached_user_count(&db, &mut rconn).await?, "user_count": cached_user_count(&db).await?,
"custom_title": user.custom_title, "custom_title": user.custom_title,
"admin_list": get_admin_list(&rconn).await?, "admin_list": get_admin_list(&rconn).await?,
"candidate_list": get_candidate_list(&rconn).await?, "candidate_list": get_candidate_list(&rconn).await?,

View File

@@ -2,213 +2,163 @@ use crate::api::{Api, CurrentUser};
use crate::db_conn::Db; use crate::db_conn::Db;
use crate::models::{Comment, Post, User}; use crate::models::{Comment, Post, User};
use crate::rds_conn::RdsConn; use crate::rds_conn::RdsConn;
use crate::rds_models::{clear_all, init, BlockedUsers}; use crate::rds_models::BlockedUsers;
use diesel::result::{Error as DieselError, QueryResult};
use moka::future::Cache;
use rand::Rng; use rand::Rng;
use redis::{AsyncCommands, RedisError, RedisResult}; use redis::RedisResult;
use rocket::serde::json::serde_json;
// can use rocket::serde::json::to_string in master version
use futures_util::stream::StreamExt;
use rocket::futures::future; use rocket::futures::future;
use rocket::tokio::sync::RwLock;
use std::collections::HashMap; use std::collections::HashMap;
use std::future::Future;
use std::io;
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Duration;
const KEY_USER_COUNT: &str = "hole_v2:cache:user_count"; const USER_COUNT_EXPIRE_TIME: u64 = 60;
const USER_COUNT_EXPIRE_TIME: u64 = 5 * 60;
const INSTANCE_EXPIRE_TIME: u64 = 60 * 60; const INSTANCE_EXPIRE_TIME: u64 = 60 * 60;
const MIN_LENGTH: isize = 200; // Global cache getters using OnceLock
const MAX_LENGTH: isize = 900; fn post_cache() -> &'static Cache<i32, Post> {
const CUT_LENGTH: isize = 100; static CACHE: OnceLock<Cache<i32, Post>> = OnceLock::new();
CACHE.get_or_init(|| Cache::builder().max_capacity(10_000).build())
macro_rules! post_cache_key {
($id: expr) => {
format!("hole_v2:cache:post:{}:v2", $id)
};
} }
pub struct PostCache { fn post_comment_cache() -> &'static Cache<String, Vec<Comment>> {
rconn: RdsConn, static CACHE: OnceLock<Cache<String, Vec<Comment>>> = OnceLock::new();
CACHE.get_or_init(|| {
Cache::builder()
.time_to_idle(Duration::from_secs(INSTANCE_EXPIRE_TIME))
.build()
})
} }
// Each list in post_list_cache, keyed by room_id and mode, is a sorted list. The element is a pair of numbers. The first one is the weight used to sort, the second one is the post id.
fn post_list_cache() -> &'static Cache<String, Arc<RwLock<Vec<(i64, i32)>>>> {
static CACHE: OnceLock<Cache<String, Arc<RwLock<Vec<(i64, i32)>>>>> = OnceLock::new();
CACHE.get_or_init(|| Cache::builder().build())
}
fn user_cache() -> &'static Cache<String, User> {
static CACHE: OnceLock<Cache<String, User>> = OnceLock::new();
CACHE.get_or_init(|| {
Cache::builder()
.time_to_idle(Duration::from_secs(INSTANCE_EXPIRE_TIME))
.build()
})
}
fn block_dict_cache() -> &'static Cache<String, Arc<RwLock<HashMap<String, bool>>>> {
static CACHE: OnceLock<Cache<String, Arc<RwLock<HashMap<String, bool>>>>> = OnceLock::new();
CACHE.get_or_init(|| {
Cache::builder()
.time_to_idle(Duration::from_secs(INSTANCE_EXPIRE_TIME))
.build()
})
}
fn user_count_cache() -> &'static Cache<String, i64> {
static CACHE: OnceLock<Cache<String, i64>> = OnceLock::new();
CACHE.get_or_init(|| {
Cache::builder()
.time_to_live(Duration::from_secs(USER_COUNT_EXPIRE_TIME))
.build()
})
}
fn map_shared_diesel_error(err: Arc<DieselError>) -> DieselError {
match err.as_ref() {
DieselError::NotFound => DieselError::NotFound,
DieselError::RollbackTransaction => DieselError::RollbackTransaction,
DieselError::AlreadyInTransaction => DieselError::AlreadyInTransaction,
DieselError::NotInTransaction => DieselError::NotInTransaction,
DieselError::BrokenTransactionManager => DieselError::BrokenTransactionManager,
_ => DieselError::QueryBuilderError(Box::new(io::Error::other(err.to_string()))),
}
}
pub struct PostCache;
impl PostCache { impl PostCache {
init!(); pub async fn sets(ps: &[&Post]) {
clear_all!("hole_v2:cache::post:*:v2");
pub async fn sets(&mut self, ps: &[&Post]) {
if ps.is_empty() { if ps.is_empty() {
return; return;
} }
let kvs: Vec<(String, String)> = ps for p in ps {
.iter() post_cache().insert(p.id, (*p).clone()).await;
.map(|p| (post_cache_key!(p.id), serde_json::to_string(p).unwrap()))
.collect();
self.rconn.mset(&kvs).await.unwrap_or_else(|e| {
warn!("set post cache failed: {}", e);
dbg!(&kvs);
});
}
pub async fn get(&mut self, pid: &i32) -> Option<Post> {
let key = post_cache_key!(pid);
let rds_result: Option<String> = self
.rconn
.get::<String, Option<String>>(key)
.await
.unwrap_or_else(|e| {
warn!("try to get post cache, connect rds failed, {}", e);
None
});
rds_result.and_then(|s| {
serde_json::from_str(&s).unwrap_or_else(|e| {
warn!("get post cache, decode failed {}, {}", e, s);
None
})
})
}
pub async fn gets(&mut self, pids: &[i32]) -> Vec<Option<Post>> {
// 长度为1时会走GET而非MGET返回值格式不兼容。愚蠢的设计。
match pids.len() {
0 => vec![],
1 => vec![self.get(&pids[0]).await],
_ => {
let ks: Vec<String> = pids.iter().map(|pid| post_cache_key!(pid)).collect();
// dbg!(&ks);
// Vec is single arg, while &Vec is not. Seems a bug.
let rds_result: Vec<Option<String>> = self
.rconn
.get::<Vec<String>, Vec<Option<String>>>(ks)
.await
.unwrap_or_else(|e| {
warn!("try to get posts cache, connect rds failed, {}", e);
vec![None; pids.len()]
});
// dbg!(&rds_result);
// 定期热度衰减的时候会清空缓存,这里设不设置过期时间影响不大
rds_result
.into_iter()
.map(|x| {
// dbg!(&x);
x.and_then(|s| {
serde_json::from_str(&s).unwrap_or_else(|e| {
warn!("get post cache, decode failed {}, {}", e, s);
None
})
})
})
.collect()
}
} }
} }
pub async fn get(pid: &i32) -> Option<Post> {
post_cache().get(pid).await
}
pub async fn get_with<F>(pid: i32, init: F) -> QueryResult<Post>
where
F: Future<Output = QueryResult<Post>>,
{
post_cache()
.try_get_with(pid, init)
.await
.map_err(map_shared_diesel_error)
}
pub async fn gets(pids: &[i32]) -> Vec<Option<Post>> {
future::join_all(pids.iter().map(Self::get)).await
}
pub async fn clear_all() {
post_cache().invalidate_all();
}
} }
pub struct PostCommentCache { pub struct PostCommentCache {
key: String, key: String,
rconn: RdsConn,
} }
impl PostCommentCache { impl PostCommentCache {
init!(i32, "hole_v2:cache:post_comments:{}"); pub fn init(post_id: i32) -> Self {
Self {
pub async fn set(&mut self, cs: &[Comment]) { key: format!("hole_v2:cache:post_comments:{}", post_id),
self.rconn
.set_ex(
&self.key,
serde_json::to_string(cs).unwrap(),
INSTANCE_EXPIRE_TIME,
)
.await
.unwrap_or_else(|e| {
warn!("set comments cache failed: {}", e);
dbg!(cs);
})
}
pub async fn get(&mut self) -> Option<Vec<Comment>> {
let rds_result = self.rconn.get::<&String, String>(&self.key).await;
// dbg!(&rds_result);
if let Ok(s) = rds_result {
self.rconn
.expire::<&String, bool>(&self.key, INSTANCE_EXPIRE_TIME as i64)
.await
.unwrap_or_else(|e| {
warn!(
"get comments cache, set new expire failed: {}, {}, {} ",
e, &self.key, &s
);
false
});
serde_json::from_str(&s).unwrap_or_else(|e| {
warn!("get comments cache, decode failed {}, {}", e, s);
None
})
} else {
None
} }
} }
pub async fn get_with<F>(&self, init: F) -> QueryResult<Vec<Comment>>
where
F: Future<Output = QueryResult<Vec<Comment>>>,
{
post_comment_cache()
.try_get_with(self.key.clone(), init)
.await
.map_err(map_shared_diesel_error)
}
pub async fn clear(&mut self) { pub async fn clear(&mut self) {
self.rconn.del(&self.key).await.unwrap_or_else(|e| { post_comment_cache().invalidate(&self.key).await;
warn!("clear commenrs cache fail, {}", e);
});
} }
} }
pub struct PostListCache { pub struct PostListCache {
key: String, key: String,
mode: u8, mode: u8,
rconn: RdsConn,
length: isize,
} }
impl PostListCache { impl PostListCache {
pub fn init(room_id: Option<i32>, mode: u8, rconn: &RdsConn) -> Self { pub const MAX_LENGTH: usize = 900;
// pub const MIN_LENGTH: usize = 200;
pub const CUT_LENGTH: usize = 100;
pub fn init(room_id: Option<i32>, mode: u8) -> Self {
Self { Self {
key: format!( key: format!(
"hole_v2:cache:post_list:{}:{}", "hole_v2:cache:post_list:{}:{}",
match room_id { room_id.map_or_else(String::new, |i| i.to_string()),
Some(i) => i.to_string(),
None => "".to_owned(),
},
&mode &mode
), ),
mode, mode,
rconn: rconn.clone(),
length: 0,
} }
} }
async fn set_and_check_length(&mut self) {
let mut l = self.rconn.zcard(&self.key).await.unwrap();
if l > MAX_LENGTH {
self.rconn
.zremrangebyrank::<&String, ()>(&self.key, MAX_LENGTH - CUT_LENGTH, -1)
.await
.unwrap_or_else(|e| {
warn!("cut list cache failed, {}, {}", e, &self.key);
});
l = MIN_LENGTH;
}
self.length = l;
}
pub async fn need_fill(&mut self) -> bool {
self.set_and_check_length().await;
self.length < MIN_LENGTH
}
pub fn i64_len(&self) -> i64 {
self.length.try_into().unwrap()
}
pub fn i64_minlen(&self) -> i64 {
MIN_LENGTH.try_into().unwrap()
}
fn p2pair(&self, p: &Post) -> (i64, i32) { fn p2pair(&self, p: &Post) -> (i64, i32) {
( (
match self.mode { match self.mode {
@@ -223,163 +173,155 @@ impl PostListCache {
) )
} }
pub async fn fill(&mut self, ps: &[Post]) { pub async fn fill_with<F>(&mut self, query_posts: F) -> QueryResult<usize>
let items: Vec<(i64, i32)> = ps.iter().map(|p| self.p2pair(p)).collect(); where
self.rconn F: Future<Output = QueryResult<Vec<Post>>>,
.zadd_multiple(&self.key, &items) {
let list_ref = post_list_cache()
.try_get_with(self.key.clone(), async {
let mut items: Vec<(i64, i32)> =
query_posts.await?.iter().map(|p| self.p2pair(&p)).collect();
items.sort_by(|a, b| a.0.cmp(&b.0));
Ok(Arc::new(RwLock::new(items)))
})
.await .await
.unwrap_or_else(|e| { .map_err(map_shared_diesel_error)?;
warn!("fill list cache failed, {} {}", e, &self.key); let list = list_ref.read().await;
});
self.set_and_check_length().await; // Double-Checked Locking
} if list.len() <= Self::MAX_LENGTH {
return Ok(list.len());
}
drop(list);
let mut list = list_ref.write().await;
pub async fn put(&mut self, p: &Post) { if list.len() <= Self::MAX_LENGTH {
// 其他都是加到最前面的但热榜不是。可能导致MIN_LENGTH到MAX_LENGTH之间的数据不可靠 Ok(list.len())
// 影响不大,先不管了
if p.is_deleted || (self.mode > 0 && p.is_reported) {
self.rconn.zrem(&self.key, p.id).await.unwrap_or_else(|e| {
warn!(
"remove from list cache failed, {} {} {}",
e, &self.key, p.id
);
});
} else { } else {
let (s, m) = self.p2pair(p); list.truncate(Self::MAX_LENGTH - Self::CUT_LENGTH);
self.rconn.zadd(&self.key, m, s).await.unwrap_or_else(|e| { Ok(list.len())
warn!(
"put into list cache failed, {} {} {} {}",
e, &self.key, m, s
);
});
} }
} }
pub async fn get_pids(&mut self, start: i64, limit: i64) -> Vec<i32> { pub async fn put(&mut self, p: &Post) {
self.rconn // Don't put is there is no cache. Let fill_with handle it.
.zrange( if let Some(list_ref) = post_list_cache().get(&self.key).await {
&self.key, let mut list = list_ref.write().await;
start.try_into().unwrap(), // Remove any existing entry for this post_id
(start + limit - 1).try_into().unwrap(), if let Some(pos) = list.iter().position(|(_, pid)| *pid == p.id) {
) list.remove(pos);
.await }
.unwrap() if p.is_deleted || (self.mode > 0 && p.is_reported) {
return;
}
list.push(self.p2pair(p));
list.sort_by(|a, b| a.0.cmp(&b.0));
}
}
pub async fn get_pids(&mut self, start: usize, limit: usize) -> Vec<i32> {
if let Some(list_ref) = post_list_cache().get(&self.key).await {
let list = list_ref.read().await;
list.iter()
.skip(start)
.take(limit)
.map(|(_, pid)| *pid)
.collect()
} else {
vec![]
}
} }
pub async fn clear(&mut self) { pub async fn clear(&mut self) {
self.rconn.del(&self.key).await.unwrap_or_else(|e| { post_list_cache().invalidate(&self.key).await;
warn!("clear post list cache failed, {}", e);
});
} }
} }
pub struct UserCache { pub struct UserCache {
key: String, key: String,
rconn: RdsConn,
} }
impl UserCache { impl UserCache {
init!(&str, "hole_v2:cache:user:{}"); pub fn init(user_id: &str) -> Self {
Self {
clear_all!("hole_v2:cache:user:*"); key: format!("hole_v2:cache:user:{}", user_id),
}
pub async fn set(&mut self, u: &User) {
self.rconn
.set_ex(
&self.key,
serde_json::to_string(u).unwrap(),
INSTANCE_EXPIRE_TIME,
)
.await
.unwrap_or_else(|e| {
warn!("set user cache failed: {}", e);
dbg!(u);
})
} }
pub async fn get(&mut self) -> Option<User> { // No need to use get_with for User. Just check and set separately.
let rds_result = self.rconn.get::<&String, String>(&self.key).await; pub async fn set(&self, u: &User) {
if let Ok(s) = rds_result { user_cache().insert(self.key.clone(), u.clone()).await;
self.rconn }
.expire::<&String, bool>(&self.key, INSTANCE_EXPIRE_TIME as i64)
.await pub async fn get(&self) -> Option<User> {
.unwrap_or_else(|e| { user_cache().get(&self.key).await
warn!( }
"get user cache, set new expire failed: {}, {}, {} ",
e, &self.key, &s pub async fn clear_all() {
); user_cache().invalidate_all();
false
});
serde_json::from_str(&s).unwrap_or_else(|e| {
warn!("get user cache, decode failed {}, {}", e, s);
None
})
} else {
None
}
} }
} }
pub struct BlockDictCache { pub struct BlockDictCache {
key: String, key: String,
rconn: RdsConn,
} }
impl BlockDictCache { impl BlockDictCache {
// namehash, pid pub fn init(namehash: &str, post_id: i32) -> Self {
init!(&str, i32, "hole_v2:cache:block_dict:{}:{}"); Self {
key: format!("hole_v2:cache:block_dict:{}:{}", namehash, post_id),
}
}
pub async fn get_or_create( pub async fn get_or_create(
&mut self, &mut self,
user: &CurrentUser, user: &CurrentUser,
hash_list: &[&String], hash_list: &[&String],
rconn: &RdsConn,
) -> RedisResult<HashMap<String, bool>> { ) -> RedisResult<HashMap<String, bool>> {
let mut block_dict = self let dict_ref = block_dict_cache()
.rconn .get_with(self.key.clone(), async move {
.hgetall::<&String, HashMap<String, bool>>(&self.key) Arc::new(RwLock::new(HashMap::new()))
.await?; })
.await;
//dbg!(&self.key, &block_dict); // Find missing hashes
let mut missing_keys: Vec<String> = Vec::new();
let missing: Vec<(String, bool)> = {
future::try_join_all(hash_list.iter().filter_map(|hash| { let block_dict = dict_ref.read().await;
(!block_dict.contains_key(&hash.to_string())).then_some(async { for hash in hash_list {
Ok::<(String, bool), RedisError>(( if !block_dict.contains_key(hash.as_str()) {
hash.to_string(), missing_keys.push((*hash).clone());
BlockedUsers::check_if_block(&self.rconn, user, hash).await?, }
)) }
})
}))
.await?;
if !missing.is_empty() {
self.rconn.hset_multiple(&self.key, &missing).await?;
self.rconn
.expire(&self.key, INSTANCE_EXPIRE_TIME as i64)
.await?;
block_dict.extend(missing.into_iter());
} }
//dbg!(&block_dict); if !missing_keys.is_empty() {
let mut missing: Vec<(String, bool)> = Vec::with_capacity(missing_keys.len());
for hash in missing_keys {
let is_blocked = BlockedUsers::check_if_block(rconn, user, &hash).await?;
missing.push((hash, is_blocked));
}
Ok(block_dict) let mut block_dict = dict_ref.write().await;
for (hash, is_blocked) in missing {
block_dict.entry(hash).or_insert(is_blocked);
}
}
let out = dict_ref.read().await.clone();
Ok(out)
} }
pub async fn clear(&mut self) -> RedisResult<()> { pub async fn clear(&mut self) {
self.rconn.del(&self.key).await block_dict_cache().invalidate(&self.key).await;
} }
} }
pub async fn cached_user_count(db: &Db, rconn: &mut RdsConn) -> Api<i64> { pub async fn cached_user_count(db: &Db) -> Api<i64> {
let cnt: Option<i64> = rconn.get(KEY_USER_COUNT).await?; let key = "hole_v2:cache:user_count";
if let Some(x) = cnt { Ok(user_count_cache()
Ok(x) .try_get_with(key.to_string(), async { User::get_count(db).await })
} else { .await
let x = User::get_count(db).await?; .map_err(map_shared_diesel_error)?)
rconn
.set_ex(KEY_USER_COUNT, x, USER_COUNT_EXPIRE_TIME)
.await?;
Ok(x)
}
} }

View File

@@ -49,20 +49,19 @@ async fn main() {
let rmc = init_rds_client().await; let rmc = init_rds_client().await;
let mut rconn = RdsConn(rmc.clone()); let mut rconn = RdsConn(rmc.clone());
let mut c_start = establish_connection(); let mut c_start = establish_connection();
models::User::clear_non_admin_users(&mut c_start, &mut rconn).await; models::User::clear_non_admin_users(&mut c_start).await;
clear_outdate_redis_data(&mut rconn).await; clear_outdate_redis_data(&mut rconn).await;
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
sleep(Duration::from_secs(3 * 60 * 60)).await; sleep(Duration::from_secs(3 * 60 * 60)).await;
models::Post::annealing(&mut c_start, &mut rconn).await; models::Post::annealing(&mut c_start).await;
} }
}); });
let rconn = RdsConn(rmc.clone());
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
for room_id in (0..5).map(Some).chain([None, Some(42)]) { for room_id in (0..5).map(Some).chain([None, Some(42)]) {
cache::PostListCache::init(room_id, 3, &rconn).clear().await; cache::PostListCache::init(room_id, 3).clear().await;
} }
sleep(Duration::from_secs(5 * 60)).await; sleep(Duration::from_secs(5 * 60)).await;
} }

View File

@@ -3,7 +3,6 @@
use crate::cache::*; use crate::cache::*;
use crate::db_conn::{Conn, Db}; use crate::db_conn::{Conn, Db};
use crate::random_hasher::random_string; use crate::random_hasher::random_string;
use crate::rds_conn::RdsConn;
use crate::schema::*; use crate::schema::*;
use chrono::{offset::Utc, DateTime}; use chrono::{offset::Utc, DateTime};
use diesel::sql_types::*; use diesel::sql_types::*;
@@ -93,7 +92,7 @@ macro_rules! with_log {
}; };
} }
#[derive(Queryable, Insertable, Serialize, Deserialize, Debug)] #[derive(Queryable, Insertable, Serialize, Deserialize, Debug, Clone)]
#[serde(crate = "rocket::serde")] #[serde(crate = "rocket::serde")]
pub struct Comment { pub struct Comment {
pub id: i32, pub id: i32,
@@ -107,7 +106,7 @@ pub struct Comment {
pub post_id: i32, pub post_id: i32,
} }
#[derive(Queryable, Insertable, Serialize, Deserialize, Debug)] #[derive(Queryable, Insertable, Serialize, Deserialize, Debug, Clone)]
#[serde(crate = "rocket::serde")] #[serde(crate = "rocket::serde")]
pub struct Post { pub struct Post {
pub id: i32, pub id: i32,
@@ -129,7 +128,7 @@ pub struct Post {
pub down_votes: i32, pub down_votes: i32,
} }
#[derive(Queryable, Insertable, Serialize, Deserialize, Debug)] #[derive(Queryable, Insertable, Serialize, Deserialize, Debug, Clone)]
#[serde(crate = "rocket::serde")] #[serde(crate = "rocket::serde")]
pub struct User { pub struct User {
pub id: i32, pub id: i32,
@@ -156,9 +155,8 @@ impl Post {
_get_multi!(posts); _get_multi!(posts);
pub async fn get_multi(db: &Db, rconn: &RdsConn, ids: &[i32]) -> QueryResult<Vec<Self>> { pub async fn get_multi(db: &Db, ids: &[i32]) -> QueryResult<Vec<Self>> {
let mut cacher = PostCache::init(rconn); let mut cached_posts = PostCache::gets(ids).await;
let mut cached_posts = cacher.gets(ids).await;
let mut id2po = HashMap::<i32, &mut Option<Post>>::new(); let mut id2po = HashMap::<i32, &mut Option<Post>>::new();
// dbg!(&cached_posts); // dbg!(&cached_posts);
@@ -180,7 +178,7 @@ impl Post {
let missing_ps = Self::_get_multi(db, missing_ids).await?; let missing_ps = Self::_get_multi(db, missing_ids).await?;
// dbg!(&missing_ps); // dbg!(&missing_ps);
cacher.sets(&missing_ps.iter().collect::<Vec<_>>()).await; PostCache::sets(&missing_ps.iter().collect::<Vec<_>>()).await;
for p in missing_ps.into_iter() { for p in missing_ps.into_iter() {
if let Some(op) = id2po.get_mut(&p.id) { if let Some(op) = id2po.get_mut(&p.id) {
@@ -194,55 +192,52 @@ impl Post {
.collect()) .collect())
} }
pub async fn get(db: &Db, rconn: &RdsConn, id: i32) -> QueryResult<Self> { pub async fn get(db: &Db, id: i32) -> QueryResult<Self> {
// 注意即使is_deleted也应该缓存和返回 // 注意即使is_deleted也应该缓存和返回
let mut cacher = PostCache::init(rconn); PostCache::get_with(id, async move { Self::_get(db, id).await }).await
if let Some(p) = cacher.get(&id).await {
Ok(p)
} else {
let p = Self::_get(db, id).await?;
cacher.sets(&[&p]).await;
Ok(p)
}
} }
pub async fn get_comments(&self, db: &Db, rconn: &RdsConn) -> QueryResult<Vec<Comment>> { pub async fn get_comments(&self, db: &Db) -> QueryResult<Vec<Comment>> {
let mut cacher = PostCommentCache::init(self.id, rconn); let cacher = PostCommentCache::init(self.id);
if let Some(cs) = cacher.get().await { cacher
Ok(cs) .get_with(async move { Comment::gets_by_post_id(db, self.id).await })
} else { .await
let cs = Comment::gets_by_post_id(db, self.id).await?;
cacher.set(&cs).await;
Ok(cs)
}
} }
pub async fn clear_comments_cache(&self, rconn: &RdsConn) { pub async fn clear_comments_cache(&self) {
PostCommentCache::init(self.id, rconn).clear().await; PostCommentCache::init(self.id).clear().await;
} }
pub async fn gets_by_page( pub async fn gets_by_page(
db: &Db, db: &Db,
rconn: &RdsConn,
room_id: Option<i32>, room_id: Option<i32>,
order_mode: u8, order_mode: u8,
start: i64, start: usize,
limit: i64, limit: usize,
) -> QueryResult<Vec<Self>> { ) -> QueryResult<Vec<Self>> {
let mut cacher = PostListCache::init(room_id, order_mode, rconn); let mut cacher = PostListCache::init(room_id, order_mode);
if cacher.need_fill().await {
let pids = let current_len = cacher
Self::_get_ids_by_page(db, room_id, order_mode, 0, cacher.i64_minlen()).await?; .fill_with(async move {
let ps = Self::get_multi(db, rconn, &pids).await?; let pids = Self::_get_ids_by_page(
cacher.fill(&ps).await; db,
} room_id,
let pids = if start + limit > cacher.i64_len() { order_mode,
Self::_get_ids_by_page(db, room_id, order_mode, start, limit).await? 0,
PostListCache::MAX_LENGTH as i64,
)
.await?;
Self::get_multi(db, &pids).await
})
.await?;
let pids = if start + limit > current_len {
Self::_get_ids_by_page(db, room_id, order_mode, start as i64, limit as i64).await?
} else { } else {
cacher.get_pids(start, limit).await cacher.get_pids(start, limit).await
}; };
Self::get_multi(db, rconn, &pids).await Self::get_multi(db, &pids).await
} }
async fn _get_ids_by_page( async fn _get_ids_by_page(
db: &Db, db: &Db,
@@ -281,7 +276,6 @@ impl Post {
pub async fn search( pub async fn search(
db: &Db, db: &Db,
rconn: &RdsConn,
room_id: Option<i32>, room_id: Option<i32>,
search_mode: u8, search_mode: u8,
search_text: String, search_text: String,
@@ -339,7 +333,7 @@ impl Post {
.load(with_log!(c)) .load(with_log!(c))
}) })
.await?; .await?;
Self::get_multi(db, rconn, &pids).await Self::get_multi(db, &pids).await
} }
pub async fn create(db: &Db, new_post: NewPost) -> QueryResult<Self> { pub async fn create(db: &Db, new_post: NewPost) -> QueryResult<Self> {
@@ -351,18 +345,16 @@ impl Post {
.await .await
} }
pub async fn set_instance_cache(&self, rconn: &RdsConn) { pub async fn set_instance_cache(&self) {
PostCache::init(rconn).sets(&[self]).await; PostCache::sets(&[self]).await;
} }
pub async fn refresh_cache(&self, rconn: &RdsConn, is_new: bool) { pub async fn refresh_cache(&self, is_new: bool) {
join!( join!(
self.set_instance_cache(rconn), self.set_instance_cache(),
future::join_all((if is_new { [0, 2, 3, 4] } else { [1, 2, 3, 4] }).map( future::join_all((if is_new { [0, 2, 3, 4] } else { [1, 2, 3, 4] }).map(
|mode| async move { |mode| async move {
PostListCache::init(None, mode, &rconn.clone()) PostListCache::init(None, mode).put(self).await;
.put(self) PostListCache::init(Some(self.room_id), mode)
.await;
PostListCache::init(Some(self.room_id), mode, &rconn.clone())
.put(self) .put(self)
.await; .await;
} }
@@ -370,16 +362,16 @@ impl Post {
); );
} }
pub async fn annealing(c: &mut Conn, rconn: &mut RdsConn) { pub async fn annealing(c: &mut Conn) {
info!("Time for annealing!"); info!("Time for annealing!");
diesel::update(posts::table.filter(posts::hot_score.gt(10))) diesel::update(posts::table.filter(posts::hot_score.gt(10)))
.set(posts::hot_score.eq(floor(float4(posts::hot_score) * 0.9))) .set(posts::hot_score.eq(floor(float4(posts::hot_score) * 0.9)))
.execute(with_log!(c)) .execute(with_log!(c))
.unwrap(); .unwrap();
PostCache::clear_all(rconn).await; PostCache::clear_all().await;
for room_id in (0..5).map(Some).chain([None, Some(42)]) { for room_id in (0..5).map(Some).chain([None, Some(42)]) {
PostListCache::init(room_id, 2, rconn).clear().await; PostListCache::init(room_id, 2).clear().await;
} }
} }
} }
@@ -396,7 +388,11 @@ impl User {
.ok() .ok()
} }
pub async fn get_by_token(db: &Db, rconn: &RdsConn, token: &str) -> Option<Self> { pub async fn get_by_token(db: &Db, token: &str) -> Option<Self> {
let mut cacher = UserCache::init(token);
if let Some(u) = cacher.get().await {
return Some(u);
}
let real_token; let real_token;
let token = match &token.split(':').collect::<Vec<&str>>()[..] { let token = match &token.split(':').collect::<Vec<&str>>()[..] {
@@ -410,7 +406,7 @@ impl User {
_ => token, _ => token,
}; };
// dbg!(token); // dbg!(token);
let mut cacher = UserCache::init(token, rconn); let mut cacher = UserCache::init(token);
if let Some(u) = cacher.get().await { if let Some(u) = cacher.get().await {
Some(u) Some(u)
} else { } else {
@@ -452,11 +448,11 @@ impl User {
.await .await
} }
pub async fn clear_non_admin_users(c: &mut Conn, rconn: &mut RdsConn) { pub async fn clear_non_admin_users(c: &mut Conn) {
diesel::delete(users::table.filter(users::is_admin.eq(false))) diesel::delete(users::table.filter(users::is_admin.eq(false)))
.execute(c) .execute(c)
.unwrap(); .unwrap();
UserCache::clear_all(rconn).await; UserCache::clear_all().await;
} }
pub async fn get_count(db: &Db) -> QueryResult<i64> { pub async fn get_count(db: &Db) -> QueryResult<i64> {