ExpandExec Physical Operator
jaceklaskowski committed Mar 17, 2024
1 parent 1d73104 commit 6baa397
Showing 9 changed files with 152 additions and 32 deletions.
4 changes: 2 additions & 2 deletions docs/aggregations/SortBasedAggregationIterator.md
Expand Up @@ -29,8 +29,8 @@
initialize(): Unit
```

!!! warning "Procedure"
`initialize` returns `Unit` (_nothing_) and whatever happens inside stays inside (just like in Las Vegas, _doesn't it?!_ 😉)
??? warning "Procedure"
`initialize` is a procedure (returns `Unit`) so _what happens inside stays inside_ (paraphrasing the [former advertising slogan of Las Vegas, Nevada](https://idioms.thefreedictionary.com/what+happens+in+Vegas+stays+in+Vegas)).

`initialize`...FIXME

Expand Down
18 changes: 13 additions & 5 deletions docs/execution-planning-strategies/BasicOperators.md
Expand Up @@ -4,19 +4,27 @@ title: BasicOperators

# BasicOperators Execution Planning Strategy

`BasicOperators` is an [execution planning strategy](SparkStrategy.md) for [basic conversions](#conversions) of [logical operators](../logical-operators/LogicalPlan.md) to their [physical representatives](../physical-operators/SparkPlan.md).
`BasicOperators` is an [execution planning strategy](SparkStrategy.md) for [basic conversions](#conversions) of [logical operators](../logical-operators/LogicalPlan.md) to their [physical counterparts](../physical-operators/SparkPlan.md).

## Conversions

Logical Operator | Physical Operator
---------|---------
[CollectMetrics](../logical-operators/CollectMetrics.md) | [CollectMetricsExec](../physical-operators/CollectMetricsExec.md)
[DataWritingCommand](../logical-operators/DataWritingCommand.md) | [DataWritingCommandExec](../physical-operators/DataWritingCommandExec.md)
[RunnableCommand](../logical-operators/RunnableCommand.md) | [ExecutedCommandExec](../physical-operators/ExecutedCommandExec.md)
MemoryPlan ([Spark Structured Streaming]({{ book.structured_streaming }}/connectors/memory/MemoryPlan/)) | [LocalTableScanExec](../physical-operators/LocalTableScanExec.md)
[DeserializeToObject](../logical-operators/DeserializeToObject.md) | [DeserializeToObjectExec](../physical-operators/DeserializeToObjectExec.md)
[Expand](../logical-operators/Expand.md) | [ExpandExec](../physical-operators/ExpandExec.md)
[ExternalRDD](../logical-operators/ExternalRDD.md) | [ExternalRDDScanExec](../physical-operators/ExternalRDDScanExec.md)
[FlatMapGroupsWithState](../logical-operators/FlatMapGroupsWithState.md) | `CoGroupExec` or `MapGroupsExec`
... | ...
[CollectMetrics](../logical-operators/CollectMetrics.md) | [CollectMetricsExec](../physical-operators/CollectMetricsExec.md)
[Generate](../logical-operators/Generate.md) | [GenerateExec](../physical-operators/GenerateExec.md)
[LogicalRDD](../logical-operators/LogicalRDD.md) | `RDDScanExec`
MemoryPlan ([Spark Structured Streaming]({{ book.structured_streaming }}/datasources/memory/MemoryPlan/)) | [LocalTableScanExec](../physical-operators/LocalTableScanExec.md)
`Range` | [RangeExec](../physical-operators/RangeExec.md)
[RebalancePartitions](../logical-operators/RebalancePartitions.md) | [ShuffleExchangeExec](../physical-operators/ShuffleExchangeExec.md)
[RepartitionByExpression](../logical-operators/RepartitionByExpression.md) | [ShuffleExchangeExec](../physical-operators/ShuffleExchangeExec.md)
[RunnableCommand](../logical-operators/RunnableCommand.md) | [ExecutedCommandExec](../physical-operators/ExecutedCommandExec.md)
[WriteFiles](../logical-operators/WriteFiles.md) | [WriteFilesExec](../physical-operators/WriteFilesExec.md)

!!! tip
Refer to the source code of [BasicOperators]({{ spark.github }}/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L677-L826) to confirm the most up-to-date operator mapping.
Refer to the source code of [BasicOperators]({{ spark.github }}/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L775-L943) to confirm the most up-to-date operator mapping.
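
The conversion table can be read as a pattern match over logical operators. The following is a toy model in plain Scala (no Spark dependency). All types and the `plan` method are hypothetical stand-ins for illustration and are not the Spark classes of the same names.

```scala
// Toy model only: these types are hypothetical stand-ins, not Spark classes.
object BasicOperatorsSketch {
  sealed trait LogicalOp
  case class Relation(name: String) extends LogicalOp
  case class Expand(projections: Seq[Seq[String]], child: LogicalOp) extends LogicalOp
  case class Generate(generator: String, child: LogicalOp) extends LogicalOp

  sealed trait PhysicalOp
  case class Scan(name: String) extends PhysicalOp
  case class ExpandExec(projections: Seq[Seq[String]], child: PhysicalOp) extends PhysicalOp
  case class GenerateExec(generator: String, child: PhysicalOp) extends PhysicalOp

  // One case per row of a conversion table; children are planned recursively.
  def plan(op: LogicalOp): PhysicalOp = op match {
    case Relation(name)               => Scan(name)
    case Expand(projections, child)   => ExpandExec(projections, plan(child))
    case Generate(generator, child)   => GenerateExec(generator, plan(child))
  }
}
```

The sealed hierarchy makes the compiler warn about logical operators that have no physical counterpart yet.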
2 changes: 1 addition & 1 deletion docs/logical-operators/Expand.md
Expand Up @@ -51,4 +51,4 @@ apply(

## Physical Planning

`Expand` logical operator is resolved to `ExpandExec` physical operator in [BasicOperators](../execution-planning-strategies/BasicOperators.md) execution planning strategy.
`Expand` logical operator is resolved to [ExpandExec](../physical-operators/ExpandExec.md) physical operator in [BasicOperators](../execution-planning-strategies/BasicOperators.md) execution planning strategy.
47 changes: 29 additions & 18 deletions docs/physical-operators/CodegenSupport.md
@@ -1,10 +1,14 @@
---
title: CodegenSupport
---

# CodegenSupport Physical Operators

`CodegenSupport` is an [extension](#contract) of the [SparkPlan](SparkPlan.md) abstraction for [physical operators](#implementations) that support [Whole-Stage Java Code Generation](../whole-stage-code-generation/index.md).

## Contract

### <span id="doConsume"> Java Source Code for Consume Path
### Generating Java Source Code for Consume Path { #doConsume }

```scala
doConsume(
Expand All @@ -18,9 +22,11 @@ Generates a Java source code (as a text) for this physical operator for the [con
!!! note "UnsupportedOperationException"
`doConsume` throws an `UnsupportedOperationException` by default.

Used when the physical operator is requested to [generate the Java source code for consume code path](#consume) (a Java code that consumes the generated columns or a row from a physical operator)
Used when:

### <span id="doProduce"> Java Source Code for Produce Path
* This physical operator is requested to [generate the Java source code for consume code path](#consume) (a Java code that consumes the generated columns or a row from a physical operator)

### Generating Java Source Code for Produce Path { #doProduce }

```scala
doProduce(
Expand All @@ -29,9 +35,11 @@ doProduce(

Generates a Java source code (as a text) for the physical operator to process the rows from the [input RDDs](#inputRDDs) for the [whole-stage-codegen "produce" path](../whole-stage-code-generation/index.md#produce-path).

Used when the physical operator is requested to [generate the Java source code for "produce" code path](#produce)
Used when:

* This physical operator is requested to [generate the Java source code for "produce" code path](#produce)

### <span id="inputRDDs"> Input RDDs
### Input RDDs { #inputRDDs }

```scala
inputRDDs(): Seq[RDD[InternalRow]]
Expand All @@ -42,13 +50,16 @@ Input RDDs of the physical operator
!!! important
[Whole-Stage Java Code Generation](../whole-stage-code-generation/index.md) supports up to two input RDDs.

Used when [WholeStageCodegenExec](WholeStageCodegenExec.md) unary physical operator is executed
Used when:

* [WholeStageCodegenExec](WholeStageCodegenExec.md) unary physical operator is executed

## Implementations

* [BroadcastHashJoinExec](BroadcastHashJoinExec.md)
* [ColumnarToRowExec](ColumnarToRowExec.md)
* [DebugExec](DebugExec.md)
* [ExpandExec](ExpandExec.md)
* [FilterExec](FilterExec.md)
* [GenerateExec](GenerateExec.md)
* [ProjectExec](ProjectExec.md)
Expand All @@ -62,7 +73,7 @@ Used when [WholeStageCodegenExec](WholeStageCodegenExec.md) unary physical opera

Final methods are used to generate the Java source code in different phases of [Whole-Stage Java Code Generation](../whole-stage-code-generation/index.md).

### <span id="consume"> Generating Java Source Code for Consume Code Path
### Generating Java Source Code for Consume Code Path { #consume }

```scala
consume(
Expand Down Expand Up @@ -157,7 +168,7 @@ Found 2 WholeStageCodegen subtrees.

* [HashAggregateExec](HashAggregateExec.md#doProduce), [InputAdapter](InputAdapter.md#doProduce), [RowDataSourceScanExec](RowDataSourceScanExec.md#doProduce), [RangeExec](RangeExec.md#doProduce), [SortExec](SortExec.md#doProduce), [SortMergeJoinExec](SortMergeJoinExec.md#doProduce) physical operators are requested to generate the Java source code for the ["produce" path](../whole-stage-code-generation/index.md#produce-path) in whole-stage code generation

### <span id="limitNotReachedCond"> Data-Producing Loop Condition
### Data-Producing Loop Condition { #limitNotReachedCond }

```scala
limitNotReachedCond: String
Expand All @@ -169,7 +180,7 @@ limitNotReachedCond: String

`limitNotReachedCond` returns an empty string for no [limit-not-reached checks](#limitNotReachedChecks) or concatenates them with `&&`.
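
A minimal sketch of the concatenation in plain Scala (no Spark dependency). `LimitCondSketch` and the `checks` parameter are hypothetical names. The trailing `&&` is an assumption (so the generated code can append its own loop condition right after) and should be confirmed against the Spark source code.

```scala
object LimitCondSketch {
  // Empty string for no checks; otherwise the checks joined with " && ".
  // Assumption: a trailing "&&" is kept so a caller can append another condition.
  def limitNotReachedCond(checks: Seq[String]): String =
    if (checks.isEmpty) "" else checks.mkString("", " && ", " &&")
}
```

With the trailing `&&`, a data-producing loop could be generated as `while (<limitNotReachedCond> input.hasNext())` with no extra glue.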

### <span id="produce"> Generating Java Source Code for Produce Code Path
### Generating Java Source Code for Produce Code Path { #produce }

```scala
produce(
Expand Down Expand Up @@ -230,7 +241,7 @@ Found 2 WholeStageCodegen subtrees.
...
```

## <span id="supportCodegen"> supportCodegen Flag
## supportCodegen Flag { #supportCodegen }

```scala
supportCodegen: Boolean
Expand All @@ -251,7 +262,7 @@ supportCodegen: Boolean

`supportCodegen` flag is used to select between `InputAdapter` or `WholeStageCodegenExec` physical operators when [CollapseCodegenStages](../physical-optimizations/CollapseCodegenStages.md) physical optimization is executed (and [checks whether a physical operator meets the requirements of whole-stage Java code generation or not](../physical-optimizations/CollapseCodegenStages.md#supportCodegen)).

## <span id="prepareRowVar"> prepareRowVar Internal Method
## prepareRowVar Internal Method { #prepareRowVar }

```scala
prepareRowVar(
Expand All @@ -264,7 +275,7 @@ prepareRowVar(

`prepareRowVar` is used when `CodegenSupport` is requested to [consume](#consume) (and [constructDoConsumeFunction](#constructDoConsumeFunction) with [spark.sql.codegen.splitConsumeFuncByOperator](../configuration-properties.md#spark.sql.codegen.splitConsumeFuncByOperator) enabled).

## <span id="constructDoConsumeFunction"> constructDoConsumeFunction Internal Method
## constructDoConsumeFunction Internal Method { #constructDoConsumeFunction }

```scala
constructDoConsumeFunction(
Expand All @@ -277,7 +288,7 @@ constructDoConsumeFunction(

`constructDoConsumeFunction` is used when `CodegenSupport` is requested to [consume](#consume).

## <span id="usedInputs"> Used Input Attributes
## Used Input Attributes { #usedInputs }

```scala
usedInputs: AttributeSet
Expand All @@ -294,7 +305,7 @@ usedInputs: AttributeSet

* `CodegenSupport` is requested to [generate a Java source code for consume path](#consume)

## <span id="parent"> parent Internal Variable Property
## parent Internal Variable Property { #parent }

```scala
parent: CodegenSupport
Expand All @@ -304,7 +315,7 @@ parent: CodegenSupport

`parent` starts empty (defaults to `null`) and is assigned a physical operator (with `CodegenContext`) only when `CodegenContext` is requested to [generate a Java source code for produce code path](#produce). The physical operator is passed in as an input argument for the [produce](#produce) code path.

## <span id="limitNotReachedChecks"> limitNotReachedChecks
## limitNotReachedChecks { #limitNotReachedChecks }

```scala
limitNotReachedChecks: Seq[String]
Expand All @@ -322,7 +333,7 @@ limitNotReachedChecks: Seq[String]
* `BaseLimitExec` physical operator is requested to `limitNotReachedChecks`
* `CodegenSupport` physical operator is requested to [limitNotReachedCond](#limitNotReachedCond)

## <span id="canCheckLimitNotReached"> canCheckLimitNotReached
## canCheckLimitNotReached { #canCheckLimitNotReached }

```scala
canCheckLimitNotReached: Boolean
Expand All @@ -336,7 +347,7 @@ canCheckLimitNotReached: Boolean

* `CodegenSupport` physical operator is requested to [limitNotReachedCond](#limitNotReachedCond).

## <span id="variablePrefix"> Variable Name Prefix
## Variable Name Prefix { #variablePrefix }

```scala
variablePrefix: String
Expand All @@ -360,7 +371,7 @@ Physical Operator | Prefix

* `CodegenSupport` is requested to generate the Java source code for [produce](#produce) and [consume](#consume) code paths

## <span id="needCopyResult"> needCopyResult Flag
## needCopyResult Flag { #needCopyResult }

```scala
needCopyResult: Boolean
Expand Down
91 changes: 91 additions & 0 deletions docs/physical-operators/ExpandExec.md
@@ -0,0 +1,91 @@
---
title: ExpandExec
---

# ExpandExec Physical Operator

`ExpandExec` is a [unary physical operator](UnaryExecNode.md) with support for [Whole-Stage Java Code Generation](CodegenSupport.md).

## Creating Instance

`ExpandExec` takes the following to be created:

* <span id="projections"> Projection [Expression](../expressions/Expression.md)s (`Seq[Seq[Expression]]`)
* <span id="output"> Output [Attribute](../expressions/Attribute.md)s
* <span id="child"> Child [physical operator](SparkPlan.md)

`ExpandExec` is created when:

* [BasicOperators](../execution-planning-strategies/BasicOperators.md) execution planning strategy is executed (to plan an [Expand](../logical-operators/Expand.md) logical operator)

## Performance Metrics { #metrics }

### number of output rows { #numOutputRows }

## Generating Java Source Code for Consume Path { #doConsume }

??? note "CodegenSupport"

```scala
doConsume(
ctx: CodegenContext,
input: Seq[ExprCode],
row: ExprCode): String
```

`doConsume` is part of the [CodegenSupport](CodegenSupport.md#doConsume) abstraction.

`doConsume`...FIXME

## Generating Java Source Code for Produce Path { #doProduce }

??? note "CodegenSupport"

```scala
doProduce(
ctx: CodegenContext): String
```

`doProduce` is part of the [CodegenSupport](CodegenSupport.md#doProduce) abstraction.

`doProduce` requests the [child](#child) operator (that is supposed to be a [CodegenSupport](CodegenSupport.md) physical operator) to [generate a Java source code for produce code path](CodegenSupport.md#produce).

## Executing Operator { #doExecute }

??? note "SparkPlan"

```scala
doExecute(): RDD[InternalRow]
```

`doExecute` is part of the [SparkPlan](SparkPlan.md#doExecute) abstraction.

`doExecute` requests the [child](#child) operator to [execute](SparkPlan.md#execute) (that creates an `RDD[InternalRow]`).

`doExecute` uses `RDD.mapPartitions` operator to apply a function to each partition of the `RDD[InternalRow]`.

`doExecute`...FIXME
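
A toy model of what an expand step does per partition, in plain Scala (no Spark dependency): every input row yields one output row per projection. `ExpandSketch`, `Row`, `Projection` and `expandPartition` are hypothetical names, and a projection is simplified to a list of functions (one per output column). This is not the actual `ExpandExec` code.

```scala
object ExpandSketch {
  type Row = Seq[Any]
  // Simplification: a projection is a list of functions, one per output column.
  type Projection = Seq[Row => Any]

  // The function one would pass to mapPartitions:
  // for every input row, emit one output row per projection.
  def expandPartition(projections: Seq[Projection])(rows: Iterator[Row]): Iterator[Row] =
    rows.flatMap { row =>
      projections.iterator.map(projection => projection.map(column => column(row)))
    }
}
```

With two projections, a partition of `n` input rows gives `2 * n` output rows, which is how grouping sets get one copy of every row per grouping.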

## needCopyResult { #needCopyResult }

??? note "CodegenSupport"

```scala
needCopyResult: Boolean
```

`needCopyResult` is part of the [CodegenSupport](CodegenSupport.md#needCopyResult) abstraction.

`needCopyResult` is always enabled (`true`).

## canPassThrough { #canPassThrough }

`ExpandExec` [canPassThrough](../physical-optimizations/RemoveRedundantProjects.md#canPassThrough) in [RemoveRedundantProjects](../physical-optimizations/RemoveRedundantProjects.md) physical optimization.

## Demo

!!! note "FIXME"

1. Create a plan with `ExpandExec`
1. Access the operator
1. Request it to produce the consume path code
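
A possible starting point for the demo, assuming Spark SQL is on the classpath and a local session is fine. It relies on `rollup` being planned with an `Expand` logical operator. `ExpandExecDemo` and the sample data are made up for illustration. The last step prints the whole-stage generated code with `debugCodegen` rather than requesting `doConsume` directly (that would need a `CodegenContext` and is not shown here).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ExpandExec
import org.apache.spark.sql.execution.debug._

object ExpandExecDemo extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("ExpandExecDemo")
    .getOrCreate()
  import spark.implicits._

  // 1. Create a plan with ExpandExec (rollup uses the Expand logical operator)
  val q = Seq((1, "a", 10), (2, "b", 20)).toDF("id", "name", "v")
    .rollup("id", "name")
    .sum("v")

  // 2. Access the operator
  // sparkPlan is the physical plan before preparation rules (e.g. adaptive execution)
  val expand = q.queryExecution.sparkPlan.collectFirst { case e: ExpandExec => e }
  expand.foreach(e => println(s"Number of projections: ${e.projections.size}"))

  // 3. Print the generated Java code of the whole-stage-codegen subtrees
  q.debugCodegen()

  spark.stop()
}
```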
6 changes: 3 additions & 3 deletions docs/physical-operators/LocalTableScanExec.md
Expand Up @@ -22,13 +22,13 @@

## Performance Metrics

### <span id="numOutputRows"> number of output rows
### number of output rows { #numOutputRows }

## <span id="InputRDDCodegen"> InputRDDCodegen
## InputRDDCodegen { #InputRDDCodegen }

`LocalTableScanExec` is an `InputRDDCodegen`.

## <span id="CollapseCodegenStages"> CollapseCodegenStages
## CollapseCodegenStages { #CollapseCodegenStages }

[CollapseCodegenStages](../physical-optimizations/CollapseCodegenStages.md) physical optimization considers `LocalTableScanExec` special when [insertWholeStageCodegen](../physical-optimizations/CollapseCodegenStages.md#insertWholeStageCodegen) (so it won't be the root of [WholeStageCodegen](../whole-stage-code-generation/index.md) to support the fast driver-local collect/take paths).

Expand Down
6 changes: 5 additions & 1 deletion docs/physical-operators/SparkPlan.md
Expand Up @@ -41,7 +41,7 @@ A `SparkPlan` physical operator is a [Catalyst tree node](../catalyst/TreeNode.m

## Contract

### <span id="doExecute"> doExecute
### Executing Operator { #doExecute }

```scala
doExecute(): RDD[InternalRow]
Expand All @@ -51,6 +51,10 @@ Generates a distributed computation (that is a runtime representation of the ope

Part of [execute](#execute)

See:

* [ExpandExec](ExpandExec.md#doExecute)

## Implementations

* [BaseSubqueryExec](BaseSubqueryExec.md)
Expand Down
9 changes: 7 additions & 2 deletions docs/physical-operators/UnaryExecNode.md
@@ -1,10 +1,14 @@
---
title: UnaryExecNode
---

# UnaryExecNode Physical Operators

`UnaryExecNode` is an [extension](#contract) of the [SparkPlan](SparkPlan.md) abstraction for [unary physical operators](#implementations) (with one [child physical operator](#child) only).
`UnaryExecNode` is an [extension](#contract) of the [SparkPlan](SparkPlan.md) abstraction for [unary physical operators](#implementations) that have a single [child physical operator](#child) only.

## Contract

### <span id="child"> Child Physical Operator
### Child Physical Operator { #child }

```scala
child: SparkPlan
Expand All @@ -29,6 +33,7 @@ Used when:
* [DebugExec](DebugExec.md)
* [EvalPythonExec](EvalPythonExec.md)
* [Exchange](Exchange.md)
* [ExpandExec](ExpandExec.md)
* [FilterExec](FilterExec.md)
* FlatMapGroupsInPandasExec ([PySpark]({{ book.pyspark }}/sql/FlatMapGroupsInPandasExec))
* FlatMapGroupsWithStateExec ([Structured Streaming]({{ book.structured_streaming }}/FlatMapGroupsWithStateExec))
Expand Down
1 change: 1 addition & 0 deletions mkdocs.yml
Expand Up @@ -398,6 +398,7 @@ nav:
- EvalPythonExec: physical-operators/EvalPythonExec.md
- Exchange: physical-operators/Exchange.md
- ExecutedCommandExec: physical-operators/ExecutedCommandExec.md
- ExpandExec: physical-operators/ExpandExec.md
- ExternalRDDScanExec: physical-operators/ExternalRDDScanExec.md
- FileSourceScanExec: physical-operators/FileSourceScanExec.md
- FilterExec: physical-operators/FilterExec.md
Expand Down
