diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java index c96d9d36..615f122f 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java @@ -20,9 +20,10 @@ import cz.seznam.euphoria.core.client.io.DataSink; import cz.seznam.euphoria.core.client.io.DataSource; import cz.seznam.euphoria.core.client.operator.Operator; + +import javax.annotation.Nullable; import java.io.Serializable; import java.util.Collection; -import javax.annotation.Nullable; /** * A dataset abstraction. diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/OutputDataset.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/OutputDataset.java index 2f3ba96b..2d8c211c 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/OutputDataset.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/OutputDataset.java @@ -79,6 +79,4 @@ public boolean isBounded() { public Collection> getConsumers() { return flow.getConsumersOf(this); } - - } diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/AssignEventTime.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/AssignEventTime.java index 2695d5fb..fc1b2bb2 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/AssignEventTime.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/AssignEventTime.java @@ -16,14 +16,17 @@ package cz.seznam.euphoria.core.client.operator; import cz.seznam.euphoria.core.annotation.audience.Audience; -import cz.seznam.euphoria.core.client.functional.ExtractEventTime; import cz.seznam.euphoria.core.annotation.operator.Derived; import cz.seznam.euphoria.core.annotation.operator.StateComplexity; import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; +import cz.seznam.euphoria.core.client.functional.ExtractEventTime; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import java.util.Objects; +import java.util.Set; /** A convenient alias for assignment of event time. * @@ -87,9 +90,10 @@ public static class OutputBuilder implements Builders.Output { } @Override - public Dataset output() { + public Dataset output(OutputHint... outputHints) { Flow flow = input.getFlow(); - AssignEventTime op = new AssignEventTime<>(name, flow, input, eventTimeFn); + AssignEventTime op = new AssignEventTime<>(name, flow, input, eventTimeFn, + Sets.newHashSet(outputHints)); flow.add(op); return op.output(); } @@ -98,8 +102,8 @@ public Dataset output() { private final ExtractEventTime eventTimeFn; AssignEventTime(String name, Flow flow, Dataset input, - ExtractEventTime eventTimeFn) { - super(name, flow, input); + ExtractEventTime eventTimeFn, Set outputHints) { + super(name, flow, input, outputHints); this.eventTimeFn = eventTimeFn; } @@ -107,7 +111,7 @@ public Dataset output() { public DAG> getBasicOps() { return DAG.of(new FlatMap<>( getName(), getFlow(), input, - (i, c) -> c.collect(i), eventTimeFn)); + (i, c) -> c.collect(i), eventTimeFn, getHints())); } /** diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Builders.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Builders.java index ce0773e7..485a0a8f 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Builders.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Builders.java @@ -20,10 +20,9 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.functional.UnaryFunction; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.client.util.Pair; -import java.util.Set; - /** * Common methods used in operator builders to share related javadoc * descriptions.

@@ -94,7 +93,7 @@ public interface Output { * * @return the dataset representing the new operator's output */ - Dataset output(); + Dataset output(OutputHint... outputHints); } public interface OutputValues extends Output> { @@ -106,24 +105,13 @@ public interface OutputValues extends Output> { * * @return the dataset representing the new operator's output */ - default Dataset outputValues() { + default Dataset outputValues(OutputHint... outputHints) { return MapElements .named("extract-values") .of(output()) .using(Pair::getSecond) - .output(); + .output(outputHints); } } - public interface OutputWithHint extends Output { - - /** - * Add runtime specific hints for the operator - * - * @param hints runtime specific hints - * @return output builder - */ - Output withHints(Set hints); - } - } diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java index 96f868f8..c0583225 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java @@ -23,11 +23,14 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunction; -import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.client.util.Pair; +import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import javax.annotation.Nullable; import java.util.Objects; +import java.util.Set; /** * Operator counting elements with same key. @@ -49,7 +52,7 @@ ) public class CountByKey extends StateAwareWindowWiseSingleInputOperator< - IN, IN, IN, KEY, Pair, W, CountByKey> { + IN, IN, IN, KEY, Pair, W, CountByKey> { public static class OfBuilder implements Builders.Of { private final String name; @@ -100,8 +103,8 @@ public static class WindowingBuilder } @Override - public Dataset> output() { - return windowBy(null).output(); + public Dataset> output(OutputHint... outputHints) { + return windowBy(null).output(outputHints); } } @@ -122,10 +125,10 @@ public static class OutputBuilder } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { Flow flow = input.getFlow(); CountByKey count = new CountByKey<>( - name, flow, input, keyExtractor, windowing); + name, flow, input, keyExtractor, windowing, Sets.newHashSet(outputHints)); flow.add(count); return count.output(); } @@ -163,20 +166,22 @@ public static OfBuilder named(String name) { Flow flow, Dataset input, UnaryFunction extractor, - @Nullable Windowing windowing) { + @Nullable Windowing windowing, + Set outputHints) { - super(name, flow, input, extractor, windowing); + super(name, flow, input, extractor, windowing, outputHints); } @Override public DAG> getBasicOps() { SumByKey sum = new SumByKey<>( - getName(), - input.getFlow(), - input, - keyExtractor, - e -> 1L, - windowing); + getName(), + input.getFlow(), + input, + keyExtractor, + e -> 1L, + windowing, + getHints()); return DAG.of(sum); } } diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java index 7129dade..099a6cfc 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java @@ -24,11 +24,15 @@ import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction; import cz.seznam.euphoria.core.client.functional.UnaryFunction; -import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.client.util.Pair; +import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import javax.annotation.Nullable; +import java.util.Collections; import java.util.Objects; +import java.util.Set; /** * Operator outputting distinct (based on {@link Object#equals}) elements. @@ -126,8 +130,7 @@ private WindowingBuilder( return new OutputBuilder<>(name, input, mapper, windowing); } - @Override - public Dataset output() { + public Dataset output(OutputHint... outputHints) { return new OutputBuilder<>(name, input, mapper, null).output(); } } @@ -149,10 +152,10 @@ public static class OutputBuilder } @Override - public Dataset output() { + public Dataset output(OutputHint... outputHints) { Flow flow = input.getFlow(); Distinct distinct = new Distinct<>( - name, flow, input, mapper, windowing); + name, flow, input, mapper, windowing, Sets.newHashSet(outputHints)); flow.add(distinct); return distinct.output(); } @@ -190,9 +193,10 @@ public static OfBuilder named(String name) { Flow flow, Dataset input, UnaryFunction mapper, - @Nullable Windowing windowing) { + @Nullable Windowing windowing, + Set outputHints) { - super(name, flow, input, mapper, windowing); + super(name, flow, input, mapper, windowing, outputHints); } @Override @@ -203,10 +207,11 @@ public static OfBuilder named(String name) { new ReduceByKey<>(name, flow, input, getKeyExtractor(), e -> null, windowing, - (CombinableReduceFunction) e -> null); + (CombinableReduceFunction) e -> null, + Collections.emptySet()); MapElements format = new MapElements<>( - getName() + "::" + "Map", flow, reduce.output(), Pair::getFirst); + getName() + "::" + "Map", flow, reduce.output(), Pair::getFirst, getHints()); DAG> dag = DAG.of(reduce); dag.add(format, reduce); diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ElementWiseOperator.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ElementWiseOperator.java index 7c967833..ab3b5b5a 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ElementWiseOperator.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ElementWiseOperator.java @@ -18,6 +18,9 @@ import cz.seznam.euphoria.core.annotation.audience.Audience; import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; + +import java.util.Set; /** * Operator working element-wise, with no context between elements. @@ -29,9 +32,9 @@ public abstract class ElementWiseOperator protected final Dataset output; - protected ElementWiseOperator(String name, Flow flow, Dataset input) { + protected ElementWiseOperator(String name, Flow flow, Dataset input, Set outputHints) { super(name, flow, input); - this.output = createOutput(input); + this.output = createOutput(input, outputHints); } @Override diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Filter.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Filter.java index 10ebd126..112cb3cb 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Filter.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Filter.java @@ -21,9 +21,12 @@ import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryPredicate; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import java.util.Objects; +import java.util.Set; /** * Operator performing a filter operation. @@ -94,9 +97,9 @@ private OutputBuilder(String name, Dataset input, UnaryPredicate predica } @Override - public Dataset output() { + public Dataset output(OutputHint... outputHints) { Flow flow = input.getFlow(); - Filter filter = new Filter<>(name, flow, input, predicate); + Filter filter = new Filter<>(name, flow, input, predicate, Sets.newHashSet(outputHints)); flow.add(filter); return filter.output(); @@ -134,8 +137,11 @@ public static OfBuilder named(String name) { final UnaryPredicate predicate; - Filter(String name, Flow flow, Dataset input, UnaryPredicate predicate) { - super(name, flow, input); + Filter(String name, + Flow flow, Dataset input, + UnaryPredicate predicate, + Set outputHints) { + super(name, flow, input, outputHints); this.predicate = predicate; } @@ -151,6 +157,8 @@ public UnaryPredicate getPredicate() { if (predicate.apply(elem)) { collector.collect(elem); } - }, null)); + }, + null, + getHints())); } } diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FlatMap.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FlatMap.java index c7db0553..9481616d 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FlatMap.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FlatMap.java @@ -16,16 +16,20 @@ package cz.seznam.euphoria.core.client.operator; import cz.seznam.euphoria.core.annotation.audience.Audience; -import cz.seznam.euphoria.core.client.functional.ExtractEventTime; import cz.seznam.euphoria.core.annotation.operator.Basic; import cz.seznam.euphoria.core.annotation.operator.StateComplexity; import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; +import cz.seznam.euphoria.core.client.functional.ExtractEventTime; import cz.seznam.euphoria.core.client.functional.UnaryFunctor; import cz.seznam.euphoria.core.client.io.Collector; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import javax.annotation.Nullable; +import java.util.Collections; import java.util.Objects; +import java.util.Set; /** * A transformation of a dataset from one type into another allowing user code @@ -142,9 +146,9 @@ public static class EventTimeBuilder implements Builders.Output { } @Override - public Dataset output() { + public Dataset output(OutputHint... outputHints) { return new OutputBuilder<>( - this.using.name, this.using.input, this.functor, null).output(); + this.using.name, this.using.input, this.functor, null).output(outputHints); } } @@ -166,9 +170,10 @@ public static class OutputBuilder implements Builders.Output { } @Override - public Dataset output() { + public Dataset output(OutputHint... outputHints) { Flow flow = input.getFlow(); - FlatMap map = new FlatMap<>(name, flow, input, functor, evtTimeFn); + FlatMap map = new FlatMap<>(name, flow, input, functor, evtTimeFn, + Sets.newHashSet(outputHints)); flow.add(map); return map.output(); } @@ -207,12 +212,19 @@ public static OfBuilder named(String name) { FlatMap(String name, Flow flow, Dataset input, UnaryFunctor functor, - @Nullable ExtractEventTime evtTimeFn) { - super(name, flow, input); + @Nullable ExtractEventTime evtTimeFn, + Set outputHints) { + super(name, flow, input, outputHints); this.functor = functor; this.eventTimeFn = evtTimeFn; } + FlatMap(String name, Flow flow, Dataset input, + UnaryFunctor functor, + @Nullable ExtractEventTime evtTimeFn) { + this(name, flow, input, functor, evtTimeFn, Collections.emptySet()); + } + /** * Retrieves the user defined map function to be applied to this operator's * input elements. diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FullJoin.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FullJoin.java index 005e9672..0afa10a3 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FullJoin.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FullJoin.java @@ -40,7 +40,6 @@ *

  • {@code by .......................} {@link UnaryFunction}s transforming left and right elements into keys *
  • {@code using ....................} {@link BinaryFunctor} receiving left and right element from joined window *
  • {@code [windowBy] ...............} windowing function (see {@link Windowing}), default attached windowing - *
  • {@code [withHints] ..............} specify hints about runtime data characteristics, see {@link JoinHint} *
  • {@code (output | outputValues) ..} build output dataset * * diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java index 335819e6..30cb2a52 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java @@ -26,6 +26,7 @@ import cz.seznam.euphoria.core.client.functional.BinaryFunctor; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.io.Collector; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.client.operator.state.ListStorage; import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor; import cz.seznam.euphoria.core.client.operator.state.State; @@ -35,11 +36,11 @@ import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.core.executor.graph.DAG; import cz.seznam.euphoria.shadow.com.google.common.annotations.VisibleForTesting; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import javax.annotation.Nullable; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.Objects; import java.util.Set; @@ -58,7 +59,6 @@ *
  • {@code by .......................} {@link UnaryFunction}s transforming left and right elements into keys *
  • {@code using ....................} {@link BinaryFunctor} receiving left and right element from joined window *
  • {@code [windowBy] ...............} windowing function (see {@link Windowing}), default attached windowing - *
  • {@code [withHints] ..............} specify hints about runtime data characteristics, see {@link JoinHint} *
  • {@code (output | outputValues) ..} build output dataset * * @@ -74,8 +74,7 @@ ) public class Join extends StateAwareWindowWiseOperator, - Either, KEY, Pair, W, Join> - implements HintAware, Builders.OutputValues { + Either, KEY, Pair, W, Join> { public enum Type { INNER, @@ -156,7 +155,7 @@ public Join.WindowingBuilder using( } public static class WindowingBuilder - implements Builders.OutputWithHint, JoinHint>, + implements Builders.Output>, Builders.OutputValues, OptionalMethodBuilder> { @@ -186,69 +185,19 @@ public static class WindowingBuilder } @Override - public Dataset> output() { - return windowBy(null).withHints(Collections.emptySet()).output(); + public Dataset> output(OutputHint... outputHints) { + return windowBy(null).output(outputHints); } - @Override - public OutputBuilder withHints(Set hints) { - return windowBy(null).withHints(hints); - } - - public HintBuilderOutput windowBy( + public OutputBuilder windowBy( Windowing, W> windowing) { - return new HintBuilderOutput<>(name, left, right, leftKeyExtractor, - rightKeyExtractor, joinFunc, type, windowing); - } - } - - public static class HintBuilderOutput - implements Builders.OutputWithHint, JoinHint>, - Builders.OutputValues { - - private final String name; - private final Dataset left; - private final Dataset right; - private final UnaryFunction leftKeyExtractor; - private final UnaryFunction rightKeyExtractor; - private final BinaryFunctor joinFunc; - private final Type type; - - @Nullable - private final Windowing, W> windowing; - - HintBuilderOutput(String name, - Dataset left, - Dataset right, - UnaryFunction leftKeyExtractor, - UnaryFunction rightKeyExtractor, - BinaryFunctor joinFunc, - Type type, - @Nullable Windowing, W> windowing) { - this.name = Objects.requireNonNull(name); - this.left = Objects.requireNonNull(left); - this.right = Objects.requireNonNull(right); - this.leftKeyExtractor = Objects.requireNonNull(leftKeyExtractor); - this.rightKeyExtractor = Objects.requireNonNull(rightKeyExtractor); - this.joinFunc = Objects.requireNonNull(joinFunc); - this.type = Objects.requireNonNull(type); - this.windowing = windowing; - } - - @Override - public Dataset> output() { - return withHints(Collections.emptySet()).output(); - } - - @Override - public OutputBuilder withHints(Set hints) { return new OutputBuilder<>(name, left, right, leftKeyExtractor, - rightKeyExtractor, joinFunc, type, windowing, hints); + rightKeyExtractor, joinFunc, type, windowing); } } public static class OutputBuilder - implements Builders.OutputValues { + implements Builders.OutputValues, Builders.Output> { private final String name; private final Dataset left; @@ -260,7 +209,6 @@ public static class OutputBuilder @Nullable private final Windowing, W> windowing; - private final Set hints; OutputBuilder(String name, Dataset left, @@ -269,8 +217,7 @@ public static class OutputBuilder UnaryFunction rightKeyExtractor, BinaryFunctor joinFunc, Type type, - @Nullable Windowing, W> windowing, - Set hints) { + @Nullable Windowing, W> windowing) { this.name = Objects.requireNonNull(name); this.left = Objects.requireNonNull(left); this.right = Objects.requireNonNull(right); @@ -279,15 +226,14 @@ public static class OutputBuilder this.joinFunc = Objects.requireNonNull(joinFunc); this.type = Objects.requireNonNull(type); this.windowing = windowing; - this.hints = Objects.requireNonNull(hints); } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { final Flow flow = left.getFlow(); final Join join = new Join<>( name, flow, left, right, leftKeyExtractor, - rightKeyExtractor, joinFunc, type, windowing, hints); + rightKeyExtractor, joinFunc, type, windowing, Sets.newHashSet(outputHints)); flow.add(join); return join.output(); } @@ -302,7 +248,6 @@ public Dataset> output() { @VisibleForTesting final UnaryFunction rightKeyExtractor; private final Type type; - private final Set hints; Join(String name, Flow flow, @@ -312,7 +257,7 @@ public Dataset> output() { BinaryFunctor functor, Type type, @Nullable Windowing, W> windowing, - Set hints) { + Set outputHints) { super(name, flow, windowing, (Either elem) -> { if (elem.isLeft()) { return leftKeyExtractor.apply(elem.left()); @@ -325,10 +270,9 @@ public Dataset> output() { this.rightKeyExtractor = rightKeyExtractor; this.functor = functor; @SuppressWarnings("unchecked") - Dataset> output = createOutput((Dataset) left); + Dataset> output = createOutput((Dataset) left, outputHints); this.output = output; this.type = type; - this.hints = Objects.requireNonNull(hints); } @Override @@ -563,11 +507,6 @@ public BinaryFunctor getJoiner() { return functor; } - @Override - public Set getHints() { - return hints; - } - @Override @SuppressWarnings("unchecked") public DAG> getBasicOps() { @@ -580,8 +519,8 @@ public Set getHints() { getName() + "::Map-right", flow, right, Either::right); final Union> union = - new Union<>(getName() + "::Union", flow, Arrays.asList( - leftMap.output(), rightMap.output())); + new Union<>(getName() + "::Union", flow, + Arrays.asList(leftMap.output(), rightMap.output())); final ReduceStateByKey, KEY, Either, OUT, StableJoinState, W> reduce = new ReduceStateByKey( @@ -596,12 +535,12 @@ public Set getHints() { return ctx == null ? new StableJoinState(storages) : new EarlyEmittingJoinState(storages, ctx); - }, new StateSupport.MergeFromStateMerger<>()); + }, new StateSupport.MergeFromStateMerger<>(), + getHints()); final DAG> dag = DAG.of(leftMap, rightMap); dag.add(union, leftMap, rightMap); dag.add(reduce, union); return dag; } - } diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/JoinHint.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/JoinHint.java deleted file mode 100644 index 63d67aac..00000000 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/JoinHint.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2016-2018 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.core.client.operator; - -import cz.seznam.euphoria.core.annotation.audience.Audience; - -@Audience(Audience.Type.INTERNAL) -public interface JoinHint extends Hint { - -} diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/JoinHints.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/JoinHints.java deleted file mode 100644 index 02b30ea1..00000000 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/JoinHints.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2016-2018 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.core.client.operator; - -import cz.seznam.euphoria.core.annotation.audience.Audience; - -@Audience(Audience.Type.CLIENT) -public class JoinHints { - - private static final BroadcastHashJoin BROADCAST_HASH_JOIN = new BroadcastHashJoin(); - - public static BroadcastHashJoin broadcastHashJoin() { - return BROADCAST_HASH_JOIN; - } - - /** - * Broadcasts optional join side to all executors. - */ - public static class BroadcastHashJoin implements JoinHint { - - private BroadcastHashJoin() { - - } - } - -} diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/LeftJoin.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/LeftJoin.java index b2ebc28c..03039f41 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/LeftJoin.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/LeftJoin.java @@ -40,7 +40,6 @@ *
  • {@code by .......................} {@link UnaryFunction}s transforming left and right elements into keys *
  • {@code using ....................} {@link BinaryFunctor} receiving left and right element from joined window *
  • {@code [windowBy] ...............} windowing function (see {@link Windowing}), default attached windowing - *
  • {@code [withHints] ..............} specify hints about runtime data characteristics, see {@link JoinHint} *
  • {@code (output | outputValues) ..} build output dataset * * diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/MapElements.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/MapElements.java index 85274370..bb117183 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/MapElements.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/MapElements.java @@ -22,9 +22,13 @@ import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.functional.UnaryFunctionEnv; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; +import java.util.Collections; import java.util.Objects; +import java.util.Set; /** * Simple one-to-one transformation of input elements. It is a special case of @@ -107,9 +111,9 @@ public static class OutputBuilder implements Builders.Output { } @Override - public Dataset output() { + public Dataset output(OutputHint... outputHints) { Flow flow = input.getFlow(); - MapElements map = new MapElements<>(name, flow, input, mapper); + MapElements map = new MapElements<>(name, flow, input, mapper, Sets.newHashSet(outputHints)); flow.add(map); return map.output(); @@ -150,14 +154,23 @@ public static OfBuilder named(String name) { Flow flow, Dataset input, UnaryFunction mapper) { - this(name, flow, input, (el, ctx) -> mapper.apply(el)); + this(name, flow, input, (el, ctx) -> mapper.apply(el), Collections.emptySet()); } MapElements(String name, Flow flow, Dataset input, - UnaryFunctionEnv mapper) { - super(name, flow, input); + UnaryFunction mapper, + Set outputHints) { + this(name, flow, input, (el, ctx) -> mapper.apply(el), outputHints); + } + + + MapElements(String name, + Flow flow, + Dataset input, + UnaryFunctionEnv mapper, Set outputHints) { + super(name, flow, input, outputHints); this.mapper = mapper; } @@ -171,7 +184,7 @@ public static OfBuilder named(String name) { return DAG.of( // do not use the client API here, because it modifies the Flow! new FlatMap(getName(), getFlow(), input, - (i, c) -> c.collect(mapper.apply(i, c.asContext())), null)); + (i, c) -> c.collect(mapper.apply(i, c.asContext())), null, getHints())); } public UnaryFunctionEnv getMapper() { diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Operator.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Operator.java index ff8431e4..f028a6cb 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Operator.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Operator.java @@ -19,10 +19,12 @@ import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.dataset.Datasets; import cz.seznam.euphoria.core.client.flow.Flow; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.executor.graph.DAG; import java.io.Serializable; import java.util.Collection; +import java.util.Set; /** * An operator base class. All operators inherit his class. @@ -35,6 +37,8 @@ public abstract class Operator implements Serializable { /** Associated Flow. */ private final Flow flow; + private Set hints; + protected Operator(String name, Flow flow) { this.name = name; this.flow = flow; @@ -69,11 +73,16 @@ public final Flow getFlow() { * * @return a newly created dataset associated with this operator as its output */ - final Dataset createOutput(final Dataset input) { + final Dataset createOutput(final Dataset input, Set outputHints) { Flow flow = input.getFlow(); + this.hints = outputHints; return Datasets.createOutputFor(flow, input, this); } + public Set getHints() { + return hints; + } + /** * @return the output dataset */ diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java index 81aed03c..a74f28b1 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java @@ -27,10 +27,10 @@ import cz.seznam.euphoria.core.client.functional.ReduceFunction; import cz.seznam.euphoria.core.client.functional.ReduceFunctor; import cz.seznam.euphoria.core.client.functional.UnaryFunction; -import cz.seznam.euphoria.core.executor.graph.DAG; import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.io.ExternalIterable; import cz.seznam.euphoria.core.client.io.SpillTools; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.client.operator.state.ListStorage; import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor; import cz.seznam.euphoria.core.client.operator.state.State; @@ -40,11 +40,15 @@ import cz.seznam.euphoria.core.client.operator.state.ValueStorage; import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor; import cz.seznam.euphoria.core.client.util.Pair; +import cz.seznam.euphoria.core.executor.graph.DAG; import cz.seznam.euphoria.core.executor.util.SingleValueContext; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import javax.annotation.Nullable; +import java.util.Collections; import java.util.Comparator; import java.util.Objects; +import java.util.Set; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -94,8 +98,8 @@ public class ReduceByKey extends StateAwareWindowWiseSingleInputOperator< IN, IN, IN, KEY, Pair, W, - ReduceByKey> - implements Builders.OutputValues { + ReduceByKey> { + public static class OfBuilder implements Builders.Of { private final String name; @@ -279,14 +283,15 @@ public static class DatasetBuilder4 } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { return new DatasetBuilder5<>( name, input, keyExtractor, valueExtractor, - reducer, null, valuesComparator).output(); + reducer, null, valuesComparator).output(outputHints); } } + public static class SortableDatasetBuilder4 extends DatasetBuilder4 { @@ -319,7 +324,6 @@ public DatasetBuilder4 withSortedValues( } - public static class DatasetBuilder5 extends DatasetBuilder4 implements Builders.OutputValues { @@ -341,11 +345,11 @@ public static class DatasetBuilder5 } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { Flow flow = input.getFlow(); ReduceByKey reduce = new ReduceByKey<>( - name, flow, input, keyExtractor, valueExtractor, - windowing, reducer, valuesComparator); + name, flow, input, keyExtractor, valueExtractor, + windowing, reducer, valuesComparator, Sets.newHashSet(outputHints)); flow.add(reduce); return reduce.output(); } @@ -391,14 +395,14 @@ public static OfBuilder named(String name) { UnaryFunction keyExtractor, UnaryFunction valueExtractor, @Nullable Windowing windowing, - CombinableReduceFunction reducer) { + CombinableReduceFunction reducer, + Set outputHints) { this( name, flow, input, keyExtractor, valueExtractor, windowing, (ReduceFunctor) toReduceFunctor(reducer), - null); + null, outputHints); } - ReduceByKey(String name, Flow flow, Dataset input, @@ -406,9 +410,10 @@ public static OfBuilder named(String name) { UnaryFunction valueExtractor, @Nullable Windowing windowing, ReduceFunctor reducer, - @Nullable BinaryFunction valueComparator) { + @Nullable BinaryFunction valueComparator, + Set outputHints) { - super(name, flow, input, keyExtractor, windowing); + super(name, flow, input, keyExtractor, windowing, outputHints); this.reducer = reducer; this.valueExtractor = valueExtractor; this.valueComparator = valueComparator; @@ -438,7 +443,7 @@ public UnaryFunction getValueExtractor() { Operator reduceState = new ReduceStateByKey(getName(), flow, input, keyExtractor, valueExtractor, windowing, - stateFactory, stateCombine); + stateFactory, stateCombine, getHints()); return DAG.of(reduceState); } diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java index 4c1778c7..2d1978fd 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java @@ -23,13 +23,17 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunction; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.client.operator.state.State; import cz.seznam.euphoria.core.client.operator.state.StateFactory; import cz.seznam.euphoria.core.client.operator.state.StateMerger; import cz.seznam.euphoria.core.client.util.Pair; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import javax.annotation.Nullable; +import java.util.Collections; import java.util.Objects; +import java.util.Set; /** * A {@link ReduceStateByKey} operator is a stateful, complex, lower-level-api, @@ -277,10 +281,10 @@ public static class DatasetBuilder5< } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { return new DatasetBuilder6<>(name, input, keyExtractor, valueExtractor, stateFactory, stateMerger, null) - .output(); + .output(outputHints); } } @@ -303,13 +307,13 @@ public static class DatasetBuilder6< } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { Flow flow = input.getFlow(); ReduceStateByKey reduceStateByKey = new ReduceStateByKey<>(name, flow, input, keyExtractor, valueExtractor, - windowing, stateFactory, stateMerger); + windowing, stateFactory, stateMerger, Sets.newHashSet(outputHints)); flow.add(reduceStateByKey); return reduceStateByKey.output(); @@ -355,9 +359,9 @@ public static OfBuilder named(String name) { UnaryFunction valueExtractor, @Nullable Windowing windowing, StateFactory stateFactory, - StateMerger stateMerger) - { - super(name, flow, input, keyExtractor, windowing); + StateMerger stateMerger, + Set outputHints) { + super(name, flow, input, keyExtractor, windowing, outputHints); this.stateFactory = stateFactory; this.valueExtractor = valueExtractor; this.stateCombiner = stateMerger; diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java index 9afe5343..29a5d4ad 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java @@ -27,12 +27,13 @@ import cz.seznam.euphoria.core.client.functional.ReduceFunction; import cz.seznam.euphoria.core.client.functional.ReduceFunctor; import cz.seznam.euphoria.core.client.functional.UnaryFunction; -import cz.seznam.euphoria.core.executor.graph.DAG; import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.util.Pair; -import java.util.stream.Stream; +import cz.seznam.euphoria.core.executor.graph.DAG; import javax.annotation.Nullable; +import java.util.Collections; +import java.util.stream.Stream; /** * Reduces all elements in a window. The operator corresponds to @@ -278,7 +279,7 @@ private ReduceWindow( ReduceFunctor reducer, @Nullable BinaryFunction valueComparator) { - super(name, flow, input, e -> B_ZERO, windowing); + super(name, flow, input, e -> B_ZERO, windowing, Collections.emptySet()); this.reducer = reducer; this.valueExtractor = valueExtractor; this.valueComparator = valueComparator; @@ -298,7 +299,7 @@ public ReduceFunctor getReducer() { rbk = new ReduceByKey<>( getName() + "::ReduceByKey", getFlow(), input, getKeyExtractor(), valueExtractor, - windowing, reducer, valueComparator); + windowing, reducer, valueComparator, getHints()); dag.add(rbk); } else { // otherwise we use attached windowing, therefore @@ -314,7 +315,7 @@ public ReduceFunctor getReducer() { rbk = new ReduceByKey<>( getName() + "::ReduceByKey::attached", getFlow(), map.output(), Pair::getFirst, p -> valueExtractor.apply(p.getSecond()), - null, reducer, valueComparator); + null, reducer, valueComparator, getHints()); dag.add(map); dag.add(rbk); } diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/RightJoin.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/RightJoin.java index 0c2043d6..c665d098 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/RightJoin.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/RightJoin.java @@ -40,7 +40,6 @@ *
  • {@code by .......................} {@link UnaryFunction}s transforming left and right elements into keys *
  • {@code using ....................} {@link BinaryFunctor} receiving left and right element from joined window *
  • {@code [windowBy] ...............} windowing function (see {@link Windowing}), default attached windowing - *
  • {@code [withHints] ..............} specify hints about runtime data characteristics, see {@link JoinHint} *
  • {@code (output | outputValues) ..} build output dataset * * diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java index 62e944c3..3459ebe7 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java @@ -17,14 +17,16 @@ import cz.seznam.euphoria.core.annotation.audience.Audience; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.dataset.windowing.Window; +import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunction; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import javax.annotation.Nullable; import java.util.Collection; import java.util.Collections; +import java.util.Set; /** * Operator operating on window level with state information. @@ -39,14 +41,14 @@ public class StateAwareWindowWiseSingleInputOperator< private final Dataset output; protected StateAwareWindowWiseSingleInputOperator( - String name, - Flow flow, Dataset input, - UnaryFunction extractor, - @Nullable Windowing windowing) { + String name, + Flow flow, Dataset input, + UnaryFunction extractor, + @Nullable Windowing windowing, Set outputHints) { super(name, flow, windowing, extractor); this.input = input; - this.output = createOutput(input); + this.output = createOutput(input, outputHints); } @Override diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java index 9ab036a0..681a2b41 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java @@ -23,12 +23,16 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunction; -import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.core.client.util.Sums; +import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import javax.annotation.Nullable; +import java.util.Collections; import java.util.Objects; +import java.util.Set; /** * Operator for summing of long values extracted from elements. The sum is operated upon @@ -93,7 +97,7 @@ public ValueByBuilder keyBy(UnaryFunction keyExtractor) } public static class ValueByBuilder implements Builders.WindowBy>, - Builders.Output>, + Builders.Output>, Builders.OutputValues { @@ -118,9 +122,9 @@ public ByBuilder2 valueBy(UnaryFunction valueExtractor) { } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { return new OutputBuilder<>(name, input, keyExtractor, e -> 1L, null) - .output(); + .output(outputHints); } } @@ -154,7 +158,7 @@ public static class ByBuilder2 } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { return new OutputBuilder<>( name, input, keyExtractor, valueExtractor, null) .output(); @@ -177,12 +181,12 @@ public static class OutputBuilder this.windowing = windowing; } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { Flow flow = input.getFlow(); SumByKey sumByKey = new SumByKey<>( name, flow, input, keyExtractor, valueExtractor, - windowing); + windowing, Sets.newHashSet(outputHints)); flow.add(sumByKey); return sumByKey.output(); } @@ -223,9 +227,18 @@ public static OfBuilder named(String name) { Dataset input, UnaryFunction keyExtractor, UnaryFunction valueExtractor, - @Nullable Windowing windowing) - { - super(name, flow, input, keyExtractor, windowing); + @Nullable Windowing windowing) { + this(name, flow, input, keyExtractor, valueExtractor, windowing, Collections.emptySet()); + } + + SumByKey(String name, + Flow flow, + Dataset input, + UnaryFunction keyExtractor, + UnaryFunction valueExtractor, + @Nullable Windowing windowing, + Set outputHints) { + super(name, flow, input, keyExtractor, windowing, outputHints); this.valueExtractor = valueExtractor; } @@ -233,7 +246,7 @@ public static OfBuilder named(String name) { public DAG> getBasicOps() { ReduceByKey reduceByKey = new ReduceByKey<>(getName(), input.getFlow(), input, - keyExtractor, valueExtractor, windowing, Sums.ofLongs()); + keyExtractor, valueExtractor, windowing, Sums.ofLongs(), getHints()); return DAG.of(reduceByKey); } } diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java index da9ed2e3..ce46acf2 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java @@ -23,8 +23,8 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunction; -import cz.seznam.euphoria.core.executor.graph.DAG; import cz.seznam.euphoria.core.client.io.Collector; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.client.operator.state.State; import cz.seznam.euphoria.core.client.operator.state.StateContext; import cz.seznam.euphoria.core.client.operator.state.StorageProvider; @@ -32,8 +32,12 @@ import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor; import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.core.client.util.Triple; +import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Set; import static java.util.Objects.requireNonNull; @@ -222,9 +226,9 @@ public static class WindowByBuilder> } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { return new OutputBuilder<>( - name, input, keyFn, valueFn, scoreFn, null).output(); + name, input, keyFn, valueFn, scoreFn, null).output(outputHints); } } @@ -247,11 +251,11 @@ public static class OutputBuilder< } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { Flow flow = input.getFlow(); TopPerKey top = new TopPerKey<>(flow, name, input, keyFn, valueFn, - scoreFn, windowing); + scoreFn, windowing, Sets.newHashSet(outputHints)); flow.add(top); return top.output(); } @@ -296,8 +300,9 @@ public static OfBuilder named(String name) { UnaryFunction keyFn, UnaryFunction valueFn, UnaryFunction scoreFn, - @Nullable Windowing windowing) { - super(name, flow, input, keyFn, windowing); + @Nullable Windowing windowing, + Set outputHints) { + super(name, flow, input, keyFn, windowing, outputHints); this.valueFn = valueFn; this.scoreFn = scoreFn; @@ -326,7 +331,7 @@ public UnaryFunction getScoreExtractor() { windowing, (StateContext context, Collector> collector) -> { return new MaxScored<>(context.getStorageProvider()); - }, stateCombiner); + }, stateCombiner, Collections.emptySet()); MapElements>, Triple> format = @@ -334,7 +339,8 @@ public UnaryFunction getScoreExtractor() { e -> Triple.of( e.getFirst(), e.getSecond().getFirst(), - e.getSecond().getSecond())); + e.getSecond().getSecond()), + getHints()); DAG> dag = DAG.of(reduce); dag.add(format, reduce); diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Union.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Union.java index d12fde7c..f21fa5b3 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Union.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Union.java @@ -20,12 +20,16 @@ import cz.seznam.euphoria.core.annotation.operator.StateComplexity; import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.shadow.com.google.common.base.Preconditions; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Set; /** * The union of at least two datasets of the same type.

    @@ -119,9 +123,9 @@ public static class OutputBuilder } @Override - public Dataset output() { + public Dataset output(OutputHint... outputHints) { final Flow flow = dataSets.get(0).getFlow(); - final Union union = new Union<>(name, flow, dataSets); + final Union union = new Union<>(name, flow, dataSets, Sets.newHashSet(outputHints)); flow.add(union); return union.output(); } @@ -176,6 +180,11 @@ public static OfBuilder named(String name) { @SuppressWarnings("unchecked") Union(String name, Flow flow, List> dataSets) { + this(name, flow, dataSets, Collections.emptySet()); + } + + @SuppressWarnings("unchecked") + Union(String name, Flow flow, List> dataSets, Set outputHints) { super(name, flow); Preconditions.checkArgument( dataSets.size() > 1, @@ -184,7 +193,7 @@ public static OfBuilder named(String name) { dataSets.stream().map(Dataset::getFlow).distinct().count() == 1, "Only data sets from the same flow can be passed to Union."); this.dataSets = dataSets; - this.output = createOutput(dataSets.get(0)); + this.output = createOutput(dataSets.get(0), outputHints); } /** diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Hint.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/OutputHint.java similarity index 81% rename from euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Hint.java rename to euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/OutputHint.java index 18d2bab6..65b831ef 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Hint.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/OutputHint.java @@ -13,13 +13,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package cz.seznam.euphoria.core.client.operator; +package cz.seznam.euphoria.core.client.operator.hint; import cz.seznam.euphoria.core.annotation.audience.Audience; import java.io.Serializable; +/** + * Specify hints about runtime data characteristics + */ @Audience(Audience.Type.INTERNAL) -public interface Hint extends Serializable { +public interface OutputHint extends Serializable { } diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/HintAware.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/SizeHint.java similarity index 66% rename from euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/HintAware.java rename to euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/SizeHint.java index 6fec4b70..78fe3428 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/HintAware.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/SizeHint.java @@ -13,18 +13,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package cz.seznam.euphoria.core.client.operator; +package cz.seznam.euphoria.core.client.operator.hint; import cz.seznam.euphoria.core.annotation.audience.Audience; -import java.util.Set; - -@Audience(Audience.Type.INTERNAL) -public interface HintAware { +/** + * Extra information for runner about Dataset size + */ +@Audience(Audience.Type.CLIENT) +public enum SizeHint implements OutputHint { /** - * Returns all hints for the operator. - * - * @return hints for the operator + * Indicate to runner that dataset can fit in memory and this information + * could be used for optimization (e.g. Broadcast hash join) */ - Set getHints(); + FITS_IN_MEMORY } diff --git a/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/HintTest.java b/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/HintTest.java new file mode 100644 index 00000000..681b0e02 --- /dev/null +++ b/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/HintTest.java @@ -0,0 +1,101 @@ +/* + * Copyright 2016-2018 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.core.client.operator; + +import cz.seznam.euphoria.core.client.dataset.Dataset; +import cz.seznam.euphoria.core.client.dataset.windowing.Time; +import cz.seznam.euphoria.core.client.flow.Flow; +import cz.seznam.euphoria.core.client.io.Collector; +import cz.seznam.euphoria.core.client.io.MockStreamDataSource; +import cz.seznam.euphoria.core.client.io.VoidSink; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; +import cz.seznam.euphoria.core.client.operator.hint.SizeHint; +import cz.seznam.euphoria.core.client.util.Pair; +import cz.seznam.euphoria.core.executor.Executor; +import cz.seznam.euphoria.core.executor.FlowUnfolder; +import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; +import org.junit.Test; + +import java.time.Duration; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class HintTest { + + /** + * Test every node in DAG which was unfolded from original operator, if preserves hints + */ + @Test + @SuppressWarnings("unchecked") + public void testHintsAfterUnfold() { + Flow flow = Flow.create(getClass().getSimpleName()); + Dataset input = flow.createInput(new MockStreamDataSource<>()); + + Dataset mapped = MapElements + .named("mapElementsFitInMemoryHint") + .of(input).using(e -> e).output(SizeHint.FITS_IN_MEMORY); + Dataset> reduced = ReduceByKey + .named("reduceByKeyTwoHints") + .of(mapped) + .keyBy(e -> e) + .reduceBy(values -> 1L) + .windowBy(Time.of(Duration.ofSeconds(1))) + .output(new Util.TestHint(), new Util.TestHint2()); + + Dataset mapped2 = MapElements + .named("mapElementsTestHint2") + .of(reduced).using(Pair::getFirst).output(new Util.TestHint2()); + mapped2.persist(new VoidSink<>()); + + Dataset> output = Join + .named("joinHint") + .of(mapped, reduced) + .by(e -> e, Pair::getFirst) + .using((Object l, Pair r, Collector c) -> c.collect(r.getSecond())) + .windowBy(Time.of(Duration.ofSeconds(1))) + .output(new Util.TestHint()); + + output.persist(new VoidSink<>()); + + DAG> unfolded = FlowUnfolder.unfold(flow, Executor.getBasicOps()); + + testNodesByName(unfolded, "mapElementsFitInMemoryHint", 1, Sets.newHashSet(SizeHint.FITS_IN_MEMORY)); + + testNodesByName(unfolded, "mapElementsTestHint2", 1, Sets.newHashSet(new Util.TestHint2())); + + testNodesByName(unfolded, "reduceByKeyTwoHints", 2, + Sets.newHashSet(new Util.TestHint(), new Util.TestHint2())); + + testNodesByName(unfolded, "joinHint::ReduceStateByKey", 1, Sets.newHashSet(new Util.TestHint())); + + } + + private void testNodesByName(DAG> unfolded, + String name, + int expectedHintCount, + Set expectedHints) { + unfolded.nodes() + .filter(node -> node.getName().equalsIgnoreCase(name)) + .forEach(operator -> { + assertEquals(expectedHintCount, operator.getHints().size()); + assertTrue(expectedHints.containsAll(operator.getHints())); + } + ); + } +} diff --git a/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java b/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java index 903cb45e..0c5fbea3 100644 --- a/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java +++ b/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java @@ -19,8 +19,8 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Time; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.io.Collector; +import cz.seznam.euphoria.core.client.operator.hint.SizeHint; import cz.seznam.euphoria.core.client.util.Pair; -import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import org.junit.Test; import java.time.Duration; @@ -199,73 +199,92 @@ public void testBuild_Windowing() { } @Test + @SuppressWarnings("unchecked") public void testBuild_Hints() { Flow flow = Flow.create("TEST"); Dataset left = Util.createMockDataset(flow, 1); Dataset right = Util.createMockDataset(flow, 1); - Join.named("Join1") - .of(left, right) + + Dataset outputDataset = Join.named("Join1") + .of(MapElements.of(left).using(i -> i).output(new Util.TestHint(), new Util.TestHint2()), + right) .by(String::length, String::length) .using((String l, String r, Collector c) -> { // no-op }) - .withHints(Sets.newHashSet(new TestHint(), new TestHint2(), new TestHint2())) - .output(); + .outputValues(SizeHint.FITS_IN_MEMORY); + + assertTrue(outputDataset.getProducer().getHints().contains(SizeHint.FITS_IN_MEMORY)); + + Join join = (Join) flow.operators() + .stream() + .filter(op -> op instanceof Join) + .findFirst() + .get(); + assertTrue(join.listInputs() + .stream() + .anyMatch(input -> + ((Dataset) input).getProducer().getHints().contains(new Util.TestHint()))); + + assertTrue(join.listInputs() + .stream() + .anyMatch(input -> + ((Dataset) input).getProducer().getHints().contains(new Util.TestHint2()))); + + assertEquals(2, + ((Dataset) join.listInputs() + .stream() + .findFirst() + .get() + ).getProducer().getHints().size()); - Join join = (Join) flow.operators().iterator().next(); - assertTrue(join.getHints().contains(new TestHint())); - assertTrue(join.getHints().contains(new TestHint2())); - assertEquals(2, join.getHints().size()); } @Test + @SuppressWarnings("unchecked") public void testBuild_Hints_afterWindowing() { Flow flow = Flow.create("TEST"); Dataset left = Util.createMockDataset(flow, 1); Dataset right = Util.createMockDataset(flow, 1); Join.named("Join1") - .of(left, right) + .of(MapElements.of(left) + .using(i -> i) + .output(new Util.TestHint(), new Util.TestHint2(), new Util.TestHint2()), + right) .by(String::length, String::length) .using((String l, String r, Collector c) -> { // no-op }) .windowBy(Time.of(Duration.ofHours(1))) - .withHints(Sets.newHashSet(new TestHint(), new TestHint2(), new TestHint2())) .output(); - Join join = (Join) flow.operators().iterator().next(); - assertTrue(join.getHints().contains(new TestHint())); - assertTrue(join.getHints().contains(new TestHint2())); - assertEquals(2, join.getHints().size()); - assertTrue(join.getWindowing() instanceof Time); - } + Join join = (Join) flow.operators() + .stream() + .filter(op -> op instanceof Join) + .findFirst() + .get(); + assertTrue(join.listInputs() + .stream() + .anyMatch(input -> + ((Dataset) input).getProducer().getHints().contains(new Util.TestHint()))); + + assertTrue(join.listInputs() + .stream() + .anyMatch(input -> + ((Dataset) input) + .getProducer() + .getHints() + .contains(new Util.TestHint2()))); + + assertEquals(2, + ((Dataset) join.listInputs() + .stream() + .findFirst() + .get() + ).getProducer().getHints().size()); - private static class TestHint implements JoinHint { - - @Override - public int hashCode() { - return 0; - } - - @Override - public boolean equals(Object obj) { - return obj instanceof TestHint; - } - } - - private static class TestHint2 implements JoinHint { - - - @Override - public int hashCode() { - return 0; - } - - @Override - public boolean equals(Object obj) { - return obj instanceof TestHint2; - } + assertTrue(join.getWindowing() instanceof Time); } } \ No newline at end of file diff --git a/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/MapElementsTest.java b/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/MapElementsTest.java index c8577920..96ce150d 100644 --- a/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/MapElementsTest.java +++ b/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/MapElementsTest.java @@ -17,11 +17,10 @@ import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.operator.hint.SizeHint; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.*; public class MapElementsTest { @@ -82,4 +81,18 @@ public void testBuild_ImplicitName() { MapElements map = (MapElements) flow.operators().iterator().next(); assertEquals("MapElements", map.getName()); } + + @Test + public void testBuild_Hints() { + Flow flow = Flow.create("TEST"); + Dataset dataset = Util.createMockDataset(flow, 1); + + Dataset dataSetWithHint = MapElements.of(dataset).using(i -> i).output(SizeHint.FITS_IN_MEMORY); + + assertTrue(dataSetWithHint.getProducer().getHints().contains(SizeHint.FITS_IN_MEMORY)); + assertEquals(1, dataSetWithHint.getProducer().getHints().size()); + + Dataset dataSetWithoutHint = MapElements.of(dataset).using(i -> i).output(); + assertEquals(0, dataSetWithoutHint.getProducer().getHints().size()); + } } \ No newline at end of file diff --git a/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/Util.java b/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/Util.java index 5f9ec314..7f144e6b 100644 --- a/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/Util.java +++ b/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/Util.java @@ -18,6 +18,7 @@ import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.io.ListDataSource; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import java.util.ArrayList; import java.util.List; @@ -33,4 +34,32 @@ public static Dataset createMockDataset(Flow flow, int numPartitions) { return flow.createInput(ListDataSource.bounded(partitions)); } + + + public static class TestHint implements OutputHint { + + @Override + public int hashCode() { + return 0; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof TestHint; + } + } + + public static class TestHint2 implements OutputHint { + + + @Override + public int hashCode() { + return 0; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof TestHint2; + } + } } diff --git a/euphoria-core/src/test/java/cz/seznam/euphoria/core/executor/FlowUnfolderTest.java b/euphoria-core/src/test/java/cz/seznam/euphoria/core/executor/FlowUnfolderTest.java index 377ab657..04daf9bb 100644 --- a/euphoria-core/src/test/java/cz/seznam/euphoria/core/executor/FlowUnfolderTest.java +++ b/euphoria-core/src/test/java/cz/seznam/euphoria/core/executor/FlowUnfolderTest.java @@ -24,11 +24,14 @@ import cz.seznam.euphoria.core.client.io.StdoutSink; import cz.seznam.euphoria.core.client.operator.FlatMap; import cz.seznam.euphoria.core.client.operator.Join; +import cz.seznam.euphoria.core.client.operator.JoinTest; import cz.seznam.euphoria.core.client.operator.MapElements; import cz.seznam.euphoria.core.client.operator.Operator; import cz.seznam.euphoria.core.client.operator.ReduceByKey; import cz.seznam.euphoria.core.client.operator.ReduceStateByKey; import cz.seznam.euphoria.core.client.operator.Union; +import cz.seznam.euphoria.core.client.operator.Util; +import cz.seznam.euphoria.core.client.operator.hint.SizeHint; import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.core.executor.FlowUnfolder.InputOperator; import cz.seznam.euphoria.core.executor.graph.DAG; diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BroadcastHashJoinTranslator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BroadcastHashJoinTranslator.java index 392b93b0..09541656 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BroadcastHashJoinTranslator.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BroadcastHashJoinTranslator.java @@ -15,13 +15,15 @@ */ package cz.seznam.euphoria.flink.batch; +import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.functional.BinaryFunctor; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.operator.Join; -import cz.seznam.euphoria.core.client.operator.JoinHints; +import cz.seznam.euphoria.core.client.operator.Operator; +import cz.seznam.euphoria.core.client.operator.hint.SizeHint; import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.core.executor.util.MultiValueContext; import cz.seznam.euphoria.flink.FlinkOperator; @@ -38,12 +40,22 @@ public class BroadcastHashJoinTranslator implements BatchOperatorTranslator { + @SuppressWarnings("unchecked") static boolean wantTranslate(Join o) { - return o.getHints().contains(JoinHints.broadcastHashJoin()) + + return o.listInputs() + .stream() + .anyMatch(input -> hasSizeHint(((Dataset) input).getProducer())) && (o.getType() == Join.Type.LEFT || o.getType() == Join.Type.RIGHT) && !(o.getWindowing() instanceof MergingWindowing); } + static boolean hasSizeHint(Operator operator) { + return operator != null && + operator.getHints() != null && + operator.getHints().contains(SizeHint.FITS_IN_MEMORY); + } + @Override @SuppressWarnings("unchecked") public DataSet translate(FlinkOperator operator, BatchExecutorContext context) { diff --git a/euphoria-fluent/src/main/java/cz/seznam/euphoria/fluent/Dataset.java b/euphoria-fluent/src/main/java/cz/seznam/euphoria/fluent/Dataset.java index 60b7f865..51efbe81 100644 --- a/euphoria-fluent/src/main/java/cz/seznam/euphoria/fluent/Dataset.java +++ b/euphoria-fluent/src/main/java/cz/seznam/euphoria/fluent/Dataset.java @@ -18,11 +18,11 @@ import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.functional.UnaryFunctor; import cz.seznam.euphoria.core.client.io.DataSink; +import cz.seznam.euphoria.core.client.operator.Builders.Output; import cz.seznam.euphoria.core.client.operator.Distinct; import cz.seznam.euphoria.core.client.operator.FlatMap; import cz.seznam.euphoria.core.client.operator.MapElements; import cz.seznam.euphoria.core.client.operator.Union; -import cz.seznam.euphoria.core.client.operator.Builders.Output; import cz.seznam.euphoria.core.executor.Executor; import static java.util.Objects.requireNonNull; diff --git a/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/BroadcastHashJoinTest.java b/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/BroadcastHashJoinTest.java index 607fb476..f82e0c3d 100644 --- a/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/BroadcastHashJoinTest.java +++ b/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/BroadcastHashJoinTest.java @@ -17,13 +17,13 @@ import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.io.Collector; -import cz.seznam.euphoria.core.client.operator.JoinHints; import cz.seznam.euphoria.core.client.operator.LeftJoin; +import cz.seznam.euphoria.core.client.operator.MapElements; import cz.seznam.euphoria.core.client.operator.RightJoin; +import cz.seznam.euphoria.core.client.operator.hint.SizeHint; import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.operator.test.junit.AbstractOperatorTest; import cz.seznam.euphoria.operator.test.junit.Processing; -import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import org.junit.Test; import java.util.Arrays; @@ -40,11 +40,10 @@ public void leftBroadcastHashJoin() { @Override protected Dataset> getOutput( Dataset left, Dataset right) { - return LeftJoin.of(left, right) + return LeftJoin.of(left, MapElements.of(right).using(i -> i).output(SizeHint.FITS_IN_MEMORY)) .by(e -> e, e -> (int) (e % 10)) .using((Integer l, Optional r, Collector c) -> c.collect(l + "+" + r.orElse(null))) - .withHints(Sets.newHashSet(JoinHints.broadcastHashJoin())) .output(); } @@ -81,11 +80,10 @@ public void rightBroadcastHashJoin() { @Override protected Dataset> getOutput( Dataset left, Dataset right) { - return RightJoin.of(left, right) + return RightJoin.of(MapElements.of(left).using(i -> i).output(SizeHint.FITS_IN_MEMORY), right) .by(e -> e, e -> (int) (e % 10)) .using((Optional l, Long r, Collector c) -> c.collect(l.orElse(null) + "+" + r)) - .withHints(Sets.newHashSet(JoinHints.broadcastHashJoin())) .output(); } @@ -123,11 +121,10 @@ public void keyHashCollisionBroadcastHashJoin() { @Override protected Dataset> getOutput( Dataset left, Dataset right) { - return LeftJoin.of(left, right) + return LeftJoin.of(left, MapElements.of(right).using(i -> i).output(SizeHint.FITS_IN_MEMORY)) .by(e -> e, e -> e % 2 == 0 ? sameHashCodeKey2 : sameHashCodeKey1) .using((String l, Optional r, Collector c) -> c.collect(l + "+" + r.orElse(null))) - .withHints(Sets.newHashSet(JoinHints.broadcastHashJoin())) .output(); } diff --git a/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/BroadcastHashJoinTranslator.java b/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/BroadcastHashJoinTranslator.java index 0121c230..79dc4608 100644 --- a/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/BroadcastHashJoinTranslator.java +++ b/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/BroadcastHashJoinTranslator.java @@ -18,12 +18,14 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; +import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.operator.Join; -import cz.seznam.euphoria.core.client.operator.JoinHints; +import cz.seznam.euphoria.core.client.operator.Operator; +import cz.seznam.euphoria.core.client.operator.hint.SizeHint; import cz.seznam.euphoria.core.client.util.Either; import cz.seznam.euphoria.core.client.util.Pair; import org.apache.spark.api.java.JavaPairRDD; @@ -47,25 +49,36 @@ * map side join with lookups to in memory hash table on non-optional side. *

    *

    - * In order to use this translator, you need to pass {@link JoinHints.BroadcastHashJoin} hint + * In order to use this translator, you need to have on one Dataset {@link SizeHint#FITS_IN_MEMORY} hint * to the {@link Join} operator. *

    */ public class BroadcastHashJoinTranslator implements SparkOperatorTranslator { + @SuppressWarnings("unchecked") static boolean wantTranslate(Join o) { - return o.getHints().contains(JoinHints.broadcastHashJoin()) + return o.listInputs() + .stream() + .anyMatch(input -> hasSizeHint(((Dataset) input).getProducer())) && (o.getType() == Join.Type.LEFT || o.getType() == Join.Type.RIGHT) && !(o.getWindowing() instanceof MergingWindowing); } + static boolean hasSizeHint(Operator operator) { + return operator != null && + operator.getHints() != null && + operator.getHints().contains(SizeHint.FITS_IN_MEMORY); + } + @Override @SuppressWarnings("unchecked") public JavaRDD translate(Join operator, SparkExecutorContext context) { // ~ sanity check Preconditions.checkArgument( - operator.getHints().contains(JoinHints.broadcastHashJoin()), + operator.listInputs() + .stream() + .anyMatch(input -> hasSizeHint(((Dataset) input).getProducer())), "Missing broadcastHashJoin hint"); Preconditions.checkArgument( operator.getType() == Join.Type.LEFT || operator.getType() == Join.Type.RIGHT,