Skip to content

Commit

Permalink
Add 'easy' feature and easy encoders to easily work with async code w…
Browse files Browse the repository at this point in the history
…ithout having to worry about mutability, while still using the cache.
  • Loading branch information
gklijs committed Jul 10, 2021
1 parent e3ec5ef commit 17432bc
Show file tree
Hide file tree
Showing 9 changed files with 476 additions and 3 deletions.
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ blocking = ["reqwest/blocking"]
json = ["url", "valico"]
proto_decoder = ["bytes", "integer-encoding", "logos", "protofish"]
proto_raw = ["integer-encoding", "logos"]
easy = ["tokio"]
kafka_test = []
default = ["futures","native_tls"]

Expand Down Expand Up @@ -70,6 +71,11 @@ optional = true
version = "^2"
optional = true

[dependencies.tokio]
version = "^1.2.0"
features = ["macros"]
optional = true

[dependencies.valico]
version = "^3.5"
optional = true
Expand All @@ -79,7 +85,6 @@ mockito = "^0.29.0"
rdkafka = { version = "^0.25.0", features = ["cmake-build"] }
rand = "^0.8.3"
test_utils = {path = "test_utils"}
tokio = { version = "^1.2.0", features = ["macros"] }

[package.metadata.docs.rs]
all-features = true
Expand Down
14 changes: 13 additions & 1 deletion src/async_impl/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,19 @@ mod tests {
.await
.unwrap();

assert_eq!(bytes, vec![0, 0, 0, 0, 3, 6])
assert_eq!(bytes, vec![0, 0, 0, 0, 3, 6]);

let value_strategy =
SubjectNameStrategy::TopicNameStrategy(String::from("heartbeat"), true);
let bytes = encoder
.encode(
vec![("name", Value::String(String::from("x")))],
value_strategy,
)
.await
.unwrap();

assert_eq!(bytes, vec![0, 0, 0, 0, 4, 2, 120])
}

#[tokio::test]
Expand Down
142 changes: 142 additions & 0 deletions src/async_impl/easy_avro.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
use crate::async_impl::avro::{AvroDecoder, AvroEncoder};
use crate::async_impl::schema_registry::SrSettings;
use crate::avro_common::DecodeResult;
use crate::error::SRCError;
use crate::schema_registry_common::SubjectNameStrategy;
use avro_rs::types::Value;
use serde::Serialize;
use std::sync::Arc;
use tokio::sync::Mutex;

/// A decoder used to transform bytes to a [DecodeResult], its much like [AvroDecoder] but includes a mutex, so the user does not need to care about mutability.
pub struct EasyAvroDecoder {
decoder: Arc<Mutex<AvroDecoder<'static>>>,
}

impl EasyAvroDecoder {
pub fn new(sr_settings: SrSettings) -> EasyAvroDecoder {
let decoder = Arc::new(Mutex::new(AvroDecoder::new(sr_settings)));
EasyAvroDecoder { decoder }
}
pub async fn decode(&self, bytes: Option<&[u8]>) -> Result<DecodeResult, SRCError> {
let mut lock = self.decoder.lock().await;
lock.decode(bytes).await
}
}

/// An encoder used to transform a [Value] to bytes, its much like [AvroEncoder] but includes a mutex, so the user does not need to care about mutability.
pub struct EasyAvroEncoder {
encoder: Arc<Mutex<AvroEncoder<'static>>>,
}

impl EasyAvroEncoder {
pub fn new(sr_settings: SrSettings) -> EasyAvroEncoder {
let encoder = Arc::new(Mutex::new(AvroEncoder::new(sr_settings)));
EasyAvroEncoder { encoder }
}
pub async fn encode(
&self,
values: Vec<(&'static str, Value)>,
subject_name_strategy: SubjectNameStrategy,
) -> Result<Vec<u8>, SRCError> {
let mut lock = self.encoder.lock().await;
lock.encode(values, subject_name_strategy).await
}
pub async fn encode_struct(
&self,
item: impl Serialize,
subject_name_strategy: &SubjectNameStrategy,
) -> Result<Vec<u8>, SRCError> {
let mut lock = self.encoder.lock().await;
lock.encode_struct(item, subject_name_strategy).await
}
}

#[cfg(test)]
mod tests {
use crate::async_impl::easy_avro::{EasyAvroDecoder, EasyAvroEncoder};
use crate::async_impl::schema_registry::SrSettings;
use crate::avro_common::get_supplied_schema;
use crate::schema_registry_common::SubjectNameStrategy;
use avro_rs::types::Value;
use avro_rs::{from_value, Schema};
use mockito::{mock, server_address};
use test_utils::Heartbeat;

#[tokio::test]
async fn test_decoder_default() {
let _m = mock("GET", "/schemas/ids/1?deleted=true")
.with_status(200)
.with_header("content-type", "application/vnd.schemaregistry.v1+json")
.with_body(r#"{"schema":"{\"type\":\"record\",\"name\":\"Heartbeat\",\"namespace\":\"nl.openweb.data\",\"fields\":[{\"name\":\"beat\",\"type\":\"long\"}]}"}"#)
.create();

let sr_settings = SrSettings::new(format!("http://{}", server_address()));
let decoder = EasyAvroDecoder::new(sr_settings);
let heartbeat = decoder
.decode(Some(&[0, 0, 0, 0, 1, 6]))
.await
.unwrap()
.value;

assert_eq!(
heartbeat,
Value::Record(vec![("beat".to_string(), Value::Long(3))])
);

let item = match from_value::<Heartbeat>(&heartbeat) {
Ok(h) => h,
Err(_) => unreachable!(),
};
assert_eq!(item.beat, 3i64);
}

#[tokio::test]
async fn test_encode_value() {
let _m = mock("GET", "/subjects/heartbeat-value/versions/latest")
.with_status(200)
.with_header("content-type", "application/vnd.schemaregistry.v1+json")
.with_body(r#"{"subject":"heartbeat-value","version":1,"id":3,"schema":"{\"type\":\"record\",\"name\":\"Heartbeat\",\"namespace\":\"nl.openweb.data\",\"fields\":[{\"name\":\"beat\",\"type\":\"long\"}]}"}"#)
.create();

let sr_settings = SrSettings::new(format!("http://{}", server_address()));
let encoder = EasyAvroEncoder::new(sr_settings);

let value_strategy =
SubjectNameStrategy::TopicNameStrategy(String::from("heartbeat"), false);
let bytes = encoder
.encode(vec![("beat", Value::Long(3))], value_strategy)
.await
.unwrap();

assert_eq!(bytes, vec![0, 0, 0, 0, 3, 6])
}

#[tokio::test]
async fn test_primitive_schema() {
let sr_settings = SrSettings::new(format!("http://{}", server_address()));
let encoder = EasyAvroEncoder::new(sr_settings);

let _n = mock("POST", "/subjects/heartbeat-key/versions")
.with_status(200)
.with_header("content-type", "application/vnd.schemaregistry.v1+json")
.with_body(r#"{"id":4}"#)
.create();

let primitive_schema_strategy = SubjectNameStrategy::TopicNameStrategyWithSchema(
String::from("heartbeat"),
true,
get_supplied_schema(&Schema::String),
);
let bytes = encoder
.encode_struct("key-value", &primitive_schema_strategy)
.await;

assert_eq!(
bytes,
Ok(vec![
0, 0, 0, 0, 4, 18, 107, 101, 121, 45, 118, 97, 108, 117, 101
])
);
}
}
119 changes: 119 additions & 0 deletions src/async_impl/easy_json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use crate::async_impl::json::{DecodeResult, JsonDecoder, JsonEncoder};
use crate::async_impl::schema_registry::SrSettings;
use crate::error::SRCError;
use crate::schema_registry_common::SubjectNameStrategy;
use serde_json::Value;
use std::sync::Arc;
use tokio::sync::Mutex;

/// A decoder used to transform bytes to a [DecodeResult], its much like [JsonDecoder] but includes a mutex, so the user does not need to care about mutability.
pub struct EasyJsonDecoder {
decoder: Arc<Mutex<JsonDecoder<'static>>>,
}

impl EasyJsonDecoder {
pub fn new(sr_settings: SrSettings) -> EasyJsonDecoder {
let decoder = Arc::new(Mutex::new(JsonDecoder::new(sr_settings)));
EasyJsonDecoder { decoder }
}
pub async fn decode(&self, bytes: Option<&[u8]>) -> Result<Option<DecodeResult>, SRCError> {
let mut lock = self.decoder.lock().await;
lock.decode(bytes).await
}
}

/// An encoder used to transform a [Value] to bytes, its much like [JsonEncoder] but includes a mutex, so the user does not need to care about mutability.
pub struct EasyJsonEncoder {
encoder: Arc<Mutex<JsonEncoder<'static>>>,
}

impl EasyJsonEncoder {
pub fn new(sr_settings: SrSettings) -> EasyJsonEncoder {
let encoder = Arc::new(Mutex::new(JsonEncoder::new(sr_settings)));
EasyJsonEncoder { encoder }
}
pub async fn encode(
&self,
value: &Value,
subject_name_strategy: SubjectNameStrategy,
) -> Result<Vec<u8>, SRCError> {
let mut lock = self.encoder.lock().await;
lock.encode(value, subject_name_strategy).await
}
}

#[cfg(test)]
mod tests {
use crate::async_impl::easy_json::{EasyJsonDecoder, EasyJsonEncoder};
use crate::async_impl::json::validate;
use crate::async_impl::schema_registry::SrSettings;
use crate::schema_registry_common::{get_payload, SubjectNameStrategy};
use mockito::{mock, server_address};
use serde_json::Value;
use std::fs::{read_to_string, File};
use test_utils::{get_json_body, json_result_java_bytes, json_result_schema};

#[tokio::test]
async fn test_decoder_default() {
let result_value: String = read_to_string("tests/schema/result-example.json")
.unwrap()
.parse()
.unwrap();
let _m = mock("GET", "/schemas/ids/7?deleted=true")
.with_status(200)
.with_header("content-type", "application/vnd.schemaregistry.v1+json")
.with_body(&get_json_body(json_result_schema(), 7))
.create();

let sr_settings = SrSettings::new(format!("http://{}", server_address()));
let decoder = EasyJsonDecoder::new(sr_settings);
let message = decoder
.decode(Some(&*get_payload(7, result_value.into_bytes())))
.await
.unwrap()
.unwrap();
validate(message.schema, &message.value).unwrap();
assert_eq!(
"string",
message
.value
.as_object()
.unwrap()
.get("down")
.unwrap()
.as_str()
.unwrap()
);
assert_eq!(
"STRING",
message
.value
.as_object()
.unwrap()
.get("up")
.unwrap()
.as_str()
.unwrap()
)
}

#[tokio::test]
async fn test_encode_default() {
let _m = mock("GET", "/subjects/testresult-value/versions/latest")
.with_status(200)
.with_header("content-type", "application/vnd.schemaregistry.v1+json")
.with_body(&get_json_body(json_result_schema(), 10))
.create();

let sr_settings = SrSettings::new(format!("http://{}", server_address()));
let encoder = EasyJsonEncoder::new(sr_settings);
let strategy = SubjectNameStrategy::TopicNameStrategy(String::from("testresult"), false);
let result_example: Value =
serde_json::from_reader(File::open("tests/schema/result-example.json").unwrap())
.unwrap();

let encoded_data = encoder.encode(&result_example, strategy).await.unwrap();

assert_eq!(encoded_data, json_result_java_bytes())
}
}
53 changes: 53 additions & 0 deletions src/async_impl/easy_proto_decoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use crate::async_impl::proto_decoder::ProtoDecoder;
use crate::async_impl::schema_registry::SrSettings;
use crate::error::SRCError;
use protofish::Value;
use std::sync::Arc;
use tokio::sync::Mutex;

/// A decoder used to transform bytes to a [Value], its much like [ProtoDecoder] but includes a mutex, so the user does not need to care about mutability.
/// The mean use of this way of decoding is if you don't know the format at compile time.
/// If you do know the format it's better to use the ProtoRawDecoder, and use a different library to deserialize just the proto bytes.
pub struct EasyProtoDecoder {
decoder: Arc<Mutex<ProtoDecoder<'static>>>,
}

impl EasyProtoDecoder {
pub fn new(sr_settings: SrSettings) -> EasyProtoDecoder {
let decoder = Arc::new(Mutex::new(ProtoDecoder::new(sr_settings)));
EasyProtoDecoder { decoder }
}
pub async fn decode(&self, bytes: Option<&[u8]>) -> Result<Value, SRCError> {
let mut lock = self.decoder.lock().await;
lock.decode(bytes).await
}
}

#[cfg(test)]
mod tests {
use crate::async_impl::easy_proto_decoder::EasyProtoDecoder;
use crate::async_impl::schema_registry::SrSettings;
use mockito::{mock, server_address};
use protofish::Value;
use test_utils::{get_proto_body, get_proto_hb_101, get_proto_hb_schema};

#[tokio::test]
async fn test_decoder_default() {
let _m = mock("GET", "/schemas/ids/7?deleted=true")
.with_status(200)
.with_header("content-type", "application/vnd.schemaregistry.v1+json")
.with_body(&get_proto_body(get_proto_hb_schema(), 1))
.create();

let sr_settings = SrSettings::new(format!("http://{}", server_address()));
let decoder = EasyProtoDecoder::new(sr_settings);
let heartbeat = decoder.decode(Some(get_proto_hb_101())).await.unwrap();

let message = match heartbeat {
Value::Message(x) => *x,
v => panic!("Other value: {:?} than expected Message", v),
};

assert_eq!(Value::UInt64(101u64), message.fields[0].value)
}
}
Loading

0 comments on commit 17432bc

Please sign in to comment.