diff --git a/Cargo.toml b/Cargo.toml index 1576e6d..dc22fcc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "schema_registry_converter" -version = "2.0.2" +version = "2.1.0" authors = ["Gerard Klijs "] include = ["src/**/*", "Cargo.toml"] description = "Encode/decode data from/to kafka using the Confluent Schema Registry" @@ -64,39 +64,27 @@ version = "^0.12" optional = true [dependencies.protofish] -version = "^0.3" +version = "^0.4" optional = true [dependencies.url] -version = "^2" +version = "^2.2" optional = true [dependencies.tokio] -version = "^1.2.0" +version = "^1.8" features = ["macros"] optional = true [dependencies.valico] -version = "^3.5" +version = "^3.6" optional = true [dev-dependencies] -mockito = "^0.29.0" -rdkafka = { version = "^0.25.0", features = ["cmake-build"] } -rand = "^0.8.3" +mockito = "^0.30.0" +rdkafka = { version = "^0.26.0", features = ["cmake-build"] } +rand = "^0.8.4" test_utils = {path = "test_utils"} [package.metadata.docs.rs] all-features = true - -[badges.travis-ci] -repository = "gklijs/schema_registry_converter" - -[badges.codecov] -repository = "gklijs/schema_registry_converter" - -[badges.is-it-maintained-open-issues] -repository = "gklijs/schema_registry_converter" - -[badges.maintenance] -status = "passively-maintained" diff --git a/README.md b/README.md index 9047d09..2cd273f 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -> #schema_registry_converter +> # schema_registry_converter [![Build Status](https://github.com/gklijs/schema_registry_converter/actions/workflows/ci.yml/badge.svg)](https://github.com/gklijs/schema_registry_converter/actions/workflows/ci.yml) [![codecov](https://codecov.io/gh/gklijs/schema_registry_converter/branch/master/graph/badge.svg)](https://codecov.io/gh/gklijs/schema_registry_converter) @@ -7,35 +7,44 @@ [![docs.rs](https://docs.rs/schema_registry_converter/badge.svg)](https://docs.rs/schema_registry_converter/) --- -This library provides a way of using the Confluent Schema Registry in a way that is compliant with the Java client. -The release notes can be found on [github](https://github.com/gklijs/schema_registry_converter/blob/master/RELEASE_NOTES.md) +This library provides a way of using +the [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html) in a way that is +compliant with the Java client. Since +[Karapace](https://github.com/aiven/karapace/blob/master/README.rst) is API compatible it could also be used with this +library. The release notes can be found +on [github](https://github.com/gklijs/schema_registry_converter/blob/master/RELEASE_NOTES.md) Consuming/decoding and producing/encoding is supported. It's also possible to provide the schema to use when decoding. -You can also include references when decoding. -When no schema is provided, the latestschema with the same `subject` will be used. -It's supposed to be feature complete compared to the Java version. If anything is missing or not working as expected please create an issue. +You can also include references when decoding. Without a schema provided, the latest schema with the same `subject` will +be used. -## Consumer - -For consuming messages encoded with the schema registry, you need to fetch the correct schema from the schema registry to transform it into a record. For clarity, error handling is omitted from the diagram. +It's supposed to be feature complete compared to the Java version. If anything is missing or not working as expected +please create an issue or start a discussion +on [github discussions](https://github.com/gklijs/schema_registry_converter/discussions). An example of using this +library async with protobuf to produce data to Kafka can be found +in [ksqlDB-GraphQL-poc](https://github.com/gklijs/ksqlDB-GraphQL-poc/tree/main/rust-data-creator). A blog with a bit of +background on this library can be found +titled [confluent Schema Registry and Rust](https://blog.openweb.nl/blog/confluent-schema-registry-and-rust) -![Consumer activity flow](http://www.plantuml.com/plantuml/proxy?cache=no&src=https://raw.githubusercontent.com/gklijs/schema_registry_converter/master/uml/consumer.puml) +## Getting Started -## Producer - -For producing messages which can be properly consumed by other clients, the proper id needs to be encoded with the message. To get the correct id, it might be necessary to register a new schema. For clarity, error handling is omitted from the diagram. - -![Producer activity flow](http://www.plantuml.com/plantuml/proxy?cache=no&src=https://raw.githubusercontent.com/gklijs/schema_registry_converter/master/uml/producer.puml) +[schema_registry_converter.rs is available on crates.io](https://crates.io/crates/schema_registry_converter). It is +recommended to look there for the newest and more elaborate documentation. It has a couple of feature flags, be sure to +set them correctly. -# Getting Started +To use it to convert using avro async use: -[schema_registry_converter.rs is available on crates.io](https://crates.io/crates/schema_registry_converter). -It is recommended to look there for the newest and more elaborate documentation. +```toml +[dependencies] +schema_registry_converter = { version = "2.1.0", features = ["avro"] } +``` -To use it to convert using avro async use: +The resulting converter needs mutability through, for simplicity there are `easy` variants that internally have a mutex. +Making it easier to use at the price of some overhead. To use the `easy` variants add the `easy` feature and use the +structs that start with `Easy` in the name to do the conversions. ```toml [dependencies] -schema_registry_converter = { version = "2.0.2", features = ["avro"] } +schema_registry_converter = { version = "2.1.0", features = ["easy", "avro"] } ``` ...and see the [docs](https://docs.rs/schema_registry_converter) for how to use it. @@ -44,20 +53,37 @@ All the converters also have a blocking (non async) version, in that case use so ```toml [dependencies] -schema_registry_converter = { version = "2.0.2", default-features = false, features = ["avro", "blocking"]} +schema_registry_converter = { version = "2.1.0", default-features = false, features = ["avro", "blocking"] } ``` -If you need to use both in a project you can use something like, but have to be weary you import the correct paths depending on your use. +If you need to use both in a project you can use something like, but have to be weary you import the correct paths +depending on your use. ```toml [dependencies] -schema_registry_converter = { version = "2.0.2", features = ["avro", "blocking"]} +schema_registry_converter = { version = "2.1.0", features = ["avro", "blocking"] } ``` -# Example with consumer and producer using Avro +## Consumer -Two examples of but consuming/decoding and producing/encoding. -To use structs with Avro they must have an implementation of either the `serde::Deserialize` or `serde::Serialize` trait to work. +For consuming messages encoded with the schema registry, you need to fetch the correct schema from the schema registry +to transform it into a record. For clarity, error handling is omitted from the diagram. + +![Consumer activity flow](http://www.plantuml.com/plantuml/proxy?cache=no&src=https://raw.githubusercontent.com/gklijs/schema_registry_converter/master/uml/consumer.puml) + +## Producer + +For producing messages which can be properly consumed by other clients, the proper id needs to be encoded with the +message. To get the correct id, it might be necessary to register a new schema. For clarity, error handling is omitted +from the diagram. + +![Producer activity flow](http://www.plantuml.com/plantuml/proxy?cache=no&src=https://raw.githubusercontent.com/gklijs/schema_registry_converter/master/uml/producer.puml) + +## Example with consumer and producer using Avro (blocking) + +Examples which does both consuming/decoding and producing/encoding. To use structs with Avro they must have an implementation +of either the `serde::Deserialize` or `serde::Serialize` trait to work. The examples are especially useful to update +from the 1.x.x version, when starting you probably want to use the async versions. ```rust use rdkafka::message::{Message, BorrowedMessage}; @@ -66,8 +92,8 @@ use schema_registry_converter::blocking::{Decoder, Encoder}; use schema_registry_converter::blocking::schema_registry::SubjectNameStrategy; fn main() { - let mut decoder = Decoder::new(String::from("http://localhost:8081")); - let mut encoder = Encoder::new(String::from("http://localhost:8081")); + let mut decoder = Decoder::new(SrSettings::new(String::from("http://localhost:8081"))); + let mut encoder = Encoder::new(SrSettings::new(String::from("http://localhost:8081"))); let hb = get_heartbeat(msg, decoder); let record = get_future_record_from_struct("hb", Some("id"), hb, encoder); producer.send(record); @@ -76,32 +102,32 @@ fn main() { fn get_value<'a>( msg: &'a BorrowedMessage, decoder: &'a mut Decoder, -) -> Value{ - match decoder.decode(msg.payload()){ - Ok(v) => v, - Err(e) => panic!("Error getting value: {}", e), +) -> Value { + match decoder.decode(msg.payload()) { + Ok(v) => v, + Err(e) => panic!("Error getting value: {}", e), } } fn get_heartbeat<'a>( msg: &'a BorrowedMessage, decoder: &'a mut Decoder, -) -> Heartbeat{ - match decoder.decode_with_name(msg.payload()){ +) -> Heartbeat { + match decoder.decode_with_name(msg.payload()) { Ok((name, value)) => { match name.name.as_str() { "Heartbeat" => { - match name.namespace{ + match name.namespace { Some(namespace) => { - match namespace.as_str(){ + match namespace.as_str() { "nl.openweb.data" => from_value::(&value).unwrap(), - ns=> panic!("Unexpected namespace {}", ns), + ns => panic!("Unexpected namespace {}", ns), } - }, + } None => panic!("No namespace in schema, while expected"), } } - name=> panic!("Unexpected name {}", name), + name => panic!("Unexpected name {}", name), } } Err(e) => panic!("error getting heartbeat: {}, e"), @@ -113,7 +139,7 @@ fn get_future_record<'a>( key: Option<&'a str>, values: Vec<(&'static str, Value)>, encoder: &'a mut Encoder, -) -> FutureRecord<'a>{ +) -> FutureRecord<'a> { let subject_name_strategy = SubjectNameStrategy::TopicNameStrategy(topic, false); let payload = match encoder.encode(values, &subject_name_strategy) { Ok(v) => v, @@ -134,7 +160,7 @@ fn get_future_record_from_struct<'a>( key: Option<&'a str>, heartbeat: Heartbeat, encoder: &'a mut Encoder, -) -> FutureRecord<'a>{ +) -> FutureRecord<'a> { let subject_name_strategy = SubjectNameStrategy::TopicNameStrategy(topic, false); let payload = match encoder.encode_struct(heartbeat, &subject_name_strategy) { Ok(v) => v, @@ -151,7 +177,7 @@ fn get_future_record_from_struct<'a>( } ``` -# Example using to post schema to schema registry +## Example using to post schema to schema registry ```rust use schema_registry_converter::blocking::schema_registry::{ @@ -159,53 +185,55 @@ use schema_registry_converter::blocking::schema_registry::{ SuppliedSchema }; -fn main(){ +fn main() { let schema = SuppliedSchema { - name: String::from("nl.openweb.data.Heartbeat"), - schema_type: SchemaType::AVRO, - schema: String::from(r#"{"type":"record","name":"Heartbeat","namespace":"nl.openweb.data","fields":[{"name":"beat","type":"long"}]}"#), - references: vec![], + name: String::from("nl.openweb.data.Heartbeat"), + schema_type: SchemaType::AVRO, + schema: String::from(r#"{"type":"record","name":"Heartbeat","namespace":"nl.openweb.data","fields":[{"name":"beat","type":"long"}]}"#), + references: vec![], }; let result = post_schema("http://localhost:8081/subjects/test-value/versions", heartbeat_schema); } - ``` -# Relation to related libraries +## Relation to related libraries -The avro part of the conversion is handled by avro-rs. As such, I don't include tests for every possible schema. -While I used rdkafka in combination to successfully consume from and produce to kafka, and while it's used in the example, this crate has no direct dependency on it. -All this crate does is convert [u8] <-> Some Value (based on converter used). -With Json and Protobuf some other dependencies are pulled in, by using said features. -I have tried to encapsulate all the errors in the SRCError type. -So even when you get a pannic/error that's an SRCError it could be an error from one of the dependencies. -Please make sure you are using the library correctly, and the error is not caused by a depency, before creating an issue. +The avro part of the conversion is handled by avro-rs. As such, I don't include tests for every possible schema. While I +used rdkafka in combination to successfully consume from and produce to kafka, and while it's used in the example, this +crate has no direct dependency on it. All this crate does is convert [u8] <-> Some Value (based on converter used). With +Json and Protobuf some other dependencies are pulled in, by using said features. I have tried to encapsulate all the +errors in the SRCError type. So even when you get a pannic/error that's an SRCError it could be an error from one of the +dependencies. Please make sure you are using the library correctly, and the error is not caused by a depency, before +creating an issue. -# Tests +## Tests -Due to mockito, used for mocking the schema registry responses, being run in a separate thread, tests have to be run using ` --test-threads=1` for example like +Due to mockito, used for mocking the schema registry responses, being run in a separate thread, tests have to be run +using ` --test-threads=1` for example like `cargo +stable test --color=always --features avro,json,proto_decoder,proto_raw -- --nocapture --test-threads=1` -# Integration test +## Integration test -The integration tests require a Kafka cluster running on the default ports. It will create topics, register schema's, produce and consume some messages. -They are only included when compiled with the `kafka_test` feature, so to include them in testing `cargo +stable test --all-features --color=always -- --nocapture --test-threads=1` needs to be run. -The 'prepare_integration_test.sh' script can be used to create the 3 topics needed for the tests. -To ensure Java compatibility it's also needed to run the [schema-registry-test-app](https://hub.docker.com/repository/docker/gklijs/schema-registry-test-app) docker image. +The integration tests require a Kafka cluster running on the default ports. It will create topics, register schema's, +produce and consume some messages. They are only included when compiled with the `kafka_test` feature, so to include +them in testing `cargo +stable test --all-features --color=always -- --nocapture --test-threads=1` needs to be run. +The 'prepare_integration_test.sh' script can be used to create the 3 topics needed for the tests. To ensure Java +compatibility it's also needed to run +the [schema-registry-test-app](https://hub.docker.com/repository/docker/gklijs/schema-registry-test-app) docker image. -# License +## License This project is licensed under either of - * Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or - http://www.apache.org/licenses/LICENSE-2.0) - * MIT license ([LICENSE-MIT](LICENSE-MIT) or - http://opensource.org/licenses/MIT) +* Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or + http://www.apache.org/licenses/LICENSE-2.0) +* MIT license ([LICENSE-MIT](LICENSE-MIT) or + http://opensource.org/licenses/MIT) at your option. ### Contribution -Unless you explicitly state otherwise, any contribution intentionally submitted -for inclusion in Schema Registry Converter by you, as defined in the Apache-2.0 license, shall be -dual licensed as above, without any additional terms or conditions. +Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in Schema Registry +Converter by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or +conditions. diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index aaeb010..b6fa35a 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,5 +1,15 @@ ## Release notes +### 2.1.0 + +Dependencies updated and ci is now run in Github Actions also some improvements where made making it easier to use, and open up some additional use cases. + +#### Issues +- It's now possible to set additional options on the `reqwest` client, and use that to create the SrSettings. Mainly for custom security requirements. +- It's possible to use the `rustls_tls` feature to let `reqwest` use `rustls-tls`. +- For each async converter an `easy` variant was added. This makes it easier to use the library, as internally a mutex is used, so the user has to care about mutability less. +- For the protobuf encoders an `encode_single_message` method was eded to encode when the schema contains only one message. The full name of the proto message is not needed for this. + ### 2.0.2 Updated dependencies @@ -68,4 +78,6 @@ instead of the `encode` function on the encoder. - [@cbzehner](https://github.com/cbzehner) - [@j-halbert](https://github.com/j-halbert) - [@kitsuneninetails](https://github.com/kitsuneninetails) -- [@naamancurtis](https://github.com/naamancurtis) \ No newline at end of file +- [@naamancurtis](https://github.com/naamancurtis) +- [@MariellHoversholm-Paf](https://github.com/MariellHoversholm-Paf) +- [@SergeStrashko](https://github.com/SergeStrashko) \ No newline at end of file diff --git a/src/async_impl/avro.rs b/src/async_impl/avro.rs index 2f2526c..ba4a38c 100644 --- a/src/async_impl/avro.rs +++ b/src/async_impl/avro.rs @@ -180,7 +180,7 @@ impl<'a> AvroDecoder<'a> { /// The actual deserialization trying to get the id from the bytes to retrieve the schema, and /// using a reader transforms the bytes to a value. async fn deserialize(&mut self, id: u32, bytes: &[u8]) -> Result { - let schema = self.get_schema(id).clone().await?; + let schema = self.schema(id).clone().await?; let mut reader = Cursor::new(bytes); match from_avro_datum(&schema.parsed, &mut reader, None) { Ok(v) => Ok(DecodeResult { @@ -194,7 +194,7 @@ impl<'a> AvroDecoder<'a> { } } - fn get_schema(&mut self, id: u32) -> &Shared>> { + fn schema(&mut self, id: u32) -> &Shared>> { match self.cache.entry(id) { Entry::Occupied(e) => &*e.into_mut(), Entry::Vacant(e) => { diff --git a/src/async_impl/easy_proto_decoder.rs b/src/async_impl/easy_proto_decoder.rs index f5972d6..1f1e97d 100644 --- a/src/async_impl/easy_proto_decoder.rs +++ b/src/async_impl/easy_proto_decoder.rs @@ -1,7 +1,7 @@ use crate::async_impl::proto_decoder::ProtoDecoder; use crate::async_impl::schema_registry::SrSettings; use crate::error::SRCError; -use protofish::Value; +use protofish::decode::Value; use std::sync::Arc; use tokio::sync::Mutex; @@ -28,7 +28,7 @@ mod tests { use crate::async_impl::easy_proto_decoder::EasyProtoDecoder; use crate::async_impl::schema_registry::SrSettings; use mockito::{mock, server_address}; - use protofish::Value; + use protofish::decode::Value; use test_utils::{get_proto_body, get_proto_hb_101, get_proto_hb_schema}; #[tokio::test] diff --git a/src/async_impl/json.rs b/src/async_impl/json.rs index 8d0da43..6f08fef 100644 --- a/src/async_impl/json.rs +++ b/src/async_impl/json.rs @@ -152,7 +152,7 @@ impl<'a> JsonDecoder<'a> { /// The actual deserialization trying to get the id from the bytes to retrieve the schema, and /// using a reader transforms the bytes to a value. async fn deserialize(&mut self, id: u32, bytes: &[u8]) -> Result { - let schema = self.get_schema(id).clone().await?; + let schema = self.schema(id).clone().await?; match serde_json::from_slice(bytes) { Ok(value) => Ok(DecodeResult { schema, value }), Err(e) => Err(SRCError::non_retryable_with_cause( @@ -163,7 +163,7 @@ impl<'a> JsonDecoder<'a> { } /// Gets the Context object, either from the cache, or from the schema registry and then putting /// it into the cache. - fn get_schema(&mut self, id: u32) -> &Shared>> { + fn schema(&mut self, id: u32) -> &Shared>> { match self.cache.entry(id) { Entry::Occupied(e) => &*e.into_mut(), Entry::Vacant(e) => { diff --git a/src/async_impl/proto_decoder.rs b/src/async_impl/proto_decoder.rs index 215d246..ae78f6c 100644 --- a/src/async_impl/proto_decoder.rs +++ b/src/async_impl/proto_decoder.rs @@ -4,7 +4,6 @@ use std::collections::HashMap; use bytes::Bytes; use futures::future::{BoxFuture, Shared}; use futures::FutureExt; -use protofish::{Context, MessageValue, Value}; use crate::async_impl::schema_registry::{ get_referenced_schema, get_schema_by_id_and_type, SrSettings, @@ -12,6 +11,8 @@ use crate::async_impl::schema_registry::{ use crate::error::SRCError; use crate::proto_resolver::{resolve_name, to_index_and_data, MessageResolver}; use crate::schema_registry_common::{get_bytes_result, BytesResult, RegisteredSchema, SchemaType}; +use protofish::context::Context; +use protofish::decode::{MessageValue, Value}; type SharedFutureOfSchemas<'a> = Shared, SRCError>>>; @@ -59,7 +60,7 @@ impl<'a> ProtoDecoder<'a> { /// The actual deserialization trying to get the id from the bytes to retrieve the schema, and /// using a reader transforms the bytes to a value. async fn deserialize(&mut self, id: u32, bytes: &[u8]) -> Result { - let vec_of_schemas = self.get_vec_of_schemas(id).clone().await?; + let vec_of_schemas = self.vec_of_schemas(id).clone().await?; let context = into_decode_context(&vec_of_schemas)?; let (index, data) = to_index_and_data(bytes); let full_name = resolve_name(&context.resolver, &index)?; @@ -68,7 +69,7 @@ impl<'a> ProtoDecoder<'a> { } /// Gets the Context object, either from the cache, or from the schema registry and then putting /// it into the cache. - fn get_vec_of_schemas(&mut self, id: u32) -> &SharedFutureOfSchemas<'a> { + fn vec_of_schemas(&mut self, id: u32) -> &SharedFutureOfSchemas<'a> { match self.cache.entry(id) { Entry::Occupied(e) => &*e.into_mut(), Entry::Vacant(e) => { @@ -132,10 +133,10 @@ async fn to_vec_of_schemas( #[cfg(test)] mod tests { use mockito::{mock, server_address}; - use protofish::Value; use crate::async_impl::proto_decoder::ProtoDecoder; use crate::async_impl::schema_registry::SrSettings; + use protofish::prelude::Value; use test_utils::{ get_proto_complex, get_proto_complex_proto_test_message, get_proto_complex_references, get_proto_hb_101, get_proto_hb_schema, get_proto_result, diff --git a/src/async_impl/proto_raw.rs b/src/async_impl/proto_raw.rs index 9a427c1..a8f89c9 100644 --- a/src/async_impl/proto_raw.rs +++ b/src/async_impl/proto_raw.rs @@ -52,7 +52,7 @@ impl<'a> ProtoRawEncoder<'a> { ) -> Result, SRCError> { let key = get_subject(&subject_name_strategy)?; let encode_context = self - .get_encoding_context(key, subject_name_strategy) + .encoding_context(key, subject_name_strategy) .clone() .await?; to_bytes(&encode_context, bytes, full_name) @@ -67,13 +67,13 @@ impl<'a> ProtoRawEncoder<'a> { ) -> Result, SRCError> { let key = get_subject(&subject_name_strategy)?; let encode_context = self - .get_encoding_context(key, subject_name_strategy) + .encoding_context(key, subject_name_strategy) .clone() .await?; to_bytes_single_message(&encode_context, bytes) } - fn get_encoding_context( + fn encoding_context( &mut self, key: String, subject_name_strategy: SubjectNameStrategy, diff --git a/src/blocking/avro.rs b/src/blocking/avro.rs index 2984e08..944204c 100644 --- a/src/blocking/avro.rs +++ b/src/blocking/avro.rs @@ -169,7 +169,7 @@ impl AvroDecoder { /// The actual deserialization trying to get the id from the bytes to retrieve the schema, and /// using a reader transforms the bytes to a value. fn deserialize(&mut self, id: u32, bytes: &[u8]) -> Result { - let schema = self.get_schema(id); + let schema = self.schema(id); let mut reader = Cursor::new(bytes); match schema { Ok(s) => match from_avro_datum(&s.parsed, &mut reader, None) { @@ -186,7 +186,7 @@ impl AvroDecoder { } } - fn get_schema(&mut self, id: u32) -> &Result { + fn schema(&mut self, id: u32) -> &Result { let sr_settings = &self.sr_settings; match self.cache.entry(id) { Entry::Occupied(e) => &*e.into_mut(), diff --git a/src/blocking/json.rs b/src/blocking/json.rs index e8d4f36..b25eb59 100644 --- a/src/blocking/json.rs +++ b/src/blocking/json.rs @@ -138,7 +138,7 @@ impl JsonDecoder { /// The actual deserialization trying to get the id from the bytes to retrieve the schema, and /// using a reader transforms the bytes to a value. fn deserialize(&mut self, id: u32, bytes: &[u8]) -> Result { - let schema = self.get_schema(id)?; + let schema = self.schema(id)?; match serde_json::from_slice(bytes) { Ok(value) => Ok(DecodeResult { schema, value }), Err(e) => Err(SRCError::non_retryable_with_cause( @@ -149,7 +149,7 @@ impl JsonDecoder { } /// Gets the Context object, either from the cache, or from the schema registry and then putting /// it into the cache. - fn get_schema(&mut self, id: u32) -> Result { + fn schema(&mut self, id: u32) -> Result { let url = match self.cache.entry(id) { Entry::Occupied(e) => &*e.into_mut(), Entry::Vacant(e) => { diff --git a/src/blocking/proto_decoder.rs b/src/blocking/proto_decoder.rs index da45494..b680d66 100644 --- a/src/blocking/proto_decoder.rs +++ b/src/blocking/proto_decoder.rs @@ -2,7 +2,6 @@ use std::collections::hash_map::{Entry, RandomState}; use std::collections::HashMap; use bytes::Bytes; -use protofish::{Context, MessageValue, Value}; use crate::blocking::schema_registry::{ get_referenced_schema, get_schema_by_id_and_type, SrSettings, @@ -10,6 +9,8 @@ use crate::blocking::schema_registry::{ use crate::error::SRCError; use crate::proto_resolver::{resolve_name, to_index_and_data, MessageResolver}; use crate::schema_registry_common::{get_bytes_result, BytesResult, RegisteredSchema, SchemaType}; +use protofish::decode::{MessageValue, Value}; +use protofish::context::Context; #[derive(Debug)] pub struct ProtoDecoder { @@ -52,7 +53,7 @@ impl ProtoDecoder { /// The actual deserialization trying to get the id from the bytes to retrieve the schema, and /// using a reader transforms the bytes to a value. fn deserialize(&mut self, id: u32, bytes: &[u8]) -> Result { - match self.get_context(id) { + match self.context(id) { Ok(s) => { let (index, data) = to_index_and_data(bytes); let full_name = resolve_name(&s.resolver, &index)?; @@ -64,7 +65,7 @@ impl ProtoDecoder { } /// Gets the Context object, either from the cache, or from the schema registry and then putting /// it into the cache. - fn get_context(&mut self, id: u32) -> &Result { + fn context(&mut self, id: u32) -> &Result { match self.cache.entry(id) { Entry::Occupied(e) => &*e.into_mut(), Entry::Vacant(e) => { @@ -117,10 +118,10 @@ fn to_resolve_context( #[cfg(test)] mod tests { use mockito::{mock, server_address}; - use protofish::Value; use crate::blocking::proto_decoder::ProtoDecoder; use crate::blocking::schema_registry::SrSettings; + use protofish::decode::Value; use test_utils::{ get_proto_body, get_proto_body_with_reference, get_proto_complex, get_proto_complex_proto_test_message, get_proto_complex_references, get_proto_hb_101, diff --git a/src/blocking/proto_raw.rs b/src/blocking/proto_raw.rs index 92c2780..4a1f7ce 100644 --- a/src/blocking/proto_raw.rs +++ b/src/blocking/proto_raw.rs @@ -44,7 +44,7 @@ impl ProtoRawEncoder { subject_name_strategy: &SubjectNameStrategy, ) -> Result, SRCError> { let key = get_subject(subject_name_strategy)?; - match self.get_encoding_context(key, subject_name_strategy) { + match self.encoding_context(key, subject_name_strategy) { Ok(encode_context) => to_bytes(encode_context, bytes, full_name), Err(e) => Err(Clone::clone(e)), } @@ -56,13 +56,13 @@ impl ProtoRawEncoder { subject_name_strategy: &SubjectNameStrategy, ) -> Result, SRCError> { let key = get_subject(subject_name_strategy)?; - match self.get_encoding_context(key, subject_name_strategy) { + match self.encoding_context(key, subject_name_strategy) { Ok(encode_context) => to_bytes_single_message(encode_context, bytes), Err(e) => Err(Clone::clone(e)), } } - fn get_encoding_context( + fn encoding_context( &mut self, key: String, subject_name_strategy: &SubjectNameStrategy, @@ -121,7 +121,7 @@ impl ProtoRawDecoder { /// The actual deserialization trying to get the id from the bytes to retrieve the schema, and /// using a reader transforms the bytes to a value. fn deserialize(&mut self, id: u32, bytes: &[u8]) -> Result { - match self.get_context(id) { + match self.context(id) { Ok(s) => { let (index, data) = to_index_and_data(bytes); let full_name = resolve_name(&s.resolver, &index)?; @@ -136,7 +136,7 @@ impl ProtoRawDecoder { } /// Gets the Context object, either from the cache, or from the schema registry and then putting /// it into the cache. - fn get_context(&mut self, id: u32) -> &Result { + fn context(&mut self, id: u32) -> &Result { match self.cache.entry(id) { Entry::Occupied(e) => &*e.into_mut(), Entry::Vacant(e) => {