From 8e704cae9a1a00448fb8d75cc2d8723da8b11f1d Mon Sep 17 00:00:00 2001
From: Matt Green
Date: Tue, 13 Aug 2024 14:54:57 -0700
Subject: [PATCH 1/4] upgrade to datafusion 41.0.0

---
 Cargo.lock                              | 60 ++++++++++++-------------
 Cargo.toml                              |  9 +++-
 examples/examples/csv_streaming.rs      |  3 +-
 examples/examples/kafka_rideshare.rs    |  3 +-
 examples/examples/simple_aggregation.rs |  4 +-
 5 files changed, 43 insertions(+), 36 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 9720ae0..ffeb915 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -796,8 +796,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion"
-version = "40.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#7dc9d395099b4c3830852dbbf32f1882173332bb"
+version = "41.0.0"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#f5ef67c91bf1f17166fa63a5005acd628501e5b0"
 dependencies = [
  "ahash",
  "arrow",
@@ -851,8 +851,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion-catalog"
-version = "40.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#7dc9d395099b4c3830852dbbf32f1882173332bb"
+version = "41.0.0"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#f5ef67c91bf1f17166fa63a5005acd628501e5b0"
 dependencies = [
  "arrow-schema",
  "async-trait",
@@ -864,8 +864,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion-common"
-version = "40.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#7dc9d395099b4c3830852dbbf32f1882173332bb"
+version = "41.0.0"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#f5ef67c91bf1f17166fa63a5005acd628501e5b0"
 dependencies = [
  "ahash",
  "arrow",
@@ -890,16 +890,16 @@ dependencies = [
 
 [[package]]
 name = "datafusion-common-runtime"
-version = "40.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#7dc9d395099b4c3830852dbbf32f1882173332bb"
+version = "41.0.0"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#f5ef67c91bf1f17166fa63a5005acd628501e5b0"
 dependencies = [
  "tokio",
 ]
 
 [[package]]
 name = "datafusion-execution"
-version = "40.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#7dc9d395099b4c3830852dbbf32f1882173332bb"
+version = "41.0.0"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#f5ef67c91bf1f17166fa63a5005acd628501e5b0"
 dependencies = [
  "arrow",
  "chrono",
@@ -918,8 +918,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion-expr"
-version = "40.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#7dc9d395099b4c3830852dbbf32f1882173332bb"
+version = "41.0.0"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#f5ef67c91bf1f17166fa63a5005acd628501e5b0"
 dependencies = [
  "ahash",
  "arrow",
@@ -936,8 +936,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion-functions"
-version = "40.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#7dc9d395099b4c3830852dbbf32f1882173332bb"
+version = "41.0.0"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#f5ef67c91bf1f17166fa63a5005acd628501e5b0"
 dependencies = [
  "arrow",
  "arrow-buffer",
@@ -962,8 +962,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion-functions-aggregate"
-version = "40.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#7dc9d395099b4c3830852dbbf32f1882173332bb"
+version = "41.0.0"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#f5ef67c91bf1f17166fa63a5005acd628501e5b0"
 dependencies = [
  "ahash",
  "arrow",
@@ -979,8 +979,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion-functions-nested"
-version = "40.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#7dc9d395099b4c3830852dbbf32f1882173332bb"
+version = "41.0.0"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#f5ef67c91bf1f17166fa63a5005acd628501e5b0"
 dependencies = [
  "arrow",
  "arrow-array",
@@ -1000,8 +1000,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion-optimizer"
-version = "40.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#7dc9d395099b4c3830852dbbf32f1882173332bb"
+version = "41.0.0"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#f5ef67c91bf1f17166fa63a5005acd628501e5b0"
 dependencies = [
  "arrow",
  "async-trait",
@@ -1019,8 +1019,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion-physical-expr"
-version = "40.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#7dc9d395099b4c3830852dbbf32f1882173332bb"
+version = "41.0.0"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#f5ef67c91bf1f17166fa63a5005acd628501e5b0"
 dependencies = [
  "ahash",
  "arrow",
@@ -1048,8 +1048,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion-physical-expr-common"
-version = "40.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#7dc9d395099b4c3830852dbbf32f1882173332bb"
+version = "41.0.0"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#f5ef67c91bf1f17166fa63a5005acd628501e5b0"
 dependencies = [
  "ahash",
  "arrow",
@@ -1061,8 +1061,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion-physical-optimizer"
-version = "40.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#7dc9d395099b4c3830852dbbf32f1882173332bb"
+version = "41.0.0"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#f5ef67c91bf1f17166fa63a5005acd628501e5b0"
 dependencies = [
  "datafusion-common",
  "datafusion-execution",
@@ -1072,8 +1072,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion-physical-plan"
-version = "40.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#7dc9d395099b4c3830852dbbf32f1882173332bb"
+version = "41.0.0"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#f5ef67c91bf1f17166fa63a5005acd628501e5b0"
 dependencies = [
  "ahash",
  "arrow",
@@ -1105,8 +1105,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion-sql"
-version = "40.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#7dc9d395099b4c3830852dbbf32f1882173332bb"
+version = "41.0.0"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion#f5ef67c91bf1f17166fa63a5005acd628501e5b0"
 dependencies = [
  "arrow",
  "arrow-array",
diff --git a/Cargo.toml b/Cargo.toml
index 38ca59c..8df5af0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,7 +17,7 @@ description = "Embeddable stream processing engine"
 
 [workspace.dependencies]
 df-streams-core = { path = "crates/core" }
-datafusion = "40.0"
+datafusion = "41.0.0"
 
 arrow = { version = "52.0.0", features = ["prettyprint"] }
 arrow-array = { version = "52.0.0", default-features = false, features = [
@@ -39,7 +39,12 @@ futures = "0.3"
 tracing = "0.1.40"
 tracing-log = "0.2.0"
 tracing-subscriber = "0.3.18"
-tokio = { version = "1.36", features = ["macros", "rt", "sync", "rt-multi-thread"] }
+tokio = { version = "1.36", features = [
+    "macros",
+    "rt",
+    "sync",
+    "rt-multi-thread",
+] }
 async-trait = "0.1.81"
 rdkafka = "0.36.2"
 log = "^0.4"
diff --git a/examples/examples/csv_streaming.rs b/examples/examples/csv_streaming.rs
index aee299f..b51930f 100644
--- a/examples/examples/csv_streaming.rs
+++ b/examples/examples/csv_streaming.rs
@@ -2,7 +2,8 @@ use datafusion::common::test_util::datafusion_test_data;
 use datafusion::dataframe::DataFrameWriteOptions;
 use datafusion::datasource::MemTable;
 use datafusion::error::Result;
-use datafusion::logical_expr::{col, max, min};
+use datafusion::functions_aggregate::expr_fn::{max, min};
+use datafusion::logical_expr::col;
 use datafusion::prelude::*;
 
 /// This example demonstrates executing a simple query against an Arrow data source (CSV) and
diff --git a/examples/examples/kafka_rideshare.rs b/examples/examples/kafka_rideshare.rs
index 90902bd..694b8af 100644
--- a/examples/examples/kafka_rideshare.rs
+++ b/examples/examples/kafka_rideshare.rs
@@ -1,7 +1,8 @@
 use datafusion::error::Result;
 use datafusion::functions::core::expr_ext::FieldAccessor;
 use datafusion::functions_aggregate::count::count;
-use datafusion::logical_expr::{col, max, min};
+use datafusion::functions_aggregate::expr_fn::{max, min};
+use datafusion::logical_expr::col;
 
 use df_streams_core::context::Context;
 use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs
index 556192e..4097ea5 100644
--- a/examples/examples/simple_aggregation.rs
+++ b/examples/examples/simple_aggregation.rs
@@ -3,8 +3,8 @@ use std::time::Duration;
 use datafusion::error::Result;
 use datafusion::functions_aggregate::average::avg;
 use datafusion::functions_aggregate::count::count;
-use datafusion::logical_expr::lit;
-use datafusion::logical_expr::{col, max, min};
+use datafusion::functions_aggregate::expr_fn::{max, min};
+use datafusion::logical_expr::{col, lit};
 
 use df_streams_core::context::Context;
 use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};

From 7c74ca5ff0abbd023fd3007d5a0c876d397a5bac Mon Sep 17 00:00:00 2001
From: Matt Green
Date: Tue, 13 Aug 2024 22:29:02 -0700
Subject: [PATCH 2/4] disable coalesce_batches

---
 crates/core/src/context.rs              | 18 +++++++++++++-----
 crates/core/src/datastream.rs           | 18 +++++++++++++++++-
 examples/examples/emit_measurements.rs  |  2 +-
 examples/examples/simple_aggregation.rs |  3 +--
 4 files changed, 32 insertions(+), 9 deletions(-)

diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs
index a261ea7..d48df98 100644
--- a/crates/core/src/context.rs
+++ b/crates/core/src/context.rs
@@ -16,15 +16,23 @@ use crate::utils::get_default_optimizer_rules;
 
 #[derive(Clone)]
 pub struct Context {
-    session_conext: Arc<RwLock<SessionContext>>,
+    pub session_conext: Arc<RwLock<SessionContext>>,
 }
 
 impl Context {
     pub fn new() -> Result<Self> {
-        let config = SessionConfig::new().set(
-            "datafusion.execution.batch_size",
-            datafusion::common::ScalarValue::UInt64(Some(32)),
-        );
+        let config = SessionConfig::new()
+            .set(
+                "datafusion.execution.batch_size",
+                datafusion::common::ScalarValue::UInt64(Some(32)),
+            )
+            // coalesce_batches slows down the pipeline and increases latency as it tries to
+            // concatenate small batches together, so we disable it.
+            .set(
+                "datafusion.execution.coalesce_batches",
+                datafusion::common::ScalarValue::Boolean(Some(false)),
+            );
+
         let runtime = Arc::new(RuntimeEnv::default());
 
         let state = SessionStateBuilder::new()
diff --git a/crates/core/src/datastream.rs b/crates/core/src/datastream.rs
index 5e2159b..af877ea 100644
--- a/crates/core/src/datastream.rs
+++ b/crates/core/src/datastream.rs
@@ -9,6 +9,7 @@ use datafusion::execution::SendableRecordBatchStream;
 use datafusion::logical_expr::{
     logical_plan::LogicalPlanBuilder, utils::find_window_exprs, Expr, JoinType,
 };
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
 
 use crate::context::Context;
 use crate::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
@@ -163,13 +164,28 @@ impl DataStream {
         Ok(self)
     }
 
-    /// Prints the underlying logical_plan.
+    /// Prints the underlying logical plan.
     /// Useful for debugging chained method calls.
     pub fn print_plan(self) -> Result<Self> {
         println!("{}", self.df.logical_plan().display_indent());
         Ok(self)
     }
 
+    /// Prints the underlying physical plan.
+    /// Useful for debugging and development.
+    pub async fn print_physical_plan(self) -> Result<Self> {
+        let (session_state, plan) = self.df.as_ref().clone().into_parts();
+        let physical_plan = self.df.as_ref().clone().create_physical_plan().await?;
+        let displayable_plan = DisplayableExecutionPlan::new(physical_plan.as_ref());
+
+        println!("{}", displayable_plan.indent(true));
+
+        Ok(Self {
+            df: Arc::new(DataFrame::new(session_state, plan)),
+            context: self.context.clone(),
+        })
+    }
+
     /// Execute the stream and write the results to a given Kafka topic.
     pub async fn sink_kafka(
         self,
diff --git a/examples/examples/emit_measurements.rs b/examples/examples/emit_measurements.rs
index 65a7198..736b81e 100644
--- a/examples/examples/emit_measurements.rs
+++ b/examples/examples/emit_measurements.rs
@@ -60,7 +60,7 @@ async fn main() -> Result<()> {
             .await
             .unwrap();
 
-        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
+        tokio::time::sleep(tokio::time::Duration::from_micros(1)).await;
     }
 }
 
diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs
index 4097ea5..4682819 100644
--- a/examples/examples/simple_aggregation.rs
+++ b/examples/examples/simple_aggregation.rs
@@ -41,7 +41,6 @@ async fn main() -> Result<()> {
     let ds = ctx
         .from_topic(source_topic)
         .await?
-        // .filter(col("reading").gt(lit(70)))?
         .window(
             vec![],
             vec![
@@ -53,7 +52,7 @@ async fn main() -> Result<()> {
             Duration::from_millis(1_000),
             None,
         )?
-        .filter(col("max").lt(lit(113)))?;
+        .filter(col("max").gt(lit(113)))?;
 
     ds.clone().print_stream().await?;
 

From e774b20d48fed406db8375cd00237720998b890d Mon Sep 17 00:00:00 2001
From: Matt Green
Date: Wed, 14 Aug 2024 16:59:32 -0700
Subject: [PATCH 3/4] change package version and update name

---
 Cargo.lock                              | 10 +++++-----
 Cargo.toml                              |  6 +++---
 crates/core/Cargo.toml                  |  2 +-
 examples/Cargo.toml                     |  4 ++--
 examples/examples/emit_measurements.rs  |  2 +-
 examples/examples/kafka_rideshare.rs    |  6 +++---
 examples/examples/simple_aggregation.rs |  8 ++++----
 examples/examples/stream_join.rs        |  8 ++++----
 8 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index ffeb915..9091434 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1131,8 +1131,8 @@ dependencies = [
 ]
 
 [[package]]
-name = "df-streams-core"
-version = "0.1.0"
+name = "denormalized"
+version = "0.0.1"
 dependencies = [
  "apache-avro",
  "arrow",
@@ -1159,13 +1159,13 @@ dependencies = [
 ]
 
 [[package]]
-name = "df-streams-examples"
-version = "0.1.0"
+name = "denormalized-examples"
+version = "0.0.1"
 dependencies = [
  "arrow",
  "arrow-schema",
  "datafusion",
- "df-streams-core",
+ "denormalized",
  "env_logger",
  "futures",
  "log",
diff --git a/Cargo.toml b/Cargo.toml
index 8df5af0..6d3cc8c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,12 +11,12 @@ edition = "2021"
 homepage = "https://github.com/probably-nothing-labs/denormalized"
 license = "Apache-2.0"
 readme = "README.md"
-repository = "https://github.com/probably-nothing-labs/denormalized"
-version = "0.1.0"
+repository = "https://github.com/probably-nothing-labs/denormalized.git"
+version = "0.0.1"
 description = "Embeddable stream processing engine"
 
 [workspace.dependencies]
-df-streams-core = { path = "crates/core" }
+denormalized = { path = "crates/core" }
 datafusion = "41.0.0"
 
 arrow = { version = "52.0.0", features = ["prettyprint"] }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 44d23e9..330f5f0 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -1,5 +1,5 @@
 [package]
-name = "df-streams-core"
+name = "denormalized"
 version = { workspace = true }
 edition = { workspace = true }
 
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 551a0c2..dfc991b 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -1,5 +1,5 @@
 [package]
-name = "df-streams-examples"
+name = "denormalized-examples"
 version = { workspace = true }
 edition = { workspace = true }
 publish = false
@@ -7,7 +7,7 @@
 
 [dependencies]
 datafusion = { workspace = true }
-df-streams-core = { workspace = true }
+denormalized = { workspace = true }
 
 arrow = { workspace = true }
 arrow-schema = { workspace = true }
diff --git a/examples/examples/emit_measurements.rs b/examples/examples/emit_measurements.rs
index 736b81e..a0925b5 100644
--- a/examples/examples/emit_measurements.rs
+++ b/examples/examples/emit_measurements.rs
@@ -7,7 +7,7 @@ use rdkafka::config::ClientConfig;
 use rdkafka::producer::FutureRecord;
 use rdkafka::util::Timeout;
 
-use df_streams_examples::Measurment;
+use denormalized_examples::Measurment;
 
 /// This script emits test data to a Kafka cluster
 ///
diff --git a/examples/examples/kafka_rideshare.rs b/examples/examples/kafka_rideshare.rs
index 694b8af..be9fa74 100644
--- a/examples/examples/kafka_rideshare.rs
+++ b/examples/examples/kafka_rideshare.rs
@@ -4,9 +4,9 @@ use datafusion::functions_aggregate::count::count;
 use datafusion::functions_aggregate::expr_fn::{max, min};
 use datafusion::logical_expr::col;
 
-use df_streams_core::context::Context;
-use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
-use df_streams_core::physical_plan::utils::time::TimestampUnit;
+use denormalized::context::Context;
+use denormalized::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
+use denormalized::physical_plan::utils::time::TimestampUnit;
 
 use std::time::Duration;
 use tracing_subscriber::{fmt::format::FmtSpan, FmtSubscriber};
diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs
index 4682819..c3cb337 100644
--- a/examples/examples/simple_aggregation.rs
+++ b/examples/examples/simple_aggregation.rs
@@ -6,11 +6,11 @@ use datafusion::functions_aggregate::count::count;
 use datafusion::functions_aggregate::expr_fn::{max, min};
 use datafusion::logical_expr::{col, lit};
 
-use df_streams_core::context::Context;
-use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
-use df_streams_core::physical_plan::utils::time::TimestampUnit;
+use denormalized::context::Context;
+use denormalized::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
+use denormalized::physical_plan::utils::time::TimestampUnit;
 
-use df_streams_examples::get_sample_json;
+use denormalized_examples::get_sample_json;
 
 /// Demonstrates a simple stream aggregate job on data generated via the `emit_measurements.rs`
 /// example script.
diff --git a/examples/examples/stream_join.rs b/examples/examples/stream_join.rs
index a9184fa..9b84f70 100644
--- a/examples/examples/stream_join.rs
+++ b/examples/examples/stream_join.rs
@@ -5,11 +5,11 @@ use datafusion::error::Result;
 use datafusion::functions_aggregate::average::avg;
 use datafusion::logical_expr::col;
 
-use df_streams_core::context::Context;
-use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
-use df_streams_core::physical_plan::utils::time::TimestampUnit;
+use denormalized::context::Context;
+use denormalized::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
+use denormalized::physical_plan::utils::time::TimestampUnit;
 
-use df_streams_examples::get_sample_json;
+use denormalized_examples::get_sample_json;
 
 /// Demonstrates a simple stream join on data generated via the `emit_measurements.rs`
 /// example script.
From c3c211a2709d05982625a8b8dbe6b1d480e84757 Mon Sep 17 00:00:00 2001
From: Matt Green
Date: Wed, 14 Aug 2024 20:03:56 -0700
Subject: [PATCH 4/4] optimize emit measurements script

---
 examples/examples/emit_measurements.rs | 81 ++++++++++++++------------
 1 file changed, 45 insertions(+), 36 deletions(-)

diff --git a/examples/examples/emit_measurements.rs b/examples/examples/emit_measurements.rs
index a0925b5..dfe7abf 100644
--- a/examples/examples/emit_measurements.rs
+++ b/examples/examples/emit_measurements.rs
@@ -5,7 +5,6 @@ use std::time::{SystemTime, UNIX_EPOCH};
 
 use rdkafka::config::ClientConfig;
 use rdkafka::producer::FutureRecord;
-use rdkafka::util::Timeout;
 
 use denormalized_examples::Measurment;
 
@@ -16,52 +15,62 @@ use denormalized_examples::Measurment;
 /// This data is read and processed by other examples
 #[tokio::main]
 async fn main() -> Result<()> {
-    let mut rng = rand::thread_rng();
+    let mut tasks = tokio::task::JoinSet::new();
 
     let producer: FutureProducer = ClientConfig::new()
         .set("bootstrap.servers", String::from("localhost:9092"))
-        .set("message.timeout.ms", "60000")
+        .set("message.timeout.ms", "100")
        .create()
         .expect("Producer creation error");
 
-    let sensors = ["sensor_0", "sensor_1", "sensor_2", "sensor_3", "sensor_4"];
+    for _ in 0..2048 {
+        let producer = producer.clone();
 
-    loop {
-        let sensor_name = sensors.choose(&mut rng).unwrap().to_string();
+        tasks.spawn(async move {
+            let sensors = ["sensor_0", "sensor_1", "sensor_2", "sensor_3", "sensor_4"];
 
-        // Alternate between sending random temperature and humidity readings
-        let (topic, msg) = if rand::random::<f64>() < 0.4 {
-            (
-                "temperature".to_string(),
-                serde_json::to_vec(&Measurment {
-                    occurred_at_ms: get_timestamp_ms(),
-                    sensor_name,
-                    reading: rand::random::<f64>() * 115.0,
-                })
-                .unwrap(),
-            )
-        } else {
-            (
-                "humidity".to_string(),
-                serde_json::to_vec(&Measurment {
-                    occurred_at_ms: get_timestamp_ms(),
-                    sensor_name,
-                    reading: rand::random::<f64>(),
-                })
-                .unwrap(),
-            )
-        };
+            loop {
+                let sensor_name = sensors.choose(&mut rand::thread_rng()).unwrap().to_string();
+
+                // Alternate between sending random temperature and humidity readings
+                let (topic, msg) = if rand::random::<f64>() < 0.4 {
+                    (
+                        "temperature".to_string(),
+                        serde_json::to_vec(&Measurment {
+                            occurred_at_ms: get_timestamp_ms(),
+                            sensor_name,
+                            reading: rand::random::<f64>() * 115.0,
+                        })
+                        .unwrap(),
+                    )
+                } else {
+                    (
+                        "humidity".to_string(),
+                        serde_json::to_vec(&Measurment {
+                            occurred_at_ms: get_timestamp_ms(),
+                            sensor_name,
+                            reading: rand::random::<f64>(),
+                        })
+                        .unwrap(),
+                    )
+                };
 
-        producer
-            .send(
-                FutureRecord::<(), Vec<u8>>::to(topic.as_str()).payload(&msg),
-                Timeout::Never,
-            )
-            .await
-            .unwrap();
+                let _ = producer
+                    .send_result(FutureRecord::<(), Vec<u8>>::to(topic.as_str()).payload(&msg))
+                    .unwrap()
+                    .await
+                    .unwrap();
 
-        tokio::time::sleep(tokio::time::Duration::from_micros(1)).await;
+                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+            }
+        });
+    }
+
+    while let Some(res) = tasks.join_next().await {
+        let _ = res;
     }
+
+    Ok(())
 }
 
 fn get_timestamp_ms() -> u64 {