Skip to content

Commit

Permalink
chore: bump tonic to v0.12 (#17889)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
Co-authored-by: William Wen <[email protected]>
  • Loading branch information
BugenZhao and wenym1 authored Aug 14, 2024
1 parent 3043efd commit e96c39d
Show file tree
Hide file tree
Showing 59 changed files with 513 additions and 329 deletions.
360 changes: 238 additions & 122 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 11 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,19 +121,19 @@ aws-smithy-types = { version = "1", default-features = false, features = [
aws-endpoint = "0.60"
aws-types = "1"
axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain
etcd-client = { package = "madsim-etcd-client", version = "0.4" }
etcd-client = { package = "madsim-etcd-client", version = "0.6" }
futures-async-stream = "0.2.9"
hytra = "0.1"
rdkafka = { package = "madsim-rdkafka", version = "0.4.1", features = [
"cmake-build",
] }
hashbrown = { version = "0.14", features = ["ahash", "inline-more", "nightly"] }
criterion = { version = "0.5", features = ["async_futures"] }
tonic = { package = "madsim-tonic", version = "0.4.1" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.2" }
otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "492c244e0be91feb659c0cd48a624bbd96045a33" }
prost = { version = "0.12" }
prost-build = { version = "0.12" }
tonic = { package = "madsim-tonic", version = "0.5.1" }
tonic-build = { package = "madsim-tonic-build", version = "0.5" }
otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "e6cd165b9bc85783b42c106e99186b86b73e3507" }
prost = { version = "0.13" }
prost-build = { version = "0.13" }
icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "1860eb315183a5f3f72b4097c1e40d49407f8373", features = [
"prometheus",
] }
Expand Down Expand Up @@ -180,6 +180,7 @@ tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git"
"profiling",
"stats",
], rev = "64a2d9" }
# TODO(http-bump): bump to use tonic 0.12 once minitrace-opentelemetry is updated
opentelemetry = "0.23"
opentelemetry-otlp = "0.16"
opentelemetry_sdk = { version = "0.23", default-features = false }
Expand All @@ -195,6 +196,7 @@ sea-orm = { version = "0.12.14", features = [
"runtime-tokio-native-tls",
] }
sqlx = "0.7"
tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0dd1055", features = ["net", "fs"] }
tokio-util = "0.7"
tracing-opentelemetry = "0.24"
rand = { version = "0.8", features = ["small_rng"] }
Expand Down Expand Up @@ -335,7 +337,9 @@ opt-level = 2
# Patch third-party crates for deterministic simulation.
quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "948bdc3" }
getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae" }
tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "fe39bb8e" }
# Don't patch `tokio-stream`, but only use the madsim version for **direct** dependencies.
# Imagine an unpatched dependency depends on the original `tokio` and the patched `tokio-stream`.
# tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0dd1055" }
tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" }
tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88" }
futures-timer = { git = "https://github.com/madsim-rs/futures-timer.git", rev = "05b33b4" }
Expand Down
1 change: 0 additions & 1 deletion ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ services:
SCHEMA_REGISTRY_HOST_NAME: schemaregistry
SCHEMA_REGISTRY_LISTENERS: http://schemaregistry:8082
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: message_queue:29092
SCHEMA_REGISTRY_DEBUG: 'true'

pulsar-server:
container_name: pulsar-server
Expand Down
16 changes: 5 additions & 11 deletions e2e_test/sink/kafka/protobuf.slt
Original file line number Diff line number Diff line change
Expand Up @@ -201,25 +201,19 @@ format plain encode protobuf (
message = 'recursive.AllTypes');

statement ok
drop sink sink_upsert;
drop table from_kafka cascade;

statement ok
drop sink sink_csr_nested;
drop table from_kafka_csr_trivial cascade;

statement ok
drop sink sink_csr_trivial;
drop table from_kafka_csr_nested cascade;

statement ok
drop sink sink0;
drop table from_kafka_raw cascade;

statement ok
drop table into_kafka;

statement ok
drop table from_kafka_raw;
drop table into_kafka cascade;

system ok
rpk topic delete test-rw-sink-upsert-protobuf

statement ok
drop table from_kafka;
2 changes: 1 addition & 1 deletion src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"fs",
] }
tokio-metrics = "0.3.0"
tokio-stream = "0.1"
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tonic = { workspace = true }
tracing = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use bytes::Bytes;
use futures_async_stream::try_stream;
use hashbrown::hash_map::Entry;
use itertools::Itertools;
use prost::Message;
use risingwave_common::array::{DataChunk, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Field, Schema};
Expand All @@ -35,6 +34,7 @@ use risingwave_expr::aggregate::{AggCall, AggregateState, BoxedAggregateFunction
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::HashAggNode;
use risingwave_pb::data::DataChunk as PbDataChunk;
use risingwave_pb::Message;

use crate::error::{BatchError, Result};
use crate::executor::aggregation::build as build_agg;
Expand Down
7 changes: 2 additions & 5 deletions src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,7 @@ impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {
let pk_prefix = OwnedRow::new(scan_range.eq_conds);

if self.lookup_prefix_len == self.table.pk_indices().len() {
let row = self
.table
.get_row(&pk_prefix, self.epoch.clone().into())
.await?;
let row = self.table.get_row(&pk_prefix, self.epoch.into()).await?;

if let Some(row) = row {
self.row_list.push(row);
Expand All @@ -366,7 +363,7 @@ impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {
let iter = self
.table
.batch_iter_with_pk_bounds(
self.epoch.clone().into(),
self.epoch.into(),
&pk_prefix,
..,
false,
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::sync::Arc;
use bytes::Bytes;
use futures_async_stream::try_stream;
use itertools::Itertools;
use prost::Message;
use risingwave_common::array::{Array, DataChunk, RowRef};
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::Schema;
Expand All @@ -34,6 +33,7 @@ use risingwave_common_estimate_size::EstimateSize;
use risingwave_expr::expr::{build_from_prost, BoxedExpression, Expression};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::data::DataChunk as PbDataChunk;
use risingwave_pb::Message;

use super::{ChunkedData, JoinType, RowId};
use crate::error::{BatchError, Result};
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
..Default::default()
}),
}),
epoch: Some(self.epoch.clone()),
epoch: Some(self.epoch),
tracing_context: TracingContext::from_current_span().to_protobuf(),
};

Expand Down Expand Up @@ -237,7 +237,7 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
&plan_node,
&task_id,
self.context.clone(),
self.epoch.clone(),
self.epoch,
self.shutdown_rx.clone(),
);

Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl<'a, C: Clone> ExecutorBuilder<'a, C> {
plan_node,
self.task_id,
self.context.clone(),
self.epoch.clone(),
self.epoch,
self.shutdown_rx.clone(),
)
}
Expand All @@ -188,7 +188,7 @@ impl<'a, C: Clone> ExecutorBuilder<'a, C> {
}

pub fn epoch(&self) -> BatchQueryEpoch {
self.epoch.clone()
self.epoch
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/order_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::sync::Arc;
use bytes::Bytes;
use futures_async_stream::try_stream;
use itertools::Itertools;
use prost::Message;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::memory::MemoryContext;
Expand All @@ -28,6 +27,7 @@ use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::data::DataChunk as PbDataChunk;
use risingwave_pb::Message;

use super::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
Expand Down
7 changes: 3 additions & 4 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {

let ordered = seq_scan_node.ordered;

let epoch = source.epoch.clone();
let epoch = source.epoch;
let limit = seq_scan_node.limit;
let as_of = seq_scan_node
.as_of
Expand Down Expand Up @@ -341,8 +341,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
for point_get in point_gets {
let table = table.clone();
if let Some(row) =
Self::execute_point_get(table, point_get, query_epoch.clone(), histogram.clone())
.await?
Self::execute_point_get(table, point_get, query_epoch, histogram.clone()).await?
{
if let Some(chunk) = data_chunk_builder.append_one_row(row) {
returned += chunk.cardinality() as u64;
Expand Down Expand Up @@ -373,7 +372,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
table.clone(),
range,
ordered,
query_epoch.clone(),
query_epoch,
chunk_size,
limit,
histogram.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/spill/spill_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use futures_util::AsyncReadExt;
use opendal::layers::RetryLayer;
use opendal::services::{Fs, Memory};
use opendal::Operator;
use prost::Message;
use risingwave_common::array::DataChunk;
use risingwave_pb::data::DataChunk as PbDataChunk;
use risingwave_pb::Message;
use thiserror_ext::AsReport;
use tokio::sync::Mutex;
use twox_hash::XxHash64;
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/task/broadcast_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub fn new_broadcast_channel(
output_channel_size: usize,
) -> (ChanSenderImpl, Vec<ChanReceiverImpl>) {
let broadcast_info = match shuffle.distribution {
Some(exchange_info::Distribution::BroadcastInfo(ref v)) => v.clone(),
Some(exchange_info::Distribution::BroadcastInfo(ref v)) => *v,
_ => BroadcastInfo::default(),
};

Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
self.plan.root.as_ref().unwrap(),
&self.task_id,
self.context.clone(),
self.epoch.clone(),
self.epoch,
self.shutdown_rx.clone(),
)
.build(),
Expand Down
2 changes: 1 addition & 1 deletion src/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"time",
"signal",
] }
tokio-stream = "0.1"
tokio-stream = { workspace = true }
toml = "0.8"
tracing = "0.1"
tracing-subscriber = "0.3.17"
Expand Down
2 changes: 1 addition & 1 deletion src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] }
governor = { version = "0.6", default-features = false, features = ["std"] }
hashbrown = "0.14"
hex = "0.4.3"
http = "0.2"
http = "1"
humantime = "2.1"
hytra = { workspace = true }
itertools = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion src/common/common_service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ normal = ["workspace-hack"]
async-trait = "0.1"
axum = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
hyper = "0.14" # required by tonic
http = "1"
prometheus = { version = "0.13" }
risingwave_common = { workspace = true }
risingwave_pb = { workspace = true }
Expand Down
8 changes: 4 additions & 4 deletions src/common/common_service/src/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
use std::task::{Context, Poll};

use futures::Future;
use hyper::Body;
use risingwave_common::util::tracing::TracingContext;
use tonic::body::BoxBody;
use tower::{Layer, Service};
use tracing::Instrument;

Expand Down Expand Up @@ -49,9 +49,9 @@ pub struct TracingExtract<S> {
inner: S,
}

impl<S> Service<hyper::Request<Body>> for TracingExtract<S>
impl<S> Service<http::Request<BoxBody>> for TracingExtract<S>
where
S: Service<hyper::Request<Body>> + Clone + Send + 'static,
S: Service<http::Request<BoxBody>> + Clone + Send + 'static,
S::Future: Send + 'static,
{
type Error = S::Error;
Expand All @@ -63,7 +63,7 @@ where
self.inner.poll_ready(cx)
}

fn call(&mut self, req: hyper::Request<Body>) -> Self::Future {
fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
// This is necessary because tonic internally uses `tower::buffer::Buffer`.
// See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
// for details on why this is necessary
Expand Down
14 changes: 9 additions & 5 deletions src/common/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@ ignored = ["workspace-hack"]
normal = ["workspace-hack"]

[dependencies]
auto_impl = "1"
bytes = "1"
clap = { workspace = true }
easy-ext = "1"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
http = "0.2"
hyper = { version = "0.14", features = ["client"] } # used by tonic
http = "1"
http-02 = { package = "http", version = "0.2" }
hyper = { version = "1" }
hyper-014 = { package = "hyper", version = "0.14" }
hyper-util = { version = "0.1", features = ["client-legacy"] }
hytra = { workspace = true }
itertools = { workspace = true }
parking_lot = { workspace = true }
Expand All @@ -32,13 +36,13 @@ serde = { version = "1", features = ["derive"] }
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio" }
tonic = { workspace = true }
tower-layer = "0.3.2"
tower-service = "0.3.2"
tracing = "0.1"
tracing-subscriber = "0.3.17"

[target.'cfg(not(madsim))'.dependencies]
http-body = "0.4.5"
tower-layer = "0.3.2"
tower-service = "0.3.2"
http-body = "1"
[target.'cfg(target_os = "linux")'.dependencies]
procfs = { version = "0.16", default-features = false }
libc = "0.2"
Expand Down
Loading

0 comments on commit e96c39d

Please sign in to comment.