Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#14 [euphoria-flink] Don't send timestamp along with each element. #59

Merged
merged 3 commits into from
Mar 27, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import cz.seznam.euphoria.core.client.dataset.windowing.TimeInterval;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.flink.batch.BatchElement;
import cz.seznam.euphoria.flink.streaming.StreamingElement;
import cz.seznam.euphoria.flink.streaming.windowing.KeyedMultiWindowedElement;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down Expand Up @@ -138,7 +140,8 @@ private Set<Class<?>> getClassesToRegister(Set<Class<?>> registeredClasses) {
ret.add(TimeInterval.class);

ret.add(Pair.class);
ret.add(FlinkElement.class);
ret.add(StreamingElement.class);
ret.add(BatchElement.class);
ret.add(KeyedMultiWindowedElement.class);
return ret;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ protected FlowTranslator createStreamTranslator(Settings settings,
/**
* See {@link ExecutionConfig#disableObjectReuse()}
* and {@link ExecutionConfig#disableObjectReuse()}.
*
* @param reuse set TRUE for enabling object reuse
*
* @return this instance (for method chaining purposes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

*/
public FlinkExecutor setObjectReuse(boolean reuse){
this.objectReuse = reuse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.flink;
package cz.seznam.euphoria.flink.batch;

import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement;

/**
* Single element flowing through Flink pipeline. Every such element
* Single element flowing through Flink batch pipeline. Every such element
* is associated with a window identifier and timestamp.
* @param <W> type of the assigned window
* @param <T> type of the data element
*/
public class FlinkElement<W extends Window, T> implements WindowedElement<W, T> {
public class BatchElement<W extends Window, T> implements WindowedElement<W, T> {

private Window window;
private long timestamp;
private T element;

// This class needs to ne POJO for effective serialization
public FlinkElement() {
public BatchElement() {
}

public FlinkElement(W window, long timestamp, T element) {
public BatchElement(W window, long timestamp, T element) {
this.window = window;
this.timestamp = timestamp;
this.element = element;
Expand All @@ -51,20 +51,20 @@ public void setWindow(W window) {
}

@Override
public long getTimestamp() {
return timestamp;
public T getElement() {
return element;
}

public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
public void setElement(T element) {
this.element = element;
}

@Override
public T getElement() {
return element;
public long getTimestamp() {
return timestamp;
}

public void setElement(T element) {
this.element = element;
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import cz.seznam.euphoria.core.client.functional.UnaryFunctor;
import cz.seznam.euphoria.core.client.operator.FlatMap;
import cz.seznam.euphoria.flink.FlinkElement;
import cz.seznam.euphoria.flink.FlinkOperator;
import cz.seznam.euphoria.flink.functions.UnaryFunctorWrapper;
import org.apache.flink.api.java.DataSet;
Expand All @@ -32,7 +31,7 @@ public DataSet<?> translate(FlinkOperator<FlatMap> operator,
UnaryFunctor mapper = operator.getOriginalOperator().getFunctor();
return input
.flatMap(new UnaryFunctorWrapper<>(mapper))
.returns((Class) FlinkElement.class)
.returns((Class) BatchElement.class)
.setParallelism(operator.getParallelism())
.name(operator.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.operator.ReduceByKey;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.flink.FlinkElement;
import cz.seznam.euphoria.flink.FlinkOperator;
import cz.seznam.euphoria.flink.Utils;
import cz.seznam.euphoria.flink.functions.PartitionerWrapper;
Expand Down Expand Up @@ -78,14 +77,14 @@ public DataSet translate(FlinkOperator<ReduceByKey> operator,
final UnaryFunction udfValue = origOperator.getValueExtractor();

// ~ extract key/value from input elements and assign windows
DataSet<FlinkElement<Window, Pair>> tuples;
DataSet<BatchElement<Window, Pair>> tuples;
{
// FIXME require keyExtractor to deliver `Comparable`s

UnaryFunction<Object, Long> timeAssigner = origOperator.getEventTimeAssigner();
FlatMapOperator<Object, FlinkElement<Window, Pair>> wAssigned =
FlatMapOperator<Object, BatchElement<Window, Pair>> wAssigned =
input.flatMap((i, c) -> {
FlinkElement wel = (FlinkElement) i;
BatchElement wel = (BatchElement) i;
if (timeAssigner != null) {
long stamp = timeAssigner.apply(wel.getElement());
wel.setTimestamp(stamp);
Expand All @@ -96,18 +95,18 @@ public DataSet translate(FlinkOperator<ReduceByKey> operator,
long stamp = (wid instanceof TimedWindow)
? ((TimedWindow) wid).maxTimestamp()
: wel.getTimestamp();
c.collect(new FlinkElement<>(
c.collect(new BatchElement<>(
wid, stamp, Pair.of(udfKey.apply(el), udfValue.apply(el))));
}
});
tuples = wAssigned
.name(operator.getName() + "::map-input")
.setParallelism(operator.getParallelism())
.returns(new TypeHint<FlinkElement<Window, Pair>>() {});
.returns(new TypeHint<BatchElement<Window, Pair>>() {});
}

// ~ reduce the data now
Operator<FlinkElement<Window, Pair>, ?> reduced;
Operator<BatchElement<Window, Pair>, ?> reduced;
reduced = tuples
.groupBy(new RBKKeySelector())
.reduce(new RBKReducer(reducer));
Expand All @@ -125,8 +124,8 @@ public DataSet translate(FlinkOperator<ReduceByKey> operator,
.partitionCustom(
new PartitionerWrapper<>(origOperator.getPartitioning().getPartitioner()),
Utils.wrapQueryable(
(KeySelector<FlinkElement<Window, Pair>, Comparable>)
(FlinkElement<Window, Pair> we) -> (Comparable) we.getElement().getKey(),
(KeySelector<BatchElement<Window, Pair>, Comparable>)
(BatchElement<Window, Pair> we) -> (Comparable) we.getElement().getKey(),
Comparable.class))
.setParallelism(operator.getParallelism());
}
Expand All @@ -141,18 +140,18 @@ public DataSet translate(FlinkOperator<ReduceByKey> operator,
*/
@SuppressWarnings("unchecked")
static class RBKKeySelector
implements KeySelector<FlinkElement<Window, Pair>, Tuple2<Comparable, Comparable>> {
implements KeySelector<BatchElement<Window, Pair>, Tuple2<Comparable, Comparable>> {

@Override
public Tuple2<Comparable, Comparable> getKey(
FlinkElement<Window, Pair> value) {
BatchElement<Window, Pair> value) {

return new Tuple2(value.getWindow(), value.getElement().getKey());
}
}

static class RBKReducer
implements ReduceFunction<FlinkElement<Window, Pair>> {
implements ReduceFunction<BatchElement<Window, Pair>> {

final UnaryFunction<Iterable, Object> reducer;

Expand All @@ -161,11 +160,11 @@ static class RBKReducer
}

@Override
public FlinkElement<Window, Pair>
reduce(FlinkElement<Window, Pair> p1, FlinkElement<Window, Pair> p2) {
public BatchElement<Window, Pair>
reduce(BatchElement<Window, Pair> p1, BatchElement<Window, Pair> p2) {

Window wid = p1.getWindow();
return new FlinkElement<>(
return new BatchElement<>(
wid,
Math.max(p1.getTimestamp(), p2.getTimestamp()),
Pair.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import com.google.common.collect.Iterables;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction;
import cz.seznam.euphoria.core.client.functional.StateFactory;
Expand All @@ -29,7 +28,6 @@
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.executor.greduce.GroupReducer;
import cz.seznam.euphoria.core.util.Settings;
import cz.seznam.euphoria.flink.FlinkElement;
import cz.seznam.euphoria.flink.FlinkOperator;
import cz.seznam.euphoria.flink.Utils;
import cz.seznam.euphoria.flink.functions.PartitionerWrapper;
Expand Down Expand Up @@ -75,9 +73,9 @@ public DataSet translate(FlinkOperator<ReduceStateByKey> operator,
UnaryFunction<Object, Long> timeAssigner = origOperator.getEventTimeAssigner();

// FIXME require keyExtractor to deliver `Comparable`s
DataSet<FlinkElement> wAssigned =
DataSet<BatchElement> wAssigned =
input.flatMap((i, c) -> {
FlinkElement wel = (FlinkElement) i;
BatchElement wel = (BatchElement) i;

// assign timestamp if timeAssigner defined
if (timeAssigner != null) {
Expand All @@ -86,27 +84,27 @@ public DataSet translate(FlinkOperator<ReduceStateByKey> operator,
Set<Window> assigned = windowing.assignWindowsToElement(wel);
for (Window wid : assigned) {
Object el = wel.getElement();
c.collect(new FlinkElement<>(
c.collect(new BatchElement<>(
wid,
wel.getTimestamp(),
Pair.of(udfKey.apply(el), udfValue.apply(el))));
}
})
.returns(FlinkElement.class)
.returns(BatchElement.class)
.name(operator.getName() + "::map-input")
.setParallelism(operator.getParallelism());

// ~ reduce the data now
DataSet<FlinkElement<?, Pair>> reduced =
DataSet<BatchElement<?, Pair>> reduced =
wAssigned.groupBy((KeySelector)
Utils.wrapQueryable(
// ~ FIXME if the underlying windowing is "non merging" we can group by
// "key _and_ window", thus, better utilizing the available resources
(FlinkElement<?, Pair> we) -> (Comparable) we.getElement().getFirst(),
(BatchElement<?, Pair> we) -> (Comparable) we.getElement().getFirst(),
Comparable.class))
.sortGroup(Utils.wrapQueryable(
(KeySelector<FlinkElement<?, ?>, Long>)
FlinkElement::getTimestamp, Long.class),
(KeySelector<BatchElement<?, ?>, Long>)
BatchElement::getTimestamp, Long.class),
Order.ASCENDING)
.reduceGroup(new RSBKReducer(origOperator, stateStorageProvider, windowing))
.setParallelism(operator.getParallelism())
Expand All @@ -118,8 +116,8 @@ public DataSet translate(FlinkOperator<ReduceStateByKey> operator,
.partitionCustom(new PartitionerWrapper<>(
origOperator.getPartitioning().getPartitioner()),
Utils.wrapQueryable(
(KeySelector<FlinkElement<?, Pair>, Comparable>)
(FlinkElement<?, Pair> we) -> (Comparable) we.getElement().getKey(),
(KeySelector<BatchElement<?, Pair>, Comparable>)
(BatchElement<?, Pair> we) -> (Comparable) we.getElement().getKey(),
Comparable.class))
.setParallelism(operator.getParallelism());
}
Expand All @@ -128,8 +126,8 @@ public DataSet translate(FlinkOperator<ReduceStateByKey> operator,
}

static class RSBKReducer
implements GroupReduceFunction<FlinkElement<?, Pair>, FlinkElement<?, Pair>>,
ResultTypeQueryable<FlinkElement<?, Pair>>
implements GroupReduceFunction<BatchElement<?, Pair>, BatchElement<?, Pair>>,
ResultTypeQueryable<BatchElement<?, Pair>>
{
private final StateFactory<?, State> stateFactory;
private final CombinableReduceFunction<State> stateCombiner;
Expand All @@ -152,27 +150,27 @@ static class RSBKReducer

@Override
@SuppressWarnings("unchecked")
public void reduce(Iterable<FlinkElement<?, Pair>> values,
org.apache.flink.util.Collector<FlinkElement<?, Pair>> out)
public void reduce(Iterable<BatchElement<?, Pair>> values,
org.apache.flink.util.Collector<BatchElement<?, Pair>> out)
{
GroupReducer reducer = new GroupReducer<>(
stateFactory,
FlinkElement::new,
BatchElement::new,
stateCombiner,
stateStorageProvider,
windowing,
trigger,
elem -> out.collect((FlinkElement) elem));
for (FlinkElement value : values) {
elem -> out.collect((BatchElement) elem));
for (BatchElement value : values) {
reducer.process(value);
}
reducer.close();
}

@Override
@SuppressWarnings("unchecked")
public TypeInformation<FlinkElement<?, Pair>> getProducedType() {
return TypeInformation.of((Class) FlinkElement.class);
public TypeInformation<BatchElement<?, Pair>> getProducedType() {
return TypeInformation.of((Class) BatchElement.class);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning;
import cz.seznam.euphoria.core.client.operator.Repartition;
import cz.seznam.euphoria.flink.FlinkElement;
import cz.seznam.euphoria.flink.FlinkOperator;
import cz.seznam.euphoria.flink.Utils;
import cz.seznam.euphoria.flink.functions.PartitionerWrapper;
Expand All @@ -30,16 +29,16 @@ class RepartitionTranslator implements BatchOperatorTranslator<Repartition> {
public DataSet translate(FlinkOperator<Repartition> operator,
BatchExecutorContext context) {

DataSet<FlinkElement> input =
(DataSet<FlinkElement>)context.getSingleInputStream(operator);
DataSet<BatchElement> input =
(DataSet<BatchElement>)context.getSingleInputStream(operator);

Partitioning partitioning = operator.getOriginalOperator().getPartitioning();
PartitionerWrapper flinkPartitioner =
new PartitionerWrapper<>(partitioning.getPartitioner());

return input.partitionCustom(
flinkPartitioner,
Utils.wrapQueryable((FlinkElement we) -> (Comparable) we.getElement(), Comparable.class))
Utils.wrapQueryable((BatchElement we) -> (Comparable) we.getElement(), Comparable.class))
.setParallelism(operator.getParallelism());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.Writer;
import cz.seznam.euphoria.flink.FlinkElement;
import cz.seznam.euphoria.flink.batch.BatchElement;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;

import java.io.IOException;

public class DataSinkWrapper<T>
implements OutputFormat<FlinkElement<?, T>>
implements OutputFormat<BatchElement<?, T>>
{
private final DataSink<T> dataSink;

Expand All @@ -45,7 +45,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
}

@Override
public void writeRecord(FlinkElement<?, T> record) throws IOException {
public void writeRecord(BatchElement<?, T> record) throws IOException {
writer.write(record.getElement());
}

Expand Down
Loading