Skip to content

Commit

Permalink
fixed tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mareksimunek committed Mar 7, 2018
1 parent 77addbe commit 0e18e81
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2016-2018 Seznam.cz, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.core.client.operator;

import cz.seznam.euphoria.core.client.dataset.Dataset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ public void testBuild_Hints() {
assertTrue(outputDataset.getProducer().getHints().contains(SizeHint.FITS_IN_MEMORY));

Join join = (Join) flow.operators().stream().filter(op -> op instanceof Join).findFirst().get();
assertTrue(join.listInputs().stream().anyMatch(input -> ((Dataset)input).getProducer().getHints().contains(new
Util.TestHint())));
assertTrue(join.listInputs().stream().anyMatch(input ->
((Dataset) input).getProducer().getHints().contains(new Util.TestHint())));
assertTrue(join.listInputs().stream().anyMatch(input ->
((Dataset) input).getProducer().getHints().contains(new Util.TestHint2())));
assertEquals(2, ((Dataset) join.listInputs().stream().findFirst().get()).getProducer().getHints().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import cz.seznam.euphoria.core.client.functional.BinaryFunctor;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.operator.Join;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.client.operator.hint.SizeHint;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.executor.util.MultiValueContext;
Expand All @@ -41,13 +42,20 @@ public class BroadcastHashJoinTranslator implements BatchOperatorTranslator<Join

@SuppressWarnings("unchecked")
static boolean wantTranslate(Join o) {

return o.listInputs()
.stream()
.anyMatch(input -> ((Dataset) input).getHints().contains(SizeHint.FITS_IN_MEMORY))
.anyMatch(input -> hasSizeHint(((Dataset) input).getProducer()))
&& (o.getType() == Join.Type.LEFT || o.getType() == Join.Type.RIGHT)
&& !(o.getWindowing() instanceof MergingWindowing);
}

static boolean hasSizeHint(Operator operator) {
return operator != null &&
operator.getHints() != null &&
operator.getHints().contains(SizeHint.FITS_IN_MEMORY);
}

@Override
@SuppressWarnings("unchecked")
public DataSet<?> translate(FlinkOperator<Join> operator, BatchExecutorContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.operator.Join;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.client.operator.hint.SizeHint;
import cz.seznam.euphoria.core.client.util.Either;
import cz.seznam.euphoria.core.client.util.Pair;
Expand Down Expand Up @@ -58,11 +59,17 @@ public class BroadcastHashJoinTranslator implements SparkOperatorTranslator<Join
static boolean wantTranslate(Join o) {
return o.listInputs()
.stream()
.anyMatch(input -> ((Dataset) input).getProducer().getHints().contains(SizeHint.FITS_IN_MEMORY))
.anyMatch(input -> hasSizeHint(((Dataset) input).getProducer()))
&& (o.getType() == Join.Type.LEFT || o.getType() == Join.Type.RIGHT)
&& !(o.getWindowing() instanceof MergingWindowing);
}

static boolean hasSizeHint(Operator operator) {
return operator != null &&
operator.getHints() != null &&
operator.getHints().contains(SizeHint.FITS_IN_MEMORY);
}

@Override
@SuppressWarnings("unchecked")
public JavaRDD<?> translate(Join operator, SparkExecutorContext context) {
Expand All @@ -71,7 +78,7 @@ public JavaRDD<?> translate(Join operator, SparkExecutorContext context) {
Preconditions.checkArgument(
operator.listInputs()
.stream()
.anyMatch(input -> ((Dataset) input).getProducer().getHints().contains(SizeHint.FITS_IN_MEMORY)),
.anyMatch(input -> hasSizeHint(((Dataset) input).getProducer())),
"Missing broadcastHashJoin hint");
Preconditions.checkArgument(
operator.getType() == Join.Type.LEFT || operator.getType() == Join.Type.RIGHT,
Expand Down

0 comments on commit 0e18e81

Please sign in to comment.