Skip to content

Commit

Permalink
allow config changes at runtime in SQL
Browse files Browse the repository at this point in the history
Signed-off-by: Yuchen Liang <[email protected]>
  • Loading branch information
yliang412 committed Nov 12, 2024
1 parent c77a719 commit d5b0239
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 12 deletions.
10 changes: 8 additions & 2 deletions datafusion-optd-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ pub async fn main() -> Result<()> {
session_config = session_config.with_batch_size(batch_size);
};

// Set optd configs from command line.
let mut config_ext = datafusion::config::Extensions::new();
let mut optd_conf = optd_datafusion_bridge::OptdDFConfig::default();
optd_conf.enable_adaptive = args.enable_adaptive;
config_ext.insert(optd_conf);
session_config.options_mut().extensions = config_ext;

let rn_config = RuntimeConfig::new();
let rn_config =
// set memory pool size
Expand All @@ -204,8 +211,7 @@ pub async fn main() -> Result<()> {
let runtime_env = create_runtime_env(rn_config.clone())?;

let mut ctx = {
let mut state =
SessionState::new_with_config_rt(session_config.clone(), Arc::new(runtime_env));
let mut state = SessionState::new_with_config_rt(session_config, Arc::new(runtime_env));
if !args.enable_df_logical {
// clean up optimizer rules so that we can plug in our own optimizer
state = state.with_optimizer_rules(vec![]);
Expand Down
4 changes: 2 additions & 2 deletions optd-datafusion-bridge/src/from_optd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,8 +594,8 @@ impl OptdPlanContext<'_> {
typ => unimplemented!("{}", typ),
};

let optimizer = self.optimizer.as_ref().unwrap();
if optimizer.adaptive_enabled() {
let config = self.optd_config();
if config.enable_adaptive {
let bare_with_collector: Result<Arc<dyn ExecutionPlan>> =
Ok(Arc::new(CollectorExec::new(
bare,
Expand Down
133 changes: 125 additions & 8 deletions optd-datafusion-bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use datafusion::arrow::datatypes::DataType;
use datafusion::catalog::information_schema::InformationSchemaProvider;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::CatalogList;
use datafusion::error::Result;
use datafusion::execution::context::{QueryPlanner, SessionState};
Expand All @@ -23,13 +25,107 @@ use datafusion::logical_expr::{
use datafusion::physical_plan::explain::ExplainExec;
use datafusion::physical_plan::{displayable, ExecutionPlan};
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use datafusion::sql::TableReference;
use itertools::Itertools;
use optd_datafusion_repr::plan_nodes::{
dispatch_plan_explain_to_string, ArcDfPlanNode, ConstantType, DfNodeType, DfReprPlanNode,
PhysicalHashJoin, PhysicalNestedLoopJoin,
dispatch_plan_explain_to_string, ArcDfPlanNode, DfNodeType, DfReprPlanNode, PhysicalHashJoin,
PhysicalNestedLoopJoin,
};
use optd_datafusion_repr::properties::schema::Catalog;
use optd_datafusion_repr::{DatafusionOptimizer, MemoExt};
use optd_datafusion_repr::{
plan_nodes::ConstantType, properties::schema::Catalog, DatafusionOptimizer, MemoExt,
};

/// Declares an optd configuration struct and wires it into DataFusion's extension-options
/// machinery.
///
/// Given a struct declaration whose fields are written as `name: Type, default = expr`,
/// the macro generates:
///
/// * the struct itself (`Debug`, `Clone`, `#[non_exhaustive]`), carrying over all doc comments;
/// * a `Default` impl built from the per-field `default` expressions;
/// * an `ExtensionOptions` impl whose `set` parses a string value into the field named by
///   `key`, and whose `entries` lists every field under the key `optd.<field>`.
///
/// Every field type must support `str::parse` (with an error type that can be boxed into
/// `DataFusionError::External`), `!=` against its default, and `to_string()`.
///
/// In `entries`, a field that currently equals its default reports `value: None`; the
/// description is the concatenation of the field's doc comments, trimmed.
macro_rules! optd_extensions_options {
    (
        $(#[doc = $struct_d:tt])*
        $vis:vis struct $struct_name:ident {
            $(
                $(#[doc = $d:tt])*
                $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
            )*$(,)*
        }
    ) => {
        $(#[doc = $struct_d])*
        #[derive(Debug, Clone)]
        #[non_exhaustive]
        $vis struct $struct_name {
            $(
                $(#[doc = $d])*
                $field_vis $field_name: $field_type,
            )*
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self {
                    $($field_name: $default),*
                }
            }
        }

        impl ::datafusion::config::ExtensionOptions for $struct_name {
            fn as_any(&self) -> &dyn ::std::any::Any {
                self
            }

            fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
                self
            }

            fn cloned(&self) -> Box<dyn ::datafusion::config::ExtensionOptions> {
                Box::new(self.clone())
            }

            fn set(&mut self, key: &str, value: &str) -> ::datafusion::error::Result<()> {
                match key {
                    $(
                        stringify!($field_name) => {
                            self.$field_name = value.parse().map_err(|e| {
                                ::datafusion::error::DataFusionError::Context(
                                    // Name the field's declared type in the message. The
                                    // previous `$t` was not a metavariable of this macro,
                                    // so the type never showed up in the error text.
                                    format!(
                                        concat!("Error parsing {} as ", stringify!($field_type)),
                                        value
                                    ),
                                    Box::new(::datafusion::error::DataFusionError::External(
                                        Box::new(e),
                                    )),
                                )
                            })?;
                            Ok(())
                        }
                    )*
                    _ => Err(::datafusion::error::DataFusionError::Internal(
                        format!(
                            concat!("Config value \"{}\" not found on ", stringify!($struct_name)),
                            key
                        )
                    ))
                }
            }

            fn entries(&self) -> Vec<::datafusion::config::ConfigEntry> {
                vec![
                    $(
                        ::datafusion::config::ConfigEntry {
                            key: format!("optd.{}", stringify!($field_name)),
                            // Only report a value once it differs from the default.
                            value: (self.$field_name != $default)
                                .then(|| self.$field_name.to_string()),
                            description: concat!($($d),*).trim(),
                        },
                    )*
                ]
            }
        }
    }
}

// Declares `OptdDFConfig` through `optd_extensions_options!`, which also generates its
// `Default` and `ExtensionOptions` impls. The `///` lines below are captured by the macro
// and become each entry's `description`, so they are runtime text, not just documentation.
optd_extensions_options! {
    /// optd configurations
    pub struct OptdDFConfig {
        /// Turn on adaptive optimization.
        pub enable_adaptive: bool, default = false
        /// Use heuristic optimizer before entering cascades.
        pub enable_heuristic: bool, default = true

        // NOTE(review): this field has no `///` doc, so its entry description is the empty
        // string; its intended meaning is not visible here — document once confirmed.
        pub explain_logical: bool, default = true
    }

}

// Marks `OptdDFConfig` as a DataFusion config extension. The prefix matches the `optd.`
// that the generated `entries()` puts in front of every key.
impl datafusion::config::ConfigExtension for OptdDFConfig {
    // NOTE(review): presumably the namespace under which these options are addressed
    // (e.g. `optd.enable_adaptive`) — confirm against the `ConfigExtension` docs.
    const PREFIX: &'static str = "optd";
}

pub struct OptdPlanContext<'a> {
tables: HashMap<String, Arc<dyn TableSource>>,
Expand All @@ -45,23 +141,44 @@ impl<'a> OptdPlanContext<'a> {
optimizer: None,
}
}

/// Returns the optd options stored in the session's config extensions.
///
/// # Panics
///
/// Panics with `"optd config not set"` when the session's extensions hold no
/// [`OptdDFConfig`].
pub fn optd_config(&self) -> &OptdDFConfig {
    let extensions = &self.session_state.config_options().extensions;
    extensions
        .get::<OptdDFConfig>()
        .expect("optd config not set")
}
}

/// Pairs a DataFusion catalog list with an information-schema provider built from it.
pub struct DatafusionCatalog {
    /// Catalog list handed to [`DatafusionCatalog::new`].
    catalog: Arc<dyn CatalogList>,
    /// `InformationSchemaProvider` constructed over `catalog` in `new`.
    information_schema: Arc<dyn SchemaProvider>,
}

impl DatafusionCatalog {
pub fn new(catalog: Arc<dyn CatalogList>) -> Self {
Self { catalog }
let information_schema = Arc::new(InformationSchemaProvider::new(catalog.clone()));
Self {
catalog,
information_schema,
}
}
}

impl Catalog for DatafusionCatalog {
fn get(&self, name: &str) -> optd_datafusion_repr::properties::schema::Schema {
let catalog = self.catalog.catalog("datafusion").unwrap();
let schema = catalog.schema("public").unwrap();
let table = futures_lite::future::block_on(schema.table(name.as_ref())).unwrap();
let resolved = TableReference::from(name).resolve("datafusion", "public");
let catalog = self.catalog.catalog(&resolved.catalog).unwrap();
let schema = if resolved.schema == "information_schema" {
self.information_schema.clone()
} else {
catalog.schema(&resolved.schema).unwrap()
};
let table = futures_lite::future::block_on(schema.table(&resolved.table)).unwrap();
let schema = table.schema();
let fields = schema.fields();
let mut optd_fields = Vec::with_capacity(fields.len());
Expand Down

0 comments on commit d5b0239

Please sign in to comment.