Skip to content

Commit

Permalink
Make catalog work
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewmturner committed Dec 8, 2024
1 parent b68da83 commit b5ee9db
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 3 deletions.
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ env_logger = "0.11.5"
futures = "0.3.30"
http = "1"
http-body = "1"
iceberg-catalog-rest = { git = "https://github.com/apache/iceberg-rust", rev = "16f9411dd3897134a401ece97d73cd33d6790bff"}
iceberg-datafusion = { git = "https://github.com/apache/iceberg-rust", rev = "16f9411dd3897134a401ece97d73cd33d6790bff", optional = true }
itertools = "0.13.0"
lazy_static = "1.4.0"
Expand Down
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,23 @@ Register deltalake tables. For example:
CREATE EXTERNAL TABLE table_name STORED AS DELTATABLE LOCATION 's3://bucket/table'
```

##### Iceberg (`--features=iceberg`)

Register iceberg tables. For example:

```sql
CREATE EXTERNAL TABLE table_name STORED AS ICEBERG LOCATION 's3://bucket/table'
```

Register an Iceberg REST catalog by adding it to your configuration file:

```toml
[[execution.iceberg.rest_catalogs]]
name = "my_iceberg_catalog"
addr = "http://192.168.1.1:8181"
```


##### Json Functions (`--features=function-json`)

Adds functions from [datafusion-function-json] for querying JSON strings in DataFusion in `dft`. For example:
Expand Down
20 changes: 20 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ pub struct ExecutionConfig {
pub dedicated_executor_enabled: bool,
#[serde(default = "default_dedicated_executor_threads")]
pub dedicated_executor_threads: usize,
#[serde(default = "default_iceberg_config")]
pub iceberg: IcebergConfig,
}

fn default_ddl_path() -> Option<PathBuf> {
Expand Down Expand Up @@ -220,6 +222,12 @@ fn default_dedicated_executor_threads() -> usize {
num_cpus::get()
}

/// Serde default for the `iceberg` section of [`ExecutionConfig`]: an Iceberg
/// configuration with no REST catalogs configured.
fn default_iceberg_config() -> IcebergConfig {
    let rest_catalogs = Vec::new();
    IcebergConfig { rest_catalogs }
}

impl Default for ExecutionConfig {
fn default() -> Self {
Self {
Expand All @@ -231,10 +239,22 @@ impl Default for ExecutionConfig {
flightsql_server_batch_size: default_flightsql_server_batch_size(),
dedicated_executor_enabled: default_dedicated_executor_enabled(),
dedicated_executor_threads: default_dedicated_executor_threads(),
iceberg: default_iceberg_config(),
}
}
}

/// Settings for a single Iceberg REST catalog entry in the configuration file.
#[derive(Clone, Debug, Deserialize)]
pub struct RestCatalogConfig {
/// Name the catalog is registered under (the Iceberg extension passes this as
/// the catalog provider name).
pub name: String,
/// Address of the REST catalog service; the Iceberg extension passes this value
/// unchanged as the REST catalog URI.
// NOTE(review): presumably this must be a full URL including the scheme
// (e.g. `http://host:port`) — confirm against the REST catalog client.
pub addr: String,
}

#[derive(Clone, Debug, Deserialize)]
pub struct IcebergConfig {
pub rest_catalogs: Vec<RestCatalogConfig>,
}

#[derive(Clone, Debug, Default, Deserialize)]
pub struct InteractionConfig {
#[serde(default = "default_mouse")]
Expand Down
26 changes: 25 additions & 1 deletion src/extensions/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
//! [`DftSessionStateBuilder`] for configuring DataFusion [`SessionState`]
use color_eyre::eyre;
use datafusion::catalog::TableProviderFactory;
use datafusion::catalog::{CatalogProvider, CatalogProviderList, TableProviderFactory};
use datafusion::catalog_common::MemoryCatalogProviderList;
use datafusion::execution::context::SessionState;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::session_state::SessionStateBuilder;
Expand Down Expand Up @@ -54,6 +55,7 @@ pub struct DftSessionStateBuilder {
execution_config: Option<ExecutionConfig>,
session_config: SessionConfig,
table_factories: Option<HashMap<String, Arc<dyn TableProviderFactory>>>,
catalog_providers: Option<HashMap<String, Arc<dyn CatalogProvider>>>,
runtime_env: Option<Arc<RuntimeEnv>>,
}

Expand Down Expand Up @@ -86,6 +88,7 @@ impl DftSessionStateBuilder {
app_type: None,
execution_config: None,
table_factories: None,
catalog_providers: None,
runtime_env: None,
}
}
Expand Down Expand Up @@ -118,6 +121,18 @@ impl DftSessionStateBuilder {
}
}

/// Add a catalog provider to the set of providers held by this builder.
///
/// * `name` - key the provider is stored under; adding another provider with the
///   same name replaces the one stored earlier.
/// * `factory` - the [`CatalogProvider`] to store under `name`.
pub fn add_catalog_provider(&mut self, name: &str, factory: Arc<dyn CatalogProvider>) {
    // Create the map lazily on first use and insert in one step, instead of
    // checking `is_none()` and then unwrapping.
    self.catalog_providers
        .get_or_insert_with(HashMap::new)
        .insert(name.to_string(), factory);
}

/// Return the current [`RuntimeEnv`], creating a default if it doesn't exist
pub fn runtime_env(&mut self) -> &RuntimeEnv {
if self.runtime_env.is_none() {
Expand Down Expand Up @@ -166,6 +181,7 @@ impl DftSessionStateBuilder {
execution_config,
mut session_config,
table_factories,
catalog_providers,
runtime_env,
..
} = self;
Expand Down Expand Up @@ -197,6 +213,14 @@ impl DftSessionStateBuilder {
builder = builder.with_table_factories(table_factories);
}

if let Some(catalog_providers) = catalog_providers {
let catalogs_list = MemoryCatalogProviderList::new();
for (k, v) in catalog_providers {
catalogs_list.register_catalog(k, v);
}
builder = builder.with_catalog_list(Arc::new(catalogs_list));
}

Ok(builder.build())
}
}
14 changes: 12 additions & 2 deletions src/extensions/iceberg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
use crate::config::ExecutionConfig;
use crate::extensions::{DftSessionStateBuilder, Extension};
use iceberg_datafusion::IcebergTableProviderFactory;
use datafusion_common::DataFusionError;
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
use iceberg_datafusion::{IcebergCatalogProvider, IcebergTableProviderFactory};
use std::sync::Arc;

#[derive(Debug, Default)]
Expand All @@ -35,9 +37,17 @@ impl IcebergExtension {
impl Extension for IcebergExtension {
async fn register(
&self,
_config: ExecutionConfig,
config: ExecutionConfig,
builder: &mut DftSessionStateBuilder,
) -> datafusion_common::Result<()> {
for cfg in config.iceberg.rest_catalogs {
let rest_catalog_config = RestCatalogConfig::builder().uri(cfg.addr).build();
let rest_catalog = RestCatalog::new(rest_catalog_config);
let catalog_provider = IcebergCatalogProvider::try_new(Arc::new(rest_catalog))
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
builder.add_catalog_provider(&cfg.name, Arc::new(catalog_provider));
}
// TODO Add Iceberg Catalog
let factory = Arc::new(IcebergTableProviderFactory {});
builder.add_table_factory("ICEBERG", factory);
Expand Down

0 comments on commit b5ee9db

Please sign in to comment.