Skip to content

Commit

Permalink
[euphoria-core] #259 Hints are now in operator, not Dataset
Browse files Browse the repository at this point in the history
fixed tests

[euphoria-core] hints in getBasicOps are only on last operator

[euphoria-core] code style corrections

[euphoria-core] code style corrections 2
  • Loading branch information
mareksimunek committed Mar 8, 2018
1 parent 817a4ab commit e4f0df5
Show file tree
Hide file tree
Showing 26 changed files with 278 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;
import cz.seznam.euphoria.core.client.operator.hint.OutputHintAware;

import javax.annotation.Nullable;
import java.io.Serializable;
Expand All @@ -33,7 +31,7 @@
* @param <T> type of elements of this data set
*/
@Audience(Audience.Type.CLIENT)
public interface Dataset<T> extends OutputHintAware<OutputHint>, Serializable {
public interface Dataset<T> extends Serializable {

/**
* @return the flow associated with this data set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;

import java.util.Set;

/**
* Various dataset related utils.
Expand All @@ -42,9 +39,9 @@ public class Datasets {
* @return a dataset representing the output of the given operator
*/
public static <IN, OUT> Dataset<OUT> createOutputFor(
Flow flow, Dataset<IN> input, Operator<IN, OUT> op, Set<OutputHint> outputHints) {
Flow flow, Dataset<IN> input, Operator<IN, OUT> op) {

return new OutputDataset<>(flow, op, input.isBounded(), outputHints);
return new OutputDataset<>(flow, op, input.isBounded());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,6 @@ public void persist(DataSink<T> sink) {
"The input dataset is already stored.");
}

/**
* Input Dataset doesn't have hints
* @return empty set
*/
@Override
public Set<OutputHint> getHints() {
return Collections.emptySet();
}

@Override
public Flow getFlow() {
return flow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,13 @@ class OutputDataset<T> implements Dataset<T> {
private final Flow flow;
private final Operator<?, T> producer;
private final boolean bounded;
private final Set<OutputHint> outputHints;

private DataSink<T> outputSink = null;

public OutputDataset(Flow flow, Operator<?, T> producer, boolean bounded, Set<OutputHint> outputHints) {
public OutputDataset(Flow flow, Operator<?, T> producer, boolean bounded) {
this.flow = flow;
this.producer = producer;
this.bounded = bounded;
this.outputHints = outputHints;
}

@Nullable
Expand Down Expand Up @@ -81,9 +79,4 @@ public boolean isBounded() {
public Collection<Operator<?, ?>> getConsumers() {
return flow.getConsumersOf(this);
}

@Override
public Set<OutputHint> getHints() {
return outputHints;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ public static class OutputBuilder<IN> implements Builders.Output<IN> {
@Override
public Dataset<IN> output(OutputHint... outputHints) {
Flow flow = input.getFlow();
AssignEventTime<IN> op = new AssignEventTime<>(name, flow, input, eventTimeFn, Sets.newHashSet
(outputHints));
AssignEventTime<IN> op = new AssignEventTime<>(name, flow, input, eventTimeFn,
Sets.newHashSet(outputHints));
flow.add(op);
return op.output();
}
Expand All @@ -111,7 +111,7 @@ public Dataset<IN> output(OutputHint... outputHints) {
public DAG<Operator<?, ?>> getBasicOps() {
return DAG.of(new FlatMap<>(
getName(), getFlow(), input,
(i, c) -> c.collect(i), eventTimeFn));
(i, c) -> c.collect(i), eventTimeFn, getHints()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
)
public class CountByKey<IN, KEY, W extends Window>
extends StateAwareWindowWiseSingleInputOperator<
IN, IN, IN, KEY, Pair<KEY, Long>, W, CountByKey<IN, KEY, W>> {
IN, IN, IN, KEY, Pair<KEY, Long>, W, CountByKey<IN, KEY, W>> {

public static class OfBuilder implements Builders.Of {
private final String name;
Expand Down Expand Up @@ -175,12 +175,13 @@ public static OfBuilder named(String name) {
@Override
public DAG<Operator<?, ?>> getBasicOps() {
SumByKey<IN, KEY, W> sum = new SumByKey<>(
getName(),
input.getFlow(),
input,
keyExtractor,
e -> 1L,
windowing);
getName(),
input.getFlow(),
input,
keyExtractor,
e -> 1L,
windowing,
getHints());
return DAG.of(sum);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import cz.seznam.euphoria.shadow.com.google.common.collect.Sets;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;

Expand Down Expand Up @@ -206,10 +207,11 @@ public static OfBuilder named(String name) {
new ReduceByKey<>(name,
flow, input, getKeyExtractor(), e -> null,
windowing,
(CombinableReduceFunction<Void>) e -> null);
(CombinableReduceFunction<Void>) e -> null,
Collections.emptySet());

MapElements format = new MapElements<>(
getName() + "::" + "Map", flow, reduce.output(), Pair::getFirst);
getName() + "::" + "Map", flow, reduce.output(), Pair::getFirst, getHints());

DAG<Operator<?, ?>> dag = DAG.of(reduce);
dag.add(format, reduce);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,10 @@ public static OfBuilder named(String name) {

final UnaryPredicate<IN> predicate;

Filter(String name, Flow flow, Dataset<IN> input, UnaryPredicate<IN> predicate, Set<OutputHint>
outputHints) {
Filter(String name,
Flow flow, Dataset<IN> input,
UnaryPredicate<IN> predicate,
Set<OutputHint> outputHints) {
super(name, flow, input, outputHints);
this.predicate = predicate;
}
Expand All @@ -155,6 +157,8 @@ public UnaryPredicate<IN> getPredicate() {
if (predicate.apply(elem)) {
collector.collect(elem);
}
}, null));
},
null,
getHints()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ public static class OutputBuilder<IN, OUT> implements Builders.Output<OUT> {
@Override
public Dataset<OUT> output(OutputHint... outputHints) {
Flow flow = input.getFlow();
FlatMap<IN, OUT> map = new FlatMap<>(name, flow, input, functor, evtTimeFn, Sets.newHashSet
(outputHints));
FlatMap<IN, OUT> map = new FlatMap<>(name, flow, input, functor, evtTimeFn,
Sets.newHashSet(outputHints));
flow.add(map);
return map.output();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@
)
public class Join<LEFT, RIGHT, KEY, OUT, W extends Window>
extends StateAwareWindowWiseOperator<Object, Either<LEFT, RIGHT>,
Either<LEFT, RIGHT>, KEY, Pair<KEY, OUT>, W, Join<LEFT, RIGHT, KEY, OUT, W>>
implements Builders.OutputValues<KEY, OUT> {
Either<LEFT, RIGHT>, KEY, Pair<KEY, OUT>, W, Join<LEFT, RIGHT, KEY, OUT, W>> {

public enum Type {
INNER,
Expand Down Expand Up @@ -287,11 +286,6 @@ public Dataset<Pair<KEY, OUT>> output() {
return output;
}

@Override
public Dataset<Pair<KEY, OUT>> output(OutputHint... outputHints) {
return output;
}

@SuppressWarnings("unchecked")
private static final ListStorageDescriptor LEFT_STATE_DESCR =
ListStorageDescriptor.of("left", (Class) Object.class);
Expand Down Expand Up @@ -525,8 +519,8 @@ public BinaryFunctor<LEFT, RIGHT, OUT> getJoiner() {
getName() + "::Map-right", flow, right, Either::right);

final Union<Either<LEFT, RIGHT>> union =
new Union<>(getName() + "::Union", flow, Arrays.asList(
leftMap.output(), rightMap.output()));
new Union<>(getName() + "::Union", flow,
Arrays.asList(leftMap.output(), rightMap.output()));

final ReduceStateByKey<Either<LEFT, RIGHT>, KEY, Either<LEFT, RIGHT>, OUT, StableJoinState, W>
reduce = new ReduceStateByKey(
Expand All @@ -541,7 +535,8 @@ public BinaryFunctor<LEFT, RIGHT, OUT> getJoiner() {
return ctx == null
? new StableJoinState(storages)
: new EarlyEmittingJoinState(storages, ctx);
}, new StateSupport.MergeFromStateMerger<>());
}, new StateSupport.MergeFromStateMerger<>(),
getHints());

final DAG<Operator<?, ?>> dag = DAG.of(leftMap, rightMap);
dag.add(union, leftMap, rightMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,15 @@ public static OfBuilder named(String name) {
this(name, flow, input, (el, ctx) -> mapper.apply(el), Collections.emptySet());
}

MapElements(String name,
Flow flow,
Dataset<IN> input,
UnaryFunction<IN, OUT> mapper,
Set<OutputHint> outputHints) {
this(name, flow, input, (el, ctx) -> mapper.apply(el), outputHints);
}


MapElements(String name,
Flow flow,
Dataset<IN> input,
Expand All @@ -175,7 +184,7 @@ public static OfBuilder named(String name) {
return DAG.of(
// do not use the client API here, because it modifies the Flow!
new FlatMap<IN, OUT>(getName(), getFlow(), input,
(i, c) -> c.collect(mapper.apply(i, c.asContext())), null));
(i, c) -> c.collect(mapper.apply(i, c.asContext())), null, getHints()));
}

public UnaryFunctionEnv<IN, OUT> getMapper() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public abstract class Operator<IN, OUT> implements Serializable {
/** Associated Flow. */
private final Flow flow;

private Set<OutputHint> hints;

protected Operator(String name, Flow flow) {
this.name = name;
this.flow = flow;
Expand Down Expand Up @@ -73,7 +75,12 @@ public final Flow getFlow() {
*/
final Dataset<OUT> createOutput(final Dataset<IN> input, Set<OutputHint> outputHints) {
Flow flow = input.getFlow();
return Datasets.createOutputFor(flow, input, this, outputHints);
this.hints = outputHints;
return Datasets.createOutputFor(flow, input, this);
}

public Set<OutputHint> getHints() {
return hints;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@
public class ReduceByKey<IN, KEY, VALUE, OUT, W extends Window>
extends StateAwareWindowWiseSingleInputOperator<
IN, IN, IN, KEY, Pair<KEY, OUT>, W,
ReduceByKey<IN, KEY, VALUE, OUT, W>>
implements Builders.OutputValues<KEY, OUT> {
ReduceByKey<IN, KEY, VALUE, OUT, W>> {


public static class OfBuilder implements Builders.Of {
Expand Down Expand Up @@ -349,7 +348,7 @@ public static class DatasetBuilder5<IN, KEY, VALUE, OUT, W extends Window>
public Dataset<Pair<KEY, OUT>> output(OutputHint... outputHints) {
Flow flow = input.getFlow();
ReduceByKey<IN, KEY, VALUE, OUT, W> reduce = new ReduceByKey<>(
name, flow, input, keyExtractor, valueExtractor,
name, flow, input, keyExtractor, valueExtractor,
windowing, reducer, valuesComparator, Sets.newHashSet(outputHints));
flow.add(reduce);
return reduce.output();
Expand Down Expand Up @@ -396,23 +395,12 @@ public static OfBuilder named(String name) {
UnaryFunction<IN, KEY> keyExtractor,
UnaryFunction<IN, VALUE> valueExtractor,
@Nullable Windowing<IN, W> windowing,
CombinableReduceFunction<OUT> reducer) {
CombinableReduceFunction<OUT> reducer,
Set<OutputHint> outputHints) {
this(
name, flow, input, keyExtractor, valueExtractor,
windowing, (ReduceFunctor<VALUE, OUT>) toReduceFunctor(reducer),
null, Collections.emptySet());
}

ReduceByKey(String name,
Flow flow,
Dataset<IN> input,
UnaryFunction<IN, KEY> keyExtractor,
UnaryFunction<IN, VALUE> valueExtractor,
@Nullable Windowing<IN, W> windowing,
ReduceFunctor<VALUE, OUT> reducer,
@Nullable BinaryFunction<VALUE, VALUE, Integer> valueComparator) {
this(name, flow, input, keyExtractor, valueExtractor, windowing, reducer, valueComparator,
Collections.emptySet());
null, outputHints);
}

ReduceByKey(String name,
Expand Down Expand Up @@ -443,11 +431,6 @@ public UnaryFunction<IN, VALUE> getValueExtractor() {
return valueExtractor;
}

@Override
public Dataset<Pair<KEY, OUT>> output(OutputHint... outputHints) {
return output();
}

@SuppressWarnings("unchecked")
@Override
public DAG<Operator<?, ?>> getBasicOps() {
Expand All @@ -460,7 +443,7 @@ public Dataset<Pair<KEY, OUT>> output(OutputHint... outputHints) {
Operator reduceState = new ReduceStateByKey(getName(),
flow, input, keyExtractor, valueExtractor,
windowing,
stateFactory, stateCombine);
stateFactory, stateCombine, getHints());
return DAG.of(reduceState);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,18 +352,6 @@ public static OfBuilder named(String name) {
private final UnaryFunction<IN, VALUE> valueExtractor;
private final StateMerger<VALUE, OUT, STATE> stateCombiner;

ReduceStateByKey(String name,
Flow flow,
Dataset<IN> input,
UnaryFunction<IN, KEY> keyExtractor,
UnaryFunction<IN, VALUE> valueExtractor,
@Nullable Windowing<IN, W> windowing,
StateFactory<VALUE, OUT, STATE> stateFactory,
StateMerger<VALUE, OUT, STATE> stateMerger) {
this(name, flow, input, keyExtractor, valueExtractor, windowing, stateFactory, stateMerger,
Collections.emptySet());
}

ReduceStateByKey(String name,
Flow flow,
Dataset<IN> input,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ public ReduceFunctor<VALUE, OUT> getReducer() {
rbk = new ReduceByKey<>(
getName() + "::ReduceByKey", getFlow(), input,
getKeyExtractor(), valueExtractor,
windowing, reducer, valueComparator);
windowing, reducer, valueComparator, getHints());
dag.add(rbk);
} else {
// otherwise we use attached windowing, therefore
Expand All @@ -315,7 +315,7 @@ public ReduceFunctor<VALUE, OUT> getReducer() {
rbk = new ReduceByKey<>(
getName() + "::ReduceByKey::attached", getFlow(), map.output(),
Pair::getFirst, p -> valueExtractor.apply(p.getSecond()),
null, reducer, valueComparator);
null, reducer, valueComparator, getHints());
dag.add(map);
dag.add(rbk);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public static OfBuilder named(String name) {
public DAG<Operator<?, ?>> getBasicOps() {
ReduceByKey<IN, KEY, Long, Long, W> reduceByKey =
new ReduceByKey<>(getName(), input.getFlow(), input,
keyExtractor, valueExtractor, windowing, Sums.ofLongs());
keyExtractor, valueExtractor, windowing, Sums.ofLongs(), getHints());
return DAG.of(reduceByKey);
}
}
Loading

0 comments on commit e4f0df5

Please sign in to comment.