diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/util/MultiValueContext.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/util/MultiValueContext.java index f482aed5..c2c5eb35 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/util/MultiValueContext.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/util/MultiValueContext.java @@ -25,97 +25,98 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; @Audience(Audience.Type.EXECUTOR) public class MultiValueContext implements Context, Collector { - private final List elements = new ArrayList<>(1); - @Nullable - final Context wrap; - - public MultiValueContext() { - this(null); - } - - public MultiValueContext(Context wrap) { - this.wrap = wrap; - } - - /** - * Replace the stored value with given one. - * - * @param elem the element to store - */ - @Override - public void collect(T elem) { - elements.add(elem); - } - - @Override - public Context asContext() { - return this; - } - - /** - * Retrieve window associated with the stored element. - */ - @Override - public Window getWindow() throws UnsupportedOperationException { - if (wrap == null) { - throw new UnsupportedOperationException( - "The window is unknown in this context"); - } - return wrap.getWindow(); - } - - @Override - public Counter getCounter(String name) { - if (wrap == null) { - throw new UnsupportedOperationException( - "Accumulators not supported in this context"); - } - return wrap.getCounter(name); + private final List elements = new ArrayList<>(1); + @Nullable + final Context wrap; + + public MultiValueContext() { + this(null); + } + + public MultiValueContext(Context wrap) { + this.wrap = wrap; + } + + /** + * Append the given element to the stored elements. 
+ * + * @param elem the element to store + */ + @Override + public void collect(T elem) { + elements.add(elem); + } + + @Override + public Context asContext() { + return this; + } + + /** + * Retrieve window associated with the stored element. + */ + @Override + public Window getWindow() throws UnsupportedOperationException { + if (wrap == null) { + throw new UnsupportedOperationException( + "The window is unknown in this context"); } - - @Override - public Histogram getHistogram(String name) { - if (wrap == null) { - throw new UnsupportedOperationException( - "Accumulators not supported in this context"); - } - return wrap.getHistogram(name); - + return wrap.getWindow(); + } + + @Override + public Counter getCounter(String name) { + if (wrap == null) { + throw new UnsupportedOperationException( + "Accumulators not supported in this context"); } - - @Override - public Timer getTimer(String name) { - if (wrap == null) { - throw new UnsupportedOperationException( - "Accumulators not supported in this context"); - } - return wrap.getTimer(name); - + return wrap.getCounter(name); + } + + @Override + public Histogram getHistogram(String name) { + if (wrap == null) { + throw new UnsupportedOperationException( + "Accumulators not supported in this context"); } + return wrap.getHistogram(name); - /** - * Retrieve and reset the stored elements. - * - * @return the stored value - */ - public List getAndResetValue() { - List copiedElements = new ArrayList<>(elements); - elements.clear(); - return copiedElements; - } + } - /** - * Retrieve value of this context. - * - * @return value - */ - public List get() { - return elements; + @Override + public Timer getTimer(String name) { + if (wrap == null) { + throw new UnsupportedOperationException( + "Accumulators not supported in this context"); } + return wrap.getTimer(name); + + } + + /** + * Retrieve and reset the stored elements. 
+ * + * @return a copy of the elements collected since the last reset + */ + public List getAndResetValues() { + List copiedElements = new ArrayList<>(elements); + elements.clear(); + return copiedElements; + } + + /** + * Retrieve an unmodifiable view of the elements stored in this context. + * + * @return unmodifiable view of the stored elements + */ + public List get() { + return Collections.unmodifiableList(elements); + } } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchFlowTranslator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchFlowTranslator.java index b137d2d4..bde945d9 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchFlowTranslator.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchFlowTranslator.java @@ -44,10 +44,14 @@ import java.util.function.BiFunction; import java.util.stream.Collectors; +/** + * Translates a flow for Flink batch mode. For each operator, only the first matching translation is used. + */ public class BatchFlowTranslator extends FlowTranslator { public interface SplitAssignerFactory - extends BiFunction, Serializable {} + extends BiFunction, Serializable { + } public static final SplitAssignerFactory DEFAULT_SPLIT_ASSIGNER_FACTORY = (splits, partitions) -> new LocatableInputSplitAssigner(splits); @@ -63,16 +67,14 @@ private Translation( } static > void add( - Map> idx, - Class type, BatchOperatorTranslator translator) - { + Map> idx, + Class type, BatchOperatorTranslator translator) { add(idx, type, translator, null); } static > void add( - Map> idx, - Class type, BatchOperatorTranslator translator, UnaryPredicate accept) - { + Map> idx, + Class type, BatchOperatorTranslator translator, UnaryPredicate accept) { idx.putIfAbsent(type, new ArrayList<>()); idx.get(type).add(new Translation<>(translator, accept)); } @@ -100,7 +102,7 @@ public BatchFlowTranslator(Settings settings, // basic operators Translation.add(translations, FlowUnfolder.InputOperator.class, new InputTranslator - (splitAssignerFactory)); + (splitAssignerFactory)); 
Translation.add(translations, FlatMap.class, new FlatMapTranslator()); Translation.add(translations, ReduceStateByKey.class, new ReduceStateByKeyTranslator()); Translation.add(translations, Union.class, new UnionTranslator()); @@ -111,16 +113,16 @@ public BatchFlowTranslator(Settings settings, // ~ batch broadcast join for a very small left side Translation.add(translations, Join.class, new BroadcastHashJoinTranslator(), - BroadcastHashJoinTranslator::wantTranslate); + BroadcastHashJoinTranslator::wantTranslate); } @SuppressWarnings("unchecked") @Override protected Collection getAcceptors() { return translations.entrySet().stream() - .flatMap((entry) -> entry.getValue() - .stream() - .map(translator -> new TranslateAcceptor(entry.getKey(), translator.accept))) + .flatMap((entry) -> entry.getValue() + .stream() + .map(translator -> new TranslateAcceptor(entry.getKey(), translator.accept))) .collect(Collectors.toList()); } @@ -131,6 +133,12 @@ protected FlowOptimizer createOptimizer() { return opt; } + /** + * Translates the flow, applying to each operator only the first registered translation that accepts it. + * @param flow the user defined flow to be translated + * + * @return all output sinks + */ @Override @SuppressWarnings("unchecked") public List> translateInto(Flow flow) { @@ -138,7 +146,7 @@ public List> translateInto(Flow flow) { DAG>> dag = flowToDag(flow); BatchExecutorContext executorContext = new BatchExecutorContext(env, (DAG) dag, - accumulatorFactory, settings); + accumulatorFactory, settings); // translate each operator to proper Flink transformation dag.traverse().map(Node::get).forEach(op -> { @@ -146,12 +154,12 @@ public List> translateInto(Flow flow) { List txs = this.translations.get(originalOp.getClass()); if (txs.isEmpty()) { throw new UnsupportedOperationException( - "Operator " + op.getClass().getSimpleName() + " not supported"); + "Operator " + op.getClass().getSimpleName() + " not supported"); } // ~ verify the flowToDag translation Translation> firstMatch = null; for (Translation tx : txs) { - if 
(tx.accept == null || Boolean.TRUE.equals(tx.accept.apply(originalOp))) { + if (tx.accept == null || Boolean.TRUE.equals(tx.accept.apply(originalOp))) { firstMatch = tx; break; } @@ -169,18 +177,18 @@ public List> translateInto(Flow flow) { // process all sinks in the DAG (leaf nodes) final List> sinks = new ArrayList<>(); dag.getLeafs() - .stream() - .map(Node::get) - .filter(op -> op.output().getOutputSink() != null) - .forEach(op -> { - - final DataSink sink = op.output().getOutputSink(); - sinks.add(sink); - DataSet flinkOutput = - Objects.requireNonNull(executorContext.getOutputStream(op)); - - flinkOutput.output(new DataSinkWrapper<>((DataSink) sink)); - }); + .stream() + .map(Node::get) + .filter(op -> op.output().getOutputSink() != null) + .forEach(op -> { + + final DataSink sink = op.output().getOutputSink(); + sinks.add(sink); + DataSet flinkOutput = + Objects.requireNonNull(executorContext.getOutputStream(op)); + + flinkOutput.output(new DataSinkWrapper<>((DataSink) sink)); + }); return sinks; } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BroadcastHashJoinTranslator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BroadcastHashJoinTranslator.java index 88324a27..392b93b0 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BroadcastHashJoinTranslator.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BroadcastHashJoinTranslator.java @@ -38,202 +38,197 @@ public class BroadcastHashJoinTranslator implements BatchOperatorTranslator { - static boolean wantTranslate(Join o) { - return o.getHints().contains(JoinHints.broadcastHashJoin()) - && (o.getType() == Join.Type.LEFT || o.getType() == Join.Type.RIGHT) - && !(o.getWindowing() instanceof MergingWindowing); + static boolean wantTranslate(Join o) { + return o.getHints().contains(JoinHints.broadcastHashJoin()) + && (o.getType() == Join.Type.LEFT || o.getType() == Join.Type.RIGHT) + && !(o.getWindowing() instanceof MergingWindowing); + 
} + + @Override + @SuppressWarnings("unchecked") + public DataSet translate(FlinkOperator operator, BatchExecutorContext context) { + final List inputs = (List) context.getInputStreams(operator); + + if (inputs.size() != 2) { + throw new IllegalStateException( + "Join should have two data sets on input, got " + inputs.size()); } - - @Override - @SuppressWarnings("unchecked") - public DataSet translate(FlinkOperator operator, BatchExecutorContext context) { - final List inputs = (List) context.getInputStreams(operator); - - if (inputs.size() != 2) { - throw new IllegalStateException( - "Join should have two data sets on input, got " + inputs.size()); - } - final DataSet left = inputs.get(0); - final DataSet right = inputs.get(1); - final Join originalOperator = operator.getOriginalOperator(); - - final UnaryFunction leftKeyExtractor = originalOperator.getLeftKeyExtractor(); - final UnaryFunction rightKeyExtractor = originalOperator.getRightKeyExtractor(); - final Windowing windowing = - originalOperator.getWindowing() == null - ? 
AttachedWindowing.INSTANCE - : originalOperator.getWindowing(); - DataSet> leftExtracted = - left.flatMap(new KeyExtractor(leftKeyExtractor, windowing)) - .returns(new TypeHint>() { - }) - .name(operator.getName() + "::extract-left-key") - .setParallelism(operator.getParallelism()); - - DataSet> rightExtracted = - right.flatMap(new KeyExtractor(rightKeyExtractor, windowing)) - .returns(new TypeHint>() { - }) - .name(operator.getName() + "::extract-right-key") - .setParallelism(operator.getParallelism()); - - DataSet joined; - switch (originalOperator.getType()) { - case LEFT: - joined = leftExtracted - .leftOuterJoin(rightExtracted, JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND) - .where(new JoinKeySelector()) - .equalTo(new JoinKeySelector()) - .with(new BroadcastFlatJoinFunction(originalOperator.getJoiner())) - .returns(new TypeHint>() { - }) - .name(operator.getName() + "::left-join"); - - break; - case RIGHT: - joined = leftExtracted - .rightOuterJoin(rightExtracted, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST) - .where(new JoinKeySelector()) - .equalTo(new JoinKeySelector()) - .with(new BroadcastFlatJoinFunction(originalOperator.getJoiner())) - .returns(new TypeHint>() { - }) - .name(operator.getName() + "::right-join"); - break; - default: - throw new IllegalStateException("Invalid type: " + originalOperator.getType() + "."); - } - return joined; + final DataSet left = inputs.get(0); + final DataSet right = inputs.get(1); + final Join originalOperator = operator.getOriginalOperator(); + + final UnaryFunction leftKeyExtractor = originalOperator.getLeftKeyExtractor(); + final UnaryFunction rightKeyExtractor = originalOperator.getRightKeyExtractor(); + final Windowing windowing = + originalOperator.getWindowing() == null + ? 
AttachedWindowing.INSTANCE + : originalOperator.getWindowing(); + DataSet> leftExtracted = + left.flatMap(new KeyExtractor(leftKeyExtractor, windowing)) + .returns(new TypeHint>() {}) + .name(operator.getName() + "::extract-left-key") + .setParallelism(operator.getParallelism()); + + DataSet> rightExtracted = + right.flatMap(new KeyExtractor(rightKeyExtractor, windowing)) + .returns(new TypeHint>() {}) + .name(operator.getName() + "::extract-right-key") + .setParallelism(operator.getParallelism()); + + DataSet joined; + switch (originalOperator.getType()) { + case LEFT: + joined = leftExtracted + .leftOuterJoin(rightExtracted, JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND) + .where(new JoinKeySelector()) + .equalTo(new JoinKeySelector()) + .with(new BroadcastFlatJoinFunction(originalOperator.getJoiner())) + .returns(new TypeHint>() {}) + .name(operator.getName() + "::left-broadcast-hash-join"); + + break; + case RIGHT: + joined = leftExtracted + .rightOuterJoin(rightExtracted, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST) + .where(new JoinKeySelector()) + .equalTo(new JoinKeySelector()) + .with(new BroadcastFlatJoinFunction(originalOperator.getJoiner())) + .returns(new TypeHint>() {}) + .name(operator.getName() + "::right-broadcast-hash-join"); + break; + default: + throw new IllegalStateException("Invalid type: " + originalOperator.getType() + "."); } + return joined; + } - private static class KeyExtractor - implements FlatMapFunction> { - - private final UnaryFunction keyExtractor; - private final Windowing windowing; + private static class KeyExtractor + implements FlatMapFunction> { - KeyExtractor(UnaryFunction keyExtractor, Windowing windowing) { - this.keyExtractor = keyExtractor; - this.windowing = windowing; - } + private final UnaryFunction keyExtractor; + private final Windowing windowing; - @Override - @SuppressWarnings("unchecked") - public void flatMap(BatchElement wel, Collector> coll) throws Exception { - Iterable assigned = 
windowing.assignWindowsToElement(wel); - for (Window wid : assigned) { - Object el = wel.getElement(); - long stamp = wid.maxTimestamp() - 1; - coll.collect(new BatchElement( - wid, stamp, Pair.of(keyExtractor.apply(el), el))); - } - } + KeyExtractor(UnaryFunction keyExtractor, Windowing windowing) { + this.keyExtractor = keyExtractor; + this.windowing = windowing; } - static class BroadcastFlatJoinFunction - implements FlatJoinFunction, BatchElement, - BatchElement> { - final BinaryFunctor joiner; - transient MultiValueContext multiValueContext; + @Override + @SuppressWarnings("unchecked") + public void flatMap(BatchElement wel, Collector> coll) throws Exception { + Iterable assigned = windowing.assignWindowsToElement(wel); + for (Window wid : assigned) { + Object el = wel.getElement(); + coll.collect(new BatchElement( + wid, wid.maxTimestamp(), Pair.of(keyExtractor.apply(el), el))); + } + } + } - BroadcastFlatJoinFunction(BinaryFunctor joiner) { - this.joiner = joiner; - } + static class BroadcastFlatJoinFunction + implements FlatJoinFunction, BatchElement, + BatchElement> { + final BinaryFunctor joiner; + transient MultiValueContext multiValueContext; - @Override - @SuppressWarnings("unchecked") - public void join(BatchElement first, BatchElement second, - Collector> coll) throws Exception { - - if (multiValueContext == null) { - multiValueContext = new MultiValueContext<>(); - } - final Window window = first == null ? second.getWindow() : first.getWindow(); - - final long maxTimestamp = Math.max( - first == null ? window.maxTimestamp() - 1 : first.getTimestamp(), - second == null ? window.maxTimestamp() - 1 : second.getTimestamp()); - - Object firstEl = first == null ? null : first.getElement().getSecond(); - Object secondEl = second == null ? null : second.getElement().getSecond(); - - joiner.apply(firstEl, secondEl, multiValueContext); - - final Object key = first == null - ? 
second.getElement().getFirst() - : first.getElement().getFirst(); - List values = multiValueContext.getAndResetValue(); - values.forEach(val -> coll.collect(new BatchElement<>( - window, - maxTimestamp, - Pair.of(key, val)))); - } + BroadcastFlatJoinFunction(BinaryFunctor joiner) { + this.joiner = joiner; } - static class JoinKeySelector - implements KeySelector, KeyedWindow> { + @Override + @SuppressWarnings("unchecked") + public void join(BatchElement first, BatchElement second, + Collector> coll) throws Exception { + + if (multiValueContext == null) { + multiValueContext = new MultiValueContext<>(); + } + final Window window = first == null ? second.getWindow() : first.getWindow(); + + final long maxTimestamp = Math.max( + first == null ? window.maxTimestamp() - 1 : first.getTimestamp(), + second == null ? window.maxTimestamp() - 1 : second.getTimestamp()); + + Object firstEl = first == null ? null : first.getElement().getSecond(); + Object secondEl = second == null ? null : second.getElement().getSecond(); + + joiner.apply(firstEl, secondEl, multiValueContext); + + final Object key = first == null + ? 
second.getElement().getFirst() + : first.getElement().getFirst(); + List values = multiValueContext.getAndResetValues(); + values.forEach(val -> coll.collect(new BatchElement<>( + window, + maxTimestamp, + Pair.of(key, val)))); + } + } - @Override - public KeyedWindow getKey(BatchElement value) { - return new KeyedWindow(value.getWindow(), value.getElement().getFirst()); - } + static class JoinKeySelector + implements KeySelector, KeyedWindow> { + @Override + public KeyedWindow getKey(BatchElement value) { + return new KeyedWindow(value.getWindow(), value.getElement().getFirst()); } - public static final class KeyedWindow implements Comparable { - private final W window; - private final K key; + } - KeyedWindow(W window, K key) { - this.window = Objects.requireNonNull(window); - this.key = Objects.requireNonNull(key); - } + public static final class KeyedWindow implements Comparable { + private final W window; + private final K key; - public W window() { - return window; - } + KeyedWindow(W window, K key) { + this.window = Objects.requireNonNull(window); + this.key = Objects.requireNonNull(key); + } - public K key() { - return key; - } + public W window() { + return window; + } - @Override - public boolean equals(Object o) { - if (o instanceof KeyedWindow) { - final KeyedWindow other = (KeyedWindow) o; - return Objects.equals(window, other.window) && Objects.equals(key, other.key); - } - return false; - } + public K key() { + return key; + } - @Override - public int hashCode() { - int result = window.hashCode(); - result = 31 * result + (key != null ? 
key.hashCode() : 0); - return result; - } + @Override + public boolean equals(Object o) { + if (o instanceof KeyedWindow) { + final KeyedWindow other = (KeyedWindow) o; + return Objects.equals(window, other.window) && Objects.equals(key, other.key); + } + return false; + } - @Override - public String toString() { - return "KeyedWindow{" + - "window=" + window + - ", key=" + key + - '}'; - } + @Override + public int hashCode() { + int result = window.hashCode(); + result = 31 * result + (key != null ? key.hashCode() : 0); + return result; + } - @Override - public int compareTo(KeyedWindow other) { - final int compareWindowResult = this.window.compareTo(other.window); - if (compareWindowResult == 0) { - if (Objects.equals(key, other.key)) { - return 0; - } else { - return 1; - } - } - return compareWindowResult; + @Override + public String toString() { + return "KeyedWindow{" + + "window=" + window + + ", key=" + key + + '}'; + } + + @Override + public int compareTo(KeyedWindow other) { + final int compareWindowResult = this.window.compareTo(other.window); + if (compareWindowResult == 0) { + if (Objects.equals(key, other.key)) { + return 0; + } else { + return 1; } + } + return compareWindowResult; } + } } diff --git a/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/AllOperatorsSuite.java b/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/AllOperatorsSuite.java index 99965a4d..47049368 100644 --- a/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/AllOperatorsSuite.java +++ b/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/AllOperatorsSuite.java @@ -25,7 +25,7 @@ */ @RunWith(ExecutorProviderRunner.class) @Suite.SuiteClasses({ - BroadcastHashJoinTest.class, + BroadcastHashJoinTest.class, CountByKeyTest.class, DistinctTest.class, FilterTest.class, diff --git a/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/BroadcastHashJoinTest.java 
b/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/BroadcastHashJoinTest.java index 96c82490..607fb476 100644 --- a/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/BroadcastHashJoinTest.java +++ b/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/BroadcastHashJoinTest.java @@ -113,39 +113,39 @@ public List> getUnorderedOutput() { }); } - @Processing(Processing.Type.BOUNDED) - @Test - public void keyHashCollisionBroadcastHashJoin() { - final String sameHashCodeKey1 = "FB"; - final String sameHashCodeKey2 = "Ea"; - execute(new JoinTest.JoinTestCase>() { - - @Override - protected Dataset> getOutput( - Dataset left, Dataset right) { - return LeftJoin.of(left, right) - .by(e -> e, e -> e % 2 == 0 ? sameHashCodeKey2 : sameHashCodeKey1) - .using((String l, Optional r, Collector c) -> - c.collect(l + "+" + r.orElse(null))) - .withHints(Sets.newHashSet(JoinHints.broadcastHashJoin())) - .output(); - } - - @Override - protected List getLeftInput() { - return Arrays.asList(sameHashCodeKey1, sameHashCodeKey2, "keyWithoutRightSide"); - } - - @Override - protected List getRightInput() { - return Arrays.asList(1, 2); - } - - @Override - public List> getUnorderedOutput() { - return Arrays.asList(Pair.of(sameHashCodeKey1, "FB+1"), Pair.of(sameHashCodeKey2, "Ea+2"), - Pair.of("keyWithoutRightSide", "keyWithoutRightSide+null")); - } - }); - } + @Processing(Processing.Type.BOUNDED) + @Test + public void keyHashCollisionBroadcastHashJoin() { + final String sameHashCodeKey1 = "FB"; + final String sameHashCodeKey2 = "Ea"; + execute(new JoinTest.JoinTestCase>() { + + @Override + protected Dataset> getOutput( + Dataset left, Dataset right) { + return LeftJoin.of(left, right) + .by(e -> e, e -> e % 2 == 0 ? 
sameHashCodeKey2 : sameHashCodeKey1) + .using((String l, Optional r, Collector c) -> + c.collect(l + "+" + r.orElse(null))) + .withHints(Sets.newHashSet(JoinHints.broadcastHashJoin())) + .output(); + } + + @Override + protected List getLeftInput() { + return Arrays.asList(sameHashCodeKey1, sameHashCodeKey2, "keyWithoutRightSide"); + } + + @Override + protected List getRightInput() { + return Arrays.asList(1, 2); + } + + @Override + public List> getUnorderedOutput() { + return Arrays.asList(Pair.of(sameHashCodeKey1, "FB+1"), Pair.of(sameHashCodeKey2, "Ea+2"), + Pair.of("keyWithoutRightSide", "keyWithoutRightSide+null")); + } + }); + } }