From f6cb672980ebc1c3ba172ba93d3db974b54c617b Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" <17555551+Jackie-Jiang@users.noreply.github.com> Date: Wed, 20 Sep 2023 10:10:05 -0700 Subject: [PATCH] Optimize group-by and join for single key scenario (#11630) --- .../core/util/DataBlockExtractUtils.java | 38 +++- .../query/planner/logical/PlanFragmenter.java | 27 ++- .../logical/RelToPlanNodeConverter.java | 4 +- .../logical/ShuffleRewriteVisitor.java | 32 ++- .../partitioning/EmptyKeySelector.java | 37 ++++ .../planner/partitioning/KeySelector.java | 22 ++- .../partitioning/KeySelectorFactory.java | 42 ++++ ...ector.java => MultiColumnKeySelector.java} | 58 ++---- .../partitioning/SingleColumnKeySelector.java | 37 ++++ .../GreedyShuffleRewriteVisitor.java | 52 +++-- .../query/planner/plannode/JoinNode.java | 20 +- .../planner/plannode/MailboxReceiveNode.java | 14 +- .../planner/plannode/MailboxSendNode.java | 13 +- .../pinot/query/runtime/QueryRunner.java | 2 +- .../runtime/operator/HashJoinOperator.java | 64 +++--- .../runtime/operator/MailboxSendOperator.java | 25 ++- .../operator/MultistageGroupByExecutor.java | 183 +++++++++++------- .../operator/exchange/BlockExchange.java | 16 +- .../operator/exchange/HashExchange.java | 36 ++-- .../runtime/plan/PhysicalPlanVisitor.java | 2 +- .../plan/server/ServerPlanRequestUtils.java | 11 +- .../operator/HashJoinOperatorTest.java | 7 +- .../operator/exchange/HashExchangeTest.java | 11 +- 23 files changed, 439 insertions(+), 314 deletions(-) create mode 100644 pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/EmptyKeySelector.java create mode 100644 pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelectorFactory.java rename pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/{FieldSelectionKeySelector.java => MultiColumnKeySelector.java} (59%) create mode 100644 pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/SingleColumnKeySelector.java diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/DataBlockExtractUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/DataBlockExtractUtils.java index 825900df7537..823527102f7a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/util/DataBlockExtractUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/util/DataBlockExtractUtils.java @@ -105,7 +105,7 @@ private static Object extractValue(DataBlock dataBlock, ColumnDataType storedTyp } } - public static List extractKeys(DataBlock dataBlock, int[] keyIds) { + public static Key[] extractKeys(DataBlock dataBlock, int[] keyIds) { DataSchema dataSchema = dataBlock.getDataSchema(); int numKeys = keyIds.length; ColumnDataType[] storedTypes = new ColumnDataType[numKeys]; @@ -115,7 +115,7 @@ public static List extractKeys(DataBlock dataBlock, int[] keyIds) { nullBitmaps[colId] = dataBlock.getNullRowIds(keyIds[colId]); } int numRows = dataBlock.getNumberOfRows(); - List keys = new ArrayList<>(numRows); + Key[] keys = new Key[numRows]; for (int rowId = 0; rowId < numRows; rowId++) { Object[] values = new Object[numKeys]; for (int colId = 0; colId < numKeys; colId++) { @@ -124,13 +124,12 @@ public static List extractKeys(DataBlock dataBlock, int[] keyIds) { values[colId] = extractValue(dataBlock, storedTypes[colId], rowId, keyIds[colId]); } } - keys.add(new Key(values)); + keys[rowId] = new Key(values); } return keys; } - public static List extractKeys(DataBlock dataBlock, int[] keyIds, int numMatchedRows, - RoaringBitmap matchedBitmap) { + public static Key[] extractKeys(DataBlock dataBlock, int[] keyIds, int numMatchedRows, RoaringBitmap matchedBitmap) { DataSchema dataSchema = dataBlock.getDataSchema(); int numKeys = keyIds.length; ColumnDataType[] storedTypes = new ColumnDataType[numKeys]; @@ -139,9 +138,9 @@ public static List extractKeys(DataBlock dataBlock, int[] keyIds, int numMa storedTypes[colId] = dataSchema.getColumnDataType(keyIds[colId]).getStoredType(); nullBitmaps[colId] = dataBlock.getNullRowIds(keyIds[colId]); } - List keys = new ArrayList<>(numMatchedRows); + Key[] keys = new Key[numMatchedRows]; PeekableIntIterator iterator = matchedBitmap.getIntIterator(); - for (int i = 0; i < numMatchedRows; i++) { + for (int matchedRowId = 0; matchedRowId < numMatchedRows; matchedRowId++) { int rowId = iterator.next(); Object[] values = new Object[numKeys]; for (int colId = 0; colId < numKeys; colId++) { @@ -150,7 +149,7 @@ public static List extractKeys(DataBlock dataBlock, int[] keyIds, int numMa values[colId] = extractValue(dataBlock, storedTypes[colId], rowId, keyIds[colId]); } } - keys.add(new Key(values)); + keys[matchedRowId] = new Key(values); } return keys; } @@ -175,6 +174,29 @@ public static Object[] extractColumn(DataBlock dataBlock, int colId) { return values; } + public static Object[] extractColumn(DataBlock dataBlock, int colId, int numMatchedRows, + RoaringBitmap matchedBitmap) { + DataSchema dataSchema = dataBlock.getDataSchema(); + ColumnDataType storedType = dataSchema.getColumnDataType(colId).getStoredType(); + RoaringBitmap nullBitmap = dataBlock.getNullRowIds(colId); + Object[] values = new Object[numMatchedRows]; + PeekableIntIterator iterator = matchedBitmap.getIntIterator(); + if (nullBitmap == null) { + for (int matchedRowId = 0; matchedRowId < numMatchedRows; matchedRowId++) { + int rowId = iterator.next(); + values[matchedRowId] = extractValue(dataBlock, storedType, rowId, colId); + } + } else { + for (int matchedRowId = 0; matchedRowId < numMatchedRows; matchedRowId++) { + int rowId = iterator.next(); + if (!nullBitmap.contains(rowId)) { + values[matchedRowId] = extractValue(dataBlock, storedType, rowId, colId); + } + } + } + return values; + } + public static int[] extractIntColumn(DataType storedType, DataBlock dataBlock, int colId, @Nullable RoaringBitmap nullBitmap) { int numRows = dataBlock.getNumberOfRows(); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java index 8ea16e882818..afc139fd99f4 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java @@ -26,8 +26,6 @@ import org.apache.calcite.rel.logical.PinotRelExchangeType; import org.apache.pinot.query.planner.PlanFragment; import org.apache.pinot.query.planner.PlanFragmentMetadata; -import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector; -import org.apache.pinot.query.planner.partitioning.KeySelector; import org.apache.pinot.query.planner.plannode.AggregateNode; import org.apache.pinot.query.planner.plannode.ExchangeNode; import org.apache.pinot.query.planner.plannode.FilterNode; @@ -132,23 +130,20 @@ public PlanNode visitExchange(ExchangeNode node, Context context) { int nextPlanFragmentId = ++context._currentPlanFragmentId; PlanNode nextPlanFragmentRoot = node.getInputs().get(0).visit(this, context); - List distributionKeys = node.getDistributionKeys(); - RelDistribution.Type distributionType = node.getDistributionType(); PinotRelExchangeType exchangeType = node.getExchangeType(); - - // make an exchange sender and receiver node pair - // only HASH_DISTRIBUTED requires a partition key selector; so all other types (SINGLETON and BROADCAST) - // of exchange will not carry a partition key selector. - KeySelector keySelector = distributionType == RelDistribution.Type.HASH_DISTRIBUTED - ? new FieldSelectionKeySelector(distributionKeys) : null; + RelDistribution.Type distributionType = node.getDistributionType(); + // NOTE: Only HASH_DISTRIBUTED requires distribution keys + // TODO: Revisit ExchangeNode creation logic to avoid using HASH_DISTRIBUTED with empty distribution keys + List distributionKeys = + distributionType == RelDistribution.Type.HASH_DISTRIBUTED ? node.getDistributionKeys() : null; PlanNode mailboxSender = - new MailboxSendNode(nextPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), - currentPlanFragmentId, distributionType, exchangeType, keySelector, node.getCollations(), - node.isSortOnSender()); - PlanNode mailboxReceiver = new MailboxReceiveNode(currentPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), - nextPlanFragmentId, distributionType, exchangeType, keySelector, - node.getCollations(), node.isSortOnSender(), node.isSortOnReceiver(), mailboxSender); + new MailboxSendNode(nextPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), currentPlanFragmentId, + distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender()); + PlanNode mailboxReceiver = + new MailboxReceiveNode(currentPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), nextPlanFragmentId, + distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender(), + node.isSortOnReceiver(), mailboxSender); mailboxSender.addInput(nextPlanFragmentRoot); context._planFragmentIdToRootNodeMap.put(nextPlanFragmentId, diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java index 131cc29e4183..b9e00711afe4 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java @@ -48,7 +48,6 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; -import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector; import org.apache.pinot.query.planner.plannode.AggregateNode; import org.apache.pinot.query.planner.plannode.ExchangeNode; import org.apache.pinot.query.planner.plannode.FilterNode; @@ -180,8 +179,7 @@ private static PlanNode convertLogicalJoin(LogicalJoin node, int currentStageId) // Parse out all equality JOIN conditions JoinInfo joinInfo = node.analyzeCondition(); - JoinNode.JoinKeys joinKeys = new JoinNode.JoinKeys(new FieldSelectionKeySelector(joinInfo.leftKeys), - new FieldSelectionKeySelector(joinInfo.rightKeys)); + JoinNode.JoinKeys joinKeys = new JoinNode.JoinKeys(joinInfo.leftKeys, joinInfo.rightKeys); List joinClause = joinInfo.nonEquiConditions.stream().map(RexExpressionUtils::fromRexNode).collect(Collectors.toList()); return new JoinNode(currentStageId, toDataSchema(node.getRowType()), toDataSchema(node.getLeft().getRowType()), diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java index ba06f3f405fa..762f2a44030f 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java @@ -23,9 +23,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import javax.annotation.Nullable; import org.apache.calcite.rel.RelDistribution; -import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector; -import org.apache.pinot.query.planner.partitioning.KeySelector; import org.apache.pinot.query.planner.plannode.AggregateNode; import org.apache.pinot.query.planner.plannode.ExchangeNode; import org.apache.pinot.query.planner.plannode.FilterNode; @@ -105,14 +104,14 @@ public Set visitJoin(JoinNode node, Void context) { Set rightPks = node.getInputs().get(1).visit(this, context); // Currently, JOIN criteria is guaranteed to only have one FieldSelectionKeySelector - FieldSelectionKeySelector leftJoinKey = (FieldSelectionKeySelector) node.getJoinKeys().getLeftJoinKeySelector(); - FieldSelectionKeySelector rightJoinKey = (FieldSelectionKeySelector) node.getJoinKeys().getRightJoinKeySelector(); + List leftJoinKeys = node.getJoinKeys().getLeftKeys(); + List rightJoinKeys = node.getJoinKeys().getRightKeys(); int leftDataSchemaSize = node.getInputs().get(0).getDataSchema().size(); Set partitionKeys = new HashSet<>(); - for (int i = 0; i < leftJoinKey.getColumnIndices().size(); i++) { - int leftIdx = leftJoinKey.getColumnIndices().get(i); - int rightIdx = rightJoinKey.getColumnIndices().get(i); + for (int i = 0; i < leftJoinKeys.size(); i++) { + int leftIdx = leftJoinKeys.get(i); + int rightIdx = rightJoinKeys.get(i); if (leftPKs.contains(leftIdx)) { partitionKeys.add(leftIdx); } @@ -133,24 +132,24 @@ public Set visitJoin(JoinNode node, Void context) { @Override public Set visitMailboxReceive(MailboxReceiveNode node, Void context) { Set oldPartitionKeys = node.getSender().visit(this, context); - KeySelector selector = node.getPartitionKeySelector(); + List distributionKeys = node.getDistributionKeys(); - if (canSkipShuffle(oldPartitionKeys, selector)) { + if (canSkipShuffle(oldPartitionKeys, distributionKeys)) { node.setDistributionType(RelDistribution.Type.SINGLETON); return oldPartitionKeys; - } else if (selector == null) { + } else if (distributionKeys == null) { return new HashSet<>(); } else { - return new HashSet<>(((FieldSelectionKeySelector) selector).getColumnIndices()); + return new HashSet<>(distributionKeys); } } @Override public Set visitMailboxSend(MailboxSendNode node, Void context) { Set oldPartitionKeys = node.getInputs().get(0).visit(this, context); - KeySelector selector = node.getPartitionKeySelector(); + List distributionKeys = node.getDistributionKeys(); - if (canSkipShuffle(oldPartitionKeys, selector)) { + if (canSkipShuffle(oldPartitionKeys, distributionKeys)) { node.setDistributionType(RelDistribution.Type.SINGLETON); return oldPartitionKeys; } else { @@ -185,10 +184,9 @@ public Set visitValue(ValueNode node, Void context) { return new HashSet<>(); } - private static boolean canSkipShuffle(Set partitionKeys, KeySelector keySelector) { - if (!partitionKeys.isEmpty() && keySelector != null) { - Set targetSet = new HashSet<>(((FieldSelectionKeySelector) keySelector).getColumnIndices()); - return targetSet.containsAll(partitionKeys); + private static boolean canSkipShuffle(Set partitionKeys, @Nullable List distributionKeys) { + if (!partitionKeys.isEmpty() && distributionKeys != null) { + return distributionKeys.containsAll(partitionKeys); } return false; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/EmptyKeySelector.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/EmptyKeySelector.java new file mode 100644 index 000000000000..7f02389eaa3b --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/EmptyKeySelector.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.partitioning; + +public class EmptyKeySelector implements KeySelector { + private EmptyKeySelector() { + } + + public static final EmptyKeySelector INSTANCE = new EmptyKeySelector(); + private static final Integer PLACE_HOLDER = 0; + + @Override + public Integer getKey(Object[] row) { + return PLACE_HOLDER; + } + + @Override + public int computeHash(Object[] input) { + return 0; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java index 57e3978d2ed6..e5003fbb267a 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.query.planner.partitioning; - /** * The {@code KeySelector} provides a partitioning function to encode a specific input data type into a key. * @@ -26,20 +25,23 @@ * *

Key selector should always produce the same selection hash key when the same input is provided. */ -public interface KeySelector { +public interface KeySelector { + String DEFAULT_HASH_ALGORITHM = "absHashCode"; /** - * Extract the key out of an input data construct. - * - * @param input input data. - * @return the key of the input data. + * Extracts the key out of the given row. */ - OUT getKey(IN input); + T getKey(Object[] row); - int computeHash(IN input); + /** + * Computes the hash of the given row. + */ + int computeHash(Object[] input); /** - * @return the hash-algorithm used for distributing rows + * Returns the hash algorithm used to compute the hash. */ - String hashAlgorithm(); + default String hashAlgorithm() { + return DEFAULT_HASH_ALGORITHM; + } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelectorFactory.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelectorFactory.java new file mode 100644 index 000000000000..b70615f68d0f --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelectorFactory.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.partitioning; + +import java.util.List; + + +public class KeySelectorFactory { + private KeySelectorFactory() { + } + + public static KeySelector getKeySelector(List keyIds) { + int numKeys = keyIds.size(); + if (numKeys == 0) { + return EmptyKeySelector.INSTANCE; + } else if (numKeys == 1) { + return new SingleColumnKeySelector(keyIds.get(0)); + } else { + int[] ids = new int[numKeys]; + for (int i = 0; i < numKeys; i++) { + ids[i] = keyIds.get(i); + } + return new MultiColumnKeySelector(ids); + } + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/MultiColumnKeySelector.java similarity index 59% rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/MultiColumnKeySelector.java index b23b34433b32..c510e5b70142 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/MultiColumnKeySelector.java @@ -18,51 +18,24 @@ */ package org.apache.pinot.query.planner.partitioning; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import org.apache.pinot.query.planner.serde.ProtoProperties; +import org.apache.pinot.core.data.table.Key; -/** - * The {@code FieldSelectionKeySelector} simply extract a column value out from a row array {@link Object[]}. - */ -public class FieldSelectionKeySelector implements KeySelector { - private static final String HASH_ALGORITHM = "absHashCode"; - - @ProtoProperties - private List _columnIndices; - - public FieldSelectionKeySelector() { - } - - public FieldSelectionKeySelector(int columnIndex) { - _columnIndices = Collections.singletonList(columnIndex); - } +public class MultiColumnKeySelector implements KeySelector { + private final int[] _keyIds; - public FieldSelectionKeySelector(List columnIndices) { - _columnIndices = new ArrayList<>(); - _columnIndices.addAll(columnIndices); - } - - public FieldSelectionKeySelector(int... columnIndices) { - _columnIndices = new ArrayList<>(); - for (int columnIndex : columnIndices) { - _columnIndices.add(columnIndex); - } - } - - public List getColumnIndices() { - return _columnIndices; + public MultiColumnKeySelector(int[] keyIds) { + _keyIds = keyIds; } @Override - public Object[] getKey(Object[] input) { - Object[] key = new Object[_columnIndices.size()]; - for (int i = 0; i < _columnIndices.size(); i++) { - key[i] = input[_columnIndices.get(i)]; + public Key getKey(Object[] row) { + int numKeys = _keyIds.length; + Object[] values = new Object[numKeys]; + for (int i = 0; i < numKeys; i++) { + values[i] = row[_keyIds[i]]; } - return key; + return new Key(values); } @Override @@ -84,8 +57,8 @@ public int computeHash(Object[] input) { // // TODO: consider better hashing algorithms than hashCode sum, such as XOR'ing int hashCode = 0; - for (int columnIndex : _columnIndices) { - Object value = input[columnIndex]; + for (int keyId : _keyIds) { + Object value = input[keyId]; if (value != null) { hashCode += value.hashCode(); } @@ -94,9 +67,4 @@ public int computeHash(Object[] input) { // return a positive number because this is used directly to modulo-index return hashCode & Integer.MAX_VALUE; } - - @Override - public String hashAlgorithm() { - return HASH_ALGORITHM; - } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/SingleColumnKeySelector.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/SingleColumnKeySelector.java new file mode 100644 index 000000000000..114f5322683e --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/SingleColumnKeySelector.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.partitioning; + +public class SingleColumnKeySelector implements KeySelector { + private final int _keyId; + + public SingleColumnKeySelector(int keyId) { + _keyId = keyId; + } + + @Override + public Object getKey(Object[] row) { + return row[_keyId]; + } + + @Override + public int computeHash(Object[] input) { + return input[_keyId].hashCode() & Integer.MAX_VALUE; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java index 287db51daaf5..3d74074bb3b3 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java @@ -20,17 +20,16 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.apache.calcite.rel.RelDistribution; import org.apache.pinot.common.config.provider.TableCache; import org.apache.pinot.query.planner.logical.RexExpression; -import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector; import org.apache.pinot.query.planner.partitioning.KeySelector; import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata; import org.apache.pinot.query.planner.plannode.AggregateNode; @@ -163,14 +162,15 @@ public Set visitJoin(JoinNode node, GreedyShuffleRewriteContext c @Override public Set visitMailboxReceive(MailboxReceiveNode node, GreedyShuffleRewriteContext context) { - KeySelector selector = node.getPartitionKeySelector(); Set oldColocationKeys = context.getColocationKeys(node.getSenderStageId()); + List distributionKeys = node.getDistributionKeys(); + // If the current stage is not a join-stage, then we already know sender's distribution if (!context.isJoinStage(node.getPlanFragmentId())) { - if (selector == null) { + if (distributionKeys == null) { return new HashSet<>(); - } else if (colocationKeyCondition(oldColocationKeys, selector) && areServersSuperset(node.getPlanFragmentId(), - node.getSenderStageId())) { + } else if (colocationKeyCondition(oldColocationKeys, distributionKeys) && areServersSuperset( + node.getPlanFragmentId(), node.getSenderStageId())) { node.setDistributionType(RelDistribution.Type.SINGLETON); _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).setWorkerIdToServerInstanceMap( _dispatchablePlanMetadataMap.get(node.getSenderStageId()).getWorkerIdToServerInstanceMap()); @@ -179,8 +179,9 @@ public Set visitMailboxReceive(MailboxReceiveNode node, GreedyShu // This means we can't skip shuffle and there's a partitioning enforced by receiver. int numPartitions = new HashSet<>( _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).getWorkerIdToServerInstanceMap().values()).size(); - List colocationKeys = ((FieldSelectionKeySelector) selector).getColumnIndices().stream() - .map(x -> new ColocationKey(x, numPartitions, selector.hashAlgorithm())).collect(Collectors.toList()); + List colocationKeys = + distributionKeys.stream().map(x -> new ColocationKey(x, numPartitions, KeySelector.DEFAULT_HASH_ALGORITHM)) + .collect(Collectors.toList()); return new HashSet<>(colocationKeys); } // If the current stage is a join-stage then we already know whether shuffle can be skipped. @@ -190,23 +191,24 @@ public Set visitMailboxReceive(MailboxReceiveNode node, GreedyShu // distribution. ((MailboxSendNode) node.getSender()).setDistributionType(RelDistribution.Type.SINGLETON); return oldColocationKeys; - } else if (selector == null) { + } else if (distributionKeys == null) { return new HashSet<>(); } // This means we can't skip shuffle and there's a partitioning enforced by receiver. int numPartitions = new HashSet<>( _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).getWorkerIdToServerInstanceMap().values()).size(); - List colocationKeys = ((FieldSelectionKeySelector) selector).getColumnIndices().stream() - .map(x -> new ColocationKey(x, numPartitions, selector.hashAlgorithm())).collect(Collectors.toList()); + List colocationKeys = + distributionKeys.stream().map(x -> new ColocationKey(x, numPartitions, KeySelector.DEFAULT_HASH_ALGORITHM)) + .collect(Collectors.toList()); return new HashSet<>(colocationKeys); } @Override public Set visitMailboxSend(MailboxSendNode node, GreedyShuffleRewriteContext context) { Set oldColocationKeys = node.getInputs().get(0).visit(this, context); - KeySelector selector = node.getPartitionKeySelector(); + List distributionKeys = node.getDistributionKeys(); - boolean canSkipShuffleBasic = colocationKeyCondition(oldColocationKeys, selector); + boolean canSkipShuffleBasic = colocationKeyCondition(oldColocationKeys, distributionKeys); // If receiver is not a join-stage, then we can determine distribution type now. if (!context.isJoinStage(node.getReceiverStageId())) { Set colocationKeys; @@ -356,11 +358,10 @@ private static Set computeNewColocationKeys(Set ol } private static boolean colocationKeyCondition(Set colocationKeys, - KeySelector keySelector) { - if (!colocationKeys.isEmpty() && keySelector != null) { - List targetSet = new ArrayList<>(((FieldSelectionKeySelector) keySelector).getColumnIndices()); + @Nullable List distributionKeys) { + if (!colocationKeys.isEmpty() && distributionKeys != null) { for (ColocationKey colocationKey : colocationKeys) { - if (targetSet.size() >= colocationKey.getIndices().size() && targetSet.subList(0, + if (distributionKeys.size() >= colocationKey.getIndices().size() && distributionKeys.subList(0, colocationKey.getIndices().size()).equals(colocationKey.getIndices())) { return true; } @@ -373,22 +374,19 @@ private static boolean partitionKeyConditionForJoin(MailboxReceiveNode mailboxRe MailboxSendNode mailboxSendNode, GreedyShuffleRewriteContext context) { // First check ColocationKeyCondition for the sender <--> sender.getInputs().get(0) pair Set oldColocationKeys = context.getColocationKeys(mailboxSendNode.getPlanFragmentId()); - KeySelector selector = mailboxSendNode.getPartitionKeySelector(); - if (!colocationKeyCondition(oldColocationKeys, selector)) { + if (!colocationKeyCondition(oldColocationKeys, mailboxSendNode.getDistributionKeys())) { return false; } // Check ColocationKeyCondition for the sender <--> receiver pair // Since shuffle can be skipped, oldPartitionsKeys == senderColocationKeys - selector = mailboxReceiveNode.getPartitionKeySelector(); - return colocationKeyCondition(oldColocationKeys, selector); + return colocationKeyCondition(oldColocationKeys, mailboxReceiveNode.getDistributionKeys()); } private static ColocationKey getEquivalentSenderKey(Set colocationKeys, - KeySelector keySelector) { - if (!colocationKeys.isEmpty() && keySelector != null) { - List targetSet = new ArrayList<>(((FieldSelectionKeySelector) keySelector).getColumnIndices()); + List distributionKeys) { + if (!colocationKeys.isEmpty() && distributionKeys != null) { for (ColocationKey colocationKey : colocationKeys) { - if (targetSet.size() >= colocationKey.getIndices().size() && targetSet.subList(0, + if (distributionKeys.size() >= colocationKey.getIndices().size() && distributionKeys.subList(0, colocationKey.getIndices().size()).equals(colocationKey.getIndices())) { return colocationKey; } @@ -402,9 +400,9 @@ private static boolean checkPartitionScheme(MailboxReceiveNode leftReceiveNode, int leftSender = leftReceiveNode.getSenderStageId(); int rightSender = rightReceiveNode.getSenderStageId(); ColocationKey leftPKey = - getEquivalentSenderKey(context.getColocationKeys(leftSender), leftReceiveNode.getPartitionKeySelector()); + getEquivalentSenderKey(context.getColocationKeys(leftSender), leftReceiveNode.getDistributionKeys()); ColocationKey rightPKey = - getEquivalentSenderKey(context.getColocationKeys(rightSender), rightReceiveNode.getPartitionKeySelector()); + getEquivalentSenderKey(context.getColocationKeys(rightSender), rightReceiveNode.getDistributionKeys()); if (leftPKey.getNumPartitions() != rightPKey.getNumPartitions()) { return false; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java index fe55facf01eb..4ba28a9cc6b7 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java @@ -24,8 +24,6 @@ import org.apache.calcite.rel.hint.RelHint; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.planner.logical.RexExpression; -import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector; -import org.apache.pinot.query.planner.partitioning.KeySelector; import org.apache.pinot.query.planner.serde.ProtoProperties; @@ -94,24 +92,24 @@ public T visit(PlanNodeVisitor visitor, C context) { public static class JoinKeys { @ProtoProperties - private KeySelector _leftJoinKeySelector; + private List _leftKeys; @ProtoProperties - private KeySelector _rightJoinKeySelector; + private List _rightKeys; public JoinKeys() { } - public JoinKeys(FieldSelectionKeySelector leftKeySelector, FieldSelectionKeySelector rightKeySelector) { - _leftJoinKeySelector = leftKeySelector; - _rightJoinKeySelector = rightKeySelector; + public JoinKeys(List leftKeys, List rightKeys) { + _leftKeys = leftKeys; + _rightKeys = rightKeys; } - public KeySelector getLeftJoinKeySelector() { - return _leftJoinKeySelector; + public List getLeftKeys() { + return _leftKeys; } - public KeySelector getRightJoinKeySelector() { - return _rightJoinKeySelector; + public List getRightKeys() { + return _rightKeys; } } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java index c2515edbb6ae..ed268c0985e8 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java @@ -31,7 +31,6 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.planner.logical.RexExpression; -import org.apache.pinot.query.planner.partitioning.KeySelector; import org.apache.pinot.query.planner.serde.ProtoProperties; @@ -43,7 +42,7 @@ public class MailboxReceiveNode extends AbstractPlanNode { @ProtoProperties private PinotRelExchangeType _exchangeType; @ProtoProperties - private KeySelector _partitionKeySelector; + private List _distributionKeys; @ProtoProperties private List _collationKeys; @ProtoProperties @@ -65,14 +64,13 @@ public MailboxReceiveNode(int planFragmentId) { public MailboxReceiveNode(int planFragmentId, DataSchema dataSchema, int senderStageId, RelDistribution.Type distributionType, PinotRelExchangeType exchangeType, - @Nullable KeySelector partitionKeySelector, - @Nullable List fieldCollations, boolean isSortOnSender, boolean isSortOnReceiver, - PlanNode sender) { + @Nullable List distributionKeys, @Nullable List fieldCollations, + boolean isSortOnSender, boolean isSortOnReceiver, PlanNode sender) { super(planFragmentId, dataSchema); _senderStageId = senderStageId; _distributionType = distributionType; _exchangeType = exchangeType; - _partitionKeySelector = partitionKeySelector; + _distributionKeys = distributionKeys; if (!CollectionUtils.isEmpty(fieldCollations)) { int numCollations = fieldCollations.size(); _collationKeys = new ArrayList<>(numCollations); @@ -125,8 +123,8 @@ public PinotRelExchangeType getExchangeType() { return _exchangeType; } - public KeySelector getPartitionKeySelector() { - return _partitionKeySelector; + public List getDistributionKeys() { + return _distributionKeys; } public List getCollationKeys() { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java index 1edf2902df43..1834ac90c11b 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java @@ -29,7 +29,6 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.planner.logical.RexExpression; -import org.apache.pinot.query.planner.partitioning.KeySelector; import org.apache.pinot.query.planner.serde.ProtoProperties; @@ -41,7 +40,7 @@ public class MailboxSendNode extends AbstractPlanNode { @ProtoProperties private PinotRelExchangeType _exchangeType; @ProtoProperties - private KeySelector _partitionKeySelector; + private List _distributionKeys; @ProtoProperties private List _collationKeys; @ProtoProperties @@ -55,13 +54,13 @@ public MailboxSendNode(int planFragmentId) { public MailboxSendNode(int planFragmentId, DataSchema dataSchema, int receiverStageId, RelDistribution.Type distributionType, PinotRelExchangeType exchangeType, - @Nullable KeySelector partitionKeySelector, - @Nullable List fieldCollations, boolean isSortOnSender) { + @Nullable List distributionKeys, @Nullable List fieldCollations, + boolean isSortOnSender) { super(planFragmentId, dataSchema); _receiverStageId = receiverStageId; _distributionType = distributionType; _exchangeType = exchangeType; - _partitionKeySelector = partitionKeySelector; + _distributionKeys = distributionKeys; // TODO: Support ordering here if the 'fieldCollations' aren't empty and 'sortOnSender' is true Preconditions.checkState(!isSortOnSender, "Ordering is not yet supported on Mailbox Send"); if (!CollectionUtils.isEmpty(fieldCollations) && isSortOnSender) { @@ -102,8 +101,8 @@ public PinotRelExchangeType getExchangeType() { return _exchangeType; } - public KeySelector getPartitionKeySelector() { - return _partitionKeySelector; + public List getDistributionKeys() { + return _distributionKeys; } public List getCollationKeys() { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 1e27fb559cae..8c441e652493 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -264,7 +264,7 @@ private OpChain compileLeafStage(OpChainExecutionContext executionContext, _leafQueryExecutor, _executorService); MailboxSendOperator mailboxSendOperator = new MailboxSendOperator(executionContext, leafStageOperator, sendNode.getDistributionType(), - sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(), sendNode.getCollationDirections(), + sendNode.getDistributionKeys(), sendNode.getCollationKeys(), sendNode.getCollationDirections(), sendNode.isSortOnSender(), sendNode.getReceiverStageId()); return new OpChain(executionContext, mailboxSendOperator); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java index bed297706ca8..32ca46290a3b 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java @@ -22,8 +22,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import java.util.ArrayList; +import java.util.BitSet; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -35,9 +35,9 @@ import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.config.QueryOptionsUtils; -import org.apache.pinot.core.data.table.Key; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.planner.partitioning.KeySelector; +import org.apache.pinot.query.planner.partitioning.KeySelectorFactory; import org.apache.pinot.query.planner.plannode.AbstractPlanNode; import org.apache.pinot.query.planner.plannode.JoinNode; import org.apache.pinot.query.runtime.blocks.TransferableBlock; @@ -73,12 +73,11 @@ public class HashJoinOperator extends MultiStageOperator { ImmutableSet.of(JoinRelType.INNER, JoinRelType.LEFT, JoinRelType.RIGHT, JoinRelType.FULL, JoinRelType.SEMI, JoinRelType.ANTI); - private final HashMap> _broadcastRightTable; + private final Map> _broadcastRightTable; // Used to track matched right rows. // Only used for right join and full join to output non-matched right rows. - // TODO: Replace hashset with rolling bit map. - private final HashMap> _matchedRightRows; + private final Map _matchedRightRows; private final MultiStageOperator _leftTableOperator; private final MultiStageOperator _rightTableOperator; @@ -94,8 +93,8 @@ public class HashJoinOperator extends MultiStageOperator { // TODO: Remove this special handling by fixing data block EOS abstraction or operator's invariant. private boolean _isTerminated; private TransferableBlock _upstreamErrorBlock; - private final KeySelector _leftKeySelector; - private final KeySelector _rightKeySelector; + private final KeySelector _leftKeySelector; + private final KeySelector _rightKeySelector; // Below are specific parameters to protect the hash table from growing too large. // Once the hash table reaches the limit, we will throw exception or break the right table build process. @@ -119,10 +118,9 @@ public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator left Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(node.getJoinRelType()), "Join type: " + node.getJoinRelType() + " is not supported!"); _joinType = node.getJoinRelType(); - _leftKeySelector = node.getJoinKeys().getLeftJoinKeySelector(); - _rightKeySelector = node.getJoinKeys().getRightJoinKeySelector(); - Preconditions.checkState(_leftKeySelector != null, "LeftKeySelector for join cannot be null"); - Preconditions.checkState(_rightKeySelector != null, "RightKeySelector for join cannot be null"); + JoinNode.JoinKeys joinKeys = node.getJoinKeys(); + _leftKeySelector = KeySelectorFactory.getKeySelector(joinKeys.getLeftKeys()); + _rightKeySelector = KeySelectorFactory.getKeySelector(joinKeys.getRightKeys()); _leftColumnSize = leftSchema.size(); Preconditions.checkState(_leftColumnSize > 0, "leftColumnSize has to be greater than zero:" + _leftColumnSize); _resultSchema = node.getDataSchema(); @@ -231,9 +229,8 @@ private void buildBroadcastHashTable() } // put all the rows into corresponding hash collections keyed by the key selector function. for (Object[] row : container) { - ArrayList hashCollection = - _broadcastRightTable.computeIfAbsent(new Key(_rightKeySelector.getKey(row)), - k -> new ArrayList<>(INITIAL_HEURISTIC_SIZE)); + ArrayList hashCollection = _broadcastRightTable.computeIfAbsent(_rightKeySelector.getKey(row), + k -> new ArrayList<>(INITIAL_HEURISTIC_SIZE)); int size = hashCollection.size(); if ((size & size - 1) == 0 && size < _maxRowsInHashTable && size < Integer.MAX_VALUE / 2) { // is power of 2 hashCollection.ensureCapacity(Math.min(size << 1, _maxRowsInHashTable)); @@ -267,15 +264,18 @@ private TransferableBlock buildJoinedDataBlock(TransferableBlock leftBlock) { // TODO: Moved to a different function. // Return remaining non-matched rows for non-inner join. List returnRows = new ArrayList<>(); - for (Map.Entry> entry : _broadcastRightTable.entrySet()) { - Set matchedIdx = _matchedRightRows.getOrDefault(entry.getKey(), new HashSet<>()); + for (Map.Entry> entry : _broadcastRightTable.entrySet()) { List rightRows = entry.getValue(); - if (rightRows.size() == matchedIdx.size()) { - continue; - } - for (int i = 0; i < rightRows.size(); i++) { - if (!matchedIdx.contains(i)) { - returnRows.add(joinRow(null, rightRows.get(i))); + BitSet matchedIndices = _matchedRightRows.get(entry.getKey()); + if (matchedIndices == null) { + for (Object[] rightRow : rightRows) { + returnRows.add(joinRow(null, rightRow)); + } + } else { + int numRightRows = rightRows.size(); + int unmatchedIndex = 0; + while ((unmatchedIndex = matchedIndices.nextClearBit(unmatchedIndex)) < numRightRows) { + returnRows.add(joinRow(null, rightRows.get(unmatchedIndex++))); } } } @@ -313,7 +313,7 @@ private List buildJoinedDataBlockSemi(TransferableBlock leftBlock) { List rows = new ArrayList<>(container.size()); for (Object[] leftRow : container) { - Key key = new Key(_leftKeySelector.getKey(leftRow)); + Object key = _leftKeySelector.getKey(leftRow); // SEMI-JOIN only checks existence of the key if (_broadcastRightTable.containsKey(key)) { rows.add(joinRow(leftRow, null)); @@ -328,19 +328,20 @@ private List buildJoinedDataBlockDefault(TransferableBlock leftBlock) ArrayList rows = new ArrayList<>(container.size()); for (Object[] leftRow : container) { - Key key = new Key(_leftKeySelector.getKey(leftRow)); + Object key = _leftKeySelector.getKey(leftRow); // NOTE: Empty key selector will always give same hash code. - List matchedRightRows = _broadcastRightTable.getOrDefault(key, null); - if (matchedRightRows == null) { + List rightRows = _broadcastRightTable.get(key); + if (rightRows == null) { if (needUnmatchedLeftRows()) { rows.add(joinRow(leftRow, null)); } continue; } boolean hasMatchForLeftRow = false; - rows.ensureCapacity(rows.size() + matchedRightRows.size()); - for (int i = 0; i < matchedRightRows.size(); i++) { - Object[] rightRow = matchedRightRows.get(i); + int numRightRows = rightRows.size(); + rows.ensureCapacity(rows.size() + numRightRows); + for (int i = 0; i < numRightRows; i++) { + Object[] rightRow = rightRows.get(i); // TODO: Optimize this to avoid unnecessary object copy. Object[] resultRow = joinRow(leftRow, rightRow); if (_joinClauseEvaluators.isEmpty() || _joinClauseEvaluators.stream() @@ -348,8 +349,7 @@ private List buildJoinedDataBlockDefault(TransferableBlock leftBlock) rows.add(resultRow); hasMatchForLeftRow = true; if (_matchedRightRows != null) { - HashSet matchedRows = _matchedRightRows.computeIfAbsent(key, k -> new HashSet<>()); - matchedRows.add(i); + _matchedRightRows.computeIfAbsent(key, k -> new BitSet(numRightRows)).set(i); } } } @@ -366,7 +366,7 @@ private List buildJoinedDataBlockAnti(TransferableBlock leftBlock) { List rows = new ArrayList<>(container.size()); for (Object[] leftRow : container) { - Key key = new Key(_leftKeySelector.getKey(leftRow)); + Object key = _leftKeySelector.getKey(leftRow); // ANTI-JOIN only checks non-existence of the key if (!_broadcastRightTable.containsKey(key)) { rows.add(joinRow(leftRow, null)); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java index 017220e4f7f3..74fe297a8f69 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java @@ -20,11 +20,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; import java.util.List; -import java.util.Set; import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; import org.apache.calcite.rel.RelDistribution; @@ -33,7 +32,6 @@ import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.mailbox.SendingMailbox; import org.apache.pinot.query.planner.logical.RexExpression; -import org.apache.pinot.query.planner.partitioning.KeySelector; import org.apache.pinot.query.routing.MailboxMetadata; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; @@ -51,8 +49,8 @@ * TODO: Add support to sort the data prior to sending if sorting is enabled */ public class MailboxSendOperator extends MultiStageOperator { - public static final Set SUPPORTED_EXCHANGE_TYPES = - ImmutableSet.of(RelDistribution.Type.SINGLETON, RelDistribution.Type.RANDOM_DISTRIBUTED, + public static final EnumSet SUPPORTED_EXCHANGE_TYPES = + EnumSet.of(RelDistribution.Type.SINGLETON, RelDistribution.Type.RANDOM_DISTRIBUTED, RelDistribution.Type.BROADCAST_DISTRIBUTED, RelDistribution.Type.HASH_DISTRIBUTED); private static final Logger LOGGER = LoggerFactory.getLogger(MailboxSendOperator.class); @@ -65,11 +63,11 @@ public class MailboxSendOperator extends MultiStageOperator { private final boolean _isSortOnSender; public MailboxSendOperator(OpChainExecutionContext context, MultiStageOperator sourceOperator, - RelDistribution.Type exchangeType, KeySelector keySelector, + RelDistribution.Type distributionType, @Nullable List distributionKeys, @Nullable List collationKeys, @Nullable List collationDirections, boolean isSortOnSender, int receiverStageId) { - this(context, sourceOperator, getBlockExchange(context, exchangeType, keySelector, receiverStageId), collationKeys, - collationDirections, isSortOnSender); + this(context, sourceOperator, getBlockExchange(context, distributionType, distributionKeys, receiverStageId), + collationKeys, collationDirections, isSortOnSender); } @VisibleForTesting @@ -84,10 +82,10 @@ public MailboxSendOperator(OpChainExecutionContext context, MultiStageOperator s _isSortOnSender = isSortOnSender; } - private static BlockExchange getBlockExchange(OpChainExecutionContext context, RelDistribution.Type exchangeType, - KeySelector keySelector, int receiverStageId) { - Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(exchangeType), "Unsupported exchange type: %s", - exchangeType); + private static BlockExchange getBlockExchange(OpChainExecutionContext context, RelDistribution.Type distributionType, + @Nullable List distributionKeys, int receiverStageId) { + Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(distributionType), "Unsupported distribution type: %s", + distributionType); MailboxService mailboxService = context.getMailboxService(); long requestId = context.getRequestId(); long deadlineMs = context.getDeadlineMs(); @@ -101,7 +99,8 @@ private static BlockExchange getBlockExchange(OpChainExecutionContext context, R sendingMailboxes.add(mailboxService.getSendingMailbox(mailboxMetadata.getVirtualAddress(i).hostname(), mailboxMetadata.getVirtualAddress(i).port(), sendingMailboxIds.get(i), deadlineMs)); } - return BlockExchange.getExchange(sendingMailboxes, exchangeType, keySelector, TransferableBlockUtils::splitBlock); + return BlockExchange.getExchange(sendingMailboxes, distributionType, distributionKeys, + TransferableBlockUtils::splitBlock); } @Override diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java index 1a171ed00779..075e24282f82 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java @@ -18,9 +18,10 @@ */ package org.apache.pinot.query.runtime.operator; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import javax.annotation.Nullable; @@ -60,13 +61,11 @@ public class MultistageGroupByExecutor { // Group By Result holders for each mode private final GroupByResultHolder[] _aggregateResultHolders; - private final Map _mergeResultHolder; + private final List _mergeResultHolder; // Mapping from the row-key to a zero based integer index. This is used when we invoke the v1 aggregation functions // because they use the zero based integer indexes to store results. - private final Map _groupKeyToIdMap = new HashMap<>(); - - private boolean _numGroupsLimitReached; + private final Object2IntOpenHashMap _groupKeyToIdMap; public MultistageGroupByExecutor(int[] groupKeyIds, AggregationFunction[] aggFunctions, int[] filterArgIds, int maxFilterArgId, AggType aggType, DataSchema resultSchema, Map opChainMetadata, @@ -82,16 +81,19 @@ public MultistageGroupByExecutor(int[] groupKeyIds, AggregationFunction[] aggFun int numFunctions = aggFunctions.length; if (!aggType.isInputIntermediateFormat()) { - _aggregateResultHolders = new GroupByResultHolder[_aggFunctions.length]; - for (int i = 0; i < _aggFunctions.length; i++) { + _aggregateResultHolders = new GroupByResultHolder[numFunctions]; + for (int i = 0; i < numFunctions; i++) { _aggregateResultHolders[i] = _aggFunctions[i].createGroupByResultHolder(maxInitialResultHolderCapacity, _numGroupsLimit); } _mergeResultHolder = null; } else { - _mergeResultHolder = new HashMap<>(); + _mergeResultHolder = new ArrayList<>(maxInitialResultHolderCapacity); _aggregateResultHolders = null; } + + _groupKeyToIdMap = new Object2IntOpenHashMap<>(); + _groupKeyToIdMap.defaultReturnValue(GroupKeyGenerator.INVALID_ID); } private int getNumGroupsLimit(Map customProperties, @Nullable AbstractPlanNode.NodeHint nodeHint) { @@ -152,43 +154,54 @@ public List getResult() { int numFunctions = _aggFunctions.length; int numColumns = numKeys + numFunctions; ColumnDataType[] resultStoredTypes = _resultSchema.getStoredColumnDataTypes(); - for (Map.Entry entry : _groupKeyToIdMap.entrySet()) { - Object[] row = new Object[numColumns]; - Object[] keyValues = entry.getKey().getValues(); - System.arraycopy(keyValues, 0, row, 0, numKeys); - int groupId = entry.getValue(); - for (int i = 0; i < numFunctions; i++) { - AggregationFunction func = _aggFunctions[i]; - int index = numKeys + i; - Object value; - switch (_aggType) { - case LEAF: - value = func.extractGroupByResult(_aggregateResultHolders[i], groupId); - break; - case INTERMEDIATE: - value = _mergeResultHolder.get(groupId)[i]; - break; - case FINAL: - value = func.extractFinalResult(_mergeResultHolder.get(groupId)[i]); - break; - case DIRECT: - Object intermediate = _aggFunctions[i].extractGroupByResult(_aggregateResultHolders[i], groupId); - value = func.extractFinalResult(intermediate); - break; - default: - throw new UnsupportedOperationException("Unsupported aggTyp: " + _aggType); + if (numKeys == 1) { + for (Object2IntMap.Entry entry : _groupKeyToIdMap.object2IntEntrySet()) { + Object[] row = new Object[numColumns]; + row[0] = entry.getKey(); + int groupId = entry.getIntValue(); + for (int i = 0; i < numFunctions; i++) { + row[i + 1] = getResultValue(i, groupId); + } + // Convert the results from AggregationFunction to the desired type + TypeUtils.convertRow(row, resultStoredTypes); + rows.add(row); + } + } else { + for (Object2IntMap.Entry entry : _groupKeyToIdMap.object2IntEntrySet()) { + Object[] row = new Object[numColumns]; + Object[] keyValues = ((Key) entry.getKey()).getValues(); + System.arraycopy(keyValues, 0, row, 0, numKeys); + int groupId = entry.getIntValue(); + for (int i = 0; i < numFunctions; i++) { + row[numKeys + i] = getResultValue(i, groupId); } - row[index] = value; + // Convert the results from AggregationFunction to the desired type + TypeUtils.convertRow(row, resultStoredTypes); + rows.add(row); } - // Convert the results from AggregationFunction to the desired type - TypeUtils.convertRow(row, resultStoredTypes); - rows.add(row); } return rows; } + private Object getResultValue(int functionId, int groupId) { + AggregationFunction aggFunction = _aggFunctions[functionId]; + switch (_aggType) { + case LEAF: + return aggFunction.extractGroupByResult(_aggregateResultHolders[functionId], groupId); + case INTERMEDIATE: + return _mergeResultHolder.get(groupId)[functionId]; + case FINAL: + return aggFunction.extractFinalResult(_mergeResultHolder.get(groupId)[functionId]); + case DIRECT: + Object intermediate = aggFunction.extractGroupByResult(_aggregateResultHolders[functionId], groupId); + return aggFunction.extractFinalResult(intermediate); + default: + throw new IllegalStateException("Unsupported aggType: " + _aggType); + } + } + public boolean isNumGroupsLimitReached() { - return _numGroupsLimitReached; + return _groupKeyToIdMap.size() == _numGroupsLimit; } private void processAggregate(TransferableBlock block) { @@ -243,15 +256,25 @@ private void processAggregate(TransferableBlock block) { } private void processMerge(TransferableBlock block) { - int[] intKeys = generateGroupByKeys(block); - int numRows = intKeys.length; + int[] groupByKeys = generateGroupByKeys(block); + int numRows = groupByKeys.length; int numFunctions = _aggFunctions.length; Object[][] intermediateResults = new Object[numFunctions][numRows]; for (int i = 0; i < numFunctions; i++) { intermediateResults[i] = AggregateOperator.getIntermediateResults(_aggFunctions[i], block); } for (int i = 0; i < numRows; i++) { - Object[] mergedResults = _mergeResultHolder.computeIfAbsent(intKeys[i], k -> new Object[numFunctions]); + int groupByKey = groupByKeys[i]; + if (groupByKey == GroupKeyGenerator.INVALID_ID) { + continue; + } + Object[] mergedResults; + if (_mergeResultHolder.size() == groupByKey) { + mergedResults = new Object[numFunctions]; + _mergeResultHolder.add(mergedResults); + } else { + mergedResults = _mergeResultHolder.get(groupByKey); + } for (int j = 0; j < numFunctions; j++) { AggregationFunction aggFunction = _aggFunctions[j]; Object intermediateResult = intermediateResults[j][i]; @@ -282,23 +305,35 @@ private int[] generateGroupByKeys(List rows) { int numRows = rows.size(); int[] intKeys = new int[numRows]; int numKeys = _groupKeyIds.length; - for (int i = 0; i < numRows; i++) { - Object[] row = rows.get(i); - Object[] keyValues = new Object[numKeys]; - for (int j = 0; j < numKeys; j++) { - keyValues[j] = row[_groupKeyIds[j]]; + if (numKeys == 1) { + int groupKeyId = _groupKeyIds[0]; + for (int i = 0; i < numRows; i++) { + intKeys[i] = getGroupId(rows.get(i)[groupKeyId]); + } + } else { + for (int i = 0; i < numRows; i++) { + Object[] row = rows.get(i); + Object[] keyValues = new Object[numKeys]; + for (int j = 0; j < numKeys; j++) { + keyValues[j] = row[_groupKeyIds[j]]; + } + intKeys[i] = getGroupId(new Key(keyValues)); } - intKeys[i] = getGroupId(new Key(keyValues)); } return intKeys; } private int[] generateGroupByKeys(DataBlock dataBlock) { - List keys = DataBlockExtractUtils.extractKeys(dataBlock, _groupKeyIds); - int numRows = keys.size(); + Object[] keys; + if (_groupKeyIds.length == 1) { + keys = DataBlockExtractUtils.extractColumn(dataBlock, _groupKeyIds[0]); + } else { + keys = DataBlockExtractUtils.extractKeys(dataBlock, _groupKeyIds); + } + int numRows = keys.length; int[] intKeys = new int[numRows]; for (int i = 0; i < numRows; i++) { - intKeys[i] = getGroupId(keys.get(i)); + intKeys[i] = getGroupId(keys[i]); } return intKeys; } @@ -316,37 +351,45 @@ private int[] generateGroupByKeys(List rows, int numMatchedRows, Roari int[] intKeys = new int[numMatchedRows]; int numKeys = _groupKeyIds.length; PeekableIntIterator iterator = matchedBitmap.getIntIterator(); - for (int i = 0; i < numMatchedRows; i++) { - int rowId = iterator.next(); - Object[] row = rows.get(rowId); - Object[] keyValues = new Object[numKeys]; - for (int j = 0; j < numKeys; j++) { - keyValues[j] = row[_groupKeyIds[j]]; + if (numKeys == 1) { + int groupKeyId = _groupKeyIds[0]; + for (int i = 0; i < numMatchedRows; i++) { + intKeys[i] = getGroupId(rows.get(iterator.next())[groupKeyId]); + } + } else { + for (int i = 0; i < numMatchedRows; i++) { + int rowId = iterator.next(); + Object[] row = rows.get(rowId); + Object[] keyValues = new Object[numKeys]; + for (int j = 0; j < numKeys; j++) { + keyValues[j] = row[_groupKeyIds[j]]; + } + intKeys[i] = getGroupId(new Key(keyValues)); } - intKeys[i] = getGroupId(new Key(keyValues)); } return intKeys; } private int[] generateGroupByKeys(DataBlock dataBlock, int numMatchedRows, RoaringBitmap matchedBitmap) { - List keys = DataBlockExtractUtils.extractKeys(dataBlock, _groupKeyIds, numMatchedRows, matchedBitmap); + Object[] keys; + if (_groupKeyIds.length == 1) { + keys = DataBlockExtractUtils.extractColumn(dataBlock, _groupKeyIds[0], numMatchedRows, matchedBitmap); + } else { + keys = DataBlockExtractUtils.extractKeys(dataBlock, _groupKeyIds, numMatchedRows, matchedBitmap); + } int[] intKeys = new int[numMatchedRows]; for (int i = 0; i < numMatchedRows; i++) { - intKeys[i] = getGroupId(keys.get(i)); + intKeys[i] = getGroupId(keys[i]); } return intKeys; } - private int getGroupId(Key key) { - Integer groupKey = _groupKeyToIdMap.computeIfAbsent(key, k -> { - int numGroupKeys = _groupKeyToIdMap.size(); - if (numGroupKeys == _numGroupsLimit) { - _numGroupsLimitReached = true; - return null; - } else { - return numGroupKeys; - } - }); - return groupKey != null ? groupKey : GroupKeyGenerator.INVALID_ID; + private int getGroupId(Object key) { + int numGroups = _groupKeyToIdMap.size(); + if (numGroups < _numGroupsLimit) { + return _groupKeyToIdMap.computeIntIfAbsent(key, k -> numGroups); + } else { + return _groupKeyToIdMap.getInt(key); + } } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java index 60bcf66e1df9..ae8d0c75d452 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java @@ -18,12 +18,14 @@ */ package org.apache.pinot.query.runtime.operator.exchange; +import com.google.common.base.Preconditions; import java.util.Iterator; import java.util.List; +import javax.annotation.Nullable; import org.apache.calcite.rel.RelDistribution; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.query.mailbox.SendingMailbox; -import org.apache.pinot.query.planner.partitioning.KeySelector; +import org.apache.pinot.query.planner.partitioning.KeySelectorFactory; import org.apache.pinot.query.runtime.blocks.BlockSplitter; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.spi.exception.EarlyTerminationException; @@ -40,13 +42,15 @@ public abstract class BlockExchange { private final List _sendingMailboxes; private final BlockSplitter _splitter; - public static BlockExchange getExchange(List sendingMailboxes, RelDistribution.Type exchangeType, - KeySelector selector, BlockSplitter splitter) { - switch (exchangeType) { + public static BlockExchange getExchange(List sendingMailboxes, RelDistribution.Type distributionType, + @Nullable List distributionKeys, BlockSplitter splitter) { + switch (distributionType) { case SINGLETON: return new SingletonExchange(sendingMailboxes, splitter); case HASH_DISTRIBUTED: - return new HashExchange(sendingMailboxes, selector, splitter); + Preconditions.checkArgument(distributionKeys != null, + "Distribution keys must be provided for hash distribution"); + return new HashExchange(sendingMailboxes, KeySelectorFactory.getKeySelector(distributionKeys), splitter); case RANDOM_DISTRIBUTED: return new RandomExchange(sendingMailboxes, splitter); case BROADCAST_DISTRIBUTED: @@ -55,7 +59,7 @@ public static BlockExchange getExchange(List sendingMailboxes, R case RANGE_DISTRIBUTED: case ANY: default: - throw new UnsupportedOperationException("Unsupported mailbox exchange type: " + exchangeType); + throw new UnsupportedOperationException("Unsupported distribution type: " + distributionType); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java index c0ba262a3d81..3b3eeb1d03d4 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.pinot.query.mailbox.SendingMailbox; +import org.apache.pinot.query.planner.partitioning.EmptyKeySelector; import org.apache.pinot.query.planner.partitioning.KeySelector; import org.apache.pinot.query.runtime.blocks.BlockSplitter; import org.apache.pinot.query.runtime.blocks.TransferableBlock; @@ -32,39 +33,36 @@ * them up if necessary). */ class HashExchange extends BlockExchange { + private final KeySelector _keySelector; - // TODO: ensure that server instance list is sorted using same function in sender. - private final KeySelector _keySelector; - - HashExchange(List sendingMailboxes, KeySelector selector, - BlockSplitter splitter) { + HashExchange(List sendingMailboxes, KeySelector keySelector, BlockSplitter splitter) { super(sendingMailboxes, splitter); - _keySelector = selector; + _keySelector = keySelector; } @Override protected void route(List destinations, TransferableBlock block) throws Exception { int numMailboxes = destinations.size(); - if (numMailboxes == 1) { + if (numMailboxes == 1 || _keySelector == EmptyKeySelector.INSTANCE) { sendBlock(destinations.get(0), block); return; } - List[] destIdxToRows = new List[numMailboxes]; - List container = block.getContainer(); - for (Object[] row : container) { - int index = _keySelector.computeHash(row) % numMailboxes; - List rows = destIdxToRows[index]; - if (rows == null) { - rows = new ArrayList<>(); - destIdxToRows[index] = rows; - } - rows.add(row); + //noinspection unchecked + List[] mailboxIdToRowsMap = new List[numMailboxes]; + for (int i = 0; i < numMailboxes; i++) { + mailboxIdToRowsMap[i] = new ArrayList<>(); + } + List rows = block.getContainer(); + for (Object[] row : rows) { + int mailboxId = _keySelector.computeHash(row) % numMailboxes; + mailboxIdToRowsMap[mailboxId].add(row); } for (int i = 0; i < numMailboxes; i++) { - if (destIdxToRows[i] != null) { - sendBlock(destinations.get(i), new TransferableBlock(destIdxToRows[i], block.getDataSchema(), block.getType())); + if (!mailboxIdToRowsMap[i].isEmpty()) { + sendBlock(destinations.get(i), + new TransferableBlock(mailboxIdToRowsMap[i], block.getDataSchema(), block.getType())); } } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java index 2e5812b53730..861cb4abc7ac 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java @@ -81,7 +81,7 @@ public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, OpChainEx @Override public MultiStageOperator visitMailboxSend(MailboxSendNode node, OpChainExecutionContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); - return new MailboxSendOperator(context, nextOperator, node.getDistributionType(), node.getPartitionKeySelector(), + return new MailboxSendOperator(context, nextOperator, node.getDistributionType(), node.getDistributionKeys(), node.getCollationKeys(), node.getCollationDirections(), node.isSortOnSender(), node.getReceiverStageId()); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java index 21e50c99930c..459e022f859d 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java @@ -39,7 +39,6 @@ import org.apache.pinot.common.utils.request.RequestUtils; import org.apache.pinot.core.query.optimizer.QueryOptimizer; import org.apache.pinot.core.routing.TimeBoundaryInfo; -import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector; import org.apache.pinot.query.planner.plannode.JoinNode; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; @@ -207,12 +206,12 @@ private static void attachTimeBoundary(PinotQuery pinotQuery, TimeBoundaryInfo t */ static void attachDynamicFilter(PinotQuery pinotQuery, JoinNode.JoinKeys joinKeys, List dataContainer, DataSchema dataSchema) { - FieldSelectionKeySelector leftSelector = (FieldSelectionKeySelector) joinKeys.getLeftJoinKeySelector(); - FieldSelectionKeySelector rightSelector = (FieldSelectionKeySelector) joinKeys.getRightJoinKeySelector(); + List leftJoinKeys = joinKeys.getLeftKeys(); + List rightJoinKeys = joinKeys.getRightKeys(); List expressions = new ArrayList<>(); - for (int i = 0; i < leftSelector.getColumnIndices().size(); i++) { - Expression leftExpr = pinotQuery.getSelectList().get(leftSelector.getColumnIndices().get(i)); - int rightIdx = rightSelector.getColumnIndices().get(i); + for (int i = 0; i < leftJoinKeys.size(); i++) { + Expression leftExpr = pinotQuery.getSelectList().get(leftJoinKeys.get(i)); + int rightIdx = rightJoinKeys.get(i); Expression inFilterExpr = RequestUtils.getFunctionExpression(FilterKind.IN.name()); List operands = new ArrayList<>(dataContainer.size() + 1); operands.add(leftExpr); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java index f7b85e5e6781..9abd2e78b503 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java @@ -33,7 +33,6 @@ import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.query.planner.logical.RexExpression; -import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector; import org.apache.pinot.query.planner.plannode.JoinNode; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; @@ -71,10 +70,8 @@ public void tearDown() _mocks.close(); } - private static JoinNode.JoinKeys getJoinKeys(List leftIdx, List rightIdx) { - FieldSelectionKeySelector leftSelect = new FieldSelectionKeySelector(leftIdx); - FieldSelectionKeySelector rightSelect = new FieldSelectionKeySelector(rightIdx); - return new JoinNode.JoinKeys(leftSelect, rightSelect); + private static JoinNode.JoinKeys getJoinKeys(List leftKeys, List rightKeys) { + return new JoinNode.JoinKeys(leftKeys, rightKeys); } private static List getJoinHints(Map hintsMap) { diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java index 65307e681c8b..6279d0596e54 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java @@ -83,9 +83,7 @@ public void shouldSplitAndRouteBlocksBasedOnPartitionKey() Assert.assertEquals(captor.getValue().getContainer().get(0), new Object[]{2}); } - private static class TestSelector implements KeySelector { - private static final String HASH_ALGORITHM = "dummyHash"; - + private static class TestSelector implements KeySelector { private final Iterator _hashes; public TestSelector(Iterator hashes) { @@ -93,7 +91,7 @@ public TestSelector(Iterator hashes) { } @Override - public Object[] getKey(Object[] input) { + public Object getKey(Object[] input) { throw new UnsupportedOperationException("Should not be called"); } @@ -101,10 +99,5 @@ public Object[] getKey(Object[] input) { public int computeHash(Object[] input) { return _hashes.next(); } - - @Override - public String hashAlgorithm() { - return HASH_ALGORITHM; - } } }