Skip to content

Commit

Permalink
Fix issue 48
Browse files Browse the repository at this point in the history
  • Loading branch information
gklijs committed Jun 23, 2021
1 parent c299c2f commit ef3caa6
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 22 deletions.
43 changes: 42 additions & 1 deletion src/async_impl/proto_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use crate::async_impl::schema_registry::{
get_schema_by_id_and_type, get_schema_by_subject, SrSettings,
};
use crate::error::SRCError;
use crate::proto_raw_common::{to_bytes, to_decode_context, DecodeContext, EncodeContext};
use crate::proto_raw_common::{
to_bytes, to_bytes_single_message, to_decode_context, DecodeContext, EncodeContext,
};
use crate::proto_resolver::{resolve_name, to_index_and_data, IndexResolver};
use crate::schema_registry_common::{
get_bytes_result, get_subject, BytesResult, RegisteredSchema, SchemaType, SubjectNameStrategy,
Expand Down Expand Up @@ -54,6 +56,21 @@ impl<'a> ProtoRawEncoder<'a> {
to_bytes(&encode_context, bytes, full_name)
}

/// Encodes the bytes by adding a few bytes to the message with additional information.
/// This should only be used when the schema only had one message
pub async fn encode_single_message(
&mut self,
bytes: &[u8],
subject_name_strategy: SubjectNameStrategy,
) -> Result<Vec<u8>, SRCError> {
let key = get_subject(&subject_name_strategy)?;
let encode_context = self
.get_encoding_context(key, subject_name_strategy)
.clone()
.await?;
to_bytes_single_message(&encode_context, bytes)
}

fn get_encoding_context(
&mut self,
key: String,
Expand Down Expand Up @@ -203,6 +220,30 @@ mod tests {
assert_eq!(encoded_data, get_proto_hb_101())
}

#[tokio::test]
async fn test_encode_single_message() {
let _m = mock("GET", "/subjects/nl.openweb.data.Heartbeat/versions/latest")
.with_status(200)
.with_header("content-type", "application/vnd.schemaregistry.v1+json")
.with_body(&get_proto_body(get_proto_hb_schema(), 7))
.create();

let sr_settings = SrSettings::new(format!("http://{}", server_address()));
let mut encoder = ProtoRawEncoder::new(sr_settings);
let strategy =
SubjectNameStrategy::RecordNameStrategy(String::from("nl.openweb.data.Heartbeat"));

let encoded_data = encoder
.encode_single_message(
get_proto_hb_101_only_data(),
strategy,
)
.await
.unwrap();

assert_eq!(encoded_data, get_proto_hb_101())
}

#[tokio::test]
async fn test_encode_cache() {
let sr_settings = SrSettings::new(format!("http://{}", server_address()));
Expand Down
10 changes: 4 additions & 6 deletions src/async_impl/schema_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,10 @@ impl SrSettingsBuilder {
builder = builder.timeout(self.timeout);
match builder.build() {
Ok(client) => Ok(client),
Err(e) => {
return Err(SRCError::non_retryable_with_cause(
e,
"could not create new client",
))
}
Err(e) => Err(SRCError::non_retryable_with_cause(
e,
"could not create new client",
)),
}
}
}
Expand Down
39 changes: 38 additions & 1 deletion src/blocking/proto_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use crate::blocking::schema_registry::{
get_schema_by_id_and_type, get_schema_by_subject, SrSettings,
};
use crate::error::SRCError;
use crate::proto_raw_common::{to_bytes, to_decode_context, DecodeContext, EncodeContext};
use crate::proto_raw_common::{
to_bytes, to_bytes_single_message, to_decode_context, DecodeContext, EncodeContext,
};
use crate::proto_resolver::{resolve_name, to_index_and_data, IndexResolver};
use crate::schema_registry_common::{
get_bytes_result, get_subject, BytesResult, RegisteredSchema, SchemaType, SubjectNameStrategy,
Expand Down Expand Up @@ -48,6 +50,18 @@ impl ProtoRawEncoder {
}
}

pub fn encode_single_message(
&mut self,
bytes: &[u8],
subject_name_strategy: &SubjectNameStrategy,
) -> Result<Vec<u8>, SRCError> {
let key = get_subject(subject_name_strategy)?;
match self.get_encoding_context(key, subject_name_strategy) {
Ok(encode_context) => to_bytes_single_message(encode_context, bytes),
Err(e) => Err(Clone::clone(e)),
}
}

fn get_encoding_context(
&mut self,
key: String,
Expand Down Expand Up @@ -184,6 +198,29 @@ mod tests {
assert_eq!(encoded_data, get_proto_hb_101())
}

#[test]
fn test_encode_single_message() {
let _m = mock("GET", "/subjects/nl.openweb.data.Heartbeat/versions/latest")
.with_status(200)
.with_header("content-type", "application/vnd.schemaregistry.v1+json")
.with_body(&get_proto_body(get_proto_hb_schema(), 7))
.create();

let sr_settings = SrSettings::new(format!("http://{}", server_address()));
let mut encoder = ProtoRawEncoder::new(sr_settings);
let strategy =
SubjectNameStrategy::RecordNameStrategy(String::from("nl.openweb.data.Heartbeat"));

let encoded_data = encoder
.encode_single_message(
get_proto_hb_101_only_data(),
&strategy,
)
.unwrap();

assert_eq!(encoded_data, get_proto_hb_101())
}

#[test]
fn test_encode_cache() {
let sr_settings = SrSettings::new(format!("http://{}", server_address()));
Expand Down
10 changes: 4 additions & 6 deletions src/blocking/schema_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,10 @@ impl SrSettingsBuilder {
builder = builder.timeout(self.timeout);
match builder.build() {
Ok(client) => Ok(client),
Err(e) => {
return Err(SRCError::non_retryable_with_cause(
e,
"could not create new client",
))
}
Err(e) => Err(SRCError::non_retryable_with_cause(
e,
"could not create new client",
)),
}
}
}
Expand Down
17 changes: 17 additions & 0 deletions src/proto_raw_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,23 @@ pub(crate) fn to_bytes(
Ok(get_payload(encode_context.id, index_bytes))
}

pub(crate) fn to_bytes_single_message(
encode_context: &EncodeContext,
bytes: &[u8],
) -> Result<Vec<u8>, SRCError> {
if encode_context.resolver.is_single_message() {
let mut index_bytes = vec![0u8];
index_bytes.extend(bytes);
Ok(get_payload(encode_context.id, index_bytes))
} else {
Err(SRCError::new(
"Schema was no single message schema",
None,
false,
))
}
}

pub(crate) fn to_decode_context(registered_schema: RegisteredSchema) -> DecodeContext {
let schema = String::from(&registered_schema.schema);
DecodeContext {
Expand Down
4 changes: 4 additions & 0 deletions src/proto_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ impl IndexResolver {
pub(crate) fn find_index(&self, name: &str) -> Option<&Vec<i32>> {
self.map.get(name)
}

pub(crate) fn is_single_message(&self) -> bool {
self.map.len() == 1
}
}

struct ResolverHelper {
Expand Down
16 changes: 8 additions & 8 deletions test_utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ pub fn get_proto_complex_proto_test_message_data_only() -> &'static [u8] {

pub fn get_proto_complex_proto_test_message() -> &'static [u8] {
&[
0, 0, 0, 0, 6, 2, 6, 10, 16, 11, 134, 69, 48, 212, 168, 77, 40, 147, 167, 30, 246, 208,
32, 252, 79, 24, 1, 34, 6, 83, 116, 114, 105, 110, 103, 42, 16, 10, 6, 83, 84, 82, 73,
78, 71, 18, 6, 115, 116, 114, 105, 110, 103,
0, 0, 0, 0, 6, 2, 6, 10, 16, 11, 134, 69, 48, 212, 168, 77, 40, 147, 167, 30, 246, 208, 32,
252, 79, 24, 1, 34, 6, 83, 116, 114, 105, 110, 103, 42, 16, 10, 6, 83, 84, 82, 73, 78, 71,
18, 6, 115, 116, 114, 105, 110, 103,
]
}

Expand Down Expand Up @@ -82,15 +82,15 @@ pub fn json_get_result_references() -> &'static str {

pub fn json_result_java_bytes() -> &'static [u8] {
&[
0, 0, 0, 0, 10, 123, 34, 100, 111, 119, 110, 34, 58, 34, 115, 116, 114, 105, 110, 103,
34, 44, 34, 117, 112, 34, 58, 34, 83, 84, 82, 73, 78, 71, 34, 125,
0, 0, 0, 0, 10, 123, 34, 100, 111, 119, 110, 34, 58, 34, 115, 116, 114, 105, 110, 103, 34,
44, 34, 117, 112, 34, 58, 34, 83, 84, 82, 73, 78, 71, 34, 125,
]
}

pub fn json_incorrect_bytes() -> &'static [u8] {
&[
0, 0, 0, 0, 10, 0, 34, 100, 111, 119, 110, 34, 58, 34, 115, 116, 114, 105, 110, 103,
34, 44, 34, 117, 112, 34, 58, 34, 83, 84, 82, 73, 78, 71, 34, 125,
0, 0, 0, 0, 10, 0, 34, 100, 111, 119, 110, 34, 58, 34, 115, 116, 114, 105, 110, 103, 34,
44, 34, 117, 112, 34, 58, 34, 83, 84, 82, 73, 78, 71, 34, 125,
]
}

Expand Down Expand Up @@ -129,4 +129,4 @@ impl Default for ConfirmAccountCreation {
a_type: Atype::Auto,
}
}
}
}

0 comments on commit ef3caa6

Please sign in to comment.