From 6e6988d345bda568e9beeb416a448e4e7c2769e8 Mon Sep 17 00:00:00 2001
From: Conor
Date: Mon, 6 Sep 2021 14:38:53 +1000
Subject: [PATCH] Clippy fixes (#171)

---
 Cargo.lock                                     | 28 ++++----
 shotover-proxy/src/config/topology.rs          |  2 +-
 shotover-proxy/src/message/mod.rs              | 12 ++--
 .../src/protocols/cassandra_protocol2.rs       | 71 +++++++++----------
 shotover-proxy/src/protocols/redis_codec.rs    |  6 +-
 shotover-proxy/src/runner.rs                   |  6 +-
 shotover-proxy/src/runtimes/mod.rs             |  8 +--
 .../src/sources/cassandra_source.rs            |  1 +
 shotover-proxy/src/sources/mpsc_source.rs      |  5 +-
 shotover-proxy/src/sources/redis_source.rs     |  2 +-
 shotover-proxy/src/transforms/chain.rs         | 17 ++---
 .../src/transforms/distributed/route.rs        |  2 +-
 .../src/transforms/distributed/scatter.rs      | 21 +++---
 .../tunable_consistency_scatter.rs             | 12 ++--
 shotover-proxy/src/transforms/load_balance.rs  |  2 +-
 shotover-proxy/src/transforms/mpsc.rs          |  2 +-
 shotover-proxy/src/transforms/parallel_map.rs  |  2 +-
 .../redis_transforms/redis_cache.rs            | 37 +++++-----
 .../redis_transforms/redis_cluster.rs          | 20 ++++--
 .../util/cluster_connection_pool.rs            | 19 +++--
 .../util/unordered_cluster_connection_pool.rs  |  4 +-
 21 files changed, 137 insertions(+), 142 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 10726e417..7577e9878 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1849,9 +1849,9 @@ dependencies = [
 
 [[package]]
 name = "ordered-float"
-version = "2.7.0"
+version = "2.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "039f02eb0f69271f26abe3202189275d7aa2258b903cb0281b5de710a2570ff3"
+checksum = "97c9d06878b3a851e8026ef94bf7fef9ba93062cd412601da4d9cf369b1cc62d"
 dependencies = [
  "num-traits 0.2.14",
 ]
@@ -2384,9 +2384,9 @@ dependencies = [
 
 [[package]]
 name = "redis"
-version = "0.21.1"
+version = "0.21.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e4d99f2a2f80b100036dde5f8ae106546196592751f35c7afb58ec0683059a3e"
+checksum = "202c5bf92cad3d57605c366e644a7fbf305a83f19754fc66678c6265dcc9b8b4"
 dependencies = [
  "async-trait",
  "bytes",
@@ -2916,9 +2916,9 @@ checksum = "c19772be3c4dd2ceaacf03cb41d5885f2a02c4d8804884918e3a258480803335"
 
 [[package]]
 name = "siphasher"
-version = "0.3.6"
+version = "0.3.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "729a25c17d72b06c68cb47955d44fda88ad2d3e7d77e025663fdd69b93dd71a1"
+checksum = "533494a8f9b724d33625ab53c6c4800f7cc445895924a8ef649222dcb76e938b"
 
 [[package]]
 name = "sketches-ddsketch"
@@ -3015,9 +3015,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
 
 [[package]]
 name = "syn"
-version = "1.0.75"
+version = "1.0.76"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b7f58f7e8eaa0009c5fec437aabf511bd9933e4b2d7407bd05273c01a8906ea7"
+checksum = "c6f107db402c2c2055242dbf4d2af0e69197202e9faacbef9571bbe47f5a1b84"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -3088,18 +3088,18 @@ dependencies = [
 
 [[package]]
 name = "thiserror"
-version = "1.0.28"
+version = "1.0.29"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "283d5230e63df9608ac7d9691adc1dfb6e701225436eb64d0b9a7f0a5a04f6ec"
+checksum = "602eca064b2d83369e2b2f34b09c70b605402801927c65c11071ac911d299b88"
 dependencies = [
  "thiserror-impl",
 ]
 
 [[package]]
 name = "thiserror-impl"
-version = "1.0.28"
+version = "1.0.29"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fa3884228611f5cd3608e2d409bf7dce832e4eb3135e3f11addbd7e41bd68e71"
+checksum = "bad553cc2c78e8de258400763a647e80e6d1b31ee237275d756f6836d204494c"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -3227,9 +3227,9 @@ dependencies = [
 
 [[package]]
 name = "tokio-util"
-version = "0.6.7"
+version = "0.6.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1caa0b0c8d94a049db56b5acf8cba99dc0623aab1b26d5b5f5e2d945846b3592"
+checksum = "08d3725d3efa29485e87311c5b699de63cde14b00ed4d256b8318aa30ca452cd"
 dependencies = [
  "bytes",
  "futures-core",
diff --git a/shotover-proxy/src/config/topology.rs b/shotover-proxy/src/config/topology.rs
index b7a173d50..b50dcc2a7 100644
--- a/shotover-proxy/src/config/topology.rs
+++ b/shotover-proxy/src/config/topology.rs
@@ -98,7 +98,7 @@ impl Topology {
         for (key, value) in self.chain_config.clone() {
             temp.insert(
                 key.clone(),
-                build_chain_from_config(key, &value, &topics).await?,
+                build_chain_from_config(key, &value, topics).await?,
             );
         }
         Ok(temp)
diff --git a/shotover-proxy/src/message/mod.rs b/shotover-proxy/src/message/mod.rs
index 063c58ab3..21ad12bb4 100644
--- a/shotover-proxy/src/message/mod.rs
+++ b/shotover-proxy/src/message/mod.rs
@@ -438,9 +438,9 @@ impl From<&Frame> for Value {
     }
 }
 
-impl Into<Frame> for Value {
-    fn into(self) -> Frame {
-        match self {
+impl From<Value> for Frame {
+    fn from(value: Value) -> Frame {
+        match value {
             Value::NULL => Frame::Null,
             Value::None => unimplemented!(),
             Value::Bytes(b) => Frame::BulkString(b),
@@ -545,9 +545,9 @@ impl Value {
     }
 }
 
-impl Into<cassandra_proto::types::value::Bytes> for Value {
-    fn into(self) -> cassandra_proto::types::value::Bytes {
-        match self {
+impl From<Value> for cassandra_proto::types::value::Bytes {
+    fn from(value: Value) -> cassandra_proto::types::value::Bytes {
+        match value {
             Value::NULL => (-1).into(),
             Value::None => cassandra_proto::types::value::Bytes::new(vec![]),
             Value::Bytes(b) => cassandra_proto::types::value::Bytes::new(b.to_vec()),
diff --git a/shotover-proxy/src/protocols/cassandra_protocol2.rs b/shotover-proxy/src/protocols/cassandra_protocol2.rs
index 1cc35e2eb..ff5734292 100644
--- a/shotover-proxy/src/protocols/cassandra_protocol2.rs
+++ b/shotover-proxy/src/protocols/cassandra_protocol2.rs
@@ -562,50 +562,47 @@ impl CassandraCodec2 {
         match frame.opcode {
             Opcode::Query => {
-                if let Ok(body) = frame.get_body() {
-                    if let ResponseBody::Query(brq) = body {
-                        let parsed_string = CassandraCodec2::parse_query_string(
-                            brq.query.clone().into_plain(),
-                            &self.pk_col_map,
-                        );
-                        if parsed_string.ast.is_none() {
-                            // TODO: Currently this will probably catch schema changes that don't match
-                            // what the SQL parser expects
-                            return Messages::new_single_bypass(RawFrame::Cassandra(frame));
-                        }
-                        return Messages::new_single_query(
-                            QueryMessage {
-                                query_string: brq.query.into_plain(),
-                                namespace: parsed_string.namespace.unwrap(),
-                                primary_key: parsed_string.primary_key,
-                                query_values: parsed_string.colmap,
-                                projection: parsed_string.projection,
-                                query_type: QueryType::Read,
-                                ast: parsed_string.ast.map(ASTHolder::SQL),
-                            },
-                            false,
-                            RawFrame::Cassandra(frame),
-                        );
+                if let Ok(ResponseBody::Query(body)) = frame.get_body() {
+                    let parsed_string = CassandraCodec2::parse_query_string(
+                        body.query.clone().into_plain(),
+                        &self.pk_col_map,
+                    );
+                    if parsed_string.ast.is_none() {
+                        // TODO: Currently this will probably catch schema changes that don't match
+                        // what the SQL parser expects
+                        return Messages::new_single_bypass(RawFrame::Cassandra(frame));
                     }
+                    return Messages::new_single_query(
+                        QueryMessage {
+                            query_string: body.query.into_plain(),
+                            namespace: parsed_string.namespace.unwrap(),
+                            primary_key: parsed_string.primary_key,
+                            query_values: parsed_string.colmap,
+                            projection: parsed_string.projection,
+                            query_type: QueryType::Read,
+                            ast: parsed_string.ast.map(ASTHolder::SQL),
+                        },
+                        false,
+                        RawFrame::Cassandra(frame),
+                    );
                 }
                 Messages::new_single_bypass(RawFrame::Cassandra(frame))
             }
             Opcode::Result => CassandraCodec2::build_response_message(frame, None),
             Opcode::Error => {
-                if let Ok(body) = frame.get_body() {
-                    if let ResponseBody::Error(e) = body {
-                        return Messages::new_single_response(
-                            QueryResponse {
-                                matching_query: None,
-                                result: None,
-                                error: Some(Value::Strings(e.message.as_plain())),
-                                response_meta: None,
-                            },
-                            false,
-                            RawFrame::Cassandra(frame),
-                        );
-                    }
+                if let Ok(ResponseBody::Error(body)) = frame.get_body() {
+                    return Messages::new_single_response(
+                        QueryResponse {
+                            matching_query: None,
+                            result: None,
+                            error: Some(Value::Strings(body.message.as_plain())),
+                            response_meta: None,
+                        },
+                        false,
+                        RawFrame::Cassandra(frame),
+                    );
                 }
+                Messages::new_single_bypass(RawFrame::Cassandra(frame))
             }
             _ => Messages::new_single_bypass(RawFrame::Cassandra(frame)),
diff --git a/shotover-proxy/src/protocols/redis_codec.rs b/shotover-proxy/src/protocols/redis_codec.rs
index 96e242412..1b08a9604 100644
--- a/shotover-proxy/src/protocols/redis_codec.rs
+++ b/shotover-proxy/src/protocols/redis_codec.rs
@@ -595,10 +595,8 @@ impl RedisCodec {
         if let Some(result) = &resp.result {
             return result.clone().into();
         }
-        if let Some(result) = &resp.error {
-            if let Value::Strings(s) = result {
-                return Frame::Error(s.clone());
-            }
+        if let Some(Value::Strings(s)) = &resp.error {
+            return Frame::Error(s.clone());
         }
 
         debug!("{:?}", resp);
diff --git a/shotover-proxy/src/runner.rs b/shotover-proxy/src/runner.rs
index 62fb2af91..d679ffe90 100644
--- a/shotover-proxy/src/runner.rs
+++ b/shotover-proxy/src/runner.rs
@@ -140,11 +140,13 @@ impl Runner {
     }
 }
 
+type TracingStateHandle =
+    Handle, Registry>>;
+
 struct TracingState {
     /// Once this is dropped tracing logs are ignored
     guard: WorkerGuard,
-    handle:
-        Handle, Registry>>,
+    handle: TracingStateHandle,
 }
 
 /// Returns a new `EnvFilter` by parsing each directive string, or an error if any directive is invalid.
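The `message/mod.rs` hunk above follows `clippy::from_over_into`: implementing `From` instead of `Into` gives the reciprocal `Into` impl for free through the standard library's blanket impl, and `From` is the more idiomatic direction. A minimal sketch of the same pattern, with illustrative types rather than shotover's `Value`/`Frame`:

```rust
struct Celsius(f64);
struct Fahrenheit(f64);

// Prefer `From` over `Into`: the standard library's blanket
// `impl<T, U> Into<U> for T where U: From<T>` supplies `Into` automatically.
impl From<Celsius> for Fahrenheit {
    fn from(value: Celsius) -> Fahrenheit {
        Fahrenheit(value.0 * 9.0 / 5.0 + 32.0)
    }
}

fn main() {
    // Both directions of the conversion API now work.
    let boiling = Fahrenheit::from(Celsius(100.0));
    let freezing: Fahrenheit = Celsius(0.0).into();
    println!("{} {}", boiling.0, freezing.0);
}
```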
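The Cassandra and Redis codec hunks above collapse a nested `if let` into a single pattern, the refactor suggested by `clippy::collapsible_match`. A stand-alone sketch of that transformation (the `Body` enum here is made up for illustration, not shotover's type):

```rust
enum Body {
    Error(String),
    Query(String),
}

// Before: two levels of matching to reach the inner value.
fn error_message_nested(body: Result<Body, ()>) -> Option<String> {
    if let Ok(b) = body {
        if let Body::Error(msg) = b {
            return Some(msg);
        }
    }
    None
}

// After: the outer and inner patterns are merged into one.
fn error_message_flat(body: Result<Body, ()>) -> Option<String> {
    if let Ok(Body::Error(msg)) = body {
        return Some(msg);
    }
    None
}

fn main() {
    assert_eq!(
        error_message_nested(Ok(Body::Error("boom".into()))),
        error_message_flat(Ok(Body::Error("boom".into())))
    );
    assert_eq!(error_message_flat(Ok(Body::Query("SELECT 1".into()))), None);
}
```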
diff --git a/shotover-proxy/src/runtimes/mod.rs b/shotover-proxy/src/runtimes/mod.rs
index 9ea73cdf2..8027140f9 100644
--- a/shotover-proxy/src/runtimes/mod.rs
+++ b/shotover-proxy/src/runtimes/mod.rs
@@ -99,7 +99,7 @@ pub trait ScriptDefinition {
     type Args;
     type Return;
 
-    fn call<'de>(&self, lua: &'de Lua, args: Self::Args) -> Result
+    fn call(&self, lua: &Lua, args: Self::Args) -> Result
     where
         A: serde::Serialize + Clone,
         R: serde::de::DeserializeOwned + Clone;
@@ -176,16 +176,16 @@ impl ScriptDefinition for ScriptHolder {
         let func: Func<(i32, u32), i32> = wasm_inst.exports.get(function_name.as_str())?;
 
         let start = func
-            .call(5 as i32, len as u32)
+            .call(5_i32, len as u32)
            .map_err(|e| anyhow!("wasm error: {}", e))?;
 
         let new_view = mmemory.view::();
         let mut new_len_bytes = [0u8; 4];
-        for i in 0..4 {
+        for (i, item) in new_len_bytes.iter_mut().enumerate() {
             // attempt to get i+1 from the memory view (1,2,3,4)
             // If we can, return the value it contains, otherwise
             // default back to 0
-            new_len_bytes[i] = new_view.get(i + 1).map(|c| c.get()).unwrap_or(0);
+            *item = new_view.get(i + 1).map(|c| c.get()).unwrap_or(0);
         }
         let new_len = u32::from_ne_bytes(new_len_bytes) as usize;
diff --git a/shotover-proxy/src/sources/cassandra_source.rs b/shotover-proxy/src/sources/cassandra_source.rs
index 58e6c92d1..50e7fd946 100644
--- a/shotover-proxy/src/sources/cassandra_source.rs
+++ b/shotover-proxy/src/sources/cassandra_source.rs
@@ -57,6 +57,7 @@ pub struct CassandraSource {
 }
 
 impl CassandraSource {
+    #![allow(clippy::too_many_arguments)]
     pub async fn new(
         chain: &TransformChain,
         listen_addr: String,
diff --git a/shotover-proxy/src/sources/mpsc_source.rs b/shotover-proxy/src/sources/mpsc_source.rs
index 513f93b7d..09ec78a6c 100644
--- a/shotover-proxy/src/sources/mpsc_source.rs
+++ b/shotover-proxy/src/sources/mpsc_source.rs
@@ -43,7 +43,7 @@ impl SourcesFromConfig for AsyncMpscConfig {
                 &self.topic_name,
                 Shutdown::new(trigger_shutdown_on_drop_rx),
                 shutdown_complete_tx,
-                behavior.clone(),
+                behavior,
             ))])
         } else {
             Err(anyhow!(
@@ -71,8 +71,7 @@ impl AsyncMpsc {
         max_behavior: CoalesceBehavior,
     ) -> AsyncMpsc {
         info!("Starting MPSC source for the topic [{}] ", name);
-        let mut main_chain = chain.clone();
-        let max_behavior = max_behavior.clone();
+        let mut main_chain = chain;
         let mut buffer: Vec = Vec::new();
 
         let jh = Handle::current().spawn(async move {
diff --git a/shotover-proxy/src/sources/redis_source.rs b/shotover-proxy/src/sources/redis_source.rs
index 2c1a75f78..ae344a5ab 100644
--- a/shotover-proxy/src/sources/redis_source.rs
+++ b/shotover-proxy/src/sources/redis_source.rs
@@ -107,7 +107,7 @@ impl RedisSource {
         RedisSource {
             name,
             join_handle,
-            listen_addr: listen_addr.clone(),
+            listen_addr,
         }
     }
 }
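The `runtimes/mod.rs` hunk replaces an index-based `for i in 0..4` loop with `iter_mut().enumerate()`, the usual fix for `clippy::needless_range_loop`. A small self-contained sketch of the same change (the data here is made up for illustration):

```rust
fn main() {
    let source = [10u8, 20, 30, 40];
    let mut dest = [0u8; 4];

    // Index-based version that clippy::needless_range_loop warns about:
    // for i in 0..4 { dest[i] = source[i] + 1; }

    // Iterator version: enumerate() still supplies the index when it is
    // needed to address a second collection, while the destination element
    // is borrowed mutably instead of re-indexed on every iteration.
    for (i, item) in dest.iter_mut().enumerate() {
        *item = source[i] + 1;
    }

    assert_eq!(dest, [11, 21, 31, 41]);
}
```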
diff --git a/shotover-proxy/src/transforms/chain.rs b/shotover-proxy/src/transforms/chain.rs
index aed6ce2b6..82afa5358 100644
--- a/shotover-proxy/src/transforms/chain.rs
+++ b/shotover-proxy/src/transforms/chain.rs
@@ -6,10 +6,8 @@ use futures::TryFutureExt;
 use itertools::Itertools;
 use metrics::{counter, histogram};
 
-use std::sync::Arc;
 use tokio::sync::mpsc::Sender;
 use tokio::sync::oneshot::Receiver as OneReceiver;
-use tokio::sync::Mutex;
 use tokio::time::Duration;
 use tokio::time::Instant;
 use tracing::{debug, trace, warn};
@@ -35,7 +33,7 @@ impl Clone for TransformChain {
 pub struct BufferedChain {
     send_handle: Sender,
     #[cfg(test)]
-    pub count: Arc<Mutex<usize>>,
+    pub count: std::sync::Arc<tokio::sync::Mutex<usize>>,
 }
 
 impl BufferedChain {
@@ -109,9 +107,10 @@ impl TransformChain {
     pub fn into_buffered_chain(self, buffer_size: usize) -> BufferedChain {
         let (tx, mut rx) = tokio::sync::mpsc::channel::(buffer_size);
 
-        // If this is not a test, this should get removed by the compiler
-        let count_outer: Arc<Mutex<usize>> = Arc::new(Mutex::new(0_usize));
-        let count = count_outer.clone();
+        #[cfg(test)]
+        let count = std::sync::Arc::new(tokio::sync::Mutex::new(0_usize));
+        #[cfg(test)]
+        let count_clone = count.clone();
 
         // Even though we don't keep the join handle, this thread will wrap up once all corresponding senders have been dropped.
         let _jh = tokio::spawn(async move {
@@ -123,7 +122,9 @@
             }) = rx.recv().await
             {
                 let name = chain.name.clone();
-                if cfg!(test) {
+
+                #[cfg(test)]
+                {
                     let mut count = count.lock().await;
                     *count += 1;
                 }
@@ -158,7 +159,7 @@ impl TransformChain {
         BufferedChain {
             send_handle: tx,
             #[cfg(test)]
-            count: count_outer,
+            count: count_clone,
         }
     }
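The `chain.rs` change swaps a runtime `if cfg!(test)` check for items gated with the `#[cfg(test)]` attribute, so the test-only counter is compiled out of release builds instead of merely being skipped. A small sketch of the same idea, using an illustrative atomic counter rather than shotover's buffered-chain field:

```rust
#[cfg(test)]
use std::sync::atomic::{AtomicUsize, Ordering};

// Test-only state: this item does not exist outside `cargo test`.
#[cfg(test)]
static CALLS: AtomicUsize = AtomicUsize::new(0);

pub fn do_work() {
    // The block is only compiled for tests, unlike `if cfg!(test)`,
    // which always has to type-check and link in every build.
    #[cfg(test)]
    {
        CALLS.fetch_add(1, Ordering::Relaxed);
    }
    // ... real work would go here ...
}

fn main() {
    do_work();
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn counts_calls() {
        do_work();
        assert!(CALLS.load(Ordering::Relaxed) >= 1);
    }
}
```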
diff --git a/shotover-proxy/src/transforms/distributed/route.rs b/shotover-proxy/src/transforms/distributed/route.rs
index 633e84517..b32c46fb5 100644
--- a/shotover-proxy/src/transforms/distributed/route.rs
+++ b/shotover-proxy/src/transforms/distributed/route.rs
@@ -37,7 +37,7 @@ impl TransformsFromConfig for RouteConfig {
         for (key, value) in self.route_map.clone() {
             temp.insert(
                 key.clone(),
-                build_chain_from_config(key, &value, &topics).await?,
+                build_chain_from_config(key, &value, topics).await?,
             );
         }
         Ok(Transforms::Route(Route {
diff --git a/shotover-proxy/src/transforms/distributed/scatter.rs b/shotover-proxy/src/transforms/distributed/scatter.rs
index 29d1929c9..f479d0aba 100644
--- a/shotover-proxy/src/transforms/distributed/scatter.rs
+++ b/shotover-proxy/src/transforms/distributed/scatter.rs
@@ -17,7 +17,6 @@ use mlua::Lua;
 use serde::{Deserialize, Serialize};
 use std::borrow::Borrow;
 use std::collections::HashMap;
-use std::iter::FromIterator;
 use std::sync::Arc;
 use tokio::sync::Mutex;
 
@@ -76,16 +75,18 @@ impl Transform for Scatter {
         } else if chosen_route.is_empty() {
             ChainResponse::Err(anyhow!("no routes found"))
         } else {
-            let mut fu = FuturesUnordered::from_iter(self.route_map.iter_mut().filter_map(
-                |(name, chain)| {
+            let mut fu: FuturesUnordered<_> = self
+                .route_map
+                .iter_mut()
+                .filter_map(|(name, chain)| {
                     if let Some(_f) = chosen_route.iter().find(|p| *p == name) {
                         let wrapper = message_wrapper.clone();
                         Some(chain.process_request(wrapper, name.clone()))
                     } else {
                         None
                     }
-                },
-            ));
+                })
+                .collect();
 
             let mut results: Vec = Vec::new();
             while let Some(Ok(messages)) = fu.next().await {
@@ -99,15 +100,11 @@ impl Transform for Scatter {
             for res in &mut results {
                 if let Some(m) = res.messages.pop() {
                     if let MessageDetails::Response(QueryResponse {
-                        matching_query: _,
-                        result,
-                        error: _,
-                        response_meta: _,
+                        result: Some(res),
+                        ..
                     }) = &m.details
                     {
-                        if let Some(res) = result {
-                            collated_results.push(res.clone());
-                        }
+                        collated_results.push(res.clone());
                     }
                 }
             }
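The `scatter.rs` hunk drops the explicit `FuturesUnordered::from_iter` (and the now-unused `std::iter::FromIterator` import) in favour of collecting an iterator of futures with `.collect()`. A small sketch of that pattern, assuming the `futures` and `tokio` crates that already appear in this patch (the futures themselves are trivial placeholders):

```rust
use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    // Build the set of futures by collecting an iterator rather than
    // calling FuturesUnordered::from_iter. Results arrive as they complete.
    let mut fu: FuturesUnordered<_> = (1..=3).map(|n| async move { n * 10 }).collect();

    let mut total = 0;
    while let Some(value) = fu.next().await {
        total += value;
    }
    assert_eq!(total, 60);
}
```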
diff --git a/shotover-proxy/src/transforms/distributed/tunable_consistency_scatter.rs b/shotover-proxy/src/transforms/distributed/tunable_consistency_scatter.rs
index f54f0d8db..957464dcc 100644
--- a/shotover-proxy/src/transforms/distributed/tunable_consistency_scatter.rs
+++ b/shotover-proxy/src/transforms/distributed/tunable_consistency_scatter.rs
@@ -148,14 +148,12 @@ impl Transform for TunableConsistency {
                         QueryType::Read => self.read_consistency,
                         _ => self.write_consistency,
                     }
+                } else if std::mem::discriminant(&m.original.get_query_type())
+                    == std::mem::discriminant(&QueryType::Read)
+                {
+                    self.read_consistency
                 } else {
-                    if std::mem::discriminant(&m.original.get_query_type())
-                        == std::mem::discriminant(&QueryType::Read)
-                    {
-                        self.read_consistency
-                    } else {
-                        self.write_consistency
-                    }
+                    self.write_consistency
                 }
             })
             .collect_vec();
diff --git a/shotover-proxy/src/transforms/load_balance.rs b/shotover-proxy/src/transforms/load_balance.rs
index 942106aaa..404d6b0b9 100644
--- a/shotover-proxy/src/transforms/load_balance.rs
+++ b/shotover-proxy/src/transforms/load_balance.rs
@@ -20,7 +20,7 @@ pub struct ConnectionBalanceAndPoolConfig {
 #[async_trait]
 impl TransformsFromConfig for ConnectionBalanceAndPoolConfig {
     async fn get_source(&self, topics: &TopicHolder) -> Result {
-        let chain = build_chain_from_config(self.name.clone(), &self.chain, &topics).await?;
+        let chain = build_chain_from_config(self.name.clone(), &self.chain, topics).await?;
 
         Ok(Transforms::PoolConnections(ConnectionBalanceAndPool {
             name: "PoolConnections",
diff --git a/shotover-proxy/src/transforms/mpsc.rs b/shotover-proxy/src/transforms/mpsc.rs
index 667391203..304f40966 100644
--- a/shotover-proxy/src/transforms/mpsc.rs
+++ b/shotover-proxy/src/transforms/mpsc.rs
@@ -53,7 +53,7 @@ impl Clone for Buffer {
 #[async_trait]
 impl TransformsFromConfig for BufferConfig {
     async fn get_source(&self, topics: &TopicHolder) -> Result {
-        let chain = build_chain_from_config("forward".to_string(), &self.chain, &topics).await?;
+        let chain = build_chain_from_config("forward".to_string(), &self.chain, topics).await?;
         let buffer = self.buffer_size.unwrap_or(5);
         Ok(Transforms::MPSCForwarder(Buffer {
             name: "forward",
diff --git a/shotover-proxy/src/transforms/parallel_map.rs b/shotover-proxy/src/transforms/parallel_map.rs
index bb8e003aa..0b4ee3f1a 100644
--- a/shotover-proxy/src/transforms/parallel_map.rs
+++ b/shotover-proxy/src/transforms/parallel_map.rs
@@ -73,7 +73,7 @@ pub struct ParallelMapConfig {
 #[async_trait]
 impl TransformsFromConfig for ParallelMapConfig {
     async fn get_source(&self, topics: &TopicHolder) -> Result {
-        let chain = build_chain_from_config(self.name.clone(), &self.chain, &topics).await?;
+        let chain = build_chain_from_config(self.name.clone(), &self.chain, topics).await?;
 
         Ok(Transforms::ParallelMap(ParallelMap {
             name: "SequentialMap",
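The recurring `&topics` to `topics` change in these config transforms is `clippy::needless_borrow`: `topics` is already a reference, so taking another `&` only creates a `&&T` that is immediately auto-dereferenced. A distilled sketch of the same call-site fix (this `TopicHolder` is a stand-in, not shotover's real struct):

```rust
struct TopicHolder {
    topics: Vec<String>,
}

fn count_topics(holder: &TopicHolder) -> usize {
    holder.topics.len()
}

fn configure(holder: &TopicHolder) -> usize {
    // `holder` is already `&TopicHolder`; writing `count_topics(&holder)`
    // would pass a `&&TopicHolder` that rustc auto-derefs, which is the
    // needless borrow removed throughout this patch.
    count_topics(holder)
}

fn main() {
    let holder = TopicHolder {
        topics: vec!["a".into(), "b".into()],
    };
    assert_eq!(configure(&holder), 2);
}
```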
diff --git a/shotover-proxy/src/transforms/redis_transforms/redis_cache.rs b/shotover-proxy/src/transforms/redis_transforms/redis_cache.rs
index 8331ce8df..e211e6aaf 100644
--- a/shotover-proxy/src/transforms/redis_transforms/redis_cache.rs
+++ b/shotover-proxy/src/transforms/redis_transforms/redis_cache.rs
@@ -50,7 +50,7 @@ impl TransformsFromConfig for RedisConfig {
     async fn get_source(&self, topics: &TopicHolder) -> Result {
         Ok(Transforms::RedisCache(SimpleRedisCache {
             name: "SimpleRedisCache",
-            cache_chain: build_chain_from_config("cache_chain".to_string(), &self.chain, &topics)
+            cache_chain: build_chain_from_config("cache_chain".to_string(), &self.chain, topics)
                 .await?,
             caching_schema: self.caching_schema.clone(),
         }))
@@ -83,12 +83,12 @@ impl SimpleRedisCache {
         let table = self
             .caching_schema
             .get(&table_lookup)
-            .ok_or(anyhow!("not a caching table"))?;
+            .ok_or_else(|| anyhow!("not a caching table"))?;
 
         let ast_ref = qm
             .ast
             .as_ref()
-            .ok_or(anyhow!("No AST to convert query to cache query"))?
+            .ok_or_else(|| anyhow!("No AST to convert query to cache query"))?
             .clone();
 
         qm.ast.replace(build_redis_ast_from_sql(
@@ -141,19 +141,19 @@ fn append_seperator(command_builder: &mut Vec<u8>) {
     // TODO this is super fragile and depends on hidden array values to signal whether we should build the query a certain way
     if min_size == 1 {
-        if *prev_char == '-' as u8 {
-            *prev_char = '[' as u8
-        } else if *prev_char == '+' as u8 {
-            *prev_char = ']' as u8
+        if *prev_char == b'-' {
+            *prev_char = b'['
+        } else if *prev_char == b'+' {
+            *prev_char = b']'
         }
     } else {
-        command_builder.push(':' as u8);
+        command_builder.push(b':');
    }
 }
 
 fn build_redis_commands(
     expr: &Expr,
-    pks: &Vec<String>,
+    pks: &[String],
     min: &mut Vec<u8>,
     max: &mut Vec<u8>,
 ) -> Result<()> {
@@ -162,7 +162,7 @@
             // first check if this is a related to PK
             if let Expr::Identifier(i) = left.borrow() {
                 let id_string = i.to_string();
-                if pks.iter().find(|&v| v == &id_string).is_some() {
+                if pks.iter().any(|v| v == &id_string) {
                     //Ignore this as we build the pk constraint elsewhere
                     return Ok(());
                 }
@@ -261,10 +261,8 @@ fn build_redis_ast_from_sql(
         SetExpr::Select(ref mut s) if s.selection.is_some() => {
             let expr = s.selection.as_mut().unwrap();
             let mut commands_buffer: Vec = Vec::new();
-            let mut min: Vec<u8> = Vec::new();
-            min.push('-' as u8);
-            let mut max: Vec<u8> = Vec::new();
-            max.push('+' as u8);
+            let mut min: Vec<u8> = vec![b'-'];
+            let mut max: Vec<u8> = vec![b'+'];
 
             build_redis_commands(expr, &pk_schema.partition_key, &mut min, &mut max)?;
 
@@ -285,9 +283,8 @@ fn build_redis_ast_from_sql(
             _ => Err(anyhow!("Couldn't build query")),
         },
         Statement::Insert { .. } | Statement::Update { .. } => {
-            let mut commands_buffer: Vec<ShotoverValue> = Vec::new();
-
-            commands_buffer.push(ShotoverValue::Bytes("ZADD".into()));
+            let mut commands_buffer: Vec<ShotoverValue> =
+                vec![ShotoverValue::Bytes("ZADD".into())];
 
             let pk = pk_schema
                 .partition_key
@@ -310,7 +307,7 @@ fn build_redis_ast_from_sql(
 
             let values = query_values
                 .as_ref()
-                .ok_or(anyhow!("Couldn't build query"))?
+                .ok_or_else(|| anyhow!("Couldn't build query"))?
                 .iter()
                 .filter_map(|(p, v)| {
                     if !pk_schema.partition_key.contains(p) && !pk_schema.range_key.contains(p)
@@ -325,8 +322,8 @@ fn build_redis_ast_from_sql(
             for v in values {
                 commands_buffer.push(ShotoverValue::Bytes(Bytes::from("0")));
                 let mut value = clustering.clone();
-                if value.len() != 0 {
-                    value.put_u8(':' as u8);
+                if !value.is_empty() {
+                    value.put_u8(b':');
                 }
                 value.extend(v.clone().into_str_bytes());
                 commands_buffer.push(ShotoverValue::Bytes(value.freeze()));
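Several hunks in `redis_cache.rs` swap `ok_or(anyhow!(...))` for `ok_or_else(|| anyhow!(...))`, because the argument of `ok_or` is built eagerly even on the `Some` path (`clippy::or_fun_call`). A stand-alone sketch of the difference, assuming the `anyhow` crate used throughout this patch:

```rust
use anyhow::{anyhow, Result};

fn lookup(table: Option<&str>) -> Result<String> {
    // Eager: ok_or(anyhow!(...)) would construct the error value on every
    // call, even when `table` is Some. Lazy: the closure only runs on None.
    table
        .map(|t| t.to_string())
        .ok_or_else(|| anyhow!("not a caching table"))
}

fn main() -> Result<()> {
    assert_eq!(lookup(Some("users"))?, "users");
    assert!(lookup(None).is_err());
    Ok(())
}
```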
diff --git a/shotover-proxy/src/transforms/redis_transforms/redis_cluster.rs b/shotover-proxy/src/transforms/redis_transforms/redis_cluster.rs
index 41107fdd1..a37500de7 100644
--- a/shotover-proxy/src/transforms/redis_transforms/redis_cluster.rs
+++ b/shotover-proxy/src/transforms/redis_transforms/redis_cluster.rs
@@ -57,7 +57,7 @@ impl TransformsFromConfig for RedisClusterConfig {
         for node in slots.masters.values() {
             match connection_pool
-                .get_connections(&node, self.connection_count.unwrap_or(1))
+                .get_connections(node, self.connection_count.unwrap_or(1))
                 .await
             {
                 Ok(conn) => {
@@ -107,7 +107,7 @@ impl RedisCluster {
     #[inline]
     async fn choose_and_send(
         &mut self,
-        host: &String,
+        host: &str,
         message: Message,
     ) -> Result> {
         let (one_tx, one_rx) = tokio::sync::oneshot::channel::();
@@ -122,8 +122,14 @@ impl RedisCluster {
             let aidx = candidates.index(0);
             let bidx = candidates.index(1);
 
-            let aload = *self.load_scores.entry((host.clone(), aidx)).or_insert(0);
-            let bload = *self.load_scores.entry((host.clone(), bidx)).or_insert(0);
+            let aload = *self
+                .load_scores
+                .entry((host.to_string(), aidx))
+                .or_insert(0);
+            let bload = *self
+                .load_scores
+                .entry((host.to_string(), bidx))
+                .or_insert(0);
 
             chans
                 .get_mut(if aload <= bload { aidx } else { bidx })
@@ -140,7 +146,7 @@ impl RedisCluster {
             {
                 if let Ok(conn) = res {
                     debug!("Found {} live connections for {}", conn.len(), host);
-                    self.channels.insert(host.clone(), conn);
+                    self.channels.insert(host.to_string(), conn);
                     self.channels
                         .get_mut(host)
                         .unwrap()
@@ -185,7 +191,7 @@ impl RedisCluster {
    async fn get_channels(&mut self, redis_frame: &RawFrame) -> Vec {
        match &redis_frame {
            RawFrame::Redis(Frame::Array(ref commands)) => {
-                match RoutingInfo::for_command_frame(&commands) {
+                match RoutingInfo::for_command_frame(commands) {
                    Some(RoutingInfo::Slot(slot)) => {
                        if let Some((_, lookup)) = self.slots.masters.range(&slot..).next() {
                            // let idx = self.choose(lookup);
@@ -315,7 +321,7 @@ impl RoutingInfo {
     #[inline(always)]
     pub fn for_key(key: &Frame) -> Option {
         if let Frame::BulkString(key) = key {
-            let key = get_hashtag(&key).unwrap_or(&key);
+            let key = get_hashtag(key).unwrap_or(key);
             Some(RoutingInfo::Slot(
                 crc16::State::::calculate(key) % SLOT_SIZE as u16,
             ))
diff --git a/shotover-proxy/src/transforms/util/cluster_connection_pool.rs b/shotover-proxy/src/transforms/util/cluster_connection_pool.rs
index 019ebb2e8..5c50d6431 100644
--- a/shotover-proxy/src/transforms/util/cluster_connection_pool.rs
+++ b/shotover-proxy/src/transforms/util/cluster_connection_pool.rs
@@ -1,5 +1,4 @@
 use std::collections::{HashMap, HashSet};
-use std::iter::FromIterator;
 use std::sync::Arc;
 
 use anyhow::{anyhow, Result};
@@ -34,7 +33,7 @@ pub struct ConnectionPool {
 impl ConnectionPool {
     pub fn new(hosts: Vec, codec: C) -> Self {
         ConnectionPool {
-            host_set: Arc::new(Mutex::new(HashSet::from_iter(hosts.into_iter()))),
+            host_set: Arc::new(Mutex::new(hosts.into_iter().collect())),
             queue_map: Arc::new(Mutex::new(HashMap::new())),
             codec,
             auth_func: |_, _| Ok(()),
@@ -47,7 +46,7 @@ impl ConnectionPool {
         auth_func: fn(&ConnectionPool, &mut UnboundedSender) -> Result<()>,
     ) -> Self {
         ConnectionPool {
-            host_set: Arc::new(Mutex::new(HashSet::from_iter(hosts.into_iter()))),
+            host_set: Arc::new(Mutex::new(hosts.into_iter().collect())),
             queue_map: Arc::new(Mutex::new(HashMap::new())),
             codec,
             auth_func,
@@ -59,23 +58,23 @@ impl ConnectionPool {
     /// updating the connection map. Errors are returned when a connection can't be established.
     pub async fn get_connections(
         &self,
-        host: &String,
+        host: &str,
         connection_count: i32,
     ) -> Result>> {
         let mut queue_map = self.queue_map.lock().await;
         if let Some(x) = queue_map.get(host) {
             if x.iter().all(|x| !x.is_closed()) {
-                return Ok(x.clone());
+                return Ok(x.to_vec());
             }
         }
-        let connections = self.new_connections(&host, connection_count).await?;
-        queue_map.insert(host.clone(), connections.clone());
+        let connections = self.new_connections(host, connection_count).await?;
+        queue_map.insert(host.to_string(), connections.clone());
         Ok(connections)
     }
 
     pub async fn new_connections(
         &self,
-        host: &String,
+        host: &str,
         connection_count: i32,
     ) -> Result>>
     where
@@ -87,7 +86,7 @@ impl ConnectionPool {
             let stream = TcpStream::connect(host).await?;
 
             let mut out_tx = spawn_from_stream(&self.codec, stream);
-            match (self.auth_func)(&self, &mut out_tx) {
+            match (self.auth_func)(self, &mut out_tx) {
                 Ok(_) => {
                     connections.push(out_tx);
                 }
@@ -97,7 +96,7 @@ impl ConnectionPool {
             }
         }
 
-        if connections.len() == 0 {
+        if connections.is_empty() {
             Err(anyhow!("Couldn't connect to upstream TCP service"))
         } else {
             Ok(connections)
diff --git a/shotover-proxy/src/transforms/util/unordered_cluster_connection_pool.rs b/shotover-proxy/src/transforms/util/unordered_cluster_connection_pool.rs
index e77aab8cb..f87423486 100644
--- a/shotover-proxy/src/transforms/util/unordered_cluster_connection_pool.rs
+++ b/shotover-proxy/src/transforms/util/unordered_cluster_connection_pool.rs
@@ -73,7 +73,7 @@ impl OwnedUnorderedConnectionPool {
            tokio::spawn(tx_process(write, out_rx, return_tx, self.codec.clone()));
            tokio::spawn(rx_process(read, return_rx, self.codec.clone()));
 
-            match (self.auth_func)(&self, &mut out_tx) {
+            match (self.auth_func)(self, &mut out_tx) {
                Ok(_) => {
                    connection_pool.push(out_tx);
                }
@@ -83,7 +83,7 @@ impl OwnedUnorderedConnectionPool {
            }
        }
 
-        if connection_pool.len() == 0 {
+        if connection_pool.is_empty() {
            Err(anyhow!("Couldn't connect to upstream TCP service"))
        } else {
            self.connections = connection_pool;
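The connection-pool and cluster changes above replace `&String` parameters with `&str` (and `&Vec<T>` with `&[T]` elsewhere in the patch), per `clippy::ptr_arg`, so callers are no longer forced to hold an owned `String` just to borrow it. A minimal sketch of the same signature change, with an illustrative pool type rather than shotover's:

```rust
use std::collections::HashMap;

// Accepting &str instead of &String lets callers pass string literals,
// owned Strings, or slices without an extra allocation or double reference.
fn get_connections(pool: &HashMap<String, Vec<u32>>, host: &str) -> Vec<u32> {
    pool.get(host).map(|conns| conns.to_vec()).unwrap_or_default()
}

fn main() {
    let mut pool = HashMap::new();
    pool.insert("127.0.0.1:6379".to_string(), vec![1, 2, 3]);

    // Works with a literal...
    assert_eq!(get_connections(&pool, "127.0.0.1:6379"), vec![1, 2, 3]);
    // ...and with an owned String borrowed as &str.
    let host = String::from("127.0.0.1:6379");
    assert_eq!(get_connections(&pool, &host), vec![1, 2, 3]);
}
```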