From ef3caa6294aa182033554bd2b7c7e953c99e53fd Mon Sep 17 00:00:00 2001 From: Gerard Klijs Date: Mon, 21 Jun 2021 19:54:41 +0200 Subject: [PATCH] Fix issue 48 --- src/async_impl/proto_raw.rs | 43 ++++++++++++++++++++++++++++++- src/async_impl/schema_registry.rs | 10 +++---- src/blocking/proto_raw.rs | 39 +++++++++++++++++++++++++++- src/blocking/schema_registry.rs | 10 +++---- src/proto_raw_common.rs | 17 ++++++++++++ src/proto_resolver.rs | 4 +++ test_utils/src/lib.rs | 16 ++++++------ 7 files changed, 117 insertions(+), 22 deletions(-) diff --git a/src/async_impl/proto_raw.rs b/src/async_impl/proto_raw.rs index e2b9f92..707aa6a 100644 --- a/src/async_impl/proto_raw.rs +++ b/src/async_impl/proto_raw.rs @@ -5,7 +5,9 @@ use crate::async_impl::schema_registry::{ get_schema_by_id_and_type, get_schema_by_subject, SrSettings, }; use crate::error::SRCError; -use crate::proto_raw_common::{to_bytes, to_decode_context, DecodeContext, EncodeContext}; +use crate::proto_raw_common::{ + to_bytes, to_bytes_single_message, to_decode_context, DecodeContext, EncodeContext, +}; use crate::proto_resolver::{resolve_name, to_index_and_data, IndexResolver}; use crate::schema_registry_common::{ get_bytes_result, get_subject, BytesResult, RegisteredSchema, SchemaType, SubjectNameStrategy, @@ -54,6 +56,21 @@ impl<'a> ProtoRawEncoder<'a> { to_bytes(&encode_context, bytes, full_name) } + /// Encodes the bytes by adding a few bytes to the message with additional information. + /// This should only be used when the schema only had one message + pub async fn encode_single_message( + &mut self, + bytes: &[u8], + subject_name_strategy: SubjectNameStrategy, + ) -> Result, SRCError> { + let key = get_subject(&subject_name_strategy)?; + let encode_context = self + .get_encoding_context(key, subject_name_strategy) + .clone() + .await?; + to_bytes_single_message(&encode_context, bytes) + } + fn get_encoding_context( &mut self, key: String, @@ -203,6 +220,30 @@ mod tests { assert_eq!(encoded_data, get_proto_hb_101()) } + #[tokio::test] + async fn test_encode_single_message() { + let _m = mock("GET", "/subjects/nl.openweb.data.Heartbeat/versions/latest") + .with_status(200) + .with_header("content-type", "application/vnd.schemaregistry.v1+json") + .with_body(&get_proto_body(get_proto_hb_schema(), 7)) + .create(); + + let sr_settings = SrSettings::new(format!("http://{}", server_address())); + let mut encoder = ProtoRawEncoder::new(sr_settings); + let strategy = + SubjectNameStrategy::RecordNameStrategy(String::from("nl.openweb.data.Heartbeat")); + + let encoded_data = encoder + .encode_single_message( + get_proto_hb_101_only_data(), + strategy, + ) + .await + .unwrap(); + + assert_eq!(encoded_data, get_proto_hb_101()) + } + #[tokio::test] async fn test_encode_cache() { let sr_settings = SrSettings::new(format!("http://{}", server_address())); diff --git a/src/async_impl/schema_registry.rs b/src/async_impl/schema_registry.rs index b52c8b5..0486f43 100644 --- a/src/async_impl/schema_registry.rs +++ b/src/async_impl/schema_registry.rs @@ -182,12 +182,10 @@ impl SrSettingsBuilder { builder = builder.timeout(self.timeout); match builder.build() { Ok(client) => Ok(client), - Err(e) => { - return Err(SRCError::non_retryable_with_cause( - e, - "could not create new client", - )) - } + Err(e) => Err(SRCError::non_retryable_with_cause( + e, + "could not create new client", + )), } } } diff --git a/src/blocking/proto_raw.rs b/src/blocking/proto_raw.rs index 3b58586..0deb0ff 100644 --- a/src/blocking/proto_raw.rs +++ b/src/blocking/proto_raw.rs @@ -5,7 +5,9 @@ use crate::blocking::schema_registry::{ get_schema_by_id_and_type, get_schema_by_subject, SrSettings, }; use crate::error::SRCError; -use crate::proto_raw_common::{to_bytes, to_decode_context, DecodeContext, EncodeContext}; +use crate::proto_raw_common::{ + to_bytes, to_bytes_single_message, to_decode_context, DecodeContext, EncodeContext, +}; use crate::proto_resolver::{resolve_name, to_index_and_data, IndexResolver}; use crate::schema_registry_common::{ get_bytes_result, get_subject, BytesResult, RegisteredSchema, SchemaType, SubjectNameStrategy, @@ -48,6 +50,18 @@ impl ProtoRawEncoder { } } + pub fn encode_single_message( + &mut self, + bytes: &[u8], + subject_name_strategy: &SubjectNameStrategy, + ) -> Result, SRCError> { + let key = get_subject(subject_name_strategy)?; + match self.get_encoding_context(key, subject_name_strategy) { + Ok(encode_context) => to_bytes_single_message(encode_context, bytes), + Err(e) => Err(Clone::clone(e)), + } + } + fn get_encoding_context( &mut self, key: String, @@ -184,6 +198,29 @@ mod tests { assert_eq!(encoded_data, get_proto_hb_101()) } + #[test] + fn test_encode_single_message() { + let _m = mock("GET", "/subjects/nl.openweb.data.Heartbeat/versions/latest") + .with_status(200) + .with_header("content-type", "application/vnd.schemaregistry.v1+json") + .with_body(&get_proto_body(get_proto_hb_schema(), 7)) + .create(); + + let sr_settings = SrSettings::new(format!("http://{}", server_address())); + let mut encoder = ProtoRawEncoder::new(sr_settings); + let strategy = + SubjectNameStrategy::RecordNameStrategy(String::from("nl.openweb.data.Heartbeat")); + + let encoded_data = encoder + .encode_single_message( + get_proto_hb_101_only_data(), + &strategy, + ) + .unwrap(); + + assert_eq!(encoded_data, get_proto_hb_101()) + } + #[test] fn test_encode_cache() { let sr_settings = SrSettings::new(format!("http://{}", server_address())); diff --git a/src/blocking/schema_registry.rs b/src/blocking/schema_registry.rs index e865233..de6c679 100644 --- a/src/blocking/schema_registry.rs +++ b/src/blocking/schema_registry.rs @@ -180,12 +180,10 @@ impl SrSettingsBuilder { builder = builder.timeout(self.timeout); match builder.build() { Ok(client) => Ok(client), - Err(e) => { - return Err(SRCError::non_retryable_with_cause( - e, - "could not create new client", - )) - } + Err(e) => Err(SRCError::non_retryable_with_cause( + e, + "could not create new client", + )), } } } diff --git a/src/proto_raw_common.rs b/src/proto_raw_common.rs index d628f11..bf9f1f1 100644 --- a/src/proto_raw_common.rs +++ b/src/proto_raw_common.rs @@ -28,6 +28,23 @@ pub(crate) fn to_bytes( Ok(get_payload(encode_context.id, index_bytes)) } +pub(crate) fn to_bytes_single_message( + encode_context: &EncodeContext, + bytes: &[u8], +) -> Result, SRCError> { + if encode_context.resolver.is_single_message() { + let mut index_bytes = vec![0u8]; + index_bytes.extend(bytes); + Ok(get_payload(encode_context.id, index_bytes)) + } else { + Err(SRCError::new( + "Schema was no single message schema", + None, + false, + )) + } +} + pub(crate) fn to_decode_context(registered_schema: RegisteredSchema) -> DecodeContext { let schema = String::from(®istered_schema.schema); DecodeContext { diff --git a/src/proto_resolver.rs b/src/proto_resolver.rs index 579f588..ad1b064 100644 --- a/src/proto_resolver.rs +++ b/src/proto_resolver.rs @@ -44,6 +44,10 @@ impl IndexResolver { pub(crate) fn find_index(&self, name: &str) -> Option<&Vec> { self.map.get(name) } + + pub(crate) fn is_single_message(&self) -> bool { + self.map.len() == 1 + } } struct ResolverHelper { diff --git a/test_utils/src/lib.rs b/test_utils/src/lib.rs index beb9c10..26ee383 100644 --- a/test_utils/src/lib.rs +++ b/test_utils/src/lib.rs @@ -40,9 +40,9 @@ pub fn get_proto_complex_proto_test_message_data_only() -> &'static [u8] { pub fn get_proto_complex_proto_test_message() -> &'static [u8] { &[ - 0, 0, 0, 0, 6, 2, 6, 10, 16, 11, 134, 69, 48, 212, 168, 77, 40, 147, 167, 30, 246, 208, - 32, 252, 79, 24, 1, 34, 6, 83, 116, 114, 105, 110, 103, 42, 16, 10, 6, 83, 84, 82, 73, - 78, 71, 18, 6, 115, 116, 114, 105, 110, 103, + 0, 0, 0, 0, 6, 2, 6, 10, 16, 11, 134, 69, 48, 212, 168, 77, 40, 147, 167, 30, 246, 208, 32, + 252, 79, 24, 1, 34, 6, 83, 116, 114, 105, 110, 103, 42, 16, 10, 6, 83, 84, 82, 73, 78, 71, + 18, 6, 115, 116, 114, 105, 110, 103, ] } @@ -82,15 +82,15 @@ pub fn json_get_result_references() -> &'static str { pub fn json_result_java_bytes() -> &'static [u8] { &[ - 0, 0, 0, 0, 10, 123, 34, 100, 111, 119, 110, 34, 58, 34, 115, 116, 114, 105, 110, 103, - 34, 44, 34, 117, 112, 34, 58, 34, 83, 84, 82, 73, 78, 71, 34, 125, + 0, 0, 0, 0, 10, 123, 34, 100, 111, 119, 110, 34, 58, 34, 115, 116, 114, 105, 110, 103, 34, + 44, 34, 117, 112, 34, 58, 34, 83, 84, 82, 73, 78, 71, 34, 125, ] } pub fn json_incorrect_bytes() -> &'static [u8] { &[ - 0, 0, 0, 0, 10, 0, 34, 100, 111, 119, 110, 34, 58, 34, 115, 116, 114, 105, 110, 103, - 34, 44, 34, 117, 112, 34, 58, 34, 83, 84, 82, 73, 78, 71, 34, 125, + 0, 0, 0, 0, 10, 0, 34, 100, 111, 119, 110, 34, 58, 34, 115, 116, 114, 105, 110, 103, 34, + 44, 34, 117, 112, 34, 58, 34, 83, 84, 82, 73, 78, 71, 34, 125, ] } @@ -129,4 +129,4 @@ impl Default for ConfirmAccountCreation { a_type: Atype::Auto, } } -} \ No newline at end of file +}