From 5bef37cd62a7261f154c54b335e1df82acd6044a Mon Sep 17 00:00:00 2001 From: hole-thu Date: Sun, 27 Mar 2022 01:35:46 +0800 Subject: [PATCH] feat: block and dangerous users --- src/api/comment.rs | 79 ++++++++++++++++++++++++--------------- src/api/mod.rs | 6 +++ src/api/operation.rs | 88 ++++++++++++++++++++++++++++++++------------ src/api/post.rs | 32 ++++++++++++---- src/cache.rs | 54 ++++++++++++--------------- src/main.rs | 1 + src/models.rs | 7 ++-- src/rds_models.rs | 88 +++++++++++++++++++++++++++++++++++++++++--- 8 files changed, 254 insertions(+), 101 deletions(-) diff --git a/src/api/comment.rs b/src/api/comment.rs index 667b5f3..306e241 100644 --- a/src/api/comment.rs +++ b/src/api/comment.rs @@ -8,6 +8,7 @@ use crate::schema; use chrono::{offset::Utc, DateTime}; use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; use rocket::form::Form; +use rocket::futures::future; use rocket::futures::join; use rocket::serde::{json::json, Serialize}; use std::collections::HashMap; @@ -30,39 +31,59 @@ pub struct CommentOutput { name_id: i32, is_tmp: bool, create_time: DateTime, + is_blocked: bool, + blocked_count: Option, // for old version frontend timestamp: i64, + blocked: bool, } -pub fn c2output<'r>(p: &'r Post, cs: &Vec, user: &CurrentUser) -> Vec { +pub async fn c2output<'r>( + p: &'r Post, + cs: &Vec, + user: &CurrentUser, + rconn: &RdsConn, +) -> Vec { let mut hash2id = HashMap::<&String, i32>::from([(&p.author_hash, 0)]); - cs.iter() - .filter_map(|c| { - let name_id: i32 = match hash2id.get(&c.author_hash) { - Some(id) => *id, - None => { - let x = hash2id.len().try_into().unwrap(); - hash2id.insert(&c.author_hash, x); - x - } - }; - if c.is_deleted { - // TODO: block - None - } else { - Some(CommentOutput { - cid: c.id, - text: format!("{}{}", if c.is_tmp { "[tmp]\n" } else { "" }, c.content), - author_title: c.author_title.to_string(), - can_del: c.check_permission(user, "wd").is_ok(), - name_id: name_id, - is_tmp: c.is_tmp, - create_time: c.create_time, - timestamp: c.create_time.timestamp(), - }) - } - }) - .collect() + let name_ids_iter = cs.iter().map(|c| match hash2id.get(&c.author_hash) { + Some(id) => *id, + None => { + let x = hash2id.len().try_into().unwrap(); + hash2id.insert(&c.author_hash, x); + x + } + }); + future::join_all(cs.iter().zip(name_ids_iter).map(|(c, name_id)| async move { + if c.is_deleted { + None + } else { + let is_blocked = + BlockedUsers::check_blocked(rconn, user.id, &user.namehash, &c.author_hash) + .await + .unwrap_or_default(); + Some(CommentOutput { + cid: c.id, + text: format!("{}{}", if c.is_tmp { "[tmp]\n" } else { "" }, c.content), + author_title: c.author_title.to_string(), + can_del: c.check_permission(user, "wd").is_ok(), + name_id: name_id, + is_tmp: c.is_tmp, + create_time: c.create_time, + is_blocked: is_blocked, + blocked_count: if user.is_admin { + BlockCounter::get_count(rconn, &c.author_hash).await.ok() + } else { + None + }, + timestamp: c.create_time.timestamp(), + blocked: is_blocked, + }) + } + })) + .await + .into_iter() + .filter_map(|x| x) + .collect() } #[get("/getcomment?")] @@ -72,7 +93,7 @@ pub async fn get_comment(pid: i32, user: CurrentUser, db: Db, rconn: RdsConn) -> return Err(APIError::PcError(IsDeleted)); } let cs = p.get_comments(&db, &rconn).await?; - let data = c2output(&p, &cs, &user); + let data = c2output(&p, &cs, &user, &rconn).await; Ok(json!({ "code": 0, diff --git a/src/api/mod.rs b/src/api/mod.rs index 192960c..a0d3aa0 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -12,6 +12,12 @@ use rocket::request::{FromRequest, Outcome, Request}; use rocket::response::{self, Responder}; use rocket::serde::json::{json, Value}; +macro_rules! code0 { + () => ( + Ok(json!({"code": 0})) + ); +} + #[catch(401)] pub fn catch_401_error() -> &'static str { "未登录或token过期" diff --git a/src/api/operation.rs b/src/api/operation.rs index 019a356..b06fd97 100644 --- a/src/api/operation.rs +++ b/src/api/operation.rs @@ -1,14 +1,14 @@ -use crate::api::{APIError, CurrentUser, JsonAPI, PolicyError::*, UGC}; +use crate::api::{CurrentUser, JsonAPI, PolicyError::*, UGC}; use crate::db_conn::Db; +use crate::libs::diesel_logger::LoggingConnection; use crate::models::*; use crate::rds_conn::RdsConn; use crate::rds_models::*; +use crate::schema; use chrono::offset::Local; +use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; use rocket::form::Form; use rocket::serde::json::json; -use crate::libs::diesel_logger::LoggingConnection; -use crate::schema; -use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; #[derive(FromForm)] pub struct DeleteInput { @@ -20,14 +20,11 @@ pub struct DeleteInput { #[post("/delete", data = "")] pub async fn delete(di: Form, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonAPI { - let mut p: Post; - let mut c: Comment; - let author_hash: &str; - match di.id_type.as_str() { + let (author_hash, p) = match di.id_type.as_str() { "cid" => { - c = Comment::get(&db, di.id).await?; + let mut c = Comment::get(&db, di.id).await?; c.soft_delete(&user, &db).await?; - p = Post::get(&db, &rconn, c.post_id).await?; + let mut p = Post::get(&db, &rconn, c.post_id).await?; update!( p, posts, @@ -39,20 +36,21 @@ pub async fn delete(di: Form, user: CurrentUser, db: Db, rconn: Rds p.refresh_cache(&rconn, false).await; p.clear_comments_cache(&rconn).await; - author_hash = &c.author_hash; + (c.author_hash.clone(), p) } "pid" => { - p = Post::get(&db, &rconn, di.id).await?; + let mut p = Post::get(&db, &rconn, di.id).await?; p.soft_delete(&user, &db).await?; + // 如果是删除,需要也从0号缓存队列中去掉 p.refresh_cache(&rconn, true).await; - author_hash = &p.author_hash; + (p.author_hash.clone(), p) } - _ => return Err(APIError::PcError(NotAllowed)), - } + _ => { Err(NotAllowed) }?, + }; - if user.is_admin && !user.namehash.eq(author_hash) { + if user.is_admin && !user.namehash.eq(&author_hash) { Systemlog { user_hash: user.namehash.clone(), action_type: LogType::AdminDelete, @@ -73,18 +71,17 @@ pub async fn delete(di: Form, user: CurrentUser, db: Db, rconn: Rds } .create(&rconn) .await?; - BannedUsers::add(&rconn, author_hash).await?; + BannedUsers::add(&rconn, &author_hash).await?; } } - Ok(json!({ - "code": 0 - })) + code0!() } #[derive(FromForm)] pub struct ReportInput { pid: i32, + #[field(validate = len(0..1000))] reason: String, } @@ -99,12 +96,57 @@ pub async fn report(ri: Form, user: CurrentUser, db: Db, rconn: Rds Systemlog { user_hash: user.namehash, action_type: LogType::Report, - target: format!("#{} {}", ri.pid, if ri.reason.starts_with("评论区") { "评论区" } else {""}), + target: format!( + "#{} {}", + ri.pid, + if ri.reason.starts_with("评论区") { + "评论区" + } else { + "" + } + ), detail: ri.reason.clone(), time: Local::now(), - }.create(&rconn) + } + .create(&rconn) .await?; + + code0!() +} + +#[derive(FromForm)] +pub struct BlockInput { + #[field(name = "type")] + content_type: String, + id: i32, +} + +#[post("/block", data = "")] +pub async fn block(bi: Form, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonAPI { + let mut blk = BlockedUsers::init(user.id.ok_or_else(|| NotAllowed)?, &rconn); + + let nh_to_block = match bi.content_type.as_str() { + "post" => Post::get(&db, &rconn, bi.id).await?.author_hash, + "comment" => Comment::get(&db, bi.id).await?.author_hash, + _ => { Err(NotAllowed) }?, + }; + + if nh_to_block.eq(&user.namehash) { + { Err(NotAllowed) }?; + } + + blk.add(&nh_to_block).await?; + let curr = BlockCounter::count_incr(&rconn, &nh_to_block).await?; + + if curr >= BLOCK_THRESHOLD || user.is_admin { + DangerousUser::add(&rconn, &nh_to_block).await?; + } + Ok(json!({ - "code": 0 + "code": 0, + "data": { + "curr": curr, + "threshold": BLOCK_THRESHOLD, + }, })) } diff --git a/src/api/post.rs b/src/api/post.rs index 4879e8c..5eda4f7 100644 --- a/src/api/post.rs +++ b/src/api/post.rs @@ -40,10 +40,13 @@ pub struct PostOutput { can_del: bool, attention: bool, hot_score: Option, + is_blocked: bool, + blocked_count: Option, // for old version frontend timestamp: i64, likenum: i32, reply: i32, + blocked: bool, } #[derive(FromForm)] @@ -54,6 +57,9 @@ pub struct CwInput { } async fn p2output(p: &Post, user: &CurrentUser, db: &Db, rconn: &RdsConn) -> PostOutput { + let is_blocked = BlockedUsers::check_blocked(rconn, user.id, &user.namehash, &p.author_hash) + .await + .unwrap_or_default(); PostOutput { pid: p.id, text: format!("{}{}", if p.is_tmp { "[tmp]\n" } else { "" }, p.content), @@ -70,10 +76,15 @@ async fn p2output(p: &Post, user: &CurrentUser, db: &Db, rconn: &RdsConn) -> Pos None } else { // 单个洞还有查询评论的接口,这里挂了不用报错 - p.get_comments(db, rconn) - .await - .ok() - .map(|cs| c2output(p, &cs, user)) + Some( + c2output( + p, + &p.get_comments(db, rconn).await.unwrap_or(vec![]), + user, + rconn, + ) + .await, + ) }, can_del: p.check_permission(user, "wd").is_ok(), attention: Attention::init(&user.namehash, &rconn) @@ -81,10 +92,17 @@ async fn p2output(p: &Post, user: &CurrentUser, db: &Db, rconn: &RdsConn) -> Pos .await .unwrap_or_default(), hot_score: user.is_admin.then(|| p.hot_score), + is_blocked: is_blocked, + blocked_count: if user.is_admin { + BlockCounter::get_count(rconn, &p.author_hash).await.ok() + } else { + None + }, // for old version frontend timestamp: p.create_time.timestamp(), likenum: p.n_attentions, reply: p.n_comments, + blocked: is_blocked, } } @@ -153,9 +171,7 @@ pub async fn publish_post( .await?; Attention::init(&user.namehash, &rconn).add(p.id).await?; p.refresh_cache(&rconn, true).await; - Ok(json!({ - "code": 0 - })) + code0!() } #[post("/editcw", data = "")] @@ -164,7 +180,7 @@ pub async fn edit_cw(cwi: Form, user: CurrentUser, db: Db, rconn: RdsCo p.check_permission(&user, "w")?; update!(p, posts, &db, { cw, to cwi.cw.to_string() }); p.refresh_cache(&rconn, false).await; - Ok(json!({"code": 0})) + code0!() } #[get("/getmulti?")] diff --git a/src/cache.rs b/src/cache.rs index ca04476..2a5e518 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -1,5 +1,6 @@ use crate::models::{Comment, Post, User}; use crate::rds_conn::RdsConn; +use crate::rds_models::init; use rand::Rng; use redis::AsyncCommands; use rocket::serde::json::serde_json; @@ -9,6 +10,7 @@ const INSTANCE_EXPIRE_TIME: usize = 60 * 60; const MIN_LENGTH: isize = 200; const MAX_LENGTH: isize = 900; +const CUT_LENGTH: isize = 100; macro_rules! post_cache_key { ($id: expr) => { @@ -21,11 +23,7 @@ pub struct PostCache { } impl PostCache { - pub fn init(rconn: &RdsConn) -> Self { - PostCache { - rconn: rconn.clone(), - } - } + init!(); pub async fn sets(&mut self, ps: &Vec<&Post>) { if ps.is_empty() { @@ -98,9 +96,13 @@ impl PostCache { } pub async fn clear_all(&mut self) { - let mut keys = self.rconn.scan_match::(post_cache_key!("*")).await.unwrap(); //.collect::>().await; - // colllect() does not work - // also see: https://github.com/mitsuhiko/redis-rs/issues/583 + let mut keys = self + .rconn + .scan_match::(post_cache_key!("*")) + .await + .unwrap(); //.collect::>().await; + // colllect() does not work + // also see: https://github.com/mitsuhiko/redis-rs/issues/583 let mut ks_for_del = Vec::new(); while let Some(key) = keys.next_item().await { ks_for_del.push(key); @@ -108,9 +110,10 @@ impl PostCache { if ks_for_del.is_empty() { return; } - self.rconn.del(ks_for_del).await.unwrap_or_else(|e| { - warn!("clear all post cache fail, {}", e) - }); + self.rconn + .del(ks_for_del) + .await + .unwrap_or_else(|e| warn!("clear all post cache fail, {}", e)); } } @@ -120,12 +123,7 @@ pub struct PostCommentCache { } impl PostCommentCache { - pub fn init(pid: i32, rconn: &RdsConn) -> Self { - PostCommentCache { - key: format!("hole_v2:cache:post_comments:{}", pid), - rconn: rconn.clone(), - } - } + init!(i32, "hole_v2:cache:post_comments:{}"); pub async fn set(&mut self, cs: &Vec) { self.rconn @@ -179,22 +177,20 @@ pub struct PostListCommentCache { } impl PostListCommentCache { - pub async fn init(mode: u8, rconn: &RdsConn) -> Self { - let mut cacher = PostListCommentCache { + pub fn init(mode: u8, rconn: &RdsConn) -> Self { + Self { key: format!("hole_v2:cache:post_list:{}", &mode), mode: mode, rconn: rconn.clone(), length: 0, - }; - cacher.set_and_check_length().await; - cacher + } } async fn set_and_check_length(&mut self) { let mut l = self.rconn.zcard(&self.key).await.unwrap(); if l > MAX_LENGTH { self.rconn - .zremrangebyrank::<&String, ()>(&self.key, MIN_LENGTH, -1) + .zremrangebyrank::<&String, ()>(&self.key, MAX_LENGTH - CUT_LENGTH, -1) .await .unwrap_or_else(|e| { warn!("cut list cache failed, {}, {}", e, &self.key); @@ -204,7 +200,8 @@ impl PostListCommentCache { self.length = l; } - pub fn need_fill(&self) -> bool { + pub async fn need_fill(&mut self) -> bool { + self.set_and_check_length().await; self.length < MIN_LENGTH } @@ -244,7 +241,7 @@ impl PostListCommentCache { pub async fn put(&mut self, p: &Post) { // 其他都是加到最前面的,但热榜不是。可能导致MIN_LENGTH到MAX_LENGTH之间的数据不可靠 // 影响不大,先不管了 - if p.is_deleted || (self.mode > 0 && p.is_reported) { + if p.is_deleted || (self.mode > 0 && p.is_reported) { self.rconn.zrem(&self.key, p.id).await.unwrap_or_else(|e| { warn!( "remove from list cache failed, {} {} {}", @@ -286,12 +283,7 @@ pub struct UserCache { } impl UserCache { - pub fn init(token: &str, rconn: &RdsConn) -> Self { - UserCache { - key: format!("hole_v2:cache:user:{}", token), - rconn: rconn.clone(), - } - } + init!(&str, "hole_v2:cache:user:{}"); pub async fn set(&mut self, u: &User) { self.rconn diff --git a/src/main.rs b/src/main.rs index f1cb828..b06fb32 100644 --- a/src/main.rs +++ b/src/main.rs @@ -66,6 +66,7 @@ async fn main() -> Result<(), rocket::Error> { api::systemlog::get_systemlog, api::operation::delete, api::operation::report, + api::operation::block, ], ) .register( diff --git a/src/models.rs b/src/models.rs index afba01f..5b57a73 100644 --- a/src/models.rs +++ b/src/models.rs @@ -218,8 +218,8 @@ impl Post { start: i64, limit: i64, ) -> QueryResult> { - let mut cacher = PostListCommentCache::init(order_mode, &rconn).await; - if cacher.need_fill() { + let mut cacher = PostListCommentCache::init(order_mode, &rconn); + if cacher.need_fill().await { let pids = Self::_get_ids_by_page(db, order_mode.clone(), 0, cacher.i64_minlen()).await?; let ps = Self::get_multi(db, rconn, &pids).await?; @@ -327,7 +327,6 @@ impl Post { self.set_instance_cache(rconn), future::join_all((if is_new { 0..4 } else { 1..4 }).map(|mode| async move { PostListCommentCache::init(mode, &rconn.clone()) - .await .put(self) .await })), @@ -342,7 +341,7 @@ impl Post { .unwrap(); PostCache::init(&rconn).clear_all().await; - PostListCommentCache::init(2, rconn).await.clear().await + PostListCommentCache::init(2, rconn).clear().await } } diff --git a/src/rds_models.rs b/src/rds_models.rs index ea85c02..6c1035c 100644 --- a/src/rds_models.rs +++ b/src/rds_models.rs @@ -4,9 +4,31 @@ use redis::{AsyncCommands, RedisResult}; use rocket::serde::json::serde_json; use rocket::serde::{Deserialize, Serialize}; +macro_rules! init { + ($ktype:ty, $formatter:expr) => { + pub fn init(k: $ktype, rconn: &RdsConn) -> Self { + Self { + key: format!($formatter, k), + rconn: rconn.clone(), + } + } + }; + () => { + pub fn init(rconn: &RdsConn) -> Self { + Self { + rconn: rconn.clone(), + } + } + }; +} + const KEY_SYSTEMLOG: &str = "hole_v2:systemlog_list"; const KEY_BANNED_USERS: &str = "hole_v2:banned_user_hash_list"; +const KEY_BLOCKED_COUNTER: &str = "hole_v2:blocked_counter"; +const KEY_DANGEROUS_USERS: &str = "hole_thu:dangerous_users"; //兼容一下旧版 + const SYSTEMLOG_MAX_LEN: isize = 1000; +pub const BLOCK_THRESHOLD: i32 = 10; pub struct Attention { key: String, @@ -14,12 +36,7 @@ pub struct Attention { } impl Attention { - pub fn init(namehash: &str, rconn: &RdsConn) -> Self { - Attention { - key: format!("hole_v2:attention:{}", namehash), - rconn: rconn.clone(), - } - } + init!(&str, "hole_v2:attention:{}"); pub async fn add(&mut self, pid: i32) -> RedisResult<()> { self.rconn.sadd(&self.key, pid).await @@ -108,3 +125,62 @@ impl BannedUsers { rconn.clone().del(KEY_BANNED_USERS).await } } + +pub struct BlockedUsers { + pub key: String, + rconn: RdsConn, +} + +impl BlockedUsers { + init!(i32, "hole_v2:blocked_users:{}"); + + pub async fn add(&mut self, namehash: &str) -> RedisResult<()> { + self.rconn.sadd(&self.key, namehash).await + } + + pub async fn has(&mut self, namehash: &str) -> RedisResult { + self.rconn.sismember(&self.key, namehash).await + } + + pub async fn check_blocked( + rconn: &RdsConn, + viewer_id: Option, + viewer_hash: &str, + author_hash: &str, + ) -> RedisResult { + Ok(match viewer_id { + Some(id) => Self::init(id, rconn).has(author_hash).await?, + None => false, + } || (DangerousUser::has(rconn, author_hash).await? + && !DangerousUser::has(rconn,viewer_hash).await?)) + } +} + +pub struct BlockCounter; + +impl BlockCounter { + pub async fn count_incr(rconn: &RdsConn, namehash: &str) -> RedisResult { + rconn.clone().hincr(KEY_BLOCKED_COUNTER, namehash, 1).await + } + + pub async fn get_count(rconn: &RdsConn, namehash: &str) -> RedisResult { + rconn.clone().hget(KEY_BLOCKED_COUNTER, namehash).await + } +} + +pub struct DangerousUser; + +impl DangerousUser { + pub async fn add(rconn: &RdsConn, namehash: &str) -> RedisResult<()> { + rconn + .clone() + .sadd::<&str, &str, ()>(KEY_DANGEROUS_USERS, namehash) + .await + } + + pub async fn has(rconn: &RdsConn, namehash: &str) -> RedisResult { + rconn.clone().sismember(KEY_DANGEROUS_USERS, namehash).await + } +} + +pub(crate) use init;