Skip to content

Commit

Permalink
Merge pull request #264 from seznam/simunek/260/flinkBroadcastHashJoin
Browse files Browse the repository at this point in the history
[euphoria-flink] #260 Flink - broadcast hash join
  • Loading branch information
mareksimunek authored Feb 14, 2018
2 parents f543c83 + 2ef9f50 commit bb9e81f
Show file tree
Hide file tree
Showing 7 changed files with 468 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.spark;
package cz.seznam.euphoria.core.client.operator;

import cz.seznam.euphoria.core.annotation.audience.Audience;
import cz.seznam.euphoria.core.client.operator.JoinHint;

@Audience(Audience.Type.CLIENT)
public class JoinHints {
Expand All @@ -28,8 +27,7 @@ public static BroadcastHashJoin broadcastHashJoin() {
}

/**
* Broadcasts optional join side to all executors. See {@link BroadcastHashJoinTranslator}
* for more details.
* Broadcasts optional join side to all executors.
*/
public static class BroadcastHashJoin implements JoinHint {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2016-2018 Seznam.cz, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.core.executor.util;

import cz.seznam.euphoria.core.annotation.audience.Audience;
import cz.seznam.euphoria.core.client.accumulators.Counter;
import cz.seznam.euphoria.core.client.accumulators.Histogram;
import cz.seznam.euphoria.core.client.accumulators.Timer;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.io.Collector;
import cz.seznam.euphoria.core.client.io.Context;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * A {@link Context} that is also a {@link Collector} buffering every collected
 * element in an in-memory list.
 *
 * <p>Window and accumulator lookups are delegated to an optional wrapped
 * {@link Context}. When no context is wrapped, those lookups throw
 * {@link UnsupportedOperationException}.
 *
 * @param <T> type of the collected elements
 */
@Audience(Audience.Type.EXECUTOR)
public class MultiValueContext<T> implements Context, Collector<T> {

  /** Elements collected since construction or since the last reset. */
  private final List<T> elements = new ArrayList<>(1);

  /** Context to delegate to; {@code null} when this context stands alone. */
  @Nullable
  final Context wrap;

  /** Creates a context that wraps no other context. */
  public MultiValueContext() {
    this(null);
  }

  /**
   * Creates a context delegating window and accumulator lookups to {@code wrap}.
   *
   * @param wrap the context to delegate to, or {@code null} for none
   */
  public MultiValueContext(@Nullable Context wrap) {
    this.wrap = wrap;
  }

  /**
   * Appends the given element to the stored elements. Previously collected
   * elements are kept.
   *
   * @param elem the element to store
   */
  @Override
  public void collect(T elem) {
    elements.add(elem);
  }

  /**
   * Returns this instance, which serves as both the collector and its context.
   *
   * @return this context
   */
  @Override
  public Context asContext() {
    return this;
  }

  /**
   * Retrieves the window reported by the wrapped context.
   *
   * @return the window of the wrapped context
   * @throws UnsupportedOperationException if no context is wrapped
   */
  @Override
  public Window<?> getWindow() throws UnsupportedOperationException {
    if (wrap == null) {
      throw new UnsupportedOperationException(
          "The window is unknown in this context");
    }
    return wrap.getWindow();
  }

  /**
   * Retrieves the named counter from the wrapped context.
   *
   * @param name name of the counter
   * @return the counter provided by the wrapped context
   * @throws UnsupportedOperationException if no context is wrapped
   */
  @Override
  public Counter getCounter(String name) {
    return accumulatorContext().getCounter(name);
  }

  /**
   * Retrieves the named histogram from the wrapped context.
   *
   * @param name name of the histogram
   * @return the histogram provided by the wrapped context
   * @throws UnsupportedOperationException if no context is wrapped
   */
  @Override
  public Histogram getHistogram(String name) {
    return accumulatorContext().getHistogram(name);
  }

  /**
   * Retrieves the named timer from the wrapped context.
   *
   * @param name name of the timer
   * @return the timer provided by the wrapped context
   * @throws UnsupportedOperationException if no context is wrapped
   */
  @Override
  public Timer getTimer(String name) {
    return accumulatorContext().getTimer(name);
  }

  /**
   * Retrieves and resets the stored elements.
   *
   * @return a new list holding the elements collected so far; the internal
   *     buffer is empty afterwards
   */
  public List<T> getAndResetValues() {
    // Copy before clearing so the caller's list is independent of the buffer.
    List<T> copiedElements = new ArrayList<>(elements);
    elements.clear();
    return copiedElements;
  }

  /**
   * Retrieves the elements stored in this context without resetting them.
   *
   * @return an unmodifiable view of the internal buffer; it is not a snapshot,
   *     so later calls to {@link #collect} or {@link #getAndResetValues()} are
   *     visible through it
   */
  public List<T> get() {
    return Collections.unmodifiableList(elements);
  }

  /** Returns the wrapped context for accumulator lookups, failing when absent. */
  private Context accumulatorContext() {
    if (wrap == null) {
      throw new UnsupportedOperationException(
          "Accumulators not supported in this context");
    }
    return wrap;
  }
}

Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,18 @@
*/
package cz.seznam.euphoria.flink.batch;

import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory;
import cz.seznam.euphoria.shadow.com.google.common.base.Preconditions;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.UnaryPredicate;
import cz.seznam.euphoria.core.executor.graph.DAG;
import cz.seznam.euphoria.core.executor.graph.Node;
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.operator.FlatMap;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.client.operator.ReduceByKey;
import cz.seznam.euphoria.core.client.operator.ReduceStateByKey;
import cz.seznam.euphoria.core.client.operator.Union;
import cz.seznam.euphoria.core.client.operator.*;
import cz.seznam.euphoria.core.executor.FlowUnfolder;
import cz.seznam.euphoria.core.executor.graph.DAG;
import cz.seznam.euphoria.core.executor.graph.Node;
import cz.seznam.euphoria.core.util.Settings;
import cz.seznam.euphoria.flink.FlinkOperator;
import cz.seznam.euphoria.flink.FlowOptimizer;
import cz.seznam.euphoria.flink.FlowTranslator;
import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory;
import cz.seznam.euphoria.flink.batch.io.DataSinkWrapper;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.java.DataSet;
Expand All @@ -49,10 +44,14 @@
import java.util.function.BiFunction;
import java.util.stream.Collectors;

/**
* Translates a flow for Flink batch mode. When several translations are registered for
* an operator type, only the first matching one is applied to each operator.
*/
public class BatchFlowTranslator extends FlowTranslator {

public interface SplitAssignerFactory
extends BiFunction<LocatableInputSplit[], Integer, InputSplitAssigner>, Serializable {}
extends BiFunction<LocatableInputSplit[], Integer, InputSplitAssigner>, Serializable {
}

public static final SplitAssignerFactory DEFAULT_SPLIT_ASSIGNER_FACTORY =
(splits, partitions) -> new LocatableInputSplitAssigner(splits);
Expand All @@ -67,22 +66,21 @@ private Translation(
this.accept = accept;
}

static <O extends Operator<?, ?>> void set(
Map<Class, Translation> idx,
Class<O> type, BatchOperatorTranslator<O> translator)
{
set(idx, type, translator, null);
static <O extends Operator<?, ?>> void add(
Map<Class, List<Translation>> idx,
Class<O> type, BatchOperatorTranslator<O> translator) {
add(idx, type, translator, null);
}

static <O extends Operator<?, ?>> void set(
Map<Class, Translation> idx,
Class<O> type, BatchOperatorTranslator<O> translator, UnaryPredicate<O> accept)
{
idx.put(type, new Translation<>(translator, accept));
static <O extends Operator<?, ?>> void add(
Map<Class, List<Translation>> idx,
Class<O> type, BatchOperatorTranslator<O> translator, UnaryPredicate<O> accept) {
idx.putIfAbsent(type, new ArrayList<>());
idx.get(type).add(new Translation<>(translator, accept));
}
}

private final Map<Class, Translation> translations = new IdentityHashMap<>();
private final Map<Class, List<Translation>> translations = new IdentityHashMap<>();

private final Settings settings;
private final ExecutionEnvironment env;
Expand All @@ -103,21 +101,28 @@ public BatchFlowTranslator(Settings settings,
this.accumulatorFactory = Objects.requireNonNull(accumulatorFactory);

// basic operators
Translation.set(translations, FlowUnfolder.InputOperator.class, new InputTranslator(splitAssignerFactory));
Translation.set(translations, FlatMap.class, new FlatMapTranslator());
Translation.set(translations, ReduceStateByKey.class, new ReduceStateByKeyTranslator());
Translation.set(translations, Union.class, new UnionTranslator());
Translation.add(translations, FlowUnfolder.InputOperator.class, new InputTranslator(
splitAssignerFactory));
Translation.add(translations, FlatMap.class, new FlatMapTranslator());
Translation.add(translations, ReduceStateByKey.class, new ReduceStateByKeyTranslator());
Translation.add(translations, Union.class, new UnionTranslator());

// derived operators
Translation.set(translations, ReduceByKey.class, new ReduceByKeyTranslator(),
Translation.add(translations, ReduceByKey.class, new ReduceByKeyTranslator(),
ReduceByKeyTranslator::wantTranslate);

// ~ batch broadcast join for a very small left side
Translation.add(translations, Join.class, new BroadcastHashJoinTranslator(),
BroadcastHashJoinTranslator::wantTranslate);
}

@SuppressWarnings("unchecked")
@Override
protected Collection<TranslateAcceptor> getAcceptors() {
return translations.entrySet().stream()
.map(e -> new TranslateAcceptor(e.getKey(), e.getValue().accept))
.flatMap((entry) -> entry.getValue()
.stream()
.map(translator -> new TranslateAcceptor(entry.getKey(), translator.accept)))
.collect(Collectors.toList());
}

Expand All @@ -128,48 +133,62 @@ protected FlowOptimizer createOptimizer() {
return opt;
}

/**
* Translates the given flow, applying only the first matching translation to each operator.
* @param flow the user defined flow to be translated
*
* @return all output sinks
*/
@Override
@SuppressWarnings("unchecked")
public List<DataSink<?>> translateInto(Flow flow) {
// transform flow to acyclic graph of supported operators
DAG<FlinkOperator<Operator<?, ?>>> dag = flowToDag(flow);

BatchExecutorContext executorContext = new BatchExecutorContext(env, (DAG) dag,
accumulatorFactory, settings);
accumulatorFactory, settings);

// translate each operator to proper Flink transformation
dag.traverse().map(Node::get).forEach(op -> {
Operator<?, ?> originalOp = op.getOriginalOperator();
Translation<Operator<?, ?>> tx = translations.get(originalOp.getClass());
if (tx == null) {
List<Translation> txs = this.translations.get(originalOp.getClass());
if (txs.isEmpty()) {
throw new UnsupportedOperationException(
"Operator " + op.getClass().getSimpleName() + " not supported");
"Operator " + op.getClass().getSimpleName() + " not supported");
}
// ~ verify the flowToDag translation
Preconditions.checkState(
tx.accept == null || Boolean.TRUE.equals(tx.accept.apply(originalOp)));

DataSet<?> out = tx.translator.translate(op, executorContext);

// save output of current operator to context
executorContext.setOutput(op, out);
Translation<Operator<?, ?>> firstMatch = null;
for (Translation<Operator<?, ?>> tx : txs) {
if (tx.accept == null || tx.accept.apply(originalOp)) {
firstMatch = tx;
break;
}
}
final DataSet<?> out;
if (firstMatch != null) {
out = firstMatch.translator.translate(op, executorContext);
// save output of current operator to context
executorContext.setOutput(op, out);
} else {
throw new IllegalStateException("No matching translation.");
}
});

// process all sinks in the DAG (leaf nodes)
final List<DataSink<?>> sinks = new ArrayList<>();
dag.getLeafs()
.stream()
.map(Node::get)
.filter(op -> op.output().getOutputSink() != null)
.forEach(op -> {

final DataSink<?> sink = op.output().getOutputSink();
sinks.add(sink);
DataSet<?> flinkOutput =
Objects.requireNonNull(executorContext.getOutputStream(op));

flinkOutput.output(new DataSinkWrapper<>((DataSink) sink));
});
.stream()
.map(Node::get)
.filter(op -> op.output().getOutputSink() != null)
.forEach(op -> {

final DataSink<?> sink = op.output().getOutputSink();
sinks.add(sink);
DataSet<?> flinkOutput =
Objects.requireNonNull(executorContext.getOutputStream(op));

flinkOutput.output(new DataSinkWrapper<>((DataSink) sink));
});

return sinks;
}
Expand Down
Loading

0 comments on commit bb9e81f

Please sign in to comment.