refactor: remove DfPlan wrapper (#4733)
* refactor: remove DfPlan wrapper

Signed-off-by: Ruihang Xia <[email protected]>

* clean up

Signed-off-by: Ruihang Xia <[email protected]>

* remove unused errors

Signed-off-by: Ruihang Xia <[email protected]>

* fix test assertion

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
waynexia authored Sep 19, 2024
1 parent 1acda74 commit f5cf25b
Showing 38 changed files with 194 additions and 293 deletions.
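At a glance, the refactor drops the single-variant query::plan::LogicalPlan wrapper so planner and engine APIs work with datafusion_expr::LogicalPlan directly. A minimal sketch of the pattern being removed (the enum shape is inferred from the destructuring visible in the hunks below; unwrap_plan is an illustrative helper, not crate code):

use datafusion_expr::LogicalPlan as DfLogicalPlan;

// Before: the query crate wrapped DataFusion's plan type in a one-variant enum,
// so every boundary had to unwrap it or wrap a plan back up.
pub enum LogicalPlan {
    DfPlan(DfLogicalPlan),
}

// Typical call-site ceremony: an irrefutable single-variant destructure.
fn unwrap_plan(plan: LogicalPlan) -> DfLogicalPlan {
    let LogicalPlan::DfPlan(inner) = plan;
    inner
}

// After this commit, APIs take and return datafusion_expr::LogicalPlan directly,
// so the LogicalPlan::DfPlan(..) constructions and
// `let LogicalPlan::DfPlan(plan) = ...` unwraps in the hunks below disappear.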
12 changes: 7 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 1 addition & 2 deletions src/cmd/src/cli/repl.rs
@@ -35,7 +35,6 @@ use either::Either;
use meta_client::client::MetaClientBuilder;
use query::datafusion::DatafusionQueryEngine;
use query::parser::QueryLanguageParser;
-use query::plan::LogicalPlan;
use query::query_engine::{DefaultSerializer, QueryEngineState};
use query::QueryEngine;
use rustyline::error::ReadlineError;
@@ -179,7 +178,7 @@ impl Repl {
.await
.context(PlanStatementSnafu)?;

-let LogicalPlan::DfPlan(plan) = query_engine
+let plan = query_engine
.optimize(&query_engine.engine_context(query_ctx), &plan)
.context(PlanStatementSnafu)?;

2 changes: 1 addition & 1 deletion src/datanode/src/region_server.rs
@@ -843,7 +843,7 @@ impl RegionServerInner {

let result = self
.query_engine
-.execute(request.plan.into(), query_ctx)
+.execute(request.plan, query_ctx)
.await
.context(ExecuteLogicalPlanSnafu)?;

2 changes: 1 addition & 1 deletion src/datanode/src/tests.rs
@@ -24,8 +24,8 @@ use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::prelude::ScalarUdf;
use common_query::Output;
use common_runtime::Runtime;
+use datafusion_expr::LogicalPlan;
use query::dataframe::DataFrame;
-use query::plan::LogicalPlan;
use query::planner::LogicalPlanner;
use query::query_engine::{DescribeResult, QueryEngineState};
use query::{QueryEngine, QueryEngineContext};
2 changes: 0 additions & 2 deletions src/flow/src/df_optimizer.rs
@@ -41,7 +41,6 @@ use datafusion_expr::{
BinaryExpr, Expr, Operator, Projection, ScalarUDFImpl, Signature, TypeSignature, Volatility,
};
use query::parser::QueryLanguageParser;
-use query::plan::LogicalPlan;
use query::query_engine::DefaultSerializer;
use query::QueryEngine;
use snafu::ResultExt;
@@ -111,7 +110,6 @@ pub async fn sql_to_flow_plan(
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
-let LogicalPlan::DfPlan(plan) = plan;

let opted_plan = apply_df_optimizer(plan).await?;

3 changes: 0 additions & 3 deletions src/flow/src/transform.rs
@@ -163,7 +163,6 @@ mod test {
use itertools::Itertools;
use prost::Message;
use query::parser::QueryLanguageParser;
-use query::plan::LogicalPlan;
use query::query_engine::DefaultSerializer;
use query::QueryEngine;
use session::context::QueryContext;
@@ -274,7 +273,6 @@ mod test {
.plan(stmt, QueryContext::arc())
.await
.unwrap();
-let LogicalPlan::DfPlan(plan) = plan;
let plan = apply_df_optimizer(plan).await.unwrap();

// encode then decode so to rely on the impl of conversion from logical plan to substrait plan
@@ -297,7 +295,6 @@
.plan(stmt, QueryContext::arc())
.await
.unwrap();
-let LogicalPlan::DfPlan(plan) = plan;
let plan = apply_df_optimizer(plan).await;

assert!(plan.is_err());
1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
@@ -37,6 +37,7 @@ common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-version.workspace = true
+datafusion-expr.workspace = true
datanode.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
2 changes: 1 addition & 1 deletion src/frontend/src/instance.rs
@@ -40,6 +40,7 @@ use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_query::Output;
use common_telemetry::{debug, error, tracing};
+use datafusion_expr::LogicalPlan;
use log_store::raft_engine::RaftEngineBackend;
use operator::delete::DeleterRef;
use operator::insert::InserterRef;
@@ -48,7 +49,6 @@ use pipeline::pipeline_operator::PipelineOperator;
use prometheus::HistogramTimer;
use query::metrics::OnDone;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
-use query::plan::LogicalPlan;
use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
use query::query_engine::DescribeResult;
use query::QueryEngineRef;
16 changes: 8 additions & 8 deletions src/operator/src/error.rs
@@ -133,13 +133,6 @@ pub enum Error {
source: query::error::Error,
},

#[snafu(display("Failed to get schema from logical plan"))]
GetSchema {
#[snafu(implicit)]
location: Location,
source: query::error::Error,
},

#[snafu(display("Column datatype error"))]
ColumnDataType {
#[snafu(implicit)]
@@ -184,6 +177,13 @@ pub enum Error {
source: datatypes::error::Error,
},

#[snafu(display("Failed to convert datafusion schema"))]
ConvertSchema {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to convert expr to struct"))]
InvalidExpr {
#[snafu(implicit)]
@@ -795,6 +795,7 @@ impl ErrorExt for Error {
| Error::PrepareFileTable { .. }
| Error::InferFileTableSchema { .. }
| Error::SchemaIncompatible { .. }
+| Error::ConvertSchema { .. }
| Error::UnsupportedRegionRequest { .. }
| Error::InvalidTableName { .. }
| Error::InvalidViewName { .. }
@@ -872,7 +873,6 @@ impl ErrorExt for Error {
| Error::FindNewColumnsOnInsertion { source, .. } => source.status_code(),

Error::ExecuteStatement { source, .. }
-| Error::GetSchema { source, .. }
| Error::ExtractTableNames { source, .. }
| Error::PlanStatement { source, .. }
| Error::ParseQuery { source, .. }
2 changes: 1 addition & 1 deletion src/operator/src/statement.rs
@@ -42,9 +42,9 @@ use common_query::Output;
use common_telemetry::tracing;
use common_time::range::TimestampRange;
use common_time::Timestamp;
+use datafusion_expr::LogicalPlan;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use query::parser::QueryStatement;
-use query::plan::LogicalPlan;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
3 changes: 1 addition & 2 deletions src/operator/src/statement/copy_table_to.rs
@@ -29,7 +29,6 @@ use datafusion::datasource::DefaultTableSource;
use datafusion_common::TableReference as DfTableReference;
use datafusion_expr::LogicalPlanBuilder;
use object_store::ObjectStore;
-use query::plan::LogicalPlan;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::requests::CopyTableRequest;
@@ -133,7 +132,7 @@ impl StatementExecutor {

let output = self
.query_engine
-.execute(LogicalPlan::DfPlan(plan), query_ctx)
+.execute(plan, query_ctx)
.await
.context(ExecLogicalPlanSnafu)?;
let stream = match output.data {
24 changes: 13 additions & 11 deletions src/operator/src/statement/ddl.rs
@@ -37,7 +37,7 @@ use common_query::Output;
use common_telemetry::{debug, info, tracing};
use common_time::Timezone;
use datatypes::prelude::ConcreteDataType;
-use datatypes::schema::RawSchema;
+use datatypes::schema::{RawSchema, Schema};
use datatypes::value::Value;
use lazy_static::lazy_static;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
@@ -69,11 +69,11 @@ use table::TableRef;
use super::StatementExecutor;
use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
-CreateLogicalTablesSnafu, CreateTableInfoSnafu, DeserializePartitionSnafu, EmptyDdlExprSnafu,
-ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu,
-InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result,
-SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
-TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
+ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, DeserializePartitionSnafu,
+EmptyDdlExprSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu,
+InvalidPartitionSnafu, InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu,
+ParseSqlValueSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
+SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
};
use crate::expr_factory;
@@ -406,9 +406,12 @@ impl StatementExecutor {

// Save the columns in plan, it may changed when the schemas of tables in plan
// are altered.
-let plan_columns: Vec<_> = logical_plan
+let schema: Schema = logical_plan
.schema()
-.context(error::GetSchemaSnafu)?
+.clone()
+.try_into()
+.context(ConvertSchemaSnafu)?;
+let plan_columns: Vec<_> = schema
.column_schemas()
.iter()
.map(|c| c.name.clone())
@@ -434,9 +437,8 @@

// Extract the table names from the original plan
// and rewrite them as fully qualified names.
-let (table_names, plan) =
-extract_and_rewrite_full_table_names(logical_plan.unwrap_df_plan(), ctx.clone())
-.context(ExtractTableNamesSnafu)?;
+let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
+.context(ExtractTableNamesSnafu)?;

let table_names = table_names.into_iter().map(|t| t.into()).collect();

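With the wrapper gone, ddl.rs can no longer go through the wrapper's fallible schema accessor (hence the GetSchema error removed in error.rs above); it converts the DataFusion plan's schema into the crate's Schema via try_into, wrapped by the new ConvertSchema error. A hedged sketch of that shape, where plan_column_names is an illustrative helper and the TryFrom impl is assumed from the conversion chain in the diff:

use datafusion_expr::LogicalPlan;
use datatypes::schema::Schema;

// Collect the plan's output column names, mirroring the new ddl.rs path.
// Assumes `Schema` implements TryFrom for the cloned DFSchema handle with
// `datatypes::error::Error` as its error type (what ConvertSchema wraps).
fn plan_column_names(plan: &LogicalPlan) -> Result<Vec<String>, datatypes::error::Error> {
    let schema: Schema = plan.schema().clone().try_into()?;
    Ok(schema
        .column_schemas()
        .iter()
        .map(|c| c.name.clone())
        .collect())
}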
2 changes: 1 addition & 1 deletion src/operator/src/statement/tql.rs
@@ -16,11 +16,11 @@ use std::collections::HashMap;

use common_query::Output;
use common_telemetry::tracing;
+use datafusion_expr::LogicalPlan;
use query::parser::{
PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME,
DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME,
};
-use query::plan::LogicalPlan;
use session::context::QueryContextRef;
use snafu::ResultExt;
use sql::statements::tql::Tql;
7 changes: 3 additions & 4 deletions src/pipeline/src/manager/table.rs
@@ -26,15 +26,14 @@ use common_telemetry::{debug, info};
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion::logical_expr::col;
use datafusion_common::{TableReference, ToDFSchema};
-use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan};
+use datafusion_expr::{DmlStatement, LogicalPlan};
use datatypes::prelude::ScalarVector;
use datatypes::timestamp::TimestampNanosecond;
use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
use moka::sync::Cache;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::dataframe::DataFrame;
-use query::plan::LogicalPlan;
use query::QueryEngineRef;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
@@ -373,7 +372,7 @@ impl PipelineTable {
Arc::new(dataframe.into_parts().1),
);

-let plan = LogicalPlan::DfPlan(DfLogicalPlan::Dml(stmt));
+let plan = LogicalPlan::Dml(stmt);

// 4. execute dml stmt
let output = self
@@ -427,7 +426,7 @@ impl PipelineTable {
.limit(0, Some(1))
.context(BuildDfLogicalPlanSnafu)?;

-let plan = LogicalPlan::DfPlan(dataframe.into_parts().1);
+let plan = dataframe.into_parts().1;

let table_info = self.table.table_info();

(Diffs for the remaining changed files are not shown here.)
