[FLINK-34217][docs] Update user doc for type serialization with FLIP-398
X-czh authored and reswqa committed Feb 4, 2024
1 parent 5fe6f20 commit 60795b7
Showing 2 changed files with 87 additions and 51 deletions.
@@ -31,35 +31,37 @@ If you use a custom type in your Flink program which cannot be serialized by the
Flink type serializer, Flink falls back to using the generic Kryo
serializer. You may register your own serializer or a serialization system like
Google Protobuf or Apache Thrift with Kryo. To do that, simply register the type
class and the serializer via the configuration option
[pipeline.serialization-config]({{< ref "docs/deployment/config#pipeline-serialization-config" >}}):

```yaml
pipeline.serialization-config:
- org.example.MyCustomType: {type: kryo, kryo-type: registered, class: org.example.MyCustomSerializer}
```
You could also set it programmatically as follows:
```java
Configuration config = new Configuration();
config.set(PipelineOptions.SERIALIZATION_CONFIG,
    "[org.example.MyCustomType: {type: kryo, kryo-type: registered, class: org.example.MyCustomSerializer}]");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
```

Note that your custom serializer has to extend Kryo's Serializer class. In the
case of Google Protobuf or Apache Thrift, this has already been done for
you.
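For other types, writing such a serializer is a small amount of code. The following is a rough sketch only, assuming the Kryo 2.x-style `read`/`write` signatures bundled with Flink, and a hypothetical `MyCustomType` that exposes a single `int` via `getValue()` and an `int` constructor:

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

// Sketch of a custom Kryo serializer; MyCustomType is a hypothetical user type.
public class MyCustomSerializer extends Serializer<MyCustomType> {

    @Override
    public void write(Kryo kryo, Output output, MyCustomType object) {
        // write the type's fields to the output buffer
        output.writeInt(object.getValue());
    }

    @Override
    public MyCustomType read(Kryo kryo, Input input, Class<MyCustomType> type) {
        // reconstruct the object from the same fields, in the same order
        return new MyCustomType(input.readInt());
    }
}
```

Once this class is on the classpath, it can be registered through `pipeline.serialization-config` as shown above.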

```yaml
pipeline.serialization-config:
# register the Google Protobuf serializer with Kryo
- org.example.MyCustomProtobufType: {type: kryo, kryo-type: registered, class: com.twitter.chill.protobuf.ProtobufSerializer}
# register the serializer included with Apache Thrift as the standard serializer
# TBaseSerializer states it should be initialized as a default Kryo serializer
- org.example.MyCustomThriftType: {type: kryo, kryo-type: default, class: com.twitter.chill.thrift.TBaseSerializer}
```
For the above example to work, you need to include the necessary dependencies in
@@ -257,12 +257,38 @@ The most frequent issues where users need to interact with Flink's data type handling

* **Registering subtypes:** If the function signatures describe only the supertypes, but they actually use subtypes of those during execution,
making Flink aware of these subtypes can significantly improve performance.
For that, register serializers for each subtype via
[pipeline.serialization-config]({{< ref "docs/deployment/config#pipeline-serialization-config" >}}):

```yaml
pipeline.serialization-config:
# register serializer for POJO types
- org.example.MyCustomType1: {type: pojo, class: org.example.MyCustomSerializer1}
# register serializer for generic types with Kryo
- org.example.MyCustomType2: {type: kryo, kryo-type: registered, class: org.example.MyCustomSerializer2}
```
You could also set it programmatically as follows (note that the more compact flow-style YAML is used in the code example,
while the block-style YAML list is used in the example above for better readability):
```java
Configuration config = new Configuration();
config.set(PipelineOptions.SERIALIZATION_CONFIG,
"[org.example.MyCustomType1: {type: pojo, class: org.example.MyCustomSerializer1},"
+ "org.example.MyCustomType2: {type: kryo, kryo-type: registered, class: org.example.MyCustomSerializer2}]");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
```

* **Registering custom serializers:** Flink falls back to [Kryo](https://github.com/EsotericSoftware/kryo) for the types that it does not handle transparently
by itself. Not all types are seamlessly handled by Kryo (and thus by Flink). For example, many Google Guava collection types do not work well
by default. The solution is to register additional serializers for the types that cause problems.
by default. The solution is to register additional serializers for the types that cause problems via
[pipeline.serialization-config]({{< ref "docs/deployment/config#pipeline-serialization-config" >}}):

```yaml
pipeline.serialization-config:
- org.example.MyCustomType: {type: kryo, kryo-type: default, class: org.example.MyCustomSerializer}
```
Additional Kryo serializers are available in many libraries. See [3rd party serializer]({{< ref "docs/dev/datastream/fault-tolerance/serialization/third_party_serializers" >}}) for more details on working with external serializers.
* **Adding Type Hints:** Sometimes, when Flink cannot infer the generic types despite all tricks, a user must pass a *type hint*. That is generally
@@ -359,12 +385,12 @@ You can still use the same method as in Java as a fallback.
There are two ways to create a TypeSerializer.

The first is to simply call `typeInfo.createSerializer(config)` on the `TypeInformation` object.
The `config` parameter is of type `SerializerConfig` and holds the information about the program's registered
custom serializers. Wherever possible, try to pass the program's proper `SerializerConfig`.

The second is to use `getRuntimeContext().createSerializer(typeInfo)` within a function. Inside functions
(like `MapFunction`), you can get it by making the function a
[Rich Function]({{< ref "docs/dev/datastream/user_defined_functions#rich-functions" >}}) and calling
`getRuntimeContext().createSerializer(typeInfo)`.
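Put together, the two ways might look like the following sketch (assuming Flink 1.19+, where `ExecutionConfig#getSerializerConfig()` is available; `MyCustomType` is a placeholder type):

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SerializerCreationSketch {

    // First way: create the serializer from the TypeInformation, passing the
    // program's SerializerConfig obtained from the ExecutionConfig.
    static TypeSerializer<MyCustomType> fromTypeInfo(StreamExecutionEnvironment env) {
        TypeInformation<MyCustomType> typeInfo = TypeInformation.of(MyCustomType.class);
        return typeInfo.createSerializer(env.getConfig().getSerializerConfig());
    }

    // Second way: inside a rich function, via the runtime context.
    public static class MyMapper extends RichMapFunction<MyCustomType, String> {
        private transient TypeSerializer<MyCustomType> serializer;

        @Override
        public void open(Configuration parameters) {
            serializer = getRuntimeContext()
                    .createSerializer(TypeInformation.of(MyCustomType.class));
        }

        @Override
        public String map(MyCustomType value) {
            return value.toString();
        }
    }
}
```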

--------
@@ -489,54 +515,62 @@ int, long, String etc. are handled by serializers we ship with Flink.
For all other types, we fall back to [Kryo](https://github.com/EsotericSoftware/kryo).

If Kryo is not able to handle the type, you can ask the `PojoTypeInfo` to serialize the POJO using [Avro](https://avro.apache.org).
To do so, make sure to include the `flink-avro` module, and set:

```yaml
pipeline.force-avro: true
```

Note that Flink automatically serializes POJOs generated by Avro with the Avro serializer.

If you want your **entire** POJO Type to be treated by the Kryo serializer, set:

```yaml
pipeline.force-kryo: true
```

If Kryo is not able to serialize your POJO, you can add a custom serializer to Kryo,
using [pipeline.serialization-config]({{< ref "docs/deployment/config#pipeline-serialization-config" >}}):

```yaml
pipeline.serialization-config:
- org.example.MyCustomType: {type: kryo, kryo-type: registered, class: org.example.MyCustomSerializer}
```

## Disabling Kryo Fallback

There are cases when programs may want to explicitly avoid using Kryo as a fallback for generic types. The most
common one is wanting to ensure that all types are efficiently serialized either through Flink's own serializers,
or via user-defined custom serializers. To do that, set:

```yaml
pipeline.generic-types: false
```

An exception will be raised whenever a data type is encountered that would go through Kryo.
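As with the other options above, this can also be set programmatically. A sketch, assuming `PipelineOptions.GENERIC_TYPES` is the option backing `pipeline.generic-types`:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableKryoFallback {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // disable the Kryo fallback for generic types
        config.set(PipelineOptions.GENERIC_TYPES, false);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
    }
}
```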

## Defining Type Information using a Factory

A type information factory allows for plugging-in user-defined type information into the Flink type system.
You have to implement `org.apache.flink.api.common.typeinfo.TypeInfoFactory` to return your custom type information.
The factory is called during the type extraction phase to supply custom type information for the corresponding type.

In a hierarchy of types, the closest factory will be chosen while traversing upwards.
However, a built-in factory has the highest precedence. A factory also has higher precedence than
Flink's built-in types, therefore you should know what you are doing.

You can associate your custom type information factory with the corresponding type via the configuration option
[pipeline.serialization-config]({{< ref "docs/deployment/config#pipeline-serialization-config" >}}):

```yaml
pipeline.serialization-config:
- org.example.MyCustomType: {type: typeinfo, class: org.example.MyCustomTypeInfoFactory}
```

Alternatively, you can annotate either the corresponding type or a POJO's field using this type with the
`@org.apache.flink.api.common.typeinfo.TypeInfo` annotation to have the factory associated.
Type information factories can be used in both the Java and Scala API. Note that a type information factory
associated via the configuration option has higher precedence.

The following example shows how to annotate a custom type `MyTuple` and supply custom type information for it using a factory in Java.
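As a rough sketch, the `MyCustomTypeInfoFactory` registered in the configuration example above could look like this (assuming a hand-written `MyCustomTypeInfo` and the type `MyCustomType` already exist):

```java
import java.lang.reflect.Type;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Sketch of a type information factory for a hypothetical MyCustomType.
public class MyCustomTypeInfoFactory extends TypeInfoFactory<MyCustomType> {

    @Override
    public TypeInformation<MyCustomType> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        // return the hand-written type information for MyCustomType
        return new MyCustomTypeInfo();
    }
}
```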

