Skip to content

Commit

Permalink
[FLINK-36047] Add CompiledPlan annotations to BatchExecOverAggregate
Browse files Browse the repository at this point in the history
  • Loading branch information
jnh5y committed Nov 8, 2024
1 parent 35af38a commit a0dad3f
Show file tree
Hide file tree
Showing 8 changed files with 1,191 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.planner.plan.nodes.exec.batch;

import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
Expand All @@ -38,6 +39,7 @@
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.spec.OverSpec;
import org.apache.flink.table.planner.plan.nodes.exec.spec.OverSpec.GroupSpec;
Expand Down Expand Up @@ -66,6 +68,9 @@
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rex.RexWindowBound;
import org.apache.calcite.sql.SqlKind;
Expand All @@ -76,6 +81,13 @@
import java.util.List;

/** Batch {@link ExecNode} for sort-based over window aggregate. */
@ExecNodeMetadata(
name = "batch-exec-over-aggregate",
version = 1,
producedTransformations = BatchExecOverAggregateBase.OVER_TRANSFORMATION,
consumedOptions = {"table.exec.resource.external-buffer-memory"},
minPlanVersion = FlinkVersion.v2_0,
minStateVersion = FlinkVersion.v2_0)
public class BatchExecOverAggregate extends BatchExecOverAggregateBase {

public BatchExecOverAggregate(
Expand All @@ -94,6 +106,18 @@ public BatchExecOverAggregate(
description);
}

@JsonCreator
public BatchExecOverAggregate(
@JsonProperty(FIELD_NAME_ID) int id,
@JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
@JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
@JsonProperty(FIELD_NAME_OVER_SPEC) OverSpec overSpec,
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
@JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
super(id, context, persistedConfig, overSpec, inputProperties, outputType, description);
}

@SuppressWarnings("unchecked")
@Override
protected Transformation<RowData> translateToPlanInternal(
Expand Down Expand Up @@ -187,8 +211,7 @@ protected Transformation<RowData> translateToPlanInternal(
}
return ExecNodeUtil.createOneInputTransformation(
inputTransform,
createTransformationName(config),
createTransformationDescription(config),
createTransformationMeta(OVER_TRANSFORMATION, config),
SimpleOperatorFactory.of(operator),
InternalTypeInfo.of(getOutputType()),
inputTransform.getParallelism(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.sql.fun.SqlLeadLagAggFunction;
Expand All @@ -47,6 +49,11 @@
public abstract class BatchExecOverAggregateBase extends ExecNodeBase<RowData>
implements InputSortedExecNode<RowData>, SingleTransformationTranslator<RowData> {

public static final String OVER_TRANSFORMATION = "over";

public static final String FIELD_NAME_OVER_SPEC = "overSpec";

@JsonProperty(FIELD_NAME_OVER_SPEC)
protected final OverSpec overSpec;

public BatchExecOverAggregateBase(
Expand All @@ -67,6 +74,18 @@ public BatchExecOverAggregateBase(
this.overSpec = overSpec;
}

public BatchExecOverAggregateBase(
int id,
ExecNodeContext context,
ReadableConfig persistedConfig,
OverSpec overSpec,
List<InputProperty> inputProperties,
RowType outputType,
String description) {
super(id, context, persistedConfig, inputProperties, outputType, description);
this.overSpec = overSpec;
}

protected RowType getInputTypeWithConstants() {
final RowType inputRowType = (RowType) getInputEdges().get(0).getOutputType();
final List<LogicalType> inputTypesWithConstants =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecLimit;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecMatch;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNestedLoopJoin;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecOverAggregate;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSortAggregate;
Expand Down Expand Up @@ -181,6 +182,7 @@ private ExecNodeMetadataUtil() {
add(BatchExecSortAggregate.class);
add(BatchExecSortLimit.class);
add(BatchExecMatch.class);
add(BatchExecOverAggregate.class);
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.nodes.exec.batch;

import org.apache.flink.table.planner.plan.nodes.exec.common.OverAggregateTestPrograms;
import org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchRestoreTestBase;
import org.apache.flink.table.test.program.TableTestProgram;

import java.util.Arrays;
import java.util.List;

/** Batch Compiled Plan tests for {@link BatchExecOverAggregate}. */
// @Disabled
public class OverAggregateBatchRestoreTest extends BatchRestoreTestBase {

public OverAggregateBatchRestoreTest() {
super(BatchExecOverAggregate.class);
}

@Override
public List<TableTestProgram> programs() {
return Arrays.asList(
// These tests fail due to FLINK-25802
// OverAggregateTestPrograms.OVER_AGGREGATE_TIME_BOUNDED_PARTITIONED_ROWS,
// OverAggregateTestPrograms.OVER_AGGREGATE_TIME_BOUNDED_NON_PARTITIONED_ROWS
OverAggregateTestPrograms.OVER_AGGREGATE_UNBOUNDED_PARTITIONED_ROWS,
OverAggregateTestPrograms.OVER_AGGREGATE_ROW_BOUNDED_PARTITIONED_PRECEDING_ROWS);
}
}
Loading

0 comments on commit a0dad3f

Please sign in to comment.