From 17432bcf18c046112cbb049aa77193edcfc9bf46 Mon Sep 17 00:00:00 2001 From: Gerard Klijs Date: Tue, 6 Jul 2021 23:28:58 +0200 Subject: [PATCH] Add 'easy' feature and easy encoders to easily work with async code without having to worry about mutability, while still using the cache. --- Cargo.toml | 7 +- src/async_impl/avro.rs | 14 ++- src/async_impl/easy_avro.rs | 142 +++++++++++++++++++++++++++ src/async_impl/easy_json.rs | 119 ++++++++++++++++++++++ src/async_impl/easy_proto_decoder.rs | 53 ++++++++++ src/async_impl/easy_proto_raw.rs | 132 +++++++++++++++++++++++++ src/async_impl/mod.rs | 8 ++ src/async_impl/proto_raw.rs | 2 + test_utils/src/lib.rs | 2 +- 9 files changed, 476 insertions(+), 3 deletions(-) create mode 100644 src/async_impl/easy_avro.rs create mode 100644 src/async_impl/easy_json.rs create mode 100644 src/async_impl/easy_proto_decoder.rs create mode 100644 src/async_impl/easy_proto_raw.rs diff --git a/Cargo.toml b/Cargo.toml index 210419e..1576e6d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ blocking = ["reqwest/blocking"] json = ["url", "valico"] proto_decoder = ["bytes", "integer-encoding", "logos", "protofish"] proto_raw = ["integer-encoding", "logos"] +easy = ["tokio"] kafka_test = [] default = ["futures","native_tls"] @@ -70,6 +71,11 @@ optional = true version = "^2" optional = true +[dependencies.tokio] +version = "^1.2.0" +features = ["macros"] +optional = true + [dependencies.valico] version = "^3.5" optional = true @@ -79,7 +85,6 @@ mockito = "^0.29.0" rdkafka = { version = "^0.25.0", features = ["cmake-build"] } rand = "^0.8.3" test_utils = {path = "test_utils"} -tokio = { version = "^1.2.0", features = ["macros"] } [package.metadata.docs.rs] all-features = true diff --git a/src/async_impl/avro.rs b/src/async_impl/avro.rs index 341bb2e..2f2526c 100644 --- a/src/async_impl/avro.rs +++ b/src/async_impl/avro.rs @@ -936,7 +936,19 @@ mod tests { .await .unwrap(); - assert_eq!(bytes, vec![0, 0, 0, 0, 3, 6]) + assert_eq!(bytes, 
vec![0, 0, 0, 0, 3, 6]); + + let value_strategy = + SubjectNameStrategy::TopicNameStrategy(String::from("heartbeat"), true); + let bytes = encoder + .encode( + vec![("name", Value::String(String::from("x")))], + value_strategy, + ) + .await + .unwrap(); + + assert_eq!(bytes, vec![0, 0, 0, 0, 4, 2, 120]) } #[tokio::test] diff --git a/src/async_impl/easy_avro.rs b/src/async_impl/easy_avro.rs new file mode 100644 index 0000000..a8abef4 --- /dev/null +++ b/src/async_impl/easy_avro.rs @@ -0,0 +1,142 @@ +use crate::async_impl::avro::{AvroDecoder, AvroEncoder}; +use crate::async_impl::schema_registry::SrSettings; +use crate::avro_common::DecodeResult; +use crate::error::SRCError; +use crate::schema_registry_common::SubjectNameStrategy; +use avro_rs::types::Value; +use serde::Serialize; +use std::sync::Arc; +use tokio::sync::Mutex; + +/// A decoder used to transform bytes to a [DecodeResult], it's much like [AvroDecoder] but includes a mutex, so the user does not need to care about mutability. +pub struct EasyAvroDecoder { + decoder: Arc>>, +} + +impl EasyAvroDecoder { + pub fn new(sr_settings: SrSettings) -> EasyAvroDecoder { + let decoder = Arc::new(Mutex::new(AvroDecoder::new(sr_settings))); + EasyAvroDecoder { decoder } + } + pub async fn decode(&self, bytes: Option<&[u8]>) -> Result { + let mut lock = self.decoder.lock().await; + lock.decode(bytes).await + } +} + +/// An encoder used to transform a [Value] to bytes, it's much like [AvroEncoder] but includes a mutex, so the user does not need to care about mutability. 
+pub struct EasyAvroEncoder { + encoder: Arc>>, +} + +impl EasyAvroEncoder { + pub fn new(sr_settings: SrSettings) -> EasyAvroEncoder { + let encoder = Arc::new(Mutex::new(AvroEncoder::new(sr_settings))); + EasyAvroEncoder { encoder } + } + pub async fn encode( + &self, + values: Vec<(&'static str, Value)>, + subject_name_strategy: SubjectNameStrategy, + ) -> Result, SRCError> { + let mut lock = self.encoder.lock().await; + lock.encode(values, subject_name_strategy).await + } + pub async fn encode_struct( + &self, + item: impl Serialize, + subject_name_strategy: &SubjectNameStrategy, + ) -> Result, SRCError> { + let mut lock = self.encoder.lock().await; + lock.encode_struct(item, subject_name_strategy).await + } +} + +#[cfg(test)] +mod tests { + use crate::async_impl::easy_avro::{EasyAvroDecoder, EasyAvroEncoder}; + use crate::async_impl::schema_registry::SrSettings; + use crate::avro_common::get_supplied_schema; + use crate::schema_registry_common::SubjectNameStrategy; + use avro_rs::types::Value; + use avro_rs::{from_value, Schema}; + use mockito::{mock, server_address}; + use test_utils::Heartbeat; + + #[tokio::test] + async fn test_decoder_default() { + let _m = mock("GET", "/schemas/ids/1?deleted=true") + .with_status(200) + .with_header("content-type", "application/vnd.schemaregistry.v1+json") + .with_body(r#"{"schema":"{\"type\":\"record\",\"name\":\"Heartbeat\",\"namespace\":\"nl.openweb.data\",\"fields\":[{\"name\":\"beat\",\"type\":\"long\"}]}"}"#) + .create(); + + let sr_settings = SrSettings::new(format!("http://{}", server_address())); + let decoder = EasyAvroDecoder::new(sr_settings); + let heartbeat = decoder + .decode(Some(&[0, 0, 0, 0, 1, 6])) + .await + .unwrap() + .value; + + assert_eq!( + heartbeat, + Value::Record(vec![("beat".to_string(), Value::Long(3))]) + ); + + let item = match from_value::(&heartbeat) { + Ok(h) => h, + Err(_) => unreachable!(), + }; + assert_eq!(item.beat, 3i64); + } + + #[tokio::test] + async fn test_encode_value() { + 
let _m = mock("GET", "/subjects/heartbeat-value/versions/latest") + .with_status(200) + .with_header("content-type", "application/vnd.schemaregistry.v1+json") + .with_body(r#"{"subject":"heartbeat-value","version":1,"id":3,"schema":"{\"type\":\"record\",\"name\":\"Heartbeat\",\"namespace\":\"nl.openweb.data\",\"fields\":[{\"name\":\"beat\",\"type\":\"long\"}]}"}"#) + .create(); + + let sr_settings = SrSettings::new(format!("http://{}", server_address())); + let encoder = EasyAvroEncoder::new(sr_settings); + + let value_strategy = + SubjectNameStrategy::TopicNameStrategy(String::from("heartbeat"), false); + let bytes = encoder + .encode(vec![("beat", Value::Long(3))], value_strategy) + .await + .unwrap(); + + assert_eq!(bytes, vec![0, 0, 0, 0, 3, 6]) + } + + #[tokio::test] + async fn test_primitive_schema() { + let sr_settings = SrSettings::new(format!("http://{}", server_address())); + let encoder = EasyAvroEncoder::new(sr_settings); + + let _n = mock("POST", "/subjects/heartbeat-key/versions") + .with_status(200) + .with_header("content-type", "application/vnd.schemaregistry.v1+json") + .with_body(r#"{"id":4}"#) + .create(); + + let primitive_schema_strategy = SubjectNameStrategy::TopicNameStrategyWithSchema( + String::from("heartbeat"), + true, + get_supplied_schema(&Schema::String), + ); + let bytes = encoder + .encode_struct("key-value", &primitive_schema_strategy) + .await; + + assert_eq!( + bytes, + Ok(vec![ + 0, 0, 0, 0, 4, 18, 107, 101, 121, 45, 118, 97, 108, 117, 101 + ]) + ); + } +} diff --git a/src/async_impl/easy_json.rs b/src/async_impl/easy_json.rs new file mode 100644 index 0000000..2cf6d91 --- /dev/null +++ b/src/async_impl/easy_json.rs @@ -0,0 +1,119 @@ +use crate::async_impl::json::{DecodeResult, JsonDecoder, JsonEncoder}; +use crate::async_impl::schema_registry::SrSettings; +use crate::error::SRCError; +use crate::schema_registry_common::SubjectNameStrategy; +use serde_json::Value; +use std::sync::Arc; +use tokio::sync::Mutex; + +/// A decoder 
used to transform bytes to a [DecodeResult], it's much like [JsonDecoder] but includes a mutex, so the user does not need to care about mutability. +pub struct EasyJsonDecoder { + decoder: Arc>>, +} + +impl EasyJsonDecoder { + pub fn new(sr_settings: SrSettings) -> EasyJsonDecoder { + let decoder = Arc::new(Mutex::new(JsonDecoder::new(sr_settings))); + EasyJsonDecoder { decoder } + } + pub async fn decode(&self, bytes: Option<&[u8]>) -> Result, SRCError> { + let mut lock = self.decoder.lock().await; + lock.decode(bytes).await + } +} + +/// An encoder used to transform a [Value] to bytes, it's much like [JsonEncoder] but includes a mutex, so the user does not need to care about mutability. +pub struct EasyJsonEncoder { + encoder: Arc>>, +} + +impl EasyJsonEncoder { + pub fn new(sr_settings: SrSettings) -> EasyJsonEncoder { + let encoder = Arc::new(Mutex::new(JsonEncoder::new(sr_settings))); + EasyJsonEncoder { encoder } + } + pub async fn encode( + &self, + value: &Value, + subject_name_strategy: SubjectNameStrategy, + ) -> Result, SRCError> { + let mut lock = self.encoder.lock().await; + lock.encode(value, subject_name_strategy).await + } +} + +#[cfg(test)] +mod tests { + use crate::async_impl::easy_json::{EasyJsonDecoder, EasyJsonEncoder}; + use crate::async_impl::json::validate; + use crate::async_impl::schema_registry::SrSettings; + use crate::schema_registry_common::{get_payload, SubjectNameStrategy}; + use mockito::{mock, server_address}; + use serde_json::Value; + use std::fs::{read_to_string, File}; + use test_utils::{get_json_body, json_result_java_bytes, json_result_schema}; + + #[tokio::test] + async fn test_decoder_default() { + let result_value: String = read_to_string("tests/schema/result-example.json") + .unwrap() + .parse() + .unwrap(); + let _m = mock("GET", "/schemas/ids/7?deleted=true") + .with_status(200) + .with_header("content-type", "application/vnd.schemaregistry.v1+json") + .with_body(&get_json_body(json_result_schema(), 7)) + .create(); + + 
let sr_settings = SrSettings::new(format!("http://{}", server_address())); + let decoder = EasyJsonDecoder::new(sr_settings); + let message = decoder + .decode(Some(&*get_payload(7, result_value.into_bytes()))) + .await + .unwrap() + .unwrap(); + validate(message.schema, &message.value).unwrap(); + assert_eq!( + "string", + message + .value + .as_object() + .unwrap() + .get("down") + .unwrap() + .as_str() + .unwrap() + ); + assert_eq!( + "STRING", + message + .value + .as_object() + .unwrap() + .get("up") + .unwrap() + .as_str() + .unwrap() + ) + } + + #[tokio::test] + async fn test_encode_default() { + let _m = mock("GET", "/subjects/testresult-value/versions/latest") + .with_status(200) + .with_header("content-type", "application/vnd.schemaregistry.v1+json") + .with_body(&get_json_body(json_result_schema(), 10)) + .create(); + + let sr_settings = SrSettings::new(format!("http://{}", server_address())); + let encoder = EasyJsonEncoder::new(sr_settings); + let strategy = SubjectNameStrategy::TopicNameStrategy(String::from("testresult"), false); + let result_example: Value = + serde_json::from_reader(File::open("tests/schema/result-example.json").unwrap()) + .unwrap(); + + let encoded_data = encoder.encode(&result_example, strategy).await.unwrap(); + + assert_eq!(encoded_data, json_result_java_bytes()) + } +} diff --git a/src/async_impl/easy_proto_decoder.rs b/src/async_impl/easy_proto_decoder.rs new file mode 100644 index 0000000..f5972d6 --- /dev/null +++ b/src/async_impl/easy_proto_decoder.rs @@ -0,0 +1,53 @@ +use crate::async_impl::proto_decoder::ProtoDecoder; +use crate::async_impl::schema_registry::SrSettings; +use crate::error::SRCError; +use protofish::Value; +use std::sync::Arc; +use tokio::sync::Mutex; + +/// A decoder used to transform bytes to a [Value], it's much like [ProtoDecoder] but includes a mutex, so the user does not need to care about mutability. +/// The main use of this way of decoding is when you don't know the format at compile time. 
+/// If you do know the format it's better to use the ProtoRawDecoder, and use a different library to deserialize just the proto bytes. +pub struct EasyProtoDecoder { + decoder: Arc>>, +} + +impl EasyProtoDecoder { + pub fn new(sr_settings: SrSettings) -> EasyProtoDecoder { + let decoder = Arc::new(Mutex::new(ProtoDecoder::new(sr_settings))); + EasyProtoDecoder { decoder } + } + pub async fn decode(&self, bytes: Option<&[u8]>) -> Result { + let mut lock = self.decoder.lock().await; + lock.decode(bytes).await + } +} + +#[cfg(test)] +mod tests { + use crate::async_impl::easy_proto_decoder::EasyProtoDecoder; + use crate::async_impl::schema_registry::SrSettings; + use mockito::{mock, server_address}; + use protofish::Value; + use test_utils::{get_proto_body, get_proto_hb_101, get_proto_hb_schema}; + + #[tokio::test] + async fn test_decoder_default() { + let _m = mock("GET", "/schemas/ids/7?deleted=true") + .with_status(200) + .with_header("content-type", "application/vnd.schemaregistry.v1+json") + .with_body(&get_proto_body(get_proto_hb_schema(), 1)) + .create(); + + let sr_settings = SrSettings::new(format!("http://{}", server_address())); + let decoder = EasyProtoDecoder::new(sr_settings); + let heartbeat = decoder.decode(Some(get_proto_hb_101())).await.unwrap(); + + let message = match heartbeat { + Value::Message(x) => *x, + v => panic!("Other value: {:?} than expected Message", v), + }; + + assert_eq!(Value::UInt64(101u64), message.fields[0].value) + } +} diff --git a/src/async_impl/easy_proto_raw.rs b/src/async_impl/easy_proto_raw.rs new file mode 100644 index 0000000..37652f2 --- /dev/null +++ b/src/async_impl/easy_proto_raw.rs @@ -0,0 +1,132 @@ +use crate::async_impl::proto_raw::{ProtoRawDecoder, ProtoRawEncoder, RawDecodeResult}; +use crate::async_impl::schema_registry::SrSettings; +use crate::error::SRCError; +use crate::schema_registry_common::SubjectNameStrategy; +use std::sync::Arc; +use tokio::sync::Mutex; + +/// A decoder used to transform bytes to a 
[RawDecodeResult], it's much like [ProtoRawDecoder] but includes a mutex, so the user does not need to care about mutability. +/// You can use the bytes from the result to create a proto object. +pub struct EasyProtoRawDecoder { + decoder: Arc>>, +} + +impl EasyProtoRawDecoder { + pub fn new(sr_settings: SrSettings) -> EasyProtoRawDecoder { + let decoder = Arc::new(Mutex::new(ProtoRawDecoder::new(sr_settings))); + EasyProtoRawDecoder { decoder } + } + pub async fn decode(&self, bytes: Option<&[u8]>) -> Result, SRCError> { + let mut lock = self.decoder.lock().await; + lock.decode(bytes).await + } +} + +/// An encoder used to transform the proto bytes to bytes compatible with confluent schema registry, it's much like [ProtoRawEncoder] but includes a mutex, so the user does not need to care about mutability. +/// This will just add the magic byte, schema reference, and message reference. The bytes should already be valid proto bytes for the schema used. +/// When a schema with multiple messages is used the full_name needs to be supplied to properly encode the message reference. 
+pub struct EasyProtoRawEncoder { + encoder: Arc>>, +} + +impl EasyProtoRawEncoder { + pub fn new(sr_settings: SrSettings) -> EasyProtoRawEncoder { + let encoder = Arc::new(Mutex::new(ProtoRawEncoder::new(sr_settings))); + EasyProtoRawEncoder { encoder } + } + pub async fn encode( + &self, + bytes: &[u8], + full_name: &str, + subject_name_strategy: SubjectNameStrategy, + ) -> Result, SRCError> { + let mut lock = self.encoder.lock().await; + lock.encode(bytes, full_name, subject_name_strategy).await + } + pub async fn encode_single_message( + &self, + bytes: &[u8], + subject_name_strategy: SubjectNameStrategy, + ) -> Result, SRCError> { + let mut lock = self.encoder.lock().await; + lock.encode_single_message(bytes, subject_name_strategy) + .await + } +} + +#[cfg(test)] +mod tests { + use crate::async_impl::easy_proto_raw::{EasyProtoRawDecoder, EasyProtoRawEncoder}; + use crate::async_impl::schema_registry::SrSettings; + use crate::schema_registry_common::SubjectNameStrategy; + use mockito::{mock, server_address}; + use test_utils::{ + get_proto_body, get_proto_hb_101, get_proto_hb_101_only_data, get_proto_hb_schema, + }; + + #[tokio::test] + async fn test_decoder_default() { + let _m = mock("GET", "/schemas/ids/7?deleted=true") + .with_status(200) + .with_header("content-type", "application/vnd.schemaregistry.v1+json") + .with_body(&get_proto_body(get_proto_hb_schema(), 7)) + .create(); + + let sr_settings = SrSettings::new(format!("http://{}", server_address())); + let decoder = EasyProtoRawDecoder::new(sr_settings); + let raw_result = decoder + .decode(Some(get_proto_hb_101())) + .await + .unwrap() + .unwrap(); + + assert_eq!(raw_result.bytes, get_proto_hb_101_only_data()); + assert_eq!(raw_result.full_name, "nl.openweb.data.Heartbeat") + } + + #[tokio::test] + async fn test_encode_default() { + let _m = mock("GET", "/subjects/nl.openweb.data.Heartbeat/versions/latest") + .with_status(200) + .with_header("content-type", "application/vnd.schemaregistry.v1+json") + 
.with_body(&get_proto_body(get_proto_hb_schema(), 7)) + .create(); + + let sr_settings = SrSettings::new(format!("http://{}", server_address())); + let encoder = EasyProtoRawEncoder::new(sr_settings); + let strategy = + SubjectNameStrategy::RecordNameStrategy(String::from("nl.openweb.data.Heartbeat")); + + let encoded_data = encoder + .encode( + get_proto_hb_101_only_data(), + "nl.openweb.data.Heartbeat", + strategy, + ) + .await + .unwrap(); + + assert_eq!(encoded_data, get_proto_hb_101()) + } + + #[tokio::test] + async fn test_encode_single_message() { + let _m = mock("GET", "/subjects/nl.openweb.data.Heartbeat/versions/latest") + .with_status(200) + .with_header("content-type", "application/vnd.schemaregistry.v1+json") + .with_body(&get_proto_body(get_proto_hb_schema(), 7)) + .create(); + + let sr_settings = SrSettings::new(format!("http://{}", server_address())); + let encoder = EasyProtoRawEncoder::new(sr_settings); + let strategy = + SubjectNameStrategy::RecordNameStrategy(String::from("nl.openweb.data.Heartbeat")); + + let encoded_data = encoder + .encode_single_message(get_proto_hb_101_only_data(), strategy) + .await + .unwrap(); + + assert_eq!(encoded_data, get_proto_hb_101()) + } +} diff --git a/src/async_impl/mod.rs b/src/async_impl/mod.rs index af1addf..fcd6e2e 100644 --- a/src/async_impl/mod.rs +++ b/src/async_impl/mod.rs @@ -1,5 +1,13 @@ #[cfg(feature = "avro")] pub mod avro; +#[cfg(all(feature = "easy", feature = "avro"))] +pub mod easy_avro; +#[cfg(all(feature = "easy", feature = "json"))] +pub mod easy_json; +#[cfg(all(feature = "easy", feature = "proto_decoder"))] +pub mod easy_proto_decoder; +#[cfg(all(feature = "easy", feature = "proto_raw"))] +pub mod easy_proto_raw; #[cfg(feature = "json")] pub mod json; #[cfg(feature = "proto_decoder")] diff --git a/src/async_impl/proto_raw.rs b/src/async_impl/proto_raw.rs index b25cf03..9a427c1 100644 --- a/src/async_impl/proto_raw.rs +++ b/src/async_impl/proto_raw.rs @@ -19,6 +19,8 @@ use 
futures::FutureExt; /// bytes. Ideally you want to make sure the bytes are based on the exact schema used for encoding /// but you need a protobuf struct that has introspection to make that work, and both protobuf and /// prost don't support that at the moment. +/// This will just add the magic byte, schema reference, and message reference. The bytes should already be valid proto bytes for the schema used. +/// When a schema with multiple messages is used the full_name needs to be supplied to properly encode the message reference. #[derive(Debug)] pub struct ProtoRawEncoder<'a> { sr_settings: SrSettings, diff --git a/test_utils/src/lib.rs b/test_utils/src/lib.rs index 9733a8f..a7d8495 100644 --- a/test_utils/src/lib.rs +++ b/test_utils/src/lib.rs @@ -115,8 +115,8 @@ impl Default for Atype { pub type Uuid = [u8; 16]; -#[serde(default)] #[derive(Debug, PartialEq, Clone, Deserialize, Serialize)] +#[serde(default)] pub struct ConfirmAccountCreation { pub id: Uuid, pub a_type: Atype,