From 12c08021a63d0cbb024619762157ecf358c6e84f Mon Sep 17 00:00:00 2001 From: Roman Shtylman Date: Sun, 22 Oct 2023 15:09:51 -0700 Subject: [PATCH] add note on arrays of schemas --- rust/examples/avro/src/bin/writer.rs | 143 ++++++++++++++++++++------- website/docs/spec/registry.md | 18 ++++ 2 files changed, 125 insertions(+), 36 deletions(-) diff --git a/rust/examples/avro/src/bin/writer.rs b/rust/examples/avro/src/bin/writer.rs index c52d1c1e57..afb72bbc7d 100644 --- a/rust/examples/avro/src/bin/writer.rs +++ b/rust/examples/avro/src/bin/writer.rs @@ -41,6 +41,14 @@ struct PosesInFrame { poses: Vec, } +#[derive(Debug, Serialize)] + +struct Custom { + my_field: String, + point_a: Vector3, + point_b: Vector3, +} + fn main() { let raw_schema_time = r#" { @@ -101,16 +109,28 @@ fn main() { ] }"#; - let schemas = Schema::parse_list(&[ - raw_schema_time, - raw_schema_vector, - raw_schema_quaternion, - raw_schema_pose, - raw_schema_poses_in_frame, - ]) - .unwrap(); + let raw_schema_custom = r#" + { + "type": "record", + "name": "Custom", + "namespace": "another", + "fields": [ + { "name": "my_field", "type": "string" }, + { "name": "point_a", "type": { + "type": "record", + "name": "foxglove.Vector3", + "fields": [ + { "name": "x", "type": "double" }, + { "name": "y", "type": "double" }, + { "name": "z", "type": "double" } + ] + }}, + { "name": "point_b", "type": "foxglove.Vector3" } + ] + }"#; - // for multiple schemas we need to write them as an array + // We can also write multiple schemas as an array. A schema definition must + // appear before it is used. 
let arr = format!( "[{}]", vec![ @@ -123,19 +143,32 @@ fn main() { .join(",") ); - let schema_b = mcap::Schema { + let schema_poses_in_frame = mcap::Schema { name: "foxglove.PosesInFrame".to_string(), encoding: "avro".to_string(), data: Cow::Borrowed(arr.as_bytes()), }; let channel_poses = mcap::Channel { - schema: Some(Arc::new(schema_b.to_owned())), + schema: Some(Arc::new(schema_poses_in_frame.to_owned())), topic: "poses".to_string(), message_encoding: "avro".to_string(), metadata: std::collections::BTreeMap::new(), }; + let schema_custom = mcap::Schema { + name: "another.Custom".to_string(), + encoding: "avro".to_string(), + data: Cow::Borrowed(raw_schema_custom.as_bytes()), + }; + + let channel_custom = mcap::Channel { + schema: Some(Arc::new(schema_custom.to_owned())), + topic: "custom".to_string(), + message_encoding: "avro".to_string(), + metadata: std::collections::BTreeMap::new(), + }; + let mut avro_mcap = mcap::Writer::new(BufWriter::new(File::create("avro.mcap").unwrap())).unwrap(); @@ -144,13 +177,6 @@ fn main() { .expect("Couldn't write channel"); { - // fetch_schema_ref? 
but not accessible cause we don't get the parser that parse_list uses - let time_schema = schemas.get(0).unwrap(); - let vector3_schema = schemas.get(1).unwrap(); - let quat_schema = schemas.get(2).unwrap(); - let pose_schema = schemas.get(3).unwrap(); - let poses_schema = schemas.get(4).unwrap(); - let pose_1 = Pose { position: Vector3 { x: 0.0, @@ -188,24 +214,69 @@ fn main() { poses: vec![pose_1, pose_2], }; - { - let encoded = apache_avro::to_avro_datum_schemata( - &poses_schema, - [time_schema, vector3_schema, quat_schema, pose_schema].into(), - apache_avro::to_value(&poses).unwrap(), - ) - .unwrap(); - - let message = mcap::Message { - channel: Arc::new(channel_poses.to_owned()), - data: Cow::from(encoded), - log_time: 1000000, - publish_time: 0, - sequence: 0, - }; - - avro_mcap.write(&message).unwrap(); - } + let schemas = Schema::parse_list(&[ + raw_schema_time, + raw_schema_vector, + raw_schema_quaternion, + raw_schema_pose, + raw_schema_poses_in_frame, + ]) + .unwrap(); + + // fetch_schema_ref? 
but not accessible cause we don't get the parser that parse_list uses + let time_schema = schemas.get(0).unwrap(); + let vector3_schema = schemas.get(1).unwrap(); + let quat_schema = schemas.get(2).unwrap(); + let pose_schema = schemas.get(3).unwrap(); + let poses_schema = schemas.get(4).unwrap(); + + let encoded = apache_avro::to_avro_datum_schemata( + &poses_schema, + [time_schema, vector3_schema, quat_schema, pose_schema].into(), + apache_avro::to_value(&poses).unwrap(), + ) + .unwrap(); + + let message = mcap::Message { + channel: Arc::new(channel_poses.to_owned()), + data: Cow::from(encoded), + log_time: 1000000, + publish_time: 0, + sequence: 0, + }; + + avro_mcap.write(&message).unwrap(); + } + + { + let custom = Custom { + my_field: "custom field".to_string(), + point_a: Vector3 { + x: 1.0, + y: 2.0, + z: 3.0, + }, + point_b: Vector3 { + x: 4.0, + y: 5.0, + z: 6.0, + }, + }; + + let schema = Schema::parse_str(&raw_schema_custom).unwrap(); + + let encoded = + apache_avro::to_avro_datum(&schema, apache_avro::to_value(&custom).unwrap()).unwrap(); + + let message = mcap::Message { + channel: Arc::new(channel_custom.to_owned()), + data: Cow::from(encoded), + log_time: 1000000, + publish_time: 0, + sequence: 0, + }; + + avro_mcap.write(&message).unwrap(); } avro_mcap.finish().unwrap(); diff --git a/website/docs/spec/registry.md b/website/docs/spec/registry.md index 71a92a99c2..7700e317b8 100644 --- a/website/docs/spec/registry.md +++ b/website/docs/spec/registry.md @@ -42,6 +42,10 @@ The Channel `message_encoding` field describes the encoding for all messages wit - `message_encoding`: [`json`](https://www.json.org/json-en.html) +### avro + +- `message_encoding`: [`avro`](https://avro.apache.org/) (binary encoding) + ## Schema encodings The Schema `encoding` field describes the encoding of a Channel's schema. Typically, this is related to the Channel's `message_encoding`, but they are separate concepts (e.g. there are multiple schema languages for `json`). 
@@ -186,6 +190,20 @@ For this example, `schema.name` should be set to `top_level_module::my_module::M - `encoding`: `jsonschema` - `data`: [JSON Schema](https://json-schema.org) +### avro + +- `name`: Fully qualified name of the record type (including namespace), e.g. `example.MyRecord` +- `encoding`: `avro` +- `data`: UTF-8 encoded JSON object or array with a valid [Avro schema declaration](https://avro.apache.org/docs/1.11.1/specification/#schema-declaration) + +In Avro schemas, a name must be defined before it is used, as noted in the Avro specification: + +> Further, a name must be defined before it is used (“before” in the depth-first, left-to-right traversal of the JSON parse tree, where the types attribute of a protocol is always deemed to come “before” the messages attribute.) + +You can define a name inline using a single schema object for `data` or an array of schema objects. If the `data` is an array of schemas, the `name` must reference a single +"record" within the array of schemas. This referenced record type will be used as the schema for the +channel and messages. The array of schemas does NOT represent a union type for channel messages. + ## Profiles ### ROS1