From e4f0df5043bcfa37e669333b43d26c0ba077603e Mon Sep 17 00:00:00 2001 From: "marek.simunek" Date: Fri, 2 Mar 2018 15:37:32 +0100 Subject: [PATCH] [euphoria-core] #259 Hints are now in operator, not Dataset fixed tests [euphoria-core] hints in getBasicOps are only on last operator [euphoria-core] code style corrections [euphoria-core] code style corrections 2 --- .../euphoria/core/client/dataset/Dataset.java | 4 +- .../core/client/dataset/Datasets.java | 7 +- .../core/client/dataset/InputDataset.java | 9 -- .../core/client/dataset/OutputDataset.java | 9 +- .../core/client/operator/AssignEventTime.java | 6 +- .../core/client/operator/CountByKey.java | 15 +-- .../core/client/operator/Distinct.java | 6 +- .../euphoria/core/client/operator/Filter.java | 10 +- .../core/client/operator/FlatMap.java | 4 +- .../euphoria/core/client/operator/Join.java | 15 +-- .../core/client/operator/MapElements.java | 11 +- .../core/client/operator/Operator.java | 9 +- .../core/client/operator/ReduceByKey.java | 29 ++--- .../client/operator/ReduceStateByKey.java | 12 --- .../core/client/operator/ReduceWindow.java | 4 +- .../core/client/operator/SumByKey.java | 2 +- .../core/client/operator/TopPerKey.java | 6 +- .../client/operator/hint/OutputHintAware.java | 31 ------ .../core/client/operator/hint/SizeHint.java | 3 + .../core/client/operator/HintTest.java | 101 +++++++++++++++++ .../core/client/operator/JoinTest.java | 102 ++++++++++-------- .../core/client/operator/MapElementsTest.java | 6 +- .../euphoria/core/client/operator/Util.java | 29 +++++ .../core/executor/FlowUnfolderTest.java | 3 + .../batch/BroadcastHashJoinTranslator.java | 10 +- .../spark/BroadcastHashJoinTranslator.java | 11 +- 26 files changed, 278 insertions(+), 176 deletions(-) delete mode 100644 sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/OutputHintAware.java create mode 100644 sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/HintTest.java diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java index af71aad8..615f122f 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java @@ -20,8 +20,6 @@ import cz.seznam.euphoria.core.client.io.DataSink; import cz.seznam.euphoria.core.client.io.DataSource; import cz.seznam.euphoria.core.client.operator.Operator; -import cz.seznam.euphoria.core.client.operator.hint.OutputHint; -import cz.seznam.euphoria.core.client.operator.hint.OutputHintAware; import javax.annotation.Nullable; import java.io.Serializable; @@ -33,7 +31,7 @@ * @param type of elements of this data set */ @Audience(Audience.Type.CLIENT) -public interface Dataset extends OutputHintAware, Serializable { +public interface Dataset extends Serializable { /** * @return the flow associated with this data set diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Datasets.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Datasets.java index 019054e1..b8d8fe35 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Datasets.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Datasets.java @@ -19,9 +19,6 @@ import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.io.DataSource; import cz.seznam.euphoria.core.client.operator.Operator; -import cz.seznam.euphoria.core.client.operator.hint.OutputHint; - -import java.util.Set; /** * Various dataset related utils. @@ -42,9 +39,9 @@ public class Datasets { * @return a dataset representing the output of the given operator */ public static Dataset createOutputFor( - Flow flow, Dataset input, Operator op, Set outputHints) { + Flow flow, Dataset input, Operator op) { - return new OutputDataset<>(flow, op, input.isBounded(), outputHints); + return new OutputDataset<>(flow, op, input.isBounded()); } /** diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/InputDataset.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/InputDataset.java index a00e93fb..291b82e4 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/InputDataset.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/InputDataset.java @@ -58,15 +58,6 @@ public void persist(DataSink sink) { "The input dataset is already stored."); } - /** - * Input Dataset doesn't have hints - * @return empty set - */ - @Override - public Set getHints() { - return Collections.emptySet(); - } - @Override public Flow getFlow() { return flow; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/OutputDataset.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/OutputDataset.java index 727e1da8..2d8c211c 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/OutputDataset.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/OutputDataset.java @@ -33,15 +33,13 @@ class OutputDataset implements Dataset { private final Flow flow; private final Operator producer; private final boolean bounded; - private final Set outputHints; private DataSink outputSink = null; - public OutputDataset(Flow flow, Operator producer, boolean bounded, Set outputHints) { + public OutputDataset(Flow flow, Operator producer, boolean bounded) { this.flow = flow; this.producer = producer; this.bounded = bounded; - this.outputHints = outputHints; } @Nullable @@ -81,9 +79,4 @@ public boolean isBounded() { public Collection> getConsumers() { return flow.getConsumersOf(this); } - - @Override - public Set getHints() { - return outputHints; - } } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/AssignEventTime.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/AssignEventTime.java index a7b72504..fc1b2bb2 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/AssignEventTime.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/AssignEventTime.java @@ -92,8 +92,8 @@ public static class OutputBuilder implements Builders.Output { @Override public Dataset output(OutputHint... outputHints) { Flow flow = input.getFlow(); - AssignEventTime op = new AssignEventTime<>(name, flow, input, eventTimeFn, Sets.newHashSet - (outputHints)); + AssignEventTime op = new AssignEventTime<>(name, flow, input, eventTimeFn, + Sets.newHashSet(outputHints)); flow.add(op); return op.output(); } @@ -111,7 +111,7 @@ public Dataset output(OutputHint... outputHints) { public DAG> getBasicOps() { return DAG.of(new FlatMap<>( getName(), getFlow(), input, - (i, c) -> c.collect(i), eventTimeFn)); + (i, c) -> c.collect(i), eventTimeFn, getHints())); } /** diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java index f5c55e2b..c0583225 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java @@ -52,7 +52,7 @@ ) public class CountByKey extends StateAwareWindowWiseSingleInputOperator< - IN, IN, IN, KEY, Pair, W, CountByKey> { + IN, IN, IN, KEY, Pair, W, CountByKey> { public static class OfBuilder implements Builders.Of { private final String name; @@ -175,12 +175,13 @@ public static OfBuilder named(String name) { @Override public DAG> getBasicOps() { SumByKey sum = new SumByKey<>( - getName(), - input.getFlow(), - input, - keyExtractor, - e -> 1L, - windowing); + getName(), + input.getFlow(), + input, + keyExtractor, + e -> 1L, + windowing, + getHints()); return DAG.of(sum); } } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java index ec7db07a..099a6cfc 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java @@ -30,6 +30,7 @@ import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import javax.annotation.Nullable; +import java.util.Collections; import java.util.Objects; import java.util.Set; @@ -206,10 +207,11 @@ public static OfBuilder named(String name) { new ReduceByKey<>(name, flow, input, getKeyExtractor(), e -> null, windowing, - (CombinableReduceFunction) e -> null); + (CombinableReduceFunction) e -> null, + Collections.emptySet()); MapElements format = new MapElements<>( - getName() + "::" + "Map", flow, reduce.output(), Pair::getFirst); + getName() + "::" + "Map", flow, reduce.output(), Pair::getFirst, getHints()); DAG> dag = DAG.of(reduce); dag.add(format, reduce); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Filter.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Filter.java index 3d2b44fc..112cb3cb 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Filter.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Filter.java @@ -137,8 +137,10 @@ public static OfBuilder named(String name) { final UnaryPredicate predicate; - Filter(String name, Flow flow, Dataset input, UnaryPredicate predicate, Set - outputHints) { + Filter(String name, + Flow flow, Dataset input, + UnaryPredicate predicate, + Set outputHints) { super(name, flow, input, outputHints); this.predicate = predicate; } @@ -155,6 +157,8 @@ public UnaryPredicate getPredicate() { if (predicate.apply(elem)) { collector.collect(elem); } - }, null)); + }, + null, + getHints())); } } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FlatMap.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FlatMap.java index b7c7a068..9481616d 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FlatMap.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FlatMap.java @@ -172,8 +172,8 @@ public static class OutputBuilder implements Builders.Output { @Override public Dataset output(OutputHint... outputHints) { Flow flow = input.getFlow(); - FlatMap map = new FlatMap<>(name, flow, input, functor, evtTimeFn, Sets.newHashSet - (outputHints)); + FlatMap map = new FlatMap<>(name, flow, input, functor, evtTimeFn, + Sets.newHashSet(outputHints)); flow.add(map); return map.output(); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java index 72b42e8f..30cb2a52 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java @@ -74,8 +74,7 @@ ) public class Join extends StateAwareWindowWiseOperator, - Either, KEY, Pair, W, Join> - implements Builders.OutputValues { + Either, KEY, Pair, W, Join> { public enum Type { INNER, @@ -287,11 +286,6 @@ public Dataset> output() { return output; } - @Override - public Dataset> output(OutputHint... outputHints) { - return output; - } - @SuppressWarnings("unchecked") private static final ListStorageDescriptor LEFT_STATE_DESCR = ListStorageDescriptor.of("left", (Class) Object.class); @@ -525,8 +519,8 @@ public BinaryFunctor getJoiner() { getName() + "::Map-right", flow, right, Either::right); final Union> union = - new Union<>(getName() + "::Union", flow, Arrays.asList( - leftMap.output(), rightMap.output())); + new Union<>(getName() + "::Union", flow, + Arrays.asList(leftMap.output(), rightMap.output())); final ReduceStateByKey, KEY, Either, OUT, StableJoinState, W> reduce = new ReduceStateByKey( @@ -541,7 +535,8 @@ public BinaryFunctor getJoiner() { return ctx == null ? new StableJoinState(storages) : new EarlyEmittingJoinState(storages, ctx); - }, new StateSupport.MergeFromStateMerger<>()); + }, new StateSupport.MergeFromStateMerger<>(), + getHints()); final DAG> dag = DAG.of(leftMap, rightMap); dag.add(union, leftMap, rightMap); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/MapElements.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/MapElements.java index 4c4bf906..bb117183 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/MapElements.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/MapElements.java @@ -157,6 +157,15 @@ public static OfBuilder named(String name) { this(name, flow, input, (el, ctx) -> mapper.apply(el), Collections.emptySet()); } + MapElements(String name, + Flow flow, + Dataset input, + UnaryFunction mapper, + Set outputHints) { + this(name, flow, input, (el, ctx) -> mapper.apply(el), outputHints); + } + + MapElements(String name, Flow flow, Dataset input, @@ -175,7 +184,7 @@ public static OfBuilder named(String name) { return DAG.of( // do not use the client API here, because it modifies the Flow! new FlatMap(getName(), getFlow(), input, - (i, c) -> c.collect(mapper.apply(i, c.asContext())), null)); + (i, c) -> c.collect(mapper.apply(i, c.asContext())), null, getHints())); } public UnaryFunctionEnv getMapper() { diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Operator.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Operator.java index d95f5d20..f028a6cb 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Operator.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Operator.java @@ -37,6 +37,8 @@ public abstract class Operator implements Serializable { /** Associated Flow. */ private final Flow flow; + private Set hints; + protected Operator(String name, Flow flow) { this.name = name; this.flow = flow; @@ -73,7 +75,12 @@ public final Flow getFlow() { */ final Dataset createOutput(final Dataset input, Set outputHints) { Flow flow = input.getFlow(); - return Datasets.createOutputFor(flow, input, this, outputHints); + this.hints = outputHints; + return Datasets.createOutputFor(flow, input, this); + } + + public Set getHints() { + return hints; } /** diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java index 87e08218..a74f28b1 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java @@ -98,8 +98,7 @@ public class ReduceByKey extends StateAwareWindowWiseSingleInputOperator< IN, IN, IN, KEY, Pair, W, - ReduceByKey> - implements Builders.OutputValues { + ReduceByKey> { public static class OfBuilder implements Builders.Of { @@ -349,7 +348,7 @@ public static class DatasetBuilder5 public Dataset> output(OutputHint... outputHints) { Flow flow = input.getFlow(); ReduceByKey reduce = new ReduceByKey<>( - name, flow, input, keyExtractor, valueExtractor, + name, flow, input, keyExtractor, valueExtractor, windowing, reducer, valuesComparator, Sets.newHashSet(outputHints)); flow.add(reduce); return reduce.output(); @@ -396,23 +395,12 @@ public static OfBuilder named(String name) { UnaryFunction keyExtractor, UnaryFunction valueExtractor, @Nullable Windowing windowing, - CombinableReduceFunction reducer) { + CombinableReduceFunction reducer, + Set outputHints) { this( name, flow, input, keyExtractor, valueExtractor, windowing, (ReduceFunctor) toReduceFunctor(reducer), - null, Collections.emptySet()); - } - - ReduceByKey(String name, - Flow flow, - Dataset input, - UnaryFunction keyExtractor, - UnaryFunction valueExtractor, - @Nullable Windowing windowing, - ReduceFunctor reducer, - @Nullable BinaryFunction valueComparator) { - this(name, flow, input, keyExtractor, valueExtractor, windowing, reducer, valueComparator, - Collections.emptySet()); + null, outputHints); } ReduceByKey(String name, @@ -443,11 +431,6 @@ public UnaryFunction getValueExtractor() { return valueExtractor; } - @Override - public Dataset> output(OutputHint... outputHints) { - return output(); - } - @SuppressWarnings("unchecked") @Override public DAG> getBasicOps() { @@ -460,7 +443,7 @@ public Dataset> output(OutputHint... outputHints) { Operator reduceState = new ReduceStateByKey(getName(), flow, input, keyExtractor, valueExtractor, windowing, - stateFactory, stateCombine); + stateFactory, stateCombine, getHints()); return DAG.of(reduceState); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java index f7bdb953..2d1978fd 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java @@ -352,18 +352,6 @@ public static OfBuilder named(String name) { private final UnaryFunction valueExtractor; private final StateMerger stateCombiner; - ReduceStateByKey(String name, - Flow flow, - Dataset input, - UnaryFunction keyExtractor, - UnaryFunction valueExtractor, - @Nullable Windowing windowing, - StateFactory stateFactory, - StateMerger stateMerger) { - this(name, flow, input, keyExtractor, valueExtractor, windowing, stateFactory, stateMerger, - Collections.emptySet()); - } - ReduceStateByKey(String name, Flow flow, Dataset input, diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java index c6e5768d..29a5d4ad 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java @@ -299,7 +299,7 @@ public ReduceFunctor getReducer() { rbk = new ReduceByKey<>( getName() + "::ReduceByKey", getFlow(), input, getKeyExtractor(), valueExtractor, - windowing, reducer, valueComparator); + windowing, reducer, valueComparator, getHints()); dag.add(rbk); } else { // otherwise we use attached windowing, therefore @@ -315,7 +315,7 @@ public ReduceFunctor getReducer() { rbk = new ReduceByKey<>( getName() + "::ReduceByKey::attached", getFlow(), map.output(), Pair::getFirst, p -> valueExtractor.apply(p.getSecond()), - null, reducer, valueComparator); + null, reducer, valueComparator, getHints()); dag.add(map); dag.add(rbk); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java index 58ee7c0b..681a2b41 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java @@ -246,7 +246,7 @@ public static OfBuilder named(String name) { public DAG> getBasicOps() { ReduceByKey reduceByKey = new ReduceByKey<>(getName(), input.getFlow(), input, - keyExtractor, valueExtractor, windowing, Sums.ofLongs()); + keyExtractor, valueExtractor, windowing, Sums.ofLongs(), getHints()); return DAG.of(reduceByKey); } } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java index 48e76e76..ce46acf2 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java @@ -36,6 +36,7 @@ import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import javax.annotation.Nullable; +import java.util.Collections; import java.util.Set; import static java.util.Objects.requireNonNull; @@ -330,7 +331,7 @@ public UnaryFunction getScoreExtractor() { windowing, (StateContext context, Collector> collector) -> { return new MaxScored<>(context.getStorageProvider()); - }, stateCombiner); + }, stateCombiner, Collections.emptySet()); MapElements>, Triple> format = @@ -338,7 +339,8 @@ public UnaryFunction getScoreExtractor() { e -> Triple.of( e.getFirst(), e.getSecond().getFirst(), - e.getSecond().getSecond())); + e.getSecond().getSecond()), + getHints()); DAG> dag = DAG.of(reduce); dag.add(format, reduce); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/OutputHintAware.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/OutputHintAware.java deleted file mode 100644 index f1c6bc16..00000000 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/OutputHintAware.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2016-2018 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.core.client.operator.hint; - -import cz.seznam.euphoria.core.annotation.audience.Audience; - -import java.util.Set; - -@Audience(Audience.Type.INTERNAL) -public interface OutputHintAware { - - /** - * Returns all hints for the operator or Dataset. - * - * @return hints for the operator or Dataset - */ - Set getHints(); -} diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/SizeHint.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/SizeHint.java index f14cf50f..78fe3428 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/SizeHint.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/SizeHint.java @@ -15,9 +15,12 @@ */ package cz.seznam.euphoria.core.client.operator.hint; +import cz.seznam.euphoria.core.annotation.audience.Audience; + /** * Extra information for runner about Dataset size */ +@Audience(Audience.Type.CLIENT) public enum SizeHint implements OutputHint { /** * Indicate to runner that dataset can fit in memory and this information diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/HintTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/HintTest.java new file mode 100644 index 00000000..681b0e02 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/HintTest.java @@ -0,0 +1,101 @@ +/* + * Copyright 2016-2018 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.core.client.operator; + +import cz.seznam.euphoria.core.client.dataset.Dataset; +import cz.seznam.euphoria.core.client.dataset.windowing.Time; +import cz.seznam.euphoria.core.client.flow.Flow; +import cz.seznam.euphoria.core.client.io.Collector; +import cz.seznam.euphoria.core.client.io.MockStreamDataSource; +import cz.seznam.euphoria.core.client.io.VoidSink; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; +import cz.seznam.euphoria.core.client.operator.hint.SizeHint; +import cz.seznam.euphoria.core.client.util.Pair; +import cz.seznam.euphoria.core.executor.Executor; +import cz.seznam.euphoria.core.executor.FlowUnfolder; +import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; +import org.junit.Test; + +import java.time.Duration; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class HintTest { + + /** + * Test every node in DAG which was unfolded from original operator, if preserves hints + */ + @Test + @SuppressWarnings("unchecked") + public void testHintsAfterUnfold() { + Flow flow = Flow.create(getClass().getSimpleName()); + Dataset input = flow.createInput(new MockStreamDataSource<>()); + + Dataset mapped = MapElements + .named("mapElementsFitInMemoryHint") + .of(input).using(e -> e).output(SizeHint.FITS_IN_MEMORY); + Dataset> reduced = ReduceByKey + .named("reduceByKeyTwoHints") + .of(mapped) + .keyBy(e -> e) + .reduceBy(values -> 1L) + .windowBy(Time.of(Duration.ofSeconds(1))) + .output(new Util.TestHint(), new Util.TestHint2()); + + Dataset mapped2 = MapElements + .named("mapElementsTestHint2") + .of(reduced).using(Pair::getFirst).output(new Util.TestHint2()); + mapped2.persist(new VoidSink<>()); + + Dataset> output = Join + .named("joinHint") + .of(mapped, reduced) + .by(e -> e, Pair::getFirst) + .using((Object l, Pair r, Collector c) -> c.collect(r.getSecond())) + .windowBy(Time.of(Duration.ofSeconds(1))) + .output(new Util.TestHint()); + + output.persist(new VoidSink<>()); + + DAG> unfolded = FlowUnfolder.unfold(flow, Executor.getBasicOps()); + + testNodesByName(unfolded, "mapElementsFitInMemoryHint", 1, Sets.newHashSet(SizeHint.FITS_IN_MEMORY)); + + testNodesByName(unfolded, "mapElementsTestHint2", 1, Sets.newHashSet(new Util.TestHint2())); + + testNodesByName(unfolded, "reduceByKeyTwoHints", 2, + Sets.newHashSet(new Util.TestHint(), new Util.TestHint2())); + + testNodesByName(unfolded, "joinHint::ReduceStateByKey", 1, Sets.newHashSet(new Util.TestHint())); + + } + + private void testNodesByName(DAG> unfolded, + String name, + int expectedHintCount, + Set expectedHints) { + unfolded.nodes() + .filter(node -> node.getName().equalsIgnoreCase(name)) + .forEach(operator -> { + assertEquals(expectedHintCount, operator.getHints().size()); + assertTrue(expectedHints.containsAll(operator.getHints())); + } + ); + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java index c8339a25..0c5fbea3 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java @@ -19,7 +19,7 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Time; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.io.Collector; -import cz.seznam.euphoria.core.client.operator.hint.OutputHint; +import cz.seznam.euphoria.core.client.operator.hint.SizeHint; import cz.seznam.euphoria.core.client.util.Pair; import org.junit.Test; @@ -198,7 +198,6 @@ public void testBuild_Windowing() { assertTrue(join.getWindowing() instanceof Time); } - @Test @SuppressWarnings("unchecked") public void testBuild_Hints() { @@ -206,21 +205,40 @@ public void testBuild_Hints() { Dataset left = Util.createMockDataset(flow, 1); Dataset right = Util.createMockDataset(flow, 1); - Join.named("Join1") - .of(MapElements.of(left).using(i -> i).output(new TestHint(), new TestHint2()), + + Dataset outputDataset = Join.named("Join1") + .of(MapElements.of(left).using(i -> i).output(new Util.TestHint(), new Util.TestHint2()), right) .by(String::length, String::length) .using((String l, String r, Collector c) -> { // no-op }) - .output(); + .outputValues(SizeHint.FITS_IN_MEMORY); + + assertTrue(outputDataset.getProducer().getHints().contains(SizeHint.FITS_IN_MEMORY)); + + Join join = (Join) flow.operators() + .stream() + .filter(op -> op instanceof Join) + .findFirst() + .get(); + assertTrue(join.listInputs() + .stream() + .anyMatch(input -> + ((Dataset) input).getProducer().getHints().contains(new Util.TestHint()))); + + assertTrue(join.listInputs() + .stream() + .anyMatch(input -> + ((Dataset) input).getProducer().getHints().contains(new Util.TestHint2()))); + + assertEquals(2, + ((Dataset) join.listInputs() + .stream() + .findFirst() + .get() + ).getProducer().getHints().size()); - Join join = (Join) flow.operators().stream().filter(op -> op instanceof Join).findFirst().get(); - assertTrue(join.listInputs().stream().anyMatch(input -> ((Dataset) input).getHints().contains(new - TestHint()))); - assertTrue(join.listInputs().stream().anyMatch(input -> ((Dataset) input).getHints().contains(new - TestHint2()))); - assertEquals(2, ((Dataset) join.listInputs().stream().findFirst().get()).getHints().size()); } @Test @@ -231,7 +249,9 @@ public void testBuild_Hints_afterWindowing() { Dataset right = Util.createMockDataset(flow, 1); Join.named("Join1") - .of(MapElements.of(left).using(i -> i).output(new TestHint(), new TestHint2(), new TestHint2()), + .of(MapElements.of(left) + .using(i -> i) + .output(new Util.TestHint(), new Util.TestHint2(), new Util.TestHint2()), right) .by(String::length, String::length) .using((String l, String r, Collector c) -> { @@ -240,39 +260,31 @@ public void testBuild_Hints_afterWindowing() { .windowBy(Time.of(Duration.ofHours(1))) .output(); - Join join = (Join) flow.operators().stream().filter(op -> op instanceof Join).findFirst().get(); - assertTrue(join.listInputs().stream().anyMatch(input -> ((Dataset) input).getHints().contains(new - TestHint()))); - assertTrue(join.listInputs().stream().anyMatch(input -> ((Dataset) input).getHints().contains(new - TestHint2()))); - assertEquals(2, ((Dataset) join.listInputs().stream().findFirst().get()).getHints().size()); - assertTrue(join.getWindowing() instanceof Time); - } - - private static class TestHint implements OutputHint { + Join join = (Join) flow.operators() + .stream() + .filter(op -> op instanceof Join) + .findFirst() + .get(); + assertTrue(join.listInputs() + .stream() + .anyMatch(input -> + ((Dataset) input).getProducer().getHints().contains(new Util.TestHint()))); + + assertTrue(join.listInputs() + .stream() + .anyMatch(input -> + ((Dataset) input) + .getProducer() + .getHints() + .contains(new Util.TestHint2()))); + + assertEquals(2, + ((Dataset) join.listInputs() + .stream() + .findFirst() + .get() + ).getProducer().getHints().size()); - @Override - public int hashCode() { - return 0; - } - - @Override - public boolean equals(Object obj) { - return obj instanceof TestHint; - } - } - - private static class TestHint2 implements OutputHint { - - - @Override - public int hashCode() { - return 0; - } - - @Override - public boolean equals(Object obj) { - return obj instanceof TestHint2; - } + assertTrue(join.getWindowing() instanceof Time); } } \ No newline at end of file diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/MapElementsTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/MapElementsTest.java index 2e1fe4f3..96ce150d 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/MapElementsTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/MapElementsTest.java @@ -89,10 +89,10 @@ public void testBuild_Hints() { Dataset dataSetWithHint = MapElements.of(dataset).using(i -> i).output(SizeHint.FITS_IN_MEMORY); - assertTrue(dataSetWithHint.getHints().contains(SizeHint.FITS_IN_MEMORY)); - assertEquals(1, dataSetWithHint.getHints().size()); + assertTrue(dataSetWithHint.getProducer().getHints().contains(SizeHint.FITS_IN_MEMORY)); + assertEquals(1, dataSetWithHint.getProducer().getHints().size()); Dataset dataSetWithoutHint = MapElements.of(dataset).using(i -> i).output(); - assertEquals(0, dataSetWithoutHint.getHints().size()); + assertEquals(0, dataSetWithoutHint.getProducer().getHints().size()); } } \ No newline at end of file diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/Util.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/Util.java index 5f9ec314..7f144e6b 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/Util.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/Util.java @@ -18,6 +18,7 @@ import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.io.ListDataSource; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import java.util.ArrayList; import java.util.List; @@ -33,4 +34,32 @@ public static Dataset createMockDataset(Flow flow, int numPartitions) { return flow.createInput(ListDataSource.bounded(partitions)); } + + + public static class TestHint implements OutputHint { + + @Override + public int hashCode() { + return 0; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof TestHint; + } + } + + public static class TestHint2 implements OutputHint { + + + @Override + public int hashCode() { + return 0; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof TestHint2; + } + } } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/executor/FlowUnfolderTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/executor/FlowUnfolderTest.java index 377ab657..04daf9bb 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/executor/FlowUnfolderTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/executor/FlowUnfolderTest.java @@ -24,11 +24,14 @@ import cz.seznam.euphoria.core.client.io.StdoutSink; import cz.seznam.euphoria.core.client.operator.FlatMap; import cz.seznam.euphoria.core.client.operator.Join; +import cz.seznam.euphoria.core.client.operator.JoinTest; import cz.seznam.euphoria.core.client.operator.MapElements; import cz.seznam.euphoria.core.client.operator.Operator; import cz.seznam.euphoria.core.client.operator.ReduceByKey; import cz.seznam.euphoria.core.client.operator.ReduceStateByKey; import cz.seznam.euphoria.core.client.operator.Union; +import cz.seznam.euphoria.core.client.operator.Util; +import cz.seznam.euphoria.core.client.operator.hint.SizeHint; import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.core.executor.FlowUnfolder.InputOperator; import cz.seznam.euphoria.core.executor.graph.DAG; diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BroadcastHashJoinTranslator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BroadcastHashJoinTranslator.java index 0017a87c..09541656 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BroadcastHashJoinTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BroadcastHashJoinTranslator.java @@ -22,6 +22,7 @@ import cz.seznam.euphoria.core.client.functional.BinaryFunctor; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.operator.Join; +import cz.seznam.euphoria.core.client.operator.Operator; import cz.seznam.euphoria.core.client.operator.hint.SizeHint; import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.core.executor.util.MultiValueContext; @@ -41,13 +42,20 @@ public class BroadcastHashJoinTranslator implements BatchOperatorTranslator ((Dataset) input).getHints().contains(SizeHint.FITS_IN_MEMORY)) + .anyMatch(input -> hasSizeHint(((Dataset) input).getProducer())) && (o.getType() == Join.Type.LEFT || o.getType() == Join.Type.RIGHT) && !(o.getWindowing() instanceof MergingWindowing); } + static boolean hasSizeHint(Operator operator) { + return operator != null && + operator.getHints() != null && + operator.getHints().contains(SizeHint.FITS_IN_MEMORY); + } + @Override @SuppressWarnings("unchecked") public DataSet translate(FlinkOperator operator, BatchExecutorContext context) { diff --git a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/BroadcastHashJoinTranslator.java b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/BroadcastHashJoinTranslator.java index 9e3c7f9f..79dc4608 100644 --- a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/BroadcastHashJoinTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/BroadcastHashJoinTranslator.java @@ -24,6 +24,7 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.operator.Join; +import cz.seznam.euphoria.core.client.operator.Operator; import cz.seznam.euphoria.core.client.operator.hint.SizeHint; import cz.seznam.euphoria.core.client.util.Either; import cz.seznam.euphoria.core.client.util.Pair; @@ -58,11 +59,17 @@ public class BroadcastHashJoinTranslator implements SparkOperatorTranslator ((Dataset) input).getHints().contains(SizeHint.FITS_IN_MEMORY)) + .anyMatch(input -> hasSizeHint(((Dataset) input).getProducer())) && (o.getType() == Join.Type.LEFT || o.getType() == Join.Type.RIGHT) && !(o.getWindowing() instanceof MergingWindowing); } + static boolean hasSizeHint(Operator operator) { + return operator != null && + operator.getHints() != null && + operator.getHints().contains(SizeHint.FITS_IN_MEMORY); + } + @Override @SuppressWarnings("unchecked") public JavaRDD translate(Join operator, SparkExecutorContext context) { @@ -71,7 +78,7 @@ public JavaRDD translate(Join operator, SparkExecutorContext context) { Preconditions.checkArgument( operator.listInputs() .stream() - .anyMatch(input -> ((Dataset) input).getHints().contains(SizeHint.FITS_IN_MEMORY)), + .anyMatch(input -> hasSizeHint(((Dataset) input).getProducer())), "Missing broadcastHashJoin hint"); Preconditions.checkArgument( operator.getType() == Join.Type.LEFT || operator.getType() == Join.Type.RIGHT,