diff --git a/Cargo.lock b/Cargo.lock index 9ce0862..c8213f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1985,6 +1985,7 @@ dependencies = [ "futures", "http", "http-body", + "iceberg-catalog-rest", "iceberg-datafusion", "insta", "itertools 0.13.0", @@ -2694,6 +2695,26 @@ dependencies = [ "uuid", ] +[[package]] +name = "iceberg-catalog-rest" +version = "0.3.0" +source = "git+https://github.com/apache/iceberg-rust?rev=16f9411dd3897134a401ece97d73cd33d6790bff#16f9411dd3897134a401ece97d73cd33d6790bff" +dependencies = [ + "async-trait", + "chrono", + "http", + "iceberg", + "itertools 0.13.0", + "log", + "reqwest", + "serde", + "serde_derive", + "serde_json", + "tokio", + "typed-builder 0.20.0", + "uuid", +] + [[package]] name = "iceberg-datafusion" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index 6dcbcc6..1f053b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ env_logger = "0.11.5" futures = "0.3.30" http = "1" http-body = "1" +iceberg-catalog-rest = { git = "https://github.com/apache/iceberg-rust", rev = "16f9411dd3897134a401ece97d73cd33d6790bff"} iceberg-datafusion = { git = "https://github.com/apache/iceberg-rust", rev = "16f9411dd3897134a401ece97d73cd33d6790bff", optional = true } itertools = "0.13.0" lazy_static = "1.4.0" diff --git a/README.md b/README.md index fceebae..8a679e1 100644 --- a/README.md +++ b/README.md @@ -201,6 +201,23 @@ Register deltalake tables. For example: CREATE EXTERNAL TABLE table_name STORED AS DELTATABLE LOCATION 's3://bucket/table' ``` +##### Iceberg (`--features=iceberg`) + +Register iceberg tables. For example: + +```sql +CREATE EXTERNAL TABLE table_name STORED AS ICEBERG LOCATION 's3://bucket/table' +``` + +Register an Iceberg REST catalog. For example: + +```toml +[[execution.iceberg.rest_catalogs]] +name = "my_iceberg_catalog" +addr = "192.168.1.1:8181" +``` + + ##### Json Functions (`--features=function-json`) Adds functions from [datafusion-function-json] for querying JSON strings in DataFusion in `dft`. 
For example: diff --git a/src/config.rs b/src/config.rs index adcb3f2..594260f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -176,6 +176,8 @@ pub struct ExecutionConfig { pub dedicated_executor_enabled: bool, #[serde(default = "default_dedicated_executor_threads")] pub dedicated_executor_threads: usize, + #[serde(default = "default_iceberg_config")] + pub iceberg: IcebergConfig, } fn default_ddl_path() -> Option { @@ -220,6 +222,12 @@ fn default_dedicated_executor_threads() -> usize { num_cpus::get() } +fn default_iceberg_config() -> IcebergConfig { + IcebergConfig { + rest_catalogs: Vec::new(), + } +} + impl Default for ExecutionConfig { fn default() -> Self { Self { @@ -231,10 +239,29 @@ impl Default for ExecutionConfig { flightsql_server_batch_size: default_flightsql_server_batch_size(), dedicated_executor_enabled: default_dedicated_executor_enabled(), dedicated_executor_threads: default_dedicated_executor_threads(), + iceberg: default_iceberg_config(), } } } +/// Connection settings for one Iceberg REST catalog. +#[derive(Clone, Debug, Deserialize)] +pub struct RestCatalogConfig { + /// Name the catalog is registered under. + pub name: String, + /// Address of the REST catalog service, used as the catalog URI. + pub addr: String, +} + +/// Iceberg settings nested under the execution config. +#[derive(Clone, Debug, Deserialize)] +pub struct IcebergConfig { + /// REST catalogs to register. Empty when the key is omitted; the singular + /// `rest_catalog` spelling is accepted as an alias. + #[serde(default, alias = "rest_catalog")] + pub rest_catalogs: Vec<RestCatalogConfig>, +} + #[derive(Clone, Debug, Default, Deserialize)] pub struct InteractionConfig { #[serde(default = "default_mouse")] diff --git a/src/extensions/builder.rs b/src/extensions/builder.rs index ce26855..2359098 100644 --- a/src/extensions/builder.rs +++ b/src/extensions/builder.rs @@ -18,7 +18,8 @@ //! 
[`DftSessionStateBuilder`] for configuring DataFusion [`SessionState`] use color_eyre::eyre; -use datafusion::catalog::TableProviderFactory; +use datafusion::catalog::{CatalogProvider, CatalogProviderList, TableProviderFactory}; +use datafusion::catalog_common::MemoryCatalogProviderList; use datafusion::execution::context::SessionState; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::execution::session_state::SessionStateBuilder; @@ -54,6 +55,7 @@ pub struct DftSessionStateBuilder { execution_config: Option, session_config: SessionConfig, table_factories: Option>>, + catalog_providers: Option<HashMap<String, Arc<dyn CatalogProvider>>>, runtime_env: Option>, } @@ -86,6 +88,7 @@ impl DftSessionStateBuilder { app_type: None, execution_config: None, table_factories: None, + catalog_providers: None, runtime_env: None, } } @@ -118,6 +121,18 @@ impl DftSessionStateBuilder { } } + /// Add a catalog provider to the list of providers on this builder. + /// + /// `factory` is stored under `name`; a provider previously added under the + /// same name is replaced. + pub fn add_catalog_provider(&mut self, name: &str, factory: Arc<dyn CatalogProvider>) { + // The map is created lazily on first registration, so the `Option` + // never has to be unwrapped. + self.catalog_providers + .get_or_insert_with(HashMap::new) + .insert(name.to_string(), factory); + } + /// Return the current [`RuntimeEnv`], creating a default if it doesn't exist pub fn runtime_env(&mut self) -> &RuntimeEnv { if self.runtime_env.is_none() { @@ -166,6 +181,7 @@ impl DftSessionStateBuilder { execution_config, mut session_config, table_factories, + catalog_providers, runtime_env, .. 
} = self; @@ -197,6 +213,14 @@ impl DftSessionStateBuilder { builder = builder.with_table_factories(table_factories); } + if let Some(catalog_providers) = catalog_providers { + let catalogs_list = MemoryCatalogProviderList::new(); + for (k, v) in catalog_providers { + catalogs_list.register_catalog(k, v); + } + builder = builder.with_catalog_list(Arc::new(catalogs_list)); + } + Ok(builder.build()) } } diff --git a/src/extensions/iceberg.rs b/src/extensions/iceberg.rs index 13b2cd5..3c08ccd 100644 --- a/src/extensions/iceberg.rs +++ b/src/extensions/iceberg.rs @@ -19,7 +19,9 @@ use crate::config::ExecutionConfig; use crate::extensions::{DftSessionStateBuilder, Extension}; -use iceberg_datafusion::IcebergTableProviderFactory; +use datafusion_common::DataFusionError; +use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; +use iceberg_datafusion::{IcebergCatalogProvider, IcebergTableProviderFactory}; use std::sync::Arc; #[derive(Debug, Default)] @@ -35,9 +37,17 @@ impl IcebergExtension { impl Extension for IcebergExtension { async fn register( &self, - _config: ExecutionConfig, + config: ExecutionConfig, builder: &mut DftSessionStateBuilder, ) -> datafusion_common::Result<()> { + for cfg in config.iceberg.rest_catalogs { + let rest_catalog_config = RestCatalogConfig::builder().uri(cfg.addr).build(); + let rest_catalog = RestCatalog::new(rest_catalog_config); + let catalog_provider = IcebergCatalogProvider::try_new(Arc::new(rest_catalog)) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))?; + builder.add_catalog_provider(&cfg.name, Arc::new(catalog_provider)); + } // TODO Add Iceberg Catalog let factory = Arc::new(IcebergTableProviderFactory {}); builder.add_table_factory("ICEBERG", factory);