From cd72f0c3690e06f08c56daa063a448639cf0fe75 Mon Sep 17 00:00:00 2001 From: Matt Green Date: Fri, 2 Aug 2024 16:22:08 -0700 Subject: [PATCH 1/5] Emit one kafka message per row in a recordbatch --- Cargo.toml | 4 +- .../core/src/datasource/kafka/topic_writer.rs | 21 ++--- crates/core/src/utils/mod.rs | 1 + crates/core/src/utils/row_encoder.rs | 90 +++++++++++++++++++ 4 files changed, 101 insertions(+), 15 deletions(-) create mode 100644 crates/core/src/utils/row_encoder.rs diff --git a/Cargo.toml b/Cargo.toml index 2c4d3e0..1e83231 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,10 +8,10 @@ authors = [ "Amey Chaugule ", ] edition = "2021" -homepage = "https://github.com/probably-nothing-labs/df-streams" +homepage = "https://github.com/probably-nothing-labs/denormalized" license = "Apache-2.0" readme = "README.md" -repository = "https://github.com/probably-nothing-labs/df-streams" +repository = "https://github.com/probably-nothing-labs/denormalized" version = "0.1.0" description = "Embeddable stream processing engine" diff --git a/crates/core/src/datasource/kafka/topic_writer.rs b/crates/core/src/datasource/kafka/topic_writer.rs index 9d330da..70a8f6c 100644 --- a/crates/core/src/datasource/kafka/topic_writer.rs +++ b/crates/core/src/datasource/kafka/topic_writer.rs @@ -4,7 +4,6 @@ use std::fmt::{self, Debug}; use std::time::Duration; use std::{any::Any, sync::Arc}; -use arrow::json::LineDelimitedWriter; use arrow_schema::SchemaRef; use datafusion::catalog::Session; @@ -22,6 +21,7 @@ use rdkafka::producer::FutureProducer; use rdkafka::producer::FutureRecord; use super::KafkaWriteConfig; +use crate::utils::row_encoder::{JsonRowEncoder, RowEncoder}; // Used to createa kafka source pub struct TopicWriter(pub Arc); @@ -110,21 +110,16 @@ impl DataSink for KafkaSink { while let Some(batch) = data.next().await.transpose()? { row_count += batch.num_rows(); - if batch.num_rows() > 0 { - let buf = Vec::new(); - let mut writer = LineDelimitedWriter::new(buf); - writer.write_batches(&[&batch])?; - writer.finish()?; - let buf = writer.into_inner(); + let encoder = JsonRowEncoder {}; + let rows = encoder.encode(&batch)?; - let record = FutureRecord::<[u8], _>::to(topic).payload(&buf); + for row in rows { + let record = FutureRecord::<[u8], _>::to(topic).payload(&row); // .key(key.as_str()), - let _delivery_status = self - .producer - .send(record, Duration::from_secs(0)) - .await - .expect("Message not delivered"); + if let Err(msg) = self.producer.send(record, Duration::from_secs(0)).await { + tracing::error!("{}", msg.0); + } } } diff --git a/crates/core/src/utils/mod.rs b/crates/core/src/utils/mod.rs index f77961d..ce4f0b7 100644 --- a/crates/core/src/utils/mod.rs +++ b/crates/core/src/utils/mod.rs @@ -2,5 +2,6 @@ pub mod arrow_helpers; mod default_optimizer_rules; pub mod serialize; +pub mod row_encoder; pub use default_optimizer_rules::get_default_optimizer_rules; diff --git a/crates/core/src/utils/row_encoder.rs b/crates/core/src/utils/row_encoder.rs new file mode 100644 index 0000000..2144865 --- /dev/null +++ b/crates/core/src/utils/row_encoder.rs @@ -0,0 +1,90 @@ +use arrow::json::writer::{JsonFormat, Writer}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion_common::Result; + +pub trait RowEncoder { + fn encode(&self, batch: &RecordBatch) -> Result>>; +} + +#[derive(Debug, Default)] +// Formats json without any characting separating items. +pub struct NoDelimiter {} +impl JsonFormat for NoDelimiter {} +// writes rows as json without any character separating them +type JsonWriter = Writer; + +pub struct JsonRowEncoder {} + +impl JsonRowEncoder { + pub fn batch_to_json(&self, batch: &RecordBatch) -> Result> { + let buf = Vec::new(); + let mut writer = JsonWriter::new(buf); + writer.write(batch)?; + writer.finish()?; + let buf = writer.into_inner(); + + Ok(buf) + } +} + +impl RowEncoder for JsonRowEncoder { + fn encode(&self, batch: &RecordBatch) -> Result>> { + if batch.num_rows() == 0 { + return Ok(vec![]); + } + + // BufWriter uses a buffer size of 8KB + // We therefore double this and flush once we have more than 8KB + let mut buffer = Vec::with_capacity(batch.num_rows()); + + for i in 0..batch.num_rows() { + let row = batch.slice(i, 1); + buffer.push(self.batch_to_json(&row)?); + } + + Ok(buffer) + } +} + +#[cfg(test)] +mod tests { + use super::{JsonRowEncoder, RowEncoder}; + + use datafusion::arrow::array::{Int32Array, StringArray}; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use datafusion::arrow::record_batch::RecordBatch; + use std::sync::Arc; + + #[test] + fn serialize_record_batch_to_json() { + // define a schema. + let schema = Arc::new(Schema::new(vec![ + Field::new("col1", DataType::Utf8, false), + Field::new("col2", DataType::Int32, false), + ])); + + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(StringArray::from(vec!["a", "b", "c", "d"])), + Arc::new(Int32Array::from(vec![1, 10, 20, 100])), + ], + ) + .unwrap(); + + let encoder = JsonRowEncoder {}; + let buf = encoder.encode(&batch).unwrap(); + + let res: Vec<&[u8]> = vec![ + "{\"col1\":\"a\",\"col2\":1}", + "{\"col1\":\"b\",\"col2\":10}", + "{\"col1\":\"c\",\"col2\":20}", + "{\"col1\":\"d\",\"col2\":100}", + ] + .iter() + .map(|v| v.as_bytes()) + .collect::<_>(); + + assert_eq!(buf, res); + } +} From 203f25ae1e35756c3d6d443bb031d6755e435114 Mon Sep 17 00:00:00 2001 From: Matt Green Date: Fri, 2 Aug 2024 16:59:59 -0700 Subject: [PATCH 2/5] fix formatting --- crates/core/src/utils/mod.rs | 2 +- crates/core/src/utils/row_encoder.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/core/src/utils/mod.rs b/crates/core/src/utils/mod.rs index ce4f0b7..73cef59 100644 --- a/crates/core/src/utils/mod.rs +++ b/crates/core/src/utils/mod.rs @@ -1,7 +1,7 @@ #[allow(dead_code)] pub mod arrow_helpers; mod default_optimizer_rules; -pub mod serialize; pub mod row_encoder; +pub mod serialize; pub use default_optimizer_rules::get_default_optimizer_rules; diff --git a/crates/core/src/utils/row_encoder.rs b/crates/core/src/utils/row_encoder.rs index 2144865..49ba86a 100644 --- a/crates/core/src/utils/row_encoder.rs +++ b/crates/core/src/utils/row_encoder.rs @@ -75,7 +75,7 @@ mod tests { let encoder = JsonRowEncoder {}; let buf = encoder.encode(&batch).unwrap(); - let res: Vec<&[u8]> = vec![ + let res: Vec<&[u8]> = [ "{\"col1\":\"a\",\"col2\":1}", "{\"col1\":\"b\",\"col2\":10}", "{\"col1\":\"c\",\"col2\":20}", From 3edfee4add13ccd8fa3a642c47b402d51e6794d1 Mon Sep 17 00:00:00 2001 From: Matt Green Date: Mon, 5 Aug 2024 11:44:59 -0700 Subject: [PATCH 3/5] remove comment --- crates/core/src/utils/row_encoder.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/crates/core/src/utils/row_encoder.rs b/crates/core/src/utils/row_encoder.rs index 49ba86a..fea5a8f 100644 --- a/crates/core/src/utils/row_encoder.rs +++ b/crates/core/src/utils/row_encoder.rs @@ -33,10 +33,7 @@ impl RowEncoder for JsonRowEncoder { return Ok(vec![]); } - // BufWriter uses a buffer size of 8KB - // We therefore double this and flush once we have more than 8KB let mut buffer = Vec::with_capacity(batch.num_rows()); - for i in 0..batch.num_rows() { let row = batch.slice(i, 1); buffer.push(self.batch_to_json(&row)?); From e76189fa1e1e7b9d32de18aefa3a614bef7b7ddd Mon Sep 17 00:00:00 2001 From: Matt Green Date: Mon, 5 Aug 2024 14:25:59 -0700 Subject: [PATCH 4/5] Update Readme --- .gitignore | 1 + README.md | 14 +++++++++----- docs/images/denormalized_dark.png | Bin 0 -> 14013 bytes docs/images/denormalized_logo.png | Bin 0 -> 15675 bytes 4 files changed, 10 insertions(+), 5 deletions(-) create mode 100644 docs/images/denormalized_dark.png create mode 100644 docs/images/denormalized_logo.png diff --git a/.gitignore b/.gitignore index 9026c77..a1ab9bc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target .vscode +.DS_Store diff --git a/README.md b/README.md index 2ec0b23..94c88ee 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,13 @@ -# Denormalized +

+ + Denormalized Logo + +

Denormalized is a fast embeddable stream processing engine built on Apache DataFusion. +It currently supports sourcing and sinking to kafka, windowed aggregations, and stream joins. -While this repo is very much a *work-in-progress*, we currently support windowed aggregations and joins on streams of data with a -connector available for Kafka. +This repo is still a *work-in-progress* and we are actively seeking design partners. If you have have a specific use-case you'd like to discuss please drop us a line via a github issue or email hello@denormalized.io. ## Building Denormalized @@ -19,5 +23,5 @@ With the data generation in place, run - ## Credits -Denormalized is built and maintained by [Denormalized Inc](www.denormalized.io) from San Francisco. Please drop in a line to -hello@denormalized.io or simply open up a GitHub Issue. +Denormalized is built and maintained by [Denormalized](https://www.denormalized.io) in San Francisco. Please drop in a line to +hello@denormalized.io or simply open up a GitHub Issue! diff --git a/docs/images/denormalized_dark.png b/docs/images/denormalized_dark.png new file mode 100644 index 0000000000000000000000000000000000000000..1c3e188b8f31462483c985182b5422cdbe95ad22 GIT binary patch literal 14013 zcmeIZ_gB-)7d8qg7U0MMktRh21SttcR7yZR(wm?J2vs_v2}lhA)T4r-NC&A3NDYC| zrKv>_w4=bXQH1NY5&E-&%(gK z@RzR6V^ao(3$pa~;iU`oclM4e-1LvjFLW&Z85mfv{&_Jne9Gdb|H$ZXs{M$eVn|?} z{^h)@hM@)nLsdNM@pEPdhI^*Ek2RhJGZGh!D{M4##D6Ev>MWSnKTiE-rYQAO!qvSx zGI56`#@xP{=_gC%H3q557N+HEyzw531uofFf9G9PVj4ylrrNfV{5Ac?Rp&?xLAQ(^ zXANtAlTSwfZn?i5eb7nbi3qQk?(b!y56{3bl7IvL*I>vtY5!m2TYe_?Srz?hL1wo98hI)#f7Zgl@Gyw$&$1a9BDW>%|7$QT{NF$NPZ0kv8DXs!nS|8B z8s=MIL6+m@cJSIT8a|Dcvaz0q9Iqbk_#4hwNbcL5t#<9-$3>t-HVrT!BK+iVuQ~#dv|6T@ z>B^wrM93YdR`L`I5BjI5qYkY@a_Um<8Jq>r%Yj|UuLtbuK9(Icx z=VXDjX#|dPKA3RV93Ey5a!=b;G0aT)cj%Meb$<3ptl%p9R2*V32%{3CJteTb!ZeG( zWw#n}m+BQ!=yEu1$%m-?9g+0XI>Q>izYt`eM|>hc zkq=J|cU1+cg|EK{cqyuBVH7pOF#*7`QMIG3YO}WGjtk~|j(vL7XwGmCb9fi|x=~wA zYIB-TE6qg-+&Xu(Ps#;3Dz@Tk;Nj<@?}s83uN5(#%?>LXZ>JLkSVCXoe- zh-B5h)zaa>E{tMfOI(OBO(0%aUCsSYgdIEF5&IJi*T+aXc5I#W@T26vq87xFH+{8o z@Us{>az!nEzqQk~NFn|B%Y+)Bg*ik@B42N5?EDFuADZ6+pdjA z@Da#C6#k9Dz%xn4koc?Qp2xq^zv~q3QO#dtbLLuPt6Jyv8JCxnYHg?>3Z{rYujr z4s*A)DB6j1bh41sV7<(?SxoPky5vweVEHYppJaWkSybVpP@L$>RkEN%w%hhNrDLOP_XIr_|7{&)zS$gS#IK{xi>3?p4sxARt~Y z9M&}I_bQsFR!qdS-9lrCN671S&XqF!dH;K~Q?0D1YfK!W_e^skOd7*uKCz!FC9kla zvhbxPHQkU`GHoaZ4uj)y0g3X8;ytw3LZ9^(l3~+U$mq!0j^9o13l&gGgN1rIR-8n4yDI9VOeChF;uI|g2nIeeSQCW@6Ve;rz z@Y`%eT2iz%1{-8%9<3dETG8*2QG3*ho^wh*Eu7jPX&S|>-;$pqWIUa?0N8lZoO37Q zqgwbd;+abi^ENzzdD*1>f$8nF>hpMKhZKhkKB z@n<%I>4fYoud@5to2zg}MNc&`+&5ORjW+LG-y5~tx?kz$MisOuEcW72`3(>Q8b&(vb#u*q|N97K5|IdTaW!>`86y0cYWwVy}Xicrh52s@J959{1-EiLRww)LQ`dy zbeFa|dzxGUQjDk4WouK(2hS%uc6Is^O!D`D_(r9BtqpV+^fC7+z`1j^+!s9is`mZP zFd%TzkjlwJLgrav0_uY6w>T473xZg9)1D-nDu~OB@ohGQn{hJ3^q!?G@WS+3(Xr?y z$1XxySe@vCi#aC(wjthszAKof7bqcVJCWup)7E5?biVsGiJS$Zd%aI9I()(|i0Z`S znYk;P-jiptEUiLA39G3cQTJ}P=P7eyZ< zJT9^F)19hdOPCVps@Lu;StYN63%FYzTOk?-JRERxCd71Bp^sj?>jSB*8_8;M-dq#4u9R!`RQ(>B#eQrl*TS~|J;U$bbCn6S?m*j&plE3ftZ48WpF&pAQZH{w* zuUTn=mJ(Hj1C=HtEeWhB_g&-ArZ8;kt0mxer$ia}x!TfF zzkbAnAM@IPzYnR{A2{c~(GV9q`OBL(4-W8YcB#4Hhwe0b%+lN(M%&%Ou{-}ruf9lj z<#ez@VTamt_A|~4z6iOzlB*hm4m&&9*C&Opi)%k+SrLX)MlH{-G-oVF+1_D(ZB_3r z1KRl9Zn}i;=xga7$S`+dV^|&S_pKCJWe-32^&a0uYPAYoIZ$8AW$I!~p}>1ZJM)Vfy~y)7L!6zbiF5V$FYD}{;9Cd^3~6aMl%iv&#RWjU{@%>%J690A zpn8RocuHfmF-(8gDXylI!#r9s7X+j3r<2ie_vnnP!1}l&^8Tty_2R|Vk6%{6O&&== z2}JHLe+f`peqUD|JNF}p6BQrW0o;mZ>WbKK?hm~1g|oQGaaiiCe)mK3@XLzAPdO27 z;V0ij$Hklq?KX+83f&SeubY|MxGoM$5z4jIjEAgzXxAlNY5D_Iwa2G=vRNr0)P6=` zn>5_{bnBN}ZX(-8K-n($$x4Xc(N`~hm9%JM8o4sIexbBK zMl=WSEX}LQpE#aoDdngN0CzzGw@viU60VhrwPN)SV;ni3Db~&x<_w<@>v1_^r>H|g zfvy4f@#9vjp`+a(Jc(QC=gc;)4c8oHb@;=yrs2cSR#``z54@tV%99wf!=@P*8yDx3@Ej!Qz32l4ys5QrP{pDCwNR#)8AZ5F>t)6m+zPm3! zNj!krr4e02L%gS6cxJWiJ*YCvI{u3LhgHUL^%#WgY81lAlb(CggPoa`>WPm8_{x6U zePnLQu^OarCbOYR(=Pp#-fQ~lsr}}R*Pyk%n*LHN2Zd|ddS&l-{3hY8+DV$PS_BY< zJ0YlVfhE(1bJcMLOP?eU=^33Fo{bWgV`@g)vnUUv%7FKCE}S=PO;R+Ew*h0+r0|eh z_z+a+m-Jb`vW&fS5%YVqg~Rx9*E78QzvZR_ftVbp$tQvGNE)o)Ke1J!%)D+5SiixF z`)a06`&Q#ITLR9<4fuT9vha|}b=^FYt%FVn-H{roQ%jU}q?%#>X^aW=@aPt%S-02$ z;5mxRTk=LVTPkCL!r=$cd8GEM9qOnFA|cf6`Nr{@c!Q?Z>Pg#1%s*DE`%P4_Z_&MV zGd97Wjv!O1RA0NvNvEL^hl_Sy^;0T0sUOwdKvVl4)n|4eQ8A6}^0)+6>uz^O$Ht@K z{?hFeJf-bu`mO#(Yo94O0Hd2*lltM1zf2Fft0z|1^&wNy(Ai55w3kEz|Gi zx^JyHU$aA!?t0bXhsqJ9v$-|fDxoutlMZ;_F072YBcdu|v##dcGBGs0KK2-TO~qS| z;JG&|KRv5_yAOwh0x0xY*HGQVcKs)$pWv-X&~3ZT(=gP3CC1zKpO#rOJ7hhRH!aUQ zr932)uRh$x0r%61{5ypatvLp*(OA0$WR8h9uA5S>$~HgzFcpop&B>iwcF=9`-V5=W zZy0SxCaqvM$ljLiZXtIEz$h73-i*KNb=ti~)orynfimHa?wj15Sx7`q$fVJ4iu~!M z(XQu0DhsbqQKawj=IP0ixQx@5`+^a2_o7wTJ3 zKd(jfjp&y%W?Uw6$;QE)x99q}$y*rsF}G$0q7{N?ulvAmb(;d~W2Qc0{D*NleVUY( zElIGi|L9Nj+uoaURnZ=LI)BKvEmMH>s0(Tx&RU-N=X!2YbK56k&XLoUnpN7#AwBUJ z9;#p1ZC*P*QOd1;tEw(D^jCN$=49n_7HMwWB3Z%0swKB3V6!^2K2c5%=h2E28rsP< zIV_%PE2?oemNU+oJ061M7TP$JVcLk8Ame7ku0jaGl~9q79cmjoDxo!bD|S6%#b3$b zmIu0lR#!rhArTLw6G8|3Yx;yeIj$CYSq(P%)!P|2KHNW%MK5s6Lc}0R1@3ZA%&JX91R$rP1mF1!dAc3u!^aw$gbTn_Pc;))2Qyg*ZbTEnX~M4M za}KsoR3b*K;?`7Ux@I~$XrS5&=ui^QdshUs%G61SaZ6*4d1htx%(~v+FzpO|<+!P7xH;rIPV6vda9DtQqCh-C&$U+m<`=fReN-`;%u0;SYvlt9L&`O+Z zp=z%Wdx+h=;uX;R7~v~NJo^3XXJHVstIIN|d@opfz%x;ngjX5zAA_c0Hqo!!iX7LV zt!XkWyz9A8iU}=W;nq(l`KisOy zd<(4evUy{if|z62bZW&JMGUe18;zDK6`wAD5na|g!9v9a%A4`%U6=uGw)(ijBeyD? zj(!x@az|4{`L!i%x#*LcJ?yiMf19D(y4)cQ(nZ$?s3yPm^MUIP2F@<6eZGF*TUyc6 z?Dw~r7{*#|nTbQ&&u_UFAuCFC1{G7yN3JXX`VKC*2-`6zpnD=A9kgsQY z_WjL8eO9aO%CZb~UXWJbjFm^0M^#iAbsJGbazi9(4cMXWnk4N-b)V3);u8Bfi%T=R zmm(o!Q?+%i!+xq-NGH!6MX7;?C{<&TJy0#qXR~^U95fjQ)+Pb1YWAg@WaU2#x~C0B zq1W=JNe@V6X2+SX=xK|lsnQdfrP9r56wu?k`;KwAqav0RqkU1n&Cfjup~d*Y^Arn}}@jg$)jnIjs6?7*^qXo4% zJoBWiVeDI^iQ^Dur8mi0R1xdbYEIvbq--9U@OmrZPML_`-hA&pV)qq!u20+ExXp8E zucdxVXYjFM>4u0u1r8)V!SNz;JCOSQmRh zD$*D}gMjzGM_3h0Xkd#^TwYH=UcHDlLlH_^q6>Ptv=ZYF%R+ zS~x5shf`$Iws{2hSz69G5v#Z=fMn%WWO2#w9H~959eC=(C!Aq-xW}C~>;q3WL^hc! z+#8}cyPl@{dK15ZPcv%+Z%+>}e~opWd7I^{-uMvHaH2jvd>>A8$iR5B@YVuJIfbRR zk{kJ`9Mt4gb%iW4Ax5Tm6#Rx3d*;h;f42GKp;mZKpCv)@@Do^DXL3H}!MF*`B(MyA zvNx^&mpdYXl_4KlDm0p}9iNKDHx8FgNiLXd8~3m*@kb|4l~J#W**js&OZgvB+&mq%Eq;l#}B2kD_&b@ zhYEO3cJ37x5I=Dw(cT=_uP_0{V~nPASf4k+k&C3^!Ed ze5fF1g)8iOh`;4yd;jS}I}fuLgQ8}qbG5H&psb8f)QVcNONXEbeUT}+Ik&k3k7nFx zxNDp(gH{r6!KANIz4yY#SNa-{DYg>_RflU|v;5cx5v01ZDQ(T?tu|;hlUdb!|8e^r>r z-4^s8`YxauTZgs6E9C|al6U$wvA4-w)nbk0kf|$ixQMfp!c+)z49IY*Zz!{t)lh^x zY>!y87*uCmXl!8DG`V95n0E;bsiWsn7|{#8*^DiMPi-9Eo?>?|^5suw*{#&4yqO<%3M%m(s!_&OGaQ(nAq2PoNk=p?uJ~I_t@@E| z;@tZ`dHW>U+k;DXg$ecbI5oDL;&90HPR7wZ?Mi4eQy1pVvkS{DB~{95M08rdb8r*i zxx-Plr^p*GeTIVrv&wDSEj&8;9>$M#JE!@4s9jeVRA8Cw+t^SH!nWyh%i`CHg?fdp zZB`kANLZVxGv+?Y`@rzqlnMEf!{XAGp0)99pyGBE+VL-5vqKcU1Od#2Q!`rGlv5hS(gN#) zwfxzIL*NXAnjLKV9 zUhOjdrnVF68~{jETS<-#pOjE5f9?~MHS*QLzAg}h!@;PY+MqWE)xF713SM0D1$0f@4pAuRp>&dm4o5Nf8abeXpF9<`~&l8)s z-Lo>8x|F$O*WUi=>R4p9oKDA)@*){4+38AByUBDR(N&STn_GcWL!@v3=uk7%RUM9^w6|5)c+B-qVn`qUi*S~Qwv*mr`?yfkY)Q@W{ z`p`4xQv|)1Ug?t?wi~@7%67XGF|2mvSt3Jjo`*5EcNPXG+`7O+xKtuCbMG$(h5@9U7-l|zQ1$w;_qR+k9_t$_BE3dLl#UlnL|2dE6 zxYrl#1C`Y6>@F_5S-u=7ToB;_S{u!yZ0EXyFU^^>+w)_-*(vc{o3TZPn=(3r0Db2| zM>mec>;>`kz4WyN5W^3C|B{!NGjE&f5w;j$^Y^{j#X;rmi61eMvZwUdIjp^5>iPx^ z+s=(R4Pl&ttg3((U?vW|Q*zF&7c~X668=P7C>*_Z$F;xIBTVr?(#^f

7(~^)OTp z&M2a>jbWb`ihs)gc|1{+lqSeK@*Z-vHiiQhHiWWkYb_s4fIVbKA33n{6(tIs{CY1u zNWzitNN!xp7DxFD|ZawQDYcOU04pbXDi?t=aGfJgq%@~ z&uX@GO9uk&Ur2Ydu4yAhM{Li{@jK>MvCTjZDaPt2FWtpXu=Tj`ra$1MD{#^84+!R@ z{dlRz8UxZVcVKkNo$3>&mkt(Uuixih>!dKp@Duo_EPQ9HO_Y)Rj5-&LFCUckvMAHd z<8~J8IuC+>IluRAWi7Y~IEEX15vS%^g7@{A4Uv1AWhw$N;s8qrDSF`9p<7VwTvm6s zFvW>(qlv@+^I$h_vhUi0b+;QMceZA$@zuj>Rtzp5!_i$Kl`!k|KSyr-9CbWM@3oHwqur5vesQ)p3s z3fmYi(OVF&cIZ}YHcx6hJ7oeL9&VyC2h0@Oa%hiV*Cx%w5bt4NDV~NRfrKz#P)2K{ zN!iA?G(9Kq%a{pwBpmlB(GS*=#hspHA19?d0HQ6<_Ln>}wwlWYWTn&mI+y=Z&?%Dv{=dBsF^y3O@6 z$+CVwo^=96#4(6I21gH^OnXi|Yg*>aYaw6ZNY_V|?zFTA+6T4^}X z`bN!ty2?V4Zu9>*G_Bs-KC65*v=iXBq6I1!BCG%&RGagjc zQ61an;Hrs_j`J?}8~`6_v#)%at5Np^YkxwT`dZUiR4RpQS&m>9B}ofuiisERND`kH zij1HjK}Rv)ng(WZ6jAyCa$p!vFSHNF6>n&8Qzv?abxx@_+X(zF>RM1BVvM(+9y}`! zwIaCFT)mtpO8sIx=C`f|+S@^eL==zyCDP##{lDVknd*`s8^k&d{K+6pbRUi(1~jci zxvD=pOb9+THxY?g6dF{UfDB~q_^n#!**vDW(c>*HAT+>Q*J;2a#4z&iMrl6385NXG zyfT2Af)=DVE?W7k>?MgCb4Ma4FkY&=qrpmA+;ypsgAsxpy%}4-#+HlE!0Xx`F^=ESo8S_`6xu&=#M(6{zRkHli;H*g2)Ru^i4inc0I?zi$fpszyuHGH_@w$H zfael%>uaQmhgo>R&rf2ONkhJ@V> zbI13F3D%2j+E@AlMw|9i1?h_HQwruvDyOaG$G-zzxTP^>Aa&kPR6DjENX%j#a!JrA z?W&Tv93_U&^RWyt_jqj$-U&OU5&C;}m7qr3#8!qi#I<72JC{k^nr{at<%;S!F3 zGkS>xjLhFVQzWUpuJ5hhJ=j?;d@E|{A&_?W+X_2W=39ms&QyWjl!^Ku(m%G z)Yg`Jy>Lx-?9LSSc81fSj_YErvZFt%I|3rrhu*bcA;0zzLL?IF6a8f0K~9`Z4*NNh zENZ*8nt+kE6Z{voGOy|`Ri=QeOdaAajW~-xPX#3fKZssRAYY{8Zg_UTP%YxI#%ze~ zxLafUK#)sA+Z*_d4P9O6Ug3^q(nI(vq6tdJ&l>eyyzmyu5XR&@-Y%2sTxq>r1w97| zM@OQ{XL;SSmNB-)FHmHckaNPbxFFvQIV)YxD$ST=!aV7i$eB9{DY1*=<2NDBYa=v4 zGOMbEPZae4FTSe0$iUG7!dml8tve1B>%pv+U=uo$YS5?Eq*6f5le}|r*NYyU=L~cgY zORWC0&W!n*ctySLlxrYRFGKPTI8YEwq#FI5Nb>)em{%BVQeC;K!^La7>08wu*=jHZ z5#>O?T>xo3G?u?Ip>w0qhZyqKm1xuZ`yMjzbcNO$u$d|n4wS$>9aQo<>@fMjD`Ujt zo}uohLeOkQ@k@NZ&peV3>DF0ji)ia-`tH_!T|Vu28zNS9HAuqN z+K8BPNS74$ELjb`y41aL1(J&$ap2KPKBjZF${p*OV_sUwNVSx_xc$5u&ydpl59eK2 zbAdP!PQtbnehov_vYzjjlwyF^!-*u`;v!A;5Q*ibsjKw{(()y4G&S^wJ}2Ya=pP`1 zzwg6vPV$f1C1xwP5LXvJA?|b=TSS=2;b&U}uUNf_N$4|M`9eqv>sQ#qwaT~U0PiZB%B_Z3KbUR*_tHJd z3wb$Udci;*P?tWE*6NE$d9p5U6;aQ?lOj?8y`97#Cdv|=9~ase`*kl{MJ zeNZ-Mb(%N1l!MuivF_lGh?tE{2&~9vl_m=|K6Kf^qWv#VjQiGJ`8sY%=f+8;5VYfk zkfw}AUz8a~7n*_so)ILp3s!6sF=Ji2UHJM*zfEsCX$~iU&`affL_dT4@v zbV4Q0Wr`{4{kSa{tXz?$p82Cs6ELPPp0b~9PGhTk*J5)aI`!P-=G&N>9n6~mnxLng zQa_zicz!@+_~l{66eB!gQ3Z9L%EwAC)$i{6?%XO{+{KQI(>ZJnB_#0t9(d}%yyB^^&aLQop*lv6&u%&hM&DpYKyn>C>rSryKSm#&6 zV0v*oE@vn`h@L@K=U3Og%&@laJ-1;zLnn-!J&Q}+@XB zl(MGjoXrS@Cn<~@m|724%{1E&=8gY!jhe_7B?cQm#669)249c8rD=O2Dw{YE4MFyR z3`SDSiqwyP|HSbeB){H!j$V}w@u%}F+jsz?{*jQnIE0aDJ+HzBi!_Y)1grQD} z8iG4c(S*);Wc=lG0Mb6|*$u06KtHU2zmAporC)x2?{)eWtX~mDqw&QJ{rWqP)GxTqCNhS(*Oj^KJOpf$aFa`gLN zOp|ls*B*M|>!fwEn4wJ!(3&1|^F7bgeSp%##&$b=Q#y#J=K+^2_F?y)o>Ci(-Kytl zhfY;jAdQ@|^pV&a zC7 z(4!=w;<5)eR61Gh!InMfqJ^L#jf(&gctOK~d9T=DrAP=$lMR7hOQEx;3V!xQ)}$e} zvT!GxsXHxpw&}g4%TFT7$$4syUA8F)H_Av1E~zQ$um_};`L}D^?2Gbx1`ULU2Pdw( z7Zvb{)Ku1I&3UeLYT3uQVCtlH^|Suz#CyYim=?1IeTT6+jYY39bEkoxnTC2B2p{5EWgWEbFmKNLLsCn=q9jFjHLxi7i8&fVkdyso z_R*id2>g|VtYqLxsPYA#Xl3L%H@~bAs=ObKaFxa?GhMLg&skcMK!mygPj5G>I|5|M z88x{}%q?`Cd8B0Pmda92Zo(nU<#R&rYw7kiX7N1GTK~soTU=pV>NgY2d>fJZ87Qrt z(?I&mVqAz$WCLtuDDt=wZi+lNgJr^WX(?m-$w&1AMV<$KOX?($*8@MbB)Ca?!p!P7 zG;jf=H$n=fF9^x;Sg<_ldZ_g!vMLcRAMr2I+_^S^+=N=)JHH^29f48pEZc=Cx-r(q zSoG)!TY~!s{T{PCG<*+-55rxQ0Hyo;mj~P@nAKQ#e1g(hw?OrN4P^pa`H@v<^p_Ta z+|j3m@P@<*zM#!JuXIXU_mywzWu~=@bV|Kt%}e=t*$VX?ez-lRO`T6WN7;Dc`j2sM zBZW}-;#Y%&y$7nhvt`xg%8}7qM3B_m7@_#F+OjOZ=K3d^%2A2OJ&Tl`Zed`(56<@V z_j`3|89Yc8M%|LOXzPniI>Xf_DOCq#IZt}q3WxpvzS}FFn^hZv374Y9(T@PXz>3dK z0%<8Y3TAjc=9Nz@X&v96)*rJAN(tZn1HUaV4`$vUJ`G{bko_93Dp1YX@M)~*_OFx& z2lQNU@Q4;OsCxs+an5xM;vYlDfrV42rcT+aItft;gpZu>1(ahDN$Nefe#$H_?dFUH zeTa=WZ*8g}JU4PfSlh2E(crzz@^Aj5gn!cQPBG_8pW5!wd+{l9w^Z=jrjBf4-OyqH z-FcQW_C7yy{o!KY@Tn+58Sa9!`?mk9Ew}Cj$IS~q?3GuW+7j0n&iE02sN*R5@Oojc z@hx$9*eTpl?0e!VsM9_AzWp=g(h{BIQe*~rK_^@SQ8N8JXWw3uuy(2VU-XL=8jOyP zh0Bt!j{orWPeEr{>iR{24?eEsj`QLFz;M+O#(B1Q*Kd>|EV<5 z?t&Dt4ZzGcqs7P5pUXpM>^JZh+TGP+qLffbVz-SF0oZ#NB~QXmeP&B64iCq=&kd}l zHyn(F(B4ooL!x8r$)SMXx0ds)7n|K1>g}C?MeH-5Izqaqu&QWz39c{PU zBjW@sB>vpcc@Y1FS+?x|;2S-(h4oFR|J~DAixT|5SBU6)#_<2;HLL71_TXB!fpE#S Rus=iTY8pMRcx3TKHiZDVpx)FmI-EEMPlN_VOF`6MQ zQjWpM%`rO0Gd}kZ&o9p(a6jMUyW<#K<6u``ulM;n&-03UVx)bM9H(!3x~v>-+8^m*rrM9_DhGL2XdlivJv4ks zM~8@KJ$lYWN2fBRtNGB}kA7puzTa-l|HeUXWM5cdCgY+RN#{&lKtUL%qZfO(rd3?Z zg)Lq|4)&lcw}8=^G3N6LZ>ADPE3@>@#=G1~h*Q$7ew6w|@OkD6UB$bbrVmz4TsbqY zIV-z&Y<gLanjL+t=@6?uRu3u(*8e%Z(qac|0|hm(=+~8D2Zb@oew&? zCwxp7{wsV^{=XIdHxd8u91XtYEC1OVD6kx-y6>e>aM)JoK|XkVcC2Z)#&*~EWb!G= z<0Nn*cq?9+j!tpnvjBAf=Uu=vOa!)$^7-$bwRH~t_X~phe0-4FMzW}E{l++doYlv~ ze@y9I!#1t*`8BWkgj9f!bK3uFm+t=;t3;awkq&n#0%6wiWknGph_}_L7TNRb3 zYd>ma{9n5*X#@Uhqk3rJyVOr=GOIc+vat;gBx9XW_f9sK{7)MG)Z5Owp#<$Ly~M>W zcTW5dy#+`o1*%TeMPE|u_~E)`Wh<22^zGz=Ya0>F_b;T6G~x(4c02Mr7rxoCM1CS>O{*sRoD6Ys?TuqV!VW+;>y_xmSkYhk&s%{r)f zcDc>o=GbmuEw(H)t$1#^t9uU%V3}7C%Y*bs_3e`beyG(XJ0>Kz9gP)%vG>)8P1|W9 zuJDuTw>QD0I`U4XjfcGU1&O?;@Z;d-=DmJQkc?(|vqiv%CB{!a5XxDNuFi4bVnwX> zhnxjGLnbQ|oWb#v^3F&xfN_lSjDfs3=jnr#v2Et}-|9L(kz}s2dO@7>*S{CjFB(M% zIYPr?oHkc+SipCc-RhhpJJLy9!tLrA-=NacGmP6&H})r!Lnkh-P(6ab{`q!pUg#e% z1h@Z#VILnfSIn1B#iPKb4sWc4m%1)v@1uD$3-C`>{vg*M$Lu+D?JVA0N&uc(To((#vQg}3g2o4EY|N_yFG5;rnL35ZRJD#ZXT-sKGj1r zE%jCv~}$kxFBw+vPo@QI6^+yzfrVw`V8n6WG>}?pm$~=J&Cz` zK&o`>cUFfPLEnvln3>!|RUS)KBSb5T;tD^b37ZpDN5KsyWHt58Wn*T^B2e$% z9&31=jcE7(Oou$GsKhYVlWYOghNT;EjHj@|1>HDHI=SSokuPrE&m7`M6&EkZs z=Ni4BOOwW-CNqlHbdRlGp!;h0EGj&IacW6G=TRS{xulXNJFivkQF;CMq@%m89q;ZU z1mXjdM5sitF|{&Z2yH`9copv4;+U){rWp|TKuWHw69Us_p{!X3O36gmLc?pfW=3d-MBdHB()q z=NLw;l(*tiN_7`rBex%Fkj{{hOlmN zH|t5s6yIh(;sQ8EJ$RQ@cfY9U!I1AtgteiSFoc!La*c-rm;v@Ic#~)!gmnq3TDXrk zY&g=b_mouw%eG2%%?7Xl2+@SQ1VfFAuD+~PLadnxa+~znIUcf~wM9SGk01CIcpyYLk zk(lwrXl<8}s*zWjpSwD#wLJuPk5@rG+iIgyxhz>7DhX(ifM@g<`D!}|VNG=Lk|R8y zaywNN&p>GHdAoDR!PQh^$i`a5*1+0It4&QxV(RW`+sC6;39|Je2DuNu2r>NFlNyk_ zY=BBAQcNpXMcx@wZ?o#JZ%%+{iS5+0$D!yC2W-kPA7v<&aGTbPp((x&GuZr(w%Wiw z6~_rp6$+3vtG@C++CmY2Gq|lNWee^gA5yG?>(}Zlc9It}lERRb3vD|!BKJOzp`rSw zeI?_V^u^s**rJ6PorK!GzH}=4qTwW@m!wnHP?5!}MD3#)T+sXtvq+%}6w09W71uBnF*x-~<$? zJwsX}=ZSf9w}+BC$yr{v`Zqo}2rKyTZYo}9YD2M3QJmcy1`JPKVvxO;plNWwq0x_x zX+{bE9?wvk%YacHC@roDhltKbAa}?{J(l^5P~G$+qvOM?x(%YC@q*9f3emX71BJr< zooG_XJSy~eq9vy^4mIAC^3Rs!`)aObF7Jh>gby&}?~e1W))C!a(R!U{@zi^&iB9KT z4bh!uq^UPI&D^z8py4pTN@)0~0X>+{)R>@@NolWZ4)e)kU5dRMp7;|AAga7=m+2(^ zn&z;87`D(F)a(meBoRA^IgSbh&S-#ckR5TFG7vLJ?+jPk!Am!lW<9zhq*{z@7|k|) z$TF$uNgn33xiTOJST?%gQV`~O9!6Z@ZQGredZ`zDvAEe`wq%djK-9CN5qBt8jMKVX zNcQTjXp#{0`a+1M0J?cYp<;}WbEl)xZw1PXdC+ttu~-c+i3&A4w+7@)($DI?V$RyRtz-Qcy>;2HZx55y0IgOar_A8bFgfYB@!K&y`asdi=0#$^hkV1xxFS+En!8#p zxNY56DxCC3Z$Oc%7J_?!JHV*R-C(UCCdSX7Wy36t!U3qVNB@+J{Rj5N3UDIgH1!6Gw*=j1eMwuy7dMuenh8@VspbPIGdJ(`hP^RqN~>!bQ8kT0M*#MS(JPvX_; zZP3Ma!v2EDdzj-}Cd=`D_o&k$Gjcrq`eyySWavtxI+ggE_0`Wy20t^3Awu@m(?^^O ze?_vdvnGdZP^P=e{!At}L2JixfThRu3w(0xcUWYvNHT7(0j_SBHKnRILrqND9LWdW zsAO}A8y1$>9Y!|;O{b7U6xCR_Xxpa#p;(}b%vDLphfQO)6DIc~Cr%-mZaLhusa)wwT+putUux} zkIh`j#1Mc>^`=+_`J;8}W;(ies4?=lKEQLtDq~Z_2H_(CN4B>4t{?;ewj6~hItVb+ z!c6MT#9(&%TJA+WLpp@t4U(zG1b08`Tb5Ur-9De~nzj&2Z z-vm4du!rkdXLLFtM=7L});8r~zMy3)o1QgF2zHM?SkEy<>L^#7i%nK-G;NX_Rfv1c z+8TUUK5n?}o&pVyFZrIJYO|HNT)>h>I9UDh6lA@luq4E+zgYcuXuRXY=0wk;52S%7 z^4WCPVR=8qy(4YSwQ1*PxDU>bKWvTyo)LhtMe})K#r-NhP1I%1nknw9>9K{Qq{51X8~C>~OJJ zxbt2lY3qCL54okdaJ*hnL5C;&spsnV9sXZRnB*;zgpvJ)0< z0Z;Xtz48YAj`RGt>po<3jWoHYz~Zb-?cO@ZX)mgaLh6Nwp8O!qSOx*?T%=7RhAEEl z%tAtELQ{tlg{^a|CgJ^+Q%Pf_IkNWc3%w-^%)d{i`91=&G z-ECEuTX&IClCCstj1#uz>a3CSf2ZV=Gu9drtV7}|wL3O`mb?ZVk59#>iek3O>tFsg zq*~Z0p$TFv;6&a#8GBtcpC#c zIf)(XN!4@>VjjWS^qbqm{DORG0xaGZA187*Mdwkfo~G@dQ)88tBd}Fn#QwU1?$b^8 z$fWZV+mYy}rTN80916%(#j7&D;B~urzAVJ$O5r8l^Pgm+XM}wikLtfz09J?e)Q5^|<@&lYf(jyD^W9{u2!^;G#|n$2h`TapI) zbQtpbw<{}mq4f*Rq}yEna+1>!<9b2@#eaf7=$7!?gqNQPoXpt(@XK3C5TzdT&9?}} z_p(K%6h|?mvD=tU_TsS)UZe6ci-N2#vc6Ttf|yBcgQhF2v`u{~1b_e4FfhQ0Fx220 zwA@-LPe#Vs&LYR%!1+btpUd-t%O)4QWG7+~Qr)@gcV^fi_w@P^;G9R&bxToL_tLcF zH94Q@Gd8k)43F1ui(PpzJ~XoZXjt~U5~nPbn70h5)b1i#BJG54OBBmYmcL|8IEXGQ zjOLb=j`fBs?@HC&3OE`cf8|kOy5U(A;#je?=OpIxx+OWOW#RbP(j0LvK3PC1Un}&H zM*-TmvQ1&OD$AFT)It!qWdjdVWC~2v1+-Pro$(#`8Qa%4BC1{eQL+S2Na=uhhuTt$Uv}2SBUbU(I0k- zBd4^>8aYs{a0yeIXH|xf>>tG&Wq6>idI~QC_tu;>c4N!c^<7)VBr(Ojx_N&2UkV=& zIJRQ1iU_Fxz3VZ>)pN{#o>2Qr(#;_oKjU73m12KuNuc#f6%&p{-gT9?zLR~P~Q(6;dby`xfq}bj49 z9F?ns;Awl0(=v5&55gk^yvRHtC;jk< zd@rTgF_d4K9%u+y#{rRRc%K=4eQv>uE}i%1!fuu4U}U3=rK=)lI$>c{o3&Uv2~;d) z+=}PFFvZF9aChUx$g(3tWF5M6$vQsKkvJfSO;yZITYF(R?16VKfA12@1D=2M>S=pq zUlEiI?p*vJ=6%>SaI@<&FcZcXoUa!M7y3xK1qK5;OGiAs-iTHN=O5(n8W)eOsj!gKiqbYg4QFT^B^+_47W0koImWTXZ*r#IDSlPmHE1>0Y;5ExsNk{zNq4m@e z$9p8NIw^B*iPXZTp`Y%PKIwHOii%ij0u?89xi%=)g0hicp5((W*1BFys|q%s+8h0n zvBsu`Cn~Qhlrdn)99XAZFPk}kZs4Yk`qOHQ%JVM$tl}TXNm>0VIj~HHwJQrokcC%7 z#9gO{e?drJ!Mf|+RjHI(>qriW*%RECLu6U7rF~@@5JC%)>PsI$ovFjcG8Wj@nTu`{ z6$-wRJ+eF6vft25-(?Dd4+ZXQBKc5P8;*VycBSxwTNseDbSl8xtUj^Wo$>L zj#Go9xkY_Fwz#A^;mLQE#(4`_xjjiEL-qE)h7(DA`K_P(ZjOsiZTyO!Oi8eeAEpH4 z5;83H>JobW#XR*Yb{;8L_|Pb^CSF0=At?O~_ImtSM~7nTF1fc(zajOI!)K#eSpQ4G zR%NhOkJgXHDab*rq6Rj#&uQhjFyz-JhWV#dtuzUF(PYWCo0@wa%i-UHQ6i0T-+*?3Z)Fe-%4t zICRgP|5J77r%;>Ja^$S95Ae6ssjPogm}Ks0{6xwORKDoyn+CJAIfnrrM?TO%%}8cDgb+Jo?Q!>o_G|^r8(R%T7+3-*u!fulk#dsnRU$ZS;u5OWL3u=_+1 znfP_UF5{8s>Lcg?W1+N%KvwRQf87?g;_v(%)DNjf6 z8t^{gD)iW?8opvb?8@85Fu%2!INdK>eonE!ay`RzUuV=C6FD%_A$|-suY)dsPH{`E%EXDRBqF%7= zM`TcH@Ryk3QQ(m@&^$)d?)7durQFv+{FO<2w~5Imv$<4()CsPOxo2ETcegASbubty z7OS#C+uHQ+3nL0%O68SHhNihe`(sicmDVxXI&smixEl6byPJL9!{X_@2y(BUO@2+P zdVh^tWO4-^lK0^6+!nx>z(qlGXkYTJ;|};~TZs;d{mv;9qBKpRbum`zV<~}Q#zQ-H z$)tVnWvdX_EDfR^Vx(mBr@vJ7aa^B*&CF8+HOV>+WXi^bU9xayZq0=M5z3e*#H4G0 zyi~nq6_%Px8av6u=i$wc99odBy#_ZPD#2|7mbP|=`GFL|tYx%t2CL#-o8_r$dANd{ z^j!Ftjr^bzB?V|yB3~Q7oi^3~B`mQFyUg1nP^l^Eo01n@^{r47Z^tJDOK{t)-{L(pv`G#n{^Z%mUuVhS`q@F4ERKuSQPy}^b4UuSEs zDotG-aGQ_YQR_0kFmd+2E#56%s%JE5ntXU6*hyn$cIl$%^r-OESN9(1F-G#OlV` zMg-s8S!KJuAME1h^rq653RAA&!m1F3xtpAzl{oeXv8GT_OsgV-XDv2z(b96x=eZK+ zcaIZBt?`2^BIAB5RhjI526W= zKIs~9f$A=DUcPLT(JKI%LA-7=R@5XJ{HXV)kLN{OKKDgm0n%`MF`bPgUwyo337g zK@wYmHHop`n*nTuZ0E~2ZU4kSBJJ>^@qr@qGHt-tzqJy;ZZS@~154v1)DCYp7-dUWe5N_T7>AiX0Lx>S{y&HS8>&lNwIn4NCa{{+j z8wh1j@UxaQn#DH?^+w5kdTP?>hyNuYX{Y|{9*@{ngac^G76-hZ?lUnR0MrIagg%Mq z-M|-58qPc7LO4n~X)CA1PESFN?ti;2MM~VFZ*1fR4s`Ica^E!4yI|L@ST@gEtM!** z3PGCn03d4vNvgh2X+haqc=oJ(@pxJU3@EJHFfHPp9YQbkv+~Ho=+*^Mv4J{ioijJK zH(fumA-Nd;)DKVwKk)A6FJHM-d&`H1gxUo)MwJn_H?7X}dGokBcO=4Q8fh zK5BG(;&|m3e$Jp5CsWn256h@uV(HGzg*Dm_C!c-DvCSqMiotPyHJ_ytFk_2mkq1A!`&y+F!BA!ArzK$L`1uYMJ^{l-SC&V}Ea3CUc z!8#&ip_GGey!gGXk5Yf#8^-<#gh;;6D#LYlH?J@ARvok~d~TGFo;dVQAI1=2Z9qM9 z>L{_{IHu^=exm^G3)lnHa*n^Pb0o>QB;xO#F-sXUwr zCDPF?8IT#KIOhJacg84s|LQ25p&Og@-xd709qLt{&%m5un^+GKIR09R+9tX`5uvKn zTq_Raaq5T7IHvozqXC|tzV3*@m_TW|$uM|DbM zkZ&l?RI96zaU-)hZg4a1$)+{(pf^srkrHi93o3~Vj=z?Ll@ypI>mSiEjMNACY;V>)IYP#fhrT3fbwym7neZZaETXr+ZO<}gXFgY@18*nP)7cF zT=>!I?*`P=`3I|uR7z7~s}M%yKWo?X%hTXD^(GXPo`aoGk62Sql+WyR{;cgYq{}`basl^09+=Xw zhsQV|kFPzR3qln7*fwVvelfv2y5r|%o`3^y>=3ejZcqkL5@{@a+rGU!u{_g8U~=;L zR0pa2H$yXUyj$gZCuBdckm;&qKPF_G-CBX?%ci|qq4aiACr?2 zt$0^zBd^!lgy+WEeRHbApQ{2ZALJY4u%#2T%b&PT1C!2~9A(uiX>#dE+lqC95M2~s zC;LB}h2w|-{5kwjP{bcz%Gb{OGWDj#(vHnB^7kute|?lqb=~#B{}nJ0GwMOmL%U)X z6b(^Y8jF|G^2aQjZVpHS8k?G?9!mZ<0F~HpkQ|*1{rpu6<`v%XN9zp0y_Q2@m}uk! zwWZ3>#$$`G3(Zj=EuP}@;j*d9!B<42zWDu}6_{+^YfWpe4RjGHsJBOL2XYP8Entjy zNLLs75(Sc2-;skFvl~Hxdjl&mL92E0>}H-*ua=`;#OW$IOhNl&oI-$YT&74P5jIcf zo9(Y?NXF9E=t`M7O*%MczS+q-nFZVps?pKUHh+_o#apZk@&m<#ir+7#u&V6$JvXjy zx1eoh#Oc;j!m|+}!={jg&m8I%oE~)J;$1`}jS}ag;F=kx7qG2E7u`zqS?T4)2`e!= zw_Em(uvt@`e7^LLV%b#uO1fX6UYDvY#+}g+K@zwwO-NWuRi|>TX;gns&pJIVWhe8X z+eMQPhlc6+=*JTTMHMFcoWq@ay`=|o-tH#|ee!j>EK{U2POx5(Ft+1$s|=;Cx=_be zs!cqN4MEenHbM6JiUEz;-#@c{EA70z(3i^EkBBOLj7WSY1Rwm)9fY9nL(x0;B<-lEfSnCC7j~XI zdbU$Dj=wy=Lh+Z3TJfBZFG6d%Fa0V6b0OQ)mqxrl$e+RpsPLJ%{EU$AMQGs=jb2ZK zCv)l!deAyzhW>MP-oQ!lf%&I9O2Ug_ zD}CN~WqA)*Y6+dUCZ?CqFv}EyS}JWe^5=$C!wl?e{SG_ad;!iY(36JNU#_RaK--yN zIqGg%@|3<_kbfz;#E_{bB2(|M8ns~|Ehf;^tCr3Jlo3E0J@5Y@v)%JvwMh@02g^@b%2F^_~4v(W&hcA`r!vK_PmeSV7OBflN&tW~Z+ zX_Sb7@%PQ^^rmH~I9e8oGm5Au8HTQ5eOOipuxYxM9Oc^g^M#og871>GOfHKuI$JMcObL{NLX8*G#9U`$ZI`! z#cMI6pa}H*?2@pPjQXM(w^GbKC8=v?xNT4yUuV6-6~4w`|F#~_t>~TdT{UhmuPuxY z@d1lkHQI!cOE*s|qzN{;`#$nU6`RWMqakSw7)|l@c9li3^^c-?9g%zq$SDO)cv9Jz zaC?B}7q8cnA{uq^j;`cfI4$^>tJ8vij^}e$pHTljAK%Th-rb%Zw-Z_4L0_4*ch`xl zYc6gN*&BYL=?W9qlrM&xeC9!-Za5tJ* zE|sJ+O^cs3O|^%QplC;0t&u7Ym2aCsH&X7=GX8EFb1YV6rZcXxIT?T3#B<6+-+@@m8VgRLXA~So9a4W$^URvr z)XmGNJJ2&B2%e==^H7l>!#A35##W*BT%_>gCwE!tCF>j7biBYMHZ`x%iSN^>#GF>T zk{H_9M8_9|)gllUG5V;w>L6^TjXH2|M!|$v?ce+lV$|0YM0PY6tzODd~I=2Lp14f)ja{ZG`l)q#DW%NBaI+EKbL%Y5zReSaEk+>UA!jeKhm zM?%wJ%mlsnAMdqI;(L&Yu%q_y&YIe|tL(k){K1|K=E z!i<;rSM~gG%YT2{O$FaXw1(n;@V>(pC}AOgYPUW+|ysA2mW%V=YV(KSd^P@uPUd z4i>MABwIx`DBCUN@xXTQOx!l>D(-HQoHl8&Y&7XPwiUAjR~%tZVBHoYRWgqikz)F~*r zYjiK{6j;gGvWbNL#@sG_P5Sbl8T>5IV!*yR(hOj2X$Kk%H7`H%`YCj%nVomT#(G$d6mn62#4?}N{fDPa% zc5C+ih#Zl3diCaYOV6_Or4pPoU($coT$I~loY?T)1qQgVb3EDF&g%Xf;XtR#&t>i-^g<;$kE zBLgWuVWy?6>dp8Krj@R|BP|6HngGQqOM~Wl-^$iS_qvs$mQ1nbdJM`t|0Elfe(?BvC5GXlH2}H$Ru)aDG6#NE+%EM)$Rd4LyBe(y zrH=1EUYb7QMn(+J@aW}qoGTTN0o}L&u4jUmG^b6A@?RY^2A%Aih?GnobDqyqvK!{W z9(Mm!Uxr5S%K3&n)Zk#2xZi}3`0yY-crv83S#hUBk_Ck*vEusFcdHtQGtfQr=`m0( zRV0d2H)qPavZ$jD#{|BEC{vA^*^tk01LaE}j5P0m>E;==M(ZGBr}radn-o&~>LFv* zOPMgKgp~|8dZCocog&*}Pf#^}{wjUIg^XVlfrDLN7oHK^e3&9&-t>?>C8jYMfb`JayZ%o%&v_Hm&d}K)PI)F85JWghc<^a9E3 z&R>mqy6zKA*Mc#l2h*NBXSL#3X*$_2YZnP&kupDHrY@O_o-|cN5owVH01x)^jTR#i z_|lkLn4V#hWIGM>?_`zt6hy{%84Pc~U(+(JTmmX(wC=H`Gz#5o?bBupZ39E}Iy>v| zG;^E;0$KdQP*s>Tt268j_Zy^Xv(7C-fNpad&jZ>Dhz+mV5|&TA|BRWeeb7re^|F@G zc?3_G@oKAv;E=6Vx#6o32V9~tf{#Hja&1%=bTFOw;;>ewUs1BpIw{iEj?koHVHN|8 zwoiK_!JX0oxGZHdxoxehvJtP0>Hlmr&^f(FLk`xrF4%EzEPK_)4R_F&*p}-;D`RkT ze7e~>ycM9}mzIyv81k+)k8J&&ywqNxri$fiTr48xb+GkaDWm;}$oJr2Rn3sssu*q| zny8LG4{y9AG=5IFkih#Mv6!VzvqFP+nBbQq#xrzdGd|(2GWxm9A&#fwE7k;CxC>|o z=>gpKoX<=E>{rz4^rOcHMjFzS!vqI-=Wi&kt&FiqDDJ8D+fuIw`I=>k zep(J~$4!ufM~{z?G<3cDepx@tbC+h@-d%X!j3Mq`r=Vg`;ar;V9N>@ZYkAz+F!z5^ zlZr{uM47bi&&+*+4ZUtzaO}xwY7@oUbB{s|o(%0mnWHz5ly&Mn)IRyrm0=l%7k}Gy&7>O_aE7 za?QBU7009IQ|6KJ^+R~YmHZ{6*Nc<$W*V*+0NN*@Q1ts z;&^$<6v-`OAK+bk^w6-P7(5tAdmy%Yk7vjyvBFO_R#*REI9_SfoI%0jrB|$9 zf%I-Uy69sJv<%F_TMsS~ZsT#3^xGvFnpoTR!t-KktcbG4G~C#akHisWmfK>OFFMV0 zUVZuIS8sG-p{rg%`oOJxc_RkNs2!tUK-9as^G%T-4|9u;i6f9%Q7dWi!esN+^YdFL z$<+N(h=&9#KuCBYj3y{a<>^sx%>JCh;ryh(J`e_+^EPX@0TJln7M{wCnuwrRU!yi;BP2=(#Wm3OWNq3Ge4wKC$Kzw9JbtOT^|K6xX!XnC=v zGTp2(s9Cmew~X4TIP;PAP4Clhj?N9W*Kvw)7UfmP={~)(Figul?^M31{u2&>nlHVv2X(%=B@RPQ4MaW=IkaNWmWQC zs27OJU`p;dWTae;sZ+EY*ze3KSs~t( zub_ETPN^?8J-jrh^F#Ut!6mgf1BlSwQ{(@$Tx|nTG~hv`K9Ms}y*n4s&RM7rAuP3c z&!--*1cgze3=$8RDo225K>Lf%$7xErtudaq0$vU+j&M#un+R0bOfmEBgy09m zXQO&FbEso`^eTBauytx6oI118y*Gzl%Cy%BV&jcr8WdsLmUZfXDj*=iRZc_Pj%Q_F zdLFp59nXIClkFOSw2w=GJMyv0QgP&an`g~>X(2{Ay*UBLBxlw>85S{z^4LVh7nw>5 z(EQha>Pr^h_P1{f__|(hEa4V+gu{`Z?!~o^LA8I_GLlPo1C6zuJ6Rp>eVLgCE#PVq z&e8i3ewB%Utbi-_!|gmjO@{gAQ#9@Nr$k{{i0HA&Su!rn6|{}d`Bsn@q4mW;&xAfOmIV; zjrT>S;bU!w!^2VwqWKsdU1Vd4i7DG;$^8!o{yA`e`*1)8%FBM56Voxei4OK?PguGP zz`U6D9` z_wU~;LwA=7&zEhVv7KXfAYV(M)tO>cQKx2v`69vYMZRmv5cI?)b$Hk+KUx~&^XVP8 z6OQfu_yW$nac{fqM3?$0(COYkm^$PxU=r}@_9R=9vspX0&QzvAgylw8)xpf;m>+s4 z#0n*ahZDW?0$7!3%N2^tFay*Xk#Gf!hc-;#MJ~R$-riZkw+7+AXND@73N>=bvYwg$ z`o_}}0t>GrSd8b1eH;gk%a(9AcyD_){E*X>KjwBmbaF0{sN&p`zA5hPTi+-zu3b+_C&`{)w|i8 zT)VzA=kKo|N1oYLt1yNiqDYP31Vq4}ebznFDGOFH_2!eozJtsTeZv{bub`FjeE{8t zn~w7t3mg{c)wITg&8dwCloQCiYS|%qwZE7E(>@Med89P^;i5^SI_;& zJM_nc*#n7m?+ZND*p*}lF9owmV<;*2qj9xpo}q81o}LNncZ$uW-t4U9d<|{&JAP)_ z!Wi#6Gg#$Kqn0t6-Rv0k&>FvS52^fJ9IU!2U?&)R9E8i%z8bW6scJQUe-k_TbXyl2 z`kIAXCYqZuJ^a{aji1(Moc6&9F9o;xGVV-?$#ufW^*h4LQuWNyd%l0Jrb|yeLa+sh zFcZ7{bQK)F$kR;E-mOOuIRhH(ePbv6Gzm)#6ZbClGF3(}U|xtiJm_>Bj_wh+%7}e% zdaW~zS<_P}QQaYpQ|5LbANLbPYg1*?=`B&vyDv;KPX%Z!`{_&Uf8S+L%I7(0um9=Y mRZoWhwO0ZEkGV5TX`{PUv}EqvWXXOy>$+M-nw5_n-u^$qVj$oE literal 0 HcmV?d00001 From 82aa86421d7a4d23cab0c8cc48e9fc46abb39c44 Mon Sep 17 00:00:00 2001 From: Matt Green Date: Tue, 6 Aug 2024 14:37:02 -0700 Subject: [PATCH 5/5] add new example --- Cargo.lock | 2 + Cargo.toml | 2 +- README.md | 37 ++++++++++----- docs/kafka_rideshare_example.md | 35 +++++++++++++++ examples/Cargo.toml | 2 + examples/examples/emit_measurements.rs | 51 +++++++++++++++++++++ examples/examples/simple_aggregation.rs | 46 +++++++++++++++++++ examples/examples/test.rs | 60 ------------------------- 8 files changed, 163 insertions(+), 72 deletions(-) create mode 100644 docs/kafka_rideshare_example.md create mode 100644 examples/examples/emit_measurements.rs create mode 100644 examples/examples/simple_aggregation.rs delete mode 100644 examples/examples/test.rs diff --git a/Cargo.lock b/Cargo.lock index 99d8f0c..ef8c7e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1110,7 +1110,9 @@ dependencies = [ "datafusion-physical-expr", "df-streams-core", "futures", + "rand", "rdkafka", + "serde", "serde_json", "tempfile", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 1e83231..34cd93d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,7 @@ futures = "0.3" tracing = "0.1.40" tracing-log = "0.2.0" tracing-subscriber = "0.3.18" -tokio = { version = "1.36", features = ["macros", "rt", "sync"] } +tokio = { version = "1.36", features = ["macros", "rt", "sync", "rt-multi-thread"] } async-trait = "0.1.81" rdkafka = "0.36.2" log = "^0.4" diff --git a/README.md b/README.md index 94c88ee..2dcf250 100644 --- a/README.md +++ b/README.md @@ -9,17 +9,32 @@ It currently supports sourcing and sinking to kafka, windowed aggregations, and This repo is still a *work-in-progress* and we are actively seeking design partners. If you have have a specific use-case you'd like to discuss please drop us a line via a github issue or email hello@denormalized.io. -## Building Denormalized - -Simply run `cargo build` - -## Running Examples - -See our [benchmarking repo](https://github.com/probably-nothing-labs/benchmarking) for local Kafka setup and data generation. - -With the data generation in place, run - - -`cargo run --example kafka_rideshare` +## Quickstart + +### Prerequisites +- Docker +- Rust/Cargo installed + +### Running an example +1. Start kafka in docker `docker run -p 9092:9092 --name kafka apache/kafka` +2. Start emitting some sample data: `cargo run --example emit_measurements` +3. Run a [simple streaming aggregation](./examples/examples/simple_aggregation.rs) on the data using denormalized: `cargo run --example emit_measurements` + +## More examples + +A more powerful example can be seen in our [kafka ridesharing example](./docs/kafka_rideshare_example.md) + +## Roadmap +- [x] Stream aggregation +- [x] Stream joins +- [ ] Checkpointing / restoration +- [ ] Session windows +- [ ] Stateful UDF API +- [ ] DuckDB support +- [ ] Reading/writing from Postgres +- [ ] Python bindings +- [ ] Typescript bindings +- [ ] UI ## Credits diff --git a/docs/kafka_rideshare_example.md b/docs/kafka_rideshare_example.md new file mode 100644 index 0000000..607c45c --- /dev/null +++ b/docs/kafka_rideshare_example.md @@ -0,0 +1,35 @@ +# Kafka Rideshare Example + +This example application aggregates data across a more involved example setup. + +### Configure Kafka Cluster + +Clone our [docker compose files for running kafka](https://github.com/probably-nothing-labs/kafka-monitoring-stack-docker-compose). If you already have a different kafka cluster running, you can skip this step. +```sh +git clone git@github.com:probably-nothing-labs/kafka-monitoring-stack-docker-compose.git +cd kafka-monitoring-stack-docker-compose +docker compose -f denormalized-benchmark-cluster.yml up +``` + +This will spin up a 3 node kafka cluster in docker along with an instance of kafka-ui that can be viewed at http://localhost:8080/ + +### Generate some sample data to the kafka cluster + +We wrote a [small rust tool](https://github.com/probably-nothing-labs/benchmarking) that will send fake traffic to the locally run rust program. +```sh +git clone git@github.com:probably-nothing-labs/benchmarking.git +cd benchmarking +cargo run -- -d 60 -a 1000 +``` + +This will start a simulation for 60s and will create two topics: `driver-imu-data` and `trips` which should have around ~58k and ~500 messages accordingly. +There are several other knobs that can be tuned to change the amount of traffic which can be viewed with `cargo run -- --help`. +There are also several other knobs that are not exposes but can be changed in the [src/main.rs](https://github.com/probably-nothing-labs/benchmarking/blob/main/src/main.rs#L104-L108) file + +### Run a Streaming Datafusion job + +```sh +cargo run --example kafka_rideshare +``` + +Once everything is setup and one of the two streaming jobs is running, it is recommend to re-run the kafka data generation tool so that live data is produced. This is because watermark tracking of streaming data makes it difficult to properly aggregate older data that lives in the kafka topic. diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 66611e9..a94c834 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -20,7 +20,9 @@ tracing = { workspace = true } futures = { workspace = true } tracing-log = { workspace = true } tracing-subscriber = { workspace = true } +serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] } tempfile = { version = "3" } rdkafka = { workspace = true } +rand = "0.8.5" diff --git a/examples/examples/emit_measurements.rs b/examples/examples/emit_measurements.rs new file mode 100644 index 0000000..311bbcd --- /dev/null +++ b/examples/examples/emit_measurements.rs @@ -0,0 +1,51 @@ +use datafusion::error::Result; +use rdkafka::producer::FutureProducer; +use serde::{Deserialize, Serialize}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use rdkafka::config::ClientConfig; +use rdkafka::producer::FutureRecord; +use rdkafka::util::Timeout; + +#[derive(Serialize, Deserialize)] +pub struct Measurment { + occurred_at_ms: u64, + temperature: f64, +} + +/// docker run -p 9092:9092 --name kafka apache/kafka +#[tokio::main] +async fn main() -> Result<()> { + let producer: FutureProducer = ClientConfig::new() + .set("bootstrap.servers", String::from("localhost:9092")) + .set("message.timeout.ms", "60000") + .create() + .expect("Producer creation error"); + + let topic = "temperature".to_string(); + + loop { + let msg = serde_json::to_vec(&Measurment { + occurred_at_ms: get_timestamp_ms(), + temperature: rand::random::() * 115.0, + }) + .unwrap(); + + producer + .send( + FutureRecord::<(), Vec>::to(topic.as_str()).payload(&msg), + Timeout::Never, + ) + .await + .unwrap(); + + tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; + } +} + +fn get_timestamp_ms() -> u64 { + let now = SystemTime::now(); + let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards"); + + since_the_epoch.as_millis() as u64 +} diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs new file mode 100644 index 0000000..3ce7878 --- /dev/null +++ b/examples/examples/simple_aggregation.rs @@ -0,0 +1,46 @@ +use std::time::Duration; + +use datafusion::error::Result; +use datafusion::functions_aggregate::average::avg; +use datafusion_expr::{col, max, min}; + +use df_streams_core::context::Context; +use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder}; +use df_streams_core::physical_plan::utils::time::TimestampUnit; + +#[tokio::main] +async fn main() -> Result<()> { + let sample_event = r#"{"occurred_at_ms": 1715201766763, "temperature": 87.2}"#; + + let bootstrap_servers = String::from("localhost:9092"); + + let ctx = Context::new()?; + + let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone()); + + let source_topic = topic_builder + .with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis) + .with_encoding("json")? + .with_topic(String::from("temperature")) + .infer_schema_from_json(sample_event)? + .build_reader(ConnectionOpts::from([ + ("auto.offset.reset".to_string(), "earliest".to_string()), + ("group.id".to_string(), "sample_pipeline".to_string()), + ])) + .await?; + + let ds = ctx.from_topic(source_topic).await?.streaming_window( + vec![], + vec![ + min(col("temperature")).alias("min"), + max(col("temperature")).alias("max"), + avg(col("temperature")).alias("average"), + ], + Duration::from_millis(1_000), // 5 second window + None, + )?; + + ds.clone().print_stream().await?; + + Ok(()) +} diff --git a/examples/examples/test.rs b/examples/examples/test.rs deleted file mode 100644 index d4d20b0..0000000 --- a/examples/examples/test.rs +++ /dev/null @@ -1,60 +0,0 @@ -#![allow(dead_code)] -#![allow(unused_variables)] -#![allow(unused_imports)] - -use datafusion::error::Result; -use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaReadConfig, KafkaTopicBuilder}; -use std::{sync::Arc, time::Duration}; - -use rdkafka::admin::AdminClient; -use rdkafka::admin::AdminOptions; -use rdkafka::admin::ConfigResource; -use rdkafka::admin::ConfigResourceResult; -use rdkafka::admin::ResourceSpecifier; -use rdkafka::config::ClientConfig; -use rdkafka::config::FromClientConfig; -use rdkafka::consumer::Consumer; -use rdkafka::consumer::StreamConsumer; -use rdkafka::metadata::MetadataTopic; -use tracing::field::debug; - -#[tokio::main] -async fn main() -> Result<()> { - let bootstrap_servers = String::from("localhost:19092,localhost:29092,localhost:39092"); - let mut client_config = ClientConfig::new(); - client_config.set("bootstrap.servers", bootstrap_servers.to_string()); - - let admin = AdminClient::from_config(&client_config).unwrap(); - - // let res: Vec = admin - // .describe_configs( - // &vec![ResourceSpecifier::Topic("out_topic")], - // &AdminOptions::default(), - // ) - // .await - // .unwrap() - // .into_iter() - // .map(|v| v.unwrap()) - // .collect(); - // - // for (k, v) in res[0].entry_map().into_iter() { - // println!("{}: {:?}", k, v.value); - // } - - let mut client_config = ClientConfig::new(); - - client_config.set("bootstrap.servers", bootstrap_servers.to_string()); - - let consumer: StreamConsumer = client_config.create().expect("Consumer creation failed"); - - let data = consumer - .fetch_metadata(Some("out_topic"), Duration::from_millis(5_000)) - .unwrap(); - let topic_metadata = data.topics(); - let md = &topic_metadata[0]; - let partitions = md.partitions(); - - println!("{:?}", partitions.len()); - - Ok(()) -}