Skip to content

Commit

Permalink
Clippy fixes (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros authored Sep 6, 2021
1 parent 3d713b4 commit 6e6988d
Show file tree
Hide file tree
Showing 21 changed files with 137 additions and 142 deletions.
28 changes: 14 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion shotover-proxy/src/config/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl Topology {
for (key, value) in self.chain_config.clone() {
temp.insert(
key.clone(),
build_chain_from_config(key, &value, &topics).await?,
build_chain_from_config(key, &value, topics).await?,
);
}
Ok(temp)
Expand Down
12 changes: 6 additions & 6 deletions shotover-proxy/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,9 @@ impl From<&Frame> for Value {
}
}

impl Into<Frame> for Value {
fn into(self) -> Frame {
match self {
impl From<Value> for Frame {
fn from(value: Value) -> Frame {
match value {
Value::NULL => Frame::Null,
Value::None => unimplemented!(),
Value::Bytes(b) => Frame::BulkString(b),
Expand Down Expand Up @@ -545,9 +545,9 @@ impl Value {
}
}

impl Into<cassandra_proto::types::value::Bytes> for Value {
fn into(self) -> cassandra_proto::types::value::Bytes {
match self {
impl From<Value> for cassandra_proto::types::value::Bytes {
fn from(value: Value) -> cassandra_proto::types::value::Bytes {
match value {
Value::NULL => (-1).into(),
Value::None => cassandra_proto::types::value::Bytes::new(vec![]),
Value::Bytes(b) => cassandra_proto::types::value::Bytes::new(b.to_vec()),
Expand Down
71 changes: 34 additions & 37 deletions shotover-proxy/src/protocols/cassandra_protocol2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,50 +562,47 @@ impl CassandraCodec2 {

match frame.opcode {
Opcode::Query => {
if let Ok(body) = frame.get_body() {
if let ResponseBody::Query(brq) = body {
let parsed_string = CassandraCodec2::parse_query_string(
brq.query.clone().into_plain(),
&self.pk_col_map,
);
if parsed_string.ast.is_none() {
// TODO: Currently this will probably catch schema changes that don't match
// what the SQL parser expects
return Messages::new_single_bypass(RawFrame::Cassandra(frame));
}
return Messages::new_single_query(
QueryMessage {
query_string: brq.query.into_plain(),
namespace: parsed_string.namespace.unwrap(),
primary_key: parsed_string.primary_key,
query_values: parsed_string.colmap,
projection: parsed_string.projection,
query_type: QueryType::Read,
ast: parsed_string.ast.map(ASTHolder::SQL),
},
false,
RawFrame::Cassandra(frame),
);
if let Ok(ResponseBody::Query(body)) = frame.get_body() {
let parsed_string = CassandraCodec2::parse_query_string(
body.query.clone().into_plain(),
&self.pk_col_map,
);
if parsed_string.ast.is_none() {
// TODO: Currently this will probably catch schema changes that don't match
// what the SQL parser expects
return Messages::new_single_bypass(RawFrame::Cassandra(frame));
}
return Messages::new_single_query(
QueryMessage {
query_string: body.query.into_plain(),
namespace: parsed_string.namespace.unwrap(),
primary_key: parsed_string.primary_key,
query_values: parsed_string.colmap,
projection: parsed_string.projection,
query_type: QueryType::Read,
ast: parsed_string.ast.map(ASTHolder::SQL),
},
false,
RawFrame::Cassandra(frame),
);
}
Messages::new_single_bypass(RawFrame::Cassandra(frame))
}
Opcode::Result => CassandraCodec2::build_response_message(frame, None),
Opcode::Error => {
if let Ok(body) = frame.get_body() {
if let ResponseBody::Error(e) = body {
return Messages::new_single_response(
QueryResponse {
matching_query: None,
result: None,
error: Some(Value::Strings(e.message.as_plain())),
response_meta: None,
},
false,
RawFrame::Cassandra(frame),
);
}
if let Ok(ResponseBody::Error(body)) = frame.get_body() {
return Messages::new_single_response(
QueryResponse {
matching_query: None,
result: None,
error: Some(Value::Strings(body.message.as_plain())),
response_meta: None,
},
false,
RawFrame::Cassandra(frame),
);
}

Messages::new_single_bypass(RawFrame::Cassandra(frame))
}
_ => Messages::new_single_bypass(RawFrame::Cassandra(frame)),
Expand Down
6 changes: 2 additions & 4 deletions shotover-proxy/src/protocols/redis_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,10 +595,8 @@ impl RedisCodec {
if let Some(result) = &resp.result {
return result.clone().into();
}
if let Some(result) = &resp.error {
if let Value::Strings(s) = result {
return Frame::Error(s.clone());
}
if let Some(Value::Strings(s)) = &resp.error {
return Frame::Error(s.clone());
}

debug!("{:?}", resp);
Expand Down
6 changes: 4 additions & 2 deletions shotover-proxy/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,13 @@ impl Runner {
}
}

type TracingStateHandle =
Handle<EnvFilter, Layered<Layer<Registry, DefaultFields, Format, NonBlocking>, Registry>>;

struct TracingState {
/// Once this is dropped tracing logs are ignored
guard: WorkerGuard,
handle:
Handle<EnvFilter, Layered<Layer<Registry, DefaultFields, Format, NonBlocking>, Registry>>,
handle: TracingStateHandle,
}

/// Returns a new `EnvFilter` by parsing each directive string, or an error if any directive is invalid.
Expand Down
8 changes: 4 additions & 4 deletions shotover-proxy/src/runtimes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ pub trait ScriptDefinition<A, R> {
type Args;
type Return;

fn call<'de>(&self, lua: &'de Lua, args: Self::Args) -> Result<Self::Return>
fn call(&self, lua: &Lua, args: Self::Args) -> Result<Self::Return>
where
A: serde::Serialize + Clone,
R: serde::de::DeserializeOwned + Clone;
Expand Down Expand Up @@ -176,16 +176,16 @@ impl<A, R> ScriptDefinition<A, R> for ScriptHolder<A, R> {
let func: Func<(i32, u32), i32> =
wasm_inst.exports.get(function_name.as_str())?;
let start = func
.call(5 as i32, len as u32)
.call(5_i32, len as u32)
.map_err(|e| anyhow!("wasm error: {}", e))?;
let new_view = mmemory.view::<u8>();
let mut new_len_bytes = [0u8; 4];

for i in 0..4 {
for (i, item) in new_len_bytes.iter_mut().enumerate() {
// attempt to get i+1 from the memory view (1,2,3,4)
// If we can, return the value it contains, otherwise
// default back to 0
new_len_bytes[i] = new_view.get(i + 1).map(|c| c.get()).unwrap_or(0);
*item = new_view.get(i + 1).map(|c| c.get()).unwrap_or(0);
}

let new_len = u32::from_ne_bytes(new_len_bytes) as usize;
Expand Down
1 change: 1 addition & 0 deletions shotover-proxy/src/sources/cassandra_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub struct CassandraSource {
}

impl CassandraSource {
#![allow(clippy::too_many_arguments)]
pub async fn new(
chain: &TransformChain,
listen_addr: String,
Expand Down
5 changes: 2 additions & 3 deletions shotover-proxy/src/sources/mpsc_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl SourcesFromConfig for AsyncMpscConfig {
&self.topic_name,
Shutdown::new(trigger_shutdown_on_drop_rx),
shutdown_complete_tx,
behavior.clone(),
behavior,
))])
} else {
Err(anyhow!(
Expand Down Expand Up @@ -71,8 +71,7 @@ impl AsyncMpsc {
max_behavior: CoalesceBehavior,
) -> AsyncMpsc {
info!("Starting MPSC source for the topic [{}] ", name);
let mut main_chain = chain.clone();
let max_behavior = max_behavior.clone();
let mut main_chain = chain;
let mut buffer: Vec<Message> = Vec::new();

let jh = Handle::current().spawn(async move {
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/sources/redis_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl RedisSource {
RedisSource {
name,
join_handle,
listen_addr: listen_addr.clone(),
listen_addr,
}
}
}
17 changes: 9 additions & 8 deletions shotover-proxy/src/transforms/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ use futures::TryFutureExt;

use itertools::Itertools;
use metrics::{counter, histogram};
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot::Receiver as OneReceiver;
use tokio::sync::Mutex;
use tokio::time::Duration;
use tokio::time::Instant;
use tracing::{debug, trace, warn};
Expand All @@ -35,7 +33,7 @@ impl Clone for TransformChain {
pub struct BufferedChain {
send_handle: Sender<ChannelMessage>,
#[cfg(test)]
pub count: Arc<Mutex<usize>>,
pub count: std::sync::Arc<tokio::sync::Mutex<usize>>,
}

impl BufferedChain {
Expand Down Expand Up @@ -109,9 +107,10 @@ impl TransformChain {
pub fn into_buffered_chain(self, buffer_size: usize) -> BufferedChain {
let (tx, mut rx) = tokio::sync::mpsc::channel::<ChannelMessage>(buffer_size);

// If this is not a test, this should get removed by the compiler
let count_outer: Arc<Mutex<usize>> = Arc::new(Mutex::new(0_usize));
let count = count_outer.clone();
#[cfg(test)]
let count = std::sync::Arc::new(tokio::sync::Mutex::new(0_usize));
#[cfg(test)]
let count_clone = count.clone();

// Even though we don't keep the join handle, this thread will wrap up once all corresponding senders have been dropped.
let _jh = tokio::spawn(async move {
Expand All @@ -123,7 +122,9 @@ impl TransformChain {
}) = rx.recv().await
{
let name = chain.name.clone();
if cfg!(test) {

#[cfg(test)]
{
let mut count = count.lock().await;
*count += 1;
}
Expand Down Expand Up @@ -158,7 +159,7 @@ impl TransformChain {
BufferedChain {
send_handle: tx,
#[cfg(test)]
count: count_outer,
count: count_clone,
}
}

Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/transforms/distributed/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl TransformsFromConfig for RouteConfig {
for (key, value) in self.route_map.clone() {
temp.insert(
key.clone(),
build_chain_from_config(key, &value, &topics).await?,
build_chain_from_config(key, &value, topics).await?,
);
}
Ok(Transforms::Route(Route {
Expand Down
Loading

0 comments on commit 6e6988d

Please sign in to comment.