Skip to content

Commit

Permalink
libsql-server: track slowest queries too
Browse files Browse the repository at this point in the history
This PoC adds another stats entry - slowest queries, since they
do not necessarily match the *most expensive* queries we count
right now.

Ideas to entertain:
 - tracking aggregated query time
 - tracking the number of times the query was received
     (to ignore oneshot hyperlong queries as less interesting than
      lots of fairly-long queries)

Example:
```
$ curl -s http://localhost:8081/v1/namespaces/default/stats | jq '.slowest_queries' | jtbl
  elapsed_ms  query
------------  ------------------------------
          44  SELECT count (*) FROM t;
          63  SELECT count (*) FROM t;
        3483  INSERT INTO t SELECT * FROM t;
```
  • Loading branch information
psarna committed Oct 24, 2023
1 parent af74c27 commit b037499
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 5 deletions.
16 changes: 12 additions & 4 deletions libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ impl<W: WalHook> Connection<W> {
builder: &mut impl QueryResultBuilder,
) -> Result<(u64, Option<i64>)> {
tracing::trace!("executing query: {}", query.stmt.stmt);

let start = Instant::now();
let config = self.config_store.get();
let blocked = match query.stmt.kind {
StmtKind::Read | StmtKind::TxnBegin | StmtKind::Other => config.block_reads,
Expand Down Expand Up @@ -606,7 +606,7 @@ impl<W: WalHook> Connection<W> {

drop(qresult);

self.update_stats(query.stmt.stmt.clone(), &stmt);
self.update_stats(query.stmt.stmt.clone(), &stmt, Instant::now() - start);

Ok((affected_row_count, last_insert_rowid))
}
Expand Down Expand Up @@ -640,7 +640,7 @@ impl<W: WalHook> Connection<W> {
Ok(())
}

fn update_stats(&self, sql: String, stmt: &rusqlite::Statement) {
fn update_stats(&self, sql: String, stmt: &rusqlite::Statement, elapsed: Duration) {
let rows_read = stmt.get_status(StatementStatus::RowsRead);
let rows_written = stmt.get_status(StatementStatus::RowsWritten);
let rows_read = if rows_read == 0 && rows_written == 0 {
Expand All @@ -652,8 +652,16 @@ impl<W: WalHook> Connection<W> {
self.stats.inc_rows_written(rows_written as u64);
let weight = (rows_read + rows_written) as i64;
if self.stats.qualifies_as_top_query(weight) {
self.stats.add_top_query(crate::stats::TopQuery::new(
sql.clone(),
rows_read,
rows_written,
));
}
let elapsed = elapsed.as_millis() as u64;
if self.stats.qualifies_as_slowest_query(elapsed) {
self.stats
.add_top_query(crate::stats::TopQuery::new(sql, rows_read, rows_written));
.add_slowest_query(crate::stats::SlowestQuery::new(sql, elapsed));
}
}

Expand Down
10 changes: 9 additions & 1 deletion libsql-server/src/http/admin/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use axum::Json;

use crate::namespace::{MakeNamespace, NamespaceName};
use crate::replication::FrameNo;
use crate::stats::{Stats, TopQuery};
use crate::stats::{SlowestQuery, Stats, TopQuery};

use super::AppState;

Expand All @@ -19,6 +19,7 @@ pub struct StatsResponse {
pub write_requests_delegated: u64,
pub replication_index: FrameNo,
pub top_queries: Vec<TopQuery>,
pub slowest_queries: Vec<SlowestQuery>,
}

impl From<&Stats> for StatsResponse {
Expand All @@ -36,6 +37,13 @@ impl From<&Stats> for StatsResponse {
.iter()
.cloned()
.collect(),
slowest_queries: stats
.slowest_queries()
.read()
.unwrap()
.iter()
.cloned()
.collect(),
}
}
}
Expand Down
38 changes: 38 additions & 0 deletions libsql-server/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ impl TopQuery {
}
}

#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct SlowestQuery {
pub elapsed_ms: u64,
pub query: String,
}

impl SlowestQuery {
pub fn new(query: String, elapsed_ms: u64) -> Self {
Self { elapsed_ms, query }
}
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Stats {
#[serde(default)]
Expand All @@ -48,6 +60,11 @@ pub struct Stats {
top_query_threshold: AtomicI64,
#[serde(default)]
top_queries: Arc<RwLock<BTreeSet<TopQuery>>>,
// Lowest value in currently stored slowest queries
#[serde(default)]
slowest_query_threshold: AtomicU64,
#[serde(default)]
slowest_queries: Arc<RwLock<BTreeSet<SlowestQuery>>>,
}

impl Stats {
Expand Down Expand Up @@ -141,6 +158,27 @@ impl Stats {
pub(crate) fn top_queries(&self) -> &Arc<RwLock<BTreeSet<TopQuery>>> {
&self.top_queries
}

pub(crate) fn add_slowest_query(&self, query: SlowestQuery) {
let mut slowest_queries = self.slowest_queries.write().unwrap();
tracing::debug!("slowest query: {}: {}", query.elapsed_ms, query.query);
slowest_queries.insert(query);
if slowest_queries.len() > 10 {
slowest_queries.pop_first();
}
self.slowest_query_threshold.store(
slowest_queries.first().unwrap().elapsed_ms,
Ordering::Relaxed,
);
}

pub(crate) fn qualifies_as_slowest_query(&self, elapsed_ms: u64) -> bool {
elapsed_ms >= self.slowest_query_threshold.load(Ordering::Relaxed)
}

pub(crate) fn slowest_queries(&self) -> &Arc<RwLock<BTreeSet<SlowestQuery>>> {
&self.slowest_queries
}
}

async fn spawn_stats_persist_thread(stats: Weak<Stats>, path: PathBuf) -> anyhow::Result<()> {
Expand Down

0 comments on commit b037499

Please sign in to comment.