fix #39 and prepare for 2.0.1
gklijs committed Nov 10, 2020
1 parent 9242bd9 commit 14e2247
Showing 9 changed files with 64 additions and 132 deletions.
44 changes: 20 additions & 24 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "schema_registry_converter"
version = "2.0.0"
version = "2.0.1"
authors = ["Gerard Klijs <[email protected]>"]
include = ["src/**/*", "Cargo.toml"]
description = "Encode/decode data from/to kafka using the Confluent Schema Registry"
@@ -21,26 +21,37 @@ proto_raw = ["integer-encoding", "logos"]
kafka_test = []
default = ["futures"]

[dependencies.avro-rs]
[dependencies.byteorder]
version = "^1.3"

[dependencies.failure]
version = "^0.1"

[dependencies.reqwest]
version = "^0.10"
features = ["json"]

[dependencies.serde]
version = "^1.0"
features = ["derive"]

[dependencies.serde_json]
version = "^1.0"

[dependencies.avro-rs]
version = "^0.11"
optional = true

[dependencies.bytes]
version = "^0.5"
optional = true

[dependencies.byteorder]
version = "^1.3"

[dependencies.failure]
version = "^0.1"

[dependencies.futures]
version = "^0.3"
optional = true

[dependencies.integer-encoding]
version = "^1.1"
version = "^2.1"
optional = true

[dependencies.logos]
@@ -51,21 +62,6 @@ optional = true
version = "^0.2"
optional = true

[dependencies.reqwest]
version = "^0.10"
features = ["json"]

[dependencies.serde]
version = "^1.0"
features = ["derive"]

[dependencies.serde_json]
version = "^1.0"

[dependencies.tokio]
version = "0.2.22"
optional = true

[dependencies.url]
version = "^2"
optional = true
11 changes: 9 additions & 2 deletions README.md
@@ -35,7 +35,7 @@ To use it to convert using avro async use:

```toml
[dependencies]
schema_registry_converter = { version = "2.0.0", features = ["avro"] }
schema_registry_converter = { version = "2.0.1", features = ["avro"] }
```

...and see the [docs](https://docs.rs/schema_registry_converter) for how to use it.
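
For orientation, a minimal async decoding sketch is below; `SrSettings`, `AvroDecoder`, and the public `error` field on `SRCError` appear in this commit's diff, while the module paths, registry URL, and error handling are illustrative assumptions.

```rust
use schema_registry_converter::async_impl::avro::AvroDecoder;
use schema_registry_converter::async_impl::schema_registry::SrSettings;

// Decode a Confluent-framed Avro payload, e.g. the value bytes of a Kafka record.
async fn decode_payload(bytes: &[u8]) {
    // The registry URL is an assumption for illustration.
    let sr_settings = SrSettings::new(String::from("http://localhost:8081"));
    let mut decoder = AvroDecoder::new(sr_settings);
    match decoder.decode(Some(bytes)).await {
        Ok(_value) => println!("decoded payload"),
        // SRCError exposes a public `error` field, as the tests in this commit show.
        Err(e) => eprintln!("decode failed: {}", e.error),
    }
}
```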
@@ -44,7 +44,14 @@ All the converters also have a blocking (non async) version, in that case use so

```toml
[dependencies]
schema_registry_converter = { version = "2.0.0", default-features = false, features = ["avro", "blocking"]}
schema_registry_converter = { version = "2.0.1", default-features = false, features = ["avro", "blocking"]}
```

If you need to use both the blocking and the async flavour in one project, you can use something like the following, but be wary to import the correct paths depending on your use (see the sketch after the snippet).

```toml
[dependencies]
schema_registry_converter = { version = "2.0.1", features = ["avro", "blocking"]}
```
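
A minimal sketch of keeping the two flavours apart by aliasing the blocking imports; the module paths follow the crate layout shown in this commit, while the alias names and registry URL are my own.

```rust
// Async flavour (available through the default `futures` feature).
use schema_registry_converter::async_impl::avro::AvroDecoder;
use schema_registry_converter::async_impl::schema_registry::SrSettings;
// Blocking flavour, aliased so the two sets of types cannot be mixed up.
use schema_registry_converter::blocking::avro::AvroDecoder as BlockingAvroDecoder;
use schema_registry_converter::blocking::schema_registry::SrSettings as BlockingSrSettings;

fn build_decoders() {
    // The registry URL is an assumption for illustration.
    let url = String::from("http://localhost:8081");
    let _async_decoder = AvroDecoder::new(SrSettings::new(url.clone()));
    let _blocking_decoder = BlockingAvroDecoder::new(BlockingSrSettings::new(url));
}
```

With the aliases in place it is immediately visible which flavour a given call site uses.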

# Example with consumer and producer using Avro
7 changes: 6 additions & 1 deletion RELEASE_NOTES.md
@@ -1,5 +1,9 @@
## Release notes

### 2.0.1

Maintenance release that mainly updates dependencies; it also makes the blocking `SrSettings` cloneable and no longer requires the `kafka_test` feature to use both the blocking and the async implementation in the same project.
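
A minimal sketch of what the cloneable blocking `SrSettings` enables, assuming a local registry URL; the constructor names match those used elsewhere in this commit, but treat the exact signatures as an assumption.

```rust
use schema_registry_converter::blocking::avro::{AvroDecoder, AvroEncoder};
use schema_registry_converter::blocking::schema_registry::SrSettings;

fn main() {
    // SrSettings only needs the schema registry url.
    let sr_settings = SrSettings::new(String::from("http://localhost:8081"));
    // As of 2.0.1 the blocking SrSettings can be cloned, so one settings
    // value can back both an encoder and a decoder.
    let _decoder = AvroDecoder::new(sr_settings.clone());
    let _encoder = AvroEncoder::new(sr_settings);
}
```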

### 2.0.0

This release has a breaking change in the SubjectNameStrategy: the supplied schema is now in a Box, to keep the size of the enum smaller.
@@ -10,7 +14,7 @@ Another major change is by default support for async.

To use the new version of the library while continuing to use it in a blocking way, as before, configure it like:
```toml
schema_registry_converter = { version = "2.0.0", default-features = false, features = ["avro", "blocking"]}
schema_registry_converter = { version = "2.0.1", default-features = false, features = ["avro", "blocking"]}
```
The converters have also moved to the blocking module, and to create them you need an `SrSettings` object, which can be created with just the schema registry url (as sketched under the 2.0.1 notes above).
@@ -57,5 +61,6 @@ instead of the `encode` function on the encoder.

#### Contributors

- [@cbzehner](https://github.com/cbzehner)
- [@kitsuneninetails](https://github.com/kitsuneninetails)
- [@j-halbert](https://github.com/j-halbert)
40 changes: 8 additions & 32 deletions src/async_impl/avro.rs
@@ -710,16 +710,9 @@ mod tests {

let sr_settings = SrSettings::new(format!("http://{}", server_address()));
let mut decoder = AvroDecoder::new(sr_settings);
let heartbeat = decoder.decode(Some(&[0, 0, 0, 0, 1])).await.unwrap_err();
let error = decoder.decode(Some(&[0, 0, 0, 0, 1])).await.unwrap_err();

assert_eq!(
heartbeat,
SRCError::new(
"Could not transform bytes using schema",
Some(String::from("failed to fill whole buffer")),
false,
)
)
assert_eq!(error.error, "Could not transform bytes using schema")
}

#[tokio::test]
@@ -732,16 +725,9 @@ mod tests {

let sr_settings = SrSettings::new(format!("http://{}", server_address()));
let mut decoder = AvroDecoder::new(sr_settings);
let heartbeat = decoder.decode(Some(&[0, 0, 0, 0, 1])).await.unwrap_err();
let error = decoder.decode(Some(&[0, 0, 0, 0, 1])).await.unwrap_err();

assert_eq!(
heartbeat,
SRCError::new(
"Could not transform bytes using schema",
Some(String::from("failed to fill whole buffer")),
false,
)
)
assert_eq!(error.error, "Could not transform bytes using schema")
}

#[tokio::test]
@@ -836,14 +822,8 @@ mod tests {
let err = decoder.decode(Some(&[0, 0, 0, 0, 1, 6])).await.unwrap_err();

assert_eq!(
err,
SRCError::new(
"Supplied raw value \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"Heartbeat\\\",\\\"namespace\\\":\\\"nl.openweb.data\\\"}\" cant be turned into a Schema",
Some(String::from(
"Failed to parse schema: No `fields` in record"
)),
false,
)
err.error,
"Supplied raw value \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"Heartbeat\\\",\\\"namespace\\\":\\\"nl.openweb.data\\\"}\" cant be turned into a Schema",
)
}

@@ -1308,12 +1288,8 @@ mod tests {
.await
.unwrap_err();
assert_eq!(
result,
SRCError::new(
"Supplied raw value \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"Name\\\"}\" cant be turned into a Schema",
Some(String::from("Failed to parse schema: No `fields` in record")),
false,
)
result.error,
"Supplied raw value \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"Name\\\"}\" cant be turned into a Schema"
)
}

41 changes: 8 additions & 33 deletions src/avro_common.rs
@@ -1,7 +1,7 @@
use std::collections::HashSet;

use avro_rs::schema::{Name, Schema};
use avro_rs::types::{Record, ToAvro, Value};
use avro_rs::types::{Record, Value};
use avro_rs::{to_avro_datum, to_value};
use serde::ser::Serialize;
use serde_json::{value, Map};
@@ -95,7 +95,7 @@ pub(crate) fn replace_reference(parent: value::Value, child: value::Value) -> va
}
}

fn to_bytes<T: ToAvro>(avro_schema: &AvroSchema, record: T) -> Result<Vec<u8>, SRCError> {
fn to_bytes(avro_schema: &AvroSchema, record: Value) -> Result<Vec<u8>, SRCError> {
match to_avro_datum(&avro_schema.parsed, record) {
Ok(v) => Ok(get_payload(avro_schema.id, v)),
Err(e) => Err(SRCError::non_retryable_with_cause(
@@ -124,7 +124,7 @@ pub(crate) fn values_to_bytes(
for value in values {
record.put(value.0, value.1)
}
to_bytes(avro_schema, record)
to_bytes(avro_schema, Value::from(record))
}

/// Using the schema with an item implementing serialize the item will be correctly deserialized
@@ -200,17 +200,8 @@ mod tests {
raw: String::from(r#"{"type":"record","name":"Name","namespace":"nl.openweb.data","fields":[{"name":"name","type":"string","avro.java.string":"String"}]}"#),
parsed: Schema::parse_str(r#"{"type":"record","name":"Name","namespace":"nl.openweb.data","fields":[{"name":"name","type":"string","avro.java.string":"String"}]}"#).unwrap(),
};
let result = values_to_bytes(&schema, vec![("beat", Value::Long(3))]);
assert_eq!(
result,
Err(SRCError::new(
"Could not get Avro bytes",
Some(String::from(
"Validation error: value does not match schema"
)),
false,
))
)
let err = values_to_bytes(&schema, vec![("beat", Value::Long(3))]).unwrap_err();
assert_eq!(err.error, "Could not get Avro bytes")
}
#[test]
fn item_to_bytes_no_tranfer_wrong() {
@@ -224,16 +215,7 @@ mod tests {
).unwrap(),
};
let err = crate::avro_common::item_to_bytes(&schema, Heartbeat { beat: 3 }).unwrap_err();
assert_eq!(
err,
SRCError::new(
"Failed to resolve",
Some(String::from(
"Schema resoulution error: missing field name in record"
)),
false,
)
)
assert_eq!(err.error, "Failed to resolve")
}

#[test]
Expand All @@ -253,14 +235,7 @@ mod tests {
],
a_type: Atype::Manual,
};
let result = crate::avro_common::item_to_bytes(&schema, item).unwrap_err();
assert_eq!(
result,
SRCError::new(
"Failed to resolve",
Some(String::from("Schema resoulution error: String expected, got Array([Int(204), Int(240), Int(237), Int(74), Int(227), Int(188), Int(75), Int(46), Int(183), Int(163), Int(122), Int(214), Int(178), Int(72), Int(118), Int(162)])")),
false,
)
)
let err = crate::avro_common::item_to_bytes(&schema, item).unwrap_err();
assert_eq!(err.error, "Failed to resolve")
}
}
47 changes: 10 additions & 37 deletions src/blocking/avro.rs
@@ -666,16 +666,9 @@ mod tests {

let sr_settings = SrSettings::new(format!("http://{}", server_address()));
let mut decoder = AvroDecoder::new(sr_settings);
let heartbeat = decoder.decode(Some(&[0, 0, 0, 0, 1]));
let err = decoder.decode(Some(&[0, 0, 0, 0, 1])).unwrap_err();

assert_eq!(
heartbeat,
Err(SRCError::new(
"Could not transform bytes using schema",
Some(String::from("failed to fill whole buffer")),
false,
))
)
assert_eq!(err.error, "Could not transform bytes using schema")
}

#[test]
@@ -688,16 +681,9 @@ mod tests {

let sr_settings = SrSettings::new(format!("http://{}", server_address()));
let mut decoder = AvroDecoder::new(sr_settings);
let heartbeat = decoder.decode(Some(&[0, 0, 0, 0, 1]));
let err = decoder.decode(Some(&[0, 0, 0, 0, 1])).unwrap_err();

assert_eq!(
heartbeat,
Err(SRCError::new(
"Could not transform bytes using schema",
Some(String::from("failed to fill whole buffer")),
false,
))
)
assert_eq!(err.error, "Could not transform bytes using schema")
}

#[test]
@@ -790,17 +776,11 @@ mod tests {

let sr_settings = SrSettings::new(format!("http://{}", server_address()));
let mut decoder = AvroDecoder::new(sr_settings);
let heartbeat = decoder.decode(Some(&[0, 0, 0, 0, 1, 6]));
let err = decoder.decode(Some(&[0, 0, 0, 0, 1, 6])).unwrap_err();

assert_eq!(
heartbeat,
Err(SRCError::new(
"Supplied raw value \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"Heartbeat\\\",\\\"namespace\\\":\\\"nl.openweb.data\\\"}\" cant be turned into a Schema",
Some(String::from(
"Failed to parse schema: No `fields` in record"
)),
false,
))
err.error,
"Supplied raw value \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"Heartbeat\\\",\\\"namespace\\\":\\\"nl.openweb.data\\\"}\" cant be turned into a Schema",
)
}

@@ -1241,17 +1221,10 @@ mod tests {
references: vec![],
};
let sr_settings = SrSettings::new(format!("http://{}", server_address()));
let result = match to_avro_schema(&sr_settings, registered_schema) {
Err(e) => e,
_ => panic!(),
};
let err = to_avro_schema(&sr_settings, registered_schema).unwrap_err();
assert_eq!(
result,
SRCError::new(
"Supplied raw value \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"Name\\\"}\" cant be turned into a Schema",
Some(String::from("Failed to parse schema: No `fields` in record")),
false,
)
err.error,
"Supplied raw value \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"Name\\\"}\" cant be turned into a Schema"
)
}

2 changes: 1 addition & 1 deletion src/lib.rs
@@ -21,7 +21,7 @@
//! automatically does retries.
//!
//! [avro-rs]: https://crates.io/crates/avro-rs
#[cfg(any(not(feature = "blocking"), feature = "kafka_test"))]
#[cfg(feature = "futures")]
pub mod async_impl;
#[cfg(feature = "avro")]
pub mod avro_common;
2 changes: 1 addition & 1 deletion tests/blocking/avro_consumer.rs
@@ -20,7 +20,7 @@ pub fn consume_avro(
registry: String,
topics: &[&str],
auto_commit: bool,
test: Box<dyn Fn(DeserializedAvroRecord) -> ()>,
test: Box<dyn Fn(DeserializedAvroRecord)>,
) {
let sr_settings = SrSettings::new(registry);
let mut decoder = AvroDecoder::new(sr_settings);
2 changes: 1 addition & 1 deletion tests/blocking/avro_tests.rs
@@ -27,7 +27,7 @@ fn get_heartbeat_schema() -> Box<SuppliedSchema> {
})
}

fn test_beat_value(key_value: i64, value_value: i64) -> Box<dyn Fn(DeserializedAvroRecord) -> ()> {
fn test_beat_value(key_value: i64, value_value: i64) -> Box<dyn Fn(DeserializedAvroRecord)> {
Box::new(move |rec: DeserializedAvroRecord| {
println!("testing record {:#?}", rec);
let key_values = match rec.key {
