diff --git a/Cargo.toml b/Cargo.toml index 4dbbe07f..3c8cb97b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,11 +101,11 @@ blocks_in_conditions = "allow" # opentelemetry tracing macro would be noisy [workspace.dependencies] actix-cors = { version = "0.7" } -actix-multipart = { version = "0.6", features = ["derive", "tempfile"] } +actix-multipart = { version = "0.7", features = ["derive", "tempfile"] } actix-web = { version = "4.6", default-features = false, features = ["macros"] } actix-web-opentelemetry = { version = "0.18", features = ["metrics"] } anyhow = { version = "1.0", features = ["backtrace"] } -arrow = { version = "51" } # should be synced with deltalake and lancedb +arrow = { version = "52" } # should be synced with deltalake and lancedb argon2 = { version = "0.5" } async-nats = { version = "0.35", default-features = false, features = [ "server_2_10", @@ -122,7 +122,7 @@ chrono = { version = "0.4", features = ["serde"] } clap = { version = "4.5", features = ["derive", "env", "string"] } csv = { version = "1.3" } ctrlc = { version = "3.4" } -deltalake = { version = "0.17", features = [ +deltalake = { version = "0.18", features = [ "datafusion", "datafusion-ext", "json", @@ -142,7 +142,7 @@ egui = { version = "0.27" } egui_graphs = { version = "0.20" } email_address = { version = "0.2" } futures = { version = "0.3" } -gethostname = { version = "0.4" } +gethostname = { version = "0.5" } glob = { version = "0.3" } hickory-server = { version = "*", default-features = false, features = [ "backtrace", @@ -154,15 +154,14 @@ inflector = { package = "Inflector", version = "0.11" } ipnet = { version = "2.9", features = ["schemars", "serde"] } itertools = { version = "0.13" } k8s-openapi = { version = "0.22", features = ["latest", "schemars"] } -kube = { version = "0.92", default-features = false } +kube = { version = "0.93", default-features = false } lalrpop = { version = "0.20" } lalrpop-util = { version = "0.20", features = ["lexer", "unicode"] } -lancedb = { version = "0.5", default-features = false } +lancedb = { version = "0.7", default-features = false } # langchain-rust = { version = "4.1", default-features = false } mime = { version = "0.3" } # FIXME: push a PR: rustls-tls feature support minio = { git = "https://github.com/ulagbulag/minio-rs.git", version = "0.2.0-alpha", default-features = false } # not deployed to crates.io -map-macro = { version = "0.3" } maplit = { version = "1.0" } ndarray = { version = "0.15" } num-traits = { version = "0.2" } @@ -173,18 +172,18 @@ octocrab = { version = "0.38", default-features = false, features = [ opencv = { version = "0.92", default-features = false, features = [ "clang-runtime", ] } -opentelemetry = { version = "0.23", features = ["logs", "metrics", "trace"] } -opentelemetry-appender-tracing = { version = "0.4", features = [ +opentelemetry = { version = "0.24", features = ["logs", "metrics", "trace"] } +opentelemetry-appender-tracing = { version = "0.5", features = [ # "experimental_metadata_attributes", "logs_level_enabled", ] } -opentelemetry-otlp = { version = "0.16", features = [ +opentelemetry-otlp = { version = "0.17", features = [ "logs", "metrics", "trace", ] } -opentelemetry-proto = { version = "0.6", features = ["with-serde", "zpages"] } -opentelemetry_sdk = { version = "0.23", features = [ +opentelemetry-proto = { version = "0.7", features = ["with-serde", "zpages"] } +opentelemetry_sdk = { version = "0.24", features = [ "logs_level_enabled", "metrics", "rt-tokio", @@ -205,7 +204,7 @@ ordered-float = { version = "4.2", default-features = false, features = [ "std", ] } petgraph = { version = "0.6" } -polars = { version = "0.40", features = [ +polars = { version = "0.41", features = [ "diagonal_concat", "diff", "fmt", @@ -222,7 +221,7 @@ polars = { version = "0.40", features = [ ] } procfs = { version = "0.16" } prometheus-http-query = { version = "0.8", default-features = false } -pyo3 = { version = "0.21" } +pyo3 = { version = "0.22" } r2r = { version = "0.9" } rand = { version = "0.8" } rand_distr = { version = "0.4" } @@ -280,11 +279,11 @@ tera = { version = "1.19" } thiserror = { version = "1.0" } tokio = { version = "1.37", features = ["macros", "rt"] } tokio-stream = { version = "0.1" } -tonic = { version = "0.11", features = [ +tonic = { version = "0.12", features = [ "gzip", ] } # should be synced with opentelemetry-proto tracing = { version = "0.1" } -tracing-opentelemetry = { version = "0.24", features = [ +tracing-opentelemetry = { version = "0.25", features = [ "metrics", "tracing-log", ] } # should be synced with opentelemetry @@ -301,9 +300,5 @@ winit = { version = "0.29", features = [ ] } # should be synced with eframe [patch.crates-io] -# FIXME: Sync with the components -deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "rust-v0.17.3" } -deltalake-aws = { git = "https://github.com/delta-io/delta-rs", rev = "rust-v0.17.3" } - # FIXME: Waiting for PR merged: https://github.com/GREsau/schemars/pull/255 schemars = { git = "https://github.com/ulagbulag/schemars" } diff --git a/crates/ark/core/src/tracer.rs b/crates/ark/core/src/tracer.rs index 863eaec0..4ab7c1fa 100644 --- a/crates/ark/core/src/tracer.rs +++ b/crates/ark/core/src/tracer.rs @@ -78,10 +78,13 @@ fn init_once_opentelemetry(export: bool) { where S: Subscriber + for<'span> LookupSpan<'span>, { + use opentelemetry::trace::TracerProvider; + otlp::new_pipeline() .tracing() .with_exporter(init_otlp_pipeline()) .install_batch(Runtime) + .map(|provider| provider.tracer("ark-tracer")) .map(::tracing_opentelemetry::OpenTelemetryLayer::new) .expect("failed to init a tracer") } diff --git a/crates/dash/pipe/provider/Cargo.toml b/crates/dash/pipe/provider/Cargo.toml index 3a7fb5dd..707142e1 100644 --- a/crates/dash/pipe/provider/Cargo.toml +++ b/crates/dash/pipe/provider/Cargo.toml @@ -35,7 +35,7 @@ ros2 = ["dep:r2r"] # storage storage = ["deltalake", "s3"] -deltalake = ["arrow", "dash-api", "dep:deltalake", "inflector", "map-macro"] +deltalake = ["arrow", "dash-api", "dep:deltalake", "inflector"] lancedb = ["arrow", "dep:lancedb", "object_store/aws"] s3 = ["chrono", "minio"] @@ -74,7 +74,6 @@ gethostname = { workspace = true } inflector = { workspace = true, optional = true } kube = { workspace = true } lancedb = { workspace = true, optional = true } -map-macro = { workspace = true, optional = true } minio = { workspace = true, optional = true } object_store = { workspace = true, optional = true } opentelemetry = { workspace = true } diff --git a/crates/dash/pipe/provider/src/schema/deltalake/mod.rs b/crates/dash/pipe/provider/src/schema/deltalake/mod.rs index f9ed45fb..d3238987 100644 --- a/crates/dash/pipe/provider/src/schema/deltalake/mod.rs +++ b/crates/dash/pipe/provider/src/schema/deltalake/mod.rs @@ -12,12 +12,10 @@ use deltalake::kernel::{ ArrayType, DataType as DeltaDataType, MapType, PrimitiveType, PrimitiveType as DeltaPrimitiveType, StructField, StructType, }; -use map_macro::hash_map; use schemars::schema::{ ArrayValidation, InstanceType, ObjectValidation, RootSchema, Schema, SchemaObject, SingleOrVec, SubschemaValidation, }; -use serde_json::{json, Value}; use super::arrow::ToDataType; @@ -90,7 +88,6 @@ impl ToDataType for PrimitiveType { impl ToDataType for StructType { fn to_data_type(&self) -> Result { self.fields() - .iter() .map(|field| field.to_field()) .collect::>() .map(DataType::Struct) @@ -109,8 +106,7 @@ pub trait FieldColumns { impl FieldColumns for ArrowSchema { fn to_data_columns(&self) -> Result> { - let api_version = json!("http://arrow.apache.org/"); - self.fields().to_data_columns(&api_version) + self.fields().to_data_columns() } } @@ -173,7 +169,6 @@ impl FieldColumns for RootSchema { } struct Context<'a> { - api_version: &'a Value, definitions: &'a Definitions, name: &'a str, } @@ -181,7 +176,6 @@ impl FieldColumns for RootSchema { trait JsonFieldColumn { fn to_data_column( &self, - api_version: &Value, definitions: &Definitions, name: &str, nullable: bool, @@ -191,74 +185,52 @@ impl FieldColumns for RootSchema { impl JsonFieldColumn for Schema { fn to_data_column( &self, - api_version: &Value, definitions: &Definitions, name: &str, nullable: bool, ) -> Result> { fn parse_instance_type( - Context { - api_version, - definitions, - name, - }: Context, + Context { definitions, name }: Context, value: &SchemaObject, instance_type: &InstanceType, - metadata: impl FnOnce(Value) -> FieldMetadata, nullable: bool, ) -> Result> { Ok(match instance_type { InstanceType::Null => None, - InstanceType::Boolean => Some( - StructField::new( - name, - DeltaDataType::Primitive(DeltaPrimitiveType::Boolean), - nullable, - ) - .with_metadata(metadata("Boolean".into())), - ), - InstanceType::Integer => Some( - StructField::new( - name, - DeltaDataType::Primitive(DeltaPrimitiveType::Long), - nullable, - ) - .with_metadata(metadata("Integer".into())), - ), - InstanceType::Number => Some( - StructField::new( - name, - DeltaDataType::Primitive(DeltaPrimitiveType::Double), - nullable, - ) - .with_metadata(metadata("Number".into())), - ), - InstanceType::String => Some( - StructField::new( - name, - DeltaDataType::Primitive(DeltaPrimitiveType::String), - nullable, - ) - .with_metadata(metadata("String".into())), - ), + InstanceType::Boolean => Some(StructField::new( + name, + DeltaDataType::Primitive(DeltaPrimitiveType::Boolean), + nullable, + )), + InstanceType::Integer => Some(StructField::new( + name, + DeltaDataType::Primitive(DeltaPrimitiveType::Long), + nullable, + )), + InstanceType::Number => Some(StructField::new( + name, + DeltaDataType::Primitive(DeltaPrimitiveType::Double), + nullable, + )), + InstanceType::String => Some(StructField::new( + name, + DeltaDataType::Primitive(DeltaPrimitiveType::String), + nullable, + )), InstanceType::Array => value .array - .to_array_data_type(api_version, definitions)? + .to_array_data_type(definitions)? .map(Box::new) .map(|type_| { StructField::new(name, DeltaDataType::Array(type_), nullable) - .with_metadata(metadata("Array".into())) }), - InstanceType::Object => Some( - StructField::new( - name, - DeltaDataType::Struct(Box::new(StructType::new( - value.object.to_data_columns(api_version, definitions)?, - ))), - nullable, - ) - .with_metadata(metadata("Object".into())), - ), + InstanceType::Object => Some(StructField::new( + name, + DeltaDataType::Struct(Box::new(StructType::new( + value.object.to_data_columns(definitions)?, + ))), + nullable, + )), }) } @@ -266,58 +238,27 @@ impl FieldColumns for RootSchema { Schema::Bool(true) => bail!("dynamic object is not supported yet"), Schema::Bool(false) => Ok(None), Schema::Object(value) => { - let metadata = - |kind| match json!(value.metadata.clone().unwrap_or_default()) { - Value::Object(mut metadata) => { - metadata.insert("apiVersion".into(), api_version.clone()); - metadata.insert("array".into(), json!(&value.array)); - metadata.insert("format".into(), json!(&value.format)); - metadata.insert("kind".into(), kind); - metadata.insert("number".into(), json!(&value.number)); - metadata.insert("object".into(), json!(&value.object)); - metadata.insert("string".into(), json!(&value.string)); - metadata.into_iter().collect() - } - _ => unreachable!("json schema metadata should be Object"), - }; - let instance_type = match find_schema_object_definition(definitions, value)? { Some(schema) => { - return schema.to_data_column( - api_version, - definitions, - name, - nullable, - ); + return schema.to_data_column(definitions, name, nullable); } None => value.instance_type.as_ref(), }; - let ctx = Context { - api_version, - definitions, - name, - }; + let ctx = Context { definitions, name }; Ok(match instance_type { Some(SingleOrVec::Single(instance_type)) => { - parse_instance_type(ctx, value, instance_type, metadata, nullable)? + parse_instance_type(ctx, value, instance_type, nullable)? } Some(SingleOrVec::Vec(instance_types)) => match instance_types.len() { 0 => None, - 1 => parse_instance_type( - ctx, - value, - &instance_types[0], - metadata, - nullable, - )?, + 1 => parse_instance_type(ctx, value, &instance_types[0], nullable)?, 2 => match find_instance_type_none(instance_types) { Some(index) => parse_instance_type( ctx, value, &instance_types[1 - index], - metadata, true, )?, None => bail!("union object is not supported"), @@ -374,33 +315,20 @@ impl FieldColumns for RootSchema { impl JsonFieldColumnEnum for Schema { fn to_enum_data_type( &self, - Context { - api_version, - definitions, - name, - }: Context, + Context { definitions, name }: Context, nullable: bool, ) -> Result> { - self.to_data_column(api_version, definitions, name, nullable) + self.to_data_column(definitions, name, nullable) } } trait JsonFieldColumnArray { - fn to_array_data_type( - &self, - api_version: &Value, - definitions: &Definitions, - ) -> Result>; + fn to_array_data_type(&self, definitions: &Definitions) -> Result>; } impl JsonFieldColumnArray for Schema { - fn to_array_data_type( - &self, - api_version: &Value, - definitions: &Definitions, - ) -> Result> { + fn to_array_data_type(&self, definitions: &Definitions) -> Result> { fn parse_instance_type( - api_version: &Value, definitions: &Definitions, instance_type: &InstanceType, nullable: bool, @@ -426,12 +354,12 @@ impl FieldColumns for RootSchema { )), InstanceType::Array => value .array - .to_array_data_type(api_version, definitions)? + .to_array_data_type(definitions)? .map(Box::new) .map(|type_| ArrayType::new(DeltaDataType::Array(type_), nullable)), InstanceType::Object => Some(ArrayType::new( DeltaDataType::Struct(Box::new(StructType::new( - value.object.to_data_columns(api_version, definitions)?, + value.object.to_data_columns(definitions)?, ))), nullable, )), @@ -446,20 +374,15 @@ impl FieldColumns for RootSchema { Schema::Bool(false) => Ok(None), Schema::Object(value) => { match find_schema_object_definition(definitions, value)? { - Some(schema) => schema.to_array_data_type(api_version, definitions), + Some(schema) => schema.to_array_data_type(definitions), None => match &value.instance_type { - Some(SingleOrVec::Single(instance_type)) => parse_instance_type( - api_version, - definitions, - instance_type, - nullable, - value, - ), + Some(SingleOrVec::Single(instance_type)) => { + parse_instance_type(definitions, instance_type, nullable, value) + } Some(SingleOrVec::Vec(instance_types)) => { match instance_types.len() { 0 => Ok(None), 1 => parse_instance_type( - api_version, definitions, &instance_types[0], nullable, @@ -467,7 +390,6 @@ impl FieldColumns for RootSchema { ), 2 => match find_instance_type_none(instance_types) { Some(index) => parse_instance_type( - api_version, definitions, &instance_types[1 - index], true, @@ -487,15 +409,9 @@ impl FieldColumns for RootSchema { } impl JsonFieldColumnArray for Option<&SingleOrVec> { - fn to_array_data_type( - &self, - api_version: &Value, - definitions: &Definitions, - ) -> Result> { + fn to_array_data_type(&self, definitions: &Definitions) -> Result> { match self { - Some(SingleOrVec::Single(value)) => { - value.to_array_data_type(api_version, definitions) - } + Some(SingleOrVec::Single(value)) => value.to_array_data_type(definitions), Some(SingleOrVec::Vec(_)) => { bail!("union array is not supported") } @@ -505,37 +421,25 @@ impl FieldColumns for RootSchema { } impl JsonFieldColumnArray for Option> { - fn to_array_data_type( - &self, - api_version: &Value, - definitions: &Definitions, - ) -> Result> { + fn to_array_data_type(&self, definitions: &Definitions) -> Result> { self.as_ref() .and_then(|value| value.items.as_ref()) - .to_array_data_type(api_version, definitions) + .to_array_data_type(definitions) } } trait JsonFieldColumns { - fn to_data_columns( - &self, - api_version: &Value, - definitions: &Definitions, - ) -> Result>; + fn to_data_columns(&self, definitions: &Definitions) -> Result>; } impl JsonFieldColumns for Box { - fn to_data_columns( - &self, - api_version: &Value, - definitions: &Definitions, - ) -> Result> { + fn to_data_columns(&self, definitions: &Definitions) -> Result> { self.properties .iter() .filter_map(|(child_name, child)| { let nullable = !self.required.contains(child_name); child - .to_data_column(api_version, definitions, child_name, nullable) + .to_data_column(definitions, child_name, nullable) .transpose() }) .collect() @@ -543,22 +447,14 @@ impl FieldColumns for RootSchema { } impl JsonFieldColumns for Option> { - fn to_data_columns( - &self, - api_version: &Value, - definitions: &Definitions, - ) -> Result> { + fn to_data_columns(&self, definitions: &Definitions) -> Result> { match self { - Some(value) => value.to_data_columns(api_version, definitions), + Some(value) => value.to_data_columns(definitions), None => Ok(Default::default()), } } } - let api_version = json!(self - .meta_schema - .as_deref() - .unwrap_or("http://json-schema.org/")); let definitions = &self.definitions; // is metadta value dynamic? @@ -572,9 +468,7 @@ impl FieldColumns for RootSchema { { Ok(Default::default()) } else { - self.schema - .object - .to_data_columns(&api_version, definitions) + self.schema.object.to_data_columns(definitions) } } } @@ -585,13 +479,11 @@ impl FieldColumns for [ModelFieldNativeSpec] { name: String, type_: FieldBuilderType, attributes: ModelFieldAttributeSpec, - metadata: FieldMetadata, } impl FieldBuilder { fn push<'a>( &mut self, - api_version: &Value, mut child_names: impl Iterator, name: &'a str, field: &'a ModelFieldNativeSpec, @@ -604,9 +496,8 @@ impl FieldColumns for [ModelFieldNativeSpec] { name: name.into(), type_: FieldBuilderType::Object(Default::default()), attributes: field.attribute, - metadata: field.to_metadata(api_version), }) - .push(api_version, child_names, child_name, field), + .push(child_names, child_name, field), None => match &field.kind { // BEGIN primitive types ModelFieldKindNativeSpec::None {} => Ok(()), @@ -619,7 +510,6 @@ impl FieldColumns for [ModelFieldNativeSpec] { FieldBuilderPrimitiveType::Boolean, ), attributes: field.attribute, - metadata: field.to_metadata(api_version), }, ); Ok(()) @@ -637,7 +527,6 @@ impl FieldColumns for [ModelFieldNativeSpec] { FieldBuilderPrimitiveType::Integer, ), attributes: field.attribute, - metadata: field.to_metadata(api_version), }, ); Ok(()) @@ -655,7 +544,6 @@ impl FieldColumns for [ModelFieldNativeSpec] { FieldBuilderPrimitiveType::Number, ), attributes: field.attribute, - metadata: field.to_metadata(api_version), }, ); Ok(()) @@ -672,7 +560,6 @@ impl FieldColumns for [ModelFieldNativeSpec] { FieldBuilderPrimitiveType::String, ), attributes: field.attribute, - metadata: field.to_metadata(api_version), }, ); Ok(()) @@ -689,7 +576,6 @@ impl FieldColumns for [ModelFieldNativeSpec] { FieldBuilderPrimitiveType::String, ), attributes: field.attribute, - metadata: field.to_metadata(api_version), }, ); Ok(()) @@ -704,7 +590,6 @@ impl FieldColumns for [ModelFieldNativeSpec] { FieldBuilderPrimitiveType::DateTime, ), attributes: field.attribute, - metadata: field.to_metadata(api_version), }, ); Ok(()) @@ -718,7 +603,6 @@ impl FieldColumns for [ModelFieldNativeSpec] { FieldBuilderPrimitiveType::String, ), attributes: field.attribute, - metadata: field.to_metadata(api_version), }, ); Ok(()) @@ -732,7 +616,6 @@ impl FieldColumns for [ModelFieldNativeSpec] { FieldBuilderPrimitiveType::String, ), attributes: field.attribute, - metadata: field.to_metadata(api_version), }, ); Ok(()) @@ -749,7 +632,6 @@ impl FieldColumns for [ModelFieldNativeSpec] { ), ), attributes: field.attribute, - metadata: field.to_metadata(api_version), }, ); Ok(()) @@ -762,7 +644,6 @@ impl FieldColumns for [ModelFieldNativeSpec] { name: name.into(), type_: FieldBuilderType::Dynamic, attributes: field.attribute, - metadata: field.to_metadata(api_version), }, ); Ok(()) @@ -775,7 +656,6 @@ impl FieldColumns for [ModelFieldNativeSpec] { name: name.into(), type_: FieldBuilderType::Object(Default::default()), attributes: field.attribute, - metadata: field.to_metadata(api_version), }, ); Ok(()) @@ -790,7 +670,6 @@ impl FieldColumns for [ModelFieldNativeSpec] { FieldBuilderArrayType::Object, ), attributes: field.attribute, - metadata: field.to_metadata(api_version), }, ); Ok(()) @@ -817,7 +696,6 @@ impl FieldColumns for [ModelFieldNativeSpec] { name, type_, attributes: ModelFieldAttributeSpec { optional: nullable }, - metadata, } = field; Ok(Self::new( @@ -846,8 +724,7 @@ impl FieldColumns for [ModelFieldNativeSpec] { FieldBuilderType::Dynamic => bail!("dynamic array is not supported yet"), }, nullable, - ) - .with_metadata(metadata)) + )) } } @@ -893,101 +770,6 @@ impl FieldColumns for [ModelFieldNativeSpec] { } } - trait ToFieldMetadata { - fn to_metadata(&self, api_version: &Value) -> FieldMetadata; - } - - impl ToFieldMetadata for ModelFieldNativeSpec { - fn to_metadata(&self, api_version: &Value) -> FieldMetadata { - match &self.kind { - // BEGIN primitive types - ModelFieldKindNativeSpec::None {} => hash_map! { - "apiVersion".into() => json!(api_version), - "kind".into() => json!("None"), - }, - ModelFieldKindNativeSpec::Boolean { default } => hash_map! { - "apiVersion".into() => json!(api_version), - "kind".into() => json!("Boolean"), - "default".into() => json!(default), - }, - ModelFieldKindNativeSpec::Integer { - default, - minimum, - maximum, - } => hash_map! { - "apiVersion".into() => json!(api_version), - "kind".into() => json!("Integer"), - "default".into() => json!(default), - "minimum".into() => json!(minimum), - "maximum".into() => json!(maximum), - }, - ModelFieldKindNativeSpec::Number { - default, - minimum, - maximum, - } => hash_map! { - "apiVersion".into() => json!(api_version), - "kind".into() => json!("Number"), - "default".into() => json!(default), - "minimum".into() => json!(minimum), - "maximum".into() => json!(maximum), - }, - ModelFieldKindNativeSpec::String { default, kind } => hash_map! { - "apiVersion".into() => json!(api_version), - "kind".into() => json!("String"), - "default".into() => json!(default), - "spec".into() => json!(kind), - }, - ModelFieldKindNativeSpec::OneOfStrings { default, choices } => hash_map! { - "apiVersion".into() => json!(api_version), - "kind".into() => json!("OneOfStrings"), - "default".into() => json!(default), - "choices".into() => json!(choices), - }, - // BEGIN string formats - ModelFieldKindNativeSpec::DateTime { default } => hash_map! { - "apiVersion".into() => json!(api_version), - "kind".into() => json!("DateTime"), - "default".into() => json!(default), - }, - ModelFieldKindNativeSpec::Ip {} => hash_map! { - "apiVersion".into() => json!(api_version), - "kind".into() => json!("Ip"), - }, - ModelFieldKindNativeSpec::Uuid {} => hash_map! { - "apiVersion".into() => json!(api_version), - "kind".into() => json!("Uuid"), - }, - // BEGIN aggregation types - ModelFieldKindNativeSpec::StringArray {} => hash_map! { - "apiVersion".into() => json!(api_version), - "arrayKind".into() => json!("String"), - "kind".into() => json!("Array"), - }, - ModelFieldKindNativeSpec::Object { children, kind } => hash_map! { - "apiVersion".into() => json!(api_version), - "kind".into() => json!("Object"), - "children".into() => json!(children), - "spec".into() => json!(kind), - }, - ModelFieldKindNativeSpec::ObjectArray { children } => hash_map! { - "apiVersion".into() => json!(api_version), - "arrayKind".into() => json!("Object"), - "kind".into() => json!("Array"), - "children".into() => json!(children), - }, - } - } - } - - let api_version = json!(format!( - "http://{crd_version}", - crd_version = { - use kube::Resource; - ::dash_api::model::ModelCrd::api_version(&()) - }, - )); - let root = match self.first() { Some(root) => root, None => return Ok(Default::default()), @@ -996,7 +778,6 @@ impl FieldColumns for [ModelFieldNativeSpec] { name: Default::default(), type_: FieldBuilderType::Object(Default::default()), attributes: root.attribute, - metadata: root.to_metadata(&api_version), }; for field in &self[1..] { @@ -1004,7 +785,7 @@ impl FieldColumns for [ModelFieldNativeSpec] { let field_name = field_child_names .next() .ok_or_else(|| anyhow!("fields are not ordered"))?; - root.push(&api_version, field_child_names, field_name, field)?; + root.push(field_child_names, field_name, field)?; } root.try_into_children() .and_then(|children| children.into_values().map(TryInto::try_into).collect()) @@ -1018,48 +799,41 @@ impl FieldColumns for Vec { } trait FieldChildren { - fn to_data_columns(&self, api_version: &Value) -> Result>; + fn to_data_columns(&self) -> Result>; } impl FieldChildren for Fields { - fn to_data_columns(&self, api_version: &Value) -> Result> { + fn to_data_columns(&self) -> Result> { self.iter() - .filter_map(|field| field.to_data_column(api_version).transpose()) + .filter_map(|field| field.to_data_column().transpose()) .collect() } } trait FieldChild { - fn to_data_column(&self, api_version: &Value) -> Result>; + fn to_data_column(&self) -> Result>; } impl FieldChild for Field { - fn to_data_column(&self, api_version: &Value) -> Result> { - self.data_type().to_data_type(api_version).map(|type_| { - type_.map(|type_| { - StructField::new(self.name().clone(), type_, self.is_nullable()).with_metadata( - self.metadata() - .iter() - .map(|(key, value)| (key.clone(), Value::String(value.clone()))) - .chain([("apiVersion".into(), api_version.clone())]), - ) - }) + fn to_data_column(&self) -> Result> { + self.data_type().to_data_type().map(|type_| { + type_.map(|type_| StructField::new(self.name().clone(), type_, self.is_nullable())) }) } } trait FieldSchema { - fn to_data_type(&self, api_version: &Value) -> Result>; + fn to_data_type(&self) -> Result>; } impl FieldSchema for Field { - fn to_data_type(&self, api_version: &Value) -> Result> { - self.data_type().to_data_type(api_version) + fn to_data_type(&self) -> Result> { + self.data_type().to_data_type() } } impl FieldSchema for DataType { - fn to_data_type(&self, api_version: &Value) -> Result> { + fn to_data_type(&self) -> Result> { Ok(match self { // BEGIN primitive types DataType::Null => None, @@ -1080,7 +854,7 @@ impl FieldSchema for DataType { DataType::Float32 => Some(DeltaDataType::Primitive(DeltaPrimitiveType::Float)), DataType::Float64 => Some(DeltaDataType::Primitive(DeltaPrimitiveType::Double)), DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => { - Some(DeltaDataType::decimal(*precision, *scale)?) + Some(DeltaDataType::decimal(*precision, (*scale).try_into()?)?) } // BEGIN binary formats DataType::Binary | DataType::FixedSizeBinary(_) | DataType::LargeBinary => { @@ -1105,13 +879,13 @@ impl FieldSchema for DataType { DataType::FixedSizeList(field, _) | DataType::List(field) | DataType::LargeList(field) => field - .to_data_type(api_version)? + .to_data_type()? .map(Into::into) .map(|type_| ArrayType::new(type_, field.is_nullable())) .map(Box::new) .map(DeltaDataType::Array), DataType::Struct(fields) => Some(DeltaDataType::Struct(Box::new(StructType::new( - fields.to_data_columns(api_version)?, + fields.to_data_columns()?, )))), // DataType::Dictionary(_, _) => todo!(), // DataType::Map(_, _) => todo!(), @@ -1120,5 +894,3 @@ impl FieldSchema for DataType { }) } } - -type FieldMetadata = ::std::collections::HashMap; diff --git a/crates/kubegraph/api/src/frame/mod.rs b/crates/kubegraph/api/src/frame/mod.rs index 5f91a0a1..668bd9e8 100644 --- a/crates/kubegraph/api/src/frame/mod.rs +++ b/crates/kubegraph/api/src/frame/mod.rs @@ -158,11 +158,10 @@ impl LazyFrame { #[cfg(feature = "df-polars")] Self::Polars(nodes) => Ok(Self::Polars( select_polars_edge_side(&nodes, metadata.name(), metadata.src()) - .cross_join(select_polars_edge_side( - &nodes, - metadata.name(), - metadata.sink(), - )) + .cross_join( + select_polars_edge_side(&nodes, metadata.name(), metadata.sink()), + None, + ) .with_column( dsl::lit(ProblemSpec::::MAX_CAPACITY).alias(metadata.capacity()), ), diff --git a/crates/kubegraph/solver/ortools/tests/simple.rs b/crates/kubegraph/solver/ortools/tests/simple.rs index 9cee6cee..0a568229 100644 --- a/crates/kubegraph/solver/ortools/tests/simple.rs +++ b/crates/kubegraph/solver/ortools/tests/simple.rs @@ -183,7 +183,9 @@ fn min_cost_flow() { let edges_flow = Series::from_iter((0..edges_capacity.len()).map(|edge| output.flow(edge as ArcIndex))) .with_name("flow"); - let edges_cost = (edges_flow.clone() * edges_unit_cost).with_name("cost"); + let edges_cost = (edges_flow.clone() * edges_unit_cost) + .expect("failed to get edges cost") + .with_name("cost"); let mut optimized_edges = edges.clone(); optimized_edges diff --git a/crates/kubegraph/solver/ortools/tests/simple_solver.rs b/crates/kubegraph/solver/ortools/tests/simple_solver.rs index 363f0430..f5f64bfb 100644 --- a/crates/kubegraph/solver/ortools/tests/simple_solver.rs +++ b/crates/kubegraph/solver/ortools/tests/simple_solver.rs @@ -55,7 +55,9 @@ async fn solver_simple() { let edges_flow = optimized_edges.column("flow").unwrap(); let edges_unit_cost = optimized_edges.column("unit_cost").unwrap(); - let edges_cost = (edges_flow * edges_unit_cost).with_name("cost"); + let edges_cost = (edges_flow * edges_unit_cost) + .expect("failed to get edges cost") + .with_name("cost"); optimized_edges .with_column(edges_cost) .expect("failed to insert edge cost column"); diff --git a/crates/kubegraph/visualizer/egui/src/lib.rs b/crates/kubegraph/visualizer/egui/src/lib.rs index 09b85961..2a57a417 100644 --- a/crates/kubegraph/visualizer/egui/src/lib.rs +++ b/crates/kubegraph/visualizer/egui/src/lib.rs @@ -170,7 +170,7 @@ impl NetworkVisualizerApp { async fn update_graph(&mut self, ui: &mut Ui) { if let Some(graph) = self.data.graph.lock().await.as_mut() { - let interaction_settings = &SettingsInteraction::new() + let settings_interaction = &SettingsInteraction::new() .with_dragging_enabled(true) .with_node_clicking_enabled(true) .with_node_selection_enabled(true) @@ -178,11 +178,11 @@ impl NetworkVisualizerApp { .with_edge_clicking_enabled(true) .with_edge_selection_enabled(true) .with_edge_selection_multi_enabled(true); - let style_settings = &SettingsStyle::new().with_labels_always(true); + let settings_style = &SettingsStyle::new().with_labels_always(true); ui.add( &mut GraphView::<_, _, _, _, self::node::NodeShape, DefaultEdgeShape>::new(graph) - .with_styles(style_settings) - .with_interactions(interaction_settings), + .with_styles(settings_style) + .with_interactions(settings_interaction), ); } }