From 59714bfbef86afd12ab6da1cc71408412fbe0983 Mon Sep 17 00:00:00 2001
From: hole-thu
Date: Sat, 26 Mar 2022 01:04:22 +0800
Subject: [PATCH] feat: annealing for hot score & clean cache

---
 Cargo.toml           | 14 ++++++--------
 src/api/operation.rs |  6 +++---
 src/api/post.rs      |  2 +-
 src/cache.rs         | 28 +++++++++++++++++++++++++---
 src/db_conn.rs       | 10 ++++++++++
 src/main.rs          | 16 +++++++++++++---
 src/models.rs        | 21 ++++++++++++++++-----
 7 files changed, 74 insertions(+), 23 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 2de1914..6b02b58 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,16 +8,14 @@ license = "AGPL-3.0"
 
 [dependencies]
 rocket = { version = "0.5.0-rc.1", features = ["json"] }
+rocket_sync_db_pools = { version = "0.1.0-rc.1", features = ["diesel_postgres_pool"] }
 diesel = { version = "1.4.8", features = ["postgres", "chrono"] }
 diesel_migrations = "1.4.0"
+tokio = "1.17.0"
 redis = { version="0.21.5", features = ["aio", "tokio-comp"] }
-chrono = { version="0.*", features =["serde"] }
-rand = "0.*"
-dotenv = "0.*"
-sha2 = "0.*"
+chrono = { version="0.4.19", features =["serde"] }
+rand = "0.8.5"
+dotenv = "0.15.0"
+sha2 = "0.10.2"
 log = "0.4.16"
 env_logger = "0.9.0"
-
-[dependencies.rocket_sync_db_pools]
-version = "0.1.0-rc.1"
-features = ["diesel_postgres_pool"]
diff --git a/src/api/operation.rs b/src/api/operation.rs
index 9d0aacf..419b958 100644
--- a/src/api/operation.rs
+++ b/src/api/operation.rs
@@ -29,18 +29,18 @@ pub async fn delete(
             p.change_n_comments(&db, -1).await?;
             p.change_hot_score(&db, -1).await?;
+            p.refresh_cache(&rconn, false).await;
             p.clear_comments_cache(&rconn).await;
         }
         "pid" => {
             p = Post::get(&db, &rconn, di.id).await?;
             p.soft_delete(&user, &db).await?;
+            // if this is a deletion, it also needs to be removed from cache queue 0
+            p.refresh_cache(&rconn, true).await;
         }
         _ => return Err(APIError::PcError(NotAllowed)),
     }
 
-    // if this is a deletion, it also needs to be removed from cache queue 0
-    p.refresh_cache(&rconn, true).await;
-
     Ok(json!({
         "code": 0
     }))
diff --git a/src/api/post.rs b/src/api/post.rs
index eaff116..3142b8f 100644
--- a/src/api/post.rs
+++ b/src/api/post.rs
@@ -164,7 +164,7 @@ pub async fn edit_cw(cwi: Form, user: CurrentUser, db: Db, rconn: RdsCo
     }
     p.check_permission(&user, "w")?;
     p.update_cw(&db, cwi.cw.to_string()).await?;
-    p.refresh_cache(&rconn, false);
+    p.refresh_cache(&rconn, false).await;
     Ok(json!({"code": 0}))
 }
diff --git a/src/cache.rs b/src/cache.rs
index 87dd76c..ca04476 100644
--- a/src/cache.rs
+++ b/src/cache.rs
@@ -48,7 +48,7 @@ impl PostCache {
            .get::<String, Option<String>>(key)
            .await
            .unwrap_or_else(|e| {
-                warn!("try to get post cache, connect rds fail, {}", e);
+                warn!("try to get post cache, connect rds failed, {}", e);
                 None
             });
 
@@ -74,7 +74,7 @@ impl PostCache {
            .get::<Vec<String>, Vec<Option<String>>>(ks)
            .await
            .unwrap_or_else(|e| {
-                warn!("try to get posts cache, connect rds fail, {}", e);
+                warn!("try to get posts cache, connect rds failed, {}", e);
                 vec![None; pids.len()]
             });
         // dbg!(&rds_result);
@@ -96,6 +96,22 @@ impl PostCache {
             }
         }
     }
+
+    pub async fn clear_all(&mut self) {
+        let mut keys = self.rconn.scan_match::<String, String>(post_cache_key!("*")).await.unwrap(); //.collect::<Vec<String>>().await;
+        // collect() does not work
+        // also see: https://github.com/mitsuhiko/redis-rs/issues/583
+        let mut ks_for_del = Vec::new();
+        while let Some(key) = keys.next_item().await {
+            ks_for_del.push(key);
+        }
+        if ks_for_del.is_empty() {
+            return;
+        }
+        self.rconn.del(ks_for_del).await.unwrap_or_else(|e| {
+            warn!("clear all post cache failed, {}", e)
+        });
+    }
 }
 
 pub struct PostCommentCache {
@@ -228,7 +244,7 @@ impl PostListCommentCache {
     pub async fn put(&mut self, p: &Post) {
         // everything else is inserted at the front of the list, but the hot list is not; data between MIN_LENGTH and MAX_LENGTH may be unreliable
         // the impact is small, so leave it for now
-        if p.is_deleted {
+        if p.is_deleted || (self.mode > 0 && p.is_reported) {
             self.rconn.zrem(&self.key, p.id).await.unwrap_or_else(|e| {
                 warn!(
                     "remove from list cache failed, {} {} {}",
@@ -256,6 +272,12 @@ impl PostListCommentCache {
             .await
             .unwrap()
     }
+
+    pub async fn clear(&mut self) {
+        self.rconn.del(&self.key).await.unwrap_or_else(|e| {
+            warn!("clear post list cache failed, {}", e);
+        });
+    }
 }
 
 pub struct UserCache {
diff --git a/src/db_conn.rs b/src/db_conn.rs
index 5eb0e66..1263019 100644
--- a/src/db_conn.rs
+++ b/src/db_conn.rs
@@ -1,7 +1,17 @@
 use rocket_sync_db_pools::{database, diesel};
+use diesel::Connection;
+use std::env;
 
 pub type Conn = diesel::pg::PgConnection;
 
 #[database("pg_v2")]
 pub struct Db(Conn);
+
+// get a sync connection, used only for annealing
+pub fn establish_connection() -> Conn {
+    let database_url = env::var("DATABASE_URL")
+        .expect("DATABASE_URL must be set");
+    Conn::establish(&database_url)
+        .expect(&format!("Error connecting to {}", database_url))
+}
diff --git a/src/main.rs b/src/main.rs
index 8bd9e9f..4642424 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -11,20 +11,21 @@ extern crate diesel_migrations;
 extern crate log;
 
 mod api;
+mod cache;
 mod db_conn;
 mod libs;
 mod models;
 mod random_hasher;
 mod rds_conn;
 mod rds_models;
-mod cache;
 mod schema;
 
-use db_conn::{Conn, Db};
+use db_conn::{establish_connection, Conn, Db};
 use diesel::Connection;
 use random_hasher::RandomHasher;
 use rds_conn::init_rds_client;
 use std::env;
+use tokio::time::{interval, Duration};
 
 embed_migrations!("migrations/postgres");
 
@@ -36,6 +37,15 @@ async fn main() -> Result<(), rocket::Error> {
         return Ok(());
     }
     env_logger::init();
+    let rmc = init_rds_client().await;
+    let rconn = rds_conn::RdsConn(rmc.clone());
+    tokio::spawn(async move {
+        let mut itv = interval(Duration::from_secs(4 * 60 * 60));
+        loop {
+            itv.tick().await;
+            models::Post::annealing(establish_connection(), &rconn).await;
+        }
+    });
     rocket::build()
         .mount(
             "/_api/v1",
@@ -56,7 +66,7 @@ async fn main() -> Result<(), rocket::Error> {
         )
         .register("/_api", catchers![api::catch_401_error])
         .manage(RandomHasher::get_random_one())
-        .manage(init_rds_client().await)
+        .manage(rmc)
         .attach(Db::fairing())
         .launch()
         .await
diff --git a/src/models.rs b/src/models.rs
index f5e5c18..95ce1f9 100644
--- a/src/models.rs
+++ b/src/models.rs
@@ -1,12 +1,13 @@
 #![allow(clippy::all)]
 
 use crate::cache::*;
-use crate::db_conn::Db;
+use crate::db_conn::{Conn, Db};
 use crate::libs::diesel_logger::LoggingConnection;
 use crate::rds_conn::RdsConn;
 use crate::schema::*;
 use chrono::{offset::Utc, DateTime};
 use diesel::dsl::any;
+use diesel::sql_types::*;
 use diesel::{
     insert_into, BoolExpressionMethods, ExpressionMethods, QueryDsl, QueryResult, RunQueryDsl,
     TextExpressionMethods,
@@ -14,9 +15,10 @@
 use rocket::futures::{future, join};
 use rocket::serde::{Deserialize, Serialize};
 use std::collections::HashMap;
-use std::convert::identity;
 
 no_arg_sql_function!(RANDOM, (), "Represents the sql RANDOM() function");
+sql_function!(fn floor(x: Float) -> Int4);
+sql_function!(fn float4(x: Int4) -> Float);
 
 macro_rules! _get {
     ($table:ident) => {
@@ -162,9 +164,7 @@ impl Post {
         let missing_ps = Self::_get_multi(db, missing_ids).await?;
         // dbg!(&missing_ps);
-        cacher
-            .sets(&missing_ps.iter().map(identity).collect())
-            .await;
+        cacher.sets(&missing_ps.iter().collect()).await;
 
         for p in missing_ps.into_iter() {
             if let Some(op) = id2po.get_mut(&p.id) {
@@ -390,6 +390,17 @@ impl Post {
             })),
         );
     }
+
+    pub async fn annealing(mut c: Conn, rconn: &RdsConn) {
+        info!("Time for annealing!");
+        diesel::update(posts::table.filter(posts::hot_score.gt(10)))
+            .set(posts::hot_score.eq(floor(float4(posts::hot_score) * 0.9)))
+            .execute(with_log!(&mut c))
+            .unwrap();
+
+        PostCache::init(&rconn).clear_all().await;
+        PostListCommentCache::init(2, rconn).await.clear().await
+    }
 }
 
 impl User {
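
Editor's note (illustration, not part of the commit): main.rs spawns a background task that ticks every 4 * 60 * 60 seconds; each tick opens a synchronous diesel connection and runs a single UPDATE that decays every hot_score above 10 to floor(hot_score * 0.9), then clears the per-post cache keys and the cached post list for mode 2 (presumably the hot ranking) so they are rebuilt from the decayed scores. A minimal standalone sketch of that decay rule, in plain Rust with hypothetical names and no diesel or redis involved:

// illustration only: models the effect of the SQL update on one score
fn anneal_once(score: i32) -> i32 {
    // mirrors the filter in the patch: only scores above 10 are decayed
    if score > 10 {
        ((score as f32) * 0.9).floor() as i32
    } else {
        score
    }
}

fn main() {
    // a post with hot_score = 100 fades over successive 4-hour ticks
    let mut score = 100;
    for tick in 1..=10 {
        score = anneal_once(score);
        println!("tick {:2} (t = {:3}h): hot_score = {}", tick, tick * 4, score);
    }
}

With a 0.9 factor per 4-hour tick, a score roughly halves about every 26 hours, so stale posts fall out of the hot ranking within a couple of days, while scores at or below 10 are left untouched.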