Skip to content

Commit

Permalink
[euphoria-core] #259 Hints are not runtime specific. Hints describe OutputDataset
Browse files Browse the repository at this point in the history
  • Loading branch information
mareksimunek committed Mar 7, 2018
1 parent 537cef1 commit 83ec83b
Show file tree
Hide file tree
Showing 34 changed files with 325 additions and 270 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,20 @@
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;
import cz.seznam.euphoria.core.client.operator.hint.OutputHintAware;

import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Collection;
import javax.annotation.Nullable;

/**
* A dataset abstraction.
*
* @param <T> type of elements of this data set
*/
@Audience(Audience.Type.CLIENT)
public interface Dataset<T> extends Serializable {
public interface Dataset<T> extends OutputHintAware<OutputHint>, Serializable {

/**
* @return the flow associated with this data set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;

import java.util.Set;

/**
* Various dataset related utils.
Expand All @@ -39,9 +42,9 @@ public class Datasets {
* @return a dataset representing the output of the given operator
*/
public static <IN, OUT> Dataset<OUT> createOutputFor(
Flow flow, Dataset<IN> input, Operator<IN, OUT> op) {
Flow flow, Dataset<IN> input, Operator<IN, OUT> op, Set<OutputHint> outputHints) {

return new OutputDataset<>(flow, op, input.isBounded());
return new OutputDataset<>(flow, op, input.isBounded(), outputHints);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.Operator;
import java.util.Collection;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;

/**
* {@code PCollection} that is input of a {@code Flow}.
Expand Down Expand Up @@ -57,6 +61,15 @@ public void persist(DataSink<T> sink) {
"The input dataset is already stored.");
}

/**
 * Returns the output hints attached to this dataset. An input dataset
 * carries no hints, so the result is always empty.
 *
 * @return an empty set
 */
@Override
public Set<OutputHint> getHints() {
  final Set<OutputHint> noHints = Collections.emptySet();
  return noHints;
}

@Override
public Flow getFlow() {
return flow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.Operator;
import java.util.Collection;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Set;

/**
* {@code PCollection} that is output of some operator.
Expand All @@ -32,13 +35,15 @@ class OutputDataset<T> implements Dataset<T> {
private final Flow flow;
private final Operator<?, T> producer;
private final boolean bounded;
private final Set<OutputHint> outputHints;

private DataSink<T> outputSink = null;

public OutputDataset(Flow flow, Operator<?, T> producer, boolean bounded) {
public OutputDataset(Flow flow, Operator<?, T> producer, boolean bounded, Set<OutputHint> outputHints) {
this.flow = flow;
this.producer = producer;
this.bounded = bounded;
this.outputHints = outputHints;
}

@Nullable
Expand Down Expand Up @@ -79,5 +84,8 @@ public boolean isBounded() {
return flow.getConsumersOf(this);
}


/**
 * Returns the output hints this dataset was constructed with. The stored
 * set is handed back as-is, not copied.
 *
 * @return the hints passed to the constructor
 */
@Override
public Set<OutputHint> getHints() {
  return this.outputHints;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@
package cz.seznam.euphoria.core.client.operator;

import cz.seznam.euphoria.core.annotation.audience.Audience;
import cz.seznam.euphoria.core.client.functional.ExtractEventTime;
import cz.seznam.euphoria.core.annotation.operator.Derived;
import cz.seznam.euphoria.core.annotation.operator.StateComplexity;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.ExtractEventTime;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;
import cz.seznam.euphoria.core.executor.graph.DAG;
import cz.seznam.euphoria.shadow.com.google.common.collect.Sets;

import java.util.Objects;
import java.util.Set;

/** A convenient alias for assignment of event time.
*
Expand Down Expand Up @@ -87,9 +90,10 @@ public static class OutputBuilder<IN> implements Builders.Output<IN> {
}

@Override
public Dataset<IN> output() {
public Dataset<IN> output(OutputHint... outputHints) {
Flow flow = input.getFlow();
AssignEventTime<IN> op = new AssignEventTime<>(name, flow, input, eventTimeFn);
AssignEventTime<IN> op = new AssignEventTime<>(name, flow, input, eventTimeFn, Sets.newHashSet
(outputHints));
flow.add(op);
return op.output();
}
Expand All @@ -98,8 +102,8 @@ public Dataset<IN> output() {
private final ExtractEventTime<IN> eventTimeFn;

AssignEventTime(String name, Flow flow, Dataset<IN> input,
ExtractEventTime<IN> eventTimeFn) {
super(name, flow, input);
ExtractEventTime<IN> eventTimeFn, Set<OutputHint> outputHints) {
super(name, flow, input, outputHints);
this.eventTimeFn = eventTimeFn;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;
import cz.seznam.euphoria.core.client.util.Pair;

import java.util.Set;

/**
* Common methods used in operator builders to share related javadoc
* descriptions.<p>
Expand Down Expand Up @@ -94,7 +93,7 @@ public interface Output<T> {
*
* @return the dataset representing the new operator's output
*/
Dataset<T> output();
Dataset<T> output(OutputHint... outputHints);
}

public interface OutputValues<K, V> extends Output<Pair<K, V>> {
Expand All @@ -106,24 +105,13 @@ public interface OutputValues<K, V> extends Output<Pair<K, V>> {
*
* @return the dataset representing the new operator's output
*/
default Dataset<V> outputValues() {
default Dataset<V> outputValues(OutputHint... outputHints) {
return MapElements
.named("extract-values")
.of(output())
.using(Pair::getSecond)
.output();
.output(outputHints);
}
}

public interface OutputWithHint<T, HINT extends Hint> extends Output<T> {

/**
* Add runtime specific hints for the operator
*
* @param hints runtime specific hints
* @return output builder
*/
Output<T> withHints(Set<HINT> hints);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.executor.graph.DAG;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.executor.graph.DAG;
import cz.seznam.euphoria.shadow.com.google.common.collect.Sets;

import javax.annotation.Nullable;
import java.util.Objects;
import java.util.Set;

/**
* Operator counting elements with same key.
Expand Down Expand Up @@ -100,8 +103,8 @@ public static class WindowingBuilder<IN, KEY>
}

@Override
public Dataset<Pair<KEY, Long>> output() {
return windowBy(null).output();
public Dataset<Pair<KEY, Long>> output(OutputHint... outputHints) {
return windowBy(null).output(outputHints);
}
}

Expand All @@ -122,10 +125,10 @@ public static class OutputBuilder<IN, KEY, W extends Window>
}

@Override
public Dataset<Pair<KEY, Long>> output() {
public Dataset<Pair<KEY, Long>> output(OutputHint... outputHints) {
Flow flow = input.getFlow();
CountByKey<IN, KEY, W> count = new CountByKey<>(
name, flow, input, keyExtractor, windowing);
name, flow, input, keyExtractor, windowing, Sets.newHashSet(outputHints));
flow.add(count);
return count.output();
}
Expand Down Expand Up @@ -163,9 +166,10 @@ public static OfBuilder named(String name) {
Flow flow,
Dataset<IN> input,
UnaryFunction<IN, KEY> extractor,
@Nullable Windowing<IN, W> windowing) {
@Nullable Windowing<IN, W> windowing,
Set<OutputHint> outputHints) {

super(name, flow, input, extractor, windowing);
super(name, flow, input, extractor, windowing, outputHints);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.executor.graph.DAG;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.executor.graph.DAG;
import cz.seznam.euphoria.shadow.com.google.common.collect.Sets;

import javax.annotation.Nullable;
import java.util.Objects;
import java.util.Set;

/**
* Operator outputting distinct (based on {@link Object#equals}) elements.
Expand Down Expand Up @@ -126,8 +129,7 @@ private WindowingBuilder(
return new OutputBuilder<>(name, input, mapper, windowing);
}

@Override
public Dataset<ELEM> output() {
/**
 * Finalizes the operator without any windowing and returns its output.
 *
 * @param outputHints hints to attach to the resulting output dataset
 * @return the dataset representing the new operator's output
 */
@Override
public Dataset<ELEM> output(OutputHint... outputHints) {
  // Forward the hints to the output builder; calling output() with no
  // arguments here would silently discard whatever the caller supplied.
  return new OutputBuilder<>(name, input, mapper, null).output(outputHints);
}
}
Expand All @@ -149,10 +151,10 @@ public static class OutputBuilder<IN, ELEM, W extends Window>
}

@Override
public Dataset<ELEM> output() {
public Dataset<ELEM> output(OutputHint... outputHints) {
Flow flow = input.getFlow();
Distinct<IN, ELEM, W> distinct = new Distinct<>(
name, flow, input, mapper, windowing);
name, flow, input, mapper, windowing, Sets.newHashSet(outputHints));
flow.add(distinct);
return distinct.output();
}
Expand Down Expand Up @@ -190,9 +192,10 @@ public static OfBuilder named(String name) {
Flow flow,
Dataset<IN> input,
UnaryFunction<IN, ELEM> mapper,
@Nullable Windowing<IN, W> windowing) {
@Nullable Windowing<IN, W> windowing,
Set<OutputHint> outputHints) {

super(name, flow, input, mapper, windowing);
super(name, flow, input, mapper, windowing, outputHints);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import cz.seznam.euphoria.core.annotation.audience.Audience;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;

import java.util.Set;

/**
* Operator working element-wise, with no context between elements.
Expand All @@ -29,9 +32,9 @@ public abstract class ElementWiseOperator<IN, OUT>

protected final Dataset<OUT> output;

protected ElementWiseOperator(String name, Flow flow, Dataset<IN> input) {
protected ElementWiseOperator(String name, Flow flow, Dataset<IN> input, Set<OutputHint> outputHints) {
super(name, flow, input);
this.output = createOutput(input);
this.output = createOutput(input, outputHints);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.UnaryPredicate;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;
import cz.seznam.euphoria.core.executor.graph.DAG;
import cz.seznam.euphoria.shadow.com.google.common.collect.Sets;

import java.util.Objects;
import java.util.Set;

/**
* Operator performing a filter operation.
Expand Down Expand Up @@ -94,9 +97,9 @@ private OutputBuilder(String name, Dataset<IN> input, UnaryPredicate<IN> predica
}

@Override
public Dataset<IN> output() {
public Dataset<IN> output(OutputHint... outputHints) {
Flow flow = input.getFlow();
Filter<IN> filter = new Filter<>(name, flow, input, predicate);
Filter<IN> filter = new Filter<>(name, flow, input, predicate, Sets.newHashSet(outputHints));
flow.add(filter);

return filter.output();
Expand Down Expand Up @@ -134,8 +137,9 @@ public static OfBuilder named(String name) {

final UnaryPredicate<IN> predicate;

Filter(String name, Flow flow, Dataset<IN> input, UnaryPredicate<IN> predicate) {
super(name, flow, input);
Filter(String name, Flow flow, Dataset<IN> input, UnaryPredicate<IN> predicate, Set<OutputHint>
outputHints) {
super(name, flow, input, outputHints);
this.predicate = predicate;
}

Expand Down
Loading

0 comments on commit 83ec83b

Please sign in to comment.