diff --git a/docs/content.zh/docs/dev/datastream/operators/process_function.md b/docs/content.zh/docs/dev/datastream/operators/process_function.md index 652189393346b..82708b653bd33 100644 --- a/docs/content.zh/docs/dev/datastream/operators/process_function.md +++ b/docs/content.zh/docs/dev/datastream/operators/process_function.md @@ -26,68 +26,61 @@ under the License. # Process Function -## The ProcessFunction +## ProcessFunction -The `ProcessFunction` is a low-level stream processing operation, giving access to the basic building blocks of -all (acyclic) streaming applications: +`ProcessFunction` 是一种底层的流处理操作,基于它用户可以访问(无环)流应用程序的所有基本构建块: - - events (stream elements) - - state (fault-tolerant, consistent, only on keyed stream) - - timers (event time and processing time, only on keyed stream) + - 事件(流元素) + - 状态(容错,一致性,仅在 keyed stream 上) + - 定时器(事件时间和处理时间,仅在 keyed stream 上) -The `ProcessFunction` can be thought of as a `FlatMapFunction` with access to keyed state and timers. It handles events -by being invoked for each event received in the input stream(s). +可以将 `ProcessFunction` 视为一种可以访问 keyed state 和定时器的 `FlatMapFunction`。Flink 为收到的输入流中的每个事件都调用该函数来进行处理。 -For fault-tolerant state, the `ProcessFunction` gives access to Flink's [keyed state]({{< ref "docs/dev/datastream/fault-tolerance/state" >}}), accessible via the -`RuntimeContext`, similar to the way other stateful functions can access keyed state. +对于容错,与其它有状态的函数类似,`ProcessFunction` 可以通过 `RuntimeContext` 访问 Flink 的 [keyed state]({{< ref "docs/dev/datastream/fault-tolerance/state" >}})。 -The timers allow applications to react to changes in processing time and in [event time]({{< ref "docs/concepts/time" >}}). -Every call to the function `processElement(...)` gets a `Context` object which gives access to the element's -event time timestamp, and to the *TimerService*. The `TimerService` can be used to register callbacks for future -event-/processing-time instants. 
With event-time timers, the `onTimer(...)` method is called when the current watermark is advanced up to or beyond the timestamp of the timer, while with processing-time timers, `onTimer(...)` is called when wall clock time reaches the specified time. During that call, all states are again scoped to the key with which the timer was created, allowing -timers to manipulate keyed state. +定时器允许应用程序对处理时间和 [事件时间]({{< ref "docs/concepts/time" >}}) 的变化做出反应。 +每次调用 `processElement(...)` 时参数中都会提供一个 `Context` 对象,通过该对象可以访问元素的事件时间戳和 *TimerService*。 +`TimerService` 可用于为将来特定的事件时间/处理时间注册回调。 +对于事件时间定时器,`onTimer(...)` 回调函数会在当前 watermark 达到或超过所注册的时间戳时调用。 +对于处理时间定时器,`onTimer(...)` 回调函数则会在系统物理时间达到所注册的时间时调用。 +在该调用期间,所有状态会被再次绑定到创建定时器时的键上,从而允许定时器操作与之对应的 keyed state。 {{< hint info >}} -If you want to access keyed state and timers you have -to apply the `ProcessFunction` on a keyed stream: +如果想要访问 keyed state 和定时器,需要在 +keyed stream 上使用 `ProcessFunction`: {{< /hint >}} ```java stream.keyBy(...).process(new MyProcessFunction()); ``` -## Low-level Joins +## 底层 Join -To realize low-level operations on two inputs, applications can use `CoProcessFunction` or `KeyedCoProcessFunction`. This -function is bound to two different inputs and gets individual calls to `processElement1(...)` and -`processElement2(...)` for records from the two different inputs. +为了在两个输入上实现底层操作,应用程序可以使用 `CoProcessFunction` 或 `KeyedCoProcessFunction`。 +这些函数绑定两个不同的输入,从两个不同的输入中获取元素并分别调用 +`processElement1(...)` 和 `processElement2(...)` 进行处理。 -Implementing a low level join typically follows this pattern: +实现底层 join 一般需要遵循以下模式: - - Create a state object for one input (or both) - - Update the state upon receiving elements from its input - - Upon receiving elements from the other input, probe the state and produce the joined result + - 为一个输入(或两者)创建状态对象。 + - 从某个输入接收元素时更新状态。 + - 从另一个输入接收元素时,查询状态并生成 join 结果。 -For example, you might be joining customer data to financial trades, -while keeping state for the customer data.
If you care about having -complete and deterministic joins in the face of out-of-order events, -you can use a timer to evaluate and emit the join for a trade when the -watermark for the customer data stream has passed the time of that -trade. +例如,你可能会将客户数据与金融交易进行 join,同时保存客户数据的状态。如果你希望在出现乱序事件时仍然得到完整且确定的 join 结果,可以注册一个定时器,在客户数据流的 watermark 超过这条交易记录的时间时,计算并输出该交易的 join 结果。 -## Example +## 示例 -In the following example a `KeyedProcessFunction` maintains counts per key, and emits a key/count pair whenever a minute passes (in event time) without an update for that key: +在下面的例子中,`KeyedProcessFunction` 维护每个键的计数,并且每当某个键超过一分钟(事件时间)没有更新时,输出该键的键/计数对: - - The count, key, and last-modification-timestamp are stored in a `ValueState`, which is implicitly scoped by key. - - For each record, the `KeyedProcessFunction` increments the counter and sets the last-modification timestamp - - The function also schedules a callback one minute into the future (in event time) - - Upon each callback, it checks the callback's event time timestamp against the last-modification time of the stored count - and emits the key/count if they match (i.e., no further update occurred during that minute) + - 计数、键和最后修改时间存储在 `ValueState` 中,它由键隐式限定范围。 + - 对于每条记录,`KeyedProcessFunction` 递增计数器并设置最后修改时间。 + - 对于每条记录,该函数还会注册一个一分钟后(事件时间)的回调函数。 + - 在每次回调时,它会将回调的事件时间戳与所保存计数的最后修改时间进行比较,如果正好相差一分钟则 + 输出键/计数对(即,在该分钟内没有进一步更新)。 {{< hint info >}} -This simple example could have been implemented with -session windows. We use `KeyedProcessFunction` here to illustrate the basic pattern it provides. 
+这个简单的例子本身可以用会话窗口(session window)实现, +这里我们使用 `KeyedProcessFunction` 来展示使用它的基本模式。 {{< /hint >}} {{< tabs "6c8c009c-4c12-4338-9eeb-3be83cfa9e37" >}} @@ -105,16 +98,16 @@ import org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerCont import org.apache.flink.util.Collector; -// the source data stream +// 源数据流 DataStream<Tuple2<String, String>> stream = ...; -// apply the process function onto a keyed stream +// 使用 process function 来处理一个 Keyed Stream DataStream<Tuple2<String, Long>> result = stream .keyBy(value -> value.f0) .process(new CountWithTimeoutFunction()); /** - * The data type stored in the state + * 在状态中保存的数据类型 */ public class CountWithTimestamp { @@ -124,12 +117,12 @@ public class CountWithTimestamp { } /** - * The implementation of the ProcessFunction that maintains the count and timeouts + * 用来维护数量和超时的 ProcessFunction 实现 */ public class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> { - /** The state that is maintained by this process function */ + /** 由 process function 管理的状态 */ private ValueState<CountWithTimestamp> state; @Override @@ -143,23 +136,23 @@ public class CountWithTimeoutFunction Context ctx, Collector<Tuple2<String, Long>> out) throws Exception { - // retrieve the current count + // 获得当前的数量 CountWithTimestamp current = state.value(); if (current == null) { current = new CountWithTimestamp(); current.key = value.f0; } - // update the state's count + // 更新状态中的数量 current.count++; - // set the state's timestamp to the record's assigned event time timestamp + // 将状态中的最后修改时间改为记录的事件时间 current.lastModified = ctx.timestamp(); - // write the state back + // 将更新后的状态写回 state.update(current); - // schedule the next timer 60 seconds from the current event time + // 注册一个 60s 之后的事件时间回调 ctx.timerService().registerEventTimeTimer(current.lastModified + 60000); } @@ -169,12 +162,12 @@ public class CountWithTimeoutFunction OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception { - // get the state for the key that scheduled the timer + // 获得注册该回调时使用的键对应的状态 CountWithTimestamp result = state.value(); - // check if 
this is an outdated timer or the latest timer + // 检查当前回调是否是最新的回调,还是后续已注册了新的回调 if (timestamp == result.lastModified + 60000) { - // emit the state on timeout + // 超时后发送状态 out.collect(new Tuple2<String, Long>(result.key, result.count)); } } @@ -189,25 +182,25 @@ import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.util.Collector -// the source data stream +// 源数据流 val stream: DataStream[Tuple2[String, String]] = ... -// apply the process function onto a keyed stream +// 使用 process function 来处理一个 Keyed Stream val result: DataStream[Tuple2[String, Long]] = stream .keyBy(_._1) .process(new CountWithTimeoutFunction()) /** - * The data type stored in the state + * 存储在状态中的数据类型 */ case class CountWithTimestamp(key: String, count: Long, lastModified: Long) /** - * The implementation of the ProcessFunction that maintains the count and timeouts + * 该 ProcessFunction 的实现用于维护计数和超时 */ class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, String), (String, Long)] { - /** The state that is maintained by this process function */ + /** 由 process function 管理的状态 */ lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp])) @@ -217,7 +210,7 @@ class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, Stri ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#Context, out: Collector[(String, Long)]): Unit = { - // initialize or retrieve/update the state + // 初始化或更新状态 val current: CountWithTimestamp = state.value match { case null => CountWithTimestamp(value._1, 1, ctx.timestamp) @@ -225,10 +218,10 @@ class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, Stri CountWithTimestamp(key, count + 1, ctx.timestamp) } - // write the state back + // 将更新后的状态写回 state.update(current) - // schedule the next timer 60 seconds from the current event time + // 
注册一个 60s 之后的事件时间回调 ctx.timerService.registerEventTimeTimer(current.lastModified + 60000) } @@ -269,28 +262,28 @@ class CountWithTimeoutFunction(KeyedProcessFunction): "my_state", Types.PICKLED_BYTE_ARRAY())) def process_element(self, value, ctx: 'KeyedProcessFunction.Context'): - # retrieve the current count + # 获取当前计数 current = self.state.value() if current is None: current = Row(value.f1, 0, 0) - # update the state's count + # 更新该状态的计数 current[1] += 1 - # set the state's timestamp to the record's assigned event time timestamp + # 把状态的时间戳设置为记录指定的事件时间戳 current[2] = ctx.timestamp() - # write the state back + # 将更新后的状态写回 self.state.update(current) - # schedule the next timer 60 seconds from the current event time + # 注册一个 60s 之后的事件时间回调 ctx.timer_service().register_event_time_timer(current[2] + 60000) def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'): - # get the state for the key that scheduled the timer + # 获取调度定时器的键状态 result = self.state.value() - # check if this is an outdated timer or the latest timer + # 检查这是一个过时的定时器还是最新的定时器 if timestamp == result[2] + 60000: # emit the state on timeout yield result[0], result[1] @@ -327,7 +320,7 @@ if __name__ == '__main__': WatermarkStrategy.for_monotonous_timestamps() .with_timestamp_assigner(MyTimestampAssigner())) - # apply the process function onto a keyed stream + # 将 process function 应用于 keyed stream result = watermarked_stream.key_by(lambda value: value[1]) \ .process(CountWithTimeoutFunction()) \ .print() @@ -338,17 +331,14 @@ if __name__ == '__main__': {{< hint warning >}} -Before Flink 1.4.0, when called from a processing-time timer, the `ProcessFunction.onTimer()` method sets -the current processing time as event-time timestamp. This behavior is very subtle and might not be noticed by users. Well, it's -harmful because processing-time timestamps are indeterministic and not aligned with watermarks. 
Besides, user-implemented logic -depends on this wrong timestamp highly likely is unintendedly faulty. So we've decided to fix it. Upon upgrading to 1.4.0, Flink jobs -that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic. +在 Flink 1.4.0 之前,当由处理时间定时器调用时,`ProcessFunction.onTimer()` 方法会将当前的处理时间设置为事件时间的时间戳。此行为非常不明显,用户可能不会注意到。 +然而,这样做是有害的,因为处理时间的时间戳是不确定的,并且未与 watermark 对齐。此外,用户依赖于此错误的时间戳来实现逻辑很有可能导致非预期的错误。 +因此,我们决定对其进行修复。升级到 1.4.0 后,使用此错误的事件时间时间戳的 Flink 作业将失败,用户应将其作业更正为正确的逻辑。 {{< /hint >}} -## The KeyedProcessFunction +## KeyedProcessFunction -`KeyedProcessFunction`, as an extension of `ProcessFunction`, gives access to the key of timers in its `onTimer(...)` -method. +`KeyedProcessFunction` 是 `ProcessFunction` 的一个扩展,可以在其 `onTimer(...)` 方法中访问定时器的键。 {{< tabs "f8b6791f-023f-4e56-a6e4-8541dd0b3e1b" >}} {{< tab "Java" >}} @@ -380,34 +370,31 @@ def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'): ## Timers -Both types of timers (processing-time and event-time) are internally maintained by the `TimerService` and enqueued for execution. +两种定时器(处理时间定时器和事件时间定时器)都在 `TimerService` 内部维护,并排队等待执行。 -The `TimerService` deduplicates timers per key and timestamp, i.e., there is at most one timer per key and timestamp. If multiple timers are registered for the same timestamp, the `onTimer()` method will be called just once. +对于相同的键和时间戳,`TimerService` 会删除重复的定时器,即每个键和时间戳最多有一个定时器。如果为同一时间戳注册了多个定时器,则只调用一次 `onTimer()` 方法。 -Flink synchronizes invocations of `onTimer()` and `processElement()`. Hence, users do not have to worry about concurrent modification of state. +Flink 会同步 `onTimer()` 和 `processElement()` 的调用,因此用户不必担心状态的并发修改。 ### Fault Tolerance -Timers are fault tolerant and checkpointed along with the state of the application. -In case of a failure recovery or when starting an application from a savepoint, the timers are restored. 
+定时器支持容错,它会和应用程序的状态一起进行 checkpoint。当进行故障恢复或从保存点启动应用程序时,定时器也会被恢复。 {{< hint info >}} -Checkpointed processing-time timers that were supposed to fire before their restoration, will fire immediately. -This might happen when an application recovers from a failure or when it is started from a savepoint. +已经 checkpoint 的处理时间定时器如果本应在恢复之前触发,则会在恢复后立即触发。当应用程序从故障中恢复或从保存点启动时,可能会发生这种情况。 {{< /hint >}} {{< hint info >}} -Timers are always asynchronously checkpointed, except for the combination of RocksDB backend / with incremental snapshots / with heap-based timers (will be resolved with `FLINK-10026`). -Notice that large numbers of timers can increase the checkpointing time because timers are part of the checkpointed state. See the "Timer Coalescing" section for advice on how to reduce the number of timers. +除了使用基于 RocksDB backend 的增量 snapshots 并使用基于 Heap 的定时器的情况外,Flink 总是会异步执行定时器的快照操作(该例外情况将在 `FLINK-10026` 中解决)。 +大量定时器会增加 checkpoint 的时间,因为定时器是需要 checkpoint 的状态的一部分。有关如何减少定时器数量,请参阅“Timer Coalescing”部分。 {{< /hint >}} ### Timer Coalescing -Since Flink maintains only one timer per key and timestamp, you can reduce the number of timers by reducing the timer resolution to coalesce them. +由于 Flink 中每个键和时间戳只保存一个定时器,因此可以通过降低定时器的精度来合并它们,从而减少定时器的数量。 -For a timer resolution of 1 second (event or processing time), you -can round down the target time to full seconds. Timers will fire at most 1 second earlier but not later than requested with millisecond accuracy. -As a result, there are at most one timer per key and second. 
+对于精度为 1 秒(事件或处理时间)的定时器,可以将目标时间向下舍入为整秒。定时器最多会提前 1 秒触发,但不会晚于所请求的时间(毫秒精度)。 +这样,每个键在每秒内最多有一个定时器。 {{< tabs "aa23eeb6-d15f-44f2-85ab-d130a4202d57" >}} {{< tab "Java" >}} @@ -430,8 +417,7 @@ ctx.timer_service().register_processing_time_timer(coalesced_time) {{< /tab >}} {{< /tabs >}} -Since event-time timers only fire with watermarks coming in, you may also schedule and coalesce -these timers with the next watermark by using the current one: +由于事件时间定时器仅在 watermark 到来时才触发,因此还可以使用当前 watermark 来注册这些定时器,使它们与下一个 watermark 一起触发并合并: {{< tabs "ef74a1da-c4cd-4fab-8035-d29ffd7039d4" >}} {{< tab "Java" >}} @@ -454,9 +440,9 @@ ctx.timer_service().register_event_time_timer(coalesced_time) {{< /tab >}} {{< /tabs >}} -Timers can also be stopped and removed as follows: +定时器也可以按照以下方式停止并删除: -Stopping a processing-time timer: +停止处理时间定时器: {{< tabs "5d0d1344-6f51-44f8-b500-ebe863cedba4" >}} {{< tab "Java" >}} @@ -479,7 +465,7 @@ ctx.timer_service().delete_processing_time_timer(timestamp_of_timer_to_stop) {{< /tab >}} {{< /tabs >}} -Stopping an event-time timer: +停止事件时间定时器: {{< tabs "581e5996-503c-452e-8b2a-a4daeaf4ac88" >}} {{< tab "Java" >}} @@ -503,7 +489,7 @@ ctx.timer_service().delete_event_time_timer(timestamp_of_timer_to_stop) {{< /tabs >}} {{< hint info >}} -Stopping a timer has no effect if no such timer with the given timestamp is registered. +如果没有注册给定时间戳的定时器,则停止定时器不会产生影响。 {{< /hint >}} {{< top >}}