diff --git a/Cargo.toml b/Cargo.toml index 680cb04..1504024 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,8 +8,13 @@ license = "AGPL-3.0" [dependencies] rocket = { version = "0.5.0-rc.1", features = ["json"] } -diesel = { version = "1.4.8", features= ["sqlite", "chrono", "r2d2"] } -chrono = { version="0.4", features=["serde"] } -rand = "0.8.5" -dotenv = "0.15.0" -sha2 = "0.10.2" +diesel = { version = "1.4.8", features = ["sqlite", "chrono", "r2d2"] } +redis = { version="0.21.5", features = ["aio", "async-std-comp"] } +chrono = { version="0.*", features =["serde"] } +rand = "0.*" +dotenv = "0.*" +sha2 = "0.*" + +[dependencies.rocket_sync_db_pools] +version = "0.1.0-rc.1" +features = ["diesel_sqlite_pool"] diff --git a/migrations/2022-03-11-065048_create_posts/up.sql b/migrations/2022-03-11-065048_create_posts/up.sql index 9560e62..91a785a 100644 --- a/migrations/2022-03-11-065048_create_posts/up.sql +++ b/migrations/2022-03-11-065048_create_posts/up.sql @@ -6,7 +6,8 @@ CREATE TABLE posts ( content TEXT NOT NULL, cw VARCHAR NOT NULL DEFAULT '', author_title VARCHAR NOT NULL DEFAULT '', - n_likes INTEGER NOT NULL DEFAULT 0, + is_tmp BOOLEAN NOT NULL DEFAULT FALSE, + n_attentions INTEGER NOT NULL DEFAULT 0, n_comments INTEGER NOT NULL DEFAULT 0, create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, last_comment_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, diff --git a/migrations/2022-03-15-104943_create_comments/up.sql b/migrations/2022-03-15-104943_create_comments/up.sql index 2f87559..68fca2d 100644 --- a/migrations/2022-03-15-104943_create_comments/up.sql +++ b/migrations/2022-03-15-104943_create_comments/up.sql @@ -4,11 +4,12 @@ CREATE TABLE comments ( id INTEGER NOT NULL PRIMARY KEY, author_hash VARCHAR NOT NULL, author_title VARCHAR(10) NOT NULL DEFAULT '', + is_tmp BOOLEAN NOT NULL DEFAULT FALSE, content TEXT NOT NULL, create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, is_deleted BOOLEAN NOT NULL DEFAULT FALSE, post_id INTEGER NOT NULL, - FOREIGN KEY(post_id) REFERENCES post(id) + FOREIGN KEY(post_id) REFERENCES posts(id) ); CREATE INDEX comments_postId_idx ON comments (`post_id`); diff --git a/src/api/attention.rs b/src/api/attention.rs new file mode 100644 index 0000000..911eb9f --- /dev/null +++ b/src/api/attention.rs @@ -0,0 +1,63 @@ +use crate::api::post::ps2outputs; +use crate::api::{APIError, CurrentUser, MapToAPIError, PolicyError::*, API, UGC}; +use crate::db_conn::Db; +use crate::models::*; +use crate::rds_conn::RdsConn; +use crate::rds_models::*; +use rocket::form::Form; +use rocket::serde::json::{json, Value}; + +#[derive(FromForm)] +pub struct AttentionInput { + pid: i32, + #[field(validate = range(0..2))] + switch: i32, +} + +#[post("/attention", data = "")] +pub async fn attention_post( + ai: Form, + user: CurrentUser, + db: Db, + rconn: RdsConn, +) -> API { + user.id.ok_or_else(|| APIError::PcError(NotAllowed))?; + let p = Post::get(&db, ai.pid).await.m()?; + p.check_permission(&user, "r")?; + let mut att = Attention::init(&user.namehash, rconn); + let switch_to = ai.switch == 1; + let mut delta: i32 = 0; + if att.has(ai.pid).await.m()? != switch_to { + if switch_to { + att.add(ai.pid).await.m()?; + delta = 1; + } else { + att.remove(ai.pid).await.m()?; + delta = -1; + } + p.change_n_attentions(&db, delta).await.m()?; + } + + Ok(json!({ + "code": 0, + "attention": ai.switch == 1, + "n_attentions": p.n_attentions + delta, + // for old version frontend + "likenum": p.n_attentions + delta, + })) +} + +#[get("/getattention")] +pub async fn get_attention(user: CurrentUser, db: Db, rconn: RdsConn) -> API { + let ids = Attention::init(&user.namehash, rconn.clone()) + .all() + .await + .m()?; + let ps = Post::get_multi(&db, ids).await.m()?; + let ps_data = ps2outputs(&ps, &user, &db, rconn.clone()).await; + + Ok(json!({ + "code": 0, + "data": ps_data, + })) +} diff --git a/src/api/comment.rs b/src/api/comment.rs index 2efc735..b1d372e 100644 --- a/src/api/comment.rs +++ b/src/api/comment.rs @@ -1,6 +1,8 @@ -use crate::api::{APIError, CurrentUser, PolicyError::*, API}; -use crate::db_conn::DbConn; +use crate::api::{APIError, CurrentUser, MapToAPIError, PolicyError::*, API}; +use crate::db_conn::Db; use crate::models::*; +use crate::rds_conn::RdsConn; +use crate::rds_models::*; use chrono::NaiveDateTime; use rocket::form::Form; use rocket::serde::{ @@ -10,10 +12,10 @@ use rocket::serde::{ use std::collections::HashMap; #[derive(FromForm)] -pub struct CommentInput<'r> { +pub struct CommentInput { pid: i32, #[field(validate = len(1..4097))] - text: &'r str, + text: String, use_title: Option, } @@ -24,12 +26,13 @@ pub struct CommentOutput { text: String, can_del: bool, name_id: i32, + is_tmp: bool, create_time: NaiveDateTime, // for old version frontend timestamp: i64, } -pub fn c2output(p: &Post, cs: &Vec, user: &CurrentUser) -> Vec { +pub fn c2output<'r>(p: &'r Post, cs: &Vec, user: &CurrentUser) -> Vec { let mut hash2id = HashMap::<&String, i32>::from([(&p.author_hash, 0)]); cs.iter() .filter_map(|c| { @@ -41,19 +44,16 @@ pub fn c2output(p: &Post, cs: &Vec, user: &CurrentUser) -> Vec, user: &CurrentUser) -> Vec")] -pub fn get_comment(pid: i32, user: CurrentUser, conn: DbConn) -> API { - let p = Post::get(&conn, pid).map_err(APIError::from_db)?; +pub async fn get_comment(pid: i32, user: CurrentUser, db: Db, rconn: RdsConn) -> API { + let p = Post::get(&db, pid).await.m()?; if p.is_deleted { return Err(APIError::PcError(IsDeleted)); } - let cs = p.get_comments(&conn).map_err(APIError::from_db)?; + let pid = p.id; + let cs = Comment::gets_by_post_id(&db, pid).await.m()?; + let data = c2output(&p, &cs, &user); + Ok(json!({ "code": 0, - "data": c2output(&p, &cs, &user), - "n_likes": p.n_likes, + "data": data, + "n_attentions": p.n_attentions, // for old version frontend - "likenum": p.n_likes, + "likenum": p.n_attentions, + "attention": Attention::init(&user.namehash, rconn.clone()).has(p.id).await.m()? , })) } #[post("/docomment", data = "")] -pub fn add_comment(ci: Form, user: CurrentUser, conn: DbConn) -> API { - let p = Post::get(&conn, ci.pid).map_err(APIError::from_db)?; +pub async fn add_comment( + ci: Form, + user: CurrentUser, + db: Db, + rconn: RdsConn, +) -> API { + let p = Post::get(&db, ci.pid).await.m()?; Comment::create( - &conn, + &db, NewComment { - content: &ci.text, - author_hash: &user.namehash, - author_title: "", + content: ci.text.to_string(), + author_hash: user.namehash.to_string(), + author_title: "".to_string(), + is_tmp: user.id.is_none(), post_id: ci.pid, }, ) - .map_err(APIError::from_db)?; - p.after_add_comment(&conn).map_err(APIError::from_db)?; + .await + .m()?; + p.change_n_comments(&db, 1).await.m()?; + // auto attention after comment + let mut att = Attention::init(&user.namehash, rconn); + if !att.has(p.id).await.m()? { + att.add(p.id).await.m()?; + p.change_n_attentions(&db, 1).await.m()?; + } Ok(json!({ "code": 0 })) diff --git a/src/api/mod.rs b/src/api/mod.rs index 60eadf3..30b6faf 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,10 +1,11 @@ -use crate::db_conn::{Conn, DbPool}; +use crate::db_conn::Db; use crate::models::*; use crate::random_hasher::RandomHasher; use rocket::http::Status; +use rocket::outcome::try_outcome; use rocket::request::{FromRequest, Outcome, Request}; use rocket::response::{self, Responder}; -use rocket::serde::json::json; +use rocket::serde::json::{json, Value}; #[catch(401)] pub fn catch_401_error() -> &'static str { @@ -36,8 +37,8 @@ impl<'r> FromRequest<'r> for CurrentUser { is_admin: false, }); } else { - let conn = request.rocket().state::().unwrap().get().unwrap(); - if let Some(user) = User::get_by_token(&conn, token) { + let db = try_outcome!(request.guard::().await); + if let Some(user) = User::get_by_token(&db, token).await { let namehash = rh.hash_with_salt(&user.name); cu = Some(CurrentUser { id: Some(user.id), @@ -55,14 +56,17 @@ impl<'r> FromRequest<'r> for CurrentUser { } } +#[derive(Debug)] pub enum PolicyError { IsReported, IsDeleted, NotAllowed, } +#[derive(Debug)] pub enum APIError { DbError(diesel::result::Error), + RdsError(redis::RedisError), PcError(PolicyError), } @@ -70,16 +74,26 @@ impl APIError { fn from_db(err: diesel::result::Error) -> APIError { APIError::DbError(err) } + + fn from_rds(err: redis::RedisError) -> APIError { + APIError::RdsError(err) + } } impl<'r> Responder<'r, 'static> for APIError { fn respond_to(self, req: &'r Request<'_>) -> response::Result<'static> { + dbg!(&self); match self { APIError::DbError(e) => json!({ "code": -1, "msg": e.to_string() }) .respond_to(req), + APIError::RdsError(e) => json!({ + "code": -1, + "msg": e.to_string() + }) + .respond_to(req), APIError::PcError(e) => json!({ "code": -1, "msg": match e { @@ -93,12 +107,35 @@ impl<'r> Responder<'r, 'static> for APIError { } } +pub type API = Result; +pub type JsonAPI = API; + +pub trait MapToAPIError { + type Data; + fn m(self) -> API; +} + +impl MapToAPIError for redis::RedisResult { + type Data = T; + fn m(self) -> API { + Ok(self.map_err(APIError::from_rds)?) + } +} + +impl MapToAPIError for diesel::QueryResult { + type Data = T; + fn m(self) -> API { + Ok(self.map_err(APIError::from_db)?) + } +} + +#[rocket::async_trait] pub trait UGC { fn get_author_hash(&self) -> &str; fn get_is_deleted(&self) -> bool; fn get_is_reported(&self) -> bool; fn extra_delete_condition(&self) -> bool; - fn do_set_deleted(&self, conn: &Conn) -> API<()>; + async fn do_set_deleted(&self, db: &Db) -> API; fn check_permission(&self, user: &CurrentUser, mode: &str) -> API<()> { if user.is_admin { return Ok(()); @@ -118,14 +155,15 @@ pub trait UGC { Ok(()) } - fn soft_delete(&self, user: &CurrentUser, conn: &Conn) -> API<()> { + async fn soft_delete(&self, user: &CurrentUser, db: &Db) -> API<()> { self.check_permission(user, "rwd")?; - self.do_set_deleted(conn)?; + let _ = self.do_set_deleted(db).await?; Ok(()) } } +#[rocket::async_trait] impl UGC for Post { fn get_author_hash(&self) -> &str { &self.author_hash @@ -139,11 +177,12 @@ impl UGC for Post { fn extra_delete_condition(&self) -> bool { self.n_comments == 0 } - fn do_set_deleted(&self, conn: &Conn) -> API<()> { - self.set_deleted(conn).map_err(APIError::from_db) + async fn do_set_deleted(&self, db: &Db) -> API { + self.set_deleted(db).await.m() } } +#[rocket::async_trait] impl UGC for Comment { fn get_author_hash(&self) -> &str { &self.author_hash @@ -157,8 +196,8 @@ impl UGC for Comment { fn extra_delete_condition(&self) -> bool { true } - fn do_set_deleted(&self, conn: &Conn) -> API<()> { - self.set_deleted(conn).map_err(APIError::from_db) + async fn do_set_deleted(&self, db: &Db) -> API { + self.set_deleted(db).await.m() } } @@ -168,8 +207,7 @@ macro_rules! look { }; } -pub type API = Result; - +pub mod attention; pub mod comment; pub mod operation; pub mod post; diff --git a/src/api/operation.rs b/src/api/operation.rs index b9d51a1..25e978f 100644 --- a/src/api/operation.rs +++ b/src/api/operation.rs @@ -1,27 +1,28 @@ -use crate::api::{APIError, CurrentUser, PolicyError::*, API, UGC}; -use crate::db_conn::DbConn; +use crate::api::{APIError, CurrentUser, PolicyError::*, API, UGC, MapToAPIError}; +use crate::db_conn::Db; use crate::models::*; use rocket::form::Form; use rocket::serde::json::{json, Value}; #[derive(FromForm)] -pub struct DeleteInput<'r> { +pub struct DeleteInput { #[field(name = "type")] - id_type: &'r str, + id_type: String, id: i32, - note: &'r str, + note: String, } #[post("/delete", data = "")] -pub fn delete(di: Form, user: CurrentUser, conn: DbConn) -> API { - match di.id_type { +pub async fn delete(di: Form, user: CurrentUser, db: Db) -> API { + match di.id_type.as_str() { "cid" => { - let c = Comment::get(&conn, di.id).map_err(APIError::from_db)?; - c.soft_delete(&user, &conn)?; + let c = Comment::get(&db, di.id).await.m()?; + c.soft_delete(&user, &db).await?; } "pid" => { - let p = Post::get(&conn, di.id).map_err(APIError::from_db)?; - p.soft_delete(&user, &conn)?; + let p = Post::get(&db, di.id).await.m()?; + p.soft_delete(&user, &db).await?; + p.change_n_comments(&db, -1).await.m()?; } _ => return Err(APIError::PcError(NotAllowed)), } diff --git a/src/api/post.rs b/src/api/post.rs index c5d17c8..78192b9 100644 --- a/src/api/post.rs +++ b/src/api/post.rs @@ -1,20 +1,20 @@ use crate::api::comment::{c2output, CommentOutput}; -use crate::api::{APIError, CurrentUser, PolicyError::*, API, UGC}; -use crate::db_conn::DbConn; +use crate::api::{APIError, CurrentUser, JsonAPI, MapToAPIError, PolicyError::*, UGC}; +use crate::db_conn::Db; use crate::models::*; +use crate::rds_conn::RdsConn; +use crate::rds_models::*; use chrono::NaiveDateTime; use rocket::form::Form; -use rocket::serde::{ - json::{json, Value}, - Serialize, -}; +use rocket::futures::future; +use rocket::serde::{json::json, Serialize}; #[derive(FromForm)] -pub struct PostInput<'r> { +pub struct PostInput { #[field(validate = len(1..4097))] - text: &'r str, + text: String, #[field(validate = len(0..33))] - cw: &'r str, + cw: String, allow_search: Option, use_title: Option, } @@ -26,7 +26,8 @@ pub struct PostOutput { text: String, cw: Option, custom_title: Option, - n_likes: i32, + is_tmp: bool, + n_attentions: i32, n_comments: i32, create_time: NaiveDateTime, last_comment_time: NaiveDateTime, @@ -34,6 +35,7 @@ pub struct PostOutput { is_reported: Option, comments: Option>, can_del: bool, + attention: bool, // for old version frontend timestamp: i64, likenum: i32, @@ -41,23 +43,22 @@ pub struct PostOutput { } #[derive(FromForm)] -pub struct CwInput<'r> { +pub struct CwInput { pid: i32, #[field(validate = len(0..33))] - cw: &'r str, + cw: String, } -fn p2output(p: &Post, user: &CurrentUser, conn: &DbConn) -> PostOutput { +async fn p2output(p: &Post, user: &CurrentUser, db: &Db, rconn: RdsConn) -> PostOutput { PostOutput { pid: p.id, - text: p.content.to_string(), - + text: format!("{}{}", if p.is_tmp { "[tmp]\n" } else { "" }, p.content), cw: if p.cw.len() > 0 { Some(p.cw.to_string()) } else { None }, - n_likes: p.n_likes, + n_attentions: p.n_attentions, n_comments: p.n_comments, create_time: p.create_time, last_comment_time: p.last_comment_time, @@ -67,6 +68,7 @@ fn p2output(p: &Post, user: &CurrentUser, conn: &DbConn) -> PostOutput { } else { None }, + is_tmp: p.is_tmp, is_reported: if user.is_admin { Some(p.is_reported) } else { @@ -76,34 +78,59 @@ fn p2output(p: &Post, user: &CurrentUser, conn: &DbConn) -> PostOutput { None } else { // 单个洞还有查询评论的接口,这里挂了不用报错 - Some(c2output(p, &p.get_comments(conn).unwrap_or(vec![]), user)) + let pid = p.id; + if let Some(cs) = Comment::gets_by_post_id(db, pid).await.ok() { + Some(c2output(p, &cs, user)) + } else { + None + } }, can_del: user.is_admin || p.author_hash == user.namehash, + attention: Attention::init(&user.namehash, rconn.clone()) + .has(p.id) + .await + .unwrap_or_default(), // for old version frontend timestamp: p.create_time.timestamp(), - likenum: p.n_likes, + likenum: p.n_attentions, reply: p.n_comments, } } +pub async fn ps2outputs( + ps: &Vec, + user: &CurrentUser, + db: &Db, + rconn: RdsConn, +) -> Vec { + future::join_all( + ps.iter() + .map(|p| async { p2output(p, &user, &db, rconn.clone()).await }), + ) + .await +} + #[get("/getone?")] -pub fn get_one(pid: i32, user: CurrentUser, conn: DbConn) -> API { - let p = Post::get(&conn, pid).map_err(APIError::from_db)?; +pub async fn get_one(pid: i32, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonAPI { + let p = Post::get(&db, pid).await.m()?; p.check_permission(&user, "ro")?; Ok(json!({ - "data": p2output(&p, &user, &conn), + "data": p2output(&p, &user,&db, rconn).await, "code": 0, })) } #[get("/getlist?

&")] -pub fn get_list(p: Option, order_mode: u8, user: CurrentUser, conn: DbConn) -> API { +pub async fn get_list( + p: Option, + order_mode: u8, + user: CurrentUser, + db: Db, + rconn: RdsConn, +) -> JsonAPI { let page = p.unwrap_or(1); - let ps = Post::gets_by_page(&conn, order_mode, page, 25).map_err(APIError::from_db)?; - let ps_data = ps - .iter() - .map(|p| p2output(p, &user, &conn)) - .collect::>(); + let ps = Post::gets_by_page(&db, order_mode, page, 25).await.m()?; + let ps_data = ps2outputs(&ps, &user, &db, rconn.clone()).await; Ok(json!({ "data": ps_data, "count": ps_data.len(), @@ -112,19 +139,22 @@ pub fn get_list(p: Option, order_mode: u8, user: CurrentUser, conn: DbConn) } #[post("/dopost", data = "")] -pub fn publish_post(poi: Form, user: CurrentUser, conn: DbConn) -> API { - dbg!(poi.use_title, poi.allow_search); +pub async fn publish_post(poi: Form, user: CurrentUser, db: Db) -> JsonAPI { let r = Post::create( - &conn, + &db, NewPost { - content: &poi.text, - cw: &poi.cw, - author_hash: &user.namehash, - author_title: "", + content: poi.text.to_string(), + cw: poi.cw.to_string(), + author_hash: user.namehash.to_string(), + author_title: "".to_string(), + is_tmp: user.id.is_none(), + n_attentions: 1, allow_search: poi.allow_search.is_some(), }, ) - .map_err(APIError::from_db)?; + .await + .m()?; + // TODO: attention Ok(json!({ "data": r, "code": 0 @@ -132,12 +162,23 @@ pub fn publish_post(poi: Form, user: CurrentUser, conn: DbConn) -> AP } #[post("/editcw", data = "")] -pub fn edit_cw(cwi: Form, user: CurrentUser, conn: DbConn) -> API { - let p = Post::get(&conn, cwi.pid).map_err(APIError::from_db)?; +pub async fn edit_cw(cwi: Form, user: CurrentUser, db: Db) -> JsonAPI { + let p = Post::get(&db, cwi.pid).await.m()?; if !(user.is_admin || p.author_hash == user.namehash) { return Err(APIError::PcError(NotAllowed)); } p.check_permission(&user, "w")?; - _ = p.update_cw(&conn, cwi.cw); + _ = p.update_cw(&db, cwi.cw.to_string()).await.m()?; Ok(json!({"code": 0})) } + +#[get("/getmulti?")] +pub async fn get_multi(pids: Vec, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonAPI { + let ps = Post::get_multi(&db, pids).await.m()?; + let ps_data = ps2outputs(&ps, &user, &db, rconn.clone()).await; + + Ok(json!({ + "code": 0, + "data": ps_data, + })) +} diff --git a/src/api/systemlog.rs b/src/api/systemlog.rs index 8c879e8..a909a89 100644 --- a/src/api/systemlog.rs +++ b/src/api/systemlog.rs @@ -2,10 +2,10 @@ use crate::api::{CurrentUser, API}; use crate::random_hasher::RandomHasher; use rocket::serde::json::{json, Value}; use rocket::State; -use crate::db_conn::DbConn; +use crate::db_conn::Db; #[get("/systemlog")] -pub fn get_systemlog(user: CurrentUser, rh: &State, conn: DbConn) -> API { +pub async fn get_systemlog(user: CurrentUser, rh: &State, db: Db) -> API { Ok(json!({ "tmp_token": rh.get_tmp_token(), "salt": look!(rh.salt), diff --git a/src/db_conn.rs b/src/db_conn.rs index 642ae55..2a7ef2f 100644 --- a/src/db_conn.rs +++ b/src/db_conn.rs @@ -1,38 +1,7 @@ -use diesel::r2d2::{ConnectionManager, Pool, PooledConnection}; -use std::env; -use std::ops::Deref; -use rocket::http::Status; -use rocket::request::{FromRequest, Request, Outcome}; +use rocket_sync_db_pools::{database, diesel}; pub type Conn = diesel::SqliteConnection; -pub type DbPool = Pool>; -pub struct DbConn(pub PooledConnection>); -#[rocket::async_trait] -impl<'r> FromRequest<'r> for DbConn { - type Error = (); - async fn from_request(request: &'r Request<'_>) -> Outcome { - let pool = request.rocket().state::().unwrap(); - match pool.get() { - Ok(conn) => Outcome::Success(DbConn(conn)), - Err(_) => Outcome::Failure((Status::ServiceUnavailable, ())), - } - } -} +#[database("sqlite_v2")] +pub struct Db(Conn); -// For the convenience of using an &DbConn as an &Connection. -impl Deref for DbConn { - type Target = Conn; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -pub fn init_pool() -> DbPool { - let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); - let manager = ConnectionManager::::new(database_url); - Pool::builder() - .build(manager) - .expect("database poll init fail") -} diff --git a/src/main.rs b/src/main.rs index d249da2..935178f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,15 +6,18 @@ extern crate diesel; mod api; mod db_conn; +mod rds_conn; mod models; +mod rds_models; mod random_hasher; mod schema; -use db_conn::init_pool; +use db_conn::Db; +use rds_conn::init_rds_client; use random_hasher::RandomHasher; -#[launch] -fn rocket() -> _ { +#[rocket::main] +async fn main() -> Result<(), rocket::Error> { load_env(); rocket::build() .mount( @@ -26,13 +29,19 @@ fn rocket() -> _ { api::post::get_one, api::post::publish_post, api::post::edit_cw, + api::post::get_multi, + api::attention::attention_post, + api::attention::get_attention, api::systemlog::get_systemlog, api::operation::delete, ], ) .register("/_api", catchers![api::catch_401_error]) .manage(RandomHasher::get_random_one()) - .manage(init_pool()) + .manage(init_rds_client().await) + .attach(Db::fairing()) + .launch() + .await } fn load_env() { diff --git a/src/models.rs b/src/models.rs index 0d88f10..18c6066 100644 --- a/src/models.rs +++ b/src/models.rs @@ -1,30 +1,41 @@ #![allow(clippy::all)] use chrono::NaiveDateTime; -use diesel::{insert_into, ExpressionMethods, QueryDsl, RunQueryDsl}; +use diesel::{insert_into, ExpressionMethods, QueryDsl, QueryResult, RunQueryDsl}; -use crate::db_conn::Conn; +use crate::db_conn::Db; use crate::schema::*; -type MR = Result; - no_arg_sql_function!(RANDOM, (), "Represents the sql RANDOM() function"); macro_rules! get { ($table:ident) => { - pub fn get(conn: &Conn, id: i32) -> MR { - $table::table.find(id).first(conn) + pub async fn get(db: &Db, id: i32) -> QueryResult { + let pid = id; + db.run(move |c| $table::table.find(pid).first(c)).await + } + }; +} + +macro_rules! get_multi { + ($table:ident) => { + pub async fn get_multi(db: &Db, ids: Vec) -> QueryResult> { + db.run(move |c| $table::table.filter($table::id.eq_any(ids)).load(c)) + .await } }; } macro_rules! set_deleted { ($table:ident) => { - pub fn set_deleted(&self, conn: &Conn) -> MR<()> { - diesel::update(self) - .set($table::is_deleted.eq(true)) - .execute(conn)?; - Ok(()) + pub async fn set_deleted(&self, db: &Db) -> QueryResult { + let pid = self.id; + db.run(move |c| { + diesel::update($table::table.find(pid)) + .set($table::is_deleted.eq(true)) + .execute(c) + }) + .await } }; } @@ -36,7 +47,8 @@ pub struct Post { pub content: String, pub cw: String, pub author_title: String, - pub n_likes: i32, + pub is_tmp: bool, + pub n_attentions: i32, pub n_comments: i32, pub create_time: NaiveDateTime, pub last_comment_time: NaiveDateTime, @@ -48,11 +60,13 @@ pub struct Post { #[derive(Insertable)] #[table_name = "posts"] -pub struct NewPost<'a> { - pub content: &'a str, - pub cw: &'a str, - pub author_hash: &'a str, - pub author_title: &'a str, +pub struct NewPost { + pub content: String, + pub cw: String, + pub author_hash: String, + pub author_title: String, + pub is_tmp: bool, + pub n_attentions: i32, pub allow_search: bool, // TODO: tags } @@ -60,48 +74,72 @@ pub struct NewPost<'a> { impl Post { get!(posts); - set_deleted!(posts); + get_multi!(posts); - pub fn gets_by_page(conn: &Conn, order_mode: u8, page: u32, page_size: u32) -> MR> { - let mut query = posts::table.into_boxed(); - query = query.filter(posts::is_deleted.eq(false)); - if order_mode > 0 { - query = query.filter(posts::is_reported.eq(false)) - } + set_deleted!(posts); - match order_mode { - 1 => query = query.order(posts::last_comment_time.desc()), - 2 => query = query.order(posts::hot_score.desc()), - 3 => query = query.order(RANDOM), - _ => query = query.order(posts::id.desc()), - } - query - .offset(((page - 1) * page_size).into()) - .limit(page_size.into()) - .load(conn) + pub async fn gets_by_page( + db: &Db, + order_mode: u8, + page: u32, + page_size: u32, + ) -> QueryResult> { + db.run(move |c| { + let mut query = posts::table.into_boxed(); + query = query.filter(posts::is_deleted.eq(false)); + if order_mode > 0 { + query = query.filter(posts::is_reported.eq(false)) + } + + match order_mode { + 1 => query = query.order(posts::last_comment_time.desc()), + 2 => query = query.order(posts::hot_score.desc()), + 3 => query = query.order(RANDOM), + _ => query = query.order(posts::id.desc()), + } + + query + .offset(((page - 1) * page_size).into()) + .limit(page_size.into()) + .load(c) + }) + .await } - pub fn get_comments(&self, conn: &Conn) -> MR> { - comments::table - .filter(comments::post_id.eq(self.id)) - .load(conn) + pub async fn create(db: &Db, new_post: NewPost) -> QueryResult { + // TODO: tags + db.run(move |c| insert_into(posts::table).values(&new_post).execute(c)) + .await } - pub fn create(conn: &Conn, new_post: NewPost) -> MR { - // TODO: tags - insert_into(posts::table).values(&new_post).execute(conn) + pub async fn update_cw(&self, db: &Db, new_cw: String) -> QueryResult { + let pid = self.id; + db.run(move |c| { + diesel::update(posts::table.find(pid)) + .set(posts::cw.eq(new_cw)) + .execute(c) + }) + .await } - pub fn update_cw(&self, conn: &Conn, new_cw: &str) -> MR { - diesel::update(self).set(posts::cw.eq(new_cw)).execute(conn) + pub async fn change_n_comments(&self, db: &Db, delta: i32) -> QueryResult { + let pid = self.id; + db.run(move |c| { + diesel::update(posts::table.find(pid)) + .set(posts::n_comments.eq(posts::n_comments + delta)) + .execute(c) + }) + .await } - pub fn after_add_comment(&self, conn: &Conn) -> MR<()> { - diesel::update(self) - .set(posts::n_comments.eq(posts::n_comments + 1)) - .execute(conn)?; - // TODO: attention, hot_score - Ok(()) + pub async fn change_n_attentions(&self, db: &Db, delta: i32) -> QueryResult { + let pid = self.id; + db.run(move |c| { + diesel::update(posts::table.find(pid)) + .set(posts::n_attentions.eq(posts::n_attentions + delta)) + .execute(c) + }) + .await } } @@ -114,8 +152,11 @@ pub struct User { } impl User { - pub fn get_by_token(conn: &Conn, token: &str) -> Option { - users::table.filter(users::token.eq(token)).first(conn).ok() + pub async fn get_by_token(db: &Db, token: &str) -> Option { + let token = token.to_string(); + db.run(move |c| users::table.filter(users::token.eq(token)).first(c)) + .await + .ok() } } @@ -124,6 +165,7 @@ pub struct Comment { pub id: i32, pub author_hash: String, pub author_title: String, + pub is_tmp: bool, pub content: String, pub create_time: NaiveDateTime, pub is_deleted: bool, @@ -132,10 +174,11 @@ pub struct Comment { #[derive(Insertable)] #[table_name = "comments"] -pub struct NewComment<'a> { - pub content: &'a str, - pub author_hash: &'a str, - pub author_title: &'a str, +pub struct NewComment { + pub content: String, + pub author_hash: String, + pub author_title: String, + pub is_tmp: bool, pub post_id: i32, } @@ -144,9 +187,14 @@ impl Comment { set_deleted!(comments); - pub fn create(conn: &Conn, new_comment: NewComment) -> MR { - insert_into(comments::table) - .values(&new_comment) - .execute(conn) + pub async fn create(db: &Db, new_comment: NewComment) -> QueryResult { + db.run(move |c| insert_into(comments::table).values(&new_comment).execute(c)) + .await + } + + pub async fn gets_by_post_id(db: &Db, post_id: i32) -> QueryResult> { + let pid = post_id; + db.run(move |c| comments::table.filter(comments::post_id.eq(pid)).load(c)) + .await } } diff --git a/src/rds_conn.rs b/src/rds_conn.rs new file mode 100644 index 0000000..3405345 --- /dev/null +++ b/src/rds_conn.rs @@ -0,0 +1,42 @@ +use redis::aio::MultiplexedConnection; +use rocket::request::{FromRequest, Outcome, Request}; +use std::ops::{Deref, DerefMut}; +use std::env; + +pub struct RdsConn(pub MultiplexedConnection); + +#[rocket::async_trait] +impl<'r> FromRequest<'r> for RdsConn { + type Error = (); + async fn from_request(request: &'r Request<'_>) -> Outcome { + let rconn = request.rocket().state::().unwrap(); + Outcome::Success(RdsConn(rconn.clone())) + } +} + +impl Clone for RdsConn { + fn clone(&self) -> Self { + RdsConn(self.0.clone()) + } +} + +impl Deref for RdsConn { + type Target = MultiplexedConnection; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for RdsConn { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + + +pub async fn init_rds_client() -> MultiplexedConnection { + let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set"); + let client = redis::Client::open(redis_url).expect("connect to redis fail"); + client.get_multiplexed_async_connection().await.unwrap() +} diff --git a/src/rds_models.rs b/src/rds_models.rs new file mode 100644 index 0000000..5341aec --- /dev/null +++ b/src/rds_models.rs @@ -0,0 +1,32 @@ +use crate::rds_conn::RdsConn; +use redis::{AsyncCommands, RedisResult}; + +pub struct Attention { + key: String, + rconn: RdsConn, +} + +impl Attention { + pub fn init(namehash: &str, rconn: RdsConn) -> Self { + Attention { + key: format!("hole_v2:attention:{}", namehash), + rconn: rconn, + } + } + + pub async fn add(&mut self, pid: i32) -> RedisResult<()> { + self.rconn.sadd(&self.key, pid).await + } + + pub async fn remove(&mut self, pid: i32) -> RedisResult<()> { + self.rconn.srem(&self.key, pid).await + } + + pub async fn has(&mut self, pid: i32) -> RedisResult { + self.rconn.sismember(&self.key, pid).await + } + + pub async fn all(&mut self) -> RedisResult> { + self.rconn.smembers(&self.key).await + } +} diff --git a/src/schema.rs b/src/schema.rs index abe665a..7e7c600 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -3,6 +3,7 @@ table! { id -> Integer, author_hash -> Text, author_title -> Text, + is_tmp -> Bool, content -> Text, create_time -> Timestamp, is_deleted -> Bool, @@ -17,7 +18,8 @@ table! { content -> Text, cw -> Text, author_title -> Text, - n_likes -> Integer, + is_tmp -> Bool, + n_attentions -> Integer, n_comments -> Integer, create_time -> Timestamp, last_comment_time -> Timestamp, @@ -37,6 +39,8 @@ table! { } } +joinable!(comments -> posts (post_id)); + allow_tables_to_appear_in_same_query!( comments, posts, diff --git a/tools/migdb.py b/tools/migdb.py index 22dc389..9dd8327 100644 --- a/tools/migdb.py +++ b/tools/migdb.py @@ -23,8 +23,9 @@ def mig_post(): r[7] = datetime.fromtimestamp(r[7]) r[8] = datetime.fromtimestamp(r[8]) r[10] = r[10] or False # comment + r.insert(4, r[2].startswith('[tmp]\n')) c_new.execute( - 'INSERT OR REPLACE INTO posts VALUES({})'.format(','.join(['?'] * 13)), + 'INSERT OR REPLACE INTO posts VALUES({})'.format(','.join(['?'] * 14)), r ) db_new.commit() @@ -42,27 +43,39 @@ def mig_user(): def mig_comment(): - rs = c_old.execute( - 'SELECT id, name_hash, author_title, content, timestamp, deleted, post_id ' - 'FROM comment' - ) - for r in rs: - r = list(r) - r[2] = r[2] or '' - r[4] = datetime.fromtimestamp(r[4]) - r[5] = r[5] or False - c_new.execute( - 'INSERT OR REPLACE INTO comments VALUES({})'.format(','.join(['?'] * 7)), - r + _start = 0 + _step = 1000 + while True: + print("comment loop...", _start) + rs = c_old.execute( + 'SELECT id, name_hash, author_title, content, timestamp, deleted, post_id ' + 'FROM comment WHERE id > ? ORDER BY id LIMIT ?', + (_start, _step) ) - db_new.commit() + r = None + for r in rs: + r = list(r) + r[2] = r[2] or '' + r[4] = datetime.fromtimestamp(r[4]) + r[5] = r[5] or False + r.insert(2, r[3].startswith('[tmp]\n')) + c_new.execute( + 'INSERT OR REPLACE INTO comments VALUES({})'.format(','.join(['?'] * 8)), + r + ) + if not r: + break + db_new.commit() + + _start = r[0] if __name__ == '__main__': - # mig_post() - # mig_user() + mig_post() + mig_user() mig_comment() + pass + c_old.close() c_new.close() -