
log sql query

branch: master
hole-thu committed 3 years ago
commit b95129dccb
  1. Cargo.toml (3)
  2. src/libs/diesel_logger.rs (148)
  3. src/libs/mod.rs (1)
  4. src/main.rs (13)
  5. src/models.rs (48)
  6. src/rds_models.rs (11)

Cargo.toml (3)

@@ -14,6 +14,9 @@ chrono = { version="0.*", features =["serde"] }
rand = "0.*"
dotenv = "0.*"
sha2 = "0.*"
diesel_logger = "0.1.1"
log = "0.4.16"
env_logger = "0.9.0"
[dependencies.rocket_sync_db_pools]
version = "0.1.0-rc.1"

src/libs/diesel_logger.rs (148)

@@ -0,0 +1,148 @@
/*
 * from https://github.com/shssoichiro/diesel-logger
 * change Connection to &mut Connection
 */
use std::ops::Deref;
use std::time::{Duration, Instant};

use diesel::backend::{Backend, UsesAnsiSavepointSyntax};
use diesel::connection::{AnsiTransactionManager, SimpleConnection};
use diesel::debug_query;
use diesel::deserialize::QueryableByName;
use diesel::prelude::*;
use diesel::query_builder::{AsQuery, QueryFragment, QueryId};
use diesel::sql_types::HasSqlType;

/// Wraps a diesel `Connection` to time and log each query using
/// the configured logger for the `log` crate.
///
/// Currently, this produces a `debug` log on every query,
/// an `info` on queries that take longer than 1 second,
/// and a `warn`ing on queries that take longer than 5 seconds.
/// These thresholds will be configurable in a future version.
pub struct LoggingConnection<'r, C: Connection>(&'r mut C);

impl<'r, C: Connection> LoggingConnection<'r, C> {
    pub fn new(conn: &'r mut C) -> Self {
        LoggingConnection(conn)
    }
}

impl<'r, C: Connection> Deref for LoggingConnection<'r, C> {
    type Target = C;
    fn deref(&self) -> &Self::Target {
        self.0
    }
}

impl<'r, C> SimpleConnection for LoggingConnection<'r, C>
where
    C: Connection + Send + 'static,
{
    fn batch_execute(&self, query: &str) -> QueryResult<()> {
        let start_time = Instant::now();
        let result = self.0.batch_execute(query);
        let duration = start_time.elapsed();
        log_query(query, duration);
        result
    }
}

impl<C: Connection> Connection for LoggingConnection<'_, C>
where
    C: Connection<TransactionManager = AnsiTransactionManager> + Send + 'static,
    C::Backend: UsesAnsiSavepointSyntax,
    <C::Backend as Backend>::QueryBuilder: Default,
{
    type Backend = C::Backend;
    type TransactionManager = C::TransactionManager;

    fn establish(_: &str) -> ConnectionResult<Self> {
        Err(ConnectionError::__Nonexhaustive)
        //Ok(LoggingConnection(C::establish(database_url)?))
    }

    fn execute(&self, query: &str) -> QueryResult<usize> {
        let start_time = Instant::now();
        let result = self.0.execute(query);
        let duration = start_time.elapsed();
        log_query(query, duration);
        result
    }

    fn query_by_index<T, U>(&self, source: T) -> QueryResult<Vec<U>>
    where
        T: AsQuery,
        T::Query: QueryFragment<Self::Backend> + QueryId,
        Self::Backend: HasSqlType<T::SqlType>,
        U: Queryable<T::SqlType, Self::Backend>,
    {
        let query = source.as_query();
        let debug_query = debug_query::<Self::Backend, _>(&query).to_string();
        let start_time = Instant::now();
        let result = self.0.query_by_index(query);
        let duration = start_time.elapsed();
        log_query(&debug_query, duration);
        result
    }

    fn query_by_name<T, U>(&self, source: &T) -> QueryResult<Vec<U>>
    where
        T: QueryFragment<Self::Backend> + QueryId,
        U: QueryableByName<Self::Backend>,
    {
        let debug_query = debug_query::<Self::Backend, _>(&source).to_string();
        let start_time = Instant::now();
        let result = self.0.query_by_name(source);
        let duration = start_time.elapsed();
        log_query(&debug_query, duration);
        result
    }

    fn execute_returning_count<T>(&self, source: &T) -> QueryResult<usize>
    where
        T: QueryFragment<Self::Backend> + QueryId,
    {
        let debug_query = debug_query::<Self::Backend, _>(&source).to_string();
        let start_time = Instant::now();
        let result = self.0.execute_returning_count(source);
        let duration = start_time.elapsed();
        log_query(&debug_query, duration);
        result
    }

    fn transaction_manager(&self) -> &Self::TransactionManager {
        self.0.transaction_manager()
    }
}

fn log_query(query: &str, duration: Duration) {
    if duration.as_secs() >= 5 {
        warn!(
            "Slow query ran in {:.2} seconds: {}",
            duration_to_secs(duration),
            query
        );
    } else if duration.as_secs() >= 1 {
        info!(
            "Slow query ran in {:.2} seconds: {}",
            duration_to_secs(duration),
            query
        );
    } else {
        debug!("Query ran in {:.1} ms: {}", duration_to_ms(duration), query);
    }
}

const NANOS_PER_MILLI: u32 = 1_000_000;
const MILLIS_PER_SEC: u32 = 1_000;

fn duration_to_secs(duration: Duration) -> f32 {
    duration_to_ms(duration) / MILLIS_PER_SEC as f32
}

fn duration_to_ms(duration: Duration) -> f32 {
    (duration.as_secs() as u32 * 1000) as f32
        + (duration.subsec_nanos() as f32 / NANOS_PER_MILLI as f32)
}
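
Usage note (a minimal sketch, not part of this commit): wrapping a borrowed connection in LoggingConnection and handing the wrapper to a query is all that is needed to get the timing logs above; the with_log! macro added in src/models.rs below expands to exactly this. The helper name list_recent_posts and the limit of 10 are illustrative only; posts and Post are assumed to be this repository's existing schema and model.

use diesel::pg::PgConnection;
use diesel::prelude::*;

use crate::libs::diesel_logger::LoggingConnection;
use crate::models::Post;
use crate::schema::posts;

// Illustrative helper: every query issued through the wrapper is timed by
// log_query() and reported via debug!/info!/warn! depending on its duration.
fn list_recent_posts(c: &mut PgConnection) -> QueryResult<Vec<Post>> {
    posts::table
        .order(posts::id.desc())
        .limit(10)
        .load(&LoggingConnection::new(c))
}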

src/libs/mod.rs (1)

@@ -0,0 +1 @@
pub mod diesel_logger;

src/main.rs (13)

@@ -4,21 +4,26 @@ extern crate rocket;
#[macro_use]
extern crate diesel;
#[macro_use]
extern crate log;

mod api;
mod db_conn;
mod rds_conn;
mod libs;
mod models;
mod rds_models;
mod random_hasher;
mod rds_conn;
mod rds_models;
mod schema;

use db_conn::Db;
use rds_conn::init_rds_client;
use random_hasher::RandomHasher;
use rds_conn::init_rds_client;

#[rocket::main]
async fn main() -> Result<(), rocket::Error> {
    load_env();
    env_logger::init();
    rocket::build()
        .mount(
            "/_api/v1",

src/models.rs (48)

@@ -1,6 +1,7 @@
#![allow(clippy::all)]
use crate::db_conn::Db;
use crate::libs::diesel_logger::LoggingConnection;
use crate::rds_conn::RdsConn;
use crate::rds_models::PostCache;
use crate::schema::*;

@@ -17,7 +18,8 @@ macro_rules! get {
    ($table:ident) => {
        pub async fn get(db: &Db, id: i32) -> QueryResult<Self> {
            let pid = id;
            db.run(move |c| $table::table.find(pid).first(c)).await
            db.run(move |c| $table::table.find(pid).first(with_log!((c))))
                .await
        }
    };
}

@@ -31,7 +33,7 @@ macro_rules! get_multi {
                .filter($table::id.eq_any(ids))
                .filter($table::is_deleted.eq(false))
                .order($table::id.desc())
                .load(c)
                .load(with_log!(c))
            })
            .await
        }

@@ -45,7 +47,7 @@ macro_rules! set_deleted {
            db.run(move |c| {
                diesel::update($table::table.find(pid))
                    .set($table::is_deleted.eq(true))
                    .execute(c)
                    .execute(with_log!(c))
            })
            .await
        }

@@ -60,6 +62,12 @@ macro_rules! base_query {
    };
}

macro_rules! with_log {
    ($c: expr) => {
        &LoggingConnection::new($c)
    };
}

#[derive(Queryable, Insertable)]
pub struct Comment {
    pub id: i32,

@@ -152,7 +160,7 @@ impl Post {
                _ => panic!("Wrong order mode!"),
            };
            query.offset(start).limit(limit).load(c)
            query.offset(start).limit(limit).load(with_log!(c))
        })
        .await
    }

@@ -197,15 +205,19 @@ impl Post {
                .order(posts::id.desc())
                .offset(start)
                .limit(limit)
                .load(c)
                .load(with_log!(c))
        })
        .await
    }

    pub async fn create(db: &Db, new_post: NewPost) -> QueryResult<Self> {
        // TODO: tags
        db.run(move |c| insert_into(posts::table).values(&new_post).get_result(c))
            .await
        db.run(move |c| {
            insert_into(posts::table)
                .values(&new_post)
                .get_result(with_log!(c))
        })
        .await
    }

    pub async fn update_cw(&self, db: &Db, new_cw: String) -> QueryResult<usize> {

@@ -213,7 +225,7 @@ impl Post {
        db.run(move |c| {
            diesel::update(posts::table.find(pid))
                .set(posts::cw.eq(new_cw))
                .execute(c)
                .execute(with_log!(c))
        })
        .await
    }

@@ -223,7 +235,7 @@ impl Post {
        db.run(move |c| {
            diesel::update(posts::table.find(pid))
                .set(posts::n_comments.eq(posts::n_comments + delta))
                .get_result(c)
                .get_result(with_log!(c))
        })
        .await
    }

@@ -233,7 +245,7 @@ impl Post {
        db.run(move |c| {
            diesel::update(posts::table.find(pid))
                .set(posts::n_attentions.eq(posts::n_attentions + delta))
                .get_result(c)
                .get_result(with_log!(c))
        })
        .await
    }

@@ -243,7 +255,7 @@ impl Post {
        db.run(move |c| {
            diesel::update(posts::table.find(pid))
                .set(posts::hot_score.eq(posts::hot_score + delta))
                .get_result(c)
                .get_result(with_log!(c))
        })
        .await
    }

@@ -259,9 +271,13 @@ impl Post {
impl User {
    pub async fn get_by_token(db: &Db, token: &str) -> Option<Self> {
        let token = token.to_string();
        db.run(move |c| users::table.filter(users::token.eq(token)).first(c))
            .await
            .ok()
        db.run(move |c| {
            users::table
                .filter(users::token.eq(token))
                .first(with_log!(c))
        })
        .await
        .ok()
    }
}

@@ -284,7 +300,7 @@ impl Comment {
        db.run(move |c| {
            insert_into(comments::table)
                .values(&new_comment)
                .get_result(c)
                .get_result(with_log!(c))
        })
        .await
    }

@@ -295,7 +311,7 @@ impl Comment {
            comments::table
                .filter(comments::post_id.eq(pid))
                .order(comments::id)
                .load(c)
                .load(with_log!(c))
        })
        .await
    }
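
The pattern in this file is mechanical: each closure passed to db.run receives a &mut connection c, and wrapping it as with_log!(c) at the final .first/.load/.execute/.get_result call routes the query through LoggingConnection. A sketch of how a further query could follow the same convention, as it would sit inside src/models.rs; count_not_deleted is hypothetical and not part of this commit, while posts::is_deleted is the existing column used above:

impl Post {
    // Hypothetical extra method following the with_log! convention above.
    pub async fn count_not_deleted(db: &Db) -> QueryResult<i64> {
        db.run(move |c| {
            posts::table
                .filter(posts::is_deleted.eq(false))
                .count()
                .get_result(with_log!(c))
        })
        .await
    }
}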

src/rds_models.rs (11)

@@ -58,23 +58,26 @@ impl PostCache {
        )
        .await
        .unwrap_or_else(|e| {
            dbg!("set post cache failed", e, p.id);
            warn!("set post cache failed: {}, {}", e, p.id);
        })
    }

    pub async fn get(&mut self) -> Option<Post> {
        let rds_result = self.rconn.get::<&String, String>(&self.key).await;
        if let Ok(s) = rds_result {
            dbg!("hint post cache", &s);
            debug!("hint post cache: {}", &s);
            self.rconn
                .expire::<&String, bool>(&self.key, INSTANCE_EXPIRE_TIME)
                .await
                .unwrap_or_else(|e| {
                    dbg!("get post cache, set new expire failed", e, &self.key, &s);
                    warn!(
                        "get post cache, set new expire failed: {}, {}, {} ",
                        e, &self.key, &s
                    );
                    false
                });
            serde_json::from_str(&s).unwrap_or_else(|e| {
                dbg!("get post cache failed", e, s);
                warn!("get post cache, decode failed {}, {}", e, s);
                None
            })
        } else {
