Skip to content

Commit

Permalink
add an disaster restore enable switch button on incr_build panel for …
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Oct 4, 2023
1 parent d88ca3e commit b100002
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.qlangtech.plugins.incr.flink.common;

import com.alibaba.citrus.turbine.Context;
import com.alibaba.fastjson.annotation.JSONField;
import com.qlangtech.tis.annotation.Public;
import com.qlangtech.tis.config.ParamsConfig;
import com.qlangtech.tis.config.flink.IFlinkCluster;
Expand Down Expand Up @@ -58,6 +59,7 @@ public static void main(String[] args) {
@FormField(identity = true, ordinal = 0, type = FormFieldType.INPUTTEXT, validate = {Validator.require, Validator.identity})
public String name;

@JSONField(serialize = false)
@FormField(ordinal = 1, type = FormFieldType.INPUTTEXT, validate = {Validator.host, Validator.require})
public String jobManagerAddress;

Expand All @@ -66,9 +68,16 @@ public static void main(String[] args) {

@Override
public JobManagerAddress getJobManagerAddress() {

return JobManagerAddress.parse(this.jobManagerAddress);
}

@JSONField(serialize = false)
@Override
public Class<?> getDescribleClass() {
return super.getDescribleClass();
}

/**
* 校验是否可用
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.qlangtech.tis.plugin.annotation.FormFieldType;
import com.qlangtech.tis.plugin.annotation.Validator;
import com.qlangtech.tis.plugin.incr.IncrStreamFactory;
import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler;
import org.apache.flink.annotation.Public;
import org.apache.flink.client.program.rest.RestClusterClient;
Expand All @@ -52,6 +53,8 @@
@Public
public class TISFlinkCDCStreamFactory extends IncrStreamFactory {
public static final String NAME_FLINK_CDC = "Flink";
private static final String KEY_FIELD_CHECKPOINT = "checkpoint";
private static final String KEY_FIELD_STATEBACKEND = "stateBackend";

// @FormField(identity = true, ordinal = 0, type = FormFieldType.INPUTTEXT, validate = {Validator.identity})
// public String name = NAME_FLINK_CDC;
Expand All @@ -68,23 +71,35 @@ public class TISFlinkCDCStreamFactory extends IncrStreamFactory {
@FormField(ordinal = 3, type = FormFieldType.INT_NUMBER, validate = {Validator.integer, Validator.require})
public Integer parallelism;


@FormField(ordinal = 4, validate = {Validator.require})
public RestartStrategyFactory restartStrategy;

@FormField(ordinal = 5, validate = {Validator.require})
/**
* 支持任务恢复,当Flink节点因为服务器意外宕机导致当前运行的flink job意外终止,需要支持Flink Job恢复执行,需要Flink配置,配置支持
* 1.持久化stateBackend
* 2.开启checkpoint
*/
@FormField(ordinal = 5, type = FormFieldType.ENUM, validate = {Validator.require})
public Boolean enableRestore;
@FormField(ordinal = 6, validate = {Validator.require})
public CheckpointFactory checkpoint;

@FormField(ordinal = 6, validate = {Validator.require})

@FormField(ordinal = 7, validate = {Validator.require})
public StateBackendFactory stateBackend;

@Override
public Optional<ISavePointSupport> restorable() {
if (checkpoint instanceof CKOn) {
if (isCheckpointEnable()) {
return StateBackendFactory.getSavePointSupport(stateBackend);
}
return Optional.empty();
}

private boolean isCheckpointEnable() {
return checkpoint instanceof CKOn;
}

@Override
public IFlinkIncrJobStatus getIncrJobStatus(TargetResName collection) {
return stateBackend.getIncrJobStatus(collection);
Expand Down Expand Up @@ -159,6 +174,28 @@ public String getDisplayName() {
return NAME_FLINK_CDC;
}

@Override
protected boolean validateAll(IControlMsgHandler msgHandler, Context context, PostFormVals postFormVals) {

TISFlinkCDCStreamFactory plugin = postFormVals.newInstance();

if (plugin.enableRestore) {
if (!plugin.restorable().isPresent()) {
if (!plugin.isCheckpointEnable()) {
msgHandler.addFieldError(context, KEY_FIELD_CHECKPOINT, "请确认是否开启");
}
if (!StateBackendFactory.getSavePointSupport(plugin.stateBackend).isPresent()) {
msgHandler.addFieldError(context, KEY_FIELD_STATEBACKEND, "请使用持久化stateBackend");
}
msgHandler.addErrorMessage(context, "尚未满足可恢复任务配置要求");
return false;
}
}


return super.validateAll(msgHandler, context, postFormVals);
}

/**
* 校验并行度
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@
]
}
},
"enableRestore": {
"label": "支持意外恢复",
"dftVal": false,
"enum": [
{
"val": true,
"label": "支持"
},
{
"val": false,
"label": ""
}
]
},
"restartStrategy": {
"label": "重启策略",
"dftVal": "off",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ However, the default state backend can be overridden on a per-job basis, as show

For more information about the available state backends, their advantages, limitations, and configuration parameters see the corresponding section in [Deployment & Operations](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/).

## enableRestore

支持任务恢复,当Flink节点因为服务器意外宕机导致当前运行的flink job意外终止,需要支持Flink Job恢复执行,

需要Flink配置支持:

1. 持久化stateBackend
2. 开启checkpoint




Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.qlangtech;

import com.qlangtech.plugins.incr.flink.common.TestFlinkCluster;
import com.qlangtech.plugins.incr.flink.launch.TestFlinkIncrJobStatus;
import com.qlangtech.plugins.incr.flink.launch.TestFlinkTaskNodeController;
import com.qlangtech.plugins.incr.flink.launch.TestTISFlinkCDCStreamFactory;
Expand All @@ -33,7 +34,8 @@

@RunWith(Suite.class)
@Suite.SuiteClasses(
{TestTISFlinkCDCStreamFactory.class,
{TestFlinkCluster.class,
TestTISFlinkCDCStreamFactory.class,
TestFlinkIncrJobStatus.class
, TestFlinkTaskNodeController.class
, TestFlinkClient.class
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.qlangtech.plugins.incr.flink.common;

import com.qlangtech.tis.trigger.util.JsonUtil;
import org.junit.Assert;
import org.junit.Test;

/**
* @author 百岁 ([email protected])
* @date 2023/10/4
*/
public class TestFlinkCluster {

@Test
public void testInstanceSerialize() {
FlinkCluster flinkCluster = new FlinkCluster();

flinkCluster.jobManagerAddress = "192.168.28.201:8081";
flinkCluster.clusterId = "my-first-flink-cluster";
flinkCluster.name = "flink200";

JsonUtil.assertJSONEqual(TestFlinkCluster.class, "flink-cluster-serialize.json"
, JsonUtil.toString(flinkCluster), (message, expected, actual) -> {
Assert.assertEquals(message, expected, actual);
});

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"clusterId":"my-first-flink-cluster",
"jobManagerAddress":{
"URL":"http://192.168.28.201:8081",
"host":"192.168.28.201",
"port":8081
},
"name":"flink200"
}

0 comments on commit b100002

Please sign in to comment.