Skip to content

Commit

Permalink
add note on arrays of schemas
Browse files Browse the repository at this point in the history
  • Loading branch information
defunctzombie committed Oct 22, 2023
1 parent d330fc2 commit 12c0802
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 36 deletions.
143 changes: 107 additions & 36 deletions rust/examples/avro/src/bin/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ struct PosesInFrame {
poses: Vec<Pose>,
}

#[derive(Debug, Serialize)]

struct Custom {
my_field: String,
point_a: Vector3,
point_b: Vector3,
}

fn main() {
let raw_schema_time = r#"
{
Expand Down Expand Up @@ -101,16 +109,28 @@ fn main() {
]
}"#;

let schemas = Schema::parse_list(&[
raw_schema_time,
raw_schema_vector,
raw_schema_quaternion,
raw_schema_pose,
raw_schema_poses_in_frame,
])
.unwrap();
let raw_schema_custom = r#"
{
"type": "record",
"name": "Custom",
"namespace": "another",
"fields": [
{ "name": "my_field", "type": "string" },
{ "name": "point_a", "type": {
"type": "record",
"name": "foxglove.Vector3",
"fields": [
{ "name": "x", "type": "double" },
{ "name": "y", "type": "double" },
{ "name": "z", "type": "double" }
]
}},
{ "name": "point_b", "type": "foxglove.Vector3" }
]
}"#;

// for multiple schemas we need to write them as an array
// We can also write multiple schemas as an array. A schema definition must
// appear before it is used.
let arr = format!(
"[{}]",
vec![
Expand All @@ -123,19 +143,32 @@ fn main() {
.join(",")
);

let schema_b = mcap::Schema {
let schema_poses_in_frame = mcap::Schema {
name: "foxglove.PosesInFrame".to_string(),
encoding: "avro".to_string(),
data: Cow::Borrowed(arr.as_bytes()),
};

let channel_poses = mcap::Channel {
schema: Some(Arc::new(schema_b.to_owned())),
schema: Some(Arc::new(schema_poses_in_frame.to_owned())),
topic: "poses".to_string(),
message_encoding: "avro".to_string(),
metadata: std::collections::BTreeMap::new(),
};

let schema_custom = mcap::Schema {
name: "another.Custom".to_string(),
encoding: "avro".to_string(),
data: Cow::Borrowed(raw_schema_custom.as_bytes()),
};

let channel_custom = mcap::Channel {
schema: Some(Arc::new(schema_custom.to_owned())),
topic: "custom".to_string(),
message_encoding: "avro".to_string(),
metadata: std::collections::BTreeMap::new(),
};

let mut avro_mcap =
mcap::Writer::new(BufWriter::new(File::create("avro.mcap").unwrap())).unwrap();

Expand All @@ -144,13 +177,6 @@ fn main() {
.expect("Couldn't write channel");

{
// fetch_schema_ref? but not accessible cause we don't get the parser that parse_list uses
let time_schema = schemas.get(0).unwrap();
let vector3_schema = schemas.get(1).unwrap();
let quat_schema = schemas.get(2).unwrap();
let pose_schema = schemas.get(3).unwrap();
let poses_schema = schemas.get(4).unwrap();

let pose_1 = Pose {
position: Vector3 {
x: 0.0,
Expand Down Expand Up @@ -188,24 +214,69 @@ fn main() {
poses: vec![pose_1, pose_2],
};

{
let encoded = apache_avro::to_avro_datum_schemata(
&poses_schema,
[time_schema, vector3_schema, quat_schema, pose_schema].into(),
apache_avro::to_value(&poses).unwrap(),
)
.unwrap();

let message = mcap::Message {
channel: Arc::new(channel_poses.to_owned()),
data: Cow::from(encoded),
log_time: 1000000,
publish_time: 0,
sequence: 0,
};

avro_mcap.write(&message).unwrap();
}
let schemas = Schema::parse_list(&[
raw_schema_time,
raw_schema_vector,
raw_schema_quaternion,
raw_schema_pose,
raw_schema_poses_in_frame,
])
.unwrap();

// fetch_schema_ref? but not accessible cause we don't get the parser that parse_list uses
let time_schema = schemas.get(0).unwrap();
let vector3_schema = schemas.get(1).unwrap();
let quat_schema = schemas.get(2).unwrap();
let pose_schema = schemas.get(3).unwrap();
let poses_schema = schemas.get(4).unwrap();

let encoded = apache_avro::to_avro_datum_schemata(
&poses_schema,
[time_schema, vector3_schema, quat_schema, pose_schema].into(),
apache_avro::to_value(&poses).unwrap(),
)
.unwrap();

let message = mcap::Message {
channel: Arc::new(channel_poses.to_owned()),
data: Cow::from(encoded),
log_time: 1000000,
publish_time: 0,
sequence: 0,
};

avro_mcap.write(&message).unwrap();
}

{
let custom = Custom {
my_field: "custom field".to_string(),
point_a: Vector3 {
x: 1.0,
y: 2.0,
z: 3.0,
},
point_b: Vector3 {
x: 4.0,
y: 5.0,
z: 6.0,
},
};

let schema = Schema::parse_str(&raw_schema_custom).unwrap();

let encoded =
apache_avro::to_avro_datum(&schema, apache_avro::to_value(&custom).unwrap()).unwrap();

let message = mcap::Message {
channel: Arc::new(channel_custom.to_owned()),
data: Cow::from(encoded),
log_time: 1000000,
publish_time: 0,
sequence: 0,
};

avro_mcap.write(&message).unwrap();
}

avro_mcap.finish().unwrap();
Expand Down
18 changes: 18 additions & 0 deletions website/docs/spec/registry.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ The Channel `message_encoding` field describes the encoding for all messages wit

- `message_encoding`: [`json`](https://www.json.org/json-en.html)

### avro

- `message_encoding`: [`avro`](https://avro.apache.org/) (binary encoding)

## Schema encodings

The Schema `encoding` field describes the encoding of a Channel's schema. Typically, this is related to the Channel's `message_encoding`, but they are separate concepts (e.g. there are multiple schema languages for `json`).
Expand Down Expand Up @@ -186,6 +190,20 @@ For this example, `schema.name` should be set to `top_level_module::my_module::M
- `encoding`: `jsonschema`
- `data`: [JSON Schema](https://json-schema.org)

### avro

- `name`: Fully qualified name of the record type (including namespace), e.g. `example.MyRecord`
- `encoding`: `avro`
- `data`: utf8 encoded json object or array with a valid [AVRO schema declaration](https://avro.apache.org/docs/1.11.1/specification/#schema-declaration)

In AVRO schemas a name must be defined before used as noted in the AVRO specification:

> Further, a name must be defined before it is used (“before” in the depth-first, left-to-right traversal of the JSON parse tree, where the types attribute of a protocol is always deemed to come “before” the messages attribute.)
You can define a name inline using a single schema object for `data` or an array of schema objects. If the `data` is an array of schemas, the `name` must reference a single
"record" within the array of schemas. This referenced record type will be used as the schema for the
channel and messages. The array of schemas DO NOT represent a union type for channel messages.

## Profiles

### ROS1
Expand Down

0 comments on commit 12c0802

Please sign in to comment.