feat: annealing for hot score & clean cache
This commit is contained in:
14
Cargo.toml
14
Cargo.toml
@@ -8,16 +8,14 @@ license = "AGPL-3.0"
|
||||
|
||||
[dependencies]
|
||||
rocket = { version = "0.5.0-rc.1", features = ["json"] }
|
||||
rocket_sync_db_pools = { version = "0.1.0-cr.1", features = ["diesel_postgres_pool"] }
|
||||
diesel = { version = "1.4.8", features = ["postgres", "chrono"] }
|
||||
diesel_migrations = "1.4.0"
|
||||
tokio = "1.17.0"
|
||||
redis = { version="0.21.5", features = ["aio", "tokio-comp"] }
|
||||
chrono = { version="0.*", features =["serde"] }
|
||||
rand = "0.*"
|
||||
dotenv = "0.*"
|
||||
sha2 = "0.*"
|
||||
chrono = { version="0.4.19", features =["serde"] }
|
||||
rand = "0.8.5"
|
||||
dotenv = "0.15.0"
|
||||
sha2 = "0.10.2"
|
||||
log = "0.4.16"
|
||||
env_logger = "0.9.0"
|
||||
|
||||
[dependencies.rocket_sync_db_pools]
|
||||
version = "0.1.0-rc.1"
|
||||
features = ["diesel_postgres_pool"]
|
||||
|
||||
@@ -29,18 +29,18 @@ pub async fn delete(
|
||||
p.change_n_comments(&db, -1).await?;
|
||||
p.change_hot_score(&db, -1).await?;
|
||||
|
||||
p.refresh_cache(&rconn, false).await;
|
||||
p.clear_comments_cache(&rconn).await;
|
||||
}
|
||||
"pid" => {
|
||||
p = Post::get(&db, &rconn, di.id).await?;
|
||||
p.soft_delete(&user, &db).await?;
|
||||
// 如果是删除,需要也从0号缓存队列中去掉
|
||||
p.refresh_cache(&rconn, true).await;
|
||||
}
|
||||
_ => return Err(APIError::PcError(NotAllowed)),
|
||||
}
|
||||
|
||||
// 如果是删除,需要也从0号缓存队列中去掉
|
||||
p.refresh_cache(&rconn, true).await;
|
||||
|
||||
Ok(json!({
|
||||
"code": 0
|
||||
}))
|
||||
|
||||
@@ -164,7 +164,7 @@ pub async fn edit_cw(cwi: Form<CwInput>, user: CurrentUser, db: Db, rconn: RdsCo
|
||||
}
|
||||
p.check_permission(&user, "w")?;
|
||||
p.update_cw(&db, cwi.cw.to_string()).await?;
|
||||
p.refresh_cache(&rconn, false);
|
||||
p.refresh_cache(&rconn, false).await;
|
||||
Ok(json!({"code": 0}))
|
||||
}
|
||||
|
||||
|
||||
28
src/cache.rs
28
src/cache.rs
@@ -48,7 +48,7 @@ impl PostCache {
|
||||
.get::<String, Option<String>>(key)
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
warn!("try to get post cache, connect rds fail, {}", e);
|
||||
warn!("try to get post cache, connect rds failed, {}", e);
|
||||
None
|
||||
});
|
||||
|
||||
@@ -74,7 +74,7 @@ impl PostCache {
|
||||
.get::<Vec<String>, Vec<Option<String>>>(ks)
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
warn!("try to get posts cache, connect rds fail, {}", e);
|
||||
warn!("try to get posts cache, connect rds failed, {}", e);
|
||||
vec![None; pids.len()]
|
||||
});
|
||||
// dbg!(&rds_result);
|
||||
@@ -96,6 +96,22 @@ impl PostCache {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn clear_all(&mut self) {
|
||||
let mut keys = self.rconn.scan_match::<String, String>(post_cache_key!("*")).await.unwrap(); //.collect::<Vec<String>>().await;
|
||||
// colllect() does not work
|
||||
// also see: https://github.com/mitsuhiko/redis-rs/issues/583
|
||||
let mut ks_for_del = Vec::new();
|
||||
while let Some(key) = keys.next_item().await {
|
||||
ks_for_del.push(key);
|
||||
}
|
||||
if ks_for_del.is_empty() {
|
||||
return;
|
||||
}
|
||||
self.rconn.del(ks_for_del).await.unwrap_or_else(|e| {
|
||||
warn!("clear all post cache fail, {}", e)
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PostCommentCache {
|
||||
@@ -228,7 +244,7 @@ impl PostListCommentCache {
|
||||
pub async fn put(&mut self, p: &Post) {
|
||||
// 其他都是加到最前面的,但热榜不是。可能导致MIN_LENGTH到MAX_LENGTH之间的数据不可靠
|
||||
// 影响不大,先不管了
|
||||
if p.is_deleted {
|
||||
if p.is_deleted || (self.mode > 0 && p.is_reported) {
|
||||
self.rconn.zrem(&self.key, p.id).await.unwrap_or_else(|e| {
|
||||
warn!(
|
||||
"remove from list cache failed, {} {} {}",
|
||||
@@ -256,6 +272,12 @@ impl PostListCommentCache {
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub async fn clear(&mut self) {
|
||||
self.rconn.del(&self.key).await.unwrap_or_else(|e| {
|
||||
warn!("clear post list cache failed, {}", e);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub struct UserCache {
|
||||
|
||||
@@ -1,7 +1,17 @@
|
||||
use rocket_sync_db_pools::{database, diesel};
|
||||
use diesel::Connection;
|
||||
use std::env;
|
||||
|
||||
pub type Conn = diesel::pg::PgConnection;
|
||||
|
||||
#[database("pg_v2")]
|
||||
pub struct Db(Conn);
|
||||
|
||||
|
||||
// get sync connection, only for annealing
|
||||
pub fn establish_connection() -> Conn {
|
||||
let database_url = env::var("DATABASE_URL")
|
||||
.expect("DATABASE_URL must be set");
|
||||
Conn::establish(&database_url)
|
||||
.expect(&format!("Error connecting to {}", database_url))
|
||||
}
|
||||
|
||||
16
src/main.rs
16
src/main.rs
@@ -11,20 +11,21 @@ extern crate diesel_migrations;
|
||||
extern crate log;
|
||||
|
||||
mod api;
|
||||
mod cache;
|
||||
mod db_conn;
|
||||
mod libs;
|
||||
mod models;
|
||||
mod random_hasher;
|
||||
mod rds_conn;
|
||||
mod rds_models;
|
||||
mod cache;
|
||||
mod schema;
|
||||
|
||||
use db_conn::{Conn, Db};
|
||||
use db_conn::{establish_connection, Conn, Db};
|
||||
use diesel::Connection;
|
||||
use random_hasher::RandomHasher;
|
||||
use rds_conn::init_rds_client;
|
||||
use std::env;
|
||||
use tokio::time::{interval, Duration};
|
||||
|
||||
embed_migrations!("migrations/postgres");
|
||||
|
||||
@@ -36,6 +37,15 @@ async fn main() -> Result<(), rocket::Error> {
|
||||
return Ok(());
|
||||
}
|
||||
env_logger::init();
|
||||
let rmc = init_rds_client().await;
|
||||
let rconn = rds_conn::RdsConn(rmc.clone());
|
||||
tokio::spawn(async move {
|
||||
let mut itv = interval(Duration::from_secs(4 * 60 * 60));
|
||||
loop {
|
||||
itv.tick().await;
|
||||
models::Post::annealing(establish_connection(), &rconn).await;
|
||||
}
|
||||
});
|
||||
rocket::build()
|
||||
.mount(
|
||||
"/_api/v1",
|
||||
@@ -56,7 +66,7 @@ async fn main() -> Result<(), rocket::Error> {
|
||||
)
|
||||
.register("/_api", catchers![api::catch_401_error])
|
||||
.manage(RandomHasher::get_random_one())
|
||||
.manage(init_rds_client().await)
|
||||
.manage(rmc)
|
||||
.attach(Db::fairing())
|
||||
.launch()
|
||||
.await
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
#![allow(clippy::all)]
|
||||
|
||||
use crate::cache::*;
|
||||
use crate::db_conn::Db;
|
||||
use crate::db_conn::{Conn, Db};
|
||||
use crate::libs::diesel_logger::LoggingConnection;
|
||||
use crate::rds_conn::RdsConn;
|
||||
use crate::schema::*;
|
||||
use chrono::{offset::Utc, DateTime};
|
||||
use diesel::dsl::any;
|
||||
use diesel::sql_types::*;
|
||||
use diesel::{
|
||||
insert_into, BoolExpressionMethods, ExpressionMethods, QueryDsl, QueryResult, RunQueryDsl,
|
||||
TextExpressionMethods,
|
||||
@@ -14,9 +15,10 @@ use diesel::{
|
||||
use rocket::futures::{future, join};
|
||||
use rocket::serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::convert::identity;
|
||||
|
||||
no_arg_sql_function!(RANDOM, (), "Represents the sql RANDOM() function");
|
||||
sql_function!(fn floor(x: Float) -> Int4);
|
||||
sql_function!(fn float4(x: Int4) -> Float);
|
||||
|
||||
macro_rules! _get {
|
||||
($table:ident) => {
|
||||
@@ -162,9 +164,7 @@ impl Post {
|
||||
let missing_ps = Self::_get_multi(db, missing_ids).await?;
|
||||
// dbg!(&missing_ps);
|
||||
|
||||
cacher
|
||||
.sets(&missing_ps.iter().map(identity).collect())
|
||||
.await;
|
||||
cacher.sets(&missing_ps.iter().collect()).await;
|
||||
|
||||
for p in missing_ps.into_iter() {
|
||||
if let Some(op) = id2po.get_mut(&p.id) {
|
||||
@@ -390,6 +390,17 @@ impl Post {
|
||||
})),
|
||||
);
|
||||
}
|
||||
|
||||
pub async fn annealing(mut c: Conn, rconn: &RdsConn) {
|
||||
info!("Time for annealing!");
|
||||
diesel::update(posts::table.filter(posts::hot_score.gt(10)))
|
||||
.set(posts::hot_score.eq(floor(float4(posts::hot_score) * 0.9)))
|
||||
.execute(with_log!(&mut c))
|
||||
.unwrap();
|
||||
|
||||
PostCache::init(&rconn).clear_all().await;
|
||||
PostListCommentCache::init(2, rconn).await.clear().await
|
||||
}
|
||||
}
|
||||
|
||||
impl User {
|
||||
|
||||
Reference in New Issue
Block a user