Browse Source

feat: attention & redis & async

master
hole-thu 3 years ago
parent
commit
58eb7aba6f
  1. 15
      Cargo.toml
  2. 3
      migrations/2022-03-11-065048_create_posts/up.sql
  3. 3
      migrations/2022-03-15-104943_create_comments/up.sql
  4. 63
      src/api/attention.rs
  5. 67
      src/api/comment.rs
  6. 64
      src/api/mod.rs
  7. 23
      src/api/operation.rs
  8. 117
      src/api/post.rs
  9. 4
      src/api/systemlog.rs
  10. 37
      src/db_conn.rs
  11. 17
      src/main.rs
  12. 166
      src/models.rs
  13. 42
      src/rds_conn.rs
  14. 32
      src/rds_models.rs
  15. 6
      src/schema.rs
  16. 47
      tools/migdb.py

15
Cargo.toml

@ -8,8 +8,13 @@ license = "AGPL-3.0"
[dependencies] [dependencies]
rocket = { version = "0.5.0-rc.1", features = ["json"] } rocket = { version = "0.5.0-rc.1", features = ["json"] }
diesel = { version = "1.4.8", features= ["sqlite", "chrono", "r2d2"] } diesel = { version = "1.4.8", features = ["sqlite", "chrono", "r2d2"] }
chrono = { version="0.4", features=["serde"] } redis = { version="0.21.5", features = ["aio", "async-std-comp"] }
rand = "0.8.5" chrono = { version="0.*", features =["serde"] }
dotenv = "0.15.0" rand = "0.*"
sha2 = "0.10.2" dotenv = "0.*"
sha2 = "0.*"
[dependencies.rocket_sync_db_pools]
version = "0.1.0-rc.1"
features = ["diesel_sqlite_pool"]

3
migrations/2022-03-11-065048_create_posts/up.sql

@ -6,7 +6,8 @@ CREATE TABLE posts (
content TEXT NOT NULL, content TEXT NOT NULL,
cw VARCHAR NOT NULL DEFAULT '', cw VARCHAR NOT NULL DEFAULT '',
author_title VARCHAR NOT NULL DEFAULT '', author_title VARCHAR NOT NULL DEFAULT '',
n_likes INTEGER NOT NULL DEFAULT 0, is_tmp BOOLEAN NOT NULL DEFAULT FALSE,
n_attentions INTEGER NOT NULL DEFAULT 0,
n_comments INTEGER NOT NULL DEFAULT 0, n_comments INTEGER NOT NULL DEFAULT 0,
create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_comment_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, last_comment_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,

3
migrations/2022-03-15-104943_create_comments/up.sql

@ -4,11 +4,12 @@ CREATE TABLE comments (
id INTEGER NOT NULL PRIMARY KEY, id INTEGER NOT NULL PRIMARY KEY,
author_hash VARCHAR NOT NULL, author_hash VARCHAR NOT NULL,
author_title VARCHAR(10) NOT NULL DEFAULT '', author_title VARCHAR(10) NOT NULL DEFAULT '',
is_tmp BOOLEAN NOT NULL DEFAULT FALSE,
content TEXT NOT NULL, content TEXT NOT NULL,
create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
is_deleted BOOLEAN NOT NULL DEFAULT FALSE, is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
post_id INTEGER NOT NULL, post_id INTEGER NOT NULL,
FOREIGN KEY(post_id) REFERENCES post(id) FOREIGN KEY(post_id) REFERENCES posts(id)
); );
CREATE INDEX comments_postId_idx ON comments (`post_id`); CREATE INDEX comments_postId_idx ON comments (`post_id`);

63
src/api/attention.rs

@ -0,0 +1,63 @@
use crate::api::post::ps2outputs;
use crate::api::{APIError, CurrentUser, MapToAPIError, PolicyError::*, API, UGC};
use crate::db_conn::Db;
use crate::models::*;
use crate::rds_conn::RdsConn;
use crate::rds_models::*;
use rocket::form::Form;
use rocket::serde::json::{json, Value};
#[derive(FromForm)]
pub struct AttentionInput {
pid: i32,
#[field(validate = range(0..2))]
switch: i32,
}
#[post("/attention", data = "<ai>")]
pub async fn attention_post(
ai: Form<AttentionInput>,
user: CurrentUser,
db: Db,
rconn: RdsConn,
) -> API<Value> {
user.id.ok_or_else(|| APIError::PcError(NotAllowed))?;
let p = Post::get(&db, ai.pid).await.m()?;
p.check_permission(&user, "r")?;
let mut att = Attention::init(&user.namehash, rconn);
let switch_to = ai.switch == 1;
let mut delta: i32 = 0;
if att.has(ai.pid).await.m()? != switch_to {
if switch_to {
att.add(ai.pid).await.m()?;
delta = 1;
} else {
att.remove(ai.pid).await.m()?;
delta = -1;
}
p.change_n_attentions(&db, delta).await.m()?;
}
Ok(json!({
"code": 0,
"attention": ai.switch == 1,
"n_attentions": p.n_attentions + delta,
// for old version frontend
"likenum": p.n_attentions + delta,
}))
}
#[get("/getattention")]
pub async fn get_attention(user: CurrentUser, db: Db, rconn: RdsConn) -> API<Value> {
let ids = Attention::init(&user.namehash, rconn.clone())
.all()
.await
.m()?;
let ps = Post::get_multi(&db, ids).await.m()?;
let ps_data = ps2outputs(&ps, &user, &db, rconn.clone()).await;
Ok(json!({
"code": 0,
"data": ps_data,
}))
}

67
src/api/comment.rs

@ -1,6 +1,8 @@
use crate::api::{APIError, CurrentUser, PolicyError::*, API}; use crate::api::{APIError, CurrentUser, MapToAPIError, PolicyError::*, API};
use crate::db_conn::DbConn; use crate::db_conn::Db;
use crate::models::*; use crate::models::*;
use crate::rds_conn::RdsConn;
use crate::rds_models::*;
use chrono::NaiveDateTime; use chrono::NaiveDateTime;
use rocket::form::Form; use rocket::form::Form;
use rocket::serde::{ use rocket::serde::{
@ -10,10 +12,10 @@ use rocket::serde::{
use std::collections::HashMap; use std::collections::HashMap;
#[derive(FromForm)] #[derive(FromForm)]
pub struct CommentInput<'r> { pub struct CommentInput {
pid: i32, pid: i32,
#[field(validate = len(1..4097))] #[field(validate = len(1..4097))]
text: &'r str, text: String,
use_title: Option<i8>, use_title: Option<i8>,
} }
@ -24,12 +26,13 @@ pub struct CommentOutput {
text: String, text: String,
can_del: bool, can_del: bool,
name_id: i32, name_id: i32,
is_tmp: bool,
create_time: NaiveDateTime, create_time: NaiveDateTime,
// for old version frontend // for old version frontend
timestamp: i64, timestamp: i64,
} }
pub fn c2output(p: &Post, cs: &Vec<Comment>, user: &CurrentUser) -> Vec<CommentOutput> { pub fn c2output<'r>(p: &'r Post, cs: &Vec<Comment>, user: &CurrentUser) -> Vec<CommentOutput> {
let mut hash2id = HashMap::<&String, i32>::from([(&p.author_hash, 0)]); let mut hash2id = HashMap::<&String, i32>::from([(&p.author_hash, 0)]);
cs.iter() cs.iter()
.filter_map(|c| { .filter_map(|c| {
@ -41,19 +44,16 @@ pub fn c2output(p: &Post, cs: &Vec<Comment>, user: &CurrentUser) -> Vec<CommentO
x x
} }
}; };
if false { if c.is_deleted {
// TODO: block // TODO: block
None None
} else { } else {
Some(CommentOutput { Some(CommentOutput {
cid: c.id, cid: c.id,
text: if c.is_deleted { text: format!("{}{}", if c.is_tmp { "[tmp]\n" } else { "" }, c.content),
"[已删除]".to_string()
} else {
c.content.to_string()
},
can_del: user.is_admin || c.author_hash == user.namehash, can_del: user.is_admin || c.author_hash == user.namehash,
name_id: name_id, name_id: name_id,
is_tmp: c.is_tmp,
create_time: c.create_time, create_time: c.create_time,
timestamp: c.create_time.timestamp(), timestamp: c.create_time.timestamp(),
}) })
@ -63,35 +63,52 @@ pub fn c2output(p: &Post, cs: &Vec<Comment>, user: &CurrentUser) -> Vec<CommentO
} }
#[get("/getcomment?<pid>")] #[get("/getcomment?<pid>")]
pub fn get_comment(pid: i32, user: CurrentUser, conn: DbConn) -> API<Value> { pub async fn get_comment(pid: i32, user: CurrentUser, db: Db, rconn: RdsConn) -> API<Value> {
let p = Post::get(&conn, pid).map_err(APIError::from_db)?; let p = Post::get(&db, pid).await.m()?;
if p.is_deleted { if p.is_deleted {
return Err(APIError::PcError(IsDeleted)); return Err(APIError::PcError(IsDeleted));
} }
let cs = p.get_comments(&conn).map_err(APIError::from_db)?; let pid = p.id;
let cs = Comment::gets_by_post_id(&db, pid).await.m()?;
let data = c2output(&p, &cs, &user);
Ok(json!({ Ok(json!({
"code": 0, "code": 0,
"data": c2output(&p, &cs, &user), "data": data,
"n_likes": p.n_likes, "n_attentions": p.n_attentions,
// for old version frontend // for old version frontend
"likenum": p.n_likes, "likenum": p.n_attentions,
"attention": Attention::init(&user.namehash, rconn.clone()).has(p.id).await.m()? ,
})) }))
} }
#[post("/docomment", data = "<ci>")] #[post("/docomment", data = "<ci>")]
pub fn add_comment(ci: Form<CommentInput>, user: CurrentUser, conn: DbConn) -> API<Value> { pub async fn add_comment(
let p = Post::get(&conn, ci.pid).map_err(APIError::from_db)?; ci: Form<CommentInput>,
user: CurrentUser,
db: Db,
rconn: RdsConn,
) -> API<Value> {
let p = Post::get(&db, ci.pid).await.m()?;
Comment::create( Comment::create(
&conn, &db,
NewComment { NewComment {
content: &ci.text, content: ci.text.to_string(),
author_hash: &user.namehash, author_hash: user.namehash.to_string(),
author_title: "", author_title: "".to_string(),
is_tmp: user.id.is_none(),
post_id: ci.pid, post_id: ci.pid,
}, },
) )
.map_err(APIError::from_db)?; .await
p.after_add_comment(&conn).map_err(APIError::from_db)?; .m()?;
p.change_n_comments(&db, 1).await.m()?;
// auto attention after comment
let mut att = Attention::init(&user.namehash, rconn);
if !att.has(p.id).await.m()? {
att.add(p.id).await.m()?;
p.change_n_attentions(&db, 1).await.m()?;
}
Ok(json!({ Ok(json!({
"code": 0 "code": 0
})) }))

64
src/api/mod.rs

@ -1,10 +1,11 @@
use crate::db_conn::{Conn, DbPool}; use crate::db_conn::Db;
use crate::models::*; use crate::models::*;
use crate::random_hasher::RandomHasher; use crate::random_hasher::RandomHasher;
use rocket::http::Status; use rocket::http::Status;
use rocket::outcome::try_outcome;
use rocket::request::{FromRequest, Outcome, Request}; use rocket::request::{FromRequest, Outcome, Request};
use rocket::response::{self, Responder}; use rocket::response::{self, Responder};
use rocket::serde::json::json; use rocket::serde::json::{json, Value};
#[catch(401)] #[catch(401)]
pub fn catch_401_error() -> &'static str { pub fn catch_401_error() -> &'static str {
@ -36,8 +37,8 @@ impl<'r> FromRequest<'r> for CurrentUser {
is_admin: false, is_admin: false,
}); });
} else { } else {
let conn = request.rocket().state::<DbPool>().unwrap().get().unwrap(); let db = try_outcome!(request.guard::<Db>().await);
if let Some(user) = User::get_by_token(&conn, token) { if let Some(user) = User::get_by_token(&db, token).await {
let namehash = rh.hash_with_salt(&user.name); let namehash = rh.hash_with_salt(&user.name);
cu = Some(CurrentUser { cu = Some(CurrentUser {
id: Some(user.id), id: Some(user.id),
@ -55,14 +56,17 @@ impl<'r> FromRequest<'r> for CurrentUser {
} }
} }
#[derive(Debug)]
pub enum PolicyError { pub enum PolicyError {
IsReported, IsReported,
IsDeleted, IsDeleted,
NotAllowed, NotAllowed,
} }
#[derive(Debug)]
pub enum APIError { pub enum APIError {
DbError(diesel::result::Error), DbError(diesel::result::Error),
RdsError(redis::RedisError),
PcError(PolicyError), PcError(PolicyError),
} }
@ -70,16 +74,26 @@ impl APIError {
fn from_db(err: diesel::result::Error) -> APIError { fn from_db(err: diesel::result::Error) -> APIError {
APIError::DbError(err) APIError::DbError(err)
} }
fn from_rds(err: redis::RedisError) -> APIError {
APIError::RdsError(err)
}
} }
impl<'r> Responder<'r, 'static> for APIError { impl<'r> Responder<'r, 'static> for APIError {
fn respond_to(self, req: &'r Request<'_>) -> response::Result<'static> { fn respond_to(self, req: &'r Request<'_>) -> response::Result<'static> {
dbg!(&self);
match self { match self {
APIError::DbError(e) => json!({ APIError::DbError(e) => json!({
"code": -1, "code": -1,
"msg": e.to_string() "msg": e.to_string()
}) })
.respond_to(req), .respond_to(req),
APIError::RdsError(e) => json!({
"code": -1,
"msg": e.to_string()
})
.respond_to(req),
APIError::PcError(e) => json!({ APIError::PcError(e) => json!({
"code": -1, "code": -1,
"msg": match e { "msg": match e {
@ -93,12 +107,35 @@ impl<'r> Responder<'r, 'static> for APIError {
} }
} }
pub type API<T> = Result<T, APIError>;
pub type JsonAPI = API<Value>;
pub trait MapToAPIError {
type Data;
fn m(self) -> API<Self::Data>;
}
impl<T> MapToAPIError for redis::RedisResult<T> {
type Data = T;
fn m(self) -> API<Self::Data> {
Ok(self.map_err(APIError::from_rds)?)
}
}
impl<T> MapToAPIError for diesel::QueryResult<T> {
type Data = T;
fn m(self) -> API<Self::Data> {
Ok(self.map_err(APIError::from_db)?)
}
}
#[rocket::async_trait]
pub trait UGC { pub trait UGC {
fn get_author_hash(&self) -> &str; fn get_author_hash(&self) -> &str;
fn get_is_deleted(&self) -> bool; fn get_is_deleted(&self) -> bool;
fn get_is_reported(&self) -> bool; fn get_is_reported(&self) -> bool;
fn extra_delete_condition(&self) -> bool; fn extra_delete_condition(&self) -> bool;
fn do_set_deleted(&self, conn: &Conn) -> API<()>; async fn do_set_deleted(&self, db: &Db) -> API<usize>;
fn check_permission(&self, user: &CurrentUser, mode: &str) -> API<()> { fn check_permission(&self, user: &CurrentUser, mode: &str) -> API<()> {
if user.is_admin { if user.is_admin {
return Ok(()); return Ok(());
@ -118,14 +155,15 @@ pub trait UGC {
Ok(()) Ok(())
} }
fn soft_delete(&self, user: &CurrentUser, conn: &Conn) -> API<()> { async fn soft_delete(&self, user: &CurrentUser, db: &Db) -> API<()> {
self.check_permission(user, "rwd")?; self.check_permission(user, "rwd")?;
self.do_set_deleted(conn)?; let _ = self.do_set_deleted(db).await?;
Ok(()) Ok(())
} }
} }
#[rocket::async_trait]
impl UGC for Post { impl UGC for Post {
fn get_author_hash(&self) -> &str { fn get_author_hash(&self) -> &str {
&self.author_hash &self.author_hash
@ -139,11 +177,12 @@ impl UGC for Post {
fn extra_delete_condition(&self) -> bool { fn extra_delete_condition(&self) -> bool {
self.n_comments == 0 self.n_comments == 0
} }
fn do_set_deleted(&self, conn: &Conn) -> API<()> { async fn do_set_deleted(&self, db: &Db) -> API<usize> {
self.set_deleted(conn).map_err(APIError::from_db) self.set_deleted(db).await.m()
} }
} }
#[rocket::async_trait]
impl UGC for Comment { impl UGC for Comment {
fn get_author_hash(&self) -> &str { fn get_author_hash(&self) -> &str {
&self.author_hash &self.author_hash
@ -157,8 +196,8 @@ impl UGC for Comment {
fn extra_delete_condition(&self) -> bool { fn extra_delete_condition(&self) -> bool {
true true
} }
fn do_set_deleted(&self, conn: &Conn) -> API<()> { async fn do_set_deleted(&self, db: &Db) -> API<usize> {
self.set_deleted(conn).map_err(APIError::from_db) self.set_deleted(db).await.m()
} }
} }
@ -168,8 +207,7 @@ macro_rules! look {
}; };
} }
pub type API<T> = Result<T, APIError>; pub mod attention;
pub mod comment; pub mod comment;
pub mod operation; pub mod operation;
pub mod post; pub mod post;

23
src/api/operation.rs

@ -1,27 +1,28 @@
use crate::api::{APIError, CurrentUser, PolicyError::*, API, UGC}; use crate::api::{APIError, CurrentUser, PolicyError::*, API, UGC, MapToAPIError};
use crate::db_conn::DbConn; use crate::db_conn::Db;
use crate::models::*; use crate::models::*;
use rocket::form::Form; use rocket::form::Form;
use rocket::serde::json::{json, Value}; use rocket::serde::json::{json, Value};
#[derive(FromForm)] #[derive(FromForm)]
pub struct DeleteInput<'r> { pub struct DeleteInput {
#[field(name = "type")] #[field(name = "type")]
id_type: &'r str, id_type: String,
id: i32, id: i32,
note: &'r str, note: String,
} }
#[post("/delete", data = "<di>")] #[post("/delete", data = "<di>")]
pub fn delete(di: Form<DeleteInput>, user: CurrentUser, conn: DbConn) -> API<Value> { pub async fn delete(di: Form<DeleteInput>, user: CurrentUser, db: Db) -> API<Value> {
match di.id_type { match di.id_type.as_str() {
"cid" => { "cid" => {
let c = Comment::get(&conn, di.id).map_err(APIError::from_db)?; let c = Comment::get(&db, di.id).await.m()?;
c.soft_delete(&user, &conn)?; c.soft_delete(&user, &db).await?;
} }
"pid" => { "pid" => {
let p = Post::get(&conn, di.id).map_err(APIError::from_db)?; let p = Post::get(&db, di.id).await.m()?;
p.soft_delete(&user, &conn)?; p.soft_delete(&user, &db).await?;
p.change_n_comments(&db, -1).await.m()?;
} }
_ => return Err(APIError::PcError(NotAllowed)), _ => return Err(APIError::PcError(NotAllowed)),
} }

117
src/api/post.rs

@ -1,20 +1,20 @@
use crate::api::comment::{c2output, CommentOutput}; use crate::api::comment::{c2output, CommentOutput};
use crate::api::{APIError, CurrentUser, PolicyError::*, API, UGC}; use crate::api::{APIError, CurrentUser, JsonAPI, MapToAPIError, PolicyError::*, UGC};
use crate::db_conn::DbConn; use crate::db_conn::Db;
use crate::models::*; use crate::models::*;
use crate::rds_conn::RdsConn;
use crate::rds_models::*;
use chrono::NaiveDateTime; use chrono::NaiveDateTime;
use rocket::form::Form; use rocket::form::Form;
use rocket::serde::{ use rocket::futures::future;
json::{json, Value}, use rocket::serde::{json::json, Serialize};
Serialize,
};
#[derive(FromForm)] #[derive(FromForm)]
pub struct PostInput<'r> { pub struct PostInput {
#[field(validate = len(1..4097))] #[field(validate = len(1..4097))]
text: &'r str, text: String,
#[field(validate = len(0..33))] #[field(validate = len(0..33))]
cw: &'r str, cw: String,
allow_search: Option<i8>, allow_search: Option<i8>,
use_title: Option<i8>, use_title: Option<i8>,
} }
@ -26,7 +26,8 @@ pub struct PostOutput {
text: String, text: String,
cw: Option<String>, cw: Option<String>,
custom_title: Option<String>, custom_title: Option<String>,
n_likes: i32, is_tmp: bool,
n_attentions: i32,
n_comments: i32, n_comments: i32,
create_time: NaiveDateTime, create_time: NaiveDateTime,
last_comment_time: NaiveDateTime, last_comment_time: NaiveDateTime,
@ -34,6 +35,7 @@ pub struct PostOutput {
is_reported: Option<bool>, is_reported: Option<bool>,
comments: Option<Vec<CommentOutput>>, comments: Option<Vec<CommentOutput>>,
can_del: bool, can_del: bool,
attention: bool,
// for old version frontend // for old version frontend
timestamp: i64, timestamp: i64,
likenum: i32, likenum: i32,
@ -41,23 +43,22 @@ pub struct PostOutput {
} }
#[derive(FromForm)] #[derive(FromForm)]
pub struct CwInput<'r> { pub struct CwInput {
pid: i32, pid: i32,
#[field(validate = len(0..33))] #[field(validate = len(0..33))]
cw: &'r str, cw: String,
} }
fn p2output(p: &Post, user: &CurrentUser, conn: &DbConn) -> PostOutput { async fn p2output(p: &Post, user: &CurrentUser, db: &Db, rconn: RdsConn) -> PostOutput {
PostOutput { PostOutput {
pid: p.id, pid: p.id,
text: p.content.to_string(), text: format!("{}{}", if p.is_tmp { "[tmp]\n" } else { "" }, p.content),
cw: if p.cw.len() > 0 { cw: if p.cw.len() > 0 {
Some(p.cw.to_string()) Some(p.cw.to_string())
} else { } else {
None None
}, },
n_likes: p.n_likes, n_attentions: p.n_attentions,
n_comments: p.n_comments, n_comments: p.n_comments,
create_time: p.create_time, create_time: p.create_time,
last_comment_time: p.last_comment_time, last_comment_time: p.last_comment_time,
@ -67,6 +68,7 @@ fn p2output(p: &Post, user: &CurrentUser, conn: &DbConn) -> PostOutput {
} else { } else {
None None
}, },
is_tmp: p.is_tmp,
is_reported: if user.is_admin { is_reported: if user.is_admin {
Some(p.is_reported) Some(p.is_reported)
} else { } else {
@ -76,34 +78,59 @@ fn p2output(p: &Post, user: &CurrentUser, conn: &DbConn) -> PostOutput {
None None
} else { } else {
// 单个洞还有查询评论的接口,这里挂了不用报错 // 单个洞还有查询评论的接口,这里挂了不用报错
Some(c2output(p, &p.get_comments(conn).unwrap_or(vec![]), user)) let pid = p.id;
if let Some(cs) = Comment::gets_by_post_id(db, pid).await.ok() {
Some(c2output(p, &cs, user))
} else {
None
}
}, },
can_del: user.is_admin || p.author_hash == user.namehash, can_del: user.is_admin || p.author_hash == user.namehash,
attention: Attention::init(&user.namehash, rconn.clone())
.has(p.id)
.await
.unwrap_or_default(),
// for old version frontend // for old version frontend
timestamp: p.create_time.timestamp(), timestamp: p.create_time.timestamp(),
likenum: p.n_likes, likenum: p.n_attentions,
reply: p.n_comments, reply: p.n_comments,
} }
} }
pub async fn ps2outputs(
ps: &Vec<Post>,
user: &CurrentUser,
db: &Db,
rconn: RdsConn,
) -> Vec<PostOutput> {
future::join_all(
ps.iter()
.map(|p| async { p2output(p, &user, &db, rconn.clone()).await }),
)
.await
}
#[get("/getone?<pid>")] #[get("/getone?<pid>")]
pub fn get_one(pid: i32, user: CurrentUser, conn: DbConn) -> API<Value> { pub async fn get_one(pid: i32, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonAPI {
let p = Post::get(&conn, pid).map_err(APIError::from_db)?; let p = Post::get(&db, pid).await.m()?;
p.check_permission(&user, "ro")?; p.check_permission(&user, "ro")?;
Ok(json!({ Ok(json!({
"data": p2output(&p, &user, &conn), "data": p2output(&p, &user,&db, rconn).await,
"code": 0, "code": 0,
})) }))
} }
#[get("/getlist?<p>&<order_mode>")] #[get("/getlist?<p>&<order_mode>")]
pub fn get_list(p: Option<u32>, order_mode: u8, user: CurrentUser, conn: DbConn) -> API<Value> { pub async fn get_list(
p: Option<u32>,
order_mode: u8,
user: CurrentUser,
db: Db,
rconn: RdsConn,
) -> JsonAPI {
let page = p.unwrap_or(1); let page = p.unwrap_or(1);
let ps = Post::gets_by_page(&conn, order_mode, page, 25).map_err(APIError::from_db)?; let ps = Post::gets_by_page(&db, order_mode, page, 25).await.m()?;
let ps_data = ps let ps_data = ps2outputs(&ps, &user, &db, rconn.clone()).await;
.iter()
.map(|p| p2output(p, &user, &conn))
.collect::<Vec<PostOutput>>();
Ok(json!({ Ok(json!({
"data": ps_data, "data": ps_data,
"count": ps_data.len(), "count": ps_data.len(),
@ -112,19 +139,22 @@ pub fn get_list(p: Option<u32>, order_mode: u8, user: CurrentUser, conn: DbConn)
} }
#[post("/dopost", data = "<poi>")] #[post("/dopost", data = "<poi>")]
pub fn publish_post(poi: Form<PostInput>, user: CurrentUser, conn: DbConn) -> API<Value> { pub async fn publish_post(poi: Form<PostInput>, user: CurrentUser, db: Db) -> JsonAPI {
dbg!(poi.use_title, poi.allow_search);
let r = Post::create( let r = Post::create(
&conn, &db,
NewPost { NewPost {
content: &poi.text, content: poi.text.to_string(),
cw: &poi.cw, cw: poi.cw.to_string(),
author_hash: &user.namehash, author_hash: user.namehash.to_string(),
author_title: "", author_title: "".to_string(),
is_tmp: user.id.is_none(),
n_attentions: 1,
allow_search: poi.allow_search.is_some(), allow_search: poi.allow_search.is_some(),
}, },
) )
.map_err(APIError::from_db)?; .await
.m()?;
// TODO: attention
Ok(json!({ Ok(json!({
"data": r, "data": r,
"code": 0 "code": 0
@ -132,12 +162,23 @@ pub fn publish_post(poi: Form<PostInput>, user: CurrentUser, conn: DbConn) -> AP
} }
#[post("/editcw", data = "<cwi>")] #[post("/editcw", data = "<cwi>")]
pub fn edit_cw(cwi: Form<CwInput>, user: CurrentUser, conn: DbConn) -> API<Value> { pub async fn edit_cw(cwi: Form<CwInput>, user: CurrentUser, db: Db) -> JsonAPI {
let p = Post::get(&conn, cwi.pid).map_err(APIError::from_db)?; let p = Post::get(&db, cwi.pid).await.m()?;
if !(user.is_admin || p.author_hash == user.namehash) { if !(user.is_admin || p.author_hash == user.namehash) {
return Err(APIError::PcError(NotAllowed)); return Err(APIError::PcError(NotAllowed));
} }
p.check_permission(&user, "w")?; p.check_permission(&user, "w")?;
_ = p.update_cw(&conn, cwi.cw); _ = p.update_cw(&db, cwi.cw.to_string()).await.m()?;
Ok(json!({"code": 0})) Ok(json!({"code": 0}))
} }
#[get("/getmulti?<pids>")]
pub async fn get_multi(pids: Vec<i32>, user: CurrentUser, db: Db, rconn: RdsConn) -> JsonAPI {
let ps = Post::get_multi(&db, pids).await.m()?;
let ps_data = ps2outputs(&ps, &user, &db, rconn.clone()).await;
Ok(json!({
"code": 0,
"data": ps_data,
}))
}

4
src/api/systemlog.rs

@ -2,10 +2,10 @@ use crate::api::{CurrentUser, API};
use crate::random_hasher::RandomHasher; use crate::random_hasher::RandomHasher;
use rocket::serde::json::{json, Value}; use rocket::serde::json::{json, Value};
use rocket::State; use rocket::State;
use crate::db_conn::DbConn; use crate::db_conn::Db;
#[get("/systemlog")] #[get("/systemlog")]
pub fn get_systemlog(user: CurrentUser, rh: &State<RandomHasher>, conn: DbConn) -> API<Value> { pub async fn get_systemlog(user: CurrentUser, rh: &State<RandomHasher>, db: Db) -> API<Value> {
Ok(json!({ Ok(json!({
"tmp_token": rh.get_tmp_token(), "tmp_token": rh.get_tmp_token(),
"salt": look!(rh.salt), "salt": look!(rh.salt),

37
src/db_conn.rs

@ -1,38 +1,7 @@
use diesel::r2d2::{ConnectionManager, Pool, PooledConnection}; use rocket_sync_db_pools::{database, diesel};
use std::env;
use std::ops::Deref;
use rocket::http::Status;
use rocket::request::{FromRequest, Request, Outcome};
pub type Conn = diesel::SqliteConnection; pub type Conn = diesel::SqliteConnection;
pub type DbPool = Pool<ConnectionManager<Conn>>;
pub struct DbConn(pub PooledConnection<ConnectionManager<Conn>>);
#[rocket::async_trait] #[database("sqlite_v2")]
impl<'r> FromRequest<'r> for DbConn { pub struct Db(Conn);
type Error = ();
async fn from_request(request: &'r Request<'_>) -> Outcome<Self, Self::Error> {
let pool = request.rocket().state::<DbPool>().unwrap();
match pool.get() {
Ok(conn) => Outcome::Success(DbConn(conn)),
Err(_) => Outcome::Failure((Status::ServiceUnavailable, ())),
}
}
}
// For the convenience of using an &DbConn as an &Connection.
impl Deref for DbConn {
type Target = Conn;
fn deref(&self) -> &Self::Target {
&self.0
}
}
pub fn init_pool() -> DbPool {
let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let manager = ConnectionManager::<Conn>::new(database_url);
Pool::builder()
.build(manager)
.expect("database poll init fail")
}

17
src/main.rs

@ -6,15 +6,18 @@ extern crate diesel;
mod api; mod api;
mod db_conn; mod db_conn;
mod rds_conn;
mod models; mod models;
mod rds_models;
mod random_hasher; mod random_hasher;
mod schema; mod schema;
use db_conn::init_pool; use db_conn::Db;
use rds_conn::init_rds_client;
use random_hasher::RandomHasher; use random_hasher::RandomHasher;
#[launch] #[rocket::main]
fn rocket() -> _ { async fn main() -> Result<(), rocket::Error> {
load_env(); load_env();
rocket::build() rocket::build()
.mount( .mount(
@ -26,13 +29,19 @@ fn rocket() -> _ {
api::post::get_one, api::post::get_one,
api::post::publish_post, api::post::publish_post,
api::post::edit_cw, api::post::edit_cw,
api::post::get_multi,
api::attention::attention_post,
api::attention::get_attention,
api::systemlog::get_systemlog, api::systemlog::get_systemlog,
api::operation::delete, api::operation::delete,
], ],
) )
.register("/_api", catchers![api::catch_401_error]) .register("/_api", catchers![api::catch_401_error])
.manage(RandomHasher::get_random_one()) .manage(RandomHasher::get_random_one())
.manage(init_pool()) .manage(init_rds_client().await)
.attach(Db::fairing())
.launch()
.await
} }
fn load_env() { fn load_env() {

166
src/models.rs

@ -1,30 +1,41 @@
#![allow(clippy::all)] #![allow(clippy::all)]
use chrono::NaiveDateTime; use chrono::NaiveDateTime;
use diesel::{insert_into, ExpressionMethods, QueryDsl, RunQueryDsl}; use diesel::{insert_into, ExpressionMethods, QueryDsl, QueryResult, RunQueryDsl};
use crate::db_conn::Conn; use crate::db_conn::Db;
use crate::schema::*; use crate::schema::*;
type MR<T> = Result<T, diesel::result::Error>;
no_arg_sql_function!(RANDOM, (), "Represents the sql RANDOM() function"); no_arg_sql_function!(RANDOM, (), "Represents the sql RANDOM() function");
macro_rules! get { macro_rules! get {
($table:ident) => { ($table:ident) => {
pub fn get(conn: &Conn, id: i32) -> MR<Self> { pub async fn get(db: &Db, id: i32) -> QueryResult<Self> {
$table::table.find(id).first(conn) let pid = id;
db.run(move |c| $table::table.find(pid).first(c)).await
}
};
}
macro_rules! get_multi {
($table:ident) => {
pub async fn get_multi(db: &Db, ids: Vec<i32>) -> QueryResult<Vec<Self>> {
db.run(move |c| $table::table.filter($table::id.eq_any(ids)).load(c))
.await
} }
}; };
} }
macro_rules! set_deleted { macro_rules! set_deleted {
($table:ident) => { ($table:ident) => {
pub fn set_deleted(&self, conn: &Conn) -> MR<()> { pub async fn set_deleted(&self, db: &Db) -> QueryResult<usize> {
diesel::update(self) let pid = self.id;
.set($table::is_deleted.eq(true)) db.run(move |c| {
.execute(conn)?; diesel::update($table::table.find(pid))
Ok(()) .set($table::is_deleted.eq(true))
.execute(c)
})
.await
} }
}; };
} }
@ -36,7 +47,8 @@ pub struct Post {
pub content: String, pub content: String,
pub cw: String, pub cw: String,
pub author_title: String, pub author_title: String,
pub n_likes: i32, pub is_tmp: bool,
pub n_attentions: i32,
pub n_comments: i32, pub n_comments: i32,
pub create_time: NaiveDateTime, pub create_time: NaiveDateTime,
pub last_comment_time: NaiveDateTime, pub last_comment_time: NaiveDateTime,
@ -48,11 +60,13 @@ pub struct Post {
#[derive(Insertable)] #[derive(Insertable)]
#[table_name = "posts"] #[table_name = "posts"]
pub struct NewPost<'a> { pub struct NewPost {
pub content: &'a str, pub content: String,
pub cw: &'a str, pub cw: String,
pub author_hash: &'a str, pub author_hash: String,
pub author_title: &'a str, pub author_title: String,
pub is_tmp: bool,
pub n_attentions: i32,
pub allow_search: bool, pub allow_search: bool,
// TODO: tags // TODO: tags
} }
@ -60,48 +74,72 @@ pub struct NewPost<'a> {
impl Post { impl Post {
get!(posts); get!(posts);
set_deleted!(posts); get_multi!(posts);
pub fn gets_by_page(conn: &Conn, order_mode: u8, page: u32, page_size: u32) -> MR<Vec<Self>> { set_deleted!(posts);
let mut query = posts::table.into_boxed();
query = query.filter(posts::is_deleted.eq(false));
if order_mode > 0 {
query = query.filter(posts::is_reported.eq(false))
}
match order_mode { pub async fn gets_by_page(
1 => query = query.order(posts::last_comment_time.desc()), db: &Db,
2 => query = query.order(posts::hot_score.desc()), order_mode: u8,
3 => query = query.order(RANDOM), page: u32,
_ => query = query.order(posts::id.desc()), page_size: u32,
} ) -> QueryResult<Vec<Self>> {
query db.run(move |c| {
.offset(((page - 1) * page_size).into()) let mut query = posts::table.into_boxed();
.limit(page_size.into()) query = query.filter(posts::is_deleted.eq(false));
.load(conn) if order_mode > 0 {
query = query.filter(posts::is_reported.eq(false))
}
match order_mode {
1 => query = query.order(posts::last_comment_time.desc()),
2 => query = query.order(posts::hot_score.desc()),
3 => query = query.order(RANDOM),
_ => query = query.order(posts::id.desc()),
}
query
.offset(((page - 1) * page_size).into())
.limit(page_size.into())
.load(c)
})
.await
} }
pub fn get_comments(&self, conn: &Conn) -> MR<Vec<Comment>> { pub async fn create(db: &Db, new_post: NewPost) -> QueryResult<usize> {
comments::table // TODO: tags
.filter(comments::post_id.eq(self.id)) db.run(move |c| insert_into(posts::table).values(&new_post).execute(c))
.load(conn) .await
} }
pub fn create(conn: &Conn, new_post: NewPost) -> MR<usize> { pub async fn update_cw(&self, db: &Db, new_cw: String) -> QueryResult<usize> {
// TODO: tags let pid = self.id;
insert_into(posts::table).values(&new_post).execute(conn) db.run(move |c| {
diesel::update(posts::table.find(pid))
.set(posts::cw.eq(new_cw))
.execute(c)
})
.await
} }
pub fn update_cw(&self, conn: &Conn, new_cw: &str) -> MR<usize> { pub async fn change_n_comments(&self, db: &Db, delta: i32) -> QueryResult<usize> {
diesel::update(self).set(posts::cw.eq(new_cw)).execute(conn) let pid = self.id;
db.run(move |c| {
diesel::update(posts::table.find(pid))
.set(posts::n_comments.eq(posts::n_comments + delta))
.execute(c)
})
.await
} }
pub fn after_add_comment(&self, conn: &Conn) -> MR<()> { pub async fn change_n_attentions(&self, db: &Db, delta: i32) -> QueryResult<usize> {
diesel::update(self) let pid = self.id;
.set(posts::n_comments.eq(posts::n_comments + 1)) db.run(move |c| {
.execute(conn)?; diesel::update(posts::table.find(pid))
// TODO: attention, hot_score .set(posts::n_attentions.eq(posts::n_attentions + delta))
Ok(()) .execute(c)
})
.await
} }
} }
@ -114,8 +152,11 @@ pub struct User {
} }
impl User { impl User {
pub fn get_by_token(conn: &Conn, token: &str) -> Option<Self> { pub async fn get_by_token(db: &Db, token: &str) -> Option<Self> {
users::table.filter(users::token.eq(token)).first(conn).ok() let token = token.to_string();
db.run(move |c| users::table.filter(users::token.eq(token)).first(c))
.await
.ok()
} }
} }
@ -124,6 +165,7 @@ pub struct Comment {
pub id: i32, pub id: i32,
pub author_hash: String, pub author_hash: String,
pub author_title: String, pub author_title: String,
pub is_tmp: bool,
pub content: String, pub content: String,
pub create_time: NaiveDateTime, pub create_time: NaiveDateTime,
pub is_deleted: bool, pub is_deleted: bool,
@ -132,10 +174,11 @@ pub struct Comment {
#[derive(Insertable)] #[derive(Insertable)]
#[table_name = "comments"] #[table_name = "comments"]
pub struct NewComment<'a> { pub struct NewComment {
pub content: &'a str, pub content: String,
pub author_hash: &'a str, pub author_hash: String,
pub author_title: &'a str, pub author_title: String,
pub is_tmp: bool,
pub post_id: i32, pub post_id: i32,
} }
@ -144,9 +187,14 @@ impl Comment {
set_deleted!(comments); set_deleted!(comments);
pub fn create(conn: &Conn, new_comment: NewComment) -> MR<usize> { pub async fn create(db: &Db, new_comment: NewComment) -> QueryResult<usize> {
insert_into(comments::table) db.run(move |c| insert_into(comments::table).values(&new_comment).execute(c))
.values(&new_comment) .await
.execute(conn) }
pub async fn gets_by_post_id(db: &Db, post_id: i32) -> QueryResult<Vec<Self>> {
let pid = post_id;
db.run(move |c| comments::table.filter(comments::post_id.eq(pid)).load(c))
.await
} }
} }

42
src/rds_conn.rs

@ -0,0 +1,42 @@
use redis::aio::MultiplexedConnection;
use rocket::request::{FromRequest, Outcome, Request};
use std::ops::{Deref, DerefMut};
use std::env;
pub struct RdsConn(pub MultiplexedConnection);
#[rocket::async_trait]
impl<'r> FromRequest<'r> for RdsConn {
type Error = ();
async fn from_request(request: &'r Request<'_>) -> Outcome<Self, Self::Error> {
let rconn = request.rocket().state::<MultiplexedConnection>().unwrap();
Outcome::Success(RdsConn(rconn.clone()))
}
}
impl Clone for RdsConn {
fn clone(&self) -> Self {
RdsConn(self.0.clone())
}
}
impl Deref for RdsConn {
type Target = MultiplexedConnection;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for RdsConn {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
pub async fn init_rds_client() -> MultiplexedConnection {
let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set");
let client = redis::Client::open(redis_url).expect("connect to redis fail");
client.get_multiplexed_async_connection().await.unwrap()
}

32
src/rds_models.rs

@ -0,0 +1,32 @@
use crate::rds_conn::RdsConn;
use redis::{AsyncCommands, RedisResult};
pub struct Attention {
key: String,
rconn: RdsConn,
}
impl Attention {
pub fn init(namehash: &str, rconn: RdsConn) -> Self {
Attention {
key: format!("hole_v2:attention:{}", namehash),
rconn: rconn,
}
}
pub async fn add(&mut self, pid: i32) -> RedisResult<()> {
self.rconn.sadd(&self.key, pid).await
}
pub async fn remove(&mut self, pid: i32) -> RedisResult<()> {
self.rconn.srem(&self.key, pid).await
}
pub async fn has(&mut self, pid: i32) -> RedisResult<bool> {
self.rconn.sismember(&self.key, pid).await
}
pub async fn all(&mut self) -> RedisResult<Vec<i32>> {
self.rconn.smembers(&self.key).await
}
}

6
src/schema.rs

@ -3,6 +3,7 @@ table! {
id -> Integer, id -> Integer,
author_hash -> Text, author_hash -> Text,
author_title -> Text, author_title -> Text,
is_tmp -> Bool,
content -> Text, content -> Text,
create_time -> Timestamp, create_time -> Timestamp,
is_deleted -> Bool, is_deleted -> Bool,
@ -17,7 +18,8 @@ table! {
content -> Text, content -> Text,
cw -> Text, cw -> Text,
author_title -> Text, author_title -> Text,
n_likes -> Integer, is_tmp -> Bool,
n_attentions -> Integer,
n_comments -> Integer, n_comments -> Integer,
create_time -> Timestamp, create_time -> Timestamp,
last_comment_time -> Timestamp, last_comment_time -> Timestamp,
@ -37,6 +39,8 @@ table! {
} }
} }
joinable!(comments -> posts (post_id));
allow_tables_to_appear_in_same_query!( allow_tables_to_appear_in_same_query!(
comments, comments,
posts, posts,

47
tools/migdb.py

@ -23,8 +23,9 @@ def mig_post():
r[7] = datetime.fromtimestamp(r[7]) r[7] = datetime.fromtimestamp(r[7])
r[8] = datetime.fromtimestamp(r[8]) r[8] = datetime.fromtimestamp(r[8])
r[10] = r[10] or False # comment r[10] = r[10] or False # comment
r.insert(4, r[2].startswith('[tmp]\n'))
c_new.execute( c_new.execute(
'INSERT OR REPLACE INTO posts VALUES({})'.format(','.join(['?'] * 13)), 'INSERT OR REPLACE INTO posts VALUES({})'.format(','.join(['?'] * 14)),
r r
) )
db_new.commit() db_new.commit()
@ -42,27 +43,39 @@ def mig_user():
def mig_comment(): def mig_comment():
rs = c_old.execute( _start = 0
'SELECT id, name_hash, author_title, content, timestamp, deleted, post_id ' _step = 1000
'FROM comment' while True:
) print("comment loop...", _start)
for r in rs: rs = c_old.execute(
r = list(r) 'SELECT id, name_hash, author_title, content, timestamp, deleted, post_id '
r[2] = r[2] or '' 'FROM comment WHERE id > ? ORDER BY id LIMIT ?',
r[4] = datetime.fromtimestamp(r[4]) (_start, _step)
r[5] = r[5] or False
c_new.execute(
'INSERT OR REPLACE INTO comments VALUES({})'.format(','.join(['?'] * 7)),
r
) )
db_new.commit() r = None
for r in rs:
r = list(r)
r[2] = r[2] or ''
r[4] = datetime.fromtimestamp(r[4])
r[5] = r[5] or False
r.insert(2, r[3].startswith('[tmp]\n'))
c_new.execute(
'INSERT OR REPLACE INTO comments VALUES({})'.format(','.join(['?'] * 8)),
r
)
if not r:
break
db_new.commit()
_start = r[0]
if __name__ == '__main__': if __name__ == '__main__':
# mig_post() mig_post()
# mig_user() mig_user()
mig_comment() mig_comment()
pass
c_old.close() c_old.close()
c_new.close() c_new.close()

Loading…
Cancel
Save