Skip to content

Commit

Permalink
Merge pull request #257 from seznam/simunek/flinkSerializer
Browse files Browse the repository at this point in the history
[euphoria-flink] flink register classes with serializers
  • Loading branch information
dmvk authored Feb 5, 2018
2 parents 5d2f371 + 258fdc6 commit fd674d8
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package cz.seznam.euphoria.flink;

import com.esotericsoftware.kryo.Serializer;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.windowing.GlobalWindowing;
import cz.seznam.euphoria.core.client.dataset.windowing.TimeInterval;
Expand All @@ -26,14 +27,12 @@
import cz.seznam.euphoria.flink.batch.BatchElement;
import cz.seznam.euphoria.flink.streaming.StreamingElement;
import cz.seznam.euphoria.flink.streaming.windowing.KeyedMultiWindowedElement;
import cz.seznam.euphoria.shadow.com.google.common.collect.Sets;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Set;
import java.util.HashMap;

/**
* Unified interface for Flink batch and stream execution environments.
Expand Down Expand Up @@ -61,21 +60,21 @@ public enum Mode {
Mode mode,
boolean local,
int parallelism,
Set<Class<?>> registeredClasses) {
HashMap<Class<?>, Class<? extends Serializer>> registeredClasses) {

Set<Class<?>> toRegister = getClassesToRegister(registeredClasses);
HashMap<Class<?>, Class<? extends Serializer>> toRegister = getClassesToRegister(registeredClasses);

LOG.info(
"Creating ExecutionEnvironment mode {} with parallelism {}",
mode, parallelism);
if (mode == Mode.BATCH) {
batchEnv = local ? org.apache.flink.api.java.ExecutionEnvironment.createLocalEnvironment(parallelism) :
org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment();
toRegister.forEach(batchEnv::registerType);
registerClasses(toRegister, batchEnv);
} else {
streamEnv = local ? StreamExecutionEnvironment.createLocalEnvironment(parallelism) :
StreamExecutionEnvironment.getExecutionEnvironment();
toRegister.forEach(streamEnv::registerType);
registerClasses(toRegister, streamEnv);
}
LOG.info("Registered classes {} within flink's runtime", toRegister);
}
Expand Down Expand Up @@ -141,20 +140,42 @@ public static Mode determineMode(Flow flow) {
return Mode.BATCH;
}

private Set<Class<?>> getClassesToRegister(Set<Class<?>> registeredClasses) {
HashSet<Class<?>> ret = Sets.newHashSet(registeredClasses);
/**
 * Registers every given type with a batch execution environment.
 *
 * <p>A type mapped to a non-null serializer class is registered together with
 * that Kryo serializer; a type mapped to {@code null} is registered on its own.
 *
 * @param toRegister types to register, each mapped to a serializer class or {@code null}
 * @param environment the batch environment receiving the registrations
 */
@SuppressWarnings("unchecked")
private void registerClasses(HashMap<Class<?>, Class<? extends Serializer>> toRegister,
    org.apache.flink.api.java.ExecutionEnvironment environment) {
  for (Class<?> type : toRegister.keySet()) {
    final Class<? extends Serializer> serializerClass = toRegister.get(type);
    if (serializerClass == null) {
      // no dedicated serializer was supplied for this type
      environment.registerType(type);
      continue;
    }
    // the batch API expects a parameterized serializer class, hence the cast
    environment.registerTypeWithKryoSerializer(
        type, (Class<? extends Serializer<?>>) serializerClass);
  }
}

/**
 * Registers every given type with a streaming execution environment.
 *
 * <p>A type mapped to a non-null serializer class is registered together with
 * that Kryo serializer; a type mapped to {@code null} is registered on its own.
 *
 * @param toRegister types to register, each mapped to a serializer class or {@code null}
 * @param environment the streaming environment receiving the registrations
 */
private void registerClasses(HashMap<Class<?>, Class<? extends Serializer>> toRegister,
    StreamExecutionEnvironment environment) {
  for (Class<?> type : toRegister.keySet()) {
    final Class<? extends Serializer> serializerClass = toRegister.get(type);
    if (serializerClass == null) {
      // no dedicated serializer was supplied for this type
      environment.registerType(type);
      continue;
    }
    environment.registerTypeWithKryoSerializer(type, serializerClass);
  }
}

/**
 * Builds the full map of types to register with flink: a copy of the caller
 * supplied registrations extended with the types this executor uses itself.
 *
 * <p>The built-in types are added with a {@code null} serializer only when the
 * caller has not already mapped them to a serializer, so an explicitly
 * registered serializer for e.g. {@code Pair} is never discarded.
 *
 * @param registeredClasses registrations supplied by the caller; not modified
 * @return a new map holding the caller's registrations plus the built-in types
 */
private HashMap<Class<?>, Class<? extends Serializer>> getClassesToRegister(
    HashMap<Class<?>, Class<? extends Serializer>> registeredClasses) {
  HashMap<Class<?>, Class<? extends Serializer>> ret = new HashMap<>(registeredClasses);
  // putIfAbsent (rather than put) keeps a serializer the caller chose for
  // one of these types instead of overwriting it with null

  // register all types of used windows
  ret.putIfAbsent(GlobalWindowing.Window.class, null);
  ret.putIfAbsent(TimeInterval.class, null);
  ret.putIfAbsent(TimeSliding.SlidingWindowSet.class, null);

  ret.putIfAbsent(Either.class, null);
  ret.putIfAbsent(Pair.class, null);
  ret.putIfAbsent(Triple.class, null);
  ret.putIfAbsent(StreamingElement.class, null);
  ret.putIfAbsent(BatchElement.class, null);
  ret.putIfAbsent(KeyedMultiWindowedElement.class, null);
  return ret;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package cz.seznam.euphoria.flink;

import com.esotericsoftware.kryo.Serializer;
import cz.seznam.euphoria.core.client.accumulators.AccumulatorProvider;
import cz.seznam.euphoria.core.client.accumulators.VoidAccumulatorProvider;
import cz.seznam.euphoria.core.client.dataset.windowing.GlobalWindowing;
Expand All @@ -30,7 +31,6 @@
import cz.seznam.euphoria.flink.batch.BatchFlowTranslator;
import cz.seznam.euphoria.flink.streaming.StreamingElement;
import cz.seznam.euphoria.flink.streaming.StreamingFlowTranslator;
import cz.seznam.euphoria.shadow.com.google.common.collect.Sets;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.slf4j.Logger;
Expand All @@ -40,10 +40,10 @@
import javax.annotation.Nullable;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -63,7 +63,7 @@ public class FlinkExecutor implements Executor {
private Duration latencyTracking = Duration.ofSeconds(2);
private FlinkAccumulatorFactory accumulatorFactory =
new FlinkAccumulatorFactory.Adapter(VoidAccumulatorProvider.getFactory());
private final Set<Class<?>> registeredClasses = getDefaultClasses();
private final HashMap<Class<?>, Class<? extends Serializer>> registeredClasses = getDefaultClasses();
@Nullable
private Duration checkpointInterval;

Expand Down Expand Up @@ -287,7 +287,19 @@ public FlinkExecutor setLatencyTrackingInterval(Duration interval) {
* @return this instance (for method chaining purposes)
*/
public FlinkExecutor registerClass(Class<?> cls) {
// NOTE(review): add() is the Set-based call from before this change; the
// map-based registeredClasses field uses the put() call on the next line.
registeredClasses.add(cls);
// The type is stored with a null serializer class, i.e. without a custom
// serializer (presumably flink's default handling then applies - confirm).
registeredClasses.put(cls, null);
return this;
}

/**
 * Pre-register given class to flink for serialization purposes, together
 * with the serializer class to use for it.
 *
 * @param cls the type of objects which flink is supposed to serialize/deserialize
 * @param classSerializer the serializer class to associate with {@code cls};
 *     {@code null} stores the type without a custom serializer, exactly as
 *     {@link #registerClass(Class)} does
 * @return this instance (for method chaining purposes)
 */
public FlinkExecutor registerClass(Class<?> cls, Class<? extends Serializer> classSerializer) {
  // a later registration of the same type replaces the earlier one
  registeredClasses.put(cls, classSerializer);
  return this;
}

Expand Down Expand Up @@ -326,13 +338,15 @@ protected int getParallelism() {

// return classes that should be registered by default
// because the flink executor (might) use them by default
private Set<Class<?>> getDefaultClasses() {
return Sets.newHashSet(
Pair.class,
Window.class,
GlobalWindowing.Window.class,
TimeInterval.class,
BatchElement.class,
StreamingElement.class);
/**
 * Creates the initial registrations: the types registered by default, each
 * mapped to {@code null}, i.e. without a custom serializer.
 *
 * @return a new mutable map of the default types
 */
private HashMap<Class<?>, Class<? extends Serializer>> getDefaultClasses() {
  HashMap<Class<?>, Class<? extends Serializer>> classSerializerMap = new HashMap<>();
  classSerializerMap.put(Pair.class, null);
  classSerializerMap.put(Window.class, null);
  // register the window element type, not the GlobalWindowing strategy class,
  // matching the set of defaults used before the switch to a map
  classSerializerMap.put(GlobalWindowing.Window.class, null);
  classSerializerMap.put(TimeInterval.class, null);
  classSerializerMap.put(BatchElement.class, null);
  classSerializerMap.put(StreamingElement.class, null);
  return classSerializerMap;
}
}

0 comments on commit fd674d8

Please sign in to comment.